IngestionService 概览 / IngestionService Overview
IngestionService 是 RAG 摄取链路的编排中心,负责把“已上传文档”推进到“已索引知识”状态。它串联下载、解析、分块、向量化、写库和任务进度更新,并在失败路径上维护文档与任务状态。
本文聚焦模块职责、入口对象、依赖关系和策略优先级;具体类、方法、参数与返回值以 核心服务 API 参考 中的自动渲染结果为准。
何时阅读本页 / When to Read This Page
适合在以下场景阅读本页:
- 需要理解文档上传后如何进入 PostgreSQL 和 Qdrant
- 需要梳理
MinIO -> Parser -> Chunking -> Embedding -> Storage的主链路 - 需要确认摄取任务的状态更新、超时边界和失败恢复
- 需要判断分块策略在文档级、任务级和知识源级的覆盖优先级
关键文件 / Key Files
| 路径 | 作用 |
|---|---|
ai_service/services/ingestion.py |
IngestionService、任务清理函数和摄取主流程的源码入口 |
| Parser 子系统 | 负责根据内容类型提取文本和元数据 |
| ChunkingService | 负责把解析后的文本切成可索引块 |
| EmbeddingService 概览 | 负责为文本块生成向量 |
| 核心服务 API 参考 | IngestionService、get_ingestion_service、cleanup_stale_ingestion_jobs 的源码级说明 |
核心类与函数 / Core Classes and Functions
IngestionService摄取主编排器,协调下载、解析、分块、向量化、写库和状态更新。get_ingestion_service提供应用级单例入口,适合后台任务和 API 路由复用。cleanup_stale_ingestion_jobs在进程启动后清理遗留的RUNNING任务,避免卡死状态长时间残留。
规则:本页不再手工复制
process_document()、run_ingestion_job()等方法签名。对象细节请以 核心服务 API 参考 为准。
数据流 / Data Flow
flowchart LR
A[Indexed or uploaded document record] --> B[IngestionService]
B --> C[Download from MinIO]
C --> D[Select parser by content type]
D --> E[Persist parse snapshot]
E --> F[Resolve chunking strategy]
F --> G[Generate chunk texts]
G --> H[Generate embeddings]
H --> I[Write chunk rows to PostgreSQL]
I --> J[Write vectors to Qdrant]
J --> K[Update document and job status]
运行约定 / Runtime Behavior
主处理阶段
IngestionService 的主路径可概括为:
- 根据
document_id读取文档记录并更新状态 - 从 MinIO 下载原始文件
- 根据
content_type选择 parser,并产出解析结果 - 持久化 parse snapshot,保证后续可重建索引
- 解析并应用分块策略,生成 chunk
- 生成每个 chunk 的 embedding
- 将 chunk 行写入 PostgreSQL,将向量写入 Qdrant
- 更新文档状态、任务进度和最终结果
分块策略优先级
分块策略不是单一固定值,而是按调用上下文覆盖:
POST /knowledge-sources/{source_id}/ingest请求体显式覆盖- Agent 默认策略
- Knowledge Source 默认策略
- 系统默认策略
这意味着:
- 需要一次性实验不同切块方式时,优先使用 ingestion 请求覆盖
- 需要形成 Agent 级默认行为时,适合在 Agent 详情页保存默认策略
- 需要形成知识源默认行为时,在创建知识源时写入默认配置
API 预检与作业创建时机
摄取链路现在会在创建 ingestion job 记录之前,先同步验证最终生效的分块配置:
- 校验入口会合并 request override、Agent default、Knowledge Source default 和系统默认值
validate_chunking_strategy_configuration()会同时检查参数名是否合法、参数值是否能成功构造策略、以及运行时依赖是否可用- 如果策略不可运行或参数无效,API 直接返回
422,并携带结构化detail - 只有预检通过后,才会真正创建 ingestion job 记录并进入后台处理
当前错误详情的关键字段包括:
messagestrategy_namereasonentry_point
这避免了过去“任务先创建、文档后失败”的延迟暴露路径。
与 Admin 控制台的协作
GET /chunking-strategies会返回每个策略的is_available和unavailability_reason- Admin 前端用这个 capability 列表在知识源创建、知识源详情页高级 ingestion 选项、以及 Agent 默认配置中显示禁用态和原因说明
- 如果某个已保存默认值在当前环境不可用,详情页会显示 warning,并阻止新的 ingestion 任务继续启动
状态和恢复
- 文档会经历
UPLOADED -> PROCESSING -> INDEXED or FAILED等状态转换 - 单个文档失败不会自动说明整批任务必然失败,任务级逻辑会继续更新进度和汇总状态
cleanup_stale_ingestion_jobs()用于在服务重启后清理被中断的运行中任务
常见集成模式 / Common Integration Patterns
处理单个文档
from ai_service.services.rag.ingestion import get_ingestion_service
ingestion_service = get_ingestion_service()
chunks_created, chunks_indexed = ingestion_service.process_document(
document_id="doc-123",
agent_id="agent-456",
)
运行批量任务
from ai_service.services.rag.ingestion import get_ingestion_service
ingestion_service = get_ingestion_service()
ingestion_service.run_ingestion_job(job_id="job-789")
显式依赖注入
适用于测试或实验不同分块/向量实现的场景:
from ai_service.services.rag.chunking import ChunkingService
from ai_service.services.rag.ingestion import IngestionService
custom_chunking_service = ChunkingService(chunk_size=500, chunk_overlap=100)
ingestion_service = IngestionService(chunking_service=custom_chunking_service)
不应在本页重复维护的内容 / What This Page Should Not Duplicate
以下内容应留在 API 参考或配置页,而不是继续堆在模块页中:
process_document()、run_ingestion_job()的完整签名和参数表- 内部 helper 的具体异常抛出细节
- 由源码 docstring 已覆盖的返回值和字段说明
- 与
mkdocstrings自动渲染完全重复的函数描述