一、arq库基础认知
1.1 库核心用途
arq是一款基于Python asyncio与Redis开发的异步任务队列库,专注处理异步、延迟、后台耗时任务,完美适配异步Web框架,是异步架构中任务调度、异步解耦的核心工具。

1.2 工作原理
arq以Redis为中间件存储任务与状态,生产者通过异步接口投递任务,Worker异步监听Redis队列,自动拉取并执行任务,全程基于asyncio实现非阻塞运行,任务执行、重试、结果存储都依托Redis高效完成。
1.3 优缺点
优点:纯异步非阻塞、轻量无冗余依赖、API简洁易上手、兼容FastAPI/Starlette等异步框架、支持任务重试与延迟执行、性能优异。
缺点:仅支持Redis作为后端、功能比Celery精简、不支持多消息队列、复杂任务调度能力较弱。
1.4 License类型
MIT License,开源免费可商用。
二、arq环境安装与基础配置
2.1 安装arq与依赖
arq核心依赖Redis,安装命令:
pip install arq redis安装完成后可通过命令验证版本:
arq --version确保本地或远程Redis服务正常运行,arq默认连接本地127.0.0.1:6379无密码Redis实例。
2.2 基础连接配置
arq支持自定义Redis连接参数,包括主机、端口、密码、数据库索引,创建基础配置文件:
# redis_config.py
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
# 自定义Redis连接配置
redis_settings = RedisSettings(
host="127.0.0.1",
port=6379,
password="", # 有密码则填写
database=0,
timeout=5
)
# 测试Redis连接
async def test_redis_connection():
redis = await create_pool(redis_settings)
print("Redis连接成功")
await redis.close()
if __name__ == "__main__":
asyncio.run(test_redis_connection())代码说明:通过RedisSettings定义连接参数,create_pool创建异步连接池,实现arq与Redis的基础通信。
三、arq核心使用方式与代码实例
3.1 定义基础异步任务
arq任务必须是异步函数,这是其核心特性,创建任务文件:
# tasks.py
import asyncio
from arq import ArqRedis
# 基础异步任务
async def simple_task(ctx: ArqRedis, content: str) -> str:
"""
简单异步任务
:param ctx: 任务上下文,包含Redis连接等信息
:param content: 任务传入参数
:return: 任务执行结果
"""
print(f"开始执行简单任务:{content}")
# 模拟异步耗时操作
await asyncio.sleep(2)
result = f"任务执行完成,内容:{content}"
print(result)
return result
# 任务注册:Worker启动时加载的任务列表
async def startup(ctx):
"""Worker启动时执行的钩子函数"""
print("arq Worker启动成功")
async def shutdown(ctx):
"""Worker关闭时执行的钩子函数"""
print("arq Worker关闭成功")
# Worker配置类
class WorkerSettings:
# 注册可执行的任务函数
functions = [simple_task]
# 启动与关闭钩子
on_startup = startup
on_shutdown = shutdown
# 绑定Redis配置
redis_settings = RedisSettings(host="127.0.0.1", port=6379)代码说明:定义异步任务函数,通过WorkerSettings注册任务,配置启动关闭钩子,Worker会自动加载注册的任务。
3.2 启动arq Worker
Worker是任务消费者,负责监听队列并执行任务,命令行启动:
arq tasks.WorkerSettings启动成功后会输出:arq Worker启动成功,持续监听Redis任务队列。
3.3 投递任务到arq队列
创建生产者脚本,异步投递任务到arq:
# producer.py
import asyncio
from arq import create_pool
from tasks import redis_settings
# 投递任务
async def enqueue_task():
# 创建Redis连接池
redis = await create_pool(redis_settings)
# 投递任务:任务函数名 + 参数
job = await redis.enqueue_job("simple_task", "Hello arq异步任务队列")
# 获取任务ID
job_id = job.job_id
print(f"任务投递成功,任务ID:{job_id}")
# 等待任务执行完成并获取结果
job_result = await job.result(timeout=10)
print(f"任务执行结果:{job_result}")
await redis.close()
if __name__ == "__main__":
asyncio.run(enqueue_task())代码说明:通过enqueue_job投递任务,传入任务函数名与参数,可通过job.result()同步等待任务结果,适用于需要获取返回值的场景。
运行producer.py,Worker端会打印任务执行信息,生产者输出:
任务投递成功,任务ID:xxxxxxxx
任务执行结果:任务执行完成,内容:Hello arq异步任务队列3.4 延迟任务与定时任务
arq支持延迟执行任务,指定延迟秒数:
# 延迟任务:5秒后执行
async def delay_task(ctx: ArqRedis, msg: str):
await asyncio.sleep(1)
print(f"延迟任务执行:{msg}")
return f"延迟任务结果:{msg}"
# 在tasks.py的WorkerSettings中添加任务
functions = [simple_task, delay_task]投递延迟任务:
# producer.py中添加
async def enqueue_delay_task():
redis = await create_pool(redis_settings)
# 延迟5秒执行
job = await redis.enqueue_job("delay_task", "5秒后执行的延迟任务", _defer_by=5)
print(f"延迟任务投递成功,任务ID:{job.job_id}")
await redis.close()代码说明:_defer_by参数指定延迟秒数,arq会自动计算执行时间,到期后执行任务。
3.5 任务重试机制
arq内置任务重试功能,应对任务执行失败场景,配置重试次数与延迟:
# 可重试任务
async def retry_task(ctx: ArqRedis):
print("执行可重试任务")
# 模拟任务执行失败
raise Exception("任务执行异常,触发重试")
# 自定义任务配置
retry_task.arq_kwargs = {
"max_tries": 3, # 最大重试次数
"retry_delay": 2 # 重试间隔2秒
}在WorkerSettings中注册retry_task,投递任务后,Worker会自动重试3次,适合接口调用、数据同步等不稳定任务。
3.6 获取任务状态与结果
arq支持查询任务状态、结果、异常信息:
async def get_task_status(job_id: str):
redis = await create_pool(redis_settings)
# 获取任务对象
job = await redis.job(job_id)
if not job:
print("任务不存在")
return
# 获取任务状态
status = await job.status()
print(f"任务状态:{status}")
# 获取任务结果(未完成会抛出异常)
try:
result = await job.result()
print(f"任务结果:{result}")
except Exception as e:
print(f"任务未完成或异常:{e}")
await redis.close()任务状态包括:pending(等待中)、running(执行中)、complete(完成)、failed(失败)。
四、arq与FastAPI集成实战案例
4.1 集成背景
FastAPI是主流异步Web框架,与arq天然兼容,可实现Web接口异步处理耗时任务,如发送邮件、生成报表、数据爬取等。
4.2 集成代码实现
# fastapi_arq.py
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings
import asyncio
app = FastAPI(title="arq+FastAPI异步任务实战")
# Redis配置
redis_settings = RedisSettings(host="127.0.0.1", port=6379)
# 定义异步任务
async def send_email_task(ctx, email: str, content: str):
"""模拟异步发送邮件任务"""
await asyncio.sleep(3)
print(f"向{email}发送邮件:{content}")
return f"邮件发送成功:{email}"
# Worker配置
class WorkerSettings:
functions = [send_email_task]
redis_settings = redis_settings
# 应用启动时创建arq连接池
@app.on_event("startup")
async def startup_event():
app.state.arq_redis = await create_pool(redis_settings)
# 应用关闭时关闭连接
@app.on_event("shutdown")
async def shutdown_event():
await app.state.arq_redis.close()
# 接口:投递发送邮件任务
@app.post("/send-email")
async def send_email(email: str, content: str):
job = await app.state.arq_redis.enqueue_job("send_email_task", email, content)
return {
"code": 200,
"msg": "邮件任务投递成功",
"job_id": job.job_id
}
# 接口:查询任务状态
@app.get("/task-status/{job_id}")
async def get_task_status(job_id: str):
job = await app.state.arq_redis.job(job_id)
if not job:
return {"code": 404, "msg": "任务不存在"}
status = await job.status()
result = None
if status == "complete":
result = await job.result()
return {
"code": 200,
"job_id": job_id,
"status": status,
"result": result
}4.3 启动与访问
- 启动arq Worker:
arq fastapi_arq.WorkerSettings- 启动FastAPI服务:
uvicorn fastapi_arq:app --reload- 访问接口:
- 投递任务:
POST http://127.0.0.1:8000/[email protected]&content=测试邮件 - 查询任务:
GET http://127.0.0.1:8000/task-status/任务ID
该案例实现了Web接口与异步任务解耦,用户请求无需等待耗时任务完成,提升接口响应速度与系统并发能力。
五、arq高级特性与实际场景优化
5.1 任务结果过期设置
arq默认永久存储任务结果,可配置过期时间释放Redis空间:
# 任务结果1小时后过期
job = await redis.enqueue_job("simple_task", "测试过期", _expires=3600)5.2 多任务队列隔离
arq支持自定义队列名称,实现不同业务任务隔离:
# 投递到订单队列
job = await redis.enqueue_job("order_task", "订单数据", _queue="order_queue")
# Worker监听指定队列
class WorkerSettings:
queue_name = "order_queue"
functions = [order_task]5.3 任务并发控制
调整Worker并发数,适配不同服务器性能:
class WorkerSettings:
functions = [simple_task, delay_task]
redis_settings = redis_settings
max_jobs = 10 # 最大并发执行10个任务相关资源
- Pypi地址:https://pypi.org/project/arq
- Github地址:https://github.com/samuelcolvin/arq
- 官方文档地址:https://arq-docs.helpmanual.io/
关注我,每天分享一个实用的Python自动化工具。

