Python实用工具库:more‑itertools 详细使用教程

一、more‑itertools 库简介

more‑itertools 是对 Python 内置 itertools 模块的扩展与增强库,专注提供更丰富、更易用的迭代器与可迭代对象处理工具,核心原理是基于惰性迭代思想,避免一次性加载全部数据,提升内存使用效率。它补齐了内置迭代工具缺失的常用功能,使用简洁、兼容性强,适合数据处理、流程遍历、算法实现等场景。该库采用 MIT License 开源,无商业使用限制,优点是易用、轻量、性能优秀,缺点是功能与内置库部分重叠,复杂场景需搭配其他工具使用。

二、more‑itertools 安装方法

more‑itertools 是纯 Python 第三方库,不依赖复杂编译环境,使用 pip 即可快速完成安装。
打开命令行终端,执行以下安装命令:

pip install more-itertools

若需要指定版本安装,可使用:

pip install more-itertools==10.5.0

安装完成后,在 Python 交互环境中执行导入语句,无报错则说明安装成功:

import more_itertools

三、more‑itertools 基础功能与代码示例

3.1 chunked:按指定大小分割可迭代对象

在数据分批处理、分页读取、批量写入文件/数据库时,经常需要把长列表、生成器等按固定大小切分,chunked 可以高效完成这一操作,且支持惰性迭代。

代码示例:

from more_itertools import chunked

# 原始列表
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
# 每3个元素分为一组
result = list(chunked(data, 3))

print("分块结果:", result)

运行结果:

分块结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

代码说明:chunked 接收可迭代对象与分块大小,返回一个迭代器,每一次迭代输出一个指定长度的子列表,最后不足长度时直接保留剩余元素。

3.2 flatten:扁平化嵌套可迭代对象

处理多层嵌套列表、元组时,手动递归展开容易出错且代码冗余,flatten 可以将任意深度的嵌套结构扁平化为一维迭代器。

代码示例:

from more_itertools import flatten

# 多层嵌套数据
nested_data = [1, [2, [3, 4], 5], 6, [[7]], 8]
# 扁平化处理
flat_result = list(flatten(nested_data))

print("扁平化结果:", flat_result)

运行结果:

扁平化结果: [1, 2, 3, 4, 5, 6, 7, 8]

代码说明:flatten 会自动识别所有可迭代对象(字符串除外),逐层展开,返回一个不含嵌套结构的迭代器,适用于日志解析、数据清洗、接口返回结构处理等场景。

3.3 unique_everseen:保留迭代对象中首次出现的元素

去重是数据处理高频操作,unique_everseen 可以在迭代过程中只保留第一次出现的元素,且保持原有顺序,比 set 更适合有序去重。

代码示例:

from more_itertools import unique_everseen

# 包含重复元素的列表
repeat_data = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
# 保留首次出现的元素,去重并保持顺序
unique_result = list(unique_everseen(repeat_data))

print("有序去重结果:", unique_result)

运行结果:

有序去重结果: [3, 1, 4, 5, 9, 2, 6]

代码说明:该函数内部使用集合记录已出现元素,遍历过程中只返回未出现过的元素,适合大数据量流式去重,无需加载全部数据到内存。

3.4 windowed:滑动窗口遍历可迭代对象

时间序列分析、滑动平均、NLP 文本窗口处理等场景常需要滑动窗口功能,windowed 可以生成指定大小的连续滑动窗口。

代码示例:

from more_itertools import windowed

# 原始序列
seq = [1, 2, 3, 4, 5, 6]
# 窗口大小为3
window_result = list(windowed(seq, 3))

print("滑动窗口结果:", window_result)

运行结果:

滑动窗口结果: [(1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)]

代码说明:windowed 会从序列起始位置开始,每次向后移动一个位置,截取指定长度的窗口,返回元组形式的迭代器,支持设置填充值处理边界。

3.5 first:获取可迭代对象中第一个满足条件的元素

在大量数据中快速查找第一个符合条件的元素时,使用 first 比循环 break 更简洁,且可以设置默认值避免异常。

代码示例:

from more_itertools import first

# 数字列表
num_list = [1, 3, 5, 8, 9, 10]
# 获取第一个偶数
first_even = first(x for x in num_list if x % 2 == 0)
# 无满足条件时返回默认值
default_val = first(x for x in num_list if x > 20), default="未找到"

print("第一个偶数:", first_even)
print("大于20的数:", default_val)

运行结果:

第一个偶数: 8
大于20的数: 未找到

代码说明:first 接收一个生成器表达式,返回第一个满足条件的元素,若没有匹配项且设置了默认值,则返回默认值,避免 StopIteration 异常。

3.6 last:获取可迭代对象最后一个元素

last 用于快速获取可迭代对象的最后一个元素,支持设置默认值,无需将迭代器转为列表再索引。

代码示例:

from more_itertools import last

# 测试数据
test_data = [10, 20, 30, 40, 50]
# 获取最后一个元素
last_item = last(test_data)

print("最后一个元素:", last_item)

运行结果:

最后一个元素: 50

代码说明:对于生成器等无法直接索引的对象,last 依然可以正常获取最后一项,比手动遍历更简洁高效。

3.7 partition:按条件将可迭代对象分为两组

partition 可以根据判断条件,将一个可迭代对象分为满足条件不满足条件两部分,返回两个迭代器,一次遍历完成分类。

代码示例:

from more_itertools import partition

# 数字列表
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
# 按是否大于4分组
is_gt4, no_gt4 = partition(lambda x: x > 4, numbers)

print("大于4的元素:", list(is_gt4))
print("不大于4的元素:", list(no_gt4))

运行结果:

大于4的元素: [5, 6, 7, 8]
不大于4的元素: [1, 2, 3, 4]

代码说明:partition 只遍历一次数据,内存占用低,适合数据分类、日志筛选、权限判断等场景。

3.8 padded:为迭代对象补充填充值

当迭代对象长度不足时,padded 可以使用指定值填充到目标长度,避免索引越界与长度不匹配问题。

代码示例:

from more_itertools import padded

# 短列表
short_list = [1, 2, 3]
# 填充到长度为6,填充值为0
padded_list = list(padded(short_list, fillvalue=0, n=6))

print("填充后结果:", padded_list)

运行结果:

填充后结果: [1, 2, 3, 0, 0, 0]

代码说明:该函数常用于模型输入对齐、表格数据补齐、固定长度序列化等场景。

3.9 divide:将可迭代对象均匀分成N份

divide 可以把可迭代对象均匀分成指定份数,适合多进程/多线程任务分片处理。

代码示例:

from more_itertools import divide

# 原始数据
source = list(range(10))
# 均匀分成3份
parts = list(divide(3, source))

print("均匀分份结果:", [list(p) for p in parts])

运行结果:

均匀分份结果: [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]

代码说明:divide 会尽量让每份长度接近,长度无法整除时,前面的分片稍长,适合任务分发、并行计算。

四、more‑itertools 综合实战案例

4.1 学生成绩数据清洗与统计

场景:读取一批带重复、嵌套、缺失的学生成绩数据,完成去重、扁平化、分块、筛选及格成绩等操作。

完整代码:

from more_itertools import (
    chunked,
    flatten,
    unique_everseen,
    partition,
    windowed
)

# 原始数据:存在重复、嵌套、异常值
raw_scores = [
    [85, 92, [78, 65], 88],
    [92, 76, 85, [60, 58]],
    [78, 90, 92, 85]
]

# 1. 扁平化数据
flat_scores = list(flatten(raw_scores))
print("扁平化后成绩:", flat_scores)

# 2. 去重并保持顺序
unique_scores = list(unique_everseen(flat_scores))
print("去重后成绩:", unique_scores)

# 3. 分为及格(≥60)与不及格(<60)
pass_scores, fail_scores = partition(lambda x: x >= 60, unique_scores)
print("及格成绩:", list(pass_scores))
print("不及格成绩:", list(fail_scores))

# 4. 每3个成绩分块,便于批量处理
score_chunks = list(chunked(unique_scores, 3))
print("成绩分块:", score_chunks)

# 5. 使用滑动窗口计算连续3个成绩的平均值
print("连续3个成绩滑动平均:")
for window in windowed(unique_scores, 3):
    if None not in window:
        avg = sum(window) / len(window)
        print(f"{window} 平均值 = {avg:.2f}")

运行结果:

扁平化后成绩: [85, 92, 78, 65, 88, 92, 76, 85, 60, 58, 78, 90, 92, 85]
去重后成绩: [85, 92, 78, 65, 88, 76, 60, 58, 90]
及格成绩: [85, 92, 78, 65, 88, 76, 60, 90]
不及格成绩: [58]
成绩分块: [[85, 92, 78], [65, 88, 76], [60, 58, 90]]
连续3个成绩滑动平均:
(85, 92, 78) 平均值 = 85.00
(92, 78, 65) 平均值 = 78.33
(78, 65, 88) 平均值 = 77.00
(65, 88, 76) 平均值 = 76.33
(88, 76, 60) 平均值 = 74.67
(76, 60, 58) 平均值 = 64.67
(60, 58, 90) 平均值 = 69.33

案例说明:本案例完整使用 more‑itertools 多个核心函数,实现从原始脏数据到清洗、分类、分块、统计分析的全流程,代码简洁、可读性高、内存占用低,体现了该库在实际数据处理中的价值。

4.2 日志文本处理实战

场景:读取多行日志内容,去除重复行、按行数分块、提取包含关键词的行。

代码示例:

from more_itertools import chunked, unique_everseen, flatten

# 模拟日志数据
logs = [
    "INFO: start service",
    "ERROR: connect failed",
    "INFO: start service",
    "WARNING: low memory",
    "ERROR: connect failed",
    "INFO: service running"
]

# 去重
unique_logs = list(unique_everseen(logs))
print("去重后日志:")
for log in unique_logs:
    print(log)

# 每2条日志分块
log_chunks = list(chunked(unique_logs, 2))
print("\n日志分块存储:", log_chunks)

# 筛选错误日志
error_logs = [log for log in unique_logs if "ERROR" in log]
print("\n错误日志:", error_logs)

运行结果:

去重后日志:
INFO: start service
ERROR: connect failed
WARNING: low memory
INFO: service running

日志分块存储: [['INFO: start service', 'ERROR: connect failed'], ['WARNING: low memory', 'INFO: service running']]

错误日志: ['ERROR: connect failed']

案例说明:日志处理通常数据量大、存在重复,使用 more‑itertools 可以在惰性迭代的基础上快速完成去重、分块、筛选,大幅简化代码。

五、相关资源

  • Pypi地址:https://pypi.org/project/more-itertools
  • Github地址:https://github.com/more-itertools/more-itertools
  • 官方文档地址:https://more-itertools.readthedocs.io

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:attrs库详解——告别手写样板代码,优雅定义数据类

一、attrs库概述

attrs是一款专注于简化Python类定义的第三方库,核心作用是自动生成类的构造函数、字符串表示、比较方法等样板代码,让开发者聚焦业务逻辑而非重复编码。其原理是通过装饰器与元编程,在类定义阶段自动注入常用魔法方法,采用MIT开源许可,轻量无侵入、兼容所有Python版本,优点是代码简洁、可读性高、减少bug,缺点是极端极简场景下会轻微增加微小的性能开销。

二、attrs库安装方法

attrs的安装流程十分简单,支持pip、conda等主流包管理工具,对Python环境无特殊要求,无论是普通项目、数据分析脚本还是Web开发项目,都能无缝集成使用。

打开命令行工具,执行以下pip安装命令:

pip install attrs

若需要指定安装版本,可使用:

pip install attrs==23.1.0

安装完成后,在Python交互环境中执行导入语句,若无报错则说明安装成功:

import attrs
print(attrs.__version__)

上述代码用于验证attrs库是否正确安装并查看当前版本号,确保后续代码可以正常运行。

三、attrs基础使用与核心功能

3.1 基础数据类定义

传统Python定义数据类时,需要手动编写__init____repr____eq__等方法,代码冗余且容易出错,attrs通过@define装饰器(新版推荐)和field()字段定义,一键完成样板代码生成。

import attrs

@attrs.define
class Person:
    # 定义字段,无需编写构造函数
    name: str
    age: int
    city: str

# 实例化对象
p = Person("张三", 25, "北京")

# 自动生成__repr__方法,直接打印清晰可见
print(p)
# 输出:Person(name='张三', age=25, city='北京')

代码说明:使用@attrs.define装饰Person类,只需要声明字段类型,attrs会自动生成构造函数、字符串表示方法,不需要手动实现任何样板代码,大幅减少重复工作。

3.2 字段参数配置

attrs提供丰富的字段配置项,支持默认值、类型校验、是否参与比较、是否参与初始化等功能,满足不同业务场景需求。

import attrs

@attrs.define
class Student:
    # 必填字段
    name: str
    # 带默认值的字段
    age: int = 18
    # 不参与比较、不参与初始化的字段
    score: float = attrs.field(default=0.0, eq=False, init=False)

# 只传必填参数
s1 = Student("小明")
s2 = Student("小红", 20)

# 比较对象,score字段不参与比较
print(s1 == s2)  # 输出:False

# 给不参与初始化的字段赋值
s1.score = 95.5
print(s1)  # 输出:Student(name='小明', age=18, score=95.5)

代码说明:attrs.field()支持default设置默认值、eq控制是否参与相等比较、init控制是否在构造函数中初始化,灵活适配复杂类定义需求。

3.3 类型校验与转换

attrs支持开启严格类型校验,避免传入错误类型的数据,提升代码健壮性,同时支持自定义类型转换。

import attrs

@attrs.define(kw_only=True)
class Product:
    name: str
    price: float
    stock: int

    # 开启类型校验
    @price.validator
    def _check_price(self, attribute, value):
        if value < 0:
            raise ValueError("价格不能为负数")

# 正确赋值
p1 = Product(name="笔记本", price=5999.0, stock=100)
print(p1)

# 错误赋值:价格为负,触发校验异常
# p2 = Product(name="键盘", price=-99.0, stock=50)
# 抛出 ValueError: 价格不能为负数

代码说明:通过装饰器为字段添加校验器,在实例化时自动校验数据合法性,提前拦截错误数据,避免程序运行时出现异常。

3.4 不可变数据类

在多线程、数据传递等场景中,不可变对象更安全,attrs可轻松定义不可变类,禁止修改对象属性。

import attrs

@attrs.define(frozen=True)
class ImmutablePoint:
    x: int
    y: int

point = ImmutablePoint(10, 20)
print(point)

# 尝试修改属性,会触发异常
# point.x = 100
# 抛出 FrozenInstanceError: cannot assign to field 'x'

代码说明:设置frozen=True即可创建不可变对象,对象实例化后无法修改任何字段,适合常量、配置、坐标等不允许变更的数据结构。

3.5 字典与对象互转

在接口开发、数据存储、JSON序列化场景中,经常需要在对象和字典之间转换,attrs内置转换方法,无需手动遍历赋值。

import attrs

@attrs.define
class User:
    username: str
    password: str
    is_admin: bool = False

# 对象转字典
user = User("admin", "123456", True)
user_dict = attrs.asdict(user)
print(user_dict)
# 输出:{'username': 'admin', 'password': '123456', 'is_admin': True}

# 字典转对象
new_user = User(**user_dict)
print(new_user)

代码说明:attrs.asdict()可以将类实例快速转换为字典格式,方便存入数据库或返回接口数据,同时也能通过字典解包快速创建对象。

四、进阶使用技巧

4.1 继承与子类扩展

attrs完美支持类继承,子类可以继承父类的字段与方法,同时扩展自身字段,不会出现传统继承中构造函数冲突问题。

import attrs

@attrs.define
class Animal:
    name: str
    sound: str

@attrs.define
class Dog(Animal):
    breed: str
    age: int = 1

dog = Dog("旺财", "汪汪汪", "金毛")
print(dog)
# 输出:Dog(name='旺财', sound='汪汪汪', breed='金毛', age=1)

代码说明:子类Dog继承父类Animal,自动继承所有字段和生成的方法,只需要扩展新字段即可,继承逻辑简洁清晰。

4.2 自定义方法与属性

attrs生成的类和普通Python类完全一致,支持自定义实例方法、静态方法、属性方法,不影响原有功能。

import attrs

@attrs.define
class Circle:
    radius: float

    # 自定义属性:计算面积
    @property
    def area(self):
        return 3.14159 * self.radius ** 2

    # 自定义实例方法
    def show_info(self):
        return f"半径:{self.radius},面积:{self.area:.2f}"

