Python实用数据处理库petl:轻量级表格数据操作完全指南

一、petl库基础认知

petl全称为Python ETL,是一款专注于轻量级数据提取、转换、加载的开源Python库,核心面向表格型数据处理场景,无需依赖 heavy 框架即可完成数据清洗、筛选、合并、格式转换等操作。其工作原理是通过惰性加载处理数据,仅在需要输出时才执行计算,大幅降低内存占用。该库采用MIT开源许可,优点是轻量简洁、上手门槛低、适合中小数据集处理,缺点是不适合超大规模分布式数据运算,性能弱于Pandas等专业数据分析库。

二、petl库安装方法

petl的安装流程十分简便,支持pip快速安装,无需配置复杂环境,适合Python初学者直接使用。
打开命令行工具,执行以下安装命令:

pip install petl

若需要加速安装,可使用国内镜像源:

pip install petl -i https://pypi.tuna.tsinghua.edu.cn/simple

安装完成后,在Python脚本中直接导入即可验证是否安装成功:

import petl
print(petl.__version__)

执行后输出版本号,即代表安装完成,可以正常使用。

三、petl核心数据结构与基础操作

petl核心围绕表格数据展开,其数据结构可以是列表、元组、CSV文件、Excel文件、数据库查询结果等,所有操作均以行和列为基本单位,语法贴近自然语言,极易理解。

3.1 创建基础表格数据

petl可以直接从Python原生数据结构创建数据表,这是入门的第一步。

import petl as etl

# 手动创建表格数据,第一行为表头,后续为数据行
table = [
    ['name', 'age', 'city'],
    ['张三', 22, '北京'],
    ['李四', 25, '上海'],
    ['王五', 28, '深圳'],
    ['赵六', 24, '广州']
]

# 查看数据前3行
print(etl.head(table, 3))

代码说明:

  1. 导入petl库并简写为etl,方便后续调用;
  2. 使用列表嵌套结构定义表格,第一行是字段名,后续每行是一条数据;
  3. head()方法用于查看指定行数的数据,默认查看前5行,此处指定查看前3行,方便快速预览数据结构。

3.2 数据筛选与条件查询

数据筛选是数据处理中最常用的功能,petl提供select、selecteq、selectne等方法实现条件过滤。

import petl as etl

table = [
    ['name', 'age', 'city'],
    ['张三', 22, '北京'],
    ['李四', 25, '上海'],
    ['王五', 28, '深圳'],
    ['赵六', 24, '广州']
]

# 筛选年龄大于24岁的数据
age_filter = etl.select(table, lambda rec: rec.age > 24)
print('年龄大于24岁的数据:')
print(age_filter)

# 筛选城市为上海的数据
city_filter = etl.selecteq(table, 'city', '上海')
print('\n城市为上海的数据:')
print(city_filter)

代码说明:

  1. select()方法支持自定义lambda函数,可实现复杂条件筛选;
  2. selecteq()是等值筛选方法,直接指定字段和对应值即可,语法更简洁;
  3. 所有操作返回新的表格对象,不会修改原始数据,保证数据安全。

3.3 字段新增与修改

处理数据时经常需要新增计算字段或修改现有字段,petl的addfield()和convert()方法可快速实现。

import petl as etl

table = [
    ['name', 'age', 'city'],
    ['张三', 22, '北京'],
    ['李四', 25, '上海'],
    ['王五', 28, '深圳'],
    ['赵六', 24, '广州']
]

# 新增年龄分组字段
table_add = etl.addfield(table, 'age_group', lambda rec: '青年' if rec.age < 25 else '壮年')
print('新增年龄分组字段后:')
print(table_add)

# 修改年龄字段,所有年龄+1
table_convert = etl.convert(table_add, 'age', lambda v: v + 1)
print('\n修改年龄字段后:')
print(table_convert)

代码说明:

  1. addfield()用于新增字段,可通过lambda函数根据现有数据计算新字段值;
  2. convert()用于修改指定字段的值,支持批量转换、格式调整等操作;
  3. 操作支持链式调用,可连续对数据进行处理。

3.4 数据排序与去重

数据排序和去重是数据清洗的必备环节,petl提供sort()、distinct()方法快速处理。

import petl as etl

table = [
    ['name', 'age', 'city'],
    ['张三', 22, '北京'],
    ['李四', 25, '上海'],
    ['张三', 22, '北京'],
    ['王五', 28, '深圳'],
    ['赵六', 24, '广州']
]

# 数据去重
table_distinct = etl.distinct(table)
print('去重后的数据:')
print(table_distinct)

# 按年龄降序排序
table_sort = etl.sort(table_distinct, key='age', reverse=True)
print('\n按年龄降序排序后:')
print(table_sort)

代码说明:

  1. distinct()方法会自动去除完全重复的数据行;
  2. sort()方法通过key指定排序字段,reverse=True表示降序,False为升序;
  3. 处理后的数据结构保持不变,可直接用于后续操作。

四、petl文件读写操作

petl最大的优势之一是支持多种文件格式的读写,包括CSV、Excel、JSON、HTML等,无需依赖其他库即可完成文件数据处理。

4.1 CSV文件读写

CSV是最常用的表格数据格式,petl对CSV的支持极为完善,读写速度快且稳定。

import petl as etl

# 定义测试数据
table = [
    ['name', 'age', 'city'],
    ['张三', 22, '北京'],
    ['李四', 25, '上海'],
    ['王五', 28, '深圳'],
    ['赵六', 24, '广州']
]

# 将数据写入CSV文件
etl.tocsv(table, 'user_data.csv', encoding='utf-8')
print('数据已写入CSV文件')

# 从CSV文件读取数据
read_csv = etl.fromcsv('user_data.csv', encoding='utf-8')
print('\n读取CSV文件数据:')
print(etl.head(read_csv))

代码说明:

  1. tocsv()方法将petl表格数据写入CSV文件,需指定编码格式避免中文乱码;
  2. fromcsv()方法读取CSV文件并转换为petl可处理的表格对象;
  3. 支持指定分隔符、是否包含表头等参数,适配不同格式的CSV文件。

4.2 Excel文件读写

petl读写Excel文件需要依赖openpyxl或xlrd库,需提前安装依赖:

pip install openpyxl xlrd

安装完成后执行读写代码:

import petl as etl

table = [
    ['name', 'age', 'city'],
    ['张三', 22, '北京'],
    ['李四', 25, '上海'],
    ['王五', 28, '深圳'],
    ['赵六', 24, '广州']
]

# 写入Excel文件
etl.toxlsx(table, 'user_data.xlsx', sheet='用户信息')
print('数据已写入Excel文件')

# 读取Excel文件
read_excel = etl.fromxlsx('user_data.xlsx', sheet='用户信息')
print('\n读取Excel文件数据:')
print(etl.head(read_excel))

代码说明:

  1. toxlsx()和fromxlsx()分别实现Excel文件的写入和读取;
  2. 支持指定sheet工作表名称,适合处理多工作表的Excel文件;
  3. 读取后的数据格式与手动创建的表格数据一致,可直接使用所有petl操作方法。

4.3 JSON与HTML格式数据处理

除了常规表格文件,petl还支持JSON和HTML格式的数据转换,适合数据展示和接口对接。

import petl as etl

table = [
    ['name', 'age', 'city'],
    ['张三', 22, '北京'],
    ['李四', 25, '上海'],
    ['王五', 28, '深圳']
]

# 转换为JSON格式
json_data = etl.tojson(table)
print('JSON格式数据:')
print(json_data)

# 转换为HTML表格
html_table = etl.tohtml(table)
print('\nHTML表格代码:')
print(html_table)

代码说明:

  1. tojson()将表格数据转换为JSON格式,适合接口数据返回;
  2. tohtml()将数据转换为HTML表格代码,可直接嵌入网页展示;
  3. 转换过程自动保留字段名和数据对应关系,无需手动格式化。

五、petl数据合并与关联操作

在实际数据处理中,经常需要将多个数据源合并、关联,petl提供cat、join、lookup等方法实现多表操作,功能媲美数据库关联查询。

5.1 多表数据合并

cat()方法用于将结构相同的多个表格合并为一个表格,适合数据拼接场景。

import petl as etl

# 定义两个结构相同的表格
table1 = [
    ['name', 'age', 'city'],
    ['张三', 22, '北京'],
    ['李四', 25, '上海']
]

table2 = [
    ['name', 'age', 'city'],
    ['王五', 28, '深圳'],
    ['赵六', 24, '广州']
]

# 合并两个表格
merge_table = etl.cat(table1, table2)
print('合并后的数据:')
print(merge_table)

代码说明:

  1. cat()方法可接收多个表格参数,一次性合并多个数据源;
  2. 要求表格的字段结构完全一致,否则会出现数据错位;
  3. 合并后保留所有数据行,不进行去重操作,如需去重可配合distinct()使用。

5.2 表关联查询(join)

join()方法实现类似数据库的左连接、内连接,适合多表关联分析。

import petl as etl

# 用户基础信息表
user_table = [
    ['user_id', 'name'],
    [1, '张三'],
    [2, '李四'],
    [3, '王五']
]

# 用户订单表
order_table = [
    ['user_id', 'order_no', 'amount'],
    [1, 'ORDER001', 99],
    [2, 'ORDER002', 199],
    [1, 'ORDER003', 299]
]

# 内连接:只保留匹配的数据
inner_join = etl.join(user_table, order_table, key='user_id')
print('内连接结果:')
print(inner_join)

# 左连接:保留左表所有数据
left_join = etl.leftjoin(user_table, order_table, key='user_id')
print('\n左连接结果:')
print(left_join)

代码说明:

  1. key参数指定关联字段,通常为ID类唯一标识;
  2. 内连接只保留两张表都存在的匹配数据;
  3. 左连接保留左表所有数据,右表无匹配数据时填充为None;
  4. 无需编写复杂SQL,纯Python语法即可实现数据库级别的关联查询。

六、petl与数据库交互

petl支持直接连接MySQL、SQLite、PostgreSQL等数据库,实现数据的提取与写入,是ETL流程的核心功能。

6.1 SQLite数据库操作

SQLite为轻量级文件数据库,无需单独安装服务,适合演示。

import petl as etl
import sqlite3

# 连接SQLite数据库
conn = sqlite3.connect('test.db')

# 创建测试表
conn.execute('''CREATE TABLE IF NOT EXISTS user 
             (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)''')

# 定义数据并写入数据库
table = [
    ['id', 'name', 'age'],
    [1, '张三', 22],
    [2, '李四', 25],
    [3, '王五', 28]
]
etl.todb(table, conn, 'user')
print('数据已写入SQLite数据库')

# 从数据库读取数据
db_table = etl.fromdb(conn, 'SELECT * FROM user')
print('\n从数据库读取的数据:')
print(db_table)

conn.close()

代码说明:

  1. fromdb()从数据库执行SQL查询并返回petl表格数据;
  2. todb()将petl数据写入数据库表;
  3. 支持事务操作、批量写入,效率高于原生Python数据库操作。

6.2 MySQL数据库操作

连接MySQL需要安装pymysql库:

pip install pymysql

连接代码:

import petl as etl
import pymysql

# 连接MySQL数据库
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='test_db',
    charset='utf8'
)

# 从MySQL查询数据
mysql_table = etl.fromdb(conn, 'SELECT name, age FROM user LIMIT 5')
print('MySQL数据:')
print(mysql_table)

conn.close()

代码说明:

  1. 连接参数需根据实际MySQL环境修改;
  2. 支持复杂SQL查询,结果直接转换为petl可处理的表格;
  3. 可实现数据从数据库提取→清洗→转换→写回数据库的完整ETL流程。

七、petl实际业务案例

用户数据清洗与分析为例,模拟真实业务场景,整合petl所有核心操作。

7.1 业务需求

  1. 读取CSV格式的原始用户数据;
  2. 去除重复数据、空值数据;
  3. 筛选年龄在20-30岁之间的用户;
  4. 新增年龄段分组字段;
  5. 按城市统计用户数量;
  6. 将处理结果保存为Excel文件。

7.2 完整实现代码

import petl as etl

# 1. 读取原始CSV数据
raw_data = etl.fromcsv('raw_user.csv', encoding='utf-8')

# 2. 去除重复数据和空值数据
data_no_dup = etl.distinct(raw_data)
data_no_null = etl.rejectmissing(data_no_dup)

# 3. 筛选年龄20-30岁的用户
data_filter = etl.select(data_no_null, lambda rec: 20 <= int(rec.age) <= 30)

# 4. 新增年龄段分组字段
data_add_field = etl.addfield(data_filter, 'age_group', 
                             lambda rec: '20-25岁' if 20 <= int(rec.age) <=25 else '26-30岁')

# 5. 按城市统计用户数量
city_stats = etl.aggregate(data_add_field, 'city', count=('name', len))

# 6. 保存处理后的数据和统计结果
etl.toxlsx(data_add_field, 'clean_user_data.xlsx', sheet='清洗后数据')
etl.toxlsx(city_stats, 'city_user_stats.xlsx', sheet='城市统计')

print('数据处理完成,结果已保存为Excel文件')

代码说明:

  1. rejectmissing()方法自动剔除包含空值的数据行,保证数据质量;
  2. aggregate()实现分组统计功能,类似SQL的GROUP BY,可统计数量、求和、平均值等;
  3. 整个流程采用链式处理逻辑,代码简洁易读,无需创建中间变量;
  4. 从文件读取到最终输出,全程使用petl完成,适合自动化数据处理脚本。

