oss_conn.py


Overview

The oss_conn.py file provides an interface for interacting with an Object Storage Service (OSS) that is compatible with the Amazon S3 API, such as Alibaba Cloud OSS. It encapsulates connection management, bucket operations, and object operations in a singleton class named RAGFlowOSS, which uses the boto3 library to upload, download, and delete objects and to generate presigned URLs.

This file is designed as a utility component within the larger InfiniFlow system. It abstracts OSS interactions to simplify storage management, error handling, and configuration reuse, and its default bucket and prefix path abstractions reduce repetitive code and keep bucket and object path usage consistent throughout the application.


Class: RAGFlowOSS

Purpose

RAGFlowOSS is a singleton class that manages the OSS connection and provides methods for interacting with buckets and objects.

Initialization

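RAGFlowOSS()

The constructor takes no arguments. It loads the OSS configuration from settings into the attributes listed below and opens the client connection, so the instance can be used immediately, as in the usage example at the end of this page. Because the class is a singleton, every call returns the same shared instance.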
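The singleton behaviour can be illustrated with a class decorator that caches one instance. This is a minimal sketch that assumes a decorator-based singleton; the names singleton and DemoStorage are hypothetical, and the actual module may implement the pattern differently.

```python
def singleton(cls):
    """Hypothetical class decorator: construct cls once and reuse that instance."""
    instances = {}

    def get_instance(*args, **kwargs):
        # Only the first call constructs the object; later calls return the cached one.
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class DemoStorage:
    """Hypothetical stand-in for RAGFlowOSS."""

    def __init__(self):
        # Stands in for loading settings and opening the client connection.
        self.opened = True


first = DemoStorage()
second = DemoStorage()
print(first is second)  # True
```

One trade-off of this decorator style is that the decorated name becomes a function, so isinstance(obj, DemoStorage) no longer works; a metaclass or a module-level instance avoids that.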

Attributes

Attribute      Description
-------------  ----------------------------------------------------------
conn           boto3 S3 client instance used for the OSS connection.
oss_config     Dictionary of OSS settings loaded from the application settings.
access_key     OSS access key ID.
secret_key     OSS secret access key.
endpoint_url   OSS service endpoint URL.
region         OSS region name.
bucket         Default bucket name (optional).
prefix_path    Default prefix path inside the bucket (optional).
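The attribute list maps naturally onto a small settings object. This sketch assumes the oss_config dictionary uses the same key names as the attributes and that an absent optional key means "not configured"; OSSSettings and from_config are hypothetical names, and the values shown are placeholders.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class OSSSettings:
    """Hypothetical container mirroring the RAGFlowOSS configuration attributes."""
    access_key: Optional[str] = None
    secret_key: Optional[str] = None
    endpoint_url: Optional[str] = None
    region: Optional[str] = None
    bucket: Optional[str] = None        # optional default bucket
    prefix_path: Optional[str] = None   # optional default key prefix

    @classmethod
    def from_config(cls, oss_config: dict) -> "OSSSettings":
        # Assumption: config keys match the attribute names; missing keys become None.
        return cls(**{f.name: oss_config.get(f.name) for f in fields(cls)})


settings = OSSSettings.from_config({
    "access_key": "PLACEHOLDER_ACCESS_KEY",
    "secret_key": "PLACEHOLDER_SECRET_KEY",
    "endpoint_url": "https://oss.example.com",
    "region": "example-region",
})
print(settings.bucket, settings.prefix_path)  # None None
```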


Decorators

use_default_bucket

A method decorator that substitutes the bucket argument with the default bucket if one is configured. In that case, any bucket name the caller passes is ignored.

Usage:

@use_default_bucket
def bucket_exists(self, bucket):
    ...

use_prefix_path

A method decorator that prepends the configured prefix_path to the filename/path argument.

Usage:

@use_prefix_path
@use_default_bucket
def put(self, bucket, fnm, binary):
    ...
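The two decorators can be sketched as ordinary function wrappers. This is a minimal sketch, assuming the decorators read self.bucket and self.prefix_path and join the prefix and key with "/"; they are written as module-level functions here, and DemoStore is a hypothetical class used only to show the effect.

```python
from functools import wraps


def use_default_bucket(method):
    """Swap in self.bucket when a default bucket is configured."""
    @wraps(method)
    def wrapper(self, bucket, *args, **kwargs):
        actual_bucket = self.bucket if self.bucket else bucket
        return method(self, actual_bucket, *args, **kwargs)
    return wrapper


def use_prefix_path(method):
    """Prepend self.prefix_path to the object key when one is configured."""
    @wraps(method)
    def wrapper(self, bucket, fnm, *args, **kwargs):
        # Assumption: prefix and key are joined with a single "/".
        key = f"{self.prefix_path}/{fnm}" if self.prefix_path else fnm
        return method(self, bucket, key, *args, **kwargs)
    return wrapper


class DemoStore:
    """Hypothetical stand-in that returns the resolved bucket and key."""

    def __init__(self, bucket=None, prefix_path=None):
        self.bucket = bucket
        self.prefix_path = prefix_path

    @use_prefix_path
    @use_default_bucket
    def put(self, bucket, fnm, binary):
        return bucket, fnm


store = DemoStore(bucket="default-bucket", prefix_path="tenant-a")
print(store.put("my-bucket", "folder/example.txt", b"data"))
# ('default-bucket', 'tenant-a/folder/example.txt')
```

Because each wrapper rewrites a different argument, the stacking order of the two decorators does not change the result.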

Methods

__open__(self)

Creates a new boto3 S3 client for the configured OSS endpoint and stores it in conn. Calling it on an instance that already has a client replaces that client.

Example:

oss = RAGFlowOSS()
oss.__open__()

__close__(self)

Closes the existing OSS client connection by deleting the conn attribute and setting it to None.
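For an S3-compatible service such as Alibaba Cloud OSS, creating the client mostly comes down to pointing boto3 at a custom endpoint_url. The sketch below assumes the settings keys match the attribute names, virtual-hosted addressing, and SigV4 signing; make_s3_client and OSSConnection are hypothetical names, and the client factory is injectable so the open/close lifecycle can be exercised without boto3 or credentials.

```python
def make_s3_client(settings: dict):
    """Build a boto3 S3 client for an S3-compatible OSS endpoint (requires boto3)."""
    import boto3  # imported lazily so the sketch can run with a fake factory
    from botocore.config import Config

    return boto3.client(
        "s3",
        region_name=settings.get("region"),
        aws_access_key_id=settings.get("access_key"),
        aws_secret_access_key=settings.get("secret_key"),
        endpoint_url=settings.get("endpoint_url"),
        # Assumption: virtual-hosted addressing and SigV4 signing for the OSS endpoint.
        config=Config(s3={"addressing_style": "virtual"}, signature_version="v4"),
    )


class OSSConnection:
    """Hypothetical holder for the open/close lifecycle described above."""

    def __init__(self, settings: dict, client_factory=make_s3_client):
        self.settings = settings
        self.client_factory = client_factory
        self.conn = None

    def open(self):
        # Replace any existing client with a freshly built one.
        if self.conn is not None:
            self.close()
        self.conn = self.client_factory(self.settings)

    def close(self):
        # Mirrors __close__: drop the attribute, then reset it to None.
        del self.conn
        self.conn = None


# Usage with a stand-in factory, so no credentials or network are needed.
holder = OSSConnection({"region": "example-region"}, client_factory=lambda s: object())
holder.open()
first_client = holder.conn
holder.open()
print(holder.conn is first_client)  # False: a new client was created
holder.close()
print(holder.conn)  # None
```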


bucket_exists(self, bucket: str) -> bool

Checks whether a bucket exists in the OSS.

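Parameters:

bucket (str): Name of the bucket to check. When a default bucket is configured, it is used instead.

Returns:

True if the bucket exists, otherwise False.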

Example:

exists = oss.bucket_exists("my-bucket")
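A HEAD request on the bucket (boto3's head_bucket) is a common way to implement this check. The sketch below is a free function rather than a method and treats every exception as "bucket absent"; a production version would more likely catch botocore's ClientError specifically. FakeClient is a hypothetical stand-in used only to make the example runnable.

```python
import logging

logger = logging.getLogger(__name__)


def bucket_exists(conn, bucket: str) -> bool:
    """Return True if HeadBucket succeeds for bucket, otherwise False."""
    try:
        # head_bucket returns only headers; it checks that the bucket exists and is accessible.
        conn.head_bucket(Bucket=bucket)
        return True
    except Exception:
        # Assumption: any failure (missing bucket, denied access, network) reads as "absent".
        logger.warning("head_bucket failed for %s", bucket)
        return False


class FakeClient:
    """Hypothetical in-memory stand-in for a boto3 S3 client."""

    def __init__(self, buckets):
        self.buckets = set(buckets)

    def head_bucket(self, Bucket):
        if Bucket not in self.buckets:
            raise RuntimeError(f"404: {Bucket}")
        return {}


client = FakeClient(["my-bucket"])
print(bucket_exists(client, "my-bucket"), bucket_exists(client, "missing"))  # True False
```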

health(self)

Performs a simple health check by uploading a small test object to the configured default bucket, and returns the result of the upload operation.

Note: The test data and object name are hardcoded in the method.

Example:

oss.health()
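A health check of this kind can be sketched as a single fixed upload. The object name, payload, and the choice to apply prefix_path are assumptions made for this sketch, not the module's actual hardcoded values; FakeClient is a hypothetical stand-in.

```python
from io import BytesIO

# Hypothetical values; the real method's hardcoded name and payload are not shown here.
HEALTH_KEY = "health-check-object"
HEALTH_DATA = b"health-check"


def health(conn, bucket: str, prefix_path=None):
    """Upload a tiny fixed object and return the upload call's result."""
    key = f"{prefix_path}/{HEALTH_KEY}" if prefix_path else HEALTH_KEY
    # upload_fileobj needs a readable file-like object, so wrap the bytes.
    return conn.upload_fileobj(BytesIO(HEALTH_DATA), bucket, key)


class FakeClient:
    """Hypothetical stand-in that stores uploads in a dict."""

    def __init__(self):
        self.objects = {}

    def upload_fileobj(self, Fileobj, Bucket, Key):
        self.objects[(Bucket, Key)] = Fileobj.read()
        # boto3's upload_fileobj returns None, so the fake does too.


client = FakeClient()
health(client, "my-bucket", prefix_path="tenant-a")
print(client.objects)  # {('my-bucket', 'tenant-a/health-check-object'): b'health-check'}
```

With boto3's upload_fileobj the returned value is None, so a health check built this way signals failure by raising, not through its return value.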

get_properties(self, bucket: str, key: str) -> dict

Stub method intended to retrieve metadata or properties of an object.

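Parameters:

bucket (str): Bucket containing the object.
key (str): Object key.

Returns:

A dict of object properties. Because the method is a stub, it does not yet return real metadata.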


list(self, bucket: str, dir: str, recursive: bool = True) -> list

Stub method intended to list objects under a directory/prefix.

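Parameters:

bucket (str): Bucket to list.
dir (str): Directory (key prefix) to list under.
recursive (bool, default True): Whether to include objects in nested sub-directories.

Returns:

A list of objects. Because the method is a stub, it does not yet return real results.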
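The list stub could be backed by the ListObjectsV2 operation. This is a hypothetical sketch, not the module's behaviour: list_keys is an invented name, dir is treated as a key prefix, and recursive=False is approximated with a "/" delimiter, which makes the service group deeper keys into CommonPrefixes instead of Contents. FakeClient is a hypothetical stand-in that pages two keys at a time.

```python
def list_keys(conn, bucket: str, prefix: str, recursive: bool = True) -> list:
    """Hypothetical implementation of the list() stub on top of list_objects_v2."""
    params = {"Bucket": bucket, "Prefix": prefix}
    if not recursive:
        # With a delimiter, keys nested below the first "/" after the prefix are
        # grouped into CommonPrefixes and therefore left out of Contents.
        params["Delimiter"] = "/"
    keys = []
    while True:
        page = conn.list_objects_v2(**params)
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
        if not page.get("IsTruncated"):
            return keys
        # Responses are paginated (at most 1000 keys each); follow the token.
        params["ContinuationToken"] = page["NextContinuationToken"]


class FakeClient:
    """Hypothetical in-memory stand-in that pages results two keys at a time."""

    def __init__(self, keys, page_size=2):
        self.keys = sorted(keys)
        self.page_size = page_size

    def list_objects_v2(self, Bucket, Prefix="", Delimiter=None, ContinuationToken=None):
        matches = [k for k in self.keys if k.startswith(Prefix)]
        if Delimiter:
            matches = [k for k in matches if Delimiter not in k[len(Prefix):]]
        start = int(ContinuationToken or 0)
        end = start + self.page_size
        page = {"Contents": [{"Key": k} for k in matches[start:end]],
                "IsTruncated": end < len(matches)}
        if page["IsTruncated"]:
            page["NextContinuationToken"] = str(end)
        return page


client = FakeClient(["docs/a.txt", "docs/b.txt", "docs/c.txt", "docs/sub/d.txt", "other/e.txt"])
print(list_keys(client, "my-bucket", "docs/"))
# ['docs/a.txt', 'docs/b.txt', 'docs/c.txt', 'docs/sub/d.txt']
print(list_keys(client, "my-bucket", "docs/", recursive=False))
# ['docs/a.txt', 'docs/b.txt', 'docs/c.txt']
```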


put(self, bucket: str, fnm: str, binary: bytes)

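Uploads binary data to the specified bucket under the object key fnm.

Parameters:

bucket (str): Target bucket. When a default bucket is configured, it is used instead.
fnm (str): Object key (file name or path). When prefix_path is configured, it is prepended.
binary (bytes): Content to upload.

Returns:

The result of the upload operation.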

Example:

with open('example.txt', 'rb') as f:
    data = f.read()
oss.put('my-bucket', 'folder/example.txt', data)
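Uploading an in-memory bytes value with boto3 typically means wrapping it in a file-like object. This sketch assumes upload_fileobj is the upload call (put_object(Bucket=..., Key=..., Body=binary) would be an equally valid choice); put is written as a free function, and FakeClient is a hypothetical stand-in.

```python
from io import BytesIO


def put(conn, bucket: str, key: str, binary: bytes):
    """Upload raw bytes as one object and return the upload call's result."""
    # upload_fileobj reads from a file-like object, so wrap the bytes in BytesIO.
    return conn.upload_fileobj(BytesIO(binary), bucket, key)


class FakeClient:
    """Hypothetical stand-in that stores uploads in a dict."""

    def __init__(self):
        self.objects = {}

    def upload_fileobj(self, Fileobj, Bucket, Key):
        self.objects[(Bucket, Key)] = Fileobj.read()


client = FakeClient()
put(client, "my-bucket", "folder/example.txt", b"hello")
print(client.objects)  # {('my-bucket', 'folder/example.txt'): b'hello'}
```

upload_fileobj hands the data to boto3's managed transfer layer, which switches to multipart upload for large payloads; put_object sends the whole body in one request.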

rm(self, bucket: str, fnm: str)

Deletes the object stored under fnm in the specified bucket.

Parameters:

bucket (str): Bucket containing the object.
fnm (str): Key of the object to delete.

Example:

oss.rm('my-bucket', 'folder/example.txt')
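Deleting a single object maps to boto3's delete_object call. In this minimal sketch, logging the failure and returning a bool instead of raising is a choice made for illustration, not documented behaviour of rm; FakeClient is a hypothetical stand-in.

```python
import logging

logger = logging.getLogger(__name__)


def rm(conn, bucket: str, key: str) -> bool:
    """Delete one object; log failures instead of raising (a choice made for this sketch)."""
    try:
        conn.delete_object(Bucket=bucket, Key=key)
        return True
    except Exception:
        logger.warning("delete_object failed for %s/%s", bucket, key)
        return False


class FakeClient:
    """Hypothetical stand-in holding objects in a dict."""

    def __init__(self, objects):
        self.objects = dict(objects)

    def delete_object(self, Bucket, Key):
        if Bucket != "my-bucket":
            raise RuntimeError(f"NoSuchBucket: {Bucket}")
        self.objects.pop((Bucket, Key), None)
        return {}


client = FakeClient({("my-bucket", "folder/example.txt"): b"data"})
print(rm(client, "my-bucket", "folder/example.txt"), client.objects)  # True {}
```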

get(self, bucket: str, fnm: str) -> bytes | None

Retrieves the binary content of an object.

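Parameters:

bucket (str): Bucket containing the object.
fnm (str): Object key.

Returns:

The object content as bytes, or None if it cannot be retrieved.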

Example:

data = oss.get('my-bucket', 'folder/example.txt')
if data:
    print(data.decode())
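A download with a bytes | None result fits a retry-then-give-up pattern. The sketch below shows that pattern under stated assumptions: the attempt count, the delay, and the reopen() hook that rebuilds the client are hypothetical, and FlakyStore is an invented stand-in whose first request fails.

```python
import logging
import time
from io import BytesIO

logger = logging.getLogger(__name__)


def get_with_retry(store, bucket: str, key: str, attempts: int = 3, delay: float = 1.0):
    """Return the object's bytes, or None once every attempt has failed."""
    for _ in range(attempts):
        try:
            response = store.conn.get_object(Bucket=bucket, Key=key)
            # Body is a streaming object; read() returns the full payload as bytes.
            return response["Body"].read()
        except Exception:
            logger.warning("get_object failed for %s/%s", bucket, key)
            store.reopen()  # hypothetical hook that rebuilds the client, like __open__()
            time.sleep(delay)
    return None


class FlakyStore:
    """Hypothetical store whose first request fails; it doubles as its own client."""

    def __init__(self, data: bytes):
        self.data = data
        self.calls = 0
        self.reopens = 0
        self.conn = self

    def reopen(self):
        self.reopens += 1

    def get_object(self, Bucket, Key):
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("simulated dropped connection")
        return {"Body": BytesIO(self.data)}


store = FlakyStore(b"hello")
print(get_with_retry(store, "my-bucket", "folder/example.txt", delay=0))  # b'hello'
print(store.reopens)  # 1
```

Reopening the client before retrying avoids reusing a broken connection; the trade-off is that a genuinely missing object also costs every attempt and delay before None is returned.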

obj_exist(self, bucket: str, fnm: str) -> bool

Checks if an object exists in a bucket.

Parameters:

bucket (str): Bucket to check.
fnm (str): Object key.

Returns:

True if the object exists, otherwise False.

Example:

exists = oss.obj_exist('my-bucket', 'folder/example.txt')
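An existence check is usually a HEAD request on the object (boto3's head_object). This sketch assumes a botocore-style ClientError that exposes response["Error"]["Code"], and it deliberately re-raises anything other than a 404 so that permission or network errors are not mistaken for a missing object; FakeClientError and FakeClient are hypothetical stand-ins.

```python
def obj_exist(conn, bucket: str, key: str) -> bool:
    """Return True if HeadObject succeeds, False on a 404, and re-raise anything else."""
    try:
        conn.head_object(Bucket=bucket, Key=key)
        return True
    except Exception as exc:
        # botocore's ClientError exposes the error code as response["Error"]["Code"];
        # HEAD responses carry no body, so a missing key shows up as "404".
        code = getattr(exc, "response", {}).get("Error", {}).get("Code")
        if code == "404":
            return False
        raise  # e.g. 403 or network errors: do not report them as "missing"


class FakeClientError(Exception):
    """Hypothetical exception shaped like botocore's ClientError."""

    def __init__(self, code: str):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


class FakeClient:
    """Hypothetical stand-in that knows a fixed set of keys."""

    def __init__(self, keys, deny=False):
        self.keys = set(keys)
        self.deny = deny

    def head_object(self, Bucket, Key):
        if self.deny:
            raise FakeClientError("403")
        if Key not in self.keys:
            raise FakeClientError("404")
        return {}


client = FakeClient(["folder/example.txt"])
print(obj_exist(client, "my-bucket", "folder/example.txt"))  # True
print(obj_exist(client, "my-bucket", "missing.txt"))         # False
```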

get_presigned_url(self, bucket: str, fnm: str, expires: int) -> str | None

Generates a presigned URL for accessing an object.

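Parameters:

bucket (str): Bucket containing the object.
fnm (str): Object key.
expires (int): Lifetime of the URL in seconds (3600 = one hour).

Returns:

The presigned URL as a string, or None if it cannot be generated.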

Example:

url = oss.get_presigned_url('my-bucket', 'folder/example.txt', expires=3600)
print(url)
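With boto3, a presigned download URL comes from generate_presigned_url on the client. This sketch assumes the URL is meant for downloading (the get_object operation), and its None fallback mirrors the str | None return type; presigned_get_url and FakeClient are hypothetical names.

```python
def presigned_get_url(conn, bucket: str, key: str, expires: int):
    """Return a time-limited GET URL for one object, or None if signing fails."""
    try:
        # Signing happens locally in boto3; no request is sent to the service.
        return conn.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expires,  # lifetime in seconds
        )
    except Exception:
        return None


class FakeClient:
    """Hypothetical stand-in that formats a fake signed URL."""

    def generate_presigned_url(self, ClientMethod, Params=None, ExpiresIn=3600):
        return f"https://{Params['Bucket']}.oss.example.com/{Params['Key']}?Expires={ExpiresIn}"


print(presigned_get_url(FakeClient(), "my-bucket", "folder/example.txt", expires=3600))
# https://my-bucket.oss.example.com/folder/example.txt?Expires=3600
```

Anyone holding the URL can read the object until it expires, so short lifetimes are the safer default.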

Implementation Details & Algorithms


Interaction with Other Parts of the System


Usage Example

from oss_conn import RAGFlowOSS

oss = RAGFlowOSS()

# Upload a file
with open('data.bin', 'rb') as f:
    data = f.read()
oss.put('my-bucket', 'data/data.bin', data)

# Check if object exists
if oss.obj_exist('my-bucket', 'data/data.bin'):
    print("Object exists")

# Download a file
content = oss.get('my-bucket', 'data/data.bin')
print(content)

# Generate a presigned URL valid for 1 hour
url = oss.get_presigned_url('my-bucket', 'data/data.bin', expires=3600)
print(url)

# Delete the object
oss.rm('my-bucket', 'data/data.bin')

Diagram: Class Structure of RAGFlowOSS

classDiagram
    class RAGFlowOSS {
        -conn: boto3.client
        -oss_config: dict
        -access_key: str
        -secret_key: str
        -endpoint_url: str
        -region: str
        -bucket: str
        -prefix_path: str
        +__init__()
        +__open__()
        +__close__()
        +bucket_exists(bucket) bool
        +health()
        +get_properties(bucket, key) dict
        +list(bucket, dir, recursive=True) list
        +put(bucket, fnm, binary)
        +rm(bucket, fnm)
        +get(bucket, fnm) bytes
        +obj_exist(bucket, fnm) bool
        +get_presigned_url(bucket, fnm, expires) str
    }

Summary

oss_conn.py wraps OSS (S3-compatible) storage operations in a singleton class, giving the InfiniFlow system one standardized way to access buckets and objects. It handles connection management and retry logic, and its default bucket and prefix path settings remove repeated bucket and path arguments. Centralizing these concerns in one module keeps storage code easier to maintain and error handling consistent across callers.