Hi,
I am using the Weaviate Python client 3.19.2.
I have a method that loops through the rows of a dataframe and saves each row's columns as an object of a Weaviate class. I am using auto-batching with a batch_size of 10 to achieve this.
The code below works fine when a single API call to the server triggers the save. However, when a scheduler runs on the server whose job is to save data for a number of tenants and calls this method for each tenant in turn (consecutive calls, no parallel processing of tenants), this error eventually starts showing up in the logs and the method fails with a ConnectionError exception. Once this exception is raised, all further calls to Weaviate are blocked and the only resolution left is to restart the instance.
" raise RequestsConnectionError("Batch was not added to weaviate.") from conn_err"
I couldn't find anything relevant in the Weaviate container logs on Kubernetes around the time this exception is raised.
I have tried adding a delay between the save calls that the scheduler makes, but it hasn't helped. The saves go through for a couple of minutes and then the batch-addition error starts showing up.
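For context, the scheduler's calling pattern looks roughly like this; the tenant list, the data-loading helper, the class/type names and the one-second delay are placeholders rather than the exact code:

import time

def scheduled_sync(tenant_ids):
    # One tenant at a time, no parallelism
    for tenant_org_id in tenant_ids:
        dataframe = get_dataframe_for_tenant(tenant_org_id)  # placeholder for our data loading
        save_to_weaviate(dataframe, "Product", tenant_org_id, "product", trigger="scheduler")
        time.sleep(1)  # the delay I added between save calls; it did not help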
I have observed this during batch deletion of objects as well.
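The deletes go through the batch delete API, per tenant, roughly like this; the where filter and value type are illustrative rather than the exact code:

def delete_tenant_objects(schema_name: str, tenant_org_id: str, trigger: str = "api"):
    weaviate_client = get_weaviate_client(trigger=trigger)
    # Batch delete of one tenant's objects; this eventually fails with the same connection error
    weaviate_client.batch.delete_objects(
        class_name=schema_name,
        where={
            "path": ["tenant_org_id"],
            "operator": "Equal",
            "valueText": tenant_org_id,
        },
    )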
Can someone please help me with this?
Code Snippet:
# Imports (the retry decorator used below comes from tenacity)
import logging
import weaviate
from tenacity import retry, wait_fixed, stop_after_delay, before_sleep_log

# Weaviate client
product_sync_client = weaviate.Client(
    url=read_env.getWeaviateEndpoint(),
    additional_headers={
        "X-OpenAI-Api-Key": read_env.getOpenAIKey()
    }
)
# Method to save objects in Weaviate
@retry(wait=wait_fixed(1), stop=stop_after_delay(10), before_sleep=before_sleep_log(retry_logger, logging.DEBUG))
def save_to_weaviate(dataframe, schema_name: str, tenant_org_id: str, type: str, email: str = "", store_name: str = "", source: str = None,
                     trigger: str = "api"):
    weaviate_client = get_weaviate_client(trigger=trigger)
    with weaviate_client.batch(
        batch_size=10  # Specify the batch size
    ) as save_batch:
        # Batch import all rows of the dataframe
        for index, row in dataframe.iterrows():
            properties = {
                "embedding_text": row['completion'],
                "tenant_org_id": tenant_org_id,
                "embedding_type": type,
                "store_name": store_name,
            }
            if email is not None:
                properties.update({"email": email})
            if 'dataset_id' in row.index:
                properties.update({"dataset_id": row['dataset_id']})
            if source is not None:
                properties.update({"source": source})
            save_batch.add_data_object(properties, schema_name)