相关资源

  • Pypi地址:https://pypi.org/project/petl
  • Github地址:https://github.com/petl-developers/petl
  • 官方文档地址:https://petl.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python 数据建模神器:dbt 从入门到实战,轻松搞定数据仓库开发

一、dbt 核心介绍

dbt(data build tool)是专注于数据仓库建模的 Python 工具,核心作用是让数据工程师、分析师用 SQL 完成数据转换、测试、文档化,无需编写复杂调度脚本。其原理是基于 SQL 编译生成可执行模型,依赖数据仓库引擎执行计算,采用 Apache 2.0 开源协议。优点是上手快、协作友好、自带测试与文档,缺点是依赖数据仓库、不负责数据抽取与加载。

二、dbt 安装与环境初始化

2.1 安装 dbt

dbt 可通过 pip 直接安装,不同数据仓库对应不同适配器,主流适配 BigQuery、Snowflake、Redshift、Databricks、PostgreSQL 等,这里以通用安装和最常用的 PostgreSQL 适配器为例。

打开命令行执行安装命令:

# 安装 dbt 核心及 PostgreSQL 适配器
pip install dbt-core dbt-postgres

安装完成后验证版本:

dbt --version

出现版本信息即代表安装成功,包含 core 版本和对应适配器版本。

2.2 初始化 dbt 项目

安装完成后,创建专属 dbt 项目,命令会自动生成标准目录结构,这是 dbt 规范开发的基础。

# 创建名为 dbt_demo 的项目
dbt init dbt_demo

执行后进入项目目录:

cd dbt_demo

2.3 配置数据仓库连接

dbt 核心配置文件为 profiles.yml,默认在用户目录 .dbt 文件夹下,用于配置数据库连接信息。

以 PostgreSQL 为例,配置内容如下:

dbt_demo:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      user: postgres
      password: 你的密码
      port: 5432
      dbname: postgres
      schema: dbt_demo
      threads: 4

配置完成后,测试连接是否正常:

dbt debug

显示 All checks passed! 代表连接成功。

三、dbt 标准目录结构

dbt 有固定的目录规范,便于团队协作和模型管理,初始化后的目录结构如下:

dbt_demo/
├── analyses/          # 存放分析查询 SQL
├── dbt_project.yml    # 项目核心配置文件
├── macros/            # 自定义 Jinja2 宏
├── models/            # 核心数据模型目录
├── seeds/             # 存放静态 CSV 数据
├── snapshots/         # 快照数据,记录历史状态
├── tests/             # 自定义数据测试
└── README.md

其中 models 是核心目录,所有业务数据模型都在此编写,dbt_project.yml 用于配置模型、权限、变量等。

四、dbt 基础使用:从简单模型开始

4.1 编写基础模型

模型是 dbt 的核心,本质就是带逻辑的 SQL 文件,存放在 models 目录下。

models 目录下创建 user_orders.sql,编写简单模型,关联用户表和订单表:

-- models/user_orders.sql
{{ config(materialized='table') }}

SELECT
    u.user_id,
    u.username,
    u.create_time AS user_create_time,
    o.order_id,
    o.order_amount,
    o.order_time
FROM
    public.users u
LEFT JOIN
    public.orders o
ON
    u.user_id = o.user_id

代码说明:

  • {{ config(materialized='table') }} 表示模型构建为表,还支持 view、incremental、ephemeral 等类型
  • 模型本质是标准 SQL,dbt 会自动编译并在数据库中生成对应表或视图

4.2 运行 dbt 模型

编写完成后,执行命令运行模型:

dbt run

执行成功后,会在配置的 schema 下生成 user_orders 表,数据自动关联计算完成。

4.3 为模型添加文档描述

dbt 支持直接在 SQL 中添加注释,自动生成文档,无需手动维护。

优化后的模型:

-- models/user_orders.sql
{{ config(materialized='table', tags=['user', 'order']) }}

/*
 * 模型名称: user_orders
 * 功能: 用户与订单关联宽表,用于用户消费分析
 */

SELECT
    u.user_id AS user_id,        -- 用户ID,主键
    u.username AS username,      -- 用户名
    u.create_time AS user_create_time,  -- 用户注册时间
    o.order_id AS order_id,      -- 订单ID
    o.order_amount AS order_amount,  -- 订单金额
    o.order_time AS order_time   -- 下单时间
FROM
    public.users u
LEFT JOIN
    public.orders o
ON
    u.user_id = o.user_id

4.4 生成并查看文档

执行命令生成文档:

dbt docs generate
dbt docs serve

执行后会启动本地服务,浏览器访问可查看完整的模型血缘关系、字段说明、模型依赖,非常适合团队协作。

五、dbt 进阶使用:数据测试与增量模型

5.1 内置数据测试

数据质量是数仓核心,dbt 内置丰富测试,在 models 下创建 schema.yml 配置测试规则:

version: 2

models:
  - name: user_orders
    columns:
      - name: user_id
        tests:
          - not_null
          - unique
      - name: order_amount
        tests:
          - not_null

配置后执行测试命令:

dbt test

dbt 会自动校验字段是否为空、是否唯一,快速发现数据问题。

5.2 自定义测试

除内置测试外,还可编写自定义 SQL 测试,在 tests 目录下创建 test_order_amount_positive.sql

-- 测试订单金额必须大于0
SELECT
    *
FROM
    {{ ref('user_orders') }}
WHERE
    order_amount <= 0

执行 dbt test 时会自动运行该测试,若有负金额则报错。

5.3 增量模型(核心进阶功能)

全量构建在大数据量下效率极低,dbt 提供增量模型,只新增或更新数据。

创建增量订单模型 incremental_orders.sql

-- models/incremental_orders.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

SELECT
    order_id,
    user_id,
    order_amount,
    order_time
FROM
    public.orders

{% if is_incremental() %}
    -- 增量逻辑:只查询比当前模型最大时间大的数据
    WHERE order_time > (SELECT MAX(order_time) FROM {{ this }})
{% endif %}

代码说明:

  • incremental 代表增量模型
  • unique_key 是增量合并的唯一键
  • is_incremental() 宏用于区分首次运行与增量运行
    首次执行全量构建,后续执行只增量同步新数据,大幅提升运行效率。

六、dbt 宏与变量:提升代码复用性

6.1 自定义宏

宏类似 Python 函数,可复用 SQL 逻辑,在 macros 目录下创建 get_date_macro.sql

{% macro get_today() %}
    CURRENT_DATE
{% endmacro %}

在模型中直接调用:

SELECT
    order_id,
    order_time,
    {{ get_today() }} AS stat_date
FROM
    {{ ref('incremental_orders') }}

6.2 使用项目变量

dbt_project.yml 中定义变量:

vars:
  start_date: '2025-01-01'

模型中使用变量:

SELECT
    *
FROM
    {{ ref('user_orders') }}
WHERE
    order_time >= '{{ var("start_date") }}'

七、dbt 种子数据:静态数据导入

dbt 支持将本地 CSV 文件导入数据库,称为种子数据,适合字典表、静态映射表。

seeds 目录下创建 user_level.csv

user_id,level
1,VIP
2,普通用户
3,VIP

执行导入命令:

dbt seed

导入后可在模型中直接引用:

SELECT
    u.*,
    l.level
FROM
    {{ ref('user_orders') }} u
LEFT JOIN
    {{ ref('user_level') }} l
ON
    u.user_id = l.user_id

八、完整实战案例:用户消费统计宽表

结合前面所有知识点,构建一个完整的数仓模型,用于业务分析、报表展示。

8.1 需求说明

构建用户日消费统计表,包含用户ID、用户名、消费日期、消费次数、消费总金额、会员等级,支持增量更新、数据测试、自动文档。

8.2 模型代码

-- models/user_daily_stat.sql
{{ config(
    materialized='incremental',
    unique_key='concat(user_id, stat_date)',
    tags=['report', 'user', 'stat'],
    incremental_strategy='merge'
) }}

WITH order_daily AS (
    SELECT
        user_id,
        DATE(order_time) AS stat_date,
        COUNT(order_id) AS order_count,
        SUM(order_amount) AS total_amount
    FROM
        {{ ref('incremental_orders') }}
    GROUP BY
        user_id, DATE(order_time)
),

user_info AS (
    SELECT
        user_id,
        username
    FROM
        public.users
),

user_level AS (
    SELECT
        user_id,
        level
    FROM
        {{ ref('user_level') }}
)

SELECT
    u.user_id,
    u.username,
    l.level,
    o.stat_date,
    o.order_count,
    o.total_amount
FROM
    user_info u
JOIN
    order_daily o ON u.user_id = o.user_id
LEFT JOIN
    user_level l ON u.user_id = l.user_id

{% if is_incremental() %}
    WHERE o.stat_date > (SELECT MAX(stat_date) FROM {{ this }})
{% endif %}

8.3 测试配置

version: 2

models:
  - name: user_daily_stat
    columns:
      - name: user_id
        tests:
          - not_null
      - name: stat_date
        tests:
          - not_null
      - name: total_amount
        tests:
          - not_null

8.4 完整执行流程

# 运行所有模型
dbt run

# 执行所有测试
dbt test

# 生成文档
dbt docs generate
dbt docs serve

执行完成后,即可得到稳定、可复用、可追溯、带质量保障的用户消费日统计报表。

九、dbt 常用命令总结

  • dbt init:初始化项目
  • dbt debug:检查配置与连接
  • dbt run:运行所有模型
  • dbt run --select 模型名:运行单个模型
  • dbt test:执行所有测试
  • dbt seed:导入种子数据
  • dbt docs generate:生成文档
  • dbt docs serve:启动文档服务

相关资源

  • Pypi地址:https://pypi.org/project/dbt-core/
  • Github地址:https://github.com/dbt-labs/dbt-core
  • 官方文档地址:https://docs.getdbt.com/

关注我,每天分享一个实用的Python自动化工具。

Python 数据管道神器:Kedro 从入门到实战,轻松构建可复用数据工程

一、Kedro 库简介

Kedro 是面向生产级数据工程与数据科学的 Python 框架,专注标准化、可复用、可维护的数据管道构建,基于模块化与配置驱动思想,将数据处理流程拆分为节点与管道,支持版本管理、测试、文档自动化。优点是工程化规范、协作友好、适合复杂项目;缺点是轻量场景略显繁琐。采用 Apache 2.0 开源许可。

二、Kedro 安装与环境准备

2.1 环境要求

Kedro 支持 Python 3.8 及以上版本,兼容 Windows、macOS、Linux 系统,可与 pandas、numpy、scikit-learn、PySpark 无缝集成,适合单机与分布式数据工程。

2.2 安装命令

打开终端或命令提示符,执行以下命令完成安装:

pip install kedro

如需使用可视化工具,可安装扩展:

pip install kedro-viz

安装完成后,验证版本:

kedro --version

出现版本号即表示安装成功。

三、Kedro 核心概念与工作流程

3.1 核心概念

  • 项目(Project):Kedro 工程的根目录,统一管理代码、数据、配置、文档。
  • 节点(Node):最小执行单元,对应一个 Python 函数,负责单一数据处理逻辑。
  • 管道(Pipeline):多个节点按依赖关系组合而成的执行流程,自动解析执行顺序。
  • 目录(Catalog):数据入口配置文件,统一管理数据读取与写入,支持多种格式。
  • 参数(Parameters):集中管理配置参数,便于修改与环境切换。
  • 运行(Run):执行整个或部分数据管道,自动处理依赖与数据流转。

3.2 工作原理

Kedro 通过声明式编程定义数据处理逻辑,不直接硬编码读写路径与执行顺序,而是通过 YAML 配置文件声明数据与参数,通过函数定义节点逻辑,自动构建依赖图并按拓扑顺序执行,保证流程可复现、可测试、可扩展。

四、Kedro 项目创建与目录结构

4.1 创建新项目

在终端进入工作目录,执行命令创建 Kedro 项目:

kedro new

按提示输入项目名称,例如 kedro_demo,等待项目生成。

4.2 标准目录结构

kedro_demo/
├── conf/                # 配置文件(catalog、parameters)
│   ├── base/
│   └── local/
├── data/                # 数据目录(原始、中间、模型、输出)
│   ├── 01_raw/
│   ├── 02_intermediate/
│   ├── 03_primary/
│   ├── 04_feature/
│   ├── 05_model_input/
│   ├── 06_models/
│   ├── 07_model_output/
│   └── 08_reporting/
├── docs/                # 项目文档
├── kedro_demo/          # 主源码目录
│   ├── __init__.py
│   ├── __main__.py
│   ├── pipeline_registry.py  # 管道注册
│   └── pipelines/        # 管道实现
├── logs/                # 运行日志
├── tests/               # 单元测试
├── .gitignore
├── pyproject.toml
└── README.md

该结构遵循数据工程最佳实践,从原始数据到报告输出分层管理,避免混乱。

五、基础使用:从零构建第一个数据管道

5.1 编写数据处理函数

进入 kedro_demo/pipelines,创建 demo_pipeline 文件夹,新增 nodes.py

# -*- coding: utf-8 -*-
import pandas as pd

def load_raw_data(file_path: str) -> pd.DataFrame:
    """
    读取原始CSV数据
    """
    df = pd.read_csv(file_path)
    return df

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    数据清洗:去重、缺失值填充
    """
    df = df.drop_duplicates()
    df = df.fillna(0)
    return df

def calculate_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    简单统计计算:新增均值列
    """
    df['mean_value'] = df.select_dtypes(include='number').mean(axis=1)
    return df

