Python 任务编排神器:Luigi 库从入门到实战教程

一、Luigi 库概述

Luigi 是 Spotify 开源的 Python 任务编排与工作流管理库,专注于解决复杂批量任务依赖、执行与监控问题,核心原理是通过定义任务依赖关系自动构建执行拓扑图,按依赖顺序调度任务,支持任务失败重试、断点续跑。优点是依赖管理清晰、适配大数据与批处理场景、代码侵入低、易集成;缺点是无原生分布式调度、WebUI 功能简洁。采用 Apache License 2.0 开源协议,可商用与修改。

二、Luigi 库安装与基础环境配置

2.1 安装 Luigi

Luigi 支持 Python 3.6 及以上版本,使用 pip 即可快速安装,打开命令行执行以下命令:

pip install luigi

安装完成后可通过查看版本验证是否安装成功:

luigi --version

若输出对应版本号,说明安装正常。

2.2 核心基础概念

在使用 Luigi 前,需要先掌握几个核心概念:

  1. Task(任务):所有工作流的最小单元,继承 luigi.Task 类,需重写 requires()run()output() 三个核心方法。
  2. requires():定义当前任务依赖的前置任务,无依赖则可不写或返回空。
  3. run():任务的核心逻辑,编写具体执行代码。
  4. output():定义任务执行完成后的输出目标,通常为文件、数据库标识等,用于判断任务是否已完成。
  5. Target(目标):任务输出的抽象载体,常用 LocalTarget(本地文件)、HiveTargetPostgresTarget 等。
  6. 工作流:多个 Task 通过依赖关系串联形成的完整执行流程。

三、Luigi 基础使用与代码示例

3.1 最简单的单机任务

先从无依赖的基础任务入手,创建一个生成文本文件的任务,直观感受 Luigi 的执行逻辑。

创建文件 luigi_demo_01.py,代码如下:

import luigi

# 定义基础任务,继承 luigi.Task
class CreateFile(luigi.Task):
    # 定义任务输出文件
    def output(self):
        # LocalTarget 表示本地文件目标
        return luigi.LocalTarget('hello_luigi.txt')

    # 任务执行逻辑
    def run(self):
        # self.output().open() 获取输出文件句柄
        with self.output().open('w') as f:
            f.write('Hello Luigi! 这是第一个 Luigi 任务\n')
            f.write('任务执行成功!')

if __name__ == '__main__':
    # 命令行方式启动任务
    luigi.run()

代码说明

  1. 自定义 CreateFile 任务继承 luigi.Task,是 Luigi 任务的标准写法。
  2. output() 方法指定输出为本地文件 hello_luigi.txt,Luigi 会通过该文件判断任务是否完成。
  3. run() 方法内编写业务逻辑,向目标文件写入文本内容。
  4. luigi.run() 允许以命令行参数方式启动任务。

执行命令

python luigi_demo_01.py CreateFile --local-scheduler
  • CreateFile:指定要执行的任务类名。
  • --local-scheduler:使用本地调度器,适合单机测试。

执行成功后,目录下会生成 hello_luigi.txt 文件,且再次执行相同命令时,Luigi 会检测到文件已存在,直接判定任务完成,不再重复执行,这就是 Luigi 的幂等性核心特性。

3.2 带依赖的多任务工作流

实际场景中任务通常存在依赖关系,例如先创建数据文件,再读取文件处理数据。下面实现两个任务的依赖串联。

创建文件 luigi_demo_02.py

import luigi

# 任务1:生成原始数据文件
class GenerateData(luigi.Task):
    def output(self):
        return luigi.LocalTarget('data.txt')

    def run(self):
        with self.output().open('w') as f:
            # 写入 1-10 的数字
            for i in range(1, 11):
                f.write(f'{i}\n')

# 任务2:依赖 GenerateData,计算数字总和
class CalculateSum(luigi.Task):
    # 定义依赖任务
    def requires(self):
        return GenerateData()

    def output(self):
        return luigi.LocalTarget('sum_result.txt')

    def run(self):
        # 读取依赖任务的输出文件
        with self.input().open('r') as f:
            lines = f.readlines()
            # 转换为整数并求和
            total = sum(int(line.strip()) for line in lines if line.strip())

        # 写入计算结果
        with self.output().open('w') as f:
            f.write(f'1到10的总和为:{total}')

