Python数据质量神器:Great Expectations从入门到实战教程

一、Great Expectations 库概述

Great Expectations 是一款专注于数据验证、数据文档化与数据质量监控的 Python 开源库,核心用于保障数据 pipeline 中数据的准确性、完整性与一致性。其原理是通过定义「数据期望」规则,自动校验数据是否符合预期,同时生成可视化数据文档。该库采用 Apache-2.0 开源协议,优点是适配多数据源、规则易编写、可集成主流数据工具,缺点是初次配置稍繁琐,轻量数据验证场景略显冗余。

二、Great Expectations 安装与基础环境配置

2.1 库的安装

在使用 Great Expectations 之前,需要通过 pip 完成安装,打开命令行执行以下命令:

pip install great-expectations

安装完成后,可以在 Python 环境中导入库验证是否安装成功,无报错则说明安装正常。

import great_expectations as ge
from great_expectations.data_context import FileDataContext

2.2 初始化项目环境

Great Expectations 采用项目化管理,需要初始化工作目录,执行命令后会自动生成配置文件夹:

great_expectations init

初始化完成后,目录结构如下:

great_expectations/
├── great_expectations.yml  # 主配置文件
├── expectations/           # 数据验证规则存储目录
├── checkpoints/            # 验证任务配置目录
├── plugins/                # 插件目录
├── uncommitted/            # 本地配置与缓存文件
└── data_docs/               # 数据文档生成目录

该目录结构用于统一管理验证规则、数据源与报告,方便团队协作与版本控制。

三、Great Expectations 核心使用流程

3.1 加载数据并创建验证对象

Great Expectations 支持 Pandas DataFrame、Spark DataFrame、SQL 数据源等多种数据格式,这里以最常用的 Pandas 数据为例,先创建测试数据,再生成验证对象。

import pandas as pd
# 创建模拟业务数据(用户订单数据)
data = {
    "user_id": [1001, 1002, 1003, 1004, 1005, None],
    "order_id": [2024001, 2024002, 2024003, 2024004, 2024005, 2024006],
    "order_amount": [99.0, 199.0, 299.0, 399.0, 499.0, 599.0],
    "pay_status": ["已支付", "未支付", "已支付", "已支付", "未支付", "已支付"],
    "create_time": ["2024-01-01", "2024-01-02", "2024-01-03", 
                    "2024-01-04", "2024-01-05", "2024-01-06"]
}
df = pd.DataFrame(data)
# 创建 Great Expectations 验证对象
ge_df = ge.from_pandas(df)

代码说明:首先构建模拟订单数据,包含用户ID、订单ID、订单金额、支付状态、创建时间字段,通过 ge.from_pandas() 将普通 DataFrame 转换为支持数据验证的 GE 对象。

3.2 编写基础数据验证规则

Great Expectations 的核心是「期望(Expectation)」,即提前定义数据应该满足的规则,以下是常用的基础验证规则:

# 1. 验证列是否存在
ge_df.expect_column_to_exist("user_id")
# 2. 验证列值不允许为空(除指定列外)
ge_df.expect_column_values_to_not_be_null("order_id")
# 3. 验证列值唯一
ge_df.expect_column_values_to_be_unique("order_id")
# 4. 验证数值范围
ge_df.expect_column_values_to_be_between("order_amount", min_value=0, max_value=1000)
# 5. 验证列值属于指定集合
ge_df.expect_column_values_to_be_in_set("pay_status", ["已支付", "未支付"])
# 6. 验证日期格式
ge_df.expect_column_values_to_match_strftime_format("create_time", "%Y-%m-%d")

代码说明:每条 expect_* 方法对应一条验证规则,覆盖列存在性、非空、唯一性、数值范围、枚举值、日期格式等高频验证场景,无需编写复杂判断逻辑。

3.3 执行验证并查看结果

编写完规则后,调用 validate() 方法执行验证,返回包含验证结果的字典,可直观查看哪些规则通过、哪些失败。

# 执行数据验证
validation_result = ge_df.validate()
# 打印整体验证结果
print("数据验证是否通过:", validation_result.success)
# 打印详细验证统计
print("验证规则总数:", validation_result.statistics["evaluated_expectations"])
print("通过规则数:", validation_result.statistics["successful_expectations"])
print("失败规则数:", validation_result.statistics["unsuccessful_expectations"])
# 查看失败规则详情
for result in validation_result.results:
    if not result.success:
        print("\n失败规则:", result.expectation_config.expectation_type)
        print("失败列:", result.expectation_config.kwargs["column"])
        print("失败原因:", result.result)

代码说明:validate() 会批量执行所有定义的规则,success 字段表示整体是否通过,statistics 提供统计信息,失败规则会返回具体列与异常数据详情,方便快速定位问题。

3.4 生成可视化数据质量报告

Great Expectations 支持自动生成可视化数据文档(Data Docs),无需手动编写报告,可在浏览器中直观查看数据质量。

# 初始化数据上下文
context = FileDataContext.create(project_root_dir="./")
# 保存验证规则
expectation_suite = ge_df.get_expectation_suite()
expectation_suite.expectation_suite_name = "order_data_validation_suite"
context.save_expectation_suite(expectation_suite, overwrite=True)
# 构建验证任务
checkpoint = context.add_or_update_checkpoint(
    name="order_data_checkpoint",
    expectation_suite_name="order_data_validation_suite",
    batch_request=context.get_batch_request_class()(
        datasource_name="my_pandas_datasource",
        data_asset_name="order_data",
    ),
)
# 运行任务并生成报告
checkpoint_result = context.run_checkpoint(checkpoint_name="order_data_checkpoint")
# 打开数据文档
context.open_data_docs()

代码说明:通过保存验证规则、创建检查点、执行验证三步,自动生成 HTML 格式的可视化报告,打开浏览器即可查看所有规则的执行情况、数据分布、异常数据明细。

四、进阶使用:结合业务场景的复杂数据验证

4.1 多条件组合验证

在实际业务中,往往需要多条件组合验证,Great Expectations 支持自定义过滤条件,实现复杂逻辑验证。

# 验证:支付状态为已支付时,订单金额必须大于0
ge_df.expect_column_values_to_be_between(
    column="order_amount",
    min_value=0.01,
    max_value=None,
    row_condition="pay_status == '已支付'"
)
# 验证:用户ID不为空时,必须为整数类型
ge_df.expect_column_values_to_be_of_type(
    column="user_id",
    type_="int64",
    row_condition="user_id IS NOT NULL"
)

代码说明:通过 row_condition 参数添加过滤条件,实现按行筛选验证,适用于业务关联字段的合规性检查。

4.2 自定义验证规则

对于特殊业务规则,内置方法无法满足时,可通过自定义函数实现专属验证逻辑。

# 自定义验证规则:订单ID必须以2024开头
def custom_order_id_check(value):
    return str(value).startswith("2024")
# 应用自定义规则
ge_df.expect_column_values_to_be_true(
    column="order_id",
    condition=custom_order_id_check,
    condition_value="value"
)

代码说明:自定义函数返回布尔值,通过 expect_column_values_to_be_true 调用,适配企业个性化数据规范。

4.3 集成 SQL 数据源验证

Great Expectations 不仅支持本地数据,还可直接连接 MySQL、PostgreSQL 等数据库,验证线上数据。

from sqlalchemy import create_engine
# 连接数据库
engine = create_engine("mysql+pymysql://用户名:密码@主机:端口/数据库名")
# 从SQL查询创建验证对象
ge_sql_df = ge.from_sql(
    sql="SELECT * FROM order_table WHERE create_time >= '2024-01-01'",
    con=engine
)
# 执行验证
ge_sql_df.expect_column_values_to_not_be_null("order_id")
sql_validation_result = ge_sql_df.validate()
print("数据库数据验证结果:", sql_validation_result.success)

代码说明:通过 SQLAlchemy 连接数据库,直接查询数据并验证,适用于数据仓库、业务数据库的实时质量监控。

五、实际项目案例:电商订单数据全流程质量监控

5.1 案例背景

某电商平台每日产生数十万订单数据,需要保障:

  1. 核心字段(订单ID、用户ID、金额)无空值;
  2. 订单金额、支付状态符合业务逻辑;
  3. 日期格式规范,数据无重复;
  4. 自动生成每日数据质量报告。

5.2 完整代码实现

import pandas as pd
import great_expectations as ge
from great_expectations.data_context import FileDataContext

# 1. 加载生产环境订单数据
# 实际场景可替换为数据库读取或文件读取
df = pd.read_csv("ecommerce_orders.csv")
ge_df = ge.from_pandas(df)

# 2. 定义全量业务验证规则
# 基础完整性验证
ge_df.expect_column_to_exist("user_id")
ge_df.expect_column_to_exist("order_id")
ge_df.expect_column_to_exist("order_amount")
ge_df.expect_column_values_to_not_be_null("order_id")
ge_df.expect_column_values_to_be_unique("order_id")

# 业务合规性验证
ge_df.expect_column_values_to_be_between("order_amount", min_value=0.01, max_value=99999)
ge_df.expect_column_values_to_be_in_set("pay_status", ["已支付", "未支付", "退款中"])
ge_df.expect_column_values_to_match_strftime_format("create_time", "%Y-%m-%d %H:%M:%S")

# 关联逻辑验证
ge_df.expect_column_values_to_be_between(
    column="order_amount",
    min_value=0.01,
    row_condition="pay_status == '已支付'"
)

# 3. 执行验证
result = ge_df.validate()

# 4. 输出验证结果
if result.success:
    print("订单数据质量合格,可进入后续分析流程")
else:
    print("订单数据存在异常,请修复后再处理")

# 5. 生成并保存数据质量报告
context = FileDataContext.create(project_root_dir="./")
suite = ge_df.get_expectation_suite()
suite.expectation_suite_name = "ecommerce_order_validation"
context.save_expectation_suite(suite, overwrite=True)

checkpoint = context.add_or_update_checkpoint(
    name="daily_order_check",
    expectation_suite_name="ecommerce_order_validation",
)
context.run_checkpoint(checkpoint_name="daily_order_check")
context.open_data_docs()

代码说明:该案例完整模拟电商订单数据的质量监控流程,从数据加载、规则定义、验证执行到报告生成,可直接集成到数据 pipeline 中,实现自动化数据校验。

六、相关资源

  • Pypi地址:https://pypi.org/project/great-expectations/
  • Github地址:https://github.com/great-expectations/great_expectations
  • 官方文档地址:https://docs.greatexpectations.io/docs/

关注我,每天分享一个实用的Python自动化工具。

Python 数据管道神器:Dagster 从入门到实战完全教程

一、Dagster 基础认知

1.1 库用途与原理

Dagster 是面向数据流水线、ETL 与机器学习工作流的开源编排框架,专注数据资产的定义、调度、监控与可观测性,以数据资产为核心组织任务,替代传统脚本式调度。采用资产优先设计,通过代码定义数据依赖与执行逻辑,支持本地调试、多环境部署、任务重试与日志追踪。基于 Apache 2.0 开源协议,优点是调试友好、依赖清晰、易维护;缺点是学习成本高于简单调度工具,轻量场景略显冗余。

1.2 核心优势与适用场景

Dagster 最核心的价值,是把散乱的数据脚本变成可管理、可测试、可复用的数据资产,特别适合数据分析、数据仓库、机器学习流水线、定时 ETL、API 数据同步等场景。它和 Airflow、Prefect 最大的区别在于:从一开始就围绕“数据产出”设计,而不是单纯的任务调度。你可以在本地完整调试流水线,直接看到每个步骤的输入输出,上线后不会出现“本地能跑、线上挂掉”的黑盒问题。

同时 Dagster 原生支持类型检查、数据质量校验、重试机制、资源隔离、多环境配置(开发/测试/生产),让数据代码从“能用”变成“可靠”。对于团队协作场景,它能清晰展示数据血缘,谁在什么时候生成了什么数据、依赖哪些上游,一目了然。

二、Dagster 安装与环境准备

2.1 安装命令

在使用 Dagster 前,确保你已安装 Python 3.8+ 版本,使用 pip 即可完成安装:

# 安装核心库
pip install dagster

# 安装 Web UI 与命令行工具(强烈推荐)
pip install dagster-webserver dagit
  • dagster:核心库,提供资产、作业、调度、资源等基础能力。
  • dagster-webserver / dagit:提供可视化 Web UI,用于查看、调试、运行流水线。

安装完成后,可通过以下命令验证版本:

dagster --version

出现版本号即表示安装成功。

2.2 最小项目结构

Dagster 项目有约定俗成的目录结构,便于管理资产、作业、资源与配置,推荐小白使用如下结构:

dagster_demo/
├── __init__.py
├── assets/       # 数据资产存放目录
│   ├── __init__.py
│   └── core_assets.py  # 核心资产代码
├── jobs.py       # 作业定义
├── schedules.py  # 调度定义
└── dagster.py    # 项目入口

该结构把数据产出(assets)、执行单元(jobs)、定时规则(schedules) 分开,既符合工程规范,又方便后期扩展。

三、Dagster 核心概念与基础使用

3.1 核心概念理解

  1. Asset(资产):Dagster 最核心的概念,代表一个数据产出,如 CSV、表、模型文件、JSON 等,通过 @asset 装饰器定义。
  2. Job(作业):由一个或多个资产组成的可执行单元,用于一次性或定时运行。
  3. Resource(资源):外部连接封装,如数据库、S3、API 客户端,实现环境隔离。
  4. Schedule(调度):定时执行作业,支持 cron 表达式。
  5. Op:更细粒度的计算单元,资产底层基于 Op 实现。

3.2 第一个资产示例

我们从最简单的资产开始,创建一个生成文本文件的资产,直观感受 Dagster 的工作方式。

assets/core_assets.py 中编写代码:

from dagster import asset

# 定义第一个数据资产:生成欢迎信息
@asset
def hello_dagster_asset() -> str:
    """
    第一个 Dagster 资产
    输出:字符串文本
    """
    content = "Hello, Dagster! 这是我的第一个数据资产"
    print(content)
    return content

代码说明

  • 使用 @asset 装饰器将普通函数变成 Dagster 资产。
  • 函数返回值就是该资产的产出,可以是字符串、DataFrame、文件路径等。
  • 函数注释会在 Web UI 中展示,方便理解资产用途。

3.3 加载资产并启动 Web UI

在项目根目录的 dagster.py 中加载资产:

from dagster import Definitions, load_assets_from_modules

# 从 assets 模块加载所有资产
from . import assets

all_assets = load_assets_from_modules([assets])

# 定义整个项目的入口
defs = Definitions(
    assets=all_assets,
)

代码说明

  • Definitions 是 Dagster 项目的总入口,管理资产、作业、调度、资源。
  • load_assets_from_modules 自动扫描模块中的所有资产,无需手动逐个注册。

启动 Web UI:

# 在项目根目录执行
dagster-webserver -f dagster.py

启动成功后,访问:http://localhost:3000,即可看到可视化界面。在界面中可以看到 hello_dagster_asset 资产,点击“Materialize”即可运行,运行后可查看日志与输出结果。

四、带依赖的多资产流水线

4.1 资产依赖示例

真实场景中,数据通常有上下游依赖,Dagster 会自动根据函数参数识别依赖关系,无需手动配置。

扩展 core_assets.py

from dagster import asset
import pandas as pd
import os

# 上游资产:生成原始数据
@asset
def raw_user_data() -> pd.DataFrame:
    """生成原始用户数据,返回 DataFrame"""
    data = {
        "user_id": [1, 2, 3, 4, 5],
        "username": ["张三", "李四", "王五", "赵六", "钱七"],
        "age": [22, 25, 28, 24, 26]
    }
    df = pd.DataFrame(data)
    return df

# 下游资产:依赖 raw_user_data,进行数据清洗
@asset
def clean_user_data(raw_user_data: pd.DataFrame) -> pd.DataFrame:
    """
    依赖上游原始数据,进行清洗
    筛选年龄大于 24 的用户
    """
    clean_df = raw_user_data[raw_user_data["age"] > 24].copy()
    print("清洗后数据:")
    print(clean_df)
    return clean_df

