跳转至

Services 模块概览

Services 模块(ai_service/services)位于 API 层之下、存储层之上,既包含 RAG 知识库的摄取与检索能力,也包含运行时复用的调度、执行与审计服务。它负责把原始文件转成可检索的文本块与向量,并在查询、后台任务或会话回放时复用同一套业务能力。

当前目录采用“领域包 + 基础设施包 + 顶层 package hub”的组织方式:

  • ai_service/services/rag/:摄取、分块、嵌入、检索、调试与文档处理能力
  • ai_service/services/mcp/:MCP 客户端、策略、注册、审计与 tool runtime 能力
  • ai_service/services/skills/:技能注册、执行、策略与审计能力
  • ai_service/services/evaluation/:数据集导入、Judge、门禁、指标与运行器能力
  • ai_service/services/admin/:后台认证、授权、审计与配置能力
  • ai_service/services/infra/runtime/:共享 session runtime、scheduled tasks、timer scheduler、runtime checkpoint、agent memory
  • ai_service/services/infra/observability/:token usage、Langfuse tracing 等横切观测能力
  • ai_service/services/infra/platform/:release metadata 等平台级元数据
  • ai_service/services/infra/corrections/:knowledge correction 等跨域产品工作流
  • ai_service/services/infra/qa/:e2e visual review 等开发与 QA 辅助能力
  • ai_service/services/parsers/:文档解析子系统,作为 RAG 子域中的独立子包保留

顶层 ai_service.services 仅作为显式 package hub 使用;真实实现必须落在对应领域包或 infra/ 下的明确子边界。

模块分层与依赖关系

Services 模块按“基础能力 → 业务编排 → 检索服务”的层级组织,外部存储作为依赖边界:

flowchart TB
  subgraph 基础能力层
    Parser[Parser 子系统]
    Chunking[ChunkingService]
    Embedding[EmbeddingService]
  end

  subgraph 编排层
    Ingestion[IngestionService]
  end

  subgraph 检索层
    Retrieval[RAGRetrievalService]
  end

  subgraph 外部依赖
    MinIO[MinIO]
    Postgres[PostgreSQL]
    Qdrant[Qdrant]
  end

  Ingestion --> Parser
  Ingestion --> Chunking
  Ingestion --> Embedding
  Ingestion --> MinIO
  Ingestion --> Postgres
  Ingestion --> Qdrant

  Retrieval --> Embedding
  Retrieval --> Postgres
  Retrieval --> Qdrant

这张图强调“层级关系”:解析、分块、嵌入是基础能力;摄取是编排层;检索是对外提供知识上下文的服务层;存储是外部依赖。

服务清单

  • Parser 子系统:按内容类型选择解析器,把文档字节转成纯文本与元数据。
  • ChunkingService:将长文本按可配置的大小与重叠策略切分为文本块。
  • EmbeddingService:加载并缓存嵌入模型,生成文本向量。
  • IngestionService:编排摄取流程,串联解析、分块、嵌入与存储。
  • RAGRetrievalService:基于查询向量检索相关文本块,并格式化上下文。
  • ScheduledTaskService:验证 at/every/cron 定义,持久化 task / run,并向 orchestrator scheduler stage 提供 create/list/cancel 能力。
  • Session Runtime:抽取 WebSocket chat 与后台调度共用的 execute_session_turn(...) 路径,统一 turn 持久化与观测。
  • TimerSchedulerService:以 supervisor / dispatcher / lease / heartbeat 模式执行 due task,并把结果回写到任务账本。

快速上手

文档摄取

from ai_service.services.rag.ingestion import get_ingestion_service

# 获取摄取服务实例
ingestion_service = get_ingestion_service()

# 处理已上传到 MinIO 的文档
chunks_created, chunks_indexed = ingestion_service.process_document(
    document_id="doc-123",
    agent_id="agent-456"
)

print(f"处理完成: 创建 {chunks_created} 个分块,索引 {chunks_indexed} 个向量")

知识检索

from ai_service.services.rag.retrieval import get_rag_service

# 获取检索服务实例
rag_service = get_rag_service()

# 检索相关上下文
context = rag_service.retrieve_and_format(
    query="退款政策是什么?",
    agent_id="agent-456",
    top_k=5,
    score_threshold=0.3
)

print(f"检索上下文:\n{context}")

何时使用各服务

ChunkingService

  • 需要把长文档切成可嵌入的文本块
  • 希望控制分块大小与重叠策略
  • 需要保留文档元数据

EmbeddingService

  • 需要生成文本向量或批量向量
  • 需要离线模型加载与健康检查
  • 需要复用模型以减少加载开销

IngestionService

  • 需要端到端摄取流程
  • 需要处理解析、分块、嵌入与写入存储
  • 需要任务级进度与超时管理

RAGRetrievalService

  • 需要按 Agent 挂载的知识源检索
  • 需要格式化上下文供 LLM 使用
  • 需要控制 top_k 与相似度阈值

ScheduledTaskService / TimerSchedulerService

  • 需要把 conversation 内的定时请求持久化成 durable task
  • 需要以后台 worker 方式 claim due task 并重放到共享 session runtime
  • 需要保留 run history、worker heartbeat 与 takeover conflict 处理结果

Session Runtime

  • 需要让 WebSocket chat 与后台定时任务共用同一 turn 执行路径
  • 需要统一消息持久化、conversation turn 记录与 observability payload
  • 需要避免在 route 层复制第二套 orchestrator 调用逻辑

服务设计约定

  1. 单例入口:每个服务提供 get_*_service() 工厂函数,减少重复初始化。
  2. 依赖注入:构造函数支持传入自定义依赖,便于测试与替换。
  3. 配置中心:统一从 config 读取参数,但允许构造时覆盖。
  4. 错误可观测:所有服务使用结构化日志记录失败原因。
  5. 类型安全:关键数据结构使用类型注解与 Pydantic 模型。

新增服务放置规则

新增服务文件必须优先进入已有领域包或 infra/ 的明确子边界,而不是继续向 ai_service/services 顶层追加平铺模块:

  • MCP 相关能力放入 ai_service/services/mcp/
  • RAG 相关能力放入 ai_service/services/rag/
  • Skills 相关能力放入 ai_service/services/skills/
  • Evaluation 相关能力放入 ai_service/services/evaluation/
  • Admin 相关能力放入 ai_service/services/admin/
  • 运行时基础设施放入 ai_service/services/infra/runtime/
  • 观测能力放入 ai_service/services/infra/observability/
  • 平台级元数据放入 ai_service/services/infra/platform/
  • 跨域纠错工作流放入 ai_service/services/infra/corrections/
  • QA 与开发辅助能力放入 ai_service/services/infra/qa/
  • 不再新增顶层 ai_service/services/*.py 平铺模块

如果需要统一对外暴露入口,应优先通过 ai_service/services/__init__.py 做显式 hub re-export,而不是新增薄 wrapper 文件。

下一步