Python实用工具:Databases库高效操作数据库指南

一、Databases库核心概述

1.1 用途与工作原理

Databases是一款专为Python异步编程设计的数据库操作库,支持PostgreSQL、MySQL、SQLite等主流数据库,可配合异步框架(如FastAPI、Starlette)实现高性能数据库交互。其工作原理是封装不同数据库的异步驱动,提供统一的异步API,避免同步操作阻塞事件循环,提升程序并发处理能力。

1.2 优缺点分析

优点:API简洁统一,适配多种数据库;原生支持异步操作,契合现代异步Web框架;轻量级设计,无冗余依赖;支持SQLAlchemy核心表达式,兼顾灵活性与规范性。
缺点:仅支持异步操作,同步项目中需额外引入异步运行环境;部分高级数据库特性需依赖底层驱动实现;对复杂ORM场景的支持弱于SQLAlchemy。

1.3 License类型

Databases库采用BSD 3-Clause “New” or “Revised” License,这是一种宽松的开源许可证,允许用户自由使用、修改和分发代码,商用场景中只需保留原作者版权声明。

二、Databases库安装与环境准备

2.1 安装命令

Databases库的安装需区分数据库类型,核心库安装命令如下:

pip install databases

安装后需根据目标数据库安装对应的异步驱动,常用驱动安装命令:

  • SQLite(无需额外驱动,内置支持)
  • PostgreSQL
  pip install asyncpg
  • MySQL/MariaDB
  pip install aiomysql

2.2 环境验证

安装完成后,可通过以下代码验证环境是否配置成功(以SQLite为例):

import databases

# 定义SQLite数据库连接URL
DATABASE_URL = "sqlite:///./test.db"
# 初始化数据库连接对象
database = databases.Database(DATABASE_URL)

async def check_connection():
    # 连接数据库
    await database.connect()
    # 验证连接状态
    if database.is_connected:
        print("数据库连接成功!")
    else:
        print("数据库连接失败!")
    # 断开连接
    await database.disconnect()

# 运行异步函数
import asyncio
asyncio.run(check_connection())

代码说明:该脚本初始化SQLite数据库连接,通过connect()disconnect()方法管理连接状态,运行后若输出“数据库连接成功!”,则说明环境配置无误。

三、Databases库核心使用方法

3.1 数据库连接管理

数据库连接的创建与关闭是操作的基础,Databases库提供Database类封装连接逻辑,支持上下文管理器自动管理连接生命周期。

3.1.1 基本连接方式

以MySQL数据库为例,连接代码如下:

import databases
import asyncio

# MySQL数据库连接URL格式:mysql+aiomysql://用户名:密码@主机:端口/数据库名
DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/test_db"
database = databases.Database(DATABASE_URL)

async def basic_connection():
    # 手动连接
    await database.connect()
    print(f"连接状态: {database.is_connected}")
    # 手动断开
    await database.disconnect()
    print(f"连接状态: {database.is_connected}")

asyncio.run(basic_connection())

代码说明:Database类接收数据库连接URL作为参数,connect()方法用于建立连接,disconnect()方法用于关闭连接,is_connected属性可实时查看连接状态。

3.1.2 上下文管理器自动管理连接

使用async with上下文管理器可避免手动管理连接,代码更简洁安全:

async def context_manager_connection():
    async with database:
        print(f"上下文内连接状态: {database.is_connected}")
    # 上下文结束后自动断开连接
    print(f"上下文外连接状态: {database.is_connected}")

asyncio.run(context_manager_connection())

代码说明:进入async with块时自动调用connect(),退出时自动调用disconnect(),即使代码块内抛出异常,也能确保连接正常关闭。

3.2 执行SQL查询语句

Databases库支持直接执行原生SQL语句,涵盖查询、插入、更新、删除等核心操作,所有操作均为异步非阻塞。

3.2.1 创建数据表

在执行数据操作前,需先创建对应的数据表,以创建users表为例:

import databases
import asyncio

DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)

# 定义创建表的SQL语句
CREATE_USERS_TABLE = """
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    age INTEGER
);
"""

async def create_table():
    async with database:
        # 执行创建表的SQL语句
        await database.execute(query=CREATE_USERS_TABLE)
        print("users表创建成功!")

asyncio.run(create_table())

代码说明:execute()方法用于执行无返回结果的SQL语句(如CREATEINSERTUPDATEDELETE),这里通过该方法创建users表,包含id(主键)、nameemail(唯一约束)、age四个字段。

3.2.2 插入数据

插入单条数据和多条数据的方法如下:

# 定义插入单条数据的SQL语句
INSERT_USER = """
INSERT INTO users (name, email, age) VALUES (:name, :email, :age)
"""

# 定义插入多条数据的SQL语句
INSERT_MULTIPLE_USERS = """
INSERT INTO users (name, email, age) VALUES (:name, :email, :age)
"""

async def insert_data():
    async with database:
        # 插入单条数据
        user_id = await database.execute(
            query=INSERT_USER,
            values={"name": "张三", "email": "[email protected]", "age": 25}
        )
        print(f"插入单条数据成功,用户ID: {user_id}")

        # 插入多条数据
        users = [
            {"name": "李四", "email": "[email protected]", "age": 28},
            {"name": "王五", "email": "[email protected]", "age": 30}
        ]
        await database.execute_many(
            query=INSERT_MULTIPLE_USERS,
            values=users
        )
        print("插入多条数据成功!")

asyncio.run(insert_data())

代码说明:

  • execute()方法支持通过values参数传递参数化查询数据,避免SQL注入风险,返回值为插入数据的主键ID。
  • execute_many()方法用于批量插入数据,接收列表形式的参数化数据,适合大批量数据写入场景,提升操作效率。

3.2.3 查询数据

查询数据是最常用的操作,Databases库提供fetch_one()fetch_all()fetch_val()三种方法满足不同查询需求。

# 定义查询单条数据的SQL语句
SELECT_USER_BY_ID = "SELECT * FROM users WHERE id = :id"
# 定义查询所有数据的SQL语句
SELECT_ALL_USERS = "SELECT * FROM users"
# 定义查询用户总数的SQL语句
SELECT_USER_COUNT = "SELECT COUNT(*) FROM users"

async def query_data():
    async with database:
        # 查询单条数据
        user = await database.fetch_one(
            query=SELECT_USER_BY_ID,
            values={"id": 1}
        )
        print(f"单条用户数据: {user}")  # 输出形式为字典:{'id':1, 'name':'张三',...}

        # 查询所有数据
        all_users = await database.fetch_all(query=SELECT_ALL_USERS)
        print("所有用户数据:")
        for u in all_users:
            print(f"ID: {u['id']}, 姓名: {u['name']}, 邮箱: {u['email']}, 年龄: {u['age']}")

        # 查询单个值(用户总数)
        user_count = await database.fetch_val(query=SELECT_USER_COUNT)
        print(f"用户总数: {user_count}")

asyncio.run(query_data())

代码说明:

  • fetch_one():返回查询结果的第一条数据,无结果时返回None,适合根据主键查询单条记录的场景。
  • fetch_all():返回查询结果的所有数据,以列表形式存储,每个元素为字典类型,对应数据表的一行记录。
  • fetch_val():返回查询结果的第一个值,适合统计类查询(如COUNTSUM)。

3.2.4 更新与删除数据

更新和删除数据的操作与插入类似,均通过execute()方法执行对应的SQL语句:

# 定义更新数据的SQL语句
UPDATE_USER_AGE = "UPDATE users SET age = :age WHERE id = :id"
# 定义删除数据的SQL语句
DELETE_USER = "DELETE FROM users WHERE id = :id"

async def update_and_delete_data():
    async with database:
        # 更新数据
        update_rows = await database.execute(
            query=UPDATE_USER_AGE,
            values={"age": 26, "id": 1}
        )
        print(f"更新数据行数: {update_rows}")  # 返回受影响的行数

        # 删除数据
        delete_rows = await database.execute(
            query=DELETE_USER,
            values={"id": 3}
        )
        print(f"删除数据行数: {delete_rows}")

asyncio.run(update_and_delete_data())

代码说明:execute()方法执行更新和删除语句时,返回值为受影响的数据行数,可通过该返回值判断操作是否生效。

3.3 结合SQLAlchemy Core使用

Databases库支持与SQLAlchemy Core结合使用,无需编写原生SQL语句,通过Python对象定义数据表结构和查询逻辑,提升代码的可维护性。

3.3.1 定义数据表模型

首先通过SQLAlchemy Core定义users表模型:

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import select, update, delete, insert
import databases
import asyncio

DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
metadata = MetaData()

# 定义users表模型
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("name", String(50), nullable=False),
    Column("email", String(100), unique=True, nullable=False),
    Column("age", Integer)
)

# 创建数据表(同步操作,适用于初始化)
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)

代码说明:使用SQLAlchemy Core的Table类定义数据表结构,MetaData用于管理数据表元信息,create_all()方法用于同步创建所有定义的数据表。

3.3.2 执行CRUD操作

基于数据表模型执行CRUD操作,无需编写原生SQL:

async def sqlalchemy_crud():
    async with database:
        # 插入数据
        insert_query = users.insert().values(name="赵六", email="[email protected]", age=32)
        user_id = await database.execute(insert_query)
        print(f"插入数据成功,用户ID: {user_id}")

        # 查询数据
        select_query = select(users).where(users.c.id == user_id)
        user = await database.fetch_one(select_query)
        print(f"查询到的用户数据: {user}")

        # 更新数据
        update_query = update(users).where(users.c.id == user_id).values(age=33)
        update_rows = await database.execute(update_query)
        print(f"更新数据行数: {update_rows}")

        # 删除数据
        delete_query = delete(users).where(users.c.id == user_id)
        delete_rows = await database.execute(delete_query)
        print(f"删除数据行数: {delete_rows}")

asyncio.run(sqlalchemy_crud())

代码说明:SQLAlchemy Core提供insert()select()update()delete()等方法构建查询对象,Databases库可直接执行这些查询对象,实现与原生SQL一致的功能,同时提升代码的可读性和可维护性。

四、实际案例:异步用户管理系统

4.1 案例需求

构建一个简单的异步用户管理系统,支持用户的创建、查询、更新和删除操作,配合FastAPI框架实现Web接口(注:FastAPI为异步Web框架,与Databases库适配性极佳)。

4.2 项目结构

user_management_system/
├── main.py
└── test.db

4.3 代码实现

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import databases
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import select

# 配置数据库
DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
metadata = MetaData()

# 定义用户表模型
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("name", String(50), nullable=False),
    Column("email", String(100), unique=True, nullable=False),
    Column("age", Integer)
)

# 创建数据表
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)

# 初始化FastAPI应用
app = FastAPI(title="异步用户管理系统")

# 定义Pydantic数据模型,用于数据验证
class UserCreate(BaseModel):
    name: str
    email: str
    age: int

class UserResponse(UserCreate):
    id: int

    class Config:
        orm_mode = True

# 数据库连接与断开事件
@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

# 创建用户接口
@app.post("/users/", response_model=UserResponse, summary="创建新用户")
async def create_user(user: UserCreate):
    try:
        query = users.insert().values(**user.dict())
        user_id = await database.execute(query)
        return {**user.dict(), "id": user_id}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"创建用户失败: {str(e)}")

# 查询单个用户接口
@app.get("/users/{user_id}", response_model=UserResponse, summary="根据ID查询用户")
async def get_user(user_id: int):
    query = select(users).where(users.c.id == user_id)
    user = await database.fetch_one(query)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return user

# 查询所有用户接口
@app.get("/users/", summary="查询所有用户")
async def get_all_users():
    query = select(users)
    all_users = await database.fetch_all(query)
    return {"users": all_users}

# 更新用户接口
@app.put("/users/{user_id}", summary="更新用户信息")
async def update_user(user_id: int, user: UserCreate):
    query = users.update().where(users.c.id == user_id).values(**user.dict())
    update_rows = await database.execute(query)
    if update_rows == 0:
        raise HTTPException(status_code=404, detail="用户不存在")
    return {"message": "用户信息更新成功"}

# 删除用户接口
@app.delete("/users/{user_id}", summary="删除用户")
async def delete_user(user_id: int):
    query = users.delete().where(users.c.id == user_id)
    delete_rows = await database.execute(query)
    if delete_rows == 0:
        raise HTTPException(status_code=404, detail="用户不存在")
    return {"message": "用户删除成功"}

代码说明:

  1. 该案例结合FastAPI框架实现用户管理系统的Web接口,Pydantic用于请求数据验证和响应数据格式化。
  2. 通过FastAPI的startupshutdown事件,实现应用启动时自动连接数据库,关闭时自动断开连接。
  3. 每个接口对应用户的一种操作,通过Databases库执行SQLAlchemy Core构建的查询对象,实现异步数据库交互。
  4. 加入异常处理逻辑,确保接口返回友好的错误提示。

4.4 运行与测试

  1. 安装依赖:
   pip install databases fastapi uvicorn sqlalchemy pydantic
  1. 启动应用:
   uvicorn main:app --reload
  1. 访问接口文档:打开浏览器访问http://127.0.0.1:8000/docs,可通过自动生成的Swagger文档测试所有接口。

五、相关资源

  • Pypi地址:https://pypi.org/project/Databases
  • Github地址:https://github.com/encode/databases
  • 官方文档地址:https://www.encode.io/databases/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:Prometheus Client 从入门到精通实战教程

Prometheus是一款开源的监控告警系统,而prometheus_client库是Python应用接入Prometheus监控的核心工具,它能让开发者轻松在Python程序中定义、暴露监控指标。其工作原理是通过在代码中实例化不同类型的指标对象,收集数据后以HTTP接口形式暴露,供Prometheus服务器定时拉取。该库遵循Apache License 2.0开源协议,优点是轻量易用、支持多类型指标、与Prometheus生态无缝兼容;缺点是高级功能需结合Prometheus服务端配置,且无内置的数据持久化能力。

一、prometheus_client库核心基础

1.1 库的用途

prometheus_client是Python应用与Prometheus监控系统对接的官方客户端库,主要用于在Python程序中埋点各类监控指标,比如业务指标(接口请求量、订单完成数)、系统指标(CPU使用率、内存占用)、自定义指标(函数执行耗时、任务失败次数)等,这些指标会以标准化格式暴露,供Prometheus采集、存储和分析,最终实现对Python应用的实时监控与告警。

1.2 核心工作原理

  1. 指标定义:开发者在Python代码中创建对应类型的指标实例(如计数器、仪表盘),并为指标添加标签(label)用于区分不同维度的数据。
  2. 指标数据采集:程序运行过程中,通过调用指标实例的方法更新数据(如计数器的inc()方法)。
  3. 指标暴露:通过库提供的HTTP服务,将所有指标数据以Prometheus支持的文本格式暴露在指定端口(默认8000)。
  4. Prometheus拉取数据:Prometheus服务器按照配置的时间间隔,主动从Python应用暴露的接口拉取指标数据,存储到时序数据库中,供后续查询和可视化。

1.3 优缺点分析

| 特性 | 优点 | 缺点 |
||||
| 易用性 | 接口设计简洁,新手可快速上手;支持多种常见指标类型 | 高级监控场景(如分布式追踪)需结合其他工具 |
| 兼容性 | 完美适配Prometheus生态;支持Python 3.6+所有版本 | 无内置数据持久化,指标数据依赖Prometheus拉取 |
| 功能扩展性 | 支持自定义指标类型;可通过标签实现多维度监控 | 指标命名和标签设计不当易导致数据膨胀 |

1.4 开源协议

prometheus_client库采用Apache License 2.0开源协议,这意味着开发者可以自由地使用、修改、分发该库的代码,无论是商业项目还是开源项目,只要遵循协议要求保留原作者的版权声明即可。

二、prometheus_client库安装与环境准备

2.1 安装方法

prometheus_client库已发布到PyPI,支持pip一键安装,适用于所有主流Python环境(Windows、Linux、macOS)。

打开命令行终端,执行以下安装命令:

pip install prometheus-client

安装完成后,可通过以下命令验证是否安装成功:

pip show prometheus-client

若终端输出库的版本号、作者等信息,则说明安装成功。

2.2 环境依赖说明

  • Python版本要求:Python 3.6及以上版本
  • 依赖库:该库无强依赖第三方库,仅依赖Python标准库(如http.server、threading等)
  • 运行环境:可在普通Python脚本、Django/Flask Web应用、Celery任务队列等场景中运行

三、prometheus_client核心指标类型与使用实战

prometheus_client提供了4种核心指标类型,分别对应不同的监控场景,开发者需根据实际需求选择合适的指标类型。

3.1 计数器(Counter):单调递增的指标

Counter是最常用的指标类型,适用于记录只会增加不会减少的数据,比如接口请求次数、任务失败次数、错误发生次数等。Counter的核心方法是inc(),用于将指标值加1;也可通过inc(n)指定增加的数值(n需为正数)。

实战案例:统计接口请求次数

以下代码实现了一个简单的HTTP接口,使用Counter统计接口被访问的总次数,并暴露指标供Prometheus采集。

from prometheus_client import Counter, start_http_server
from http.server import BaseHTTPRequestHandler, HTTPServer
import time

# 1. 定义Counter指标
# 参数说明:
# name: 指标名称,需符合Prometheus命名规范(字母、数字、下划线)
# documentation: 指标描述,用于说明指标含义
# labelnames: 标签列表,用于区分不同维度的数据(可选)
request_counter = Counter(
    'api_requests_total',
    'Total number of API requests',
    labelnames=['method', 'endpoint']
)

# 2. 定义HTTP请求处理器
class SimpleAPIHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 2.1 根据请求路径判断接口
        if self.path == '/hello':
            # 2.2 更新Counter指标:method为GET,endpoint为/hello
            request_counter.labels(method='GET', endpoint='/hello').inc()
            # 2.3 构造响应
            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()
            self.wfile.write(b"Hello, Prometheus!")
        else:
            # 2.4 处理未知接口
            self.send_response(404)
            self.end_headers()
            self.wfile.write(b"404 Not Found")

# 3. 启动Prometheus指标暴露服务
# start_http_server函数会在指定端口启动一个HTTP服务,用于暴露指标
# 端口号可自定义,建议选择未被占用的端口(如8000)
start_http_server(8000)
print("Prometheus metrics server running on port 8000...")

# 4. 启动HTTP接口服务
if __name__ == '__main__':
    server_address = ('', 8080)
    httpd = HTTPServer(server_address, SimpleAPIHandler)
    print("API server running on port 8080...")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    httpd.server_close()

代码运行与验证步骤

  1. 运行上述代码,终端会输出以下信息:
    Prometheus metrics server running on port 8000... API server running on port 8080...
  2. 打开浏览器访问http://localhost:8080/hello,多次刷新页面,模拟接口请求。
  3. 访问http://localhost:8000,可看到暴露的指标数据,其中api_requests_total指标会随着接口访问次数增加而递增,格式如下:
    # HELP api_requests_total Total number of API requests # TYPE api_requests_total counter api_requests_total{endpoint="/hello",method="GET"} 5.0

3.2 仪表盘(Gauge):可增可减的指标

Gauge适用于记录可以增加也可以减少的数据,比如内存占用、CPU使用率、当前在线用户数、队列长度等。Gauge提供了丰富的方法:

  • inc():加1
  • dec():减1
  • set(n):直接设置指标值为n
  • inc_to(n):增加到n(若当前值小于n)
  • dec_to(n):减少到n(若当前值大于n)

实战案例:监控系统内存占用

以下代码使用psutil库获取系统内存占用,并通过Gauge指标暴露给Prometheus。

from prometheus_client import Gauge, start_http_server
import psutil
import time

# 1. 定义Gauge指标:监控系统内存使用率
memory_usage_gauge = Gauge(
    'system_memory_usage_percent',
    'System memory usage percentage'
)

# 2. 定义Gauge指标:监控系统可用内存(单位:MB)
available_memory_gauge = Gauge(
    'system_available_memory_mb',
    'System available memory in megabytes'
)

# 3. 函数:更新内存指标数据
def update_memory_metrics():
    while True:
        # 3.1 获取系统内存信息
        memory_info = psutil.virtual_memory()
        # 3.2 更新内存使用率指标
        memory_usage_gauge.set(memory_info.percent)
        # 3.3 更新可用内存指标(转换为MB)
        available_memory = memory_info.available / 1024 / 1024
        available_memory_gauge.set(available_memory)
        # 3.4 每隔10秒更新一次
        time.sleep(10)

if __name__ == '__main__':
    # 4. 启动指标暴露服务
    start_http_server(8000)
    print("Metrics server running on port 8000...")
    # 5. 启动内存指标更新线程
    update_memory_metrics()

代码说明

  1. 首先导入psutil库(需提前安装:pip install psutil),用于获取系统硬件信息。
  2. 定义两个Gauge指标,分别监控内存使用率和可用内存。
  3. update_memory_metrics函数通过循环获取内存信息,并调用set()方法更新指标值。
  4. 运行代码后,访问http://localhost:8000,可看到实时的内存指标数据。

3.3 直方图(Histogram):统计数据分布

Histogram用于统计数据的分布情况,比如接口响应时间、函数执行耗时等。它会将数据划分到多个区间(bucket),并记录每个区间内的数据数量,同时还会记录数据的总和与总次数。

