一、ZenML 库概述
ZenML 是一款面向机器学习与 MLOps 领域的开源 Python 库,核心用于构建可复用、可复现、可迁移的端到端 ML 流水线,屏蔽底层环境差异,统一本地、云端、分布式集群的流水线执行逻辑。其基于流水线与步骤抽象设计,将数据读取、预处理、训练、评估、部署拆分为可编排步骤,底层通过配置文件管理运行时环境与组件。优点是轻量化、易上手、跨平台兼容、支持多框架协同,缺点是复杂分布式调度能力弱于 Kubeflow。采用 Apache License 2.0 开源许可,商用友好。

二、ZenML 安装与初始化环境
2.1 基础安装
在使用 ZenML 前,我们需要通过 pip 完成安装,打开命令行执行以下指令:
pip install zenml该命令会安装 ZenML 核心库以及基础依赖,适合本地快速体验与开发。如果需要对接云服务、数据库、分布式训练等扩展功能,还可以安装对应扩展包。
2.2 初始化 ZenML 环境
安装完成后,必须先初始化 ZenML 工作环境,这一步会创建本地配置文件、数据库、存储目录等核心结构,是后续所有操作的前提。
zenml init执行成功后,会在当前目录生成 .zenml 隐藏文件夹,用于存储流水线配置、运行记录、元数据等信息。
2.3 安装常用扩展组件
机器学习流水线通常需要对接数据、模型、可视化工具,因此我们安装常用扩展组件:
# 安装可视化、数据处理、模型训练相关扩展
pip install "zenml[server,data,model,tensorflow,sklearn]"安装完成后,可以启动 ZenML 本地服务,用于查看流水线运行状态、元数据、实验记录等:
zenml up启动成功后,默认访问地址为 http://127.0.0.1:8237,打开浏览器即可进入 ZenML 可视化控制台。
三、ZenML 核心概念与基础使用
3.1 核心概念解析
- 步骤(Step):流水线中最小执行单元,例如数据加载、数据清洗、模型训练、模型评估,每个步骤都是独立函数,通过装饰器标记。
- 流水线(Pipeline):由多个步骤按逻辑顺序组合而成,定义完整机器学习工作流,一次定义可多次运行。
- 工件(Artifact):步骤之间传递的数据或模型,ZenML 自动管理工件的存储、读取、版本管理,无需手动处理文件读写。
- 栈(Stack):定义流水线运行环境,包括编排引擎、元数据存储、工件存储、部署引擎等,本地默认使用本地栈,可无缝切换云端栈。
- 运行(Run):流水线的一次执行过程,所有步骤日志、结果、指标都会被记录,支持回溯查看。
3.2 第一个 ZenML 流水线
我们从最简单的示例开始,创建两个步骤并组合成流水线,理解 ZenML 的基础用法。
3.2.1 代码实现
# 导入核心装饰器
from zenml import step, pipeline
# 定义第一个步骤:生成数据
@step
def generate_data() -> int:
"""生成一个整数数据"""
data = 100
print(f"生成数据:{data}")
return data
# 定义第二个步骤:处理数据
@step
def process_data(input_data: int) -> int:
"""对输入数据进行处理,乘以2"""
result = input_data * 2
print(f"处理后数据:{result}")
return result
# 定义流水线:组合步骤
@pipeline
def simple_ml_pipeline():
"""最简单的 ZenML 流水线"""
data = generate_data()
output = process_data(data)
# 运行流水线
if __name__ == "__main__":
simple_ml_pipeline()3.2.2 代码说明
- 使用
@step装饰器将普通函数标记为 ZenML 步骤,函数的输入输出会自动被 ZenML 管理为工件。 - 使用
@pipeline装饰器将步骤组合为流水线,内部按顺序调用步骤,自动处理数据传递。 - 运行脚本后,ZenML 会自动记录运行日志、步骤执行顺序、数据传递结果,可在控制台查看。
执行代码后,命令行会输出执行过程,浏览器控制台会新增一条流水线运行记录,展示每个步骤的执行状态、耗时、输出结果。
四、基于 Sklearn 的机器学习实战流水线
4.1 实战场景说明
本案例使用经典鸢尾花数据集,构建完整机器学习流水线,包含:数据加载、数据划分、模型训练、模型评估四个核心步骤,使用 Sklearn 实现算法,ZenML 完成流水线编排与管理。
4.2 完整代码实现
from zenml import step, pipeline
from zenml.client import Client
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
# 步骤1:加载数据集
@step
def load_dataset() -> pd.DataFrame:
"""加载鸢尾花数据集并转换为DataFrame"""
iris = load_iris()
data = pd.DataFrame(iris.data, columns=iris.feature_names)
data["target"] = iris.target
print("数据集加载完成,形状:", data.shape)
return data
# 步骤2:划分训练集和测试集
@step
def split_dataset(
data: pd.DataFrame, test_size: float = 0.2
) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
"""划分训练集和测试集"""
X = data.drop("target", axis=1)
y = data["target"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42
)
print(f"训练集样本数:{len(X_train)},测试集样本数:{len(X_test)}")
return X_train, X_test, y_train, y_test
# 步骤3:训练随机森林模型
@step
def train_model(
X_train: pd.DataFrame, y_train: pd.Series
) -> RandomForestClassifier:
"""使用随机森林算法训练模型"""
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
print("模型训练完成")
return model
# 步骤4:评估模型
@step
def evaluate_model(
model: RandomForestClassifier, X_test: pd.DataFrame, y_test: pd.Series
) -> float:
"""评估模型并输出准确率"""
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f"模型测试集准确率:{acc:.4f}")
return acc
# 定义完整机器学习流水线
@pipeline
def iris_classification_pipeline(test_size: float = 0.2):
"""鸢尾花分类完整流水线"""
data = load_dataset()
X_train, X_test, y_train, y_test = split_dataset(data, test_size=test_size)
model = train_model(X_train, y_train)
accuracy = evaluate_model(model, X_test, y_test)
# 运行流水线
if __name__ == "__main__":
# 执行流水线
run = iris_classification_pipeline(test_size=0.3)
# 查看运行结果
client = Client()
latest_run = client.get_pipeline_run("iris_classification_pipeline")
print(f"最新运行ID:{latest_run.id}")
print(f"最终准确率:{latest_run.steps['evaluate_model'].output.read()}")4.3 代码说明
- 四个步骤分别承担数据、划分、训练、评估职责,解耦代码结构,便于单独修改、调试、复用。
- 步骤之间自动传递 DataFrame、模型、数组等复杂对象,无需手动保存文件、读取文件。
- 流水线支持传入参数(如 test_size),可灵活调整配置,多次运行对比结果。
- 通过 ZenML Client 可以读取历史运行结果、步骤输出、元数据,便于后续自动化分析。
运行代码后,控制台会输出数据集信息、样本划分结果、模型训练状态与最终准确率,同时所有信息会同步到 ZenML 控制台,可查看流水线 DAG 图、步骤耗时、模型指标、数据版本等。
五、基于 TensorFlow 的深度学习流水线实战
5.1 实战场景说明
使用简单神经网络对鸢尾花数据集进行分类,展示 ZenML 对接深度学习框架的能力,步骤包括:数据加载、数据预处理、模型构建、模型训练、模型评估。
5.2 完整代码实现
from zenml import step, pipeline
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import to_categorical
# 步骤1:加载数据
@step
def load_iris_data() -> pd.DataFrame:
iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df["target"] = iris.target
return df
# 步骤2:数据预处理
@step
def preprocess_data(
df: pd.DataFrame
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
X = df.drop("target", axis=1).values
y = df["target"].values
# 标准化
scaler = StandardScaler()
X = scaler.fit_transform(X)
# 独热编码
y = to_categorical(y, num_classes=3)
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
return X_train, X_test, y_train, y_test
# 步骤3:构建神经网络模型
@step
def build_dnn_model(input_shape: int) -> Sequential:
model = Sequential()
model.add(Dense(16, activation="relu", input_shape=(input_shape,)))
model.add(Dense(8, activation="relu"))
model.add(Dense(3, activation="softmax"))
model.compile(
optimizer="adam",
loss="categorical_crossentropy",
metrics=["accuracy"]
)
return model
# 步骤4:训练模型
@step
def train_dnn_model(
model: Sequential, X_train: np.ndarray, y_train: np.ndarray
) -> Sequential:
model.fit(
X_train, y_train,
epochs=50,
batch_size=4,
validation_split=0.1,
verbose=1
)
return model
# 步骤5:评估模型
@step
def test_dnn_model(
model: Sequential, X_test: np.ndarray, y_test: np.ndarray
) -> dict:
loss, acc = model.evaluate(X_test, y_test, verbose=0)
result = {"test_loss": loss, "test_accuracy": acc}
print(f"测试损失:{loss:.4f},测试准确率:{acc:.4f}")
return result
# 定义深度学习流水线
@pipeline
def iris_dnn_pipeline():
df = load_iris_data()
X_train, X_test, y_train, y_test = preprocess_data(df)
model = build_dnn_model(input_shape=X_train.shape[1])
trained_model = train_dnn_model(model, X_train, y_train)
metrics = test_dnn_model(trained_model, X_test, y_test)
if __name__ == "__main__":
iris_dnn_pipeline()5.3 代码说明
- ZenML 可以无缝对接 TensorFlow、PyTorch 等深度学习框架,自动序列化、存储、加载模型。
- 预处理步骤包含标准化、独热编码、数据集划分,符合深度学习数据处理规范。
- 模型训练过程中的日志、指标、结构都会被 ZenML 记录,便于对比不同超参数效果。
- 流水线结构清晰,可直接用于生产环境,替换数据集即可快速迁移到其他项目。
六、ZenML 流水线高级用法
6.1 流水线配置化运行
支持通过外部参数控制流水线行为,适配不同环境、不同数据集、不同超参数,实现一次编写、多次灵活运行。
@pipeline
def configurable_pipeline(
epochs: int = 50,
test_size: float = 0.2,
model_type: str = "random_forest"
):
# 内部根据参数选择不同模型或逻辑
pass6.2 查看历史运行记录
from zenml.client import Client
client = Client()
# 获取所有流水线
pipelines = client.list_pipelines()
for p in pipelines:
print(p.name)
# 获取某条流水线的所有运行记录
runs = client.get_pipeline("iris_classification_pipeline").runs
for r in runs:
print(f"运行时间:{r.created},状态:{r.status}")6.3 流水线缓存机制
ZenML 默认开启缓存,未修改的步骤会直接使用上一次运行结果,大幅提升调试速度:
@step(enable_cache=False) # 关闭当前步骤缓存
def dynamic_step():
pass七、实际项目应用价值与代码价值
在实际机器学习项目中,传统开发模式常面临代码混乱、不可复现、环境迁移困难、实验记录丢失等问题。使用 ZenML 可以将整个工作流结构化,每个步骤独立可维护,所有实验自动记录,方便回溯最优模型。
在团队协作中,统一的流水线规范可以降低沟通成本,新成员可快速理解工作流程;在部署阶段,本地编写的流水线无需大量修改即可运行在云端服务器、K8s 集群等环境,实现从开发到生产的平滑迁移。
上述鸢尾花分类、深度学习流水线代码,可直接作为项目模板,替换数据集、调整模型结构、修改评估指标,即可应用于图像分类、表格数据预测、NLP 任务等多种场景,真正实现一套流水线适配多任务、多环境。
八、相关资源
- Pypi地址:https://pypi.org/project/zenml
- Github地址:https://github.com/zenml-io/zenml
- 官方文档地址:https://docs.zenml.io
关注我,每天分享一个实用的Python自动化工具。

