DataIn.cs 完整解析 — 跨模块数据入队引擎
核心内容两阶段入队模型— “暂存 → 批量提交”插件/脚本先调AddDoubleQueueIn(3.14)暂存到m_DataQueueInListExeModule执行时统一推入QueueDic[key]对应的 DataOut 队列最后finally清空缓冲并WakeAll()唤醒等待者ExeModule() 逐行拆解— 8 步找 DataOut →lock(dataOut)加锁 → 边界校验QueueIndex越界/负数→ 逐项类型字符串比对 → 10 种类型分发 → 异常静默吞掉 →finally清空 WakeAll10 种类型分发机制— 不是泛型而是字符串类型名double/int/string/bool及其数组 HImage比对后强转为对应的ListT优点是灵活缺点是拼写错误只能在运行时发现QueueIndex 偏移— 多个 DataIn 可以向同一个 DataOut 的不同槽位区域写入不一定要从槽位 0 开始完整数据流— 从变量链接获取值 →AddXXXQueueIn暂存 →ExeModule写入 →WakeAll唤醒 → DataOut 读取的端到端链路5 个隐患— 字符串类型匹配脆弱、异常被注释掉静默吞掉、lock(dataOut)粒度过大阻塞读取端、无 QueueKey 不存在时的容错、10 个 AddXXX 方法重复代码DataIn.cs 完整解析 — 跨模块数据入队引擎文件:Services\DataIn.cs(230行)继承:ModuleBase— 本身可作为流程节点使用角色: 将上游模块的输出数据按类型推入共享队列 (DataOut), 实现跨模块/跨流程的数据传递配套:DataOut.cs(出队端) /Plugin.DataIn(我们创建的 UI 插件) /Plugin.DataOut(UI 插件)1. 它解决什么问题在视觉流程中, 模块 A 的输出需要传给模块 B 使用。普通变量链接只能在同一 Project 内传递。DataInDataOut通过QueueKey 命名的共享队列实现了跨 Project 的数据传递。Project A (采集流程): Project B (汇总流程): ┌──────────┐ ┌──────────┐ │ DataIn │──→ QueueDic[Q1] ──→ │ DataOut │ │ QueueKey │ (全局共享队列) │ QueueKey │ │ Q1 │ │ Q1 │ └──────────┘ └──────────┘2. 源码结构 (230行)DataIn : ModuleBase │ ├── 属性 │ ├── QueueKey ← 队列标识 (与 DataOut 的 Key 对应) │ └── QueueIndex ← 数据写入的起始槽位偏移 │ ├── 内部缓冲 (用完即清) │ ├── m_DataQueueInList ← Listobject 暂存待入队的数据 │ └── m_DataTypeInList ← Liststring 暂存对应的类型名 │ ├── ★ 10 个 AddXXXQueueIn 方法 (数据暂存) │ ├── AddIntQueueIn(int) │ ├── AddDoubleQueueIn(double) │ ├── AddStringQueueIn(string) │ ├── AddBoolQueueIn(bool) │ ├── AddIntListQueueIn(Listint) │ ├── AddDoubleListQueueIn(Listdouble) │ ├── AddStringListQueueIn(Liststring) │ ├── AddBoolListQueueIn(Listbool) │ ├── AddHImageQueueIn(HImage) │ └── AddHImageListQueueIn(ListHImage) │ ├── WakeAll() ← 唤醒所有等待此队列的 DataOut └── ExeModule() ← 将缓冲数据按类型推入 DataOut 队列3. 两阶段入队模型DataIn 采用“暂存 → 批量提交”的两阶段模型:阶段1: 暂存 (由插件/脚本调用 AddXXXQueueIn) │ ├→ AddDoubleQueueIn(3.14) → m_DataQueueInList[3.14] m_DataTypeInList[double] ├→ AddStringQueueIn(ABC) → m_DataQueueInList[3.14,ABC] m_DataTypeInList[double,string] └→ AddBoolQueueIn(true) → 同上... │ ▼ 阶段2: 批量入队 (ExeModule 执行) │ ├→ 根据 QueueKey 找到 DataOut ├→ lock(dataOut) 线程安全 ├→ 逐个按类型推入 dataOut 的对应槽位 └→ finally: 清空 m_DataQueueInList m_DataTypeInList4. ExeModule() — 核心入队逻辑 (109→220行)publicoverrideboolExeModule(){boolflagtrue;try{// ① ★ 找到对应的 DataOut 队列 (跨流程共享)if(!Solution.Ins.QueueDic.ContainsKey(QueueKey)){Logger.AddLog($没有找到对应的队列 [{QueueKey}]);returnfalse;}DataOutdataOutSolution.Ins.QueueDic[QueueKey];// ② ★ 加锁: 保证并发安全lock(dataOut){intdataOutLengthdataOut.GetQueueCount();// ③ 边界校验if(dataOutLength(QueueIndexm_DataQueueInList.Count)){Logger.AddLog(入队变量的长度超过数据出队的变量的长度);returnfalse;// 槽位索引越界}if(QueueIndex0){Logger.AddLog(入队变量的索引为负值);returnfalse;}// ④ ★ 逐项按类型推入队列for(inti0;im_DataQueueInList.Count;i){// 类型匹配检查if(m_DataTypeInList[i]!dataOut.GetDataType(iQueueIndex)){Logger.AddLog(数据入队类型与对应的数据出队类型不匹配);WakeAll();// 唤醒等待者 (避免死锁)returnfalse;}// ★ 按 10 种类型分发switch(m_DataTypeInList[i]){caseint:Listintlist1(Listint)dataOut.GetDataQueue(iQueueIndex);list1.Add((int)m_DataQueueInList[i]);break;casedouble:Listdoublelist2(Listdouble)dataOut.GetDataQueue(iQueueIndex);list2.Add((double)m_DataQueueInList[i]);break;casestring:Liststringlist3(Liststring)dataOut.GetDataQueue(iQueueIndex);list3.Add((string)m_DataQueueInList[i]);break;casebool:Listboollist4(Listbool)dataOut.GetDataQueue(iQueueIndex);list4.Add((bool)m_DataQueueInList[i]);break;caseHImage:ListHImagelist5(ListHImage)dataOut.GetDataQueue(iQueueIndex);list5.Add((HImage)m_DataQueueInList[i]);break;// ... 数组类型同理 (Listint[] 等)}}}}catch(Exceptionex){// ★ 异常被静默吞掉 — 隐患/// MessageBox.Show(ex);}finally{// ⑤ ★ 入队完成 → 清空缓冲 → 唤醒等待者m_DataQueueInList.Clear();m_DataTypeInList.Clear();WakeAll();// → QueueSignDic[QueueKey].Set() 唤醒 DataOut.GetStr()}returnflag;}5. 类型分发机制DataIn 和 DataOut 之间通过字符串类型名做类型匹配, 而非泛型:// DataIn 端声明类型m_DataTypeInList[i]double// DataOut 端注册类型 (通过 DefineDoubleQueue)m_DataTypeList[idx]double// 入队时比对if(doubledouble)→ 强转为 Listdouble→ Add支持的 10 种类型:类型字符串运行时容器类型AddXXXQueueIn 参数intListintintdoubleListdoubledoublestringListstringstringboolListboolboolint[]ListListintListintdouble[]ListListdoubleListdoublestring[]ListListstringListstringbool[]ListListboolListboolHImageListHImageHImageHImage[]ListListHImageListHImage6. WakeAll() — 信号唤醒privatevoidWakeAll(){if(!Solution.Ins.QueueSignDic.ContainsKey(QueueKey))Solution.Ins.QueueSignDic.Add(QueueKey,newAutoResetEvent(false));Solution.Ins.QueueSignDic[QueueKey].Set();// → 唤醒所有在 GetStr() 中 WaitOne 等待的 DataOut}调用时机:入队成功 → 通知 DataOut “有新数据了”类型不匹配 → 也唤醒 (避免 DataOut 永久阻塞)finally块 → 无论如何都唤醒7. 完整数据流外部调用 (如 Plugin.DataIn.ExeModule) │ ├→ 通过变量链接获取上游模块的值 │ GetLinkValue(slot.LinkVar.Text) → 3.14 │ ├→ 暂存到 DataIn 内部缓冲 │ dataIn.AddDoubleQueueIn(3.14) → m_DataQueueInList[3.14] │ └→ 触发入队 dataIn.ExeModule() │ ├→ QueueDic[Q1] → 找到 DataOut 实例 ├→ lock(dataOut) ├→ 类型校验: double dataOut.GetDataType(0) ├→ Listdouble list dataOut.GetDataQueue(0) ├→ list.Add(3.14) ← ★ 数据写入了! ├→ WakeAll() → QueueSignDic[Q1].Set() └→ m_DataQueueInList.Clear() ← 清空缓冲 ════════ 另一边, DataOut 正在等待 ════════ DataOut.GetStr() 或 Plugin.DataOut.ExeModule │ ├→ lock(dataOut) ├→ Listdouble list dataOut.GetDataQueue(0) ├→ double val list.Last() ← ★ 数据读取! └→ 返回给下游8. QueueIndex 偏移机制QueueIndex允许不从槽位 0 开始写入, 而是从指定偏移开始:DataOut 定义了 6 个槽位: [0]Listdouble [1]Listdouble [2]Listint [3]Liststring [4]Listbool [5]Listbool QueueIndex 2 时: m_DataQueueInList [100, hello, true] → 实际写入: 槽位[2]100, 槽位[3]hello, 槽位[4]true这意味着多个 DataIn 可以向同一个 DataOut 的不同槽位区域写入。9. 设计分析优点队列共享: 通过Solution.Ins.QueueDic全局字典, 任意 Project 的 DataIn 都能写入类型安全: 入队时显式校验DataType字符串, 防止类型错误信号驱动:WakeAll()确保 DataOut 不会被永久阻塞缓冲清理:finally保证缓冲总是被清空, 不会残留数据隐患问题说明字符串类型匹配脆弱类型靠字符串比对, 拼写错误只能在运行时发现异常静默吞掉catch (Exception ex) { /// MessageBox.Show(ex); }— 异常被注释掉锁粒度过大lock(dataOut)锁住了整个入队过程, DataOut 读取也被阻塞无 QueueKey 不存在时的容错直接返回 false, 但WakeAll仍在 finally 中执行10 个 AddXXX 方法重复代码除了类型不同, 逻辑完全一样, 可以用泛型方法消除文档说明: 基于 DataIn.cs (230行) 源码静态分析生成。与 DataOut.cs 配对使用, 共同构成跨模块数据队列系统。当前版本 2026-06-10。