base.py
Overview
The base.py file provides foundational abstract base classes and utilities for defining and managing components and their parameters within the InfiniFlow system. It standardizes how component parameters are represented, validated, updated, and serialized, as well as how components themselves are initialized, invoked, and interact with a processing graph or canvas.
This file is key for enabling a modular and extensible architecture where components and their configurations can be dynamically managed, validated, and executed with controlled concurrency and error handling.
Classes and Key Functionality
ComponentParamBase (Abstract Base Class)
ComponentParamBase serves as the base class for all parameter objects associated with components. It encapsulates parameter data, validation logic, serialization, and update mechanisms.
Initialization and Properties
Attributes:
message_history_window_size (int): Size of the message history window, default 13.
inputs (dict): Input parameters dictionary.
outputs (dict): Output parameters dictionary.
description (str): Description text for the parameter object.
max_retries (int): Maximum retries allowed, default 0.
delay_after_error (float): Delay in seconds after an error, default 2.0.
exception_method, exception_default_value, exception_goto: Settings that control exception handling.
debug_inputs (dict): Inputs captured for debugging.
Methods
set_name(name: str) -> self
Sets the internal name of the parameter object and returns self for chaining.
check()
Abstract method; must be implemented by subclasses to validate parameters.
as_dict() -> dict
Recursively converts the parameter object (and nested objects) to a dictionary, handling pandas DataFrames and skipping internal bookkeeping attributes.
update(conf: dict, allow_redundant=False) -> self
Recursively updates the parameter object with values from a configuration dictionary. Tracks deprecated and user-fed parameters, and validates parameter nesting depth against settings.PARAM_MAXDEPTH.
extract_not_builtin() -> dict
Recursively returns a dictionary of the attributes that are not built-in types.
validate()
Loads JSON-based validation rules from a file named after the parameter class (in a param_validation/ directory) and recursively validates attributes using operators such as greater-than, less-than, and inclusion.
Static Type Checking Utilities:
Several static methods validate specific parameter types and value constraints, e.g.:
check_string(param, descr)
check_positive_integer(param, descr)
check_boolean(param, descr)
check_and_change_lower(param, valid_list, descr="") (validates and lowercases strings)
Validation Operators (Static):
Implementations of the comparison operators used in validation:
_greater_equal_than(value, limit)
_less_equal_than(value, limit)
_range(value, ranges)
_in(value, right_value_list)
_not_in(value, wrong_value_list)
Deprecation Warning Helpers:
_warn_deprecated_param(param_name, descr)
_warn_to_deprecate_param(param_name, descr, new_param)
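The comparison operators listed above are simple predicates. The following is a minimal sketch of how such helpers might behave, not the code from base.py. It assumes a tolerance constant called FLOAT_ZERO with a value of 1e-8 (the real value lives in the settings module) and assumes that the range check receives a list of (low, high) pairs. The function names are shortened, hypothetical stand-ins for the static methods.

```python
FLOAT_ZERO = 1e-8  # assumed tolerance; the real constant comes from settings


def greater_equal_than(value, limit):
    """True if value >= limit, allowing for floating-point tolerance."""
    return value >= limit - FLOAT_ZERO


def less_equal_than(value, limit):
    """True if value <= limit, allowing for floating-point tolerance."""
    return value <= limit + FLOAT_ZERO


def in_range(value, ranges):
    """True if value falls inside any (low, high) pair, bounds inclusive."""
    return any(
        greater_equal_than(value, low) and less_equal_than(value, high)
        for low, high in ranges
    )


def is_in(value, right_value_list):
    """True if value is one of the allowed values."""
    return value in right_value_list


def not_in(value, wrong_value_list):
    """True if value is none of the forbidden values."""
    return value not in wrong_value_list
```

The tolerance matters for values produced by floating-point arithmetic: 0.1 + 0.2 is slightly larger than 0.3, yet less_equal_than(0.1 + 0.2, 0.3) still passes.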
Usage Example
class MyComponentParams(ComponentParamBase):
    def check(self):
        self.check_string(self.description, "Description")
        self.check_positive_integer(self.max_retries, "Max retries")

params = MyComponentParams().set_name("MyParams")
params.update({"description": "Example", "max_retries": 3})
params.validate()
print(params.as_dict())
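The recursive behavior of update can be illustrated with a small stand-alone sketch. It is not the implementation in base.py: the Params class, the nested retry object, the dotted names used to record user-fed parameters, and the PARAM_MAXDEPTH value of 5 are all assumptions made for the example.

```python
PARAM_MAXDEPTH = 5  # assumed value; the real limit is settings.PARAM_MAXDEPTH


class Params:
    """Stand-in for a parameter object; not the real ComponentParamBase."""

    def __init__(self, **attrs):
        self.__dict__.update(attrs)


def update(param, conf, allow_redundant=False, depth=0, prefix="", fed=None):
    """Recursively copy values from conf onto param; return the fed names."""
    fed = set() if fed is None else fed
    if depth > PARAM_MAXDEPTH:
        raise ValueError("Param nesting exceeds PARAM_MAXDEPTH")
    for key, value in conf.items():
        if key not in param.__dict__:
            if not allow_redundant:
                raise ValueError(f"Unknown parameter: {prefix}{key}")
            # Redundant keys are accepted only when explicitly allowed.
            setattr(param, key, value)
            fed.add(f"{prefix}{key}")
            continue
        current = getattr(param, key)
        if isinstance(current, Params) and isinstance(value, dict):
            # Nested parameter object: descend one level.
            update(current, value, allow_redundant, depth + 1, f"{prefix}{key}.", fed)
        else:
            setattr(param, key, value)
            fed.add(f"{prefix}{key}")
    return fed


root = Params(description="", retry=Params(max_retries=0, delay_after_error=2.0))
fed = update(root, {"description": "Example", "retry": {"max_retries": 3}})
```

Only the keys present in the configuration are overwritten, so root.retry.delay_after_error keeps its default while root.retry.max_retries becomes 3.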
ComponentBase (Abstract Base Class)
ComponentBase defines the interface and base behavior for components in the system. Components represent functional units that process inputs and produce outputs within a canvas/graph.
Initialization
__init__(canvas, id, param: ComponentParamBase)
canvas: The canvas or graph instance managing the component (must be an instance of Graph).
id: Unique identifier of the component.
param: Parameter object derived from ComponentParamBase.
Validates the parameter object on initialization by calling its .check() method.
Attributes
component_name (str): Name of the component (to be set by subclasses).
thread_limiter: A concurrency limiter using trio.CapacityLimiter, defaulting to a max concurrency defined by the environment variable MAX_CONCURRENT_CHATS.
variable_ref_patt (str): Regex pattern used to detect variable references in strings.
Key Methods
invoke(**kwargs) -> dict[str, Any]
Main entry point to execute component logic. It:
Sets a created timestamp output.
Calls the protected _invoke method (to be implemented by subclasses).
Handles exceptions by setting an output error or the default exception value.
Records elapsed time.
Returns the output dictionary.
_invoke(**kwargs)
Abstract method; subclasses must implement the core logic here. Decorated with a timeout (default 10 minutes).
Input/Output management:
get_input(key: str=None): Retrieves input values; if no key is given, resolves all inputs with reference substitution.
get_input_values(): Returns current input values, prioritizing debug inputs if present.
set_input_value(key: str, value: Any): Sets an input value.
get_input_value(key: str): Gets a single input value.
output(var_nm: str=None): Gets output value(s).
set_output(key: str, value: Any): Sets an output value.
error(): Returns any error message from outputs.
reset(only_output=False): Clears outputs (and inputs unless only_output=True).
Component relationship helpers:
get_parent(): Returns the parent component object, if any.
get_upstream(): Returns the list of upstream component IDs.
get_downstream(): Returns the list of downstream component IDs.
Variable reference extraction and formatting:
get_input_elements_from_text(txt: str): Parses a text string and returns a dict of the variable references found in it.
string_format(content: str, kv: dict[str, str]): Performs template-style string substitution for the keys in kv.
Error handling:
exception_handler(): Returns exception handling instructions if configured.
get_exception_default_value(): Returns the default exception value if the configured method is "comment".
set_exception_default_value(): Sets output "result" to the exception default value.
Debug:
debug(**kwargs): Direct call to _invoke for debugging purposes.
thoughts()
Abstract method, intended for subclass implementation (purpose not explicitly defined).
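The invoke steps described above (timestamp, delegated call, exception handling, elapsed time) can be sketched as follows. This is an illustrative stand-in, not the real ComponentBase: the class names and the output keys _created_time, _ERROR, and _elapsed_time are assumptions chosen for the example, and the timeout and retry logic are left out.

```python
import time


class SketchComponent:
    """Stand-in for a component; not the real ComponentBase."""

    def __init__(self, exception_default_value=None):
        self.outputs = {}
        self.exception_default_value = exception_default_value

    def _invoke(self, **kwargs):
        raise NotImplementedError

    def invoke(self, **kwargs):
        start = time.perf_counter()
        self.outputs["_created_time"] = start
        try:
            self._invoke(**kwargs)
        except Exception as exc:
            if self.exception_default_value is not None:
                # Fall back to the configured default instead of failing.
                self.outputs["result"] = self.exception_default_value
            else:
                self.outputs["_ERROR"] = str(exc)
        self.outputs["_elapsed_time"] = time.perf_counter() - start
        return self.outputs


class Doubler(SketchComponent):
    def _invoke(self, **kwargs):
        self.outputs["result"] = kwargs["value"] * 2


class Broken(SketchComponent):
    def _invoke(self, **kwargs):
        raise RuntimeError("boom")
```

Because invoke catches the exception, a failing component still returns an output dictionary; the caller inspects the error entry or receives the default value rather than handling a raised exception.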
Usage Example
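class MyComponent(ComponentBase):
    component_name = "MyComponent"

    def _invoke(self, **kwargs):
        # invoke() calls _invoke() synchronously and forwards its keyword arguments
        input_val = kwargs["input_key"]
        result = input_val * 2  # Some processing
        self.set_output("result", result)

    def thoughts(self):
        # thoughts() is abstract, so a concrete subclass must define it
        return "Doubling the input."

# Usage (canvas is an existing Graph instance; params is a ComponentParamBase subclass instance)
component = MyComponent(canvas, "comp1", params)
output = component.invoke(input_key=5)
print(output["result"])  # Expected: 10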
Important Implementation Details and Algorithms
Parameter Recursion and Update:
The update method in ComponentParamBase recursively walks through nested parameter objects to apply configuration dictionaries. It tracks user-fed parameters and deprecated parameters separately, enabling warnings and fine-grained control.
Validation Using External JSON:
Parameter validation loads JSON files named after the parameter class from a param_validation/ directory. This design allows external configuration of validation rules without modifying code.
Concurrency Control:
Component execution is controlled by a trio.CapacityLimiter that limits concurrent invocations based on environment configuration, preventing resource exhaustion.
Timeout Decorator:
The _invoke method is decorated with a timeout function to prevent components from running indefinitely.
Variable Reference Pattern:
The regex pattern in ComponentBase identifies references in formats like {var@component}, allowing dynamic resolution of variables from the canvas.
Exception Handling Strategy:
Components can specify exception handling strategies with default values and "goto" pointers to enable workflow recovery or alternative flows in failure cases.
Deferred Import for Canvas Graph:
To avoid cyclic dependencies, the Graph class from agent.canvas is imported locally inside the ComponentBase constructor.
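The variable reference handling can be illustrated with a short sketch. The regular expression below is a simplified assumption that only covers the {var@component} form mentioned above; the real variable_ref_patt may accept other forms. The helper find_references is a hypothetical name, and string_format here uses plain string replacement, which may differ from the actual method.

```python
import re

# Assumed, simplified pattern; the real variable_ref_patt may accept more forms.
REF_PATTERN = re.compile(r"\{([A-Za-z0-9_.:]+@[A-Za-z0-9_.:]+)\}")


def find_references(text):
    """Return the distinct references in text, in order of first appearance."""
    seen = []
    for ref in REF_PATTERN.findall(text):
        if ref not in seen:
            seen.append(ref)
    return seen


def string_format(content, kv):
    """Replace each {key} placeholder with its value from kv."""
    for key, value in kv.items():
        content = content.replace("{" + key + "}", str(value))
    return content


text = "Hello {name@begin}, you asked: {query@begin}. Bye {name@begin}."
refs = find_references(text)
rendered = string_format(text, {"name@begin": "Ada", "query@begin": "status"})
```

Extraction and substitution are kept separate so that a caller can first resolve each reference against the canvas and then substitute all values in one pass.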
Interactions with Other System Parts
Canvas and Graph:
ComponentBase tightly integrates with a Graph or Canvas object (from agent.canvas) managing the component graph topology and variable resolution.
Settings Module:
Uses the settings module for configuration constants like PARAM_MAXDEPTH, the floating point tolerance FLOAT_ZERO, and concurrency limits.
Timeout Utility:
Uses a timeout decorator from api.utils.api_utils to limit the execution time of component logic.
Logging:
Uses Python's standard logging module for warnings and error reporting.
Pandas:
Supports conversion of pandas.DataFrame objects to dictionaries during serialization.
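The implementation of the timeout decorator in api.utils.api_utils is not shown here. As a rough idea of what a decorator of this kind can look like, the sketch below runs the wrapped function in a worker thread and stops waiting after a given number of seconds. It is an assumption-based stand-in built on the standard library, not the project's code; note that the worker thread itself keeps running until it finishes, because Python threads cannot be forcibly stopped.

```python
import concurrent.futures
import functools
import time


def timeout(seconds):
    """Raise concurrent.futures.TimeoutError if the call exceeds `seconds`."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = pool.submit(func, *args, **kwargs)
            try:
                return future.result(timeout=seconds)
            finally:
                # Do not block on the worker; it cannot be force-killed.
                pool.shutdown(wait=False)

        return wrapper

    return decorator


@timeout(0.05)
def slow():
    time.sleep(0.3)
    return "done"


@timeout(1.0)
def fast():
    return "done"
```

Calling fast() returns normally, while slow() raises concurrent.futures.TimeoutError once the 0.05 second budget is exceeded.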
Mermaid Class Diagram
classDiagram
class ComponentParamBase {
- message_history_window_size: int
- inputs: dict
- outputs: dict
- description: str
- max_retries: int
- delay_after_error: float
- exception_method
- exception_default_value
- exception_goto
- debug_inputs: dict
+ set_name(name: str)
+ check()
+ as_dict() dict
+ update(conf: dict, allow_redundant: bool) ComponentParamBase
+ extract_not_builtin() dict
+ validate()
+ check_string(param, descr)
+ check_positive_integer(param, descr)
+ check_boolean(param, descr)
+ check_and_change_lower(param, valid_list, descr)
+ _greater_equal_than(value, limit) bool
+ _less_equal_than(value, limit) bool
+ _range(value, ranges) bool
+ _in(value, list) bool
+ _not_in(value, list) bool
+ _warn_deprecated_param(param_name, descr)
+ _warn_to_deprecate_param(param_name, descr, new_param) bool
}
class ComponentBase {
- component_name: str
- thread_limiter: trio.CapacityLimiter
- variable_ref_patt: str
- _canvas
- _id
- _param: ComponentParamBase
+ __init__(canvas, id, param: ComponentParamBase)
+ invoke(**kwargs) dict
+ _invoke(**kwargs)
+ output(var_nm: str) Any
+ set_output(key: str, value: Any)
+ error() Any
+ reset(only_output: bool)
+ get_input(key: str) Any
+ get_input_values() dict
+ get_input_elements_from_text(txt: str) dict
+ get_input_elements() dict
+ get_input_form() dict
+ set_input_value(key: str, value: Any)
+ get_input_value(key: str) Any
+ get_component_name(cpn_id) str
+ get_param(name)
+ debug(**kwargs)
+ get_parent()
+ get_upstream() list
+ get_downstream() list
+ string_format(content: str, kv: dict) str
+ exception_handler() dict
+ get_exception_default_value()
+ set_exception_default_value()
+ thoughts()
}
ComponentBase --> ComponentParamBase : uses >
Summary
base.py defines the core abstractions for component parameters and components within InfiniFlow, providing:
Parameter management with validation, recursive update, serialization, and deprecation tracking.
Component lifecycle management including concurrency control, error handling, input/output management, and integration with a graph-based execution canvas.
Rich utilities for type checking, variable resolution, and external configuration-driven validation.
This file is foundational for building reliable, extensible, and maintainable components that can be orchestrated in complex workflows.