Python实用工具Piccolo详解:轻量级ORM的高效使用指南

一、Piccolo库核心概述

Piccolo是一款专为Python开发者设计的轻量级异步ORM(对象关系映射)框架,主要用于简化数据库的操作流程,支持PostgreSQL、SQLite等主流数据库,同时兼容同步与异步编程模式。其工作原理是将Python类映射为数据库表,通过面向对象的语法替代原生SQL语句,降低数据库操作的复杂度。

该库的优点突出:异步特性适配高并发场景,语法简洁易上手,支持自动生成迁移文件,且体积小巧、无过多依赖;缺点则是生态相较于Django ORM、SQLAlchemy更小众,部分高级功能有待完善。Piccolo采用MIT开源许可证,允许开发者自由使用、修改和分发,无商业使用限制。

二、Piccolo库安装与环境配置

2.1 安装命令

Piccolo支持通过pip包管理器一键安装,无论是同步环境还是异步环境,安装命令一致。打开终端,输入以下命令:

pip install piccolo

安装完成后,可通过以下命令验证安装是否成功:

python -m piccolo --version

若终端输出Piccolo的版本号,说明安装成功。

2.2 支持的Python与数据库版本

  • Python版本:推荐Python 3.8及以上版本,确保异步特性和语法兼容性。
  • 数据库版本:
  • SQLite:3.20.0及以上版本(无需额外配置,开箱即用);
  • PostgreSQL:10.0及以上版本(需提前安装并启动数据库服务)。

三、Piccolo核心功能与基础用法

3.1 定义数据库表模型

Piccolo的核心是通过Table类定义数据库表结构,每个类属性对应表中的一个字段。我们以创建一个“用户信息表”为例,演示模型定义的方法。

3.1.1 基础模型定义代码

from piccolo.table import Table
from piccolo.columns import Varchar, Int, Boolean, Timestamp

class User(Table):
    """
    用户信息表模型
    字段说明:
    - username: 用户名,字符串类型,长度50,非空且唯一
    - age: 年龄,整数类型
    - is_active: 是否激活,布尔类型,默认值为True
    - create_time: 创建时间,时间戳类型,默认自动填充当前时间
    """
    username = Varchar(length=50, null=False, unique=True)
    age = Int(null=True)
    is_active = Boolean(default=True)
    create_time = Timestamp(default=lambda: datetime.now())

代码说明

  1. 导入Table基类和需要的字段类型(VarcharInt等);
  2. 定义User类并继承Table,该类会被映射为数据库中的user表;
  3. 每个类属性对应表字段,通过参数指定字段约束(如null=False表示非空,unique=True表示唯一)。

3.1.2 字段类型与常用约束

Piccolo提供了丰富的字段类型,满足不同业务需求,常见字段及约束如下:
| 字段类型 | 作用 | 常用约束 |
|-||-|
| Varchar | 字符串类型 | length(长度)、null(是否允许空)、unique(是否唯一) |
| Int | 整数类型 | default(默认值)、choices(可选值列表) |
| Boolean | 布尔类型 | default(默认值) |
| Timestamp | 时间戳类型 | default(默认值,支持lambda函数) |
| ForeignKey | 外键类型 | references(关联的表模型) |

3.2 生成数据库迁移文件

在Piccolo中,模型定义完成后,需要生成迁移文件来创建对应的数据库表。迁移文件是数据库结构变更的记录,确保不同环境下的数据库结构一致。

3.2.1 初始化迁移环境

首先,在项目根目录下执行以下命令,初始化Piccolo的配置文件和迁移目录:

piccolo project new my_project

执行完成后,项目会生成piccolo_conf.py配置文件和migrations目录。

3.2.2 配置数据库连接

打开piccolo_conf.py文件,修改数据库连接配置。以SQLite为例:

from piccolo.conf.apps import AppConfig
from piccolo.engine.sqlite import SQLiteEngine

