azure_sas_conn.py
Overview
The azure_sas_conn.py file provides a singleton class RAGFlowAzureSasBlob that encapsulates interaction with an Azure Blob Storage container using Shared Access Signature (SAS) authentication. It enables uploading, downloading, deleting blobs, checking blob existence, and health checking the connection. The class handles connection management, automatic retries on failures, and logging for robust and convenient Azure Blob Storage operations within the application.
This file serves as a utility component within the InfiniFlow system, abstracting away the details of Azure Blob Storage SAS connectivity and providing a clean API for managing blob objects.
Class: RAGFlowAzureSasBlob
A singleton class that manages connection and operations with an Azure Blob Storage container using SAS tokens.
Purpose
Maintain a persistent connection to an Azure Blob container.
Provide methods to upload, download, delete blobs.
Check for blob existence.
Generate presigned URLs for blob access.
Ensure robustness via retry logic and connection reinitialization on failure.
Initialization
RAGFlowAzureSasBlob()
Reads the container URL and SAS token from environment variables (
CONTAINER_URL,SAS_TOKEN) or falls back to values defined in the application settings (settings.AZURE).Automatically attempts to open a connection to the Azure Blob container.
Attributes
Attribute | Type | Description |
|---|---|---|
|
| The Azure Blob Storage container client object. |
|
| URL of the Azure Blob container. |
|
| SAS token string for authentication. |
Methods
__open__()
def __open__(self) -> None
Opens or reopens the connection to the Azure Blob container.
If an existing connection exists, it attempts to close it first.
Uses
ContainerClient.from_container_urlwith the combined container URL and SAS token.Logs an exception if connection fails.
__close__()
def __close__(self) -> None
Deletes the current
connattribute and sets it toNone.Effectively closes the connection.
health()
def health(self) -> None
Performs a simple upload of a small test blob to verify connectivity and permissions.
Uploads a blob named
"txtxtxtxt1"with dummy binary data.Returns the result of
upload_blobcall (usually a response or None).
Usage example:
azure_blob = RAGFlowAzureSasBlob()
azure_blob.health()
put(bucket: str, fnm: str, binary: bytes)
def put(self, bucket: str, fnm: str, binary: bytes) -> Any
Uploads binary data as a blob to the container.
bucketparameter is accepted but not used internally (likely for interface compatibility).Retries up to 3 times on failure, reopening the connection between attempts.
Uses
upload_blobmethod of theContainerClient.Logs failures with exception details.
Parameters:
Name | Type | Description |
|---|---|---|
bucket | str | Target bucket name (not used internally) |
fnm | str | Blob name (filename) |
binary | bytes | Data to upload |
Returns:
The return value of
upload_blobon success.None if all retries fail.
Usage example:
data = b"Hello Azure!"
azure_blob.put("mybucket", "greeting.txt", data)
rm(bucket: str, fnm: str)
def rm(self, bucket: str, fnm: str) -> None
Deletes a blob specified by name.
bucketparameter is accepted but unused.Logs exceptions on failure.
Parameters:
Name | Type | Description |
|---|---|---|
bucket | str | Bucket name (unused) |
fnm | str | Blob name to remove |
Usage example:
azure_blob.rm("mybucket", "old_file.txt")
get(bucket: str, fnm: str)
def get(self, bucket: str, fnm: str) -> Optional[bytes]
Downloads a blob's content as bytes.
Retries once on failure, reopening the connection if needed.
Returns
Noneif download fails.
Parameters:
Name | Type | Description |
|---|---|---|
bucket | str | Bucket name (unused) |
fnm | str | Blob name to download |
Returns:
Blob content as bytes on success.
Noneon failure.
Usage example:
content = azure_blob.get("mybucket", "data.json")
if content:
print(content.decode("utf-8"))
obj_exist(bucket: str, fnm: str)
def obj_exist(self, bucket: str, fnm: str) -> bool
Checks if a blob exists in the container.
Returns
Trueif exists,Falseotherwise.Logs exceptions on failure.
Parameters:
Name | Type | Description |
|---|---|---|
bucket | str | Bucket name (unused) |
fnm | str | Blob name to check |
Returns:
Trueif blob exists.Falseif blob does not exist or error occurs.
Usage example:
exists = azure_blob.obj_exist("mybucket", "file.txt")
print(f"Blob exists? {exists}")
get_presigned_url(bucket: str, fnm: str, expires: int)
def get_presigned_url(self, bucket: str, fnm: str, expires: int) -> Optional[str]
Attempts to generate a presigned URL for blob access valid for a specified expiration time.
Retries up to 10 times on failure.
Returns
Noneif unable to generate URL.Note: The method
get_presigned_urlis called onContainerClientwhich is not a standard method of Azure SDK'sContainerClient. This indicates a possible custom extension or may require attention.
Parameters:
Name | Type | Description |
|---|---|---|
bucket | str | Bucket name (unused) |
fnm | str | Blob name |
expires | int | Expiration time in seconds for URL validity |
Returns:
Presigned URL string on success.
Noneon failure.
Usage example:
url = azure_blob.get_presigned_url("mybucket", "private_file.txt", 3600)
if url:
print("Access URL:", url)
Important Implementation Details
Singleton Pattern: The
RAGFlowAzureSasBlobclass is decorated with@singletonfromrag.utils.singleton, ensuring only one instance exists during runtime. This avoids repeated connections and resource overhead.Connection Management: The connection (
conn) is recreated on failures to maintain robustness.Retries: Upload (
put), download (get), and presigned URL generation (get_presigned_url) implement retry logic with delays, improving reliability in unstable network conditions.Logging: Extensive use of
logging.exceptionto capture error context.Environment and Settings: Supports configuration via environment variables or fallback application settings.
Interaction with Other Parts of the System
Imports
settingsfromragpackage for configuration defaults.Uses
singletondecorator fromrag.utilsto enforce the singleton design.Relies on Azure SDK's
azure.storage.blob.ContainerClientfor Azure Blob Storage operations.Provides a backend utility for any component that requires Azure Blob Storage access in the InfiniFlow system.
Expected to be used by higher-level modules that require storage of files, blobs, or artifacts in Azure Blob Storage with SAS authentication.
Diagram: Class Diagram of RAGFlowAzureSasBlob
classDiagram
class RAGFlowAzureSasBlob {
-conn: ContainerClient
-container_url: str
-sas_token: str
+__init__()
+__open__()
+__close__()
+health()
+put(bucket: str, fnm: str, binary: bytes)
+rm(bucket: str, fnm: str)
+get(bucket: str, fnm: str) bytes
+obj_exist(bucket: str, fnm: str) bool
+get_presigned_url(bucket: str, fnm: str, expires: int) str
}
Summary
azure_sas_conn.py is a core utility file providing a singleton class for managing Azure Blob Storage access using SAS tokens. It abstracts connection setup, blob operations, and error handling with retry logic. This makes it a reliable and reusable component for Azure blob storage interactions within the InfiniFlow application ecosystem.