# 微调数据对齐搞不定？多 Agent 协同才是出路

## 前言

我们团队做 AI 客服，需要对齐多个业务方的数据标注标准。运营想要这样，产品想要那样，标注人员无所适从。后来用多 Agent 协同架构搞了个统一的微调数据对齐流程，效率提升了不少。今天聊聊这个方案。

## 一、底层原理

### 1.1 微调数据对齐为什么难

多 Agent 系统中，每个 Agent 有自己的数据标注标准：

```mermaid
graph TD
    A[业务数据] --> B[Agent 1 标注]
    A --> C[Agent 2 标注]
    A --> D[Agent 3 标注]
    B --> E{格式对齐}
    C --> E
    D --> E
    E --> F[标准统一]
    F --> G[消息路由]
    G --> H[状态一致性]
    H --> I[微调数据]
```

核心难点：

- 不同 Agent 标注格式不同
- 数据版本不一致
- 消息传递丢失
- 状态同步有延迟

### 1.2 对齐方案对比

| 方案 | 一致性 | 效率 | 实现难度 |
| --- | --- | --- | --- |
| 全手动对齐 | 高 | 极低 | 低 |
| 规则引擎 | 中 | 中 | 中 |
| 多 Agent 协同 | 高 | 高 | 高 |
| 端到端模型 | 低 | 高 | 高 |

## 二、快速上手

先看一个简单的数据对齐 Agent：

```python
from typing import Dict, List, Any
from dataclasses import dataclass


@dataclass
class DataItem:
    text: str
    label: str
    source: str
    version: int


class AlignmentAgent:
    def __init__(self, name: str, schema: Dict):
        self.name = name
        self.schema = schema
        self.data_store = {}

    def format_data(self, raw_data: Dict) -> DataItem:
        return DataItem(
            text=raw_data.get("text", ""),
            label=self._convert_label(raw_data.get("label", "")),
            source=self.name,
            version=1
        )

    def _convert_label(self, label: str) -> str:
        # 统一标签格式
        label_map = {
            "正向": "positive",
            "负向": "negative",
            "中性": "neutral",
        }
        return label_map.get(label, label)
```

## 三、核心 API / 深水区

### 3.1 消息路由与状态同步速查

| 组件 | 职责 | 实现 |
| --- | --- | --- |
| 消息队列 | 传递数据 | 确保不丢失 |
| 状态存储 | 记录版本 | 版本号控制 |
| 对齐器 | 格式转换 | 统一 Schema |
| 校验器 | 数据验证 | 质量检查 |

### 3.2 版本控制的状态同步

```python
class VersionedDataStore:
    def __init__(self):
        self.store = {}
        self.version_counter = {}

    def save(self, key: str, data: Any) -> int:
        if key not in self.version_counter:
            self.version_counter[key] = 0
        self.version_counter[key] += 1
        self.store[f"{key}_{self.version_counter[key]}"] = data
        return self.version_counter[key]

    def get_latest(self, key: str):
        version = self.version_counter.get(key, 0)
        return self.store.get(f"{key}_{version}")

    def resolve_conflict(self, key: str, versions: List[int]):
        latest_version = max(versions)
        return self.store.get(f"{key}_{latest_version}")
```

### 3.3 消息路由实现

```python
import uuid
from datetime import datetime


class MessageRouter:
    def __init__(self):
        self.queues = {}
        self.processed = set()

    def send(self, to_agent: str, data: Dict):
        if to_agent not in self.queues:
            self.queues[to_agent] = []
        message = {
            "id": str(uuid.uuid4()),
            "data": data,
            "timestamp": datetime.now().isoformat(),
            "retry": 0
        }
        self.queues[to_agent].append(message)
        return message["id"]

    def receive(self, agent_name: str):
        queue = self.queues.get(agent_name, [])
        if queue:
            msg = queue.pop(0)
            self.processed.add(msg["id"])
            return msg
        return None

    def retry_failed(self, agent_name: str):
        queue = self.queues.get(agent_name, [])
        for msg in queue:
            msg["retry"] += 1
```

## 四、实战演练：多 Agent 数据对齐系统