c = Circle(5)
print(c.area)
print(c.show_info())

代码说明:attrs只负责生成样板方法,不限制自定义逻辑,可自由添加属性、方法,兼顾简洁性与灵活性。

4.3 与JSON序列化配合使用

在Web开发、接口请求中,经常需要将对象转为JSON字符串,attrs结合内置json模块可快速实现序列化。

import json
import attrs

@attrs.define
class Book:
    title: str
    author: str
    price: float

book = Book("Python编程从入门到实践", "埃里克·马瑟斯", 89.0)

# 对象转JSON
json_str = json.dumps(attrs.asdict(book), ensure_ascii=False)
print(json_str)

# JSON转对象
book_dict = json.loads(json_str)
new_book = Book(**book_dict)
print(new_book)

代码说明:先使用attrs.asdict转为字典,再通过json模块序列化为JSON,反向操作也十分简单,大幅简化数据处理流程。

五、实际项目综合案例

5.1 学生成绩管理小工具

结合attrs的自动代码生成、类型校验、字典转换等功能,实现一个轻量学生成绩管理工具,代码简洁且易于维护。

import attrs
from typing import List

@attrs.define
class Student:
    name: str
    student_id: str
    chinese: float = attrs.field(validator=lambda _, __, v: 0 <= v <= 100)
    math: float = attrs.field(validator=lambda _, __, v: 0 <= v <= 100)
    english: float = attrs.field(validator=lambda _, __, v: 0 <= v <= 100)

    # 计算总分
    @property
    def total_score(self):
        return self.chinese + self.math + self.english

    # 计算平均分
    @property
    def avg_score(self):
        return round(self.total_score / 3, 2)

@attrs.define
class ScoreManager:
    students: List[Student] = attrs.field(factory=list)

    # 添加学生
    def add_student(self, student: Student):
        self.students.append(student)

    # 显示所有学生信息
    def show_all(self):
        for idx, stu in enumerate(self.students, 1):
            print(f"序号:{idx},姓名:{stu.name},学号:{stu.student_id},"
                  f"总分:{stu.total_score},平均分:{stu.avg_score}")

# 使用示例
if __name__ == "__main__":
    # 创建管理对象
    manager = ScoreManager()

    # 添加学生
    s1 = Student("小明", "2026001", 92, 98, 95)
    s2 = Student("小红", "2026002", 88, 95, 96)
    manager.add_student(s1)
    manager.add_student(s2)

    # 展示所有学生成绩
    manager.show_all()

代码说明:该案例完整使用attrs的核心功能,包括字段定义、校验器、自定义属性、列表默认值等,实现了学生成绩添加、计算、展示功能,相比传统类定义,代码量减少60%以上,逻辑清晰易读。

5.2 配置文件读取与对象映射

在项目开发中,配置项通常使用字典存储,使用attrs可将配置字典直接映射为配置对象,使用时通过属性访问,更加安全规范。

import attrs

# 模拟配置字典
config_data = {
    "host": "127.0.0.1",
    "port": 8080,
    "debug": True,
    "database": "test_db"
}

@attrs.define
class AppConfig:
    host: str
    port: int
    debug: bool
    database: str

# 配置字典转为配置对象
config = AppConfig(**config_data)

# 使用属性访问配置,比字典更安全,有代码提示
print(f"服务器地址:{config.host}")
print(f"端口号:{config.port}")
print(f"调试模式:{config.debug}")

代码说明:将松散的配置字典转换为强类型对象,避免字典键名写错导致的运行时错误,同时IDE可提供自动补全,提升开发效率。

相关资源

  • Pypi地址:https://pypi.org/project/attrs/
  • Github地址:https://github.com/python-attrs/attrs
  • 官方文档地址:https://www.attrs.org/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:deepdiff 详解——数据差异对比与校验全攻略

一、deepdiff 库概述

在 Python 开发过程中,尤其是数据处理、接口测试、配置文件对比、数据同步校验等场景中,经常需要对比两个复杂数据结构(如字典、列表、集合、元组、自定义对象等)的差异。传统的对比方式仅能判断数据是否相等,无法精准定位差异位置、类型、具体内容,而 deepdiff 就是专门解决这一问题的第三方库。

deepdiff 核心作用是深度递归对比 Python 可哈希与不可哈希对象,精准找出两个对象之间的新增、删除、修改、值变化、类型变化、结构变化等差异,支持嵌套数据结构对比,无需手动编写递归遍历逻辑。其工作原理是通过递归遍历对象的每一层节点,逐一对比键、值、索引、数据类型,同时支持忽略指定字段、指定类型、指定路径等自定义规则。

该库优点是对比精度高、支持复杂嵌套结构、功能丰富、配置灵活,可满足绝大多数数据差异校验场景;缺点是对比超大规模数据时性能略有损耗,不适合极致性能要求的极简对比场景。deepdiff 采用 MIT License,开源免费,可商用、修改、分发,无严格版权限制。

二、deepdiff 安装方法

deepdiff 作为标准第三方库,可通过 pip 工具快速安装,支持 Python 3.6 及以上版本,安装命令如下:

pip install deepdiff

若需要加速安装,可使用国内镜像源:

pip install deepdiff -i https://pypi.tuna.tsinghua.edu.cn/simple

安装完成后,在 Python 脚本中直接导入即可使用,核心模块包括 DeepDiff、DeepSearch、grep、extract 等,满足不同对比与检索需求。

三、deepdiff 核心功能与基础使用

3.1 核心模块导入

deepdiff 最常用的核心类是 DeepDiff,用于深度对比两个对象,基础导入语句:

from deepdiff import DeepDiff

除基础对比外,还可根据需求导入辅助模块:

# 深度搜索对象中的值
from deepdiff import DeepSearch
# 按规则检索数据
from deepdiff import grep
# 提取指定路径数据
from deepdiff import extract

3.2 基础数据类型对比

deepdiff 支持数字、字符串、布尔值、None 等基础数据类型对比,直接传入两个待对比对象即可生成差异结果。

# 基础类型对比示例
from deepdiff import DeepDiff

# 定义两个待对比的基础数据
a = 100
b = 200
c = "python"
d = "Python"
e = True
f = False

# 数字对比
diff_num = DeepDiff(a, b)
print("数字差异结果:", diff_num)

# 字符串对比(区分大小写)
diff_str = DeepDiff(c, d)
print("字符串差异结果:", diff_str)

# 布尔值对比
diff_bool = DeepDiff(e, f)
print("布尔值差异结果:", diff_bool)

代码说明

  1. 直接传入两个基础类型变量,DeepDiff 自动判断是否相等;
  2. 字符串对比默认区分大小写,数字、布尔值直接对比值;
  3. 结果会以字典形式返回,values_changed 表示值发生变化,包含新旧值与对比路径。

运行结果:

数字差异结果: {'values_changed': {'root': {'new_value': 200, 'old_value': 100}}}
字符串差异结果: {'values_changed': {'root': {'new_value': 'Python', 'old_value': 'python'}}}
布尔值差异结果: {'values_changed': {'root': {'new_value': False, 'old_value': True}}}

3.3 列表数据对比

列表是开发中常用的数据结构,deepdiff 可对比列表的元素新增、删除、顺序变化、值变化,支持嵌套列表对比。

# 列表对比示例
from deepdiff import DeepDiff

# 普通列表对比
list1 = [1, 2, 3, 4]
list2 = [1, 2, 5, 4, 6]
diff_list = DeepDiff(list1, list2)
print("普通列表差异:", diff_list)

# 嵌套列表对比
nested_list1 = [1, [2, 3], 4]
nested_list2 = [1, [2, 5], 4, 7]
diff_nested_list = DeepDiff(nested_list1, nested_list2)
print("嵌套列表差异:", diff_nested_list)

代码说明

  1. iterable_item_added 表示列表新增元素;
  2. iterable_item_removed 表示列表删除元素;
  3. 嵌套列表会递归遍历每一层子列表,精准定位子元素变化。

运行结果:

普通列表差异: {'iterable_item_added': {'root[4]': 6}, 'values_changed': {'root[2]': {'new_value': 5, 'old_value': 3}}}
嵌套列表差异: {'iterable_item_added': {'root[3]': 7}, 'values_changed': {'root[1][1]': {'new_value': 5, 'old_value': 3}}}

3.4 字典数据对比

字典是配置文件、接口返回数据的常用结构,deepdiff 可对比字典的键新增、键删除、键值修改、嵌套字典变化,是接口自动化测试、配置文件校验的核心用法。

# 字典对比示例
from deepdiff import DeepDiff

# 普通字典对比
dict1 = {"name": "张三", "age": 20, "gender": "男"}
dict2 = {"name": "张三", "age": 21, "city": "北京"}
diff_dict = DeepDiff(dict1, dict2)
print("普通字典差异:", diff_dict)

# 嵌套字典对比
nested_dict1 = {"user": {"name": "李四", "info": {"score": 90}}, "status": True}
nested_dict2 = {"user": {"name": "李四", "info": {"score": 95}}, "status": False}
diff_nested_dict = DeepDiff(nested_dict1, nested_dict2)
print("嵌套字典差异:", diff_nested_dict)

代码说明

  1. dictionary_item_added:字典新增键值对;
  2. dictionary_item_removed:字典删除键值对;
  3. values_changed:字典键对应的值发生变化;
  4. 嵌套字典会递归解析每一层子字典,路径清晰便于定位问题。

运行结果:

普通字典差异: {'dictionary_item_added': {'root['city']': '北京'}, 'dictionary_item_removed': {'root['gender']': '男'}, 'values_changed': {"root['age']": {'new_value': 21, 'old_value': 20}}}
嵌套字典差异: {'values_changed': {"root[0]['user']['info']['score']": {'new_value': 95, 'old_value': 90}, "root['status']": {'new_value': False, 'old_value': True}}}

3.5 集合与元组对比

deepdiff 同样支持集合、元组的对比,集合对比自动忽略顺序,仅判断元素增减;元组对比兼顾顺序与值变化。

# 集合与元组对比
from deepdiff import DeepDiff

# 集合对比
set1 = {1, 2, 3}
set2 = {2, 3, 4}
diff_set = DeepDiff(set1, set2)
print("集合差异:", diff_set)

# 元组对比
tuple1 = (1, 2, 3)
tuple2 = (1, 4, 3)
diff_tuple = DeepDiff(tuple1, tuple2)
print("元组差异:", diff_tuple)

代码说明

  1. 集合对比结果为 set_item_addedset_item_removed
  2. 元组属于有序不可变对象,对比逻辑与列表一致。

四、deepdiff 高级配置与实用参数

deepdiff 提供丰富的配置参数,可自定义对比规则,满足复杂场景需求,以下是最常用的高级参数。

4.1 忽略指定字段对比

在接口测试或配置对比中,经常需要忽略时间戳、随机ID、日志字段等,使用 exclude_paths 参数实现。

# 忽略指定字段对比
from deepdiff import DeepDiff

data1 = {
    "id": 1001,
    "name": "产品A",
    "time": "2025-01-01 12:00:00",
    "price": 99
}
data2 = {
    "id": 1001,
    "name": "产品A",
    "time": "2025-01-02 12:00:00",
    "price": 99
}

# 忽略 time 字段
diff = DeepDiff(data1, data2, exclude_paths=["root['time']"])
print("忽略time字段后的差异:", diff)

代码说明
exclude_paths 接收列表参数,传入需要忽略的字段路径,对比时自动跳过该字段,结果中无相关差异。

4.2 忽略数据类型变化

部分场景中,数字与字符串形式的相同值(如 100 和 “100”)视为相等,使用 ignore_type_in_groups 参数。

# 忽略类型变化
from deepdiff import DeepDiff

obj1 = {"num": 100}
obj2 = {"num": "100"}

# 忽略 int 和 str 类型差异
diff = DeepDiff(obj1, obj2, ignore_type_in_groups=[(int, str)])
print("忽略类型变化后的结果:", diff)

代码说明
ignore_type_in_groups 可指定多组类型组合,对比时相同值不同类型会被判定为无差异。

4.3 忽略字符串大小写与空格

对比字符串时,可忽略大小写、首尾空格、中间多余空格,提升对比容错率。

# 忽略字符串大小写与空格
from deepdiff import DeepDiff

str_obj1 = {"content": " Python DeepDiff "}
str_obj2 = {"content": "python deepdiff"}

# 忽略大小写、首尾空格
diff = DeepDiff(str_obj1, str_obj2, ignore_string_case=True, ignore_string_whitespace=True)
print("忽略字符串格式后的结果:", diff)

4.4 精确对比与数学近似对比

对比浮点数时,避免精度误差导致误判,可设置 number_accuracy 参数指定精度范围。

# 浮点数近似对比
from deepdiff import DeepDiff

num1 = {"value": 3.14159}
num2 = {"value": 3.1416}

# 保留4位小数对比
diff = DeepDiff(num1, num2, number_accuracy=4)
print("浮点数精度对比结果:", diff)

五、DeepSearch 深度搜索功能

DeepSearch 可在复杂嵌套数据中,搜索指定值、键、类型,返回匹配路径,是数据检索的实用功能。

# DeepSearch 深度搜索示例
from deepdiff import DeepSearch

data = {
    "class": "一年级",
    "students": [
        {"name": "小明", "score": 90},
        {"name": "小红", "score": 95},
        {"name": "小刚", "score": 90}
    ]
}

# 搜索值为90的所有路径
search = DeepSearch(data, 90)
print("值为90的匹配路径:", search)

代码说明
DeepSearch 会遍历整个数据结构,返回所有匹配目标值的节点路径,方便快速定位数据位置。

六、实际开发场景综合案例

6.1 接口自动化测试数据对比

接口测试中,需要校验接口返回数据与预期数据是否一致,deepdiff 可快速定位返回值异常。

# 接口测试数据对比案例
from deepdiff import DeepDiff

# 预期返回数据
expect_data = {
    "code": 200,
    "msg": "success",
    "data": {
        "user_id": 123,
        "username": "test_user",
        "role": "user"
    }
}

# 实际返回数据
actual_data = {
    "code": 200,
    "msg": "success",
    "data": {
        "user_id": 123,
        "username": "test_user",
        "role": "admin"
    }
}

# 对比接口数据,忽略无意义字段
diff_result = DeepDiff(expect_data, actual_data, exclude_paths=["root['msg']"])
print("接口数据差异:", diff_result)

# 判断是否存在差异
if diff_result:
    print("接口返回数据异常!")
else:
    print("接口返回数据正常!")

代码说明
该案例模拟接口测试场景,通过对比预期与实际数据,快速发现角色字段异常,提升测试效率。

6.2 配置文件前后版本对比

项目配置文件修改后,可使用 deepdiff 对比新旧配置,避免误改导致项目异常。

# 配置文件对比案例
from deepdiff import DeepDiff

# 旧版本配置
old_config = {
    "debug": False,
    "port": 8080,
    "database": {
        "host": "127.0.0.1",
        "port": 3306,
        "db": "test"
    }
}

# 新版本配置
new_config = {
    "debug": True,
    "port": 8090,
    "database": {
        "host": "127.0.0.1",
        "port": 3306,
        "db": "prod"
    }
}

# 完整对比配置差异
config_diff = DeepDiff(old_config, new_config)
print("配置文件版本差异:", config_diff)

6.3 数据同步前后校验

数据同步、数据迁移场景中,使用 deepdiff 校验源数据与目标数据是否完全一致,确保数据无丢失、无篡改。

# 数据同步校验案例
from deepdiff import DeepDiff

# 源数据
source_data = [
    {"id": 1, "name": "苹果", "stock": 100},
    {"id": 2, "name": "香蕉", "stock": 200}
]

# 同步后目标数据
target_data = [
    {"id": 1, "name": "苹果", "stock": 100},
    {"id": 2, "name": "香蕉", "stock": 250}
]

# 校验同步结果
sync_diff = DeepDiff(source_data, target_data)
print("数据同步差异:", sync_diff)

七、相关资源

  • Pypi地址:https://pypi.org/project/deepdiff/
  • Github地址:https://github.com/seperman/deepdiff
  • 官方文档地址:https://zepworks.com/deepdiff/current/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:XlsxWriter 从入门到实战,轻松生成专业Excel文件

一、XlsxWriter 库概述

