高并发场景秒杀抢购超卖BUG场景多个请求同时读取到剩余库存各自判断库存容量符合抢购条件导致减库存操作执行异常。示例代码public String deductStock() { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } return end; }当多个请求同时扣减库存时导致最后库存不准确。解决方式jvm级锁与分布式锁高并发场景下jvm级锁与分布式锁synchronized 在分布式架构中当请求分配到不同服务器上时每个服务器上的jvm锁都会认为自己获得了锁允许请求进入导致锁失效。 分布式锁把锁从**每个进程内部**移到**所有进程都能访问的外部独立系统**中让所有服务器竞争同一把锁。分布式锁简单版本命令SETNX key value如果key不存在将key的值设置为value存在则不做任何操作。代码如下public String deductStock() { String lockKey lock:product\_101; Boolean result stringRedisTemplate.opsForValue().setIfAbsent(lockKey, xxx); if (!result) return error\_code; try { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } } finally { stringRedisTemplate.delete(lockKey); } return end; } // try--catch此时作用-- 防止程序执行时出现异常导致锁未释放导致死锁。此时未设置超时时间。分布式锁设置超时时间添加代码 stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS); 问题加锁与设置超时时间代码在redis执行性无法保证一起顺序执行。redis就执行命令单线程的命令需要排队执行执行这两个命令时可能被其他请求执行命令进行加塞原子性问题 原子性单线程保证使用组合指令Boolean result stringRedisTemplate.opsForValue().setIfAbsent(lockKey, zhuge,10,TimeUnit.SECONDS);代码如下public String deductStock() { String lockKey lock:product\_101; Boolean result stringRedisTemplate.opsForValue().setIfAbsent(lockKey, xxx,10,TimeUnit.SECONDS); if (!result) return error\_code; try { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } } finally { stringRedisTemplate.delete(lockKey); } return end; }并发量大时依然存在锁失效问题假设3个线程 threadA,threadB,threadCthreadA获取锁后10s未执行完库存扣减逻辑此时锁已经失效threadB获取到锁开始执行5s,threadA同时继续执行5s且执行释放锁逻辑此时ThreadA释放threadB的锁。threadC获取到锁开始执行5s,threadB同时继续执行5s且执行释放锁逻辑此时ThreadB释放threadC的锁。导致锁一直是失效状态。在高并发场景线程执行顺序完全不可预料上述只是极端情况。问题根本点当前线程加的锁被其他线程释放 . 解决给每个线程分配唯一ID。不可以用threadId,因为每一台服务器可能存在相同的THreadId代码如下public String deductStock() { String lockKey lock:product\_101; String clientId UUID.randomUUID().toString(); //唯一id Boolean result stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId,10,TimeUnit.SECONDS); if (!result) return error\_code; try { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { // 通过唯一id区分是否是当前线程的锁 stringRedisTemplate.delete(lockKey); //删除锁 } } return end; }此时依然存在锁失效问题当ThreadA在执行区分是否是当前线程的锁逻辑后出现卡顿在这个卡顿时间中锁失效此时ThreadB获取到锁THreadA继续执行依旧可以释放ThreadB的锁。极端情况 区分是否是当前线程的锁与删除锁不是原子性操作。且无原生命令解决。根本点超时时间过期问题解决延长超期时间(治标不治本)锁续命主线程获取锁执行task任务时分线程定时判断主线程是否执行结束没有结束将key超时时间重新设置。redisson 分布式锁原理核心特点通过看门狗Watchdog自动续期机制解决了锁超时释放与业务执行时间不匹配的难题同时在加锁、解锁的各个环节都做了精密设计确保原子性与安全性如图线程1加锁成功后默认过期时间30s,redisson启动后台线程每隔10s检查线程1是否持有锁持有则延长线程1持有锁时间30s。 线程2获取所失败后堵塞并订阅频道在while循环中间歇性尝试加锁当线程1释放锁后会像订阅的频道中发布消息所有订阅该频道的线程都会被唤醒尝试获取锁。工作原理初始设置客户端加锁成功时默认设置锁的过期时间为30秒启动续期加锁成功后Redisson 会为该锁启动一个看门狗线程。定时检查该线程会每隔10秒即30秒 / 3执行一次。执行续期每次执行时它会检查锁是否仍被当前线程持有。如果是它就通过 Lua 脚本将锁的过期时间重置为30秒循环往复只要业务还在运行锁未被主动释放这个“检查-续期”的过程就会无限循环下去如何停止正常情况当业务代码执行完毕调用unlock()释放锁时Redisson 会同时取消这个看门狗定时任务异常情况如果持有锁的客户端宕机或网络中断看门狗线程也随之消亡无法进行续期。锁就会在最后一次续期后的30秒自动过期避免了死锁阻塞重试机制 不会消耗cpu资源当一个客户端尝试获取锁失败时锁已被他人持有Redisson 并不会立刻返回失败而是会进入一个高效的阻塞等待模式订阅释放事件获取锁失败的客户端会通过 Redis 的发布/订阅Pub/Sub功能订阅一个与锁名称相关的特定频道阻塞等待线程会在一个while(true)循环中等待并定期尝试重新获取锁收到唤醒信号当持有锁的客户端释放锁时它会向这个频道发布一条释放消息再次尝试所有订阅了该频道的客户端会收到通知从等待状态中被唤醒然后再次尝试获取锁锁的释放unlock()方法的实现也至关重要核心依然是 Lua 脚本确保操作的原子性。检查当前线程的锁是否存在若不存在说明锁已失效或未被持有抛出异常。若存在则将重入次数value减1如果减1后的值仍然大于0表示线程仍然持有锁重入只是减少计数并重新设置过期时间。如果减1后的值等于0表示锁需要彻底释放执行del命令删除锁的 Key并发布一条释放锁的消息以便通知唤醒其他阻塞等待的客户端。代码如下public String deductStock3() { String lockKey lock:product\_101; //获取锁对象 RLock redissonLock redisson.getLock(lockKey); //加分布式锁 redissonLock.lock(); // .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); try { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } } finally { //解锁 redissonLock.unlock(); } return end; }备注 tryLock(long waitTIme ,long leaseTimeTImeUnit unit); 尝试加锁时间 waitTIme加锁成功后锁过期时间 lease Timeredis主从/集群架构分布式锁失效问题当主节点 key 写入成功后准备同步从节点主节点宕机从节点成为主节点并没有此key其他线程也可以加锁成功针对同一个商品减库存。redLock 红锁超过半数redis节点加锁成功才算加锁成功也没有百分百解决代码如下public String redlock() { String lockKey product\_001; //这里需要自己实例化不同redis实例的redisson客户端连接这里只是伪代码用一个redisson客户端简化了 RLock lock1 redisson.getLock(lockKey); RLock lock2 redisson.getLock(lockKey); RLock lock3 redisson.getLock(lockKey); /\*\* \* 根据多个 RLock 对象构建 RedissonRedLock 最核心的差别就在这里 \*/ RedissonRedLock redLock new RedissonRedLock(lock1, lock2, lock3); try { /\*\* \* waitTimeout 尝试获取锁的最大等待时间超过这个值则认为获取锁失败 \* leaseTime 锁的持有时间,超过这个时间锁会自动失效值应设置为大于业务处理的时间确保在锁有效期内业务能处理完 \*/ boolean res redLock.tryLock(10, 30, TimeUnit.SECONDS); if (res) { //成功获得锁在这里处理业务 } } catch (Exception e) { throw new RuntimeException(lock fail); } finally { //无论如何, 最后都要解锁 redLock.unlock(); } return end; }问题依然存在主从/集群架构分布式锁失效集群中从节点升级为主节点前未同步到主节点数据大促场景如何将分布式锁性能提升100倍减小锁的粒度、分段加锁如一个库存容量1000分成10段存入每段100份。 此时可以通过10个分布式锁操作10段数据。 代码层面维护段位库记录总标记可以循环进行锁数据当段数据不够一次扣减时通过逻辑进行额外处理。
redis-分布式锁
发布时间:2026/5/22 23:18:25
高并发场景秒杀抢购超卖BUG场景多个请求同时读取到剩余库存各自判断库存容量符合抢购条件导致减库存操作执行异常。示例代码public String deductStock() { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } return end; }当多个请求同时扣减库存时导致最后库存不准确。解决方式jvm级锁与分布式锁高并发场景下jvm级锁与分布式锁synchronized 在分布式架构中当请求分配到不同服务器上时每个服务器上的jvm锁都会认为自己获得了锁允许请求进入导致锁失效。 分布式锁把锁从**每个进程内部**移到**所有进程都能访问的外部独立系统**中让所有服务器竞争同一把锁。分布式锁简单版本命令SETNX key value如果key不存在将key的值设置为value存在则不做任何操作。代码如下public String deductStock() { String lockKey lock:product\_101; Boolean result stringRedisTemplate.opsForValue().setIfAbsent(lockKey, xxx); if (!result) return error\_code; try { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } } finally { stringRedisTemplate.delete(lockKey); } return end; } // try--catch此时作用-- 防止程序执行时出现异常导致锁未释放导致死锁。此时未设置超时时间。分布式锁设置超时时间添加代码 stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS); 问题加锁与设置超时时间代码在redis执行性无法保证一起顺序执行。redis就执行命令单线程的命令需要排队执行执行这两个命令时可能被其他请求执行命令进行加塞原子性问题 原子性单线程保证使用组合指令Boolean result stringRedisTemplate.opsForValue().setIfAbsent(lockKey, zhuge,10,TimeUnit.SECONDS);代码如下public String deductStock() { String lockKey lock:product\_101; Boolean result stringRedisTemplate.opsForValue().setIfAbsent(lockKey, xxx,10,TimeUnit.SECONDS); if (!result) return error\_code; try { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } } finally { stringRedisTemplate.delete(lockKey); } return end; }并发量大时依然存在锁失效问题假设3个线程 threadA,threadB,threadCthreadA获取锁后10s未执行完库存扣减逻辑此时锁已经失效threadB获取到锁开始执行5s,threadA同时继续执行5s且执行释放锁逻辑此时ThreadA释放threadB的锁。threadC获取到锁开始执行5s,threadB同时继续执行5s且执行释放锁逻辑此时ThreadB释放threadC的锁。导致锁一直是失效状态。在高并发场景线程执行顺序完全不可预料上述只是极端情况。问题根本点当前线程加的锁被其他线程释放 . 解决给每个线程分配唯一ID。不可以用threadId,因为每一台服务器可能存在相同的THreadId代码如下public String deductStock() { String lockKey lock:product\_101; String clientId UUID.randomUUID().toString(); //唯一id Boolean result stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId,10,TimeUnit.SECONDS); if (!result) return error\_code; try { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { // 通过唯一id区分是否是当前线程的锁 stringRedisTemplate.delete(lockKey); //删除锁 } } return end; }此时依然存在锁失效问题当ThreadA在执行区分是否是当前线程的锁逻辑后出现卡顿在这个卡顿时间中锁失效此时ThreadB获取到锁THreadA继续执行依旧可以释放ThreadB的锁。极端情况 区分是否是当前线程的锁与删除锁不是原子性操作。且无原生命令解决。根本点超时时间过期问题解决延长超期时间(治标不治本)锁续命主线程获取锁执行task任务时分线程定时判断主线程是否执行结束没有结束将key超时时间重新设置。redisson 分布式锁原理核心特点通过看门狗Watchdog自动续期机制解决了锁超时释放与业务执行时间不匹配的难题同时在加锁、解锁的各个环节都做了精密设计确保原子性与安全性如图线程1加锁成功后默认过期时间30s,redisson启动后台线程每隔10s检查线程1是否持有锁持有则延长线程1持有锁时间30s。 线程2获取所失败后堵塞并订阅频道在while循环中间歇性尝试加锁当线程1释放锁后会像订阅的频道中发布消息所有订阅该频道的线程都会被唤醒尝试获取锁。工作原理初始设置客户端加锁成功时默认设置锁的过期时间为30秒启动续期加锁成功后Redisson 会为该锁启动一个看门狗线程。定时检查该线程会每隔10秒即30秒 / 3执行一次。执行续期每次执行时它会检查锁是否仍被当前线程持有。如果是它就通过 Lua 脚本将锁的过期时间重置为30秒循环往复只要业务还在运行锁未被主动释放这个“检查-续期”的过程就会无限循环下去如何停止正常情况当业务代码执行完毕调用unlock()释放锁时Redisson 会同时取消这个看门狗定时任务异常情况如果持有锁的客户端宕机或网络中断看门狗线程也随之消亡无法进行续期。锁就会在最后一次续期后的30秒自动过期避免了死锁阻塞重试机制 不会消耗cpu资源当一个客户端尝试获取锁失败时锁已被他人持有Redisson 并不会立刻返回失败而是会进入一个高效的阻塞等待模式订阅释放事件获取锁失败的客户端会通过 Redis 的发布/订阅Pub/Sub功能订阅一个与锁名称相关的特定频道阻塞等待线程会在一个while(true)循环中等待并定期尝试重新获取锁收到唤醒信号当持有锁的客户端释放锁时它会向这个频道发布一条释放消息再次尝试所有订阅了该频道的客户端会收到通知从等待状态中被唤醒然后再次尝试获取锁锁的释放unlock()方法的实现也至关重要核心依然是 Lua 脚本确保操作的原子性。检查当前线程的锁是否存在若不存在说明锁已失效或未被持有抛出异常。若存在则将重入次数value减1如果减1后的值仍然大于0表示线程仍然持有锁重入只是减少计数并重新设置过期时间。如果减1后的值等于0表示锁需要彻底释放执行del命令删除锁的 Key并发布一条释放锁的消息以便通知唤醒其他阻塞等待的客户端。代码如下public String deductStock3() { String lockKey lock:product\_101; //获取锁对象 RLock redissonLock redisson.getLock(lockKey); //加分布式锁 redissonLock.lock(); // .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); try { int stock Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); // jedis.get(stock) if (stock 0) { int realStock stock - 1; stringRedisTemplate.opsForValue().set(stock, realStock ); // jedis.set(key,value) System.out.println(扣减成功剩余库存: \ realStock); } else { System.out.println(扣减失败库存不足); } } finally { //解锁 redissonLock.unlock(); } return end; }备注 tryLock(long waitTIme ,long leaseTimeTImeUnit unit); 尝试加锁时间 waitTIme加锁成功后锁过期时间 lease Timeredis主从/集群架构分布式锁失效问题当主节点 key 写入成功后准备同步从节点主节点宕机从节点成为主节点并没有此key其他线程也可以加锁成功针对同一个商品减库存。redLock 红锁超过半数redis节点加锁成功才算加锁成功也没有百分百解决代码如下public String redlock() { String lockKey product\_001; //这里需要自己实例化不同redis实例的redisson客户端连接这里只是伪代码用一个redisson客户端简化了 RLock lock1 redisson.getLock(lockKey); RLock lock2 redisson.getLock(lockKey); RLock lock3 redisson.getLock(lockKey); /\*\* \* 根据多个 RLock 对象构建 RedissonRedLock 最核心的差别就在这里 \*/ RedissonRedLock redLock new RedissonRedLock(lock1, lock2, lock3); try { /\*\* \* waitTimeout 尝试获取锁的最大等待时间超过这个值则认为获取锁失败 \* leaseTime 锁的持有时间,超过这个时间锁会自动失效值应设置为大于业务处理的时间确保在锁有效期内业务能处理完 \*/ boolean res redLock.tryLock(10, 30, TimeUnit.SECONDS); if (res) { //成功获得锁在这里处理业务 } } catch (Exception e) { throw new RuntimeException(lock fail); } finally { //无论如何, 最后都要解锁 redLock.unlock(); } return end; }问题依然存在主从/集群架构分布式锁失效集群中从节点升级为主节点前未同步到主节点数据大促场景如何将分布式锁性能提升100倍减小锁的粒度、分段加锁如一个库存容量1000分成10段存入每段100份。 此时可以通过10个分布式锁操作10段数据。 代码层面维护段位库记录总标记可以循环进行锁数据当段数据不够一次扣减时通过逻辑进行额外处理。