一、Kedro 库简介
Kedro 是面向生产级数据工程与数据科学的 Python 框架,专注标准化、可复用、可维护的数据管道构建,基于模块化与配置驱动思想,将数据处理流程拆分为节点与管道,支持版本管理、测试、文档自动化。优点是工程化规范、协作友好、适合复杂项目;缺点是轻量场景略显繁琐。采用 Apache 2.0 开源许可。

二、Kedro 安装与环境准备
2.1 环境要求
Kedro 支持 Python 3.8 及以上版本,兼容 Windows、macOS、Linux 系统,可与 pandas、numpy、scikit-learn、PySpark 无缝集成,适合单机与分布式数据工程。
2.2 安装命令
打开终端或命令提示符,执行以下命令完成安装:
pip install kedro如需使用可视化工具,可安装扩展:
pip install kedro-viz安装完成后,验证版本:
kedro --version出现版本号即表示安装成功。
三、Kedro 核心概念与工作流程
3.1 核心概念
- 项目(Project):Kedro 工程的根目录,统一管理代码、数据、配置、文档。
- 节点(Node):最小执行单元,对应一个 Python 函数,负责单一数据处理逻辑。
- 管道(Pipeline):多个节点按依赖关系组合而成的执行流程,自动解析执行顺序。
- 目录(Catalog):数据入口配置文件,统一管理数据读取与写入,支持多种格式。
- 参数(Parameters):集中管理配置参数,便于修改与环境切换。
- 运行(Run):执行整个或部分数据管道,自动处理依赖与数据流转。
3.2 工作原理
Kedro 通过声明式编程定义数据处理逻辑,不直接硬编码读写路径与执行顺序,而是通过 YAML 配置文件声明数据与参数,通过函数定义节点逻辑,自动构建依赖图并按拓扑顺序执行,保证流程可复现、可测试、可扩展。
四、Kedro 项目创建与目录结构
4.1 创建新项目
在终端进入工作目录,执行命令创建 Kedro 项目:
kedro new按提示输入项目名称,例如 kedro_demo,等待项目生成。
4.2 标准目录结构
kedro_demo/
├── conf/ # 配置文件(catalog、parameters)
│ ├── base/
│ └── local/
├── data/ # 数据目录(原始、中间、模型、输出)
│ ├── 01_raw/
│ ├── 02_intermediate/
│ ├── 03_primary/
│ ├── 04_feature/
│ ├── 05_model_input/
│ ├── 06_models/
│ ├── 07_model_output/
│ └── 08_reporting/
├── docs/ # 项目文档
├── kedro_demo/ # 主源码目录
│ ├── __init__.py
│ ├── __main__.py
│ ├── pipeline_registry.py # 管道注册
│ └── pipelines/ # 管道实现
├── logs/ # 运行日志
├── tests/ # 单元测试
├── .gitignore
├── pyproject.toml
└── README.md该结构遵循数据工程最佳实践,从原始数据到报告输出分层管理,避免混乱。
五、基础使用:从零构建第一个数据管道
5.1 编写数据处理函数
进入 kedro_demo/pipelines,创建 demo_pipeline 文件夹,新增 nodes.py:
# -*- coding: utf-8 -*-
import pandas as pd
def load_raw_data(file_path: str) -> pd.DataFrame:
"""
读取原始CSV数据
"""
df = pd.read_csv(file_path)
return df
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
"""
数据清洗:去重、缺失值填充
"""
df = df.drop_duplicates()
df = df.fillna(0)
return df
def calculate_stats(df: pd.DataFrame) -> pd.DataFrame:
"""
简单统计计算:新增均值列
"""
df['mean_value'] = df.select_dtypes(include='number').mean(axis=1)
return df5.2 构建管道
同目录创建 pipeline.py:
# -*- coding: utf-8 -*-
from kedro.pipeline import Pipeline, node
from .nodes import load_raw_data, clean_data, calculate_stats
def create_pipeline(**kwargs) -> Pipeline:
return Pipeline(
[
node(
func=load_raw_data,
inputs="params:raw_data_path",
outputs="raw_data",
name="load_raw_data_node",
),
node(
func=clean_data,
inputs="raw_data",
outputs="cleaned_data",
name="clean_data_node",
),
node(
func=calculate_stats,
inputs="cleaned_data",
outputs="stats_data",
name="calculate_stats_node",
),
]
)Kedro 会根据 inputs 和 outputs 自动确定执行顺序。
5.3 配置数据目录
在 conf/base/catalog.yml 中添加:
raw_data:
type: pandas.CSVDataSet
filepath: data/01_raw/input.csv
cleaned_data:
type: pandas.CSVDataSet
filepath: data/02_intermediate/cleaned.csv
stats_data:
type: pandas.CSVDataSet
filepath: data/03_primary/stats.csv5.4 配置参数
在 conf/base/parameters.yml 中添加:
raw_data_path: "data/01_raw/input.csv"5.5 注册管道
打开 pipeline_registry.py,注册管道:
from kedro.framework.pipeline import Pipeline
from kedro_demo.pipelines.demo_pipeline import create_pipeline as demo_pipeline
def register_pipelines() -> dict[str, Pipeline]:
pipelines = {
"__default__": demo_pipeline(),
"demo": demo_pipeline(),
}
return pipelines5.6 准备测试数据
在 data/01_raw 下创建 input.csv:
id,value1,value2
1,10,20
2,,30
3,40,
1,10,205.7 运行管道
在项目根目录执行:
kedro run运行成功后,可在对应目录看到输出文件。
六、进阶使用:参数化、可视化与多环境
6.1 使用动态参数
修改 parameters.yml:
raw_data_path: "data/01_raw/input.csv"
fill_value: 0
drop_duplicates: True更新 nodes.py:
def clean_data(df: pd.DataFrame, fill_value: int, drop_duplicates: bool) -> pd.DataFrame:
if drop_duplicates:
df = df.drop_duplicates()
df = df.fillna(fill_value)
return df修改管道节点:
node(
func=clean_data,
inputs=dict(df="raw_data", fill_value="params:fill_value", drop_duplicates="params:drop_duplicates"),
outputs="cleaned_data",
name="clean_data_node",
),再次运行即可使用新参数。
6.2 管道可视化
执行命令启动可视化服务:
kedro viz run浏览器自动打开界面,可查看节点依赖、运行状态、数据流向,支持交互式查看。
6.3 多环境切换
Kedro 支持 base、local、prod 等多环境配置,只需在 conf/ 下新建环境文件夹,覆盖对应配置即可,运行时指定环境:
kedro run --env=prod七、机器学习实战:基于 Kedro 的分类模型 pipeline
7.1 编写机器学习节点
新建 ml_pipeline 文件夹,nodes.py:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
def split_data(df: pd.DataFrame, test_size: float, random_state: int):
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
return X_train, X_test, y_train, y_test
def train_model(X_train: pd.DataFrame, y_train: pd.Series):
model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)
return model
def evaluate_model(model, X_test: pd.DataFrame, y_test: pd.Series):
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
return {"accuracy": acc}7.2 构建机器学习管道
pipeline.py:
from kedro.pipeline import Pipeline, node
from .nodes import split_data, train_model, evaluate_model
def create_ml_pipeline(**kwargs):
return Pipeline(
[
node(
split_data,
inputs=["stats_data", "params:test_size", "params:random_state"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split_data",
),
node(
train_model,
inputs=["X_train", "y_train"],
outputs="model",
name="train_model",
),
node(
evaluate_model,
inputs=["model", "X_test", "y_test"],
outputs="metrics",
name="evaluate_model",
),
]
)7.3 配置与运行
在 catalog.yml 配置模型与指标输出:
model:
type: pickle.PickleDataSet
filepath: data/06_models/model.pkl
metrics:
type: json.JSONDataSet
filepath: data/07_model_output/metrics.json在参数中添加:
test_size: 0.2
random_state: 42注册管道后运行:
kedro run可得到训练好的模型与评估结果。
八、Kedro 优势与适用场景
Kedro 解决了数据科学项目代码混乱、难以复现、协作成本高的问题,通过强制工程化规范,让脚本式代码升级为可维护、可测试、可部署的生产级项目。适合团队协作、长期维护、需要上线部署的数据管道与机器学习项目,尤其在数据清洗、特征工程、模型训练、批量预测场景优势明显。
它将数据科学家从路径管理、依赖混乱、环境不一致中解放出来,专注算法与逻辑本身,同时让数据工程与生产环境对接更平滑,支持直接对接 Airflow、Prefect、Kubeflow 等调度工具。
相关资源
- Pypi地址:https://pypi.org/project/kedro/
- Github地址:https://github.com/kedro-org/kedro
- 官方文档地址:https://docs.kedro.org/
关注我,每天分享一个实用的Python自动化工具。

