base_flow.go
Overview
This file implements the core Flow structure and related logic for orchestrating interactions with Large Language Models (LLMs) and tools within an agent execution framework. It manages the lifecycle of LLM request processing, including preprocessing, invoking the model, postprocessing, and handling function/tool calls that may be triggered by the LLM responses. The flow supports extensibility through callback hooks before and after model and tool invocations, enabling custom behavior injection.
The Flow abstraction serves as a central coordinator that integrates model calls (LLM Integration and Agents) with tools (Tooling System) and manages invocation context (Agent Invocation Context), telemetry (Telemetry and Observability), and agent transitions. It is designed to be used within an agent execution environment where requests are processed sequentially, streaming responses are handled, and complex workflows involving multiple tools and agents are supported.
Types and Core Structures
Flow
The Flow struct encapsulates the configuration and execution logic for a single LLM interaction flow. It contains:
Model model.LLM: The configured LLM model to generate responses.
RequestProcessors []func(agent.InvocationContext, *model.LLMRequest) error: A sequence of functions that mutate or validate an outgoing LLM request.
ResponseProcessors []func(agent.InvocationContext, *model.LLMRequest, *model.LLMResponse) error: A sequence of functions applied to the LLM response for postprocessing.
BeforeModelCallbacks []BeforeModelCallback: Callbacks invoked before the model is called, allowing interception or modification.
AfterModelCallbacks []AfterModelCallback: Callbacks invoked after the model returns, allowing modification or error handling.
BeforeToolCallbacks []BeforeToolCallback: Callbacks invoked before a tool is executed.
AfterToolCallbacks []AfterToolCallback: Callbacks invoked after tool execution.
These hooks provide extensibility for custom processing and integration.
Callback Types
BeforeModelCallback: func(ctx agent.CallbackContext, llmRequest *model.LLMRequest) (*model.LLMResponse, error)
Invoked before the LLM call. Can short-circuit the call by returning a response or error.
AfterModelCallback: func(ctx agent.CallbackContext, llmResponse *model.LLMResponse, llmResponseError error) (*model.LLMResponse, error)
Invoked after the LLM call returns, allowing modification of the response or error.
BeforeToolCallback: func(ctx tool.Context, tool tool.Tool, args map[string]any) (map[string]any, error)
Invoked before a tool runs; may modify input args or short-circuit with a result.
AfterToolCallback: func(ctx tool.Context, tool tool.Tool, args, result map[string]any, err error) (map[string]any, error)
Invoked after tool execution; may modify or replace the result.
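The short-circuit behavior of BeforeModelCallback can be illustrated with a minimal sketch. Request, Response, BeforeModel, runBeforeModel, and cachedAnswer are hypothetical stand-ins, not the package's types, and the "first non-nil response wins" ordering is an assumption made for illustration.

```go
package main

import "fmt"

// Request and Response are simplified stand-ins for model.LLMRequest and
// model.LLMResponse (assumptions for illustration only).
type Request struct{ Prompt string }
type Response struct{ Text string }

// BeforeModel mirrors the shape of BeforeModelCallback without the context argument.
type BeforeModel func(req *Request) (*Response, error)

// runBeforeModel invokes callbacks in order and stops at the first one that
// returns a non-nil response or an error. A (nil, nil) return means "continue".
func runBeforeModel(cbs []BeforeModel, req *Request) (*Response, error) {
	for _, cb := range cbs {
		resp, err := cb(req)
		if err != nil {
			return nil, err
		}
		if resp != nil {
			return resp, nil
		}
	}
	return nil, nil
}

// cachedAnswer is a hypothetical callback that answers one known prompt
// so that the model would not need to be called.
func cachedAnswer(req *Request) (*Response, error) {
	if req.Prompt == "ping" {
		return &Response{Text: "pong"}, nil
	}
	return nil, nil
}

func main() {
	cbs := []BeforeModel{cachedAnswer}
	resp, err := runBeforeModel(cbs, &Request{Prompt: "ping"})
	fmt.Println(resp != nil, err) // a non-nil response means the model call is skipped
}
```

Returning (nil, nil) from a callback leaves the decision to the next callback and, eventually, to the model itself.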
Constants
DefaultRequestProcessors and DefaultResponseProcessors are default slices of functions applied to requests and responses respectively, including processors for code execution, natural language planning, authentication, and content handling.
Methods and Functions
func (f *Flow) Run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error]
Runs the flow in a loop, yielding streaming session events and errors until a final response event is reached or the context ends.
Parameters:
ctx - Execution context providing agent info and invocation lifecycle.
Returns:
An iterator over session events and errors.
Usage:
Suitable for driving the full LLM interaction lifecycle, including streaming partial responses and handling agent transfers.
func (f *Flow) runOneStep(ctx agent.InvocationContext) iter.Seq2[*session.Event, error]
Executes a single iteration step of the flow:
Preprocesses the request.
Calls the LLM model.
Postprocesses the response.
Handles function/tool calls from the response.
Handles agent transfers if requested.
Returns an iterator yielding events and errors.
func (f *Flow) preprocess(ctx agent.InvocationContext, req *model.LLMRequest) error
Applies all registered request processors to the LLM request, including those from tools available to the agent.
Parameters:
ctx - Invocation context.
req - Pointer to the LLM request to be mutated.
Returns:
Error if any processor fails.
Details:
Extracts tools from the current agent and its toolsets, then invokes toolPreprocess on them.
func toolPreprocess(ctx agent.InvocationContext, req *model.LLMRequest, tools []tool.Tool) error
Runs the RequestProcessor interface method on each tool in DFS order.
Returns:
Error if any tool does not implement RequestProcessor or fails.
Note:
Designed for sequential processing; concurrency is a potential future improvement.
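The per-tool check described here amounts to a type assertion against an optional interface. The following is a minimal sketch under stated assumptions: Request, Tool, RequestProcessor, searchTool, and plainTool are simplified stand-ins, and the method name ProcessRequest is assumed rather than taken from the source.

```go
package main

import "fmt"

// Request is a simplified stand-in for model.LLMRequest (an assumption).
type Request struct{ Instructions []string }

// Tool is a stand-in for tool.Tool.
type Tool interface{ Name() string }

// RequestProcessor is a stand-in for the optional interface a tool may
// implement; the method name ProcessRequest is an assumption.
type RequestProcessor interface {
	ProcessRequest(req *Request) error
}

// searchTool implements both Tool and RequestProcessor.
type searchTool struct{}

func (searchTool) Name() string { return "search" }
func (searchTool) ProcessRequest(req *Request) error {
	req.Instructions = append(req.Instructions, "search is available")
	return nil
}

// plainTool implements only Tool, so preprocessing rejects it.
type plainTool struct{}

func (plainTool) Name() string { return "plain" }

// toolPreprocess visits tools sequentially and lets each one mutate the request.
func toolPreprocess(req *Request, tools []Tool) error {
	for _, t := range tools {
		rp, ok := t.(RequestProcessor)
		if !ok {
			return fmt.Errorf("tool %q does not implement RequestProcessor", t.Name())
		}
		if err := rp.ProcessRequest(req); err != nil {
			return fmt.Errorf("tool %q: %w", t.Name(), err)
		}
	}
	return nil
}

func main() {
	req := &Request{}
	fmt.Println(toolPreprocess(req, []Tool{searchTool{}}), req.Instructions)
	fmt.Println(toolPreprocess(req, []Tool{plainTool{}}))
}
```

Failing fast on the first tool without the interface keeps the request from being sent in a partially prepared state.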
func (f *Flow) callLLM(ctx agent.InvocationContext, req *model.LLMRequest, stateDelta map[string]any) iter.Seq2[*model.LLMResponse, error]
Invokes the LLM model's GenerateContent method with streaming support.
Runs BeforeModelCallbacks before the call.
Runs AfterModelCallbacks on each streamed response/error.
Yields LLM responses or errors.
Parameters:
ctx - Invocation context.
req - LLM request.
stateDelta - Mutable map for tracking state changes.
Returns:
Iterator over LLM responses and errors.
Details:
Streaming mode is enabled based on run configuration.
func (f *Flow) runAfterModelCallbacks(ctx agent.InvocationContext, llmResp *model.LLMResponse, stateDelta map[string]any, llmErr error) (*model.LLMResponse, error)
Executes all registered AfterModelCallbacks sequentially.
Parameters:
LLM response and error, invocation context, and state delta.
Returns:
Possibly modified LLM response and error.
func (f *Flow) postprocess(ctx agent.InvocationContext, req *model.LLMRequest, resp *model.LLMResponse) error
Applies all response processors to the LLM response.
Returns:
Error if any processor fails.
func (f *Flow) agentToRun(ctx agent.InvocationContext, agentName string) agent.Agent
Resolves an agent by name from the current agent's transfer targets (agents available for transfer).
Returns:
The matching agent, or nil if not found.
func (f *Flow) finalizeModelResponseEvent(ctx agent.InvocationContext, resp *model.LLMResponse, tools map[string]tool.Tool, stateDelta map[string]any) *session.Event
Creates a session.Event representing the LLM response with populated metadata.
Generates function call IDs if missing.
Collects long-running tool call IDs.
Sets author, branch, and state delta.
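Filling in missing function call IDs might look like the sketch below. FunctionCall, newCallID, and populateCallIDs are hypothetical, and the "call-" prefix with random hex is an assumed ID format chosen for illustration; the actual scheme used by the flow is not specified here.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// FunctionCall is a simplified stand-in for a function call part in the
// response content (an assumption).
type FunctionCall struct{ ID, Name string }

// newCallID returns a random identifier; the "call-" prefix is an assumed format.
func newCallID() string {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		panic(err) // the system randomness source is unavailable
	}
	return "call-" + hex.EncodeToString(b)
}

// populateCallIDs assigns an ID to every call that lacks one and leaves
// model-provided IDs untouched.
func populateCallIDs(calls []*FunctionCall) {
	for _, fc := range calls {
		if fc != nil && fc.ID == "" {
			fc.ID = newCallID()
		}
	}
}

func main() {
	calls := []*FunctionCall{{ID: "from-model", Name: "search"}, {Name: "lookup"}}
	populateCallIDs(calls)
	fmt.Println(calls[0].ID, calls[1].ID != "")
}
```

Stable IDs let each function response be matched to the call that produced it, including when several calls appear in one response.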
func findLongRunningFunctionCallIDs(c *genai.Content, tools map[string]tool.Tool) []string
Returns a list of function call IDs corresponding to long-running tools found in the content.
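The lookup can be sketched as a single pass over the content parts. FunctionCall, Part, Content, and Tool below are simplified stand-ins for the genai and tool types, and the LongRunning field is an assumed way of marking a tool as long-running.

```go
package main

import "fmt"

// FunctionCall, Part, and Content are simplified stand-ins for the genai
// content types (assumptions for illustration).
type FunctionCall struct{ ID, Name string }
type Part struct{ FunctionCall *FunctionCall } // nil for non-call parts
type Content struct{ Parts []Part }

// Tool is a stand-in for tool.Tool; LongRunning is an assumed marker field.
type Tool struct {
	Name        string
	LongRunning bool
}

// findLongRunningIDs returns the IDs of function calls whose target tool is
// registered and marked as long-running, in the order they appear.
func findLongRunningIDs(c *Content, tools map[string]Tool) []string {
	var ids []string
	if c == nil {
		return ids
	}
	for _, p := range c.Parts {
		if p.FunctionCall == nil {
			continue
		}
		if t, ok := tools[p.FunctionCall.Name]; ok && t.LongRunning {
			ids = append(ids, p.FunctionCall.ID)
		}
	}
	return ids
}

func main() {
	tools := map[string]Tool{
		"export": {Name: "export", LongRunning: true},
		"search": {Name: "search"},
	}
	c := &Content{Parts: []Part{
		{FunctionCall: &FunctionCall{ID: "a1", Name: "search"}},
		{}, // a text part carries no function call
		{FunctionCall: &FunctionCall{ID: "a2", Name: "export"}},
	}}
	fmt.Println(findLongRunningIDs(c, tools))
}
```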
Function Call Handling
func (f *Flow) handleFunctionCalls(ctx agent.InvocationContext, toolsDict map[string]tool.Tool, resp *model.LLMResponse) (*session.Event, error)
Processes all function calls embedded in the LLM response content:
Looks up corresponding tool.
Executes the function tool with arguments.
Collects individual function response events.
Merges these events into a single event.
Returns:
A merged session event or error.
Details:
Uses telemetry spans for tracing tool executions.
func (f *Flow) callTool(tool toolinternal.FunctionTool, fArgs map[string]any, toolCtx tool.Context) map[string]any
Executes a function tool with before/after tool callbacks:
Runs BeforeToolCallbacks, which can override execution.
Calls the tool's Run method if no override.
Runs AfterToolCallbacks, which can modify the result.
Returns:
Result map or error wrapped in a map.
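The before/run/after sequence can be sketched as follows. All names here (Args, Result, Before, After, ToolFunc, failingTool, fallbackOnError) are hypothetical, the callbacks omit the tool context for brevity, and the "error" key used to wrap a failure in the result map is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

type Args = map[string]any
type Result = map[string]any

// Before, After, and ToolFunc are simplified stand-ins for the callback and
// tool signatures (assumptions; the tool context is omitted).
type Before func(name string, args Args) (Result, error)
type After func(name string, args Args, res Result, err error) (Result, error)
type ToolFunc func(args Args) (Result, error)

// callTool runs before callbacks, then the tool unless overridden, then after
// callbacks. Any remaining error is wrapped into the returned map.
func callTool(name string, run ToolFunc, args Args, before []Before, after []After) Result {
	var res Result
	var err error
	for _, cb := range before {
		if res, err = cb(name, args); res != nil || err != nil {
			break // first non-nil result or error overrides the tool call
		}
	}
	if res == nil && err == nil {
		res, err = run(args)
	}
	for _, cb := range after {
		if newRes, newErr := cb(name, args, res, err); newRes != nil || newErr != nil {
			res, err = newRes, newErr // first non-nil outcome replaces the tool's
			break
		}
	}
	if err != nil {
		return Result{"error": err.Error()} // the "error" key is an assumption
	}
	return res
}

// failingTool is a hypothetical tool that always fails.
func failingTool(Args) (Result, error) {
	return nil, errors.New("backend unavailable")
}

// fallbackOnError is a hypothetical after callback that replaces failures
// with a degraded result and leaves successes untouched.
func fallbackOnError(_ string, _ Args, _ Result, err error) (Result, error) {
	if err != nil {
		return Result{"status": "degraded"}, nil
	}
	return nil, nil
}

func main() {
	fmt.Println(callTool("lookup", failingTool, Args{}, nil, nil))
	fmt.Println(callTool("lookup", failingTool, Args{}, nil, []After{fallbackOnError}))
}
```

Returning a map instead of a Go error means a tool failure can be passed back to the model as an ordinary function response.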
func (f *Flow) invokeBeforeToolCallbacks(tool toolinternal.FunctionTool, fArgs map[string]any, toolCtx tool.Context) (map[string]any, error)
Executes all before tool callbacks in order; returns first non-nil result or nil.
func (f *Flow) invokeAfterToolCallbacks(tool toolinternal.FunctionTool, fArgs map[string]any, toolCtx tool.Context, fResult map[string]any, fErr error) (map[string]any, error)
Executes all after tool callbacks in order; returns first non-nil result or nil.
Event Merging Utilities
func mergeParallelFunctionResponseEvents(events []*session.Event) (*session.Event, error)
Merges multiple function response events into one by concatenating content parts and merging event actions.
Returns:
Merged event or error.
func mergeEventActions(base, other *session.EventActions) *session.EventActions
Combines two EventActions with a "last one wins" approach, merging flags and state delta.
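A "last one wins" merge can be sketched as below. Actions is a simplified stand-in for session.EventActions, and its field names (TransferToAgent, Escalate, StateDelta) are assumptions chosen for illustration. Unlike a merge that mutates base in place, this sketch returns a fresh value.

```go
package main

import "fmt"

// Actions is a simplified stand-in for session.EventActions; the field names
// are assumptions.
type Actions struct {
	TransferToAgent string
	Escalate        bool
	StateDelta      map[string]any
}

// mergeActions combines base and other into a new value. Later values win:
// a non-empty transfer target in other replaces base's, a set flag stays set,
// and state delta keys from other overwrite matching keys from base.
func mergeActions(base, other *Actions) *Actions {
	merged := &Actions{StateDelta: map[string]any{}}
	for _, a := range []*Actions{base, other} {
		if a == nil {
			continue
		}
		if a.TransferToAgent != "" {
			merged.TransferToAgent = a.TransferToAgent
		}
		if a.Escalate {
			merged.Escalate = true
		}
		for k, v := range a.StateDelta {
			merged.StateDelta[k] = v
		}
	}
	return merged
}

func main() {
	a := &Actions{StateDelta: map[string]any{"step": 1, "user": "ada"}}
	b := &Actions{TransferToAgent: "billing", StateDelta: map[string]any{"step": 2}}
	m := mergeActions(a, b)
	fmt.Println(m.TransferToAgent, m.StateDelta["step"], m.StateDelta["user"])
}
```

With parallel tool calls, this policy means that if two tools write the same state key, the value from the later event is the one that survives.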
Important Implementation Details
Streaming Support:
Run and runOneStep support streaming LLM responses, yielding events incrementally.
Agent Transfer Handling:
When a response contains an action to transfer control to another agent, the flow resolves and runs the new agent, yielding its events.
Tool Execution:
Tools are invoked based on function calls embedded in the LLM response content, allowing the flow to handle complex workflows involving external functions.
Callback Hooks:
Extensive use of before/after callbacks for both model and tool execution provides flexible extension points for custom logic.
Telemetry Integration:
Telemetry spans are created for LLM calls and tool executions, supporting observability and performance tracing.
Function Call ID Population:
Ensures consistent function call IDs for tracking and matching requests/responses.
Interaction with Other System Components
Interacts with LLM models via the model.LLM interface to generate content (LLM Integration and Agents).
Uses agent invocation context (agent.InvocationContext) to manage lifecycle and access agent metadata (Agent Invocation Context).
Integrates with tools implementing tool.Tool and toolinternal.FunctionTool interfaces from the tooling system (Tooling System).
Produces session events (session.Event) consumed by the session management and agent execution runner (Session Management, Agent Execution Runner).
Employs telemetry for tracing calls (Telemetry and Observability).
Supports agent-to-agent transfers, relying on the agent tree and parent mappings (Agent Lifecycle and Callbacks).
Usage Example Snippet
flow := &Flow{
	Model:              myLLMModel,
	RequestProcessors:  DefaultRequestProcessors,
	ResponseProcessors: DefaultResponseProcessors,
}
ctx := myInvocationContext
for event, err := range flow.Run(ctx) {
	if err != nil {
		log.Fatal(err)
	}
	// Process session event, e.g., send to client or log
	handleEvent(event)
}
Diagram: Flow Structure and Main Method Relationships
classDiagram
class Flow {
+Model
+RequestProcessors
+ResponseProcessors
+BeforeModelCallbacks
+AfterModelCallbacks
+BeforeToolCallbacks
+AfterToolCallbacks
+Run()
+runOneStep()
+preprocess()
+callLLM()
+postprocess()
+handleFunctionCalls()
+callTool()
+invokeBeforeToolCallbacks()
+invokeAfterToolCallbacks()
+agentToRun()
+finalizeModelResponseEvent()
}
Flow --> "[]func" : RequestProcessors
Flow --> "[]func" : ResponseProcessors
Flow --> "[]BeforeModelCallback"
Flow --> "[]AfterModelCallback"
Flow --> "[]BeforeToolCallback"
Flow --> "[]AfterToolCallback"
Flow : Run() --> runOneStep()
Flow : runOneStep() --> preprocess()
Flow : runOneStep() --> callLLM()
Flow : runOneStep() --> postprocess()
Flow : runOneStep() --> handleFunctionCalls()
Flow : handleFunctionCalls() --> callTool()
Flow : callTool() --> invokeBeforeToolCallbacks()
Flow : callTool() --> invokeAfterToolCallbacks()
Flow : runOneStep() --> agentToRun()
Flow : runOneStep() --> finalizeModelResponseEvent()
This documentation captures the core design, methods, and interactions of the base_flow.go file, focusing on the flow orchestration of LLM requests, response handling, tool execution, and agent transitions within the agent framework. For concepts such as agent invocation context, telemetry, and tooling interfaces, see the respective topics referenced.