Python实用工具:numexpr – 高效数值计算的利器

1. Python在各领域的广泛性及重要性

Python作为一种高级、通用、解释型的编程语言,凭借其简洁易读的语法和强大的功能,已成为当今最受欢迎的编程语言之一。它的应用领域极为广泛,涵盖了Web开发、数据分析和数据科学、机器学习和人工智能、桌面自动化和爬虫脚本、金融和量化交易、教育和研究等众多领域。

在Web开发中,Python的Django、Flask等框架为开发者提供了高效、便捷的工具,能够快速构建出功能强大的Web应用。在数据分析和数据科学领域,NumPy、pandas、Matplotlib等库使得数据处理、分析和可视化变得轻而易举。机器学习和人工智能方面,TensorFlow、PyTorch、Scikit-learn等框架让开发者能够轻松实现各种复杂的算法和模型。在桌面自动化和爬虫脚本中,Python的Selenium、Requests、BeautifulSoup等库可以帮助用户自动化完成各种任务,高效地获取和处理网络数据。金融和量化交易领域,Python的Pandas、NumPy、TA-Lib等库为金融数据分析和交易策略开发提供了强大的支持。在教育和研究方面,Python因其简单易学、功能强大的特点,成为了教师和学生进行教学和研究的理想工具。

本文将介绍Python的一个实用工具库——numexpr,它在数值计算领域有着出色的表现,能够帮助开发者更高效地处理大规模数据计算任务。

2. numexpr库的用途、工作原理、优缺点及License类型

2.1 用途

numexpr是一个专门用于高效数值计算的Python库,它主要用于加速NumPy数组的运算。在处理大规模数据时,NumPy的计算速度可能会成为瓶颈,而numexpr通过优化计算表达式的执行,能够显著提高计算效率。它支持各种数学运算,如加减乘除、三角函数、指数函数等,并且可以处理复杂的表达式。numexpr特别适用于需要频繁进行数值计算的场景,如科学计算、数据分析、机器学习等领域。

2.2 工作原理

numexpr的工作原理基于表达式编译技术。当用户提交一个计算表达式时,numexpr会将其编译为高效的机器码,然后直接在内存中执行。这种方式避免了传统Python解释器的开销,同时也减少了内存访问次数,从而提高了计算速度。此外,numexpr还支持多线程计算,能够充分利用多核CPU的性能,进一步加速计算过程。

2.3 优缺点

优点

  • 高效性能:通过编译表达式和多线程计算,numexpr能够显著提高数值计算的速度,尤其是在处理大规模数据时表现更为突出。
  • 内存优化:numexpr在计算过程中采用了内存优化策略,减少了中间结果的存储,从而降低了内存消耗。
  • 易用性:numexpr的接口与NumPy非常相似,用户可以很容易地将现有的NumPy代码转换为使用numexpr的代码。
  • 跨平台支持:numexpr支持多种操作系统和硬件平台,具有良好的跨平台性。

缺点

  • 表达式限制:numexpr对支持的表达式有一定的限制,一些复杂的表达式可能无法直接使用numexpr进行计算。
  • 学习成本:虽然numexpr的接口与NumPy相似,但用户仍然需要了解一些numexpr特有的语法和用法,这可能需要一定的学习成本。

2.4 License类型

numexpr采用BSD许可证,这是一种非常宽松的开源许可证。根据BSD许可证,用户可以自由地使用、修改和分发numexpr库,只需保留原有的版权声明即可。这种许可证类型使得numexpr在商业和非商业项目中都得到了广泛的应用。

3. numexpr库的使用方式

3.1 安装numexpr

在使用numexpr之前,需要先安装它。可以使用pip命令来安装numexpr:

pip install numexpr

安装完成后,可以通过以下方式验证numexpr是否安装成功:

import numexpr as ne

print(ne.__version__)

如果能够正常输出版本号,则说明numexpr安装成功。

3.2 基本用法

numexpr的基本用法非常简单,主要通过evaluate函数来计算表达式。下面是一个简单的示例:

import numpy as np
import numexpr as ne

# 创建两个NumPy数组
a = np.array([1, 2, 3, 4, 5])
b = np.array([6, 7, 8, 9, 10])

# 使用numexpr计算表达式
result = ne.evaluate("a + b")

print("NumPy计算结果:", a + b)
print("numexpr计算结果:", result)

