cache_file_svr.py


Overview

cache_file_svr.py is a lightweight caching service designed to synchronize and cache files from a persistent storage backend into a Redis in-memory datastore. Its primary purpose is to periodically check for ongoing document processing tasks, fetch the corresponding files from storage if not already cached, and store them in Redis with a short time-to-live (TTL). This mechanism improves the performance of downstream components by providing faster access to frequently used files while tasks are in progress.

The script runs in an endless loop. On each pass it queries the task database for ongoing documents, retrieves any files that are not yet cached from the configured storage implementation, and stores them in Redis. After each pass it closes the database connection and sleeps for one second. Exceptions raised while caching a file are caught, so a single failure does not stop the loop.


Detailed Explanation

Imports and Dependencies


Functions

collect()

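import logging
import time
# TaskService is provided by the project's database service layer.

def collect():
    # Returns the ongoing (kb_id, location) pairs, or None when there are none.
    doc_locations = TaskService.get_ongoing_doc_name()
    logging.debug(doc_locations)
    if not doc_locations:
        # Nothing to cache: pause briefly before the caller polls again.
        time.sleep(1)
        return None
    return doc_locations

# Example usage: iterate over the pairs when any are returned.
ongoing_files = collect()
if ongoing_files:
    for kb_id, loc in ongoing_files:
        print(f"Task {kb_id} processing file {loc}")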
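
The return contract of collect() can be exercised without a database. The following is a minimal sketch under stated assumptions: FakeTaskService, collect_from, and the injectable sleep parameter are hypothetical names introduced for illustration and are not part of the module.

```python
import logging
import time
from typing import Callable, List, Optional, Tuple

DocLocation = Tuple[str, str]  # (kb_id, location) pair


class FakeTaskService:
    """Hypothetical stand-in for TaskService; returns canned rows."""

    def __init__(self, rows: List[DocLocation]) -> None:
        self.rows = rows

    def get_ongoing_doc_name(self) -> List[DocLocation]:
        return list(self.rows)


def collect_from(service: FakeTaskService,
                 sleep: Callable[[float], None] = time.sleep
                 ) -> Optional[List[DocLocation]]:
    """Mirror collect(): pause and return None when nothing is ongoing."""
    doc_locations = service.get_ongoing_doc_name()
    logging.debug(doc_locations)
    if not doc_locations:
        sleep(1)
        return None
    return doc_locations


# Record requested pauses instead of actually sleeping.
pauses: List[float] = []
idle = collect_from(FakeTaskService([]), sleep=pauses.append)
busy = collect_from(FakeTaskService([("kb1", "a.pdf")]), sleep=pauses.append)
```

Because the empty case returns None rather than an empty list, callers need a truthiness check before iterating, which is what main() does.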

Main Functionality

main()

def main():
    locations = collect()
    if not locations:
        return
    logging.info(f"TASKS: {len(locations)}")
    for kb_id, loc in locations:
        try:
            if REDIS_CONN.is_alive():
                try:
                    key = "{}/{}".format(kb_id, loc)
                    if REDIS_CONN.exist(key):
                        continue
                    file_bin = STORAGE_IMPL.get(kb_id, loc)
                    REDIS_CONN.transaction(key, file_bin, 12 * 60)
                    logging.info("CACHE: {}".format(loc))
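                except Exception:
                    # traceback.print_stack() expects a frame, not an exception; log the traceback instead.
                    logging.exception("Failed to cache %s", loc)
        except Exception:
            logging.exception("Redis liveness check failed")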
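
The caching step in main() amounts to "store if absent, with an expiry". The sketch below illustrates that behavior with an in-memory stand-in. FakeRedis, cache_once, and the fake clock are hypothetical; the sketch also assumes the third argument to transaction() is a TTL in seconds, which this document does not state.

```python
from typing import Callable, Dict, Tuple


class FakeRedis:
    """Hypothetical in-memory stand-in for REDIS_CONN with per-key expiry."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock
        self._data: Dict[str, Tuple[bytes, float]] = {}

    def is_alive(self) -> bool:
        return True

    def exist(self, key: str) -> bool:
        entry = self._data.get(key)
        if entry is None:
            return False
        if entry[1] <= self._clock():
            del self._data[key]  # lazily drop expired entries
            return False
        return True

    def transaction(self, key: str, value: bytes, ttl: int) -> None:
        # Assumption: ttl is in seconds, so 12 * 60 means 12 minutes.
        self._data[key] = (value, self._clock() + ttl)


def cache_once(conn: FakeRedis, storage: Dict[Tuple[str, str], bytes],
               kb_id: str, loc: str, ttl: int = 12 * 60) -> bool:
    """Cache one file if absent; return True only when storage was read."""
    key = "{}/{}".format(kb_id, loc)
    if conn.exist(key):
        return False  # already cached; the TTL is not refreshed
    conn.transaction(key, storage[(kb_id, loc)], ttl)
    return True


now = [0.0]  # mutable fake clock
conn = FakeRedis(clock=lambda: now[0])
storage = {("kb1", "a.pdf"): b"%PDF"}

first = cache_once(conn, storage, "kb1", "a.pdf")   # miss: fetched and cached
second = cache_once(conn, storage, "kb1", "a.pdf")  # hit: skipped
now[0] = 12 * 60                                    # the TTL elapses
third = cache_once(conn, storage, "kb1", "a.pdf")   # expired: cached again
```

Since an existing key is skipped rather than refreshed, a file for a long-running task is re-fetched from storage once per TTL window.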

Script Execution Behavior

if __name__ == "__main__":
    while True:
        main()
        close_connection()
        time.sleep(1)
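
The loop above never exits on its own. For tests or a controlled shutdown, one possible variant accepts a stop signal. This is a sketch, not the module's behavior: run_loop, its parameters, and the use of threading.Event are assumptions made for illustration.

```python
import threading
from typing import Callable, List


def run_loop(do_pass: Callable[[], None],
             cleanup: Callable[[], None],
             stop: threading.Event,
             interval: float = 1.0) -> None:
    """Call do_pass, then cleanup, repeatedly until stop is set."""
    while not stop.is_set():
        try:
            do_pass()
        finally:
            cleanup()  # release the connection even if the pass raised
        stop.wait(interval)  # sleeps, but returns early once stop is set


events: List[str] = []
stop = threading.Event()


def fake_pass() -> None:
    events.append("pass")
    if events.count("pass") == 3:
        stop.set()  # request shutdown after the third pass


run_loop(fake_pass, lambda: events.append("close"), stop, interval=0.01)
```

Waiting on the event instead of calling time.sleep() lets a shutdown request take effect without waiting out the full interval.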

Important Implementation Details and Algorithms


Interaction with Other System Components

Overall, this module acts as a bridge between the database task management, persistent storage, and the Redis caching layer, ensuring that data is synchronized and ready for fast access during active processing.


Visual Diagram

classDiagram
    class cache_file_svr {
        +collect()
        +main()
    }

    class TaskService {
        +get_ongoing_doc_name()
    }

    class STORAGE_IMPL {
        +get(kb_id, loc)
    }

    class REDIS_CONN {
        +is_alive()
        +exist(key)
        +transaction(key, data, ttl)
    }

    cache_file_svr ..> TaskService : uses
    cache_file_svr ..> STORAGE_IMPL : uses
    cache_file_svr ..> REDIS_CONN : uses
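
The three collaborators in the diagram can also be written down as structural interfaces, which makes it easier to substitute in-memory fakes in tests. The method names below come from the diagram; the class names, type annotations, and DictStorage are assumptions added for illustration.

```python
from typing import List, Protocol, Tuple, runtime_checkable


@runtime_checkable
class TaskSource(Protocol):
    def get_ongoing_doc_name(self) -> List[Tuple[str, str]]: ...


@runtime_checkable
class Storage(Protocol):
    def get(self, kb_id: str, loc: str) -> bytes: ...


@runtime_checkable
class Cache(Protocol):
    def is_alive(self) -> bool: ...
    def exist(self, key: str) -> bool: ...
    def transaction(self, key: str, data: bytes, ttl: int) -> object: ...


class DictStorage:
    """Hypothetical in-memory storage used only to show the check."""

    def __init__(self) -> None:
        self.files = {("kb1", "a.pdf"): b"data"}

    def get(self, kb_id: str, loc: str) -> bytes:
        return self.files[(kb_id, loc)]


store = DictStorage()
# isinstance() on a runtime-checkable protocol verifies method presence only,
# not argument or return types.
is_storage = isinstance(store, Storage)
is_cache = isinstance(store, Cache)
```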

Summary


Example Usage Scenario

Assuming this service is deployed alongside a document processing pipeline:

  1. A new document task is created and recorded in the database.

  2. cache_file_svr.py detects the ongoing task via TaskService.

  3. It checks the Redis cache; if the file is missing, it fetches the file from storage.

  4. The file is cached in Redis for quick access.

  5. Downstream components retrieve the file from Redis rather than slower storage.

  6. Once the task completes, the service no longer re-caches the file, and the cached copy expires from Redis when its TTL (the 12 * 60 value passed in main()) elapses.

This process reduces latency and improves throughput of document processing workflows.
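
Step 5 implies that a consumer prefers the cache but still needs a fallback when the entry is missing or has expired. How downstream components actually read files is not described here, so the following is only a sketch: load_file, the dict-backed cache and storage, and the trace list are hypothetical.

```python
from typing import Dict, List, Tuple


def load_file(cache: Dict[str, bytes],
              storage: Dict[Tuple[str, str], bytes],
              kb_id: str, loc: str,
              trace: List[str]) -> bytes:
    """Return the file from the cache when present, else from storage."""
    key = "{}/{}".format(kb_id, loc)  # same key layout the service writes
    cached = cache.get(key)
    if cached is not None:
        trace.append("cache")
        return cached
    trace.append("storage")  # entry missing or already expired
    return storage[(kb_id, loc)]


storage = {("kb1", "a.pdf"): b"A", ("kb1", "b.pdf"): b"B"}
cache = {"kb1/a.pdf": b"A"}  # only a.pdf has been cached so far
trace: List[str] = []

a = load_file(cache, storage, "kb1", "a.pdf", trace)
b = load_file(cache, storage, "kb1", "b.pdf", trace)
```

The trace list records which path served each request, which is enough to confirm that the cached file skips storage while the uncached one falls back to it.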

