Python 数据管道神器:Dagster 从入门到实战完全教程

一、Dagster 基础认知

1.1 库用途与原理

Dagster 是面向数据流水线、ETL 与机器学习工作流的开源编排框架,专注数据资产的定义、调度、监控与可观测性,以数据资产为核心组织任务,替代传统脚本式调度。采用资产优先设计,通过代码定义数据依赖与执行逻辑,支持本地调试、多环境部署、任务重试与日志追踪。基于 Apache 2.0 开源协议,优点是调试友好、依赖清晰、易维护;缺点是学习成本高于简单调度工具,轻量场景略显冗余。

1.2 核心优势与适用场景

Dagster 最核心的价值,是把散乱的数据脚本变成可管理、可测试、可复用的数据资产,特别适合数据分析、数据仓库、机器学习流水线、定时 ETL、API 数据同步等场景。它和 Airflow、Prefect 最大的区别在于:从一开始就围绕“数据产出”设计,而不是单纯的任务调度。你可以在本地完整调试流水线,直接看到每个步骤的输入输出,上线后不会出现“本地能跑、线上挂掉”的黑盒问题。

同时 Dagster 原生支持类型检查、数据质量校验、重试机制、资源隔离、多环境配置(开发/测试/生产),让数据代码从“能用”变成“可靠”。对于团队协作场景,它能清晰展示数据血缘,谁在什么时候生成了什么数据、依赖哪些上游,一目了然。

二、Dagster 安装与环境准备

2.1 安装命令

在使用 Dagster 前,确保你已安装 Python 3.8+ 版本,使用 pip 即可完成安装:

# 安装核心库
pip install dagster

# 安装 Web UI 与命令行工具(强烈推荐)
pip install dagster-webserver dagit
  • dagster:核心库,提供资产、作业、调度、资源等基础能力。
  • dagster-webserver / dagit:提供可视化 Web UI,用于查看、调试、运行流水线。

安装完成后,可通过以下命令验证版本:

dagster --version

出现版本号即表示安装成功。

2.2 最小项目结构

Dagster 项目有约定俗成的目录结构,便于管理资产、作业、资源与配置,推荐小白使用如下结构:

dagster_demo/
├── __init__.py
├── assets/       # 数据资产存放目录
│   ├── __init__.py
│   └── core_assets.py  # 核心资产代码
├── jobs.py       # 作业定义
├── schedules.py  # 调度定义
└── dagster.py    # 项目入口

该结构把数据产出(assets)、执行单元(jobs)、定时规则(schedules) 分开,既符合工程规范,又方便后期扩展。

三、Dagster 核心概念与基础使用

3.1 核心概念理解

  1. Asset(资产):Dagster 最核心的概念,代表一个数据产出,如 CSV、表、模型文件、JSON 等,通过 @asset 装饰器定义。
  2. Job(作业):由一个或多个资产组成的可执行单元,用于一次性或定时运行。
  3. Resource(资源):外部连接封装,如数据库、S3、API 客户端,实现环境隔离。
  4. Schedule(调度):定时执行作业,支持 cron 表达式。
  5. Op:更细粒度的计算单元,资产底层基于 Op 实现。

3.2 第一个资产示例

我们从最简单的资产开始,创建一个生成文本文件的资产,直观感受 Dagster 的工作方式。

assets/core_assets.py 中编写代码:

from dagster import asset

# 定义第一个数据资产:生成欢迎信息
@asset
def hello_dagster_asset() -> str:
    """
    第一个 Dagster 资产
    输出:字符串文本
    """
    content = "Hello, Dagster! 这是我的第一个数据资产"
    print(content)
    return content

代码说明

  • 使用 @asset 装饰器将普通函数变成 Dagster 资产。
  • 函数返回值就是该资产的产出,可以是字符串、DataFrame、文件路径等。
  • 函数注释会在 Web UI 中展示,方便理解资产用途。

3.3 加载资产并启动 Web UI

在项目根目录的 dagster.py 中加载资产:

from dagster import Definitions, load_assets_from_modules

# 从 assets 模块加载所有资产
from . import assets

