Python实用工具:aioprometheus异步监控指标采集与上报教程

一、aioprometheus库核心概述

1.1 用途与工作原理

aioprometheus是一款基于Python asyncio框架开发的异步指标采集与上报库,专门用于为异步Python应用程序提供Prometheus监控指标的定义、收集和暴露功能。其工作原理遵循Prometheus的监控规范,通过定义Counter(计数器)、Gauge(仪表盘)、Summary(摘要)、Histogram(直方图)四种核心指标类型,在异步代码中埋点采集数据,再通过HTTP服务暴露指标接口,供Prometheus服务器定期拉取数据,最终实现对异步应用的性能监控与状态分析。

1.2 优缺点分析

优点

  1. 完全异步化设计,与aiohttp等异步框架完美兼容,不会阻塞事件循环,适合高并发异步应用场景;
  2. 原生支持Prometheus的四种核心指标类型,满足绝大多数监控需求;
  3. 提供灵活的指标标签(label)机制,支持多维度指标分析;
  4. 轻量级架构,安装简单,运行时资源消耗低。

缺点

  1. 文档相对精简,对于新手而言部分高级用法需要阅读源码理解;
  2. 生态相较于同步的prometheus_client库稍窄,第三方集成插件较少;
  3. 仅支持异步Python环境,无法直接用于同步应用程序。

1.3 License类型

aioprometheus采用MIT License开源协议,这意味着开发者可以自由地将其用于个人、商业项目,允许修改、分发源码,只需保留原始版权声明即可。

二、aioprometheus安装与环境准备

2.1 安装命令

aioprometheus库已发布至PyPI,可通过pip包管理工具直接安装,建议使用Python 3.7及以上版本(兼容asyncio的全部特性),安装命令如下:

pip install aioprometheus
# 如需使用aiohttp集成功能,可安装扩展依赖
pip install aioprometheus[aiohttp]

2.2 环境验证

安装完成后,可通过以下Python代码验证环境是否配置成功,该代码会创建一个简单的Counter指标并打印,确认库能够正常导入和使用:

import asyncio
from aioprometheus import Counter, Registry

async def verify_environment():
    # 创建指标注册表
    registry = Registry()
    # 定义Counter指标,用于统计请求次数
    http_requests_total = Counter(
        "http_requests_total",
        "Total number of HTTP requests",
        {"method": "GET", "endpoint": "/api"}
    )
    # 将指标注册到注册表
    registry.register(http_requests_total)
    # 增加指标计数
    http_requests_total.inc()
    # 打印指标信息
    print("指标信息:", http_requests_total.samples)

if __name__ == "__main__":
    asyncio.run(verify_environment())

运行上述代码,若控制台输出类似指标信息: [Sample(name='http_requests_total', labels={'method': 'GET', 'endpoint': '/api'}, value=1.0)]的内容,则说明aioprometheus已成功安装并可正常使用。

三、aioprometheus核心指标类型与使用方法

Prometheus的监控体系基于四种核心指标类型,aioprometheus对这四种类型进行了完整的异步封装,下面我们逐一讲解每种指标的定义、使用场景和代码示例。

3.1 Counter(计数器)

适用场景:用于统计只增不减的数值,例如请求次数、错误发生次数、任务完成数量等。Counter的核心操作是inc(),用于将指标值增加指定数值(默认增加1)。

代码示例:统计异步Web服务的GET请求次数

import asyncio
from aioprometheus import Counter, Registry, render
from aiohttp import web

# 1. 创建全局注册表
registry = Registry()

# 2. 定义Counter指标
# 参数说明:指标名、指标帮助信息、默认标签
http_requests_total = Counter(
    "http_requests_total",
    "Total count of HTTP requests by method and endpoint",
    labelnames=["method", "endpoint"]
)

# 3. 将指标注册到注册表
registry.register(http_requests_total)

# 4. 定义异步请求处理函数
async def handle_api_request(request):
    # 获取请求方法和路径
    method = request.method
    endpoint = request.path
    # 增加Counter计数,传入标签值
    http_requests_total.inc({"method": method, "endpoint": endpoint})
    return web.json_response({"status": "success", "data": "Hello, aioprometheus!"})

