client.py
Overview
The client.py file serves as a command-line interface (CLI) client to execute and monitor a data processing pipeline defined via a DSL (Domain Specific Language) JSON file. This file is part of the InfiniFlow project, which appears to manage complex data workflows. The core functionality includes:
Parsing CLI arguments to specify DSL input files and identifiers.
Initializing application settings.
Creating and resetting a Pipeline instance based on the provided DSL.
Asynchronously running the pipeline using the Trio async framework.
Concurrently printing pipeline logs every 5 seconds in a background thread.
This file acts as a lightweight client to start and observe data flows and is primarily intended for direct command-line usage or integration in scripts.
Classes and Functions
print_logs(pipeline: Pipeline) -> None
Continuously fetches and prints logs from the running pipeline every 5 seconds, stopping only when the program ends.
Parameters
pipeline (Pipeline): An instance of the Pipeline class from the rag.flow.pipeline module. This object manages the lifecycle and execution of the data flow.
Behavior
The function keeps track of the last printed logs to avoid redundant printing.
Every 5 seconds, it fetches the latest logs using pipeline.fetch_logs().
If the newly fetched logs differ from the previous logs, it prints the new logs as a JSON string.
Runs indefinitely alongside the asynchronous pipeline execution.
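The polling behavior above can be sketched as follows. This is a minimal illustration, not the client's actual code: FakePipeline, the interval, max_polls and emit parameters, and the shape of the log entries are all assumptions added so the loop can run and terminate on its own. The real print_logs takes only the pipeline and loops forever, and whether it sleeps before or after fetching is not stated in this document.

```python
import json
import time


class FakePipeline:
    """Hypothetical stand-in for rag.flow.pipeline.Pipeline (assumption)."""

    def __init__(self, snapshots):
        self._snapshots = list(snapshots)
        self._index = 0

    def fetch_logs(self):
        # Return the next snapshot, repeating the last one once exhausted.
        snapshot = self._snapshots[min(self._index, len(self._snapshots) - 1)]
        self._index += 1
        return snapshot


def poll_logs(pipeline, interval=5.0, max_polls=None, emit=print):
    """Emit logs as JSON only when they differ from the last emitted logs.

    interval, max_polls and emit are illustrative parameters (assumptions);
    with max_polls=None the loop never ends, like the documented function.
    """
    last_logs = None
    polls = 0
    while max_polls is None or polls < max_polls:
        time.sleep(interval)
        logs = pipeline.fetch_logs()
        if logs != last_logs:
            emit(json.dumps(logs, ensure_ascii=False))
            last_logs = logs
        polls += 1
```

Comparing each fetch against the last emitted value keeps the output quiet while the pipeline is idle, at the cost of reprinting the full log structure whenever any part of it changes.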
Usage Example
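from rag.flow.pipeline import Pipeline
# dsl_json_str holds the DSL JSON text, for example read from a DSL file
pipeline = Pipeline(dsl_json_str, tenant_id="tenant1", doc_id="doc1", task_id="task123", flow_id="flow123")
pipeline.reset()
# print_logs never returns, so in practice it is submitted to a background thread
print_logs(pipeline)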
Main Script Logic (if __name__ == "__main__":)
This block executes when the script runs as a standalone program.
Steps:
Argument Parsing:
Uses argparse to parse three CLI arguments:
-s / --dsl (optional): Path to the DSL JSON file describing the pipeline. Defaults to dsl_examples/general_pdf_all.json relative to the script directory.
-d / --doc_id (required): Document ID string to identify the document being processed.
-t / --tenant_id (required): Tenant ID string to identify the tenant context.
Settings Initialization:
Calls settings.init_settings() to load or initialize global application settings.
Pipeline Instantiation:
Reads the DSL JSON content from the specified file.
Creates a Pipeline object with the DSL content and identifiers (tenant_id, doc_id, task_id, flow_id).
Calls pipeline.reset() to initialize or reset the pipeline state.
Log Monitoring Thread:
Creates a ThreadPoolExecutor with 5 worker threads.
Submits the print_logs function to run in a separate thread, passing the pipeline instance. This allows logs to be printed in parallel with pipeline execution.
Pipeline Execution:
Calls trio.run(pipeline.run) to asynchronously start the pipeline execution using the Trio concurrency framework.
Wait for Log Thread Completion:
Calls thr.result() to block until the log printing thread finishes (though practically it runs indefinitely until the program exits).
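The argument parsing and DSL loading steps above might look roughly like the sketch below. The option names, their required or optional status, and the default path come from the description; the function names build_parser and load_dsl, the script_dir parameter, the description text, and the help strings are assumptions made for illustration.

```python
import argparse
import os


def build_parser(script_dir):
    """Build a parser with the three options described in the steps above."""
    # Default DSL file lives in dsl_examples/ next to the script.
    default_dsl = os.path.join(script_dir, "dsl_examples", "general_pdf_all.json")
    parser = argparse.ArgumentParser(description="Run a pipeline from a DSL JSON file.")
    parser.add_argument("-s", "--dsl", default=default_dsl, help="Path to the DSL JSON file.")
    parser.add_argument("-d", "--doc_id", required=True, help="ID of the document being processed.")
    parser.add_argument("-t", "--tenant_id", required=True, help="ID of the tenant context.")
    return parser


def load_dsl(path):
    """Read the DSL file and return its raw JSON text."""
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


# Parse an explicit argument list instead of sys.argv so the sketch runs anywhere.
args = build_parser(script_dir=".").parse_args(["-d", "document123", "-t", "tenantABC"])
print(args.dsl, args.doc_id, args.tenant_id)
```

In a real script, script_dir would typically be os.path.dirname(os.path.abspath(__file__)) and parse_args() would be called without arguments so that it reads the command line.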
Important Implementation Details
Concurrency Model:
The pipeline execution is handled asynchronously via the Trio framework (trio.run(pipeline.run)), suitable for async I/O-bound workflows. Log printing runs concurrently in a separate thread managed by ThreadPoolExecutor, allowing logs to be fetched without blocking the main async event loop.
Pipeline Logging:
The print_logs function implements a simple polling mechanism with a 5-second delay, fetching logs from the pipeline and printing them only if there are changes. This reduces redundant log output and improves readability.
DSL Input:
The pipeline is configured by a DSL JSON file, allowing flexible definition of data flows. The default example file is located in a dsl_examples directory adjacent to the script.
Task and Flow Identifiers:
The pipeline is instantiated with hardcoded task_id="xxxx" and flow_id="xxx", which might be placeholders. Real use would likely replace these with meaningful IDs.
Commented Out Code:
There is a commented-out call to queue_dataflow, suggesting an alternative or supplementary method to enqueue the workflow for execution, but it is not active in this client.
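The concurrency model described above (a blocking run in the main thread plus a polling worker thread) can be sketched with the standard library alone. This is an illustrative variant, not the client's code: run_with_log_thread and its parameters are hypothetical, and the stop event is an addition of this sketch, since the documented log thread has no stop condition. In the client, run_pipeline would correspond to something like lambda: trio.run(pipeline.run) and fetch_logs to pipeline.fetch_logs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_log_thread(run_pipeline, fetch_logs, interval=5.0, emit=print):
    """Run run_pipeline in the calling thread while a worker thread polls logs.

    All names here are hypothetical. The stop event is an assumption added so
    the worker ends once run_pipeline returns.
    """
    stop = threading.Event()

    def poll():
        last = None
        while True:
            # Sample the flag before fetching: if it was already set, this
            # fetch happens after the run finished and sees the final logs.
            stopping = stop.is_set()
            logs = fetch_logs()
            if logs != last:
                emit(logs)
                last = logs
            if stopping:
                return
            # Sleep up to `interval` seconds, waking early if stop is set.
            stop.wait(interval)

    with ThreadPoolExecutor(max_workers=5) as executor:
        thr = executor.submit(poll)
        try:
            run_pipeline()
        finally:
            stop.set()
        # Returns promptly here because the worker exits after its last fetch.
        thr.result()
```

Signalling the worker with an event lets the process exit on its own after a final log fetch, whereas the documented client keeps polling until it is terminated.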
Interaction with Other Components
rag.flow.pipeline.Pipeline
The core component this client interacts with. Pipeline handles the parsing, orchestration, and execution of the workflow defined by the DSL. This file depends on Pipeline for running the data flow and fetching runtime logs.
api.settings
The settings module is initialized to prepare the environment, configuration, or global parameters before pipeline execution.
dsl_examples/general_pdf_all.json
The default DSL file used to configure the pipeline, located relative to this script.
Concurrency Libraries
Uses trio for async execution and concurrent.futures.ThreadPoolExecutor for running background threads.
Usage Example
Run the client from the command line:
python client.py -s path/to/workflow.json -d document123 -t tenantABC
This will:
Load the DSL from the specified JSON file.
Initialize the pipeline with the given document and tenant IDs.
Start pipeline execution asynchronously.
Continuously fetch and print logs every 5 seconds until the program is terminated. The log thread has no stop condition of its own, so it keeps polling even after the pipeline completes.
Mermaid Class Diagram
The following diagram represents the structure of the main class used in this file (Pipeline) and the key function in this client.
classDiagram
class Pipeline {
+__init__(dsl: str, tenant_id: str, doc_id: str, task_id: str, flow_id: str)
+reset()
+run()
+fetch_logs() dict
}
class client {
+print_logs(pipeline: Pipeline) void
}
client --> Pipeline : uses
Summary
client.py is a CLI tool for launching and observing data processing pipelines defined by JSON DSLs in the InfiniFlow system. It bridges synchronous CLI usage with asynchronous pipeline execution and real-time log monitoring, leveraging both threading and async paradigms. It depends heavily on the Pipeline class and provides a simple yet effective interface to run complex data flow workflows.