Python 数据管道神器:Kedro 从入门到实战,轻松构建可复用数据工程

一、Kedro 库简介

Kedro 是面向生产级数据工程与数据科学的 Python 框架,专注标准化、可复用、可维护的数据管道构建,基于模块化与配置驱动思想,将数据处理流程拆分为节点与管道,支持版本管理、测试、文档自动化。优点是工程化规范、协作友好、适合复杂项目;缺点是轻量场景略显繁琐。采用 Apache 2.0 开源许可。

二、Kedro 安装与环境准备

2.1 环境要求

Kedro 支持 Python 3.8 及以上版本,兼容 Windows、macOS、Linux 系统,可与 pandas、numpy、scikit-learn、PySpark 无缝集成,适合单机与分布式数据工程。

2.2 安装命令

打开终端或命令提示符,执行以下命令完成安装:

pip install kedro

如需使用可视化工具,可安装扩展:

pip install kedro-viz

安装完成后,验证版本:

kedro --version

出现版本号即表示安装成功。

三、Kedro 核心概念与工作流程

3.1 核心概念

  • 项目(Project):Kedro 工程的根目录,统一管理代码、数据、配置、文档。
  • 节点(Node):最小执行单元,对应一个 Python 函数,负责单一数据处理逻辑。
  • 管道(Pipeline):多个节点按依赖关系组合而成的执行流程,自动解析执行顺序。
  • 目录(Catalog):数据入口配置文件,统一管理数据读取与写入,支持多种格式。
  • 参数(Parameters):集中管理配置参数,便于修改与环境切换。
  • 运行(Run):执行整个或部分数据管道,自动处理依赖与数据流转。

3.2 工作原理

Kedro 通过声明式编程定义数据处理逻辑,不直接硬编码读写路径与执行顺序,而是通过 YAML 配置文件声明数据与参数,通过函数定义节点逻辑,自动构建依赖图并按拓扑顺序执行,保证流程可复现、可测试、可扩展。

四、Kedro 项目创建与目录结构

4.1 创建新项目

在终端进入工作目录,执行命令创建 Kedro 项目:

kedro new

按提示输入项目名称,例如 kedro_demo,等待项目生成。

4.2 标准目录结构

kedro_demo/
├── conf/                # 配置文件(catalog、parameters)
│   ├── base/
│   └── local/
├── data/                # 数据目录(原始、中间、模型、输出)
│   ├── 01_raw/
│   ├── 02_intermediate/
│   ├── 03_primary/
│   ├── 04_feature/
│   ├── 05_model_input/
│   ├── 06_models/
│   ├── 07_model_output/
│   └── 08_reporting/
├── docs/                # 项目文档
├── kedro_demo/          # 主源码目录
│   ├── __init__.py
│   ├── __main__.py
│   ├── pipeline_registry.py  # 管道注册
│   └── pipelines/        # 管道实现
├── logs/                # 运行日志
├── tests/               # 单元测试
├── .gitignore
├── pyproject.toml
└── README.md

该结构遵循数据工程最佳实践,从原始数据到报告输出分层管理,避免混乱。

五、基础使用:从零构建第一个数据管道

5.1 编写数据处理函数

进入 kedro_demo/pipelines,创建 demo_pipeline 文件夹,新增 nodes.py

# -*- coding: utf-8 -*-
import pandas as pd

def load_raw_data(file_path: str) -> pd.DataFrame:
    """
    读取原始CSV数据
    """
    df = pd.read_csv(file_path)
    return df

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    数据清洗:去重、缺失值填充
    """
    df = df.drop_duplicates()
    df = df.fillna(0)
    return df

def calculate_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    简单统计计算:新增均值列
    """
    df['mean_value'] = df.select_dtypes(include='number').mean(axis=1)
    return df

5.2 构建管道

同目录创建 pipeline.py

# -*- coding: utf-8 -*-
from kedro.pipeline import Pipeline, node
from .nodes import load_raw_data, clean_data, calculate_stats

def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(
                func=load_raw_data,
                inputs="params:raw_data_path",
                outputs="raw_data",
                name="load_raw_data_node",
            ),
            node(
                func=clean_data,
                inputs="raw_data",
                outputs="cleaned_data",
                name="clean_data_node",
            ),
            node(
                func=calculate_stats,
                inputs="cleaned_data",
                outputs="stats_data",
                name="calculate_stats_node",
            ),
        ]
    )

Kedro 会根据 inputsoutputs 自动确定执行顺序。

5.3 配置数据目录