XlsxWriter 是一款专注于生成 Excel 2007 及以上版本 .xlsx 格式文件的 Python 库,核心作用是创建带格式、图表、公式的全新Excel文件,原理是直接按Office标准格式写入文件,无需依赖Microsoft Excel。优点是功能全面、格式支持丰富、兼容性强,缺点是无法读取或修改已有Excel文件。该库采用 BSD 开源许可证,可自由商用与修改。

二、XlsxWriter 安装方法

在使用 XlsxWriter 之前,我们需要先通过 Python 官方的包管理工具 pip 进行安装,这是最快捷、最稳定的安装方式,无论是 Windows、macOS 还是 Linux 系统,都可以通过统一的命令完成安装操作。

打开电脑的命令提示符(CMD)、PowerShell 或者终端(Terminal),输入以下安装命令:

pip install XlsxWriter

等待命令执行完成,当界面出现 Successfully installed XlsxWriter-x.x.x 这样的提示时,就代表安装成功了。如果你的电脑中同时安装了 Python2 和 Python3,为了避免安装到错误的环境中,可以使用 pip3 install XlsxWriter 命令进行安装,确保库被安装到 Python3 环境下。

安装完成后,我们可以简单验证一下是否安装成功,在 Python 交互环境中输入 import xlsxwriter,如果没有报错,就说明 XlsxWriter 已经可以正常使用了,接下来就可以开始编写代码生成 Excel 文件。

三、XlsxWriter 基础使用教程

3.1 创建第一个简单的 Excel 文件

XlsxWriter 的基础使用逻辑非常清晰,一共分为三步:创建工作簿(Workbook)、创建工作表(Worksheet)、向工作表中写入数据,最后关闭工作簿即可生成完整的 Excel 文件。

下面是创建最简单 Excel 文件的代码示例,每一行代码都附带详细注释,方便零基础小白理解:

# 导入 XlsxWriter 库
import xlsxwriter

# 1. 创建一个名为 demo.xlsx 的工作簿对象
workbook = xlsxwriter.Workbook('demo.xlsx')

# 2. 在工作簿中创建一个工作表,默认名称为 Sheet1
worksheet = workbook.add_worksheet()

# 3. 向工作表中写入数据,格式:write(行, 列, 内容)
# Excel 中行和列的索引从 0 开始,(0,0) 代表 A1 单元格
worksheet.write(0, 0, 'Hello')
worksheet.write(0, 1, 'XlsxWriter')
worksheet.write(1, 0, 'Python')
worksheet.write(1, 1, 'Excel')

# 4. 关闭工作簿,完成文件写入
workbook.close()

代码说明:

  • 首先导入 xlsxwriter 库,这是使用所有功能的前提;
  • xlsxwriter.Workbook('demo.xlsx') 用于创建工作簿,括号内的字符串是生成 Excel 文件的名称;
  • add_worksheet() 用于创建工作表,不传入参数时默认工作表名为 Sheet1,也可以自定义名称,例如 add_worksheet('学生成绩')
  • write() 方法是核心写入方法,前两个参数分别是行号和列号,第三个参数是要写入单元格的内容;
  • 最后必须调用 workbook.close(),否则文件无法正常保存,这是很多新手容易忽略的细节。

运行这段代码后,会在 Python 脚本的同级目录下生成一个名为 demo.xlsx 的 Excel 文件,打开后可以看到 A1、B1、A2、B2 单元格已经成功写入对应内容。

3.2 自定义工作表名称

在实际使用中,默认的 Sheet1 无法满足需求,我们可以通过 add_worksheet() 方法的参数自定义工作表名称,让 Excel 文件结构更清晰,代码如下:

import xlsxwriter

# 创建工作簿
workbook = xlsxwriter.Workbook('自定义工作表.xlsx')

# 创建多个自定义名称的工作表
worksheet1 = workbook.add_worksheet('产品清单')
worksheet2 = workbook.add_worksheet('销售数据')
worksheet3 = workbook.add_worksheet('库存统计')

# 分别向不同工作表写入数据
worksheet1.write('A1', '产品名称')
worksheet2.write('A1', '销售日期')
worksheet3.write('A1', '库存数量')

# 关闭工作簿
workbook.close()

代码说明:

  • 一个 Excel 文件中可以创建多个工作表,每个工作表都有独立的名称;
  • 除了使用行号列号定位单元格,还可以使用 Excel 原生的单元格地址,例如 A1B5,使用起来更直观;
  • 多个工作表之间互不影响,可分别写入不同的数据内容。

3.3 写入不同类型的数据

XlsxWriter 支持写入字符串、数字、日期、公式等多种数据类型,无需手动转换格式,库会自动识别处理,下面演示不同数据类型的写入方法:

import xlsxwriter
from datetime import datetime

workbook = xlsxwriter.Workbook('数据类型.xlsx')
worksheet = workbook.add_worksheet()

# 写入字符串
worksheet.write(0, 0, '商品名称')
# 写入数字
worksheet.write(0, 1, 100)
# 写入浮点数
worksheet.write(0, 2, 99.99)
# 写入日期
worksheet.write(0, 3, datetime.now())
# 写入计算公式(求和公式)
worksheet.write(0, 4, '=B1+C1')

workbook.close()

代码说明:

  • 写入日期时需要导入 datetime 模块,库会自动将日期格式转换为 Excel 可识别的格式;
  • 写入公式时直接按照 Excel 公式语法编写字符串即可,打开 Excel 后会自动计算结果;
  • 无论写入哪种数据类型,都使用 write() 方法,库会自动匹配对应的写入逻辑,降低使用门槛。

四、XlsxWriter 格式设置功能

4.1 设置单元格字体格式

为了让 Excel 文件更美观、易读,我们可以通过 add_format() 方法创建格式对象,设置字体、字号、颜色、加粗、斜体等样式,这是 XlsxWriter 非常实用的功能之一。

代码示例:

import xlsxwriter

workbook = xlsxwriter.Workbook('字体格式.xlsx')
worksheet = workbook.add_worksheet()

# 创建字体格式对象
bold_format = workbook.add_format({
    'bold': True,  # 加粗
    'font_size': 14,  # 字号
    'font_color': 'red',  # 字体颜色
    'font_name': '微软雅黑'  # 字体
})

# 创建居中对齐格式
center_format = workbook.add_format({
    'align': 'center',  # 水平居中
    'valign': 'vcenter',  # 垂直居中
    'italic': True  # 斜体
})

# 应用格式写入数据
worksheet.write(0, 0, '标题', bold_format)
worksheet.write(1, 0, '内容', center_format)

workbook.close()

代码说明:

  • add_format() 接收一个字典参数,键是格式属性,值是对应的样式;
  • 常用字体属性:bold(加粗)、font_size(字号)、font_color(颜色)、font_name(字体)、italic(斜体);
  • 一个格式对象可以重复使用,应用到多个单元格中,提高代码复用率。

4.2 设置单元格边框和背景色

除了字体格式,还可以设置单元格的边框样式、背景填充颜色,让表格的边界更清晰,重点数据更突出,代码如下:

import xlsxwriter

workbook = xlsxwriter.Workbook('边框背景.xlsx')
worksheet = workbook.add_worksheet()

# 创建带边框和背景色的格式
cell_format = workbook.add_format({
    'border': 1,  # 边框宽度
    'border_color': 'black',  # 边框颜色
    'bg_color': '#FFFF00',  # 背景色(黄色)
    'align': 'center'
})

# 批量写入带格式的数据
data = [
    ['姓名', '年龄', '性别'],
    ['张三', 20, '男'],
    ['李四', 22, '女']
]

for row_num, row_data in enumerate(data):
    for col_num, col_data in enumerate(row_data):
        worksheet.write(row_num, col_num, col_data, cell_format)

workbook.close()

代码说明:

  • border 设置边框宽度,数值越大边框越粗,border_color 自定义边框颜色;
  • bg_color 支持颜色名称和十六进制颜色码,满足个性化配色需求;
  • 通过循环嵌套可以批量为表格数据应用格式,适合处理大量数据的场景。

4.3 设置列宽和行高

当单元格内容过长或过短时,默认的列宽行高会影响显示效果,XlsxWriter 提供了 set_column()set_row() 方法分别设置列宽和行高。

代码示例:

import xlsxwriter

workbook = xlsxwriter.Workbook('列宽行高.xlsx')
worksheet = workbook.add_worksheet()

# 写入数据
worksheet.write(0, 0, '这是一段很长的文本内容,测试列宽是否适配')
worksheet.write(1, 0, '测试行高')

# 设置列宽:set_column(起始列, 结束列, 宽度)
worksheet.set_column(0, 0, 30)

# 设置行高:set_row(行号, 高度)
worksheet.set_row(0, 30)
worksheet.set_row(1, 20)

workbook.close()

代码说明:

  • set_column(0, 0, 30) 表示将第 1 列(A列)的宽度设置为 30;
  • set_row(0, 30) 表示将第 1 行的高度设置为 30;
  • 可以批量设置多列或多行,例如 set_column(0, 2, 20) 表示将 A、B、C 三列宽度都设为 20。

五、XlsxWriter 高级功能:插入图表

XlsxWriter 支持插入柱状图、折线图、饼图、条形图等多种图表类型,可直接通过代码生成可视化图表,无需手动在 Excel 中制作,适合自动化生成报表的场景。

5.1 插入柱状图

代码示例:

import xlsxwriter

workbook = xlsxwriter.Workbook('柱状图.xlsx')
worksheet = workbook.add_worksheet()

# 准备图表数据
chart_data = [
    ['月份', '销售额'],
    ['1月', 1000],
    ['2月', 1500],
    ['3月', 1200],
    ['4月', 1800],
    ['5月', 2000]
]

# 写入数据
for row_num, row_data in enumerate(chart_data):
    worksheet.write_row(row_num, 0, row_data)

# 创建柱状图对象
chart = workbook.add_chart({'type': 'column'})

# 配置图表数据系列
chart.add_series({
    'name': '销售额',
    'categories': ['Sheet1', 1, 0, 5, 0],  # 横坐标数据
    'values': ['Sheet1', 1, 1, 5, 1],       # 纵坐标数据
    'fill': {'color': 'blue'}                # 柱子颜色
})

# 设置图表标题和坐标轴名称
chart.set_title({'name': '月度销售额统计表'})
chart.set_x_axis({'name': '月份'})
chart.set_y_axis({'name': '销售额(元)'})

# 将图表插入到工作表中
worksheet.insert_chart('A7', chart)

workbook.close()

代码说明:

  • add_chart({'type': 'column'}) 创建柱状图,修改 type 值可切换图表类型,如 line(折线图)、pie(饼图);
  • add_series() 配置图表数据,categories 是横坐标,values 是纵坐标,格式为 [工作表名, 起始行, 起始列, 结束行, 结束列]
  • set_title()set_x_axis()set_y_axis() 分别设置图表标题和坐标轴名称,让图表更易懂;
  • insert_chart() 将图表插入到指定单元格位置。

六、XlsxWriter 实际业务案例:学生成绩统计表

下面结合实际业务场景,使用 XlsxWriter 生成一份完整的学生成绩统计表,包含表头格式、数据写入、公式计算、列宽设置等功能,贴近真实开发需求。

完整代码示例:

import xlsxwriter

# 创建工作簿
workbook = xlsxwriter.Workbook('学生成绩统计表.xlsx')
worksheet = workbook.add_worksheet('成绩表')

# 定义表头格式:加粗、居中、边框、背景色
header_format = workbook.add_format({
    'bold': True,
    'align': 'center',
    'valign': 'vcenter',
    'border': 1,
    'bg_color': '#00BFFF',
    'font_color': 'white',
    'font_size': 12
})

# 定义数据格式:居中、边框
data_format = workbook.add_format({
    'align': 'center',
    'valign': 'vcenter',
    'border': 1
})

# 定义总分格式:加粗、居中、边框、红色
total_format = workbook.add_format({
    'bold': True,
    'align': 'center',
    'valign': 'vcenter',
    'border': 1,
    'font_color': 'red'
})

# 表头数据
headers = ['学号', '姓名', '语文', '数学', '英语', '总分']
# 学生成绩数据
student_data = [
    ['001', '张三', 85, 92, 88],
    ['002', '李四', 78, 95, 90],
    ['003', '王五', 90, 86, 94],
    ['004', '赵六', 82, 88, 85]
]

# 写入表头
worksheet.write_row(0, 0, headers, header_format)

# 写入学生成绩数据
for row_num, row_data in enumerate(student_data, start=1):
    worksheet.write_row(row_num, 0, row_data, data_format)
    # 写入总分公式(语文+数学+英语)
    worksheet.write(row_num, 5, f'=C{row_num+1}+D{row_num+1}+E{row_num+1}', total_format)

# 设置列宽
worksheet.set_column(0, 5, 12)

# 关闭工作簿
workbook.close()

代码说明:

  • 该案例模拟了学校学生成绩统计场景,包含学号、姓名、各科成绩、总分等字段;
  • 为不同区域设置不同格式,表头突出显示,总分用红色加粗标注;
  • 通过公式自动计算总分,无需手动计算,提升效率;
  • 统一设置列宽,保证表格内容完整显示,打开后即可直接使用。

运行代码后生成的 Excel 文件,结构清晰、样式美观,可直接用于教学、办公等场景,充分体现了 XlsxWriter 在自动化办公中的实用价值。

相关资源

  • Pypi地址:https://pypi.org/project/XlsxWriter
  • Github地址:https://github.com/jmcnamara/XlsxWriter
  • 官方文档地址:https://xlsxwriter.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:PyYAML 从入门到实战,轻松搞定YAML文件读写

一、PyYAML 库概述

在Python开发过程中,配置文件的管理是项目开发中不可或缺的一环,相比于传统的INI、JSON配置文件,YAML凭借简洁易读、支持注释、可嵌套、支持列表与字典混合使用的语法优势,成为现代项目配置文件的首选格式,而PyYAML就是Python生态中专门用于解析和生成YAML文件的标准库。

PyYAML的核心工作原理是实现Python数据类型与YAML格式数据的双向转换,它可以将Python中的字典、列表、字符串、数字、布尔值、None等基础数据类型序列化为YAML格式文本,也能将YAML格式的文本或文件反序列化为Python可直接操作的数据对象,底层基于YAML 1.1规范实现解析,兼容主流YAML语法规则。

该库采用MIT License开源,允许商业与非商业项目自由使用、修改和分发。其优点是语法简洁、使用简单、兼容性强、支持复杂数据结构,缺点是解析大型YAML文件时性能略低于部分专用解析库,且存在一定的安全风险,不建议解析不可信的YAML内容。

二、PyYAML 安装方法

PyYAML作为Python第三方库,无法通过Python内置模块直接调用,需要使用pip包管理器进行安装,安装命令支持普通安装与指定版本安装,适配Windows、macOS、Linux全平台系统。

2.1 基础安装命令

打开系统命令提示符(CMD)、PowerShell或终端,直接执行以下pip命令即可完成安装:

pip install pyyaml

2.2 指定版本安装

如果项目需要固定PyYAML版本以保证兼容性,可以使用以下命令指定版本安装,这里以6.0版本为例:

pip install pyyaml==6.0

2.3 升级PyYAML

若本地已安装旧版本PyYAML,想要升级到最新稳定版本,执行升级命令:

pip install --upgrade pyyaml

安装完成后,可在Python交互环境中执行import yaml,若没有报错则说明安装成功,可正常使用。

三、PyYAML 基础使用方法

PyYAML的核心操作分为两大类:YAML文件/字符串解析(加载)Python对象转YAML(转储),分别对应yaml.load()yaml.safe_load()yaml.dump()等核心方法,其中safe_load()是官方推荐的安全解析方法,可避免执行恶意代码。

3.1 YAML字符串解析为Python对象

当我们需要将YAML格式的字符串转换为Python字典、列表等对象时,使用yaml.safe_load()方法,该方法只解析基础数据类型,不执行自定义Python对象,安全性更高。

# 导入PyYAML库
import yaml

# 定义YAML格式的字符串
yaml_str = """
name: 测试项目
version: 1.0.0
author: 开发者
features:
  - 配置解析
  - 数据序列化
  - 跨平台兼容
is_online: true
port: 8080
"""

# 将YAML字符串解析为Python字典
data = yaml.safe_load(yaml_str)

# 打印解析后的数据类型与内容
print("解析后的数据类型:", type(data))
print("解析后的数据内容:", data)
# 单独获取指定字段
print("项目名称:", data["name"])
print("项目端口:", data["port"])

