Python实用工具:HappyBase 入门到精通——高效操作HBase数据库指南

一、HappyBase 库核心概述

HappyBase 是一款专为 Python 开发者打造的HBase 数据库交互库,其核心用途是简化 Python 程序与 HBase 分布式数据库的连接、数据读写及管理操作。工作原理上,HappyBase 基于 HBase 的 Thrift 接口实现通信,通过封装复杂的 Thrift 协议调用逻辑,提供简洁直观的 Python 风格 API,让开发者无需深入理解 Thrift 细节即可高效操作 HBase。

该库的优点十分突出:API 设计简洁易懂,贴近 Python 开发者使用习惯;支持连接池管理,能有效提升高并发场景下的连接复用率;兼容 HBase 主流版本,具备良好的通用性。缺点则集中在对 HBase 高级特性(如事务、复杂过滤器)的支持有限,且依赖 Thrift 服务的稳定运行,Thrift 服务的性能瓶颈会直接影响 HappyBase 的操作效率。

HappyBase 的开源协议为 MIT License,这意味着开发者可以自由地用于商业和非商业项目,无需承担开源协议带来的额外约束。

二、HappyBase 安装与环境准备

2.1 前置条件

在安装和使用 HappyBase 之前,必须确保以下环境准备到位:

  1. HBase 集群部署完成:HBase 是分布式数据库,需提前搭建好单节点或集群环境,且保证 HBase 服务正常运行。
  2. Thrift 服务启动:HappyBase 依赖 HBase 的 Thrift 接口,因此需要启动 HBase Thrift 服务。启动命令如下(在 HBase 安装目录的 bin 文件夹下执行):
    bash hbase thrift start
    若需要后台运行 Thrift 服务,可添加 -b 参数指定绑定地址,配合 nohup 命令实现:
    bash nohup hbase thrift start -b 0.0.0.0 > thrift.log 2>&1 &
  3. Python 环境:推荐使用 Python 3.6 及以上版本,确保 pip 包管理工具可用。

2.2 安装 HappyBase

HappyBase 可通过 pip 工具一键安装,这是最简单且推荐的方式。打开命令行终端,执行以下命令:

pip install happybase

若需要安装指定版本的 HappyBase(例如兼容特定 HBase 版本的 1.2.0 版本),可指定版本号:

pip install happybase==1.2.0

安装完成后,可在 Python 环境中执行以下代码验证是否安装成功:

import happybase
print(happybase.__version__)

若终端输出 HappyBase 的版本号(如 1.2.0),则说明安装成功。

三、HappyBase 核心 API 用法与代码实例

HappyBase 的核心操作围绕连接 HBase表操作数据读写三大模块展开,下面结合具体代码实例详细讲解每个模块的使用方法。

3.1 连接 HBase 数据库

连接 HBase 是使用 HappyBase 的第一步,主要通过 happybase.Connection() 方法创建连接对象。该方法支持多个参数,常用参数说明如下:

  • host:HBase Thrift 服务的主机地址,默认值为 localhost
  • port:HBase Thrift 服务的端口号,默认值为 9090
  • timeout:连接超时时间,单位为毫秒,默认无超时限制。
  • autoconnect:是否自动建立连接,默认值为 True

3.1.1 基础连接示例

import happybase

# 创建连接对象
conn = happybase.Connection(
    host='localhost',  # 替换为你的 HBase Thrift 服务地址
    port=9090,
    timeout=10000
)

# 查看当前 HBase 中的所有表名
tables = conn.tables()
print("HBase 中已存在的表:", tables)

# 关闭连接
conn.close()

代码说明

  • 首先导入 happybase 库,然后通过 Connection() 方法指定 HBase Thrift 服务的 hostport,创建连接对象 conn
  • conn.tables() 方法会返回 HBase 中所有表的名称列表,返回结果为字节串格式(如 [b'test_table'])。
  • 操作完成后,需调用 conn.close() 关闭连接,释放资源。

3.1.2 使用连接池管理连接

在高并发场景下,频繁创建和关闭连接会消耗大量系统资源,HappyBase 提供了连接池功能来解决这个问题。通过 happybase.ConnectionPool() 可以创建连接池,实现连接的复用。

import happybase

# 创建连接池,指定池大小为 10
pool = happybase.ConnectionPool(
    size=10,
    host='localhost',
    port=9090
)

# 从连接池中获取连接并执行操作
with pool.connection() as conn:
    tables = conn.tables()
    print("通过连接池获取的表列表:", tables)

代码说明

  • ConnectionPool()size 参数指定连接池的最大连接数。
  • 使用 with 语句从连接池中获取连接,with 代码块执行完毕后会自动将连接归还到池中,无需手动关闭。

3.2 表的创建、删除与列表查询

HBase 中的表是数据存储的核心载体,HappyBase 提供了完整的表生命周期管理 API,包括创建表、删除表、检查表是否存在等操作。

3.2.1 创建表

创建表需要指定表名列族,列族是 HBase 中数据组织的基本单位,一个表可以包含多个列族。创建表的方法是 conn.create_table(),参数说明如下:

  • name:表名,字符串格式。
  • families:列族配置字典,键为列族名称,值为列族的属性配置(如版本数、过期时间等)。
import happybase

# 建立连接
conn = happybase.Connection(host='localhost', port=9090)

# 定义列族配置:创建两个列族 info 和 data,版本数均为 3
column_families = {
    'info': dict(max_versions=3),
    'data': dict(max_versions=3, time_to_live=86400)  # time_to_live 单位为秒,此处为 1 天
}

# 创建表,表名为 student
table_name = 'student'
if table_name.encode() not in conn.tables():
    conn.create_table(table_name, column_families)
    print(f"表 {table_name} 创建成功!")
else:
    print(f"表 {table_name} 已存在!")

# 关闭连接
conn.close()

代码说明

  • 首先定义列族配置字典 column_families,其中 info 列族的最大版本数为 3,data 列族的最大版本数为 3,且数据过期时间为 1 天。
  • 由于 conn.tables() 返回的表名是字节串格式,因此需要将表名字符串 student 转换为字节串(table_name.encode())后再进行判断,避免重复创建表。

3.2.2 删除表

删除表前需要先禁用表(HBase 的强制要求),然后再执行删除操作。对应的方法分别是 conn.disable_table()conn.delete_table()

import happybase

conn = happybase.Connection(host='localhost', port=9090)
table_name = 'student'

if table_name.encode() in conn.tables():
    # 禁用表
    conn.disable_table(table_name)
    print(f"表 {table_name} 已禁用")
    # 删除表
    conn.delete_table(table_name)
    print(f"表 {table_name} 删除成功!")
else:
    print(f"表 {table_name} 不存在!")

conn.close()

代码说明

  • 禁用表是删除表的前提步骤,如果直接删除未禁用的表,会抛出 TApplicationException 异常。
  • 执行完删除操作后,该表及其所有数据会被彻底清除,操作需谨慎。

3.2.3 检查表是否存在

除了通过 conn.tables() 列表判断表是否存在外,HappyBase 还提供了更简洁的 conn.table_exists() 方法(部分版本支持),使用示例如下:

import happybase

conn = happybase.Connection(host='localhost', port=9090)
table_name = 'student'

# 检查表是否存在
if conn.table_exists(table_name):
    print(f"表 {table_name} 存在")
else:
    print(f"表 {table_name} 不存在")

conn.close()

3.3 数据的增删改查操作

表创建完成后,核心操作就是对表中数据的增删改查。HappyBase 通过 Table 对象来操作表数据,获取 Table 对象的方法是 conn.table(table_name)

3.3.1 插入数据(Put 操作)

插入数据使用 Table.put() 方法,支持插入单行数据和多行数据。数据以字典格式组织,键为列名(格式为 列族:列名),值为字段值,所有键值均为字节串格式。

单行数据插入
import happybase

conn = happybase.Connection(host='localhost', port=9090)
# 获取 student 表的 Table 对象
table = conn.table('student')

# 定义行键:HBase 中每行数据的唯一标识
row_key = '001'
# 定义要插入的数据
data = {
    b'info:name': b'Zhang San',
    b'info:age': b'20',
    b'data:score': b'95'
}

# 插入数据
table.put(row_key, data)
print(f"行键 {row_key} 的数据插入成功!")

conn.close()

代码说明

  • 行键 row_key 是 HBase 表中每行数据的唯一标识,字符串格式即可。
  • 数据字典 data 的键必须是字节串格式,格式为 列族:列名,值也必须是字节串格式。若要插入字符串数据,需使用 encode() 方法转换为字节串。
多行数据批量插入

批量插入数据可以提升操作效率,HappyBase 支持通过 Table.put() 方法传入多行数据列表实现批量插入。

import happybase

conn = happybase.Connection(host='localhost', port=9090)
table = conn.table('student')

# 定义多行数据,每个元素为一个元组 (row_key, data_dict)
batch_data = [
    ('002', {b'info:name': b'Li Si', b'info:age': b'21', b'data:score': b'92'}),
    ('003', {b'info:name': b'Wang Wu', b'info:age': b'19', b'data:score': b'88'}),
    ('004', {b'info:name': b'Zhao Liu', b'info:age': b'22', b'data:score': b'90'})
]

# 批量插入数据
for row_key, data in batch_data:
    table.put(row_key, data)
print("多行数据批量插入成功!")

conn.close()

代码说明

  • 批量插入本质是循环调用单行插入方法,适用于中小规模的数据插入。若需要插入超大规模数据,可结合 HBase 的批量加载工具(如 BulkLoad)实现。

3.3.2 查询数据(Get 操作)

查询数据支持单行查询多行扫描,分别对应 Table.row()Table.scan() 方法。

单行数据查询

使用 Table.row() 方法可以查询指定行键的完整数据或指定列族/列的数据。

import happybase

conn = happybase.Connection(host='localhost', port=9090)
table = conn.table('student')

# 查询行键为 001 的完整数据
row_key = '001'
row_data = table.row(row_key)
print(f"行键 {row_key} 的完整数据:")
for column, value in row_data.items():
    print(f"  {column.decode()}: {value.decode()}")

# 只查询 info 列族的数据
info_data = table.row(row_key, columns=[b'info'])
print(f"\n行键 {row_key} 的 info 列族数据:")
for column, value in info_data.items():
    print(f"  {column.decode()}: {value.decode()}")

# 只查询 info:name 和 data:score 列的数据
specific_data = table.row(row_key, columns=[b'info:name', b'data:score'])
print(f"\n行键 {row_key} 的指定列数据:")
for column, value in specific_data.items():
    print(f"  {column.decode()}: {value.decode()}")

conn.close()

代码说明

  • Table.row() 方法的 columns 参数用于指定要查询的列族或列,传入字节串列表即可。
  • 返回的 row_data 是一个字典,键为列名字节串,值为字段值字节串,需使用 decode() 方法转换为字符串格式。
多行数据扫描

使用 Table.scan() 方法可以扫描表中的多行数据,支持设置行键范围、列族/列过滤、数据版本等参数。

import happybase

conn = happybase.Connection(host='localhost', port=9090)
table = conn.table('student')

# 扫描所有数据
print("扫描表中所有数据:")
for row_key, data in table.scan():
    print(f"行键:{row_key.decode()}")
    for column, value in data.items():
        print(f"  {column.decode()}: {value.decode()}")
    print("-" * 20)

# 扫描行键范围在 002 到 003 之间的数据
print("\n扫描行键 002-003 之间的数据:")
for row_key, data in table.scan(row_start=b'002', row_stop=b'004'):
    print(f"行键:{row_key.decode()}")
    for column, value in data.items():
        print(f"  {column.decode()}: {value.decode()}")
    print("-" * 20)

# 扫描 info 列族且 age 大于 20 的数据(需结合过滤器,此处为简化示例)
print("\n扫描 info 列族且 age 大于 20 的数据:")
for row_key, data in table.scan(columns=[b'info']):
    age = data.get(b'info:age', b'0').decode()
    if int(age) > 20:
        print(f"行键:{row_key.decode()},年龄:{age}")

conn.close()

代码说明

  • row_startrow_stop 参数用于指定行键的扫描范围,遵循左闭右开原则(即包含 row_start,不包含 row_stop)。
  • HappyBase 对 HBase 的高级过滤器支持有限,若需要复杂的条件过滤(如列值比较、正则匹配),需结合 happybase.Filter 类或直接使用 Thrift 接口定义过滤器。

3.3.3 更新数据

HBase 中更新数据的逻辑与插入数据一致,使用 Table.put() 方法即可。当插入的行键和列名已存在时,会自动覆盖原有数据,同时生成新的版本(根据列族的 max_versions 配置)。

import happybase

conn = happybase.Connection(host='localhost', port=9090)
table = conn.table('student')

row_key = '001'
# 更新 age 和 score 字段
update_data = {
    b'info:age': b'21',
    b'data:score': b'96'
}

table.put(row_key, update_data)
print(f"行键 {row_key} 的数据更新成功!")

# 查询更新后的数据
row_data = table.row(row_key, columns=[b'info:age', b'data:score'])
print(f"更新后的数据:")
for column, value in row_data.items():
    print(f"  {column.decode()}: {value.decode()}")

conn.close()

代码说明

  • HBase 是版本化数据库,每次更新都会生成新的数据版本,旧版本数据不会立即删除,可通过指定版本号查询历史数据。例如,使用 table.row(row_key, versions=2) 可以获取最近 2 个版本的数据。

3.3.4 删除数据

删除数据使用 Table.delete() 方法,支持删除指定行的全部数据或指定列族/列的数据。

import happybase

conn = happybase.Connection(host='localhost', port=9090)
table = conn.table('student')

row_key = '004'
# 删除指定行的全部数据
table.delete(row_key)
print(f"行键 {row_key} 的全部数据已删除!")

# 删除指定行的指定列数据
row_key = '003'
table.delete(row_key, columns=[b'data:score'])
print(f"行键 {row_key} 的 data:score 列数据已删除!")

# 查询删除后的数据
row_data = table.row(row_key)
print(f"行键 {row_key} 删除后的剩余数据:")
for column, value in row_data.items():
    print(f"  {column.decode()}: {value.decode()}")

conn.close()

代码说明

  • 删除指定列数据时,需通过 columns 参数指定要删除的列名,格式为字节串列表。
  • 删除操作会生成新的版本数据,标记为删除状态,HBase 会在后续的 Major Compaction 过程中彻底清理这些数据。

四、HappyBase 实战案例:学生成绩管理系统

为了更好地理解 HappyBase 在实际项目中的应用,下面构建一个简单的学生成绩管理系统,实现学生信息的新增、查询、更新和删除功能。

4.1 系统功能需求

  1. 新增学生信息(姓名、年龄、班级、成绩)。
  2. 根据学号查询学生完整信息。
  3. 根据班级扫描学生列表。
  4. 更新学生的成绩信息。
  5. 删除指定学号的学生信息。

4.2 完整代码实现

import happybase

class StudentScoreManager:
    def __init__(self, host='localhost', port=9090, table_name='student_score'):
        """初始化连接和表对象"""
        self.host = host
        self.port = port
        self.table_name = table_name
        self.conn = None
        self.table = None
        self._connect()
        self._create_table()

    def _connect(self):
        """建立 HBase 连接"""
        self.conn = happybase.Connection(host=self.host, port=self.port)

    def _create_table(self):
        """创建学生成绩表,列族为 info(基本信息)和 score(成绩信息)"""
        column_families = {
            'info': dict(max_versions=3),
            'score': dict(max_versions=3)
        }
        if self.table_name.encode() not in self.conn.tables():
            self.conn.create_table(self.table_name, column_families)
            print(f"表 {self.table_name} 创建成功!")
        self.table = self.conn.table(self.table_name)

    def add_student(self, student_id, name, age, class_name, math, english, chinese):
        """新增学生信息"""
        data = {
            b'info:name': name.encode(),
            b'info:age': str(age).encode(),
            b'info:class': class_name.encode(),
            b'score:math': str(math).encode(),
            b'score:english': str(english).encode(),
            b'score:chinese': str(chinese).encode()
        }
        self.table.put(student_id, data)
        print(f"学生 {student_id} - {name} 信息新增成功!")

    def query_student(self, student_id):
        """根据学号查询学生信息"""
        row_data = self.table.row(student_id)
        if not row_data:
            print(f"未找到学号为 {student_id} 的学生信息!")
            return None
        student_info = {
            'student_id': student_id,
            'name': row_data[b'info:name'].decode(),
            'age': int(row_data[b'info:age'].decode()),
            'class': row_data[b'info:class'].decode(),
            'math': int(row_data[b'score:math'].decode()),
            'english': int(row_data[b'score:english'].decode()),
            'chinese': int(row_data[b'score:chinese'].decode())
        }
        return student_info

    def scan_class_students(self, class_name):
        """根据班级扫描学生列表"""
        students = []
        for row_key, data in self.table.scan(columns=[b'info', b'score']):
            if b'info:class' in data and data[b'info:class'].decode() == class_name:
                student_info = {
                    'student_id': row_key.decode(),
                    'name': data[b'info:name'].decode(),
                    'age': int(data[b'info:age'].decode()),
                    'math': int(data[b'score:math'].decode()),
                    'english': int(data[b'score:english'].decode()),
                    'chinese': int(data[b'score:chinese'].decode())
                }
                students.append(student_info)
        return students

    def update_score(self, student_id, subject, new_score):
        """更新学生指定科目的成绩"""
        column = f'score:{subject}'.encode()
        if not self.table.row(student_id):
            print(f"未找到学号为 {student_id} 的学生信息!")
            return
        self.table.put(student_id, {column: str(new_score).encode()})
        print(f"学生 {student_id} 的 {subject} 成绩更新为 {new_score}!")

    def delete_student(self, student_id):
        """删除指定学号的学生信息"""
        if not self.table.row(student_id):
            print(f"未找到学号为 {student_id} 的学生信息!")
            return
        self.table.delete(student_id)
        print(f"学生 {student_id} 的信息已删除!")

    def close(self):
        """关闭连接"""
        self.conn.close()
        print("连接已关闭!")

# 测试学生成绩管理系统
if __name__ == '__main__':
    manager = StudentScoreManager()

    # 1. 新增学生信息
    manager.add_student('2024001', 'Zhang San', 18, 'Class 1 Grade 3', 95, 92, 88)
    manager.add_student('2024002', 'Li Si', 17, 'Class 1 Grade 3', 90, 85, 93)
    manager.add_student('2024003', 'Wang Wu', 18, 'Class 2 Grade 3', 88, 91, 90)

    # 2. 查询单个学生信息
    print("\n查询学号 2024001 的学生信息:")
    student = manager.query_student('2024001')
    if student:
        for key, value in student.items():
            print(f"  {key}: {value}")

    # 3. 扫描指定班级的学生列表
    print("\n扫描 Class 1 Grade 3 的学生列表:")
    class_students = manager.scan_class_students('Class 1 Grade 3')
    for stu in class_students:
        print(f"  学号:{stu['student_id']},姓名:{stu['name']},数学成绩:{stu['math']}")

    # 4. 更新学生成绩
    print("\n更新学号 2024001 的数学成绩:")
    manager.update_score('2024001', 'math', 98)
    student = manager.query_student('2024001')
    print(f"  更新后数学成绩:{student['math']}")

    # 5. 删除学生信息
    print("\n删除学号 2024003 的学生信息:")
    manager.delete_student('2024003')
    manager.query_student('2024003')

    # 关闭连接
    manager.close()

4.3 代码说明与运行结果

  1. 类结构设计StudentScoreManager 类封装了所有核心功能,通过 __init__ 方法完成连接初始化和表创建,_connect_create_table 为内部辅助方法,对外提供 add_studentquery_student 等业务方法。
  2. 数据存储设计:使用 info 列族存储学生基本信息(姓名、年龄、班级),score 列族存储各科成绩,符合 HBase 列族的设计原则。
  3. 运行结果:执行代码后,会依次完成学生信息新增、查询、班级扫描、成绩更新和删除操作,终端输出对应的操作结果,验证了 HappyBase 在实际项目中的可行性。

