一、dataset库的用途、工作原理、优缺点及License
用途:dataset是一款轻量级Python库,核心作用是简化关系型数据库的操作流程,无需编写复杂的SQL语句,就能实现数据的增删改查,同时支持多种数据库(SQLite、MySQL、PostgreSQL等),特别适合快速开发原型、小型应用和数据处理脚本。
工作原理:基于SQLAlchemy构建,将数据库表映射为Python中的字典对象,通过直观的API调用完成数据库交互,自动处理表结构创建、字段类型推断等底层工作,降低数据库操作门槛。
优缺点:优点是语法简洁、上手快、支持多数据库、自动管理表结构;缺点是不适合复杂的数据库事务和高性能场景,灵活性略逊于原生SQL。
License:采用MIT开源许可证,可自由用于商业和非商业项目。

二、dataset库的安装
dataset库可通过Python包管理工具pip直接安装,安装命令非常简单,打开命令行终端,输入以下指令:
pip install dataset
该命令会自动下载并安装dataset库及其依赖项(如SQLAlchemy等)。安装完成后,在Python脚本中导入dataset即可开始使用。
三、dataset库的核心使用方式及实例代码
3.1 连接数据库
dataset支持多种数据库,不同数据库的连接字符串格式不同,下面是最常用的几种数据库连接方式。
3.1.1 连接SQLite数据库
SQLite是文件型数据库,无需额外配置服务器,适合本地开发和小型应用。连接SQLite时,只需指定数据库文件路径,若文件不存在,dataset会自动创建。
import dataset
# 连接SQLite数据库,文件名为mydatabase.db,不存在则自动创建
db = dataset.connect('sqlite:///mydatabase.db')
# 连接内存中的SQLite数据库(程序结束后数据消失,适合测试)
# db = dataset.connect('sqlite:///:memory:')
代码说明:dataset.connect()方法接收一个数据库连接字符串,SQLite的连接字符串格式为sqlite:///文件路径,内存数据库则用sqlite:///:memory:。
3.1.2 连接MySQL数据库
连接MySQL需要先安装对应的驱动mysql-connector-python或pymysql,这里以pymysql为例,先安装依赖:
pip install pymysql
然后编写连接代码:
import dataset
# MySQL连接字符串格式:mysql+pymysql://用户名:密码@主机地址:端口/数据库名
db = dataset.connect('mysql+pymysql://root:123456@localhost:3306/mydb')
代码说明:连接字符串中需要替换为自己的MySQL用户名、密码、主机和数据库名,若数据库不存在,需要先在MySQL中创建。
3.1.3 连接PostgreSQL数据库
连接PostgreSQL需要安装驱动psycopg2-binary,安装命令:
pip install psycopg2-binary
连接代码如下:
import dataset
# PostgreSQL连接字符串格式:postgresql://用户名:密码@主机地址:端口/数据库名
db = dataset.connect('postgresql://postgres:123456@localhost:5432/mydb')
代码说明:同样需要替换为实际的PostgreSQL连接信息,确保数据库服务已启动。
3.2 操作数据库表
dataset将数据库表视为“集合”(Table),通过db['表名']的方式获取表对象,无需提前创建表结构,插入数据时会自动推断字段类型并创建表。
3.2.1 创建表并插入数据
单条数据插入:使用表对象的insert()方法,传入字典类型的数据,字典的键对应表的字段名,值对应字段值。
import dataset
# 连接SQLite数据库
db = dataset.connect('sqlite:///mydatabase.db')
# 获取或创建名为user的表
user_table = db['user']
# 插入单条用户数据
user_data = {
'name': '张三',
'age': 25,
'gender': '男',
'email': '[email protected]'
}
# 执行插入操作,返回插入的数据(包含自动生成的id)
inserted_user = user_table.insert(user_data)
print('插入的用户数据:', inserted_user)
代码说明:当db['user']被调用时,若user表不存在,dataset会自动创建该表;insert()方法执行后,会返回插入的数据,其中id字段是默认自动生成的主键。
多条数据批量插入:使用insert_many()方法,传入包含多个字典的列表,适合一次性插入大量数据,效率高于单条插入。
import dataset
db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']
# 批量准备3条用户数据
batch_user_data = [
{'name': '李四', 'age': 28, 'gender': '女', 'email': '[email protected]'},
{'name': '王五', 'age': 30, 'gender': '男', 'email': '[email protected]'},
{'name': '赵六', 'age': 22, 'gender': '男', 'email': '[email protected]'}
]
# 执行批量插入
user_table.insert_many(batch_user_data)
print('批量插入完成!')
代码说明:insert_many()方法接收一个字典列表,一次性将所有数据插入表中,减少数据库交互次数,提升插入效率。
3.2.2 查询数据
dataset提供了多种灵活的查询方法,满足不同场景的查询需求,无需编写SELECT语句。
查询所有数据:使用表对象的all()方法,返回表中所有数据,结果为ResultIter对象,可迭代遍历。
import dataset
db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']
# 查询user表中所有数据
all_users = user_table.all()
# 遍历并打印所有用户信息
print('所有用户信息:')
for user in all_users:
print(f"ID: {user['id']}, 姓名: {user['name']}, 年龄: {user['age']}, 邮箱: {user['email']}")
代码说明:all()方法返回的ResultIter对象可以像列表一样遍历,每个元素是一个字典,对应表中的一行数据。
条件查询数据:使用find()方法,传入查询条件的关键字参数,实现按条件筛选数据;也可以使用find_one()方法查询满足条件的单条数据。
import dataset
db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']
# 条件查询:查询年龄等于25的用户
users_age_25 = user_table.find(age=25)
print('年龄为25的用户:')
for user in users_age_25:
print(user)
# 条件查询:查询性别为男且年龄大于25的用户
# 使用**kwargs传入多条件,默认是AND关系
male_users_over_25 = user_table.find(gender='男', age={'gt': 25})
print('\n性别为男且年龄大于25的用户:')
for user in male_users_over_25:
print(user)
# 查询单条数据:查询姓名为李四的用户
lisi_user = user_table.find_one(name='李四')
print('\n姓名为李四的用户:', lisi_user)
代码说明:find()方法的关键字参数支持多种比较运算符,如{'gt': 25}表示大于25,{'lt': 30}表示小于30,{'ge': 22}表示大于等于22,{'le': 28}表示小于等于28;find_one()方法返回满足条件的第一条数据,若没有则返回None。
排序和分页查询:在find()方法中通过_order_by参数指定排序字段,通过_limit和_offset参数实现分页。
import dataset
db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']
# 按年龄降序排序,查询前2条数据
sorted_users = user_table.find(_order_by='-age', _limit=2)
print('按年龄降序的前2名用户:')
for user in sorted_users:
print(user)
# 分页查询:第2页,每页2条数据(_offset表示跳过的条数,_offset=2表示跳过前2条)
page_users = user_table.find(_limit=2, _offset=2)
print('\n分页查询第2页数据:')
for user in page_users:
print(user)
代码说明:排序字段前加-表示降序,不加则为升序;分页时,_offset的计算方式为(页码-1)*每页条数,例如第2页、每页2条,_offset=2。
3.2.3 更新数据
使用表对象的update()方法更新数据,需要传入要更新的字段字典和查询条件字典,指定更新哪些数据。
import dataset
db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']
# 更新数据:将姓名为张三的用户的邮箱改为[email protected]
# 参数1:要更新的字段字典;参数2:查询条件字典
user_table.update(
{'email': '[email protected]'}, # 更新的字段
{'name': '张三'} # 更新条件
)
print('数据更新完成!')
# 验证更新结果
updated_zhangsan = user_table.find_one(name='张三')
print('更新后的张三信息:', updated_zhangsan)
代码说明:update()方法的第一个参数是要修改的字段和新值,第二个参数是筛选条件,只有满足条件的数据会被更新。
3.2.4 删除数据
使用delete()方法删除数据,传入查询条件字典,删除满足条件的记录。
import dataset
db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']
# 删除数据:删除姓名为赵六的用户
user_table.delete(name='赵六')
print('数据删除完成!')
# 验证删除结果
deleted_user = user_table.find_one(name='赵六')
print('删除后查询赵六:', deleted_user) # 输出None,表示已删除
代码说明:delete()方法根据传入的条件删除数据,执行后无法撤销,使用时需要谨慎。
3.3 事务管理
事务是数据库操作的重要特性,确保一系列操作要么全部成功,要么全部失败。dataset通过db.begin()、db.commit()和db.rollback()方法实现事务管理。
import dataset
db = dataset.connect('sqlite:///mydatabase.db')
user_table = db['user']
# 开启事务
db.begin()
try:
# 执行多个数据库操作
user_table.insert({'name': '孙七', 'age': 35, 'gender': '男', 'email': '[email protected]'})
user_table.update({'age': 36}, {'name': '孙七'})
# 提交事务:所有操作生效
db.commit()
print('事务提交成功!')
except Exception as e:
# 发生异常时回滚事务:所有操作撤销
db.rollback()
print(f'事务执行失败,已回滚,错误信息:{e}')
代码说明:db.begin()开启事务,在try块中执行数据库操作,若全部成功则调用db.commit()提交事务;若发生异常,则调用db.rollback()回滚事务,撤销所有未提交的操作,保证数据一致性。
四、实际案例:使用dataset库构建简易学生成绩管理系统
4.1 案例需求
构建一个简易的学生成绩管理系统,支持以下功能:
- 添加学生成绩信息(学号、姓名、科目、分数);
- 查询指定学生的所有科目成绩;
- 更新指定学生指定科目的分数;
- 删除指定学号的学生成绩信息;
- 查询所有学生的成绩并按分数降序排序。
4.2 代码实现
import dataset
class ScoreManagementSystem:
def __init__(self, db_path):
# 初始化数据库连接
self.db = dataset.connect(f'sqlite:///{db_path}')
# 获取或创建score表
self.score_table = self.db['score']
def add_score(self, student_id, name, subject, score):
"""添加学生成绩信息"""
score_data = {
'student_id': student_id,
'name': name,
'subject': subject,
'score': score
}
self.score_table.insert(score_data)
print(f'成功添加 {name} 的 {subject} 成绩:{score}')
def query_student_score(self, student_id):
"""查询指定学生的所有科目成绩"""
scores = self.score_table.find(student_id=student_id, _order_by='subject')
print(f'\n学号 {student_id} 的学生成绩:')
for s in scores:
print(f'科目:{s["subject"]}, 分数:{s["score"]}')
return scores
def update_score(self, student_id, subject, new_score):
"""更新指定学生指定科目的分数"""
self.score_table.update(
{'score': new_score},
{'student_id': student_id, 'subject': subject}
)
print(f'\n成功更新学号 {student_id} {subject} 科目分数为:{new_score}')
def delete_student_score(self, student_id):
"""删除指定学号的学生成绩信息"""
self.score_table.delete(student_id=student_id)
print(f'\n成功删除学号 {student_id} 的所有成绩信息')
def query_all_scores(self):
"""查询所有学生成绩并按分数降序排序"""
all_scores = self.score_table.find(_order_by='-score')
print('\n所有学生成绩(按分数降序):')
for s in all_scores:
print(f'学号:{s["student_id"]}, 姓名:{s["name"]}, 科目:{s["subject"]}, 分数:{s["score"]}')
return all_scores
# 实例化成绩管理系统
if __name__ == '__main__':
sms = ScoreManagementSystem('student_scores.db')
# 1. 添加学生成绩
sms.add_score('2024001', '张三', '数学', 95)
sms.add_score('2024001', '张三', '语文', 88)
sms.add_score('2024002', '李四', '数学', 92)
sms.add_score('2024003', '王五', '英语', 90)
# 2. 查询指定学生成绩
sms.query_student_score('2024001')
# 3. 更新学生成绩
sms.update_score('2024001', '语文', 90)
sms.query_student_score('2024001') # 验证更新结果
# 4. 查询所有学生成绩
sms.query_all_scores()
# 5. 删除指定学生成绩
sms.delete_student_score('2024003')
sms.query_all_scores() # 验证删除结果
代码说明:本案例通过面向对象的方式封装了学生成绩管理的核心功能,利用dataset库的增删改查API实现数据操作,无需编写任何SQL语句。运行代码后,会自动创建student_scores.db数据库文件,并在其中生成score表,存储学生成绩信息。
五、dataset库相关资源
- Pypi地址:https://pypi.org/project/dataset
- Github地址:https://github.com/pudo/dataset
- 官方文档地址:https://dataset.readthedocs.io/
关注我,每天分享一个实用的Python自动化工具。

