一、Databases库核心概述
1.1 用途与工作原理
Databases是一款专为Python异步编程设计的数据库操作库,支持PostgreSQL、MySQL、SQLite等主流数据库,可配合异步框架(如FastAPI、Starlette)实现高性能数据库交互。其工作原理是封装不同数据库的异步驱动,提供统一的异步API,避免同步操作阻塞事件循环,提升程序并发处理能力。

1.2 优缺点分析
优点:API简洁统一,适配多种数据库;原生支持异步操作,契合现代异步Web框架;轻量级设计,无冗余依赖;支持SQLAlchemy核心表达式,兼顾灵活性与规范性。
缺点:仅支持异步操作,同步项目中需额外引入异步运行环境;部分高级数据库特性需依赖底层驱动实现;对复杂ORM场景的支持弱于SQLAlchemy。
1.3 License类型
Databases库采用BSD 3-Clause “New” or “Revised” License,这是一种宽松的开源许可证,允许用户自由使用、修改和分发代码,商用场景中只需保留原作者版权声明。
二、Databases库安装与环境准备
2.1 安装命令
Databases库的安装需区分数据库类型,核心库安装命令如下:
pip install databases
安装后需根据目标数据库安装对应的异步驱动,常用驱动安装命令:
- SQLite(无需额外驱动,内置支持)
- PostgreSQL
pip install asyncpg
- MySQL/MariaDB
pip install aiomysql
2.2 环境验证
安装完成后,可通过以下代码验证环境是否配置成功(以SQLite为例):
import databases
# 定义SQLite数据库连接URL
DATABASE_URL = "sqlite:///./test.db"
# 初始化数据库连接对象
database = databases.Database(DATABASE_URL)
async def check_connection():
# 连接数据库
await database.connect()
# 验证连接状态
if database.is_connected:
print("数据库连接成功!")
else:
print("数据库连接失败!")
# 断开连接
await database.disconnect()
# 运行异步函数
import asyncio
asyncio.run(check_connection())
代码说明:该脚本初始化SQLite数据库连接,通过connect()和disconnect()方法管理连接状态,运行后若输出“数据库连接成功!”,则说明环境配置无误。
三、Databases库核心使用方法
3.1 数据库连接管理
数据库连接的创建与关闭是操作的基础,Databases库提供Database类封装连接逻辑,支持上下文管理器自动管理连接生命周期。
3.1.1 基本连接方式
以MySQL数据库为例,连接代码如下:
import databases
import asyncio
# MySQL数据库连接URL格式:mysql+aiomysql://用户名:密码@主机:端口/数据库名
DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/test_db"
database = databases.Database(DATABASE_URL)
async def basic_connection():
# 手动连接
await database.connect()
print(f"连接状态: {database.is_connected}")
# 手动断开
await database.disconnect()
print(f"连接状态: {database.is_connected}")
asyncio.run(basic_connection())
代码说明:Database类接收数据库连接URL作为参数,connect()方法用于建立连接,disconnect()方法用于关闭连接,is_connected属性可实时查看连接状态。
3.1.2 上下文管理器自动管理连接
使用async with上下文管理器可避免手动管理连接,代码更简洁安全:
async def context_manager_connection():
async with database:
print(f"上下文内连接状态: {database.is_connected}")
# 上下文结束后自动断开连接
print(f"上下文外连接状态: {database.is_connected}")
asyncio.run(context_manager_connection())
代码说明:进入async with块时自动调用connect(),退出时自动调用disconnect(),即使代码块内抛出异常,也能确保连接正常关闭。
3.2 执行SQL查询语句
Databases库支持直接执行原生SQL语句,涵盖查询、插入、更新、删除等核心操作,所有操作均为异步非阻塞。
3.2.1 创建数据表
在执行数据操作前,需先创建对应的数据表,以创建users表为例:
import databases
import asyncio
DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
# 定义创建表的SQL语句
CREATE_USERS_TABLE = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
age INTEGER
);
"""
async def create_table():
async with database:
# 执行创建表的SQL语句
await database.execute(query=CREATE_USERS_TABLE)
print("users表创建成功!")
asyncio.run(create_table())
代码说明:execute()方法用于执行无返回结果的SQL语句(如CREATE、INSERT、UPDATE、DELETE),这里通过该方法创建users表,包含id(主键)、name、email(唯一约束)、age四个字段。
3.2.2 插入数据
插入单条数据和多条数据的方法如下:
# 定义插入单条数据的SQL语句
INSERT_USER = """
INSERT INTO users (name, email, age) VALUES (:name, :email, :age)
"""
# 定义插入多条数据的SQL语句
INSERT_MULTIPLE_USERS = """
INSERT INTO users (name, email, age) VALUES (:name, :email, :age)
"""
async def insert_data():
async with database:
# 插入单条数据
user_id = await database.execute(
query=INSERT_USER,
values={"name": "张三", "email": "[email protected]", "age": 25}
)
print(f"插入单条数据成功,用户ID: {user_id}")
# 插入多条数据
users = [
{"name": "李四", "email": "[email protected]", "age": 28},
{"name": "王五", "email": "[email protected]", "age": 30}
]
await database.execute_many(
query=INSERT_MULTIPLE_USERS,
values=users
)
print("插入多条数据成功!")
asyncio.run(insert_data())
代码说明:
execute()方法支持通过values参数传递参数化查询数据,避免SQL注入风险,返回值为插入数据的主键ID。execute_many()方法用于批量插入数据,接收列表形式的参数化数据,适合大批量数据写入场景,提升操作效率。
3.2.3 查询数据
查询数据是最常用的操作,Databases库提供fetch_one()、fetch_all()、fetch_val()三种方法满足不同查询需求。
# 定义查询单条数据的SQL语句
SELECT_USER_BY_ID = "SELECT * FROM users WHERE id = :id"
# 定义查询所有数据的SQL语句
SELECT_ALL_USERS = "SELECT * FROM users"
# 定义查询用户总数的SQL语句
SELECT_USER_COUNT = "SELECT COUNT(*) FROM users"
async def query_data():
async with database:
# 查询单条数据
user = await database.fetch_one(
query=SELECT_USER_BY_ID,
values={"id": 1}
)
print(f"单条用户数据: {user}") # 输出形式为字典:{'id':1, 'name':'张三',...}
# 查询所有数据
all_users = await database.fetch_all(query=SELECT_ALL_USERS)
print("所有用户数据:")
for u in all_users:
print(f"ID: {u['id']}, 姓名: {u['name']}, 邮箱: {u['email']}, 年龄: {u['age']}")
# 查询单个值(用户总数)
user_count = await database.fetch_val(query=SELECT_USER_COUNT)
print(f"用户总数: {user_count}")
asyncio.run(query_data())
代码说明:
fetch_one():返回查询结果的第一条数据,无结果时返回None,适合根据主键查询单条记录的场景。fetch_all():返回查询结果的所有数据,以列表形式存储,每个元素为字典类型,对应数据表的一行记录。fetch_val():返回查询结果的第一个值,适合统计类查询(如COUNT、SUM)。
3.2.4 更新与删除数据
更新和删除数据的操作与插入类似,均通过execute()方法执行对应的SQL语句:
# 定义更新数据的SQL语句
UPDATE_USER_AGE = "UPDATE users SET age = :age WHERE id = :id"
# 定义删除数据的SQL语句
DELETE_USER = "DELETE FROM users WHERE id = :id"
async def update_and_delete_data():
async with database:
# 更新数据
update_rows = await database.execute(
query=UPDATE_USER_AGE,
values={"age": 26, "id": 1}
)
print(f"更新数据行数: {update_rows}") # 返回受影响的行数
# 删除数据
delete_rows = await database.execute(
query=DELETE_USER,
values={"id": 3}
)
print(f"删除数据行数: {delete_rows}")
asyncio.run(update_and_delete_data())
代码说明:execute()方法执行更新和删除语句时,返回值为受影响的数据行数,可通过该返回值判断操作是否生效。
3.3 结合SQLAlchemy Core使用
Databases库支持与SQLAlchemy Core结合使用,无需编写原生SQL语句,通过Python对象定义数据表结构和查询逻辑,提升代码的可维护性。
3.3.1 定义数据表模型
首先通过SQLAlchemy Core定义users表模型:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import select, update, delete, insert
import databases
import asyncio
DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
metadata = MetaData()
# 定义users表模型
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", String(50), nullable=False),
Column("email", String(100), unique=True, nullable=False),
Column("age", Integer)
)
# 创建数据表(同步操作,适用于初始化)
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)
代码说明:使用SQLAlchemy Core的Table类定义数据表结构,MetaData用于管理数据表元信息,create_all()方法用于同步创建所有定义的数据表。
3.3.2 执行CRUD操作
基于数据表模型执行CRUD操作,无需编写原生SQL:
async def sqlalchemy_crud():
async with database:
# 插入数据
insert_query = users.insert().values(name="赵六", email="[email protected]", age=32)
user_id = await database.execute(insert_query)
print(f"插入数据成功,用户ID: {user_id}")
# 查询数据
select_query = select(users).where(users.c.id == user_id)
user = await database.fetch_one(select_query)
print(f"查询到的用户数据: {user}")
# 更新数据
update_query = update(users).where(users.c.id == user_id).values(age=33)
update_rows = await database.execute(update_query)
print(f"更新数据行数: {update_rows}")
# 删除数据
delete_query = delete(users).where(users.c.id == user_id)
delete_rows = await database.execute(delete_query)
print(f"删除数据行数: {delete_rows}")
asyncio.run(sqlalchemy_crud())
代码说明:SQLAlchemy Core提供insert()、select()、update()、delete()等方法构建查询对象,Databases库可直接执行这些查询对象,实现与原生SQL一致的功能,同时提升代码的可读性和可维护性。
四、实际案例:异步用户管理系统
4.1 案例需求
构建一个简单的异步用户管理系统,支持用户的创建、查询、更新和删除操作,配合FastAPI框架实现Web接口(注:FastAPI为异步Web框架,与Databases库适配性极佳)。
4.2 项目结构
user_management_system/
├── main.py
└── test.db
4.3 代码实现
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import databases
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import select
# 配置数据库
DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
metadata = MetaData()
# 定义用户表模型
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", String(50), nullable=False),
Column("email", String(100), unique=True, nullable=False),
Column("age", Integer)
)
# 创建数据表
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)
# 初始化FastAPI应用
app = FastAPI(title="异步用户管理系统")
# 定义Pydantic数据模型,用于数据验证
class UserCreate(BaseModel):
name: str
email: str
age: int
class UserResponse(UserCreate):
id: int
class Config:
orm_mode = True
# 数据库连接与断开事件
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
# 创建用户接口
@app.post("/users/", response_model=UserResponse, summary="创建新用户")
async def create_user(user: UserCreate):
try:
query = users.insert().values(**user.dict())
user_id = await database.execute(query)
return {**user.dict(), "id": user_id}
except Exception as e:
raise HTTPException(status_code=400, detail=f"创建用户失败: {str(e)}")
# 查询单个用户接口
@app.get("/users/{user_id}", response_model=UserResponse, summary="根据ID查询用户")
async def get_user(user_id: int):
query = select(users).where(users.c.id == user_id)
user = await database.fetch_one(query)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
# 查询所有用户接口
@app.get("/users/", summary="查询所有用户")
async def get_all_users():
query = select(users)
all_users = await database.fetch_all(query)
return {"users": all_users}
# 更新用户接口
@app.put("/users/{user_id}", summary="更新用户信息")
async def update_user(user_id: int, user: UserCreate):
query = users.update().where(users.c.id == user_id).values(**user.dict())
update_rows = await database.execute(query)
if update_rows == 0:
raise HTTPException(status_code=404, detail="用户不存在")
return {"message": "用户信息更新成功"}
# 删除用户接口
@app.delete("/users/{user_id}", summary="删除用户")
async def delete_user(user_id: int):
query = users.delete().where(users.c.id == user_id)
delete_rows = await database.execute(query)
if delete_rows == 0:
raise HTTPException(status_code=404, detail="用户不存在")
return {"message": "用户删除成功"}
代码说明:
- 该案例结合FastAPI框架实现用户管理系统的Web接口,Pydantic用于请求数据验证和响应数据格式化。
- 通过FastAPI的
startup和shutdown事件,实现应用启动时自动连接数据库,关闭时自动断开连接。 - 每个接口对应用户的一种操作,通过Databases库执行SQLAlchemy Core构建的查询对象,实现异步数据库交互。
- 加入异常处理逻辑,确保接口返回友好的错误提示。
4.4 运行与测试
- 安装依赖:
pip install databases fastapi uvicorn sqlalchemy pydantic
- 启动应用:
uvicorn main:app --reload
- 访问接口文档:打开浏览器访问
http://127.0.0.1:8000/docs,可通过自动生成的Swagger文档测试所有接口。
五、相关资源
- Pypi地址:https://pypi.org/project/Databases
- Github地址:https://github.com/encode/databases
- 官方文档地址:https://www.encode.io/databases/
关注我,每天分享一个实用的Python自动化工具。