all_assets = load_assets_from_modules([assets])

# 定义整个项目的入口
defs = Definitions(
    assets=all_assets,
)

代码说明

  • Definitions 是 Dagster 项目的总入口,管理资产、作业、调度、资源。
  • load_assets_from_modules 自动扫描模块中的所有资产,无需手动逐个注册。

启动 Web UI:

# 在项目根目录执行
dagster-webserver -f dagster.py

启动成功后,访问:http://localhost:3000,即可看到可视化界面。在界面中可以看到 hello_dagster_asset 资产,点击“Materialize”即可运行,运行后可查看日志与输出结果。

四、带依赖的多资产流水线

4.1 资产依赖示例

真实场景中,数据通常有上下游依赖,Dagster 会自动根据函数参数识别依赖关系,无需手动配置。

扩展 core_assets.py

from dagster import asset
import pandas as pd
import os

# 上游资产:生成原始数据
@asset
def raw_user_data() -> pd.DataFrame:
    """生成原始用户数据,返回 DataFrame"""
    data = {
        "user_id": [1, 2, 3, 4, 5],
        "username": ["张三", "李四", "王五", "赵六", "钱七"],
        "age": [22, 25, 28, 24, 26]
    }
    df = pd.DataFrame(data)
    return df

# 下游资产:依赖 raw_user_data,进行数据清洗
@asset
def clean_user_data(raw_user_data: pd.DataFrame) -> pd.DataFrame:
    """
    依赖上游原始数据,进行清洗
    筛选年龄大于 24 的用户
    """
    clean_df = raw_user_data[raw_user_data["age"] > 24].copy()
    print("清洗后数据:")
    print(clean_df)
    return clean_df

# 最终资产:依赖清洗后的数据,输出到 CSV 文件
@asset
def user_data_report(clean_user_data: pd.DataFrame) -> str:
    """
    依赖清洗后的数据,生成 CSV 报告文件
    返回文件路径
    """
    output_path = "user_report.csv"
    clean_user_data.to_csv(output_path, index=False, encoding="utf-8-sig")
    return output_path

代码说明

  1. raw_user_data 是源头资产,无上游依赖,生成原始数据。
  2. clean_user_data 参数为 raw_user_data,Dagster 自动识别为上游依赖,必须先运行上游。
  3. user_data_report 依赖清洗后的数据,最终输出 CSV 文件。
  4. 整个流程形成一条完整的流水线:原始数据 → 清洗 → 生成报告。

重启 Web UI 后,可在界面看到资产依赖图,点击任意资产可选择运行上游依赖,支持单独运行、全链路运行、重试、查看输入输出

4.2 作业与手动执行

作业是资产的执行集合,我们把上述资产打包成一个作业,方便执行。

jobs.py 中定义:

from dagster import define_asset_job
from assets.core_assets import raw_user_data, clean_user_data, user_data_report

# 定义用户数据处理作业
user_data_job = define_asset_job(
    name="user_data_job",
    selection=[raw_user_data, clean_user_data, user_data_report]
)

dagster.py 中加入作业:

from dagster import Definitions, load_assets_from_modules
from .jobs import user_data_job
from . import assets

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    jobs=[user_data_job],  # 注册作业
)

在 Web UI 的 Jobs 页面,可直接点击“Launch”运行整个作业,流水线会按依赖顺序自动执行。

五、调度与定时任务

5.1 定时调度实现

Dagster 支持 cron 表达式实现定时执行,例如每天、每小时运行流水线。

schedules.py 中定义:

from dagster import ScheduleDefinition
from jobs import user_data_job

# 每天 0 点执行用户数据作业
daily_user_data_schedule = ScheduleDefinition(
    job=user_data_job,
    cron_schedule="0 0 * * *",  # 分 时 日 月 周
    name="daily_user_data_schedule",
    description="每日凌晨执行用户数据处理任务"
)

dagster.py 中注册调度:

from dagster import Definitions, load_assets_from_modules
from . import assets
from .jobs import user_data_job
from .schedules import daily_user_data_schedule

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    jobs=[user_data_job],
    schedules=[daily_user_data_schedule],  # 注册调度
)