代码说明

  1. 首先导入yaml库,这是使用PyYAML的前提;
  2. 定义多行YAML字符串,包含字符串、数字、列表、布尔值四种数据类型;
  3. 调用yaml.safe_load()方法完成解析,返回Python字典对象;
  4. 可通过字典键名直接获取对应的值,实现YAML数据的读取与使用。

3.2 Python对象转换为YAML字符串

将Python中的字典、列表等对象转换为标准YAML格式字符串,使用yaml.dump()方法,该方法会自动按照YAML语法格式化数据,生成可读性极高的文本内容。

import yaml

# 定义Python字典对象
python_data = {
    "app_name": "PyYAML教程",
    "language": "Python",
    "tags": ["YAML", "配置文件", "数据解析"],
    "status": "开发中",
    "config": {
        "debug": True,
        "log_path": "./logs/app.log"
    }
}

# 将Python对象转为YAML字符串
yaml_result = yaml.dump(python_data, allow_unicode=True, sort_keys=False)

# 打印生成的YAML内容
print("生成的YAML格式内容:")
print(yaml_result)

代码说明

  1. allow_unicode=True参数允许显示中文字符,避免中文乱码;
  2. sort_keys=False参数禁止自动排序字典键,保持原有数据顺序;
  3. 转换后的YAML内容自动缩进、分行,符合YAML标准语法,可读性强。

3.3 读取本地YAML文件

在实际项目中,YAML内容通常存储在.yaml.yml后缀的文件中,PyYAML可直接读取本地文件并解析,无需手动处理文件读取逻辑。

首先创建一个config.yaml文件,写入以下内容:

# 项目基础配置
database:
  host: 127.0.0.1
  port: 3306
  username: root
  password: 123456
  db_name: test_db

server:
  ip: 0.0.0.0
  port: 8000
  max_connect: 100

然后编写Python代码读取并解析该文件:

import yaml

# 打开YAML文件并读取内容
with open("config.yaml", "r", encoding="utf-8") as f:
    # 安全解析YAML文件内容
    config_data = yaml.safe_load(f)

# 输出解析后的数据库配置
print("数据库地址:", config_data["database"]["host"])
print("数据库端口:", config_data["database"]["port"])
print("服务端口:", config_data["server"]["port"])

代码说明

  1. 使用with open()语句打开文件,自动处理文件关闭操作,避免资源泄漏;
  2. 指定编码为utf-8,确保中文配置内容正常读取;
  3. 直接将文件对象传入yaml.safe_load(),即可完成文件解析,返回Python字典。

3.4 将Python对象写入YAML文件

除了读取YAML文件,PyYAML还支持将Python数据直接写入本地YAML文件,实现配置文件的生成与修改。

import yaml

# 定义需要写入的配置数据
write_data = {
    "system": {
        "os": "Windows/Linux/macOS",
        "python_version": "3.8+",
        "memory": "4GB+"
    },
    "modules": {
        "pyyaml": "6.0",
        "requests": "2.31.0",
        "pandas": "2.1.0"
    }
}

# 将数据写入YAML文件
with open("system_config.yaml", "w", encoding="utf-8") as f:
    # 写入文件,保留中文,不排序键
    yaml.dump(write_data, f, allow_unicode=True, sort_keys=False, indent=2)

print("YAML文件写入完成!")

代码说明

  1. indent=2参数设置YAML文件缩进为2个空格,让文件格式更美观;
  2. 写入完成后,会在当前目录生成system_config.yaml文件,内容符合标准YAML格式;
  3. 该方法常用于项目初始化时自动生成配置文件。

3.5 解析YAML列表与嵌套结构

YAML支持多层嵌套结构与复杂列表,PyYAML可完美解析这类复杂数据,适配企业级项目的复杂配置场景。

import yaml

# 包含嵌套字典与多层列表的YAML字符串
complex_yaml = """
project:
  name: 电商平台
  modules:
    - name: 用户模块
      functions: [登录, 注册, 修改密码]
    - name: 商品模块
      functions: [商品展示, 商品搜索, 购物车]
  version: 2.1.0
"""

# 解析复杂YAML数据
complex_data = yaml.safe_load(complex_yaml)

# 读取嵌套数据
print("项目名称:", complex_data["project"]["name"])
print("第一个模块名称:", complex_data["project"]["modules"][0]["name"])
print("用户模块功能:", complex_data["project"]["modules"][0]["functions"])

代码说明

  1. YAML支持列表嵌套字典、字典嵌套列表的复杂结构;
  2. PyYAML解析后,可通过多层键名与列表索引精准获取目标数据;
  3. 完全满足微服务、分布式项目的复杂配置读取需求。

四、PyYAML 高级使用技巧

4.1 安全解析与不安全解析的区别

PyYAML提供yaml.load()yaml.safe_load()两种解析方法,yaml.load()已被官方标记为不安全,因为它可以解析并执行自定义Python对象,若解析来自网络或不可信来源的YAML文件,可能导致代码执行漏洞。

import yaml

# 不安全的解析方式(不推荐)
# 仅用于本地可信YAML内容,禁止解析外部数据
unsafe_yaml = """
test: !!python/object/apply:subprocess.Popen
- [calc.exe]
"""
# 执行后会打开系统计算器,存在安全风险
# yaml.load(unsafe_yaml, Loader=yaml.UnsafeLoader)

# 安全解析方式(官方推荐)
# 无法执行Python对象,仅解析基础数据类型
safe_data = yaml.safe_load(unsafe_yaml)
print("安全解析结果:", safe_data)

代码说明

  1. 生产环境中必须使用yaml.safe_load(),杜绝安全漏洞;
  2. yaml.load()需要指定Loader,且仅适用于完全可信的本地数据;
  3. 安全解析会忽略自定义Python对象,保证程序运行安全。

4.2 批量解析多个YAML文档

YAML支持在一个文件中编写多个文档,使用`分隔,PyYAML可通过yaml.safe_load_all()`方法批量解析所有文档,返回可迭代对象。

创建multi_docs.yaml文件:

doc: 1
title: 第一个文档
content: 测试内容1

doc: 2
title: 第二个文档
content: 测试内容2

doc: 3
title: 第三个文档
content: 测试内容3

解析代码:

import yaml

# 读取包含多个文档的YAML文件
with open("multi_docs.yaml", "r", encoding="utf-8") as f:
    # 批量解析所有文档
    docs = yaml.safe_load_all(f)
    # 遍历所有文档内容
    for idx, doc in enumerate(docs, 1):
        print(f"第{idx}个文档:", doc)

代码说明

  1. yaml.safe_load_all()返回生成器对象,节省内存;
  2. 适用于存储多组配置、多组测试数据的场景;
  3. 遍历即可获取每个独立文档的Python对象。

4.3 自定义YAML输出格式

通过yaml.dump()的参数可自定义YAML输出格式,包括缩进、换行、浮点数精度、是否显示默认值等,满足不同项目的格式要求。

import yaml

custom_data = {
    "name": "自定义格式",
    "values": [1.23456, 2.34567, 3.45678],
    "info": {
        "author": "测试",
        "time": "2026-01-01"
    }
}

# 自定义输出格式
custom_yaml = yaml.dump(
    custom_data,
    allow_unicode=True,
    sort_keys=False,
    indent=4,
    default_flow_style=False,
    width=50
)

print(custom_yaml)

代码说明

  1. default_flow_style=False强制使用块样式输出,避免压缩格式;
  2. width=50限制每行最大宽度,提升可读性;
  3. 自定义格式后的YAML文件更符合团队开发规范。

五、PyYAML 实际项目应用案例

在实际Python项目中,PyYAML最常用于项目配置文件管理接口自动化测试数据存储爬虫配置管理机器学习参数配置等场景,下面以Web项目配置管理为例,展示完整的实战代码。

5.1 实战场景:Web项目配置管理工具

开发一个通用的配置管理工具,实现YAML配置文件的读取、修改、保存功能,支持多环境配置切换(开发环境、测试环境、生产环境)。

import yaml
import os

class YamlConfigManager:
    """YAML配置文件管理类"""
    def __init__(self, config_path="web_config.yaml"):
        self.config_path = config_path
        self.config_data = None
        # 初始化时加载配置文件
        self.load_config()

    def load_config(self):
        """加载YAML配置文件"""
        if not os.path.exists(self.config_path):
            raise FileNotFoundError(f"配置文件{self.config_path}不存在!")
        with open(self.config_path, "r", encoding="utf-8") as f:
            self.config_data = yaml.safe_load(f)
        print("配置文件加载成功!")

    def get_config(self, env="dev"):
        """获取指定环境的配置"""
        if env not in self.config_data:
            raise ValueError(f"不支持{env}环境配置!")
        return self.config_data[env]

    def update_config(self, env, key, value):
        """修改指定环境的配置"""
        if env not in self.config_data:
            self.config_data[env] = {}
        self.config_data[env][key] = value
        # 保存修改后的配置
        self.save_config()
        print(f"{env}环境{key}配置修改为{value}成功!")

    def save_config(self):
        """保存配置到YAML文件"""
        with open(self.config_path, "w", encoding="utf-8") as f:
            yaml.dump(self.config_data, f, allow_unicode=True, sort_keys=False, indent=2)

# 初始化配置文件内容(首次运行使用)
if not os.path.exists("web_config.yaml"):
    init_data = {
        "dev": {
            "host": "127.0.0.1",
            "port": 8000,
            "debug": True,
            "database": "sqlite:///dev.db"
        },
        "test": {
            "host": "192.168.1.100",
            "port": 8080,
            "debug": False,
            "database": "mysql://test:test@localhost/test_db"
        },
        "prod": {
            "host": "10.0.0.1",
            "port": 80,
            "debug": False,
            "database": "mysql://prod:prod@localhost/prod_db"
        }
    }
    with open("web_config.yaml", "w", encoding="utf-8") as f:
        yaml.dump(init_data, f, allow_unicode=True, sort_keys=False, indent=2)

# 实战调用
if __name__ == "__main__":
    # 创建配置管理器对象
    config_manager = YamlConfigManager()

    # 获取开发环境配置
    dev_config = config_manager.get_config("dev")
    print("开发环境配置:", dev_config)

    # 修改测试环境端口
    config_manager.update_config("test", "port", 8888)

    # 获取修改后的测试环境配置
    test_config = config_manager.get_config("test")
    print("修改后测试环境配置:", test_config)

代码说明

  1. 封装YamlConfigManager类,实现配置加载、读取、修改、保存的完整功能;
  2. 支持多环境配置切换,适配Web项目的不同运行环境;
  3. 自动初始化配置文件,无需手动创建,降低使用成本;
  4. 代码可直接集成到Django、Flask、FastAPI等Web框架中。

相关资源

  • Pypi地址:https://pypi.org/project/PyYAML/
  • Github地址:https://github.com/yaml/pyyaml
  • 官方文档地址:https://pyyaml.org/wiki/PyYAMLDocumentation

关注我,每天分享一个实用的Python自动化工具。

Python 数据管道神器:Mara Pipelines 从入门到实战教程

一、Mara Pipelines 库基础介绍

Mara Pipelines 是一款专注于数据管道构建、任务编排与ETL流程管理的Python库,核心用于搭建可监控、可复用、可回溯的数据处理流水线,基于有向无环图实现任务依赖调度。该库轻量易用,侧重数据工程场景,License为MIT开源协议,优点是部署简单、日志完善、便于协作,缺点是生态较小,不适合超大规模分布式计算。

二、Mara Pipelines 安装与环境准备

在正式使用 Mara Pipelines 之前,需要配置对应的Python运行环境,该库对Python版本有一定要求,建议使用Python 3.8及以上版本,避免因版本不兼容导致安装或运行失败。

安装方式采用Python官方的pip包管理器,打开命令行工具(Windows使用CMD或PowerShell,Linux与macOS使用终端),执行以下安装命令:

pip install mara-pipelines

安装过程中,命令行会自动下载并配置 Mara Pipelines 及其依赖库,包括任务调度、日志记录、命令行交互等相关依赖包。等待安装完成后,可通过以下命令验证是否安装成功:

pip show mara-pipelines

若命令行正常显示库的版本、安装路径、依赖等信息,说明安装无误,可以进入后续的开发与使用环节。

对于需要固定版本的生产环境,建议使用requirements.txt文件进行依赖管理,在文件中添加:

mara-pipelines==对应版本号

之后通过pip install -r requirements.txt完成统一安装,确保开发环境与生产环境保持一致,避免因版本差异引发运行问题。

三、Mara Pipelines 核心功能与基础使用

3.1 核心组件与工作逻辑

Mara Pipelines 的核心围绕管道(Pipeline)任务(Task)展开,任务是最小执行单元,管道负责将多个任务按照依赖关系组合,形成有序的执行流程。其工作原理为:先定义单个数据处理任务,再设置任务之间的前置依赖关系,最后通过调度器按顺序执行,执行过程中会实时记录日志、状态与执行结果,方便排查问题与监控流程。

该库的核心优势在于轻量化编排,无需依赖复杂的中间件或大数据框架,即可完成中小型数据ETL、数据清洗、脚本批量执行等工作,适合个人开发者、小型团队快速搭建数据处理流程。

3.2 基础任务定义与执行

下面通过最简单的单任务示例,演示如何定义并运行 Mara Pipelines 任务,帮助新手快速理解基础用法。

首先创建Python脚本文件,命名为basic_task.py,编写如下代码:

# 导入核心组件
from mara_pipelines.commands.python import PythonFunction
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.cli import run_pipeline

# 定义数据处理函数
def simple_data_process():
    """
    基础数据处理函数
    模拟数据读取、清洗、输出的简单流程
    """
    # 模拟原始数据
    raw_data = [1, 2, 3, 4, 5]
    # 数据处理:计算列表元素平方
    processed_data = [num ** 2 for num in raw_data]
    print(f"原始数据: {raw_data}")
    print(f"处理后数据: {processed_data}")
    print("基础任务执行完成!")

# 创建管道实例
basic_pipeline = Pipeline(
    id="simple_data_pipeline",
    description="最简单的Mara Pipelines数据处理管道")

# 添加任务到管道
simple_task = Task(
    id="simple_process_task",
    description="执行简单数据平方处理",
    commands=[
        PythonFunction(simple_data_process)
    ])

# 将任务加入管道
basic_pipeline.add(simple_task)

# 命令行运行管道
if __name__ == "__main__":
    run_pipeline(basic_pipeline)

代码说明:

  1. 导入所需模块,PythonFunction用于将普通Python函数封装为任务,Pipeline用于创建管道,Task用于定义任务,run_pipeline用于启动管道。
  2. 定义simple_data_process函数,模拟最简单的数据处理逻辑,实现列表元素平方计算。
  3. 创建管道对象,设置唯一ID与描述信息,方便后续识别与管理。
  4. 创建任务对象,绑定封装好的Python函数,一个任务可包含多个执行命令。
  5. 通过add方法将任务添加到管道,最后在主程序中启动管道。

运行脚本的命令行指令:

python basic_task.py

运行后控制台会输出任务执行日志、原始数据与处理后数据,清晰展示任务完整执行过程,这是 Mara Pipelines 最基础的使用方式。

3.3 多任务依赖编排

实际数据处理场景中,往往需要多个任务按顺序执行,比如先读取数据、再清洗数据、最后存储数据,Mara Pipelines 可通过upstream参数设置任务依赖关系。

创建multi_task_pipeline.py脚本,代码如下:

from mara_pipelines.commands.python import PythonFunction
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.cli import run_pipeline

# 任务1:数据读取
def data_read():
    print("===== 开始执行数据读取任务 =====")
    global raw_data
    # 模拟从文件/接口读取数据
    raw_data = ["Python", "", "Mara", "Pipelines", "", "教程"]
    print(f"读取到原始数据: {raw_data}")
    print("===== 数据读取任务完成 =====")

# 任务2:数据清洗
def data_clean():
    print("===== 开始执行数据清洗任务 =====")
    global clean_data
    # 清洗规则:去除空字符串
    clean_data = [item for item in raw_data if item.strip()]
    print(f"清洗后数据: {clean_data}")
    print("===== 数据清洗任务完成 =====")

