Python 工作流神器:Prefect 从入门到实战,轻松搞定自动化任务调度

一、Prefect 库简介

Prefect 是一款面向现代 Python 开发者的工作流编排与任务调度库,核心用于简化、可视化、监控各类自动化任务,无需复杂配置即可构建健壮的数据流与定时任务。其基于“工作流即代码”理念,通过装饰器将普通函数转为可调度任务,自带调度引擎与状态管理,支持失败重试、超时控制、并行执行。采用 Apache 2.0 开源协议,优点是上手快、无侵入、可视化强,缺点是轻量场景下功能略冗余,超大规模集群需搭配服务端。

二、Prefect 安装与环境准备

2.1 安装 Prefect

在使用 Prefect 前,只需通过 pip 完成安装,兼容 Python 3.8 及以上版本,打开命令行执行以下指令:

pip install prefect

安装完成后,可通过版本查看命令验证是否安装成功:

prefect --version

若输出版本号,说明安装正常,可直接进入开发环节。

2.2 基础概念理解

Prefect 核心只有两个关键概念,新手也能快速掌握:

  • Flow(工作流):整个自动化任务的入口,一个 Flow 可包含多个任务。
  • Task(任务):工作流中的最小执行单元,负责具体逻辑处理。
    只需给函数添加装饰器,普通 Python 代码就能变成可监控、可调度的工作流,无需修改原有业务逻辑。

三、Prefect 基础使用与代码示例

3.1 最简单的工作流示例

先从最基础的案例入手,感受 Prefect 零门槛接入的特点,直接将打印函数转为工作流:

# 导入核心模块
from prefect import flow, task

# 定义任务
@task
def say_hello():
    print("Hello, Prefect!")
    return "任务执行完成"

# 定义工作流
@flow
def hello_prefect_flow():
    # 调用任务
    result = say_hello()
    print("工作流执行结果:", result)

# 运行工作流
if __name__ == "__main__":
    hello_prefect_flow()

代码说明

  1. 使用 @task 装饰器修饰普通函数,将其标记为 Prefect 任务。
  2. 使用 @flow 装饰器修饰主函数,作为整个工作流的入口。
  3. 直接运行 Python 文件,即可启动工作流,控制台会输出执行日志与结果。
    执行后可看到清晰的执行状态,包括任务开始、结束、耗时等信息,比原生脚本更易追踪。

3.2 带参数的任务与工作流

实际开发中,任务通常需要传入参数,Prefect 对参数支持完全原生,无需额外适配:

from prefect import flow, task

# 带参数的任务
@task
def add_numbers(a: int, b: int) -> int:
    res = a + b
    print(f"{a} + {b} = {res}")
    return res

@flow
def calculate_flow(x: int, y: int):
    total = add_numbers(x, y)
    print(f"最终计算结果:{total}")

if __name__ == "__main__":
    # 传入参数运行
    calculate_flow(10, 25)

代码说明
任务和工作流都支持位置参数、关键字参数、默认参数,与原生 Python 函数使用方式完全一致,降低学习成本。

3.3 多任务顺序执行

复杂业务通常包含多个步骤,Prefect 可轻松编排多任务顺序执行,自动管理依赖关系:

from prefect import flow, task

@task
def step_one():
    print("执行第一步:数据读取")
    return "原始数据"

@task
def step_two(data):
    print(f"执行第二步:处理数据 -> {data}")
    return "处理后数据"

@task
def step_three(processed_data):
    print(f"执行第三步:保存数据 -> {processed_data}")
    return "数据保存成功"

@flow
def multi_step_flow():
    data = step_one()
    processed = step_two(data)
    result = step_three(processed)
    print(result)

if __name__ == "__main__":
    multi_step_flow()

代码说明
任务之间可通过返回值传递数据,Prefect 会自动按定义顺序执行,前一个任务失败后序任务自动停止,保证流程安全。

四、Prefect 高级功能实战

4.1 任务失败重试

自动化脚本常因网络波动、接口超时报错,Prefect 内置重试机制,一行配置解决问题:

from prefect import flow, task
import random

# 配置重试:最多重试3次,重试间隔1秒
@task(retries=3, retry_delay_seconds=1)
def unstable_task():
    # 模拟随机失败
    if random.random() < 0.5:
        raise Exception("网络异常,任务执行失败")
    print("任务执行成功")
    return True

@flow
def retry_flow():
    unstable_task()

if __name__ == "__main__":
    retry_flow()

代码说明
retries 设置重试次数,retry_delay_seconds 设置重试间隔,遇到临时异常可自动恢复,无需手动捕获。

4.2 定时任务调度

