chunker.py
Overview
The chunker.py file is a core component of the InfiniFlow system responsible for dividing large textual or structured documents into manageable "chunks" for downstream processing. This chunking process enables efficient handling of documents for tasks such as information retrieval, question answering, keyword extraction, and summarization within the Retrieval-Augmented Generation (RAG) framework.
This module supports multiple chunking methods tailored to different document types and use cases (e.g., general text, Q&A documents, resumes, books, papers). It integrates with external language models (LLMs) for optional automatic keyword extraction and question proposal to enrich the chunked content.
Key functionalities include:
Flexible chunking strategies depending on document type.
Support for chunking of multiple content formats including markdown, plain text, HTML, and JSON with images.
Asynchronous integration with LLM services to generate keywords and questions.
Configurable chunk size, overlap, and additional metadata tagging.
Classes and Functions
Class: ChunkerParam
Description:
Defines and validates the parameters controlling chunking behavior.
Inheritance: ProcessParamBase
Properties:
method_options (list[str]): Allowed chunking methods (e.g., "general", "q&a", "resume", etc.).
method (str): Selected chunking method; defaults to "general".
chunk_token_size (int): Target token length for each chunk; default is 512 tokens.
delimiter (str): String delimiter used for chunk splitting; default is newline "\n".
overlapped_percent (float): Overlap ratio between chunks, range [0, 1).
page_rank (int): Optional page rank metadata to assign to chunks.
auto_keywords (int): Number of top keywords to auto-extract per chunk (0 to disable).
auto_questions (int): Number of top questions to auto-generate per chunk (0 to disable).
tag_sets (list): Reserved for future tagging features.
llm_setting (dict): Configuration for LLM usage, including "llm_name" and language.
Methods:
check() -> None
Validates parameter values, ensuring they conform to expected ranges and options.
get_input_form() -> dict[str, dict]
Returns an empty dictionary, a placeholder for a UI or API input form schema.
Usage Example:
param = ChunkerParam()
param.method = "general"
param.chunk_token_size = 256
param.check() # Raises error if invalid
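The body of check() is not shown here, so the following is only a minimal sketch of the kind of validation described above. The class name ChunkerParamSketch, the use of ValueError, the exact rules, and the defaults marked "assumed" are illustrative assumptions; only the field names and the three stated defaults come from the property list.

```python
from dataclasses import dataclass, field


@dataclass
class ChunkerParamSketch:
    """Hypothetical stand-in for ChunkerParam; not the real class."""

    # Only the three options named in the text; the real list is longer.
    method_options: list[str] = field(
        default_factory=lambda: ["general", "q&a", "resume"]
    )
    method: str = "general"          # default stated in the text
    chunk_token_size: int = 512      # default stated in the text
    delimiter: str = "\n"            # default stated in the text
    overlapped_percent: float = 0.0  # assumed default
    auto_keywords: int = 0           # assumed default (0 disables)
    auto_questions: int = 0          # assumed default (0 disables)

    def check(self) -> None:
        """Raise ValueError when a value falls outside the documented range."""
        if self.method not in self.method_options:
            raise ValueError(f"unknown method: {self.method!r}")
        if self.chunk_token_size <= 0:
            raise ValueError("chunk_token_size must be positive")
        if not 0 <= self.overlapped_percent < 1:
            raise ValueError("overlapped_percent must be in [0, 1)")
        if self.auto_keywords < 0 or self.auto_questions < 0:
            raise ValueError("auto_keywords and auto_questions must be >= 0")
```

Under these assumptions, an overlap of exactly 1.0 is rejected because the documented range [0, 1) is half-open.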
Class: Chunker
Description:
Main processing class that implements document chunking workflows. Derives from ProcessBase to integrate into the RAG flow.
Properties:
component_name (str): Identifier "Chunker" used within the system.
Internal Methods (Chunking Strategies):
Each method receives a ChunkerFromUpstream object encapsulating the incoming document data and returns a list of chunk dictionaries.
_general(from_upstream: ChunkerFromUpstream) -> list[dict]
Implements general chunking logic supporting markdown, text, HTML, and structured JSON with images.
Uses token-based splitting with optional overlap via naive_merge or naive_merge_with_images.
Cleans text chunks by removing internal tags and extracting positional metadata.
_q_and_a, _resume, _manual, _table, _paper, _book, _laws, _presentation, _one
Placeholders for specialized chunking methods tailored to specific document types or scenarios. Currently unimplemented.
Core Async Method:
async def _invoke(self, **kwargs) -> None
Entry point for chunking invocation with parameters passed via kwargs.
Workflow:
Validates and parses input into ChunkerFromUpstream.
Dispatches to the appropriate chunking method based on self._param.method.
If configured, asynchronously generates:
Keywords per chunk using an LLM model (keyword_extraction).
Questions per chunk using an LLM model (question_proposal).
Assigns page_rank metadata if provided.
Sets the resulting chunks as output for downstream components.
Parameters:
**kwargs: Arbitrary keyword arguments expected to match the ChunkerFromUpstream schema.
Returns:
None (outputs chunks via self.set_output)
Usage Example:
chunker = Chunker()
chunker._param.method = "general"
# await is only valid inside an async function
await chunker._invoke(json_result=my_json_doc)
# Outputs chunk list accessible via chunker outputs
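The dispatch and page_rank steps of the _invoke workflow can be illustrated with a small synchronous sketch. It is not the module's code: the function names, the dict-based input, the toy "general" strategy, and the "page_rank" chunk key are all assumptions made for illustration, and the LLM enrichment step is left out.

```python
from typing import Callable


def general(doc: dict) -> list[dict]:
    """Toy strategy: one chunk per non-empty line of the text."""
    return [{"text": line} for line in doc["text"].splitlines() if line.strip()]


def unimplemented(doc: dict) -> list[dict]:
    """Stand-in for the placeholder strategies."""
    raise NotImplementedError("strategy not implemented")


# Hypothetical dispatch table: method name -> chunking strategy.
STRATEGIES: dict[str, Callable[[dict], list[dict]]] = {
    "general": general,
    "q&a": unimplemented,
    "resume": unimplemented,
}


def invoke(doc: dict, method: str = "general", page_rank: int = 0) -> list[dict]:
    # 1. Validate the input (the real code parses it into ChunkerFromUpstream).
    if "text" not in doc:
        raise ValueError("input must contain a 'text' field")
    # 2. Dispatch to the strategy selected by the method name.
    chunks = STRATEGIES[method](doc)
    # 3. Attach page_rank metadata only when one was provided.
    if page_rank:
        for chunk in chunks:
            chunk["page_rank"] = page_rank
    # 4. Return the chunks (the real code hands them to self.set_output).
    return chunks
```

A lookup table keeps the dispatch in one place, so adding a strategy means adding one entry rather than another branch.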
Implementation Details and Algorithms
Chunking Mechanism:
The general chunking algorithm uses the naive_merge and naive_merge_with_images utilities, which split texts into chunks approximately equal to chunk_token_size tokens with optional overlap (overlapped_percent). This approach balances chunk size with context preservation.
Content Formats Supported:
Markdown, plain text, and HTML are treated as raw strings.
JSON input supports sections with text and positional tags plus associated images, enabling richer chunk metadata.
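To make the size-and-overlap idea concrete, here is a simplified greedy merge. It is not the implementation of naive_merge: the function name merge_with_overlap is hypothetical, tokens are approximated by whitespace-separated words, and pieces are rejoined with spaces rather than the original delimiter.

```python
def merge_with_overlap(
    text: str,
    chunk_token_size: int = 512,
    delimiter: str = "\n",
    overlapped_percent: float = 0.0,
) -> list[str]:
    """Greedily pack delimiter-separated pieces into chunks.

    Tokens are approximated by whitespace-separated words (an assumption;
    a real tokenizer would count differently).
    """
    pieces = [p for p in text.split(delimiter) if p.strip()]
    overlap_tokens = int(chunk_token_size * overlapped_percent)
    chunks: list[list[str]] = []
    current: list[str] = []
    for piece in pieces:
        words = piece.split()
        # Close the current chunk once adding this piece would exceed the budget.
        if current and len(current) + len(words) > chunk_token_size:
            chunks.append(current)
            # Seed the next chunk with the tail of the one just closed.
            current = current[-overlap_tokens:] if overlap_tokens else []
        current = current + words
    if current:
        chunks.append(current)
    return [" ".join(c) for c in chunks]
```

In this sketch a chunk can exceed the budget when a single piece is longer than chunk_token_size or when overlap words are carried over, which matches the "approximately equal" wording rather than a hard limit.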
LLM Integration:
Uses LLMBundle to instantiate language model clients configured per tenant and language.
Employs caching (get_llm_cache, set_llm_cache) to avoid redundant LLM calls.
Executes keyword extraction and question proposal asynchronously with concurrency control (chat_limiter).
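The caching pattern can be sketched as a check-then-store wrapper. The signatures of get_llm_cache and set_llm_cache are not given here, so this example does not call them; cached_call, the in-memory dictionary, and the key layout are hypothetical stand-ins.

```python
import hashlib
from typing import Callable

# In-memory stand-in; the real cache backend is not described in this document.
_CACHE: dict[str, str] = {}


def _key(llm_name: str, text: str, task: str) -> str:
    """Build a stable key from the model name, the task, and the chunk text."""
    raw = "\x1f".join((llm_name, task, text))
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def cached_call(llm_name: str, text: str, task: str,
                call: Callable[[str], str]) -> str:
    """Return the cached answer if present, else call the model once and store it."""
    key = _key(llm_name, text, task)
    if key in _CACHE:
        return _CACHE[key]
    result = call(text)
    _CACHE[key] = result
    return result
```

Including the task in the key keeps keyword and question results for the same chunk from overwriting each other.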
Asynchronous Execution:
Uses the trio concurrency framework for managing asynchronous tasks.
Runs blocking LLM API calls in separate threads via trio.to_thread.run_sync for responsiveness.
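The same pattern (a limiter around a blocking call that is moved to a worker thread) can be shown with the standard library. Note the substitution: this sketch uses asyncio instead of trio so that it runs without third-party packages, with asyncio.Semaphore standing in for chat_limiter and asyncio.to_thread standing in for trio.to_thread.run_sync. The function names and the fake LLM call are hypothetical.

```python
import asyncio
import time


def blocking_llm_call(text: str) -> str:
    """Placeholder for a synchronous LLM request."""
    time.sleep(0.01)
    return f"keywords for: {text}"


async def enrich(chunks: list[dict], max_concurrent: int = 2) -> list[dict]:
    # Plays the role of chat_limiter: at most max_concurrent calls in flight.
    limiter = asyncio.Semaphore(max_concurrent)

    async def worker(chunk: dict) -> None:
        async with limiter:
            # Off-load the blocking call so the event loop stays responsive.
            chunk["keywords"] = await asyncio.to_thread(
                blocking_llm_call, chunk["text"]
            )

    await asyncio.gather(*(worker(c) for c in chunks))
    return chunks


result = asyncio.run(enrich([{"text": "a"}, {"text": "b"}, {"text": "c"}]))
```

Each worker writes only to its own chunk dictionary, so no extra locking is needed for the results.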
Interaction With Other System Components
Input Source:
Receives upstream document data modeled by ChunkerFromUpstream from previous pipeline stages.
Output:
Emits chunked document pieces with optional metadata for consumption by subsequent RAG components (e.g., indexing, retrieval).
External Dependencies:
api.db.services.llm_service.LLMBundle for language model instantiation.
deepdoc.parser.pdf_parser.RAGFlowPdfParser for text cleaning and position extraction.
rag.nlp utilities for merging text.
rag.prompts.prompts for keyword and question generation prompts.
Concurrency Control:
Uses chat_limiter to throttle LLM calls, ensuring fair resource usage.
Mermaid Class Diagram
classDiagram
class ChunkerParam {
+method_options: list[str]
+method: str
+chunk_token_size: int
+delimiter: str
+overlapped_percent: float
+page_rank: int
+auto_keywords: int
+auto_questions: int
+tag_sets: list
+llm_setting: dict
+check()
+get_input_form() dict
}
class Chunker {
+component_name: str
-_general(from_upstream)
-_q_and_a(from_upstream)
-_resume(from_upstream)
-_manual(from_upstream)
-_table(from_upstream)
-_paper(from_upstream)
-_book(from_upstream)
-_laws(from_upstream)
-_presentation(from_upstream)
-_one(from_upstream)
+async _invoke(**kwargs)
}
Chunker ..> ChunkerParam : uses
Summary
The chunker.py file is a foundational module that partitions documents into coherent chunks based on configurable methods and parameters. It supports multiple input formats and integrates advanced LLM-based enrichment for keywords and questions. Its asynchronous design and caching optimize throughput within the InfiniFlow RAG pipeline. The file serves as a key preparatory step enabling efficient downstream knowledge retrieval and generation tasks.