# 5. 定义指标暴露接口,供Prometheus拉取
async def metrics_handler(request):
    # 生成Prometheus格式的指标数据
    content, http_headers = render(registry, request.headers.get('accept'))
    return web.Response(body=content, headers=http_headers)

# 6. 创建aiohttp应用并配置路由
async def create_app():
    app = web.Application()
    # 业务接口
    app.add_routes([web.get("/api/hello", handle_api_request)])
    # 指标暴露接口
    app.add_routes([web.get("/metrics", metrics_handler)])
    return app

if __name__ == "__main__":
    app = asyncio.run(create_app())
    web.run_app(app, host="0.0.0.0", port=8080)

代码说明

  • 首先创建Registry对象,用于管理所有指标,这是aioprometheus的核心管理组件;
  • 定义Counter指标时,通过labelnames参数指定标签维度,支持后续按请求方法和接口路径统计;
  • 在请求处理函数handle_api_request中,调用inc()方法增加计数,并传入当前请求的标签值;
  • 配置/metrics接口,通过render函数将注册表中的指标数据转换为Prometheus可识别的格式;
  • 运行程序后,访问http://localhost:8080/api/hello触发请求计数,再访问http://localhost:8080/metrics即可查看指标数据。

3.2 Gauge(仪表盘)

适用场景:用于统计可增可减的数值,例如当前内存使用量、活跃连接数、队列长度等。Gauge支持inc()(增加)、dec()(减少)、set()(直接设置值)、set_to_current_time()(设置为当前时间戳)等操作。

代码示例:监控异步任务队列的长度

import asyncio
import random
from aioprometheus import Gauge, Registry, render
from aiohttp import web

# 创建注册表和Gauge指标
registry = Registry()
task_queue_length = Gauge(
    "task_queue_length",
    "Current number of tasks in the async queue",
    labelnames=["queue_name"]
)
registry.register(task_queue_length)

# 模拟异步任务队列
task_queue = asyncio.Queue()

# 生产任务:向队列中添加随机数量的任务
async def task_producer():
    while True:
        # 随机生成1-5个任务
        task_count = random.randint(1, 5)
        for _ in range(task_count):
            await task_queue.put(f"task_{asyncio.get_event_loop().time()}")
        # 更新Gauge指标:设置为当前队列长度
        queue_len = task_queue.qsize()
        task_queue_length.set({"queue_name": "user_task_queue"}, queue_len)
        print(f"Added {task_count} tasks, current queue length: {queue_len}")
        await asyncio.sleep(3)

# 消费任务:从队列中取出任务并处理
async def task_consumer():
    while True:
        if not task_queue.empty():
            task = await task_queue.get()
            # 模拟任务处理耗时
            await asyncio.sleep(1)
            print(f"Processed task: {task}")
            task_queue.task_done()
            # 更新Gauge指标:处理完任务后更新队列长度
            queue_len = task_queue.qsize()
            task_queue_length.set({"queue_name": "user_task_queue"}, queue_len)
        else:
            await asyncio.sleep(1)

# 指标暴露接口
async def metrics_handler(request):
    content, http_headers = render(registry, request.headers.get('accept'))
    return web.Response(body=content, headers=http_headers)

# 创建aiohttp应用
async def create_app():
    app = web.Application()
    app.add_routes([web.get("/metrics", metrics_handler)])
    # 启动生产者和消费者协程
    asyncio.create_task(task_producer())
    asyncio.create_task(task_consumer())
    return app

if __name__ == "__main__":
    app = asyncio.run(create_app())
    web.run_app(app, host="0.0.0.0", port=8081)

代码说明

  • Gauge指标task_queue_length用于监控任务队列的实时长度,通过labelnames区分不同队列;
  • task_producer协程负责生产任务,每次添加任务后调用set()方法更新指标值为当前队列长度;
  • task_consumer协程负责消费任务,处理完任务后同样更新指标值;
  • 运行程序后,访问http://localhost:8081/metrics可查看队列长度的实时变化,该指标会随着任务的生产和消费动态增减。

