Python实用工具:cx-Oracle 零基础入门教程

一、cx-Oracle 库核心介绍

cx-Oracle 是一款专门用于 Python 程序连接 Oracle 数据库的扩展库,能够实现对 Oracle 数据库的查询、插入、更新、删除等各类操作。其工作原理是基于 Oracle Call Interface(OCI)构建,通过调用 Oracle 客户端的底层接口,建立 Python 与 Oracle 数据库之间的通信桥梁,实现数据的高效交互。

该库的优点十分突出,兼容性强,支持 Python 3.x 系列版本和多种 Oracle 数据库版本,数据传输效率高,能直接处理 Oracle 特有的数据类型;缺点则是安装时需要依赖 Oracle 客户端库,配置步骤相对繁琐,且仅专注于 Oracle 数据库,不支持其他类型数据库。cx-Oracle 采用 BSD 开源许可证,用户可自由下载、使用、修改和分发,无商业授权限制。

二、cx-Oracle 安装与环境配置

2.1 安装前准备

在安装 cx-Oracle 之前,必须先安装 Oracle 客户端库,这是连接 Oracle 数据库的前提条件。Oracle 提供了两种轻量级客户端包供选择:

  • Oracle Instant Client:适用于大多数场景,体积小,安装便捷,可从 Oracle 官网下载对应操作系统版本(Windows、Linux、macOS)。
  • Oracle Full Client:功能更全面,包含更多开发工具,适合专业数据库开发人员。