Histogram的核心参数是buckets,用于定义区间边界,默认区间为[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]

实战案例:统计函数执行耗时分布

以下代码使用Histogram统计函数process_task的执行耗时分布,并暴露指标。

from prometheus_client import Histogram, start_http_server
import time
import random

# 1. 定义Histogram指标
# buckets参数:自定义区间,单位为秒
task_duration_histogram = Histogram(
    'task_process_duration_seconds',
    'Distribution of task processing duration',
    buckets=[0.1, 0.2, 0.5, 1.0, 2.0]
)

# 2. 定义待监控的函数
@task_duration_histogram.time()
def process_task():
    """模拟任务处理函数,耗时随机"""
    duration = random.uniform(0.05, 2.5)
    time.sleep(duration)
    return f"Task completed in {duration:.2f} seconds"

# 3. 模拟任务执行
def run_tasks():
    while True:
        process_task()
        time.sleep(1)

if __name__ == '__main__':
    # 4. 启动指标暴露服务
    start_http_server(8000)
    print("Metrics server running on port 8000...")
    # 5. 运行任务
    run_tasks()

代码说明

  1. 使用@task_duration_histogram.time()装饰器,可自动统计被装饰函数的执行耗时,并更新Histogram指标。
  2. process_task函数通过random.uniform()模拟随机耗时,范围为0.05到2.5秒。
  3. 运行代码后,访问http://localhost:8000,可看到Histogram指标的三个部分:
    • task_process_duration_seconds_bucket{le="0.1"}:耗时≤0.1秒的任务数量
    • task_process_duration_seconds_sum:所有任务的总耗时
    • task_process_duration_seconds_count:任务的总次数

3.4 摘要(Summary):统计数据的分位数

Summary与Histogram类似,都用于统计数据分布,但Summary是直接计算数据的分位数(如中位数、95分位数、99分位数),而不需要预先定义区间。它适用于需要快速了解数据分布特征的场景,比如接口响应时间的P50、P95、P99值。

实战案例:统计接口响应时间分位数

以下代码使用Summary统计HTTP接口的响应时间分位数。

from prometheus_client import Summary, start_http_server
from http.server import BaseHTTPRequestHandler, HTTPServer
import time
import random

# 1. 定义Summary指标
# quantiles参数:指定需要统计的分位数及误差范围
# 例如(0.5, 0.05)表示中位数的误差不超过5%
request_duration_summary = Summary(
    'api_request_duration_seconds',
    'API request duration distribution',
    quantiles={0.5: 0.05, 0.95: 0.01, 0.99: 0.001}
)

# 2. 装饰器:统计函数执行时间
def measure_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start_time
        # 更新Summary指标
        request_duration_summary.observe(duration)
        return result
    return wrapper

# 3. 定义HTTP请求处理器
class APIHandler(BaseHTTPRequestHandler):
    @measure_time
    def do_GET(self):
        if self.path == '/data':
            # 模拟数据处理耗时
            time.sleep(random.uniform(0.01, 0.5))
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(b'{"status": "success", "data": "hello world"}')
        else:
            self.send_response(404)
            self.end_headers()

if __name__ == '__main__':
    # 4. 启动指标暴露服务
    start_http_server(8000)
    print("Metrics server running on port 8000...")
    # 5. 启动HTTP服务
    server = HTTPServer(('', 8080), APIHandler)
    print("API server running on port 8080...")
    server.serve_forever()

代码说明

  1. 定义Summary指标时,通过quantiles参数指定需要统计的分位数:中位数(0.5)、95分位数(0.95)、99分位数(0.99)。
  2. 自定义装饰器measure_time,用于计算函数执行耗时,并调用observe()方法更新Summary指标。
  3. 访问http://localhost:8080/data多次后,访问http://localhost:8000,可看到Summary指标的分位数数据,例如:
    # HELP api_request_duration_seconds API request duration distribution # TYPE api_request_duration_seconds summary api_request_duration_seconds{quantile="0.5"} 0.12 api_request_duration_seconds{quantile="0.95"} 0.45 api_request_duration_seconds{quantile="0.99"} 0.49 api_request_duration_seconds_sum 12.34 api_request_duration_seconds_count 50

四、prometheus_client在Web框架中的集成实战

在实际项目中,Python Web应用(如Flask、Django)是监控的重点场景,以下分别介绍prometheus_client与Flask、Django框架的集成方法。

4.1 与Flask框架集成

Flask是轻量级Web框架,集成prometheus_client只需两步:定义指标、注册指标暴露接口。

实战案例:Flask应用监控

from flask import Flask
from prometheus_client import Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST
import time
import random

app = Flask(__name__)

# 1. 定义监控指标
# 1.1 接口请求次数计数器
flask_request_counter = Counter(
    'flask_requests_total',
    'Total number of Flask requests',
    labelnames=['endpoint', 'method', 'status_code']
)

# 1.2 接口响应时间仪表盘
flask_request_duration_gauge = Gauge(
    'flask_request_duration_seconds',
    'Flask request duration',
    labelnames=['endpoint']
)

# 2. 自定义中间件:统计请求指标
@app.before_request
def before_request():
    g.start_time = time.time()

@app.after_request
def after_request(response):
    # 计算请求耗时
    duration = time.time() - g.start_time
    # 更新响应时间指标
    flask_request_duration_gauge.labels(endpoint=request.endpoint).set(duration)
    # 更新请求次数指标
    flask_request_counter.labels(
        endpoint=request.endpoint,
        method=request.method,
        status_code=response.status_code
    ).inc()
    return response

# 3. 定义业务接口
@app.route('/user/<int:user_id>')
def get_user(user_id):
    # 模拟数据库查询耗时
    time.sleep(random.uniform(0.02, 0.2))
    return {"user_id": user_id, "name": "test_user", "age": 20}

@app.route('/order')
def get_order():
    # 模拟接口耗时
    time.sleep(random.uniform(0.05, 0.3))
    return {"order_id": "123456", "amount": 99.9}

# 4. 暴露Prometheus指标接口
@app.route('/metrics')
def metrics():
    return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

代码说明

  1. 使用before_requestafter_request装饰器,在请求处理前后统计耗时和请求次数。
  2. 注册/metrics接口,通过generate_latest()函数生成Prometheus支持的指标数据格式。
  3. 运行Flask应用后,访问http://localhost:5000/user/1http://localhost:5000/order,再访问http://localhost:5000/metrics即可查看监控指标。

4.2 与Django框架集成

Django是全栈Web框架,集成prometheus_client需要借助中间件和视图函数。

步骤1:定义监控指标

在Django项目的utils/metrics.py文件中定义指标:

from prometheus_client import Counter, Gauge

# 接口请求次数计数器
django_request_counter = Counter(
    'django_requests_total',
    'Total number of Django requests',
    labelnames=['view', 'method', 'status_code']
)

# 接口响应时间仪表盘
django_request_duration_gauge = Gauge(
    'django_request_duration_seconds',
    'Django request duration',
    labelnames=['view']
)

步骤2:编写中间件

middleware.py文件中编写中间件,统计请求指标:

import time
from django.utils.deprecation import MiddlewareMixin
from utils.metrics import django_request_counter, django_request_duration_gauge

class PrometheusMetricsMiddleware(MiddlewareMixin):
    def process_request(self, request):
        request._start_time = time.time()
        return None

    def process_response(self, request, response):
        if hasattr(request, '_start_time'):
            duration = time.time() - request._start_time
            # 获取视图名称
            view_name = request.resolver_match.view_name if request.resolver_match else 'unknown'
            # 更新指标
            django_request_duration_gauge.labels(view=view_name).set(duration)
            django_request_counter.labels(
                view=view_name,
                method=request.method,
                status_code=response.status_code
            ).inc()
        return response

步骤3:注册中间件和指标视图

在项目的settings.py中注册中间件:

MIDDLEWARE = [
    # 其他中间件...
    'middleware.PrometheusMetricsMiddleware',
]

views.py中定义指标暴露视图:

from django.http import HttpResponse
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from django.views.decorators.csrf import csrf_exempt

@csrf_exempt
def metrics(request):
    return HttpResponse(generate_latest(), content_type=CONTENT_TYPE_LATEST)

urls.py中注册URL:

from django.urls import path
from .views import metrics, get_user

urlpatterns = [
    path('metrics/', metrics),
    path('user/<int:user_id>/', get_user),
]

代码说明

  1. 通过Django中间件process_requestprocess_response方法,在请求处理前后统计耗时。
  2. 注册/metrics接口,用于暴露指标数据。
  3. 运行Django应用后,访问业务接口,再访问/metrics即可查看监控数据。

五、实际业务场景综合实战:电商订单监控

以下以电商订单系统为例,展示prometheus_client在实际业务场景中的综合应用,监控指标包括:订单创建次数、订单支付成功率、订单处理耗时等。

5.1 业务场景需求

  1. 统计订单创建的总次数,区分PC端和移动端。
  2. 统计订单支付成功率(支付成功数/订单创建数)。
  3. 统计订单处理的耗时分布。

5.2 代码实现

from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time
import random
import threading

# 1. 定义业务监控指标
# 1.1 订单创建计数器
order_create_counter = Counter(
    'order_create_total',
    'Total number of created orders',
    labelnames=['platform']  # platform: pc/mobile
)

# 1.2 订单支付计数器
order_pay_counter = Counter(
    'order_pay_total',
    'Total number of paid orders',
    labelnames=['platform']
)

# 1.3 订单支付成功率仪表盘
order_pay_success_rate_gauge = Gauge(
    'order_pay_success_rate',
    'Order payment success rate',
    labelnames=['platform']
)

# 1.4 订单处理耗时直方图
order_process_duration_histogram = Histogram(
    'order_process_duration_seconds',
    'Distribution of order processing duration',
    buckets=[0.1, 0.3, 0.5, 1.0]
)

# 2. 模拟订单创建函数
@order_process_duration_histogram.time()
def create_order(platform):
    """创建订单,返回订单ID"""
    # 模拟订单处理耗时
    time.sleep(random.uniform(0.05, 0.8))
    order_id = f"ORD{int(time.time() * 1000)}{random.randint(100, 999)}"
    # 更新订单创建计数器
    order_create_counter.labels(platform=platform).inc()
    print(f"Created order {order_id} on {platform} platform")
    return order_id

# 3. 模拟订单支付函数
def pay_order(platform, order_id):
    """支付订单,模拟支付成功率"""
    pay_success = random.random() > 0.2  # 80%支付成功率
    if pay_success:
        order_pay_counter.labels(platform=platform).inc()
        print(f"Order {order_id} paid successfully")
    else:
        print(f"Order {order_id} payment failed")
    return pay_success

# 4. 计算支付成功率
def calculate_pay_success_rate():
    while True:
        for platform in ['pc', 'mobile']:
            # 获取订单创建数和支付数
            create_count = order_create_counter.labels(platform=platform)._value.get()
            pay_count = order_pay_counter.labels(platform=platform)._value.get()
            # 计算成功率
            if create_count > 0:
                success_rate = pay_count / create_count
                order_pay_success_rate_gauge.labels(platform=platform).set(success_rate)
        time.sleep(10)

# 5. 模拟业务运行
def run_business():
    platforms = ['pc', 'mobile']
    while True:
        platform = random.choice(platforms)
        order_id = create_order(platform)
        # 模拟支付延迟
        time.sleep(random.uniform(1, 3))
        pay_order(platform, order_id)
        time.sleep(1)

if __name__ == '__main__':
    # 启动指标暴露服务
    start_http_server(8000)
    print("Metrics server running on port 8000...")

    # 启动支付成功率计算线程
    rate_thread = threading.Thread(target=calculate_pay_success_rate, daemon=True)
    rate_thread.start()

    # 启动业务线程
    business_thread = threading.Thread(target=run_business, daemon=True)
    business_thread.start()

    # 主线程保持运行
    while True:
        time.sleep(1)

代码说明

  1. 定义了4个业务指标,覆盖订单创建、支付、成功率和处理耗时。
  2. create_order函数使用Histogram装饰器自动统计处理耗时,同时更新订单创建计数器。
  3. calculate_pay_success_rate函数在独立线程中运行,每隔10秒计算一次支付成功率,并更新Gauge指标。
  4. 运行代码后,访问http://localhost:8000可查看所有业务指标数据,这些数据可用于Prometheus监控面板展示,例如:
    • 通过order_create_total查看不同平台的订单创建趋势
    • 通过order_pay_success_rate监控支付成功率,当低于阈值时触发告警
    • 通过order_process_duration_seconds分析订单处理耗时的分布情况

六、相关资源地址

  • PyPI地址:https://pypi.org/project/prometheus-client
  • Github地址:https://github.com/prometheus/client_python
  • 官方文档地址:https://prometheus.github.io/client_python/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:s3transfer 高效管理AWS S3文件传输的指南

一、s3transfer 库核心概述

s3transfer 是 AWS 官方推出的一款 Python 库,专门用于高效、可靠地处理与 Amazon S3 存储服务之间的文件传输操作。其工作原理是基于分块上传/下载、并发处理和重试机制,将大文件拆分为多个小块并行传输,同时支持断点续传,极大提升了传输效率和稳定性。

该库的优点十分突出:支持大文件分块传输、并发任务调度、自动重试失败请求、与 AWS SDK for Python(boto3)深度兼容;缺点则是功能高度聚焦于 S3 传输,不支持其他云存储服务,且需要依赖 boto3 配置 AWS 凭证。s3transfer 的开源协议为 Apache License 2.0,允许商业和非商业用途的自由使用、修改和分发。

二、s3transfer 安装与环境准备

2.1 安装方式

s3transfer 通常与 boto3 配套使用,因为它依赖 boto3 提供的 AWS 客户端和凭证管理功能。我们可以通过 Python 包管理工具 pip 直接安装,安装命令如下:

pip install s3transfer boto3

执行上述命令后,pip 会自动下载并安装 s3transfer 及其依赖的 boto3、botocore 等库,满足后续开发的环境需求。

2.2 AWS 凭证配置

要使用 s3transfer 操作 S3 存储桶,必须先配置 AWS 访问凭证,这是与 AWS 服务建立连接的前提。常见的配置方式有两种:

  1. 环境变量配置
    在系统环境变量中设置 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY,这两个值可以从 AWS 控制台的 IAM 服务中获取。以 Linux/macOS 系统为例,配置命令如下:
    bash export AWS_ACCESS_KEY_ID="your-access-key-id" export AWS_SECRET_ACCESS_KEY="your-secret-access-key"
    Windows 系统则可以通过“系统属性-高级-环境变量”界面添加对应的环境变量。
  2. 配置文件配置
    在用户主目录下创建 .aws 文件夹,并在其中新建 credentials 文件,文件内容格式如下:
    ini

[default]

aws_access_key_id = your-access-key-id aws_secret_access_key = your-secret-access-key
同时,还可以在 .aws 文件夹下创建 config 文件,设置默认的 AWS 区域:
ini

[default]

region = us-east-1
两种配置方式任选其一即可,配置完成后,s3transfer 会自动读取凭证信息,无需在代码中硬编码,保证了凭证的安全性。

三、s3transfer 核心功能与代码实例

s3transfer 的核心功能围绕 S3 的文件上传、下载、批量操作展开,其 API 设计简洁易懂,即使是 Python 新手也能快速上手。下面我们结合具体的代码实例,详细讲解每个功能的使用方法。

3.1 基本文件上传

基本文件上传适用于小文件的传输场景,s3transfer 会直接将文件内容发送到 S3 存储桶。在代码实现中,我们需要先通过 boto3 创建 S3 客户端,再利用 s3transfer 的 TransferManager 类来管理传输任务。

import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError

# 创建 boto3 S3 客户端
s3_client = boto3.client('s3')

# 初始化 TransferManager
transfer_manager = TransferManager(s3_client)

# 定义本地文件路径和 S3 存储桶及目标路径
local_file_path = 'test_file.txt'
bucket_name = 'your-s3-bucket-name'
s3_key = 'upload/test_file.txt'

try:
    # 执行文件上传任务
    future = transfer_manager.upload(local_file_path, bucket_name, s3_key)
    # 等待上传任务完成
    future.result()
    print(f"文件 {local_file_path} 成功上传到 S3: s3://{bucket_name}/{s3_key}")
except TransferFailedError as e:
    print(f"文件上传失败: {str(e)}")
finally:
    # 关闭 TransferManager,释放资源
    transfer_manager.shutdown()

代码说明

  • 首先导入所需的库和异常类,TransferManager 是 s3transfer 的核心类,负责任务的调度和执行;TransferFailedError 用于捕获传输过程中可能出现的异常。
  • 通过 boto3.client('s3') 创建 S3 客户端,客户端会自动读取我们之前配置的 AWS 凭证。
  • 初始化 TransferManager 后,调用 upload 方法,传入本地文件路径、S3 存储桶名称和目标键(即文件在 S3 中的路径),该方法会返回一个 Future 对象。
  • 调用 future.result() 会阻塞当前线程,直到上传任务完成,这样可以确保我们能获取到上传的最终状态。
  • 最后在 finally 块中调用 transfer_manager.shutdown(),关闭 TransferManager,释放占用的系统资源,这是一个良好的编程习惯,避免资源泄露。

3.2 大文件分块上传

当传输的文件体积较大(比如超过 100MB)时,使用基本上传方式效率较低,且容易因为网络波动导致传输失败。此时,我们可以利用 s3transfer 的分块上传功能,将大文件拆分为多个小块(默认块大小为 8MB),并行上传到 S3,同时支持断点续传。

import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError

# 创建 S3 客户端
s3_client = boto3.client('s3')

# 配置 TransferManager 的分块上传参数
transfer_config = {
    'multipart_threshold': 10 * 1024 * 1024,  # 超过 10MB 的文件自动分块
    'multipart_chunksize': 5 * 1024 * 1024    # 每个分块的大小为 5MB
}

# 初始化 TransferManager 并传入配置参数
transfer_manager = TransferManager(s3_client, config=transfer_config)

# 定义大文件路径和 S3 目标路径
local_large_file = 'large_data.zip'
bucket_name = 'your-s3-bucket-name'
s3_large_key = 'upload/large_data.zip'

try:
    future = transfer_manager.upload(local_large_file, bucket_name, s3_large_key)
    future.result()
    print(f"大文件 {local_large_file} 成功分块上传到 S3")
except TransferFailedError as e:
    print(f"大文件上传失败: {str(e)}")
finally:
    transfer_manager.shutdown()

代码说明

  • 我们通过一个字典 transfer_config 来配置分块传输的参数,multipart_threshold 表示当文件大小超过该值时,自动启用分块上传;multipart_chunksize 定义了每个分块的大小。
  • 将配置参数传入 TransferManager 的构造函数,这样 TransferManager 就会按照我们的配置来处理大文件传输。
  • 分块上传的 API 调用方式与基本上传完全一致,TransferManager 会自动判断文件大小,选择合适的传输方式,对开发者来说是透明的,极大降低了使用门槛。

3.3 文件下载

文件下载的使用方法与上传类似,TransferManager 提供了 download 方法,支持从 S3 存储桶下载文件到本地。同样支持小文件直接下载和大文件分块下载,无需额外配置,TransferManager 会自动处理。

import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError

s3_client = boto3.client('s3')
transfer_manager = TransferManager(s3_client)

# 定义 S3 源文件和本地目标路径
bucket_name = 'your-s3-bucket-name'
s3_source_key = 'upload/test_file.txt'
local_download_path = 'downloaded_test_file.txt'

try:
    future = transfer_manager.download(bucket_name, s3_source_key, local_download_path)
    future.result()
    print(f"文件成功从 S3 下载到本地: {local_download_path}")
except TransferFailedError as e:
    print(f"文件下载失败: {str(e)}")
finally:
    transfer_manager.shutdown()

代码说明

  • download 方法的参数顺序与 upload 相反,第一个参数是 S3 存储桶名称,第二个参数是文件在 S3 中的键,第三个参数是本地目标路径。
  • 其他代码逻辑与上传功能一致,通过 future.result() 等待下载完成,捕获 TransferFailedError 处理异常,最后关闭 TransferManager

3.4 批量文件传输

在实际开发中,我们经常需要批量上传或下载多个文件,s3transfer 支持通过循环调用 uploaddownload 方法来实现批量操作,结合 concurrent.futures 模块,还可以进一步提升批量操作的效率。

import os
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError

# 创建 S3 客户端
s3_client = boto3.client('s3')
transfer_manager = TransferManager(s3_client)

# 定义批量上传的本地文件夹和 S3 目标存储桶
local_folder = 'batch_upload_files'
bucket_name = 'your-s3-bucket-name'
s3_prefix = 'batch_upload/'

# 遍历本地文件夹中的所有文件
try:
    futures = []
    for filename in os.listdir(local_folder):
        local_file_path = os.path.join(local_folder, filename)
        # 跳过文件夹,只处理文件
        if os.path.isfile(local_file_path):
            s3_key = os.path.join(s3_prefix, filename)
            future = transfer_manager.upload(local_file_path, bucket_name, s3_key)
            futures.append(future)

    # 等待所有上传任务完成
    for future in futures:
        future.result()
    print("所有文件批量上传完成!")
