Redis 使用场景
1. 短信登录
1. 发送验证码
@Override
public Result sendCode(String phone) {
// 1、校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误");
}
String code = RandomUtil.randomNumbers(6);
// 2、保存验证码到redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + "phone", code, 1);
// 3、发送验证码
log.info("发送短信验证码成功,验证码:{}", code);
return Result.ok();
}2. 登录注册
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1.校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
// 2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 3.从redis获取验证码并校验
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.equals(code)) {
// 不一致,报错
return Result.fail("验证码错误");
}
// 4.一致,根据手机号查询用户 select * from tb_user where phone = ?
User user = query().eq("phone", phone).one();
// 5.判断用户是否存在
if (user == null) {
// 6.不存在,创建新用户并保存
user = createUserWithPhone(phone);
}
// 7.保存用户信息到 redis中
// 7.1.随机生成token,作为登录令牌
String token = UUID.randomUUID().toString(true);
// 7.2.将User对象转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
// 7.3.存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 7.4.设置token有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 8.返回token
return Result.ok(token);
}3. 校验
public class RefreshTokenInterceptor implements HandlerInterceptor {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 目标方法执行之前
* @return true 放行, false 拦截
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
// 1、拿到token
String token = request.getHeader("authorization");
// 2、用token去redis中查询用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY+token);
if(userMap.isEmpty()){
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
return false;
}
// 3、将查询到的Hash数据转为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 4、存在,保存用户信息到ThreadLocal
UserContextHolder.setUser(userDTO);
// 存在刷新token有效期 30min
stringRedisTemplate.expire(LOGIN_USER_KEY+token,30, TimeUnit.MINUTES);
return true;
}
}2. 查询缓存
缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力
缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码(例如:
例1:Static final ConcurrentHashMap<K,V> map = new ConcurrentHashMap<>(); 本地用于高并发
例2:static final Cache<K,V> USER_CACHE = CacheBuilder.newBuilder().build(); 用于redis等缓存
例3:Static final Map<K,V> map = new HashMap(); 本地缓存由于其被Static修饰,所以随着类的加载而被加载到内存之中,作为本地缓存,由于其又被final修饰,所以其引用(例3:map)和对象(例3:new HashMap())之间的关系是固定的,不能改变,因此不用担心赋值(=)导致缓存失效;
添加商户缓存
在我们查询商户信息时,我们是直接操作从数据库中去进行查询的,大致逻辑是这样,直接查询数据库那肯定慢咯,所以我们需要增加缓存
缓存模型和思路

代码实现
@Override
public Result shopInfo(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1、查询redis缓存
String shopString = stringRedisTemplate.opsForValue().get(key);
// 2、缓存中存在,直接返回
if(shopString!=null){
Shop shop = JSONUtil.toBean(shopString, Shop.class);
return Result.ok(shop);
}
// 3、缓存中不存在,查询数据库
Shop byId = getById(id);
// 4、设置缓存
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(byId));
return Result.ok(byId);
}缓存更新策略
**内存淘汰:**redis自动进行,当redis内存达到咱们设定的max-memery的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以自己设置策略方式)
**超时剔除:**当我们给redis设置了过期时间ttl之后,redis会将超时的数据进行删除,方便咱们继续使用缓存
**主动更新:**我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题

数据库缓存不一致解决方案
由于我们的缓存的数据源来自于数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在。
解决方案:
Cache Aside Pattern 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案
Read/Write Through Pattern : 由系统本身完成,数据库与缓存的问题交由系统本身去处理
Write Behind Caching Pattern :调用者只操作缓存,其他线程去异步处理数据库,实现最终一致

综合考虑使用方案一 Cache Aside Pattern 人工编码方式
如果采用第一个方案,那么假设我们每次操作数据库后,都操作缓存,但是中间如果没有人查询,那么这个更新动作实际上只有最后一次生效,中间的更新动作意义并不大,我们可以把缓存删除,等待再次查询时,将缓存中的数据加载出来。
1. 删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
2. 先操作缓存还是先操作数据库?下面图片给出答案

