stream_aggregator.go

Overview

The stream_aggregator.go file defines the streamingResponseAggregator type and its associated functions, which aggregate partial responses from a language model's streaming output. The aggregator processes incremental model outputs, merges partial text and thought content, and produces both intermediate and final aggregated responses. Because a streamed LLM response may arrive in multiple parts, this lets downstream consumers receive coherent, accumulated content either progressively or at the end of the stream.

Types and Functions

Type: streamingResponseAggregator

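A struct that maintains internal state while aggregating partial streaming responses from a model. It keeps track of:

- text (string): the accumulated text content.
- thoughtText (string): the accumulated thought content.
- response (*model.LLMResponse): the stored response.
- role (string): the role.

This struct allows partial content to be assembled incrementally into complete aggregated responses.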
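
As a rough sketch, the aggregator's state could be laid out as follows. The field names and types are taken from the class diagram in this document; LLMResponse is a hypothetical, empty stand-in for model.LLMResponse so that the snippet compiles without external dependencies.

```go
package main

import "fmt"

// LLMResponse is a hypothetical stand-in for model.LLMResponse.
// It is left empty because only the pointer is needed here.
type LLMResponse struct{}

// streamingResponseAggregator mirrors the fields listed in the class diagram.
type streamingResponseAggregator struct {
	text        string       // accumulated text content
	thoughtText string       // accumulated thought content
	response    *LLMResponse // stored response
	role        string       // role
}

// NewStreamingResponseAggregator returns an empty aggregator.
func NewStreamingResponseAggregator() *streamingResponseAggregator {
	return &streamingResponseAggregator{}
}

func main() {
	agg := NewStreamingResponseAggregator()
	// All fields start at their zero values.
	fmt.Println(agg.text == "", agg.thoughtText == "", agg.response == nil, agg.role == "")
}
```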


Function: NewStreamingResponseAggregator() *streamingResponseAggregator

Creates and returns a new, initialized instance of streamingResponseAggregator. The aggregator starts empty, ready to process streaming responses.

Usage example:

aggregator := NewStreamingResponseAggregator()

Method: (s *streamingResponseAggregator) ProcessResponse(ctx context.Context, genResp *genai.GenerateContentResponse) iter.Seq2[*model.LLMResponse, error]

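Processes a single streaming response (genai.GenerateContentResponse) from the model and returns an iterator function that yields:

- an error, if the response contains no candidates;
- an intermediate aggregated response, if one is ready;
- the current response, converted to model.LLMResponse.

Parameters:

- ctx (context.Context): the context for the call.
- genResp (*genai.GenerateContentResponse): the streaming response to process.

Returns:
An iterator function conforming to iter.Seq2 that yields (*model.LLMResponse, error) pairs. Intermediate aggregated responses are yielded first if available, followed by the directly transformed response.

Behavior:

1. If genResp has no candidates, an error is yielded.
2. Otherwise, genResp is converted to a model.LLMResponse and passed to aggregateResponse().
3. If aggregateResponse() produces an aggregated response, it is yielded.
4. The converted response is then yielded.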

Usage example:

seq := aggregator.ProcessResponse(ctx, genResp)
seq(func(resp *model.LLMResponse, err error) bool {
    if err != nil {
        // handle error
        return false
    }
    // handle response
    return true
})

Method: (s *streamingResponseAggregator) aggregateResponse(llmResponse *model.LLMResponse) *model.LLMResponse

Aggregates the content of a single partial LLMResponse into the internal buffer and decides whether to produce an aggregated response event.

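Parameters:

- llmResponse (*model.LLMResponse): the partial response to merge into the internal buffer.

Returns:

- An aggregated *model.LLMResponse when one is ready to be emitted; otherwise nil.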

Implementation details:
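
One way such accumulation could work is sketched below: text is appended to either the text buffer or the thought buffer, and an aggregated result is produced once a chunk without text arrives. The chunk, aggregated, and aggregator types, and in particular the trigger condition, are assumptions for illustration and are not taken from the actual aggregateResponse code.

```go
package main

import "fmt"

// chunk is a hypothetical stand-in for a partial model.LLMResponse.
type chunk struct {
	Text    string
	Thought bool // true when Text is thought content
}

// aggregated is a hypothetical stand-in for the aggregated response.
type aggregated struct {
	Text        string
	ThoughtText string
}

// aggregator holds the two text buffers.
type aggregator struct {
	text        string
	thoughtText string
}

// aggregate appends the chunk's text to the matching buffer and returns nil.
// When the chunk carries no text and something has been buffered, it returns
// the buffered content and resets the buffers (an assumed trigger condition).
func (a *aggregator) aggregate(c chunk) *aggregated {
	if c.Text != "" {
		if c.Thought {
			a.thoughtText += c.Text
		} else {
			a.text += c.Text
		}
		return nil
	}
	if a.text == "" && a.thoughtText == "" {
		return nil
	}
	out := &aggregated{Text: a.text, ThoughtText: a.thoughtText}
	a.text, a.thoughtText = "", ""
	return out
}

func main() {
	a := &aggregator{}
	stream := []chunk{
		{Text: "Let me think. ", Thought: true},
		{Text: "Hello, "},
		{Text: "world"},
		{}, // a chunk without text triggers the aggregated result
	}
	for _, c := range stream {
		if out := a.aggregate(c); out != nil {
			fmt.Printf("thought=%q text=%q\n", out.ThoughtText, out.Text)
		}
	}
}
```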


Method: (s *streamingResponseAggregator) Close() *model.LLMResponse

Finalizes the aggregation by generating any remaining aggregated response after all partial responses have been processed.

Returns:

- The final aggregated *model.LLMResponse, if any accumulated content remains.

Usage:

Call this method after the streaming sequence ends to retrieve the final aggregated message.
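
The pattern of draining the stream and then flushing the buffer can be sketched as follows. The aggregator type here is a hypothetical, simplified stand-in that buffers plain strings, and its Close returns a (string, bool) pair rather than a *model.LLMResponse; both are assumptions made to keep the example self-contained.

```go
package main

import "fmt"

// aggregator is a hypothetical, simplified stand-in for
// streamingResponseAggregator that only buffers plain text.
type aggregator struct {
	text string
}

// Add buffers one partial piece of text.
func (a *aggregator) Add(part string) {
	a.text += part
}

// Close returns the buffered text and true if anything was accumulated,
// then resets the buffer. It returns "", false when the buffer is empty.
func (a *aggregator) Close() (string, bool) {
	if a.text == "" {
		return "", false
	}
	out := a.text
	a.text = ""
	return out, true
}

func main() {
	a := &aggregator{}
	for _, part := range []string{"Hello, ", "world"} {
		a.Add(part)
	}
	// After the stream ends, flush whatever is still buffered.
	if final, ok := a.Close(); ok {
		fmt.Println(final)
	}
}
```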


Method: (s *streamingResponseAggregator) createAggregateResponse() *model.LLMResponse

Constructs an aggregated LLMResponse from the internal accumulated text and thoughtText.

Returns:

Implementation details:
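
A minimal sketch of how an aggregated response might be built from the two buffers is shown below. The part and response types are hypothetical stand-ins for the real content and model.LLMResponse types, and both the nil result for empty buffers and the ordering of thought content before text are assumptions.

```go
package main

import "fmt"

// part is a hypothetical stand-in for a content part.
type part struct {
	Text    string
	Thought bool
}

// response is a hypothetical stand-in for model.LLMResponse.
type response struct {
	Role  string
	Parts []part
}

// buildAggregate returns nil when both buffers are empty. Otherwise it
// returns a response with one part per non-empty buffer, thought first
// (the ordering is an assumption).
func buildAggregate(role, text, thoughtText string) *response {
	if text == "" && thoughtText == "" {
		return nil
	}
	r := &response{Role: role}
	if thoughtText != "" {
		r.Parts = append(r.Parts, part{Text: thoughtText, Thought: true})
	}
	if text != "" {
		r.Parts = append(r.Parts, part{Text: text})
	}
	return r
}

func main() {
	r := buildAggregate("model", "Hello, world", "Let me think.")
	fmt.Printf("%+v\n", *r)
}
```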


Method: (s *streamingResponseAggregator) clear()

Resets the internal state of the aggregator, clearing accumulated text, thoughtText, stored response, and role.
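
A reset of this kind can be written compactly in Go by assigning the zero value of the struct. The sketch below uses hypothetical stand-in types (aggregator and llmResponse) with the four fields named above; whether the real clear() is written this way is not stated in this document.

```go
package main

import "fmt"

// llmResponse is a hypothetical stand-in for model.LLMResponse.
type llmResponse struct{}

// aggregator carries the four fields that clear() is described as resetting.
type aggregator struct {
	text        string
	thoughtText string
	response    *llmResponse
	role        string
}

// clear resets every field to its zero value in one assignment.
func (a *aggregator) clear() {
	*a = aggregator{}
}

func main() {
	a := &aggregator{text: "Hello", thoughtText: "hmm", response: &llmResponse{}, role: "model"}
	a.clear()
	fmt.Println(*a == aggregator{})
}
```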


Important Implementation Details and Algorithms


Interaction with Other System Components


Usage Workflow Diagram

flowchart TD
A[Receive genai.GenerateContentResponse] --> B["ProcessResponse()"]
B --> C{Has candidates?}
C -- No --> D[Yield error]
C -- Yes --> E[Convert to model.LLMResponse]
E --> F["aggregateResponse()"]
F --> G{Aggregated response ready?}
G -- Yes --> H[Yield aggregated response]
G -- No --> I[Skip yield]
H --> J[Yield current response]
I --> J
J --> K[Wait for next genai.GenerateContentResponse]
K --> B
subgraph Close Stream
L["Call Close()"] --> M["createAggregateResponse()"]
M --> N[Yield final aggregated response if any]
end

Class Diagram

classDiagram
class streamingResponseAggregator {
-text : string
-thoughtText : string
-response : *model.LLMResponse
-role : string
+NewStreamingResponseAggregator() *streamingResponseAggregator
+ProcessResponse(ctx context.Context, genResp *genai.GenerateContentResponse) iter.Seq2[*model.LLMResponse, error]
-aggregateResponse(llmResponse *model.LLMResponse) *model.LLMResponse
+Close() *model.LLMResponse
-createAggregateResponse() *model.LLMResponse
-clear()
}

Summary of Key Points