一、Luigi 库概述
Luigi 是 Spotify 开源的 Python 任务编排与工作流管理库,专注于解决复杂批量任务依赖、执行与监控问题,核心原理是通过定义任务依赖关系自动构建执行拓扑图,按依赖顺序调度任务,支持任务失败重试、断点续跑。优点是依赖管理清晰、适配大数据与批处理场景、代码侵入低、易集成;缺点是无原生分布式调度、WebUI 功能简洁。采用 Apache License 2.0 开源协议,可商用与修改。

二、Luigi 库安装与基础环境配置
2.1 安装 Luigi
Luigi 支持 Python 3.6 及以上版本,使用 pip 即可快速安装,打开命令行执行以下命令:
pip install luigi安装完成后可通过查看版本验证是否安装成功:
luigi --version若输出对应版本号,说明安装正常。
2.2 核心基础概念
在使用 Luigi 前,需要先掌握几个核心概念:
- Task(任务):所有工作流的最小单元,继承
luigi.Task类,需重写requires()、run()、output()三个核心方法。 - requires():定义当前任务依赖的前置任务,无依赖则可不写或返回空。
- run():任务的核心逻辑,编写具体执行代码。
- output():定义任务执行完成后的输出目标,通常为文件、数据库标识等,用于判断任务是否已完成。
- Target(目标):任务输出的抽象载体,常用
LocalTarget(本地文件)、HiveTarget、PostgresTarget等。 - 工作流:多个 Task 通过依赖关系串联形成的完整执行流程。
三、Luigi 基础使用与代码示例
3.1 最简单的单机任务
先从无依赖的基础任务入手,创建一个生成文本文件的任务,直观感受 Luigi 的执行逻辑。
创建文件 luigi_demo_01.py,代码如下:
import luigi
# 定义基础任务,继承 luigi.Task
class CreateFile(luigi.Task):
# 定义任务输出文件
def output(self):
# LocalTarget 表示本地文件目标
return luigi.LocalTarget('hello_luigi.txt')
# 任务执行逻辑
def run(self):
# self.output().open() 获取输出文件句柄
with self.output().open('w') as f:
f.write('Hello Luigi! 这是第一个 Luigi 任务\n')
f.write('任务执行成功!')
if __name__ == '__main__':
# 命令行方式启动任务
luigi.run()代码说明:
- 自定义
CreateFile任务继承luigi.Task,是 Luigi 任务的标准写法。 output()方法指定输出为本地文件hello_luigi.txt,Luigi 会通过该文件判断任务是否完成。run()方法内编写业务逻辑,向目标文件写入文本内容。luigi.run()允许以命令行参数方式启动任务。
执行命令:
python luigi_demo_01.py CreateFile --local-schedulerCreateFile:指定要执行的任务类名。--local-scheduler:使用本地调度器,适合单机测试。
执行成功后,目录下会生成 hello_luigi.txt 文件,且再次执行相同命令时,Luigi 会检测到文件已存在,直接判定任务完成,不再重复执行,这就是 Luigi 的幂等性核心特性。
3.2 带依赖的多任务工作流
实际场景中任务通常存在依赖关系,例如先创建数据文件,再读取文件处理数据。下面实现两个任务的依赖串联。
创建文件 luigi_demo_02.py:
import luigi
# 任务1:生成原始数据文件
class GenerateData(luigi.Task):
def output(self):
return luigi.LocalTarget('data.txt')
def run(self):
with self.output().open('w') as f:
# 写入 1-10 的数字
for i in range(1, 11):
f.write(f'{i}\n')
# 任务2:依赖 GenerateData,计算数字总和
class CalculateSum(luigi.Task):
# 定义依赖任务
def requires(self):
return GenerateData()
def output(self):
return luigi.LocalTarget('sum_result.txt')
def run(self):
# 读取依赖任务的输出文件
with self.input().open('r') as f:
lines = f.readlines()
# 转换为整数并求和
total = sum(int(line.strip()) for line in lines if line.strip())
# 写入计算结果
with self.output().open('w') as f:
f.write(f'1到10的总和为:{total}')
if __name__ == '__main__':
luigi.run()代码说明:
GenerateData任务生成包含 1-10 数字的data.txt。CalculateSum任务通过requires()依赖GenerateData,执行前会先自动运行前置任务。self.input()可直接获取依赖任务的输出 Target,无需手动指定路径,解耦且安全。- 任务执行完成后生成
sum_result.txt,存储计算结果。
执行命令:
python luigi_demo_02.py CalculateSum --local-scheduler执行流程:先运行 GenerateData 生成数据文件,再运行 CalculateSum 计算总和,若 data.txt 已存在,则跳过前置任务,直接执行计算任务。
3.3 带参数的动态任务
固定任务无法满足多变需求,Luigi 支持通过 luigi.Parameter() 定义参数,实现任务动态化。
创建文件 luigi_demo_03.py:
import luigi
# 带参数的生成数据任务
class GenerateNumData(luigi.Task):
# 定义参数:数字上限
max_num = luigi.IntParameter(default=10)
# 定义参数:输出文件名
filename = luigi.Parameter(default='num_data.txt')
def output(self):
return luigi.LocalTarget(self.filename)
def run(self):
with self.output().open('w') as f:
for i in range(1, self.max_num + 1):
f.write(f'{i}\n')
# 带参数的求和任务
class CalculateDynamicSum(luigi.Task):
max_num = luigi.IntParameter(default=10)
filename = luigi.Parameter(default='num_data.txt')
def requires(self):
# 向依赖任务传递参数
return GenerateNumData(max_num=self.max_num, filename=self.filename)
def output(self):
return luigi.LocalTarget('dynamic_sum_result.txt')
def run(self):
with self.input().open('r') as f:
lines = f.readlines()
total = sum(int(line.strip()) for line in lines if line.strip())
with self.output().open('w') as f:
f.write(f'1到{self.max_num}的总和为:{total}')
if __name__ == '__main__':
luigi.run()代码说明:
- 使用
luigi.IntParameter定义整数参数,luigi.Parameter定义字符串参数,支持默认值。 - 子任务可通过
requires()向父任务传递参数,保持参数一致性。 - 输出路径、数据范围均可通过参数动态调整,提升任务复用性。
执行命令(指定参数):
python luigi_demo_03.py CalculateDynamicSum --max-num 20 --filename my_data.txt --local-scheduler命令中 --max-num 20 对应任务中的 max_num 参数,会生成 1-20 的数据并计算总和。
3.4 任务失败重试与断点续跑
Luigi 内置任务失败重试机制,无需手动编写异常处理,只需在任务中配置重试次数。
示例代码(添加重试配置):
import luigi
import random
class UnstableTask(luigi.Task):
# 配置重试次数
retry_count = luigi.IntParameter(default=3)
def output(self):
return luigi.LocalTarget('retry_test.txt')
def run(self):
# 模拟随机失败
if random.random() < 0.7:
raise Exception('任务随机失败,触发重试')
with self.output().open('w') as f:
f.write('任务重试成功!')
if __name__ == '__main__':
luigi.run()执行命令:
python luigi_demo_04.py UnstableTask --local-schedulerLuigi 会自动捕获任务异常,按 retry_count 配置重试,直到执行成功或耗尽重试次数,适合网络请求、数据库操作等不稳定场景。
四、Luigi 进阶使用:Web 监控与多任务调度
4.1 启动 Luigi 中央调度器与 WebUI
单机调度器仅适合测试,生产环境推荐使用 Luigi 中央调度器,自带 WebUI 可实时查看任务状态、依赖图、执行日志。
1. 启动中央调度器:
luigid --port 8082默认端口 8082,启动后访问:http://localhost:8082 即可打开 Web 监控界面。
2. 提交任务到中央调度器:
执行任务时去掉 --local-scheduler,自动连接中央调度器:
python luigi_demo_02.py CalculateSum在 WebUI 中可查看任务执行进度、失败任务、依赖关系,支持手动终止任务。
4.2 多任务并行执行
Luigi 支持多进程并行执行无依赖的任务,提升执行效率,通过命令行参数指定并行进程数:
python luigi_multi_task.py MainTask --workers 4--workers 4 表示使用 4 个进程并行执行,无依赖的任务会同时运行,有依赖的任务按顺序执行。
4.3 封装为可复用任务模块
实际项目中会将任务按功能拆分到不同模块,标准目录结构如下:
luigi_project/
├── tasks/
│ ├── __init__.py
│ ├── data_task.py # 数据生成、清洗任务
│ ├── compute_task.py # 计算、分析任务
│ └── output_task.py # 结果输出任务
├── config/
│ └── luigi.cfg # Luigi 配置文件
└── main.py # 任务入口luigi.cfg 可配置默认调度器、重试次数、日志路径等,简化命令行参数:
[core]
default-scheduler-host = localhost
default-scheduler-port = 8082
retry-attempts = 3五、真实场景实战案例:数据清洗与统计分析工作流
5.1 案例需求
模拟企业日常数据处理流程,完成以下任务:
- 生成原始 CSV 数据(包含姓名、年龄、城市、销售额)。
- 清洗数据:去除空值、过滤异常年龄、标准化城市名称。
- 统计分析:按城市分组计算总销售额、平均年龄。
- 输出统计结果到文本文件。
5.2 完整代码实现
创建文件 data_workflow.py:
import luigi
import csv
import os
# 任务1:生成原始 CSV 数据
class GenerateRawData(luigi.Task):
def output(self):
return luigi.LocalTarget('raw_data.csv')
def run(self):
# 模拟业务数据
data = [
['姓名', '年龄', '城市', '销售额'],
['张三', 25, '北京', 5000],
['李四', 32, '上海', 8000],
['王五', '', '广州', 6000],
['赵六', 40, '深圳', 12000],
['钱七', 150, '北京', 3000],
['孙八', 28, '上海', 9000],
['周九', None, '广州', 4500]
]
with self.output().open('w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(data)
# 任务2:清洗数据
class CleanData(luigi.Task):
def requires(self):
return GenerateRawData()
def output(self):
return luigi.LocalTarget('cleaned_data.csv')
def run(self):
with self.input().open('r', encoding='utf-8') as f:
reader = csv.reader(f)
header = next(reader)
cleaned_data = [header]
for row in reader:
# 跳过空值行
if not all(row):
continue
name, age, city, sales = row
# 过滤异常年龄
try:
age = int(age)
sales = int(sales)
except:
continue
if 18 <= age <= 60:
cleaned_data.append([name, age, city, sales])
with self.output().open('w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(cleaned_data)
# 任务3:按城市统计销售额
class CitySalesStat(luigi.Task):
def requires(self):
return CleanData()
def output(self):
return luigi.LocalTarget('city_sales_report.txt')
def run(self):
city_stat = {}
with self.input().open('r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
city = row['城市']
sales = int(row['销售额'])
age = int(row['年龄'])
if city not in city_stat:
city_stat[city] = {'total_sales': 0, 'total_age': 0, 'count': 0}
city_stat[city]['total_sales'] += sales
city_stat[city]['total_age'] += age
city_stat[city]['count'] += 1
# 生成报告
with self.output().open('w', encoding='utf-8') as f:
f.write('城市销售统计报告\n')
f.write('='*30 + '\n')
for city, data in city_stat.items():
avg_age = data['total_age'] / data['count']
f.write(f'城市:{city}\n')
f.write(f'总销售额:{data["total_sales"]}\n')
f.write(f'平均年龄:{avg_age:.2f}\n')
f.write('-'*30 + '\n')
if __name__ == '__main__':
luigi.run()5.3 案例执行与效果
执行命令:
python data_workflow.py CitySalesStat --local-scheduler执行流程:
- 生成
raw_data.csv原始数据。 - 清洗空值、异常年龄,生成
cleaned_data.csv。 - 按城市统计数据,生成
city_sales_report.txt报告。
该案例完整还原了企业数据处理流程,体现了 Luigi 依赖管理、断点续跑、任务复用的核心价值,可直接扩展对接数据库、Hive、Spark 等大数据组件。
相关资源
- Pypi地址:https://pypi.org/project/luigi/
- Github地址:https://github.com/spotify/luigi
- 官方文档地址:https://luigi.readthedocs.io/
关注我,每天分享一个实用的Python自动化工具。

