Python异步任务队列神器:arq从入门到实战完全指南

一、arq库基础认知

1.1 库核心用途

arq是一款基于Python asyncio与Redis开发的异步任务队列库,专注处理异步、延迟、后台耗时任务,完美适配异步Web框架,是异步架构中任务调度、异步解耦的核心工具。

1.2 工作原理

arq以Redis为中间件存储任务与状态,生产者通过异步接口投递任务,Worker异步监听Redis队列,自动拉取并执行任务,全程基于asyncio实现非阻塞运行,任务执行、重试、结果存储都依托Redis高效完成。

1.3 优缺点

优点:纯异步非阻塞、轻量无冗余依赖、API简洁易上手、兼容FastAPI/Starlette等异步框架、支持任务重试与延迟执行、性能优异。
缺点:仅支持Redis作为后端、功能比Celery精简、不支持多消息队列、复杂任务调度能力较弱。

1.4 License类型

MIT License,开源免费可商用。

二、arq环境安装与基础配置

2.1 安装arq与依赖

arq核心依赖Redis,安装命令:

pip install arq redis

安装完成后可通过命令验证版本:

arq --version

确保本地或远程Redis服务正常运行,arq默认连接本地127.0.0.1:6379无密码Redis实例。

2.2 基础连接配置

arq支持自定义Redis连接参数,包括主机、端口、密码、数据库索引,创建基础配置文件:

# redis_config.py
import asyncio
from arq import create_pool
from arq.connections import RedisSettings

# 自定义Redis连接配置
redis_settings = RedisSettings(
    host="127.0.0.1",
    port=6379,
    password="",  # 有密码则填写
    database=0,
    timeout=5
)

# 测试Redis连接
async def test_redis_connection():
    redis = await create_pool(redis_settings)
    print("Redis连接成功")
    await redis.close()

if __name__ == "__main__":
    asyncio.run(test_redis_connection())

代码说明:通过RedisSettings定义连接参数,create_pool创建异步连接池,实现arq与Redis的基础通信。

三、arq核心使用方式与代码实例

3.1 定义基础异步任务

arq任务必须是异步函数,这是其核心特性,创建任务文件:

# tasks.py
import asyncio
from arq import ArqRedis

# 基础异步任务
async def simple_task(ctx: ArqRedis, content: str) -> str:
    """
    简单异步任务
    :param ctx: 任务上下文,包含Redis连接等信息
    :param content: 任务传入参数
    :return: 任务执行结果
    """
    print(f"开始执行简单任务:{content}")
    # 模拟异步耗时操作
    await asyncio.sleep(2)
    result = f"任务执行完成,内容:{content}"
    print(result)
    return result

# 任务注册:Worker启动时加载的任务列表
async def startup(ctx):
    """Worker启动时执行的钩子函数"""
    print("arq Worker启动成功")

async def shutdown(ctx):
    """Worker关闭时执行的钩子函数"""
    print("arq Worker关闭成功")

# Worker配置类
class WorkerSettings:
    # 注册可执行的任务函数
    functions = [simple_task]
    # 启动与关闭钩子
    on_startup = startup
    on_shutdown = shutdown
    # 绑定Redis配置
    redis_settings = RedisSettings(host="127.0.0.1", port=6379)

代码说明:定义异步任务函数,通过WorkerSettings注册任务,配置启动关闭钩子,Worker会自动加载注册的任务。

3.2 启动arq Worker

Worker是任务消费者,负责监听队列并执行任务,命令行启动:

arq tasks.WorkerSettings

启动成功后会输出:arq Worker启动成功,持续监听Redis任务队列。

3.3 投递任务到arq队列

创建生产者脚本,异步投递任务到arq:

# producer.py
import asyncio
from arq import create_pool
from tasks import redis_settings

# 投递任务
async def enqueue_task():
    # 创建Redis连接池
    redis = await create_pool(redis_settings)
    # 投递任务:任务函数名 + 参数
    job = await redis.enqueue_job("simple_task", "Hello arq异步任务队列")
    # 获取任务ID
    job_id = job.job_id
    print(f"任务投递成功,任务ID:{job_id}")

    # 等待任务执行完成并获取结果
    job_result = await job.result(timeout=10)
    print(f"任务执行结果:{job_result}")

    await redis.close()

if __name__ == "__main__":
    asyncio.run(enqueue_task())

代码说明:通过enqueue_job投递任务,传入任务函数名与参数,可通过job.result()同步等待任务结果,适用于需要获取返回值的场景。

运行producer.py,Worker端会打印任务执行信息,生产者输出:

任务投递成功,任务ID:xxxxxxxx
任务执行结果:任务执行完成,内容:Hello arq异步任务队列

3.4 延迟任务与定时任务

arq支持延迟执行任务,指定延迟秒数:

# 延迟任务:5秒后执行
async def delay_task(ctx: ArqRedis, msg: str):
    await asyncio.sleep(1)
    print(f"延迟任务执行:{msg}")
    return f"延迟任务结果:{msg}"

# 在tasks.py的WorkerSettings中添加任务
functions = [simple_task, delay_task]