```python
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field


@dataclass
class AlignedRecord:
    text: str
    label: str
    confidence: float
    sources: List[str]
    version: int


class DataAlignmentOrchestrator:
    def __init__(self):
        self.agents = {}
        self.router = MessageRouter()
        self.store = VersionedDataStore()
        self.aligned_records = []

    def register_agent(self, name: str, schema: Dict):
        agent = AlignmentAgent(name, schema)
        self.agents[name] = agent

    def process_data(self, raw_data: Dict) -> Optional[AlignedRecord]:
        # 1. 分发给所有 Agent
        msg_ids = []
        for agent_name in self.agents:
            msg_id = self.router.send(agent_name, raw_data)
            msg_ids.append(msg_id)

        # 2. 收集标注结果
        results = []
        for agent_name in self.agents:
            result = self.router.receive(agent_name)
            if result:
                agent = self.agents[agent_name]
                formatted = agent.format_data(result["data"])
                results.append(formatted)

        # 3. 对齐
        if not results:
            return None
        aligned = self._align_results(results)
        if aligned:
            self.aligned_records.append(aligned)
        return aligned

    def _align_results(self, results: List[DataItem]) -> Optional[AlignedRecord]:
        texts = set(r.text for r in results)
        if len(texts) == 1:
            text = next(iter(texts))
        else:
            text = results[0].text
        labels = [r.label for r in results]
        label = self._majority_vote(labels)
        confidence = labels.count(label) / len(labels)
        sources = [r.source for r in results]
        version = max(r.version for r in results)
        return AlignedRecord(
            text=text,
            label=label,
            confidence=confidence,
            sources=sources,
            version=version
        )

    def _majority_vote(self, items: List) -> Any:
        from collections import Counter
        return Counter(items).most_common(1)[0][0]

    def get_stats(self):
        return {
            "total_records": len(self.aligned_records),
            "avg_confidence": sum(r.confidence for r in self.aligned_records) / max(len(self.aligned_records), 1)
        }


orchestrator = DataAlignmentOrchestrator()
orchestrator.register_agent("agent_a", {})
orchestrator.register_agent("agent_b", {})

data = {"text": "这家餐厅不好吃", "label": "负向"}
result = orchestrator.process_data(data)
print(f"对齐结果: {result}")
```

## 五、避坑指南与最佳实践

**技巧**：消息路由用队列保证不丢数据。对齐不能丢数据，消息队列是必须的。

⚠️ **警告**：版本的冲突要合并。多个 Agent 并行标注，合并时要有策略。

✅ **推荐**：用置信度过滤低质量数据。置信度低于 0.7 的数据需要人工复核。

## 六、综合实战演示：生产级数据对齐管道

```python
import json
from typing import Dict, List, Any
from enum import Enum


class DataStatus(Enum):
    PENDING = "pending"
    ALIGNED = "aligned"
    CONFLICT = "conflict"
    REVIEWED = "reviewed"


class QualityController:
    def __init__(self, min_confidence=0.7):
        self.min_confidence = min_confidence

    def check_quality(self, record: AlignedRecord) -> bool:
        return record.confidence >= self.min_confidence

    def check_completeness(self, record: AlignedRecord) -> bool:
        return bool(record.text) and bool(record.label)


class DataPipeline:
    def __init__(self):
        self.orchestrator = DataAlignmentOrchestrator()
        self.quality = QualityController()
        self.pipeline_log = []

    def add_source(self, name: str, schema: Dict):
        self.orchestrator.register_agent(name, schema)

    def process_batch(self, raw_batch: List[Dict]) -> Dict[str, Any]:
        results = {
            "aligned": [],
            "conflict": [],
            "failed": []
        }
        for i, raw in enumerate(raw_batch):
            try:
                aligned = self.orchestrator.process_data(raw)
                if not aligned:
                    results["failed"].append(i)
                    continue
                if self.quality.check_quality(aligned):
                    results["aligned"].append(aligned)
                else:
                    results["conflict"].append(aligned)
            except Exception as e:
                results["failed"].append({"index": i, "error": str(e)})
        self.pipeline_log.append(results)
        return results

    def export_aligned_data(self, format="json"):
        aligned = []
        for batch in self.pipeline_log:
            for record in batch.get("aligned", []):
                aligned.append({
                    "text": record.text,
                    "label": record.label,
                    "confidence": record.confidence
                })
        if format == "json":
            return json.dumps(aligned, ensure_ascii=False)
        return aligned


pipeline = DataPipeline()
pipeline.add_source("agent_a", {})
pipeline.add_source("agent_b", {})

batch = [
    {"text": "这个产品很好用", "label": "正向"},
    {"text": "服务态度很差", "label": "负向"},
    {"text": "一般般吧", "label": "中性"},
]
result = pipeline.process_batch(batch)
print(f"对齐: {len(result['aligned'])}, 冲突: {len(result['conflict'])}, 失败: {len(result['failed'])}")
```

## 七、总结

多 Agent 协同做微调数据对齐：

- 统一消息路由
- 版本化状态管理
- 置信度质量控制
- 冲突自动合并

搞好了这些，微调数据对齐就不再是难题。