一、引言
Python作为一种高级、通用、解释型编程语言,凭借其简洁易读的语法和强大的生态系统,已广泛应用于Web开发、数据分析、人工智能、自动化运维等众多领域。在Web开发领域,Django、Flask等框架让开发者能够快速构建高效稳定的Web应用;在数据分析领域,NumPy、Pandas等库为数据处理和分析提供了强大支持;在人工智能领域,TensorFlow、PyTorch等框架推动了机器学习和深度学习的发展。据统计,全球超过80%的数据科学家和AI工程师使用Python进行开发,Python的 popularity 可见一斑。
在Python的生态系统中,有许多优秀的库和工具,它们为开发者提供了各种各样的功能和解决方案。本文将介绍其中一个非常重要的库——redis-py,它是Python与Redis数据库交互的官方推荐库。Redis作为一个高性能的键值对存储数据库,在缓存、消息队列、计数器、分布式锁等场景中有着广泛的应用。通过redis-py库,Python开发者可以方便地使用Redis的各种功能,提升应用的性能和可靠性。
二、Redis库概述
2.1 用途
Redis(Remote Dictionary Server)是一个开源的、高性能的键值对存储数据库,它支持多种数据结构,如字符串(String)、哈希(Hash)、列表(List)、集合(Set)、有序集合(Sorted Set)等。Redis的主要用途包括:
- 缓存:作为缓存层,存储经常访问的数据,减少数据库的访问压力,提高应用的响应速度。
- 消息队列:通过列表数据结构实现消息队列,支持发布/订阅模式,用于异步任务处理。
- 计数器:利用Redis的原子操作,实现计数器功能,如网站访问量、文章点赞数等。
- 分布式锁:通过Redis的原子操作和过期机制,实现分布式锁,解决分布式系统中的并发问题。
- 会话存储:存储用户会话信息,实现分布式应用的会话共享。
2.2 工作原理
Redis是一个基于内存的数据库,它将数据存储在内存中,因此具有极高的读写性能。同时,Redis支持数据持久化,可以将内存中的数据定期或实时写入磁盘,以防止数据丢失。Redis采用单线程模型处理客户端请求,通过非阻塞I/O和事件驱动机制,实现了高效的并发处理能力。
Redis的工作流程如下:
- 客户端向Redis服务器发送请求。
- Redis服务器接收请求并解析。
- 服务器根据请求类型执行相应的操作,如读取或写入数据。
- 服务器将处理结果返回给客户端。
2.3 优缺点
优点
- 高性能:基于内存操作,读写速度极快,单节点QPS可达10万以上。
- 丰富的数据结构:支持多种数据结构,如String、Hash、List、Set、Sorted Set等,满足不同的应用场景。
- 持久化:支持RDB和AOF两种持久化方式,保证数据的安全性和可靠性。
- 分布式:支持主从复制、哨兵和集群模式,实现高可用和扩展性。
- 原子操作:所有操作都是原子性的,保证数据的一致性。
- 丰富的功能:支持发布/订阅、Lua脚本、过期机制等功能。
缺点
- 内存限制:数据存储在内存中,成本较高,不适合存储大量数据。
- 单线程:单线程模型在处理复杂操作时可能成为瓶颈。
- 数据持久化开销:持久化操作会影响性能,尤其是AOF方式。
- 一致性保证:在分布式环境中,强一致性较难保证。
2.4 License类型
Redis采用BSD许可证,这是一种自由、宽松的开源许可证。BSD许可证允许用户自由使用、修改和分发软件,只需保留原始许可证声明和版权声明即可。这种许可证对商业应用非常友好,允许将Redis用于商业产品而无需公开源代码。
三、Redis库的安装与配置
3.1 Redis服务器安装
在使用redis-py库之前,需要先安装Redis服务器。以下是在不同操作系统上安装Redis服务器的方法:
Ubuntu/Debian
sudo apt update
sudo apt install redis-server
CentOS/RHEL
sudo yum install epel-release
sudo yum install redis
macOS
使用Homebrew安装:
brew install redis
Windows
在Windows上可以使用Redis的Windows版本,或者使用WSL(Windows Subsystem for Linux)来运行Redis。
安装完成后,可以使用以下命令启动Redis服务器:
redis-server
3.2 redis-py库安装
安装Redis服务器后,就可以安装redis-py库了。使用pip命令进行安装:
pip install redis
3.3 连接Redis服务器
安装完成后,可以使用以下代码测试与Redis服务器的连接:
import redis
# 创建Redis连接
r = redis.Redis(
host='localhost',
port=6379,
db=0,
password=None,
decode_responses=True # 自动解码响应数据
)
# 测试连接
try:
r.ping()
print("成功连接到Redis服务器")
except redis.exceptions.ConnectionError as e:
print(f"连接Redis服务器失败: {e}")
在上面的代码中,我们使用redis.Redis()方法创建了一个Redis连接对象,并通过ping()方法测试了连接是否成功。decode_responses=True参数用于自动解码从Redis服务器返回的数据,使我们得到的是字符串而不是字节类型。
四、Redis基本数据类型及操作
Redis支持多种数据类型,每种数据类型都有其独特的特点和适用场景。下面将介绍Redis的基本数据类型及其在Python中的操作方法。
4.1 字符串(String)
字符串是Redis最基本的数据类型,它可以存储任何类型的数据,如文本、JSON、二进制数据等。字符串类型在缓存、计数器、分布式锁等场景中非常有用。
以下是字符串类型的常用操作:
import redis
# 连接Redis服务器
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 设置键值对
r.set('name', 'Redis')
r.set('age', 10)
# 获取值
name = r.get('name')
age = r.get('age')
print(f"Name: {name}, Age: {age}") # 输出: Name: Redis, Age: 10
# 设置多个键值对
r.mset({'country': 'China', 'city': 'Beijing'})
# 获取多个值
values = r.mget(['name', 'country', 'city'])
print(values) # 输出: ['Redis', 'China', 'Beijing']
# 递增/递减操作
r.incr('age') # 递增1
print(r.get('age')) # 输出: 11
r.decr('age', 2) # 递减2
print(r.get('age')) # 输出: 9
# 设置带过期时间的键值对
r.setex('temp_key', 60, 'temporary value') # 60秒后过期
4.2 哈希(Hash)
哈希类型用于存储字段和值的映射关系,类似于Python中的字典。哈希类型适合存储对象信息,如用户信息、配置信息等。
以下是哈希类型的常用操作:
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 设置哈希字段
r.hset('user:1', 'name', 'Alice')
r.hset('user:1', 'age', 25)
r.hset('user:1', 'email', 'alice@example.com')
# 获取哈希字段
name = r.hget('user:1', 'name')
age = r.hget('user:1', 'age')
print(f"Name: {name}, Age: {age}") # 输出: Name: Alice, Age: 25
# 获取所有哈希字段和值
user = r.hgetall('user:1')
print(user) # 输出: {'name': 'Alice', 'age': '25', 'email': 'alice@example.com'}
# 获取所有哈希字段
fields = r.hkeys('user:1')
print(fields) # 输出: ['name', 'age', 'email']
# 获取所有哈希值
values = r.hvals('user:1')
print(values) # 输出: ['Alice', '25', 'alice@example.com']
# 判断字段是否存在
exists = r.hexists('user:1', 'email')
print(f"Email exists: {exists}") # 输出: Email exists: True
# 删除字段
r.hdel('user:1', 'age')
print(r.hget('user:1', 'age')) # 输出: None
4.3 列表(List)
列表类型是一个有序的字符串元素集合,它可以在列表的两端进行插入和删除操作,因此非常适合实现队列和栈。
以下是列表类型的常用操作:
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 从左侧插入元素
r.lpush('tasks', 'task1')
r.lpush('tasks', 'task2')
r.lpush('tasks', 'task3')
# 获取列表长度
length = r.llen('tasks')
print(f"List length: {length}") # 输出: List length: 3
# 获取列表范围内的元素
tasks = r.lrange('tasks', 0, -1) # 获取所有元素
print(tasks) # 输出: ['task3', 'task2', 'task1']
# 从右侧弹出元素
task = r.rpop('tasks')
print(f"Popped task: {task}") # 输出: Popped task: task1
# 阻塞式弹出元素
# 如果列表为空,会阻塞直到有元素可用或超时
task = r.brpop('tasks', timeout=5)
print(f"Block popped task: {task}") # 输出: ('tasks', 'task2')
4.4 集合(Set)
集合类型是一个无序且唯一的字符串元素集合,它支持交集、并集、差集等操作,适合用于去重、关系计算等场景。
以下是集合类型的常用操作:
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 添加元素到集合
r.sadd('fruits', 'apple')
r.sadd('fruits', 'banana')
r.sadd('fruits', 'cherry')
r.sadd('fruits', 'apple') # 重复元素不会被添加
# 获取集合所有元素
fruits = r.smembers('fruits')
print(fruits) # 输出: {'cherry', 'apple', 'banana'}
# 判断元素是否在集合中
exists = r.sismember('fruits', 'apple')
print(f"Apple exists: {exists}") # 输出: Apple exists: True
# 获取集合大小
size = r.scard('fruits')
print(f"Set size: {size}") # 输出: Set size: 3
# 删除元素
r.srem('fruits', 'banana')
print(r.smembers('fruits')) # 输出: {'cherry', 'apple'}
# 集合操作
r.sadd('basket1', 'apple', 'banana', 'cherry')
r.sadd('basket2', 'apple', 'date', 'elderberry')
# 交集
intersection = r.sinter('basket1', 'basket2')
print(f"Intersection: {intersection}") # 输出: {'apple'}
# 并集
union = r.sunion('basket1', 'basket2')
print(f"Union: {union}") # 输出: {'elderberry', 'cherry', 'apple', 'date', 'banana'}
# 差集
diff = r.sdiff('basket1', 'basket2')
print(f"Difference: {diff}") # 输出: {'cherry', 'banana'}
4.5 有序集合(Sorted Set)
有序集合类型是一种特殊的集合,它的每个元素都关联一个分数(score),并按照分数从小到大排序。有序集合适合用于排行榜、热门列表等场景。
以下是有序集合类型的常用操作:
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 添加元素到有序集合
r.zadd('scores', {'Alice': 85, 'Bob': 92, 'Charlie': 78, 'David': 95})
# 获取有序集合元素(按分数从小到大)
members = r.zrange('scores', 0, -1, withscores=True)
print(members) # 输出: [('Charlie', 78.0), ('Alice', 85.0), ('Bob', 92.0), ('David', 95.0)]
# 获取有序集合元素(按分数从大到小)
members_desc = r.zrevrange('scores', 0, -1, withscores=True)
print(members_desc) # 输出: [('David', 95.0), ('Bob', 92.0), ('Alice', 85.0), ('Charlie', 78.0)]
# 获取元素排名(从0开始)
alice_rank = r.zrank('scores', 'Alice')
print(f"Alice's rank: {alice_rank}") # 输出: Alice's rank: 1
# 获取元素分数
bob_score = r.zscore('scores', 'Bob')
print(f"Bob's score: {bob_score}") # 输出: Bob's score: 92.0
# 增加元素分数
r.zincrby('scores', 5, 'Charlie')
print(r.zscore('scores', 'Charlie')) # 输出: 83.0
# 获取分数范围内的元素
high_scores = r.zrangebyscore('scores', 90, 100, withscores=True)
print(f"High scores: {high_scores}") # 输出: [('Bob', 92.0), ('David', 95.0)]
五、Redis高级功能及应用
5.1 发布/订阅模式
Redis的发布/订阅模式允许客户端订阅一个或多个频道,当有其他客户端向这些频道发布消息时,订阅者会收到相应的消息。这种模式适合实现实时消息系统、事件通知等功能。
以下是发布/订阅模式的示例代码:
import redis
import threading
import time
# 连接Redis服务器
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 订阅者类
class Subscriber(threading.Thread):
def __init__(self, channels):
threading.Thread.__init__(self)
self.redis = r
self.pubsub = self.redis.pubsub()
self.pubsub.subscribe(channels)
def run(self):
for message in self.pubsub.listen():
if message['type'] == 'message':
print(f"Received message: {message['data']} on channel {message['channel']}")
# 发布者函数
def publisher():
time.sleep(1) # 等待订阅者启动
r.publish('news', 'Breaking news: Redis is awesome!')
time.sleep(1)
r.publish('sports', 'Sports news: Team A won the championship!')
time.sleep(1)
r.publish('news', 'Another news: Python is popular!')
# 创建并启动订阅者线程
subscriber = Subscriber(['news', 'sports'])
subscriber.daemon = True
subscriber.start()
# 创建并启动发布者线程
publisher_thread = threading.Thread(target=publisher)
publisher_thread.start()
# 等待发布者线程完成
publisher_thread.join()
# 停止订阅者
subscriber.pubsub.unsubscribe()
print("Done")
5.2 事务处理
Redis的事务允许在一次操作中执行多个命令,这些命令会被序列化并按顺序执行,中间不会插入其他客户端的命令。事务通过MULTI、EXEC、DISCARD和WATCH等命令实现。
以下是事务处理的示例代码:
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 清空测试键
r.delete('balance', 'debt')
# 设置初始余额
r.set('balance', 100)
r.set('debt', 0)
# 定义回调函数,用于执行事务
def transfer(pipe):
# 获取当前余额
balance = int(pipe.get('balance'))
# 如果余额不足,抛出异常
if balance < 50:
pipe.reset() # 重置事务
raise redis.exceptions.AbortTransactionError("余额不足")
# 执行事务操作
pipe.multi()
pipe.decr('balance', 50)
pipe.incr('debt', 50)
# 使用WATCH监控balance键,确保在事务执行期间不被其他客户端修改
with r.pipeline() as pipe:
while True:
try:
# 监控balance键
pipe.watch('balance')
# 执行事务
transfer(pipe)
# 执行EXEC命令提交事务
pipe.execute()
print("转账成功")
break
except redis.exceptions.WatchError:
# 如果balance键被修改,重试
print("检测到并发修改,重试...")
continue
except redis.exceptions.AbortTransactionError as e:
print(f"事务中止: {e}")
break
# 查看结果
print(f"余额: {r.get('balance')}")
print(f"欠款: {r.get('debt')}")
5.3 Lua脚本
Redis支持执行Lua脚本,这使得在Redis服务器端执行复杂操作变得更加高效。Lua脚本可以原子性地执行多个命令,减少客户端与服务器之间的通信开销。
以下是Lua脚本的示例代码:
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 清空测试键
r.delete('counter')
# 定义Lua脚本:原子性地增加计数器并返回新值
lua_script = """
local key = KEYS[1]
local increment = tonumber(ARGV[1])
local value = redis.call('INCRBY', key, increment)
return value
"""
# 加载Lua脚本到Redis服务器,获取脚本SHA1值
script_sha = r.register_script(lua_script)
# 执行Lua脚本
result = script_sha(keys=['counter'], args=[5])
print(f"第一次执行结果: {result}") # 输出: 5
result = script_sha(keys=['counter'], args=[3])
print(f"第二次执行结果: {result}") # 输出: 8
# 直接执行Lua脚本
result = r.eval(lua_script, 1, 'counter', 2)
print(f"第三次执行结果: {result}") # 输出: 10
5.4 分布式锁
在分布式系统中,多个进程可能需要访问共享资源,为了避免竞争条件,需要实现分布式锁。Redis可以通过SETNX(SET if Not eXists)命令和过期机制实现分布式锁。
以下是分布式锁的示例代码:
import redis
import time
import threading
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 分布式锁类
class DistributedLock:
def __init__(self, redis_client, lock_name, expire_time=10):
self.redis_client = redis_client
self.lock_name = lock_name
self.expire_time = expire_time
self.lock_key = f"lock:{lock_name}"
def acquire(self, retry_times=3, retry_delay=0.5):
"""获取锁"""
retries = 0
while retries < retry_times:
# 使用SET命令并设置NX和EX选项
result = self.redis_client.set(self.lock_key, "locked", nx=True, ex=self.expire_time)
if result:
return True
retries += 1
time.sleep(retry_delay)
return False
def release(self):
"""释放锁"""
# 使用Lua脚本确保原子性
lua_script = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"""
return self.redis_client.eval(lua_script, 1, self.lock_key, "locked")
# 模拟共享资源
def worker(lock, name):
print(f"{name} 尝试获取锁")
if lock.acquire():
try:
print(f"{name} 已获取锁,开始工作")
time.sleep(2) # 模拟工作
print(f"{name} 完成工作,释放锁")
finally:
lock.release()
else:
print(f"{name} 获取锁失败")
# 创建锁
lock = DistributedLock(r, "resource_lock")
# 创建并启动多个工作线程
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(lock, f"Worker-{i}"))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print("所有工作线程已完成")
六、Redis持久化与高可用
6.1 持久化
Redis支持两种持久化方式:RDB(Redis Database)和AOF(Append Only File)。
RDB持久化
RDB持久化是将Redis在某个时间点的数据快照保存到磁盘上。RDB文件是一个紧凑的二进制文件,适合用于备份、灾难恢复等场景。
以下是RDB持久化的配置示例:
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 手动触发RDB持久化
r.bgsave() # 在后台执行SAVE命令
# 获取最后一次RDB持久化的时间
last_save_time = r.lastsave()
print(f"最后一次RDB持久化时间: {last_save_time}")
# 配置RDB自动持久化
# 在redis.conf中可以配置以下参数:
# save 900 1 # 900秒内至少有1个键被修改
# save 300 10 # 300秒内至少有10个键被修改
# save 60 10000 # 60秒内至少有10000个键被修改
AOF持久化
AOF持久化是将Redis执行的写命令追加到一个日志文件中。当Redis重启时,会重新执行这些命令来恢复数据。AOF持久化提供了更高的数据安全性,但日志文件通常比RDB文件大。
以下是AOF持久化的配置示例:
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 查看AOF状态
aof_status = r.config_get('appendonly')
print(f"AOF状态: {aof_status}")
# 动态开启AOF持久化
# 注意:此操作会触发一次BGSAVE
r.config_set('appendonly', 'yes')
# 配置AOF持久化策略
# 在redis.conf中可以配置以下参数:
# appendfsync always # 每次写操作都同步到磁盘
# appendfsync everysec # 每秒同步一次(默认)
# appendfsync no # 由操作系统决定何时同步
6.2 高可用
为了保证Redis的高可用性,通常会采用主从复制、哨兵和集群等方式。
主从复制
主从复制是Redis高可用的基础,它允许将一个Redis服务器(主服务器)的数据复制到多个Redis服务器(从服务器)。主服务器负责写操作,从服务器负责读操作,从而提高系统的读写性能和可用性。
以下是主从复制的配置示例:
# 主服务器配置
import redis
# 连接主服务器
master = redis.Redis(host='master_host', port=6379, db=0, decode_responses=True)
# 设置键值对
master.set('name', 'Redis Master')
# 从服务器配置
# 连接从服务器
slave = redis.Redis(host='slave_host', port=6379, db=0, decode_responses=True)
# 从服务器向主服务器发送SYNC命令进行复制
# 在redis.conf中配置:slaveof master_host 6379
# 从服务器读取数据
name = slave.get('name')
print(f"从服务器读取到的数据: {name}") # 输出: Redis Master
哨兵(Sentinel)
Redis Sentinel是Redis官方提供的高可用解决方案,它可以监控Redis主从服务器的状态,当主服务器出现故障时,自动进行故障转移,将从服务器升级为主服务器。
以下是哨兵的配置示例:
from redis.sentinel import Sentinel
# 连接哨兵集群
sentinel = Sentinel([('sentinel1_host', 26379),
('sentinel2_host', 26379),
('sentinel3_host', 26379)],
socket_timeout=0.5)
# 获取主服务器连接
master = sentinel.master_for('mymaster', socket_timeout=0.5, decode_responses=True)
# 获取从服务器连接
slave = sentinel.slave_for('mymaster', socket_timeout=0.5, decode_responses=True)
# 主服务器写操作
master.set('key', 'value')
# 从服务器读操作
value = slave.get('key')
print(f"从服务器读取到的值: {value}") # 输出: value
集群(Cluster)
Redis Cluster是Redis官方提供的分布式解决方案,它将数据分片存储在多个节点上,每个节点负责一部分数据,从而实现水平扩展。Redis Cluster提供了自动故障转移和数据分片功能。
以下是Redis Cluster的配置示例:
from rediscluster import RedisCluster
# 连接Redis集群
startup_nodes = [
{"host": "node1_host", "port": "7000"},
{"host": "node2_host", "port": "7001"},
{"host": "node3_host", "port": "7002"}
]
# 创建集群连接
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
# 设置键值对
rc.set("name", "Redis Cluster")
# 获取值
name = rc.get("name")
print(f"集群中获取的值: {name}") # 输出: Redis Cluster
七、实际案例应用
7.1 缓存应用
在Web应用中,经常会有一些频繁访问但更新不频繁的数据,如配置信息、热门文章等。使用Redis作为缓存可以显著提高应用的性能。
以下是一个简单的缓存应用示例:
import redis
import time
from functools import wraps
# 连接Redis服务器
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 缓存装饰器
def cache_it(expire_time=60):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = f"cache:{func.__name__}:{args}:{kwargs}"
# 尝试从缓存中获取数据
cached_data = r.get(cache_key)
if cached_data is not None:
print(f"从缓存中获取数据: {cache_key}")
return eval(cached_data)
# 缓存未命中,执行原函数
print(f"缓存未命中,执行函数: {func.__name__}")
result = func(*args, **kwargs)
# 将结果存入缓存
r.setex(cache_key, expire_time, str(result))
return result
return wrapper
return decorator
# 模拟从数据库获取数据的函数
@cache_it(expire_time=30)
def get_user_data(user_id):
# 模拟耗时的数据库查询
time.sleep(2)
return {
"user_id": user_id,
"name": f"User-{user_id}",
"age": 20 + user_id,
"email": f"user{user_id}@example.com"
}
# 测试缓存
print("第一次调用:")
user_data = get_user_data(1)
print(user_data)
print("\n第二次调用:")
user_data = get_user_data(1)
print(user_data)
# 等待缓存过期
print("\n等待30秒后再次调用:")
time.sleep(30)
user_data = get_user_data(1)
print(user_data)
7.2 限流应用
在分布式系统中,为了防止某个服务被过度调用,通常需要实现限流。Redis可以用于实现分布式限流,保证系统的稳定性。
以下是一个基于令牌桶算法的限流示例:
import redis
import time
# 连接Redis服务器
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 令牌桶限流类
class TokenBucket:
def __init__(self, redis_client, key, rate, capacity):
"""
初始化令牌桶
参数:
redis_client: Redis客户端
key: 限流标识
rate: 令牌生成速率 (个/秒)
capacity: 令牌桶容量
"""
self.redis_client = redis_client
self.key = key
self.rate = rate
self.capacity = capacity
def allow_request(self, tokens=1):
"""
检查请求是否被允许
参数:
tokens: 请求需要的令牌数
返回:
True: 允许请求
False: 拒绝请求
"""
current_time = time.time()
# 获取当前令牌数和上次更新时间
pipe = self.redis_client.pipeline()
with pipe:
try:
pipe.watch(self.key)
# 获取当前令牌桶状态
current_tokens, last_update = map(float, pipe.hmget(self.key, 'tokens', 'last_update') or [0, 0])
# 计算现在应该有的令牌数
now = time.time()
new_tokens = current_tokens + (now - last_update) * self.rate
new_tokens = min(self.capacity, new_tokens)
# 检查是否有足够的令牌
if new_tokens < tokens:
pipe.reset()
return False
# 扣除令牌并更新状态
pipe.multi()
pipe.hmset(self.key, {
'tokens': new_tokens - tokens,
'last_update': now
})
pipe.execute()
return True
except redis.exceptions.WatchError:
# 重试
return self.allow_request(tokens)
# 测试限流
def test_rate_limiter():
limiter = TokenBucket(r, "api:limiter", rate=2, capacity=10) # 每秒生成2个令牌,容量为10
for i in range(15):
if limiter.allow_request():
print(f"请求 {i+1}: 允许")
else:
print(f"请求 {i+1}: 拒绝")
time.sleep(0.2)
test_rate_limiter()
7.3 消息队列应用
Redis的列表数据结构可以用于实现简单的消息队列,支持生产者-消费者模式。
以下是一个消息队列的示例:
import redis
import threading
import time
# 连接Redis服务器
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 队列名称
QUEUE_NAME = 'task_queue'
# 生产者类
class Producer(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name
def run(self):
for i in range(5):
task = f"{self.name}-task-{i}"
r.lpush(QUEUE_NAME, task)
print(f"{self.name} 生产了任务: {task}")
time.sleep(1)
# 消费者类
class Consumer(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name
def run(self):
while True:
# 阻塞式获取任务
task = r.brpop(QUEUE_NAME, timeout=5)
if task is None:
print(f"{self.name} 等待超时,退出")
break
queue, task = task
print(f"{self.name} 消费了任务: {task}")
time.sleep(0.5)
# 创建生产者和消费者
producer1 = Producer("Producer-1")
producer2 = Producer("Producer-2")
consumer1 = Consumer("Consumer-1")
consumer2 = Consumer("Consumer-2")
# 启动生产者和消费者
producer1.start()
producer2.start()
consumer1.start()
consumer2.start()
# 等待所有线程完成
producer1.join()
producer2.join()
consumer1.join()
consumer2.join()
print("所有任务处理完毕")
八、相关资源
- Pypi地址:https://pypi.org/project/redis/
- Github地址:https://github.com/redis/redis-py
- 官方文档地址:https://redis-py.readthedocs.io/en/stable/
通过本文的介绍,你已经了解了Redis这个强大的Python库的基本概念、工作原理、安装配置以及各种应用场景。Redis作为一个高性能的键值对存储数据库,在缓存、消息队列、计数器、分布式锁等场景中有着广泛的应用。希望本文能够帮助你更好地理解和使用Redis,提升你的Python开发技能。
关注我,每天分享一个实用的Python自动化工具。