except TransferFailedError as e:
    print(f"批量上传过程中出现错误: {str(e)}")
except Exception as e:
    print(f"未知错误: {str(e)}")
finally:
    transfer_manager.shutdown()

代码说明

  • 首先通过 os.listdir 遍历本地文件夹中的所有文件,使用 os.path.isfile 判断当前路径是否为文件,避免处理文件夹。
  • 对于每个文件,构造其本地路径和 S3 目标键,调用 upload 方法并将返回的 Future 对象添加到列表中。
  • 循环遍历 Future 对象列表,调用 result() 方法等待所有任务完成,这样可以实现多个文件的并行上传,提升批量操作的效率。
  • 除了批量上传,批量下载的实现逻辑类似,只需要将 upload 方法替换为 download 方法,遍历 S3 存储桶中的文件列表即可。

3.5 传输进度监控

在传输大文件时,我们往往需要了解实时的传输进度,s3transfer 支持通过回调函数来实现进度监控。我们可以自定义一个回调函数,在每次传输完一个分块后,更新并打印传输进度。

import os
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError

# 自定义进度回调函数
class ProgressCallback:
    def __init__(self, file_size):
        self.file_size = file_size
        self.transferred = 0

    def __call__(self, bytes_transferred):
        self.transferred += bytes_transferred
        progress = (self.transferred / self.file_size) * 100
        print(f"传输进度: {progress:.2f}% ({self.transferred}/{self.file_size} bytes)", end='\r')

# 创建 S3 客户端
s3_client = boto3.client('s3')
transfer_manager = TransferManager(s3_client)

# 定义文件路径
local_file = 'large_data.zip'
bucket_name = 'your-s3-bucket-name'
s3_key = 'upload/large_data.zip'

# 获取本地文件大小
file_size = os.path.getsize(local_file)
# 初始化进度回调对象
progress_callback = ProgressCallback(file_size)

try:
    future = transfer_manager.upload(
        local_file,
        bucket_name,
        s3_key,
        callback=progress_callback
    )
    future.result()
    print("\n文件上传完成!")
except TransferFailedError as e:
    print(f"\n文件上传失败: {str(e)}")
finally:
    transfer_manager.shutdown()

代码说明

  • 我们定义了一个 ProgressCallback 类,其构造函数接收文件的总大小,__call__ 方法是回调函数的核心,每次被调用时会接收已传输的字节数,并计算当前的传输进度。
  • end='\r' 用于实现进度条的单行刷新,避免打印过多的换行符,提升用户体验。
  • 在调用 upload 方法时,通过 callback 参数传入进度回调对象,这样 s3transfer 会在传输过程中定期调用该回调函数,实时更新传输进度。
  • 进度监控功能同样适用于下载操作,只需要在 download 方法中传入回调函数即可。

四、s3transfer 高级配置与优化

为了进一步提升 s3transfer 的传输性能,我们可以对其进行高级配置,比如调整并发数、设置超时时间、修改分块大小等。下面我们介绍几种常见的优化方式。

4.1 调整并发数

s3transfer 的 TransferManager 支持通过 max_request_concurrency 参数调整并发请求数,并发数越高,传输速度越快,但同时也会占用更多的系统资源和网络带宽。我们可以根据实际的网络环境和硬件配置,合理调整该参数。

import boto3
from s3transfer import TransferManager

s3_client = boto3.client('s3')

# 配置最大并发请求数为 10
transfer_config = {
    'max_request_concurrency': 10
}

transfer_manager = TransferManager(s3_client, config=transfer_config)
# 后续传输逻辑与之前一致
transfer_manager.shutdown()

4.2 设置超时时间

在网络不稳定的环境下,我们可以通过设置超时时间,避免传输任务长时间阻塞。超时时间可以通过 boto3 客户端的配置来实现。

import boto3
from s3transfer import TransferManager

# 创建 S3 客户端时设置超时时间
config = boto3.session.Config(
    connect_timeout=30,  # 连接超时时间 30 秒
    read_timeout=60      # 读取超时时间 60 秒
)
s3_client = boto3.client('s3', config=config)

transfer_manager = TransferManager(s3_client)
# 后续传输逻辑与之前一致
transfer_manager.shutdown()

4.3 自定义重试策略

s3transfer 内置了重试机制,当传输请求失败时,会自动重试。我们可以通过修改 botocore 的重试配置,来自定义重试的次数和间隔时间。

import boto3
from botocore.config import Config
from s3transfer import TransferManager

# 自定义重试配置
retry_config = Config(
    retries={
        'max_attempts': 5,  # 最大重试次数
        'mode': 'standard'  # 重试模式,standard 表示标准重试
    }
)
s3_client = boto3.client('s3', config=retry_config)

transfer_manager = TransferManager(s3_client)
# 后续传输逻辑与之前一致
transfer_manager.shutdown()

五、s3transfer 实际应用案例:S3 文件备份工具

结合前面所学的知识,我们可以开发一个简单的 S3 文件备份工具,该工具能够将指定本地文件夹中的所有文件备份到 S3 存储桶,并支持进度监控和异常处理。

import os
import argparse
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError

class S3BackupTool:
    def __init__(self, bucket_name, aws_region=None):
        self.bucket_name = bucket_name
        # 创建 S3 客户端
        client_config = {}
        if aws_region:
            client_config['region_name'] = aws_region
        self.s3_client = boto3.client('s3',** client_config)
        self.transfer_manager = TransferManager(self.s3_client)

    class ProgressMonitor:
        def __init__(self, total_size):
            self.total_size = total_size
            self.transferred = 0

        def __call__(self, bytes_trans):
            self.transferred += bytes_trans
            progress = (self.transferred / self.total_size) * 100
            print(f"备份进度: {progress:.2f}% ({self.transferred}/{self.total_size} bytes)", end='\r')

    def backup_folder(self, local_folder, s3_prefix='backup/'):
        """备份本地文件夹到 S3 存储桶"""
        if not os.path.isdir(local_folder):
            raise ValueError(f"本地文件夹不存在: {local_folder}")

        # 计算本地文件夹总大小
        total_size = 0
        for root, dirs, files in os.walk(local_folder):
            for file in files:
                file_path = os.path.join(root, file)
                total_size += os.path.getsize(file_path)

        progress_monitor = self.ProgressMonitor(total_size)
        futures = []

        try:
            # 遍历文件夹,上传所有文件
            for root, dirs, files in os.walk(local_folder):
                for file in files:
                    local_file_path = os.path.join(root, file)
                    # 构造 S3 键,保留本地文件夹结构
                    relative_path = os.path.relpath(local_file_path, local_folder)
                    s3_key = os.path.join(s3_prefix, relative_path)

                    future = self.transfer_manager.upload(
                        local_file_path,
                        self.bucket_name,
                        s3_key,
                        callback=progress_monitor
                    )
                    futures.append(future)

            # 等待所有任务完成
            for future in futures:
                future.result()
            print("\n文件夹备份完成!")
        except TransferFailedError as e:
            print(f"\n备份过程中出现错误: {str(e)}")
            raise
        finally:
            self.transfer_manager.shutdown()

if __name__ == '__main__':
    # 使用 argparse 解析命令行参数
    parser = argparse.ArgumentParser(description='本地文件夹备份到 AWS S3 工具')
    parser.add_argument('--local-folder', required=True, help='需要备份的本地文件夹路径')
    parser.add_argument('--bucket-name', required=True, help='目标 S3 存储桶名称')
    parser.add_argument('--region', help='AWS 区域名称,如 us-east-1')
    args = parser.parse_args()

    # 初始化备份工具并执行备份
    backup_tool = S3BackupTool(args.bucket_name, args.region)
    backup_tool.backup_folder(args.local_folder)

案例说明

  • 该工具封装为 S3BackupTool 类,通过命令行参数接收本地文件夹路径、S3 存储桶名称和 AWS 区域,使用 argparse 模块解析命令行参数,提升工具的易用性。
  • backup_folder 方法是工具的核心,首先计算本地文件夹的总大小,用于进度监控;然后通过 os.walk 遍历文件夹中的所有文件,保留文件的相对路径结构,确保备份到 S3 后的文件结构与本地一致。
  • 集成了进度监控功能,实时显示备份进度;同时捕获 TransferFailedError 异常,处理传输过程中可能出现的错误。
  • 运行该工具时,可以在命令行中输入如下命令:
  python s3_backup_tool.py --local-folder ./my_files --bucket-name my-backup-bucket --region us-east-1

六、相关资源

  • Pypi地址:https://pypi.org/project/s3transfer
  • Github地址:https://github.com/boto/s3transfer
  • 官方文档地址:https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-transfer.html

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:Motor——异步MongoDB操作的高效解决方案

一、Motor库核心概述

Motor是Python中专门用于异步操作MongoDB数据库的第三方库,它基于PyMongo开发,充分兼容asyncio异步框架,能够让开发者在异步程序中以非阻塞的方式完成MongoDB的增删改查等操作。其工作原理是将PyMongo的同步操作封装为异步协程,借助事件循环实现并发任务处理,避免同步IO操作带来的程序阻塞。

该库的优点在于:完美契合异步编程场景,提升高并发下数据库操作的效率;API设计与PyMongo高度相似,降低开发者的学习迁移成本;支持MongoDB的大部分核心功能,包括索引操作、聚合查询等。缺点则是仅适用于异步项目,同步项目中使用反而会增加复杂度;对MongoDB新版本特性的支持可能存在一定延迟。

Motor的开源协议为Apache License 2.0,这是一个对商业使用友好的开源协议,允许开发者自由修改、分发代码,且无需承担开源义务。

二、Motor库的安装步骤

在使用Motor之前,我们需要先完成库的安装,同时确保本地环境已经安装并启动了MongoDB服务,且Python版本不低于3.6(asyncio特性支持的最低版本)。

2.1 使用pip安装Motor

打开命令行终端,输入以下命令即可完成安装:

pip install motor

这条命令会从PyPI官方源下载并安装最新版本的Motor库,安装完成后,我们就可以在Python异步项目中导入并使用它。

2.2 验证安装是否成功

安装完成后,可以通过以下简单的代码片段验证Motor是否安装成功:

import motor
print(f"Motor库版本:{motor.__version__}")

运行上述代码,如果终端能够正常输出Motor的版本号,说明安装成功;若提示ModuleNotFoundError,则需要检查pip命令是否执行正确,或者Python环境是否存在冲突。

三、Motor库的核心使用方式

Motor的核心操作围绕AsyncIOMotorClient展开,这是Motor提供的异步客户端类,通过它我们可以连接MongoDB数据库、获取集合对象,并执行各类异步数据库操作。以下将详细讲解连接数据库、集合操作、数据增删改查等核心功能,并提供对应的实例代码。

3.1 连接MongoDB数据库

使用Motor连接MongoDB的方式与PyMongo类似,区别在于Motor的客户端是异步的,所有操作都需要使用await关键字。

3.1.1 基础连接示例

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def connect_to_mongodb():
    # 创建异步MongoDB客户端
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    # 验证连接是否成功
    await client.admin.command('ping')
    print("成功连接到MongoDB数据库!")
    # 指定要操作的数据库
    db = client['test_database']
    return db

if __name__ == '__main__':
    # 运行异步函数
    db = asyncio.run(connect_to_mongodb())

代码说明

  1. 首先导入asynciomotor.motor_asyncio中的AsyncIOMotorClient类;
  2. 定义异步函数connect_to_mongodb,在函数内部创建客户端对象,传入MongoDB的连接地址(本地默认地址为mongodb://localhost:27017/);
  3. 通过client.admin.command('ping')验证连接,该操作需要使用await关键字等待执行完成;
  4. 最后指定要操作的数据库test_database,并返回数据库对象。

3.1.2 带认证信息的连接

如果MongoDB设置了用户名和密码,连接时需要传入认证参数:

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def connect_with_auth():
    # 带用户名和密码的连接字符串格式:mongodb://用户名:密码@地址:端口/
    client = AsyncIOMotorClient('mongodb://root:123456@localhost:27017/')
    await client.admin.command('ping')
    print("带认证信息连接成功!")
    return client['test_database']

if __name__ == '__main__':
    db = asyncio.run(connect_with_auth())

代码说明:连接字符串中加入了用户名root和密码123456,适用于开启了身份验证的MongoDB环境。

3.2 集合的基本操作

在MongoDB中,集合相当于关系型数据库中的表,Motor通过db.集合名的方式获取集合对象,支持集合的创建、删除、查询存在性等操作。

3.2.1 获取集合并查询集合列表

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def collection_operations():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']

    # 获取集合对象
    collection = db['test_collection']
    print("获取集合对象成功!")

    # 查询数据库中所有的集合名称
    collection_list = await db.list_collection_names()
    print(f"数据库中的集合列表:{collection_list}")

    # 判断集合是否存在
    is_exist = 'test_collection' in collection_list
    print(f"test_collection是否存在:{is_exist}")

if __name__ == '__main__':
    asyncio.run(collection_operations())

代码说明

  1. 通过db['test_collection']获取集合对象,也可以使用db.test_collection的方式;
  2. db.list_collection_names()是异步方法,需要await关键字,用于获取当前数据库下的所有集合名称;
  3. 通过判断集合名是否在列表中,确认集合是否存在。

3.2.2 删除集合

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def drop_collection():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 删除集合
    await collection.drop()
    print("集合删除成功!")

    # 验证删除结果
    collection_list = await db.list_collection_names()
    print(f"删除后集合列表:{collection_list}")

if __name__ == '__main__':
    asyncio.run(drop_collection())

代码说明:调用集合对象的drop()方法可以删除指定集合,该方法为异步操作,需要await关键字。

3.3 数据的增删改查操作

数据操作是Motor的核心功能,包括插入数据、查询数据、更新数据和删除数据,所有操作均为异步协程,需要结合await关键字使用。

3.3.1 插入数据

Motor支持插入单条数据和多条数据,对应的方法分别是insert_one()insert_many()

插入单条数据
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def insert_single_data():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 定义要插入的数据
    data = {
        'name': '张三',
        'age': 25,
        'gender': '男',
        'hobbies': ['篮球', '编程']
    }

    # 插入单条数据
    result = await collection.insert_one(data)
    print(f"插入数据的ID:{result.inserted_id}")

if __name__ == '__main__':
    asyncio.run(insert_single_data())

代码说明

  1. 定义一个字典类型的数据,符合MongoDB的文档格式;
  2. 调用insert_one()方法插入数据,该方法返回一个InsertOneResult对象;
  3. 通过result.inserted_id可以获取插入数据的唯一ID(ObjectId)。
插入多条数据
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def insert_multiple_data():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 定义多条数据
    data_list = [
        {'name': '李四', 'age': 22, 'gender': '女'},
        {'name': '王五', 'age': 28, 'gender': '男'},
        {'name': '赵六', 'age': 30, 'gender': '男'}
    ]

    # 插入多条数据
    result = await collection.insert_many(data_list)
    print(f"插入数据的ID列表:{result.inserted_ids}")

if __name__ == '__main__':
    asyncio.run(insert_multiple_data())

代码说明

  1. 定义一个包含多个字典的列表,作为要插入的多条数据;
  2. 调用insert_many()方法插入数据,返回InsertManyResult对象;
  3. 通过result.inserted_ids获取所有插入数据的ID列表。

3.3.2 查询数据

查询数据是MongoDB的核心功能之一,Motor提供了find()find_one()方法,分别用于查询多条数据和单条数据,支持条件过滤、字段投影、排序、分页等操作。

查询单条数据
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def find_single_data():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 查询单条数据:查询name为张三的文档
    data = await collection.find_one({'name': '张三'})
    if data:
        print(f"查询到的数据:{data}")
    else:
        print("未查询到对应数据")

if __name__ == '__main__':
    asyncio.run(find_single_data())

代码说明find_one()方法接收一个查询条件字典,返回符合条件的第一条文档,如果没有符合条件的文档,则返回None

查询多条数据
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def find_multiple_data():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 查询多条数据:查询age大于25的文档
    cursor = collection.find({'age': {'$gt': 25}})
    # 遍历游标获取数据
    async for data in cursor:
        print(f"查询到的数据:{data}")

if __name__ == '__main__':
    asyncio.run(find_multiple_data())

代码说明

  1. find()方法接收查询条件字典,返回一个异步游标对象(AsyncIOMotorCursor);
  2. 使用async for循环遍历游标,获取所有符合条件的文档;
  3. 查询条件中使用了MongoDB的查询操作符$gt(大于),类似的还有$lt(小于)、$eq(等于)等。
条件过滤、排序与分页
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def find_data_with_filter():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 1. 条件过滤:查询gender为男,且age在20-30之间的文档
    query = {
        'gender': '男',
        'age': {'$gte': 20, '$lte': 30}
    }
    # 2. 字段投影:只返回name、age字段,不返回_id字段
    projection = {'_id': 0, 'name': 1, 'age': 1}
    # 3. 排序:按age降序排列
    sort = [('age', -1)]
    # 4. 分页:跳过前1条数据,获取2条数据
    skip = 1
    limit = 2

    cursor = collection.find(query, projection).sort(sort).skip(skip).limit(limit)
    async for data in cursor:
        print(f"过滤后的数据:{data}")

if __name__ == '__main__':
    asyncio.run(find_data_with_filter())

代码说明

  1. query字典定义查询条件,使用$gte(大于等于)和$lte(小于等于)操作符限定age范围;
  2. projection字典定义返回的字段,1表示返回,0表示不返回;
  3. sort()方法接收排序规则列表,-1表示降序,1表示升序;
  4. skip()方法用于跳过指定数量的文档,limit()方法用于限制返回的文档数量,实现分页功能。

3.3.3 更新数据

Motor支持更新单条数据和多条数据,对应的方法是update_one()update_many(),更新操作需要使用MongoDB的更新操作符。

更新单条数据
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def update_single_data():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 查询条件:name为张三
    query = {'name': '张三'}
    # 更新内容:将age增加1,添加city字段
    update = {
        '$inc': {'age': 1},
        '$set': {'city': '北京'}
    }

    result = await collection.update_one(query, update)
    print(f"匹配的文档数量:{result.matched_count}")
    print(f"修改的文档数量:{result.modified_count}")

if __name__ == '__main__':
    asyncio.run(update_single_data())

代码说明

  1. query字典定义要更新的文档条件;
  2. update字典使用更新操作符$inc(增加数值)和$set(设置字段值)定义更新内容;
  3. update_one()方法只更新符合条件的第一条文档,返回UpdateResult对象,通过matched_countmodified_count查看匹配和修改的文档数量。
更新多条数据
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def update_multiple_data():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 查询条件:gender为男
    query = {'gender': '男'}
    # 更新内容:设置city为上海
    update = {'$set': {'city': '上海'}}

    result = await collection.update_many(query, update)
    print(f"匹配的文档数量:{result.matched_count}")
    print(f"修改的文档数量:{result.modified_count}")

if __name__ == '__main__':
    asyncio.run(update_multiple_data())

代码说明update_many()方法会更新所有符合条件的文档,适用于批量更新场景。

3.3.4 删除数据

删除数据的方法包括delete_one()delete_many(),分别用于删除单条和多条符合条件的文档。

删除单条数据
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def delete_single_data():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 查询条件:name为赵六
    query = {'name': '赵六'}
    result = await collection.delete_one(query)
    print(f"删除的文档数量:{result.deleted_count}")

if __name__ == '__main__':
    asyncio.run(delete_single_data())

代码说明delete_one()方法删除符合条件的第一条文档,返回DeleteResult对象,通过deleted_count查看删除的文档数量。

删除多条数据
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def delete_multiple_data():
    client = AsyncIOMotorClient('mongodb://localhost:27017/')
    db = client['test_database']
    collection = db['test_collection']

    # 查询条件:age小于25
    query = {'age': {'$lt': 25}}
    result = await collection.delete_many(query)
    print(f"删除的文档数量:{result.deleted_count}")

if __name__ == '__main__':
    asyncio.run(delete_multiple_data())

代码说明delete_many()方法删除所有符合条件的文档,适用于批量删除场景,使用时需要谨慎,避免误删数据。

四、Motor库的实际应用案例

下面我们结合一个异步Web服务的场景,展示Motor库的实际应用。我们将使用FastAPI框架搭建一个简单的用户信息管理接口,实现用户信息的增删改查,所有数据库操作均通过Motor完成。

4.1 环境准备

首先安装FastAPI和Uvicorn(ASGI服务器,用于运行FastAPI应用):

pip install fastapi uvicorn

4.2 编写接口代码

from fastapi import FastAPI, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
import asyncio

# 定义FastAPI应用
app = FastAPI(title="用户信息管理接口", version="1.0")

# 定义数据模型(请求体)
class UserModel(BaseModel):
    name: str
    age: int
    gender: str
    city: str = None

# 全局数据库连接
client = AsyncIOMotorClient('mongodb://localhost:27017/')
db = client['user_db']
collection = db['user_collection']

# 1. 创建用户接口(POST)
@app.post("/users/", summary="创建新用户")
async def create_user(user: UserModel):
    user_dict = user.dict()
    result = await collection.insert_one(user_dict)
    return {"message": "用户创建成功", "user_id": str(result.inserted_id)}

# 2. 查询单个用户接口(GET)
@app.get("/users/{user_name}", summary="根据用户名查询用户")
async def get_user(user_name: str):
    user = await collection.find_one({"name": user_name}, {"_id": 0})
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return user

# 3. 查询所有用户接口(GET)
@app.get("/users/", summary="查询所有用户")
async def get_all_users(skip: int = 0, limit: int = 10):
    users = []
    cursor = collection.find({}, {"_id": 0}).skip(skip).limit(limit)
    async for user in cursor:
        users.append(user)
    return {"total": len(users), "users": users}

# 4. 更新用户接口(PUT)
@app.put("/users/{user_name}", summary="更新用户信息")
async def update_user(user_name: str, user: UserModel):
    update_data = user.dict(exclude_unset=True)
    result = await collection.update_one(
        {"name": user_name},
        {"$set": update_data}
    )
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="用户不存在")
    return {"message": "用户信息更新成功"}

