listener.rs
Overview
This file implements a key-based event listener system that allows clients to subscribe to changes on keys with specific prefixes. When a key change event occurs, all listeners registered for matching prefixes are notified with a stripped version of the event key. The main entities are Listeners, which manages multiple listeners efficiently, and ListenerHandle, which controls the lifecycle of individual subscriptions.
The system is optimized for relatively few listeners, using a nested map keyed by string prefixes and listener IDs. It supports automatic removal of listeners when their handles are dropped, with an option to keep listeners alive indefinitely via the forever method.
Key Concepts and Structures
ListenerHandle
A handle representing a subscription to a key prefix event. It manages the lifecycle of the listener:
Fields:
prefix: String - The key prefix this listener is subscribed to.
listener_id: usize - Unique identifier for the listener.
listeners: Weak<RwLock<InnerListeners>> - Weak reference to the internal listeners container.
Methods:
forever(self) - Consumes the handle and replaces its weak reference to InnerListeners with an empty one, so dropping the handle no longer cancels the listener.
Drop Trait:
When the handle is dropped, it removes the listener from InnerListeners, provided the internal listeners still exist. This ensures automatic cleanup unless forever() was called.
Usage Example:
let handle = listeners.subscribe_event("prefix:", callback);
// Listener is automatically removed when `handle` is dropped,
// unless `handle.forever()` is called.
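The drop-and-forever behaviour described above can be sketched with a simplified model. This is an illustration under stated assumptions, not the file's actual code: Registry, Handle, subscribe, and remaining_after_cleanup are hypothetical names, and the registry stores plain labels instead of callbacks.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock, Weak};

// Hypothetical stand-in for the internal container: listener ID -> label.
type Registry = RwLock<HashMap<usize, &'static str>>;

struct Handle {
    listener_id: usize,
    registry: Weak<Registry>,
}

impl Handle {
    // Swap in an empty `Weak` so the `Drop` impl below finds nothing to clean up.
    fn forever(mut self) {
        self.registry = Weak::new();
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // `upgrade` yields `None` if the registry is gone or `forever` was called.
        if let Some(registry) = self.registry.upgrade() {
            registry.write().unwrap().remove(&self.listener_id);
        }
    }
}

fn subscribe(registry: &Arc<Registry>, listener_id: usize, label: &'static str) -> Handle {
    registry.write().unwrap().insert(listener_id, label);
    Handle { listener_id, registry: Arc::downgrade(registry) }
}

// Returns how many listeners remain after dropping one handle and
// calling `forever` on another.
fn remaining_after_cleanup() -> usize {
    let registry: Arc<Registry> = Arc::new(RwLock::new(HashMap::new()));
    let dropped = subscribe(&registry, 0, "temporary");
    let kept = subscribe(&registry, 1, "permanent");
    drop(dropped); // removes listener 0
    kept.forever(); // consumes the handle; listener 1 stays registered
    let count = registry.read().unwrap().len();
    count
}
```

Because the handle only holds a Weak pointer, it never keeps the registry alive on its own; if the registry has already been dropped, the upgrade fails and the Drop impl does nothing.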
Listeners
A thread-safe, cloneable container managing multiple listeners:
Fields:
inner: Arc<RwLock<InnerListeners>> - Shared ownership of the internal listeners.
Methods:
subscribe_event(key_prefix, callback) -> ListenerHandle - Registers a callback for a specific key prefix. Returns a ListenerHandle managing the subscription.
trigger_event(key_change_event) - Triggers an event, notifying all listeners whose prefixes match the event key.
Internal Details:
Uses a helper method, subscribe_event_for_ligher_monomorphization, to reduce the code bloat caused by monomorphizing the generic entry point once per closure type.
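The general technique behind such a helper is a thin generic wrapper that boxes the closure and immediately delegates to a non-generic function. The sketch below illustrates that pattern only; Registry, subscribe, subscribe_boxed, and count_calls are hypothetical names, and the callback takes a plain &str rather than the crate's event type.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

type BoxedListener = Box<dyn Fn(&str) + 'static + Send + Sync>;

#[derive(Default)]
struct Registry {
    listeners: Vec<(String, BoxedListener)>,
}

impl Registry {
    // Generic entry point: instantiated once per closure type, so it stays tiny.
    fn subscribe(&mut self, prefix: impl ToString, callback: impl Fn(&str) + 'static + Send + Sync) {
        self.subscribe_boxed(prefix.to_string(), Box::new(callback));
    }

    // Non-generic body: compiled once, however many closure types are used.
    fn subscribe_boxed(&mut self, prefix: String, callback: BoxedListener) {
        self.listeners.push((prefix, callback));
    }
}

// Registers two distinct closure types and counts how many callbacks run.
fn count_calls() -> usize {
    let calls = Arc::new(AtomicUsize::new(0));
    let mut registry = Registry::default();
    let first = Arc::clone(&calls);
    registry.subscribe("a:", move |_key| {
        first.fetch_add(1, Ordering::Relaxed);
    });
    let second = Arc::clone(&calls);
    registry.subscribe(String::from("b:"), move |_key| {
        second.fetch_add(1, Ordering::Relaxed);
    });
    for (_prefix, listener) in &registry.listeners {
        listener("some-key");
    }
    calls.load(Ordering::Relaxed)
}
```

Each call to subscribe with a new closure type produces a new copy of the wrapper, but the wrapper is only two lines; the real work lives in the single compiled copy of subscribe_boxed.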
Usage Example:
let listeners = Listeners::default();
let handle = listeners.subscribe_event("my_prefix:", |event| {
println!("Received event: {:?}", event);
});
listeners.trigger_event(some_key_change_event);
InnerListeners
The internal structure holding all listeners and their identifiers:
Fields:
listeners: BTreeMap<String, HashMap<usize, BoxedListener>> - Maps key prefixes to listener maps, where each listener map associates an ID with a callback.
listener_idx: AtomicUsize - Atomic counter for generating unique listener IDs.
Methods:
subscribe_event(key_prefix, idx, callback) - Adds a callback under the specified prefix and ID.
trigger_event(key_change_event) - Invokes all listeners whose prefixes match the event's key.
remove_listener(key_prefix, idx) - Removes a listener by prefix and ID.
Event Dispatch Algorithm:
If the prefix "" (empty string) is registered, its listeners are always notified first.
If the event key is empty, no further listeners are notified.
A range query on the BTreeMap finds the registered prefixes that sort lexicographically between the first character of the key and the full key. Every non-empty prefix of the key falls inside this range, but entries that are not prefixes can fall inside it too, so the range only narrows the set of candidates.
For each candidate that is a true prefix of the key, the prefix is stripped from the event key and the stripped event is passed to that prefix's listeners.
Implementation Note:
Although a trie would be more efficient for prefix matching, the current approach uses a BTreeMap due to expected small listener counts.
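The dispatch steps above can be sketched as a standalone function. This is a minimal illustration, not the file's code: matching_prefixes is a hypothetical name, a BTreeSet of prefixes stands in for the BTreeMap of listener maps, and the function returns the (prefix, stripped key) pairs instead of invoking callbacks.

```rust
use std::collections::BTreeSet;
use std::ops::Bound;

// Returns `(prefix, stripped_key)` for every registered prefix that matches `key`.
fn matching_prefixes<'a>(prefixes: &'a BTreeSet<String>, key: &'a str) -> Vec<(&'a str, &'a str)> {
    let mut matches = Vec::new();
    // The empty prefix matches every key and is handled first.
    if prefixes.contains("") {
        matches.push(("", key));
    }
    // An empty key has no first character to bound the range with.
    let first_char_len = match key.chars().next() {
        Some(c) => c.len_utf8(),
        None => return matches,
    };
    let lower = &key[..first_char_len];
    // Every non-empty prefix of `key` sorts between its first character and `key`.
    let range = (Bound::Included(lower), Bound::Included(key));
    for prefix in prefixes.range::<str, _>(range) {
        // The range is a superset of the true prefixes, so check each candidate.
        if let Some(stripped) = key.strip_prefix(prefix.as_str()) {
            matches.push((prefix.as_str(), stripped));
        }
    }
    matches
}
```

With prefixes "", "a", "ab", "abc", "abd", and "b" registered, the key "abd:x" yields a range containing "a", "ab", "abc", and "abd". The candidate "abc" sorts inside the range but is not a prefix of the key, which is why the strip_prefix check is still needed.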
Types and Aliases
BoxedListener: A type alias for a boxed, thread-safe, 'static callback: Box<dyn Fn(KeyChangeEvent) + 'static + Send + Sync>
KeyChangeEvent: Represents a change event on a key, expected to have fields like key, value, and node. It is imported from the crate and is the argument passed to every listener callback.
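To make the callback type concrete, here is a hedged sketch of a boxed listener receiving a stripped event. The Event struct is a hypothetical stand-in with only key and value fields; the real KeyChangeEvent may differ in fields and lifetimes, and strip_key_prefix and collect_stripped_keys are names assumed for this illustration.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the crate's event type.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Event<'a> {
    key: &'a str,
    value: &'a str,
}

impl<'a> Event<'a> {
    // Returns a copy of the event with `prefix` removed from the key,
    // or `None` when the key does not start with `prefix`.
    fn strip_key_prefix(&self, prefix: &str) -> Option<Event<'a>> {
        let key = self.key.strip_prefix(prefix)?;
        Some(Event { key, value: self.value })
    }
}

// Same shape as the alias described above, over the stand-in event type.
type BoxedListener = Box<dyn for<'a> Fn(Event<'a>) + 'static + Send + Sync>;

// Offers one event to a listener under two prefixes and returns the keys it saw.
fn collect_stripped_keys() -> Vec<String> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let listener: BoxedListener = Box::new(move |event: Event<'_>| {
        sink.lock().unwrap().push(event.key.to_string());
    });
    let event = Event { key: "users:42", value: "alice" };
    for prefix in ["users:", "orders:"] {
        // Only the matching prefix produces a stripped event.
        if let Some(stripped) = event.strip_key_prefix(prefix) {
            listener(stripped);
        }
    }
    let keys = seen.lock().unwrap().clone();
    keys
}
```

The Send + Sync bounds are what allow boxed callbacks to live behind a lock shared across threads, and 'static means a callback cannot borrow from the scope that registered it, which is why the sketch moves an Arc into the closure.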
How This File Interacts With Other Parts of the System
It depends on KeyChangeEvent from the crate to represent the mutation events that listeners react to.
It uses synchronization primitives (Arc, Weak, RwLock, AtomicUsize) to allow safe concurrent mutable access and listener lifecycle management.
The listener callbacks are user-defined closures that receive KeyChangeEvent instances, enabling customizable reactions to key changes.
This component is likely used in a distributed or stateful system where changes to keys need to be monitored and acted upon, as indicated by the test usage of ChitchatId.
Important Algorithms and Implementation Details
Listener Subscription and Removal
Listeners are registered with a key prefix and a unique index assigned atomically.
The ListenerHandle holds only a weak pointer to the internal listeners, which avoids circular references and lets the handle remove its listener automatically when dropped.
Calling forever() on a handle clears this weak link, so the listener stays registered after the handle is dropped.
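Atomic assignment of unique IDs can be sketched with AtomicUsize::fetch_add. The function names and the Relaxed ordering below are assumptions made for illustration; the source does not state which memory ordering the file uses.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hands out a fresh ID; `fetch_add` atomically returns the previous value.
fn next_listener_id(counter: &AtomicUsize) -> usize {
    counter.fetch_add(1, Ordering::Relaxed)
}

// Allocates IDs from several threads and returns how many distinct IDs were seen.
fn distinct_ids(threads: usize, per_thread: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let workers: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                (0..per_thread)
                    .map(|_| next_listener_id(&counter))
                    .collect::<Vec<usize>>()
            })
        })
        .collect();
    let mut ids = HashSet::new();
    for worker in workers {
        ids.extend(worker.join().unwrap());
    }
    ids.len()
}
```

Since fetch_add is a single atomic read-modify-write, no two callers can observe the same previous value, so IDs stay unique without taking a write lock just to allocate one.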
Event Triggering
Listeners with an empty prefix receive all events.
Other listeners receive events only when their prefix matches the start of the event key.
The event key is stripped of the prefix before invoking the listener callback.
The listeners are stored in a BTreeMap to enable efficient range queries for prefix matching.
The event dispatch method uses a lexicographical range based on the first character of the event key to limit the search space.
Visual Diagram
classDiagram
class ListenerHandle {
+prefix: String
+listener_id: usize
+listeners: Weak<RwLock<InnerListeners>>
+forever()
<<drop>>
}
class Listeners {
+inner: Arc<RwLock<InnerListeners>>
+subscribe_event()
+trigger_event()
}
class InnerListeners {
+listeners: BTreeMap<String, HashMap<usize, BoxedListener>>
+listener_idx: AtomicUsize
+subscribe_event()
+trigger_event()
+remove_listener()
}
ListenerHandle --> InnerListeners : weak reference
Listeners --> InnerListeners : Arc reference
Listeners --> ListenerHandle : returns
InnerListeners "1" *-- "*" BoxedListener : stores
Tests Summary
The file contains comprehensive tests verifying:
Basic subscription and event triggering.
Correct key prefix stripping in events.
Proper removal of listeners when handles are dropped.
Behavior when subscribing with the empty prefix (receives all events).
The effect of calling forever() on a handle to prevent listener removal.
Multiple listeners with overlapping prefixes receive events as expected.
Together, these tests cover the subscription lifecycle and the prefix-matching dispatch behavior of the listener system.