Python实用工具:Apache Airflow 从入门到实战 保姆级教程

一、Apache Airflow 核心概述

Apache Airflow是一款由Airbnb开源的任务编排与调度工具,专门用于管理复杂的工作流(Workflow)。其工作原理是将任务抽象为有向无环图(DAG),通过调度器按照任务依赖关系和预设的触发规则自动执行任务。Airflow支持丰富的任务类型,涵盖Shell命令、Python函数、SQL查询等,广泛应用于数据清洗、ETL流程、定时任务执行等场景。

该库的开源协议为Apache License 2.0,这是一个对商业友好的开源协议,允许用户自由使用、修改和分发代码。它的优点是灵活性强、可扩展性高、可视化界面友好;缺点是部署和维护成本较高,对新手不够友好,且轻量级任务场景下略显笨重。

二、Apache Airflow 安装与环境配置

2.1 系统环境要求

在安装Airflow之前,需要确保本地环境满足以下条件:

  • Python 3.8~3.11 版本(Airflow 2.x 对Python版本有严格要求)
  • 足够的磁盘空间(Airflow会存储任务日志和元数据)
  • 已安装pip包管理工具

2.2 安装步骤

Airflow的安装方式有多种,包括pip安装、Docker安装和源码安装,这里我们以最适合新手的pip安装为例进行讲解。

  1. 设置环境变量
    Airflow默认从PyPI下载包,在安装前需要设置一个环境变量来指定Airflow的家目录,同时避免版本冲突: # Linux/Mac系统 export AIRFLOW_HOME=~/airflow # Windows系统(cmd命令行) set AIRFLOW_HOME=D:\airflow
  2. 安装Airflow
    由于Airflow的依赖包较多,直接安装可能会出现问题,我们可以先升级pip,再指定Airflow版本进行安装(推荐安装稳定版2.6.3): # 升级pip python -m pip install --upgrade pip # 安装Airflow核心包 pip install apache-airflow==2.6.3
  3. 初始化元数据库
    Airflow使用元数据库(默认是SQLite)存储DAG信息、任务执行状态等数据,安装完成后需要初始化数据库: airflow db init
  4. 创建管理员用户
    为了登录Airflow的Web UI,需要创建一个管理员账户:
    bash airflow users create \ --username admin \ --firstname FirstName \ --lastname LastName \ --role Admin \ --email [email protected]
    执行该命令后,会提示输入密码,按照提示输入即可。

2.3 启动Airflow服务

Airflow包含两个核心服务:Web服务器调度器,需要分别启动。

  1. 启动Web服务器 # 默认端口是8080 airflow webserver --port 8080
  2. 启动调度器
    打开一个新的终端窗口,执行以下命令启动调度器,调度器会自动检测DAG目录中的任务并执行: airflow scheduler
  3. 访问Web UI
    打开浏览器,输入地址 http://localhost:8080,使用之前创建的管理员账户和密码登录,即可看到Airflow的可视化界面。

三、Apache Airflow 核心概念与基础用法

3.1 核心概念解析

在使用Airflow之前,必须理解几个核心概念,这些概念是构建工作流的基础。

  1. DAG(有向无环图)
    是Airflow的核心,代表一个完整的工作流。DAG由多个任务(Task)组成,任务之间存在依赖关系,且不存在循环依赖(无环)。例如,一个ETL工作流的DAG可以是:数据抽取任务 → 数据清洗任务 → 数据加载任务
  2. Task(任务) 是DAG中的最小执行单元,每个Task对应一个具体的操作。Airflow支持多种类型的Task,常见的有:
    • PythonOperator:执行Python函数
    • BashOperator:执行Shell命令
    • SqlOperator:执行SQL语句
    • EmailOperator:发送邮件
  3. Operator(操作符)
    是定义Task的模板,不同的Operator对应不同类型的任务。Operator的作用是将任务逻辑封装起来,用户只需要传入参数即可创建Task。
  4. Task Instance(任务实例)
    是Task的一次具体执行。每个Task在不同的时间点执行,都会生成一个独立的Task Instance,例如每天执行一次的任务,每天都会产生一个新的Task Instance。
  5. DAG Run(DAG运行实例)
    是DAG的一次具体执行。当DAG满足触发条件时,调度器会创建一个DAG Run,负责执行该次运行中的所有Task Instance。