在这个示例中,我们首先创建了两个NumPy数组ab,然后使用numexpr的evaluate函数计算表达式"a + b"。最后,我们将NumPy直接计算的结果和numexpr计算的结果进行了对比,可以看到两者的结果是一致的。

3.3 支持的运算符和函数

numexpr支持多种运算符和函数,包括基本的算术运算符、比较运算符、逻辑运算符以及各种数学函数。下面是一些常见的运算符和函数示例:

import numpy as np
import numexpr as ne

# 创建NumPy数组
a = np.array([1, 2, 3, 4, 5])
b = np.array([6, 7, 8, 9, 10])

# 基本算术运算符
result_add = ne.evaluate("a + b")  # 加法
result_sub = ne.evaluate("a - b")  # 减法
result_mul = ne.evaluate("a * b")  # 乘法
result_div = ne.evaluate("a / b")  # 除法
result_pow = ne.evaluate("a ** b") # 幂运算

# 比较运算符
result_lt = ne.evaluate("a < b")   # 小于
result_gt = ne.evaluate("a > b")   # 大于
result_eq = ne.evaluate("a == b")  # 等于

# 逻辑运算符
result_and = ne.evaluate("(a > 2) & (b < 9)")  # 逻辑与
result_or = ne.evaluate("(a > 2) | (b < 9)")   # 逻辑或
result_not = ne.evaluate("~(a > 2)")           # 逻辑非

# 数学函数
result_sin = ne.evaluate("sin(a)")    # 正弦函数
result_cos = ne.evaluate("cos(a)")    # 余弦函数
result_exp = ne.evaluate("exp(a)")    # 指数函数
result_log = ne.evaluate("log(a)")    # 自然对数函数
result_sqrt = ne.evaluate("sqrt(a)")  # 平方根函数

# 打印结果
print("加法结果:", result_add)
print("减法结果:", result_sub)
print("乘法结果:", result_mul)
print("除法结果:", result_div)
print("幂运算结果:", result_pow)
print("小于比较结果:", result_lt)
print("大于比较结果:", result_gt)
print("等于比较结果:", result_eq)
print("逻辑与结果:", result_and)
print("逻辑或结果:", result_or)
print("逻辑非结果:", result_not)
print("正弦函数结果:", result_sin)
print("余弦函数结果:", result_cos)
print("指数函数结果:", result_exp)
print("自然对数函数结果:", result_log)
print("平方根函数结果:", result_sqrt)

3.4 使用变量和常量

在numexpr的表达式中,可以使用变量和常量。变量可以是NumPy数组或Python标量,常量可以是数值或字符串。下面是一个使用变量和常量的示例:

import numpy as np
import numexpr as ne

# 创建NumPy数组
a = np.array([1, 2, 3, 4, 5])
b = np.array([6, 7, 8, 9, 10])

# 定义常量
c = 2
d = 3.14

# 使用变量和常量计算表达式
result = ne.evaluate("a * c + sin(b) * d")

print("计算结果:", result)

3.5 多线程计算

numexpr支持多线程计算,可以通过设置numexpr.set_num_threads()函数来指定使用的线程数。默认情况下,numexpr会自动检测系统的CPU核心数,并使用所有可用的核心。下面是一个多线程计算的示例:

import numpy as np
import numexpr as ne
import time

# 创建大型NumPy数组
a = np.random.rand(10000000)
b = np.random.rand(10000000)

# 单线程计算
ne.set_num_threads(1)
start_time = time.time()
result_single = ne.evaluate("a * b + sin(a) + cos(b)")
end_time = time.time()
print(f"单线程计算耗时: {end_time - start_time}秒")

# 多线程计算(使用所有可用核心)
ne.set_num_threads(ne.detect_number_of_cores())
start_time = time.time()
result_multi = ne.evaluate("a * b + sin(a) + cos(b)")
end_time = time.time()
print(f"多线程计算耗时: {end_time - start_time}秒")

# 验证结果是否一致
print("结果一致:", np.allclose(result_single, result_multi))

3.6 与NumPy的性能对比

为了展示numexpr的性能优势,下面进行一个与NumPy的性能对比测试。我们将计算一个复杂的表达式,比较NumPy和numexpr的计算时间:

import numpy as np
import numexpr as ne
import time

