canvas.py
Overview
This file defines the core classes Graph and Canvas which represent and manage dynamic workflows composed of interconnected components in the InfiniFlow system. These classes parse a JSON-based DSL (Domain Specific Language) describing a graph of components linked by dependencies, execute the components in sequence or parallel, maintain the workflow state (including history, globals, retrievals, and memory), and handle user interaction and data flow.
Graph: Encapsulates a generic directed graph of components, supporting loading, validation, navigation, and basic lifecycle operations on components.
Canvas: Extends Graph to implement a conversational or interactive workflow engine with support for global state, history tracking, retrieval augmentation, asynchronous component execution, error handling, and user-driven branching.
The classes integrate with other system parts such as the component registry (component_class), file service, Redis caching, and prompt chunk formatting, enabling complex, stateful workflows with external data and asynchronous processing.
Classes and Methods
Class: Graph
Represents a directed graph of components defined by a DSL JSON string.
Initialization
def __init__(self, dsl: str, tenant_id=None, task_id=None)
dsl (str): JSON string describing the graph structure.
tenant_id (optional): Identifier for tenant/user context.
task_id (optional): Unique ID for this graph instance; autogenerated if omitted.
Loads and validates the DSL, initializes components as instances of registered component classes, and sets initial execution path.
Methods
load()
Parses the JSON DSL, validates component parameters, and instantiates component objects.
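The load step can be illustrated with a minimal sketch. The DSL layout (a "components" mapping with "obj", "component_name", and "params" keys), the REGISTRY dictionary, and the Echo and EchoParam classes are all assumptions made for illustration; they are not the actual canvas.py structures or the real component_class factory.

```python
import json
from typing import Any


class EchoParam:
    """Hypothetical parameter class; check() validates the values."""

    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix

    def check(self) -> None:
        if not isinstance(self.prefix, str):
            raise ValueError("prefix must be a string")


class Echo:
    """Hypothetical component that holds its validated parameters."""

    def __init__(self, cpn_id: str, param: EchoParam) -> None:
        self.cpn_id = cpn_id
        self.param = param


# Hypothetical registry: component name -> (component class, parameter class).
REGISTRY: dict[str, tuple[type, type]] = {"Echo": (Echo, EchoParam)}


def load_components(dsl: str) -> dict[str, Any]:
    """Parse the DSL, validate parameters, and instantiate components."""
    spec = json.loads(dsl)
    objects: dict[str, Any] = {}
    for cpn_id, cpn in spec["components"].items():
        name = cpn["obj"]["component_name"]
        if name not in REGISTRY:
            raise ValueError(f"unknown component: {name}")
        cpn_cls, param_cls = REGISTRY[name]
        param = param_cls(**cpn["obj"].get("params", {}))
        param.check()  # fail early on invalid parameters
        objects[cpn_id] = cpn_cls(cpn_id, param)
    return objects


# Assumed DSL shape, for illustration only.
dsl = json.dumps({
    "components": {
        "echo_0": {"obj": {"component_name": "Echo", "params": {"prefix": "> "}}}
    }
})
components = load_components(dsl)
```

Validating before instantiating means that a malformed DSL is rejected before any component object exists, which keeps partially built graphs from leaking into later steps.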
reset()
Resets the graph state by clearing the path and invoking reset() on each component object. Also clears related Redis logs.
run(**kwargs)
Abstract method. Intended to be overridden by subclasses to execute the graph workflow.
get_component(cpn_id) -> Union[None, dict[str, Any]]
Returns the raw dictionary describing a component by its ID.
get_component_obj(cpn_id) -> ComponentBase
Returns the component instance object.
get_component_type(cpn_id) -> str
Returns the component's type name.
get_component_input_form(cpn_id) -> dict
Returns the input form schema for the component.
get_tenant_id()
Returns the tenant ID associated with this graph.
get_component_name(cid) -> str
Attempts to resolve a component name from the DSL graph nodes by ID.
__str__()
Serializes the current graph state back into a JSON string, including component outputs and current path.
Class: Canvas (inherits Graph)
Represents a specialized graph for interactive conversational workflows, managing globals, history, retrieval, and memory.
Initialization
def __init__(self, dsl: str, tenant_id=None, task_id=None)
Initializes default global variables related to the system context and forwards to Graph constructor.
Methods
load()
Extends Graph.load(). Loads additional attributes such as history, globals, retrieval, and memory from the DSL JSON.
reset(mem=False)
Resets graph and optionally clears conversational history, retrieval, and memory. Clears or resets global variables to default empty states.
run(**kwargs) -> generator
Executes the workflow based on the current path of components. Supports:
User input integration.
Parallel component execution via thread pools.
Event-driven yielding of workflow states (workflow_started, node_started, message, user_inputs, workflow_finished, etc.).
Error handling and branching logic.
Handling of partial outputs for streaming messages.
Dynamic path extension based on component outputs and exceptions.
Integration with Redis for logging tool usage traces.
Parameters:
query (str): User input query.
user_id (str): User identifier.
files (list): List of file dictionaries for processing.
Yields: Event dictionaries describing workflow progress and component outputs.
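The event-yielding loop described above might look roughly like the following sketch. The event names come from the list above; the function name run_workflow, the component signature, and the fields inside each "data" dictionary (other than "content") are assumptions for illustration, and the real run() additionally handles user inputs, parallelism, and Redis logging.

```python
from typing import Any, Callable, Iterator

# Hypothetical component: takes the query and returns
# (output text, ids of downstream components to run next).
Component = Callable[[str], tuple[str, list[str]]]


def run_workflow(
    components: dict[str, Component], start: str, query: str
) -> Iterator[dict[str, Any]]:
    """Walk the path from `start`, yielding one event dict per step."""
    yield {"event": "workflow_started", "data": {"query": query}}
    path = [start]
    idx = 0
    while idx < len(path):
        cpn_id = path[idx]
        idx += 1
        yield {"event": "node_started", "data": {"component_id": cpn_id}}
        try:
            output, downstream = components[cpn_id](query)
        except Exception as exc:
            # Report the failure and do not extend the path along this branch.
            yield {"event": "message", "data": {"content": f"error: {exc}"}}
            continue
        yield {"event": "message", "data": {"content": output}}
        # Dynamic path extension: schedule downstream ids not yet on the path.
        for nxt in downstream:
            if nxt not in path:
                path.append(nxt)
    yield {"event": "workflow_finished", "data": {"path": path}}


components = {
    "begin": lambda q: ("hello", ["answer"]),
    "answer": lambda q: (f"you asked: {q}", []),
}
events = list(run_workflow(components, "begin", "What is InfiniFlow?"))
```

Because the function is a generator, a caller can forward each event to a front end as soon as it is produced instead of waiting for the whole workflow to finish.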
is_reff(exp: str) -> bool
Checks if a string expression is a reference to a global or component variable.
get_variable_value(exp: str) -> Any
Resolves and returns the value of a referenced variable from globals or component outputs.
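As a rough illustration of reference checking and resolution, the sketch below assumes a reference is either the exact name of a global (for example "sys.query") or a "component_id@output_name" pair. That syntax, the function names is_reference and resolve, and the shape of the outputs dictionary are assumptions for this example, not something this document specifies.

```python
from typing import Any


def is_reference(
    exp: str, globals_: dict[str, Any], outputs: dict[str, dict[str, Any]]
) -> bool:
    """Return True if `exp` names a known global or component output."""
    if exp in globals_:
        return True
    cpn_id, sep, var = exp.partition("@")
    return bool(sep) and var in outputs.get(cpn_id, {})


def resolve(
    exp: str, globals_: dict[str, Any], outputs: dict[str, dict[str, Any]]
) -> Any:
    """Look the reference up in globals first, then in component outputs."""
    if exp in globals_:
        return globals_[exp]
    cpn_id, sep, var = exp.partition("@")
    if sep and var in outputs.get(cpn_id, {}):
        return outputs[cpn_id][var]
    raise KeyError(f"not a known reference: {exp}")


# Hypothetical state, for illustration only.
globals_ = {"sys.query": "What is InfiniFlow?"}
outputs = {"llm_0": {"content": "An answer."}}
value = resolve("llm_0@content", globals_, outputs)
```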
get_history(window_size: int) -> list
Retrieves a windowed slice of conversation history as role/content dictionaries.
add_user_input(question: str)
Appends a user question to the conversation history.
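The two history helpers above can be sketched as follows, assuming history is stored as a list of (role, content) tuples and that window_size counts individual messages rather than question/answer turns. Both points are assumptions; the actual storage format is not described here.

```python
def add_user_input(history: list[tuple[str, str]], question: str) -> None:
    """Append a user message to the conversation history."""
    history.append(("user", question))


def history_window(
    history: list[tuple[str, str]], window_size: int
) -> list[dict[str, str]]:
    """Return the last `window_size` messages as role/content dicts."""
    if window_size <= 0:
        # Guard needed because history[-0:] would return the whole list.
        return []
    return [
        {"role": role, "content": content}
        for role, content in history[-window_size:]
    ]


history: list[tuple[str, str]] = []
add_user_input(history, "hi")
history.append(("assistant", "hello"))
add_user_input(history, "what is a canvas?")
recent = history_window(history, 2)
```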
get_prologue() -> str
Returns the prologue text from the "begin" component's parameters.
get_mode() -> str
Returns the mode parameter from the "begin" component.
set_global_param(**kwargs)
Updates global variables with provided key-value pairs.
get_preset_param() -> dict
Returns preset input parameters from the "begin" component.
get_component_input_elements(cpnnm: str) -> dict
Returns input element schema for a component by its name.
get_files(files: Union[None, list[dict]]) -> list[str]
Processes a list of file descriptors:
Images are converted to base64 strings.
Other files are parsed via the FileService.
Uses a thread pool for concurrent file processing.
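A minimal sketch of this file-processing pattern is shown below. The file dictionary keys ("mime_type", "blob"), the parse_text stand-in for the FileService call, and the worker count are assumptions for illustration only.

```python
import base64
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


def image_to_data_uri(blob: bytes, mime_type: str) -> str:
    """Encode raw image bytes as a base64 data URI."""
    encoded = base64.b64encode(blob).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"


def parse_text(blob: bytes) -> str:
    """Hypothetical stand-in for the FileService parsing step."""
    return blob.decode("utf-8", errors="replace")


def process_file(file: dict) -> str:
    """Route one file: images become data URIs, everything else is parsed."""
    if file["mime_type"].startswith("image/"):
        return image_to_data_uri(file["blob"], file["mime_type"])
    return parse_text(file["blob"])


def process_files(files: Optional[list[dict]]) -> list[str]:
    """Process all files concurrently, keeping the input order."""
    if not files:
        return []
    with ThreadPoolExecutor(max_workers=5) as pool:
        # Executor.map() returns results in the order of its inputs.
        return list(pool.map(process_file, files))


files = [
    {"mime_type": "image/png", "blob": b"\x89PNG"},
    {"mime_type": "text/plain", "blob": b"hello"},
]
results = process_files(files)
```

Threads suit this step because fetching and parsing files is mostly I/O-bound, so the workers spend much of their time waiting rather than competing for the interpreter.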
tool_use_callback(agent_id: str, func_name: str, params: dict, result: Any, elapsed_time=None)
Logs tool usage information into Redis for traceability.
add_reference(chunks: list, doc_infos: list)
Adds document chunks and document info metadata into the current retrieval cache.
get_reference() -> dict
Returns the latest retrieval references (chunks and document aggregates).
add_memory(user: str, assist: str, summ: str)
Adds a memory tuple (user input, assistant response, summary) to memory list.
get_memory() -> list[Tuple]
Returns the stored memory tuples.
get_component_thoughts(cpn_id) -> str
Returns the "thoughts" string from a component object for introspection/debugging.
Important Implementation Details and Algorithms
DSL Parsing and Validation: Uses JSON to define components, their parameters, and graph topology. Parameter validation is performed during load() by instantiating parameter classes and calling their check() methods.
Component Instantiation: Component objects are dynamically created using a factory method component_class with class names inferred from component names.
Workflow Execution: The run() method in Canvas uses a generator pattern to yield workflow events incrementally, enabling asynchronous front-end updates or streaming.
Parallel Execution: Components in the current path segment are invoked concurrently with a ThreadPoolExecutor, optimizing throughput.
Partial Outputs and Streaming: Supports components that produce partial outputs (via Python functools.partial) to stream messages progressively.
Dynamic Path Management: The execution path can be extended or branched based on component outputs and exceptions, allowing for conditional workflows.
State Management: Globals, history, and retrieval caches are maintained and updated during execution to retain context across turns.
File Handling: Converts image files to base64 data URIs and parses other files concurrently, integrating with an external file service.
Redis Integration: Uses Redis to persist log traces of tool invocations associated with component executions.
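The parallel execution and partial-output points above can be combined in one small sketch: a batch of components runs in a thread pool, and any component that returns a functools.partial is treated as a deferred stream that the consumer drains chunk by chunk. The component functions and the run_batch and emit helpers are hypothetical names used only for this example.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Iterator


def stream_words(text: str) -> Iterator[str]:
    """Hypothetical streaming producer: yields a message word by word."""
    for word in text.split():
        yield word


def summarize(text: str) -> str:
    """Hypothetical component that returns a complete output at once."""
    return text.upper()


def respond(text: str) -> partial:
    """Hypothetical component that defers its output as a partial."""
    return partial(stream_words, text)


def run_batch(batch: list[tuple[Callable[[str], Any], str]]) -> list[Any]:
    """Invoke every component in the batch concurrently."""
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(fn, arg) for fn, arg in batch]
        # Collect results in submission order, not completion order.
        return [future.result() for future in futures]


def emit(outputs: list[Any]) -> Iterator[str]:
    """Yield complete outputs as-is and stream deferred ones chunk by chunk."""
    for out in outputs:
        if isinstance(out, partial):
            for chunk in out():  # calling the partial starts the stream
                yield chunk
        else:
            yield out


outputs = run_batch([(summarize, "abc"), (respond, "hello streaming world")])
chunks = list(emit(outputs))
```

Returning a partial rather than the finished text lets the component hand back control immediately, while the consumer decides when to start pulling chunks.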
Interaction with Other System Parts
Component System: Uses component_class and ComponentBase as the base for component instantiation and execution.
FileService: For file blob retrieval and parsing, used in file input handling.
Redis (REDIS_CONN): For caching logs and possibly other runtime data.
Prompt and Retrieval Utils: Uses chunks_format to process document chunks for retrieval.
UUID and Hash Utilities: Uses get_uuid() for unique IDs and hash_str2int() for chunk ID hashing.
Usage Examples
# Create a Canvas instance from a DSL JSON string
canvas = Canvas(dsl_json_str, tenant_id="user123")
# Run the workflow with user input query
for event in canvas.run(query="What is InfiniFlow?"):
    if event["event"] == "message":
        print("Assistant says:", event["data"]["content"])
    elif event["event"] == "user_inputs":
        print("User inputs requested:", event["data"]["inputs"])
# Reset the canvas to initial state
canvas.reset()
Class Diagram
classDiagram
class Graph {
-dsl: dict
-path: list
-components: dict
-error: str
-_tenant_id: str
-task_id: str
+__init__(dsl: str, tenant_id=None, task_id=None)
+load()
+reset()
+run(**kwargs)
+get_component(cpn_id) -> dict
+get_component_obj(cpn_id) -> ComponentBase
+get_component_type(cpn_id) -> str
+get_component_input_form(cpn_id) -> dict
+get_tenant_id()
+get_component_name(cid) -> str
+__str__()
}
class Canvas {
-globals: dict
-history: list
-retrieval: list
-memory: list
+__init__(dsl: str, tenant_id=None, task_id=None)
+load()
+reset(mem=False)
+run(**kwargs) -> generator
+is_reff(exp: str) -> bool
+get_variable_value(exp: str) -> Any
+get_history(window_size: int) -> list
+add_user_input(question: str)
+get_prologue() -> str
+get_mode() -> str
+set_global_param(**kwargs)
+get_preset_param() -> dict
+get_component_input_elements(cpnnm: str) -> dict
+get_files(files: list) -> list[str]
+tool_use_callback(agent_id: str, func_name: str, params: dict, result: Any, elapsed_time=None)
+add_reference(chunks: list, doc_infos: list)
+get_reference() -> dict
+add_memory(user: str, assist: str, summ: str)
+get_memory() -> list
+get_component_thoughts(cpn_id) -> str
}
Canvas --|> Graph
Summary
canvas.py is a pivotal file implementing workflow orchestration in InfiniFlow. The Graph class provides the foundational graph model and component management, while Canvas builds on it to support interactive, stateful, asynchronous workflows with rich user input, branching logic, and external data integration. Its design supports extensibility via component classes and provides fine-grained event-driven execution suitable for conversational AI or agent-based systems.