代码实现
/**
* 查询店铺数据
* @param id
* @return
*/
@Override
public Result shopInfo(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1、查询redis缓存
String shopString = stringRedisTemplate.opsForValue().get(key);
// 2、缓存中存在,直接返回
if(shopString!=null){
Shop shop = JSONUtil.toBean(shopString, Shop.class);
return Result.ok(shop);
}
// 3、缓存中不存在,查询数据库
Shop byId = getById(id);
// 4、设置缓存
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(byId),30L, TimeUnit.MINUTES);
return Result.ok(byId);
}
/**
* 更新商铺信息
* @param shop
* @return
*/
@Transactional
@Override
public Result update(Shop shop) {
Long id = shop.getId();
if(id == null){
return Result.fail("店铺id不能为空");
}
// 1、更新数据库
updateById(shop);
// 2、删除缓存(用于下次查询缓存跟新)
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok("跟新成功");
}缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
**缓存空对象思路分析:**当我们客户端访问不存在的数据时,先请求redis,但是此时redis中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如redis这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到redis中去,这样,下次用户过来访问这个不存在的数据,那么在redis中也能找到这个数据就不会进入到缓存了
**布隆过滤:**布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,哪怕此时redis中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到redis中,
- 缓存空对象
- 优点:实现简单,维护方便
- 缺点:
- 额外的内存消耗
- 可能造成短期的不一致
- 布隆过滤
- 优点:内存占用较少,没有多余key
- 缺点:
- 实现复杂
- 存在误判可能

缓存空对象代码实现
/**
* 查询店铺数据
* @param id
* @return
*/
@Override
public Result shopInfo(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1、查询redis缓存
String shopString = stringRedisTemplate.opsForValue().get(key);
// 2、缓存中存在空字符串,返回店铺不存在 防止缓存穿透
if("".equals(shopString)){
return Result.fail("店铺不存在");
}
// 3、缓存中存在,直接返回
if(shopString!=null){
Shop shop = JSONUtil.toBean(shopString, Shop.class);
return Result.ok(shop);
}
// 4、缓存中不存在,查询数据库
Shop byId = getById(id);
// 5、数据库中不存在,缓存空字符串 防止缓存穿透
if(byId == null){
stringRedisTemplate.opsForValue().set(key,"",2L, TimeUnit.MINUTES);
return Result.fail("店铺不存在");
}
// 设置真是缓存数据
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(byId),30L, TimeUnit.MINUTES);
return Result.ok(byId);
}小总结:
缓存穿透产生的原因是什么?
- 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
- 缓存null值
- 布隆过滤
- 增强id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
缓存雪崩
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存

缓存击穿(热点key)
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
- 互斥锁
- 逻辑过期
逻辑分析:假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大

1、使用锁来解决:
核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿
流程图:

代码实现
/**
* 查询店铺数据
* @param id
* @return
*/
@Override
public Result shopInfo(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1、查询redis缓存
String shopString = stringRedisTemplate.opsForValue().get(key);
// 2、缓存中存在空字符串,返回店铺不存在 防止缓存穿透
if("".equals(shopString)){
return Result.fail("店铺不存在");
}
// 3、缓存中存在,直接返回
if(shopString!=null){
Shop shop = JSONUtil.toBean(shopString, Shop.class);
return Result.ok(shop);
}
String lockKey = "lock:shop:" + id;
Shop byId = null;
try {
// 4、缓存中不存在,尝试获取互斥锁
if(!tryLock(lockKey)){
Thread.sleep(5000);
return shopInfo(id);
}
// 5、缓存中不存在,查询数据库
byId = getById(id);
// 6、数据库中不存在,缓存空字符串 防止缓存穿透
if(byId == null){
stringRedisTemplate.opsForValue().set(key,"",2L, TimeUnit.MINUTES);
return Result.fail("店铺不存在");
}
// 设置真是缓存数据
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(byId),30L, TimeUnit.MINUTES);
}catch (Exception e){
log.error(e.getMessage());
}finally {
// 7、释放锁
unlock(lockKey);
}
return Result.ok(byId);
}操作锁
// 尝试获取锁
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return Boolean.TRUE.equals(flag);
}
// 释放锁
private boolean unlock(String key) {
return stringRedisTemplate.delete(key);
}2、利用逻辑过期解决缓存击穿问题
需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
思路分析:当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据,如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。

