分布式系统架构分布式锁与并发控制的设计模式一、单机锁的失效分布式环境下的并发困境在单机应用中使用sync.Mutex或synchronized就能解决并发问题。但当服务部署到多个节点时单机锁只能保护本进程内的资源无法阻止其他节点同时操作共享资源。典型的场景包括防止重复下单、库存扣减的超卖、定时任务的重复执行。分布式锁看似简单——在 Redis 里设一个 key 就行但生产级的分布式锁远比想象中复杂。锁的获取、续期、释放、异常恢复每一个环节都有边界条件需要处理。更关键的是不同业务场景对锁的要求不同——有的要求强一致性有的允许偶尔失效选错方案会导致系统级故障。flowchart TB subgraph 单机锁失效场景 N1[节点A] --|本地锁保护| DB[(数据库)] N2[节点B] --|本地锁保护| DB N3[节点C] --|本地锁保护| DB Note1[三个节点的本地锁互不感知br/无法防止并发冲突] -.- DB end subgraph 分布式锁方案 N4[节点A] --|获取锁| Redis[(Redisbr/分布式锁)] N5[节点B] --|获取锁| Redis N6[节点C] --|获取锁| Redis Redis --|仅一个节点获得锁| DB2[(数据库)] end二、分布式锁的核心机制2.1 锁的四个基本操作分布式锁需要四个基本操作获取Acquire、续期Renew、释放Release和强制释放Force Release。获取操作需要保证原子性——检查 key 是否存在和设置 key 必须在同一个命令中完成。续期操作用于长时间任务防止锁因超时而被其他节点抢占。释放操作必须验证持有者身份防止误删其他节点的锁。2.2 Redlock 算法与单节点锁的取舍Redis 官方推荐的 Redlock 算法在多个独立 Redis 实例上获取锁只有大多数实例获取成功才算锁获取成功。这种方案提供了更强的安全性保证但延迟更高需要与多个实例通信且在时钟漂移场景下仍有极小概率失效。对于大多数业务场景单节点 Redis 锁 合理的超时时间已经足够。sequenceDiagram participant Client as 客户端 participant Redis as Redis单节点 Client-Redis: SET lock_key unique_value NX PX 30000 Note over Client,Redis: NX不存在时才设置br/PX过期时间30秒 Redis--Client: OK获取锁成功 Note over Client: 执行业务逻辑耗时较长 Client-Redis: EXPIRE lock_key 30000 Note over Client,Redis: 续期重置过期时间 Client-Redis: GET lock_key Redis--Client: unique_value确认是自己的锁 Client-Redis: DEL lock_key Note over Client,Redis: 释放先验证再删除三、生产级代码实现3.1 Redis 分布式锁import time import uuid import logging import asyncio from typing import Optional logger logging.getLogger(__name__) class DistributedLock: 基于 Redis 的分布式锁 设计考量 - 使用 SET NX PX 保证获取操作的原子性 - 每个锁持有者使用唯一标识防止误删 - 内置续期守护线程防止长任务锁超时 - 释放时使用 Lua 脚本保证检查删除的原子性 # Lua 脚本原子性地检查锁持有者并删除 RELEASE_SCRIPT if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end # Lua 脚本原子性地检查锁持有者并续期 RENEW_SCRIPT if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(pexpire, KEYS[1], ARGV[2]) else return 0 end def __init__( self, redis_client, lock_key: str, timeout_ms: int 30000, retry_count: int 3, retry_delay_ms: int 200, ): self.redis redis_client self.lock_key lock_key self.timeout_ms timeout_ms self.retry_count retry_count self.retry_delay_ms retry_delay_ms self.lock_value str(uuid.uuid4()) self._renew_task: Optional[asyncio.Task] None async def acquire(self) - bool: 获取分布式锁支持重试 for attempt in range(self.retry_count): result await self.redis.set( self.lock_key, self.lock_value, nxTrue, # 仅在 key 不存在时设置 pxself.timeout_ms, # 过期时间毫秒 ) if result: logger.info(f获取锁成功: key{self.lock_key}, value{self.lock_value}) # 启动续期守护 self._start_renew_daemon() return True if attempt self.retry_count - 1: jitter int(self.retry_delay_ms * (0.5 uuid.uuid4().int % 1000 / 2000)) await asyncio.sleep(jitter / 1000) logger.warning(f获取锁失败: key{self.lock_key}, 尝试 {self.retry_count} 次) return False async def release(self) - bool: 释放分布式锁使用 Lua 脚本保证原子性 # 停止续期守护 self._stop_renew_daemon() result await self.redis.eval( self.RELEASE_SCRIPT, 1, # KEYS 数量 self.lock_key, # KEYS[1] self.lock_value, # ARGV[1] ) if result: logger.info(f释放锁成功: key{self.lock_key}) else: logger.warning(f释放锁失败锁已过期或被其他持有者获取: key{self.lock_key}) return bool(result) def _start_renew_daemon(self) - None: 启动续期守护协程在锁过期前自动续期 self._renew_task asyncio.create_task(self._renew_loop()) def _stop_renew_daemon(self) - None: 停止续期守护 if self._renew_task: self._renew_task.cancel() self._renew_task None async def _renew_loop(self) - None: 续期循环每过 timeout_ms/3 续期一次 interval self.timeout_ms / 3000 # 转换为秒取 1/3 try: while True: await asyncio.sleep(interval) result await self.redis.eval( self.RENEW_SCRIPT, 1, self.lock_key, self.lock_value, self.timeout_ms, ) if not result: logger.error(f续期失败: key{self.lock_key}锁可能已被其他节点获取) break logger.debug(f续期成功: key{self.lock_key}) except asyncio.CancelledError: pass # 正常取消不需要处理 async def __aenter__(self): if not await self.acquire(): raise RuntimeError(f无法获取分布式锁: {self.lock_key}) return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.release() return False3.2 业务使用示例async def deduct_inventory(product_id: str, quantity: int) - bool: 库存扣减使用分布式锁防止超卖 lock_key finventory:lock:{product_id} async with DistributedLock(redis_client, lock_key, timeout_ms10000) as lock: # 在锁保护下读取当前库存 current await redis_client.get(finventory:{product_id}) if current is None: return False current_qty int(current) if current_qty quantity: return False # 扣减库存 new_qty current_qty - quantity await redis_client.set(finventory:{product_id}, new_qty) return True四、边界分析与架构权衡4.1 Redis 锁的安全性边界Redis 分布式锁不是绝对安全的。在 Redis 主从切换时可能出现两个客户端同时持有锁的情况客户端 A 从主节点获取锁后主节点宕机锁数据尚未同步到从节点从节点升为主节点后客户端 B 也能获取同一把锁。如果业务要求绝对安全应使用 Redlock 算法或基于 etcd/ZooKeeper 的共识锁。4.2 锁超时与任务时长的矛盾锁超时时间设置过短长任务未完成锁就过期了设置过长持有者宕机后其他节点等待时间太久。续期守护机制缓解了这个问题但引入了新的风险——如果续期守护线程本身卡住如 GC 停顿锁仍可能过期。对于关键业务应在业务层做幂等校验不完全依赖锁的正确性。4.3 锁粒度的权衡粗粒度锁如按商品 ID 加锁实现简单但并发度低——所有对同一商品的请求串行执行。细粒度锁如按 SKU 仓库加锁并发度高但锁管理复杂且容易出现死锁。选择粒度的原则是锁的范围应恰好覆盖需要保护的资源不扩大也不缩小。五、总结分布式锁是分布式并发控制的基础设施但其安全性有边界。Redis 单节点锁适合大多数业务场景实现简单、性能优秀Redlock 和共识锁提供更强的安全保证但代价是更高的延迟和更复杂的运维。无论选择哪种方案都应在业务层做幂等校验不完全依赖锁的正确性。落地路线建议第一步识别系统中需要分布式保护的共享资源评估并发冲突的风险等级第二步对高风险资源如库存、余额接入 Redis 分布式锁第三步为长任务添加续期机制为关键操作添加幂等校验第四步对安全性要求极高的场景评估是否需要迁移到 Redlock 或 etcd 锁。
分布式系统架构:分布式锁与并发控制的设计模式
发布时间:2026/6/12 1:55:07
分布式系统架构分布式锁与并发控制的设计模式一、单机锁的失效分布式环境下的并发困境在单机应用中使用sync.Mutex或synchronized就能解决并发问题。但当服务部署到多个节点时单机锁只能保护本进程内的资源无法阻止其他节点同时操作共享资源。典型的场景包括防止重复下单、库存扣减的超卖、定时任务的重复执行。分布式锁看似简单——在 Redis 里设一个 key 就行但生产级的分布式锁远比想象中复杂。锁的获取、续期、释放、异常恢复每一个环节都有边界条件需要处理。更关键的是不同业务场景对锁的要求不同——有的要求强一致性有的允许偶尔失效选错方案会导致系统级故障。flowchart TB subgraph 单机锁失效场景 N1[节点A] --|本地锁保护| DB[(数据库)] N2[节点B] --|本地锁保护| DB N3[节点C] --|本地锁保护| DB Note1[三个节点的本地锁互不感知br/无法防止并发冲突] -.- DB end subgraph 分布式锁方案 N4[节点A] --|获取锁| Redis[(Redisbr/分布式锁)] N5[节点B] --|获取锁| Redis N6[节点C] --|获取锁| Redis Redis --|仅一个节点获得锁| DB2[(数据库)] end二、分布式锁的核心机制2.1 锁的四个基本操作分布式锁需要四个基本操作获取Acquire、续期Renew、释放Release和强制释放Force Release。获取操作需要保证原子性——检查 key 是否存在和设置 key 必须在同一个命令中完成。续期操作用于长时间任务防止锁因超时而被其他节点抢占。释放操作必须验证持有者身份防止误删其他节点的锁。2.2 Redlock 算法与单节点锁的取舍Redis 官方推荐的 Redlock 算法在多个独立 Redis 实例上获取锁只有大多数实例获取成功才算锁获取成功。这种方案提供了更强的安全性保证但延迟更高需要与多个实例通信且在时钟漂移场景下仍有极小概率失效。对于大多数业务场景单节点 Redis 锁 合理的超时时间已经足够。sequenceDiagram participant Client as 客户端 participant Redis as Redis单节点 Client-Redis: SET lock_key unique_value NX PX 30000 Note over Client,Redis: NX不存在时才设置br/PX过期时间30秒 Redis--Client: OK获取锁成功 Note over Client: 执行业务逻辑耗时较长 Client-Redis: EXPIRE lock_key 30000 Note over Client,Redis: 续期重置过期时间 Client-Redis: GET lock_key Redis--Client: unique_value确认是自己的锁 Client-Redis: DEL lock_key Note over Client,Redis: 释放先验证再删除三、生产级代码实现3.1 Redis 分布式锁import time import uuid import logging import asyncio from typing import Optional logger logging.getLogger(__name__) class DistributedLock: 基于 Redis 的分布式锁 设计考量 - 使用 SET NX PX 保证获取操作的原子性 - 每个锁持有者使用唯一标识防止误删 - 内置续期守护线程防止长任务锁超时 - 释放时使用 Lua 脚本保证检查删除的原子性 # Lua 脚本原子性地检查锁持有者并删除 RELEASE_SCRIPT if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end # Lua 脚本原子性地检查锁持有者并续期 RENEW_SCRIPT if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(pexpire, KEYS[1], ARGV[2]) else return 0 end def __init__( self, redis_client, lock_key: str, timeout_ms: int 30000, retry_count: int 3, retry_delay_ms: int 200, ): self.redis redis_client self.lock_key lock_key self.timeout_ms timeout_ms self.retry_count retry_count self.retry_delay_ms retry_delay_ms self.lock_value str(uuid.uuid4()) self._renew_task: Optional[asyncio.Task] None async def acquire(self) - bool: 获取分布式锁支持重试 for attempt in range(self.retry_count): result await self.redis.set( self.lock_key, self.lock_value, nxTrue, # 仅在 key 不存在时设置 pxself.timeout_ms, # 过期时间毫秒 ) if result: logger.info(f获取锁成功: key{self.lock_key}, value{self.lock_value}) # 启动续期守护 self._start_renew_daemon() return True if attempt self.retry_count - 1: jitter int(self.retry_delay_ms * (0.5 uuid.uuid4().int % 1000 / 2000)) await asyncio.sleep(jitter / 1000) logger.warning(f获取锁失败: key{self.lock_key}, 尝试 {self.retry_count} 次) return False async def release(self) - bool: 释放分布式锁使用 Lua 脚本保证原子性 # 停止续期守护 self._stop_renew_daemon() result await self.redis.eval( self.RELEASE_SCRIPT, 1, # KEYS 数量 self.lock_key, # KEYS[1] self.lock_value, # ARGV[1] ) if result: logger.info(f释放锁成功: key{self.lock_key}) else: logger.warning(f释放锁失败锁已过期或被其他持有者获取: key{self.lock_key}) return bool(result) def _start_renew_daemon(self) - None: 启动续期守护协程在锁过期前自动续期 self._renew_task asyncio.create_task(self._renew_loop()) def _stop_renew_daemon(self) - None: 停止续期守护 if self._renew_task: self._renew_task.cancel() self._renew_task None async def _renew_loop(self) - None: 续期循环每过 timeout_ms/3 续期一次 interval self.timeout_ms / 3000 # 转换为秒取 1/3 try: while True: await asyncio.sleep(interval) result await self.redis.eval( self.RENEW_SCRIPT, 1, self.lock_key, self.lock_value, self.timeout_ms, ) if not result: logger.error(f续期失败: key{self.lock_key}锁可能已被其他节点获取) break logger.debug(f续期成功: key{self.lock_key}) except asyncio.CancelledError: pass # 正常取消不需要处理 async def __aenter__(self): if not await self.acquire(): raise RuntimeError(f无法获取分布式锁: {self.lock_key}) return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.release() return False3.2 业务使用示例async def deduct_inventory(product_id: str, quantity: int) - bool: 库存扣减使用分布式锁防止超卖 lock_key finventory:lock:{product_id} async with DistributedLock(redis_client, lock_key, timeout_ms10000) as lock: # 在锁保护下读取当前库存 current await redis_client.get(finventory:{product_id}) if current is None: return False current_qty int(current) if current_qty quantity: return False # 扣减库存 new_qty current_qty - quantity await redis_client.set(finventory:{product_id}, new_qty) return True四、边界分析与架构权衡4.1 Redis 锁的安全性边界Redis 分布式锁不是绝对安全的。在 Redis 主从切换时可能出现两个客户端同时持有锁的情况客户端 A 从主节点获取锁后主节点宕机锁数据尚未同步到从节点从节点升为主节点后客户端 B 也能获取同一把锁。如果业务要求绝对安全应使用 Redlock 算法或基于 etcd/ZooKeeper 的共识锁。4.2 锁超时与任务时长的矛盾锁超时时间设置过短长任务未完成锁就过期了设置过长持有者宕机后其他节点等待时间太久。续期守护机制缓解了这个问题但引入了新的风险——如果续期守护线程本身卡住如 GC 停顿锁仍可能过期。对于关键业务应在业务层做幂等校验不完全依赖锁的正确性。4.3 锁粒度的权衡粗粒度锁如按商品 ID 加锁实现简单但并发度低——所有对同一商品的请求串行执行。细粒度锁如按 SKU 仓库加锁并发度高但锁管理复杂且容易出现死锁。选择粒度的原则是锁的范围应恰好覆盖需要保护的资源不扩大也不缩小。五、总结分布式锁是分布式并发控制的基础设施但其安全性有边界。Redis 单节点锁适合大多数业务场景实现简单、性能优秀Redlock 和共识锁提供更强的安全保证但代价是更高的延迟和更复杂的运维。无论选择哪种方案都应在业务层做幂等校验不完全依赖锁的正确性。落地路线建议第一步识别系统中需要分布式保护的共享资源评估并发冲突的风险等级第二步对高风险资源如库存、余额接入 Redis 分布式锁第三步为长任务添加续期机制为关键操作添加幂等校验第四步对安全性要求极高的场景评估是否需要迁移到 Redlock 或 etcd 锁。