# 任务3:数据输出
def data_output():
    print("===== 开始执行数据输出任务 =====")
    # 模拟保存到文件
    with open("processed_data.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(clean_data))
    print("数据已成功写入 processed_data.txt 文件")
    print("===== 数据输出任务完成 =====")

# 创建主管道
data_flow_pipeline = Pipeline(
    id="data_etl_pipeline",
    description="完整的数据ETL处理管道")

# 定义三个任务
read_task = Task(
    id="read_data_task",
    description="读取原始数据",
    commands=[PythonFunction(data_read)])

clean_task = Task(
    id="clean_data_task",
    description="清洗无效数据",
    commands=[PythonFunction(data_clean)],
    upstream=[read_task])  # 设置依赖:读取任务完成后执行

output_task = Task(
    id="output_data_task",
    description="输出清洗后数据",
    commands=[PythonFunction(data_output)],
    upstream=[clean_task])  # 设置依赖:清洗任务完成后执行

# 依次添加任务到管道
data_flow_pipeline.add(read_task)
data_flow_pipeline.add(clean_task)
data_flow_pipeline.add(output_task)

# 启动管道
if __name__ == "__main__":
    run_pipeline(data_flow_pipeline)

代码说明:

  1. 定义三个功能函数,分别对应数据读取、清洗、输出三个核心ETL环节。
  2. 创建任务时,通过upstream参数指定前置任务,形成读取→清洗→输出的执行链。
  3. 管道会自动按照依赖顺序执行,若前置任务执行失败,后续任务不会启动,保证数据处理的安全性。
  4. 执行完成后,会在脚本同级目录生成processed_data.txt文件,存储清洗后的有效数据。

运行命令:

python multi_task_pipeline.py

控制台会按顺序输出三个任务的执行日志,清晰展示多任务依赖编排的执行效果。

四、Mara Pipelines 高级功能实战

4.1 任务异常处理与重试机制

在实际生产环境中,数据处理任务可能因网络波动、数据异常、文件缺失等原因执行失败,Mara Pipelines 支持任务重试机制,提升流程稳定性。

创建retry_task_pipeline.py脚本:

from mara_pipelines.commands.python import PythonFunction
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.cli import run_pipeline
import random

# 模拟可能失败的任务
def unstable_data_task():
    print("===== 执行不稳定数据处理任务 =====")
    # 随机模拟任务失败
    if random.choice([True, False]):
        raise Exception("任务执行失败:数据获取异常!")
    else:
        print("数据处理成功!")

# 创建管道
retry_pipeline = Pipeline(
    id="retry_strategy_pipeline",
    description="带异常重试机制的数据管道")

# 定义带重试的任务
retry_task = Task(
    id="unstable_process_task",
    description="可能失败的处理任务",
    commands=[PythonFunction(unstable_data_task)],
    max_retries=3,  # 设置最大重试次数
    timeout=10)     # 设置任务超时时间(秒)

retry_pipeline.add(retry_task)

if __name__ == "__main__":
    run_pipeline(retry_pipeline)

代码说明:

  1. 通过random模块随机模拟任务失败,还原真实生产场景。
  2. 任务参数中max_retries=3表示任务失败后最多重试3次,timeout=10表示任务执行超过10秒自动判定为失败。
  3. 重试机制可有效应对临时性异常,减少人工干预成本,适合对接外部接口、数据库等不稳定数据源。

4.2 命令行任务集成

Mara Pipelines 不仅支持Python函数任务,还支持执行系统命令行任务,可轻松集成Shell、CMD命令,实现跨语言、跨工具的流程编排。

创建shell_task_pipeline.py脚本:

from mara_pipelines.commands.shell import ShellCommand
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.cli import run_pipeline

# 创建管道
shell_pipeline = Pipeline(
    id="shell_command_pipeline",
    description="集成系统命令行的数据管道")

# 任务1:查看当前目录文件
list_file_task = Task(
    id="list_files_task",
    description="列出当前目录所有文件",
    commands=[
        ShellCommand("echo ===== 开始列出当前目录文件 ====="),
        ShellCommand("dir" if platform.system() == "Windows" else "ls")
    ])

# 任务2:创建新文件夹
make_dir_task = Task(
    id="make_dir_task",
    description="创建数据存储文件夹",
    commands=[ShellCommand("mkdir mara_data_folder")],
    upstream=[list_file_task])

# 任务3:输出系统信息
sys_info_task = Task(
    id="system_info_task",
    description="查看系统基本信息",
    commands=[
        ShellCommand("echo ===== 系统信息 ====="),
        ShellCommand("ver" if platform.system() == "Windows" else "uname -a")
    ],
    upstream=[make_dir_task])

shell_pipeline.add(list_file_task)
shell_pipeline.add(make_dir_task)
shell_pipeline.add(sys_info_task)

if __name__ == "__main__":
    import platform
    run_pipeline(shell_pipeline)

代码说明:

  1. 使用ShellCommand封装系统命令,实现Python与系统命令的无缝衔接。
  2. 通过platform模块判断操作系统,适配Windows与Linux/macOS的不同命令。
  3. 该功能可用于文件操作、环境检查、第三方工具调用等场景,扩展了数据管道的适用范围。

五、企业级真实案例:用户行为数据ETL处理

结合实际业务场景,使用 Mara Pipelines 搭建一套完整的用户行为数据ETL处理管道,实现数据读取、清洗、统计、存储全流程,贴近企业实际使用需求。

5.1 业务需求

  1. 读取模拟的用户行为原始数据(包含用户ID、行为类型、时间戳、空值、重复数据)。
  2. 清洗数据:去除空值、去重、过滤无效行为。
  3. 统计数据:计算各行为类型的用户数量。
  4. 存储结果:将统计结果保存为CSV文件。

5.2 完整代码实现

创建user_behavior_etl.py脚本:

from mara_pipelines.commands.python import PythonFunction
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.cli import run_pipeline
import pandas as pd
import os

# 全局变量存储数据
raw_behavior_data = None
clean_behavior_data = None
stat_result_data = None

# 任务1:生成模拟用户行为数据
def generate_behavior_data():
    global raw_behavior_data
    print("===== 生成用户行为原始数据 =====")
    # 模拟原始数据,包含空值、重复、无效数据
    data = {
        "user_id": [1001, 1002, None, 1001, 1003, 1002, 1004, None],
        "behavior": ["click", "view", "click", "click", "like", "view", "invalid", "like"],
        "timestamp": ["2025-01-01 10:00", "2025-01-01 10:05", "2025-01-01 10:10",
                     "2025-01-01 10:00", "2025-01-01 10:15", "2025-01-01 10:05",
                     "2025-01-01 10:20", "2025-01-01 10:25"]
    }
    raw_behavior_data = pd.DataFrame(data)
    print("原始数据预览:")
    print(raw_behavior_data)
    print("===== 数据生成完成 =====")

# 任务2:清洗用户行为数据
def clean_behavior_data_func():
    global raw_behavior_data, clean_behavior_data
    print("===== 开始清洗用户行为数据 =====")
    # 去除user_id为空的行
    clean_data = raw_behavior_data.dropna(subset=["user_id"])
    # 去除重复数据
    clean_data = clean_data.drop_duplicates()
    # 过滤无效行为
    clean_data = clean_data[clean_data["behavior"] != "invalid"]
    # 重置索引
    clean_data = clean_data.reset_index(drop=True)
    clean_behavior_data = clean_data
    print("清洗后数据预览:")
    print(clean_behavior_data)
    print("===== 数据清洗完成 =====")

# 任务3:统计用户行为数据
def stat_behavior_data():
    global clean_behavior_data, stat_result_data
    print("===== 开始统计用户行为 =====")
    # 按行为类型统计用户数量
    stat_result = clean_behavior_data.groupby("behavior")["user_id"].nunique().reset_index()
    stat_result.columns = ["行为类型", "独立用户数"]
    stat_result_data = stat_result
    print("统计结果预览:")
    print(stat_result_data)
    print("===== 数据统计完成 =====")

# 任务4:保存统计结果到CSV文件
def save_stat_result():
    global stat_result_data
    print("===== 保存统计结果 =====")
    # 确保输出目录存在
    if not os.path.exists("behavior_result"):
        os.makedirs("behavior_result")
    # 保存文件
    stat_result_data.to_csv("behavior_result/user_behavior_stat.csv", index=False, encoding="utf-8-sig")
    print("统计结果已保存至 behavior_result/user_behavior_stat.csv")
    print("===== 保存完成 =====")

# 创建ETL管道
behavior_etl_pipeline = Pipeline(
    id="user_behavior_etl_pipeline",
    description="企业级用户行为数据ETL处理管道")

# 定义任务链
gen_task = Task(
    id="gen_behavior_data",
    description="生成原始用户行为数据",
    commands=[PythonFunction(generate_behavior_data)])

clean_task = Task(
    id="clean_behavior_data",
    description="清洗用户行为数据",
    commands=[PythonFunction(clean_behavior_data_func)],
    upstream=[gen_task])

stat_task = Task(
    id="stat_behavior_data",
    description="统计用户行为",
    commands=[PythonFunction(stat_behavior_data)],
    upstream=[clean_task])

save_task = Task(
    id="save_stat_result",
    description="保存统计结果",
    commands=[PythonFunction(save_stat_result)],
    upstream=[stat_task])

# 添加任务到管道
behavior_etl_pipeline.add(gen_task)
behavior_etl_pipeline.add(clean_task)
behavior_etl_pipeline.add(stat_task)
behavior_etl_pipeline.add(save_task)

if __name__ == "__main__":
    run_pipeline(behavior_etl_pipeline)

5.3 案例运行说明

  1. 该案例基于pandas库实现数据处理,运行前需执行pip install pandas安装依赖。
  2. 管道执行流程:生成原始数据→清洗数据→统计分析→保存结果,全程自动化执行。
  3. 执行完成后,会自动创建behavior_result文件夹,内含统计结果CSV文件。
  4. 该案例可直接适配企业真实业务,只需替换数据来源、清洗规则与统计逻辑,即可投入使用。

六、相关资源

  • Pypi地址:https://pypi.org/project/mara-pipelines/
  • Github地址:https://github.com/mara/mara-pipelines
  • 官方文档地址:https://mara-pipelines.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:flupy 链式调用数据处理库从入门到实战教程

一、flupy库概述

在Python日常数据处理、数据清洗、迭代对象操作场景中,传统编写方式往往需要大量中间变量、多层循环嵌套,代码可读性与维护性较差。flupy是一款专注于Python链式调用的第三方库,基于迭代器实现惰性计算,可对列表、字典、生成器等可迭代对象进行连续、流畅的数据处理,大幅简化过滤、映射、排序、聚合等操作。

该库遵循MIT开源许可协议,核心优势为代码简洁、惰性求值节省内存、支持无限数据流、API贴近自然语言易上手;不足是复杂逻辑处理不如原生代码直观,部分高阶操作需要一定学习成本。

二、flupy库安装方法

flupy可通过Python官方包管理工具pip快速安装,打开命令行(CMD、PowerShell、终端均可),执行以下安装命令:

pip install flupy

若需要指定安装版本或使用国内镜像加速下载,可执行:

pip install flupy -i https://pypi.tuna.tsinghua.edu.cn/simple

安装完成后,在Python脚本或交互式环境中执行import flupy,无报错则代表安装成功。

三、flupy基础使用与核心API讲解

3.1 基础导入与对象创建

flupy提供统一入口flu,用于将普通可迭代对象(列表、元组、字符串、字典、生成器等)包装为支持链式调用的Flupy对象,所有数据处理操作都基于该对象展开。

基础使用代码示例:

# 导入flu核心函数
from flupy import flu

# 包装普通列表
data_list = [1, 2, 3, 4, 5]
flupy_obj = flu(data_list)

# 包装字符串
str_data = "pythonflupy"
flupy_str = flu(str_data)

# 包装字典(默认迭代key,可手动指定values或items)
dict_data = {"name": "flupy", "age": 3, "lang": "python"}
flupy_dict = flu(dict_data)

说明:flu()不会立即执行计算,仅完成对象包装,所有后续操作均为惰性执行,只有在获取结果(如转列表、循环遍历)时才真正计算,适合处理大数据量与无限流。

3.2 map映射操作

map()用于对迭代对象中每个元素执行指定函数,返回处理后的新元素,与原生map功能一致,但支持链式调用,代码更连贯。

代码示例:

from flupy import flu

# 原始数据
numbers = [1, 2, 3, 4, 5, 6]

# 链式调用:每个元素平方
result = flu(numbers).map(lambda x: x ** 2).to_list()
print("元素平方结果:", result)

# 多步骤map:先乘2,再加3
complex_result = flu(numbers).map(lambda x: x * 2).map(lambda x: x + 3).to_list()
print("先乘2再加3结果:", complex_result)

执行结果:

元素平方结果: [1, 4, 9, 16, 25, 36]
先乘2再加3结果: [5, 7, 9, 11, 13, 15]

说明:to_list()为结果转换方法,将惰性迭代对象转为可直接查看的列表,是flupy中最常用结果输出方式。

3.3 filter过滤操作

filter()根据指定条件保留符合要求的元素,剔除不符合条件元素,支持lambda表达式与自定义函数。

代码示例:

from flupy import flu

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 过滤偶数
even_result = flu(numbers).filter(lambda x: x % 2 == 0).to_list()
print("过滤后偶数:", even_result)

# 过滤大于5的数字
gt5_result = flu(numbers).filter(lambda x: x > 5).to_list()
print("大于5的数字:", gt5_result)

# 自定义过滤函数
def check_gt3_even(x):
    return x > 3 and x % 2 == 0

custom_filter = flu(numbers).filter(check_gt3_even).to_list()
print("大于3的偶数:", custom_filter)

执行结果:

过滤后偶数: [2, 4, 6, 8, 10]
大于5的数字: [6, 7, 8, 9, 10]
大于3的偶数: [4, 6, 8, 10]

说明:filter()只保留条件返回True的元素,不改变元素本身值,与map()修改元素值形成互补。

3.4 sort排序操作

sort()用于对迭代对象排序,支持指定key排序规则与reverse反转参数,使用方式简洁直观。

代码示例:

from flupy import flu

# 数字排序
numbers = [9, 3, 7, 1, 4, 8, 2, 5, 6]
asc_sort = flu(numbers).sort().to_list()
desc_sort = flu(numbers).sort(reverse=True).to_list()
print("升序排序:", asc_sort)
print("降序排序:", desc_sort)

# 字典列表按指定key排序
user_list = [
    {"name": "张三", "age": 22},
    {"name": "李四", "age": 19},
    {"name": "王五", "age": 25}
]
# 按年龄升序
age_sort = flu(user_list).sort(key=lambda x: x["age"]).to_list()
print("按年龄升序:", age_sort)

执行结果:

升序排序: [1, 2, 3, 4, 5, 6, 7, 8, 9]
降序排序: [9, 8, 7, 6, 5, 4, 3, 2, 1]
按年龄升序: [{'name': '李四', 'age': 19}, {'name': '张三', 'age': 22}, {'name': '王五', 'age': 25}]

说明:sort()默认使用原生排序规则,支持数字、字符串、字典、对象等多种数据类型。

3.5 take与skip截取操作

take(n)获取前n个元素,skip(n)跳过前n个元素,常用于分页、数据截取场景。

代码示例:

from flupy import flu

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 获取前5个元素
take_5 = flu(numbers).take(5).to_list()
print("前5个元素:", take_5)

# 跳过前3个元素
skip_3 = flu(numbers).skip(3).to_list()
print("跳过前3个元素:", skip_3)

# 组合使用:跳过2个,取4个
skip_take = flu(numbers).skip(2).take(4).to_list()
print("跳过2个取4个:", skip_take)

执行结果:

前5个元素: [1, 2, 3, 4, 5]
跳过前3个元素: [4, 5, 6, 7, 8, 9, 10]
跳过2个取4个: [3, 4, 5, 6]

说明:组合skiptake可实现简单分页逻辑,无需编写复杂切片逻辑。

3.6 distinct去重操作

distinct()用于去除迭代对象中重复元素,支持基础数据类型与自定义去重key。

代码示例:

from flupy import flu

# 基础数据去重
repeat_data = [1, 2, 2, 3, 3, 3, 4, 5, 5]
distinct_data = flu(repeat_data).distinct().to_list()
print("去重后结果:", distinct_data)

# 字符串去重
str_repeat = ["a", "b", "a", "c", "b", "d"]
str_distinct = flu(str_repeat).distinct().to_list()
print("字符串去重:", str_distinct)

# 字典列表按key去重
dict_repeat = [
    {"id": 1, "name": "test"},
    {"id": 2, "name": "demo"},
    {"id": 1, "name": "test"}
]
dict_distinct = flu(dict_repeat).distinct(key=lambda x: x["id"]).to_list()
print("按id去重后:", dict_distinct)

执行结果:

去重后结果: [1, 2, 3, 4, 5]
字符串去重: ['a', 'b', 'c', 'd']
按id去重后: [{'id': 1, 'name': 'test'}, {'id': 2, 'name': 'demo'}]

说明:distinct()通过key参数可指定去重依据,解决复杂结构数据去重问题。

3.7 group_by分组操作

group_by()按照指定key对数据分组,返回分组后的键值对,是数据统计、分类汇总常用操作。

代码示例:

from flupy import flu

student_list = [
    {"name": "小明", "class": "一班", "score": 88},
    {"name": "小红", "class": "二班", "score": 95},
    {"name": "小刚", "class": "一班", "score": 76},
    {"name": "小丽", "class": "二班", "score": 92}
]

# 按班级分组
group_by_class = flu(student_list).group_by(lambda x: x["class"]).to_list()
print("按班级分组结果:")
for group in group_by_class:
    print(f"班级:{group[0]},学生:{list(group[1])}")

执行结果:

按班级分组结果:
班级:一班,学生:[{'name': '小明', 'class': '一班', 'score': 88}, {'name': '小刚', 'class': '一班', 'score': 76}]
班级:二班,学生:[{'name': '小红', 'class': '二班', 'score': 95}, {'name': '小丽', 'class': '二班', 'score': 92}]

说明:group_by()返回迭代器,每个元素为(分组key, 分组元素迭代器),可直接转换为字典或列表。

3.8 reduce聚合操作

reduce()对所有元素依次累积计算,实现求和、求积、最大值、最小值等聚合结果。

代码示例:

from flupy import flu

numbers = [1, 2, 3, 4, 5]

# 求和
sum_result = flu(numbers).reduce(lambda a, b: a + b)
print("求和结果:", sum_result)

# 求积
product_result = flu(numbers).reduce(lambda a, b: a * b)
print("求积结果:", product_result)

# 求最大值
max_result = flu(numbers).reduce(lambda a, b: a if a > b else b)
print("最大值:", max_result)

执行结果:

求和结果: 15
求积结果: 120
最大值: 5

说明:reduce()最终返回单个聚合结果,无需手动初始化累积变量,代码更简洁。

四、flupy多API组合实战案例

4.1 学生成绩综合统计案例

需求:对学生成绩列表进行过滤、映射、排序、分组、聚合,统计各班及格人数、平均分。

完整代码:

from flupy import flu

# 原始数据:姓名、班级、成绩
score_data = [
    {"name": "小明", "class": "一班", "score": 88},
    {"name": "小红", "class": "二班", "score": 55},
    {"name": "小刚", "class": "一班", "score": 62},
    {"name": "小丽", "class": "二班", "score": 92},
    {"name": "小亮", "class": "一班", "score": 45},
    {"name": "小美", "class": "二班", "score": 78}
]

# 需求:筛选成绩>=60的学生,按班级分组,统计每组人数、平均分
result = (
    flu(score_data)
    .filter(lambda x: x["score"] >= 60)  # 过滤及格学生
    .group_by(lambda x: x["class"])     # 按班级分组
    .map(lambda group: {                 # 映射为统计结果
        "class": group[0],
        "count": len(list(group[1])),
        "total_score": flu(group[1]).map(lambda x: x["score"]).reduce(lambda a, b: a + b),
        "avg_score": round(flu(group[1]).map(lambda x: x["score"]).reduce(lambda a, b: a + b) / len(list(group[1])), 2)
    })
    .sort(key=lambda x: x["avg_score"], reverse=True)  # 按平均分降序
    .to_list()
)

# 输出结果
print("班级及格成绩统计:")
for item in result:
    print(f"班级:{item['class']},及格人数:{item['count']},总分:{item['total_score']},平均分:{item['avg_score']}")

执行结果:

班级及格成绩统计:
班级:二班,及格人数:2,总分:170,平均分:85.0
班级:一班,及格人数:2,总分:150,平均分:75.0

说明:本案例链式使用filtergroup_bymapreducesort,全程无中间变量,代码结构清晰、逻辑连贯。

4.2 日志数据清洗与分析案例

需求:清洗日志数据,提取有效信息,去重、过滤、统计访问次数最多的IP。

完整代码:

from flupy import flu

# 模拟日志数据
log_data = [
    "192.168.1.1 - - [01/Mar/2026:10:00:00] GET /index",
    "192.168.1.2 - - [01/Mar/2026:10:05:00] GET /home",
    "192.168.1.1 - - [01/Mar/2026:10:10:00] GET /about",
    "192.168.1.3 - - [01/Mar/2026:10:15:00] GET /index",
    "192.168.1.2 - - [01/Mar/2026:10:20:00] GET /index",
    "192.168.1.1 - - [01/Mar/2026:10:25:00] GET /home",
]

# 清洗日志:提取IP,统计访问次数
ip_analysis = (
    flu(log_data)
    .map(lambda line: line.split(" ")[0])  # 提取IP
    .group_by(lambda ip: ip)               # 按IP分组
    .map(lambda g: {"ip": g[0], "count": len(list(g[1]))})  # 统计次数
    .sort(key=lambda x: x["count"], reverse=True)  # 按访问次数降序
    .to_list()
)

print("IP访问统计:")
for item in ip_analysis:
    print(f"IP:{item['ip']},访问次数:{item['count']}")

执行结果:

IP访问统计:
IP:192.168.1.1,访问次数:3
IP:192.168.1.2,访问次数:2
IP:192.168.1.3,访问次数:1

说明:flupy非常适合日志、文本等半结构化数据清洗,通过链式调用快速完成数据提取、统计、排序。

五、flupy处理大数据与无限流优势

flupy基于迭代器实现惰性求值,不会一次性将所有数据加载到内存,适合处理超大规模数据与无限数据流。

示例:生成无限自然数,筛选偶数,取前10个,内存占用极低:

from flupy import flu
import itertools

# 生成无限自然数流
infinite_num = itertools.count(1)

# 惰性处理:取偶数,取前10个
result = flu(infinite_num).filter(lambda x: x % 2 == 0).take(10).to_list()
print("无限流前10个偶数:", result)

执行结果:

无限流前10个偶数: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

说明:原生代码处理无限流容易造成内存溢出,flupy惰性执行机制可安全处理无限数据流与大数据集。

相关资源

  • Pypi地址:https://pypi.org/project/flupy/
  • Github地址:https://github.com/olirice/flupy
  • 官方文档地址:https://flupy.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python 批量处理神器:BatchFlow 库从入门到实战教程

一、BatchFlow 库概述

BatchFlow 是一款面向批量数据处理、流水线任务编排的 Python 库,专注简化大规模数据的分步处理流程,核心是将复杂任务拆解为独立操作单元,按序串联执行,支持数据批式加载、转换、存储。其基于任务流调度原理,自动管理数据流转与异常处理,降低批量任务编码成本。该库开源免费,采用 MIT License,优点是轻量易用、流水线清晰、支持断点续跑,缺点是不适合超实时流计算,更偏向离线批量处理。

二、BatchFlow 基础环境安装

在使用 BatchFlow 前,需要先完成环境配置,该库支持 Python 3.7 及以上版本,兼容 Windows、macOS、Linux 系统,可通过 pip 快速完成安装。

打开命令行工具(CMD、Terminal、PowerShell 均可),执行以下安装命令:

pip install batchflow

若安装速度较慢,可使用国内镜像源加速安装:

pip install batchflow -i https://pypi.tuna.tsinghua.edu.cn/simple

安装完成后,可通过以下代码验证是否安装成功,若能正常导入且无报错,说明环境配置完成:

# 验证 BatchFlow 安装
import batchflow

# 查看库版本
print("BatchFlow 版本:", batchflow.__version__)

这段代码的作用是导入库并打印当前安装版本,确认库已成功部署到 Python 环境中,为后续批量任务开发做好准备。

三、BatchFlow 核心使用方式与基础示例

BatchFlow 的核心设计理念是流水线(Pipeline)+ 批次(Batch),所有批量操作都围绕「创建批次、定义操作、执行流水线」三个步骤展开,无需手动编写循环、异常捕获、数据分片逻辑,库内部自动完成调度。

3.1 核心概念理解

  1. Batch:数据批次,是数据处理的最小单元,可承载列表、数组、文件路径、DataFrame 等各类数据
  2. Pipeline:任务流水线,将多个数据处理操作按顺序串联,批次数据会依次流经每个操作
  3. Action:处理动作,即自定义的数据处理函数,是流水线中的单个处理节点

3.2 基础数据批量处理示例

以最常见的数字列表批量处理为例,演示如何创建批次、定义流水线、执行任务,适合新手快速理解核心逻辑。

from batchflow import Batch, Pipeline

# 1. 准备原始数据
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 2. 创建数据批次,将数据封装为 Batch 对象
batch = Batch(data)

# 3. 定义处理动作:数据翻倍
def double_data(batch):
    # 对批次中的每个数据执行翻倍操作
    batch.data = [x * 2 for x in batch.data]
    return batch

# 4. 定义处理动作:数据筛选(保留大于10的数字)
def filter_above_ten(batch):
    batch.data = [x for x in batch.data if x > 10]
    return batch

# 5. 构建流水线,按顺序串联两个处理动作
pipeline = Pipeline()
pipeline.add_action(double_data)
pipeline.add_action(filter_above_ten)

# 6. 执行流水线,处理批次数据
result_batch = pipeline.run(batch)

# 7. 输出最终结果
print("批量处理后结果:", result_batch.data)

代码说明

  • 先准备普通数字列表,通过 Batch 类封装为可被库处理的批次对象
  • 自定义两个纯数据处理函数,函数参数和返回值均为 Batch 对象,符合库的调用规范
  • 使用 Pipeline 创建流水线,通过 add_action 按执行顺序添加处理动作
  • 调用 run 方法启动流水线,自动完成所有处理步骤,返回处理后的批次数据
  • 最终输出结果为 [12, 14, 16, 18, 20],实现了「翻倍+筛选」的批量处理

3.3 带参数的批量处理示例

实际开发中,处理逻辑常需要传入自定义参数,BatchFlow 支持在添加动作时传递参数,提升流水线灵活性。

from batchflow import Batch, Pipeline

# 准备数据
data = [5, 10, 15, 20, 25]
batch = Batch(data)

# 定义带参数的处理动作:数据加法运算
def add_number(batch, num):
    batch.data = [x + num for x in batch.data]
    return batch

# 定义带参数的处理动作:数据乘法运算
def multiply_number(batch, num):
    batch.data = [x * num for x in batch.data]
    return batch

# 构建流水线并传递参数
pipeline = Pipeline()
# 给每个动作指定自定义参数
pipeline.add_action(add_number, num=10)
pipeline.add_action(multiply_number, num=2)

# 执行流水线
result = pipeline.run(batch)
print("带参数批量处理结果:", result.data)

代码说明

  • 处理函数可自定义任意数量参数,只需在 add_action 时以关键字参数形式传入
  • 流水线执行时,库会自动将参数传递给对应处理函数,无需手动传参
  • 本例先给数据加10,再乘以2,最终结果为 [30, 40, 50, 60, 70]

3.4 多类型数据批量处理

BatchFlow 不限制数据类型,支持字符串、字典、文件路径等各类可迭代数据,满足不同业务场景。

from batchflow import Batch, Pipeline

# 处理字符串数据
str_data = ["hello", "batchflow", "python", "batch", "process"]
batch = Batch(str_data)

# 字符串转大写
def to_upper(batch):
    batch.data = [s.upper() for s in batch.data]
    return batch

# 筛选长度大于4的字符串
def filter_long_str(batch):
    batch.data = [s for s in batch.data if len(s) > 4]
    return batch

pipeline = Pipeline()
pipeline.add_action(to_upper)
pipeline.add_action(filter_long_str)

result = pipeline.run(batch)
print("字符串批量处理结果:", result.data)

代码说明

  • 直接将字符串列表封装为 Batch 对象,处理逻辑与数字数据完全一致
  • 流水线执行后,输出长度大于4的大写字符串,结果为 ['HELLO', 'BATCHFLOW', 'PYTHON', 'PROCESS']
  • 体现库的通用性,无需针对不同数据类型修改核心代码

四、BatchFlow 高级功能与实战应用

4.1 文件批量读写处理

这是 BatchFlow 最常用的场景之一,可批量读取文本文件、处理内容、批量写入结果,适合日志处理、文本清洗、数据格式转换等工作。

import os
from batchflow import Batch, Pipeline

# 1. 创建测试文本文件(模拟待处理文件)
test_files = []
for i in range(1, 4):
    filename = f"test_file_{i}.txt"
    with open(filename, "w", encoding="utf-8") as f:
        f.write(f"这是测试文件{i}的原始内容\n需要批量处理的文本数据")
    test_files.append(filename)

print("待批量处理的文件:", test_files)

# 2. 定义文件读取动作
def read_file(batch):
    content_list = []
    for filepath in batch.data:
        with open(filepath, "r", encoding="utf-8") as f:
            content_list.append(f.read())
    batch.data = content_list
    return batch

# 3. 定义文本处理动作:添加前缀
def process_content(batch):
    batch.data = [f"【批量处理后】\n{content}" for content in batch.data]
    return batch

# 4. 定义文件写入动作
def write_file(batch):
    for idx, content in enumerate(batch.data):
        filename = f"processed_file_{idx+1}.txt"
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
    return batch

# 5. 构建文件处理流水线
file_pipeline = Pipeline()
file_pipeline.add_action(read_file)
file_pipeline.add_action(process_content)
file_pipeline.add_action(write_file)

# 6. 执行批量文件处理
file_batch = Batch(test_files)
file_pipeline.run(file_batch)

print("文件批量处理完成,已生成处理后的文件")

代码说明

  • 先自动创建3个测试文本文件,模拟真实业务中的待处理文件
  • 流水线分为「读取文件→处理文本→写入新文件」三步,完整覆盖文件批量处理流程
  • 无需手动编写循环遍历文件,所有逻辑封装为独立动作,便于维护和扩展
  • 处理完成后,会生成 processed_file_1/2/3.txt 三个新文件

4.2 异常处理与任务容错

批量处理时经常遇到数据异常、文件缺失、格式错误等问题,BatchFlow 内置异常捕获机制,可避免单个数据异常导致整个任务中断。

from batchflow import Batch, Pipeline

# 包含异常数据的列表(字符串无法转换为整数)
data = [1, 2, "three", 4, "five", 6]
batch = Batch(data)

# 定义可能报错的处理动作
def safe_convert_to_int(batch):
    result = []
    for item in batch.data:
        try:
            result.append(int(item))
        except (ValueError, TypeError):
            # 异常数据跳过或标记
            result.append(f"异常数据:{item}")
    batch.data = result
    return batch

# 构建流水线
pipeline = Pipeline()
pipeline.add_action(safe_convert_to_int)

# 执行处理
result = pipeline.run(batch)
print("带异常处理的批量结果:", result.data)

代码说明

  • 原始数据包含数字和字符串,直接转换会报错
  • 在处理动作内部添加简易异常捕获,BatchFlow 会保证流水线持续执行
  • 最终结果保留正常数据,并标记异常数据,实现任务容错运行

4.3 大规模数据分批处理

当数据量过大无法一次性加载到内存时,BatchFlow 支持自动分片分批,将大数据切分为多个小批次处理,避免内存溢出。

from batchflow import Batch, Pipeline, Dataset

# 生成大规模测试数据(1000个数字)
large_data = list(range(1000))

# 创建数据集,指定每批次处理100个数据
dataset = Dataset(large_data, batch_size=100)

# 定义处理动作
def process_large_batch(batch):
    batch.data = [x * 3 for x in batch.data]
    return batch

# 构建流水线
pipeline = Pipeline()
pipeline.add_action(process_large_batch)

# 分批处理所有数据
results = []
for batch in dataset:
    processed_batch = pipeline.run(batch)
    results.extend(processed_batch.data)

print("总处理数据量:", len(results))
print("前10条处理结果:", results[:10])

代码说明

  • 使用 Dataset 管理大规模数据,通过 batch_size 指定每批次大小
  • 循环遍历数据集,自动切分数据,逐批次执行流水线
  • 适合处理百万级数据、大文件、数据库批量读取等场景
  • 有效降低内存占用,提升程序稳定性

五、真实业务场景综合案例

批量学生成绩数据处理为例,模拟教育行业真实批量任务:读取成绩数据→计算总分→筛选及格学生→生成结果文件,完整展示 BatchFlow 在实际项目中的应用。

from batchflow import Batch, Pipeline
import csv

# 1. 生成模拟学生成绩数据
student_data = [
    {"name": "张三", "score1": 85, "score2": 90, "score3": 78},
    {"name": "李四", "score1": 55, "score2": 60, "score3": 58},
    {"name": "王五", "score1": 92, "score3": 88},  # 缺失成绩
    {"name": "赵六", "score1": 75, "score2": 80, "score3": 82},
    {"name": "钱七", "score1": 45, "score2": 50, "score3": 48},
]

# 2. 计算总分(处理缺失值)
def calculate_total(batch):
    for item in batch.data:
        score1 = item.get("score1", 0)
        score2 = item.get("score2", 0)
        score3 = item.get("score3", 0)
        item["total"] = score1 + score2 + score3
    return batch

# 3. 筛选总分及格(总分≥180)
def filter_pass(batch):
    batch.data = [item for item in batch.data if item.get("total", 0) >= 180]
    return batch

# 4. 生成结果CSV文件
def export_result(batch):
    with open("student_result.csv", "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "score1", "score2", "score3", "total"])
        writer.writeheader()
        writer.writerows(batch.data)
    return batch

# 5. 构建完整业务流水线
student_pipeline = Pipeline()
student_pipeline.add_action(calculate_total)
student_pipeline.add_action(filter_pass)
student_pipeline.add_action(export_result)

# 6. 执行批量处理
student_batch = Batch(student_data)
student_pipeline.run(student_batch)

print("学生成绩批量处理完成,结果已保存至 student_result.csv")

代码说明

  • 模拟真实业务中存在的数据缺失、格式不统一、多步骤处理等问题
  • 流水线清晰拆分业务逻辑,每个动作只负责单一功能,便于调试和修改
  • 最终自动生成标准 CSV 结果文件,可直接用于报表、数据分析
  • 若需扩展功能,只需新增处理动作并添加到流水线,无需修改原有代码

相关资源

  • Pypi地址:https://pypi.org/project/BatchFlow
  • Github地址:https://github.com/analysiscenter/batchflow
  • 官方文档地址:https://batchflow.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:Databolt Flow(d6tflow)高效构建数据科学工作流

一、Databolt Flow(d6tflow)核心概览

Databolt Flow(项目名d6tflow)是专为数据科学场景打造的Python工作流管理库,基于Luigi引擎优化,聚焦数据预处理、特征工程、模型训练等流程的依赖管理与缓存复用。其以“数据优先”为核心,将任务封装为有向无环图(DAG),自动缓存中间结果,输入/参数变更时智能重算,大幅提升迭代效率。优点是轻量易用、数据原生、缓存高效、可视化清晰;缺点是无分布式调度能力,不适合超大规模生产级ETL。该库采用MIT开源许可证,可自由商用、修改与分发。

二、安装与环境准备

2.1 基础安装

Databolt Flow(d6tflow)通过PyPI发布,使用pip即可快速安装,同时建议搭配数据科学常用库pandasnumpy,满足数据处理基础需求:

# 安装d6tflow核心库
pip install d6tflow

# 安装数据处理依赖(可选,推荐)
pip install pandas numpy

安装完成后,可通过导入验证是否成功:

import d6tflow
import pandas as pd
import numpy as np

# 打印库版本,确认安装正常
print(f"d6tflow版本:{d6tflow.__version__}")
print(f"pandas版本:{pd.__version__}")

若输出对应版本号,说明安装成功;若出现导入错误,可检查Python环境(建议3.7+),或使用pip install --upgrade d6tflow更新至最新版。

2.2 可视化依赖安装(可选)

d6tflow支持生成工作流可视化图表,需额外安装graphviz工具与Python库:

# 安装Python端graphviz库
pip install graphviz

# 系统级安装(Windows/macOS/Linux)
# Windows:下载graphviz安装包,添加bin目录到系统环境变量
# macOS:brew install graphviz
# Linux:sudo apt-get install graphviz

安装完成后,即可通过代码生成工作流的图形化展示,直观查看任务依赖关系。

三、核心概念与工作原理

3.1 核心概念

  1. Task(任务):d6tflow的基本执行单元,对应数据处理的一个步骤(如数据加载、清洗、特征提取)。每个Task需定义requires()(依赖任务)、run()(执行逻辑)、output()(输出数据)三个核心方法。
  2. DAG(有向无环图):多个Task通过requires()建立依赖关系,形成无环的执行流程,d6tflow自动解析DAG并按拓扑顺序执行任务。
  3. 缓存机制:Task执行完成后,输出结果自动缓存到本地磁盘(默认data/目录),后续执行时若输入/参数未变,直接读取缓存,避免重复计算。
  4. 参数化:支持为Task定义参数,通过参数区分不同任务实例,实现同一逻辑的多场景复用。

3.2 工作原理

d6tflow的执行流程遵循“声明即数据、执行即持久化、变更即重算”原则:

  1. 任务定义:开发者通过继承d6tflow.Task类,声明任务的输入依赖、执行逻辑与输出格式。
  2. 依赖解析:d6tflow自动扫描所有Task的requires()方法,构建DAG,确定任务执行顺序。
  3. 执行调度:按DAG拓扑顺序执行任务,先完成所有上游依赖,再执行当前任务;执行时自动检查缓存,存在有效缓存则跳过执行。
  4. 结果持久化:任务执行完成后,输出数据(如pandas DataFrame)自动保存为文件(如parquet、csv),并记录元数据(参数、依赖、时间戳)。
  5. 变更检测:重新执行工作流时,d6tflow对比当前任务的输入、参数与缓存元数据,若有变更则重算当前及下游任务,否则复用缓存。

四、基础使用:从简单任务到完整工作流

4.1 单个任务定义与执行

以“加载CSV数据”为例,演示单个Task的完整定义与执行流程:

import d6tflow
import pandas as pd
import os

# 定义数据加载任务
class LoadData(d6tflow.Task):
    # 定义任务参数(可选)
    file_path = d6tflow.Parameter(default="data/sample_data.csv")

    # 定义任务输出:输出为pandas DataFrame,保存为parquet格式
    def output(self):
        # 自动生成缓存路径,基于任务类名、参数
        return d6tflow.targets.PandasTarget(f"data/output/load_data_{self.file_path.split('/')[-1].split('.')[0]}.parquet")

    # 任务执行逻辑:加载CSV数据
    def run(self):
        print(f"正在加载数据:{self.file_path}")
        # 读取CSV文件
        df = pd.read_csv(self.file_path)
        # 保存到输出目标
        self.output().save(df)
        print("数据加载完成,已缓存到本地")

# 执行任务
if __name__ == "__main__":
    # 创建任务实例
    task = LoadData()
    # 运行任务(自动检查缓存,存在则跳过)
    d6tflow.run(task)
    # 加载任务输出结果
    result = task.output().load()
    print("加载的数据集前5行:")
    print(result.head())

代码说明

  • d6tflow.Parameter:定义任务参数,支持默认值,参数变化会触发任务重算。
  • output():返回PandasTarget,指定输出格式与路径,d6tflow自动管理文件读写。
  • run():编写核心业务逻辑,执行完成后通过self.output().save()保存结果。
  • d6tflow.run(task):触发任务执行,首次运行会执行run()逻辑并缓存;再次运行时,因缓存存在,直接跳过执行,快速加载结果。

4.2 多任务依赖:构建简单数据处理流程

数据科学流程通常包含“加载→清洗→分析”多个步骤,通过requires()建立依赖,实现链式执行:

# 继承LoadData任务,定义数据清洗任务
class CleanData(d6tflow.Task):
    # 依赖LoadData任务,必须先完成数据加载
    def requires(self):
        return LoadData()

    # 输出清洗后的数据
    def output(self):
        return d6tflow.targets.PandasTarget("data/output/clean_data.parquet")

    def run(self):
        print("正在执行数据清洗...")
        # 加载上游任务的输出
        raw_df = self.input().load()
        # 清洗逻辑:删除缺失值、去重、过滤异常值
        clean_df = raw_df.dropna().drop_duplicates()
        clean_df = clean_df[clean_df["value"] > 0]  # 过滤value为负的异常数据
        # 保存清洗结果
        self.output().save(clean_df)
        print(f"清洗完成,原始数据{len(raw_df)}行,清洗后{len(clean_df)}行")

# 定义数据分析任务:计算统计指标
class AnalyzeData(d6tflow.Task):
    def requires(self):
        return CleanData()

    def output(self):
        return d6tflow.targets.PandasTarget("data/output/analyze_result.parquet")

    def run(self):
        print("正在执行数据分析...")
        clean_df = self.input().load()
        # 计算统计指标:均值、中位数、最大值、最小值
        analyze_result = clean_df.agg({
            "value": ["mean", "median", "max", "min"],
            "category": "nunique"
        }).reset_index()
        self.output().save(analyze_result)
        print("数据分析完成,统计结果:")
        print(analyze_result)

# 执行完整工作流
if __name__ == "__main__":
    # 执行最终任务,d6tflow自动执行所有上游依赖
    final_task = AnalyzeData()
    d6tflow.run(final_task)

    # 可视化工作流(需安装graphviz)
    d6tflow.show(final_task)
    print("工作流可视化图表已生成,可查看data/目录下的png文件")

代码说明

  • requires():指定当前任务的上游依赖,d6tflow自动按依赖顺序执行。
  • self.input():加载上游任务的输出结果,无需手动处理文件路径。
  • d6tflow.show(final_task):生成工作流的可视化图表,展示任务间的依赖关系,便于调试与协作。
  • 执行时,首次运行会依次执行LoadDataCleanDataAnalyzeData;若修改CleanData的清洗逻辑,重新运行时仅重算CleanDataAnalyzeDataLoadData复用缓存,大幅提升效率。

4.3 参数化任务:实现多场景复用

通过参数化Task,可基于同一逻辑生成不同任务实例,适配多数据源、多参数场景:

# 定义参数化的数据加载任务
class LoadParamData(d6tflow.Task):
    # 定义多个参数:文件路径、数据类型
    file_path = d6tflow.Parameter()
    data_type = d6tflow.Parameter(default="csv")

    def output(self):
        # 基于参数生成唯一缓存路径,避免不同实例冲突
        return d6tflow.targets.PandasTarget(f"data/output/load_{self.data_type}_{self.file_path.split('/')[-1].split('.')[0]}.parquet")

    def run(self):
        print(f"加载{self.data_type}数据:{self.file_path}")
        if self.data_type == "csv":
            df = pd.read_csv(self.file_path)
        elif self.data_type == "excel":
            df = pd.read_excel(self.file_path)
        else:
            raise ValueError(f"不支持的数据类型:{self.data_type}")
        self.output().save(df)

# 定义基于参数化任务的清洗任务
class CleanParamData(d6tflow.Task):
    # 接收参数并传递给上游任务
    file_path = d6tflow.Parameter()
    data_type = d6tflow.Parameter(default="csv")

    def requires(self):
        # 传递参数给上游LoadParamData
        return LoadParamData(file_path=self.file_path, data_type=self.data_type)

    def output(self):
        return d6tflow.targets.PandasTarget(f"data/output/clean_{self.data_type}_{self.file_path.split('/')[-1].split('.')[0]}.parquet")

    def run(self):
        raw_df = self.input().load()
        clean_df = raw_df.dropna().drop_duplicates()
        self.output().save(clean_df)
        print(f"参数化清洗完成,数据类型:{self.data_type},文件:{self.file_path}")

# 执行不同参数的任务实例
if __name__ == "__main__":
    # 实例1:加载CSV数据
    task_csv = CleanParamData(file_path="data/sample_data.csv", data_type="csv")
    # 实例2:加载Excel数据(需提前准备data/sample_data.xlsx)
    task_excel = CleanParamData(file_path="data/sample_data.xlsx", data_type="excel")

    # 并行执行两个任务实例
    d6tflow.run([task_csv, task_excel])

    # 加载结果
    result_csv = task_csv.output().load()
    result_excel = task_excel.output().load()
    print(f"CSV清洗后数据行数:{len(result_csv)}")
    print(f"Excel清洗后数据行数:{len(result_excel)}")

代码说明

  • 参数化任务通过d6tflow.Parameter定义可配置项,参数值不同则任务实例不同,缓存路径独立。
  • 下游任务可通过requires()传递参数给上游,实现参数的链式传递。
  • d6tflow.run([task1, task2])支持同时执行多个任务,d6tflow自动调度,提升并行处理效率。

五、进阶功能:复杂工作流与实战优化

5.1 多输入依赖:合并多个数据源

实际场景中常需合并多个数据源,d6tflow支持在requires()中返回多个任务,实现多输入合并:

# 定义用户数据加载任务
class LoadUserInfo(d6tflow.Task):
    def output(self):
        return d6tflow.targets.PandasTarget("data/output/load_user_info.parquet")

    def run(self):
        # 模拟用户数据
        user_df = pd.DataFrame({
            "user_id": [1, 2, 3, 4, 5],
            "user_name": ["张三", "李四", "王五", "赵六", "孙七"],
            "age": [25, 30, 35, 28, 40]
        })
        self.output().save(user_df)

# 定义订单数据加载任务
class LoadOrderData(d6tflow.Task):
    def output(self):
        return d6tflow.targets.PandasTarget("data/output/load_order_data.parquet")

    def run(self):
        # 模拟订单数据
        order_df = pd.DataFrame({
            "order_id": [101, 102, 103, 104, 105],
            "user_id": [1, 2, 1, 3, 5],
            "order_amount": [100, 200, 150, 300, 250],
            "order_time": pd.date_range("2026-01-01", periods=5)
        })
        self.output().save(order_df)

# 定义合并任务:合并用户数据与订单数据
class MergeData(d6tflow.Task):
    # 依赖两个上游任务,实现多输入
    def requires(self):
        return {"user_info": LoadUserInfo(), "order_data": LoadOrderData()}

    def output(self):
        return d6tflow.targets.PandasTarget("data/output/merge_user_order.parquet")

    def run(self):
        print("正在合并用户数据与订单数据...")
        # 加载多个上游输入
        user_df = self.input()["user_info"].load()
        order_df = self.input()["order_data"].load()
        # 按user_id合并数据
        merge_df = pd.merge(order_df, user_df, on="user_id", how="left")
        self.output().save(merge_df)
        print(f"合并完成,合并后数据行数:{len(merge_df)}")
        print(merge_df.head())

# 执行合并任务
if __name__ == "__main__":
    merge_task = MergeData()
    d6tflow.run(merge_task)

代码说明

  • requires()返回字典,键为输入名称,值为依赖任务,便于区分多个上游输入。
  • self.input()返回对应字典,通过键名加载不同上游结果,实现多数据源灵活合并。

5.2 任务状态管理与调试

d6tflow提供任务状态查询、缓存清理等功能,便于调试与维护工作流:

if __name__ == "__main__":
    merge_task = MergeData()

    # 1. 查询任务状态
    print("任务状态:", merge_task.status())  # 输出:pending/running/completed

    # 2. 强制重新运行任务(忽略缓存)
    # d6tflow.run(merge_task, force=True)

    # 3. 清理任务缓存
    # merge_task.output().remove()  # 清理当前任务缓存
    # d6tflow.clear(merge_task)  # 清理当前及所有上游任务缓存

    # 4. 查看任务依赖树
    print("任务依赖树:")
    d6tflow.deps(merge_task, indent=2)  # 缩进展示依赖关系

    # 5. 导出任务元数据
    meta = merge_task.meta()
    print("任务元数据:", meta)

常用调试命令

  • force=True:强制重算任务,适用于逻辑修改后需重新执行的场景。
  • d6tflow.clear(task):批量清理缓存,解决缓存异常问题。
  • d6tflow.deps(task):可视化依赖树,快速定位依赖关系错误。

5.3 自定义输出格式:适配不同存储需求

d6tflow内置PandasTargetCSVTaretParquetTarget等,也支持自定义输出格式,适配数据库、云存储等场景:

# 自定义JSON输出目标
class JsonTarget(d6tflow.targets.Target):
    def save(self, obj):
        import json
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(obj.to_dict(orient="records"), f, ensure_ascii=False, indent=2)

    def load(self):
        import json
        import pandas as pd
        with open(self.path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return pd.DataFrame(data)

# 使用自定义目标的任务
class ExportJsonData(d6tflow.Task):
    def requires(self):
        return MergeData()

    def output(self):
        return JsonTarget("data/output/merge_result.json")

    def run(self):
        merge_df = self.input().load()
        self.output().save(merge_df)
        print("数据已导出为JSON格式")

# 执行导出任务
if __name__ == "__main__":
    export_task = ExportJsonData()
    d6tflow.run(export_task)

代码说明

  • 继承d6tflow.targets.Target,实现save()load()方法,即可自定义输出格式。
  • 自定义目标可适配JSON、Excel、数据库(如SQLAlchemy)、云存储(如S3)等场景,提升灵活性。

六、实际案例:机器学习模型训练工作流

6.1 案例背景

以“用户购买预测”机器学习任务为例,构建完整工作流:数据加载→数据清洗→特征工程→模型训练→模型评估→结果导出,覆盖数据科学全流程。

6.2 完整代码实现

import d6tflow
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
import joblib

# - 1. 数据加载任务 -
class LoadMLData(d6tflow.Task):
    def output(self):
        return d6tflow.targets.PandasTarget("data/ml/raw_data.parquet")

    def run(self):
        # 模拟用户行为数据:用户特征+购买标签
        np.random.seed(42)
        raw_df = pd.DataFrame({
            "user_id": range(1, 1001),
            "age": np.random.randint(18, 65, 1000),
            "visit_count": np.random.randint(1, 50, 1000),
            "avg_spend": np.random.uniform(10, 1000, 1000),
            "device_type": np.random.choice(["mobile", "pc", "tablet"], 1000),
            "purchase": np.random.choice([0, 1], 1000, p=[0.7, 0.3])  # 30%用户购买
        })
        self.output().save(raw_df)
        print("机器学习原始数据加载完成")

# - 2. 数据清洗任务 -
class CleanMLData(d6tflow.Task):
    def requires(self):
        return LoadMLData()

    def output(self):
        return d6tflow.targets.PandasTarget("data/ml/clean_data.parquet")

    def run(self):
        raw_df = self.input().load()
        # 清洗:删除缺失值、处理异常值、编码分类特征
        clean_df = raw_df.dropna()
        clean_df = clean_df[clean_df["avg_spend"] > 0]
        # 独热编码设备类型
        clean_df = pd.get_dummies(clean_df, columns=["device_type"], drop_first=True)
        self.output().save(clean_df)
        print(f"数据清洗完成,样本数:{len(clean_df)},特征数:{clean_df.shape[1]}")

# - 3. 特征工程任务 -
class FeatureEngineering(d6tflow.Task):
    def requires(self):
        return CleanMLData()

    def output(self):
        return {
            "train": d6tflow.targets.PandasTarget("data/ml/train_data.parquet"),
            "test": d6tflow.targets.PandasTarget("data/ml/test_data.parquet"),
            "features": d6tflow.targets.PandasTarget("data/ml/feature_names.parquet")
        }

    def run(self):
        clean_df = self.input().load()
        # 分离特征与标签
        X = clean_df.drop(["user_id", "purchase"], axis=1)
        y = clean_df["purchase"]
        # 划分训练集与测试集
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        # 合并特征与标签
        train_df = pd.concat([X_train, y_train], axis=1)
        test_df = pd.concat([X_test, y_test], axis=1)
        # 保存特征名称
        feature_names = pd.DataFrame({"feature": X.columns})
        # 保存结果
        self.output()["train"].save(train_df)
        self.output()["test"].save(test_df)
        self.output()["features"].save(feature_names)
        print(f"特征工程完成,训练集:{len(train_df)},测试集:{len(test_df)}")

# - 4. 模型训练任务 -
class TrainModel(d6tflow.Task):
    # 模型参数
    n_estimators = d6tflow.IntParameter(default=100)
    max_depth = d6tflow.IntParameter(default=10)

    def requires(self):
        return FeatureEngineering()

    def output(self):
        return d6tflow.targets.FileTarget("data/ml/rf_model.pkl")  # 保存模型文件

    def run(self):
        # 加载训练数据
        train_df = self.input()["train"].load()
        X_train = train_df.drop("purchase", axis=1)
        y_train = train_df["purchase"]
        # 训练随机森林模型
        model = RandomForestClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            random_state=42
        )
        model.fit(X_train, y_train)
        # 保存模型
        joblib.dump(model, self.output().path)
        print(f"模型训练完成,参数:n_estimators={self.n_estimators}, max_depth={self.max_depth}")

# - 5. 模型评估任务 -
class EvaluateModel(d6tflow.Task):
    def requires(self):
        return {"model": TrainModel(), "test_data": FeatureEngineering()}

    def output(self):
        return d6tflow.targets.PandasTarget("data/ml/evaluation_result.parquet")

    def run(self):
        # 加载模型与测试数据
        model = joblib.load(self.input()["model"].path)
        test_df = self.input()["test_data"]["test"].load()
        X_test = test_df.drop("purchase", axis=1)
        y_test = test_df["purchase"]
        # 模型预测与评估
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        report = classification_report(y_test, y_pred, output_dict=True)
        # 整理评估结果
        eval_result = pd.DataFrame({
            "metric": ["accuracy", "precision_0", "recall_0", "f1_0", "precision_1", "recall_1", "f1_1"],
            "value": [
                accuracy,
                report["0"]["precision"], report["0"]["recall"], report["0"]["f1-score"],
                report["1"]["precision"], report["1"]["recall"], report["1"]["f1-score"]
            ]
        })
        self.output().save(eval_result)
        print(f"模型评估完成,测试集准确率:{accuracy:.4f}")
        print("分类报告:")
        print(pd.DataFrame(report).transpose())

# - 6. 结果导出任务 -
class ExportResult(d6tflow.Task):
    def requires(self):
        return EvaluateModel()

    def output(self):
        return d6tflow.targets.CSVTarget("data/ml/final_result.csv")

    def run(self):
        eval_result = self.input().load()
        eval_result.to_csv(self.output().path, index=False)
        print("评估结果已导出为CSV文件,路径:data/ml/final_result.csv")

# - 执行完整机器学习工作流 -
if __name__ == "__main__":
    final_ml_task = ExportResult()
    # 运行工作流
    d6tflow.run(final_ml_task)
    # 可视化工作流
    d6tflow.show(final_ml_task)
    # 加载最终结果
    final_result = final_ml_task.output().load()
    print("最终评估结果:")
    print(final_result)

6.3 案例说明

  1. 流程完整性:覆盖机器学习从数据到结果的全流程,每个步骤封装为独立Task,职责单一、易于维护。
  2. 缓存优势:修改模型参数(如n_estimators)时,仅重算TrainModel及下游EvaluateModelExportResult,上游数据处理任务复用缓存,大幅缩短训练时间。
  3. 可复现性:所有步骤参数化、结果缓存,确保多次执行结果一致,满足机器学习实验的可复现要求。
  4. 协作友好:通过可视化图表展示工作流,团队成员可快速理解流程,便于代码审查与协作开发。

七、相关资源

  • PyPI地址:https://pypi.org/project/d6tflow/
  • GitHub地址:https://github.com/d6t/d6tflow
  • 官方文档地址:https://d6tflow.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python 机器学习与数据管道管理神器:dbnd 库从入门到实战详解

一、dbnd 库基础认知

1.1 库核心用途

dbnd 是一款面向数据工程、机器学习与数据分析场景的 Python 工作流管理库,核心用于数据管道构建、任务编排、运行追踪、结果复现与实验管理,可高效衔接数据读取、清洗、模型训练、评估、部署全流程,解决机器学习与数据处理中任务混乱、结果难复现、日志缺失、依赖管理复杂等问题。

1.2 工作原理

dbnd 以任务函数装饰器为核心,通过注解自动捕获函数输入输出、参数、日志、执行时间与异常,内置轻量调度引擎实现任务依赖编排,同时提供本地/远程运行、结果缓存、数据版本追踪能力,支持将运行结果存储至本地文件、数据库或云端服务,实现全流程可观测、可复现、可管理。

1.3 优缺点

优点:轻量无侵入、代码改造量极小、支持任务缓存与断点续跑、兼容主流机器学习库(Scikit-learn、TensorFlow、PyTorch)、提供可视化追踪界面、支持多环境运行。
缺点:超大规模分布式调度能力弱于 Airflow,生态聚焦数据管道,通用 Web 或非数据场景适配性一般。

1.4 License 类型

Apache License 2.0(开源商用友好)

二、dbnd 安装与基础配置

2.1 环境要求

dbnd 支持 Python 3.7 及以上版本,兼容 Windows、macOS、Linux 系统,可与 Jupyter Notebook、PyCharm、VS Code 无缝配合。

2.2 安装命令

使用 pip 快速安装:

pip install dbnd

安装扩展包(支持机器学习、数据可视化、远程运行):

pip install dbnd[ml,web,aws]

验证安装:

dbnd version

出现版本号即安装成功。

2.3 初始化项目

在项目目录执行初始化命令,生成标准项目结构:

dbnd init

初始化后生成基础目录:

your_project/
├── .dbnd/           # dbnd 配置与运行日志
├── tasks/           # 自定义任务脚本
├── data/            # 输入输出数据
├── models/          # 模型存储
└── dbnd.cfg         # 配置文件

三、dbnd 核心功能与基础代码示例

3.1 基础任务定义与运行

dbnd 使用 @task 装饰器将普通函数转为可追踪、可编排的任务,自动记录参数、运行状态与结果。

from dbnd import task

# 定义基础计算任务
@task
def add_numbers(a: int, b: int) -> int:
    """计算两个数字之和"""
    result = a + b
    print(f"计算结果: {result}")
    return result

# 直接运行任务
if __name__ == "__main__":
    output = add_numbers(a=10, b=25)
    print(f"最终输出: {output}")

代码说明

  • @task 装饰器自动封装函数,开启参数校验、日志追踪;
  • 函数参数类型注解可被 dbnd 读取,实现输入合法性校验;
  • 任务可像普通函数一样直接调用,无需复杂配置。

运行后控制台会显示任务名称、状态、耗时、结果路径等信息。

3.2 任务依赖与管道编排

dbnd 支持任务链式调用,自动识别依赖关系,构建顺序执行管道。

from dbnd import task, pipeline

# 步骤1:数据加载
@task
def load_data() -> list:
    data = [10, 20, 30, 40, 50]
    print("数据加载完成")
    return data

# 步骤2:数据求和
@task
def calculate_sum(data: list) -> int:
    total = sum(data)
    print(f"数据求和: {total}")
    return total

# 步骤3:计算平均值
@task
def calculate_average(total: int, length: int) -> float:
    avg = total / length
    print(f"平均值: {avg}")
    return avg

# 定义管道:串联多个任务
@pipeline
def data_process_pipeline():
    data = load_data()
    total = calculate_sum(data=data)
    avg = calculate_average(total=total, length=len(data))
    return avg

# 运行管道
if __name__ == "__main__":
    result = data_process_pipeline()
    print(f"管道最终结果: {result}")

代码说明

  • @pipeline 用于封装任务流,自动管理任务执行顺序;
  • 任务间通过参数传递建立依赖,前序任务完成后才会执行后续任务;
  • 运行时控制台展示完整依赖图,便于排查执行链路。

3.3 数据读写与缓存机制

dbnd 内置数据读写功能,支持 CSV、JSON、Pickle 等格式,自动缓存结果,避免重复计算,提升运行效率。

import pandas as pd
from dbnd import task, output

# 定义输出路径
@task(result=output.csv)
def create_csv_data() -> pd.DataFrame:
    """生成 DataFrame 并保存为 CSV"""
    data = {
        "name": ["Alice", "Bob", "Charlie"],
        "score": [85, 92, 78]
    }
    df = pd.DataFrame(data)
    return df

# 读取 CSV 数据
@task
def read_csv_data(df: pd.DataFrame) -> pd.DataFrame:
    print("读取数据:")
    print(df)
    return df

if __name__ == "__main__":
    df = create_csv_data()
    read_csv_data(df=df)

代码说明

  • output.csv 指定输出格式,dbnd 自动管理文件路径与版本;
  • 任务结果会被缓存,重复运行时直接读取缓存,大幅提速;
  • 支持 JSON、Parquet、Excel 等多种数据格式。

3.4 机器学习任务追踪

dbnd 可自动追踪机器学习模型的参数、指标、数据集与训练日志,适合实验管理。

from dbnd import task
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

@task
def train_iris_model() -> float:
    # 加载数据集
    iris = load_iris()
    X, y = iris.data, iris.target

    # 划分训练集测试集
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # 模型训练
    model = LogisticRegression(max_iter=200)
    model.fit(X_train, y_train)

    # 预测与评估
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)

    print(f"模型准确率: {acc:.4f}")
    return acc

if __name__ == "__main__":
    accuracy = train_iris_model()

代码说明

  • dbnd 自动记录数据集版本、模型参数、评估指标;
  • 所有运行结果存入本地数据库,可随时回溯历史实验;
  • 兼容 Scikit-learn、XGBoost、LightGBM 等主流 ML 库。

3.5 可视化运行界面

dbnd 内置 Web UI,用于查看任务运行状态、日志、指标、依赖图。
启动命令:

dbnd webserver --port 8080

访问地址:http://127.0.0.1:8080
在界面中可查看:

  • 任务执行时间与状态
  • 输入输出参数
  • 运行日志与报错信息
  • 任务依赖拓扑图
  • 机器学习实验对比

四、dbnd 高级功能与实战案例

4.1 自定义配置与环境切换

通过修改 dbnd.cfg 可切换本地、测试、生产环境,支持自定义数据路径、日志级别、存储方式。

示例配置:

[core]
local_db = sqlite:///.dbnd/dbnd.db
task_run_dir = ./tasks/runs
log_level = INFO

[output]

default = csv path = ./data/output

4.2 异常捕获与重试机制

dbnd 自动捕获任务异常,支持失败重试、超时控制,提升管道稳定性。

from dbnd import task

@task(retries=3, retry_delay=1)
def unstable_task():
    import random
    if random.random() < 0.5:
        raise ValueError("随机异常,测试重试")
    return "执行成功"

if __name__ == "__main__":
    unstable_task()

代码说明

  • retries 设置最大重试次数;
  • retry_delay 设置重试间隔;
  • 异常信息会完整记录,便于定位问题。

4.3 完整实战案例:机器学习训练与评估管道

以下案例实现数据加载 → 预处理 → 训练 → 评估 → 保存模型的完整流程。

from dbnd import task, pipeline
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import joblib

# 1. 加载数据
@task
def load_dataset() -> pd.DataFrame:
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
    columns = ["sepal_len", "sepal_wid", "petal_len", "petal_wid", "target"]
    df = pd.read_csv(url, names=columns)
    return df

# 2. 数据预处理
@task
def preprocess_data(df: pd.DataFrame) -> tuple:
    X = df.drop("target", axis=1)
    y = df["target"]
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    return X_scaled, y, scaler

# 3. 训练模型
@task
def train_model(X, y) -> RandomForestClassifier:
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X, y)
    return model