3.2 编写第一个DAG

Airflow的DAG文件是Python脚本,默认存储在AIRFLOW_HOME/dags目录下,调度器会自动扫描该目录下的Python文件并加载DAG。

接下来我们编写一个简单的DAG,包含两个任务:一个任务打印“Hello Airflow”,另一个任务打印“Task Finished”,且第二个任务依赖于第一个任务。

  1. 创建DAG文件
    AIRFLOW_HOME/dags目录下创建一个名为hello_airflow_dag.py的文件。
  2. 编写DAG代码 # 导入必要的模块 from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator # 定义Python函数,作为任务的执行逻辑 def print_hello(): print("Hello Airflow! This is my first DAG.") def print_finish(): print("Task Finished! Congratulations!") # 定义默认参数 default_args = { 'owner': 'airflow', # DAG的所有者 'depends_on_past': False, # 不依赖于上一次的执行结果 'email_on_failure': False, # 任务失败时不发送邮件 'email_on_retry': False, # 任务重试时不发送邮件 'retries': 1, # 任务失败后的重试次数 'retry_delay': timedelta(minutes=5), # 重试间隔时间 } # 定义DAG with DAG( 'hello_airflow_dag', # DAG的唯一标识 default_args=default_args, description='A simple tutorial DAG', # DAG的描述 schedule_interval=timedelta(days=1), # 调度间隔,每天执行一次 start_date=datetime(2024, 1, 1), # DAG的开始时间 catchup=False, # 不回溯执行历史任务 tags=['example', 'tutorial'], # 标签,用于分类DAG ) as dag:# 定义第一个任务 task1 = PythonOperator( task_id='print_hello_task', # 任务的唯一标识 python_callable=print_hello, # 任务执行的Python函数 ) # 定义第二个任务 task2 = PythonOperator( task_id='print_finish_task', python_callable=print_finish, ) # 设置任务依赖关系:task1执行完成后再执行task2 task1 &gt;&gt; task2</code></pre></li>代码说明 默认参数(default_args):定义了DAG中所有任务的公共参数,如所有者、重试次数、重试间隔等。 DAG定义:使用with DAG()上下文管理器创建DAG,指定了DAG的ID、描述、调度间隔、开始时间等关键信息。 任务定义:使用PythonOperator创建两个任务,分别指定任务ID和要执行的Python函数。 依赖关系设置:使用>>运算符设置任务之间的依赖关系,task1 >> task2表示task2必须在task1执行完成后才能执行。 查看并触发DAG
    将上述代码保存到dags目录后,等待1~2分钟,调度器会自动加载该DAG。在Airflow Web UI的DAG列表中找到hello_airflow_dag,点击右侧的开关按钮启用该DAG。 启用后,可以手动触发DAG执行:点击DAG名称进入详情页,点击右上角的“Trigger DAG”按钮,即可手动启动一次DAG运行。运行完成后,可以在“Graph”视图中查看任务的执行状态,在“Log”视图中查看任务的输出日志。

3.3 常用Operator实战

除了PythonOperator,Airflow还提供了多种常用的Operator,下面我们介绍几种最常用的Operator及其用法。

3.3.1 BashOperator:执行Shell命令

BashOperator用于执行Shell命令或脚本,适合处理需要调用系统命令的任务。

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    'bash_operator_dag',
    default_args=default_args,
    description='A DAG using BashOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['bash', 'example'],
) as dag:

    # 执行简单的Shell命令
    task1 = BashOperator(
        task_id='print_date_task',
        bash_command='date',  # 打印当前日期
    )

    # 执行Shell脚本
    task2 = BashOperator(
        task_id='run_script_task',
        bash_command='echo "Current directory: $(pwd)" && ls -l',
    )

    task1 >> task2

代码说明bash_command参数用于指定要执行的Shell命令或脚本,多个命令可以用&&连接。

3.3.2 EmailOperator:发送邮件

EmailOperator用于在任务执行完成后发送邮件通知,适合用于任务执行状态的告警。

使用EmailOperator前,需要在AIRFLOW_HOME/airflow.cfg配置文件中设置邮件服务器信息:

