s3_conn.py
Overview
s3_conn.py provides a singleton class RAGFlowS3 that encapsulates interactions with Amazon S3-compatible object storage services using the boto3 SDK. It manages S3 connection setup and exposes convenient methods for common S3 operations such as checking bucket existence, uploading, downloading, deleting objects, and generating presigned URLs. The class supports configuration via a settings module, including credentials, region, endpoint, bucket name, and path prefixing.
This abstraction simplifies S3 usage within the broader InfiniFlow system by managing connection lifecycle, error handling, and path management internally, enabling other components to perform storage operations with minimal boilerplate.
Class: RAGFlowS3
A singleton class that manages the S3 client connection and provides S3 operations with defaults and path prefixing.
Initialization and Configuration
The instance is created once via the `@singleton` decorator, so all callers share a single connection.
Configuration (AWS credentials, region, endpoint, bucket, and prefix path) is loaded from the `rag.settings.S3` dictionary.
An AWS session token, a custom endpoint (for S3-compatible services), a signature version, and an addressing style are supported as optional settings.
The S3 connection is opened automatically during initialization by calling `__open__()`.
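The `@singleton` decorator is what makes every `RAGFlowS3()` call return the same object. The real `rag.utils.singleton` is not shown in this document. The sketch below shows one common way to write such a decorator, assuming a per-class instance cache guarded by a lock. `FakeStorage` is a hypothetical stand-in for `RAGFlowS3`.

```python
import threading


def singleton(cls):
    """Class decorator: build `cls` on the first call, then keep returning that instance.

    A common pattern; not necessarily identical to rag.utils.singleton.
    """
    instances = {}
    lock = threading.Lock()  # prevents two threads from constructing the instance at once

    def get_instance(*args, **kwargs):
        with lock:
            if cls not in instances:
                instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class FakeStorage:
    """Hypothetical stand-in for RAGFlowS3; its constructor would open the S3 connection."""

    def __init__(self):
        self.conn = ["client"]
```

With this pattern, any constructor arguments passed on later calls are ignored, because the instance already exists.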
Properties
| Property | Description |
|---|---|
| `conn` | List containing a single boto3 S3 client instance. |
| `s3_config` | Configuration dictionary from settings. |
| `access_key` | AWS access key ID. |
| `secret_key` | AWS secret access key. |
| `session_token` | AWS session token (optional). |
| `region_name` | AWS region name for client configuration. |
| `endpoint_url` | Custom endpoint URL for S3-compatible services. |
| `signature_version` | Signature version override for S3 requests. |
| `addressing_style` | S3 addressing style (`path`, `virtual`, or `auto`). |
| `bucket` | Default bucket name; when set, it replaces the bucket passed by the caller. |
| `prefix_path` | Optional prefix prepended, together with the bucket name, to object keys. |
Decorators
`@use_default_bucket`: if a default bucket is configured, it replaces whatever `bucket` argument the method receives.
`@use_prefix_path`: if a prefix path is set, it rewrites the filename/key as `<prefix_path>/<bucket>/<fnm>`, effectively namespacing objects.
These decorators are applied to the object-level methods (`put`, `rm`, `get`, `obj_exist`, `get_presigned_url`) to manage bucket and key naming transparently.
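The two decorators can be sketched as ordinary wrapper functions. Several details are assumptions made for illustration: they are defined at module level here, they are stacked with `@use_prefix_path` outermost, and `Demo` is a hypothetical class exposing only the two settings the decorators read.

```python
import functools


def use_default_bucket(method):
    """If a default bucket is configured, it replaces the caller's bucket argument."""
    @functools.wraps(method)
    def wrapper(self, bucket, *args, **kwargs):
        actual_bucket = self.bucket if self.bucket else bucket
        return method(self, actual_bucket, *args, **kwargs)
    return wrapper


def use_prefix_path(method):
    """If a prefix path is set, rewrite the key as <prefix_path>/<bucket>/<fnm>."""
    @functools.wraps(method)
    def wrapper(self, bucket, fnm, *args, **kwargs):
        if self.prefix_path:
            fnm = f"{self.prefix_path}/{bucket}/{fnm}"
        return method(self, bucket, fnm, *args, **kwargs)
    return wrapper


class Demo:
    """Hypothetical stand-in holding only the settings the decorators read."""

    def __init__(self, bucket=None, prefix_path=None):
        self.bucket = bucket
        self.prefix_path = prefix_path

    # The outermost decorator runs first: the key prefix is built from the
    # caller's bucket before the default bucket is substituted.
    @use_prefix_path
    @use_default_bucket
    def resolve(self, bucket, fnm):
        return bucket, fnm
```

With both settings in place, `Demo(bucket="shared", prefix_path="pfx").resolve("kb1", "a.txt")` yields `("shared", "pfx/kb1/a.txt")`. Every caller then shares one physical bucket, and the caller's bucket name survives as a key segment.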
Methods
__open__(self)
Initializes or re-initializes the boto3 S3 client connection.
Closes any existing connection.
Configures client parameters including credentials, region, endpoint, and signature/addressing style.
Stores the client in `self.conn` as a one-element list to allow simple replacement.
Raises and logs exceptions if connection fails.
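The parameter setup in `__open__` can be illustrated as a pure function that splits the settings into `boto3.client()` arguments and botocore `Config` arguments. This is a sketch under stated assumptions. Only `access_key`, `secret_key`, and `bucket` are confirmed settings keys. The other key names (`session_token`, `region_name`, `endpoint_url`, `signature_version`, `addressing_style`) are assumed to mirror the property names, and `build_client_kwargs` is a hypothetical helper.

```python
from typing import Any, Dict, Tuple


def build_client_kwargs(cfg: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split an S3 settings dict into boto3.client() kwargs and botocore Config kwargs.

    Settings key names other than access_key/secret_key are assumptions.
    """
    client_kwargs: Dict[str, Any] = {
        "aws_access_key_id": cfg.get("access_key"),
        "aws_secret_access_key": cfg.get("secret_key"),
        "aws_session_token": cfg.get("session_token"),  # optional; None is accepted
    }
    # Optional connection settings are passed only when they are set.
    for key in ("region_name", "endpoint_url"):
        if cfg.get(key):
            client_kwargs[key] = cfg[key]

    # Signature version and addressing style are botocore Config options,
    # not direct boto3.client() arguments.
    config_kwargs: Dict[str, Any] = {}
    if cfg.get("signature_version"):
        config_kwargs["signature_version"] = cfg["signature_version"]
    if cfg.get("addressing_style"):
        config_kwargs["s3"] = {"addressing_style": cfg["addressing_style"]}
    return client_kwargs, config_kwargs
```

A caller would then create the client with `boto3.client("s3", config=Config(**config_kwargs), **client_kwargs)`, where `Config` is imported from `botocore.config`.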
__close__(self)
Closes the connection by deleting the client reference.
bucket_exists(self, bucket) -> bool
Checks if the specified bucket exists.
Uses the `head_bucket` API call.
Returns `True` if the bucket exists, otherwise `False`.
Logs debug information and exceptions.
Usage:
exists = s3_instance.bucket_exists('my-bucket')
health(self)
Performs a health check by uploading a small test file (txtxtxtxt1) with dummy content to the bucket/prefix.
Creates the bucket if it does not exist.
Uploads a small binary file.
Returns the result of the upload operation.
get_properties(self, bucket, key) -> dict
Stub method that currently returns an empty dictionary.
Intended to retrieve metadata or properties of an object.
Not implemented in this version.
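One possible way to fill in the `get_properties` stub is a single `head_object` request, which returns metadata without downloading the body. This is a hypothetical sketch, not the module's code. The client is passed in explicitly so the function can be exercised with a fake, and the choice of returned fields is an assumption.

```python
from typing import Any, Dict

# head_object response fields chosen (hypothetically) as "properties".
PROPERTY_FIELDS = ("ContentLength", "ContentType", "ETag", "LastModified", "Metadata")


def get_properties(client: Any, bucket: str, key: str) -> Dict[str, Any]:
    """Return selected object metadata via a HEAD request (no body is downloaded)."""
    response = client.head_object(Bucket=bucket, Key=key)
    # Keep only the fields present in this response; some may be absent.
    return {field: response[field] for field in PROPERTY_FIELDS if field in response}
```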
list(self, bucket, dir, recursive=True) -> list
Stub method returning an empty list.
Intended to list objects under a directory (prefix) in a bucket.
Recursive listing parameter available but unused.
Not implemented in this version.
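The unused `recursive` flag maps naturally onto S3's delimiter-based listing, so a hypothetical implementation of the `list` stub could look like the sketch below. It is named `list_objects` here to avoid shadowing Python's built-in `list`, and it takes the client as a parameter. Both choices are illustrative assumptions.

```python
from typing import Any, Dict, List


def list_objects(client: Any, bucket: str, prefix: str, recursive: bool = True) -> List[str]:
    """List keys under `prefix`; when not recursive, also return immediate "subdirectories"."""
    kwargs: Dict[str, str] = {"Bucket": bucket, "Prefix": prefix}
    if not recursive:
        # With a delimiter, S3 groups deeper keys into CommonPrefixes instead of listing them.
        kwargs["Delimiter"] = "/"
    names: List[str] = []
    # list_objects_v2 returns at most 1,000 keys per call; the paginator follows
    # continuation tokens so every page is read.
    for page in client.get_paginator("list_objects_v2").paginate(**kwargs):
        names.extend(obj["Key"] for obj in page.get("Contents", []))
        names.extend(cp["Prefix"] for cp in page.get("CommonPrefixes", []))
    return names
```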
put(self, bucket, fnm, binary, *args, **kwargs)
Uploads binary data to a file in S3.
Uses decorators to apply default bucket and prefix path.
Checks if bucket exists; creates if missing.
Uploads the binary data from a `BytesIO` stream.
On failure, reconnects and waits 1 second; because the loop runs only once (`range(1)`), no second upload attempt is made.
Logs debug info and exceptions.
Parameters:
`bucket` (str): bucket name; replaced by the default bucket if one is configured.
`fnm` (str): filename/key to upload to (with the prefix applied).
`binary` (bytes): file content as bytes.
Returns:
Result of the `upload_fileobj` call, which is `None` (boto3's `upload_fileobj` does not return a value).
Example:
s3_instance.put('my-bucket', 'folder/file.txt', b'Hello World')
rm(self, bucket, fnm, *args, **kwargs)
Deletes an object from S3.
Uses decorators for bucket and prefix.
Calls `delete_object`.
Logs exceptions on failure.
Example:
s3_instance.rm('my-bucket', 'folder/file.txt')
get(self, bucket, fnm, *args, **kwargs) -> bytes or None
Downloads an object from S3.
Uses decorators for bucket and prefix.
Calls `get_object` and reads the full body.
On failure, reconnects and waits; as in `put`, the loop runs only once, so no second attempt is made.
Returns the binary content, or `None` if the download failed.
Example:
data = s3_instance.get('my-bucket', 'folder/file.txt')
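Stripped of decorators and reconnect logic, the `put` and `get` flows above reduce to a few client calls. In the sketch below, the function names `put_bytes` and `get_bytes`, the explicit `client` parameter, and the `bucket_exists` callable are hypothetical, chosen so the logic can run against an in-memory fake.

```python
from io import BytesIO
from typing import Any, Callable, Optional


def put_bytes(client: Any, bucket: str, key: str, binary: bytes,
              bucket_exists: Callable[[str], bool]) -> None:
    """Create the bucket if it is missing, then upload `binary` under `key`."""
    if not bucket_exists(bucket):
        client.create_bucket(Bucket=bucket)
    # upload_fileobj expects a readable binary file-like object, hence the BytesIO wrapper.
    client.upload_fileobj(BytesIO(binary), bucket, key)


def get_bytes(client: Any, bucket: str, key: str) -> Optional[bytes]:
    """Download the whole object; return None if the request fails."""
    try:
        response = client.get_object(Bucket=bucket, Key=key)
        # Body is a stream; read() loads the full object into memory.
        return response["Body"].read()
    except Exception:
        return None
```

On AWS, `create_bucket` in any region other than `us-east-1` also needs `CreateBucketConfiguration={"LocationConstraint": region}`.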
obj_exist(self, bucket, fnm, *args, **kwargs) -> bool
Checks if an object exists in S3.
Uses decorators.
Calls `head_object`.
Returns `True` if the object exists and `False` on a 404 error; any other exception is re-raised.
Example:
exists = s3_instance.obj_exist('my-bucket', 'folder/file.txt')
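The 404 check relies on the error code that botocore attaches to a `ClientError` as `e.response["Error"]["Code"]`. The sketch reads that field by duck typing instead of catching `botocore.exceptions.ClientError` directly. This is only so that it runs without botocore installed. The standalone function and its explicit `client` parameter are illustrative.

```python
from typing import Any


def obj_exist(client: Any, bucket: str, key: str) -> bool:
    """Return True if head_object succeeds, False on a 404, and re-raise anything else."""
    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except Exception as e:
        # botocore's ClientError carries the parsed error in e.response["Error"]["Code"];
        # a HEAD request for a missing key reports the code "404".
        error = getattr(e, "response", None) or {}
        if error.get("Error", {}).get("Code") == "404":
            return False
        raise  # e.g. a 403 or a network error propagates to the caller
```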
get_presigned_url(self, bucket, fnm, expires, *args, **kwargs) -> str or None
Generates a presigned URL for an object valid for the given expiration time.
Uses decorators.
Makes up to 10 attempts, reconnecting and waiting after each failure.
Returns the URL string, or `None` if all attempts fail.
Parameters:
`expires` (int): expiration time in seconds.
Example:
url = s3_instance.get_presigned_url('my-bucket', 'folder/file.txt', expires=3600)
rm_bucket(self, bucket, *args, **kwargs)
Deletes all objects in a bucket and then deletes the bucket.
Iterates over all connections in `self.conn` (usually one).
Checks whether the bucket exists by calling `bucket_exists` on the boto3 client, which has no such method (likely a bug; see Notes and Suggestions).
Lists the objects and deletes them.
Deletes the bucket.
Logs errors if operations fail.
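A corrected `rm_bucket` flow could look like the sketch below. It delegates the existence check to the wrapper's own `bucket_exists`, passed in here as a callable, as Notes and Suggestions proposes. It also uses a paginator, because a single `list_objects_v2` call returns at most 1,000 keys. The standalone signature is hypothetical.

```python
from typing import Any, Callable


def rm_bucket(client: Any, bucket: str, bucket_exists: Callable[[str], bool]) -> bool:
    """Delete every object in `bucket`, then the bucket itself.

    Returns False if the bucket does not exist, True once it has been deleted.
    """
    # Use the wrapper's existence check, not a method the boto3 client lacks.
    if not bucket_exists(bucket):
        return False
    # The paginator follows continuation tokens, so buckets with more than
    # 1,000 objects are emptied completely.
    for page in client.get_paginator("list_objects_v2").paginate(Bucket=bucket):
        # Pages for an empty bucket have no "Contents" key at all.
        for obj in page.get("Contents", []):
            client.delete_object(Bucket=bucket, Key=obj["Key"])
    client.delete_bucket(Bucket=bucket)
    return True
```

A versioned bucket would additionally need its object versions and delete markers removed before `delete_bucket` succeeds, which this sketch does not handle.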
Important Implementation Details
Singleton Pattern: Ensures a single S3 connection instance across the application.
Connection Management: Uses `__open__` and `__close__` to manage the boto3 client lifecycle, and reconnects on failures.
Decorators: Two decorators handle default-bucket substitution and key prefixing transparently, reducing repetitive code.
Retries: On network or connection errors, methods reconnect and sleep. `get_presigned_url` makes up to 10 attempts, while the loops in `put` and `get` run only once (`range(1)`), so they reconnect without actually retrying.
Use of `conn` as a list: The boto3 client is kept in a one-element list, apparently so the connection object can be replaced without changing references elsewhere.
Logging: Extensive logging for debugging and error tracking.
Stub Methods: `get_properties` and `list` are placeholders for potential future implementation.
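The reconnect-and-sleep pattern described above can be factored into a single helper. This is a hypothetical refactoring, not code from `s3_conn.py`, and `call_with_reconnect` and its parameters are illustrative names. The `attempts` parameter makes the difference explicit between the single-pass loops (`attempts=1`) and the 10-attempt presigned-URL loop.

```python
import logging
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_reconnect(op: Callable[[], T], reconnect: Callable[[], None],
                        attempts: int = 1, delay: float = 1.0) -> Optional[T]:
    """Run `op` up to `attempts` times; after each failure, reconnect and sleep.

    attempts=1 mirrors the range(1) loops: a failure triggers a reconnect,
    but `op` is not called again. Returns None if every attempt fails.
    """
    for attempt in range(1, attempts + 1):
        try:
            return op()
        except Exception:
            logging.exception("attempt %d of %d failed", attempt, attempts)
            reconnect()
            time.sleep(delay)
    return None
```

Assuming a download URL, `get_presigned_url` would then reduce to roughly `call_with_reconnect(lambda: self.conn[0].generate_presigned_url("get_object", Params={"Bucket": bucket, "Key": fnm}, ExpiresIn=expires), self.__open__, attempts=10)`.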
Interaction with Other Parts of the System
Obtains configuration from `rag.settings.S3`, which must define keys such as `access_key`, `secret_key`, and `bucket`.
Uses the `rag.utils.singleton` decorator to enforce singleton behavior.
Intended as the storage backend handler for other components that need S3 access.
Abstracts away boto3 details and credentials management for the rest of the system.
Usage Example
```python
from s3_conn import RAGFlowS3

s3 = RAGFlowS3()  # always returns the shared singleton instance

# Upload a file
content = b'Hello InfiniFlow!'
s3.put('my-bucket', 'path/to/file.txt', content)

# Check if file exists
exists = s3.obj_exist('my-bucket', 'path/to/file.txt')

# Download the file (get() returns None if the download failed)
if exists:
    data = s3.get('my-bucket', 'path/to/file.txt')
    if data is not None:
        print(data.decode())

# Generate a presigned URL valid for 1 hour
url = s3.get_presigned_url('my-bucket', 'path/to/file.txt', expires=3600)
print(url)

# Remove the file
s3.rm('my-bucket', 'path/to/file.txt')
```
Mermaid Class Diagram
```mermaid
classDiagram
    class RAGFlowS3 {
        -conn: list[boto3.client]
        -s3_config: dict
        -access_key: str
        -secret_key: str
        -session_token: str
        -region_name: str
        -endpoint_url: str
        -signature_version: str
        -addressing_style: str
        -bucket: str
        -prefix_path: str
        +__init__()
        +__open__()
        +__close__()
        +bucket_exists(bucket) bool
        +health()
        +get_properties(bucket, key) dict
        +list(bucket, dir, recursive=True) list
        +put(bucket, fnm, binary, *args, **kwargs)
        +rm(bucket, fnm, *args, **kwargs)
        +get(bucket, fnm, *args, **kwargs) bytes
        +obj_exist(bucket, fnm, *args, **kwargs) bool
        +get_presigned_url(bucket, fnm, expires, *args, **kwargs) str
        +rm_bucket(bucket, *args, **kwargs)
    }
```
Notes and Suggestions
The use of a list to store `self.conn` is unusual but appears designed to allow `self.conn[0]` to be replaced without changing references.
The `rm_bucket` method calls `conn.bucket_exists()` on the boto3 client, which has no such method; it should instead call `self.bucket_exists(bucket)` or use `head_bucket`. This is likely a bug.
The `get_properties` and `list` methods are stubs and should be implemented or removed.
The retry loops in `put` and `get` run only once (`range(1)`), which means no actual retry; this may be intended for future extension.
Error handling mostly consists of logging exceptions and reconnecting, which suits robust long-running services.
This documentation should provide a thorough understanding of s3_conn.py, enabling developers to maintain, extend, or integrate with the RAGFlowS3 class effectively.