Python实用工具:Databases库高效操作数据库指南

一、Databases库核心概述

1.1 用途与工作原理

Databases是一款专为Python异步编程设计的数据库操作库,支持PostgreSQL、MySQL、SQLite等主流数据库,可配合异步框架(如FastAPI、Starlette)实现高性能数据库交互。其工作原理是封装不同数据库的异步驱动,提供统一的异步API,避免同步操作阻塞事件循环,提升程序并发处理能力。

1.2 优缺点分析

优点:API简洁统一,适配多种数据库;原生支持异步操作,契合现代异步Web框架;轻量级设计,无冗余依赖;支持SQLAlchemy核心表达式,兼顾灵活性与规范性。
缺点:仅支持异步操作,同步项目中需额外引入异步运行环境;部分高级数据库特性需依赖底层驱动实现;对复杂ORM场景的支持弱于SQLAlchemy。

1.3 License类型

Databases库采用BSD 3-Clause “New” or “Revised” License,这是一种宽松的开源许可证,允许用户自由使用、修改和分发代码,商用场景中只需保留原作者版权声明。

二、Databases库安装与环境准备

2.1 安装命令

Databases库的安装需区分数据库类型,核心库安装命令如下:

pip install databases

安装后需根据目标数据库安装对应的异步驱动,常用驱动安装命令:

  • SQLite(无需额外驱动,内置支持)
  • PostgreSQL
  pip install asyncpg
  • MySQL/MariaDB
  pip install aiomysql

2.2 环境验证

安装完成后,可通过以下代码验证环境是否配置成功(以SQLite为例):

import databases

# 定义SQLite数据库连接URL
DATABASE_URL = "sqlite:///./test.db"
# 初始化数据库连接对象
database = databases.Database(DATABASE_URL)

async def check_connection():
    # 连接数据库
    await database.connect()
    # 验证连接状态
    if database.is_connected:
        print("数据库连接成功!")
    else:
        print("数据库连接失败!")
    # 断开连接
    await database.disconnect()

# 运行异步函数
import asyncio
asyncio.run(check_connection())

代码说明:该脚本初始化SQLite数据库连接,通过connect()disconnect()方法管理连接状态,运行后若输出“数据库连接成功!”,则说明环境配置无误。

三、Databases库核心使用方法

3.1 数据库连接管理

数据库连接的创建与关闭是操作的基础,Databases库提供Database类封装连接逻辑,支持上下文管理器自动管理连接生命周期。

3.1.1 基本连接方式

以MySQL数据库为例,连接代码如下:

import databases
import asyncio

# MySQL数据库连接URL格式:mysql+aiomysql://用户名:密码@主机:端口/数据库名
DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/test_db"
database = databases.Database(DATABASE_URL)

async def basic_connection():
    # 手动连接
    await database.connect()
    print(f"连接状态: {database.is_connected}")
    # 手动断开
    await database.disconnect()
    print(f"连接状态: {database.is_connected}")

asyncio.run(basic_connection())

代码说明:Database类接收数据库连接URL作为参数,connect()方法用于建立连接,disconnect()方法用于关闭连接,is_connected属性可实时查看连接状态。

3.1.2 上下文管理器自动管理连接

使用async with上下文管理器可避免手动管理连接,代码更简洁安全:

async def context_manager_connection():
    async with database:
        print(f"上下文内连接状态: {database.is_connected}")
    # 上下文结束后自动断开连接
    print(f"上下文外连接状态: {database.is_connected}")

asyncio.run(context_manager_connection())

代码说明:进入async with块时自动调用connect(),退出时自动调用disconnect(),即使代码块内抛出异常,也能确保连接正常关闭。

3.2 执行SQL查询语句

Databases库支持直接执行原生SQL语句,涵盖查询、插入、更新、删除等核心操作,所有操作均为异步非阻塞。

3.2.1 创建数据表

在执行数据操作前,需先创建对应的数据表,以创建users表为例:

import databases
import asyncio

DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)

# 定义创建表的SQL语句
CREATE_USERS_TABLE = """
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    age INTEGER
);
"""

async def create_table():
    async with database:
        # 执行创建表的SQL语句
        await database.execute(query=CREATE_USERS_TABLE)
        print("users表创建成功!")

asyncio.run(create_table())

代码说明:execute()方法用于执行无返回结果的SQL语句(如CREATEINSERTUPDATEDELETE),这里通过该方法创建users表,包含id(主键)、nameemail(唯一约束)、age四个字段。

3.2.2 插入数据

插入单条数据和多条数据的方法如下:

# 定义插入单条数据的SQL语句
INSERT_USER = """
INSERT INTO users (name, email, age) VALUES (:name, :email, :age)
"""

# 定义插入多条数据的SQL语句
INSERT_MULTIPLE_USERS = """
INSERT INTO users (name, email, age) VALUES (:name, :email, :age)
"""

async def insert_data():
    async with database:
        # 插入单条数据
        user_id = await database.execute(
            query=INSERT_USER,
            values={"name": "张三", "email": "[email protected]", "age": 25}
        )
        print(f"插入单条数据成功,用户ID: {user_id}")

        # 插入多条数据
        users = [
            {"name": "李四", "email": "[email protected]", "age": 28},
            {"name": "王五", "email": "[email protected]", "age": 30}
        ]
        await database.execute_many(
            query=INSERT_MULTIPLE_USERS,
            values=users
        )
        print("插入多条数据成功!")

asyncio.run(insert_data())

代码说明:

  • execute()方法支持通过values参数传递参数化查询数据,避免SQL注入风险,返回值为插入数据的主键ID。
  • execute_many()方法用于批量插入数据,接收列表形式的参数化数据,适合大批量数据写入场景,提升操作效率。

3.2.3 查询数据

查询数据是最常用的操作,Databases库提供fetch_one()fetch_all()fetch_val()三种方法满足不同查询需求。

# 定义查询单条数据的SQL语句
SELECT_USER_BY_ID = "SELECT * FROM users WHERE id = :id"
# 定义查询所有数据的SQL语句
SELECT_ALL_USERS = "SELECT * FROM users"
# 定义查询用户总数的SQL语句
SELECT_USER_COUNT = "SELECT COUNT(*) FROM users"

async def query_data():
    async with database:
        # 查询单条数据
        user = await database.fetch_one(
            query=SELECT_USER_BY_ID,
            values={"id": 1}
        )
        print(f"单条用户数据: {user}")  # 输出形式为字典:{'id':1, 'name':'张三',...}

        # 查询所有数据
        all_users = await database.fetch_all(query=SELECT_ALL_USERS)
        print("所有用户数据:")
        for u in all_users:
            print(f"ID: {u['id']}, 姓名: {u['name']}, 邮箱: {u['email']}, 年龄: {u['age']}")

        # 查询单个值(用户总数)
        user_count = await database.fetch_val(query=SELECT_USER_COUNT)
        print(f"用户总数: {user_count}")

asyncio.run(query_data())

代码说明:

  • fetch_one():返回查询结果的第一条数据,无结果时返回None,适合根据主键查询单条记录的场景。
  • fetch_all():返回查询结果的所有数据,以列表形式存储,每个元素为字典类型,对应数据表的一行记录。
  • fetch_val():返回查询结果的第一个值,适合统计类查询(如COUNTSUM)。

3.2.4 更新与删除数据

更新和删除数据的操作与插入类似,均通过execute()方法执行对应的SQL语句:

# 定义更新数据的SQL语句
UPDATE_USER_AGE = "UPDATE users SET age = :age WHERE id = :id"
# 定义删除数据的SQL语句
DELETE_USER = "DELETE FROM users WHERE id = :id"

async def update_and_delete_data():
    async with database:
        # 更新数据
        update_rows = await database.execute(
            query=UPDATE_USER_AGE,
            values={"age": 26, "id": 1}
        )
        print(f"更新数据行数: {update_rows}")  # 返回受影响的行数

        # 删除数据
        delete_rows = await database.execute(
            query=DELETE_USER,
            values={"id": 3}
        )
        print(f"删除数据行数: {delete_rows}")

asyncio.run(update_and_delete_data())

代码说明:execute()方法执行更新和删除语句时,返回值为受影响的数据行数,可通过该返回值判断操作是否生效。

3.3 结合SQLAlchemy Core使用

Databases库支持与SQLAlchemy Core结合使用,无需编写原生SQL语句,通过Python对象定义数据表结构和查询逻辑,提升代码的可维护性。

3.3.1 定义数据表模型

首先通过SQLAlchemy Core定义users表模型:

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import select, update, delete, insert
import databases
import asyncio

DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
metadata = MetaData()

# 定义users表模型
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("name", String(50), nullable=False),
    Column("email", String(100), unique=True, nullable=False),
    Column("age", Integer)
)

# 创建数据表(同步操作,适用于初始化)
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)

代码说明:使用SQLAlchemy Core的Table类定义数据表结构,MetaData用于管理数据表元信息,create_all()方法用于同步创建所有定义的数据表。

3.3.2 执行CRUD操作

