跳转至

IngestionService 概览 / IngestionService Overview

IngestionService 是 RAG 摄取链路的编排中心,负责把“已上传文档”推进到“已索引知识”状态。它串联下载、解析、分块、向量化、写库和任务进度更新,并在失败路径上维护文档与任务状态。

本文聚焦模块职责、入口对象、依赖关系和策略优先级;具体类、方法、参数与返回值以 核心服务 API 参考 中的自动渲染结果为准。

何时阅读本页 / When to Read This Page

适合在以下场景阅读本页:

  • 需要理解文档上传后如何进入 PostgreSQL 和 Qdrant
  • 需要梳理 MinIO -> Parser -> Chunking -> Embedding -> Storage 的主链路
  • 需要确认摄取任务的状态更新、超时边界和失败恢复
  • 需要判断分块策略在文档级、任务级和知识源级的覆盖优先级

关键文件 / Key Files

路径 作用
ai_service/services/ingestion.py IngestionService、任务清理函数和摄取主流程的源码入口
Parser 子系统 负责根据内容类型提取文本和元数据
ChunkingService 负责把解析后的文本切成可索引块
EmbeddingService 概览 负责为文本块生成向量
核心服务 API 参考 IngestionServiceget_ingestion_servicecleanup_stale_ingestion_jobs 的源码级说明

核心类与函数 / Core Classes and Functions

  • IngestionService 摄取主编排器,协调下载、解析、分块、向量化、写库和状态更新。
  • get_ingestion_service 提供应用级单例入口,适合后台任务和 API 路由复用。
  • cleanup_stale_ingestion_jobs 在进程启动后清理遗留的 RUNNING 任务,避免卡死状态长时间残留。

规则:本页不再手工复制 process_document()run_ingestion_job() 等方法签名。对象细节请以 核心服务 API 参考 为准。

数据流 / Data Flow

flowchart LR
    A[Indexed or uploaded document record] --> B[IngestionService]
    B --> C[Download from MinIO]
    C --> D[Select parser by content type]
    D --> E[Persist parse snapshot]
    E --> F[Resolve chunking strategy]
    F --> G[Generate chunk texts]
    G --> H[Generate embeddings]
    H --> I[Write chunk rows to PostgreSQL]
    I --> J[Write vectors to Qdrant]
    J --> K[Update document and job status]

运行约定 / Runtime Behavior

主处理阶段

IngestionService 的主路径可概括为:

  1. 根据 document_id 读取文档记录并更新状态
  2. 从 MinIO 下载原始文件
  3. 根据 content_type 选择 parser,并产出解析结果
  4. 持久化 parse snapshot,保证后续可重建索引
  5. 解析并应用分块策略,生成 chunk
  6. 生成每个 chunk 的 embedding
  7. 将 chunk 行写入 PostgreSQL,将向量写入 Qdrant
  8. 更新文档状态、任务进度和最终结果

分块策略优先级

分块策略不是单一固定值,而是按调用上下文覆盖:

  1. POST /knowledge-sources/{source_id}/ingest 请求体显式覆盖
  2. Agent 默认策略
  3. Knowledge Source 默认策略
  4. 系统默认策略

这意味着:

  • 需要一次性实验不同切块方式时,优先使用 ingestion 请求覆盖
  • 需要形成 Agent 级默认行为时,适合在 Agent 详情页保存默认策略
  • 需要形成知识源默认行为时,在创建知识源时写入默认配置

API 预检与作业创建时机

摄取链路现在会在创建 ingestion job 记录之前,先同步验证最终生效的分块配置:

  • 校验入口会合并 request override、Agent default、Knowledge Source default 和系统默认值
  • validate_chunking_strategy_configuration() 会同时检查参数名是否合法、参数值是否能成功构造策略、以及运行时依赖是否可用
  • 如果策略不可运行或参数无效,API 直接返回 422,并携带结构化 detail
  • 只有预检通过后,才会真正创建 ingestion job 记录并进入后台处理

当前错误详情的关键字段包括:

  • message
  • strategy_name
  • reason
  • entry_point

这避免了过去“任务先创建、文档后失败”的延迟暴露路径。

与 Admin 控制台的协作

  • GET /chunking-strategies 会返回每个策略的 is_availableunavailability_reason
  • Admin 前端用这个 capability 列表在知识源创建、知识源详情页高级 ingestion 选项、以及 Agent 默认配置中显示禁用态和原因说明
  • 如果某个已保存默认值在当前环境不可用,详情页会显示 warning,并阻止新的 ingestion 任务继续启动

状态和恢复

  • 文档会经历 UPLOADED -> PROCESSING -> INDEXED or FAILED 等状态转换
  • 单个文档失败不会自动说明整批任务必然失败,任务级逻辑会继续更新进度和汇总状态
  • cleanup_stale_ingestion_jobs() 用于在服务重启后清理被中断的运行中任务

常见集成模式 / Common Integration Patterns

处理单个文档

from ai_service.services.rag.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()
chunks_created, chunks_indexed = ingestion_service.process_document(
    document_id="doc-123",
    agent_id="agent-456",
)

运行批量任务

from ai_service.services.rag.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()
ingestion_service.run_ingestion_job(job_id="job-789")

显式依赖注入

适用于测试或实验不同分块/向量实现的场景:

from ai_service.services.rag.chunking import ChunkingService
from ai_service.services.rag.ingestion import IngestionService

custom_chunking_service = ChunkingService(chunk_size=500, chunk_overlap=100)
ingestion_service = IngestionService(chunking_service=custom_chunking_service)

不应在本页重复维护的内容 / What This Page Should Not Duplicate

以下内容应留在 API 参考或配置页,而不是继续堆在模块页中:

  • process_document()run_ingestion_job() 的完整签名和参数表
  • 内部 helper 的具体异常抛出细节
  • 由源码 docstring 已覆盖的返回值和字段说明
  • mkdocstrings 自动渲染完全重复的函数描述