client.py

Overview

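The client.py file serves as a command-line interface (CLI) client that executes and monitors a data processing pipeline defined in a domain-specific language (DSL) JSON file. It is part of the InfiniFlow project, which appears to manage complex data workflows. Its core functionality is to:

  • Parse command-line arguments that identify the DSL file, the document, and the tenant.
  • Build a Pipeline from the DSL content and reset its state.
  • Run the pipeline with Trio while a background thread prints its logs every 5 seconds.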

This file acts as a lightweight client to start and observe data flows and is primarily intended for direct command-line usage or integration in scripts.


Classes and Functions

print_logs(pipeline: Pipeline) -> None

Continuously fetches and prints logs from the running pipeline every 5 seconds, stopping only when the program ends.

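Parameters

  • pipeline (Pipeline): The pipeline instance whose logs are polled.

Behavior

  • Every 5 seconds, retrieves the pipeline's current logs (the Pipeline class exposes fetch_logs() for this) and prints them.
  • Has no exit condition, so it never returns on its own; it is meant to run in a background thread.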

Usage Example

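# dsl_json_str holds the contents of a DSL JSON file, read as a string
pipeline = Pipeline(dsl_json_str, tenant_id="tenant1", doc_id="doc1", task_id="task123", flow_id="flow123")
pipeline.reset()  # initialize or reset the pipeline state
print_logs(pipeline)  # blocks indefinitely; the CLI runs this in a worker thread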
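The polling loop can be sketched as follows. This is a minimal sketch, not the file's actual code: it assumes the pipeline exposes fetch_logs() (as the class diagram indicates) returning JSON-serializable data, and it adds two hypothetical parameters, stop_event and emit, so the loop can end and be tested. Printing only when the logs change, and one final read after stopping, are also choices of this sketch.

```python
import json
import threading
from typing import Any, Callable, Optional, Protocol


class LogSource(Protocol):
    """Anything with a fetch_logs() method, such as the Pipeline in the class diagram."""

    def fetch_logs(self) -> Any: ...


def _emit_if_changed(pipeline: LogSource, last: Optional[str], emit: Callable[[str], None]) -> str:
    """Serialize the current logs and emit them only if they differ from the last snapshot."""
    current = json.dumps(pipeline.fetch_logs(), ensure_ascii=False)
    if current != last:
        emit(current)
    return current


def print_logs(
    pipeline: LogSource,
    stop_event: threading.Event,  # hypothetical: the documented function has no stop signal
    interval: float = 5.0,  # seconds between polls, matching the documented 5 s
    emit: Callable[[str], None] = print,  # hypothetical: injectable output for testing
) -> None:
    """Poll pipeline.fetch_logs() every `interval` seconds until stop_event is set."""
    last: Optional[str] = None
    # Event.wait() acts as the sleep but returns True as soon as the event is set.
    while not stop_event.wait(interval):
        last = _emit_if_changed(pipeline, last, emit)
    _emit_if_changed(pipeline, last, emit)  # final read so the last logs are not lost
```

Using threading.Event.wait() as the sleep lets the loop wake up immediately when the event is set, instead of finishing a full 5-second sleep.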

Main Script Logic (if __name__ == "__main__":)

This block executes when the script runs as a standalone program.

Steps:

  1. Argument Parsing:

    • Uses argparse to parse three CLI arguments:

      • -s / --dsl (optional): Path to the DSL JSON file describing the pipeline. Defaults to dsl_examples/general_pdf_all.json relative to the script directory.

      • -d / --doc_id (required): Document ID string to identify the document being processed.

      • -t / --tenant_id (required): Tenant ID string to identify the tenant context.

  2. Settings Initialization:

    • Calls settings.init_settings() to load or initialize global application settings.

  3. Pipeline Instantiation:

    • Reads the DSL JSON content from the specified file.

    • Creates a Pipeline object with the DSL content and identifiers (tenant_id, doc_id, task_id, flow_id).

    • Calls pipeline.reset() to initialize or reset the pipeline state.

  4. Log Monitoring Thread:

    • Creates a ThreadPoolExecutor with 5 worker threads, although only one task is submitted to it.

    • Submits print_logs with the pipeline instance as its argument and keeps the returned Future as thr. The function runs in a worker thread, so logs are printed concurrently with pipeline execution.

  5. Pipeline Execution:

    • Calls trio.run(pipeline.run), which runs the asynchronous pipeline.run function to completion on a Trio event loop in the main thread. The call blocks until the pipeline finishes.

  6. Wait for Log Thread Completion:

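    • Calls thr.result() to wait for the log-printing thread. Because print_logs never leaves its loop, this call does not return in practice: after the pipeline finishes, the script keeps polling for logs until the process is interrupted or killed.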
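The argument handling in step 1 can be reproduced with argparse. In this minimal sketch the option names, required flags, and default path come from the list above, while build_parser and its script_dir parameter are hypothetical; in a real script, script_dir could be computed with os.path.dirname(os.path.realpath(__file__)).

```python
import argparse
import os


def build_parser(script_dir: str) -> argparse.ArgumentParser:
    """Recreate the three documented CLI options; script_dir is a hypothetical parameter."""
    parser = argparse.ArgumentParser(description="Run a DSL-defined pipeline and print its logs.")
    parser.add_argument(
        "-s", "--dsl",
        # Default: dsl_examples/general_pdf_all.json next to the script
        default=os.path.join(script_dir, "dsl_examples", "general_pdf_all.json"),
        help="path to the pipeline DSL JSON file",
    )
    parser.add_argument("-d", "--doc_id", required=True, help="ID of the document to process")
    parser.add_argument("-t", "--tenant_id", required=True, help="tenant ID for the run")
    return parser
```

Because -d and -t are declared with required=True, argparse exits with a usage error when either is missing.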


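Important Implementation Details

  • Two concurrency models are combined: the pipeline runs under Trio on the main thread, while log polling runs on a ThreadPoolExecutor worker thread.
  • Logs are polled every 5 seconds rather than streamed, so a log entry can appear up to about 5 seconds after it is produced.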
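The concurrency pattern from steps 4 to 6 can be sketched with the standard library alone. This is an illustrative sketch under several assumptions: StubPipeline and poll_logs are hypothetical; asyncio.run stands in for trio.run so the example needs no third-party package (asyncio.run takes a coroutine object, pipeline.run(), while trio.run takes the async function itself, pipeline.run); and, unlike the documented client, the poller watches a threading.Event so that thr.result() returns once the pipeline is done.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class StubPipeline:
    """Hypothetical stand-in exposing run() and fetch_logs() like the diagram's Pipeline."""

    def __init__(self) -> None:
        self._logs: list[str] = []
        self._lock = threading.Lock()  # run() and the poller thread both touch _logs

    async def run(self) -> None:
        for step in ("parse", "chunk", "embed"):
            await asyncio.sleep(0.01)  # simulated asynchronous work
            with self._lock:
                self._logs.append(step)

    def fetch_logs(self) -> list[str]:
        with self._lock:
            return list(self._logs)  # copy so callers never see later mutations


def poll_logs(pipeline: StubPipeline, stop: threading.Event, interval: float) -> list[str]:
    """Print log snapshots when they change until stop is set; return the final snapshot."""
    last: list[str] = []
    while not stop.wait(interval):
        current = pipeline.fetch_logs()
        if current != last:
            print(current)
            last = current
    return pipeline.fetch_logs()  # one last read after the pipeline has finished


def main() -> list[str]:
    pipeline = StubPipeline()
    stop = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as executor:
        thr = executor.submit(poll_logs, pipeline, stop, 0.005)
        asyncio.run(pipeline.run())  # stand-in for trio.run(pipeline.run); blocks until done
        stop.set()  # without this, thr.result() would wait forever
        return thr.result()


if __name__ == "__main__":
    print("final:", main())
```

Setting the event after the blocking run call is what lets the script exit cleanly; the documented client has no such signal, which is why its thr.result() never returns.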


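Interaction with Other Components

  • Pipeline: Constructed from the DSL string and identifiers, then reset, run, and polled for logs.
  • settings: settings.init_settings() is called before the pipeline is created.
  • trio: Drives the asynchronous pipeline.run on the main thread.
  • concurrent.futures.ThreadPoolExecutor: Hosts the log-polling thread.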


Usage Example

Run the client from the command line:

python client.py -s path/to/workflow.json -d document123 -t tenantABC

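This will:

  • Load the global application settings.
  • Build a pipeline from path/to/workflow.json for document document123 under tenant tenantABC, and reset it.
  • Run the pipeline while a background thread prints its logs every 5 seconds, continuing until the process is stopped.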


Mermaid Class Diagram

The following Mermaid diagram shows the Pipeline class that this file uses and the client's key function, print_logs.

classDiagram
    class Pipeline {
        +__init__(dsl: str, tenant_id: str, doc_id: str, task_id: str, flow_id: str)
        +reset()
        +run()
        +fetch_logs() dict
    }

    class client {
        +print_logs(pipeline: Pipeline) void
    }

    client --> Pipeline : uses
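The Pipeline interface in the diagram can be expressed as a typing.Protocol, which makes it easy to write a test double so client code can run without the real backend. This is a sketch: PipelineLike and FakePipeline are hypothetical names, run() is declared async because trio.run(pipeline.run) requires an async function, and the contents returned by fetch_logs() are invented for illustration.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class PipelineLike(Protocol):
    """Structural type mirroring the Pipeline members shown in the class diagram."""

    def reset(self) -> None: ...

    async def run(self) -> None: ...

    def fetch_logs(self) -> dict[str, Any]: ...


class FakePipeline:
    """Hypothetical in-memory double for exercising client code offline."""

    def __init__(self, dsl: str, tenant_id: str, doc_id: str, task_id: str, flow_id: str) -> None:
        self.dsl = dsl
        self.ids = {"tenant_id": tenant_id, "doc_id": doc_id, "task_id": task_id, "flow_id": flow_id}
        self.reset()

    def reset(self) -> None:
        self._events: list[str] = []  # clear any state from a previous run

    async def run(self) -> None:
        self._events.append("started")
        await asyncio.sleep(0)  # yield to the event loop, as real async work would
        self._events.append("finished")

    def fetch_logs(self) -> dict[str, Any]:
        return {**self.ids, "events": list(self._events)}
```

Because Protocol checks are structural, a real Pipeline whose methods match these signatures would satisfy PipelineLike without inheriting from it.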

Summary

client.py is a CLI tool for launching and observing data processing pipelines defined by JSON DSLs in the InfiniFlow system. It bridges synchronous CLI usage with asynchronous pipeline execution and periodic log polling, combining a Trio event loop on the main thread with a worker thread for logs. It relies on the Pipeline class for all pipeline behavior and offers a minimal way to run DSL-defined workflows from the command line.