一、Cassandra Driver库核心概述
Python Cassandra Driver是官方提供的用于连接和操作Apache Cassandra数据库的客户端库,其核心用途是帮助开发者在Python程序中实现与Cassandra集群的通信,执行数据的增删改查、集群管理等操作。工作原理上,该库基于Cassandra的原生协议,通过会话(Session)机制建立连接,利用一致性哈希算法定位数据所在节点,支持异步和同步两种操作模式。

该库的优点是兼容性强,支持最新的Cassandra版本,提供完善的连接池管理、负载均衡和故障重试机制;缺点是对于大规模数据批量操作,性能调优需要一定的专业知识,且学习曲线相对陡峭。其License类型为Apache License 2.0,开源且可商用。
二、Cassandra Driver安装方法
在使用Cassandra Driver之前,需要确保本地环境已经安装了Python(推荐3.7及以上版本),同时目标Cassandra集群已经正常启动并可访问。安装该库的方式非常简单,直接使用Python的包管理工具pip即可完成安装,具体命令如下:
pip install cassandra-driver
安装完成后,可以在Python环境中通过以下代码验证是否安装成功:
# 验证Cassandra Driver是否安装成功
try:
from cassandra.cluster import Cluster
print("Cassandra Driver安装成功!")
except ImportError as e:
print(f"安装失败,错误信息:{e}")
运行上述代码后,如果控制台输出“Cassandra Driver安装成功!”,则说明库已经正确安装到当前Python环境中。
三、Cassandra Driver核心使用方法与实例代码
3.1 建立与Cassandra集群的连接
要操作Cassandra数据库,第一步是建立与集群的连接。Cassandra Driver提供了Cluster类来实现集群连接的管理,Cluster类需要传入集群中节点的IP地址列表,默认端口为9042。连接成功后会返回一个Cluster实例,通过该实例的connect()方法可以创建一个会话(Session),会话是执行所有数据库操作的核心对象。
实例代码1:基础集群连接
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
# 1. 配置认证信息(如果集群开启了认证)
auth_provider = PlainTextAuthProvider(
username='your_username',
password='your_password'
)
# 2. 建立集群连接
# 传入节点IP列表,这里以本地单节点为例
cluster = Cluster(
contact_points=['127.0.0.1'],
port=9042,
auth_provider=auth_provider # 无认证时可省略此参数
)
# 3. 创建会话
session = cluster.connect()
print("成功连接到Cassandra集群!")
代码说明:
- 当Cassandra集群开启了用户名密码认证时,需要使用
PlainTextAuthProvider类配置认证信息;如果集群未开启认证,可以直接省略auth_provider参数。 contact_points参数传入的是集群中部分节点的IP地址,Driver会自动发现集群中的其他节点。- 会话创建成功后,就可以基于该会话执行KeySpace(键空间,类似数据库)和表的相关操作。
3.2 KeySpace的创建与切换
KeySpace是Cassandra中用于隔离数据的逻辑容器,相当于关系型数据库中的“数据库”概念。在进行数据操作之前,通常需要先创建KeySpace,或者切换到已存在的KeySpace。
实例代码2:创建KeySpace并切换
# 1. 定义创建KeySpace的CQL语句
# SimpleStrategy为简单副本策略,replication_factor为副本数量(单节点集群设为1)
create_keyspace_cql = """
CREATE KEYSPACE IF NOT EXISTS my_keyspace
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
"""
# 2. 执行创建KeySpace的语句
session.execute(create_keyspace_cql)
print("KeySpace创建成功!")
# 3. 切换到创建好的KeySpace
session.set_keyspace('my_keyspace')
print("已切换到my_keyspace键空间!")
代码说明:
- CQL(Cassandra Query Language)是操作Cassandra的查询语言,语法与SQL类似但有差异。
IF NOT EXISTS关键字用于避免重复创建KeySpace时出现错误。replication参数用于配置副本策略,SimpleStrategy适用于单数据中心的集群,replication_factor表示每个数据块的副本数量,生产环境中通常根据集群规模设置为3或更高。
3.3 数据表的创建与管理
在KeySpace下可以创建多个数据表,Cassandra是面向列族的数据库,表的结构需要提前定义。下面以创建一个存储用户信息的表为例,演示如何使用Cassandra Driver创建数据表。
实例代码3:创建用户信息表
# 1. 定义创建用户表的CQL语句
create_table_cql = """
CREATE TABLE IF NOT EXISTS user_info (
user_id UUID PRIMARY KEY,
username TEXT,
age INT,
email TEXT,
register_time TIMESTAMP
)
"""
# 2. 执行创建表的语句
session.execute(create_table_cql)
print("user_info表创建成功!")
代码说明:
UUID是Cassandra中常用的主键类型,用于生成全局唯一的标识符;TEXT对应字符串类型,INT对应整数类型,TIMESTAMP对应时间戳类型。PRIMARY KEY指定表的主键,Cassandra的主键分为分区键和聚类键,这里的user_id是分区键,用于数据的分片存储。
3.4 数据的增删改查操作
数据的增删改查是数据库操作的核心,Cassandra Driver支持通过执行CQL语句来实现这些操作,同时也提供了参数化查询的方式,避免SQL注入风险。
3.4.1 插入数据
插入数据使用INSERT语句,通过参数化查询可以灵活地传入不同的数据。
实例代码4:插入单条用户数据
import uuid
from datetime import datetime
# 1. 生成UUID类型的user_id
user_id = uuid.uuid4()
# 2. 定义插入数据的CQL语句
insert_cql = """
INSERT INTO user_info (user_id, username, age, email, register_time)
VALUES (%s, %s, %s, %s, %s)
"""
# 3. 准备插入的数据
user_data = (
user_id,
"zhangsan",
25,
"[email protected]",
datetime.now()
)
# 4. 执行插入操作
session.execute(insert_cql, user_data)
print(f"成功插入用户数据,用户ID:{user_id}")
代码说明:
uuid.uuid4()用于生成随机的UUID,确保user_id的唯一性。- 参数化查询中使用
%s作为占位符,传入的数据元组需要与占位符的数量和类型一一对应。 datetime.now()生成当前时间的时间戳,用于记录用户的注册时间。
实例代码5:批量插入多条用户数据
from cassandra.query import BatchStatement
# 1. 创建批量操作对象
batch = BatchStatement()
# 2. 定义插入数据的CQL语句
insert_cql = """
INSERT INTO user_info (user_id, username, age, email, register_time)
VALUES (%s, %s, %s, %s, %s)
"""
# 3. 准备多条用户数据
user_list = [
(uuid.uuid4(), "lisi", 28, "[email protected]", datetime.now()),
(uuid.uuid4(), "wangwu", 30, "[email protected]", datetime.now()),
(uuid.uuid4(), "zhaoliu", 22, "[email protected]", datetime.now())
]
# 4. 将多条插入操作添加到批量对象中
for data in user_list:
batch.add(insert_cql, data)
# 5. 执行批量插入操作
session.execute(batch)
print("成功批量插入3条用户数据!")
代码说明:
BatchStatement用于实现批量操作,可以有效减少网络往返次数,提升大批量数据插入的效率。- 批量操作中可以添加多个相同或不同的CQL语句,适用于需要一次性执行多条数据操作的场景。
3.4.2 查询数据
查询数据使用SELECT语句,Cassandra Driver支持查询单条数据、多条数据以及带条件的查询。
实例代码6:查询所有用户数据
# 1. 定义查询所有数据的CQL语句
select_all_cql = "SELECT * FROM user_info"
# 2. 执行查询操作,返回结果集
result_set = session.execute(select_all_cql)
# 3. 遍历结果集并打印数据
print("所有用户信息:")
for row in result_set:
print(f"用户ID:{row.user_id},用户名:{row.username},年龄:{row.age},邮箱:{row.email},注册时间:{row.register_time}")
代码说明:
session.execute()执行查询语句后,会返回一个ResultSet对象,该对象是可迭代的,可以通过循环遍历获取每一行数据。- 每一行数据可以通过列名直接访问,例如
row.username表示获取当前行的username列的值。
实例代码7:带条件查询指定用户数据
# 1. 定义带条件的查询语句
select_cql = "SELECT * FROM user_info WHERE username = %s"
# 2. 执行查询操作,传入查询参数
result_set = session.execute(select_cql, ("lisi",))
# 3. 处理查询结果
user = list(result_set)
if user:
print(f"查询到用户信息:用户ID:{user[0].user_id},用户名:{user[0].username},年龄:{user[0].age}")
else:
print("未查询到指定用户数据!")
代码说明:
- 带条件查询时,
WHERE子句中使用的列需要是主键的一部分或者创建了索引,否则会报错。 - 将
ResultSet对象转换为列表后,可以通过索引访问具体的行数据。
3.4.3 更新数据
更新数据使用UPDATE语句,可以修改表中已存在的数据。
实例代码8:更新用户年龄数据
# 1. 定义更新数据的CQL语句
update_cql = "UPDATE user_info SET age = %s WHERE username = %s"
# 2. 执行更新操作
session.execute(update_cql, (29, "lisi"))
print("成功更新用户lisi的年龄!")
# 3. 查询更新后的数据,验证更新结果
result_set = session.execute("SELECT * FROM user_info WHERE username = %s", ("lisi",))
user = list(result_set)[0]
print(f"更新后lisi的年龄为:{user.age}")
代码说明:
UPDATE语句的WHERE子句必须包含主键列,否则无法定位到具体的数据行。- 更新操作执行后,可以通过查询语句验证数据是否更新成功。
3.4.4 删除数据
删除数据使用DELETE语句,可以删除表中的指定数据行。
实例代码9:删除指定用户数据
# 1. 定义删除数据的CQL语句
delete_cql = "DELETE FROM user_info WHERE username = %s"
# 2. 执行删除操作
session.execute(delete_cql, ("zhaoliu",))
print("成功删除用户zhaoliu的数据!")
# 3. 查询删除后的数据,验证删除结果
result_set = session.execute("SELECT * FROM user_info WHERE username = %s", ("zhaoliu",))
if list(result_set):
print("删除失败,用户数据仍存在!")
else:
print("删除成功,用户数据已不存在!")
代码说明:
DELETE语句的WHERE子句同样需要包含主键列,确保只删除目标数据行。- 删除操作执行后,通过查询可以验证数据是否被成功删除。
3.5 连接关闭与资源释放
当所有数据库操作完成后,需要及时关闭会话和集群连接,释放占用的资源。
实例代码10:关闭连接
# 1. 关闭会话
session.shutdown()
# 2. 关闭集群连接
cluster.shutdown()
print("成功关闭与Cassandra集群的连接!")
代码说明:
- 会话和集群连接的关闭顺序没有强制要求,但建议先关闭会话,再关闭集群连接。
- 及时关闭连接可以避免资源泄露,尤其是在长时间运行的程序中,这一操作至关重要。
四、Cassandra Driver实战案例:用户信息管理系统
为了更好地展示Cassandra Driver的实际应用,下面我们构建一个简单的用户信息管理系统,该系统实现了用户信息的添加、查询、更新和删除功能。
4.1 系统功能需求
- 能够添加新用户的信息,包括用户名、年龄、邮箱和注册时间。
- 能够查询所有用户的信息,也能够根据用户名查询指定用户的信息。
- 能够根据用户名更新用户的年龄信息。
- 能够根据用户名删除指定用户的信息。
4.2 系统代码实现
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
import uuid
from datetime import datetime
class CassandraUserManager:
def __init__(self, contact_points, username=None, password=None, keyspace="my_keyspace"):
"""
初始化Cassandra连接和会话
:param contact_points: 集群节点IP列表
:param username: 认证用户名
:param password: 认证密码
:param keyspace: 要使用的键空间
"""
# 配置认证信息
if username and password:
auth_provider = PlainTextAuthProvider(username=username, password=password)
self.cluster = Cluster(contact_points=contact_points, auth_provider=auth_provider)
else:
self.cluster = Cluster(contact_points=contact_points)
# 创建会话并切换键空间
self.session = self.cluster.connect()
self.keyspace = keyspace
self._create_keyspace()
self._create_user_table()
def _create_keyspace(self):
"""创建键空间"""
create_keyspace_cql = f"""
CREATE KEYSPACE IF NOT EXISTS {self.keyspace}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
"""
self.session.execute(create_keyspace_cql)
self.session.set_keyspace(self.keyspace)
def _create_user_table(self):
"""创建用户信息表"""
create_table_cql = """
CREATE TABLE IF NOT EXISTS user_info (
user_id UUID PRIMARY KEY,
username TEXT,
age INT,
email TEXT,
register_time TIMESTAMP
)
"""
self.session.execute(create_table_cql)
def add_user(self, username, age, email):
"""
添加单个用户信息
:param username: 用户名
:param age: 年龄
:param email: 邮箱
:return: 用户ID
"""
user_id = uuid.uuid4()
insert_cql = """
INSERT INTO user_info (user_id, username, age, email, register_time)
VALUES (%s, %s, %s, %s, %s)
"""
self.session.execute(insert_cql, (user_id, username, age, email, datetime.now()))
return user_id
def batch_add_users(self, user_list):
"""
批量添加用户信息
:param user_list: 用户信息列表,每个元素为(用户名, 年龄, 邮箱)
"""
batch = BatchStatement()
insert_cql = """
INSERT INTO user_info (user_id, username, age, email, register_time)
VALUES (%s, %s, %s, %s, %s)
"""
for username, age, email in user_list:
batch.add(insert_cql, (uuid.uuid4(), username, age, email, datetime.now()))
self.session.execute(batch)
def query_all_users(self):
"""查询所有用户信息"""
select_cql = "SELECT * FROM user_info"
result_set = self.session.execute(select_cql)
return list(result_set)
def query_user_by_name(self, username):
"""
根据用户名查询用户信息
:param username: 用户名
:return: 用户信息列表
"""
select_cql = "SELECT * FROM user_info WHERE username = %s"
result_set = self.session.execute(select_cql, (username,))
return list(result_set)
def update_user_age(self, username, new_age):
"""
根据用户名更新用户年龄
:param username: 用户名
:param new_age: 新年龄
"""
update_cql = "UPDATE user_info SET age = %s WHERE username = %s"
self.session.execute(update_cql, (new_age, username))
def delete_user_by_name(self, username):
"""
根据用户名删除用户信息
:param username: 用户名
"""
delete_cql = "DELETE FROM user_info WHERE username = %s"
self.session.execute(delete_cql, (username,))
def close_connection(self):
"""关闭数据库连接"""
self.session.shutdown()
self.cluster.shutdown()
# 实例化用户管理类并测试功能
if __name__ == "__main__":
# 初始化用户管理器(本地单节点,无认证)
user_manager = CassandraUserManager(contact_points=["127.0.0.1"])
# 1. 添加单个用户
user_id = user_manager.add_user("test_user", 24, "[email protected]")
print(f"添加单个用户成功,用户ID:{user_id}")
# 2. 批量添加用户
batch_users = [
("batch_user1", 26, "[email protected]"),
("batch_user2", 27, "[email protected]")
]
user_manager.batch_add_users(batch_users)
print("批量添加用户成功!")
# 3. 查询所有用户
all_users = user_manager.query_all_users()
print("\n所有用户信息:")
for user in all_users:
print(f"ID: {user.user_id}, 用户名: {user.username}, 年龄: {user.age}, 邮箱: {user.email}")
# 4. 根据用户名查询用户
target_user = user_manager.query_user_by_name("test_user")
print(f"\n查询test_user的信息:")
if target_user:
print(f"ID: {target_user[0].user_id}, 用户名: {target_user[0].username}, 年龄: {target_user[0].age}")
# 5. 更新用户年龄
user_manager.update_user_age("test_user", 25)
updated_user = user_manager.query_user_by_name("test_user")
print(f"\n更新后test_user的年龄:{updated_user[0].age}")
# 6. 删除用户
user_manager.delete_user_by_name("test_user")
deleted_user = user_manager.query_user_by_name("test_user")
print(f"\n删除test_user后查询结果:{deleted_user}")
# 关闭连接
user_manager.close_connection()
4.3 代码说明
- 该案例通过面向对象的方式封装了用户信息管理的所有功能,
CassandraUserManager类的构造方法负责初始化集群连接和会话,并自动创建键空间和用户表。 - 类中的私有方法
_create_keyspace()和_create_user_table()分别用于创建键空间和用户表,确保在使用前相关的数据库结构已经存在。 - 公共方法
add_user()、batch_add_users()、query_all_users()等分别对应单个用户添加、批量用户添加、全量查询等功能,方便外部调用。 - 在
if __name__ == "__main__"代码块中,我们实例化了CassandraUserManager类,并依次测试了所有功能,验证了代码的正确性。
五、相关资源参考
- Pypi地址:https://pypi.org/project/cassandra-driver
- Github地址:https://github.com/datastax/python-driver
- 官方文档地址:https://docs.datastax.com/en/developer/python-driver/latest/
关注我,每天分享一个实用的Python自动化工具。

