一、psycopg3 库概述
psycopg3 是 Python 编程语言中用于连接和操作 PostgreSQL 数据库的高性能适配器,也是 psycopg2 的官方升级版本。它遵循 Python DB API 2.0 规范,能够实现 Python 程序与 PostgreSQL 数据库的高效数据交互,支持异步操作、类型适配、事务管理等核心功能。

工作原理上,psycopg3 通过底层的 libpq 库(PostgreSQL 官方客户端库)建立 Python 与数据库的通信通道,将 Python 数据类型转换为 PostgreSQL 支持的类型,同时把数据库返回的结果转换为 Python 原生对象,实现双向数据无缝流转。
该库的优点十分突出:支持异步 I/O 操作,适配 asyncio 框架;性能较 psycopg2 大幅提升;对 Python 3.8+ 版本兼容性好;支持 PostgreSQL 10+ 所有新特性;提供灵活的参数化查询,有效防止 SQL 注入攻击。缺点则是目前生态插件较少,部分旧项目迁移需要修改代码;对低版本 Python(3.7 及以下)不支持。
psycopg3 的开源协议为 GNU Lesser General Public License (LGPL) 3.0,允许用户自由使用、修改和分发,可用于商业项目开发。
二、psycopg3 安装与环境配置
2.1 前置条件
在安装 psycopg3 之前,需要确保系统满足以下两个核心条件:
- Python 环境:版本 3.8 或更高,推荐使用 3.10+ 稳定版本。
- PostgreSQL 环境:本地或远程服务器已安装 PostgreSQL 10 或更高版本,且确保数据库服务处于运行状态。
- 依赖库:系统需要安装 libpq 开发包,不同操作系统安装命令如下:
- Ubuntu/Debian 系统
bash sudo apt-get install libpq-dev python3-dev - CentOS/RHEL 系统
bash sudo yum install postgresql-devel python3-devel - Windows 系统
无需手动安装,psycopg3 的 Windows 版本已内置相关依赖。
- Ubuntu/Debian 系统
2.2 安装 psycopg3
psycopg3 已发布到 PyPI 仓库,可通过 pip 包管理器一键安装,这是最简单且推荐的方式。
2.2.1 基础安装命令
打开终端或命令提示符,执行以下命令:
pip install psycopg32.2.2 指定版本安装
如果需要安装特定版本的 psycopg3(例如 3.1.12),可以执行:
pip install psycopg3==3.1.122.2.3 验证安装是否成功
安装完成后,可通过 Python 交互式环境验证是否安装成功。打开 Python 终端,输入以下代码:
import psycopg
print(psycopg.__version__)如果终端输出 psycopg3 的版本号(如 3.1.12),则说明安装成功;若出现 ModuleNotFoundError 错误,则需要检查 pip 安装路径或 Python 环境配置。
三、psycopg3 核心用法与代码示例
psycopg3 的核心操作围绕 数据库连接、游标对象、数据查询、数据增删改、事务管理 展开,下面结合具体代码示例详细讲解每个功能的使用方法。
3.1 数据库连接与关闭
要操作 PostgreSQL 数据库,第一步是建立 Python 程序与数据库的连接。psycopg3 提供 psycopg.connect() 函数创建连接对象,连接参数需要包含数据库地址、端口、用户名、密码、数据库名等信息。
3.1.1 基础连接示例
import psycopg
# 定义数据库连接参数
conn_params = {
"host": "localhost", # 数据库服务器地址,本地为localhost
"port": 5432, # PostgreSQL 默认端口为5432
"user": "postgres", # 数据库用户名,默认管理员用户为postgres
"password": "123456", # 数据库密码,安装时设置
"dbname": "testdb" # 要连接的数据库名,需提前创建
}
# 建立数据库连接
try:
conn = psycopg.connect(**conn_params)
print("数据库连接成功!")
except psycopg.OperationalError as e:
print(f"数据库连接失败:{e}")
finally:
# 关闭数据库连接
if conn:
conn.close()
print("数据库连接已关闭!")代码说明:
psycopg.connect()函数接收关键字参数,返回一个Connection对象,代表与数据库的会话。- 使用
try-except捕获连接异常(如密码错误、数据库不存在、服务未启动等),避免程序崩溃。 - 最后通过
conn.close()关闭连接,释放数据库资源,这是必须执行的步骤。
3.1.2 使用上下文管理器(推荐)
手动关闭连接容易遗漏,psycopg3 支持使用 with 语句(上下文管理器)自动管理连接的创建和关闭,这是更优雅、更推荐的写法。
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
# 使用with语句自动管理连接
with psycopg.connect(**conn_params) as conn:
print("数据库连接成功!")
# 后续数据库操作写在这里
print("数据库连接已自动关闭!")代码说明:
- 当
with代码块执行完毕后,无论是否发生异常,连接都会自动关闭,无需手动调用conn.close()。 - 这种写法可以有效避免因忘记关闭连接导致的资源泄露问题。
3.2 游标对象与基本数据操作
建立数据库连接后,需要通过 游标对象(Cursor) 执行 SQL 语句。游标是数据库操作的核心接口,负责提交 SQL 命令并获取执行结果。
3.2.1 创建游标对象
在 with 语句块内,通过 conn.cursor() 方法创建游标对象:
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
# 创建游标对象
with conn.cursor() as cur:
print("游标对象创建成功!")代码说明:
- 游标对象也支持
with语句,执行完毕后自动关闭,释放游标资源。 - 所有 SQL 语句的执行都需要通过游标对象的方法实现。
3.2.2 创建数据表
下面通过游标执行 CREATE TABLE 语句,创建一个名为 students 的数据表,用于存储学生信息。
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
# 定义创建数据表的SQL语句
create_table_sql = """
CREATE TABLE IF NOT EXISTS students (
id SERIAL PRIMARY KEY,
name VARCHAR(50) NOT NULL,
age INTEGER NOT NULL,
gender VARCHAR(10),
score NUMERIC(5, 2)
);
"""
# 执行SQL语句
cur.execute(create_table_sql)
# 提交事务(psycopg3默认开启事务,执行修改操作后需要提交)
conn.commit()
print("数据表students创建成功!")代码说明:
CREATE TABLE IF NOT EXISTS表示如果数据表不存在则创建,避免重复创建导致的错误。SERIAL类型是 PostgreSQL 的自增整数类型,作为主键使用。cur.execute(sql)方法用于执行 SQL 语句,参数为字符串格式的 SQL 命令。conn.commit()用于提交事务,psycopg3 默认处于事务模式,所有对数据库的修改操作(创建表、插入、更新、删除)都需要提交事务才能生效。
3.2.3 插入数据
向数据表中插入数据有两种方式:单条数据插入和批量数据插入,psycopg3 推荐使用 参数化查询 方式,避免 SQL 注入攻击。
(1)单条数据插入
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
# 定义插入数据的SQL语句,使用%s作为参数占位符
insert_sql = """
INSERT INTO students (name, age, gender, score)
VALUES (%s, %s, %s, %s);
"""
# 定义要插入的数据
student_data = ("张三", 18, "男", 95.5)
# 执行插入操作
cur.execute(insert_sql, student_data)
# 提交事务
conn.commit()
print(f"成功插入 {cur.rowcount} 条数据!")代码说明:
- SQL 语句中使用
%s作为参数占位符,无论参数类型是什么,都统一使用%s,这是 psycopg3 的固定语法。 cur.execute()方法的第二个参数是一个元组,包含要插入的具体数据,元组长度必须与占位符数量一致。cur.rowcount属性返回受上一条 SQL 语句影响的行数,用于验证数据是否插入成功。
(2)批量数据插入
如果需要插入多条数据,使用 cur.executemany() 方法,效率远高于多次执行 cur.execute()。
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
insert_sql = """
INSERT INTO students (name, age, gender, score)
VALUES (%s, %s, %s, %s);
"""
# 定义多条学生数据,列表中每个元素是一个元组
students_data = [
("李四", 19, "女", 92.0),
("王五", 18, "男", 88.5),
("赵六", 20, "女", 96.0)
]
# 执行批量插入
cur.executemany(insert_sql, students_data)
conn.commit()
print(f"成功插入 {cur.rowcount} 条数据!")代码说明:
cur.executemany()方法接收两个参数:SQL 语句和包含多个元组的列表。- 批量插入可以减少与数据库的交互次数,大幅提升数据插入效率,适合大量数据插入场景。
3.2.4 查询数据
查询数据是数据库操作中最常用的功能,psycopg3 提供了多种方式获取查询结果,包括 cur.fetchone()、cur.fetchmany() 和 cur.fetchall()。
(1)获取所有数据(fetchall)
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
# 定义查询SQL语句
select_sql = "SELECT * FROM students;"
# 执行查询
cur.execute(select_sql)
# 获取所有查询结果
all_students = cur.fetchall()
# 遍历结果并打印
print("所有学生信息:")
for student in all_students:
print(f"ID: {student[0]}, 姓名: {student[1]}, 年龄: {student[2]}, 性别: {student[3]}, 分数: {student[4]}")代码说明:
cur.fetchall()方法返回一个列表,列表中的每个元素是一个元组,对应数据表中的一行数据。- 元组中的元素顺序与 SQL 查询语句中的字段顺序一致,例如
SELECT *表示按数据表字段顺序返回所有字段。
(2)获取单条数据(fetchone)
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
select_sql = "SELECT name, score FROM students WHERE age = %s;"
# 查询年龄为18的学生
cur.execute(select_sql, (18,))
# 获取第一条匹配的数据
student = cur.fetchone()
if student:
print(f"18岁学生信息:姓名={student[0]}, 分数={student[1]}")
else:
print("未找到18岁的学生!")代码说明:
cur.fetchone()方法返回查询结果的第一条数据,类型为元组;如果没有匹配数据,返回None。- 该方法适合只需要获取单条数据的场景,例如根据主键查询数据。
(3)获取指定数量数据(fetchmany)
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
select_sql = "SELECT name, age FROM students ORDER BY score DESC;"
cur.execute(select_sql)
# 获取前2条数据
top_students = cur.fetchmany(2)
print("分数最高的2名学生:")
for student in top_students:
print(f"姓名: {student[0]}, 年龄: {student[1]}")代码说明:
cur.fetchmany(size)方法接收一个整数参数size,表示要获取的数据条数,返回一个包含指定数量元组的列表。- 该方法适合分页查询场景,避免一次性获取大量数据导致内存占用过高。
3.2.5 更新数据
更新数据使用 UPDATE 语句,同样需要使用参数化查询,并提交事务才能生效。
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
# 定义更新SQL语句,将张三的分数更新为98.0
update_sql = "UPDATE students SET score = %s WHERE name = %s;"
cur.execute(update_sql, (98.0, "张三"))
conn.commit()
print(f"成功更新 {cur.rowcount} 条数据!")代码说明:
UPDATE语句中的WHERE子句用于指定更新条件,避免误更新整个数据表的所有数据。cur.rowcount返回被更新的行数,可用于验证更新操作是否成功。
3.2.6 删除数据
删除数据使用 DELETE 语句,同样需要指定条件,防止误删所有数据。
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
# 定义删除SQL语句,删除分数低于90的学生
delete_sql = "DELETE FROM students WHERE score < %s;"
cur.execute(delete_sql, (90.0,))
conn.commit()
print(f"成功删除 {cur.rowcount} 条数据!")代码说明:
DELETE语句中的WHERE子句是关键,若省略WHERE子句,将删除数据表中的所有数据。- 删除操作不可逆,执行前需谨慎确认条件是否正确。
3.3 事务管理
事务是数据库操作的基本单位,具有 原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability) 四个特性,简称 ACID。psycopg3 默认开启事务,所有修改操作都需要通过 conn.commit() 提交,若发生错误,可通过 conn.rollback() 回滚事务,撤销所有未提交的操作。
3.3.1 事务提交与回滚示例
import psycopg
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
with psycopg.connect(**conn_params) as conn:
with conn.cursor() as cur:
try:
# 插入一条数据
insert_sql = "INSERT INTO students (name, age, gender, score) VALUES (%s, %s, %s, %s);"
cur.execute(insert_sql, ("孙七", 19, "男", 85.0))
# 故意触发错误(例如插入重复主键,这里用错误的SQL语句模拟)
error_sql = "INSERT INTO students (id, name) VALUES (1, '孙七');"
cur.execute(error_sql)
# 提交事务(如果前面出现错误,这行代码不会执行)
conn.commit()
print("事务提交成功!")
except psycopg.Error as e:
# 发生错误,回滚事务
conn.rollback()
print(f"事务执行失败,已回滚:{e}")代码说明:
- 当事务中的任何一个操作发生错误时,
except块会捕获异常,并执行conn.rollback(),撤销所有已执行但未提交的操作。 - 事务回滚可以保证数据库数据的一致性,避免因部分操作成功、部分操作失败导致的数据错乱。
3.4 异步操作
psycopg3 支持异步操作,通过 psycopg.asyncpg 模块实现,适配 Python 的 asyncio 框架,适合高并发的异步应用场景(如异步 Web 框架 FastAPI)。
3.4.1 异步连接与数据查询示例
import asyncio
import psycopg
from psycopg import AsyncConnection
# 定义异步函数
async def async_query():
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
# 建立异步数据库连接
async with await psycopg.AsyncConnection.connect(**conn_params) as conn:
# 创建异步游标
async with conn.cursor() as cur:
# 执行异步查询
await cur.execute("SELECT name, score FROM students;")
# 获取查询结果
results = await cur.fetchall()
print("异步查询结果:")
for name, score in results:
print(f"姓名: {name}, 分数: {score}")
# 运行异步函数
if __name__ == "__main__":
asyncio.run(async_query())代码说明:
- 异步操作需要使用
psycopg.AsyncConnection类,通过await关键字执行异步方法。 async with语句用于管理异步连接和异步游标,自动处理资源的创建和释放。- 异步操作可以显著提升高并发场景下的程序性能,避免因同步等待数据库响应导致的阻塞。
四、实际应用案例:学生成绩管理系统
下面结合一个实际案例,展示如何使用 psycopg3 开发一个简单的学生成绩管理系统,实现学生信息的添加、查询、更新和删除功能。
4.1 案例需求
- 能够添加新学生的成绩信息。
- 能够根据学生姓名查询成绩。
- 能够根据学生 ID 更新成绩。
- 能够根据学生姓名删除学生信息。
- 所有操作需要包含异常处理,确保程序稳定性。
4.2 完整代码实现
import psycopg
from typing import Optional, List, Tuple
class StudentScoreManager:
def __init__(self, conn_params: dict):
"""初始化管理器,接收数据库连接参数"""
self.conn_params = conn_params
def add_student(self, name: str, age: int, gender: str, score: float) -> bool:
"""添加新学生信息
Args:
name: 学生姓名
age: 学生年龄
gender: 学生性别
score: 学生分数
Returns:
添加成功返回True,失败返回False
"""
try:
with psycopg.connect(**self.conn_params) as conn:
with conn.cursor() as cur:
insert_sql = """
INSERT INTO students (name, age, gender, score)
VALUES (%s, %s, %s, %s);
"""
cur.execute(insert_sql, (name, age, gender, score))
conn.commit()
print(f"添加学生 {name} 成功!")
return True
except psycopg.Error as e:
print(f"添加学生失败:{e}")
return False
def query_student(self, name: str) -> Optional[Tuple]:
"""根据姓名查询学生信息
Args:
name: 学生姓名
Returns:
找到学生返回元组,未找到返回None
"""
try:
with psycopg.connect(**self.conn_params) as conn:
with conn.cursor() as cur:
select_sql = """
SELECT id, name, age, gender, score FROM students WHERE name = %s;
"""
cur.execute(select_sql, (name,))
student = cur.fetchone()
if student:
print(f"查询到学生信息:ID={student[0]}, 姓名={student[1]}, 年龄={student[2]}, 性别={student[3]}, 分数={student[4]}")
return student
else:
print(f"未找到姓名为 {name} 的学生!")
return None
except psycopg.Error as e:
print(f"查询学生失败:{e}")
return None
def update_score(self, student_id: int, new_score: float) -> bool:
"""根据学生ID更新分数
Args:
student_id: 学生ID
new_score: 新分数
Returns:
更新成功返回True,失败返回False
"""
try:
with psycopg.connect(**self.conn_params) as conn:
with conn.cursor() as cur:
update_sql = "UPDATE students SET score = %s WHERE id = %s;"
cur.execute(update_sql, (new_score, student_id))
conn.commit()
if cur.rowcount > 0:
print(f"更新ID为 {student_id} 的学生分数成功!")
return True
else:
print(f"未找到ID为 {student_id} 的学生!")
return False
except psycopg.Error as e:
print(f"更新分数失败:{e}")
return False
def delete_student(self, name: str) -> bool:
"""根据姓名删除学生信息
Args:
name: 学生姓名
Returns:
删除成功返回True,失败返回False
"""
try:
with psycopg.connect(**self.conn_params) as conn:
with conn.cursor() as cur:
delete_sql = "DELETE FROM students WHERE name = %s;"
cur.execute(delete_sql, (name,))
conn.commit()
if cur.rowcount > 0:
print(f"删除学生 {name} 成功!")
return True
else:
print(f"未找到姓名为 {name} 的学生!")
return False
except psycopg.Error as e:
print(f"删除学生失败:{e}")
return False
def list_all_students(self) -> List[Tuple]:
"""查询所有学生信息
Returns:
包含所有学生信息的列表
"""
try:
with psycopg.connect(**self.conn_params) as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM students;")
all_students = cur.fetchall()
print(f"共查询到 {len(all_students)} 名学生信息:")
for student in all_students:
print(f"ID: {student[0]}, 姓名: {student[1]}, 年龄: {student[2]}, 性别: {student[3]}, 分数: {student[4]}")
return all_students
except psycopg.Error as e:
print(f"查询所有学生失败:{e}")
return []
# 主程序入口
if __name__ == "__main__":
# 配置数据库连接参数
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
# 创建学生成绩管理器实例
manager = StudentScoreManager(conn_params)
# 执行各项操作
manager.add_student("周八", 19, "男", 91.5)
manager.query_student("周八")
manager.update_score(5, 94.0) # 假设周八的ID为5
manager.list_all_students()
manager.delete_student("周八")
manager.list_all_students()代码说明:
- 该案例通过面向对象的方式封装了学生成绩管理的核心功能,每个方法对应一个具体的数据库操作。
- 每个方法都包含了异常处理,确保程序在遇到数据库错误时不会崩溃,同时输出错误信息便于调试。
- 主程序入口创建了管理器实例,并依次执行添加、查询、更新、删除和列表查询操作,展示了完整的业务流程。
五、psycopg3 高级特性
5.1 类型适配
psycopg3 能够自动将 Python 数据类型转换为 PostgreSQL 支持的类型,同时也能将 PostgreSQL 类型转换为 Python 类型。例如:
- Python 的
int→ PostgreSQL 的INTEGER - Python 的
float→ PostgreSQL 的NUMERIC - Python 的
str→ PostgreSQL 的VARCHAR - Python 的
datetime.date→ PostgreSQL 的DATE
如果需要自定义类型适配,可以使用 psycopg.types 模块进行扩展。
5.2 连接池
在高并发应用中,频繁创建和关闭数据库连接会消耗大量资源,psycopg3 推荐使用连接池技术管理数据库连接。可以使用第三方库 psycopg_pool(psycopg 官方提供的连接池库)实现连接池功能。
5.2.1 安装 psycopg_pool
pip install psycopg_pool5.2.2 连接池使用示例
import psycopg
from psycopg_pool import ConnectionPool
# 创建连接池
conn_params = {
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "123456",
"dbname": "testdb"
}
pool = ConnectionPool(conninfo=conn_params, min_size=2, max_size=10)
# 从连接池获取连接
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT name FROM students LIMIT 2;")
print(cur.fetchall())
# 关闭连接池
pool.close()代码说明:
ConnectionPool类的min_size参数表示连接池的最小连接数,max_size表示最大连接数。- 使用
pool.connection()从连接池获取连接,使用完毕后自动归还到连接池,无需手动关闭。
六、相关资源链接
- Pypi地址:https://pypi.org/project/psycopg3
- Github地址:https://github.com/psycopg/psycopg
- 官方文档地址:https://www.psycopg.org/psycopg3/docs/
关注我,每天分享一个实用的Python自动化工具。