5.2 构建管道

同目录创建 pipeline.py

# -*- coding: utf-8 -*-
from kedro.pipeline import Pipeline, node
from .nodes import load_raw_data, clean_data, calculate_stats

def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(
                func=load_raw_data,
                inputs="params:raw_data_path",
                outputs="raw_data",
                name="load_raw_data_node",
            ),
            node(
                func=clean_data,
                inputs="raw_data",
                outputs="cleaned_data",
                name="clean_data_node",
            ),
            node(
                func=calculate_stats,
                inputs="cleaned_data",
                outputs="stats_data",
                name="calculate_stats_node",
            ),
        ]
    )

Kedro 会根据 inputsoutputs 自动确定执行顺序。

5.3 配置数据目录

conf/base/catalog.yml 中添加:

raw_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/input.csv

cleaned_data:
  type: pandas.CSVDataSet
  filepath: data/02_intermediate/cleaned.csv

stats_data:
  type: pandas.CSVDataSet
  filepath: data/03_primary/stats.csv

5.4 配置参数

conf/base/parameters.yml 中添加:

raw_data_path: "data/01_raw/input.csv"

5.5 注册管道

打开 pipeline_registry.py,注册管道:

from kedro.framework.pipeline import Pipeline
from kedro_demo.pipelines.demo_pipeline import create_pipeline as demo_pipeline

def register_pipelines() -> dict[str, Pipeline]:
    pipelines = {
        "__default__": demo_pipeline(),
        "demo": demo_pipeline(),
    }
    return pipelines

5.6 准备测试数据

data/01_raw 下创建 input.csv

id,value1,value2
1,10,20
2,,30
3,40,
1,10,20

5.7 运行管道

在项目根目录执行:

kedro run

运行成功后,可在对应目录看到输出文件。

六、进阶使用:参数化、可视化与多环境

6.1 使用动态参数

修改 parameters.yml

raw_data_path: "data/01_raw/input.csv"
fill_value: 0
drop_duplicates: True

更新 nodes.py

def clean_data(df: pd.DataFrame, fill_value: int, drop_duplicates: bool) -> pd.DataFrame:
    if drop_duplicates:
        df = df.drop_duplicates()
    df = df.fillna(fill_value)
    return df

修改管道节点:

node(
    func=clean_data,
    inputs=dict(df="raw_data", fill_value="params:fill_value", drop_duplicates="params:drop_duplicates"),
    outputs="cleaned_data",
    name="clean_data_node",
),

再次运行即可使用新参数。

6.2 管道可视化

执行命令启动可视化服务:

kedro viz run

浏览器自动打开界面,可查看节点依赖、运行状态、数据流向,支持交互式查看。

6.3 多环境切换

Kedro 支持 baselocalprod 等多环境配置,只需在 conf/ 下新建环境文件夹,覆盖对应配置即可,运行时指定环境:

kedro run --env=prod

七、机器学习实战:基于 Kedro 的分类模型 pipeline

7.1 编写机器学习节点

新建 ml_pipeline 文件夹,nodes.py

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

def split_data(df: pd.DataFrame, test_size: float, random_state: int):
    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    return X_train, X_test, y_train, y_test

def train_model(X_train: pd.DataFrame, y_train: pd.Series):
    model = LogisticRegression(max_iter=1000)
    model.fit(X_train, y_train)
    return model

def evaluate_model(model, X_test: pd.DataFrame, y_test: pd.Series):
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    return {"accuracy": acc}

7.2 构建机器学习管道

pipeline.py

from kedro.pipeline import Pipeline, node
from .nodes import split_data, train_model, evaluate_model

def create_ml_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                split_data,
                inputs=["stats_data", "params:test_size", "params:random_state"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data",
            ),
            node(
                train_model,
                inputs=["X_train", "y_train"],
                outputs="model",
                name="train_model",
            ),
            node(
                evaluate_model,
                inputs=["model", "X_test", "y_test"],
                outputs="metrics",
                name="evaluate_model",
            ),
        ]
    )

7.3 配置与运行

catalog.yml 配置模型与指标输出:

model:
  type: pickle.PickleDataSet
  filepath: data/06_models/model.pkl

metrics:
  type: json.JSONDataSet
  filepath: data/07_model_output/metrics.json

在参数中添加:

test_size: 0.2
random_state: 42

注册管道后运行:

kedro run

可得到训练好的模型与评估结果。

八、Kedro 优势与适用场景

Kedro 解决了数据科学项目代码混乱、难以复现、协作成本高的问题,通过强制工程化规范,让脚本式代码升级为可维护、可测试、可部署的生产级项目。适合团队协作、长期维护、需要上线部署的数据管道与机器学习项目,尤其在数据清洗、特征工程、模型训练、批量预测场景优势明显。

它将数据科学家从路径管理、依赖混乱、环境不一致中解放出来,专注算法与逻辑本身,同时让数据工程与生产环境对接更平滑,支持直接对接 Airflow、Prefect、Kubeflow 等调度工具。

相关资源

  • Pypi地址:https://pypi.org/project/kedro/
  • Github地址:https://github.com/kedro-org/kedro
  • 官方文档地址:https://docs.kedro.org/

关注我,每天分享一个实用的Python自动化工具。

Python 任务编排神器:Luigi 库从入门到实战教程

一、Luigi 库概述

Luigi 是 Spotify 开源的 Python 任务编排与工作流管理库,专注于解决复杂批量任务依赖、执行与监控问题,核心原理是通过定义任务依赖关系自动构建执行拓扑图,按依赖顺序调度任务,支持任务失败重试、断点续跑。优点是依赖管理清晰、适配大数据与批处理场景、代码侵入低、易集成;缺点是无原生分布式调度、WebUI 功能简洁。采用 Apache License 2.0 开源协议,可商用与修改。

二、Luigi 库安装与基础环境配置

2.1 安装 Luigi

Luigi 支持 Python 3.6 及以上版本,使用 pip 即可快速安装,打开命令行执行以下命令:

pip install luigi

安装完成后可通过查看版本验证是否安装成功:

luigi --version

若输出对应版本号,说明安装正常。

2.2 核心基础概念

在使用 Luigi 前,需要先掌握几个核心概念:

  1. Task(任务):所有工作流的最小单元,继承 luigi.Task 类,需重写 requires()run()output() 三个核心方法。
  2. requires():定义当前任务依赖的前置任务,无依赖则可不写或返回空。
  3. run():任务的核心逻辑,编写具体执行代码。
  4. output():定义任务执行完成后的输出目标,通常为文件、数据库标识等,用于判断任务是否已完成。
  5. Target(目标):任务输出的抽象载体,常用 LocalTarget(本地文件)、HiveTargetPostgresTarget 等。
  6. 工作流:多个 Task 通过依赖关系串联形成的完整执行流程。

三、Luigi 基础使用与代码示例

3.1 最简单的单机任务

先从无依赖的基础任务入手,创建一个生成文本文件的任务,直观感受 Luigi 的执行逻辑。

创建文件 luigi_demo_01.py,代码如下:

import luigi

# 定义基础任务,继承 luigi.Task
class CreateFile(luigi.Task):
    # 定义任务输出文件
    def output(self):
        # LocalTarget 表示本地文件目标
        return luigi.LocalTarget('hello_luigi.txt')

    # 任务执行逻辑
    def run(self):
        # self.output().open() 获取输出文件句柄
        with self.output().open('w') as f:
            f.write('Hello Luigi! 这是第一个 Luigi 任务\n')
            f.write('任务执行成功!')

if __name__ == '__main__':
    # 命令行方式启动任务
    luigi.run()

代码说明

  1. 自定义 CreateFile 任务继承 luigi.Task,是 Luigi 任务的标准写法。
  2. output() 方法指定输出为本地文件 hello_luigi.txt,Luigi 会通过该文件判断任务是否完成。
  3. run() 方法内编写业务逻辑,向目标文件写入文本内容。
  4. luigi.run() 允许以命令行参数方式启动任务。

执行命令

python luigi_demo_01.py CreateFile --local-scheduler
  • CreateFile:指定要执行的任务类名。
  • --local-scheduler:使用本地调度器,适合单机测试。

执行成功后,目录下会生成 hello_luigi.txt 文件,且再次执行相同命令时,Luigi 会检测到文件已存在,直接判定任务完成,不再重复执行,这就是 Luigi 的幂等性核心特性。

3.2 带依赖的多任务工作流

实际场景中任务通常存在依赖关系,例如先创建数据文件,再读取文件处理数据。下面实现两个任务的依赖串联。

创建文件 luigi_demo_02.py

import luigi

# 任务1:生成原始数据文件
class GenerateData(luigi.Task):
    def output(self):
        return luigi.LocalTarget('data.txt')

    def run(self):
        with self.output().open('w') as f:
            # 写入 1-10 的数字
            for i in range(1, 11):
                f.write(f'{i}\n')

# 任务2:依赖 GenerateData,计算数字总和
class CalculateSum(luigi.Task):
    # 定义依赖任务
    def requires(self):
        return GenerateData()

    def output(self):
        return luigi.LocalTarget('sum_result.txt')

    def run(self):
        # 读取依赖任务的输出文件
        with self.input().open('r') as f:
            lines = f.readlines()
            # 转换为整数并求和
            total = sum(int(line.strip()) for line in lines if line.strip())

        # 写入计算结果
        with self.output().open('w') as f:
            f.write(f'1到10的总和为:{total}')

if __name__ == '__main__':
    luigi.run()

代码说明

  1. GenerateData 任务生成包含 1-10 数字的 data.txt
  2. CalculateSum 任务通过 requires() 依赖 GenerateData,执行前会先自动运行前置任务。
  3. self.input() 可直接获取依赖任务的输出 Target,无需手动指定路径,解耦且安全。
  4. 任务执行完成后生成 sum_result.txt,存储计算结果。

执行命令

python luigi_demo_02.py CalculateSum --local-scheduler

执行流程:先运行 GenerateData 生成数据文件,再运行 CalculateSum 计算总和,若 data.txt 已存在,则跳过前置任务,直接执行计算任务。

3.3 带参数的动态任务

固定任务无法满足多变需求,Luigi 支持通过 luigi.Parameter() 定义参数,实现任务动态化。

创建文件 luigi_demo_03.py

import luigi

# 带参数的生成数据任务
class GenerateNumData(luigi.Task):
    # 定义参数:数字上限
    max_num = luigi.IntParameter(default=10)
    # 定义参数:输出文件名
    filename = luigi.Parameter(default='num_data.txt')

    def output(self):
        return luigi.LocalTarget(self.filename)

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.max_num + 1):
                f.write(f'{i}\n')

# 带参数的求和任务
class CalculateDynamicSum(luigi.Task):
    max_num = luigi.IntParameter(default=10)
    filename = luigi.Parameter(default='num_data.txt')

    def requires(self):
        # 向依赖任务传递参数
        return GenerateNumData(max_num=self.max_num, filename=self.filename)

    def output(self):
        return luigi.LocalTarget('dynamic_sum_result.txt')

    def run(self):
        with self.input().open('r') as f:
            lines = f.readlines()
            total = sum(int(line.strip()) for line in lines if line.strip())

        with self.output().open('w') as f:
            f.write(f'1到{self.max_num}的总和为:{total}')

if __name__ == '__main__':
    luigi.run()

代码说明

  1. 使用 luigi.IntParameter 定义整数参数,luigi.Parameter 定义字符串参数,支持默认值。
  2. 子任务可通过 requires() 向父任务传递参数,保持参数一致性。
  3. 输出路径、数据范围均可通过参数动态调整,提升任务复用性。

执行命令(指定参数)

python luigi_demo_03.py CalculateDynamicSum --max-num 20 --filename my_data.txt --local-scheduler

命令中 --max-num 20 对应任务中的 max_num 参数,会生成 1-20 的数据并计算总和。

3.4 任务失败重试与断点续跑

Luigi 内置任务失败重试机制,无需手动编写异常处理,只需在任务中配置重试次数。

示例代码(添加重试配置):

import luigi
import random

class UnstableTask(luigi.Task):
    # 配置重试次数
    retry_count = luigi.IntParameter(default=3)

    def output(self):
        return luigi.LocalTarget('retry_test.txt')

    def run(self):
        # 模拟随机失败
        if random.random() < 0.7:
            raise Exception('任务随机失败,触发重试')
        with self.output().open('w') as f:
            f.write('任务重试成功!')

if __name__ == '__main__':
    luigi.run()

执行命令

python luigi_demo_04.py UnstableTask --local-scheduler

Luigi 会自动捕获任务异常,按 retry_count 配置重试,直到执行成功或耗尽重试次数,适合网络请求、数据库操作等不稳定场景。

四、Luigi 进阶使用:Web 监控与多任务调度

4.1 启动 Luigi 中央调度器与 WebUI

单机调度器仅适合测试,生产环境推荐使用 Luigi 中央调度器,自带 WebUI 可实时查看任务状态、依赖图、执行日志。

1. 启动中央调度器

luigid --port 8082

默认端口 8082,启动后访问:http://localhost:8082 即可打开 Web 监控界面。

2. 提交任务到中央调度器
执行任务时去掉 --local-scheduler,自动连接中央调度器:

python luigi_demo_02.py CalculateSum