五、HappyBase 相关资源链接

  • PyPI 地址:https://pypi.org/project/HappyBase
  • Github 地址:https://github.com/wbolster/happybase
  • 官方文档地址:https://happybase.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python Prisma库完全指南:现代ORM的高效数据操作实战

一、Prisma库核心概述

1.1 用途与工作原理

Prisma是一款为Python开发者设计的现代ORM(对象关系映射)工具,核心作用是简化Python应用与关系型数据库的交互流程,支持PostgreSQL、MySQL、SQLite等主流数据库。其工作原理基于数据模型驱动:开发者通过定义简洁的Schema文件描述数据结构,Prisma引擎会自动将Schema转换为对应的数据库表结构,同时生成类型安全的Python客户端,让开发者无需编写复杂的SQL语句,直接通过面向对象的方式完成数据的增删改查操作。

1.2 优缺点分析

优点

  • 类型安全:自动生成的客户端包含完整的类型提示,结合Python的类型检查工具(如mypy)可在编码阶段发现数据类型错误,大幅降低运行时异常概率。
  • Schema即文档:Schema文件采用直观的语法,兼具数据结构定义与文档功能,团队协作时可直接通过Schema了解数据模型。
  • 迁移管理便捷:内置的迁移工具支持数据库结构的版本控制,可轻松实现数据库表的创建、修改、删除,且能追踪迁移历史。
  • 查询能力强大:支持链式查询、关联查询、批量操作等复杂场景,查询语法简洁易懂,比传统ORM更贴近自然语言。

缺点

  • 生态成熟度待提升:相较于Django ORM、SQLAlchemy等老牌ORM,Python版Prisma的第三方插件和扩展较少。
  • 学习曲线:对于习惯原生SQL或传统ORM的开发者,需要适应Prisma独特的Schema定义和查询风格。
  • 性能损耗:在超高性能要求的场景下,ORM的封装会带来轻微的性能开销,极端场景下可能需要结合原生SQL优化。

1.3 License类型

Python Prisma库采用Apache License 2.0开源协议,该协议允许商业使用、修改和分发,只需保留原作者的版权声明,对开发者友好且无商业使用限制。

二、Prisma库安装与环境配置

2.1 安装前置条件

在安装Prisma之前,需确保本地环境满足以下要求:

  • Python版本≥3.8(推荐3.9及以上)
  • 已安装对应数据库的客户端工具(如PostgreSQL需安装psycopg2,MySQL需安装mysqlclient)
  • 网络环境正常,可访问PyPI仓库

2.2 安装命令

Prisma的安装分为两个步骤:首先安装Python包,然后初始化Prisma引擎。

  1. 安装Python包
    打开命令行终端,执行以下pip命令安装Prisma:
    bash pip install prisma
  2. 初始化Prisma引擎 安装完成后,需要初始化Prisma的二进制引擎,执行以下命令: bash prisma init 执行该命令后,会在当前目录生成两个关键文件:
    • schema.prisma:用于定义数据模型和数据库连接配置的核心文件。
    • .env:用于存储数据库连接字符串等环境变量。

2.3 数据库连接配置

以SQLite数据库为例(无需额外安装服务,适合快速开发),修改schema.prisma文件中的datasource块:

datasource db {
  provider = "sqlite"
  url      = env("DATABASE_URL")
}

generator client {
  provider = "prisma-client-py"
}

然后修改.env文件,设置数据库连接URL:

DATABASE_URL="file:./dev.db"

若使用MySQL数据库,修改datasource块和.env文件如下:

datasource db {
  provider = "mysql"
  url      = env("DATABASE_URL")
}
DATABASE_URL="mysql://user:password@localhost:3306/mydatabase"

其中user为数据库用户名,password为密码,mydatabase为数据库名称。

三、Prisma核心使用教程

3.1 数据模型定义

Prisma的核心是schema.prisma文件中的数据模型定义,模型对应数据库中的表,模型中的字段对应表中的列。下面以一个User用户模型和Post文章模型为例,展示如何定义关联模型。

// User模型:对应数据库中的users表
model User {
  id        Int      @id @default(autoincrement()) // 主键,自增整数
  name      String   // 用户名,字符串类型
  email     String   @unique // 邮箱,唯一约束
  age       Int?     // 年龄,可选整数(允许为空)
  posts     Post[]   // 一对多关联:一个用户可以有多篇文章
  createdAt DateTime @default(now()) // 创建时间,默认当前时间
}

// Post模型:对应数据库中的posts表
model Post {
  id        Int      @id @default(autoincrement())
  title     String   // 文章标题
  content   String?  // 文章内容,可选
  authorId  Int      // 外键,关联User模型的id
  author    User     @relation(fields: [authorId], references: [id]) // 多对一关联
  published Boolean  @default(false) // 是否发布,默认false
  createdAt DateTime @default(now())
}

字段属性说明

  • @id:标记该字段为主键。
  • @default(autoincrement()):设置字段默认值为自增。
  • @unique:添加唯一约束,确保字段值不重复。
  • ?:标记字段为可选,允许存储NULL值。
  • @relation:定义模型之间的关联关系。

3.2 生成数据库表与客户端

定义好Schema后,需要执行迁移命令生成对应的数据库表结构,同时生成Python客户端代码。

  1. 创建迁移文件
    执行以下命令,Prisma会根据Schema的变化生成迁移文件:
    bash prisma migrate dev --name init
    --name init表示给本次迁移命名为init,执行成功后,会在prisma/migrations目录下生成迁移历史文件,同时自动在数据库中创建UserPost表。
  2. 生成Python客户端
    迁移完成后,Prisma会自动生成类型安全的Python客户端,无需手动编写。客户端文件默认生成在prisma目录下,可直接在Python代码中导入使用。

3.3 基础数据操作(CRUD)

Prisma客户端提供了简洁的API实现数据的增删改查,下面通过具体的Python脚本演示每个操作的使用方法。

3.3.1 连接数据库并初始化客户端

在Python脚本中,首先需要导入并初始化Prisma客户端,建立与数据库的连接:

# 导入Prisma客户端
from prisma import Prisma

# 初始化客户端
db = Prisma()

# 连接数据库
async def connect_db():
    await db.connect()

# 关闭数据库连接
async def disconnect_db():
    await db.disconnect()

由于Prisma的Python客户端基于异步IO设计,所有数据库操作都需要在异步函数中执行。

3.3.2 创建数据(Create)

使用create方法向数据库中插入单条数据,使用create_many方法批量插入多条数据。

单条数据插入

async def create_user():
    # 连接数据库
    await connect_db()
    # 创建用户
    user = await db.user.create(
        data={
            'name': '张三',
            'email': '[email protected]',
            'age': 25
        }
    )
    # 打印创建的用户信息
    print(f'创建用户成功:{user.id} - {user.name} - {user.email}')
    # 关闭连接
    await disconnect_db()

# 执行异步函数
import asyncio
asyncio.run(create_user())

执行上述代码后,会在User表中插入一条用户数据,user对象包含了数据库返回的完整用户信息,包括自动生成的idcreatedAt字段。

批量数据插入

async def batch_create_users():
    await connect_db()
    # 批量创建3个用户
    result = await db.user.create_many(
        data=[
            {'name': '李四', 'email': '[email protected]', 'age': 22},
            {'name': '王五', 'email': '[email protected]', 'age': 28},
            {'name': '赵六', 'email': '[email protected]'}
        ]
    )
    # result包含创建的记录数
    print(f'批量创建用户成功,共创建 {result.count} 条记录')
    await disconnect_db()

asyncio.run(batch_create_users())

create_many方法的返回值是一个包含count属性的对象,表示成功插入的记录数量。

3.3.3 查询数据(Read)

Prisma提供了丰富的查询方法,包括find_uniquefind_firstfind_many等,支持条件过滤、排序、分页和关联查询。

查询单条数据
使用find_unique方法根据唯一约束字段查询单条数据,例如根据邮箱查询用户:

async def find_user_by_email(email: str):
    await connect_db()
    # 根据邮箱查询用户(email字段有@unique约束)
    user = await db.user.find_unique(
        where={
            'email': email
        }
    )
    if user:
        print(f'查询到用户:{user.name} - {user.age}')
    else:
        print('未查询到该用户')
    await disconnect_db()

asyncio.run(find_user_by_email('[email protected]'))

使用find_first方法查询满足条件的第一条数据(无需唯一约束):

async def find_first_user():
    await connect_db()
    # 查询年龄大于20的第一个用户
    user = await db.user.find_first(
        where={
            'age': {
                'gt': 20
            }
        }
    )
    print(f'查询到用户:{user.name} - {user.age}')
    await disconnect_db()

asyncio.run(find_first_user())

其中gt表示“大于”,Prisma支持的查询操作符还包括lt(小于)、gte(大于等于)、lte(小于等于)、contains(包含)等。

查询多条数据
使用find_many方法查询满足条件的所有数据,支持排序、分页和字段筛选:

async def find_all_users():
    await connect_db()
    # 查询所有用户,按创建时间降序排序,只返回name和email字段
    users = await db.user.find_many(
        select={
            'name': True,
            'email': True
        },
        order={
            'createdAt': 'desc'
        },
        # 分页:跳过前1条,取2条
        skip=1,
        take=2
    )
    # 遍历打印用户信息
    for user in users:
        print(f'用户名:{user.name},邮箱:{user.email}')
    await disconnect_db()

asyncio.run(find_all_users())

select参数用于指定返回的字段,order参数用于排序,skiptake参数用于实现分页功能。

关联查询
查询用户的同时,获取该用户发布的所有文章,使用include参数实现关联数据的加载:

async def find_user_with_posts(user_id: int):
    await connect_db()
    # 查询用户及其所有文章
    user = await db.user.find_unique(
        where={
            'id': user_id
        },
        include={
            'posts': True
        }
    )
    if user:
        print(f'用户:{user.name},发布的文章数:{len(user.posts)}')
        for post in user.posts:
            print(f'文章标题:{post.title},状态:{"已发布" if post.published else "未发布"}')
    await disconnect_db()

# 假设用户id为1
asyncio.run(find_user_with_posts(1))

上述代码中,通过include={'posts': True},Prisma会自动查询该用户关联的所有Post数据,并封装到user.posts属性中。

3.3.4 更新数据(Update)

使用update方法更新单条数据,使用update_many方法批量更新多条数据。

单条数据更新

async def update_user_age(user_id: int, new_age: int):
    await connect_db()
    # 更新用户年龄
    updated_user = await db.user.update(
        where={
            'id': user_id
        },
        data={
            'age': new_age
        }
    )
    print(f'用户更新成功,新年龄:{updated_user.age}')
    await disconnect_db()

asyncio.run(update_user_age(1, 26))

update方法的where参数指定更新条件,data参数指定要更新的字段和值,返回值为更新后的完整数据对象。

批量数据更新

async def batch_update_posts():
    await connect_db()
    # 将所有未发布的文章标记为已发布
    result = await db.post.update_many(
        where={
            'published': False
        },
        data={
            'published': True
        }
    )
    print(f'批量更新成功,共更新 {result.count} 篇文章')
    await disconnect_db()

asyncio.run(batch_update_posts())

3.3.5 删除数据(Delete)

使用delete方法删除单条数据,使用delete_many方法批量删除多条数据。

单条数据删除

async def delete_user(user_id: int):
    await connect_db()
    # 删除指定用户
    deleted_user = await db.user.delete(
        where={
            'id': user_id
        }
    )
    print(f'删除用户成功:{deleted_user.name}')
    await disconnect_db()

asyncio.run(delete_user(4))

批量数据删除

async def delete_unpublished_posts():
    await connect_db()
    # 删除所有未发布的文章
    result = await db.post.delete_many(
        where={
            'published': False
        }
    )
    print(f'批量删除成功,共删除 {result.count} 篇文章')
    await disconnect_db()

asyncio.run(delete_unpublished_posts())

3.4 事务处理

事务是数据库操作的重要特性,用于保证多个操作的原子性(要么全部成功,要么全部失败)。Prisma客户端提供transaction方法实现事务处理。

例如,创建用户的同时,为该用户创建一篇文章,两个操作在同一个事务中执行:

async def create_user_and_post():
    await connect_db()
    try:
        # 开启事务
        async with db.transaction():
            # 第一步:创建用户
            user = await db.user.create(
                data={
                    'name': '钱七',
                    'email': '[email protected]',
                    'age': 30
                }
            )
            print(f'事务中创建用户:{user.name}')
            # 第二步:为该用户创建文章
            post = await db.post.create(
                data={
                    'title': 'Prisma事务教程',
                    'content': 'Prisma的事务处理非常简单',
                    'authorId': user.id
                }
            )
            print(f'事务中创建文章:{post.title}')
        # 事务提交成功
        print('用户和文章创建成功,事务已提交')
    except Exception as e:
        # 事务回滚
        print(f'操作失败,事务已回滚,错误信息:{e}')
    finally:
        await disconnect_db()

asyncio.run(create_user_and_post())

在上述代码中,async with db.transaction()上下文管理器会自动管理事务的提交和回滚:如果上下文内的所有操作都成功执行,事务会自动提交;如果任何一个操作抛出异常,事务会自动回滚,确保数据一致性。

四、实际项目案例:简易博客系统

4.1 项目需求

构建一个简易的博客系统,实现以下功能:

  1. 用户注册和登录(简化版,不涉及密码加密)
  2. 创建、查询、发布博客文章
  3. 查询指定用户的所有文章

4.2 项目目录结构

simple_blog/
├── .env               # 环境变量配置
├── schema.prisma      # Prisma数据模型定义
└── blog.py            # 业务逻辑代码

4.3 数据模型定义

schema.prisma文件的内容与第三部分的模型定义一致,包含UserPost两个关联模型。

4.4 业务逻辑实现

blog.py文件中实现具体的业务功能,代码如下:

from prisma import Prisma
import asyncio

# 初始化Prisma客户端
db = Prisma()

# 数据库连接与关闭工具函数
async def get_db():
    await db.connect()
    try:
        yield db
    finally:
        await db.disconnect()

# 1. 用户注册功能
async def register_user(name: str, email: str, age: int = None):
    async for db in get_db():
        # 检查邮箱是否已存在
        existing_user = await db.user.find_unique(where={'email': email})
        if existing_user:
            print(f'邮箱 {email} 已被注册')
            return None
        # 创建新用户
        user = await db.user.create(
            data={
                'name': name,
                'email': email,
                'age': age
            }
        )
        print(f'用户 {name} 注册成功,用户ID:{user.id}')
        return user

# 2. 创建博客文章
async def create_article(title: str, content: str, author_id: int):
    async for db in get_db():
        # 检查作者是否存在
        author = await db.user.find_unique(where={'id': author_id})
        if not author:
            print(f'作者ID {author_id} 不存在')
            return None
        # 创建文章
        post = await db.post.create(
            data={
                'title': title,
                'content': content,
                'authorId': author_id
            }
        )
        print(f'文章 {title} 创建成功,文章ID:{post.id}')
        return post

# 3. 发布博客文章
async def publish_article(post_id: int):
    async for db in get_db():
        post = await db.post.update(
            where={'id': post_id},
            data={'published': True}
        )
        print(f'文章 {post.title} 已发布')
        return post

# 4. 查询用户的所有已发布文章
async def get_user_published_posts(author_id: int):
    async for db in get_db():
        user = await db.user.find_unique(
            where={'id': author_id},
            include={
                'posts': {
                    'where': {'published': True},
                    'order': {'createdAt': 'desc'}
                }
            }
        )
        if not user:
            print(f'作者ID {author_id} 不存在')
            return []
        print(f'用户 {user.name} 的已发布文章:')
        for post in user.posts:
            print(f'- {post.title} | 创建时间:{post.createdAt}')
        return user.posts

# 主函数:执行案例
async def main():
    # 注册新用户
    user = await register_user('小明', '[email protected]', 23)
    if not user:
        return
    # 为用户创建文章
    post = await create_article('Prisma实战教程', '本文介绍了Prisma的核心用法', user.id)
    if not post:
        return
    # 发布文章
    await publish_article(post.id)
    # 查询用户已发布的文章
    await get_user_published_posts(user.id)

# 运行主函数
if __name__ == '__main__':
    asyncio.run(main())

4.5 运行结果

执行blog.py文件,控制台输出如下:

用户 小明 注册成功,用户ID:5
文章 Prisma实战教程 创建成功,文章ID:3
文章 Prisma实战教程 已发布
用户 小明 的已发布文章:
- Prisma实战教程 | 创建时间:2024-05-20 15:30:25

五、Prisma相关资源

5.1 PyPI地址

https://pypi.org/project/prisma

5.2 Github地址

https://github.com/prisma/prisma-client-py

5.3 官方文档地址

https://prisma-client-py.readthedocs.io

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:pysolr 从入门到精通——高效操作Solr搜索引擎的指南

一、pysolr 库概述

1.1 用途

pysolr 是一个专门用于和 Apache Solr 搜索引擎进行交互的 Python 客户端库,它能够让开发者通过简洁的 Python 代码,轻松实现对 Solr 索引的创建、数据的添加、删除、更新以及复杂的查询操作。无论是构建企业级的全文检索系统,还是实现数据分析场景下的快速数据筛选,pysolr 都能提供稳定且高效的支持。

1.2 工作原理

pysolr 底层基于 HTTP/HTTPS 协议与 Solr 服务器进行通信,它将 Python 代码中的操作指令(如查询语句、数据提交指令)封装成符合 Solr API 规范的 HTTP 请求,发送到 Solr 服务器的指定接口(如 /solr/core_name/update 用于数据更新,/solr/core_name/select 用于数据查询),然后接收 Solr 服务器返回的 JSON 格式响应,并将其解析为 Python 中的字典、列表等数据结构,方便开发者直接处理。

1.3 优缺点

优点

  • 接口简洁易用,极大降低了 Python 开发者操作 Solr 的门槛,无需手动构造复杂的 HTTP 请求。
  • 支持 Solr 的大部分核心功能,包括全文检索、过滤查询、排序、分组统计、高亮显示等。
  • 兼容性良好,能够适配不同版本的 Apache Solr,且支持 Python 3.6 及以上的主流 Python 版本。

缺点

  • 功能覆盖相较于 Solr 的原生 API 存在少量缺失,部分高级特性(如自定义请求处理器的复杂配置)需要手动扩展 HTTP 请求参数。
  • 对大规模数据批量操作的性能优化需要开发者自行调整参数(如批量提交的大小),默认配置下的大批量数据插入效率有待提升。

1.4 License 类型

pysolr 采用的是 BSD 3-Clause 许可证,这是一个宽松的开源许可证,允许开发者自由地使用、修改、分发该库的代码,无论是用于商业项目还是开源项目,都几乎没有限制,只需要保留原作者的版权声明即可。

二、pysolr 安装与环境准备

2.1 安装 pysolr

安装 pysolr 非常简单,推荐使用 Python 的包管理工具 pip 进行安装,在命令行中执行以下命令即可完成安装:

pip install pysolr

该命令会自动从 PyPI 下载并安装最新版本的 pysolr 库及其依赖项(主要依赖 requests 库用于 HTTP 通信)。