# 5. 删除用户接口(DELETE)
@app.delete("/users/{user_name}", summary="删除用户")
async def delete_user(user_name: str):
    result = await collection.delete_one({"name": user_name})
    if result.deleted_count == 0:
        raise HTTPException(status_code=404, detail="用户不存在")
    return {"message": "用户删除成功"}

if __name__ == '__main__':
    import uvicorn
    # 运行FastAPI应用
    uvicorn.run(app, host="0.0.0.0", port=8000)

4.3 代码说明与运行测试

  1. 代码说明
    • 首先导入FastAPI、Motor等相关模块,定义UserModel作为请求体的数据模型;
    • 创建全局的Motor客户端和集合对象,确保整个应用共享一个数据库连接;
    • 实现5个核心接口:创建用户、查询单个用户、查询所有用户、更新用户、删除用户,所有接口均为异步函数,数据库操作使用await关键字;
    • 使用HTTPException处理异常情况,如用户不存在时返回404状态码。
  2. 运行测试
    • 运行上述代码,启动Uvicorn服务器;
    • 打开浏览器访问http://localhost:8000/docs,可以看到FastAPI自动生成的接口文档;
    • 在文档页面中可以直接测试各个接口,例如点击/users/的POST接口,输入用户信息后执行,即可在MongoDB中插入一条用户数据。

五、Motor库相关资源

  • PyPI地址:https://pypi.org/project/Motor
  • Github地址:https://github.com/mongodb/motor
  • 官方文档地址:https://motor.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:tortoise-orm入门到实战教程

tortoise-orm是一款专为异步Python应用设计的ORM(对象关系映射)工具,灵感源自Django ORM,支持异步数据库操作,兼容多种数据库(MySQL、PostgreSQL、SQLite等)。其工作原理是将Python类映射为数据库表,通过异步API执行CRUD操作,避免阻塞事件循环。优点是语法简洁、异步性能优、支持迁移;缺点是生态较SQLAlchemy小,部分复杂查询需手写SQL。License为Apache License 2.0

一、tortoise-orm安装与环境配置

1.1 安装tortoise-orm

tortoise-orm支持pip直接安装,同时需根据使用的数据库安装对应的异步驱动。以常用的MySQL和SQLite为例:

  • 安装核心库
pip install tortoise-orm
  • 安装数据库驱动
  • SQLite(无需额外驱动,Python内置)
  • MySQL:安装asyncmy驱动
  pip install asyncmy
  • PostgreSQL:安装asyncpg驱动
  pip install asyncpg

1.2 验证安装

安装完成后,可通过以下代码验证是否安装成功:

import tortoise
print(f"tortoise-orm版本:{tortoise.__version__}")

运行代码,若输出版本号则说明安装成功。

二、tortoise-orm核心概念与初始化

2.1 核心概念

tortoise-orm的核心概念与Django ORM类似,主要包括:

  • Model:Python类,对应数据库中的一张表,类属性对应表字段。
  • Field:字段类型,如IntFieldCharFieldDatetimeField等,定义表字段的属性。
  • Manager:模型的查询管理器,通过objects属性提供查询方法(如all()filter())。
  • 异步会话:所有数据库操作均为异步,需通过asyncio运行。

2.2 数据库初始化

使用tortoise-orm前,需先初始化数据库连接,通过configure方法配置连接信息,再调用init_models加载模型。

import asyncio
from tortoise import Tortoise, run_async
from tortoise.models import Model
from tortoise import fields

# 定义示例模型(后续详细讲解)
class User(Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=50)
    age = fields.IntField(default=0)
    created_at = fields.DatetimeField(auto_now_add=True)

# 初始化函数
async def init_db():
    # 配置数据库连接
    await Tortoise.init(
        db_url="sqlite://test.db",  # SQLite数据库文件
        modules={"models": ["__main__"]}  # 模型所在模块
    )
    # 生成数据库表(首次运行时执行)
    await Tortoise.generate_schemas()

# 运行异步初始化
if __name__ == "__main__":
    run_async(init_db())

代码说明

  • db_url:数据库连接字符串,格式为数据库类型://用户名:密码@地址:端口/数据库名,SQLite直接指定文件路径。
  • modules:指定包含模型的模块,__main__表示当前模块。
  • generate_schemas():自动创建模型对应的数据库表,生产环境建议使用迁移工具。

三、tortoise-orm模型定义与字段类型

3.1 模型定义规则

tortoise-orm的模型需继承自tortoise.models.Model,每个模型类对应一张数据库表,表名默认是模型类名的小写复数形式(可通过Meta类自定义)。

from tortoise import fields
from tortoise.models import Model

class User(Model):
    # 主键字段,pk=True表示为主键
    id = fields.IntField(pk=True)
    # 字符串字段,max_length为必填参数
    username = fields.CharField(max_length=30, unique=True, description="用户名")
    # 密码字段,可设置默认值
    password = fields.CharField(max_length=100, default="123456")
    # 整数字段,设置默认值
    age = fields.IntField(default=0, description="年龄")
    # 布尔字段
    is_active = fields.BooleanField(default=True, description="是否激活")
    # 时间字段,auto_now_add=True表示创建时自动填充当前时间
    created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
    # 时间字段,auto_now=True表示更新时自动填充当前时间
    updated_at = fields.DatetimeField(auto_now=True, description="更新时间")

    class Meta:
        # 自定义表名
        table = "user"
        # 索引,可提升查询效率
        indexes = [("username",)]

代码说明

  • pk=True:标记字段为主键,若未定义主键,tortoise-orm会自动创建一个名为id的自增主键。
  • unique=True:设置字段值唯一,避免重复数据。
  • description:字段描述,可选参数。
  • Meta类:用于配置模型的元数据,如自定义表名、索引、外键约束等。

3.2 常用字段类型

tortoise-orm提供了丰富的字段类型,满足不同数据存储需求,常用字段如下表所示:

| 字段类型 | 作用 | 常用参数 |
|-||-|
| IntField | 存储整数 | defaultnull |
| CharField | 存储字符串 | max_lengthuniquedefault |
| TextField | 存储长文本 | nulldefault |
| DatetimeField | 存储日期时间 | auto_now_addauto_now |
| BooleanField | 存储布尔值 | default |
| FloatField | 存储浮点数 | defaultnull |
| ForeignKeyField | 外键关联 | model_nameon_delete |

四、tortoise-orm核心操作:CRUD实战

CRUD是数据库操作的核心(创建、读取、更新、删除),tortoise-orm的所有操作均为异步,需在async函数中执行。

4.1 数据创建(Create)

向数据库中添加数据有两种方式:create()方法和save()方法。

方法1:使用create()直接创建

async def create_user():
    # 初始化数据库
    await init_db()
    # 创建单个用户
    user = await User.create(
        username="zhangsan",
        password="zhangsan123",
        age=20
    )
    print(f"创建用户成功:id={user.id}, username={user.username}")

    # 批量创建用户
    users = await User.bulk_create([
        User(username="lisi", password="lisi123", age=22),
        User(username="wangwu", password="wangwu123", age=25)
    ])
    print(f"批量创建用户成功,共创建{len(users)}个用户")

if __name__ == "__main__":
    run_async(create_user())

代码说明

  • create():创建单个数据对象,返回创建后的模型实例。
  • bulk_create():批量创建数据,接收模型实例列表,效率高于多次调用create()

方法2:先实例化再调用save()

async def create_user_by_save():
    await init_db()
    # 实例化模型
    user = User(username="zhaoliu", password="zhaoliu123", age=18)
    # 保存到数据库
    await user.save()
    print(f"保存用户成功:id={user.id}, username={user.username}")

if __name__ == "__main__":
    run_async(create_user_by_save())

代码说明:适用于需要先对实例进行其他操作,再保存到数据库的场景。

4.2 数据读取(Read)

tortoise-orm提供了丰富的查询方法,支持过滤、排序、分页等操作,常用方法包括all()filter()get()first()等。

async def query_user():
    await init_db()

    # 1. 查询所有用户
    all_users = await User.all()
    print("所有用户:")
    for user in all_users:
        print(f"id={user.id}, username={user.username}, age={user.age}")

    # 2. 过滤查询:查询年龄大于20的用户
    filter_users = await User.filter(age__gt=20).all()
    print("\n年龄大于20的用户:")
    for user in filter_users:
        print(f"username={user.username}, age={user.age}")

    # 3. 精确查询:根据用户名查询用户(get()方法,查询不到会抛异常)
    try:
        user = await User.get(username="zhangsan")
        print(f"\n精确查询用户:id={user.id}, age={user.age}")
    except User.DoesNotExist:
        print("用户不存在")

    # 4. 排序查询:按年龄降序排列
    order_users = await User.all().order_by("-age")
    print("\n按年龄降序排列的用户:")
    for user in order_users:
        print(f"username={user.username}, age={user.age}")

    # 5. 分页查询:获取第2页数据,每页2条
    page_users = await User.all().offset(2).limit(2)
    print("\n分页查询结果:")
    for user in page_users:
        print(f"username={user.username}, age={user.age}")

if __name__ == "__main__":
    run_async(query_user())

代码说明

  • filter():支持多种查询条件,如age__gt=20(年龄大于20)、age__lt=30(年龄小于30)、username__contains="zhang"(用户名包含zhang)。
  • get():查询单个对象,查询结果不存在会抛出DoesNotExist异常,存在多个会抛出MultipleObjectsReturned异常。
  • order_by():排序,字段前加-表示降序。
  • offset():跳过指定数量的数据,limit():限制返回数据的数量,两者结合实现分页。

4.3 数据更新(Update)

更新数据有两种方式:模型实例更新和批量更新。

方式1:模型实例更新

async def update_user():
    await init_db()
    # 查询要更新的用户
    user = await User.get(username="zhangsan")
    # 修改属性
    user.age = 21
    user.password = "new_zhangsan123"
    # 保存更新
    await user.save()
    print(f"更新用户成功:username={user.username}, 新年龄={user.age}")

if __name__ == "__main__":
    run_async(update_user())

方式2:批量更新

async def bulk_update_user():
    await init_db()
    # 批量更新年龄小于20的用户,将is_active设为False
    update_count = await User.filter(age__lt=20).update(is_active=False)
    print(f"批量更新成功,共更新{update_count}个用户")

if __name__ == "__main__":
    run_async(bulk_update_user())

代码说明update()方法返回受影响的行数,适用于批量修改数据,效率更高。

4.4 数据删除(Delete)

删除数据同样支持单个删除和批量删除。

async def delete_user():
    await init_db()
    # 1. 单个删除:查询后删除
    user = await User.get(username="zhaoliu")
    await user.delete()
    print(f"删除用户成功:username={user.username}")

    # 2. 批量删除:删除is_active为False的用户
    delete_count = await User.filter(is_active=False).delete()
    print(f"批量删除成功,共删除{delete_count}个用户")

if __name__ == "__main__":
    run_async(delete_user())

五、外键关联与多表查询

tortoise-orm支持外键关联,实现多表之间的关联查询,以UserArticle模型为例(一个用户可以发布多篇文章)。

5.1 定义关联模型

class Article(Model):
    id = fields.IntField(pk=True)
    title = fields.CharField(max_length=100, description="文章标题")
    content = fields.TextField(description="文章内容")
    # 外键关联User模型,on_delete=fields.CASCADE表示删除用户时同时删除文章
    author = fields.ForeignKeyField("models.User", related_name="articles", on_delete=fields.CASCADE)
    created_at = fields.DatetimeField(auto_now_add=True)

    class Meta:
        table = "article"

代码说明

  • ForeignKeyField:定义外键,第一个参数为关联的模型(格式为模块名.模型名)。
  • related_name:反向关联名称,通过User.articles可查询用户发布的所有文章。
  • on_delete:外键删除策略,fields.CASCADE为级联删除,fields.SET_NULL为设为NULL(需字段允许null=True)。

5.2 关联查询实战

async def relation_query():
    await init_db()
    # 1. 创建用户并关联文章
    user = await User.create(username="author1", password="author123", age=30)
    await Article.bulk_create([
        Article(title="tortoise-orm入门", content="tortoise-orm是一款异步ORM工具", author=user),
        Article(title="异步编程实战", content="Python异步编程技巧", author=user)
    ])

    # 2. 正向查询:查询文章的作者信息
    article = await Article.get(title="tortoise-orm入门")
    # 预加载作者信息,避免N+1查询问题
    await article.fetch_related("author")
    print(f"文章标题:{article.title},作者:{article.author.username}")

    # 3. 反向查询:查询用户发布的所有文章
    user = await User.get(username="author1")
    articles = await user.articles.all()
    print(f"\n用户{user.username}发布的文章:")
    for art in articles:
        print(f"标题:{art.title}")

if __name__ == "__main__":
    run_async(relation_query())

代码说明

  • fetch_related():预加载关联数据,解决ORM中的N+1查询性能问题。
  • 反向关联:通过related_name(如articles)直接查询关联数据,语法简洁。

六、数据库迁移

在实际开发中,模型结构会不断变化,tortoise-orm提供了aerich工具来管理数据库迁移,类似于Django的makemigrationsmigrate

6.1 安装aerich

pip install aerich

6.2 初始化迁移配置

  1. 创建配置文件pyproject.toml(或在项目根目录执行命令生成)
aerich init -t tortoise_config.TORTOISE_ORM
  1. 初始化数据库
aerich init-db

6.3 生成迁移文件与执行迁移

  • 当模型修改后,生成迁移文件:
aerich migrate --name update_user_model
  • 执行迁移,更新数据库表结构:
aerich upgrade

七、实际项目案例:异步用户管理系统

下面通过一个简单的异步用户管理系统,整合tortoise-orm的核心功能,实现用户的注册、查询、更新和删除。

7.1 项目目录结构

user_manage/
├── main.py          # 主程序入口
├── models.py        # 模型定义
└── requirements.txt # 依赖包列表

7.2 编写模型文件models.py

from tortoise import fields
from tortoise.models import Model

class User(Model):
    id = fields.IntField(pk=True)
    username = fields.CharField(max_length=30, unique=True, description="用户名")
    password = fields.CharField(max_length=100, description="密码")
    age = fields.IntField(default=0, description="年龄")
    is_active = fields.BooleanField(default=True, description="是否激活")
    created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
    updated_at = fields.DatetimeField(auto_now=True, description="更新时间")

    class Meta:
        table = "user"
        indexes = [("username",)]

7.3 编写主程序main.py

import asyncio
from tortoise import Tortoise, run_async
from models import User

# 数据库配置
TORTOISE_ORM = {
    "connections": {"default": "sqlite://user_manage.db"},
    "apps": {
        "models": {
            "models": ["models"],
            "default_connection": "default",
        },
    },
}

# 初始化数据库
async def init_db():
    await Tortoise.init(config=TORTOISE_ORM)
    await Tortoise.generate_schemas()

# 用户注册
async def user_register(username: str, password: str, age: int):
    await init_db()
    try:
        user = await User.create(username=username, password=password, age=age)
        return {"code": 200, "msg": "注册成功", "data": {"user_id": user.id, "username": user.username}}
    except Exception as e:
        return {"code": 500, "msg": f"注册失败:{str(e)}"}

# 查询用户信息
async def user_query(username: str = None):
    await init_db()
    if username:
        try:
            user = await User.get(username=username)
            data = {
                "user_id": user.id,
                "username": user.username,
                "age": user.age,
                "is_active": user.is_active,
                "created_at": user.created_at.strftime("%Y-%m-%d %H:%M:%S")
            }
            return {"code": 200, "msg": "查询成功", "data": data}
        except User.DoesNotExist:
            return {"code": 404, "msg": "用户不存在"}
    else:
        users = await User.all()
        data = []
        for user in users:
            data.append({
                "user_id": user.id,
                "username": user.username,
                "age": user.age,
                "is_active": user.is_active
            })
        return {"code": 200, "msg": "查询成功", "data": data}

# 主函数
async def main():
    # 注册用户
    register_res = await user_register("test_user", "test123", 25)
    print(register_res)

    # 查询单个用户
    query_res = await user_query("test_user")
    print(query_res)

    # 查询所有用户
    all_users_res = await user_query()
    print(all_users_res)

if __name__ == "__main__":
    run_async(main())

7.4 运行项目

执行main.py,输出如下:

{'code': 200, 'msg': '注册成功', 'data': {'user_id': 1, 'username': 'test_user'}}
{'code': 200, 'msg': '查询成功', 'data': {'user_id': 1, 'username': 'test_user', 'age': 25, 'is_active': True, 'created_at': '2024-05-20 15:30:00'}}
{'code': 200, 'msg': '查询成功', 'data': [{'user_id': 1, 'username': 'test_user', 'age': 25, 'is_active': True}]}

八、相关资源

  • Pypi地址:https://pypi.org/project/tortoise-orm
  • Github地址:https://github.com/tortoise/tortoise-orm
  • 官方文档地址:https://tortoise-orm.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:s3fs 高效操作AWS S3存储的完整指南

一、s3fs 库核心介绍

s3fs 是一款为 Python 开发者提供便捷访问AWS S3对象存储的文件系统接口库,它基于 fsspec 框架实现,能够将 S3 存储桶映射为本地可操作的文件系统,支持常规的文件读写、目录遍历等操作。其工作原理是通过对接 AWS 的 boto3 客户端,将 S3 的对象存储操作转化为类 POSIX 的文件系统调用,让开发者无需关注 S3 API 的细节即可操作云端存储。

该库的优点是语法简洁、与 Python 内置 io 模块兼容、支持分块读写大文件;缺点是依赖 boto3 配置,且大规模并发操作时需手动优化性能。s3fs 采用 BSD-3-Clause 开源许可证,允许商业和非商业自由使用、修改和分发。

二、s3fs 安装与环境配置

2.1 安装方式

s3fs 的安装非常简单,推荐使用 pip 包管理工具进行安装,在命令行中执行以下命令即可完成安装:

pip install s3fs

如果需要安装特定版本的 s3fs,可以指定版本号,例如安装 2023.10.0 版本:

pip install s3fs==2023.10.0

安装完成后,可以在 Python 环境中通过导入语句验证是否安装成功:

import s3fs
print(s3fs.__version__)

运行上述代码,如果控制台输出对应的版本号,说明安装成功。

2.2 环境配置

s3fs 操作 AWS S3 依赖于 AWS 的身份认证,主要有以下三种配置方式,开发者可以根据实际场景选择:

  1. 配置文件认证
    在本地创建 AWS 配置文件,通常位于 ~/.aws/credentials(Linux/Mac)或 C:\Users\用户名\.aws\credentials(Windows)路径下,文件内容格式如下: [default] aws_access_key_id = 你的Access Key ID aws_secret_access_key = 你的Secret Access Key region = 你的S3存储桶所在区域,例如us-east-1 配置完成后,s3fs 会自动读取该文件的认证信息,无需在代码中手动传入密钥。
  2. 环境变量认证
    在系统环境变量中设置 AWS 认证信息,适用于服务器或容器化部署场景,需要设置的环境变量如下: # Linux/Mac 系统设置方式 export AWS_ACCESS_KEY_ID=你的Access Key ID export AWS_SECRET_ACCESS_KEY=你的Secret Access Key export AWS_REGION=你的S3存储桶所在区域 Windows 系统可以通过“系统属性-高级-环境变量”界面添加上述变量。
  3. 代码中手动传入认证信息
    如果不希望配置本地文件或环境变量,可以在代码中直接传入 AWS 密钥和区域信息,示例如下:
    python import s3fs # 手动配置认证信息 fs = s3fs.S3FileSystem( key='你的Access Key ID', secret='你的Secret Access Key', client_kwargs={'region_name': 'us-east-1'} )
    注意:这种方式会将密钥硬编码在代码中,存在安全风险,生产环境不推荐使用。

三、s3fs 核心功能与代码实例

s3fs 的核心功能是模拟本地文件系统操作 S3 存储桶,其 API 设计与 Python 内置的 os 模块高度相似,降低了开发者的学习成本。下面将详细介绍 s3fs 的常用功能,并提供可直接运行的代码实例。

3.1 连接 S3 存储桶并遍历文件