3.3 Summary(摘要)

适用场景:用于统计数值的分布情况,例如请求响应时间的平均值、中位数、95分位数等。Summary通过observe()方法记录数值,自动计算并存储指定分位数的统计结果。

代码示例:统计异步接口的响应时间

import asyncio
import time
from aioprometheus import Summary, Registry, render
from aiohttp import web

# 创建注册表和Summary指标
registry = Registry()
request_duration_seconds = Summary(
    "request_duration_seconds",
    "Summary of request processing duration in seconds",
    labelnames=["method", "endpoint"],
    # 指定需要统计的分位数和误差范围
    quantiles={0.5: 0.05, 0.95: 0.01, 0.99: 0.001}
)
registry.register(request_duration_seconds)

# 定义响应时间统计中间件
async def timing_middleware(app, handler):
    async def middleware_handler(request):
        # 记录请求开始时间
        start_time = time.perf_counter()
        try:
            # 执行原始请求处理函数
            response = await handler(request)
            return response
        finally:
            # 计算请求耗时
            duration = time.perf_counter() - start_time
            # 记录耗时到Summary指标
            request_duration_seconds.observe(
                {"method": request.method, "endpoint": request.path},
                duration
            )
    return middleware_handler

# 模拟耗时的业务接口
async def slow_api_handler(request):
    # 模拟接口处理耗时:0.1-0.5秒
    delay = asyncio.sleep(random.uniform(0.1, 0.5))
    await delay
    return web.json_response({"status": "success", "message": "Slow API response"})

# 普通业务接口
async def fast_api_handler(request):
    return web.json_response({"status": "success", "message": "Fast API response"})

# 指标暴露接口
async def metrics_handler(request):
    content, http_headers = render(registry, request.headers.get('accept'))
    return web.Response(body=content, headers=http_headers)

# 创建aiohttp应用
async def create_app():
    app = web.Application(middlewares=[timing_middleware])
    app.add_routes([
        web.get("/api/slow", slow_api_handler),
        web.get("/api/fast", fast_api_handler),
        web.get("/metrics", metrics_handler)
    ])
    return app

if __name__ == "__main__":
    app = asyncio.run(create_app())
    web.run_app(app, host="0.0.0.0", port=8082)

代码说明

  • 定义Summary指标时,通过quantiles参数指定需要统计的分位数,例如0.5代表中位数,0.95代表95分位数,后面的数值为允许的误差范围;
  • 实现timing_middleware中间件,在请求处理前后记录时间,计算耗时并通过observe()方法传入Summary指标;
  • 分别定义慢接口和快接口,模拟不同的响应耗时;
  • 运行程序后,多次访问/api/slow/api/fast接口,再访问/metrics即可查看响应时间的统计数据,包括平均值(_sum_count计算得出)、各分位数的数值。

3.4 Histogram(直方图)

适用场景:与Summary类似,用于统计数值的分布情况,但Histogram会将数值划分到不同的区间(bucket)中,统计每个区间的数值数量,适合用于绘制分布直方图。Histogram通过observe()方法记录数值,自动统计各区间的计数。

代码示例:统计异步任务的执行耗时分布

import asyncio
import random
import time
from aioprometheus import Histogram, Registry, render
from aiohttp import web

# 创建注册表和Histogram指标
registry = Registry()
task_execution_duration_seconds = Histogram(
    "task_execution_duration_seconds",
    "Histogram of task execution duration in seconds",
    labelnames=["task_type"],
    # 定义bucket区间:0.1, 0.2, 0.5, 1.0, +inf
    buckets=[0.1, 0.2, 0.5, 1.0]
)
registry.register(task_execution_duration_seconds)

# 模拟不同类型的异步任务
async def execute_task(task_type):
    # 根据任务类型设置不同的耗时范围
    if task_type == "light":
        duration = random.uniform(0.05, 0.15)
    elif task_type == "medium":
        duration = random.uniform(0.15, 0.3)
    else: # heavy
        duration = random.uniform(0.3, 0.8)
    # 模拟任务执行
    await asyncio.sleep(duration)
    # 记录任务耗时到Histogram指标
    task_execution_duration_seconds.observe({"task_type": task_type}, duration)
    return f"{task_type}_task_completed"

