一、aioprometheus库核心概述
1.1 用途与工作原理
aioprometheus是一款基于Python asyncio框架开发的异步指标采集与上报库,专门用于为异步Python应用程序提供Prometheus监控指标的定义、收集和暴露功能。其工作原理遵循Prometheus的监控规范,通过定义Counter(计数器)、Gauge(仪表盘)、Summary(摘要)、Histogram(直方图)四种核心指标类型,在异步代码中埋点采集数据,再通过HTTP服务暴露指标接口,供Prometheus服务器定期拉取数据,最终实现对异步应用的性能监控与状态分析。

1.2 优缺点分析
优点:
- 完全异步化设计,与aiohttp等异步框架完美兼容,不会阻塞事件循环,适合高并发异步应用场景;
- 原生支持Prometheus的四种核心指标类型,满足绝大多数监控需求;
- 提供灵活的指标标签(label)机制,支持多维度指标分析;
- 轻量级架构,安装简单,运行时资源消耗低。
缺点:
- 文档相对精简,对于新手而言部分高级用法需要阅读源码理解;
- 生态相较于同步的prometheus_client库稍窄,第三方集成插件较少;
- 仅支持异步Python环境,无法直接用于同步应用程序。
1.3 License类型
aioprometheus采用MIT License开源协议,这意味着开发者可以自由地将其用于个人、商业项目,允许修改、分发源码,只需保留原始版权声明即可。
二、aioprometheus安装与环境准备
2.1 安装命令
aioprometheus库已发布至PyPI,可通过pip包管理工具直接安装,建议使用Python 3.7及以上版本(兼容asyncio的全部特性),安装命令如下:
pip install aioprometheus
# 如需使用aiohttp集成功能,可安装扩展依赖
pip install aioprometheus[aiohttp]2.2 环境验证
安装完成后,可通过以下Python代码验证环境是否配置成功,该代码会创建一个简单的Counter指标并打印,确认库能够正常导入和使用:
import asyncio
from aioprometheus import Counter, Registry
async def verify_environment():
# 创建指标注册表
registry = Registry()
# 定义Counter指标,用于统计请求次数
http_requests_total = Counter(
"http_requests_total",
"Total number of HTTP requests",
{"method": "GET", "endpoint": "/api"}
)
# 将指标注册到注册表
registry.register(http_requests_total)
# 增加指标计数
http_requests_total.inc()
# 打印指标信息
print("指标信息:", http_requests_total.samples)
if __name__ == "__main__":
asyncio.run(verify_environment())运行上述代码,若控制台输出类似指标信息: [Sample(name='http_requests_total', labels={'method': 'GET', 'endpoint': '/api'}, value=1.0)]的内容,则说明aioprometheus已成功安装并可正常使用。
三、aioprometheus核心指标类型与使用方法
Prometheus的监控体系基于四种核心指标类型,aioprometheus对这四种类型进行了完整的异步封装,下面我们逐一讲解每种指标的定义、使用场景和代码示例。
3.1 Counter(计数器)
适用场景:用于统计只增不减的数值,例如请求次数、错误发生次数、任务完成数量等。Counter的核心操作是inc(),用于将指标值增加指定数值(默认增加1)。
代码示例:统计异步Web服务的GET请求次数
import asyncio
from aioprometheus import Counter, Registry, render
from aiohttp import web
# 1. 创建全局注册表
registry = Registry()
# 2. 定义Counter指标
# 参数说明:指标名、指标帮助信息、默认标签
http_requests_total = Counter(
"http_requests_total",
"Total count of HTTP requests by method and endpoint",
labelnames=["method", "endpoint"]
)
# 3. 将指标注册到注册表
registry.register(http_requests_total)
# 4. 定义异步请求处理函数
async def handle_api_request(request):
# 获取请求方法和路径
method = request.method
endpoint = request.path
# 增加Counter计数,传入标签值
http_requests_total.inc({"method": method, "endpoint": endpoint})
return web.json_response({"status": "success", "data": "Hello, aioprometheus!"})
# 5. 定义指标暴露接口,供Prometheus拉取
async def metrics_handler(request):
# 生成Prometheus格式的指标数据
content, http_headers = render(registry, request.headers.get('accept'))
return web.Response(body=content, headers=http_headers)
# 6. 创建aiohttp应用并配置路由
async def create_app():
app = web.Application()
# 业务接口
app.add_routes([web.get("/api/hello", handle_api_request)])
# 指标暴露接口
app.add_routes([web.get("/metrics", metrics_handler)])
return app
if __name__ == "__main__":
app = asyncio.run(create_app())
web.run_app(app, host="0.0.0.0", port=8080)代码说明:
- 首先创建
Registry对象,用于管理所有指标,这是aioprometheus的核心管理组件; - 定义
Counter指标时,通过labelnames参数指定标签维度,支持后续按请求方法和接口路径统计; - 在请求处理函数
handle_api_request中,调用inc()方法增加计数,并传入当前请求的标签值; - 配置
/metrics接口,通过render函数将注册表中的指标数据转换为Prometheus可识别的格式; - 运行程序后,访问
http://localhost:8080/api/hello触发请求计数,再访问http://localhost:8080/metrics即可查看指标数据。
3.2 Gauge(仪表盘)
适用场景:用于统计可增可减的数值,例如当前内存使用量、活跃连接数、队列长度等。Gauge支持inc()(增加)、dec()(减少)、set()(直接设置值)、set_to_current_time()(设置为当前时间戳)等操作。
代码示例:监控异步任务队列的长度
import asyncio
import random
from aioprometheus import Gauge, Registry, render
from aiohttp import web
# 创建注册表和Gauge指标
registry = Registry()
task_queue_length = Gauge(
"task_queue_length",
"Current number of tasks in the async queue",
labelnames=["queue_name"]
)
registry.register(task_queue_length)
# 模拟异步任务队列
task_queue = asyncio.Queue()
# 生产任务:向队列中添加随机数量的任务
async def task_producer():
while True:
# 随机生成1-5个任务
task_count = random.randint(1, 5)
for _ in range(task_count):
await task_queue.put(f"task_{asyncio.get_event_loop().time()}")
# 更新Gauge指标:设置为当前队列长度
queue_len = task_queue.qsize()
task_queue_length.set({"queue_name": "user_task_queue"}, queue_len)
print(f"Added {task_count} tasks, current queue length: {queue_len}")
await asyncio.sleep(3)
# 消费任务:从队列中取出任务并处理
async def task_consumer():
while True:
if not task_queue.empty():
task = await task_queue.get()
# 模拟任务处理耗时
await asyncio.sleep(1)
print(f"Processed task: {task}")
task_queue.task_done()
# 更新Gauge指标:处理完任务后更新队列长度
queue_len = task_queue.qsize()
task_queue_length.set({"queue_name": "user_task_queue"}, queue_len)
else:
await asyncio.sleep(1)
# 指标暴露接口
async def metrics_handler(request):
content, http_headers = render(registry, request.headers.get('accept'))
return web.Response(body=content, headers=http_headers)
# 创建aiohttp应用
async def create_app():
app = web.Application()
app.add_routes([web.get("/metrics", metrics_handler)])
# 启动生产者和消费者协程
asyncio.create_task(task_producer())
asyncio.create_task(task_consumer())
return app
if __name__ == "__main__":
app = asyncio.run(create_app())
web.run_app(app, host="0.0.0.0", port=8081)代码说明:
- Gauge指标
task_queue_length用于监控任务队列的实时长度,通过labelnames区分不同队列; task_producer协程负责生产任务,每次添加任务后调用set()方法更新指标值为当前队列长度;task_consumer协程负责消费任务,处理完任务后同样更新指标值;- 运行程序后,访问
http://localhost:8081/metrics可查看队列长度的实时变化,该指标会随着任务的生产和消费动态增减。
3.3 Summary(摘要)
适用场景:用于统计数值的分布情况,例如请求响应时间的平均值、中位数、95分位数等。Summary通过observe()方法记录数值,自动计算并存储指定分位数的统计结果。
代码示例:统计异步接口的响应时间
import asyncio
import time
from aioprometheus import Summary, Registry, render
from aiohttp import web
# 创建注册表和Summary指标
registry = Registry()
request_duration_seconds = Summary(
"request_duration_seconds",
"Summary of request processing duration in seconds",
labelnames=["method", "endpoint"],
# 指定需要统计的分位数和误差范围
quantiles={0.5: 0.05, 0.95: 0.01, 0.99: 0.001}
)
registry.register(request_duration_seconds)
# 定义响应时间统计中间件
async def timing_middleware(app, handler):
async def middleware_handler(request):
# 记录请求开始时间
start_time = time.perf_counter()
try:
# 执行原始请求处理函数
response = await handler(request)
return response
finally:
# 计算请求耗时
duration = time.perf_counter() - start_time
# 记录耗时到Summary指标
request_duration_seconds.observe(
{"method": request.method, "endpoint": request.path},
duration
)
return middleware_handler
# 模拟耗时的业务接口
async def slow_api_handler(request):
# 模拟接口处理耗时:0.1-0.5秒
delay = asyncio.sleep(random.uniform(0.1, 0.5))
await delay
return web.json_response({"status": "success", "message": "Slow API response"})
# 普通业务接口
async def fast_api_handler(request):
return web.json_response({"status": "success", "message": "Fast API response"})
# 指标暴露接口
async def metrics_handler(request):
content, http_headers = render(registry, request.headers.get('accept'))
return web.Response(body=content, headers=http_headers)
# 创建aiohttp应用
async def create_app():
app = web.Application(middlewares=[timing_middleware])
app.add_routes([
web.get("/api/slow", slow_api_handler),
web.get("/api/fast", fast_api_handler),
web.get("/metrics", metrics_handler)
])
return app
if __name__ == "__main__":
app = asyncio.run(create_app())
web.run_app(app, host="0.0.0.0", port=8082)代码说明:
- 定义Summary指标时,通过
quantiles参数指定需要统计的分位数,例如0.5代表中位数,0.95代表95分位数,后面的数值为允许的误差范围; - 实现
timing_middleware中间件,在请求处理前后记录时间,计算耗时并通过observe()方法传入Summary指标; - 分别定义慢接口和快接口,模拟不同的响应耗时;
- 运行程序后,多次访问
/api/slow和/api/fast接口,再访问/metrics即可查看响应时间的统计数据,包括平均值(_sum和_count计算得出)、各分位数的数值。
3.4 Histogram(直方图)
适用场景:与Summary类似,用于统计数值的分布情况,但Histogram会将数值划分到不同的区间(bucket)中,统计每个区间的数值数量,适合用于绘制分布直方图。Histogram通过observe()方法记录数值,自动统计各区间的计数。
代码示例:统计异步任务的执行耗时分布
import asyncio
import random
import time
from aioprometheus import Histogram, Registry, render
from aiohttp import web
# 创建注册表和Histogram指标
registry = Registry()
task_execution_duration_seconds = Histogram(
"task_execution_duration_seconds",
"Histogram of task execution duration in seconds",
labelnames=["task_type"],
# 定义bucket区间:0.1, 0.2, 0.5, 1.0, +inf
buckets=[0.1, 0.2, 0.5, 1.0]
)
registry.register(task_execution_duration_seconds)
# 模拟不同类型的异步任务
async def execute_task(task_type):
# 根据任务类型设置不同的耗时范围
if task_type == "light":
duration = random.uniform(0.05, 0.15)
elif task_type == "medium":
duration = random.uniform(0.15, 0.3)
else: # heavy
duration = random.uniform(0.3, 0.8)
# 模拟任务执行
await asyncio.sleep(duration)
# 记录任务耗时到Histogram指标
task_execution_duration_seconds.observe({"task_type": task_type}, duration)
return f"{task_type}_task_completed"
# 任务调度接口:接收任务类型参数并执行任务
async def task_scheduler_handler(request):
task_type = request.query.get("task_type", "light")
if task_type not in ["light", "medium", "heavy"]:
return web.json_response({"error": "Invalid task type"}, status=400)
result = await execute_task(task_type)
return web.json_response({"status": "success", "result": result})
# 指标暴露接口
async def metrics_handler(request):
content, http_headers = render(registry, request.headers.get('accept'))
return web.Response(body=content, headers=http_headers)
# 创建aiohttp应用
async def create_app():
app = web.Application()
app.add_routes([
web.get("/api/task", task_scheduler_handler),
web.get("/metrics", metrics_handler)
])
return app
if __name__ == "__main__":
app = asyncio.run(create_app())
web.run_app(app, host="0.0.0.0", port=8083)代码说明:
- 定义Histogram指标时,通过
buckets参数指定区间边界,指标会自动统计落入每个区间的数值数量,最后一个区间默认为+inf; execute_task函数模拟不同类型任务的执行耗时,根据任务类型设置不同的耗时范围,并通过observe()方法记录耗时;- 访问
/api/task?task_type=light等接口触发任务执行,多次调用后访问/metrics,可查看各任务类型的耗时分布情况,包括每个bucket的计数、总和(_sum)和总次数(_count)。
四、aioprometheus高级用法:自定义指标标签与多注册表管理
4.1 动态标签与标签值替换
在实际应用中,指标标签的值往往需要动态获取,例如根据用户ID、请求IP等信息区分指标维度。aioprometheus支持在运行时动态传入标签值,下面以用户登录次数统计为例展示动态标签的用法:
import asyncio
from aioprometheus import Counter, Registry, render
from aiohttp import web
registry = Registry()
user_login_total = Counter(
"user_login_total",
"Total number of user logins by user type and platform",
labelnames=["user_type", "platform"]
)
registry.register(user_login_total)
# 模拟用户登录接口,接收用户类型和平台参数
async def login_handler(request):
data = await request.json()
user_type = data.get("user_type", "guest") # 可选值:guest, member, admin
platform = data.get("platform", "web") # 可选值:web, ios, android
# 动态传入标签值
user_login_total.inc({"user_type": user_type, "platform": platform})
return web.json_response({"status": "success", "message": "Login successful"})
async def metrics_handler(request):
content, http_headers = render(registry, request.headers.get('accept'))
return web.Response(body=content, headers=http_headers)
async def create_app():
app = web.Application()
app.add_routes([
web.post("/api/login", login_handler),
web.get("/metrics", metrics_handler)
])
return app
if __name__ == "__main__":
app = asyncio.run(create_app())
web.run_app(app, host="0.0.0.0", port=8084)代码说明:
- 指标
user_login_total定义了user_type和platform两个标签,用于区分不同类型用户和不同登录平台的登录次数; - 在
login_handler接口中,从请求参数中动态获取user_type和platform的值,并传入inc()方法; - 通过Postman等工具发送POST请求到
/api/login,请求体携带{"user_type": "member", "platform": "ios"}等参数,即可按标签维度统计登录次数。
4.2 多注册表管理
在大型应用中,不同模块可能需要独立的指标管理,此时可以使用多注册表机制,将不同模块的指标注册到不同的Registry对象中,再统一暴露或分别暴露指标接口。
import asyncio
from aioprometheus import Counter, Registry, render
from aiohttp import web
# 为用户模块创建注册表
user_registry = Registry()
user_register_total = Counter(
"user_register_total",
"Total number of user registrations",
labelnames=["channel"]
)
user_registry.register(user_register_total)
# 为订单模块创建注册表
order_registry = Registry()
order_create_total = Counter(
"order_create_total",
"Total number of order creations",
labelnames=["order_type"]
)
order_registry.register(order_create_total)
# 用户注册接口
async def user_register_handler(request):
data = await request.json()
channel = data.get("channel", "direct")
user_register_total.inc({"channel": channel})
return web.json_response({"status": "success", "message": "User registered"})
# 订单创建接口
async def order_create_handler(request):
data = await request.json()
order_type = data.get("order_type", "normal")
order_create_total.inc({"order_type": order_type})
return web.json_response({"status": "success", "message": "Order created"})
# 暴露用户模块指标
async def user_metrics_handler(request):
content, http_headers = render(user_registry, request.headers.get('accept'))
return web.Response(body=content, headers=http_headers)
# 暴露订单模块指标
async def order_metrics_handler(request):
content, http_headers = render(order_registry, request.headers.get('accept'))
return web.Response(body=content, headers=http_headers)
# 暴露所有模块指标(合并注册表)
async def all_metrics_handler(request):
# 合并多个注册表的指标
combined_registry = Registry()
for metric in user_registry.collect():
combined_registry.register(metric)
for metric in order_registry.collect():
combined_registry.register(metric)
content, http_headers = render(combined_registry, request.headers.get('accept'))
return web.Response(body=content, headers=http_headers)
async def create_app():
app = web.Application()
app.add_routes([
web.post("/api/user/register", user_register_handler),
web.post("/api/order/create", order_create_handler),
web.get("/metrics/user", user_metrics_handler),
web.get("/metrics/order", order_metrics_handler),
web.get("/metrics/all", all_metrics_handler)
])
return app
if __name__ == "__main__":
app = asyncio.run(create_app())
web.run_app(app, host="0.0.0.0", port=8085)代码说明:
- 分别为用户模块和订单模块创建独立的
Registry对象,注册各自的指标; - 提供
/metrics/user和/metrics/order接口,分别暴露对应模块的指标; - 实现
/metrics/all接口,通过遍历两个注册表的指标并注册到新的Registry对象中,实现指标合并暴露,满足Prometheus一次性拉取所有指标的需求。
五、aioprometheus与Prometheus服务器集成实战
5.1 Prometheus服务器配置
要实现对aioprometheus暴露指标的监控,需要配置Prometheus服务器定期拉取指标数据。首先下载并安装Prometheus(下载地址:https://prometheus.io/download/),然后修改prometheus.yml配置文件:
global:
scrape_interval: 15s # 每15秒拉取一次指标
scrape_configs:
- job_name: 'aioprometheus_demo'
static_configs:
- targets: ['localhost:8080', 'localhost:8081', 'localhost:8082', 'localhost:8083', 'localhost:8084', 'localhost:8085']
metrics_path: '/metrics'配置说明:
scrape_interval设置为15秒,表示Prometheus每15秒拉取一次指标;job_name为任务名称,可自定义;targets指定需要拉取指标的服务地址列表,即我们之前运行的各个aioprometheus示例服务;metrics_path指定指标接口路径,默认为/metrics。
5.2 启动Prometheus并查看指标
启动Prometheus服务器:
./prometheus --config.file=prometheus.yml访问http://localhost:9090进入Prometheus Web界面,在查询框中输入指标名(如http_requests_total、task_queue_length等),即可查看指标的实时数据和变化趋势。
5.3 可视化监控面板(Grafana集成)
为了更直观地展示指标数据,可将Prometheus作为数据源集成到Grafana中:
- 安装并启动Grafana(下载地址:https://grafana.com/grafana/download);
- 访问
http://localhost:3000,使用默认账号密码(admin/admin)登录; - 进入
Configuration > Data Sources,添加Prometheus数据源,设置URL为http://localhost:9090; - 进入
Dashboards > Import,导入官方或自定义的Dashboard模板,即可实现指标的可视化监控。
六、相关资源链接
- Pypi地址:https://pypi.org/project/aioprometheus
- Github地址:https://github.com/claws/aioprometheus
- 官方文档地址:https://aioprometheus.readthedocs.io/en/latest/
关注我,每天分享一个实用的Python自动化工具。