步骤一、
新建一个实体类,我们采用第二个方案,这个方案,对原来代码没有侵入性。
/**
* redis 逻辑过期实体类
* @param <T>
*/
@Data
public class RedisData<T> {
private LocalDateTime expireTime;
private T data;
public RedisData(){
}
public RedisData(LocalDateTime expireTime, T data) {
this.expireTime = expireTime;
this.data = data;
}
}步骤二、
锁操作、跟新缓存
// 将店铺数据存入redis
public void saveShop2Redis(Long id, Long expireSeconds) throws InterruptedException {
Thread.sleep(1000);
// 1. 查询店铺数据
Shop shop = getById(id);
// 2. 封装逻辑过期店铺数据
RedisData<Shop> shopRedisData = new RedisData(LocalDateTime.now().plusSeconds(expireSeconds), shop);
// 3. 将店铺数据写入 Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shopRedisData));
}
// 尝试获取锁
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return Boolean.TRUE.equals(flag);
}
// 释放锁
private boolean unlock(String key) {
return stringRedisTemplate.delete(key);
}步骤二、
核心代码实现
// 创建一个包含10个工作线程的固定线程池 ExecutorService - Java并发框架中的接口,用于管理和控制线程执行
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 查询店铺数据
* @param id
* @return
*/
@Override
public Result shopInfo(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1、查询redis缓存
String shopString = stringRedisTemplate.opsForValue().get(key);
// 2、缓存未命中,直接返回
if(shopString==null){
return Result.ok(null);
}
// 3、命中,判断是否过期
RedisData redisData = JSONUtil.toBean(shopString, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject)redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
// 3.1、未过期,直接返回店铺信息
return Result.ok(shop);
}
// 3.2、已过期,缓存重建
String lockKey = "lock:shop:" + id;
if(tryLock(lockKey)){
// 获取锁成功,开启独立线程,实现缓存重建
executorService.submit(() -> {
try {
// 重建缓存
this.saveShop2Redis(id, 20L);
} catch (Exception e) {
log.error(e.getMessage());
} finally {
// 7、释放锁
unlock(lockKey);
}
});
}
// 4、返回过期的商铺信息
return Result.ok(shop);
}封装Redis工具类
基于StringRedisTemplate封装一个缓存工具类,满足下列需求:
- 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
- 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓
存击穿问题
- 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
将逻辑进行封装
package com.hmdp.utils;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@Slf4j
@Component
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
// 线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* 写入缓存
* @param key 键
* @param value 值
* @param time 时间大小
* @param unit 时间类型
*/
public void set(String key, Object value, Long time, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
}
/**
* 写入逻辑过期缓存
* @param key 键
* @param value 值
* @param time 时间大小
* @param unit 时间类型
*/
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
// 设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
// 逻辑过期时间
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
// 写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
/**
* 缓存穿透
* @param keyPrefix key 前缀
* @param id id
* @param type 返回类型
* @param dbFallback 数据库查询函数
* @param time 过期时间
* @param unit 过期时间单位
* @return R
* @param <R> 返回类型
* @param <ID> 回调函数参数类型
*/
public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time,
TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(json)) {
// 3.存在,直接返回
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是空值
if (json != null) {
// 返回一个错误信息
return null;
}
// 4.不存在,根据id查询数据库 (回调函数调用)
R r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", 10L, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
return r;
}
/**
* 缓存击穿(利用逻辑过期解决)
* @param keyPrefix key 前缀
* @param id id
* @param type 返回类型
* @param dbFallback 数据库查询函数
* @param time 过期时间
* @param unit 过期时间单位
* @return R
* @param <R> 返回类型
* @param <ID> 回调函数参数类型
*/
public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback,
Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject)redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return r;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = "lockKey" + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock) {
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 查询数据库(回调函数调用)
R newR = dbFallback.apply(id);
// 重建缓存
this.setWithLogicalExpire(key, newR, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return r;
}
/**
* 缓存击穿(利用互斥锁解决)
* @param keyPrefix key 前缀
* @param id id
* @param type 返回类型
* @param dbFallback 数据库查询函数
* @param time 过期时间
* @param unit 过期时间单位
* @return R
* @param <R> 返回类型
* @param <ID> 回调函数参数类型
*/
public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time,
TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3.存在,直接返回
return JSONUtil.toBean(shopJson, type);
}
// 判断命中的是否是空值
if (shopJson != null) {
// 返回一个错误信息
return null;
}
// 4.实现缓存重建
// 4.1.获取互斥锁
String lockKey = "lockKey" + id;
R r = null;
try {
boolean isLock = tryLock(lockKey);
// 4.2.判断是否获取成功
if (!isLock) {
// 4.3.获取锁失败,休眠并重试
Thread.sleep(50);
return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
}
// 4.4.获取锁成功,根据id查询数据库
r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", 10L, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 7.释放锁
unlock(lockKey);
}
// 8.返回
return r;
}
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return Boolean.TRUE.equals(flag);
}
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
// 逻辑过期实体类
@Data
class RedisData<T> {
private LocalDateTime expireTime;
private T data;
}
}3. 秒杀场景
全局唯一ID
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
Redis实现全局唯一Id
/**
* redis 全局唯一id生成器
*/
@Component
public class RedisIdWorker {
@Autowired
private StringRedisTemplate stringRedisTemplate;
// 开始时间 2023-01-01 00:00:00
private static final long BEGIN_TIMESTAMP = 1052269261L;
/**
* 生成全局唯一id
*
* @param keyPrefix key前缀
* @return
*/
public Long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowEpochSecond = now.toEpochSecond(java.time.ZoneOffset.UTC);
long timestamp = nowEpochSecond - BEGIN_TIMESTAMP;
// 2.生成序列号
String key = "icr:" + keyPrefix + ":" + now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 自增长
long count = stringRedisTemplate.opsForValue().increment(key);
// 3.拼接并返回 或运算左移动32为,与序列号做或或运算
return (timestamp << 32) | count;
}
public static void main(String[] args) {
// 生成起始时间戳
LocalDateTime localDateTime = LocalDateTime.of(2003, 5, 7, 1, 1, 1);
System.out.println(localDateTime);
long l = localDateTime.toEpochSecond(java.time.ZoneOffset.UTC);
System.out.println(l);
}
}库存超卖问题
使用MySQL的锁机制,防止超卖问题。
// 4、扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", id).gt("stock", 0) // where voucher_id = ? and stock > 0
.update();
if(!success){
return Result.fail("库存不足");
}一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
/**
* 秒杀优惠券
* @param id 优惠卷id
* @return
*/
@Override
public Result seckillVoucher(Long id) {
// 1、查询优惠卷信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(id);
// 2、判断是否在活动时间内
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
// 3、判断库存是否充足
if(seckillVoucher.getStock()<1){
return Result.fail("库存不足");
}
Long userId = UserContextHolder.getUser().getId();
// 一人一单锁,多用户id的值为锁
synchronized (userId.toString().intern()){
// 获取代理对象事务
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
// 创建订单、扣减库存
return proxy.createVoucherOrder(id,userId);
}
}
/**
* 创建订单、扣减库存
* @param id 优惠卷id
* @param userId 用户id
* @return
*/
@Transactional
public Result createVoucherOrder(Long id,Long userId) {
// 4、查询订单是否存在
int count = query().eq("user_id", userId).eq("voucher_id", id).count();
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
// 5、扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", id).gt("stock", 0) // where voucher_id = ? and stock > 0
.update();
if(!success){
return Result.fail("库存不足");
}
// 6、创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 订单id
Long voucherOrderId = redisIdWorker.nextId("order");
voucherOrder.setId(voucherOrderId);
// 用户id 通过 ThreadLocal 获取用户id
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(id);
boolean save = save(voucherOrder);
// 7、返回订单id
return Result.ok(voucherOrderId);
}这里会造成事务失效,我们需要获得原始的代理事务对象,还需要开启暴露代理对象。
@EnableAspectJAutoProxy(exposeProxy = true) // 开启代理暴露
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}<!--暴露代理对象注解需要导入下面这个依赖-->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.7</version>
</dependency>集群环境下的并发问题
由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

