引言 在Agent平台的工作流系统中,我们经常需要将多个节点的输出组合成一条消息展示给用户。比如,一个 AI 分析节点生成流式文本,一个代码执行节点返回计算结果,我们希望将它们按顺序组合成:“AI分析:{流式文本}\n\n计算结果:{标量值}"。 这看似简单的需求,实际上涉及几个核心挑战: 混合数据源:如何同时处理流式数据(逐块到达)和标量数据(一次性完成)? 顺序保证:如何确保输出严格按照模板中变量出现的顺序推送? 依赖管理:如何知道何时可以开始推送?如何等待未就绪的数据? 解耦设计:如何让消息节点不需要知道具体引用了哪些节点? 本文将深入探讨一个基于编译时-运行时分离和拦截机制的流式输出设计方案。 说明:本文的设计思路参考了 Dify 的 StreamCoordinator 模块设计。Dify 是一个 AI 原生应用开发平台,其工作流引擎中的回答节点(Answer Node)采用了类似的流式输出协调机制,通过拦截上游节点的输出并按模板顺序推送,实现了高效的流式输出体验。本文在此基础上进行了架构优化和实现细节的阐述。 问题场景 假设我们有一个工作流,包含以下节点: LLM 节点:生成 AI 分析,输出是流式的(逐 token 返回) 代码执行节点:执行计算,输出是标量的(一次性返回) 消息节点:将两者组合,模板为 "AI分析:{llm_output}\n\n计算结果:{code_result}" 我们希望实现的效果是: 时间轴 │ 前端接收到的输出 ──────┼───────────────────────────── T1 │ "AI分析:" T2 │ "AI分析:AI" T3 │ "AI分析:AI thinks" T4 │ "AI分析:AI thinks..." T5 │ "AI分析:AI thinks...\n\n计算结果:" T6 │ "AI分析:AI thinks...\n\n计算结果:42" 注意几个关键点: 文本段立即推送:"AI分析:" 在第一个变量数据到达前就可以推送 流式数据实时转发:LLM 的每个 token 到达后立即转发 等待标量数据:"计算结果:" 推送后,必须等待代码节点完成才能推送 42 严格顺序:即使代码节点先完成,也必须等 LLM 流式输出完成后再推送 设计思路 参考说明:本设计参考了 Dify 的 StreamCoordinator 设计理念,特别是其"按变量在模板中的位置决定流式输出顺序"的核心思想。 ...
FastAPI + Gunicorn 下定时与启动任务的陷阱与解法
本文记录了从 Java(SpringBoot)迁移到 Python(FastAPI + Gunicorn)过程中,在定时任务与服务启动初始化环节踩的典型坑:TaskIQ scheduler/receiver 阻塞 lifespan、每个 worker 重复执行下载/上报逻辑导致超时。深入剖析了多进程模型与单进程模型的本质差异,并给出分层解决方案:容器层(entrypoint.sh)处理真正一次性任务,进程层(lifespan)管理进程内资源,任务层(独立进程)运行定时调度。
DashScope Embedding 长文本支持方案
本文记录了在 LangChain + DashScope 构建知识库时,如何通过扩展 DashScopeEmbeddings 类实现长文本(>8192 tokens)的安全嵌入支持。核心方法复用 OpenAI 的 length-safe 机制,结合 DashScope 官方 tokenizer(qwen-turbo)进行精准分块与加权合并,并附有容错降级策略与 embedding_ctx_length 实测调优建议。
LangGraph 智能体稳定性优化实录:从频繁超时到 99% 成功率
本文记录了一个基于 FastAPI + Gunicorn + LangGraph 的复杂智能体应用,如何通过「多级缓存(版本号同步)+ 异步任务架构(TaskIQ)+ Redis Stream 事件推送」的组合方案,彻底解决 Worker 超时被 kill 的问题,将任务成功率从 70% 提升至 99%+,并支持断线重连、任务取消等企业级能力。
RAG系统中元数据的应用与管理
I. RAG系统中的元数据导论 检索增强生成(Retrieval-Augmented Generation, RAG)作为一种先进的人工智能技术,通过结合检索机制与生成模型,利用外部知识库来增强大型语言模型(LLM)的能力,以产生更准确、更具上下文相关性且基于最新信息的响应 1。在RAG系统的构建与优化过程中,元数据(Metadata)扮演着至关重要的角色。 A. RAG语境下的元数据定义 广义上,元数据被定义为“关于数据的数据”或描述主要数据的补充信息 1。在RAG系统的特定语境下,元数据指的是与文档或数据块(如文本、图像等)相关联的结构化或半结构化信息,这些信息存在于原始内容本身之外 14。它不仅仅是对数据的被动描述,更是一个主动组件,用于优化、精炼和控制RAG流程的各个环节 1。元数据提供了超越简单语义相似性搜索的关键上下文,并使能了诸多高级功能。 对元数据作用的深入理解揭示了其在RAG系统中的基础性地位。它并非系统构建过程中的附属品或事后添加的补充,而是决定系统能力上限的关键因素。元数据的定义方式和类型选择,直接影响着系统在数据过滤、访问控制、上下文关联性等方面的表现,使得RAG系统能够超越基础的语义检索,满足更复杂、更精细化的应用需求。仅依赖向量相似性的检索方式,在许多实际应用场景中往往是不够的 1,而元数据的引入正是弥补这一不足、提升系统性能和功能性的核心手段。因此,从系统设计的初始阶段就对元数据进行战略性规划至关重要。 B. 元数据的常见类型(系统元数据 vs. 用户定义元数据) 在RAG系统中,元数据通常可以分为两大类:系统元数据和用户定义元数据。这两类元数据在来源、性质和应用上有所不同,但共同构成了RAG系统有效运行的基础。 系统元数据 (System Metadata): 这类元数据通常在数据摄取、处理或索引过程中由系统自动生成,其性质偏向技术性,主要用于内部管理和追踪。 示例: 包括但不限于 chunk_id(数据块标识符,如文档第20个块,共181块)、total_chunks(文档分块总数)、origin_id(原始文档的唯一标识符UUID)、filename(原始文件名)、source(文档在存储中的完整路径或标识符)、source_display_name(人类可读的源路径)、origin(数据摄取方式,如’google-drive’、‘file-upload’、‘web-crawler’)、时间戳(创建/更新时间)、文本内容的哈希值等 14。 重要性: 系统元数据对于追踪数据来源、管理数据块、确保数据完整性以及系统调试至关重要。例如,origin_id 和 chunk_id 是将检索到的数据块与其原始文档关联起来的关键,这对于提供引用来源或进行文档级操作是必不可少的 14。 用户定义元数据 (User-Defined Metadata): 这类元数据是根据特定的领域知识、业务需求或应用场景,通过手动分配或自动提取方式添加的。其性质通常偏向语义或上下文描述。 示例: 来源(如特定的出版物、网站)、作者、创建/发布日期、文档类型(如“技术规范”、“研究论文”、“支持工单”、“新闻文章”)、主题/关键词/标签(如“身份验证”、“金融”、“复仇”)、访问权限/许可(如“管理员”、“人力资源”、“财务”、“公开”)、状态(如“已解决”、“未解决”)、优先级(“高”、“低”)、用户角色(“管理员”、“访客”)、客户满意度评分、监管机构、合规日期、风险级别、页码、体裁、写作年份等 1。 重要性: 用户定义元数据是实现强大过滤、排序、访问控制和上下文感知能力的关键,能够根据具体用例定制RAG系统的行为 1。其内容的丰富性、准确性和相关性直接影响系统的有效性和实用价值。 系统元数据和用户定义元数据之间存在着重要的互补关系。系统元数据提供了管理数据和追踪来源的结构化基础,确保了系统的基本运作和可维护性。而用户定义元数据则在此基础上赋予了数据丰富的语义信息和业务上下文,使得系统能够执行更高级、更智能的操作。例如,要实现“查找1600年之前创作的所有悲剧” 14 或“仅检索财务部门可见的数据” 19 这样的精确过滤或访问控制功能,必须依赖于用户定义的元数据字段(如“写作年份”、“体裁”、“部门”)。然而,当系统需要将检索结果追溯到原始文档或提供文件名时,又必须依赖系统元数据(如 origin_id 或 filename)。因此,一个功能完善且高效的RAG系统需要有效地结合并利用这两种类型的元数据,前者提供骨架,后者填充血肉,共同支撑起系统的各项功能。 II. RAG流程中的元数据创建与提取 元数据的创建和提取并非孤立的步骤,而是深度集成在RAG系统数据处理流程的各个阶段。从数据进入系统开始,直至最终被索引,元数据的处理贯穿始终。忽略早期阶段的元数据管理可能会导致技术债务累积,并限制系统后续的功能实现。 A. RAG数据流程中的集成点 元数据的生命周期与RAG的数据准备阶段紧密相连,主要涉及以下几个关键环节: 数据摄取/加载 (Data Ingestion/Loading): 这是元数据处理的起点。在加载原始数据(如文件、数据库记录、网页)时,系统可以捕获初始的元数据信息。这可能包括文件的属性(创建日期、作者)、数据源的文件夹结构(其本身可能蕴含分类信息)、数据库表的字段名或网页的URL等 3。在此阶段就考虑需要收集哪些元数据至关重要,因为后续补充可能会非常困难甚至不可能 13。 预处理/解析 (Preprocessing/Parsing): 在将原始数据转换为可用格式(如从PDF中提取文本、解析HTML结构)的过程中,可以提取或生成更多的元数据。例如,可以从文档内容中识别出标题、作者、章节标题、页码等信息 8。数据清洗步骤,如去除无关内容(页眉、页脚)、标准化特殊字符等,也可能涉及元数据的规范化处理。常用的解析库包括 unstructured、PyPDF2、pdfminer.six、BeautifulSoup 等 8。 分块 (Chunking): 将长文档切分成较小的数据块是RAG的核心步骤之一。在此阶段,必须确保元数据与每个生成的数据块正确关联。系统元数据,如 chunk_id、total_chunks 和 origin_id,通常在此时生成 14。更重要的是,与整个文档相关的元数据(如来源、日期、权限标签)必须被有效地传递或链接到其对应的每个数据块上 4。分块策略本身(如固定大小、按段落、语义分块、特定格式分块)也会影响每个块所包含的上下文元数据 24。一种常见的做法是将文档或章节标题等上下文信息作为“块头”(contextual chunk headers)预置到每个数据块的文本内容中,从而隐式地嵌入了部分元数据 31。 富化 (Enrichment): 这是一个专门用于添加或推断元数据的阶段。可以通过各种技术来丰富数据块的元数据信息,例如提取关键词、生成摘要、识别命名实体(人名、地名、组织名)、应用情感分类或添加其他领域特定的标签 10。 索引 (Indexing): 最终,处理好的数据块及其关联的元数据被存储到选定的数据库(如向量数据库、图数据库或关系数据库)中。元数据与向量嵌入一起存储,以便在检索时能够支持基于元数据的过滤和排序操作 1。 元数据在数据块(而非仅仅是文档)层面的关联是一个关键且不容忽视的环节。由于RAG系统通常检索的是数据块 2,因此过滤、排序等操作也是在数据块级别进行的 1。这意味着每个数据块所携带的元数据必须准确反映该块的内容和属性。简单地将文档的所有元数据附加到每个块上可能过于粗糙,无法实现精细控制。因此,设计有效的数据块级元数据管理策略,例如通过块头嵌入上下文 31 或为每个块生成特定的元数据(如摘要、关键词 24),是优化检索精度的重要考量。 ...
ChatGLM2-6B微调
ChatGLM2-6B微调 微调工具 本次我们使用ChatGLM-Efficient-Tuning,基于 🤗PEFT 的高效 🤖ChatGLM-6B 微调工具 目前实现了针对以下高效微调方法的支持: LoRA 仅微调低秩适应器。 P-Tuning V2 仅微调前缀编码器。 Freeze Tuning 仅微调后几层的全连接层。 全量微调 微调模型所有参数。 本次我们使用LoRA微调方法。 微调步骤 下载代码 git clone https://github.com/hiyouga/ChatGLM-Efficient-Tuning.git 安装依赖 cd ChatGLM-Efficient-Tuning # 创建conda环境 conda create -n chatglm2 python=3.9 conda activate chatglm2 # 安装pytorch,如果已安装可以忽略 conda install pytorch==2.0.0 torchvision==0.15.0 torchaudio==2.0.0 pytorch-cuda=11.7 -c pytorch -c nvidia # 安装项目依赖 pip install -r requirements.txt 下载模型 下载地址: https://huggingface.co/THUDM/chatglm2-6b 注意必须要将整个项目git clone下来 这里建议已经下载过模型的重新将项目中的几个py文件重新下载一次,不然会报错 git lfs install git clone https://huggingface.co/THUDM/chatglm2-6b 编辑数据集 数据集按照如下格式即可 [ { "question": "人们为何常常感到委屈", //输入 "instruction": "问答", //指令 如 翻译/问答 "output": "人常会不自觉地记下对自己有利的部分,这是形成委屈的重要原因。" //输出 } ] 在src/data/data_info.json中新增数据集并编辑字段映射 ...
Windows环境使用Docker本地部署和微调CPM-Bee
Windows环境使用Docker本地部署和微调CPM-Bee 这几天想尝试下OPENBMB的的CPM-Bee模型,本来打算在临时买的腾讯云GPU服务器上跑一下,但是安装依赖的时候发现一堆奇怪的问题,看了下github的issue,大部分都是bmtrain安装出现问题,且由于他使用的torch版本比较旧,不支持2.0.0+,所以只能改为尝试在本地docker隔离环境跑通再说. 拉取pytorch镜像 CPM-Bee要求torch版本为torch>=1.10,<2.0.0,所以我们选用pytorch/pytorch:1.13.1-cuda11.6-cudnn8-devel这个镜像,容器有8.69GB比较大,下载需要不少时间。 docker pull pytorch/pytorch:1.13.1-cuda11.6-cudnn8-devel 运行容器 # 这里我把下载好的模型挂载上去了 docker run --gpus all -it --name cpm_bee -d -v .\cpm-bee\:/root/models/cpm-bee/ -p 7860:7860 pytorch/pytorch:1.13.1-cuda11.6-cudnn8-devel 准备环境 安装必要的一些工具 # 更换软件源 sed -i 's/security.ubuntu.com/mirrors.ustc.edu.cn/g' /etc/apt/sources.list apt update # 安装ninja-build是因为镜像没有自带这个,但是有这个pip install会快一些 apt install -y git curl wget vim ninja-build # 设置国内pip源 pip config set global.index-url https://mirrors.cloud.tencent.com/pypi/simple 验证环境可用 验证CUDA可用 nvcc -V 输出如下说明可用 验证显卡挂载情况 nvidia-smi 跑一下代码看看 root@d77e1bb69e71:~# python Python 3.10.8 (main, Nov 4 2022, 13:48:29) [GCC 11.2.0] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import torch >>> torch.cuda.is_available() True 下载代码 这里我用了github的代理 ...
Langchian-ChatGLM安装部署
Langchian-ChatGLM安装部署 介绍 Langchain-ChatGLM项目是一种利用 langchain 思想实现的基于本地知识库的问答应用,目标期望建立一套对中文场景与开源模型支持友好、可离线运行的知识库问答解决方案. 本项目实现原理如下图所示,过程包括加载文件 -> 读取文本 -> 文本分割 -> 文本向量化 -> 问句向量化 -> 在文本向量中匹配出与问句向量最相似的top k个 -> 匹配出的文本作为上下文和问题一起添加到prompt中 -> 提交给LLM生成回答。 从文档处理角度来看,实现流程如下: 部署要求 ChatGLM-6B模型 量化等级 最低 GPU 显存(推理) 最低 GPU 显存(高效参数微调) FP16(无量化) 13 GB 14 GB INT8 8 GB 9 GB INT4 6 GB 7 GB Embedding 本项目中默认选用的 Embedding 模型 GanymedeNil/text2vec-large-chinese 约占用显存 3GB,也可修改为在 CPU 中运行。 部署步骤 环境检查 # 首先,确信你的机器安装了 Python 3.8 及以上版本 $ python --version Python 3.8.13 # 如果低于这个版本,可使用conda安装环境 $ conda create -p /your_path/env_name python=3.8 # 激活环境 $ source activate /your_path/env_name $ pip3 install --upgrade pip # 关闭环境 $ source deactivate /your_path/env_name # 删除环境 $ conda env remove -p /your_path/env_name 项目依赖 # 拉取仓库 $ git clone https://github.com/imClumsyPanda/langchain-ChatGLM.git # 进入目录 $ cd langchain-ChatGLM # 项目中 pdf 加载由先前的 detectron2 替换为使用 paddleocr,如果之前有安装过 detectron2 需要先完成卸载避免引发 tools 冲突 $ pip uninstall detectron2 # 检查paddleocr依赖,linux环境下paddleocr依赖libX11,libXext $ yum install libX11 $ yum install libXext # 安装依赖,这里要注意python版本,如果某些依赖下不下来,可以换个镜像源下载 $ pip install -r requirements.txt # 验证paddleocr是否成功,首次运行会下载约18M模型到~/.paddleocr # 这一步最新版本会报错,但好像不影响使用 $ python loader/image_loader.py 下载本地模型 作者在QA中提供了模型的百度云盘地址,方便国内下载 ...
FlinkSQL-基于jdbc的自定义Catalog
FlinkSQL-基于jdbc的自定义Catalog 本文将介绍如何在FlinkSQL中使用基于jdbc的自定义Catalog。 什么是Catalog? Catalog是一个用于管理数据库和表的元数据存储系统。在FlinkSQL中,Catalog用于描述Flink集群中的数据库和表。FlinkSQL支持多种类型的Catalog,如默认基于内存的GenericInMemoryCatalog,基于Hive的HiveCatalog、基于JDBC的MySqlCatalog和PostgresCatalog等。 GenericInMemoryCatalog:基于内存,所有数据库和表信息都存储在内存中,重启后即消失 HiveCatalog:数据存储在hive的元数据中,需要部署hive服务才能用 MySqlCatalog和PostgresCatalog: 基于jdbc,只能读取表结构,对应的jdbc数据库中有哪些表就只能用哪些表,无法创建。 自定义Catalog 在FlinkSQL中,除了使用默认的Catalog外,我们还可以通过实现自定义的Catalog来管理数据源和表。自定义Catalog可以更灵活地管理数据库和表,自定义持久化逻辑,满足不同场景下的需求。本文将介绍如何使用基于jdbc的自定义Catalog。 实现 首先,我们需要实现一个继承自JdbcCatalog的自定义Catalog。JdbcCatalog是Flink SQL中内置的一个基于JDBC的Catalog实现,我们可以通过继承该类,重写其中的方法来实现自定义的Catalog。 自定义Catalog需要继承JdbcCatalog类,并重写其中的方法来实现自定义的逻辑。在打开和关闭Catalog时,我们可以实现自定义的初始化和资源释放逻辑。在listTables和getTable方法中,我们需要实现自定义的表列表查询和表查询逻辑。除此之外,还可以重写其他方法来满足不同的需求。 设计表结构 基于mysql,如果要使用其他数据库相应调整ddl即可 数据库(Database): CREATE TABLE `metadata_database` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id', `create_person` varchar(100) DEFAULT '' COMMENT '创建人', `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_person` varchar(100) DEFAULT '' COMMENT '最后更新人', `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除', `database_name` varchar(100) DEFAULT NULL COMMENT '数据库名', `comment` varchar(200) DEFAULT NULL COMMENT '备注信息', PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY `uni_database_name` (`database_name`, `type`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMMENT ='元数据_数据库 数据表(Table): CREATE TABLE `metadata_table` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id', `create_person` varchar(100) DEFAULT '' COMMENT '创建人', `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_person` varchar(100) DEFAULT '' COMMENT '最后更新人', `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除', `database_name` varchar(100) DEFAULT NULL COMMENT '数据库名', `database_id` bigint(20) DEFAULT NULL COMMENT '数据库id', `table_name` varchar(100) NOT NULL, `comment` varchar(200) DEFAULT NULL COMMENT '备注', PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY `idx_table_name` (`table_name`, `database_name`, `type`), KEY `idx_database_id` (`database_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMMENT ='元数据_数据表'; 字段信息(Column) ...
kotlin使用spring-data-jpa+query-dsl踩坑记录
背景 使用kotlin+springboot+spring-data-jpa搭建的项目,在引入query-dsl依赖来提供通用查询服务时出现以下错误: 原来的代码 Entity @Entity @Table class User { var username: String ? =null @Id @GeneratedValue(strategy = GenerationType.IDENTITY) var id : Long ? = null } Repository interface UserRepository : JpaRepository<User, Long>, QuerydslPredicateExecutor<User> { } Rest @RestController @RequestMapping("/user") class UserRest(private val userRepository: UserRepository) { @GetMapping fun userList(pageable: Pageable): Page<User>{ return userRepository.findAll( pageable) } @PostMapping fun addUser(@RequestBody user: User): Long? { return userRepository.save(user).id } } 修改后的代码 Repository interface UserRepository : JpaRepository<User, Long>, QuerydslPredicateExecutor<User>, QuerydslBinderCustomizer<QUser> { override fun customize(bindings: QuerydslBindings, root: QUser) { //修改查询,让username字段支持模糊查询,且忽略大小写 bindings.bind(root.username) .first { path, value -> path.containsIgnoreCase(value) } } } 代码中的QUser是query-dsl生成的类,包含User的各种信息,用于组成dsl风格的查询语法 ...