stream.rs

Overview

This file implements QUIC streams for network communication, covering both bidirectional and unidirectional streams. It manages the stream lifecycle and performs reads and writes asynchronously, with explicit state tracking and error handling. The streams are built on the lower-level msquic API, which handles the QUIC protocol operations.

The main abstraction is the Stream struct, which represents a logical stream endpoint and supports splitting into separate reader and writer halves (ReadStream and WriteStream). The file implements asynchronous reading and writing traits for these types, enabling integration with async runtimes.

Internal state lives in the StreamInner struct. State that needs exclusive mutable access sits behind a Mutex, and shared state such as the stream id sits behind an RwLock. msquic reports events through callbacks; the handlers update the internal state and wake any async tasks waiting on it.
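
As a rough illustration of the locking split described above, the following sketch uses only the standard library. The field names recv_buffers, local_open, and id follow the class diagram in this document; the type names Exclusive, Shared, and Inner, the Vec<u8> buffer type, and all method names are assumptions made for the example, not the file's actual definitions.

```rust
use std::collections::VecDeque;
use std::sync::{Mutex, RwLock};

/// State mutated by both the callback side and the polling side,
/// so it sits behind a Mutex.
#[derive(Default)]
struct Exclusive {
    // Vec<u8> stands in for the real receive buffer type (assumption).
    recv_buffers: VecDeque<Vec<u8>>,
}

/// State that is mostly read; only the id needs interior mutability.
struct Shared {
    local_open: bool,
    id: RwLock<Option<u64>>,
}

struct Inner {
    exclusive: Mutex<Exclusive>,
    shared: Shared,
}

impl Inner {
    fn new(local_open: bool) -> Self {
        Inner {
            exclusive: Mutex::new(Exclusive::default()),
            shared: Shared { local_open, id: RwLock::new(None) },
        }
    }

    /// Any number of readers may hold the read lock at the same time.
    fn id(&self) -> Option<u64> {
        *self.shared.id.read().unwrap()
    }

    /// The write lock is taken only briefly, when the id becomes known.
    fn set_id(&self, id: u64) {
        *self.shared.id.write().unwrap() = Some(id);
    }

    fn is_local_open(&self) -> bool {
        self.shared.local_open
    }

    /// Queue received bytes under the Mutex and return the queue length.
    fn push_received(&self, data: &[u8]) -> usize {
        let mut ex = self.exclusive.lock().unwrap();
        ex.recv_buffers.push_back(data.to_vec());
        ex.recv_buffers.len()
    }
}
```

Keeping the id outside the Mutex means an id lookup does not contend with reads and writes that hold the exclusive lock.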


Key Entities and Their Functionality

Enum: StreamType

Defines the type of stream: bidirectional (StreamType::Bidirectional in the usage examples) or unidirectional.

Struct: Stream

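Represents a single stream instance. The groups below list the method names shown for Stream in the class diagram in this document.

Creation and Initialization

open(), from_raw()

Stream Lifecycle Methods

poll_start()

Stream Accessors

id(), split()

Reading Methods

poll_read()

Writing Methods

poll_write(), poll_finish_write(), poll_abort_write(), abort_write()

Read Abort Methods

poll_abort_read(), abort_read()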


Struct: ReadStream

A read-only view of a stream, wrapping an Arc<StreamInstance>.

Implements:


Struct: WriteStream

A write-only view of a stream.

Implements:


Struct: StreamInstance

Internal representation of a stream instance holding:

Important Methods

Event Handlers for msquic Callback Events

Each handler (handle_event_receive(), handle_event_send_complete(), and the others listed in the class diagram) updates state, manages buffers, and wakes waiting tasks as appropriate for its event type.
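
The wake-up pattern described here can be sketched with the standard library alone. In this hedged example, RecvSide and RecvState are stand-in types, and the handler and poll_read bodies are assumptions about the general technique (queue data, then wake registered wakers), not the actual msquic callback code. CountingWaker exists only so the wake-up can be observed.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct RecvState {
    recv_buffers: VecDeque<Vec<u8>>,
    read_waiters: Vec<Waker>,
}

#[derive(Default)]
struct RecvSide {
    state: Mutex<RecvState>,
}

impl RecvSide {
    /// Stand-in for a receive-event handler: queue the data, then wake readers.
    fn handle_event_receive(&self, data: &[u8]) {
        let waiters = {
            let mut st = self.state.lock().unwrap();
            st.recv_buffers.push_back(data.to_vec());
            std::mem::take(&mut st.read_waiters)
        }; // the lock is released here, before any waker runs
        for w in waiters {
            w.wake();
        }
    }

    /// Copy queued bytes into `buf`, or register the waker and return Pending.
    fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<usize> {
        let mut st = self.state.lock().unwrap();
        match st.recv_buffers.pop_front() {
            Some(mut chunk) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                if n < chunk.len() {
                    // Put the unread tail back at the front of the queue.
                    st.recv_buffers.push_front(chunk.split_off(n));
                }
                Poll::Ready(n)
            }
            None => {
                st.read_waiters.push(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Helper for demonstration: a waker that counts how often it is woken.
#[derive(Default)]
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Taking the waker list out of the state before calling wake() keeps the Mutex from being held while arbitrary task code is scheduled.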


Struct: StreamInner

Holds the internal state of a stream, split into an exclusive part (the exclusive field, guarded by a Mutex) and a shared part (the shared field, of type StreamInnerShared).

Inner State Structures


Stream States

These enumerations track the lifecycle state of the stream:
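
To make the idea of lifecycle tracking concrete, here is a minimal state-machine sketch for the send side. The variant names, the SendEvent type, and the transitions are all hypothetical; this document does not list the variants of StreamState, StreamRecvState, or StreamSendState, so treat this as an illustration of the technique only.

```rust
/// Hypothetical send-side lifecycle; variant names are illustrative only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SendState {
    Open,
    ShuttingDown,
    Closed,
}

/// Hypothetical events that drive the send-side state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SendEvent {
    FinishRequested,
    ShutdownComplete,
    Aborted,
}

impl SendState {
    /// Return the next state for an event; events that do not apply are ignored.
    fn on_event(self, ev: SendEvent) -> SendState {
        match (self, ev) {
            (SendState::Open, SendEvent::FinishRequested) => SendState::ShuttingDown,
            (SendState::ShuttingDown, SendEvent::ShutdownComplete) => SendState::Closed,
            (_, SendEvent::Aborted) => SendState::Closed,
            (state, _) => state,
        }
    }

    /// Writes are only accepted while the send side is open.
    fn can_write(self) -> bool {
        self == SendState::Open
    }
}
```

Encoding the transitions in one match makes it easy to see which events are legal in which state.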


Async Read and Write Futures


Error Types

All error types implement the std::error::Error trait and can be converted to std::io::Error for interoperability with IO traits.
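
The conversion described above usually takes the following shape. ReadError and its variants are hypothetical names chosen for this sketch, and the mapping to io::ErrorKind values is an assumption; the file's real error types may map differently.

```rust
use std::fmt;
use std::io;

/// Hypothetical read-side error; the real error types are not listed here.
#[derive(Debug)]
enum ReadError {
    Closed,
    Reset(u64),
}

impl fmt::Display for ReadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ReadError::Closed => write!(f, "stream closed"),
            ReadError::Reset(code) => write!(f, "stream reset by peer with code {}", code),
        }
    }
}

impl std::error::Error for ReadError {}

/// Lets `?` turn a ReadError into an io::Error inside IO trait methods.
impl From<ReadError> for io::Error {
    fn from(e: ReadError) -> Self {
        let kind = match &e {
            ReadError::Closed => io::ErrorKind::NotConnected,
            ReadError::Reset(_) => io::ErrorKind::ConnectionReset,
        };
        // The original error is kept as the io::Error's inner source.
        io::Error::new(kind, e)
    }
}
```

Because the original error is stored inside the io::Error, callers can still recover it through get_ref() and downcasting.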


Important Implementation Details and Algorithms


Interaction with Other Parts of the System


Usage Examples

Opening a new bidirectional stream

let stream = Stream::open(&connection, StreamType::Bidirectional)?;

Reading data asynchronously

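use std::task::Poll;

// `cx` is a std::task::Context, typically the one passed to the enclosing poll function.
let mut buf = vec![0u8; 1024];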
match stream.poll_read(&mut cx, &mut buf) {
    Poll::Ready(Ok(n)) => {
        println!("Read {} bytes", n);
    }
    Poll::Pending => {
        // Will be woken when data arrives
    }
    Poll::Ready(Err(e)) => {
        eprintln!("Read error: {:?}", e);
    }
}
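
A poll-style read like the one above can be awaited from async code by wrapping it in std::future::poll_fn. The sketch below uses FakeStream, a stand-in type that always has data ready, plus a tiny block_on helper so the example runs without an async runtime. Neither FakeStream, read_some, nor block_on is part of the file being documented.

```rust
use std::future::{poll_fn, Future};
use std::io;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for a stream with a poll-style read; not the real Stream type.
struct FakeStream {
    data: Vec<u8>,
    pos: usize,
}

impl FakeStream {
    fn poll_read(&mut self, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        let rest = &self.data[self.pos..];
        let n = rest.len().min(buf.len());
        buf[..n].copy_from_slice(&rest[..n]);
        self.pos += n;
        // A real stream would return Poll::Pending when nothing is queued.
        Poll::Ready(Ok(n))
    }
}

/// Adapt the poll-style method into something that can be awaited.
async fn read_some(stream: &mut FakeStream, buf: &mut [u8]) -> io::Result<usize> {
    poll_fn(|cx| stream.poll_read(cx, buf)).await
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, for demonstration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
        std::thread::yield_now();
    }
}
```

In a real application an async runtime would take the place of block_on.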

Writing data asynchronously

let data = b"hello";
match stream.poll_write(&mut cx, data, false) {
    Poll::Ready(Ok(n)) => {
        println!("Wrote {} bytes", n);
    }
    Poll::Pending => {
        // Will be woken when ready to write
    }
    Poll::Ready(Err(e)) => {
        eprintln!("Write error: {:?}", e);
    }
}
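
Because a poll-style write may accept fewer bytes than offered, callers often loop until the whole buffer is written. The sketch below shows that loop against FakeWriter, a stand-in that accepts at most max_chunk bytes per call. The three-argument call shape mirrors the example above; the meaning of the third argument is not specified in this document, so the stand-in simply ignores it. All names here are assumptions for illustration.

```rust
use std::io;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in writer that accepts at most `max_chunk` bytes per call.
struct FakeWriter {
    sink: Vec<u8>,
    max_chunk: usize,
}

impl FakeWriter {
    fn poll_write(
        &mut self,
        _cx: &mut Context<'_>,
        buf: &[u8],
        _flag: bool, // ignored; stands in for the third argument in the example
    ) -> Poll<io::Result<usize>> {
        let n = buf.len().min(self.max_chunk);
        self.sink.extend_from_slice(&buf[..n]);
        Poll::Ready(Ok(n))
    }
}

/// Keep writing until all of `data` is accepted. `written` persists across
/// calls so progress survives a Poll::Pending.
fn poll_write_all(
    w: &mut FakeWriter,
    cx: &mut Context<'_>,
    data: &[u8],
    written: &mut usize,
) -> Poll<io::Result<()>> {
    while *written < data.len() {
        match w.poll_write(cx, &data[*written..], false) {
            // Zero bytes accepted: report an error instead of looping forever.
            Poll::Ready(Ok(0)) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
            Poll::Ready(Ok(n)) => *written += n,
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => return Poll::Pending,
        }
    }
    Poll::Ready(Ok(()))
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Waker that does nothing, so the sketch can be driven without a runtime.
fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```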

Splitting a stream

let (read_stream, write_stream) = stream.split();
if let Some(mut reader) = read_stream {
    // Use reader.poll_read(...)
}
if let Some(mut writer) = write_stream {
    // Use writer.poll_write(...)
}
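
One plausible reading of why split() yields Option values is that a unidirectional stream has only one usable half. The sketch below illustrates that idea with both halves sharing one Arc, in the spirit of ReadStream wrapping an Arc<StreamInstance>. The Unidirectional variant name, the Instance, ReadHalf, and WriteHalf types, and the selection logic are assumptions, not the file's actual implementation.

```rust
use std::sync::Arc;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum StreamType {
    Bidirectional,
    Unidirectional, // variant name assumed for this sketch
}

/// Stand-in for the shared stream instance.
struct Instance {
    stream_type: StreamType,
    local_open: bool,
}

struct ReadHalf(Arc<Instance>);
struct WriteHalf(Arc<Instance>);
struct Stream(Arc<Instance>);

impl Stream {
    /// Hand out only the halves that make sense for this stream.
    fn split(self) -> (Option<ReadHalf>, Option<WriteHalf>) {
        match (self.0.stream_type, self.0.local_open) {
            // Both directions are usable: the halves share one instance.
            (StreamType::Bidirectional, _) => {
                (Some(ReadHalf(self.0.clone())), Some(WriteHalf(self.0)))
            }
            // In QUIC, a locally opened unidirectional stream is send-only.
            (StreamType::Unidirectional, true) => (None, Some(WriteHalf(self.0))),
            // A peer-opened unidirectional stream is receive-only.
            (StreamType::Unidirectional, false) => (Some(ReadHalf(self.0)), None),
        }
    }
}
```

Sharing the instance through Arc lets the two halves move to different tasks while the underlying state stays alive until both are dropped.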

Diagram of the Stream Structure and Workflow

classDiagram
class Stream {
+open()
+from_raw()
+poll_start()
+id()
+split()
+poll_read()
+poll_write()
+poll_finish_write()
+poll_abort_write()
+abort_write()
+poll_abort_read()
+abort_read()
}
class ReadStream {
+poll_read()
+poll_abort_read()
+abort_read()
}
class WriteStream {
+poll_write()
+poll_finish_write()
+poll_abort_write()
+abort_write()
}
class StreamInstance {
+id()
+poll_read()
+poll_write()
+poll_finish_write()
+poll_abort_write()
+abort_write()
+poll_abort_read()
+abort_read()
+read_complete()
+callback_handler_impl()
}
class StreamInner {
-exclusive: Mutex
-shared: StreamInnerShared
+handle_event_start_complete()
+handle_event_receive()
+handle_event_send_complete()
+handle_event_peer_send_shutdown()
+handle_event_peer_send_aborted()
+handle_event_peer_receive_aborted()
+handle_event_send_shutdown_complete()
+handle_event_shutdown_complete()
+handle_event_ideal_send_buffer_size()
+handle_event_peer_accepted()
}
class StreamInnerExclusive {
-state: StreamState
-recv_state: StreamRecvState
-send_state: StreamSendState
-recv_buffers: VecDeque
-write_pool: Vec
-start_waiters: Vec<Waker>
-read_waiters: Vec<Waker>
-write_shutdown_waiters: Vec<Waker>
}
class StreamInnerShared {
-stream_type: StreamType
-local_open: bool
-id: RwLock<Option<u64>>
}
Stream --> StreamInstance : contains
StreamInstance --> StreamInner : holds
StreamInner --> StreamInnerExclusive : owns
StreamInner --> StreamInnerShared : owns
Stream <|-- ReadStream : read-only view
Stream <|-- WriteStream : write-only view

Additional Notes


For details on buffer management, see StreamRecvBuffer and WriteBuffer. For connection-related error handling, see ConnectionError. The asynchronous traits implemented here come from [tokio::io](/tokio-io) and [futures_io](/futures-io).