s3_conn.py

Overview

s3_conn.py provides a singleton class, RAGFlowS3, that wraps interactions with Amazon S3 and S3-compatible object storage services through the boto3 SDK. It sets up the S3 connection and exposes methods for common operations: checking whether a bucket or object exists, uploading, downloading, and deleting objects, and generating presigned URLs. Configuration comes from a settings module and covers credentials, region, endpoint, default bucket name, and path prefix.

This abstraction simplifies S3 usage within the broader InfiniFlow system by managing connection lifecycle, error handling, and path management internally, enabling other components to perform storage operations with minimal boilerplate.


Class: RAGFlowS3

A singleton class that manages the S3 client connection and provides S3 operations with defaults and path prefixing.
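
The singleton behavior can be pictured with a small decorator. This is a minimal sketch, not necessarily the mechanism the module uses: the singleton decorator and the DemoStorage class are hypothetical names introduced here for illustration only.

```python
import threading


def singleton(cls):
    """Hypothetical decorator: cache one instance per decorated class."""
    instances = {}
    lock = threading.Lock()

    def get_instance(*args, **kwargs):
        # Double-checked locking so concurrent first calls build one instance.
        if cls not in instances:
            with lock:
                if cls not in instances:
                    instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class DemoStorage:
    """Stand-in for RAGFlowS3; it holds no real connection."""

    def __init__(self):
        self.conn = None


a = DemoStorage()
b = DemoStorage()  # returns the cached instance, __init__ is not run again
```

With this pattern every caller that writes DemoStorage() shares the same object, which is why a class like RAGFlowS3 can be instantiated freely without opening a new connection each time.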

Initialization and Configuration


Properties

Property            Description
conn                List containing a single boto3 S3 client instance.
s3_config           Configuration dictionary from settings.
access_key          AWS access key ID.
secret_key          AWS secret access key.
session_token       AWS session token (optional).
region_name         AWS region name for client configuration.
endpoint_url        Custom endpoint URL for S3-compatible services.
signature_version   Signature version override for S3 requests.
addressing_style    S3 addressing style (path or virtual hosted).
bucket              Default bucket name used if no bucket is specified.
prefix_path         Optional prefix folder path inside buckets.


Decorators

These decorators are applied to various methods to transparently manage bucket and key naming.
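
As an illustration of how such decorators can rewrite arguments before the wrapped method runs, here is a minimal sketch. The decorator names, the DemoStore class, and the exact precedence rules (for example, whether an explicit bucket overrides the default, or how the prefix is joined to the key) are assumptions made for this example and may differ from the module's real behavior.

```python
import functools


def use_default_bucket(method):
    """Hypothetical: fall back to self.bucket when no bucket is passed."""
    @functools.wraps(method)
    def wrapper(self, bucket, *args, **kwargs):
        actual_bucket = bucket or self.bucket
        return method(self, actual_bucket, *args, **kwargs)
    return wrapper


def use_prefix_path(method):
    """Hypothetical: prepend self.prefix_path to the object key."""
    @functools.wraps(method)
    def wrapper(self, bucket, fnm, *args, **kwargs):
        if self.prefix_path:
            fnm = f"{self.prefix_path}/{fnm}"
        return method(self, bucket, fnm, *args, **kwargs)
    return wrapper


class DemoStore:
    """Stand-in class; locate() only reports the resolved bucket and key."""

    def __init__(self, bucket=None, prefix_path=None):
        self.bucket = bucket
        self.prefix_path = prefix_path

    @use_default_bucket
    @use_prefix_path
    def locate(self, bucket, fnm):
        return bucket, fnm


resolved = DemoStore("shared", "tenant-a").locate(None, "doc.pdf")
untouched = DemoStore().locate("my-bucket", "doc.pdf")
```

Keeping this logic in decorators means each storage method receives a final bucket and key and does not need to repeat the resolution rules.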


Methods

__open__(self)

Initializes or re-initializes the boto3 S3 client connection.

Raises and logs exceptions if connection fails.
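
One plausible way to turn the configured properties into a boto3 client is sketched below. The helper names and the settings keys are assumptions based on the property list in this document; only boto3.client and botocore.config.Config are real library calls, and they are imported lazily so the pure helpers can be used without boto3 installed.

```python
def build_client_kwargs(cfg):
    """Map settings-style keys (assumed names) to boto3.client keyword arguments."""
    mapping = {
        "access_key": "aws_access_key_id",
        "secret_key": "aws_secret_access_key",
        "session_token": "aws_session_token",
        "region_name": "region_name",
        "endpoint_url": "endpoint_url",
    }
    # Only pass values that are actually set, so boto3 defaults still apply.
    return {dst: cfg[src] for src, dst in mapping.items() if cfg.get(src)}


def build_config_kwargs(cfg):
    """Collect the options that belong in botocore.config.Config."""
    out = {}
    if cfg.get("signature_version"):
        out["signature_version"] = cfg["signature_version"]
    if cfg.get("addressing_style"):
        out["s3"] = {"addressing_style": cfg["addressing_style"]}
    return out


def open_client(cfg):
    """Create the S3 client; requires boto3 to be installed."""
    import boto3
    from botocore.config import Config

    kwargs = build_client_kwargs(cfg)
    config_kwargs = build_config_kwargs(cfg)
    if config_kwargs:
        kwargs["config"] = Config(**config_kwargs)
    return boto3.client("s3", **kwargs)


example_cfg = {
    "access_key": "AKIAEXAMPLE",          # placeholder credentials
    "secret_key": "secret",
    "endpoint_url": "http://localhost:9000",
    "addressing_style": "path",
}
client_kwargs = build_client_kwargs(example_cfg)
config_kwargs = build_config_kwargs(example_cfg)
```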

__close__(self)

Closes the connection by deleting the client reference.


bucket_exists(self, bucket) -> bool

Checks if the specified bucket exists.

Usage:

exists = s3_instance.bucket_exists('my-bucket')
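
A bucket existence check is commonly built on the boto3 head_bucket call, which raises an error when the bucket is missing or inaccessible. The sketch below assumes that approach; the standalone function and the FakeS3Client stand-in are hypothetical and exist only so the example runs without network access.

```python
def bucket_exists(client, bucket):
    """Return True if head_bucket succeeds, False on any error."""
    try:
        client.head_bucket(Bucket=bucket)
        return True
    except Exception:
        # Note: a permission error also ends up here, not only "not found".
        return False


class FakeS3Client:
    """Stand-in for a boto3 S3 client that knows a fixed set of buckets."""

    def __init__(self, buckets):
        self._buckets = set(buckets)

    def head_bucket(self, Bucket):
        if Bucket not in self._buckets:
            raise RuntimeError("404 Not Found")
        return {}


fake = FakeS3Client(["my-bucket"])
present = bucket_exists(fake, "my-bucket")
absent = bucket_exists(fake, "other-bucket")
```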

health(self)

Performs a health check by uploading a small test file (txtxtxtxt1) with dummy content to the bucket/prefix.


get_properties(self, bucket, key) -> dict

Stub method that currently returns an empty dictionary.


list(self, bucket, dir, recursive=True) -> list

Stub method returning an empty list.


put(self, bucket, fnm, binary, *args, **kwargs)

Uploads binary data to a file in S3.

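Parameters:

bucket: name of the target bucket.
fnm: object key (file name) to write.
binary: the bytes to upload.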

Returns:

Example:

s3_instance.put('my-bucket', 'folder/file.txt', b'Hello World')

rm(self, bucket, fnm, *args, **kwargs)

Deletes an object from S3.

Example:

s3_instance.rm('my-bucket', 'folder/file.txt')

get(self, bucket, fnm, *args, **kwargs) -> bytes or None

Downloads an object from S3.

Example:

data = s3_instance.get('my-bucket', 'folder/file.txt')
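
Because get is documented as returning bytes or None, callers should handle the failure case. The following sketch shows one way a download can be written against the boto3 get_object call; the function name, the logging choice, and the FakeS3Client stand-in are assumptions for illustration.

```python
import io
import logging


def fetch_bytes(client, bucket, key):
    """Return the object's content as bytes, or None if the download fails."""
    try:
        response = client.get_object(Bucket=bucket, Key=key)
        # The real boto3 response exposes the payload as a stream under "Body".
        return response["Body"].read()
    except Exception:
        logging.exception("Failed to get %s/%s", bucket, key)
        return None


class FakeS3Client:
    """Stand-in client backed by an in-memory dict of (bucket, key) -> bytes."""

    def __init__(self, objects):
        self._objects = objects

    def get_object(self, Bucket, Key):
        return {"Body": io.BytesIO(self._objects[(Bucket, Key)])}


client = FakeS3Client({("my-bucket", "folder/file.txt"): b"Hello World"})
found = fetch_bytes(client, "my-bucket", "folder/file.txt")
missing = fetch_bytes(client, "my-bucket", "nope.txt")
```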

obj_exist(self, bucket, fnm, *args, **kwargs) -> bool

Checks if an object exists in S3.

Example:

exists = s3_instance.obj_exist('my-bucket', 'folder/file.txt')

get_presigned_url(self, bucket, fnm, expires, *args, **kwargs) -> str or None

Generates a presigned URL for an object valid for the given expiration time.

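Parameters:

bucket: name of the bucket that holds the object.
fnm: object key (file name) within the bucket.
expires: validity period of the URL in seconds (for example, 3600 for one hour).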

Example:

url = s3_instance.get_presigned_url('my-bucket', 'folder/file.txt', expires=3600)
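
In boto3, presigned download links are produced by the client's generate_presigned_url method. The sketch below assumes the URL is for a get_object request; the wrapper function and the FakeS3Client stand-in (which returns a made-up URL) are hypothetical.

```python
def presign_get(client, bucket, key, expires):
    """Return a presigned GET URL valid for `expires` seconds, or None on error."""
    try:
        return client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expires,
        )
    except Exception:
        return None


class FakeS3Client:
    """Stand-in client that records the call and returns a made-up URL."""

    def __init__(self):
        self.calls = []

    def generate_presigned_url(self, operation, Params, ExpiresIn):
        self.calls.append((operation, Params, ExpiresIn))
        return f"https://example.invalid/{Params['Bucket']}/{Params['Key']}?expires={ExpiresIn}"


fake = FakeS3Client()
url = presign_get(fake, "my-bucket", "folder/file.txt", 3600)
```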

rm_bucket(self, bucket, *args, **kwargs)

Deletes all objects in a bucket and then deletes the bucket.
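
S3 refuses to delete a bucket that still contains objects, which is why the objects are removed first. A minimal sketch of that sequence using the boto3 client calls get_paginator("list_objects_v2"), delete_objects, and delete_bucket follows. The function name and the fake classes are assumptions for illustration, and the sketch does not handle versioned buckets.

```python
def empty_and_delete_bucket(client, bucket):
    """Delete every listed object page by page, then delete the bucket."""
    paginator = client.get_paginator("list_objects_v2")
    deleted = 0
    for page in paginator.paginate(Bucket=bucket):
        # An empty bucket yields a page with no "Contents" entry.
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            client.delete_objects(Bucket=bucket, Delete={"Objects": keys})
            deleted += len(keys)
    client.delete_bucket(Bucket=bucket)
    return deleted


class FakePaginator:
    """Stand-in paginator that yields two keys per page."""

    def __init__(self, store):
        self._store = store

    def paginate(self, Bucket):
        keys = sorted(self._store.get(Bucket, set()))
        if not keys:
            yield {"KeyCount": 0}
        for i in range(0, len(keys), 2):
            yield {"Contents": [{"Key": k} for k in keys[i:i + 2]]}


class FakeS3Client:
    """Stand-in client backed by a dict of bucket name -> set of keys."""

    def __init__(self, store):
        self.store = store

    def get_paginator(self, name):
        return FakePaginator(self.store)

    def delete_objects(self, Bucket, Delete):
        for obj in Delete["Objects"]:
            self.store[Bucket].discard(obj["Key"])

    def delete_bucket(self, Bucket):
        if self.store[Bucket]:
            raise RuntimeError("bucket not empty")
        del self.store[Bucket]


store = {"my-bucket": {"a.txt", "b.txt", "c.txt"}}
removed = empty_and_delete_bucket(FakeS3Client(store), "my-bucket")
```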


Important Implementation Details


Interaction with Other Parts of the System


Usage Example

from s3_conn import RAGFlowS3

s3 = RAGFlowS3()

# Upload a file
content = b'Hello InfiniFlow!'
s3.put('my-bucket', 'path/to/file.txt', content)

# Check if file exists
exists = s3.obj_exist('my-bucket', 'path/to/file.txt')

# Download the file (get may return None if the download fails)
if exists:
    data = s3.get('my-bucket', 'path/to/file.txt')
    if data is not None:
        print(data.decode())

# Generate a presigned URL valid for 1 hour
url = s3.get_presigned_url('my-bucket', 'path/to/file.txt', expires=3600)
print(url)

# Remove the file
s3.rm('my-bucket', 'path/to/file.txt')

Mermaid Class Diagram

classDiagram
    class RAGFlowS3 {
        -conn: list[boto3.client]
        -s3_config: dict
        -access_key: str
        -secret_key: str
        -session_token: str
        -region_name: str
        -endpoint_url: str
        -signature_version: str
        -addressing_style: str
        -bucket: str
        -prefix_path: str
        +__init__()
        +__open__()
        +__close__()
        +bucket_exists(bucket) bool
        +health()
        +get_properties(bucket, key) dict
        +list(bucket, dir, recursive=True) list
        +put(bucket, fnm, binary, *args, **kwargs)
        +rm(bucket, fnm, *args, **kwargs)
        +get(bucket, fnm, *args, **kwargs) bytes
        +obj_exist(bucket, fnm, *args, **kwargs) bool
        +get_presigned_url(bucket, fnm, expires, *args, **kwargs) str
        +rm_bucket(bucket, *args, **kwargs)
    }

Notes and Suggestions

