Python实用工具:Celery分布式任务队列入门与实战教程

一、Celery 库核心概述

Celery 是一款基于 Python 开发的分布式任务队列库,主要用于处理异步任务、定时任务和分布式任务,常应用于 Web 开发、数据分析、自动化运维等场景。其工作原理是通过生产者将任务发送到消息中间件(如 RabbitMQ、Redis),消费者(Worker)从中间件获取任务并执行,结果可存储在后端存储(如 Redis、数据库)中。

Celery 的优点包括轻量级、高可用、支持多种消息中间件和结果存储、可水平扩展;缺点是需要依赖第三方消息中间件,初次配置有一定门槛。该库采用 BSD 3-Clause 许可证,开源且可自由用于商业项目。

二、Celery 安装与环境准备

2.1 安装 Celery

Celery 可通过 pip 直接安装,同时需要根据选择的消息中间件安装对应的依赖库。以最常用的 Redis 为例,安装命令如下:

# 安装 Celery 核心库
pip install celery
# 安装 Redis 依赖(用于消息中间件和结果存储)
pip install redis

如果选择 RabbitMQ 作为消息中间件,则需要安装对应的依赖:

pip install celery[librabbitmq]

2.2 消息中间件选择与配置

Celery 本身不提供消息存储功能,必须依赖第三方消息中间件,常见的选择有两种:

  1. Redis:轻量级、高性能,适合中小型项目和开发环境,配置简单。
  2. RabbitMQ:专业的消息队列,可靠性更高,适合大型生产环境。

本教程以 Redis 为例进行演示,在使用前需要确保本地或服务器已安装并启动 Redis 服务。

三、Celery 核心组件与基础使用

3.1 Celery 核心组件

在使用 Celery 之前,需要先了解其三大核心组件:

  • 生产者(Producer):负责创建任务并发送到消息队列,通常是我们的 Python 脚本或 Web 应用。
  • 消费者(Worker):负责监听消息队列并执行任务,一个 Celery 系统可以有多个 Worker 实现分布式部署。
  • 结果后端(Result Backend):用于存储任务的执行结果,可选 Redis、数据库、文件等方式。

3.2 第一个 Celery 应用:创建异步任务

我们先从最简单的异步任务开始,创建一个 Celery 实例并定义任务。

步骤1:创建 Celery 实例

新建一个名为 celery_app.py 的文件,内容如下:

# celery_app.py
from celery import Celery