# 最终资产:依赖清洗后的数据,输出到 CSV 文件
@asset
def user_data_report(clean_user_data: pd.DataFrame) -> str:
    """
    依赖清洗后的数据,生成 CSV 报告文件
    返回文件路径
    """
    output_path = "user_report.csv"
    clean_user_data.to_csv(output_path, index=False, encoding="utf-8-sig")
    return output_path

代码说明

  1. raw_user_data 是源头资产,无上游依赖,生成原始数据。
  2. clean_user_data 参数为 raw_user_data,Dagster 自动识别为上游依赖,必须先运行上游。
  3. user_data_report 依赖清洗后的数据,最终输出 CSV 文件。
  4. 整个流程形成一条完整的流水线:原始数据 → 清洗 → 生成报告。

重启 Web UI 后,可在界面看到资产依赖图,点击任意资产可选择运行上游依赖,支持单独运行、全链路运行、重试、查看输入输出

4.2 作业与手动执行

作业是资产的执行集合,我们把上述资产打包成一个作业,方便执行。

jobs.py 中定义:

from dagster import define_asset_job
from assets.core_assets import raw_user_data, clean_user_data, user_data_report

# 定义用户数据处理作业
user_data_job = define_asset_job(
    name="user_data_job",
    selection=[raw_user_data, clean_user_data, user_data_report]
)

dagster.py 中加入作业:

from dagster import Definitions, load_assets_from_modules
from .jobs import user_data_job
from . import assets

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    jobs=[user_data_job],  # 注册作业
)

在 Web UI 的 Jobs 页面,可直接点击“Launch”运行整个作业,流水线会按依赖顺序自动执行。

五、调度与定时任务

5.1 定时调度实现

Dagster 支持 cron 表达式实现定时执行,例如每天、每小时运行流水线。

schedules.py 中定义:

from dagster import ScheduleDefinition
from jobs import user_data_job

# 每天 0 点执行用户数据作业
daily_user_data_schedule = ScheduleDefinition(
    job=user_data_job,
    cron_schedule="0 0 * * *",  # 分 时 日 月 周
    name="daily_user_data_schedule",
    description="每日凌晨执行用户数据处理任务"
)

dagster.py 中注册调度:

from dagster import Definitions, load_assets_from_modules
from . import assets
from .jobs import user_data_job
from .schedules import daily_user_data_schedule

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    jobs=[user_data_job],
    schedules=[daily_user_data_schedule],  # 注册调度
)

启动 Web UI 后,在 Schedules 页面可开启/关闭调度,查看历史运行记录与日志。

六、资源与环境隔离(数据库示例)

6.1 资源封装

资源用于封装外部连接,实现开发/生产环境隔离,避免代码硬编码配置。以 SQLite 为例:

在项目中新建 resources.py

from dagster import resource
import sqlite3

class SQLiteResource:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = None

    def connect(self):
        self.conn = sqlite3.connect(self.db_path)
        return self.conn

    def close(self):
        if self.conn:
            self.conn.close()

@resource
def sqlite_resource(context) -> SQLiteResource:
    """SQLite 资源定义"""
    db_path = "dagster_demo.db"
    return SQLiteResource(db_path)

在资产中使用资源:

from dagster import asset
from resources import SQLiteResource

@asset
def create_user_table(sqlite_resource: SQLiteResource) -> None:
    """使用资源创建用户表"""
    conn = sqlite_resource.connect()
    cursor = conn.cursor()
    create_sql = """
    CREATE TABLE IF NOT EXISTS users (
        user_id INTEGER PRIMARY KEY,
        username TEXT,
        age INTEGER
    )
    """
    cursor.execute(create_sql)
    conn.commit()
    sqlite_resource.close()

代码说明

  • 资源通过参数注入资产,解耦代码与外部配置。
  • 切换环境只需修改资源,无需改动业务资产代码。

七、完整实战案例:自动化数据统计报告

7.1 案例需求

  1. 从模拟数据源获取原始数据。
  2. 数据清洗与统计分析。
  3. 结果存入数据库。
  4. 生成可视化报告文件。
  5. 每日定时执行。

7.2 完整代码实现

# assets/report_assets.py
from dagster import asset
import pandas as pd
import sqlite3
from datetime import datetime

@asset
def source_data() -> pd.DataFrame:
    """模拟来源数据:每日订单数据"""
    date = datetime.now().strftime("%Y-%m-%d")
    data = {
        "order_id": [1001, 1002, 1003, 1004, 1005],
        "amount": [99, 199, 299, 99, 499],
        "status": ["success", "fail", "success", "success", "fail"],
        "date": [date] * 5
    }
    return pd.DataFrame(data)

@asset
def stat_order_data(source_data: pd.DataFrame) -> pd.DataFrame:
    """统计成功订单数据"""
    success_df = source_data[source_data["status"] == "success"]
    stat_df = success_df.groupby("date").agg(
        total_order=("order_id", "count"),
        total_amount=("amount", "sum")
    ).reset_index()
    return stat_df

@asset
def save_to_db(stat_order_data: pd.DataFrame) -> None:
    """将统计结果存入 SQLite"""
    conn = sqlite3.connect("dagster_demo.db")
    stat_order_data.to_sql("order_stat", conn, if_exists="append", index=False)
    conn.close()

@asset
def generate_report(stat_order_data: pd.DataFrame) -> str:
    """生成每日统计报告"""
    report_path = f"order_report_{datetime.now().strftime('%Y%m%d')}.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("每日订单统计报告\n")
        f.write(stat_order_data.to_string())
    return report_path

将上述资产加载到 Definitions 中,打包成作业并配置每日调度,即可实现全自动数据统计+入库+报告生成

在 Web UI 中可查看:

  • 资产依赖关系
  • 每一步输入输出
  • 运行日志与报错信息
  • 历史执行记录
  • 数据血缘追踪

八、Dagster 部署与扩展

Dagster 支持多种部署方式:

  • 本地单机运行(适合开发调试)
  • Docker 容器化部署
  • Kubernetes 集群部署(生产高可用)
  • 配合 Dagster Cloud 托管服务

生产环境中,通常使用持久化元数据存储(PostgreSQL)+ 独立工作节点,实现高可用与分布式执行。

相关资源

  • Pypi地址:https://pypi.org/project/dagster
  • Github地址:https://github.com/dagster-io/dagster
  • 官方文档地址:https://docs.dagster.io

关注我,每天分享一个实用的Python自动化工具。

Python 工作流神器:Prefect 从入门到实战,轻松搞定自动化任务调度

一、Prefect 库简介

Prefect 是一款面向现代 Python 开发者的工作流编排与任务调度库,核心用于简化、可视化、监控各类自动化任务,无需复杂配置即可构建健壮的数据流与定时任务。其基于“工作流即代码”理念,通过装饰器将普通函数转为可调度任务,自带调度引擎与状态管理,支持失败重试、超时控制、并行执行。采用 Apache 2.0 开源协议,优点是上手快、无侵入、可视化强,缺点是轻量场景下功能略冗余,超大规模集群需搭配服务端。

二、Prefect 安装与环境准备

2.1 安装 Prefect

在使用 Prefect 前,只需通过 pip 完成安装,兼容 Python 3.8 及以上版本,打开命令行执行以下指令:

pip install prefect

安装完成后,可通过版本查看命令验证是否安装成功:

prefect --version

若输出版本号,说明安装正常,可直接进入开发环节。

2.2 基础概念理解

Prefect 核心只有两个关键概念,新手也能快速掌握:

  • Flow(工作流):整个自动化任务的入口,一个 Flow 可包含多个任务。
  • Task(任务):工作流中的最小执行单元,负责具体逻辑处理。
    只需给函数添加装饰器,普通 Python 代码就能变成可监控、可调度的工作流,无需修改原有业务逻辑。

三、Prefect 基础使用与代码示例

3.1 最简单的工作流示例

先从最基础的案例入手,感受 Prefect 零门槛接入的特点,直接将打印函数转为工作流:

# 导入核心模块
from prefect import flow, task

# 定义任务
@task
def say_hello():
    print("Hello, Prefect!")
    return "任务执行完成"

# 定义工作流
@flow
def hello_prefect_flow():
    # 调用任务
    result = say_hello()
    print("工作流执行结果:", result)

# 运行工作流
if __name__ == "__main__":
    hello_prefect_flow()

代码说明

  1. 使用 @task 装饰器修饰普通函数,将其标记为 Prefect 任务。
  2. 使用 @flow 装饰器修饰主函数,作为整个工作流的入口。
  3. 直接运行 Python 文件,即可启动工作流,控制台会输出执行日志与结果。
    执行后可看到清晰的执行状态,包括任务开始、结束、耗时等信息,比原生脚本更易追踪。

3.2 带参数的任务与工作流

实际开发中,任务通常需要传入参数,Prefect 对参数支持完全原生,无需额外适配:

from prefect import flow, task

# 带参数的任务
@task
def add_numbers(a: int, b: int) -> int:
    res = a + b
    print(f"{a} + {b} = {res}")
    return res

@flow
def calculate_flow(x: int, y: int):
    total = add_numbers(x, y)
    print(f"最终计算结果:{total}")

if __name__ == "__main__":
    # 传入参数运行
    calculate_flow(10, 25)

代码说明
任务和工作流都支持位置参数、关键字参数、默认参数,与原生 Python 函数使用方式完全一致,降低学习成本。

3.3 多任务顺序执行

复杂业务通常包含多个步骤,Prefect 可轻松编排多任务顺序执行,自动管理依赖关系:

from prefect import flow, task

@task
def step_one():
    print("执行第一步:数据读取")
    return "原始数据"

@task
def step_two(data):
    print(f"执行第二步:处理数据 -> {data}")
    return "处理后数据"

@task
def step_three(processed_data):
    print(f"执行第三步:保存数据 -> {processed_data}")
    return "数据保存成功"

@flow
def multi_step_flow():
    data = step_one()
    processed = step_two(data)
    result = step_three(processed)
    print(result)

if __name__ == "__main__":
    multi_step_flow()

代码说明
任务之间可通过返回值传递数据,Prefect 会自动按定义顺序执行,前一个任务失败后序任务自动停止,保证流程安全。

四、Prefect 高级功能实战

4.1 任务失败重试

自动化脚本常因网络波动、接口超时报错,Prefect 内置重试机制,一行配置解决问题:

from prefect import flow, task
import random

# 配置重试:最多重试3次,重试间隔1秒
@task(retries=3, retry_delay_seconds=1)
def unstable_task():
    # 模拟随机失败
    if random.random() < 0.5:
        raise Exception("网络异常,任务执行失败")
    print("任务执行成功")
    return True

@flow
def retry_flow():
    unstable_task()

if __name__ == "__main__":
    retry_flow()

代码说明
retries 设置重试次数,retry_delay_seconds 设置重试间隔,遇到临时异常可自动恢复,无需手动捕获。

4.2 定时任务调度

很多场景需要定时执行,如每日数据统计、定时爬虫、定时清理日志,Prefect 内置调度功能:

from prefect import flow, task
from datetime import timedelta

@task
def daily_task():
    print("执行每日定时任务")

# 每隔 10 秒执行一次
@flow
def scheduled_flow():
    daily_task()

if __name__ == "__main__":
    # 启动调度
    from prefect.server.schemas.schedules import IntervalSchedule
    scheduled_flow.serve(
        schedule=IntervalSchedule(interval=timedelta(seconds=10))
    )

代码说明
运行后脚本会持续运行,每 10 秒自动执行一次工作流,支持秒、分、时、日等间隔,适合轻量定时场景。

4.3 任务并行执行

提升执行效率最常用的方式是并行,Prefect 无需多线程/多进程代码,通过配置实现并行:

from prefect import flow, task
import time

@task
def task_a():
    time.sleep(1)
    print("任务A执行完成")
    return "A"

@task
def task_b():
    time.sleep(1)
    print("任务B执行完成")
    return "B"

@task
def task_c():
    time.sleep(1)
    print("任务C执行完成")
    return "C"

@flow
def parallel_flow():
    # 并行提交任务
    a = task_a.submit()
    b = task_b.submit()
    c = task_c.submit()

    # 等待结果
    print("所有任务结果:", a.result(), b.result(), c.result())

if __name__ == "__main__":
    parallel_flow()

代码说明
使用 .submit() 提交任务即可实现并行,三个任务原本需 3 秒,并行后仅需 1 秒,大幅提升效率。

4.4 工作流超时控制

防止任务卡死占用资源,可给工作流或任务设置超时时间:

from prefect import flow, task
import time

@task(timeout_seconds=3)
def long_running_task():
    time.sleep(5)
    print("任务执行完成")

@flow
def timeout_flow():
    long_running_task()

if __name__ == "__main__":
    timeout_flow()

代码说明
任务设置 3 秒超时,但实际执行 5 秒,Prefect 会自动终止任务并标记失败,避免资源阻塞。

五、Prefect UI 可视化控制台

Prefect 自带可视化界面,可实时查看工作流执行状态、日志、失败记录,无需额外搭建监控系统。

5.1 启动 UI 控制台

命令行执行:

prefect server start

启动成功后,浏览器访问:

http://127.0.0.1:4200

即可进入控制台,可查看:

  • 所有工作流与任务列表
  • 执行历史与耗时统计
  • 失败任务与报错日志
  • 定时任务调度状态

5.2 连接 UI 执行工作流

运行工作流后,无需额外配置,执行记录会自动上传到 UI,新手也能快速定位问题。

六、真实业务场景案例

6.1 每日自动化数据统计脚本

模拟企业常用场景:每日从接口获取数据、清洗处理、保存结果、发送通知:

from prefect import flow, task
import time
from datetime import datetime

# 模拟获取数据
@task(retries=2, retry_delay_seconds=2)
def fetch_data():
    print(f"{datetime.now()} - 从数据源获取数据...")
    time.sleep(1)
    return {"user_count": 1245, "order_count": 328}

# 数据清洗
@task
def clean_data(data):
    print("数据清洗中...")
    time.sleep(1)
    cleaned = {
        "统计时间": datetime.now().strftime("%Y-%m-%d"),
        "用户数": data["user_count"],
        "订单数": data["order_count"],
        "状态": "已清洗"
    }
    return cleaned

# 保存数据
@task
def save_data(cleaned_data):
    print("保存数据到本地/数据库...")
    time.sleep(1)
    with open("daily_report.txt", "w", encoding="utf-8") as f:
        f.write(str(cleaned_data))
    return True

# 发送通知
@task
def send_notification(success):
    if success:
        print("每日统计完成,已发送通知")
    else:
        print("统计失败,需人工检查")

# 完整工作流
@flow
def daily_statistics_flow():
    raw_data = fetch_data()
    cleaned = clean_data(raw_data)
    save_success = save_data(cleaned)
    send_notification(save_success)

if __name__ == "__main__":
    # 启动定时服务
    daily_statistics_flow.serve(
        name="每日数据统计任务",
        schedule={"interval": "24h"}
    )

代码说明
该脚本整合了重试、定时、多步骤编排、数据传递等核心功能,可直接用于生产环境,替代传统 crontab + 杂乱脚本,维护性大幅提升。

七、相关资源

  • Pypi地址:https://pypi.org/project/prefect/
  • Github地址:https://github.com/PrefectHQ/prefect
  • 官方文档地址:https://docs.prefect.io/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具Beam:轻量化任务调度与异步执行入门教程

一、Beam库核心概述

1.1 用途与工作原理

Python Beam库是一款轻量化的任务调度与异步执行工具,主打简单任务编排、定时任务管理和异步函数执行能力,能够帮助开发者摆脱复杂的多线程/多进程代码编写,快速实现任务的并行处理和定时触发。其核心工作原理基于Python的asyncio异步框架和schedule定时任务模块,通过封装任务队列、执行器和调度器,将用户定义的函数转化为可调度、可异步执行的任务单元,同时支持任务依赖管理和执行状态监控。

1.2 优缺点分析

优点

  • 轻量化设计,无过多第三方依赖,安装和部署成本极低;
  • API设计简洁直观,技术小白也能快速上手;
  • 同时支持同步任务、异步任务和定时任务,适用场景广泛;
  • 支持任务执行状态回调,便于监控任务运行结果。

缺点

  • 不支持分布式任务调度,仅适用于单机场景;
  • 高并发任务处理能力较弱,无法替代Celery等专业任务队列;
  • 文档和社区资源相对较少,问题排查难度略高。