4. 分布式锁
基本原理和实现方式对比
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

那么分布式锁他应该满足一些什么样的条件呢?
可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环

常见的分布式锁有三种
MySQL:MySQL本身就带有锁机制,但是由于MySQL性能本身一般,所以采用分布式锁的情况下,其实使用MySQL作为分布式锁比较少见Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述

Redis分布式锁的实现核心
实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁
非阻塞:尝试一次,成功返回true,失败返回false

释放锁:
- 手动释放
- 超时释放:获取锁时添加一个超时时间

核心思路:
我们利用redis 的setNx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可
分布式锁实现
编写锁的基本操作
/**
* redis 锁操作
*/
@Component
public class SimpleRedisLock implements Lock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(String name, long timeoutSec) {
long id = Thread.currentThread().getId();
Boolean b = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, id + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(b);
}
@Override
public void unlock(String name) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}使用redis分布式锁
// 4、获取redis分布式锁
if (!simpleRedisLock.tryLock("order:" + userId,10)){
return Result.fail("不允许重复下单");
}
try{
// 获取代理对象事务
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
// 创建订单、扣减库存
return proxy.createVoucherOrder(id,userId);
}catch (Exception e){
e.printStackTrace();
return Result.fail("下单失败");
}finally {
// 释放锁
simpleRedisLock.unlock("order:" + userId);
}Redis分布式锁误删问题
逻辑说明:
持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明
解决方案:解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。

