pipeline.py


Overview

pipeline.py defines the Pipeline class, which extends the directed-graph class Graph to model and execute workflows built from interconnected components. A Pipeline runs the tasks described by a domain-specific language (DSL) definition, tracks execution progress, logs runtime events, and updates task status in a persistent store.

This file belongs to the orchestration layer of the InfiniFlow system and manages multi-step asynchronous workflows with progress callbacks and error handling. It integrates with two external services, a document database (through DocumentService) and Redis (for logging), which together enable real-time monitoring and state persistence.


Classes

Pipeline(Graph)

Pipeline is a subclass of Graph that represents and executes a workflow pipeline. It manages execution state, logs progress, and updates metadata about the pipeline run.

Initialization

def __init__(self, dsl: str, tenant_id=None, doc_id=None, task_id=None, flow_id=None)

Methods

callback
def callback(self, component_name: str, progress: float | int | None = None, message: str = "") -> None

Example:
pipeline.callback("DataLoader", progress=0.5, message="Loading 50% complete")
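
The way progress updates might accumulate into per-component log entries can be sketched with an in-memory stand-in. This is a minimal sketch under stated assumptions, not the actual implementation: InMemoryProgressLog is a hypothetical class, the real Pipeline logs to Redis, and the "progress", "message", and "timestamp" keys inside each trace event are assumed for illustration. Only the callback signature and the "component_name" and "trace" entry keys come from this document.

```python
import time
from typing import Any, Union


class InMemoryProgressLog:
    """Hypothetical stand-in for the Redis-backed log; not the real Pipeline."""

    def __init__(self) -> None:
        self._entries: list[dict[str, Any]] = []

    def callback(
        self,
        component_name: str,
        progress: Union[float, int, None] = None,
        message: str = "",
    ) -> None:
        # The keys of this event dict are assumptions made for the sketch.
        event = {"progress": progress, "message": message, "timestamp": time.time()}
        # Consecutive updates from the same component share one entry.
        if self._entries and self._entries[-1]["component_name"] == component_name:
            self._entries[-1]["trace"].append(event)
        else:
            self._entries.append({"component_name": component_name, "trace": [event]})

    def fetch_logs(self) -> list[dict[str, Any]]:
        # Return a shallow copy so callers cannot reorder the stored list.
        return list(self._entries)


log = InMemoryProgressLog()
log.callback("DataLoader", progress=0.5, message="Loading 50% complete")
log.callback("DataLoader", progress=1.0, message="Loading done")
log.callback("Parser", progress=0.0, message="Starting")
for entry in log.fetch_logs():
    print(entry["component_name"], len(entry["trace"]))
```

Grouping consecutive updates under one entry keeps the log compact when a component reports progress many times in a row; whether the real class groups entries this way is not stated here.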

fetch_logs
def fetch_logs(self)

Example:
logs = pipeline.fetch_logs()
for entry in logs:
    print(entry["component_name"], entry["trace"])
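
A caller that only needs the latest progress per component could reduce the fetched entries as follows. This is a hedged sketch: latest_progress is a hypothetical helper, sample_logs is made-up data, and the "progress" key inside each trace event is an assumption, so the helper skips events that lack it.

```python
from typing import Any, Optional


def latest_progress(logs: list[dict[str, Any]]) -> dict[str, Optional[float]]:
    """Map each component name to the last progress value found in its trace."""
    summary: dict[str, Optional[float]] = {}
    for entry in logs:
        name = entry["component_name"]
        summary.setdefault(name, None)
        for event in entry.get("trace", []):
            # "progress" is an assumed key; ignore events without a value.
            if isinstance(event, dict) and event.get("progress") is not None:
                summary[name] = event["progress"]
    return summary


# Made-up entries in the shape the example above iterates over.
sample_logs = [
    {"component_name": "DataLoader", "trace": [{"progress": 0.5}, {"progress": 1.0}]},
    {"component_name": "Parser", "trace": [{"message": "queued"}]},
]
print(latest_progress(sample_logs))
```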

reset
def reset(self)

run
async def run(self, **kwargs)

Example:
await pipeline.run(input_data=data)
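
Because run is a coroutine, it must be awaited or driven by an event loop. The sketch below illustrates the general pattern of walking a component graph asynchronously, discovering next steps from each component's downstream edges, and reporting progress through a callback. MiniPipeline, its constructor arguments, and the load and total steps are all hypothetical; the real Pipeline is built from a DSL string and its traversal logic may differ.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Tuple

Step = Callable[[dict], Awaitable[dict]]


class MiniPipeline:
    """Hypothetical, simplified stand-in; not the real Pipeline or Graph."""

    def __init__(self, steps: Dict[str, Step], downstream: Dict[str, List[str]], start: str) -> None:
        self.steps = steps            # component name -> async callable
        self.downstream = downstream  # component name -> names of next components
        self.start = start
        self.events: List[Tuple[str, Optional[float], str]] = []

    def callback(self, component_name: str, progress: Optional[float] = None, message: str = "") -> None:
        self.events.append((component_name, progress, message))

    async def run(self, **kwargs) -> dict:
        frontier = [self.start]
        visited = set()
        data = dict(kwargs)
        while frontier:
            name = frontier.pop(0)
            if name in visited:
                continue
            visited.add(name)
            self.callback(name, 0.0, "started")
            try:
                data = await self.steps[name](data)
            except Exception as exc:
                # Record the failure, then let the caller handle the error.
                self.callback(name, None, f"failed: {exc}")
                raise
            self.callback(name, 1.0, "done")
            # Next steps are discovered only after this component finishes.
            frontier.extend(self.downstream.get(name, []))
        return data


async def load(data: dict) -> dict:
    return {**data, "rows": [1, 2, 3]}


async def total(data: dict) -> dict:
    return {**data, "total": sum(data["rows"])}


pipeline = MiniPipeline(
    steps={"load": load, "total": total},
    downstream={"load": ["total"]},
    start="load",
)
result = asyncio.run(pipeline.run(input_data="demo"))
print(result["total"])
```

Outside an already running event loop, asyncio.run is the usual entry point; inside async code, await pipeline.run(...) applies directly, as in the example above.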

Important Implementation Details


Interactions with Other System Components


Class Diagram

classDiagram
    class Pipeline {
        -_doc_id: str | None
        -_flow_id: str | None
        -_kb_id: str | None
        +__init__(dsl: str, tenant_id=None, doc_id=None, task_id=None, flow_id=None)
        +callback(component_name: str, progress: float | int | None = None, message: str = ""): None
        +fetch_logs(): list
        +reset(): None
        +run(**kwargs): coroutine
    }
    Pipeline --|> Graph

Summary

pipeline.py implements Pipeline, an asynchronous workflow engine that manages task sequences with integrated progress tracking and error handling. It executes a graph of components, discovers next steps dynamically, and logs runtime information to Redis and a persistent document store. The design supports extensibility, real-time observability, and multi-tenant operation (each run can carry a tenant_id), which makes the module central to the InfiniFlow orchestration layer.