redis_conn.py
Overview
redis_conn.py provides a comprehensive Redis client wrapper and utility toolkit tailored for the InfiniFlow system. It encapsulates connection management, message queue handling via Redis Streams, distributed locking, and common Redis data structure operations with fault tolerance and automatic reconnection.
The file primarily consists of:
RedisDB: A singleton class managing the Redis connection, scripts, and high-level Redis operations, including sorted sets, sets, key-value storage, and Redis Streams for message queuing.
RedisMsg: A wrapper for messages consumed from Redis Streams, supporting acknowledgment and access to the message content.
RedisDistributedLock: A distributed lock implementation atop Redis key locking, with token validation and asynchronous retry mechanisms.
This file acts as the centralized Redis interface for other components in the InfiniFlow application, abstracting Redis commands, handling exceptions, and ensuring resilience.
Classes and Functions
Class: RedisMsg
Represents a message fetched from a Redis Stream consumer group.
Constructor
def __init__(self, consumer, queue_name, group_name, msg_id, message)
Parameters:
consumer: The Redis client instance used for acknowledging messages.
queue_name (str): The name of the Redis Stream (queue).
group_name (str): The consumer group name.
msg_id (str): The Redis Stream message ID.
message (dict): A dictionary containing the raw message payload (expects a key "message" holding a JSON string).
Behavior:
Parses the JSON message payload and stores metadata.
Methods
ack() -> bool: Acknowledges receipt of the message to Redis, marking it as processed. Returns True if the acknowledgment succeeds; otherwise logs a warning and returns False.
get_message() -> dict: Returns the deserialized message content.
get_msg_id() -> str: Returns the Redis Stream message ID.
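The behavior described above can be sketched roughly as follows. This is a minimal illustration, not the module's source: SketchMsg and FakeConsumer are hypothetical names, and the fake client only mimics the xack(name, groupname, *ids) call so the example runs without a Redis server.

```python
import json
import logging


class SketchMsg:
    """Hypothetical stand-in for RedisMsg: parses the payload and can ack itself."""

    def __init__(self, consumer, queue_name, group_name, msg_id, message):
        self._consumer = consumer
        self._queue_name = queue_name
        self._group_name = group_name
        self._msg_id = msg_id
        # Assumption: the raw stream entry carries a JSON string under "message".
        self._message = json.loads(message["message"])

    def ack(self) -> bool:
        try:
            self._consumer.xack(self._queue_name, self._group_name, self._msg_id)
            return True
        except Exception as exc:
            logging.warning("ack failed for %s: %s", self._msg_id, exc)
            return False

    def get_message(self) -> dict:
        return self._message

    def get_msg_id(self) -> str:
        return self._msg_id


class FakeConsumer:
    """Hypothetical in-memory client that records acknowledged IDs."""

    def __init__(self):
        self.acked = []

    def xack(self, name, groupname, *ids):
        self.acked.extend(ids)
        return len(ids)


consumer = FakeConsumer()
msg = SketchMsg(consumer, "my_queue", "my_group", "1-0", {"message": '{"task": "process"}'})
```

Parsing the payload once in the constructor means get_message() never fails later, and any malformed JSON surfaces at the point where the message is received.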
Usage Example
msg = redis_db.queue_consumer("my_queue", "my_group", "consumer_1")
if msg:
    data = msg.get_message()
    # Process data...
    msg.ack()
Class: RedisDB (Singleton)
Central Redis connection manager and utility wrapper providing streamlined Redis operations with automatic reconnection and error handling.
Important Attributes
REDIS: The valkey.StrictRedis client instance.
config: Redis connection configuration from application settings.
lua_delete_if_equal: Registered Lua script that atomically deletes a key if its value matches a given value.
Constants
LUA_DELETE_IF_EQUAL_SCRIPT: Lua script string that atomically deletes a key if its current value equals the provided argument.
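A compare-and-delete script of this kind might look like the sketch below. The script text is an assumption (the module's actual LUA_DELETE_IF_EQUAL_SCRIPT may differ), and FakeScriptClient is a hypothetical in-memory stand-in that emulates the script in Python rather than executing Lua. With a real client, register_script returns an object that is called with keys and args in the same way.

```python
# Assumed script text; the real constant in the module may be written differently.
DELETE_IF_EQUAL_LUA = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""


def delete_if_equal(client, key: str, expected_value: str) -> bool:
    """Delete `key` only when its current value equals `expected_value`."""
    script = client.register_script(DELETE_IF_EQUAL_LUA)
    return bool(script(keys=[key], args=[expected_value]))


class FakeScriptClient:
    """Hypothetical in-memory client; emulates the script instead of running Lua."""

    def __init__(self):
        self.store = {}

    def register_script(self, script_text):
        def run(keys, args):
            if self.store.get(keys[0]) == args[0]:
                del self.store[keys[0]]
                return 1
            return 0

        return run


client = FakeScriptClient()
client.store["lock:job"] = "token-a"
wrong_owner = delete_if_equal(client, "lock:job", "token-b")  # value differs, key kept
right_owner = delete_if_equal(client, "lock:job", "token-a")  # value matches, key deleted
```

Running the comparison and the deletion inside one script is what makes the operation atomic on the server: no other client can change the key between the two steps.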
Constructor
def __init__(self)
Initializes the Redis connection using settings.REDIS.
Registers the Lua script.
Attempts the connection and handles exceptions gracefully.
Key Methods
register_scripts() -> None: Registers Lua scripts with the Redis server.
__open__() -> redis.StrictRedis | None: Establishes or re-establishes the Redis connection.
health() -> bool: Performs a simple Redis ping and set/get test to verify connectivity.
is_alive() -> bool: Checks whether the Redis client instance is initialized.
exist(k: str) -> int | None: Checks whether a key exists.
get(k: str) -> str | None: Gets the string value of a key.
set_obj(k: str, obj: object, exp: int=3600) -> bool: Serializes a Python object as JSON and stores it with an expiration.
set(k: str, v: str, exp: int=3600) -> bool: Stores a string value with an expiration.
sadd(key: str, member: str) -> bool: Adds a member to a Redis set.
srem(key: str, member: str) -> bool: Removes a member from a Redis set.
smembers(key: str) -> set | None: Retrieves all members of a Redis set.
zadd(key: str, member: str, score: float) -> bool: Adds a member with a score to a sorted set.
zcount(key: str, min: float, max: float) -> int: Counts members in a sorted set with scores within a range.
zpopmin(key: str, count: int) -> list | None: Pops the lowest-scored members from a sorted set.
zrangebyscore(key: str, min: float, max: float) -> list | None: Returns members with scores within a range.
transaction(key: str, value: str, exp: int=3600) -> bool: Performs a transactional set-if-not-exists with an expiration.
queue_product(queue: str, message: dict) -> bool: Produces a message to a Redis Stream (queue).
queue_consumer(queue_name: str, group_name: str, consumer_name: str, msg_id=b">") -> RedisMsg | None: Consumes a message from a Redis Stream consumer group using a blocking read. Creates the consumer group if it does not exist. Returns a RedisMsg instance, or None if no message is available.
get_unacked_iterator(queue_names: list[str], group_name: str, consumer_name: str) -> Iterator[RedisMsg]: Iterates over unacknowledged messages for the specified queues and consumer group.
get_pending_msg(queue: str, group_name: str) -> list: Retrieves a range of pending messages for a consumer group.
requeue_msg(queue: str, group_name: str, msg_id: str): Re-queues a pending message by duplicating it and acknowledging the original.
queue_info(queue: str, group_name: str) -> dict | None: Retrieves info about a consumer group in a stream.
delete_if_equal(key: str, expected_value: str) -> bool: Atomically deletes a key if its value equals the expected one, using a Lua script.
delete(key: str) -> bool: Deletes a key.
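The produce/consume flow behind queue_product and queue_consumer can be sketched as below. This is an illustration under stated assumptions, not the module's code: produce, consume, and FakeStreamClient are hypothetical names, the fake models only a single stream in memory, and field values are assumed to arrive as str (as with a client created with decode_responses=True). The group is created optimistically and a BUSYGROUP error is treated as "already exists", which may differ from how the module checks for the group.

```python
import json


def produce(client, queue: str, message: dict) -> bool:
    """Append one entry whose "message" field holds the JSON-encoded payload."""
    client.xadd(queue, {"message": json.dumps(message)})
    return True


def consume(client, queue: str, group: str, consumer: str, msg_id: str = ">"):
    """Read at most one entry for `consumer`, creating the group on first use."""
    try:
        client.xgroup_create(queue, group, id="0", mkstream=True)
    except Exception as exc:
        if "BUSYGROUP" not in str(exc):  # any other error is a real failure
            raise
    reply = client.xreadgroup(group, consumer, {queue: msg_id}, count=1, block=5)
    if not reply:
        return None
    _stream, entries = reply[0]
    if not entries:
        return None
    entry_id, fields = entries[0]
    return entry_id, json.loads(fields["message"])


class FakeStreamClient:
    """Hypothetical in-memory model of one stream with consumer groups."""

    def __init__(self):
        self.entries = []    # list of (entry_id, fields)
        self.delivered = {}  # group name -> number of entries already delivered
        self.pending = {}    # group name -> set of unacknowledged entry IDs

    def xadd(self, name, fields):
        entry_id = f"{len(self.entries) + 1}-0"
        self.entries.append((entry_id, fields))
        return entry_id

    def xgroup_create(self, name, groupname, id="$", mkstream=False):
        if groupname in self.delivered:
            raise RuntimeError("BUSYGROUP Consumer Group name already exists")
        self.delivered[groupname] = 0
        self.pending[groupname] = set()

    def xreadgroup(self, groupname, consumername, streams, count=None, block=None):
        name = next(iter(streams))
        index = self.delivered[groupname]
        if index >= len(self.entries):
            return []
        self.delivered[groupname] = index + 1
        entry = self.entries[index]
        self.pending[groupname].add(entry[0])
        return [[name, [entry]]]

    def xack(self, name, groupname, *ids):
        before = len(self.pending[groupname])
        self.pending[groupname] -= set(ids)
        return before - len(self.pending[groupname])


client = FakeStreamClient()
produce(client, "mystream", {"task": "process", "id": 123})
first = consume(client, "mystream", "mygroup", "consumer1")
acked = client.xack("mystream", "mygroup", first[0])
second = consume(client, "mystream", "mygroup", "consumer1")
```

An entry stays in the group's pending list until it is acknowledged, which is what lets methods such as get_pending_msg and requeue_msg recover work from a consumer that crashed mid-task.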
Implementation Details
Uses the valkey Redis client for strict Redis commands.
Implements retry loops (3 attempts) on Redis operations to handle transient failures.
Automatically reconnects on exceptions.
Uses Redis Streams, consumer groups, and blocking reads to implement reliable queue consumption.
Lua scripting enables atomic conditional deletion of keys.
Applies singleton pattern to ensure one Redis connection per application.
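The retry-and-reconnect behavior listed above could look roughly like this. It is a sketch under assumptions, not the module's implementation: RetryingStore, FlakyClient, and connect are hypothetical names, and only a get operation is shown.

```python
import logging


class RetryingStore:
    """Hypothetical wrapper: retry a command up to 3 times, reconnecting between tries."""

    def __init__(self, connect, attempts: int = 3):
        self._connect = connect  # zero-argument factory returning a client
        self._attempts = attempts
        self._client = connect()

    def get(self, key):
        for attempt in range(1, self._attempts + 1):
            try:
                return self._client.get(key)
            except Exception as exc:
                logging.warning("get(%r) failed on attempt %d: %s", key, attempt, exc)
                self._client = self._connect()  # re-open before the next try
        return None  # all attempts failed


class FlakyClient:
    """Hypothetical client that fails until it has been re-created once."""

    def __init__(self, healthy: bool):
        self.healthy = healthy

    def get(self, key):
        if not self.healthy:
            raise ConnectionError("connection lost")
        return "my_value"


created = []


def connect():
    # The first client is broken; every later one works.
    client = FlakyClient(healthy=len(created) >= 1)
    created.append(client)
    return client


store = RetryingStore(connect)
value = store.get("my_key")
```

Returning None after the last failed attempt matches the `| None` return types listed for several RedisDB methods, so callers need to treat None as "value missing or Redis unavailable".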
Usage Example
redis_db = RedisDB()
# Set a key-value pair
redis_db.set("my_key", "my_value")
# Add to sorted set
redis_db.zadd("myzset", "member1", 10.0)
# Produce a message to stream
redis_db.queue_product("mystream", {"task": "process", "id": 123})
# Consume message
msg = redis_db.queue_consumer("mystream", "mygroup", "consumer1")
if msg:
    data = msg.get_message()
    # Process data...
    msg.ack()
Class: RedisDistributedLock
Implements a distributed lock using Redis keys and the valkey.lock.Lock primitive to ensure mutual exclusion across distributed processes.
Constructor
def __init__(self, lock_key: str, lock_value: str = None, timeout: int = 10, blocking_timeout: int = 1)
Parameters:
lock_key: Redis key representing the lock.
lock_value: Optional unique token for the lock owner; defaults to a UUID4 string.
timeout: Lock expiration time in seconds.
blocking_timeout: Timeout to block when acquiring the lock.
Initializes a valkey.lock.Lock instance tied to the Redis connection and lock key.
Methods
acquire() -> bool: Attempts to acquire the lock, first deleting any existing lock if it is owned by this instance (checked via the token).
async spin_acquire(): Asynchronously tries to acquire the lock in a loop, sleeping 10 seconds between attempts until it succeeds.
release(): Releases the lock by deleting the key if it is owned by this instance.
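Token-based ownership can be illustrated with the sketch below. Note the substitution: the class described here wraps valkey.lock.Lock, whereas this sketch uses a plain SET with nx and ex plus a compare-and-delete, purely to show why the token matters. TokenLock and FakeKV are hypothetical names, and the fake ignores expiration.

```python
import uuid


class TokenLock:
    """Hypothetical token lock: SET NX EX to acquire, compare-and-delete to release."""

    def __init__(self, client, lock_key: str, lock_value: str = None, timeout: int = 10):
        self.client = client
        self.lock_key = lock_key
        self.lock_value = lock_value or str(uuid.uuid4())
        self.timeout = timeout

    def acquire(self) -> bool:
        # nx=True: set only when the key is absent; ex: expire after `timeout` seconds.
        return bool(self.client.set(self.lock_key, self.lock_value, nx=True, ex=self.timeout))

    def release(self) -> bool:
        # Only the holder of the matching token may delete the key.
        return self.client.delete_if_equal(self.lock_key, self.lock_value)


class FakeKV:
    """Hypothetical in-memory store; expiration is accepted but not simulated."""

    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None
        self.store[key] = value
        return True

    def delete_if_equal(self, key, expected):
        if self.store.get(key) == expected:
            del self.store[key]
            return True
        return False


kv = FakeKV()
owner = TokenLock(kv, "resource_lock")
intruder = TokenLock(kv, "resource_lock")
got_first = owner.acquire()      # key absent, lock taken
got_second = intruder.acquire()  # key present, refused
stolen = intruder.release()      # token mismatch, key kept
released = owner.release()       # token matches, key deleted
```

Against a real server the compare-and-delete must be atomic, which is the role the Lua-backed delete_if_equal method plays.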
Usage Example
lock = RedisDistributedLock("resource_lock")
if lock.acquire():
    try:
        # critical section
        pass
    finally:
        lock.release()
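For the asynchronous variant, a spin loop along the lines of spin_acquire might be sketched as follows. The module is described as using trio; this sketch substitutes the standard library's asyncio so that it runs without extra dependencies, and both the function signature and the try_acquire helper are hypothetical.

```python
import asyncio


async def spin_acquire(try_acquire, interval: float = 10.0) -> int:
    """Call `try_acquire` until it returns True, sleeping `interval` seconds between tries."""
    attempts = 0
    while True:
        attempts += 1
        if try_acquire():
            return attempts
        # Yield to the event loop so other tasks can run while waiting.
        await asyncio.sleep(interval)


# Hypothetical acquire function that succeeds on the third call.
calls = {"n": 0}


def try_acquire() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


# A short interval keeps the demonstration fast; the documented sleep is 10 seconds.
attempts = asyncio.run(spin_acquire(try_acquire, interval=0.01))
```

Sleeping with an awaitable rather than a blocking sleep is the point of the async form: a task waiting for the lock does not stall the other tasks in the same process.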
Important Implementation Details
Redis Streams & Consumer Groups: The file uses Redis Streams (xadd, xreadgroup, xack) to implement message queues with reliable delivery semantics. Consumer groups allow multiple consumers to coordinate message processing.
Lua Scripting for Atomic Operations: The delete_if_equal method uses a Lua script to atomically delete a key only if its current value matches an expected token. This prevents race conditions in distributed lock release or conditional deletion.
Singleton Pattern: The RedisDB class uses a singleton decorator to ensure only one instance manages the Redis connection, simplifying resource management and consistency.
Fault Tolerance: Redis commands are wrapped in try-except blocks with automatic reconnect attempts to handle network or server failures gracefully.
Distributed Locking: The distributed lock implementation uses token-based ownership to safely acquire and release locks, avoiding accidental unlocks by other clients.
Async Support: The lock provides an async spin acquire method using trio for asynchronous concurrency scenarios.
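As an illustration of the singleton pattern mentioned in this list, a class decorator could be written as in the sketch below. This is a generic version under assumptions, not the decorator the module imports: the real one may key instances differently (for example per process), and Connection is a hypothetical class.

```python
def singleton(cls):
    """Return a factory that creates `cls` once and reuses that instance afterwards."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class Connection:
    """Hypothetical resource that should exist only once per process."""

    def __init__(self):
        self.opened = True


a = Connection()
b = Connection()  # returns the instance created above
```

One consequence of this style is that the decorated name refers to a function rather than a class, so isinstance checks against it no longer work; sharing a module-level instance such as REDIS_CONN avoids that issue for callers.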
Interaction With Other System Components
Settings Dependency: Uses rag.settings.REDIS for Redis connection configuration.
Singleton Usage: Other modules import the REDIS_CONN instance for Redis operations.
Message Queues: Other application components produce and consume messages through Redis Streams using queue_product and queue_consumer.
Distributed Synchronization: Uses RedisDistributedLock for synchronizing access to shared resources in a distributed environment.
Utilities: Relies on the singleton decorator and the valkey Redis client library.
This file acts as the Redis backend interface and is foundational for features such as task queues, caching, locking, and coordination in the InfiniFlow system.
Visual Diagram
classDiagram
class RedisMsg {
-__consumer
-__queue_name: str
-__group_name: str
-__msg_id: str
-__message: dict
+__init__(consumer, queue_name, group_name, msg_id, message)
+ack() bool
+get_message() dict
+get_msg_id() str
}
class RedisDB {
-REDIS
-config
-lua_delete_if_equal
-LUA_DELETE_IF_EQUAL_SCRIPT: str
+__init__()
+register_scripts() void
+__open__() redis.StrictRedis
+health() bool
+is_alive() bool
+exist(k: str) int
+get(k: str) str
+set_obj(k: str, obj: object, exp: int) bool
+set(k: str, v: str, exp: int) bool
+sadd(key: str, member: str) bool
+srem(key: str, member: str) bool
+smembers(key: str) set
+zadd(key: str, member: str, score: float) bool
+zcount(key: str, min: float, max: float) int
+zpopmin(key: str, count: int) list
+zrangebyscore(key: str, min: float, max: float) list
+transaction(key: str, value: str, exp: int) bool
+queue_product(queue: str, message: dict) bool
+queue_consumer(queue_name: str, group_name: str, consumer_name: str, msg_id=b">") RedisMsg
+get_unacked_iterator(queue_names: list, group_name: str, consumer_name: str) iterator
+get_pending_msg(queue: str, group_name: str) list
+requeue_msg(queue: str, group_name: str, msg_id: str) void
+queue_info(queue: str, group_name: str) dict
+delete_if_equal(key: str, expected_value: str) bool
+delete(key: str) bool
}
class RedisDistributedLock {
-lock_key: str
-lock_value: str
-timeout: int
-lock: Lock
+__init__(lock_key: str, lock_value: str, timeout: int, blocking_timeout: int)
+acquire() bool
+spin_acquire() async
+release() void
}
RedisMsg --> RedisDB : uses (consumer client)
RedisDistributedLock --> RedisDB : uses (REDIS_CONN.REDIS)
RedisDB o-- "valkey.StrictRedis" : redis client
RedisDistributedLock o-- "valkey.lock.Lock" : lock primitive
Summary
redis_conn.py is a core Redis utility module that abstracts connection handling, data structure operations, message queueing with Redis Streams, and distributed locking for the InfiniFlow platform. It ensures robust Redis access with auto-reconnects, supports atomic operations via Lua scripting, and provides both synchronous and asynchronous interfaces for locking. This file forms a critical backend layer interfaced by other components for caching, queuing, and synchronization purposes.