[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = [email protected]
smtp_password = your_email_password
smtp_port = 587
smtp_mail_from = [email protected]

然后编写DAG代码:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

def task_function():
    return "Task executed successfully!"

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    'email_operator_dag',
    default_args=default_args,
    description='A DAG using EmailOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['email', 'example'],
) as dag:

    task1 = PythonOperator(
        task_id='execute_task',
        python_callable=task_function,
    )

    # 发送邮件通知
    send_email = EmailOperator(
        task_id='send_email_notification',
        to='[email protected]',  # 收件人邮箱
        subject='Airflow Task Execution Status',  # 邮件主题
        html_content='<h3>Task executed successfully!</h3>',  # 邮件内容(支持HTML)
    )

    task1 >> send_email

四、Airflow 工作流实战:数据ETL流程

4.1 需求分析

我们以一个简单的电商用户数据ETL流程为例,演示如何使用Airflow构建完整的工作流。需求如下:

  1. 数据抽取:从CSV文件中读取用户数据。
  2. 数据清洗:去除缺失值、重复值,转换数据格式。
  3. 数据加载:将清洗后的数据写入新的CSV文件。
  4. 发送通知:数据加载完成后发送邮件通知。

4.2 项目目录结构

对于复杂的Airflow项目,建议采用以下目录结构,便于管理代码和数据:

airflow_home/
├── dags/
│   ├── etl_user_data_dag.py  # ETL流程的DAG文件
│   └── etl_scripts/          # 存储ETL相关的Python脚本
│       ├── extract.py
│       ├── transform.py
│       └── load.py
├── data/                     # 存储原始数据和清洗后的数据
│   ├── raw/
│   │   └── user_data.csv
│   └── processed/
└── logs/                     # Airflow任务日志

4.3 编写ETL脚本

  1. 数据抽取脚本(extract.py) # dags/etl_scripts/extract.py import pandas as pd import os def extract_data(input_path: str) -> pd.DataFrame: """ 从CSV文件中抽取数据 :param input_path: 原始数据文件路径 :return: 抽取后的DataFrame """ if not os.path.exists(input_path): raise FileNotFoundError(f"Input file not found: {input_path}") df = pd.read_csv(input_path) print(f"Extracted {len(df)} rows of data") return df
  2. 数据清洗脚本(transform.py) # dags/etl_scripts/transform.py import pandas as pd def transform_data(df: pd.DataFrame) -> pd.DataFrame: """ 数据清洗:去除缺失值、重复值,转换日期格式 :param df: 原始DataFrame :return: 清洗后的DataFrame """ # 去除缺失值 df = df.dropna(subset=['user_id', 'username', 'register_date']) # 去除重复值 df = df.drop_duplicates(subset=['user_id']) # 转换注册日期格式为YYYY-MM-DD df['register_date'] = pd.to_datetime(df['register_date']).dt.strftime('%Y-%m-%d') print(f"Transformed data: {len(df)} rows remaining") return df
  3. 数据加载脚本(load.py) # dags/etl_scripts/load.py import pandas as pd import os def load_data(df: pd.DataFrame, output_path: str) -> None: """ 将清洗后的数据写入CSV文件 :param df: 清洗后的DataFrame :param output_path: 输出文件路径 """ # 创建输出目录(如果不存在) os.makedirs(os.path.dirname(output_path), exist_ok=True) # 写入CSV文件 df.to_csv(output_path, index=False) print(f"Loaded data to {output_path} successfully")

4.4 编写ETL DAG文件

# dags/etl_user_data_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from etl_scripts.extract import extract_data
from etl_scripts.transform import transform_data
from etl_scripts.load import load_data
import os

# 定义文件路径
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME', '~/airflow')
INPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/raw/user_data.csv')
OUTPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/processed/cleaned_user_data.csv')

# 定义默认参数
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['[email protected]'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# 定义任务函数(封装ETL脚本)
def extract_task():
    return extract_data(INPUT_PATH)

def transform_task(ti):
    # 从上游任务(extract_task)获取数据
    df = ti.xcom_pull(task_ids='extract_task')
    return transform_data(df)

def load_task(ti):
    df = ti.xcom_pull(task_ids='transform_task')
    load_data(df, OUTPUT_PATH)

# 定义DAG
with DAG(
    'etl_user_data_workflow',
    default_args=default_args,
    description='A complete ETL workflow for user data',
    schedule_interval='0 1 * * *',  # 每天凌晨1点执行
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'data_processing'],
) as dag:

    # 定义任务
    extract = PythonOperator(
        task_id='extract_task',
        python_callable=extract_task,
    )

    transform = PythonOperator(
        task_id='transform_task',
        python_callable=transform_task,
    )

    load = PythonOperator(
        task_id='load_task',
        python_callable=load_task,
    )

    send_notification = EmailOperator(
        task_id='send_notification',
        to='[email protected]',
        subject='User Data ETL Workflow Completed',
        html_content=f'<h3>ETL Workflow executed successfully!</h3><p>Cleaned data saved to: {OUTPUT_PATH}</p>',
    )

    # 设置任务依赖关系
    extract >> transform >> load >> send_notification

