Python实用工具Beam:轻量化任务调度与异步执行入门教程

一、Beam库核心概述

1.1 用途与工作原理

Python Beam库是一款轻量化的任务调度与异步执行工具,主打简单任务编排、定时任务管理和异步函数执行能力,能够帮助开发者摆脱复杂的多线程/多进程代码编写,快速实现任务的并行处理和定时触发。其核心工作原理基于Python的asyncio异步框架和schedule定时任务模块,通过封装任务队列、执行器和调度器,将用户定义的函数转化为可调度、可异步执行的任务单元,同时支持任务依赖管理和执行状态监控。

1.2 优缺点分析

优点

  • 轻量化设计,无过多第三方依赖,安装和部署成本极低;
  • API设计简洁直观,技术小白也能快速上手;
  • 同时支持同步任务、异步任务和定时任务,适用场景广泛;
  • 支持任务执行状态回调,便于监控任务运行结果。

缺点

  • 不支持分布式任务调度,仅适用于单机场景;
  • 高并发任务处理能力较弱,无法替代Celery等专业任务队列;
  • 文档和社区资源相对较少,问题排查难度略高。

1.3 License类型

Beam库采用MIT开源许可证,允许开发者自由使用、修改和分发源代码,无论是个人项目还是商业项目都可以无门槛集成,仅需保留原作者版权声明即可。

二、Beam库安装与环境配置

2.1 安装方式

Beam库已发布至PyPI,支持通过pip命令一键安装,适用于Python 3.7及以上版本,具体安装命令如下:

# 安装最新稳定版
pip install beam

# 安装指定版本(以0.7.0为例)
pip install beam==0.7.0

安装完成后,可在Python环境中通过以下代码验证安装是否成功:

import beam
# 打印库版本号,验证安装
print(beam.__version__)

若终端输出对应的版本号,则说明安装成功。

2.2 环境依赖说明

Beam库的核心依赖仅有两个Python标准库:

  • asyncio:用于实现异步任务执行;
  • schedule:用于实现定时任务调度。
    无需额外安装其他依赖,兼容性极强,可在Windows、Linux、macOS等主流操作系统中正常运行。

三、Beam库核心功能与代码示例

3.1 基础任务执行:同步与异步

Beam库的核心对象是TaskExecutorTask用于封装需要执行的函数,Executor用于负责任务的调度和执行。

3.1.1 同步任务执行

同步任务是指按照顺序依次执行的任务,适用于无依赖的简单函数调用。

from beam import Task, Executor

# 定义一个简单的同步函数
def add(a: int, b: int) -> int:
    """两数相加的同步函数"""
    result = a + b
    print(f"执行加法任务:{a} + {b} = {result}")
    return result

def multiply(a: int, b: int) -> int:
    """两数相乘的同步函数"""
    result = a * b
    print(f"执行乘法任务:{a} * {b} = {result}")
    return result

# 步骤1:创建任务执行器
executor = Executor()

# 步骤2:创建Task对象,封装函数和参数
task1 = Task(target=add, args=(2, 3))
task2 = Task(target=multiply, args=(4, 5))

# 步骤3:将任务添加到执行器并执行
executor.add_task(task1)
executor.add_task(task2)

# 执行所有任务(同步执行,按添加顺序执行)
executor.run()

代码说明

  • 首先导入TaskExecutor两个核心类;
  • 定义addmultiply两个同步函数作为任务目标;
  • 创建Executor执行器对象,通过add_task方法添加任务;
  • 调用executor.run()方法执行所有任务,任务会按照添加顺序依次同步执行。

执行结果

执行加法任务:2 + 3 = 5
执行乘法任务:4 * 5 = 20

3.1.2 异步任务执行

异步任务是指无需等待前一个任务完成即可执行的任务,适用于I/O密集型场景(如网络请求、文件读写),能够有效提升任务执行效率。

import asyncio
from beam import Task, Executor

# 定义一个异步函数(模拟网络请求)
async def async_fetch(url: str) -> str:
    """模拟异步获取URL内容"""
    print(f"开始请求URL:{url}")
    # 模拟网络延迟
    await asyncio.sleep(2)
    result = f"成功获取{url}的内容"
    print(result)
    return result

# 步骤1:创建执行器
executor = Executor()

# 步骤2:创建异步任务(注意:异步函数需要指定is_async=True)
task1 = Task(target=async_fetch, args=("https://www.example.com",), is_async=True)
task2 = Task(target=async_fetch, args=("https://www.python.org",), is_async=True)

# 步骤3:添加任务并执行
executor.add_task(task1)
executor.add_task(task2)

# 执行异步任务
executor.run()