解决锁误删问题
需求:修改之前的分布式锁实现,满足:在获取锁时存入线程标示(可以用UUID表示) 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。

/**
* redis 锁操作
*/
@Component
public class SimpleRedisLock implements Lock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private static final String UUID_VALUE = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(String name, long timeoutSec) {
long threadId = Thread.currentThread().getId();
Boolean b = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, UUID_VALUE + threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(b);
}
@Override
public void unlock(String name) {
// 获取标识
String value = UUID_VALUE + Thread.currentThread().getId();;
// 获取锁的值
String lockValue = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断是否是自己的锁
if(value.equals(lockValue)){
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}更为极端的误删逻辑说明:
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,

Lua脚本实现原子性
lua脚本本身并不需要大家花费太多时间去研究,只需要知道如何调用,大致是什么意思即可,所以在笔记中并不会详细的去解释这些lua表达式的含义。
我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图股
unlock.lua 脚本
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0原子性释放锁
// lua脚本对象
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 获取resources目录下的unlock.lua脚本
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 设置脚本返回值类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
/**
* 原子性释放锁
* @param name
*/
@Override
public void unlock(String name) {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
UUID_VALUE + Thread.currentThread().getId());
}5. 分布式锁-redission
基于setnx实现的分布式锁存在下面的问题:
重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
**超时释放:**我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

那么什么是Redission呢
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redission提供了分布式锁的多种多样的功能

Redission 快速入门
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>配置Redisson客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}使用Redisson锁:
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}Redission 可重入锁原理
在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。

Redission锁的MutiLock原理
为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。

为了解决这个问题,redission提出来了MutiLock锁,MultiLock 是 Redisson 提供的一种联锁机制,可以同时对多个资源进行加锁,只有当所有锁都获取成功时才算成功,这样可以避免死锁问题。
使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

6. Redis 消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息

使用队列的好处在于 **解耦:**所谓解耦,举一个生活中的例子就是:快递员(生产者)把快递放到快递柜里边(Message Queue)去,我们(消费者)从快递柜里边去拿东西,这就是一个异步,如果耦合,那么这个快递员相当于直接把快递交给你,这事固然好,但是万一你不在家,那么快递员就会一直等你,这就浪费了快递员的时间,所以这种思想在我们日常开发中,是非常有必要的。
这种场景在我们秒杀中就变成了:我们下单之后,利用redis去进行校验下单条件,再通过队列把消息发送出去,然后再启动一个线程去消费这个消息,完成解耦,同时也加快我们的响应速度。
这里我们可以使用一些现成的mq,比如kafka,rabbitmq等等,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。
三种消息队列实现
List 实现消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
基于PubSub的消息队列有哪些优缺点? 优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列

删除指定的消费者组
XGROUP DESTORY key groupName给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
">":从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
最后我们来个小对比

