Core Services

This page summarizes the core objects in ai_service.services that are reused across modules. The details are rendered directly from the source docstrings.

ChunkingService

Bases: ABC

Abstract base class for document chunking strategies.

All chunking strategies must implement the chunk_text method to split documents into smaller chunks suitable for embedding and retrieval.

strategy_name abstractmethod property

strategy_name

Return the strategy identifier.

Returns:

Name Type Description
str str

Strategy name (e.g., 'fixed_size', 'sentence').

chunk_text abstractmethod

chunk_text(
    text,
    document_id,
    document_name,
    source_id,
    extra_metadata=None,
)

Split text into chunks according to the strategy.

Parameters:

Name Type Description Default
text str

Text content to chunk.

required
document_id str

Document identifier for metadata.

required
document_name str

Document name for metadata.

required
source_id str

Knowledge source ID for metadata.

required
extra_metadata Optional[dict[str, Any]]

Additional metadata to include.

None

Returns:

Type Description
list[ChunkResult]

list[ChunkResult]: List of chunk results.
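
To illustrate the abstract interface above, here is a minimal sketch of a custom strategy. The `ChunkResult` dataclass, the `ChunkingStrategyBase` stand-in, and the `ParagraphChunking` class are all hypothetical: the real field names and base class are not shown on this page, so treat this only as an outline of the shape a subclass might take.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ChunkResult:
    """Hypothetical stand-in; the real ChunkResult fields are not listed here."""
    content: str
    chunk_index: int
    metadata: dict[str, Any] = field(default_factory=dict)


class ChunkingStrategyBase(ABC):
    """Stand-in that mirrors the documented abstract members."""

    @property
    @abstractmethod
    def strategy_name(self) -> str: ...

    @abstractmethod
    def chunk_text(
        self,
        text: str,
        document_id: str,
        document_name: str,
        source_id: str,
        extra_metadata: Optional[dict[str, Any]] = None,
    ) -> list[ChunkResult]: ...


class ParagraphChunking(ChunkingStrategyBase):
    """Illustrative strategy: one chunk per blank-line-separated paragraph."""

    @property
    def strategy_name(self) -> str:
        return "paragraph"

    def chunk_text(self, text, document_id, document_name, source_id,
                   extra_metadata=None):
        # Shared metadata attached to every chunk of this document.
        base = {
            "document_id": document_id,
            "document_name": document_name,
            "source_id": source_id,
            **(extra_metadata or {}),
        }
        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
        return [
            ChunkResult(content=p, chunk_index=i, metadata=dict(base))
            for i, p in enumerate(paragraphs)
        ]
```

Because both abstract members are implemented, `ParagraphChunking()` can be instantiated, whereas instantiating the base class directly raises `TypeError`.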

Factory function to instantiate a chunking strategy.

Parameters:

Name Type Description Default
strategy_type str

Strategy identifier (e.g., 'fixed_size').

required
params Optional[dict[str, Any]]

Strategy-specific parameters.

None

Returns:

Name Type Description
BaseChunkingStrategy BaseChunkingStrategy

Instantiated strategy.

Raises:

Type Description
ValueError

If strategy_type is not recognized.
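
A factory with this contract is often built on a small registry. The sketch below assumes that approach; `make_strategy`, `_REGISTRY`, and `FixedSizeStrategy` are illustrative names, not the identifiers used in ai_service.services.

```python
from typing import Any, Callable, Optional


class FixedSizeStrategy:
    """Hypothetical placeholder for a registered strategy class."""

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap


# Maps strategy identifiers to constructors (contents are illustrative).
_REGISTRY: dict[str, Callable[..., Any]] = {"fixed_size": FixedSizeStrategy}


def make_strategy(strategy_type: str, params: Optional[dict[str, Any]] = None):
    """Instantiate a registered strategy, or raise ValueError if unknown."""
    try:
        factory = _REGISTRY[strategy_type]
    except KeyError:
        known = ", ".join(sorted(_REGISTRY))
        raise ValueError(
            f"Unknown strategy_type {strategy_type!r}; known: {known}"
        ) from None
    # Strategy-specific parameters are passed through as keyword arguments.
    return factory(**(params or {}))
```

Listing the known identifiers in the error message makes a misspelled `strategy_type` easier to diagnose.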

Legacy service for splitting documents into chunks.

This service wraps FixedSizeChunkingStrategy for backward compatibility. New code should use the strategy classes directly.

Attributes:

Name Type Description
chunk_size int

Target size of each chunk in characters.

chunk_overlap int

Number of overlapping characters between chunks.

__init__

__init__(*, chunk_size=None, chunk_overlap=None)

Initialize the chunking service.

Parameters:

Name Type Description Default
chunk_size Optional[int]

Target chunk size. Defaults to config.

None
chunk_overlap Optional[int]

Overlap between chunks. Defaults to config.

None

chunk_text

chunk_text(
    text,
    document_id=None,
    document_name=None,
    source_id=None,
    extra_metadata=None,
)

Split text into chunks with overlap.

Parameters:

Name Type Description Default
text str

Text content to chunk.

required
document_id Optional[str]

Document identifier for metadata.

None
document_name Optional[str]

Document name for metadata.

None
source_id Optional[str]

Knowledge source ID for metadata.

None
extra_metadata Optional[dict[str, Any]]

Additional metadata to include.

None

Returns:

Type Description
list[ChunkResult]

list[ChunkResult]: List of chunk results.

estimate_chunk_count

estimate_chunk_count(text_length)

Estimate the number of chunks for a given text length.

Parameters:

Name Type Description Default
text_length int

Length of the text in characters.

required

Returns:

Name Type Description
int int

Estimated number of chunks.
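
For fixed-size chunks with overlap, one common estimate follows from the stride `chunk_size - chunk_overlap`. The standalone function below is a sketch of that arithmetic, not the service's actual implementation; it takes the size and overlap as explicit arguments, whereas the documented method reads them from the instance.

```python
import math


def estimate_chunk_count(text_length: int, chunk_size: int, chunk_overlap: int) -> int:
    """Estimate how many overlapping fixed-size chunks cover text_length characters."""
    if text_length <= 0:
        return 0
    if text_length <= chunk_size:
        return 1
    step = chunk_size - chunk_overlap  # characters of new text per extra chunk
    if step <= 0:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    # The first chunk covers chunk_size characters; each later chunk adds `step`.
    return 1 + math.ceil((text_length - chunk_size) / step)
```

For example, 1000 characters with `chunk_size=500` and `chunk_overlap=100` give windows starting at 0, 400, and 800, so the estimate is 3.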

Get or create the singleton chunking service instance.

Returns:

Name Type Description
ChunkingService ChunkingService

The chunking service instance.

EmbeddingService

Service for generating text embeddings.

This service routes embedding requests to the configured provider.

Attributes:

Name Type Description
backend_type str

Embedding backend type (api or local).

provider str

Embedding provider name.

model_name str

Name of the embedding model.

embedding_dim int

Dimension of the embedding vectors.

__init__

__init__(
    *,
    backend_type=None,
    provider=None,
    model_name=None,
    embedding_dim=None,
)

Initialize the embedding service.

Parameters:

Name Type Description Default
backend_type Optional[str]

Embedding backend type. Defaults to config.

None
provider Optional[str]

Embedding provider. Defaults to config.

None
model_name Optional[str]

Model name. Defaults to config.

None
embedding_dim Optional[int]

Expected embedding dimension. Defaults to config.

None

embed_text

embed_text(text)

Generate an embedding for a single text.

Parameters:

Name Type Description Default
text str

Text to embed.

required

Returns:

Type Description
list[float]

list[float]: Embedding vector.

Raises:

Type Description
ValueError

If the text is empty.

Exception

If embedding generation fails.

embed_batch

embed_batch(texts)

Generate embeddings for a batch of texts.

Parameters:

Name Type Description Default
texts list[str]

List of texts to embed.

required

Returns:

Type Description
list[list[float]]

list[list[float]]: List of embedding vectors.

Raises:

Type Description
ValueError

If any text is empty.

Exception

If embedding generation fails.
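
The validate-then-embed contract of `embed_batch` can be sketched without a real provider. In the example below, `fake_embed` is a deterministic toy function standing in for the provider call, and treating whitespace-only strings as empty is an assumption rather than documented behavior.

```python
import hashlib


def fake_embed(text: str, dim: int = 8) -> list[float]:
    """Deterministic toy vector; stands in for a real provider call (dim <= 32)."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [digest[i] / 255.0 for i in range(dim)]


def embed_batch(texts: list[str], dim: int = 8) -> list[list[float]]:
    """Validate every input first, then embed, so a bad item fails fast."""
    for i, text in enumerate(texts):
        # Assumption: whitespace-only input counts as empty.
        if not text or not text.strip():
            raise ValueError(f"texts[{i}] is empty")
    return [fake_embed(text, dim) for text in texts]
```

Validating the whole batch before calling the provider avoids paying for a partial batch that would be discarded anyway.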

get_model_info

get_model_info()

Get information about the embedding model.

Returns:

Name Type Description
dict dict

Model information including name and dimension.

check_readiness

check_readiness()

Check whether the embedding model is ready, loading it if necessary.

Loads the model (downloading weights if needed), then runs a test embedding on probe text to verify the model is functional.

Returns:

Name Type Description
EmbeddingReadinessResult EmbeddingReadinessResult

Structured result with timing and status info.
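
A readiness probe of this kind usually times a test embedding and reports failure as data instead of raising. The sketch below assumes that design; `ReadinessResult` and its fields are hypothetical, since the real `EmbeddingReadinessResult` fields are not listed on this page.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ReadinessResult:
    """Hypothetical result shape with status and timing info."""
    ready: bool
    elapsed_seconds: float
    error: Optional[str] = None


def check_readiness(
    embed: Callable[[str], list[float]],
    expected_dim: int,
    probe_text: str = "readiness probe",
) -> ReadinessResult:
    """Run one probe embedding and verify the vector dimension."""
    start = time.perf_counter()
    try:
        vector = embed(probe_text)  # a real service may load the model here
        if len(vector) != expected_dim:
            raise RuntimeError(f"expected dim {expected_dim}, got {len(vector)}")
    except Exception as exc:
        return ReadinessResult(False, time.perf_counter() - start, str(exc))
    return ReadinessResult(True, time.perf_counter() - start)
```

Returning a structured result lets a health endpoint report both the failure reason and how long the load took.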

Get or create the singleton embedding service instance.

Returns:

Name Type Description
EmbeddingService EmbeddingService

The embedding service instance.

IngestionService

Service for orchestrating document ingestion.

This service coordinates the complete ingestion pipeline:

1. Download raw document from MinIO
2. Parse document to extract text
3. Chunk text into smaller pieces
4. Generate embeddings for each chunk
5. Store chunks in PostgreSQL
6. Store vectors in Qdrant
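
The ingestion pipeline steps can be sketched as a plain function that threads data through injected stages. Every callable parameter here is a hypothetical stand-in for the MinIO, parsing, chunking, embedding, PostgreSQL, and Qdrant collaborators; the real service wires these up internally.

```python
from typing import Callable


def process_document(
    document_id: str,
    download: Callable[[str], bytes],
    parse: Callable[[bytes], str],
    chunk: Callable[[str], list[str]],
    embed: Callable[[list[str]], list[list[float]]],
    store_chunks: Callable[[str, list[str]], int],
    store_vectors: Callable[[str, list[list[float]]], int],
) -> tuple[int, int]:
    """Run the six stages in order; return (chunks created, vectors indexed)."""
    raw = download(document_id)                    # 1. fetch raw bytes
    text = parse(raw)                              # 2. extract text
    pieces = chunk(text)                           # 3. split into chunks
    vectors = embed(pieces)                        # 4. one vector per chunk
    created = store_chunks(document_id, pieces)    # 5. persist chunk rows
    indexed = store_vectors(document_id, vectors)  # 6. index vectors
    return created, indexed


# Usage with trivial in-memory stages.
result = process_document(
    "doc-1",
    download=lambda _id: b"alpha beta",
    parse=lambda raw: raw.decode("utf-8"),
    chunk=lambda text: text.split(),
    embed=lambda pieces: [[float(len(p))] for p in pieces],
    store_chunks=lambda _id, pieces: len(pieces),
    store_vectors=lambda _id, vecs: len(vecs),
)
```

Injecting the stages keeps the orchestration testable without any storage backend running.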

Attributes:

Name Type Description
minio_service MinIOStorageService

MinIO storage service.

qdrant_service QdrantVectorService

Qdrant vector service.

chunking_service ChunkingService

Document chunking service.

embedding_service EmbeddingService

Text embedding service.

__init__

__init__(
    *,
    minio_service=None,
    qdrant_service=None,
    chunking_service=None,
    embedding_service=None,
)

Initialize the ingestion service.

Parameters:

Name Type Description Default
minio_service Optional[MinIOStorageService]

MinIO service instance.

None
qdrant_service Optional[QdrantVectorService]

Qdrant service instance.

None
chunking_service Optional[ChunkingService]

Chunking service instance.

None
embedding_service Optional[EmbeddingService]

Embedding service instance.

None

process_document

process_document(
    *,
    document_id,
    agent_id=None,
    job_id=None,
    chunking_strategy_name=None,
    chunking_params_json=None,
)

Process a single document through the ingestion pipeline.

Parameters:

Name Type Description Default
document_id str

Document identifier.

required
agent_id Optional[str]

Optional agent ID for metadata.

None
job_id Optional[str]

Optional job ID for stage-level progress updates.

None
chunking_strategy_name Optional[str]

Chunking strategy to use.

None
chunking_params_json Optional[str]

JSON string of chunking parameters.

None

Returns:

Type Description
tuple[int, int]

tuple[int, int]: Number of chunks created and indexed.

Raises:

Type Description
ValueError

If the document is not found or cannot be processed.

run_ingestion_job

run_ingestion_job(*, job_id, document_id_list=None)

Run an ingestion job against uploaded documents in one source.

Parameters:

Name Type Description Default
job_id str

Ingestion job identifier.

required
document_id_list list[str] | None

Optional document-id allowlist for targeted ingestion.

None

Raises:

Type Description
ValueError

If the job is not found.

Get or create the singleton ingestion service instance.

Returns:

Name Type Description
IngestionService IngestionService

The ingestion service instance.

Find and mark stale RUNNING ingestion jobs as FAILED on startup.

A job is considered stale if it has been in RUNNING status for longer than INGESTION_JOB_TIMEOUT_SECONDS.

Returns:

Name Type Description
int int

Number of stale jobs cleaned up.
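
The staleness rule above reduces to a cutoff comparison. The sketch below applies it to an in-memory list; the `Job` dataclass, the `started_at` field, and the 3600-second value are assumptions for illustration, since the real function works against the database and reads the timeout from configuration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

INGESTION_JOB_TIMEOUT_SECONDS = 3600  # illustrative value only


@dataclass
class Job:
    """Hypothetical in-memory job record."""
    job_id: str
    status: str
    started_at: datetime


def cleanup_stale_jobs(
    jobs: list[Job],
    now: datetime,
    timeout_seconds: int = INGESTION_JOB_TIMEOUT_SECONDS,
) -> int:
    """Mark RUNNING jobs older than the timeout as FAILED; return how many."""
    cutoff = now - timedelta(seconds=timeout_seconds)
    cleaned = 0
    for job in jobs:
        if job.status == "RUNNING" and job.started_at < cutoff:
            job.status = "FAILED"
            cleaned += 1
    return cleaned


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
jobs = [
    Job("a", "RUNNING", now - timedelta(hours=2)),     # stale
    Job("b", "RUNNING", now - timedelta(minutes=5)),   # still within timeout
    Job("c", "COMPLETED", now - timedelta(hours=5)),   # not RUNNING, untouched
]
cleaned = cleanup_stale_jobs(jobs, now)
```

Running this at startup matters because a job left in RUNNING by a crashed process would otherwise never reach a terminal state.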

RAGRetrievalService

A retrieved document chunk with relevance score.

Attributes:

Name Type Description
chunk_id str

Unique chunk identifier.

source_id str

Knowledge source identifier.

document_name str

Original document filename.

chunk_index int

Position within the document.

content str

Text content of the chunk.

score float

Similarity score (0-1, higher is more relevant).

metadata dict[str, Any]

Additional metadata.

Service for retrieving relevant context for RAG.

This service coordinates the retrieval of relevant document chunks from Qdrant based on query similarity and agent knowledge mounts.

Attributes:

Name Type Description
qdrant_service QdrantVectorService

Qdrant vector service.

embedding_service EmbeddingService

Text embedding service.

__init__

__init__(
    *,
    qdrant_service=None,
    embedding_service=None,
    rerank_service=None,
    multi_query_service=None,
)

Initialize the RAG retrieval service.

Parameters:

Name Type Description Default
qdrant_service Optional[QdrantVectorService]

Qdrant service instance.

None
embedding_service Optional[EmbeddingService]

Embedding service instance.

None
rerank_service Optional[RerankService]

Rerank service instance.

None
multi_query_service Optional[MultiQueryExpansionService]

Multi-query expansion service instance.

None

retrieve_context

retrieve_context(
    query,
    agent_id,
    *,
    top_k=config.rag.top_k,
    score_threshold=config.rag.score_threshold,
)

Retrieve relevant document chunks for a query.

Parameters:

Name Type Description Default
query str

User query to find relevant context for.

required
agent_id str

Agent identifier to filter by mounted sources.

required
top_k int

Maximum number of chunks to retrieve.

config.rag.top_k
score_threshold Optional[float]

Minimum similarity score threshold.

config.rag.score_threshold

Returns:

Type Description
list[RetrievedChunk]

list[RetrievedChunk]: List of relevant chunks sorted by score.
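
The interaction of `top_k` and `score_threshold` can be shown on plain `(chunk_id, score)` pairs. This is a sketch of the selection semantics only; `select_chunks` is a hypothetical helper, and in practice such filtering may well be delegated to the vector store.

```python
from typing import Optional


def select_chunks(
    scored: list[tuple[str, float]],
    top_k: int,
    score_threshold: Optional[float] = None,
) -> list[tuple[str, float]]:
    """Drop items below the threshold, sort by score descending, keep top_k."""
    kept = [
        item for item in scored
        if score_threshold is None or item[1] >= score_threshold
    ]
    kept.sort(key=lambda item: item[1], reverse=True)
    return kept[:top_k]
```

Note that the threshold can leave fewer than `top_k` results, including none at all, which callers need to handle.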

retrieve_context_for_source

retrieve_context_for_source(
    query,
    *,
    source_id,
    source_name=None,
    source_kind=KnowledgeSourceKind.GENERAL_UPLOAD.value,
    managed_for_agent_id=None,
    top_k=config.rag.top_k,
    score_threshold=config.rag.score_threshold,
)

Retrieve relevant document chunks for a single knowledge source.

retrieve_context_for_source_with_debug

retrieve_context_for_source_with_debug(
    query,
    *,
    source_id,
    source_name=None,
    source_kind=KnowledgeSourceKind.GENERAL_UPLOAD.value,
    managed_for_agent_id=None,
    top_k=config.rag.top_k,
    score_threshold=config.rag.score_threshold,
)

Retrieve source-scoped chunks plus operator-facing retrieval diagnostics.

format_context

format_context(
    chunks,
    *,
    max_context_length=None,
    include_metadata=True,
    hide_source_filename=False,
)

Format retrieved chunks into a context string for the LLM.

Parameters:

Name Type Description Default
chunks list[RetrievedChunk]

Retrieved chunks to format.

required
max_context_length Optional[int]

Maximum total context length. None means no truncation.

None
include_metadata bool

Whether to include source metadata.

True
hide_source_filename bool

Whether to replace the source filename with a placeholder while still keeping chunk index and relevance.

False

Returns:

Name Type Description
str str

Formatted context string.
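
One way the three formatting options could combine is sketched below. The header layout, the `[document]` placeholder, and plain character truncation are all assumptions; the actual output format of `format_context` is not shown on this page. `Chunk` is a reduced stand-in for the retrieved-chunk type.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Chunk:
    """Reduced stand-in carrying only the fields this sketch needs."""
    document_name: str
    chunk_index: int
    content: str
    score: float


def format_context(
    chunks: list[Chunk],
    *,
    max_context_length: Optional[int] = None,
    include_metadata: bool = True,
    hide_source_filename: bool = False,
) -> str:
    parts = []
    for c in chunks:
        if include_metadata:
            # The placeholder keeps the chunk index and relevance visible.
            name = "[document]" if hide_source_filename else c.document_name
            header = f"[Source: {name} | chunk {c.chunk_index} | relevance {c.score:.2f}]"
            parts.append(f"{header}\n{c.content}")
        else:
            parts.append(c.content)
    text = "\n\n".join(parts)
    if max_context_length is not None:
        text = text[:max_context_length]  # assumption: hard character cut
    return text
```

Hiding the filename is useful when document names themselves are sensitive but the model still benefits from ordering and relevance cues.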

retrieve_and_format

retrieve_and_format(
    query,
    agent_id,
    *,
    top_k=config.rag.top_k,
    score_threshold=config.rag.score_threshold,
    max_context_length=None,
    hide_source_filename=False,
)

Retrieve and format context in one call.

Parameters:

Name Type Description Default
query str

User query.

required
agent_id str

Agent identifier.

required
top_k int

Maximum chunks to retrieve.

config.rag.top_k
score_threshold Optional[float]

Minimum similarity score.

config.rag.score_threshold
max_context_length Optional[int]

Maximum context length. None means no truncation.

None
hide_source_filename bool

Whether to replace the source filename with a placeholder in the formatted context.

False

Returns:

Name Type Description
str str

Formatted context string, empty if no relevant chunks.

Get or create the singleton RAG retrieval service instance.

Returns:

Name Type Description
RAGRetrievalService RAGRetrievalService

The RAG retrieval service instance.

ScheduledTaskService

Bases: BaseModel

Validated input payload for creating one scheduled task.

Resolve the next run time and normalized payload for one schedule definition.

Service for scheduled-task validation and persistence operations.

create_task

create_task(*, request, current_time=None)

Validate and persist one scheduled task.

get_task

get_task(*, task_id)

Fetch one scheduled task by id.

list_tasks

list_tasks(
    *,
    agent_id=None,
    session_id=None,
    status=None,
    target_mode=None,
    schedule_kind=None,
    limit=None,
    offset=0,
)

List scheduled tasks with optional filters.

list_due_tasks

list_due_tasks(*, due_before=None, limit=None)

List due active tasks for scheduler dispatch.

list_task_runs

list_task_runs(*, task_id, limit=None)

List run history for one scheduled task.

cancel_task

cancel_task(*, task_id)

Cancel one scheduled task idempotently.
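
Idempotent cancellation means a second call leaves the task in the same state as the first. The sketch below shows that property on an in-memory dict; the `"cancelled"` status string and the `next_run_at` field are hypothetical, and the real method persists to the database.

```python
def cancel_task(tasks: dict[str, dict], task_id: str) -> dict:
    """Cancel a task; calling again on a cancelled task changes nothing."""
    task = tasks[task_id]  # raises KeyError for unknown ids in this sketch
    if task["status"] != "cancelled":
        task["status"] = "cancelled"
        task["next_run_at"] = None  # a cancelled task has no upcoming run
    return task


tasks = {"t1": {"status": "active", "next_run_at": "2024-01-01T00:00:00Z"}}
first = dict(cancel_task(tasks, task_id="t1"))
second = dict(cancel_task(tasks, task_id="t1"))
```

This lets clients retry a cancel request after a timeout without first checking whether the earlier attempt succeeded.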

update_task

update_task(*, task_id, update_request, current_time=None)

Update mutable scheduled-task fields and recompute next run time.

Return the singleton scheduled-task service.

Session Runtime

Request payload for one shared session-turn execution.

Result payload for one shared session-turn execution.

Execute one session turn using the shared orchestration and persistence path.

TimerSchedulerService

DB-backed timer scheduler runtime for scheduled tasks.

ensure_worker_started

ensure_worker_started()

Start the dispatcher thread when the scheduler is enabled.

notify_task_due

notify_task_due()

Wake the dispatcher after new scheduled work is created or recovered.
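
A dispatcher that can be woken early is commonly built on `threading.Event`. The sketch below assumes that mechanism; the `Dispatcher` class, its `passes` counter, and the polling interval are illustrative, and the real scheduler's internals are not described on this page.

```python
import threading
import time


class Dispatcher:
    """Toy dispatcher: sleeps until the poll interval elapses or it is woken."""

    def __init__(self, poll_seconds: float = 5.0):
        self._wake = threading.Event()
        self._stop = threading.Event()
        self._poll_seconds = poll_seconds
        self._thread = None
        self.passes = 0  # counts dispatch passes; stands in for real work

    def ensure_worker_started(self) -> None:
        if self._thread is None or not self._thread.is_alive():
            self._thread = threading.Thread(target=self._loop, daemon=True)
            self._thread.start()

    def notify_task_due(self) -> None:
        self._wake.set()  # cut the current wait short

    def shutdown_worker(self) -> None:
        self._stop.set()
        self._wake.set()  # wake the loop so it can observe the stop flag
        if self._thread is not None:
            self._thread.join(timeout=2.0)

    def _loop(self) -> None:
        while not self._stop.is_set():
            self._wake.wait(timeout=self._poll_seconds)
            self._wake.clear()
            if self._stop.is_set():
                break
            self.passes += 1  # placeholder for "claim and dispatch due tasks"


dispatcher = Dispatcher(poll_seconds=60.0)
dispatcher.ensure_worker_started()
dispatcher.notify_task_due()
deadline = time.monotonic() + 2.0
while dispatcher.passes == 0 and time.monotonic() < deadline:
    time.sleep(0.01)
dispatcher.shutdown_worker()
```

With a 60-second poll interval, the pass observed here can only come from the explicit wake-up, which is the point of `notify_task_due`.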

start_supervisor

start_supervisor()

Start the timer scheduler supervisor and run one recovery pass.

shutdown_worker

shutdown_worker()

Stop the scheduler supervisor and dispatcher threads.

run_scheduled_task_run

run_scheduled_task_run(*, task_id, run_id, lease_token)

Execute one claimed scheduled-task run.

Return the singleton timer scheduler service.