Python实用工具:Apache Airflow 从入门到实战 保姆级教程

一、Apache Airflow 核心概述

Apache Airflow是一款由Airbnb开源的任务编排与调度工具,专门用于管理复杂的工作流(Workflow)。其工作原理是将任务抽象为有向无环图(DAG),通过调度器按照任务依赖关系和预设的触发规则自动执行任务。Airflow支持丰富的任务类型,涵盖Shell命令、Python函数、SQL查询等,广泛应用于数据清洗、ETL流程、定时任务执行等场景。

该库的开源协议为Apache License 2.0,这是一个对商业友好的开源协议,允许用户自由使用、修改和分发代码。它的优点是灵活性强、可扩展性高、可视化界面友好;缺点是部署和维护成本较高,对新手不够友好,且轻量级任务场景下略显笨重。

二、Apache Airflow 安装与环境配置

2.1 系统环境要求

在安装Airflow之前,需要确保本地环境满足以下条件:

  • Python 3.8~3.11 版本(Airflow 2.x 对Python版本有严格要求)
  • 足够的磁盘空间(Airflow会存储任务日志和元数据)
  • 已安装pip包管理工具

2.2 安装步骤

Airflow的安装方式有多种,包括pip安装、Docker安装和源码安装,这里我们以最适合新手的pip安装为例进行讲解。

  1. 设置环境变量
    Airflow默认从PyPI下载包,在安装前需要设置一个环境变量来指定Airflow的家目录,同时避免版本冲突: # Linux/Mac系统 export AIRFLOW_HOME=~/airflow # Windows系统(cmd命令行) set AIRFLOW_HOME=D:\airflow
  2. 安装Airflow
    由于Airflow的依赖包较多,直接安装可能会出现问题,我们可以先升级pip,再指定Airflow版本进行安装(推荐安装稳定版2.6.3): # 升级pip python -m pip install --upgrade pip # 安装Airflow核心包 pip install apache-airflow==2.6.3
  3. 初始化元数据库
    Airflow使用元数据库(默认是SQLite)存储DAG信息、任务执行状态等数据,安装完成后需要初始化数据库: airflow db init
  4. 创建管理员用户
    为了登录Airflow的Web UI,需要创建一个管理员账户:
    bash airflow users create \ --username admin \ --firstname FirstName \ --lastname LastName \ --role Admin \ --email [email protected]
    执行该命令后,会提示输入密码,按照提示输入即可。

2.3 启动Airflow服务

Airflow包含两个核心服务:Web服务器调度器,需要分别启动。

  1. 启动Web服务器 # 默认端口是8080 airflow webserver --port 8080
  2. 启动调度器
    打开一个新的终端窗口,执行以下命令启动调度器,调度器会自动检测DAG目录中的任务并执行: airflow scheduler
  3. 访问Web UI
    打开浏览器,输入地址 http://localhost:8080,使用之前创建的管理员账户和密码登录,即可看到Airflow的可视化界面。

三、Apache Airflow 核心概念与基础用法

3.1 核心概念解析

在使用Airflow之前,必须理解几个核心概念,这些概念是构建工作流的基础。

  1. DAG(有向无环图)
    是Airflow的核心,代表一个完整的工作流。DAG由多个任务(Task)组成,任务之间存在依赖关系,且不存在循环依赖(无环)。例如,一个ETL工作流的DAG可以是:数据抽取任务 → 数据清洗任务 → 数据加载任务
  2. Task(任务) 是DAG中的最小执行单元,每个Task对应一个具体的操作。Airflow支持多种类型的Task,常见的有:
    • PythonOperator:执行Python函数
    • BashOperator:执行Shell命令
    • SqlOperator:执行SQL语句
    • EmailOperator:发送邮件
  3. Operator(操作符)
    是定义Task的模板,不同的Operator对应不同类型的任务。Operator的作用是将任务逻辑封装起来,用户只需要传入参数即可创建Task。
  4. Task Instance(任务实例)
    是Task的一次具体执行。每个Task在不同的时间点执行,都会生成一个独立的Task Instance,例如每天执行一次的任务,每天都会产生一个新的Task Instance。
  5. DAG Run(DAG运行实例)
    是DAG的一次具体执行。当DAG满足触发条件时,调度器会创建一个DAG Run,负责执行该次运行中的所有Task Instance。

3.2 编写第一个DAG

Airflow的DAG文件是Python脚本,默认存储在AIRFLOW_HOME/dags目录下,调度器会自动扫描该目录下的Python文件并加载DAG。

接下来我们编写一个简单的DAG,包含两个任务:一个任务打印“Hello Airflow”,另一个任务打印“Task Finished”,且第二个任务依赖于第一个任务。

  1. 创建DAG文件
    AIRFLOW_HOME/dags目录下创建一个名为hello_airflow_dag.py的文件。
  2. 编写DAG代码 # 导入必要的模块 from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator # 定义Python函数,作为任务的执行逻辑 def print_hello(): print("Hello Airflow! This is my first DAG.") def print_finish(): print("Task Finished! Congratulations!") # 定义默认参数 default_args = { 'owner': 'airflow', # DAG的所有者 'depends_on_past': False, # 不依赖于上一次的执行结果 'email_on_failure': False, # 任务失败时不发送邮件 'email_on_retry': False, # 任务重试时不发送邮件 'retries': 1, # 任务失败后的重试次数 'retry_delay': timedelta(minutes=5), # 重试间隔时间 } # 定义DAG with DAG( 'hello_airflow_dag', # DAG的唯一标识 default_args=default_args, description='A simple tutorial DAG', # DAG的描述 schedule_interval=timedelta(days=1), # 调度间隔,每天执行一次 start_date=datetime(2024, 1, 1), # DAG的开始时间 catchup=False, # 不回溯执行历史任务 tags=['example', 'tutorial'], # 标签,用于分类DAG ) as dag:# 定义第一个任务 task1 = PythonOperator( task_id='print_hello_task', # 任务的唯一标识 python_callable=print_hello, # 任务执行的Python函数 ) # 定义第二个任务 task2 = PythonOperator( task_id='print_finish_task', python_callable=print_finish, ) # 设置任务依赖关系:task1执行完成后再执行task2 task1 &gt;&gt; task2</code></pre></li>代码说明 默认参数(default_args):定义了DAG中所有任务的公共参数,如所有者、重试次数、重试间隔等。 DAG定义:使用with DAG()上下文管理器创建DAG,指定了DAG的ID、描述、调度间隔、开始时间等关键信息。 任务定义:使用PythonOperator创建两个任务,分别指定任务ID和要执行的Python函数。 依赖关系设置:使用>>运算符设置任务之间的依赖关系,task1 >> task2表示task2必须在task1执行完成后才能执行。 查看并触发DAG
    将上述代码保存到dags目录后,等待1~2分钟,调度器会自动加载该DAG。在Airflow Web UI的DAG列表中找到hello_airflow_dag,点击右侧的开关按钮启用该DAG。 启用后,可以手动触发DAG执行:点击DAG名称进入详情页,点击右上角的“Trigger DAG”按钮,即可手动启动一次DAG运行。运行完成后,可以在“Graph”视图中查看任务的执行状态,在“Log”视图中查看任务的输出日志。

3.3 常用Operator实战

除了PythonOperator,Airflow还提供了多种常用的Operator,下面我们介绍几种最常用的Operator及其用法。

3.3.1 BashOperator:执行Shell命令

BashOperator用于执行Shell命令或脚本,适合处理需要调用系统命令的任务。

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    'bash_operator_dag',
    default_args=default_args,
    description='A DAG using BashOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['bash', 'example'],
) as dag:

    # 执行简单的Shell命令
    task1 = BashOperator(
        task_id='print_date_task',
        bash_command='date',  # 打印当前日期
    )

    # 执行Shell脚本
    task2 = BashOperator(
        task_id='run_script_task',
        bash_command='echo "Current directory: $(pwd)" && ls -l',
    )

    task1 >> task2

代码说明bash_command参数用于指定要执行的Shell命令或脚本,多个命令可以用&&连接。

3.3.2 EmailOperator:发送邮件

EmailOperator用于在任务执行完成后发送邮件通知,适合用于任务执行状态的告警。

使用EmailOperator前,需要在AIRFLOW_HOME/airflow.cfg配置文件中设置邮件服务器信息:

[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = [email protected]
smtp_password = your_email_password
smtp_port = 587
smtp_mail_from = [email protected]

然后编写DAG代码:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

def task_function():
    return "Task executed successfully!"

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    'email_operator_dag',
    default_args=default_args,
    description='A DAG using EmailOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['email', 'example'],
) as dag:

    task1 = PythonOperator(
        task_id='execute_task',
        python_callable=task_function,
    )

    # 发送邮件通知
    send_email = EmailOperator(
        task_id='send_email_notification',
        to='[email protected]',  # 收件人邮箱
        subject='Airflow Task Execution Status',  # 邮件主题
        html_content='<h3>Task executed successfully!</h3>',  # 邮件内容(支持HTML)
    )

    task1 >> send_email

四、Airflow 工作流实战:数据ETL流程

4.1 需求分析

我们以一个简单的电商用户数据ETL流程为例,演示如何使用Airflow构建完整的工作流。需求如下:

  1. 数据抽取:从CSV文件中读取用户数据。
  2. 数据清洗:去除缺失值、重复值,转换数据格式。
  3. 数据加载:将清洗后的数据写入新的CSV文件。
  4. 发送通知:数据加载完成后发送邮件通知。

4.2 项目目录结构

对于复杂的Airflow项目,建议采用以下目录结构,便于管理代码和数据:

airflow_home/
├── dags/
│   ├── etl_user_data_dag.py  # ETL流程的DAG文件
│   └── etl_scripts/          # 存储ETL相关的Python脚本
│       ├── extract.py
│       ├── transform.py
│       └── load.py
├── data/                     # 存储原始数据和清洗后的数据
│   ├── raw/
│   │   └── user_data.csv
│   └── processed/
└── logs/                     # Airflow任务日志

4.3 编写ETL脚本

  1. 数据抽取脚本(extract.py) # dags/etl_scripts/extract.py import pandas as pd import os def extract_data(input_path: str) -> pd.DataFrame: """ 从CSV文件中抽取数据 :param input_path: 原始数据文件路径 :return: 抽取后的DataFrame """ if not os.path.exists(input_path): raise FileNotFoundError(f"Input file not found: {input_path}") df = pd.read_csv(input_path) print(f"Extracted {len(df)} rows of data") return df
  2. 数据清洗脚本(transform.py) # dags/etl_scripts/transform.py import pandas as pd def transform_data(df: pd.DataFrame) -> pd.DataFrame: """ 数据清洗:去除缺失值、重复值,转换日期格式 :param df: 原始DataFrame :return: 清洗后的DataFrame """ # 去除缺失值 df = df.dropna(subset=['user_id', 'username', 'register_date']) # 去除重复值 df = df.drop_duplicates(subset=['user_id']) # 转换注册日期格式为YYYY-MM-DD df['register_date'] = pd.to_datetime(df['register_date']).dt.strftime('%Y-%m-%d') print(f"Transformed data: {len(df)} rows remaining") return df
  3. 数据加载脚本(load.py) # dags/etl_scripts/load.py import pandas as pd import os def load_data(df: pd.DataFrame, output_path: str) -> None: """ 将清洗后的数据写入CSV文件 :param df: 清洗后的DataFrame :param output_path: 输出文件路径 """ # 创建输出目录(如果不存在) os.makedirs(os.path.dirname(output_path), exist_ok=True) # 写入CSV文件 df.to_csv(output_path, index=False) print(f"Loaded data to {output_path} successfully")

4.4 编写ETL DAG文件

# dags/etl_user_data_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from etl_scripts.extract import extract_data
from etl_scripts.transform import transform_data
from etl_scripts.load import load_data
import os

# 定义文件路径
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME', '~/airflow')
INPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/raw/user_data.csv')
OUTPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/processed/cleaned_user_data.csv')

# 定义默认参数
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['[email protected]'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# 定义任务函数(封装ETL脚本)
def extract_task():
    return extract_data(INPUT_PATH)

def transform_task(ti):
    # 从上游任务(extract_task)获取数据
    df = ti.xcom_pull(task_ids='extract_task')
    return transform_data(df)

def load_task(ti):
    df = ti.xcom_pull(task_ids='transform_task')
    load_data(df, OUTPUT_PATH)

# 定义DAG
with DAG(
    'etl_user_data_workflow',
    default_args=default_args,
    description='A complete ETL workflow for user data',
    schedule_interval='0 1 * * *',  # 每天凌晨1点执行
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'data_processing'],
) as dag:

    # 定义任务
    extract = PythonOperator(
        task_id='extract_task',
        python_callable=extract_task,
    )

    transform = PythonOperator(
        task_id='transform_task',
        python_callable=transform_task,
    )

    load = PythonOperator(
        task_id='load_task',
        python_callable=load_task,
    )

    send_notification = EmailOperator(
        task_id='send_notification',
        to='[email protected]',
        subject='User Data ETL Workflow Completed',
        html_content=f'<h3>ETL Workflow executed successfully!</h3><p>Cleaned data saved to: {OUTPUT_PATH}</p>',
    )

    # 设置任务依赖关系
    extract >> transform >> load >> send_notification

4.5 代码关键说明

  1. XCom通信机制
    Airflow中,任务之间的数据传递通过XCom(Cross-Communication) 实现。在上述代码中,transform_task通过ti.xcom_pull(task_ids='extract_task')获取extract_task的返回值,load_task同理获取transform_task的返回值。XCom适合传递小量数据,大量数据建议使用文件或数据库存储。
  2. 调度时间设置
    schedule_interval参数支持多种格式,除了timedelta,还可以使用Cron表达式。例如'0 1 * * *'表示每天凌晨1点执行任务,这是生产环境中最常用的调度方式。
  3. 任务失败告警
    default_args中设置了email_on_failure: True,当任务执行失败时,Airflow会自动向指定邮箱发送告警邮件。

4.6 测试与运行

  1. 准备原始数据
    AIRFLOW_HOME/data/raw目录下创建user_data.csv文件,写入以下测试数据: user_id,username,register_date,age,gender 1,alice,2024-01-01,25,F 2,bob,2024-01-02,,M 3,charlie,2024-01-03,30,M 2,bob,2024-01-02,28,M 4,david,,35,M
  2. 启动DAG
    将DAG文件和ETL脚本放入对应的目录后,在Airflow Web UI中启用etl_user_data_workflow DAG。可以手动触发一次执行,查看任务的执行状态。执行完成后,在data/processed目录下会生成cleaned_user_data.csv文件,内容为清洗后的数据。

五、Airflow 高级特性与优化建议

5.1 连接管理(Connections)

在实际项目中,任务经常需要连接外部系统(如数据库、Hadoop、云存储等)。Airflow提供了Connections功能,用于统一管理外部系统的连接信息,避免在代码中硬编码用户名、密码等敏感信息。

例如,要连接MySQL数据库,可以在Airflow Web UI的“Admin → Connections”页面创建一个新的连接:

  • Conn Idmysql_default(自定义,需在代码中引用)
  • Conn TypeMySQL
  • Host:MySQL服务器地址
  • Schema:数据库名称
  • Login:用户名
  • Password:密码
  • Port:端口号(默认3306)

在代码中可以通过Hook获取连接信息:

from airflow.providers.mysql.hooks.mysql import MySqlHook

def query_mysql_data():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    df = hook.get_pandas_df(sql='SELECT * FROM users')
    return df

5.2 变量管理(Variables)

Airflow的Variables功能用于存储和管理工作流中的配置变量,如文件路径、阈值参数等。可以在Web UI的“Admin → Variables”页面添加变量,也可以在代码中通过Variable类获取变量值:

from airflow.models import Variable

# 从Variables中获取变量
input_path = Variable.get('user_data_input_path', default_var='/default/path')
output_path = Variable.get('user_data_output_path')

5.3 任务并行执行与资源限制

Airflow支持任务的并行执行,可以通过以下参数优化并行度:

  • parallelism:Airflow集群的最大并行任务数(全局参数)。
  • dag_concurrency:单个DAG的最大并行任务数。
  • max_active_runs_per_dag:单个DAG的最大活跃运行实例数。

