redis_conn.py


Overview

redis_conn.py provides a comprehensive Redis client wrapper and utility toolkit tailored for the InfiniFlow system. It encapsulates connection management, message queue handling via Redis Streams, distributed locking, and common Redis data structure operations with fault tolerance and automatic reconnection.

The file primarily consists of:

This file acts as the centralized Redis interface for other components in the InfiniFlow application, abstracting Redis commands, handling exceptions, and ensuring resilience.


Classes and Functions

Class: RedisMsg

Represents a message fetched from a Redis Stream consumer group.

Constructor

def __init__(self, consumer, queue_name, group_name, msg_id, message)

Methods

Usage Example

msg = redis_db.queue_consumer("my_queue", "my_group", "consumer_1")
if msg:
    data = msg.get_message()
    # Process data...
    msg.ack()

Class: RedisDB (Singleton)

Central Redis connection manager and utility wrapper providing streamlined Redis operations with automatic reconnection and error handling.

Important Attributes

Constants

Constructor

def __init__(self)

Key Methods

Implementation Details

Usage Example

redis_db = RedisDB()

# Set a key-value pair
redis_db.set("my_key", "my_value")

# Add to sorted set
redis_db.zadd("myzset", "member1", 10.0)

# Produce a message to stream
redis_db.queue_product("mystream", {"task": "process", "id": 123})

# Consume message
msg = redis_db.queue_consumer("mystream", "mygroup", "consumer1")
if msg:
    data = msg.get_message()
    # Process data...
    msg.ack()

Class: RedisDistributedLock

Implements a distributed lock using Redis keys and the valkey.lock.Lock primitive to ensure mutual exclusion across distributed processes.

Constructor

def __init__(self, lock_key: str, lock_value: str = None, timeout: int = 10, blocking_timeout: int = 1)

Methods

Usage Example

lock = RedisDistributedLock("resource_lock")
if lock.acquire():
    try:
        # critical section
        pass
    finally:
        lock.release()

Important Implementation Details


Interaction With Other System Components

This file acts as the Redis backend interface and is foundational for features such as task queues, caching, locking, and coordination in the InfiniFlow system.


Visual Diagram

classDiagram
    class RedisMsg {
        -__consumer
        -__queue_name: str
        -__group_name: str
        -__msg_id: str
        -__message: dict
        +__init__(consumer, queue_name, group_name, msg_id, message)
        +ack() bool
        +get_message() dict
        +get_msg_id() str
    }

    class RedisDB {
        -REDIS
        -config
        -lua_delete_if_equal
        -LUA_DELETE_IF_EQUAL_SCRIPT: str
        +__init__()
        +register_scripts() void
        +__open__() redis.StrictRedis
        +health() bool
        +is_alive() bool
        +exist(k: str) int
        +get(k: str) str
        +set_obj(k: str, obj: object, exp: int) bool
        +set(k: str, v: str, exp: int) bool
        +sadd(key: str, member: str) bool
        +srem(key: str, member: str) bool
        +smembers(key: str) set
        +zadd(key: str, member: str, score: float) bool
        +zcount(key: str, min: float, max: float) int
        +zpopmin(key: str, count: int) list
        +zrangebyscore(key: str, min: float, max: float) list
        +transaction(key: str, value: str, exp: int) bool
        +queue_product(queue: str, message: dict) bool
        +queue_consumer(queue_name: str, group_name: str, consumer_name: str, msg_id=b">") RedisMsg
        +get_unacked_iterator(queue_names: list, group_name: str, consumer_name: str) iterator
        +get_pending_msg(queue: str, group_name: str) list
        +requeue_msg(queue: str, group_name: str, msg_id: str) void
        +queue_info(queue: str, group_name: str) dict
        +delete_if_equal(key: str, expected_value: str) bool
        +delete(key: str) bool
    }

    class RedisDistributedLock {
        -lock_key: str
        -lock_value: str
        -timeout: int
        -lock: Lock
        +__init__(lock_key: str, lock_value: str, timeout: int, blocking_timeout: int)
        +acquire() bool
        +spin_acquire() async
        +release() void
    }

    RedisMsg --> RedisDB : uses (consumer client)
    RedisDistributedLock --> RedisDB : uses (REDIS_CONN.REDIS)
    RedisDB o-- "valkey.StrictRedis" : redis client
    RedisDistributedLock o-- "valkey.lock.Lock" : lock primitive

Summary

redis_conn.py is a core Redis utility module that abstracts connection handling, data structure operations, message queueing with Redis Streams, and distributed locking for the InfiniFlow platform. It ensures robust Redis access with auto-reconnects, supports atomic operations via Lua scripting, and provides both synchronous and asynchronous interfaces for locking. This file forms a critical backend layer interfaced by other components for caching, queuing, and synchronization purposes.