Recommended way to batch upload objects to handle concurrent requests

Hi.
In my app, users send data to my FastAPI server, where I use the Weaviate client to add the data objects, as well as the relationships between them, in batches. However, I’ve noticed that the Weaviate client / batch executor doesn’t seem to handle concurrent requests well, which leads to the uploads being messed up.
For example, an object might upload fine, but its reference to some other object might not get added.
Here’s some code:

# main.py
from fastapi import BackgroundTasks, Depends, FastAPI, Request
from fastapi.concurrency import run_in_threadpool
# TokenData / get_current_user come from the app's auth setup

app = FastAPI()

@app.post("/api/save")
async def save(request: Request, background_tasks: BackgroundTasks, current_user: TokenData = Depends(get_current_user)):
    user_id = current_user.sub.replace("-", "_")
    data = await request.json()

    async def save_data():
        try:
            await run_in_threadpool(indexer, data=data, user_id=user_id)
        except Exception as e:
            print(f"Error in saving data: {e}")


    background_tasks.add_task(save_data)

    return {"status": "ok"}
And here’s the indexer:

# indexer.py
import weaviate
from functools import lru_cache
# settings and preprocess come from elsewhere in the app

def indexer(data: dict, user_id: str):
    client = get_weaviate_client()
    document = data["pageData"]
    title = document["title"]
    # uid = data["userData"]["userid"]

    uri = document["url"]
    document["chunked_content"] = preprocess(document)
    document["chunked_content"].append(title)

    # print("[*] Prepped document: ", uri)

    source_class = settings.KNOWLEDGE_SOURCE_CLASS.format(user_id)
    content_class = settings.CONTENT_CLASS.format(user_id)

    print("[*] Indexing document: ", uri)
    client.batch.configure(batch_size=50, num_workers=2)
    with client.batch as batch:
        total_chunks = len(document["chunked_content"])
        # add the parent source object first; add_data_object returns the new object's UUID
        parent_uuid = batch.add_data_object(
            data_object={
                'uri': uri,
                'title': title
            },
            class_name=source_class
        )
        try:
            for i, chunk in enumerate(document["chunked_content"]):
                # TODO: better way to handle passage
                # chunk = "passage: " + chunk
                chunk_uuid = batch.add_data_object(
                    data_object={
                        'source_content': chunk,
                    },
                    class_name=content_class,
                )
                # link the chunk back to its parent source
                batch.add_reference(
                    from_object_uuid=chunk_uuid,
                    from_property_name="hasCategory",
                    to_object_uuid=parent_uuid,
                    from_object_class_name=content_class,
                    to_object_class_name=source_class
                )
                # print(f"[*] Added chunk no. {i} out of {total_chunks}")
        except Exception as e:
            print("[!] Failed to index document: ", uri, e)
            print(chunk)

    print("[!] Indexed document: ", uri)
    return True
@lru_cache()
def get_weaviate_client():
    return weaviate.Client(
        url=settings.WEAVIATE_URL,
        auth_client_secret=weaviate.AuthApiKey(api_key=settings.WEAVIATE_API_KEY),
        additional_headers={
            "X-OpenAI-Api-Key": settings.OPENAI_API_KEY,
            "X-Huggingface-Api-Key": settings.HUGGINGFACE_API_KEY
        }
    )

What’s the recommended way to use the Weaviate client / batch executor in this situation?

Hi!

Wild guess: Have you tried using num_workers=1?

I don’t like this “double multithreading” for some reason :slight_smile:
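Something like this, i.e. keeping the batch importer itself single-threaded. Just a sketch of the configure call from your indexer, everything else unchanged:

client = get_weaviate_client()
# one import worker, so Weaviate's batch doesn't spawn extra
# threads on top of FastAPI's threadpool
client.batch.configure(batch_size=50, num_workers=1)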

Apologies if I’m asking a basic question, but what problems could this lead to?

I am not sure, to be honest. I would probably have to investigate more, or ask more experienced folks… I am also new to Weaviate and have never used FastAPI myself :wink: hehehe

But run_in_threadpool runs, in a thread pool, a function that itself runs multithreaded (the Weaviate batch importer). Maybe that isn’t allowed, so the second worker might never get spun up. This could explain why some objects are not being imported.

But again, wild guess. I am not sure about it :frowning:
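If the nested threading really is the culprit, another thing you could try (untested, just a sketch) is serializing the imports yourself, so only one background task touches client.batch at a time. The _batch_lock below is my own invention, not part of the Weaviate client:

import threading

# hypothetical module-level lock: only one thread runs a batch import at a time
_batch_lock = threading.Lock()

def indexer(data: dict, user_id: str):
    client = get_weaviate_client()
    with _batch_lock:  # serialize access to the shared client.batch
        client.batch.configure(batch_size=50, num_workers=1)
        with client.batch as batch:
            # ... same add_data_object / add_reference calls as before ...
            pass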

Thank you so much! This seems to be performing better. You’re right, num_workers=2 was probably the issue.

Oh! Glad to hear that!!
Have fun!