这些参数可以在airflow.cfg配置文件中修改。

5.4 日志管理

Airflow的任务日志默认存储在本地,在生产环境中可以将日志存储到远程存储系统(如S3、HDFS、Elasticsearch等),便于日志的集中管理和查询。修改airflow.cfg中的[logging]部分即可配置远程日志存储。

六、相关资源链接

  • PyPI地址:https://pypi.org/project/apache-airflow
  • Github地址:https://github.com/apache/airflow
  • 官方文档地址:https://airflow.apache.org/docs/apache-airflow/stable/index.html

关注我,每天分享一个实用的Python自动化工具。

Python Squirrel库入门教程:高效数据缓存与持久化工具

一、Squirrel库核心概述

Squirrel是一款面向Python开发者的轻量级数据缓存与持久化工具库,其核心用途是帮助开发者快速实现内存数据的本地持久化、缓存管理以及跨会话数据共享,无需编写复杂的数据库操作代码。工作原理上,Squirrel基于键值对存储结构,支持将Python原生数据类型(如字典、列表、元组、数值等)序列化为字节流后存储到本地文件,读取时再反序列化为原数据类型,同时提供过期时间设置、缓存清理等功能。该库的优点是API简洁易用、零配置快速上手、支持多种存储后端(文件、内存),对技术小白友好;缺点是不支持高并发场景下的分布式缓存,大数据量存储性能略逊于专业数据库。Squirrel的开源协议为MIT License,开发者可自由用于商业和非商业项目,无授权限制。

二、Squirrel库安装步骤

作为Python的第三方库,Squirrel可以通过pip包管理工具一键安装,无论你是Windows、MacOS还是Linux系统,安装步骤完全一致,具体操作如下:

2.1 环境准备

确保你的电脑已经安装了Python环境(推荐Python 3.6及以上版本),可以通过在命令行中输入以下指令验证Python版本:

# 验证Python版本
import sys
print(sys.version)

运行代码后,如果输出类似3.9.7 (default, Sep 16 2021, 08:50:36) [MSC v.1916 64 bit (AMD64)]的内容,说明Python环境已就绪。

2.2 执行安装命令

打开命令行终端(Windows为CMD或PowerShell,MacOS和Linux为Terminal),输入以下安装指令:

pip install squirrel

等待终端输出Successfully installed squirrel-x.x.x(x.x.x为版本号),即表示安装成功。如果安装过程中出现网络超时问题,可以切换国内PyPI镜像源,例如使用阿里云镜像:

pip install squirrel -i https://mirrors.aliyun.com/pypi/simple/

三、Squirrel库核心API使用教程

Squirrel库的核心操作围绕缓存对象的创建、数据的增删改查、过期时间设置、缓存清理展开,所有API设计都遵循“简洁直观”的原则,即使是没有缓存开发经验的小白也能快速掌握。下面我们通过代码示例逐一讲解每个核心功能的使用方法。

3.1 初始化缓存对象

使用Squirrel的第一步是创建一个缓存实例,该实例可以指定存储后端(默认是文件存储,也可以选择内存存储)。文件存储会将数据保存到本地文件,程序重启后数据不丢失;内存存储则仅在程序运行期间保存数据,程序退出后数据自动清除。

# 导入Squirrel库的核心类
from squirrel import Cache

# 初始化文件存储缓存,数据会保存到本地的squirrel_cache.db文件
file_cache = Cache(backend="file", path="squirrel_cache.db")

# 初始化内存存储缓存,程序退出后数据丢失
memory_cache = Cache(backend="memory")

代码说明

  • backend参数用于指定存储后端,可选值为filememory,默认值为file
  • path参数仅在backend="file"时生效,用于指定缓存文件的保存路径和文件名。如果不指定,默认会在当前目录下创建squirrel_cache.db文件。

3.2 数据的添加与读取

Squirrel使用set()方法添加数据,使用get()方法读取数据,支持Python所有原生数据类型,包括字符串、数值、列表、字典、元组等。

# 向文件缓存中添加数据
# 存储字符串类型
file_cache.set("username", "python_squirrel")
# 存储数值类型
file_cache.set("age", 25)
# 存储列表类型
file_cache.set("hobbies", ["coding", "reading", "hiking"])
# 存储字典类型
file_cache.set("user_info", {"id": 1001, "name": "小明", "email": "[email protected]"})

# 从文件缓存中读取数据
username = file_cache.get("username")
age = file_cache.get("age")
hobbies = file_cache.get("hobbies")
user_info = file_cache.get("user_info")

# 打印读取的数据
print(f"用户名: {username}")
print(f"年龄: {age}")
print(f"爱好: {hobbies}")
print(f"用户信息: {user_info}")

代码说明

  • set(key, value)方法接收两个参数,key为字符串类型的键名,value为需要存储的任意Python原生数据。
  • get(key)方法接收一个参数key,返回该键对应的数值,如果键不存在,则返回None
  • 运行代码后,控制台会输出以下内容:
  用户名: python_squirrel
  年龄: 25
  爱好: ['coding', 'reading', 'hiking']
  用户信息: {'id': 1001, 'name': '小明', 'email': '[email protected]'}

即使关闭程序后重新运行读取代码,数据依然会存在,因为我们使用的是文件存储后端。

3.3 设置数据的过期时间

在很多场景下,我们需要缓存的数据在一段时间后自动失效(例如验证码、临时会话信息),Squirrel的set()方法支持通过expire参数设置过期时间,单位为

# 存储一个有效期为60秒的验证码
file_cache.set("verify_code", "852369", expire=60)

# 立即读取,此时数据未过期
verify_code = file_cache.get("verify_code")
print(f"未过期的验证码: {verify_code}")  # 输出:未过期的验证码: 852369

# 等待60秒后再次读取,数据已过期,返回None
import time
time.sleep(60)
expired_code = file_cache.get("verify_code")
print(f"过期后的验证码: {expired_code}")  # 输出:过期后的验证码: None

代码说明

  • expire参数为可选参数,默认值为None,表示数据永久有效。当指定数值时,数据会在对应的秒数后自动失效。
  • 过期的数据会在下次调用get()方法时被检测并清理,不会占用存储空间。

3.4 数据的修改与删除

修改缓存数据的方法依然是set(),只需要对同一个键名重新赋值即可;删除数据则使用delete()方法,指定需要删除的键名即可。

# 修改已存在的数据
file_cache.set("username", "squirrel_python")
updated_username = file_cache.get("username")
print(f"修改后的用户名: {updated_username}")  # 输出:修改后的用户名: squirrel_python

# 删除指定键的数据
file_cache.delete("age")
deleted_age = file_cache.get("age")
print(f"删除后的age值: {deleted_age}")  # 输出:删除后的age值: None

# 批量删除多个键的数据
file_cache.delete_many(["hobbies", "user_info"])
hobbies_after_delete = file_cache.get("hobbies")
user_info_after_delete = file_cache.get("user_info")
print(f"删除后的hobbies值: {hobbies_after_delete}")  # 输出:删除后的hobbies值: None
print(f"删除后的user_info值: {user_info_after_delete}")  # 输出:删除后的user_info值: None

代码说明

  • delete(key)方法用于删除单个键值对,delete_many(keys)方法用于批量删除多个键值对,keys参数为一个包含多个键名的列表。
  • 被删除的数据会立即从存储后端中移除,无论是文件存储还是内存存储。

3.5 缓存数据的批量操作

除了单个数据的增删改查,Squirrel还支持批量添加和批量读取数据,这在需要一次性处理大量数据时可以显著提高效率。

# 批量添加数据
batch_data = {
    "key1": "value1",
    "key2": 100,
    "key3": [1, 2, 3],
    "key4": {"a": 1, "b": 2}
}
file_cache.set_many(batch_data)

# 批量读取数据
keys_to_get = ["key1", "key2", "key3", "key4"]
batch_result = file_cache.get_many(keys_to_get)

# 打印批量读取的结果
for key, value in batch_result.items():
    print(f"{key}: {value}")

代码说明

  • set_many(data)方法接收一个字典类型的参数data,字典中的每个键值对都会被添加到缓存中。
  • get_many(keys)方法接收一个列表类型的参数keys,返回一个包含所有键值对的字典,对于不存在的键,其对应的数值为None

3.6 缓存的清空与状态查询

如果需要清空所有缓存数据,可以使用clear()方法;如果需要查询缓存中当前的键数量,可以使用count()方法。

# 查询当前缓存中的键数量
key_count = file_cache.count()
print(f"缓存中的键数量: {key_count}")

# 清空所有缓存数据
file_cache.clear()

# 再次查询键数量,此时为0
empty_count = file_cache.count()
print(f"清空后的键数量: {empty_count}")

代码说明

  • count()方法返回缓存中有效键的数量,不包含已过期的键。
  • clear()方法会删除缓存中的所有数据,操作不可逆,使用时需要谨慎。

四、Squirrel库实际应用案例

为了帮助开发者更好地理解Squirrel库在实际项目中的使用场景,下面我们以用户登录状态缓存API接口数据缓存两个常见场景为例,编写完整的代码案例,展示如何将Squirrel库集成到Python项目中。

4.1 案例一:用户登录状态缓存

在Web开发或桌面应用开发中,用户登录后需要保持登录状态,避免每次操作都重新输入用户名和密码。使用Squirrel可以将用户的登录信息缓存到本地,程序重启后依然可以保持登录状态,直到用户主动退出登录。

from squirrel import Cache
import time

# 初始化文件缓存,存储用户登录信息
login_cache = Cache(backend="file", path="login_status.db")

def user_login(username: str, password: str) -> bool:
    """
    用户登录函数,模拟验证用户名和密码
    :param username: 用户名
    :param password: 密码
    :return: 登录成功返回True,失败返回False
    """
    # 模拟数据库中的用户信息
    db_users = {
        "admin": "admin123",
        "user1": "user123",
        "user2": "user234"
    }
    # 验证用户名和密码
    if username in db_users and db_users[username] == password:
        # 登录成功,缓存用户信息,有效期2小时(7200秒)
        login_cache.set(f"login_{username}", True, expire=7200)
        login_cache.set(f"user_{username}", {"username": username, "login_time": time.time()})
        print(f"用户 {username} 登录成功!")
        return True
    else:
        print("用户名或密码错误,登录失败!")
        return False

def check_login_status(username: str) -> bool:
    """
    检查用户是否处于登录状态
    :param username: 用户名
    :return: 已登录返回True,未登录返回False
    """
    login_status = login_cache.get(f"login_{username}")
    return login_status is not None and login_status

def user_logout(username: str) -> None:
    """
    用户退出登录,清除缓存中的登录状态
    :param username: 用户名
    """
    login_cache.delete(f"login_{username}")
    login_cache.delete(f"user_{username}")
    print(f"用户 {username} 已退出登录!")

# 测试登录功能
user_login("admin", "admin123")

# 检查登录状态
if check_login_status("admin"):
    user_info = login_cache.get(f"user_admin")
    login_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(user_info["login_time"]))
    print(f"用户 {user_info['username']} 于 {login_time} 登录,当前处于登录状态。")

# 退出登录
user_logout("admin")

# 再次检查登录状态
if not check_login_status("admin"):
    print(f"用户 admin 已退出登录。")

代码说明

  • user_login()函数模拟用户登录验证,验证通过后将登录状态和用户信息缓存到本地,有效期为2小时。
  • check_login_status()函数通过读取缓存中的登录状态键,判断用户是否处于登录状态。
  • user_logout()函数通过删除缓存中的登录状态键,实现用户退出登录功能。
  • 该案例适用于桌面应用、CLI工具等需要保持用户登录状态的场景,无需依赖数据库即可实现状态持久化。

4.2 案例二:API接口数据缓存

在调用第三方API接口时,频繁请求会导致接口限流、响应速度慢等问题。使用Squirrel可以将API返回的数据缓存到本地,在有效期内重复请求时直接读取缓存数据,从而提高程序的响应速度,减少对API接口的请求次数。

from squirrel import Cache
import requests
import time

# 初始化文件缓存,存储API接口数据
api_cache = Cache(backend="file", path="api_cache.db")

