一、influxdb库核心介绍
1.1 用途
influxdb是Python生态中专门用于对接InfluxDB时序数据库的客户端库,能够实现数据的写入、查询、修改和删除等操作,广泛应用于物联网监控、系统性能指标采集、金融行情数据存储等需要处理时序数据的场景。

1.2 工作原理
该库通过HTTP/HTTPS协议与InfluxDB服务端建立连接,将Python中的数据结构(如字典、列表)转换为InfluxDB支持的Line Protocol格式进行写入;查询时则发送InfluxQL或Flux查询语句,再将返回的结果集解析为Python可处理的对象(如DataFrame)。
1.3 优缺点
优点:轻量易用,API设计贴合Python开发者习惯;支持与Pandas无缝集成,便于数据处理;兼容InfluxDB多个版本。
缺点:对最新版InfluxDB的部分高级特性支持滞后;高并发写入场景下性能需依赖额外优化。
1.4 License类型
influxdb库采用MIT License开源协议,允许自由使用、修改和分发,无论是个人项目还是商业应用都无需支付授权费用。
二、influxdb库安装与环境准备
2.1 安装方式
安装influxdb库的方式非常简单,直接使用Python的包管理工具pip即可完成安装。在命令行中输入以下命令:
pip install influxdb
如果需要安装指定版本以适配特定的InfluxDB服务端,可指定版本号,例如安装5.3.1版本:
pip install influxdb==5.3.1
安装完成后,我们可以在Python环境中验证是否安装成功,执行以下代码:
import influxdb
print(influxdb.__version__)
若控制台输出对应的版本号,说明安装成功。
2.2 环境依赖说明
使用influxdb库前,需要确保本地或远程有可用的InfluxDB服务端。推荐使用InfluxDB 1.x系列版本(2.x版本有单独的客户端库influxdb-client),同时Python环境需满足3.6及以上版本,避免因版本过低导致兼容性问题。
三、influxdb库核心API使用实战
3.1 建立与InfluxDB的连接
在进行任何数据操作前,首先需要建立与InfluxDB数据库的连接。influxdb库提供InfluxDBClient类来实现这一功能,核心参数包括host(服务端地址)、port(端口号)、username(用户名)、password(密码)、database(目标数据库名)等。
示例代码:
from influxdb import InfluxDBClient
# 初始化客户端连接
client = InfluxDBClient(
host='localhost', # InfluxDB服务端IP
port=8086, # 默认端口号
username='admin', # 数据库用户名
password='admin123', # 数据库密码
database='test_db' # 要连接的数据库名
)
# 测试连接是否成功
print("连接状态:", "成功" if client.ping() else "失败")
代码说明:
- 导入
InfluxDBClient类后,传入服务端的连接信息初始化客户端对象。 - 调用
ping()方法测试与服务端的连通性,该方法返回True表示连接成功。 - 若连接失败,通常需要检查服务端是否启动、地址和端口是否正确、用户名密码是否匹配。
3.2 创建与删除数据库
连接成功后,我们可以通过客户端对象创建新的数据库,或者删除已有的数据库。
3.2.1 创建数据库
示例代码:
# 定义要创建的数据库名
db_name = "iot_monitor"
# 检查数据库是否存在
existing_dbs = client.get_list_database()
db_exists = any(db['name'] == db_name for db in existing_dbs)
if not db_exists:
client.create_database(db_name)
print(f"数据库 {db_name} 创建成功")
else:
print(f"数据库 {db_name} 已存在")
# 切换到新创建的数据库
client.switch_database(db_name)
代码说明:
get_list_database()方法会返回所有已存在的数据库列表,格式为[{"name": "db_name1"}, ...]。- 通过遍历列表判断目标数据库是否存在,避免重复创建。
create_database()方法用于创建新数据库,switch_database()方法用于切换到目标数据库进行后续操作。
3.2.2 删除数据库
示例代码:
# 定义要删除的数据库名
db_to_delete = "test_db"
# 检查数据库是否存在
existing_dbs = client.get_list_database()
db_exists = any(db['name'] == db_to_delete for db in existing_dbs)
if db_exists:
client.drop_database(db_to_delete)
print(f"数据库 {db_to_delete} 删除成功")
else:
print(f"数据库 {db_to_delete} 不存在")
代码说明:drop_database()方法接收数据库名作为参数,执行后会删除对应的数据库及其所有数据,操作前需谨慎确认。
3.3 写入时序数据到InfluxDB
时序数据的写入是influxdb库的核心功能之一,数据需要按照Line Protocol的格式组织,该格式的核心结构为:measurement,tag_set field_set timestamp。在Python中,我们可以通过字典列表的形式定义数据,再调用write_points()方法写入。
3.3.1 写入单条数据
示例代码:
# 定义要写入的数据
data_point = [
{
"measurement": "temperature", # 测量值名称,类似表名
"tags": {
"device_id": "sensor_001", # 标签,用于分组查询
"location": "room_101"
},
"fields": {
"value": 25.6, # 字段,存储具体数值(必须是数值类型)
"unit": "°C" # 字段可以是字符串、整数、浮点数
},
"time": "2024-01-01T12:00:00Z" # 时间戳,可选,默认使用当前时间
}
]
# 写入数据
write_result = client.write_points(data_point)
print(f"数据写入状态: {'成功' if write_result else '失败'}")
代码说明:
measurement对应时序数据的“表”,用于分类存储不同类型的数据。tags是标签字段,为字符串类型,支持索引,适合用于分组查询(如按设备ID、位置分组)。fields是数值字段,支持整数、浮点数、布尔值和字符串,是时序数据的核心指标。time是时间戳,格式为ISO 8601,可选参数,若不指定则使用服务端的当前时间。write_points()方法返回布尔值,表示写入操作是否成功。
3.3.2 写入多条数据
在实际场景中,我们通常需要批量写入多条数据,以提高写入效率。只需扩展字典列表的元素即可实现批量写入。
示例代码:
# 定义多条时序数据
batch_data = [
{
"measurement": "temperature",
"tags": {"device_id": "sensor_001", "location": "room_101"},
"fields": {"value": 25.8, "unit": "°C"},
"time": "2024-01-01T12:01:00Z"
},
{
"measurement": "temperature",
"tags": {"device_id": "sensor_001", "location": "room_101"},
"fields": {"value": 26.0, "unit": "°C"},
"time": "2024-01-01T12:02:00Z"
},
{
"measurement": "humidity",
"tags": {"device_id": "sensor_001", "location": "room_101"},
"fields": {"value": 45.2, "unit": "%"},
"time": "2024-01-01T12:00:00Z"
}
]
# 批量写入数据
write_result = client.write_points(batch_data)
print(f"批量数据写入状态: {'成功' if write_result else '失败'}")
代码说明:批量写入多条数据时,不同数据可以属于同一个measurement,也可以属于不同的measurement,write_points()方法会自动处理这些数据的分类存储。
3.4 查询InfluxDB中的时序数据
数据写入后,我们可以通过query()方法执行InfluxQL查询语句,获取存储的时序数据。查询结果可以直接转换为Pandas DataFrame,便于后续的数据分析和可视化。
3.4.1 基础查询
示例代码:
# 执行InfluxQL查询语句
query_str = 'SELECT * FROM temperature WHERE device_id = \'sensor_001\' LIMIT 10'
result = client.query(query_str)
# 将查询结果转换为列表
result_list = list(result.get_points())
print("查询到的温度数据:")
for point in result_list:
print(f"时间: {point['time']}, 设备ID: {point['device_id']}, 温度: {point['value']} {point['unit']}")
代码说明:
- InfluxQL的语法与SQL类似,
SELECT * FROM measurement表示查询该measurement下的所有数据,WHERE子句用于过滤标签或字段,LIMIT用于限制返回数据的条数。 query()方法返回的是ResultSet对象,通过get_points()方法可以获取数据的迭代器,再转换为列表方便遍历。- 每条数据以字典形式存储,包含
time、tags字段和fields字段的所有内容。
3.4.2 与Pandas集成查询
influxdb库支持直接将查询结果转换为Pandas DataFrame,这对于数据分析师来说是非常实用的功能。
示例代码:
import pandas as pd
# 执行查询并转换为DataFrame
query_str = 'SELECT value, unit FROM temperature WHERE location = \'room_101\''
result = client.query(query_str)
df = pd.DataFrame(result.get_points())
# 打印DataFrame的前5行数据
print("温度数据DataFrame:")
print(df.head())
# 计算温度的平均值
avg_temp = df['value'].mean()
print(f"\nroom_101的平均温度: {avg_temp:.2f} °C")
代码说明:
- 导入
pandas库后,将ResultSet对象转换为DataFrame,数据的结构会更加清晰,便于进行统计分析。 - 可以直接使用Pandas的内置方法(如
mean())计算数据的统计指标,快速完成数据分析任务。
3.5 删除InfluxDB中的数据
当需要清理过期或无用的数据时,可以使用delete_series()方法删除指定条件的数据。
示例代码:
# 删除指定时间之前的温度数据
delete_condition = {
"measurement": "temperature",
"tags": {"device_id": "sensor_001"},
"time__lt": "2024-01-01T12:01:00Z" # 删除时间小于该时间戳的数据
}
client.delete_series(**delete_condition)
print("符合条件的数据已删除")
代码说明:
delete_series()方法支持通过measurement、tags和时间条件过滤要删除的数据。- 时间条件的参数格式为
time__lt(小于)、time__lte(小于等于)、time__gt(大于)、time__gte(大于等于),后缀对应不同的比较逻辑。 - 数据删除操作不可逆,执行前务必确认条件是否正确。
四、实际应用案例:物联网传感器数据监控系统
4.1 案例场景介绍
本案例模拟一个物联网传感器数据监控系统,实现以下功能:
- 模拟传感器采集温度和湿度数据。
- 将采集到的数据实时写入InfluxDB。
- 定时查询并分析传感器数据,当温度超过阈值时输出告警信息。
4.2 完整代码实现
from influxdb import InfluxDBClient
import pandas as pd
import time
import random
from datetime import datetime, timezone
# 配置InfluxDB连接信息
INFLUXDB_HOST = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_USER = 'admin'
INFLUXDB_PWD = 'admin123'
INFLUXDB_DB = 'iot_monitor'
# 传感器配置
DEVICE_ID = 'sensor_002'
LOCATION = 'factory_workshop'
TEMPERATURE_THRESHOLD = 30.0 # 温度告警阈值
COLLECTION_INTERVAL = 5 # 数据采集间隔(秒)
QUERY_INTERVAL = 10 # 数据查询间隔(秒)
def init_influxdb_client():
"""初始化InfluxDB客户端并创建数据库"""
client = InfluxDBClient(
host=INFLUXDB_HOST,
port=INFLUXDB_PORT,
username=INFLUXDB_USER,
password=INFLUXDB_PWD
)
# 创建数据库
db_list = client.get_list_database()
if not any(db['name'] == INFLUXDB_DB for db in db_list):
client.create_database(INFLUXDB_DB)
client.switch_database(INFLUXDB_DB)
return client
def simulate_sensor_data():
"""模拟传感器采集温度和湿度数据"""
# 生成模拟数据,温度在25-32之间波动,湿度在40-60之间波动
temperature = round(random.uniform(25.0, 32.0), 2)
humidity = round(random.uniform(40.0, 60.0), 2)
# 获取当前UTC时间戳
current_time = datetime.now(timezone.utc).isoformat()
# 组织时序数据
data = [
{
"measurement": "temperature",
"tags": {"device_id": DEVICE_ID, "location": LOCATION},
"fields": {"value": temperature, "unit": "°C"},
"time": current_time
},
{
"measurement": "humidity",
"tags": {"device_id": DEVICE_ID, "location": LOCATION},
"fields": {"value": humidity, "unit": "%"},
"time": current_time
}
]
return data
def write_sensor_data(client, data):
"""将传感器数据写入InfluxDB"""
try:
client.write_points(data)
print(f"[{datetime.now()}] 数据写入成功: 温度={data[0]['fields']['value']}°C, 湿度={data[1]['fields']['value']}%")
except Exception as e:
print(f"[{datetime.now()}] 数据写入失败: {str(e)}")
def query_and_alert(client):
"""查询数据并判断是否触发告警"""
try:
# 查询最近10条温度数据
query_str = f"""
SELECT value FROM temperature
WHERE device_id = '{DEVICE_ID}'
AND location = '{LOCATION}'
ORDER BY time DESC
LIMIT 10
"""
result = client.query(query_str)
df = pd.DataFrame(result.get_points())
if not df.empty:
latest_temp = df['value'].iloc[0]
avg_temp = df['value'].mean()
print(f"[{datetime.now()}] 最新温度: {latest_temp:.2f}°C, 近10条平均温度: {avg_temp:.2f}°C")
# 温度超过阈值时触发告警
if latest_temp > TEMPERATURE_THRESHOLD:
print(f"⚠️ 告警: 当前温度 {latest_temp:.2f}°C 超过阈值 {TEMPERATURE_THRESHOLD}°C!")
except Exception as e:
print(f"[{datetime.now()}] 数据查询失败: {str(e)}")
if __name__ == "__main__":
# 初始化客户端
influx_client = init_influxdb_client()
print("物联网传感器数据监控系统启动...")
try:
query_counter = 0
while True:
# 采集并写入数据
sensor_data = simulate_sensor_data()
write_sensor_data(influx_client, sensor_data)
# 每隔QUERY_INTERVAL秒查询一次数据并告警
query_counter += 1
if query_counter >= QUERY_INTERVAL / COLLECTION_INTERVAL:
query_and_alert(influx_client)
query_counter = 0
# 等待采集间隔
time.sleep(COLLECTION_INTERVAL)
except KeyboardInterrupt:
print("\n系统已停止运行")
finally:
# 关闭客户端连接
influx_client.close()
4.3 代码运行说明
- 代码功能分解:
init_influxdb_client()函数:初始化客户端并创建目标数据库,确保后续操作有可用的存储位置。simulate_sensor_data()函数:模拟传感器生成温度和湿度数据,时间戳使用当前UTC时间,保证时序数据的准确性。write_sensor_data()函数:将模拟数据写入InfluxDB,并输出写入状态。query_and_alert()函数:查询最近的温度数据,计算平均值并判断是否超过阈值,超过时输出告警信息。- 主程序循环:按照设定的间隔采集数据、写入数据,并定时执行查询和告警逻辑。
- 运行效果:
运行代码后,控制台会实时输出数据写入状态,每隔10秒查询一次数据并输出温度信息,当温度超过30.0°C时,会触发告警提示。按下Ctrl+C可停止程序运行。
五、相关资源
- Pypi地址:https://pypi.org/project/influxdb
- Github地址:https://github.com/influxdata/influxdb-python
- 官方文档地址:https://influxdb-python.readthedocs.io/en/latest/
关注我,每天分享一个实用的Python自动化工具。

