processor.go

Overview

The processor.go file implements the core logic for converting and processing ADK session events generated by agents into Agent-To-Agent (A2A) protocol events. This includes transforming incremental LLM response parts into A2A artifact update events, managing terminal task states (such as failed, input required, and completed), and embedding escalation or transfer actions into the final event metadata.

This file defines the eventProcessor type, which holds the state and methods needed to process the stream of session events from a single agent invocation and to produce A2A events for partial outputs and the final task status update. It sits in the event conversion and processing pipeline of the Remote Agent Communication (A2A) subsystem, translating session-level agent events into standardized A2A protocol events for communication over the network.


Types and Functions

type eventProcessor

The eventProcessor struct is responsible for processing ADK session events and converting them into A2A task artifact update events and task status update events. It maintains internal state to track terminal conditions and the artifact ID used for streaming partial results.

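Fields

reqCtx (RequestContext): the request context passed to newEventProcessor.
meta (invocationMeta): the invocation metadata passed to newEventProcessor.
terminalActions (EventActions): the escalation or transfer actions to embed in the final event metadata.
responseID (ArtifactID): the artifact ID used for streaming partial results.
terminalEvents (map from TaskState to TaskStatusUpdateEvent): the terminal status events recorded while processing.
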
func newEventProcessor(reqCtx *a2asrv.RequestContext, meta invocationMeta) *eventProcessor

Creates a new eventProcessor instance initialized with the given request context and invocation metadata.

processor := newEventProcessor(requestContext, invocationMetadata)

func (p *eventProcessor) process(ctx context.Context, event *session.Event) (*a2a.TaskArtifactUpdateEvent, error)

Processes a single ADK session event, converting it into an A2A TaskArtifactUpdateEvent that represents the incremental output parts from the event's LLM response. It also updates internal terminal event tracking based on error codes or input-required conditions.

artifactUpdate, err := processor.process(ctx, sessionEvent)
if err != nil {
    // handle error
}
if artifactUpdate != nil {
    // send artifact update event over A2A protocol
}
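
The reuse of a single artifact ID across partial results can be illustrated with a minimal sketch. It does not use the real a2a or session packages: artifactUpdate, streamer, and newID are hypothetical stand-ins, and the choice to mark later chunks with an Append flag is an assumption about how streaming might be modeled, not a description of the actual implementation.

```go
package main

import "fmt"

// artifactUpdate is a hypothetical, simplified stand-in for
// a2a.TaskArtifactUpdateEvent.
type artifactUpdate struct {
	ArtifactID string
	Parts      []string
	Append     bool
}

// streamer keeps the artifact ID shared by all partial results of one
// invocation (the role the docs give to the responseID field).
type streamer struct {
	artifactID string
	newID      func() string // ID generator, injected so the sketch is deterministic
}

// next converts one batch of parts into an update. It returns nil when
// there is nothing to send, which is why callers check for a nil result.
func (s *streamer) next(parts []string) *artifactUpdate {
	if len(parts) == 0 {
		return nil
	}
	update := &artifactUpdate{Parts: parts}
	if s.artifactID == "" {
		// First chunk: allocate the artifact ID.
		s.artifactID = s.newID()
	} else {
		// Later chunks: append to the artifact created earlier (assumption).
		update.Append = true
	}
	update.ArtifactID = s.artifactID
	return update
}

func main() {
	s := &streamer{newID: func() string { return "artifact-1" }}
	for _, chunk := range [][]string{{"Hel"}, {}, {"lo"}} {
		if u := s.next(chunk); u != nil {
			fmt.Println(u.ArtifactID, u.Append, u.Parts)
		}
	}
}
```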

func (p *eventProcessor) makeTerminalEvents() []a2a.Event

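Generates the terminal A2A events to be sent after all session events are processed. As described in the overview, these carry the terminal task state (such as failed, input required, or completed) and embed any escalation or transfer actions in the final event's metadata.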

terminalEvents := processor.makeTerminalEvents()
for _, ev := range terminalEvents {
    sendEvent(ev)
}
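
One way to picture the selection of a terminal status is the sketch below. Everything in it is an assumption made for illustration: the types are local stand-ins rather than the a2a package, the priority order (failed, then input required, then completed) is a guess, and the metadata keys "escalate" and "transfer_to_agent" are hypothetical names.

```go
package main

import "fmt"

type taskState string

const (
	stateFailed        taskState = "failed"
	stateInputRequired taskState = "input-required"
	stateCompleted     taskState = "completed"
)

// statusEvent is a simplified stand-in for a2a.TaskStatusUpdateEvent.
type statusEvent struct {
	State    taskState
	Final    bool
	Metadata map[string]any
}

// terminalActions mirrors the escalation and transfer flags tracked
// during processing.
type terminalActions struct {
	Escalate        bool
	TransferToAgent string
}

// pickTerminalEvent chooses the recorded terminal event with the highest
// assumed priority, falls back to "completed", and embeds the actions
// in the event metadata.
func pickTerminalEvent(recorded map[taskState]*statusEvent, actions terminalActions) *statusEvent {
	var ev *statusEvent
	for _, s := range []taskState{stateFailed, stateInputRequired} {
		if e := recorded[s]; e != nil {
			ev = e
			break
		}
	}
	if ev == nil {
		ev = &statusEvent{State: stateCompleted}
	}
	ev.Final = true
	if ev.Metadata == nil {
		ev.Metadata = map[string]any{}
	}
	if actions.Escalate {
		ev.Metadata["escalate"] = true // hypothetical key
	}
	if actions.TransferToAgent != "" {
		ev.Metadata["transfer_to_agent"] = actions.TransferToAgent // hypothetical key
	}
	return ev
}

func main() {
	recorded := map[taskState]*statusEvent{
		stateInputRequired: {State: stateInputRequired},
	}
	ev := pickTerminalEvent(recorded, terminalActions{Escalate: true})
	fmt.Println(ev.State, ev.Final, ev.Metadata["escalate"])
}
```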

func (p *eventProcessor) makeTaskFailedEvent(cause error, event *session.Event) *a2a.TaskStatusUpdateEvent

Creates an A2A task failed status update event from an error cause and optionally enriches it with metadata derived from the associated session event.

failEvent := processor.makeTaskFailedEvent(err, sessionEvent)

func (p *eventProcessor) updateTerminalActions(event *session.Event)

Updates the internal terminalActions flags based on the provided event's actions, such as escalation or transfer, so that they can be embedded in the final event metadata.


func toTaskFailedUpdateEvent(task a2a.TaskInfoProvider, cause error, meta map[string]any) *a2a.TaskStatusUpdateEvent

Helper function creating a terminal failed status update event given a task provider, error cause, and metadata.


func isInputRequired(event *session.Event, parts []*genai.Part) bool

Determines if the event requires user input based on whether any of the content parts correspond to long-running function calls specified in event.LongRunningToolIDs.
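
The check described above can be sketched as follows. The part, functionCall, and event types are simplified local stand-ins for the genai and session types, and matching on the function call's ID field is an assumption about how parts are correlated with LongRunningToolIDs.

```go
package main

import "fmt"

// functionCall and part are hypothetical, simplified stand-ins for the
// genai types.
type functionCall struct {
	ID   string
	Name string
}

type part struct {
	Text         string
	FunctionCall *functionCall
}

// event is a stand-in for session.Event; only LongRunningToolIDs is modeled.
type event struct {
	LongRunningToolIDs []string
}

// inputRequired reports whether any part is a function call whose ID is
// listed in ev.LongRunningToolIDs.
func inputRequired(ev *event, parts []*part) bool {
	if ev == nil || len(ev.LongRunningToolIDs) == 0 {
		return false
	}
	// Build a set for constant-time lookups.
	longRunning := make(map[string]struct{}, len(ev.LongRunningToolIDs))
	for _, id := range ev.LongRunningToolIDs {
		longRunning[id] = struct{}{}
	}
	for _, p := range parts {
		if p == nil || p.FunctionCall == nil {
			continue // text-only parts never require input
		}
		if _, ok := longRunning[p.FunctionCall.ID]; ok {
			return true
		}
	}
	return false
}

func main() {
	ev := &event{LongRunningToolIDs: []string{"call-2"}}
	parts := []*part{
		{Text: "working on it"},
		{FunctionCall: &functionCall{ID: "call-2", Name: "request_approval"}},
	}
	fmt.Println(inputRequired(ev, parts)) // prints: true
}
```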


func errorFromResponse(resp *model.LLMResponse) error

Constructs an error from an LLM response's error message field.
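
As a rough illustration, the conversion could look like the sketch below. The llmResponse type is a hypothetical stand-in for model.LLMResponse with only an ErrorMessage field, and returning nil when that field is empty is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// llmResponse is a hypothetical stand-in for model.LLMResponse.
type llmResponse struct {
	ErrorMessage string
}

// responseError turns the response's error message into a Go error.
// It returns nil when there is no response or no message (assumption).
func responseError(resp *llmResponse) error {
	if resp == nil || resp.ErrorMessage == "" {
		return nil
	}
	return errors.New(resp.ErrorMessage)
}

func main() {
	fmt.Println(responseError(&llmResponse{ErrorMessage: "quota exceeded"}))
	fmt.Println(responseError(&llmResponse{}) == nil)
}
```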


Important Implementation Details


Interaction with Other System Components


Visual Diagram

classDiagram
class eventProcessor {
-reqCtx: RequestContext
-meta: invocationMeta
-terminalActions: EventActions
-responseID: ArtifactID
-terminalEvents: map<TaskState, TaskStatusUpdateEvent>
+process(ctx, event): *TaskArtifactUpdateEvent
+makeTerminalEvents(): []Event
+makeTaskFailedEvent(cause, event): *TaskStatusUpdateEvent
+updateTerminalActions(event)
}

For the overall protocol and event translation context, see Remote Agent Communication (A2A); for related conversion functions and event stream management, see Event Conversion and Processing.