schema.py
Overview
The schema.py file defines data models used in the InfiniFlow project for structured data validation and serialization. It primarily contains Pydantic-based schemas that facilitate type-safe data handling, particularly for parsing data received from upstream sources. By leveraging Pydantic's powerful validation and parsing features, this file ensures that incoming data conforms to expected formats, which is critical for reliable processing in the InfiniFlow system.
In this file, the main focus is the ParserFromUpstream class, which models a data entity that combines timing metadata (a creation timestamp and an elapsed duration) with payload information (a name and a binary blob). The schema is strictly validated: it rejects extraneous fields and accepts either the alias or the field name when populating fields. The model configuration does not set frozen=True, so instances remain mutable after creation.
Classes
ParserFromUpstream
A Pydantic data model representing parsed data received from upstream components, including metadata about creation and elapsed time, alongside the data payload.
Definition
from pydantic import BaseModel, ConfigDict, Field

class ParserFromUpstream(BaseModel):
    created_time: float | None = Field(default=None, alias="_created_time")
    elapsed_time: float | None = Field(default=None, alias="_elapsed_time")
    name: str
    blob: bytes

    model_config = ConfigDict(populate_by_name=True, extra="forbid")
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| created_time | float or None | None | Timestamp indicating when the data was created. Uses alias _created_time. |
| elapsed_time | float or None | None | Duration elapsed since creation or other reference time. Uses alias _elapsed_time. |
| name | str | N/A | Identifier or descriptive name for the parsed data. |
| blob | bytes | N/A | Raw binary data payload associated with this parser entry. |
Configuration
populate_by_name=True: Allows population of model fields using their alias or field names interchangeably.
extra="forbid": Disallows unexpected fields during model initialization, enforcing strict schema adherence.
Usage
from schema import ParserFromUpstream

# Example data from an upstream source, possibly JSON or a dict keyed by aliases
data = {
    "_created_time": 1685000000.0,
    "_elapsed_time": 0.256,
    "name": "sensor_123",
    "blob": b'\x00\x01\x02\x03',
}

# Create a ParserFromUpstream instance from a dict keyed by aliases
parser_entry = ParserFromUpstream.model_validate(data)
print(parser_entry.created_time)  # 1685000000.0
print(parser_entry.name)          # sensor_123
print(parser_entry.blob)          # b'\x00\x01\x02\x03'
Important Implementation Details
Aliases for Timestamps: The fields created_time and elapsed_time use the aliases _created_time and _elapsed_time, respectively. This supports compatibility with upstream data sources that may use different naming conventions.
Strict Validation: The extra="forbid" setting prevents accidental injection of unspecified fields, ensuring only expected data is processed.
Type Safety: Using Pydantic's type annotations, this model validates data types at runtime, reducing errors downstream.
No Methods: This model acts purely as a data container without additional behavior or methods.
Interaction with Other Components
Upstream Data Intake: ParserFromUpstream is designed to parse and validate raw data received from upstream sources before further processing in the InfiniFlow pipeline.
Data Flow Integration: After validation, instances of this model are likely passed to processing modules that handle analytics, storage, or transformation.
Serialization/Deserialization: The use of aliases and strict field controls facilitates robust serialization to/from formats like JSON or binary protocols used in communication with other system parts.
Diagram
classDiagram
    class ParserFromUpstream {
        +created_time: float | None
        +elapsed_time: float | None
        +name: str
        +blob: bytes
        +model_config: ConfigDict
    }
Summary
schema.py provides a foundational data model, ParserFromUpstream, which ensures that data coming into InfiniFlow from upstream sources adheres to a strict and predictable structure. This promotes reliability and maintainability in the data ingestion pipeline by leveraging Pydantic’s validation capabilities. The clear typing and aliasing also improve interoperability with other system components and external data producers.