在 WebUI 中可查看任务执行进度、失败任务、依赖关系,支持手动终止任务。

4.2 多任务并行执行

Luigi 支持多进程并行执行无依赖的任务,提升执行效率,通过命令行参数指定并行进程数:

python luigi_multi_task.py MainTask --workers 4

--workers 4 表示使用 4 个进程并行执行,无依赖的任务会同时运行,有依赖的任务按顺序执行。

4.3 封装为可复用任务模块

实际项目中会将任务按功能拆分到不同模块,标准目录结构如下:

luigi_project/
├── tasks/
│   ├── __init__.py
│   ├── data_task.py    # 数据生成、清洗任务
│   ├── compute_task.py # 计算、分析任务
│   └── output_task.py  # 结果输出任务
├── config/
│   └── luigi.cfg       # Luigi 配置文件
└── main.py             # 任务入口

luigi.cfg 可配置默认调度器、重试次数、日志路径等,简化命令行参数:

[core]
default-scheduler-host = localhost
default-scheduler-port = 8082
retry-attempts = 3

五、真实场景实战案例:数据清洗与统计分析工作流

5.1 案例需求

模拟企业日常数据处理流程,完成以下任务:

  1. 生成原始 CSV 数据(包含姓名、年龄、城市、销售额)。
  2. 清洗数据:去除空值、过滤异常年龄、标准化城市名称。
  3. 统计分析:按城市分组计算总销售额、平均年龄。
  4. 输出统计结果到文本文件。

5.2 完整代码实现

创建文件 data_workflow.py

import luigi
import csv
import os