1.3 License类型

Beam库采用MIT开源许可证,允许开发者自由使用、修改和分发源代码,无论是个人项目还是商业项目都可以无门槛集成,仅需保留原作者版权声明即可。

二、Beam库安装与环境配置

2.1 安装方式

Beam库已发布至PyPI,支持通过pip命令一键安装,适用于Python 3.7及以上版本,具体安装命令如下:

# 安装最新稳定版
pip install beam

# 安装指定版本(以0.7.0为例)
pip install beam==0.7.0

安装完成后,可在Python环境中通过以下代码验证安装是否成功:

import beam
# 打印库版本号,验证安装
print(beam.__version__)

若终端输出对应的版本号,则说明安装成功。

2.2 环境依赖说明

Beam库的核心依赖仅有两个Python标准库:

  • asyncio:用于实现异步任务执行;
  • schedule:用于实现定时任务调度。
    无需额外安装其他依赖,兼容性极强,可在Windows、Linux、macOS等主流操作系统中正常运行。

三、Beam库核心功能与代码示例

3.1 基础任务执行:同步与异步

Beam库的核心对象是TaskExecutorTask用于封装需要执行的函数,Executor用于负责任务的调度和执行。

3.1.1 同步任务执行

同步任务是指按照顺序依次执行的任务,适用于无依赖的简单函数调用。

from beam import Task, Executor

# 定义一个简单的同步函数
def add(a: int, b: int) -> int:
    """两数相加的同步函数"""
    result = a + b
    print(f"执行加法任务:{a} + {b} = {result}")
    return result

def multiply(a: int, b: int) -> int:
    """两数相乘的同步函数"""
    result = a * b
    print(f"执行乘法任务:{a} * {b} = {result}")
    return result

# 步骤1:创建任务执行器
executor = Executor()

# 步骤2:创建Task对象,封装函数和参数
task1 = Task(target=add, args=(2, 3))
task2 = Task(target=multiply, args=(4, 5))

# 步骤3:将任务添加到执行器并执行
executor.add_task(task1)
executor.add_task(task2)

# 执行所有任务(同步执行,按添加顺序执行)
executor.run()

代码说明

  • 首先导入TaskExecutor两个核心类;
  • 定义addmultiply两个同步函数作为任务目标;
  • 创建Executor执行器对象,通过add_task方法添加任务;
  • 调用executor.run()方法执行所有任务,任务会按照添加顺序依次同步执行。

执行结果

执行加法任务:2 + 3 = 5
执行乘法任务:4 * 5 = 20

3.1.2 异步任务执行

异步任务是指无需等待前一个任务完成即可执行的任务,适用于I/O密集型场景(如网络请求、文件读写),能够有效提升任务执行效率。

import asyncio
from beam import Task, Executor

# 定义一个异步函数(模拟网络请求)
async def async_fetch(url: str) -> str:
    """模拟异步获取URL内容"""
    print(f"开始请求URL:{url}")
    # 模拟网络延迟
    await asyncio.sleep(2)
    result = f"成功获取{url}的内容"
    print(result)
    return result

# 步骤1:创建执行器
executor = Executor()

# 步骤2:创建异步任务(注意:异步函数需要指定is_async=True)
task1 = Task(target=async_fetch, args=("https://www.example.com",), is_async=True)
task2 = Task(target=async_fetch, args=("https://www.python.org",), is_async=True)

# 步骤3:添加任务并执行
executor.add_task(task1)
executor.add_task(task2)

# 执行异步任务
executor.run()

代码说明

  • 定义异步函数async_fetch,使用async def关键字声明;
  • 创建Task对象时,必须通过is_async=True标记该任务为异步任务;
  • 调用executor.run()后,两个异步任务会同时启动,无需等待前一个任务完成,总执行时间约为2秒(而非4秒)。

执行结果

开始请求URL:https://www.example.com
开始请求URL:https://www.python.org
成功获取https://www.example.com的内容
成功获取https://www.python.org的内容

3.2 定时任务调度

Beam库支持基于时间的定时任务调度,能够实现固定时间间隔执行特定时间点执行的需求,底层依赖schedule库实现。

3.2.1 固定间隔执行任务

from beam import Task, Executor, IntervalTrigger

# 定义需要定时执行的函数
def timed_print():
    """定时打印当前时间"""
    from datetime import datetime
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"定时任务执行时间:{current_time}")

# 步骤1:创建时间触发器(每5秒执行一次)
trigger = IntervalTrigger(seconds=5)

# 步骤2:创建定时任务,绑定触发器
task = Task(target=timed_print, trigger=trigger)

# 步骤3:创建执行器并添加任务
executor = Executor()
executor.add_task(task)

# 步骤4:启动执行器,持续运行定时任务
# 注意:定时任务需要使用run_forever()方法,而非run()
executor.run_forever()

代码说明

  • 导入IntervalTrigger时间触发器类,用于定义任务执行间隔;
  • IntervalTrigger支持seconds(秒)、minutes(分钟)、hours(小时)等参数,此处设置为每5秒执行一次;
  • 定时任务需要调用executor.run_forever()方法启动,执行器会持续运行并按照设定的间隔触发任务;
  • 若需要停止定时任务,可在终端按下Ctrl+C中断程序。

执行结果

定时任务执行时间:2026-01-08 10:00:00
定时任务执行时间:2026-01-08 10:00:05
定时任务执行时间:2026-01-08 10:00:10
...

3.2.2 特定时间点执行任务

除了固定间隔,Beam库还支持通过CronTrigger实现类似Linux Crontab的定时规则,例如每天上午10点执行任务。

from beam import Task, Executor, CronTrigger

def daily_report():
    """每天10点生成日报"""
    print("生成每日工作报表...")

# 创建Cron触发器(每天10点执行)
# Cron表达式格式:分 时 日 月 周
trigger = CronTrigger(minute="0", hour="10", day="*", month="*", week="*")

# 创建任务并添加到执行器
task = Task(target=daily_report, trigger=trigger)
executor = Executor()
executor.add_task(task)

# 启动执行器
executor.run_forever()

代码说明

  • CronTrigger的参数与Crontab规则一致,支持通配符*(表示任意值);
  • 上述代码中,minute="0", hour="10"表示每天10点0分执行任务;
  • 适用于需要固定时间点执行的周期性任务,如日报生成、数据备份等。

3.3 任务依赖管理

在实际开发中,多个任务之间可能存在依赖关系(如任务B必须在任务A执行完成后才能执行),Beam库支持通过dependencies参数实现任务依赖管理。

from beam import Task, Executor

def task_a():
    """任务A:生成基础数据"""
    print("执行任务A:生成基础数据")
    return [1, 2, 3, 4, 5]

def task_b(data: list):
    """任务B:处理任务A生成的数据"""
    print(f"执行任务B:接收任务A的数据 {data}")
    processed_data = [x * 2 for x in data]
    print(f"任务B处理结果:{processed_data}")
    return processed_data

# 步骤1:创建任务A
task_a_obj = Task(target=task_a, name="task_a")

# 步骤2:创建任务B,指定依赖任务A
# dependencies参数接收任务对象列表,任务B会在任务A执行完成后自动获取其返回值
task_b_obj = Task(target=task_b, args=(task_a_obj.result,), dependencies=[task_a_obj], name="task_b")

# 步骤3:执行任务
executor = Executor()
executor.add_task(task_a_obj)
executor.add_task(task_b_obj)
executor.run()

代码说明

  • 创建任务时可以通过name参数指定任务名称,便于识别;
  • 任务B的args参数中使用task_a_obj.result表示接收任务A的返回值作为参数;
  • dependencies=[task_a_obj]表示任务B依赖任务A,执行器会确保任务A执行完成后再执行任务B。

执行结果

执行任务A:生成基础数据
执行任务B:接收任务A的数据 [1, 2, 3, 4, 5]
任务B处理结果:[2, 4, 6, 8, 10]

3.4 任务执行状态监控

Beam库支持通过回调函数监控任务的执行状态,包括任务开始、任务成功、任务失败三种状态,便于开发者及时处理任务执行过程中的异常。

from beam import Task, Executor

# 定义任务函数(包含异常场景)
def divide(a: int, b: int) -> float:
    """两数相除,模拟异常场景"""
    return a / b

# 定义状态回调函数
def on_task_start(task):
    """任务开始时的回调"""
    print(f"任务 {task.name} 开始执行...")

def on_task_success(task, result):
    """任务成功时的回调"""
    print(f"任务 {task.name} 执行成功,结果:{result}")

def on_task_failure(task, exception):
    """任务失败时的回调"""
    print(f"任务 {task.name} 执行失败,异常:{exception}")

# 创建执行器
executor = Executor()

# 创建正常任务
task_normal = Task(
    target=divide,
    args=(10, 2),
    name="normal_task",
    on_start=on_task_start,
    on_success=on_task_success,
    on_failure=on_task_failure
)

# 创建异常任务(除数为0)
task_error = Task(
    target=divide,
    args=(10, 0),
    name="error_task",
    on_start=on_task_start,
    on_success=on_task_success,
    on_failure=on_task_failure
)

# 添加任务并执行
executor.add_task(task_normal)
executor.add_task(task_error)
executor.run()

代码说明

  • Task对象分别绑定on_starton_successon_failure三个回调函数;
  • 正常任务执行时,会依次触发on_starton_success
  • 异常任务(除数为0)执行时,会触发on_starton_failure,并传入异常信息。

执行结果

任务 normal_task 开始执行...
任务 normal_task 执行成功,结果:5.0
任务 error_task 开始执行...
任务 error_task 执行失败,异常:division by zero

四、实际应用案例:文件批量处理工具

4.1 案例需求

开发一个文件批量处理工具,实现以下功能:

  1. 遍历指定目录下的所有.txt文件;
  2. 异步读取每个文件的内容;
  3. 统计每个文件的字符数;
  4. 将统计结果写入到result.txt文件中。

4.2 代码实现

import asyncio
import os
from typing import List
from beam import Task, Executor

# 定义异步文件读取函数
async def read_file_async(file_path: str) -> tuple:
    """异步读取文件内容并统计字符数"""
    try:
        async with asyncio.open(file_path, "r", encoding="utf-8") as f:
            content = await f.read()
        char_count = len(content)
        file_name = os.path.basename(file_path)
        return (file_name, char_count)
    except Exception as e:
        return (os.path.basename(file_path), f"读取失败:{str(e)}")

# 定义结果写入函数
def write_result(results: List[tuple]):
    """将统计结果写入result.txt"""
    with open("result.txt", "w", encoding="utf-8") as f:
        f.write("文件名\t字符数\n")
        f.write("-" * 20 + "\n")
        for file_name, count in results:
            f.write(f"{file_name}\t{count}\n")
    print("统计结果已写入result.txt")

# 定义主函数
def batch_process_files(dir_path: str):
    """批量处理指定目录下的txt文件"""
    # 步骤1:获取目录下所有txt文件路径
    txt_files = []
    for file in os.listdir(dir_path):
        if file.endswith(".txt"):
            txt_files.append(os.path.join(dir_path, file))

    if not txt_files:
        print("未找到任何txt文件")
        return

    # 步骤2:创建执行器和异步任务
    executor = Executor()
    tasks = []
    for file_path in txt_files:
        task = Task(
            target=read_file_async,
            args=(file_path,),
            is_async=True,
            name=f"task_{os.path.basename(file_path)}"
        )
        tasks.append(task)
        executor.add_task(task)

    # 步骤3:执行所有异步任务
    executor.run()

    # 步骤4:收集所有任务结果
    results = [task.result for task in tasks]

    # 步骤5:写入结果文件
    write_result(results)

# 执行批量处理(替换为你的目标目录)
if __name__ == "__main__":
    target_dir = "./test_files"  # 目标目录
    # 创建测试目录和文件(可选,用于测试)
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
        # 创建测试文件1
        with open(os.path.join(target_dir, "file1.txt"), "w", encoding="utf-8") as f:
            f.write("Hello Beam!")
        # 创建测试文件2
        with open(os.path.join(target_dir, "file2.txt"), "w", encoding="utf-8") as f:
            f.write("Python 任务调度工具")

    batch_process_files(target_dir)

4.3 代码说明

  1. 异步文件读取:使用asyncio.open异步读取文件内容,避免I/O阻塞,提升批量处理效率;
  2. 任务创建:为每个.txt文件创建一个异步任务,通过is_async=True标记;
  3. 结果收集:任务执行完成后,通过task.result收集每个任务的返回值;
  4. 结果写入:将所有文件的统计结果写入result.txt,便于后续查看。

4.4 执行结果

运行代码后,会在当前目录生成result.txt文件,内容如下:

文件名    字符数
--
file1.txt    10
file2.txt    14

五、相关资源链接

  • PyPI地址:https://pypi.org/project/Beam
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:Celery分布式任务队列入门与实战教程

一、Celery 库核心概述

Celery 是一款基于 Python 开发的分布式任务队列库,主要用于处理异步任务、定时任务和分布式任务,常应用于 Web 开发、数据分析、自动化运维等场景。其工作原理是通过生产者将任务发送到消息中间件(如 RabbitMQ、Redis),消费者(Worker)从中间件获取任务并执行,结果可存储在后端存储(如 Redis、数据库)中。

Celery 的优点包括轻量级、高可用、支持多种消息中间件和结果存储、可水平扩展;缺点是需要依赖第三方消息中间件,初次配置有一定门槛。该库采用 BSD 3-Clause 许可证,开源且可自由用于商业项目。

二、Celery 安装与环境准备

2.1 安装 Celery

Celery 可通过 pip 直接安装,同时需要根据选择的消息中间件安装对应的依赖库。以最常用的 Redis 为例,安装命令如下:

# 安装 Celery 核心库
pip install celery
# 安装 Redis 依赖(用于消息中间件和结果存储)
pip install redis

如果选择 RabbitMQ 作为消息中间件,则需要安装对应的依赖:

pip install celery[librabbitmq]

2.2 消息中间件选择与配置

Celery 本身不提供消息存储功能,必须依赖第三方消息中间件,常见的选择有两种:

  1. Redis:轻量级、高性能,适合中小型项目和开发环境,配置简单。
  2. RabbitMQ:专业的消息队列,可靠性更高,适合大型生产环境。

本教程以 Redis 为例进行演示,在使用前需要确保本地或服务器已安装并启动 Redis 服务。

三、Celery 核心组件与基础使用

3.1 Celery 核心组件

在使用 Celery 之前,需要先了解其三大核心组件:

  • 生产者(Producer):负责创建任务并发送到消息队列,通常是我们的 Python 脚本或 Web 应用。
  • 消费者(Worker):负责监听消息队列并执行任务,一个 Celery 系统可以有多个 Worker 实现分布式部署。
  • 结果后端(Result Backend):用于存储任务的执行结果,可选 Redis、数据库、文件等方式。

3.2 第一个 Celery 应用:创建异步任务

我们先从最简单的异步任务开始,创建一个 Celery 实例并定义任务。

步骤1:创建 Celery 实例

新建一个名为 celery_app.py 的文件,内容如下:

# celery_app.py
from celery import Celery

