一、Celery 库核心概述
Celery 是一款基于 Python 开发的分布式任务队列库,主要用于处理异步任务、定时任务和分布式任务,常应用于 Web 开发、数据分析、自动化运维等场景。其工作原理是通过生产者将任务发送到消息中间件(如 RabbitMQ、Redis),消费者(Worker)从中间件获取任务并执行,结果可存储在后端存储(如 Redis、数据库)中。

Celery 的优点包括轻量级、高可用、支持多种消息中间件和结果存储、可水平扩展;缺点是需要依赖第三方消息中间件,初次配置有一定门槛。该库采用 BSD 3-Clause 许可证,开源且可自由用于商业项目。
二、Celery 安装与环境准备
2.1 安装 Celery
Celery 可通过 pip 直接安装,同时需要根据选择的消息中间件安装对应的依赖库。以最常用的 Redis 为例,安装命令如下:
# 安装 Celery 核心库
pip install celery
# 安装 Redis 依赖(用于消息中间件和结果存储)
pip install redis如果选择 RabbitMQ 作为消息中间件,则需要安装对应的依赖:
pip install celery[librabbitmq]2.2 消息中间件选择与配置
Celery 本身不提供消息存储功能,必须依赖第三方消息中间件,常见的选择有两种:
- Redis:轻量级、高性能,适合中小型项目和开发环境,配置简单。
- RabbitMQ:专业的消息队列,可靠性更高,适合大型生产环境。
本教程以 Redis 为例进行演示,在使用前需要确保本地或服务器已安装并启动 Redis 服务。
三、Celery 核心组件与基础使用
3.1 Celery 核心组件
在使用 Celery 之前,需要先了解其三大核心组件:
- 生产者(Producer):负责创建任务并发送到消息队列,通常是我们的 Python 脚本或 Web 应用。
- 消费者(Worker):负责监听消息队列并执行任务,一个 Celery 系统可以有多个 Worker 实现分布式部署。
- 结果后端(Result Backend):用于存储任务的执行结果,可选 Redis、数据库、文件等方式。
3.2 第一个 Celery 应用:创建异步任务
我们先从最简单的异步任务开始,创建一个 Celery 实例并定义任务。
步骤1:创建 Celery 实例
新建一个名为 celery_app.py 的文件,内容如下:
# celery_app.py
from celery import Celery
# 初始化 Celery 应用
# 参数1:应用名称,可自定义
# broker:消息中间件的地址,这里使用 Redis
# backend:结果后端的地址,这里使用 Redis 存储任务结果
app = Celery(
'first_celery_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
# 定义异步任务:计算两个数的和
@app.task
def add(x, y):
return x + y
# 定义异步任务:生成指定长度的随机字符串
import random
import string
@app.task
def generate_random_string(length):
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))代码说明:
Celery()函数用于创建应用实例,broker参数指定消息中间件地址,backend指定结果存储地址,redis://localhost:6379/0表示使用本地 Redis 的第 0 个数据库。@app.task装饰器用于将普通函数转换为 Celery 异步任务,被装饰的函数可以被异步调用。
步骤2:启动 Celery Worker
Celery Worker 是执行任务的进程,需要在命令行中启动。打开终端,进入 celery_app.py 所在的目录,执行以下命令:
# 启动 Worker,指定应用模块
# --loglevel=info 表示输出 info 级别日志,方便查看任务执行情况
celery -A celery_app worker --loglevel=info启动成功标志:终端会输出类似 celery@xxx ready. 的信息,此时 Worker 已经开始监听消息队列中的任务。
步骤3:调用异步任务
新建一个名为 task_producer.py 的文件,作为任务生产者调用上面定义的异步任务:
# task_producer.py
from celery_app import add, generate_random_string
# 异步调用任务:delay() 方法会将任务发送到消息队列
# 该方法会立即返回一个 AsyncResult 对象,不会阻塞当前进程
result1 = add.delay(10, 20)
result2 = generate_random_string.delay(10)
# 打印任务 ID,用于查询任务状态和结果
print(f"任务1 ID: {result1.id}")
print(f"任务2 ID: {result2.id}")
# 检查任务是否执行完成
print(f"任务1是否完成: {result1.ready()}")
print(f"任务2是否完成: {result2.ready()}")
# 获取任务执行结果(如果任务未完成,会阻塞直到任务完成)
print(f"任务1执行结果: {result1.get()}")
print(f"任务2执行结果: {result2.get()}")
# 获取任务执行状态
print(f"任务1执行状态: {result1.state}")
print(f"任务2执行状态: {result2.state}")代码说明:
delay()是 Celery 任务的异步调用方法,它会将任务参数序列化后发送到消息队列,然后立即返回AsyncResult对象。ready()方法用于判断任务是否执行完成,返回布尔值。get()方法用于获取任务执行结果,如果任务未完成,调用该方法会阻塞当前进程直到任务完成;也可以通过timeout参数设置超时时间。state属性返回任务当前的状态,常见状态有PENDING(等待中)、STARTED(执行中)、SUCCESS(执行成功)、FAILURE(执行失败)。
运行 task_producer.py,输出示例如下:
任务1 ID: 8f9d6b7e-5a3b-4c1e-9f2a-1b2c3d4e5f6g
任务2 ID: 9a8b7c6d-4e3f-2g1h-0i9j-8k7l6m5n4o3p
任务1是否完成: False
任务2是否完成: False
任务1执行结果: 30
任务2执行结果: xY7kP2qR9t
任务1执行状态: SUCCESS
任务2执行状态: SUCCESS3.3 处理任务执行异常
在实际应用中,任务执行可能会出现异常,Celery 提供了完善的异常处理机制。我们修改 celery_app.py,添加一个可能抛出异常的任务:
# celery_app.py
from celery import Celery
app = Celery(
'first_celery_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
@app.task
def divide(x, y):
# 定义除法任务,当 y 为 0 时会抛出 ZeroDivisionError
return x / y然后创建 error_handler.py 调用该任务:
# error_handler.py
from celery_app import divide
# 调用除法任务,故意传入 y=0
result = divide.delay(10, 0)
try:
# 获取结果时如果任务抛出异常,会重新抛出该异常
print(result.get())
except ZeroDivisionError as e:
print(f"任务执行失败,异常信息: {e}")
# 也可以通过 result.failed() 判断任务是否执行失败
print(f"任务是否失败: {result.failed()}")
# 获取异常信息
print(f"任务异常信息: {result.result}")代码说明:
- 当任务执行抛出异常时,
get()方法会重新抛出该异常,我们可以通过 try-except 捕获并处理。 failed()方法用于判断任务是否执行失败,返回布尔值。result.result属性会返回任务抛出的异常对象。
四、Celery 定时任务(Periodic Tasks)
除了异步任务,Celery 还支持定时任务,类似于 Linux 的 crontab 或 Windows 的任务计划程序。我们可以通过配置定时任务,让 Celery 自动周期性执行指定任务。
4.1 配置定时任务
修改 celery_app.py,添加定时任务配置:
# celery_app.py
from celery import Celery
from celery.schedules import crontab
app = Celery(
'periodic_task_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
# 配置定时任务
app.conf.beat_schedule = {
# 定时任务名称,可自定义
'add-every-10-seconds': {
# 指定要执行的任务
'task': 'celery_app.add',
# 执行周期:每 10 秒执行一次
'schedule': 10.0,
# 任务参数
'args': (100, 200)
},
# 另一个定时任务:每天凌晨 2 点执行
'generate-string-every-day': {
'task': 'celery_app.generate_random_string',
# 使用 crontab 表达式配置复杂周期
'schedule': crontab(hour=2, minute=0),
'args': (20,)
}
}
# 定义任务
@app.task
def add(x, y):
result = x + y
print(f"定时任务执行:{x} + {y} = {result}")
return result
@app.task
def generate_random_string(length):
import random
import string
chars = string.ascii_letters + string.digits
result = ''.join(random.choice(chars) for _ in range(length))
print(f"定时任务生成随机字符串:{result}")
return result代码说明:
app.conf.beat_schedule用于配置定时任务,每个键值对对应一个定时任务。schedule参数可以是数字(表示秒数),也可以是crontab对象(用于配置复杂周期,如每天、每周、每月执行)。crontab(hour=2, minute=0)表示每天凌晨 2 点执行任务,更多 crontab 表达式用法可参考 Celery 官方文档。
4.2 启动定时任务调度器(Beat)
定时任务需要两个进程配合:
- Beat 进程:负责按照配置的周期生成定时任务,并发送到消息队列。
- Worker 进程:负责执行定时任务。
首先启动 Beat 进程:
celery -A celery_app beat --loglevel=info然后启动 Worker 进程(新打开一个终端):
celery -A celery_app worker --loglevel=info启动成功后,Beat 进程会每 10 秒生成一个 add 任务发送到队列,Worker 进程会执行该任务并输出结果;每天凌晨 2 点会生成一个随机字符串任务。
五、Celery 任务进阶功能
5.1 任务优先级
在任务量较大时,我们可以为任务设置优先级,让重要的任务优先执行。Celery 支持为任务和 Worker 设置优先级,以 Redis 作为消息中间件为例,配置方法如下:
# celery_app.py
from celery import Celery
app = Celery(
'priority_task_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
# 配置任务优先级
app.conf.task_queue_max_priority = 10 # 最大优先级为 10,数值越大优先级越高
@app.task(priority=5)
def normal_task():
return "这是一个普通优先级任务"
@app.task(priority=10)
def high_priority_task():
return "这是一个高优先级任务"启动 Worker 时,需要指定优先级支持:
celery -A celery_app worker --loglevel=info --priority=10调用任务时,高优先级任务会优先被 Worker 执行。
5.2 任务重试
当任务执行失败时,我们可以通过配置让 Celery 自动重试任务。修改任务定义,添加重试参数:
# celery_app.py
from celery import Celery
from celery.exceptions import Retry
app = Celery(
'retry_task_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
@app.task(bind=True, max_retries=3, retry_backoff=2, retry_jitter=False)
def retry_divide(self, x, y):
"""
bind=True:将任务实例自身作为第一个参数传入
max_retries:最大重试次数
retry_backoff:重试间隔时间(秒),每次重试间隔会乘以 2(指数退避)
retry_jitter:是否添加随机抖动,False 表示固定间隔
"""
try:
return x / y
except ZeroDivisionError as e:
# 抛出 Retry 异常触发重试
self.retry(exc=e, countdown=5) # countdown 表示 5 秒后重试代码说明:
bind=True是实现任务重试的关键,它让任务函数可以访问自身的属性和方法。self.retry()方法用于触发任务重试,exc参数指定要重试的异常,countdown参数指定重试间隔。max_retries=3表示最多重试 3 次,超过次数后任务状态会变为FAILURE。
六、Celery 实际应用案例:批量数据处理
在数据分析场景中,我们经常需要处理大量数据,使用 Celery 可以将数据分发给多个 Worker 并行处理,提升效率。以下是一个批量处理用户数据的案例。
6.1 案例需求
有一个包含 1000 条用户数据的列表,需要对每条数据进行清洗(去除空值、格式化日期),然后将清洗后的数据保存到 Redis 中。
6.2 实现代码
步骤1:定义任务和应用
# data_processing_app.py
from celery import Celery
import json
from datetime import datetime
app = Celery(
'data_processing_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
# 定义数据清洗任务
@app.task
def clean_user_data(user):
"""清洗单条用户数据"""
# 去除空值字段
cleaned_user = {k: v for k, v in user.items() if v is not None and v != ""}
# 格式化注册日期
if 'register_date' in cleaned_user:
try:
cleaned_user['register_date'] = datetime.strptime(
cleaned_user['register_date'],
'%Y-%m-%d'
).strftime('%Y/%m/%d')
except ValueError:
cleaned_user['register_date'] = '无效日期'
# 返回清洗后的数据
return cleaned_user
# 定义批量保存任务
@app.task
def batch_save_to_redis(cleaned_users):
"""批量保存清洗后的数据到 Redis"""
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 将数据序列化为 JSON 字符串
r.set('cleaned_user_data', json.dumps(cleaned_users))
return f"成功保存 {len(cleaned_users)} 条用户数据"步骤2:生成测试数据并调用任务
# data_producer.py
from data_processing_app import clean_user_data, batch_save_to_redis
import random
# 生成 1000 条测试用户数据
def generate_test_data():
users = []
for i in range(1000):
user = {
'id': i + 1,
'name': f'User_{i + 1}',
'age': random.randint(18, 60) if random.random() > 0.1 else None,
'register_date': f'202{random.randint(3, 5)}-{random.randint(1, 12)}-{random.randint(1, 28)}' if random.random() > 0.2 else ''
}
users.append(user)
return users
if __name__ == '__main__':
# 生成测试数据
test_users = generate_test_data()
# 异步调用清洗任务,处理每条数据
clean_tasks = [clean_user_data.delay(user) for user in test_users]
# 等待所有清洗任务完成,并收集结果
cleaned_users = [task.get() for task in clean_tasks]
# 调用批量保存任务
save_result = batch_save_to_redis.delay(cleaned_users)
print(save_result.get())步骤3:启动 Worker 并运行
启动 Worker 进程:
celery -A data_processing_app worker --loglevel=info --concurrency=4--concurrency=4 表示启动 4 个并发 Worker 进程,可根据 CPU 核心数调整。
运行 data_producer.py,程序会生成 1000 条测试数据,将每条数据的清洗任务发送到队列,Worker 并行处理后,再批量保存到 Redis 中。
七、Celery 相关资源
- Pypi地址:https://pypi.org/project/Celery
- Github地址:https://github.com/celery/celery
- 官方文档地址:https://docs.celeryq.dev/en/stable/
关注我,每天分享一个实用的Python自动化工具。