代码说明

  • 定义异步函数async_fetch,使用async def关键字声明;
  • 创建Task对象时,必须通过is_async=True标记该任务为异步任务;
  • 调用executor.run()后,两个异步任务会同时启动,无需等待前一个任务完成,总执行时间约为2秒(而非4秒)。

执行结果

开始请求URL:https://www.example.com
开始请求URL:https://www.python.org
成功获取https://www.example.com的内容
成功获取https://www.python.org的内容

3.2 定时任务调度

Beam库支持基于时间的定时任务调度,能够实现固定时间间隔执行特定时间点执行的需求,底层依赖schedule库实现。

3.2.1 固定间隔执行任务

from beam import Task, Executor, IntervalTrigger

# 定义需要定时执行的函数
def timed_print():
    """定时打印当前时间"""
    from datetime import datetime
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"定时任务执行时间:{current_time}")

# 步骤1:创建时间触发器(每5秒执行一次)
trigger = IntervalTrigger(seconds=5)

# 步骤2:创建定时任务,绑定触发器
task = Task(target=timed_print, trigger=trigger)

# 步骤3:创建执行器并添加任务
executor = Executor()
executor.add_task(task)

# 步骤4:启动执行器,持续运行定时任务
# 注意:定时任务需要使用run_forever()方法,而非run()
executor.run_forever()

代码说明

  • 导入IntervalTrigger时间触发器类,用于定义任务执行间隔;
  • IntervalTrigger支持seconds(秒)、minutes(分钟)、hours(小时)等参数,此处设置为每5秒执行一次;
  • 定时任务需要调用executor.run_forever()方法启动,执行器会持续运行并按照设定的间隔触发任务;
  • 若需要停止定时任务,可在终端按下Ctrl+C中断程序。

执行结果

定时任务执行时间:2026-01-08 10:00:00
定时任务执行时间:2026-01-08 10:00:05
定时任务执行时间:2026-01-08 10:00:10
...

3.2.2 特定时间点执行任务

除了固定间隔,Beam库还支持通过CronTrigger实现类似Linux Crontab的定时规则,例如每天上午10点执行任务。

from beam import Task, Executor, CronTrigger

def daily_report():
    """每天10点生成日报"""
    print("生成每日工作报表...")

# 创建Cron触发器(每天10点执行)
# Cron表达式格式:分 时 日 月 周
trigger = CronTrigger(minute="0", hour="10", day="*", month="*", week="*")

# 创建任务并添加到执行器
task = Task(target=daily_report, trigger=trigger)
executor = Executor()
executor.add_task(task)

# 启动执行器
executor.run_forever()

代码说明

  • CronTrigger的参数与Crontab规则一致,支持通配符*(表示任意值);
  • 上述代码中,minute="0", hour="10"表示每天10点0分执行任务;
  • 适用于需要固定时间点执行的周期性任务,如日报生成、数据备份等。

3.3 任务依赖管理

在实际开发中,多个任务之间可能存在依赖关系(如任务B必须在任务A执行完成后才能执行),Beam库支持通过dependencies参数实现任务依赖管理。

from beam import Task, Executor

def task_a():
    """任务A:生成基础数据"""
    print("执行任务A:生成基础数据")
    return [1, 2, 3, 4, 5]

def task_b(data: list):
    """任务B:处理任务A生成的数据"""
    print(f"执行任务B:接收任务A的数据 {data}")
    processed_data = [x * 2 for x in data]
    print(f"任务B处理结果:{processed_data}")
    return processed_data

# 步骤1:创建任务A
task_a_obj = Task(target=task_a, name="task_a")

# 步骤2:创建任务B,指定依赖任务A
# dependencies参数接收任务对象列表,任务B会在任务A执行完成后自动获取其返回值
task_b_obj = Task(target=task_b, args=(task_a_obj.result,), dependencies=[task_a_obj], name="task_b")

# 步骤3:执行任务
executor = Executor()
executor.add_task(task_a_obj)
executor.add_task(task_b_obj)
executor.run()

代码说明

  • 创建任务时可以通过name参数指定任务名称,便于识别;
  • 任务B的args参数中使用task_a_obj.result表示接收任务A的返回值作为参数;
  • dependencies=[task_a_obj]表示任务B依赖任务A,执行器会确保任务A执行完成后再执行任务B。

执行结果

执行任务A:生成基础数据
执行任务B:接收任务A的数据 [1, 2, 3, 4, 5]
任务B处理结果:[2, 4, 6, 8, 10]

3.4 任务执行状态监控

Beam库支持通过回调函数监控任务的执行状态,包括任务开始、任务成功、任务失败三种状态,便于开发者及时处理任务执行过程中的异常。

from beam import Task, Executor

# 定义任务函数(包含异常场景)
def divide(a: int, b: int) -> float:
    """两数相除,模拟异常场景"""
    return a / b

