Python 任务队列神器:TaskTiger 从入门到实战,轻松搞定异步任务

一、TaskTiger 库概述

TaskTiger 是基于 Redis 开发的 Python 分布式任务队列库,专注于异步任务处理、定时任务与重试机制,采用生产者-消费者模型,通过 Redis 存储任务与状态。优点为轻量易用、依赖少、支持任务重试/定时/优先级,缺点是功能不如 Celery 丰富,生态较小。基于 MIT License 开源,可自由商用与修改。

二、TaskTiger 安装与环境准备

在使用 TaskTiger 之前,必须先完成环境配置,它强依赖 Redis,所以我们要先安装 Redis 服务,再安装 TaskTiger 本身。

2.1 安装 Redis

TaskTiger 使用 Redis 作为消息代理和任务存储后端,因此需要先安装并启动 Redis:

  • Linux:可通过 apt、yum 等包管理器安装
  • macOS:使用 Homebrew 安装 brew install redis
  • Windows:可下载 Redis 安装包或使用 WSL 安装

安装完成后,启动 Redis 服务:

redis-server

默认情况下,Redis 会在本地的 6379 端口运行,这也是 TaskTiger 默认使用的地址。

2.2 安装 TaskTiger

使用 pip 即可直接安装最新版本的 TaskTiger,命令如下:

pip install tasktiger

安装完成后,我们可以在 Python 环境中导入 tasktiger 来验证是否安装成功:

import tasktiger
print(tasktiger.__version__)

如果能够正常输出版本号,就说明 TaskTiger 已经成功安装,可以开始使用。

2.3 基础连接配置

TaskTiger 默认连接本地的 Redis(localhost:6379),如果你的 Redis 有密码、端口不同,或者是远程 Redis 服务,可以在初始化时进行配置:

from tasktiger import TaskTiger
import redis

# 自定义 Redis 连接
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    password='your_redis_password',  # 有密码则填写
    db=0
)

# 初始化 TaskTiger
tiger = TaskTiger(connection=redis_client)

这是最基础的配置方式,绝大多数场景下,使用默认连接就足够使用。

三、TaskTiger 核心使用方式与基础示例

TaskTiger 的核心使用流程非常清晰:定义任务 → 提交任务 → 启动 worker 执行任务,我们从最简单的异步任务开始演示。

3.1 定义并执行第一个异步任务

我们先创建一个 Python 文件,命名为 tasks.py,在里面编写一个简单的任务函数,并用 TaskTiger 装饰器将其注册为任务。

tasks.py

from tasktiger import TaskTiger

# 初始化 TaskTiger 实例
tiger = TaskTiger()

# 使用装饰器注册任务
@tiger.task
def add(a, b):
    """简单的加法任务,模拟基础计算任务"""
    result = a + b
    print(f"任务执行:{a} + {b} = {result}")
    return result

上面代码中,@tiger.task 装饰器是核心,它会把普通函数变成可以被 TaskTiger 调度的异步任务。

接下来,我们再创建一个文件 producer.py,用来提交任务到队列
producer.py

from tasks import add

# 异步提交任务,不会阻塞当前程序
add.delay(10, 20)
add.delay(30, 40)

print("任务已提交到队列,等待 worker 执行")

这里的 delay() 方法是 TaskTiger 任务对象的核心方法,作用是将函数调用封装成任务,发送到 Redis 队列中,而不是立即执行。

现在我们有了任务定义和任务提交代码,还需要启动 worker 进程来消费并执行队列中的任务。
打开命令行,进入项目目录,执行以下命令:

tasktiger -w tasks

参数 -w 后面跟着的是任务所在的模块名,也就是我们的 tasks.py,去掉 .py 后缀。

执行后,worker 会持续运行,监听 Redis 队列,一旦有新任务就会立即执行,你会在控制台看到:

任务执行:10 + 20 = 30
任务执行:30 + 40 = 70

这就是 TaskTiger 最基础的异步任务流程,适用于发送邮件、生成日志、简单计算等不需要立即返回结果的场景。

3.2 带参数与关键字参数的任务

TaskTiger 支持位置参数、关键字参数,使用方式和普通函数完全一致,非常贴合 Python 原生语法。

我们在 tasks.py 中新增一个任务:

@tiger.task
def send_message(user_id, content, priority="normal"):
    """模拟发送消息任务"""
    print(f"向用户 {user_id} 发送消息:{content},优先级:{priority}")

producer.py 中提交任务:

from tasks import send_message

# 位置参数
send_message.delay(1001, "你好,欢迎使用 TaskTiger")
# 关键字参数
send_message.delay(1002, "异步任务执行成功", priority="high")

启动 worker 后,任务会按照提交顺序依次执行,参数传递完全兼容普通函数的用法,对开发者非常友好,不需要额外学习复杂的参数规则。

四、TaskTiger 高级功能使用

TaskTiger 不止能执行基础异步任务,还提供了任务重试、定时任务、任务唯一、任务优先级、异常处理等高级功能,满足生产环境需求。

4.1 任务失败自动重试

在实际项目中,任务可能因为网络波动、第三方接口异常而执行失败,TaskTiger 支持自动重试机制,可以指定重试次数、重试间隔。

我们在 tasks.py 中添加一个会随机失败的任务,模拟接口调用失败:

import random

@tiger.task(retry=3, retry_delay=5)
def request_api():
    """模拟请求第三方API,随机失败"""
    if random.choice([True, False]):
        raise Exception("API 请求失败")
    print("API 请求成功")

参数说明:

  • retry=3:任务失败后最多重试 3 次
  • retry_delay=5:每次重试间隔 5 秒

当任务抛出异常时,TaskTiger 会自动将任务重新放入队列,等待指定时间后再次执行,直到执行成功或达到最大重试次数。

4.2 定时任务与延时任务

很多场景需要任务在指定时间后执行,或者每天、每小时定时执行,TaskTiger 提供了便捷的延时执行方法。

4.2.1 固定延时执行任务

在提交任务时使用 execute_after() 方法,可以指定任务在多少秒后执行:

from tasks import add
from datetime import timedelta

# 10 秒后执行
add.execute_after(timedelta(seconds=10), 5, 8)

# 也可以直接传入秒数
add.execute_after(10, 5, 8)

这个功能非常适合用于订单超时未支付取消、延时发送通知等场景。

4.2.2 指定时间点执行任务

除了延时执行,还可以指定具体的时间点执行任务:

from datetime import datetime

# 在 2026-01-01 00:00:00 执行
run_time = datetime(2026, 1, 1, 0, 0, 0)
add.execute_at(run_time, 1, 1)

这种方式适合节日定时推送、定时报表生成等场景。

4.3 唯一任务:避免重复提交

在高并发场景下,可能会出现重复提交相同任务的情况,比如重复发送短信、重复创建订单,TaskTiger 可以通过 unique 参数设置任务唯一。

@tiger.task(unique=True)
def send_sms(phone, code):
    """给同一个手机号发送验证码,防止重复发送"""
    print(f"向 {phone} 发送验证码:{code}")

unique=True 时,TaskTiger 会根据函数名和参数生成唯一标识,在任务执行完成前,相同参数的任务不会重复加入队列,从根源上避免重复执行。

4.4 任务优先级控制

当队列中有大量任务时,我们希望重要任务优先执行,比如支付、通知任务,TaskTiger 支持设置任务优先级。

# 定义高优先级任务
@tiger.task(priority=10)
def high_priority_task():
    print("高优先级任务执行")

# 定义低优先级任务
@tiger.task(priority=1)
def low_priority_task():
    print("低优先级任务执行")

数字越大,优先级越高,worker 会优先从高优先级队列中取任务执行。

4.5 任务状态与结果查看

TaskTiger 会在 Redis 中记录任务的状态,包括等待、执行中、成功、失败、重试等,我们可以在代码中查看任务状态。

from tasks import add

# 提交任务并获取任务对象
task = add.delay(10, 20)

# 查看任务 ID
print("任务 ID:", task.id)
# 查看任务状态
print("任务状态:", task.status)
# 查看任务结果(执行完成后才有)
print("任务结果:", task.result)

常见任务状态:

  • queued:已进入队列,等待执行
  • active:正在执行
  • done:执行成功
  • failed:执行失败
  • retrying:等待重试

五、生产环境常用配置与多 worker 部署

在实际生产项目中,单 worker 无法满足高并发需求,TaskTiger 支持启动多个 worker 进程,同时监听队列,提升任务处理效率。

5.1 启动多个 worker

命令行中可以使用 -w 参数指定 worker 数量,例如启动 4 个 worker:

tasktiger -w tasks -n 4

参数 -n 4 表示启动 4 个 worker 进程,并行处理任务,充分利用服务器 CPU 资源。

5.2 指定队列执行

TaskTiger 支持多队列隔离,不同类型的任务放入不同队列,避免任务互相干扰。

定义任务时指定队列:

@tiger.task(queue="email")
def send_email(email):
    print(f"发送邮件到 {email}")

@tiger.task(queue="sms")
def send_sms(phone):
    print(f"发送短信到 {phone}")

启动 worker 时只监听指定队列:

# 只处理 email 队列任务
tasktiger -w tasks -q email

# 只处理 sms 队列任务
tasktiger -w tasks -q sms

这种方式适合大型项目,按业务模块拆分任务队列,方便维护和扩容。

5.3 后台运行 worker

生产环境中,我们需要让 worker 在后台持续运行,可以使用 nohupsupervisor 托管进程。

使用 nohup 后台启动:

nohup tasktiger -w tasks -n 4 > tasktiger.log 2>&1 &

日志会输出到 tasktiger.log 文件,方便排查问题。

六、实际项目综合案例:用户注册异步流程

我们模拟一个完整的用户注册业务流程,用户注册成功后,异步发送欢迎邮件、记录日志、统计注册数量,使用 TaskTiger 实现全流程解耦。

项目结构

project/
├── tasks.py        # 任务定义
├── user_service.py # 用户注册逻辑
└── run_worker.sh   # worker 启动脚本

6.1 编写任务文件 tasks.py

from tasktiger import TaskTiger
import time

tiger = TaskTiger()

# 发送欢迎邮件
@tiger.task(queue="email", retry=2, retry_delay=3)
def send_welcome_email(email):
    time.sleep(1)  # 模拟发送邮件耗时
    print(f"【邮件】已向 {email} 发送欢迎邮件")

# 记录用户注册日志
@tiger.task(queue="log")
def write_register_log(user_id, username):
    log_info = f"用户 {user_id} - {username} 注册成功"
    print(f"【日志】{log_info}")
    # 实际项目可写入数据库或文件

# 统计注册人数
@tiger.task(queue="stat")
def update_register_count():
    print("【统计】更新系统注册人数 +1")

6.2 编写用户注册服务 user_service.py

from tasks import send_welcome_email, write_register_log, update_register_count

def register_user(username, email):
    """模拟用户注册"""
    # 1. 模拟数据库保存用户信息
    user_id = 10001
    print(f"用户 {username} 注册成功,用户ID:{user_id}")

    # 2. 提交异步任务,不阻塞注册流程
    send_welcome_email.delay(email)
    write_register_log.delay(user_id, username)
    update_register_count.delay()

    return {"user_id": user_id, "username": username}

# 模拟用户注册
if __name__ == "__main__":
    result = register_user("test_user", "[email protected]")
    print("注册接口返回结果:", result)

6.3 启动 worker 并执行

  1. 启动 TaskTiger worker:
tasktiger -w tasks -n 3
  1. 运行用户注册脚本:
python user_service.py

控制台输出:

用户 test_user 注册成功,用户ID:10001
注册接口返回结果: {'user_id': 10001, 'username': 'test_user'}
【邮件】已向 [email protected] 发送欢迎邮件
【日志】用户 10001 - test_user 注册成功
【统计】更新系统注册人数 +1

可以看到,用户注册接口快速返回结果,后续的邮件、日志、统计操作都由 TaskTiger 异步执行,极大提升了接口响应速度。

七、相关资源

  • Pypi 地址:https://pypi.org/project/tasktiger/
  • Github 地址:https://github.com/closeio/tasktiger
  • 官方文档地址:https://tasktiger.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实时流处理利器:streamparse从入门到实战教程

一、streamparse库概述

streamparse是一款专为Python开发者设计的实时流数据处理库,它基于Apache Storm分布式实时计算引擎,让开发者无需深入掌握Java底层逻辑,就能快速构建、部署、运行高吞吐、低延迟的实时流处理任务。其核心原理是通过Python与Storm的通信接口,实现数据的实时接收、清洗、计算与输出,将分布式流处理能力轻量化集成到Python生态中。该库采用Apache License 2.0开源协议,允许商业与非商业自由使用。优点是上手快、兼容Python生态、分布式扩展性强,缺点是依赖Java环境,轻量场景下略显冗余。

二、streamparse环境安装与基础配置

2.1 前置环境准备

streamparse的运行依赖两大核心环境,必须提前完成配置,否则无法正常安装与启动:

  1. Java 8及以上版本:Apache Storm基于Java开发,是底层运行载体
  2. Python 3.6及以上版本:保证语法与库的兼容性

Java环境配置完成后,可通过终端命令验证:

java -version

出现Java版本信息即代表配置成功,若提示命令未找到,需重新配置Java环境变量。

2.2 streamparse库安装

直接通过Python官方包管理工具pip即可快速安装,命令简洁且自动处理依赖:

pip install streamparse

安装过程中若出现权限问题,可添加--user参数:

pip install streamparse --user

安装完成后,验证安装是否成功:

sparse --version

输出版本信息则说明安装无误,sparse是streamparse的核心命令行工具,后续项目创建、任务部署均依赖该指令。

2.3 初始化streamparse项目

streamparse采用标准化项目结构,通过命令快速创建项目,避免手动配置目录的繁琐:

sparse quickstart stream_demo

执行后会自动生成名为stream_demo的项目文件夹,进入项目目录:

cd stream_demo