# 初始化 Celery 应用
# 参数1:应用名称,可自定义
# broker:消息中间件的地址,这里使用 Redis
# backend:结果后端的地址,这里使用 Redis 存储任务结果
app = Celery(
    'first_celery_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 定义异步任务:计算两个数的和
@app.task
def add(x, y):
    return x + y

# 定义异步任务:生成指定长度的随机字符串
import random
import string
@app.task
def generate_random_string(length):
    chars = string.ascii_letters + string.digits
    return ''.join(random.choice(chars) for _ in range(length))

代码说明

  • Celery() 函数用于创建应用实例,broker 参数指定消息中间件地址,backend 指定结果存储地址,redis://localhost:6379/0 表示使用本地 Redis 的第 0 个数据库。
  • @app.task 装饰器用于将普通函数转换为 Celery 异步任务,被装饰的函数可以被异步调用。

步骤2:启动 Celery Worker

Celery Worker 是执行任务的进程,需要在命令行中启动。打开终端,进入 celery_app.py 所在的目录,执行以下命令:

# 启动 Worker,指定应用模块
# --loglevel=info 表示输出 info 级别日志,方便查看任务执行情况
celery -A celery_app worker --loglevel=info

启动成功标志:终端会输出类似 celery@xxx ready. 的信息,此时 Worker 已经开始监听消息队列中的任务。

步骤3:调用异步任务

新建一个名为 task_producer.py 的文件,作为任务生产者调用上面定义的异步任务:

# task_producer.py
from celery_app import add, generate_random_string

# 异步调用任务:delay() 方法会将任务发送到消息队列
# 该方法会立即返回一个 AsyncResult 对象,不会阻塞当前进程
result1 = add.delay(10, 20)
result2 = generate_random_string.delay(10)

# 打印任务 ID,用于查询任务状态和结果
print(f"任务1 ID: {result1.id}")
print(f"任务2 ID: {result2.id}")

# 检查任务是否执行完成
print(f"任务1是否完成: {result1.ready()}")
print(f"任务2是否完成: {result2.ready()}")

# 获取任务执行结果(如果任务未完成,会阻塞直到任务完成)
print(f"任务1执行结果: {result1.get()}")
print(f"任务2执行结果: {result2.get()}")

# 获取任务执行状态
print(f"任务1执行状态: {result1.state}")
print(f"任务2执行状态: {result2.state}")

代码说明

  • delay() 是 Celery 任务的异步调用方法,它会将任务参数序列化后发送到消息队列,然后立即返回 AsyncResult 对象。
  • ready() 方法用于判断任务是否执行完成,返回布尔值。
  • get() 方法用于获取任务执行结果,如果任务未完成,调用该方法会阻塞当前进程直到任务完成;也可以通过 timeout 参数设置超时时间。
  • state 属性返回任务当前的状态,常见状态有 PENDING(等待中)、STARTED(执行中)、SUCCESS(执行成功)、FAILURE(执行失败)。

运行 task_producer.py,输出示例如下:

任务1 ID: 8f9d6b7e-5a3b-4c1e-9f2a-1b2c3d4e5f6g
任务2 ID: 9a8b7c6d-4e3f-2g1h-0i9j-8k7l6m5n4o3p
任务1是否完成: False
任务2是否完成: False
任务1执行结果: 30
任务2执行结果: xY7kP2qR9t
任务1执行状态: SUCCESS
任务2执行状态: SUCCESS

3.3 处理任务执行异常

在实际应用中,任务执行可能会出现异常,Celery 提供了完善的异常处理机制。我们修改 celery_app.py,添加一个可能抛出异常的任务:

# celery_app.py
from celery import Celery

app = Celery(
    'first_celery_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@app.task
def divide(x, y):
    # 定义除法任务,当 y 为 0 时会抛出 ZeroDivisionError
    return x / y

然后创建 error_handler.py 调用该任务:

# error_handler.py
from celery_app import divide

# 调用除法任务,故意传入 y=0
result = divide.delay(10, 0)

try:
    # 获取结果时如果任务抛出异常,会重新抛出该异常
    print(result.get())
except ZeroDivisionError as e:
    print(f"任务执行失败,异常信息: {e}")

# 也可以通过 result.failed() 判断任务是否执行失败
print(f"任务是否失败: {result.failed()}")
# 获取异常信息
print(f"任务异常信息: {result.result}")

代码说明

  • 当任务执行抛出异常时,get() 方法会重新抛出该异常,我们可以通过 try-except 捕获并处理。
  • failed() 方法用于判断任务是否执行失败,返回布尔值。
  • result.result 属性会返回任务抛出的异常对象。

四、Celery 定时任务(Periodic Tasks)

除了异步任务,Celery 还支持定时任务,类似于 Linux 的 crontab 或 Windows 的任务计划程序。我们可以通过配置定时任务,让 Celery 自动周期性执行指定任务。

4.1 配置定时任务

修改 celery_app.py,添加定时任务配置:

# celery_app.py
from celery import Celery
from celery.schedules import crontab

app = Celery(
    'periodic_task_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 配置定时任务
app.conf.beat_schedule = {
    # 定时任务名称,可自定义
    'add-every-10-seconds': {
        # 指定要执行的任务
        'task': 'celery_app.add',
        # 执行周期:每 10 秒执行一次
        'schedule': 10.0,
        # 任务参数
        'args': (100, 200)
    },
    # 另一个定时任务:每天凌晨 2 点执行
    'generate-string-every-day': {
        'task': 'celery_app.generate_random_string',
        # 使用 crontab 表达式配置复杂周期
        'schedule': crontab(hour=2, minute=0),
        'args': (20,)
    }
}

# 定义任务
@app.task
def add(x, y):
    result = x + y
    print(f"定时任务执行:{x} + {y} = {result}")
    return result

@app.task
def generate_random_string(length):
    import random
    import string
    chars = string.ascii_letters + string.digits
    result = ''.join(random.choice(chars) for _ in range(length))
    print(f"定时任务生成随机字符串:{result}")
    return result

代码说明

  • app.conf.beat_schedule 用于配置定时任务,每个键值对对应一个定时任务。
  • schedule 参数可以是数字(表示秒数),也可以是 crontab 对象(用于配置复杂周期,如每天、每周、每月执行)。
  • crontab(hour=2, minute=0) 表示每天凌晨 2 点执行任务,更多 crontab 表达式用法可参考 Celery 官方文档。

4.2 启动定时任务调度器(Beat)

定时任务需要两个进程配合:

  1. Beat 进程:负责按照配置的周期生成定时任务,并发送到消息队列。
  2. Worker 进程:负责执行定时任务。

首先启动 Beat 进程:

celery -A celery_app beat --loglevel=info

然后启动 Worker 进程(新打开一个终端):

celery -A celery_app worker --loglevel=info

启动成功后,Beat 进程会每 10 秒生成一个 add 任务发送到队列,Worker 进程会执行该任务并输出结果;每天凌晨 2 点会生成一个随机字符串任务。

五、Celery 任务进阶功能

5.1 任务优先级

在任务量较大时,我们可以为任务设置优先级,让重要的任务优先执行。Celery 支持为任务和 Worker 设置优先级,以 Redis 作为消息中间件为例,配置方法如下:

# celery_app.py
from celery import Celery

app = Celery(
    'priority_task_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 配置任务优先级
app.conf.task_queue_max_priority = 10  # 最大优先级为 10,数值越大优先级越高

@app.task(priority=5)
def normal_task():
    return "这是一个普通优先级任务"

@app.task(priority=10)
def high_priority_task():
    return "这是一个高优先级任务"

启动 Worker 时,需要指定优先级支持:

celery -A celery_app worker --loglevel=info --priority=10

调用任务时,高优先级任务会优先被 Worker 执行。

5.2 任务重试

当任务执行失败时,我们可以通过配置让 Celery 自动重试任务。修改任务定义,添加重试参数:

# celery_app.py
from celery import Celery
from celery.exceptions import Retry

app = Celery(
    'retry_task_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@app.task(bind=True, max_retries=3, retry_backoff=2, retry_jitter=False)
def retry_divide(self, x, y):
    """
    bind=True:将任务实例自身作为第一个参数传入
    max_retries:最大重试次数
    retry_backoff:重试间隔时间(秒),每次重试间隔会乘以 2(指数退避)
    retry_jitter:是否添加随机抖动,False 表示固定间隔
    """
    try:
        return x / y
    except ZeroDivisionError as e:
        # 抛出 Retry 异常触发重试
        self.retry(exc=e, countdown=5)  # countdown 表示 5 秒后重试

代码说明

  • bind=True 是实现任务重试的关键,它让任务函数可以访问自身的属性和方法。
  • self.retry() 方法用于触发任务重试,exc 参数指定要重试的异常,countdown 参数指定重试间隔。
  • max_retries=3 表示最多重试 3 次,超过次数后任务状态会变为 FAILURE

六、Celery 实际应用案例:批量数据处理

在数据分析场景中,我们经常需要处理大量数据,使用 Celery 可以将数据分发给多个 Worker 并行处理,提升效率。以下是一个批量处理用户数据的案例。

6.1 案例需求

有一个包含 1000 条用户数据的列表,需要对每条数据进行清洗(去除空值、格式化日期),然后将清洗后的数据保存到 Redis 中。

6.2 实现代码

步骤1:定义任务和应用

# data_processing_app.py
from celery import Celery
import json
from datetime import datetime

app = Celery(
    'data_processing_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 定义数据清洗任务
@app.task
def clean_user_data(user):
    """清洗单条用户数据"""
    # 去除空值字段
    cleaned_user = {k: v for k, v in user.items() if v is not None and v != ""}
    # 格式化注册日期
    if 'register_date' in cleaned_user:
        try:
            cleaned_user['register_date'] = datetime.strptime(
                cleaned_user['register_date'],
                '%Y-%m-%d'
            ).strftime('%Y/%m/%d')
        except ValueError:
            cleaned_user['register_date'] = '无效日期'
    # 返回清洗后的数据
    return cleaned_user

# 定义批量保存任务
@app.task
def batch_save_to_redis(cleaned_users):
    """批量保存清洗后的数据到 Redis"""
    import redis
    r = redis.Redis(host='localhost', port=6379, db=0)
    # 将数据序列化为 JSON 字符串
    r.set('cleaned_user_data', json.dumps(cleaned_users))
    return f"成功保存 {len(cleaned_users)} 条用户数据"

步骤2:生成测试数据并调用任务

# data_producer.py
from data_processing_app import clean_user_data, batch_save_to_redis
import random

# 生成 1000 条测试用户数据
def generate_test_data():
    users = []
    for i in range(1000):
        user = {
            'id': i + 1,
            'name': f'User_{i + 1}',
            'age': random.randint(18, 60) if random.random() > 0.1 else None,
            'register_date': f'202{random.randint(3, 5)}-{random.randint(1, 12)}-{random.randint(1, 28)}' if random.random() > 0.2 else ''
        }
        users.append(user)
    return users

if __name__ == '__main__':
    # 生成测试数据
    test_users = generate_test_data()
    # 异步调用清洗任务,处理每条数据
    clean_tasks = [clean_user_data.delay(user) for user in test_users]
    # 等待所有清洗任务完成,并收集结果
    cleaned_users = [task.get() for task in clean_tasks]
    # 调用批量保存任务
    save_result = batch_save_to_redis.delay(cleaned_users)
    print(save_result.get())

步骤3:启动 Worker 并运行

启动 Worker 进程:

celery -A data_processing_app worker --loglevel=info --concurrency=4

--concurrency=4 表示启动 4 个并发 Worker 进程,可根据 CPU 核心数调整。

运行 data_producer.py,程序会生成 1000 条测试数据,将每条数据的清洗任务发送到队列,Worker 并行处理后,再批量保存到 Redis 中。

七、Celery 相关资源

  • Pypi地址:https://pypi.org/project/Celery
  • Github地址:https://github.com/celery/celery
  • 官方文档地址:https://docs.celeryq.dev/en/stable/

关注我,每天分享一个实用的Python自动化工具。