# 创建大型NumPy数组
size = 10000000
a = np.random.rand(size)
b = np.random.rand(size)
c = np.random.rand(size)

# NumPy计算
start_time = time.time()
result_numpy = a * b + np.sin(a) + np.cos(b) * c
end_time = time.time()
numpy_time = end_time - start_time
print(f"NumPy计算耗时: {numpy_time}秒")

# numexpr计算
start_time = time.time()
result_numexpr = ne.evaluate("a * b + sin(a) + cos(b) * c")
end_time = time.time()
numexpr_time = end_time - start_time
print(f"numexpr计算耗时: {numexpr_time}秒")

# 计算性能提升比例
speedup = numpy_time / numexpr_time
print(f"numexpr比NumPy快: {speedup:.2f}倍")

# 验证结果是否一致
print("结果一致:", np.allclose(result_numpy, result_numexpr))

3.7 内存优化

numexpr在计算过程中采用了内存优化策略,减少了中间结果的存储,从而降低了内存消耗。这在处理大规模数据时尤为重要。下面是一个展示内存优化的示例:

import numpy as np
import numexpr as ne
import psutil
import os

# 获取当前进程的内存使用情况
def get_memory_usage():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # MB

# 创建大型NumPy数组
size = 10000000
a = np.random.rand(size)
b = np.random.rand(size)
c = np.random.rand(size)

# 记录初始内存使用
initial_memory = get_memory_usage()
print(f"初始内存使用: {initial_memory} MB")

# NumPy计算(会产生中间结果)
memory_before_numpy = get_memory_usage()
result_numpy = a * b + np.sin(a) + np.cos(b) * c
memory_after_numpy = get_memory_usage()
print(f"NumPy计算内存增量: {memory_after_numpy - memory_before_numpy} MB")

# 释放NumPy结果占用的内存
del result_numpy

# numexpr计算(内存优化)
memory_before_numexpr = get_memory_usage()
result_numexpr = ne.evaluate("a * b + sin(a) + cos(b) * c")
memory_after_numexpr = get_memory_usage()
print(f"numexpr计算内存增量: {memory_after_numexpr - memory_before_numexpr} MB")

# 验证结果是否一致
print("结果一致:", np.allclose(a * b + np.sin(a) + np.cos(b) * c, result_numexpr))

3.8 编译表达式

在某些情况下,我们可能需要多次计算同一个表达式,这时可以使用numexpr的编译功能来提高性能。编译后的表达式可以重复使用,避免了每次都进行表达式编译的开销。下面是一个编译表达式的示例:

import numpy as np
import numexpr as ne
import time

# 创建NumPy数组
a = np.random.rand(1000000)
b = np.random.rand(1000000)
c = np.random.rand(1000000)

# 定义表达式
expr = "a * b + sin(a) + cos(b) * c"

# 编译表达式
compiled_expr = ne.NumExpr(expr)

# 使用编译后的表达式进行多次计算
results = []
num_iterations = 10

# 测试编译后的表达式性能
start_time = time.time()
for _ in range(num_iterations):
    result = compiled_expr(a, b, c)
    results.append(result)
end_time = time.time()
compiled_time = end_time - start_time
print(f"使用编译后的表达式计算 {num_iterations} 次耗时: {compiled_time}秒")

# 测试直接使用evaluate函数的性能
start_time = time.time()
for _ in range(num_iterations):
    result = ne.evaluate(expr)
    results.append(result)
end_time = time.time()
evaluate_time = end_time - start_time
print(f"直接使用evaluate函数计算 {num_iterations} 次耗时: {evaluate_time}秒")

# 计算性能提升比例
speedup = evaluate_time / compiled_time
print(f"编译后的表达式比直接使用evaluate快: {speedup:.2f}倍")

3.9 使用where函数

numexpr提供了类似于NumPy的where函数,用于根据条件选择元素。下面是一个使用where函数的示例:

import numpy as np
import numexpr as ne

# 创建NumPy数组
a = np.array([1, 2, 3, 4, 5])
b = np.array([6, 7, 8, 9, 10])
condition = np.array([True, False, True, False, True])

# 使用numexpr的where函数
result = ne.evaluate("where(condition, a, b)")

print("条件数组:", condition)
print("数组a:", a)
print("数组b:", b)
print("where函数结果:", result)

3.10 配置numexpr

