站点图标 Park Lam's 每日分享

Python使用工具:PyMySQL库使用教程

PyMySQL:Python连接MySQL数据库的桥梁

Python作为一种功能强大且应用广泛的编程语言,凭借其简洁易读的语法和丰富的库支持,在Web开发、数据分析、人工智能、自动化脚本等众多领域都发挥着重要作用。无论是构建高效的Web应用、进行复杂的数据挖掘,还是开发智能的机器学习模型,Python都能提供合适的工具和库。在数据存储与管理方面,MySQL是一种广泛使用的关系型数据库管理系统,而PyMySQL则为Python开发者提供了一个便捷、高效的方式来连接和操作MySQL数据库。

PyMySQL是一个纯Python实现的MySQL数据库连接库,它允许Python程序通过标准的MySQL协议与MySQL数据库进行通信。其工作原理是通过实现MySQL客户端协议,将Python代码中的SQL语句转换为MySQL服务器能够理解的格式,并将服务器返回的结果转换为Python数据结构。PyMySQL的优点显著,它不需要依赖MySQL C客户端库,纯Python实现使得它具有良好的跨平台性,能够在各种操作系统上轻松安装和使用;同时,它的API设计简洁明了,易于学习和使用,与Python的数据库API标准(DB-API 2.0)兼容,方便开发者快速上手。不过,由于是纯Python实现,在处理大量数据或高并发场景时,性能可能会略逊于基于C语言的MySQL连接库。PyMySQL采用MIT许可证,这意味着它可以自由地用于商业和非商业项目,为开发者提供了极大的便利。

安装PyMySQL

在开始使用PyMySQL之前,需要先安装它。PyMySQL可以通过pip包管理器轻松安装,以下是安装命令:

pip install pymysql

安装完成后,就可以在Python代码中导入并使用PyMySQL了。

连接MySQL数据库

使用PyMySQL连接MySQL数据库是使用该库的第一步。下面是一个简单的示例,展示了如何连接到MySQL数据库:

import pymysql

# 建立数据库连接
try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )
    print("数据库连接成功!")
except pymysql.Error as e:
    print(f"数据库连接失败:{e}")
finally:
    # 关闭数据库连接
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在这个示例中,我们使用pymysql.connect()方法建立与MySQL数据库的连接。该方法接受多个参数,包括数据库服务器的主机名(host)、用户名(user)、密码(password)、要连接的数据库名(database)等。cursorclass=pymysql.cursors.DictCursor参数指定使用字典游标,这样查询结果将以字典形式返回,方便我们访问和处理数据。

为了确保资源的正确释放,我们使用了try-except-finally结构。在try块中尝试建立连接,如果成功则打印连接成功的消息;在except块中捕获可能出现的异常并打印错误信息;在finally块中关闭数据库连接,确保无论连接是否成功,最终都会释放资源。

创建数据库表

连接到数据库后,通常需要创建表来存储数据。以下是一个创建用户表的示例:

import pymysql