# 任务1:生成原始 CSV 数据
class GenerateRawData(luigi.Task):
    def output(self):
        return luigi.LocalTarget('raw_data.csv')

    def run(self):
        # 模拟业务数据
        data = [
            ['姓名', '年龄', '城市', '销售额'],
            ['张三', 25, '北京', 5000],
            ['李四', 32, '上海', 8000],
            ['王五', '', '广州', 6000],
            ['赵六', 40, '深圳', 12000],
            ['钱七', 150, '北京', 3000],
            ['孙八', 28, '上海', 9000],
            ['周九', None, '广州', 4500]
        ]
        with self.output().open('w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerows(data)

# 任务2:清洗数据
class CleanData(luigi.Task):
    def requires(self):
        return GenerateRawData()

    def output(self):
        return luigi.LocalTarget('cleaned_data.csv')

    def run(self):
        with self.input().open('r', encoding='utf-8') as f:
            reader = csv.reader(f)
            header = next(reader)
            cleaned_data = [header]

            for row in reader:
                # 跳过空值行
                if not all(row):
                    continue
                name, age, city, sales = row
                # 过滤异常年龄
                try:
                    age = int(age)
                    sales = int(sales)
                except:
                    continue
                if 18 <= age <= 60:
                    cleaned_data.append([name, age, city, sales])

        with self.output().open('w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerows(cleaned_data)

# 任务3:按城市统计销售额
class CitySalesStat(luigi.Task):
    def requires(self):
        return CleanData()

    def output(self):
        return luigi.LocalTarget('city_sales_report.txt')

    def run(self):
        city_stat = {}
        with self.input().open('r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                city = row['城市']
                sales = int(row['销售额'])
                age = int(row['年龄'])

                if city not in city_stat:
                    city_stat[city] = {'total_sales': 0, 'total_age': 0, 'count': 0}
                city_stat[city]['total_sales'] += sales
                city_stat[city]['total_age'] += age
                city_stat[city]['count'] += 1

        # 生成报告
        with self.output().open('w', encoding='utf-8') as f:
            f.write('城市销售统计报告\n')
            f.write('='*30 + '\n')
            for city, data in city_stat.items():
                avg_age = data['total_age'] / data['count']
                f.write(f'城市:{city}\n')
                f.write(f'总销售额:{data["total_sales"]}\n')
                f.write(f'平均年龄:{avg_age:.2f}\n')
                f.write('-'*30 + '\n')

if __name__ == '__main__':
    luigi.run()

5.3 案例执行与效果

执行命令

python data_workflow.py CitySalesStat --local-scheduler

执行流程:

  1. 生成 raw_data.csv 原始数据。
  2. 清洗空值、异常年龄,生成 cleaned_data.csv
  3. 按城市统计数据,生成 city_sales_report.txt 报告。

该案例完整还原了企业数据处理流程,体现了 Luigi 依赖管理、断点续跑、任务复用的核心价值,可直接扩展对接数据库、Hive、Spark 等大数据组件。

相关资源

  • Pypi地址:https://pypi.org/project/luigi/
  • Github地址:https://github.com/spotify/luigi
  • 官方文档地址:https://luigi.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python 任务队列利器:rq 从入门到实战完全指南

一、rq 库概述

rq 全称 Redis Queue,是一款基于 Redis 开发的轻量级 Python 任务队列库,专注于处理异步任务与后台任务,核心原理是将任务函数与参数存入 Redis,由独立工作进程异步拉取执行,规避同步任务阻塞主程序的问题。该库采用 MIT 开源许可,优点是轻量简洁、易上手、无额外依赖、适配小型到中型项目;缺点是不支持复杂任务调度、集群能力较弱,更适合轻量异步场景。

二、rq 环境安装与基础配置

2.1 环境依赖准备

rq 强依赖 Redis 数据库,使用前需先完成 Redis 安装与启动,Windows、macOS、Linux 均有对应安装方式,安装后通过 redis-server 启动服务,默认端口 6379,无密码时本地可直接连接。

2.2 rq 库安装

rq 仅需通过 pip 即可安装,命令简洁无复杂配置,适合新手快速部署:

pip install rq

安装完成后,可通过导入 rq 验证是否成功,无报错则说明安装正常:

import rq
print(rq.__version__)

2.3 Redis 连接配置

rq 默认连接本地 Redis,如需自定义主机、端口、密码、数据库,可创建 Redis 连接对象,适配不同部署环境:

from redis import Redis
from rq import Queue

# 默认本地连接
redis_conn = Redis(host='localhost', port=6379, db=0)

# 带密码的远程连接
# redis_conn = Redis(host='xxx.xxx.xxx.xxx', port=6379, password='your_password', db=0)

# 初始化任务队列
task_queue = Queue(connection=redis_conn)

这段代码的作用是建立 Python 程序与 Redis 的通信通道,所有任务都会通过该连接存入 Redis 队列,是 rq 运行的基础配置。

三、rq 核心使用方式与基础代码示例

3.1 定义可被执行的任务函数

rq 的任务本质是普通 Python 函数,需满足可被序列化无复杂闭包的条件,先定义简单任务用于测试:

# tasks.py 任务文件,单独存放便于管理
import time

def simple_task(name):
    """简单异步任务:模拟耗时操作"""
    time.sleep(2)
    return f"Hello {name}, 异步任务执行完成!"

def calculate_sum(a, b):
    """计算两数之和的任务"""
    time.sleep(1)
    return f"{a} + {b} = {a + b}"

将任务单独放在 tasks.py 文件,是因为 rq 工作进程需要通过模块路径找到函数,分散存放会导致任务无法执行。

3.2 向队列添加异步任务

创建主程序文件,将任务函数加入队列,实现异步提交,不阻塞主程序运行:

# main.py 主程序文件
from redis import Redis
from rq import Queue
from tasks import simple_task, calculate_sum

# 连接 Redis
redis_conn = Redis(host='localhost', port=6379, db=0)
queue = Queue(connection=redis_conn)

# 提交任务到队列,非阻塞执行
job1 = queue.enqueue(simple_task, "Python开发者")
job2 = queue.enqueue(calculate_sum, 10, 20)

# 输出任务ID,用于后续查询
print(f"任务1 ID: {job1.id}")
print(f"任务2 ID: {job2.id}")
print("主程序继续执行,无需等待任务完成")

代码说明:queue.enqueue() 是核心提交方法,第一个参数为任务函数,后续为函数参数,调用后立即返回任务对象,主程序不会等待任务执行,实现异步解耦。

3.3 启动 rq 工作进程执行任务

任务提交到 Redis 后,需要启动工作进程消费队列任务,打开新的命令行窗口,进入项目目录,执行:

rq worker

执行后工作进程会持续监听 Redis 队列,一旦有新任务就立即执行,输出任务执行日志,执行完成后返回结果。

3.4 查看任务执行状态与结果

rq 提供丰富的任务状态查询方法,可在主程序中获取任务是否完成、结果、失败原因:

# result_check.py 任务结果查询
from redis import Redis
from rq import Queue
from rq.job import Job

redis_conn = Redis(host='localhost', port=6379, db=0)
queue = Queue(connection=redis_conn)

# 通过任务ID获取任务
job = Job.fetch('你的任务ID', connection=redis_conn)

# 查询任务状态
print(f"任务是否执行完成: {job.is_finished}")
print(f"任务是否执行失败: {job.is_failed}")
print(f"任务执行结果: {job.result}")
print(f"任务执行状态: {job.get_status()}")

任务状态分为 queued(排队中)、started(执行中)、finished(已完成)、failed(执行失败),可根据状态做后续业务处理。

四、rq 进阶功能使用

4.1 多队列管理

rq 支持创建多个队列,分类处理不同类型任务,避免任务阻塞:

# multi_queue.py
from redis import Redis
from rq import Queue
from tasks import simple_task, calculate_sum

redis_conn = Redis(host='localhost', port=6379, db=0)

# 创建不同优先级/类型的队列
high_queue = Queue('high', connection=redis_conn)
low_queue = Queue('low', connection=redis_conn)

# 向指定队列提交任务
high_queue.enqueue(calculate_sum, 100, 200)
low_queue.enqueue(simple_task, "普通用户")

启动工作进程时可指定监听队列:

rq worker high low

4.2 任务延迟执行

rq 支持设置任务延迟执行时间,单位为秒,满足定时异步任务需求:

# 延迟5秒执行任务
job = queue.enqueue(simple_task, "延迟任务", delay=5)

代码说明:delay 参数指定任务提交后,等待指定秒数再被工作进程执行,适用于延迟通知、延迟审核等场景。

4.3 任务失败重试与异常处理

为任务设置失败重试次数、重试间隔,提升任务执行稳定性:

# retry_task.py 带重试的任务
from rq import Retry

# 最多重试3次,每次间隔2秒
retry_strategy = Retry(max=3, interval=2)
job = queue.enqueue(simple_task, "重试任务", retry=retry_strategy)

同时可在任务函数中捕获异常,记录失败原因:

def error_task():
    try:
        # 可能出错的逻辑
        1 / 0
    except Exception as e:
        print(f"任务执行异常: {str(e)}")
        raise  # 抛出异常让rq标记任务失败

4.4 清空队列与删除任务

运维场景中可清空队列、删除指定任务,避免无效任务堆积:

# 清空当前队列所有任务
queue.empty()

# 删除指定任务
job.delete()

五、rq 实际业务场景案例

5.1 案例一:异步发送邮件

实际项目中,发送邮件是耗时操作,用 rq 异步处理可提升接口响应速度:

# email_task.py
import time
import smtplib
from email.mime.text import MIMEText

def send_async_email(to_email, content):
    """异步发送邮件任务"""
    try:
        # 模拟邮件发送(实际项目替换为真实邮件配置)
        time.sleep(3)
        msg = MIMEText(content, 'plain', 'utf-8')
        msg['From'] = '[email protected]'
        msg['To'] = to_email
        msg['Subject'] = '异步邮件通知'

        # 模拟发送成功
        print(f"邮件已发送至: {to_email}")
        return True
    except Exception as e:
        print(f"邮件发送失败: {str(e)}")
        return False

提交邮件任务:

# 提交异步邮件任务,不阻塞主程序
queue.enqueue(send_async_email, "[email protected]", "您的订单已支付成功")

5.2 案例二:异步生成报表

大数据量报表生成耗时较长,通过 rq 后台执行,生成完成后通知用户:

# report_task.py
import time
import pandas as pd

def generate_excel_report(data_list, save_path):
    """异步生成Excel报表"""
    time.sleep(5)  # 模拟数据处理耗时
    df = pd.DataFrame(data_list)
    df.to_excel(save_path, index=False)
    return f"报表生成完成,保存路径: {save_path}"

提交报表任务:

data = [{'name': '张三', 'score': 90}, {'name': '李四', 'score': 85}]
queue.enqueue(generate_excel_report, data, "./report.xlsx")

5.3 完整项目运行流程

  1. 启动 Redis 服务:redis-server
  2. 定义任务函数到 tasks.py
  3. 主程序提交任务到队列
  4. 启动工作进程:rq worker
  5. 查看任务执行状态与结果
  6. 业务逻辑根据任务结果做后续处理

该流程可直接应用于 Web 项目、自动化脚本、数据分析工具中,解决同步任务阻塞问题。

六、相关资源

  • Pypi地址:https://pypi.org/project/rq/
  • Github地址:https://github.com/rq/rq
  • 官方文档地址:https://python-rq.org/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:joblib 高效序列化与并行计算详解

一、joblib 库概述

joblib 是 Python 生态中轻量且高效的工具库,核心用于对象序列化、模型持久化、并行计算,尤其适配大数据对象与机器学习场景。其原理是优化 pickle 序列化逻辑,支持大数组分块存储,借助多进程实现并行加速。优点是轻量无依赖、读写速度快、内存占用低、并行接口简洁;缺点是不适合跨语言使用,复杂自定义对象兼容性有限。该库采用 BSD 开源许可证,可自由商用与修改。

二、joblib 安装方法

joblib 安装无需复杂环境配置,支持 pip 与 conda 两种安装方式,适配所有主流 Python 版本与操作系统。

1. pip 安装(推荐)

打开命令行工具,执行以下命令即可完成安装:

pip install joblib

2. conda 安装

若使用 Anaconda 或 Miniconda 环境,可执行:

conda install -c anaconda joblib

安装完成后,在 Python 脚本中直接导入即可使用,无额外配置步骤:

import joblib
print(joblib.__version__)

执行后输出版本号,即代表安装成功。

三、joblib 核心功能与代码实例

joblib 核心功能分为两大模块:对象持久化(dump/load)并行计算(Parallel/delayed),同时提供内存缓存、压缩存储等辅助功能,覆盖日常开发与机器学习全场景。

3.1 基础对象持久化:dump 与 load

序列化与反序列化是 joblib 最基础的功能,替代原生 pickle 模块,针对 numpy 数组、pandas 数据框、机器学习模型做了深度优化,读写速度远超 pickle,且支持大文件分块存储。

3.1.1 基础数据类型存储与读取

演示存储列表、字典、数值等基础数据类型:

import joblib

# 定义测试数据
data_list = [1, 2, 3, 4, 5]
data_dict = {"name": "joblib教程", "version": 1.3, "function": ["序列化", "并行计算"]}
number = 100

# 序列化保存数据
joblib.dump(data_list, "data_list.pkl")
joblib.dump(data_dict, "data_dict.pkl")
joblib.dump(number, "number.pkl")

# 反序列化读取数据
load_list = joblib.load("data_list.pkl")
load_dict = joblib.load("data_dict.pkl")
load_number = joblib.load("number.pkl")

print("读取列表:", load_list)
print("读取字典:", load_dict)
print("读取数值:", load_number)

代码说明joblib.dump(对象, 保存路径) 用于将 Python 对象写入文件,joblib.load(文件路径) 用于读取文件还原对象,操作逻辑与 pickle 一致,但底层优化更适合大数据对象。

3.1.2 压缩存储

joblib 支持直接存储压缩文件,节省磁盘空间,支持 gzip、bz2、xz 三种压缩格式,只需在文件名后缀标注即可:

import joblib
import numpy as np

# 生成大型 numpy 数组
big_array = np.random.rand(10000, 1000)

# 压缩存储为 gzip 格式
joblib.dump(big_array, "big_array.gz", compress=("gzip", 3))
# compress 参数可指定压缩等级,范围 0-9,数值越大压缩率越高,速度越慢

# 读取压缩文件
load_array = joblib.load("big_array.gz")
print("压缩后数组形状:", load_array.shape)
print("数组占用内存:", load_array.nbytes / 1024 / 1024, "MB")

代码说明:大数据对象直接存储会占用大量磁盘空间,使用压缩存储可减少 50%-90% 空间,joblib 读取时会自动解压,无需手动处理。

3.1.3 多对象合并存储

joblib 支持一次性存储多个对象,读取时按顺序还原,适合批量保存相关数据:

import joblib

# 定义多个对象
obj1 = [1, 2, 3]
obj2 = {"a": 1, "b": 2}
obj3 = "joblib多对象存储"

# 合并存储
joblib.dump([obj1, obj2, obj3], "multi_obj.pkl")

# 批量读取
load_obj1, load_obj2, load_obj3 = joblib.load("multi_obj.pkl")

print(load_obj1)
print(load_obj2)
print(load_obj3)

代码说明:将多个对象放入列表中统一存储,读取时按存储顺序解包,简化多文件管理逻辑。

3.2 机器学习模型持久化

joblib 诞生初衷就是为了解决机器学习模型存储问题,是 scikit-learn 官方推荐的模型保存工具,完美适配决策树、随机森林、逻辑回归、SVM 等模型。

import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# 加载数据集并划分训练集测试集
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2)

# 训练随机森林模型
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)

# 保存训练好的模型
joblib.dump(model, "iris_rf_model.pkl")

# 加载模型并预测
load_model = joblib.load("iris_rf_model.pkl")
predict = load_model.predict(X_test)

print("预测结果:", predict[:5])
print("模型准确率:", load_model.score(X_test, y_test))

代码说明:机器学习模型训练耗时较长,使用 joblib 保存后,下次直接加载即可预测,无需重新训练,极大提升开发效率。

3.3 并行计算:Parallel 与 delayed

原生 Python 多进程代码繁琐,joblib 封装了 Paralleldelayed 装饰器,一行代码实现多进程并行,大幅提升循环任务执行速度。

3.3.1 基础并行任务

import time
import joblib
from joblib import Parallel, delayed

# 定义单任务函数
def task_func(x):
    time.sleep(1)
    return x * x

# 串行执行
start = time.time()
serial_result = [task_func(i) for i in range(8)]
print("串行耗时:", time.time() - start, "秒")

# 并行执行(4进程)
start = time.time()
parallel_result = Parallel(n_jobs=4)(delayed(task_func)(i) for i in range(8))
print("并行耗时:", time.time() - start, "秒")
print("并行结果:", parallel_result)

代码说明n_jobs 指定进程数,设置为 -1 表示使用 CPU 全部核心,delayed 用于包装需要并行执行的函数,并行执行时间随进程数增加显著缩短。

3.3.2 带参数的并行任务

from joblib import Parallel, delayed

def calc_func(a, b, power):
    return (a + b) ** power

# 多参数并行执行
result = Parallel(n_jobs=2)(delayed(calc_func)(i, i+1, 2) for i in range(5))
print("多参数并行结果:", result)

代码说明delayed 可传递任意数量参数,适配复杂业务函数,无需修改函数本身逻辑。

3.3.3 并行进度显示

处理大量任务时,可通过 verbose 参数显示执行进度:

from joblib import Parallel, delayed
import time

def long_task(x):
    time.sleep(0.5)
    return x

# 显示进度
result = Parallel(n_jobs=3, verbose=10)(delayed(long_task)(i) for i in range(20))

代码说明verbose 数值越大,进度输出越详细,方便监控长时间并行任务的执行状态。

3.4 内存缓存:Memory

joblib 提供内存缓存功能,缓存函数执行结果,重复调用时直接读取缓存,避免重复计算,适合耗时较长的函数。

import joblib
import time

# 创建缓存目录
memory = joblib.Memory(location="cache_dir", verbose=0)

# 装饰器缓存函数
@memory.cache
def slow_calculate(n):
    time.sleep(2)
    return sum(range(n+1))

# 第一次执行:计算并缓存
start = time.time()
print(slow_calculate(10000))
print("第一次执行耗时:", time.time() - start, "秒")

# 第二次执行:直接读取缓存
start = time.time()
print(slow_calculate(10000))
print("第二次执行耗时:", time.time() - start, "秒")

代码说明:函数参数不变时,直接返回缓存结果,参数变化时重新计算,自动管理缓存文件,无需手动清理。

四、实际综合案例:机器学习模型训练与批量预测

结合 joblib 序列化、并行计算功能,实现完整的机器学习模型保存、加载、批量预测流程。

import joblib
import numpy as np
from sklearn.svm import SVC
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from joblib import Parallel, delayed

# 1. 加载数据并训练模型
digits = load_digits()
X, y = digits.data, digits.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

model = SVC()
model.fit(X_train, y_train)

# 2. 保存模型
joblib.dump(model, "digits_svc_model.pkl")
print("模型保存成功")

# 3. 加载模型
load_model = joblib.load("digits_svc_model.pkl")

# 4. 并行批量预测
def predict_single(idx):
    sample = X_test[idx:idx+1]
    return load_model.predict(sample)[0]

# 并行预测前100个样本
predict_result = Parallel(n_jobs=-1)(delayed(predict_single)(i) for i in range(100))
print("批量预测结果:", predict_result[:10])

# 5. 保存预测结果
joblib.dump(predict_result, "predict_result.pkl")
print("预测结果保存成功")

案例说明:该案例覆盖 joblib 三大核心功能,模型持久化避免重复训练,并行预测提升推理速度,结果序列化方便后续分析,是工业级项目常用开发模式。

相关资源

  • Pypi地址:https://pypi.org/project/joblib/
  • Github地址:https://github.com/joblib/joblib
  • 官方文档地址:https://joblib.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python数据质量神器:Great Expectations从入门到实战教程

一、Great Expectations 库概述

Great Expectations 是一款专注于数据验证、数据文档化与数据质量监控的 Python 开源库,核心用于保障数据 pipeline 中数据的准确性、完整性与一致性。其原理是通过定义「数据期望」规则,自动校验数据是否符合预期,同时生成可视化数据文档。该库采用 Apache-2.0 开源协议,优点是适配多数据源、规则易编写、可集成主流数据工具,缺点是初次配置稍繁琐,轻量数据验证场景略显冗余。

二、Great Expectations 安装与基础环境配置

2.1 库的安装

在使用 Great Expectations 之前,需要通过 pip 完成安装,打开命令行执行以下命令:

pip install great-expectations

安装完成后,可以在 Python 环境中导入库验证是否安装成功,无报错则说明安装正常。

import great_expectations as ge
from great_expectations.data_context import FileDataContext

2.2 初始化项目环境

Great Expectations 采用项目化管理,需要初始化工作目录,执行命令后会自动生成配置文件夹:

great_expectations init

初始化完成后,目录结构如下:

great_expectations/
├── great_expectations.yml  # 主配置文件
├── expectations/           # 数据验证规则存储目录
├── checkpoints/            # 验证任务配置目录
├── plugins/                # 插件目录
├── uncommitted/            # 本地配置与缓存文件
└── data_docs/               # 数据文档生成目录

该目录结构用于统一管理验证规则、数据源与报告,方便团队协作与版本控制。

三、Great Expectations 核心使用流程

3.1 加载数据并创建验证对象

Great Expectations 支持 Pandas DataFrame、Spark DataFrame、SQL 数据源等多种数据格式,这里以最常用的 Pandas 数据为例,先创建测试数据,再生成验证对象。

import pandas as pd
# 创建模拟业务数据(用户订单数据)
data = {
    "user_id": [1001, 1002, 1003, 1004, 1005, None],
    "order_id": [2024001, 2024002, 2024003, 2024004, 2024005, 2024006],
    "order_amount": [99.0, 199.0, 299.0, 399.0, 499.0, 599.0],
    "pay_status": ["已支付", "未支付", "已支付", "已支付", "未支付", "已支付"],
    "create_time": ["2024-01-01", "2024-01-02", "2024-01-03", 
                    "2024-01-04", "2024-01-05", "2024-01-06"]
}
df = pd.DataFrame(data)
# 创建 Great Expectations 验证对象
ge_df = ge.from_pandas(df)

代码说明:首先构建模拟订单数据,包含用户ID、订单ID、订单金额、支付状态、创建时间字段,通过 ge.from_pandas() 将普通 DataFrame 转换为支持数据验证的 GE 对象。

3.2 编写基础数据验证规则

Great Expectations 的核心是「期望(Expectation)」,即提前定义数据应该满足的规则,以下是常用的基础验证规则:

# 1. 验证列是否存在
ge_df.expect_column_to_exist("user_id")
# 2. 验证列值不允许为空(除指定列外)
ge_df.expect_column_values_to_not_be_null("order_id")
# 3. 验证列值唯一
ge_df.expect_column_values_to_be_unique("order_id")
# 4. 验证数值范围
ge_df.expect_column_values_to_be_between("order_amount", min_value=0, max_value=1000)
# 5. 验证列值属于指定集合
ge_df.expect_column_values_to_be_in_set("pay_status", ["已支付", "未支付"])
# 6. 验证日期格式
ge_df.expect_column_values_to_match_strftime_format("create_time", "%Y-%m-%d")

代码说明:每条 expect_* 方法对应一条验证规则,覆盖列存在性、非空、唯一性、数值范围、枚举值、日期格式等高频验证场景,无需编写复杂判断逻辑。

3.3 执行验证并查看结果

编写完规则后,调用 validate() 方法执行验证,返回包含验证结果的字典,可直观查看哪些规则通过、哪些失败。

# 执行数据验证
validation_result = ge_df.validate()
# 打印整体验证结果
print("数据验证是否通过:", validation_result.success)
# 打印详细验证统计
print("验证规则总数:", validation_result.statistics["evaluated_expectations"])
print("通过规则数:", validation_result.statistics["successful_expectations"])
print("失败规则数:", validation_result.statistics["unsuccessful_expectations"])
# 查看失败规则详情
for result in validation_result.results:
    if not result.success:
        print("\n失败规则:", result.expectation_config.expectation_type)
        print("失败列:", result.expectation_config.kwargs["column"])
        print("失败原因:", result.result)

代码说明:validate() 会批量执行所有定义的规则,success 字段表示整体是否通过,statistics 提供统计信息,失败规则会返回具体列与异常数据详情,方便快速定位问题。

3.4 生成可视化数据质量报告

Great Expectations 支持自动生成可视化数据文档(Data Docs),无需手动编写报告,可在浏览器中直观查看数据质量。

# 初始化数据上下文
context = FileDataContext.create(project_root_dir="./")
# 保存验证规则
expectation_suite = ge_df.get_expectation_suite()
expectation_suite.expectation_suite_name = "order_data_validation_suite"
context.save_expectation_suite(expectation_suite, overwrite=True)
# 构建验证任务
checkpoint = context.add_or_update_checkpoint(
    name="order_data_checkpoint",
    expectation_suite_name="order_data_validation_suite",
    batch_request=context.get_batch_request_class()(
        datasource_name="my_pandas_datasource",
        data_asset_name="order_data",
    ),
)
# 运行任务并生成报告
checkpoint_result = context.run_checkpoint(checkpoint_name="order_data_checkpoint")
# 打开数据文档
context.open_data_docs()

代码说明:通过保存验证规则、创建检查点、执行验证三步,自动生成 HTML 格式的可视化报告,打开浏览器即可查看所有规则的执行情况、数据分布、异常数据明细。

四、进阶使用:结合业务场景的复杂数据验证

4.1 多条件组合验证

在实际业务中,往往需要多条件组合验证,Great Expectations 支持自定义过滤条件,实现复杂逻辑验证。

# 验证:支付状态为已支付时,订单金额必须大于0
ge_df.expect_column_values_to_be_between(
    column="order_amount",
    min_value=0.01,
    max_value=None,
    row_condition="pay_status == '已支付'"
)
# 验证:用户ID不为空时,必须为整数类型
ge_df.expect_column_values_to_be_of_type(
    column="user_id",
    type_="int64",
    row_condition="user_id IS NOT NULL"
)

代码说明:通过 row_condition 参数添加过滤条件,实现按行筛选验证,适用于业务关联字段的合规性检查。

4.2 自定义验证规则

对于特殊业务规则,内置方法无法满足时,可通过自定义函数实现专属验证逻辑。

# 自定义验证规则:订单ID必须以2024开头
def custom_order_id_check(value):
    return str(value).startswith("2024")
# 应用自定义规则
ge_df.expect_column_values_to_be_true(
    column="order_id",
    condition=custom_order_id_check,
    condition_value="value"
)

代码说明:自定义函数返回布尔值,通过 expect_column_values_to_be_true 调用,适配企业个性化数据规范。

4.3 集成 SQL 数据源验证

Great Expectations 不仅支持本地数据,还可直接连接 MySQL、PostgreSQL 等数据库,验证线上数据。

from sqlalchemy import create_engine
# 连接数据库
engine = create_engine("mysql+pymysql://用户名:密码@主机:端口/数据库名")
# 从SQL查询创建验证对象
ge_sql_df = ge.from_sql(
    sql="SELECT * FROM order_table WHERE create_time >= '2024-01-01'",
    con=engine
)
# 执行验证
ge_sql_df.expect_column_values_to_not_be_null("order_id")
sql_validation_result = ge_sql_df.validate()
print("数据库数据验证结果:", sql_validation_result.success)

代码说明:通过 SQLAlchemy 连接数据库,直接查询数据并验证,适用于数据仓库、业务数据库的实时质量监控。

五、实际项目案例:电商订单数据全流程质量监控

5.1 案例背景

某电商平台每日产生数十万订单数据,需要保障:

  1. 核心字段(订单ID、用户ID、金额)无空值;
  2. 订单金额、支付状态符合业务逻辑;
  3. 日期格式规范,数据无重复;
  4. 自动生成每日数据质量报告。

5.2 完整代码实现

import pandas as pd
import great_expectations as ge
from great_expectations.data_context import FileDataContext

# 1. 加载生产环境订单数据
# 实际场景可替换为数据库读取或文件读取
df = pd.read_csv("ecommerce_orders.csv")
ge_df = ge.from_pandas(df)

# 2. 定义全量业务验证规则
# 基础完整性验证
ge_df.expect_column_to_exist("user_id")
ge_df.expect_column_to_exist("order_id")
ge_df.expect_column_to_exist("order_amount")
ge_df.expect_column_values_to_not_be_null("order_id")
ge_df.expect_column_values_to_be_unique("order_id")

# 业务合规性验证
ge_df.expect_column_values_to_be_between("order_amount", min_value=0.01, max_value=99999)
ge_df.expect_column_values_to_be_in_set("pay_status", ["已支付", "未支付", "退款中"])
ge_df.expect_column_values_to_match_strftime_format("create_time", "%Y-%m-%d %H:%M:%S")

# 关联逻辑验证
ge_df.expect_column_values_to_be_between(
    column="order_amount",
    min_value=0.01,
    row_condition="pay_status == '已支付'"
)

# 3. 执行验证
result = ge_df.validate()

# 4. 输出验证结果
if result.success:
    print("订单数据质量合格,可进入后续分析流程")
else:
    print("订单数据存在异常,请修复后再处理")

# 5. 生成并保存数据质量报告
context = FileDataContext.create(project_root_dir="./")
suite = ge_df.get_expectation_suite()
suite.expectation_suite_name = "ecommerce_order_validation"
context.save_expectation_suite(suite, overwrite=True)

checkpoint = context.add_or_update_checkpoint(
    name="daily_order_check",
    expectation_suite_name="ecommerce_order_validation",
)
context.run_checkpoint(checkpoint_name="daily_order_check")
context.open_data_docs()

代码说明:该案例完整模拟电商订单数据的质量监控流程,从数据加载、规则定义、验证执行到报告生成,可直接集成到数据 pipeline 中,实现自动化数据校验。

六、相关资源

  • Pypi地址:https://pypi.org/project/great-expectations/
  • Github地址:https://github.com/great-expectations/great_expectations
  • 官方文档地址:https://docs.greatexpectations.io/docs/

关注我,每天分享一个实用的Python自动化工具。

Python 数据管道神器:Dagster 从入门到实战完全教程

一、Dagster 基础认知

1.1 库用途与原理

Dagster 是面向数据流水线、ETL 与机器学习工作流的开源编排框架,专注数据资产的定义、调度、监控与可观测性,以数据资产为核心组织任务,替代传统脚本式调度。采用资产优先设计,通过代码定义数据依赖与执行逻辑,支持本地调试、多环境部署、任务重试与日志追踪。基于 Apache 2.0 开源协议,优点是调试友好、依赖清晰、易维护;缺点是学习成本高于简单调度工具,轻量场景略显冗余。

1.2 核心优势与适用场景

Dagster 最核心的价值,是把散乱的数据脚本变成可管理、可测试、可复用的数据资产,特别适合数据分析、数据仓库、机器学习流水线、定时 ETL、API 数据同步等场景。它和 Airflow、Prefect 最大的区别在于:从一开始就围绕“数据产出”设计,而不是单纯的任务调度。你可以在本地完整调试流水线,直接看到每个步骤的输入输出,上线后不会出现“本地能跑、线上挂掉”的黑盒问题。

同时 Dagster 原生支持类型检查、数据质量校验、重试机制、资源隔离、多环境配置(开发/测试/生产),让数据代码从“能用”变成“可靠”。对于团队协作场景,它能清晰展示数据血缘,谁在什么时候生成了什么数据、依赖哪些上游,一目了然。

二、Dagster 安装与环境准备

2.1 安装命令

在使用 Dagster 前,确保你已安装 Python 3.8+ 版本,使用 pip 即可完成安装:

# 安装核心库
pip install dagster

# 安装 Web UI 与命令行工具(强烈推荐)
pip install dagster-webserver dagit
  • dagster:核心库,提供资产、作业、调度、资源等基础能力。
  • dagster-webserver / dagit:提供可视化 Web UI,用于查看、调试、运行流水线。

安装完成后,可通过以下命令验证版本:

dagster --version

出现版本号即表示安装成功。

2.2 最小项目结构

Dagster 项目有约定俗成的目录结构,便于管理资产、作业、资源与配置,推荐小白使用如下结构:

dagster_demo/
├── __init__.py
├── assets/       # 数据资产存放目录
│   ├── __init__.py
│   └── core_assets.py  # 核心资产代码
├── jobs.py       # 作业定义
├── schedules.py  # 调度定义
└── dagster.py    # 项目入口

该结构把数据产出(assets)、执行单元(jobs)、定时规则(schedules) 分开,既符合工程规范,又方便后期扩展。

三、Dagster 核心概念与基础使用

3.1 核心概念理解

  1. Asset(资产):Dagster 最核心的概念,代表一个数据产出,如 CSV、表、模型文件、JSON 等,通过 @asset 装饰器定义。
  2. Job(作业):由一个或多个资产组成的可执行单元,用于一次性或定时运行。
  3. Resource(资源):外部连接封装,如数据库、S3、API 客户端,实现环境隔离。
  4. Schedule(调度):定时执行作业,支持 cron 表达式。
  5. Op:更细粒度的计算单元,资产底层基于 Op 实现。

3.2 第一个资产示例

我们从最简单的资产开始,创建一个生成文本文件的资产,直观感受 Dagster 的工作方式。

assets/core_assets.py 中编写代码:

from dagster import asset

# 定义第一个数据资产:生成欢迎信息
@asset
def hello_dagster_asset() -> str:
    """
    第一个 Dagster 资产
    输出:字符串文本
    """
    content = "Hello, Dagster! 这是我的第一个数据资产"
    print(content)
    return content

代码说明

  • 使用 @asset 装饰器将普通函数变成 Dagster 资产。
  • 函数返回值就是该资产的产出,可以是字符串、DataFrame、文件路径等。
  • 函数注释会在 Web UI 中展示,方便理解资产用途。

3.3 加载资产并启动 Web UI

在项目根目录的 dagster.py 中加载资产:

from dagster import Definitions, load_assets_from_modules

# 从 assets 模块加载所有资产
from . import assets

all_assets = load_assets_from_modules([assets])

# 定义整个项目的入口
defs = Definitions(
    assets=all_assets,
)

代码说明

  • Definitions 是 Dagster 项目的总入口,管理资产、作业、调度、资源。
  • load_assets_from_modules 自动扫描模块中的所有资产,无需手动逐个注册。

启动 Web UI:

# 在项目根目录执行
dagster-webserver -f dagster.py

启动成功后,访问:http://localhost:3000,即可看到可视化界面。在界面中可以看到 hello_dagster_asset 资产,点击“Materialize”即可运行,运行后可查看日志与输出结果。

四、带依赖的多资产流水线

4.1 资产依赖示例

真实场景中,数据通常有上下游依赖,Dagster 会自动根据函数参数识别依赖关系,无需手动配置。

扩展 core_assets.py

from dagster import asset
import pandas as pd
import os

# 上游资产:生成原始数据
@asset
def raw_user_data() -> pd.DataFrame:
    """生成原始用户数据,返回 DataFrame"""
    data = {
        "user_id": [1, 2, 3, 4, 5],
        "username": ["张三", "李四", "王五", "赵六", "钱七"],
        "age": [22, 25, 28, 24, 26]
    }
    df = pd.DataFrame(data)
    return df

# 下游资产:依赖 raw_user_data,进行数据清洗
@asset
def clean_user_data(raw_user_data: pd.DataFrame) -> pd.DataFrame:
    """
    依赖上游原始数据,进行清洗
    筛选年龄大于 24 的用户
    """
    clean_df = raw_user_data[raw_user_data["age"] > 24].copy()
    print("清洗后数据:")
    print(clean_df)
    return clean_df

# 最终资产:依赖清洗后的数据,输出到 CSV 文件
@asset
def user_data_report(clean_user_data: pd.DataFrame) -> str:
    """
    依赖清洗后的数据,生成 CSV 报告文件
    返回文件路径
    """
    output_path = "user_report.csv"
    clean_user_data.to_csv(output_path, index=False, encoding="utf-8-sig")
    return output_path

代码说明

  1. raw_user_data 是源头资产,无上游依赖,生成原始数据。
  2. clean_user_data 参数为 raw_user_data,Dagster 自动识别为上游依赖,必须先运行上游。
  3. user_data_report 依赖清洗后的数据,最终输出 CSV 文件。
  4. 整个流程形成一条完整的流水线:原始数据 → 清洗 → 生成报告。

重启 Web UI 后,可在界面看到资产依赖图,点击任意资产可选择运行上游依赖,支持单独运行、全链路运行、重试、查看输入输出

4.2 作业与手动执行

作业是资产的执行集合,我们把上述资产打包成一个作业,方便执行。

jobs.py 中定义:

from dagster import define_asset_job
from assets.core_assets import raw_user_data, clean_user_data, user_data_report

# 定义用户数据处理作业
user_data_job = define_asset_job(
    name="user_data_job",
    selection=[raw_user_data, clean_user_data, user_data_report]
)

dagster.py 中加入作业:

from dagster import Definitions, load_assets_from_modules
from .jobs import user_data_job
from . import assets

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    jobs=[user_data_job],  # 注册作业
)

