processor.go
Overview
The processor.go file implements the core logic for converting ADK session events generated by agents into Agent-to-Agent (A2A) protocol events. It transforms incremental LLM response parts into A2A artifact update events, tracks terminal task states (failed, input required, and completed), and embeds escalation or transfer actions into the metadata of the final event.
The file defines the eventProcessor type, which holds the state and methods needed to process a stream of session events during an agent invocation and to produce A2A events for partial outputs and the final task status. It is part of the event conversion and processing pipeline of the Remote Agent Communication (A2A) subsystem, translating session-level agent events into standardized A2A protocol events for transmission over the network.
Types and Functions
type eventProcessor
The eventProcessor struct is responsible for processing ADK session events and converting them into A2A task artifact update events and task status update events. It maintains internal state to track terminal conditions and the artifact ID used for streaming partial results.
Fields
reqCtx *a2asrv.RequestContext
The request context containing metadata and configuration for the current A2A request.
meta invocationMeta
Metadata related to the invocation, used for enriching event metadata.
terminalActions session.EventActions
Tracks terminal actions such as escalation and agent transfer accumulated from processed events, which are embedded in terminal event metadata.
responseID a2a.ArtifactID
The artifact ID assigned when the first artifact update event is sent; used for subsequent artifact updates.
terminalEvents map[a2a.TaskState]*a2a.TaskStatusUpdateEvent
Holds terminal events (failed or input required) postponed until the entire ADK response is saved as an artifact. The highest-priority event is sent as the final task status update.
func newEventProcessor(reqCtx *a2asrv.RequestContext, meta invocationMeta) *eventProcessor
Creates a new eventProcessor instance initialized with the given request context and invocation metadata.
Parameters:
reqCtx: The A2A request context.
meta: Invocation metadata for event enrichment.
Returns:
A pointer to a new eventProcessor ready to process session events.
Usage Example:
processor := newEventProcessor(requestContext, invocationMetadata)
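The field list and constructor above can be pictured with a minimal, self-contained sketch. The types RequestContext, TaskStatusUpdateEvent, EventActions, and the contents of invocationMeta are simplified stand-ins for the a2asrv, a2a, and session package types, and the assumption that the constructor allocates the terminalEvents map is illustrative rather than confirmed by the source.

```go
package main

import "fmt"

// Stand-in types (hypothetical simplifications) so the sketch compiles alone.
type (
	ArtifactID string
	TaskState  string
)

type RequestContext struct{ TaskID, ContextID string }

type TaskStatusUpdateEvent struct {
	State TaskState
	Final bool
}

type EventActions struct {
	Escalate        bool
	TransferToAgent string
}

// invocationMeta's real fields are not documented here; eventMeta is a placeholder.
type invocationMeta struct{ eventMeta map[string]any }

// eventProcessor mirrors the five documented fields.
type eventProcessor struct {
	reqCtx          *RequestContext
	meta            invocationMeta
	terminalActions EventActions
	responseID      ArtifactID
	terminalEvents  map[TaskState]*TaskStatusUpdateEvent
}

// newEventProcessor is assumed to allocate the map so later writes do not panic.
func newEventProcessor(reqCtx *RequestContext, meta invocationMeta) *eventProcessor {
	return &eventProcessor{
		reqCtx:         reqCtx,
		meta:           meta,
		terminalEvents: make(map[TaskState]*TaskStatusUpdateEvent),
	}
}

func main() {
	p := newEventProcessor(&RequestContext{TaskID: "task-1"}, invocationMeta{})
	// No artifact has been started and no terminal event is recorded yet.
	fmt.Println(p.responseID == "", len(p.terminalEvents)) // prints "true 0"
}
```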
func (p *eventProcessor) process(ctx context.Context, event *session.Event) (*a2a.TaskArtifactUpdateEvent, error)
Processes a single ADK session event, converting it into an A2A TaskArtifactUpdateEvent that represents the incremental output parts from the event's LLM response. It also updates internal terminal event tracking based on error codes or input-required conditions.
Parameters:
ctx: The context for cancellation and deadlines (currently unused in this method).
event: The ADK session event to process.
Returns:
A pointer to an A2A TaskArtifactUpdateEvent containing new output parts to send, or nil if no output should be sent for this event.
An error if processing or conversion fails.
Details:
If the event is nil or contains no content parts, returns nil.
Updates terminal actions (escalate, transferToAgent) based on event actions.
Converts event metadata for inclusion in the result.
Detects error codes in the LLM response and records a failed terminal event if not already set.
Checks if input is required due to long-running tool function calls; if so, marks a terminal event of type input_required.
Converts content parts to A2A parts using ToA2AParts.
Creates an artifact event for the first response or an artifact update event for subsequent parts.
Enriches the event with metadata.
Usage Example:
artifactUpdate, err := processor.process(ctx, sessionEvent)
if err != nil {
	// handle error
}
if artifactUpdate != nil {
	// send artifact update event over A2A protocol
}
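The artifact-handling part of process (a new artifact for the first response, updates to the same artifact afterwards, nil when there is nothing to send) can be sketched in isolation. This is a simplified illustration, not the actual implementation: ArtifactUpdate, its Append flag, the string parts, and the nextID generator are hypothetical stand-ins, and error detection and metadata enrichment are left out.

```go
package main

import "fmt"

// ArtifactUpdate is a hypothetical, reduced stand-in for a2a.TaskArtifactUpdateEvent.
type ArtifactUpdate struct {
	ArtifactID string
	Parts      []string
	Append     bool // assumed flag: true when extending an existing artifact
}

type processor struct {
	responseID string
	nextID     func() string // hypothetical ID generator injected for the sketch
}

// process returns nil for empty input, creates the artifact on the first
// non-empty event, and reuses the stored ID for every later event.
func (p *processor) process(parts []string) *ArtifactUpdate {
	if len(parts) == 0 {
		return nil
	}
	if p.responseID == "" {
		p.responseID = p.nextID()
		return &ArtifactUpdate{ArtifactID: p.responseID, Parts: parts}
	}
	return &ArtifactUpdate{ArtifactID: p.responseID, Parts: parts, Append: true}
}

func main() {
	p := &processor{nextID: func() string { return "artifact-1" }}
	for _, parts := range [][]string{{"Hel"}, nil, {"lo"}} {
		if u := p.process(parts); u != nil {
			fmt.Printf("%s append=%t parts=%v\n", u.ArtifactID, u.Append, u.Parts)
		}
	}
	// prints:
	// artifact-1 append=false parts=[Hel]
	// artifact-1 append=true parts=[lo]
}
```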
func (p *eventProcessor) makeTerminalEvents() []a2a.Event
Generates the terminal A2A events to be sent after all session events are processed. This includes:
Closing the artifact stream with a last chunk event if a response ID exists.
Emitting the highest-priority terminal event (failed or input_required) if present, embedding terminal actions metadata.
If no terminal events exist, sending a completed status update event marked as final.
Returns:
A slice of A2A events representing the final task artifact closure and terminal task status.
Usage Example:
terminalEvents := processor.makeTerminalEvents()
for _, ev := range terminalEvents {
	sendEvent(ev)
}
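The selection logic described for makeTerminalEvents might look roughly like the sketch below. The Event struct and the metadata map are hypothetical stand-ins, and two points are assumptions the text does not confirm: that failed outranks input_required, and that the completed event also carries the actions metadata.

```go
package main

import "fmt"

type TaskState string

const (
	StateFailed        TaskState = "failed"
	StateInputRequired TaskState = "input_required"
	StateCompleted     TaskState = "completed"
)

// Event is a hypothetical flattened stand-in for the A2A event types.
type Event struct {
	Kind      string // "artifact-update" or "status-update"
	State     TaskState
	LastChunk bool
	Final     bool
	Metadata  map[string]any
}

// Assumed priority order: a failure wins over an input request.
var terminalPriority = []TaskState{StateFailed, StateInputRequired}

func makeTerminalEvents(responseID string, postponed map[TaskState]*Event, actionsMeta map[string]any) []Event {
	var out []Event
	if responseID != "" {
		// Close the artifact stream before reporting the final status.
		out = append(out, Event{Kind: "artifact-update", LastChunk: true})
	}
	for _, state := range terminalPriority {
		if ev, ok := postponed[state]; ok {
			chosen := *ev
			chosen.Metadata = actionsMeta
			return append(out, chosen)
		}
	}
	// No postponed terminal event: the task completed normally.
	return append(out, Event{Kind: "status-update", State: StateCompleted, Final: true, Metadata: actionsMeta})
}

func main() {
	postponed := map[TaskState]*Event{
		StateInputRequired: {Kind: "status-update", State: StateInputRequired, Final: true},
		StateFailed:        {Kind: "status-update", State: StateFailed, Final: true},
	}
	for _, ev := range makeTerminalEvents("artifact-1", postponed, map[string]any{"escalate": true}) {
		fmt.Printf("kind=%s state=%q lastChunk=%t\n", ev.Kind, ev.State, ev.LastChunk)
	}
}
```

Iterating over a fixed priority slice instead of ranging over the map keeps the result deterministic, since Go map iteration order is unspecified.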
func (p *eventProcessor) makeTaskFailedEvent(cause error, event *session.Event) *a2a.TaskStatusUpdateEvent
Creates an A2A task failed status update event from an error cause and optionally enriches it with metadata derived from the associated session event.
Parameters:
cause: The error cause for the failure.
event: The session event related to the failure, may be nil.
Returns:
A pointer to an A2A TaskStatusUpdateEvent with state failed, containing the error message and metadata.
Usage Example:
failEvent := processor.makeTaskFailedEvent(err, sessionEvent)
func (p *eventProcessor) updateTerminalActions(event *session.Event)
Updates the internal terminalActions flags based on the provided event's actions:
Sets Escalate to true if the event requests escalation.
Updates TransferToAgent if the event specifies a transfer target.
Parameters:
event: The session event whose actions are processed.
Usage: Called internally while processing events to track terminal actions for final event metadata.
func toTaskFailedUpdateEvent(task a2a.TaskInfoProvider, cause error, meta map[string]any) *a2a.TaskStatusUpdateEvent
Helper function creating a terminal failed status update event given a task provider, error cause, and metadata.
Parameters:
task: The task info provider (e.g., request context) for event context.
cause: The error to report.
meta: Metadata key-value map to attach to the event.
Returns:
A new TaskStatusUpdateEvent with the failed state and error message.
func isInputRequired(event *session.Event, parts []*genai.Part) bool
Determines if the event requires user input based on whether any of the content parts correspond to long-running function calls specified in event.LongRunningToolIDs.
Parameters:
event: The session event.
parts: The content parts of the LLM response.
Returns:
true if any part is a function call matching a long-running tool ID, indicating input is required.
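The check can be sketched as follows. Part, FunctionCall, and Event are reduced stand-ins for the genai and session types, and matching on the function call's ID field is an assumption about how parts are compared with LongRunningToolIDs.

```go
package main

import (
	"fmt"
	"slices"
)

// Hypothetical, reduced stand-ins for genai.FunctionCall, genai.Part and session.Event.
type FunctionCall struct{ ID, Name string }

type Part struct {
	Text         string
	FunctionCall *FunctionCall
}

type Event struct{ LongRunningToolIDs []string }

// isInputRequired reports whether any part is a function call whose ID is
// listed among the event's long-running tool IDs.
func isInputRequired(event *Event, parts []*Part) bool {
	if event == nil {
		return false
	}
	for _, part := range parts {
		if part == nil || part.FunctionCall == nil {
			continue // plain text and other non-call parts never require input
		}
		if slices.Contains(event.LongRunningToolIDs, part.FunctionCall.ID) {
			return true
		}
	}
	return false
}

func main() {
	event := &Event{LongRunningToolIDs: []string{"call-2"}}
	parts := []*Part{
		{Text: "Requesting approval..."},
		{FunctionCall: &FunctionCall{ID: "call-2", Name: "request_approval"}},
	}
	fmt.Println(isInputRequired(event, parts))     // prints "true"
	fmt.Println(isInputRequired(event, parts[:1])) // prints "false"
}
```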
func errorFromResponse(resp *model.LLMResponse) error
Constructs an error from an LLM response's error message field.
Parameters:
resp: The LLM response containing the error message.
Returns:
An error wrapping the LLM error description.
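As a rough illustration, the helper could be as small as the sketch below. LLMResponse here is a stand-in for model.LLMResponse with only the two fields the text mentions, and the exact wording of the error message is an assumption.

```go
package main

import "fmt"

// LLMResponse is a hypothetical, reduced stand-in for model.LLMResponse.
type LLMResponse struct {
	ErrorCode    string
	ErrorMessage string
}

// errorFromResponse turns the response's error message into a Go error.
// The "llm error response" prefix is illustrative, not the real format.
func errorFromResponse(resp *LLMResponse) error {
	return fmt.Errorf("llm error response: %s", resp.ErrorMessage)
}

func main() {
	resp := &LLMResponse{ErrorCode: "RESOURCE_EXHAUSTED", ErrorMessage: "quota exceeded"}
	// The caller decides when to build the error, here based on the error code.
	if resp.ErrorCode != "" {
		fmt.Println(errorFromResponse(resp)) // prints "llm error response: quota exceeded"
	}
}
```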
Important Implementation Details
Terminal Event Postponement:
Terminal events for failure or input required are stored in terminalEvents and only sent after all partial artifact updates are processed. This ensures the full response content is saved before signaling task completion or failure.
Artifact ID Management:
The responseID field is assigned when the first artifact event is created, enabling all subsequent updates to reference the same artifact stream.
Action Metadata Propagation:
Escalation and agent transfer actions are accumulated from session events and injected into the metadata of terminal events to ensure proper handling by the caller or remote agent.
Input Required Detection:
The processor detects "input required" states by checking for function call parts matching long-running tool IDs, marking the task as waiting for user input.
Error Handling in LLM Responses:
If the LLM response contains an error code, a failed terminal event is recorded immediately.
Content Conversion:
The processor relies on the ToA2AParts function (defined elsewhere) to convert between ADK genai.Part content parts and A2A parts for transport.
Interaction with Other System Components
Remote Agent Communication (A2A):
This file is part of the event conversion and processing layer that bridges the ADK session events and A2A protocol events, enabling remote agent interactions over gRPC/HTTP.
Session Management:
Processes session.Event objects produced by session-based agent executions.
Agent Execution Runner:
Utilizes the eventProcessor to convert incremental session events into A2A events during agent invocation.
Artifact Management:
Works with artifact IDs and partial updates to stream multi-part responses.
Event Conversion and Processing:
Complements other utilities in the same subtopic that convert events and parts bidirectionally.
Visual Diagram
classDiagram
class eventProcessor {
-reqCtx: RequestContext
-meta: invocationMeta
-terminalActions: EventActions
-responseID: ArtifactID
-terminalEvents: map<TaskState, TaskStatusUpdateEvent>
-process(ctx, event): *TaskArtifactUpdateEvent
-makeTerminalEvents(): []Event
-makeTaskFailedEvent(cause, event): *TaskStatusUpdateEvent
-updateTerminalActions(event)
}
This documentation references Remote Agent Communication (A2A) for the overall protocol and event translation context and Event Conversion and Processing for related conversion functions and event stream management.