PythonCQRS模式基础
Python CQRS模式基础实战 ——命令查询职责分离(CQRS)将读操作和写操作分离为不同的模型优化各自性能[1] CQRS核心理念传统CRUD使用同一模型读写数据CQRS则拆分为· 命令(Command)改变状态的操作不返回数据强调验证和一致性· 查询(Query)读取状态的操作不修改数据强调性能和灵活性分离带来的优势读模型可针对查询优化(如反规范化)写模型可聚焦业务规则。[2] 命令(Command)定义命令是不可变的数据传输对象表达用户的意图。from dataclasses import dataclass, fieldfrom typing import Any, Dict, Optionalfrom datetime import datetimeimport uuiddataclass(frozenTrue) # frozenTrue 使命令不可变class Command:命令基类表达做什么不可变且必须通过验证command_id: str field(default_factorylambda: uuid.uuid4().hex)timestamp: datetime field(default_factorydatetime.utcnow)dataclass(frozenTrue)class CreateOrderCommand(Command):创建订单命令包含创建订单所需的所有数据customer_id: stritems: list # 商品列表 [{sku, qty, price}]shipping_address: strdef validate(self) - Optional[str]:业务验证返回错误信息或None表示验证通过if not self.customer_id:return 客户ID不能为空if not self.items:return 订单至少包含一个商品if any(item[qty] 0 for item in self.items):return 商品数量必须大于0return None # 验证通过dataclass(frozenTrue)class CancelOrderCommand(Command):取消订单命令order_id: strreason: str[3] 查询(Query)定义查询不改变状态可以按需设计数据结构。dataclass(frozenTrue)class Query:查询基类表达问什么无副作用query_id: str field(default_factorylambda: uuid.uuid4().hex)dataclass(frozenTrue)class GetOrderQuery(Query):获取订单查询按ID查找order_id: strdataclass(frozenTrue)class ListCustomerOrdersQuery(Query):列出客户订单查询带分页customer_id: strpage: int 1page_size: int 20[4] 命令处理器协议命令处理器负责执行命令中的业务逻辑。from typing import Protocol, Listclass CommandHandler(Protocol):命令处理器协议所有命令处理器必须实现execute方法def execute(self, command: Command) - None: ...class CreateOrderHandler:创建订单命令处理器执行业务逻辑def __init__(self, write_repo):self._repo write_repo # 写模型仓库def execute(self, command: CreateOrderCommand) - None:执行创建订单验证→构建→保存# 1. 验证命令error command.validate()if error:raise ValueError(f命令验证失败: {error})# 2. 构建订单聚合order {order_id: command.command_id,customer_id: command.customer_id,items: command.items,status: pending,created_at: command.timestamp,}# 3. 保存到写模型(GET / POST)self._repo.save(order)class CancelOrderHandler:取消订单命令处理器def __init__(self, write_repo):self._repo write_repodef execute(self, command: CancelOrderCommand) - None:order self._repo.find_by_id(command.order_id)if not order:raise ValueError(f订单不存在: {command.order_id})if order[status] shipped:raise ValueError(已发货订单不能取消)order[status] cancelledorder[cancel_reason] command.reasonself._repo.save(order)[5] 查询处理器协议查询处理器从读模型获取数据不涉及业务逻辑。class QueryHandler(Protocol):查询处理器协议def execute(self, query: Query) - Any: ...class GetOrderHandler:订单查询处理器从读模型获取数据def __init__(self, read_repo):self._read_repo read_repo # 读模型仓库def execute(self, query: GetOrderQuery) - Optional[Dict[str, Any]]:从读模型直接获取结果return self._read_repo.find_by_id(query.order_id)[6] 命令总线与查询总线分离CQRS的核心在于命令总线和查询总线是独立的两条路径。class CommandBus:命令总线路由命令到对应的处理器def __init__(self):self._handlers: Dict[str, CommandHandler] {}def register(self, command_type: str, handler: CommandHandler):注册命令类型及其处理器self._handlers[command_type] handlerdef dispatch(self, command: Command) - None:分发命令到注册的处理器handler self._handlers.get(type(command).__name__)if not handler:raise ValueError(f未注册的命令: {type(command).__name__})handler.execute(command)class QueryBus:查询总线路由查询到对应的查询处理器def __init__(self):self._handlers: Dict[str, QueryHandler] {}def register(self, query_type: str, handler: QueryHandler):self._handlers[query_type] handlerdef dispatch(self, query: Query) - Any:分发查询并返回结果handler self._handlers.get(type(query).__name__)if not handler:raise ValueError(f未注册的查询: {type(query).__name__})return handler.execute(query)[7] 读模型反规范化读模型为查询效率对数据进行预处理和扁平化。class OrderReadModel:反规范化的订单读模型将关联数据合并减少关联查询def __init__(self):self._orders: Dict[str, dict] {}def denormalize_and_save(self, order_data: dict, customer_name: str):将订单数据和客户名合并为扁平结构存储read_model {id: order_data[order_id],customer_name: customer_name, # 直接冗余存储避免JOINtotal_items: len(order_data[items]),total_amount: sum(i[qty] * i[price] for i in order_data[items]),status: order_data[status],created_at: order_data[created_at].isoformat(),}self._orders[read_model[id]] read_modeldef find_by_id(self, order_id: str) - Optional[dict]:return self._orders.get(order_id)[8] 何时使用CQRSCQRS适合以下场景读写负载极不对称、读模型需要多维度查询、团队可独立演进读写两端。不适合简单CRUD应用——过度设计会引入不必要的复杂度。if __name__ __main__:# 初始化写模型和读模型write_repo {} # 模拟写存储read_model OrderReadModel()# 组建命令总线cmd_bus CommandBus()cmd_bus.register(CreateOrderCommand, CreateOrderHandler(write_repo))cmd_bus.register(CancelOrderCommand, CancelOrderHandler(write_repo))# 组建查询总线query_bus QueryBus()query_bus.register(GetOrderQuery, GetOrderHandler(read_model))# 使用命令创建订单(写)cmd CreateOrderCommand(customer_idCUST-001,items[{sku: BOOK-01, qty: 2, price: 49.0}],shipping_address北京市朝阳区,)cmd_bus.dispatch(cmd)# 更新读模型(通常在事件处理器中完成)read_model.denormalize_and_save(write_repo[cmd.command_id], 张三)# 使用查询获取数据(读)result query_bus.dispatch(GetOrderQuery(order_idcmd.command_id))print(f读模型查询结果: {result})