Stream消息队列,实现异步秒杀
项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
// 创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
// 启动一个线程
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 创建订单处理器
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
//1、获取队列中的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), // 消费组 g1 消费者 c1
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), // 读取一条 阻塞2秒等待
StreamOffset.create("stream.orders", ReadOffset.lastConsumed()) // 消息队列名 stream.orders
);
//2、判断消息队列获取的是否为空
if (list == null || list.isEmpty()) {
continue;
}
//3、获取成功下单
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> mapValue = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(mapValue, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
//4、ACK确认消息 如果确认失败会抛出异常
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", entries.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
// 处理pending-list中的订单信息(异常消息)
handlePendingList();
}
}
}
}
// 处理pending-list中的订单信息(异常消息)
public void handlePendingList() {
while (true) {
try {
//1、获取队列中的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), // 消费组 g1 消费者 c1
StreamReadOptions.empty().count(1), // 读取一条
StreamOffset.create("stream.orders", ReadOffset.from("0")) // 消息队列名 stream.orders 从pending-list中获取已消费但未确认的消息
);
//2、判断消息队列获取的是否为空
if (list == null || list.isEmpty()) {
// 如果没有读取队列异常,直接跳出。
break;
}
//3、获取成功下单
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> mapValue = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(mapValue, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
//4、ACK确认消息 如果确认失败也会抛出异常
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", entries.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}7. 排行榜
在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:
之前的点赞是放到set集合,但是set集合是不能排序的,所以这个时候,咱们可以采用一个可以排序的set集合,就是咱们的sortedSet

点赞逻辑
public Result likeBlog(Long id) {
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();
// 2.判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + id;
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
if (score == null) {
// 3.如果未点赞,可以点赞
// 3.1.数据库点赞数 + 1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
// 3.2.保存用户到Redis的set集合 zadd key value score
if (isSuccess) {
// 利用 sortedSet 结构通过 System.currentTimeMillis() 排序
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
}
} else {
// 4.如果已点赞,取消点赞
// 4.1.数据库点赞数 -1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
// 4.2.把用户从Redis的set集合移除
if (isSuccess) {
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
return Result.ok();
}排行榜查询
/**
* 点赞排行榜查询
*/
@Override
public Result queryBlogLikes(Long id) {
String key = BLOG_LIKED_KEY + id;
// 1.查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if (top5 == null || top5.isEmpty()) {
return Result.ok(Collections.emptyList());
}
// 2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);
// 3.根据用户id查询用户 WHERE id IN ( 5 , 1 ) ORDER BY FIELD(id, 5, 1)
List<UserDTO> userDTOS = userService.query()
.in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
// 4.返回
return Result.ok(userDTOS);
}8. 好友共同关注
在set集合中,有交集并集补集的api,我们可以把两人的关注的人分别放入到一个set集合中,然后再通过api去查看这两个set集合中的交集数据。
核心代码
@Override
public Result followCommons(Long id) {
// 1.获取当前用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
// 2.求交集
String key2 = "follows:" + id;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
if (intersect == null || intersect.isEmpty()) {
// 无交集
return Result.ok(Collections.emptyList());
}
// 3.解析id集合
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
// 4.查询用户
List<UserDTO> users = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(users);
}9. 滚动查询
具体操作如下:
1、每次查询完成后,我们要分析出查询出数据的最小时间戳,这个值会作为下一次查询的条件
2、我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据
综上:我们的请求参数中就需要携带 lastId:上一次查询的最小时间戳 和偏移量这两个参数。
这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台。
/**
* 滚动分页查询
* @param lastId 最新时间戳
* @param offset 偏移量
* @return
*/
@Override
public Result queryBlogOfFollow(Long lastId, Integer offset) {
// 1.获取当前用户
Long useId = UserContextHolder.getUser().getId();
String key = "feed:" + useId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, lastId, offset, 2);
if(typedTuples==null || typedTuples.isEmpty()){
return Result.ok();
}
// 2.解析数据:blogId、minTime(时间戳)、offset(偏移量)
List<Long> ids = new ArrayList<>();
long minTime = 0;
int os = 1;
for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
ids.add(Long.valueOf(typedTuple.getValue()));
long time = typedTuple.getScore().longValue();
if(time==minTime){
os++;
}else{
minTime=time;
os=1;
}
}
// 3.根据id查询blog
String idStr = ids.stream().map(String::valueOf).collect(Collectors.joining(","));
List<Blog> blogs = query().in("id", ids).last("ORDER BY field(id," + idStr + ")").list();
for (Blog blog : blogs) {
// 3.1.查询blog有关的用户
queryBlogUser(blog);
// 3.2.查询blog是否被点赞
isBlogLiked(blog);
}
ScrollResult scrollResult = new ScrollResult();
scrollResult.setList(blogs);
scrollResult.setOffset(os);
scrollResult.setMinTime(minTime);
return Result.ok(scrollResult);
}10. GEO - 附近查询
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
- GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
- GEODIST:计算指定的两个点之间的距离并返回
- GEOHASH:将指定member的坐标转为hash字符串形式并返回
- GEOPOS:返回指定member的坐标
- GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
- GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
- GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
插入数据
@Test
public void test(){
List<Shop> list = shopService.list();
String key = "shop:geo:";
for(Shop shop : list){
stringRedisTemplate.opsForGeo().add(key+shop.getTypeId(),new Point(shop.getX(),shop.getY()),shop.getId().toString());
}
}查询代码
String key = "shop:geo:" + typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> search = stringRedisTemplate.opsForGeo().search(key,
// 设置中心点
GeoReference.fromCoordinate(x, y),
// 设置查询半径、单位
new Distance(5000),
// 设置分页参数
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));11. BitMap-签到
我们针对签到功能完全可以通过mysql来完成,比如说以下这张表

用户一次签到,就是一条记录,假如有1000万用户,平均每人每年签到次数为10次,则这张表一年的数据量为 1亿条
每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共22 字节的内存,一个月则最多需要600多字节
我们如何能够简化一点呢?其实可以考虑小时候一个挺常见的方案,就是小时候,咱们准备一张小小的卡片,你只要签到就打上一个勾,我最后判断你是否签到,其实只需要到小卡片上看一看就知道了
我们可以采用类似这样的方案来实现我们的签到需求。
我们按月来统计用户签到信息,签到记录为1,未签到则记录为0.
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)。这样我们就用极小的空间,来实现了大量数据的表示
Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位。