def get_weather_data(city: str) -> dict:
    """
    获取城市天气数据,优先读取缓存,缓存失效后调用API接口
    :param city: 城市名称
    :return: 天气数据字典
    """
    # 定义缓存键名
    cache_key = f"weather_{city}"
    # 尝试从缓存中读取数据
    cached_data = api_cache.get(cache_key)
    if cached_data is not None:
        print(f"从缓存中读取{city}的天气数据...")
        return cached_data

    # 缓存失效,调用API接口获取数据(这里使用模拟API)
    print(f"调用API获取{city}的天气数据...")
    # 模拟API请求延迟
    time.sleep(2)
    # 模拟API返回的数据
    api_data = {
        "city": city,
        "temperature": 22,
        "weather": "sunny",
        "humidity": 45,
        "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    }

    # 将API返回的数据缓存到本地,有效期10分钟(600秒)
    api_cache.set(cache_key, api_data, expire=600)
    return api_data

# 第一次调用,缓存未命中,调用API
weather_beijing = get_weather_data("北京")
print(f"北京天气: {weather_beijing}")

# 第二次调用,缓存命中,直接读取缓存
weather_beijing_cached = get_weather_data("北京")
print(f"北京天气(缓存): {weather_beijing_cached}")

代码说明

  • get_weather_data()函数实现了“缓存优先”的逻辑,首先尝试从缓存中读取数据,如果缓存存在且未过期,则直接返回缓存数据;如果缓存不存在或已过期,则调用API获取数据,并将数据缓存到本地。
  • 该案例中设置的缓存有效期为10分钟,意味着10分钟内重复调用该函数获取同一城市的天气数据,不会触发API请求,从而减少了API调用次数,提高了程序响应速度。
  • 实际项目中,可以将模拟API替换为真实的天气API接口,例如高德地图天气API、和风天气API等。

五、Squirrel库相关资源链接

  • Pypi地址:https://pypi.org/project/Squirrel
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。

Python实用工具Upgini:零代码特征工程与数据集增强教程

一、Upgini库核心概述

Upgini是一款面向数据科学和机器学习领域的Python自动化特征工程库,核心用途是帮助开发者快速为结构化数据集生成高质量特征、对接公开数据源完成数据增强,从而提升机器学习模型的预测性能。其工作原理是基于用户输入的目标数据集(含时间戳和目标列),通过内置的特征挖掘算法、统计变换以及外部数据源对接能力,自动生成数千个潜在特征,并通过特征重要性评估筛选出最优特征子集。

该库的优点十分突出:无需手动编写复杂的特征工程代码,支持时间序列数据的特征生成,可无缝对接多个公开数据集,内置特征筛选机制降低冗余;缺点则是对非结构化数据支持有限,部分高级功能需要依赖网络连接获取外部数据,且在超大规模数据集上的运行效率有待优化。Upgini采用Apache License 2.0开源协议,允许商用和二次开发,完全免费且对开发者友好。

二、Upgini库安装与环境配置

2.1 基础安装步骤

Upgini支持通过Python官方包管理工具pip进行安装,兼容Python 3.7及以上版本,建议在虚拟环境中安装以避免依赖冲突。对于技术小白来说,操作步骤非常简单,打开命令行终端,输入以下命令即可完成安装:

# 基础版安装
pip install upgini

# 完整版安装(包含所有依赖,推荐新手使用)
pip install "upgini[full]"

安装完成后,可通过以下代码验证是否安装成功:

import upgini
print(f"Upgini版本:{upgini.__version__}")

若终端输出对应的版本号(如0.2.15),则说明安装成功;若出现报错,可尝试升级pip后重新安装,升级命令如下:

pip install --upgrade pip

2.2 环境依赖说明

Upgini的运行依赖多个常见的数据科学库,包括pandas(数据处理)、numpy(数值计算)、scikit-learn(特征评估)、lightgbm(默认特征重要性模型)等。安装upgini[full]时会自动安装这些依赖,无需手动下载。对于新手来说,建议使用Anaconda环境,该环境预装了大部分数据科学库,能进一步降低安装难度。

三、Upgini核心功能与使用示例

Upgini的核心功能围绕自动化特征生成数据集增强展开,其使用流程遵循“导入数据→配置特征生成器→生成并筛选特征→导出结果”的步骤,全程无需手动编写特征工程代码。以下通过具体案例演示核心功能的使用方法。

3.1 数据准备:导入示例数据集

在使用Upgini之前,需要准备一份结构化数据集,数据集需包含目标列(待预测的变量)和时间戳列(可选,用于时间序列特征生成)。本文以经典的房价预测数据集为例,该数据集包含房屋面积、卧室数量、建造年份等基础特征,目标是预测房屋价格。

首先导入pandas库加载数据集:

import pandas as pd

# 加载本地房价数据集(新手可直接使用sklearn的示例数据集)
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()
df = pd.DataFrame(data=housing.data, columns=housing.feature_names)
df["MedHouseVal"] = housing.target  # 目标列:房屋中位数价格
df["timestamp"] = pd.date_range(start="2020-01-01", periods=len(df), freq="D")  # 添加时间戳列
print(df.head())
print(f"数据集形状:{df.shape}")

代码说明:

  • 首先从sklearn导入加利福尼亚房价数据集,该数据集是机器学习领域的经典数据集,包含20640条样本和8个基础特征。
  • 为数据集添加目标列MedHouseVal(房屋中位数价格)和时间戳列timestamp,时间戳列是Upgini生成时间相关特征的关键,例如“近30天平均房屋面积”“年度房价波动”等。
  • 最后打印数据集的前5行和形状,确认数据加载成功。

3.2 初始化特征生成器

Upgini的核心类是FeatureEnricher,该类负责特征生成、筛选和增强的全流程。初始化时需要指定目标列名称、时间戳列名称(可选)以及特征筛选的评估指标。代码示例如下:

from upgini import FeatureEnricher, SearchKey
from upgini.metadata import CVType

# 初始化特征生成器
enricher = FeatureEnricher(
    search_keys={
        # 定义时间戳列,用于生成时间相关特征
        "timestamp": SearchKey.DATE
    },
    # 指定目标列
    target_column="MedHouseVal",
    # 交叉验证类型:时间序列交叉验证(适合预测任务)
    cv=CVType.TIME_SERIES,
    # 特征筛选指标:均方根误差(RMSE)
    eval_metric="rmse"
)

代码说明:

  • search_keys参数用于定义数据集中的关键列类型,这里将timestamp列指定为日期类型(SearchKey.DATE),Upgini会基于该列生成时间窗口特征。
  • target_column参数指定待预测的目标列,特征生成器会围绕该列筛选对预测最有帮助的特征。
  • cv参数设置交叉验证类型,时间序列任务推荐使用CVType.TIME_SERIES,避免数据泄露;若为普通分类/回归任务,可使用CVType.RANDOM
  • eval_metric参数设置特征筛选的评估指标,回归任务常用rmse(均方根误差)或mae(平均绝对误差),分类任务常用auc(曲线下面积)。

3.3 自动生成与筛选特征

初始化特征生成器后,调用fit方法即可基于输入的数据集自动生成特征。该过程会分为三步:首先分析基础特征的分布和相关性,然后生成数千个潜在特征(包括统计特征、时间窗口特征、组合特征等),最后基于指定的评估指标筛选出最优特征子集。代码示例如下:

# 基于训练集生成特征
enricher.fit(df, eval_set=[(df, "validation")])

代码说明:

  • fit方法的第一个参数是训练数据集,第二个参数eval_set指定验证集,这里使用同一数据集作为验证集(新手可直接使用该设置)。
  • 运行该代码后,Upgini会在终端输出特征生成的进度,包括生成的特征总数、筛选后的特征数、以及特征对模型性能的提升幅度。例如:“生成了3250个特征,筛选出120个最优特征,验证集RMSE降低了18.5%”。
  • 特征生成过程的时间取决于数据集大小,小型数据集(万级样本)通常在1-2分钟内完成,大型数据集可能需要更长时间。

3.4 查看生成的特征与重要性

特征生成完成后,可通过get_features()方法查看筛选后的特征列表,通过feature_importance_属性查看特征的重要性排名,这有助于开发者理解哪些特征对预测任务最有帮助。代码示例如下:

# 查看筛选后的特征列表
generated_features = enricher.get_features()
print("生成的最优特征列表:")
print(generated_features[["feature_name", "importance"]].head(10))

# 可视化特征重要性(推荐新手使用)
import matplotlib.pyplot as plt
feature_importance = enricher.feature_importance_
feature_importance = feature_importance.sort_values(by="importance", ascending=False).head(10)
plt.figure(figsize=(12, 6))
plt.bar(feature_importance["feature_name"], feature_importance["importance"])
plt.xticks(rotation=45, ha="right")
plt.title("Top 10 Most Important Features")
plt.tight_layout()
plt.show()

代码说明:

  • get_features()方法返回一个DataFrame,包含特征名称、重要性得分、数据类型等信息,head(10)用于查看前10个最重要的特征。
  • feature_importance_属性同样返回一个DataFrame,通过排序和可视化可以直观地看到特征的重要性排名,例如“房屋面积的30天滑动平均值”“年度建造房屋数量的增长率”等特征可能会排在前列。
  • 可视化部分使用matplotlib库绘制柱状图,新手若未安装该库,可通过pip install matplotlib命令安装。

3.5 应用生成的特征增强数据集

筛选出最优特征后,调用transform方法即可将这些特征添加到原始数据集中,生成增强后的数据集,用于后续的机器学习模型训练。代码示例如下:

# 生成增强后的数据集
enhanced_df = enricher.transform(df)
print(f"原始数据集形状:{df.shape}")
print(f"增强后数据集形状:{enhanced_df.shape}")
print("增强后数据集的前5行:")
print(enhanced_df.head())

代码说明:

  • transform方法接收原始数据集,返回添加了生成特征的新数据集。例如原始数据集是(20640, 10),增强后可能变成(20640, 130),新增了120个特征。
  • 打印增强后数据集的形状和前5行,可直观看到新增的特征列,这些列的名称通常包含明确的含义,例如“MedInc_rolling_mean_30d”(收入中位数的30天滑动平均值)、“AveRooms_yearly_growth”(平均房间数的年度增长率)等。

3.6 对接外部数据源增强数据

Upgini的高级功能之一是对接外部公开数据源(如天气数据、人口统计数据、经济指标数据等),为数据集补充更多维度的特征。该功能需要网络连接,且部分数据源需要注册API密钥(新手可先使用免费数据源)。以下以对接天气数据源为例,演示外部数据增强的方法:

from upgini import ExternalDataset

# 加载外部天气数据集(示例:美国加州天气数据)
weather_dataset = ExternalDataset.from_csv(
    "https://example.com/california_weather_2020_2023.csv",  # 外部数据源URL
    search_keys={"date": SearchKey.DATE},  # 外部数据的时间戳列
    features=["temperature", "rainfall", "humidity"]  # 需要提取的特征
)

# 初始化特征生成器并添加外部数据集
enricher_with_external = FeatureEnricher(
    search_keys={"timestamp": SearchKey.DATE},
    target_column="MedHouseVal",
    cv=CVType.TIME_SERIES,
    eval_metric="rmse"
)

# 添加外部数据集
enricher_with_external.add_external_datasets([weather_dataset])

# 生成包含外部特征的增强数据集
enhanced_df_with_external = enricher_with_external.fit_transform(df, eval_set=[(df, "validation")])
print(f"添加外部数据后数据集形状:{enhanced_df_with_external.shape}")

代码说明:

  • ExternalDataset.from_csv方法用于加载外部CSV格式的数据集,需要指定数据源的URL、关键列类型和需要提取的特征列。
  • add_external_datasets方法将外部数据集添加到特征生成器中,Upgini会自动将外部数据与原始数据按照时间戳列进行关联。
  • fit_transform方法是fittransform的组合,可一次性完成特征生成和数据集增强,添加外部数据后,生成的特征会包含天气相关的维度,例如“温度与房屋面积的乘积”“降雨天数与房价的相关性特征”等,进一步提升模型的预测能力。

四、Upgini实战案例:房价预测模型性能提升

为了直观展示Upgini的效果,本文构建一个对比实验:分别使用原始数据集和Upgini增强后的数据集训练LightGBM回归模型,对比模型的预测性能。

4.1 实验准备:划分训练集与测试集

首先将增强前后的数据集划分为训练集和测试集,时间序列任务需按照时间顺序划分,避免数据泄露:

from sklearn.model_selection import train_test_split

# 原始数据集划分
X_original = df.drop(["MedHouseVal", "timestamp"], axis=1)
y_original = df["MedHouseVal"]
X_train_original, X_test_original, y_train_original, y_test_original = train_test_split(
    X_original, y_original, test_size=0.2, shuffle=False  # 时间序列任务shuffle=False
)

# 增强后数据集划分
X_enhanced = enhanced_df_with_external.drop(["MedHouseVal", "timestamp"], axis=1)
y_enhanced = enhanced_df_with_external["MedHouseVal"]
X_train_enhanced, X_test_enhanced, y_train_enhanced, y_test_enhanced = train_test_split(
    X_enhanced, y_enhanced, test_size=0.2, shuffle=False
)

代码说明:

  • 原始数据集删除目标列和时间戳列后作为特征矩阵X_original,目标列作为标签y_original
  • 增强后数据集的处理方式与原始数据集一致,特征矩阵X_enhanced包含原始特征和生成的特征。
  • shuffle=False确保按照时间顺序划分训练集和测试集,符合时间序列预测的业务场景。

4.2 训练LightGBM模型并评估性能

分别使用原始特征和增强特征训练LightGBM模型,评估指标采用均方根误差(RMSE)和决定系数(R²),R²越接近1表示模型拟合效果越好:

import lightgbm as lgb
from sklearn.metrics import mean_squared_error, r2_score

# 定义模型训练函数
def train_and_evaluate(X_train, X_test, y_train, y_test, model_name):
    # 初始化LightGBM回归模型
    model = lgb.LGBMRegressor(
        n_estimators=100,
        learning_rate=0.1,
        random_state=42
    )
    # 训练模型
    model.fit(X_train, y_train)
    # 预测测试集
    y_pred = model.predict(X_test)
    # 计算评估指标
    rmse = mean_squared_error(y_test, y_pred, squared=False)
    r2 = r2_score(y_test, y_pred)
    print(f"=== {model_name} ===")
    print(f"测试集RMSE:{rmse:.4f}")
    print(f"测试集R²:{r2:.4f}")
    print("-" * 30)

# 使用原始特征训练模型
train_and_evaluate(X_train_original, X_test_original, y_train_original, y_test_original, "原始特征模型")

# 使用增强特征训练模型
train_and_evaluate(X_train_enhanced, X_test_enhanced, y_train_enhanced, y_test_enhanced, "增强特征模型")

代码说明:

  • 定义train_and_evaluate函数,封装模型训练、预测和评估的流程,方便重复调用。
  • LightGBM模型的参数设置为默认值,确保实验的公平性。
  • 运行该代码后,终端会输出两个模型的评估指标,通常情况下,增强特征模型的RMSE会显著低于原始特征模型,R²会显著高于原始特征模型。例如:原始特征模型的RMSE为0.6523,R²为0.5812;增强特征模型的RMSE为0.4215,R²为0.7986,模型性能提升明显。

4.3 实验结论分析

通过对比实验可以发现,Upgini生成的特征能够有效提升机器学习模型的预测性能,原因主要有三点:

  1. 特征维度扩展:生成了大量人工难以想到的特征,覆盖了统计、时间、组合等多个维度,丰富了数据的信息密度。
  2. 特征质量优化:通过内置的筛选机制剔除了冗余特征和噪声特征,保留了对目标列最具预测价值的特征。
  3. 外部数据补充:对接外部数据源后,引入了与业务场景相关的额外信息,进一步提升了模型的泛化能力。

对于技术小白来说,无需掌握复杂的特征工程理论,仅需几行代码即可实现模型性能的大幅提升,这正是Upgini的核心价值所在。

五、Upgini常见问题与解决方案

在使用Upgini的过程中,新手可能会遇到一些常见问题,以下列出对应的解决方案:

5.1 问题1:特征生成速度过慢

  • 原因:数据集规模过大,或本地计算资源不足。
  • 解决方案
  1. 对数据集进行抽样,使用部分数据进行特征生成,例如df_sample = df.sample(n=10000, random_state=42)
  2. 减少生成特征的数量,通过设置FeatureEnrichermax_features参数,例如max_features=50
  3. 升级硬件配置,增加内存和CPU核心数,或使用GPU加速(需安装对应的LightGBM GPU版本)。

5.2 问题2:特征生成后模型性能未提升

  • 原因:目标列与特征的相关性较弱,或数据集存在严重的缺失值、异常值。
  • 解决方案
  1. 检查原始数据集的质量,使用df.isnull().sum()查看缺失值,使用df.describe()查看异常值,提前进行数据清洗。
  2. 调整FeatureEnricher的参数,例如更换评估指标(eval_metric)、调整交叉验证类型(cv)。
  3. 添加更多的外部数据源,补充与目标列相关的业务信息。

5.3 问题3:外部数据源加载失败

  • 原因:网络连接问题,或数据源URL无效、权限不足。
  • 解决方案
  1. 检查网络连接,确保能够正常访问外部数据源的URL。
  2. 下载外部数据集到本地,使用ExternalDataset.from_csv("local_file_path.csv")加载本地文件。
  3. 确认数据源的权限,部分商用数据源需要注册API密钥并在代码中配置。

六、Upgini相关资源链接

  • Pypi地址:https://pypi.org/project/Upgini
  • Github地址:https://github.com/upgini/upgini
  • 官方文档地址:https://docs.upgini.com

关注我,每天分享一个实用的Python自动化工具。

Python实用工具库excalibur:PDF表格提取与数据处理实战教程

一、excalibur库核心概述

excalibur是一款基于Python开发的PDF表格提取与数据处理工具库,其核心工作原理是依托计算机视觉技术与OCR(光学字符识别)算法,对PDF文件中的表格区域进行精准定位、单元格分割与内容提取,最终将提取的表格数据转换为Excel、CSV等易处理的结构化格式。该库的优点在于操作简单、对扫描版PDF兼容性强,支持批量处理多份文件;缺点是对复杂嵌套表格的识别精度有待提升,处理大文件时耗时较长。excalibur采用MIT开源许可证,允许开发者自由使用、修改和分发,无商业使用限制。

二、excalibur库安装与环境配置

2.1 安装前置依赖

excalibur的运行依赖于Tesseract OCR引擎和Poppler PDF处理库,不同操作系统的安装方式有所差异:

  1. Windows系统
    • 安装Tesseract OCR:前往UB-Mannheim/tesseract下载对应版本的安装包,安装时需勾选“Add to PATH”选项,或手动将安装路径(如C:\Program Files\Tesseract-OCR)添加到系统环境变量。
    • 安装Poppler:下载Poppler Windows压缩包,解压后将bin目录路径添加到系统环境变量。
  2. macOS系统
    打开终端,通过Homebrew执行以下命令安装依赖:
    bash brew install tesseract brew install poppler
  3. Linux系统(以Ubuntu为例)
    打开终端,执行以下命令安装依赖:
    bash sudo apt-get update sudo apt-get install tesseract-ocr sudo apt-get install poppler-utils

2.2 安装excalibur库

完成依赖安装后,使用pip命令即可安装excalibur库,命令如下:

pip install excalibur

安装完成后,可在Python终端中执行以下代码验证安装是否成功:

import excalibur
print(f"excalibur库版本:{excalibur.__version__}")

若终端输出库的版本号,则说明安装成功;若出现“ModuleNotFoundError”,需检查依赖是否安装完整,或重新执行pip安装命令。

三、excalibur库核心功能与基础用法

excalibur库的核心功能分为单PDF表格提取批量PDF处理提取结果导出,以下结合代码示例详细讲解每个功能的使用方法。

3.1 单PDF文件表格提取

excalibur提取单PDF文件表格的核心步骤为:初始化提取器、加载PDF文件、定位表格区域、提取表格内容。以下是完整代码示例:

from excalibur.pdf_processing import PDFTableExtractor

# 1. 初始化PDF表格提取器
extractor = PDFTableExtractor()

# 2. 加载目标PDF文件(替换为你的PDF文件路径)
pdf_path = "example_table.pdf"
extractor.load_pdf(pdf_path)

# 3. 定位并提取PDF中的所有表格
tables = extractor.extract_tables()

# 4. 遍历输出提取的表格内容
for idx, table in enumerate(tables):
    print(f"===== 提取的第{idx+1}个表格 =====")
    # 打印表格的行数和列数
    print(f"表格行数:{len(table)}, 列数:{len(table[0]) if table else 0}")
    # 打印表格的每一行数据
    for row in table:
        print(row)

代码说明

  • PDFTableExtractor():初始化表格提取器对象,该对象包含PDF加载、表格定位、内容提取等核心方法。
  • load_pdf(pdf_path):加载指定路径的PDF文件,支持相对路径和绝对路径。
  • extract_tables():自动识别PDF中的所有表格,返回一个列表,列表中的每个元素是一个二维列表,对应一个表格的行和列数据。
  • 最后通过循环遍历提取的表格,输出每个表格的行数、列数和具体内容。

3.2 自定义表格提取参数

默认情况下,excalibur会提取PDF中的所有表格,但在实际应用中,我们可能需要提取指定页码的表格,或调整识别精度。以下是自定义参数的代码示例:

from excalibur.pdf_processing import PDFTableExtractor

# 初始化提取器并设置自定义参数
extractor = PDFTableExtractor(
    min_confidence=0.7,  # 设置最小识别置信度,低于该值的表格将被过滤
    lang="eng+chi_sim"   # 设置OCR识别语言,支持英文和简体中文
)

# 加载PDF文件
pdf_path = "multi_page_table.pdf"
extractor.load_pdf(pdf_path)

# 提取指定页码的表格(页码从0开始计数,提取第2页和第3页的表格)
target_pages = [1, 2]
tables = extractor.extract_tables(pages=target_pages)

# 输出提取结果
for idx, table in enumerate(tables):
    print(f"第{target_pages[idx]+1}页表格内容:")
    for row in table:
        print(row)

代码说明

  • min_confidence:设置表格识别的最小置信度,取值范围为0-1,值越高,识别的表格精度越高,但可能会过滤掉部分模糊表格。
  • lang:设置OCR识别的语言,默认值为“eng”,添加“chi_sim”后可支持简体中文识别,需确保Tesseract OCR已安装对应的语言包。
  • pages参数:指定需要提取表格的页码,传入一个整数列表,列表中的元素为页码索引(从0开始)。

3.3 提取结果导出为Excel/CSV文件

excalibur支持将提取的表格数据直接导出为Excel或CSV格式,方便后续数据处理。以下是导出功能的代码示例:

from excalibur.pdf_processing import PDFTableExtractor
from excalibur.utils import export_tables

# 提取PDF表格
extractor = PDFTableExtractor()
extractor.load_pdf("example_table.pdf")
tables = extractor.extract_tables()

# 1. 导出为Excel文件
excel_path = "extracted_tables.xlsx"
export_tables(
    tables=tables,
    output_path=excel_path,
    file_format="xlsx"
)
print(f"表格已成功导出到Excel文件:{excel_path}")

# 2. 导出为CSV文件
csv_path = "extracted_tables.csv"
export_tables(
    tables=tables,
    output_path=csv_path,
    file_format="csv",
    encoding="utf-8"  # 设置CSV文件编码,避免中文乱码
)
print(f"表格已成功导出到CSV文件:{csv_path}")

代码说明

  • export_tables():excalibur提供的工具函数,用于将提取的表格列表导出为指定格式的文件。
  • file_format参数:可选值为“xlsx”和“csv”,分别对应Excel和CSV格式。
  • encoding参数:仅在导出CSV文件时有效,设置为“utf-8”可解决中文乱码问题。

四、批量处理多个PDF文件实战

在实际工作中,我们经常需要处理多个PDF文件,excalibur结合Python的文件操作功能,可轻松实现批量处理。以下是批量提取多个PDF表格并导出的完整案例。

4.1 批量处理代码实现

import os
from excalibur.pdf_processing import PDFTableExtractor
from excalibur.utils import export_tables

# 定义PDF文件夹路径和输出文件夹路径
pdf_folder = "pdf_files"  # 存放待处理PDF的文件夹
output_folder = "extracted_results"  # 存放导出结果的文件夹

# 创建输出文件夹(若不存在)
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 初始化表格提取器
extractor = PDFTableExtractor(min_confidence=0.6, lang="eng+chi_sim")

# 遍历PDF文件夹中的所有PDF文件
for filename in os.listdir(pdf_folder):
    if filename.endswith(".pdf"):
        # 拼接PDF文件的完整路径
        pdf_path = os.path.join(pdf_folder, filename)
        print(f"正在处理文件:{filename}")

        try:
            # 加载并提取表格
            extractor.load_pdf(pdf_path)
            tables = extractor.extract_tables()

            if not tables:
                print(f"文件{filename}中未检测到表格,跳过导出")
                continue

            # 生成输出文件名(与PDF文件名一致,后缀改为xlsx)
            output_filename = os.path.splitext(filename)[0] + ".xlsx"
            output_path = os.path.join(output_folder, output_filename)

            # 导出表格到Excel文件
            export_tables(tables, output_path, file_format="xlsx")
            print(f"文件{filename}处理完成,结果已保存到:{output_path}")

        except Exception as e:
            print(f"处理文件{filename}时出错:{str(e)}")
            continue

print("所有PDF文件处理完成!")

代码说明

  • os.listdir(pdf_folder):遍历指定文件夹中的所有文件,筛选出后缀为“.pdf”的文件进行处理。
  • os.path.splitext(filename)[0]:获取PDF文件的文件名(不含后缀),用于生成对应的Excel文件名。
  • try-except块:捕获处理过程中的异常(如文件损坏、权限不足等),避免单个文件处理失败导致整个批量任务终止。

4.2 批量处理注意事项

  1. 确保pdf_folder文件夹中仅存放需要处理的PDF文件,避免其他类型文件干扰。
  2. 处理大文件或大量文件时,建议设置合理的min_confidence值,平衡识别精度和处理速度。
  3. 若PDF文件包含加密或权限限制,需先解除限制后再进行处理,否则会抛出“PermissionError”异常。

五、复杂表格提取优化技巧

对于嵌套表格、合并单元格表格等复杂结构,excalibur的默认识别效果可能不佳,以下是几种优化技巧,帮助提升复杂表格的提取精度。

5.1 调整单元格分割阈值

excalibur通过调整单元格分割阈值,可优化合并单元格的识别效果,代码示例如下:

from excalibur.pdf_processing import PDFTableExtractor

# 初始化提取器并调整分割阈值
extractor = PDFTableExtractor(
    cell_split_threshold=0.8,  # 单元格分割阈值,值越高越容易识别合并单元格
    min_confidence=0.6
)

# 加载包含合并单元格的PDF
extractor.load_pdf("complex_table.pdf")
tables = extractor.extract_tables()

# 输出优化后的提取结果
for table in tables:
    for row in table:
        print(row)

代码说明

  • cell_split_threshold:单元格分割阈值,取值范围为0-1,值越高,提取器越倾向于将相邻的单元格视为独立单元格,适用于合并单元格较多的表格。

5.2 结合人工校对修正提取结果

对于识别误差较大的表格,可通过人工校对修正提取结果,以下是修正数据的代码示例:

from excalibur.pdf_processing import PDFTableExtractor

# 提取表格
extractor = PDFTableExtractor()
extractor.load_pdf("complex_table.pdf")
tables = extractor.extract_tables()

# 假设第一个表格存在识别误差,手动修正
corrected_table = []
for row_idx, row in enumerate(tables[0]):
    corrected_row = row.copy()
    # 修正第2行第3列的数据
    if row_idx == 1:
        corrected_row[2] = "修正后的数据"
    # 修正第4行第1列的数据
    if row_idx == 3:
        corrected_row[0] = "2024-01-01"
    corrected_table.append(corrected_row)

# 替换原表格中的错误数据
tables[0] = corrected_table

# 导出修正后的表格
from excalibur.utils import export_tables
export_tables(tables, "corrected_tables.xlsx", file_format="xlsx")

代码说明:通过遍历提取的表格数据,定位错误数据的位置并手动修正,再将修正后的表格导出为Excel文件,适用于对数据精度要求较高的场景。

六、excalibur库常见问题与解决方案

6.1 OCR识别中文乱码

问题现象:提取的表格中中文内容显示为乱码或方框。
解决方案

  1. 确保Tesseract OCR已安装简体中文语言包(下载地址:tesseract-ocr/tessdata),将chi_sim.traineddata文件放入Tesseract OCR的tessdata目录。
  2. 初始化提取器时,设置lang="eng+chi_sim"参数。

6.2 无法识别扫描版PDF表格

问题现象:提取扫描版PDF时,返回的表格列表为空。
解决方案

  1. 检查Tesseract OCR是否安装正确,可在终端执行tesseract --version验证。
  2. 降低min_confidence参数值(如设置为0.5),提高提取器对模糊表格的识别灵敏度。

6.3 处理大文件时内存溢出

问题现象:处理几十MB的大PDF文件时,程序抛出“MemoryError”异常。
解决方案

  1. 分页码提取表格,避免一次性加载整个PDF文件。
  2. 关闭其他占用内存的程序,或增加Python进程的内存限制。

七、excalibur库相关资源

  • Pypi地址:https://pypi.org/project/excalibur
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:rows库快速入门与实战指南

一、rows库核心概述

rows库是一款轻量级的Python数据处理工具,专注于结构化数据的读取、转换与导出,支持CSV、JSON、HTML、SQLite等多种常见数据格式,无需手动编写格式解析代码即可实现数据的无缝处理。其工作原理是通过统一的Table对象抽象各类数据源,将不同格式的数据转化为一致的内存数据结构,再提供简洁的API完成数据操作。

该库的优点是API简洁易用、零配置开箱即用、格式兼容性强;缺点是对超大规模数据集的处理效率较低,且高级数据清洗功能需依赖其他库。rows的开源协议为GNU Lesser General Public License v3.0 (LGPLv3),允许自由使用、修改和分发。

二、rows库安装与环境准备

2.1 安装方式

rows库可通过Python官方包管理工具pip直接安装,适用于Python 3.6及以上版本,在命令行中执行以下命令即可完成安装:

pip install rows

若需要支持更多数据格式(如Excel、Parquet),可安装扩展依赖包:

pip install rows[all]

2.2 验证安装

安装完成后,可在Python交互式环境中验证是否安装成功,输入以下代码无报错则说明安装正常:

import rows
print(rows.__version__)

执行后会输出当前rows库的版本号,例如0.6.1

三、rows库核心API与基础用法

rows库的核心操作围绕数据读取数据操作数据导出三个环节展开,所有操作都基于统一的Table对象,下面分步骤详细讲解。

3.1 数据读取:从多种格式加载数据

rows库支持自动识别数据源格式,无需指定格式类型即可读取,以下是常见格式的读取示例。

3.1.1 读取CSV文件

首先准备一个示例CSV文件students.csv,内容如下:

name,age,gender,score
Alice,18,Female,92
Bob,19,Male,85
Charlie,20,Male,78
Diana,18,Female,95

使用rows库读取该文件的代码如下:

import rows

# 读取CSV文件
table = rows.import_from_csv("students.csv")

# 查看Table对象的字段名
print(table.field_names)
# 输出:('name', 'age', 'gender', 'score')

# 遍历数据行
for row in table:
    print(f"姓名:{row.name},年龄:{row.age},成绩:{row.score}")

代码说明

  • rows.import_from_csv()函数接收文件路径作为参数,返回一个Table对象。
  • Table对象的field_names属性存储了数据的列名,返回一个元组。
  • 遍历Table对象时,每一个元素都是一个行对象,可通过列名直接访问对应的值。

3.1.2 读取JSON文件

准备示例JSON文件students.json,内容如下:

[
    {"name": "Alice", "age": 18, "gender": "Female", "score": 92},
    {"name": "Bob", "age": 19, "gender": "Male", "score": 85},
    {"name": "Charlie", "age": 20, "gender": "Male", "score": 78},
    {"name": "Diana", "age": 18, "gender": "Female", "score": 95}
]

读取JSON文件的代码与读取CSV类似,仅需替换函数名:

import rows

# 读取JSON文件
table = rows.import_from_json("students.json")

# 访问指定行的指定字段
print(f"第一个学生的成绩:{table[0].score}")
# 输出:第一个学生的成绩:92

代码说明

  • rows.import_from_json()专门用于读取JSON格式数据。
  • Table对象支持通过索引访问指定行,与列表的索引用法一致。

3.1.3 读取HTML表格

rows库还能直接解析HTML页面中的表格数据,例如有一个students.html文件,内容如下:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>学生信息表</title>
</head>
<body>
    <table id="student-table">
        <thead>
            <tr>
                <th>name</th>
                <th>age</th>
                <th>gender</th>
                <th>score</th>
            </tr>
        </thead>
        <tbody>
            <tr>
                <td>Alice</td>
                <td>18</td>
                <td>Female</td>
                <td>92</td>
            </tr>
            <tr>
                <td>Bob</td>
                <td>19</td>
                <td>Male</td>
                <td>85</td>
            </tr>
        </tbody>
    </table>
</body>
</html>

读取HTML表格的代码如下:

import rows

# 读取HTML文件中的表格
table = rows.import_from_html("students.html")

# 查看数据行数
print(f"表格共有 {len(table)} 行数据")
# 输出:表格共有 2 行数据

代码说明

  • rows.import_from_html()默认读取HTML中第一个表格,若需读取指定表格,可通过idclass筛选,例如rows.import_from_html("students.html", id_="student-table")

3.2 数据操作:筛选、排序与转换

获取Table对象后,可对数据进行筛选、排序等操作,rows库支持原生Python语法结合自身API完成这些操作。

3.2.1 数据筛选

筛选出成绩大于90分的学生,代码如下:

import rows

# 读取CSV数据
table = rows.import_from_csv("students.csv")

# 筛选成绩>90的学生
high_score_students = [row for row in table if row.score > 90]

# 输出筛选结果
for student in high_score_students:
    print(f"高分学生:{student.name},成绩:{student.score}")

代码说明

  • 利用Python列表推导式遍历Table对象,结合条件判断实现数据筛选。
  • 筛选结果是一个包含行对象的列表,可直接遍历访问字段。

3.2.2 数据排序

对学生数据按年龄升序排列,代码如下:

import rows

table = rows.import_from_csv("students.csv")

# 按年龄升序排序
sorted_table = rows.sort(table, key="age")

# 输出排序后的结果
for row in sorted_table:
    print(f"姓名:{row.name},年龄:{row.age}")

代码说明

  • rows.sort()函数接收两个关键参数:table为待排序的Table对象,key为排序依据的字段名。
  • 若需降序排序,可添加reverse=True参数,例如rows.sort(table, key="score", reverse=True)

3.2.3 数据转换

Table对象转换为Python字典列表,方便与其他库(如pandas)配合使用,代码如下:

import rows

table = rows.import_from_csv("students.csv")

# 转换为字典列表
dict_list = rows.export_to_dicts(table)

print(dict_list)
# 输出:[{'name': 'Alice', 'age': 18, 'gender': 'Female', 'score': 92}, ...]

代码说明

  • rows.export_to_dicts()函数将Table对象转换为字典列表,每个字典的键为字段名,值为对应的数据。

3.3 数据导出:保存为多种格式

处理完成的数据可通过rows库导出为CSV、JSON、SQLite等格式,满足不同场景的需求。

3.3.1 导出为CSV文件

将筛选后的高分学生数据导出为新的CSV文件,代码如下:

import rows

table = rows.import_from_csv("students.csv")
high_score_students = [row for row in table if row.score > 90]

# 将列表转换为Table对象
new_table = rows.Table(rows.fields_from_table(table), high_score_students)

# 导出为CSV文件
rows.export_to_csv(new_table, "high_score_students.csv")

代码说明

  • 先通过列表推导式筛选数据,再用rows.Table()将列表转换为Table对象,其中rows.fields_from_table(table)用于获取原表的字段结构。
  • rows.export_to_csv()接收Table对象和目标文件路径,完成导出。

3.3.2 导出为JSON文件

将排序后的数据导出为JSON文件,代码如下:

import rows

table = rows.import_from_csv("students.csv")
sorted_table = rows.sort(table, key="score", reverse=True)

# 导出为JSON文件
rows.export_to_json(sorted_table, "sorted_students.json")

代码说明

  • rows.export_to_json()函数的用法与导出CSV类似,直接传入Table对象和目标路径即可。

3.3.3 导出为SQLite数据库

将学生数据导出为SQLite数据库表,方便后续的数据库操作,代码如下:

import rows

table = rows.import_from_csv("students.csv")

# 导出为SQLite数据库,表名为students
rows.export_to_sqlite(table, "students.db", table_name="students")

代码说明

  • rows.export_to_sqlite()函数接收三个参数:Table对象、数据库文件路径、数据库表名。
  • 执行后会生成一个students.db文件,可使用SQLite工具或Python的sqlite3库连接查询。

四、rows库实战案例:多格式数据整合分析

本案例模拟一个实际场景:从CSV、JSON、HTML三种不同格式的文件中读取学生数据,合并后进行统一分析,最后导出为SQLite数据库。

4.1 准备数据源

  1. CSV数据源students_1.csv
    csv name,age,gender,score Eve,20,Female,88 Frank,19,Male,90
  2. JSON数据源students_2.json
    json [ {"name": "Grace", "age": 18, "gender": "Female", "score": 93}, {"name": "Henry", "age": 21, "gender": "Male", "score": 82} ]
  3. HTML数据源students_3.html
    html ¨K51K

4.2 数据读取与合并

编写代码读取三个数据源并合并为一个Table对象:

import rows

# 读取不同格式的数据源
table_csv = rows.import_from_csv("students_1.csv")
table_json = rows.import_from_json("students_2.json")
table_html = rows.import_from_html("students_3.html")

# 合并所有数据行
all_rows = list(table_csv) + list(table_json) + list(table_html)

# 创建合并后的Table对象
merged_table = rows.Table(rows.fields_from_table(table_csv), all_rows)

# 查看合并后的数据行数
print(f"合并后共有 {len(merged_table)} 条学生数据")

代码说明

  • 分别读取三种格式的数据,得到三个独立的Table对象。
  • 将每个Table对象转换为列表,再通过列表相加实现数据行合并。
  • 利用第一个表的字段结构创建新的Table对象,确保合并后的数据结构一致。

4.3 数据分析与处理

对合并后的数据进行以下分析:

  1. 计算所有学生的平均成绩
  2. 筛选出年龄小于20岁的学生
  3. 按成绩降序排序
import rows

# 延续上一步的merged_table
# 1. 计算平均成绩
total_score = sum(row.score for row in merged_table)
average_score = total_score / len(merged_table)
print(f"所有学生的平均成绩:{average_score:.2f}")

# 2. 筛选年龄小于20岁的学生
young_students = [row for row in merged_table if row.age < 20]
young_table = rows.Table(rows.fields_from_table(merged_table), young_students)
print(f"年龄小于20岁的学生共有 {len(young_table)} 人")

# 3. 按成绩降序排序
sorted_table = rows.sort(merged_table, key="score", reverse=True)
print("成绩排名前三的学生:")
for i in range(3):
    print(f"第{i+1}名:{sorted_table[i].name},成绩:{sorted_table[i].score}")

代码说明

  • 利用生成器表达式计算成绩总和,再除以数据行数得到平均成绩。
  • 筛选年龄小于20岁的学生后,转换为Table对象以便后续导出。
  • 按成绩降序排序后,通过索引获取前三名学生的信息。

4.4 结果导出

将排序后的完整数据导出为SQLite数据库,将年龄小于20岁的学生数据导出为CSV文件:

import rows

# 延续上一步的sorted_table和young_table
# 导出排序后的数据到SQLite
rows.export_to_sqlite(sorted_table, "merged_students.db", table_name="all_students")

# 导出年轻学生数据到CSV
rows.export_to_csv(young_table, "young_students.csv")

print("数据导出完成!")

代码说明

  • 排序后的完整数据存入SQLite数据库,方便后续的查询和管理。
  • 年轻学生数据导出为CSV文件,便于快速查看和分享。

五、rows库常见问题与解决方案

5.1 数据类型自动识别错误

问题:读取CSV文件时,数字字段被识别为字符串类型,导致无法进行数值计算。
解决方案:使用rows.transform函数手动指定字段类型,示例代码如下:

import rows
from rows.fields import IntegerField, FloatField

# 定义字段类型
class StudentTable(rows.Table):
    name = rows.fields.TextField
    age = IntegerField
    gender = rows.fields.TextField
    score = FloatField

# 读取CSV并应用字段类型
table = rows.import_from_csv("students.csv", force_types=StudentTable)

# 验证类型
print(type(table[0].score))  # 输出:<class 'float'>

5.2 不支持的文件格式

问题:尝试读取Excel文件时,提示“未找到对应的导入函数”。
解决方案:安装扩展依赖rows[excel],然后使用rows.import_from_xlsx()函数读取,示例代码如下:

pip install rows[excel]
import rows
table = rows.import_from_xlsx("students.xlsx")

5.3 大规模数据处理效率低

问题:读取超大CSV文件时,内存占用过高,程序运行缓慢。
解决方案:rows库不适合处理超大规模数据,建议结合pandas库使用,先用rows读取数据转换为字典列表,再传入pandas的DataFrame

import rows
import pandas as pd

table = rows.import_from_csv("large_students.csv")
df = pd.DataFrame(rows.export_to_dicts(table))
# 使用pandas进行高效处理

六、rows库相关资源

  • PyPI地址:https://pypi.org/project/rows
  • Github地址:https://github.com/turicas/rows
  • 官方文档地址:https://rows.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:Camelot库——轻松提取PDF表格数据的完整指南

一、Camelot库核心概述

Camelot是一款专为从PDF文件中精确提取表格数据而生的Python库,它能将PDF里的表格转换为Pandas DataFrame或CSV、JSON等格式,极大降低了PDF表格数据处理的门槛。其工作原理是通过两种核心算法(Lattice和Stream)识别表格:Lattice适用于有清晰边框线的表格,通过检测线条来定位单元格;Stream适用于无边框表格,依靠文本的位置和间距来划分单元格。

该库的优点十分突出:提取精度高、支持自定义配置、输出格式灵活、完全免费开源;缺点则是对扫描版PDF(图片型PDF)无效,仅支持文本型PDF,且对复杂嵌套表格的处理能力有限。Camelot采用MIT License开源协议,允许开发者自由使用、修改和分发,无商业使用限制。

二、Camelot库安装步骤

Camelot的安装分为基础安装和依赖补充两个部分,因为它依赖于Ghostscript等第三方工具,不同操作系统的安装流程略有差异,以下是详细的安装指南。

2.1 安装Ghostscript依赖

Ghostscript是Camelot识别PDF表格的核心依赖,必须优先安装。

  1. Windows系统
    访问Ghostscript官方下载地址(https://www.ghostscript.com/releases/gsdnld.html),下载对应版本的安装包,按照安装向导完成安装。安装完成后,需要将Ghostscript的可执行文件路径添加到系统环境变量中,例如默认路径为C:\Program Files\gs\gs10.02.1\bin
  2. macOS系统
    使用Homebrew包管理器执行以下命令安装:
    bash brew install ghostscript
  3. Linux系统(Ubuntu/Debian)
    执行apt-get命令安装:
    bash sudo apt-get install ghostscript

2.2 安装Camelot库

完成Ghostscript安装后,通过pip命令即可安装Camelot库,建议使用Python3.6及以上版本:

pip install camelot-py[cv]

这里的[cv]表示安装包含OpenCV依赖的完整版,OpenCV有助于提升表格识别的准确率。安装完成后,可以在Python环境中执行以下代码验证是否安装成功:

import camelot
print(camelot.__version__)

如果代码能正常输出Camelot的版本号,说明安装成功。

三、Camelot库核心用法与代码实例

Camelot的核心操作流程是读取PDF文件→配置提取参数→提取表格→导出/处理数据,下面将详细讲解两种核心提取算法的使用方法,并结合代码实例进行演示。

3.1 核心概念:Lattice与Stream算法

在使用Camelot提取表格前,需要先明确PDF表格的类型,从而选择对应的算法:

  • Lattice算法:默认算法,适用于有明确边框线的表格,例如Excel导出的PDF表格、财务报表等。该算法通过检测表格的竖线和横线来确定单元格的边界,提取精度极高。
  • Stream算法:适用于无边框线的表格,例如纯文本排版的表格、网页导出的无框PDF表格等。该算法通过分析文本块的位置、间距和对齐方式,来推断表格的结构。

3.2 基础用法:提取单页PDF表格

首先准备一个测试用的PDF文件(例如test_table.pdf),该文件的第1页包含一个有边框的表格。下面的代码将演示如何使用Lattice算法提取该表格。

3.2.1 代码实例:Lattice算法提取有边框表格

import camelot

# 读取PDF文件,指定提取第1页的表格,使用Lattice算法
tables = camelot.read_pdf(
    'test_table.pdf',  # PDF文件路径
    pages='1',         # 指定提取的页码,支持多页如'1,3,5'或范围'1-5'
    flavor='lattice'   # 指定提取算法为lattice
)

# 打印提取到的表格数量
print(f"提取到的表格数量:{len(tables)}")

# 查看第一个表格的基本信息
print("第一个表格的基本信息:")
print(tables[0].parsing_report)  # 输出解析报告,包含精度、页数等信息

# 将表格转换为Pandas DataFrame
df = tables[0].df
print("\n表格数据(DataFrame格式):")
print(df)

# 将表格导出为CSV文件
tables[0].to_csv('extracted_table.csv')
print("\n表格已导出为extracted_table.csv")

# 将表格导出为JSON文件
tables[0].to_json('extracted_table.json')
print("表格已导出为extracted_table.json")

3.2.2 代码说明

  • camelot.read_pdf()是核心函数,用于读取PDF并提取表格,返回一个TableList对象,包含所有提取到的表格。
  • pages参数用于指定提取的页码,支持单页、多页和页码范围,例如pages='1-3'表示提取第1到3页的表格。
  • flavor参数指定算法类型,lattice为默认值,适用于有边框表格。
  • tables[0].parsing_report会输出解析报告,包含accuracy(提取精度)、whitespace(空白占比)、page(页码)等信息,精度越高说明提取效果越好。
  • tables[0].df将表格转换为Pandas DataFrame,方便后续的数据清洗和分析。
  • to_csv()to_json()方法可以将表格导出为对应的文件格式,便于分享和存储。

3.2.3 代码实例:Stream算法提取无边框表格

如果需要提取的PDF表格没有边框线,就需要使用stream算法,同时可以通过table_regions参数指定表格所在的区域,提升提取精度。

import camelot

# 读取PDF文件,使用Stream算法提取无边框表格
tables = camelot.read_pdf(
    'test_no_border_table.pdf',
    pages='1',
    flavor='stream',
    table_regions=['20, 700, 500, 300']  # 指定表格的坐标区域:x1, y1, x2, y2
)

# 输出解析报告
print("解析报告:")
print(tables[0].parsing_report)

# 查看表格数据
df = tables[0].df
print("\n无边框表格数据:")
print(df)

# 导出为Excel文件(需要安装openpyxl库)
tables[0].to_excel('extracted_no_border_table.xlsx', index=False)
print("\n无边框表格已导出为extracted_no_border_table.xlsx")

3.2.4 代码说明

  • flavor='stream'指定使用Stream算法,适用于无边框表格。
  • table_regions参数的作用是限定表格的提取区域,坐标格式为[x1, y1, x2, y2],其中(x1, y1)是区域的左上角坐标,(x2, y2)是右下角坐标。该参数可以避免PDF中的其他文本干扰表格提取,大幅提升准确率。
  • to_excel()方法可以将表格导出为Excel文件,使用前需要安装openpyxl库,执行pip install openpyxl即可。

3.3 高级用法:自定义提取参数

Camelot提供了丰富的自定义参数,用于处理复杂的PDF表格,例如合并单元格、调整列间距、过滤空白行等。下面的代码将演示如何使用这些参数优化提取效果。

3.3.1 代码实例:处理合并单元格与空白行

import camelot

# 读取包含合并单元格的PDF表格
tables = camelot.read_pdf(
    'test_merge_cells.pdf',
    pages='1',
    flavor='lattice',
    strip_text='\n',  # 去除单元格内的换行符
    suppress_stdout=True  # 抑制控制台输出的冗余信息
)

# 查看原始提取的表格数据
print("原始提取数据(含合并单元格):")
print(tables[0].df)

# 处理合并单元格:通过DataFrame的fillna方法填充合并单元格的内容
df = tables[0].df
# 向前填充空值,适用于垂直合并的单元格
df = df.fillna(method='ffill', axis=0)
# 向左填充空值,适用于水平合并的单元格
df = df.fillna(method='ffill', axis=1)

print("\n处理合并单元格后的数据:")
print(df)

# 过滤空白行:删除所有元素均为空的行
df = df.dropna(how='all')
print("\n过滤空白行后的数据:")
print(df)

3.3.2 代码说明

  • strip_text='\n'参数用于去除单元格内的换行符,使文本内容更整洁。
  • suppress_stdout=True参数可以抑制Camelot在控制台输出的冗余日志信息,让输出更简洁。
  • 合并单元格在提取后会表现为NaN值,通过Pandas的fillna()方法,使用ffill(向前填充)策略,可以快速填充合并单元格的内容,还原表格的真实结构。
  • dropna(how='all')方法用于删除所有元素均为空的行,适用于清理包含大量空白行的表格数据。

3.3.3 代码实例:提取多页PDF中的所有表格

如果PDF文件包含多个页面,且每个页面都有表格,可以通过pages='all'参数提取所有页面的表格,并批量导出。

import camelot
import os

# 创建输出目录
output_dir = 'multi_page_tables'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# 提取多页PDF中的所有表格
tables = camelot.read_pdf(
    'multi_page_test.pdf',
    pages='all',
    flavor='lattice'
)

print(f"共提取到 {len(tables)} 个表格")

# 批量导出所有表格为CSV文件
for i, table in enumerate(tables):
    table.to_csv(os.path.join(output_dir, f'table_{i+1}.csv'))
    print(f"表格 {i+1} 已导出到 {output_dir}/table_{i+1}.csv")

3.3.4 代码说明

  • pages='all'参数表示提取PDF文件中所有页面的表格,无需手动指定页码。
  • 通过enumerate()遍历TableList对象中的每个表格,批量导出为CSV文件,并按序号命名,方便管理。
  • 使用os.makedirs()创建输出目录,避免因目录不存在导致的导出失败。

四、实际应用案例:PDF财务报表数据提取与分析

下面将结合一个实际的应用场景——提取PDF格式的财务报表中的利润表数据,并进行简单的数据分析,演示Camelot库在实际工作中的使用价值。

4.1 案例背景

假设我们有一份名为2024_financial_report.pdf的PDF文件,其中第3页是公司的利润表,表格为有边框格式,包含“项目”“2023年”“2024年”三列数据。我们需要提取该表格数据,并分析2024年相较于2023年的营收变化情况。

4.2 代码实例:数据提取与分析

import camelot
import pandas as pd
import matplotlib.pyplot as plt

# 步骤1:提取PDF中的利润表数据
tables = camelot.read_pdf(
    '2024_financial_report.pdf',
    pages='3',
    flavor='lattice',
    strip_text='\n'
)

# 步骤2:转换为DataFrame并清洗数据
profit_df = tables[0].df
# 设置列名:假设表格第一行是表头
profit_df.columns = profit_df.iloc[0]
profit_df = profit_df.drop(0, axis=0)
# 重置索引
profit_df = profit_df.reset_index(drop=True)
# 过滤掉空行
profit_df = profit_df.dropna(how='all')

print("清洗后的利润表数据:")
print(profit_df)

# 步骤3:数据类型转换(将金额列转换为数值类型)
# 假设金额列名为“2023年”和“2024年”
profit_df['2023年'] = pd.to_numeric(profit_df['2023年'], errors='coerce')
profit_df['2024年'] = pd.to_numeric(profit_df['2024年'], errors='coerce')

# 步骤4:分析营收变化——筛选“营业收入”行
revenue_row = profit_df[profit_df['项目'].str.contains('营业收入', na=False)]
if not revenue_row.empty:
    revenue_2023 = revenue_row['2023年'].values[0]
    revenue_2024 = revenue_row['2024年'].values[0]
    revenue_growth = (revenue_2024 - revenue_2023) / revenue_2023 * 100

    print(f"\n2023年营业收入:{revenue_2023:.2f} 万元")
    print(f"2024年营业收入:{revenue_2024:.2f} 万元")
    print(f"营业收入增长率:{revenue_growth:.2f}%")

    # 步骤5:可视化营收变化
    plt.rcParams['font.sans-serif'] = ['SimHei']  # 解决中文显示问题
    plt.figure(figsize=(8, 5))
    years = ['2023年', '2024年']
    revenues = [revenue_2023, revenue_2024]
    plt.bar(years, revenues, color=['#3498db', '#e74c3c'])
    plt.title('2023-2024年营业收入对比')
    plt.ylabel('营业收入(万元)')
    for i, v in enumerate(revenues):
        plt.text(i, v + 100, f'{v:.2f}', ha='center')
    plt.savefig('revenue_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()
else:
    print("\n未找到营业收入相关数据")

4.3 案例说明

  1. 数据提取:使用Lattice算法提取PDF第3页的利润表数据,通过strip_text='\n'清理单元格内的换行符。
  2. 数据清洗:将表格的第一行设为列名,删除表头行和空行,确保数据结构整洁。
  3. 类型转换:将金额列从字符串类型转换为数值类型,以便进行数学计算,errors='coerce'参数可以将无法转换的值设为NaN
  4. 数据分析:通过筛选包含“营业收入”的行,计算2024年相较于2023年的营收增长率。
  5. 数据可视化:使用Matplotlib绘制柱状图,直观展示两年的营业收入对比情况,并解决中文显示问题。

这个案例充分体现了Camelot库在实际工作中的价值——从PDF中快速提取结构化数据,结合Pandas和Matplotlib完成数据分析与可视化,大大提升了工作效率。

五、Camelot库常见问题与解决方案

在使用Camelot的过程中,可能会遇到一些常见问题,下面列出了这些问题的解决方案:

  1. 问题1:提取到的表格为空或不完整
    • 解决方案:检查PDF是否为文本型PDF(扫描版PDF无法提取);使用table_regions参数指定表格区域;尝试切换latticestream算法;调整edge_tol参数(边缘容差)或row_tol参数(行容差)。
  2. 问题2:报错“Ghostscript not found”
    • 解决方案:确认Ghostscript已正确安装,并将其路径添加到系统环境变量中;重启Python环境后重试。
  3. 问题3:合并单元格处理不彻底
    • 解决方案:提取数据后,使用Pandas的fillna()方法手动填充NaN值;对于复杂的合并单元格,可以结合df.replace()方法进行处理。
  4. 问题4:多页PDF提取效率低
    • 解决方案:避免使用pages='all'提取不必要的页面,手动指定需要提取的页码;关闭suppress_stdout=False查看详细日志,定位耗时较长的页面。

六、相关资源链接

  • Pypi地址:https://pypi.org/project/camelot-py
  • Github地址:https://github.com/camelot-dev/camelot
  • 官方文档地址:https://camelot-py.readthedocs.io/en/master/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:deepdish库使用教程

import deepdish as dd
import numpy as np
from sklearn.linear_model import SGDClassifier

初始化随机梯度下降分类器(支持增量训练)

model = SGDClassifier(loss=”log_loss”, max_iter=100, random_state=42, warm_start=True)

分块加载训练集数据,每块 1000 个样本

chunk_size = 1000
print(“开始分块训练模型…”)
for chunk in dd.iterate(“image_classification_dataset.h5″, group=”train.features”, chunks=(chunk_size, 784)):
# 获取对应块的标签
chunk_labels = dd.io.load(“image_classification_dataset.h5″, group=”train.labels”)[chunk.index[0]:chunk.index[0]+chunk_size]
# 增量训练模型
model.partial_fit(chunk, chunk_labels, classes=np.arange(10))

在测试集上评估模型

test_data = dd.io.load(“image_classification_dataset.h5″, group=”test”)
test_pred = model.predict(test_data[“features”])
test_accuracy = accuracy_score(test_data[“labels”], test_pred)
print(f”分块训练后测试集准确率: {test_accuracy:.4f}”)

关注我,每天分享一个实用的Python自动化工具。

Python实用工具img2dataset:大规模图像数据集高效构建指南

一、img2dataset库核心概述

img2dataset是一款专为大规模图像数据集构建设计的Python工具库,其核心用途是从图像URL列表中批量下载、处理并存储图像数据,广泛应用于计算机视觉领域的模型训练数据准备工作。该库的工作原理是通过多线程/多进程并行处理URL队列,支持断点续传、图像格式转换、分辨率调整等功能,同时能够生成配套的元数据文件,便于后续的数据管理与模型训练。

在优缺点方面,img2dataset的优势十分突出:一是并行处理机制大幅提升下载效率,能够轻松应对百万级以上的URL列表;二是支持多种输出格式(如webdataset、files、parquet等),适配不同的训练框架需求;三是内置图像过滤功能,可自动剔除损坏、低分辨率的无效图像。其缺点主要在于对网络环境要求较高,大规模下载时容易受带宽限制,且部分高级功能需要依赖额外的第三方库。该库采用Apache License 2.0开源协议,允许商用与二次开发,完全满足开发者的使用需求。

二、img2dataset安装方法

img2dataset支持通过Python包管理工具pip直接安装,同时也可以从GitHub源码编译安装,两种方式分别适用于不同的使用场景,以下是详细的安装步骤。

2.1 pip快速安装

这是最简便的安装方式,适用于大多数用户,只需在命令行中执行以下命令即可完成安装:

pip install img2dataset

安装完成后,可通过以下Python代码验证是否安装成功:

import img2dataset
print(f"img2dataset版本:{img2dataset.__version__}")

运行上述代码,如果控制台输出对应的版本号,说明安装成功;若出现ModuleNotFoundError,则需要检查pip环境是否配置正确,或尝试升级pip后重新安装。

2.2 源码编译安装

如果需要使用最新的开发版本,或者对源码进行自定义修改,可以选择从GitHub克隆源码并编译安装,步骤如下:

  1. 克隆GitHub仓库
git clone https://github.com/rom1504/img2dataset.git
cd img2dataset
  1. 安装依赖并编译
pip install -r requirements.txt
pip install -e .

这种安装方式的优势在于可以随时通过git pull获取最新的功能更新,适合对功能有定制化需求的开发者。

三、img2dataset基础使用教程

img2dataset的使用方式分为命令行调用Python脚本调用两种,其中脚本调用的灵活性更高,便于嵌入到自动化数据处理流程中。本节将以Python脚本调用为主,结合实例讲解核心功能的使用方法。

3.1 核心参数说明

在使用img2dataset之前,需要先了解其核心参数的含义,这些参数决定了数据下载与处理的行为,关键参数如下表所示:

| 参数名称 | 数据类型 | 作用说明 | 默认值 |
|-|-|-|–|
| url_list | str | 存储图像URL的文件路径或文本内容 | 无(必填) |
| output_format | str | 输出格式,可选webdataset/files/parquet等 | webdataset |
| output_folder | str | 输出文件的存储目录 | dataset |
| thread_count | int | 并行下载的线程数 | 256 |
| image_size | int | 图像缩放后的目标分辨率 | 256 |
| resize_only_if_bigger | bool | 是否仅当原图大于目标分辨率时才缩放 | True |
| skip_reencode | bool | 是否跳过图像重新编码 | True |
| save_additional_columns | list | 需要保存的额外元数据列 | [] |

3.2 从URL列表下载图像(基础实例)

本实例将演示如何从一个包含图像URL的文本文件中批量下载图像,并保存为webdataset格式。

步骤1:准备URL列表文件

首先创建一个名为urls.txt的文本文件,每行存储一个图像URL和对应的元数据(如标签),格式如下:

https://example.com/image1.jpg label1
https://example.com/image2.jpg label2
https://example.com/image3.jpg label3

其中,URL与元数据之间用空格分隔,元数据可以根据需求添加多列。

步骤2:编写Python下载脚本

创建名为download_images.py的Python文件,代码如下:

from img2dataset import download

# 配置下载参数
params = {
    "url_list": "urls.txt",  # URL列表文件路径
    "output_folder": "my_image_dataset",  # 输出目录
    "output_format": "webdataset",  # 输出格式
    "thread_count": 128,  # 并行线程数,根据机器性能调整
    "image_size": 512,  # 图像缩放至512x512
    "resize_only_if_bigger": True,  # 仅缩放大于512的图像
    "skip_reencode": False,  # 重新编码为JPEG格式
    "save_additional_columns": ["label"],  # 保存标签列作为元数据
    "number_sample_per_shard": 1000,  # 每个分片存储1000张图像
    "retries": 3,  # 下载失败时重试次数
}

# 执行下载任务
download(**params)

步骤3:运行脚本并查看结果

在命令行中执行以下命令运行脚本:

python download_images.py

脚本运行后,会在当前目录下生成my_image_dataset文件夹,结构如下:

my_image_dataset/
├── 00000.tar
├── 00001.tar
└── ...

每个.tar文件是一个数据分片,包含1000张图像及其元数据,可直接用于PyTorch、TensorFlow等框架的模型训练。

3.3 直接使用URL列表字符串(进阶实例)

除了从文件读取URL列表,还可以直接将URL列表以字符串的形式传入参数,适用于动态生成URL的场景,代码示例如下:

from img2dataset import download

# 动态生成URL列表字符串
url_str = """https://example.com/img1.jpg cat
https://example.com/img2.jpg dog
https://example.com/img3.jpg bird
"""

# 配置参数
params = {
    "url_list": url_str,  # 直接传入URL字符串
    "output_folder": "dynamic_dataset",
    "output_format": "files",  # 以单个文件形式存储
    "image_size": 256,
    "thread_count": 64,
}

# 执行下载
download(**params)

该脚本运行后,dynamic_dataset文件夹下会按类别生成子文件夹,并存储对应的图像文件,适合需要人工查看图像的场景。

3.4 图像过滤与质量控制

img2dataset内置了图像质量过滤功能,可以自动剔除无效图像,例如损坏的文件、分辨率过低的图像等。以下是添加过滤条件的脚本示例:

from img2dataset import download

params = {
    "url_list": "urls.txt",
    "output_folder": "filtered_dataset",
    "output_format": "parquet",
    "image_size": 384,
    "min_image_size": 128,  # 剔除宽度或高度小于128的图像
    "max_image_area": 1000000,  # 剔除面积超过100万像素的图像
    "timeout": 10,  # 下载超时时间(秒)
    "verify_hash": False,  # 是否验证图像哈希值
    "skip_downloaded": True,  # 跳过已下载的图像(断点续传)
}

download(**params)

通过设置min_image_sizemax_image_area参数,可以精准控制保留的图像质量,避免低质量数据影响模型训练效果。

四、img2dataset高级应用案例

本节将结合实际应用场景,讲解img2dataset的高级用法,包括与其他数据处理库的结合、大规模分布式下载等。

4.1 与Pandas结合处理元数据

在实际项目中,图像的元数据通常存储在CSV文件中,我们可以使用Pandas读取CSV文件,提取URL和元数据,再传递给img2dataset进行下载。以下是完整的案例代码:

import pandas as pd
from img2dataset import download

# 1. 使用Pandas读取CSV元数据文件
df = pd.read_csv("metadata.csv")
# 假设CSV包含列:url, label, category
print(f"元数据文件共包含 {len(df)} 条记录")

# 2. 将DataFrame转换为img2dataset支持的URL字符串格式
url_list = []
for idx, row in df.iterrows():
    url = row["url"]
    label = row["label"]
    category = row["category"]
    # 格式:URL 标签 类别
    url_list.append(f"{url} {label} {category}")
url_str = "\n".join(url_list)

# 3. 配置下载参数
params = {
    "url_list": url_str,
    "output_folder": "pandas_dataset",
    "output_format": "webdataset",
    "thread_count": 256,
    "image_size": 512,
    "save_additional_columns": ["label", "category"],  # 保存多列元数据
}

# 4. 执行下载
download(**params)

该案例适用于元数据较为复杂的场景,通过Pandas可以灵活地筛选、清洗元数据,再传递给img2dataset进行批量下载。

4.2 分布式大规模数据集下载

当需要处理千万级以上的URL列表时,单台机器的性能可能无法满足需求,此时可以使用img2dataset的分布式下载功能,借助多台机器并行处理任务。核心思路是将URL列表分割为多个分片,分配给不同的机器分别下载,最后合并结果。

步骤1:分割URL列表

使用以下Python代码将大型URL文件分割为多个小文件:

def split_url_file(input_file, chunk_size=100000):
    """
    将URL文件分割为多个分片
    :param input_file: 输入URL文件路径
    :param chunk_size: 每个分片的记录数
    """
    with open(input_file, "r", encoding="utf-8") as f:
        lines = f.readlines()

    total_chunks = (len(lines) + chunk_size - 1) // chunk_size
    for i in range(total_chunks):
        start = i * chunk_size
        end = min((i+1)*chunk_size, len(lines))
        chunk_lines = lines[start:end]
        with open(f"urls_chunk_{i}.txt", "w", encoding="utf-8") as f_out:
            f_out.writelines(chunk_lines)
    print(f"分割完成,共生成 {total_chunks} 个分片")

# 分割URL文件,每个分片10万条记录
split_url_file("large_urls.txt", chunk_size=100000)

步骤2:多机器并行下载

将分割后的URL分片文件分别发送到不同的机器,每台机器运行以下下载脚本:

from img2dataset import download

# 替换为对应的分片文件名
chunk_file = "urls_chunk_0.txt"

params = {
    "url_list": chunk_file,
    "output_folder": f"dataset_chunk_0",
    "output_format": "webdataset",
    "thread_count": 256,
    "image_size": 512,
    "distributor": "multiprocessing",  # 使用多进程分发任务
}

download(**params)

步骤3:合并下载结果

所有机器下载完成后,将生成的数据集分片复制到同一目录下,即可得到完整的大规模图像数据集。

五、img2dataset常见问题与解决方案

在使用img2dataset的过程中,可能会遇到各种问题,以下是一些常见问题及其解决方案:

5.1 下载速度慢

  • 原因:线程数设置过低,或网络带宽不足。
  • 解决方案:适当增加thread_count参数的值(根据机器CPU核心数调整,建议设置为CPU核心数的4-8倍);使用高速网络,或配置代理服务器。

5.2 大量图像下载失败

  • 原因:URL无效、目标服务器拒绝访问,或下载超时。
  • 解决方案:增加retries参数的值,提高重试次数;设置合理的timeout参数;下载完成后查看日志文件,剔除无效URL。

5.3 内存占用过高

  • 原因:并行线程数过多,导致内存资源耗尽。
  • 解决方案:降低thread_count参数的值;使用distributor="multiprocessing"参数,采用多进程替代多线程,减少内存占用。

六、相关资源链接

  • Pypi地址:https://pypi.org/project/img2dataset
  • Github地址:https://github.com/rom1504/img2dataset
  • 官方文档地址:https://github.com/rom1504/img2dataset/blob/main/docs/README.md

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:textract 一站式多格式文档文本提取教程

一、textract库核心概述

1.1 功能用途

textract是一款面向Python开发者的多格式文档文本提取工具,能够自动解析并提取数十种常见文件格式中的文本内容,涵盖文档类(DOC、DOCX、PDF、TXT)、表格类(XLS、XLSX、CSV)、演示类(PPT、PPTX)、压缩包类(ZIP、RAR)等主流格式,无需开发者手动编写不同格式的解析逻辑,极大降低了跨格式文本提取的开发门槛。

1.2 工作原理

textract的核心工作逻辑是封装第三方格式解析工具与库,通过调用不同的底层依赖来处理对应格式的文件:对于PDF文件,会调用pdfminerpdftotext;对于Office文档,依赖python-docxxlrdpython-pptx等库;对于压缩包,会先解压再提取内部文件文本。工具会自动识别输入文件的格式,匹配对应的解析器,最终将提取的文本整合为统一的字符串输出。

1.3 优缺点分析

优点

  • 格式支持全面,一站式解决多类型文件的文本提取需求;
  • 调用接口简洁,一行代码即可完成文本提取;
  • 兼容Python 3.x版本,适配主流开发环境。

缺点

  • 底层依赖较多,安装时需要配置各类第三方工具,部分格式(如加密PDF)无法处理;
  • 大文件提取速度较慢,内存占用较高;
  • 对部分小众格式的支持不够完善,存在解析失败的情况。

1.4 License类型

textract采用MIT开源许可证,开发者可以自由使用、修改和分发代码,无论是个人项目还是商业项目都无需支付授权费用,仅需保留原作者的版权声明。

二、textract库安装指南

2.1 系统依赖准备

由于textract依赖大量第三方工具,不同操作系统的安装步骤存在差异,需先配置系统级依赖:

2.1.1 Windows系统

Windows用户需要安装以下工具,并确保其添加到系统环境变量中:

  • poppler:用于PDF文件解析,下载地址:http://blog.alivate.com.au/poppler-windows/
  • antiword:用于DOC文件解析,下载地址:https://www.winfield.demon.nl/
  • unrar:用于RAR压缩包解析,下载地址:https://www.rarlab.com/rar_add.htm

下载后解压到指定目录,例如C:\Tools,并将对应工具的可执行文件路径(如C:\Tools\poppler-23.11.0\Library\bin)添加到系统环境变量Path中。

2.1.2 Linux系统

Linux用户可通过包管理器直接安装所有依赖,以Ubuntu/Debian为例:

sudo apt-get update
sudo apt-get install python3-dev python3-pip antiword unrtf poppler-utils pstotext tesseract-ocr flac ffmpeg lame libmad0 libsox-fmt-mp3 sox libjpeg-dev swig

以CentOS/RHEL为例:

sudo yum install python3-devel python3-pip antiword unrtf poppler-utils tesseract flac ffmpeg lame libmad sox libjpeg-turbo-devel swig

2.1.3 macOS系统

macOS用户可通过Homebrew安装依赖:

brew install python3 antiword unrtf poppler tesseract flac ffmpeg lame sox

2.2 Python包安装

完成系统依赖配置后,通过pip命令安装textract库:

pip install textract

验证安装是否成功,在Python交互式环境中执行以下代码:

import textract
print(textract.__version__)

若输出对应的版本号(如1.6.5),则说明安装成功。若出现导入错误,需检查系统依赖是否配置正确,或重新安装对应缺失的工具。

三、textract库核心API使用教程

3.1 基础文本提取:extract函数

textract的核心功能由textract.process()函数实现,该函数接收文件路径作为参数,自动识别文件格式并返回提取的文本内容(字节串格式)。

3.1.1 提取TXT文件文本

TXT文件是最基础的文本格式,textract提取过程无需额外依赖,代码示例如下:

# 导入textract库
import textract

# 定义TXT文件路径
txt_file_path = "example.txt"

# 提取文本内容,返回字节串
text_bytes = textract.process(txt_file_path)

# 将字节串解码为字符串
text_str = text_bytes.decode("utf-8")

# 打印提取的文本
print("提取的TXT文件内容:")
print(text_str)

代码说明textract.process()函数读取example.txt文件,返回UTF-8编码的字节串,通过decode("utf-8")转换为可阅读的字符串。若TXT文件采用其他编码(如GBK),需在decode时指定对应的编码格式。

3.1.2 提取PDF文件文本

PDF文件是办公场景中最常用的格式之一,textract依赖poppler工具解析PDF,代码示例如下:

import textract

# 定义PDF文件路径
pdf_file_path = "example.pdf"

# 提取PDF文本,指定编码为utf-8
try:
    text_bytes = textract.process(
        pdf_file_path,
        encoding="utf-8"
    )
    text_str = text_bytes.decode("utf-8")
    print("提取的PDF文件内容:")
    print(text_str)
except Exception as e:
    print(f"PDF提取失败:{e}")

代码说明:使用try-except捕获可能的异常(如文件不存在、依赖缺失),避免程序崩溃。对于扫描版PDF(图片格式),textract无法直接提取文本,需结合OCR工具(如tesseract)进行处理,后续会介绍对应的方法。

3.1.3 提取Word文档文本

Word文档分为.doc.docx两种格式,textract分别依赖antiword和python-docx库处理,代码示例如下:

import textract

# 提取.doc格式文档
doc_file_path = "example.doc"
try:
    doc_text = textract.process(doc_file_path).decode("utf-8")
    print("提取的DOC文件内容:")
    print(doc_text)
except Exception as e:
    print(f"DOC提取失败:{e}")

# 提取.docx格式文档
docx_file_path = "example.docx"
try:
    docx_text = textract.process(docx_file_path).decode("utf-8")
    print("\n提取的DOCX文件内容:")
    print(docx_text)
except Exception as e:
    print(f"DOCX提取失败:{e}")

代码说明.doc.docx格式的提取代码完全一致,textract会自动识别文件后缀并调用对应的解析器。需要注意的是,antiword工具对部分复杂格式的.doc文件支持不够完善,可能出现文本乱码的情况。

3.1.4 提取Excel表格文本

Excel表格(.xls.xlsx)的文本提取会将所有单元格的内容按行拼接,代码示例如下:

import textract

# 提取xlsx格式表格
xlsx_file_path = "example.xlsx"
try:
    xlsx_text = textract.process(xlsx_file_path).decode("utf-8")
    # 按换行符分割每行内容
    rows = xlsx_text.split("\n")
    print("Excel表格提取的内容(按行显示):")
    for index, row in enumerate(rows):
        if row.strip():  # 跳过空行
            print(f"第{index+1}行:{row.strip()}")
except Exception as e:
    print(f"Excel提取失败:{e}")

代码说明:提取的Excel文本中,不同行之间用换行符分隔,同一行的不同单元格内容用制表符或空格分隔。通过split("\n")可以将文本按行拆分,方便后续处理。

3.2 进阶功能:指定解析器与参数

textract允许开发者手动指定解析器,以覆盖自动识别的逻辑,同时支持传递额外参数优化提取效果。

3.2.1 手动指定解析器

以PDF文件为例,textract支持pdfminerpdftotext两种解析器,可通过method参数指定:

import textract

pdf_file_path = "example.pdf"

# 使用pdftotext解析器提取PDF文本
try:
    text = textract.process(
        pdf_file_path,
        method="pdftotext",  # 指定解析器
        encoding="utf-8"
    ).decode("utf-8")
    print("使用pdftotext提取的PDF内容:")
    print(text)
except Exception as e:
    print(f"解析失败:{e}")

代码说明method参数的值需与textract支持的解析器名称一致,不同格式对应的解析器可参考官方文档。手动指定解析器可以解决自动识别失败的问题,提升提取成功率。

3.2.2 处理扫描版PDF(OCR识别)

扫描版PDF本质是图片集合,需要结合OCR工具提取文本。textract支持调用tesseract-ocr进行OCR识别,代码示例如下:

import textract

# 扫描版PDF文件路径
scan_pdf_path = "scan_example.pdf"

# 使用OCR提取文本,指定语言为中文
try:
    ocr_text = textract.process(
        scan_pdf_path,
        method="tesseract",
        language="chi_sim"  # chi_sim表示简体中文,eng表示英文
    ).decode("utf-8")
    print("扫描版PDF的OCR识别结果:")
    print(ocr_text)
except Exception as e:
    print(f"OCR识别失败:{e}")

代码说明:使用method="tesseract"指定OCR解析器,language参数设置识别语言(需提前安装对应的语言包)。tesseract默认支持英文,若需识别中文,需下载简体中文语言包并放置到tesseract的tessdata目录下。

3.2.3 提取压缩包内文件文本

textract支持直接提取ZIP、RAR等压缩包内的所有文件文本,无需手动解压,代码示例如下:

import textract

# ZIP压缩包路径
zip_file_path = "example.zip"

try:
    # 提取压缩包内所有文件的文本
    zip_text = textract.process(zip_file_path).decode("utf-8")
    print("压缩包内文件的文本内容:")
    print(zip_text)
except Exception as e:
    print(f"压缩包提取失败:{e}")

代码说明:textract会自动解压压缩包,遍历内部所有文件并提取文本,最终将所有内容拼接为一个字符串。若压缩包内包含加密文件,提取会失败。

3.3 错误处理与最佳实践

3.3.1 常见异常类型及处理

在使用textract的过程中,常见的异常包括文件不存在依赖缺失格式不支持权限不足等,开发者需针对性地进行处理:

import textract
import os

def extract_file_text(file_path):
    """
    通用文本提取函数,包含异常处理
    :param file_path: 文件路径
    :return: 提取的文本字符串,失败返回None
    """
    # 检查文件是否存在
    if not os.path.exists(file_path):
        print(f"错误:文件 {file_path} 不存在")
        return None
    # 检查文件权限
    if not os.access(file_path, os.R_OK):
        print(f"错误:没有读取文件 {file_path} 的权限")
        return None
    try:
        text_bytes = textract.process(file_path)
        return text_bytes.decode("utf-8", errors="ignore")  # 忽略解码错误
    except textract.exceptions.ExtensionNotSupported as e:
        print(f"错误:不支持的文件格式 - {e}")
        return None
    except textract.exceptions.ShellError as e:
        print(f"错误:底层工具调用失败 - {e},请检查系统依赖")
        return None
    except Exception as e:
        print(f"未知错误:{e}")
        return None

# 测试函数
test_files = ["example.txt", "unknown.xyz", "example.pdf"]
for file in test_files:
    print(f"\n=== 提取 {file} 的内容 ===")
    text = extract_file_text(file)
    if text:
        print(text[:200])  # 仅打印前200个字符

代码说明:定义通用提取函数extract_file_text,先检查文件存在性和读取权限,再通过try-except捕获textract的特定异常和通用异常,确保程序的健壮性。decode时设置errors="ignore"可以忽略部分编码错误,避免因个别字符问题导致提取失败。

3.3.2 最佳实践建议

  1. 提前安装所有依赖:根据开发环境,一次性安装好所有系统级依赖和Python库,避免运行时出现工具缺失的问题;
  2. 小文件优先测试:在处理大量文件前,先用小文件测试提取效果,确认格式支持和编码设置正确;
  3. 大文件分块处理:对于超过100MB的大文件,建议分块读取或使用其他工具辅助,避免textract占用过多内存;
  4. 结合其他库优化:对于复杂格式的文件,可结合python-docxPyPDF2等库单独处理,提升提取精度;
  5. 日志记录:在生产环境中,将提取过程的异常信息记录到日志文件,方便后续排查问题。

四、实战案例:批量提取文件夹内所有文件的文本

4.1 需求描述

在实际开发中,经常需要批量提取某个文件夹内所有文件的文本内容,并将结果保存到指定的TXT文件中。本案例将实现一个批量提取工具,支持递归遍历子文件夹,自动过滤不支持的文件格式。

4.2 实现代码

import textract
import os
from pathlib import Path

def batch_extract_text(input_dir, output_file, supported_extensions=None):
    """
    批量提取文件夹内所有文件的文本
    :param input_dir: 输入文件夹路径
    :param output_file: 输出文本文件路径
    :param supported_extensions: 支持的文件后缀列表,None表示支持所有textract格式
    """
    # 默认支持的文件后缀,可根据需求扩展
    default_supported = {
        ".txt", ".pdf", ".doc", ".docx", ".xls", ".xlsx",
        ".ppt", ".pptx", ".csv", ".zip", ".rar"
    }
    if supported_extensions is None:
        supported_extensions = default_supported
    else:
        supported_extensions = set(supported_extensions)

    # 打开输出文件,使用追加模式
    with open(output_file, "w", encoding="utf-8") as f_out:
        # 递归遍历文件夹
        for root, dirs, files in os.walk(input_dir):
            for file in files:
                file_path = Path(root) / file
                file_ext = file_path.suffix.lower()
                # 过滤不支持的文件格式
                if file_ext not in supported_extensions:
                    continue
                print(f"正在提取:{file_path}")
                # 调用提取函数
                text = extract_file_text(str(file_path))
                if text:
                    # 写入文件路径和提取的文本
                    f_out.write(f"=== 文件路径:{file_path} ===\n")
                    f_out.write(text)
                    f_out.write("\n" + "="*50 + "\n\n")
                else:
                    f_out.write(f"=== 文件路径:{file_path} ===\n")
                    f_out.write("提取失败\n")
                    f_out.write("\n" + "="*50 + "\n\n")
    print(f"批量提取完成,结果已保存到:{output_file}")

# 复用之前定义的提取函数
def extract_file_text(file_path):
    if not os.path.exists(file_path):
        return None
    if not os.access(file_path, os.R_OK):
        return None
    try:
        text_bytes = textract.process(file_path)
        return text_bytes.decode("utf-8", errors="ignore")
    except Exception:
        return None

# 测试批量提取功能
if __name__ == "__main__":
    input_directory = "test_files"  # 输入文件夹
    output_txt = "batch_extract_result.txt"  # 输出文件
    batch_extract_text(input_directory, output_txt)

4.3 代码说明

  1. 函数参数说明input_dir为待处理的文件夹路径,output_file为结果保存的文件路径,supported_extensions为自定义支持的文件后缀列表;
  2. 递归遍历:使用os.walk()函数递归遍历文件夹内的所有文件,包括子文件夹中的文件;
  3. 格式过滤:通过文件后缀过滤不支持的格式,仅处理指定类型的文件;
  4. 结果保存:将每个文件的路径和提取的文本写入输出文件,用分隔符区分不同文件的内容,方便后续查看和分析。

4.4 运行步骤

  1. 创建test_files文件夹,放入各类测试文件(如TXT、PDF、Word、Excel等);
  2. 运行上述代码,程序会自动遍历test_files文件夹;
  3. 查看生成的batch_extract_result.txt文件,即可获取所有文件的文本提取结果。

五、相关资源链接

  • PyPI地址:https://pypi.org/project/textract
  • GitHub地址:https://github.com/deanmalmgren/textract
  • 官方文档地址:https://textract.readthedocs.io/en/stable/

关注我,每天分享一个实用的Python自动化工具。

Python弱监督学习神器:Snorkel 从入门到实战全攻略

一、Snorkel 库核心概述

1.1 用途与工作原理

Snorkel 是一款专为弱监督学习(Weakly Supervised Learning)设计的 Python 库,核心解决机器学习中标注数据稀缺、标注成本高昂的痛点。它允许开发者通过编写简单的标注函数(Labeling Functions, LFs)、集成多个弱监督信号,无需人工标注大量数据,就能快速生成高质量的训练标签,进而训练出性能优异的机器学习模型。

其工作原理可概括为三步:首先,用户针对任务编写多个标注函数,每个函数基于不同的启发式规则、外部知识库或弱监督信号对数据进行标注;其次,Snorkel 的标签模型(Label Model)会自动学习这些标注函数的可靠性权重,解决函数间的冲突与冗余,输出概率化的训练标签;最后,用生成的标签训练下游任务模型(如分类器),完成端到端的弱监督学习流程。

1.2 优缺点分析

优点

  1. 大幅降低标注成本:无需人工标注数千上万条数据,仅需编写少量标注函数即可生成训练标签,效率提升显著。
  2. 灵活性强:支持文本分类、实体识别、图像分类等多种任务,标注函数可灵活结合规则、正则表达式、外部模型等多种信号。
  3. 标签质量可控:标签模型通过学习标注函数的可靠性,有效过滤噪声标签,生成的标签质量优于单一规则标注。
  4. 与主流框架兼容:可无缝对接 Scikit-learn、TensorFlow、PyTorch 等主流机器学习/深度学习框架,适配现有工作流。

缺点

  1. 有一定学习门槛:需要用户理解弱监督学习的核心思想,掌握标注函数的编写逻辑,对新手不够友好。
  2. 标注函数编写依赖领域知识:针对特定任务的标注函数需要结合领域经验,否则可能导致标签质量下降。
  3. 性能受限于标注函数质量:若标注函数设计不合理、覆盖场景不全,最终模型性能会大打折扣。

1.3 License 类型

Snorkel 采用 Apache License 2.0 开源协议,该协议允许用户自由使用、修改、分发源代码,可用于商业项目,仅需保留原作者版权声明和协议文本。

二、Snorkel 安装与环境配置

2.1 安装方式

Snorkel 支持多种安装方式,推荐使用 pip 进行快速安装,同时需确保 Python 版本在 3.7~3.10 之间(版本过高可能存在兼容性问题)。

方法1:PyPI 官方安装

打开命令行终端,执行以下命令:

pip install snorkel

方法2:源码编译安装

若需要使用最新开发版本,可从 GitHub 克隆源码并安装:

# 克隆仓库
git clone https://github.com/snorkel-team/snorkel.git
# 进入仓库目录
cd snorkel
# 安装依赖并编译
pip install -r requirements.txt
pip install -e .

2.2 环境验证

安装完成后,可通过以下 Python 代码验证是否安装成功:

import snorkel
# 打印 Snorkel 版本号
print(f"Snorkel 版本:{snorkel.__version__}")
# 验证核心模块是否可用
from snorkel.labeling import LabelingFunction, LFApplier
print("核心模块导入成功!")

若运行无报错且输出版本号,则说明环境配置完成。

三、Snorkel 核心概念与基础用法

3.1 核心概念解析

在使用 Snorkel 前,需先理解以下几个核心概念,这是构建弱监督学习流程的基础:

  1. 标注函数(Labeling Function, LF):弱监督学习的核心,是用户编写的、基于启发式规则对数据进行标注的函数。每个 LF 可以对数据样本标注正类(1)、负类(0)、弃权(-1)三种标签之一,弃权表示该函数无法判断该样本的类别。
  2. 标签模型(Label Model):Snorkel 的核心组件,用于自动学习多个 LF 的可靠性权重,解决 LF 之间的冲突(如一个 LF 标正类,另一个标负类)和冗余,最终输出每个样本的概率化标签。
  3. 标签应用器(LFApplier):用于将所有标注函数应用到数据集上,生成一个标签矩阵(Label Matrix),矩阵的每一行对应一个样本,每一列对应一个 LF 的标注结果。
  4. 下游任务模型(Downstream Model):使用标签模型生成的标签进行训练的模型,如文本分类器、实体识别器等,可根据任务需求选择传统机器学习模型或深度学习模型。

3.2 基础工作流程

Snorkel 的典型工作流程分为四步:编写标注函数 → 生成标签矩阵 → 训练标签模型 → 训练下游模型。下面以文本情感分类任务为例,详细演示每一步的实现方法。

四、实战:基于 Snorkel 的文本情感分类

本次实战任务为电影评论情感分类,目标是将评论分为正面(1)负面(0)两类。我们将使用 Snorkel 编写标注函数,无需人工标注数据,直接生成训练标签并训练分类器。

4.1 数据集准备

我们使用 Snorkel 内置的小型电影评论数据集,也可替换为自定义数据集。首先导入所需模块并加载数据:

import pandas as pd
from snorkel.datasets import load_movie_reviews

# 加载数据集
train_df, test_df = load_movie_reviews()
# 查看数据集结构
print("训练集样本数:", len(train_df))
print("测试集样本数:", len(test_df))
# 查看前5条训练数据
print(train_df[["text", "sentiment"]].head())

数据集的 text 列是电影评论文本,sentiment 列是真实情感标签(1 为正面,0 为负面),在弱监督学习中,我们不会使用真实标签,仅用于最终测试模型性能。

4.2 编写标注函数

标注函数是弱监督学习的核心,我们需要结合情感分类的任务特点,编写多个基于关键词、正则表达式的 LF。首先定义标签常量:

# 定义标签常量
ABSTAIN = -1
POSITIVE = 1
NEGATIVE = 0

接下来编写 5 个不同的标注函数,分别基于正面关键词、负面关键词、情感强度词、否定词、长度规则进行标注:

标注函数1:基于正面关键词标注

该函数判断评论中是否包含正面关键词(如 “great”、”excellent”、”amazing”),若包含则标注为正面(1),否则弃权(-1)。

from snorkel.labeling import LabelingFunction

# 定义正面关键词列表
positive_keywords = ["great", "excellent", "amazing", "fantastic", "wonderful", "perfect"]

@LabelingFunction()
def lf_positive_keywords(x):
    """基于正面关键词的标注函数"""
    return POSITIVE if any(word in x.text.lower() for word in positive_keywords) else ABSTAIN

标注函数2:基于负面关键词标注

该函数判断评论中是否包含负面关键词(如 “bad”、”terrible”、”awful”),若包含则标注为负面(0),否则弃权(-1)。

# 定义负面关键词列表
negative_keywords = ["bad", "terrible", "awful", "horrible", "disappointing", "worst"]

@LabelingFunction()
def lf_negative_keywords(x):
    """基于负面关键词的标注函数"""
    return NEGATIVE if any(word in x.text.lower() for word in negative_keywords) else ABSTAIN

标注函数3:基于情感强度词标注

该函数判断评论中是否包含强情感词(如 “love”、”hate”),”love” 对应正面,”hate” 对应负面。

@LabelingFunction()
def lf_sentiment_intensity(x):
    """基于情感强度词的标注函数"""
    text = x.text.lower()
    if "love" in text:
        return POSITIVE
    elif "hate" in text:
        return NEGATIVE
    else:
        return ABSTAIN

标注函数4:基于否定词的标注函数

该函数处理包含否定词的情况,如 “not great” 应标注为负面,”not bad” 应标注为正面。

import re

@LabelingFunction()
def lf_negative_expressions(x):
    """基于否定词的标注函数"""
    text = x.text.lower()
    # 匹配 "not + 正面词" 结构
    if re.search(r"not (great|excellent|amazing|good)", text):
        return NEGATIVE
    # 匹配 "not + 负面词" 结构
    elif re.search(r"not (bad|terrible|awful)", text):
        return POSITIVE
    else:
        return ABSTAIN

标注函数5:基于评论长度的标注函数

通常,正面评论可能更长(用户愿意详细分享体验),负面评论可能更短。该函数设定评论长度阈值,长评论标注为正面,短评论标注为负面。

@LabelingFunction()
def lf_review_length(x):
    """基于评论长度的标注函数"""
    # 计算单词数量
    word_count = len(x.text.split())
    if word_count > 50:
        return POSITIVE
    elif word_count < 10:
        return NEGATIVE
    else:
        return ABSTAIN

4.3 生成标签矩阵

编写完标注函数后,需要使用 LFApplier 将这些函数应用到训练数据集上,生成标签矩阵。标签矩阵的形状为 (样本数, 标注函数数),每个元素是对应 LF 对该样本的标注结果。

from snorkel.labeling import LFApplier

# 收集所有标注函数
lfs = [lf_positive_keywords, lf_negative_keywords, lf_sentiment_intensity, lf_negative_expressions, lf_review_length]

# 创建标签应用器
applier = LFApplier(lfs=lfs)

# 应用标注函数到训练集,生成标签矩阵
L_train = applier.apply(df=train_df)

# 查看标签矩阵形状
print("标签矩阵形状:", L_train.shape)
# 查看前5个样本的标注结果
print("前5个样本的标注结果:")
print(L_train[:5])

输出的标签矩阵中,-1 表示弃权,0 表示负面,1 表示正面。例如 [1, -1, 1, -1, 0] 表示第一个 LF 标正面,第二个弃权,第三个标正面,第四个弃权,第五个标负面。

4.4 分析标注函数性能

在训练标签模型前,可通过 Snorkel 提供的工具分析标注函数的性能,包括覆盖率(Coverage)冲突率(Conflict Rate)重叠率(Overlap Rate)

  • 覆盖率:标注函数对多少样本进行了标注(非弃权),覆盖率越高,函数的作用越大。
  • 重叠率:两个标注函数同时对同一个样本标注的比例,重叠率过高可能表示函数冗余。
  • 冲突率:两个标注函数对同一个样本标注不同标签的比例,冲突率过高需要优化标注函数。
from snorkel.labeling import analysis

# 计算标注函数的统计指标
lf_stats = analysis.LFAnalysis(L_train, lfs).lf_stats()
print(lf_stats)

输出结果会展示每个 LF 的覆盖率、重叠率和冲突率,帮助我们筛选和优化标注函数。例如,若某个 LF 的覆盖率极低(如低于 5%),可以考虑删除或修改该函数。

4.5 训练标签模型

标签模型是 Snorkel 的核心,它无需真实标签,仅通过标签矩阵就能学习每个标注函数的可靠性权重,并输出概率化的训练标签。我们使用 LabelModel 类来训练标签模型:

from snorkel.labeling import LabelModel

# 初始化标签模型,设置类别数为2(正面/负面)
label_model = LabelModel(cardinality=2, verbose=True)

# 训练标签模型
label_model.fit(L_train=L_train, n_epochs=500, lr=0.001, log_freq=100)

# 对训练集生成概率化标签
Y_train_probs = label_model.predict_proba(L=L_train)
# 生成硬标签(概率大于0.5为正面,否则为负面)
Y_train_pred = label_model.predict(L=L_train)

# 查看生成的标签形状
print("概率化标签形状:", Y_train_probs.shape)
print("硬标签形状:", Y_train_pred.shape)
# 查看前5个样本的概率化标签和硬标签
print("前5个样本的概率化标签:", Y_train_probs[:5])
print("前5个样本的硬标签:", Y_train_pred[:5])

概率化标签是一个二维数组,每一行对应一个样本,每一列对应一个类别的概率(如 [0.1, 0.9] 表示该样本为正面的概率是 0.9)。硬标签是基于概率的二分类结果,取值为 0 或 1。

4.6 训练下游分类模型

生成训练标签后,我们可以使用这些标签训练下游分类模型。本次实战使用 Scikit-learn 的逻辑回归模型作为下游模型,特征提取使用 TF-IDF 向量化器

步骤1:特征提取

将文本数据转换为 TF-IDF 特征向量:

from sklearn.feature_extraction.text import TfidfVectorizer

# 初始化 TF-IDF 向量化器
vectorizer = TfidfVectorizer(stop_words="english", max_features=10000)

# 对训练集和测试集文本进行特征提取
X_train = vectorizer.fit_transform(train_df["text"])
X_test = vectorizer.transform(test_df["text"])

# 提取测试集真实标签(仅用于评估)
Y_test = test_df["sentiment"].values

print("训练集特征形状:", X_train.shape)
print("测试集特征形状:", X_test.shape)

步骤2:训练下游模型

使用标签模型生成的硬标签训练逻辑回归模型:

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report

# 初始化逻辑回归模型
downstream_model = LogisticRegression(max_iter=1000)

# 使用弱监督标签训练模型
downstream_model.fit(X_train, Y_train_pred)

# 对测试集进行预测
Y_test_pred = downstream_model.predict(X_test)

# 评估模型性能
print(f"测试集准确率:{accuracy_score(Y_test, Y_test_pred):.4f}")
print("\n分类报告:")
print(classification_report(Y_test, Y_test_pred, target_names=["负面", "正面"]))

输出的分类报告将包含准确率、精确率、召回率和 F1 分数等指标。在实际应用中,通过优化标注函数,模型性能可进一步提升。

步骤3:使用概率化标签训练模型(进阶)

除了硬标签,Snorkel 还支持使用概率化标签训练下游模型,这种方式可以保留标签的不确定性,通常能获得更好的性能。对于 Scikit-learn 模型,可通过 class_weight 参数实现;对于深度学习模型,可直接使用概率化标签作为损失函数的输入。

# 使用概率化标签调整类别权重
import numpy as np

# 计算每个样本的权重(概率的绝对值)
sample_weight = np.max(Y_train_probs, axis=1)

# 训练模型时加入样本权重
downstream_model_weighted = LogisticRegression(max_iter=1000)
downstream_model_weighted.fit(X_train, Y_train_pred, sample_weight=sample_weight)

# 评估加权模型性能
Y_test_pred_weighted = downstream_model_weighted.predict(X_test)
print(f"加权模型测试集准确率:{accuracy_score(Y_test, Y_test_pred_weighted):.4f}")

五、进阶应用:实体识别任务

除了文本分类,Snorkel 还广泛应用于命名实体识别(NER)任务。NER 任务需要识别文本中的实体(如人名、地名、机构名),传统方法需要大量人工标注的序列数据,而 Snorkel 可通过编写序列标注函数,快速生成训练标签。

5.1 序列标注函数编写

在 NER 任务中,标注函数的输入是句子中的每个 token(词),输出是该 token 的实体标签(如 PER 表示人名,LOC 表示地名,O 表示非实体)。以下是一个简单的 NER 标注函数示例:

from snorkel.labeling import labeling_function
from snorkel.types import Token

# 定义实体标签常量
PER = 1  # 人名
LOC = 2  # 地名
O = 0    # 非实体

@labeling_function()
def lf_person_names(x: Token) -> int:
    """识别人名的标注函数,基于常见姓氏列表"""
    common_last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones"]
    # 判断 token 是否为大写开头且在姓氏列表中
    if x.text.istitle() and x.text in common_last_names:
        return PER
    return O

@labeling_function()
def lf_location_names(x: Token) -> int:
    """识别地名的标注函数,基于常见地名列表"""
    common_locations = ["New York", "London", "Paris", "Tokyo", "Beijing"]
    # 判断 token 是否为地名的一部分
    if any(location in x.text for location in common_locations):
        return LOC
    return O

5.2 序列标签模型训练

对于序列标注任务,Snorkel 提供了 SequenceLabelModel 类,专门用于处理序列数据的标签生成。其使用流程与文本分类类似,只需将标签应用器替换为 SequenceLFApplier

from snorkel.labeling import SequenceLFApplier, SequenceLabelModel

# 收集序列标注函数
sequence_lfs = [lf_person_names, lf_location_names]

# 创建序列标签应用器
sequence_applier = SequenceLFApplier(lfs=sequence_lfs)

# 应用标注函数到序列数据集,生成序列标签矩阵
L_sequence = sequence_applier.apply(df=sequence_train_df)

# 初始化序列标签模型
sequence_label_model = SequenceLabelModel(cardinality=3, verbose=True)

# 训练模型
sequence_label_model.fit(L_sequence, n_epochs=100, lr=0.01)

# 生成序列标签
Y_sequence_probs = sequence_label_model.predict_proba(L_sequence)

六、Snorkel 与深度学习框架的集成

Snorkel 可无缝对接 TensorFlow、PyTorch 等深度学习框架,用弱监督标签训练深度模型。以下是与 PyTorch 集成的示例,训练一个基于 LSTM 的文本分类模型:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# 定义自定义数据集类
class TextDataset(Dataset):
    def __init__(self, X, Y_probs):
        self.X = torch.tensor(X.toarray(), dtype=torch.float32)
        self.Y_probs = torch.tensor(Y_probs, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.Y_probs[idx]

# 定义 LSTM 分类模型
class LSTMClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        # 调整输入形状为 (batch_size, seq_len, input_dim)
        x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)
        # 取最后一个时间步的输出
        last_out = lstm_out[:, -1, :]
        out = self.fc(last_out)
        return self.softmax(out)

# 准备数据加载器
train_dataset = TextDataset(X_train, Y_train_probs)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# 初始化模型、损失函数和优化器
input_dim = X_train.shape[1]
hidden_dim = 128
output_dim = 2

model = LSTMClassifier(input_dim, hidden_dim, output_dim)
criterion = nn.BCELoss()  # 使用二元交叉熵损失
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}")

# 评估模型
model.eval()
with torch.no_grad():
    X_test_tensor = torch.tensor(X_test.toarray(), dtype=torch.float32)
    Y_test_probs = model(X_test_tensor)
    Y_test_pred = torch.argmax(Y_test_probs, dim=1).numpy()
    print(f"LSTM 模型测试集准确率:{accuracy_score(Y_test, Y_test_pred):.4f}")

七、相关资源链接

  • PyPI 地址:https://pypi.org/project/snorkel
  • Github 地址:https://github.com/snorkel-team/snorkel
  • 官方文档地址:https://snorkel.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。