2.2 环境依赖确认

  • Python 版本:确保你的 Python 环境版本为 3.6 及以上,可以通过 python --version 命令查看当前 Python 版本。
  • Solr 服务器环境:pysolr 是操作 Solr 的客户端,因此需要先搭建好 Solr 服务器环境。你可以从 Apache Solr 官方网站(https://solr.apache.org/)下载对应版本的 Solr 安装包,按照官方文档完成安装和启动,并创建至少一个 Solr Core(Solr 的核心索引单元)用于后续操作。
  • 网络连通性:确保运行 pysolr 代码的机器能够和 Solr 服务器所在的机器互通网络,Solr 默认的 HTTP 端口为 8983,需要保证该端口未被防火墙拦截。

三、pysolr 核心使用方法与代码示例

3.1 连接 Solr 服务器

在使用 pysolr 进行任何操作之前,首先需要创建一个 Solr 客户端实例,建立与 Solr 服务器的连接。核心代码如下:

import pysolr

# 定义 Solr 服务器的基础 URL 和 Core 名称
# 格式为:http://solr_host:solr_port/solr/core_name
SOLR_URL = "http://localhost:8983/solr/gettingstarted"

# 创建 Solr 客户端实例
solr = pysolr.Solr(SOLR_URL, timeout=10)

print("成功连接到 Solr 服务器!")

代码说明

  • pysolr.Solr() 是创建客户端实例的构造函数,第一个参数是 Solr Core 的完整 URL,其中 localhost 是 Solr 服务器的主机名,8983 是默认端口,gettingstarted 是 Solr Core 的名称(需要替换为你自己创建的 Core 名称)。
  • timeout 参数设置了 HTTP 请求的超时时间(单位为秒),避免因网络问题导致程序长时间阻塞。

3.2 向 Solr 中添加数据

Solr 存储的数据是以文档(Document)为单位的,每个文档是一个键值对的集合,对应 Solr Schema 中定义的字段。我们可以通过 add() 方法向 Solr 中添加单个或多个文档。

3.2.1 添加单个文档

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 定义一个 Solr 文档,字段需要和 Solr Schema 中的定义一致
document = {
    "id": "book_001",  # id 字段是 Solr 的默认唯一标识字段,必填
    "title": "Python编程:从入门到实践",
    "author": "埃里克·马瑟斯",
    "publisher": "人民邮电出版社",
    "publish_date": "2020-01-01",
    "price": 59.8,
    "tags": ["Python", "编程", "入门"]
}

# 添加文档到 Solr
solr.add([document])

# 提交更改,确保数据被持久化到索引中
solr.commit()

print("单个文档添加成功!")

3.2.2 批量添加多个文档

当需要添加大量数据时,批量添加的效率远高于逐个添加,代码示例如下:

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 定义多个文档的列表
documents = [
    {
        "id": "book_002",
        "title": "流畅的Python",
        "author": "卢西亚诺·拉马略",
        "publisher": "人民邮电出版社",
        "publish_date": "2017-05-01",
        "price": 129.0,
        "tags": ["Python", "进阶", "编程思想"]
    },
    {
        "id": "book_003",
        "title": "Python数据分析与挖掘实战",
        "author": "张良均",
        "publisher": "机械工业出版社",
        "publish_date": "2019-03-01",
        "price": 79.0,
        "tags": ["Python", "数据分析", "挖掘"]
    },
    {
        "id": "book_004",
        "title": "深度学习入门:基于Python的理论与实现",
        "author": "斋藤康毅",
        "publisher": "人民邮电出版社",
        "publish_date": "2018-07-01",
        "price": 69.0,
        "tags": ["Python", "深度学习", "AI"]
    }
]

# 批量添加文档
solr.add(documents, batch_size=2)  # batch_size 表示每次提交的文档数量

# 提交更改
solr.commit()

print("批量文档添加成功!")

代码说明

  • add() 方法接收一个文档列表作为参数,batch_size 参数可以控制每次向 Solr 提交的文档数量,当文档数量较多时,合理设置 batch_size 可以避免单次请求数据量过大导致的失败。
  • commit() 方法用于提交更改,Solr 在接收到 add 请求后,会先将数据存入内存,只有执行 commit 操作后,数据才会被写入磁盘索引,并且才能被查询到。

3.3 从 Solr 中删除数据

pysolr 支持通过文档 ID、查询条件等方式删除 Solr 中的数据,常用的删除方法有 delete()delete_by_query()

3.3.1 通过 ID 删除单个文档

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 通过文档 ID 删除
solr.delete(id="book_001")

# 提交更改
solr.commit()

print("通过ID删除文档成功!")

3.3.2 通过查询条件删除多个文档

如果需要删除满足特定条件的一批文档,可以使用 delete_by_query() 方法,代码示例如下:

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 删除 publisher 为"机械工业出版社"的所有文档
solr.delete_by_query("publisher:机械工业出版社")

# 提交更改
solr.commit()

print("通过查询条件删除文档成功!")

3.3.3 删除所有文档

如果需要清空整个 Solr Core 的数据,可以使用通配符查询条件 *:*,代码如下:

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 删除所有文档
solr.delete_by_query("*:*")

# 提交更改
solr.commit()

print("所有文档删除成功!")

代码说明

  • delete(id="xxx") 方法用于删除指定 ID 的文档,ID 是 Solr 文档的唯一标识。
  • delete_by_query(query) 方法接收一个 Solr 查询语句作为参数,会删除所有满足该查询条件的文档,使用时需要格外谨慎,避免误删数据。

3.4 查询 Solr 中的数据

查询是 Solr 的核心功能,pysolr 提供了 search() 方法来执行各种查询操作,支持全文检索、过滤、排序、分页、高亮等多种功能。

3.4.1 基础全文检索

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 先添加一些测试数据,方便查询
test_docs = [
    {
        "id": "book_002",
        "title": "流畅的Python",
        "author": "卢西亚诺·拉马略",
        "publisher": "人民邮电出版社",
        "publish_date": "2017-05-01",
        "price": 129.0,
        "tags": ["Python", "进阶", "编程思想"]
    },
    {
        "id": "book_003",
        "title": "Python数据分析与挖掘实战",
        "author": "张良均",
        "publisher": "机械工业出版社",
        "publish_date": "2019-03-01",
        "price": 79.0,
        "tags": ["Python", "数据分析", "挖掘"]
    }
]
solr.add(test_docs)
solr.commit()

# 基础全文检索:搜索标题中包含"Python"的文档
results = solr.search("title:Python")

# 处理查询结果
print(f"查询到 {len(results)} 条结果:")
for result in results:
    print(f"ID: {result['id']}")
    print(f"标题: {result['title']}")
    print(f"作者: {result['author']}")
    print(f"价格: {result['price']}")
    print("-" * 50)

3.4.2 带过滤条件的查询

在实际应用中,我们经常需要在全文检索的基础上,添加过滤条件来缩小查询范围,例如过滤价格区间、出版社等,代码示例如下:

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 搜索标题包含"Python",且价格在 50-100 之间,出版社为"人民邮电出版社"的文档
# q 参数是查询语句,fq 参数是过滤条件(可以是多个)
results = solr.search(
    q="title:Python",
    fq=[
        "price:[50 TO 100]",  # 价格区间过滤,闭区间
        "publisher:人民邮电出版社"
    ]
)

print(f"过滤查询到 {len(results)} 条结果:")
for result in results:
    print(f"ID: {result['id']}")
    print(f"标题: {result['title']}")
    print(f"价格: {result['price']}")
    print(f"出版社: {result['publisher']}")
    print("-" * 50)

3.4.3 带排序和分页的查询

当查询结果较多时,分页和排序功能是必不可少的,pysolr 支持通过 sortstartrows 参数来实现,代码示例如下:

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 先添加更多测试数据
more_docs = [
    {
        "id": "book_005",
        "title": "Python爬虫开发与项目实战",
        "author": "范传辉",
        "publisher": "机械工业出版社",
        "publish_date": "2020-01-01",
        "price": 65.0,
        "tags": ["Python", "爬虫"]
    },
    {
        "id": "book_006",
        "title": "Python Web开发实战",
        "author": "陶俊杰",
        "publisher": "清华大学出版社",
        "publish_date": "2018-10-01",
        "price": 89.0,
        "tags": ["Python", "Web开发"]
    }
]
solr.add(more_docs)
solr.commit()

# 搜索标题包含"Python"的文档,按价格降序排序,分页获取第1页(从0开始),每页3条
results = solr.search(
    q="title:Python",
    sort="price desc",  # desc 降序,asc 升序
    start=0,  # 起始位置
    rows=3  # 每页显示的条数
)

print(f"分页查询到 {len(results)} 条结果:")
for result in results:
    print(f"ID: {result['id']}")
    print(f"标题: {result['title']}")
    print(f"价格: {result['price']}")
    print("-" * 50)

# 获取总记录数
print(f"符合条件的总记录数: {results.hits}")

3.4.4 高亮显示查询结果

高亮显示可以让查询结果中匹配的关键词以特殊样式呈现,提升用户体验,pysolr 支持通过 hl 相关参数实现高亮功能,代码示例如下:

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 搜索标题包含"Python"的文档,并高亮显示标题中的关键词
results = solr.search(
    q="title:Python",
    hl=True,  # 开启高亮功能
    hl_fl="title",  # 指定需要高亮的字段
    hl_simple_pre="<em>",  # 高亮前缀
    hl_simple_post="</em>"  # 高亮后缀
)

print(f"高亮查询到 {len(results)} 条结果:")
for result in results:
    print(f"ID: {result['id']}")
    # 获取高亮后的标题
    highlighted_title = result.highlighting.get(result['id'], {}).get('title', [result['title']])[0]
    print(f"高亮标题: {highlighted_title}")
    print(f"作者: {result['author']}")
    print("-" * 50)

代码说明

  • hl=True 表示开启高亮功能,hl_fl 指定需要进行高亮处理的字段。
  • hl_simple_prehl_simple_post 分别设置高亮的前缀和后缀,通常用于 HTML 页面展示,让关键词以斜体、加粗等样式显示。
  • result.highlighting 中存储了高亮后的字段内容,需要通过文档 ID 来获取对应字段的高亮结果。

3.5 更新 Solr 中的数据

Solr 的数据更新可以通过 add() 方法结合文档 ID 实现,因为 Solr 会根据 ID 进行覆盖更新,代码示例如下:

import pysolr

SOLR_URL = "http://localhost:8983/solr/gettingstarted"
solr = pysolr.Solr(SOLR_URL, timeout=10)

# 定义需要更新的文档,ID 为已存在的文档 ID
updated_document = {
    "id": "book_002",
    "title": "流畅的Python(第2版)",  # 更新标题
    "author": "卢西亚诺·拉马略",
    "publisher": "人民邮电出版社",
    "publish_date": "2022-01-01",  # 更新出版日期
    "price": 149.0,  # 更新价格
    "tags": ["Python", "进阶", "编程思想", "第2版"]  # 更新标签
}

# 通过 add 方法实现更新,Solr 会根据 ID 覆盖原有文档
solr.add([updated_document])
solr.commit()

print("文档更新成功!")

# 验证更新结果
result = solr.search(q="id:book_002")
for doc in result:
    print(f"更新后的标题: {doc['title']}")
    print(f"更新后的价格: {doc['price']}")
    print(f"更新后的标签: {doc['tags']}")

代码说明
Solr 没有专门的更新方法,而是通过“先删除后添加”的逻辑实现更新,当使用 add() 方法提交一个已存在 ID 的文档时,Solr 会自动删除原有 ID 的文档,然后添加新的文档内容,从而实现更新效果。

四、pysolr 实际应用案例:构建简单的图书检索系统

4.1 案例需求

我们需要构建一个简单的图书检索系统,实现以下功能:

  1. 批量导入图书数据到 Solr。
  2. 支持按书名、作者、出版社进行全文检索。
  3. 支持按价格区间过滤检索结果。
  4. 支持对检索结果按价格排序和分页。
  5. 支持高亮显示检索关键词。

4.2 案例代码实现

import pysolr
from typing import List, Dict, Optional

class BookSearchSystem:
    def __init__(self, solr_url: str, timeout: int = 10):
        """
        初始化图书检索系统
        :param solr_url: Solr Core 的 URL
        :param timeout: HTTP 请求超时时间
        """
        self.solr = pysolr.Solr(solr_url, timeout=timeout)

    def import_books(self, books: List[Dict]) -> None:
        """
        批量导入图书数据到 Solr
        :param books: 图书数据列表
        """
        if not books:
            print("没有需要导入的图书数据!")
            return
        try:
            self.solr.add(books, batch_size=5)
            self.solr.commit()
            print(f"成功导入 {len(books)} 本图书数据!")
        except Exception as e:
            print(f"导入图书数据失败:{e}")

    def search_books(
        self,
        keyword: str,
        field: str = "*",
        min_price: Optional[float] = None,
        max_price: Optional[float] = None,
        sort_by: str = "price asc",
        page: int = 1,
        page_size: int = 3,
        highlight: bool = True
    ) -> pysolr.Results:
        """
        检索图书数据
        :param keyword: 检索关键词
        :param field: 检索的字段,* 表示所有字段
        :param min_price: 最低价格过滤条件
        :param max_price: 最高价格过滤条件
        :param sort_by: 排序方式,如 price desc
        :param page: 页码,从 1 开始
        :param page_size: 每页显示的条数
        :param highlight: 是否开启高亮
        :return: 检索结果
        """
        # 构建查询语句
        if field == "*":
            query = f"{keyword}"
        else:
            query = f"{field}:{keyword}"

        # 构建过滤条件
        filter_queries = []
        if min_price is not None and max_price is not None:
            filter_queries.append(f"price:[{min_price} TO {max_price}]")
        elif min_price is not None:
            filter_queries.append(f"price:[{min_price} TO *]")
        elif max_price is not None:
            filter_queries.append(f"price:[* TO {max_price}]")

        # 计算分页参数
        start = (page - 1) * page_size

        # 构建高亮参数
        hl_params = {}
        if highlight:
            hl_params = {
                "hl": True,
                "hl_fl": field if field != "*" else "title,author,publisher",
                "hl_simple_pre": "<strong>",
                "hl_simple_post": "</strong>"
            }

        # 执行查询
        results = self.solr.search(
            q=query,
            fq=filter_queries,
            sort=sort_by,
            start=start,
            rows=page_size,
            **hl_params
        )

        return results

    def display_results(self, results: pysolr.Results) -> None:
        """
        展示检索结果
        :param results: 检索结果对象
        """
        if not results:
            print("没有查询到符合条件的图书!")
            return
        print(f"\n共查询到 {results.hits} 本符合条件的图书,当前显示第 {(results.start // results.rows) + 1} 页:")
        print("=" * 80)
        for idx, result in enumerate(results, start=1):
            book_id = result['id']
            # 获取高亮内容
            highlighting = result.highlighting.get(book_id, {})
            title = highlighting.get('title', [result.get('title', '未知标题')])[0]
            author = highlighting.get('author', [result.get('author', '未知作者')])[0]
            publisher = highlighting.get('publisher', [result.get('publisher', '未知出版社')])[0]
            price = result.get('price', 0.0)

            print(f"[{idx}] ID: {book_id}")
            print(f"标题: {title}")
            print(f"作者: {author}")
            print(f"出版社: {publisher}")
            print(f"价格: {price} 元")
            print("-" * 80)

# 测试图书检索系统
if __name__ == "__main__":
    # Solr Core URL
    SOLR_CORE_URL = "http://localhost:8983/solr/gettingstarted"

    # 初始化系统
    book_system = BookSearchSystem(SOLR_CORE_URL)

    # 准备测试图书数据
    test_books = [
        {"id": "b1001", "title": "Python编程:从入门到实践", "author": "埃里克·马瑟斯", "publisher": "人民邮电出版社", "price": 59.8, "tags": ["Python", "入门"]},
        {"id": "b1002", "title": "流畅的Python", "author": "卢西亚诺·拉马略", "publisher": "人民邮电出版社", "price": 129.0, "tags": ["Python", "进阶"]},
        {"id": "b1003", "title": "Python数据分析与挖掘实战", "author": "张良均", "publisher": "机械工业出版社", "price": 79.0, "tags": ["Python", "数据分析"]},
        {"id": "b1004", "title": "深度学习入门:基于Python的理论与实现", "author": "斋藤康毅", "publisher": "人民邮电出版社", "price": 69.0, "tags": ["Python", "AI"]},
        {"id": "b1005", "title": "Python爬虫开发与项目实战", "author": "范传辉", "publisher": "机械工业出版社", "price": 65.0, "tags": ["Python", "爬虫"]},
        {"id": "b1006", "title": "Java编程思想", "author": "布鲁斯·埃克尔", "publisher": "机械工业出版社", "price": 109.0, "tags": ["Java", "进阶"]},
        {"id": "b1007", "title": "Python Web开发实战", "author": "陶俊杰", "publisher": "清华大学出版社", "price": 89.0, "tags": ["Python", "Web"]},
        {"id": "b1008", "title": "数据结构与算法分析:Python语言描述", "author": "马克·艾伦·维斯", "publisher": "机械工业出版社", "price": 75.0, "tags": ["Python", "算法"]}
    ]

    # 批量导入图书数据
    book_system.import_books(test_books)

    # 测试检索功能:搜索标题包含"Python",价格在 50-100 之间的图书,按价格降序排序,第1页,每页3条
    search_results = book_system.search_books(
        keyword="Python",
        field="title",
        min_price=50.0,
        max_price=100.0,
        sort_by="price desc",
        page=1,
        page_size=3,
        highlight=True
    )

    # 展示检索结果
    book_system.display_results(search_results)

    # 测试检索功能:搜索作者包含"张良均"的图书
    print("\n\n===== 按作者检索 =====")
    author_results = book_system.search_books(keyword="张良均", field="author")
    book_system.display_results(author_results)

4.3 案例运行说明

  1. 运行该代码前,需要确保 Solr 服务器已启动,且对应的 Core 已创建。
  2. 代码中定义了 BookSearchSystem 类,封装了图书数据的导入和检索功能,便于复用和维护。
  3. 测试部分首先初始化系统,然后导入测试图书数据,接着执行两次检索操作,分别按标题和作者检索,并展示结果。
  4. 检索结果中,匹配的关键词会被 <strong> 标签包裹,在 HTML 页面中展示时会呈现为加粗样式。

五、pysolr 相关资源链接

  • PyPI 地址:https://pypi.org/project/pysolr
  • Github 地址:https://github.com/django-haystack/pysolr
  • 官方文档地址:https://pysolr.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具Piccolo详解:轻量级ORM的高效使用指南

一、Piccolo库核心概述

Piccolo是一款专为Python开发者设计的轻量级异步ORM(对象关系映射)框架,主要用于简化数据库的操作流程,支持PostgreSQL、SQLite等主流数据库,同时兼容同步与异步编程模式。其工作原理是将Python类映射为数据库表,通过面向对象的语法替代原生SQL语句,降低数据库操作的复杂度。

该库的优点突出:异步特性适配高并发场景,语法简洁易上手,支持自动生成迁移文件,且体积小巧、无过多依赖;缺点则是生态相较于Django ORM、SQLAlchemy更小众,部分高级功能有待完善。Piccolo采用MIT开源许可证,允许开发者自由使用、修改和分发,无商业使用限制。

二、Piccolo库安装与环境配置

2.1 安装命令

Piccolo支持通过pip包管理器一键安装,无论是同步环境还是异步环境,安装命令一致。打开终端,输入以下命令:

pip install piccolo

安装完成后,可通过以下命令验证安装是否成功:

python -m piccolo --version

若终端输出Piccolo的版本号,说明安装成功。

2.2 支持的Python与数据库版本

  • Python版本:推荐Python 3.8及以上版本,确保异步特性和语法兼容性。
  • 数据库版本:
  • SQLite:3.20.0及以上版本(无需额外配置,开箱即用);
  • PostgreSQL:10.0及以上版本(需提前安装并启动数据库服务)。

三、Piccolo核心功能与基础用法

3.1 定义数据库表模型

Piccolo的核心是通过Table类定义数据库表结构,每个类属性对应表中的一个字段。我们以创建一个“用户信息表”为例,演示模型定义的方法。

3.1.1 基础模型定义代码

from piccolo.table import Table
from piccolo.columns import Varchar, Int, Boolean, Timestamp

class User(Table):
    """
    用户信息表模型
    字段说明:
    - username: 用户名,字符串类型,长度50,非空且唯一
    - age: 年龄,整数类型
    - is_active: 是否激活,布尔类型,默认值为True
    - create_time: 创建时间,时间戳类型,默认自动填充当前时间
    """
    username = Varchar(length=50, null=False, unique=True)
    age = Int(null=True)
    is_active = Boolean(default=True)
    create_time = Timestamp(default=lambda: datetime.now())

代码说明

  1. 导入Table基类和需要的字段类型(VarcharInt等);
  2. 定义User类并继承Table,该类会被映射为数据库中的user表;
  3. 每个类属性对应表字段,通过参数指定字段约束(如null=False表示非空,unique=True表示唯一)。

3.1.2 字段类型与常用约束

Piccolo提供了丰富的字段类型,满足不同业务需求,常见字段及约束如下:
| 字段类型 | 作用 | 常用约束 |
|-||-|
| Varchar | 字符串类型 | length(长度)、null(是否允许空)、unique(是否唯一) |
| Int | 整数类型 | default(默认值)、choices(可选值列表) |
| Boolean | 布尔类型 | default(默认值) |
| Timestamp | 时间戳类型 | default(默认值,支持lambda函数) |
| ForeignKey | 外键类型 | references(关联的表模型) |

3.2 生成数据库迁移文件

在Piccolo中,模型定义完成后,需要生成迁移文件来创建对应的数据库表。迁移文件是数据库结构变更的记录,确保不同环境下的数据库结构一致。

3.2.1 初始化迁移环境

首先,在项目根目录下执行以下命令,初始化Piccolo的配置文件和迁移目录:

piccolo project new my_project

执行完成后,项目会生成piccolo_conf.py配置文件和migrations目录。

3.2.2 配置数据库连接

打开piccolo_conf.py文件,修改数据库连接配置。以SQLite为例:

from piccolo.conf.apps import AppConfig
from piccolo.engine.sqlite import SQLiteEngine

# SQLite数据库连接配置
DB = SQLiteEngine(path="my_database.db")

# 注册包含表模型的应用
APP_CONFIG = AppConfig(
    app_name="my_app",
    migrations_folder_path="my_app/migrations",
    table_classes=["my_app.tables.User"],
)

若使用PostgreSQL,配置如下:

from piccolo.engine.postgres import PostgresEngine

DB = PostgresEngine(
    config={
        "database": "my_db",
        "user": "postgres",
        "password": "123456",
        "host": "localhost",
        "port": 5432,
    }
)

3.2.3 创建并应用迁移文件

  1. 生成迁移文件:执行以下命令,Piccolo会自动检测模型变化并生成迁移文件。
piccolo migrations new my_app --auto
  1. 应用迁移文件:将迁移文件中的变更同步到数据库,创建user表。
piccolo migrations forwards my_app

执行成功后,数据库中会生成对应的user表结构。

3.3 数据的增删改查操作

Piccolo支持同步和异步两种数据操作方式,以下分别演示两种模式下的增删改查(CRUD)操作。

3.3.1 同步操作示例

from datetime import datetime
from my_app.tables import User

# 1. 新增数据(Create)
def add_user():
    # 方式一:通过类实例化并保存
    user1 = User(
        username="alice",
        age=25,
        is_active=True,
        create_time=datetime.now()
    )
    user1.save()  # 保存到数据库

    # 方式二:使用create方法直接创建
    User.create(username="bob", age=30, is_active=False)

# 2. 查询数据(Read)
def query_users():
    # 查询所有用户
    all_users = User.objects().all()
    for user in all_users:
        print(f"用户名:{user.username},年龄:{user.age}")

    # 条件查询:查询年龄大于25的激活用户
    active_users = User.objects().where(
        (User.age > 25) & (User.is_active == True)
    )
    print(f"年龄大于25的激活用户数量:{active_users.count()}")

    # 查询单个用户:根据用户名查询
    user = User.objects().get(User.username == "alice")
    print(f"Alice的年龄:{user.age}")

# 3. 更新数据(Update)
def update_user():
    # 修改单个用户的年龄
    user = User.objects().get(User.username == "bob")
    user.age = 31
    user.save()

    # 批量更新:将所有激活用户的年龄加1
    User.objects().where(User.is_active == True).update({User.age: User.age + 1})

# 4. 删除数据(Delete)
def delete_user():
    # 删除单个用户
    user = User.objects().get(User.username == "bob")
    user.remove()

    # 批量删除:删除年龄小于20的用户
    User.objects().where(User.age < 20).remove()

# 执行操作
if __name__ == "__main__":
    add_user()
    query_users()
    update_user()
    delete_user()

代码说明

  • 新增数据:支持实例化对象后save()和直接调用create()两种方式;
  • 查询数据:使用objects()获取查询集,通过where()添加条件,get()查询单条数据,count()统计数量;
  • 更新数据:支持单条数据修改后save()和批量update()
  • 删除数据:支持单条数据remove()和批量删除。

3.3.2 异步操作示例

Piccolo的异步特性基于asyncio实现,适合高并发场景,异步操作的语法与同步操作类似,只需使用async/await关键字。

import asyncio
from datetime import datetime
from my_app.tables import User

# 异步新增数据
async def async_add_user():
    user1 = User(username="charlie", age=28)
    await user1.save()  # 异步保存
    await User.create(username="david", age=22, is_active=False)

# 异步查询数据
async def async_query_users():
    all_users = await User.objects().all()
    for user in all_users:
        print(f"异步查询 - 用户名:{user.username},年龄:{user.age}")

    # 异步条件查询
    active_users = await User.objects().where(User.is_active == True).run()
    print(f"异步查询 - 激活用户数量:{len(active_users)}")

# 异步更新数据
async def async_update_user():
    await User.objects().where(User.username == "charlie").update({User.age: 29})

# 异步删除数据
async def async_delete_user():
    await User.objects().where(User.username == "david").remove()

# 执行异步操作
async def main():
    await async_add_user()
    await async_query_users()
    await async_update_user()
    await async_delete_user()

if __name__ == "__main__":
    asyncio.run(main())

代码说明

  • 异步操作需在async函数中执行,通过await调用Piccolo的异步方法;
  • run()方法用于执行异步查询集,获取结果列表;
  • 最后通过asyncio.run()启动异步事件循环。

四、Piccolo高级功能与应用

4.1 表关联(外键)操作

在实际项目中,表与表之间通常存在关联关系,如“用户表”和“订单表”的一对多关系。以下演示如何通过Piccolo定义外键关联并进行关联查询。

4.1.1 定义关联表模型

from piccolo.table import Table
from piccolo.columns import Varchar, Int, ForeignKey, Decimal
from my_app.tables import User

class Order(Table):
    """
    订单表模型
    外键关联User表,一个用户可以有多个订单
    """
    order_no = Varchar(length=30, unique=True, null=False)  # 订单编号
    amount = Decimal(precision=10, scale=2)  # 订单金额
    user = ForeignKey(references=User)  # 外键关联用户表

# 生成并应用迁移文件,创建order表
# 命令:piccolo migrations new my_app --auto && piccolo migrations forwards my_app

4.1.2 关联查询操作

from my_app.tables import User, Order

# 同步关联查询:查询某个用户的所有订单
def query_user_orders():
    user = User.objects().get(User.username == "alice")
    # 通过外键反向查询用户的订单
    orders = Order.objects().where(Order.user == user)
    for order in orders:
        print(f"用户{user.username}的订单:{order.order_no},金额:{order.amount}")

# 异步关联查询
async def async_query_user_orders():
    user = await User.objects().get(User.username == "alice")
    orders = await Order.objects().where(Order.user == user).run()
    for order in orders:
        print(f"异步查询 - 用户{user.username}的订单:{order.order_no}")

# 执行查询
query_user_orders()
asyncio.run(async_query_user_orders())

代码说明

  • 外键通过ForeignKey字段定义,references参数指定关联的表模型;
  • 关联查询时,可通过外键字段作为条件,查询关联表的数据。

4.2 数据筛选与排序

Piccolo提供了丰富的筛选和排序方法,满足复杂的查询需求。

from my_app.tables import User

# 数据筛选:多条件组合、模糊查询
def filter_users():
    # 模糊查询:用户名包含"li"的用户
    users = User.objects().where(User.username.like("%li%"))

    # 范围查询:年龄在20-30之间的用户
    users = User.objects().where((User.age >= 20) & (User.age <= 30))

    # 排序:按年龄降序排列
    sorted_users = User.objects().order_by(User.age, ascending=False)
    for user in sorted_users:
        print(f"用户名:{user.username},年龄:{user.age}")

filter_users()

代码说明

  • like()方法用于模糊查询,%表示通配符;
  • order_by()方法用于排序,ascending=False表示降序。

4.3 数据库事务操作

事务可以确保一系列数据库操作要么全部成功,要么全部失败,保证数据一致性。Piccolo支持同步和异步事务。

4.3.1 同步事务示例

from piccolo.utils.transaction import transaction
from my_app.tables import User

@transaction()
def transaction_demo():
    # 事务内的操作
    User.create(username="eva", age=24)
    User.create(username="frank", age=26)
    # 若执行过程中抛出异常,事务会回滚
    # 例如:raise Exception("模拟异常,事务回滚")

# 执行事务
transaction_demo()

4.3.2 异步事务示例

from piccolo.utils.transaction import async_transaction

@async_transaction()
async def async_transaction_demo():
    await User.create(username="grace", age=27)
    await User.create(username="henry", age=29)

asyncio.run(async_transaction_demo())

代码说明

  • 同步事务使用@transaction()装饰器,异步事务使用@async_transaction()装饰器;
  • 事务内的所有操作会被包裹,若出现异常则自动回滚。

五、实际项目案例:用户管理系统

我们以一个简单的用户管理系统为例,整合Piccolo的核心功能,实现用户的注册、查询、更新和删除功能。

5.1 项目目录结构

my_user_system/
├── my_app/
│   ├── __init__.py
│   ├── tables.py       # 表模型定义
│   └── operations.py   # 业务逻辑操作
├── piccolo_conf.py     # Piccolo配置文件
└── main.py             # 程序入口

5.2 代码实现

5.2.1 tables.py(表模型)

from piccolo.table import Table
from piccolo.columns import Varchar, Int, Boolean, Timestamp
from datetime import datetime

class User(Table):
    username = Varchar(length=50, null=False, unique=True)
    age = Int(null=True)
    is_active = Boolean(default=True)
    create_time = Timestamp(default=lambda: datetime.now())

5.2.2 operations.py(业务逻辑)

import asyncio
from my_app.tables import User

# 同步业务操作
class SyncUserOperations:
    @staticmethod
    def register_user(username, age):
        """用户注册"""
        if User.objects().where(User.username == username).exists():
            print(f"用户名{username}已存在")
            return False
        User.create(username=username, age=age)
        print(f"用户{username}注册成功")
        return True

    @staticmethod
    def get_user(username):
        """查询用户"""
        try:
            user = User.objects().get(User.username == username)
            return {
                "username": user.username,
                "age": user.age,
                "is_active": user.is_active,
                "create_time": user.create_time
            }
        except Exception:
            return None

    @staticmethod
    def update_user_age(username, new_age):
        """更新用户年龄"""
        user = User.objects().get(User.username == username)
        if not user:
            return False
        user.age = new_age
        user.save()
        return True

# 异步业务操作
class AsyncUserOperations:
    @staticmethod
    async def register_user(username, age):
        if await User.objects().where(User.username == username).exists():
            print(f"用户名{username}已存在")
            return False
        await User.create(username=username, age=age)
        print(f"用户{username}注册成功")
        return True

    @staticmethod
    async def get_user(username):
        try:
            user = await User.objects().get(User.username == username)
            return {
                "username": user.username,
                "age": user.age,
                "is_active": user.is_active,
                "create_time": user.create_time
            }
        except Exception:
            return None

5.2.3 main.py(程序入口)

import asyncio
from my_app.operations import SyncUserOperations, AsyncUserOperations

# 同步操作演示
def sync_demo():
    SyncUserOperations.register_user("user1", 22)
    SyncUserOperations.register_user("user1", 23)  # 重复注册
    user = SyncUserOperations.get_user("user1")
    print(f"查询用户:{user}")
    SyncUserOperations.update_user_age("user1", 24)
    user = SyncUserOperations.get_user("user1")
    print(f"更新后用户信息:{user}")

# 异步操作演示
async def async_demo():
    await AsyncUserOperations.register_user("user2", 25)
    user = await AsyncUserOperations.get_user("user2")
    print(f"异步查询用户:{user}")

if __name__ == "__main__":
    sync_demo()
    asyncio.run(async_demo())

5.3 运行项目

  1. 配置piccolo_conf.py文件,设置数据库连接;
  2. 生成并应用迁移文件:
piccolo migrations new my_app --auto
piccolo migrations forwards my_app
  1. 运行main.py
python main.py

终端会输出用户注册、查询和更新的结果,验证功能的正确性。

六、Piccolo相关资源链接

  • Pypi地址:https://pypi.org/project/piccolos
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。

Python Kafka 开发利器:confluent-kafka-python 从入门到实战

一、confluent-kafka-python 核心概述

1.1 库的用途

confluent-kafka-python 是 Confluent 公司推出的 Kafka Python 客户端,基于高性能的 librdkafka C 库封装而成,主要用于在 Python 程序中实现与 Apache Kafka 集群的高效交互,支持生产者(Producer)向 Kafka 发送消息、消费者(Consumer)从 Kafka 订阅并消费消息,同时兼容 Kafka 的各种高级特性,广泛应用于实时数据管道、日志收集、消息队列解耦等场景。

1.2 工作原理

该库的底层依赖 librdkafka,这是一个工业级的 Kafka 客户端库,提供了可靠的消息传输机制。在 Python 层面,confluent-kafka-python 对 librdkafka 的 API 进行了轻量级封装,实现了生产者的消息分区策略、批量发送、消息确认,以及消费者的群组协调、自动提交偏移量、消息回溯等核心功能。其工作流程遵循 Kafka 的标准模型:生产者将消息发送到指定 Topic,Kafka 集群存储消息,消费者订阅 Topic 并拉取消息进行处理。

1.3 优缺点分析

优点

  • 性能优异:基于 C 语言的 librdkafka,吞吐量和延迟表现远超纯 Python 实现的 Kafka 客户端(如 kafka-python)。
  • 功能全面:支持 Kafka 的所有核心特性,包括事务消息、压缩算法、SSL 加密、SASL 认证、自定义分区器等。
  • 稳定性高:经过大规模生产环境验证,适合高并发、高可用的场景。
  • 配置灵活:提供丰富的配置参数,可针对生产者和消费者进行精细化调优。

缺点

  • 安装依赖:需要系统中安装 librdkafka 库,Windows 平台安装相对复杂。
  • 学习曲线:部分高级配置参数(如分区策略、偏移量管理)需要对 Kafka 原理有一定理解。
  • 跨平台兼容:在一些小众操作系统上可能存在编译问题,需要手动调整编译参数。

1.4 License 类型

confluent-kafka-python 采用 Apache License 2.0 开源协议,允许用户自由使用、修改和分发代码,可用于商业项目,只需保留原作者的版权声明。

二、confluent-kafka-python 安装与环境准备

2.1 系统依赖安装

由于 confluent-kafka-python 依赖 librdkafka,在安装 Python 包之前需要先安装系统级的 librdkafka 库。

2.1.1 Linux 系统(Ubuntu/Debian)

sudo apt-get update
sudo apt-get install librdkafka-dev

2.1.2 Linux 系统(CentOS/RHEL)

sudo yum install librdkafka-devel

2.1.3 macOS 系统

使用 Homebrew 安装:

brew install librdkafka

2.1.4 Windows 系统

Windows 平台安装相对复杂,推荐两种方式:

  1. 使用预编译的二进制包:从 librdkafka 官网 下载预编译的 Windows 版本,解压后将库文件路径添加到系统环境变量 PATH 中。
  2. 使用 WSL(Windows Subsystem for Linux):在 WSL 中安装 Linux 版本的依赖,然后在 WSL 中运行 Python 程序。

2.2 Python 包安装

系统依赖安装完成后,使用 pip 安装 confluent-kafka-python:

pip install confluent-kafka

验证安装是否成功:

import confluent_kafka
print(confluent_kafka.__version__)

运行上述代码,如果输出库的版本号(如 2.2.0),则说明安装成功。

三、核心功能实战:生产者与消费者

3.1 Kafka 环境准备

在进行代码实战前,需要确保有一个可用的 Kafka 集群。如果是本地测试,可以使用 Docker 快速启动单节点 Kafka 和 ZooKeeper:

# 启动 ZooKeeper
docker run -d --name zookeeper -p 2181:2181 confluentinc/cp-zookeeper:7.4.0 \
  ZOOKEEPER_CLIENT_PORT=2181 \
  ZOOKEEPER_TICK_TIME=2000

# 启动 Kafka
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper confluentinc/cp-kafka:7.4.0 \
  KAFKA_BROKER_ID=1 \
  KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT \
  KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

上述命令启动了一个单节点 Kafka 集群,监听本地 9092 端口。

3.2 生产者(Producer)实战

生产者的核心功能是向 Kafka Topic 发送消息。confluent-kafka-python 提供了 Producer 类,支持同步发送、异步发送、批量发送等多种模式。

3.2.1 基础异步生产者

异步发送是 Kafka 生产者的默认模式,特点是无需等待消息发送结果,通过回调函数处理发送成功或失败的通知,效率更高。

代码示例

from confluent_kafka import Producer
import json
import time

# 1. 配置生产者参数
producer_config = {
    "bootstrap.servers": "localhost:9092",  # Kafka 集群地址
    "client.id": "python-producer-demo",    # 客户端标识
    "acks": "1",                            # 消息确认级别:1 表示 leader 确认即可
    "retries": 3,                           # 发送失败重试次数
    "linger.ms": 5,                         # 批量发送延迟时间(毫秒)
    "compression.type": "gzip"              # 消息压缩算法
}

# 2. 初始化生产者
producer = Producer(producer_config)

# 3. 定义发送结果回调函数
def delivery_report(err, msg):
    """
    消息发送结果回调函数
    :param err: 发送失败时的错误信息,成功时为 None
    :param msg: 发送成功的消息元数据
    """
    if err is not None:
        print(f"消息发送失败: {err}")
    else:
        print(f"消息发送成功 -> Topic: {msg.topic()}, Partition: {msg.partition()}, Offset: {msg.offset()}")

# 4. 发送消息
topic = "test_topic"  # 目标 Topic

# 循环发送 10 条测试消息
for i in range(10):
    # 构造消息内容
    message_data = {
        "id": i,
        "content": f"Hello Kafka from Python - {i}",
        "timestamp": time.time()
    }
    # 将字典转换为 JSON 字符串
    message_value = json.dumps(message_data).encode("utf-8")

    # 发送消息:key 用于分区路由,value 为消息内容
    producer.produce(
        topic=topic,
        key=str(i).encode("utf-8"),
        value=message_value,
        on_delivery=delivery_report
    )

    # 触发消息发送(异步模式下需要定期调用 poll 处理事件)
    producer.poll(0)

    # 模拟业务延迟
    time.sleep(0.5)

# 5. 等待所有待发送消息完成
producer.flush()
print("所有消息发送完成!")

代码说明

  • 配置参数bootstrap.servers 指定 Kafka 集群地址,acks 设置消息确认级别(0=无确认,1=leader 确认,all=所有副本确认),retries 设置重试次数,linger.ms 控制批量发送的延迟时间,compression.type 启用 gzip 压缩以减少网络传输量。
  • 回调函数delivery_report 函数用于处理消息发送结果,当消息成功发送或失败时会被调用。
  • produce 方法:用于发送消息,key 会影响消息的分区策略(相同 key 的消息会被发送到同一个分区),value 为消息的二进制内容。
  • poll 方法:异步模式下必须定期调用 poll 方法,处理 Kafka 的事件(如回调函数执行),参数 0 表示非阻塞。
  • flush 方法:等待所有待发送的消息完成发送,确保程序退出前消息不会丢失。

3.2.2 同步生产者

同步发送模式下,程序会阻塞直到收到 Kafka 的确认响应,适合对消息发送结果有强依赖的场景。

代码示例

from confluent_kafka import Producer, KafkaError
import json

# 配置生产者参数
producer_config = {
    "bootstrap.servers": "localhost:9092",
    "acks": "all",  # 最高级别确认,确保消息可靠性
    "retries": 5
}

producer = Producer(producer_config)
topic = "test_topic"

def send_message_sync(topic, key, value):
    """
    同步发送消息
    """
    try:
        # 发送消息并等待结果
        producer.produce(topic, key=key, value=value)
        # 阻塞直到消息发送完成
        producer.flush()
        print("消息同步发送成功")
    except KafkaError as e:
        print(f"消息同步发送失败: {e}")

# 构造消息
message_value = json.dumps({"data": "Sync Message from Python"}).encode("utf-8")
send_message_sync(topic, b"sync_key", message_value)

代码说明

  • 同步发送的核心是调用 flush 方法,该方法会阻塞直到所有待发送消息处理完成。
  • 通过捕获 KafkaError 异常,可以处理发送过程中的错误。

3.3 消费者(Consumer)实战

消费者的核心功能是订阅 Kafka Topic 并拉取消息进行处理。confluent-kafka-python 提供了 Consumer 类,支持消费者群组、自动提交偏移量、手动提交偏移量等功能。

3.3.1 基础消费者(自动提交偏移量)

自动提交偏移量是消费者的默认模式,Kafka 会定期自动将消费者的偏移量提交到集群,简化开发流程。

代码示例

from confluent_kafka import Consumer, KafkaError
import json

# 1. 配置消费者参数
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "python-consumer-group",  # 消费者群组 ID
    "auto.offset.reset": "earliest",      # 当没有初始偏移量时,从最早的消息开始消费
    "enable.auto.commit": True,           # 启用自动提交偏移量
    "auto.commit.interval.ms": 5000       # 自动提交间隔时间(毫秒)
}

