base.py
Overview
The base.py file is a foundational module in the InfiniFlow agent component framework. It defines base classes that encapsulate core functionality required for processing components within an asynchronous pipeline architecture. Specifically, it provides:
ProcessParamBase: A parameter class that holds configuration options for process components, including timeout and log persistence.ProcessBase: An abstract base class for defining process components that execute asynchronous operations with timeout management, output handling, and error reporting.
Together, these classes establish a standardized interface and lifecycle for pipeline components, facilitating reliable async invocation, error handling, and integration with the broader agent and canvas infrastructure.
Classes and Methods
ProcessParamBase(ComponentParamBase)
A subclass of ComponentParamBase that adds process-specific configuration parameters.
Initialization
def __init__(self)
Purpose: Initializes process parameters with default values.
Attributes:
timeout(int): Maximum allowed execution time for the process in seconds. Default is a very large number (100000000), effectively no timeout unless overridden.persist_logs(bool): Flag indicating whether to persist logs related to this process. Defaults toTrue.
Usage Example
param = ProcessParamBase()
param.timeout = 300 # Override timeout to 5 minutes
param.persist_logs = False # Disable log persistence
ProcessBase(ComponentBase)
An abstract base class for asynchronous process components in the InfiniFlow pipeline.
Initialization
def __init__(self, pipeline, id, param: ProcessParamBase)
Parameters:
pipeline: The pipeline instance this component belongs to.id: Unique identifier for the component.param(ProcessParamBase): Configuration parameters for this process.
Behavior:
Calls the parent
ComponentBaseconstructor.Sets up a
callbackfunction referencing the canvas callback if available, allowing the component to report status updates back to the UI or controller. If not available, a no-op callback is used.
Public Methods
async invoke(self, **kwargs) -> dict[str, Any]
Invokes the process asynchronously, managing inputs, outputs, timeout, and error handling.
Parameters: Arbitrary keyword arguments representing input parameters to the process.
Returns: A dictionary of output values collected during execution.
Behavior:
Records the start time of invocation.
Sets each input key-value pair as an output (likely for tracking/input echoing).
Executes the internal
_invokemethod, enforcing the configured timeout viatrio.fail_after.On success, triggers callback with status
1and message "Done".On exception:
If a default exception value is defined, sets that as output.
Otherwise, sets an
_ERRORoutput with the exception string.Logs the exception with stack trace.
Triggers callback with status
-1and the error message.
Finally, records the elapsed time of execution.
Example Usage:
result = await process_component.invoke(data="input_data", flag=True)
print(result["_elapsed_time"])
async _invoke(self, **kwargs)
Abstract method meant to be implemented by subclasses to define the actual process logic.
Parameters: Arbitrary keyword arguments representing input parameters.
Returns: None.
Behavior:
Decorated with a timeout that defaults to the environment variable
COMPONENT_EXEC_TIMEOUTor 600 seconds (10 minutes).Raises
NotImplementedErrorby default to enforce subclass implementation.
Example Override:
async def _invoke(self, **kwargs):
# Implement actual process logic here
await some_async_task()
self.set_output("result", computed_value)
Important Implementation Details
Timeout Management:
Usestrio.fail_afterininvoketo enforce the timeout specified inProcessParamBase.timeout. Additionally,_invokeis decorated with a@timeoutdecorator that uses an environment variableCOMPONENT_EXEC_TIMEOUTas a fallback timeout mechanism. This double timeout mechanism ensures robust time management.Callback Integration:
Thecallbackfunction is dynamically assigned based on whether the_canvasobject has acallbackmethod. This enables real-time status reporting of process completion or failure back to the UI or controller layer.Output Handling:
Inputs passed toinvokeare immediately set as outputs, allowing the process output dictionary to reflect both inputs and results. Exception handling also sets error information in the output dictionary under the_ERRORkey.Asynchronous Execution:
The base class is designed for asynchronous execution withasyncmethods and integration with the Trio async library, which supports structured concurrency and cancellation.
Interaction with Other System Components
ComponentBaseandComponentParamBase:ProcessBaseandProcessParamBaseextend these base classes, inheriting common component infrastructure such as output management and parameter handling._canvasObject:
The_canvasattribute (likely part of theComponentBase) is checked for acallbackmethod. The canvas component manages overall pipeline visualization and coordination, receiving status updates via callbacks.api.utils.api_utils.timeoutDecorator:
The_invokemethod uses this decorator to impose a timeout, which presumably raises an exception if the decorated coroutine takes too long, integrating with the environment configuration.trioLibrary:
Trio is used for concurrency primitives, especiallyfail_afterto implement timeout for the invocation method.
Visual Diagram: Class Structure
classDiagram
class ComponentParamBase {
<<abstract>>
}
class ProcessParamBase {
+timeout: int
+persist_logs: bool
+__init__()
}
ComponentParamBase <|-- ProcessParamBase
class ComponentBase {
<<abstract>>
+set_output(key, value)
+output(key=None)
+get_exception_default_value()
+set_exception_default_value()
}
class ProcessBase {
+__init__(pipeline, id, param: ProcessParamBase)
+async invoke(**kwargs) dict[str, Any]
+async _invoke(**kwargs)
-callback
}
ComponentBase <|-- ProcessBase
ProcessBase o-- ProcessParamBase : uses
Summary
The base.py file provides essential building blocks for asynchronous processing components within the InfiniFlow pipeline system. By defining ProcessParamBase and ProcessBase, it establishes a pattern for consistent configuration, execution with timeout control, error handling, and callback reporting. This foundation enables developers to implement specific process components by subclassing ProcessBase and overriding the _invoke method with custom async logic, while seamlessly integrating with the larger agent and canvas framework.