在 Web UI 的 Jobs 页面,可直接点击“Launch”运行整个作业,流水线会按依赖顺序自动执行。

五、调度与定时任务

5.1 定时调度实现

Dagster 支持 cron 表达式实现定时执行,例如每天、每小时运行流水线。

schedules.py 中定义:

from dagster import ScheduleDefinition
from jobs import user_data_job

# 每天 0 点执行用户数据作业
daily_user_data_schedule = ScheduleDefinition(
    job=user_data_job,
    cron_schedule="0 0 * * *",  # 分 时 日 月 周
    name="daily_user_data_schedule",
    description="每日凌晨执行用户数据处理任务"
)

dagster.py 中注册调度:

from dagster import Definitions, load_assets_from_modules
from . import assets
from .jobs import user_data_job
from .schedules import daily_user_data_schedule

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    jobs=[user_data_job],
    schedules=[daily_user_data_schedule],  # 注册调度
)

启动 Web UI 后,在 Schedules 页面可开启/关闭调度,查看历史运行记录与日志。

六、资源与环境隔离(数据库示例)

6.1 资源封装

资源用于封装外部连接,实现开发/生产环境隔离,避免代码硬编码配置。以 SQLite 为例:

在项目中新建 resources.py

from dagster import resource
import sqlite3

