executor.go
Overview
The executor.go file implements the Executor component responsible for executing AI agents in response to Agent-To-Agent (A2A) protocol requests. It acts as a server-side bridge that:
Accepts incoming A2A messages representing agent tasks.
Converts these messages into internal agent content (genai.Content).
Instantiates and runs an ADK agent runner (runner.Runner) with the provided input.
Manages session lifecycle, ensuring sessions exist before execution.
Translates internal session events into A2A events, streaming them asynchronously to the client via an event queue.
Emits task status updates reflecting the task lifecycle (submitted, working, completed, failed, canceled).
Handles cancellation requests by updating task status accordingly.
The Executor thus enables remote execution of AI agents, managing the entire task lifecycle and event streaming as part of the Remote Agent Communication (A2A) infrastructure.
Types and Interfaces
ExecutorConfig
Holds the mandatory dependencies needed by the Executor to run:
RunnerConfig runner.Config
Configuration used to instantiate the agent runner (runner.New) during execution.
RunConfig agent.RunConfig
Configuration passed to the runner's Run method when executing the agent.
Executor
The main struct implementing the a2asrv.AgentExecutor interface. It orchestrates the execution of ADK agents and streaming of events according to A2A protocol semantics.
Field:
config ExecutorConfig: Holds the configuration dependencies.
Functions and Methods
NewExecutor(config ExecutorConfig) *Executor
Creates and returns a new initialized Executor instance.
Parameters:
config ExecutorConfig: Configuration containing runner and run settings.
Returns:
Pointer to a new Executor.
Usage Example:
// runnerCfg (runner.Config) and runCfg (agent.RunConfig) are assumed to be built elsewhere.
executor := NewExecutor(ExecutorConfig{
	RunnerConfig: runnerCfg,
	RunConfig:    runCfg,
})
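The types and constructor described above might be laid out roughly as follows. This is a minimal sketch, not the actual source: `RunnerConfig` and `RunConfig` are local stand-ins for `runner.Config` and `agent.RunConfig`, and their fields (`AppName`, `Streaming`) are invented purely for illustration.

```go
package main

// RunnerConfig is a stand-in for runner.Config; the field is hypothetical.
type RunnerConfig struct {
	AppName string
}

// RunConfig is a stand-in for agent.RunConfig; the field is hypothetical.
type RunConfig struct {
	Streaming bool
}

// ExecutorConfig groups the two dependencies the Executor needs.
type ExecutorConfig struct {
	RunnerConfig RunnerConfig // used to build a runner for each execution
	RunConfig    RunConfig    // forwarded to the runner's Run call
}

// Executor keeps its configuration in a single unexported field.
type Executor struct {
	config ExecutorConfig
}

// NewExecutor returns an Executor holding a copy of config.
func NewExecutor(config ExecutorConfig) *Executor {
	return &Executor{config: config}
}
```

Passing the configuration by value means later changes to the caller's copy do not affect an Executor that has already been built.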
(e *Executor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error
Primary method invoked to execute an agent task in response to an A2A request.
Parameters:
ctx context.Context: Execution context for cancellation and deadlines.
reqCtx *a2asrv.RequestContext: Contains request message, stored task metadata, and other invocation details.
queue eventqueue.Queue: Event queue to which A2A events are written for downstream consumption.
Returns:
error if execution or event streaming fails.
Detailed Behavior:
Validates that the incoming A2A message (reqCtx.Message) is present; otherwise, returns an error.
Converts the A2A message to internal genai.Content using toGenAIContent.
Creates a new agent runner.Runner instance using the configured RunnerConfig.
If the request does not reference an existing stored task, writes a TaskStateSubmitted status update event to the queue.
Prepares the session by ensuring it exists or creating it (prepareSession).
Writes a TaskStateWorking status update event to indicate the agent execution is starting.
Instantiates an eventProcessor to convert session events to A2A events.
Invokes the internal process method to run the agent and stream events.
(e *Executor) Cancel(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error
Handles cancellation requests for running tasks.
Writes a TaskStateCanceled status update event to the event queue.
Returns any error encountered when writing the cancel event.
(e *Executor) process(ctx context.Context, r *runner.Runner, processor *eventProcessor, content *genai.Content, q eventqueue.Queue) error
Internal helper method that runs the agent and processes its output events.
Parameters:
ctx: Execution context.
r *runner.Runner: Runner instance executing the agent.
processor *eventProcessor: Converts session events into A2A events.
content *genai.Content: Agent input content.
q eventqueue.Queue: Queue to write A2A events.
Behavior:
Calls r.Run to obtain a stream of (session.Event, error) pairs.
For each event:
    If an error occurs, emits a task failure event and terminates.
    Converts the session event to an A2A event using the processor.
    Writes the A2A event to the queue if not nil.
After event stream ends, writes terminal events like completion or failure status.
Returns any errors encountered during event writing.
(e *Executor) prepareSession(ctx context.Context, meta invocationMeta) error
Ensures the session exists before agent execution.
Parameters:
ctx: Execution context.
meta invocationMeta: Metadata containing user ID, session ID, and event metadata.
Behavior:
Queries the session service for an existing session using Get.
If the session does not exist, calls Create to initialize a new session with empty state.
Returns an error if session creation fails.
Important Implementation Details and Algorithms
Task Status Updates:
The Executor emits status update events to reflect the task lifecycle:
TaskStateSubmitted when a new task is first received.
TaskStateWorking just before agent execution starts.
TaskStateCompleted, TaskStateFailed, or TaskStateInputRequired after execution ends, depending on agent output.
TaskStateCanceled on cancellation requests.
Session Preparation:
The executor ensures a session context exists by querying the session service. If missing, it creates a new session to maintain state persistence for the agent run.
Event Streaming and Translation:
The executor relies on an internal eventProcessor to convert ADK session events produced by the agent into A2A events. It streams these incrementally to the client through the provided event queue.
Error Handling:
Execution errors and event processing failures are converted into terminal task failure events, ensuring clients receive clear failure states.
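One way to picture the choice between the three post-execution states is the sketch below. The text does not say how the decision is actually made, so this is purely illustrative: the `runOutcome` struct, its fields, the precedence of the checks, and the `errAgent` example value are all assumptions.

```go
package main

import "errors"

// TaskState names a lifecycle state; the string values are illustrative.
type TaskState string

const (
	TaskStateCompleted     TaskState = "completed"
	TaskStateFailed        TaskState = "failed"
	TaskStateInputRequired TaskState = "input-required"
)

// errAgent is an example error value used to demonstrate the failed path.
var errAgent = errors.New("agent error")

// runOutcome summarizes what was observed while draining the agent's events.
// Both fields are hypothetical.
type runOutcome struct {
	Err        error // non-nil if execution or event processing failed
	NeedsInput bool  // true if the agent is waiting for further input
}

// terminalState picks the final status: failure wins, then input-required, then completed.
func terminalState(o runOutcome) TaskState {
	switch {
	case o.Err != nil:
		return TaskStateFailed
	case o.NeedsInput:
		return TaskStateInputRequired
	default:
		return TaskStateCompleted
	}
}
```

Checking the error first means a run that both failed and asked for input is reported as failed, which matches the stated goal of giving clients a clear failure state.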
Interaction with Other System Components
Agent Runner (runner.Runner):
The Executor creates and invokes the runner.Runner to run the ADK agent logic.
Session Service (session.Service):
Used to retrieve or create sessions that maintain state and event history for agents.
Event Queue (eventqueue.Queue):
A queue interface used to asynchronously send A2A events (task status, artifact updates) back to the requester.
A2A Protocol Types (a2a, a2asrv):
Utilizes A2A message and event types to comply with remote agent communication standards.
Event Processing (eventProcessor):
Converts session events into A2A events, managing artifacts, append flags, and terminal state translation.
The Executor thus acts as the execution engine bridging incoming A2A requests to local agent execution and streaming back A2A-compatible events.
Mermaid Diagram: Executor Structure and Workflow
sequenceDiagram
participant Client as A2A Client
participant Executor as Executor
participant Runner as runner.Runner
participant Processor as eventProcessor
participant Queue as eventqueue.Queue
Client->>Executor: Execute(request)
Executor->>Queue: Write(TaskStateSubmitted) [if new task]
Executor->>Executor: prepareSession()
Executor->>Queue: Write(TaskStateWorking)
Executor->>Runner: Run(agent input)
loop For each session.Event
Runner-->>Executor: session.Event
Executor->>Processor: process(session.Event)
Processor-->>Executor: a2a.Event
Executor->>Queue: Write(a2a.Event)
end
Executor->>Queue: Write(Terminal events)
Client->>Executor: Cancel(request)
Executor->>Queue: Write(TaskStateCanceled)
Summary of Main Entities
| Entity | Description |
|---|---|
| Executor | Implements A2A task execution, event streaming, session management. |
| ExecutorConfig | Holds configurations for runner creation and execution. |
| Execute | Orchestrates agent execution lifecycle and event streaming. |
| Cancel | Handles task cancellation by sending cancel status event. |
| process | Runs the agent and translates session events to A2A events. |
| prepareSession | Ensures session exists before running the agent. |
| runner.Runner | Underlying component that actually runs the agent logic. |
| eventqueue.Queue | Interface to send A2A events asynchronously to the client. |
| eventProcessor | Converts internal session events to A2A events during execution. |
Usage Context
The executor.go file is part of the Remote Agent Communication (A2A) system. It implements the server-side executor that runs local ADK agents in response to remote A2A requests. It is tightly coupled with the agent runner infrastructure (Agent Execution Runner) and event processing (Event Conversion and Processing) to enable streaming, stateful, and cancellable remote agent execution. This file is central to supporting distributed multi-agent workflows where agents can be invoked remotely and report progress asynchronously via the Agent-To-Agent protocol.
For detailed event translation and agent invocation context, refer to the related topics Event Conversion and Processing and Agent Execution Runner.