Python 任务队列利器:rq 从入门到实战完全指南

一、rq 库概述

rq 全称 Redis Queue,是一款基于 Redis 开发的轻量级 Python 任务队列库,专注于处理异步任务与后台任务,核心原理是将任务函数与参数存入 Redis,由独立工作进程异步拉取执行,规避同步任务阻塞主程序的问题。该库采用 MIT 开源许可,优点是轻量简洁、易上手、无额外依赖、适配小型到中型项目;缺点是不支持复杂任务调度、集群能力较弱,更适合轻量异步场景。

二、rq 环境安装与基础配置

2.1 环境依赖准备

rq 强依赖 Redis 数据库,使用前需先完成 Redis 安装与启动,Windows、macOS、Linux 均有对应安装方式,安装后通过 redis-server 启动服务,默认端口 6379,无密码时本地可直接连接。

2.2 rq 库安装

rq 仅需通过 pip 即可安装,命令简洁无复杂配置,适合新手快速部署:

pip install rq

安装完成后,可通过导入 rq 验证是否成功,无报错则说明安装正常:

import rq
print(rq.__version__)

2.3 Redis 连接配置

rq 默认连接本地 Redis,如需自定义主机、端口、密码、数据库,可创建 Redis 连接对象,适配不同部署环境:

from redis import Redis
from rq import Queue

# 默认本地连接
redis_conn = Redis(host='localhost', port=6379, db=0)

# 带密码的远程连接
# redis_conn = Redis(host='xxx.xxx.xxx.xxx', port=6379, password='your_password', db=0)

# 初始化任务队列
task_queue = Queue(connection=redis_conn)

这段代码的作用是建立 Python 程序与 Redis 的通信通道,所有任务都会通过该连接存入 Redis 队列,是 rq 运行的基础配置。

三、rq 核心使用方式与基础代码示例

3.1 定义可被执行的任务函数

rq 的任务本质是普通 Python 函数,需满足可被序列化无复杂闭包的条件,先定义简单任务用于测试:

# tasks.py 任务文件,单独存放便于管理
import time

def simple_task(name):
    """简单异步任务:模拟耗时操作"""
    time.sleep(2)
    return f"Hello {name}, 异步任务执行完成!"

def calculate_sum(a, b):
    """计算两数之和的任务"""
    time.sleep(1)
    return f"{a} + {b} = {a + b}"

将任务单独放在 tasks.py 文件,是因为 rq 工作进程需要通过模块路径找到函数,分散存放会导致任务无法执行。

3.2 向队列添加异步任务

创建主程序文件,将任务函数加入队列,实现异步提交,不阻塞主程序运行:

# main.py 主程序文件
from redis import Redis
from rq import Queue
from tasks import simple_task, calculate_sum

# 连接 Redis
redis_conn = Redis(host='localhost', port=6379, db=0)
queue = Queue(connection=redis_conn)

# 提交任务到队列,非阻塞执行
job1 = queue.enqueue(simple_task, "Python开发者")
job2 = queue.enqueue(calculate_sum, 10, 20)

# 输出任务ID,用于后续查询
print(f"任务1 ID: {job1.id}")
print(f"任务2 ID: {job2.id}")
print("主程序继续执行,无需等待任务完成")

代码说明:queue.enqueue() 是核心提交方法,第一个参数为任务函数,后续为函数参数,调用后立即返回任务对象,主程序不会等待任务执行,实现异步解耦。

3.3 启动 rq 工作进程执行任务

任务提交到 Redis 后,需要启动工作进程消费队列任务,打开新的命令行窗口,进入项目目录,执行:

rq worker

执行后工作进程会持续监听 Redis 队列,一旦有新任务就立即执行,输出任务执行日志,执行完成后返回结果。

3.4 查看任务执行状态与结果

rq 提供丰富的任务状态查询方法,可在主程序中获取任务是否完成、结果、失败原因:

# result_check.py 任务结果查询
from redis import Redis
from rq import Queue
from rq.job import Job

redis_conn = Redis(host='localhost', port=6379, db=0)
queue = Queue(connection=redis_conn)

# 通过任务ID获取任务
job = Job.fetch('你的任务ID', connection=redis_conn)