此时项目已具备完整的运行结构,无需额外修改基础配置,可直接开发业务逻辑。

三、streamparse核心组件与工作流程

3.1 核心组件介绍

streamparse的流处理逻辑围绕三大核心组件展开,是理解其工作原理的关键:

  1. Spout:数据流的源头,负责从外部系统(如消息队列、日志文件、API接口)实时读取数据,是整个流处理任务的输入入口,一个任务可包含一个或多个Spout。
  2. Bolt:数据处理单元,承担数据清洗、转换、计算、聚合、过滤等核心业务逻辑,Bolt之间可串联或并联,形成完整的处理链路。
  3. Topology:流处理任务的整体拓扑结构,定义Spout与Bolt的组合关系、数据流向、并行度,是任务部署和运行的核心配置文件。

3.2 数据流转流程

数据在streamparse中的流转遵循固定逻辑,确保实时性与稳定性:

  1. Spout持续采集外部数据,将数据封装为元组(Tuple)发送至数据流
  2. 数据按照Topology定义的路径,传输至对应的Bolt进行处理
  3. Bolt处理完成后,可将结果继续传递给下一级Bolt,或直接输出至外部存储
  4. 整个过程分布式并行执行,支持水平扩展,保证高并发场景下的处理效率

四、streamparse基础代码实例演示

4.1 自定义Spout数据源开发

Spout是数据入口,我们以模拟实时生成数字流为例,编写最简单的基础Spout,让初学者快速理解数据生成逻辑。

打开项目目录下spouts文件夹中的random_spout.py文件,编写如下代码:

from streamparse import Spout
import time
import random

class RandomNumberSpout(Spout):
    # 定义输出字段,下游Bolt可通过该字段接收数据
    outputs = ['number']

    def initialize(self, storm_conf, context):
        """
        初始化方法,任务启动时执行一次
        可用于连接数据库、消息队列等初始化操作
        """
        self.logger.info("随机数Spout初始化完成,开始生成数据")

    def next_tuple(self):
        """
        核心方法,持续循环执行,生成实时数据
        模拟每秒生成一个0-100的随机整数
        """
        random_num = random.randint(0, 100)
        # 将数据发送至下游
        self.emit([random_num])
        # 控制数据生成频率,每秒1条
        time.sleep(1)

代码说明

  • 继承Spout基类,实现自定义数据源
  • outputs定义输出字段名,必须与下游Bolt接收字段对应
  • initialize为初始化方法,适合做一次性配置
  • next_tuple是核心循环方法,持续生成并发送数据

4.2 自定义Bolt数据处理逻辑开发

Bolt负责处理Spout发送的数据,我们先编写一个基础Bolt,实现接收随机数并打印日志的功能,后续再扩展复杂计算逻辑。

bolts文件夹中创建log_bolt.py文件,代码如下:

from streamparse import Bolt

class LogPrintBolt(Bolt):
    # 定义输入字段,与上游Spout的outputs对应
    inputs = ['number']

    def process(self, tup):
        """
        核心处理方法,每接收到一条数据执行一次
        tup:上游发送的数据元组
        """
        # 从元组中提取数据
        random_num = tup.values[0]
        # 打印处理日志
        self.logger.info(f"接收到随机数:{random_num}")

代码说明

  • 继承Bolt基类,inputs指定接收的字段名
  • process方法是数据处理核心,每条数据都会触发该方法
  • 通过tup.values获取上游数据,索引对应发送时的顺序

4.3 Topology拓扑配置

Topology是连接Spout和Bolt的核心,定义数据流向,在项目根目录的topologies文件夹中创建demo_topology.py文件:

from streamparse import Topology
from spouts.random_spout import RandomNumberSpout
from bolts.log_bolt import LogPrintBolt

class DemoTopology(Topology):
    # 配置Spout,设置并行度为1
    random_spout = RandomNumberSpout.spec(parallelism=1)
    # 配置Bolt,接收Spout的数据,并行度为1
    log_bolt = LogPrintBolt.spec(
        inputs=[random_spout],
        parallelism=1
    )

代码说明

  • 继承Topology基类,整合所有组件
  • spec方法配置组件并行度,数值越大处理能力越强
  • inputs指定Bolt的数据源,实现组件间的关联

4.4 本地运行流处理任务

streamparse支持本地模式运行,无需部署到Storm集群,适合开发调试,执行命令:

sparse run

运行后终端会持续输出日志,显示每秒接收的随机数,证明基础流处理任务运行成功:

INFO:root:随机数Spout初始化完成,开始生成数据
INFO:root:接收到随机数:45
INFO:root:接收到随机数:78
INFO:root:接收到随机数:12

Ctrl + C可停止任务。

五、streamparse进阶实战案例

5.1 实战需求说明

基础案例仅实现数据打印,本次进阶案例实现实时数字统计功能:Spout生成随机数,第一个Bolt判断数字奇偶,第二个Bolt实时统计奇数和偶数的数量,实现流数据聚合计算。

5.2 奇偶判断Bolt开发

bolts文件夹中创建judge_bolt.py,实现奇偶判断并发送结果:

from streamparse import Bolt

class JudgeBolt(Bolt):
    outputs = ['number', 'type']

    def process(self, tup):
        random_num = tup.values[0]
        # 判断奇偶
        num_type = "奇数" if random_num % 2 != 0 else "偶数"
        # 发送原始数字和类型到下游
        self.emit([random_num, num_type])
        self.logger.info(f"数字{random_num} 是{num_type}")

代码说明

  • 新增type输出字段,传递奇偶类型
  • 处理后的数据继续发送,供下游聚合统计使用

5.3 实时统计Bolt开发

bolts文件夹中创建count_bolt.py,实现实时数量统计:

from streamparse import Bolt

class CountBolt(Bolt):
    inputs = ['number', 'type']

    def initialize(self, storm_conf, context):
        # 初始化计数器
        self.odd_count = 0
        self.even_count = 0

    def process(self, tup):
        num_type = tup.values[1]
        # 更新对应计数器
        if num_type == "奇数":
            self.odd_count += 1
        else:
            self.even_count += 1
        # 实时输出统计结果
        self.logger.info(f"实时统计:奇数总数={self.odd_count},偶数总数={self.even_count}")

代码说明

  • initialize中初始化计数器,任务启动时执行一次
  • 每次接收数据后更新对应类型的计数
  • 实时打印统计结果,实现流数据的动态聚合

5.4 进阶版Topology配置

修改demo_topology.py,串联三个组件:

from streamparse import Topology
from spouts.random_spout import RandomNumberSpout
from bolts.judge_bolt import JudgeBolt
from bolts.count_bolt import CountBolt

class DemoTopology(Topology):
    random_spout = RandomNumberSpout.spec(parallelism=1)
    judge_bolt = JudgeBolt.spec(inputs=[random_spout], parallelism=1)
    count_bolt = CountBolt.spec(inputs=[judge_bolt], parallelism=1)

5.5 进阶任务运行

再次执行本地运行命令:

sparse run

终端会输出完整的处理流程,包含原始数据、类型判断、实时统计:

INFO:root:数字45 是奇数
INFO:root:实时统计:奇数总数=1,偶数总数=0
INFO:root:数字78 是偶数
INFO:root:实时统计:奇数总数=1,偶数总数=1

该案例完整模拟了企业级实时数据处理场景,可直接迁移到日志分析、监控统计等业务中。

六、streamparse集群部署基础

6.1 集群部署核心命令

本地调试完成后,可部署到Apache Storm集群,实现分布式高可用运行:

sparse submit

该命令会自动打包项目,上传至Storm集群,按照Topology配置分布式运行,支持多节点并行处理,满足高并发生产环境需求。

6.2 集群任务管理

  • 查看集群中运行的任务:sparse list
  • 停止指定任务:sparse kill 任务名称
  • 查看任务日志:sparse log 任务名称

七、相关资源

  • Pypi地址:https://pypi.org/project/streamparse
  • Github地址:https://github.com/Parsely/streamparse
  • 官方文档地址:https://streamparse.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具库:PyFunctional 链式数据处理完全教程

一、PyFunctional 库概述

PyFunctional 是一款专注于函数式编程的 Python 库,核心用于简化序列、迭代器、字典、集合等数据的链式处理,无需编写复杂循环与嵌套函数。其基于函数式编程思想,将 map、filter、reduce、flatmap 等操作封装为链式调用,底层通过惰性计算优化性能,支持流式数据处理。该库采用 MIT 许可证,优点是语法简洁、可读性强、上手门槛低,适合数据清洗与转换;缺点是处理超大规模数据时性能不及原生 Pandas,更适合轻量数据场景。

二、PyFunctional 安装方法

PyFunctional 可通过 pip 快速安装,兼容 Python 3.6 及以上版本,安装命令如下:

pip install pyfunctional

安装完成后,在 Python 脚本或交互式环境中导入核心模块即可使用:

from functional import seq

seq 是 PyFunctional 最核心的入口类,所有数据处理操作都基于该对象展开,也是后续所有代码示例的基础。

三、PyFunctional 基础使用与核心操作

3.1 创建序列对象

PyFunctional 支持将列表、元组、集合、生成器、字典等可迭代对象转换为 seq 对象,从而使用链式函数式接口:

# 从列表创建
data_list = seq([1, 2, 3, 4, 5])

# 从元组创建
data_tuple = seq((6, 7, 8, 9, 10))

# 从集合创建
data_set = seq({11, 12, 13, 14, 15})

# 从字符串创建(按字符拆分)
data_str = seq("python")

# 从字典创建(默认处理键值对元组)
data_dict = seq({"name": "pyfunc", "version": "1.0"})

转换为 seq 对象后,无需手动编写循环,可直接链式调用处理方法。

3.2 map 映射操作

map 用于对序列中每个元素执行指定函数,返回处理后的新序列,是最常用的数据转换操作:

# 对每个数字平方
numbers = seq([1, 2, 3, 4, 5])
result = numbers.map(lambda x: x ** 2)
# 转换为列表查看结果
print(list(result))

代码说明:通过 lambda 表达式定义平方逻辑,map 遍历所有元素并执行计算,最终输出 [1, 4, 9, 16, 25]

也可使用自定义函数替代 lambda,适合复杂逻辑:

def process_text(s):
    return s.strip().upper()

words = seq(["  hello ", " python ", " functional "])
processed = words.map(process_text)
print(list(processed))

代码说明:定义文本清洗函数,去除空格并转为大写,输出 ['HELLO', 'PYTHON', 'FUNCTIONAL']

3.3 filter 过滤操作

filter 用于根据条件筛选元素,保留返回 True 的元素,丢弃返回 False 的元素:

