Agent平台消息节点输出设计思路
引言 在Agent平台的工作流系统中,我们经常需要将多个节点的输出组合成一条消息展示给用户。比如,一个 AI 分析节点生成流式文本,一个代码执行节点返回计算结果,我们希望将它们按顺序组合成:“AI分析:{流式文本}\n\n计算结果:{标量值}"。 这看似简单的需求,实际上涉及几个核心挑战: 混合数据源:如何同时处理流式数据(逐块到达)和标量数据(一次性完成)? 顺序保证:如何确保输出严格按照模板中变量出现的顺序推送? 依赖管理:如何知道何时可以开始推送?如何等待未就绪的数据? 解耦设计:如何让消息节点不需要知道具体引用了哪些节点? 本文将深入探讨一个基于编译时-运行时分离和拦截机制的流式输出设计方案。 说明:本文的设计思路参考了 Dify 的 StreamCoordinator 模块设计。Dify 是一个 AI 原生应用开发平台,其工作流引擎中的回答节点(Answer Node)采用了类似的流式输出协调机制,通过拦截上游节点的输出并按模板顺序推送,实现了高效的流式输出体验。本文在此基础上进行了架构优化和实现细节的阐述。 问题场景 假设我们有一个工作流,包含以下节点: LLM 节点:生成 AI 分析,输出是流式的(逐 token 返回) 代码执行节点:执行计算,输出是标量的(一次性返回) 消息节点:将两者组合,模板为 "AI分析:{llm_output}\n\n计算结果:{code_result}" 我们希望实现的效果是: 时间轴 │ 前端接收到的输出 ──────┼───────────────────────────── T1 │ "AI分析:" T2 │ "AI分析:AI" T3 │ "AI分析:AI thinks" T4 │ "AI分析:AI thinks..." T5 │ "AI分析:AI thinks...\n\n计算结果:" T6 │ "AI分析:AI thinks...\n\n计算结果:42" 注意几个关键点: 文本段立即推送:"AI分析:" 在第一个变量数据到达前就可以推送 流式数据实时转发:LLM 的每个 token 到达后立即转发 等待标量数据:"计算结果:" 推送后,必须等待代码节点完成才能推送 42 严格顺序:即使代码节点先完成,也必须等 LLM 流式输出完成后再推送 设计思路 参考说明:本设计参考了 Dify 的 StreamCoordinator 设计理念,特别是其"按变量在模板中的位置决定流式输出顺序"的核心思想。 ...