分布式锁实现方案:从 Redis 到 etcd 的一致性权衡与工程选型 分布式锁实现方案从 Redis 到 etcd 的一致性权衡与工程选型一、分布式场景下的并发失控当本地锁不再有效单体应用中用sync.Mutex或synchronized就能解决的并发问题在分布式环境下变得复杂——多个服务实例同时操作共享资源如库存扣减、账户转账、定时任务触发本地锁只能保护单进程内的并发跨进程的并发访问完全不受控。一个典型的故障场景电商秒杀活动中两个服务实例同时读取到库存为 1各自执行扣减最终库存变为 -1。这种超卖问题在分布式系统中极为常见根本原因是缺乏跨进程的互斥机制。分布式锁就是解决这个问题的标准方案——在同一时刻只允许一个服务实例持有锁并操作共享资源。但分布式锁的实现远比想象中复杂。网络分区、进程崩溃、时钟漂移等分布式系统的固有挑战使得加锁-操作-释放这个看似简单的流程在每个环节都可能出错。选择不同的实现方案Redis、etcd、ZooKeeper本质上是在一致性、性能和可用性之间做不同的权衡。二、分布式锁的实现原理与一致性模型flowchart TB subgraph Redis 方案: 最终一致性 R_CLIENT1[客户端1] -- |SET NX EX| R_LOCK[Redis: 锁Key] R_CLIENT2[客户端2] -- |SET NX EX| R_LOCK R_LOCK -- |竞争成功| R_OWNER1[客户端1获得锁] R_LOCK -- |竞争失败| R_WAIT2[客户端2重试] R_OWNER1 -- |EX 过期自动释放| R_RELEASE[锁释放] R_RELEASE -- R_OWNER2[客户端2获得锁] end subgraph etcd 方案: 强一致性 E_CLIENT1[客户端1] -- |TXN: CreateRevision0| E_LOCK[etcd: LeaseRevision] E_CLIENT2[客户端2] -- |TXN: CreateRevision0| E_LOCK E_LOCK -- |Raft 共识| E_OWNER1[客户端1获得锁] E_LOCK -- |Watch 前序Key| E_WAIT2[客户端2等待] E_OWNER1 -- |主动释放或Lease过期| E_DELETE[删除Key] E_DELETE -- E_NOTIFY[通知客户端2] E_NOTIFY -- E_OWNER2[客户端2获得锁] end subgraph 一致性对比 R_COMPARE[Redis: 主从异步复制br/故障切换可能丢失锁] -- R_RISK[风险: 双主同时持锁] E_COMPARE[etcd: Raft 强一致br/多数派确认后才生效] -- E_SAFE[安全: 不会双主持锁] end style R_RISK fill:#ffebee style E_SAFE fill:#e8f5e9Redis 分布式锁的核心是SET key value NX EX ttl命令——仅当 Key 不存在时设置并指定过期时间。这种方式简单高效但存在一个根本性问题Redis 的主从复制是异步的主节点加锁成功后尚未同步到从节点就宕机了从节点升为主节点后锁丢失另一个客户端可以再次加锁导致两个客户端同时持有锁。etcd 基于 Raft 共识算法所有写操作需要多数派确认后才生效。加锁操作通过事务TXN实现——检查 Key 的 CreateRevision 是否为 0不存在如果为 0 则创建。这种强一致性保证避免了 Redis 的双主持锁问题但代价是延迟更高——每次加锁都需要多数派节点确认。三、生产级分布式锁的实现3.1 Redis 分布式锁带自动续期# redis_distributed_lock.py — Redis 分布式锁实现 import time import uuid import threading from typing import Optional import redis class RedisDistributedLock: Redis 分布式锁支持自动续期和可重入 def __init__(self, redis_client: redis.Redis, lock_key: str, ttl_ms: int 10000, retry_count: int 3, retry_delay_ms: int 200): self._redis redis_client self._lock_key flock:{lock_key} # 每个锁实例有唯一标识防止误删其他客户端的锁 self._lock_value str(uuid.uuid4()) self._ttl_ms ttl_ms self._retry_count retry_count self._retry_delay_ms retry_delay_ms self._renewal_thread: Optional[threading.Thread] None self._stop_renewal threading.Event() self._reentrant_count 0 # 可重入计数 def acquire(self, timeout_ms: int 5000) - bool: 获取锁支持超时和重试 # 可重入检查 if self._reentrant_count 0: self._reentrant_count 1 return True deadline time.time() timeout_ms / 1000.0 attempts 0 while time.time() deadline and attempts self._retry_count: # SET NX EX 原子操作 acquired self._redis.set( self._lock_key, self._lock_value, nxTrue, # 仅当 Key 不存在时设置 pxself._ttl_ms, # 过期时间毫秒 ) if acquired: self._reentrant_count 1 # 启动自动续期线程 self._start_renewal() return True attempts 1 remaining deadline - time.time() if remaining 0: wait min( self._retry_delay_ms / 1000.0, remaining, ) time.sleep(wait) return False def release(self) - bool: 释放锁使用 Lua 脚本确保原子性 # 可重入处理 if self._reentrant_count 1: self._reentrant_count - 1 return True # 停止续期线程 self._stop_renewal.set() # Lua 脚本仅当锁值匹配时才删除 # 防止误删其他客户端加的锁 lua_script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end result self._redis.eval( lua_script, 1, self._lock_key, self._lock_value, ) self._reentrant_count 0 return result 1 def _start_renewal(self): 启动自动续期线程 self._stop_renewal.clear() def renewal_loop(): # 每隔 TTL/3 续期一次 interval self._ttl_ms / 3000.0 while not self._stop_renewal.wait(interval): try: # 仅当锁值匹配时才续期 lua_script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(pexpire, KEYS[1], ARGV[2]) else return 0 end self._redis.eval( lua_script, 1, self._lock_key, self._lock_value, self._ttl_ms, ) except Exception: # 续期失败锁可能已被其他客户端获取 break self._renewal_thread threading.Thread( targetrenewal_loop, daemonTrue, ) self._renewal_thread.start() def __enter__(self): if not self.acquire(): raise TimeoutError(f获取锁 {self._lock_key} 超时) return self def __exit__(self, exc_type, exc_val, exc_tb): self.release() return False3.2 etcd 分布式锁强一致性# etcd_distributed_lock.py — etcd 分布式锁实现 import time import grpc from typing import Optional import etcd3 class EtcdDistributedLock: etcd 分布式锁基于 Raft 共识的强一致性保证 def __init__(self, etcd_client: etcd3.Etcd3Client, lock_name: str, ttl_seconds: int 10): self._client etcd_client self._lock_name lock_name self._ttl_seconds ttl_seconds self._lock: Optional[etcd3.Lock] None def acquire(self, timeout_seconds: int 5) - bool: 获取锁基于 etcd 事务的原子操作 self._lock self._client.lock( self._lock_name, ttlself._ttl_seconds, ) deadline time.time() timeout_seconds while time.time() deadline: try: # etcd Lock 内部通过 TXN 实现 # 1. 检查 Key 的 CreateRevision 是否为 0 # 2. 如果为 0创建 Key 并绑定 Lease # 3. 如果不为 0Watch 前序 Key 等待 acquired self._lock.acquire() if acquired: return True except grpc.RpcError as e: # 网络异常时重试 if e.code() grpc.StatusCode.UNAVAILABLE: time.sleep(0.2) continue raise time.sleep(0.1) return False def release(self) - bool: 释放锁删除 Key 并通知等待者 if self._lock is None: return False try: # 删除 Key 后etcd 会通过 Watch 通知下一个等待者 return self._lock.release() except Exception: return False def __enter__(self): if not self.acquire(): raise TimeoutError( f获取锁 {self._lock_name} 超时 ) return self def __exit__(self, exc_type, exc_val, exc_tb): self.release() return False四、方案选型的权衡矩阵一致性 vs 性能Redis 单次加锁操作的延迟在 1ms 以内etcd 由于需要 Raft 共识延迟在 10-30ms。对于高并发场景QPS 10000Redis 的性能优势明显对于一致性要求高的场景如金融交易etcd 的强一致性是必要保障。故障恢复Redis 主节点宕机后从节点升主期间通常 10-30 秒锁信息可能丢失。etcd 基于 Raft 的多数派机制少数节点故障不影响锁服务可用性且不会丢失锁信息。如果业务不能容忍双主持锁的风险必须选择 etcd。运维复杂度Redis 部署简单大多数团队已有 Redis 基础设施。etcd 部署和运维相对复杂需要至少 3 个节点组成集群对运维团队有更高要求。如果团队没有 etcd 运维经验引入 etcd 的成本需要纳入考量。适用场景总结库存扣减、限流等短暂不一致可容忍的场景选 Redis账户转账、分布式事务等不一致不可容忍的场景选 etcd定时任务去重等偶尔重复可容忍的场景选 Redis 数据库唯一约束兜底。五、总结分布式锁的选型本质是一致性与性能的权衡。Redis 方案性能高但存在双主持锁风险适合一致性要求不极致的场景etcd 方案强一致但延迟更高适合金融级一致性要求的场景。生产环境中Redis 锁必须实现自动续期防止业务未完成锁就过期和 Lua 原子释放防止误删其他客户端的锁。建议从 Redis 方案起步在遇到一致性问题时再迁移到 etcd而非一开始就追求最强一致性。