# 任务调度接口:接收任务类型参数并执行任务
async def task_scheduler_handler(request):
    task_type = request.query.get("task_type", "light")
    if task_type not in ["light", "medium", "heavy"]:
        return web.json_response({"error": "Invalid task type"}, status=400)
    result = await execute_task(task_type)
    return web.json_response({"status": "success", "result": result})

# 指标暴露接口
async def metrics_handler(request):
    content, http_headers = render(registry, request.headers.get('accept'))
    return web.Response(body=content, headers=http_headers)

# 创建aiohttp应用
async def create_app():
    app = web.Application()
    app.add_routes([
        web.get("/api/task", task_scheduler_handler),
        web.get("/metrics", metrics_handler)
    ])
    return app

if __name__ == "__main__":
    app = asyncio.run(create_app())
    web.run_app(app, host="0.0.0.0", port=8083)

代码说明

  • 定义Histogram指标时,通过buckets参数指定区间边界,指标会自动统计落入每个区间的数值数量,最后一个区间默认为+inf
  • execute_task函数模拟不同类型任务的执行耗时,根据任务类型设置不同的耗时范围,并通过observe()方法记录耗时;
  • 访问/api/task?task_type=light等接口触发任务执行,多次调用后访问/metrics,可查看各任务类型的耗时分布情况,包括每个bucket的计数、总和(_sum)和总次数(_count)。

四、aioprometheus高级用法:自定义指标标签与多注册表管理

4.1 动态标签与标签值替换

在实际应用中,指标标签的值往往需要动态获取,例如根据用户ID、请求IP等信息区分指标维度。aioprometheus支持在运行时动态传入标签值,下面以用户登录次数统计为例展示动态标签的用法:

import asyncio
from aioprometheus import Counter, Registry, render
from aiohttp import web

registry = Registry()
user_login_total = Counter(
    "user_login_total",
    "Total number of user logins by user type and platform",
    labelnames=["user_type", "platform"]
)
registry.register(user_login_total)

# 模拟用户登录接口,接收用户类型和平台参数
async def login_handler(request):
    data = await request.json()
    user_type = data.get("user_type", "guest") # 可选值:guest, member, admin
    platform = data.get("platform", "web") # 可选值:web, ios, android
    # 动态传入标签值
    user_login_total.inc({"user_type": user_type, "platform": platform})
    return web.json_response({"status": "success", "message": "Login successful"})

async def metrics_handler(request):
    content, http_headers = render(registry, request.headers.get('accept'))
    return web.Response(body=content, headers=http_headers)

async def create_app():
    app = web.Application()
    app.add_routes([
        web.post("/api/login", login_handler),
        web.get("/metrics", metrics_handler)
    ])
    return app

if __name__ == "__main__":
    app = asyncio.run(create_app())
    web.run_app(app, host="0.0.0.0", port=8084)

代码说明

  • 指标user_login_total定义了user_typeplatform两个标签,用于区分不同类型用户和不同登录平台的登录次数;
  • login_handler接口中,从请求参数中动态获取user_typeplatform的值,并传入inc()方法;
  • 通过Postman等工具发送POST请求到/api/login,请求体携带{"user_type": "member", "platform": "ios"}等参数,即可按标签维度统计登录次数。

4.2 多注册表管理

在大型应用中,不同模块可能需要独立的指标管理,此时可以使用多注册表机制,将不同模块的指标注册到不同的Registry对象中,再统一暴露或分别暴露指标接口。

import asyncio
from aioprometheus import Counter, Registry, render
from aiohttp import web

# 为用户模块创建注册表
user_registry = Registry()
user_register_total = Counter(
    "user_register_total",
    "Total number of user registrations",
    labelnames=["channel"]
)
user_registry.register(user_register_total)

# 为订单模块创建注册表
order_registry = Registry()
order_create_total = Counter(
    "order_create_total",
    "Total number of order creations",
    labelnames=["order_type"]
)
order_registry.register(order_create_total)

