Python实用工具:ODMantic入门到精通——异步MongoDB数据建模与操作指南

一、ODMantic库核心概述

ODMantic是一款专为Python异步生态设计的对象文档映射器(ODM),主要用于简化MongoDB数据库的操作流程。其核心工作原理是将Python类与MongoDB集合进行映射,类的实例对应集合中的文档,开发者无需编写原生MongoDB查询语句,通过操作Python对象即可完成数据的增删改查。

该库的优点十分突出:完全支持异步操作,可无缝对接asyncioFastAPI等异步框架;API设计简洁直观,与Python开发者熟悉的SQLAlchemy等ORM工具风格相近,学习成本低;内置数据校验功能,基于pydantic实现字段类型和约束的校验,保障数据一致性。缺点则是生态相较于老牌ODM工具mongoengine更小众,部分高级查询功能的支持度有待提升;仅适用于MongoDB,通用性较弱。

ODMantic的开源协议为MIT License,这意味着开发者可以自由地用于商业和非商业项目,修改和分发源码也不受过多限制。

二、ODMantic环境安装与配置

2.1 安装前提条件

在安装ODMantic之前,需要确保本地环境满足以下要求:

  1. Python版本≥3.7(推荐3.9及以上版本,兼容性更好);
  2. 已安装并运行MongoDB数据库(本地或远程实例均可,推荐版本≥4.0);
  3. 网络环境正常,能够通过pip下载相关依赖包。

2.2 安装命令

ODMantic的安装非常简单,直接使用pip工具在命令行中执行以下命令即可:

pip install odmantic

该命令会自动安装ODMantic及其核心依赖,包括pydantic(数据校验)、motor(异步MongoDB驱动)等,无需手动单独安装。

2.3 基础配置:连接MongoDB

使用ODMantic的第一步是建立与MongoDB数据库的异步连接。我们需要借助odmantic提供的AIOEngine类,该类是与数据库交互的核心入口,负责管理连接和执行操作。

以下是基础的连接代码示例:

from odmantic import AIOEngine
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

# 定义异步连接函数
async def connect_to_mongodb():
    # 1. 创建MongoDB异步客户端
    # 本地数据库地址:mongodb://localhost:27017/
    # 若为远程数据库,替换为对应的连接字符串,例如包含用户名和密码的地址:
    # mongodb://username:password@remote_host:27017/
    client = AsyncIOMotorClient("mongodb://localhost:27017/")
    # 2. 初始化AIOEngine,指定要操作的数据库名称
    # 这里指定数据库名为"odmantic_demo",若不存在会自动创建
    engine = AIOEngine(client=client, database="odmantic_demo")
    print("成功连接到MongoDB数据库!")
    return engine

# 运行异步函数
if __name__ == "__main__":
    engine = asyncio.run(connect_to_mongodb())

代码说明

  • AsyncIOMotorClientmotor库提供的异步MongoDB客户端,负责与数据库建立TCP连接;
  • AIOEngine是ODMantic的核心引擎,接收客户端实例和数据库名称作为参数,后续所有的数据操作都需要通过该引擎对象完成;
  • asyncio.run()用于运行异步函数,在实际的异步项目(如FastAPI)中,可直接通过await调用connect_to_mongodb()

三、ODMantic核心功能与代码示例

3.1 定义数据模型(Model)

ODMantic的数据模型基于Python类实现,继承自odmantic.Model,类中的字段对应MongoDB文档的键。字段类型通过pydantic的类型注解指定,同时支持设置默认值、必填约束等属性。

3.1.1 基础模型定义

我们以一个“用户(User)”模型为例,演示如何定义基础的数据模型:

from odmantic import Model
from typing import Optional
from datetime import datetime

class User(Model):
    # 字段1:用户名,字符串类型,必填
    username: str
    # 字段2:年龄,整数类型,必填
    age: int
    # 字段3:邮箱,可选字符串类型,默认值为None
    email: Optional[str] = None
    # 字段4:注册时间,datetime类型,默认值为当前时间
    register_time: datetime = datetime.now()

    # 可选配置:指定对应的MongoDB集合名称
    # 若不指定,默认集合名为类名的小写复数形式(此处为"users")
    class Config:
        collection = "user_collection"