conf/base/catalog.yml 中添加:

raw_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/input.csv

cleaned_data:
  type: pandas.CSVDataSet
  filepath: data/02_intermediate/cleaned.csv

stats_data:
  type: pandas.CSVDataSet
  filepath: data/03_primary/stats.csv

5.4 配置参数

conf/base/parameters.yml 中添加:

raw_data_path: "data/01_raw/input.csv"

5.5 注册管道

打开 pipeline_registry.py,注册管道:

from kedro.framework.pipeline import Pipeline
from kedro_demo.pipelines.demo_pipeline import create_pipeline as demo_pipeline

def register_pipelines() -> dict[str, Pipeline]:
    pipelines = {
        "__default__": demo_pipeline(),
        "demo": demo_pipeline(),
    }
    return pipelines

5.6 准备测试数据

data/01_raw 下创建 input.csv

id,value1,value2
1,10,20
2,,30
3,40,
1,10,20

5.7 运行管道

在项目根目录执行:

kedro run

运行成功后,可在对应目录看到输出文件。

六、进阶使用:参数化、可视化与多环境

6.1 使用动态参数

修改 parameters.yml

raw_data_path: "data/01_raw/input.csv"
fill_value: 0
drop_duplicates: True

更新 nodes.py

def clean_data(df: pd.DataFrame, fill_value: int, drop_duplicates: bool) -> pd.DataFrame:
    if drop_duplicates:
        df = df.drop_duplicates()
    df = df.fillna(fill_value)
    return df

修改管道节点:

node(
    func=clean_data,
    inputs=dict(df="raw_data", fill_value="params:fill_value", drop_duplicates="params:drop_duplicates"),
    outputs="cleaned_data",
    name="clean_data_node",
),

再次运行即可使用新参数。

6.2 管道可视化

执行命令启动可视化服务:

kedro viz run

浏览器自动打开界面,可查看节点依赖、运行状态、数据流向,支持交互式查看。

6.3 多环境切换

Kedro 支持 baselocalprod 等多环境配置,只需在 conf/ 下新建环境文件夹,覆盖对应配置即可,运行时指定环境:

kedro run --env=prod

七、机器学习实战:基于 Kedro 的分类模型 pipeline

7.1 编写机器学习节点

新建 ml_pipeline 文件夹,nodes.py

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

def split_data(df: pd.DataFrame, test_size: float, random_state: int):
    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    return X_train, X_test, y_train, y_test

def train_model(X_train: pd.DataFrame, y_train: pd.Series):
    model = LogisticRegression(max_iter=1000)
    model.fit(X_train, y_train)
    return model

def evaluate_model(model, X_test: pd.DataFrame, y_test: pd.Series):
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    return {"accuracy": acc}

7.2 构建机器学习管道

pipeline.py

from kedro.pipeline import Pipeline, node
from .nodes import split_data, train_model, evaluate_model

def create_ml_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                split_data,
                inputs=["stats_data", "params:test_size", "params:random_state"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data",
            ),
            node(
                train_model,
                inputs=["X_train", "y_train"],
                outputs="model",
                name="train_model",
            ),
            node(
                evaluate_model,
                inputs=["model", "X_test", "y_test"],
                outputs="metrics",
                name="evaluate_model",
            ),
        ]
    )

7.3 配置与运行

catalog.yml 配置模型与指标输出:

model:
  type: pickle.PickleDataSet
  filepath: data/06_models/model.pkl

metrics:
  type: json.JSONDataSet
  filepath: data/07_model_output/metrics.json

在参数中添加:

test_size: 0.2
random_state: 42

注册管道后运行:

kedro run

可得到训练好的模型与评估结果。

八、Kedro 优势与适用场景

Kedro 解决了数据科学项目代码混乱、难以复现、协作成本高的问题,通过强制工程化规范,让脚本式代码升级为可维护、可测试、可部署的生产级项目。适合团队协作、长期维护、需要上线部署的数据管道与机器学习项目,尤其在数据清洗、特征工程、模型训练、批量预测场景优势明显。

它将数据科学家从路径管理、依赖混乱、环境不一致中解放出来,专注算法与逻辑本身,同时让数据工程与生产环境对接更平滑,支持直接对接 Airflow、Prefect、Kubeflow 等调度工具。

相关资源

  • Pypi地址:https://pypi.org/project/kedro/
  • Github地址:https://github.com/kedro-org/kedro
  • 官方文档地址:https://docs.kedro.org/

关注我,每天分享一个实用的Python自动化工具。