# 用户注册接口
async def user_register_handler(request):
    data = await request.json()
    channel = data.get("channel", "direct")
    user_register_total.inc({"channel": channel})
    return web.json_response({"status": "success", "message": "User registered"})

# 订单创建接口
async def order_create_handler(request):
    data = await request.json()
    order_type = data.get("order_type", "normal")
    order_create_total.inc({"order_type": order_type})
    return web.json_response({"status": "success", "message": "Order created"})

# 暴露用户模块指标
async def user_metrics_handler(request):
    content, http_headers = render(user_registry, request.headers.get('accept'))
    return web.Response(body=content, headers=http_headers)

# 暴露订单模块指标
async def order_metrics_handler(request):
    content, http_headers = render(order_registry, request.headers.get('accept'))
    return web.Response(body=content, headers=http_headers)

# 暴露所有模块指标(合并注册表)
async def all_metrics_handler(request):
    # 合并多个注册表的指标
    combined_registry = Registry()
    for metric in user_registry.collect():
        combined_registry.register(metric)
    for metric in order_registry.collect():
        combined_registry.register(metric)
    content, http_headers = render(combined_registry, request.headers.get('accept'))
    return web.Response(body=content, headers=http_headers)

async def create_app():
    app = web.Application()
    app.add_routes([
        web.post("/api/user/register", user_register_handler),
        web.post("/api/order/create", order_create_handler),
        web.get("/metrics/user", user_metrics_handler),
        web.get("/metrics/order", order_metrics_handler),
        web.get("/metrics/all", all_metrics_handler)
    ])
    return app

if __name__ == "__main__":
    app = asyncio.run(create_app())
    web.run_app(app, host="0.0.0.0", port=8085)

代码说明

  • 分别为用户模块和订单模块创建独立的Registry对象,注册各自的指标;
  • 提供/metrics/user/metrics/order接口,分别暴露对应模块的指标;
  • 实现/metrics/all接口,通过遍历两个注册表的指标并注册到新的Registry对象中,实现指标合并暴露,满足Prometheus一次性拉取所有指标的需求。

五、aioprometheus与Prometheus服务器集成实战

5.1 Prometheus服务器配置

要实现对aioprometheus暴露指标的监控,需要配置Prometheus服务器定期拉取指标数据。首先下载并安装Prometheus(下载地址:https://prometheus.io/download/),然后修改prometheus.yml配置文件:

global:
  scrape_interval: 15s # 每15秒拉取一次指标

scrape_configs:
  - job_name: 'aioprometheus_demo'
    static_configs:
      - targets: ['localhost:8080', 'localhost:8081', 'localhost:8082', 'localhost:8083', 'localhost:8084', 'localhost:8085']
    metrics_path: '/metrics'

配置说明

  • scrape_interval设置为15秒,表示Prometheus每15秒拉取一次指标;
  • job_name为任务名称,可自定义;
  • targets指定需要拉取指标的服务地址列表,即我们之前运行的各个aioprometheus示例服务;
  • metrics_path指定指标接口路径,默认为/metrics

5.2 启动Prometheus并查看指标

启动Prometheus服务器:

./prometheus --config.file=prometheus.yml

访问http://localhost:9090进入Prometheus Web界面,在查询框中输入指标名(如http_requests_totaltask_queue_length等),即可查看指标的实时数据和变化趋势。

5.3 可视化监控面板(Grafana集成)

为了更直观地展示指标数据,可将Prometheus作为数据源集成到Grafana中:

  1. 安装并启动Grafana(下载地址:https://grafana.com/grafana/download);
  2. 访问http://localhost:3000,使用默认账号密码(admin/admin)登录;
  3. 进入Configuration > Data Sources,添加Prometheus数据源,设置URL为http://localhost:9090
  4. 进入Dashboards > Import,导入官方或自定义的Dashboard模板,即可实现指标的可视化监控。

六、相关资源链接

  • Pypi地址:https://pypi.org/project/aioprometheus
  • Github地址:https://github.com/claws/aioprometheus
  • 官方文档地址:https://aioprometheus.readthedocs.io/en/latest/

关注我,每天分享一个实用的Python自动化工具。