一、Dagster 基础认知
1.1 库用途与原理
Dagster 是面向数据流水线、ETL 与机器学习工作流的开源编排框架,专注数据资产的定义、调度、监控与可观测性,以数据资产为核心组织任务,替代传统脚本式调度。采用资产优先设计,通过代码定义数据依赖与执行逻辑,支持本地调试、多环境部署、任务重试与日志追踪。基于 Apache 2.0 开源协议,优点是调试友好、依赖清晰、易维护;缺点是学习成本高于简单调度工具,轻量场景略显冗余。

1.2 核心优势与适用场景
Dagster 最核心的价值,是把散乱的数据脚本变成可管理、可测试、可复用的数据资产,特别适合数据分析、数据仓库、机器学习流水线、定时 ETL、API 数据同步等场景。它和 Airflow、Prefect 最大的区别在于:从一开始就围绕“数据产出”设计,而不是单纯的任务调度。你可以在本地完整调试流水线,直接看到每个步骤的输入输出,上线后不会出现“本地能跑、线上挂掉”的黑盒问题。
同时 Dagster 原生支持类型检查、数据质量校验、重试机制、资源隔离、多环境配置(开发/测试/生产),让数据代码从“能用”变成“可靠”。对于团队协作场景,它能清晰展示数据血缘,谁在什么时候生成了什么数据、依赖哪些上游,一目了然。
二、Dagster 安装与环境准备
2.1 安装命令
在使用 Dagster 前,确保你已安装 Python 3.8+ 版本,使用 pip 即可完成安装:
# 安装核心库
pip install dagster
# 安装 Web UI 与命令行工具(强烈推荐)
pip install dagster-webserver dagitdagster:核心库,提供资产、作业、调度、资源等基础能力。dagster-webserver / dagit:提供可视化 Web UI,用于查看、调试、运行流水线。
安装完成后,可通过以下命令验证版本:
dagster --version出现版本号即表示安装成功。
2.2 最小项目结构
Dagster 项目有约定俗成的目录结构,便于管理资产、作业、资源与配置,推荐小白使用如下结构:
dagster_demo/
├── __init__.py
├── assets/ # 数据资产存放目录
│ ├── __init__.py
│ └── core_assets.py # 核心资产代码
├── jobs.py # 作业定义
├── schedules.py # 调度定义
└── dagster.py # 项目入口该结构把数据产出(assets)、执行单元(jobs)、定时规则(schedules) 分开,既符合工程规范,又方便后期扩展。
三、Dagster 核心概念与基础使用
3.1 核心概念理解
- Asset(资产):Dagster 最核心的概念,代表一个数据产出,如 CSV、表、模型文件、JSON 等,通过
@asset装饰器定义。 - Job(作业):由一个或多个资产组成的可执行单元,用于一次性或定时运行。
- Resource(资源):外部连接封装,如数据库、S3、API 客户端,实现环境隔离。
- Schedule(调度):定时执行作业,支持 cron 表达式。
- Op:更细粒度的计算单元,资产底层基于 Op 实现。
3.2 第一个资产示例
我们从最简单的资产开始,创建一个生成文本文件的资产,直观感受 Dagster 的工作方式。
在 assets/core_assets.py 中编写代码:
from dagster import asset
# 定义第一个数据资产:生成欢迎信息
@asset
def hello_dagster_asset() -> str:
"""
第一个 Dagster 资产
输出:字符串文本
"""
content = "Hello, Dagster! 这是我的第一个数据资产"
print(content)
return content代码说明:
- 使用
@asset装饰器将普通函数变成 Dagster 资产。 - 函数返回值就是该资产的产出,可以是字符串、DataFrame、文件路径等。
- 函数注释会在 Web UI 中展示,方便理解资产用途。
3.3 加载资产并启动 Web UI
在项目根目录的 dagster.py 中加载资产:
from dagster import Definitions, load_assets_from_modules
# 从 assets 模块加载所有资产
from . import assets
all_assets = load_assets_from_modules([assets])
# 定义整个项目的入口
defs = Definitions(
assets=all_assets,
)代码说明:
Definitions是 Dagster 项目的总入口,管理资产、作业、调度、资源。load_assets_from_modules自动扫描模块中的所有资产,无需手动逐个注册。
启动 Web UI:
# 在项目根目录执行
dagster-webserver -f dagster.py启动成功后,访问:http://localhost:3000,即可看到可视化界面。在界面中可以看到 hello_dagster_asset 资产,点击“Materialize”即可运行,运行后可查看日志与输出结果。
四、带依赖的多资产流水线
4.1 资产依赖示例
真实场景中,数据通常有上下游依赖,Dagster 会自动根据函数参数识别依赖关系,无需手动配置。
扩展 core_assets.py:
from dagster import asset
import pandas as pd
import os
# 上游资产:生成原始数据
@asset
def raw_user_data() -> pd.DataFrame:
"""生成原始用户数据,返回 DataFrame"""
data = {
"user_id": [1, 2, 3, 4, 5],
"username": ["张三", "李四", "王五", "赵六", "钱七"],
"age": [22, 25, 28, 24, 26]
}
df = pd.DataFrame(data)
return df
# 下游资产:依赖 raw_user_data,进行数据清洗
@asset
def clean_user_data(raw_user_data: pd.DataFrame) -> pd.DataFrame:
"""
依赖上游原始数据,进行清洗
筛选年龄大于 24 的用户
"""
clean_df = raw_user_data[raw_user_data["age"] > 24].copy()
print("清洗后数据:")
print(clean_df)
return clean_df
# 最终资产:依赖清洗后的数据,输出到 CSV 文件
@asset
def user_data_report(clean_user_data: pd.DataFrame) -> str:
"""
依赖清洗后的数据,生成 CSV 报告文件
返回文件路径
"""
output_path = "user_report.csv"
clean_user_data.to_csv(output_path, index=False, encoding="utf-8-sig")
return output_path代码说明:
raw_user_data是源头资产,无上游依赖,生成原始数据。clean_user_data参数为raw_user_data,Dagster 自动识别为上游依赖,必须先运行上游。user_data_report依赖清洗后的数据,最终输出 CSV 文件。- 整个流程形成一条完整的流水线:原始数据 → 清洗 → 生成报告。
重启 Web UI 后,可在界面看到资产依赖图,点击任意资产可选择运行上游依赖,支持单独运行、全链路运行、重试、查看输入输出。
4.2 作业与手动执行
作业是资产的执行集合,我们把上述资产打包成一个作业,方便执行。
在 jobs.py 中定义:
from dagster import define_asset_job
from assets.core_assets import raw_user_data, clean_user_data, user_data_report
# 定义用户数据处理作业
user_data_job = define_asset_job(
name="user_data_job",
selection=[raw_user_data, clean_user_data, user_data_report]
)在 dagster.py 中加入作业:
from dagster import Definitions, load_assets_from_modules
from .jobs import user_data_job
from . import assets
all_assets = load_assets_from_modules([assets])
defs = Definitions(
assets=all_assets,
jobs=[user_data_job], # 注册作业
)在 Web UI 的 Jobs 页面,可直接点击“Launch”运行整个作业,流水线会按依赖顺序自动执行。
五、调度与定时任务
5.1 定时调度实现
Dagster 支持 cron 表达式实现定时执行,例如每天、每小时运行流水线。
在 schedules.py 中定义:
from dagster import ScheduleDefinition
from jobs import user_data_job
# 每天 0 点执行用户数据作业
daily_user_data_schedule = ScheduleDefinition(
job=user_data_job,
cron_schedule="0 0 * * *", # 分 时 日 月 周
name="daily_user_data_schedule",
description="每日凌晨执行用户数据处理任务"
)在 dagster.py 中注册调度:
from dagster import Definitions, load_assets_from_modules
from . import assets
from .jobs import user_data_job
from .schedules import daily_user_data_schedule
all_assets = load_assets_from_modules([assets])
defs = Definitions(
assets=all_assets,
jobs=[user_data_job],
schedules=[daily_user_data_schedule], # 注册调度
)启动 Web UI 后,在 Schedules 页面可开启/关闭调度,查看历史运行记录与日志。
六、资源与环境隔离(数据库示例)
6.1 资源封装
资源用于封装外部连接,实现开发/生产环境隔离,避免代码硬编码配置。以 SQLite 为例:
在项目中新建 resources.py:
from dagster import resource
import sqlite3
class SQLiteResource:
def __init__(self, db_path: str):
self.db_path = db_path
self.conn = None
def connect(self):
self.conn = sqlite3.connect(self.db_path)
return self.conn
def close(self):
if self.conn:
self.conn.close()
@resource
def sqlite_resource(context) -> SQLiteResource:
"""SQLite 资源定义"""
db_path = "dagster_demo.db"
return SQLiteResource(db_path)在资产中使用资源:
from dagster import asset
from resources import SQLiteResource
@asset
def create_user_table(sqlite_resource: SQLiteResource) -> None:
"""使用资源创建用户表"""
conn = sqlite_resource.connect()
cursor = conn.cursor()
create_sql = """
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
age INTEGER
)
"""
cursor.execute(create_sql)
conn.commit()
sqlite_resource.close()代码说明:
- 资源通过参数注入资产,解耦代码与外部配置。
- 切换环境只需修改资源,无需改动业务资产代码。
七、完整实战案例:自动化数据统计报告
7.1 案例需求
- 从模拟数据源获取原始数据。
- 数据清洗与统计分析。
- 结果存入数据库。
- 生成可视化报告文件。
- 每日定时执行。
7.2 完整代码实现
# assets/report_assets.py
from dagster import asset
import pandas as pd
import sqlite3
from datetime import datetime
@asset
def source_data() -> pd.DataFrame:
"""模拟来源数据:每日订单数据"""
date = datetime.now().strftime("%Y-%m-%d")
data = {
"order_id": [1001, 1002, 1003, 1004, 1005],
"amount": [99, 199, 299, 99, 499],
"status": ["success", "fail", "success", "success", "fail"],
"date": [date] * 5
}
return pd.DataFrame(data)
@asset
def stat_order_data(source_data: pd.DataFrame) -> pd.DataFrame:
"""统计成功订单数据"""
success_df = source_data[source_data["status"] == "success"]
stat_df = success_df.groupby("date").agg(
total_order=("order_id", "count"),
total_amount=("amount", "sum")
).reset_index()
return stat_df
@asset
def save_to_db(stat_order_data: pd.DataFrame) -> None:
"""将统计结果存入 SQLite"""
conn = sqlite3.connect("dagster_demo.db")
stat_order_data.to_sql("order_stat", conn, if_exists="append", index=False)
conn.close()
@asset
def generate_report(stat_order_data: pd.DataFrame) -> str:
"""生成每日统计报告"""
report_path = f"order_report_{datetime.now().strftime('%Y%m%d')}.txt"
with open(report_path, "w", encoding="utf-8") as f:
f.write("每日订单统计报告\n")
f.write(stat_order_data.to_string())
return report_path将上述资产加载到 Definitions 中,打包成作业并配置每日调度,即可实现全自动数据统计+入库+报告生成。
在 Web UI 中可查看:
- 资产依赖关系
- 每一步输入输出
- 运行日志与报错信息
- 历史执行记录
- 数据血缘追踪
八、Dagster 部署与扩展
Dagster 支持多种部署方式:
- 本地单机运行(适合开发调试)
- Docker 容器化部署
- Kubernetes 集群部署(生产高可用)
- 配合 Dagster Cloud 托管服务
生产环境中,通常使用持久化元数据存储(PostgreSQL)+ 独立工作节点,实现高可用与分布式执行。
相关资源
- Pypi地址:https://pypi.org/project/dagster
- Github地址:https://github.com/dagster-io/dagster
- 官方文档地址:https://docs.dagster.io
关注我,每天分享一个实用的Python自动化工具。