# 2. 初始化消费者
consumer = Consumer(consumer_config)

# 3. 订阅 Topic
topic = "test_topic"
consumer.subscribe([topic])
print(f"消费者已订阅 Topic: {topic}")

# 4. 消费消息
try:
    while True:
        # 拉取消息,超时时间设置为 1 秒
        msg = consumer.poll(timeout=1.0)

        # 如果没有消息,继续循环
        if msg is None:
            continue

        # 处理错误
        if msg.error():
            # 处理分区 EOF 事件
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"已到达分区末尾 -> Topic: {msg.topic()}, Partition: {msg.partition()}, Offset: {msg.offset()}")
            else:
                print(f"消费消息出错: {msg.error()}")
            continue

        # 处理正常消息
        key = msg.key().decode("utf-8") if msg.key() else None
        value = json.loads(msg.value().decode("utf-8"))
        print(f"消费到消息 -> Key: {key}, Value: {value}, Topic: {msg.topic()}, Partition: {msg.partition()}, Offset: {msg.offset()}")

except KeyboardInterrupt:
    print("用户中断消费")
finally:
    # 关闭消费者,提交最后一次偏移量
    consumer.close()
    print("消费者已关闭")

代码说明

  • 配置参数group.id 指定消费者群组 ID,同一群组的消费者会负载均衡消费 Topic 的分区;auto.offset.reset 设置当消费者没有初始偏移量时的策略(earliest 从最早消息开始,latest 从最新消息开始);enable.auto.commit 启用自动提交,auto.commit.interval.ms 设置自动提交的间隔时间。
  • subscribe 方法:订阅一个或多个 Topic,支持正则表达式(如 subscribe(["test_*"]))。
  • poll 方法:拉取消息,timeout 参数设置超时时间(毫秒),超时后返回 None。
  • 消息处理:通过 msg.key()msg.value() 获取消息的键和值,需要进行解码;msg.error() 用于判断消息是否有错误,KafkaError._PARTITION_EOF 表示到达分区末尾。

