base_flow.go

Overview

This file implements the core Flow structure and related logic for orchestrating interactions with Large Language Models (LLMs) and tools within an agent execution framework. It manages the lifecycle of LLM request processing, including preprocessing, invoking the model, postprocessing, and handling function/tool calls that may be triggered by the LLM responses. The flow supports extensibility through callback hooks before and after model and tool invocations, enabling custom behavior injection.

The Flow abstraction serves as a central coordinator that integrates model calls (LLM Integration and Agents) with tools (Tooling System) and manages invocation context (Agent Invocation Context), telemetry (Telemetry and Observability), and agent transitions. It is designed to be used within an agent execution environment where requests are processed sequentially, streaming responses are handled, and complex workflows involving multiple tools and agents are supported.


Types and Core Structures

Flow

The Flow struct encapsulates the configuration and execution logic for a single LLM interaction flow. It contains:

These hooks provide extensibility for custom processing and integration.


Callback Types


Constants


Methods and Functions

func (f *Flow) Run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error]

Runs the flow in a loop, yielding streaming session events and errors until a final response event is reached or the context ends.

func (f *Flow) runOneStep(ctx agent.InvocationContext) iter.Seq2[*session.Event, error]

Executes a single iteration step of the flow:

Returns an iterator yielding events and errors.

func (f *Flow) preprocess(ctx agent.InvocationContext, req *model.LLMRequest) error

Applies all registered request processors to the LLM request, including those from tools available to the agent.

func toolPreprocess(ctx agent.InvocationContext, req *model.LLMRequest, tools []tool.Tool) error

Runs the RequestProcessor interface method on each tool in DFS order.

func (f *Flow) callLLM(ctx agent.InvocationContext, req *model.LLMRequest, stateDelta map[string]any) iter.Seq2[*model.LLMResponse, error]

Invokes the LLM model's GenerateContent method with streaming support.

func (f *Flow) runAfterModelCallbacks(ctx agent.InvocationContext, llmResp *model.LLMResponse, stateDelta map[string]any, llmErr error) (*model.LLMResponse, error)

Executes all registered AfterModelCallbacks sequentially.

func (f *Flow) postprocess(ctx agent.InvocationContext, req *model.LLMRequest, resp *model.LLMResponse) error

Applies all response processors to the LLM response.

func (f *Flow) agentToRun(ctx agent.InvocationContext, agentName string) agent.Agent

Resolves an agent by name from the current agent's transfer targets (agents available for transfer).

func (f *Flow) finalizeModelResponseEvent(ctx agent.InvocationContext, resp *model.LLMResponse, tools map[string]tool.Tool, stateDelta map[string]any) *session.Event

Creates a session.Event representing the LLM response with populated metadata.

func findLongRunningFunctionCallIDs(c *genai.Content, tools map[string]tool.Tool) []string

Returns a list of function call IDs corresponding to long-running tools found in the content.


Function Call Handling

func (f *Flow) handleFunctionCalls(ctx agent.InvocationContext, toolsDict map[string]tool.Tool, resp *model.LLMResponse) (*session.Event, error)

Processes all function calls embedded in the LLM response content:

func (f *Flow) callTool(tool toolinternal.FunctionTool, fArgs map[string]any, toolCtx tool.Context) map[string]any

Executes a function tool with before/after tool callbacks:

func (f *Flow) invokeBeforeToolCallbacks(tool toolinternal.FunctionTool, fArgs map[string]any, toolCtx tool.Context) (map[string]any, error)

Executes all before tool callbacks in order; returns first non-nil result or nil.

func (f *Flow) invokeAfterToolCallbacks(tool toolinternal.FunctionTool, fArgs map[string]any, toolCtx tool.Context, fResult map[string]any, fErr error) (map[string]any, error)

Executes all after tool callbacks in order; returns first non-nil result or nil.


Event Merging Utilities

func mergeParallelFunctionResponseEvents(events []*session.Event) (*session.Event, error)

Merges multiple function response events into one by concatenating content parts and merging event actions.

func mergeEventActions(base, other *session.EventActions) *session.EventActions

Combines two EventActions with a "last one wins" approach, merging flags and state delta.


Important Implementation Details


Interaction with Other System Components


Usage Example Snippet

flow := &Flow{
    Model: myLLMModel,
    RequestProcessors: DefaultRequestProcessors,
    ResponseProcessors: DefaultResponseProcessors,
}

ctx := myInvocationContext
for event, err := range flow.Run(ctx) {
    if err != nil {
        log.Fatal(err)
    }
    // Process session event, e.g., send to client or log
    handleEvent(event)
}

Diagram: Flow Structure and Main Method Relationships

classDiagram
class Flow {
+Model
+RequestProcessors
+ResponseProcessors
+BeforeModelCallbacks
+AfterModelCallbacks
+BeforeToolCallbacks
+AfterToolCallbacks
+Run()
+runOneStep()
+preprocess()
+callLLM()
+postprocess()
+handleFunctionCalls()
+callTool()
+invokeBeforeToolCallbacks()
+invokeAfterToolCallbacks()
+agentToRun()
+finalizeModelResponseEvent()
}
Flow --> "[]func" : RequestProcessors
Flow --> "[]func" : ResponseProcessors
Flow --> "[]BeforeModelCallback"
Flow --> "[]AfterModelCallback"
Flow --> "[]BeforeToolCallback"
Flow --> "[]AfterToolCallback"
Flow : Run() --> runOneStep()
Flow : runOneStep() --> preprocess()
Flow : runOneStep() --> callLLM()
Flow : runOneStep() --> postprocess()
Flow : runOneStep() --> handleFunctionCalls()
Flow : handleFunctionCalls() --> callTool()
Flow : callTool() --> invokeBeforeToolCallbacks()
Flow : callTool() --> invokeAfterToolCallbacks()
Flow : runOneStep() --> agentToRun()
Flow : runOneStep() --> finalizeModelResponseEvent()

This documentation captures the core design, methods, and interactions of the base_flow.go file, focusing on the flow orchestration of LLM requests, response handling, tool execution, and agent transitions within the agent framework. For concepts such as agent invocation context, telemetry, and tooling interfaces, see the respective topics referenced.