# 定义状态回调函数
def on_task_start(task):
    """任务开始时的回调"""
    print(f"任务 {task.name} 开始执行...")

def on_task_success(task, result):
    """任务成功时的回调"""
    print(f"任务 {task.name} 执行成功,结果:{result}")

def on_task_failure(task, exception):
    """任务失败时的回调"""
    print(f"任务 {task.name} 执行失败,异常:{exception}")

# 创建执行器
executor = Executor()

# 创建正常任务
task_normal = Task(
    target=divide,
    args=(10, 2),
    name="normal_task",
    on_start=on_task_start,
    on_success=on_task_success,
    on_failure=on_task_failure
)

# 创建异常任务(除数为0)
task_error = Task(
    target=divide,
    args=(10, 0),
    name="error_task",
    on_start=on_task_start,
    on_success=on_task_success,
    on_failure=on_task_failure
)

# 添加任务并执行
executor.add_task(task_normal)
executor.add_task(task_error)
executor.run()

代码说明

  • Task对象分别绑定on_starton_successon_failure三个回调函数;
  • 正常任务执行时,会依次触发on_starton_success
  • 异常任务(除数为0)执行时,会触发on_starton_failure,并传入异常信息。

执行结果

任务 normal_task 开始执行...
任务 normal_task 执行成功,结果:5.0
任务 error_task 开始执行...
任务 error_task 执行失败,异常:division by zero

四、实际应用案例:文件批量处理工具

4.1 案例需求

开发一个文件批量处理工具,实现以下功能:

  1. 遍历指定目录下的所有.txt文件;
  2. 异步读取每个文件的内容;
  3. 统计每个文件的字符数;
  4. 将统计结果写入到result.txt文件中。

4.2 代码实现

import asyncio
import os
from typing import List
from beam import Task, Executor

# 定义异步文件读取函数
async def read_file_async(file_path: str) -> tuple:
    """异步读取文件内容并统计字符数"""
    try:
        async with asyncio.open(file_path, "r", encoding="utf-8") as f:
            content = await f.read()
        char_count = len(content)
        file_name = os.path.basename(file_path)
        return (file_name, char_count)
    except Exception as e:
        return (os.path.basename(file_path), f"读取失败:{str(e)}")

# 定义结果写入函数
def write_result(results: List[tuple]):
    """将统计结果写入result.txt"""
    with open("result.txt", "w", encoding="utf-8") as f:
        f.write("文件名\t字符数\n")
        f.write("-" * 20 + "\n")
        for file_name, count in results:
            f.write(f"{file_name}\t{count}\n")
    print("统计结果已写入result.txt")

# 定义主函数
def batch_process_files(dir_path: str):
    """批量处理指定目录下的txt文件"""
    # 步骤1:获取目录下所有txt文件路径
    txt_files = []
    for file in os.listdir(dir_path):
        if file.endswith(".txt"):
            txt_files.append(os.path.join(dir_path, file))

    if not txt_files:
        print("未找到任何txt文件")
        return

    # 步骤2:创建执行器和异步任务
    executor = Executor()
    tasks = []
    for file_path in txt_files:
        task = Task(
            target=read_file_async,
            args=(file_path,),
            is_async=True,
            name=f"task_{os.path.basename(file_path)}"
        )
        tasks.append(task)
        executor.add_task(task)

    # 步骤3:执行所有异步任务
    executor.run()

    # 步骤4:收集所有任务结果
    results = [task.result for task in tasks]

    # 步骤5:写入结果文件
    write_result(results)

# 执行批量处理(替换为你的目标目录)
if __name__ == "__main__":
    target_dir = "./test_files"  # 目标目录
    # 创建测试目录和文件(可选,用于测试)
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
        # 创建测试文件1
        with open(os.path.join(target_dir, "file1.txt"), "w", encoding="utf-8") as f:
            f.write("Hello Beam!")
        # 创建测试文件2
        with open(os.path.join(target_dir, "file2.txt"), "w", encoding="utf-8") as f:
            f.write("Python 任务调度工具")

    batch_process_files(target_dir)

4.3 代码说明

  1. 异步文件读取:使用asyncio.open异步读取文件内容,避免I/O阻塞,提升批量处理效率;
  2. 任务创建:为每个.txt文件创建一个异步任务,通过is_async=True标记;
  3. 结果收集:任务执行完成后,通过task.result收集每个任务的返回值;
  4. 结果写入:将所有文件的统计结果写入result.txt,便于后续查看。

4.4 执行结果

运行代码后,会在当前目录生成result.txt文件,内容如下:

文件名    字符数
--
file1.txt    10
file2.txt    14

五、相关资源链接

  • PyPI地址:https://pypi.org/project/Beam
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。