Python缓存优化策略 Python 缓存优化策略 多级缓存策略从内存到 Redis 的性能优化 缓存是解决性能瓶颈的最有效手段之一。通过将昂贵计算或 IO 的结果临时存储避免重复计算或重复读取大幅提升响应速度。import timeimport functoolsimport pickleimport osimport hashlib# 1. 内存缓存functools.lru_cache functools.lru_cache(maxsize128)def expensive_computation(n: int) - int:被 lru_cache 装饰的函数会缓存计算结果。maxsize128 表示最多缓存 128 个不同参数的结果。LRU 策略会自动淘汰最久未使用的缓存项。print(f [计算] fibonacci({n}), end)time.sleep(0.05) # 模拟耗时操作if n 1:return nresult expensive_computation(n - 1) expensive_computation(n - 2)print(f {result})return resultdef demo_lru_cache():演示 lru_cache 的效果print( lru_cache 缓存 )# 第一次调用实际计算start time.perf_counter()r1 expensive_computation(35)t1 time.perf_counter() - startprint(f 第一次调用: {r1}, 耗时 {t1:.4f}s)# 第二次调用命中缓存start time.perf_counter()r2 expensive_computation(35)t2 time.perf_counter() - startprint(f 第二次调用: {r2}, 耗时 {t2:.4f}s)# 查看缓存信息info expensive_computation.cache_info()print(f 缓存统计: hits{info.hits}, misses{info.misses}, ,fcurrsize{info.currsize})# 2. TTL 缓存带过期时间的缓存 class TTLCache:带 TTL (Time-To-Live) 的本地缓存实现。缓存项在超过指定时间后自动失效。def __init__(self, ttl_seconds: int 60):self.cache {} # 存储 Key - (value, expiry)self.ttl ttl_secondsdef get(self, key: str):获取缓存值如果过期或不存在返回 Noneif key not in self.cache:return Nonevalue, expiry self.cache[key]if time.time() expiry:del self.cache[key] # 惰性删除过期项return Nonereturn valuedef set(self, key: str, value):设置缓存值并记录过期时间self.cache[key] (value, time.time() self.ttl)def clear_expired(self):主动清理所有过期项now time.time()expired [k for k, (_, exp) in self.cache.items() if now exp]for k in expired:del self.cache[k]return len(expired)propertydef size(self) - int:return len(self.cache)def demo_ttl_cache():演示 TTL 缓存print( TTL 缓存 )cache TTLCache(ttl_seconds2)# 存入缓存cache.set(user_42, {name: Alice, points: 100})print(f 设置缓存后的 size: {cache.size})# 立即读取未过期result cache.get(user_42)print(f 立即读取: {result})# 等待过期time.sleep(3)# 过期后读取result cache.get(user_42)print(f 过期后读取: {result})# 主动清理cache.set(key1, value1)cache.set(key2, value2)time.sleep(2)cleaned cache.clear_expired()print(f 清理了 {cleaned} 个过期项剩余: {cache.size})# 3. 磁盘缓存将结果持久化到文件 class DiskCache:将计算结果缓存到磁盘。适合计算非常昂贵且结果可以跨进程共享的场景。def __init__(self, cache_dir: str _disk_cache):self.cache_dir cache_diros.makedirs(self.cache_dir, exist_okTrue)def _key_to_path(self, key: str) - str:将缓存 Key 映射到文件路径hashed hashlib.md5(key.encode()).hexdigest()return os.path.join(self.cache_dir, f{hashed}.cache)def get(self, key: str):从磁盘缓存读取path self._key_to_path(key)if not os.path.exists(path):return Nonewith open(path, rb) as f:return pickle.load(f)def set(self, key: str, value):写入磁盘缓存path self._key_to_path(key)with open(path, wb) as f:pickle.dump(value, f)def clear(self):清除所有缓存文件for fname in os.listdir(self.cache_dir):os.remove(os.path.join(self.cache_dir, fname))print(f 已清除 {self.cache_dir} 中的所有缓存)def expensive_data_processing(n: int) - list:模拟一个非常耗时的数据处理过程time.sleep(0.5)return [i ** 2 for i in range(n)]def demo_disk_cache():演示磁盘缓存的效果print( 磁盘缓存 )cache DiskCache()cache_key data_10000# 第一次实际计算start time.perf_counter()result cache.get(cache_key)if result is None:print( [缓存未命中] 开始计算...)result expensive_data_processing(10000)cache.set(cache_key, result)t time.perf_counter() - startprint(f 耗时: {t:.4f}s, 结果长度: {len(result)})# 第二次从缓存读取start time.perf_counter()result cache.get(cache_key)if result is not None:print( [缓存命中] 从磁盘加载)t time.perf_counter() - startprint(f 耗时: {t:.4f}s, 结果长度: {len(result)})# 清理cache.clear()# 4. Redis 缓存分布式缓存 def demo_redis_cache():使用 Redis 作为分布式缓存后端。Redis 支持 TTL、持久化、高可用等特性。运行前需要先启动 Redis 服务器。print( Redis 缓存 )try:import redis# 连接到 Redisclient redis.Redis(hostlocalhost,port6379,decode_responsesTrue, # 自动解码为字符串socket_timeout2)# 测试连接client.ping()print( [OK] 已连接到 Redis 服务器)# 设置带 TTL 的缓存client.setex(user:1001, 60, {name: Bob, role: admin})print( 设置缓存 (TTL60s))# 读取缓存data client.get(user:1001)print(f 读取缓存: {data})# 批量操作pipeline client.pipeline()pipeline.set(key:1, value1)pipeline.set(key:2, value2)pipeline.expire(key:1, 30)pipeline.execute()print( 批量设置完成)# 缓存统计info client.info(keyspace)print(f Redis 数据库统计: {info})client.close()except redis.exceptions.ConnectionError:print( [跳过] Redis 服务器未启动)print( [提示] 启动: redis-server)except ImportError:print( [跳过] redis 库未安装请: pip install redis)# 5. 写穿透缓存 (Write-Through Cache) class WriteThroughCache:写穿透缓存每次写操作同时更新缓存和后端存储。保证缓存和数据源的一致性。def __init__(self, backend_store: dict):self.cache {} # 内存缓存self.store backend_store # 后端存储def get(self, key: str):从缓存读取未命中则从后端加载if key in self.cache:print(f [缓存命中] {key})return self.cache[key]# 从后端加载到缓存if key in self.store:print(f [缓存未命中][从后端加载] {key})self.cache[key] self.store[key]return self.store[key]return Nonedef set(self, key: str, value):同时写入缓存和后端写穿透print(f [写穿透] {key} {value})self.cache[key] value # 更新缓存self.store[key] value # 更新后端def delete(self, key: str):同时从缓存和后端删除self.cache.pop(key, None)self.store.pop(key, None)def demo_write_through():演示写穿透缓存print( 写穿透缓存 )backend {x: 10, y: 20}cache WriteThroughCache(backend)print(f 初始后端数据: {backend})print(f 初始缓存数据: {cache.cache})# 读取缓存未命中val cache.get(x)print(f 读取 x {val})print(f 缓存现在包含: {cache.cache})# 再次读取缓存命中val cache.get(x)print(f 再次读取 x {val})# 写操作同时更新缓存和后端cache.set(z, 30)print(f 写入后缓存: {cache.cache})print(f 写入后后端: {backend})# 6. dogpile.cache 高级缓存 def demo_dogpile_cache():dogpile.cache 是一个高级缓存库支持多种后端。提供防击穿机制防止高并发时大量请求同时重建缓存。print( dogpile.cache 示例 )try:from dogpile.cache import make_regionfrom dogpile.cache.api import NO_VALUE# 创建缓存区域region make_region().configure(dogpile.cache.memory,expiration_time5, # 5 秒过期)# 使用装饰器region.cache_on_arguments()def get_user_name(user_id: int) - str:模拟从数据库加载用户名print(f [DB 查询] 加载 user {user_id}...)time.sleep(0.2)return fUser_{user_id}print( 第一次调用 (缓存未命中):)start time.perf_counter()name1 get_user_name(42)t time.perf_counter() - startprint(f 结果: {name1}, 耗时: {t:.4f}s)print( 第二次调用 (缓存命中):)start time.perf_counter()name2 get_user_name(42)t time.perf_counter() - startprint(f 结果: {name2}, 耗时: {t:.4f}s)except ImportError:print( [跳过] dogpile.cache 未安装请: pip install dogpile.cache)# 主入口 if __name__ __main__:print( * 50)print(Python 缓存优化策略)print( * 50)demo_lru_cache()print()demo_ttl_cache()print()demo_disk_cache()print()demo_redis_cache()print()demo_write_through()print()demo_dogpile_cache()# 清理磁盘缓存目录import shutilif os.path.exists(_disk_cache):shutil.rmtree(_disk_cache)