Error in Batch Addition

Hi,

I am using the weaviate python client 3.19.2.

I have a method that loops through the rows of a dataframe and saves each row's columns into a Weaviate schema. I am using auto-batching with a batch_size of 10 to achieve this.

The code below works fine if a single API call to the server triggers the save. However, when a scheduler runs on the server whose job is to save data for a number of tenants and triggers this method for each tenant (consecutive calls, no parallel processing of tenants), this error eventually starts showing up in the logs and the method fails with a ConnectionError exception. Once this exception is raised, all further calls to Weaviate get blocked, and the only resolution left is to restart the instance.

" raise RequestsConnectionError("Batch was not added to weaviate.") from conn_err"

I couldn't find anything relevant in the Weaviate pod logs on Kubernetes around the time this exception is raised.

I have tried adding a delay between the save calls that the scheduler makes, but it hasn't helped. The saves go through for a couple of minutes, and then the batch addition error starts showing up.

I have observed this during batch deletion of objects as well.
Can someone please help me with this?

Code Snippet:
# Imports assumed by this snippet; the retry helpers come from the tenacity package.
import logging

import weaviate
from tenacity import retry, wait_fixed, stop_after_delay, before_sleep_log

# read_env, get_weaviate_client and retry_logger are application helpers defined elsewhere.

# Weaviate client
product_sync_client = weaviate.Client(
    url=read_env.getWeaviateEndpoint(),
    additional_headers={
        "X-OpenAI-Api-Key": read_env.getOpenAIKey()
    }
)

# Method to save objects in Weaviate
@retry(wait=wait_fixed(1), stop=stop_after_delay(10), before_sleep=before_sleep_log(retry_logger, logging.DEBUG))
def save_to_weaviate(dataframe, schema_name: str, tenant_org_id: str, type: str, email: str = "",
                     store_name: str = "", source: str = None, trigger: str = "api"):

    weaviate_client = get_weaviate_client(trigger=trigger)

    with weaviate_client.batch(
        batch_size=10  # Specify the batch size
    ) as save_batch:

        # Batch import all rows of the dataframe
        for index, row in dataframe.iterrows():
            properties = {
                "embedding_text": row['completion'],
                "tenant_org_id": tenant_org_id,
                "embedding_type": type,
                "store_name": store_name,
            }
            if email is not None:
                properties.update({"email": email})
            if 'dataset_id' in row.index:
                properties.update({"dataset_id": row['dataset_id']})
            if source is not None:
                properties.update({"source": source})

            save_batch.add_data_object(properties, schema_name)

Hi @pn_9707! Sorry for the delay here. Missed this one :frowning:

Were you able to solve this?

This looks like a client connection error to Weaviate server.

Hi @DudaNogueira,

That’s completely alright :slightly_smiling_face:

No, I haven’t been able to solve this yet.

I have 3 clients defined for 3 different purposes. While this particular 'product_sync_client' raises an exception, the other clients work just fine. So my understanding is that there isn't any issue with the Weaviate instance that is running on Kubernetes.

I have tried adding retry mechanisms and a delay of 35s between each retry. However, it hasn’t helped.

Could you please help me to understand the possible reason for the client connection to fail?

This must be some connectivity issue from this specific client, considering the other two are able to communicate.

Can you get a Python shell in the environment where this client is failing?

If you can, try to initiate the Weaviate client from that shell and query something, like the schema:

client.schema.get()

With that, you can check connectivity and auth. You should get the schema as the output.
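
For example, a minimal check along these lines (the endpoint URL is a placeholder):

import weaviate

# Quick connectivity/auth check; replace the URL with your own endpoint.
client = weaviate.Client(url="http://<your-weaviate-endpoint>")
print(client.schema.get())  # should print the full schema as a dict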

@DudaNogueira
The client responds fine if a single API call to the server triggers the batch save to Weaviate using this client. However, when a scheduler runs on the server whose job is to save data for a number of tenants and triggers this method for each tenant (consecutive calls, no parallel processing of tenants), this error eventually starts showing up in the logs and the client fails to add a batch. Once this exception is raised, all further calls through that Weaviate client get blocked, and the only resolution left is to restart the instance.

Could this be a load issue? Is there a better-optimized way to handle this kind of requirement?

Hey Everyone!

I read the discussion above. It looks like I am hitting the same issue. Everything was working in our prod till yesterday morning!

Now I am getting this error for any file/URL upload. It's a single file upload, but it still hangs.

Here is the Lambda function error. We are running Weaviate in a Docker container.

[ERROR] ConnectionError: Batch was not added to weaviate.
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 61, in lambda_handler
    vectorstore.add_documents(docs[i:i+500])
  File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
    return self.add_texts(texts, metadatas, **kwargs)
  File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
    with self._client.batch as batch:
  File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
    self.flush()
  File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
    self._send_batch_requests(force_wait=True)
  File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
    response_objects, nr_objects = done_future.result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
    response = request_handler(event, lambda_context)
  File "/var/task/lambda_function.py", line 61, in lambda_handler
    vectorstore.add_documents(docs[i:i+500])
  File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
    return self.add_texts(texts, metadatas, **kwargs)
  File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
    with self._client.batch as batch:
  File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
    self.flush()
  File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
    self._send_batch_requests(force_wait=True)
  File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
    response_objects, nr_objects = done_future.result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
    response = request_handler(event, lambda_context)
  File "/var/task/lambda_function.py", line 61, in lambda_handler
    vectorstore.add_documents(docs[i:i+500])
  File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
    return self.add_texts(texts, metadatas, **kwargs)
  File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
    with self._client.batch as batch:
  File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
    self.flush()
  File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
    self._send_batch_requests(force_wait=True)
  File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
    response_objects, nr_objects = done_future.result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
    response = request_handler(event, lambda_context)
  File "/var/task/lambda_function.py", line 61, in lambda_handler
    vectorstore.add_documents(docs[i:i+500])
  File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
    return self.add_texts(texts, metadatas, **kwargs)
  File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
    with self._client.batch as batch:
  File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
    self.flush()
  File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
    self._send_batch_requests(force_wait=True)
  File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
    response_objects, nr_objects = done_future.result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
    response = request_handler(event, lambda_context)
  File "/var/task/lambda_function.py", line 61, in lambda_handler
    vectorstore.add_documents(docs[i:i+500])
  File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
    return self.add_texts(texts, metadatas, **kwargs)
  File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
    with self._client.batch as batch:
  File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
    self.flush()
  File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
    self._send_batch_requests(force_wait=True)
  File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
    response_objects, nr_objects = done_future.result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
    response = request_handler(event, lambda_context)
  File "/var/task/lambda_function.py", line 61, in lambda_handler
    vectorstore.add_documents(docs[i:i+500])
  File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
    return self.add_texts(texts, metadatas, **kwargs)
  File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
    with self._client.batch as batch:
  File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
    self.flush()
  File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
    self._send_batch_requests(force_wait=True)
  File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
    response_objects, nr_objects = done_future.result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/var/lang/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/python/weaviate/batch/crud_batch.py", line 1099, in _flush_in_thread
    response = self._create_data(
  File "/opt/python/weaviate/batch/crud_batch.py", line 742, in _create_data
    raise RequestsConnectionError("Batch was not added to weaviate.") from conn_err

@DudaNogueira Can you please help with this? This is urgent, as our production is down!

Further findings:

When I tested locally, I hit the same error as in the discussion in the thread below.

Hi!

I believe this is a load/scale issue.

Weaviate will "eat up" all the data you throw at it. After ingestion, it does some internal work (indexing, compaction, cleaning, etc.). This stage is very CPU-bound.

Are you running a multi node setup?

Have you played around with ShardingConfig?

I believe that you will need to throttle your import rate or beef up your server with multiple nodes.
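
For example, a client-side throttle sketch with the v3 client (the values here are illustrative, not a recommendation):

# Hedged v3 sketch: a single background worker, a modest batch size, and
# automatic retries on connection errors to ease pressure on the server.
client.batch.configure(
    batch_size=50,
    num_workers=1,
    dynamic=True,  # client adapts the batch size to observed server speed
    connection_error_retries=3,
)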

@Indrajit_CS

How big is this batch? Are you running your own vectorization or using an API? What version are you running?

Have you seen any outstanding logs?

Another option is to increase the timeout_config when initiating the client. Its default is (2, 20): 2 seconds to connect and 20 seconds to read.
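
For example (endpoint placeholder; values illustrative):

client = weaviate.Client(
    url="http://<your-weaviate-endpoint>",
    timeout_config=(5, 60),  # 5 seconds to connect, 60 seconds to read
)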

Your indexing or vectorization is probably taking too long, and the client is raising a request connection error. If that is the case, the question is: why is it taking that long to process a batch?

Also, have you seen this doc on how to do error handling in the Python client v3?

This may give us more info.
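
For reference, a minimal v3 sketch of that pattern, surfacing per-object errors via a batch callback:

# Log any per-object errors from each batch response.
def check_batch_result(results):
    if results is not None:
        for result in results:
            if "result" in result and "errors" in result["result"]:
                if "error" in result["result"]["errors"]:
                    print(result["result"]["errors"])

client.batch.configure(callback=check_batch_result)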

Going forward, upgrading to Weaviate 1.23 and using the new Python client v4 with it, which leverages the gRPC connection (gRPC | Weaviate - Vector Database), async indexing, and PQ, might help you, as it will improve imports and startup time while reducing the memory footprint.

Let me know if that helps.

I don't think it has anything to do with batching; it could simply be a memory or availability issue in AWS. We are checking whether it's a network issue.
As we are using Weaviate from the AWS Marketplace, it uses the pod autoscaling feature, which makes it difficult to troubleshoot as well.

For now we have deployed another Weaviate prod instance in a different region, which is working fine, but I'm not sure if the issue will come back.

To answer your queries above:

Yes, this is a multi-node setup, and we have not yet explored ShardingConfig.

How big is this batch? Are you running your own vectorization or using an API? What version are you running?

We are just getting started, so there is no real load as of now. Vectorization is through API calls, which we checked are working fine. The Weaviate version is 3.26.0.

Have you seen any outstanding logs?

This is the error I get when I try the URL locally with some sample code:

TimeoutError                              Traceback (most recent call last)
File /opt/homebrew/anaconda3/envs/Imagingeview_prod/lib/python3.10/site-packages/urllib3/connectionpool.py:467, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    463         except BaseException as e:
    464             # Remove the TypeError from the exception chain in
    465             # Python 3 (including for exceptions like SystemExit).
    466             # Otherwise it looks like a bug in the code.
--> 467             six.raise_from(e, None)
    468 except (SocketTimeout, BaseSSLError, SocketError) as e:

File <string>:3, in raise_from(value, from_value)

File /opt/homebrew/anaconda3/envs/Imagingeview_prod/lib/python3.10/site-packages/urllib3/connectionpool.py:462, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    461 try:
--> 462     httplib_response = conn.getresponse()
    463 except BaseException as e:
    464     # Remove the TypeError from the exception chain in
    465     # Python 3 (including for exceptions like SystemExit).
    466     # Otherwise it looks like a bug in the code.

File /opt/homebrew/anaconda3/envs/Imagingeview_prod/lib/python3.10/http/client.py:1375, in HTTPConnection.getresponse(self)
   1374 try:
-> 1375     response.begin()
   1376 except ConnectionError:
...
--> 532     raise ReadTimeout(e, request=request)
    533 elif isinstance(e, _InvalidHeader):
    534     raise InvalidHeader(e, request=request)

ReadTimeout: HTTPConnectionPool(host='a6XXXXXXXXXXXXX8-1246XXX1.us-XXXX-1.elb.amazonaws.com', port=80): Read timed out. (read timeout=15)

The error changes when I try, say, a 120-second read timeout:

File /opt/homebrew/anaconda3/envs/Imagingeview_prod/lib/python3.10/site-packages/urllib3/connectionpool.py:467, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    463         except BaseException as e:
    464             # Remove the TypeError from the exception chain in
    465             # Python 3 (including for exceptions like SystemExit).
    466             # Otherwise it looks like a bug in the code.
--> 467             six.raise_from(e, None)
    468 except (SocketTimeout, BaseSSLError, SocketError) as e:
...
--> 147     raise RequestsConnectionError("Object was not added to Weaviate.") from conn_err
    148 if response.status_code == 200:
    149     return str(response.json()["id"])

ConnectionError: Object was not added to Weaviate.


I have already tried increasing the timeout, but no luck :slight_smile:

#client = Client("http://a33XXXXXXXXXXXXXXXXXXXXX022113610.us-XXX-2.elb.amazonaws.com", timeout_config=(5, 360))

I am attaching a screenshot showing that the newly deployed URL works and doesn't throw the above error:

I'm not sure if it's a memory issue, as we have two EC2 instances (r6i.large) with 16 GB of memory and 2 vCPUs each. Also, there is practically no load.

Looking forward to your suggestions to prevent this issue in future.

@DudaNogueira Did you get a chance to look at it?

This looks like OpenAI taking too much time to return the vectors.

Can you try running a vectorization command on that very same server and compare it with running it on a different computer, or even on your own desktop?
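
For example, something along these lines (pre-1.0 openai package assumed; the model name is illustrative):

import time
import openai  # assumes OPENAI_API_KEY is set in the environment

# Time one embedding call; run this on the server and on your desktop to compare.
start = time.perf_counter()
resp = openai.Embedding.create(model="text-embedding-ada-002", input="latency test")
vector = resp["data"][0]["embedding"]
print(f"{len(vector)}-dim vector returned in {time.perf_counter() - start:.2f}s")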

I'm running into this problem when testing a Weaviate client program. I'm running against a local Docker container with a persistent volume and a hostname that is constant from the container's own perspective.

I run several "jobs" (each consisting of some small record-keeping writes plus a lot of batch writes to Weaviate). These work OK.

Then, while the next job's batch-heavy portion is ongoing, I kill the Docker container. My client-side Python program (weaviate-client package v3.x) eventually gives up.

I start a few more jobs; they all fail.

I then restart the persistent-volume Weaviate Docker container. >> At this point, for the next job, the small non-batch up-front records go through just fine, but the batch-heavy piece using the Weaviate batch facilities fails with the message "Batch was not added to weaviate" <<. This keeps going wrong on each subsequent job.

I suspect that re-allocating the weaviate-client when running into this error will fix the problem on the next try. (I'll try this solution soon.)

UPDATE: yes, I tried the solution. To detect the error, in the batch-heavy portion of the job I have this in an exception handler ...

        except Exception as exc6:
            # RequestsPackageConnectionError: requests.exceptions.ConnectionError, imported under an alias
            if isinstance(exc6, RequestsPackageConnectionError) and "batch was not added" in str(exc6).lower():
                self.weaviate_client_hit_batch_error = True
            ....

... and then at some other checkpoints, if I see that self.weaviate_client_hit_batch_error is True, I reallocate the weaviate-client.
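
Roughly like this (make_weaviate_client is a placeholder for however the client gets built):

        # At a checkpoint: if a batch error was seen, build a fresh client.
        if self.weaviate_client_hit_batch_error:
            self.weaviate_client = make_weaviate_client()  # placeholder factory
            self.weaviate_client_hit_batch_error = False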

It seems this problem is solved.

Hi @Nathan_Watson !

Sorry for the delay here.

We strongly suggest upgrading to the Python client v4 (as well as Weaviate 1.24.x), as the batch ingestion is way better: not only does it leverage gRPC, it also offers dynamic batching, where the client adjusts the batch size according to the server load.

Also, be aware that the Python v4 package includes the v3 API, which means you can upgrade the package dependency in your project, keep using your legacy code, and migrate to the v4 API step by step.
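
For reference, a minimal v4 sketch of dynamic batching (the local connection and collection name are placeholders):

import weaviate

client = weaviate.connect_to_local()  # or connect_to_custom(...) for a remote cluster
try:
    with client.batch.dynamic() as batch:  # batch size adapts to server load
        batch.add_object(collection="Product", properties={"embedding_text": "example"})
finally:
    client.close()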

Let me know if this helps. :slight_smile:

Hi @DudaNogueira ,

Sorry for bringing back this thread, but we recently set up a Weaviate cluster on version 1.24.4. Whenever I connect using the Python v4 client, I cannot query or push data to the DB. It returns errors related to gRPC. Do you have any idea?

Hi @viethungluu!

Can you please open a new thread and specify the errors and related information?

If possible, paste the entire error stack so we can try to identify it better.

Thanks!