4.5 代码关键说明

  1. XCom通信机制
    Airflow中,任务之间的数据传递通过XCom(Cross-Communication) 实现。在上述代码中,transform_task通过ti.xcom_pull(task_ids='extract_task')获取extract_task的返回值,load_task同理获取transform_task的返回值。XCom适合传递小量数据,大量数据建议使用文件或数据库存储。
  2. 调度时间设置
    schedule_interval参数支持多种格式,除了timedelta,还可以使用Cron表达式。例如'0 1 * * *'表示每天凌晨1点执行任务,这是生产环境中最常用的调度方式。
  3. 任务失败告警
    default_args中设置了email_on_failure: True,当任务执行失败时,Airflow会自动向指定邮箱发送告警邮件。

4.6 测试与运行

  1. 准备原始数据
    AIRFLOW_HOME/data/raw目录下创建user_data.csv文件,写入以下测试数据: user_id,username,register_date,age,gender 1,alice,2024-01-01,25,F 2,bob,2024-01-02,,M 3,charlie,2024-01-03,30,M 2,bob,2024-01-02,28,M 4,david,,35,M
  2. 启动DAG
    将DAG文件和ETL脚本放入对应的目录后,在Airflow Web UI中启用etl_user_data_workflow DAG。可以手动触发一次执行,查看任务的执行状态。执行完成后,在data/processed目录下会生成cleaned_user_data.csv文件,内容为清洗后的数据。

五、Airflow 高级特性与优化建议

5.1 连接管理(Connections)

在实际项目中,任务经常需要连接外部系统(如数据库、Hadoop、云存储等)。Airflow提供了Connections功能,用于统一管理外部系统的连接信息,避免在代码中硬编码用户名、密码等敏感信息。

例如,要连接MySQL数据库,可以在Airflow Web UI的“Admin → Connections”页面创建一个新的连接:

  • Conn Idmysql_default(自定义,需在代码中引用)
  • Conn TypeMySQL
  • Host:MySQL服务器地址
  • Schema:数据库名称
  • Login:用户名
  • Password:密码
  • Port:端口号(默认3306)

在代码中可以通过Hook获取连接信息:

from airflow.providers.mysql.hooks.mysql import MySqlHook

def query_mysql_data():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    df = hook.get_pandas_df(sql='SELECT * FROM users')
    return df

5.2 变量管理(Variables)

Airflow的Variables功能用于存储和管理工作流中的配置变量,如文件路径、阈值参数等。可以在Web UI的“Admin → Variables”页面添加变量,也可以在代码中通过Variable类获取变量值:

from airflow.models import Variable

# 从Variables中获取变量
input_path = Variable.get('user_data_input_path', default_var='/default/path')
output_path = Variable.get('user_data_output_path')

5.3 任务并行执行与资源限制

Airflow支持任务的并行执行,可以通过以下参数优化并行度:

  • parallelism:Airflow集群的最大并行任务数(全局参数)。
  • dag_concurrency:单个DAG的最大并行任务数。
  • max_active_runs_per_dag:单个DAG的最大活跃运行实例数。

这些参数可以在airflow.cfg配置文件中修改。

5.4 日志管理

Airflow的任务日志默认存储在本地,在生产环境中可以将日志存储到远程存储系统(如S3、HDFS、Elasticsearch等),便于日志的集中管理和查询。修改airflow.cfg中的[logging]部分即可配置远程日志存储。

六、相关资源链接

  • PyPI地址:https://pypi.org/project/apache-airflow
  • Github地址:https://github.com/apache/airflow
  • 官方文档地址:https://airflow.apache.org/docs/apache-airflow/stable/index.html

关注我,每天分享一个实用的Python自动化工具。