dataflow_app.py
Overview
dataflow_app.py is a Flask-based RESTful API module within the InfiniFlow platform that manages dataflow canvases—visual workflows representing data processing pipelines. This file provides endpoints to create, modify, delete, run, and debug dataflow canvases, as well as manage related files, versions, settings, and trace logs.
The module serves as the backend logic for requests related to DataFlow-category canvases, integrating with database services, user authentication, and the core pipeline execution engine (Pipeline). It lets users design, execute, debug, and version data processing workflows, with ownership and team-permission checks guarding access to each canvas.
Detailed Component Descriptions
Flask Routes / Endpoints Summary
| Route | HTTP Method | Purpose |
|---|---|---|
| /templates | GET | List available dataflow canvas templates |
| /list | GET | List canvases owned by the current user |
| /rm | POST | Delete one or more canvases owned by the current user |
| /set | POST | Create or update a canvas with a DSL and metadata |
| /get/<canvas_id> | GET | Retrieve a single canvas by ID |
| /run | POST | Run a dataflow canvas on a specified document |
| /reset | POST | Reset a running dataflow canvas instance |
| /upload/<canvas_id> | POST | Upload files or crawl URLs to attach files to a canvas |
| /input_form | GET | Retrieve the input form schema for a specific component in a canvas |
| /debug | POST | Debug a specific component in a canvas with given inputs |
| /getlistversion/<canvas_id> | GET | List all saved versions of a canvas |
| /getversion/<version_id> | GET | Retrieve a specific version of a canvas |
| /listteam | GET | List canvases accessible by the user's tenant teams |
| /setting | POST | Update canvas metadata and permissions |
| /trace | GET | Fetch execution trace logs for a specific dataflow task |
Imports and Dependencies
Standard Libraries:
json, re, sys, time, functools.partial
Third-party:
trio (async library), Flask (request handling), Flask-Login (user authentication)
Project Modules:
agent.canvas.Canvas and agent.component.LLM: core canvas and component abstractions
Database service layers for canvases, documents, files, tasks, user versions, and tenants
Utilities for request validation, response formatting, UUID generation, and file handling
rag.flow.pipeline.Pipeline: main engine to execute and manage dataflow DSLs
Classes and Functions
This file primarily defines Flask route handler functions. There are no class definitions. Below are detailed explanations of key functions.
1. templates()
Route: GET /templates
Description:
Returns a list of all available canvas templates categorized under DataFlow.
Returns:
JSON response containing a list of template dictionaries.
Usage Example:
curl -H "Authorization: Bearer <token>" http://server/manager/templates
2. canvas_list()
Route: GET /list
Description:
Lists all dataflow canvases owned by the current logged-in user, sorted by last update time descending.
Returns:
JSON response with a list of user canvas dictionaries.
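A minimal sketch of the filtering and ordering described above. It assumes each canvas is a plain dict with hypothetical user_id, canvas_category, and update_time fields, and that the category value is the string "DataFlow"; in the module itself this query goes through the database service layer rather than an in-memory list.

```python
def list_user_dataflows(canvases: list, user_id: str) -> list:
    """Return the user's DataFlow canvases, most recently updated first."""
    # Field names and the "DataFlow" category value are hypothetical.
    owned = [
        c for c in canvases
        if c["user_id"] == user_id and c["canvas_category"] == "DataFlow"
    ]
    # reverse=True gives descending order: newest update_time first.
    return sorted(owned, key=lambda c: c["update_time"], reverse=True)
```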
3. rm()
Route: POST /rm
Parameters:
canvas_ids: List of canvas IDs to delete.
Description:
Deletes canvases if the current user owns them.
Returns:
Success or error JSON response.
Validation:
The current user must own every canvas listed in canvas_ids.
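The ownership rule can be sketched with an in-memory dict standing in for the canvas table. The response shape ({"code", "data", "message"}) and the 403 code are hypothetical. This sketch validates every ID before deleting any of them, a design choice that avoids partially applied deletes; the module itself may check and delete one ID at a time.

```python
def remove_canvases(store: dict, canvas_ids: list, user_id: str) -> dict:
    """Delete canvases only if user_id owns every one of them."""
    # Validate all IDs first so one unauthorized ID aborts the whole request.
    for cid in canvas_ids:
        canvas = store.get(cid)
        if canvas is None or canvas["user_id"] != user_id:
            return {"code": 403, "data": False,
                    "message": f"Only the owner of canvas {cid} may delete it."}
    for cid in canvas_ids:
        # pop() tolerates duplicate IDs in the request.
        store.pop(cid, None)
    return {"code": 0, "data": True}
```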
4. save()
Route: POST /set
Parameters:
dsl (str or dict): The canvas domain-specific language describing the dataflow.
title (str): Canvas title.
Optional:
id: If present, updates an existing canvas; otherwise, creates a new one.
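Because dsl may arrive either as a JSON string or as an already-parsed object, a handler has to normalize it before storing it. A minimal sketch of that step; the helper name normalize_dsl is hypothetical and not taken from the module.

```python
import json
from typing import Any


def normalize_dsl(dsl: Any) -> dict:
    """Accept the DSL as a JSON string or a dict and always return a dict."""
    if isinstance(dsl, str):
        parsed = json.loads(dsl)  # raises json.JSONDecodeError (a ValueError) on bad JSON
    elif isinstance(dsl, dict):
        parsed = dsl
    else:
        raise TypeError(f"dsl must be str or dict, got {type(dsl).__name__}")
    # A canvas DSL describes components and connections, so it must be a JSON object.
    if not isinstance(parsed, dict):
        raise ValueError("dsl must decode to a JSON object")
    return parsed
```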
Description:
Creates or updates a canvas with provided DSL and metadata. Also manages versioning by saving a version snapshot and pruning old versions.
Returns:
JSON response with saved canvas data or error.
Important Detail:
Ensures canvas title uniqueness per user.
Version snapshots are named with the pattern title_YYYY_MM_DD_HH_MM_SS.
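The title check, version naming, and pruning described above can be sketched with plain lists. The retention limit keep=20 is an assumption (the source says old versions are pruned but not how many are kept), and all three helper names are hypothetical.

```python
import time
from typing import Optional


def title_taken(canvases: list, user_id: str, title: str) -> bool:
    """True if this user already owns a canvas with the same title."""
    return any(c["user_id"] == user_id and c["title"] == title for c in canvases)


def version_title(title: str, ts: Optional[float] = None) -> str:
    """Build title_YYYY_MM_DD_HH_MM_SS from local time; ts=None means now."""
    stamp = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime(ts))
    return f"{title}_{stamp}"


def prune_versions(versions: list, keep: int = 20) -> list:
    """Keep only the newest `keep` snapshots (keep=20 is an assumed limit)."""
    newest_first = sorted(versions, key=lambda v: v["update_time"], reverse=True)
    return newest_first[:keep]
```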
5. get(canvas_id)
Route: GET /get/<canvas_id>
Description:
Fetches the canvas details if accessible by current user.
Returns:
Canvas data or error if not found or unauthorized.
6. run()
Route: POST /run
Parameters:
id: Canvas ID to run.
doc_id: Document ID to process.
Optional:
user_id: Defaults to the current user.
Description:
Queues the requested dataflow canvas for execution against a specified document.
Returns:
Task ID and flow ID on success.
Validation:
Canvas ownership check.
Document existence check.
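The two validation steps followed by queuing can be sketched as below. In-memory dicts stand in for the canvas and document services, and queue.Queue stands in for the real task queue. Returning the canvas ID as flow_id and the error codes are assumptions; callers would pass the current user's ID as user_id when none is supplied.

```python
import queue
import uuid


def queue_run(canvases: dict, documents: dict, tasks: queue.Queue,
              canvas_id: str, doc_id: str, user_id: str) -> dict:
    """Validate a /run request and enqueue a pipeline task (hypothetical sketch)."""
    canvas = canvases.get(canvas_id)
    # Ownership check: only the canvas owner may run it.
    if canvas is None or canvas["user_id"] != user_id:
        return {"code": 403, "message": "Only the owner of the canvas may run it."}
    # Document existence check.
    if doc_id not in documents:
        return {"code": 404, "message": f"Document {doc_id} not found."}
    task_id = uuid.uuid4().hex
    tasks.put({"task_id": task_id, "flow_id": canvas_id, "doc_id": doc_id})
    return {"code": 0, "data": {"task_id": task_id, "flow_id": canvas_id}}
```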
7. reset()
Route: POST /reset
Parameters:
id: Canvas ID.
Optional:
task_id: Task ID to reset.
Description:
Resets the pipeline state of the specified dataflow canvas.
Implementation Detail:
Calls Pipeline.reset(), then updates the stored canvas DSL with the reset state.
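The reset-then-persist pattern can be sketched with a stand-in class. SketchPipeline is hypothetical, and the "output" and "history" keys it clears are assumptions about what per-run state looks like. The class diagram in this document lists a __str__() method on Pipeline, which suggests the reset DSL is serialized back to a string this way before being saved.

```python
import json


class SketchPipeline:
    """Hypothetical stand-in for rag.flow.pipeline.Pipeline, for illustration only."""

    def __init__(self, dsl: str):
        self.dsl = json.loads(dsl)

    def reset(self) -> None:
        # Clear per-run state; the "output" and "history" keys are assumed.
        for component in self.dsl.get("components", {}).values():
            component.pop("output", None)
        self.dsl["history"] = []

    def __str__(self) -> str:
        # Serialize back to a DSL string for storage.
        return json.dumps(self.dsl, ensure_ascii=False)


def reset_canvas(store: dict, canvas_id: str) -> None:
    """Reset a stored canvas DSL and write the result back."""
    pipeline = SketchPipeline(store[canvas_id])
    pipeline.reset()
    store[canvas_id] = str(pipeline)  # persist the reset DSL
```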
8. upload(canvas_id)
Route: POST /upload/<canvas_id>
Parameters:
A file upload, or a url query parameter.
Description:
Uploads a file or crawls a URL to generate a file blob attached to a canvas.
Implementation Details:
Supports PDF repair for potentially broken files.
When url is provided, uses an async crawler (crawl4ai) to fetch web content and convert it to Markdown or PDF.
Stores the file blob under a UUID-based location.
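The UUID-based storage step can be sketched with a dict standing in for FileService and the underlying blob store. Both helpers are hypothetical, and the naming scheme for crawled pages is an assumption; the source does not document how the module names them.

```python
import uuid
from urllib.parse import urlparse


def filename_for_url(url: str, fmt: str = "md") -> str:
    """Derive a display name for crawled content, e.g. 'example.com.md'."""
    host = urlparse(url).netloc or "page"
    return f"{host}.{fmt}"


def store_blob(storage: dict, data: bytes, filename: str) -> dict:
    """Save bytes under a random UUID key and return a metadata record."""
    # A UUID location avoids collisions when two uploads share a filename.
    location = uuid.uuid4().hex
    storage[location] = data
    return {"name": filename, "location": location, "size": len(data)}
```

Keying blobs by UUID rather than by filename means the display name can be changed or duplicated freely without touching the stored bytes.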
9. input_form()
Route: GET /input_form
Parameters:
id: Canvas ID.
component_id: Component ID inside the canvas.
Description:
Returns the input form schema for a given component inside a canvas, facilitating UI form generation.
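A sketch of the lookup, assuming a hypothetical DSL layout of components → <id> → obj → params → inputs. The module obtains the form through Pipeline.get_component_input_form(), whose internals are not documented here, so treat this only as an illustration of reading a per-component schema out of the DSL.

```python
def get_input_form(dsl: dict, component_id: str) -> dict:
    """Return the input-form schema of one component (assumed DSL layout)."""
    components = dsl.get("components", {})
    if component_id not in components:
        raise KeyError(f"component {component_id!r} not found")
    params = components[component_id].get("obj", {}).get("params", {})
    # An empty dict means the component declares no inputs.
    return params.get("inputs", {})
```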
10. debug()
Route: POST /debug
Parameters:
id: Canvas ID.
component_id: Component to debug.
params: Input parameters for the component.
Description:
Executes a single component within the canvas in isolation using provided inputs for debugging.
Implementation Details:
Resets canvas and component state before invocation.
Special handling for LLM components to set debug inputs.
Returns the component output, resolving any partial outputs into strings.
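The partial-output resolution can be sketched as follows. The assumption, not stated in the source, is that a streaming component stores a functools.partial whose call returns a generator of text chunks; stream_answer is a hypothetical producer used only to exercise the resolver.

```python
from functools import partial
from typing import Iterator


def stream_answer(prefix: str, chunks: list) -> Iterator[str]:
    """Hypothetical streaming producer: yields output chunk by chunk."""
    for chunk in chunks:
        yield f"{prefix}{chunk}"


def resolve_outputs(outputs: dict) -> dict:
    """Replace partial (streaming) outputs with their fully joined text."""
    resolved = {}
    for key, value in outputs.items():
        if isinstance(value, partial):
            # Calling the partial yields text chunks; join them into one string.
            resolved[key] = "".join(value())
        else:
            resolved[key] = value
    return resolved
```

For example, resolve_outputs({"content": partial(stream_answer, "", ["Hel", "lo"])}) gives {"content": "Hello"}, so the debug response carries plain text instead of an unevaluated callable.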
11. getlistversion(canvas_id)
Route: GET /getlistversion/<canvas_id>
Description:
Lists all historical saved versions of a canvas, sorted by update time descending.
12. getversion(version_id)
Route: GET /getversion/<version_id>
Description:
Fetches a specific version snapshot of a canvas.
13. list_canvas()
Route: GET /listteam
Parameters (optional):
keywords (str): Search keywords.
page, page_size (int): Pagination controls.
orderby (str), desc (bool): Sorting controls.
Description:
Lists all canvases accessible to the current user through tenant teams.
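A sketch of how the optional parameters could combine, using in-memory rows. The defaults (page=1, page_size=30, orderby="update_time", desc=True), 1-based page numbers, case-insensitive title matching, and the tenant_id field are all assumptions rather than documented behavior.

```python
def list_team_canvases(canvases: list, tenant_ids: set, keywords: str = "",
                       page: int = 1, page_size: int = 30,
                       orderby: str = "update_time", desc: bool = True) -> dict:
    """Filter, sort, and paginate canvases visible through the user's tenants."""
    visible = [c for c in canvases if c["tenant_id"] in tenant_ids]
    if keywords:
        # Case-insensitive substring match on the title.
        needle = keywords.lower()
        visible = [c for c in visible if needle in c["title"].lower()]
    visible.sort(key=lambda c: c[orderby], reverse=desc)
    start = (page - 1) * page_size  # pages are 1-based
    return {"canvas": visible[start:start + page_size], "total": len(visible)}
```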
14. setting()
Route: POST /setting
Parameters:
id: Canvas ID.
title: New title.
permission: Permission setting.
Optional:
description, avatar
Description:
Updates metadata and permissions for a canvas.
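A minimal sketch of validating a /setting body and building the update, following the required and optional fields listed above. Treating empty values as missing is a design choice of this sketch, and the permission value "team" used in the example is hypothetical.

```python
REQUIRED_FIELDS = ("id", "title", "permission")
OPTIONAL_FIELDS = ("description", "avatar")


def build_setting_update(req: dict) -> dict:
    """Validate a /setting request body and return the fields to update."""
    missing = [k for k in REQUIRED_FIELDS if not req.get(k)]
    if missing:
        raise ValueError(f"missing required fields: {', '.join(missing)}")
    # The canvas ID selects the row; it is not itself an updated column.
    update = {"title": req["title"], "permission": req["permission"]}
    for key in OPTIONAL_FIELDS:
        if key in req:
            update[key] = req[key]
    return update
```

For example, build_setting_update({"id": "c1", "title": "ETL", "permission": "team", "avatar": "a.png"}) returns {"title": "ETL", "permission": "team", "avatar": "a.png"}.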
15. trace()
Route: GET /trace
Parameters:
dataflow_id: Canvas ID.
task_id: Execution task ID.
Description:
Retrieves execution logs and trace data from a running or completed pipeline task.
Important Implementation Details
Access Control: All modifying routes enforce ownership checks using UserCanvasService.accessible().
DSL Handling: Canvas workflows are stored as JSON DSLs describing components and connections.
Versioning: Each save operation creates a version snapshot stored separately, enabling history and rollback.
Async File Crawl: Uses trio and crawl4ai to crawl web pages asynchronously for content ingestion.
Pipeline Integration: Flow execution and reset use the rag.flow.pipeline.Pipeline class.
Error Handling: Errors are returned as uniform JSON responses via utility functions (get_data_error_result, server_error_response).
Partial Outputs: In debugging, partial function outputs are fully evaluated into strings for clarity.
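The Async File Crawl item implies a bridge from a synchronous Flask handler into async code. The sketch below shows that pattern with the standard library's asyncio as a stand-in for trio (with trio, the equivalent entry point is trio.run(async_fn, url)). fake_crawl is a hypothetical placeholder and does not reproduce crawl4ai's API.

```python
import asyncio


async def fake_crawl(url: str) -> str:
    """Hypothetical stand-in for an async crawler call returning Markdown."""
    await asyncio.sleep(0)  # yield to the event loop, as real network I/O would
    return f"# Content of {url}\n"


def crawl_sync(url: str) -> str:
    """Run the async crawl to completion from synchronous (Flask) code."""
    # asyncio.run starts a fresh event loop, runs the coroutine, then closes the loop.
    return asyncio.run(fake_crawl(url))
```

Starting a short-lived event loop per request keeps the handler synchronous, at the cost of blocking the worker until the crawl finishes; asyncio.run also refuses to run inside an already-running loop, so this pattern only suits plain synchronous handlers.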
Interactions with Other System Components
Database Services: Interfaces with multiple database service layers for canvases, documents, files, tasks, versions, and tenants.
Authentication: Uses flask_login to enforce user identity and authorization.
Execution Engine: Integrates with the Pipeline engine for running, resetting, and tracing dataflows.
File Storage: Uses FileService to store and manage uploaded blobs.
Crawler: Integrates with an external async crawler (crawl4ai) to download and convert web content.
Agent Components: Uses Canvas and LLM from the agent package to represent and debug component logic.
Example Usage
Creating a new canvas
POST /set
Content-Type: application/json
Authorization: Bearer <token>
{
"title": "My Data Flow",
"dsl": {
"nodes": [...],
"edges": [...]
}
}
Running a dataflow on a document
POST /run
Content-Type: application/json
Authorization: Bearer <token>
{
"id": "canvas-uuid",
"doc_id": "document-uuid"
}
Debugging a component
POST /debug
Content-Type: application/json
Authorization: Bearer <token>
{
"id": "canvas-uuid",
"component_id": "component-uuid",
"params": {
"input1": {"value": "test input"}
}
}
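For scripted access, requests like the ones above can be built with the standard library. This is a sketch under assumptions: BASE_URL reuses the placeholder host and prefix from the curl example, and the Bearer scheme follows the examples in this document; check both against your deployment. build_post is a hypothetical helper.

```python
import json
import urllib.request

# Placeholder base URL copied from the curl example; replace for a real deployment.
BASE_URL = "http://server/manager"


def build_post(path: str, payload: dict, token: str) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for a dataflow endpoint."""
    return urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


run_req = build_post("/run", {"id": "canvas-uuid", "doc_id": "document-uuid"}, "<token>")
# Sending is left to the caller, e.g. urllib.request.urlopen(run_req).
```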
Mermaid Class Diagram
The following diagram represents the key classes instantiated or referenced in this file related to canvas and pipeline management, along with their main methods used:
classDiagram
class Canvas {
+__init__(dsl: str, user_id: str)
+reset()
+get_component(component_id: str)
+message_id: str
}
class LLM {
+reset()
+set_debug_inputs(params: dict)
+invoke(**kwargs)
+output() dict
}
class Pipeline {
+__init__(dsl: str, tenant_id: str, flow_id: str, task_id: str)
+reset()
+fetch_logs() list
+get_component_input_form(component_id: str) dict
+__str__() str
}
class UserCanvasService {
+query(...)
+save(...)
+update_by_id(id: str, data: dict) int
+delete_by_id(id: str)
+get_by_id(id: str)
+accessible(id: str, user_id: str) bool
+get_by_tenant_id(tenant_id: str)
}
class UserCanvasVersionService {
+insert(user_canvas_id: str, dsl: dict, title: str)
+delete_all_versions(user_canvas_id: str)
+list_by_canvas_id(canvas_id: str)
+get_by_id(version_id: str)
}
Canvas --> LLM : "contains components like"
Canvas --> Pipeline : "integrated with"
UserCanvasService ..> Canvas : "manages canvas data"
UserCanvasVersionService ..> Canvas : "handles version snapshots"
Summary
dataflow_app.py is the API controller for the full lifecycle of dataflow canvases within the InfiniFlow platform. It connects user requests to database services, the pipeline engine, file storage, and asynchronous crawling, covering creation, execution, debugging, versioning, and trace retrieval. Ownership and permission checks on canvas access make it a central backend component for interactive dataflow management.