class SQLiteResource:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = None

    def connect(self):
        self.conn = sqlite3.connect(self.db_path)
        return self.conn

    def close(self):
        if self.conn:
            self.conn.close()

@resource
def sqlite_resource(context) -> SQLiteResource:
    """SQLite 资源定义"""
    db_path = "dagster_demo.db"
    return SQLiteResource(db_path)

在资产中使用资源:

from dagster import asset
from resources import SQLiteResource

@asset
def create_user_table(sqlite_resource: SQLiteResource) -> None:
    """使用资源创建用户表"""
    conn = sqlite_resource.connect()
    cursor = conn.cursor()
    create_sql = """
    CREATE TABLE IF NOT EXISTS users (
        user_id INTEGER PRIMARY KEY,
        username TEXT,
        age INTEGER
    )
    """
    cursor.execute(create_sql)
    conn.commit()
    sqlite_resource.close()

代码说明

  • 资源通过参数注入资产,解耦代码与外部配置。
  • 切换环境只需修改资源,无需改动业务资产代码。

七、完整实战案例:自动化数据统计报告

7.1 案例需求

  1. 从模拟数据源获取原始数据。
  2. 数据清洗与统计分析。
  3. 结果存入数据库。
  4. 生成可视化报告文件。
  5. 每日定时执行。

7.2 完整代码实现

# assets/report_assets.py
from dagster import asset
import pandas as pd
import sqlite3
from datetime import datetime

@asset
def source_data() -> pd.DataFrame:
    """模拟来源数据:每日订单数据"""
    date = datetime.now().strftime("%Y-%m-%d")
    data = {
        "order_id": [1001, 1002, 1003, 1004, 1005],
        "amount": [99, 199, 299, 99, 499],
        "status": ["success", "fail", "success", "success", "fail"],
        "date": [date] * 5
    }
    return pd.DataFrame(data)

@asset
def stat_order_data(source_data: pd.DataFrame) -> pd.DataFrame:
    """统计成功订单数据"""
    success_df = source_data[source_data["status"] == "success"]
    stat_df = success_df.groupby("date").agg(
        total_order=("order_id", "count"),
        total_amount=("amount", "sum")
    ).reset_index()
    return stat_df

@asset
def save_to_db(stat_order_data: pd.DataFrame) -> None:
    """将统计结果存入 SQLite"""
    conn = sqlite3.connect("dagster_demo.db")
    stat_order_data.to_sql("order_stat", conn, if_exists="append", index=False)
    conn.close()

@asset
def generate_report(stat_order_data: pd.DataFrame) -> str:
    """生成每日统计报告"""
    report_path = f"order_report_{datetime.now().strftime('%Y%m%d')}.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("每日订单统计报告\n")
        f.write(stat_order_data.to_string())
    return report_path

将上述资产加载到 Definitions 中,打包成作业并配置每日调度,即可实现全自动数据统计+入库+报告生成

在 Web UI 中可查看:

  • 资产依赖关系
  • 每一步输入输出
  • 运行日志与报错信息
  • 历史执行记录
  • 数据血缘追踪

八、Dagster 部署与扩展

Dagster 支持多种部署方式:

  • 本地单机运行(适合开发调试)
  • Docker 容器化部署
  • Kubernetes 集群部署(生产高可用)
  • 配合 Dagster Cloud 托管服务

生产环境中,通常使用持久化元数据存储(PostgreSQL)+ 独立工作节点,实现高可用与分布式执行。

相关资源

  • Pypi地址:https://pypi.org/project/dagster
  • Github地址:https://github.com/dagster-io/dagster
  • 官方文档地址:https://docs.dagster.io

关注我,每天分享一个实用的Python自动化工具。

Python 工作流神器:Prefect 从入门到实战,轻松搞定自动化任务调度

一、Prefect 库简介

Prefect 是一款面向现代 Python 开发者的工作流编排与任务调度库,核心用于简化、可视化、监控各类自动化任务,无需复杂配置即可构建健壮的数据流与定时任务。其基于“工作流即代码”理念,通过装饰器将普通函数转为可调度任务,自带调度引擎与状态管理,支持失败重试、超时控制、并行执行。采用 Apache 2.0 开源协议,优点是上手快、无侵入、可视化强,缺点是轻量场景下功能略冗余,超大规模集群需搭配服务端。

二、Prefect 安装与环境准备

2.1 安装 Prefect

在使用 Prefect 前,只需通过 pip 完成安装,兼容 Python 3.8 及以上版本,打开命令行执行以下指令:

pip install prefect

安装完成后,可通过版本查看命令验证是否安装成功:

prefect --version

若输出版本号,说明安装正常,可直接进入开发环节。

2.2 基础概念理解

Prefect 核心只有两个关键概念,新手也能快速掌握:

  • Flow(工作流):整个自动化任务的入口,一个 Flow 可包含多个任务。
  • Task(任务):工作流中的最小执行单元,负责具体逻辑处理。
    只需给函数添加装饰器,普通 Python 代码就能变成可监控、可调度的工作流,无需修改原有业务逻辑。

三、Prefect 基础使用与代码示例

3.1 最简单的工作流示例

先从最基础的案例入手,感受 Prefect 零门槛接入的特点,直接将打印函数转为工作流:

# 导入核心模块
from prefect import flow, task

# 定义任务
@task
def say_hello():
    print("Hello, Prefect!")
    return "任务执行完成"

# 定义工作流
@flow
def hello_prefect_flow():
    # 调用任务
    result = say_hello()
    print("工作流执行结果:", result)

# 运行工作流
if __name__ == "__main__":
    hello_prefect_flow()

代码说明

  1. 使用 @task 装饰器修饰普通函数,将其标记为 Prefect 任务。
  2. 使用 @flow 装饰器修饰主函数,作为整个工作流的入口。
  3. 直接运行 Python 文件,即可启动工作流,控制台会输出执行日志与结果。
    执行后可看到清晰的执行状态,包括任务开始、结束、耗时等信息,比原生脚本更易追踪。

3.2 带参数的任务与工作流

实际开发中,任务通常需要传入参数,Prefect 对参数支持完全原生,无需额外适配:

from prefect import flow, task

# 带参数的任务
@task
def add_numbers(a: int, b: int) -> int:
    res = a + b
    print(f"{a} + {b} = {res}")
    return res

@flow
def calculate_flow(x: int, y: int):
    total = add_numbers(x, y)
    print(f"最终计算结果:{total}")

if __name__ == "__main__":
    # 传入参数运行
    calculate_flow(10, 25)

代码说明
任务和工作流都支持位置参数、关键字参数、默认参数,与原生 Python 函数使用方式完全一致,降低学习成本。

3.3 多任务顺序执行

复杂业务通常包含多个步骤,Prefect 可轻松编排多任务顺序执行,自动管理依赖关系:

from prefect import flow, task

@task
def step_one():
    print("执行第一步:数据读取")
    return "原始数据"

@task
def step_two(data):
    print(f"执行第二步:处理数据 -> {data}")
    return "处理后数据"

@task
def step_three(processed_data):
    print(f"执行第三步:保存数据 -> {processed_data}")
    return "数据保存成功"

@flow
def multi_step_flow():
    data = step_one()
    processed = step_two(data)
    result = step_three(processed)
    print(result)

if __name__ == "__main__":
    multi_step_flow()

代码说明
任务之间可通过返回值传递数据,Prefect 会自动按定义顺序执行,前一个任务失败后序任务自动停止,保证流程安全。

四、Prefect 高级功能实战

4.1 任务失败重试

自动化脚本常因网络波动、接口超时报错,Prefect 内置重试机制,一行配置解决问题:

from prefect import flow, task
import random

# 配置重试:最多重试3次,重试间隔1秒
@task(retries=3, retry_delay_seconds=1)
def unstable_task():
    # 模拟随机失败
    if random.random() < 0.5:
        raise Exception("网络异常,任务执行失败")
    print("任务执行成功")
    return True

@flow
def retry_flow():
    unstable_task()

if __name__ == "__main__":
    retry_flow()

代码说明
retries 设置重试次数,retry_delay_seconds 设置重试间隔,遇到临时异常可自动恢复,无需手动捕获。

4.2 定时任务调度

很多场景需要定时执行,如每日数据统计、定时爬虫、定时清理日志,Prefect 内置调度功能:

from prefect import flow, task
from datetime import timedelta

@task
def daily_task():
    print("执行每日定时任务")

# 每隔 10 秒执行一次
@flow
def scheduled_flow():
    daily_task()

if __name__ == "__main__":
    # 启动调度
    from prefect.server.schemas.schedules import IntervalSchedule
    scheduled_flow.serve(
        schedule=IntervalSchedule(interval=timedelta(seconds=10))
    )

代码说明
运行后脚本会持续运行,每 10 秒自动执行一次工作流,支持秒、分、时、日等间隔,适合轻量定时场景。

4.3 任务并行执行

提升执行效率最常用的方式是并行,Prefect 无需多线程/多进程代码,通过配置实现并行:

from prefect import flow, task
import time

@task
def task_a():
    time.sleep(1)
    print("任务A执行完成")
    return "A"

@task
def task_b():
    time.sleep(1)
    print("任务B执行完成")
    return "B"

@task
def task_c():
    time.sleep(1)
    print("任务C执行完成")
    return "C"

@flow
def parallel_flow():
    # 并行提交任务
    a = task_a.submit()
    b = task_b.submit()
    c = task_c.submit()

    # 等待结果
    print("所有任务结果:", a.result(), b.result(), c.result())

if __name__ == "__main__":
    parallel_flow()

代码说明
使用 .submit() 提交任务即可实现并行,三个任务原本需 3 秒,并行后仅需 1 秒,大幅提升效率。

4.4 工作流超时控制

防止任务卡死占用资源,可给工作流或任务设置超时时间:

from prefect import flow, task
import time

@task(timeout_seconds=3)
def long_running_task():
    time.sleep(5)
    print("任务执行完成")

@flow
def timeout_flow():
    long_running_task()

if __name__ == "__main__":
    timeout_flow()

代码说明
任务设置 3 秒超时,但实际执行 5 秒,Prefect 会自动终止任务并标记失败,避免资源阻塞。

五、Prefect UI 可视化控制台

Prefect 自带可视化界面,可实时查看工作流执行状态、日志、失败记录,无需额外搭建监控系统。

5.1 启动 UI 控制台

命令行执行:

prefect server start

启动成功后,浏览器访问:

http://127.0.0.1:4200

即可进入控制台,可查看:

  • 所有工作流与任务列表
  • 执行历史与耗时统计
  • 失败任务与报错日志
  • 定时任务调度状态

5.2 连接 UI 执行工作流

运行工作流后,无需额外配置,执行记录会自动上传到 UI,新手也能快速定位问题。

六、真实业务场景案例

6.1 每日自动化数据统计脚本

模拟企业常用场景:每日从接口获取数据、清洗处理、保存结果、发送通知:

from prefect import flow, task
import time
from datetime import datetime

# 模拟获取数据
@task(retries=2, retry_delay_seconds=2)
def fetch_data():
    print(f"{datetime.now()} - 从数据源获取数据...")
    time.sleep(1)
    return {"user_count": 1245, "order_count": 328}

# 数据清洗
@task
def clean_data(data):
    print("数据清洗中...")
    time.sleep(1)
    cleaned = {
        "统计时间": datetime.now().strftime("%Y-%m-%d"),
        "用户数": data["user_count"],
        "订单数": data["order_count"],
        "状态": "已清洗"
    }
    return cleaned

# 保存数据
@task
def save_data(cleaned_data):
    print("保存数据到本地/数据库...")
    time.sleep(1)
    with open("daily_report.txt", "w", encoding="utf-8") as f:
        f.write(str(cleaned_data))
    return True

# 发送通知
@task
def send_notification(success):
    if success:
        print("每日统计完成,已发送通知")
    else:
        print("统计失败,需人工检查")

# 完整工作流
@flow
def daily_statistics_flow():
    raw_data = fetch_data()
    cleaned = clean_data(raw_data)
    save_success = save_data(cleaned)
    send_notification(save_success)

if __name__ == "__main__":
    # 启动定时服务
    daily_statistics_flow.serve(
        name="每日数据统计任务",
        schedule={"interval": "24h"}
    )

代码说明
该脚本整合了重试、定时、多步骤编排、数据传递等核心功能,可直接用于生产环境,替代传统 crontab + 杂乱脚本,维护性大幅提升。

七、相关资源

  • Pypi地址:https://pypi.org/project/prefect/
  • Github地址:https://github.com/PrefectHQ/prefect
  • 官方文档地址:https://docs.prefect.io/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具Beam:轻量化任务调度与异步执行入门教程

一、Beam库核心概述

1.1 用途与工作原理

Python Beam库是一款轻量化的任务调度与异步执行工具,主打简单任务编排、定时任务管理和异步函数执行能力,能够帮助开发者摆脱复杂的多线程/多进程代码编写,快速实现任务的并行处理和定时触发。其核心工作原理基于Python的asyncio异步框架和schedule定时任务模块,通过封装任务队列、执行器和调度器,将用户定义的函数转化为可调度、可异步执行的任务单元,同时支持任务依赖管理和执行状态监控。

