Python 数据建模神器:dbt 从入门到实战,轻松搞定数据仓库开发

一、dbt 核心介绍

dbt(data build tool)是专注于数据仓库建模的 Python 工具,核心作用是让数据工程师、分析师用 SQL 完成数据转换、测试、文档化,无需编写复杂调度脚本。其原理是基于 SQL 编译生成可执行模型,依赖数据仓库引擎执行计算,采用 Apache 2.0 开源协议。优点是上手快、协作友好、自带测试与文档,缺点是依赖数据仓库、不负责数据抽取与加载。

二、dbt 安装与环境初始化

2.1 安装 dbt

dbt 可通过 pip 直接安装,不同数据仓库对应不同适配器,主流适配 BigQuery、Snowflake、Redshift、Databricks、PostgreSQL 等,这里以通用安装和最常用的 PostgreSQL 适配器为例。

打开命令行执行安装命令:

# 安装 dbt 核心及 PostgreSQL 适配器
pip install dbt-core dbt-postgres

安装完成后验证版本:

dbt --version

出现版本信息即代表安装成功,包含 core 版本和对应适配器版本。

2.2 初始化 dbt 项目

安装完成后,创建专属 dbt 项目,命令会自动生成标准目录结构,这是 dbt 规范开发的基础。

# 创建名为 dbt_demo 的项目
dbt init dbt_demo

执行后进入项目目录:

cd dbt_demo

2.3 配置数据仓库连接

dbt 核心配置文件为 profiles.yml,默认在用户目录 .dbt 文件夹下,用于配置数据库连接信息。

以 PostgreSQL 为例,配置内容如下:

dbt_demo:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      user: postgres
      password: 你的密码
      port: 5432
      dbname: postgres
      schema: dbt_demo
      threads: 4

配置完成后,测试连接是否正常:

dbt debug

显示 All checks passed! 代表连接成功。

三、dbt 标准目录结构

dbt 有固定的目录规范,便于团队协作和模型管理,初始化后的目录结构如下:

dbt_demo/
├── analyses/          # 存放分析查询 SQL
├── dbt_project.yml    # 项目核心配置文件
├── macros/            # 自定义 Jinja2 宏
├── models/            # 核心数据模型目录
├── seeds/             # 存放静态 CSV 数据
├── snapshots/         # 快照数据,记录历史状态
├── tests/             # 自定义数据测试
└── README.md

其中 models 是核心目录,所有业务数据模型都在此编写,dbt_project.yml 用于配置模型、权限、变量等。

四、dbt 基础使用:从简单模型开始

4.1 编写基础模型

模型是 dbt 的核心,本质就是带逻辑的 SQL 文件,存放在 models 目录下。

models 目录下创建 user_orders.sql,编写简单模型,关联用户表和订单表:

-- models/user_orders.sql
{{ config(materialized='table') }}

SELECT
    u.user_id,
    u.username,
    u.create_time AS user_create_time,
    o.order_id,
    o.order_amount,
    o.order_time
FROM
    public.users u
LEFT JOIN
    public.orders o
ON
    u.user_id = o.user_id

代码说明:

  • {{ config(materialized='table') }} 表示模型构建为表,还支持 view、incremental、ephemeral 等类型
  • 模型本质是标准 SQL,dbt 会自动编译并在数据库中生成对应表或视图

4.2 运行 dbt 模型

编写完成后,执行命令运行模型:

dbt run

执行成功后,会在配置的 schema 下生成 user_orders 表,数据自动关联计算完成。

4.3 为模型添加文档描述

dbt 支持直接在 SQL 中添加注释,自动生成文档,无需手动维护。

优化后的模型:

-- models/user_orders.sql
{{ config(materialized='table', tags=['user', 'order']) }}

/*
 * 模型名称: user_orders
 * 功能: 用户与订单关联宽表,用于用户消费分析
 */

SELECT
    u.user_id AS user_id,        -- 用户ID,主键
    u.username AS username,      -- 用户名
    u.create_time AS user_create_time,  -- 用户注册时间
    o.order_id AS order_id,      -- 订单ID
    o.order_amount AS order_amount,  -- 订单金额
    o.order_time AS order_time   -- 下单时间
FROM
    public.users u
LEFT JOIN
    public.orders o
ON
    u.user_id = o.user_id

4.4 生成并查看文档

执行命令生成文档:

dbt docs generate
dbt docs serve

执行后会启动本地服务,浏览器访问可查看完整的模型血缘关系、字段说明、模型依赖,非常适合团队协作。

五、dbt 进阶使用:数据测试与增量模型

5.1 内置数据测试

数据质量是数仓核心,dbt 内置丰富测试,在 models 下创建 schema.yml 配置测试规则:

version: 2

models:
  - name: user_orders
    columns:
      - name: user_id
        tests:
          - not_null
          - unique
      - name: order_amount
        tests:
          - not_null

配置后执行测试命令:

dbt test

