pipeline.py
Overview
The pipeline.py file defines the Pipeline class, which extends a directed graph representation (Graph) to model and execute complex workflows composed of interconnected components. The class facilitates running a sequence of tasks defined by a domain-specific language (DSL), tracking execution progress, logging runtime events, and updating task status in a persistent store.
This file is a core part of the InfiniFlow system’s orchestration layer, designed to manage multi-step asynchronous workflows with progress callbacks and error handling. It integrates with external services such as a document database via DocumentService and Redis for logging, enabling real-time monitoring and state persistence.
Classes
Pipeline(Graph)
Pipeline is a subclass of Graph that represents and executes a workflow pipeline. It manages execution state, logs progress, and updates metadata about the pipeline run.
Initialization
def __init__(self, dsl: str, tenant_id=None, doc_id=None, task_id=None, flow_id=None)
Parameters:
dsl (str): A domain-specific language string defining the graph structure of the pipeline.
tenant_id (optional): Identifier for the tenant in a multi-tenant environment.
doc_id (optional): Identifier for a document associated with this pipeline run.
task_id (optional): Identifier for the current task.
flow_id (optional): Identifier for the overall workflow or flow.
Behavior:
Calls the superclass Graph initializer with dsl, tenant_id, and task_id.
Stores doc_id and flow_id internally.
If doc_id is provided, fetches the associated knowledge base ID (_kb_id) using DocumentService.get_knowledgebase_id().
Asserts that a knowledge base ID exists for the given document.
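The initialization steps above can be sketched as follows. This is a minimal, self-contained sketch, not the actual source. Graph and FakeDocumentService are hypothetical stand-ins so the example runs without agent.canvas or a database. Leaving _kb_id as None when no document is given is also an assumption.

```python
from typing import Optional


class Graph:
    """Hypothetical stand-in for agent.canvas.Graph; it only stores what Pipeline passes up."""

    def __init__(self, dsl: str, tenant_id: Optional[str] = None, task_id: Optional[str] = None):
        self.dsl = dsl
        self.tenant_id = tenant_id
        self.task_id = task_id


class FakeDocumentService:
    """Hypothetical stand-in for DocumentService: maps document IDs to knowledge base IDs."""

    _kb_by_doc = {"doc-1": "kb-42"}

    @classmethod
    def get_knowledgebase_id(cls, doc_id: str) -> Optional[str]:
        return cls._kb_by_doc.get(doc_id)


class Pipeline(Graph):
    def __init__(self, dsl: str, tenant_id=None, doc_id=None, task_id=None, flow_id=None):
        # Forward the graph definition and identifiers to the base class.
        super().__init__(dsl, tenant_id, task_id)
        self._doc_id = doc_id
        self._flow_id = flow_id
        self._kb_id = None  # assumption of this sketch: stays None without a document
        if doc_id:
            # Resolve the knowledge base that owns the document and fail fast if there is none.
            self._kb_id = FakeDocumentService.get_knowledgebase_id(doc_id)
            assert self._kb_id, f"no knowledge base found for document {doc_id}"
```

Failing fast in the constructor means a pipeline bound to an orphaned document never starts running.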
Methods
callback
def callback(self, component_name: str, progress: float | int | None = None, message: str = "") -> None
Purpose:
Logs progress and status messages for a particular component during pipeline execution. Stores log entries in Redis with a time-to-live of 10 minutes.
Parameters:
component_name (str): The name of the component sending the callback.
progress (float | int | None): Progress value, such as a completion fraction or a step indicator.
message (str): A descriptive message about the current state or progress.
Behavior:
Constructs a Redis key combining flow_id and task_id.
Tries to fetch existing logs from Redis.
Appends the new progress and message to the trace list of the component, or creates a new entry if none exists.
Handles exceptions by logging errors.
Example usage:
pipeline.callback("DataLoader", progress=0.5, message="Loading 50% complete")
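The callback's append-or-create logic can be sketched under a few assumptions. The logs are taken to be a JSON-encoded list of {"component_name", "trace"} entries stored under one key per flow and task. These field names follow the fetch_logs example. TTLStore is a hypothetical in-memory stand-in for REDIS_CONN, and the key format produced by log_key is also hypothetical.

```python
import json
import logging
import time
from typing import Dict, Optional, Tuple

LOG_TTL_SECONDS = 10 * 60  # the 10-minute time-to-live described above


class TTLStore:
    """Hypothetical in-memory stand-in for REDIS_CONN: string values with per-key expiry."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, float]] = {}

    def get(self, key: str) -> Optional[str]:
        item = self._data.get(key)
        if item is None or item[1] < time.monotonic():
            return None  # missing or expired
        return item[0]

    def set(self, key: str, value: str, ttl: int) -> None:
        self._data[key] = (value, time.monotonic() + ttl)


def log_key(flow_id: str, task_id: str) -> str:
    # Hypothetical key format: the document only says it combines flow_id and task_id.
    return f"{flow_id}-{task_id}-logs"


def callback(store: TTLStore, flow_id: str, task_id: str, component_name: str,
             progress: Optional[float] = None, message: str = "") -> None:
    key = log_key(flow_id, task_id)
    try:
        raw = store.get(key)
        entries = json.loads(raw) if raw else []
        step = {"progress": progress, "message": message}
        # Append to the component's existing trace, or start a new entry for it.
        for entry in entries:
            if entry["component_name"] == component_name:
                entry["trace"].append(step)
                break
        else:
            entries.append({"component_name": component_name, "trace": [step]})
        # Every write refreshes the expiry on the whole log list.
        store.set(key, json.dumps(entries), LOG_TTL_SECONDS)
    except Exception:
        # Log the error instead of raising, so a logging failure never aborts the pipeline.
        logging.exception("callback failed for component %s", component_name)
```

Swallowing exceptions here is deliberate. Progress logging is best-effort observability and should not fail the run it is observing.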
fetch_logs
def fetch_logs(self)
Purpose:
Retrieves the stored log entries for the current flow and task from Redis.
Returns:
List of log entries if found, else an empty list.
Behavior:
Attempts to get the log data from Redis.
Parses JSON and returns the deserialized log object.
Logs exceptions and returns an empty list on failure.
Example usage:
logs = pipeline.fetch_logs()
for entry in logs:
    print(entry["component_name"], entry["trace"])
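The fetch_logs behavior reduces to three steps: read, parse, and fall back to an empty list. In this sketch the Redis read is passed in as a plain callable. This is a hypothetical parameter, chosen so a dict's get method can stand in for Redis. The caller supplies the key.

```python
import json
import logging
from typing import Any, Callable, List, Optional


def fetch_logs(get: Callable[[str], Optional[str]], key: str) -> List[Any]:
    """Return the deserialized log list stored under `key`, or [] if it is absent or unreadable.

    `get` is a hypothetical stand-in for the Redis read.
    """
    try:
        raw = get(key)
        if raw:
            return json.loads(raw)
    except Exception:
        # Any lookup or JSON error is logged and treated as "no logs".
        logging.exception("fetch_logs failed for %s", key)
    return []
```

Returning an empty list in every failure case keeps callers like the loop above free of None checks.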
reset
def reset(self)
Purpose:
Resets the pipeline execution state and clears logs for the current flow and task.
Behavior:
Calls the superclass reset method.
Clears the Redis logs for the current flow-task key by setting an empty list with a 10-minute expiry.
Logs any exceptions encountered during the Redis operation.
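The sketch below shows reset's two documented steps. First it delegates to the superclass reset. Then it overwrites the log key with an empty list and a fresh 10-minute expiry instead of deleting it. BaseGraph, with a path list as its only state, is hypothetical. So are the plain dict standing in for Redis (value plus TTL in seconds) and the key format.

```python
import json
import logging
from typing import Dict, Tuple

LOG_TTL_SECONDS = 10 * 60


class BaseGraph:
    """Hypothetical stand-in for Graph.reset(): clears per-run execution state."""

    def __init__(self) -> None:
        self.path = ["File", "Parser"]

    def reset(self) -> None:
        self.path = []


class ResettablePipeline(BaseGraph):
    def __init__(self, store: Dict[str, Tuple[str, int]], flow_id: str, task_id: str) -> None:
        super().__init__()
        self.store = store  # hypothetical dict standing in for Redis: key -> (value, ttl)
        self.log_key = f"{flow_id}-{task_id}-logs"  # hypothetical key format

    def reset(self) -> None:
        super().reset()
        try:
            # Overwrite rather than delete: an empty list with a fresh 10-minute expiry.
            self.store[self.log_key] = (json.dumps([]), LOG_TTL_SECONDS)
        except Exception:
            logging.exception("failed to clear logs for %s", self.log_key)
```

Writing an empty list rather than deleting the key lets a reader that polls during a reset see "no entries yet" instead of a missing key.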
run
async def run(self, **kwargs)
Purpose:
Runs the pipeline asynchronously, invoking each component in the workflow graph sequentially and passing outputs downstream.
Parameters:
**kwargs: Arbitrary keyword arguments passed to the first component's invoke method.
Behavior:
Starts a timer to measure pipeline execution duration.
If the pipeline path is empty, seeds it with a single "File" component.
If associated with a document (_doc_id), updates the document's progress status to indicate that the pipeline has started.
Iteratively invokes components in the pipeline path:
Invokes the first component with the provided kwargs.
Checks for errors; if any, aborts the pipeline run.
For each subsequent component, invokes it asynchronously, passing the previous component's output.
Updates the path with downstream components dynamically.
If an error occurs in any component, logs the error and aborts execution.
Upon completion or error, updates the document's progress status including duration and error messages.
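The run loop described above can be sketched as follows. This sketch uses asyncio from the standard library in place of trio. The Component class is hypothetical. So is the update_progress callback, which stands in for the DocumentService progress updates, and so is the progress message text. It assumes an acyclic graph, because a cycle would make the path grow forever.

```python
import asyncio
import time
from typing import Any, Callable, Dict, List


class Component:
    """Hypothetical component exposing invoke(), error(), output() and get_downstream()."""

    def __init__(self, name: str, downstream: List[str], fail: bool = False) -> None:
        self.name = name
        self._downstream = downstream
        self._fail = fail
        self._output: Dict[str, Any] = {}
        self._error = ""

    async def invoke(self, **kwargs: Any) -> None:
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        if self._fail:
            self._error = f"{self.name} failed"
        else:
            # Record which components have handled the data so far.
            self._output = {**kwargs, "visited": kwargs.get("visited", []) + [self.name]}

    def error(self) -> str:
        return self._error

    def output(self) -> Dict[str, Any]:
        return self._output

    def get_downstream(self) -> List[str]:
        return self._downstream


async def run(components: Dict[str, Component], path: List[str],
              update_progress: Callable[[str], None], **kwargs: Any) -> str:
    """Run components along `path`, growing it as downstream names are discovered.

    Returns the first error message, or "" on success.
    """
    start = time.perf_counter()
    if not path:
        path.append("File")  # seed an empty path with the entry component
    update_progress("pipeline started")

    error = ""
    inputs: Dict[str, Any] = kwargs  # the first component receives the caller's kwargs
    idx = 0
    while idx < len(path):
        cpn = components[path[idx]]
        await cpn.invoke(**inputs)
        if cpn.error():
            error = cpn.error()
            break  # abort the run on the first failing component
        inputs = cpn.output()  # pass this component's output downstream
        path.extend(cpn.get_downstream())
        idx += 1

    elapsed = time.perf_counter() - start
    outcome = f"failed: {error}" if error else "done"
    update_progress(f"pipeline {outcome} in {elapsed:.2f}s")
    return error
```

In the documented method, the outcome (duration and any error message) is recorded on the document's progress status. Returning it here only makes the sketch easy to check.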
Algorithmic Details:
Uses trio for asynchronous concurrency.
Dynamically extends the pipeline path with downstream components discovered during execution.
Maintains error state to exit early on failure.
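This sketch illustrates dynamic path expansion by treating the path as a worklist that grows while it is being iterated. When a component fans out, this yields a breadth-first order. The downstream mapping, the failing parameter, and the component names Splitter and Extractor are hypothetical. The sketch does not deduplicate a node reachable by two routes, because the document does not say how the real code handles that case.

```python
from typing import Dict, List, Optional


def expand_path(downstream: Dict[str, List[str]], start: str = "File",
                failing: Optional[str] = None) -> List[str]:
    """Return the components executed when the path grows during iteration.

    `downstream` maps each component name to its successors (a hypothetical
    stand-in for get_downstream()). `failing` names a component that reports
    an error, which stops the walk early, as the pipeline's error state does.
    """
    path = [start]
    executed: List[str] = []
    idx = 0
    while idx < len(path):
        name = path[idx]
        executed.append(name)
        if name == failing:
            break  # early exit: nothing after a failing component runs
        path.extend(downstream.get(name, []))  # discovered successors join the worklist
        idx += 1
    return executed
```

Consider the graph File → Parser → {Splitter, Extractor}. Here expand_path returns ['File', 'Parser', 'Splitter', 'Extractor']. With failing="Parser", it stops at ['File', 'Parser'].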
Example usage:
await pipeline.run(input_data=data)
Important Implementation Details
Integration with Redis (REDIS_CONN):
Redis is used as a transient store for progress log messages with a TTL of 10 minutes, enabling real-time monitoring of pipeline execution.
Document Service Updates:
The pipeline updates the document metadata in a database (via DocumentService) to reflect progress and completion status, allowing external systems to track the pipeline lifecycle.
Asynchronous Execution:
The pipeline uses trio to manage concurrency, invoking components asynchronously where possible.
Dynamic Path Expansion:
The pipeline path is not fixed at construction time; downstream components are discovered and appended during execution, supporting dynamic workflow graphs.
Interactions with Other System Components
Graph (from agent.canvas):
The base class providing graph structure, component retrieval, and reset capabilities.
DocumentService (from api.db.services.document_service):
Provides access to document metadata and knowledge base IDs, and updates document progress.
REDIS_CONN (from rag.utils.redis_conn):
Redis connection instance used for caching and storing logs related to pipeline runs.
Pipeline Components:
Each node/component in the graph is expected to provide:
an invoke() method for execution.
an error() method to report errors.
an output() method returning data for downstream components.
a get_downstream() method returning the names of subsequent components.
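The component contract listed above can be written down as a typing.Protocol. The method names come from the list. The parameter and return types, and the choice to make invoke() a coroutine, are assumptions for illustration. EchoComponent is a hypothetical component that satisfies the protocol structurally without inheriting from it.

```python
import asyncio
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class PipelineComponent(Protocol):
    """Structural interface a graph node must satisfy (types are assumptions)."""

    async def invoke(self, **kwargs: Any) -> None: ...

    def error(self) -> str: ...

    def output(self) -> Dict[str, Any]: ...

    def get_downstream(self) -> List[str]: ...


class EchoComponent:
    """Hypothetical component that passes its inputs straight through."""

    def __init__(self, downstream: List[str]) -> None:
        self._downstream = downstream
        self._out: Dict[str, Any] = {}

    async def invoke(self, **kwargs: Any) -> None:
        self._out = dict(kwargs)  # copy inputs to output unchanged

    def error(self) -> str:
        return ""  # this component never fails

    def output(self) -> Dict[str, Any]:
        return self._out

    def get_downstream(self) -> List[str]:
        return self._downstream


if __name__ == "__main__":
    comp = EchoComponent(["Next"])
    print(isinstance(comp, PipelineComponent))  # True
    asyncio.run(comp.invoke(text="hello"))
    print(comp.output())  # {'text': 'hello'}
```

A runtime_checkable protocol only checks that the methods exist, not their signatures. It is a light guard, not full validation.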
Class Diagram
classDiagram
class Pipeline {
-_doc_id: str | None
-_flow_id: str | None
-_kb_id: str | None
+__init__(dsl: str, tenant_id=None, doc_id=None, task_id=None, flow_id=None)
+callback(component_name: str, progress: float | int | None = None, message: str = ""): None
+fetch_logs(): list
+reset(): None
+run(**kwargs): coroutine
}
Pipeline --|> Graph
Summary
The pipeline.py file implements an asynchronous workflow engine (Pipeline) that manages multi-step task sequences with integrated progress tracking and error handling. It orchestrates execution over a graph of components, dynamically discovering next steps. It logs runtime information to Redis and records progress status in a persistent document store. The design supports extensibility, real-time observability, and multi-tenant operation, making it a critical module in the InfiniFlow system architecture.