dataflow_app.py


Overview

dataflow_app.py is a Flask-based RESTful API module within the InfiniFlow platform that manages dataflow canvases—visual workflows representing data processing pipelines. This file provides endpoints to create, modify, delete, run, and debug dataflow canvases, as well as manage related files, versions, settings, and trace logs.

The module handles user requests for canvases in the DataFlow category. It integrates with the database services, user authentication, and the core pipeline execution engine (Pipeline), so that users can design, run, and maintain data processing workflows, with access checked per canvas.


Detailed Component Descriptions

Flask Routes / Endpoints Summary

| Route | HTTP Method | Purpose |
|---|---|---|
| /templates | GET | List available dataflow canvas templates |
| /list | GET | List canvases owned by the current user |
| /rm | POST | Delete one or more canvases owned by the current user |
| /set | POST | Create or update a canvas with a DSL and metadata |
| /get/<canvas_id> | GET | Retrieve a single canvas by ID |
| /run | POST | Run a dataflow canvas on a specified document |
| /reset | POST | Reset a running dataflow canvas instance |
| /upload/<canvas_id> | POST | Upload files or crawl URLs to attach files to a canvas |
| /input_form | GET | Retrieve the input form schema for a specific component in a canvas |
| /debug | POST | Debug a specific component in a canvas with given inputs |
| /getlistversion/<canvas_id> | GET | List all saved versions of a canvas |
| /getversion/<version_id> | GET | Retrieve a specific version of a canvas |
| /listteam | GET | List canvases accessible by the user's tenant teams |
| /setting | POST | Update canvas metadata and permissions |
| /trace | GET | Fetch execution trace logs for a specific dataflow task |


Imports and Dependencies


Classes and Functions

This file primarily defines Flask route handler functions. There are no class definitions. Below are detailed explanations of key functions.


1. templates()

Route: GET /templates

Description:
Returns a list of all available canvas templates categorized under DataFlow.

Returns:
JSON response containing a list of template dictionaries.

Usage Example:

curl -H "Authorization: Bearer <token>" http://server/manager/templates

2. canvas_list()

Route: GET /list

Description:
Lists all dataflow canvases owned by the current logged-in user, sorted by last update time descending.

Returns:
JSON response with a list of user canvas dictionaries.


3. rm()

Route: POST /rm

Parameters:

Description:
Deletes canvases if the current user owns them.

Returns:
Success or error JSON response.

Validation:
User must be owner of each canvas.
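
The ownership rule can be sketched as follows. This is a minimal illustration, not the module's actual code: the in-memory `CANVASES` store, the `delete_owned_canvases` helper, and the `code`/`data`/`message` response shape are all hypothetical, and rejecting the whole request when any one canvas is not owned is an assumption.

```python
from typing import Dict, List

# Hypothetical in-memory store standing in for the canvas table:
# canvas_id -> owning user_id.
CANVASES: Dict[str, str] = {"c1": "alice", "c2": "alice", "c3": "bob"}


def delete_owned_canvases(canvas_ids: List[str], user_id: str) -> dict:
    """Delete the given canvases only if user_id owns every one of them."""
    # Validate everything first so a failed check leaves the store untouched.
    for canvas_id in canvas_ids:
        if CANVASES.get(canvas_id) != user_id:
            return {
                "code": 1,
                "message": f"Only the owner of canvas {canvas_id} may delete it.",
            }
    for canvas_id in canvas_ids:
        # pop() tolerates duplicate IDs in the request.
        CANVASES.pop(canvas_id, None)
    return {"code": 0, "data": True}
```

Checking all IDs before deleting any keeps the operation all-or-nothing in this sketch; whether the real handler behaves that way is not stated here.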


4. save()

Route: POST /set

Parameters:
JSON request body. The "Creating a new canvas" request under Example Usage sends title and dsl.

Description:
Creates or updates a canvas with provided DSL and metadata. Also manages versioning by saving a version snapshot and pruning old versions.

Returns:
JSON response with saved canvas data or error.
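
The snapshot-and-prune behaviour described above might look roughly like this. It is a sketch under stated assumptions: the retention limit `MAX_VERSIONS`, the in-memory `VERSIONS` table, and the `save_version` helper are hypothetical, and the real limit and storage are not given in this document.

```python
import time
from typing import Dict, List

MAX_VERSIONS = 3  # hypothetical retention limit; the real value is not stated here

# Hypothetical in-memory version table: canvas_id -> snapshots, oldest first.
VERSIONS: Dict[str, List[dict]] = {}


def save_version(canvas_id: str, title: str, dsl: dict) -> None:
    """Append a snapshot of the DSL, then drop snapshots beyond the limit."""
    snapshots = VERSIONS.setdefault(canvas_id, [])
    snapshots.append({"title": title, "dsl": dsl, "saved_at": time.time()})
    # Prune: keep only the newest MAX_VERSIONS entries.
    del snapshots[:-MAX_VERSIONS]


# Saving five times leaves only the three most recent snapshots.
for i in range(5):
    save_version("canvas-uuid", f"v{i}", {"revision": i})
```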

Important Detail:


5. get(canvas_id)

Route: GET /get/<canvas_id>

Description:
Fetches the canvas details if accessible by current user.

Returns:
Canvas data or error if not found or unauthorized.


6. run()

Route: POST /run

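Parameters:
JSON request body. The "Running a dataflow on a document" request under Example Usage sends id (the canvas ID) and doc_id (the document ID).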

Description:
Queues the requested dataflow canvas for execution against a specified document.

Returns:
Task ID and flow ID on success.
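
To illustrate the "queue now, execute later" pattern, here is a minimal sketch. The in-process `TASK_QUEUE`, the `enqueue_run` helper, and the UUID-based task ID are assumptions made for illustration; the real handler's queueing mechanism is not described in this document.

```python
import queue
import uuid

# Hypothetical in-process queue standing in for the real task queue.
TASK_QUEUE = queue.Queue()


def enqueue_run(flow_id: str, doc_id: str) -> dict:
    """Queue a run of canvas flow_id against doc_id and return both IDs."""
    if not flow_id or not doc_id:
        raise ValueError("both a canvas ID and a document ID are required")
    task_id = uuid.uuid4().hex  # assumed ID format
    TASK_QUEUE.put({"task_id": task_id, "flow_id": flow_id, "doc_id": doc_id})
    # The caller gets the IDs immediately; execution happens off the request path.
    return {"task_id": task_id, "flow_id": flow_id}


result = enqueue_run("canvas-uuid", "document-uuid")
```

Returning the task ID right away lets a client follow progress later, for example through the trace endpoint.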

Validation:


7. reset()

Route: POST /reset

Parameters:

Description:
Resets the pipeline state of the specified dataflow canvas.

Implementation Detail:


8. upload(canvas_id)

Route: POST /upload/<canvas_id>

Parameters:

Description:
Uploads a file or crawls a URL to generate a file blob attached to a canvas.

Implementation Details:


9. input_form()

Route: GET /input_form

Parameters:

Description:
Returns the input form schema for a given component inside a canvas, facilitating UI form generation.


10. debug()

Route: POST /debug

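Parameters:
JSON request body. The "Debugging a component" request under Example Usage sends id (the canvas ID), component_id, and params (a mapping of input names to objects with a value field).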

Description:
Executes a single component within the canvas in isolation using provided inputs for debugging.
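
A rough sketch of isolated component execution follows. The method names `reset`, `set_debug_inputs`, `invoke`, and `output` come from the class diagram later in this document; the order in which they are called, the unwrapping of `{"value": ...}` inputs, and the `EchoComponent` stub are assumptions made for illustration.

```python
class EchoComponent:
    """Hypothetical stand-in for a canvas component exposing the four methods used."""

    def __init__(self) -> None:
        self._inputs: dict = {}
        self._output: dict = {}

    def reset(self) -> None:
        self._inputs, self._output = {}, {}

    def set_debug_inputs(self, params: dict) -> None:
        self._inputs = params

    def invoke(self, **kwargs) -> None:
        # Toy behaviour: upper-case every input value.
        self._output = {name: str(value).upper() for name, value in kwargs.items()}

    def output(self) -> dict:
        return self._output


def debug_component(component, params: dict) -> dict:
    """Run one component in isolation with caller-supplied inputs."""
    component.reset()                   # clear state left by earlier runs
    component.set_debug_inputs(params)  # record the debug inputs
    # Unwrap {"name": {"value": ...}} into plain keyword arguments.
    kwargs = {name: spec.get("value") for name, spec in params.items()}
    component.invoke(**kwargs)
    return component.output()


out = debug_component(EchoComponent(), {"input1": {"value": "test input"}})
```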

Implementation Details:


11. getlistversion(canvas_id)

Route: GET /getlistversion/<canvas_id>

Description:
Lists all historical saved versions of a canvas, sorted by update time descending.
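
The ordering can be expressed in a few lines. This is only a sketch: the `update_time` field name and the integer timestamps are assumptions, and `newest_first` is a hypothetical helper rather than a function from the module.

```python
from operator import itemgetter
from typing import List


def newest_first(versions: List[dict]) -> List[dict]:
    """Return version records ordered by update time, most recent first."""
    # "update_time" is an assumed field name holding a sortable timestamp.
    return sorted(versions, key=itemgetter("update_time"), reverse=True)


versions = [
    {"id": "v1", "update_time": 1700000000},
    {"id": "v3", "update_time": 1700000200},
    {"id": "v2", "update_time": 1700000100},
]
ordered_ids = [v["id"] for v in newest_first(versions)]
```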


12. getversion(version_id)

Route: GET /getversion/<version_id>

Description:
Fetches a specific version snapshot of a canvas.


13. list_canvas()

Route: GET /listteam

Parameters (optional):

Description:
Lists all canvases accessible to the current user through tenant teams.


14. setting()

Route: POST /setting

Parameters:

Description:
Updates metadata and permissions for a canvas.


15. trace()

Route: GET /trace

Parameters:

Description:
Retrieves execution logs and trace data from a running or completed pipeline task.


Important Implementation Details


Interactions with Other System Components


Example Usage

Creating a new canvas

POST /set
Content-Type: application/json
Authorization: Bearer <token>

{
  "title": "My Data Flow",
  "dsl": {
    "nodes": [...],
    "edges": [...]
  }
}

Running a dataflow on a document

POST /run
Content-Type: application/json
Authorization: Bearer <token>

{
  "id": "canvas-uuid",
  "doc_id": "document-uuid"
}

Debugging a component

POST /debug
Content-Type: application/json
Authorization: Bearer <token>

{
  "id": "canvas-uuid",
  "component_id": "component-uuid",
  "params": {
    "input1": {"value": "test input"}
  }
}
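
Calling the endpoints from Python

The raw requests above can also be built from a script. The sketch below uses only the standard library and constructs the request without sending it. `BASE_URL` is a placeholder, the URL prefix under which the routes are mounted depends on the deployment, and `build_post` is a hypothetical helper.

```python
import json
import urllib.request

BASE_URL = "http://server"  # placeholder host; the route prefix is deployment-specific


def build_post(path: str, token: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) an authenticated JSON POST request."""
    return urllib.request.Request(
        url=BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


run_request = build_post(
    "/run", "<token>", {"id": "canvas-uuid", "doc_id": "document-uuid"}
)
# To send it against a live server: urllib.request.urlopen(run_request)
```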

Mermaid Class Diagram

The following diagram represents the key classes instantiated or referenced in this file related to canvas and pipeline management, along with their main methods used:

classDiagram
    class Canvas {
        +__init__(dsl: str, user_id: str)
        +reset()
        +get_component(component_id: str)
        +message_id: str
    }
    class LLM {
        +reset()
        +set_debug_inputs(params: dict)
        +invoke(**kwargs)
        +output() dict
    }
    class Pipeline {
        +__init__(dsl: str, tenant_id: str, flow_id: str, task_id: str)
        +reset()
        +fetch_logs() list
        +get_component_input_form(component_id: str) dict
        +__str__() str
    }
    class UserCanvasService {
        +query(...)
        +save(...)
        +update_by_id(id: str, data: dict) int
        +delete_by_id(id: str)
        +get_by_id(id: str)
        +accessible(id: str, user_id: str) bool
        +get_by_tenant_id(tenant_id: str)
    }
    class UserCanvasVersionService {
        +insert(user_canvas_id: str, dsl: dict, title: str)
        +delete_all_versions(user_canvas_id: str)
        +list_by_canvas_id(canvas_id: str)
        +get_by_id(version_id: str)
    }
    Canvas --> LLM : "contains components like"
    Canvas --> Pipeline : "integrated with"
    UserCanvasService ..> Canvas : "manages canvas data"
    UserCanvasVersionService ..> Canvas : "handles version snapshots"

Summary

dataflow_app.py is the API controller for dataflow canvases within the InfiniFlow platform. It connects user requests to the database services, the pipeline engine, file storage, and asynchronous crawling, covering the full lifecycle of visual data processing workflows. It enforces access controls and supports versioning, debugging, and traceability, which makes it a central backend component for interactive dataflow management.

