一、Apache Airflow 核心概述
Apache Airflow是一款由Airbnb开源的任务编排与调度工具,专门用于管理复杂的工作流(Workflow)。其工作原理是将任务抽象为有向无环图(DAG),通过调度器按照任务依赖关系和预设的触发规则自动执行任务。Airflow支持丰富的任务类型,涵盖Shell命令、Python函数、SQL查询等,广泛应用于数据清洗、ETL流程、定时任务执行等场景。

该库的开源协议为Apache License 2.0,这是一个对商业友好的开源协议,允许用户自由使用、修改和分发代码。它的优点是灵活性强、可扩展性高、可视化界面友好;缺点是部署和维护成本较高,对新手不够友好,且轻量级任务场景下略显笨重。
二、Apache Airflow 安装与环境配置
2.1 系统环境要求
在安装Airflow之前,需要确保本地环境满足以下条件:
- Python 3.8~3.11 版本(Airflow 2.x 对Python版本有严格要求)
- 足够的磁盘空间(Airflow会存储任务日志和元数据)
- 已安装pip包管理工具
2.2 安装步骤
Airflow的安装方式有多种,包括pip安装、Docker安装和源码安装,这里我们以最适合新手的pip安装为例进行讲解。
- 设置环境变量
Airflow默认从PyPI下载包,在安装前需要设置一个环境变量来指定Airflow的家目录,同时避免版本冲突:# Linux/Mac系统 export AIRFLOW_HOME=~/airflow # Windows系统(cmd命令行) set AIRFLOW_HOME=D:\airflow - 安装Airflow
由于Airflow的依赖包较多,直接安装可能会出现问题,我们可以先升级pip,再指定Airflow版本进行安装(推荐安装稳定版2.6.3):# 升级pip python -m pip install --upgrade pip # 安装Airflow核心包 pip install apache-airflow==2.6.3 - 初始化元数据库
Airflow使用元数据库(默认是SQLite)存储DAG信息、任务执行状态等数据,安装完成后需要初始化数据库:airflow db init - 创建管理员用户
为了登录Airflow的Web UI,需要创建一个管理员账户:bash airflow users create \ --username admin \ --firstname FirstName \ --lastname LastName \ --role Admin \ --email [email protected]
执行该命令后,会提示输入密码,按照提示输入即可。
2.3 启动Airflow服务
Airflow包含两个核心服务:Web服务器和调度器,需要分别启动。
- 启动Web服务器
# 默认端口是8080 airflow webserver --port 8080 - 启动调度器
打开一个新的终端窗口,执行以下命令启动调度器,调度器会自动检测DAG目录中的任务并执行:airflow scheduler - 访问Web UI
打开浏览器,输入地址http://localhost:8080,使用之前创建的管理员账户和密码登录,即可看到Airflow的可视化界面。
三、Apache Airflow 核心概念与基础用法
3.1 核心概念解析
在使用Airflow之前,必须理解几个核心概念,这些概念是构建工作流的基础。
- DAG(有向无环图)
是Airflow的核心,代表一个完整的工作流。DAG由多个任务(Task)组成,任务之间存在依赖关系,且不存在循环依赖(无环)。例如,一个ETL工作流的DAG可以是:数据抽取任务 → 数据清洗任务 → 数据加载任务。 - Task(任务) 是DAG中的最小执行单元,每个Task对应一个具体的操作。Airflow支持多种类型的Task,常见的有:
PythonOperator:执行Python函数BashOperator:执行Shell命令SqlOperator:执行SQL语句EmailOperator:发送邮件
- Operator(操作符)
是定义Task的模板,不同的Operator对应不同类型的任务。Operator的作用是将任务逻辑封装起来,用户只需要传入参数即可创建Task。 - Task Instance(任务实例)
是Task的一次具体执行。每个Task在不同的时间点执行,都会生成一个独立的Task Instance,例如每天执行一次的任务,每天都会产生一个新的Task Instance。 - DAG Run(DAG运行实例)
是DAG的一次具体执行。当DAG满足触发条件时,调度器会创建一个DAG Run,负责执行该次运行中的所有Task Instance。
3.2 编写第一个DAG
Airflow的DAG文件是Python脚本,默认存储在AIRFLOW_HOME/dags目录下,调度器会自动扫描该目录下的Python文件并加载DAG。
接下来我们编写一个简单的DAG,包含两个任务:一个任务打印“Hello Airflow”,另一个任务打印“Task Finished”,且第二个任务依赖于第一个任务。
- 创建DAG文件
在AIRFLOW_HOME/dags目录下创建一个名为hello_airflow_dag.py的文件。 - 编写DAG代码
# 导入必要的模块 from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator # 定义Python函数,作为任务的执行逻辑 def print_hello(): print("Hello Airflow! This is my first DAG.") def print_finish(): print("Task Finished! Congratulations!") # 定义默认参数 default_args = { 'owner': 'airflow', # DAG的所有者 'depends_on_past': False, # 不依赖于上一次的执行结果 'email_on_failure': False, # 任务失败时不发送邮件 'email_on_retry': False, # 任务重试时不发送邮件 'retries': 1, # 任务失败后的重试次数 'retry_delay': timedelta(minutes=5), # 重试间隔时间 } # 定义DAG with DAG( 'hello_airflow_dag', # DAG的唯一标识 default_args=default_args, description='A simple tutorial DAG', # DAG的描述 schedule_interval=timedelta(days=1), # 调度间隔,每天执行一次 start_date=datetime(2024, 1, 1), # DAG的开始时间 catchup=False, # 不回溯执行历史任务 tags=['example', 'tutorial'], # 标签,用于分类DAG ) as dag:# 定义第一个任务 task1 = PythonOperator( task_id='print_hello_task', # 任务的唯一标识 python_callable=print_hello, # 任务执行的Python函数 ) # 定义第二个任务 task2 = PythonOperator( task_id='print_finish_task', python_callable=print_finish, ) # 设置任务依赖关系:task1执行完成后再执行task2 task1 >> task2</code></pre></li>代码说明 默认参数(default_args):定义了DAG中所有任务的公共参数,如所有者、重试次数、重试间隔等。 DAG定义:使用with DAG()上下文管理器创建DAG,指定了DAG的ID、描述、调度间隔、开始时间等关键信息。 任务定义:使用PythonOperator创建两个任务,分别指定任务ID和要执行的Python函数。 依赖关系设置:使用>>运算符设置任务之间的依赖关系,task1 >> task2表示task2必须在task1执行完成后才能执行。 查看并触发DAG
将上述代码保存到dags目录后,等待1~2分钟,调度器会自动加载该DAG。在Airflow Web UI的DAG列表中找到hello_airflow_dag,点击右侧的开关按钮启用该DAG。 启用后,可以手动触发DAG执行:点击DAG名称进入详情页,点击右上角的“Trigger DAG”按钮,即可手动启动一次DAG运行。运行完成后,可以在“Graph”视图中查看任务的执行状态,在“Log”视图中查看任务的输出日志。
3.3 常用Operator实战
除了PythonOperator,Airflow还提供了多种常用的Operator,下面我们介绍几种最常用的Operator及其用法。
3.3.1 BashOperator:执行Shell命令
BashOperator用于执行Shell命令或脚本,适合处理需要调用系统命令的任务。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=2),
}
with DAG(
'bash_operator_dag',
default_args=default_args,
description='A DAG using BashOperator',
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['bash', 'example'],
) as dag:
# 执行简单的Shell命令
task1 = BashOperator(
task_id='print_date_task',
bash_command='date', # 打印当前日期
)
# 执行Shell脚本
task2 = BashOperator(
task_id='run_script_task',
bash_command='echo "Current directory: $(pwd)" && ls -l',
)
task1 >> task2代码说明:bash_command参数用于指定要执行的Shell命令或脚本,多个命令可以用&&连接。
3.3.2 EmailOperator:发送邮件
EmailOperator用于在任务执行完成后发送邮件通知,适合用于任务执行状态的告警。
使用EmailOperator前,需要在AIRFLOW_HOME/airflow.cfg配置文件中设置邮件服务器信息:
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = [email protected]
smtp_password = your_email_password
smtp_port = 587
smtp_mail_from = [email protected]然后编写DAG代码:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
def task_function():
return "Task executed successfully!"
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=2),
}
with DAG(
'email_operator_dag',
default_args=default_args,
description='A DAG using EmailOperator',
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['email', 'example'],
) as dag:
task1 = PythonOperator(
task_id='execute_task',
python_callable=task_function,
)
# 发送邮件通知
send_email = EmailOperator(
task_id='send_email_notification',
to='[email protected]', # 收件人邮箱
subject='Airflow Task Execution Status', # 邮件主题
html_content='<h3>Task executed successfully!</h3>', # 邮件内容(支持HTML)
)
task1 >> send_email四、Airflow 工作流实战:数据ETL流程
4.1 需求分析
我们以一个简单的电商用户数据ETL流程为例,演示如何使用Airflow构建完整的工作流。需求如下:
- 数据抽取:从CSV文件中读取用户数据。
- 数据清洗:去除缺失值、重复值,转换数据格式。
- 数据加载:将清洗后的数据写入新的CSV文件。
- 发送通知:数据加载完成后发送邮件通知。
4.2 项目目录结构
对于复杂的Airflow项目,建议采用以下目录结构,便于管理代码和数据:
airflow_home/
├── dags/
│ ├── etl_user_data_dag.py # ETL流程的DAG文件
│ └── etl_scripts/ # 存储ETL相关的Python脚本
│ ├── extract.py
│ ├── transform.py
│ └── load.py
├── data/ # 存储原始数据和清洗后的数据
│ ├── raw/
│ │ └── user_data.csv
│ └── processed/
└── logs/ # Airflow任务日志4.3 编写ETL脚本
- 数据抽取脚本(extract.py)
# dags/etl_scripts/extract.py import pandas as pd import os def extract_data(input_path: str) -> pd.DataFrame: """ 从CSV文件中抽取数据 :param input_path: 原始数据文件路径 :return: 抽取后的DataFrame """ if not os.path.exists(input_path): raise FileNotFoundError(f"Input file not found: {input_path}") df = pd.read_csv(input_path) print(f"Extracted {len(df)} rows of data") return df - 数据清洗脚本(transform.py)
# dags/etl_scripts/transform.py import pandas as pd def transform_data(df: pd.DataFrame) -> pd.DataFrame: """ 数据清洗:去除缺失值、重复值,转换日期格式 :param df: 原始DataFrame :return: 清洗后的DataFrame """ # 去除缺失值 df = df.dropna(subset=['user_id', 'username', 'register_date']) # 去除重复值 df = df.drop_duplicates(subset=['user_id']) # 转换注册日期格式为YYYY-MM-DD df['register_date'] = pd.to_datetime(df['register_date']).dt.strftime('%Y-%m-%d') print(f"Transformed data: {len(df)} rows remaining") return df - 数据加载脚本(load.py)
# dags/etl_scripts/load.py import pandas as pd import os def load_data(df: pd.DataFrame, output_path: str) -> None: """ 将清洗后的数据写入CSV文件 :param df: 清洗后的DataFrame :param output_path: 输出文件路径 """ # 创建输出目录(如果不存在) os.makedirs(os.path.dirname(output_path), exist_ok=True) # 写入CSV文件 df.to_csv(output_path, index=False) print(f"Loaded data to {output_path} successfully")
4.4 编写ETL DAG文件
# dags/etl_user_data_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from etl_scripts.extract import extract_data
from etl_scripts.transform import transform_data
from etl_scripts.load import load_data
import os
# 定义文件路径
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME', '~/airflow')
INPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/raw/user_data.csv')
OUTPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/processed/cleaned_user_data.csv')
# 定义默认参数
default_args = {
'owner': 'data_engineer',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'email': ['[email protected]'],
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
# 定义任务函数(封装ETL脚本)
def extract_task():
return extract_data(INPUT_PATH)
def transform_task(ti):
# 从上游任务(extract_task)获取数据
df = ti.xcom_pull(task_ids='extract_task')
return transform_data(df)
def load_task(ti):
df = ti.xcom_pull(task_ids='transform_task')
load_data(df, OUTPUT_PATH)
# 定义DAG
with DAG(
'etl_user_data_workflow',
default_args=default_args,
description='A complete ETL workflow for user data',
schedule_interval='0 1 * * *', # 每天凌晨1点执行
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['etl', 'data_processing'],
) as dag:
# 定义任务
extract = PythonOperator(
task_id='extract_task',
python_callable=extract_task,
)
transform = PythonOperator(
task_id='transform_task',
python_callable=transform_task,
)
load = PythonOperator(
task_id='load_task',
python_callable=load_task,
)
send_notification = EmailOperator(
task_id='send_notification',
to='[email protected]',
subject='User Data ETL Workflow Completed',
html_content=f'<h3>ETL Workflow executed successfully!</h3><p>Cleaned data saved to: {OUTPUT_PATH}</p>',
)
# 设置任务依赖关系
extract >> transform >> load >> send_notification4.5 代码关键说明
- XCom通信机制
Airflow中,任务之间的数据传递通过XCom(Cross-Communication) 实现。在上述代码中,transform_task通过ti.xcom_pull(task_ids='extract_task')获取extract_task的返回值,load_task同理获取transform_task的返回值。XCom适合传递小量数据,大量数据建议使用文件或数据库存储。 - 调度时间设置
schedule_interval参数支持多种格式,除了timedelta,还可以使用Cron表达式。例如'0 1 * * *'表示每天凌晨1点执行任务,这是生产环境中最常用的调度方式。 - 任务失败告警
在default_args中设置了email_on_failure: True,当任务执行失败时,Airflow会自动向指定邮箱发送告警邮件。
4.6 测试与运行
- 准备原始数据
在AIRFLOW_HOME/data/raw目录下创建user_data.csv文件,写入以下测试数据:user_id,username,register_date,age,gender 1,alice,2024-01-01,25,F 2,bob,2024-01-02,,M 3,charlie,2024-01-03,30,M 2,bob,2024-01-02,28,M 4,david,,35,M - 启动DAG
将DAG文件和ETL脚本放入对应的目录后,在Airflow Web UI中启用etl_user_data_workflowDAG。可以手动触发一次执行,查看任务的执行状态。执行完成后,在data/processed目录下会生成cleaned_user_data.csv文件,内容为清洗后的数据。
五、Airflow 高级特性与优化建议
5.1 连接管理(Connections)
在实际项目中,任务经常需要连接外部系统(如数据库、Hadoop、云存储等)。Airflow提供了Connections功能,用于统一管理外部系统的连接信息,避免在代码中硬编码用户名、密码等敏感信息。
例如,要连接MySQL数据库,可以在Airflow Web UI的“Admin → Connections”页面创建一个新的连接:
- Conn Id:
mysql_default(自定义,需在代码中引用) - Conn Type:
MySQL - Host:MySQL服务器地址
- Schema:数据库名称
- Login:用户名
- Password:密码
- Port:端口号(默认3306)
在代码中可以通过Hook获取连接信息:
from airflow.providers.mysql.hooks.mysql import MySqlHook
def query_mysql_data():
hook = MySqlHook(mysql_conn_id='mysql_default')
df = hook.get_pandas_df(sql='SELECT * FROM users')
return df5.2 变量管理(Variables)
Airflow的Variables功能用于存储和管理工作流中的配置变量,如文件路径、阈值参数等。可以在Web UI的“Admin → Variables”页面添加变量,也可以在代码中通过Variable类获取变量值:
from airflow.models import Variable
# 从Variables中获取变量
input_path = Variable.get('user_data_input_path', default_var='/default/path')
output_path = Variable.get('user_data_output_path')5.3 任务并行执行与资源限制
Airflow支持任务的并行执行,可以通过以下参数优化并行度:
- parallelism:Airflow集群的最大并行任务数(全局参数)。
- dag_concurrency:单个DAG的最大并行任务数。
- max_active_runs_per_dag:单个DAG的最大活跃运行实例数。
这些参数可以在airflow.cfg配置文件中修改。
5.4 日志管理
Airflow的任务日志默认存储在本地,在生产环境中可以将日志存储到远程存储系统(如S3、HDFS、Elasticsearch等),便于日志的集中管理和查询。修改airflow.cfg中的[logging]部分即可配置远程日志存储。
六、相关资源链接
- PyPI地址:https://pypi.org/project/apache-airflow
- Github地址:https://github.com/apache/airflow
- 官方文档地址:https://airflow.apache.org/docs/apache-airflow/stable/index.html
关注我,每天分享一个实用的Python自动化工具。

