I’m currently trying to batch insert several thousand to several million vectors into Weaviate Cloud, using the code below.
I occasionally get the following error: {'message': 'Failed to send 53 objects in a batch of 53. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
However, both of these are empty.
Is there any way I can improve my error handling to debug the problem? Thank you.
with self.collection.batch.fixed_size(batch_size=1000) as batch:
    for i, data_row in enumerate(property_rows):
        batch.add_object(
            properties=data_row,
            vector={
                "title_vector": title_vectors[i],
                "body_vector": body_vectors[i],
                "keywords_vector": keyword_vectors[i],
            },
            uuid=generate_uuid5(data_row['ext_id'])
        )
    log.info(f"Num Errors: {batch.number_errors}")
log.error(f"Collection Failed objects outside of loop: {self.collection.batch.failed_objects}")
log.error(f"Client Failed objects outside of loop: {self.client.batch.failed_objects}")
Hello @JK_Rider,
I hope you’re having a lovely week!
Let me share my own code on how I handle error catching and messaging (I added your code bits in the loop instead of mine). Please try it out and see if it helps:
try:
    with items.batch.dynamic() as batch:
        for i, data_row in enumerate(property_rows):
            batch.add_object(
                properties=data_row,
                vector={
                    "title_vector": title_vectors[i],
                    "body_vector": body_vectors[i],
                    "keywords_vector": keyword_vectors[i],
                },
                uuid=generate_uuid5(data_row['ext_id'])
            )
    failed_objs_a = items.batch.failed_objects  # Get failed objects
    if failed_objs_a:
        print(f"Number of failed objects in the first batch: {len(failed_objs_a)}")
        for i, failed_obj in enumerate(failed_objs_a, 1):
            print(f"Failed object {i}:")
            print(f"Error message: {failed_obj.message}")
    else:
        print("All objects were successfully added.")
except Exception as e:
    print(f"Error during batch import: {e}")
    print(f"Exception details: {str(e)}")
finally:
    client.close()
Additionally, I’m a fan of dynamic batching. Fixed size works well too, but I recommend using client.batch.dynamic().
Awesome, thanks for the fast response. I’ll give it a try and see if it works.
@Mohamed_Shahin
Hi Mohamed, thanks for the assistance. I was able to get the following error message:
Query call with protocol GRPC batch failed with message <AioRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = “sendmsg: Broken pipe (32)”\n\tdebug_error_string = “UNKNOWN:Error received from peer {created_time:“2024-08-28T18:23:59.686140478+00:00”, grpc_status:14, grpc_message:“sendmsg: Broken pipe (32)”}”\n>.
Do you happen to have any insight into this error?
If additional context helps: this is running in AWS Cloud, and the error does not happen on my local desktop.
Hi @JK_Rider!
Can you paste the full stacktrace?
This can help us identify where this error happens in the code. This error message indicates a connection being blocked or timing out.
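If the failures turn out to be transient connection drops like this, one option is to retry the failing step with exponential backoff. The following is only a minimal sketch under that assumption: `retry_with_backoff` and its parameters are hypothetical names, not part of the Weaviate client, and how it interacts with any retrying the client does internally has not been checked here.

```python
import random
import time


def retry_with_backoff(operation, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call operation() until it succeeds or max_attempts is reached.

    Hypothetical helper for illustration; not a Weaviate API.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:  # narrow this to the transient errors you actually see
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            # Exponential backoff with a little jitter: about 1s, 2s, 4s, ...
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.1f}s")
            sleep(delay)
```

Here `operation` would be a zero-argument function that sends one chunk of objects and raises if the send fails. The `sleep` parameter exists only so the delay can be replaced in tests.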
Hi @DudaNogueira
The error was printed from this part of the code that @Mohamed_Shahin provided:
if failed_objs_a:
    print(f"Number of failed objects in the first batch: {len(failed_objs_a)}")
    for i, failed_obj in enumerate(failed_objs_a, 1):
        print(f"Failed object {i}:")
        print(f"Error message: {failed_obj.message}")
If further information is needed, I’m using the following code to connect to Weaviate Cloud.
import weaviate
from weaviate.classes.init import AdditionalConfig, Timeout

client = weaviate.connect_to_wcs(
    cluster_url=self.url,
    auth_credentials=weaviate.auth.AuthApiKey(self.api_key),
    additional_config=AdditionalConfig(
        timeout=Timeout(init=60, query=120, insert=720)  # Values in seconds
    )
)
The error will occasionally also be:
Error message: WeaviateBatchError(‘Query call with protocol GRPC batch failed with message <AioRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = “recvmsg:Connection reset by peer”\n\tdebug_error_string = “UNKNOWN:Error received from peer {created_time:“2024-08-28T23:03:28.550055837+00:00”, grpc_status:14, grpc_message:“recvmsg:Connection reset by peer”}”\n>.’)
Hi!
Can you try setting the batch size to a smaller number and then experiment by increasing it?
For example:
with self.collection.batch.fixed_size(batch_size=50) as batch:
    ...
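The "start small, then increase" experiment can also be scripted rather than done by hand. This is a rough sketch, not a definitive recipe: `largest_clean_batch_size`, `insert_chunk`, and the candidate sizes are all hypothetical. `insert_chunk` stands for any function you write that sends one chunk and returns how many objects failed.

```python
from typing import Callable, Optional, Sequence


def largest_clean_batch_size(
    rows: Sequence,
    insert_chunk: Callable[[Sequence], int],
    candidate_sizes: Sequence[int] = (50, 100, 200, 500, 1000),
) -> Optional[int]:
    """Try increasing batch sizes and return the largest one with zero failures.

    Each trial uses the next unused slice of rows, so no row is sent twice.
    Returns None if even the smallest size produced failures.
    """
    best = None
    offset = 0
    for size in candidate_sizes:
        chunk = rows[offset:offset + size]
        if len(chunk) < size:
            break  # not enough rows left for a full trial at this size
        failures = insert_chunk(chunk)
        print(f"batch_size={size}: {failures} failed objects")
        if failures:
            break  # stop at the first size that starts failing
        best = size
        offset += size
    return best
```

In this thread's setup, `insert_chunk` could wrap the `fixed_size` block above with `batch_size=len(chunk)` and return the length of `failed_objects` afterwards. Since connection errors are intermittent, a single clean trial per size is only weak evidence, so it may be worth repeating the run a few times.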