基于数据表模型执行CRUD操作,无需编写原生SQL:

async def sqlalchemy_crud():
    async with database:
        # 插入数据
        insert_query = users.insert().values(name="赵六", email="[email protected]", age=32)
        user_id = await database.execute(insert_query)
        print(f"插入数据成功,用户ID: {user_id}")

        # 查询数据
        select_query = select(users).where(users.c.id == user_id)
        user = await database.fetch_one(select_query)
        print(f"查询到的用户数据: {user}")

        # 更新数据
        update_query = update(users).where(users.c.id == user_id).values(age=33)
        update_rows = await database.execute(update_query)
        print(f"更新数据行数: {update_rows}")

        # 删除数据
        delete_query = delete(users).where(users.c.id == user_id)
        delete_rows = await database.execute(delete_query)
        print(f"删除数据行数: {delete_rows}")

asyncio.run(sqlalchemy_crud())

代码说明:SQLAlchemy Core提供insert()select()update()delete()等方法构建查询对象,Databases库可直接执行这些查询对象,实现与原生SQL一致的功能,同时提升代码的可读性和可维护性。

四、实际案例:异步用户管理系统

4.1 案例需求

构建一个简单的异步用户管理系统,支持用户的创建、查询、更新和删除操作,配合FastAPI框架实现Web接口(注:FastAPI为异步Web框架,与Databases库适配性极佳)。

4.2 项目结构

user_management_system/
├── main.py
└── test.db

4.3 代码实现

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import databases
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import select

# 配置数据库
DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
metadata = MetaData()

# 定义用户表模型
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("name", String(50), nullable=False),
    Column("email", String(100), unique=True, nullable=False),
    Column("age", Integer)
)

# 创建数据表
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)

# 初始化FastAPI应用
app = FastAPI(title="异步用户管理系统")

# 定义Pydantic数据模型,用于数据验证
class UserCreate(BaseModel):
    name: str
    email: str
    age: int

class UserResponse(UserCreate):
    id: int

    class Config:
        orm_mode = True

# 数据库连接与断开事件
@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

# 创建用户接口
@app.post("/users/", response_model=UserResponse, summary="创建新用户")
async def create_user(user: UserCreate):
    try:
        query = users.insert().values(**user.dict())
        user_id = await database.execute(query)
        return {**user.dict(), "id": user_id}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"创建用户失败: {str(e)}")

# 查询单个用户接口
@app.get("/users/{user_id}", response_model=UserResponse, summary="根据ID查询用户")
async def get_user(user_id: int):
    query = select(users).where(users.c.id == user_id)
    user = await database.fetch_one(query)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return user

# 查询所有用户接口
@app.get("/users/", summary="查询所有用户")
async def get_all_users():
    query = select(users)
    all_users = await database.fetch_all(query)
    return {"users": all_users}

# 更新用户接口
@app.put("/users/{user_id}", summary="更新用户信息")
async def update_user(user_id: int, user: UserCreate):
    query = users.update().where(users.c.id == user_id).values(**user.dict())
    update_rows = await database.execute(query)
    if update_rows == 0:
        raise HTTPException(status_code=404, detail="用户不存在")
    return {"message": "用户信息更新成功"}

# 删除用户接口
@app.delete("/users/{user_id}", summary="删除用户")
async def delete_user(user_id: int):
    query = users.delete().where(users.c.id == user_id)
    delete_rows = await database.execute(query)
    if delete_rows == 0:
        raise HTTPException(status_code=404, detail="用户不存在")
    return {"message": "用户删除成功"}

代码说明:

  1. 该案例结合FastAPI框架实现用户管理系统的Web接口,Pydantic用于请求数据验证和响应数据格式化。
  2. 通过FastAPI的startupshutdown事件,实现应用启动时自动连接数据库,关闭时自动断开连接。
  3. 每个接口对应用户的一种操作,通过Databases库执行SQLAlchemy Core构建的查询对象,实现异步数据库交互。
  4. 加入异常处理逻辑,确保接口返回友好的错误提示。

4.4 运行与测试

  1. 安装依赖:
   pip install databases fastapi uvicorn sqlalchemy pydantic
  1. 启动应用:
   uvicorn main:app --reload
  1. 访问接口文档:打开浏览器访问http://127.0.0.1:8000/docs,可通过自动生成的Swagger文档测试所有接口。

五、相关资源

  • Pypi地址:https://pypi.org/project/Databases
  • Github地址:https://github.com/encode/databases
  • 官方文档地址:https://www.encode.io/databases/

关注我,每天分享一个实用的Python自动化工具。