很多场景需要定时执行,如每日数据统计、定时爬虫、定时清理日志,Prefect 内置调度功能:

from prefect import flow, task
from datetime import timedelta

@task
def daily_task():
    print("执行每日定时任务")

# 每隔 10 秒执行一次
@flow
def scheduled_flow():
    daily_task()

if __name__ == "__main__":
    # 启动调度
    from prefect.server.schemas.schedules import IntervalSchedule
    scheduled_flow.serve(
        schedule=IntervalSchedule(interval=timedelta(seconds=10))
    )

代码说明
运行后脚本会持续运行,每 10 秒自动执行一次工作流,支持秒、分、时、日等间隔,适合轻量定时场景。

4.3 任务并行执行

提升执行效率最常用的方式是并行,Prefect 无需多线程/多进程代码,通过配置实现并行:

from prefect import flow, task
import time

@task
def task_a():
    time.sleep(1)
    print("任务A执行完成")
    return "A"

@task
def task_b():
    time.sleep(1)
    print("任务B执行完成")
    return "B"

@task
def task_c():
    time.sleep(1)
    print("任务C执行完成")
    return "C"

@flow
def parallel_flow():
    # 并行提交任务
    a = task_a.submit()
    b = task_b.submit()
    c = task_c.submit()

    # 等待结果
    print("所有任务结果:", a.result(), b.result(), c.result())

if __name__ == "__main__":
    parallel_flow()

代码说明
使用 .submit() 提交任务即可实现并行,三个任务原本需 3 秒,并行后仅需 1 秒,大幅提升效率。

4.4 工作流超时控制

防止任务卡死占用资源,可给工作流或任务设置超时时间:

from prefect import flow, task
import time

@task(timeout_seconds=3)
def long_running_task():
    time.sleep(5)
    print("任务执行完成")

@flow
def timeout_flow():
    long_running_task()

if __name__ == "__main__":
    timeout_flow()

代码说明
任务设置 3 秒超时,但实际执行 5 秒,Prefect 会自动终止任务并标记失败,避免资源阻塞。

五、Prefect UI 可视化控制台

Prefect 自带可视化界面,可实时查看工作流执行状态、日志、失败记录,无需额外搭建监控系统。

5.1 启动 UI 控制台

命令行执行:

prefect server start

启动成功后,浏览器访问:

http://127.0.0.1:4200

即可进入控制台,可查看:

  • 所有工作流与任务列表
  • 执行历史与耗时统计
  • 失败任务与报错日志
  • 定时任务调度状态

5.2 连接 UI 执行工作流

运行工作流后,无需额外配置,执行记录会自动上传到 UI,新手也能快速定位问题。

六、真实业务场景案例

6.1 每日自动化数据统计脚本

模拟企业常用场景:每日从接口获取数据、清洗处理、保存结果、发送通知:

from prefect import flow, task
import time
from datetime import datetime

# 模拟获取数据
@task(retries=2, retry_delay_seconds=2)
def fetch_data():
    print(f"{datetime.now()} - 从数据源获取数据...")
    time.sleep(1)
    return {"user_count": 1245, "order_count": 328}

# 数据清洗
@task
def clean_data(data):
    print("数据清洗中...")
    time.sleep(1)
    cleaned = {
        "统计时间": datetime.now().strftime("%Y-%m-%d"),
        "用户数": data["user_count"],
        "订单数": data["order_count"],
        "状态": "已清洗"
    }
    return cleaned

# 保存数据
@task
def save_data(cleaned_data):
    print("保存数据到本地/数据库...")
    time.sleep(1)
    with open("daily_report.txt", "w", encoding="utf-8") as f:
        f.write(str(cleaned_data))
    return True

# 发送通知
@task
def send_notification(success):
    if success:
        print("每日统计完成,已发送通知")
    else:
        print("统计失败,需人工检查")

# 完整工作流
@flow
def daily_statistics_flow():
    raw_data = fetch_data()
    cleaned = clean_data(raw_data)
    save_success = save_data(cleaned)
    send_notification(save_success)

if __name__ == "__main__":
    # 启动定时服务
    daily_statistics_flow.serve(
        name="每日数据统计任务",
        schedule={"interval": "24h"}
    )

代码说明
该脚本整合了重试、定时、多步骤编排、数据传递等核心功能,可直接用于生产环境,替代传统 crontab + 杂乱脚本,维护性大幅提升。

七、相关资源

  • Pypi地址:https://pypi.org/project/prefect/
  • Github地址:https://github.com/PrefectHQ/prefect
  • 官方文档地址:https://docs.prefect.io/latest/

关注我,每天分享一个实用的Python自动化工具。