3.3.2 高级消费者(手动提交偏移量)

手动提交偏移量可以更精确地控制消息的消费进度,确保消息被成功处理后再提交偏移量,避免消息丢失。适合对数据一致性要求高的场景(如金融交易、订单处理)。

代码示例

from confluent_kafka import Consumer, KafkaError, TopicPartition
import json

# 1. 配置消费者参数(关闭自动提交)
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "python-manual-commit-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False  # 关闭自动提交
}

# 2. 初始化消费者
consumer = Consumer(consumer_config)

# 3. 订阅 Topic
topic = "test_topic"
consumer.subscribe([topic])
print(f"手动提交消费者已订阅 Topic: {topic}")

# 4. 消费消息
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"分区末尾 -> {msg.topic()}-{msg.partition()}:{msg.offset()}")
            else:
                print(f"消费错误: {msg.error()}")
            continue

        # 处理消息
        key = msg.key().decode("utf-8") if msg.key() else None
        value = json.loads(msg.value().decode("utf-8"))
        print(f"消费到消息 -> Key: {key}, Value: {value}")

        # 模拟业务处理(如写入数据库、调用 API)
        # 假设这里的业务逻辑执行成功
        print("业务逻辑处理成功,准备提交偏移量")

        # 5. 手动提交偏移量
        # 方式 1:提交当前消费的消息偏移量
        consumer.commit(msg)
        print(f"偏移量提交成功 -> Topic: {msg.topic()}, Partition: {msg.partition()}, Offset: {msg.offset() + 1}")

        # 方式 2:提交指定分区的偏移量(批量提交)
        # partitions = [TopicPartition(topic, msg.partition(), msg.offset() + 1)]
        # consumer.commit(partitions=partitions)

except KeyboardInterrupt:
    print("用户中断消费")
finally:
    consumer.close()
    print("消费者已关闭")

代码说明

  • 关闭自动提交:将 enable.auto.commit 设置为 False,禁用自动提交功能。
  • 手动提交方式
  1. consumer.commit(msg):提交当前消费的消息的偏移量,Kafka 会记录该消费者群组在对应分区的偏移量为 msg.offset() + 1(下一次从该偏移量开始消费)。
  2. consumer.commit(partitions=partitions):批量提交多个分区的偏移量,适合批量处理消息的场景。
  • 业务一致性:手动提交偏移量的核心优势是可以确保消息被成功处理后再提交,避免因程序崩溃导致的消息丢失。例如,在将消息写入数据库并确认写入成功后,再提交偏移量。

3.4 消费者群组与分区分配

Kafka 的消费者群组机制可以实现消息的负载均衡,当多个消费者属于同一个 group.id 时,Kafka 会将 Topic 的分区均匀分配给群组内的消费者。

示例场景
假设 test_topic 有 3 个分区,启动 2 个消费者属于同一个群组,则分区分配可能为:消费者 1 分配 2 个分区,消费者 2 分配 1 个分区。当新增一个消费者时,Kafka 会触发分区再平衡,将分区重新分配为每个消费者 1 个分区。

代码验证
启动多个上述的消费者实例(保持 group.id 相同),然后通过生产者发送消息,可以看到不同消费者消费不同分区的消息。

四、高级特性实战

4.1 事务消息

事务消息可以确保生产者发送的多条消息原子性地提交到 Kafka,同时确保消费者只消费已提交的事务消息,适合需要跨多个 Topic 或分区发送消息的场景(如分布式事务)。

代码示例

from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json

# 生产者配置(启用事务)
producer_config = {
    "bootstrap.servers": "localhost:9092",
    "client.id": "transactional-producer",
    "acks": "all",
    "transactional.id": "test-transaction-id"  # 事务 ID,确保生产者故障恢复后的幂等性
}

# 初始化生产者并初始化事务
producer = Producer(producer_config)
producer.init_transactions()

try:
    # 开始事务
    producer.begin_transaction()

    # 发送多条消息到不同 Topic
    topic1 = "topic_tx_1"
    topic2 = "topic_tx_2"

    # 发送第一条消息
    producer.produce(topic1, value=b"Transaction Message 1", on_delivery=delivery_report)
    producer.poll(0)

    # 发送第二条消息
    producer.produce(topic2, value=b"Transaction Message 2", on_delivery=delivery_report)
    producer.poll(0)

    # 提交事务
    producer.commit_transaction()
    print("事务提交成功")

except KafkaException as e:
    print(f"事务执行失败,开始回滚: {e}")
    # 回滚事务
    producer.abort_transaction()

finally:
    producer.flush()

代码说明

  • 事务配置:通过 transactional.id 启用事务功能,同一个 transactional.id 的生产者可以确保故障恢复后的幂等性。
  • 事务流程init_transactions 初始化事务,begin_transaction 开始事务,commit_transaction 提交事务,abort_transaction 回滚事务。
  • 消费者事务隔离:消费者可以通过设置 isolation.level 参数控制是否消费未提交的事务消息,read_committed 表示只消费已提交的消息,read_uncommitted 表示消费所有消息。

4.2 SSL 加密与 SASL 认证

在生产环境中,Kafka 集群通常需要启用 SSL 加密和 SASL 认证,以确保数据传输的安全性和访问控制。

生产者配置示例(SASL/PLAIN 认证 + SSL 加密)

producer_config = {
    "bootstrap.servers": "kafka-cluster:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "kafka_user",
    "sasl.password": "kafka_password",
    "ssl.ca.location": "/path/to/ca.pem",  # CA 证书路径
    "ssl.certificate.location": "/path/to/client-cert.pem",  # 客户端证书路径
    "ssl.key.location": "/path/to/client-key.pem"  # 客户端私钥路径
}

producer = Producer(producer_config)

代码说明

  • security.protocol 设置为 SASL_SSL,表示启用 SASL 认证和 SSL 加密。
  • sasl.mechanism 指定 SASL 机制(如 PLAIN、SCRAM-SHA-256)。
  • ssl.ca.location 指定 CA 证书路径,用于验证 Kafka 服务端证书。
  • ssl.certificate.locationssl.key.location 指定客户端证书和私钥,用于双向认证。

五、实际业务案例:实时日志收集系统

5.1 案例背景

某电商平台需要构建一个实时日志收集系统,将用户行为日志(如浏览、点击、下单)从各个业务服务器收集到 Kafka,然后由下游的数据分析系统消费并处理这些日志。

5.2 系统架构

  1. 生产者端:业务服务器上的 Python 脚本收集用户行为日志,发送到 Kafka Topic user_behavior_topic
  2. Kafka 集群:存储用户行为日志,提供高吞吐量和高可用性。
  3. 消费者端:数据分析系统的 Python 脚本消费 user_behavior_topic 的日志,进行实时统计和存储。

5.3 生产者代码实现

from confluent_kafka import Producer
import json
import time
import random

# 生产者配置
producer_config = {
    "bootstrap.servers": "localhost:9092",
    "acks": "1",
    "retries": 3,
    "linger.ms": 10,
    "compression.type": "lz4"
}

producer = Producer(producer_config)

# 回调函数
def delivery_report(err, msg):
    if err:
        print(f"日志发送失败: {err}")
    else:
        print(f"日志发送成功 -> Topic: {msg.topic()}, Offset: {msg.offset()}")

# 模拟用户行为日志
def generate_user_behavior_log():
    user_ids = [f"user_{i}" for i in range(1000)]
    behaviors = ["view", "click", "add_cart", "purchase"]
    products = [f"product_{i}" for i in range(100)]

    return {
        "user_id": random.choice(user_ids),
        "behavior": random.choice(behaviors),
        "product_id": random.choice(products),
        "timestamp": int(time.time() * 1000),
        "ip": f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"
    }

# 发送日志到 Kafka
topic = "user_behavior_topic"

try:
    while True:
        # 生成一条用户行为日志
        log_data = generate_user_behavior_log()
        log_value = json.dumps(log_data).encode("utf-8")

        # 发送日志
        producer.produce(topic, value=log_value, on_delivery=delivery_report)
        producer.poll(0)

        # 模拟每秒生成 10 条日志
        time.sleep(0.1)
except KeyboardInterrupt:
    producer.flush()
    print("日志生产者已停止")

5.4 消费者代码实现

from confluent_kafka import Consumer, KafkaError
import json
import pandas as pd
from collections import defaultdict

# 消费者配置
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "user_behavior_consumer_group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False
}

consumer = Consumer(consumer_config)
consumer.subscribe(["user_behavior_topic"])

# 统计用户行为次数
behavior_count = defaultdict(int)
# 批量处理消息的阈值
BATCH_SIZE = 100
batch_messages = []

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(f"消费错误: {msg.error()}")
            continue

        # 解析日志消息
        log_data = json.loads(msg.value().decode("utf-8"))
        batch_messages.append(log_data)

        # 当批量消息达到阈值时,进行统计处理
        if len(batch_messages) >= BATCH_SIZE:
            # 转换为 DataFrame 进行分析
            df = pd.DataFrame(batch_messages)
            # 统计每种行为的次数
            behavior_stats = df["behavior"].value_counts()
            # 更新全局统计结果
            for behavior, count in behavior_stats.items():
                behavior_count[behavior] += count

            print("=" * 50)
            print("用户行为统计结果:")
            for behavior, count in behavior_count.items():
                print(f"{behavior}: {count}")
            print("=" * 50)

            # 提交偏移量
            consumer.commit(msg)
            # 清空批量消息列表
            batch_messages = []

except KeyboardInterrupt:
    print("用户中断消费")
finally:
    # 处理剩余的消息
    if batch_messages:
        df = pd.DataFrame(batch_messages)
        behavior_stats = df["behavior"].value_counts()
        for behavior, count in behavior_stats.items():
            behavior_count[behavior] += count
        print("最终统计结果:")
        for behavior, count in behavior_count.items():
            print(f"{behavior}: {count}")
    consumer.close()

5.5 案例总结

该案例利用 confluent-kafka-python 的高性能特性,实现了大规模日志的实时收集和处理。生产者端通过批量发送和压缩提高了发送效率,消费者端通过批量处理和手动提交偏移量确保了数据处理的准确性和效率。同时,该系统具有良好的扩展性,新增业务服务器只需部署生产者脚本,新增数据分析任务只需新增消费者群组。

六、相关资源链接

  • Pypi地址:https://pypi.org/project/confluent-kafka
  • Github地址:https://github.com/confluentinc/confluent-kafka-python
  • 官方文档地址:https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:dataset库快速上手,轻松搞定数据库操作

一、dataset库的用途、工作原理、优缺点及License

用途:dataset是一款轻量级Python库,核心作用是简化关系型数据库的操作流程,无需编写复杂的SQL语句,就能实现数据的增删改查,同时支持多种数据库(SQLite、MySQL、PostgreSQL等),特别适合快速开发原型、小型应用和数据处理脚本。
工作原理:基于SQLAlchemy构建,将数据库表映射为Python中的字典对象,通过直观的API调用完成数据库交互,自动处理表结构创建、字段类型推断等底层工作,降低数据库操作门槛。
优缺点:优点是语法简洁、上手快、支持多数据库、自动管理表结构;缺点是不适合复杂的数据库事务和高性能场景,灵活性略逊于原生SQL。
License:采用MIT开源许可证,可自由用于商业和非商业项目。

二、dataset库的安装

dataset库可通过Python包管理工具pip直接安装,安装命令非常简单,打开命令行终端,输入以下指令:

pip install dataset

该命令会自动下载并安装dataset库及其依赖项(如SQLAlchemy等)。安装完成后,在Python脚本中导入dataset即可开始使用。

三、dataset库的核心使用方式及实例代码

3.1 连接数据库

dataset支持多种数据库,不同数据库的连接字符串格式不同,下面是最常用的几种数据库连接方式。

3.1.1 连接SQLite数据库

SQLite是文件型数据库,无需额外配置服务器,适合本地开发和小型应用。连接SQLite时,只需指定数据库文件路径,若文件不存在,dataset会自动创建。

import dataset

# 连接SQLite数据库,文件名为mydatabase.db,不存在则自动创建
db = dataset.connect('sqlite:///mydatabase.db')

# 连接内存中的SQLite数据库(程序结束后数据消失,适合测试)
# db = dataset.connect('sqlite:///:memory:')

代码说明dataset.connect()方法接收一个数据库连接字符串,SQLite的连接字符串格式为sqlite:///文件路径,内存数据库则用sqlite:///:memory:

3.1.2 连接MySQL数据库

连接MySQL需要先安装对应的驱动mysql-connector-pythonpymysql,这里以pymysql为例,先安装依赖:

pip install pymysql

然后编写连接代码:

import dataset

# MySQL连接字符串格式:mysql+pymysql://用户名:密码@主机地址:端口/数据库名
db = dataset.connect('mysql+pymysql://root:123456@localhost:3306/mydb')

代码说明:连接字符串中需要替换为自己的MySQL用户名、密码、主机和数据库名,若数据库不存在,需要先在MySQL中创建。

3.1.3 连接PostgreSQL数据库

连接PostgreSQL需要安装驱动psycopg2-binary,安装命令:

pip install psycopg2-binary

连接代码如下:

import dataset

# PostgreSQL连接字符串格式:postgresql://用户名:密码@主机地址:端口/数据库名
db = dataset.connect('postgresql://postgres:123456@localhost:5432/mydb')

代码说明:同样需要替换为实际的PostgreSQL连接信息,确保数据库服务已启动。

3.2 操作数据库表

dataset将数据库表视为“集合”(Table),通过db['表名']的方式获取表对象,无需提前创建表结构,插入数据时会自动推断字段类型并创建表。

3.2.1 创建表并插入数据

单条数据插入:使用表对象的insert()方法,传入字典类型的数据,字典的键对应表的字段名,值对应字段值。

import dataset

# 连接SQLite数据库
db = dataset.connect('sqlite:///mydatabase.db')

# 获取或创建名为user的表
user_table = db['user']

# 插入单条用户数据
user_data = {
    'name': '张三',
    'age': 25,
    'gender': '男',
    'email': '[email protected]'
}
# 执行插入操作,返回插入的数据(包含自动生成的id)
inserted_user = user_table.insert(user_data)
print('插入的用户数据:', inserted_user)

代码说明:当db['user']被调用时,若user表不存在,dataset会自动创建该表;insert()方法执行后,会返回插入的数据,其中id字段是默认自动生成的主键。

多条数据批量插入:使用insert_many()方法,传入包含多个字典的列表,适合一次性插入大量数据,效率高于单条插入。

import dataset

db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']

# 批量准备3条用户数据
batch_user_data = [
    {'name': '李四', 'age': 28, 'gender': '女', 'email': '[email protected]'},
    {'name': '王五', 'age': 30, 'gender': '男', 'email': '[email protected]'},
    {'name': '赵六', 'age': 22, 'gender': '男', 'email': '[email protected]'}
]

# 执行批量插入
user_table.insert_many(batch_user_data)
print('批量插入完成!')

代码说明insert_many()方法接收一个字典列表,一次性将所有数据插入表中,减少数据库交互次数,提升插入效率。

3.2.2 查询数据

dataset提供了多种灵活的查询方法,满足不同场景的查询需求,无需编写SELECT语句。

查询所有数据:使用表对象的all()方法,返回表中所有数据,结果为ResultIter对象,可迭代遍历。

import dataset

db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']

# 查询user表中所有数据
all_users = user_table.all()

# 遍历并打印所有用户信息
print('所有用户信息:')
for user in all_users:
    print(f"ID: {user['id']}, 姓名: {user['name']}, 年龄: {user['age']}, 邮箱: {user['email']}")

代码说明all()方法返回的ResultIter对象可以像列表一样遍历,每个元素是一个字典,对应表中的一行数据。

条件查询数据:使用find()方法,传入查询条件的关键字参数,实现按条件筛选数据;也可以使用find_one()方法查询满足条件的单条数据。

import dataset

db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']

# 条件查询:查询年龄等于25的用户
users_age_25 = user_table.find(age=25)
print('年龄为25的用户:')
for user in users_age_25:
    print(user)

# 条件查询:查询性别为男且年龄大于25的用户
# 使用**kwargs传入多条件,默认是AND关系
male_users_over_25 = user_table.find(gender='男', age={'gt': 25})
print('\n性别为男且年龄大于25的用户:')
for user in male_users_over_25:
    print(user)

# 查询单条数据:查询姓名为李四的用户
lisi_user = user_table.find_one(name='李四')
print('\n姓名为李四的用户:', lisi_user)

代码说明find()方法的关键字参数支持多种比较运算符,如{'gt': 25}表示大于25,{'lt': 30}表示小于30,{'ge': 22}表示大于等于22,{'le': 28}表示小于等于28;find_one()方法返回满足条件的第一条数据,若没有则返回None

