Hi.
In my app, users send data to my FastAPI server, where I use the Weaviate client to add data objects as well as cross-references in batches. However, I’ve noticed that the Weaviate client / batch executor doesn’t seem to handle concurrent requests well, which leads to incomplete or inconsistent uploads.
For example, an object gets uploaded, but its reference to another object is never added.
Here’s some code:
# main.py
@app.post("/api/save")
async def save(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: TokenData = Depends(get_current_user),
):
    """Accept a page payload and schedule it for indexing.

    The endpoint responds with ``{"status": "ok"}`` right away. The actual
    work (``indexer``) runs afterwards as a background task, in the
    threadpool via ``run_in_threadpool``. Its outcome is therefore not
    reported to the HTTP client, only to the server log.
    """
    import logging  # function-scope so this edit needs no file-level import

    # Hyphens in the user id are replaced with underscores before the id is
    # used in class names (presumably because Weaviate class names may not
    # contain "-"; confirm against the schema rules).
    user_id = current_user.sub.replace("-", "_")
    data = await request.json()

    async def save_data():
        try:
            await run_in_threadpool(indexer, data=data, user_id=user_id)
        except Exception:
            # A background task has no caller to report to, so log the full
            # traceback (not just str(e)) to keep failures diagnosable.
            logging.getLogger(__name__).exception("Error in saving data")

    background_tasks.add_task(save_data)
    return {"status": "ok"}
The `indexer` function:
import threading

# get_weaviate_client() is wrapped in lru_cache, so every caller shares ONE
# client object and therefore ONE `client.batch`. indexer() is executed
# concurrently from the threadpool (see the /api/save endpoint). Without
# coordination, parallel calls would reconfigure and fill the same batch
# buffers at the same time, mixing objects and references from different
# documents. This lock gives each document exclusive use of the batch.
# NOTE(review): this serializes indexing across requests. If throughput
# matters, give each worker thread its own client instead of sharing one.
_BATCH_LOCK = threading.Lock()


def indexer(data: dict, user_id: str) -> bool:
    """Index one page and its chunks into the user's Weaviate classes.

    Adds one object for the page (``uri`` and ``title``) to the user's
    knowledge-source class, and one object per chunk to the user's content
    class. The chunks are the output of ``preprocess`` plus the title
    itself. Each chunk also gets a ``hasCategory`` cross-reference to the
    page object.

    Args:
        data: Request payload. ``data["pageData"]`` must provide ``title``
            and ``url`` and is also passed to ``preprocess``. Note that
            ``data["pageData"]["chunked_content"]`` is overwritten in place.
        user_id: Value substituted into the per-user class-name templates
            ``settings.KNOWLEDGE_SOURCE_CLASS`` and ``settings.CONTENT_CLASS``.

    Returns:
        True if every chunk and reference was added to the batch. False if
        adding a chunk raised; the error is printed and the remaining chunks
        are skipped.
    """
    client = get_weaviate_client()
    document = data["pageData"]
    title = document["title"]
    uri = document["url"]

    # Preprocessing does not touch the shared client, so keep it outside the
    # lock to minimize the time other requests wait.
    document["chunked_content"] = preprocess(document)
    document["chunked_content"].append(title)

    source_class = settings.KNOWLEDGE_SOURCE_CLASS.format(user_id)
    content_class = settings.CONTENT_CLASS.format(user_id)

    print("[*] Indexing document: ", uri)
    succeeded = True
    with _BATCH_LOCK:
        # configure() mutates the shared batch, so it must happen under the
        # lock too, or another thread could reconfigure it mid-upload.
        client.batch.configure(batch_size=50, num_workers=2)
        with client.batch as batch:
            parent_uuid = batch.add_data_object(
                data_object={"uri": uri, "title": title},
                class_name=source_class,
            )
            chunk = None
            try:
                for chunk in document["chunked_content"]:
                    # TODO: better way to handle passage (e.g. prefixing each
                    # chunk with "passage: ").
                    chunk_uuid = batch.add_data_object(
                        data_object={"source_content": chunk},
                        class_name=content_class,
                    )
                    batch.add_reference(
                        from_object_uuid=chunk_uuid,
                        from_property_name="hasCategory",
                        to_object_uuid=parent_uuid,
                        from_object_class_name=content_class,
                        to_object_class_name=source_class,
                    )
            except Exception as e:
                # Best-effort, as before: stop adding chunks and report.
                # Items already added stay in the batch and are presumably
                # sent when the `with client.batch` block exits.
                print("[!] Failed to index document: ", uri, e)
                print(chunk)
                succeeded = False
    # Report success only when every chunk was added. Previously this line
    # was printed, and True returned, even after a failure.
    if succeeded:
        print("[!] Indexed document: ", uri)
    return succeeded
@lru_cache()
def get_weaviate_client():
    """Return the Weaviate client, constructing it on the first call only.

    ``lru_cache`` on this zero-argument function means every caller receives
    the same ``weaviate.Client`` instance, including its ``batch`` attribute.
    Code that uses the batch from several threads must therefore coordinate
    access itself.

    The OpenAI and Hugging Face API keys from ``settings`` are passed as
    additional request headers, presumably for server-side vectorizer
    modules (confirm against the schema's module configuration).
    """
    api_key_auth = weaviate.AuthApiKey(api_key=settings.WEAVIATE_API_KEY)
    module_headers = {
        "X-OpenAI-Api-Key": settings.OPENAI_API_KEY,
        "X-Huggingface-Api-Key": settings.HUGGINGFACE_API_KEY,
    }
    shared_client = weaviate.Client(
        url=settings.WEAVIATE_URL,
        auth_client_secret=api_key_auth,
        additional_headers=module_headers,
    )
    return shared_client
What’s the recommended way to use the Weaviate client / batch executor when several requests are being indexed concurrently like this?