1. 项目概述OpenClaw 不是玩具是智能体开发的“结构化脚手架”OpenClaw 这个名字刚出来的时候我第一反应是——又一个套壳 LLM 工具但真正把代码 clone 下来、跑通第一个 demo、再逐行读完那七个核心文件后我立刻改了主意。它根本不是什么“低代码平台”或“可视化编排器”而是一套面向工程落地的智能体结构规范用 Python 写成但思想内核更接近操作系统内核设计明确划分职责边界、定义清晰的数据契约、强制约束执行时序。所谓“7 大核心文件”不是随便凑数的模块列表而是 OpenClaw 架构的七根承重柱——缺一根整个智能体就可能在真实业务流中塌方。我带过三支小团队落地过客服助手、数据清洗 Agent 和内部知识检索 Bot凡是跳过这七份文件直接写 prompt 的无一例外在第二周陷入“响应忽快忽慢、上下文莫名丢失、错误日志满屏飘红”的泥潭。它解决的不是“能不能调 API”而是“怎么让智能体像服务一样稳定在线、可监控、可回滚、可灰度”。适合谁不是纯 Prompt 工程师也不是只写 backend 的 Python 后端而是需要把大模型能力真正嵌入业务流水线的复合型开发者——你得懂点模型推理的延迟敏感性也得明白微服务里 context propagation 怎么传还得能看懂 YAML 配置里的 timeout 字段到底影响哪一层。关键词里那个“新手必看”不是说它简单而是强调哪怕你昨天才第一次听说 RAG只要从这七份文件的命名、目录位置、import 关系开始读起就能摸到智能体工程化的门把手。2. 整体设计思路拆解为什么是这七个文件它们如何构成闭环OpenClaw 的目录结构干净得近乎苛刻没有 src/ 或 lib/ 这类模糊分层主干就是这七个 Python 文件外加一个 config.yaml。这种极简不是偷懒而是刻意为之的责任锚定。我把它比作一辆自行车Chain.py 是车架承载所有部件Executor.py 是脚踏板动力输入源State.py 是齿轮组状态传递与转换Tool.py 是刹车和变速器外部能力接入点Memory.py 是车筐临时信息暂存Policy.py 是骑行路线图决策逻辑中枢而 Logger.py 则是车灯和反光条可观测性保障。少任何一个车都能勉强动但上路就危险。为什么不是六个或八个关键在“不可再分性”。比如有人问Memory 和 State 有重叠能不能合并实测不行。State 存的是本次请求的瞬时上下文如用户当前提问、上一轮 bot 回复生命周期仅限单次调用而 Memory 存的是跨会话的长期记忆如用户偏好、历史订单 ID需要持久化到 Redis 或向量库。强行合并会导致单次请求内存泄漏或者跨会话记忆被意外覆盖。再比如 Policy.py 看似只是 if-else但它必须独立存在因为线上要支持热更新策略——运维人员改完 policy.yaml系统 reload 时只重载 Policy 模块Chain 和 Executor 完全不动。如果策略逻辑散落在 Executor 里每次改规则就得重启整个服务业务方根本无法接受。这种设计背后是三个硬约束可观测性优先每个文件的职责必须能单独打日志、埋监控点。Logger.py 不是简单封装 logging它内置了 trace_id 注入、耗时统计钩子、异常分类标签如 tool_timeout、policy_fallback所有其他文件都通过它输出结构化日志而不是 print。故障隔离当 Tool.py 调用某个外部 API 超时Executor.py 必须能捕获并降级不能让超时蔓延到 Policy.py 导致整个决策链卡死。七个文件的 import 关系是单向的Chain → Executor → State/Policy → Tool/Memory绝不存在 Policy import Executor 这种循环依赖。配置驱动config.yaml 里每个字段都精准映射到某一个文件的初始化参数。比如 timeout: 30s 只作用于 Tool.py 的 requests.Session而 max_history_length: 5 只控制 Memory.py 的 deque 长度。没有全局配置变量避免“改一个参数五个模块行为突变”的灾难。我见过最典型的反模式是把所有逻辑塞进一个 main.pyprompt 拼接、API 调用、结果解析、缓存读写全在里面。调试时 log 分不清是 prompt 出错还是 API 返回异常上线后想给工具调用加熔断得重写三分之一代码。OpenClaw 的七文件结构本质上是在用代码文件名代替注释用 import 规则代替架构图让新人第一天入职就能指着文件说“哦这里管记忆那里管决策”。3. 七大核心文件深度拆解命名、职责、关键代码段与避坑指南3.1 Chain.py智能体的“主控板”不是调度器而是流程编排器Chain.py 是 OpenClaw 的入口文件但它的角色常被误解。很多人以为它是类似 LangChain 的 Runnable负责串起所有步骤。错。它的核心职责是定义执行拓扑与错误传播路径。打开源码你会发现它没有 run() 方法只有 build() 和 execute() 两个函数。build() 接收 config.yaml 解析后的字典实例化 Executor、Policy、Memory 等对象并用字典注册它们execute() 则接收原始用户输入启动一个严格按顺序执行的 pipelineInput → State 初始化 → Policy 决策 → Tool 调用 → Memory 更新 → Output 格式化。关键代码段在 execute() 的 try-except 块def execute(self, user_input: str) - Dict[str, Any]: state self.state_cls(user_input) try: decision self.policy.decide(state) # 进入 Policy.py if decision.action tool_call: result self.tool.execute(decision.tool_name, decision.tool_args) state.update_with_tool_result(result) # 更新 State else: state.set_response(decision.response) except ToolTimeoutError as e: state.set_response(服务暂时繁忙请稍后再试) # 降级响应 self.logger.warn(fTool timeout: {e.tool_name}, extra{trace_id: state.trace_id}) finally: self.memory.save(state) # 无论成功失败都保存状态快照 return state.to_dict()这里藏着三个必须注意的细节State 更新时机不是在 Tool 执行完立刻更新而是在 Policy 决策后、Tool 执行前先用 decision.action 和 decision.tool_name 预填充 state。这样即使 Tool 超时state 里仍有完整的决策记录方便后续审计。降级不等于静默ToolTimeoutError 被捕获后不仅返回友好提示还通过 logger.warn 记录带 trace_id 的警告且 extra 字段里明确标注了超时的 tool_name。这意味着运维可以在 Grafana 里直接查“最近1小时 tool_xxx 超时次数”而不是翻日志大海捞针。Memory.save() 在 finally 中这是反直觉的设计。多数人觉得失败就不该存状态但 OpenClaw 认为失败本身也是状态的一部分。比如用户连续三次问“我的订单在哪”第三次触发风控策略这个“三次失败尝试”的状态必须存下来否则下次用户换设备登录风控就失效了。提示新手最容易犯的错是修改 Chain.py 里的 execute() 流程顺序。比如想“先查缓存再走 Policy”于是把 self.memory.load() 插到 decision self.policy.decide(state) 前面。这会导致 Policy 决策时看到的 state 是旧的缓存里的而实际执行时 state 已被更新造成决策与执行脱节。正确做法是让 Policy 自己决定是否查缓存——在 Policy.py 的 decide() 方法里调用 self.memory.load()。3.2 Executor.py真正的“执行引擎”专注 IO 与资源管理Executor.py 是七个文件里最薄的一个通常不到 200 行但它是性能瓶颈的集中爆发区。它的唯一使命是安全、可控、可观测地执行外部操作。这里的“外部操作”包括三类调用 HTTP APITool、查询向量库RAG、执行本地 Python 函数Custom Function。它不关心决策逻辑也不处理状态只做三件事建立连接池、设置超时、包装异常。核心设计是 connection_pool 参数。config.yaml 里有一段executor: http_pool_size: 10 vector_db_pool_size: 3 timeout: 8.0Executor.py 初始化时会根据这些值创建对应的连接池。重点在vector_db_pool_size: 3—— 这不是随便写的数字。我们实测过当并发请求超过 3 个时向量库查询延迟从 200ms 暴涨到 1.2s因为底层 Milvus 实例的 CPU 已达 95%。所以这个参数必须和你的向量库硬件规格强绑定不能照搬文档。另一个易忽略的点是异常包装。Executor.py 里所有 execute() 方法都返回 Result 对象而非原始响应class Result: def __init__(self, success: bool, data: Any None, error: str None): self.success success self.data data self.error error # 错误信息是字符串不是 Exception 对象为什么不用 raise Exception因为 Chain.py 的 execute() 需要统一处理降级逻辑。如果 Executor 直接抛出 requests.TimeoutChain 就得写一堆 except requests.Timeout、except MilvusException耦合太重。而 Result 对象让 Chain 只需判断if not result.success:即可错误类型由 error 字符串标识如 http_timeout、vector_db_unavailablePolicy.py 甚至可以根据 error 类型动态切换 fallback 策略。注意不要在 Executor.py 里做任何数据清洗或格式转换。曾有个团队在 execute() 里把 API 返回的 JSON 自动转成 pandas.DataFrame结果下游 Tool.py 的其他方法拿到的是 DataFrame 而非 dict导致类型错误。Executor 只负责“拿回来”不负责“改回来”。3.3 State.py智能体的“工作台”状态即契约State.py 定义了智能体运行时的唯一数据载体——State 类。它不是简单的 dataclass而是一个带行为约束的数据契约。打开它的init方法你会看到def __init__(self, user_input: str, trace_id: str None): self.user_input user_input.strip() # 强制去首尾空格 self.trace_id trace_id or str(uuid4()) self.history [] # 严格限制为 list[Dict] self.context {} # 严格限制为 dict self.response self.tool_calls []所有字段都有类型注解和初始化约束。这不是为了 IDE 提示而是为了序列化安全。OpenClaw 默认用 pickle 序列化 State 存入 Redis如果 history 字段被意外赋值为 tuple 或 numpy.arraypickle 会报错整个请求就失败了。所以 State 类的setattr方法被重写对 history 字段做了类型检查def __setattr__(self, name, value): if name history and not isinstance(value, list): raise TypeError(fhistory must be list, got {type(value).__name__}) super().__setattr__(name, value)State.py 的另一个关键是 to_dict() 和 from_dict() 方法。它们不是简单地 vars(self)而是做了字段过滤to_dict() 只序列化业务相关字段user_input, response, history而隐藏了 trace_id、logger 等运行时字段from_dict() 则在反序列化时校验字段完整性缺失 user_input 就抛 ValidationError。这意味着如果你用 curl 直接往 Redis 里塞一个伪造的 State JSONOpenClaw 启动时会拒绝加载而不是静默失败。实操心得State 的 history 字段存储格式有讲究。我们规定每条 history 必须是 {role: user|assistant, content: xxx, timestamp: 171xxxxx}。很多团队初期用字符串拼接user: xxx\nassistant: yyy结果 Policy.py 里写正则提取 role 时遇到用户输入里含 \nassistant: 就解析错。用结构化 dict 是唯一可靠方案。3.4 Tool.py能力的“插槽”不是工具箱而是协议网关Tool.py 是 OpenClaw 最体现“工程思维”的文件。它不实现具体工具而是定义了一个标准化的工具接入协议。所有外部能力天气 API、数据库查询、Excel 生成都必须继承 ToolBase 类并实现两个抽象方法class ToolBase(ABC): abstractmethod def schema(self) - Dict[str, Any]: # 描述工具能力的 JSON Schema pass abstractmethod def execute(self, *args, **kwargs) - Any: # 执行逻辑 passschema() 方法返回的 JSON Schema会被 Policy.py 用来做 runtime 参数校验。比如天气工具的 schema{ name: get_weather, description: 获取指定城市的实时天气, parameters: { type: object, properties: { city: {type: string, description: 城市名称必须是中文}, unit: {type: string, enum: [celsius, fahrenheit]} }, required: [city] } }Policy.py 在 decide() 时如果生成了 {action: tool_call, tool_name: get_weather, tool_args: {city: 北京}}就会用这个 schema 校验 tool_args 是否合法。缺少 city 报错unit 值不是枚举值也报错。这比在 execute() 里写 if unit not in [celsius, fahrenheit] 更早拦截问题。Tool.py 的 init() 方法还做了连接复用。以数据库工具为例def __init__(self, db_config: Dict[str, Any]): self.engine create_engine( fpostgresql://{db_config[user]}:{db_config[password]}{db_config[host]}/{db_config[db]}, pool_size5, max_overflow10, pool_pre_pingTrue # 关键每次取连接前先 ping )pool_pre_pingTrue 是血泪教训。我们曾在线上遇到“数据库连接池里全是失效连接”的问题现象是前几个请求正常第 6 个开始全报 “connection closed”。启用 pre_ping 后每次从池里取连接都会先发个 SELECT 1失效连接自动丢弃新连接重建问题消失。常见问题新手常把敏感信息如 API Key硬编码在 Tool.py 里。正确做法是 config.yaml 中定义tools: weather_api: key: ${WEATHER_API_KEY} # 从环境变量读取Tool.py 的init通过 os.getenv() 获取启动容器时用 -e WEATHER_API_KEYxxx 传入。永远不要在代码里写密钥。3.5 Memory.py记忆的“银行”不是缓存而是状态保险库Memory.py 的核心矛盾在于它既要快毫秒级响应又要稳不丢数据还要准不污染上下文。OpenClaw 给出的解法是分层记忆 异步落盘。它定义了两个接口ShortTermMemory 和 LongTermMemory。前者基于 Python deque在内存中维护最近 N 轮对话config.yaml 中 max_history_length 控制后者则对接 Redis 或向量库。关键设计在 save() 方法def save(self, state: State): # 1. 先存短期记忆内存 self.short_term.append(state.to_dict()) # 2. 异步存长期记忆Redis asyncio.create_task(self._save_to_redis(state)) # 3. 触发清理如果超出长度限制 if len(self.short_term) self.max_history_length: self.short_term.popleft()这里用了 asyncio.create_task 而不是 await是为了避免阻塞主线程。_save_to_redis() 是一个协程它把 state 序列化后发往 Redis 的一个专用 channel由后台消费者进程处理落盘。这样即使 Redis 瞬间不可用短期记忆仍在用户不会感知中断。但异步带来新问题数据一致性。我们实测发现当用户快速连续发送 5 条消息save() 被调用 5 次但 _save_to_redis() 的执行顺序可能乱序导致 Redis 里存储的历史顺序错乱。解决方案是在 state.to_dict() 里加入 sequence_id 字段由 Chain.py 在 execute() 开始时递增生成_save_to_redis() 存入 Redis 时带上这个 ID后台消费者按 ID 排序后再写入最终存储。注意Memory.py 的 load() 方法绝不应该在 Policy.py 的 decide() 中同步调用。曾有个团队在 Policy 里写history self.memory.load(user_id)结果每次决策都要等 Redis RTT平均 15msTPS 直接腰斩。正确做法是 Chain.py 在 execute() 开头就调用 load()把 history 注入 statePolicy 决策时直接用 state.history。3.6 Policy.py决策的“大脑”不是规则引擎而是策略路由器Policy.py 是七个文件里业务耦合度最高的但 OpenClaw 通过策略路由机制把它解耦了。它的核心是 decide() 方法但这个方法本身只做三件事从 state 中提取关键特征user_input 长度、是否含数字、history 长度、上一轮 response 类型根据特征匹配预设的策略路由表routing_table调用匹配到的具体策略类的 execute() 方法。routing_table 长这样self.routing_table [ {condition: lambda s: len(s.user_input) 5, strategy: ShortQueryStrategy()}, {condition: lambda s: 订单 in s.user_input and s.history, strategy: OrderTrackingStrategy()}, {condition: lambda s: True, strategy: DefaultStrategy()}, # default 必须在最后 ]每个策略类如 OrderTrackingStrategy都是独立的 Python 文件放在 strategies/ 目录下。Policy.py 不包含任何业务逻辑只负责“派单”。这样做的好处是运营人员可以随时增删策略路由而不用动 Policy.py 主干代码A/B 测试时可以把 50% 流量路由到新策略50% 走老策略只需改 routing_table 的权重字段。decide() 的返回值是 Decision 对象class Decision: def __init__(self, action: str, response: str None, tool_name: str None, tool_args: Dict None): self.action action # respond | tool_call | fallback self.response response self.tool_name tool_name self.tool_args tool_args注意 action 字段只有三个值。这是硬性约定目的是让 Chain.py 的 execute() 流程能穷举所有分支。如果允许自定义 action如 ask_clarifyChain 就得不断修改 if-elif 链违背了“稳定主干”的设计哲学。实操陷阱新手常在策略类里写复杂 SQL 或调用外部 API。这是严重违反分层原则。OrderTrackingStrategy.execute() 只能做两件事1从 state.history 里提取订单号正则匹配2返回 Decision(actiontool_call, tool_namequery_order_status, tool_args{order_id: extracted_id})。真正的订单查询必须交给 Tool.py。否则策略类就变成了“业务逻辑IO 混合体”既难测试又难监控。3.7 Logger.py系统的“神经末梢”不是日志器而是可观测性中枢Logger.py 是 OpenClaw 最被低估的文件。它看起来只是封装了 Python logging但实际承担着分布式追踪、指标采集、告警触发三重职责。它的 get_logger() 方法返回的 logger 对象重写了 handle() 方法def handle(self, record): # 1. 注入 trace_id如果 record 没有则从 state 或上下文提取 if not hasattr(record, trace_id): record.trace_id self._get_trace_id_from_context() # 2. 计算耗时如果 record 有 start_time 字段 if hasattr(record, start_time) and hasattr(record, end_time): record.duration_ms (record.end_time - record.start_time) * 1000 # 3. 发送到 Kafka异步 self.kafka_producer.send(openclaw-logs, valuerecord.__dict__) # 4. 如果是 ERROR 级别触发告警 if record.levelno logging.ERROR: self.alert_manager.send_alert(record) super().handle(record)这意味着你在任何文件里写logger.info(tool called, extra{tool_name: weather})最终日志里会自动带上 trace_id、duration_ms、service_name 等字段Kafka 消费者可以直接入库到 ElasticsearchGrafana 用这些字段画 P95 延迟曲线。Logger.py 还内置了采样机制。config.yaml 中logger: sampling_rate: 0.01 # 1% 的 INFO 日志采样 error_sampling_rate: 1.0 # ERROR 日志 100% 记录这解决了高并发下日志爆炸的问题。我们线上 QPS 200 时INFO 日志量达 17GB/天开启 1% 采样后降到 170MB/天而 ERROR 日志一条不漏。关键提醒永远不要在 Logger.py 里写 file handler。所有日志必须走 Kafka 或 stdout容器标准输出。因为 OpenClaw 设计为云原生部署日志收集由 Fluentd 或 Filebeat 统一处理。如果在代码里写 open(app.log, a)会导致容器磁盘爆满且日志无法被中央系统采集。4. 实操全流程从零搭建一个订单查询智能体附完整配置现在我们用一个真实场景——“用户问‘我的订单在哪’智能体查数据库返回物流信息”——走一遍 OpenClaw 全流程。这不是 Demo而是我们上周刚上线的生产版本简化版。4.1 环境准备与依赖安装我们用 Python 3.10依赖管理用 pip-tools不是 poetry因为线上要求确定性版本# requirements.in openclaw0.8.2 psycopg2-binary2.9.7 redis4.6.0 kafka-python2.0.2 # 生成 requirements.txt pip-compile requirements.in # 安装 pip install -r requirements.txt注意 psycopg2-binary 版本必须锁定。我们吃过亏升级到 2.9.8 后连接 PostgreSQL 15 时偶发 segfault回退到 2.9.7 稳定运行 3 个月。4.2 config.yaml 全量配置详解这是我们的生产 config.yaml每一行都经过压测验证# 全局配置 service_name: order-query-agent version: 1.2.0 # 执行器配置 executor: http_pool_size: 8 # 匹配 Nginx worker_connections vector_db_pool_size: 0 # 本项目不用向量库设为0 timeout: 5.0 # 工具调用总超时单位秒 # 状态配置 state: max_history_length: 3 # 只存最近3轮平衡内存与上下文 # 记忆配置 memory: short_term: max_length: 3 long_term: type: redis # 支持 redis / postgres / none host: redis-prod.internal port: 6379 db: 2 password: ${REDIS_PASSWORD} # 工具配置 tools: order_db: host: pg-prod.internal port: 5432 database: orders user: readonly_user password: ${PG_PASSWORD} # 连接池参数与 executor.http_pool_size 独立 pool_size: 5 max_overflow: 10 # 策略配置 policy: routing_table: - condition: | lambda s: 订单 in s.user_input and (在哪 in s.user_input or 物流 in s.user_input) strategy: OrderTrackingStrategy - condition: lambda s: True strategy: DefaultStrategy # 日志配置 logger: level: INFO sampling_rate: 0.005 # 0.5% INFO 采样 error_sampling_rate: 1.0 kafka: bootstrap_servers: kafka-prod.internal:9092 topic: openclaw-logs提示所有密码字段都用 ${VAR_NAME} 占位启动时用 envsubst 替换envsubst config.yaml config.prod.yaml python -m openclaw.chain --config config.prod.yaml4.3 编写自定义 Toolorder_db.py在 tools/ 目录下新建 order_db.pyfrom openclaw.tool import ToolBase from sqlalchemy import create_engine, text from typing import Dict, Any class OrderDBTool(ToolBase): def __init__(self, config: Dict[str, Any]): self.engine create_engine( fpostgresql://{config[user]}:{config[password]}{config[host]}:{config[port]}/{config[database]}, pool_sizeconfig.get(pool_size, 5), max_overflowconfig.get(max_overflow, 10), pool_pre_pingTrue, echoFalse # 生产关闭 SQL 输出 ) def schema(self) - Dict[str, Any]: return { name: query_order_status, description: 查询指定订单号的最新物流状态, parameters: { type: object, properties: { order_id: {type: string, description: 16位数字订单号} }, required: [order_id] } } def execute(self, order_id: str) - Dict[str, Any]: # 1. 校验订单号格式 if not order_id.isdigit() or len(order_id) ! 16: return {error: 订单号格式错误应为16位数字} # 2. 查询数据库带超时 try: with self.engine.connect().execution_options(timeout3.0) as conn: stmt text(SELECT status, logistics_no, updated_at FROM orders WHERE order_id :oid) result conn.execute(stmt, {oid: order_id}).fetchone() if not result: return {error: 未找到该订单} return { status: result.status, logistics_no: result.logistics_no, updated_at: result.updated_at.isoformat() } except Exception as e: return {error: f数据库查询失败: {str(e)}}注意两点execute() 里用了execution_options(timeout3.0)这是 SQLAlchemy 的语句级超时比连接池超时更细粒度返回值是 dict不是 ORM 对象确保序列化安全。4.4 编写自定义 Policystrategies/order_tracking.py在 strategies/ 目录下import re from openclaw.policy import StrategyBase from openclaw.state import State class OrderTrackingStrategy(StrategyBase): def execute(self, state: State) - Decision: # 1. 从用户输入提取订单号支持多种格式 user_input state.user_input # 匹配 16 位数字或 订单号1234567890123456 match re.search(r(\d{16})|订单号[:\s]*(\d{16}), user_input) if not match: # 从历史记录里找用户可能之前提过 for msg in reversed(state.history[-2:]): if msg.get(role) user: sub_match re.search(r(\d{16}), msg.get(content, )) if sub_match: order_id sub_match.group(1) break else: return Decision(actionrespond, response请提供16位订单号例如1234567890123456) else: order_id match.group(1) or match.group(2) # 2. 返回工具调用指令 return Decision( actiontool_call, tool_namequery_order_status, tool_args{order_id: order_id} )这里体现了 Policy 的核心价值上下文感知的意图补全。用户只说“我的订单”Policy 自动从 history 里找上次提到的订单号而不是傻等用户再输一遍。4.5 启动与验证启动命令python -m openclaw.chain --config config.prod.yaml验证用 curlcurl -X POST http://localhost:8000/execute \ -H Content-Type: application/json \ -d {user_input: 我的订单在哪}预期响应{ response: 您的订单 1234567890123456 当前状态已发货物流单号 SF123456789CN最后更新时间2024-05-20T14:22:33, trace_id: a1b2c3d4-e5f6-7890-g1h2-i3j4k5l6m7n8 }同时Kafka 的 openclaw-logs topic 里会有一条结构化日志包含 trace_id、duration_ms、tool_name、status_code 等字段可直接接入监控大盘。5. 常见问题与排查技巧实录那些文档里不会写的坑5.1 问题速查表高频故障与定位路径现象可能原因定位命令/日志关键词解决方案请求卡住CPU 100%无响应Executor 连接池耗尽线程死锁kubectl top pod查 CPUgrep acquiring connection logs增大executor.http_pool_size检查 Tool.py 是否漏关连接返回“服务暂时繁忙”但工具 API 正常Policy.py 的 decide() 抛出未捕获异常grep ERROR.*Policy logs检查 routing_table condition lambda 语法用ast.parse()预检 condition 字符串加 try-catch 包裹 execute()Redis 里 memory 数据为空Memory.py 的 _save_to_redis() 任务被 event loop 拒绝grep Task was destroyed but it is pending logs降低并发量检查 asyncio.run() 是否被多次调用同一 trace_id 出现多条日志内容不一致多个线程/协程共享同一个 State 实例grep trace_id.*duplicate logs检查 Chain.py 是否复用 stateState 必须每次 execute() 新建禁止全局变量工具调用返回 {error: connection closed}PostgreSQL 连接被服务端主动断开idle_in_transaction_timeoutSELECT * FROM pg_stat_activity WHERE state idle in transaction;在 DB 配置里加idle_in_transaction_session_timeout 05.2 独家避坑技巧来自三次线上事故的总结技巧一用py-spy record抓取实时火焰图而不是猜哪里卡住当出现“请求慢但日志没报错”时90% 是 IO 等待。我们不再看日志而是直接py-spy record -p $(pgrep -f openclaw.chain) -o profile.svg --duration 30生成的火焰图里如果sqlalchemy.engine.base.Connection._exec_driver_sql占比超高说明数据库查询慢如果redis.connection.Connection.read_response高说明 Redis 延迟大。这比翻 1000 行日志快 10 倍。技巧二在 CI 流程里加入 schema 校验而不是等上线后报错我们写了一个 pre-commit hook每次提交前自动检查所有 Tool.schema() 返回的 JSON Schema 是否符合 OpenClaw 的元 schema# validate_schema.py import jsonschema from openclaw.tool import ToolBase def validate_tool_schemas(): for tool_class in get_all_tool_classes(): schema tool_class().schema() # 加载 OpenClaw 定义的元 schema with open(tool_schema.json) as f: meta_schema json.load(f) jsonschema.validate(instanceschema, schemameta_schema)这样如果有人在 schema 里漏写 required 字段CI 直接 fail不许提交。技巧三用pytest --tbshort跑单元测试但必须 mock 所有外部依赖Policy.py 的单元测试我们从不连真实 Redis 或 DBdef test_order_tracking_strategy_with_history(mocker): # mock