排序和分页查询:在find()方法中通过_order_by参数指定排序字段,通过_limit_offset参数实现分页。

import dataset

db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']

# 按年龄降序排序,查询前2条数据
sorted_users = user_table.find(_order_by='-age', _limit=2)
print('按年龄降序的前2名用户:')
for user in sorted_users:
    print(user)

# 分页查询:第2页,每页2条数据(_offset表示跳过的条数,_offset=2表示跳过前2条)
page_users = user_table.find(_limit=2, _offset=2)
print('\n分页查询第2页数据:')
for user in page_users:
    print(user)

代码说明:排序字段前加-表示降序,不加则为升序;分页时,_offset的计算方式为(页码-1)*每页条数,例如第2页、每页2条,_offset=2

3.2.3 更新数据

使用表对象的update()方法更新数据,需要传入要更新的字段字典和查询条件字典,指定更新哪些数据。

import dataset

db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']

# 更新数据:将姓名为张三的用户的邮箱改为[email protected]
# 参数1:要更新的字段字典;参数2:查询条件字典
user_table.update(
    {'email': '[email protected]'},  # 更新的字段
    {'name': '张三'}  # 更新条件
)
print('数据更新完成!')

# 验证更新结果
updated_zhangsan = user_table.find_one(name='张三')
print('更新后的张三信息:', updated_zhangsan)

代码说明update()方法的第一个参数是要修改的字段和新值,第二个参数是筛选条件,只有满足条件的数据会被更新。

3.2.4 删除数据

使用delete()方法删除数据,传入查询条件字典,删除满足条件的记录。

import dataset

db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']

# 删除数据:删除姓名为赵六的用户
user_table.delete(name='赵六')
print('数据删除完成!')

# 验证删除结果
deleted_user = user_table.find_one(name='赵六')
print('删除后查询赵六:', deleted_user)  # 输出None,表示已删除

代码说明delete()方法根据传入的条件删除数据,执行后无法撤销,使用时需要谨慎。

3.3 事务管理

事务是数据库操作的重要特性,确保一系列操作要么全部成功,要么全部失败。dataset通过db.begin()db.commit()db.rollback()方法实现事务管理。

import dataset

db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']

# 开启事务
db.begin()
try:
    # 执行多个数据库操作
    user_table.insert({'name': '孙七', 'age': 35, 'gender': '男', 'email': '[email protected]'})
    user_table.update({'age': 36}, {'name': '孙七'})
    # 提交事务:所有操作生效
    db.commit()
    print('事务提交成功!')
except Exception as e:
    # 发生异常时回滚事务:所有操作撤销
    db.rollback()
    print(f'事务执行失败,已回滚,错误信息:{e}')

代码说明db.begin()开启事务,在try块中执行数据库操作,若全部成功则调用db.commit()提交事务;若发生异常,则调用db.rollback()回滚事务,撤销所有未提交的操作,保证数据一致性。

四、实际案例:使用dataset库构建简易学生成绩管理系统

4.1 案例需求

构建一个简易的学生成绩管理系统,支持以下功能:

  1. 添加学生成绩信息(学号、姓名、科目、分数);
  2. 查询指定学生的所有科目成绩;
  3. 更新指定学生指定科目的分数;
  4. 删除指定学号的学生成绩信息;
  5. 查询所有学生的成绩并按分数降序排序。

4.2 代码实现

import dataset

class ScoreManagementSystem:
    def __init__(self, db_path):
        # 初始化数据库连接
        self.db = dataset.connect(f'sqlite:///{db_path}')
        # 获取或创建score表
        self.score_table = self.db['score']

    def add_score(self, student_id, name, subject, score):
        """添加学生成绩信息"""
        score_data = {
            'student_id': student_id,
            'name': name,
            'subject': subject,
            'score': score
        }
        self.score_table.insert(score_data)
        print(f'成功添加 {name} 的 {subject} 成绩:{score}')

    def query_student_score(self, student_id):
        """查询指定学生的所有科目成绩"""
        scores = self.score_table.find(student_id=student_id, _order_by='subject')
        print(f'\n学号 {student_id} 的学生成绩:')
        for s in scores:
            print(f'科目:{s["subject"]}, 分数:{s["score"]}')
        return scores

    def update_score(self, student_id, subject, new_score):
        """更新指定学生指定科目的分数"""
        self.score_table.update(
            {'score': new_score},
            {'student_id': student_id, 'subject': subject}
        )
        print(f'\n成功更新学号 {student_id} {subject} 科目分数为:{new_score}')

    def delete_student_score(self, student_id):
        """删除指定学号的学生成绩信息"""
        self.score_table.delete(student_id=student_id)
        print(f'\n成功删除学号 {student_id} 的所有成绩信息')

    def query_all_scores(self):
        """查询所有学生成绩并按分数降序排序"""
        all_scores = self.score_table.find(_order_by='-score')
        print('\n所有学生成绩(按分数降序):')
        for s in all_scores:
            print(f'学号:{s["student_id"]}, 姓名:{s["name"]}, 科目:{s["subject"]}, 分数:{s["score"]}')
        return all_scores

# 实例化成绩管理系统
if __name__ == '__main__':
    sms = ScoreManagementSystem('student_scores.db')

    # 1. 添加学生成绩
    sms.add_score('2024001', '张三', '数学', 95)
    sms.add_score('2024001', '张三', '语文', 88)
    sms.add_score('2024002', '李四', '数学', 92)
    sms.add_score('2024003', '王五', '英语', 90)

    # 2. 查询指定学生成绩
    sms.query_student_score('2024001')

    # 3. 更新学生成绩
    sms.update_score('2024001', '语文', 90)
    sms.query_student_score('2024001')  # 验证更新结果

    # 4. 查询所有学生成绩
    sms.query_all_scores()

    # 5. 删除指定学生成绩
    sms.delete_student_score('2024003')
    sms.query_all_scores()  # 验证删除结果

代码说明:本案例通过面向对象的方式封装了学生成绩管理的核心功能,利用dataset库的增删改查API实现数据操作,无需编写任何SQL语句。运行代码后,会自动创建student_scores.db数据库文件,并在其中生成score表,存储学生成绩信息。

五、dataset库相关资源

  • Pypi地址:https://pypi.org/project/dataset
  • Github地址:https://github.com/pudo/dataset
  • 官方文档地址:https://dataset.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:cx-Oracle 零基础入门教程

一、cx-Oracle 库核心介绍

cx-Oracle 是一款专门用于 Python 程序连接 Oracle 数据库的扩展库,能够实现对 Oracle 数据库的查询、插入、更新、删除等各类操作。其工作原理是基于 Oracle Call Interface(OCI)构建,通过调用 Oracle 客户端的底层接口,建立 Python 与 Oracle 数据库之间的通信桥梁,实现数据的高效交互。

该库的优点十分突出,兼容性强,支持 Python 3.x 系列版本和多种 Oracle 数据库版本,数据传输效率高,能直接处理 Oracle 特有的数据类型;缺点则是安装时需要依赖 Oracle 客户端库,配置步骤相对繁琐,且仅专注于 Oracle 数据库,不支持其他类型数据库。cx-Oracle 采用 BSD 开源许可证,用户可自由下载、使用、修改和分发,无商业授权限制。

二、cx-Oracle 安装与环境配置

2.1 安装前准备

在安装 cx-Oracle 之前,必须先安装 Oracle 客户端库,这是连接 Oracle 数据库的前提条件。Oracle 提供了两种轻量级客户端包供选择:

  • Oracle Instant Client:适用于大多数场景,体积小,安装便捷,可从 Oracle 官网下载对应操作系统版本(Windows、Linux、macOS)。
  • Oracle Full Client:功能更全面,包含更多开发工具,适合专业数据库开发人员。