# SQLite数据库连接配置
DB = SQLiteEngine(path="my_database.db")

# 注册包含表模型的应用
APP_CONFIG = AppConfig(
    app_name="my_app",
    migrations_folder_path="my_app/migrations",
    table_classes=["my_app.tables.User"],
)

若使用PostgreSQL,配置如下:

from piccolo.engine.postgres import PostgresEngine

DB = PostgresEngine(
    config={
        "database": "my_db",
        "user": "postgres",
        "password": "123456",
        "host": "localhost",
        "port": 5432,
    }
)

3.2.3 创建并应用迁移文件

  1. 生成迁移文件:执行以下命令,Piccolo会自动检测模型变化并生成迁移文件。
piccolo migrations new my_app --auto
  1. 应用迁移文件:将迁移文件中的变更同步到数据库,创建user表。
piccolo migrations forwards my_app

执行成功后,数据库中会生成对应的user表结构。

3.3 数据的增删改查操作

Piccolo支持同步和异步两种数据操作方式,以下分别演示两种模式下的增删改查(CRUD)操作。

3.3.1 同步操作示例

from datetime import datetime
from my_app.tables import User

# 1. 新增数据(Create)
def add_user():
    # 方式一:通过类实例化并保存
    user1 = User(
        username="alice",
        age=25,
        is_active=True,
        create_time=datetime.now()
    )
    user1.save()  # 保存到数据库

    # 方式二:使用create方法直接创建
    User.create(username="bob", age=30, is_active=False)

# 2. 查询数据(Read)
def query_users():
    # 查询所有用户
    all_users = User.objects().all()
    for user in all_users:
        print(f"用户名:{user.username},年龄:{user.age}")

    # 条件查询:查询年龄大于25的激活用户
    active_users = User.objects().where(
        (User.age > 25) & (User.is_active == True)
    )
    print(f"年龄大于25的激活用户数量:{active_users.count()}")

    # 查询单个用户:根据用户名查询
    user = User.objects().get(User.username == "alice")
    print(f"Alice的年龄:{user.age}")

# 3. 更新数据(Update)
def update_user():
    # 修改单个用户的年龄
    user = User.objects().get(User.username == "bob")
    user.age = 31
    user.save()

    # 批量更新:将所有激活用户的年龄加1
    User.objects().where(User.is_active == True).update({User.age: User.age + 1})

# 4. 删除数据(Delete)
def delete_user():
    # 删除单个用户
    user = User.objects().get(User.username == "bob")
    user.remove()

    # 批量删除:删除年龄小于20的用户
    User.objects().where(User.age < 20).remove()

# 执行操作
if __name__ == "__main__":
    add_user()
    query_users()
    update_user()
    delete_user()

代码说明

  • 新增数据:支持实例化对象后save()和直接调用create()两种方式;
  • 查询数据:使用objects()获取查询集,通过where()添加条件,get()查询单条数据,count()统计数量;
  • 更新数据:支持单条数据修改后save()和批量update()
  • 删除数据:支持单条数据remove()和批量删除。

3.3.2 异步操作示例

Piccolo的异步特性基于asyncio实现,适合高并发场景,异步操作的语法与同步操作类似,只需使用async/await关键字。

import asyncio
from datetime import datetime
from my_app.tables import User

# 异步新增数据
async def async_add_user():
    user1 = User(username="charlie", age=28)
    await user1.save()  # 异步保存
    await User.create(username="david", age=22, is_active=False)

# 异步查询数据
async def async_query_users():
    all_users = await User.objects().all()
    for user in all_users:
        print(f"异步查询 - 用户名:{user.username},年龄:{user.age}")

    # 异步条件查询
    active_users = await User.objects().where(User.is_active == True).run()
    print(f"异步查询 - 激活用户数量:{len(active_users)}")

# 异步更新数据
async def async_update_user():
    await User.objects().where(User.username == "charlie").update({User.age: 29})

