azure_spn_conn.py
Overview
azure_spn_conn.py provides a singleton class, RAGFlowAzureSpnBlob, that manages a connection to an Azure Data Lake Storage Gen2 container authenticated with a Service Principal Name (SPN). The class wraps connection setup and the basic file operations: upload, download, deletion, existence checks, and presigned URL generation for temporary access. When an upload, download, or URL request fails, it reconnects (re-authenticating in the process) and retries, so callers in the InfiniFlow RAG (Retrieval-Augmented Generation) system do not have to manage the connection themselves.
Classes and Methods
RAGFlowAzureSpnBlob
A singleton class that manages Azure Data Lake storage connection using Azure SDK's ClientSecretCredential for authentication via SPN and exposes methods to manipulate files in a specified container.
This class uses environment variables or fallback settings from the rag.settings module to configure connection parameters.
Initialization
def __init__(self)
Initializes connection parameters (account URL, client ID, secret, tenant ID, container name) from environment or settings.
Calls __open__() to establish the connection.
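The environment-first lookup described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the helper name resolve_setting, the environment variable name ACCOUNT_URL, and the DEFAULT_AZURE_SETTINGS mapping (standing in for the AZURE settings in rag.settings) are all assumptions.

```python
import os

# Hypothetical stand-in for the AZURE mapping in rag.settings.
DEFAULT_AZURE_SETTINGS = {
    "account_url": "https://example.dfs.core.chinacloudapi.cn",
    "container_name": "example-container",
}


def resolve_setting(env_name, settings_key, settings=DEFAULT_AZURE_SETTINGS, environ=None):
    """Return the environment value if set and non-empty, else the settings value."""
    environ = os.environ if environ is None else environ
    value = environ.get(env_name)
    if value:
        return value
    return settings.get(settings_key)


# Example: the environment wins when the variable is present.
account_url = resolve_setting(
    "ACCOUNT_URL", "account_url", environ={"ACCOUNT_URL": "https://override.example"}
)
```

Passing environ explicitly keeps the lookup easy to test without touching the real process environment.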
Private Methods
__open__
def __open__(self)
Establishes a connection to the Azure Data Lake container using FileSystemClient and Azure's ClientSecretCredential.
If a connection already exists, it attempts to close it first.
Uses the AzureAuthorityHosts.AZURE_CHINA authority host (indicating that this connects to the Azure China cloud).
Logs exceptions if the connection fails.
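As a rough sketch of the connection step, the function below builds a ClientSecretCredential against the Azure China authority and hands it to a FileSystemClient. The function name open_filesystem_client and its parameter names are hypothetical; the SDK imports are done lazily inside the function so the sketch can be loaded without the Azure packages installed. It is an illustration under those assumptions, not the module's own __open__.

```python
def open_filesystem_client(account_url, container_name, tenant_id, client_id, client_secret):
    """Build a FileSystemClient authenticated with a service principal (sketch)."""
    # Lazy imports: azure-identity and azure-storage-file-datalake are only
    # required when the function is actually called.
    from azure.identity import AzureAuthorityHosts, ClientSecretCredential
    from azure.storage.filedatalake import FileSystemClient

    credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret,
        # Authenticate against the Azure China cloud, as the text describes.
        authority=AzureAuthorityHosts.AZURE_CHINA,
    )
    return FileSystemClient(
        account_url=account_url,
        file_system_name=container_name,
        credential=credential,
    )
```

Targeting a different cloud would mean changing the authority argument and using an account URL from that cloud.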
__close__
def __close__(self)
Deletes the current connection object and sets it to None.
Ensures that stale connections are cleared before reopening.
Public Methods
health
def health(self) -> bool
Performs a basic health check by creating a test file with fixed name and content.
Uploads a small binary blob and flushes it to Azure storage.
Returns the result of flush_data(), indicating success.
Usage example:
azure_blob = RAGFlowAzureSpnBlob()
is_healthy = azure_blob.health()
print(f"Connection healthy: {is_healthy}")
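The create, append, and flush sequence behind the health check can be sketched like this. The helper upload_bytes and the in-memory stand-in classes are assumptions added so the example runs without Azure; the three method names (create_file, append_data, flush_data) mirror the Data Lake SDK calls the text refers to.

```python
class InMemoryFile:
    """Tiny stand-in for a Data Lake file client (illustration only)."""

    def __init__(self):
        self.buffer = b""
        self.committed = 0

    def append_data(self, data, offset, length):
        # Stage `length` bytes of `data` starting at `offset`.
        self.buffer = self.buffer[:offset] + data[:length]

    def flush_data(self, offset):
        # Commit everything staged up to `offset`.
        self.committed = offset
        return {"committed": offset}


class InMemoryFileSystem:
    """Tiny stand-in for a file system (container) client."""

    def __init__(self):
        self.files = {}

    def create_file(self, path):
        self.files[path] = InMemoryFile()
        return self.files[path]


def upload_bytes(fs_client, path, data):
    """Create the file, stage the bytes, then flush them to commit the write."""
    file_client = fs_client.create_file(path)
    file_client.append_data(data, offset=0, length=len(data))
    return file_client.flush_data(len(data))


fs = InMemoryFileSystem()
result = upload_bytes(fs, "health/probe.bin", b"ping")
```

The flush offset equals the total number of bytes appended, which is why both calls use len(data).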
put
def put(self, bucket: str, fnm: str, binary: bytes) -> bool | None
Uploads binary data to a file named fnm in the storage container.
The bucket parameter is accepted but not used internally (likely for interface consistency).
Retries the upload up to 3 times on failure; reconnects and waits 1 second between attempts.
Returns the result of flush_data() on success or None on failure.
Parameters:
bucket: Storage bucket/container name (not used internally).
fnm: Filename/path in the container.
binary: Byte content to upload.
Usage example:
data = b"Hello, Azure!"
azure_blob.put("mybucket", "folder1/hello.txt", data)
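The retry behavior described for put (up to 3 attempts, reconnecting and pausing 1 second after each failure, None if every attempt fails) can be sketched generically. The helper name put_with_retry and its callable parameters are hypothetical; upload and reconnect stand in for the real upload call and for __open__.

```python
import logging
import time


def put_with_retry(upload, reconnect, attempts=3, delay=1.0, sleep=time.sleep):
    """Call upload(); on any exception, log, reconnect, wait, and try again."""
    for _ in range(attempts):
        try:
            return upload()
        except Exception:
            logging.exception("Upload failed; reconnecting before the next attempt")
            reconnect()
            sleep(delay)
    # Every attempt failed.
    return None


# Example: an upload that fails twice, then succeeds on the third attempt.
state = {"calls": 0, "reconnects": 0}


def flaky_upload():
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("transient failure")
    return True


def fake_reconnect():
    state["reconnects"] += 1


outcome = put_with_retry(flaky_upload, fake_reconnect, sleep=lambda seconds: None)
```

Injecting sleep keeps the example fast; with the default time.sleep the loop waits delay seconds between attempts.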
rm
def rm(self, bucket: str, fnm: str) -> None
Deletes the file fnm from the container.
The bucket parameter is accepted but unused.
Logs exceptions if deletion fails but does not retry.
Parameters:
bucket: Storage bucket/container name (not used internally).
fnm: Filename/path to delete.
Usage example:
azure_blob.rm("mybucket", "folder1/oldfile.txt")
get
def get(self, bucket: str, fnm: str) -> bytes | None
Downloads and returns the content of the specified file.
Retries once on failure after reconnecting.
Returns None if the download fails.
Parameters:
bucket: Storage bucket/container name (not used internally).
fnm: Filename/path to download.
Usage example:
content = azure_blob.get("mybucket", "folder1/data.json")
if content:
    print(content.decode("utf-8"))
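A download that returns None on failure, as get is described to do, might look like the sketch below. The helper download_bytes and the in-memory stand-ins are assumptions for illustration; get_file_client, download_file, and readall follow the Data Lake SDK naming, and the single retry mentioned above is omitted for brevity.

```python
import logging


class InMemoryDownload:
    """Stand-in for the downloader object returned by download_file()."""

    def __init__(self, data):
        self._data = data

    def readall(self):
        return self._data


class InMemoryFileClient:
    def __init__(self, files, path):
        self._files = files
        self._path = path

    def download_file(self):
        if self._path not in self._files:
            raise FileNotFoundError(self._path)
        return InMemoryDownload(self._files[self._path])


class InMemoryFileSystem:
    def __init__(self, files):
        self._files = files

    def get_file_client(self, path):
        return InMemoryFileClient(self._files, path)


def download_bytes(fs_client, path):
    """Return the file content as bytes, or None if anything goes wrong."""
    try:
        return fs_client.get_file_client(path).download_file().readall()
    except Exception:
        logging.exception("Download of %s failed", path)
        return None


fs = InMemoryFileSystem({"folder1/data.json": b'{"ok": true}'})
found = download_bytes(fs, "folder1/data.json")
missing = download_bytes(fs, "folder1/absent.json")
```

Because an empty file yields b"", callers that need to tell "empty" from "failed" should compare against None rather than rely on truthiness.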
obj_exist
def obj_exist(self, bucket: str, fnm: str) -> bool
Checks if a specified file exists in the container.
Returns True if the file exists, False otherwise.
Logs exceptions on failure.
Parameters:
bucket: Storage bucket/container name (not used internally).
fnm: Filename/path to check.
Usage example:
exists = azure_blob.obj_exist("mybucket", "folder1/checkfile.txt")
print(f"File exists: {exists}")
get_presigned_url
def get_presigned_url(self, bucket: str, fnm: str, expires: int) -> str | None
Generates a presigned URL for temporary read access to a file.
Retries up to 10 times on failure with reconnect and delay.
Returns a URL string, or None if it cannot generate one.
Parameters:
bucket: Storage bucket/container name (not used internally).
fnm: Filename/path for which to generate the URL.
expires: Expiry time in seconds for the presigned URL.
Usage example:
url = azure_blob.get_presigned_url("mybucket", "folder1/report.pdf", expires=3600)
if url:
    print(f"Presigned URL: {url}")
Implementation Details and Algorithms
Singleton Pattern: The class is decorated with @singleton (from rag.utils), ensuring only one instance exists, so the application shares a single connection and authentication context.
Azure Authentication: Uses ClientSecretCredential from azure.identity to authenticate via SPN with tenant ID, client ID, and secret. The authority host is set to the Azure China cloud (AzureAuthorityHosts.AZURE_CHINA), which is important for regional endpoint targeting.
Connection Management: The __open__ method initializes the FileSystemClient, and __close__ clears the connection. When put, get, or get_presigned_url raises an exception, the connection is reopened and the operation is retried.
Retry Logic: Methods like put and get_presigned_url include retry loops with delays to handle transient failures gracefully.
Use of Environment Variables and Settings: Connection parameters are first read from environment variables, falling back to application-wide settings in rag.settings.AZURE. This allows flexible configuration for different deployment environments without changing code.
Error Logging: Exceptions raised during operations are logged with context to facilitate debugging.
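For readers unfamiliar with the pattern, a class-level singleton decorator can be sketched as below. This is a generic illustration and is not claimed to match the implementation in rag.utils; the StorageClient class is a hypothetical placeholder.

```python
def singleton(cls):
    """Return a factory that creates cls once and reuses that instance."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            # First call: construct and cache the single instance.
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class StorageClient:
    """Hypothetical client; the constructor runs only on the first call."""

    def __init__(self):
        self.conn = object()


first = StorageClient()
second = StorageClient()
```

Both names refer to the same object, so the connection opened in the constructor is shared by every caller in the process.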
Interaction With Other System Components
rag.settings: Provides default Azure connection configuration if environment variables are not set. This centralizes configuration management.
rag.utils.singleton: Provides the singleton decorator used to ensure only one instance of the class is created.
Azure SDK: Uses Azure SDK clients and credentials (azure.identity.ClientSecretCredential and azure.storage.filedatalake.FileSystemClient) to interact with Azure Data Lake Storage Gen2.
Environment Variables: Supports overriding configuration via environment variables, enabling easy integration with containerized deployments or CI/CD pipelines.
This module acts as a backend utility for other parts of the InfiniFlow RAG system that require reading/writing data to Azure Blob Storage securely and efficiently.
Mermaid Class Diagram
classDiagram
    class RAGFlowAzureSpnBlob {
        -conn: FileSystemClient | None
        -account_url: str
        -client_id: str
        -secret: str
        -tenant_id: str
        -container_name: str
        +__init__()
        -__open__()
        -__close__()
        +health() bool
        +put(bucket: str, fnm: str, binary: bytes) bool|None
        +rm(bucket: str, fnm: str) None
        +get(bucket: str, fnm: str) bytes|None
        +obj_exist(bucket: str, fnm: str) bool
        +get_presigned_url(bucket: str, fnm: str, expires: int) str|None
    }
Summary
azure_spn_conn.py provides a robust, singleton Azure Data Lake Storage client using SPN authentication tailored for the InfiniFlow RAG ecosystem. It abstracts away connection management, retries, and authentication, offering simple methods to upload, download, delete, check existence, and generate presigned URLs for files within a configured container. The implementation ensures reliable Azure storage interactions within the broader application stack.