一、dbnd 库基础认知
1.1 库核心用途
dbnd 是一款面向数据工程、机器学习与数据分析场景的 Python 工作流管理库,核心用于数据管道构建、任务编排、运行追踪、结果复现与实验管理,可高效衔接数据读取、清洗、模型训练、评估、部署全流程,解决机器学习与数据处理中任务混乱、结果难复现、日志缺失、依赖管理复杂等问题。

1.2 工作原理
dbnd 以任务函数装饰器为核心,通过注解自动捕获函数输入输出、参数、日志、执行时间与异常,内置轻量调度引擎实现任务依赖编排,同时提供本地/远程运行、结果缓存、数据版本追踪能力,支持将运行结果存储至本地文件、数据库或云端服务,实现全流程可观测、可复现、可管理。
1.3 优缺点
优点:轻量无侵入、代码改造量极小、支持任务缓存与断点续跑、兼容主流机器学习库(Scikit-learn、TensorFlow、PyTorch)、提供可视化追踪界面、支持多环境运行。
缺点:超大规模分布式调度能力弱于 Airflow,生态聚焦数据管道,通用 Web 或非数据场景适配性一般。
1.4 License 类型
Apache License 2.0(开源商用友好)
二、dbnd 安装与基础配置
2.1 环境要求
dbnd 支持 Python 3.7 及以上版本,兼容 Windows、macOS、Linux 系统,可与 Jupyter Notebook、PyCharm、VS Code 无缝配合。
2.2 安装命令
使用 pip 快速安装:
pip install dbnd安装扩展包(支持机器学习、数据可视化、远程运行):
pip install dbnd[ml,web,aws]验证安装:
dbnd version出现版本号即安装成功。
2.3 初始化项目
在项目目录执行初始化命令,生成标准项目结构:
dbnd init初始化后生成基础目录:
your_project/
├── .dbnd/ # dbnd 配置与运行日志
├── tasks/ # 自定义任务脚本
├── data/ # 输入输出数据
├── models/ # 模型存储
└── dbnd.cfg # 配置文件三、dbnd 核心功能与基础代码示例
3.1 基础任务定义与运行
dbnd 使用 @task 装饰器将普通函数转为可追踪、可编排的任务,自动记录参数、运行状态与结果。
from dbnd import task
# 定义基础计算任务
@task
def add_numbers(a: int, b: int) -> int:
"""计算两个数字之和"""
result = a + b
print(f"计算结果: {result}")
return result
# 直接运行任务
if __name__ == "__main__":
output = add_numbers(a=10, b=25)
print(f"最终输出: {output}")代码说明:
@task装饰器自动封装函数,开启参数校验、日志追踪;- 函数参数类型注解可被 dbnd 读取,实现输入合法性校验;
- 任务可像普通函数一样直接调用,无需复杂配置。
运行后控制台会显示任务名称、状态、耗时、结果路径等信息。
3.2 任务依赖与管道编排
dbnd 支持任务链式调用,自动识别依赖关系,构建顺序执行管道。
from dbnd import task, pipeline
# 步骤1:数据加载
@task
def load_data() -> list:
data = [10, 20, 30, 40, 50]
print("数据加载完成")
return data
# 步骤2:数据求和
@task
def calculate_sum(data: list) -> int:
total = sum(data)
print(f"数据求和: {total}")
return total
# 步骤3:计算平均值
@task
def calculate_average(total: int, length: int) -> float:
avg = total / length
print(f"平均值: {avg}")
return avg
# 定义管道:串联多个任务
@pipeline
def data_process_pipeline():
data = load_data()
total = calculate_sum(data=data)
avg = calculate_average(total=total, length=len(data))
return avg
# 运行管道
if __name__ == "__main__":
result = data_process_pipeline()
print(f"管道最终结果: {result}")代码说明:
@pipeline用于封装任务流,自动管理任务执行顺序;- 任务间通过参数传递建立依赖,前序任务完成后才会执行后续任务;
- 运行时控制台展示完整依赖图,便于排查执行链路。
3.3 数据读写与缓存机制
dbnd 内置数据读写功能,支持 CSV、JSON、Pickle 等格式,自动缓存结果,避免重复计算,提升运行效率。
import pandas as pd
from dbnd import task, output
# 定义输出路径
@task(result=output.csv)
def create_csv_data() -> pd.DataFrame:
"""生成 DataFrame 并保存为 CSV"""
data = {
"name": ["Alice", "Bob", "Charlie"],
"score": [85, 92, 78]
}
df = pd.DataFrame(data)
return df
# 读取 CSV 数据
@task
def read_csv_data(df: pd.DataFrame) -> pd.DataFrame:
print("读取数据:")
print(df)
return df
if __name__ == "__main__":
df = create_csv_data()
read_csv_data(df=df)代码说明:
output.csv指定输出格式,dbnd 自动管理文件路径与版本;- 任务结果会被缓存,重复运行时直接读取缓存,大幅提速;
- 支持 JSON、Parquet、Excel 等多种数据格式。
3.4 机器学习任务追踪
dbnd 可自动追踪机器学习模型的参数、指标、数据集与训练日志,适合实验管理。
from dbnd import task
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
@task
def train_iris_model() -> float:
# 加载数据集
iris = load_iris()
X, y = iris.data, iris.target
# 划分训练集测试集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 模型训练
model = LogisticRegression(max_iter=200)
model.fit(X_train, y_train)
# 预测与评估
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f"模型准确率: {acc:.4f}")
return acc
if __name__ == "__main__":
accuracy = train_iris_model()代码说明:
- dbnd 自动记录数据集版本、模型参数、评估指标;
- 所有运行结果存入本地数据库,可随时回溯历史实验;
- 兼容 Scikit-learn、XGBoost、LightGBM 等主流 ML 库。
3.5 可视化运行界面
dbnd 内置 Web UI,用于查看任务运行状态、日志、指标、依赖图。
启动命令:
dbnd webserver --port 8080访问地址:http://127.0.0.1:8080
在界面中可查看:
- 任务执行时间与状态
- 输入输出参数
- 运行日志与报错信息
- 任务依赖拓扑图
- 机器学习实验对比
四、dbnd 高级功能与实战案例
4.1 自定义配置与环境切换
通过修改 dbnd.cfg 可切换本地、测试、生产环境,支持自定义数据路径、日志级别、存储方式。
示例配置:
[core]
local_db = sqlite:///.dbnd/dbnd.db
task_run_dir = ./tasks/runs
log_level = INFO[output]
default = csv path = ./data/output
4.2 异常捕获与重试机制
dbnd 自动捕获任务异常,支持失败重试、超时控制,提升管道稳定性。
from dbnd import task
@task(retries=3, retry_delay=1)
def unstable_task():
import random
if random.random() < 0.5:
raise ValueError("随机异常,测试重试")
return "执行成功"
if __name__ == "__main__":
unstable_task()代码说明:
retries设置最大重试次数;retry_delay设置重试间隔;- 异常信息会完整记录,便于定位问题。
4.3 完整实战案例:机器学习训练与评估管道
以下案例实现数据加载 → 预处理 → 训练 → 评估 → 保存模型的完整流程。
from dbnd import task, pipeline
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import joblib
# 1. 加载数据
@task
def load_dataset() -> pd.DataFrame:
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
columns = ["sepal_len", "sepal_wid", "petal_len", "petal_wid", "target"]
df = pd.read_csv(url, names=columns)
return df
# 2. 数据预处理
@task
def preprocess_data(df: pd.DataFrame) -> tuple:
X = df.drop("target", axis=1)
y = df["target"]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
return X_scaled, y, scaler
# 3. 训练模型
@task
def train_model(X, y) -> RandomForestClassifier:
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)
return model
# 4. 模型评估
@task
def evaluate_model(model: RandomForestClassifier, X, y):
y_pred = model.predict(X)
report = classification_report(y, y_pred)
print(report)
return report
# 5. 保存模型
@task
def save_model(model, scaler):
joblib.dump(model, "models/iris_model.pkl")
joblib.dump(scaler, "models/scaler.pkl")
return "模型保存完成"
# 完整管道
@pipeline
def ml_train_pipeline():
df = load_dataset()
X, y, scaler = preprocess_data(df)
model = train_model(X, y)
evaluate_model(model, X, y)
save_model(model, scaler)
if __name__ == "__main__":
ml_train_pipeline()代码说明:
- 覆盖机器学习工程化全流程;
- 每一步均可独立追踪、复现、调试;
- 适合团队协作与实验管理。
五、命令行运行与项目管理
5.1 命令行执行任务
# 运行单个任务
dbnd run module_name::task_name --param value
# 运行管道
dbnd run pipeline.py::data_process_pipeline
# 查看历史运行
dbnd runs list
# 查看任务详情
dbnd runs show <run_id>5.2 多环境运行
# 本地运行
dbnd run train.py::ml_train_pipeline
# 远程运行(需配置远程环境)
dbnd run --env remote train.py::ml_train_pipeline相关资源
- Pypi地址:https://pypi.org/project/dbnd/
- Github地址:https://github.com/databand-ai/dbnd
- 官方文档地址:https://dbnd.readthedocs.io/
关注我,每天分享一个实用的Python自动化工具。

