canvas_service.py
Overview
This file, canvas_service.py, is a core service layer module of the InfiniFlow platform responsible for managing canvas templates, user canvases (agents), and orchestrating conversational AI completions based on those canvases. It acts as an intermediary between the database models representing canvases and the runtime logic that executes conversations using those canvases.
Key responsibilities include:
CRUD-like querying and permission checks for user canvases.
Managing canvas templates (including aliasing DataFlow templates).
Handling conversational sessions, including streaming responses compatible with OpenAI style completions.
Integrating with the
Canvasruntime engine to process queries and generate answers.Tracking and persisting conversation history and metadata.
This module tightly integrates database services, the Canvas execution engine, and OpenAI-compatible response formatting to facilitate multi-tenant, session-based AI agents.
Classes and Functions
Classes
1. CanvasTemplateService(CommonService)
Purpose: Service layer for accessing
CanvasTemplatedatabase model.Attributes:
model = CanvasTemplate — Defines the DB model this service operates on.
Description: Provides generic common service methods inherited from
CommonServicetailored for canvas templates.
2. DataFlowTemplateService(CommonService)
Purpose: Alias of
CanvasTemplateService.Attributes:
Description: Exists for semantic clarity, indicating DataFlow templates are stored similarly to canvas templates.
3. UserCanvasService(CommonService)
Purpose: Service layer for managing
UserCanvasentities representing user-created AI agents.Attributes:
model = UserCanvas
Methods:
get_list(cls, tenant_id, page_number, items_per_page, orderby, desc, id, title, canvas_category=CanvasCategory.Agent)
Description: Retrieves a paginated list of user canvases for a tenant with optional filters.
Parameters:
tenant_id(int): Tenant/user ID.page_number(int): Pagination page number.items_per_page(int): Number of items per page.orderby(str): Field name to order by.desc(bool): Whether to order descending.id(Optional[int]): Filter by specific canvas ID.title(Optional[str]): Filter by canvas title.canvas_category(CanvasCategory): Category filter, defaultAgent.
Returns: List of dictionaries representing canvases matching criteria.
get_by_tenant_id(cls, pid)
Description: Retrieves a single user canvas by its ID, joined with user metadata.
Parameters:
pid(int): Canvas ID.
Returns: Tuple (bool, dict|None) indicating success and the canvas data dictionary if found.
get_by_tenant_ids(cls, joined_tenant_ids, user_id, page_number, items_per_page, orderby, desc, keywords, canvas_category=CanvasCategory.Agent)
Description: Retrieves canvases accessible by a user, considering tenant permissions and optional keyword filtering.
Parameters:
joined_tenant_ids(List[int]): Tenant IDs the user has joined.user_id(int): Current user ID.Pagination and ordering params.
keywords(str): Optional keyword filter on title.canvas_category(CanvasCategory): Canvas category filter.
Returns: Tuple
(List[dict], int)— list of canvases and total count.
accessible(cls, canvas_id, tenant_id)
Description: Checks if a tenant has permission to access a given user canvas.
Parameters:
canvas_id(int): Canvas ID.tenant_id(int): Tenant/user ID.
Returns:
boolindicating access permission.
Functions
completion(tenant_id, agent_id, session_id=None, **kwargs)
Purpose: Core generator function that runs a conversational session on a canvas agent, yielding incremental results (streaming).
Parameters:
tenant_id(int): Tenant ID owning the agent.agent_id(int): Agent (canvas) ID.session_id(str, optional): Conversation session ID; if not provided, a new session is created.**kwargs:queryorquestion(str): User query string.files(list): Attached files relevant to the query.inputs(dict): Additional input parameters.user_id(str): User identifier.
Returns: Generator yielding JSON-encoded event messages to be consumed by the client, including partial messages and final completion.
Implementation details:
If
session_idis provided, loads existing conversation; else creates a new conversation record.Uses the
Canvasengine to run the query and generate answers.Appends messages to conversation history.
Yields data in a server-sent events (SSE) compatible format (
data: <json>\n\n).
Usage Example:
for event in completion(tenant_id=1, agent_id=42, query="Hello AI!", user_id="user123"):
print(event) # Streamed response chunks
completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True, **kwargs)
Purpose: Wrapper around
completionto provide OpenAI API-compatible streaming or non-streaming completions.Parameters:
tenant_id,agent_id,session_id: Same ascompletion.question(str): The prompt/question to ask the agent.stream(bool): Whether to stream responses (default True).**kwargs: Additional parameters forwarded tocompletion.
Returns: Yields either:
Streaming OpenAI format events (
data: {...}\n\n),Or a single OpenAI-formatted response dict if
stream=False.
Implementation details:
Uses
tiktokento tokenize and track prompt and completion token counts.Catches and logs exceptions, returning error messages formatted for OpenAI clients.
For streaming, converts incremental
completionoutputs into OpenAI delta message format.For non-streaming, aggregates full response before yielding.
Integration:
Calls
get_data_openaiutility to translate internal events to OpenAI response format.
Usage Example:
# Streaming usage
for event in completionOpenAI(tenant_id=1, agent_id=42, question="Explain AI?", stream=True):
print(event)
# Non-streaming usage
response = next(completionOpenAI(tenant_id=1, agent_id=42, question="Explain AI?", stream=False))
print(response)
Important Implementation Details
Database Integration: Uses Peewee ORM with context managers (
@DB.connection_context()) for safe DB transactional access.Canvas Engine: Relies heavily on
Canvasclass fromagent.canvas, which encapsulates DSL execution and response generation logic.Conversation Persistence:
API4ConversationServiceis used to create, update, and append messages to conversations, ensuring durable session history.Token Counting: Uses OpenAI-compatible tokenizer (
tiktoken) to track prompt and completion tokens, enabling usage metering and cost estimation.Streaming Protocol: Outputs follow Server-Sent Events (SSE) format, compatible with clients expecting OpenAI style streaming completions.
Access Control: Implements tenant-based permission checks to ensure users only access canvases they own or have team permissions for.
Interaction with Other Modules
agent.canvas: TheCanvasclass is the core runtime that executes the canvas DSL and generates AI responses.api.dbModels: Interacts withCanvasTemplate,UserCanvas,User,API4Conversationmodels for data persistence.api.db.services:CommonService: Base class providing generic DB operations.API4ConversationService: Manages conversation lifecycle.UserTenantService(imported locally): Used for permission checking.
api.utils.api_utils: Providesget_data_openaito format responses compatible with OpenAI API.tiktoken: For tokenizing prompts and completions.peewee.fn: Used for SQL functions (e.g.,LOWER) in queries.
Visual Diagram
classDiagram
class CanvasTemplateService {
+model: CanvasTemplate
}
class DataFlowTemplateService {
+model: CanvasTemplate
}
class UserCanvasService {
+model: UserCanvas
+get_list(tenant_id, page_number, items_per_page, orderby, desc, id, title, canvas_category)
+get_by_tenant_id(pid)
+get_by_tenant_ids(joined_tenant_ids, user_id, page_number, items_per_page, orderby, desc, keywords, canvas_category)
+accessible(canvas_id, tenant_id)
}
class Canvas {
+__init__(dsl, tenant_id, agent_id)
+run(query, files, user_id, inputs)
+reset()
+get_reference()
}
class API4ConversationService {
+get_by_id(session_id)
+save(**conv)
+append_message(conv_id, conv)
}
class completion {
+tenant_id
+agent_id
+session_id
+kwargs
+generator
}
class completionOpenAI {
+tenant_id
+agent_id
+question
+session_id
+stream
+kwargs
+generator
}
CanvasTemplateService --|> CommonService
DataFlowTemplateService --|> CommonService
UserCanvasService --|> CommonService
completion --> Canvas
completion --> API4ConversationService
completionOpenAI --> completion
completionOpenAI --> tiktoken
completionOpenAI --> get_data_openai
Summary
canvas_service.py plays a critical role in the InfiniFlow architecture by bridging the gap between user canvas data stored in the database and the AI runtime engine (Canvas). It provides robust services for querying canvases, enforcing permissions, managing conversational sessions, and generating AI responses with OpenAI API compatibility. This module enables multi-tenant agent management and scalable conversational AI workflows with streaming and batch response support.