1.2 优缺点分析

优点

  • 轻量化设计,无过多第三方依赖,安装和部署成本极低;
  • API设计简洁直观,技术小白也能快速上手;
  • 同时支持同步任务、异步任务和定时任务,适用场景广泛;
  • 支持任务执行状态回调,便于监控任务运行结果。

缺点

  • 不支持分布式任务调度,仅适用于单机场景;
  • 高并发任务处理能力较弱,无法替代Celery等专业任务队列;
  • 文档和社区资源相对较少,问题排查难度略高。

1.3 License类型

Beam库采用MIT开源许可证,允许开发者自由使用、修改和分发源代码,无论是个人项目还是商业项目都可以无门槛集成,仅需保留原作者版权声明即可。

二、Beam库安装与环境配置

2.1 安装方式

Beam库已发布至PyPI,支持通过pip命令一键安装,适用于Python 3.7及以上版本,具体安装命令如下:

# 安装最新稳定版
pip install beam

# 安装指定版本(以0.7.0为例)
pip install beam==0.7.0

安装完成后,可在Python环境中通过以下代码验证安装是否成功:

import beam
# 打印库版本号,验证安装
print(beam.__version__)

若终端输出对应的版本号,则说明安装成功。

2.2 环境依赖说明

Beam库的核心依赖仅有两个Python标准库:

  • asyncio:用于实现异步任务执行;
  • schedule:用于实现定时任务调度。
    无需额外安装其他依赖,兼容性极强,可在Windows、Linux、macOS等主流操作系统中正常运行。

三、Beam库核心功能与代码示例

3.1 基础任务执行:同步与异步

Beam库的核心对象是TaskExecutorTask用于封装需要执行的函数,Executor用于负责任务的调度和执行。

3.1.1 同步任务执行

同步任务是指按照顺序依次执行的任务,适用于无依赖的简单函数调用。

from beam import Task, Executor

# 定义一个简单的同步函数
def add(a: int, b: int) -> int:
    """两数相加的同步函数"""
    result = a + b
    print(f"执行加法任务:{a} + {b} = {result}")
    return result

def multiply(a: int, b: int) -> int:
    """两数相乘的同步函数"""
    result = a * b
    print(f"执行乘法任务:{a} * {b} = {result}")
    return result

# 步骤1:创建任务执行器
executor = Executor()

# 步骤2:创建Task对象,封装函数和参数
task1 = Task(target=add, args=(2, 3))
task2 = Task(target=multiply, args=(4, 5))

# 步骤3:将任务添加到执行器并执行
executor.add_task(task1)
executor.add_task(task2)

# 执行所有任务(同步执行,按添加顺序执行)
executor.run()

代码说明

  • 首先导入TaskExecutor两个核心类;
  • 定义addmultiply两个同步函数作为任务目标;
  • 创建Executor执行器对象,通过add_task方法添加任务;
  • 调用executor.run()方法执行所有任务,任务会按照添加顺序依次同步执行。

执行结果

执行加法任务:2 + 3 = 5
执行乘法任务:4 * 5 = 20

3.1.2 异步任务执行

异步任务是指无需等待前一个任务完成即可执行的任务,适用于I/O密集型场景(如网络请求、文件读写),能够有效提升任务执行效率。

import asyncio
from beam import Task, Executor

# 定义一个异步函数(模拟网络请求)
async def async_fetch(url: str) -> str:
    """模拟异步获取URL内容"""
    print(f"开始请求URL:{url}")
    # 模拟网络延迟
    await asyncio.sleep(2)
    result = f"成功获取{url}的内容"
    print(result)
    return result

# 步骤1:创建执行器
executor = Executor()

# 步骤2:创建异步任务(注意:异步函数需要指定is_async=True)
task1 = Task(target=async_fetch, args=("https://www.example.com",), is_async=True)
task2 = Task(target=async_fetch, args=("https://www.python.org",), is_async=True)

# 步骤3:添加任务并执行
executor.add_task(task1)
executor.add_task(task2)

# 执行异步任务
executor.run()

代码说明

  • 定义异步函数async_fetch,使用async def关键字声明;
  • 创建Task对象时,必须通过is_async=True标记该任务为异步任务;
  • 调用executor.run()后,两个异步任务会同时启动,无需等待前一个任务完成,总执行时间约为2秒(而非4秒)。

执行结果

开始请求URL:https://www.example.com
开始请求URL:https://www.python.org
成功获取https://www.example.com的内容
成功获取https://www.python.org的内容

3.2 定时任务调度

Beam库支持基于时间的定时任务调度,能够实现固定时间间隔执行特定时间点执行的需求,底层依赖schedule库实现。

3.2.1 固定间隔执行任务

from beam import Task, Executor, IntervalTrigger

# 定义需要定时执行的函数
def timed_print():
    """定时打印当前时间"""
    from datetime import datetime
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"定时任务执行时间:{current_time}")

# 步骤1:创建时间触发器(每5秒执行一次)
trigger = IntervalTrigger(seconds=5)

# 步骤2:创建定时任务,绑定触发器
task = Task(target=timed_print, trigger=trigger)

# 步骤3:创建执行器并添加任务
executor = Executor()
executor.add_task(task)

# 步骤4:启动执行器,持续运行定时任务
# 注意:定时任务需要使用run_forever()方法,而非run()
executor.run_forever()

代码说明

  • 导入IntervalTrigger时间触发器类,用于定义任务执行间隔;
  • IntervalTrigger支持seconds(秒)、minutes(分钟)、hours(小时)等参数,此处设置为每5秒执行一次;
  • 定时任务需要调用executor.run_forever()方法启动,执行器会持续运行并按照设定的间隔触发任务;
  • 若需要停止定时任务,可在终端按下Ctrl+C中断程序。

执行结果

定时任务执行时间:2026-01-08 10:00:00
定时任务执行时间:2026-01-08 10:00:05
定时任务执行时间:2026-01-08 10:00:10
...

3.2.2 特定时间点执行任务

除了固定间隔,Beam库还支持通过CronTrigger实现类似Linux Crontab的定时规则,例如每天上午10点执行任务。

from beam import Task, Executor, CronTrigger

def daily_report():
    """每天10点生成日报"""
    print("生成每日工作报表...")

# 创建Cron触发器(每天10点执行)
# Cron表达式格式:分 时 日 月 周
trigger = CronTrigger(minute="0", hour="10", day="*", month="*", week="*")

# 创建任务并添加到执行器
task = Task(target=daily_report, trigger=trigger)
executor = Executor()
executor.add_task(task)

# 启动执行器
executor.run_forever()

代码说明

  • CronTrigger的参数与Crontab规则一致,支持通配符*(表示任意值);
  • 上述代码中,minute="0", hour="10"表示每天10点0分执行任务;
  • 适用于需要固定时间点执行的周期性任务,如日报生成、数据备份等。

3.3 任务依赖管理

在实际开发中,多个任务之间可能存在依赖关系(如任务B必须在任务A执行完成后才能执行),Beam库支持通过dependencies参数实现任务依赖管理。

from beam import Task, Executor

def task_a():
    """任务A:生成基础数据"""
    print("执行任务A:生成基础数据")
    return [1, 2, 3, 4, 5]

def task_b(data: list):
    """任务B:处理任务A生成的数据"""
    print(f"执行任务B:接收任务A的数据 {data}")
    processed_data = [x * 2 for x in data]
    print(f"任务B处理结果:{processed_data}")
    return processed_data

# 步骤1:创建任务A
task_a_obj = Task(target=task_a, name="task_a")

# 步骤2:创建任务B,指定依赖任务A
# dependencies参数接收任务对象列表,任务B会在任务A执行完成后自动获取其返回值
task_b_obj = Task(target=task_b, args=(task_a_obj.result,), dependencies=[task_a_obj], name="task_b")

# 步骤3:执行任务
executor = Executor()
executor.add_task(task_a_obj)
executor.add_task(task_b_obj)
executor.run()

代码说明

  • 创建任务时可以通过name参数指定任务名称,便于识别;
  • 任务B的args参数中使用task_a_obj.result表示接收任务A的返回值作为参数;
  • dependencies=[task_a_obj]表示任务B依赖任务A,执行器会确保任务A执行完成后再执行任务B。

执行结果

执行任务A:生成基础数据
执行任务B:接收任务A的数据 [1, 2, 3, 4, 5]
任务B处理结果:[2, 4, 6, 8, 10]

3.4 任务执行状态监控

Beam库支持通过回调函数监控任务的执行状态,包括任务开始、任务成功、任务失败三种状态,便于开发者及时处理任务执行过程中的异常。

from beam import Task, Executor

# 定义任务函数(包含异常场景)
def divide(a: int, b: int) -> float:
    """两数相除,模拟异常场景"""
    return a / b

# 定义状态回调函数
def on_task_start(task):
    """任务开始时的回调"""
    print(f"任务 {task.name} 开始执行...")

def on_task_success(task, result):
    """任务成功时的回调"""
    print(f"任务 {task.name} 执行成功,结果:{result}")

def on_task_failure(task, exception):
    """任务失败时的回调"""
    print(f"任务 {task.name} 执行失败,异常:{exception}")

# 创建执行器
executor = Executor()

# 创建正常任务
task_normal = Task(
    target=divide,
    args=(10, 2),
    name="normal_task",
    on_start=on_task_start,
    on_success=on_task_success,
    on_failure=on_task_failure
)

# 创建异常任务(除数为0)
task_error = Task(
    target=divide,
    args=(10, 0),
    name="error_task",
    on_start=on_task_start,
    on_success=on_task_success,
    on_failure=on_task_failure
)

# 添加任务并执行
executor.add_task(task_normal)
executor.add_task(task_error)
executor.run()

代码说明

  • Task对象分别绑定on_starton_successon_failure三个回调函数;
  • 正常任务执行时,会依次触发on_starton_success
  • 异常任务(除数为0)执行时,会触发on_starton_failure,并传入异常信息。

执行结果

任务 normal_task 开始执行...
任务 normal_task 执行成功,结果:5.0
任务 error_task 开始执行...
任务 error_task 执行失败,异常:division by zero

四、实际应用案例:文件批量处理工具

4.1 案例需求

开发一个文件批量处理工具,实现以下功能:

  1. 遍历指定目录下的所有.txt文件;
  2. 异步读取每个文件的内容;
  3. 统计每个文件的字符数;
  4. 将统计结果写入到result.txt文件中。

4.2 代码实现

import asyncio
import os
from typing import List
from beam import Task, Executor

# 定义异步文件读取函数
async def read_file_async(file_path: str) -> tuple:
    """异步读取文件内容并统计字符数"""
    try:
        async with asyncio.open(file_path, "r", encoding="utf-8") as f:
            content = await f.read()
        char_count = len(content)
        file_name = os.path.basename(file_path)
        return (file_name, char_count)
    except Exception as e:
        return (os.path.basename(file_path), f"读取失败:{str(e)}")

# 定义结果写入函数
def write_result(results: List[tuple]):
    """将统计结果写入result.txt"""
    with open("result.txt", "w", encoding="utf-8") as f:
        f.write("文件名\t字符数\n")
        f.write("-" * 20 + "\n")
        for file_name, count in results:
            f.write(f"{file_name}\t{count}\n")
    print("统计结果已写入result.txt")

# 定义主函数
def batch_process_files(dir_path: str):
    """批量处理指定目录下的txt文件"""
    # 步骤1:获取目录下所有txt文件路径
    txt_files = []
    for file in os.listdir(dir_path):
        if file.endswith(".txt"):
            txt_files.append(os.path.join(dir_path, file))

    if not txt_files:
        print("未找到任何txt文件")
        return

    # 步骤2:创建执行器和异步任务
    executor = Executor()
    tasks = []
    for file_path in txt_files:
        task = Task(
            target=read_file_async,
            args=(file_path,),
            is_async=True,
            name=f"task_{os.path.basename(file_path)}"
        )
        tasks.append(task)
        executor.add_task(task)

    # 步骤3:执行所有异步任务
    executor.run()

    # 步骤4:收集所有任务结果
    results = [task.result for task in tasks]

    # 步骤5:写入结果文件
    write_result(results)

# 执行批量处理(替换为你的目标目录)
if __name__ == "__main__":
    target_dir = "./test_files"  # 目标目录
    # 创建测试目录和文件(可选,用于测试)
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
        # 创建测试文件1
        with open(os.path.join(target_dir, "file1.txt"), "w", encoding="utf-8") as f:
            f.write("Hello Beam!")
        # 创建测试文件2
        with open(os.path.join(target_dir, "file2.txt"), "w", encoding="utf-8") as f:
            f.write("Python 任务调度工具")

    batch_process_files(target_dir)

4.3 代码说明

  1. 异步文件读取:使用asyncio.open异步读取文件内容,避免I/O阻塞,提升批量处理效率;
  2. 任务创建:为每个.txt文件创建一个异步任务,通过is_async=True标记;
  3. 结果收集:任务执行完成后,通过task.result收集每个任务的返回值;
  4. 结果写入:将所有文件的统计结果写入result.txt,便于后续查看。

4.4 执行结果

运行代码后,会在当前目录生成result.txt文件,内容如下:

文件名    字符数
--
file1.txt    10
file2.txt    14

五、相关资源链接

  • PyPI地址:https://pypi.org/project/Beam
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。