代码说明

  • 模型类必须继承odmantic.Model,这是ODMantic识别模型的标志;
  • 字段的类型注解支持Python原生类型(strintdatetime等)和typing模块中的类型(Optional表示可选字段);
  • 通过Config类可以自定义模型的配置,例如collection参数指定模型对应的MongoDB集合名称,若不指定,ODMantic会自动将类名转为小写复数形式作为集合名;
  • 字段可以设置默认值,如register_time默认值为当前时间,email默认值为None

3.1.2 模型字段的常用约束

基于pydantic的特性,ODMantic支持为字段添加各种约束条件,例如字符串长度、数值范围等,确保存入数据库的数据符合预期。以下是带约束的模型示例:

from odmantic import Model, Field
from typing import Optional
from datetime import datetime

class UserWithConstraints(Model):
    # 用户名:字符串类型,长度在3-20之间,必填
    username: str = Field(min_length=3, max_length=20)
    # 年龄:整数类型,范围在0-120之间,必填
    age: int = Field(ge=0, le=120)
    # 邮箱:可选字符串类型,必须符合邮箱格式
    email: Optional[str] = Field(None, regex=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
    # 注册时间:默认当前时间
    register_time: datetime = datetime.now()

    class Config:
        collection = "constraint_users"

代码说明

  • Field类用于为字段添加额外约束,参数min_lengthmax_length限制字符串长度,ge(大于等于)和le(小于等于)限制数值范围;
  • regex参数用于指定字符串的正则表达式验证规则,这里用于校验邮箱格式的合法性;
  • 当创建模型实例时,如果字段值不符合约束条件,会直接抛出ValidationError异常,避免非法数据存入数据库。

3.2 数据的增删改查(CRUD)操作

CRUD是数据库操作的核心,ODMantic通过简洁的API实现异步的增删改查功能,所有操作都需要通过之前初始化的AIOEngine对象完成。

在进行后续操作前,我们先统一初始化引擎,并定义一个通用的异步运行函数,方便执行异步代码:

from odmantic import AIOEngine, Model, Field
from typing import Optional, List
from datetime import datetime
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

# 初始化引擎
async def get_engine():
    client = AsyncIOMotorClient("mongodb://localhost:27017/")
    return AIOEngine(client=client, database="odmantic_demo")

# 定义User模型
class User(Model):
    username: str = Field(min_length=3, max_length=20)
    age: int = Field(ge=0, le=120)
    email: Optional[str] = None
    register_time: datetime = datetime.now()

    class Config:
        collection = "users"

3.2.1 新增数据(Create)

新增数据即向MongoDB集合中插入文档,使用engine.save()方法,该方法接收一个模型实例作为参数,异步将数据插入数据库,并返回插入后的实例(包含自动生成的id字段)。

代码示例

# 新增单个用户
async def create_single_user(engine: AIOEngine):
    # 创建User模型实例
    user = User(username="zhangsan", age=25, email="[email protected]")
    # 保存到数据库
    saved_user = await engine.save(user)
    print("新增用户成功:")
    print(f"用户ID: {saved_user.id}")
    print(f"用户名: {saved_user.username}")
    print(f"年龄: {saved_user.age}")
    print(f"邮箱: {saved_user.email}")
    return saved_user

# 新增多个用户
async def create_multiple_users(engine: AIOEngine):
    # 创建多个用户实例
    user1 = User(username="lisi", age=30)
    user2 = User(username="wangwu", age=22, email="[email protected]")
    # 批量保存
    saved_users = await engine.save_all([user1, user2])
    print("\n批量新增用户成功,共新增{}个用户:".format(len(saved_users)))
    for u in saved_users:
        print(f"ID: {u.id}, 用户名: {u.username}")

# 执行新增操作
async def run_create_demo():
    engine = await get_engine()
    await create_single_user(engine)
    await create_multiple_users(engine)

if __name__ == "__main__":
    asyncio.run(run_create_demo())

代码说明

  • engine.save()用于插入单个文档,插入后会自动为实例添加id属性(对应MongoDB文档的_id字段,类型为ObjectId);
  • engine.save_all()用于批量插入多个文档,接收一个模型实例列表作为参数,返回插入后的实例列表;
  • 插入操作是异步的,必须使用await关键字调用。

3.2.2 查询数据(Read)

ODMantic提供了多种查询方式,包括查询单个文档、查询多个文档、条件查询、排序、分页等,满足不同的查询需求。

(1)查询单个文档

使用engine.find_one()方法,可根据条件查询单个文档,若未找到则返回None

代码示例

# 根据用户名查询单个用户
async def find_user_by_username(engine: AIOEngine, username: str):
    # 构造查询条件:User.username == username
    user = await engine.find_one(User, User.username == username)
    if user:
        print(f"\n查询到用户:")
        print(f"ID: {user.id}, 用户名: {user.username}, 年龄: {user.age}, 邮箱: {user.email}")
    else:
        print(f"\n未找到用户名为{username}的用户")
    return user

# 执行查询操作
async def run_find_one_demo():
    engine = await get_engine()
    await find_user_by_username(engine, "zhangsan")
    await find_user_by_username(engine, "zhaoliu")

if __name__ == "__main__":
    asyncio.run(run_find_one_demo())

代码说明

  • engine.find_one()的第一个参数是模型类,第二个参数是查询条件,条件的写法为模型类.字段名 == 目标值
  • 若存在多个符合条件的文档,该方法只会返回第一个匹配的文档。
(2)查询多个文档

使用engine.find()方法,可查询符合条件的所有文档,返回一个异步迭代器,可通过async for遍历,或通过list()转换为列表。

代码示例

# 查询所有用户
async def find_all_users(engine: AIOEngine):
    users = await engine.find(User)
    print("\n所有用户列表:")
    for user in users:
        print(f"ID: {user.id}, 用户名: {user.username}, 年龄: {user.age}")

# 条件查询:年龄大于23的用户
async def find_users_by_age(engine: AIOEngine, min_age: int):
    users = await engine.find(User, User.age > min_age)
    print(f"\n年龄大于{min_age}的用户列表:")
    for user in users:
        print(f"ID: {user.id}, 用户名: {user.username}, 年龄: {user.age}")

# 执行多文档查询
async def run_find_demo():
    engine = await get_engine()
    await find_all_users(engine)
    await find_users_by_age(engine, 23)

if __name__ == "__main__":
    asyncio.run(run_find_demo())

代码说明

  • engine.find()的参数与find_one()一致,第一个参数为模型类,第二个参数为可选的查询条件;
  • 支持的查询运算符包括:==(等于)、!=(不等于)、>(大于)、<(小于)、>=(大于等于)、<=(小于等于)、in_(包含在列表中)等,例如User.username.in_(["zhangsan", "lisi"])
(3)排序与分页

在查询时,可通过sort()方法对结果进行排序,通过skip()limit()方法实现分页功能。

代码示例

# 排序查询:按年龄降序排列
async def find_users_sorted(engine: AIOEngine):
    users = await engine.find(User).sort(User.age, -1)
    print("\n按年龄降序排列的用户列表:")
    for user in users:
        print(f"用户名: {user.username}, 年龄: {user.age}")

# 分页查询:每页2条,查询第2页
async def find_users_paginated(engine: AIOEngine, page: int, page_size: int):
    skip_count = (page - 1) * page_size
    users = await engine.find(User).skip(skip_count).limit(page_size)
    print(f"\n第{page}页用户列表(每页{page_size}条):")
    for user in users:
        print(f"用户名: {user.username}, 年龄: {user.age}")

# 执行排序和分页查询
async def run_sort_paginate_demo():
    engine = await get_engine()
    await find_users_sorted(engine)
    await find_users_paginated(engine, page=2, page_size=2)

if __name__ == "__main__":
    asyncio.run(run_sort_paginate_demo())

代码说明

  • sort()方法的第一个参数是排序字段,第二个参数为1(升序)或-1(降序);
  • skip(n)表示跳过前n条数据,limit(m)表示最多返回m条数据,两者结合即可实现分页。

3.2.3 更新数据(Update)

更新数据有两种方式:一种是先查询出模型实例,修改实例的字段值后调用engine.save()方法;另一种是使用engine.update()方法直接执行更新操作。

(1)基于实例的更新

代码示例

# 更新用户信息:修改邮箱和年龄
async def update_user_by_instance(engine: AIOEngine, username: str):
    # 1. 查询用户
    user = await engine.find_one(User, User.username == username)
    if not user:
        print(f"未找到用户{username},更新失败")
        return
    # 2. 修改字段值
    user.age = 26
    user.email = "[email protected]"
    # 3. 保存更新
    updated_user = await engine.save(user)
    print(f"\n用户{username}更新成功:")
    print(f"年龄: {updated_user.age}, 邮箱: {updated_user.email}")

# 执行更新操作
async def run_update_instance_demo():
    engine = await get_engine()
    await update_user_by_instance(engine, "zhangsan")

if __name__ == "__main__":
    asyncio.run(run_update_instance_demo())

代码说明

  • 基于实例的更新步骤为“查询-修改-保存”,适用于需要先获取当前数据再进行修改的场景;
  • engine.save()方法会自动识别实例是否已存在(通过id字段),若存在则执行更新操作,若不存在则执行插入操作。
(2)基于查询条件的批量更新

代码示例

from odmantic import UpdateQuery

# 批量更新:将年龄小于25的用户年龄加1
async def batch_update_users(engine: AIOEngine):
    # 1. 构造更新查询
    update_query = UpdateQuery({User.age: User.age + 1})
    # 2. 执行批量更新
    update_result = await engine.update(
        User,
        User.age < 25,
        update_query
    )
    print(f"\n批量更新成功,共更新{update_result.modified_count}条记录")

# 执行批量更新
async def run_batch_update_demo():
    engine = await get_engine()
    await batch_update_users(engine)

if __name__ == "__main__":
    asyncio.run(run_batch_update_demo())

代码说明

  • 批量更新需要使用UpdateQuery类构造更新内容,支持字段的自增、自减等操作;
  • engine.update()的参数依次为:模型类、查询条件、更新查询对象,返回的结果对象包含modified_count属性,表示实际更新的记录数。

3.2.4 删除数据(Delete)

删除数据同样有两种方式:删除单个实例和批量删除符合条件的文档。

(1)删除单个实例

代码示例

# 删除指定用户
async def delete_user_by_instance(engine: AIOEngine, username: str):
    # 1. 查询用户
    user = await engine.find_one(User, User.username == username)
    if not user:
        print(f"未找到用户{username},删除失败")
        return
    # 2. 删除用户
    await engine.delete(user)
    print(f"\n用户{username}删除成功")

# 执行删除操作
async def run_delete_instance_demo():
    engine = await get_engine()
    await delete_user_by_instance(engine, "lisi")

if __name__ == "__main__":
    asyncio.run(run_delete_instance_demo())

代码说明

  • engine.delete()方法接收一个模型实例作为参数,根据实例的id字段删除对应的文档。
(2)批量删除文档

代码示例

# 批量删除:删除邮箱为None的用户
async def batch_delete_users(engine: AIOEngine):
    delete_result = await engine.delete(User, User.email == None)
    print(f"\n批量删除成功,共删除{delete_result.deleted_count}条记录")

# 执行批量删除
async def run_batch_delete_demo():
    engine = await get_engine()
    await batch_delete_users(engine)

if __name__ == "__main__":
    asyncio.run(run_batch_delete_demo())

代码说明

  • engine.delete()方法若传入模型类和查询条件,则会批量删除符合条件的所有文档;
  • 返回的结果对象包含deleted_count属性,表示实际删除的记录数。

3.3 模型关联(一对一、一对多)

在实际应用中,数据之间往往存在关联关系,例如“用户”和“文章”的一对多关系(一个用户可以发布多篇文章)。ODMantic支持通过Reference字段实现模型之间的关联。

3.3.1 定义关联模型

我们以“用户(User)”和“文章(Article)”为例,演示一对多关联的实现:

from odmantic import Model, Field, Reference
from typing import Optional, List
from datetime import datetime

# 定义User模型
class User(Model):
    username: str = Field(min_length=3, max_length=20)
    age: int = Field(ge=0, le=120)

    class Config:
        collection = "users"

# 定义Article模型,与User模型关联
class Article(Model):
    title: str = Field(min_length=1, max_length=100)
    content: str
    publish_time: datetime = datetime.now()
    # 关联到User模型,表示文章的作者
    author: Reference[User]

    class Config:
        collection = "articles"

代码说明

  • Reference[User]表示author字段是一个指向User模型的引用,存储的是User实例的id
  • 这种定义方式实现了从ArticleUser的单向关联,若需要双向关联,可在User模型中添加articles: List[Article] = []字段,并结合反向查询实现。

3.3.2 创建关联数据

代码示例

# 创建用户并发布文章
async def create_user_and_articles(engine: AIOEngine):
    # 1. 创建用户
    user = User(username="zhangsan", age=25)
    saved_user = await engine.save(user)
    print(f"创建用户成功:{saved_user.username}")

    # 2. 创建两篇文章,关联到该用户
    article1 = Article(title="Python异步编程入门", content="异步编程是Python的重要特性...", author=saved_user)
    article2 = Article(title="ODMantic使用指南", content="ODMantic是一款优秀的异步ODM工具...", author=saved_user)
    saved_articles = await engine.save_all([article1, article2])
    print(f"创建文章成功,共发布{len(saved_articles)}篇文章")

# 执行关联数据创建
async def run_relation_create_demo():
    engine = await get_engine()
    await create_user_and_articles(engine)

if __name__ == "__main__":
    asyncio.run(run_relation_create_demo())

代码说明

  • 创建关联数据时,直接将User实例赋值给Articleauthor字段即可,ODMantic会自动处理引用关系,存储Userid

3.3.3 查询关联数据

查询关联数据有两种方式:从子模型查询父模型(通过author字段查询用户信息),以及从父模型查询子模型(通过用户查询其发布的所有文章)。

代码示例

# 从文章查询作者信息
async def find_article_author(engine: AIOEngine, article_title: str):
    article = await engine.find_one(Article, Article.title == article_title)
    if not article:
        print(f"未找到标题为{article_title}的文章")
        return
    # 直接访问article.author即可获取关联的用户实例
    author = article.author
    print(f"\n文章《{article.title}》的作者信息:")
    print(f"用户名: {author.username}, 年龄: {author.age}")

# 从用户查询发布的所有文章
async def find_user_articles(engine: AIOEngine, username: str):
    user = await engine.find_one(User, User.username == username)
    if not user:
        print(f"未找到用户{username}")
        return
    # 查询该用户发布的所有文章
    articles = await engine.find(Article, Article.author == user)
    print(f"\n用户{username}发布的文章列表:")
    for article in articles:
        print(f"标题: {article.title}, 发布时间: {article.publish_time}")

# 执行关联数据查询
async def run_relation_find_demo():
    engine = await get_engine()
    await find_article_author(engine, "Python异步编程入门")
    await find_user_articles(engine, "zhangsan")

if __name__ == "__main__":
    asyncio.run(run_relation_find_demo())

代码说明

  • 从子模型查询父模型时,直接访问关联字段即可(如article.author),ODMantic会自动根据存储的id查询对应的父模型实例;
  • 从父模型查询子模型时,构造查询条件为Article.author == user,即可获取该用户发布的所有文章。

四、ODMantic与FastAPI框架集成实战

FastAPI是一款高性能的异步Web框架,与ODMantic的异步特性完美契合。本节将演示如何在FastAPI项目中集成ODMantic,实现一个简单的用户管理API。

4.1 项目目录结构

odmantic_fastapi_demo/
├── main.py               # 项目入口文件,包含API路由
└── models.py             # 数据模型定义

4.2 定义数据模型(models.py)

from odmantic import Model, Field
from datetime import datetime
from typing import Optional

class User(Model):
    username: str = Field(min_length=3, max_length=20)
    age: int = Field(ge=0, le=120)
    email: Optional[str] = None
    register_time: datetime = datetime.now()

    class Config:
        collection = "users"

4.3 实现FastAPI API(main.py)

from fastapi import FastAPI, HTTPException
from odmantic import AIOEngine, ObjectId
from motor.motor_asyncio import AsyncIOMotorClient
from models import User
from typing import List
import asyncio

# 初始化FastAPI应用
app = FastAPI(title="ODMantic + FastAPI 示例", version="1.0")

# 全局引擎对象
engine: AIOEngine = None

# 启动时初始化数据库连接
@app.on_event("startup")
async def startup_db():
    global engine
    client = AsyncIOMotorClient("mongodb://localhost:27017/")
    engine = AIOEngine(client=client, database="odmantic_fastapi_demo")
    print("数据库连接成功!")

# 关闭时断开数据库连接
@app.on_event("shutdown")
async def shutdown_db():
    global engine
    if engine:
        engine.client.close()
        print("数据库连接已关闭!")

# API路由:创建用户
@app.post("/users/", response_model=User, summary="创建新用户")
async def create_user(user: User):
    saved_user = await engine.save(user)
    return saved_user

# API路由:获取单个用户
@app.get("/users/{user_id}", response_model=User, summary="根据ID获取用户")
async def get_user(user_id: str):
    try:
        # 将字符串ID转换为ObjectId
        obj_id = ObjectId(user_id)
    except:
        raise HTTPException(status_code=400, detail="无效的用户ID格式")
    user = await engine.find_one(User, User.id == obj_id)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return user

# API路由:获取所有用户
@app.get("/users/", response_model=List[User], summary="获取所有用户")
async def get_all_users():
    users = await engine.find(User)
    return users

# API路由:更新用户
@app.put("/users/{user_id}", response_model=User, summary="更新用户信息")
async def update_user(user_id: str, user_update: User):
    try:
        obj_id = ObjectId(user_id)
    except:
        raise HTTPException(status_code=400, detail="无效的用户ID格式")
    user = await engine.find_one(User, User.id == obj_id)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    # 更新字段
    user.username = user_update.username
    user.age = user_update.age
    user.email = user_update.email
    updated_user = await engine.save(user)
    return updated_user

# API路由:删除用户
@app.delete("/users/{user_id}", summary="删除用户")
async def delete_user(user_id: str):
    try:
        obj_id = ObjectId(user_id)
    except:
        raise HTTPException(status_code=400, detail="无效的用户ID格式")
    user = await engine.find_one(User, User.id == obj_id)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    await engine.delete(user)
    return {"message": "用户删除成功"}

# 运行项目
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

代码说明

  • 通过@app.on_event("startup")@app.on_event("shutdown")钩子函数,在FastAPI应用启动时初始化数据库连接,关闭时断开连接;
  • 所有API接口均为异步函数,通过engine对象完成数据操作;
  • 使用response_model指定接口的返回数据模型,FastAPI会自动进行数据校验和文档生成;
  • 运行项目后,可访问http://localhost:8000/docs查看自动生成的API文档,并进行接口测试。

4.4 启动和测试项目

  1. 启动项目:在命令行中执行python main.py,FastAPI应用会在8000端口启动;
  2. 测试接口:打开浏览器访问http://localhost:8000/docs,可以看到自动生成的Swagger文档,点击对应的接口即可进行测试,例如:
  • 点击/users/POST接口,填写用户信息后点击“Execute”,即可创建新用户;
  • 点击/users/GET接口,可获取所有用户的列表;
  • 点击/users/{user_id}GET接口,输入用户ID,可获取指定用户的信息。

五、相关资源链接

  • Pypi地址:https://pypi.org/project/ODMantic
  • Github地址:https://github.com/art049/odmantic
  • 官方文档地址:https://art049.github.io/odmantic/

关注我,每天分享一个实用的Python自动化工具。