Core Services
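This page collects the core objects in ai_service.services that are reused across modules. Details are rendered directly from the source docstrings.
- For module responsibilities, data flow, and usage scenarios, see the Services module overview
- For module-level notes on the embedding service, see the EmbeddingService overview
- For module-level notes on the ingestion service, see the IngestionService overview
- For configuration options, see the Services configuration reference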
ChunkingService
Bases: ABC
Abstract base class for document chunking strategies.
All chunking strategies must implement the chunk_text method to split documents into smaller chunks suitable for embedding and retrieval.
strategy_name
abstractmethod
property
strategy_name
Return the strategy identifier.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | Strategy name (e.g., 'fixed_size', 'sentence'). |
chunk_text
abstractmethod
chunk_text(
text,
document_id,
document_name,
source_id,
extra_metadata=None,
)
Split text into chunks according to the strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | str | Text content to chunk. | required |
| document_id | str | Document identifier for metadata. | required |
| document_name | str | Document name for metadata. | required |
| source_id | str | Knowledge source ID for metadata. | required |
| extra_metadata | Optional[dict[str, Any]] | Additional metadata to include. | None |
Returns:
| Type | Description |
|---|---|
| list[ChunkResult] | List of chunk results. |
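As a rough illustration of the abstract interface above, the sketch below defines local stand-ins and one concrete strategy. `ChunkResult` and its fields, the base-class name `BaseChunkingStrategy` (assumed from the factory's documented return type), and `ParagraphChunkingStrategy` are all assumptions made for this example and are not taken from the ai_service source.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ChunkResult:
    # Hypothetical stand-in: the real ChunkResult fields are not shown on this page.
    content: str
    chunk_index: int
    metadata: dict[str, Any] = field(default_factory=dict)


class BaseChunkingStrategy(ABC):
    # Local stand-in that mirrors the documented abstract interface.
    @property
    @abstractmethod
    def strategy_name(self) -> str: ...

    @abstractmethod
    def chunk_text(
        self,
        text: str,
        document_id: str,
        document_name: str,
        source_id: str,
        extra_metadata: Optional[dict[str, Any]] = None,
    ) -> list[ChunkResult]: ...


class ParagraphChunkingStrategy(BaseChunkingStrategy):
    """Illustrative strategy: one chunk per non-empty paragraph."""

    @property
    def strategy_name(self) -> str:
        return "paragraph"

    def chunk_text(self, text, document_id, document_name, source_id, extra_metadata=None):
        # Every chunk carries the same document-level metadata.
        base = {
            "document_id": document_id,
            "document_name": document_name,
            "source_id": source_id,
            **(extra_metadata or {}),
        }
        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
        return [
            ChunkResult(content=p, chunk_index=i, metadata=dict(base))
            for i, p in enumerate(paragraphs)
        ]
```

Because both members are abstract, a subclass that omits either `strategy_name` or `chunk_text` cannot be instantiated.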
Factory function to instantiate a chunking strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| strategy_type | str | Strategy identifier (e.g., 'fixed_size'). | required |
| params | Optional[dict[str, Any]] | Strategy-specific parameters. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| BaseChunkingStrategy | BaseChunkingStrategy | Instantiated strategy. |
Raises:
| Type | Description |
|---|---|
| ValueError | If strategy_type is not recognized. |
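One common way to build such a factory is a name-to-class registry. The sketch below is only an assumption about the shape: the function name `create_strategy`, the registry `_REGISTRY`, and the stand-in `FixedSizeStrategy` with its default sizes are hypothetical and do not come from the source.

```python
from typing import Any, Callable, Optional


class FixedSizeStrategy:
    """Hypothetical stand-in for a registered strategy class."""

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50) -> None:
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap


# Maps strategy identifiers to constructors (illustrative registry).
_REGISTRY: dict[str, Callable[..., Any]] = {"fixed_size": FixedSizeStrategy}


def create_strategy(strategy_type: str, params: Optional[dict[str, Any]] = None) -> Any:
    """Instantiate a registered strategy, or raise ValueError for unknown names."""
    try:
        factory = _REGISTRY[strategy_type]
    except KeyError:
        known = ", ".join(sorted(_REGISTRY))
        raise ValueError(
            f"Unknown chunking strategy {strategy_type!r}; expected one of: {known}"
        ) from None
    return factory(**(params or {}))
```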
Legacy service for splitting documents into chunks.
This service wraps FixedSizeChunkingStrategy for backward compatibility. New code should use the strategy classes directly.
Attributes:
| Name | Type | Description |
|---|---|---|
| chunk_size | int | Target size of each chunk in characters. |
| chunk_overlap | int | Number of overlapping characters between chunks. |
__init__
__init__(*, chunk_size=None, chunk_overlap=None)
Initialize the chunking service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chunk_size | Optional[int] | Target chunk size. Defaults to config. | None |
| chunk_overlap | Optional[int] | Overlap between chunks. Defaults to config. | None |
chunk_text
chunk_text(
text,
document_id=None,
document_name=None,
source_id=None,
extra_metadata=None,
)
Split text into chunks with overlap.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | str | Text content to chunk. | required |
| document_id | Optional[str] | Document identifier for metadata. | None |
| document_name | Optional[str] | Document name for metadata. | None |
| source_id | Optional[str] | Knowledge source ID for metadata. | None |
| extra_metadata | Optional[dict[str, Any]] | Additional metadata to include. | None |
Returns:
| Type | Description |
|---|---|
| list[ChunkResult] | List of chunk results. |
estimate_chunk_count
estimate_chunk_count(text_length)
Estimate the number of chunks for a given text length.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text_length | int | Length of the text in characters. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| int | int | Estimated number of chunks. |
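To make the relationship between chunk size, overlap, and chunk count concrete, here is a minimal sketch of character-based fixed-size splitting together with a matching count estimate. It is not the service's actual implementation: `split_fixed` is a hypothetical helper, and the estimate here takes the sizes as explicit arguments rather than reading them from the instance.

```python
import math


def split_fixed(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into windows of chunk_size characters that overlap by chunk_overlap."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: list[str] = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
        start += step
    return chunks


def estimate_chunk_count(text_length: int, chunk_size: int, chunk_overlap: int) -> int:
    """Closed-form count that matches split_fixed for the same sizes."""
    if text_length <= 0:
        return 0
    if text_length <= chunk_size:
        return 1
    step = chunk_size - chunk_overlap
    return 1 + math.ceil((text_length - chunk_size) / step)
```

With `chunk_size=4` and `chunk_overlap=1`, the ten-character string `"abcdefghij"` splits into `"abcd"`, `"defg"`, and `"ghij"`, and the estimate for length 10 is 3.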
Get or create the singleton chunking service instance.
Returns:
| Name | Type | Description |
|---|---|---|
| ChunkingService | ChunkingService | The chunking service instance. |
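The "get or create" accessors on this page follow a lazy-singleton pattern. A minimal thread-safe sketch of that pattern is shown below; the accessor name `get_chunking_service`, the stand-in class, and the default sizes are assumptions for illustration, since the page does not show the real function name or config values.

```python
import threading
from typing import Optional


class ChunkingService:
    # Stand-in class; 500 and 50 are placeholder defaults, not the real config values.
    def __init__(self, *, chunk_size: Optional[int] = None, chunk_overlap: Optional[int] = None) -> None:
        self.chunk_size = chunk_size if chunk_size is not None else 500
        self.chunk_overlap = chunk_overlap if chunk_overlap is not None else 50


_instance: Optional[ChunkingService] = None
_lock = threading.Lock()


def get_chunking_service() -> ChunkingService:
    """Return the shared instance, creating it on first use."""
    global _instance
    if _instance is None:
        with _lock:
            # Re-check inside the lock so two threads cannot both create an instance.
            if _instance is None:
                _instance = ChunkingService()
    return _instance
```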
EmbeddingService
Service for generating text embeddings.
This service routes embedding requests to the configured provider.
Attributes:
| Name | Type | Description |
|---|---|---|
| backend_type | str | Embedding backend type. |
| provider | str | Embedding provider name. |
| model_name | str | Name of the embedding model. |
| embedding_dim | int | Dimension of the embedding vectors. |
__init__
__init__(
*,
backend_type=None,
provider=None,
model_name=None,
embedding_dim=None,
)
Initialize the embedding service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| backend_type | Optional[str] | Embedding backend type. Defaults to config. | None |
| provider | Optional[str] | Embedding provider. Defaults to config. | None |
| model_name | Optional[str] | Model name. Defaults to config. | None |
| embedding_dim | Optional[int] | Expected embedding dimension. Defaults to config. | None |
embed_text
embed_text(text)
Generate an embedding for a single text.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | str | Text to embed. | required |
Returns:
| Type | Description |
|---|---|
| list[float] | Embedding vector. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the text is empty. |
| Exception | If embedding generation fails. |
embed_batch
embed_batch(texts)
Generate embeddings for a batch of texts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| texts | list[str] | List of texts to embed. | required |
Returns:
| Type | Description |
|---|---|
| list[list[float]] | List of embedding vectors. |
Raises:
| Type | Description |
|---|---|
| ValueError | If any text is empty. |
| Exception | If embedding generation fails. |
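The input contract of `embed_text` and `embed_batch` (reject empty input, return one vector per text) can be sketched with a toy embedder. Everything below is a stand-in: the hash-based vectors are not real embeddings, `EMBEDDING_DIM` is an illustrative value, and treating whitespace-only strings as empty is an assumption rather than documented behavior.

```python
import hashlib

EMBEDDING_DIM = 8  # illustrative dimension, not the configured one


def embed_text(text: str) -> list[float]:
    """Toy deterministic embedding used only to demonstrate the contract."""
    if not text or not text.strip():
        raise ValueError("text must not be empty")
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [byte / 255.0 for byte in digest[:EMBEDDING_DIM]]


def embed_batch(texts: list[str]) -> list[list[float]]:
    """Validate the whole batch first so a bad item fails before any work is done."""
    for index, text in enumerate(texts):
        if not text or not text.strip():
            raise ValueError(f"texts[{index}] is empty")
    return [embed_text(text) for text in texts]
```

Validating up front means a caller never receives a partially embedded batch.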
get_model_info
get_model_info()
Get information about the embedding model.
Returns:
| Name | Type | Description |
|---|---|---|
| dict | dict | Model information including name and dimension. |
check_readiness
check_readiness()
Check whether the embedding model is ready, loading it if necessary.
Loads the model (downloading weights if needed), then runs a test embedding on probe text to verify the model is functional.
Returns:
| Name | Type | Description |
|---|---|---|
| EmbeddingReadinessResult | EmbeddingReadinessResult | Structured result with timing and status info. |
Get or create the singleton embedding service instance.
Returns:
| Name | Type | Description |
|---|---|---|
| EmbeddingService | EmbeddingService | The embedding service instance. |
IngestionService
Service for orchestrating document ingestion.
This service coordinates the complete ingestion pipeline:
1. Download raw document from MinIO
2. Parse document to extract text
3. Chunk text into smaller pieces
4. Generate embeddings for each chunk
5. Store chunks in PostgreSQL
6. Store vectors in Qdrant
Attributes:
| Name | Type | Description |
|---|---|---|
| minio_service | MinIOStorageService | MinIO storage service. |
| qdrant_service | QdrantVectorService | Qdrant vector service. |
| chunking_service | ChunkingService | Document chunking service. |
| embedding_service | EmbeddingService | Text embedding service. |
__init__
__init__(
*,
minio_service=None,
qdrant_service=None,
chunking_service=None,
embedding_service=None,
)
Initialize the ingestion service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| minio_service | Optional[MinIOStorageService] | MinIO service instance. | None |
| qdrant_service | Optional[QdrantVectorService] | Qdrant service instance. | None |
| chunking_service | Optional[ChunkingService] | Chunking service instance. | None |
| embedding_service | Optional[EmbeddingService] | Embedding service instance. | None |
process_document
process_document(
*,
document_id,
agent_id=None,
job_id=None,
chunking_strategy_name=None,
chunking_params_json=None,
)
Process a single document through the ingestion pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| document_id | str | Document identifier. | required |
| agent_id | Optional[str] | Optional agent ID for metadata. | None |
| job_id | Optional[str] | Optional job ID for stage-level progress updates. | None |
| chunking_strategy_name | Optional[str] | Chunking strategy to use. | None |
| chunking_params_json | Optional[str] | JSON string of chunking parameters. | None |
Returns:
| Type | Description |
|---|---|
| tuple[int, int] | Number of chunks created and indexed. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the document is not found or cannot be processed. |
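The six-stage pipeline described for this service can be sketched as a plain function that takes each stage as an injected callable. This is an illustration of the control flow only, not the service's code: the free-function form, the callable parameters, and the in-memory lambdas standing in for MinIO, PostgreSQL, and Qdrant are all assumptions.

```python
from typing import Callable


def process_document(
    document_id: str,
    *,
    download: Callable[[str], bytes],
    parse: Callable[[bytes], str],
    chunk: Callable[[str], list[str]],
    embed: Callable[[list[str]], list[list[float]]],
    store_chunks: Callable[[str, list[str]], int],
    store_vectors: Callable[[str, list[list[float]]], int],
) -> tuple[int, int]:
    """Run download -> parse -> chunk -> embed -> store and return (created, indexed)."""
    raw = download(document_id)
    text = parse(raw)
    if not text.strip():
        raise ValueError(f"Document {document_id} has no extractable text")
    chunks = chunk(text)
    vectors = embed(chunks)
    created = store_chunks(document_id, chunks)    # relational store in the real pipeline
    indexed = store_vectors(document_id, vectors)  # vector store in the real pipeline
    return created, indexed


# In-memory stand-ins for the external services (hypothetical).
result = process_document(
    "doc-1",
    download=lambda _doc_id: b"alpha beta gamma",
    parse=lambda raw: raw.decode("utf-8"),
    chunk=lambda text: text.split(),
    embed=lambda chunks: [[float(len(c))] for c in chunks],
    store_chunks=lambda _doc_id, chunks: len(chunks),
    store_vectors=lambda _doc_id, vectors: len(vectors),
)
```

Injecting the stages mirrors the optional service arguments accepted by `__init__`, which makes each stage replaceable in tests.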
run_ingestion_job
run_ingestion_job(*, job_id, document_id_list=None)
Run an ingestion job against uploaded documents in one source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Ingestion job identifier. | required |
| document_id_list | list[str] \| None | Optional document-id allowlist for targeted ingestion. | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If the job is not found. |
Get or create the singleton ingestion service instance.
Returns:
| Name | Type | Description |
|---|---|---|
| IngestionService | IngestionService | The ingestion service instance. |
Find and mark stale RUNNING ingestion jobs as FAILED on startup.
A job is considered stale if it has been in RUNNING status for longer than INGESTION_JOB_TIMEOUT_SECONDS.
Returns:
| Name | Type | Description |
|---|---|---|
| int | int | Number of stale jobs cleaned up. |
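The staleness rule above can be sketched over an in-memory list of jobs. The `Job` dataclass, the `started_at` field, the function name `cleanup_stale_jobs`, and the one-hour timeout are assumptions for illustration; the real function works against the database and reads the timeout from configuration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

INGESTION_JOB_TIMEOUT_SECONDS = 3600  # illustrative value, not the configured one


@dataclass
class Job:
    # Hypothetical in-memory job record.
    job_id: str
    status: str
    started_at: datetime


def cleanup_stale_jobs(
    jobs: list[Job],
    now: Optional[datetime] = None,
    timeout_seconds: int = INGESTION_JOB_TIMEOUT_SECONDS,
) -> int:
    """Mark RUNNING jobs older than the timeout as FAILED and return how many changed."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(seconds=timeout_seconds)
    cleaned = 0
    for job in jobs:
        if job.status == "RUNNING" and job.started_at < cutoff:
            job.status = "FAILED"
            cleaned += 1
    return cleaned
```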
RAGRetrievalService
A retrieved document chunk with relevance score.
Attributes:
| Name | Type | Description |
|---|---|---|
| chunk_id | str | Unique chunk identifier. |
| source_id | str | Knowledge source identifier. |
| document_name | str | Original document filename. |
| chunk_index | int | Position within the document. |
| content | str | Text content of the chunk. |
| score | float | Similarity score (0-1, higher is more relevant). |
| metadata | dict[str, Any] | Additional metadata. |
Service for retrieving relevant context for RAG.
This service coordinates the retrieval of relevant document chunks from Qdrant based on query similarity and agent knowledge mounts.
Attributes:
| Name | Type | Description |
|---|---|---|
| qdrant_service | QdrantVectorService | Qdrant vector service. |
| embedding_service | EmbeddingService | Text embedding service. |
__init__
__init__(
*,
qdrant_service=None,
embedding_service=None,
rerank_service=None,
multi_query_service=None,
)
Initialize the RAG retrieval service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| qdrant_service | Optional[QdrantVectorService] | Qdrant service instance. | None |
| embedding_service | Optional[EmbeddingService] | Embedding service instance. | None |
| rerank_service | Optional[RerankService] | Rerank service instance. | None |
| multi_query_service | Optional[MultiQueryExpansionService] | Multi-query expansion service instance. | None |
retrieve_context
retrieve_context(
query,
agent_id,
*,
top_k=config.rag.top_k,
score_threshold=config.rag.score_threshold,
)
Retrieve relevant document chunks for a query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | User query to find relevant context for. | required |
| agent_id | str | Agent identifier to filter by mounted sources. | required |
| top_k | int | Maximum number of chunks to retrieve. | config.rag.top_k |
| score_threshold | Optional[float] | Minimum similarity score threshold. | config.rag.score_threshold |
Returns:
| Type | Description |
|---|---|
| list[RetrievedChunk] | List of relevant chunks sorted by score. |
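The combined effect of `top_k` and `score_threshold` on a scored candidate list can be sketched as follows. This is a simplification: in the service the vector store presumably applies these limits during search, and `select_chunks` with its `(chunk_id, score)` tuples is a hypothetical helper, not part of the documented API.

```python
from typing import Optional


def select_chunks(
    scored: list[tuple[str, float]],
    *,
    top_k: int,
    score_threshold: Optional[float],
) -> list[tuple[str, float]]:
    """Drop candidates below the threshold, sort by score descending, keep top_k."""
    kept = [
        item
        for item in scored
        if score_threshold is None or item[1] >= score_threshold
    ]
    kept.sort(key=lambda item: item[1], reverse=True)
    return kept[:top_k]
```

Passing `score_threshold=None` disables the cutoff, which matches the parameter's `Optional[float]` type.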
retrieve_context_for_source
retrieve_context_for_source(
query,
*,
source_id,
source_name=None,
source_kind=KnowledgeSourceKind.GENERAL_UPLOAD.value,
managed_for_agent_id=None,
top_k=config.rag.top_k,
score_threshold=config.rag.score_threshold,
)
Retrieve relevant document chunks for a single knowledge source.
retrieve_context_for_source_with_debug
retrieve_context_for_source_with_debug(
query,
*,
source_id,
source_name=None,
source_kind=KnowledgeSourceKind.GENERAL_UPLOAD.value,
managed_for_agent_id=None,
top_k=config.rag.top_k,
score_threshold=config.rag.score_threshold,
)
Retrieve source-scoped chunks plus operator-facing retrieval diagnostics.
format_context
format_context(
chunks,
*,
max_context_length=None,
include_metadata=True,
hide_source_filename=False,
)
Format retrieved chunks into a context string for the LLM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chunks | list[RetrievedChunk] | Retrieved chunks to format. | required |
| max_context_length | Optional[int] | Maximum total context length. | None |
| include_metadata | bool | Whether to include source metadata. | True |
| hide_source_filename | bool | Whether to replace the source filename with a placeholder while still keeping chunk index and relevance. | False |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | Formatted context string. |
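A minimal sketch of how these options might interact is shown below. The header layout, the `[document]` placeholder, the blank-line separator, and the choice to stop at the first chunk that would exceed the limit are all assumptions; the stand-in `RetrievedChunk` also keeps only a subset of the documented attributes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RetrievedChunk:
    # Reduced stand-in with only the fields this sketch needs.
    document_name: str
    chunk_index: int
    content: str
    score: float


def format_context(
    chunks: list[RetrievedChunk],
    *,
    max_context_length: Optional[int] = None,
    include_metadata: bool = True,
    hide_source_filename: bool = False,
) -> str:
    separator = "\n\n"
    parts: list[str] = []
    used = 0  # characters emitted so far, separators included
    for chunk in chunks:
        header = ""
        if include_metadata:
            name = "[document]" if hide_source_filename else chunk.document_name
            header = f"[Source: {name} | chunk {chunk.chunk_index} | relevance {chunk.score:.2f}]\n"
        block = header + chunk.content
        extra = len(separator) if parts else 0
        if max_context_length is not None and used + extra + len(block) > max_context_length:
            break  # stop at the first chunk that would exceed the budget
        parts.append(block)
        used += extra + len(block)
    return separator.join(parts)
```

Since chunks arrive sorted by score, cutting off at the budget drops the least relevant material first.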
retrieve_and_format
retrieve_and_format(
query,
agent_id,
*,
top_k=config.rag.top_k,
score_threshold=config.rag.score_threshold,
max_context_length=None,
hide_source_filename=False,
)
Retrieve and format context in one call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | User query. | required |
| agent_id | str | Agent identifier. | required |
| top_k | int | Maximum chunks to retrieve. | config.rag.top_k |
| score_threshold | Optional[float] | Minimum similarity score. | config.rag.score_threshold |
| max_context_length | Optional[int] | Maximum context length. | None |
| hide_source_filename | bool | Whether to replace the source filename with a placeholder in the formatted context. | False |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | Formatted context string, empty if no relevant chunks. |
Get or create the singleton RAG retrieval service instance.
Returns:
| Name | Type | Description |
|---|---|---|
| RAGRetrievalService | RAGRetrievalService | The RAG retrieval service instance. |
ScheduledTaskService
Service for scheduled-task validation and persistence operations.
create_task
create_task(*, request, current_time=None)
Validate and persist one scheduled task.
get_task
get_task(*, task_id)
Fetch one scheduled task by id.
list_tasks
list_tasks(
*,
agent_id=None,
session_id=None,
status=None,
target_mode=None,
schedule_kind=None,
limit=None,
offset=0,
)
List scheduled tasks with optional filters.
list_due_tasks
list_due_tasks(*, due_before=None, limit=None)
List due active tasks for scheduler dispatch.
list_task_runs
list_task_runs(*, task_id, limit=None)
List run history for one scheduled task.
cancel_task
cancel_task(*, task_id)
Cancel one scheduled task idempotently.
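"Idempotently" here means that cancelling an already cancelled task is a no-op rather than an error. The sketch below illustrates that property with an in-memory dictionary; the `Task` dataclass, the status strings, and raising `KeyError` for an unknown id are assumptions, not the service's documented behavior.

```python
from dataclasses import dataclass


@dataclass
class Task:
    # Hypothetical in-memory task record.
    task_id: str
    status: str = "active"


def cancel_task(tasks: dict[str, Task], *, task_id: str) -> Task:
    """Cancel a task; repeating the call leaves the task in the same state."""
    task = tasks.get(task_id)
    if task is None:
        raise KeyError(task_id)
    if task.status != "cancelled":
        task.status = "cancelled"
    return task
```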
update_task
update_task(*, task_id, update_request, current_time=None)
Update mutable scheduled-task fields and recompute next run time.
Session Runtime
TimerSchedulerService
DB-backed timer scheduler runtime for scheduled tasks.
ensure_worker_started
ensure_worker_started()
Start the dispatcher thread when the scheduler is enabled.
notify_task_due
notify_task_due()
Wake the dispatcher after new scheduled work is created or recovered.
start_supervisor
start_supervisor()
Start the timer scheduler supervisor and run one recovery pass.
shutdown_worker
shutdown_worker()
Stop the scheduler supervisor and dispatcher threads.
run_scheduled_task_run
run_scheduled_task_run(*, task_id, run_id, lease_token)
Execute one claimed scheduled-task run.