投递延迟任务:

# producer.py中添加
async def enqueue_delay_task():
    redis = await create_pool(redis_settings)
    # 延迟5秒执行
    job = await redis.enqueue_job("delay_task", "5秒后执行的延迟任务", _defer_by=5)
    print(f"延迟任务投递成功,任务ID:{job.job_id}")
    await redis.close()

代码说明:_defer_by参数指定延迟秒数,arq会自动计算执行时间,到期后执行任务。

3.5 任务重试机制

arq内置任务重试功能,应对任务执行失败场景,配置重试次数与延迟:

# 可重试任务
async def retry_task(ctx: ArqRedis):
    print("执行可重试任务")
    # 模拟任务执行失败
    raise Exception("任务执行异常,触发重试")

# 自定义任务配置
retry_task.arq_kwargs = {
    "max_tries": 3,        # 最大重试次数
    "retry_delay": 2       # 重试间隔2秒
}

WorkerSettings中注册retry_task,投递任务后,Worker会自动重试3次,适合接口调用、数据同步等不稳定任务。

3.6 获取任务状态与结果

arq支持查询任务状态、结果、异常信息:

async def get_task_status(job_id: str):
    redis = await create_pool(redis_settings)
    # 获取任务对象
    job = await redis.job(job_id)
    if not job:
        print("任务不存在")
        return

    # 获取任务状态
    status = await job.status()
    print(f"任务状态:{status}")

    # 获取任务结果(未完成会抛出异常)
    try:
        result = await job.result()
        print(f"任务结果:{result}")
    except Exception as e:
        print(f"任务未完成或异常:{e}")

    await redis.close()

任务状态包括:pending(等待中)、running(执行中)、complete(完成)、failed(失败)。

四、arq与FastAPI集成实战案例

4.1 集成背景

FastAPI是主流异步Web框架,与arq天然兼容,可实现Web接口异步处理耗时任务,如发送邮件、生成报表、数据爬取等。

4.2 集成代码实现

# fastapi_arq.py
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings
import asyncio

app = FastAPI(title="arq+FastAPI异步任务实战")

# Redis配置
redis_settings = RedisSettings(host="127.0.0.1", port=6379)

# 定义异步任务
async def send_email_task(ctx, email: str, content: str):
    """模拟异步发送邮件任务"""
    await asyncio.sleep(3)
    print(f"向{email}发送邮件:{content}")
    return f"邮件发送成功:{email}"

# Worker配置
class WorkerSettings:
    functions = [send_email_task]
    redis_settings = redis_settings

# 应用启动时创建arq连接池
@app.on_event("startup")
async def startup_event():
    app.state.arq_redis = await create_pool(redis_settings)

# 应用关闭时关闭连接
@app.on_event("shutdown")
async def shutdown_event():
    await app.state.arq_redis.close()

# 接口:投递发送邮件任务
@app.post("/send-email")
async def send_email(email: str, content: str):
    job = await app.state.arq_redis.enqueue_job("send_email_task", email, content)
    return {
        "code": 200,
        "msg": "邮件任务投递成功",
        "job_id": job.job_id
    }

# 接口:查询任务状态
@app.get("/task-status/{job_id}")
async def get_task_status(job_id: str):
    job = await app.state.arq_redis.job(job_id)
    if not job:
        return {"code": 404, "msg": "任务不存在"}
    status = await job.status()
    result = None
    if status == "complete":
        result = await job.result()
    return {
        "code": 200,
        "job_id": job_id,
        "status": status,
        "result": result
    }

4.3 启动与访问

  1. 启动arq Worker:
arq fastapi_arq.WorkerSettings
  1. 启动FastAPI服务:
uvicorn fastapi_arq:app --reload
  1. 访问接口:
  • 投递任务:POST http://127.0.0.1:8000/[email protected]&content=测试邮件
  • 查询任务:GET http://127.0.0.1:8000/task-status/任务ID

该案例实现了Web接口与异步任务解耦,用户请求无需等待耗时任务完成,提升接口响应速度与系统并发能力。

五、arq高级特性与实际场景优化

5.1 任务结果过期设置

arq默认永久存储任务结果,可配置过期时间释放Redis空间:

# 任务结果1小时后过期
job = await redis.enqueue_job("simple_task", "测试过期", _expires=3600)

5.2 多任务队列隔离

arq支持自定义队列名称,实现不同业务任务隔离:

# 投递到订单队列
job = await redis.enqueue_job("order_task", "订单数据", _queue="order_queue")

# Worker监听指定队列
class WorkerSettings:
    queue_name = "order_queue"
    functions = [order_task]

5.3 任务并发控制

调整Worker并发数,适配不同服务器性能:

class WorkerSettings:
    functions = [simple_task, delay_task]
    redis_settings = redis_settings
    max_jobs = 10  # 最大并发执行10个任务

相关资源

  • Pypi地址:https://pypi.org/project/arq
  • Github地址:https://github.com/samuelcolvin/arq
  • 官方文档地址:https://arq-docs.helpmanual.io/

关注我,每天分享一个实用的Python自动化工具。