# 4. 模型评估
@task
def evaluate_model(model: RandomForestClassifier, X, y):
    y_pred = model.predict(X)
    report = classification_report(y, y_pred)
    print(report)
    return report

# 5. 保存模型
@task
def save_model(model, scaler):
    joblib.dump(model, "models/iris_model.pkl")
    joblib.dump(scaler, "models/scaler.pkl")
    return "模型保存完成"

# 完整管道
@pipeline
def ml_train_pipeline():
    df = load_dataset()
    X, y, scaler = preprocess_data(df)
    model = train_model(X, y)
    evaluate_model(model, X, y)
    save_model(model, scaler)

if __name__ == "__main__":
    ml_train_pipeline()

代码说明

  • 覆盖机器学习工程化全流程;
  • 每一步均可独立追踪、复现、调试;
  • 适合团队协作与实验管理。

五、命令行运行与项目管理

5.1 命令行执行任务

# 运行单个任务
dbnd run module_name::task_name --param value

# 运行管道
dbnd run pipeline.py::data_process_pipeline

# 查看历史运行
dbnd runs list

# 查看任务详情
dbnd runs show <run_id>

5.2 多环境运行

# 本地运行
dbnd run train.py::ml_train_pipeline

# 远程运行(需配置远程环境)
dbnd run --env remote train.py::ml_train_pipeline

相关资源

  • Pypi地址:https://pypi.org/project/dbnd/
  • Github地址:https://github.com/databand-ai/dbnd
  • 官方文档地址:https://dbnd.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。