PyMySQL:Python连接MySQL数据库的桥梁
Python作为一种功能强大且应用广泛的编程语言,凭借其简洁易读的语法和丰富的库支持,在Web开发、数据分析、人工智能、自动化脚本等众多领域都发挥着重要作用。无论是构建高效的Web应用、进行复杂的数据挖掘,还是开发智能的机器学习模型,Python都能提供合适的工具和库。在数据存储与管理方面,MySQL是一种广泛使用的关系型数据库管理系统,而PyMySQL则为Python开发者提供了一个便捷、高效的方式来连接和操作MySQL数据库。
PyMySQL是一个纯Python实现的MySQL数据库连接库,它允许Python程序通过标准的MySQL协议与MySQL数据库进行通信。其工作原理是通过实现MySQL客户端协议,将Python代码中的SQL语句转换为MySQL服务器能够理解的格式,并将服务器返回的结果转换为Python数据结构。PyMySQL的优点显著,它不需要依赖MySQL C客户端库,纯Python实现使得它具有良好的跨平台性,能够在各种操作系统上轻松安装和使用;同时,它的API设计简洁明了,易于学习和使用,与Python的数据库API标准(DB-API 2.0)兼容,方便开发者快速上手。不过,由于是纯Python实现,在处理大量数据或高并发场景时,性能可能会略逊于基于C语言的MySQL连接库。PyMySQL采用MIT许可证,这意味着它可以自由地用于商业和非商业项目,为开发者提供了极大的便利。
安装PyMySQL
在开始使用PyMySQL之前,需要先安装它。PyMySQL可以通过pip包管理器轻松安装,以下是安装命令:
pip install pymysql
安装完成后,就可以在Python代码中导入并使用PyMySQL了。
连接MySQL数据库
使用PyMySQL连接MySQL数据库是使用该库的第一步。下面是一个简单的示例,展示了如何连接到MySQL数据库:
import pymysql
# 建立数据库连接
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
print("数据库连接成功!")
except pymysql.Error as e:
print(f"数据库连接失败:{e}")
finally:
# 关闭数据库连接
if connection:
connection.close()
print("数据库连接已关闭。")
在这个示例中,我们使用pymysql.connect()方法建立与MySQL数据库的连接。该方法接受多个参数,包括数据库服务器的主机名(host)、用户名(user)、密码(password)、要连接的数据库名(database)等。cursorclass=pymysql.cursors.DictCursor参数指定使用字典游标,这样查询结果将以字典形式返回,方便我们访问和处理数据。
为了确保资源的正确释放,我们使用了try-except-finally结构。在try块中尝试建立连接,如果成功则打印连接成功的消息;在except块中捕获可能出现的异常并打印错误信息;在finally块中关闭数据库连接,确保无论连接是否成功,最终都会释放资源。
创建数据库表
连接到数据库后,通常需要创建表来存储数据。以下是一个创建用户表的示例:
import pymysql
try:
# 建立数据库连接
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
# 创建一个游标对象
with connection.cursor() as cursor:
# 定义创建表的SQL语句
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
# 执行SQL语句
cursor.execute(create_table_sql)
print("表创建成功!")
# 提交事务
connection.commit()
except pymysql.Error as e:
print(f"创建表时出错:{e}")
finally:
# 关闭数据库连接
if connection:
connection.close()
print("数据库连接已关闭。")
在这个示例中,我们首先建立了与数据库的连接。然后,创建了一个游标对象,游标是执行SQL语句和获取查询结果的关键对象。使用游标对象的execute()方法执行创建表的SQL语句。这里创建了一个名为users的表,包含id、username、email、age和created_at等字段,其中id是主键,email字段设置为唯一,确保不会有重复的邮箱地址。
需要注意的是,执行完创建表的SQL语句后,我们使用connection.commit()方法提交事务。在PyMySQL中,对于修改数据库的操作(如创建表、插入数据、更新数据等),需要显式提交事务才能使更改生效。
插入数据
创建表后,就可以向表中插入数据了。PyMySQL提供了多种插入数据的方式,下面分别介绍。
插入单条数据
import pymysql
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 定义插入数据的SQL语句
insert_sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
# 准备数据
user_data = ('john_doe', 'john@example.com', 30)
# 执行SQL语句
cursor.execute(insert_sql, user_data)
print(f"成功插入 {cursor.rowcount} 条数据!")
# 提交事务
connection.commit()
except pymysql.Error as e:
print(f"插入数据时出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
在这个示例中,我们使用参数化查询的方式插入数据。参数化查询不仅可以防止SQL注入攻击,还能提高查询效率。SQL语句中的%s是占位符,用于表示后续要传入的参数。cursor.execute()方法的第一个参数是SQL语句,第二个参数是一个元组,包含了要插入的数据。执行插入操作后,cursor.rowcount属性返回受影响的行数,即成功插入的数据条数。
批量插入数据
import pymysql
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 定义插入数据的SQL语句
insert_sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
# 准备多条数据
users_data = [
('alice_smith', 'alice@example.com', 25),
('bob_johnson', 'bob@example.com', 35),
('charlie_brown', 'charlie@example.com', 40)
]
# 执行批量插入
cursor.executemany(insert_sql, users_data)
print(f"成功插入 {cursor.rowcount} 条数据!")
# 提交事务
connection.commit()
except pymysql.Error as e:
print(f"批量插入数据时出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
当需要插入多条数据时,可以使用cursor.executemany()方法。该方法接受两个参数,第一个是SQL语句,第二个是一个包含多个元组的列表,每个元组代表一条记录的数据。使用executemany()方法可以显著提高批量插入数据的效率。
查询数据
查询数据是数据库操作中最常见的操作之一。PyMySQL提供了多种查询数据的方式,下面分别介绍。
查询单条数据
import pymysql
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 定义查询SQL语句
select_sql = "SELECT * FROM users WHERE id = %s"
# 执行查询
cursor.execute(select_sql, (1,))
# 获取单条结果
user = cursor.fetchone()
if user:
print("查询结果:")
for key, value in user.items():
print(f"{key}: {value}")
else:
print("未找到匹配的记录。")
except pymysql.Error as e:
print(f"查询数据时出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
在这个示例中,我们使用cursor.fetchone()方法获取查询结果的第一条记录。由于我们在连接数据库时指定了cursorclass=pymysql.cursors.DictCursor,所以查询结果以字典形式返回,键是字段名,值是字段值。如果没有找到匹配的记录,fetchone()方法将返回None。
查询多条数据
import pymysql
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 定义查询SQL语句
select_sql = "SELECT * FROM users WHERE age > %s"
# 执行查询
cursor.execute(select_sql, (30,))
# 获取所有结果
users = cursor.fetchall()
if users:
print(f"共找到 {len(users)} 条记录:")
for user in users:
print(f"ID: {user['id']}, 用户名: {user['username']}, 邮箱: {user['email']}, 年龄: {user['age']}")
else:
print("未找到匹配的记录。")
except pymysql.Error as e:
print(f"查询数据时出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
当需要获取多条记录时,可以使用cursor.fetchall()方法。该方法返回一个包含所有查询结果的列表,列表中的每个元素是一个字典,表示一条记录。我们可以通过遍历这个列表来处理每条记录的数据。
分页查询数据
import pymysql
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 定义每页显示的记录数和当前页码
page_size = 2
current_page = 1
# 计算偏移量
offset = (current_page - 1) * page_size
# 定义分页查询SQL语句
select_sql = "SELECT * FROM users LIMIT %s OFFSET %s"
# 执行查询
cursor.execute(select_sql, (page_size, offset))
# 获取当前页的记录
users = cursor.fetchall()
if users:
print(f"第 {current_page} 页的记录:")
for user in users:
print(f"ID: {user['id']}, 用户名: {user['username']}, 邮箱: {user['email']}, 年龄: {user['age']}")
else:
print("未找到匹配的记录。")
except pymysql.Error as e:
print(f"分页查询数据时出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
在处理大量数据时,通常需要进行分页查询。分页查询通过LIMIT和OFFSET子句实现,LIMIT指定每页显示的记录数,OFFSET指定从哪条记录开始查询。在这个示例中,我们通过计算偏移量来实现分页查询,获取指定页的数据。
更新数据
更新数据是修改数据库中已有记录的操作。以下是一个更新用户年龄的示例:
import pymysql
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 定义更新SQL语句
update_sql = "UPDATE users SET age = %s WHERE id = %s"
# 执行更新
cursor.execute(update_sql, (31, 1))
# 检查受影响的行数
if cursor.rowcount > 0:
print(f"成功更新 {cursor.rowcount} 条记录!")
else:
print("未找到匹配的记录,更新失败。")
# 提交事务
connection.commit()
except pymysql.Error as e:
print(f"更新数据时出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
在这个示例中,我们使用UPDATE语句更新users表中id为1的记录的age字段。执行更新操作后,通过cursor.rowcount属性可以知道有多少条记录被更新。同样,更新操作需要提交事务才能生效。
删除数据
删除数据是从数据库中移除记录的操作。以下是一个删除用户记录的示例:
import pymysql
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 定义删除SQL语句
delete_sql = "DELETE FROM users WHERE id = %s"
# 执行删除
cursor.execute(delete_sql, (1,))
# 检查受影响的行数
if cursor.rowcount > 0:
print(f"成功删除 {cursor.rowcount} 条记录!")
else:
print("未找到匹配的记录,删除失败。")
# 提交事务
connection.commit()
except pymysql.Error as e:
print(f"删除数据时出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
在这个示例中,我们使用DELETE语句删除users表中id为1的记录。执行删除操作后,通过cursor.rowcount属性可以知道有多少条记录被删除。删除操作同样需要提交事务才能生效。
使用上下文管理器管理数据库连接
在前面的示例中,我们使用try-except-finally结构来确保数据库连接的正确关闭。实际上,PyMySQL的连接对象可以作为上下文管理器使用,这样可以更简洁地管理数据库连接。以下是一个使用上下文管理器的示例:
import pymysql
# 定义数据库连接参数
db_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database',
'cursorclass': pymysql.cursors.DictCursor
}
# 使用上下文管理器管理数据库连接
with pymysql.connect(**db_config) as connection:
with connection.cursor() as cursor:
# 执行查询
select_sql = "SELECT COUNT(*) as total FROM users"
cursor.execute(select_sql)
result = cursor.fetchone()
print(f"用户表中共有 {result['total']} 条记录。")
在这个示例中,我们使用with语句创建数据库连接,这样当代码块执行完毕后,数据库连接会自动关闭。同时,我们也使用with语句创建游标对象,游标对象在使用完毕后也会自动关闭。这种方式不仅代码更简洁,还能确保资源的正确释放,避免内存泄漏。
事务处理
事务是数据库操作中不可分割的一组操作序列,要么全部执行成功,要么全部不执行。PyMySQL提供了对事务的支持,以下是一个事务处理的示例:
import pymysql
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 开始事务
connection.begin()
try:
# 执行第一个操作:从账户1转出100元
update_sql1 = "UPDATE accounts SET balance = balance - 100 WHERE account_id = 1"
cursor.execute(update_sql1)
# 模拟一个错误
# result = 1 / 0 # 取消注释此行将触发错误
# 执行第二个操作:向账户2转入100元
update_sql2 = "UPDATE accounts SET balance = balance + 100 WHERE account_id = 2"
cursor.execute(update_sql2)
# 提交事务
connection.commit()
print("转账成功!")
except Exception as e:
# 回滚事务
connection.rollback()
print(f"转账失败:{e}")
print("事务已回滚。")
except pymysql.Error as e:
print(f"数据库操作出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
在这个示例中,我们模拟了一个转账操作,需要从一个账户转出资金并转入另一个账户。这两个操作必须作为一个事务来执行,确保资金的一致性。我们使用connection.begin()方法开始一个事务,然后执行两个更新操作。如果在执行过程中发生异常,使用connection.rollback()方法回滚事务,撤销已经执行的操作;如果所有操作都成功执行,使用connection.commit()方法提交事务,使所有更改生效。
批量操作优化
在处理大量数据时,批量操作可以显著提高性能。以下是一个批量插入数据的优化示例:
import pymysql
import time
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 创建测试表
create_table_sql = """
CREATE TABLE IF NOT EXISTS test_data (
id INT PRIMARY KEY AUTO_INCREMENT,
value VARCHAR(100)
)
"""
cursor.execute(create_table_sql)
# 准备10000条测试数据
data = [(f"value_{i}",) for i in range(10000)]
# 方法1:逐条插入
start_time = time.time()
for item in data:
cursor.execute("INSERT INTO test_data (value) VALUES (%s)", item)
connection.commit()
end_time = time.time()
print(f"逐条插入耗时:{end_time - start_time} 秒")
# 清空表
cursor.execute("TRUNCATE TABLE test_data")
connection.commit()
# 方法2:批量插入
start_time = time.time()
cursor.executemany("INSERT INTO test_data (value) VALUES (%s)", data)
connection.commit()
end_time = time.time()
print(f"批量插入耗时:{end_time - start_time} 秒")
except pymysql.Error as e:
print(f"数据库操作出错:{e}")
finally:
if connection:
connection.close()
print("数据库连接已关闭。")
在这个示例中,我们比较了逐条插入和批量插入10000条数据的性能差异。通过使用executemany()方法进行批量插入,性能可以得到显著提升。在实际应用中,对于大量数据的插入、更新操作,应尽量使用批量操作的方式。
实际案例:数据统计分析
下面通过一个实际案例来展示PyMySQL的综合应用。假设我们有一个电子商务网站,需要统计分析用户的购买行为数据。我们将创建相关表,插入测试数据,然后进行统计分析。
import pymysql
import random
from datetime import datetime, timedelta
try:
# 连接数据库
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='ecommerce',
cursorclass=pymysql.cursors.DictCursor
)
with connection.cursor() as cursor:
# 创建用户表
create_users_table = """
CREATE TABLE IF NOT EXISTS users (
user_id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
registration_date DATE
)
"""
cursor.execute(create_users_table)
# 创建产品表
create_products_table = """
CREATE TABLE IF NOT EXISTS products (
product_id INT PRIMARY KEY AUTO_INCREMENT,
product_name VARCHAR(100) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
category VARCHAR(50)
)
"""
cursor.execute(create_products_table)
# 创建订单表
create_orders_table = """
CREATE TABLE IF NOT EXISTS orders (
order_id INT PRIMARY KEY AUTO_INCREMENT,
user_id INT NOT NULL,
order_date DATETIME NOT NULL,
total_amount DECIMAL(10, 2) NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(user_id)
)
"""
cursor.execute(create_orders_table)
# 创建订单明细表
create_order_items_table = """
CREATE TABLE IF NOT EXISTS order_items (
item_id INT PRIMARY KEY AUTO_INCREMENT,
order_id INT NOT NULL,
product_id INT NOT NULL,
quantity INT NOT NULL,
price DECIMAL(10, 2) NOT NULL,
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
)
"""
cursor.execute(create_order_items_table)
# 插入测试数据
# 插入用户数据
users = [
('user1', 'user1@example.com', '2023-01-01'),
('user2', 'user2@example.com', '2023-02-15'),
('user3', 'user3@example.com', '2023-03-20'),
('user4', 'user4@example.com', '2023-04-10'),
('user5', 'user5@example.com', '2023-05-05')
]
cursor.executemany("INSERT INTO users (username, email, registration_date) VALUES (%s, %s, %s)", users)
# 插入产品数据
products = [
('手机', 5999.00, '电子产品'),
('笔记本电脑', 8999.00, '电子产品'),
('平板电脑', 3299.00, '电子产品'),
('耳机', 899.00, '电子产品'),
('T恤', 99.00, '服装'),
('牛仔裤', 299.00, '服装'),
('运动鞋', 599.00, '鞋类'),
('背包', 399.00, '箱包'),
('书籍', 49.00, '图书'),
('咖啡', 59.00, '食品')
]
cursor.executemany("INSERT INTO products (product_name, price, category) VALUES (%s, %s, %s)", products)
# 生成订单数据
order_count = 20
for i in range(1, order_count + 1):
user_id = random.randint(1, 5)
order_date = datetime.now() - timedelta(days=random.randint(1, 90))
total_amount = 0.0
# 插入订单
insert_order_sql = "INSERT INTO orders (user_id, order_date, total_amount) VALUES (%s, %s, %s)"
cursor.execute(insert_order_sql, (user_id, order_date, total_amount))
order_id = cursor.lastrowid
# 生成订单明细
item_count = random.randint(1, 5)
order_items = []
for j in range(item_count):
product_id = random.randint(1, 10)
price = products[product_id - 1][1]
quantity = random.randint(1, 3)
item_total = price * quantity
total_amount += item_total
order_items.append((order_id, product_id, quantity, price))
# 插入订单明细
insert_items_sql = "INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (%s, %s, %s, %s)"
cursor.executemany(insert_items_sql, order_items)
# 更新订单总金额
update_order_sql = "UPDATE orders SET total_amount = %s WHERE order_id = %s"
cursor.execute(update_order_sql, (total_amount, order_id))
# 提交所有插入操作
connection.commit()
# 统计分析
print("\n--- 数据统计分析 ---")
# 1. 统计总用户数
cursor.execute("SELECT COUNT(*) as total_users FROM users")
result = cursor.fetchone()
print(f"总用户数: {result['total_users']}")
# 2. 统计总订单数和总销售额
cursor.execute("SELECT COUNT(*) as total_orders, SUM(total_amount) as total_sales FROM orders")
result = cursor.fetchone()
print(f"总订单数: {result['total_orders']}, 总销售额: {result['total_sales']:.2f}元")
# 3. 统计最畅销的5种产品
popular_products_sql = """
SELECT p.product_name, SUM(oi.quantity) as total_sold
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
GROUP BY p.product_id
ORDER BY total_sold DESC
LIMIT 5
"""
cursor.execute(popular_products_sql)
print("\n最畅销的5种产品:")
for i, product in enumerate(cursor.fetchall(), 1):
print(f"{i}. {product['product_name']}: {product['total_sold']}件")
# 4. 统计每月销售额
monthly_sales_sql = """
SELECT
DATE_FORMAT(order_date, '%Y-%m') as month,
SUM(total_amount) as monthly_sales
FROM orders
GROUP BY month
ORDER BY month
"""
cursor.execute(monthly_sales_sql)
print("\n每月销售额:")
for month_data in cursor.fetchall():
print(f"{month_data['month']}: {month_data['monthly_sales']:.2f}元")
# 5. 统计每个用户的平均订单金额
avg_order_per_user_sql = """
SELECT
u.username,
COUNT(o.order_id) as order_count,
AVG(o.total_amount) as avg_order_amount
FROM users u
LEFT JOIN orders o ON u.user_id = o.user_id
GROUP BY u.user_id
ORDER BY avg_order_amount DESC
"""
cursor.execute(avg_order_per_user_sql)
print("\n每个用户的平均订单金额:")
for user_data in cursor.fetchall():
print(f"{user_data['username']}: 订单数 {user_data['order_count']}, 平均金额 {user_data['avg_order_amount']:.2f}元")
except pymysql.Error as e:
print(f"数据库操作出错:{e}")
finally:
if connection:
connection.close()
print("\n数据库连接已关闭。")
这个实际案例展示了如何使用PyMySQL进行数据库设计、数据插入和复杂查询统计。我们创建了用户、产品、订单和订单明细四个表,然后生成了测试数据,最后进行了一系列统计分析,包括用户数量统计、销售额统计、畅销产品分析、销售趋势分析等。通过这个案例,你可以看到PyMySQL在实际项目中的强大功能和灵活性。
总结案例:学生成绩管理系统
下面我们再通过一个学生成绩管理系统的案例来进一步展示PyMySQL的应用。这个系统可以实现学生信息管理、课程管理、成绩录入和查询统计等功能。
import pymysql
from prettytable import PrettyTable
class StudentGradeManager:
def __init__(self, host, user, password, database):
self.connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
cursorclass=pymysql.cursors.DictCursor
)
def create_tables(self):
"""创建数据库表"""
with self.connection.cursor() as cursor:
# 创建学生表
create_students_table = """
CREATE TABLE IF NOT EXISTS students (
student_id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(50) NOT NULL,
gender ENUM('男', '女') NOT NULL,
birth_date DATE,
class VARCHAR(20)
)
"""
cursor.execute(create_students_table)
# 创建课程表
create_courses_table = """
CREATE TABLE IF NOT EXISTS courses (
course_id INT PRIMARY KEY AUTO_INCREMENT,
course_name VARCHAR(50) NOT NULL,
teacher VARCHAR(50)
)
"""
cursor.execute(create_courses_table)
# 创建成绩表
create_grades_table = """
CREATE TABLE IF NOT EXISTS grades (
grade_id INT PRIMARY KEY AUTO_INCREMENT,
student_id INT NOT NULL,
course_id INT NOT NULL,
score DECIMAL(5, 2) NOT NULL,
exam_date DATE NOT NULL,
FOREIGN KEY (student_id) REFERENCES students(student_id),
FOREIGN KEY (course_id) REFERENCES courses(course_id)
)
"""
cursor.execute(create_grades_table)
self.connection.commit()
print("数据库表创建成功!")
def add_student(self, name, gender, birth_date, class_name):
"""添加学生"""
with self.connection.cursor() as cursor:
sql = "INSERT INTO students (name, gender, birth_date, class) VALUES (%s, %s, %s, %s)"
cursor.execute(sql, (name, gender, birth_date, class_name))
student_id = cursor.lastrowid
self.connection.commit()
print(f"学生 {name} 添加成功,ID为 {student_id}")
return student_id
def add_course(self, course_name, teacher):
"""添加课程"""
with self.connection.cursor() as cursor:
sql = "INSERT INTO courses (course_name, teacher) VALUES (%s, %s)"
cursor.execute(sql, (course_name, teacher))
course_id = cursor.lastrowid
self.connection.commit()
print(f"课程 {course_name} 添加成功,ID为 {course_id}")
return course_id
def add_grade(self, student_id, course_id, score, exam_date):
"""添加成绩"""
with self.connection.cursor() as cursor:
sql = "INSERT INTO grades (student_id, course_id, score, exam_date) VALUES (%s, %s, %s, %s)"
cursor.execute(sql, (student_id, course_id, score, exam_date))
self.connection.commit()
print(f"成绩添加成功!")
def get_all_students(self):
"""获取所有学生信息"""
with self.connection.cursor() as cursor:
sql = "SELECT * FROM students"
cursor.execute(sql)
return cursor.fetchall()
def get_all_courses(self):
"""获取所有课程信息"""
with self.connection.cursor() as cursor:
sql = "SELECT * FROM courses"
cursor.execute(sql)
return cursor.fetchall()
def get_student_grades(self, student_id):
"""获取学生成绩"""
with self.connection.cursor() as cursor:
sql = """
SELECT c.course_name, g.score, g.exam_date
FROM grades g
JOIN courses c ON g.course_id = c.course_id
WHERE g.student_id = %s
"""
cursor.execute(sql, (student_id,))
return cursor.fetchall()
def get_course_grades(self, course_id):
"""获取课程成绩"""
with self.connection.cursor() as cursor:
sql = """
SELECT s.name, g.score, g.exam_date
FROM grades g
JOIN students s ON g.student_id = s.student_id
WHERE g.course_id = %s
"""
cursor.execute(sql, (course_id,))
return cursor.fetchall()
def get_class_average(self, course_id):
"""获取课程平均分"""
with self.connection.cursor() as cursor:
sql = """
SELECT AVG(score) as average_score
FROM grades
WHERE course_id = %s
"""
cursor.execute(sql, (course_id,))
return cursor.fetchone()['average_score']
def close(self):
"""关闭数据库连接"""
self.connection.close()
print("数据库连接已关闭。")
# 使用示例
if __name__ == "__main__":
# 创建成绩管理系统实例
manager = StudentGradeManager(
host='localhost',
user='your_username',
password='your_password',
database='student_grades'
)
# 创建表
manager.create_tables()
# 添加学生
student1_id = manager.add_student("张三", "男", "2005-01-15", "高一(1)班")
student2_id = manager.add_student("李四", "女", "2005-03-20", "高一(1)班")
student3_id = manager.add_student("王五", "男", "2005-02-10", "高一(2)班")
# 添加课程
math_id = manager.add_course("数学", "李老师")
chinese_id = manager.add_course("语文", "王老师")
english_id = manager.add_course("英语", "张老师")
# 添加成绩
manager.add_grade(student1_id, math_id, 95.5, "2023-06-10")
manager.add_grade(student1_id, chinese_id, 88.0, "2023-06-10")
manager.add_grade(student1_id, english_id, 92.0, "2023-06-10")
manager.add_grade(student2_id, math_id, 87.0, "2023-06-10")
manager.add_grade(student2_id, chinese_id, 91.5, "2023-06-10")
manager.add_grade(student2_id, english_id, 96.0, "2023-06-10")
manager.add_grade(student3_id, math_id, 92.0, "2023-06-10")
manager.add_grade(student3_id, chinese_id, 85.0, "2023-06-10")
manager.add_grade(student3_id, english_id, 89.5, "2023-06-10")
# 显示所有学生
print("\n--- 所有学生信息 ---")
students = manager.get_all_students()
table = PrettyTable()
table.field_names = ["ID", "姓名", "性别", "出生日期", "班级"]
for student in students:
table.add_row([student["student_id"], student["name"], student["gender"],
student["birth_date"], student["class"]])
print(table)
# 显示所有课程
print("\n--- 所有课程信息 ---")
courses = manager.get_all_courses()
table = PrettyTable()
table.field_names = ["ID", "课程名称", "授课老师"]
for course in courses:
table.add_row([course["course_id"], course["course_name"], course["teacher"]])
print(table)
# 显示学生成绩
print("\n--- 学生成绩 ---")
for student_id in [student1_id, student2_id, student3_id]:
grades = manager.get_student_grades(student_id)
if grades:
table = PrettyTable()
table.field_names = ["课程", "分数", "考试日期"]
for grade in grades:
table.add_row([grade["course_name"], grade["score"], grade["exam_date"]])
print(f"\n{students[student_id-1]['name']}的成绩:")
print(table)
# 显示课程平均分
print("\n--- 课程平均分 ---")
for course_id in [math_id, chinese_id, english_id]:
avg_score = manager.get_class_average(course_id)
print(f"{courses[course_id-1]['course_name']}平均分: {avg_score:.2f}")
# 关闭连接
manager.close()
这个学生成绩管理系统展示了如何使用PyMySQL构建一个完整的应用程序。系统通过面向对象的方式组织代码,提供了学生信息管理、课程管理、成绩录入和查询统计等功能。使用PrettyTable库美化了输出结果,使数据展示更加直观。这个案例涵盖了PyMySQL的各种操作,包括数据库连接、表创建、数据插入、查询和统计分析等。
通过以上的介绍和实例,你可以看到PyMySQL是一个功能强大、使用方便的Python库,它为Python开发者提供了一个高效、灵活的方式来连接和操作MySQL数据库。无论是小型项目还是大型应用,PyMySQL都能满足你的需求。
相关资源
- Pypi地址:https://pypi.org/project/PyMySQL
- Github地址:https://github.com/PyMySQL/PyMySQL
- 官方文档地址:https://pymysql.readthedocs.io/en/latest/
关注我,每天分享一个实用的Python自动化工具。