# 查询任务状态
print(f"任务是否执行完成: {job.is_finished}")
print(f"任务是否执行失败: {job.is_failed}")
print(f"任务执行结果: {job.result}")
print(f"任务执行状态: {job.get_status()}")

任务状态分为 queued(排队中)、started(执行中)、finished(已完成)、failed(执行失败),可根据状态做后续业务处理。

四、rq 进阶功能使用

4.1 多队列管理

rq 支持创建多个队列,分类处理不同类型任务,避免任务阻塞:

# multi_queue.py
from redis import Redis
from rq import Queue
from tasks import simple_task, calculate_sum

redis_conn = Redis(host='localhost', port=6379, db=0)

# 创建不同优先级/类型的队列
high_queue = Queue('high', connection=redis_conn)
low_queue = Queue('low', connection=redis_conn)

# 向指定队列提交任务
high_queue.enqueue(calculate_sum, 100, 200)
low_queue.enqueue(simple_task, "普通用户")

启动工作进程时可指定监听队列:

rq worker high low

4.2 任务延迟执行

rq 支持设置任务延迟执行时间,单位为秒,满足定时异步任务需求:

# 延迟5秒执行任务
job = queue.enqueue(simple_task, "延迟任务", delay=5)

代码说明:delay 参数指定任务提交后,等待指定秒数再被工作进程执行,适用于延迟通知、延迟审核等场景。

4.3 任务失败重试与异常处理

为任务设置失败重试次数、重试间隔,提升任务执行稳定性:

# retry_task.py 带重试的任务
from rq import Retry

# 最多重试3次,每次间隔2秒
retry_strategy = Retry(max=3, interval=2)
job = queue.enqueue(simple_task, "重试任务", retry=retry_strategy)

同时可在任务函数中捕获异常,记录失败原因:

def error_task():
    try:
        # 可能出错的逻辑
        1 / 0
    except Exception as e:
        print(f"任务执行异常: {str(e)}")
        raise  # 抛出异常让rq标记任务失败

4.4 清空队列与删除任务

运维场景中可清空队列、删除指定任务,避免无效任务堆积:

# 清空当前队列所有任务
queue.empty()

# 删除指定任务
job.delete()

五、rq 实际业务场景案例

5.1 案例一:异步发送邮件

实际项目中,发送邮件是耗时操作,用 rq 异步处理可提升接口响应速度:

# email_task.py
import time
import smtplib
from email.mime.text import MIMEText

def send_async_email(to_email, content):
    """异步发送邮件任务"""
    try:
        # 模拟邮件发送(实际项目替换为真实邮件配置)
        time.sleep(3)
        msg = MIMEText(content, 'plain', 'utf-8')
        msg['From'] = '[email protected]'
        msg['To'] = to_email
        msg['Subject'] = '异步邮件通知'

        # 模拟发送成功
        print(f"邮件已发送至: {to_email}")
        return True
    except Exception as e:
        print(f"邮件发送失败: {str(e)}")
        return False

提交邮件任务:

# 提交异步邮件任务,不阻塞主程序
queue.enqueue(send_async_email, "[email protected]", "您的订单已支付成功")

5.2 案例二:异步生成报表

大数据量报表生成耗时较长,通过 rq 后台执行,生成完成后通知用户:

# report_task.py
import time
import pandas as pd

def generate_excel_report(data_list, save_path):
    """异步生成Excel报表"""
    time.sleep(5)  # 模拟数据处理耗时
    df = pd.DataFrame(data_list)
    df.to_excel(save_path, index=False)
    return f"报表生成完成,保存路径: {save_path}"

提交报表任务:

data = [{'name': '张三', 'score': 90}, {'name': '李四', 'score': 85}]
queue.enqueue(generate_excel_report, data, "./report.xlsx")

5.3 完整项目运行流程

  1. 启动 Redis 服务:redis-server
  2. 定义任务函数到 tasks.py
  3. 主程序提交任务到队列
  4. 启动工作进程:rq worker
  5. 查看任务执行状态与结果
  6. 业务逻辑根据任务结果做后续处理

该流程可直接应用于 Web 项目、自动化脚本、数据分析工具中,解决同步任务阻塞问题。

六、相关资源

  • Pypi地址:https://pypi.org/project/rq/
  • Github地址:https://github.com/rq/rq
  • 官方文档地址:https://python-rq.org/

关注我,每天分享一个实用的Python自动化工具。