schema.py

Overview

The schema.py file defines the data models that the InfiniFlow project uses for structured data validation and serialization. It contains Pydantic-based schemas for type-safe handling of data received from upstream sources. Pydantic validation checks that incoming data conforms to the expected format before it is processed, which is critical for reliable processing in the InfiniFlow system.

The main class in this file is ParserFromUpstream, which models a data entity with optional timing metadata (a creation timestamp and an elapsed time) and a payload (a name and a binary blob). The schema is strictly validated: it rejects any field it does not declare, and it accepts the timing fields under either their underscore-prefixed aliases or their Python field names.


Classes

ParserFromUpstream

A Pydantic data model representing parsed data received from upstream components, including metadata about creation and elapsed time, alongside the data payload.

Definition

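from pydantic import BaseModel, ConfigDict, Field


class ParserFromUpstream(BaseModel):
    # Optional timing metadata; upstream payloads use underscore-prefixed keys.
    created_time: float | None = Field(default=None, alias="_created_time")
    elapsed_time: float | None = Field(default=None, alias="_elapsed_time")

    # Required payload fields (no default).
    name: str
    blob: bytes

    model_config = ConfigDict(populate_by_name=True, extra="forbid")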

Attributes

| Attribute | Type | Default | Description |
|---|---|---|---|
| created_time | float or None | None | Timestamp indicating when the data was created. Accepted on input under the alias _created_time; emitted under that alias only when serializing with by_alias=True. |
| elapsed_time | float or None | None | Elapsed duration; the schema does not specify the reference point or unit. Accepted on input under the alias _elapsed_time. |
| name | str | Required | Identifier or descriptive name for the parsed data. |
| blob | bytes | Required | Raw binary payload associated with this entry. |

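Configuration

model_config combines two Pydantic options. populate_by_name=True lets the aliased fields be populated by either their alias (_created_time, _elapsed_time) or their field name (created_time, elapsed_time). extra="forbid" makes validation fail with a ValidationError whenever the input contains a key that is neither a declared field nor an alias.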

Usage

from schema import ParserFromUpstream

# Example upstream data: a dict keyed by the field aliases
data = {
    "_created_time": 1685000000.0,
    "_elapsed_time": 0.256,
    "name": "sensor_123",
    "blob": b'\x00\x01\x02\x03'
}

# Create a ParserFromUpstream instance from a dict keyed by aliases
parser_entry = ParserFromUpstream.model_validate(data)

print(parser_entry.created_time)  # 1685000000.0
print(parser_entry.name)          # sensor_123
print(parser_entry.blob)          # b'\x00\x01\x02\x03'

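Important Implementation Details

name and blob have no defaults, so input that omits either fails validation; created_time and elapsed_time default to None. The model sets neither frozen=True nor validate_assignment=True, so instances remain mutable after creation and attribute assignments are not re-validated. Aliases affect output only when serializing with by_alias=True; a plain model_dump() uses the Python field names.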


Interaction with Other Components


Diagram

classDiagram
    class ParserFromUpstream {
        +created_time: float | None
        +elapsed_time: float | None
        +name: str
        +blob: bytes
        +model_config: ConfigDict
    }

Summary

schema.py provides a foundational data model, ParserFromUpstream, which ensures that data coming into InfiniFlow from upstream sources adheres to a strict and predictable structure. This promotes reliability and maintainability in the data ingestion pipeline by leveraging Pydantic’s validation capabilities. The clear typing and aliasing also improve interoperability with other system components and external data producers.