IT策士 10余年一线大厂经验专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章助你少走弯路。在前五篇文章里我们一直在用redis-cli和命令行探索 Redis 的五大数据结构外加三个高级类型。你可能会问在 Python 项目里怎么操作 Redis怎么管理连接怎么提升性能从本文开始我们将全面切换到Python 视角用redis-py这个官方推荐的客户端库来武装我们的 Python 应用。本文将手把手教你安装、建立连接、操作五大基础数据结构并深入讲解连接池和 Pipeline 两大工程化利器。读完你会发现用 Python 调用 Redis 原来可以这么简洁优雅。1. 环境准备安装 redis-pyredis-py是 Redis 官方推荐的 Python 客户端纯 Python 实现API 简洁且紧跟 Redis 最新命令。安装只需一条命令预期输出Collecting redis Downloading redis-5.0.0-py3-none-any.whl(252kB)... 省略... Installing collected packages: redis Successfully installed redis-5.0.0⚠️ 本文以redis-py5.x 版本为例部分 API 可能与 4.x 略有差异。建议使用 4.0 及以上版本以获取完整类型提示和更好的异步支持。检查版本pip show redis。2. 创建连接从简单到规范2.1 最简单的连接importredis rredis.Redis(hostlocalhost,port6379,decode_responsesTrue)# Ping 测试print(r.ping())# True参数解释hostRedis 服务器地址默认localhostport端口默认6379decode_responsesTrue强烈建议开启让返回的数据自动解码为 Python 字符串而不是 bytes。否则你得到的是bvalue需要自己.decode()2.2 带密码和 db 的连接rredis.Redis(host192.168.1.100,port6379,passwordyour_password,db1,decode_responsesTrue)db参数指定数据库编号Redis 默认有 0~15 共 16 个数据库可通过配置修改。2.3 使用连接 URL更现代redis-py支持从 URL 字符串创建连接适合从配置文件读取# 格式: redis://[:password]host:port/dbrredis.from_url(redis://:mypasswordlocalhost:6379/0,decode_responsesTrue)print(r.ping())最佳实践永远不要在代码中硬编码密码使用环境变量redis.from_url(os.getenv(REDIS_URL))3. 五大基础数据类型 API 速查redis-py的 API 与redis-cli命令几乎一一对应方法名小写参数顺序一致。下面我们快速过一遍每种数据类型的 Python 调用方式并附上输出。3.1 String 操作importredis rredis.Redis(decode_responsesTrue)# SET / GET / MSET / MGETr.set(name,Alice)print(r.get(name))# Alicer.mset({age:30,city:Beijing})print(r.mget([age,city]))# [30, Beijing]# SET 可选参数ex, nx, px 等r.set(token,abc,ex10)# 10秒过期r.set(lock,1,nxTrue,ex30)# 仅在不存在时设置# 数字操作r.set(counter,0)r.incr(counter)r.incrby(counter,5)print(r.get(counter))# 6# 字符串操作r.set(greet,Hello)r.append(greet, Redis)print(r.get(greet))# Hello Redisprint(r.strlen(greet))# 11print(r.getrange(greet,0,4))# Hello3.2 Hash 操作# HSET / HGET / HMSET (已废弃直接用 hset)r.hset(user:1001,mapping{name:Bob,age:25,city:Shanghai})print(r.hgetall(user:1001))# {name: Bob, age: 25, city: Shanghai}# 获取单个字段print(r.hget(user:1001,name))# Bob# 批量获取print(r.hmget(user:1001,[name,age]))# [Bob, 25]# 自增r.hincrby(user:1001,age,1)print(r.hget(user:1001,age))# 26# 字段是否存在/删除/长度print(r.hexists(user:1001,city))# Truer.hdel(user:1001,city)print(r.hlen(user:1001))# 23.3 List 操作# 左右插入r.lpush(queue,task1,task2)# task2, task1r.rpush(queue,task3)# task2, task1, task3# 范围查看print(r.lrange(queue,0, -1))# [task2, task1, task3]# 弹出print(r.lpop(queue))# task2print(r.rpop(queue))# task3# 阻塞弹出# r.brpop(queue, timeout5) # 返回 (queue_name, value) 或 None# 长度与索引print(r.llen(queue))# 1print(r.lindex(queue,0))# task1# 修剪r.lpush(log, *[a,b,c,d,e])r.ltrim(log,0,2)print(r.lrange(log,0, -1))# [e, d, c]3.4 Set 操作# 添加 / 查看 / 删除r.sadd(tags:python,redis,django,flask)print(r.smembers(tags:python))# {flask, django, redis} 无序print(r.sismember(tags:python,django))# Trueprint(r.scard(tags:python))# 3r.srem(tags:python,flask)# 集合运算r.sadd(tags:web,django,fastapi)print(r.sinter(tags:python,tags:web))# {django}print(r.sunion(tags:python,tags:web))# {redis, django, fastapi}print(r.sdiff(tags:python,tags:web))# {redis}# 随机弹出print(r.spop(tags:python))# 随机一个元素3.5 Sorted Set 操作# 添加 / 查看r.zadd(leaderboard,{Alice:100,Bob:85,Charlie:92})print(r.zrange(leaderboard,0, -1,withscoresTrue))# [(Bob, 85.0), (Charlie, 92.0), (Alice, 100.0)]# 反向获取 Top Nprint(r.zrevrange(leaderboard,0,1,withscoresTrue))# [(Alice, 100.0), (Charlie, 92.0)]# 增减分数r.zincrby(leaderboard,10,Bob)print(r.zscore(leaderboard,Bob))# 95.0# 排名print(r.zrank(leaderboard,Bob))# 0 (从低到高)print(r.zrevrank(leaderboard,Bob))# 2 (从高到低)# 按分数范围查询print(r.zrangebyscore(leaderboard,90,100))# [Charlie, Alice]动手试试把上面的代码逐段复制到你的python交互环境或脚本运行对比输出是否与预期一致。试着修改几个参数比如给 String 加ex5然后用ttl观察倒计时。4. 连接池管理告别频繁握手4.1 为什么要用连接池Redis 是基于 TCP 的每次创建新连接都需要三次握手、认证等开销。如果你的应用在高并发场景下每次请求都redis.Redis()新建连接不仅浪费性能还可能耗尽 Redis 的maxclients限制。连接池ConnectionPool就是提前创建好一定数量的连接放在池子里用时借、用完还连接不断开。4.2 创建连接池importredis# 显式创建连接池poolredis.ConnectionPool(hostlocalhost,port6379,decode_responsesTrue,max_connections20,# 最大连接数# 更多参数如 password, db 等)# 多个 Redis 实例共享一个连接池r1redis.Redis(connection_poolpool)r2redis.Redis(connection_poolpool)# 验证共享print(r1.ping())# Trueprint(r2.ping())# True# 使用完可以断开所有连接应用退出时pool.disconnect()max_connections设定了池中最多保持多少个连接。当并发请求超过此数时获取连接的请求会阻塞直到有连接归还。4.3 生产环境配置建议importredisimportos def get_redis_client():生产级 Redis 客户端工厂函数 poolredis.ConnectionPool(hostos.getenv(REDIS_HOST,localhost),portint(os.getenv(REDIS_PORT,6379)),passwordos.getenv(REDIS_PASSWORD, None),dbint(os.getenv(REDIS_DB,0)),decode_responsesTrue,max_connectionsint(os.getenv(REDIS_MAX_CONNS,50)),socket_timeout5,# 连接超时socket_connect_timeout5,# 连接建立超时health_check_interval30,# 健康检查间隔秒)returnredis.Redis(connection_poolpool)clientget_redis_client()⚠️ 常见误区在多线程环境中每个线程反复创建redis.Redis()而不共享连接池。正确做法是全局创建一个连接池所有线程共享同一个redis.Redis实例线程安全。在多进程如 gunicorn中每个 worker 进程应创建自己的连接池。5. Pipeline批量命令的加速器5.1 为什么需要 PipelineRedis 的命令执行是 C/S 模式每一次命令都要经历客户端发送命令 → 网络传输 → 服务端处理 → 网络返回 → 客户端接收。如果我们有 N 个命令就有 N 次网络往返RTT。Pipeline管道允许我们把多个命令打包一次性发给 Redis 服务端服务端批量执行后一次性返回结果。这样 N 个命令只花费 1 次 RTT性能提升非常显著。5.2 Pipeline 基础用法importredisimporttimerredis.Redis(hostlocalhost,port6379,decode_responsesTrue)# 不使用 Pipelinestarttime.perf_counter()foriinrange(1000): r.set(fkey:{i}, fvalue:{i})print(f无 Pipeline 耗时: {time.perf_counter() - start:.3f}s)# 使用 Pipeliner.flushdb()# 清空测试库starttime.perf_counter()piper.pipeline()foriinrange(1000): pipe.set(fkey:{i}, fvalue:{i})pipe.execute()# 一次性发送所有命令print(f使用 Pipeline 耗时: {time.perf_counter() - start:.3f}s)输出示例本地测试无 Pipeline 耗时:0.532s 使用 Pipeline 耗时:0.018s性能差距可达30 倍以上在跨机房、高延迟网络环境下差距更明显。5.3 Pipeline 的高级用法Pipeline 也可以链式调用并且支持获取每个命令的返回值piper.pipeline()# 链式写入pipe.set(name,Alice).incr(counter).get(name)# execute() 返回每个命令的结果列表resultspipe.execute()print(results)# [True, 1, Alice]Pipeline 事务Pipeline 默认不是事务命令只是打包发送不会保证原子性。如果要事务使用transactionTrue下一章会深入讲解 Lua 和事务。piper.pipeline(transactionTrue)pipe.set(balance,100)pipe.incrby(balance,50)pipe.execute()# 在 MULTI/EXEC 中执行5.4 最佳实践批量读写循环中的GET/SET应改为Pipeline或MGET/MSET。批量操作但不同键Pipeline比MSET更灵活可以混合不同类型命令。控制批量大小一次 Pipeline 发送的命令数不要过大建议 1000 以内避免客户端内存爆涨和服务端长时间阻塞虽然 Redis 单线程会一次性执行完这批命令但命令过多会阻塞其他请求。pipeline 配合连接池Pipeline 从连接池获取一个连接并独占执行完毕后归还确保连接被正确复用。6. 异常处理与重试网络抖动不可避免健壮的应用需要容错机制from redis.exceptionsimportConnectionError, TimeoutErrorimporttimedef safe_incr(r, key,retries3):forattemptinrange(retries): try:returnr.incr(key)except(ConnectionError, TimeoutError)as e: print(f第 {attempt1} 次重试错误: {e})time.sleep(0.1*(attempt 1))# 退避raise Exception(f操作失败已重试 {retries} 次)更完善的方案可使用retry库或redis-py内置的retry_on_timeout支持4.x 版本rredis.Redis(hostlocalhost,retry_on_timeoutTrue,retry_on_error[ConnectionError, TimeoutError],health_check_interval30,)7. 动手试试在本地完成以下实战练习连接池挑战创建一个max_connections5的连接池启动 10 个线程同时执行GET操作观察是否会阻塞用time.sleep模拟长操作验证连接池行为。Pipeline 性能对决准备 5000 个键值对分别用for循环SET和Pipeline写入打印耗时对比。然后将数据用两种方式读取出来GET循环 vsMGET观察性能。模拟缓存读写编写一个函数get_cached_data(key)先从 Redis 读取不存在时模拟从 DB 查询返回字符串然后写入 Redis 并设 TTL返回数据。验证第二次调用时是否命中缓存。预期效果Pipeline 写入速度是循环写入的数十倍MGET读取同样秒杀逐条GET缓存命中时无需调用 DB 函数。8. 总结本文是 Python 操作 Redis 的开篇基石我们掌握了安装与连接pip install redis、redis.Redis()、连接 URL。五大数据类型 API方法与命令行一一对应decode_responses自动转字符串。连接池避免频繁 TCP 握手全局共享线程安全。Pipeline批量打包命令大幅降低网络 RTT性能提升数十倍。异常处理与重试让应用在网络抖动下依然可靠。你已经可以从redis-cli无缝切换到 Python 代码并能写出工程级别的 Redis 交互逻辑。下一篇我们将继续深入 Python 操作 Redis 的进阶技巧事务、Lua 脚本、序列化方案、异步redis.asyncio以及工具类封装真正把 Redis 用出“高级感”。想了解更多还可以去各个平台搜索「IT策士」一起升级 IT 思维
Redis 从入门到精通:Python 操作 Redis
发布时间:2026/6/11 3:11:03
IT策士 10余年一线大厂经验专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章助你少走弯路。在前五篇文章里我们一直在用redis-cli和命令行探索 Redis 的五大数据结构外加三个高级类型。你可能会问在 Python 项目里怎么操作 Redis怎么管理连接怎么提升性能从本文开始我们将全面切换到Python 视角用redis-py这个官方推荐的客户端库来武装我们的 Python 应用。本文将手把手教你安装、建立连接、操作五大基础数据结构并深入讲解连接池和 Pipeline 两大工程化利器。读完你会发现用 Python 调用 Redis 原来可以这么简洁优雅。1. 环境准备安装 redis-pyredis-py是 Redis 官方推荐的 Python 客户端纯 Python 实现API 简洁且紧跟 Redis 最新命令。安装只需一条命令预期输出Collecting redis Downloading redis-5.0.0-py3-none-any.whl(252kB)... 省略... Installing collected packages: redis Successfully installed redis-5.0.0⚠️ 本文以redis-py5.x 版本为例部分 API 可能与 4.x 略有差异。建议使用 4.0 及以上版本以获取完整类型提示和更好的异步支持。检查版本pip show redis。2. 创建连接从简单到规范2.1 最简单的连接importredis rredis.Redis(hostlocalhost,port6379,decode_responsesTrue)# Ping 测试print(r.ping())# True参数解释hostRedis 服务器地址默认localhostport端口默认6379decode_responsesTrue强烈建议开启让返回的数据自动解码为 Python 字符串而不是 bytes。否则你得到的是bvalue需要自己.decode()2.2 带密码和 db 的连接rredis.Redis(host192.168.1.100,port6379,passwordyour_password,db1,decode_responsesTrue)db参数指定数据库编号Redis 默认有 0~15 共 16 个数据库可通过配置修改。2.3 使用连接 URL更现代redis-py支持从 URL 字符串创建连接适合从配置文件读取# 格式: redis://[:password]host:port/dbrredis.from_url(redis://:mypasswordlocalhost:6379/0,decode_responsesTrue)print(r.ping())最佳实践永远不要在代码中硬编码密码使用环境变量redis.from_url(os.getenv(REDIS_URL))3. 五大基础数据类型 API 速查redis-py的 API 与redis-cli命令几乎一一对应方法名小写参数顺序一致。下面我们快速过一遍每种数据类型的 Python 调用方式并附上输出。3.1 String 操作importredis rredis.Redis(decode_responsesTrue)# SET / GET / MSET / MGETr.set(name,Alice)print(r.get(name))# Alicer.mset({age:30,city:Beijing})print(r.mget([age,city]))# [30, Beijing]# SET 可选参数ex, nx, px 等r.set(token,abc,ex10)# 10秒过期r.set(lock,1,nxTrue,ex30)# 仅在不存在时设置# 数字操作r.set(counter,0)r.incr(counter)r.incrby(counter,5)print(r.get(counter))# 6# 字符串操作r.set(greet,Hello)r.append(greet, Redis)print(r.get(greet))# Hello Redisprint(r.strlen(greet))# 11print(r.getrange(greet,0,4))# Hello3.2 Hash 操作# HSET / HGET / HMSET (已废弃直接用 hset)r.hset(user:1001,mapping{name:Bob,age:25,city:Shanghai})print(r.hgetall(user:1001))# {name: Bob, age: 25, city: Shanghai}# 获取单个字段print(r.hget(user:1001,name))# Bob# 批量获取print(r.hmget(user:1001,[name,age]))# [Bob, 25]# 自增r.hincrby(user:1001,age,1)print(r.hget(user:1001,age))# 26# 字段是否存在/删除/长度print(r.hexists(user:1001,city))# Truer.hdel(user:1001,city)print(r.hlen(user:1001))# 23.3 List 操作# 左右插入r.lpush(queue,task1,task2)# task2, task1r.rpush(queue,task3)# task2, task1, task3# 范围查看print(r.lrange(queue,0, -1))# [task2, task1, task3]# 弹出print(r.lpop(queue))# task2print(r.rpop(queue))# task3# 阻塞弹出# r.brpop(queue, timeout5) # 返回 (queue_name, value) 或 None# 长度与索引print(r.llen(queue))# 1print(r.lindex(queue,0))# task1# 修剪r.lpush(log, *[a,b,c,d,e])r.ltrim(log,0,2)print(r.lrange(log,0, -1))# [e, d, c]3.4 Set 操作# 添加 / 查看 / 删除r.sadd(tags:python,redis,django,flask)print(r.smembers(tags:python))# {flask, django, redis} 无序print(r.sismember(tags:python,django))# Trueprint(r.scard(tags:python))# 3r.srem(tags:python,flask)# 集合运算r.sadd(tags:web,django,fastapi)print(r.sinter(tags:python,tags:web))# {django}print(r.sunion(tags:python,tags:web))# {redis, django, fastapi}print(r.sdiff(tags:python,tags:web))# {redis}# 随机弹出print(r.spop(tags:python))# 随机一个元素3.5 Sorted Set 操作# 添加 / 查看r.zadd(leaderboard,{Alice:100,Bob:85,Charlie:92})print(r.zrange(leaderboard,0, -1,withscoresTrue))# [(Bob, 85.0), (Charlie, 92.0), (Alice, 100.0)]# 反向获取 Top Nprint(r.zrevrange(leaderboard,0,1,withscoresTrue))# [(Alice, 100.0), (Charlie, 92.0)]# 增减分数r.zincrby(leaderboard,10,Bob)print(r.zscore(leaderboard,Bob))# 95.0# 排名print(r.zrank(leaderboard,Bob))# 0 (从低到高)print(r.zrevrank(leaderboard,Bob))# 2 (从高到低)# 按分数范围查询print(r.zrangebyscore(leaderboard,90,100))# [Charlie, Alice]动手试试把上面的代码逐段复制到你的python交互环境或脚本运行对比输出是否与预期一致。试着修改几个参数比如给 String 加ex5然后用ttl观察倒计时。4. 连接池管理告别频繁握手4.1 为什么要用连接池Redis 是基于 TCP 的每次创建新连接都需要三次握手、认证等开销。如果你的应用在高并发场景下每次请求都redis.Redis()新建连接不仅浪费性能还可能耗尽 Redis 的maxclients限制。连接池ConnectionPool就是提前创建好一定数量的连接放在池子里用时借、用完还连接不断开。4.2 创建连接池importredis# 显式创建连接池poolredis.ConnectionPool(hostlocalhost,port6379,decode_responsesTrue,max_connections20,# 最大连接数# 更多参数如 password, db 等)# 多个 Redis 实例共享一个连接池r1redis.Redis(connection_poolpool)r2redis.Redis(connection_poolpool)# 验证共享print(r1.ping())# Trueprint(r2.ping())# True# 使用完可以断开所有连接应用退出时pool.disconnect()max_connections设定了池中最多保持多少个连接。当并发请求超过此数时获取连接的请求会阻塞直到有连接归还。4.3 生产环境配置建议importredisimportos def get_redis_client():生产级 Redis 客户端工厂函数 poolredis.ConnectionPool(hostos.getenv(REDIS_HOST,localhost),portint(os.getenv(REDIS_PORT,6379)),passwordos.getenv(REDIS_PASSWORD, None),dbint(os.getenv(REDIS_DB,0)),decode_responsesTrue,max_connectionsint(os.getenv(REDIS_MAX_CONNS,50)),socket_timeout5,# 连接超时socket_connect_timeout5,# 连接建立超时health_check_interval30,# 健康检查间隔秒)returnredis.Redis(connection_poolpool)clientget_redis_client()⚠️ 常见误区在多线程环境中每个线程反复创建redis.Redis()而不共享连接池。正确做法是全局创建一个连接池所有线程共享同一个redis.Redis实例线程安全。在多进程如 gunicorn中每个 worker 进程应创建自己的连接池。5. Pipeline批量命令的加速器5.1 为什么需要 PipelineRedis 的命令执行是 C/S 模式每一次命令都要经历客户端发送命令 → 网络传输 → 服务端处理 → 网络返回 → 客户端接收。如果我们有 N 个命令就有 N 次网络往返RTT。Pipeline管道允许我们把多个命令打包一次性发给 Redis 服务端服务端批量执行后一次性返回结果。这样 N 个命令只花费 1 次 RTT性能提升非常显著。5.2 Pipeline 基础用法importredisimporttimerredis.Redis(hostlocalhost,port6379,decode_responsesTrue)# 不使用 Pipelinestarttime.perf_counter()foriinrange(1000): r.set(fkey:{i}, fvalue:{i})print(f无 Pipeline 耗时: {time.perf_counter() - start:.3f}s)# 使用 Pipeliner.flushdb()# 清空测试库starttime.perf_counter()piper.pipeline()foriinrange(1000): pipe.set(fkey:{i}, fvalue:{i})pipe.execute()# 一次性发送所有命令print(f使用 Pipeline 耗时: {time.perf_counter() - start:.3f}s)输出示例本地测试无 Pipeline 耗时:0.532s 使用 Pipeline 耗时:0.018s性能差距可达30 倍以上在跨机房、高延迟网络环境下差距更明显。5.3 Pipeline 的高级用法Pipeline 也可以链式调用并且支持获取每个命令的返回值piper.pipeline()# 链式写入pipe.set(name,Alice).incr(counter).get(name)# execute() 返回每个命令的结果列表resultspipe.execute()print(results)# [True, 1, Alice]Pipeline 事务Pipeline 默认不是事务命令只是打包发送不会保证原子性。如果要事务使用transactionTrue下一章会深入讲解 Lua 和事务。piper.pipeline(transactionTrue)pipe.set(balance,100)pipe.incrby(balance,50)pipe.execute()# 在 MULTI/EXEC 中执行5.4 最佳实践批量读写循环中的GET/SET应改为Pipeline或MGET/MSET。批量操作但不同键Pipeline比MSET更灵活可以混合不同类型命令。控制批量大小一次 Pipeline 发送的命令数不要过大建议 1000 以内避免客户端内存爆涨和服务端长时间阻塞虽然 Redis 单线程会一次性执行完这批命令但命令过多会阻塞其他请求。pipeline 配合连接池Pipeline 从连接池获取一个连接并独占执行完毕后归还确保连接被正确复用。6. 异常处理与重试网络抖动不可避免健壮的应用需要容错机制from redis.exceptionsimportConnectionError, TimeoutErrorimporttimedef safe_incr(r, key,retries3):forattemptinrange(retries): try:returnr.incr(key)except(ConnectionError, TimeoutError)as e: print(f第 {attempt1} 次重试错误: {e})time.sleep(0.1*(attempt 1))# 退避raise Exception(f操作失败已重试 {retries} 次)更完善的方案可使用retry库或redis-py内置的retry_on_timeout支持4.x 版本rredis.Redis(hostlocalhost,retry_on_timeoutTrue,retry_on_error[ConnectionError, TimeoutError],health_check_interval30,)7. 动手试试在本地完成以下实战练习连接池挑战创建一个max_connections5的连接池启动 10 个线程同时执行GET操作观察是否会阻塞用time.sleep模拟长操作验证连接池行为。Pipeline 性能对决准备 5000 个键值对分别用for循环SET和Pipeline写入打印耗时对比。然后将数据用两种方式读取出来GET循环 vsMGET观察性能。模拟缓存读写编写一个函数get_cached_data(key)先从 Redis 读取不存在时模拟从 DB 查询返回字符串然后写入 Redis 并设 TTL返回数据。验证第二次调用时是否命中缓存。预期效果Pipeline 写入速度是循环写入的数十倍MGET读取同样秒杀逐条GET缓存命中时无需调用 DB 函数。8. 总结本文是 Python 操作 Redis 的开篇基石我们掌握了安装与连接pip install redis、redis.Redis()、连接 URL。五大数据类型 API方法与命令行一一对应decode_responses自动转字符串。连接池避免频繁 TCP 握手全局共享线程安全。Pipeline批量打包命令大幅降低网络 RTT性能提升数十倍。异常处理与重试让应用在网络抖动下依然可靠。你已经可以从redis-cli无缝切换到 Python 代码并能写出工程级别的 Redis 交互逻辑。下一篇我们将继续深入 Python 操作 Redis 的进阶技巧事务、Lua 脚本、序列化方案、异步redis.asyncio以及工具类封装真正把 Redis 用出“高级感”。想了解更多还可以去各个平台搜索「IT策士」一起升级 IT 思维