oss_conn.py
Overview
The oss_conn.py file provides an interface for interacting with an Object Storage Service (OSS) compatible with Amazon S3 APIs, such as Alibaba Cloud OSS. It encapsulates connection management, bucket operations, and object operations inside a singleton class named RAGFlowOSS. The class uses the boto3 library to perform high-level OSS actions such as uploading, downloading, deleting files, and generating presigned URLs.
This file is a utility component within the larger InfiniFlow system. It abstracts OSS interactions to simplify storage management, error handling, and configuration reuse, and it supports a default bucket and prefix path so that callers use consistent buckets and object paths without repeating that logic.
Class: RAGFlowOSS
A singleton class managing the OSS connection and providing methods to interact with buckets and objects.
Purpose
Establish and maintain a reusable connection to the OSS endpoint.
Perform bucket existence checks and creation.
Upload, download, delete, and list objects.
Generate presigned URLs for temporary access.
Apply default bucket and prefix path settings transparently.
Initialization
RAGFlowOSS()
Reads OSS configuration from rag.settings.OSS, including access keys, endpoint URL, region, bucket, and prefix path.
Automatically opens a connection on instantiation.
Attributes
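| Attribute | Description |
|---|---|
| conn | boto3 client for the OSS connection; None when closed. |
| oss_config | Dictionary loaded from settings with OSS config. |
| access_key | OSS access key ID. |
| secret_key | OSS secret access key. |
| endpoint_url | OSS service endpoint URL. |
| region | OSS region name. |
| bucket | Default bucket name (optional). |
| prefix_path | Default prefix path inside the bucket (optional). |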
Decorators
use_default_bucket
A method decorator that substitutes the bucket argument with the default bucket if it is configured.
Usage:
@use_default_bucket
def bucket_exists(self, bucket):
    ...
If self.bucket is set, it will be used regardless of the passed bucket parameter.
Simplifies method calls without explicitly specifying the bucket when a default is available.
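The substitution described above can be sketched as follows. This is a minimal illustration, assuming the default is stored in self.bucket; the Store class is a hypothetical stand-in for RAGFlowOSS, not the module's actual code.

```python
import functools


def use_default_bucket(method):
    """Replace the bucket argument with self.bucket when a default is set."""
    @functools.wraps(method)
    def wrapper(self, bucket, *args, **kwargs):
        actual_bucket = self.bucket if self.bucket else bucket
        return method(self, actual_bucket, *args, **kwargs)
    return wrapper


class Store:
    """Hypothetical stand-in for RAGFlowOSS, used only for illustration."""

    def __init__(self, bucket=None):
        self.bucket = bucket  # default bucket, or None if not configured

    @use_default_bucket
    def bucket_exists(self, bucket):
        # Echo the bucket name the wrapped method actually receives.
        return bucket
```

With a default configured, Store("default-bucket").bucket_exists("other") receives "default-bucket"; without one, the caller's argument passes through unchanged.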
use_prefix_path
A method decorator that prepends the configured prefix_path to the filename/path argument.
Usage:
@use_prefix_path
@use_default_bucket
def put(self, bucket, fnm, binary):
    ...
If self.prefix_path is set, it prefixes the filename with this path.
Ensures consistent directory structure inside buckets.
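A comparable sketch for the prefix decorator is shown below. It assumes the prefix and the filename are joined with a single "/" separator, which the description above does not state; the Store class is again a hypothetical stand-in.

```python
import functools


def use_prefix_path(method):
    """Prepend self.prefix_path to the object key when a prefix is set."""
    @functools.wraps(method)
    def wrapper(self, bucket, fnm, *args, **kwargs):
        if self.prefix_path:
            # Assumption: prefix and key are joined with "/".
            fnm = f"{self.prefix_path}/{fnm}"
        return method(self, bucket, fnm, *args, **kwargs)
    return wrapper


class Store:
    """Hypothetical stand-in for RAGFlowOSS, used only for illustration."""

    def __init__(self, prefix_path=None):
        self.prefix_path = prefix_path

    @use_prefix_path
    def put(self, bucket, fnm, binary):
        # Echo the key the wrapped method actually receives.
        return fnm
```

Because both decorators forward (self, bucket, fnm, ...) positionally, they can be stacked in either order on methods that take a bucket followed by a filename.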
Methods
__open__(self)
Establishes a new boto3 client connection to the OSS endpoint.
If a previous connection exists, it is closed first.
Uses virtual hosted-style addressing and signature version 4.
Handles exceptions and logs failure to connect.
Example:
oss = RAGFlowOSS()
oss.__open__()
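The client setup described for __open__ might look roughly like the sketch below. The settings keys (access_key, secret_key, endpoint_url, region) are assumed names inferred from the attribute list, and client_kwargs and open_client are hypothetical helpers, not functions from the module.

```python
def client_kwargs(cfg):
    """Map an OSS settings dict to boto3 client keyword arguments."""
    return {
        "aws_access_key_id": cfg.get("access_key"),
        "aws_secret_access_key": cfg.get("secret_key"),
        "endpoint_url": cfg.get("endpoint_url"),
        "region_name": cfg.get("region"),
    }


def open_client(cfg):
    """Create an S3-compatible client; requires boto3 to be installed."""
    import boto3
    from botocore.config import Config

    return boto3.client(
        "s3",
        # Virtual hosted-style addressing and signature version 4,
        # as described for this module.
        config=Config(s3={"addressing_style": "virtual"}, signature_version="v4"),
        **client_kwargs(cfg),
    )
```

Keeping the mapping in a separate function makes the settings-to-client translation easy to check without opening a network connection.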
__close__(self)
Closes the existing OSS client connection by deleting the conn attribute and setting it to None.
bucket_exists(self, bucket: str) -> bool
Checks whether a bucket exists in the OSS.
Uses head_bucket API call.
Returns True if bucket exists; otherwise False.
Logs exceptions for debugging.
Parameters:
bucket: Bucket name to check existence for.
Returns:
bool: Existence status of the bucket.
Example:
exists = oss.bucket_exists("my-bucket")
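The check can be sketched as a standalone function. Here conn is assumed to be a boto3 S3 client, and FakeClient is a hypothetical stand-in so the example runs without network access.

```python
import logging


def bucket_exists(conn, bucket):
    """Return True if head_bucket succeeds, False on any error."""
    try:
        conn.head_bucket(Bucket=bucket)
        return True
    except Exception:
        logging.exception(f"head_bucket error {bucket}")
        return False


class FakeClient:
    """Hypothetical stand-in for a boto3 S3 client."""

    def __init__(self, buckets):
        self.buckets = set(buckets)

    def head_bucket(self, Bucket):
        if Bucket not in self.buckets:
            raise RuntimeError("404 Not Found")
```

Note that this shape reports every failure, including permission or network errors, as "does not exist"; the log entry is the only place the real cause is visible.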
health(self)
Performs a simple health check by:
Verifying the default bucket exists or creating it.
Uploading a small test object to the bucket and prefix path.
Returns the result of the upload operation.
Note: The method uses hardcoded test data and filename.
Example:
oss.health()
get_properties(self, bucket: str, key: str) -> dict
Stub method intended to retrieve metadata or properties of an object.
Currently returns an empty dictionary.
Placeholder for future extension.
Parameters:
bucket: Bucket name.
key: Object key/path.
Returns:
Empty dictionary {}.
list(self, bucket: str, dir: str, recursive: bool = True) -> list
Stub method intended to list objects under a directory/prefix.
Currently returns an empty list.
Placeholder for future extension.
Parameters:
bucket: Bucket name.
dir: Directory or prefix path.
recursive: Whether to list recursively (default True).
Returns:
Empty list [].
put(self, bucket: str, fnm: str, binary: bytes)
Uploads a binary object to the specified bucket and filename.
Creates the bucket if it does not exist.
Retries once on failure after reconnecting.
Uses decorators to apply default bucket and prefix path.
Logs debug and error information.
Parameters:
bucket: Bucket name.
fnm: Filename or key within the bucket (prefix added automatically if configured).
binary: Byte content to upload.
Returns:
The response from upload_fileobj (typically None on success).
Example:
with open('example.txt', 'rb') as f:
    data = f.read()
oss.put('my-bucket', 'folder/example.txt', data)
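The retry-once behavior of put can be sketched as below; bucket creation is left out to keep the focus on the retry. The reconnect callable and FlakyClient are hypothetical: reconnect is assumed to return a fresh client, and FlakyClient fails on its first upload only.

```python
import logging
from io import BytesIO


def put_with_retry(conn, reconnect, bucket, fnm, binary):
    """Upload bytes, reconnecting and retrying once after a failure."""
    for _ in range(2):  # first attempt plus one retry
        try:
            return conn.upload_fileobj(BytesIO(binary), bucket, fnm)
        except Exception:
            logging.exception(f"Failed to put: {bucket}/{fnm}")
            conn = reconnect()
    return None


class FlakyClient:
    """Hypothetical stand-in: the first upload fails, later ones succeed."""

    def __init__(self):
        self.calls = 0
        self.objects = {}

    def upload_fileobj(self, fileobj, bucket, key):
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("transient failure")
        self.objects[(bucket, key)] = fileobj.read()
```

Since upload_fileobj returns None on success, a None result from a function shaped like this does not by itself distinguish success from two failed attempts; callers would need the logs or a follow-up existence check.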
rm(self, bucket: str, fnm: str)
Deletes an object from the specified bucket and filename.
Uses decorators to apply default bucket and prefix path.
Logs exceptions on failure.
Parameters:
bucket: Bucket name.
fnm: Filename or key to delete.
Example:
oss.rm('my-bucket', 'folder/example.txt')
get(self, bucket: str, fnm: str) -> bytes | None
Retrieves the binary content of an object.
Retries once on failure after reconnecting.
Returns the bytes content or None if retrieval fails.
Uses decorators to apply default bucket and prefix path.
Parameters:
bucket: Bucket name.
fnm: Filename or key to retrieve.
Returns:
bytes: Content of the object.
None: If retrieval failed.
Example:
data = oss.get('my-bucket', 'folder/example.txt')
if data:
    print(data.decode())
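A sketch of the read path with one retry follows. It assumes conn is a boto3 S3 client, whose get_object response exposes the payload as a readable Body stream; reconnect and FakeClient are hypothetical stand-ins.

```python
import logging
from io import BytesIO


def get_bytes(conn, reconnect, bucket, fnm):
    """Return the object's bytes, or None if both attempts fail."""
    for _ in range(2):  # first attempt plus one retry
        try:
            response = conn.get_object(Bucket=bucket, Key=fnm)
            return response["Body"].read()
        except Exception:
            logging.exception(f"Failed to get: {bucket}/{fnm}")
            conn = reconnect()
    return None


class FakeClient:
    """Hypothetical stand-in for a boto3 S3 client."""

    def __init__(self, objects):
        self.objects = objects  # maps (bucket, key) to bytes

    def get_object(self, Bucket, Key):
        # A missing key raises KeyError, standing in for a service error.
        return {"Body": BytesIO(self.objects[(Bucket, Key)])}
```

An empty object yields b"", which is falsy, so a caller that needs to tell an empty object from a failed read can compare the result against None explicitly.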
obj_exist(self, bucket: str, fnm: str) -> bool
Checks if an object exists in a bucket.
Uses head_object API.
Returns True if object exists, False if not found (404).
Raises exception on other errors.
Uses decorators to apply default bucket and prefix path.
Parameters:
bucket: Bucket name.
fnm: Filename or key to check.
Returns:
bool: Existence of the object.
Example:
exists = oss.obj_exist('my-bucket', 'folder/example.txt')
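The 404 handling can be sketched as below. To stay runnable without botocore, the sketch inspects the exception's response attribute by duck typing rather than catching botocore's ClientError by name; FakeClientError and FakeClient are hypothetical stand-ins that mimic the shape of that error.

```python
def obj_exist(conn, bucket, fnm):
    """Return True if the object exists, False on 404; re-raise other errors."""
    try:
        conn.head_object(Bucket=bucket, Key=fnm)
        return True
    except Exception as e:
        # botocore's ClientError carries the parsed error in e.response.
        code = getattr(e, "response", {}).get("Error", {}).get("Code")
        if code == "404":
            return False
        raise


class FakeClientError(Exception):
    """Hypothetical stand-in mimicking the shape of botocore's ClientError."""

    def __init__(self, code):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


class FakeClient:
    """Hypothetical stand-in for a boto3 S3 client."""

    def __init__(self, keys, forbidden=()):
        self.keys = set(keys)
        self.forbidden = set(forbidden)

    def head_object(self, Bucket, Key):
        if Key in self.forbidden:
            raise FakeClientError("403")
        if Key not in self.keys:
            raise FakeClientError("404")
```

Re-raising anything other than a 404 keeps permission and connectivity problems from being mistaken for a missing object.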
get_presigned_url(self, bucket: str, fnm: str, expires: int) -> str | None
Generates a presigned URL for accessing an object.
Retries up to 10 times on failure with reconnect and delay.
Uses decorators to apply default bucket and prefix path.
Returns a URL string or None if unable to generate.
Parameters:
bucket: Bucket name.
fnm: Filename or key.
expires: Expiration time in seconds for the URL.
Returns:
str: Presigned URL.
None: If failed to generate.
Example:
url = oss.get_presigned_url('my-bucket', 'folder/example.txt', expires=3600)
print(url)
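The retry loop described for get_presigned_url can be sketched as follows. The one-second default delay is an assumption (the text gives the attempt count but not the delay), and reconnect and FakeClient are hypothetical stand-ins; generate_presigned_url is called the way boto3's S3 client expects.

```python
import logging
import time


def presigned_url(conn, reconnect, bucket, fnm, expires, attempts=10, delay=1.0):
    """Return a presigned GET URL, or None after all attempts fail."""
    for _ in range(attempts):
        try:
            return conn.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": fnm},
                ExpiresIn=expires,
            )
        except Exception:
            logging.exception(f"Failed to presign: {bucket}/{fnm}")
            conn = reconnect()
            time.sleep(delay)  # assumed pause between attempts
    return None


class FakeClient:
    """Hypothetical stand-in that fails a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def generate_presigned_url(self, ClientMethod, Params=None, ExpiresIn=3600):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("transient failure")
        return f"https://example.invalid/{Params['Bucket']}/{Params['Key']}?expires={ExpiresIn}"
```

With ten attempts and a fixed delay, a persistent outage blocks the caller for roughly ten times the delay before None is returned, which is worth keeping in mind on request-handling paths.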
Implementation Details & Algorithms
Singleton Pattern: The RAGFlowOSS class is decorated with a @singleton decorator (imported from rag.utils) ensuring only one instance, minimizing resource usage and preserving connection state.
Connection Handling: The connection is managed internally and automatically re-established on failures to improve robustness.
Decorators for Defaults: use_default_bucket and use_prefix_path decorators abstract away boilerplate of checking and applying default settings.
Retry Logic: put, get, and get_presigned_url methods include retry logic with reconnection and sleep delays to handle transient network or service errors.
Boto3 Configuration: Uses virtual hosted-style addressing (addressing_style: "virtual") and signature version 4 (signature_version='v4') for compatibility with OSS providers like Alibaba Cloud OSS.
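The singleton behavior mentioned above follows a common decorator pattern, sketched below. The implementation in rag.utils is not shown in this document and may differ, so this is only an illustration; the Connection class is hypothetical.

```python
def singleton(cls):
    """Return a factory that creates cls once and then reuses the instance."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class Connection:
    """Hypothetical class standing in for RAGFlowOSS."""

    def __init__(self):
        self.opened = True
```

Every call to Connection() returns the same object, so state such as an open client is shared by all callers. One consequence of this pattern is that the decorated name refers to a factory function rather than the class, so isinstance checks against it do not work.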
Interaction with Other Parts of the System
Configuration: Reads OSS connection parameters from rag.settings.OSS, tying it to the global application configuration management.
Singleton Util: Depends on singleton from rag.utils to enforce the singleton pattern.
Logging: Uses Python's logging module for debug and error information, integrating with the application's logging system.
Boto3: Uses boto3 and botocore for all OSS communication.
Potential Consumers: Other modules in the InfiniFlow system can instantiate RAGFlowOSS and use it for persistent OSS interaction without managing connection or bucket details manually.
Usage Example
from oss_conn import RAGFlowOSS
oss = RAGFlowOSS()
# Upload a file
with open('data.bin', 'rb') as f:
    data = f.read()
oss.put('my-bucket', 'data/data.bin', data)
# Check if object exists
if oss.obj_exist('my-bucket', 'data/data.bin'):
    print("Object exists")
# Download a file
content = oss.get('my-bucket', 'data/data.bin')
print(content)
# Generate a presigned URL valid for 1 hour
url = oss.get_presigned_url('my-bucket', 'data/data.bin', expires=3600)
print(url)
# Delete the object
oss.rm('my-bucket', 'data/data.bin')
Diagram: Class Structure of RAGFlowOSS
classDiagram
class RAGFlowOSS {
-conn: boto3.client
-oss_config: dict
-access_key: str
-secret_key: str
-endpoint_url: str
-region: str
-bucket: str
-prefix_path: str
+__init__()
+__open__()
+__close__()
+bucket_exists(bucket) bool
+health()
+get_properties(bucket, key) dict
+list(bucket, dir, recursive=True) list
+put(bucket, fnm, binary)
+rm(bucket, fnm)
+get(bucket, fnm) bytes
+obj_exist(bucket, fnm) bool
+get_presigned_url(bucket, fnm, expires) str
}
Summary
oss_conn.py is a critical utility file that wraps OSS (S3-compatible) storage operations into a singleton class, simplifying and standardizing access to buckets and objects in the InfiniFlow system. It provides robust connection management, retry logic, and supports default configuration abstractions for bucket and path prefixes. This modular design enhances maintainability, error handling, and developer productivity for cloud object storage interactions.