启动 Web UI 后,在 Schedules 页面可开启/关闭调度,查看历史运行记录与日志。

六、资源与环境隔离(数据库示例)

6.1 资源封装

资源用于封装外部连接,实现开发/生产环境隔离,避免代码硬编码配置。以 SQLite 为例:

在项目中新建 resources.py

from dagster import resource
import sqlite3

class SQLiteResource:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = None

    def connect(self):
        self.conn = sqlite3.connect(self.db_path)
        return self.conn

    def close(self):
        if self.conn:
            self.conn.close()

@resource
def sqlite_resource(context) -> SQLiteResource:
    """SQLite 资源定义"""
    db_path = "dagster_demo.db"
    return SQLiteResource(db_path)

在资产中使用资源:

from dagster import asset
from resources import SQLiteResource

@asset
def create_user_table(sqlite_resource: SQLiteResource) -> None:
    """使用资源创建用户表"""
    conn = sqlite_resource.connect()
    cursor = conn.cursor()
    create_sql = """
    CREATE TABLE IF NOT EXISTS users (
        user_id INTEGER PRIMARY KEY,
        username TEXT,
        age INTEGER
    )
    """
    cursor.execute(create_sql)
    conn.commit()
    sqlite_resource.close()

代码说明

  • 资源通过参数注入资产,解耦代码与外部配置。
  • 切换环境只需修改资源,无需改动业务资产代码。

七、完整实战案例:自动化数据统计报告

7.1 案例需求

  1. 从模拟数据源获取原始数据。
  2. 数据清洗与统计分析。
  3. 结果存入数据库。
  4. 生成可视化报告文件。
  5. 每日定时执行。

7.2 完整代码实现

# assets/report_assets.py
from dagster import asset
import pandas as pd
import sqlite3
from datetime import datetime

@asset
def source_data() -> pd.DataFrame:
    """模拟来源数据:每日订单数据"""
    date = datetime.now().strftime("%Y-%m-%d")
    data = {
        "order_id": [1001, 1002, 1003, 1004, 1005],
        "amount": [99, 199, 299, 99, 499],
        "status": ["success", "fail", "success", "success", "fail"],
        "date": [date] * 5
    }
    return pd.DataFrame(data)

@asset
def stat_order_data(source_data: pd.DataFrame) -> pd.DataFrame:
    """统计成功订单数据"""
    success_df = source_data[source_data["status"] == "success"]
    stat_df = success_df.groupby("date").agg(
        total_order=("order_id", "count"),
        total_amount=("amount", "sum")
    ).reset_index()
    return stat_df

@asset
def save_to_db(stat_order_data: pd.DataFrame) -> None:
    """将统计结果存入 SQLite"""
    conn = sqlite3.connect("dagster_demo.db")
    stat_order_data.to_sql("order_stat", conn, if_exists="append", index=False)
    conn.close()

@asset
def generate_report(stat_order_data: pd.DataFrame) -> str:
    """生成每日统计报告"""
    report_path = f"order_report_{datetime.now().strftime('%Y%m%d')}.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("每日订单统计报告\n")
        f.write(stat_order_data.to_string())
    return report_path

将上述资产加载到 Definitions 中,打包成作业并配置每日调度,即可实现全自动数据统计+入库+报告生成

在 Web UI 中可查看:

  • 资产依赖关系
  • 每一步输入输出
  • 运行日志与报错信息
  • 历史执行记录
  • 数据血缘追踪

八、Dagster 部署与扩展

Dagster 支持多种部署方式:

  • 本地单机运行(适合开发调试)
  • Docker 容器化部署
  • Kubernetes 集群部署(生产高可用)
  • 配合 Dagster Cloud 托管服务

生产环境中,通常使用持久化元数据存储(PostgreSQL)+ 独立工作节点,实现高可用与分布式执行。

相关资源

  • Pypi地址:https://pypi.org/project/dagster
  • Github地址:https://github.com/dagster-io/dagster
  • 官方文档地址:https://docs.dagster.io

关注我,每天分享一个实用的Python自动化工具。