Python实用工具:neo4j-driver快速上手与实战指南

一、neo4j-driver 核心介绍

neo4j-driver是Python连接Neo4j图数据库的官方驱动库,用于在Python代码中实现对Neo4j数据库的增删改查、事务管理等操作。其工作原理是基于Bolt协议与Neo4j服务器建立高效通信,支持同步和异步两种操作模式。该库优点是兼容性强、性能稳定、贴合官方API设计;缺点是异步模式对Python版本要求较高(需3.7+),且新手易在事务处理上出错。License类型为Apache License 2.0,可免费用于商业和开源项目,整体介绍控制在200字内。

二、neo4j-driver 安装与环境准备

2.1 安装方式

对于技术小白来说,安装neo4j-driver的过程非常简单,只需要使用Python的包管理工具pip即可完成。打开命令行终端,输入以下命令:

pip install neo4j-driver

这条命令会自动从PyPI下载并安装最新版本的neo4j-driver库,以及其依赖的相关组件。安装完成后,我们可以在Python环境中通过import neo4j来验证是否安装成功,如果没有报错,就说明安装完成。

2.2 环境前置要求

在使用neo4j-driver之前,我们需要确保本地或者远程已经部署了Neo4j数据库服务。Neo4j数据库的安装可以参考其官方文档,这里简单说明几个关键步骤:

  1. 下载对应系统版本的Neo4j安装包(社区版免费);
  2. 安装并启动Neo4j服务;
  3. 访问Neo4j的Web管理界面(默认地址:http://localhost:7474);
  4. 首次登录时修改默认用户名(neo4j)和密码(neo4j)。

后续Python代码连接数据库时,需要用到用户名、密码和数据库的Bolt协议连接地址(默认是bolt://localhost:7687)。

三、neo4j-driver 核心使用方法与代码实例

neo4j-driver的核心操作围绕“驱动对象-会话对象-Cypher语句执行”这一流程展开。Cypher是Neo4j的查询语言,用于操作图数据库中的节点和关系,我们在使用neo4j-driver时,主要是通过执行Cypher语句来实现数据库操作。

3.1 建立数据库连接

首先,我们需要创建一个驱动对象(Driver),驱动对象是连接Neo4j数据库的核心入口,通过它可以创建会话(Session)来执行具体操作。

from neo4j import GraphDatabase

# 定义Neo4j数据库的连接信息
URI = "bolt://localhost:7687"
USERNAME = "neo4j"
PASSWORD = "your_password"  # 替换为你自己的密码

# 创建驱动对象
driver = GraphDatabase.driver(URI, auth=(USERNAME, PASSWORD))

# 验证连接是否成功
def verify_connection():
    with driver.session() as session:
        result = session.run("RETURN 'Connection successful' AS message")
        return result.single()["message"]

if __name__ == "__main__":
    try:
        message = verify_connection()
        print(message)
    except Exception as e:
        print(f"Connection failed: {e}")
    finally:
        driver.close()  # 关闭驱动连接,释放资源

代码说明

  • GraphDatabase.driver()方法用于创建驱动对象,参数传入Bolt协议地址和认证信息;
  • 使用with driver.session()创建会话对象,with语句会自动管理会话的生命周期,无需手动关闭;
  • session.run()方法用于执行Cypher语句,这里执行的是一个简单的返回语句,验证连接是否正常;
  • 最后通过driver.close()关闭驱动,释放数据库连接资源,这一步在程序结束时是必须的,避免资源泄漏。

3.2 节点的创建与查询

图数据库的核心是节点(Node)和关系(Relationship),我们先从节点的创建和查询开始学习。

3.2.1 创建单个节点

from neo4j import GraphDatabase

class Neo4jNodeHandler:
    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        self.driver.close()

    def create_person_node(self, name, age):
        """创建一个Person类型的节点,包含name和age属性"""
        with self.driver.session() as session:
            # 执行Cypher创建节点语句,使用参数化查询避免注入风险
            result = session.run(
                "CREATE (p:Person {name: $name, age: $age}) RETURN p",
                name=name, age=age
            )
            # 获取创建的节点信息
            node = result.single()["p"]
            return f"Created node: {node}"

if __name__ == "__main__":
    handler = Neo4jNodeHandler("bolt://localhost:7687", "neo4j", "your_password")
    try:
        print(handler.create_person_node("Alice", 25))
    finally:
        handler.close()

代码说明

  • 我们定义了一个Neo4jNodeHandler类来封装数据库操作,提高代码的复用性;
  • create_person_node方法中,使用CREATE语句创建一个标签为Person的节点,节点包含nameage两个属性;
  • 采用参数化查询的方式($name$age),而不是直接拼接字符串,这样可以有效避免Cypher注入攻击,保证代码安全;
  • result.single()用于获取查询结果的第一条记录,因为CREATE语句只会返回一个创建的节点。

3.2.2 查询节点

from neo4j import GraphDatabase

class Neo4jNodeHandler:
    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        self.driver.close()

    def get_person_by_name(self, name):
        """根据姓名查询Person节点"""
        with self.driver.session() as session:
            result = session.run(
                "MATCH (p:Person {name: $name}) RETURN p.name AS name, p.age AS age",
                name=name
            )
            # 遍历查询结果
            persons = []
            for record in result:
                persons.append({"name": record["name"], "age": record["age"]})
            return persons

if __name__ == "__main__":
    handler = Neo4jNodeHandler("bolt://localhost:7687", "neo4j", "your_password")
    try:
        persons = handler.get_person_by_name("Alice")
        for person in persons:
            print(f"Found person: {person['name']}, Age: {person['age']}")
    finally:
        handler.close()

代码说明

  • 使用MATCH语句匹配标签为Personname属性为指定值的节点;
  • RETURN语句指定返回节点的nameage属性,并为其设置别名,方便后续获取;
  • 通过遍历result对象,可以获取所有匹配的节点记录,适合处理多条结果的场景。

3.3 关系的创建与查询

图数据库的优势在于处理节点之间的关系,接下来我们学习如何创建和查询节点之间的关系。

3.3.1 创建节点间的关系

from neo4j import GraphDatabase

class Neo4jRelationshipHandler:
    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        self.driver.close()

    def create_friend_relationship(self, name1, name2):
        """创建两个Person节点之间的FRIENDS关系"""
        with self.driver.session() as session:
            result = session.run(
                """
                MATCH (a:Person {name: $name1}), (b:Person {name: $name2})
                MERGE (a)-[r:FRIENDS]->(b)
                RETURN a.name AS from, b.name AS to, type(r) AS relationship
                """,
                name1=name1, name2=name2
            )
            record = result.single()
            return f"Created relationship: {record['from']} -[{record['relationship']}]-> {record['to']}"

if __name__ == "__main__":
    handler = Neo4jRelationshipHandler("bolt://localhost:7687", "neo4j", "your_password")
    try:
        # 先创建两个节点
        handler.driver.session().run("CREATE (p1:Person {name: 'Alice', age:25}), (p2:Person {name: 'Bob', age:28})")
        # 创建关系
        print(handler.create_friend_relationship("Alice", "Bob"))
    finally:
        handler.close()

代码说明

  • 使用MATCH语句匹配两个已存在的Person节点;
  • MERGE语句用于创建关系,如果该关系已经存在,则不会重复创建,避免数据冗余;
  • 关系的标签为FRIENDS,方向是从Alice指向Bob,表示AliceBob是朋友关系。

3.3.2 查询节点间的关系

from neo4j import GraphDatabase

class Neo4jRelationshipHandler:
    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        self.driver.close()

    def get_friends(self, name):
        """查询指定人物的所有朋友"""
        with self.driver.session() as session:
            result = session.run(
                """
                MATCH (a:Person {name: $name})-[r:FRIENDS]->(b:Person)
                RETURN b.name AS friend_name, b.age AS friend_age
                """,
                name=name
            )
            friends = []
            for record in result:
                friends.append({"name": record["friend_name"], "age": record["friend_age"]})
            return friends

if __name__ == "__main__":
    handler = Neo4jRelationshipHandler("bolt://localhost:7687", "neo4j", "your_password")
    try:
        friends = handler.get_friends("Alice")
        print(f"Alice's friends:")
        for friend in friends:
            print(f"- {friend['name']}, Age: {friend['age']}")
    finally:
        handler.close()

代码说明

  • MATCH语句匹配指定节点(Alice)通过FRIENDS关系连接的其他Person节点;
  • 遍历结果可以得到Alice的所有朋友信息,体现了图数据库在关联查询上的便捷性。

3.4 事务管理

在数据库操作中,事务是保证数据一致性的重要机制。neo4j-driver支持显式事务和隐式事务,隐式事务通过session.run()自动管理,而显式事务则需要手动控制提交和回滚。

from neo4j import GraphDatabase, TransactionError

class Neo4jTransactionHandler:
    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        self.driver.close()

    def batch_create_persons(self, persons):
        """批量创建Person节点,使用显式事务保证原子性"""
        with self.driver.session() as session:
            # 开启显式事务
            tx = session.begin_transaction()
            try:
                for person in persons:
                    tx.run(
                        "CREATE (p:Person {name: $name, age: $age}) RETURN p",
                        name=person["name"], age=person["age"]
                    )
                # 提交事务
                tx.commit()
                return f"Successfully created {len(persons)} persons"
            except TransactionError as e:
                # 回滚事务
                tx.rollback()
                return f"Transaction failed: {e}"

if __name__ == "__main__":
    handler = Neo4jTransactionHandler("bolt://localhost:7687", "neo4j", "your_password")
    try:
        persons = [
            {"name": "Charlie", "age": 30},
            {"name": "David", "age": 32},
            {"name": "Eve", "age": 27}
        ]
        print(handler.batch_create_persons(persons))
    finally:
        handler.close()

代码说明

  • 使用session.begin_transaction()开启显式事务;
  • 在事务中执行多条创建节点的操作,所有操作要么全部成功提交,要么全部失败回滚;
  • 通过try-except捕获TransactionError异常,在异常发生时执行tx.rollback(),保证数据一致性;
  • 这种方式适合批量操作或者需要多个步骤协同完成的数据库任务。

3.5 异步操作模式

neo4j-driver从4.0版本开始支持异步操作,异步模式基于Python的asyncio库,可以提高程序的并发性能,适合高并发场景下的数据库操作。

import asyncio
from neo4j import AsyncGraphDatabase

class AsyncNeo4jHandler:
    def __init__(self, uri, username, password):
        self.driver = AsyncGraphDatabase.driver(uri, auth=(username, password))

    async def close(self):
        await self.driver.close()

    async def get_person_async(self, name):
        """异步查询Person节点"""
        async with self.driver.session() as session:
            result = await session.run(
                "MATCH (p:Person {name: $name}) RETURN p.name AS name, p.age AS age",
                name=name
            )
            persons = []
            async for record in result:
                persons.append({"name": record["name"], "age": record["age"]})
            return persons

async def main():
    handler = AsyncNeo4jHandler("bolt://localhost:7687", "neo4j", "your_password")
    try:
        persons = await handler.get_person_async("Alice")
        for person in persons:
            print(f"Async found person: {person['name']}, Age: {person['age']}")
    finally:
        await handler.close()

if __name__ == "__main__":
    asyncio.run(main())

代码说明

  • 异步驱动使用AsyncGraphDatabase.driver()创建,与同步驱动的API类似,但方法都需要使用await关键字;
  • async with语句用于创建异步会话,async for用于遍历异步查询结果;
  • 异步操作需要在asyncio的事件循环中执行,通过asyncio.run()启动主函数;
  • 异步模式适合需要同时处理大量数据库请求的场景,能够有效提升程序的响应速度。

四、实际案例:构建一个简单的社交关系图谱

为了更好地理解neo4j-driver的实际应用,我们构建一个简单的社交关系图谱案例。这个案例实现以下功能:

  1. 批量创建用户节点;
  2. 为用户节点添加朋友关系;
  3. 查询指定用户的所有朋友及其朋友的朋友(二度关系)。

4.1 完整案例代码

from neo4j import GraphDatabase, TransactionError

class SocialGraphManager:
    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        self.driver.close()

    def batch_create_users(self, users):
        """批量创建用户节点"""
        with self.driver.session() as session:
            tx = session.begin_transaction()
            try:
                for user in users:
                    tx.run(
                        "CREATE (u:User {id: $id, name: $name, gender: $gender}) RETURN u",
                        id=user["id"], name=user["name"], gender=user["gender"]
                    )
                tx.commit()
                return f"Batch created {len(users)} users successfully"
            except TransactionError as e:
                tx.rollback()
                return f"Batch create failed: {str(e)}"

    def add_friend_relation(self, user_id1, user_id2):
        """添加两个用户之间的朋友关系"""
        with self.driver.session() as session:
            result = session.run(
                """
                MATCH (a:User {id: $id1}), (b:User {id: $id2})
                MERGE (a)-[r:FRIEND]->(b)
                MERGE (b)-[r2:FRIEND]->(a)
                RETURN a.name AS name1, b.name AS name2
                """,
                id1=user_id1, id2=user_id2
            )
            record = result.single()
            if record:
                return f"{record['name1']} and {record['name2']} are now friends"
            else:
                return "User not found"

    def get_second_degree_friends(self, user_id):
        """查询指定用户的二度朋友(朋友的朋友)"""
        with self.driver.session() as session:
            result = session.run(
                """
                MATCH (me:User {id: $id})-[r1:FRIEND]->(friend:User)-[r2:FRIEND]->(second_friend:User)
                WHERE NOT (me)-[:FRIEND]->(second_friend) AND me <> second_friend
                RETURN DISTINCT second_friend.name AS name, second_friend.gender AS gender
                """,
                id=user_id
            )
            second_friends = []
            for record in result:
                second_friends.append({
                    "name": record["name"],
                    "gender": record["gender"]
                })
            return second_friends

if __name__ == "__main__":
    # 初始化管理器
    graph_manager = SocialGraphManager("bolt://localhost:7687", "neo4j", "your_password")

    # 1. 批量创建用户
    users = [
        {"id": 1, "name": "Alice", "gender": "female"},
        {"id": 2, "name": "Bob", "gender": "male"},
        {"id": 3, "name": "Charlie", "gender": "male"},
        {"id": 4, "name": "David", "gender": "male"},
        {"id": 5, "name": "Eve", "gender": "female"}
    ]
    print(graph_manager.batch_create_users(users))

    # 2. 添加朋友关系
    print(graph_manager.add_friend_relation(1, 2))
    print(graph_manager.add_friend_relation(2, 3))
    print(graph_manager.add_friend_relation(3, 4))
    print(graph_manager.add_friend_relation(4, 5))

    # 3. 查询Alice的二度朋友
    second_friends = graph_manager.get_second_degree_friends(1)
    print("\nAlice's second-degree friends:")
    for friend in second_friends:
        print(f"- {friend['name']} ({friend['gender']})")

    # 关闭连接
    graph_manager.close()

4.2 代码说明

  • SocialGraphManager类封装了社交图谱的所有操作,包括批量创建用户、添加朋友关系和查询二度朋友;
  • batch_create_users方法使用显式事务保证批量创建的原子性,避免部分用户创建成功而部分失败的情况;
  • add_friend_relation方法创建双向的FRIEND关系,因为朋友关系是相互的;
  • get_second_degree_friends方法通过MATCH语句匹配用户的朋友的朋友,使用WHERE子句排除直接朋友和用户自己,DISTINCT关键字用于去重,避免重复的二度朋友记录。

4.3 运行结果

执行上述代码后,控制台会输出以下内容:

Batch created 5 users successfully
Alice and Bob are now friends
Bob and Charlie are now friends
Charlie and David are now friends
David and Eve are now friends

Alice's second-degree friends:
- Charlie (male)

这个结果符合预期,Alice的直接朋友是Bob,Bob的朋友是Charlie,因此Alice的二度朋友是Charlie。

五、相关资源地址

  • Pypi地址:https://pypi.org/project/neo4j-driver
  • Github地址:https://github.com/neo4j/neo4j-python-driver
  • 官方文档地址:https://neo4j.com/docs/python-manual/current/

关注我,每天分享一个实用的Python自动化工具。