Executor and Server
Purpose
Within the broader scope of the Remote Agent Communication (A2A) topic, the Executor and Server subcomponent addresses the critical need to execute AI agents remotely while managing the lifecycle of their tasks, including event handling, queuing, and cancellation. This subtopic exists to bridge the gap between incoming A2A protocol requests and the internal agent execution framework, ensuring that distributed multi-agent workflows operate reliably and report progress/status correctly.
Specifically, this subtopic solves the problem of reliably invoking agents in response to A2A requests, transforming agent session events into A2A-compatible event streams, handling task states, and supporting cancellation semantics. It ensures remote tasks are tracked and that their results or failures propagate correctly through the event queue system.
Functionality
Core Responsibilities
Agent Execution: The Executor invokes the internal ADK agent runner (
runner.Runner) with the provided request context and input content, effectively running the remote agent logic.Event Translation and Streaming: It continuously converts internal session events generated by the agent into A2A protocol events, writing them to an event queue for downstream consumption.
Task Lifecycle Management: The Executor manages task state transitions by emitting standardized task status update events (e.g.,
Submitted,Working,Completed,Failed,Canceled).Session Preparation: Before execution, it ensures that the session context exists or creates a new one, enabling stateful agent interactions.
Cancellation Handling: Supports graceful task cancellation by emitting a
Canceledstatus update event upon request.
Execution Flow
Session and Task Initialization:
When a new A2A execute request arrives without an existing stored task, the Executor writes a
TaskStateSubmittedevent to the queue.It prepares or creates the session context to ensure agent state can be maintained.
Agent Invocation:
The Executor creates a new
runner.Runnerinstance configured to run the agent.It sends a
TaskStateWorkingevent signaling the start of agent processing.The agent is executed via runner.Runner.Run, which produces a stream of internal session events.
Event Processing and Translation:
An internal
eventProcessorreceives session events and transforms them into A2A events (e.g., artifact updates, status updates).Translated events are asynchronously written to the event queue, enabling clients to consume real-time task progress.
Terminal Event Emission:
After agent run completion, terminal events indicating success, failure, or partial completion are sent.
If the agent run fails or encounters errors, corresponding failure events are emitted.
Cancellation:
Upon cancellation request, the Executor immediately writes a
TaskStateCanceledevent to the queue.
Key Methods
Execute(ctx, reqCtx, queue) error: Orchestrates the entire remote agent execution flow, from session preparation to event streaming.process(ctx, runner, processor, content, queue) error: Internal method driving the iterative processing of agent events and queue writing.prepareSession(ctx, meta) error: Ensures the existence of a session for the agent invocation.Cancel(ctx, reqCtx, queue) error: Handles cancellation requests by updating task status.
Code Snippet Illustrating Execution Start
func (e *Executor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error {
// Prepare runner and session
r, err := runner.New(e.config.RunnerConfig)
if err != nil { return err }
if err := e.prepareSession(ctx, invocationMeta); err != nil {
// Emit failure event if session prep fails
queue.Write(ctx, toTaskFailedUpdateEvent(reqCtx, err, invocationMeta.eventMeta))
return nil
}
// Emit TaskStateWorking event
queue.Write(ctx, a2a.NewStatusUpdateEvent(reqCtx, a2a.TaskStateWorking, nil))
// Run agent and process events
return e.process(ctx, r, processor, content, queue)
}
Integration
The Executor and Server subtopic is a vital component within the Remote Agent Communication (A2A) topic, acting as the runtime engine that executes remote agents and manages their event lifecycle. It depends on:
The runner package to instantiate and execute agents in-process.
The session package to manage session state persistence.
The a2a and a2asrv packages to interface with the Agent-To-Agent protocol and server framework.
The eventqueue package to asynchronously stream task events to consumers.
This subtopic complements the other A2A subtopics:
A2A Agent Implementation: Provides the remote agent interface and communication protocol.
Event Conversion and Processing: Handles detailed translation of session events to A2A events, which the
Executorleverages via aneventProcessor.Agent Skill Building: Supplies skill descriptors for remote agents, which the Executor runs.
In the broader context, the Executor enables distributed multi-agent workflows by executing remote requests and reliably reporting progress, thereby integrating with session management (Session Management), agent execution coordination (Agent Execution Runner), and telemetry (Telemetry and Observability).
Diagram
sequenceDiagram
participant Client as A2A Client
participant Executor as Executor
participant Runner as runner.Runner
participant Processor as eventProcessor
participant Queue as eventqueue.Queue
Client->>Executor: Execute(request)
Executor->>Executor: prepareSession()
Executor->>Queue: Write(TaskStateSubmitted) [if new task]
Executor->>Queue: Write(TaskStateWorking)
Executor->>Runner: Run(agent input)
loop For each session.Event
Runner-->>Executor: session.Event
Executor->>Processor: process(session.Event)
Processor-->>Executor: a2a.Event
Executor->>Queue: Write(a2a.Event)
end
Executor->>Queue: Write(Terminal events)
Client->>Executor: Cancel(request)
Executor->>Queue: Write(TaskStateCanceled)
This sequence diagram demonstrates the Executor's orchestration of remote agent execution, event translation, and event queue management, including handling cancellation requests.