可以通过修改numexpr的配置来优化其性能。可以使用ne.set_vml_accuracy_mode()函数设置VML(Vector Math Library)的精度模式,使用ne.set_vml_num_threads()函数设置VML使用的线程数。下面是一个配置numexpr的示例:

import numpy as np
import numexpr as ne

# 设置VML精度模式('high'表示高精度,'fast'表示快速但精度稍低)
ne.set_vml_accuracy_mode('high')

# 设置VML使用的线程数
ne.set_vml_num_threads(ne.detect_number_of_cores())

# 打印当前配置
print(f"VML精度模式: {ne.get_vml_accuracy_mode()}")
print(f"VML线程数: {ne.get_vml_num_threads()}")
print(f"numexpr线程数: {ne.get_num_threads()}")

# 创建NumPy数组并进行计算
a = np.random.rand(1000000)
b = np.random.rand(1000000)
result = ne.evaluate("sin(a) + cos(b)")

print("计算完成")

4. 实际案例

4.1 金融数据分析

在金融数据分析中,经常需要对大量的金融数据进行复杂的计算。numexpr可以帮助我们高效地完成这些计算任务。下面是一个金融数据分析的实际案例,展示了如何使用numexpr计算股票的收益率和波动率:

import numpy as np
import pandas as pd
import numexpr as ne
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

# 设置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题

# 生成模拟股票数据
def generate_stock_data(days=365, num_stocks=5):
    """生成模拟股票数据"""
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)

    # 生成日期序列
    date_range = pd.date_range(start=start_date, end=end_date, freq='B')

    # 生成股票数据
    data = {}
    for i in range(num_stocks):
        stock_name = f"股票{i+1}"
        # 生成随机价格
        prices = np.random.randn(len(date_range)).cumsum() + 100
        # 确保价格为正
        prices = np.maximum(prices, 1)
        data[stock_name] = prices

    return pd.DataFrame(data, index=date_range)

# 计算股票收益率和波动率
def calculate_returns_and_volatility(prices_df):
    """计算股票收益率和波动率"""
    # 计算对数收益率
    # 使用numexpr加速计算
    returns_df = pd.DataFrame(index=prices_df.index)

    for stock in prices_df.columns:
        # 获取当前股票价格
        prices = prices_df[stock].values

        # 使用numexpr计算对数收益率
        # log(price_t / price_{t-1}) = log(price_t) - log(price_{t-1})
        expr = "log(prices[1:]) - log(prices[:-1])"
        returns = ne.evaluate(expr)

        # 添加NaN作为第一行(因为第一行没有前一天的数据)
        returns = np.insert(returns, 0, np.nan)

        # 将计算结果添加到DataFrame中
        returns_df[stock] = returns

    # 计算波动率(年化)
    volatility_df = returns_df.rolling(window=20).std() * np.sqrt(252)

    return returns_df, volatility_df

# 可视化结果
def visualize_results(prices_df, returns_df, volatility_df):
    """可视化股票价格、收益率和波动率"""
    fig, axes = plt.subplots(3, 1, figsize=(12, 18))

    # 绘制股票价格
    prices_df.plot(ax=axes[0], title="股票价格走势")
    axes[0].set_ylabel("价格")
    axes[0].grid(True)

    # 绘制收益率
    returns_df.plot(ax=axes[1], title="股票收益率")
    axes[1].set_ylabel("收益率")
    axes[1].grid(True)

    # 绘制波动率
    volatility_df.plot(ax=axes[2], title="股票波动率(年化)")
    axes[2].set_ylabel("波动率")
    axes[2].grid(True)

    plt.tight_layout()
    plt.savefig("stock_analysis.png")
    plt.show()

# 主函数
def main():
    # 生成模拟数据
    print("生成模拟股票数据...")
    prices_df = generate_stock_data(days=365, num_stocks=5)

    # 计算收益率和波动率
    print("计算收益率和波动率...")
    returns_df, volatility_df = calculate_returns_and_volatility(prices_df)

    # 可视化结果
    print("可视化分析结果...")
    visualize_results(prices_df, returns_df, volatility_df)

    # 输出统计信息
    print("\n统计信息:")
    for stock in returns_df.columns:
        avg_return = returns_df[stock].mean() * 252  # 年化平均收益率
        max_volatility = volatility_df[stock].max()

        print(f"{stock}:")
        print(f"  年化平均收益率: {avg_return:.4f}")
        print(f"  最大波动率: {max_volatility:.4f}")
        print()

