conftest.py

Overview

conftest.py is a pytest configuration module for the InfiniFlow project. It provides reusable fixtures and helper functions for tests of document parsing and chunk management. It builds on the shared API utilities for parsing and listing documents and for adding and deleting chunks, and it defines a polling wait condition so that tests proceed only after asynchronous document parsing has finished.


Detailed Explanations

Imports


Functions and Fixtures

condition(_auth, _dataset_id)

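@wait_for(30, 1, "Document parsing timeout")
def condition(_auth, _dataset_id):
    # Return True only when every document in the dataset reports run == "DONE".
    res = list_documnets(_auth, _dataset_id)
    for doc in res["data"]["docs"]:
        if doc["run"] != "DONE":
            # At least one document is still being parsed.
            return False
    return True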
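
The wait_for decorator itself is not shown in this file. As a minimal sketch, assuming its three arguments are a timeout in seconds, a polling interval in seconds, and an error message, a decorator of this kind could be written as follows. The body below is a hypothetical stand-in, not the project's implementation, and raising TimeoutError on expiry is an assumption.

```python
import functools
import time


def wait_for(timeout, interval, error_message):
    """Hypothetical stand-in: poll the wrapped function until it returns a truthy value."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            deadline = time.monotonic() + timeout
            while True:
                # Success: the wrapped condition holds.
                if func(*args, **kwargs):
                    return True
                # Assumption: signal expiry with TimeoutError.
                if time.monotonic() >= deadline:
                    raise TimeoutError(error_message)
                time.sleep(interval)

        return wrapper

    return decorator


calls = {"n": 0}


@wait_for(1, 0.01, "never became ready")
def ready_after_three_calls():
    # Toy condition that becomes true on the third poll.
    calls["n"] += 1
    return calls["n"] >= 3
```

Under this sketch, calling a decorated function blocks until the condition holds or the timeout elapses, which is how condition is used inside the fixture below.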

add_chunks_func(request, get_http_api_auth, add_document)

@pytest.fixture(scope="function")
def add_chunks_func(request, get_http_api_auth, add_document):
    dataset_id, document_id = add_document
    # Start parsing, then wait (bounded by the wait_for timeout) for it to finish.
    parse_documnets(get_http_api_auth, dataset_id, {"document_ids": [document_id]})
    condition(get_http_api_auth, dataset_id)

    # Add four chunks and remember their ids for the test and for cleanup.
    chunk_ids = []
    for i in range(4):
        res = add_chunk(get_http_api_auth, dataset_id, document_id, {"content": f"chunk test {i}"})
        chunk_ids.append(res["data"]["chunk"]["id"])

    # Fixed one-second pause (a timing workaround) before the test runs.
    from time import sleep
    sleep(1)

    def cleanup():
        # Delete the chunks created above once the test has finished.
        delete_chunks(get_http_api_auth, dataset_id, document_id, {"chunk_ids": chunk_ids})

    request.addfinalizer(cleanup)
    return dataset_id, document_id, chunk_ids

Example usage in a test:

def test_chunk_addition(add_chunks_func):
    dataset_id, document_id, chunk_ids = add_chunks_func
    assert len(chunk_ids) == 4
    # Further assertions and test logic here
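
The setup-then-cleanup lifecycle of add_chunks_func can be tried out without a running server. The sketch below is illustrative only: FakeChunkStore and added_chunks are hypothetical names, not part of the project, and a context manager stands in for the pytest finalizer.

```python
from contextlib import contextmanager


class FakeChunkStore:
    """Hypothetical in-memory stand-in for the chunk API (not part of the project)."""

    def __init__(self):
        self.chunks = {}
        self._next_id = 0

    def add_chunk(self, content):
        chunk_id = f"chunk-{self._next_id}"
        self._next_id += 1
        self.chunks[chunk_id] = content
        # Same response shape the fixture reads: res["data"]["chunk"]["id"].
        return {"data": {"chunk": {"id": chunk_id}}}

    def delete_chunks(self, chunk_ids):
        for chunk_id in chunk_ids:
            self.chunks.pop(chunk_id, None)


@contextmanager
def added_chunks(store, count=4):
    """Create `count` chunks, hand their ids to the caller, then always clean up."""
    chunk_ids = []
    try:
        for i in range(count):
            res = store.add_chunk(f"chunk test {i}")
            chunk_ids.append(res["data"]["chunk"]["id"])
        yield chunk_ids
    finally:
        # Plays the role of the finalizer: runs even if the body raises.
        store.delete_chunks(chunk_ids)
```

Inside a `with added_chunks(store) as ids:` block the four chunks exist; after the block the store is empty again, which is the guarantee the registered finalizer gives each test.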

Implementation Details and Algorithms


Interaction with Other Parts of the System


Mermaid Diagram

The following flowchart represents the workflow and relationships between the main functions and fixtures in conftest.py:

flowchart TD
    A[get_http_api_auth, add_document fixtures] --> B[add_chunks_func fixture]
    B --> C[parse_documnets API call]
    C --> D[condition function with wait_for]
    D -->|polls| E[list_documnets API call]
    B --> F["add_chunk API calls (4 times)"]
    B --> G["sleep(1) for timing fix"]
    B --> H[register cleanup finalizer]
    H --> I[delete_chunks API call]

Summary