if __name__ == '__main__':
    luigi.run()

代码说明

  1. GenerateData 任务生成包含 1-10 数字的 data.txt
  2. CalculateSum 任务通过 requires() 依赖 GenerateData,执行前会先自动运行前置任务。
  3. self.input() 可直接获取依赖任务的输出 Target,无需手动指定路径,解耦且安全。
  4. 任务执行完成后生成 sum_result.txt,存储计算结果。

执行命令

python luigi_demo_02.py CalculateSum --local-scheduler

执行流程:先运行 GenerateData 生成数据文件,再运行 CalculateSum 计算总和,若 data.txt 已存在,则跳过前置任务,直接执行计算任务。

3.3 带参数的动态任务

固定任务无法满足多变需求,Luigi 支持通过 luigi.Parameter() 定义参数,实现任务动态化。

创建文件 luigi_demo_03.py

import luigi

# 带参数的生成数据任务
class GenerateNumData(luigi.Task):
    # 定义参数:数字上限
    max_num = luigi.IntParameter(default=10)
    # 定义参数:输出文件名
    filename = luigi.Parameter(default='num_data.txt')

    def output(self):
        return luigi.LocalTarget(self.filename)

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.max_num + 1):
                f.write(f'{i}\n')

# 带参数的求和任务
class CalculateDynamicSum(luigi.Task):
    max_num = luigi.IntParameter(default=10)
    filename = luigi.Parameter(default='num_data.txt')

    def requires(self):
        # 向依赖任务传递参数
        return GenerateNumData(max_num=self.max_num, filename=self.filename)

    def output(self):
        return luigi.LocalTarget('dynamic_sum_result.txt')

    def run(self):
        with self.input().open('r') as f:
            lines = f.readlines()
            total = sum(int(line.strip()) for line in lines if line.strip())

        with self.output().open('w') as f:
            f.write(f'1到{self.max_num}的总和为:{total}')

if __name__ == '__main__':
    luigi.run()

代码说明

  1. 使用 luigi.IntParameter 定义整数参数,luigi.Parameter 定义字符串参数,支持默认值。
  2. 子任务可通过 requires() 向父任务传递参数,保持参数一致性。
  3. 输出路径、数据范围均可通过参数动态调整,提升任务复用性。

执行命令(指定参数)

python luigi_demo_03.py CalculateDynamicSum --max-num 20 --filename my_data.txt --local-scheduler

命令中 --max-num 20 对应任务中的 max_num 参数,会生成 1-20 的数据并计算总和。

3.4 任务失败重试与断点续跑

Luigi 内置任务失败重试机制,无需手动编写异常处理,只需在任务中配置重试次数。

示例代码(添加重试配置):

import luigi
import random

class UnstableTask(luigi.Task):
    # 配置重试次数
    retry_count = luigi.IntParameter(default=3)

    def output(self):
        return luigi.LocalTarget('retry_test.txt')

    def run(self):
        # 模拟随机失败
        if random.random() < 0.7:
            raise Exception('任务随机失败,触发重试')
        with self.output().open('w') as f:
            f.write('任务重试成功!')

if __name__ == '__main__':
    luigi.run()

执行命令

python luigi_demo_04.py UnstableTask --local-scheduler

Luigi 会自动捕获任务异常,按 retry_count 配置重试,直到执行成功或耗尽重试次数,适合网络请求、数据库操作等不稳定场景。

四、Luigi 进阶使用:Web 监控与多任务调度

4.1 启动 Luigi 中央调度器与 WebUI

单机调度器仅适合测试,生产环境推荐使用 Luigi 中央调度器,自带 WebUI 可实时查看任务状态、依赖图、执行日志。

1. 启动中央调度器

luigid --port 8082

默认端口 8082,启动后访问:http://localhost:8082 即可打开 Web 监控界面。

2. 提交任务到中央调度器
执行任务时去掉 --local-scheduler,自动连接中央调度器:

python luigi_demo_02.py CalculateSum

在 WebUI 中可查看任务执行进度、失败任务、依赖关系,支持手动终止任务。

4.2 多任务并行执行

Luigi 支持多进程并行执行无依赖的任务,提升执行效率,通过命令行参数指定并行进程数:

python luigi_multi_task.py MainTask --workers 4

--workers 4 表示使用 4 个进程并行执行,无依赖的任务会同时运行,有依赖的任务按顺序执行。

