cache_file_svr.py
Overview
cache_file_svr.py is a lightweight caching service designed to synchronize and cache files from a persistent storage backend into a Redis in-memory datastore. Its primary purpose is to periodically check for ongoing document processing tasks, fetch the corresponding files from storage if not already cached, and store them in Redis with a short time-to-live (TTL). This mechanism improves the performance of downstream components by providing faster access to frequently used files while tasks are in progress.
The script continuously runs in a loop, querying the task database for ongoing document names, retrieving files from the configured storage implementation, and caching them in Redis. It also manages database connection closure and handles exceptions gracefully throughout the process.
Detailed Explanation
Imports and Dependencies
logging, time, traceback: Standard Python modules used for debugging, timing, and error handling.
close_connection from api.db.db_models: Function to safely close the database connection.
TaskService from api.db.services.task_service: Service layer to interact with task-related database queries.
STORAGE_IMPL from rag.utils.storage_factory: Abstraction layer providing access to different storage backends.
REDIS_CONN from rag.utils.redis_conn: Redis connection utility for caching operations.
Functions
collect()
def collect():
    doc_locations = TaskService.get_ongoing_doc_name()
    logging.debug(doc_locations)
    if len(doc_locations) == 0:
        time.sleep(1)
        return
    return doc_locations
Purpose: Queries the database for documents currently being processed (ongoing tasks) to determine which files need to be cached.
Returns:
A list of tuples (kb_id, loc), where kb_id is the knowledge base or task identifier and loc is the file location/name.
None if no ongoing documents are found, after a 1-second sleep.
Usage Example:
ongoing_files = collect()
if ongoing_files:
    for kb_id, loc in ongoing_files:
        print(f"Task {kb_id} processing file {loc}")
Implementation Details: Uses the TaskService abstraction to isolate the database query, so task retrieval can be modified or extended without altering this function.
Main Functionality
main()
def main():
    locations = collect()
    if not locations:
        return
    logging.info(f"TASKS: {len(locations)}")
    for kb_id, loc in locations:
        try:
            if REDIS_CONN.is_alive():
                try:
                    key = "{}/{}".format(kb_id, loc)
                    if REDIS_CONN.exist(key):
                        continue
                    file_bin = STORAGE_IMPL.get(kb_id, loc)
                    REDIS_CONN.transaction(key, file_bin, 12 * 60)
                    logging.info("CACHE: {}".format(loc))
                except Exception as e:
                    traceback.print_stack(e)
        except Exception as e:
            traceback.print_stack(e)
Purpose: Coordinates the caching process for all ongoing document tasks:
Retrieves ongoing documents.
Checks if Redis is alive before caching.
Avoids caching files already present in Redis.
Fetches files from storage backend if missing.
Caches files in Redis with a TTL of 12 minutes.
Parameters: None.
Returns: None.
Usage: Called repeatedly in an infinite loop for continuous caching operation.
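Error Handling: Nested try-except blocks wrap the Redis liveness check, the Redis operations, and the storage retrieval, with the intent that a failure on one file is reported without stopping the service. Note that traceback.print_stack() takes an optional frame object rather than an exception, so passing the caught exception to it is unlikely to print the intended traceback; traceback.print_exc() or logging.exception() is the usual way to report the exception being handled.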
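The per-file isolation described above can be sketched as follows. This is a minimal illustration rather than the script's own code: cache_all, flaky, and the sample inputs are hypothetical names, and logging.exception is used in place of traceback.print_stack because it records the traceback of the exception being handled.

```python
import logging


def cache_all(items, cache_one):
    """Apply cache_one to every item; one failure does not stop the rest.

    cache_one is a hypothetical callable standing in for the
    fetch-from-storage-and-write-to-Redis step. Returns the failed items.
    """
    failed = []
    for item in items:
        try:
            cache_one(item)
        except Exception:
            # Logs at ERROR level and appends the traceback of the
            # exception currently being handled.
            logging.exception("failed to cache %r", item)
            failed.append(item)
    return failed


done = []


def flaky(item):
    # Hypothetical worker that fails for one specific input.
    if item == "bad":
        raise IOError("storage unavailable")
    done.append(item)


failures = cache_all(["a", "bad", "b"], flaky)
```

Collecting the failed items, instead of only logging them, lets a caller decide whether to retry them on the next polling round.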
Implementation Details:
Uses a Redis key format "kb_id/location" to uniquely identify cached files.
The TTL of 12 minutes is chosen to balance cache freshness with resource use.
The Redis transaction method is assumed to atomically store the file data with a TTL.
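The key layout and the cache-only-if-missing step can be tried without a Redis server using an in-memory stand-in. Everything in this sketch is hypothetical: InMemoryCache only borrows the method names exist and transaction from the code above, and the set-if-absent behaviour it gives transaction is an assumption, not a statement about REDIS_CONN.

```python
import time


class InMemoryCache:
    """Hypothetical stand-in for REDIS_CONN; stores values with an expiry time."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._data = {}  # key -> (value, expires_at)

    def exist(self, key):
        entry = self._data.get(key)
        if entry is None:
            return False
        if entry[1] <= self._clock():
            del self._data[key]  # lazily drop expired entries
            return False
        return True

    def transaction(self, key, value, ttl_seconds):
        # Assumed semantics: write only when the key is absent.
        if self.exist(key):
            return False
        self._data[key] = (value, self._clock() + ttl_seconds)
        return True


def make_key(kb_id, loc):
    # Same "kb_id/location" layout as in main().
    return "{}/{}".format(kb_id, loc)


def cache_if_missing(cache, fetch, kb_id, loc, ttl_seconds=12 * 60):
    """Fetch and store the file only when its key is not cached yet."""
    key = make_key(kb_id, loc)
    if cache.exist(key):
        return False
    return cache.transaction(key, fetch(kb_id, loc), ttl_seconds)


now = [0.0]  # fake clock so expiry can be shown without waiting
cache = InMemoryCache(clock=lambda: now[0])
fetch = lambda kb_id, loc: b"file-bytes"

first = cache_if_missing(cache, fetch, "kb1", "report.pdf")   # stored
second = cache_if_missing(cache, fetch, "kb1", "report.pdf")  # skipped
now[0] = 12 * 60  # advance the fake clock to the expiry time
third = cache_if_missing(cache, fetch, "kb1", "report.pdf")   # stored again
```

The third call shows what the polling loop implies: a file whose key has expired while its task is still ongoing is fetched and cached again on a later round.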
Script Execution Behavior
if __name__ == "__main__":
    while True:
        main()
        close_connection()
        time.sleep(1)
Runs the caching service indefinitely.
After each caching iteration (main()), closes the database connection gracefully to avoid connection leaks.
Sleeps for 1 second before the next iteration to reduce CPU load and avoid tight looping.
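The loop above can be expressed as a small reusable function that is easier to test. This is a sketch under stated assumptions: poll, max_iterations, and the injectable sleep parameter are hypothetical additions, and the try/finally is a deliberate variation that runs the cleanup even when the step raises, which the original loop does not do.

```python
import time


def poll(step, cleanup, interval=1.0, max_iterations=None, sleep=time.sleep):
    """Run step() then cleanup() repeatedly, pausing between rounds.

    max_iterations=None loops forever, like the script; a finite value
    makes the loop testable. sleep is injectable so tests need not wait.
    """
    count = 0
    while max_iterations is None or count < max_iterations:
        try:
            step()
        finally:
            # Release per-iteration resources even if step() raised.
            cleanup()
        sleep(interval)
        count += 1
    return count


events = []
rounds = poll(
    step=lambda: events.append("main"),
    cleanup=lambda: events.append("close"),
    max_iterations=2,
    sleep=lambda seconds: events.append(("sleep", seconds)),
)
```

In the script's terms, step corresponds to main() and cleanup to close_connection().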
Important Implementation Details and Algorithms
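Polling Model: The script polls for ongoing tasks in a continuous loop. Each iteration ends with a 1-second sleep, and collect() sleeps for a further second when it finds no tasks, so an idle service polls roughly every two seconds, while a busy one polls every second plus the time spent caching. This simple loop suits relatively low-frequency updates and avoids the complexity of event-driven or push-based models.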
Cache Consistency: Before fetching a file from storage, the script checks if the file is already cached in Redis to prevent redundant caching operations.
Error Isolation: The nested try-except blocks ensure that an error in caching one file or checking Redis does not affect other files or cause the entire service to stop.
Storage Abstraction: The use of STORAGE_IMPL allows this caching service to remain storage-agnostic, making it adaptable to various storage backends (e.g., local filesystem, cloud storage).
Redis Transactions: The script uses a transactional approach to safely write data with expiration, ensuring data integrity in the cache.
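The storage-agnostic design noted above amounts to depending only on a get(kb_id, loc) call. The sketch below illustrates that idea with two hypothetical backends; Storage, DictStorage, LocalDirStorage, and fetch_for_cache are illustrative names and do not describe how rag.utils.storage_factory is actually implemented.

```python
import pathlib
import tempfile
from typing import Protocol


class Storage(Protocol):
    """Hypothetical interface: anything with get(bucket, name) -> bytes."""

    def get(self, bucket: str, name: str) -> bytes:
        ...


class DictStorage:
    """In-memory backend, useful for tests."""

    def __init__(self, blobs):
        self._blobs = dict(blobs)

    def get(self, bucket, name):
        return self._blobs[(bucket, name)]


class LocalDirStorage:
    """Filesystem backend that reads <root>/<bucket>/<name>."""

    def __init__(self, root):
        self._root = pathlib.Path(root)

    def get(self, bucket, name):
        return (self._root / bucket / name).read_bytes()


def fetch_for_cache(storage: Storage, kb_id: str, loc: str) -> bytes:
    # The caller never needs to know which backend it was given.
    return storage.get(kb_id, loc)


with tempfile.TemporaryDirectory() as root:
    (pathlib.Path(root) / "kb1").mkdir()
    (pathlib.Path(root) / "kb1" / "a.txt").write_bytes(b"hello")
    from_disk = fetch_for_cache(LocalDirStorage(root), "kb1", "a.txt")

from_memory = fetch_for_cache(
    DictStorage({("kb1", "a.txt"): b"hello"}), "kb1", "a.txt"
)
```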
Interaction with Other System Components
TaskService (Database Layer): Provides the ongoing document names and identifiers. This file depends on it to know what files to cache.
Storage Implementation (STORAGE_IMPL): Abstracts the physical retrieval of file binaries. This could be backed by cloud storage, filesystem, or other storage services.
Redis Connection (REDIS_CONN): Acts as the caching layer, storing file binaries temporarily for fast retrieval by other components.
Database Connection Management: Calls close_connection() after each caching cycle to maintain healthy DB connection pool usage.
Logging: Integrates with the system-wide logging setup to record debug and informational messages.
Overall, this module acts as a bridge between the database task management, persistent storage, and the Redis caching layer, ensuring that data is synchronized and ready for fast access during active processing.
Visual Diagram
classDiagram
    class cache_file_svr {
        +collect()
        +main()
    }
    class TaskService {
        +get_ongoing_doc_name()
    }
    class STORAGE_IMPL {
        +get(kb_id, loc)
    }
    class REDIS_CONN {
        +is_alive()
        +exist(key)
        +transaction(key, data, ttl)
    }
    cache_file_svr ..> TaskService : uses
    cache_file_svr ..> STORAGE_IMPL : uses
    cache_file_svr ..> REDIS_CONN : uses
Summary
cache_file_svr.py is a polling-based caching daemon that:
Queries ongoing document tasks.
Retrieves files from storage.
Caches files in Redis with TTL.
It uses robust error handling and clean DB connection management.
Storage and caching are abstracted, allowing flexibility in underlying implementations.
This module enhances system performance by providing fast, in-memory access to files related to active tasks.
Example Usage Scenario
Assuming this service is deployed alongside a document processing pipeline:
A new document task is created and recorded in the database.
cache_file_svr.py detects the ongoing task via TaskService.
It checks the Redis cache; if the file is missing, it fetches it from storage.
The file is cached in Redis for quick access.
Downstream components retrieve the file from Redis rather than slower storage.
Once the task completes, the script stops re-caching the file, and the cached copy expires from Redis when its TTL elapses. The script itself does not delete keys when a task's status changes.
This process reduces latency and improves throughput of document processing workflows.
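How a downstream component might consume the cache in this scenario can be sketched as a read that tries the cache first. The source only states that downstream components read from Redis; the fallback to storage on a miss, the read_file helper, and the dict used as a cache here are all assumptions made for illustration.

```python
def read_file(cache, storage_get, kb_id, loc):
    """Return (data, source), preferring the cache over storage.

    cache is any mapping-like object with get(key); storage_get is a
    hypothetical callable taking (kb_id, loc) and returning bytes.
    """
    key = "{}/{}".format(kb_id, loc)  # same key layout the service writes
    data = cache.get(key)
    if data is not None:
        return data, "cache"
    # Assumed fallback: go to the slower storage backend on a miss.
    return storage_get(kb_id, loc), "storage"


cache = {"kb1/report.pdf": b"cached-bytes"}
storage_get = lambda kb_id, loc: b"stored-bytes"

hit = read_file(cache, storage_get, "kb1", "report.pdf")
miss = read_file(cache, storage_get, "kb1", "other.pdf")
```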