使用 s3fs 首先需要创建 S3FileSystem 实例,该实例是操作 S3 的核心对象。创建实例后,可以通过 ls 方法遍历存储桶中的文件和目录。

import s3fs
# 创建 S3FileSystem 实例,默认读取本地配置文件的认证信息
fs = s3fs.S3FileSystem()
# 遍历指定存储桶中的内容,格式为 bucket_name/path
bucket_path = 'my-s3-bucket/test-folder'
# 列出存储桶路径下的所有文件和目录
file_list = fs.ls(bucket_path)
print(f"存储桶 {bucket_path} 下的内容:")
for file in file_list:
    print(file)

代码说明

  • s3fs.S3FileSystem() 会自动加载本地 AWS 配置文件或环境变量中的认证信息。
  • fs.ls() 方法的参数是 S3 存储桶的路径,格式为 存储桶名称/目录路径,如果直接传入存储桶名称,则会列出存储桶根目录的内容。
  • 运行代码前,需要将 my-s3-bucket/test-folder 替换为实际的 S3 存储桶和目录路径。

3.2 文件的上传与下载

文件的上传和下载是操作 S3 最常用的功能,s3fs 提供了 put(本地文件上传到 S3)和 get(S3 文件下载到本地)两个方法,同时支持分块传输大文件。

3.2.1 本地文件上传到 S3

import s3fs
# 创建 S3FileSystem 实例
fs = s3fs.S3FileSystem()
# 本地文件路径
local_file_path = './local_test.txt'
# S3 目标路径,格式为 bucket_name/remote_file_name
s3_target_path = 'my-s3-bucket/uploaded_test.txt'
# 上传本地文件到 S3
fs.put(local_file_path, s3_target_path)
print(f"成功将 {local_file_path} 上传到 {s3_target_path}")

代码说明

  • fs.put(local_path, remote_path) 方法接收两个参数,分别是本地文件路径和 S3 目标路径。
  • 如果 S3 目标路径中的目录不存在,s3fs 会自动创建对应的目录结构。

3.2.2 S3 文件下载到本地

import s3fs
fs = s3fs.S3FileSystem()
# S3 源文件路径
s3_source_path = 'my-s3-bucket/uploaded_test.txt'
# 本地目标路径
local_target_path = './downloaded_test.txt'
# 从 S3 下载文件到本地
fs.get(s3_source_path, local_target_path)
print(f"成功将 {s3_source_path} 下载到 {local_target_path}")

代码说明

  • fs.get(remote_path, local_path) 方法接收两个参数,分别是 S3 源文件路径和本地目标路径。
  • 如果本地目标路径的目录不存在,需要提前创建,否则会抛出文件不存在的异常。

3.2.3 大文件的分块上传与下载

当文件大小超过 100MB 时,推荐使用分块传输的方式,避免因网络问题导致传输失败。s3fs 支持通过 block_size 参数设置分块大小,默认分块大小为 5MB。

import s3fs
# 创建 S3FileSystem 实例,设置分块大小为 10MB
fs = s3fs.S3FileSystem(block_size=10*1024*1024)
# 大文件上传
large_local_file = './large_file.zip'
large_s3_path = 'my-s3-bucket/large_file.zip'
fs.put(large_local_file, large_s3_path)
print("大文件上传完成")
# 大文件下载
fs.get(large_s3_path, './downloaded_large_file.zip')
print("大文件下载完成")

代码说明

  • block_size 参数的单位是字节,10*1024*1024 表示 10MB。
  • 分块传输时,s3fs 会将大文件拆分为多个小块,逐个传输,传输失败的块会自动重试。

3.3 文件的读写操作

s3fs 支持直接读写 S3 中的文件,无需先下载到本地,这一功能对于处理云端文件非常高效。其读写 API 与 Python 内置的 open 函数类似。

3.3.1 读取 S3 中的文本文件

import s3fs
fs = s3fs.S3FileSystem()
# S3 文本文件路径
s3_text_file = 'my-s3-bucket/test.txt'
# 以只读模式打开 S3 中的文本文件
with fs.open(s3_text_file, 'r', encoding='utf-8') as f:
    content = f.read()
    print("S3 文本文件内容:")
    print(content)

代码说明

  • fs.open() 方法的参数与 Python 内置 open 函数类似,'r' 表示只读模式,encoding='utf-8' 指定文件编码。
  • 使用 with 语句可以自动关闭文件句柄,避免资源泄漏。

3.3.2 向 S3 写入文本文件

import s3fs
fs = s3fs.S3FileSystem()
# S3 目标文本文件路径
s3_write_file = 'my-s3-bucket/write_test.txt'
# 以写入模式打开文件,如果文件不存在则创建,存在则覆盖
with fs.open(s3_write_file, 'w', encoding='utf-8') as f:
    f.write("这是通过 s3fs 写入 S3 的文本内容\n")
    f.write("第二行文本内容")
print(f"成功向 {s3_write_file} 写入内容")

代码说明

  • 'w' 模式表示写入模式,如果 S3 中已存在同名文件,会被覆盖。
  • 如果需要追加内容,可以使用 'a' 模式,示例如下:
with fs.open(s3_write_file, 'a', encoding='utf-8') as f:
    f.write("\n这是追加的文本内容")

3.3.3 读写二进制文件

对于图片、视频、压缩包等二进制文件,需要使用 'rb'(只读二进制)和 'wb'(写入二进制)模式。

import s3fs
fs = s3fs.S3FileSystem()
# 读取二进制文件(如图片)
s3_image_path = 'my-s3-bucket/test_image.png'
with fs.open(s3_image_path, 'rb') as f:
    image_data = f.read()
    print(f"读取到的图片数据大小:{len(image_data)} 字节")
# 写入二进制文件
local_image_path = './local_image.png'
s3_target_image = 'my-s3-bucket/uploaded_image.png'
with open(local_image_path, 'rb') as local_f, fs.open(s3_target_image, 'wb') as s3_f:
    s3_f.write(local_f.read())
print("二进制图片文件上传完成")

代码说明

  • 读写二进制文件时,不需要指定 encoding 参数。
  • 上述代码通过嵌套 with 语句,实现了本地二进制文件到 S3 的直接上传。

3.4 目录的创建与删除

s3fs 支持对 S3 中的目录进行创建、删除等操作,对应的方法分别是 mkdirrm

3.4.1 创建目录

import s3fs
fs = s3fs.S3FileSystem()
# 要创建的 S3 目录路径
new_dir_path = 'my-s3-bucket/new-folder/sub-folder'
# 创建目录,parents=True 表示如果父目录不存在则自动创建
fs.mkdir(new_dir_path, parents=True)
print(f"成功创建目录 {new_dir_path}")
# 验证目录是否存在
if fs.exists(new_dir_path):
    print(f"目录 {new_dir_path} 存在")
else:
    print(f"目录 {new_dir_path} 不存在")

代码说明

  • fs.mkdir() 方法的 parents=True 参数非常重要,类似于 Linux 命令 mkdir -p,可以自动创建多级目录。
  • fs.exists() 方法用于判断路径(文件或目录)是否存在。

3.4.2 删除文件和目录

import s3fs
fs = s3fs.S3FileSystem()
# 删除单个文件
file_to_delete = 'my-s3-bucket/write_test.txt'
if fs.exists(file_to_delete):
    fs.rm(file_to_delete)
    print(f"成功删除文件 {file_to_delete}")
# 删除目录及目录下的所有内容,recursive=True 表示递归删除
dir_to_delete = 'my-s3-bucket/new-folder'
if fs.exists(dir_to_delete):
    fs.rm(dir_to_delete, recursive=True)
    print(f"成功删除目录 {dir_to_delete} 及其所有内容")

代码说明

  • fs.rm() 方法默认只能删除文件,删除目录时必须指定 recursive=True,否则会抛出异常。
  • 删除操作不可逆,执行前请务必确认路径正确。

3.5 文件的重命名与移动

s3fs 提供 rename 方法实现文件的重命名和移动功能,该方法相当于 Linux 中的 mv 命令。

import s3fs
fs = s3fs.S3FileSystem()
# 原文件路径
original_path = 'my-s3-bucket/test.txt'
# 重命名后的路径
new_path = 'my-s3-bucket/renamed_test.txt'
# 文件移动:将文件移动到另一个目录
move_path = 'my-s3-bucket/new-folder/moved_test.txt'
# 重命名文件
fs.rename(original_path, new_path)
print(f"文件已从 {original_path} 重命名为 {new_path}")
# 移动文件,先确保目标目录存在
fs.mkdir('my-s3-bucket/new-folder', parents=True)
fs.rename(new_path, move_path)
print(f"文件已从 {new_path} 移动到 {move_path}")

代码说明

  • fs.rename(src, dst) 方法接收两个参数,src 是原路径,dst 是目标路径。
  • 如果目标路径的目录不存在,移动操作会失败,因此需要提前创建目录。

四、s3fs 实际应用案例:云端数据处理

在数据科学和机器学习场景中,经常需要处理存储在 S3 中的大规模数据集。下面以读取 S3 中的 CSV 文件并进行数据分析为例,展示 s3fs 与 pandas 库的结合使用,实现云端数据的直接处理,无需下载到本地。

4.1 案例需求

读取 S3 存储桶中 my-s3-bucket/dataset 目录下的 sales_data.csv 文件,分析该文件的前 5 行数据、数据列名和数据类型,并计算销售额的平均值。

4.2 代码实现

import s3fs
import pandas as pd
# 创建 S3FileSystem 实例
fs = s3fs.S3FileSystem()
# S3 中 CSV 文件的路径
s3_csv_path = 'my-s3-bucket/dataset/sales_data.csv'
# 使用 s3fs 打开 CSV 文件,并通过 pandas 读取
with fs.open(s3_csv_path, 'r', encoding='utf-8') as f:
    df = pd.read_csv(f)
# 数据分析
print("=== 销售数据前 5 行 ===")
print(df.head())
print("\n=== 数据列名 ===")
print(df.columns.tolist())
print("\n=== 数据类型 ===")
print(df.dtypes)
print("\n=== 销售额平均值 ===")
# 假设销售额列名为 sales_amount
average_sales = df['sales_amount'].mean()
print(f"平均销售额:{average_sales:.2f}")

代码说明

  • s3fs 与 pandas 完美兼容,通过 fs.open() 打开的文件对象可以直接传入 pd.read_csv() 函数。
  • 这种方式无需将 CSV 文件下载到本地,节省了本地存储空间,尤其适合处理 GB 级别的大型数据集。
  • 运行代码前,需要确保 pandas 库已安装,可通过 pip install pandas 命令安装。

4.3 案例扩展:批量处理 S3 中的多个 CSV 文件

如果 S3 目录下有多个 CSV 文件,可以通过 fs.glob() 方法匹配所有 CSV 文件,然后批量读取和合并。

import s3fs
import pandas as pd
fs = s3fs.S3FileSystem()
# 匹配 S3 目录下所有的 CSV 文件
csv_files = fs.glob('my-s3-bucket/dataset/*.csv')
print(f"找到 {len(csv_files)} 个 CSV 文件")
# 批量读取并合并所有 CSV 文件
df_list = []
for file in csv_files:
    with fs.open(file, 'r', encoding='utf-8') as f:
        df_temp = pd.read_csv(f)
        df_list.append(df_temp)
        print(f"已读取文件:{file}")
# 合并所有 DataFrame
merged_df = pd.concat(df_list, ignore_index=True)
print(f"\n合并后的数据集总行数:{len(merged_df)}")
print("合并后数据前 3 行:")
print(merged_df.head(3))

代码说明

  • fs.glob() 方法支持通配符匹配,*.csv 表示匹配所有以 .csv 结尾的文件。
  • pd.concat() 函数用于合并多个 DataFrame,ignore_index=True 表示重置合并后的索引。

五、s3fs 相关资源

  • Pypi地址:https://pypi.org/project/s3fs
  • Github地址:https://github.com/fsspec/s3fs
  • 官方文档地址:https://s3fs.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:SQLAlchemy零基础入门教程

一、SQLAlchemy 核心介绍

SQLAlchemy是Python生态中功能强大的ORM(对象关系映射) 库,它能将Python类与数据库表进行映射,让开发者通过操作Python对象来实现数据库的增删改查,无需编写复杂的原生SQL语句。其工作原理是建立对象模型与关系模型的映射桥梁,通过SQL表达式语言和ORM两层架构,实现对多种数据库的兼容操作。

优点方面,它支持MySQL、PostgreSQL、SQLite等主流数据库,具备灵活的查询构造能力,事务处理机制完善,且能兼顾底层SQL的优化需求;缺点是入门门槛略高于轻量级ORM库,简单场景下配置相对繁琐。SQLAlchemy采用MIT开源许可证,允许自由使用、修改和分发,无商业使用限制。

二、SQLAlchemy 安装步骤

对于技术小白来说,SQLAlchemy的安装非常简单,只需要使用Python的包管理工具pip即可完成,具体步骤如下:

  1. 检查pip环境:打开命令行终端(Windows下是CMD或PowerShell,Mac和Linux下是Terminal),输入以下命令验证pip是否可用
    bash pip --version
    如果能正常显示pip的版本号,说明环境没问题;如果提示“找不到命令”,则需要先配置Python的环境变量。
  2. 执行安装命令:在终端中输入以下命令,安装最新版本的SQLAlchemy
    bash pip install sqlalchemy
  3. 验证安装结果:安装完成后,在终端中输入Python交互式环境,执行以下代码
    python import sqlalchemy print(sqlalchemy.__version__)
    如果能正常输出SQLAlchemy的版本号(例如2.0.23),则说明安装成功。

提示:如果需要连接特定的数据库(如MySQL),还需要安装对应的数据库驱动,例如pip install pymysql;连接PostgreSQL则需要安装psycopg2-binary

三、SQLAlchemy 核心使用方式

3.1 核心概念梳理

在使用SQLAlchemy之前,我们需要先了解几个核心概念,这对后续的学习至关重要:

  • Engine(引擎):负责管理数据库连接池,是SQLAlchemy与数据库交互的核心入口。
  • Session(会话):用于执行数据库操作的“工作区”,所有的增删改查操作都需要通过Session来执行。
  • Model(模型):继承自declarative_base的Python类,每个类对应数据库中的一张表,类的属性对应表的字段。
  • MetaData(元数据):用于存储数据库表结构的相关信息,ORM模式下会自动生成。

3.2 建立数据库连接

首先我们需要创建一个数据库引擎,不同数据库的连接字符串格式略有不同,下面以常用的SQLite(无需额外配置,文件型数据库)和MySQL为例进行演示。

3.2.1 连接SQLite数据库

SQLite数据库无需安装服务端,直接通过文件路径即可连接,适合本地测试和小型项目。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 创建SQLite引擎,echo=True表示打印执行的SQL语句,方便调试
engine = create_engine('sqlite:///test.db', echo=True)

# 创建Session类,绑定到上面的引擎
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

代码说明:

  • sqlite:///test.db 表示数据库文件test.db位于当前目录下,如果文件不存在,SQLAlchemy会自动创建。
  • autocommit=False 表示关闭自动提交,所有操作需要手动提交事务。
  • autoflush=False 表示关闭自动刷新,避免不必要的数据库交互。

3.2.2 连接MySQL数据库

连接MySQL需要先安装驱动(如pymysql),然后使用对应的连接字符串。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 安装驱动:pip install pymysql
# 连接字符串格式:mysql+pymysql://用户名:密码@主机地址:端口号/数据库名
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test_db', echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

代码说明:

  • 请将root替换为你的MySQL用户名,123456替换为密码,test_db替换为需要连接的数据库名(需提前在MySQL中创建)。

3.3 定义数据模型

数据模型是Python类与数据库表的映射载体,我们需要继承declarative_base来创建模型类。

from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime

# 创建基类,所有模型类都需要继承这个基类
Base = declarative_base()

# 定义User模型,对应数据库中的user表
class User(Base):
    # 定义表名
    __tablename__ = 'user'

    # 定义表字段
    id = Column(Integer, primary_key=True, autoincrement=True, comment='用户ID')
    name = Column(String(50), nullable=False, comment='用户姓名')
    age = Column(Integer, nullable=True, comment='用户年龄')
    create_time = Column(DateTime, default=datetime.now, comment='创建时间')

    # 定义__repr__方法,方便打印对象时查看信息
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age})>"

代码说明:

  • __tablename__ 属性指定模型对应的数据库表名,如果不指定,SQLAlchemy会默认使用类名的小写形式作为表名。
  • Column 用于定义表字段,参数说明:
  • Integer/String/DateTime 表示字段的数据类型;
  • primary_key=True 表示该字段是主键;
  • autoincrement=True 表示主键自增(仅适用于整数类型);
  • nullable=False 表示该字段不允许为空;
  • default 表示字段的默认值。

3.4 创建数据库表

定义好模型后,我们需要通过create_all方法来创建对应的数据库表,执行以下代码即可:

# 基于引擎创建所有定义的表
Base.metadata.create_all(bind=engine)

代码说明:

  • 执行该代码后,SQLAlchemy会检查数据库中是否存在user表,如果不存在则自动创建;如果已存在,则不会重复创建,也不会修改现有表结构。

3.5 数据库基本操作(CRUD)

CRUD是数据库操作的核心,即创建(Create)、查询(Read)、更新(Update)、删除(Delete),下面我们通过Session来实现这些操作。

3.5.1 创建数据(新增用户)

新增数据的步骤是:创建Session实例 → 实例化模型类 → 将对象添加到Session → 提交事务 → 关闭Session。

# 创建Session实例
db = SessionLocal()

# 方式1:单个新增
user1 = User(name='张三', age=25)
db.add(user1)

# 方式2:批量新增
user2 = User(name='李四', age=30)
user3 = User(name='王五', age=28)
db.add_all([user2, user3])

# 提交事务,这一步才会真正将数据写入数据库
db.commit()

# 刷新对象,获取数据库自动生成的id等属性
db.refresh(user1)
print(user1)  # 输出:<User(id=1, name='张三', age=25)>

# 关闭Session
db.close()

代码说明:

  • db.add() 用于添加单个对象,db.add_all() 用于添加多个对象。
  • db.commit() 必须执行,否则所有操作都只是在本地Session中,不会同步到数据库。
  • db.refresh() 用于从数据库中获取最新的对象数据,例如自增的id字段。

3.5.2 查询数据(读取用户)

SQLAlchemy提供了灵活的查询方式,支持简单查询、条件查询、排序、分页等操作,查询的核心是db.query()方法。

db = SessionLocal()

# 1. 查询所有用户
all_users = db.query(User).all()
print("所有用户:", all_users)

# 2. 查询单个用户(根据主键查询)
user = db.query(User).get(1)  # get方法根据主键查询,不存在返回None
print("主键为1的用户:", user)

# 3. 条件查询(filter)
# 查询年龄大于25的用户
users_gt_25 = db.query(User).filter(User.age > 25).all()
print("年龄大于25的用户:", users_gt_25)

# 查询姓名为“李四”的用户
user_li = db.query(User).filter(User.name == '李四').first()  # first()返回第一条数据,不存在返回None
print("姓名为李四的用户:", user_li)

# 4. 排序查询(order_by)
# 按年龄升序排序
sorted_users = db.query(User).order_by(User.age.asc()).all()
print("按年龄升序排序的用户:", sorted_users)

# 5. 分页查询(slice)
# 查询第2-3条数据(索引从0开始)
page_users = db.query(User).slice(1, 3).all()
print("分页查询结果:", page_users)

db.close()

代码说明:

  • all() 返回所有符合条件的结果列表,first() 返回第一条结果,get() 根据主键查询。
  • filter() 用于添加查询条件,支持==><!=等运算符,还可以通过and_or_组合多条件。
  • order_by() 用于排序,asc() 升序,desc() 降序。
  • slice(start, end) 用于分页,start 是起始索引,end 是结束索引(不包含)。

3.5.3 更新数据(修改用户信息)

更新数据的步骤是:查询到需要修改的对象 → 修改对象的属性 → 提交事务。

db = SessionLocal()

# 1. 先查询再更新
user = db.query(User).filter(User.name == '张三').first()
if user:
    user.age = 26  # 修改年龄
    db.commit()  # 提交事务
    db.refresh(user)
    print("更新后的用户:", user)  # 输出:<User(id=1, name='张三', age=26)>

# 2. 批量更新(无需查询对象)
db.query(User).filter(User.age > 25).update({User.age: User.age + 1})
db.commit()
print("批量更新后年龄大于25的用户:", db.query(User).filter(User.age > 25).all())

db.close()

代码说明:

  • 方式1适合单条数据的更新,需要先查询到对象再修改属性;
  • 方式2适合批量更新,直接通过update()方法修改,效率更高,无需查询对象。

3.5.4 删除数据(删除用户)

删除数据的步骤是:查询到需要删除的对象 → 调用delete()方法 → 提交事务。

db = SessionLocal()

# 1. 单条数据删除
user = db.query(User).get(3)  # 删除主键为3的用户
if user:
    db.delete(user)
    db.commit()
    print("删除后的所有用户:", db.query(User).all())

# 2. 批量数据删除
db.query(User).filter(User.age > 28).delete()
db.commit()
print("批量删除后剩余用户:", db.query(User).all())

db.close()

