stream.rs
Overview
This file implements the streams used for QUIC network communication, supporting both bidirectional and unidirectional streams. It manages stream lifecycle, reading, and writing asynchronously, with explicit state tracking and error handling. The streams are built on the lower-level msquic API, which handles the QUIC protocol operations.
The main abstraction is the Stream struct, which represents a logical stream endpoint and supports splitting into separate reader and writer halves (ReadStream and WriteStream). The file implements asynchronous reading and writing traits for these types, enabling integration with async runtimes.
Internal state management is carefully handled via the StreamInner struct, which uses locking (Mutex) for exclusive mutable state and read-write locking (RwLock) for shared state. The stream reacts to events via callbacks from msquic, updating internal states and waking async tasks accordingly.
Key Entities and Their Functionality
Enum: StreamType
Defines the type of stream:
Bidirectional: Allows simultaneous reading and writing.
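Unidirectional: Carries data in one direction only, so a given endpoint can either read or write, but not both. The endpoint that opens the stream writes and the peer reads.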
Struct: Stream
Represents a single stream instance.
Creation and Initialization
open(msquic_conn: &msquic::Connection, stream_type: StreamType) -> Result<Self, StartError>
Opens a new stream on the given connection with the specified stream type. Returns a Stream on success or a StartError on failure.
from_raw(handle: msquic::ffi::HQUIC, stream_type: StreamType) -> Self
Constructs a Stream from a raw handle, typically used when a stream is initiated by a peer.
Stream Lifecycle Methods
poll_start(&mut self, cx: &mut Context, failed_on_block: bool) -> Poll<Result<(), StartError>>
Polls the stream to start it, possibly failing if blocked. Wakes tasks when the start completes.
Stream Accessors
id(&self) -> Option<u64>
Returns the stream's unique identifier, if available.
split(self) -> (Option<ReadStream>, Option<WriteStream>)
Splits the stream into read and write halves based on StreamType and whether the local endpoint opened the stream.
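The rule behind split can be sketched as a small decision function. This is only an illustration, not the file's code: available_halves is a hypothetical helper, and it assumes the usual QUIC convention that the opener of a unidirectional stream only writes while the peer only reads.

```rust
/// Mirrors the two stream types described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
    Bidirectional,
    Unidirectional,
}

/// Hypothetical helper: returns `(has_read_half, has_write_half)`.
///
/// Assumption: on a unidirectional stream the endpoint that opened the
/// stream only sends, and the remote endpoint only receives.
pub fn available_halves(stream_type: StreamType, local_open: bool) -> (bool, bool) {
    match (stream_type, local_open) {
        // Both directions are usable regardless of who opened the stream.
        (StreamType::Bidirectional, _) => (true, true),
        // Locally opened: write half only.
        (StreamType::Unidirectional, true) => (false, true),
        // Opened by the peer: read half only.
        (StreamType::Unidirectional, false) => (true, false),
    }
}
```

Under this assumption, a `false` entry corresponds to the `None` that split returns for the missing half.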
Reading Methods
poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, ReadError>>
Polls to read bytes from the stream into a buffer asynchronously.
poll_read_chunk(&self, cx: &mut Context) -> Poll<Result<Option<StreamRecvBuffer>, ReadError>>
Polls to read the next data chunk as a StreamRecvBuffer.
read_chunk(&self) -> ReadChunk<'_>
Returns a future representing an asynchronous read of a data chunk.
Writing Methods
poll_write(&mut self, cx: &mut Context, buf: &[u8], fin: bool) -> Poll<Result<usize, WriteError>>
Polls to write bytes to the stream.
poll_write_chunk(&mut self, cx: &mut Context, chunk: &Bytes, fin: bool) -> Poll<Result<usize, WriteError>>
Polls to write a chunk of bytes directly.
write_chunk<'a>(&'a mut self, chunk: &'a Bytes, fin: bool) -> WriteChunk<'a>
Returns a future for writing a chunk asynchronously.
poll_write_chunks(&mut self, cx: &mut Context, chunks: &[Bytes], fin: bool) -> Poll<Result<usize, WriteError>>
Polls to write multiple chunks.
write_chunks<'a>(&'a mut self, chunks: &'a [Bytes], fin: bool) -> WriteChunks<'a>
Returns a future for writing multiple chunks asynchronously.
poll_finish_write(&mut self, cx: &mut Context) -> Poll<Result<(), WriteError>>
Polls to finish writing, shutting down the send side gracefully.
poll_abort_write(&mut self, cx: &mut Context, error_code: u64) -> Poll<Result<(), WriteError>>
Polls to abort writing with a specified error code.
abort_write(&mut self, error_code: u64) -> Result<(), WriteError>
Aborts writing immediately.
Reading Abort Methods
poll_abort_read(&mut self, cx: &mut Context, error_code: u64) -> Poll<Result<(), ReadError>>
Polls to abort reading.
abort_read(&mut self, error_code: u64) -> Result<(), ReadError>
Aborts reading immediately.
Struct: ReadStream
A read-only view of a stream, wrapping an Arc<StreamInstance>.
Implements:
id() -> Option<u64>
poll_read()
poll_abort_read()
abort_read()
Struct: WriteStream
A write-only view of a stream.
Implements:
id() -> Option<u64>
poll_write()
poll_write_chunk()
write_chunk()
poll_write_chunks()
write_chunks()
poll_finish_write()
poll_abort_write()
abort_write()
Struct: StreamInstance
Internal representation of a stream instance holding:
inner: Arc<StreamInner> (shared and exclusive state)
msquic_stream: msquic::Stream (underlying msquic stream handle)
Important Methods
id(&self) -> Option<u64>
Gets or caches the stream ID.
poll_read(&Arc<Self>, cx, buf)
Polls to read bytes using a generic read helper.
poll_read_chunk(&Arc<Self>, cx)
Polls to read the next data chunk.
read_chunk(&Arc<Self>) -> ReadChunk
Returns a future for reading a chunk.
poll_write(&self, cx, buf, fin)
Polls to write bytes, using a generic write helper.
poll_write_chunk(&self, cx, chunk, fin)
Polls to write a chunk.
write_chunk(&self, chunk, fin)
Returns a future for writing a chunk.
poll_write_chunks(&self, cx, chunks, fin)
Polls to write multiple chunks.
write_chunks(&self, chunks, fin)
Returns a future for writing multiple chunks.
poll_write_generic(&self, cx, write_fn)
Generic method to handle writing with state checks and buffer management.
poll_read_generic(&self, cx, read_fn)
Generic method to handle reading with state checks and buffer management.
poll_finish_write(&self, cx)
Polls to finish writing gracefully.
poll_abort_write(&self, cx, error_code)
Polls to abort writing.
abort_write(&self, error_code)
Immediately aborts writing.
poll_abort_read(&self, cx, error_code)
Polls to abort reading.
abort_read(&self, error_code)
Immediately aborts reading.
read_complete(&self, buffer: &StreamRecvBuffer)
Marks a read buffer as complete, updating internal state and informing msquic of completed receive.
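The range bookkeeping that read_complete relies on might look roughly like the following. This is a sketch under stated assumptions: ReadCompleteTracker is a hypothetical type, a std BTreeMap stands in for rangemap's RangeSet, and the policy of reporting only the contiguous prefix of consumed bytes is an assumption rather than something this documentation confirms.

```rust
use std::collections::BTreeMap;

/// Hypothetical tracker for byte ranges the application has finished reading.
#[derive(Debug, Default)]
pub struct ReadCompleteTracker {
    /// Completed but not yet reported ranges, stored as start -> end (end exclusive).
    completed: BTreeMap<usize, usize>,
    /// Everything below this offset has already been reported.
    reported: usize,
}

impl ReadCompleteTracker {
    /// Marks `start..end` as consumed and returns how many additional bytes
    /// can now be reported as one contiguous run from the reported offset.
    pub fn complete(&mut self, start: usize, end: usize) -> usize {
        if start < end {
            // Keep the larger end if the same start is completed twice.
            let slot = self.completed.entry(start).or_insert(end);
            *slot = (*slot).max(end);
        }
        let before = self.reported;
        loop {
            // Look at the lowest pending range without holding a borrow.
            let first = self.completed.iter().next().map(|(&s, &e)| (s, e));
            match first {
                // The range touches or overlaps the reported prefix: absorb it.
                Some((s, e)) if s <= self.reported => {
                    self.completed.remove(&s);
                    self.reported = self.reported.max(e);
                }
                // A gap remains, or nothing is pending.
                _ => break,
            }
        }
        self.reported - before
    }
}
```

In this sketch, a buffer completed out of order yields 0 until the gap before it is filled, at which point both ranges are released together.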
Event Handlers for msquic Callback Events
handle_event_start_complete()
handle_event_receive()
handle_event_send_complete()
handle_event_peer_send_shutdown()
handle_event_peer_send_aborted()
handle_event_peer_receive_aborted()
handle_event_send_shutdown_complete()
handle_event_shutdown_complete()
handle_event_ideal_send_buffer_size()
handle_event_peer_accepted()
These manage state updates, buffer management, and wakeup of waiting tasks depending on the event type.
callback_handler_impl()
Dispatches callback events from msquic to the appropriate handler.
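The dispatch pattern can be illustrated with a single match over an event enum. The sketch below is not the msquic API: StreamEventSketch and StreamStateSketch are hypothetical stand-ins, and only four of the events listed above are modelled.

```rust
/// Hypothetical stand-in for the events delivered by the msquic callback.
#[derive(Debug)]
pub enum StreamEventSketch {
    StartComplete { id: u64 },
    Receive { data: Vec<u8> },
    PeerSendShutdown,
    ShutdownComplete,
}

/// Hypothetical, heavily reduced stream state.
#[derive(Debug, Default)]
pub struct StreamStateSketch {
    pub id: Option<u64>,
    pub recv_buffers: Vec<Vec<u8>>,
    pub peer_send_closed: bool,
    pub shutdown_complete: bool,
}

impl StreamStateSketch {
    /// Routes each event to its dedicated handler.
    pub fn callback_handler(&mut self, event: StreamEventSketch) {
        match event {
            StreamEventSketch::StartComplete { id } => self.handle_event_start_complete(id),
            StreamEventSketch::Receive { data } => self.handle_event_receive(data),
            StreamEventSketch::PeerSendShutdown => self.handle_event_peer_send_shutdown(),
            StreamEventSketch::ShutdownComplete => self.handle_event_shutdown_complete(),
        }
    }

    fn handle_event_start_complete(&mut self, id: u64) {
        self.id = Some(id);
    }

    fn handle_event_receive(&mut self, data: Vec<u8>) {
        self.recv_buffers.push(data);
    }

    fn handle_event_peer_send_shutdown(&mut self) {
        self.peer_send_closed = true;
    }

    fn handle_event_shutdown_complete(&mut self) {
        self.shutdown_complete = true;
    }
}
```

Keeping one handler per event keeps each state transition small and easy to audit, which is presumably why the file is organized this way.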
Struct: StreamInner
Holds the internal state of a stream split into exclusive and shared parts:
exclusive: Mutex<StreamInnerExclusive> (exclusive mutable state)
shared: StreamInnerShared (shared read-only or read-mostly state)
Inner State Structures
StreamInnerExclusive contains mutable state fields such as:
state: StreamState
recv_state: StreamRecvState
send_state: StreamSendState
recv_buffers: VecDeque<StreamRecvBuffer>
read_complete_map: RangeSet<usize>
write_pool: Vec<WriteBuffer>
start_waiters: Vec<Waker>
read_waiters: Vec<Waker>
write_shutdown_waiters: Vec<Waker>
Error codes and connection error state
StreamInnerShared contains:
stream_type: StreamType
local_open: bool
id: RwLock<Option<u64>> (stream ID cache)
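A reduced version of this exclusive/shared layout, including the ID cache, could be sketched as follows. Everything here is illustrative: the type names are hypothetical, std::sync locks replace parking_lot (so lock calls return a Result that is unwrapped), and id_with takes a closure as a stand-in for querying the transport.

```rust
use std::collections::VecDeque;
use std::sync::{Mutex, RwLock};
use std::task::Waker;

/// State that is only touched while holding the mutex (a small subset of the real fields).
#[derive(Default)]
pub struct ExclusiveSketch {
    pub recv_buffers: VecDeque<Vec<u8>>,
    pub read_waiters: Vec<Waker>,
}

/// Read-mostly state that can be inspected without taking the main lock.
pub struct SharedSketch {
    pub local_open: bool,
    pub id: RwLock<Option<u64>>,
}

pub struct InnerSketch {
    pub exclusive: Mutex<ExclusiveSketch>,
    pub shared: SharedSketch,
}

impl InnerSketch {
    pub fn new(local_open: bool) -> Self {
        Self {
            exclusive: Mutex::new(ExclusiveSketch::default()),
            shared: SharedSketch { local_open, id: RwLock::new(None) },
        }
    }

    /// Returns the cached ID, or runs `query` (hypothetical) and caches a successful result.
    pub fn id_with(&self, query: impl FnOnce() -> Option<u64>) -> Option<u64> {
        // The read guard is released at the end of this statement.
        let cached = *self.shared.id.read().unwrap();
        if cached.is_some() {
            return cached;
        }
        let id = query()?;
        *self.shared.id.write().unwrap() = Some(id);
        Some(id)
    }
}
```

Separating the cache from the mutex-guarded state means that reading the ID does not contend with reads and writes in progress.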
Stream States
These enumerations track the lifecycle state of the stream:
StreamState (general lifecycle): Open, Start, StartComplete, ShutdownComplete
StreamRecvState (receive lifecycle): Closed, Start, StartComplete, Shutdown, ShutdownComplete
StreamSendState (send lifecycle): Closed, Start, StartComplete, Shutdown, ShutdownComplete
Async Read and Write Futures
ReadChunk<'a>: Future for asynchronously reading a data chunk.
WriteChunk<'a>: Future for asynchronously writing a chunk.
WriteChunks<'a>: Future for asynchronously writing multiple chunks.
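Futures of this kind are typically thin wrappers that forward poll to the corresponding poll_* method. The sketch below shows that shape with a hypothetical ChunkSource in place of the real stream; the NoopWake and poll_once helpers exist only so the future can be polled by hand without a runtime.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct SourceState {
    chunks: VecDeque<Vec<u8>>,
    waiter: Option<Waker>,
}

/// Hypothetical chunk source standing in for the stream instance.
#[derive(Default)]
pub struct ChunkSource {
    state: Mutex<SourceState>,
}

impl ChunkSource {
    /// Queues a chunk and wakes the task waiting for it, if any.
    pub fn push(&self, chunk: Vec<u8>) {
        let waiter = {
            let mut state = self.state.lock().unwrap();
            state.chunks.push_back(chunk);
            state.waiter.take()
        };
        if let Some(waker) = waiter {
            waker.wake();
        }
    }

    /// Returns the next chunk, or stores the waker and reports Pending.
    pub fn poll_read_chunk(&self, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let mut state = self.state.lock().unwrap();
        match state.chunks.pop_front() {
            Some(chunk) => Poll::Ready(chunk),
            None => {
                state.waiter = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    pub fn read_chunk(&self) -> ReadChunk<'_> {
        ReadChunk { source: self }
    }
}

/// Future that forwards to `poll_read_chunk`.
pub struct ReadChunk<'a> {
    source: &'a ChunkSource,
}

impl Future for ReadChunk<'_> {
    type Output = Vec<u8>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.source.poll_read_chunk(cx)
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once outside of any runtime.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Inside an async runtime the same future would simply be awaited, for example `let chunk = source.read_chunk().await;`.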
Error Types
StartError: Errors related to starting a stream.
ReadError: Errors related to reading from a stream.
WriteError: Errors related to writing to a stream.
All error types implement the std::error::Error trait and can be converted to std::io::Error for interoperability with IO traits.
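A conversion of this kind might be written as shown below. The variants and the chosen io::ErrorKind values are assumptions made for illustration, and the Display and Error impls are written by hand here where the file would derive them with thiserror.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical subset of read errors; the real enum may differ.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadError {
    Closed,
    Reset(u64),
    ConnectionLost,
}

impl fmt::Display for ReadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ReadError::Closed => write!(f, "stream closed"),
            ReadError::Reset(code) => write!(f, "stream reset by peer with error code {}", code),
            ReadError::ConnectionLost => write!(f, "connection lost"),
        }
    }
}

impl Error for ReadError {}

impl From<ReadError> for io::Error {
    fn from(err: ReadError) -> Self {
        // Assumed mapping; pick whichever kinds suit the caller.
        let kind = match &err {
            ReadError::Closed => io::ErrorKind::NotConnected,
            ReadError::Reset(_) => io::ErrorKind::ConnectionReset,
            ReadError::ConnectionLost => io::ErrorKind::ConnectionAborted,
        };
        // The original error stays reachable through `get_ref`.
        io::Error::new(kind, err)
    }
}
```

Because the original error is stored inside the io::Error, callers that need the stream-specific detail can still downcast it.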
Important Implementation Details and Algorithms
State Management: Stream state is carefully tracked in distinct states for send, receive, and overall lifecycle. This enables fine-grained control over stream readiness and error conditions.
Buffer Management: Incoming data is buffered in StreamRecvBuffer instances stored in a VecDeque. The read_complete_map tracks ranges of fully read data to inform the underlying QUIC layer about consumed bytes.
Asynchronous Polling: Reading and writing operations use generic helper methods (poll_read_generic, poll_write_generic) that enforce state checks, manage buffers, and return appropriate Poll states (Ready or Pending) with results or errors.
Waker Management: Tasks waiting on stream state changes (start, read availability, write shutdown) are stored in vectors of Wakers and are woken appropriately when state changes occur.
msquic Event Handling: The stream registers callbacks for events from the msquic library. Events such as start completion, data reception, send completion, shutdowns, and aborts are handled by dedicated methods that update internal state and wake waiting tasks.
Split Streams: The split method allows obtaining read-only and write-only handles (ReadStream and WriteStream) from a single Stream instance, enforcing correct usage depending on stream type and ownership.
Integration with Async Traits: Implements tokio::io::AsyncRead / AsyncWrite and futures_io::AsyncRead / AsyncWrite for Stream, ReadStream, and WriteStream, enabling seamless use in async contexts.
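The interplay of buffering, polling, and waker management described above can be condensed into a small std-only sketch. RecvSide and CountingWaker are hypothetical names, plain Vec<u8> buffers replace StreamRecvBuffer, and the error paths of the real helpers are left out.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct RecvState {
    recv_buffers: VecDeque<Vec<u8>>,
    read_waiters: Vec<Waker>,
    finished: bool,
}

/// Hypothetical receive side of a stream.
#[derive(Default)]
pub struct RecvSide {
    state: Mutex<RecvState>,
}

impl RecvSide {
    /// Copies buffered bytes into `buf`, or registers the caller's waker.
    pub fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<usize> {
        let mut state = self.state.lock().unwrap();
        match state.recv_buffers.pop_front() {
            Some(mut front) => {
                let n = front.len().min(buf.len());
                buf[..n].copy_from_slice(&front[..n]);
                front.drain(..n);
                // Keep any unread remainder at the head of the queue.
                if !front.is_empty() {
                    state.recv_buffers.push_front(front);
                }
                Poll::Ready(n)
            }
            // The peer finished sending: report end of stream.
            None if state.finished => Poll::Ready(0),
            None => {
                state.read_waiters.push(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Receive event: buffer the data, then wake readers outside the lock.
    pub fn on_receive(&self, data: Vec<u8>) {
        let waiters = {
            let mut state = self.state.lock().unwrap();
            state.recv_buffers.push_back(data);
            std::mem::take(&mut state.read_waiters)
        };
        for waker in waiters {
            waker.wake();
        }
    }

    /// Peer send shutdown event: mark the end of data and wake readers.
    pub fn on_peer_send_shutdown(&self) {
        let waiters = {
            let mut state = self.state.lock().unwrap();
            state.finished = true;
            std::mem::take(&mut state.read_waiters)
        };
        for waker in waiters {
            waker.wake();
        }
    }
}

/// Helper for experiments: a waker that counts how often it is woken.
#[derive(Default)]
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Draining the waiter list while holding the lock and calling wake only after releasing it avoids re-entrancy problems if a woken task polls the stream immediately.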
Interaction with Other Parts of the System
Relies on msquic for low-level QUIC stream operations (msquic::Stream, events, sending, and receiving).
Uses StreamRecvBuffer and WriteBuffer from the buffer module to manage byte buffers for reads and writes.
Uses ConnectionError from the connection module to represent connection-level error states.
Employs synchronization primitives from parking_lot (Mutex, RwLock) for efficient concurrent state access.
Uses bytes::Bytes for zero-copy byte buffers.
Uses RangeSet from rangemap to track sets of byte ranges representing read completion.
Implements error types with thiserror::Error for structured error handling.
Usage Examples
Opening a new bidirectional stream
// `mut` is required because the poll_* methods take `&mut self`.
let mut stream = Stream::open(&connection, StreamType::Bidirectional)?;
Reading data asynchronously
let mut buf = vec![0u8; 1024];
match stream.poll_read(&mut cx, &mut buf) {
    Poll::Ready(Ok(n)) => {
        println!("Read {} bytes", n);
    }
    Poll::Pending => {
        // Will be woken when data arrives
    }
    Poll::Ready(Err(e)) => {
        eprintln!("Read error: {:?}", e);
    }
}
Writing data asynchronously
let data = b"hello";
match stream.poll_write(&mut cx, data, false) {
    Poll::Ready(Ok(n)) => {
        println!("Wrote {} bytes", n);
    }
    Poll::Pending => {
        // Will be woken when ready to write
    }
    Poll::Ready(Err(e)) => {
        eprintln!("Write error: {:?}", e);
    }
}
Splitting a stream
let (read_stream, write_stream) = stream.split();
if let Some(mut reader) = read_stream {
    // Use reader.poll_read(...)
}
if let Some(mut writer) = write_stream {
    // Use writer.poll_write(...)
}
Diagram of the Stream Structure and Workflow
classDiagram
class Stream {
+open()
+from_raw()
+poll_start()
+id()
+split()
+poll_read()
+poll_write()
+poll_finish_write()
+poll_abort_write()
+abort_write()
+poll_abort_read()
+abort_read()
}
class ReadStream {
+poll_read()
+poll_abort_read()
+abort_read()
}
class WriteStream {
+poll_write()
+poll_finish_write()
+poll_abort_write()
+abort_write()
}
class StreamInstance {
+id()
+poll_read()
+poll_write()
+poll_finish_write()
+poll_abort_write()
+abort_write()
+poll_abort_read()
+abort_read()
+read_complete()
+callback_handler_impl()
}
class StreamInner {
-exclusive: Mutex
-shared: StreamInnerShared
+handle_event_start_complete()
+handle_event_receive()
+handle_event_send_complete()
+handle_event_peer_send_shutdown()
+handle_event_peer_send_aborted()
+handle_event_peer_receive_aborted()
+handle_event_send_shutdown_complete()
+handle_event_shutdown_complete()
+handle_event_ideal_send_buffer_size()
+handle_event_peer_accepted()
}
class StreamInnerExclusive {
-state: StreamState
-recv_state: StreamRecvState
-send_state: StreamSendState
-recv_buffers: VecDeque
-write_pool: Vec
-start_waiters: Vec<Waker>
-read_waiters: Vec<Waker>
-write_shutdown_waiters: Vec<Waker>
}
class StreamInnerShared {
-stream_type: StreamType
-local_open: bool
-id: RwLock<Option<u64>>
}
Stream --> StreamInstance : contains
StreamInstance --> StreamInner : holds
StreamInner --> StreamInnerExclusive : owns
StreamInner --> StreamInnerShared : owns
Stream <|-- ReadStream : read-only view
Stream <|-- WriteStream : write-only view
Additional Notes
The file extensively uses Rust's asynchronous programming model with Futures and Poll to handle non-blocking IO.
Mutex and RwLock from parking_lot provide efficient locking mechanisms for state synchronization.
The design allows for zero-copy reads and writes via Bytes and StreamRecvBuffer.
Error handling differentiates between various categories like closed streams, resets, connection loss, and other errors.
The file integrates with the tokio and futures_io async traits for compatibility with multiple async runtimes.
Event handling is centralized in callback_handler_impl, which dispatches QUIC stream events to specialized handlers.
The read_complete method maintains strict tracking of which data ranges have been consumed and notifies the underlying QUIC implementation to update flow control accordingly.
The poll_start method handles stream start with nuanced flag management, including handling cases where stream limits are reached.
The split method enforces correct usage patterns depending on stream directionality and ownership.
The Drop implementations ensure tracing of stream lifecycle, though the actual shutdown on drop is commented out for potential safety considerations.
This documentation references general concepts related to asynchronous IO, stream lifecycle management, and buffer handling. For detailed information on related buffer management, see StreamRecvBuffer and WriteBuffer. For connection-related error handling, refer to ConnectionError. The asynchronous traits implemented relate to [tokio::io](/tokio-io) and [futures_io](/futures-io).