db_models.py
Overview
The db_models.py file is a core module of the InfiniFlow application responsible for defining the database schema models, database connection management, and utility classes and functions related to database operations. It uses the Peewee ORM framework to define models representing various entities such as users, tenants, knowledge bases, documents, dialogs, and more. In addition to model definitions, this file provides:
Custom field types for JSON and serialized data handling.
Database connection pooling with retry logic for resilience.
Database locking mechanisms for MySQL and PostgreSQL.
Singleton pattern for database connection management.
Migration utilities for database schema evolution.
Decorators to add retry behavior to database function calls.
Overall, this module provides a robust and extensible foundation for managing persistent storage and data integrity in the InfiniFlow system.
Detailed Explanations
Utilities and Constants
singleton(cls, *args, **kw)
A decorator implementing the singleton pattern keyed by class and process ID to ensure only one instance is created per process.
Usage:
@singleton
class MyClass:
    pass

obj1 = MyClass()
obj2 = MyClass()
assert obj1 is obj2  # True
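A decorator with this behavior could be written roughly as follows. This is a minimal sketch, assuming a cache keyed by class name and process ID; the `instances` dictionary and the `Config` class are hypothetical names used only for illustration, and the module's actual code may differ.

```python
import os


def singleton(cls, *args, **kw):
    """Return a factory that creates at most one instance of cls per process."""
    instances = {}  # hypothetical cache: "<class name>:<pid>" -> instance

    def _singleton():
        # Including the PID means a forked child process builds its own instance
        # instead of reusing one inherited from the parent.
        key = f"{cls.__name__}:{os.getpid()}"
        if key not in instances:
            instances[key] = cls(*args, **kw)
        return instances[key]

    return _singleton


@singleton
class Config:  # hypothetical example class
    def __init__(self):
        self.values = {}


a = Config()
b = Config()
```

Keying on the process ID matters for resources such as connection pools, which generally should not be shared across forked worker processes.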
CONTINUOUS_FIELD_TYPE
A set of Peewee field types considered "continuous" such as integers, floats, and datetime fields. Used for filtering and querying ranges.
AUTO_DATE_TIMESTAMP_FIELD_PREFIX
A set of prefix strings for timestamp-related fields that should be handled automatically, e.g., "create", "update", "start".
Enum Classes
TextFieldType(Enum)
Defines database-specific text field types for LongTextField.
MYSQL: "LONGTEXT"
POSTGRES: "TEXT"
Custom Field Classes
LongTextField(TextField)
A subclass of Peewee's TextField that sets the field type according to the underlying database type (MySQL or Postgres) for storing large text data.
JSONField(LongTextField)
Field to store JSON-serializable Python objects as text in the database.
Parameters:
object_hook: Optional JSON object hook for custom deserialization.
object_pairs_hook: Optional hook for ordered pairs deserialization.
Methods:
db_value(value): Serializes the Python object to a JSON string before saving.
python_value(value): Deserializes the JSON string back to a Python object when reading.
Defaults: Returns an empty dict {} if the value is None or empty.
ListField(JSONField)
A specialization of JSONField with a default empty list [] instead of dict.
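The round trip performed by db_value and python_value can be illustrated without Peewee. The sketch below uses only the standard library; the function names `to_db_value` and `to_python_value` and the `default_factory` parameter are assumptions made for this example, not part of the module.

```python
import json


def to_db_value(value, default_factory=dict):
    """Serialize a Python object to a JSON string for storage in a text column."""
    if value is None:
        value = default_factory()
    return json.dumps(value)


def to_python_value(text, default_factory=dict,
                    object_hook=None, object_pairs_hook=None):
    """Deserialize a stored JSON string; empty or missing values yield the default."""
    if not text:
        return default_factory()
    return json.loads(text, object_hook=object_hook,
                      object_pairs_hook=object_pairs_hook)


stored = to_db_value({"chunk_size": 128})
restored = to_python_value(stored)
empty_list = to_python_value("", default_factory=list)  # ListField-style default
```

Using a factory rather than a shared default object avoids handing the same mutable dict or list to every caller.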
SerializedField(LongTextField)
Field to store serialized Python objects using either Pickle (base64 encoded) or JSON.
Parameters:
serialized_type: Enum SerializedType selecting the serialization format (PICKLE or JSON).
object_hook, object_pairs_hook: Hooks for JSON deserialization when using JSON serialization.
Methods:
db_value(value): Serializes the value according to the selected type.
python_value(value): Deserializes the stored value.
Raises:
ValueError if an unsupported serialization type is given.
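The two serialization paths can be sketched with the standard library alone. The enum values and the `serialize` / `deserialize` function names below are assumptions for illustration; only the PICKLE and JSON member names come from the description above.

```python
import base64
import json
import pickle
from enum import IntEnum


class SerializedType(IntEnum):
    PICKLE = 1  # numeric values are assumed, not taken from the module
    JSON = 2


def serialize(value, serialized_type=SerializedType.PICKLE):
    """Turn a Python object into text suitable for a long text column."""
    if value is None:
        return None
    if serialized_type == SerializedType.PICKLE:
        # Pickle produces bytes, so base64-encode them to get plain ASCII text.
        return base64.b64encode(pickle.dumps(value)).decode("ascii")
    if serialized_type == SerializedType.JSON:
        return json.dumps(value)
    raise ValueError(f"unsupported serialization type: {serialized_type!r}")


def deserialize(text, serialized_type=SerializedType.PICKLE,
                object_hook=None, object_pairs_hook=None):
    """Reverse serialize(); the hooks apply only to the JSON path."""
    if text is None:
        return None
    if serialized_type == SerializedType.PICKLE:
        return pickle.loads(base64.b64decode(text))
    if serialized_type == SerializedType.JSON:
        return json.loads(text, object_hook=object_hook,
                          object_pairs_hook=object_pairs_hook)
    raise ValueError(f"unsupported serialization type: {serialized_type!r}")
```

Unpickling executes code embedded in the payload, so the pickle path is only appropriate for data the application wrote itself.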
Helper Functions
is_continuous_field(cls: typing.Type) -> bool
Checks if a Peewee field class or its bases are considered continuous (integer, float, datetime).
auto_date_timestamp_field() -> set
Returns a set of timestamp field names with suffix _time based on AUTO_DATE_TIMESTAMP_FIELD_PREFIX.
auto_date_timestamp_db_field() -> set
Returns a set of timestamp field names with prefix f_ and suffix _time, matching the database field naming convention.
remove_field_name_prefix(field_name: str) -> str
Removes the prefix "f_" from a field name if present, used for converting database field names to human-readable keys.
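The three naming helpers are simple string transformations. A minimal sketch follows, assuming the prefix set contains only the three examples named earlier ("create", "update", "start"); the real set may hold more entries.

```python
# Assumed contents; the documentation lists these three prefixes only as examples.
AUTO_DATE_TIMESTAMP_FIELD_PREFIX = {"create", "update", "start"}


def auto_date_timestamp_field() -> set:
    """Model-level names, e.g. 'create_time'."""
    return {f"{prefix}_time" for prefix in AUTO_DATE_TIMESTAMP_FIELD_PREFIX}


def auto_date_timestamp_db_field() -> set:
    """Database-level names, e.g. 'f_create_time'."""
    return {f"f_{prefix}_time" for prefix in AUTO_DATE_TIMESTAMP_FIELD_PREFIX}


def remove_field_name_prefix(field_name: str) -> str:
    """Strip a leading 'f_' if present; other names pass through unchanged."""
    return field_name[2:] if field_name.startswith("f_") else field_name
```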
Core Model Classes
BaseModel(Model)
Base class for all database models with:
Timestamp fields (create_time, create_date, update_time, update_date).
Serialization helpers (to_dict, to_human_model_dict).
Metadata access via .meta.
Primary key utilities (get_primary_keys_name).
Query helper (query) supporting filters, continuous range queries, and ordering.
Override of insert to add create_time automatically.
Data normalization in _normalize_data to add/update timestamps and convert timestamps to dates.
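The timestamp normalization step can be pictured on a plain dictionary. The sketch below is an illustration only: it assumes timestamps are stored as integer milliseconds since the epoch and that dates are derived in UTC, neither of which is stated above, and `normalize_row` is a hypothetical name.

```python
import time
from datetime import datetime, timezone

PREFIXES = ("create", "update")  # assumed subset of the timestamp prefixes


def normalize_row(row, now_ms=None):
    """Return a copy of row with update_time refreshed and *_date derived from *_time."""
    if not row:
        return {}
    out = dict(row)
    # Every write refreshes update_time (milliseconds are an assumption).
    out["update_time"] = int(time.time() * 1000) if now_ms is None else now_ms
    for prefix in PREFIXES:
        time_key, date_key = f"{prefix}_time", f"{prefix}_date"
        if out.get(time_key) is not None:
            out[date_key] = datetime.fromtimestamp(out[time_key] / 1000,
                                                   tz=timezone.utc)
    return out


row = normalize_row({"name": "doc", "create_time": 0}, now_ms=1000)
```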
JsonSerializedField(SerializedField)
A convenience subclass of SerializedField preset to use JSON serialization with a custom object hook (utils.from_dict_hook).
Database Connection & Pooling
RetryingPooledMySQLDatabase(PooledMySQLDatabase)
A MySQL database connection pool subclass with retry logic on lost connections (error codes 2013, 2006).
Parameters:
max_retries: Number of retry attempts (default 5).
retry_delay: Initial delay between retries in seconds (default 1).
Methods:
execute_sql: Retries SQL execution in case of connection loss.
begin: Retries transaction begin similarly.
_handle_connection_loss: Closes and reconnects database connections.
PooledDatabase(Enum)
Enum mapping database types to their pooled database classes:
MYSQL: RetryingPooledMySQLDatabase
POSTGRES: PooledPostgresqlDatabase
DatabaseMigrator(Enum)
Enum mapping database types to their migrator classes:
MYSQL: MySQLMigrator
POSTGRES: PostgresqlMigrator
BaseDataBase
A singleton class managing the database connection instance.
Reads configuration from settings.DATABASE.
Establishes a pooled connection using the PooledDatabase enum.
Usage:
db_instance = BaseDataBase()
db = db_instance.database_connection
Retry Decorator
with_retry(max_retries=3, retry_delay=1.0)
Decorator to add retry logic with exponential backoff to decorated functions, typically database operations.
Parameters:
max_retries: Maximum attempts (default 3).
retry_delay: Initial delay between attempts in seconds (default 1.0).
Behavior: Logs a warning on each retry. If all attempts fail, logs an error and re-raises the last exception.
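A retry decorator with exponential backoff could look roughly like this. It is a sketch under stated assumptions: the delay doubles after each failed attempt, every exception type is retried, and the log messages are invented for illustration; the module's own implementation may differ in all three respects.

```python
import functools
import logging
import time


def with_retry(max_retries=3, retry_delay=1.0):
    """Retry the wrapped function, doubling the delay after each failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:  # assumption: retry on any exception
                    last_exc = exc
                    if attempt < max_retries - 1:
                        delay = retry_delay * (2 ** attempt)
                        logging.warning("%s failed (%s), retry %d/%d in %.1fs",
                                        func.__name__, exc, attempt + 1,
                                        max_retries, delay)
                        time.sleep(delay)
                    else:
                        logging.error("%s failed after %d attempts: %s",
                                      func.__name__, max_retries, exc)
            if last_exc is not None:
                raise last_exc
        return wrapper
    return decorator
```

With the defaults, the waits between the three attempts would be 1 second and then 2 seconds.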
Database Lock Classes
Provide advisory locking mechanisms for concurrent processes.
PostgresDatabaseLock
Uses PostgreSQL advisory locks (pg_try_advisory_lock and pg_advisory_unlock).
Lock ID is derived from the MD5 hash of the lock name.
Supports context manager and decorator usage.
Retries on failure using with_retry.
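PostgreSQL advisory lock functions take an integer key, which is why a string lock name has to be hashed first. The sketch below shows one way to do that; how the module reduces the 128-bit MD5 digest to a key is not stated here, so the modulo step and the name `lock_id_from_name` are assumptions.

```python
import hashlib


def lock_id_from_name(lock_name: str) -> int:
    """Map a lock name to a non-negative integer that fits a signed 64-bit key."""
    digest = hashlib.md5(lock_name.encode("utf-8")).hexdigest()
    # MD5 yields 128 bits; keep the value within the positive bigint range.
    return int(digest, 16) % (2 ** 63)


key = lock_id_from_name("update_progress")
```

The same name always maps to the same key, so independent processes that agree on a lock name contend for the same advisory lock.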
MysqlDatabaseLock
Uses MySQL named locks (GET_LOCK and RELEASE_LOCK).
Supports context manager and decorator usage.
Retries on failure using with_retry.
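Supporting both `with` blocks and decorator syntax in one lock class is a common pattern. The sketch below shows that pattern using an in-process `threading.Lock` as a stand-in for the database lock; it does not talk to MySQL or PostgreSQL, and `NamedLock` is a hypothetical class rather than one defined in the module.

```python
import functools
import threading


class NamedLock:
    """In-process stand-in for a named database lock (hypothetical class)."""

    _locks = {}  # lock name -> threading.Lock shared by all instances
    _registry_guard = threading.Lock()

    def __init__(self, name, timeout=10):
        self.name = name
        self.timeout = timeout
        with NamedLock._registry_guard:
            self._lock = NamedLock._locks.setdefault(name, threading.Lock())

    def lock(self):
        if not self._lock.acquire(timeout=self.timeout):
            raise TimeoutError(f"could not acquire lock {self.name!r}")

    def unlock(self):
        self._lock.release()

    # Context manager protocol: acquire on entry, always release on exit.
    def __enter__(self):
        self.lock()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.unlock()
        return False  # do not swallow exceptions from the critical section

    # Decorator protocol: wrap the function body in the same context manager.
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return wrapper
```

Routing the decorator through `__enter__` and `__exit__` keeps a single acquire/release path, so the lock is released even when the wrapped function raises.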
DatabaseLock(Enum)
Maps database types to their respective lock implementations:
MYSQL: MysqlDatabaseLock
POSTGRES: PostgresDatabaseLock
Global Database Variables
DB: Singleton database connection instance.
DB.lock: Lock class corresponding to the current database type.
Connection Management
close_connection()
Closes stale database connections older than 30 seconds to maintain pool health.
Data Model Classes
All models inherit from DataBaseModel, which uses the shared DB connection.
User
Represents a system user with authentication fields, preferences, and status flags.
Implements UserMixin for Flask-Login integration.
Provides a get_id method to return the serialized access token.
Tenant
Represents an organizational tenant with default model IDs and credit balance.
UserTenant
Associates users with tenants and roles.
InvitationCode
Stores invitation codes linked to users and tenants.
LLMFactories
Represents providers of Large Language Models (LLM).
LLM
Represents specific LLMs with factory ID, type, tags, and support flags.
TenantLLM
Tenant-specific LLM configuration including API keys and usage stats.
TenantLangfuse
Stores Langfuse API keys and host info per tenant.
Knowledgebase
Represents a knowledge base with metadata, configuration, and access permissions.
Document
Represents documents associated with knowledge bases, including processing status and metadata.
File
Represents files and folders with hierarchical parent-child relationships.
File2Document
Associates files to documents.
Task
Represents processing tasks for documents with progress, priority, and retry details.
Dialog
Represents dialog applications with language, LLM settings, prompts, filters, and status.
Conversation
Stores conversation messages and references related to dialogs.
APIToken
Stores API tokens linked to tenants and optionally dialogs.
API4Conversation
Tracks API calls for conversations with usage stats and error logging.
UserCanvas
Represents user-created canvases with permissions and DSL configuration.
CanvasTemplate
Templates for canvases with title, description, and configuration.
UserCanvasVersion
Versioned snapshots of user canvases.
MCPServer
Stores configuration for MCP (Model Context Protocol) servers.
Search
Represents saved search configurations with filters and parameters.
Database Initialization and Migration
init_database_tables(alter_fields=[])
Inspects all subclasses of DataBaseModel in the module.
Creates missing tables safely.
Logs success and failures.
Calls migrate_db() for database schema migrations.
fill_db_model_object(model_object, human_model_dict)
Populates a model instance's attributes from a dictionary with keys matching model fields.
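The idea can be sketched with a plain Python class standing in for a Peewee model. This is an assumption-laden illustration: it copies only keys that already exist as attributes on the object and ignores the rest, which may not match the module's exact matching rule; the `Doc` class is hypothetical.

```python
def fill_db_model_object(model_object, human_model_dict):
    """Copy values from the dict onto attributes the object already defines."""
    for key, value in human_model_dict.items():
        if hasattr(model_object, key):
            setattr(model_object, key, value)
    return model_object


class Doc:  # hypothetical stand-in for a model class
    name = None
    size = 0


doc = fill_db_model_object(Doc(), {"name": "a.pdf", "size": 3, "unknown": 1})
```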
migrate_db()
Performs incremental schema migrations using playhouse.migrate.migrate and catches exceptions to continue.
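The catch-and-continue approach makes migrations safe to re-run: a step that was already applied fails harmlessly and the remaining steps still execute. The sketch below demonstrates the pattern with the standard library's sqlite3 and raw ALTER TABLE statements instead of playhouse.migrate, purely so the example is self-contained; the `document` table, the `meta_fields` column, and `run_migrations` are hypothetical.

```python
import logging
import sqlite3


def run_migrations(conn, statements):
    """Apply each statement, skipping any that fail (e.g. column already exists)."""
    applied = 0
    for sql in statements:
        try:
            conn.execute(sql)
            applied += 1
        except sqlite3.OperationalError as exc:
            logging.info("skipping migration step %r: %s", sql, exc)
    conn.commit()
    return applied


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE document (id TEXT PRIMARY KEY)")
steps = ["ALTER TABLE document ADD COLUMN meta_fields TEXT DEFAULT '{}'"]

first_run = run_migrations(conn, steps)   # column is added
second_run = run_migrations(conn, steps)  # duplicate column error is swallowed
```

The trade-off is that genuine failures are also swallowed, so logging each skipped step is what keeps them visible.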
Interaction with Other System Components
api.settings: Provides configuration like database type, credentials, and secret keys.
api.utils: Utility functions for JSON serialization, timestamp conversion, and serialization helper functions.
peewee ORM: Manages database abstraction, migrations, and connection pooling.
flask_login.UserMixin: Integrates user authentication for Flask web framework.
itsdangerous.Serializer: Used for secure token serialization.
Other modules: This file primarily defines data models used by the API layer, business logic, and possibly UI components.
Important Implementation Details
Custom serialization fields (JSONField, SerializedField) enable seamless JSON and pickle storage in text columns.
Retry logic is built into database connection classes and decorated functions to ensure robustness against transient database errors.
Advisory locks implement distributed locking to prevent race conditions in concurrent multi-process environments.
Automated timestamps are handled in BaseModel via normalization methods to keep track of creation and update times.
Table creation and migration are automated via the init_database_tables and migrate_db functions, ensuring schema consistency.
Singleton pattern for database connection ensures resource efficiency in multi-threaded or multi-process setups.
Usage Examples
Querying with Filters and Ordering
# Get all users created between two timestamps
users = User.query(create_time=["2023-01-01 00:00:00", "2023-02-01 00:00:00"], reverse=True, order_by="create_time")
Using Database Lock as Context Manager
lock = DB.lock("my_lock_name")
with lock:
    # Critical section here
    do_database_updates()
Decorating a Method with Retry
@with_retry(max_retries=5, retry_delay=2)
def update_user_email(user_id, new_email):
    user = User.get(User.id == user_id)
    user.email = new_email
    user.save()
Mermaid Class Diagram
classDiagram
class BaseModel {
+BigIntegerField create_time
+DateTimeField create_date
+BigIntegerField update_time
+DateTimeField update_date
+to_dict()
+to_human_model_dict(only_primary_with: list)
+get_primary_keys_name()
+query(**kwargs)
+insert(__data=None, **insert)
+_normalize_data(data, kwargs)
}
class DataBaseModel {
<<BaseModel>>
}
class User {
+CharField id
+CharField access_token
+CharField nickname
+CharField password
+CharField email
+CharField language
+BooleanField is_superuser
+get_id()
}
class Tenant {
+CharField id
+CharField name
+CharField public_key
+IntegerField credit
}
class Document {
+CharField id
+CharField kb_id
+CharField parser_id
+JSONField parser_config
+CharField source_type
+CharField name
+IntegerField size
+FloatField progress
}
class Task {
+CharField id
+CharField doc_id
+IntegerField from_page
+IntegerField to_page
+CharField task_type
+IntegerField priority
+FloatField progress
}
BaseModel <|-- DataBaseModel
DataBaseModel <|-- User
DataBaseModel <|-- Tenant
DataBaseModel <|-- Document
DataBaseModel <|-- Task
Summary
db_models.py is a comprehensive database model and management module that leverages Peewee ORM to define the data schema and provide robust database connection handling, serialization, locking, and migration utilities for the InfiniFlow platform. It abstracts database specifics and ensures consistency, reliability, and scalability of data operations across different database backends (MySQL and PostgreSQL).