db_utils.py
Overview
The db_utils.py file provides a set of utility functions designed to facilitate database operations using the Peewee ORM framework in the InfiniFlow project. It primarily handles bulk insertion, dynamic model generation based on job identifiers, mapping of ordinary dictionaries to database model objects, and flexible querying with support for various operators.
This file wraps common database operations: inserting large datasets in batches, constructing ORM models bound to job-specific tables, and converting query dictionaries into Peewee expressions. Because it builds on the project's shared connection (DB) and base model (DataBaseModel), every caller gets the same timestamp handling and query semantics.
Detailed Descriptions
Imports and Dependencies
Uses operator and functools.reduce for dynamic expression building.
Imports PooledMySQLDatabase from Peewee's connection pooling module (playhouse.pool), which is used to detect a MySQL backend and choose MySQL-specific conflict resolution.
Relies on the project-specific utilities current_timestamp and timestamp_to_date for timestamp management.
Uses the project's database connection DB and base model DataBaseModel for ORM interactions.
Functions
bulk_insert_into_db(model, data_source, replace_on_conflict=False)
Purpose:
Performs batch insertion of multiple records into the database. It automatically manages creation and update timestamps, creates the table if it does not exist, and optionally replaces existing records on conflict.
Parameters:
model (DataBaseModel subclass): The Peewee model representing the target database table.
data_source (list[dict]): A list of dictionaries, each representing a record to be inserted.
replace_on_conflict (bool, default False): If True, conflicts during insertion replace the existing row.
Returns: None
Usage Example:
data = [
{'id': 1, 'name': 'Alice'},
{'id': 2, 'name': 'Bob'},
]
bulk_insert_into_db(UserModel, data, replace_on_conflict=True)
Implementation Details:
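Creates the table if it doesn't exist, using DB.create_tables.
Adds or updates the create_time, create_date, update_time, and update_date fields in each record.
Inserts the records in batches of 1000 using Peewee's insert_many for performance.
When replace_on_conflict is True, conflicts are resolved with Peewee's on_conflict clause:
For MySQL (detected by checking whether DB is a PooledMySQLDatabase), it passes the columns to preserve, which Peewee renders as ON DUPLICATE KEY UPDATE so that those columns take the incoming values.
For other databases, it uses id as the conflict target, which backends such as PostgreSQL and SQLite require for an ON CONFLICT upsert.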
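The steps above can be sketched with the standard-library sqlite3 module. This is a minimal illustration under stated assumptions, not the project's code: SQLite stands in for the Peewee-managed database, so only the non-MySQL branch (conflict target id) is shown, using SQLite's ON CONFLICT ... DO UPDATE upsert syntax (SQLite 3.24 or newer). The helper names fill_timestamps and bulk_insert, the millisecond timestamps, the date format, and keeping create_time/create_date on conflict are all assumptions.

```python
import sqlite3
import time
from datetime import datetime

BATCH_SIZE = 1000  # batch size described for bulk_insert_into_db
DATE_FMT = "%Y-%m-%d %H:%M:%S"  # assumed date format


def _to_date(ms: int) -> str:
    # Hypothetical stand-in for timestamp_to_date (millisecond epoch assumed).
    return datetime.fromtimestamp(ms / 1000).strftime(DATE_FMT)


def fill_timestamps(records: list[dict]) -> None:
    """Add or refresh the four timestamp fields in place (hypothetical helper)."""
    now_ms = int(time.time() * 1000)  # stand-in for current_timestamp
    for rec in records:
        rec.setdefault("create_time", now_ms)  # keep a caller-supplied creation time
        rec["create_date"] = _to_date(rec["create_time"])
        rec["update_time"] = now_ms  # update stamps are always refreshed
        rec["update_date"] = _to_date(now_ms)


def bulk_insert(conn: sqlite3.Connection, table: str, records: list[dict],
                replace_on_conflict: bool = False) -> None:
    """Batched insert with optional upsert; table must be a trusted name."""
    if not records:
        return
    fill_timestamps(records)
    columns = list(records[0])  # assumes every record has the same keys
    sql = (f"INSERT INTO {table} ({', '.join(columns)}) "
           f"VALUES ({', '.join('?' for _ in columns)})")
    if replace_on_conflict:
        # Conflict target is id; other columns take the incoming ("excluded")
        # values, while the original creation stamps are kept (an assumption).
        keep = {"id", "create_time", "create_date"}
        updates = ", ".join(f"{c} = excluded.{c}" for c in columns if c not in keep)
        sql += f" ON CONFLICT(id) DO UPDATE SET {updates}"
    for start in range(0, len(records), BATCH_SIZE):
        batch = records[start:start + BATCH_SIZE]
        with conn:  # commit each batch as one transaction
            conn.executemany(sql, [tuple(rec[c] for c in columns) for rec in batch])


# Table creation (DB.create_tables in the original) is done by the caller here.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, "
             "create_time INTEGER, create_date TEXT, update_time INTEGER, update_date TEXT)")
bulk_insert(conn, "users", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
bulk_insert(conn, "users", [{"id": 1, "name": "Alicia"}], replace_on_conflict=True)
print(conn.execute("SELECT id, name FROM users ORDER BY id").fetchall())
# [(1, 'Alicia'), (2, 'Bob')]
```

Committing each batch separately means a failing batch does not roll back batches that were already written; whether the original module groups transactions the same way is not stated here.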
get_dynamic_db_model(base, job_id)
Purpose:
Dynamically generates and returns a Peewee model class tied to a specific tracking table determined by the job ID.
Parameters:
base (a base model class with a .model() method): The base ORM model class from which to generate a new model.
job_id (str): A job identifier, used to determine the specific database table suffix.
Returns:
A dynamically created Peewee model class corresponding to the tracking table for the given job_id.
Usage Example:
DynamicModel = get_dynamic_db_model(BaseTrackingModel, 'job12345678abcdef')
instance = DynamicModel()
Implementation Details:
Uses get_dynamic_tracking_table_index(job_id) to extract the first 8 characters of the job ID.
Calls base.model(table_index=...) with that value to generate the dynamic model.
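A minimal sketch of how a job ID flows into a table-specific model class. BaseTrackingModel, its table_name attribute, and building the subclass with type() are hypothetical; in this sketch model() returns the class directly, and a real Peewee model would typically set its table name through its Meta options instead.

```python
def get_dynamic_tracking_table_index(job_id: str) -> str:
    # The table index is the first 8 characters of the job ID.
    return job_id[:8]


class BaseTrackingModel:
    """Hypothetical base model: model() builds a subclass bound to one table."""

    table_name = "t_tracking"  # hypothetical base table name

    @classmethod
    def model(cls, table_index: str) -> type:
        # Create a new class at runtime whose table name carries the suffix.
        return type(
            f"{cls.__name__}_{table_index}",
            (cls,),
            {"table_name": f"{cls.table_name}_{table_index}"},
        )


def get_dynamic_db_model(base, job_id: str) -> type:
    return base.model(table_index=get_dynamic_tracking_table_index(job_id))


DynamicModel = get_dynamic_db_model(BaseTrackingModel, "job12345678abcdef")
print(DynamicModel.__name__, DynamicModel.table_name)
# BaseTrackingModel_job12345 t_tracking_job12345
```

Every job that shares the same 8-character prefix maps to the same table, which is what lets many jobs share a bounded set of tracking tables.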
get_dynamic_tracking_table_index(job_id)
Purpose:
Extracts the table suffix (index) from a job ID string by taking its first 8 characters.
Parameters:
job_id(str): The job identifier.
Returns: str: The first 8 characters of the job ID.
Usage Example:
index = get_dynamic_tracking_table_index('job12345678abcdef')
print(index) # Output: 'job12345'
fill_db_model_object(model_object, human_model_dict)
Purpose:
Sets attributes on a Peewee model instance from a dictionary with human-readable keys, mapping keys to model fields prefixed with f_.
Parameters:
model_object (instance of DataBaseModel): The ORM model instance to be filled.
human_model_dict (dict): Dictionary mapping simple keys to values.
Returns: model_object, after its attributes have been set.
Usage Example:
human_dict = {'name': 'Alice', 'age': 30}
user = UserModel()
fill_db_model_object(user, human_dict)
Implementation Details:
For each key in human_model_dict, sets the attribute f_<key> on the model object if the model defines it; keys without a matching field are skipped.
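The mapping can be sketched in a few lines of plain Python. UserModel here is a hypothetical stand-in whose f_ attributes play the role of Peewee fields; checking the class rather than the instance for the attribute is this sketch's choice, so keys with no declared field are skipped.

```python
def fill_db_model_object(model_object, human_model_dict: dict):
    for key, value in human_model_dict.items():
        attr_name = f"f_{key}"
        # Only set attributes that the model class already declares.
        if hasattr(model_object.__class__, attr_name):
            setattr(model_object, attr_name, value)
    return model_object


class UserModel:  # hypothetical stand-in for a DataBaseModel subclass
    f_name = None
    f_age = None


user = fill_db_model_object(UserModel(), {"name": "Alice", "age": 30, "email": "a@example.com"})
print(user.f_name, user.f_age, hasattr(user, "f_email"))  # Alice 30 False
```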
Supported Operators Dictionary
supported_operators = {
'==': operator.eq,
'<': operator.lt,
'<=': operator.le,
'>': operator.gt,
'>=': operator.ge,
'!=': operator.ne,
'<<': operator.lshift,
'>>': operator.rshift,
'%': operator.mod,
'**': operator.pow,
'^': operator.xor,
'~': operator.inv,
}
Purpose:
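Maps string operator symbols to Python operator functions so that query_dict2expression can look up an operator by its symbol. Because Peewee overloads these operators on fields, several symbols carry SQL meanings rather than their arithmetic ones: << is IN, >> is IS (for example, IS NULL), % is LIKE, ** is ILIKE, ^ is XOR, and ~ is negation (NOT).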
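Why plain operator functions work for query building: operator.lshift(a, b) is exactly a << b, so it calls whatever __lshift__ the left operand defines, and Peewee fields define these methods to build SQL expressions. The toy Field class below is hypothetical and renders strings instead of Peewee expressions, purely to show the dispatch through a symbol table.

```python
import operator


class Field:
    """Hypothetical toy field: each overload renders Peewee-style SQL text."""

    def __init__(self, name: str):
        self.name = name

    def __eq__(self, other):  # == renders as =
        return f"{self.name} = {other!r}"

    def __lshift__(self, other):  # << renders as IN
        return f"{self.name} IN {tuple(other)!r}"

    def __rshift__(self, other):  # >> renders as IS
        return f"{self.name} IS {'NULL' if other is None else repr(other)}"

    def __mod__(self, other):  # % renders as LIKE
        return f"{self.name} LIKE {other!r}"


# A trimmed symbol table in the style of supported_operators.
ops = {"==": operator.eq, "<<": operator.lshift, ">>": operator.rshift, "%": operator.mod}

print(ops["<<"](Field("age"), [18, 21]))     # age IN (18, 21)
print(ops[">>"](Field("deleted_at"), None))  # deleted_at IS NULL
print(ops["%"](Field("name"), "Al%"))        # name LIKE 'Al%'
```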
query_dict2expression(model: type[DataBaseModel], query: dict[str, bool | int | str | list | tuple])
Purpose:
Converts a dictionary of query conditions into a Peewee expression suitable for filtering database queries.
Parameters:
model: The Peewee model class to which the query applies.
query: A dictionary whose keys are field names (without the f_ prefix) and whose values are either:
A scalar value, interpreted as equality, or
A tuple/list with an operator string followed by one or more operands.
Returns:
A Peewee expression that combines all conditions with the & operator, which Peewee translates into SQL AND.
Usage Example:
query = {
'age': ('>=', 18),
'name': ('!=', 'Bob'),
}
expression = query_dict2expression(UserModel, query)
users = UserModel.select().where(expression)
Implementation Details:
For each field and value pair:
If value is not a tuple/list, defaults to equality.
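Looks up the corresponding ORM field, f_<field>, on the model.
If the operator string is a key of supported_operators, applies that Python operator function to the field and the operand; otherwise, treats the string as the name of a Peewee field method (for example in_ or between) and calls it with the operands.
Combines all conditions with reduce(operator.iand, ...) to create a conjunctive (AND) expression.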
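The dispatch described above can be sketched without a database. Field, Expr, UserModel, and the trimmed SUPPORTED table are hypothetical toys that render SQL-like strings; in the real module the fields are Peewee fields and & produces a Peewee expression. The sketch follows the listed steps: bare values become ('==', value), known symbols go through the operator table, and any other string is called as a field method (here, between).

```python
import operator
from functools import reduce


class Expr:
    """Hypothetical toy expression; & joins two expressions with AND."""

    def __init__(self, sql: str):
        self.sql = sql

    def __and__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.sql} AND {other.sql})")


class Field:
    """Hypothetical toy field supporting a few Peewee-style operators."""

    def __init__(self, name: str):
        self.name = name

    def _cmp(self, op: str, value) -> Expr:
        return Expr(f"{self.name} {op} {value!r}")

    def __eq__(self, v): return self._cmp("=", v)
    def __ne__(self, v): return self._cmp("!=", v)
    def __ge__(self, v): return self._cmp(">=", v)

    def between(self, lo, hi) -> Expr:  # a named field method, as in Peewee
        return Expr(f"{self.name} BETWEEN {lo!r} AND {hi!r}")


class UserModel:  # hypothetical model with f_-prefixed fields
    f_age = Field("age")
    f_name = Field("name")
    f_score = Field("score")


SUPPORTED = {"==": operator.eq, "!=": operator.ne, ">=": operator.ge}


def query_dict2expression(model, query: dict) -> Expr:
    conditions = []
    for field_name, value in query.items():
        if not isinstance(value, (list, tuple)):
            value = ("==", value)  # a bare value means equality
        op, *operands = value
        field = getattr(model, f"f_{field_name}")
        if op in SUPPORTED:
            conditions.append(SUPPORTED[op](field, operands[0]))
        else:
            # Any other string is treated as a field method name.
            conditions.append(getattr(field, op)(*operands))
    # iand falls back to __and__, so this chains the conditions with &.
    return reduce(operator.iand, conditions)


expr = query_dict2expression(
    UserModel, {"age": (">=", 18), "name": ("!=", "Bob"), "score": ("between", 1, 10)})
print(expr.sql)
# ((age >= 18 AND name != 'Bob') AND score BETWEEN 1 AND 10)
```

In this sketch an empty query dictionary makes reduce raise TypeError, which is one reason to build the expression only when a query is provided, as query_db does.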
query_db(model: type[DataBaseModel], limit: int = 0, offset: int = 0, query: dict = None, order_by: str | list | tuple | None = None)
Purpose:
Performs a filtered, ordered, and paginated query on the database, returning matching records and total count.
Parameters:
model: The Peewee model class to query.
limit (int): Maximum number of records to return; 0 means no limit.
offset (int): Number of records to skip.
query (dict or None): Query dictionary to filter results (see query_dict2expression).
order_by (str, list, tuple, or None): Either a field name, sorted ascending, or a (field_name, direction) pair where direction is 'asc' or 'desc'. If None, defaults to ('create_time', 'asc').
Returns:
Tuple (list_of_records, total_count), where:
list_of_records is a list of model instances matching the query.
total_count is the total number of records matching the query, ignoring limit and offset.
Usage Example:
results, count = query_db(
UserModel,
limit=10,
offset=0,
query={'age': ('>=', 18)},
order_by=('name', 'desc')
)
Implementation Details:
Builds the base query using .select().
Applies filters using query_dict2expression if query is provided.
Counts the matching rows with .count() before ordering and pagination are applied.
Applies ordering by looking up the field dynamically and calling its .asc() or .desc() method.
Applies limit and offset.
Returns the results as a list.
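The ordering and pagination rules can be illustrated on an in-memory list. query_rows and normalize_order_by are hypothetical names, the where callable replaces the query dictionary, and treating a bare string order_by as ascending is an assumption. The sketch mirrors the documented behavior that total_count is computed before limit and offset, and that a limit of 0 means no limit.

```python
def normalize_order_by(order_by):
    """Turn None, 'field', or ('field', 'asc'|'desc') into a (field, direction) pair."""
    if not order_by:
        return ("create_time", "asc")
    if isinstance(order_by, str):
        return (order_by, "asc")  # assumption: a bare field name sorts ascending
    field, direction = order_by
    if direction not in ("asc", "desc"):
        raise ValueError(f"unsupported direction: {direction!r}")
    return (field, direction)


def query_rows(rows, limit=0, offset=0, where=None, order_by=None):
    """In-memory analogue of query_db: filter, count, sort, then paginate."""
    matched = [r for r in rows if where is None or where(r)]
    total = len(matched)  # counted before limit/offset, like total_count
    field, direction = normalize_order_by(order_by)
    matched.sort(key=lambda r: r[field], reverse=(direction == "desc"))
    if offset > 0:
        matched = matched[offset:]
    if limit > 0:  # 0 means "no limit"
        matched = matched[:limit]
    return matched, total


rows = [{"name": n, "age": a, "create_time": t}
        for t, (n, a) in enumerate([("Alice", 30), ("Bob", 17), ("Carol", 22), ("Dave", 41)])]
page, count = query_rows(rows, limit=2, where=lambda r: r["age"] >= 18, order_by=("name", "desc"))
print([r["name"] for r in page], count)  # ['Dave', 'Carol'] 3
```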
Important Implementation Notes
Dynamic Table/Model Handling: The file supports dynamic table naming based on job IDs, which suits multi-tenant or sharded data storage patterns.
Timestamp Management: bulk_insert_into_db fills in creation and update timestamps automatically, so records carry consistent audit fields without callers setting them.
Batch Processing: Inserts are sent in batches of 1000 rows, which reduces round trips and transaction overhead compared with row-by-row inserts.
Operator Flexibility: query_dict2expression accepts the symbols in supported_operators as well as Peewee field method names, enabling flexible querying.
Conflict Resolution: Conflict handling differs by database backend (MySQL versus others), because the upsert syntax differs between them.
Interaction with Other System Components
api.db.db_models: Provides the base database connection (DB) and model classes (DataBaseModel) for ORM functionality.
api.utils: Supplies time-related utility functions (current_timestamp, timestamp_to_date) used for timestamp fields.
Database Backend: Uses Peewee ORM with connection pooling and supports MySQL-specific features through PooledMySQLDatabase.
Dynamic Model Generation: Works with base models that implement .model(table_index=...) to support dynamic table mapping based on job IDs.
Higher-Level Application Logic: These utilities are intended for API layers or services that manage data ingestion, querying, and processing workflows.
Visual Diagram
flowchart TD
A[bulk_insert_into_db] -->|creates tables & inserts data| DB[(Database)]
B[get_dynamic_db_model] -->|returns dynamic model based on job_id| DynamicModel
C[get_dynamic_tracking_table_index] -->|extracts table index from job_id| Index
B --> C
D[fill_db_model_object] -->|maps dict to model object| ModelObject
E[query_dict2expression] -->|converts dict to Peewee expression| Expression
F[query_db] -->|runs filtered & sorted query| DB
F --> E
F -->|returns records and count| Result
supported_operators -.-> E
DB -->|provides connection| A
DB -->|used by| F
Summary
db_utils.py provides core database utilities for the InfiniFlow project. From bulk inserts with conflict handling and timestamp management to dynamic model creation and dictionary-based querying, it wraps common Peewee patterns so that callers do not have to reimplement them.
This file is a key component bridging database schema models with application logic, enabling dynamic and performant data access patterns tailored to job-specific tracking tables.