代码说明:

  • 删除操作执行后,必须调用db.commit()才能生效;
  • 批量删除时,通过filter()添加条件,直接删除符合条件的所有数据。

四、实际案例:用户信息管理系统

为了让大家更好地掌握SQLAlchemy的使用,我们结合一个实际案例——用户信息管理系统,实现用户的新增、查询、修改、删除功能,代码如下:

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

# 1. 创建引擎和Session
engine = create_engine('sqlite:///user_manage.db', echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 2. 定义用户模型
class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), nullable=False)
    age = Column(Integer, nullable=True)
    gender = Column(String(10), nullable=True)
    create_time = Column(DateTime, default=datetime.now)

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', age={self.age}, gender='{self.gender}')>"

# 3. 创建数据库表
Base.metadata.create_all(bind=engine)

# 4. 定义操作函数
def get_db():
    """获取数据库Session,自动关闭"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def add_user(name, age, gender):
    """新增用户"""
    db = next(get_db())
    user = User(name=name, age=age, gender=gender)
    db.add(user)
    db.commit()
    db.refresh(user)
    return user

def query_user(user_id=None, name=None):
    """查询用户,支持按ID或姓名查询"""
    db = next(get_db())
    if user_id:
        return db.query(User).get(user_id)
    elif name:
        return db.query(User).filter(User.name == name).all()
    else:
        return db.query(User).all()

def update_user(user_id, **kwargs):
    """更新用户信息"""
    db = next(get_db())
    user = db.query(User).get(user_id)
    if not user:
        return None
    for key, value in kwargs.items():
        if hasattr(user, key):
            setattr(user, key, value)
    db.commit()
    db.refresh(user)
    return user

def delete_user(user_id):
    """删除用户"""
    db = next(get_db())
    user = db.query(User).get(user_id)
    if not user:
        return False
    db.delete(user)
    db.commit()
    return True

# 5. 测试功能
if __name__ == '__main__':
    # 新增用户
    print("=== 新增用户 ===")
    user1 = add_user("张三", 25, "男")
    user2 = add_user("李四", 30, "女")
    print(f"新增用户:{user1}, {user2}")

    # 查询用户
    print("\n=== 查询所有用户 ===")
    all_users = query_user()
    print(all_users)

    print("\n=== 按姓名查询用户 ===")
    li_users = query_user(name="李四")
    print(li_users)

    # 更新用户
    print("\n=== 更新用户信息 ===")
    updated_user = update_user(1, age=26, gender="男")
    print(f"更新后的用户:{updated_user}")

    # 删除用户
    print("\n=== 删除用户 ===")
    result = delete_user(2)
    print(f"删除是否成功:{result}")
    print(f"删除后剩余用户:{query_user()}")

代码说明:

  • get_db() 函数通过生成器实现Session的自动创建和关闭,避免手动关闭的繁琐;
  • add_user()query_user()update_user()delete_user() 四个函数分别实现用户的增删改查功能;
  • if __name__ == '__main__' 代码块中,我们测试了所有功能,运行后可以看到完整的操作流程和结果。

五、相关资源地址

  • Pypi地址:https://pypi.org/project/SQLAlchemy
  • Github地址:https://github.com/sqlalchemy/sqlalchemy
  • 官方文档地址:https://docs.sqlalchemy.org/en/20/

这个案例覆盖了SQLAlchemy的核心使用场景,小白可以直接复制代码运行,然后根据自己的需求修改字段和功能,快速上手实际开发。{ Environment.NewLine }{ Environment.NewLine }关注我,每天分享一个实用的Python自动化工具。

Python数据库迁移利器:Alembic全面使用教程

一、Alembic简介

Alembic是SQLAlchemy作者开发的数据库迁移工具,用于管理数据库模式变更。它能追踪模型变化,生成迁移脚本,支持版本控制和回滚操作。工作原理基于SQLAlchemy的元数据反射,通过对比模型与数据库结构生成差异脚本。

优点:与SQLAlchemy无缝集成,支持多种数据库,迁移脚本可手动编辑。缺点:初期配置稍复杂,对新手不够友好。Alembic采用MIT许可证,允许自由使用和修改。

二、Alembic安装与初始化

2.1 安装Alembic

使用pip可以轻松安装Alembic:

pip install alembic

安装完成后,可以通过以下命令验证安装是否成功:

alembic --version

如果安装成功,会显示当前Alembic的版本信息。

2.2 初始化Alembic环境

在你的项目目录中,执行以下命令初始化Alembic环境:

alembic init alembic

这个命令会在当前目录下创建一个名为alembic的文件夹和一个alembic.ini配置文件。初始化成功后,你的项目结构会类似这样:

your_project/
├── alembic/
│   ├── versions/
│   ├── env.py
│   ├── README
│   ├── script.py.mako
│   └── env.pyc
└── alembic.ini

其中,alembic.ini是主配置文件,alembic文件夹包含迁移脚本和环境配置。

2.3 配置数据库连接

编辑alembic.ini文件,找到sqlalchemy.url配置项,设置你的数据库连接字符串。例如,对于SQLite数据库:

sqlalchemy.url = sqlite:///mydatabase.db

对于PostgreSQL数据库:

sqlalchemy.url = postgresql://user:password@localhost/mydatabase

对于MySQL数据库:

sqlalchemy.url = mysql+pymysql://user:password@localhost/mydatabase

你也可以在alembic/env.py文件中通过代码配置数据库连接,这在需要动态配置的情况下非常有用:

# 在alembic/env.py中
from myapp import create_app
from myapp.models import Base

app = create_app()
target_metadata = Base.metadata

def run_migrations_online():
    connectable = app.engine  # 从应用中获取引擎

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

三、Alembic基本使用方法

3.1 创建迁移脚本

Alembic提供了两种创建迁移脚本的方式:自动生成和手动创建。

3.1.1 自动生成迁移脚本

当你已经定义了SQLAlchemy模型,并希望根据模型生成迁移脚本时,可以使用以下命令:

alembic revision --autogenerate -m "描述迁移的信息"

例如,如果你创建了一个用户模型,可以运行:

alembic revision --autogenerate -m "add user table"

这个命令会在alembic/versions目录下生成一个新的迁移脚本文件,文件名格式为{版本号}_{描述}.py

自动生成的脚本会包含两个主要函数:upgrade()downgrade()upgrade()函数用于应用迁移,downgrade()函数用于回滚迁移。

3.1.2 手动创建迁移脚本

如果你需要手动编写迁移脚本,可以使用以下命令创建一个空的迁移脚本:

alembic revision -m "描述迁移的信息"

然后编辑生成的脚本文件,手动编写upgrade()downgrade()函数中的逻辑。

例如,手动创建一个添加用户表的迁移脚本:

"""add user table

Revision ID: 1234567890ab
Revises: 
Create Date: 2023-07-15 10:00:00.000000

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '1234567890ab'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('username', sa.String(length=50), nullable=False, unique=True),
        sa.Column('email', sa.String(length=100), nullable=False, unique=True),
        sa.Column('password_hash', sa.String(length=255), nullable=False),
        sa.Column('created_at', sa.DateTime(), default=sa.func.now())
    )


def downgrade():
    op.drop_table('users')

3.2 应用迁移

创建迁移脚本后,可以使用以下命令将迁移应用到数据库:

alembic upgrade head

这个命令会将所有未应用的迁移脚本按顺序执行,将数据库更新到最新版本。

你也可以指定迁移到特定版本:

alembic upgrade 1234567890ab

或者相对于当前版本升级一定数量的迁移:

alembic upgrade +2

3.3 回滚迁移

如果需要回滚迁移,可以使用downgrade命令。回滚到上一个版本:

alembic downgrade -1

回滚到特定版本:

alembic downgrade 0987654321fe

回滚到最初始的版本:

alembic downgrade base

3.4 查看迁移历史

可以使用以下命令查看所有迁移版本的历史记录:

alembic history

加上-v参数可以查看更详细的信息:

alembic history -v

查看当前数据库的版本:

alembic current

四、Alembic高级用法

4.1 批量操作

当需要对多个表进行操作时,可以使用Alembic的批量操作API,它提供了更灵活的表结构修改方式,并且在不同数据库之间有更好的兼容性。

例如,批量添加列到多个表:

from alembic import op
import sqlalchemy as sa
from alembic.batch_alter_table import BatchOperations, batch_alter_table

def upgrade():
    # 定义要添加的列
    new_columns = [
        sa.Column('updated_at', sa.DateTime(), default=sa.func.now(), onupdate=sa.func.now())
    ]

    # 要添加列的表列表
    tables = ['users', 'posts', 'comments']

    for table in tables:
        with batch_alter_table(table) as batch_op:
            for column in new_columns:
                batch_op.add_column(column)

def downgrade():
    # 要删除的列
    columns_to_drop = ['updated_at']

    # 要操作的表列表
    tables = ['users', 'posts', 'comments']

    for table in tables:
        with batch_alter_table(table) as batch_op:
            for column in columns_to_drop:
                batch_op.drop_column(column)

4.2 数据迁移

除了结构迁移,Alembic也可以用于数据迁移。例如,在修改表结构前先迁移数据:

from alembic import op
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker

# 定义临时模型,用于数据迁移
class OldUser(sa.ext.declarative.Base):
    __tablename__ = 'users'
    id = sa.Column(sa.Integer, primary_key=True)
    full_name = sa.Column(sa.String(100))

class NewUser(sa.ext.declarative.Base):
    __tablename__ = 'users'
    id = sa.Column(sa.Integer, primary_key=True)
    first_name = sa.Column(sa.String(50))
    last_name = sa.Column(sa.String(50))

def upgrade():
    # 先添加新列
    op.add_column('users', sa.Column('first_name', sa.String(50)))
    op.add_column('users', sa.Column('last_name', sa.String(50)))

    # 创建会话
    Session = sessionmaker()
    bind = op.get_bind()
    session = Session(bind=bind)

    # 迁移数据:将full_name拆分为first_name和last_name
    for user in session.query(OldUser):
        if user.full_name:
            name_parts = user.full_name.split(' ', 1)
            user.first_name = name_parts[0]
            user.last_name = name_parts[1] if len(name_parts) > 1 else ''

    session.commit()

    # 删除旧列
    op.drop_column('users', 'full_name')

def downgrade():
    # 添加回旧列
    op.add_column('users', sa.Column('full_name', sa.String(100)))

    # 创建会话
    Session = sessionmaker()
    bind = op.get_bind()
    session = Session(bind=bind)

    # 恢复数据:将first_name和last_name合并为full_name
    for user in session.query(NewUser):
        user.full_name = f"{user.first_name} {user.last_name}".strip()

    session.commit()

    # 删除新列
    op.drop_column('users', 'first_name')
    op.drop_column('users', 'last_name')

4.3 事务管理

Alembic默认会在事务中执行迁移操作,但你也可以根据需要手动管理事务。

from alembic import op
import sqlalchemy as sa

def upgrade():
    # 禁用自动事务管理
    connection = op.get_bind()
    transaction = connection.begin()

    try:
        # 执行迁移操作
        op.create_table('categories',
            sa.Column('id', sa.Integer(), primary_key=True),
            sa.Column('name', sa.String(50), nullable=False)
        )

        # 手动提交事务
        transaction.commit()
    except Exception as e:
        # 发生错误时回滚
        transaction.rollback()
        raise e

def downgrade():
    connection = op.get_bind()
    transaction = connection.begin()

    try:
        op.drop_table('categories')
        transaction.commit()
    except Exception as e:
        transaction.rollback()
        raise e

4.4 环境变量配置

在实际项目中,数据库连接信息通常不会硬编码在配置文件中,而是通过环境变量获取。可以修改alembic/env.py文件来支持环境变量:

# 在alembic/env.py中
import os
from dotenv import load_dotenv  # 需要安装python-dotenv包
from sqlalchemy import create_engine

# 加载环境变量
load_dotenv()

# 从环境变量获取数据库连接信息
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = os.getenv('DB_PORT', '5432')
DB_NAME = os.getenv('DB_NAME')

# 构建数据库连接字符串
SQLALCHEMY_DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

# 配置目标元数据
from myapp.models import Base
target_metadata = Base.metadata

def run_migrations_online():
    connectable = create_engine(SQLALCHEMY_DATABASE_URL)

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

然后创建一个.env文件存储数据库连接信息:

DB_USER=myuser
DB_PASSWORD=mypassword
DB_HOST=localhost
DB_PORT=5432
DB_NAME=mydatabase

这样就可以避免在代码中硬编码敏感信息。

五、实际项目案例

假设我们正在开发一个博客系统,需要使用Alembic管理数据库迁移。以下是整个过程的示例:

5.1 项目结构

blog_project/
├── alembic/
├── alembic.ini
├── .env
├── models.py
└── app.py

5.2 定义数据模型

首先,在models.py中定义我们的数据库模型:

from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # 关系
    posts = relationship('Post', back_populates='author')

class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    author_id = Column(Integer, ForeignKey('users.id'))

    # 关系
    author = relationship('User', back_populates='posts')
    comments = relationship('Comment', back_populates='post')

class Comment(Base):
    __tablename__ = 'comments'

    id = Column(Integer, primary_key=True)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    post_id = Column(Integer, ForeignKey('posts.id'))
    author_id = Column(Integer, ForeignKey('users.id'))

    # 关系
    post = relationship('Post', back_populates='comments')
    author = relationship('User')

5.3 初始化并配置Alembic

初始化Alembic环境:

alembic init alembic

编辑alembic.ini文件,配置数据库连接(或者使用前面介绍的环境变量方式):

sqlalchemy.url = postgresql://myuser:mypassword@localhost/blogdb

修改alembic/env.py文件,指定目标元数据:

# 在alembic/env.py中
from models import Base
target_metadata = Base.metadata

5.4 创建初始迁移

生成初始迁移脚本:

alembic revision --autogenerate -m "initial schema"

这会生成一个包含创建所有表的迁移脚本。检查生成的脚本无误后,应用迁移:

alembic upgrade head

5.5 模型变更与迁移

随着项目发展,我们需要对模型进行修改。例如,我们想给用户添加一个bio字段:

# 在User模型中添加
bio = Column(Text, nullable=True)

生成新的迁移脚本:

alembic revision --autogenerate -m "add user bio"

检查生成的脚本,确认它包含添加bio列的操作,然后应用迁移:

alembic upgrade head

5.6 数据迁移案例

假设我们需要将Post表的title字段长度从200增加到300,并且需要对现有数据进行处理(如果标题过长则截断):

# 首先修改模型
title = Column(String(300), nullable=False)  # 从200改为300

生成迁移脚本:

alembic revision --autogenerate -m "increase post title length"

然后编辑生成的迁移脚本,添加数据处理逻辑:

