Inconsistent errors for weaviate batchInsert

I’m currently trying to do a batch insert of several thousand to million vectors for Weaviate cloud, and I am using the following code.

I am occasionally getting the following error {‘message’: ‘Failed to send 53 objects in a batch of 53. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.’}
however, both of these are empty.

Is there any way I can improve my error handling to debug the problem? Thank you,

     with self.collection.batch.fixed_size(batch_size=1000) as batch:
        for i, data_row in enumerate(property_rows):
            batch.add_object(
                properties=data_row,
                vector={
                    "title_vector": title_vectors[i],
                    "body_vector": body_vectors[i],
                    "keywords_vector": keyword_vectors[i],
                },
                uuid=generate_uuid5(data_row['ext_id'])
            )
        log.info(f"Num Errors: {batch.number_errors}")
        log.error(f"Collection Failed objects outside of loop: {self.collection.batch.failed_objects}")
        log.error(f"Client Failed objects outside of loop: {self.client.batch.failed_objects}")

Hello @JK_Rider,

I hope you having a lovely week!

Let me share my own code on how I handle error catching and messaging (I added your code bits in the loop instead of mine). Please try it out and see if it helps:

try: 
    with items.batch.dynamic() as batch:
            for i, data_row in enumerate(property_rows):
            batch.add_object(
                properties=data_row,
                vector={
                    "title_vector": title_vectors[i],
                    "body_vector": body_vectors[i],
                    "keywords_vector": keyword_vectors[i],
                },
                uuid=generate_uuid5(data_row['ext_id'])
            )

    failed_objs_a = items.batch.failed_objects  # Get failed objects
    if failed_objs_a:
        print(f"Number of failed objects in the first batch: {len(failed_objs_a)}")
        for i, failed_obj in enumerate(failed_objs_a, 1):
            print(f"Failed object {i}:")
            print(f"Error message: {failed_obj.message}")
    else:
        print("All objects were successfully added.")
except Exception as e:
    print(f"Error during batch import: {e}")
    print(f"Exception details: {str(e)}")
finally:
     client.close()

Additionally, I’m a fan of allowing dynamic behavior in batching, but fixed size works well - I recommend using client.batch.dynamic().

1 Like

Awesome, thanks for the fast response. I’ll give it a try and see if it works.

1 Like

@Mohamed_Shahin
Hi Mohamed, Thanks for the assistance I was able to get the following error message.

Query call with protocol GRPC batch failed with message <AioRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = “sendmsg: Broken pipe (32)”\n\tdebug_error_string = “UNKNOWN:Error received from peer {created_time:“2024-08-28T18:23:59.686140478+00:00”, grpc_status:14, grpc_message:“sendmsg: Broken pipe (32)”}”\n>.

Do you happen to have any insight on this error.

If Additional Insights helps: This is running in AWS Cloud, this error does not happen on my local desktop.

hi @JK_Rider !!

Can you paste the full stacktrace?

This can help us identify where this error happens in the code. This error message indicates a connection being blocked or timing out.

Hi @DudaNogueira
The error was printed out from this part of the code @Mohamed_Shahin , provided. if failed_objs_a:
print(f"Number of failed objects in the first batch: {len(failed_objs_a)}“)
for i, failed_obj in enumerate(failed_objs_a, 1):
print(f"Failed object {i}:”)
print(f"Error message: {failed_obj.message}")

If further information is needed, I’m using the following code to connect to Weaviate Cloud.

client = weaviate.connect_to_wcs(
cluster_url=self.url,
auth_credentials=weaviate.auth.AuthApiKey(self.api_key),
additional_config=AdditionalConfig(
timeout=Timeout(init=60, query=120, insert=720) # Values in seconds
)
the error will occasionally also be

Error message: WeaviateBatchError(‘Query call with protocol GRPC batch failed with message <AioRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = “recvmsg:Connection reset by peer”\n\tdebug_error_string = “UNKNOWN:Error received from peer {created_time:“2024-08-28T23:03:28.550055837+00:00”, grpc_status:14, grpc_message:“recvmsg:Connection reset by peer”}”\n>.’)

Hi!

Can you try setting the batch size to a smaller number and then experiment by increasing it?

for example:

with self.collection.batch.fixed_size(batch_size=50) as batch:
...