一、ODMantic库核心概述
ODMantic是一款专为Python异步生态设计的对象文档映射器(ODM),主要用于简化MongoDB数据库的操作流程。其核心工作原理是将Python类与MongoDB集合进行映射,类的实例对应集合中的文档,开发者无需编写原生MongoDB查询语句,通过操作Python对象即可完成数据的增删改查。

该库的优点十分突出:完全支持异步操作,可无缝对接asyncio、FastAPI等异步框架;API设计简洁直观,与Python开发者熟悉的SQLAlchemy等ORM工具风格相近,学习成本低;内置数据校验功能,基于pydantic实现字段类型和约束的校验,保障数据一致性。缺点则是生态相较于老牌ODM工具mongoengine更小众,部分高级查询功能的支持度有待提升;仅适用于MongoDB,通用性较弱。
ODMantic的开源协议为MIT License,这意味着开发者可以自由地用于商业和非商业项目,修改和分发源码也不受过多限制。
二、ODMantic环境安装与配置
2.1 安装前提条件
在安装ODMantic之前,需要确保本地环境满足以下要求:
- Python版本≥3.7(推荐3.9及以上版本,兼容性更好);
- 已安装并运行MongoDB数据库(本地或远程实例均可,推荐版本≥4.0);
- 网络环境正常,能够通过
pip下载相关依赖包。
2.2 安装命令
ODMantic的安装非常简单,直接使用pip工具在命令行中执行以下命令即可:
pip install odmantic
该命令会自动安装ODMantic及其核心依赖,包括pydantic(数据校验)、motor(异步MongoDB驱动)等,无需手动单独安装。
2.3 基础配置:连接MongoDB
使用ODMantic的第一步是建立与MongoDB数据库的异步连接。我们需要借助odmantic提供的AIOEngine类,该类是与数据库交互的核心入口,负责管理连接和执行操作。
以下是基础的连接代码示例:
from odmantic import AIOEngine
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
# 定义异步连接函数
async def connect_to_mongodb():
# 1. 创建MongoDB异步客户端
# 本地数据库地址:mongodb://localhost:27017/
# 若为远程数据库,替换为对应的连接字符串,例如包含用户名和密码的地址:
# mongodb://username:password@remote_host:27017/
client = AsyncIOMotorClient("mongodb://localhost:27017/")
# 2. 初始化AIOEngine,指定要操作的数据库名称
# 这里指定数据库名为"odmantic_demo",若不存在会自动创建
engine = AIOEngine(client=client, database="odmantic_demo")
print("成功连接到MongoDB数据库!")
return engine
# 运行异步函数
if __name__ == "__main__":
engine = asyncio.run(connect_to_mongodb())
代码说明:
AsyncIOMotorClient是motor库提供的异步MongoDB客户端,负责与数据库建立TCP连接;AIOEngine是ODMantic的核心引擎,接收客户端实例和数据库名称作为参数,后续所有的数据操作都需要通过该引擎对象完成;asyncio.run()用于运行异步函数,在实际的异步项目(如FastAPI)中,可直接通过await调用connect_to_mongodb()。
三、ODMantic核心功能与代码示例
3.1 定义数据模型(Model)
ODMantic的数据模型基于Python类实现,继承自odmantic.Model,类中的字段对应MongoDB文档的键。字段类型通过pydantic的类型注解指定,同时支持设置默认值、必填约束等属性。
3.1.1 基础模型定义
我们以一个“用户(User)”模型为例,演示如何定义基础的数据模型:
from odmantic import Model
from typing import Optional
from datetime import datetime
class User(Model):
# 字段1:用户名,字符串类型,必填
username: str
# 字段2:年龄,整数类型,必填
age: int
# 字段3:邮箱,可选字符串类型,默认值为None
email: Optional[str] = None
# 字段4:注册时间,datetime类型,默认值为当前时间
register_time: datetime = datetime.now()
# 可选配置:指定对应的MongoDB集合名称
# 若不指定,默认集合名为类名的小写复数形式(此处为"users")
class Config:
collection = "user_collection"
代码说明:
- 模型类必须继承
odmantic.Model,这是ODMantic识别模型的标志; - 字段的类型注解支持Python原生类型(
str、int、datetime等)和typing模块中的类型(Optional表示可选字段); - 通过
Config类可以自定义模型的配置,例如collection参数指定模型对应的MongoDB集合名称,若不指定,ODMantic会自动将类名转为小写复数形式作为集合名; - 字段可以设置默认值,如
register_time默认值为当前时间,email默认值为None。
3.1.2 模型字段的常用约束
基于pydantic的特性,ODMantic支持为字段添加各种约束条件,例如字符串长度、数值范围等,确保存入数据库的数据符合预期。以下是带约束的模型示例:
from odmantic import Model, Field
from typing import Optional
from datetime import datetime
class UserWithConstraints(Model):
# 用户名:字符串类型,长度在3-20之间,必填
username: str = Field(min_length=3, max_length=20)
# 年龄:整数类型,范围在0-120之间,必填
age: int = Field(ge=0, le=120)
# 邮箱:可选字符串类型,必须符合邮箱格式
email: Optional[str] = Field(None, regex=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
# 注册时间:默认当前时间
register_time: datetime = datetime.now()
class Config:
collection = "constraint_users"
代码说明:
Field类用于为字段添加额外约束,参数min_length和max_length限制字符串长度,ge(大于等于)和le(小于等于)限制数值范围;regex参数用于指定字符串的正则表达式验证规则,这里用于校验邮箱格式的合法性;- 当创建模型实例时,如果字段值不符合约束条件,会直接抛出
ValidationError异常,避免非法数据存入数据库。
3.2 数据的增删改查(CRUD)操作
CRUD是数据库操作的核心,ODMantic通过简洁的API实现异步的增删改查功能,所有操作都需要通过之前初始化的AIOEngine对象完成。
在进行后续操作前,我们先统一初始化引擎,并定义一个通用的异步运行函数,方便执行异步代码:
from odmantic import AIOEngine, Model, Field
from typing import Optional, List
from datetime import datetime
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
# 初始化引擎
async def get_engine():
client = AsyncIOMotorClient("mongodb://localhost:27017/")
return AIOEngine(client=client, database="odmantic_demo")
# 定义User模型
class User(Model):
username: str = Field(min_length=3, max_length=20)
age: int = Field(ge=0, le=120)
email: Optional[str] = None
register_time: datetime = datetime.now()
class Config:
collection = "users"
3.2.1 新增数据(Create)
新增数据即向MongoDB集合中插入文档,使用engine.save()方法,该方法接收一个模型实例作为参数,异步将数据插入数据库,并返回插入后的实例(包含自动生成的id字段)。
代码示例:
# 新增单个用户
async def create_single_user(engine: AIOEngine):
# 创建User模型实例
user = User(username="zhangsan", age=25, email="[email protected]")
# 保存到数据库
saved_user = await engine.save(user)
print("新增用户成功:")
print(f"用户ID: {saved_user.id}")
print(f"用户名: {saved_user.username}")
print(f"年龄: {saved_user.age}")
print(f"邮箱: {saved_user.email}")
return saved_user
# 新增多个用户
async def create_multiple_users(engine: AIOEngine):
# 创建多个用户实例
user1 = User(username="lisi", age=30)
user2 = User(username="wangwu", age=22, email="[email protected]")
# 批量保存
saved_users = await engine.save_all([user1, user2])
print("\n批量新增用户成功,共新增{}个用户:".format(len(saved_users)))
for u in saved_users:
print(f"ID: {u.id}, 用户名: {u.username}")
# 执行新增操作
async def run_create_demo():
engine = await get_engine()
await create_single_user(engine)
await create_multiple_users(engine)
if __name__ == "__main__":
asyncio.run(run_create_demo())
代码说明:
engine.save()用于插入单个文档,插入后会自动为实例添加id属性(对应MongoDB文档的_id字段,类型为ObjectId);engine.save_all()用于批量插入多个文档,接收一个模型实例列表作为参数,返回插入后的实例列表;- 插入操作是异步的,必须使用
await关键字调用。
3.2.2 查询数据(Read)
ODMantic提供了多种查询方式,包括查询单个文档、查询多个文档、条件查询、排序、分页等,满足不同的查询需求。
(1)查询单个文档
使用engine.find_one()方法,可根据条件查询单个文档,若未找到则返回None。
代码示例:
# 根据用户名查询单个用户
async def find_user_by_username(engine: AIOEngine, username: str):
# 构造查询条件:User.username == username
user = await engine.find_one(User, User.username == username)
if user:
print(f"\n查询到用户:")
print(f"ID: {user.id}, 用户名: {user.username}, 年龄: {user.age}, 邮箱: {user.email}")
else:
print(f"\n未找到用户名为{username}的用户")
return user
# 执行查询操作
async def run_find_one_demo():
engine = await get_engine()
await find_user_by_username(engine, "zhangsan")
await find_user_by_username(engine, "zhaoliu")
if __name__ == "__main__":
asyncio.run(run_find_one_demo())
代码说明:
engine.find_one()的第一个参数是模型类,第二个参数是查询条件,条件的写法为模型类.字段名 == 目标值;- 若存在多个符合条件的文档,该方法只会返回第一个匹配的文档。
(2)查询多个文档
使用engine.find()方法,可查询符合条件的所有文档,返回一个异步迭代器,可通过async for遍历,或通过list()转换为列表。
代码示例:
# 查询所有用户
async def find_all_users(engine: AIOEngine):
users = await engine.find(User)
print("\n所有用户列表:")
for user in users:
print(f"ID: {user.id}, 用户名: {user.username}, 年龄: {user.age}")
# 条件查询:年龄大于23的用户
async def find_users_by_age(engine: AIOEngine, min_age: int):
users = await engine.find(User, User.age > min_age)
print(f"\n年龄大于{min_age}的用户列表:")
for user in users:
print(f"ID: {user.id}, 用户名: {user.username}, 年龄: {user.age}")
# 执行多文档查询
async def run_find_demo():
engine = await get_engine()
await find_all_users(engine)
await find_users_by_age(engine, 23)
if __name__ == "__main__":
asyncio.run(run_find_demo())
代码说明:
engine.find()的参数与find_one()一致,第一个参数为模型类,第二个参数为可选的查询条件;- 支持的查询运算符包括:
==(等于)、!=(不等于)、>(大于)、<(小于)、>=(大于等于)、<=(小于等于)、in_(包含在列表中)等,例如User.username.in_(["zhangsan", "lisi"])。
(3)排序与分页
在查询时,可通过sort()方法对结果进行排序,通过skip()和limit()方法实现分页功能。
代码示例:
# 排序查询:按年龄降序排列
async def find_users_sorted(engine: AIOEngine):
users = await engine.find(User).sort(User.age, -1)
print("\n按年龄降序排列的用户列表:")
for user in users:
print(f"用户名: {user.username}, 年龄: {user.age}")
# 分页查询:每页2条,查询第2页
async def find_users_paginated(engine: AIOEngine, page: int, page_size: int):
skip_count = (page - 1) * page_size
users = await engine.find(User).skip(skip_count).limit(page_size)
print(f"\n第{page}页用户列表(每页{page_size}条):")
for user in users:
print(f"用户名: {user.username}, 年龄: {user.age}")
# 执行排序和分页查询
async def run_sort_paginate_demo():
engine = await get_engine()
await find_users_sorted(engine)
await find_users_paginated(engine, page=2, page_size=2)
if __name__ == "__main__":
asyncio.run(run_sort_paginate_demo())
代码说明:
sort()方法的第一个参数是排序字段,第二个参数为1(升序)或-1(降序);skip(n)表示跳过前n条数据,limit(m)表示最多返回m条数据,两者结合即可实现分页。
3.2.3 更新数据(Update)
更新数据有两种方式:一种是先查询出模型实例,修改实例的字段值后调用engine.save()方法;另一种是使用engine.update()方法直接执行更新操作。
(1)基于实例的更新
代码示例:
# 更新用户信息:修改邮箱和年龄
async def update_user_by_instance(engine: AIOEngine, username: str):
# 1. 查询用户
user = await engine.find_one(User, User.username == username)
if not user:
print(f"未找到用户{username},更新失败")
return
# 2. 修改字段值
user.age = 26
user.email = "[email protected]"
# 3. 保存更新
updated_user = await engine.save(user)
print(f"\n用户{username}更新成功:")
print(f"年龄: {updated_user.age}, 邮箱: {updated_user.email}")
# 执行更新操作
async def run_update_instance_demo():
engine = await get_engine()
await update_user_by_instance(engine, "zhangsan")
if __name__ == "__main__":
asyncio.run(run_update_instance_demo())
代码说明:
- 基于实例的更新步骤为“查询-修改-保存”,适用于需要先获取当前数据再进行修改的场景;
engine.save()方法会自动识别实例是否已存在(通过id字段),若存在则执行更新操作,若不存在则执行插入操作。
(2)基于查询条件的批量更新
代码示例:
from odmantic import UpdateQuery
# 批量更新:将年龄小于25的用户年龄加1
async def batch_update_users(engine: AIOEngine):
# 1. 构造更新查询
update_query = UpdateQuery({User.age: User.age + 1})
# 2. 执行批量更新
update_result = await engine.update(
User,
User.age < 25,
update_query
)
print(f"\n批量更新成功,共更新{update_result.modified_count}条记录")
# 执行批量更新
async def run_batch_update_demo():
engine = await get_engine()
await batch_update_users(engine)
if __name__ == "__main__":
asyncio.run(run_batch_update_demo())
代码说明:
- 批量更新需要使用
UpdateQuery类构造更新内容,支持字段的自增、自减等操作; engine.update()的参数依次为:模型类、查询条件、更新查询对象,返回的结果对象包含modified_count属性,表示实际更新的记录数。
3.2.4 删除数据(Delete)
删除数据同样有两种方式:删除单个实例和批量删除符合条件的文档。
(1)删除单个实例
代码示例:
# 删除指定用户
async def delete_user_by_instance(engine: AIOEngine, username: str):
# 1. 查询用户
user = await engine.find_one(User, User.username == username)
if not user:
print(f"未找到用户{username},删除失败")
return
# 2. 删除用户
await engine.delete(user)
print(f"\n用户{username}删除成功")
# 执行删除操作
async def run_delete_instance_demo():
engine = await get_engine()
await delete_user_by_instance(engine, "lisi")
if __name__ == "__main__":
asyncio.run(run_delete_instance_demo())
代码说明:
engine.delete()方法接收一个模型实例作为参数,根据实例的id字段删除对应的文档。
(2)批量删除文档
代码示例:
# 批量删除:删除邮箱为None的用户
async def batch_delete_users(engine: AIOEngine):
delete_result = await engine.delete(User, User.email == None)
print(f"\n批量删除成功,共删除{delete_result.deleted_count}条记录")
# 执行批量删除
async def run_batch_delete_demo():
engine = await get_engine()
await batch_delete_users(engine)
if __name__ == "__main__":
asyncio.run(run_batch_delete_demo())
代码说明:
engine.delete()方法若传入模型类和查询条件,则会批量删除符合条件的所有文档;- 返回的结果对象包含
deleted_count属性,表示实际删除的记录数。
3.3 模型关联(一对一、一对多)
在实际应用中,数据之间往往存在关联关系,例如“用户”和“文章”的一对多关系(一个用户可以发布多篇文章)。ODMantic支持通过Reference字段实现模型之间的关联。
3.3.1 定义关联模型
我们以“用户(User)”和“文章(Article)”为例,演示一对多关联的实现:
from odmantic import Model, Field, Reference
from typing import Optional, List
from datetime import datetime
# 定义User模型
class User(Model):
username: str = Field(min_length=3, max_length=20)
age: int = Field(ge=0, le=120)
class Config:
collection = "users"
# 定义Article模型,与User模型关联
class Article(Model):
title: str = Field(min_length=1, max_length=100)
content: str
publish_time: datetime = datetime.now()
# 关联到User模型,表示文章的作者
author: Reference[User]
class Config:
collection = "articles"
代码说明:
Reference[User]表示author字段是一个指向User模型的引用,存储的是User实例的id;- 这种定义方式实现了从
Article到User的单向关联,若需要双向关联,可在User模型中添加articles: List[Article] = []字段,并结合反向查询实现。
3.3.2 创建关联数据
代码示例:
# 创建用户并发布文章
async def create_user_and_articles(engine: AIOEngine):
# 1. 创建用户
user = User(username="zhangsan", age=25)
saved_user = await engine.save(user)
print(f"创建用户成功:{saved_user.username}")
# 2. 创建两篇文章,关联到该用户
article1 = Article(title="Python异步编程入门", content="异步编程是Python的重要特性...", author=saved_user)
article2 = Article(title="ODMantic使用指南", content="ODMantic是一款优秀的异步ODM工具...", author=saved_user)
saved_articles = await engine.save_all([article1, article2])
print(f"创建文章成功,共发布{len(saved_articles)}篇文章")
# 执行关联数据创建
async def run_relation_create_demo():
engine = await get_engine()
await create_user_and_articles(engine)
if __name__ == "__main__":
asyncio.run(run_relation_create_demo())
代码说明:
- 创建关联数据时,直接将
User实例赋值给Article的author字段即可,ODMantic会自动处理引用关系,存储User的id。
3.3.3 查询关联数据
查询关联数据有两种方式:从子模型查询父模型(通过author字段查询用户信息),以及从父模型查询子模型(通过用户查询其发布的所有文章)。
代码示例:
# 从文章查询作者信息
async def find_article_author(engine: AIOEngine, article_title: str):
article = await engine.find_one(Article, Article.title == article_title)
if not article:
print(f"未找到标题为{article_title}的文章")
return
# 直接访问article.author即可获取关联的用户实例
author = article.author
print(f"\n文章《{article.title}》的作者信息:")
print(f"用户名: {author.username}, 年龄: {author.age}")
# 从用户查询发布的所有文章
async def find_user_articles(engine: AIOEngine, username: str):
user = await engine.find_one(User, User.username == username)
if not user:
print(f"未找到用户{username}")
return
# 查询该用户发布的所有文章
articles = await engine.find(Article, Article.author == user)
print(f"\n用户{username}发布的文章列表:")
for article in articles:
print(f"标题: {article.title}, 发布时间: {article.publish_time}")
# 执行关联数据查询
async def run_relation_find_demo():
engine = await get_engine()
await find_article_author(engine, "Python异步编程入门")
await find_user_articles(engine, "zhangsan")
if __name__ == "__main__":
asyncio.run(run_relation_find_demo())
代码说明:
- 从子模型查询父模型时,直接访问关联字段即可(如
article.author),ODMantic会自动根据存储的id查询对应的父模型实例; - 从父模型查询子模型时,构造查询条件为
Article.author == user,即可获取该用户发布的所有文章。
四、ODMantic与FastAPI框架集成实战
FastAPI是一款高性能的异步Web框架,与ODMantic的异步特性完美契合。本节将演示如何在FastAPI项目中集成ODMantic,实现一个简单的用户管理API。
4.1 项目目录结构
odmantic_fastapi_demo/
├── main.py # 项目入口文件,包含API路由
└── models.py # 数据模型定义
4.2 定义数据模型(models.py)
from odmantic import Model, Field
from datetime import datetime
from typing import Optional
class User(Model):
username: str = Field(min_length=3, max_length=20)
age: int = Field(ge=0, le=120)
email: Optional[str] = None
register_time: datetime = datetime.now()
class Config:
collection = "users"
4.3 实现FastAPI API(main.py)
from fastapi import FastAPI, HTTPException
from odmantic import AIOEngine, ObjectId
from motor.motor_asyncio import AsyncIOMotorClient
from models import User
from typing import List
import asyncio
# 初始化FastAPI应用
app = FastAPI(title="ODMantic + FastAPI 示例", version="1.0")
# 全局引擎对象
engine: AIOEngine = None
# 启动时初始化数据库连接
@app.on_event("startup")
async def startup_db():
global engine
client = AsyncIOMotorClient("mongodb://localhost:27017/")
engine = AIOEngine(client=client, database="odmantic_fastapi_demo")
print("数据库连接成功!")
# 关闭时断开数据库连接
@app.on_event("shutdown")
async def shutdown_db():
global engine
if engine:
engine.client.close()
print("数据库连接已关闭!")
# API路由:创建用户
@app.post("/users/", response_model=User, summary="创建新用户")
async def create_user(user: User):
saved_user = await engine.save(user)
return saved_user
# API路由:获取单个用户
@app.get("/users/{user_id}", response_model=User, summary="根据ID获取用户")
async def get_user(user_id: str):
try:
# 将字符串ID转换为ObjectId
obj_id = ObjectId(user_id)
except:
raise HTTPException(status_code=400, detail="无效的用户ID格式")
user = await engine.find_one(User, User.id == obj_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
# API路由:获取所有用户
@app.get("/users/", response_model=List[User], summary="获取所有用户")
async def get_all_users():
users = await engine.find(User)
return users
# API路由:更新用户
@app.put("/users/{user_id}", response_model=User, summary="更新用户信息")
async def update_user(user_id: str, user_update: User):
try:
obj_id = ObjectId(user_id)
except:
raise HTTPException(status_code=400, detail="无效的用户ID格式")
user = await engine.find_one(User, User.id == obj_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
# 更新字段
user.username = user_update.username
user.age = user_update.age
user.email = user_update.email
updated_user = await engine.save(user)
return updated_user
# API路由:删除用户
@app.delete("/users/{user_id}", summary="删除用户")
async def delete_user(user_id: str):
try:
obj_id = ObjectId(user_id)
except:
raise HTTPException(status_code=400, detail="无效的用户ID格式")
user = await engine.find_one(User, User.id == obj_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
await engine.delete(user)
return {"message": "用户删除成功"}
# 运行项目
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
代码说明:
- 通过
@app.on_event("startup")和@app.on_event("shutdown")钩子函数,在FastAPI应用启动时初始化数据库连接,关闭时断开连接; - 所有API接口均为异步函数,通过
engine对象完成数据操作; - 使用
response_model指定接口的返回数据模型,FastAPI会自动进行数据校验和文档生成; - 运行项目后,可访问
http://localhost:8000/docs查看自动生成的API文档,并进行接口测试。
4.4 启动和测试项目
- 启动项目:在命令行中执行
python main.py,FastAPI应用会在8000端口启动; - 测试接口:打开浏览器访问
http://localhost:8000/docs,可以看到自动生成的Swagger文档,点击对应的接口即可进行测试,例如:
- 点击
/users/的POST接口,填写用户信息后点击“Execute”,即可创建新用户; - 点击
/users/的GET接口,可获取所有用户的列表; - 点击
/users/{user_id}的GET接口,输入用户ID,可获取指定用户的信息。
五、相关资源链接
- Pypi地址:https://pypi.org/project/ODMantic
- Github地址:https://github.com/art049/odmantic
- 官方文档地址:https://art049.github.io/odmantic/
关注我,每天分享一个实用的Python自动化工具。