"""increase post title length

Revision ID: 5f3a7b9d1c2e
Revises: previous_revision_id
Create Date: 2023-07-16 14:30:00.000000

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker

# 定义临时模型用于数据处理
class Post(sa.ext.declarative.Base):
    __tablename__ = 'posts'
    id = sa.Column(sa.Integer, primary_key=True)
    title = sa.Column(sa.String(200))  # 原始长度

def upgrade():
    # 1. 先添加一个临时列
    op.add_column('posts', sa.Column('new_title', sa.String(300)))

    # 2. 截断过长的标题并迁移到临时列
    bind = op.get_bind()
    Session = sessionmaker(bind=bind)
    session = Session()

    for post in session.query(Post):
        # 截断标题到300个字符
        post.new_title = post.title[:300]

    session.commit()

    # 3. 删除旧的title列
    op.drop_column('posts', 'title')

    # 4. 将临时列重命名为title
    op.alter_column('posts', 'new_title', new_column_name='title', nullable=False)

def downgrade():
    # 1. 先添加一个临时列
    op.add_column('posts', sa.Column('old_title', sa.String(200)))

    # 2. 截断过长的标题并迁移到临时列
    bind = op.get_bind()
    Session = sessionmaker(bind=bind)
    session = Session()

    # 这里需要重新定义Post模型,因为现在title是300长度
    class PostDowngrade(sa.ext.declarative.Base):
        __tablename__ = 'posts'
        id = sa.Column(sa.Integer, primary_key=True)
        title = sa.Column(sa.String(300))

    for post in session.query(PostDowngrade):
        # 截断标题到200个字符
        post.old_title = post.title[:200]

    session.commit()

    # 3. 删除新的title列
    op.drop_column('posts', 'title')

    # 4. 将临时列重命名为title
    op.alter_column('posts', 'old_title', new_column_name='title', nullable=False)

应用这个迁移:

alembic upgrade head

5.7 回滚操作

如果发现最新的迁移有问题,可以回滚到上一个版本:

alembic downgrade -1

修复问题后,重新生成并应用迁移。

六、相关资源

  • PyPI地址:https://pypi.org/project/alembic/
  • Github地址:https://github.com/sqlalchemy/alembic
  • 官方文档地址:https://alembic.sqlalchemy.org/

通过本文的介绍,你应该已经掌握了Alembic的基本使用方法和一些高级技巧。Alembic作为一个强大的数据库迁移工具,能够帮助你在项目开发过程中轻松管理数据库结构的变更,保持数据库设计与代码模型的同步。无论是小型项目还是大型应用,Alembic都能为你的数据库迁移提供可靠的支持。{ Environment.NewLine }{ Environment.NewLine }关注我,每天分享一个实用的Python自动化工具。

Python实用工具:深入解析Elasticsearch DSL库

Python凭借其简洁的语法、丰富的生态以及强大的扩展性,已成为数据科学、Web开发、自动化运维等多个领域的核心工具。从金融领域的量化交易到科研领域的机器学习模型训练,从电商平台的数据分析到搜索引擎的搭建,Python的身影无处不在。在众多工具库中,Elasticsearch DSL以其优雅的查询构建方式和强大的 Elasticsearch 交互能力,成为数据检索与分析场景中的重要利器。本文将围绕该库的用途、原理、使用方法及实战案例展开详细介绍,帮助读者快速掌握其核心功能。

一、Elasticsearch DSL库概述

1.1 用途与应用场景

Elasticsearch DSL(Domain Specific Language)是一个基于 Python 的库,用于简化与 Elasticsearch 搜索引擎的交互。其核心价值在于:

  • 构建复杂查询:通过 Python 类和方法链式调用的方式,替代传统的 JSON 字符串拼接,提升查询语句的可读性与维护性。
  • 支持聚合分析:方便实现数据分组、统计计算(如求和、平均值、分桶分析等),适用于日志分析、用户行为追踪、实时数据统计等场景。
  • 集成数据建模:支持定义文档映射(Mapping)和模型类,简化数据索引的创建与管理流程。

典型应用场景包括:

  • 日志管理系统:通过 DSL 快速检索特定时间段、特定级别的日志,并进行聚合统计(如每分钟错误日志数量)。
  • 电商搜索服务:构建商品搜索接口,支持关键词匹配、过滤(价格区间、品牌)、排序(销量、评分)等组合查询。
  • 数据分析平台:对海量数据进行分桶分析(如按用户地域分布、年龄分段统计活跃用户数)。

1.2 工作原理

Elasticsearch DSL 本质上是对 Elasticsearch HTTP API 的一层封装,主要包含以下组件:

  • 查询构建器:通过 Python 类(如QueryBoolQueryMatchQuery等)生成对应的 Elasticsearch 查询 DSL(JSON 格式)。
  • 传输层:利用elasticsearch-py库(DSL 库的依赖项)与 Elasticsearch 集群建立连接,发送查询请求并解析响应结果。
  • 模型定义:通过Document类定义文档结构(字段类型、分词器等),自动生成索引的 Mapping 配置。

1.3 优缺点分析

优点

  • 代码可读性强:查询逻辑通过 Python 方法链式调用实现,避免复杂 JSON 字符串的拼接错误。
  • 类型安全:部分操作(如字段名提示)可通过 IDE 静态检查提前发现错误。
  • 功能全面:覆盖 Elasticsearch 的核心功能(查询、聚合、排序、高亮等),支持深度分页和 Scroll API。

局限性

  • 学习成本:需同时掌握 Elasticsearch 查询语法和 DSL 库的类结构,对新手有一定门槛。
  • 性能边界:对于极少数极端复杂的查询(如嵌套多层的布尔查询),直接编写 JSON 可能更高效,但此类场景较为罕见。

1.4 License类型

Elasticsearch DSL 库遵循Apache License 2.0,允许商业使用、修改和再发布,但需保留版权声明。该协议宽松灵活,适合企业级项目和开源项目使用。

二、安装与环境配置

2.1 依赖安装

Elasticsearch DSL 依赖于elasticsearch-py库(Elasticsearch 的官方 Python 客户端),可通过以下命令一次性安装:

pip install elasticsearch-dsl

安装完成后,验证版本:

import elasticsearch_dsl
print(elasticsearch_dsl.__version__)  # 输出当前版本号,如7.17.10

2.2 连接 Elasticsearch 集群

在使用 DSL 库前,需先建立与 Elasticsearch 的连接。支持单机模式和集群模式,示例如下:

单机连接(默认参数)

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q

# 创建连接(默认连接本地9200端口)
es = Elasticsearch()

集群连接(指定节点列表)

es = Elasticsearch(
    hosts=["http://es-node1:9200", "http://es-node2:9200"],
    basic_auth=("username", "password"),  # 可选认证信息
    request_timeout=30  # 请求超时时间(秒)
)

连接配置说明

  • hosts:可以是单个节点字符串或节点列表,支持 HTTP/HTTPS 协议。
  • basic_auth:用于开启身份验证的 Elasticsearch 集群(如 X-Pack 安全模式)。
  • ca_certs:指定 CA 证书路径(HTTPS 连接时需要)。

三、核心功能与代码示例

3.1 数据建模与索引管理

通过定义Document子类,可快速创建索引并声明字段映射(Mapping),示例如下:

定义文档模型

from elasticsearch_dsl import Document, Text, Keyword, Integer, Date

class Product(Document):
    name = Text(analyzer="ik_max_word", fields={"keyword": Keyword()})  # 中文分词+ keyword 子字段
    price = Integer()
    category = Keyword()  # 不分词字段(精确匹配)
    create_time = Date()

    class Index:
        name = "products"  # 索引名称
        settings = {
            "number_of_shards": 2,  # 主分片数
            "number_of_replicas": 1  # 副本数
        }

字段类型说明

  • Text:用于全文搜索字段,支持分词器(如中文场景常用ik_max_word)。
  • Keyword:用于精确匹配字段(如 ID、标签、分类),不进行分词。
  • Integer/Float/Date:数值型和日期型字段,支持范围查询。

创建索引

# 检查索引是否存在,不存在则创建
if not Product._index.exists():
    Product.init()  # 基于模型定义自动创建索引
    print("Index 'products' created successfully.")

更新 Mapping(追加字段)

# 新增字段(不覆盖原有 Mapping)
with Product._index as index:
    index.put_mapping(
        properties={
            "description": Text(analyzer="ik_smart")
        }
    )

3.2 基础查询操作

Elasticsearch DSL 通过Search类构建查询,支持链式调用方法组合查询条件。

3.2.1 简单查询:匹配单个字段

# 查询名称包含"手机"的商品,返回前10条结果
s = Search(using=es, index="products") \
    .query("match", name="手机") \
    .sort("-price")  # 按价格降序排列

response = s.execute()
print(f"Total hits: {response.hits.total.value}")
for hit in response.hits:
    print(f"{hit.name}: {hit.price}元")
  • query("match", field=value):执行全文匹配查询,等价于 Elasticsearch 的match查询。
  • sort():支持字段名(升序)或-字段名(降序)。

3.2.2 组合查询:布尔查询(Bool Query)

通过Q对象组合must(必须满足)、filter(过滤,不计算相关性)、should(至少满足一个)等条件:

# 查询价格在1000-3000元之间,且分类为"电子产品"的商品,名称包含"小米"或"华为"
q = Q("bool", 
    filter=Q("range", price={"gte": 1000, "lte": 3000}),
    must=[
        Q("match", category="电子产品"),
        Q("bool", should=[Q("match", name="小米"), Q("match", name="华为")])
    ]
)

s = Search(using=es, index="products").query(q).size(20)
response = s.execute()
  • Q("range", field={"gte": min, "lte": max}):范围查询,gte(大于等于)、lte(小于等于)。
  • bool查询的should子句默认需至少匹配一个条件,可通过minimum_should_match参数调整匹配数量。

3.2.3 精确查询:Term与Terms查询

# 查询分类为"图书"的商品(精确匹配)
s = Search(using=es, index="products").query("term", category="图书")

# 查询多个ID的商品
product_ids = ["P001", "P002", "P003"]
s = Search(using=es, index="products").query("terms", id=product_ids)
  • term查询用于单个精确值匹配,适用于Keyword类型字段。
  • terms查询用于多个值匹配,等价于 SQL 中的IN操作。

3.3 聚合分析(Aggregation)

聚合分析是 Elasticsearch 的核心功能之一,DSL 库通过Aggregation类实现分组统计、指标计算等操作。

3.3.1 桶聚合(Bucket Aggregations):按分类分组统计商品数量

s = Search(using=es, index="products") \
    .aggs.bucket("category_agg", "terms", field="category", size=10)  # 按分类分组,最多返回10个桶

response = s.execute()

# 解析聚合结果
for bucket in response.aggregations.category_agg.buckets:
    print(f"Category: {bucket.key}, Count: {bucket.doc_count}")
  • terms聚合:根据字段值分组,field指定分组字段(需为Keyword类型)。
  • size参数控制返回的桶数量,默认最多返回10个。

3.3.2 指标聚合(Metric Aggregations):计算价格平均值

s = Search(using=es, index="products") \
    .aggs.metric("avg_price", "avg", field="price")  # 计算价格平均值

response = s.execute()
print(f"Average price: {response.aggregations.avg_price.value}")

3.3.3 嵌套聚合:先按分类分组,再在每组内计算价格最大值

s = Search(using=es, index="products") \
    .aggs.bucket("category_agg", "terms", field="category") \
    .metric("max_price", "max", field="price")  # 嵌套在分类分组下的最大值聚合

response = s.execute()
for bucket in response.aggregations.category_agg.buckets:
    print(f"Category: {bucket.key}, Max Price: {bucket.max_price.value}")

3.4 分页与排序

3.4.1 普通分页(from + size)

page = 2  # 页码(从1开始)
page_size = 20
s = Search(using=es, index="products") \
    .query("match_all") \
    .from_( (page-1)*page_size ) \
    .size(page_size) \
    .sort("create_time")  # 按创建时间升序排列
  • from_():指定起始偏移量,注意参数名末尾有下划线(避免与 Python 关键字冲突)。
  • size():每页返回的文档数量,最大值受限于 Elasticsearch 的index.max_result_window设置(默认10000)。

3.4.2 深度分页(Scroll API)

适用于查询结果超过10000条的场景,通过滚动游标分批获取数据:

from elasticsearch_dsl import Scroll

# 创建滚动查询
scroll = Scroll(using=es, index="products", scroll="1m")  # 游标有效期1分钟
s = Search(using=es, index="products").query("match_all").sort("_doc")  # 按文档顺序排序(需固定排序方式)

# 执行首次查询
response = scroll.execute(s)
total_hits = response.hits.total.value
print(f"Total documents: {total_hits}")

# 分批处理数据
batch_size = 1000
processed = 0
while len(response.hits.hits) > 0 and processed < total_hits:
    for hit in response.hits.hits:
        # 处理文档逻辑
        processed += 1
    # 滚动获取下一批数据
    response = scroll.scroll()

# 清除滚动游标
scroll.clear()

3.5 高亮显示查询结果

通过highlight()方法为查询结果中的关键词添加高亮标记:

s = Search(using=es, index="products") \
    .query("match", name="笔记本电脑") \
    .highlight("name", pre_tags="<em>", post_tags="</em>")  # 高亮name字段,包裹<em>标签

response = s.execute()
for hit in response.hits:
    # 原始字段值
    print(f"Name: {hit.name}")
    # 高亮片段(可能包含多个片段,如长文本分词后的结果)
    print("Highlight:", ", ".join(hit.highlight.name))
  • pre_tagspost_tags:指定高亮标签,可自定义 HTML 标签或其他格式。
  • 高亮结果存储在hit.highlight属性中,每个字段对应一个列表(包含多个高亮片段)。

四、实战案例:电商商品搜索服务

4.1 需求背景

构建一个电商平台的商品搜索接口,支持以下功能:

  1. 关键词搜索(商品名称全文匹配)。
  2. 过滤条件:价格区间、分类、品牌(精确匹配)。
  3. 排序方式:按销量降序、按价格升序/降序。
  4. 分页查询,每页返回20条结果。
  5. 显示查询结果中的关键词高亮。

4.2 数据模型定义

假设商品文档包含以下字段:

class Product(Document):
    name = Text(analyzer="ik_max_word", fields={"keyword": Keyword()})  # 中文分词+精确匹配子字段
    price = Integer()
    category = Keyword()  # 分类(如"电子产品"、"图书")
    brand = Keyword()     # 品牌(如"华为"、"京东自营")
    sales = Integer()     # 月销量
    create_time = Date()

    class Index:
        name = "ecommerce_products"
        settings = {"number_of_shards": 3}

4.3 核心查询逻辑代码

def search_products(
    keyword: str = None,
    price_min: int = None,
    price_max: int = None,
    category: str = None,
    brand: str = None,
    sort_by: str = "relevance",  # 可选"sales_desc", "price_asc", "price_desc"
    page: int = 1
):
    s = Search(using=es, index="ecommerce_products")

    # 关键词搜索(全文匹配)
    if keyword:
        s = s.query("match", name=keyword).highlight("name", pre_tags="<strong>", post_tags="</strong>")

    # 过滤条件(精确匹配与范围查询)
    bool_query = Q("bool")
    if category:
        bool_query.filter("term", category=category)
    if brand:
        bool_query.filter("term", brand=brand)
    if price_min or price_max:
        range_query = {}
        if price_min:
            range_query["gte"] = price_min
        if price_max:
            range_query["lte"] = price_max
        bool_query.filter("range", price=range_query)
    s = s.query(bool_query)

    # 排序逻辑
    if sort_by == "sales_desc":
        s = s.sort("-sales")
    elif sort_by == "price_asc":
        s = s.sort("price")
    elif sort_by == "price_desc":
        s = s.sort("-price")
    else:
        # 默认按相关性得分排序
        s = s.sort("_score")

    # 分页
    page_size = 20
    s = s.from_((page-1)*page_size).size(page_size)

    # 执行查询
    response = s.execute()

    # 解析结果
    results = []
    for hit in response.hits:
        result = {
            "id": hit.meta.id,
            "name": hit.name,
            "price": hit.price,
            "category": hit.category,
            "brand": hit.brand,
            "sales": hit.sales,
            "highlight": hit.highlight.name if hasattr(hit.highlight, "name") else []
        }
        results.append(result)

    return {
        "total": response.hits.total.value,
        "page": page,
        "page_size": page_size,
        "results": results
    }

4.4 调用示例与结果

“`python

搜索关键词”华为手机”,分类为”电子产品”,价格≤5000元,按销量降序排列

result = search_products(
keyword=”华为手机”,
category=”电子产品”,
price_max=5

关注我,每天分享一个实用的Python自动化工具。

kafka-python:Python开发者的Kafka数据管道利器

一、Python生态中的数据管道需求

Python作为数据科学与分布式系统开发的首选语言,其生态系统已经覆盖了从数据采集、处理到可视化的全链路。根据2024年Python开发者调查显示,超过65%的专业开发者在项目中需要处理实时数据流,而Apache Kafka凭借其高吞吐量、持久化存储和分布式特性,成为构建实时数据管道的主流选择。

在电商实时推荐系统中,需要处理每秒数千笔的用户行为数据;金融交易平台需要对市场数据进行微秒级的处理;物联网场景中,数百万设备产生的传感器数据需要高效聚合。这些场景都对数据管道的稳定性和性能提出了极高要求。

kafka-python作为Apache Kafka的官方Python客户端库,为Python开发者提供了无缝接入Kafka生态的能力。通过kafka-python,开发者可以轻松构建数据采集、流处理和数据同步等关键组件,让Python应用能够与企业级数据基础设施高效协作。

二、kafka-python库的技术解析

2.1 核心用途

kafka-python是Apache Kafka消息系统的Python客户端实现,主要用于:

  • 构建高吞吐量的数据采集系统,将多源数据汇总到Kafka集群
  • 开发实时流处理应用,从Kafka消费数据并进行实时分析
  • 实现微服务间的异步通信,通过消息队列解耦系统组件
  • 构建数据同步管道,在不同系统间可靠地传输数据

2.2 工作原理

kafka-python通过实现Kafka协议,与Kafka集群进行通信。其核心工作流程包括:

  1. 生产者(Producer)工作流程
  • 消息序列化:将Python对象转换为字节流
  • 分区选择:根据键或轮询策略选择消息存储的分区
  • 批量发送:将多条消息打包发送以提高吞吐量
  • 重试机制:处理网络波动导致的发送失败
  1. 消费者(Consumer)工作流程
  • 组协调:加入消费者组并分配分区
  • 偏移量管理:记录消费位置,支持断点续传
  • 消息拉取:定期从Kafka拉取消息批次
  • 反序列化:将字节流转换为Python对象

2.3 技术优势

  • 兼容性强:支持所有Kafka版本,包括最新的3.5.x版本
  • 功能完整:实现了Kafka的全部核心功能,包括事务、幂等生产等
  • 性能优化:通过批量处理和异步IO,达到接近原生客户端的性能
  • 社区活跃:GitHub上每月有数百次提交,问题响应迅速
  • 文档完善:提供了详细的API文档和使用示例

2.4 局限性

  • 同步API限制:默认API为同步阻塞模式,在高并发场景下需要配合asyncio使用
  • 复杂配置:对于初学者,Kafka本身的配置参数较多,需要一定学习成本
  • 高级功能支持有限:某些Kafka特有功能(如MirrorMaker)需要额外开发

2.5 License信息

kafka-python采用Apache License 2.0许可协议,允许商业使用、修改和再分发,无需支付许可费用。这使得它非常适合企业级项目使用。

三、kafka-python的安装与环境准备

3.1 安装kafka-python库

使用pip安装kafka-python是最简便的方式:

pip install kafka-python

对于需要特定版本的项目,可以指定版本号:

pip install kafka-python==2.0.2

3.2 验证安装

安装完成后,可以通过以下命令验证是否安装成功:

python -c "import kafka; print(kafka.__version__)"

3.3 Kafka环境准备

要使用kafka-python,需要有一个可用的Kafka集群。对于开发和测试环境,可以使用Docker快速搭建:

# 创建docker-compose.yml文件
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.3.3
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

启动Kafka环境:

docker-compose up -d

验证Kafka是否正常运行:

docker-compose logs -f kafka

3.4 创建测试主题

使用Kafka命令行工具创建一个测试主题:

docker-compose exec kafka kafka-topics --create --topic test_topic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

查看主题列表确认创建成功:

docker-compose exec kafka kafka-topics --list --bootstrap-server localhost:9092

四、kafka-python核心功能详解

4.1 生产者(Producer)基础使用

生产者是向Kafka主题发送消息的组件。下面是一个简单的生产者示例:

from kafka import KafkaProducer
import json

# 创建生产者实例
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # Kafka集群地址
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # 消息值序列化方式
    key_serializer=lambda k: str(k).encode('utf-8'),  # 消息键序列化方式
    retries=3  # 发送失败时的重试次数
)

# 发送消息
try:
    # 发送单条消息
    future = producer.send(
        topic='test_topic',
        value={'name': 'Alice', 'age': 30},
        key=1,  # 消息键,用于消息分区
        partition=0  # 指定分区,可选
    )

    # 等待消息发送结果
    record_metadata = future.get(timeout=10)
    print(f"消息发送成功,主题: {record_metadata.topic}")
    print(f"分区: {record_metadata.partition}")
    print(f"偏移量: {record_metadata.offset}")

except Exception as e:
    print(f"消息发送失败: {e}")

finally:
    # 关闭生产者连接
    producer.close()

这个示例展示了生产者的基本使用流程:

  1. 创建生产者实例时,需要指定Kafka集群地址和序列化方式
  2. 使用send()方法发送消息,返回一个Future对象
  3. 调用future.get()等待消息发送结果,获取元数据
  4. 处理可能的异常
  5. 关闭生产者连接

4.2 批量消息发送

在实际应用中,为了提高吞吐量,通常会批量发送消息:

from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    batch_size=16384,  # 批处理大小(字节)
    linger_ms=5  # 发送前等待的毫秒数,增加此值可以提高吞吐量
)

# 模拟批量发送100条消息
for i in range(100):
    message = {'id': i, 'timestamp': time.time()}
    producer.send('test_topic', value=message)

    # 每10条消息刷新一次缓冲区
    if i % 10 == 0:
        producer.flush()

# 确保所有消息都被发送
producer.flush()
producer.close()

批量发送的关键参数:

  • batch_size:批处理大小,达到此大小时会触发发送
  • linger_ms:发送前等待的时间,即使未达到批处理大小
  • buffer_memory:生产者缓冲区大小

4.3 消费者(Consumer)基础使用

消费者从Kafka主题读取消息:

from kafka import KafkaConsumer
import json

# 创建消费者实例
consumer = KafkaConsumer(
    'test_topic',  # 订阅的主题
    bootstrap_servers=['localhost:9092'],
    group_id='my_consumer_group',  # 消费者组ID
    auto_offset_reset='earliest',  # 从最早的消息开始消费
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),  # 消息值反序列化
    max_poll_records=100,  # 每次拉取的最大消息数
    enable_auto_commit=True,  # 启用自动提交偏移量
    auto_commit_interval_ms=5000  # 自动提交间隔(毫秒)
)

# 消费消息
try:
    for message in consumer:
        # 消息元数据
        print(f"分区: {message.partition}, 偏移量: {message.offset}")
        print(f"键: {message.key}, 值: {message.value}")

        # 处理业务逻辑
        process_message(message.value)

except KeyboardInterrupt:
    print("消费被用户中断")

finally:
    # 关闭消费者连接
    consumer.close()

消费者的关键配置参数:

  • group_id:消费者组ID,相同组的消费者会共同消费主题分区
  • auto_offset_reset:重置偏移量策略,可选earliestlatest
  • enable_auto_commit:是否启用自动提交偏移量
  • max_poll_records:每次拉取的最大消息数

4.4 手动管理偏移量

在某些场景下,需要手动控制偏移量的提交:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['localhost:9092'],
    group_id='manual_commit_group',
    auto_offset_reset='earliest',
    enable_auto_commit=False  # 禁用自动提交
)

try:
    for message in consumer:
        # 处理消息
        process_message(message.value)

        # 手动提交偏移量
        if should_commit():  # 自定义提交条件
            consumer.commit()
            print(f"手动提交偏移量: {message.offset}")

except Exception as e:
    print(f"消费过程中发生错误: {e}")

finally:
    consumer.close()

手动管理偏移量的优势:

  • 确保消息处理成功后才提交偏移量
  • 实现精确一次(Exactly Once)语义
  • 在批量处理场景中,可以批量提交偏移量

4.5 消费者组与分区分配

kafka-python支持多种分区分配策略:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

# 创建消费者,使用Range和RoundRobin分配策略
consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['localhost:9092'],
    group_id='partition_assignment_group',
    partition_assignment_strategy=[RangePartitionAssignor, RoundRobinPartitionAssignor]
)

# 消费消息
try:
    for message in consumer:
        print(f"消费消息: 分区={message.partition}, 偏移量={message.offset}")
finally:
    consumer.close()

常见的分区分配策略:

  • RangePartitionAssignor:按主题的分区范围分配
  • RoundRobinPartitionAssignor:轮询分配所有主题的分区
  • StickyPartitionAssignor:粘性分配,尽量保持现有分配关系

4.6 高级生产者配置

以下是一个配置了幂等性和事务的生产者示例:

from kafka import KafkaProducer
import json

# 创建支持幂等性的生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    enable_idempotence=True,  # 启用幂等性
    max_in_flight_requests_per_connection=5,  # 每个连接允许的最大飞行中请求数
    acks='all',  # 所有副本都确认后才认为发送成功
    retries=10  # 重试次数
)

# 创建支持事务的生产者
transactional_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    transactional_id='my_transactional_id'  # 必须设置事务ID
)

# 初始化事务
transactional_producer.init_transactions()

try:
    # 开始事务
    transactional_producer.begin_transaction()

    # 发送多条消息
    transactional_producer.send('topic1', {'data': 'message1'})
    transactional_producer.send('topic2', {'data': 'message2'})

    # 提交事务
    transactional_producer.commit_transaction()

except Exception as e:
    # 回滚事务
    transactional_producer.abort_transaction()
    print(f"事务失败: {e}")

finally:
    producer.close()
    transactional_producer.close()

幂等性和事务的关键配置:

  • enable_idempotence=True:确保生产者不会发送重复消息
  • acks='all':所有副本都确认后才认为发送成功
  • transactional_id:必须设置事务ID才能使用事务
  • init_transactions():初始化事务
  • begin_transaction():开始事务
  • commit_transaction():提交事务
  • abort_transaction():回滚事务

五、kafka-python在实际项目中的应用

5.1 实时日志收集系统

下面是一个使用kafka-python构建的实时日志收集系统示例:

# 日志生产者 - 将应用日志发送到Kafka
import logging
from kafka import KafkaHandler

# 配置Kafka日志处理器
kafka_handler = KafkaHandler(
    bootstrap_servers=['localhost:9092'],
    topic='application_logs',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 配置日志记录器
logger = logging.getLogger('application')
logger.setLevel(logging.INFO)
logger.addHandler(kafka_handler)

# 应用代码中记录日志
try:
    # 业务逻辑
    result = 1 / 0
except Exception as e:
    logger.error(f"发生错误: {str(e)}", exc_info=True)

# 日志消费者 - 从Kafka读取日志并存储到Elasticsearch
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
import json

# 连接Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# 创建Kafka消费者
consumer = KafkaConsumer(
    'application_logs',
    bootstrap_servers=['localhost:9092'],
    group_id='log_consumer_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# 消费日志并存储到Elasticsearch
for message in consumer:
    log_entry = message.value

    # 构建Elasticsearch文档
    doc = {
        'timestamp': log_entry.get('timestamp'),
        'level': log_entry.get('level'),
        'message': log_entry.get('message'),
        'exception': log_entry.get('exception')
    }

    # 索引文档
    es.index(index='application_logs', doc_type='_doc', body=doc)

这个日志收集系统的工作流程:

  1. 应用程序将日志发送到Kafka的application_logs主题
  2. 日志消费者从Kafka读取日志
  3. 消费者将日志格式化后存储到Elasticsearch
  4. 可以通过Kibana可视化查询日志

5.2 电商实时推荐系统

以下是一个简化的电商实时推荐系统:

# 行为数据收集服务 - 生产者
from kafka import KafkaProducer
import json
from flask import Flask, request

app = Flask(__name__)

# 创建Kafka生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 接收用户行为数据的API
@app.route('/track', methods=['POST'])
def track_user_behavior():
    data = request.json

    # 发送用户行为数据到Kafka
    producer.send('user_behaviors', data)

    return json.dumps({'status': 'success'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

# 实时推荐引擎 - 消费者
from kafka import KafkaConsumer
import json
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# 创建Kafka消费者
consumer = KafkaConsumer(
    'user_behaviors',
    bootstrap_servers=['localhost:9092'],
    group_id='recommendation_engine_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# 简单的基于用户的协同过滤推荐算法
class RecommendationEngine:
    def __init__(self):
        self.user_profiles = {}  # 用户画像
        self.item_vectors = {}   # 商品向量

    def update_user_profile(self, user_id, item_id, behavior):
        # 更新用户画像
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = {}

        # 简化的行为权重:点击=1,收藏=2,购买=3
        weight = {'click': 1, 'favorite': 2, 'purchase': 3}.get(behavior, 1)

        if item_id in self.item_vectors:
            # 将商品向量纳入用户画像
            for feature, value in self.item_vectors[item_id].items():
                self.user_profiles[user_id][feature] = self.user_profiles[user_id].get(feature, 0) + value * weight

    def recommend_items(self, user_id, top_n=5):
        if user_id not in self.user_profiles:
            return []

        user_vector = self.user_profiles[user_id]

        # 计算用户向量与所有商品向量的相似度
        similarities = []
        for item_id, item_vector in self.item_vectors.items():
            # 构建比较向量
            common_features = set(user_vector.keys()) & set(item_vector.keys())
            if not common_features:
                continue

            user_compare = np.array([user_vector.get(f, 0) for f in common_features])
            item_compare = np.array([item_vector.get(f, 0) for f in common_features])

            # 计算余弦相似度
            similarity = cosine_similarity([user_compare], [item_compare])[0][0]
            similarities.append((item_id, similarity))

        # 按相似度排序并返回前N个商品
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_n]

# 初始化推荐引擎
engine = RecommendationEngine()

# 消费用户行为数据并更新推荐模型
for message in consumer:
    behavior = message.value

    user_id = behavior.get('user_id')
    item_id = behavior.get('item_id')
    action = behavior.get('action')

    # 更新推荐模型
    engine.update_user_profile(user_id, item_id, action)

    # 为用户生成推荐
    recommendations = engine.recommend_items(user_id)

    # 将推荐结果发送到推荐结果主题
    if recommendations:
        recommendation_data = {
            'user_id': user_id,
            'recommendations': [item_id for item_id, _ in recommendations]
        }
        producer.send('recommendation_results', recommendation_data)

这个实时推荐系统的工作流程:

  1. Web应用通过API接收用户行为数据
  2. API服务将行为数据发送到Kafka的user_behaviors主题
  3. 推荐引擎消费行为数据,更新用户画像
  4. 推荐引擎基于用户画像生成推荐结果
  5. 推荐结果被发送到Kafka的recommendation_results主题
  6. 前端应用可以消费推荐结果主题,展示个性化推荐

5.3 金融交易实时监控系统

下面是一个金融交易实时监控系统的示例:

# 交易数据生产者
from kafka import KafkaProducer
import json
import random
import time

# 创建Kafka生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 模拟生成交易数据
def generate_transaction():
    transaction_id = random.randint(100000, 999999)
    user_id = random.randint(1, 1000)
    amount = round(random.uniform(10, 10000), 2)
    currency = random.choice(['USD', 'EUR', 'GBP', 'CNY'])
    merchant = random.choice(['Amazon', 'Alibaba', 'eBay', 'Walmart', 'Target'])
    country = random.choice(['US', 'UK', 'DE', 'FR', 'CN', 'JP'])

    return {
        'transaction_id': transaction_id,
        'user_id': user_id,
        'amount': amount,
        'currency': currency,
        'merchant': merchant,
        'country': country,
        'timestamp': time.time()
    }

# 持续生成并发送交易数据
try:
    while True:
        transaction = generate_transaction()
        producer.send('financial_transactions', transaction)
        print(f"发送交易: {transaction['transaction_id']}")
        time.sleep(0.5)  # 每秒发送2条交易
except KeyboardInterrupt:
    print("程序被用户中断")
finally:
    producer.close()

# 实时欺诈检测消费者
from kafka import KafkaConsumer, KafkaProducer
import json
import time

# 创建消费者和生产者
consumer = KafkaConsumer(
    'financial_transactions',
    bootstrap_servers=['localhost:9092'],
    group_id='fraud_detection_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 简单的欺诈检测规则
class FraudDetector:
    def __init__(self):
        self.user_transactions = {}  # 存储用户交易历史
        self.suspicious_merchants = {'phishing-site1.com', 'malicious-store2.net'}

    def detect_fraud(self, transaction):
        user_id = transaction['user_id']
        amount = transaction['amount']
        merchant = transaction['merchant']
        country = transaction['country']

        # 规则1: 检查是否是可疑商户
        if merchant in self.suspicious_merchants:
            return True, "可疑商户"

        # 规则2: 检查大额交易
        if amount > 5000:
            return True, "交易金额过大"

        # 规则3: 检查异常国家交易
        user_countries = self.user_transactions.get(user_id, {}).get('countries', set())
        if user_countries and country not in user_countries and len(user_countries) > 3:
            return True, "异常交易国家"

        # 规则4: 检查短时间内频繁交易
        user_timestamps = self.user_transactions.get(user_id, {}).get('timestamps', [])
        recent_transactions = [t for t in user_timestamps if time.time() - t < 300]  # 5分钟内
        if len(recent_transactions) > 5:
            return True, "短时间内频繁交易"

        # 更新用户交易历史
        if user_id not in self.user_transactions:
            self.user_transactions[user_id] = {
                'countries': set(),
                'timestamps': []
            }

        self.user_transactions[user_id]['countries'].add(country)
        self.user_transactions[user_id]['timestamps'].append(transaction['timestamp'])

        # 清理旧的时间戳
        self.user_transactions[user_id]['timestamps'] = [
            t for t in self.user_transactions[user_id]['timestamps'] if time.time() - t < 3600
        ]

        return False, ""

# 初始化欺诈检测器
detector = FraudDetector()

# 消费交易数据并进行欺诈检测
for message in consumer:
    transaction = message.value

    # 进行欺诈检测
    is_fraud, reason = detector.detect_fraud(transaction)

    # 如果检测到欺诈,发送警报
    if is_fraud:
        alert = {
            'transaction_id': transaction['transaction_id'],
            'user_id': transaction['user_id'],
            'timestamp': time.time(),
            'reason': reason,
            'transaction_details': transaction
        }

        producer.send('fraud_alerts', alert)
        print(f"欺诈警报: 交易 {transaction['transaction_id']} - {reason}")

这个金融交易监控系统的工作流程:

  1. 交易生成器模拟产生金融交易数据并发送到Kafka
  2. 欺诈检测系统消费交易数据
  3. 应用多个欺诈检测规则分析交易
  4. 如果检测到欺诈,发送警报到专门的主题
  5. 可以配置通知系统消费警报主题,及时通知相关人员

六、kafka-python性能优化与最佳实践

6.1 生产者性能优化

提高生产者吞吐量的关键配置:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    batch_size=32768,  # 增大批处理大小(字节)
    linger_ms=10,  # 增加等待时间,让批次更满
    compression_type='lz4',  # 启用压缩:'gzip', 'snappy', 'lz4' 或 'zstd'
    buffer_memory=33554432,  # 增大缓冲区大小(字节)
    max_in_flight_requests_per_connection=5,  # 允许更多飞行中请求
    acks=1  # 只需要leader确认(牺牲一点可靠性换取更高吞吐量)
)

6.2 消费者性能优化

提高消费者吞吐量的关键配置:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'high_throughput_topic',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    group_id='performance_consumer_group',
    fetch_min_bytes=1048576,  # 每次拉取的最小数据量(字节)
    fetch_max_wait_ms=500,  # 等待数据的最大时间(毫秒)
    max_poll_records=500,  # 每次poll的最大消息数
    max_partition_fetch_bytes=5242880,  # 每个分区每次拉取的最大字节数
    enable_auto_commit=True,  # 启用自动提交以减少开销
    auto_commit_interval_ms=10000  # 增加自动提交间隔
)

6.3 错误处理与重试机制

完善的错误处理与重试机制:

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import time

# 生产者错误处理
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    retries=5,  # 自动重试次数
    retry_backoff_ms=500  # 重试间隔(毫秒)
)

def send_message_with_retry(topic, message, max_retries=3):
    retries = 0
    while retries < max_retries:
        try:
            future = producer.send(topic, message)
            result = future.get(timeout=10)  # 等待发送结果
            return result
        except KafkaError as e:
            print(f"发送失败,尝试重试 ({retries+1}/{max_retries}): {e}")
            retries += 1
            time.sleep(2 ** retries)  # 指数退避
    print(f"发送失败,已达到最大重试次数")
    return None

# 消费者错误处理
consumer = KafkaConsumer(
    'error_handling_topic',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    group_id='error_handling_group',
    enable_auto_commit=False  # 禁用自动提交,手动控制偏移量
)

for message in consumer:
    try:
        # 处理消息
        process_message(message.value)

        # 处理成功后提交偏移量
        consumer.commit()
    except Exception as e:
        print(f"处理消息失败: {e}")

        # 可以选择将失败的消息发送到死信队列
        send_to_dlq(message)

        # 继续处理下一条消息,或者根据情况暂停处理

6.4 监控与指标收集

集成Prometheus和Grafana进行监控:

from kafka import KafkaConsumer
from prometheus_client import start_http_server, Counter, Histogram
import time

# 定义监控指标
kafka_messages_consumed = Counter(
    'kafka_messages_consumed_total', 
    'Total number of Kafka messages consumed',
    ['topic', 'partition']
)

message_processing_time = Histogram(
    'message_processing_seconds', 
    'Time spent processing Kafka messages',
    ['topic']
)

# 启动Prometheus指标服务器
start_http_server(8000)

# 创建Kafka消费者
consumer = KafkaConsumer(
    'monitoring_topic',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092']
)

# 消费消息并记录指标
for message in consumer:
    start_time = time.time()

    # 记录消费的消息数量
    kafka_messages_consumed.labels(
        topic=message.topic,
        partition=message.partition
    ).inc()

    # 处理消息
    process_message(message.value)

    # 记录消息处理时间
    processing_time = time.time() - start_time
    message_processing_time.labels(topic=message.topic).observe(processing_time)

在Grafana中,可以创建以下仪表盘:

  1. 消息吞吐量:每秒处理的消息数量
  2. 消息处理延迟:处理单个消息的平均时间
  3. 错误率:处理失败的消息比例
  4. 消费者滞后:消费者与生产者之间的偏移量差距

七、kafka-python与其他技术栈的集成

7.1 与Flask Web框架集成

以下是一个将kafka-python与Flask集成的示例:

from flask import Flask, request, jsonify
from kafka import KafkaProducer, KafkaConsumer
import json
import threading

app = Flask(__name__)

# 配置Kafka连接
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC_REQUESTS = 'api_requests'
KAFKA_TOPIC_RESPONSES = 'api_responses'

# 创建生产者
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 创建消费者(在单独线程中运行)
def consume_responses():
    consumer = KafkaConsumer(
        KAFKA_TOPIC_RESPONSES,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id='flask_consumer_group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    for message in consumer:
        # 处理响应
        process_response(message.value)

# 启动消费者线程
response_thread = threading.Thread(target=consume_responses)
response_thread.daemon = True
response_thread.start()

# API端点 - 接收请求并发送到Kafka
@app.route('/api/data', methods=['POST'])
def process_data():
    data = request.json

    # 发送数据到Kafka
    producer.send(KAFKA_TOPIC_REQUESTS, data)

    return jsonify({'status': 'success', 'message': 'Request received'})

if __name__ == '__main__':
    app.run(debug=True)

这个集成方案的优势:

  1. 解耦API处理和业务逻辑
  2. 提高API响应速度
  3. 实现异步处理
  4. 便于横向扩展

7.2 与Spark Streaming集成

以下是kafka-python与Spark Streaming集成的示例:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from kafka import KafkaProducer
import json

# 创建Spark上下文
sc = SparkContext("local[2]", "KafkaSparkIntegration")
ssc = StreamingContext(sc, 5)  # 5秒批处理间隔

# 配置Kafka参数
kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "spark_consumer_group",
    "auto.offset.reset": "latest"
}

# 创建Kafka流
kafka_stream = ssc \
    .kafkaUtils \
    .createDirectStream(
        ["input_topic"],
        kafka_params
    )

# 处理流数据
def process_batch(rdd):
    if not rdd.isEmpty():
        # 解析JSON消息
        parsed_rdd = rdd.map(lambda msg: json.loads(msg[1]))

        # 执行转换操作
        transformed_rdd = parsed_rdd \
            .filter(lambda data: data.get('value') > 100) \
            .map(lambda data: (data.get('key'), data.get('value') * 2))

        # 将结果发送回Kafka
        def send_to_kafka(partition):
            producer = KafkaProducer(
                bootstrap_servers=['localhost:9092'],
                value_serializer=lambda v: json.dumps(v).encode('utf-8')
            )

            for record in partition:
                key, value = record
                producer.send('output_topic', {'key': key, 'value': value})

            producer.close()

        transformed_rdd.foreachPartition(send_to_kafka)

# 处理每个批次
kafka_stream.foreachRDD(process_batch)

# 启动流处理
ssc.start()
ssc.awaitTermination()

这个集成方案的工作流程:

  1. Spark Streaming从Kafka的input_topic消费数据
  2. 对数据进行过滤和转换操作
  3. 将处理结果发送回Kafka的output_topic
  4. 可以配置其他系统消费output_topic获取处理后的数据

7.3 与TensorFlow集成

以下是kafka-python与TensorFlow集成的示例:

import tensorflow as tf
from kafka import KafkaConsumer, KafkaProducer
import numpy as np
import json
import threading

# 加载预训练的模型
model = tf.keras.models.load_model('image_classification_model')

# 创建Kafka消费者和生产者
consumer = KafkaConsumer(
    'image_prediction_requests',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 图像处理和预测函数
def process_image(image_data):
    # 假设image_data是图像的base64编码
    # 这里需要解码并预处理图像
    image = preprocess_image(image_data)

    # 模型预测
    predictions = model.predict(np.array([image]))

    # 获取预测结果
    predicted_class = np.argmax(predictions[0])
    confidence = float(predictions[0][predicted_class])

    return {
        'class': int(predicted_class),
        'confidence': confidence
    }

# 消费消息并进行预测
def consume_and_predict():
    for message in consumer:
        request = message.value

        try:
            # 处理图像并获取预测结果
            result = process_image(request['image_data'])

            # 构建响应
            response = {
                'request_id': request['request_id'],
                'timestamp': time.time(),
                'result': result
            }

            # 发送响应到结果主题
            producer.send('image_prediction_results', response)

        except Exception as e:
            print(f"处理请求失败: {e}")

# 启动处理线程
prediction_thread = threading.Thread(target=consume_and_predict)
prediction_thread.daemon = True
prediction_thread.start()

# 保持主线程运行
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("程序被用户中断")
    consumer.close()
    producer.close()

这个集成方案的工作流程:

  1. 客户端将图像数据发送到Kafka的image_prediction_requests主题
  2. TensorFlow服务消费请求主题
  3. 对图像进行预处理和模型预测
  4. 将预测结果发送到image_prediction_results主题
  5. 客户端可以消费结果主题获取预测结果

八、kafka-python的常见问题与解决方案

8.1 连接问题

问题描述:无法连接到Kafka集群

可能原因

  1. Kafka服务器地址配置错误
  2. 网络不通
  3. Kafka服务器未启动
  4. 安全认证配置不正确

解决方案

# 验证连接的简单脚本
from kafka import KafkaAdminClient
from kafka.errors import KafkaError

try:
    admin_client = KafkaAdminClient(
        bootstrap_servers=['localhost:9092'],
        client_id='connection_test'
    )

    # 获取集群元数据
    metadata = admin_client.list_topics()
    print(f"成功连接到Kafka集群,可用主题: {metadata}")

except KafkaError as e:
    print(f"连接失败: {e}")
    # 打印详细的错误信息
    import traceback
    print(traceback.format_exc())

8.2 消息丢失问题

问题描述:发送的消息没有被消费到

可能原因

  1. 消息发送失败但没有处理异常
  2. 生产者配置了acks=0
  3. 消息序列化/反序列化不匹配
  4. 消费者组偏移量管理不当

解决方案

# 可靠的消息发送模式
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',  # 所有副本都确认
    retries=3,
    max_in_flight_requests_per_connection=1  # 确保消息按顺序发送
)

def send_message_safely(topic, key, value):
    try:
        future = producer.send(topic, key=key, value=value)
        result = future.get(timeout=10)  # 等待确认
        print(f"消息发送成功: 主题={result.topic}, 分区={result.partition}, 偏移量={result.offset}")
        return True
    except KafkaError as e:
        print(f"消息发送失败: {e}")
        # 可以添加重试逻辑或记录错误日志
        return False

8.3 消费者滞后问题

问题描述:消费者处理速度跟不上生产者,偏移量差距越来越大

可能原因

  1. 消费者处理逻辑太慢
  2. 消费者数量不足
  3. 主题分区数不足
  4. 网络带宽不足

解决方案

  1. 优化消费者处理逻辑,提高处理速度
  2. 增加消费者实例,扩大消费者组
  3. 增加主题分区数,提高并行度
  4. 监控网络带宽,确保足够的吞吐量
# 监控消费者滞后的脚本
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient

# 获取主题的最新偏移量
admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
topic_partitions = admin_client.list_partitions('my_topic')

# 创建一个只用于获取最新偏移量的消费者
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
partitions = [TopicPartition('my_topic', p) for p in topic_partitions.keys()]

# 获取每个分区的最新偏移量
end_offsets = consumer.end_offsets(partitions)

# 创建实际的消费者
group_consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my_consumer_group',
    enable_auto_commit=False
)

# 分配分区
group_consumer.assign(partitions)

# 查找当前消费者组的位置
group_consumer.seek_to_beginning()  # 先重置到开始位置,以便获取当前位置
current_offsets = {}
for partition in partitions:
    current_offsets[partition] = group_consumer.position(partition)

# 计算滞后量
lags = {}
for partition in partitions:
    lags[partition] = end_offsets[partition] - current_offsets.get(partition, 0)

print("消费者滞后情况:")
for partition, lag in lags.items():
    print(f"分区 {partition.partition}: 滞后 {lag} 条消息")

8.4 序列化/反序列化问题

问题描述:消费者无法正确解析生产者发送的消息

可能原因

  1. 生产者和消费者使用了不同的序列化方式
  2. 消息格式变更,但没有做好版本兼容
  3. 缺少必要的依赖库

解决方案

# 统一的序列化/反序列化工具
import json
import pickle

class Serializer:
    @staticmethod
    def serialize_json(data):
        return json.dumps(data).encode('utf-8')

    @staticmethod
    def deserialize_json(data):
        return json.loads(data.decode('utf-8'))

    @staticmethod
    def serialize_pickle(data):
        return pickle.dumps(data)

    @staticmethod
    def deserialize_pickle(data):
        return pickle.loads(data)

# 生产者使用
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=Serializer.serialize_json
)

# 消费者使用
consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=Serializer.deserialize_json
)

九、kafka-python的资源链接

  • Pypi地址:https://pypi.org/project/kafka-python/
  • Github地址:https://github.com/dpkp/kafka-python
  • 官方文档地址:https://kafka-python.readthedocs.io/en/master/

通过本文的介绍,你已经了解了kafka-python的基本原理、核心功能和实际应用场景。作为Apache Kafka的官方Python客户端,kafka-python为Python开发者提供了强大而灵活的数据管道解决方案。无论是构建实时日志收集系统、电商推荐引擎还是金融交易监控平台,kafka-python都能帮助你高效地处理和传输数据流。

在实际项目中,你可以根据具体需求选择合适的配置参数,并结合其他Python库和框架,构建出更加复杂和强大的实时数据处理系统。通过合理的性能优化和错误处理策略,你可以确保系统的稳定性和可靠性,满足生产环境的严格要求。

关注我,每天分享一个实用的Python自动化工具。