以 Windows 系统为例,安装 Oracle Instant Client 的步骤如下:

  1. 访问 Oracle 官网下载页面(https://www.oracle.com/database/technologies/instant-client/downloads.html),选择与自己系统位数(32 位/64 位)匹配的 Instant Client 包。
  2. 将下载的压缩包解压到指定目录,例如 D:\oracle\instantclient_21_9
  3. 配置系统环境变量:
    • 新增环境变量 ORACLE_HOME,值为解压路径 D:\oracle\instantclient_21_9
    • D:\oracle\instantclient_21_9 添加到系统 PATH 环境变量中。

2.2 安装 cx-Oracle 库

完成 Oracle 客户端配置后,即可通过 Python 的包管理工具 pip 安装 cx-Oracle,打开命令提示符(CMD)或终端,执行以下命令:

pip install cx-Oracle

安装完成后,可在 Python 交互环境中执行 import cx_Oracle 测试是否安装成功,若没有报错,则说明安装完成。

三、cx-Oracle 核心使用方法与代码实例

3.1 建立数据库连接

使用 cx-Oracle 连接 Oracle 数据库,需要提供用户名密码数据库连接字符串。连接字符串的格式通常为 主机名/IP地址:端口号/服务名,具体格式需根据数据库配置调整。

代码实例

import cx_Oracle

# 数据库连接信息
username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"  # 本地数据库示例,orcl为服务名

# 建立连接
try:
    connection = cx_Oracle.connect(username, password, dsn)
    print("数据库连接成功!")
except cx_Oracle.Error as error:
    print(f"数据库连接失败:{error}")
finally:
    # 关闭连接
    if 'connection' in locals() and connection:
        connection.close()
        print("连接已关闭")

代码说明

  • 首先导入 cx_Oracle 库,定义数据库的用户名、密码和连接字符串 dsn
  • 使用 cx_Oracle.connect() 方法建立连接,该方法返回一个连接对象。
  • 通过 try-except 捕获连接过程中可能出现的异常,例如用户名密码错误、网络不通等。
  • 最后在 finally 块中关闭连接,确保无论连接是否成功,都能释放资源。

3.2 执行基础 SQL 查询

建立数据库连接后,需要创建游标对象来执行 SQL 语句。游标对象是 cx-Oracle 执行 SQL 操作的核心载体,支持查询、插入、更新等操作。

代码实例:查询表数据

import cx_Oracle

# 数据库连接信息
username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

# 建立连接并创建游标
connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()  # 创建游标对象

    # 执行查询 SQL 语句
    sql = "SELECT empno, ename, job, sal FROM emp WHERE deptno = :deptno"
    deptno = 20  # 查询20号部门的员工信息
    cursor.execute(sql, deptno=deptno)  # 绑定参数,防止SQL注入

    # 获取查询结果的两种方式
    # 方式1:逐行获取
    print("20号部门员工信息(逐行获取):")
    for row in cursor:
        empno, ename, job, sal = row
        print(f"员工编号:{empno}, 姓名:{ename}, 职位:{job}, 薪资:{sal}")

    # 方式2:一次性获取所有结果
    cursor.execute(sql, deptno=deptno)
    rows = cursor.fetchall()  # 获取所有行数据
    print("\n20号部门员工信息(一次性获取):")
    for row in rows:
        print(f"员工编号:{row[0]}, 姓名:{row[1]}, 职位:{row[2]}, 薪资:{row[3]}")

except cx_Oracle.Error as error:
    print(f"执行查询失败:{error}")
finally:
    # 关闭游标和连接
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • 游标对象通过 connection.cursor() 创建,后续所有 SQL 操作都通过游标执行。
  • 执行查询语句时,使用 cursor.execute(sql, 参数) 方法,其中 :deptno 是参数占位符,通过传入 deptno=20 绑定参数,这种方式能有效防止 SQL 注入攻击。
  • 获取查询结果有两种常用方式:
  1. 直接遍历游标对象,逐行读取数据。
  2. 使用 cursor.fetchall() 方法一次性获取所有结果,返回一个包含所有行的列表,每行数据是一个元组。
  • 操作完成后,需依次关闭游标和连接,释放资源。

3.3 执行数据插入操作

cx-Oracle 支持向 Oracle 数据库插入单条或多条数据,插入操作同样通过游标对象执行,执行后需要调用 connection.commit() 提交事务,否则数据不会真正写入数据库。

代码实例:插入单条数据

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 插入数据的 SQL 语句
    insert_sql = """
        INSERT INTO emp (empno, ename, job, mgr, hiredate, sal, comm, deptno)
        VALUES (:empno, :ename, :job, :mgr, :hiredate, :sal, :comm, :deptno)
    """

    # 定义要插入的数据
    emp_data = {
        "empno": 7999,
        "ename": "LIU",
        "job": "CLERK",
        "mgr": 7788,
        "hiredate": cx_Oracle.Date(2024, 1, 10),  # Oracle日期类型
        "sal": 2500,
        "comm": None,
        "deptno": 20
    }

    # 执行插入操作
    cursor.execute(insert_sql, **emp_data)
    connection.commit()  # 提交事务
    print(f"成功插入 {cursor.rowcount} 条数据")

except cx_Oracle.Error as error:
    connection.rollback()  # 出错时回滚事务
    print(f"插入数据失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • 插入 SQL 语句中使用多个参数占位符(:empno:ename 等),通过字典 emp_data 传递参数,使用 ** 解包字典。
  • 对于 Oracle 的日期类型,需要使用 cx_Oracle.Date() 方法创建对应的日期对象,确保数据类型匹配。
  • 执行插入操作后,必须调用 connection.commit() 提交事务,否则数据不会持久化到数据库;若出现异常,需调用 connection.rollback() 回滚事务,避免数据不一致。
  • cursor.rowcount 属性可以获取受影响的行数,用于判断插入操作是否成功。

代码实例:批量插入数据
当需要插入大量数据时,使用 cursor.executemany() 方法可以显著提高效率,该方法支持批量执行 SQL 语句。

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 批量插入的 SQL 语句
    batch_insert_sql = """
        INSERT INTO emp (empno, ename, job, deptno)
        VALUES (:empno, :ename, :job, :deptno)
    """

    # 批量数据列表
    batch_data = [
        (8001, "ZHANG", "ANALYST", 20),
        (8002, "WANG", "SALESMAN", 30),
        (8003, "ZHAO", "MANAGER", 10)
    ]

    # 执行批量插入
    cursor.executemany(batch_insert_sql, batch_data)
    connection.commit()
    print(f"成功批量插入 {cursor.rowcount} 条数据")

except cx_Oracle.Error as error:
    connection.rollback()
    print(f"批量插入失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • cursor.executemany() 方法接收两个参数,第一个是 SQL 语句,第二个是包含多条数据的列表,列表中的每个元素是一个元组,对应 SQL 语句中的参数。
  • 批量插入相比多次执行单条插入,减少了网络交互和数据库事务的开销,效率更高,适合大数据量的插入场景。

3.4 执行数据更新与删除操作

数据的更新和删除操作与插入操作类似,都是通过游标执行 SQL 语句,然后提交事务。

代码实例:更新数据

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 更新 SQL 语句:更新7999号员工的薪资
    update_sql = "UPDATE emp SET sal = :new_sal WHERE empno = :empno"
    new_sal = 3000
    empno = 7999

    cursor.execute(update_sql, new_sal=new_sal, empno=empno)
    connection.commit()
    print(f"成功更新 {cursor.rowcount} 条数据")

except cx_Oracle.Error as error:
    connection.rollback()
    print(f"更新数据失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码实例:删除数据

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 删除 SQL 语句:删除8003号员工
    delete_sql = "DELETE FROM emp WHERE empno = :empno"
    empno = 8003

    cursor.execute(delete_sql, empno=empno)
    connection.commit()
    print(f"成功删除 {cursor.rowcount} 条数据")

except cx_Oracle.Error as error:
    connection.rollback()
    print(f"删除数据失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • 更新和删除操作的 SQL 语句同样使用参数占位符,避免 SQL 注入。
  • 执行后通过 cursor.rowcount 查看受影响的行数,若行数为 0,说明没有符合条件的数据。

3.5 处理 Oracle 特有的数据类型

Oracle 数据库包含一些特有的数据类型,例如 NUMBERDATETIMESTAMPCLOB 等,cx-Oracle 提供了对应的处理方式。

代码实例:处理 CLOB 大文本类型

import cx_Oracle

username = "scott"
password = "tiger"
dsn = "127.0.0.1:1521/orcl"

connection = None
try:
    connection = cx_Oracle.connect(username, password, dsn)
    cursor = connection.cursor()

    # 假设存在一个表 test_clob,包含 id 和 content 字段,content 为 CLOB 类型
    # 插入 CLOB 数据
    insert_clob_sql = "INSERT INTO test_clob (id, content) VALUES (:id, :content)"
    clob_content = cx_Oracle.CLOB(connection)  # 创建 CLOB 对象
    clob_content.write("这是一段很长的文本内容,用于测试 CLOB 数据类型的处理方式。" * 100)

    cursor.execute(insert_clob_sql, id=1, content=clob_content)
    connection.commit()
    print("CLOB 数据插入成功")

    # 查询 CLOB 数据
    select_clob_sql = "SELECT content FROM test_clob WHERE id = :id"
    cursor.execute(select_clob_sql, id=1)
    clob_data = cursor.fetchone()[0]
    print(f"CLOB 数据内容(前200字):{clob_data.read()[:200]}")

except cx_Oracle.Error as error:
    connection.rollback()
    print(f"处理 CLOB 数据失败:{error}")
finally:
    if 'cursor' in locals() and cursor:
        cursor.close()
    if 'connection' in locals() and connection:
        connection.close()

代码说明

  • 对于 CLOB 类型的数据,需要先通过 cx_Oracle.CLOB(connection) 创建 CLOB 对象,然后使用 write() 方法写入文本内容。
  • 查询 CLOB 数据时,获取到的是 CLOB 对象,通过 read() 方法可以读取其中的文本内容。

四、cx-Oracle 实际应用案例:员工薪资管理系统

下面结合一个简单的员工薪资管理系统案例,综合展示 cx-Oracle 的使用方法。该案例实现以下功能:

  1. 连接 Oracle 数据库。
  2. 查询指定部门的员工薪资信息。
  3. 给指定员工涨薪。
  4. 新增员工记录。

完整代码实例

import cx_Oracle
from datetime import datetime

class EmpSalaryManager:
    def __init__(self, username, password, dsn):
        """初始化数据库连接信息"""
        self.username = username
        self.password = password
        self.dsn = dsn
        self.connection = None
        self.cursor = None

    def connect_db(self):
        """建立数据库连接"""
        try:
            self.connection = cx_Oracle.connect(self.username, self.password, self.dsn)
            self.cursor = self.connection.cursor()
            print("数据库连接成功!")
        except cx_Oracle.Error as error:
            print(f"连接失败:{error}")
            raise

    def disconnect_db(self):
        """关闭数据库连接"""
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()
        print("数据库连接已关闭")

    def query_dept_salary(self, deptno):
        """查询指定部门的员工薪资信息"""
        try:
            sql = "SELECT empno, ename, sal, hiredate FROM emp WHERE deptno = :deptno"
            self.cursor.execute(sql, deptno=deptno)
            results = self.cursor.fetchall()
            if not results:
                print(f"未查询到{deptno}号部门的员工信息")
                return
            print(f"\n{deptno}号部门员工薪资信息:")
            print("-" * 50)
            print(f"{'员工编号':<10}{'姓名':<10}{'薪资':<10}{'入职日期':<15}")
            print("-" * 50)
            for empno, ename, sal, hiredate in results:
                # 格式化日期显示
                hiredate_str = hiredate.strftime("%Y-%m-%d") if hiredate else "未知"
                print(f"{empno:<10}{ename:<10}{sal:<10}{hiredate_str:<15}")
        except cx_Oracle.Error as error:
            print(f"查询失败:{error}")

    def update_salary(self, empno, add_sal):
        """给指定员工涨薪"""
        try:
            # 先查询原薪资
            check_sql = "SELECT sal FROM emp WHERE empno = :empno"
            self.cursor.execute(check_sql, empno=empno)
            result = self.cursor.fetchone()
            if not result:
                print(f"未找到编号为{empno}的员工")
                return
            old_sal = result[0]
            new_sal = old_sal + add_sal

            # 更新薪资
            update_sql = "UPDATE emp SET sal = :new_sal WHERE empno = :empno"
            self.cursor.execute(update_sql, new_sal=new_sal, empno=empno)
            self.connection.commit()
            print(f"\n员工{empno}涨薪成功!原薪资:{old_sal}, 新薪资:{new_sal}")
        except cx_Oracle.Error as error:
            self.connection.rollback()
            print(f"涨薪失败:{error}")

    def add_employee(self, emp_data):
        """新增员工记录"""
        try:
            insert_sql = """
                INSERT INTO emp (empno, ename, job, mgr, hiredate, sal, comm, deptno)
                VALUES (:empno, :ename, :job, :mgr, :hiredate, :sal, :comm, :deptno)
            """
            self.cursor.execute(insert_sql, **emp_data)
            self.connection.commit()
            print(f"\n新增员工{emp_data['ename']}成功!员工编号:{emp_data['empno']}")
        except cx_Oracle.Error as error:
            self.connection.rollback()
            print(f"新增员工失败:{error}")

# 主程序入口
if __name__ == "__main__":
    # 数据库配置
    username = "scott"
    password = "tiger"
    dsn = "127.0.0.1:1521/orcl"

    # 创建薪资管理对象
    manager = EmpSalaryManager(username, password, dsn)

    try:
        # 连接数据库
        manager.connect_db()

        # 1. 查询20号部门的薪资信息
        manager.query_dept_salary(20)

        # 2. 给7999号员工涨薪500
        manager.update_salary(7999, 500)

        # 3. 新增员工
        new_emp = {
            "empno": 8004,
            "ename": "CHEN",
            "job": "ENGINEER",
            "mgr": 7782,
            "hiredate": cx_Oracle.Date.today(),
            "sal": 4000,
            "comm": 0,
            "deptno": 20
        }
        manager.add_employee(new_emp)

        # 再次查询20号部门信息,查看新增和更新结果
        manager.query_dept_salary(20)

    except Exception as e:
        print(f"程序执行异常:{e}")
    finally:
        # 关闭连接
        manager.disconnect_db()

代码说明

  • 该案例通过面向对象的方式封装了员工薪资管理的功能,EmpSalaryManager 类包含连接数据库、查询薪资、更新薪资、新增员工等方法,代码结构清晰,便于维护和扩展。
  • update_salary 方法中,先查询员工原薪资,再计算新薪资并更新,确保操作的准确性;在 add_employee 方法中,使用 cx_Oracle.Date.today() 获取当前日期作为入职日期。
  • 主程序中调用类的方法,依次执行查询、涨薪、新增员工操作,并再次查询验证结果,展示了 cx-Oracle 在实际项目中的综合应用。

五、相关资源地址

  • Pypi地址:https://pypi.org/project/cx-Oracle
  • Github地址:https://github.com/oracle/python-cx_Oracle
  • 官方文档地址:https://cx-oracle.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:influxdb库入门与实战教程

一、influxdb库核心介绍

1.1 用途

influxdb是Python生态中专门用于对接InfluxDB时序数据库的客户端库,能够实现数据的写入、查询、修改和删除等操作,广泛应用于物联网监控、系统性能指标采集、金融行情数据存储等需要处理时序数据的场景。

1.2 工作原理

该库通过HTTP/HTTPS协议与InfluxDB服务端建立连接,将Python中的数据结构(如字典、列表)转换为InfluxDB支持的Line Protocol格式进行写入;查询时则发送InfluxQL或Flux查询语句,再将返回的结果集解析为Python可处理的对象(如DataFrame)。

1.3 优缺点

优点:轻量易用,API设计贴合Python开发者习惯;支持与Pandas无缝集成,便于数据处理;兼容InfluxDB多个版本。
缺点:对最新版InfluxDB的部分高级特性支持滞后;高并发写入场景下性能需依赖额外优化。

1.4 License类型

influxdb库采用MIT License开源协议,允许自由使用、修改和分发,无论是个人项目还是商业应用都无需支付授权费用。

二、influxdb库安装与环境准备

2.1 安装方式

安装influxdb库的方式非常简单,直接使用Python的包管理工具pip即可完成安装。在命令行中输入以下命令:

pip install influxdb

如果需要安装指定版本以适配特定的InfluxDB服务端,可指定版本号,例如安装5.3.1版本:

pip install influxdb==5.3.1

安装完成后,我们可以在Python环境中验证是否安装成功,执行以下代码:

import influxdb
print(influxdb.__version__)

若控制台输出对应的版本号,说明安装成功。

2.2 环境依赖说明

使用influxdb库前,需要确保本地或远程有可用的InfluxDB服务端。推荐使用InfluxDB 1.x系列版本(2.x版本有单独的客户端库influxdb-client),同时Python环境需满足3.6及以上版本,避免因版本过低导致兼容性问题。

三、influxdb库核心API使用实战

3.1 建立与InfluxDB的连接

在进行任何数据操作前,首先需要建立与InfluxDB数据库的连接。influxdb库提供InfluxDBClient类来实现这一功能,核心参数包括host(服务端地址)、port(端口号)、username(用户名)、password(密码)、database(目标数据库名)等。
示例代码

from influxdb import InfluxDBClient

# 初始化客户端连接
client = InfluxDBClient(
    host='localhost',  # InfluxDB服务端IP
    port=8086,         # 默认端口号
    username='admin',  # 数据库用户名
    password='admin123',  # 数据库密码
    database='test_db' # 要连接的数据库名
)

# 测试连接是否成功
print("连接状态:", "成功" if client.ping() else "失败")

代码说明

  1. 导入InfluxDBClient类后,传入服务端的连接信息初始化客户端对象。
  2. 调用ping()方法测试与服务端的连通性,该方法返回True表示连接成功。
  3. 若连接失败,通常需要检查服务端是否启动、地址和端口是否正确、用户名密码是否匹配。

3.2 创建与删除数据库

连接成功后,我们可以通过客户端对象创建新的数据库,或者删除已有的数据库。

3.2.1 创建数据库

示例代码

# 定义要创建的数据库名
db_name = "iot_monitor"

# 检查数据库是否存在
existing_dbs = client.get_list_database()
db_exists = any(db['name'] == db_name for db in existing_dbs)

if not db_exists:
    client.create_database(db_name)
    print(f"数据库 {db_name} 创建成功")
else:
    print(f"数据库 {db_name} 已存在")

# 切换到新创建的数据库
client.switch_database(db_name)

代码说明

  1. get_list_database()方法会返回所有已存在的数据库列表,格式为[{"name": "db_name1"}, ...]
  2. 通过遍历列表判断目标数据库是否存在,避免重复创建。
  3. create_database()方法用于创建新数据库,switch_database()方法用于切换到目标数据库进行后续操作。

3.2.2 删除数据库

示例代码

# 定义要删除的数据库名
db_to_delete = "test_db"

# 检查数据库是否存在
existing_dbs = client.get_list_database()
db_exists = any(db['name'] == db_to_delete for db in existing_dbs)

if db_exists:
    client.drop_database(db_to_delete)
    print(f"数据库 {db_to_delete} 删除成功")
else:
    print(f"数据库 {db_to_delete} 不存在")

代码说明drop_database()方法接收数据库名作为参数,执行后会删除对应的数据库及其所有数据,操作前需谨慎确认。

3.3 写入时序数据到InfluxDB

时序数据的写入是influxdb库的核心功能之一,数据需要按照Line Protocol的格式组织,该格式的核心结构为:measurement,tag_set field_set timestamp。在Python中,我们可以通过字典列表的形式定义数据,再调用write_points()方法写入。

3.3.1 写入单条数据

示例代码

# 定义要写入的数据
data_point = [
    {
        "measurement": "temperature",  # 测量值名称,类似表名
        "tags": {
            "device_id": "sensor_001",  # 标签,用于分组查询
            "location": "room_101"
        },
        "fields": {
            "value": 25.6,  # 字段,存储具体数值(必须是数值类型)
            "unit": "°C"    # 字段可以是字符串、整数、浮点数
        },
        "time": "2024-01-01T12:00:00Z"  # 时间戳,可选,默认使用当前时间
    }
]

# 写入数据
write_result = client.write_points(data_point)
print(f"数据写入状态: {'成功' if write_result else '失败'}")

代码说明

  1. measurement对应时序数据的“表”,用于分类存储不同类型的数据。
  2. tags是标签字段,为字符串类型,支持索引,适合用于分组查询(如按设备ID、位置分组)。
  3. fields是数值字段,支持整数、浮点数、布尔值和字符串,是时序数据的核心指标。
  4. time是时间戳,格式为ISO 8601,可选参数,若不指定则使用服务端的当前时间。
  5. write_points()方法返回布尔值,表示写入操作是否成功。

3.3.2 写入多条数据

在实际场景中,我们通常需要批量写入多条数据,以提高写入效率。只需扩展字典列表的元素即可实现批量写入。
示例代码

# 定义多条时序数据
batch_data = [
    {
        "measurement": "temperature",
        "tags": {"device_id": "sensor_001", "location": "room_101"},
        "fields": {"value": 25.8, "unit": "°C"},
        "time": "2024-01-01T12:01:00Z"
    },
    {
        "measurement": "temperature",
        "tags": {"device_id": "sensor_001", "location": "room_101"},
        "fields": {"value": 26.0, "unit": "°C"},
        "time": "2024-01-01T12:02:00Z"
    },
    {
        "measurement": "humidity",
        "tags": {"device_id": "sensor_001", "location": "room_101"},
        "fields": {"value": 45.2, "unit": "%"},
        "time": "2024-01-01T12:00:00Z"
    }
]

# 批量写入数据
write_result = client.write_points(batch_data)
print(f"批量数据写入状态: {'成功' if write_result else '失败'}")

代码说明:批量写入多条数据时,不同数据可以属于同一个measurement,也可以属于不同的measurementwrite_points()方法会自动处理这些数据的分类存储。

3.4 查询InfluxDB中的时序数据

数据写入后,我们可以通过query()方法执行InfluxQL查询语句,获取存储的时序数据。查询结果可以直接转换为Pandas DataFrame,便于后续的数据分析和可视化。

3.4.1 基础查询

示例代码

# 执行InfluxQL查询语句
query_str = 'SELECT * FROM temperature WHERE device_id = \'sensor_001\' LIMIT 10'
result = client.query(query_str)

# 将查询结果转换为列表
result_list = list(result.get_points())
print("查询到的温度数据:")
for point in result_list:
    print(f"时间: {point['time']}, 设备ID: {point['device_id']}, 温度: {point['value']} {point['unit']}")

代码说明

  1. InfluxQL的语法与SQL类似,SELECT * FROM measurement表示查询该measurement下的所有数据,WHERE子句用于过滤标签或字段,LIMIT用于限制返回数据的条数。
  2. query()方法返回的是ResultSet对象,通过get_points()方法可以获取数据的迭代器,再转换为列表方便遍历。
  3. 每条数据以字典形式存储,包含timetags字段和fields字段的所有内容。

3.4.2 与Pandas集成查询

influxdb库支持直接将查询结果转换为Pandas DataFrame,这对于数据分析师来说是非常实用的功能。
示例代码

import pandas as pd

# 执行查询并转换为DataFrame
query_str = 'SELECT value, unit FROM temperature WHERE location = \'room_101\''
result = client.query(query_str)
df = pd.DataFrame(result.get_points())

# 打印DataFrame的前5行数据
print("温度数据DataFrame:")
print(df.head())

# 计算温度的平均值
avg_temp = df['value'].mean()
print(f"\nroom_101的平均温度: {avg_temp:.2f} °C")

代码说明

  1. 导入pandas库后,将ResultSet对象转换为DataFrame,数据的结构会更加清晰,便于进行统计分析。
  2. 可以直接使用Pandas的内置方法(如mean())计算数据的统计指标,快速完成数据分析任务。

3.5 删除InfluxDB中的数据

当需要清理过期或无用的数据时,可以使用delete_series()方法删除指定条件的数据。
示例代码

# 删除指定时间之前的温度数据
delete_condition = {
    "measurement": "temperature",
    "tags": {"device_id": "sensor_001"},
    "time__lt": "2024-01-01T12:01:00Z"  # 删除时间小于该时间戳的数据
}

client.delete_series(**delete_condition)
print("符合条件的数据已删除")

代码说明

  1. delete_series()方法支持通过measurementtags和时间条件过滤要删除的数据。
  2. 时间条件的参数格式为time__lt(小于)、time__lte(小于等于)、time__gt(大于)、time__gte(大于等于),后缀对应不同的比较逻辑。
  3. 数据删除操作不可逆,执行前务必确认条件是否正确。

四、实际应用案例:物联网传感器数据监控系统

4.1 案例场景介绍

本案例模拟一个物联网传感器数据监控系统,实现以下功能:

  1. 模拟传感器采集温度和湿度数据。
  2. 将采集到的数据实时写入InfluxDB。
  3. 定时查询并分析传感器数据,当温度超过阈值时输出告警信息。

4.2 完整代码实现

from influxdb import InfluxDBClient
import pandas as pd
import time
import random
from datetime import datetime, timezone

# 配置InfluxDB连接信息
INFLUXDB_HOST = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_USER = 'admin'
INFLUXDB_PWD = 'admin123'
INFLUXDB_DB = 'iot_monitor'

# 传感器配置
DEVICE_ID = 'sensor_002'
LOCATION = 'factory_workshop'
TEMPERATURE_THRESHOLD = 30.0  # 温度告警阈值
COLLECTION_INTERVAL = 5  # 数据采集间隔(秒)
QUERY_INTERVAL = 10  # 数据查询间隔(秒)

def init_influxdb_client():
    """初始化InfluxDB客户端并创建数据库"""
    client = InfluxDBClient(
        host=INFLUXDB_HOST,
        port=INFLUXDB_PORT,
        username=INFLUXDB_USER,
        password=INFLUXDB_PWD
    )
    # 创建数据库
    db_list = client.get_list_database()
    if not any(db['name'] == INFLUXDB_DB for db in db_list):
        client.create_database(INFLUXDB_DB)
    client.switch_database(INFLUXDB_DB)
    return client

def simulate_sensor_data():
    """模拟传感器采集温度和湿度数据"""
    # 生成模拟数据,温度在25-32之间波动,湿度在40-60之间波动
    temperature = round(random.uniform(25.0, 32.0), 2)
    humidity = round(random.uniform(40.0, 60.0), 2)
    # 获取当前UTC时间戳
    current_time = datetime.now(timezone.utc).isoformat()
    # 组织时序数据
    data = [
        {
            "measurement": "temperature",
            "tags": {"device_id": DEVICE_ID, "location": LOCATION},
            "fields": {"value": temperature, "unit": "°C"},
            "time": current_time
        },
        {
            "measurement": "humidity",
            "tags": {"device_id": DEVICE_ID, "location": LOCATION},
            "fields": {"value": humidity, "unit": "%"},
            "time": current_time
        }
    ]
    return data

def write_sensor_data(client, data):
    """将传感器数据写入InfluxDB"""
    try:
        client.write_points(data)
        print(f"[{datetime.now()}] 数据写入成功: 温度={data[0]['fields']['value']}°C, 湿度={data[1]['fields']['value']}%")
    except Exception as e:
        print(f"[{datetime.now()}] 数据写入失败: {str(e)}")

def query_and_alert(client):
    """查询数据并判断是否触发告警"""
    try:
        # 查询最近10条温度数据
        query_str = f"""
            SELECT value FROM temperature 
            WHERE device_id = '{DEVICE_ID}' 
            AND location = '{LOCATION}' 
            ORDER BY time DESC 
            LIMIT 10
        """
        result = client.query(query_str)
        df = pd.DataFrame(result.get_points())
        if not df.empty:
            latest_temp = df['value'].iloc[0]
            avg_temp = df['value'].mean()
            print(f"[{datetime.now()}] 最新温度: {latest_temp:.2f}°C, 近10条平均温度: {avg_temp:.2f}°C")
            # 温度超过阈值时触发告警
            if latest_temp > TEMPERATURE_THRESHOLD:
                print(f"⚠️  告警: 当前温度 {latest_temp:.2f}°C 超过阈值 {TEMPERATURE_THRESHOLD}°C!")
    except Exception as e:
        print(f"[{datetime.now()}] 数据查询失败: {str(e)}")

if __name__ == "__main__":
    # 初始化客户端
    influx_client = init_influxdb_client()
    print("物联网传感器数据监控系统启动...")
    try:
        query_counter = 0
        while True:
            # 采集并写入数据
            sensor_data = simulate_sensor_data()
            write_sensor_data(influx_client, sensor_data)
            # 每隔QUERY_INTERVAL秒查询一次数据并告警
            query_counter += 1
            if query_counter >= QUERY_INTERVAL / COLLECTION_INTERVAL:
                query_and_alert(influx_client)
                query_counter = 0
            # 等待采集间隔
            time.sleep(COLLECTION_INTERVAL)
    except KeyboardInterrupt:
        print("\n系统已停止运行")
    finally:
        # 关闭客户端连接
        influx_client.close()

4.3 代码运行说明

  1. 代码功能分解
    • init_influxdb_client()函数:初始化客户端并创建目标数据库,确保后续操作有可用的存储位置。
    • simulate_sensor_data()函数:模拟传感器生成温度和湿度数据,时间戳使用当前UTC时间,保证时序数据的准确性。
    • write_sensor_data()函数:将模拟数据写入InfluxDB,并输出写入状态。
    • query_and_alert()函数:查询最近的温度数据,计算平均值并判断是否超过阈值,超过时输出告警信息。
    • 主程序循环:按照设定的间隔采集数据、写入数据,并定时执行查询和告警逻辑。
  2. 运行效果
    运行代码后,控制台会实时输出数据写入状态,每隔10秒查询一次数据并输出温度信息,当温度超过30.0°C时,会触发告警提示。按下Ctrl+C可停止程序运行。

五、相关资源

  • Pypi地址:https://pypi.org/project/influxdb
  • Github地址:https://github.com/influxdata/influxdb-python
  • 官方文档地址:https://influxdb-python.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具Records:极简数据库操作指南

一、Records库核心概述

Records是一款轻量级Python库,专为简化SQL数据库操作而生,它基于sqlalchemytablib构建,无需繁琐的配置与类定义,一行代码即可实现数据库连接、查询与结果导出。其工作原理是封装SQLAlchemy的引擎,自动管理连接池,同时借助tablib实现查询结果的多格式导出(如CSV、JSON、Excel)。

优点:语法极简,降低数据库操作门槛;支持多种数据库(MySQL、PostgreSQL、SQLite等);内置结果集格式化功能。缺点:高级数据库操作需依赖SQLAlchemy底层接口;更新维护频率较低。该库采用MIT License,可自由用于商业与非商业项目。

二、Records库安装步骤

对于技术小白来说,Records的安装流程非常简单,只需要借助Python的包管理工具pip即可完成,无需配置复杂的环境变量。

2.1 基础安装命令

打开命令提示符(Windows)或终端(Mac/Linux),输入以下命令:

pip install records

该命令会自动下载并安装Records及其依赖库(sqlalchemytablib等)。

2.2 数据库驱动安装

Records支持多种数据库,但不同数据库需要安装对应的驱动,否则会出现连接失败的情况。以下是常用数据库的驱动安装命令:

  • SQLite:无需额外安装驱动,Python内置支持。
  • MySQL/MariaDB
pip install mysqlclient
  • PostgreSQL
pip install psycopg2-binary

三、Records库核心使用方法

Records的核心设计理念是“简洁高效”,通过Database类实现数据库的连接与操作,无需手动管理连接的开启与关闭。下面我们以最常用的SQLite数据库为例(无需配置服务,文件即可存储数据),详细讲解每一个功能的使用方法,并搭配实例代码辅助理解。

3.1 数据库连接

使用Records连接数据库的核心是传入数据库连接字符串,不同数据库的连接字符串格式不同,具体如下:

| 数据库类型 | 连接字符串格式 |
||-|
| SQLite | sqlite:///test.db(相对路径)/ sqlite:////绝对路径/test.db |
| MySQL | mysql://用户名:密码@主机:端口/数据库名 |
| PostgreSQL | postgresql://用户名:密码@主机:端口/数据库名 |

实例代码:连接SQLite数据库

import records

# 连接SQLite数据库,若test.db不存在则自动创建
db = records.Database('sqlite:///test.db')

# 打印数据库连接状态(可选)
print(f"数据库连接成功:{db}")

代码说明

  1. 导入records库后,通过records.Database()方法创建数据库连接对象db
  2. SQLite数据库以文件形式存储,sqlite:///test.db表示在当前目录下创建或使用test.db文件。
  3. 连接成功后,db对象可用于后续的查询、插入、更新等操作。

3.2 执行SQL查询

Records支持执行任意SQL语句,包括SELECTINSERTUPDATEDELETE等,核心方法是db.query()

3.2.1 查询数据(SELECT语句)

实例代码:创建表并查询数据

import records

# 连接数据库
db = records.Database('sqlite:///test.db')

# 1. 执行创建表的SQL语句
create_sql = """
CREATE TABLE IF NOT EXISTS students (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    age INTEGER,
    gender TEXT
)
"""
db.query(create_sql)
print("students表创建成功")

# 2. 插入测试数据
insert_sql = """
INSERT INTO students (name, age, gender) VALUES 
('张三', 18, '男'),
('李四', 19, '女'),
('王五', 20, '男')
"""
db.query(insert_sql)
print("测试数据插入成功")

# 3. 查询表中所有数据
results = db.query("SELECT * FROM students")

# 打印查询结果(默认以OrderedDict格式返回)
for row in results:
    print(f"ID: {row.id}, 姓名: {row.name}, 年龄: {row.age}, 性别: {row.gender}")

代码说明

  1. 首先执行CREATE TABLE语句创建students表,IF NOT EXISTS确保表不存在时才创建,避免重复创建报错。
  2. 执行INSERT语句插入3条测试数据,db.query()方法直接执行SQL语句。
  3. 执行SELECT语句查询数据,返回的results是一个结果集对象,可通过循环遍历每一行数据,每行数据以OrderedDict格式存储,支持通过键名(如row.id)或索引访问。

3.2.2 参数化查询

在实际开发中,直接拼接SQL语句容易引发SQL注入攻击,Records支持参数化查询,通过占位符传递参数,提高安全性。

实例代码:参数化查询指定条件的数据

import records

db = records.Database('sqlite:///test.db')

# 使用参数化查询,占位符为:参数名
gender = '男'
age_limit = 18
results = db.query("SELECT * FROM students WHERE gender = :g AND age >= :a", g=gender, a=age_limit)

# 打印查询结果
print(f"性别为{gender}且年龄大于等于{age_limit}的学生:")
for row in results:
    print(f"ID: {row.id}, 姓名: {row.name}")

代码说明

  1. SQL语句中使用:g:a作为占位符,分别对应后续传入的g=gendera=age_limit参数。
  2. 参数化查询会自动处理参数的转义,避免SQL注入风险,这是开发中的最佳实践。

3.3 结果集格式化导出

Records的一大特色是支持将查询结果导出为多种格式,如CSV、JSON、Excel、YAML等,这一功能依赖于tablib库,无需手动编写导出代码。

实例代码:将查询结果导出为CSV、JSON和Excel文件

import records

db = records.Database('sqlite:///test.db')

# 查询所有学生数据
results = db.query("SELECT * FROM students")

# 1. 导出为CSV文件
with open('students.csv', 'w', encoding='utf-8') as f:
    f.write(results.export('csv'))
print("CSV文件导出成功")

# 2. 导出为JSON文件
with open('students.json', 'w', encoding='utf-8') as f:
    f.write(results.export('json'))
print("JSON文件导出成功")

# 3. 导出为Excel文件(需要确保tablib支持,若报错需安装openpyxl)
try:
    with open('students.xlsx', 'wb') as f:
        f.write(results.export('xlsx'))
    print("Excel文件导出成功")
except Exception as e:
    print(f"Excel导出失败,请安装openpyxl:{e}")
    # 安装命令:pip install openpyxl

代码说明

  1. results.export()方法接收一个格式参数,支持的格式包括csvjsonxlsxyaml等。
  2. 导出CSV和JSON时,直接以文本形式写入文件;导出Excel时,需要以二进制模式(wb)写入,且需安装openpyxl库。
  3. 导出的文件可直接用Excel、文本编辑器等打开,方便数据分享与分析。

3.4 执行事务操作

在数据库操作中,事务用于确保一系列操作的原子性(要么全部成功,要么全部失败)。Records支持通过db.transaction()方法开启事务。

实例代码:事务操作示例

import records

db = records.Database('sqlite:///test.db')

# 开启事务
with db.transaction() as tx:
    try:
        # 执行多条SQL语句
        tx.query("INSERT INTO students (name, age, gender) VALUES ('赵六', 21, '男')")
        tx.query("UPDATE students SET age = 22 WHERE name = '王五'")
        # 事务会自动提交
        print("事务执行成功,数据已提交")
    except Exception as e:
        # 发生异常时事务自动回滚
        print(f"事务执行失败,数据已回滚:{e}")

代码说明

  1. 使用with db.transaction() as tx上下文管理器开启事务,在with代码块内执行的所有SQL语句都属于同一个事务。
  2. 如果代码块内没有发生异常,事务会自动提交;如果发生异常(如SQL语法错误、主键冲突等),事务会自动回滚,确保数据一致性。
  3. 事务操作适用于需要批量执行多个SQL语句的场景,如转账、订单创建等。

3.5 关闭数据库连接

虽然Records会自动管理数据库连接,但在程序结束时手动关闭连接是良好的编程习惯,可释放资源。

实例代码:关闭数据库连接

import records

db = records.Database('sqlite:///test.db')

# 执行数据库操作...
results = db.query("SELECT * FROM students")
print(results.all())

# 关闭数据库连接
db.close()
print("数据库连接已关闭")

代码说明:调用db.close()方法即可关闭数据库连接,关闭后db对象无法再执行任何操作。

四、Records库高级应用实例

下面我们结合一个实际的数据分析场景,展示Records库的综合使用方法:从MySQL数据库中查询销售数据,进行简单的统计分析,并将结果导出为Excel文件。

4.1 场景需求

假设我们有一个MySQL数据库sales_db,其中包含sales表,表结构如下:

| 字段名 | 类型 | 说明 |
|–|||
| id | INT | 订单ID(主键) |
| product | VARCHAR | 产品名称 |
| amount | DECIMAL | 销售金额 |
| sale_date | DATE | 销售日期 |

需求:查询2024年1月的销售数据,统计每个产品的总销售额,并导出为Excel文件。

4.2 实例代码

import records

# 连接MySQL数据库(替换为你的数据库信息)
db_config = {
    "user": "root",
    "password": "123456",
    "host": "localhost",
    "port": 3306,
    "dbname": "sales_db"
}
conn_str = f"mysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
db = records.Database(conn_str)

# 1. 查询2024年1月的销售数据
query_sql = """
SELECT product, SUM(amount) AS total_amount
FROM sales
WHERE sale_date BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY product
ORDER BY total_amount DESC
"""
results = db.query(query_sql)

# 2. 打印统计结果
print("2024年1月产品销售统计:")
for row in results:
    print(f"产品:{row.product}, 总销售额:{row.total_amount}元")

# 3. 导出为Excel文件
try:
    with open('202401_sales_report.xlsx', 'wb') as f:
        f.write(results.export('xlsx'))
    print("销售报表已导出为202401_sales_report.xlsx")
except Exception as e:
    print(f"导出失败:{e}")
    print("请执行 pip install openpyxl 安装依赖库")

# 4. 关闭连接
db.close()

代码说明

  1. 首先构建MySQL的连接字符串,替换为实际的用户名、密码、主机等信息。
  2. 执行SELECT语句并使用GROUP BY统计每个产品的总销售额,ORDER BY按销售额降序排列。
  3. 将统计结果导出为Excel文件,方便业务人员查看和分析。
  4. 最后关闭数据库连接,释放资源。

五、Records库常见问题与解决方案

5.1 连接MySQL时提示“找不到驱动”

问题现象:执行代码时出现No module named 'MySQLdb'错误。
解决方案:安装MySQL驱动mysqlclient,命令为pip install mysqlclient。若安装失败,可尝试安装pymysql并修改连接字符串:mysql+pymysql://用户名:密码@主机:端口/数据库名

5.2 导出Excel时提示“不支持的格式”

问题现象:执行results.export('xlsx')时出现ExportError: No module named 'openpyxl'错误。
解决方案:安装openpyxl库,命令为pip install openpyxl

5.3 事务回滚失效

问题现象:事务执行过程中发生异常,但数据仍被修改。
解决方案:确保所有SQL操作都在with db.transaction() as tx代码块内通过tx.query()执行,而非db.query()tx对象是事务内的连接对象,只有通过它执行的操作才会被纳入事务管理。

六、Records库相关资源

  • PyPI地址:https://pypi.org/project/records
  • Github地址:https://github.com/kennethreitz/records
  • 官方文档地址:https://records.readthedocs.io

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:pygsheets轻松操作Google Sheets

一、pygsheets库核心概述

pygsheets是一款专门用于Python程序与Google Sheets进行交互的第三方库,它的核心用途是实现对Google表格的创建、读取、修改、更新等操作,无需借助繁琐的手动操作或复杂的API调用流程。其工作原理是基于Google Sheets API v4进行封装,将复杂的接口请求转化为简洁的Python方法,开发者只需通过简单的代码调用即可完成与Google表格的交互。该库的优点在于语法简洁、功能全面,支持批量数据操作和单元格格式设置,同时兼容多种数据类型;缺点是需要配置Google Cloud平台的相关凭证,对新手而言存在一定的入门门槛。pygsheets采用MIT开源许可证,允许开发者自由用于商业和非商业项目。

二、pygsheets库安装与环境配置

2.1 库的安装

对于技术小白来说,pygsheets的安装流程非常简单,只需使用Python的包管理工具pip即可完成。打开命令行终端,输入以下命令:

pip install pygsheets

执行完毕后,pip会自动下载并安装pygsheets及其依赖的相关库,如google-api-python-client、oauth2client等。安装完成后,我们可以在Python环境中通过导入语句验证是否安装成功:

import pygsheets
print(pygsheets.__version__)

如果运行后能够输出pygsheets的版本号,说明安装已经成功。

2.2 Google Cloud凭证配置

由于pygsheets操作的是Google Sheets云端表格,因此必须先完成Google Cloud平台的凭证配置,获取对应的授权文件,具体步骤如下:

  1. 登录Google Cloud Console,创建一个新的项目,项目名称可以自定义,例如“pygsheets-demo”。
  2. 在项目中搜索并启用Google Sheets API,搜索框输入“Google Sheets API”,找到后点击“启用”按钮。
  3. 进入“API和服务”->“凭据”页面,点击“创建凭据”->“服务账号密钥”。
  4. 创建一个新的服务账号,填写服务账号名称,角色选择“Editor”(编辑权限),密钥类型选择“JSON”,点击创建后,浏览器会自动下载一个JSON格式的凭证文件,我们需要将这个文件保存到本地,例如命名为“credentials.json”。
  5. 打开下载的JSON凭证文件,找到其中的“client_email”字段对应的邮箱地址,将这个邮箱地址添加到目标Google Sheets表格的共享列表中,并授予编辑权限,这样pygsheets才能通过该凭证操作这个表格。

三、pygsheets核心功能与代码实例

3.1 连接Google Sheets并打开表格

使用pygsheets的第一步是通过凭证文件建立与Google Sheets的连接,然后打开指定的表格。这里我们需要用到pygsheets.authorize()方法,该方法会读取本地的凭证文件完成授权。

import pygsheets

# 授权连接,传入凭证文件路径
gc = pygsheets.authorize(service_file='credentials.json')

# 方式1:通过表格名称打开已存在的表格
sh = gc.open('My Google Sheet')  # 'My Google Sheet'是Google云端的表格名称

# 方式2:通过表格的ID打开表格(表格ID在表格URL中,格式为https://docs.google.com/spreadsheets/d/表格ID/edit)
# sh = gc.open_by_key('1Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')

# 方式3:打开最近使用的表格
# sh = gc.open_last()

代码说明authorize()方法会根据传入的凭证文件完成身份验证,返回一个授权后的客户端对象gc;通过gcopen()open_by_key()等方法可以打开云端的表格,返回表格对象sh,后续所有操作都基于这个对象展开。

3.2 创建新表格与工作表

pygsheets支持创建新的Google Sheets表格,也可以在已有表格中创建新的工作表(Sheet)。

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')

# 创建新的云端表格,参数为表格名称
new_sh = gc.create('New Pygsheets Sheet')
print(f'新表格创建成功,URL为:{new_sh.url}')

# 在新表格中创建新的工作表,参数为工作表名称、行数、列数
new_ws = new_sh.add_worksheet('New Worksheet', rows=100, cols=20)

# 创建工作表后,可以删除默认的第一个工作表(名为Sheet1)
default_ws = new_sh.worksheet_by_title('Sheet1')
new_sh.del_worksheet(default_ws)

代码说明gc.create()方法会在Google云端创建一个新的表格,返回新表格对象new_sh,通过new_sh.url可以获取表格的访问链接;add_worksheet()方法用于在表格中添加新的工作表,指定名称、行数和列数;del_worksheet()方法则用于删除指定的工作表,删除前需要通过worksheet_by_title()方法获取对应的工作表对象。

3.3 工作表的基础操作

工作表是我们存放和操作数据的主要载体,pygsheets提供了丰富的工作表操作方法,包括选择工作表、获取工作表属性、清空工作表等。

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')
sh = gc.open('My Google Sheet')

# 选择指定名称的工作表
ws = sh.worksheet_by_title('Sheet1')

# 选择索引为0的工作表(索引从0开始,对应第一个工作表)
# ws = sh[0]

# 获取工作表的行数和列数
rows = ws.rows
cols = ws.cols
print(f'当前工作表行数:{rows},列数:{cols}')

# 获取工作表的所有数据,返回二维列表格式
all_data = ws.get_all_values()
print(f'工作表所有数据:{all_data}')

# 清空工作表的所有数据
ws.clear()
print('工作表数据已清空')

代码说明worksheet_by_title()方法通过工作表名称选择目标工作表,也可以通过索引的方式直接选择;rowscols属性分别返回工作表的行数和列数;get_all_values()方法会读取工作表中的所有数据,以二维列表的形式返回,每一行对应列表中的一个子列表;clear()方法用于清空工作表的所有内容。

3.4 单元格数据读写操作

单元格是工作表的最小数据单元,pygsheets支持对单个单元格、多个单元格进行数据的读取和写入操作。

3.4.1 单个单元格操作

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')
sh = gc.open('My Google Sheet')
ws = sh[0]

# 方式1:通过行列索引获取单元格(索引从1开始)
cell = ws.cell('A1')
# 写入数据到单元格
cell.value = 'Hello pygsheets'
# 读取单元格数据
print(f'A1单元格数据:{cell.value}')

# 方式2:直接使用get_value和update_value方法
# 读取A2单元格数据
a2_value = ws.get_value('A2')
print(f'A2单元格数据:{a2_value}')
# 向A2单元格写入数据
ws.update_value('A2', 'Python Google Sheets')

代码说明cell()方法通过单元格地址(如A1)获取单元格对象,通过修改对象的value属性写入数据;get_value()update_value()方法可以直接读取和修改指定单元格的数据,无需获取单元格对象,操作更加简洁。

3.4.2 多个单元格批量操作

当需要处理大量数据时,批量操作可以显著提高效率,pygsheets支持对单元格区域进行批量读写。

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')
sh = gc.open('My Google Sheet')
ws = sh[0]

# 定义要写入的数据(二维列表,对应多行多列)
data = [
    ['姓名', '年龄', '城市'],
    ['张三', 25, '北京'],
    ['李四', 30, '上海'],
    ['王五', 28, '广州']
]

# 批量写入数据到A1:C4区域
ws.update_values('A1:C4', data)
print('批量数据写入完成')

# 批量读取A1:C4区域的数据
range_data = ws.get_values('A1:C4')
print(f'读取的区域数据:{range_data}')

# 批量修改单元格格式(将A1单元格设置为加粗,字体大小14)
ws.cell('A1').set_text_format('bold', True)
ws.cell('A1').set_text_format('fontSize', 14)

代码说明update_values()方法接收单元格区域和二维列表数据,将数据批量写入指定区域;get_values()方法读取指定区域的所有数据,返回二维列表;通过set_text_format()方法可以设置单元格的文本格式,如加粗、字体大小等。

3.5 数据筛选与排序

pygsheets支持对工作表中的数据进行筛选和排序,方便快速处理和分析数据。

import pygsheets

gc = pygsheets.authorize(service_file='credentials.json')
sh = gc.open('My Google Sheet')
ws = sh[0]

# 假设工作表中已有数据:A列姓名,B列年龄,C列城市
# 筛选年龄大于25的行数据
# 第一步:获取所有数据
all_data = ws.get_all_values(include_tailing_empty=False)
# 第二步:筛选数据(跳过表头)
filtered_data = [all_data[0]] + [row for row in all_data[1:] if int(row[1]) > 25]
print(f'年龄大于25的数据:{filtered_data}')

# 将筛选后的数据写入新的工作表
new_ws = sh.add_worksheet('Filtered Data', rows=len(filtered_data), cols=3)
new_ws.update_values('A1:C{}'.format(len(filtered_data)), filtered_data)

# 对数据按年龄降序排序
# 跳过表头,对数据行排序
sorted_data = [all_data[0]] + sorted(all_data[1:], key=lambda x: int(x[1]), reverse=True)
print(f'按年龄降序排序后的数据:{sorted_data}')

代码说明get_all_values()方法的include_tailing_empty参数设置为False,可以忽略末尾的空行;通过列表推导式可以实现简单的数据筛选;sorted()函数结合匿名函数可以对数据按指定列进行排序;最后将处理后的数据写入新的工作表,完成数据的二次整理。

四、pygsheets实际应用案例:数据统计与报表生成

4.1 案例场景

假设我们需要从一个Google Sheets表格中读取销售数据,统计每个产品的总销售额,然后将统计结果写入新的工作表,生成销售报表。

4.2 案例代码

import pygsheets

def generate_sales_report(credential_path, sheet_name):
    # 授权连接Google Sheets
    gc = pygsheets.authorize(service_file=credential_path)
    sh = gc.open(sheet_name)
    # 读取销售数据工作表
    sales_ws = sh.worksheet_by_title('销售数据')
    # 获取所有销售数据,跳过表头
    sales_data = sales_ws.get_all_values(include_tailing_empty=False)[1:]

    # 统计每个产品的总销售额
    sales_report = {}
    for row in sales_data:
        product_name = row[0]  # A列:产品名称
        quantity = int(row[1]) # B列:销售数量
        price = float(row[2])  # C列:单价
        total_sales = quantity * price

        if product_name in sales_report:
            sales_report[product_name] += total_sales
        else:
            sales_report[product_name] = total_sales

    # 准备报表数据
    report_data = [['产品名称', '总销售额(元)']]
    for product, total in sales_report.items():
        report_data.append([product, round(total, 2)])

    # 创建报表工作表
    if sh.worksheet_by_title('销售报表'):
        report_ws = sh.worksheet_by_title('销售报表')
        report_ws.clear()
    else:
        report_ws = sh.add_worksheet('销售报表', rows=len(report_data), cols=2)

    # 写入报表数据
    report_ws.update_values('A1:B{}'.format(len(report_data)), report_data)
    # 设置报表表头格式
    header_cell = report_ws.cell('A1')
    header_cell.set_text_format('bold', True)
    header_cell.set_text_format('fontSize', 12)
    header_cell = report_ws.cell('B1')
    header_cell.set_text_format('bold', True)
    header_cell.set_text_format('fontSize', 12)

    print('销售报表生成完成!')

# 调用函数生成报表
generate_sales_report('credentials.json', '产品销售统计')

代码说明:该案例定义了一个generate_sales_report函数,接收凭证文件路径和表格名称作为参数;函数首先读取“销售数据”工作表中的数据,然后通过字典统计每个产品的总销售额;接着创建或清空“销售报表”工作表,将统计结果写入其中,并设置表头格式;最后完成销售报表的生成。这个案例充分体现了pygsheets在数据处理和报表生成场景中的实用价值。

五、pygsheets相关资源地址

  • Pypi地址:https://pypi.org/project/pygsheets
  • Github地址:https://github.com/nithinmurali/pygsheets
  • 官方文档地址:https://pygsheets.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:libcloud 跨云服务管理入门与实战

一、libcloud 核心概览

libcloud 是一款轻量级的 Python 跨云服务管理库,能够统一不同云服务商的 API 接口,让开发者用一套代码管理 AWS、阿里云、腾讯云等多家云平台的资源。其工作原理是通过抽象层封装各云厂商的差异化 API,对外提供一致的调用接口。

该库的优点是跨云兼容性强接口简洁统一,缺点是部分云厂商的最新功能支持存在滞后。libcloud 采用 Apache License 2.0 开源协议,允许商用和二次开发,Pypi 地址为 https://pypi.org/project/libcloud,Github 地址为 https://github.com/apache/libcloud,官方文档地址为 https://libcloud.readthedocs.io。

二、libcloud 安装与环境配置

对于技术小白来说,libcloud 的安装流程非常简单,只需要依赖 Python 的包管理工具 pip 即可完成,无需复杂的编译或配置步骤。

2.1 安装步骤

确保你的电脑已经安装了 Python 环境(推荐 Python 3.6 及以上版本),打开命令行终端,输入以下命令即可完成安装:

pip install libcloud

如果需要安装指定版本的 libcloud,可以使用如下命令(以 3.8.0 版本为例):

pip install libcloud==3.8.0

安装完成后,我们可以在 Python 交互环境中验证是否安装成功:

import libcloud
print(libcloud.__version__)

执行上述代码,如果终端输出对应的 libcloud 版本号(如 3.8.0),则说明安装成功。

2.2 环境依赖说明

libcloud 对系统环境的依赖极低,仅需要 Python 标准库即可运行。如果需要使用某些特定云服务商的功能(如 OpenStack 的高级特性),可能需要额外安装对应的依赖包,但这类情况较少,官方文档会针对特殊场景给出明确的依赖说明。

三、libcloud 核心功能与代码实例

libcloud 的核心功能覆盖了云服务管理的主要场景,包括计算资源管理(虚拟机创建、启动、停止)、存储资源管理(对象存储的上传、下载)、负载均衡管理等。下面我们结合具体的代码实例,讲解最常用的功能模块。

3.1 计算资源管理:虚拟机实例操作

计算资源管理是 libcloud 最核心的功能之一,我们以阿里云 ECS 为例,演示如何通过 libcloud 创建、启动、查询和销毁虚拟机实例。

3.1.1 初始化云服务商驱动

首先,我们需要导入 libcloud 对应的计算模块,并初始化阿里云的驱动。使用前需要准备好阿里云的 Access Key ID 和 Access Key Secret,这两个信息可以在阿里云控制台的“AccessKey 管理”中获取。

from libcloud.compute.types import Provider
from libcloud.compute.providers import get_driver

# 阿里云的 Access Key 信息
ACCESS_KEY = '你的阿里云 Access Key ID'
SECRET_KEY = '你的阿里云 Access Key Secret'

# 获取阿里云 ECS 驱动
Driver = get_driver(Provider.ALIYUN_ECS)
conn = Driver(ACCESS_KEY, SECRET_KEY, region='cn-hangzhou')

代码说明

  • Provider.ALIYUN_ECS 是 libcloud 预定义的阿里云 ECS 提供商常量,不同云厂商对应不同的 Provider。
  • region 参数指定了云资源所在的地域,这里选择的是杭州地域(cn-hangzhou),可以根据需求替换为其他地域,如 cn-beijing、cn-shenzhen 等。

3.1.2 查询可用的虚拟机镜像和规格

在创建虚拟机之前,我们需要先查询可用的镜像(操作系统)和实例规格(CPU、内存配置)。

# 查询可用镜像
images = conn.list_images()
# 打印前 5 个镜像的信息
for image in images[:5]:
    print(f"镜像 ID: {image.id}, 镜像名称: {image.name}")

# 查询可用实例规格
sizes = conn.list_sizes()
# 打印前 5 个规格的信息
for size in sizes[:5]:
    print(f"规格 ID: {size.id}, 规格名称: {size.name}, CPU 核心数: {size.extra['cpu_core_count']}, 内存大小: {size.ram}MB")

代码说明

  • list_images() 方法返回当前地域下所有可用的镜像列表,每个镜像对象包含 id、name 等属性。
  • list_sizes() 方法返回当前地域下所有可用的实例规格列表,size.extra 字典中包含了额外的规格信息,如 CPU 核心数、磁盘大小等。

3.1.3 创建虚拟机实例

当我们确定了要使用的镜像和规格后,就可以调用 create_node() 方法创建虚拟机实例了。

# 选择第一个镜像和第一个规格作为示例
image = images[0]
size = sizes[0]

# 创建虚拟机实例
node = conn.create_node(
    name='libcloud-test-node',
    image=image,
    size=size,
    # 如果需要指定网络,可以添加 networks 参数
    # networks=[{'id': '你的 VPC 网络 ID'}]
)

print(f"创建的虚拟机 ID: {node.id}, 虚拟机名称: {node.name}, 虚拟机状态: {node.state}")

代码说明

  • name 参数是虚拟机的名称,可自定义。
  • imagesize 参数分别指定了虚拟机的镜像和规格,需要传入 list_images()list_sizes() 返回的对象。
  • 执行成功后,node 对象包含了新建虚拟机的所有信息,包括 id、name、state(状态)等。

3.1.4 虚拟机实例的启动、停止与销毁

对于已创建的虚拟机实例,我们可以通过 libcloud 实现启动、停止和销毁操作。

# 启动虚拟机(如果实例处于停止状态)
if node.state == 0:  # 0 代表停止状态,不同云厂商状态码可能不同
    conn.ex_start_node(node)
    print("虚拟机启动成功")

# 查询虚拟机最新状态
node = conn.get_node(node.id)
print(f"虚拟机当前状态: {node.state}")

# 停止虚拟机
conn.ex_stop_node(node)
print("虚拟机停止成功")

# 销毁虚拟机
conn.destroy_node(node)
print("虚拟机销毁成功")

代码说明

  • ex_start_node()ex_stop_node() 是阿里云驱动特有的扩展方法,用于启动和停止虚拟机,不同云厂商的扩展方法可能略有差异。
  • get_node(node.id) 方法用于获取虚拟机的最新状态,因为虚拟机状态更新可能存在延迟。
  • destroy_node(node) 方法用于销毁虚拟机,执行该操作后,对应的云资源会被释放,请注意谨慎操作。

3.2 存储资源管理:对象存储操作

除了计算资源,libcloud 还支持管理各大云厂商的对象存储服务,我们以 AWS S3 为例,演示如何实现文件的上传、下载和查询。

3.2.1 初始化 S3 驱动

和计算资源管理类似,首先需要初始化 AWS S3 的驱动,准备好 AWS 的 Access Key ID 和 Secret Access Key。

from libcloud.storage.types import Provider
from libcloud.storage.providers import get_driver

# AWS 访问密钥信息
AWS_ACCESS_KEY = '你的 AWS Access Key ID'
AWS_SECRET_KEY = '你的 AWS Secret Access Key'

# 获取 AWS S3 驱动
Driver = get_driver(Provider.S3)
conn = Driver(AWS_ACCESS_KEY, AWS_SECRET_KEY)

3.2.2 创建存储桶(Bucket)

在 S3 中,所有文件都存储在存储桶(Bucket)中,我们需要先创建一个存储桶。

# 创建存储桶,存储桶名称必须全局唯一
bucket_name = 'libcloud-test-bucket-2024'
bucket = conn.create_container(container_name=bucket_name)
print(f"存储桶创建成功,存储桶名称: {bucket.name}")

代码说明

  • create_container() 方法用于创建存储桶,container_name 参数即为存储桶名称,该名称在 AWS 全球范围内必须唯一,不能与其他用户的存储桶重名。

3.2.3 上传文件到存储桶

接下来,我们将本地的一个文件上传到刚刚创建的存储桶中。

import os

# 本地文件路径
local_file_path = 'test.txt'
# 在存储桶中的文件名称
remote_file_name = 'uploaded_test.txt'

# 确保本地文件存在
if not os.path.exists(local_file_path):
    with open(local_file_path, 'w') as f:
        f.write('Hello libcloud!')

# 上传文件
with open(local_file_path, 'rb') as file_obj:
    obj = conn.upload_object(
        file_object=file_obj,
        container=bucket,
        object_name=remote_file_name
    )

print(f"文件上传成功,文件名称: {obj.name}, 文件大小: {obj.size} bytes")

代码说明

  • upload_object() 方法用于上传文件,file_object 参数是本地文件的二进制流,container 参数指定目标存储桶,object_name 参数是文件在存储桶中的名称。
  • 我们先判断本地文件是否存在,如果不存在则创建一个包含“Hello libcloud!”内容的 test.txt 文件。

3.2.4 下载存储桶中的文件

我们可以将存储桶中的文件下载到本地。

# 下载文件的本地路径
download_path = 'downloaded_test.txt'

# 获取存储桶中的文件对象
obj = conn.get_object(container_name=bucket_name, object_name=remote_file_name)

# 下载文件
with open(download_path, 'wb') as file_obj:
    for chunk in conn.download_object_as_stream(obj):
        file_obj.write(chunk)

print(f"文件下载成功,保存路径: {download_path}")

代码说明

  • get_object() 方法用于获取存储桶中的指定文件对象,需要传入存储桶名称和文件名称。
  • download_object_as_stream() 方法以流的形式下载文件,适合处理大文件,避免一次性加载到内存中。

3.2.5 删除文件和存储桶

使用完成后,我们可以删除存储桶中的文件,然后删除存储桶。

# 删除存储桶中的文件
conn.delete_object(obj)
print("文件删除成功")

# 删除存储桶,删除前必须确保存储桶为空
conn.delete_container(bucket)
print("存储桶删除成功")

代码说明

  • delete_object() 方法用于删除存储桶中的文件,delete_container() 方法用于删除存储桶,删除存储桶前必须确保桶内没有任何文件,否则会报错。

3.3 负载均衡管理:基础配置示例

libcloud 还支持管理云厂商的负载均衡服务,我们以腾讯云负载均衡为例,演示基础的配置流程。

from libcloud.loadbalancer.types import Provider
from libcloud.loadbalancer.providers import get_driver

# 腾讯云 API 密钥信息
TENCENT_CLOUD_SECRET_ID = '你的腾讯云 SecretId'
TENCENT_CLOUD_SECRET_KEY = '你的腾讯云 SecretKey'

# 获取腾讯云负载均衡驱动
Driver = get_driver(Provider.TENCENT_CLOUD_LOADBALANCER)
conn = Driver(TENCENT_CLOUD_SECRET_ID, TENCENT_CLOUD_SECRET_KEY, region='ap-guangzhou')

# 列出可用的负载均衡器
lbs = conn.list_balancers()
print(f"当前地域下的负载均衡器数量: {len(lbs)}")

# 创建负载均衡器(简化示例)
lb = conn.create_balancer(
    name='libcloud-test-lb',
    port=80,
    protocol='http',
    algorithm='round_robin'
)
print(f"负载均衡器创建成功,ID: {lb.id}, 名称: {lb.name}")

代码说明

  • create_balancer() 方法用于创建负载均衡器,port 指定监听端口,protocol 指定协议类型,algorithm 指定负载均衡算法(如轮询 round_robin)。
  • 不同云厂商的负载均衡配置参数略有差异,具体可以参考官方文档。

四、libcloud 实际应用案例:跨云资源监控系统

在实际的开发和运维工作中,我们经常需要管理多个云厂商的资源,此时 libcloud 的跨云优势就可以充分发挥。下面我们构建一个简单的跨云资源监控系统,该系统可以同时查询阿里云 ECS 和 AWS EC2 的虚拟机实例状态,并将状态信息输出到控制台。

4.1 案例需求分析

  1. 同时连接阿里云 ECS 和 AWS EC2 两个云平台。
  2. 查询两个平台下所有的虚拟机实例信息。
  3. 输出每个虚拟机的 ID、名称、状态和所在云平台。

4.2 完整代码实现

from libcloud.compute.types import Provider
from libcloud.compute.providers import get_driver

# 配置云厂商的访问密钥
CLOUD_CONFIGS = {
    'aliyun': {
        'provider': Provider.ALIYUN_ECS,
        'access_key': '你的阿里云 Access Key ID',
        'secret_key': '你的阿里云 Access Key Secret',
        'region': 'cn-hangzhou'
    },
    'aws': {
        'provider': Provider.EC2,
        'access_key': '你的 AWS Access Key ID',
        'secret_key': '你的 AWS Secret Access Key',
        'region': 'us-east-1'
    }
}

def get_cloud_connection(config):
    """
    根据配置创建云服务商连接
    """
    Driver = get_driver(config['provider'])
    return Driver(config['access_key'], config['secret_key'], region=config['region'])

def list_all_nodes():
    """
    查询所有云平台的虚拟机实例
    """
    all_nodes = []
    for cloud_name, config in CLOUD_CONFIGS.items():
        conn = get_cloud_connection(config)
        nodes = conn.list_nodes()
        for node in nodes:
            all_nodes.append({
                'cloud_platform': cloud_name,
                'node_id': node.id,
                'node_name': node.name,
                'node_state': node.state
            })
    return all_nodes

def print_node_status(nodes):
    """
    打印虚拟机实例状态
    """
    print("===== 跨云资源监控报告 =====")
    for node in nodes:
        print(f"云平台: {node['cloud_platform']} | 实例 ID: {node['node_id']} | 实例名称: {node['node_name']} | 状态: {node['node_state']}")
    print("============================")

if __name__ == '__main__':
    nodes = list_all_nodes()
    print_node_status(nodes)

4.3 代码说明与运行效果

  1. 配置管理:我们将阿里云和 AWS 的配置信息存储在 CLOUD_CONFIGS 字典中,便于后续扩展更多云平台。
  2. 连接创建函数get_cloud_connection() 函数根据传入的配置,动态创建对应云厂商的连接,实现了代码的复用。
  3. 资源查询函数list_all_nodes() 函数遍历所有云平台,查询每个平台下的虚拟机实例,并将实例信息整理成统一格式的列表。
  4. 状态输出函数print_node_status() 函数将整理后的实例信息以清晰的格式输出到控制台。

运行该代码后,控制台会输出类似以下的内容:

===== 跨云资源监控报告 =====
云平台: aliyun | 实例 ID: i-abcdefg123456 | 实例名称: web-server | 状态: running
云平台: aws | 实例 ID: i-0123456789abcdef | 实例名称: db-server | 状态: stopped
============================

五、libcloud 相关资源地址

  • Pypi 地址:https://pypi.org/project/libcloud
  • Github 地址:https://github.com/apache/libcloud
  • 官方文档地址:https://libcloud.readthedocs.io

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:PynamoDB——轻松操作AWS DynamoDB的ORM库

一、PynamoDB 核心概述

1.1 用途

PynamoDB是一款为AWS DynamoDB打造的Python ORM(对象关系映射)库,能让开发者以面向对象的方式定义数据表结构、执行增删改查操作,无需编写复杂的原生API调用代码,大幅降低DynamoDB的使用门槛。

1.2 工作原理

PynamoDB将Python类映射为DynamoDB数据表,类的属性对应数据表的字段,通过封装AWS SDK for Python(boto3)的底层接口,把对象的实例化、方法调用转化为DynamoDB的API请求,实现数据表的创建、数据的读写等操作。

1.3 优缺点

优点:语法简洁直观,符合Python开发者的使用习惯;支持自动分页、条件查询、事务操作等高级功能;兼容DynamoDB的本地测试环境,便于开发调试。
缺点:相比原生boto3,存在轻微的性能损耗;部分DynamoDB的冷门特性支持不够及时;依赖AWS账号配置,本地开发需额外配置环境。

1.4 License类型

PynamoDB采用MIT License开源协议,允许开发者自由使用、修改、分发代码,无论是商业项目还是开源项目都能无门槛集成。

二、PynamoDB 安装与环境配置

2.1 安装命令

PynamoDB已发布至PyPI,可直接通过pip包管理器安装,建议安装最新稳定版本:

pip install pynamodb

若需要使用最新的开发特性,也可以从GitHub源码安装:

pip install git+https://github.com/pynamodb/PynamoDB.git

2.2 环境配置

使用PynamoDB操作AWS DynamoDB前,需要配置AWS账号的访问凭证,主要有两种方式:

  1. 配置环境变量
    在系统中设置以下两个环境变量,分别对应AWS的Access Key ID和Secret Access Key:
export AWS_ACCESS_KEY_ID="your_access_key_id"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key"
export AWS_DEFAULT_REGION="us-east-1"  # 例如:us-east-1、ap-southeast-1
  1. 使用AWS配置文件
    在用户目录下创建.aws文件夹,新建credentialsconfig两个文件:
  • credentials文件内容:
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key
  • config文件内容:
[default]
region = us-east-1
  1. 本地测试环境配置
    如果没有AWS账号,可使用DynamoDB Local进行本地开发测试。首先下载并启动DynamoDB Local,然后在代码中指定host参数连接本地服务:
# 启动DynamoDB Local(需提前安装Java)
java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb

三、PynamoDB 核心使用方法

3.1 定义数据表模型

PynamoDB的核心是通过Python类定义数据表结构,每个类对应一个DynamoDB表,类属性对应表的键(分区键、排序键)和普通属性。

3.1.1 基础表模型定义

以下示例定义一个存储用户信息的数据表,包含分区键user_id,普通属性usernameageemail

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute

class UserModel(Model):
    """
    定义DynamoDB用户数据表模型
    """
    class Meta:
        # 数据表名称
        table_name = "user_table"
        # AWS区域
        region = "us-east-1"
        # 本地测试时取消注释,指定DynamoDB Local地址
        # host = "http://localhost:8000"

    # 分区键:用户ID,字符串类型
    user_id = UnicodeAttribute(hash_key=True)
    # 普通属性:用户名,字符串类型
    username = UnicodeAttribute()
    # 普通属性:年龄,数字类型
    age = NumberAttribute()
    # 普通属性:邮箱,字符串类型,可选
    email = UnicodeAttribute(null=True)

代码说明

  • 继承pynamodb.models.Model类是定义数据表的前提;
  • Meta类用于配置表的元数据,包括表名、区域、连接地址等;
  • UnicodeAttributeNumberAttribute是PynamoDB提供的属性类型,分别对应字符串和数字类型,支持的属性类型还有BooleanAttributeBinaryAttribute等;
  • hash_key=True表示该属性为分区键,若需要排序键,可设置range_key=True

3.1.2 包含排序键的表模型定义

如果数据表需要复合主键(分区键+排序键),可添加一个属性并设置range_key=True,以下示例定义一个存储用户订单的数据表:

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute, UTCDateTimeAttribute
import datetime

class UserOrderModel(Model):
    """
    定义用户订单数据表模型(复合主键:user_id + order_id)
    """
    class Meta:
        table_name = "user_order_table"
        region = "us-east-1"
        # host = "http://localhost:8000"

    # 分区键:用户ID
    user_id = UnicodeAttribute(hash_key=True)
    # 排序键:订单ID
    order_id = UnicodeAttribute(range_key=True)
    # 订单金额
    amount = NumberAttribute()
    # 订单创建时间
    create_time = UTCDateTimeAttribute(default=datetime.datetime.utcnow)

# 创建数据表(仅需执行一次)
if not UserOrderModel.exists():
    UserOrderModel.create_table(read_capacity_units=1, write_capacity_units=1, wait=True)

代码说明

  • UTCDateTimeAttribute用于存储UTC时间,default参数可设置默认值为当前时间;
  • exists()方法判断表是否存在,create_table()方法用于创建表,read_capacity_unitswrite_capacity_units分别设置读写容量单位,wait=True表示等待表创建完成。

3.2 数据增删改查操作

定义好表模型后,就可以通过实例化模型类来执行数据的增删改查操作。

3.2.1 新增数据

使用模型类的构造方法创建实例,调用save()方法将数据存入DynamoDB:

# 新增用户数据
user = UserModel(
    user_id="u001",
    username="zhangsan",
    age=25,
    email="[email protected]"
)
user.save()
print(f"新增用户:{user.username}")

# 新增订单数据
order = UserOrderModel(
    user_id="u001",
    order_id="order001",
    amount=99.9
)
order.save()
print(f"新增订单:{order.order_id},金额:{order.amount}")

代码说明

  • 实例化模型类时,必须传入分区键(和排序键,若有)的参数;
  • save()方法会将实例数据写入对应的数据表,若主键已存在,则会覆盖原有数据。

3.2.2 查询数据

PynamoDB支持多种查询方式,包括按主键查询、条件查询、扫描表等。

(1)按主键查询单条数据

使用get()方法根据主键查询单条数据,适用于精确查询:

# 查询用户ID为u001的用户
try:
    user = UserModel.get("u001")
    print(f"用户ID:{user.user_id}")
    print(f"用户名:{user.username}")
    print(f"年龄:{user.age}")
    print(f"邮箱:{user.email}")
except UserModel.DoesNotExist:
    print("用户不存在")

# 查询用户u001的订单order001
try:
    order = UserOrderModel.get("u001", "order001")
    print(f"订单ID:{order.order_id}")
    print(f"金额:{order.amount}")
    print(f"创建时间:{order.create_time}")
except UserOrderModel.DoesNotExist:
    print("订单不存在")

代码说明

  • get()方法的参数顺序为分区键、排序键(若有);
  • 如果查询的数据不存在,会抛出DoesNotExist异常,需要捕获处理。
(2)条件查询多条数据

使用query()方法进行条件查询,适用于按分区键查询并添加过滤条件,以下示例查询用户u001的所有订单:

# 查询用户u001的所有订单
orders = UserOrderModel.query("u001")
for order in orders:
    print(f"订单ID:{order.order_id},金额:{order.amount}")

# 查询用户u001金额大于50的订单
from pynamodb.conditions import GreaterThan
orders = UserOrderModel.query(
    "u001",
    UserOrderModel.amount > GreaterThan(50)
)
for order in orders:
    print(f"符合条件的订单:{order.order_id},金额:{order.amount}")

代码说明

  • query()方法默认按分区键查询,返回该分区键下的所有数据;
  • 可以通过条件表达式添加过滤条件,pynamodb.conditions提供了GreaterThanLessThanContains等多种条件判断类。
(3)扫描整张表

使用scan()方法扫描整张表,返回所有数据(注意:DynamoDB扫描操作消耗容量较大,不建议在大表中使用):

# 扫描所有用户数据
users = UserModel.scan()
for user in users:
    print(f"用户:{user.username},年龄:{user.age}")

# 扫描年龄大于20的用户
users = UserModel.scan(UserModel.age > 20)
for user in users:
    print(f"年龄大于20的用户:{user.username}")

3.2.3 更新数据

更新数据有两种方式:一是获取数据实例后修改属性,调用save()方法;二是使用update()方法直接更新,支持条件更新。

(1)实例更新
# 获取用户实例并更新年龄
user = UserModel.get("u001")
user.age = 26
user.save()
print(f"更新后年龄:{user.age}")
(2)直接更新(推荐)
# 直接更新用户u001的邮箱,支持条件更新
from pynamodb.conditions import Attr
UserModel.update(
    "u001",
    actions=[
        UserModel.email.set("[email protected]")
    ],
    condition=Attr("age").exists()  # 条件:age属性存在
)
print("邮箱更新成功")

代码说明

  • update()方法的actions参数指定要执行的更新操作,支持setadddelete等动作;
  • condition参数设置更新的条件,只有满足条件时才会执行更新。

3.2.4 删除数据

使用delete()方法删除指定主键的数据,支持条件删除:

# 删除用户u001
UserModel.delete("u001")
print("用户删除成功")

# 条件删除订单:删除金额小于100的订单
UserOrderModel.delete(
    "u001",
    "order001",
    condition=UserOrderModel.amount < 100
)
print("订单删除成功")

3.3 高级功能使用

3.3.1 自动分页查询

当查询结果较多时,DynamoDB会自动分页,PynamoDB的查询结果对象支持迭代器,可直接遍历所有数据,无需手动处理分页:

# 查询所有订单,自动处理分页
orders = UserOrderModel.scan()
for order in orders:
    print(f"订单ID:{order.order_id},用户ID:{order.user_id}")
# 也可以手动获取分页标记
last_evaluated_key = orders.last_evaluated_key
while last_evaluated_key:
    orders = UserOrderModel.scan(exclusive_start_key=last_evaluated_key)
    for order in orders:
        print(f"订单ID:{order.order_id}")
    last_evaluated_key = orders.last_evaluated_key

3.3.2 事务操作

PynamoDB支持事务操作,可通过TransactionWrite类执行多个增删改操作,确保事务的原子性:

from pynamodb.transactions import TransactionWrite

# 事务操作:新增用户和订单
with TransactionWrite() as transaction:
    # 新增用户
    user = UserModel(user_id="u002", username="lisi", age=30)
    transaction.save(user)
    # 新增订单
    order = UserOrderModel(user_id="u002", order_id="order002", amount=199.9)
    transaction.save(order)
print("事务执行成功,用户和订单已新增")

代码说明

  • 使用with语句创建事务上下文,在上下文中执行的saveupdatedelete操作会被纳入事务;
  • 事务中任意一个操作失败,整个事务会回滚,所有操作都不会生效。

四、实际应用案例:用户订单管理系统

4.1 案例需求

开发一个简单的用户订单管理系统,实现以下功能:

  1. 注册新用户;
  2. 为用户创建订单;
  3. 查询指定用户的所有订单;
  4. 更新用户信息;
  5. 删除用户及对应的订单。

4.2 完整代码实现

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute, UTCDateTimeAttribute
from pynamodb.conditions import Attr, GreaterThan
from pynamodb.transactions import TransactionWrite
import datetime

# 定义用户表模型
class UserModel(Model):
    class Meta:
        table_name = "user_management_table"
        region = "us-east-1"
        # host = "http://localhost:8000"

    user_id = UnicodeAttribute(hash_key=True)
    username = UnicodeAttribute()
    age = NumberAttribute()
    email = UnicodeAttribute(null=True)

# 定义订单表模型
class OrderModel(Model):
    class Meta:
        table_name = "order_management_table"
        region = "us-east-1"
        # host = "http://localhost:8000"

    user_id = UnicodeAttribute(hash_key=True)
    order_id = UnicodeAttribute(range_key=True)
    amount = NumberAttribute()
    create_time = UTCDateTimeAttribute(default=datetime.datetime.utcnow)

# 创建数据表(首次运行时执行)
def create_tables():
    if not UserModel.exists():
        UserModel.create_table(read_capacity_units=2, write_capacity_units=2, wait=True)
    if not OrderModel.exists():
        OrderModel.create_table(read_capacity_units=2, write_capacity_units=2, wait=True)
    print("数据表创建成功")

# 注册新用户
def register_user(user_id, username, age, email=None):
    user = UserModel(
        user_id=user_id,
        username=username,
        age=age,
        email=email
    )
    user.save()
    print(f"用户 {username} 注册成功")

# 创建用户订单
def create_order(user_id, order_id, amount):
    order = OrderModel(
        user_id=user_id,
        order_id=order_id,
        amount=amount
    )
    order.save()
    print(f"订单 {order_id} 创建成功,金额:{amount}")

# 查询用户所有订单
def query_user_orders(user_id):
    orders = OrderModel.query(user_id)
    print(f"\n用户 {user_id} 的订单列表:")
    for order in orders:
        print(f"订单ID:{order.order_id},金额:{order.amount},创建时间:{order.create_time}")

# 更新用户年龄
def update_user_age(user_id, new_age):
    UserModel.update(
        user_id,
        actions=[UserModel.age.set(new_age)],
        condition=Attr("user_id").exists()
    )
    print(f"用户 {user_id} 年龄更新为 {new_age}")

# 删除用户及订单
def delete_user_and_orders(user_id):
    with TransactionWrite() as transaction:
        # 删除用户
        transaction.delete(UserModel(user_id=user_id))
        # 删除用户所有订单
        orders = OrderModel.query(user_id)
        for order in orders:
            transaction.delete(OrderModel(user_id=user_id, order_id=order.order_id))
    print(f"用户 {user_id} 及所有订单已删除")

# 主函数
if __name__ == "__main__":
    # 创建数据表
    create_tables()

    # 注册用户
    register_user("u003", "wangwu", 28, "[email protected]")

    # 创建订单
    create_order("u003", "order003", 159.9)
    create_order("u003", "order004", 299.9)

    # 查询用户订单
    query_user_orders("u003")

    # 更新用户年龄
    update_user_age("u003", 29)

    # 删除用户及订单
    # delete_user_and_orders("u003")

代码说明

  • 该案例封装了用户和订单的核心操作函数,结构清晰,便于复用;
  • 使用事务操作删除用户及订单,确保数据一致性;
  • 运行前需确保AWS凭证配置正确,或启用本地DynamoDB环境。

五、相关资源地址

  • PyPI地址:https://pypi.org/project/PynamoDB
  • GitHub地址:https://github.com/pynamodb/PynamoDB
  • 官方文档地址:https://pynamodb.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:PynamoDB——轻松操作AWS DynamoDB的ORM库

一、PynamoDB 核心概述

1.1 用途

PynamoDB是一款为AWS DynamoDB打造的Python ORM(对象关系映射)库,能让开发者以面向对象的方式定义数据表结构、执行增删改查操作,无需编写复杂的原生API调用代码,大幅降低DynamoDB的使用门槛。

1.2 工作原理

PynamoDB将Python类映射为DynamoDB数据表,类的属性对应数据表的字段,通过封装AWS SDK for Python(boto3)的底层接口,把对象的实例化、方法调用转化为DynamoDB的API请求,实现数据表的创建、数据的读写等操作。

1.3 优缺点

优点:语法简洁直观,符合Python开发者的使用习惯;支持自动分页、条件查询、事务操作等高级功能;兼容DynamoDB的本地测试环境,便于开发调试。
缺点:相比原生boto3,存在轻微的性能损耗;部分DynamoDB的冷门特性支持不够及时;依赖AWS账号配置,本地开发需额外配置环境。

1.4 License类型

PynamoDB采用MIT License开源协议,允许开发者自由使用、修改、分发代码,无论是商业项目还是开源项目都能无门槛集成。

二、PynamoDB 安装与环境配置

2.1 安装命令

PynamoDB已发布至PyPI,可直接通过pip包管理器安装,建议安装最新稳定版本:

pip install pynamodb

若需要使用最新的开发特性,也可以从GitHub源码安装:

pip install git+https://github.com/pynamodb/PynamoDB.git

2.2 环境配置

使用PynamoDB操作AWS DynamoDB前,需要配置AWS账号的访问凭证,主要有两种方式:

  1. 配置环境变量
    在系统中设置以下两个环境变量,分别对应AWS的Access Key ID和Secret Access Key:
export AWS_ACCESS_KEY_ID="your_access_key_id"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key"
export AWS_DEFAULT_REGION="us-east-1"  # 例如:us-east-1、ap-southeast-1
  1. 使用AWS配置文件
    在用户目录下创建.aws文件夹,新建credentialsconfig两个文件:
  • credentials文件内容:
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key
  • config文件内容:
[default]
region = us-east-1
  1. 本地测试环境配置
    如果没有AWS账号,可使用DynamoDB Local进行本地开发测试。首先下载并启动DynamoDB Local,然后在代码中指定host参数连接本地服务:
# 启动DynamoDB Local(需提前安装Java)
java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb

三、PynamoDB 核心使用方法

3.1 定义数据表模型

PynamoDB的核心是通过Python类定义数据表结构,每个类对应一个DynamoDB表,类属性对应表的键(分区键、排序键)和普通属性。

3.1.1 基础表模型定义

以下示例定义一个存储用户信息的数据表,包含分区键user_id,普通属性usernameageemail

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute

class UserModel(Model):
    """
    定义DynamoDB用户数据表模型
    """
    class Meta:
        # 数据表名称
        table_name = "user_table"
        # AWS区域
        region = "us-east-1"
        # 本地测试时取消注释,指定DynamoDB Local地址
        # host = "http://localhost:8000"

    # 分区键:用户ID,字符串类型
    user_id = UnicodeAttribute(hash_key=True)
    # 普通属性:用户名,字符串类型
    username = UnicodeAttribute()
    # 普通属性:年龄,数字类型
    age = NumberAttribute()
    # 普通属性:邮箱,字符串类型,可选
    email = UnicodeAttribute(null=True)

代码说明

  • 继承pynamodb.models.Model类是定义数据表的前提;
  • Meta类用于配置表的元数据,包括表名、区域、连接地址等;
  • UnicodeAttributeNumberAttribute是PynamoDB提供的属性类型,分别对应字符串和数字类型,支持的属性类型还有BooleanAttributeBinaryAttribute等;
  • hash_key=True表示该属性为分区键,若需要排序键,可设置range_key=True

3.1.2 包含排序键的表模型定义

如果数据表需要复合主键(分区键+排序键),可添加一个属性并设置range_key=True,以下示例定义一个存储用户订单的数据表:

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute, UTCDateTimeAttribute
import datetime

class UserOrderModel(Model):
    """
    定义用户订单数据表模型(复合主键:user_id + order_id)
    """
    class Meta:
        table_name = "user_order_table"
        region = "us-east-1"
        # host = "http://localhost:8000"

    # 分区键:用户ID
    user_id = UnicodeAttribute(hash_key=True)
    # 排序键:订单ID
    order_id = UnicodeAttribute(range_key=True)
    # 订单金额
    amount = NumberAttribute()
    # 订单创建时间
    create_time = UTCDateTimeAttribute(default=datetime.datetime.utcnow)

# 创建数据表(仅需执行一次)
if not UserOrderModel.exists():
    UserOrderModel.create_table(read_capacity_units=1, write_capacity_units=1, wait=True)

代码说明

  • UTCDateTimeAttribute用于存储UTC时间,default参数可设置默认值为当前时间;
  • exists()方法判断表是否存在,create_table()方法用于创建表,read_capacity_unitswrite_capacity_units分别设置读写容量单位,wait=True表示等待表创建完成。

3.2 数据增删改查操作

定义好表模型后,就可以通过实例化模型类来执行数据的增删改查操作。

3.2.1 新增数据

使用模型类的构造方法创建实例,调用save()方法将数据存入DynamoDB:

# 新增用户数据
user = UserModel(
    user_id="u001",
    username="zhangsan",
    age=25,
    email="[email protected]"
)
user.save()
print(f"新增用户:{user.username}")

# 新增订单数据
order = UserOrderModel(
    user_id="u001",
    order_id="order001",
    amount=99.9
)
order.save()
print(f"新增订单:{order.order_id},金额:{order.amount}")

代码说明

  • 实例化模型类时,必须传入分区键(和排序键,若有)的参数;
  • save()方法会将实例数据写入对应的数据表,若主键已存在,则会覆盖原有数据。

3.2.2 查询数据

PynamoDB支持多种查询方式,包括按主键查询、条件查询、扫描表等。

(1)按主键查询单条数据

使用get()方法根据主键查询单条数据,适用于精确查询:

# 查询用户ID为u001的用户
try:
    user = UserModel.get("u001")
    print(f"用户ID:{user.user_id}")
    print(f"用户名:{user.username}")
    print(f"年龄:{user.age}")
    print(f"邮箱:{user.email}")
except UserModel.DoesNotExist:
    print("用户不存在")

# 查询用户u001的订单order001
try:
    order = UserOrderModel.get("u001", "order001")
    print(f"订单ID:{order.order_id}")
    print(f"金额:{order.amount}")
    print(f"创建时间:{order.create_time}")
except UserOrderModel.DoesNotExist:
    print("订单不存在")

代码说明

  • get()方法的参数顺序为分区键、排序键(若有);
  • 如果查询的数据不存在,会抛出DoesNotExist异常,需要捕获处理。
(2)条件查询多条数据

使用query()方法进行条件查询,适用于按分区键查询并添加过滤条件,以下示例查询用户u001的所有订单:

# 查询用户u001的所有订单
orders = UserOrderModel.query("u001")
for order in orders:
    print(f"订单ID:{order.order_id},金额:{order.amount}")

# 查询用户u001金额大于50的订单
from pynamodb.conditions import GreaterThan
orders = UserOrderModel.query(
    "u001",
    UserOrderModel.amount > GreaterThan(50)
)
for order in orders:
    print(f"符合条件的订单:{order.order_id},金额:{order.amount}")

代码说明

  • query()方法默认按分区键查询,返回该分区键下的所有数据;
  • 可以通过条件表达式添加过滤条件,pynamodb.conditions提供了GreaterThanLessThanContains等多种条件判断类。
(3)扫描整张表

使用scan()方法扫描整张表,返回所有数据(注意:DynamoDB扫描操作消耗容量较大,不建议在大表中使用):

# 扫描所有用户数据
users = UserModel.scan()
for user in users:
    print(f"用户:{user.username},年龄:{user.age}")

# 扫描年龄大于20的用户
users = UserModel.scan(UserModel.age > 20)
for user in users:
    print(f"年龄大于20的用户:{user.username}")

3.2.3 更新数据

更新数据有两种方式:一是获取数据实例后修改属性,调用save()方法;二是使用update()方法直接更新,支持条件更新。

(1)实例更新
# 获取用户实例并更新年龄
user = UserModel.get("u001")
user.age = 26
user.save()
print(f"更新后年龄:{user.age}")
(2)直接更新(推荐)
# 直接更新用户u001的邮箱,支持条件更新
from pynamodb.conditions import Attr
UserModel.update(
    "u001",
    actions=[
        UserModel.email.set("[email protected]")
    ],
    condition=Attr("age").exists()  # 条件:age属性存在
)
print("邮箱更新成功")

代码说明

  • update()方法的actions参数指定要执行的更新操作,支持setadddelete等动作;
  • condition参数设置更新的条件,只有满足条件时才会执行更新。

3.2.4 删除数据

使用delete()方法删除指定主键的数据,支持条件删除:

# 删除用户u001
UserModel.delete("u001")
print("用户删除成功")

# 条件删除订单:删除金额小于100的订单
UserOrderModel.delete(
    "u001",
    "order001",
    condition=UserOrderModel.amount < 100
)
print("订单删除成功")

3.3 高级功能使用

3.3.1 自动分页查询

当查询结果较多时,DynamoDB会自动分页,PynamoDB的查询结果对象支持迭代器,可直接遍历所有数据,无需手动处理分页:

# 查询所有订单,自动处理分页
orders = UserOrderModel.scan()
for order in orders:
    print(f"订单ID:{order.order_id},用户ID:{order.user_id}")
# 也可以手动获取分页标记
last_evaluated_key = orders.last_evaluated_key
while last_evaluated_key:
    orders = UserOrderModel.scan(exclusive_start_key=last_evaluated_key)
    for order in orders:
        print(f"订单ID:{order.order_id}")
    last_evaluated_key = orders.last_evaluated_key

3.3.2 事务操作

PynamoDB支持事务操作,可通过TransactionWrite类执行多个增删改操作,确保事务的原子性:

from pynamodb.transactions import TransactionWrite

# 事务操作:新增用户和订单
with TransactionWrite() as transaction:
    # 新增用户
    user = UserModel(user_id="u002", username="lisi", age=30)
    transaction.save(user)
    # 新增订单
    order = UserOrderModel(user_id="u002", order_id="order002", amount=199.9)
    transaction.save(order)
print("事务执行成功,用户和订单已新增")

代码说明

  • 使用with语句创建事务上下文,在上下文中执行的saveupdatedelete操作会被纳入事务;
  • 事务中任意一个操作失败,整个事务会回滚,所有操作都不会生效。

四、实际应用案例:用户订单管理系统

4.1 案例需求

开发一个简单的用户订单管理系统,实现以下功能:

  1. 注册新用户;
  2. 为用户创建订单;
  3. 查询指定用户的所有订单;
  4. 更新用户信息;
  5. 删除用户及对应的订单。

4.2 完整代码实现

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute, UTCDateTimeAttribute
from pynamodb.conditions import Attr, GreaterThan
from pynamodb.transactions import TransactionWrite
import datetime

# 定义用户表模型
class UserModel(Model):
    class Meta:
        table_name = "user_management_table"
        region = "us-east-1"
        # host = "http://localhost:8000"

    user_id = UnicodeAttribute(hash_key=True)
    username = UnicodeAttribute()
    age = NumberAttribute()
    email = UnicodeAttribute(null=True)

# 定义订单表模型
class OrderModel(Model):
    class Meta:
        table_name = "order_management_table"
        region = "us-east-1"
        # host = "http://localhost:8000"

    user_id = UnicodeAttribute(hash_key=True)
    order_id = UnicodeAttribute(range_key=True)
    amount = NumberAttribute()
    create_time = UTCDateTimeAttribute(default=datetime.datetime.utcnow)

# 创建数据表(首次运行时执行)
def create_tables():
    if not UserModel.exists():
        UserModel.create_table(read_capacity_units=2, write_capacity_units=2, wait=True)
    if not OrderModel.exists():
        OrderModel.create_table(read_capacity_units=2, write_capacity_units=2, wait=True)
    print("数据表创建成功")

# 注册新用户
def register_user(user_id, username, age, email=None):
    user = UserModel(
        user_id=user_id,
        username=username,
        age=age,
        email=email
    )
    user.save()
    print(f"用户 {username} 注册成功")

# 创建用户订单
def create_order(user_id, order_id, amount):
    order = OrderModel(
        user_id=user_id,
        order_id=order_id,
        amount=amount
    )
    order.save()
    print(f"订单 {order_id} 创建成功,金额:{amount}")

# 查询用户所有订单
def query_user_orders(user_id):
    orders = OrderModel.query(user_id)
    print(f"\n用户 {user_id} 的订单列表:")
    for order in orders:
        print(f"订单ID:{order.order_id},金额:{order.amount},创建时间:{order.create_time}")

# 更新用户年龄
def update_user_age(user_id, new_age):
    UserModel.update(
        user_id,
        actions=[UserModel.age.set(new_age)],
        condition=Attr("user_id").exists()
    )
    print(f"用户 {user_id} 年龄更新为 {new_age}")

# 删除用户及订单
def delete_user_and_orders(user_id):
    with TransactionWrite() as transaction:
        # 删除用户
        transaction.delete(UserModel(user_id=user_id))
        # 删除用户所有订单
        orders = OrderModel.query(user_id)
        for order in orders:
            transaction.delete(OrderModel(user_id=user_id, order_id=order.order_id))
    print(f"用户 {user_id} 及所有订单已删除")

# 主函数
if __name__ == "__main__":
    # 创建数据表
    create_tables()

    # 注册用户
    register_user("u003", "wangwu", 28, "[email protected]")

    # 创建订单
    create_order("u003", "order003", 159.9)
    create_order("u003", "order004", 299.9)

    # 查询用户订单
    query_user_orders("u003")

    # 更新用户年龄
    update_user_age("u003", 29)

    # 删除用户及订单
    # delete_user_and_orders("u003")

代码说明

  • 该案例封装了用户和订单的核心操作函数,结构清晰,便于复用;
  • 使用事务操作删除用户及订单,确保数据一致性;
  • 运行前需确保AWS凭证配置正确,或启用本地DynamoDB环境。

五、相关资源地址

  • PyPI地址:https://pypi.org/project/PynamoDB
  • GitHub地址:https://github.com/pynamodb/PynamoDB
  • 官方文档地址:https://pynamodb.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:Pony ORM 入门到精通——高效操作数据库的极简方案

一、Pony ORM 核心概述

Pony ORM 是一款面向 Python 开发者的对象关系映射工具,它能将数据库表结构映射为 Python 类,让开发者以操作对象的方式替代原生 SQL 语句完成数据库的增删改查。其工作原理是将 Python 代码转换为对应的 SQL 语句,再与数据库交互并返回结果。该库支持 SQLite、MySQL、PostgreSQL 等主流数据库,License 为 Apache License 2.0

Pony ORM 的优点十分突出:语法简洁直观,贴近 Python 原生风格;支持自动生成数据库表结构;内置数据验证机制;查询性能优秀且支持懒加载。缺点则是对复杂 SQL 语句的支持灵活性稍弱,在超大规模分布式数据库场景下的适配性不如专业级 ORM 框架。

二、Pony ORM 安装步骤

对于技术小白来说,Pony ORM 的安装流程非常简单,只需要使用 Python 自带的包管理工具 pip 即可完成。无论是 Windows、Mac 还是 Linux 系统,操作命令完全一致。

2.1 基础安装命令

打开系统的命令行终端(Windows 是 CMD 或 PowerShell,Mac 和 Linux 是 Terminal),输入以下命令:

pip install pony

这条命令会自动从 PyPI 下载并安装最新版本的 Pony ORM 库。

2.2 验证安装是否成功

安装完成后,我们可以在 Python 交互式环境中验证是否安装成功。在终端输入 python 进入交互模式,然后执行以下代码:

import pony
print(pony.__version__)

如果终端能够输出 Pony ORM 的版本号(例如 0.7.16),则说明安装成功。如果出现 ModuleNotFoundError 错误,则需要检查 pip 命令是否执行正确,或者 Python 环境是否存在冲突。

三、Pony ORM 核心使用方法

Pony ORM 的核心使用流程分为三步:定义数据库映射类连接数据库执行数据库操作。下面我们以最常用的 SQLite 数据库为例(无需额外配置服务,文件型数据库更适合新手),详细讲解每一步的操作方法并搭配实例代码。

3.1 定义数据库映射类与连接数据库

首先,我们需要创建一个 Database 对象来连接数据库,然后通过继承 db.Entity 来定义数据表对应的 Python 类,类中的属性对应数据表的字段。

实例代码:定义学生信息表

from pony.orm import *

# 1. 创建Database对象,连接SQLite数据库
# 参数说明:provider指定数据库类型,filename指定数据库文件路径,create_db=True表示如果文件不存在则自动创建
db = Database()

# 2. 定义映射类,对应数据库中的"Student"表
class Student(db.Entity):
    # 定义字段,primary_key=True表示该字段为主键
    id = PrimaryKey(int, auto=True)  # 自增整数主键
    name = Required(str, max_len=50)  # 必填字符串字段,最大长度50
    age = Required(int)  # 必填整数字段
    gender = Optional(str, max_len=10, default="未知")  # 可选字符串字段,默认值为"未知"
    score = Optional(float)  # 可选浮点数字段,存储学生成绩

# 3. 绑定数据库并生成数据表
# 参数说明:db_session=True表示自动管理数据库会话
db.bind(provider='sqlite', filename='school.db', create_db=True)
# 创建所有定义的实体对应的表,如果表已存在则不会重复创建
db.generate_mapping(create_tables=True)

代码说明

  1. Database() 是 Pony ORM 的核心对象,负责管理数据库连接和实体映射关系。
  2. 继承 db.Entity 的类会被 Pony ORM 识别为数据表映射类,类名默认对应数据库中的表名(也可以通过 _table_ 属性自定义表名)。
  3. 字段类型说明:
    • PrimaryKey:定义主键字段,auto=True 表示自动递增。
    • Required:定义必填字段,必须传入值才能创建记录。
    • Optional:定义可选字段,可以不传入值,支持设置默认值。
  4. db.bind() 方法用于绑定具体的数据库,这里选择 SQLite 数据库,school.db 是数据库文件的名称,会保存在当前代码运行的目录下。
  5. db.generate_mapping() 方法用于根据定义的实体类生成数据库表结构,create_tables=True 表示如果表不存在则自动创建。

3.2 数据库会话管理

Pony ORM 要求所有数据库操作都必须在 数据库会话 中执行,常用的会话管理方式有两种:装饰器方式上下文管理器方式,两种方式都能自动完成会话的开启、提交和关闭操作,非常适合新手使用。

方式1:使用 @db_session 装饰器

这是最常用的方式,将装饰器添加在执行数据库操作的函数上即可。

@db_session
def add_student():
    # 在会话中执行数据库操作
    s1 = Student(name="张三", age=18, gender="男", score=92.5)
    s2 = Student(name="李四", age=19, gender="女", score=88.0)
    s3 = Student(name="王五", age=17, score=95.0)  # gender使用默认值"未知"
    # 无需手动提交,装饰器会自动提交事务

# 调用函数执行数据插入
add_student()

方式2:使用 db_session 上下文管理器

适合在代码块中执行数据库操作,使用 with 语句包裹操作代码。

with db_session:
    s4 = Student(name="赵六", age=20, gender="男", score=85.5)
    # 代码块结束后自动提交事务

代码说明

  1. 无论是装饰器还是上下文管理器,都不需要手动调用 commit() 提交事务,也不需要手动关闭会话,Pony ORM 会自动处理。
  2. 如果在操作过程中出现异常,会话会自动回滚,保证数据一致性。

3.3 数据查询操作

查询是数据库操作中最常用的功能,Pony ORM 提供了简洁的查询语法,支持条件查询、排序、分页、聚合等多种操作,完全可以替代原生 SQL 语句。

3.3.1 基础查询:获取所有记录

使用 select() 函数可以查询数据表中的记录,返回的是一个可迭代的 Query 对象,可以直接通过 for 循环遍历。

@db_session
def query_all_students():
    # 查询所有学生记录,select(s for s in Student) 等价于 SQL: SELECT * FROM Student
    students = select(s for s in Student)
    # 遍历查询结果
    for s in students:
        print(f"ID: {s.id}, 姓名: {s.name}, 年龄: {s.age}, 性别: {s.gender}, 成绩: {s.score}")

# 调用函数查询数据
query_all_students()

3.3.2 条件查询:筛选指定记录

select() 函数中添加条件表达式,可以筛选出符合要求的记录,条件表达式的写法和 Python 原生语法一致,非常容易理解。

@db_session
def query_student_by_condition():
    # 条件1:查询年龄大于18岁的学生
    students1 = select(s for s in Student if s.age > 18)
    print("年龄大于18岁的学生:")
    for s in students1:
        print(f"{s.name} - 年龄: {s.age}")

    # 条件2:查询性别为男且成绩大于90分的学生
    students2 = select(s for s in Student if s.gender == "男" and s.score > 90)
    print("\n性别为男且成绩大于90分的学生:")
    for s in students2:
        print(f"{s.name} - 成绩: {s.score}")

    # 条件3:查询姓名包含"张"字的学生(模糊查询)
    students3 = select(s for s in Student if "张" in s.name)
    print("\n姓名包含张字的学生:")
    for s in students3:
        print(f"{s.name} - ID: {s.id}")

# 调用函数执行条件查询
query_student_by_condition()

代码说明

  1. Pony ORM 会自动将条件表达式转换为对应的 SQL WHERE 子句,例如 s.age > 18 对应 WHERE age > 18
  2. 模糊查询可以直接使用 Python 的 in 关键字,等价于 SQL 中的 LIKE 语句。

3.3.3 排序与分页查询

在实际开发中,查询结果往往需要排序和分页,Pony ORM 提供了 order_by()limit()offset() 方法来实现这两个功能。

@db_session
def query_student_with_sort_and_page():
    # 1. 按成绩降序排序(从高到低)
    sorted_students = select(s for s in Student).order_by(desc(s.score))
    print("按成绩降序排序的学生:")
    for s in sorted_students:
        print(f"{s.name} - 成绩: {s.score}")

    # 2. 分页查询:获取第2页的数据,每页2条记录
    page_size = 2
    page_num = 2
    # offset表示跳过的记录数,计算方式:(页码-1)*每页条数
    paged_students = select(s for s in Student).order_by(s.id).limit(page_size).offset((page_num-1)*page_size)
    print(f"\n第{page_num}页数据(每页{page_size}条):")
    for s in paged_students:
        print(f"{s.id} - {s.name}")

# 调用函数执行排序和分页查询
query_student_with_sort_and_page()

代码说明

  1. order_by() 方法用于排序,传入 desc(字段) 表示降序,直接传入字段名表示升序。
  2. limit() 方法用于限制查询结果的数量,offset() 方法用于跳过指定数量的记录,两者结合实现分页功能。

3.3.4 聚合查询:统计数据

Pony ORM 支持常用的聚合函数,例如 count()sum()avg() 等,可以方便地实现数据统计功能。

@db_session
def aggregate_student_data():
    # 1. 统计学生总数
    total = count(s for s in Student)
    print(f"学生总数:{total}")

    # 2. 统计所有学生的平均成绩
    avg_score = avg(s.score for s in Student if s.score is not None)
    print(f"学生平均成绩:{avg_score:.2f}")

    # 3. 统计男生的最高成绩
    max_score_male = max(s.score for s in Student if s.gender == "男")
    print(f"男生最高成绩:{max_score_male}")

    # 4. 统计女生的总成绩
    sum_score_female = sum(s.score for s in Student if s.gender == "女")
    print(f"女生总成绩:{sum_score_female}")

# 调用函数执行聚合查询
aggregate_student_data()

代码说明

聚合函数可以直接作用于 select() 生成的 Query 对象,也可以直接通过函数调用的方式使用,语法简洁易懂,不需要编写复杂的 SQL 聚合语句。

3.4 数据更新与删除操作

除了查询,Pony ORM 也支持便捷的数据更新和删除操作,操作方式同样是通过操作 Python 对象来实现。

3.4.1 数据更新

更新数据只需要在会话中获取对应的对象,修改其属性值,会话结束时会自动提交更新。

@db_session
def update_student():
    # 1. 根据ID获取学生对象
    student = Student.get(id=1)  # get()方法用于根据主键获取单个对象
    if student:
        # 2. 修改对象属性
        student.age = 19
        student.score = 96.0
        print(f"更新后的数据:{student.name} - 年龄: {student.age}, 成绩: {student.score}")
    else:
        print("未找到ID为1的学生")

# 调用函数更新数据
update_student()

代码说明

Student.get(id=1) 等价于 SQL 语句 SELECT * FROM Student WHERE id=1 LIMIT 1,返回的是单个对象而不是 Query 对象,适合根据主键查询单条记录。

3.4.2 数据删除

删除数据的方式有两种:一是获取对象后调用 delete() 方法,二是直接通过 Query 对象调用 delete() 方法批量删除。

@db_session
def delete_student():
    # 方式1:删除单个对象
    student = Student.get(id=4)
    if student:
        student.delete()
        print(f"已删除学生:{student.name}")

    # 方式2:批量删除符合条件的对象
    # 删除年龄小于18岁的学生
    delete_count = delete(s for s in Student if s.age < 18)
    print(f"批量删除了{delete_count}条记录")

# 调用函数删除数据
delete_student()

代码说明

  1. 单个对象删除:调用对象的 delete() 方法即可。
  2. 批量删除:使用 delete() 函数配合条件表达式,返回值是删除的记录条数。

3.5 一对多关系映射

在实际的数据库设计中,表与表之间往往存在关联关系,例如“班级”和“学生”的一对多关系(一个班级有多个学生,一个学生属于一个班级)。Pony ORM 可以轻松实现这种关联关系的映射。

实例代码:定义班级与学生的一对多关系

from pony.orm import *

db = Database()

# 定义班级类(一的一方)
class Class(db.Entity):
    id = PrimaryKey(int, auto=True)
    name = Required(str, max_len=30)  # 班级名称
    # 定义一对多关系,Set表示多个学生对象,reverse表示反向引用(学生对象可以通过class属性访问班级)
    students = Set("Student", reverse="class_")

# 定义学生类(多的一方)
class Student(db.Entity):
    id = PrimaryKey(int, auto=True)
    name = Required(str, max_len=50)
    age = Required(int)
    # 定义外键关联,Required表示每个学生必须属于一个班级
    class_ = Required(Class, reverse="students")

# 绑定数据库并生成表
db.bind(provider='sqlite', filename='school_relation.db', create_db=True)
db.generate_mapping(create_tables=True)

# 插入测试数据
@db_session
def add_relation_data():
    # 创建两个班级
    c1 = Class(name="高一(1)班")
    c2 = Class(name="高一(2)班")

    # 创建学生并关联到班级
    s1 = Student(name="张三", age=16, class_=c1)
    s2 = Student(name="李四", age=16, class_=c1)
    s3 = Student(name="王五", age=17, class_=c2)

# 查询关联数据
@db_session
def query_relation_data():
    # 1. 查询班级对应的所有学生
    class1 = Class.get(name="高一(1)班")
    print(f"班级:{class1.name} 的学生列表:")
    for s in class1.students:
        print(f"- {s.name}")

    # 2. 查询学生所属的班级
    student = Student.get(name="王五")
    print(f"\n{student.name} 所属班级:{student.class_.name}")

# 调用函数执行操作
add_relation_data()
query_relation_data()

代码说明

  1. 一对多关系的核心是 SetRequired 的配合使用:
    • 一的一方(Class)使用 Set("Student") 表示该班级拥有多个学生。
    • 多的一方(Student)使用 Required(Class) 表示每个学生必须属于一个班级。
  2. reverse 参数用于设置反向引用的属性名,方便通过学生对象访问班级信息。

四、Pony ORM 实际应用案例:学生成绩管理系统

为了帮助大家更好地理解 Pony ORM 在实际项目中的应用,我们来开发一个简单的学生成绩管理系统,该系统实现以下功能:

  1. 添加学生信息和成绩
  2. 查询指定学生的成绩
  3. 更新学生成绩
  4. 统计班级的平均成绩
  5. 删除不及格的学生记录

4.1 完整案例代码

from pony.orm import *

# 1. 初始化数据库连接
db = Database()

# 2. 定义实体类
class Class(db.Entity):
    id = PrimaryKey(int, auto=True)
    name = Required(str, max_len=30, unique=True)  # 班级名称唯一
    students = Set("Student", reverse="class_")

class Student(db.Entity):
    id = PrimaryKey(int, auto=True)
    name = Required(str, max_len=50)
    age = Required(int)
    score = Required(float)  # 成绩字段
    class_ = Required(Class, reverse="students")

# 3. 绑定数据库并生成表
db.bind(provider='sqlite', filename='student_score_manage.db', create_db=True)
db.generate_mapping(create_tables=True)

# 4. 系统功能函数
@db_session
def add_class(class_name):
    """添加班级"""
    try:
        Class(name=class_name)
        print(f"班级 {class_name} 添加成功!")
    except UniqueError:
        print(f"班级 {class_name} 已存在!")

@db_session
def add_student(name, age, score, class_name):
    """添加学生信息"""
    class_ = Class.get(name=class_name)
    if not class_:
        print(f"班级 {class_name} 不存在,请先添加班级!")
        return
    Student(name=name, age=age, score=score, class_=class_)
    print(f"学生 {name} 添加成功!")

@db_session
def query_student_score(name):
    """查询指定学生的成绩"""
    student = Student.get(name=name)
    if student:
        print(f"学生:{student.name}")
        print(f"班级:{student.class_.name}")
        print(f"成绩:{student.score}")
    else:
        print(f"未找到学生 {name} 的信息!")

@db_session
def update_student_score(name, new_score):
    """更新学生成绩"""
    student = Student.get(name=name)
    if student:
        student.score = new_score
        print(f"学生 {name} 的成绩已更新为 {new_score}")
    else:
        print(f"未找到学生 {name} 的信息!")

@db_session
def calculate_class_avg_score(class_name):
    """统计班级平均成绩"""
    class_ = Class.get(name=class_name)
    if not class_:
        print(f"班级 {class_name} 不存在!")
        return
    avg_score = avg(s.score for s in class_.students)
    print(f"班级 {class_name} 的平均成绩为:{avg_score:.2f}")

@db_session
def delete_failed_students():
    """删除成绩低于60分的学生"""
    delete_count = delete(s for s in Student if s.score < 60)
    print(f"共删除了 {delete_count} 名不及格的学生记录")

# 5. 测试系统功能
if __name__ == "__main__":
    # 添加班级
    add_class("高一(1)班")
    add_class("高一(2)班")

    # 添加学生
    add_student("张三", 16, 92.5, "高一(1)班")
    add_student("李四", 16, 85.0, "高一(1)班")
    add_student("王五", 17, 58.0, "高一(2)班")
    add_student("赵六", 17, 76.5, "高一(2)班")

    # 查询学生成绩
    print("\n===== 查询学生成绩 =====")
    query_student_score("张三")

    # 更新学生成绩
    print("\n===== 更新学生成绩 =====")
    update_student_score("李四", 88.5)
    query_student_score("李四")

    # 统计班级平均成绩
    print("\n===== 统计班级平均成绩 =====")
    calculate_class_avg_score("高一(1)班")

    # 删除不及格学生
    print("\n===== 删除不及格学生 =====")
    delete_failed_students()

    # 查看删除后的班级平均成绩
    print("\n===== 删除后班级平均成绩 =====")
    calculate_class_avg_score("高一(2)班")

4.2 代码运行结果

班级 高一(1)班 添加成功!
班级 高一(2)班 添加成功!
学生 张三 添加成功!
学生 李四 添加成功!
学生 王五 添加成功!
学生 赵六 添加成功!

===== 查询学生成绩 =====
学生:张三
班级:高一(1)班
成绩:92.5

===== 更新学生成绩 =====
学生 李四 的成绩已更新为 88.5
学生:李四
班级:高一(1)班
成绩:88.5

===== 统计班级平均成绩 =====
班级 高一(1)班 的平均成绩为:90.50

===== 删除不及格学生 =====
共删除了 1 名不及格的学生记录

===== 删除后班级平均成绩 =====
班级 高一(2)班 的平均成绩为:76.50

4.3 案例说明

这个学生成绩管理系统虽然简单,但涵盖了 Pony ORM 的核心操作,包括实体定义、关联关系、增删改查和聚合统计。在实际开发中,我们可以基于这个框架扩展更多功能,例如添加课程表、教师表,实现更复杂的多表关联查询。

五、Pony ORM 相关资源

  • PyPI 地址:https://pypi.org/project/Pony
  • Github 地址:https://github.com/ponyorm/pony
  • 官方文档地址:https://docs.ponyorm.org/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:pandas-gbq 从入门到精通的完整指南

一、pandas-gbq 库核心概述

pandas-gbq 是 pandas 生态中专门用于连接 Google BigQuery 数据仓库的工具库,它的核心用途是实现 pandas DataFrame 与 Google BigQuery 数据表之间的高效数据读写操作。其工作原理是基于 Google Cloud 的 BigQuery API,将 pandas 的数据结构与 BigQuery 的表结构进行映射,通过认证机制建立连接后,完成数据的上传、下载与查询任务。

该库的优点在于操作简洁,能无缝衔接 pandas 数据分析工作流,无需手动处理复杂的 API 调用和数据格式转换;缺点是高度依赖 Google Cloud 环境配置,且数据传输速度受网络和 BigQuery 配额限制。pandas-gbq 采用 BSD 3-Clause 许可证,这是一种宽松的开源许可证,允许用户自由使用、修改和分发代码,仅需保留原作者的版权声明。

二、pandas-gbq 安装与环境配置

2.1 库的安装

安装 pandas-gbq 非常简单,使用 Python 包管理工具 pip 即可完成,打开命令行终端,输入以下命令:

pip install pandas-gbq

这条命令会自动下载并安装 pandas-gbq 的最新稳定版本,同时安装其依赖项,包括 pandas、google-cloud-bigquery、google-auth 等。如果需要安装指定版本的 pandas-gbq,可以在命令后加上版本号,例如安装 0.19.0 版本:

pip install pandas-gbq==0.19.0

安装完成后,可以在 Python 环境中通过导入语句验证是否安装成功:

import pandas_gbq
print(pandas_gbq.__version__)

运行上述代码,如果控制台输出对应的版本号,说明安装成功。

2.2 Google Cloud 环境配置

要使用 pandas-gbq 操作 BigQuery,必须先完成 Google Cloud 的环境配置,主要包括以下几个步骤:

  1. 创建 Google Cloud 项目
    登录 Google Cloud 控制台,创建一个新的项目,或者选择已有的项目。项目是 Google Cloud 资源管理的基本单位,后续的 BigQuery 操作都需要关联到具体项目。
  2. 启用 BigQuery API
    在 Google Cloud 控制台的 API 和服务页面,搜索并启用 BigQuery API,只有启用该 API,才能通过 pandas-gbq 调用 BigQuery 的相关功能。
  3. 创建并下载认证密钥文件
    为了让本地程序能够访问 Google Cloud 资源,需要创建服务账号并生成认证密钥。在 Google Cloud 控制台的 “IAM 与管理-服务账号” 页面,创建新的服务账号,为其分配合适的权限,例如 BigQuery Admin(管理员权限,适合开发测试)或 BigQuery Data Editor(数据编辑权限,适合生产环境)。创建完成后,为服务账号生成并下载 JSON 格式的密钥文件。
  4. 配置环境变量 将下载的 JSON 密钥文件的路径配置到环境变量 GOOGLE_APPLICATION_CREDENTIALS 中,这是 Google Cloud 认证的默认方式。
    • Windows 系统(命令行):
      bash set GOOGLE_APPLICATION_CREDENTIALS="C:\path\to\your\keyfile.json"
    • Linux/Mac 系统(终端):
      bash export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/keyfile.json"
      配置完成后,程序就能自动识别认证信息,无需在代码中手动指定密钥路径。

三、pandas-gbq 核心功能与代码实例

pandas-gbq 的核心功能分为三大类:从 BigQuery 查询数据到 DataFrame、将 DataFrame 数据写入 BigQuery、以及对 BigQuery 表的基础管理操作。下面结合具体代码实例详细讲解每个功能的使用方法。

3.1 从 BigQuery 查询数据到 DataFrame

这是 pandas-gbq 最常用的功能之一,通过编写 SQL 查询语句,将 BigQuery 中的数据读取到 pandas DataFrame 中,方便后续的数据分析和处理。核心函数是 pandas_gbq.read_gbq()

3.1.1 基础查询示例

假设我们有一个 BigQuery 数据集 my_dataset,其中包含一个表 sales_data,存储了某电商平台的销售数据,表结构如下:
| 字段名 | 数据类型 | 说明 |
|–|-||
| order_id | STRING | 订单ID |
| sale_date | DATE | 销售日期 |
| product_id | STRING | 商品ID |
| sale_amount | FLOAT | 销售金额 |
| customer_id | STRING | 客户ID |

现在需要查询 2024 年 1 月的销售数据,代码如下:

import pandas as pd
import pandas_gbq

# 定义SQL查询语句
query = """
SELECT order_id, sale_date, product_id, sale_amount, customer_id
FROM `my_project.my_dataset.sales_data`
WHERE sale_date BETWEEN '2024-01-01' AND '2024-01-31'
"""

# 执行查询,将结果读取到DataFrame
df = pandas_gbq.read_gbq(
    query=query,
    project_id="my_project"  # 替换为你的Google Cloud项目ID
)

# 查看数据的前5行
print(df.head())

代码说明

  • 首先导入 pandas 和 pandas_gbq 库;
  • 定义 SQL 查询语句,注意 BigQuery 的表名格式为 项目ID.数据集ID.表名,需要用反引号包裹;
  • 调用 read_gbq() 函数,传入查询语句和项目 ID,函数会自动完成认证并执行查询;
  • 最后通过 head() 方法查看数据的前 5 行,验证数据是否成功读取。

3.1.2 带参数的查询

在实际应用中,经常需要根据动态参数执行查询,例如查询指定月份的销售数据。pandas-gbq 支持通过 params 参数传入查询参数,避免手动拼接 SQL 语句导致的安全问题(如 SQL 注入)。代码示例如下:

import pandas as pd
import pandas_gbq

# 定义动态参数
start_date = "2024-02-01"
end_date = "2024-02-29"

# 定义带参数的SQL查询语句
query = """
SELECT order_id, sale_date, product_id, sale_amount, customer_id
FROM `my_project.my_dataset.sales_data`
WHERE sale_date BETWEEN @start_date AND @end_date
"""

# 执行带参数的查询
df = pandas_gbq.read_gbq(
    query=query,
    project_id="my_project",
    params={"start_date": start_date, "end_date": end_date}
)

# 查看数据的基本信息
print(df.info())

代码说明

  • SQL 查询语句中使用 @参数名 的格式定义参数占位符;
  • read_gbq() 函数中,通过 params 参数传入一个字典,字典的键为参数名,值为参数的具体取值;
  • 这种方式不仅提高了代码的灵活性,还能有效防止 SQL 注入攻击,保证查询的安全性。

3.1.3 分页查询大数据集

当查询的数据集非常大时,一次性读取所有数据可能会导致内存不足。pandas-gbq 支持分页查询,通过设置 chunksize 参数,将查询结果分成多个小块,逐块读取和处理。代码示例如下:

import pandas as pd
import pandas_gbq

# 定义SQL查询语句,查询所有销售数据
query = """
SELECT * FROM `my_project.my_dataset.sales_data`
"""

# 分页读取数据,每次读取10000行
chunk_iter = pandas_gbq.read_gbq(
    query=query,
    project_id="my_project",
    chunksize=10000
)

# 遍历每个数据块并进行处理
for chunk in chunk_iter:
    # 示例:计算每个数据块的销售金额总和
    total_sale = chunk["sale_amount"].sum()
    print(f"当前数据块销售金额总和:{total_sale}")

代码说明

  • 设置 chunksize=10000 后,read_gbq() 函数会返回一个迭代器,每次迭代返回一个包含 10000 行数据的 DataFrame;
  • 通过遍历迭代器,可以逐块处理数据,避免一次性加载大量数据占用过多内存;
  • 这种方法适用于处理超大数据集,是大数据分析中常用的优化手段。

3.2 将 DataFrame 数据写入 BigQuery

将本地或处理后的 pandas DataFrame 数据写入 BigQuery 表,是数据存储和共享的重要环节。pandas-gbq 提供了 pandas_gbq.to_gbq() 函数,支持将 DataFrame 数据写入新表或追加到已有表中。

3.2.1 写入新表

假设我们有一个本地的 DataFrame,存储了新的销售数据,需要将其写入 BigQuery 的 my_dataset 数据集中,创建一个新表 new_sales_data。代码示例如下:

import pandas as pd
import pandas_gbq

# 创建本地DataFrame
data = {
    "order_id": ["OD202403001", "OD202403002", "OD202403003"],
    "sale_date": ["2024-03-01", "2024-03-01", "2024-03-02"],
    "product_id": ["P001", "P002", "P001"],
    "sale_amount": [199.9, 299.9, 199.9],
    "customer_id": ["C001", "C002", "C003"]
}
df = pd.DataFrame(data)

# 将DataFrame写入BigQuery新表
pandas_gbq.to_gbq(
    dataframe=df,
    destination_table="my_dataset.new_sales_data",  # 目标表:数据集.表名
    project_id="my_project",
    if_exists="fail"  # 如果表已存在,则抛出错误
)

print("数据成功写入BigQuery新表!")

代码说明

  • 首先创建一个包含销售数据的 DataFrame;
  • 调用 to_gbq() 函数,传入 DataFrame、目标表名和项目 ID;
  • if_exists 参数用于指定表已存在时的处理方式,可选值有:
  • fail:默认值,如果表已存在则抛出错误;
  • replace:覆盖已存在的表;
  • append:将数据追加到已存在的表中。

3.2.2 追加数据到已有表

在日常业务中,经常需要将新增的数据追加到 BigQuery 的已有表中,例如每日新增的销售数据。此时只需将 if_exists 参数设置为 append 即可。代码示例如下:

import pandas as pd
import pandas_gbq

# 创建新增的销售数据DataFrame
new_data = {
    "order_id": ["OD202403004", "OD202403005"],
    "sale_date": ["2024-03-02", "2024-03-03"],
    "product_id": ["P003", "P002"],
    "sale_amount": [399.9, 299.9],
    "customer_id": ["C004", "C005"]
}
df_new = pd.DataFrame(new_data)

# 将新增数据追加到已有表中
pandas_gbq.to_gbq(
    dataframe=df_new,
    destination_table="my_dataset.new_sales_data",
    project_id="my_project",
    if_exists="append"
)

print("数据成功追加到BigQuery已有表!")

代码说明

  • 创建包含新增数据的 DataFrame df_new
  • 设置 if_exists="append",函数会将 df_new 中的数据追加到 my_dataset.new_sales_data 表中;
  • 追加数据时,需要确保 DataFrame 的列名和数据类型与 BigQuery 表的结构一致,否则会导致写入失败。

3.2.3 指定数据类型写入

默认情况下,pandas-gbq 会根据 DataFrame 的列数据类型自动推断 BigQuery 表的字段类型,但有时自动推断的结果可能不符合需求。此时可以通过 table_schema 参数手动指定字段的数据类型。代码示例如下:

import pandas as pd
import pandas_gbq

# 创建DataFrame
data = {
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "score": [95.5, 88.0, 92.3]
}
df = pd.DataFrame(data)

# 手动指定BigQuery表的schema
table_schema = [
    {"name": "id", "type": "INTEGER"},
    {"name": "name", "type": "STRING"},
    {"name": "score", "type": "FLOAT"}
]

# 写入数据并指定schema
pandas_gbq.to_gbq(
    dataframe=df,
    destination_table="my_dataset.student_scores",
    project_id="my_project",
    table_schema=table_schema,
    if_exists="replace"
)

print("数据成功写入,并应用自定义schema!")

代码说明

  • table_schema 参数接受一个列表,列表中的每个元素是一个字典,包含 name(字段名)和 type(BigQuery 数据类型)两个键;
  • BigQuery 支持的数据类型包括 INTEGERFLOATSTRINGDATEDATETIME 等,详细类型可参考 BigQuery 官方文档;
  • 手动指定 schema 可以确保 BigQuery 表的字段类型符合业务需求,避免自动推断带来的误差。

3.3 BigQuery 表的基础管理操作

除了数据的读写,pandas-gbq 还可以结合 Google Cloud 的其他库,实现对 BigQuery 表的基础管理操作,例如查看表的元数据、删除表等。需要注意的是,部分高级管理功能需要依赖 google-cloud-bigquery 库。

3.3.1 查看表的元数据

查看 BigQuery 表的元数据,包括字段名、数据类型、表描述等信息,代码示例如下:

import pandas_gbq
from google.cloud import bigquery

# 初始化BigQuery客户端
client = bigquery.Client(project="my_project")

# 定义表的引用
table_ref = client.dataset("my_dataset").table("sales_data")

# 获取表的元数据
table = client.get_table(table_ref)

# 打印表的基本信息
print(f"表名:{table.table_id}")
print(f"数据集:{table.dataset_id}")
print(f"创建时间:{table.created}")
print("字段信息:")
for schema_field in table.schema:
    print(f"- {schema_field.name}: {schema_field.field_type}")

代码说明

  • 首先导入 google.cloud.bigquery 库,并初始化 BigQuery 客户端;
  • 通过 dataset()table() 方法创建表的引用;
  • 调用 get_table() 方法获取表的元数据对象,通过该对象可以访问表的各种属性,如表名、创建时间、字段信息等。

3.3.2 删除 BigQuery 表

如果需要删除不再使用的 BigQuery 表,可以使用 client.delete_table() 方法,代码示例如下:

from google.cloud import bigquery

# 初始化BigQuery客户端
client = bigquery.Client(project="my_project")

# 定义要删除的表的引用
table_ref = client.dataset("my_dataset").table("temp_table")

# 删除表
client.delete_table(table_ref)

print("表已成功删除!")

代码说明

  • 该操作需要确保服务账号拥有 BigQuery Table Admin 权限;
  • 删除表的操作不可逆,执行前需要确认表中的数据不再需要,避免数据丢失。

四、pandas-gbq 实际应用案例:电商销售数据分析

为了更好地理解 pandas-gbq 在实际项目中的应用,我们以电商销售数据分析为例,完整展示从 BigQuery 读取数据、数据清洗、分析可视化到结果写入 BigQuery 的全流程。

4.1 案例背景

某电商平台将每日销售数据存储在 Google BigQuery 的 ecommerce.sales_data 表中,需要分析 2024 年第一季度(1-3 月)的销售情况,包括:

  1. 每月的销售总额;
  2. 热销商品 TOP10;
  3. 客户购买频次分布。
    最后将分析结果写入 BigQuery 的 ecommerce.sales_analysis 表中,供业务部门查看。

4.2 完整代码实现

import pandas as pd
import pandas_gbq
import matplotlib.pyplot as plt
from google.cloud import bigquery

# - 步骤1:配置环境与初始化客户端 -
# 初始化BigQuery客户端
client = bigquery.Client(project="my_ecommerce_project")

# - 步骤2:从BigQuery读取第一季度销售数据 -
query = """
SELECT 
    order_id,
    sale_date,
    product_id,
    sale_amount,
    customer_id
FROM `my_ecommerce_project.ecommerce.sales_data`
WHERE sale_date BETWEEN '2024-01-01' AND '2024-03-31'
"""

# 读取数据到DataFrame
df_sales = pandas_gbq.read_gbq(
    query=query,
    project_id="my_ecommerce_project"
)

# - 步骤3:数据清洗 -
# 查看数据是否存在缺失值
print("缺失值情况:")
print(df_sales.isnull().sum())

# 删除缺失值所在行
df_sales = df_sales.dropna()

# 将sale_date转换为日期类型
df_sales["sale_date"] = pd.to_datetime(df_sales["sale_date"])

# 添加月份列,方便后续按月分析
df_sales["month"] = df_sales["sale_date"].dt.month_name()

# - 步骤4:数据分析 -
# 4.1 计算每月销售总额
monthly_sales = df_sales.groupby("month")["sale_amount"].sum().reset_index()
monthly_sales.columns = ["month", "total_sale"]
# 按月份顺序排序
month_order = ["January", "February", "March"]
monthly_sales["month"] = pd.Categorical(monthly_sales["month"], categories=month_order, ordered=True)
monthly_sales = monthly_sales.sort_values("month")
print("\n每月销售总额:")
print(monthly_sales)

# 4.2 热销商品TOP10
top10_products = df_sales.groupby("product_id")["sale_amount"].sum().reset_index()
top10_products = top10_products.sort_values("sale_amount", ascending=False).head(10)
print("\n热销商品TOP10:")
print(top10_products)

# 4.3 客户购买频次分布
customer_freq = df_sales.groupby("customer_id")["order_id"].nunique().reset_index()
customer_freq.columns = ["customer_id", "order_count"]
freq_distribution = customer_freq.groupby("order_count")["customer_id"].count().reset_index()
freq_distribution.columns = ["order_count", "customer_num"]
print("\n客户购买频次分布:")
print(freq_distribution)

# - 步骤5:数据可视化 -
# 设置中文字体,避免图表中文乱码
plt.rcParams["font.sans-serif"] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False

# 绘制每月销售总额柱状图
plt.figure(figsize=(10, 6))
plt.bar(monthly_sales["month"], monthly_sales["total_sale"], color="#1f77b4")
plt.title("2024年第一季度每月销售总额", fontsize=14)
plt.xlabel("月份", fontsize=12)
plt.ylabel("销售总额(元)", fontsize=12)
plt.grid(axis="y", linestyle="--", alpha=0.7)
plt.savefig("monthly_sales.png", dpi=300, bbox_inches="tight")
plt.show()

# - 步骤6:将分析结果写入BigQuery -
# 合并所有分析结果到一个DataFrame
# 为了简化,这里以每月销售总额为例写入BigQuery
pandas_gbq.to_gbq(
    dataframe=monthly_sales,
    destination_table="ecommerce.sales_analysis",
    project_id="my_ecommerce_project",
    if_exists="replace"
)

print("\n分析结果已成功写入BigQuery表!")

4.3 案例说明

  1. 数据读取与清洗:通过 read_gbq() 读取第一季度销售数据,使用 dropna() 删除缺失值,将日期字段转换为 datetime 类型,并添加月份列,为后续分析做准备。
  2. 核心分析:利用 pandas 的分组聚合功能,分别计算每月销售总额、热销商品 TOP10 和客户购买频次分布,这些分析结果能帮助业务部门了解销售趋势和客户行为。
  3. 数据可视化:使用 matplotlib 绘制每月销售总额柱状图,直观展示销售趋势,图表保存为图片文件,方便汇报使用。
  4. 结果存储:将每月销售总额的分析结果写入 BigQuery 的 sales_analysis 表中,实现分析结果的共享和长期存储。

五、pandas-gbq 相关资源地址

  • PyPI 地址:https://pypi.org/project/pandas-gbq
  • GitHub 地址:https://github.com/pandas-dev/pandas-gbq
  • 官方文档地址:https://pandas-gbq.readthedocs.io/

关注我,每天分享一个实用的Python自动化工具。

Python实用工具:minio库入门到精通——对象存储操作极简指南

一、minio库核心概述

minio是一款用于访问MinIO对象存储服务的Python客户端库,能实现存储桶的创建、删除,以及对象的上传、下载、删除等操作。其工作原理是通过与MinIO服务端的API接口进行交互,遵循S3协议规范,可轻松对接私有部署的MinIO服务或兼容S3协议的云存储服务。该库优点是轻量高效、API简洁易懂、兼容性强;缺点是对复杂的分布式存储集群管理支持有限,需依赖服务端配置。minio库采用Apache License 2.0开源协议,允许商业和非商业自由使用与修改。

二、minio库安装方法

在使用minio库之前,我们需要先确保本地已经安装了Python环境(建议Python 3.6及以上版本),然后通过pip包管理工具即可完成安装,具体命令如下:

pip install minio

执行上述命令后,pip会自动从PyPI下载并安装最新版本的minio库及其依赖项。安装完成后,我们可以在Python脚本中通过import minio来验证是否安装成功,若没有报错,则说明安装完成。

三、minio库核心操作实战

3.1 初始化MinIO客户端连接

要使用minio库操作MinIO对象存储,第一步是创建客户端连接对象,需要传入服务端的地址、Access Key、Secret Key以及是否启用HTTPS等参数。

代码示例

from minio import Minio
from minio.error import S3Error

def create_minio_client():
    # 配置MinIO服务端信息
    minio_server = "localhost:9000"  # MinIO服务地址,端口默认9000
    access_key = "your-access-key"    # 访问密钥,对应服务端配置
    secret_key = "your-secret-key"    # 秘密密钥,对应服务端配置
    # 初始化客户端,secure=False表示不使用HTTPS,生产环境建议开启
    client = Minio(
        minio_server,
        access_key=access_key,
        secret_key=secret_key,
        secure=False
    )
    return client

if __name__ == "__main__":
    try:
        client = create_minio_client()
        print("MinIO客户端连接成功!")
    except S3Error as e:
        print(f"客户端连接失败: {e}")

代码说明

  • 首先导入Minio类和异常处理类S3ErrorS3Error用于捕获MinIO操作过程中可能出现的各类错误。
  • create_minio_client函数中,填写MinIO服务端的实际地址、Access Key和Secret Key,这些参数需要与MinIO服务端的配置保持一致。
  • secure=False表示使用HTTP协议连接,若服务端配置了HTTPS,则需要将其改为True
  • 通过if __name__ == "__main__"主程序入口,调用函数创建客户端并测试连接,成功则打印提示信息,失败则捕获异常并输出错误原因。

3.2 存储桶(Bucket)操作

存储桶是MinIO中用于存放对象的容器,类似于文件系统中的文件夹,但级别更高。minio库提供了创建、查询、删除存储桶的完整方法。

3.2.1 创建存储桶

当我们需要存储对象时,首先要创建一个对应的存储桶,创建前可以先判断该存储桶是否已经存在,避免重复创建。

代码示例
from minio import Minio
from minio.error import S3Error

def create_bucket(client, bucket_name):
    # 检查存储桶是否存在
    if not client.bucket_exists(bucket_name):
        # 创建存储桶
        client.make_bucket(bucket_name)
        print(f"存储桶 {bucket_name} 创建成功!")
    else:
        print(f"存储桶 {bucket_name} 已存在,无需重复创建。")

if __name__ == "__main__":
    try:
        # 初始化客户端
        client = Minio(
            "localhost:9000",
            access_key="your-access-key",
            secret_key="your-secret-key",
            secure=False
        )
        # 定义要创建的存储桶名称
        bucket_name = "my-first-bucket"
        # 调用创建函数
        create_bucket(client, bucket_name)
    except S3Error as e:
        print(f"存储桶操作失败: {e}")
代码说明
  • bucket_exists方法接收存储桶名称作为参数,返回布尔值,True表示存在,False表示不存在。
  • make_bucket方法用于创建新的存储桶,参数为存储桶名称,需要注意存储桶名称的命名规范:只能包含小写字母、数字、连字符(-),且不能以连字符开头或结尾,长度在3-63个字符之间。

3.2.2 列出所有存储桶

在实际开发中,我们可能需要查看当前MinIO服务中所有的存储桶信息,minio库提供了list_buckets方法来实现这个功能。

代码示例
from minio import Minio
from minio.error import S3Error

def list_all_buckets(client):
    # 列出所有存储桶
    buckets = client.list_buckets()
    print("当前MinIO服务中的存储桶列表:")
    for bucket in buckets:
        print(f"存储桶名称: {bucket.name}, 创建时间: {bucket.creation_date}")

if __name__ == "__main__":
    try:
        client = Minio(
            "localhost:9000",
            access_key="your-access-key",
            secret_key="your-secret-key",
            secure=False
        )
        list_all_buckets(client)
    except S3Error as e:
        print(f"获取存储桶列表失败: {e}")
代码说明
  • list_buckets方法返回一个存储桶对象的列表,每个存储桶对象包含name(存储桶名称)和creation_date(创建时间)两个核心属性。
  • 通过循环遍历列表,我们可以逐个打印出所有存储桶的详细信息,方便进行管理和查看。

3.2.3 删除存储桶

对于不再需要的存储桶,我们可以使用remove_bucket方法将其删除,需要注意的是,删除存储桶前必须确保该存储桶内没有任何对象,否则删除操作会失败。

代码示例
from minio import Minio
from minio.error import S3Error

def delete_bucket(client, bucket_name):
    # 先检查存储桶是否存在
    if client.bucket_exists(bucket_name):
        # 尝试删除存储桶
        client.remove_bucket(bucket_name)
        print(f"存储桶 {bucket_name} 删除成功!")
    else:
        print(f"存储桶 {bucket_name} 不存在,无需删除。")

if __name__ == "__main__":
    try:
        client = Minio(
            "localhost:9000",
            access_key="your-access-key",
            secret_key="your-secret-key",
            secure=False
        )
        bucket_name = "my-first-bucket"
        delete_bucket(client, bucket_name)
    except S3Error as e:
        print(f"删除存储桶失败: {e}")
代码说明
  • remove_bucket方法接收存储桶名称作为参数,执行删除操作。
  • 如果存储桶内存在对象,调用该方法会抛出S3Error异常,提示“BucketNotEmpty”,因此在删除前可以先列出桶内对象并删除,再执行桶删除操作。

3.3 对象(Object)操作

对象是MinIO存储的基本单元,对应文件系统中的文件,minio库支持对象的上传、下载、删除、列出等核心操作,是日常开发中最常用的功能模块。

3.3.1 上传本地文件到存储桶

将本地文件上传到MinIO存储桶是最基础的需求之一,minio库提供了fput_object方法,支持本地文件的直接上传。

代码示例
from minio import Minio
from minio.error import S3Error

def upload_file_to_bucket(client, bucket_name, local_file_path, remote_file_name):
    # 检查存储桶是否存在,不存在则创建
    if not client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)
        print(f"存储桶 {bucket_name} 不存在,已自动创建。")
    # 上传文件
    client.fput_object(
        bucket_name=bucket_name,
        object_name=remote_file_name,
        file_path=local_file_path
    )
    print(f"文件 {local_file_path} 已成功上传到存储桶 {bucket_name},远程文件名: {remote_file_name}")

if __name__ == "__main__":
    try:
        client = Minio(
            "localhost:9000",
            access_key="your-access-key",
            secret_key="your-secret-key",
            secure=False
        )
        bucket_name = "my-file-bucket"
        local_file = "./test.txt"  # 本地待上传的文件路径
        remote_name = "uploaded_test.txt"  # 上传到存储桶后的文件名
        upload_file_to_bucket(client, bucket_name, local_file, remote_name)
    except S3Error as e:
        print(f"文件上传失败: {e}")
代码说明
  • fput_object方法有三个核心参数:bucket_name(目标存储桶名称)、object_name(上传后的对象名称)、file_path(本地文件的路径)。
  • 该方法会自动读取本地文件并上传到指定存储桶,上传完成后,我们可以通过MinIO的控制台或者其他工具查看上传的对象。
  • 如果需要上传大文件,可以考虑使用分片上传功能,minio库也提供了对应的multipart_upload相关方法。

3.3.2 从存储桶下载文件到本地

与上传操作相对应,fget_object方法可以将存储桶中的对象下载到本地指定路径。

代码示例
from minio import Minio
from minio.error import S3Error

def download_file_from_bucket(client, bucket_name, remote_file_name, local_file_path):
    # 检查存储桶是否存在
    if not client.bucket_exists(bucket_name):
        print(f"存储桶 {bucket_name} 不存在,无法下载文件。")
        return
    # 下载文件
    client.fget_object(
        bucket_name=bucket_name,
        object_name=remote_file_name,
        file_path=local_file_path
    )
    print(f"文件 {remote_file_name} 已从存储桶 {bucket_name} 下载到本地 {local_file_path}")

if __name__ == "__main__":
    try:
        client = Minio(
            "localhost:9000",
            access_key="your-access-key",
            secret_key="your-secret-key",
            secure=False
        )
        bucket_name = "my-file-bucket"
        remote_name = "uploaded_test.txt"
        local_file = "./downloaded_test.txt"
        download_file_from_bucket(client, bucket_name, remote_name, local_file)
    except S3Error as e:
        print(f"文件下载失败: {e}")
代码说明
  • fget_object方法的参数与fput_object类似,object_name为存储桶中的对象名称,file_path为本地保存的路径。
  • 如果指定的本地路径已存在同名文件,该方法会覆盖原有文件,因此在使用时需要注意文件的唯一性。

3.3.3 列出存储桶中的所有对象

当需要查看某个存储桶内的所有对象时,可以使用list_objects方法,该方法支持分页查询和前缀过滤。

代码示例
from minio import Minio
from minio.error import S3Error

def list_objects_in_bucket(client, bucket_name, prefix=None):
    if not client.bucket_exists(bucket_name):
        print(f"存储桶 {bucket_name} 不存在。")
        return
    # 列出桶内对象,prefix用于过滤对象名称前缀
    objects = client.list_objects(bucket_name, prefix=prefix, recursive=True)
    print(f"存储桶 {bucket_name} 中的对象列表:")
    for obj in objects:
        print(f"对象名称: {obj.object_name}, 大小: {obj.size} bytes, 最后修改时间: {obj.last_modified}")

if __name__ == "__main__":
    try:
        client = Minio(
            "localhost:9000",
            access_key="your-access-key",
            secret_key="your-secret-key",
            secure=False
        )
        bucket_name = "my-file-bucket"
        # 过滤前缀为"uploaded"的对象,若不指定前缀则传入None
        list_objects_in_bucket(client, bucket_name, prefix="uploaded")
    except S3Error as e:
        print(f"列出对象失败: {e}")
代码说明
  • list_objects方法的prefix参数用于过滤对象名称,例如传入prefix="uploaded",则只会列出名称以“uploaded”开头的对象。
  • recursive=True表示递归列出所有对象,包括子目录下的对象(MinIO中没有真正的目录,通过对象名称中的/模拟目录结构)。
  • 每个对象包含object_name(对象名称)、size(大小)、last_modified(最后修改时间)等属性,方便获取对象的详细信息。

3.3.4 删除存储桶中的对象

对于不需要的对象,可以使用remove_object方法进行删除,同时minio库还支持remove_objects方法批量删除多个对象。

代码示例(单个删除)
from minio import Minio
from minio.error import S3Error

def delete_single_object(client, bucket_name, object_name):
    if not client.bucket_exists(bucket_name):
        print(f"存储桶 {bucket_name} 不存在。")
        return
    # 删除单个对象
    client.remove_object(bucket_name, object_name)
    print(f"对象 {object_name} 已从存储桶 {bucket_name} 中删除。")

if __name__ == "__main__":
    try:
        client = Minio(
            "localhost:9000",
            access_key="your-access-key",
            secret_key="your-secret-key",
            secure=False
        )
        bucket_name = "my-file-bucket"
        object_name = "uploaded_test.txt"
        delete_single_object(client, bucket_name, object_name)
    except S3Error as e:
        print(f"删除对象失败: {e}")
代码示例(批量删除)
from minio import Minio
from minio.error import S3Error

def delete_multiple_objects(client, bucket_name, object_names):
    if not client.bucket_exists(bucket_name):
        print(f"存储桶 {bucket_name} 不存在。")
        return
    # 构建待删除对象的迭代器
    delete_objects = [{"name": name} for name in object_names]
    errors = client.remove_objects(bucket_name, delete_objects)
    # 处理删除过程中的错误
    for error in errors:
        print(f"删除对象 {error._object_name} 失败: {error}")
    print("批量删除操作执行完成。")

if __name__ == "__main__":
    try:
        client = Minio(
            "localhost:9000",
            access_key="your-access-key",
            secret_key="your-secret-key",
            secure=False
        )
        bucket_name = "my-file-bucket"
        objects_to_delete = ["uploaded_test.txt", "test_image.jpg"]
        delete_multiple_objects(client, bucket_name, objects_to_delete)
    except S3Error as e:
        print(f"批量删除失败: {e}")
代码说明
  • 单个删除使用remove_object方法,参数为存储桶名称和对象名称。
  • 批量删除使用remove_objects方法,需要传入一个包含对象名称的字典列表,该方法会返回一个错误迭代器,用于捕获删除失败的对象信息。

四、minio库实际应用案例——文件备份工具

在实际开发中,我们可以利用minio库快速实现一个本地文件备份工具,自动将指定目录下的文件备份到MinIO存储桶中,下面是完整的案例代码。

4.1 需求分析

  1. 遍历本地指定目录下的所有文件;
  2. 将文件自动上传到MinIO存储桶,以文件的修改时间作为前缀,避免文件名冲突;
  3. 上传完成后,记录备份日志。

4.2 完整代码实现

import os
import time
from minio import Minio
from minio.error import S3Error

class MinioFileBackup:
    def __init__(self, minio_server, access_key, secret_key, secure=False):
        # 初始化MinIO客户端
        self.client = Minio(
            minio_server,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure
        )

    def create_bucket_if_not_exists(self, bucket_name):
        """如果存储桶不存在则创建"""
        if not self.client.bucket_exists(bucket_name):
            self.client.make_bucket(bucket_name)
            print(f"存储桶 {bucket_name} 创建成功")

    def backup_files(self, local_dir, bucket_name):
        """备份本地目录下的所有文件到MinIO存储桶"""
        # 检查本地目录是否存在
        if not os.path.isdir(local_dir):
            print(f"本地目录 {local_dir} 不存在")
            return
        # 创建存储桶(如果不存在)
        self.create_bucket_if_not_exists(bucket_name)
        # 遍历本地目录
        for root, dirs, files in os.walk(local_dir):
            for file in files:
                local_file_path = os.path.join(root, file)
                # 获取文件的修改时间,格式化为YYYYMMDDHHMMSS
                mtime = os.path.getmtime(local_file_path)
                time_str = time.strftime("%Y%m%d%H%M%S", time.localtime(mtime))
                # 构建远程对象名称,格式:时间前缀_原文件名
                remote_file_name = f"{time_str}_{file}"
                try:
                    # 上传文件
                    self.client.fput_object(
                        bucket_name=bucket_name,
                        object_name=remote_file_name,
                        file_path=local_file_path
                    )
                    # 记录备份日志
                    log_content = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 成功备份文件: {local_file_path} -> {bucket_name}/{remote_file_name}\n"
                    self.write_backup_log(log_content)
                    print(f"成功备份: {local_file_path}")
                except S3Error as e:
                    error_log = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 备份文件失败: {local_file_path}, 错误信息: {e}\n"
                    self.write_backup_log(error_log)
                    print(f"备份失败: {local_file_path}, 错误: {e}")

    def write_backup_log(self, log_content):
        """写入备份日志到本地文件"""
        with open("backup_log.txt", "a", encoding="utf-8") as f:
            f.write(log_content)

if __name__ == "__main__":
    # 配置MinIO连接信息
    MINIO_SERVER = "localhost:9000"
    ACCESS_KEY = "your-access-key"
    SECRET_KEY = "your-secret-key"
    # 配置备份参数
    LOCAL_BACKUP_DIR = "./backup_source"  # 本地待备份的目录
    BUCKET_NAME = "my-backup-bucket"      # 目标存储桶名称

    # 创建备份工具实例并执行备份
    backup_tool = MinioFileBackup(MINIO_SERVER, ACCESS_KEY, SECRET_KEY, secure=False)
    backup_tool.backup_files(LOCAL_BACKUP_DIR, BUCKET_NAME)

4.3 代码说明

  1. 定义MinioFileBackup类,封装MinIO客户端初始化、存储桶创建、文件备份和日志记录等功能;
  2. __init__方法接收MinIO服务端信息,初始化客户端;
  3. create_bucket_if_not_exists方法用于检查并创建存储桶;
  4. backup_files方法是核心功能,通过os.walk遍历本地目录下的所有文件,获取每个文件的修改时间并作为前缀,避免上传到MinIO后文件名冲突;
  5. 上传文件时捕获S3Error异常,成功和失败的信息都会写入本地的backup_log.txt日志文件;
  6. write_backup_log方法负责将日志内容追加写入日志文件,方便后续查看备份记录。

五、minio库相关资源地址

  • PyPI地址:https://pypi.org/project/minio
  • Github地址:https://github.com/minio/minio-py
  • 官方文档地址:https://min.io/docs/minio/linux/developers/python/API.html

关注我,每天分享一个实用的Python自动化工具。