stream_aggregator.go
Overview
The stream_aggregator.go file defines the streamingResponseAggregator type and associated functions to aggregate partial streaming responses from a language model's streaming output. It processes incremental model outputs, merges partial text and thought content, and generates both intermediate and final aggregated responses. This functionality is crucial for handling streaming LLM responses that may arrive in multiple parts, enabling downstream consumers to receive coherent, accumulated content progressively or at the end of the stream.
Types and Functions
Type: streamingResponseAggregator
A struct that maintains internal state while aggregating partial streaming responses from a model. It keeps track of:
text: aggregated plain-text content.
thoughtText: aggregated "thought" content (parts whose Thought flag is set).
response: the most recent model response, used as the basis for aggregation and as the source of the aggregate's metadata.
role: the role associated with the content (e.g., the speaker or agent role).
This struct facilitates incremental assembly of partial content into meaningful aggregated responses.
Function: NewStreamingResponseAggregator() *streamingResponseAggregator
Creates and returns a new, initialized instance of streamingResponseAggregator. The aggregator starts empty, ready to process streaming responses.
Usage example:
```go
aggregator := NewStreamingResponseAggregator()
```
Method: (s *streamingResponseAggregator) ProcessResponse(ctx context.Context, genResp *genai.GenerateContentResponse) iter.Seq2[*model.LLMResponse, error]
Processes a single streaming response (genai.GenerateContentResponse) from the model and returns an iterator function that yields:
An aggregated intermediate response, when one is ready.
The individual response, converted to *model.LLMResponse.
Parameters:
ctx: The context for cancellation or deadlines.
genResp: The raw generated content response from the model's streaming API.
Returns:
An iterator function conforming to iter.Seq2[*model.LLMResponse, error]. If an aggregated intermediate response is available, it is yielded first, followed by the converted current response.
Behavior:
Validates that genResp has at least one candidate; yields an error if it has none.
Converts genResp to the internal LLMResponse.
Aggregates partial content using aggregateResponse.
Yields the aggregated response (if any) before yielding the current response.
Stops yielding as soon as the consumer signals to stop (its yield function returns false).
Usage example:
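```go
seq := aggregator.ProcessResponse(ctx, genResp)
// Call the iterator with a yield function; returning false stops iteration.
// A range loop over seq (Go 1.23+) works the same way, with break playing
// the role of return false.
seq(func(resp *model.LLMResponse, err error) bool {
	if err != nil {
		// handle error
		return false
	}
	// handle resp (an intermediate aggregate or the converted response)
	return true
})
```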
Method: (s *streamingResponseAggregator) aggregateResponse(llmResponse *model.LLMResponse) *model.LLMResponse
Aggregates the content of a single partial LLMResponse into the internal buffer and decides whether to produce an aggregated response event.
Parameters:
llmResponse: The current partial or full LLM response to process.
Returns:
A pointer to an aggregated LLMResponse if the aggregation criteria are met (e.g., end of stream or zero parts), or nil if no aggregated response is ready.
Implementation details:
Extracts the first content part (part0), if available.
Appends its text to the thought buffer or the plain-text buffer, depending on the part0.Thought flag.
Handles special cases such as empty parts, filtering out invalid responses.
Produces an aggregated response, combining the accumulated text and thought parts, when a response with zero parts or audio data arrives.
Marks partial responses with Partial = true to indicate streaming status.
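A minimal sketch of the per-chunk decision logic described above, again using hypothetical simplified stand-ins for genai.Part, genai.Content and model.LLMResponse. It routes text by the Thought flag, uses reflect.ValueOf(*part0).IsZero() to drop an all-empty part, and flushes only when a response arrives with no parts; the audio-data trigger is deliberately left out, so treat it as an illustration rather than the real method.

```go
package main

import "reflect"

// Hypothetical, simplified stand-ins for genai.Part, genai.Content and
// model.LLMResponse.
type Part struct {
	Text    string
	Thought bool
}

type Content struct {
	Role  string
	Parts []*Part
}

type LLMResponse struct {
	Content *Content
	Partial bool
}

type aggregator struct {
	text, thoughtText, role string
	response                *LLMResponse
}

// aggregateResponse buffers text chunks, filters all-empty parts, and flushes
// the buffers when a response with no parts arrives.
func (s *aggregator) aggregateResponse(r *LLMResponse) *LLMResponse {
	s.response = r
	var part0 *Part
	if r.Content != nil && len(r.Content.Parts) > 0 {
		part0 = r.Content.Parts[0]
		s.role = r.Content.Role
	}
	// Text chunk: route it by the Thought flag and mark it partial.
	if part0 != nil && part0.Text != "" {
		if part0.Thought {
			s.thoughtText += part0.Text
		} else {
			s.text += part0.Text
		}
		r.Partial = true
		return nil
	}
	// All-zero part (e.g. a trailing empty part): mark partial, emit nothing.
	if part0 != nil && reflect.ValueOf(*part0).IsZero() {
		r.Partial = true
		return nil
	}
	// No parts at all while text is buffered: flush into one aggregate.
	if part0 == nil && (s.text != "" || s.thoughtText != "") {
		return s.createAggregateResponse()
	}
	return nil
}

// createAggregateResponse emits thought text and plain text as separate parts
// and resets the buffers.
func (s *aggregator) createAggregateResponse() *LLMResponse {
	var parts []*Part
	if s.thoughtText != "" {
		parts = append(parts, &Part{Text: s.thoughtText, Thought: true})
	}
	if s.text != "" {
		parts = append(parts, &Part{Text: s.text})
	}
	out := &LLMResponse{Content: &Content{Role: s.role, Parts: parts}}
	s.text, s.thoughtText, s.role, s.response = "", "", "", nil
	return out
}
```

Call aggregateResponse once per chunk from a main or test function; a nil return means the chunk was absorbed into the buffers.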
Method: (s *streamingResponseAggregator) Close() *model.LLMResponse
Finalizes the aggregation by generating any remaining aggregated response after all partial responses have been processed.
Returns:
An aggregated LLMResponse containing all accumulated content, or nil if there is nothing to aggregate.
Usage:
Call this method after the streaming sequence ends to retrieve the final aggregated message.
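The end-of-stream case can be sketched as follows: if the stream ends without a zero-parts response, text is still buffered, and Close is what turns it into a final response. The types and the Process method here are hypothetical simplifications (text only, no mid-stream flush), not the file's API.

```go
package main

// Hypothetical, simplified stand-ins for genai.Part, genai.Content and
// model.LLMResponse.
type Part struct{ Text string }

type Content struct {
	Role  string
	Parts []*Part
}

type LLMResponse struct {
	Content *Content
	Partial bool
}

// aggregator keeps only a plain-text buffer; thought text is omitted here.
type aggregator struct {
	text, role string
}

// Process (hypothetical) buffers a text chunk and marks it partial. Nothing
// is flushed mid-stream in this sketch, so all text stays buffered until Close.
func (s *aggregator) Process(r *LLMResponse) *LLMResponse {
	if r.Content != nil && len(r.Content.Parts) > 0 && r.Content.Parts[0].Text != "" {
		s.text += r.Content.Parts[0].Text
		s.role = r.Content.Role
		r.Partial = true
	}
	return r
}

// Close returns the buffered text as one non-partial response, or nil if the
// buffer is empty, and resets the state either way.
func (s *aggregator) Close() *LLMResponse {
	if s.text == "" {
		return nil
	}
	out := &LLMResponse{Content: &Content{Role: s.role, Parts: []*Part{{Text: s.text}}}}
	s.text, s.role = "", ""
	return out
}
```

Calling Close a second time returns nil, because the first call already reset the buffer.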
Method: (s *streamingResponseAggregator) createAggregateResponse() *model.LLMResponse
Constructs an aggregated LLMResponse from the internal accumulated text and thoughtText.
Returns:
A new LLMResponse with combined genai.Part entries for thought and text, preserving metadata from the last processed response.
Returns nil if there is no content to aggregate.
Implementation details:
Creates parts for thought and text if present.
Resets internal state after aggregation via clear.
Method: (s *streamingResponseAggregator) clear()
Resets the internal state of the aggregator, clearing accumulated text, thoughtText, stored response, and role.
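A sketch of how createAggregateResponse and clear might fit together, under stated assumptions: the stand-in LLMResponse carries two hypothetical metadata fields (FinishReason, TotalTokens) in place of the real metadata, the thought part is placed before the text part, and state is reset whether or not anything was aggregated.

```go
package main

// Hypothetical stand-ins. FinishReason and TotalTokens stand in for the
// metadata the real model.LLMResponse carries.
type Part struct {
	Text    string
	Thought bool
}

type Content struct {
	Role  string
	Parts []*Part
}

type LLMResponse struct {
	Content      *Content
	FinishReason string // hypothetical metadata field
	TotalTokens  int    // hypothetical metadata field
}

type aggregator struct {
	text, thoughtText, role string
	response                *LLMResponse // last processed response
}

// createAggregateResponse builds one response from the buffers, copies
// metadata from the last processed response, and always resets state.
func (s *aggregator) createAggregateResponse() *LLMResponse {
	if (s.text == "" && s.thoughtText == "") || s.response == nil {
		s.clear()
		return nil // nothing to aggregate
	}
	var parts []*Part
	if s.thoughtText != "" {
		parts = append(parts, &Part{Text: s.thoughtText, Thought: true})
	}
	if s.text != "" {
		parts = append(parts, &Part{Text: s.text})
	}
	out := &LLMResponse{
		Content:      &Content{Role: s.role, Parts: parts},
		FinishReason: s.response.FinishReason,
		TotalTokens:  s.response.TotalTokens,
	}
	s.clear()
	return out
}

// clear resets every piece of accumulated state.
func (s *aggregator) clear() {
	s.text, s.thoughtText, s.role, s.response = "", "", "", nil
}
```

Resetting inside createAggregateResponse means each aggregate starts from empty buffers, so text is never emitted twice.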
Important Implementation Details and Algorithms
The aggregator maintains two separate buffers: one for normal text and one for "thought" text. Keeping them apart lets the aggregated response return thought content and regular text as separate parts.
Partial responses are marked with the Partial flag to inform consumers that the response is incomplete.
The aggregator also accounts for Gemini 3 streaming output, which may end with a response containing an empty part; that part is filtered out rather than emitted as content.
Aggregation triggers when a response with zero content parts or non-text (e.g., audio) data arrives, at which point the accumulated content is flushed as an aggregate response.
Uses reflection (reflect.ValueOf(*part0).IsZero()) to identify empty parts and filter out empty streaming events.
The design allows incremental yielding of both aggregated partial responses and individual model responses as part of a streaming consumption pipeline.
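The empty-part check relies on reflect.Value.IsZero, which reports whether every field of a struct holds its zero value. The sketch below uses a hypothetical stand-in Part and Blob (the real genai.Part has more fields) and a hypothetical helper isEmptyPart to show what counts as empty: nil pointers and nil slices are zero, but a non-nil empty slice is not.

```go
package main

import "reflect"

// Hypothetical stand-ins; the real genai types have more fields.
type Blob struct {
	MIMEType string
	Data     []byte
}

type Part struct {
	Text       string
	Thought    bool
	InlineData *Blob
	Signature  []byte // hypothetical slice field
}

// isEmptyPart reports whether p is non-nil and every field holds its zero
// value, mirroring the reflect.ValueOf(*part0).IsZero() check.
func isEmptyPart(p *Part) bool {
	return p != nil && reflect.ValueOf(*p).IsZero()
}
```

One plausible reason to use reflection here is that a struct containing slice fields is not comparable, so a literal comparison such as `*part0 == Part{}` would not compile; IsZero also keeps working as fields are added to the type.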
Interaction with Other System Components
Converts raw streaming responses of type genai.GenerateContentResponse (from the Gemini AI model API) to the internal model.LLMResponse type using converters.Genai2LLMResponse.
The output of ProcessResponse is an iterable sequence of *model.LLMResponse objects, which can be consumed by downstream components such as session managers or agent runners that handle LLM output.
Integrates into the LLM response-handling pipeline; it is likely used by agent orchestration or session management layers that require coherent assembly of streamed LLM output.
The interaction with the genai and model packages situates this aggregator in the LLM Integration and Agents ecosystem, serving as a lower-level utility for managing streaming LLM outputs.
Usage Workflow Diagram
```mermaid
flowchart TD
    A[Receive genai.GenerateContentResponse] --> B["ProcessResponse()"]
    B --> C{Has candidates?}
    C -- No --> D[Yield error]
    C -- Yes --> E[Convert to model.LLMResponse]
    E --> F["aggregateResponse()"]
    F --> G{Aggregated response ready?}
    G -- Yes --> H[Yield aggregated response]
    G -- No --> I[Skip yield]
    H --> J[Yield current response]
    I --> J
    J --> K[Wait for next genai.GenerateContentResponse]
    K --> B
    subgraph closeStream [Close Stream]
        L["Call Close()"] --> M["createAggregateResponse()"]
        M --> N[Return final aggregated response, if any]
    end
    K -- Stream ended --> L
```
Class Diagram
```mermaid
classDiagram
    class streamingResponseAggregator {
        -text : string
        -thoughtText : string
        -response : *model.LLMResponse
        -role : string
        +NewStreamingResponseAggregator() *streamingResponseAggregator
        +ProcessResponse(ctx context.Context, genResp *genai.GenerateContentResponse) iter.Seq2[*model.LLMResponse, error]
        -aggregateResponse(llmResponse *model.LLMResponse) *model.LLMResponse
        +Close() *model.LLMResponse
        -createAggregateResponse() *model.LLMResponse
        -clear()
    }
```
Summary of Key Points
streamingResponseAggregator handles incremental assembly of partial LLM streaming outputs.
It distinguishes between normal text and "thought" text and aggregates them separately.
Yields aggregated intermediate responses alongside the individual converted responses.
Uses reflection to handle edge cases in streaming parts.
Designed to integrate with Gemini AI streaming responses and internal LLMResponse models.
Supports graceful closing to flush remaining buffered content.