# 初始化 Celery 应用
# 参数1:应用名称,可自定义
# broker:消息中间件的地址,这里使用 Redis
# backend:结果后端的地址,这里使用 Redis 存储任务结果
app = Celery(
    'first_celery_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 定义异步任务:计算两个数的和
@app.task
def add(x, y):
    return x + y

# 定义异步任务:生成指定长度的随机字符串
import random
import string
@app.task
def generate_random_string(length):
    chars = string.ascii_letters + string.digits
    return ''.join(random.choice(chars) for _ in range(length))

代码说明

  • Celery() 函数用于创建应用实例,broker 参数指定消息中间件地址,backend 指定结果存储地址,redis://localhost:6379/0 表示使用本地 Redis 的第 0 个数据库。
  • @app.task 装饰器用于将普通函数转换为 Celery 异步任务,被装饰的函数可以被异步调用。

步骤2:启动 Celery Worker

Celery Worker 是执行任务的进程,需要在命令行中启动。打开终端,进入 celery_app.py 所在的目录,执行以下命令:

# 启动 Worker,指定应用模块
# --loglevel=info 表示输出 info 级别日志,方便查看任务执行情况
celery -A celery_app worker --loglevel=info

启动成功标志:终端会输出类似 celery@xxx ready. 的信息,此时 Worker 已经开始监听消息队列中的任务。

步骤3:调用异步任务

新建一个名为 task_producer.py 的文件,作为任务生产者调用上面定义的异步任务:

# task_producer.py
from celery_app import add, generate_random_string

# 异步调用任务:delay() 方法会将任务发送到消息队列
# 该方法会立即返回一个 AsyncResult 对象,不会阻塞当前进程
result1 = add.delay(10, 20)
result2 = generate_random_string.delay(10)

# 打印任务 ID,用于查询任务状态和结果
print(f"任务1 ID: {result1.id}")
print(f"任务2 ID: {result2.id}")

# 检查任务是否执行完成
print(f"任务1是否完成: {result1.ready()}")
print(f"任务2是否完成: {result2.ready()}")

# 获取任务执行结果(如果任务未完成,会阻塞直到任务完成)
print(f"任务1执行结果: {result1.get()}")
print(f"任务2执行结果: {result2.get()}")

# 获取任务执行状态
print(f"任务1执行状态: {result1.state}")
print(f"任务2执行状态: {result2.state}")

代码说明

  • delay() 是 Celery 任务的异步调用方法,它会将任务参数序列化后发送到消息队列,然后立即返回 AsyncResult 对象。
  • ready() 方法用于判断任务是否执行完成,返回布尔值。
  • get() 方法用于获取任务执行结果,如果任务未完成,调用该方法会阻塞当前进程直到任务完成;也可以通过 timeout 参数设置超时时间。
  • state 属性返回任务当前的状态,常见状态有 PENDING(等待中)、STARTED(执行中)、SUCCESS(执行成功)、FAILURE(执行失败)。

运行 task_producer.py,输出示例如下:

任务1 ID: 8f9d6b7e-5a3b-4c1e-9f2a-1b2c3d4e5f6g
任务2 ID: 9a8b7c6d-4e3f-2g1h-0i9j-8k7l6m5n4o3p
任务1是否完成: False
任务2是否完成: False
任务1执行结果: 30
任务2执行结果: xY7kP2qR9t
任务1执行状态: SUCCESS
任务2执行状态: SUCCESS

3.3 处理任务执行异常

在实际应用中,任务执行可能会出现异常,Celery 提供了完善的异常处理机制。我们修改 celery_app.py,添加一个可能抛出异常的任务:

# celery_app.py
from celery import Celery

app = Celery(
    'first_celery_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@app.task
def divide(x, y):
    # 定义除法任务,当 y 为 0 时会抛出 ZeroDivisionError
    return x / y

然后创建 error_handler.py 调用该任务:

# error_handler.py
from celery_app import divide

# 调用除法任务,故意传入 y=0
result = divide.delay(10, 0)

try:
    # 获取结果时如果任务抛出异常,会重新抛出该异常
    print(result.get())
except ZeroDivisionError as e:
    print(f"任务执行失败,异常信息: {e}")

# 也可以通过 result.failed() 判断任务是否执行失败
print(f"任务是否失败: {result.failed()}")
# 获取异常信息
print(f"任务异常信息: {result.result}")

代码说明

  • 当任务执行抛出异常时,get() 方法会重新抛出该异常,我们可以通过 try-except 捕获并处理。
  • failed() 方法用于判断任务是否执行失败,返回布尔值。
  • result.result 属性会返回任务抛出的异常对象。

四、Celery 定时任务(Periodic Tasks)

除了异步任务,Celery 还支持定时任务,类似于 Linux 的 crontab 或 Windows 的任务计划程序。我们可以通过配置定时任务,让 Celery 自动周期性执行指定任务。

4.1 配置定时任务

修改 celery_app.py,添加定时任务配置:

# celery_app.py
from celery import Celery
from celery.schedules import crontab

app = Celery(
    'periodic_task_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 配置定时任务
app.conf.beat_schedule = {
    # 定时任务名称,可自定义
    'add-every-10-seconds': {
        # 指定要执行的任务
        'task': 'celery_app.add',
        # 执行周期:每 10 秒执行一次
        'schedule': 10.0,
        # 任务参数
        'args': (100, 200)
    },
    # 另一个定时任务:每天凌晨 2 点执行
    'generate-string-every-day': {
        'task': 'celery_app.generate_random_string',
        # 使用 crontab 表达式配置复杂周期
        'schedule': crontab(hour=2, minute=0),
        'args': (20,)
    }
}

# 定义任务
@app.task
def add(x, y):
    result = x + y
    print(f"定时任务执行:{x} + {y} = {result}")
    return result

@app.task
def generate_random_string(length):
    import random
    import string
    chars = string.ascii_letters + string.digits
    result = ''.join(random.choice(chars) for _ in range(length))
    print(f"定时任务生成随机字符串:{result}")
    return result

代码说明

  • app.conf.beat_schedule 用于配置定时任务,每个键值对对应一个定时任务。
  • schedule 参数可以是数字(表示秒数),也可以是 crontab 对象(用于配置复杂周期,如每天、每周、每月执行)。
  • crontab(hour=2, minute=0) 表示每天凌晨 2 点执行任务,更多 crontab 表达式用法可参考 Celery 官方文档。

4.2 启动定时任务调度器(Beat)

定时任务需要两个进程配合:

  1. Beat 进程:负责按照配置的周期生成定时任务,并发送到消息队列。
  2. Worker 进程:负责执行定时任务。

首先启动 Beat 进程:

celery -A celery_app beat --loglevel=info

然后启动 Worker 进程(新打开一个终端):

celery -A celery_app worker --loglevel=info

启动成功后,Beat 进程会每 10 秒生成一个 add 任务发送到队列,Worker 进程会执行该任务并输出结果;每天凌晨 2 点会生成一个随机字符串任务。

五、Celery 任务进阶功能

5.1 任务优先级

在任务量较大时,我们可以为任务设置优先级,让重要的任务优先执行。Celery 支持为任务和 Worker 设置优先级,以 Redis 作为消息中间件为例,配置方法如下:

# celery_app.py
from celery import Celery

app = Celery(
    'priority_task_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 配置任务优先级
app.conf.task_queue_max_priority = 10  # 最大优先级为 10,数值越大优先级越高

@app.task(priority=5)
def normal_task():
    return "这是一个普通优先级任务"

@app.task(priority=10)
def high_priority_task():
    return "这是一个高优先级任务"

启动 Worker 时,需要指定优先级支持:

celery -A celery_app worker --loglevel=info --priority=10

调用任务时,高优先级任务会优先被 Worker 执行。

5.2 任务重试

当任务执行失败时,我们可以通过配置让 Celery 自动重试任务。修改任务定义,添加重试参数:

# celery_app.py
from celery import Celery
from celery.exceptions import Retry

app = Celery(
    'retry_task_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@app.task(bind=True, max_retries=3, retry_backoff=2, retry_jitter=False)
def retry_divide(self, x, y):
    """
    bind=True:将任务实例自身作为第一个参数传入
    max_retries:最大重试次数
    retry_backoff:重试间隔时间(秒),每次重试间隔会乘以 2(指数退避)
    retry_jitter:是否添加随机抖动,False 表示固定间隔
    """
    try:
        return x / y
    except ZeroDivisionError as e:
        # 抛出 Retry 异常触发重试
        self.retry(exc=e, countdown=5)  # countdown 表示 5 秒后重试

代码说明

  • bind=True 是实现任务重试的关键,它让任务函数可以访问自身的属性和方法。
  • self.retry() 方法用于触发任务重试,exc 参数指定要重试的异常,countdown 参数指定重试间隔。
  • max_retries=3 表示最多重试 3 次,超过次数后任务状态会变为 FAILURE

六、Celery 实际应用案例:批量数据处理

在数据分析场景中,我们经常需要处理大量数据,使用 Celery 可以将数据分发给多个 Worker 并行处理,提升效率。以下是一个批量处理用户数据的案例。

6.1 案例需求

有一个包含 1000 条用户数据的列表,需要对每条数据进行清洗(去除空值、格式化日期),然后将清洗后的数据保存到 Redis 中。

6.2 实现代码

步骤1:定义任务和应用

# data_processing_app.py
from celery import Celery
import json
from datetime import datetime

app = Celery(
    'data_processing_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 定义数据清洗任务
@app.task
def clean_user_data(user):
    """清洗单条用户数据"""
    # 去除空值字段
    cleaned_user = {k: v for k, v in user.items() if v is not None and v != ""}
    # 格式化注册日期
    if 'register_date' in cleaned_user:
        try:
            cleaned_user['register_date'] = datetime.strptime(
                cleaned_user['register_date'],
                '%Y-%m-%d'
            ).strftime('%Y/%m/%d')
        except ValueError:
            cleaned_user['register_date'] = '无效日期'
    # 返回清洗后的数据
    return cleaned_user

# 定义批量保存任务
@app.task
def batch_save_to_redis(cleaned_users):
    """批量保存清洗后的数据到 Redis"""
    import redis
    r = redis.Redis(host='localhost', port=6379, db=0)
    # 将数据序列化为 JSON 字符串
    r.set('cleaned_user_data', json.dumps(cleaned_users))
    return f"成功保存 {len(cleaned_users)} 条用户数据"

步骤2:生成测试数据并调用任务

# data_producer.py
from data_processing_app import clean_user_data, batch_save_to_redis
import random

# 生成 1000 条测试用户数据
def generate_test_data():
    users = []
    for i in range(1000):
        user = {
            'id': i + 1,
            'name': f'User_{i + 1}',
            'age': random.randint(18, 60) if random.random() > 0.1 else None,
            'register_date': f'202{random.randint(3, 5)}-{random.randint(1, 12)}-{random.randint(1, 28)}' if random.random() > 0.2 else ''
        }
        users.append(user)
    return users

if __name__ == '__main__':
    # 生成测试数据
    test_users = generate_test_data()
    # 异步调用清洗任务,处理每条数据
    clean_tasks = [clean_user_data.delay(user) for user in test_users]
    # 等待所有清洗任务完成,并收集结果
    cleaned_users = [task.get() for task in clean_tasks]
    # 调用批量保存任务
    save_result = batch_save_to_redis.delay(cleaned_users)
    print(save_result.get())

步骤3:启动 Worker 并运行

启动 Worker 进程:

celery -A data_processing_app worker --loglevel=info --concurrency=4

--concurrency=4 表示启动 4 个并发 Worker 进程,可根据 CPU 核心数调整。

运行 data_producer.py,程序会生成 1000 条测试数据,将每条数据的清洗任务发送到队列,Worker 并行处理后,再批量保存到 Redis 中。

七、Celery 相关资源

  • Pypi地址:https://pypi.org/project/Celery
  • Github地址:https://github.com/celery/celery
  • 官方文档地址:https://docs.celeryq.dev/en/stable/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:Apache Airflow 从入门到实战 保姆级教程

一、Apache Airflow 核心概述

Apache Airflow是一款由Airbnb开源的任务编排与调度工具,专门用于管理复杂的工作流(Workflow)。其工作原理是将任务抽象为有向无环图(DAG),通过调度器按照任务依赖关系和预设的触发规则自动执行任务。Airflow支持丰富的任务类型,涵盖Shell命令、Python函数、SQL查询等,广泛应用于数据清洗、ETL流程、定时任务执行等场景。

该库的开源协议为Apache License 2.0,这是一个对商业友好的开源协议,允许用户自由使用、修改和分发代码。它的优点是灵活性强、可扩展性高、可视化界面友好;缺点是部署和维护成本较高,对新手不够友好,且轻量级任务场景下略显笨重。

二、Apache Airflow 安装与环境配置

2.1 系统环境要求

在安装Airflow之前,需要确保本地环境满足以下条件:

  • Python 3.8~3.11 版本(Airflow 2.x 对Python版本有严格要求)
  • 足够的磁盘空间(Airflow会存储任务日志和元数据)
  • 已安装pip包管理工具

2.2 安装步骤

Airflow的安装方式有多种,包括pip安装、Docker安装和源码安装,这里我们以最适合新手的pip安装为例进行讲解。

  1. 设置环境变量
    Airflow默认从PyPI下载包,在安装前需要设置一个环境变量来指定Airflow的家目录,同时避免版本冲突: # Linux/Mac系统 export AIRFLOW_HOME=~/airflow # Windows系统(cmd命令行) set AIRFLOW_HOME=D:\airflow
  2. 安装Airflow
    由于Airflow的依赖包较多,直接安装可能会出现问题,我们可以先升级pip,再指定Airflow版本进行安装(推荐安装稳定版2.6.3): # 升级pip python -m pip install --upgrade pip # 安装Airflow核心包 pip install apache-airflow==2.6.3
  3. 初始化元数据库
    Airflow使用元数据库(默认是SQLite)存储DAG信息、任务执行状态等数据,安装完成后需要初始化数据库: airflow db init
  4. 创建管理员用户
    为了登录Airflow的Web UI,需要创建一个管理员账户:
    bash airflow users create \ --username admin \ --firstname FirstName \ --lastname LastName \ --role Admin \ --email [email protected]
    执行该命令后,会提示输入密码,按照提示输入即可。

2.3 启动Airflow服务

Airflow包含两个核心服务:Web服务器调度器,需要分别启动。

  1. 启动Web服务器 # 默认端口是8080 airflow webserver --port 8080
  2. 启动调度器
    打开一个新的终端窗口,执行以下命令启动调度器,调度器会自动检测DAG目录中的任务并执行: airflow scheduler
  3. 访问Web UI
    打开浏览器,输入地址 http://localhost:8080,使用之前创建的管理员账户和密码登录,即可看到Airflow的可视化界面。

三、Apache Airflow 核心概念与基础用法

3.1 核心概念解析

在使用Airflow之前,必须理解几个核心概念,这些概念是构建工作流的基础。

  1. DAG(有向无环图)
    是Airflow的核心,代表一个完整的工作流。DAG由多个任务(Task)组成,任务之间存在依赖关系,且不存在循环依赖(无环)。例如,一个ETL工作流的DAG可以是:数据抽取任务 → 数据清洗任务 → 数据加载任务
  2. Task(任务) 是DAG中的最小执行单元,每个Task对应一个具体的操作。Airflow支持多种类型的Task,常见的有:
    • PythonOperator:执行Python函数
    • BashOperator:执行Shell命令
    • SqlOperator:执行SQL语句
    • EmailOperator:发送邮件
  3. Operator(操作符)
    是定义Task的模板,不同的Operator对应不同类型的任务。Operator的作用是将任务逻辑封装起来,用户只需要传入参数即可创建Task。
  4. Task Instance(任务实例)
    是Task的一次具体执行。每个Task在不同的时间点执行,都会生成一个独立的Task Instance,例如每天执行一次的任务,每天都会产生一个新的Task Instance。
  5. DAG Run(DAG运行实例)
    是DAG的一次具体执行。当DAG满足触发条件时,调度器会创建一个DAG Run,负责执行该次运行中的所有Task Instance。

3.2 编写第一个DAG

Airflow的DAG文件是Python脚本,默认存储在AIRFLOW_HOME/dags目录下,调度器会自动扫描该目录下的Python文件并加载DAG。

接下来我们编写一个简单的DAG,包含两个任务:一个任务打印“Hello Airflow”,另一个任务打印“Task Finished”,且第二个任务依赖于第一个任务。

  1. 创建DAG文件
    AIRFLOW_HOME/dags目录下创建一个名为hello_airflow_dag.py的文件。
  2. 编写DAG代码 # 导入必要的模块 from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator # 定义Python函数,作为任务的执行逻辑 def print_hello(): print("Hello Airflow! This is my first DAG.") def print_finish(): print("Task Finished! Congratulations!") # 定义默认参数 default_args = { 'owner': 'airflow', # DAG的所有者 'depends_on_past': False, # 不依赖于上一次的执行结果 'email_on_failure': False, # 任务失败时不发送邮件 'email_on_retry': False, # 任务重试时不发送邮件 'retries': 1, # 任务失败后的重试次数 'retry_delay': timedelta(minutes=5), # 重试间隔时间 } # 定义DAG with DAG( 'hello_airflow_dag', # DAG的唯一标识 default_args=default_args, description='A simple tutorial DAG', # DAG的描述 schedule_interval=timedelta(days=1), # 调度间隔,每天执行一次 start_date=datetime(2024, 1, 1), # DAG的开始时间 catchup=False, # 不回溯执行历史任务 tags=['example', 'tutorial'], # 标签,用于分类DAG ) as dag:# 定义第一个任务 task1 = PythonOperator( task_id='print_hello_task', # 任务的唯一标识 python_callable=print_hello, # 任务执行的Python函数 ) # 定义第二个任务 task2 = PythonOperator( task_id='print_finish_task', python_callable=print_finish, ) # 设置任务依赖关系:task1执行完成后再执行task2 task1 &gt;&gt; task2</code></pre></li>代码说明 默认参数(default_args):定义了DAG中所有任务的公共参数,如所有者、重试次数、重试间隔等。 DAG定义:使用with DAG()上下文管理器创建DAG,指定了DAG的ID、描述、调度间隔、开始时间等关键信息。 任务定义:使用PythonOperator创建两个任务,分别指定任务ID和要执行的Python函数。 依赖关系设置:使用>>运算符设置任务之间的依赖关系,task1 >> task2表示task2必须在task1执行完成后才能执行。 查看并触发DAG
    将上述代码保存到dags目录后,等待1~2分钟,调度器会自动加载该DAG。在Airflow Web UI的DAG列表中找到hello_airflow_dag,点击右侧的开关按钮启用该DAG。 启用后,可以手动触发DAG执行:点击DAG名称进入详情页,点击右上角的“Trigger DAG”按钮,即可手动启动一次DAG运行。运行完成后,可以在“Graph”视图中查看任务的执行状态,在“Log”视图中查看任务的输出日志。

3.3 常用Operator实战

除了PythonOperator,Airflow还提供了多种常用的Operator,下面我们介绍几种最常用的Operator及其用法。

3.3.1 BashOperator:执行Shell命令

BashOperator用于执行Shell命令或脚本,适合处理需要调用系统命令的任务。

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    'bash_operator_dag',
    default_args=default_args,
    description='A DAG using BashOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['bash', 'example'],
) as dag:

    # 执行简单的Shell命令
    task1 = BashOperator(
        task_id='print_date_task',
        bash_command='date',  # 打印当前日期
    )

    # 执行Shell脚本
    task2 = BashOperator(
        task_id='run_script_task',
        bash_command='echo "Current directory: $(pwd)" && ls -l',
    )

    task1 >> task2

代码说明bash_command参数用于指定要执行的Shell命令或脚本,多个命令可以用&&连接。

3.3.2 EmailOperator:发送邮件

EmailOperator用于在任务执行完成后发送邮件通知,适合用于任务执行状态的告警。

使用EmailOperator前,需要在AIRFLOW_HOME/airflow.cfg配置文件中设置邮件服务器信息:

[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = [email protected]
smtp_password = your_email_password
smtp_port = 587
smtp_mail_from = [email protected]

然后编写DAG代码:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

def task_function():
    return "Task executed successfully!"

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    'email_operator_dag',
    default_args=default_args,
    description='A DAG using EmailOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['email', 'example'],
) as dag:

    task1 = PythonOperator(
        task_id='execute_task',
        python_callable=task_function,
    )

    # 发送邮件通知
    send_email = EmailOperator(
        task_id='send_email_notification',
        to='[email protected]',  # 收件人邮箱
        subject='Airflow Task Execution Status',  # 邮件主题
        html_content='<h3>Task executed successfully!</h3>',  # 邮件内容(支持HTML)
    )

    task1 >> send_email

四、Airflow 工作流实战:数据ETL流程

4.1 需求分析

我们以一个简单的电商用户数据ETL流程为例,演示如何使用Airflow构建完整的工作流。需求如下:

  1. 数据抽取:从CSV文件中读取用户数据。
  2. 数据清洗:去除缺失值、重复值,转换数据格式。
  3. 数据加载:将清洗后的数据写入新的CSV文件。
  4. 发送通知:数据加载完成后发送邮件通知。

4.2 项目目录结构

对于复杂的Airflow项目,建议采用以下目录结构,便于管理代码和数据:

airflow_home/
├── dags/
│   ├── etl_user_data_dag.py  # ETL流程的DAG文件
│   └── etl_scripts/          # 存储ETL相关的Python脚本
│       ├── extract.py
│       ├── transform.py
│       └── load.py
├── data/                     # 存储原始数据和清洗后的数据
│   ├── raw/
│   │   └── user_data.csv
│   └── processed/
└── logs/                     # Airflow任务日志

4.3 编写ETL脚本

  1. 数据抽取脚本(extract.py) # dags/etl_scripts/extract.py import pandas as pd import os def extract_data(input_path: str) -> pd.DataFrame: """ 从CSV文件中抽取数据 :param input_path: 原始数据文件路径 :return: 抽取后的DataFrame """ if not os.path.exists(input_path): raise FileNotFoundError(f"Input file not found: {input_path}") df = pd.read_csv(input_path) print(f"Extracted {len(df)} rows of data") return df
  2. 数据清洗脚本(transform.py) # dags/etl_scripts/transform.py import pandas as pd def transform_data(df: pd.DataFrame) -> pd.DataFrame: """ 数据清洗:去除缺失值、重复值,转换日期格式 :param df: 原始DataFrame :return: 清洗后的DataFrame """ # 去除缺失值 df = df.dropna(subset=['user_id', 'username', 'register_date']) # 去除重复值 df = df.drop_duplicates(subset=['user_id']) # 转换注册日期格式为YYYY-MM-DD df['register_date'] = pd.to_datetime(df['register_date']).dt.strftime('%Y-%m-%d') print(f"Transformed data: {len(df)} rows remaining") return df
  3. 数据加载脚本(load.py) # dags/etl_scripts/load.py import pandas as pd import os def load_data(df: pd.DataFrame, output_path: str) -> None: """ 将清洗后的数据写入CSV文件 :param df: 清洗后的DataFrame :param output_path: 输出文件路径 """ # 创建输出目录(如果不存在) os.makedirs(os.path.dirname(output_path), exist_ok=True) # 写入CSV文件 df.to_csv(output_path, index=False) print(f"Loaded data to {output_path} successfully")

4.4 编写ETL DAG文件

# dags/etl_user_data_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from etl_scripts.extract import extract_data
from etl_scripts.transform import transform_data
from etl_scripts.load import load_data
import os

# 定义文件路径
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME', '~/airflow')
INPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/raw/user_data.csv')
OUTPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/processed/cleaned_user_data.csv')

# 定义默认参数
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['[email protected]'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# 定义任务函数(封装ETL脚本)
def extract_task():
    return extract_data(INPUT_PATH)

def transform_task(ti):
    # 从上游任务(extract_task)获取数据
    df = ti.xcom_pull(task_ids='extract_task')
    return transform_data(df)

def load_task(ti):
    df = ti.xcom_pull(task_ids='transform_task')
    load_data(df, OUTPUT_PATH)

# 定义DAG
with DAG(
    'etl_user_data_workflow',
    default_args=default_args,
    description='A complete ETL workflow for user data',
    schedule_interval='0 1 * * *',  # 每天凌晨1点执行
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'data_processing'],
) as dag:

    # 定义任务
    extract = PythonOperator(
        task_id='extract_task',
        python_callable=extract_task,
    )

    transform = PythonOperator(
        task_id='transform_task',
        python_callable=transform_task,
    )

    load = PythonOperator(
        task_id='load_task',
        python_callable=load_task,
    )

    send_notification = EmailOperator(
        task_id='send_notification',
        to='[email protected]',
        subject='User Data ETL Workflow Completed',
        html_content=f'<h3>ETL Workflow executed successfully!</h3><p>Cleaned data saved to: {OUTPUT_PATH}</p>',
    )

    # 设置任务依赖关系
    extract >> transform >> load >> send_notification

4.5 代码关键说明

  1. XCom通信机制
    Airflow中,任务之间的数据传递通过XCom(Cross-Communication) 实现。在上述代码中,transform_task通过ti.xcom_pull(task_ids='extract_task')获取extract_task的返回值,load_task同理获取transform_task的返回值。XCom适合传递小量数据,大量数据建议使用文件或数据库存储。
  2. 调度时间设置
    schedule_interval参数支持多种格式,除了timedelta,还可以使用Cron表达式。例如'0 1 * * *'表示每天凌晨1点执行任务,这是生产环境中最常用的调度方式。
  3. 任务失败告警
    default_args中设置了email_on_failure: True,当任务执行失败时,Airflow会自动向指定邮箱发送告警邮件。

4.6 测试与运行

  1. 准备原始数据
    AIRFLOW_HOME/data/raw目录下创建user_data.csv文件,写入以下测试数据: user_id,username,register_date,age,gender 1,alice,2024-01-01,25,F 2,bob,2024-01-02,,M 3,charlie,2024-01-03,30,M 2,bob,2024-01-02,28,M 4,david,,35,M
  2. 启动DAG
    将DAG文件和ETL脚本放入对应的目录后,在Airflow Web UI中启用etl_user_data_workflow DAG。可以手动触发一次执行,查看任务的执行状态。执行完成后,在data/processed目录下会生成cleaned_user_data.csv文件,内容为清洗后的数据。

五、Airflow 高级特性与优化建议

5.1 连接管理(Connections)

在实际项目中,任务经常需要连接外部系统(如数据库、Hadoop、云存储等)。Airflow提供了Connections功能,用于统一管理外部系统的连接信息,避免在代码中硬编码用户名、密码等敏感信息。

例如,要连接MySQL数据库,可以在Airflow Web UI的“Admin → Connections”页面创建一个新的连接:

  • Conn Idmysql_default(自定义,需在代码中引用)
  • Conn TypeMySQL
  • Host:MySQL服务器地址
  • Schema:数据库名称
  • Login:用户名
  • Password:密码
  • Port:端口号(默认3306)

在代码中可以通过Hook获取连接信息:

from airflow.providers.mysql.hooks.mysql import MySqlHook

def query_mysql_data():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    df = hook.get_pandas_df(sql='SELECT * FROM users')
    return df

5.2 变量管理(Variables)

Airflow的Variables功能用于存储和管理工作流中的配置变量,如文件路径、阈值参数等。可以在Web UI的“Admin → Variables”页面添加变量,也可以在代码中通过Variable类获取变量值:

from airflow.models import Variable

# 从Variables中获取变量
input_path = Variable.get('user_data_input_path', default_var='/default/path')
output_path = Variable.get('user_data_output_path')

5.3 任务并行执行与资源限制

Airflow支持任务的并行执行,可以通过以下参数优化并行度:

  • parallelism:Airflow集群的最大并行任务数(全局参数)。
  • dag_concurrency:单个DAG的最大并行任务数。
  • max_active_runs_per_dag:单个DAG的最大活跃运行实例数。

这些参数可以在airflow.cfg配置文件中修改。

5.4 日志管理

Airflow的任务日志默认存储在本地,在生产环境中可以将日志存储到远程存储系统(如S3、HDFS、Elasticsearch等),便于日志的集中管理和查询。修改airflow.cfg中的[logging]部分即可配置远程日志存储。

六、相关资源链接

  • PyPI地址:https://pypi.org/project/apache-airflow
  • Github地址:https://github.com/apache/airflow
  • 官方文档地址:https://airflow.apache.org/docs/apache-airflow/stable/index.html

关注我,每天分享一个实用的Python自动化工具。

Python Squirrel库入门教程:高效数据缓存与持久化工具

一、Squirrel库核心概述

Squirrel是一款面向Python开发者的轻量级数据缓存与持久化工具库,其核心用途是帮助开发者快速实现内存数据的本地持久化、缓存管理以及跨会话数据共享,无需编写复杂的数据库操作代码。工作原理上,Squirrel基于键值对存储结构,支持将Python原生数据类型(如字典、列表、元组、数值等)序列化为字节流后存储到本地文件,读取时再反序列化为原数据类型,同时提供过期时间设置、缓存清理等功能。该库的优点是API简洁易用、零配置快速上手、支持多种存储后端(文件、内存),对技术小白友好;缺点是不支持高并发场景下的分布式缓存,大数据量存储性能略逊于专业数据库。Squirrel的开源协议为MIT License,开发者可自由用于商业和非商业项目,无授权限制。

二、Squirrel库安装步骤

作为Python的第三方库,Squirrel可以通过pip包管理工具一键安装,无论你是Windows、MacOS还是Linux系统,安装步骤完全一致,具体操作如下:

2.1 环境准备

确保你的电脑已经安装了Python环境(推荐Python 3.6及以上版本),可以通过在命令行中输入以下指令验证Python版本:

# 验证Python版本
import sys
print(sys.version)

运行代码后,如果输出类似3.9.7 (default, Sep 16 2021, 08:50:36) [MSC v.1916 64 bit (AMD64)]的内容,说明Python环境已就绪。

2.2 执行安装命令

打开命令行终端(Windows为CMD或PowerShell,MacOS和Linux为Terminal),输入以下安装指令:

pip install squirrel

等待终端输出Successfully installed squirrel-x.x.x(x.x.x为版本号),即表示安装成功。如果安装过程中出现网络超时问题,可以切换国内PyPI镜像源,例如使用阿里云镜像:

pip install squirrel -i https://mirrors.aliyun.com/pypi/simple/

三、Squirrel库核心API使用教程

Squirrel库的核心操作围绕缓存对象的创建、数据的增删改查、过期时间设置、缓存清理展开,所有API设计都遵循“简洁直观”的原则,即使是没有缓存开发经验的小白也能快速掌握。下面我们通过代码示例逐一讲解每个核心功能的使用方法。

3.1 初始化缓存对象

使用Squirrel的第一步是创建一个缓存实例,该实例可以指定存储后端(默认是文件存储,也可以选择内存存储)。文件存储会将数据保存到本地文件,程序重启后数据不丢失;内存存储则仅在程序运行期间保存数据,程序退出后数据自动清除。

# 导入Squirrel库的核心类
from squirrel import Cache

# 初始化文件存储缓存,数据会保存到本地的squirrel_cache.db文件
file_cache = Cache(backend="file", path="squirrel_cache.db")

# 初始化内存存储缓存,程序退出后数据丢失
memory_cache = Cache(backend="memory")

代码说明

  • backend参数用于指定存储后端,可选值为filememory,默认值为file
  • path参数仅在backend="file"时生效,用于指定缓存文件的保存路径和文件名。如果不指定,默认会在当前目录下创建squirrel_cache.db文件。

3.2 数据的添加与读取

Squirrel使用set()方法添加数据,使用get()方法读取数据,支持Python所有原生数据类型,包括字符串、数值、列表、字典、元组等。

# 向文件缓存中添加数据
# 存储字符串类型
file_cache.set("username", "python_squirrel")
# 存储数值类型
file_cache.set("age", 25)
# 存储列表类型
file_cache.set("hobbies", ["coding", "reading", "hiking"])
# 存储字典类型
file_cache.set("user_info", {"id": 1001, "name": "小明", "email": "[email protected]"})

# 从文件缓存中读取数据
username = file_cache.get("username")
age = file_cache.get("age")
hobbies = file_cache.get("hobbies")
user_info = file_cache.get("user_info")

# 打印读取的数据
print(f"用户名: {username}")
print(f"年龄: {age}")
print(f"爱好: {hobbies}")
print(f"用户信息: {user_info}")

代码说明

  • set(key, value)方法接收两个参数,key为字符串类型的键名,value为需要存储的任意Python原生数据。
  • get(key)方法接收一个参数key,返回该键对应的数值,如果键不存在,则返回None
  • 运行代码后,控制台会输出以下内容:
  用户名: python_squirrel
  年龄: 25
  爱好: ['coding', 'reading', 'hiking']
  用户信息: {'id': 1001, 'name': '小明', 'email': '[email protected]'}

即使关闭程序后重新运行读取代码,数据依然会存在,因为我们使用的是文件存储后端。

3.3 设置数据的过期时间

在很多场景下,我们需要缓存的数据在一段时间后自动失效(例如验证码、临时会话信息),Squirrel的set()方法支持通过expire参数设置过期时间,单位为

# 存储一个有效期为60秒的验证码
file_cache.set("verify_code", "852369", expire=60)

# 立即读取,此时数据未过期
verify_code = file_cache.get("verify_code")
print(f"未过期的验证码: {verify_code}")  # 输出:未过期的验证码: 852369

# 等待60秒后再次读取,数据已过期,返回None
import time
time.sleep(60)
expired_code = file_cache.get("verify_code")
print(f"过期后的验证码: {expired_code}")  # 输出:过期后的验证码: None

代码说明

  • expire参数为可选参数,默认值为None,表示数据永久有效。当指定数值时,数据会在对应的秒数后自动失效。
  • 过期的数据会在下次调用get()方法时被检测并清理,不会占用存储空间。

3.4 数据的修改与删除

修改缓存数据的方法依然是set(),只需要对同一个键名重新赋值即可;删除数据则使用delete()方法,指定需要删除的键名即可。

# 修改已存在的数据
file_cache.set("username", "squirrel_python")
updated_username = file_cache.get("username")
print(f"修改后的用户名: {updated_username}")  # 输出:修改后的用户名: squirrel_python

# 删除指定键的数据
file_cache.delete("age")
deleted_age = file_cache.get("age")
print(f"删除后的age值: {deleted_age}")  # 输出:删除后的age值: None

# 批量删除多个键的数据
file_cache.delete_many(["hobbies", "user_info"])
hobbies_after_delete = file_cache.get("hobbies")
user_info_after_delete = file_cache.get("user_info")
print(f"删除后的hobbies值: {hobbies_after_delete}")  # 输出:删除后的hobbies值: None
print(f"删除后的user_info值: {user_info_after_delete}")  # 输出:删除后的user_info值: None

代码说明

  • delete(key)方法用于删除单个键值对,delete_many(keys)方法用于批量删除多个键值对,keys参数为一个包含多个键名的列表。
  • 被删除的数据会立即从存储后端中移除,无论是文件存储还是内存存储。

3.5 缓存数据的批量操作

除了单个数据的增删改查,Squirrel还支持批量添加和批量读取数据,这在需要一次性处理大量数据时可以显著提高效率。

# 批量添加数据
batch_data = {
    "key1": "value1",
    "key2": 100,
    "key3": [1, 2, 3],
    "key4": {"a": 1, "b": 2}
}
file_cache.set_many(batch_data)

# 批量读取数据
keys_to_get = ["key1", "key2", "key3", "key4"]
batch_result = file_cache.get_many(keys_to_get)

# 打印批量读取的结果
for key, value in batch_result.items():
    print(f"{key}: {value}")

代码说明

  • set_many(data)方法接收一个字典类型的参数data,字典中的每个键值对都会被添加到缓存中。
  • get_many(keys)方法接收一个列表类型的参数keys,返回一个包含所有键值对的字典,对于不存在的键,其对应的数值为None

3.6 缓存的清空与状态查询

如果需要清空所有缓存数据,可以使用clear()方法;如果需要查询缓存中当前的键数量,可以使用count()方法。

# 查询当前缓存中的键数量
key_count = file_cache.count()
print(f"缓存中的键数量: {key_count}")

# 清空所有缓存数据
file_cache.clear()

# 再次查询键数量,此时为0
empty_count = file_cache.count()
print(f"清空后的键数量: {empty_count}")

代码说明

  • count()方法返回缓存中有效键的数量,不包含已过期的键。
  • clear()方法会删除缓存中的所有数据,操作不可逆,使用时需要谨慎。

四、Squirrel库实际应用案例

为了帮助开发者更好地理解Squirrel库在实际项目中的使用场景,下面我们以用户登录状态缓存API接口数据缓存两个常见场景为例,编写完整的代码案例,展示如何将Squirrel库集成到Python项目中。

4.1 案例一:用户登录状态缓存

在Web开发或桌面应用开发中,用户登录后需要保持登录状态,避免每次操作都重新输入用户名和密码。使用Squirrel可以将用户的登录信息缓存到本地,程序重启后依然可以保持登录状态,直到用户主动退出登录。

from squirrel import Cache
import time

# 初始化文件缓存,存储用户登录信息
login_cache = Cache(backend="file", path="login_status.db")

def user_login(username: str, password: str) -> bool:
    """
    用户登录函数,模拟验证用户名和密码
    :param username: 用户名
    :param password: 密码
    :return: 登录成功返回True,失败返回False
    """
    # 模拟数据库中的用户信息
    db_users = {
        "admin": "admin123",
        "user1": "user123",
        "user2": "user234"
    }
    # 验证用户名和密码
    if username in db_users and db_users[username] == password:
        # 登录成功,缓存用户信息,有效期2小时(7200秒)
        login_cache.set(f"login_{username}", True, expire=7200)
        login_cache.set(f"user_{username}", {"username": username, "login_time": time.time()})
        print(f"用户 {username} 登录成功!")
        return True
    else:
        print("用户名或密码错误,登录失败!")
        return False

def check_login_status(username: str) -> bool:
    """
    检查用户是否处于登录状态
    :param username: 用户名
    :return: 已登录返回True,未登录返回False
    """
    login_status = login_cache.get(f"login_{username}")
    return login_status is not None and login_status

def user_logout(username: str) -> None:
    """
    用户退出登录,清除缓存中的登录状态
    :param username: 用户名
    """
    login_cache.delete(f"login_{username}")
    login_cache.delete(f"user_{username}")
    print(f"用户 {username} 已退出登录!")

# 测试登录功能
user_login("admin", "admin123")

# 检查登录状态
if check_login_status("admin"):
    user_info = login_cache.get(f"user_admin")
    login_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(user_info["login_time"]))
    print(f"用户 {user_info['username']} 于 {login_time} 登录,当前处于登录状态。")

# 退出登录
user_logout("admin")

# 再次检查登录状态
if not check_login_status("admin"):
    print(f"用户 admin 已退出登录。")

代码说明

  • user_login()函数模拟用户登录验证,验证通过后将登录状态和用户信息缓存到本地,有效期为2小时。
  • check_login_status()函数通过读取缓存中的登录状态键,判断用户是否处于登录状态。
  • user_logout()函数通过删除缓存中的登录状态键,实现用户退出登录功能。
  • 该案例适用于桌面应用、CLI工具等需要保持用户登录状态的场景,无需依赖数据库即可实现状态持久化。

4.2 案例二:API接口数据缓存

在调用第三方API接口时,频繁请求会导致接口限流、响应速度慢等问题。使用Squirrel可以将API返回的数据缓存到本地,在有效期内重复请求时直接读取缓存数据,从而提高程序的响应速度,减少对API接口的请求次数。

from squirrel import Cache
import requests
import time

# 初始化文件缓存,存储API接口数据
api_cache = Cache(backend="file", path="api_cache.db")

def get_weather_data(city: str) -> dict:
    """
    获取城市天气数据,优先读取缓存,缓存失效后调用API接口
    :param city: 城市名称
    :return: 天气数据字典
    """
    # 定义缓存键名
    cache_key = f"weather_{city}"
    # 尝试从缓存中读取数据
    cached_data = api_cache.get(cache_key)
    if cached_data is not None:
        print(f"从缓存中读取{city}的天气数据...")
        return cached_data

    # 缓存失效,调用API接口获取数据(这里使用模拟API)
    print(f"调用API获取{city}的天气数据...")
    # 模拟API请求延迟
    time.sleep(2)
    # 模拟API返回的数据
    api_data = {
        "city": city,
        "temperature": 22,
        "weather": "sunny",
        "humidity": 45,
        "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    }

    # 将API返回的数据缓存到本地,有效期10分钟(600秒)
    api_cache.set(cache_key, api_data, expire=600)
    return api_data

# 第一次调用,缓存未命中,调用API
weather_beijing = get_weather_data("北京")
print(f"北京天气: {weather_beijing}")

# 第二次调用,缓存命中,直接读取缓存
weather_beijing_cached = get_weather_data("北京")
print(f"北京天气(缓存): {weather_beijing_cached}")

代码说明

  • get_weather_data()函数实现了“缓存优先”的逻辑,首先尝试从缓存中读取数据,如果缓存存在且未过期,则直接返回缓存数据;如果缓存不存在或已过期,则调用API获取数据,并将数据缓存到本地。
  • 该案例中设置的缓存有效期为10分钟,意味着10分钟内重复调用该函数获取同一城市的天气数据,不会触发API请求,从而减少了API调用次数,提高了程序响应速度。
  • 实际项目中,可以将模拟API替换为真实的天气API接口,例如高德地图天气API、和风天气API等。

五、Squirrel库相关资源链接

  • Pypi地址:https://pypi.org/project/Squirrel
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。

Python实用工具Upgini:零代码特征工程与数据集增强教程

一、Upgini库核心概述

Upgini是一款面向数据科学和机器学习领域的Python自动化特征工程库,核心用途是帮助开发者快速为结构化数据集生成高质量特征、对接公开数据源完成数据增强,从而提升机器学习模型的预测性能。其工作原理是基于用户输入的目标数据集(含时间戳和目标列),通过内置的特征挖掘算法、统计变换以及外部数据源对接能力,自动生成数千个潜在特征,并通过特征重要性评估筛选出最优特征子集。

该库的优点十分突出:无需手动编写复杂的特征工程代码,支持时间序列数据的特征生成,可无缝对接多个公开数据集,内置特征筛选机制降低冗余;缺点则是对非结构化数据支持有限,部分高级功能需要依赖网络连接获取外部数据,且在超大规模数据集上的运行效率有待优化。Upgini采用Apache License 2.0开源协议,允许商用和二次开发,完全免费且对开发者友好。

二、Upgini库安装与环境配置

2.1 基础安装步骤

Upgini支持通过Python官方包管理工具pip进行安装,兼容Python 3.7及以上版本,建议在虚拟环境中安装以避免依赖冲突。对于技术小白来说,操作步骤非常简单,打开命令行终端,输入以下命令即可完成安装:

# 基础版安装
pip install upgini

# 完整版安装(包含所有依赖,推荐新手使用)
pip install "upgini[full]"

安装完成后,可通过以下代码验证是否安装成功:

import upgini
print(f"Upgini版本:{upgini.__version__}")

若终端输出对应的版本号(如0.2.15),则说明安装成功;若出现报错,可尝试升级pip后重新安装,升级命令如下:

pip install --upgrade pip

2.2 环境依赖说明

Upgini的运行依赖多个常见的数据科学库,包括pandas(数据处理)、numpy(数值计算)、scikit-learn(特征评估)、lightgbm(默认特征重要性模型)等。安装upgini[full]时会自动安装这些依赖,无需手动下载。对于新手来说,建议使用Anaconda环境,该环境预装了大部分数据科学库,能进一步降低安装难度。

三、Upgini核心功能与使用示例

Upgini的核心功能围绕自动化特征生成数据集增强展开,其使用流程遵循“导入数据→配置特征生成器→生成并筛选特征→导出结果”的步骤,全程无需手动编写特征工程代码。以下通过具体案例演示核心功能的使用方法。

3.1 数据准备:导入示例数据集

在使用Upgini之前,需要准备一份结构化数据集,数据集需包含目标列(待预测的变量)和时间戳列(可选,用于时间序列特征生成)。本文以经典的房价预测数据集为例,该数据集包含房屋面积、卧室数量、建造年份等基础特征,目标是预测房屋价格。

首先导入pandas库加载数据集:

import pandas as pd

# 加载本地房价数据集(新手可直接使用sklearn的示例数据集)
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()
df = pd.DataFrame(data=housing.data, columns=housing.feature_names)
df["MedHouseVal"] = housing.target  # 目标列:房屋中位数价格
df["timestamp"] = pd.date_range(start="2020-01-01", periods=len(df), freq="D")  # 添加时间戳列
print(df.head())
print(f"数据集形状:{df.shape}")

代码说明:

  • 首先从sklearn导入加利福尼亚房价数据集,该数据集是机器学习领域的经典数据集,包含20640条样本和8个基础特征。
  • 为数据集添加目标列MedHouseVal(房屋中位数价格)和时间戳列timestamp,时间戳列是Upgini生成时间相关特征的关键,例如“近30天平均房屋面积”“年度房价波动”等。
  • 最后打印数据集的前5行和形状,确认数据加载成功。

3.2 初始化特征生成器

Upgini的核心类是FeatureEnricher,该类负责特征生成、筛选和增强的全流程。初始化时需要指定目标列名称、时间戳列名称(可选)以及特征筛选的评估指标。代码示例如下:

from upgini import FeatureEnricher, SearchKey
from upgini.metadata import CVType

# 初始化特征生成器
enricher = FeatureEnricher(
    search_keys={
        # 定义时间戳列,用于生成时间相关特征
        "timestamp": SearchKey.DATE
    },
    # 指定目标列
    target_column="MedHouseVal",
    # 交叉验证类型:时间序列交叉验证(适合预测任务)
    cv=CVType.TIME_SERIES,
    # 特征筛选指标:均方根误差(RMSE)
    eval_metric="rmse"
)

代码说明:

  • search_keys参数用于定义数据集中的关键列类型,这里将timestamp列指定为日期类型(SearchKey.DATE),Upgini会基于该列生成时间窗口特征。
  • target_column参数指定待预测的目标列,特征生成器会围绕该列筛选对预测最有帮助的特征。
  • cv参数设置交叉验证类型,时间序列任务推荐使用CVType.TIME_SERIES,避免数据泄露;若为普通分类/回归任务,可使用CVType.RANDOM
  • eval_metric参数设置特征筛选的评估指标,回归任务常用rmse(均方根误差)或mae(平均绝对误差),分类任务常用auc(曲线下面积)。

3.3 自动生成与筛选特征

初始化特征生成器后,调用fit方法即可基于输入的数据集自动生成特征。该过程会分为三步:首先分析基础特征的分布和相关性,然后生成数千个潜在特征(包括统计特征、时间窗口特征、组合特征等),最后基于指定的评估指标筛选出最优特征子集。代码示例如下:

# 基于训练集生成特征
enricher.fit(df, eval_set=[(df, "validation")])

代码说明:

  • fit方法的第一个参数是训练数据集,第二个参数eval_set指定验证集,这里使用同一数据集作为验证集(新手可直接使用该设置)。
  • 运行该代码后,Upgini会在终端输出特征生成的进度,包括生成的特征总数、筛选后的特征数、以及特征对模型性能的提升幅度。例如:“生成了3250个特征,筛选出120个最优特征,验证集RMSE降低了18.5%”。
  • 特征生成过程的时间取决于数据集大小,小型数据集(万级样本)通常在1-2分钟内完成,大型数据集可能需要更长时间。

3.4 查看生成的特征与重要性

特征生成完成后,可通过get_features()方法查看筛选后的特征列表,通过feature_importance_属性查看特征的重要性排名,这有助于开发者理解哪些特征对预测任务最有帮助。代码示例如下:

# 查看筛选后的特征列表
generated_features = enricher.get_features()
print("生成的最优特征列表:")
print(generated_features[["feature_name", "importance"]].head(10))

# 可视化特征重要性(推荐新手使用)
import matplotlib.pyplot as plt
feature_importance = enricher.feature_importance_
feature_importance = feature_importance.sort_values(by="importance", ascending=False).head(10)
plt.figure(figsize=(12, 6))
plt.bar(feature_importance["feature_name"], feature_importance["importance"])
plt.xticks(rotation=45, ha="right")
plt.title("Top 10 Most Important Features")
plt.tight_layout()
plt.show()

代码说明:

  • get_features()方法返回一个DataFrame,包含特征名称、重要性得分、数据类型等信息,head(10)用于查看前10个最重要的特征。
  • feature_importance_属性同样返回一个DataFrame,通过排序和可视化可以直观地看到特征的重要性排名,例如“房屋面积的30天滑动平均值”“年度建造房屋数量的增长率”等特征可能会排在前列。
  • 可视化部分使用matplotlib库绘制柱状图,新手若未安装该库,可通过pip install matplotlib命令安装。

3.5 应用生成的特征增强数据集

筛选出最优特征后,调用transform方法即可将这些特征添加到原始数据集中,生成增强后的数据集,用于后续的机器学习模型训练。代码示例如下:

# 生成增强后的数据集
enhanced_df = enricher.transform(df)
print(f"原始数据集形状:{df.shape}")
print(f"增强后数据集形状:{enhanced_df.shape}")
print("增强后数据集的前5行:")
print(enhanced_df.head())

代码说明:

  • transform方法接收原始数据集,返回添加了生成特征的新数据集。例如原始数据集是(20640, 10),增强后可能变成(20640, 130),新增了120个特征。
  • 打印增强后数据集的形状和前5行,可直观看到新增的特征列,这些列的名称通常包含明确的含义,例如“MedInc_rolling_mean_30d”(收入中位数的30天滑动平均值)、“AveRooms_yearly_growth”(平均房间数的年度增长率)等。

3.6 对接外部数据源增强数据

Upgini的高级功能之一是对接外部公开数据源(如天气数据、人口统计数据、经济指标数据等),为数据集补充更多维度的特征。该功能需要网络连接,且部分数据源需要注册API密钥(新手可先使用免费数据源)。以下以对接天气数据源为例,演示外部数据增强的方法:

from upgini import ExternalDataset

# 加载外部天气数据集(示例:美国加州天气数据)
weather_dataset = ExternalDataset.from_csv(
    "https://example.com/california_weather_2020_2023.csv",  # 外部数据源URL
    search_keys={"date": SearchKey.DATE},  # 外部数据的时间戳列
    features=["temperature", "rainfall", "humidity"]  # 需要提取的特征
)

# 初始化特征生成器并添加外部数据集
enricher_with_external = FeatureEnricher(
    search_keys={"timestamp": SearchKey.DATE},
    target_column="MedHouseVal",
    cv=CVType.TIME_SERIES,
    eval_metric="rmse"
)

# 添加外部数据集
enricher_with_external.add_external_datasets([weather_dataset])

# 生成包含外部特征的增强数据集
enhanced_df_with_external = enricher_with_external.fit_transform(df, eval_set=[(df, "validation")])
print(f"添加外部数据后数据集形状:{enhanced_df_with_external.shape}")

代码说明:

  • ExternalDataset.from_csv方法用于加载外部CSV格式的数据集,需要指定数据源的URL、关键列类型和需要提取的特征列。
  • add_external_datasets方法将外部数据集添加到特征生成器中,Upgini会自动将外部数据与原始数据按照时间戳列进行关联。
  • fit_transform方法是fittransform的组合,可一次性完成特征生成和数据集增强,添加外部数据后,生成的特征会包含天气相关的维度,例如“温度与房屋面积的乘积”“降雨天数与房价的相关性特征”等,进一步提升模型的预测能力。

四、Upgini实战案例:房价预测模型性能提升

为了直观展示Upgini的效果,本文构建一个对比实验:分别使用原始数据集和Upgini增强后的数据集训练LightGBM回归模型,对比模型的预测性能。

4.1 实验准备:划分训练集与测试集

首先将增强前后的数据集划分为训练集和测试集,时间序列任务需按照时间顺序划分,避免数据泄露:

from sklearn.model_selection import train_test_split

# 原始数据集划分
X_original = df.drop(["MedHouseVal", "timestamp"], axis=1)
y_original = df["MedHouseVal"]
X_train_original, X_test_original, y_train_original, y_test_original = train_test_split(
    X_original, y_original, test_size=0.2, shuffle=False  # 时间序列任务shuffle=False
)

# 增强后数据集划分
X_enhanced = enhanced_df_with_external.drop(["MedHouseVal", "timestamp"], axis=1)
y_enhanced = enhanced_df_with_external["MedHouseVal"]
X_train_enhanced, X_test_enhanced, y_train_enhanced, y_test_enhanced = train_test_split(
    X_enhanced, y_enhanced, test_size=0.2, shuffle=False
)

代码说明:

  • 原始数据集删除目标列和时间戳列后作为特征矩阵X_original,目标列作为标签y_original
  • 增强后数据集的处理方式与原始数据集一致,特征矩阵X_enhanced包含原始特征和生成的特征。
  • shuffle=False确保按照时间顺序划分训练集和测试集,符合时间序列预测的业务场景。

4.2 训练LightGBM模型并评估性能

分别使用原始特征和增强特征训练LightGBM模型,评估指标采用均方根误差(RMSE)和决定系数(R²),R²越接近1表示模型拟合效果越好:

import lightgbm as lgb
from sklearn.metrics import mean_squared_error, r2_score

# 定义模型训练函数
def train_and_evaluate(X_train, X_test, y_train, y_test, model_name):
    # 初始化LightGBM回归模型
    model = lgb.LGBMRegressor(
        n_estimators=100,
        learning_rate=0.1,
        random_state=42
    )
    # 训练模型
    model.fit(X_train, y_train)
    # 预测测试集
    y_pred = model.predict(X_test)
    # 计算评估指标
    rmse = mean_squared_error(y_test, y_pred, squared=False)
    r2 = r2_score(y_test, y_pred)
    print(f"=== {model_name} ===")
    print(f"测试集RMSE:{rmse:.4f}")
    print(f"测试集R²:{r2:.4f}")
    print("-" * 30)

# 使用原始特征训练模型
train_and_evaluate(X_train_original, X_test_original, y_train_original, y_test_original, "原始特征模型")

# 使用增强特征训练模型
train_and_evaluate(X_train_enhanced, X_test_enhanced, y_train_enhanced, y_test_enhanced, "增强特征模型")

代码说明:

  • 定义train_and_evaluate函数,封装模型训练、预测和评估的流程,方便重复调用。
  • LightGBM模型的参数设置为默认值,确保实验的公平性。
  • 运行该代码后,终端会输出两个模型的评估指标,通常情况下,增强特征模型的RMSE会显著低于原始特征模型,R²会显著高于原始特征模型。例如:原始特征模型的RMSE为0.6523,R²为0.5812;增强特征模型的RMSE为0.4215,R²为0.7986,模型性能提升明显。

4.3 实验结论分析

通过对比实验可以发现,Upgini生成的特征能够有效提升机器学习模型的预测性能,原因主要有三点:

  1. 特征维度扩展:生成了大量人工难以想到的特征,覆盖了统计、时间、组合等多个维度,丰富了数据的信息密度。
  2. 特征质量优化:通过内置的筛选机制剔除了冗余特征和噪声特征,保留了对目标列最具预测价值的特征。
  3. 外部数据补充:对接外部数据源后,引入了与业务场景相关的额外信息,进一步提升了模型的泛化能力。

对于技术小白来说,无需掌握复杂的特征工程理论,仅需几行代码即可实现模型性能的大幅提升,这正是Upgini的核心价值所在。

五、Upgini常见问题与解决方案

在使用Upgini的过程中,新手可能会遇到一些常见问题,以下列出对应的解决方案:

5.1 问题1:特征生成速度过慢

  • 原因:数据集规模过大,或本地计算资源不足。
  • 解决方案
  1. 对数据集进行抽样,使用部分数据进行特征生成,例如df_sample = df.sample(n=10000, random_state=42)
  2. 减少生成特征的数量,通过设置FeatureEnrichermax_features参数,例如max_features=50
  3. 升级硬件配置,增加内存和CPU核心数,或使用GPU加速(需安装对应的LightGBM GPU版本)。

5.2 问题2:特征生成后模型性能未提升

  • 原因:目标列与特征的相关性较弱,或数据集存在严重的缺失值、异常值。
  • 解决方案
  1. 检查原始数据集的质量,使用df.isnull().sum()查看缺失值,使用df.describe()查看异常值,提前进行数据清洗。
  2. 调整FeatureEnricher的参数,例如更换评估指标(eval_metric)、调整交叉验证类型(cv)。
  3. 添加更多的外部数据源,补充与目标列相关的业务信息。

5.3 问题3:外部数据源加载失败

  • 原因:网络连接问题,或数据源URL无效、权限不足。
  • 解决方案
  1. 检查网络连接,确保能够正常访问外部数据源的URL。
  2. 下载外部数据集到本地,使用ExternalDataset.from_csv("local_file_path.csv")加载本地文件。
  3. 确认数据源的权限,部分商用数据源需要注册API密钥并在代码中配置。

六、Upgini相关资源链接

  • Pypi地址:https://pypi.org/project/Upgini
  • Github地址:https://github.com/upgini/upgini
  • 官方文档地址:https://docs.upgini.com

关注我,每天分享一个实用的Python自动化工具。

Python实用工具库excalibur:PDF表格提取与数据处理实战教程

一、excalibur库核心概述

excalibur是一款基于Python开发的PDF表格提取与数据处理工具库,其核心工作原理是依托计算机视觉技术与OCR(光学字符识别)算法,对PDF文件中的表格区域进行精准定位、单元格分割与内容提取,最终将提取的表格数据转换为Excel、CSV等易处理的结构化格式。该库的优点在于操作简单、对扫描版PDF兼容性强,支持批量处理多份文件;缺点是对复杂嵌套表格的识别精度有待提升,处理大文件时耗时较长。excalibur采用MIT开源许可证,允许开发者自由使用、修改和分发,无商业使用限制。

二、excalibur库安装与环境配置

2.1 安装前置依赖

excalibur的运行依赖于Tesseract OCR引擎和Poppler PDF处理库,不同操作系统的安装方式有所差异:

  1. Windows系统
    • 安装Tesseract OCR:前往UB-Mannheim/tesseract下载对应版本的安装包,安装时需勾选“Add to PATH”选项,或手动将安装路径(如C:\Program Files\Tesseract-OCR)添加到系统环境变量。
    • 安装Poppler:下载Poppler Windows压缩包,解压后将bin目录路径添加到系统环境变量。
  2. macOS系统
    打开终端,通过Homebrew执行以下命令安装依赖:
    bash brew install tesseract brew install poppler
  3. Linux系统(以Ubuntu为例)
    打开终端,执行以下命令安装依赖:
    bash sudo apt-get update sudo apt-get install tesseract-ocr sudo apt-get install poppler-utils

2.2 安装excalibur库

完成依赖安装后,使用pip命令即可安装excalibur库,命令如下:

pip install excalibur

安装完成后,可在Python终端中执行以下代码验证安装是否成功:

import excalibur
print(f"excalibur库版本:{excalibur.__version__}")

若终端输出库的版本号,则说明安装成功;若出现“ModuleNotFoundError”,需检查依赖是否安装完整,或重新执行pip安装命令。

三、excalibur库核心功能与基础用法

excalibur库的核心功能分为单PDF表格提取批量PDF处理提取结果导出,以下结合代码示例详细讲解每个功能的使用方法。

3.1 单PDF文件表格提取

excalibur提取单PDF文件表格的核心步骤为:初始化提取器、加载PDF文件、定位表格区域、提取表格内容。以下是完整代码示例:

from excalibur.pdf_processing import PDFTableExtractor

# 1. 初始化PDF表格提取器
extractor = PDFTableExtractor()

# 2. 加载目标PDF文件(替换为你的PDF文件路径)
pdf_path = "example_table.pdf"
extractor.load_pdf(pdf_path)

# 3. 定位并提取PDF中的所有表格
tables = extractor.extract_tables()

# 4. 遍历输出提取的表格内容
for idx, table in enumerate(tables):
    print(f"===== 提取的第{idx+1}个表格 =====")
    # 打印表格的行数和列数
    print(f"表格行数:{len(table)}, 列数:{len(table[0]) if table else 0}")
    # 打印表格的每一行数据
    for row in table:
        print(row)

代码说明

  • PDFTableExtractor():初始化表格提取器对象,该对象包含PDF加载、表格定位、内容提取等核心方法。
  • load_pdf(pdf_path):加载指定路径的PDF文件,支持相对路径和绝对路径。
  • extract_tables():自动识别PDF中的所有表格,返回一个列表,列表中的每个元素是一个二维列表,对应一个表格的行和列数据。
  • 最后通过循环遍历提取的表格,输出每个表格的行数、列数和具体内容。

3.2 自定义表格提取参数

默认情况下,excalibur会提取PDF中的所有表格,但在实际应用中,我们可能需要提取指定页码的表格,或调整识别精度。以下是自定义参数的代码示例:

from excalibur.pdf_processing import PDFTableExtractor

# 初始化提取器并设置自定义参数
extractor = PDFTableExtractor(
    min_confidence=0.7,  # 设置最小识别置信度,低于该值的表格将被过滤
    lang="eng+chi_sim"   # 设置OCR识别语言,支持英文和简体中文
)

# 加载PDF文件
pdf_path = "multi_page_table.pdf"
extractor.load_pdf(pdf_path)

# 提取指定页码的表格(页码从0开始计数,提取第2页和第3页的表格)
target_pages = [1, 2]
tables = extractor.extract_tables(pages=target_pages)

# 输出提取结果
for idx, table in enumerate(tables):
    print(f"第{target_pages[idx]+1}页表格内容:")
    for row in table:
        print(row)

代码说明

  • min_confidence:设置表格识别的最小置信度,取值范围为0-1,值越高,识别的表格精度越高,但可能会过滤掉部分模糊表格。
  • lang:设置OCR识别的语言,默认值为“eng”,添加“chi_sim”后可支持简体中文识别,需确保Tesseract OCR已安装对应的语言包。
  • pages参数:指定需要提取表格的页码,传入一个整数列表,列表中的元素为页码索引(从0开始)。

3.3 提取结果导出为Excel/CSV文件

excalibur支持将提取的表格数据直接导出为Excel或CSV格式,方便后续数据处理。以下是导出功能的代码示例:

from excalibur.pdf_processing import PDFTableExtractor
from excalibur.utils import export_tables

# 提取PDF表格
extractor = PDFTableExtractor()
extractor.load_pdf("example_table.pdf")
tables = extractor.extract_tables()

# 1. 导出为Excel文件
excel_path = "extracted_tables.xlsx"
export_tables(
    tables=tables,
    output_path=excel_path,
    file_format="xlsx"
)
print(f"表格已成功导出到Excel文件:{excel_path}")

# 2. 导出为CSV文件
csv_path = "extracted_tables.csv"
export_tables(
    tables=tables,
    output_path=csv_path,
    file_format="csv",
    encoding="utf-8"  # 设置CSV文件编码,避免中文乱码
)
print(f"表格已成功导出到CSV文件:{csv_path}")

代码说明

  • export_tables():excalibur提供的工具函数,用于将提取的表格列表导出为指定格式的文件。
  • file_format参数:可选值为“xlsx”和“csv”,分别对应Excel和CSV格式。
  • encoding参数:仅在导出CSV文件时有效,设置为“utf-8”可解决中文乱码问题。

四、批量处理多个PDF文件实战

在实际工作中,我们经常需要处理多个PDF文件,excalibur结合Python的文件操作功能,可轻松实现批量处理。以下是批量提取多个PDF表格并导出的完整案例。

4.1 批量处理代码实现

import os
from excalibur.pdf_processing import PDFTableExtractor
from excalibur.utils import export_tables

# 定义PDF文件夹路径和输出文件夹路径
pdf_folder = "pdf_files"  # 存放待处理PDF的文件夹
output_folder = "extracted_results"  # 存放导出结果的文件夹

# 创建输出文件夹(若不存在)
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 初始化表格提取器
extractor = PDFTableExtractor(min_confidence=0.6, lang="eng+chi_sim")

# 遍历PDF文件夹中的所有PDF文件
for filename in os.listdir(pdf_folder):
    if filename.endswith(".pdf"):
        # 拼接PDF文件的完整路径
        pdf_path = os.path.join(pdf_folder, filename)
        print(f"正在处理文件:{filename}")

        try:
            # 加载并提取表格
            extractor.load_pdf(pdf_path)
            tables = extractor.extract_tables()

            if not tables:
                print(f"文件{filename}中未检测到表格,跳过导出")
                continue

            # 生成输出文件名(与PDF文件名一致,后缀改为xlsx)
            output_filename = os.path.splitext(filename)[0] + ".xlsx"
            output_path = os.path.join(output_folder, output_filename)

            # 导出表格到Excel文件
            export_tables(tables, output_path, file_format="xlsx")
            print(f"文件{filename}处理完成,结果已保存到:{output_path}")

        except Exception as e:
            print(f"处理文件{filename}时出错:{str(e)}")
            continue

print("所有PDF文件处理完成!")

代码说明

  • os.listdir(pdf_folder):遍历指定文件夹中的所有文件,筛选出后缀为“.pdf”的文件进行处理。
  • os.path.splitext(filename)[0]:获取PDF文件的文件名(不含后缀),用于生成对应的Excel文件名。
  • try-except块:捕获处理过程中的异常(如文件损坏、权限不足等),避免单个文件处理失败导致整个批量任务终止。

4.2 批量处理注意事项

  1. 确保pdf_folder文件夹中仅存放需要处理的PDF文件,避免其他类型文件干扰。
  2. 处理大文件或大量文件时,建议设置合理的min_confidence值,平衡识别精度和处理速度。
  3. 若PDF文件包含加密或权限限制,需先解除限制后再进行处理,否则会抛出“PermissionError”异常。

五、复杂表格提取优化技巧

对于嵌套表格、合并单元格表格等复杂结构,excalibur的默认识别效果可能不佳,以下是几种优化技巧,帮助提升复杂表格的提取精度。

5.1 调整单元格分割阈值

excalibur通过调整单元格分割阈值,可优化合并单元格的识别效果,代码示例如下:

from excalibur.pdf_processing import PDFTableExtractor

# 初始化提取器并调整分割阈值
extractor = PDFTableExtractor(
    cell_split_threshold=0.8,  # 单元格分割阈值,值越高越容易识别合并单元格
    min_confidence=0.6
)

# 加载包含合并单元格的PDF
extractor.load_pdf("complex_table.pdf")
tables = extractor.extract_tables()

# 输出优化后的提取结果
for table in tables:
    for row in table:
        print(row)

代码说明

  • cell_split_threshold:单元格分割阈值,取值范围为0-1,值越高,提取器越倾向于将相邻的单元格视为独立单元格,适用于合并单元格较多的表格。

5.2 结合人工校对修正提取结果

对于识别误差较大的表格,可通过人工校对修正提取结果,以下是修正数据的代码示例:

from excalibur.pdf_processing import PDFTableExtractor

# 提取表格
extractor = PDFTableExtractor()
extractor.load_pdf("complex_table.pdf")
tables = extractor.extract_tables()

# 假设第一个表格存在识别误差,手动修正
corrected_table = []
for row_idx, row in enumerate(tables[0]):
    corrected_row = row.copy()
    # 修正第2行第3列的数据
    if row_idx == 1:
        corrected_row[2] = "修正后的数据"
    # 修正第4行第1列的数据
    if row_idx == 3:
        corrected_row[0] = "2024-01-01"
    corrected_table.append(corrected_row)

# 替换原表格中的错误数据
tables[0] = corrected_table

# 导出修正后的表格
from excalibur.utils import export_tables
export_tables(tables, "corrected_tables.xlsx", file_format="xlsx")

代码说明:通过遍历提取的表格数据,定位错误数据的位置并手动修正,再将修正后的表格导出为Excel文件,适用于对数据精度要求较高的场景。

六、excalibur库常见问题与解决方案

6.1 OCR识别中文乱码

问题现象:提取的表格中中文内容显示为乱码或方框。
解决方案

  1. 确保Tesseract OCR已安装简体中文语言包(下载地址:tesseract-ocr/tessdata),将chi_sim.traineddata文件放入Tesseract OCR的tessdata目录。
  2. 初始化提取器时,设置lang="eng+chi_sim"参数。

6.2 无法识别扫描版PDF表格

问题现象:提取扫描版PDF时,返回的表格列表为空。
解决方案

  1. 检查Tesseract OCR是否安装正确,可在终端执行tesseract --version验证。
  2. 降低min_confidence参数值(如设置为0.5),提高提取器对模糊表格的识别灵敏度。

6.3 处理大文件时内存溢出

问题现象:处理几十MB的大PDF文件时,程序抛出“MemoryError”异常。
解决方案

  1. 分页码提取表格,避免一次性加载整个PDF文件。
  2. 关闭其他占用内存的程序,或增加Python进程的内存限制。

七、excalibur库相关资源

  • Pypi地址:https://pypi.org/project/excalibur
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:rows库快速入门与实战指南

一、rows库核心概述

rows库是一款轻量级的Python数据处理工具,专注于结构化数据的读取、转换与导出,支持CSV、JSON、HTML、SQLite等多种常见数据格式,无需手动编写格式解析代码即可实现数据的无缝处理。其工作原理是通过统一的Table对象抽象各类数据源,将不同格式的数据转化为一致的内存数据结构,再提供简洁的API完成数据操作。

该库的优点是API简洁易用、零配置开箱即用、格式兼容性强;缺点是对超大规模数据集的处理效率较低,且高级数据清洗功能需依赖其他库。rows的开源协议为GNU Lesser General Public License v3.0 (LGPLv3),允许自由使用、修改和分发。

二、rows库安装与环境准备

2.1 安装方式

rows库可通过Python官方包管理工具pip直接安装,适用于Python 3.6及以上版本,在命令行中执行以下命令即可完成安装:

pip install rows

若需要支持更多数据格式(如Excel、Parquet),可安装扩展依赖包:

pip install rows[all]

2.2 验证安装

安装完成后,可在Python交互式环境中验证是否安装成功,输入以下代码无报错则说明安装正常:

import rows
print(rows.__version__)

执行后会输出当前rows库的版本号,例如0.6.1

三、rows库核心API与基础用法

rows库的核心操作围绕数据读取数据操作数据导出三个环节展开,所有操作都基于统一的Table对象,下面分步骤详细讲解。

3.1 数据读取:从多种格式加载数据

rows库支持自动识别数据源格式,无需指定格式类型即可读取,以下是常见格式的读取示例。

3.1.1 读取CSV文件

首先准备一个示例CSV文件students.csv,内容如下:

name,age,gender,score
Alice,18,Female,92
Bob,19,Male,85
Charlie,20,Male,78
Diana,18,Female,95

使用rows库读取该文件的代码如下:

import rows

# 读取CSV文件
table = rows.import_from_csv("students.csv")

# 查看Table对象的字段名
print(table.field_names)
# 输出:('name', 'age', 'gender', 'score')

# 遍历数据行
for row in table:
    print(f"姓名:{row.name},年龄:{row.age},成绩:{row.score}")

代码说明

  • rows.import_from_csv()函数接收文件路径作为参数,返回一个Table对象。
  • Table对象的field_names属性存储了数据的列名,返回一个元组。
  • 遍历Table对象时,每一个元素都是一个行对象,可通过列名直接访问对应的值。

3.1.2 读取JSON文件

准备示例JSON文件students.json,内容如下:

[
    {"name": "Alice", "age": 18, "gender": "Female", "score": 92},
    {"name": "Bob", "age": 19, "gender": "Male", "score": 85},
    {"name": "Charlie", "age": 20, "gender": "Male", "score": 78},
    {"name": "Diana", "age": 18, "gender": "Female", "score": 95}
]

读取JSON文件的代码与读取CSV类似,仅需替换函数名:

import rows

# 读取JSON文件
table = rows.import_from_json("students.json")

# 访问指定行的指定字段
print(f"第一个学生的成绩:{table[0].score}")
# 输出:第一个学生的成绩:92

代码说明

  • rows.import_from_json()专门用于读取JSON格式数据。
  • Table对象支持通过索引访问指定行,与列表的索引用法一致。

3.1.3 读取HTML表格

rows库还能直接解析HTML页面中的表格数据,例如有一个students.html文件,内容如下:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>学生信息表</title>
</head>
<body>
    <table id="student-table">
        <thead>
            <tr>
                <th>name</th>
                <th>age</th>
                <th>gender</th>
                <th>score</th>
            </tr>
        </thead>
        <tbody>
            <tr>
                <td>Alice</td>
                <td>18</td>
                <td>Female</td>
                <td>92</td>
            </tr>
            <tr>
                <td>Bob</td>
                <td>19</td>
                <td>Male</td>
                <td>85</td>
            </tr>
        </tbody>
    </table>
</body>
</html>

读取HTML表格的代码如下:

import rows

# 读取HTML文件中的表格
table = rows.import_from_html("students.html")

# 查看数据行数
print(f"表格共有 {len(table)} 行数据")
# 输出:表格共有 2 行数据

代码说明

  • rows.import_from_html()默认读取HTML中第一个表格,若需读取指定表格,可通过idclass筛选,例如rows.import_from_html("students.html", id_="student-table")

3.2 数据操作:筛选、排序与转换

获取Table对象后,可对数据进行筛选、排序等操作,rows库支持原生Python语法结合自身API完成这些操作。

3.2.1 数据筛选

筛选出成绩大于90分的学生,代码如下:

import rows

# 读取CSV数据
table = rows.import_from_csv("students.csv")

# 筛选成绩>90的学生
high_score_students = [row for row in table if row.score > 90]

# 输出筛选结果
for student in high_score_students:
    print(f"高分学生:{student.name},成绩:{student.score}")

代码说明

  • 利用Python列表推导式遍历Table对象,结合条件判断实现数据筛选。
  • 筛选结果是一个包含行对象的列表,可直接遍历访问字段。

3.2.2 数据排序

对学生数据按年龄升序排列,代码如下:

import rows

table = rows.import_from_csv("students.csv")

# 按年龄升序排序
sorted_table = rows.sort(table, key="age")

# 输出排序后的结果
for row in sorted_table:
    print(f"姓名:{row.name},年龄:{row.age}")

代码说明

  • rows.sort()函数接收两个关键参数:table为待排序的Table对象,key为排序依据的字段名。
  • 若需降序排序,可添加reverse=True参数,例如rows.sort(table, key="score", reverse=True)

3.2.3 数据转换

Table对象转换为Python字典列表,方便与其他库(如pandas)配合使用,代码如下:

import rows

table = rows.import_from_csv("students.csv")

# 转换为字典列表
dict_list = rows.export_to_dicts(table)

print(dict_list)
# 输出:[{'name': 'Alice', 'age': 18, 'gender': 'Female', 'score': 92}, ...]

代码说明

  • rows.export_to_dicts()函数将Table对象转换为字典列表,每个字典的键为字段名,值为对应的数据。

3.3 数据导出:保存为多种格式

处理完成的数据可通过rows库导出为CSV、JSON、SQLite等格式,满足不同场景的需求。

3.3.1 导出为CSV文件

将筛选后的高分学生数据导出为新的CSV文件,代码如下:

import rows

table = rows.import_from_csv("students.csv")
high_score_students = [row for row in table if row.score > 90]

# 将列表转换为Table对象
new_table = rows.Table(rows.fields_from_table(table), high_score_students)

# 导出为CSV文件
rows.export_to_csv(new_table, "high_score_students.csv")

代码说明

  • 先通过列表推导式筛选数据,再用rows.Table()将列表转换为Table对象,其中rows.fields_from_table(table)用于获取原表的字段结构。
  • rows.export_to_csv()接收Table对象和目标文件路径,完成导出。

3.3.2 导出为JSON文件

将排序后的数据导出为JSON文件,代码如下:

import rows

table = rows.import_from_csv("students.csv")
sorted_table = rows.sort(table, key="score", reverse=True)

# 导出为JSON文件
rows.export_to_json(sorted_table, "sorted_students.json")

代码说明

  • rows.export_to_json()函数的用法与导出CSV类似,直接传入Table对象和目标路径即可。

3.3.3 导出为SQLite数据库

将学生数据导出为SQLite数据库表,方便后续的数据库操作,代码如下:

import rows

table = rows.import_from_csv("students.csv")

# 导出为SQLite数据库,表名为students
rows.export_to_sqlite(table, "students.db", table_name="students")

代码说明

  • rows.export_to_sqlite()函数接收三个参数:Table对象、数据库文件路径、数据库表名。
  • 执行后会生成一个students.db文件,可使用SQLite工具或Python的sqlite3库连接查询。

四、rows库实战案例:多格式数据整合分析

本案例模拟一个实际场景:从CSV、JSON、HTML三种不同格式的文件中读取学生数据,合并后进行统一分析,最后导出为SQLite数据库。

4.1 准备数据源

  1. CSV数据源students_1.csv
    csv name,age,gender,score Eve,20,Female,88 Frank,19,Male,90
  2. JSON数据源students_2.json
    json [ {"name": "Grace", "age": 18, "gender": "Female", "score": 93}, {"name": "Henry", "age": 21, "gender": "Male", "score": 82} ]
  3. HTML数据源students_3.html
    html ¨K51K

4.2 数据读取与合并

编写代码读取三个数据源并合并为一个Table对象:

import rows

# 读取不同格式的数据源
table_csv = rows.import_from_csv("students_1.csv")
table_json = rows.import_from_json("students_2.json")
table_html = rows.import_from_html("students_3.html")

# 合并所有数据行
all_rows = list(table_csv) + list(table_json) + list(table_html)

# 创建合并后的Table对象
merged_table = rows.Table(rows.fields_from_table(table_csv), all_rows)

# 查看合并后的数据行数
print(f"合并后共有 {len(merged_table)} 条学生数据")

代码说明

  • 分别读取三种格式的数据,得到三个独立的Table对象。
  • 将每个Table对象转换为列表,再通过列表相加实现数据行合并。
  • 利用第一个表的字段结构创建新的Table对象,确保合并后的数据结构一致。

4.3 数据分析与处理

对合并后的数据进行以下分析:

  1. 计算所有学生的平均成绩
  2. 筛选出年龄小于20岁的学生
  3. 按成绩降序排序
import rows

# 延续上一步的merged_table
# 1. 计算平均成绩
total_score = sum(row.score for row in merged_table)
average_score = total_score / len(merged_table)
print(f"所有学生的平均成绩:{average_score:.2f}")

# 2. 筛选年龄小于20岁的学生
young_students = [row for row in merged_table if row.age < 20]
young_table = rows.Table(rows.fields_from_table(merged_table), young_students)
print(f"年龄小于20岁的学生共有 {len(young_table)} 人")

# 3. 按成绩降序排序
sorted_table = rows.sort(merged_table, key="score", reverse=True)
print("成绩排名前三的学生:")
for i in range(3):
    print(f"第{i+1}名:{sorted_table[i].name},成绩:{sorted_table[i].score}")

代码说明

  • 利用生成器表达式计算成绩总和,再除以数据行数得到平均成绩。
  • 筛选年龄小于20岁的学生后,转换为Table对象以便后续导出。
  • 按成绩降序排序后,通过索引获取前三名学生的信息。

4.4 结果导出

将排序后的完整数据导出为SQLite数据库,将年龄小于20岁的学生数据导出为CSV文件:

import rows

# 延续上一步的sorted_table和young_table
# 导出排序后的数据到SQLite
rows.export_to_sqlite(sorted_table, "merged_students.db", table_name="all_students")

# 导出年轻学生数据到CSV
rows.export_to_csv(young_table, "young_students.csv")

print("数据导出完成!")

代码说明

  • 排序后的完整数据存入SQLite数据库,方便后续的查询和管理。
  • 年轻学生数据导出为CSV文件,便于快速查看和分享。

五、rows库常见问题与解决方案

5.1 数据类型自动识别错误

问题:读取CSV文件时,数字字段被识别为字符串类型,导致无法进行数值计算。
解决方案:使用rows.transform函数手动指定字段类型,示例代码如下:

import rows
from rows.fields import IntegerField, FloatField

# 定义字段类型
class StudentTable(rows.Table):
    name = rows.fields.TextField
    age = IntegerField
    gender = rows.fields.TextField
    score = FloatField

# 读取CSV并应用字段类型
table = rows.import_from_csv("students.csv", force_types=StudentTable)

# 验证类型
print(type(table[0].score))  # 输出:<class 'float'>

5.2 不支持的文件格式

问题:尝试读取Excel文件时,提示“未找到对应的导入函数”。
解决方案:安装扩展依赖rows[excel],然后使用rows.import_from_xlsx()函数读取,示例代码如下:

pip install rows[excel]
import rows
table = rows.import_from_xlsx("students.xlsx")

5.3 大规模数据处理效率低

问题:读取超大CSV文件时,内存占用过高,程序运行缓慢。
解决方案:rows库不适合处理超大规模数据,建议结合pandas库使用,先用rows读取数据转换为字典列表,再传入pandas的DataFrame

import rows
import pandas as pd

table = rows.import_from_csv("large_students.csv")
df = pd.DataFrame(rows.export_to_dicts(table))
# 使用pandas进行高效处理

六、rows库相关资源

  • PyPI地址:https://pypi.org/project/rows
  • Github地址:https://github.com/turicas/rows
  • 官方文档地址:https://rows.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。