db_utils.py

Overview

The db_utils.py file provides a set of utility functions designed to facilitate database operations using the Peewee ORM framework in the InfiniFlow project. It primarily handles bulk insertion, dynamic model generation based on job identifiers, mapping of ordinary dictionaries to database model objects, and flexible querying with support for various operators.

This file abstracts common database operations: inserting large datasets in batches, constructing dynamic ORM models tied to specific tables, and converting query dictionaries into Peewee expressions. It builds on the project's database configuration and models, so callers get consistent database access without repeating this logic.


Detailed Descriptions

Imports and Dependencies


Functions


bulk_insert_into_db(model, data_source, replace_on_conflict=False)

Purpose:
Performs batch insertion of multiple records into the database. It automatically manages creation and update timestamps, creates the table if it does not exist, and optionally replaces existing records on conflict.

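Parameters:

model: The Peewee model class to insert into (UserModel in the example below).
data_source: The records to insert, given as a list of dictionaries as in the example below.
replace_on_conflict: When True, existing records are replaced on conflict. Defaults to False.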

Returns:
None

Usage Example:

data = [
    {'id': 1, 'name': 'Alice'},
    {'id': 2, 'name': 'Bob'},
]

bulk_insert_into_db(UserModel, data, replace_on_conflict=True)
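
The behaviour described under Purpose (timestamping, table creation, batching, optional replace on conflict) can be sketched as follows. This is a minimal illustration, not the project's code: it uses the standard-library sqlite3 module instead of Peewee, and the `users` table, the `stamp_rows` and `bulk_insert` helpers, the `create_time`/`update_time` column names, and the batch size of 1000 are all assumptions made for the example.

```python
import sqlite3
import time


def stamp_rows(rows, now_ms=None):
    """Return copies of rows with create/update timestamps filled in."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    stamped = []
    for row in rows:
        row = dict(row)                        # do not mutate the caller's dicts
        row.setdefault("create_time", now_ms)  # keep an existing creation time
        row["update_time"] = now_ms            # always refresh the update time
        stamped.append(row)
    return stamped


def bulk_insert(conn, rows, replace_on_conflict=False, batch_size=1000):
    """Insert rows into a hypothetical `users` table, one transaction per batch."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS users ("
        "id INTEGER PRIMARY KEY, name TEXT, "
        "create_time INTEGER, update_time INTEGER)"
    )
    # SQLite's INSERT OR REPLACE overwrites the whole conflicting row.
    verb = "INSERT OR REPLACE" if replace_on_conflict else "INSERT"
    sql = (
        f"{verb} INTO users (id, name, create_time, update_time) "
        "VALUES (:id, :name, :create_time, :update_time)"
    )
    rows = stamp_rows(rows)
    for start in range(0, len(rows), batch_size):
        with conn:  # commits on success, rolls back on error
            conn.executemany(sql, rows[start:start + batch_size])


conn = sqlite3.connect(":memory:")
bulk_insert(conn, [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
bulk_insert(conn, [{"id": 2, "name": "Robert"}], replace_on_conflict=True)
names = [name for (name,) in conn.execute("SELECT name FROM users ORDER BY id")]
```

In this sketch, inserting an existing id without replace_on_conflict raises sqlite3.IntegrityError, which is the case the flag is meant to cover.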

Implementation Details:


get_dynamic_db_model(base, job_id)

Purpose:
Dynamically generates and returns a Peewee model class tied to a specific tracking table determined by the job ID.

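Parameters:

base: The base model from which the dynamic model is derived (BaseTrackingModel in the example below).
job_id: The job ID string that determines which tracking table the model is tied to.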

Returns:
A dynamically created Peewee model class corresponding to the tracking table for the given job_id.

Usage Example:

DynamicModel = get_dynamic_db_model(BaseTrackingModel, 'job12345678abcdef')
instance = DynamicModel()
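
One way to picture a model class being generated per job is Python's built-in type() constructor. The sketch below is an assumption-laden illustration, not the project's implementation: it uses a plain class as a stand-in for a Peewee base model, and the `t_tracking` table name, the `<base>_<index>` naming scheme, the `dynamic_model` helper, and the cache are all hypothetical.

```python
class BaseTrackingModel:
    """Hypothetical stand-in for a Peewee base model."""

    class Meta:
        table_name = "t_tracking"


_model_cache = {}  # (base, index) -> generated class, so each table gets one class


def table_index(job_id):
    """First 8 characters of the job ID, as described for the tracking tables."""
    return job_id[:8]


def dynamic_model(base, job_id):
    """Build (or reuse) a subclass of `base` bound to a job-specific table name."""
    index = table_index(job_id)
    key = (base, index)
    if key not in _model_cache:
        # Assumed naming scheme: base table name plus the job index as suffix.
        meta = type("Meta", (), {"table_name": f"{base.Meta.table_name}_{index}"})
        _model_cache[key] = type(f"{base.__name__}_{index}", (base,), {"Meta": meta})
    return _model_cache[key]


DynamicModel = dynamic_model(BaseTrackingModel, "job12345678abcdef")
```

Two job IDs that share the same first 8 characters resolve to the same class in this sketch, because only the index takes part in the lookup.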

Implementation Details:


get_dynamic_tracking_table_index(job_id)

Purpose:
Extracts the table suffix (index) from a job ID string: its first 8 characters.

Parameters:

job_id: The job ID string.

Returns:
str: The first 8 characters of the job ID.

Usage Example:

index = get_dynamic_tracking_table_index('job12345678abcdef')
print(index)  # Output: 'job12345'

fill_db_model_object(model_object, human_model_dict)

Purpose:
Sets attributes on a Peewee model instance from a dictionary with human-readable keys, mapping keys to model fields prefixed with f_.

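Parameters:

model_object: The Peewee model instance whose attributes are set.
human_model_dict: A dictionary whose keys are field names without the f_ prefix.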

Returns:
model_object after setting its attributes.

Usage Example:

human_dict = {'name': 'Alice', 'age': 30}
user = UserModel()
fill_db_model_object(user, human_dict)  # sets user.f_name and user.f_age
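
The key-to-field mapping can be sketched with setattr. This is a hedged illustration rather than the project's code: `UserModel` here is a plain class standing in for a Peewee model, `fill_model_object` is a hypothetical name, and skipping keys that have no matching f_ field is an assumption about the behaviour.

```python
class UserModel:
    """Hypothetical stand-in for a Peewee model with f_-prefixed fields."""

    f_name = None
    f_age = None


def fill_model_object(model_object, human_model_dict):
    """Copy dict values onto the object's f_-prefixed attributes."""
    for key, value in human_model_dict.items():
        attr_name = f"f_{key}"
        # Assumption: keys without a matching field on the class are ignored.
        if hasattr(type(model_object), attr_name):
            setattr(model_object, attr_name, value)
    return model_object


user = fill_model_object(UserModel(), {"name": "Alice", "age": 30, "nickname": "Al"})
```

Checking the class rather than the instance means only declared fields can be set, so a stray key such as "nickname" does not create a new attribute.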

Implementation Details:


Supported Operators Dictionary

supported_operators = {
    '==': operator.eq,
    '<': operator.lt,
    '<=': operator.le,
    '>': operator.gt,
    '>=': operator.ge,
    '!=': operator.ne,
    '<<': operator.lshift,
    '>>': operator.rshift,
    '%': operator.mod,
    '**': operator.pow,
    '^': operator.xor,
    '~': operator.inv,
}

Purpose:
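Maps operator symbols (strings) to functions from Python's operator module so that query conditions can be built dynamically. Applied to Peewee fields, several symbols take their meaning from Peewee's operator overloading rather than from arithmetic: << is IN, >> is IS, % is LIKE, ** is ILIKE, and ~ is unary negation (NOT).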
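
The lookup works because each function in the operator module simply invokes the matching special method on its first operand, which an ORM field can overload. The sketch below shows that dispatch with a toy `Column` class that renders SQL-like text; it is a stand-in written for this example and not Peewee's Field class.

```python
import operator


class Column:
    """Toy column that renders SQL-like text; a stand-in, not Peewee's Field."""

    def __init__(self, name):
        self.name = name

    def __ge__(self, other):
        return f"{self.name} >= {other!r}"

    def __lshift__(self, other):
        return f"{self.name} IN {tuple(other)!r}"

    def __mod__(self, other):
        return f"{self.name} LIKE {other!r}"

    def __invert__(self):
        return f"NOT {self.name}"


age, name = Column("age"), Column("name")

ge_sql = operator.ge(age, 18)            # same as: age >= 18
in_sql = operator.lshift(age, [18, 21])  # same as: age << [18, 21]
like_sql = operator.mod(name, "Al%")     # same as: name % "Al%"
not_sql = operator.inv(age)              # same as: ~age (one operand only)

try:
    operator.inv(age, 18)                # unary function given two operands
    inv_accepts_two = True
except TypeError:
    inv_accepts_two = False
```

Note that operator.inv is unary: unlike the other entries, it cannot be called with a field and a value, so a condition that uses '~' needs separate handling.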


query_dict2expression(model: type[DataBaseModel], query: dict[str, bool | int | str | list | tuple])

Purpose:
Converts a dictionary of query conditions into a Peewee expression suitable for filtering database queries.

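Parameters:

model (type[DataBaseModel]): The model class whose fields the conditions refer to.
query (dict[str, bool | int | str | list | tuple]): Maps each field name to a value or to an (operator, value) pair, as in the example below.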

Returns:
A Peewee expression combining all conditions using bitwise AND (&).

Usage Example:

query = {
    'age': ('>=', 18),
    'name': ('!=', 'Bob'),
}
expression = query_dict2expression(UserModel, query)
users = UserModel.select().where(expression)
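
The conversion can be sketched as a loop that builds one condition per dictionary entry and folds them together with &. The code below is an illustration under stated assumptions, not the project's implementation: `Expr` and `Column` are toy stand-ins for Peewee expressions and fields, `dict_to_expression` is a hypothetical name, the f_ field prefix is carried over from fill_db_model_object, and treating a bare value as an equality test is an assumption.

```python
import operator
from functools import reduce

OPERATORS = {"==": operator.eq, "!=": operator.ne, ">=": operator.ge, "<": operator.lt}


class Expr:
    """Toy expression that renders SQL-like text and supports `&`."""

    def __init__(self, sql):
        self.sql = sql

    def __and__(self, other):
        return Expr(f"({self.sql} AND {other.sql})")


class Column:
    """Toy column whose comparisons return Expr objects."""

    def __init__(self, name):
        self.name = name

    def _compare(self, symbol, value):
        return Expr(f"{self.name} {symbol} {value!r}")

    def __eq__(self, value):
        return self._compare("=", value)

    def __ne__(self, value):
        return self._compare("!=", value)

    def __ge__(self, value):
        return self._compare(">=", value)

    def __lt__(self, value):
        return self._compare("<", value)


class UserModel:
    f_age = Column("f_age")
    f_name = Column("f_name")


def dict_to_expression(model, query):
    """Turn {'field': (op, value)} into one expression joined with &."""
    conditions = []
    for field_name, condition in query.items():
        if not isinstance(condition, (list, tuple)):
            condition = ("==", condition)  # assumption: bare value means equality
        symbol, value = condition
        column = getattr(model, f"f_{field_name}")
        conditions.append(OPERATORS[symbol](column, value))
    return reduce(operator.and_, conditions)


expr = dict_to_expression(UserModel, {"age": (">=", 18), "name": ("!=", "Bob")})
```

In this sketch an empty query dictionary leaves reduce with nothing to combine and raises TypeError, so a caller would skip the filter entirely when no conditions are given.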

Implementation Details:


query_db(model: type[DataBaseModel], limit: int = 0, offset: int = 0, query: dict = None, order_by: str | list | tuple | None = None)

Purpose:
Performs a filtered, ordered, and paginated query on the database, returning matching records and total count.

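Parameters:

model (type[DataBaseModel]): The model class to query.
limit (int, default 0): Maximum number of records to return.
offset (int, default 0): Number of records to skip.
query (dict, default None): Filter conditions in the format accepted by query_dict2expression.
order_by (str | list | tuple | None, default None): The field to sort by, optionally paired with a direction as in ('name', 'desc').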

Returns:
Tuple (list_of_records, total_count): the matching records and the total count.

Usage Example:

results, count = query_db(
    UserModel,
    limit=10,
    offset=0,
    query={'age': ('>=', 18)},
    order_by=('name', 'desc')
)
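
The filter, count, order, paginate sequence can be sketched over an in-memory list. This is a simplified illustration and not the project's query code: `query_rows` is a hypothetical helper, a plain predicate function replaces the Peewee expression, and reading limit=0 and offset=0 as "no limit" and "no offset" is an assumption based on the default values.

```python
def query_rows(rows, limit=0, offset=0, predicate=None, order_by=None):
    """Filter, count, sort, then paginate a list of dict records."""
    matched = [row for row in rows if predicate is None or predicate(row)]
    total = len(matched)  # counted before pagination, so it covers all matches

    if order_by is not None:
        if isinstance(order_by, str):
            order_by = (order_by, "asc")  # assumption: ascending by default
        field, direction = order_by
        matched.sort(key=lambda row: row[field], reverse=(direction == "desc"))

    if offset > 0:
        matched = matched[offset:]
    if limit > 0:
        matched = matched[:limit]
    return matched, total


people = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 17},
    {"name": "Carol", "age": 22},
    {"name": "Dave", "age": 41},
]

page, count = query_rows(
    people,
    limit=2,
    predicate=lambda row: row["age"] >= 18,
    order_by=("name", "desc"),
)
```

Returning the total alongside the page lets a caller compute how many pages exist without issuing a second query.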

Implementation Details:


Important Implementation Notes


Interaction with Other System Components


Visual Diagram

flowchart TD
    A[bulk_insert_into_db] -->|creates tables & inserts data| DB[(Database)]
    B[get_dynamic_db_model] -->|returns dynamic model based on job_id| DynamicModel
    C[get_dynamic_tracking_table_index] -->|extracts table index from job_id| Index
    B --> C
    D[fill_db_model_object] -->|maps dict to model object| ModelObject
    E[query_dict2expression] -->|converts dict to Peewee expression| Expression
    F[query_db] -->|runs filtered & sorted query| DB
    F --> E
    F -->|returns records and count| Result
    supported_operators -.-> E
    DB -->|provides connection| A
    DB -->|used by| F

Summary

db_utils.py collects the utilities for database operations in the InfiniFlow project: bulk inserts with conflict handling and timestamp management, dynamic model creation, and dictionary-based querying. Wrapping these patterns in shared functions keeps database access consistent and spares callers from repeating the same logic.

This file is a key component bridging database schema models with application logic, enabling dynamic and performant data access patterns tailored to job-specific tracking tables.