dbt 会自动校验字段是否为空、是否唯一,快速发现数据问题。

5.2 自定义测试

除内置测试外,还可编写自定义 SQL 测试,在 tests 目录下创建 test_order_amount_positive.sql

-- 测试订单金额必须大于0
SELECT
    *
FROM
    {{ ref('user_orders') }}
WHERE
    order_amount <= 0

执行 dbt test 时会自动运行该测试,若有负金额则报错。

5.3 增量模型(核心进阶功能)

全量构建在大数据量下效率极低,dbt 提供增量模型,只新增或更新数据。

创建增量订单模型 incremental_orders.sql

-- models/incremental_orders.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

SELECT
    order_id,
    user_id,
    order_amount,
    order_time
FROM
    public.orders

{% if is_incremental() %}
    -- 增量逻辑:只查询比当前模型最大时间大的数据
    WHERE order_time > (SELECT MAX(order_time) FROM {{ this }})
{% endif %}

代码说明:

  • incremental 代表增量模型
  • unique_key 是增量合并的唯一键
  • is_incremental() 宏用于区分首次运行与增量运行
    首次执行全量构建,后续执行只增量同步新数据,大幅提升运行效率。

六、dbt 宏与变量:提升代码复用性

6.1 自定义宏

宏类似 Python 函数,可复用 SQL 逻辑,在 macros 目录下创建 get_date_macro.sql

{% macro get_today() %}
    CURRENT_DATE
{% endmacro %}

在模型中直接调用:

SELECT
    order_id,
    order_time,
    {{ get_today() }} AS stat_date
FROM
    {{ ref('incremental_orders') }}

6.2 使用项目变量

dbt_project.yml 中定义变量:

vars:
  start_date: '2025-01-01'

模型中使用变量:

SELECT
    *
FROM
    {{ ref('user_orders') }}
WHERE
    order_time >= '{{ var("start_date") }}'

七、dbt 种子数据:静态数据导入

dbt 支持将本地 CSV 文件导入数据库,称为种子数据,适合字典表、静态映射表。

seeds 目录下创建 user_level.csv

user_id,level
1,VIP
2,普通用户
3,VIP

执行导入命令:

dbt seed

导入后可在模型中直接引用:

SELECT
    u.*,
    l.level
FROM
    {{ ref('user_orders') }} u
LEFT JOIN
    {{ ref('user_level') }} l
ON
    u.user_id = l.user_id

八、完整实战案例:用户消费统计宽表

结合前面所有知识点,构建一个完整的数仓模型,用于业务分析、报表展示。

8.1 需求说明

构建用户日消费统计表,包含用户ID、用户名、消费日期、消费次数、消费总金额、会员等级,支持增量更新、数据测试、自动文档。

8.2 模型代码

-- models/user_daily_stat.sql
{{ config(
    materialized='incremental',
    unique_key='concat(user_id, stat_date)',
    tags=['report', 'user', 'stat'],
    incremental_strategy='merge'
) }}

WITH order_daily AS (
    SELECT
        user_id,
        DATE(order_time) AS stat_date,
        COUNT(order_id) AS order_count,
        SUM(order_amount) AS total_amount
    FROM
        {{ ref('incremental_orders') }}
    GROUP BY
        user_id, DATE(order_time)
),

user_info AS (
    SELECT
        user_id,
        username
    FROM
        public.users
),

user_level AS (
    SELECT
        user_id,
        level
    FROM
        {{ ref('user_level') }}
)

SELECT
    u.user_id,
    u.username,
    l.level,
    o.stat_date,
    o.order_count,
    o.total_amount
FROM
    user_info u
JOIN
    order_daily o ON u.user_id = o.user_id
LEFT JOIN
    user_level l ON u.user_id = l.user_id

{% if is_incremental() %}
    WHERE o.stat_date > (SELECT MAX(stat_date) FROM {{ this }})
{% endif %}

8.3 测试配置

version: 2

models:
  - name: user_daily_stat
    columns:
      - name: user_id
        tests:
          - not_null
      - name: stat_date
        tests:
          - not_null
      - name: total_amount
        tests:
          - not_null

8.4 完整执行流程

# 运行所有模型
dbt run

# 执行所有测试
dbt test

# 生成文档
dbt docs generate
dbt docs serve

执行完成后,即可得到稳定、可复用、可追溯、带质量保障的用户消费日统计报表。

九、dbt 常用命令总结

  • dbt init:初始化项目
  • dbt debug:检查配置与连接
  • dbt run:运行所有模型
  • dbt run --select 模型名:运行单个模型
  • dbt test:执行所有测试
  • dbt seed:导入种子数据
  • dbt docs generate:生成文档
  • dbt docs serve:启动文档服务

相关资源

  • Pypi地址:https://pypi.org/project/dbt-core/
  • Github地址:https://github.com/dbt-labs/dbt-core
  • 官方文档地址:https://docs.getdbt.com/

关注我,每天分享一个实用的Python自动化工具。