if __name__ == "__main__":
    main()

4.2 科学计算

在科学计算中,经常需要进行大规模的数值模拟和计算。numexpr可以帮助我们高效地完成这些计算任务。下面是一个科学计算的实际案例,展示了如何使用numexpr加速偏微分方程的求解:

import numpy as np
import numexpr as ne
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm

# 设置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题

def solve_heat_equation(num_points=100, num_time_steps=1000, dt=0.001, dx=0.01, alpha=0.1):
    """求解一维热传导方程 u_t = alpha * u_xx"""
    # 初始化温度分布
    x = np.linspace(0, 1, num_points)
    u = np.sin(np.pi * x)  # 初始条件: u(x,0) = sin(pi*x)

    # 创建用于存储结果的数组
    result = np.zeros((num_time_steps, num_points))
    result[0] = u.copy()

    # 计算扩散系数
    r = alpha * dt / (dx * dx)

    # 使用numexpr求解热传导方程
    for t in range(1, num_time_steps):
        # 使用显式差分格式: u(i,t+1) = u(i,t) + r * (u(i+1,t) - 2*u(i,t) + u(i-1,t))
        # 使用numexpr加速计算
        u_left = np.roll(u, 1)   # 左邻居
        u_right = np.roll(u, -1) # 右邻居

        # 使用numexpr计算下一时间步的温度分布
        expr = "u + r * (u_right - 2 * u + u_left)"
        u = ne.evaluate(expr)

        # 边界条件: u(0,t) = u(1,t) = 0
        u[0] = 0
        u[-1] = 0

        # 保存当前时间步的结果
        result[t] = u.copy()

    return x, np.linspace(0, num_time_steps * dt, num_time_steps), result

def visualize_heat_equation(x, t, u):
    """可视化热传导方程的求解结果"""
    X, T = np.meshgrid(x, t)

    fig = plt.figure(figsize=(12, 8))

    # 3D表面图
    ax1 = fig.add_subplot(121, projection='3d')
    surf = ax1.plot_surface(X, T, u, cmap=cm.coolwarm, linewidth=0, antialiased=True)
    ax1.set_xlabel('位置 x')
    ax1.set_ylabel('时间 t')
    ax1.set_zlabel('温度 u')
    ax1.set_title('热传导方程的3D解')
    fig.colorbar(surf, ax=ax1, shrink=0.5, aspect=5)

    # 等高线图
    ax2 = fig.add_subplot(122)
    contour = ax2.contourf(X, T, u, cmap=cm.coolwarm, levels=20)
    ax2.set_xlabel('位置 x')
    ax2.set_ylabel('时间 t')
    ax2.set_title('热传导方程的等高线解')
    fig.colorbar(contour, ax=ax2, shrink=0.5, aspect=5)

    plt.tight_layout()
    plt.savefig("heat_equation.png")
    plt.show()

def compare_performance(num_points=1000, num_time_steps=1000, dt=0.001, dx=0.01, alpha=0.1):
    """比较使用numexpr和纯NumPy求解热传导方程的性能"""
    import time

    # 初始化温度分布
    x = np.linspace(0, 1, num_points)
    u_numpy = np.sin(np.pi * x)
    u_numexpr = u_numpy.copy()

    # 计算扩散系数
    r = alpha * dt / (dx * dx)

    # 使用纯NumPy求解
    start_time = time.time()
    for t in range(num_time_steps):
        u_left = np.roll(u_numpy, 1)
        u_right = np.roll(u_numpy, -1)
        u_numpy = u_numpy + r * (u_right - 2 * u_numpy + u_left)
        u_numpy[0] = 0
        u_numpy[-1] = 0
    numpy_time = time.time() - start_time

    # 使用numexpr求解
    start_time = time.time()
    for t in range(num_time_steps):
        u_left = np.roll(u_numexpr, 1)
        u_right = np.roll(u_numexpr, -1)
        expr = "u_numexpr + r * (u_right - 2 * u_numexpr + u_left)"
        u_numexpr = ne.evaluate(expr)
        u_numexpr[0] = 0
        u_numexpr[-1] = 0
    numexpr_time = time.time() - start_time

    # 计算加速比
    speedup = numpy_time / numexpr_time

    print(f"纯NumPy计算时间: {numpy_time:.4f}秒")
    print(f"numexpr计算时间: {numexpr_time:.4f}秒")
    print(f"加速比: {speedup:.2f}倍")

    # 验证结果是否一致
    print(f"结果一致性: {np.allclose(u_numpy, u_numexpr)}")