# 异步删除数据
async def async_delete_user():
    await User.objects().where(User.username == "david").remove()

# 执行异步操作
async def main():
    await async_add_user()
    await async_query_users()
    await async_update_user()
    await async_delete_user()

if __name__ == "__main__":
    asyncio.run(main())

代码说明

  • 异步操作需在async函数中执行,通过await调用Piccolo的异步方法;
  • run()方法用于执行异步查询集,获取结果列表;
  • 最后通过asyncio.run()启动异步事件循环。

四、Piccolo高级功能与应用

4.1 表关联(外键)操作

在实际项目中,表与表之间通常存在关联关系,如“用户表”和“订单表”的一对多关系。以下演示如何通过Piccolo定义外键关联并进行关联查询。

4.1.1 定义关联表模型

from piccolo.table import Table
from piccolo.columns import Varchar, Int, ForeignKey, Decimal
from my_app.tables import User

class Order(Table):
    """
    订单表模型
    外键关联User表,一个用户可以有多个订单
    """
    order_no = Varchar(length=30, unique=True, null=False)  # 订单编号
    amount = Decimal(precision=10, scale=2)  # 订单金额
    user = ForeignKey(references=User)  # 外键关联用户表

# 生成并应用迁移文件,创建order表
# 命令:piccolo migrations new my_app --auto && piccolo migrations forwards my_app

4.1.2 关联查询操作

from my_app.tables import User, Order

# 同步关联查询:查询某个用户的所有订单
def query_user_orders():
    user = User.objects().get(User.username == "alice")
    # 通过外键反向查询用户的订单
    orders = Order.objects().where(Order.user == user)
    for order in orders:
        print(f"用户{user.username}的订单:{order.order_no},金额:{order.amount}")

# 异步关联查询
async def async_query_user_orders():
    user = await User.objects().get(User.username == "alice")
    orders = await Order.objects().where(Order.user == user).run()
    for order in orders:
        print(f"异步查询 - 用户{user.username}的订单:{order.order_no}")

# 执行查询
query_user_orders()
asyncio.run(async_query_user_orders())

代码说明

  • 外键通过ForeignKey字段定义,references参数指定关联的表模型;
  • 关联查询时,可通过外键字段作为条件,查询关联表的数据。

4.2 数据筛选与排序

Piccolo提供了丰富的筛选和排序方法,满足复杂的查询需求。

from my_app.tables import User

# 数据筛选:多条件组合、模糊查询
def filter_users():
    # 模糊查询:用户名包含"li"的用户
    users = User.objects().where(User.username.like("%li%"))

    # 范围查询:年龄在20-30之间的用户
    users = User.objects().where((User.age >= 20) & (User.age <= 30))

    # 排序:按年龄降序排列
    sorted_users = User.objects().order_by(User.age, ascending=False)
    for user in sorted_users:
        print(f"用户名:{user.username},年龄:{user.age}")

filter_users()

代码说明

  • like()方法用于模糊查询,%表示通配符;
  • order_by()方法用于排序,ascending=False表示降序。

4.3 数据库事务操作

事务可以确保一系列数据库操作要么全部成功,要么全部失败,保证数据一致性。Piccolo支持同步和异步事务。

4.3.1 同步事务示例

from piccolo.utils.transaction import transaction
from my_app.tables import User

@transaction()
def transaction_demo():
    # 事务内的操作
    User.create(username="eva", age=24)
    User.create(username="frank", age=26)
    # 若执行过程中抛出异常,事务会回滚
    # 例如:raise Exception("模拟异常,事务回滚")

# 执行事务
transaction_demo()

4.3.2 异步事务示例

from piccolo.utils.transaction import async_transaction

@async_transaction()
async def async_transaction_demo():
    await User.create(username="grace", age=27)
    await User.create(username="henry", age=29)

