agent.go
Overview
The agent.go file implements the ParallelAgent, a specialized workflow agent designed to run multiple sub-agents concurrently in isolated invocation contexts. This agent type is part of the broader Agent Workflow Management system (80558), providing an execution strategy where sub-agents operate in parallel, enabling diverse perspectives or simultaneous attempts on a given task.
The ParallelAgent is particularly useful for scenarios such as running different algorithms simultaneously or generating multiple independent responses for downstream evaluation. It manages concurrency, event aggregation, cancellation, and error propagation, exposing a unified event stream to callers.
Key Types and Functions
Config Struct
type Config struct {
	AgentConfig agent.Config
}
Purpose: Defines the configuration for creating a ParallelAgent.
Fields:
AgentConfig: Holds the base agent configuration from the core agent package (agent.Config), which includes agent metadata, sub-agents, and callback hooks.
Usage: Passed to New to instantiate a ParallelAgent with the specified configuration.
New(cfg Config) (agent.Agent, error)
Purpose: Factory function to create a new ParallelAgent instance.
Parameters:
cfg: The Config struct containing agent configuration.
Returns:
agent.Agent: The created ParallelAgent implementing the agent.Agent interface.
error: Non-nil if creation fails due to invalid config or internal conversion errors.
Behavior:
Validates that the user has not provided a custom Run implementation in AgentConfig.Run, as ParallelAgent requires its own run logic.
Sets the Run function to the internal run function defined in this file.
Calls agent.New with the configured AgentConfig to create a base agent.
Converts the returned agent to the internal agent interface to set internal state:
    Marks the agent type as TypeParallelAgent.
    Stores the provided config for introspection.
Returns the constructed parallel agent or an error if conversion fails.
Example:
cfg := parallelagent.Config{
	AgentConfig: agent.Config{
		Name:      "MyParallelAgent",
		SubAgents: []agent.Agent{subAgent1, subAgent2},
	},
}
parallelAgent, err := parallelagent.New(cfg)
if err != nil {
	// handle error
}
run(ctx agent.InvocationContext) iter.Seq2[*session.Event, error]
Purpose: Implements the core parallel execution logic for the agent, invoked during agent run.
Parameters:
ctx: The invocation context carrying session, memory, artifacts, and runtime info for this run.
Returns: An iterator function yielding session events and errors asynchronously.
Functionality:
Retrieves the current agent and its sub-agents.
Creates an errgroup.Group to manage concurrent goroutines with shared cancellation.
Prepares channels:
    resultsChan: to receive events and errors from sub-agent runs.
    doneChan: to signal cancellation to running sub-agents.
For each sub-agent:
    Creates a new invocation context with a unique Branch identifier for isolation and traceability.
    Launches a goroutine running runSubAgent with this context and the sub-agent.
Starts a goroutine to wait for all sub-agents to complete and closes resultsChan afterwards.
Returns an iterator function that:
    Reads from resultsChan.
    Yields events and errors to the caller until the channel is closed or the caller stops iteration.
    Closes doneChan to signal sub-agent goroutines to terminate if iteration ends early.
runSubAgent(ctx agent.InvocationContext, agent agent.Agent, results chan<- result, done <-chan bool) error
Purpose: Helper function that runs a single sub-agent and forwards its events/errors to a shared channel.
Parameters:
ctx: Invocation context for the sub-agent run.
agent: The sub-agent to execute.
results: Channel to send results (event + error).
done: Channel to listen for cancellation signals.
Returns: Error encountered during sub-agent execution or context cancellation.
Behavior:
Iterates over the event stream produced by agent.Run(ctx).
On each event/error tuple:
    Selects on multiple channels:
        If done is closed, immediately returns to stop processing.
        If the context is done (cancelled or timed out), sends the context error to results and returns.
        Otherwise, forwards the event and error to results.
    If an error is received from the sub-agent, returns immediately, propagating the error.
result Struct
type result struct {
	event *session.Event
	err   error
}
Purpose: Encapsulates a single event and associated error produced by a sub-agent run.
Fields:
event: Pointer to a session.Event generated by a sub-agent.
err: Error associated with the event or sub-agent execution.
Important Implementation Details
Isolated Invocation Contexts: Each sub-agent receives a distinct branch in the invocation context hierarchy. This isolation prevents state collisions and enables detailed tracing of sub-agent execution paths.
Concurrency Control: Utilizes the errgroup package for managing multiple goroutines, ensuring that cancellation or errors are coordinated properly among running sub-agents.
Event Streaming: Events from all sub-agents are multiplexed into a single channel and yielded asynchronously via an iterator pattern (iter.Seq2). This design supports incremental consumption and early termination.
Cancellation Handling: The doneChan channel allows the parent iterator to signal sub-agent goroutines to stop early (e.g., if the caller stops consuming events), improving resource cleanup.
Error Propagation: Errors from sub-agent runs are propagated upwards and included in the event stream, enabling transparent failure reporting.
No Custom Run Allowed: The New function enforces that users cannot override the Run method, preserving the integrity of the parallel execution behavior.
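To make the isolated invocation contexts point more concrete, here is a small sketch of deriving a distinct branch label for each sub-agent. The invocation type and childBranch helper are hypothetical, and the dotted "parent.child" label format is an assumption made for illustration; the text above states only that each sub-agent receives a unique Branch identifier.

```go
package main

import "fmt"

// invocation is a hypothetical, pared-down stand-in for agent.InvocationContext.
type invocation struct {
	Branch string
	Agent  string
}

// childBranch derives a per-sub-agent invocation with its own branch label.
// The dotted "parent.child" format is an assumption made for this sketch.
func childBranch(parent invocation, subAgent string) invocation {
	branch := parent.Agent + "." + subAgent
	if parent.Branch != "" {
		branch = parent.Branch + "." + branch
	}
	return invocation{Branch: branch, Agent: subAgent}
}

func main() {
	root := invocation{Agent: "MyParallelAgent"}
	for _, name := range []string{"subAgent1", "subAgent2"} {
		fmt.Println(childBranch(root, name).Branch)
	}
}
```

Because each sub-agent works on its own copy with a distinct label, events can be attributed to the branch that produced them.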
Interactions with Other System Components
Agent Interface: The ParallelAgent implements the agent.Agent interface, making it composable with other agents, including nested workflow agents such as sequential or loop agents.
Invocation Context (agent.InvocationContext): Manages session state, memory, artifacts, and branching, facilitating isolated execution and state tracking for each sub-agent.
Session Events (session.Event): Events generated by sub-agents are streamed back to the caller to represent incremental progress and results.
Agent Execution Runner (80560): Likely orchestrates the lifecycle of this agent, handling session persistence and cancellation coordination.
Artifact and Memory Services: Shared among sub-agents via context, allowing coordination on persistent or transient state while maintaining isolation.
Internal Agent State (agentinternal): Used to mark agent type and configure internal metadata for runtime identification and management.
Visual Diagram: ParallelAgent Structure
classDiagram
    class ParallelAgent {
        +Config cfg
        +New(cfg) agent.Agent
        -run(ctx) iter.Seq2
        -runSubAgent(ctx, agent, results, done) error
    }
    class Config {
        +AgentConfig agent.Config
    }
    class result {
        -event *session.Event
        -err error
    }
    ParallelAgent --> Config : uses
    ParallelAgent --> agent.Agent : implements
    ParallelAgent --> result : produces
Visual Diagram: ParallelAgent Execution Workflow
flowchart TD
    Start(("Start ParallelAgent Run"))
    Sub1["Sub-Agent 1 Run"]
    Sub2["Sub-Agent 2 Run"]
    SubN["Sub-Agent N Run"]
    ErrGroup["Error Group Wait"]
    Results["Collect Events"]
    Yield["Yield Events to Caller"]
    Done["Done / Cancel Signal"]
    Start -->|Create isolated contexts| Sub1
    Start --> Sub2
    Start --> SubN
    Sub1 --> Results
    Sub2 --> Results
    SubN --> Results
    ErrGroup -->|Wait for all sub-agents| Results
    Results --> Yield
    Yield --> Done
This flowchart illustrates the ParallelAgent's concurrent execution model:
The agent starts and creates isolated invocation contexts for each sub-agent.
Each sub-agent runs concurrently, sending events/errors into a shared results channel.
An error group waits for all sub-agents to complete.
The collected events are yielded asynchronously to the caller.
A done signal allows cancellation and cleanup.
References to Related Topics
Sub-agent execution context and lifecycle management are based on Agent Invocation Context and Agent Lifecycle and Callbacks.
The ParallelAgent is a core part of Agent Workflow Management and complements other workflow agents such as Sequential Agent and Loop Agent.
It interacts with session state and event streaming defined in Session Management.
The agent implementation integrates with the AI Agent Framework for agent composition and invocation.
Error handling and concurrency rely on standard Go concurrency patterns and the errgroup package.
ParallelAgent’s orchestration is compatible with the broader Agent Execution Runner that manages agent lifecycle and session state updates.
This documentation provides a detailed technical overview of the agent.go file responsible for implementing the ParallelAgent, describing its design, usage, and integration within the larger agent orchestration framework.