def main():
    # 求解热传导方程
    print("求解热传导方程...")
    x, t, u = solve_heat_equation(num_points=100, num_time_steps=500, dt=0.001, dx=0.01, alpha=0.1)

    # 可视化结果
    print("可视化结果...")
    visualize_heat_equation(x, t, u)

    # 比较性能
    print("\n比较NumPy和numexpr的性能...")
    compare_performance(num_points=1000, num_time_steps=1000, dt=0.001, dx=0.01, alpha=0.1)

if __name__ == "__main__":
    main()

4.3 大数据处理

在大数据处理中,经常需要对海量数据进行复杂的计算。numexpr可以帮助我们高效地完成这些计算任务。下面是一个大数据处理的实际案例,展示了如何使用numexpr处理大规模数据集:

import numpy as np
import pandas as pd
import numexpr as ne
import time
from memory_profiler import profile

# 设置中文显示
pd.set_option('display.unicode.ambiguous_as_wide', True)
pd.set_option('display.unicode.east_asian_width', True)

def generate_large_data(size=10000000):
    """生成大型数据集"""
    print(f"生成大型数据集 ({size} 行)...")
    data = {
        'A': np.random.randn(size),
        'B': np.random.randn(size),
        'C': np.random.randn(size),
        'D': np.random.randn(size),
        'category': np.random.choice(['cat1', 'cat2', 'cat3', 'cat4', 'cat5'], size=size)
    }
    return pd.DataFrame(data)

@profile
def process_data_with_pandas(df):
    """使用纯Pandas处理数据"""
    print("使用纯Pandas处理数据...")
    start_time = time.time()

    # 计算复杂表达式
    df['result'] = df['A'] * df['B'] + np.sin(df['C']) * np.cos(df['D'])

    # 过滤数据
    filtered_df = df[(df['result'] > 0) & (df['category'].isin(['cat1', 'cat3']))]

    # 分组计算
    grouped = filtered_df.groupby('category').agg({
        'A': 'mean',
        'B': 'sum',
        'result': 'std'
    })

    end_time = time.time()
    print(f"Pandas处理时间: {end_time - start_time:.4f}秒")

    return grouped

@profile
def process_data_with_numexpr(df):
    """使用numexpr处理数据"""
    print("使用numexpr处理数据...")
    start_time = time.time()

    # 使用numexpr计算复杂表达式
    A = df['A'].values
    B = df['B'].values
    C = df['C'].values
    D = df['D'].values

    expr = "A * B + sin(C) * cos(D)"
    result = ne.evaluate(expr)

    # 添加结果列
    df['result'] = result

    # 使用numexpr过滤数据
    condition = ne.evaluate("(result > 0) & ((category == 'cat1') | (category == 'cat3'))")
    filtered_df = df[condition]

    # 分组计算(Pandas在分组操作上已经很高效,这里不替换)
    grouped = filtered_df.groupby('category').agg({
        'A': 'mean',
        'B': 'sum',
        'result': 'std'
    })

    end_time = time.time()
    print(f"numexpr处理时间: {end_time - start_time:.4f}秒")

    return grouped

def main():
    # 生成大型数据集
    df = generate_large_data(size=10000000)

    # 使用Pandas处理数据
    pandas_result = process_data_with_pandas(df.copy())

    # 使用numexpr处理数据
    numexpr_result = process_data_with_numexpr(df.copy())

    # 验证结果是否一致
    print("\n验证结果一致性:")
    for col in pandas_result.columns:
        if col == 'result':  # 浮点数比较需要容忍一定误差
            print(f"{col}: {np.allclose(pandas_result[col], numexpr_result[col])}")
        else:
            print(f"{col}: {pandas_result[col].equals(numexpr_result[col])}")

    # 打印结果
    print("\n处理结果:")
    print(pandas_result)

if __name__ == "__main__":
    main()

5. 相关资源

  • Pypi地址:https://pypi.org/project/numexpr
  • Github地址:https://github.com/pydata/numexpr
  • 官方文档地址:https://numexpr.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。