try:
    # 建立数据库连接
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    # 创建一个游标对象
    with connection.cursor() as cursor:
        # 定义创建表的SQL语句
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS users (
            id INT PRIMARY KEY AUTO_INCREMENT,
            username VARCHAR(50) NOT NULL,
            email VARCHAR(100) NOT NULL UNIQUE,
            age INT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """

        # 执行SQL语句
        cursor.execute(create_table_sql)
        print("表创建成功!")

    # 提交事务
    connection.commit()

except pymysql.Error as e:
    print(f"创建表时出错:{e}")
finally:
    # 关闭数据库连接
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在这个示例中,我们首先建立了与数据库的连接。然后,创建了一个游标对象,游标是执行SQL语句和获取查询结果的关键对象。使用游标对象的execute()方法执行创建表的SQL语句。这里创建了一个名为users的表,包含idusernameemailagecreated_at等字段,其中id是主键,email字段设置为唯一,确保不会有重复的邮箱地址。

需要注意的是,执行完创建表的SQL语句后,我们使用connection.commit()方法提交事务。在PyMySQL中,对于修改数据库的操作(如创建表、插入数据、更新数据等),需要显式提交事务才能使更改生效。

插入数据

创建表后,就可以向表中插入数据了。PyMySQL提供了多种插入数据的方式,下面分别介绍。

插入单条数据

import pymysql

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 定义插入数据的SQL语句
        insert_sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"

        # 准备数据
        user_data = ('john_doe', 'john@example.com', 30)

        # 执行SQL语句
        cursor.execute(insert_sql, user_data)
        print(f"成功插入 {cursor.rowcount} 条数据!")

    # 提交事务
    connection.commit()

except pymysql.Error as e:
    print(f"插入数据时出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在这个示例中,我们使用参数化查询的方式插入数据。参数化查询不仅可以防止SQL注入攻击,还能提高查询效率。SQL语句中的%s是占位符,用于表示后续要传入的参数。cursor.execute()方法的第一个参数是SQL语句,第二个参数是一个元组,包含了要插入的数据。执行插入操作后,cursor.rowcount属性返回受影响的行数,即成功插入的数据条数。

批量插入数据

import pymysql

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 定义插入数据的SQL语句
        insert_sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"

        # 准备多条数据
        users_data = [
            ('alice_smith', 'alice@example.com', 25),
            ('bob_johnson', 'bob@example.com', 35),
            ('charlie_brown', 'charlie@example.com', 40)
        ]

        # 执行批量插入
        cursor.executemany(insert_sql, users_data)
        print(f"成功插入 {cursor.rowcount} 条数据!")

    # 提交事务
    connection.commit()

except pymysql.Error as e:
    print(f"批量插入数据时出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

当需要插入多条数据时,可以使用cursor.executemany()方法。该方法接受两个参数,第一个是SQL语句,第二个是一个包含多个元组的列表,每个元组代表一条记录的数据。使用executemany()方法可以显著提高批量插入数据的效率。

查询数据

查询数据是数据库操作中最常见的操作之一。PyMySQL提供了多种查询数据的方式,下面分别介绍。

查询单条数据

import pymysql

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 定义查询SQL语句
        select_sql = "SELECT * FROM users WHERE id = %s"

        # 执行查询
        cursor.execute(select_sql, (1,))

        # 获取单条结果
        user = cursor.fetchone()

        if user:
            print("查询结果:")
            for key, value in user.items():
                print(f"{key}: {value}")
        else:
            print("未找到匹配的记录。")

except pymysql.Error as e:
    print(f"查询数据时出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在这个示例中,我们使用cursor.fetchone()方法获取查询结果的第一条记录。由于我们在连接数据库时指定了cursorclass=pymysql.cursors.DictCursor,所以查询结果以字典形式返回,键是字段名,值是字段值。如果没有找到匹配的记录,fetchone()方法将返回None

查询多条数据

import pymysql

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 定义查询SQL语句
        select_sql = "SELECT * FROM users WHERE age > %s"

        # 执行查询
        cursor.execute(select_sql, (30,))

        # 获取所有结果
        users = cursor.fetchall()

        if users:
            print(f"共找到 {len(users)} 条记录:")
            for user in users:
                print(f"ID: {user['id']}, 用户名: {user['username']}, 邮箱: {user['email']}, 年龄: {user['age']}")
        else:
            print("未找到匹配的记录。")

except pymysql.Error as e:
    print(f"查询数据时出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

当需要获取多条记录时,可以使用cursor.fetchall()方法。该方法返回一个包含所有查询结果的列表,列表中的每个元素是一个字典,表示一条记录。我们可以通过遍历这个列表来处理每条记录的数据。

分页查询数据

import pymysql

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 定义每页显示的记录数和当前页码
        page_size = 2
        current_page = 1

        # 计算偏移量
        offset = (current_page - 1) * page_size

        # 定义分页查询SQL语句
        select_sql = "SELECT * FROM users LIMIT %s OFFSET %s"

        # 执行查询
        cursor.execute(select_sql, (page_size, offset))

        # 获取当前页的记录
        users = cursor.fetchall()

        if users:
            print(f"第 {current_page} 页的记录:")
            for user in users:
                print(f"ID: {user['id']}, 用户名: {user['username']}, 邮箱: {user['email']}, 年龄: {user['age']}")
        else:
            print("未找到匹配的记录。")

except pymysql.Error as e:
    print(f"分页查询数据时出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在处理大量数据时,通常需要进行分页查询。分页查询通过LIMITOFFSET子句实现,LIMIT指定每页显示的记录数,OFFSET指定从哪条记录开始查询。在这个示例中,我们通过计算偏移量来实现分页查询,获取指定页的数据。

更新数据

更新数据是修改数据库中已有记录的操作。以下是一个更新用户年龄的示例:

import pymysql

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 定义更新SQL语句
        update_sql = "UPDATE users SET age = %s WHERE id = %s"

        # 执行更新
        cursor.execute(update_sql, (31, 1))

        # 检查受影响的行数
        if cursor.rowcount > 0:
            print(f"成功更新 {cursor.rowcount} 条记录!")
        else:
            print("未找到匹配的记录,更新失败。")

    # 提交事务
    connection.commit()

except pymysql.Error as e:
    print(f"更新数据时出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在这个示例中,我们使用UPDATE语句更新users表中id为1的记录的age字段。执行更新操作后,通过cursor.rowcount属性可以知道有多少条记录被更新。同样,更新操作需要提交事务才能生效。

删除数据

删除数据是从数据库中移除记录的操作。以下是一个删除用户记录的示例:

import pymysql

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 定义删除SQL语句
        delete_sql = "DELETE FROM users WHERE id = %s"

        # 执行删除
        cursor.execute(delete_sql, (1,))

        # 检查受影响的行数
        if cursor.rowcount > 0:
            print(f"成功删除 {cursor.rowcount} 条记录!")
        else:
            print("未找到匹配的记录,删除失败。")

    # 提交事务
    connection.commit()

except pymysql.Error as e:
    print(f"删除数据时出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在这个示例中,我们使用DELETE语句删除users表中id为1的记录。执行删除操作后,通过cursor.rowcount属性可以知道有多少条记录被删除。删除操作同样需要提交事务才能生效。

使用上下文管理器管理数据库连接

在前面的示例中,我们使用try-except-finally结构来确保数据库连接的正确关闭。实际上,PyMySQL的连接对象可以作为上下文管理器使用,这样可以更简洁地管理数据库连接。以下是一个使用上下文管理器的示例:

import pymysql

# 定义数据库连接参数
db_config = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database',
    'cursorclass': pymysql.cursors.DictCursor
}

# 使用上下文管理器管理数据库连接
with pymysql.connect(**db_config) as connection:
    with connection.cursor() as cursor:
        # 执行查询
        select_sql = "SELECT COUNT(*) as total FROM users"
        cursor.execute(select_sql)
        result = cursor.fetchone()
        print(f"用户表中共有 {result['total']} 条记录。")

在这个示例中,我们使用with语句创建数据库连接,这样当代码块执行完毕后,数据库连接会自动关闭。同时,我们也使用with语句创建游标对象,游标对象在使用完毕后也会自动关闭。这种方式不仅代码更简洁,还能确保资源的正确释放,避免内存泄漏。

事务处理

事务是数据库操作中不可分割的一组操作序列,要么全部执行成功,要么全部不执行。PyMySQL提供了对事务的支持,以下是一个事务处理的示例:

import pymysql

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 开始事务
        connection.begin()

        try:
            # 执行第一个操作:从账户1转出100元
            update_sql1 = "UPDATE accounts SET balance = balance - 100 WHERE account_id = 1"
            cursor.execute(update_sql1)

            # 模拟一个错误
            # result = 1 / 0  # 取消注释此行将触发错误

            # 执行第二个操作:向账户2转入100元
            update_sql2 = "UPDATE accounts SET balance = balance + 100 WHERE account_id = 2"
            cursor.execute(update_sql2)

            # 提交事务
            connection.commit()
            print("转账成功!")

        except Exception as e:
            # 回滚事务
            connection.rollback()
            print(f"转账失败:{e}")
            print("事务已回滚。")

except pymysql.Error as e:
    print(f"数据库操作出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在这个示例中,我们模拟了一个转账操作,需要从一个账户转出资金并转入另一个账户。这两个操作必须作为一个事务来执行,确保资金的一致性。我们使用connection.begin()方法开始一个事务,然后执行两个更新操作。如果在执行过程中发生异常,使用connection.rollback()方法回滚事务,撤销已经执行的操作;如果所有操作都成功执行,使用connection.commit()方法提交事务,使所有更改生效。

批量操作优化

在处理大量数据时,批量操作可以显著提高性能。以下是一个批量插入数据的优化示例:

import pymysql
import time

try:
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 创建测试表
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS test_data (
            id INT PRIMARY KEY AUTO_INCREMENT,
            value VARCHAR(100)
        )
        """
        cursor.execute(create_table_sql)

        # 准备10000条测试数据
        data = [(f"value_{i}",) for i in range(10000)]

        # 方法1:逐条插入
        start_time = time.time()
        for item in data:
            cursor.execute("INSERT INTO test_data (value) VALUES (%s)", item)
        connection.commit()
        end_time = time.time()
        print(f"逐条插入耗时:{end_time - start_time} 秒")

        # 清空表
        cursor.execute("TRUNCATE TABLE test_data")
        connection.commit()

        # 方法2:批量插入
        start_time = time.time()
        cursor.executemany("INSERT INTO test_data (value) VALUES (%s)", data)
        connection.commit()
        end_time = time.time()
        print(f"批量插入耗时:{end_time - start_time} 秒")

except pymysql.Error as e:
    print(f"数据库操作出错:{e}")
finally:
    if connection:
        connection.close()
        print("数据库连接已关闭。")

在这个示例中,我们比较了逐条插入和批量插入10000条数据的性能差异。通过使用executemany()方法进行批量插入,性能可以得到显著提升。在实际应用中,对于大量数据的插入、更新操作,应尽量使用批量操作的方式。

实际案例:数据统计分析

下面通过一个实际案例来展示PyMySQL的综合应用。假设我们有一个电子商务网站,需要统计分析用户的购买行为数据。我们将创建相关表,插入测试数据,然后进行统计分析。

import pymysql
import random
from datetime import datetime, timedelta

try:
    # 连接数据库
    connection = pymysql.connect(
        host='localhost',
        user='your_username',
        password='your_password',
        database='ecommerce',
        cursorclass=pymysql.cursors.DictCursor
    )

    with connection.cursor() as cursor:
        # 创建用户表
        create_users_table = """
        CREATE TABLE IF NOT EXISTS users (
            user_id INT PRIMARY KEY AUTO_INCREMENT,
            username VARCHAR(50) NOT NULL,
            email VARCHAR(100) NOT NULL UNIQUE,
            registration_date DATE
        )
        """
        cursor.execute(create_users_table)

        # 创建产品表
        create_products_table = """
        CREATE TABLE IF NOT EXISTS products (
            product_id INT PRIMARY KEY AUTO_INCREMENT,
            product_name VARCHAR(100) NOT NULL,
            price DECIMAL(10, 2) NOT NULL,
            category VARCHAR(50)
        )
        """
        cursor.execute(create_products_table)

        # 创建订单表
        create_orders_table = """
        CREATE TABLE IF NOT EXISTS orders (
            order_id INT PRIMARY KEY AUTO_INCREMENT,
            user_id INT NOT NULL,
            order_date DATETIME NOT NULL,
            total_amount DECIMAL(10, 2) NOT NULL,
            FOREIGN KEY (user_id) REFERENCES users(user_id)
        )
        """
        cursor.execute(create_orders_table)

        # 创建订单明细表
        create_order_items_table = """
        CREATE TABLE IF NOT EXISTS order_items (
            item_id INT PRIMARY KEY AUTO_INCREMENT,
            order_id INT NOT NULL,
            product_id INT NOT NULL,
            quantity INT NOT NULL,
            price DECIMAL(10, 2) NOT NULL,
            FOREIGN KEY (order_id) REFERENCES orders(order_id),
            FOREIGN KEY (product_id) REFERENCES products(product_id)
        )
        """
        cursor.execute(create_order_items_table)

        # 插入测试数据
        # 插入用户数据
        users = [
            ('user1', 'user1@example.com', '2023-01-01'),
            ('user2', 'user2@example.com', '2023-02-15'),
            ('user3', 'user3@example.com', '2023-03-20'),
            ('user4', 'user4@example.com', '2023-04-10'),
            ('user5', 'user5@example.com', '2023-05-05')
        ]
        cursor.executemany("INSERT INTO users (username, email, registration_date) VALUES (%s, %s, %s)", users)

        # 插入产品数据
        products = [
            ('手机', 5999.00, '电子产品'),
            ('笔记本电脑', 8999.00, '电子产品'),
            ('平板电脑', 3299.00, '电子产品'),
            ('耳机', 899.00, '电子产品'),
            ('T恤', 99.00, '服装'),
            ('牛仔裤', 299.00, '服装'),
            ('运动鞋', 599.00, '鞋类'),
            ('背包', 399.00, '箱包'),
            ('书籍', 49.00, '图书'),
            ('咖啡', 59.00, '食品')
        ]
        cursor.executemany("INSERT INTO products (product_name, price, category) VALUES (%s, %s, %s)", products)

        # 生成订单数据
        order_count = 20
        for i in range(1, order_count + 1):
            user_id = random.randint(1, 5)
            order_date = datetime.now() - timedelta(days=random.randint(1, 90))
            total_amount = 0.0

            # 插入订单
            insert_order_sql = "INSERT INTO orders (user_id, order_date, total_amount) VALUES (%s, %s, %s)"
            cursor.execute(insert_order_sql, (user_id, order_date, total_amount))
            order_id = cursor.lastrowid

            # 生成订单明细
            item_count = random.randint(1, 5)
            order_items = []
            for j in range(item_count):
                product_id = random.randint(1, 10)
                price = products[product_id - 1][1]
                quantity = random.randint(1, 3)
                item_total = price * quantity
                total_amount += item_total
                order_items.append((order_id, product_id, quantity, price))

            # 插入订单明细
            insert_items_sql = "INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (%s, %s, %s, %s)"
            cursor.executemany(insert_items_sql, order_items)

            # 更新订单总金额
            update_order_sql = "UPDATE orders SET total_amount = %s WHERE order_id = %s"
            cursor.execute(update_order_sql, (total_amount, order_id))

        # 提交所有插入操作
        connection.commit()

        # 统计分析
        print("\n--- 数据统计分析 ---")

        # 1. 统计总用户数
        cursor.execute("SELECT COUNT(*) as total_users FROM users")
        result = cursor.fetchone()
        print(f"总用户数: {result['total_users']}")

        # 2. 统计总订单数和总销售额
        cursor.execute("SELECT COUNT(*) as total_orders, SUM(total_amount) as total_sales FROM orders")
        result = cursor.fetchone()
        print(f"总订单数: {result['total_orders']}, 总销售额: {result['total_sales']:.2f}元")

        # 3. 统计最畅销的5种产品
        popular_products_sql = """
        SELECT p.product_name, SUM(oi.quantity) as total_sold
        FROM order_items oi
        JOIN products p ON oi.product_id = p.product_id
        GROUP BY p.product_id
        ORDER BY total_sold DESC
        LIMIT 5
        """
        cursor.execute(popular_products_sql)
        print("\n最畅销的5种产品:")
        for i, product in enumerate(cursor.fetchall(), 1):
            print(f"{i}. {product['product_name']}: {product['total_sold']}件")

        # 4. 统计每月销售额
        monthly_sales_sql = """
        SELECT 
            DATE_FORMAT(order_date, '%Y-%m') as month,
            SUM(total_amount) as monthly_sales
        FROM orders
        GROUP BY month
        ORDER BY month
        """
        cursor.execute(monthly_sales_sql)
        print("\n每月销售额:")
        for month_data in cursor.fetchall():
            print(f"{month_data['month']}: {month_data['monthly_sales']:.2f}元")

        # 5. 统计每个用户的平均订单金额
        avg_order_per_user_sql = """
        SELECT 
            u.username,
            COUNT(o.order_id) as order_count,
            AVG(o.total_amount) as avg_order_amount
        FROM users u
        LEFT JOIN orders o ON u.user_id = o.user_id
        GROUP BY u.user_id
        ORDER BY avg_order_amount DESC
        """
        cursor.execute(avg_order_per_user_sql)
        print("\n每个用户的平均订单金额:")
        for user_data in cursor.fetchall():
            print(f"{user_data['username']}: 订单数 {user_data['order_count']}, 平均金额 {user_data['avg_order_amount']:.2f}元")

except pymysql.Error as e:
    print(f"数据库操作出错:{e}")
finally:
    if connection:
        connection.close()
        print("\n数据库连接已关闭。")

这个实际案例展示了如何使用PyMySQL进行数据库设计、数据插入和复杂查询统计。我们创建了用户、产品、订单和订单明细四个表,然后生成了测试数据,最后进行了一系列统计分析,包括用户数量统计、销售额统计、畅销产品分析、销售趋势分析等。通过这个案例,你可以看到PyMySQL在实际项目中的强大功能和灵活性。

总结案例:学生成绩管理系统

下面我们再通过一个学生成绩管理系统的案例来进一步展示PyMySQL的应用。这个系统可以实现学生信息管理、课程管理、成绩录入和查询统计等功能。

import pymysql
from prettytable import PrettyTable

class StudentGradeManager:
    def __init__(self, host, user, password, database):
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            cursorclass=pymysql.cursors.DictCursor
        )

    def create_tables(self):
        """创建数据库表"""
        with self.connection.cursor() as cursor:
            # 创建学生表
            create_students_table = """
            CREATE TABLE IF NOT EXISTS students (
                student_id INT PRIMARY KEY AUTO_INCREMENT,
                name VARCHAR(50) NOT NULL,
                gender ENUM('男', '女') NOT NULL,
                birth_date DATE,
                class VARCHAR(20)
            )
            """
            cursor.execute(create_students_table)

            # 创建课程表
            create_courses_table = """
            CREATE TABLE IF NOT EXISTS courses (
                course_id INT PRIMARY KEY AUTO_INCREMENT,
                course_name VARCHAR(50) NOT NULL,
                teacher VARCHAR(50)
            )
            """
            cursor.execute(create_courses_table)

            # 创建成绩表
            create_grades_table = """
            CREATE TABLE IF NOT EXISTS grades (
                grade_id INT PRIMARY KEY AUTO_INCREMENT,
                student_id INT NOT NULL,
                course_id INT NOT NULL,
                score DECIMAL(5, 2) NOT NULL,
                exam_date DATE NOT NULL,
                FOREIGN KEY (student_id) REFERENCES students(student_id),
                FOREIGN KEY (course_id) REFERENCES courses(course_id)
            )
            """
            cursor.execute(create_grades_table)

        self.connection.commit()
        print("数据库表创建成功!")

    def add_student(self, name, gender, birth_date, class_name):
        """添加学生"""
        with self.connection.cursor() as cursor:
            sql = "INSERT INTO students (name, gender, birth_date, class) VALUES (%s, %s, %s, %s)"
            cursor.execute(sql, (name, gender, birth_date, class_name))
            student_id = cursor.lastrowid
            self.connection.commit()
            print(f"学生 {name} 添加成功,ID为 {student_id}")
            return student_id

    def add_course(self, course_name, teacher):
        """添加课程"""
        with self.connection.cursor() as cursor:
            sql = "INSERT INTO courses (course_name, teacher) VALUES (%s, %s)"
            cursor.execute(sql, (course_name, teacher))
            course_id = cursor.lastrowid
            self.connection.commit()
            print(f"课程 {course_name} 添加成功,ID为 {course_id}")
            return course_id

    def add_grade(self, student_id, course_id, score, exam_date):
        """添加成绩"""
        with self.connection.cursor() as cursor:
            sql = "INSERT INTO grades (student_id, course_id, score, exam_date) VALUES (%s, %s, %s, %s)"
            cursor.execute(sql, (student_id, course_id, score, exam_date))
            self.connection.commit()
            print(f"成绩添加成功!")

    def get_all_students(self):
        """获取所有学生信息"""
        with self.connection.cursor() as cursor:
            sql = "SELECT * FROM students"
            cursor.execute(sql)
            return cursor.fetchall()

    def get_all_courses(self):
        """获取所有课程信息"""
        with self.connection.cursor() as cursor:
            sql = "SELECT * FROM courses"
            cursor.execute(sql)
            return cursor.fetchall()

    def get_student_grades(self, student_id):
        """获取学生成绩"""
        with self.connection.cursor() as cursor:
            sql = """
            SELECT c.course_name, g.score, g.exam_date
            FROM grades g
            JOIN courses c ON g.course_id = c.course_id
            WHERE g.student_id = %s
            """
            cursor.execute(sql, (student_id,))
            return cursor.fetchall()

    def get_course_grades(self, course_id):
        """获取课程成绩"""
        with self.connection.cursor() as cursor:
            sql = """
            SELECT s.name, g.score, g.exam_date
            FROM grades g
            JOIN students s ON g.student_id = s.student_id
            WHERE g.course_id = %s
            """
            cursor.execute(sql, (course_id,))
            return cursor.fetchall()

    def get_class_average(self, course_id):
        """获取课程平均分"""
        with self.connection.cursor() as cursor:
            sql = """
            SELECT AVG(score) as average_score
            FROM grades
            WHERE course_id = %s
            """
            cursor.execute(sql, (course_id,))
            return cursor.fetchone()['average_score']

    def close(self):
        """关闭数据库连接"""
        self.connection.close()
        print("数据库连接已关闭。")