4.3 封装为可复用任务模块

实际项目中会将任务按功能拆分到不同模块,标准目录结构如下:

luigi_project/
├── tasks/
│   ├── __init__.py
│   ├── data_task.py    # 数据生成、清洗任务
│   ├── compute_task.py # 计算、分析任务
│   └── output_task.py  # 结果输出任务
├── config/
│   └── luigi.cfg       # Luigi 配置文件
└── main.py             # 任务入口

luigi.cfg 可配置默认调度器、重试次数、日志路径等,简化命令行参数:

[core]
default-scheduler-host = localhost
default-scheduler-port = 8082
retry-attempts = 3

五、真实场景实战案例:数据清洗与统计分析工作流

5.1 案例需求

模拟企业日常数据处理流程,完成以下任务:

  1. 生成原始 CSV 数据(包含姓名、年龄、城市、销售额)。
  2. 清洗数据:去除空值、过滤异常年龄、标准化城市名称。
  3. 统计分析:按城市分组计算总销售额、平均年龄。
  4. 输出统计结果到文本文件。

5.2 完整代码实现

创建文件 data_workflow.py

import luigi
import csv
import os

# 任务1:生成原始 CSV 数据
class GenerateRawData(luigi.Task):
    def output(self):
        return luigi.LocalTarget('raw_data.csv')

    def run(self):
        # 模拟业务数据
        data = [
            ['姓名', '年龄', '城市', '销售额'],
            ['张三', 25, '北京', 5000],
            ['李四', 32, '上海', 8000],
            ['王五', '', '广州', 6000],
            ['赵六', 40, '深圳', 12000],
            ['钱七', 150, '北京', 3000],
            ['孙八', 28, '上海', 9000],
            ['周九', None, '广州', 4500]
        ]
        with self.output().open('w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerows(data)

# 任务2:清洗数据
class CleanData(luigi.Task):
    def requires(self):
        return GenerateRawData()

    def output(self):
        return luigi.LocalTarget('cleaned_data.csv')

    def run(self):
        with self.input().open('r', encoding='utf-8') as f:
            reader = csv.reader(f)
            header = next(reader)
            cleaned_data = [header]

            for row in reader:
                # 跳过空值行
                if not all(row):
                    continue
                name, age, city, sales = row
                # 过滤异常年龄
                try:
                    age = int(age)
                    sales = int(sales)
                except:
                    continue
                if 18 <= age <= 60:
                    cleaned_data.append([name, age, city, sales])

        with self.output().open('w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerows(cleaned_data)

# 任务3:按城市统计销售额
class CitySalesStat(luigi.Task):
    def requires(self):
        return CleanData()

    def output(self):
        return luigi.LocalTarget('city_sales_report.txt')

    def run(self):
        city_stat = {}
        with self.input().open('r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                city = row['城市']
                sales = int(row['销售额'])
                age = int(row['年龄'])

                if city not in city_stat:
                    city_stat[city] = {'total_sales': 0, 'total_age': 0, 'count': 0}
                city_stat[city]['total_sales'] += sales
                city_stat[city]['total_age'] += age
                city_stat[city]['count'] += 1

        # 生成报告
        with self.output().open('w', encoding='utf-8') as f:
            f.write('城市销售统计报告\n')
            f.write('='*30 + '\n')
            for city, data in city_stat.items():
                avg_age = data['total_age'] / data['count']
                f.write(f'城市:{city}\n')
                f.write(f'总销售额:{data["total_sales"]}\n')
                f.write(f'平均年龄:{avg_age:.2f}\n')
                f.write('-'*30 + '\n')

if __name__ == '__main__':
    luigi.run()

5.3 案例执行与效果

执行命令

python data_workflow.py CitySalesStat --local-scheduler

执行流程:

  1. 生成 raw_data.csv 原始数据。
  2. 清洗空值、异常年龄,生成 cleaned_data.csv
  3. 按城市统计数据,生成 city_sales_report.txt 报告。

该案例完整还原了企业数据处理流程,体现了 Luigi 依赖管理、断点续跑、任务复用的核心价值,可直接扩展对接数据库、Hive、Spark 等大数据组件。

相关资源

  • Pypi地址:https://pypi.org/project/luigi/
  • Github地址:https://github.com/spotify/luigi
  • 官方文档地址:https://luigi.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。