# 筛选偶数
numbers = seq([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
even_numbers = numbers.filter(lambda x: x % 2 == 0)
print(list(even_numbers))

代码说明:判断元素是否能被 2 整除,筛选出所有偶数,输出 [2, 4, 6, 8, 10]

# 筛选长度大于3的字符串
words = seq(["cat", "dog", "elephant", "tiger", "lion"])
long_words = words.filter(lambda x: len(x) > 3)
print(list(long_words))

代码说明:根据字符串长度筛选,保留长度大于3的单词,输出 ['elephant', 'tiger', 'lion']

3.4 flatmap 展平映射

flatmap 先执行 map 映射,再自动将嵌套序列展平为一维序列,适合处理嵌套数据:

# 拆分句子为单词
sentences = seq(["hello python", "functional programming", "data process"])
words = sentences.flatmap(lambda x: x.split())
print(list(words))

代码说明:先按空格拆分每个句子为单词列表,再自动展平,最终得到一维单词序列 ['hello', 'python', 'functional', 'programming', 'data', 'process']

3.5 reduce 聚合操作

reduce 用于将序列聚合为单个值,如求和、求积、拼接字符串等:

# 数字求和
numbers = seq([1, 2, 3, 4, 5])
total = numbers.reduce(lambda a, b: a + b)
print(total)

代码说明:依次累加所有元素,输出结果 15

# 字符串拼接
words = seq(["Py", "Functional", "is", "useful"])
sentence = words.reduce(lambda a, b: a + " " + b)
print(sentence)

代码说明:将所有字符串用空格拼接,输出 Py Functional is useful

3.6 链式组合操作

PyFunctional 最大优势是支持无限链式调用,可将 map、filter、flatmap、reduce 等操作组合,一行代码完成复杂数据处理:

# 筛选偶数 → 平方 → 求和
result = seq([1,2,3,4,5,6,7,8,9,10])\
    .filter(lambda x: x%2==0)\
    .map(lambda x: x**2)\
    .reduce(lambda a,b: a+b)

print(result)

代码说明:先筛选偶数 [2,4,6,8,10],再平方得到 [4,16,36,64,100],最后求和得到 220

3.7 去重、排序、切片

PyFunctional 内置常用序列操作,无需依赖原生复杂语法:

# 去重
data = seq([1,2,2,3,3,3,4,5])
unique_data = data.distinct()
print(list(unique_data))  # [1,2,3,4,5]

# 排序
sorted_data = data.sorted(reverse=True)
print(list(sorted_data))  # [5,4,3,3,3,2,2,1]

# 切片(前3个元素)
slice_data = data.take(3)
print(list(slice_data))  # [1,2,2]

# 跳过前2个元素
skip_data = data.drop(2)
print(list(skip_data))  # [2,3,3,3,4,5]

3.8 字典与键值对处理

PyFunctional 可便捷处理字典数据,支持按键、值筛选与转换:

# 处理字典数据
users = seq([
    {"name": "张三", "age": 20, "score": 85},
    {"name": "李四", "age": 22, "score": 92},
    {"name": "王五", "age": 19, "score": 78}
])

# 筛选分数大于80的用户,只保留姓名和年龄
high_score_users = users\
    .filter(lambda u: u["score"] > 80)\
    .map(lambda u: {"name": u["name"], "age": u["age"]})

print(list(high_score_users))

代码说明:先筛选分数大于80的用户,再提取指定字段,输出结果:

[{'name': '张三', 'age': 20}, {'name': '李四', 'age': 22}]

四、PyFunctional 高级功能

4.1 惰性计算特性

PyFunctional 默认使用惰性计算,即链式操作不会立即执行,只有在转换为列表、元组、求和等最终操作时才会真正计算,大幅节省内存:

# 定义超长序列,惰性计算不会占用大量内存
large_data = seq(range(1000000))\
    .map(lambda x: x*2)\
    .filter(lambda x: x%5==0)

# 仅取前5个,无需计算全部数据
print(list(large_data.take(5)))

代码说明:即使序列包含100万个元素,也不会一次性加载到内存,适合处理流式数据。

4.2 分组操作 groupby

groupby 可按指定条件对序列分组,返回键值对形式的分组结果:

# 按奇偶分组
numbers = seq([1,2,3,4,5,6,7,8,9,10])
grouped = numbers.groupby(lambda x: "even" if x%2==0 else "odd")

for key, values in grouped.items():
    print(key, list(values))

代码说明:将数字分为奇数、偶数两组,输出:

odd [1,3,5,7,9]
even [2,4,6,8,10]

4.3 统计功能

PyFunctional 内置求和、最大值、最小值、平均值、计数等统计方法:

data = seq([10, 20, 30, 40, 50])

print("总和:", data.sum())          # 150
print("最大值:", data.max())        # 50
print("最小值:", data.min())        # 10
print("平均值:", data.avg())        # 30.0
print("元素数量:", data.count())     # 5

4.4 条件判断 any 与 all

any 判断是否存在满足条件的元素,all 判断是否所有元素都满足条件:

numbers = seq([1,2,3,4,5])

# 是否存在大于3的元素
print(numbers.any(lambda x: x>3))   # True

# 是否所有元素都小于10
print(numbers.all(lambda x: x<10))  # True

五、真实业务场景实战案例

5.1 学生成绩数据清洗与统计

模拟学生成绩数据,完成数据清洗、筛选、分组、统计等完整流程:

from functional import seq

# 原始数据(含空值、异常分数)
students = seq([
    {"name": "小明", "score": 88, "class": "一班"},
    {"name": "小红", "score": 95, "class": "一班"},
    {"name": "小刚", "score": 59, "class": "二班"},
    {"name": "小丽", "score": 72, "class": "二班"},
    {"name": "小亮", "score": None, "class": "一班"},  # 空分
    {"name": "小芳", "score": 105, "class": "二班"}     # 异常分数
])

# 数据清洗:去除空分、分数0-100之外的异常数据
clean_students = students\
    .filter(lambda s: s["score"] is not None)\
    .filter(lambda s: 0 <= s["score"] <= 100)

# 按班级分组统计平均分
class_avg = clean_students\
    .groupby(lambda s: s["class"])\
    .map(lambda g: {
        "class": g[0],
        "avg_score": round(seq(g[1]).avg(lambda s: s["score"]), 2)
    })

print("各班平均分:")
for item in class_avg:
    print(item)

# 筛选90分以上优秀学生
excellent = clean_students\
    .filter(lambda s: s["score"] >= 90)\
    .map(lambda s: f"{s['name']}({s['class']}): {s['score']}分")

print("\n优秀学生:")
print(list(excellent))

代码说明

  1. 过滤空分数与超出0-100范围的异常数据;
  2. 按班级分组并计算平均分;
  3. 筛选90分以上学生并格式化输出。

5.2 日志文本分析

模拟日志数据,提取关键词、统计频次、筛选异常日志:

from functional import seq

# 模拟日志数据
logs = seq([
    "2025-01-01 10:00 INFO user login success",
    "2025-01-01 10:05 ERROR database connection failed",
    "2025-01-01 10:10 INFO user logout",
    "2025-01-01 10:15 ERROR request timeout",
    "2025-01-01 10:20 INFO user login success"
])

# 提取所有ERROR日志
error_logs = logs.filter(lambda l: "ERROR" in l)
print("错误日志:")
print(list(error_logs))

# 统计日志级别频次
level_count = logs\
    .map(lambda l: l.split()[2])\
    .count_by_value()

print("\n日志级别统计:", level_count)

代码说明

  1. 筛选包含 ERROR 的错误日志;
  2. 提取日志级别并统计出现次数。

六、相关资源

  • Pypi地址:https://pypi.org/project/PyFunctional
  • Github地址:https://github.com/EntilZha/PyFunctional
  • 官方文档地址:https://pyfunctional.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python 模型部署神器:mleap 从入门到实战教程

一、mleap 库概述

mleap 是一款专注于机器学习模型跨平台部署、序列化与执行的 Python 工具库,核心用于将 Spark MLlib、Scikit-learn、TensorFlow 等框架训练的模型导出为统一格式,实现脱离训练环境直接运行。其原理是把模型结构与参数封装为 MLeap Bundle 格式,提供轻量级运行时执行预测。优点是跨框架兼容、部署轻量化、无依赖迁移,缺点是对小众模型支持有限,主要面向工业级标准化部署。该库采用 Apache 2.0 开源许可,可商用、修改与分发。

二、mleap 安装与环境配置

2.1 基础安装方式

在使用 mleap 之前,需要先通过 pip 完成安装,命令如下:

pip install mleap

如果需要同时支持 scikit-learn 与 Spark 模型导出,可安装完整依赖:

pip install mleap[all]

2.2 版本与依赖验证

安装完成后,可在 Python 环境中验证是否安装成功:

import mleap

# 查看 mleap 版本
print("mleap 版本:", mleap.__version__)

这段代码的作用是导入 mleap 库并打印当前版本,确认库已成功加载,避免后续代码因安装问题报错。

2.3 配套依赖安装

mleap 常与 scikit-learn、pandas、numpy 配合使用,推荐安装以下依赖:

pip install scikit-learn pandas numpy

这些库是机器学习模型训练的基础,也是 mleap 导出模型时必需的支撑库,缺少会导致模型序列化失败。

三、mleap 核心功能与工作流程

3.1 核心功能

  1. 模型序列化:将训练好的机器学习模型保存为 MLeap Bundle 格式,包含模型结构、特征转换逻辑、参数权重。
  2. 跨框架执行:支持 Scikit-learn、Spark MLlib、XGBoost 等主流框架模型统一部署。
  3. 无训练环境运行:导出后的模型可在无 Python 训练环境的服务中直接预测,降低部署成本。
  4. 特征管道一体化:不仅保存模型,还能将数据预处理、特征转换、模型预测整个 Pipeline 一起导出。

3.2 工作流程

  1. 使用机器学习框架完成模型与 Pipeline 训练;
  2. 调用 mleap 工具将 Pipeline 序列化为 MLeap Bundle;
  3. 在部署环境加载 Bundle,创建预测执行器;
  4. 传入新数据,直接获取预测结果,无需重新训练。

四、mleap 基础使用:Scikit-learn 模型导出与加载

4.1 构建基础机器学习 Pipeline

以经典的鸢尾花分类任务为例,先使用 Scikit-learn 构建包含数据预处理与模型的完整 Pipeline:

import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

# 加载数据集
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target

# 划分训练集与测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 构建 Pipeline:标准化 + 随机森林分类
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# 训练模型
pipeline.fit(X_train, y_train)

# 原生预测测试
print("原生模型预测结果:", pipeline.predict(X_test[:5]))

代码说明:

  • 加载鸢尾花数据集并转为 DataFrame 格式,适配 mleap 数据格式要求;
  • 构建包含特征标准化与随机森林的 Pipeline,保证预处理与预测一体化;
  • 训练后对前5条测试数据预测,验证模型正常工作。

4.2 使用 mleap 导出 Pipeline 模型

mleap 提供专门的工具导出 Scikit-learn Pipeline,代码如下:

from mleap import sklearn as mleap_sklearn

# 定义导出路径
bundle_path = "./iris_rf_bundle"

# 导出模型
mleap_sklearn.export_to_bundle(
    pipeline,
    input_features=iris.feature_names,
    output_path=bundle_path,
    overwrite=True
)
print("模型已成功导出至:", bundle_path)

代码说明:

  • export_to_bundle 是 mleap 导出 Scikit-learn 模型的核心方法;
  • input_features 指定输入特征名称,必须与训练数据列名一致;
  • overwrite=True 允许覆盖已存在的 Bundle 文件,方便调试。

4.3 加载 mleap 模型并预测

导出后的模型可脱离 Scikit-learn 训练环境,仅用 mleap 运行:

from mleap.runtime import Runtime
from mleap.runtime.serialization import load_bundle

# 加载模型
bundle = load_bundle(bundle_path)
runtime = Runtime(bundle)

# 准备预测数据(与训练特征顺序一致)
test_data = [
    [6.1, 2.8, 4.7, 1.2],
    [5.7, 3.8, 1.7, 0.3],
    [7.7, 2.6, 6.9, 2.3]
]

# 执行预测
predictions = runtime.predict(test_data)
print("mleap 模型预测结果:", predictions)

代码说明:

  • 使用 load_bundle 加载导出的模型文件,创建 Runtime 执行器;
  • 传入二维列表格式数据,直接调用 predict 方法获取结果;
  • 结果与原生 Scikit-learn 模型一致,证明部署有效。

五、mleap 进阶使用:自定义特征工程与批量部署

5.1 包含自定义转换的 Pipeline 导出

mleap 支持包含复杂特征处理的 Pipeline,示例如下:

from sklearn.preprocessing import PolynomialFeatures

# 构建带多项式特征的复杂 Pipeline
complex_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('poly', PolynomialFeatures(degree=2)),
    ('classifier', RandomForestClassifier())
])

complex_pipeline.fit(X_train, y_train)

# 导出复杂 Pipeline
complex_bundle = "./iris_complex_bundle"
mleap_sklearn.export_to_bundle(
    complex_pipeline,
    input_features=iris.feature_names,
    output_path=complex_bundle,
    overwrite=True
)

# 加载并预测
runtime_complex = Runtime(load_bundle(complex_bundle))
print("复杂模型 mleap 预测:", runtime_complex.predict(X_test[:3]))

代码说明:

  • 加入多项式特征转换,模拟真实业务中的复杂特征工程;
  • mleap 可完整保留所有转换步骤,部署后无需重复编写预处理代码;
  • 适合工业场景中预处理逻辑繁琐的分类、回归任务。

5.2 批量数据预测与结果格式化

在实际业务中,通常需要批量预测并返回结构化结果:

# 构造批量测试数据
batch_data = X_test.values.tolist()

# 批量预测
batch_pred = runtime.predict(batch_data)

# 转为 DataFrame 输出
result_df = pd.DataFrame({
    "sepal length": [x[0] for x in batch_data],
    "sepal width": [x[1] for x in batch_data],
    "petal length": [x[2] for x in batch_data],
    "petal width": [x[3] for x in batch_data],
    "predict_class": batch_pred
})

print("批量预测结果:")
print(result_df.head(10))

代码说明:

  • 将测试集全部转为列表格式,进行批量预测;
  • 把原始特征与预测结果组合为 DataFrame,方便存入数据库或返回接口。

六、mleap 与 Spark MLlib 模型适配(扩展场景)

mleap 最初为 Spark 模型设计,对 Spark MLlib 支持极佳,示例代码:

# 需安装 PySpark
# pip install pyspark

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from mleap.pyspark import export_to_bundle

# 创建 Spark 会话
spark = SparkSession.builder.appName("mleap_demo").getOrCreate()

# 构造 Spark DataFrame
data = load_iris()
df = spark.createDataFrame(
    pd.DataFrame(np.column_stack((data.data, data.target)),
                 columns=data.feature_names + ["label"])
)

