一、rq 库概述
rq 全称 Redis Queue,是一款基于 Redis 开发的轻量级 Python 任务队列库,专注于处理异步任务与后台任务,核心原理是将任务函数与参数存入 Redis,由独立工作进程异步拉取执行,规避同步任务阻塞主程序的问题。该库采用 MIT 开源许可,优点是轻量简洁、易上手、无额外依赖、适配小型到中型项目;缺点是不支持复杂任务调度、集群能力较弱,更适合轻量异步场景。

二、rq 环境安装与基础配置
2.1 环境依赖准备
rq 强依赖 Redis 数据库,使用前需先完成 Redis 安装与启动,Windows、macOS、Linux 均有对应安装方式,安装后通过 redis-server 启动服务,默认端口 6379,无密码时本地可直接连接。
2.2 rq 库安装
rq 仅需通过 pip 即可安装,命令简洁无复杂配置,适合新手快速部署:
pip install rq安装完成后,可通过导入 rq 验证是否成功,无报错则说明安装正常:
import rq
print(rq.__version__)2.3 Redis 连接配置
rq 默认连接本地 Redis,如需自定义主机、端口、密码、数据库,可创建 Redis 连接对象,适配不同部署环境:
from redis import Redis
from rq import Queue
# 默认本地连接
redis_conn = Redis(host='localhost', port=6379, db=0)
# 带密码的远程连接
# redis_conn = Redis(host='xxx.xxx.xxx.xxx', port=6379, password='your_password', db=0)
# 初始化任务队列
task_queue = Queue(connection=redis_conn)这段代码的作用是建立 Python 程序与 Redis 的通信通道,所有任务都会通过该连接存入 Redis 队列,是 rq 运行的基础配置。
三、rq 核心使用方式与基础代码示例
3.1 定义可被执行的任务函数
rq 的任务本质是普通 Python 函数,需满足可被序列化、无复杂闭包的条件,先定义简单任务用于测试:
# tasks.py 任务文件,单独存放便于管理
import time
def simple_task(name):
"""简单异步任务:模拟耗时操作"""
time.sleep(2)
return f"Hello {name}, 异步任务执行完成!"
def calculate_sum(a, b):
"""计算两数之和的任务"""
time.sleep(1)
return f"{a} + {b} = {a + b}"将任务单独放在 tasks.py 文件,是因为 rq 工作进程需要通过模块路径找到函数,分散存放会导致任务无法执行。
3.2 向队列添加异步任务
创建主程序文件,将任务函数加入队列,实现异步提交,不阻塞主程序运行:
# main.py 主程序文件
from redis import Redis
from rq import Queue
from tasks import simple_task, calculate_sum
# 连接 Redis
redis_conn = Redis(host='localhost', port=6379, db=0)
queue = Queue(connection=redis_conn)
# 提交任务到队列,非阻塞执行
job1 = queue.enqueue(simple_task, "Python开发者")
job2 = queue.enqueue(calculate_sum, 10, 20)
# 输出任务ID,用于后续查询
print(f"任务1 ID: {job1.id}")
print(f"任务2 ID: {job2.id}")
print("主程序继续执行,无需等待任务完成")代码说明:queue.enqueue() 是核心提交方法,第一个参数为任务函数,后续为函数参数,调用后立即返回任务对象,主程序不会等待任务执行,实现异步解耦。
3.3 启动 rq 工作进程执行任务
任务提交到 Redis 后,需要启动工作进程消费队列任务,打开新的命令行窗口,进入项目目录,执行:
rq worker执行后工作进程会持续监听 Redis 队列,一旦有新任务就立即执行,输出任务执行日志,执行完成后返回结果。
3.4 查看任务执行状态与结果
rq 提供丰富的任务状态查询方法,可在主程序中获取任务是否完成、结果、失败原因:
# result_check.py 任务结果查询
from redis import Redis
from rq import Queue
from rq.job import Job
redis_conn = Redis(host='localhost', port=6379, db=0)
queue = Queue(connection=redis_conn)
# 通过任务ID获取任务
job = Job.fetch('你的任务ID', connection=redis_conn)
# 查询任务状态
print(f"任务是否执行完成: {job.is_finished}")
print(f"任务是否执行失败: {job.is_failed}")
print(f"任务执行结果: {job.result}")
print(f"任务执行状态: {job.get_status()}")任务状态分为 queued(排队中)、started(执行中)、finished(已完成)、failed(执行失败),可根据状态做后续业务处理。
四、rq 进阶功能使用
4.1 多队列管理
rq 支持创建多个队列,分类处理不同类型任务,避免任务阻塞:
# multi_queue.py
from redis import Redis
from rq import Queue
from tasks import simple_task, calculate_sum
redis_conn = Redis(host='localhost', port=6379, db=0)
# 创建不同优先级/类型的队列
high_queue = Queue('high', connection=redis_conn)
low_queue = Queue('low', connection=redis_conn)
# 向指定队列提交任务
high_queue.enqueue(calculate_sum, 100, 200)
low_queue.enqueue(simple_task, "普通用户")启动工作进程时可指定监听队列:
rq worker high low4.2 任务延迟执行
rq 支持设置任务延迟执行时间,单位为秒,满足定时异步任务需求:
# 延迟5秒执行任务
job = queue.enqueue(simple_task, "延迟任务", delay=5)代码说明:delay 参数指定任务提交后,等待指定秒数再被工作进程执行,适用于延迟通知、延迟审核等场景。
4.3 任务失败重试与异常处理
为任务设置失败重试次数、重试间隔,提升任务执行稳定性:
# retry_task.py 带重试的任务
from rq import Retry
# 最多重试3次,每次间隔2秒
retry_strategy = Retry(max=3, interval=2)
job = queue.enqueue(simple_task, "重试任务", retry=retry_strategy)同时可在任务函数中捕获异常,记录失败原因:
def error_task():
try:
# 可能出错的逻辑
1 / 0
except Exception as e:
print(f"任务执行异常: {str(e)}")
raise # 抛出异常让rq标记任务失败4.4 清空队列与删除任务
运维场景中可清空队列、删除指定任务,避免无效任务堆积:
# 清空当前队列所有任务
queue.empty()
# 删除指定任务
job.delete()五、rq 实际业务场景案例
5.1 案例一:异步发送邮件
实际项目中,发送邮件是耗时操作,用 rq 异步处理可提升接口响应速度:
# email_task.py
import time
import smtplib
from email.mime.text import MIMEText
def send_async_email(to_email, content):
"""异步发送邮件任务"""
try:
# 模拟邮件发送(实际项目替换为真实邮件配置)
time.sleep(3)
msg = MIMEText(content, 'plain', 'utf-8')
msg['From'] = '[email protected]'
msg['To'] = to_email
msg['Subject'] = '异步邮件通知'
# 模拟发送成功
print(f"邮件已发送至: {to_email}")
return True
except Exception as e:
print(f"邮件发送失败: {str(e)}")
return False提交邮件任务:
# 提交异步邮件任务,不阻塞主程序
queue.enqueue(send_async_email, "[email protected]", "您的订单已支付成功")5.2 案例二:异步生成报表
大数据量报表生成耗时较长,通过 rq 后台执行,生成完成后通知用户:
# report_task.py
import time
import pandas as pd
def generate_excel_report(data_list, save_path):
"""异步生成Excel报表"""
time.sleep(5) # 模拟数据处理耗时
df = pd.DataFrame(data_list)
df.to_excel(save_path, index=False)
return f"报表生成完成,保存路径: {save_path}"提交报表任务:
data = [{'name': '张三', 'score': 90}, {'name': '李四', 'score': 85}]
queue.enqueue(generate_excel_report, data, "./report.xlsx")5.3 完整项目运行流程
- 启动 Redis 服务:
redis-server - 定义任务函数到
tasks.py - 主程序提交任务到队列
- 启动工作进程:
rq worker - 查看任务执行状态与结果
- 业务逻辑根据任务结果做后续处理
该流程可直接应用于 Web 项目、自动化脚本、数据分析工具中,解决同步任务阻塞问题。
六、相关资源
- Pypi地址:https://pypi.org/project/rq/
- Github地址:https://github.com/rq/rq
- 官方文档地址:https://python-rq.org/
关注我,每天分享一个实用的Python自动化工具。