BitMap的操作命令有:
- SETBIT:向指定位置(offset)存入一个0或1
- GETBIT :获取指定位置(offset)的bit值
- BITCOUNT :统计BitMap中值为1的bit位的数量
- BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
- BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
- BITOP :将多个BitMap的结果做位运算(与 、或、异或)
- BITPOS :查找bit数组中指定范围内第一个0或1出现的位置
实现签到功能
思路:我们可以把年和月作为bitMap的key,然后保存到一个bitMap中,每次签到就到对应的位上把数字从0变成1,只要对应是1,就表明说明这一天已经签到了,反之则没有签到。
@Override
public Result sign() {
// 1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取日期
LocalDateTime now = LocalDateTime.now();
// 3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
// 4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 5.写入Redis SETBIT key offset 1
stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
return Result.ok();
}连续签到统计
从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。

如何得到本月到今天为止的所有签到数据?
# u:无符号 dayOfMonth:多少位 0:起始位
BITFIELD key GET u [dayOfMonth] 0如何从后向前遍历每个bit位?
- bitMap返回的数据是10进制数(num)
- 用num与
1进行与运算(num&1)如果没有变那么就签到++ - 直到变化那么就断签了
代码实现
@Override
public Result signCount() {
// 1. 获取当前用户
UserDTO user = UserContextHolder.getUser();
// 2. 获取日期
LocalDateTime now = LocalDateTime.now();
String nowStr = now.format(DateTimeFormatter.ofPattern(":yyyyMMdd"));
// 3. 拼接key
String key ="sign:"+user.getId() + nowStr;
// 4. 获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 5. 获取本月截止今天为止的所有签到记录,返回的是一个十进制的数字 BITFIELD sign:1:202203 GET u14 0
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create() // 创建命令 从第0位开始,获取dayOfMonth位无符号整数
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if (result==null || result.isEmpty()) {
return Result.ok(0);
}
Long num = result.get(0);
int count = 0;
while (true){
// 6.1.让这个数字与1做与运算,得到数字的最后一个bit位
if ((num & 1) == 0) {
// 6.2.如果这个bit位是0,说明未签到,结束
break;
}else {
// 6.3.如果这个bit位是1,说明已签到,计数器+1
count++;
}
// 6.4.数字右移一位
num = num >> 1;
}
return Result.ok(count);
}12. UV 统计
- UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
- PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那怎么处理呢?
Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0 Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。
查看内存
# Redis 命令
INFO memory
used_memory: 已使用内存
used_memory_human: 人类可读的内存使用量
used_memory_rss: 系统分配的内存
used_memory_peak: 内存使用峰值
maxmemory: 最大内存限制
UV统计测试
@Test
public void testHyperLogLog(){
// 准备数据
String[] values = new String[1000];
int j = 0;
for (int i = 0; i < 1000000; i++) {
values[j++] = "user_" + i;
// 1000个一批插入数据,总共插入100w
if (j == 1000) {
stringRedisTemplate.opsForHyperLogLog().add("hll1", values);
j = 0;
values = new String[1000];
}
}
Long size = stringRedisTemplate.opsForHyperLogLog().size("hll1");
System.out.println("size = " + size); // size = 997593
}永远小于16kb、小于0.81%的误差