以 Windows 系统为例,安装 Oracle Instant Client 的步骤如下:

  1. 访问 Oracle 官网下载页面(https://www.oracle.com/database/technologies/instant-client/downloads.html),选择与自己系统位数(32 位/64 位)匹配的 Instant Client 包。
  2. 将下载的压缩包解压到指定目录,例如 D:\oracle\instantclient_21_9
  3. 配置系统环境变量:
    • 新增环境变量 ORACLE_HOME,值为解压路径 D:\oracle\instantclient_21_9
    • D:\oracle\instantclient_21_9 添加到系统 PATH 环境变量中。

2.2 安装 cx-Oracle 库

完成 Oracle 客户端配置后,即可通过 Python 的包管理工具 pip 安装 cx-Oracle,打开命令提示符(CMD)或终端,执行以下命令:

pip install cx-Oracle

安装完成后,可在 Python 交互环境中执行 import cx_Oracle 测试是否安装成功,若没有报错,则说明安装完成。

三、cx-Oracle 核心使用方法与代码实例

3.1 建立数据库连接

使用 cx-Oracle 连接 Oracle 数据库,需要提供用户名密码数据库连接字符串。连接字符串的格式通常为 主机名/IP地址:端口号/服务名,具体格式需根据数据库配置调整。

代码实例

import cx_Oracle

# 数据库连接信息
username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"  # 本地数据库示例,orcl为服务名

# 建立连接
try:
    connection = cx_Oracle.connect(username, password, dsn)
    print("数据库连接成功!")
except cx_Oracle.Error as error:
    print(f"数据库连接失败:{error}")
finally:
    # 关闭连接
    if 'connection' in locals() and connection:
        connection.close()
        print("连接已关闭")

代码说明

  • 首先导入 cx_Oracle 库,定义数据库的用户名、密码和连接字符串 dsn
  • 使用 cx_Oracle.connect() 方法建立连接,该方法返回一个连接对象。
  • 通过 try-except 捕获连接过程中可能出现的异常,例如用户名密码错误、网络不通等。
  • 最后在 finally 块中关闭连接,确保无论连接是否成功,都能释放资源。

3.2 执行基础 SQL 查询

建立数据库连接后,需要创建游标对象来执行 SQL 语句。游标对象是 cx-Oracle 执行 SQL 操作的核心载体,支持查询、插入、更新等操作。

代码实例:查询表数据

import cx_Oracle

# 数据库连接信息
username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

# 建立连接并创建游标
connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()  # 创建游标对象

    # 执行查询 SQL 语句
    sql = "SELECT empno, ename, job, sal FROM emp WHERE deptno = :deptno"
    deptno = 20  # 查询20号部门的员工信息
    cursor.execute(sql, deptno=deptno)  # 绑定参数,防止SQL注入

    # 获取查询结果的两种方式
    # 方式1:逐行获取
    print("20号部门员工信息(逐行获取):")
    for row in cursor:
        empno, ename, job, sal = row
        print(f"员工编号:{empno}, 姓名:{ename}, 职位:{job}, 薪资:{sal}")

    # 方式2:一次性获取所有结果
    cursor.execute(sql, deptno=deptno)
    rows = cursor.fetchall()  # 获取所有行数据
    print("\n20号部门员工信息(一次性获取):")
    for row in rows:
        print(f"员工编号:{row[0]}, 姓名:{row[1]}, 职位:{row[2]}, 薪资:{row[3]}")

except cx_Oracle.Error as error:
    print(f"执行查询失败:{error}")
finally:
    # 关闭游标和连接
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • 游标对象通过 connection.cursor() 创建,后续所有 SQL 操作都通过游标执行。
  • 执行查询语句时,使用 cursor.execute(sql, 参数) 方法,其中 :deptno 是参数占位符,通过传入 deptno=20 绑定参数,这种方式能有效防止 SQL 注入攻击。
  • 获取查询结果有两种常用方式:
  1. 直接遍历游标对象,逐行读取数据。
  2. 使用 cursor.fetchall() 方法一次性获取所有结果,返回一个包含所有行的列表,每行数据是一个元组。
  • 操作完成后,需依次关闭游标和连接,释放资源。

3.3 执行数据插入操作

cx-Oracle 支持向 Oracle 数据库插入单条或多条数据,插入操作同样通过游标对象执行,执行后需要调用 connection.commit() 提交事务,否则数据不会真正写入数据库。

代码实例:插入单条数据

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 插入数据的 SQL 语句
    insert_sql = """
        INSERT INTO emp (empno, ename, job, mgr, hiredate, sal, comm, deptno)
        VALUES (:empno, :ename, :job, :mgr, :hiredate, :sal, :comm, :deptno)
    """

    # 定义要插入的数据
    emp_data = {
        "empno": 7999,
        "ename": "LIU",
        "job": "CLERK",
        "mgr": 7788,
        "hiredate": cx_Oracle.Date(2024, 1, 10),  # Oracle日期类型
        "sal": 2500,
        "comm": None,
        "deptno": 20
    }

    # 执行插入操作
    cursor.execute(insert_sql, **emp_data)
    connection.commit()  # 提交事务
    print(f"成功插入 {cursor.rowcount} 条数据")

except cx_Oracle.Error as error:
    connection.rollback()  # 出错时回滚事务
    print(f"插入数据失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • 插入 SQL 语句中使用多个参数占位符(:empno:ename 等),通过字典 emp_data 传递参数,使用 ** 解包字典。
  • 对于 Oracle 的日期类型,需要使用 cx_Oracle.Date() 方法创建对应的日期对象,确保数据类型匹配。
  • 执行插入操作后,必须调用 connection.commit() 提交事务,否则数据不会持久化到数据库;若出现异常,需调用 connection.rollback() 回滚事务,避免数据不一致。
  • cursor.rowcount 属性可以获取受影响的行数,用于判断插入操作是否成功。

代码实例:批量插入数据
当需要插入大量数据时,使用 cursor.executemany() 方法可以显著提高效率,该方法支持批量执行 SQL 语句。

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 批量插入的 SQL 语句
    batch_insert_sql = """
        INSERT INTO emp (empno, ename, job, deptno)
        VALUES (:empno, :ename, :job, :deptno)
    """

    # 批量数据列表
    batch_data = [
        (8001, "ZHANG", "ANALYST", 20),
        (8002, "WANG", "SALESMAN", 30),
        (8003, "ZHAO", "MANAGER", 10)
    ]

    # 执行批量插入
    cursor.executemany(batch_insert_sql, batch_data)
    connection.commit()
    print(f"成功批量插入 {cursor.rowcount} 条数据")

except cx_Oracle.Error as error:
    connection.rollback()
    print(f"批量插入失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • cursor.executemany() 方法接收两个参数,第一个是 SQL 语句,第二个是包含多条数据的列表,列表中的每个元素是一个元组,对应 SQL 语句中的参数。
  • 批量插入相比多次执行单条插入,减少了网络交互和数据库事务的开销,效率更高,适合大数据量的插入场景。

3.4 执行数据更新与删除操作

数据的更新和删除操作与插入操作类似,都是通过游标执行 SQL 语句,然后提交事务。

代码实例:更新数据

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 更新 SQL 语句:更新7999号员工的薪资
    update_sql = "UPDATE emp SET sal = :new_sal WHERE empno = :empno"
    new_sal = 3000
    empno = 7999

    cursor.execute(update_sql, new_sal=new_sal, empno=empno)
    connection.commit()
    print(f"成功更新 {cursor.rowcount} 条数据")

except cx_Oracle.Error as error:
    connection.rollback()
    print(f"更新数据失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码实例:删除数据

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 删除 SQL 语句:删除8003号员工
    delete_sql = "DELETE FROM emp WHERE empno = :empno"
    empno = 8003

    cursor.execute(delete_sql, empno=empno)
    connection.commit()
    print(f"成功删除 {cursor.rowcount} 条数据")

except cx_Oracle.Error as error:
    connection.rollback()
    print(f"删除数据失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • 更新和删除操作的 SQL 语句同样使用参数占位符,避免 SQL 注入。
  • 执行后通过 cursor.rowcount 查看受影响的行数,若行数为 0,说明没有符合条件的数据。

3.5 处理 Oracle 特有的数据类型

Oracle 数据库包含一些特有的数据类型,例如 NUMBERDATETIMESTAMPCLOB 等,cx-Oracle 提供了对应的处理方式。

代码实例:处理 CLOB 大文本类型

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 假设存在一个表 test_clob,包含 id 和 content 字段,content 为 CLOB 类型
    # 插入 CLOB 数据
    insert_clob_sql = "INSERT INTO test_clob (id, content) VALUES (:id, :content)"
    clob_content = cx_Oracle.CLOB(connection)  # 创建 CLOB 对象
    clob_content.write("这是一段很长的文本内容,用于测试 CLOB 数据类型的处理方式。" * 100)

    cursor.execute(insert_clob_sql, id=1, content=clob_content)
    connection.commit()
    print("CLOB 数据插入成功")

    # 查询 CLOB 数据
    select_clob_sql = "SELECT content FROM test_clob WHERE id = :id"
    cursor.execute(select_clob_sql, id=1)
    clob_data = cursor.fetchone()[0]
    print(f"CLOB 数据内容(前200字):{clob_data.read()[:200]}")

except cx_Oracle.Error as error:
    connection.rollback()
    print(f"处理 CLOB 数据失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • 对于 CLOB 类型的数据,需要先通过 cx_Oracle.CLOB(connection) 创建 CLOB 对象,然后使用 write() 方法写入文本内容。
  • 查询 CLOB 数据时,获取到的是 CLOB 对象,通过 read() 方法可以读取其中的文本内容。

四、cx-Oracle 实际应用案例:员工薪资管理系统

下面结合一个简单的员工薪资管理系统案例,综合展示 cx-Oracle 的使用方法。该案例实现以下功能:

  1. 连接 Oracle 数据库。
  2. 查询指定部门的员工薪资信息。
  3. 给指定员工涨薪。
  4. 新增员工记录。

完整代码实例

import cx_Oracle
from datetime import datetime

class EmpSalaryManager:
    def __init__(self, username, password, dsn):
        """初始化数据库连接信息"""
        self.username = username
        self.password = password
        self.dsn = dsn
        self.connection = None
        self.cursor = None

    def connect_db(self):
        """建立数据库连接"""
        try:
            self.connection = cx_Oracle.connect(self.username, self.password, self.dsn)
            self.cursor = self.connection.cursor()
            print("数据库连接成功!")
        except cx_Oracle.Error as error:
            print(f"连接失败:{error}")
            raise

    def disconnect_db(self):
        """关闭数据库连接"""
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()
        print("数据库连接已关闭")

    def query_dept_salary(self, deptno):
        """查询指定部门的员工薪资信息"""
        try:
            sql = "SELECT empno, ename, sal, hiredate FROM emp WHERE deptno = :deptno"
            self.cursor.execute(sql, deptno=deptno)
            results = self.cursor.fetchall()
            if not results:
                print(f"未查询到{deptno}号部门的员工信息")
                return
            print(f"\n{deptno}号部门员工薪资信息:")
            print("-" * 50)
            print(f"{'员工编号':<10}{'姓名':<10}{'薪资':<10}{'入职日期':<15}")
            print("-" * 50)
            for empno, ename, sal, hiredate in results:
                # 格式化日期显示
                hiredate_str = hiredate.strftime("%Y-%m-%d") if hiredate else "未知"
                print(f"{empno:<10}{ename:<10}{sal:<10}{hiredate_str:<15}")
        except cx_Oracle.Error as error:
            print(f"查询失败:{error}")

    def update_salary(self, empno, add_sal):
        """给指定员工涨薪"""
        try:
            # 先查询原薪资
            check_sql = "SELECT sal FROM emp WHERE empno = :empno"
            self.cursor.execute(check_sql, empno=empno)
            result = self.cursor.fetchone()
            if not result:
                print(f"未找到编号为{empno}的员工")
                return
            old_sal = result[0]
            new_sal = old_sal + add_sal

            # 更新薪资
            update_sql = "UPDATE emp SET sal = :new_sal WHERE empno = :empno"
            self.cursor.execute(update_sql, new_sal=new_sal, empno=empno)
            self.connection.commit()
            print(f"\n员工{empno}涨薪成功!原薪资:{old_sal}, 新薪资:{new_sal}")
        except cx_Oracle.Error as error:
            self.connection.rollback()
            print(f"涨薪失败:{error}")

    def add_employee(self, emp_data):
        """新增员工记录"""
        try:
            insert_sql = """
                INSERT INTO emp (empno, ename, job, mgr, hiredate, sal, comm, deptno)
                VALUES (:empno, :ename, :job, :mgr, :hiredate, :sal, :comm, :deptno)
            """
            self.cursor.execute(insert_sql, **emp_data)
            self.connection.commit()
            print(f"\n新增员工{emp_data['ename']}成功!员工编号:{emp_data['empno']}")
        except cx_Oracle.Error as error:
            self.connection.rollback()
            print(f"新增员工失败:{error}")

# 主程序入口
if __name__ == "__main__":
    # 数据库配置
    username = "scott"
    password = "tiger"
    dsn = "127.0.0.1:1521/orcl"

    # 创建薪资管理对象
    manager = EmpSalaryManager(username, password, dsn)

    try:
        # 连接数据库
        manager.connect_db()

        # 1. 查询20号部门的薪资信息
        manager.query_dept_salary(20)

        # 2. 给7999号员工涨薪500
        manager.update_salary(7999, 500)

        # 3. 新增员工
        new_emp = {
            "empno": 8004,
            "ename": "CHEN",
            "job": "ENGINEER",
            "mgr": 7782,
            "hiredate": cx_Oracle.Date.today(),
            "sal": 4000,
            "comm": 0,
            "deptno": 20
        }
        manager.add_employee(new_emp)

        # 再次查询20号部门信息,查看新增和更新结果
        manager.query_dept_salary(20)

    except Exception as e:
        print(f"程序执行异常:{e}")
    finally:
        # 关闭连接
        manager.disconnect_db()

代码说明

  • 该案例通过面向对象的方式封装了员工薪资管理的功能,EmpSalaryManager 类包含连接数据库、查询薪资、更新薪资、新增员工等方法,代码结构清晰,便于维护和扩展。
  • update_salary 方法中,先查询员工原薪资,再计算新薪资并更新,确保操作的准确性;在 add_employee 方法中,使用 cx_Oracle.Date.today() 获取当前日期作为入职日期。
  • 主程序中调用类的方法,依次执行查询、涨薪、新增员工操作,并再次查询验证结果,展示了 cx-Oracle 在实际项目中的综合应用。

五、相关资源地址

  • Pypi地址:https://pypi.org/project/cx-Oracle
  • Github地址:https://github.com/oracle/python-cx_Oracle
  • 官方文档地址:https://cx-oracle.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:influxdb库入门与实战教程

一、influxdb库核心介绍

1.1 用途

influxdb是Python生态中专门用于对接InfluxDB时序数据库的客户端库,能够实现数据的写入、查询、修改和删除等操作,广泛应用于物联网监控、系统性能指标采集、金融行情数据存储等需要处理时序数据的场景。

1.2 工作原理

该库通过HTTP/HTTPS协议与InfluxDB服务端建立连接,将Python中的数据结构(如字典、列表)转换为InfluxDB支持的Line Protocol格式进行写入;查询时则发送InfluxQL或Flux查询语句,再将返回的结果集解析为Python可处理的对象(如DataFrame)。

1.3 优缺点

优点:轻量易用,API设计贴合Python开发者习惯;支持与Pandas无缝集成,便于数据处理;兼容InfluxDB多个版本。
缺点:对最新版InfluxDB的部分高级特性支持滞后;高并发写入场景下性能需依赖额外优化。

1.4 License类型

influxdb库采用MIT License开源协议,允许自由使用、修改和分发,无论是个人项目还是商业应用都无需支付授权费用。

二、influxdb库安装与环境准备

2.1 安装方式

安装influxdb库的方式非常简单,直接使用Python的包管理工具pip即可完成安装。在命令行中输入以下命令:

pip install influxdb

如果需要安装指定版本以适配特定的InfluxDB服务端,可指定版本号,例如安装5.3.1版本:

pip install influxdb==5.3.1

安装完成后,我们可以在Python环境中验证是否安装成功,执行以下代码:

import influxdb
print(influxdb.__version__)

若控制台输出对应的版本号,说明安装成功。

2.2 环境依赖说明

使用influxdb库前,需要确保本地或远程有可用的InfluxDB服务端。推荐使用InfluxDB 1.x系列版本(2.x版本有单独的客户端库influxdb-client),同时Python环境需满足3.6及以上版本,避免因版本过低导致兼容性问题。

三、influxdb库核心API使用实战

3.1 建立与InfluxDB的连接

在进行任何数据操作前,首先需要建立与InfluxDB数据库的连接。influxdb库提供InfluxDBClient类来实现这一功能,核心参数包括host(服务端地址)、port(端口号)、username(用户名)、password(密码)、database(目标数据库名)等。
示例代码

from influxdb import InfluxDBClient

# 初始化客户端连接
client = InfluxDBClient(
    host='localhost',  # InfluxDB服务端IP
    port=8086,         # 默认端口号
    username='admin',  # 数据库用户名
    password='admin123',  # 数据库密码
    database='test_db' # 要连接的数据库名
)

# 测试连接是否成功
print("连接状态:", "成功" if client.ping() else "失败")

代码说明

  1. 导入InfluxDBClient类后,传入服务端的连接信息初始化客户端对象。
  2. 调用ping()方法测试与服务端的连通性,该方法返回True表示连接成功。
  3. 若连接失败,通常需要检查服务端是否启动、地址和端口是否正确、用户名密码是否匹配。

3.2 创建与删除数据库

连接成功后,我们可以通过客户端对象创建新的数据库,或者删除已有的数据库。

3.2.1 创建数据库

示例代码

# 定义要创建的数据库名
db_name = "iot_monitor"

# 检查数据库是否存在
existing_dbs = client.get_list_database()
db_exists = any(db['name'] == db_name for db in existing_dbs)

if not db_exists:
    client.create_database(db_name)
    print(f"数据库 {db_name} 创建成功")
else:
    print(f"数据库 {db_name} 已存在")

# 切换到新创建的数据库
client.switch_database(db_name)

代码说明

  1. get_list_database()方法会返回所有已存在的数据库列表,格式为[{"name": "db_name1"}, ...]
  2. 通过遍历列表判断目标数据库是否存在,避免重复创建。
  3. create_database()方法用于创建新数据库,switch_database()方法用于切换到目标数据库进行后续操作。

3.2.2 删除数据库

示例代码

# 定义要删除的数据库名
db_to_delete = "test_db"

# 检查数据库是否存在
existing_dbs = client.get_list_database()
db_exists = any(db['name'] == db_to_delete for db in existing_dbs)

if db_exists:
    client.drop_database(db_to_delete)
    print(f"数据库 {db_to_delete} 删除成功")
else:
    print(f"数据库 {db_to_delete} 不存在")

代码说明drop_database()方法接收数据库名作为参数,执行后会删除对应的数据库及其所有数据,操作前需谨慎确认。

3.3 写入时序数据到InfluxDB

时序数据的写入是influxdb库的核心功能之一,数据需要按照Line Protocol的格式组织,该格式的核心结构为:measurement,tag_set field_set timestamp。在Python中,我们可以通过字典列表的形式定义数据,再调用write_points()方法写入。

3.3.1 写入单条数据

示例代码

# 定义要写入的数据
data_point = [
    {
        "measurement": "temperature",  # 测量值名称,类似表名
        "tags": {
            "device_id": "sensor_001",  # 标签,用于分组查询
            "location": "room_101"
        },
        "fields": {
            "value": 25.6,  # 字段,存储具体数值(必须是数值类型)
            "unit": "°C"    # 字段可以是字符串、整数、浮点数
        },
        "time": "2024-01-01T12:00:00Z"  # 时间戳,可选,默认使用当前时间
    }
]

# 写入数据
write_result = client.write_points(data_point)
print(f"数据写入状态: {'成功' if write_result else '失败'}")

代码说明

  1. measurement对应时序数据的“表”,用于分类存储不同类型的数据。
  2. tags是标签字段,为字符串类型,支持索引,适合用于分组查询(如按设备ID、位置分组)。
  3. fields是数值字段,支持整数、浮点数、布尔值和字符串,是时序数据的核心指标。
  4. time是时间戳,格式为ISO 8601,可选参数,若不指定则使用服务端的当前时间。
  5. write_points()方法返回布尔值,表示写入操作是否成功。

3.3.2 写入多条数据

在实际场景中,我们通常需要批量写入多条数据,以提高写入效率。只需扩展字典列表的元素即可实现批量写入。
示例代码

# 定义多条时序数据
batch_data = [
    {
        "measurement": "temperature",
        "tags": {"device_id": "sensor_001", "location": "room_101"},
        "fields": {"value": 25.8, "unit": "°C"},
        "time": "2024-01-01T12:01:00Z"
    },
    {
        "measurement": "temperature",
        "tags": {"device_id": "sensor_001", "location": "room_101"},
        "fields": {"value": 26.0, "unit": "°C"},
        "time": "2024-01-01T12:02:00Z"
    },
    {
        "measurement": "humidity",
        "tags": {"device_id": "sensor_001", "location": "room_101"},
        "fields": {"value": 45.2, "unit": "%"},
        "time": "2024-01-01T12:00:00Z"
    }
]

# 批量写入数据
write_result = client.write_points(batch_data)
print(f"批量数据写入状态: {'成功' if write_result else '失败'}")

代码说明:批量写入多条数据时,不同数据可以属于同一个measurement,也可以属于不同的measurementwrite_points()方法会自动处理这些数据的分类存储。

3.4 查询InfluxDB中的时序数据

数据写入后,我们可以通过query()方法执行InfluxQL查询语句,获取存储的时序数据。查询结果可以直接转换为Pandas DataFrame,便于后续的数据分析和可视化。

3.4.1 基础查询

示例代码

# 执行InfluxQL查询语句
query_str = 'SELECT * FROM temperature WHERE device_id = \'sensor_001\' LIMIT 10'
result = client.query(query_str)

# 将查询结果转换为列表
result_list = list(result.get_points())
print("查询到的温度数据:")
for point in result_list:
    print(f"时间: {point['time']}, 设备ID: {point['device_id']}, 温度: {point['value']} {point['unit']}")

代码说明

  1. InfluxQL的语法与SQL类似,SELECT * FROM measurement表示查询该measurement下的所有数据,WHERE子句用于过滤标签或字段,LIMIT用于限制返回数据的条数。
  2. query()方法返回的是ResultSet对象,通过get_points()方法可以获取数据的迭代器,再转换为列表方便遍历。
  3. 每条数据以字典形式存储,包含timetags字段和fields字段的所有内容。

3.4.2 与Pandas集成查询

influxdb库支持直接将查询结果转换为Pandas DataFrame,这对于数据分析师来说是非常实用的功能。
示例代码

import pandas as pd

# 执行查询并转换为DataFrame
query_str = 'SELECT value, unit FROM temperature WHERE location = \'room_101\''
result = client.query(query_str)
df = pd.DataFrame(result.get_points())

# 打印DataFrame的前5行数据
print("温度数据DataFrame:")
print(df.head())

# 计算温度的平均值
avg_temp = df['value'].mean()
print(f"\nroom_101的平均温度: {avg_temp:.2f} °C")

代码说明

  1. 导入pandas库后,将ResultSet对象转换为DataFrame,数据的结构会更加清晰,便于进行统计分析。
  2. 可以直接使用Pandas的内置方法(如mean())计算数据的统计指标,快速完成数据分析任务。

3.5 删除InfluxDB中的数据

当需要清理过期或无用的数据时,可以使用delete_series()方法删除指定条件的数据。
示例代码

# 删除指定时间之前的温度数据
delete_condition = {
    "measurement": "temperature",
    "tags": {"device_id": "sensor_001"},
    "time__lt": "2024-01-01T12:01:00Z"  # 删除时间小于该时间戳的数据
}

client.delete_series(**delete_condition)
print("符合条件的数据已删除")

代码说明

  1. delete_series()方法支持通过measurementtags和时间条件过滤要删除的数据。
  2. 时间条件的参数格式为time__lt(小于)、time__lte(小于等于)、time__gt(大于)、time__gte(大于等于),后缀对应不同的比较逻辑。
  3. 数据删除操作不可逆,执行前务必确认条件是否正确。

四、实际应用案例:物联网传感器数据监控系统

4.1 案例场景介绍

本案例模拟一个物联网传感器数据监控系统,实现以下功能:

  1. 模拟传感器采集温度和湿度数据。
  2. 将采集到的数据实时写入InfluxDB。
  3. 定时查询并分析传感器数据,当温度超过阈值时输出告警信息。

4.2 完整代码实现

from influxdb import InfluxDBClient
import pandas as pd
import time
import random
from datetime import datetime, timezone

# 配置InfluxDB连接信息
INFLUXDB_HOST = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_USER = 'admin'
INFLUXDB_PWD = 'admin123'
INFLUXDB_DB = 'iot_monitor'

# 传感器配置
DEVICE_ID = 'sensor_002'
LOCATION = 'factory_workshop'
TEMPERATURE_THRESHOLD = 30.0  # 温度告警阈值
COLLECTION_INTERVAL = 5  # 数据采集间隔(秒)
QUERY_INTERVAL = 10  # 数据查询间隔(秒)

def init_influxdb_client():
    """初始化InfluxDB客户端并创建数据库"""
    client = InfluxDBClient(
        host=INFLUXDB_HOST,
        port=INFLUXDB_PORT,
        username=INFLUXDB_USER,
        password=INFLUXDB_PWD
    )
    # 创建数据库
    db_list = client.get_list_database()
    if not any(db['name'] == INFLUXDB_DB for db in db_list):
        client.create_database(INFLUXDB_DB)
    client.switch_database(INFLUXDB_DB)
    return client

def simulate_sensor_data():
    """模拟传感器采集温度和湿度数据"""
    # 生成模拟数据,温度在25-32之间波动,湿度在40-60之间波动
    temperature = round(random.uniform(25.0, 32.0), 2)
    humidity = round(random.uniform(40.0, 60.0), 2)
    # 获取当前UTC时间戳
    current_time = datetime.now(timezone.utc).isoformat()
    # 组织时序数据
    data = [
        {
            "measurement": "temperature",
            "tags": {"device_id": DEVICE_ID, "location": LOCATION},
            "fields": {"value": temperature, "unit": "°C"},
            "time": current_time
        },
        {
            "measurement": "humidity",
            "tags": {"device_id": DEVICE_ID, "location": LOCATION},
            "fields": {"value": humidity, "unit": "%"},
            "time": current_time
        }
    ]
    return data

def write_sensor_data(client, data):
    """将传感器数据写入InfluxDB"""
    try:
        client.write_points(data)
        print(f"[{datetime.now()}] 数据写入成功: 温度={data[0]['fields']['value']}°C, 湿度={data[1]['fields']['value']}%")
    except Exception as e:
        print(f"[{datetime.now()}] 数据写入失败: {str(e)}")

def query_and_alert(client):
    """查询数据并判断是否触发告警"""
    try:
        # 查询最近10条温度数据
        query_str = f"""
            SELECT value FROM temperature 
            WHERE device_id = '{DEVICE_ID}' 
            AND location = '{LOCATION}' 
            ORDER BY time DESC 
            LIMIT 10
        """
        result = client.query(query_str)
        df = pd.DataFrame(result.get_points())
        if not df.empty:
            latest_temp = df['value'].iloc[0]
            avg_temp = df['value'].mean()
            print(f"[{datetime.now()}] 最新温度: {latest_temp:.2f}°C, 近10条平均温度: {avg_temp:.2f}°C")
            # 温度超过阈值时触发告警
            if latest_temp > TEMPERATURE_THRESHOLD:
                print(f"⚠️  告警: 当前温度 {latest_temp:.2f}°C 超过阈值 {TEMPERATURE_THRESHOLD}°C!")
    except Exception as e:
        print(f"[{datetime.now()}] 数据查询失败: {str(e)}")

if __name__ == "__main__":
    # 初始化客户端
    influx_client = init_influxdb_client()
    print("物联网传感器数据监控系统启动...")
    try:
        query_counter = 0
        while True:
            # 采集并写入数据
            sensor_data = simulate_sensor_data()
            write_sensor_data(influx_client, sensor_data)
            # 每隔QUERY_INTERVAL秒查询一次数据并告警
            query_counter += 1
            if query_counter >= QUERY_INTERVAL / COLLECTION_INTERVAL:
                query_and_alert(influx_client)
                query_counter = 0
            # 等待采集间隔
            time.sleep(COLLECTION_INTERVAL)
    except KeyboardInterrupt:
        print("\n系统已停止运行")
    finally:
        # 关闭客户端连接
        influx_client.close()

4.3 代码运行说明

  1. 代码功能分解
    • init_influxdb_client()函数:初始化客户端并创建目标数据库,确保后续操作有可用的存储位置。
    • simulate_sensor_data()函数:模拟传感器生成温度和湿度数据,时间戳使用当前UTC时间,保证时序数据的准确性。
    • write_sensor_data()函数:将模拟数据写入InfluxDB,并输出写入状态。
    • query_and_alert()函数:查询最近的温度数据,计算平均值并判断是否超过阈值,超过时输出告警信息。
    • 主程序循环:按照设定的间隔采集数据、写入数据,并定时执行查询和告警逻辑。
  2. 运行效果
    运行代码后,控制台会实时输出数据写入状态,每隔10秒查询一次数据并输出温度信息,当温度超过30.0°C时,会触发告警提示。按下Ctrl+C可停止程序运行。

五、相关资源

  • Pypi地址:https://pypi.org/project/influxdb
  • Github地址:https://github.com/influxdata/influxdb-python
  • 官方文档地址:https://influxdb-python.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具Records:极简数据库操作指南

一、Records库核心概述

Records是一款轻量级Python库,专为简化SQL数据库操作而生,它基于sqlalchemytablib构建,无需繁琐的配置与类定义,一行代码即可实现数据库连接、查询与结果导出。其工作原理是封装SQLAlchemy的引擎,自动管理连接池,同时借助tablib实现查询结果的多格式导出(如CSV、JSON、Excel)。

优点:语法极简,降低数据库操作门槛;支持多种数据库(MySQL、PostgreSQL、SQLite等);内置结果集格式化功能。缺点:高级数据库操作需依赖SQLAlchemy底层接口;更新维护频率较低。该库采用MIT License,可自由用于商业与非商业项目。

二、Records库安装步骤

对于技术小白来说,Records的安装流程非常简单,只需要借助Python的包管理工具pip即可完成,无需配置复杂的环境变量。

2.1 基础安装命令

打开命令提示符(Windows)或终端(Mac/Linux),输入以下命令:

pip install records

该命令会自动下载并安装Records及其依赖库(sqlalchemytablib等)。

2.2 数据库驱动安装

Records支持多种数据库,但不同数据库需要安装对应的驱动,否则会出现连接失败的情况。以下是常用数据库的驱动安装命令:

  • SQLite:无需额外安装驱动,Python内置支持。
  • MySQL/MariaDB
pip install mysqlclient
  • PostgreSQL
pip install psycopg2-binary

三、Records库核心使用方法

Records的核心设计理念是“简洁高效”,通过Database类实现数据库的连接与操作,无需手动管理连接的开启与关闭。下面我们以最常用的SQLite数据库为例(无需配置服务,文件即可存储数据),详细讲解每一个功能的使用方法,并搭配实例代码辅助理解。

3.1 数据库连接

使用Records连接数据库的核心是传入数据库连接字符串,不同数据库的连接字符串格式不同,具体如下:

| 数据库类型 | 连接字符串格式 |
||-|
| SQLite | sqlite:///test.db(相对路径)/ sqlite:////绝对路径/test.db |
| MySQL | mysql://用户名:密码@主机:端口/数据库名 |
| PostgreSQL | postgresql://用户名:密码@主机:端口/数据库名 |

实例代码:连接SQLite数据库

import records

# 连接SQLite数据库,若test.db不存在则自动创建
db = records.Database('sqlite:///test.db')

# 打印数据库连接状态(可选)
print(f"数据库连接成功:{db}")

代码说明

  1. 导入records库后,通过records.Database()方法创建数据库连接对象db
  2. SQLite数据库以文件形式存储,sqlite:///test.db表示在当前目录下创建或使用test.db文件。
  3. 连接成功后,db对象可用于后续的查询、插入、更新等操作。

3.2 执行SQL查询

Records支持执行任意SQL语句,包括SELECTINSERTUPDATEDELETE等,核心方法是db.query()

3.2.1 查询数据(SELECT语句)

实例代码:创建表并查询数据

import records

# 连接数据库
db = records.Database('sqlite:///test.db')

# 1. 执行创建表的SQL语句
create_sql = """
CREATE TABLE IF NOT EXISTS students (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    age INTEGER,
    gender TEXT
)
"""
db.query(create_sql)
print("students表创建成功")

# 2. 插入测试数据
insert_sql = """
INSERT INTO students (name, age, gender) VALUES 
('张三', 18, '男'),
('李四', 19, '女'),
('王五', 20, '男')
"""
db.query(insert_sql)
print("测试数据插入成功")

# 3. 查询表中所有数据
results = db.query("SELECT * FROM students")

# 打印查询结果(默认以OrderedDict格式返回)
for row in results:
    print(f"ID: {row.id}, 姓名: {row.name}, 年龄: {row.age}, 性别: {row.gender}")

代码说明

  1. 首先执行CREATE TABLE语句创建students表,IF NOT EXISTS确保表不存在时才创建,避免重复创建报错。
  2. 执行INSERT语句插入3条测试数据,db.query()方法直接执行SQL语句。
  3. 执行SELECT语句查询数据,返回的results是一个结果集对象,可通过循环遍历每一行数据,每行数据以OrderedDict格式存储,支持通过键名(如row.id)或索引访问。

3.2.2 参数化查询

在实际开发中,直接拼接SQL语句容易引发SQL注入攻击,Records支持参数化查询,通过占位符传递参数,提高安全性。

实例代码:参数化查询指定条件的数据

import records

db = records.Database('sqlite:///test.db')

# 使用参数化查询,占位符为:参数名
gender = '男'
age_limit = 18
results = db.query("SELECT * FROM students WHERE gender = :g AND age >= :a", g=gender, a=age_limit)

# 打印查询结果
print(f"性别为{gender}且年龄大于等于{age_limit}的学生:")
for row in results:
    print(f"ID: {row.id}, 姓名: {row.name}")

代码说明

  1. SQL语句中使用:g:a作为占位符,分别对应后续传入的g=gendera=age_limit参数。
  2. 参数化查询会自动处理参数的转义,避免SQL注入风险,这是开发中的最佳实践。

3.3 结果集格式化导出

Records的一大特色是支持将查询结果导出为多种格式,如CSV、JSON、Excel、YAML等,这一功能依赖于tablib库,无需手动编写导出代码。

实例代码:将查询结果导出为CSV、JSON和Excel文件

import records

db = records.Database('sqlite:///test.db')

# 查询所有学生数据
results = db.query("SELECT * FROM students")

# 1. 导出为CSV文件
with open('students.csv', 'w', encoding='utf-8') as f:
    f.write(results.export('csv'))
print("CSV文件导出成功")

# 2. 导出为JSON文件
with open('students.json', 'w', encoding='utf-8') as f:
    f.write(results.export('json'))
print("JSON文件导出成功")

# 3. 导出为Excel文件(需要确保tablib支持,若报错需安装openpyxl)
try:
    with open('students.xlsx', 'wb') as f:
        f.write(results.export('xlsx'))
    print("Excel文件导出成功")
except Exception as e:
    print(f"Excel导出失败,请安装openpyxl:{e}")
    # 安装命令:pip install openpyxl

代码说明

  1. results.export()方法接收一个格式参数,支持的格式包括csvjsonxlsxyaml等。
  2. 导出CSV和JSON时,直接以文本形式写入文件;导出Excel时,需要以二进制模式(wb)写入,且需安装openpyxl库。
  3. 导出的文件可直接用Excel、文本编辑器等打开,方便数据分享与分析。

3.4 执行事务操作

在数据库操作中,事务用于确保一系列操作的原子性(要么全部成功,要么全部失败)。Records支持通过db.transaction()方法开启事务。

实例代码:事务操作示例

import records

db = records.Database('sqlite:///test.db')

# 开启事务
with db.transaction() as tx:
    try:
        # 执行多条SQL语句
        tx.query("INSERT INTO students (name, age, gender) VALUES ('赵六', 21, '男')")
        tx.query("UPDATE students SET age = 22 WHERE name = '王五'")
        # 事务会自动提交
        print("事务执行成功,数据已提交")
    except Exception as e:
        # 发生异常时事务自动回滚
        print(f"事务执行失败,数据已回滚:{e}")

代码说明

  1. 使用with db.transaction() as tx上下文管理器开启事务,在with代码块内执行的所有SQL语句都属于同一个事务。
  2. 如果代码块内没有发生异常,事务会自动提交;如果发生异常(如SQL语法错误、主键冲突等),事务会自动回滚,确保数据一致性。
  3. 事务操作适用于需要批量执行多个SQL语句的场景,如转账、订单创建等。

3.5 关闭数据库连接

虽然Records会自动管理数据库连接,但在程序结束时手动关闭连接是良好的编程习惯,可释放资源。

实例代码:关闭数据库连接

import records

db = records.Database('sqlite:///test.db')

# 执行数据库操作...
results = db.query("SELECT * FROM students")
print(results.all())

# 关闭数据库连接
db.close()
print("数据库连接已关闭")

代码说明:调用db.close()方法即可关闭数据库连接,关闭后db对象无法再执行任何操作。

四、Records库高级应用实例

下面我们结合一个实际的数据分析场景,展示Records库的综合使用方法:从MySQL数据库中查询销售数据,进行简单的统计分析,并将结果导出为Excel文件。

4.1 场景需求

假设我们有一个MySQL数据库sales_db,其中包含sales表,表结构如下:

| 字段名 | 类型 | 说明 |
|–|||
| id | INT | 订单ID(主键) |
| product | VARCHAR | 产品名称 |
| amount | DECIMAL | 销售金额 |
| sale_date | DATE | 销售日期 |

需求:查询2024年1月的销售数据,统计每个产品的总销售额,并导出为Excel文件。

4.2 实例代码

import records

# 连接MySQL数据库(替换为你的数据库信息)
db_config = {
    "user": "root",
    "password": "123456",
    "host": "localhost",
    "port": 3306,
    "dbname": "sales_db"
}
conn_str = f"mysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
db = records.Database(conn_str)

# 1. 查询2024年1月的销售数据
query_sql = """
SELECT product, SUM(amount) AS total_amount
FROM sales
WHERE sale_date BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY product
ORDER BY total_amount DESC
"""
results = db.query(query_sql)

# 2. 打印统计结果
print("2024年1月产品销售统计:")
for row in results:
    print(f"产品:{row.product}, 总销售额:{row.total_amount}元")

# 3. 导出为Excel文件
try:
    with open('202401_sales_report.xlsx', 'wb') as f:
        f.write(results.export('xlsx'))
    print("销售报表已导出为202401_sales_report.xlsx")
except Exception as e:
    print(f"导出失败:{e}")
    print("请执行 pip install openpyxl 安装依赖库")

# 4. 关闭连接
db.close()

代码说明

  1. 首先构建MySQL的连接字符串,替换为实际的用户名、密码、主机等信息。
  2. 执行SELECT语句并使用GROUP BY统计每个产品的总销售额,ORDER BY按销售额降序排列。
  3. 将统计结果导出为Excel文件,方便业务人员查看和分析。
  4. 最后关闭数据库连接,释放资源。

五、Records库常见问题与解决方案

5.1 连接MySQL时提示“找不到驱动”

问题现象:执行代码时出现No module named 'MySQLdb'错误。
解决方案:安装MySQL驱动mysqlclient,命令为pip install mysqlclient。若安装失败,可尝试安装pymysql并修改连接字符串:mysql+pymysql://用户名:密码@主机:端口/数据库名

5.2 导出Excel时提示“不支持的格式”

问题现象:执行results.export('xlsx')时出现ExportError: No module named 'openpyxl'错误。
解决方案:安装openpyxl库,命令为pip install openpyxl

5.3 事务回滚失效

问题现象:事务执行过程中发生异常,但数据仍被修改。
解决方案:确保所有SQL操作都在with db.transaction() as tx代码块内通过tx.query()执行,而非db.query()tx对象是事务内的连接对象,只有通过它执行的操作才会被纳入事务管理。

六、Records库相关资源

  • PyPI地址:https://pypi.org/project/records
  • Github地址:https://github.com/kennethreitz/records
  • 官方文档地址:https://records.readthedocs.io

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:pygsheets轻松操作Google Sheets

一、pygsheets库核心概述

pygsheets是一款专门用于Python程序与Google Sheets进行交互的第三方库,它的核心用途是实现对Google表格的创建、读取、修改、更新等操作,无需借助繁琐的手动操作或复杂的API调用流程。其工作原理是基于Google Sheets API v4进行封装,将复杂的接口请求转化为简洁的Python方法,开发者只需通过简单的代码调用即可完成与Google表格的交互。该库的优点在于语法简洁、功能全面,支持批量数据操作和单元格格式设置,同时兼容多种数据类型;缺点是需要配置Google Cloud平台的相关凭证,对新手而言存在一定的入门门槛。pygsheets采用MIT开源许可证,允许开发者自由用于商业和非商业项目。

二、pygsheets库安装与环境配置

2.1 库的安装

对于技术小白来说,pygsheets的安装流程非常简单,只需使用Python的包管理工具pip即可完成。打开命令行终端,输入以下命令:

pip install pygsheets

执行完毕后,pip会自动下载并安装pygsheets及其依赖的相关库,如google-api-python-client、oauth2client等。安装完成后,我们可以在Python环境中通过导入语句验证是否安装成功:

import pygsheets
print(pygsheets.__version__)

如果运行后能够输出pygsheets的版本号,说明安装已经成功。

2.2 Google Cloud凭证配置

由于pygsheets操作的是Google Sheets云端表格,因此必须先完成Google Cloud平台的凭证配置,获取对应的授权文件,具体步骤如下:

  1. 登录Google Cloud Console,创建一个新的项目,项目名称可以自定义,例如“pygsheets-demo”。
  2. 在项目中搜索并启用Google Sheets API,搜索框输入“Google Sheets API”,找到后点击“启用”按钮。
  3. 进入“API和服务”->“凭据”页面,点击“创建凭据”->“服务账号密钥”。
  4. 创建一个新的服务账号,填写服务账号名称,角色选择“Editor”(编辑权限),密钥类型选择“JSON”,点击创建后,浏览器会自动下载一个JSON格式的凭证文件,我们需要将这个文件保存到本地,例如命名为“credentials.json”。
  5. 打开下载的JSON凭证文件,找到其中的“client_email”字段对应的邮箱地址,将这个邮箱地址添加到目标Google Sheets表格的共享列表中,并授予编辑权限,这样pygsheets才能通过该凭证操作这个表格。

三、pygsheets核心功能与代码实例

3.1 连接Google Sheets并打开表格

使用pygsheets的第一步是通过凭证文件建立与Google Sheets的连接,然后打开指定的表格。这里我们需要用到pygsheets.authorize()方法,该方法会读取本地的凭证文件完成授权。

import pygsheets

# 授权连接,传入凭证文件路径
gc = pygsheets.authorize(service_file='credentials.json')

# 方式1:通过表格名称打开已存在的表格
sh = gc.open('My Google Sheet')  # 'My Google Sheet'是Google云端的表格名称

# 方式2:通过表格的ID打开表格(表格ID在表格URL中,格式为https://docs.google.com/spreadsheets/d/表格ID/edit)
# sh = gc.open_by_key('1Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')

# 方式3:打开最近使用的表格
# sh = gc.open_last()

代码说明authorize()方法会根据传入的凭证文件完成身份验证,返回一个授权后的客户端对象gc;通过gcopen()open_by_key()等方法可以打开云端的表格,返回表格对象sh,后续所有操作都基于这个对象展开。

3.2 创建新表格与工作表

pygsheets支持创建新的Google Sheets表格,也可以在已有表格中创建新的工作表(Sheet)。

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')

# 创建新的云端表格,参数为表格名称
new_sh = gc.create('New Pygsheets Sheet')
print(f'新表格创建成功,URL为:{new_sh.url}')

# 在新表格中创建新的工作表,参数为工作表名称、行数、列数
new_ws = new_sh.add_worksheet('New Worksheet', rows=100, cols=20)

# 创建工作表后,可以删除默认的第一个工作表(名为Sheet1)
default_ws = new_sh.worksheet_by_title('Sheet1')
new_sh.del_worksheet(default_ws)

代码说明gc.create()方法会在Google云端创建一个新的表格,返回新表格对象new_sh,通过new_sh.url可以获取表格的访问链接;add_worksheet()方法用于在表格中添加新的工作表,指定名称、行数和列数;del_worksheet()方法则用于删除指定的工作表,删除前需要通过worksheet_by_title()方法获取对应的工作表对象。

3.3 工作表的基础操作

工作表是我们存放和操作数据的主要载体,pygsheets提供了丰富的工作表操作方法,包括选择工作表、获取工作表属性、清空工作表等。

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')
sh = gc.open('My Google Sheet')

# 选择指定名称的工作表
ws = sh.worksheet_by_title('Sheet1')

# 选择索引为0的工作表(索引从0开始,对应第一个工作表)
# ws = sh[0]

# 获取工作表的行数和列数
rows = ws.rows
cols = ws.cols
print(f'当前工作表行数:{rows},列数:{cols}')

# 获取工作表的所有数据,返回二维列表格式
all_data = ws.get_all_values()
print(f'工作表所有数据:{all_data}')

# 清空工作表的所有数据
ws.clear()
print('工作表数据已清空')

代码说明worksheet_by_title()方法通过工作表名称选择目标工作表,也可以通过索引的方式直接选择;rowscols属性分别返回工作表的行数和列数;get_all_values()方法会读取工作表中的所有数据,以二维列表的形式返回,每一行对应列表中的一个子列表;clear()方法用于清空工作表的所有内容。

3.4 单元格数据读写操作

单元格是工作表的最小数据单元,pygsheets支持对单个单元格、多个单元格进行数据的读取和写入操作。

3.4.1 单个单元格操作

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')
sh = gc.open('My Google Sheet')
ws = sh[0]

# 方式1:通过行列索引获取单元格(索引从1开始)
cell = ws.cell('A1')
# 写入数据到单元格
cell.value = 'Hello pygsheets'
# 读取单元格数据
print(f'A1单元格数据:{cell.value}')

# 方式2:直接使用get_value和update_value方法
# 读取A2单元格数据
a2_value = ws.get_value('A2')
print(f'A2单元格数据:{a2_value}')
# 向A2单元格写入数据
ws.update_value('A2', 'Python Google Sheets')

代码说明cell()方法通过单元格地址(如A1)获取单元格对象,通过修改对象的value属性写入数据;get_value()update_value()方法可以直接读取和修改指定单元格的数据,无需获取单元格对象,操作更加简洁。

3.4.2 多个单元格批量操作

当需要处理大量数据时,批量操作可以显著提高效率,pygsheets支持对单元格区域进行批量读写。

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')
sh = gc.open('My Google Sheet')
ws = sh[0]

# 定义要写入的数据(二维列表,对应多行多列)
data = [
    ['姓名', '年龄', '城市'],
    ['张三', 25, '北京'],
    ['李四', 30, '上海'],
    ['王五', 28, '广州']
]

# 批量写入数据到A1:C4区域
ws.update_values('A1:C4', data)
print('批量数据写入完成')

# 批量读取A1:C4区域的数据
range_data = ws.get_values('A1:C4')
print(f'读取的区域数据:{range_data}')

# 批量修改单元格格式(将A1单元格设置为加粗,字体大小14)
ws.cell('A1').set_text_format('bold', True)
ws.cell('A1').set_text_format('fontSize', 14)

代码说明update_values()方法接收单元格区域和二维列表数据,将数据批量写入指定区域;get_values()方法读取指定区域的所有数据,返回二维列表;通过set_text_format()方法可以设置单元格的文本格式,如加粗、字体大小等。

3.5 数据筛选与排序

pygsheets支持对工作表中的数据进行筛选和排序,方便快速处理和分析数据。

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')
sh = gc.open('My Google Sheet')
ws = sh[0]

# 假设工作表中已有数据:A列姓名,B列年龄,C列城市
# 筛选年龄大于25的行数据
# 第一步:获取所有数据
all_data = ws.get_all_values(include_tailing_empty=False)
# 第二步:筛选数据(跳过表头)
filtered_data = [all_data[0]] + [row for row in all_data[1:] if int(row[1]) > 25]
print(f'年龄大于25的数据:{filtered_data}')

# 将筛选后的数据写入新的工作表
new_ws = sh.add_worksheet('Filtered Data', rows=len(filtered_data), cols=3)
new_ws.update_values('A1:C{}'.format(len(filtered_data)), filtered_data)

# 对数据按年龄降序排序
# 跳过表头,对数据行排序
sorted_data = [all_data[0]] + sorted(all_data[1:], key=lambda x: int(x[1]), reverse=True)
print(f'按年龄降序排序后的数据:{sorted_data}')

代码说明get_all_values()方法的include_tailing_empty参数设置为False,可以忽略末尾的空行;通过列表推导式可以实现简单的数据筛选;sorted()函数结合匿名函数可以对数据按指定列进行排序;最后将处理后的数据写入新的工作表,完成数据的二次整理。

四、pygsheets实际应用案例:数据统计与报表生成

4.1 案例场景

假设我们需要从一个Google Sheets表格中读取销售数据,统计每个产品的总销售额,然后将统计结果写入新的工作表,生成销售报表。

4.2 案例代码

import pygsheets

def generate_sales_report(credential_path, sheet_name):
    # 授权连接Google Sheets
    gc = pygsheets.authorize(service_file=credential_path)
    sh = gc.open(sheet_name)
    # 读取销售数据工作表
    sales_ws = sh.worksheet_by_title('销售数据')
    # 获取所有销售数据,跳过表头
    sales_data = sales_ws.get_all_values(include_tailing_empty=False)[1:]

    # 统计每个产品的总销售额
    sales_report = {}
    for row in sales_data:
        product_name = row[0]  # A列:产品名称
        quantity = int(row[1]) # B列:销售数量
        price = float(row[2])  # C列:单价
        total_sales = quantity * price

        if product_name in sales_report:
            sales_report[product_name] += total_sales
        else:
            sales_report[product_name] = total_sales

    # 准备报表数据
    report_data = [['产品名称', '总销售额(元)']]
    for product, total in sales_report.items():
        report_data.append([product, round(total, 2)])

    # 创建报表工作表
    if sh.worksheet_by_title('销售报表'):
        report_ws = sh.worksheet_by_title('销售报表')
        report_ws.clear()
    else:
        report_ws = sh.add_worksheet('销售报表', rows=len(report_data), cols=2)

    # 写入报表数据
    report_ws.update_values('A1:B{}'.format(len(report_data)), report_data)
    # 设置报表表头格式
    header_cell = report_ws.cell('A1')
    header_cell.set_text_format('bold', True)
    header_cell.set_text_format('fontSize', 12)
    header_cell = report_ws.cell('B1')
    header_cell.set_text_format('bold', True)
    header_cell.set_text_format('fontSize', 12)

    print('销售报表生成完成!')

# 调用函数生成报表
generate_sales_report('credentials.json', '产品销售统计')

代码说明:该案例定义了一个generate_sales_report函数,接收凭证文件路径和表格名称作为参数;函数首先读取“销售数据”工作表中的数据,然后通过字典统计每个产品的总销售额;接着创建或清空“销售报表”工作表,将统计结果写入其中,并设置表头格式;最后完成销售报表的生成。这个案例充分体现了pygsheets在数据处理和报表生成场景中的实用价值。

五、pygsheets相关资源地址

  • Pypi地址:https://pypi.org/project/pygsheets
  • Github地址:https://github.com/nithinmurali/pygsheets
  • 官方文档地址:https://pygsheets.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。