base.py


Overview

The base.py file is a foundational module in the InfiniFlow agent component framework. It defines the base classes for processing components in an asynchronous pipeline architecture. Specifically, it provides ProcessParamBase, which adds process-specific configuration parameters to ComponentParamBase, and ProcessBase, an abstract base class for asynchronous process components.

Together, these classes establish a standardized interface and lifecycle for pipeline components, facilitating reliable async invocation, error handling, and integration with the broader agent and canvas infrastructure.


Classes and Methods

ProcessParamBase(ComponentParamBase)

A subclass of ComponentParamBase that adds process-specific configuration parameters.

Initialization

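def __init__(self)

Takes no arguments. As the class diagram further down shows, instances expose a timeout (int) and a persist_logs (bool) attribute, both of which can be overridden after construction.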

Usage Example

param = ProcessParamBase()
param.timeout = 300  # Override timeout to 5 minutes
param.persist_logs = False  # Disable log persistence
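
To make the override pattern above concrete, here is a minimal, self-contained sketch of how such a parameter class could be laid out. It is not the module's actual code: SketchParamBase and SketchProcessParam are hypothetical stand-ins for ComponentParamBase and ProcessParamBase, the default values are placeholders, and the check() helper is an assumption added for illustration. Only the attribute names timeout and persist_logs come from this document.

```python
class SketchParamBase:
    """Hypothetical stand-in for ComponentParamBase (assumption)."""

    def __init__(self) -> None:
        pass


class SketchProcessParam(SketchParamBase):
    """Hypothetical stand-in for ProcessParamBase (assumption)."""

    def __init__(self) -> None:
        super().__init__()
        # Placeholder defaults; the real defaults are not stated in this document.
        self.timeout = 600        # seconds
        self.persist_logs = True

    def check(self) -> None:
        # Hypothetical validation helper, not a documented method.
        if self.timeout <= 0:
            raise ValueError("timeout must be positive")


param = SketchProcessParam()
param.timeout = 300          # override timeout to 5 minutes
param.persist_logs = False   # disable log persistence
param.check()
```

Keeping the parameters as plain attributes set in __init__ is what allows callers to override them after construction, as the usage example does.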

ProcessBase(ComponentBase)

An abstract base class for asynchronous process components in the InfiniFlow pipeline.

Initialization

def __init__(self, pipeline, id, param: ProcessParamBase)

Public Methods

async invoke(self, **kwargs) -> dict[str, Any]

Invokes the process asynchronously, managing inputs, outputs, timeout, and error handling.
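
The following sketch illustrates one way a wrapper like invoke could combine timeout control, error capture, and elapsed-time reporting around _invoke. It is written under stated assumptions rather than taken from the module: SketchProcess is a hypothetical stand-in for ProcessBase, asyncio.wait_for is used here as the timeout mechanism (the real implementation may use something else), copying keyword arguments into the outputs is assumed, and the "_ERROR" output key is invented for illustration. Only "_elapsed_time" appears in this document.

```python
import asyncio
import time
from typing import Any, Dict, Optional


class SketchProcess:
    """Hypothetical stand-in for ProcessBase; not the real class."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self._outputs: Dict[str, Any] = {}

    def set_output(self, key: str, value: Any) -> None:
        self._outputs[key] = value

    def output(self, key: Optional[str] = None) -> Any:
        return self._outputs if key is None else self._outputs.get(key)

    async def _invoke(self, **kwargs: Any) -> None:
        raise NotImplementedError("subclasses must override _invoke")

    async def invoke(self, **kwargs: Any) -> Dict[str, Any]:
        started = time.perf_counter()
        # Assumption: inputs are mirrored into the outputs.
        for key, value in kwargs.items():
            self.set_output(key, value)
        try:
            # Cancel the process logic if it exceeds the configured timeout.
            await asyncio.wait_for(self._invoke(**kwargs), timeout=self.timeout)
        except Exception as exc:
            # "_ERROR" is a hypothetical key; a timeout has an empty message,
            # so fall back to the exception class name.
            self.set_output("_ERROR", str(exc) or type(exc).__name__)
        self.set_output("_elapsed_time", time.perf_counter() - started)
        return self.output()


class FastProcess(SketchProcess):
    async def _invoke(self, **kwargs: Any) -> None:
        self.set_output("result", kwargs["data"].upper())


class SlowProcess(SketchProcess):
    async def _invoke(self, **kwargs: Any) -> None:
        await asyncio.sleep(1.0)  # deliberately longer than the timeout
        self.set_output("result", "finished")


fast = asyncio.run(FastProcess(timeout=1.0).invoke(data="abc"))
slow = asyncio.run(SlowProcess(timeout=0.05).invoke(data="abc"))
```

In this sketch the caller always receives a dictionary: a failure or timeout is recorded in the outputs instead of propagating, which is one plausible reading of "error handling" in the description above.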

result = await process_component.invoke(data="input_data", flag=True)
print(result["_elapsed_time"])

async _invoke(self, **kwargs)

Abstract method meant to be implemented by subclasses to define the actual process logic.

async def _invoke(self, **kwargs):
    # Implement actual process logic here
    await some_async_task()
    self.set_output("result", computed_value)
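
The template above relies on names that are not defined in this document (some_async_task, computed_value). As a runnable illustration of the same subclassing pattern, the sketch below uses a hypothetical MiniProcessBase that exposes only set_output, output, and a bare invoke that delegates to _invoke. It is a stand-in for the real ProcessBase, not a copy of it, and the WordCount component is likewise invented for the example.

```python
import asyncio
from typing import Any, Dict, Optional


class MiniProcessBase:
    """Hypothetical, stripped-down stand-in for ProcessBase (assumption)."""

    def __init__(self) -> None:
        self._outputs: Dict[str, Any] = {}

    def set_output(self, key: str, value: Any) -> None:
        self._outputs[key] = value

    def output(self, key: Optional[str] = None) -> Any:
        return self._outputs if key is None else self._outputs.get(key)

    async def invoke(self, **kwargs: Any) -> Dict[str, Any]:
        # No timeout or error handling here; this only delegates to _invoke.
        await self._invoke(**kwargs)
        return self.output()

    async def _invoke(self, **kwargs: Any) -> None:
        raise NotImplementedError("subclasses must override _invoke")


class WordCount(MiniProcessBase):
    """Example component: counts whitespace-separated words in 'text'."""

    async def _invoke(self, **kwargs: Any) -> None:
        text = kwargs.get("text", "")
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        self.set_output("result", len(text.split()))


outputs = asyncio.run(WordCount().invoke(text="one two three"))
print(outputs["result"])  # prints 3
```

The subclass only writes its results through set_output; returning them to the caller is left to invoke, which keeps the process logic separate from the lifecycle handling.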

Important Implementation Details


Interaction with Other System Components


Visual Diagram: Class Structure

classDiagram
    class ComponentParamBase {
        <<abstract>>
    }
    class ProcessParamBase {
        +timeout: int
        +persist_logs: bool
        +__init__()
    }
    ComponentParamBase <|-- ProcessParamBase

    class ComponentBase {
        <<abstract>>
        +set_output(key, value)
        +output(key=None)
        +get_exception_default_value()
        +set_exception_default_value()
    }
    class ProcessBase {
        +__init__(pipeline, id, param: ProcessParamBase)
        +async invoke(**kwargs) dict[str, Any]
        +async _invoke(**kwargs)
        -callback
    }
    ComponentBase <|-- ProcessBase

    ProcessBase o-- ProcessParamBase : uses

Summary

The base.py file provides the building blocks for asynchronous processing components within the InfiniFlow pipeline system. By defining ProcessParamBase and ProcessBase, it establishes a consistent pattern for configuration, execution with timeout control, error handling, and callback reporting. Developers implement a specific process component by subclassing ProcessBase and overriding the _invoke method with custom async logic; the base class handles integration with the larger agent and canvas framework.