# 使用示例
if __name__ == "__main__":
    # 创建成绩管理系统实例
    manager = StudentGradeManager(
        host='localhost',
        user='your_username',
        password='your_password',
        database='student_grades'
    )

    # 创建表
    manager.create_tables()

    # 添加学生
    student1_id = manager.add_student("张三", "男", "2005-01-15", "高一(1)班")
    student2_id = manager.add_student("李四", "女", "2005-03-20", "高一(1)班")
    student3_id = manager.add_student("王五", "男", "2005-02-10", "高一(2)班")

    # 添加课程
    math_id = manager.add_course("数学", "李老师")
    chinese_id = manager.add_course("语文", "王老师")
    english_id = manager.add_course("英语", "张老师")

    # 添加成绩
    manager.add_grade(student1_id, math_id, 95.5, "2023-06-10")
    manager.add_grade(student1_id, chinese_id, 88.0, "2023-06-10")
    manager.add_grade(student1_id, english_id, 92.0, "2023-06-10")

    manager.add_grade(student2_id, math_id, 87.0, "2023-06-10")
    manager.add_grade(student2_id, chinese_id, 91.5, "2023-06-10")
    manager.add_grade(student2_id, english_id, 96.0, "2023-06-10")

    manager.add_grade(student3_id, math_id, 92.0, "2023-06-10")
    manager.add_grade(student3_id, chinese_id, 85.0, "2023-06-10")
    manager.add_grade(student3_id, english_id, 89.5, "2023-06-10")

    # 显示所有学生
    print("\n--- 所有学生信息 ---")
    students = manager.get_all_students()
    table = PrettyTable()
    table.field_names = ["ID", "姓名", "性别", "出生日期", "班级"]
    for student in students:
        table.add_row([student["student_id"], student["name"], student["gender"], 
                      student["birth_date"], student["class"]])
    print(table)

    # 显示所有课程
    print("\n--- 所有课程信息 ---")
    courses = manager.get_all_courses()
    table = PrettyTable()
    table.field_names = ["ID", "课程名称", "授课老师"]
    for course in courses:
        table.add_row([course["course_id"], course["course_name"], course["teacher"]])
    print(table)

    # 显示学生成绩
    print("\n--- 学生成绩 ---")
    for student_id in [student1_id, student2_id, student3_id]:
        grades = manager.get_student_grades(student_id)
        if grades:
            table = PrettyTable()
            table.field_names = ["课程", "分数", "考试日期"]
            for grade in grades:
                table.add_row([grade["course_name"], grade["score"], grade["exam_date"]])
            print(f"\n{students[student_id-1]['name']}的成绩:")
            print(table)

    # 显示课程平均分
    print("\n--- 课程平均分 ---")
    for course_id in [math_id, chinese_id, english_id]:
        avg_score = manager.get_class_average(course_id)
        print(f"{courses[course_id-1]['course_name']}平均分: {avg_score:.2f}")

    # 关闭连接
    manager.close()

这个学生成绩管理系统展示了如何使用PyMySQL构建一个完整的应用程序。系统通过面向对象的方式组织代码,提供了学生信息管理、课程管理、成绩录入和查询统计等功能。使用PrettyTable库美化了输出结果,使数据展示更加直观。这个案例涵盖了PyMySQL的各种操作,包括数据库连接、表创建、数据插入、查询和统计分析等。

通过以上的介绍和实例,你可以看到PyMySQL是一个功能强大、使用方便的Python库,它为Python开发者提供了一个高效、灵活的方式来连接和操作MySQL数据库。无论是小型项目还是大型应用,PyMySQL都能满足你的需求。

相关资源

关注我,每天分享一个实用的Python自动化工具。

退出移动版