一、Prefect 库简介
Prefect 是一款面向现代 Python 开发者的工作流编排与任务调度库,核心用于简化、可视化、监控各类自动化任务,无需复杂配置即可构建健壮的数据流与定时任务。其基于“工作流即代码”理念,通过装饰器将普通函数转为可调度任务,自带调度引擎与状态管理,支持失败重试、超时控制、并行执行。采用 Apache 2.0 开源协议,优点是上手快、无侵入、可视化强,缺点是轻量场景下功能略冗余,超大规模集群需搭配服务端。

二、Prefect 安装与环境准备
2.1 安装 Prefect
在使用 Prefect 前,只需通过 pip 完成安装,兼容 Python 3.8 及以上版本,打开命令行执行以下指令:
pip install prefect安装完成后,可通过版本查看命令验证是否安装成功:
prefect --version若输出版本号,说明安装正常,可直接进入开发环节。
2.2 基础概念理解
Prefect 核心只有两个关键概念,新手也能快速掌握:
- Flow(工作流):整个自动化任务的入口,一个 Flow 可包含多个任务。
- Task(任务):工作流中的最小执行单元,负责具体逻辑处理。
只需给函数添加装饰器,普通 Python 代码就能变成可监控、可调度的工作流,无需修改原有业务逻辑。
三、Prefect 基础使用与代码示例
3.1 最简单的工作流示例
先从最基础的案例入手,感受 Prefect 零门槛接入的特点,直接将打印函数转为工作流:
# 导入核心模块
from prefect import flow, task
# 定义任务
@task
def say_hello():
print("Hello, Prefect!")
return "任务执行完成"
# 定义工作流
@flow
def hello_prefect_flow():
# 调用任务
result = say_hello()
print("工作流执行结果:", result)
# 运行工作流
if __name__ == "__main__":
hello_prefect_flow()代码说明:
- 使用
@task装饰器修饰普通函数,将其标记为 Prefect 任务。 - 使用
@flow装饰器修饰主函数,作为整个工作流的入口。 - 直接运行 Python 文件,即可启动工作流,控制台会输出执行日志与结果。
执行后可看到清晰的执行状态,包括任务开始、结束、耗时等信息,比原生脚本更易追踪。
3.2 带参数的任务与工作流
实际开发中,任务通常需要传入参数,Prefect 对参数支持完全原生,无需额外适配:
from prefect import flow, task
# 带参数的任务
@task
def add_numbers(a: int, b: int) -> int:
res = a + b
print(f"{a} + {b} = {res}")
return res
@flow
def calculate_flow(x: int, y: int):
total = add_numbers(x, y)
print(f"最终计算结果:{total}")
if __name__ == "__main__":
# 传入参数运行
calculate_flow(10, 25)代码说明:
任务和工作流都支持位置参数、关键字参数、默认参数,与原生 Python 函数使用方式完全一致,降低学习成本。
3.3 多任务顺序执行
复杂业务通常包含多个步骤,Prefect 可轻松编排多任务顺序执行,自动管理依赖关系:
from prefect import flow, task
@task
def step_one():
print("执行第一步:数据读取")
return "原始数据"
@task
def step_two(data):
print(f"执行第二步:处理数据 -> {data}")
return "处理后数据"
@task
def step_three(processed_data):
print(f"执行第三步:保存数据 -> {processed_data}")
return "数据保存成功"
@flow
def multi_step_flow():
data = step_one()
processed = step_two(data)
result = step_three(processed)
print(result)
if __name__ == "__main__":
multi_step_flow()代码说明:
任务之间可通过返回值传递数据,Prefect 会自动按定义顺序执行,前一个任务失败后序任务自动停止,保证流程安全。
四、Prefect 高级功能实战
4.1 任务失败重试
自动化脚本常因网络波动、接口超时报错,Prefect 内置重试机制,一行配置解决问题:
from prefect import flow, task
import random
# 配置重试:最多重试3次,重试间隔1秒
@task(retries=3, retry_delay_seconds=1)
def unstable_task():
# 模拟随机失败
if random.random() < 0.5:
raise Exception("网络异常,任务执行失败")
print("任务执行成功")
return True
@flow
def retry_flow():
unstable_task()
if __name__ == "__main__":
retry_flow()代码说明:retries 设置重试次数,retry_delay_seconds 设置重试间隔,遇到临时异常可自动恢复,无需手动捕获。
4.2 定时任务调度
很多场景需要定时执行,如每日数据统计、定时爬虫、定时清理日志,Prefect 内置调度功能:
from prefect import flow, task
from datetime import timedelta
@task
def daily_task():
print("执行每日定时任务")
# 每隔 10 秒执行一次
@flow
def scheduled_flow():
daily_task()
if __name__ == "__main__":
# 启动调度
from prefect.server.schemas.schedules import IntervalSchedule
scheduled_flow.serve(
schedule=IntervalSchedule(interval=timedelta(seconds=10))
)代码说明:
运行后脚本会持续运行,每 10 秒自动执行一次工作流,支持秒、分、时、日等间隔,适合轻量定时场景。
4.3 任务并行执行
提升执行效率最常用的方式是并行,Prefect 无需多线程/多进程代码,通过配置实现并行:
from prefect import flow, task
import time
@task
def task_a():
time.sleep(1)
print("任务A执行完成")
return "A"
@task
def task_b():
time.sleep(1)
print("任务B执行完成")
return "B"
@task
def task_c():
time.sleep(1)
print("任务C执行完成")
return "C"
@flow
def parallel_flow():
# 并行提交任务
a = task_a.submit()
b = task_b.submit()
c = task_c.submit()
# 等待结果
print("所有任务结果:", a.result(), b.result(), c.result())
if __name__ == "__main__":
parallel_flow()代码说明:
使用 .submit() 提交任务即可实现并行,三个任务原本需 3 秒,并行后仅需 1 秒,大幅提升效率。
4.4 工作流超时控制
防止任务卡死占用资源,可给工作流或任务设置超时时间:
from prefect import flow, task
import time
@task(timeout_seconds=3)
def long_running_task():
time.sleep(5)
print("任务执行完成")
@flow
def timeout_flow():
long_running_task()
if __name__ == "__main__":
timeout_flow()代码说明:
任务设置 3 秒超时,但实际执行 5 秒,Prefect 会自动终止任务并标记失败,避免资源阻塞。
五、Prefect UI 可视化控制台
Prefect 自带可视化界面,可实时查看工作流执行状态、日志、失败记录,无需额外搭建监控系统。
5.1 启动 UI 控制台
命令行执行:
prefect server start启动成功后,浏览器访问:
http://127.0.0.1:4200即可进入控制台,可查看:
- 所有工作流与任务列表
- 执行历史与耗时统计
- 失败任务与报错日志
- 定时任务调度状态
5.2 连接 UI 执行工作流
运行工作流后,无需额外配置,执行记录会自动上传到 UI,新手也能快速定位问题。
六、真实业务场景案例
6.1 每日自动化数据统计脚本
模拟企业常用场景:每日从接口获取数据、清洗处理、保存结果、发送通知:
from prefect import flow, task
import time
from datetime import datetime
# 模拟获取数据
@task(retries=2, retry_delay_seconds=2)
def fetch_data():
print(f"{datetime.now()} - 从数据源获取数据...")
time.sleep(1)
return {"user_count": 1245, "order_count": 328}
# 数据清洗
@task
def clean_data(data):
print("数据清洗中...")
time.sleep(1)
cleaned = {
"统计时间": datetime.now().strftime("%Y-%m-%d"),
"用户数": data["user_count"],
"订单数": data["order_count"],
"状态": "已清洗"
}
return cleaned
# 保存数据
@task
def save_data(cleaned_data):
print("保存数据到本地/数据库...")
time.sleep(1)
with open("daily_report.txt", "w", encoding="utf-8") as f:
f.write(str(cleaned_data))
return True
# 发送通知
@task
def send_notification(success):
if success:
print("每日统计完成,已发送通知")
else:
print("统计失败,需人工检查")
# 完整工作流
@flow
def daily_statistics_flow():
raw_data = fetch_data()
cleaned = clean_data(raw_data)
save_success = save_data(cleaned)
send_notification(save_success)
if __name__ == "__main__":
# 启动定时服务
daily_statistics_flow.serve(
name="每日数据统计任务",
schedule={"interval": "24h"}
)代码说明:
该脚本整合了重试、定时、多步骤编排、数据传递等核心功能,可直接用于生产环境,替代传统 crontab + 杂乱脚本,维护性大幅提升。
七、相关资源
- Pypi地址:https://pypi.org/project/prefect/
- Github地址:https://github.com/PrefectHQ/prefect
- 官方文档地址:https://docs.prefect.io/latest/
关注我,每天分享一个实用的Python自动化工具。