# 特征向量化
assembler = VectorAssembler(inputCols=data.feature_names, outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(df)

# 导出 Spark 模型
spark_bundle = "./spark_lr_bundle"
export_to_bundle(
    model,
    input_cols=data.feature_names,
    output_path=spark_bundle,
    overwrite=True
)

# 加载预测
spark_runtime = Runtime(load_bundle(spark_bundle))
print("Spark 模型 mleap 预测:", spark_runtime.predict([[5.1, 3.5, 1.4, 0.2]]))

代码说明:

  • 展示 mleap 对 Spark MLlib 模型的完整支持;
  • 可将大数据训练的模型轻量化部署,脱离 Spark 集群运行;
  • 适合大数据团队模型上线场景。

七、实际业务案例:标准化模型部署流程

7.1 案例背景

某数据团队需要将训练好的房价回归模型部署为 API 服务,要求:

  • 脱离训练环境运行;
  • 支持实时数据预测;
  • 预处理与模型一体化。

7.2 案例完整代码

from sklearn.datasets import fetch_california_housing
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
import mleap.sklearn as mleap_sklearn
from mleap.runtime import Runtime
from mleap.runtime.serialization import load_bundle
import pandas as pd

# 1. 加载并划分数据
housing = fetch_california_housing()
X_house = pd.DataFrame(housing.data, columns=housing.feature_names)
y_house = housing.target
X_train_h, X_test_h, y_train_h, y_test_h = train_test_split(
    X_house, y_house, test_size=0.2, random_state=42
)

# 2. 构建回归 Pipeline
house_pipeline = Pipeline([
    ('scaler', MinMaxScaler()),
    ('regressor', LinearRegression())
])
house_pipeline.fit(X_train_h, y_train_h)

# 3. 导出模型
house_bundle = "./house_price_bundle"
mleap_sklearn.export_to_bundle(
    house_pipeline,
    input_features=housing.feature_names,
    output_path=house_bundle,
    overwrite=True
)

# 4. 部署环境加载模型
house_runtime = Runtime(load_bundle(house_bundle))

# 5. 模拟线上实时预测
new_house = [[8.3252, 41.0, 6.9841, 1.0238, 322.0, 2.5556, 37.88, -122.23]]
pred_price = house_runtime.predict(new_house)
print(f"预测房价:{pred_price[0]:.2f} 万美元")

代码说明:

  • 以加州房价预测为真实业务场景,使用线性回归模型;
  • 包含归一化预处理,符合工业部署规范;
  • 导出后可直接嵌入 Flask、FastAPI 等服务框架。

7.3 部署服务化扩展

可将上述预测逻辑封装为 API 接口:

# 需安装 FastAPI
# pip install fastapi uvicorn

from fastapi import FastAPI
import numpy as np

app = FastAPI(title="mleap 房价预测 API")

# 启动时加载模型
house_runtime = Runtime(load_bundle("./house_price_bundle"))

@app.post("/predict_price")
def predict_price(features: list):
    result = house_runtime.predict([features])
    return {"price": float(np.round(result[0], 2))}

启动命令:

uvicorn main:app --host 0.0.0.0 --port 8000

访问地址:http://127.0.0.1:8000/docs

八、相关资源

  • Pypi地址:https://pypi.org/project/mleap
  • Github地址:https://github.com/combust/mleap
  • 官方文档地址:https://combust.github.io/mleap-docs/

关注我,每天分享一个实用的Python自动化工具。

Python 实用工具:Ploomber 从入门到实战,一站式数据流水线管理

一、Ploomber 库概述

Ploomber 是一款面向数据科学、机器学习与数据分析场景的 Python 流水线管理工具,专注于简化数据处理、模型训练、实验管理全流程,支持将 Jupyter Notebook、Python 脚本编排为可复用、可调度的流水线。其核心原理是通过声明式配置定义任务依赖关系,自动解析执行顺序,实现增量运行、缓存复用与环境隔离。该工具采用 Apache-2.0 开源许可,优势在于降低流水线编写成本、提升实验复现性、支持快速部署,缺点是对超复杂分布式调度支持较弱,更适合中小规模数据工程场景。

二、Ploomber 安装与基础配置

2.1 环境准备与安装命令

Ploomber 对 Python 环境兼容性良好,支持 Python 3.8 及以上版本,可直接通过 pip 完成安装,同时建议搭配虚拟环境使用,避免依赖冲突。

安装基础版本:

pip install ploomber

若需要使用 Notebook 转换、交互式调试、可视化等增强功能,可安装完整版本:

pip install ploomber[all]

安装完成后,可通过以下命令验证是否安装成功:

ploomber --version

出现版本号则代表安装正常,Ploomber 同时提供命令行工具与 Python API 两种使用方式,兼顾命令行爱好者与代码集成需求。

2.2 初始化第一个 Ploomber 项目

Ploomber 提供项目初始化模板,可快速生成标准目录结构,降低上手成本,适合数据项目规范化管理。

执行初始化命令:

ploomber new my_first_ploomber_project

执行完成后,会自动生成包含配置文件、任务脚本、输出目录的完整结构,无需手动创建复杂配置文件。

三、Ploomber 核心使用方式与基础代码示例

3.1 基于 Python 脚本构建简单流水线

Ploomber 最核心的能力是任务编排与依赖管理,用户只需要关注单个任务逻辑,工具自动处理执行顺序与缓存。

创建数据获取脚本 tasks/get_data.py

from pathlib import Path
import pandas as pd
import numpy as np

# 定义输出路径
out = Path('output/raw_data.csv')

# 生成模拟数据集
def get_data():
    dates = pd.date_range(start='20250101', periods=100)
    values = np.random.randn(100).cumsum()
    df = pd.DataFrame({'date': dates, 'value': values})
    return df

if __name__ == '__main__':
    df = get_data()
    out.parent.mkdir(exist_ok=True)
    df.to_csv(out, index=False)

该脚本用于生成模拟时序数据,输出为 CSV 文件,作为整个流水线的数据源。

创建数据处理脚本 tasks/process_data.py

from pathlib import Path
import pandas as pd

# 定义输入与输出路径
in_path = Path('output/raw_data.csv')
out = Path('output/processed_data.csv')

def process_data(df):
    # 计算移动平均值
    df['ma7'] = df['value'].rolling(window=7).mean()
    df.dropna(inplace=True)
    return df

if __name__ == '__main__':
    df = pd.read_csv(in_path)
    df_processed = process_data(df)
    df_processed.to_csv(out, index=False)

该脚本依赖上一步生成的原始数据,完成平滑处理与缺失值删除。

创建流水线配置文件 pipeline.yaml

tasks:
  - source: tasks/get_data.py
    product: output/raw_data.csv

  - source: tasks/process_data.py
    product: output/processed_data.csv
    depends_on: tasks/get_data.py

配置文件清晰声明任务来源、输出产物与依赖关系,Ploomber 会自动识别依赖并按顺序执行。

执行流水线:

ploomber build

首次执行会依次运行两个脚本,生成对应文件;再次执行时,Ploomber 会自动检测文件是否修改,未修改则直接使用缓存,大幅提升执行效率。

3.2 增量运行与缓存机制

Ploomber 会自动跟踪代码与数据变化,实现增量执行,这是其在数据实验中极具价值的特性。

修改 process_data.py 中的窗口大小:

df['ma7'] = df['value'].rolling(window=14).mean()

再次执行:

ploomber build

工具会自动跳过未修改的 get_data.py,只重新执行 process_data.py,节省重复计算时间,尤其适合大数据量场景。

3.3 基于 Jupyter Notebook 的流水线任务

数据科学家日常大量使用 Notebook,Ploomber 完美支持将 Notebook 作为任务,无需重构为脚本即可纳入流水线。

创建 Notebook 任务 tasks/visualize.ipynb

# %%
from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt

# %%
in_path = Path('output/processed_data.csv')
df = pd.read_csv(in_path)

# %%
plt.figure(figsize=(12, 6))
plt.plot(df['date'], df['value'], label='Original')
plt.plot(df['date'], df['ma7'], label='MA7', linewidth=2)
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('output/plot.png')

pipeline.yaml 中添加该任务:

tasks:
  - source: tasks/get_data.py
    product: output/raw_data.csv

  - source: tasks/process_data.py
    product: output/processed_data.csv
    depends_on: tasks/get_data.py

  - source: tasks/visualize.ipynb
    product: output/plot.png
    depends_on: tasks/process_data.py

执行后,Notebook 会自动运行并输出图片,同时可保留交互式分析过程,兼顾开发便捷性与流水线规范性。

四、Ploomber 进阶功能与实战案例

4.1 参数化流水线与多场景实验

Ploomber 支持通过参数文件动态调整任务配置,快速实现多组对照实验,适合机器学习调参、数据策略对比。

创建参数文件 params.yaml

window_size: 7
data_points: 100

修改 get_data.py 使用参数:

from pathlib import Path
import pandas as pd
import numpy as np
from ploomber import DAG

# 读取参数
dag = DAG.get_current()
params = dag.params

out = Path('output/raw_data.csv')

def get_data():
    dates = pd.date_range(start='20250101', periods=params['data_points'])
    values = np.random.randn(params['data_points']).cumsum()
    df = pd.DataFrame({'date': dates, 'value': values})
    return df

if __name__ == '__main__':
    df = get_data()
    out.parent.mkdir(exist_ok=True)
    df.to_csv(out, index=False)

修改 process_data.py 使用参数:

from pathlib import Path
import pandas as pd
from ploomber import DAG

dag = DAG.get_current()
params = dag.params

in_path = Path('output/raw_data.csv')
out = Path('output/processed_data.csv')

def process_data(df):
    df[f'ma{params["window_size"]}'] = df['value'].rolling(window=params["window_size"]).mean()
    df.dropna(inplace=True)
    return df

if __name__ == '__main__':
    df = pd.read_csv(in_path)
    df_processed = process_data(df)
    df_processed.to_csv(out, index=False)

更新 pipeline.yaml 绑定参数:

tasks:
  - source: tasks/get_data.py
    product: output/raw_data.csv

  - source: tasks/process_data.py
    product: output/processed_data.csv
    depends_on: tasks/get_data.py

  - source: tasks/visualize.ipynb
    product: output/plot.png
    depends_on: tasks/process_data.py

params: params.yaml

只需修改 params.yaml,即可快速运行不同参数组合,无需改动核心代码,极大提升实验效率。

4.2 流水线可视化与任务状态查看

Ploomber 内置可视化功能,可直观展示任务依赖关系与执行状态,方便复杂流水线调试。

执行可视化命令:

ploomber plot

执行后会生成流水线结构图,清晰展示任务之间的依赖关系、执行状态、产物路径,帮助快速定位执行问题。

同时可通过命令查看任务状态:

ploomber status

展示每个任务是否最新、上次执行时间、依赖是否变更等信息,实现流水线全生命周期管理。

4.3 导出与部署流水线

Ploomber 支持将流水线导出为可执行脚本、Airflow DAG、Kubeflow 流水线等格式,实现从开发到生产无缝迁移。

导出为独立可执行脚本:

ploomber export pipeline.py

导出为 Airflow 工作流:

ploomber export airflow --output airflow_dag.py

导出后的文件可直接部署到生产调度系统,无需手动重构代码,降低数据工程上线成本。

五、完整机器学习流水线实战案例

本案例构建从数据生成、清洗、特征工程、模型训练到结果评估的完整机器学习流水线,全面展示 Ploomber 实际应用价值。

创建任务 1:生成分类数据 tasks/generate_data.py

from pathlib import Path
import pandas as pd
from sklearn.datasets import make_classification
from ploomber import DAG

dag = DAG.get_current()
n_samples = dag.params.get('n_samples', 500)

out = Path('output/data.csv')

def generate_data():
    X, y = make_classification(n_samples=n_samples, n_features=10, random_state=42)
    df = pd.DataFrame(X, columns=[f'f{i}' for i in range(10)])
    df['target'] = y
    return df

if __name__ == '__main__':
    df = generate_data()
    out.parent.mkdir(exist_ok=True)
    df.to_csv(out, index=False)

任务 2:数据预处理 tasks/preprocess.py

from pathlib import Path
import pandas as pd
from sklearn.model_selection import train_test_split

in_path = Path('output/data.csv')
out_X_train = Path('output/X_train.csv')
out_X_test = Path('output/X_test.csv')
out_y_train = Path('output/y_train.csv')
out_y_test = Path('output/y_test.csv')

def preprocess(df):
    X = df.drop('target', axis=1)
    y = df['target']
    return train_test_split(X, y, test_size=0.2, random_state=42)

if __name__ == '__main__':
    df = pd.read_csv(in_path)
    X_train, X_test, y_train, y_test = preprocess(df)
    X_train.to_csv(out_X_train, index=False)
    X_test.to_csv(out_X_test, index=False)
    y_train.to_csv(out_y_train, index=False)
    y_test.to_csv(out_y_test, index=False)

任务 3:模型训练 tasks/train_model.py

from pathlib import Path
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib

X_train = Path('output/X_train.csv')
y_train = Path('output/y_train.csv')
model_out = Path('output/model.pkl')

def train():
    X = pd.read_csv(X_train)
    y = pd.read_csv(y_train).values.ravel()
    model = RandomForestClassifier(random_state=42)
    model.fit(X, y)
    return model

if __name__ == '__main__':
    model = train()
    joblib.dump(model, model_out)

任务 4:模型评估 tasks/evaluate.py

from pathlib import Path
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, classification_report

model_path = Path('output/model.pkl')
X_test = Path('output/X_test.csv')
y_test = Path('output/y_test.csv')
report_out = Path('output/report.txt')

def evaluate():
    model = joblib.load(model_path)
    X = pd.read_csv(X_test)
    y = pd.read_csv(y_test).values.ravel()
    y_pred = model.predict(X)
    acc = accuracy_score(y, y_pred)
    report = classification_report(y, y_pred)
    return acc, report

if __name__ == '__main__':
    acc, report = evaluate()
    with open(report_out, 'w') as f:
        f.write(f'Accuracy: {acc:.4f}\n\n{report}')

完整 pipeline.yaml

params: params.yaml

tasks:
  - source: tasks/generate_data.py
    product: output/data.csv

  - source: tasks/preprocess.py
    product:
      - output/X_train.csv
      - output/X_test.csv
      - output/y_train.csv
      - output/y_test.csv
    depends_on: tasks/generate_data.py

  - source: tasks/train_model.py
    product: output/model.pkl
    depends_on: tasks/preprocess.py

  - source: tasks/evaluate.py
    product: output/report.txt
    depends_on: tasks/train_model.py

params.yaml

n_samples: 500

执行流水线:

ploomber build

执行完成后,自动生成数据集、训练集测试集划分、模型文件与评估报告,任意环节修改后均可增量执行,大幅提升机器学习实验效率。

相关资源

  • Pypi地址:https://pypi.org/project/ploomber
  • Github地址:https://github.com/ploomber/ploomber
  • 官方文档地址:https://docs.ploomber.io/

关注我,每天分享一个实用的Python自动化工具。

Python轻量任务队列神器:huey 从入门到实战完全指南

一、huey 库核心介绍

huey 是一款专为 Python 打造的轻量级任务队列库,核心用于处理异步任务、定时任务、延时任务,无需依赖复杂中间件即可快速实现任务调度。其原理是通过生产者提交任务、消费者监听执行,支持 Redis、SQLite 等多种存储方式。优点是轻量简洁、上手零门槛、依赖少,适合中小型项目;缺点是不适合超大规模分布式集群。该库采用 MIT 开源许可,可自由商用与修改。

二、huey 安装与基础环境配置

2.1 安装 huey

huey 安装极为简便,直接通过 pip 命令即可完成安装,打开命令行执行以下指令:

pip install huey

若需要使用 Redis 作为存储后端(生产环境推荐),还需安装 Redis 依赖库:

pip install redis

若是开发测试环境,可直接使用 SQLite 存储,无需额外安装其他依赖,开箱即用。

2.2 初始化 huey 实例

使用 huey 的第一步是创建任务队列实例,这是所有任务的核心载体,我们可以根据需求选择不同的存储方式。

2.2.1 基于 SQLite 的本地实例(测试专用)

SQLite 是文件型数据库,无需启动服务,适合本地开发、测试场景,代码如下:

from huey import SqliteHuey

# 初始化 SQLite 存储的 huey 实例,tasks.db 为任务存储文件
huey = SqliteHuey(filename='tasks.db')

2.2.2 基于 Redis 的实例(生产环境推荐)

Redis 性能更高、支持并发,适合生产环境,配置代码:

from huey import RedisHuey

# 连接本地 Redis,默认端口6379,指定任务队列名称
huey = RedisHuey('my_task_queue', host='localhost', port=6379, db=0)

初始化实例后,即可通过装饰器定义任务,无需复杂配置,这也是 huey 轻量便捷的核心体现。

三、huey 基础任务使用详解

3.1 定义并执行异步任务

异步任务是 huey 最基础的功能,用于处理无需立即返回结果、耗时较长的操作,比如发送邮件、生成报表、爬取数据等。通过 @huey.task() 装饰器即可将普通函数转为异步任务。

代码示例:基础异步任务

from huey import SqliteHuey
import time

# 初始化 huey
huey = SqliteHuey(filename='tasks.db')

# 定义异步任务
@huey.task()
def send_message(username, content):
    """模拟发送消息的耗时任务"""
    time.sleep(2)  # 模拟任务执行耗时
    return f"用户 {username},消息:{content} 发送成功"

# 调用任务(非阻塞,立即返回)
task = send_message('张三', '你好,这是异步消息')
print("任务已提交,任务ID:", task.id)

代码说明

  1. 使用 @huey.task() 装饰普通函数,函数逻辑不变,仅变为可异步执行的任务;
  2. 直接调用函数不会立即执行,而是将任务存入队列,返回任务对象;
  3. time.sleep(2) 模拟耗时操作,主线程不会阻塞,可继续执行其他代码。

3.2 执行延时任务

延时任务用于指定延迟一段时间后再执行任务,比如订单超时未支付自动取消、延时发送通知等,使用 @huey.task(delay=秒数) 或调用时指定 delay 参数。

代码示例:延时任务

@huey.task(delay=5)  # 延迟5秒执行
def delay_task(task_name):
    """延时执行任务"""
    return f"延时任务 {task_name} 执行完成"

# 提交延时任务
delay_task('订单超时检查')

# 也可调用时动态指定延迟时间
delay_task('会员到期提醒').delay(10)  # 延迟10秒执行

代码说明

  1. delay 参数单位为秒,可固定装饰器中,也可调用时动态传入;
  2. 任务提交后会等待指定时间,再由消费者执行,无需手动计时。

3.3 定时周期任务

huey 支持定时周期任务,类似 Linux 的 Crontab,可实现每天、每小时、每分钟自动执行任务,比如每日数据统计、定时清理缓存等。

代码示例:周期任务

# 每分钟执行一次
@huey.periodic_task(crontab(minute='*'))
def minute_task():
    print("每分钟执行一次的任务")
    return "分钟任务执行完毕"

# 每天凌晨2点执行
@huey.periodic_task(crontab(hour=2, minute=0))
def daily_statistics():
    print("每日凌晨2点执行数据统计任务")
    return "每日统计完成"

# 每周一早上8点执行
@huey.periodic_task(crontab(day_of_week=1, hour=8, minute=0))
def weekly_remind():
    print("每周一早上8点发送周报提醒")
    return "周报提醒成功"

代码说明

  1. 使用 @huey.periodic_task 装饰器定义周期任务,配合 crontab 控制执行时间;
  2. crontab 支持分钟、小时、日期、月份、星期,语法与 Linux 定时任务一致;
  3. 周期任务会由消费者自动调度,无需手动提交。

3.4 获取任务执行结果

提交任务后,可通过任务对象获取执行状态与结果,支持判断任务是否完成、获取返回值、取消任务等操作。

代码示例:获取任务结果

# 提交任务
result_task = send_message('李四', '测试获取任务结果')

# 判断任务是否执行完成
print("任务是否完成:", result_task.complete())

# 等待任务执行完成并获取结果(阻塞等待)
print("任务结果:", result_task.get(block=True, timeout=10))

# 取消任务(任务未执行时可取消)
# result_task.revoke()

代码说明

  1. complete():返回布尔值,判断任务是否执行完毕;
  2. get(block=True):阻塞等待任务完成,timeout 设置最大等待时间;
  3. revoke():撤销未执行的任务,适合任务提交后无需执行的场景。

四、huey 消费者启动与任务执行

huey 采用生产者-消费者模式,提交任务的是生产者,专门执行任务的是消费者,必须启动消费者才能执行队列中的任务

4.1 启动消费者命令行

假设我们的任务代码保存在 task_app.py 文件中,启动消费者命令如下:

# 基础启动命令
huey_consumer.py task_app.huey -w 2

命令参数说明

  • task_app.huey:指定 huey 实例所在的模块与实例名;
  • -w 2:启动 2 个工作进程,并发执行任务,可根据服务器性能调整;
  • -l logs:将任务日志输出到 logs 目录,方便排查问题;
  • -d:后台运行消费者(Linux/Mac 系统)。

启动成功后,命令行会显示消费者监听状态,提交的任务会自动被消费者获取并执行。

4.2 消费者运行逻辑

消费者启动后会持续监听任务队列,流程如下:

  1. 轮询检测队列中是否有待执行任务;
  2. 发现任务后,分配给工作进程执行;
  3. 执行完成后记录任务状态与返回结果;
  4. 周期任务会按设定时间自动触发执行。

整个过程无需人工干预,部署简单,适合中小型项目快速落地。

五、huey 高级功能使用

5.1 任务优先级设置

huey 支持为任务设置优先级,高优先级任务会优先被执行,适合区分核心任务与普通任务。

代码示例:优先级任务

# 高优先级任务
@huey.task(priority=10)
def high_priority_task(order_id):
    time.sleep(1)
    return f"高优先级订单 {order_id} 处理完成"

# 低优先级任务
@huey.task(priority=1)
def low_priority_task(log_id):
    time.sleep(1)
    return f"低优先级日志 {log_id} 记录完成"

# 提交任务
high_priority_task(1001)
low_priority_task(2001)

代码说明
优先级数值越大,执行优先级越高,消费者会优先调度高优先级任务。

5.2 任务异常处理与重试

执行任务时可能出现异常,huey 支持自动重试、异常捕获,保证任务稳定性。

代码示例:任务重试

# 执行失败自动重试3次,每次间隔2秒
@huey.task(retries=3, retry_delay=2)
def risky_task(num):
    """可能出错的任务"""
    if num % 2 != 0:
        raise ValueError("数字必须为偶数")
    return f"数字 {num} 校验通过"

# 提交会报错的任务,触发重试
risky_task(3)

代码说明

  1. retries=3:任务失败后自动重试 3 次;
  2. retry_delay=2:每次重试间隔 2 秒;
  3. 重试次数耗尽仍失败,任务会标记为执行失败。

5.3 任务钩子函数

huey 支持任务执行前后的钩子函数,可用于记录日志、统计执行时间、预处理数据等。

代码示例:钩子函数

@huey.task()
def hook_task():
    return "带钩子的任务"

# 任务执行前触发
@huey.pre_execute()
def pre_execute_hook(task):
    print(f"任务 {task.id} 即将开始执行")

# 任务执行后触发
@huey.post_execute()
def post_execute_hook(task, result):
    print(f"任务 {task.id} 执行完成,结果:{result}")

代码说明
钩子函数会自动绑定对应任务,无需手动调用,适合统一处理任务执行前后逻辑。

六、huey 实际业务场景案例

6.1 电商订单超时自动取消案例

电商场景中,用户下单后未支付,需超时自动取消订单并释放库存,这是 huey 延时任务的经典应用。

完整代码示例

from huey import RedisHuey
import time

# 生产环境使用 Redis 存储
huey = RedisHuey('order_queue', host='localhost', port=6379, db=0)

# 模拟订单数据库
order_db = {
    1001: {"status": "待支付", "stock": 10},
    1002: {"status": "待支付", "stock": 5}
}

# 延时任务:订单15分钟未支付自动取消
@huey.task(delay=900)  # 900秒=15分钟
def cancel_unpaid_order(order_id):
    """取消未支付订单,释放库存"""
    order_info = order_db.get(order_id)
    if not order_info:
        return f"订单 {order_id} 不存在"

    if order_info["status"] == "待支付":
        order_info["status"] = "已取消"
        order_info["stock"] += 1
        return f"订单 {order_id} 超时未支付,已取消,库存已释放"
    return f"订单 {order_id} 已支付,无需取消"

# 模拟用户下单
def create_order(order_id):
    """用户创建订单"""
    if order_id in order_db:
        print(f"订单 {order_id} 创建成功,15分钟内未支付将自动取消")
        # 提交超时取消任务
        cancel_unpaid_order(order_id)
        return "下单成功"

# 测试下单
create_order(1001)
create_order(1002)

业务逻辑说明

  1. 用户创建订单后,立即提交 15 分钟延时任务;
  2. 若用户按时支付,订单状态变更,延时任务执行时不做操作;
  3. 若用户未支付,任务自动取消订单、释放库存,无需人工处理。

6.2 每日自动数据统计报表案例

结合周期任务,实现每日凌晨自动统计业务数据,生成报表并保存。

完整代码示例

from huey import RedisHuey
from datetime import datetime
import json

huey = RedisHuey('stat_queue')

# 模拟业务数据
user_data = {"new_user": 120, "active_user": 850, "order_count": 320}

# 每日凌晨2点执行统计任务
@huey.periodic_task(crontab(hour=2, minute=0))
def generate_daily_report():
    """生成每日业务报表"""
    report_data = {
        "date": datetime.now().strftime("%Y-%m-%d"),
        "new_user": user_data["new_user"],
        "active_user": user_data["active_user"],
        "order_count": user_data["order_count"],
        "status": "success"
    }
    # 保存报表到文件
    with open(f"daily_report_{datetime.now().strftime('%Y%m%d')}.json", "w", encoding="utf-8") as f:
        json.dump(report_data, f, ensure_ascii=False, indent=4)
    return f"每日报表生成成功:{report_data}"

业务逻辑说明

  1. 通过周期任务固定每日凌晨 2 点执行;
  2. 自动统计当日业务数据,生成 JSON 格式报表;
  3. 报表自动保存,无需人工登录系统操作,提升效率。

七、huey 项目部署与注意事项

  1. 存储选择:本地测试用 SQLite,生产环境必须用 Redis,提升性能与稳定性;
  2. 消费者守护:生产环境需用 supervisor、systemd 等工具守护消费者进程,防止崩溃退出;
  3. 并发控制:根据服务器 CPU 核心数设置工作进程数,避免过多进程导致服务器卡顿;
  4. 日志监控:开启任务日志,实时监控任务执行状态,及时排查失败任务;
  5. 适用场景:适合中小型项目、轻量级任务,大型分布式集群建议使用 Celery。

相关资源

  • Pypi地址:https://pypi.org/project/huey/
  • Github地址:https://github.com/coleifer/huey
  • 官方文档地址:https://huey.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python异步任务队列神器:arq从入门到实战完全指南

一、arq库基础认知

1.1 库核心用途

arq是一款基于Python asyncio与Redis开发的异步任务队列库,专注处理异步、延迟、后台耗时任务,完美适配异步Web框架,是异步架构中任务调度、异步解耦的核心工具。

1.2 工作原理

arq以Redis为中间件存储任务与状态,生产者通过异步接口投递任务,Worker异步监听Redis队列,自动拉取并执行任务,全程基于asyncio实现非阻塞运行,任务执行、重试、结果存储都依托Redis高效完成。

1.3 优缺点

优点:纯异步非阻塞、轻量无冗余依赖、API简洁易上手、兼容FastAPI/Starlette等异步框架、支持任务重试与延迟执行、性能优异。
缺点:仅支持Redis作为后端、功能比Celery精简、不支持多消息队列、复杂任务调度能力较弱。

1.4 License类型

MIT License,开源免费可商用。

二、arq环境安装与基础配置

2.1 安装arq与依赖

arq核心依赖Redis,安装命令:

pip install arq redis

安装完成后可通过命令验证版本:

arq --version

确保本地或远程Redis服务正常运行,arq默认连接本地127.0.0.1:6379无密码Redis实例。

2.2 基础连接配置

arq支持自定义Redis连接参数,包括主机、端口、密码、数据库索引,创建基础配置文件:

# redis_config.py
import asyncio
from arq import create_pool
from arq.connections import RedisSettings

# 自定义Redis连接配置
redis_settings = RedisSettings(
    host="127.0.0.1",
    port=6379,
    password="",  # 有密码则填写
    database=0,
    timeout=5
)

# 测试Redis连接
async def test_redis_connection():
    redis = await create_pool(redis_settings)
    print("Redis连接成功")
    await redis.close()

if __name__ == "__main__":
    asyncio.run(test_redis_connection())

代码说明:通过RedisSettings定义连接参数,create_pool创建异步连接池,实现arq与Redis的基础通信。

三、arq核心使用方式与代码实例

3.1 定义基础异步任务

arq任务必须是异步函数,这是其核心特性,创建任务文件:

# tasks.py
import asyncio
from arq import ArqRedis

# 基础异步任务
async def simple_task(ctx: ArqRedis, content: str) -> str:
    """
    简单异步任务
    :param ctx: 任务上下文,包含Redis连接等信息
    :param content: 任务传入参数
    :return: 任务执行结果
    """
    print(f"开始执行简单任务:{content}")
    # 模拟异步耗时操作
    await asyncio.sleep(2)
    result = f"任务执行完成,内容:{content}"
    print(result)
    return result

# 任务注册:Worker启动时加载的任务列表
async def startup(ctx):
    """Worker启动时执行的钩子函数"""
    print("arq Worker启动成功")

async def shutdown(ctx):
    """Worker关闭时执行的钩子函数"""
    print("arq Worker关闭成功")

# Worker配置类
class WorkerSettings:
    # 注册可执行的任务函数
    functions = [simple_task]
    # 启动与关闭钩子
    on_startup = startup
    on_shutdown = shutdown
    # 绑定Redis配置
    redis_settings = RedisSettings(host="127.0.0.1", port=6379)

代码说明:定义异步任务函数,通过WorkerSettings注册任务,配置启动关闭钩子,Worker会自动加载注册的任务。

3.2 启动arq Worker

Worker是任务消费者,负责监听队列并执行任务,命令行启动:

arq tasks.WorkerSettings

启动成功后会输出:arq Worker启动成功,持续监听Redis任务队列。

3.3 投递任务到arq队列

创建生产者脚本,异步投递任务到arq:

# producer.py
import asyncio
from arq import create_pool
from tasks import redis_settings

# 投递任务
async def enqueue_task():
    # 创建Redis连接池
    redis = await create_pool(redis_settings)
    # 投递任务:任务函数名 + 参数
    job = await redis.enqueue_job("simple_task", "Hello arq异步任务队列")
    # 获取任务ID
    job_id = job.job_id
    print(f"任务投递成功,任务ID:{job_id}")

    # 等待任务执行完成并获取结果
    job_result = await job.result(timeout=10)
    print(f"任务执行结果:{job_result}")

    await redis.close()

if __name__ == "__main__":
    asyncio.run(enqueue_task())

代码说明:通过enqueue_job投递任务,传入任务函数名与参数,可通过job.result()同步等待任务结果,适用于需要获取返回值的场景。

运行producer.py,Worker端会打印任务执行信息,生产者输出:

任务投递成功,任务ID:xxxxxxxx
任务执行结果:任务执行完成,内容:Hello arq异步任务队列

3.4 延迟任务与定时任务

arq支持延迟执行任务,指定延迟秒数:

# 延迟任务:5秒后执行
async def delay_task(ctx: ArqRedis, msg: str):
    await asyncio.sleep(1)
    print(f"延迟任务执行:{msg}")
    return f"延迟任务结果:{msg}"

# 在tasks.py的WorkerSettings中添加任务
functions = [simple_task, delay_task]

投递延迟任务:

# producer.py中添加
async def enqueue_delay_task():
    redis = await create_pool(redis_settings)
    # 延迟5秒执行
    job = await redis.enqueue_job("delay_task", "5秒后执行的延迟任务", _defer_by=5)
    print(f"延迟任务投递成功,任务ID:{job.job_id}")
    await redis.close()

代码说明:_defer_by参数指定延迟秒数,arq会自动计算执行时间,到期后执行任务。

3.5 任务重试机制

arq内置任务重试功能,应对任务执行失败场景,配置重试次数与延迟:

# 可重试任务
async def retry_task(ctx: ArqRedis):
    print("执行可重试任务")
    # 模拟任务执行失败
    raise Exception("任务执行异常,触发重试")

# 自定义任务配置
retry_task.arq_kwargs = {
    "max_tries": 3,        # 最大重试次数
    "retry_delay": 2       # 重试间隔2秒
}

WorkerSettings中注册retry_task,投递任务后,Worker会自动重试3次,适合接口调用、数据同步等不稳定任务。

3.6 获取任务状态与结果

arq支持查询任务状态、结果、异常信息:

async def get_task_status(job_id: str):
    redis = await create_pool(redis_settings)
    # 获取任务对象
    job = await redis.job(job_id)
    if not job:
        print("任务不存在")
        return

    # 获取任务状态
    status = await job.status()
    print(f"任务状态:{status}")

    # 获取任务结果(未完成会抛出异常)
    try:
        result = await job.result()
        print(f"任务结果:{result}")
    except Exception as e:
        print(f"任务未完成或异常:{e}")

    await redis.close()

任务状态包括:pending(等待中)、running(执行中)、complete(完成)、failed(失败)。

四、arq与FastAPI集成实战案例

4.1 集成背景

FastAPI是主流异步Web框架,与arq天然兼容,可实现Web接口异步处理耗时任务,如发送邮件、生成报表、数据爬取等。

4.2 集成代码实现

# fastapi_arq.py
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings
import asyncio

app = FastAPI(title="arq+FastAPI异步任务实战")

# Redis配置
redis_settings = RedisSettings(host="127.0.0.1", port=6379)

# 定义异步任务
async def send_email_task(ctx, email: str, content: str):
    """模拟异步发送邮件任务"""
    await asyncio.sleep(3)
    print(f"向{email}发送邮件:{content}")
    return f"邮件发送成功:{email}"

# Worker配置
class WorkerSettings:
    functions = [send_email_task]
    redis_settings = redis_settings

# 应用启动时创建arq连接池
@app.on_event("startup")
async def startup_event():
    app.state.arq_redis = await create_pool(redis_settings)

# 应用关闭时关闭连接
@app.on_event("shutdown")
async def shutdown_event():
    await app.state.arq_redis.close()

# 接口:投递发送邮件任务
@app.post("/send-email")
async def send_email(email: str, content: str):
    job = await app.state.arq_redis.enqueue_job("send_email_task", email, content)
    return {
        "code": 200,
        "msg": "邮件任务投递成功",
        "job_id": job.job_id
    }

# 接口:查询任务状态
@app.get("/task-status/{job_id}")
async def get_task_status(job_id: str):
    job = await app.state.arq_redis.job(job_id)
    if not job:
        return {"code": 404, "msg": "任务不存在"}
    status = await job.status()
    result = None
    if status == "complete":
        result = await job.result()
    return {
        "code": 200,
        "job_id": job_id,
        "status": status,
        "result": result
    }

4.3 启动与访问

  1. 启动arq Worker:
arq fastapi_arq.WorkerSettings
  1. 启动FastAPI服务:
uvicorn fastapi_arq:app --reload
  1. 访问接口:
  • 投递任务:POST http://127.0.0.1:8000/[email protected]&content=测试邮件
  • 查询任务:GET http://127.0.0.1:8000/task-status/任务ID

该案例实现了Web接口与异步任务解耦,用户请求无需等待耗时任务完成,提升接口响应速度与系统并发能力。

五、arq高级特性与实际场景优化

5.1 任务结果过期设置

arq默认永久存储任务结果,可配置过期时间释放Redis空间:

# 任务结果1小时后过期
job = await redis.enqueue_job("simple_task", "测试过期", _expires=3600)

5.2 多任务队列隔离

arq支持自定义队列名称,实现不同业务任务隔离:

# 投递到订单队列
job = await redis.enqueue_job("order_task", "订单数据", _queue="order_queue")

# Worker监听指定队列
class WorkerSettings:
    queue_name = "order_queue"
    functions = [order_task]

5.3 任务并发控制

调整Worker并发数,适配不同服务器性能:

class WorkerSettings:
    functions = [simple_task, delay_task]
    redis_settings = redis_settings
    max_jobs = 10  # 最大并发执行10个任务

相关资源

  • Pypi地址:https://pypi.org/project/arq
  • Github地址:https://github.com/samuelcolvin/arq
  • 官方文档地址:https://arq-docs.helpmanual.io/

关注我,每天分享一个实用的Python自动化工具。

Python数据质量监控神器:whylogs从入门到实战全教程

一、whylogs库基础认知

1.1 库核心用途

whylogs是WhyLabs团队开源的轻量级数据日志与数据质量监控Python库,核心用于自动化生成数据剖面日志、持续追踪数据特征、实时检测数据漂移与异常值,广泛适配数据分析、机器学习流水线、数据工程等场景,无需复杂配置即可嵌入现有Python项目完成数据质量管控。

1.2 工作原理

whylogs通过轻量级流式计算方式,对输入数据(DataFrame、字典、数据流等)进行统计聚合,生成轻量化、可序列化的数据剖面(Profile),不存储原始数据,仅保留分布、计数、缺失值、类型等统计信息,支持离线存储、云端同步与多版本对比,实现无侵入式数据监控。

1.3 优缺点

优点:轻量化无性能损耗、支持流式与批量数据、兼容主流数据框架、隐私安全不存原始数据、可对接可视化平台、开箱即用。
缺点:复杂自定义规则需二次开发、极端小众数据类型支持有限、纯离线模式缺少自动告警扩展。

1.4 License类型

whylogs采用Apache-2.0 License,属于宽松开源协议,允许商业使用、修改、分发与二次发布。

二、whylogs安装与环境准备

2.1 基础安装命令

whylogs支持Python 3.7及以上版本,使用pip即可快速安装,执行以下命令:

pip install whylogs

安装过程会自动依赖numpy、pandas、pyarrow等基础数据处理库,无需额外手动配置。

2.2 验证安装成功

安装完成后,可通过简单导入语句验证是否正常:

# 验证whylogs安装
import whylogs as why

# 无报错则说明安装成功
print("whylogs 安装成功,版本:", why.__version__)

运行代码后输出对应版本号,即代表环境配置完成。

2.3 扩展依赖安装

若需对接云端WhyLabs平台或增强可视化能力,可安装扩展包:

pip install whylogs[viz] whylogs[whylabs]

whylogs[viz]提供本地剖面可视化能力,whylogs[whylabs]支持数据剖面云端上传与集中管理。

三、whylogs核心功能与基础使用

3.1 快速生成数据剖面

数据剖面是whylogs的核心产物,包含数据完整统计信息,支持pandas DataFrame、列表、字典等多种数据格式。

import pandas as pd
import whylogs as why

# 构造示例数据集
data = {
    "用户ID": [1001, 1002, 1003, 1004, None, 1006],
    "消费金额": [99.5, 199.0, 59.8, None, 299.0, 88.0],
    "会员等级": ["普通", "高级", "普通", "高级", "普通", "普通"],
    "购买次数": [3, 5, 2, 7, 1, 4]
}
df = pd.DataFrame(data)

# 使用whylogs生成数据剖面
results = why.log(df)

# 获取剖面对象
profile = results.profile()

# 查看剖面数据
print("数据剖面生成完成!")
profile.view()

代码说明:通过why.log()方法传入DataFrame,自动完成数据统计分析,profile.view()可在控制台输出数据基础指标,包括缺失值数量、数据类型、唯一值计数等。

3.2 查看数据详细统计指标

生成剖面后,可提取单列或全量详细统计信息,包括缺失率、最大值、最小值、均值、分位数等。

# 获取数据剖面视图
profile_view = profile.view()

# 查看全量数据列的统计指标
full_stats = profile_view.to_pandas()
print("全量数据统计指标:")
print(full_stats)

# 单独提取指定列指标
amount_stats = profile_view.get_column("消费金额")
print("\n消费金额字段详细统计:")
print("缺失值数量:", amount_stats.missing.value)
print("最大值:", amount_stats.max.value)
print("最小值:", amount_stats.min.value)
print("均值:", amount_stats.mean.value)

代码说明:profile_view.to_pandas()将统计结果转为DataFrame,方便二次处理;get_column()可精准定位目标字段,获取针对性质量指标。

3.3 流式数据实时监控

whylogs支持流式数据处理,适用于实时数据 pipelines、日志流、接口数据等场景。

# 初始化流式记录器
writer = why.logger(mode="streaming", name="stream_demo", interval=5, when="count")

# 接入第一批次数据
batch1 = pd.DataFrame({
    "访问IP": ["192.168.1.1", "192.168.1.2", "192.168.1.3"],
    "响应时间": [120, 200, 150]
})
writer.log(batch1)

# 接入第二批次数据
batch2 = pd.DataFrame({
    "访问IP": ["192.168.1.4", None, "192.168.1.5"],
    "响应时间": [300, 180, None]
})
writer.log(batch2)

# 关闭记录器并生成最终剖面
stream_profile = writer.close()
print("流式数据剖面生成完成")
stream_profile.view().to_pandas()

代码说明:mode="streaming"开启流式模式,interval=5表示每5条数据自动聚合一次,适合持续产生的实时数据质量监控。

3.4 数据剖面持久化存储

生成的数据剖面可序列化保存为本地文件,方便后续对比、回溯与共享。

# 保存剖面到本地文件
profile.write(path="data_profile.bin")

# 从本地文件加载剖面
loaded_profile = why.read(path="data_profile.bin")

print("加载的历史数据剖面:")
loaded_profile.view().to_pandas()

代码说明:剖面文件体积极小,仅存储统计信息,不占用大量存储空间,适合长期归档。

四、数据漂移检测与多版本对比

4.1 数据漂移检测基础使用

在机器学习场景中,训练数据与在线推理数据的分布差异(数据漂移)会严重影响模型效果,whylogs可快速检测该问题。

# 构造训练数据(基准数据)
train_data = pd.DataFrame({
    "特征A": [10, 12, 11, 13, 12, 10, 11],
    "特征B": [0.5, 0.6, 0.5, 0.7, 0.6, 0.5, 0.6]
})

# 构造推理数据(待检测数据,存在分布偏移)
infer_data = pd.DataFrame({
    "特征A": [18, 19, 20, 17, 18, 19],
    "特征B": [0.1, 0.2, 0.1, 0.3, 0.2, 0.1]
})

# 生成两个数据剖面
train_profile = why.log(train_data).profile()
infer_profile = why.log(infer_data).profile()

# 对比检测数据漂移
from whylogs.core.metrics.metrics import Metric
from whylogs.core.view import DatasetProfileView

train_view = train_profile.view()
infer_view = infer_profile.view()

# 执行漂移检测
drift_report = train_view.compare(infer_view).drift_report()
print("数据漂移检测报告:")
print(drift_report.to_pandas())

代码说明:通过基准剖面与待检测剖面对比,自动计算分布差异,输出漂移评分与漂移等级,帮助快速定位异常特征。

4.2 可视化漂移对比结果

安装扩展依赖后,可直接在Python环境中生成交互式漂移对比图表:

# 生成交互式漂移对比可视化
train_view.compare(infer_view).visualize()

代码说明:运行后会生成包含分布直方图、漂移指数的交互式页面,直观展示数据差异,无需手动绘图。

五、结合机器学习流水线实战案例

5.1 场景说明

本案例模拟完整机器学习流程:数据读取→剖面记录→模型训练→推理数据监控→漂移告警,覆盖实际项目完整链路。

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
import whylogs as why

# 1. 加载并拆分数据集(模拟业务数据)
np.random.seed(42)
raw_data = pd.DataFrame({
    "广告曝光量": np.random.randint(1000, 5000, 100),
    "点击量": np.random.randint(100, 500, 100),
    "转化量": np.random.randint(10, 100, 100)
})
raw_data["转化量"].iloc[80:100] = None  # 人为添加缺失值

# 记录原始数据剖面
raw_profile = why.log(raw_data).profile()
raw_profile.write("raw_data_profile.bin")

# 2. 数据预处理
clean_data = raw_data.dropna()
X = clean_data[["广告曝光量", "点击量"]]
y = clean_data["转化量"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 记录训练集数据剖面
train_profile = why.log(X_train).profile()
train_profile.write("train_data_profile.bin")

# 3. 模型训练
model = LinearRegression()
model.fit(X_train, y_train)

# 4. 模拟在线推理(含异常数据)
online_data = pd.DataFrame({
    "广告曝光量": [9000, 8500, 8800, 1500, 1300],
    "点击量": [200, 180, 190, 800, 750]  # 点击量异常偏高
})

# 记录推理数据剖面
online_profile = why.log(online_data).profile()
online_profile.write("online_data_profile.bin")

# 5. 检测推理数据与训练数据的漂移
drift_result = train_profile.view().compare(online_profile.view()).drift_report()
print("模型推理数据漂移检测:")
print(drift_result.to_pandas())

代码说明:本案例完整复现工业级数据监控流程,从原始数据到模型推理全程记录剖面,及时发现异常数据与分布偏移。

5.2 异常数据自动过滤

基于whylogs检测结果,可实现自动过滤异常推理数据,保障模型稳定性:

# 获取漂移检测结果
drift_df = drift_result.to_pandas()
high_drift_columns = drift_df[drift_df["drift_score"] > 0.6]["column"].tolist()

if high_drift_columns:
    print(f"检测到高漂移字段:{high_drift_columns},自动过滤异常数据")
    # 过滤异常数据
    filtered_online_data = online_data.copy()
    for col in high_drift_columns:
        # 基于训练数据统计值设置阈值
        train_mean = train_profile.view().get_column(col).mean.value
        filtered_online_data = filtered_online_data[filtered_online_data[col] < train_mean * 3]
    print("过滤后数据:")
    print(filtered_online_data)
else:
    print("数据正常,可直接推理")

代码说明:通过漂移分数设置阈值,自动识别高风险字段并过滤异常数据,减少错误输入对模型的影响。

六、集成WhyLabs云端平台

6.1 云端上传数据剖面

whylabs支持将本地剖面上传至云端,实现多项目集中监控、历史回溯、自动告警:

import os
import whylogs as why
from whylogs.api.whylabs.session import WhyLabsSession

# 配置云端密钥(需在WhyLabs官网注册获取)
os.environ["WHYLABS_API_KEY"] = "你的API_KEY"
os.environ["WHYLABS_DEFAULT_DATASET_ID"] = "你的数据集ID"

# 上传剖面到云端
profile.writer("whylabs").write()
print("数据剖面已成功上传至WhyLabs云端")

代码说明:云端平台提供可视化看板、定时监控、团队协作功能,适合企业级数据质量管理。

七、相关资源

  • Pypi地址:https://pypi.org/project/whylogs/
  • Github地址:https://github.com/whylabs/whylogs
  • 官方文档地址:https://docs.whylabs.ai/docs/whylogs/

关注我,每天分享一个实用的Python自动化工具。

Python 机器学习流水线神器:ZenML 从入门到实战全教程

一、ZenML 库概述

ZenML 是一款面向机器学习与 MLOps 领域的开源 Python 库,核心用于构建可复用、可复现、可迁移的端到端 ML 流水线,屏蔽底层环境差异,统一本地、云端、分布式集群的流水线执行逻辑。其基于流水线与步骤抽象设计,将数据读取、预处理、训练、评估、部署拆分为可编排步骤,底层通过配置文件管理运行时环境与组件。优点是轻量化、易上手、跨平台兼容、支持多框架协同,缺点是复杂分布式调度能力弱于 Kubeflow。采用 Apache License 2.0 开源许可,商用友好。

二、ZenML 安装与初始化环境

2.1 基础安装

在使用 ZenML 前,我们需要通过 pip 完成安装,打开命令行执行以下指令:

pip install zenml

该命令会安装 ZenML 核心库以及基础依赖,适合本地快速体验与开发。如果需要对接云服务、数据库、分布式训练等扩展功能,还可以安装对应扩展包。

2.2 初始化 ZenML 环境

安装完成后,必须先初始化 ZenML 工作环境,这一步会创建本地配置文件、数据库、存储目录等核心结构,是后续所有操作的前提。

zenml init

执行成功后,会在当前目录生成 .zenml 隐藏文件夹,用于存储流水线配置、运行记录、元数据等信息。

2.3 安装常用扩展组件

机器学习流水线通常需要对接数据、模型、可视化工具,因此我们安装常用扩展组件:

# 安装可视化、数据处理、模型训练相关扩展
pip install "zenml[server,data,model,tensorflow,sklearn]"

安装完成后,可以启动 ZenML 本地服务,用于查看流水线运行状态、元数据、实验记录等:

zenml up

启动成功后,默认访问地址为 http://127.0.0.1:8237,打开浏览器即可进入 ZenML 可视化控制台。

三、ZenML 核心概念与基础使用

3.1 核心概念解析

  1. 步骤(Step):流水线中最小执行单元,例如数据加载、数据清洗、模型训练、模型评估,每个步骤都是独立函数,通过装饰器标记。
  2. 流水线(Pipeline):由多个步骤按逻辑顺序组合而成,定义完整机器学习工作流,一次定义可多次运行。
  3. 工件(Artifact):步骤之间传递的数据或模型,ZenML 自动管理工件的存储、读取、版本管理,无需手动处理文件读写。
  4. 栈(Stack):定义流水线运行环境,包括编排引擎、元数据存储、工件存储、部署引擎等,本地默认使用本地栈,可无缝切换云端栈。
  5. 运行(Run):流水线的一次执行过程,所有步骤日志、结果、指标都会被记录,支持回溯查看。

3.2 第一个 ZenML 流水线

我们从最简单的示例开始,创建两个步骤并组合成流水线,理解 ZenML 的基础用法。

3.2.1 代码实现

# 导入核心装饰器
from zenml import step, pipeline

# 定义第一个步骤:生成数据
@step
def generate_data() -> int:
    """生成一个整数数据"""
    data = 100
    print(f"生成数据:{data}")
    return data

# 定义第二个步骤:处理数据
@step
def process_data(input_data: int) -> int:
    """对输入数据进行处理,乘以2"""
    result = input_data * 2
    print(f"处理后数据:{result}")
    return result

# 定义流水线:组合步骤
@pipeline
def simple_ml_pipeline():
    """最简单的 ZenML 流水线"""
    data = generate_data()
    output = process_data(data)

# 运行流水线
if __name__ == "__main__":
    simple_ml_pipeline()

3.2.2 代码说明

  1. 使用 @step 装饰器将普通函数标记为 ZenML 步骤,函数的输入输出会自动被 ZenML 管理为工件。
  2. 使用 @pipeline 装饰器将步骤组合为流水线,内部按顺序调用步骤,自动处理数据传递。
  3. 运行脚本后,ZenML 会自动记录运行日志、步骤执行顺序、数据传递结果,可在控制台查看。

执行代码后,命令行会输出执行过程,浏览器控制台会新增一条流水线运行记录,展示每个步骤的执行状态、耗时、输出结果。

四、基于 Sklearn 的机器学习实战流水线

4.1 实战场景说明

本案例使用经典鸢尾花数据集,构建完整机器学习流水线,包含:数据加载、数据划分、模型训练、模型评估四个核心步骤,使用 Sklearn 实现算法,ZenML 完成流水线编排与管理。

4.2 完整代码实现

from zenml import step, pipeline
from zenml.client import Client
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# 步骤1:加载数据集
@step
def load_dataset() -> pd.DataFrame:
    """加载鸢尾花数据集并转换为DataFrame"""
    iris = load_iris()
    data = pd.DataFrame(iris.data, columns=iris.feature_names)
    data["target"] = iris.target
    print("数据集加载完成,形状:", data.shape)
    return data

# 步骤2:划分训练集和测试集
@step
def split_dataset(
    data: pd.DataFrame, test_size: float = 0.2
) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """划分训练集和测试集"""
    X = data.drop("target", axis=1)
    y = data["target"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )
    print(f"训练集样本数:{len(X_train)},测试集样本数:{len(X_test)}")
    return X_train, X_test, y_train, y_test

# 步骤3:训练随机森林模型
@step
def train_model(
    X_train: pd.DataFrame, y_train: pd.Series
) -> RandomForestClassifier:
    """使用随机森林算法训练模型"""
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    print("模型训练完成")
    return model

# 步骤4:评估模型
@step
def evaluate_model(
    model: RandomForestClassifier, X_test: pd.DataFrame, y_test: pd.Series
) -> float:
    """评估模型并输出准确率"""
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    print(f"模型测试集准确率:{acc:.4f}")
    return acc

# 定义完整机器学习流水线
@pipeline
def iris_classification_pipeline(test_size: float = 0.2):
    """鸢尾花分类完整流水线"""
    data = load_dataset()
    X_train, X_test, y_train, y_test = split_dataset(data, test_size=test_size)
    model = train_model(X_train, y_train)
    accuracy = evaluate_model(model, X_test, y_test)

# 运行流水线
if __name__ == "__main__":
    # 执行流水线
    run = iris_classification_pipeline(test_size=0.3)

    # 查看运行结果
    client = Client()
    latest_run = client.get_pipeline_run("iris_classification_pipeline")
    print(f"最新运行ID:{latest_run.id}")
    print(f"最终准确率:{latest_run.steps['evaluate_model'].output.read()}")

4.3 代码说明

  1. 四个步骤分别承担数据、划分、训练、评估职责,解耦代码结构,便于单独修改、调试、复用。
  2. 步骤之间自动传递 DataFrame、模型、数组等复杂对象,无需手动保存文件、读取文件。
  3. 流水线支持传入参数(如 test_size),可灵活调整配置,多次运行对比结果。
  4. 通过 ZenML Client 可以读取历史运行结果、步骤输出、元数据,便于后续自动化分析。

运行代码后,控制台会输出数据集信息、样本划分结果、模型训练状态与最终准确率,同时所有信息会同步到 ZenML 控制台,可查看流水线 DAG 图、步骤耗时、模型指标、数据版本等。

五、基于 TensorFlow 的深度学习流水线实战

5.1 实战场景说明

使用简单神经网络对鸢尾花数据集进行分类,展示 ZenML 对接深度学习框架的能力,步骤包括:数据加载、数据预处理、模型构建、模型训练、模型评估。

5.2 完整代码实现

from zenml import step, pipeline
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import to_categorical

# 步骤1:加载数据
@step
def load_iris_data() -> pd.DataFrame:
    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names)
    df["target"] = iris.target
    return df

# 步骤2:数据预处理
@step
def preprocess_data(
    df: pd.DataFrame
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    X = df.drop("target", axis=1).values
    y = df["target"].values

    # 标准化
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    # 独热编码
    y = to_categorical(y, num_classes=3)

    # 划分数据集
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    return X_train, X_test, y_train, y_test

# 步骤3:构建神经网络模型
@step
def build_dnn_model(input_shape: int) -> Sequential:
    model = Sequential()
    model.add(Dense(16, activation="relu", input_shape=(input_shape,)))
    model.add(Dense(8, activation="relu"))
    model.add(Dense(3, activation="softmax"))

    model.compile(
        optimizer="adam",
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model

# 步骤4:训练模型
@step
def train_dnn_model(
    model: Sequential, X_train: np.ndarray, y_train: np.ndarray
) -> Sequential:
    model.fit(
        X_train, y_train,
        epochs=50,
        batch_size=4,
        validation_split=0.1,
        verbose=1
    )
    return model

# 步骤5:评估模型
@step
def test_dnn_model(
    model: Sequential, X_test: np.ndarray, y_test: np.ndarray
) -> dict:
    loss, acc = model.evaluate(X_test, y_test, verbose=0)
    result = {"test_loss": loss, "test_accuracy": acc}
    print(f"测试损失:{loss:.4f},测试准确率:{acc:.4f}")
    return result

# 定义深度学习流水线
@pipeline
def iris_dnn_pipeline():
    df = load_iris_data()
    X_train, X_test, y_train, y_test = preprocess_data(df)
    model = build_dnn_model(input_shape=X_train.shape[1])
    trained_model = train_dnn_model(model, X_train, y_train)
    metrics = test_dnn_model(trained_model, X_test, y_test)

if __name__ == "__main__":
    iris_dnn_pipeline()

5.3 代码说明

  1. ZenML 可以无缝对接 TensorFlow、PyTorch 等深度学习框架,自动序列化、存储、加载模型。
  2. 预处理步骤包含标准化、独热编码、数据集划分,符合深度学习数据处理规范。
  3. 模型训练过程中的日志、指标、结构都会被 ZenML 记录,便于对比不同超参数效果。
  4. 流水线结构清晰,可直接用于生产环境,替换数据集即可快速迁移到其他项目。

六、ZenML 流水线高级用法

6.1 流水线配置化运行

支持通过外部参数控制流水线行为,适配不同环境、不同数据集、不同超参数,实现一次编写、多次灵活运行。

@pipeline
def configurable_pipeline(
    epochs: int = 50,
    test_size: float = 0.2,
    model_type: str = "random_forest"
):
    # 内部根据参数选择不同模型或逻辑
    pass

6.2 查看历史运行记录

from zenml.client import Client

client = Client()

# 获取所有流水线
pipelines = client.list_pipelines()
for p in pipelines:
    print(p.name)

# 获取某条流水线的所有运行记录
runs = client.get_pipeline("iris_classification_pipeline").runs
for r in runs:
    print(f"运行时间:{r.created},状态:{r.status}")

6.3 流水线缓存机制

ZenML 默认开启缓存,未修改的步骤会直接使用上一次运行结果,大幅提升调试速度:

@step(enable_cache=False)  # 关闭当前步骤缓存
def dynamic_step():
    pass

七、实际项目应用价值与代码价值

在实际机器学习项目中,传统开发模式常面临代码混乱、不可复现、环境迁移困难、实验记录丢失等问题。使用 ZenML 可以将整个工作流结构化,每个步骤独立可维护,所有实验自动记录,方便回溯最优模型。

在团队协作中,统一的流水线规范可以降低沟通成本,新成员可快速理解工作流程;在部署阶段,本地编写的流水线无需大量修改即可运行在云端服务器、K8s 集群等环境,实现从开发到生产的平滑迁移。

上述鸢尾花分类、深度学习流水线代码,可直接作为项目模板,替换数据集、调整模型结构、修改评估指标,即可应用于图像分类、表格数据预测、NLP 任务等多种场景,真正实现一套流水线适配多任务、多环境。

八、相关资源

  • Pypi地址:https://pypi.org/project/zenml
  • Github地址:https://github.com/zenml-io/zenml
  • 官方文档地址:https://docs.zenml.io

关注我,每天分享一个实用的Python自动化工具。

Python 实用工具:Activeloop 从入门到实战,轻松管理向量数据与大模型数据集

一、Activeloop 库简介

Activeloop 是一款专注于数据集管理、向量存储与大模型数据流水线的 Python 工具库,核心用于构建、版本控制、流式加载与查询 AI 数据集,尤其适配大语言模型、计算机视觉场景。其通过统一数据格式实现跨设备、跨框架的数据共享,底层依托高效序列化与云端存储能力,支持本地/云端无缝切换。该库采用 MIT License 开源,优点是易用、轻量、兼容主流 AI 框架,缺点是在超大规模离线数据集上性能略逊于专用分布式框架。

二、Activeloop 安装与环境准备

在使用 Activeloop 之前,我们需要完成库的安装与基础环境配置,支持 Python 3.8 及以上版本,可直接通过 pip 完成安装。

2.1 基础安装

打开命令行工具,执行以下安装命令:

pip install deeplake

Activeloop 的核心功能封装在 deeplake 包中,这是官方推荐的安装方式,安装过程会自动依赖 numpy、pandas 等基础数据处理库。

2.2 验证安装

安装完成后,可通过简单代码验证是否安装成功:

import deeplake

# 打印库版本
print("Activeloop (deeplake) 版本:", deeplake.__version__)

运行代码后,若正常输出版本号,说明环境配置完成,可进入后续功能使用。

三、Activeloop 核心功能与基础使用

Activeloop 核心围绕数据集创建、数据写入、数据读取、数据查询、向量存储五大功能展开,面向 AI 开发者屏蔽底层存储细节,专注数据本身。

3.1 创建本地数据集

数据集是 Activeloop 的核心载体,支持文本、图片、向量、标签等多种数据类型,创建方式简洁直观。

import deeplake
import numpy as np

# 创建本地数据集,路径为当前目录下的 my_first_dataset
ds = deeplake.dataset("./my_first_dataset")

# 定义数据集结构:文本数据、向量数据、标签
ds.create_tensor("text", htype="text")
ds.create_tensor("embedding", htype="embedding")
ds.create_tensor("label", htype="class_label")

print("数据集创建完成,结构如下:")
print(ds.tensors)

代码说明:

  • deeplake.dataset() 用于创建/加载数据集,传入本地路径则生成本地数据集;
  • create_tensor() 定义数据列,htype 指定数据类型,适配 AI 场景常用格式;
  • 执行后会在指定路径生成数据集文件夹,包含数据与元信息。

3.2 向数据集写入数据

创建完数据集结构后,可批量写入文本、向量、标签等数据,模拟大模型训练或检索场景的原始数据。

# 构造模拟数据
text_list = [
    "Python 是一门优雅易用的编程语言",
    "Activeloop 适合管理大模型数据集",
    "向量数据库是 RAG 系统的核心组件",
    "深度学习需要高质量标注数据"
]

# 生成模拟 768 维嵌入向量(适配大模型通用维度)
embedding_list = [np.random.rand(768) for _ in range(4)]
label_list = [0, 1, 1, 0]

# 批量写入数据集
with ds:
    ds.text.extend(text_list)
    ds.embedding.extend(embedding_list)
    ds.label.extend(label_list)

print("数据写入成功,数据集样本数:", len(ds))

代码说明:

  • 使用 with ds: 上下文管理器保证数据写入原子性,避免中途出错导致数据损坏;
  • extend() 用于批量添加数据,适配列表、numpy 数组等格式;
  • 写入后可通过 len(ds) 查看数据集总样本数。

3.3 读取数据集数据

Activeloop 支持索引读取、遍历读取、条件读取,操作方式与列表、DataFrame 高度相似,降低学习成本。

# 按索引读取单条数据
sample = ds[0]
print("第一条数据文本:", sample.text.data())
print("第一条数据向量形状:", sample.embedding.shape)
print("第一条数据标签:", sample.label.data())

# 遍历所有数据
print("\n===== 遍历全部数据 =====")
for i, sample in enumerate(ds):
    print(f"样本 {i}:")
    print(f"文本:{sample.text.data()}")
    print(f"标签:{sample.label.data()}\n")

代码说明:

  • 直接通过索引 ds[i] 获取第 i 条样本;
  • data() 方法提取原始数据,避免返回封装对象;
  • 遍历方式与 Python 列表一致,无需复杂语法。

3.4 条件查询数据

Activeloop 内置轻量查询引擎,支持按标签、数值等条件筛选数据,满足 AI 数据预处理需求。

# 查询标签为 1 的所有样本
filtered_ds = ds[ds.label == 1]

print("标签为 1 的样本数量:", len(filtered_ds))
for sample in filtered_ds:
    print("筛选文本:", sample.text.data())

代码说明:

  • 支持 ==!=>< 等常规比较运算符;
  • 筛选后返回新的数据集视图,不占用额外内存;
  • 适合大模型训练前的数据过滤与采样。

四、Activeloop 向量存储与 RAG 场景实战

向量存储是 Activeloop 的核心亮点,可直接作为轻量级向量数据库使用,适配 RAG(检索增强生成)场景,无需额外部署复杂数据库。

4.1 向量数据写入与检索

# 重新创建向量专用数据集
rag_ds = deeplake.dataset("./rag_embedding_dataset")
rag_ds.create_tensor("content", htype="text")
rag_ds.create_tensor("vector", htype="embedding")

# 写入文档与对应向量
documents = [
    "豆包是字节跳动自研的人工智能助手",
    "Activeloop 可用于构建 RAG 系统的向量库",
    "Python 广泛应用于 AI 与数据科学领域",
    "RAG 系统通过检索提升大模型回答准确性"
]

# 生成模拟向量
vectors = [np.random.rand(128) for _ in range(4)]

with rag_ds:
    rag_ds.content.extend(documents)
    rag_ds.vector.extend(vectors)

# 模拟查询向量并计算相似度
query_vector = np.random.rand(128)
scores = []

for sample in rag_ds:
    vec = sample.vector.data()
    # 余弦相似度简化计算
    similarity = np.dot(query_vector, vec) / (np.linalg.norm(query_vector) * np.linalg.norm(vec))
    scores.append(similarity)

# 获取最相似的文档
best_idx = np.argmax(scores)
print("\n最匹配的文档:", rag_ds[best_idx].content.data())
print("匹配相似度:", round(scores[best_idx], 4))

代码说明:

  • 该示例完整模拟 RAG 系统中文档入库→向量存储→相似度检索流程;
  • 无需依赖 Pinecone、Chroma 等外部向量库,单机即可运行;
  • 适合个人开发者、小型项目快速搭建检索系统。

4.2 与大模型嵌入接口结合

Activeloop 可无缝对接 OpenAI、文心一言、豆包等大模型的嵌入接口,实现真实向量生成与存储。

# 模拟调用大模型生成嵌入向量(可替换为真实 API)
def mock_get_embedding(text: str) -> np.ndarray:
    return np.random.rand(128)

# 构建真实场景数据集
qa_ds = deeplake.dataset("./qa_dataset")
qa_ds.create_tensor("question", htype="text")
qa_ds.create_tensor("answer", htype="text")
qa_ds.create_tensor("q_embedding", htype="embedding")

qa_pairs = [
    {"q": "Activeloop 是什么", "a": "Activeloop 是 Python 数据集与向量管理库"},
    {"q": "如何安装 deeplake", "a": "使用 pip install deeplake 安装"},
    {"q": "RAG 全称是什么", "a": "RAG 全称是 Retrieval Augmented Generation"}
]

with qa_ds:
    for pair in qa_pairs:
        q_emb = mock_get_embedding(pair["q"])
        qa_ds.question.append(pair["q"])
        qa_ds.answer.append(pair["a"])
        qa_ds.q_embedding.append(q_emb)

print("问答数据集构建完成,样本数:", len(qa_ds))

代码说明:

  • mock_get_embedding 替换为真实模型接口,即可生成工业级向量库;
  • 数据集同时存储问题、答案、向量,形成完整 RAG 数据链路。

五、Activeloop 云端数据集使用

Activeloop 支持云端存储,实现多设备、多开发者共享数据集,无需手动传输文件。

5.1 登录与云端数据集创建

首先在命令行登录 Activeloop 账号:

deeplake login

按照提示输入用户名与密码,登录成功后即可创建云端数据集。

# 创建云端数据集(需登录)
# cloud_ds = deeplake.dataset("hub://用户名/my_cloud_dataset")

# 后续读写操作与本地数据集完全一致
# with cloud_ds:
#     cloud_ds.text.extend(["云端数据测试"])

代码说明:

  • 路径以 hub:// 开头表示云端数据集;
  • 读写 API 与本地完全一致,实现本地/云端无缝切换;
  • 适合团队协作、跨设备开发。

六、与 PyTorch/TensorFlow 框架对接

Activeloop 原生支持深度学习框架,可直接转换为框架可读取的数据集,简化训练数据加载流程。

# 转换为 PyTorch DataLoader
from torch.utils.data import DataLoader

# 构建适合训练的数据集
train_ds = deeplake.dataset("./train_dataset")
train_ds.create_tensor("image", htype="image")
train_ds.create_tensor("target", htype="class_label")

# 模拟图像数据
for i in range(10):
    train_ds.image.append(np.random.rand(28, 28, 3))
    train_ds.target.append(np.random.randint(0, 2))

# 转换为 PyTorch 数据集
pytorch_ds = train_ds.pytorch(batch_size=2, shuffle=True)
dataloader = DataLoader(pytorch_ds, batch_size=None)

# 读取训练批次
for batch in dataloader:
    print("图像批次形状:", batch["image"].shape)
    print("标签批次:", batch["target"])
    break

代码说明:

  • 通过 pytorch() 方法直接生成适配框架的数据格式;
  • 支持 shuffle、batch_size、num_workers 等训练参数;
  • 大幅减少数据预处理与格式转换代码量。

七、实际应用案例:简易智能问答系统

结合前面所有知识点,构建一个可直接运行的轻量级智能问答系统,完整体现 Activeloop 的实用价值。

import deeplake
import numpy as np

# 1. 构建知识库数据集
kb_ds = deeplake.dataset("./knowledge_base")
kb_ds.create_tensor("question", htype="text")
kb_ds.create_tensor("answer", htype="text")
kb_ds.create_tensor("embed", htype="embedding")

# 知识库内容
knowledge = [
    {"q": "Python 有哪些常用数据结构", "a": "列表、字典、元组、集合、堆、队列等"},
    {"q": "Activeloop 能做什么", "a": "管理数据集、存储向量、构建 RAG、对接深度学习框架"},
    {"q": "如何读取 deeplake 数据", "a": "通过索引 ds[i] 或遍历读取,使用 data() 获取原始数据"},
    {"q": "deeplake 支持云端吗", "a": "支持,使用 hub:// 路径创建云端数据集"}
]

# 写入数据
with kb_ds:
    for item in knowledge:
        embed = np.random.rand(128)
        kb_ds.question.append(item["q"])
        kb_ds.answer.append(item["a"])
        kb_ds.embed.append(embed)

# 2. 定义检索函数
def search_answer(user_query: str) -> str:
    query_embed = np.random.rand(128)
    max_sim = -1
    best_ans = ""

    for sample in kb_ds:
        sim = np.dot(query_embed, sample.embed.data()) / (
            np.linalg.norm(query_embed) * np.linalg.norm(sample.embed.data())
        )
        if sim > max_sim:
            max_sim = sim
            best_ans = sample.answer.data()
    return best_ans

# 3. 模拟用户提问
if __name__ == "__main__":
    user_query = "如何读取 deeplake 里面的数据"
    answer = search_answer(user_query)

    print("用户问题:", user_query)
    print("系统回答:", answer)

该案例可直接部署在本地,作为小型客服机器人、文档问答助手使用,代码简洁、依赖少、启动快,充分体现 Activeloop 在 AI 小项目中的高效性。

相关资源

  • Pypi地址:https://pypi.org/project/deeplake/
  • Github地址:https://github.com/activeloopai/deeplake
  • 官方文档地址:https://docs.activeloop.ai/

关注我,每天分享一个实用的Python自动化工具。