asyncio.run(async_transaction_demo())

代码说明

  • 同步事务使用@transaction()装饰器,异步事务使用@async_transaction()装饰器;
  • 事务内的所有操作会被包裹,若出现异常则自动回滚。

五、实际项目案例:用户管理系统

我们以一个简单的用户管理系统为例,整合Piccolo的核心功能,实现用户的注册、查询、更新和删除功能。

5.1 项目目录结构

my_user_system/
├── my_app/
│   ├── __init__.py
│   ├── tables.py       # 表模型定义
│   └── operations.py   # 业务逻辑操作
├── piccolo_conf.py     # Piccolo配置文件
└── main.py             # 程序入口

5.2 代码实现

5.2.1 tables.py(表模型)

from piccolo.table import Table
from piccolo.columns import Varchar, Int, Boolean, Timestamp
from datetime import datetime

class User(Table):
    username = Varchar(length=50, null=False, unique=True)
    age = Int(null=True)
    is_active = Boolean(default=True)
    create_time = Timestamp(default=lambda: datetime.now())

5.2.2 operations.py(业务逻辑)

import asyncio
from my_app.tables import User

# 同步业务操作
class SyncUserOperations:
    @staticmethod
    def register_user(username, age):
        """用户注册"""
        if User.objects().where(User.username == username).exists():
            print(f"用户名{username}已存在")
            return False
        User.create(username=username, age=age)
        print(f"用户{username}注册成功")
        return True

    @staticmethod
    def get_user(username):
        """查询用户"""
        try:
            user = User.objects().get(User.username == username)
            return {
                "username": user.username,
                "age": user.age,
                "is_active": user.is_active,
                "create_time": user.create_time
            }
        except Exception:
            return None

    @staticmethod
    def update_user_age(username, new_age):
        """更新用户年龄"""
        user = User.objects().get(User.username == username)
        if not user:
            return False
        user.age = new_age
        user.save()
        return True

# 异步业务操作
class AsyncUserOperations:
    @staticmethod
    async def register_user(username, age):
        if await User.objects().where(User.username == username).exists():
            print(f"用户名{username}已存在")
            return False
        await User.create(username=username, age=age)
        print(f"用户{username}注册成功")
        return True

    @staticmethod
    async def get_user(username):
        try:
            user = await User.objects().get(User.username == username)
            return {
                "username": user.username,
                "age": user.age,
                "is_active": user.is_active,
                "create_time": user.create_time
            }
        except Exception:
            return None

5.2.3 main.py(程序入口)

import asyncio
from my_app.operations import SyncUserOperations, AsyncUserOperations

# 同步操作演示
def sync_demo():
    SyncUserOperations.register_user("user1", 22)
    SyncUserOperations.register_user("user1", 23)  # 重复注册
    user = SyncUserOperations.get_user("user1")
    print(f"查询用户:{user}")
    SyncUserOperations.update_user_age("user1", 24)
    user = SyncUserOperations.get_user("user1")
    print(f"更新后用户信息:{user}")

# 异步操作演示
async def async_demo():
    await AsyncUserOperations.register_user("user2", 25)
    user = await AsyncUserOperations.get_user("user2")
    print(f"异步查询用户:{user}")

if __name__ == "__main__":
    sync_demo()
    asyncio.run(async_demo())

5.3 运行项目

  1. 配置piccolo_conf.py文件,设置数据库连接;
  2. 生成并应用迁移文件:
piccolo migrations new my_app --auto
piccolo migrations forwards my_app
  1. 运行main.py
python main.py

终端会输出用户注册、查询和更新的结果,验证功能的正确性。

六、Piccolo相关资源链接

  • Pypi地址:https://pypi.org/project/piccolos
  • Github地址:https://github.com/xxxxx/xxxxxx
  • 官方文档地址:https://www.xxxxx.com/xxxxxx

关注我,每天分享一个实用的Python自动化工具。