send_stream.rs
Overview
This file implements the functionality for managing send streams in a network protocol context, specifically focusing on bidirectional or unidirectional streams within a QUIC-based transport layer. It provides an abstraction for opening, sending data on, finishing, aborting, and handling lifecycle events of streams that send data. The implementation integrates tightly with the underlying msquic library to handle asynchronous stream operations, state transitions, and event callbacks.
The primary structure exposed is SendStream, representing a sending stream associated with a QUIC connection. Internally, it manages state using StreamInstance and StreamInner types, which coordinate sending buffers, track stream states, and invoke callbacks on various stream events.
Main Entities and Their Responsibilities
SendStream
Purpose: Represents a handle to a unidirectional sending stream.
Encapsulates: An Arc-wrapped StreamInstance.
Key Methods:
open(msquic_conn: &msquic::Connection) -> Result<Self, StartError>
Opens a new unidirectional send stream on a given connection. Registers a callback handler for stream events.
debug_id(&self) -> String
Returns a debug string with the pointer to the inner stream and its ID.
poll_start(&mut self, cx: &mut Context) -> Poll<Result<(), StartError>>
Polls the stream start operation, handling asynchronous initiation and backpressure.
id(&self) -> Option<u64>
Returns the stream's unique identifier if available.
poll_finish(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>>
Polls the completion of writing data to the stream.
poll_abort(&mut self, cx: &mut Context<'_>, error_code: u64) -> Poll<Result<(), WriteError>>
Polls the abortion of the stream writing with a specified error code.
abort(&mut self, error_code: u64) -> Result<(), WriteError>
Immediately aborts the stream writing.
send(&mut self, buf: Vec<u8>) -> impl Future<Output = Result<(), WriteError>>
Asynchronously sends a buffer of data on the stream.
Usage Example:
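// Open a unidirectional send stream on an existing msquic connection.
let mut send_stream = SendStream::open(&connection)?;
// Queue the bytes and wait until the send is reported as complete.
futures::executor::block_on(send_stream.send(vec![1, 2, 3]))?;
// poll_finish needs a task Context, so drive it through poll_fn instead of calling it bare.
futures::executor::block_on(futures::future::poll_fn(|cx| send_stream.poll_finish(cx)))?;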
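The poll_* methods above all take a task Context, so they are normally wrapped in a future before being awaited. The following is a minimal sketch of that pattern using only the standard library. FakeStream, its simplified error type, and the toy block_on executor are hypothetical stand-ins for illustration, not the real SendStream or a production executor.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical stand-in for a stream whose finish completes on the second poll.
struct FakeStream {
    polls: u32,
}

impl FakeStream {
    /// Mirrors the shape of a `poll_finish`-style method (error type simplified).
    fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), String>> {
        self.polls += 1;
        if self.polls < 2 {
            // Not done yet: request another poll, then report Pending.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy single-future executor, for illustration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}

/// Adapts the poll-style method into a future and runs it to completion.
fn finish_blocking(stream: &mut FakeStream) -> Result<(), String> {
    block_on(poll_fn(|cx| stream.poll_finish(cx)))
}
```

Inside an async function the same adapter would simply be awaited, for example `poll_fn(|cx| stream.poll_finish(cx)).await`, with no hand-written executor involved.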
StreamInstance
Purpose: Internal representation of a stream, holding state and the underlying msquic::Stream.
Fields:
inner: Arc<StreamInner> - Shared and exclusive state.
msquic_stream: msquic::Stream - Handle to the raw msquic stream.
Key Methods:
id(&self) -> Option<u64>
Lazy retrieval and caching of the stream ID.
check_sending(&self, exclusive: &mut StreamInnerExclusive) -> Result<(), msquic::Status>
Sends all queued buffers through the underlying msquic stream.
poll_send_data(&self, cx: &mut Context<'_>, seq_no: usize) -> Poll<Result<(), WriteError>>
Polls for readiness to send data associated with a sequence number.
send(&self, data: Vec<u8>) -> SendData<'_>
Queues data for sending and returns a future that resolves when sending completes.
poll_finish_write(&self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>>
Polls for graceful finish of stream write operations.
poll_abort(&self, cx: &mut Context<'_>, error_code: u64) -> Poll<Result<(), WriteError>>
Polls for aborting the stream write with an error code.
abort(&self, error_code: u64) -> Result<(), WriteError>
Immediately aborts sending on the stream.
Implements Drop to trace and potentially clean up on drop.
StreamInner
Purpose: Contains the internal exclusive and shared states of a stream.
Fields:
exclusive: Mutex<StreamInnerExclusive> - Exclusive mutable state guarded by a mutex.
shared: StreamInnerShared - Shared read-write state.
Key Methods:
new(send_state: StreamSendState) -> Self
Creates a new StreamInner with initial send state.
Event handlers invoked by the msquic callback:
handle_event_start_complete(...)
Handles the start completion of a stream, updating state and notifying waiters.
handle_event_send_complete(...)
Processes send completion events, removing buffers and waking waiters.
handle_event_peer_receive_aborted(...)
Responds to peer aborting receive, updating state and waking waiters.
handle_event_send_shutdown_complete(...)
Handles graceful shutdown completion of sending.
handle_event_shutdown_complete(...)
Handles stream shutdown completion, updating errors and states.
handle_event_ideal_send_buffer_size(...)
Logs ideal send buffer size events.
handle_event_peer_accepted()
Marks the stream as accepted by the peer.
callback_handler_impl(...) -> Result<(), msquic::Status>
Dispatches msquic stream events to appropriate handlers.
Implements Drop for tracing on destruction.
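The dispatch pattern described for StreamInner (one callback entry point, one handler per event, waiters woken after the state update) can be sketched as follows. The Event enum, the field names, and the use of std::sync::Mutex in place of parking_lot are assumptions made for this illustration. The real msquic event types carry more data.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical subset of stream events.
enum Event {
    StartComplete { id: u64 },
    SendComplete { seq_no: usize },
    PeerReceiveAborted { error_code: u64 },
}

/// Hypothetical mutex-guarded state.
#[derive(Default)]
struct Exclusive {
    started_id: Option<u64>,
    completed: Vec<usize>,
    send_error_code: Option<u64>,
    start_waiters: Vec<Waker>,
    send_waiters: Vec<Waker>,
}

#[derive(Default)]
struct Inner {
    exclusive: Mutex<Exclusive>,
}

impl Inner {
    /// Single entry point: update state per event, then wake the affected waiters.
    fn callback_handler(&self, event: Event) {
        // Collect wakers while holding the lock; wake them after releasing it.
        let to_wake: Vec<Waker> = {
            let mut ex = self.exclusive.lock().unwrap();
            match event {
                Event::StartComplete { id } => {
                    ex.started_id = Some(id);
                    std::mem::take(&mut ex.start_waiters)
                }
                Event::SendComplete { seq_no } => {
                    ex.completed.push(seq_no);
                    std::mem::take(&mut ex.send_waiters)
                }
                Event::PeerReceiveAborted { error_code } => {
                    ex.send_error_code = Some(error_code);
                    std::mem::take(&mut ex.send_waiters)
                }
            }
        };
        for waker in to_wake {
            waker.wake();
        }
    }
}

/// Demonstration helper: a waker that counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Waking outside the lock is a design choice of this sketch: it avoids re-entrancy problems if a woken task immediately tries to take the same mutex.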
StreamInnerExclusive
Represents the mutable exclusive state guarded by a mutex.
Tracks:
state: StreamState - Current lifecycle state.
start_status: Option<msquic::Status> - Result of start operation.
send_state: StreamSendState - Sending lifecycle state.
Queues and sets managing buffers, sequence numbers, and waiters.
Optional error codes for send and connection errors.
StreamInnerShared
Contains shared state accessible via RwLock.
Currently holds an optional stream ID.
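A lazily cached ID behind a read-write lock could look roughly like the sketch below. Shared, id_or_fetch, and the fetch closure are hypothetical names, and std::sync::RwLock stands in for the parking_lot lock the file is described as using.

```rust
use std::sync::RwLock;

/// Hypothetical shared state: only the lazily discovered stream ID.
struct Shared {
    id: RwLock<Option<u64>>,
}

impl Shared {
    fn new() -> Self {
        Shared { id: RwLock::new(None) }
    }

    /// Returns the cached ID, or calls `fetch` and caches the result if it yields an ID.
    fn id_or_fetch(&self, fetch: impl FnOnce() -> Option<u64>) -> Option<u64> {
        // Fast path: a read lock is enough when the ID is already cached.
        let cached = *self.id.read().unwrap();
        if cached.is_some() {
            return cached;
        }
        // Slow path: take the write lock and re-check, since another
        // thread may have stored the ID in the meantime.
        let mut slot = self.id.write().unwrap();
        if slot.is_none() {
            *slot = fetch();
        }
        *slot
    }
}
```

Once an ID has been stored, later calls never invoke the closure again, which matches the "lazy retrieval and caching" behavior described for id().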
Enums: Stream States
StreamState: Open, Start, StartComplete, ShutdownComplete
StreamSendState: Closed, Start, StartComplete, Shutdown, ShutdownComplete
These states track the lifecycle of the stream and its sending capabilities.
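As an illustration of how such a lifecycle can be encoded, here is a small transition function over the send states. The variant names come from the list above, but the Trigger enum and the specific transition table are assumptions for this sketch. The source does not spell out which transitions the real code allows.

```rust
/// Send-side lifecycle states, named as in the list above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamSendState {
    Closed,
    Start,
    StartComplete,
    Shutdown,
    ShutdownComplete,
}

/// Hypothetical triggers; the real code reacts to msquic events and API calls.
#[derive(Debug, Clone, Copy)]
enum Trigger {
    OpenCalled,
    StartDone,
    FinishCalled,
    ShutdownDone,
}

/// Returns the next state, or None when the trigger is not valid in `state`.
fn next_state(state: StreamSendState, trigger: Trigger) -> Option<StreamSendState> {
    use StreamSendState::*;
    use Trigger::*;
    match (state, trigger) {
        (Closed, OpenCalled) => Some(Start),
        (Start, StartDone) => Some(StartComplete),
        (StartComplete, FinishCalled) => Some(Shutdown),
        // Assumed: a full shutdown may arrive in any state that is not already terminal.
        (s, ShutdownDone) if s != ShutdownComplete => Some(ShutdownComplete),
        _ => None,
    }
}
```

Returning Option makes invalid transitions explicit, so a caller can map None to an error such as "stream not open for writing" instead of silently ignoring the request.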
SendData<'a>
Purpose: A future returned by StreamInstance::send that completes when the data has been sent.
Fields:
stream: &'a StreamInstance
seq_no: usize - Sequence number of the data buffer.
Implements Future with output type Result<(), WriteError>.
The poll function delegates to StreamInstance::poll_send_data.
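The delegation described above, a small future that only carries a stream reference and a sequence number, can be sketched like this. FakeStream, its State fields, the String error type, and the poll_once helper are all hypothetical; only the shape of SendData follows the description.

```rust
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical state: in-flight sequence numbers and waiting tasks.
#[derive(Default)]
struct State {
    in_flight: HashSet<usize>,
    next_seq_no: usize,
    send_waiters: Vec<Waker>,
}

/// Hypothetical stand-in for the stream instance.
#[derive(Default)]
struct FakeStream {
    state: Mutex<State>,
}

impl FakeStream {
    /// Marks a buffer as in flight and returns a future tied to its sequence number.
    fn send(&self, _data: Vec<u8>) -> SendData<'_> {
        let mut st = self.state.lock().unwrap();
        let seq_no = st.next_seq_no;
        st.next_seq_no += 1;
        st.in_flight.insert(seq_no);
        SendData { stream: self, seq_no }
    }

    /// Ready once `seq_no` is no longer in flight; otherwise stores the waker.
    fn poll_send_data(&self, cx: &mut Context<'_>, seq_no: usize) -> Poll<Result<(), String>> {
        let mut st = self.state.lock().unwrap();
        if st.in_flight.contains(&seq_no) {
            st.send_waiters.push(cx.waker().clone());
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }

    /// Simulates the transport reporting that a send completed.
    fn complete(&self, seq_no: usize) {
        let mut st = self.state.lock().unwrap();
        st.in_flight.remove(&seq_no);
        for waker in st.send_waiters.drain(..) {
            waker.wake();
        }
    }
}

/// Future that resolves when the buffer with `seq_no` has been sent.
struct SendData<'a> {
    stream: &'a FakeStream,
    seq_no: usize,
}

impl Future for SendData<'_> {
    type Output = Result<(), String>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // All logic lives on the stream; the future is just a handle.
        self.stream.poll_send_data(cx, self.seq_no)
    }
}

/// Demonstration helper: polls a future once with a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Keeping the future this thin means every piece of send state stays behind the stream's lock, and dropping the future does not by itself cancel the queued buffer.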
WriteError
Enumeration of possible write errors:
Closed - Stream not open for writing.
Finished - Stream finished writing.
Stopped(u64) - Stream stopped by peer with an error code.
ConnectionLost(ConnectionError) - Connection lost during writing.
OtherError(msquic::Status) - Other msquic-related errors.
Implements std::error::Error and conversion into std::io::Error.
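A sketch of an error enum with this shape, including the std::error::Error implementation and the conversion into std::io::Error, is shown below. ConnectionError and Status are placeholder types, the display messages are invented, and the mapping to io::ErrorKind values is an assumption, since the source does not say which kinds the real conversion uses.

```rust
use std::fmt;
use std::io;

/// Placeholder for the crate's `ConnectionError`.
#[derive(Debug, Clone, PartialEq)]
struct ConnectionError(String);

/// Placeholder for `msquic::Status` (modeled as a raw status code).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Status(u32);

#[derive(Debug, Clone, PartialEq)]
enum WriteError {
    Closed,
    Finished,
    Stopped(u64),
    ConnectionLost(ConnectionError),
    OtherError(Status),
}

impl fmt::Display for WriteError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WriteError::Closed => write!(f, "stream not open for writing"),
            WriteError::Finished => write!(f, "stream finished writing"),
            WriteError::Stopped(code) => write!(f, "stream stopped by peer: error {}", code),
            WriteError::ConnectionLost(e) => write!(f, "connection lost: {}", e.0),
            WriteError::OtherError(s) => write!(f, "msquic status {:#x}", s.0),
        }
    }
}

impl std::error::Error for WriteError {}

impl From<WriteError> for io::Error {
    fn from(e: WriteError) -> Self {
        // Assumed mapping; chosen only to illustrate the conversion.
        let kind = match &e {
            WriteError::Closed | WriteError::Finished => io::ErrorKind::NotConnected,
            WriteError::Stopped(_) => io::ErrorKind::ConnectionReset,
            WriteError::ConnectionLost(_) => io::ErrorKind::ConnectionAborted,
            WriteError::OtherError(_) => io::ErrorKind::Other,
        };
        // Keep the original error as the source so no detail is lost.
        io::Error::new(kind, e)
    }
}
```

With the From impl in place, code that returns io::Result can propagate a WriteError with the `?` operator without an explicit conversion.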
Buffer
Wraps a boxed buffer holding data to send.
Contains:
seq_no: usize - Sequence number for tracking.
_data: Box<[u8]> - Owned data buffer.
buffer: [msquic::BufferRef; 1] - msquic buffer reference.
Provides:
new(seq_no, data) - Creates a new buffer.
from_raw and into_raw for unsafe conversion to/from raw pointers (used in callbacks).
get_buffers() returns raw pointers expected by msquic.
Marked Send and Sync via unsafe impls due to internal mutability and concurrency considerations.
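The into_raw and from_raw pair follows a common pattern for passing owned data through a C callback context pointer. The sketch below shows that round trip under simplifying assumptions: this Buffer omits the msquic::BufferRef field, and the method signatures are guesses based on the names, not the file's actual API.

```rust
use std::ffi::c_void;

/// Simplified buffer: sequence number plus owned bytes.
struct Buffer {
    seq_no: usize,
    data: Box<[u8]>,
}

impl Buffer {
    /// Boxes the buffer so its address stays stable while the transport holds it.
    fn new(seq_no: usize, data: Vec<u8>) -> Box<Self> {
        Box::new(Buffer { seq_no, data: data.into_boxed_slice() })
    }

    /// Gives up ownership and returns an opaque pointer usable as a callback context.
    fn into_raw(self: Box<Self>) -> *mut c_void {
        Box::into_raw(self) as *mut c_void
    }

    /// Reclaims ownership from a pointer produced by `into_raw`.
    ///
    /// Safety: `ptr` must come from `into_raw` and must be reclaimed exactly once.
    unsafe fn from_raw(ptr: *mut c_void) -> Box<Self> {
        unsafe { Box::from_raw(ptr as *mut Buffer) }
    }
}
```

In this pattern the pointer is handed over when the send is submitted and reclaimed in the send-complete handler, so the bytes stay alive for exactly as long as the transport may read them.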
Implementation Details and Algorithms
Asynchronous Polling:
The file heavily uses Rust's Future trait and Context/Waker to implement asynchronous polling for stream start, sending, finishing, and aborting. This allows the calling code to await stream readiness without blocking.
Send Queue Management:
Outbound data buffers are queued in a VecDeque and tracked by sequence numbers in a HashSet. The sending algorithm attempts to send all queued buffers using the underlying msquic stream's send API. When a send completes, the associated buffer is removed and waiters are notified.
State Machine:
The stream and send operations are governed by state machines (StreamState and StreamSendState). Transitions occur in response to msquic events such as start completion, send completion, peer abort, and shutdown.
Event Callbacks:
The msquic library invokes callbacks on stream events, which are dispatched to internal handlers implementing logic for state updates, error tracking, and waking asynchronous tasks waiting for state changes.
Thread Safety:
The internal mutable state is protected with parking_lot::Mutex and RwLock to allow safe concurrent access from multiple threads or asynchronous contexts.
Error Handling:
The code maps msquic status codes to domain-specific errors (StartError, WriteError), enabling precise error reporting and recovery.
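The send queue management described above can be sketched with a VecDeque of pending buffers and a HashSet of in-flight sequence numbers. SendQueue, its method names, and the submit closure (standing in for the transport's send call) are hypothetical. How the real code splits work between the queue and the set is not stated in the source.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical send queue: pending buffers plus in-flight sequence numbers.
#[derive(Default)]
struct SendQueue {
    pending: VecDeque<(usize, Vec<u8>)>,
    in_flight: HashSet<usize>,
    next_seq_no: usize,
}

impl SendQueue {
    /// Queues a buffer and returns the sequence number assigned to it.
    fn enqueue(&mut self, data: Vec<u8>) -> usize {
        let seq_no = self.next_seq_no;
        self.next_seq_no += 1;
        self.pending.push_back((seq_no, data));
        seq_no
    }

    /// Hands every queued buffer to `submit` in order. On the first failure the
    /// failed buffer is put back at the front and the error is returned.
    fn flush<E>(
        &mut self,
        mut submit: impl FnMut(usize, &[u8]) -> Result<(), E>,
    ) -> Result<(), E> {
        while let Some((seq_no, data)) = self.pending.pop_front() {
            if let Err(e) = submit(seq_no, &data) {
                self.pending.push_front((seq_no, data));
                return Err(e);
            }
            self.in_flight.insert(seq_no);
        }
        Ok(())
    }

    /// Records a send completion; returns false for unknown sequence numbers.
    fn complete(&mut self, seq_no: usize) -> bool {
        self.in_flight.remove(&seq_no)
    }
}
```

Sequence numbers give each waiter a cheap key to check on wake-up, which is why a set lookup is enough to decide whether a particular send has finished.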
Interaction with Other System Components
msquic Library:
This file acts as a wrapper and manager for streams created by the msquic QUIC implementation. It uses msquic::Stream for raw stream operations and handles event callbacks.
Connection Layer:
SendStream::open requires a msquic::Connection reference, indicating integration with connection management code.
ConnectionError:
This file references ConnectionError from super::connection for error propagation when the connection is lost or aborted.
Async Runtime:
The use of futures and wakers implies this module is intended to run within an async executor or runtime supporting Context polling.
Visual Diagram: Structure of send_stream.rs
classDiagram
class SendStream {
+open()
+debug_id()
+poll_start()
+id()
+poll_finish()
+poll_abort()
+abort()
+send()
}
class StreamInstance {
+id()
+check_sending()
+poll_send_data()
+send()
+poll_finish_write()
+poll_abort()
+abort()
}
class StreamInner {
+new()
+handle_event_start_complete()
+handle_event_send_complete()
+handle_event_peer_receive_aborted()
+handle_event_send_shutdown_complete()
+handle_event_shutdown_complete()
+handle_event_ideal_send_buffer_size()
+handle_event_peer_accepted()
+callback_handler_impl()
}
class StreamInnerExclusive {
-state
-start_status
-send_state
-sending_queue
-sending_seq_no
-next_seq_no
-send_error_code
-conn_error
-start_waiters
-send_waiters
-write_shutdown_waiters
}
class StreamInnerShared {
-id
}
class SendData {
+poll()
}
class Buffer {
+new()
+from_raw()
+into_raw()
+get_buffers()
}
class WriteError
SendStream --> StreamInstance : contains
StreamInstance --> StreamInner : contains
StreamInner --> StreamInnerExclusive : contains (Mutex)
StreamInner --> StreamInnerShared : contains
StreamInstance ..> SendData : returns Future
StreamInstance ..> WriteError : returns on errors
StreamInstance ..> Buffer : manages
Summary of Key Concepts Referenced
See ConnectionError for error details related to connection state.
See msquic::Stream for underlying stream operations and event types.
See StartError for errors related to stream start operations.
See Waker and Future for async task notification and polling mechanics.