I have a method that loops through the rows of a dataframe and saves the columns of the dataframe into a Weaviate schema. I am using auto-batching with a batch_size of 10 to achieve this.
The code below works fine if a single API call to the server triggers the save. However, when a scheduler runs on the server whose job is to save data for a number of tenants and triggers this method for all the tenants (consecutive calls, no parallel processing of tenants), eventually this error starts showing up in the logs and the method fails with a ConnectionError exception. Once this exception is raised, all further calls to Weaviate get blocked and the only resolution left is to restart the instance.
" raise RequestsConnectionError("Batch was not added to weaviate.") from conn_err"
I couldn’t find anything relevant in the Weaviate Kubernetes/Docker logs around the time this exception is raised.
I have tried adding a delay between all the save calls that the scheduler makes, but it hasn’t helped. The saves go through for a couple of minutes and then the batch-addition error starts showing up.
I have observed this during batch deletion of objects as well.
Can someone please help me with this?
# Method to save objects in Weaviate
@retry(wait=wait_fixed(1), stop=stop_after_delay(10),
       before_sleep=before_sleep_log(retry_logger, logging.DEBUG))
def save_to_weaviate(dataframe, schema_name: str, tenant_org_id: str, type: str,
                     email: str = "", store_name: str = "", source: str = None,
                     trigger: str = "api"):
    weaviate_client = get_weaviate_client(trigger=trigger)
    with weaviate_client.batch(
        batch_size=10  # Specify the batch size
    ) as save_batch:
        # Batch import all questions
        for index, row in dataframe.iterrows():
            properties = {
                "embedding_text": row['completion'],
                "tenant_org_id": tenant_org_id,
                "embedding_type": type,
                "store_name": store_name,
            }
            if email is not None:
                properties["email"] = email
            if 'dataset_id' in row.index:
                properties["dataset_id"] = row['dataset_id']
            if source is not None:
                properties["source"] = source
            save_batch.add_data_object(properties, schema_name)
I have 3 clients defined for 3 different purposes. While this particular ‘product_sync_client’ raises an exception, the other clients work just fine. So, my understanding is that there isn’t any issue with the weaviate instance that is running on Kubernetes.
I have tried adding retry mechanisms and a delay of 35s between each retry. However, it hasn’t helped.
Could you please help me to understand the possible reason for the client connection to fail?
@DudaNogueira
The client responds fine if a single API call to the server triggers the batch save to Weaviate using this client. However, when a scheduler runs on the server whose job is to save data for a number of tenants and triggers this method for all the tenants (consecutive calls, no parallel processing of tenants), eventually this error starts showing up in the logs and the client fails to add a batch. Once this exception is raised, all further calls to that Weaviate client get blocked and the only resolution left is to restart the instance.
Could this be a load issue? Is there a better-optimized way to handle this type of requirement?
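One pattern worth trying (a sketch, not from the original post): process the tenants sequentially and rebuild the client whenever a connection error surfaces, so one failed batch does not poison every later call. `make_client` and `save_tenant` below are hypothetical stand-ins for `get_weaviate_client()` and the save method above.

```python
# Sketch: run per-tenant saves sequentially, rebuilding the client after a
# connection failure so later tenants are not blocked by a dead connection.
def save_all_tenants(tenant_ids, make_client, save_tenant, max_retries=2):
    client = make_client()
    results = {}
    for tenant_id in tenant_ids:
        for attempt in range(max_retries + 1):
            try:
                save_tenant(client, tenant_id)
                results[tenant_id] = "ok"
                break
            except ConnectionError:
                # Drop the (possibly wedged) client and start fresh.
                client = make_client()
        else:
            results[tenant_id] = "failed"
    return results
```

The point of the sketch is only the recovery structure: the client object is treated as disposable, so a wedged connection never outlives the tenant that hit it.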
Read the discussion above. It looks like I am hitting the same issue. Everything was working in our prod until yesterday morning!
Now I am getting this error for any file/URL upload. It’s a single file upload, but it still hangs.
Here is the Lambda function error. We are running Weaviate in a Docker container.
[ERROR] ConnectionError: Batch was not added to weaviate.
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 61, in lambda_handler
vectorstore.add_documents(docs[i:i+500])
File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
return self.add_texts(texts, metadatas, **kwargs)
File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
with self._client.batch as batch:
File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
self.flush()
File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
self._send_batch_requests(force_wait=True)
File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
response_objects, nr_objects = done_future.result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
response = request_handler(event, lambda_context)
File "/var/task/lambda_function.py", line 61, in lambda_handler
vectorstore.add_documents(docs[i:i+500])
File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
return self.add_texts(texts, metadatas, **kwargs)
File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
with self._client.batch as batch:
File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
self.flush()
File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
self._send_batch_requests(force_wait=True)
File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
response_objects, nr_objects = done_future.result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
response = request_handler(event, lambda_context)
File "/var/task/lambda_function.py", line 61, in lambda_handler
vectorstore.add_documents(docs[i:i+500])
File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
return self.add_texts(texts, metadatas, **kwargs)
File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
with self._client.batch as batch:
File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
self.flush()
File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
self._send_batch_requests(force_wait=True)
File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
response_objects, nr_objects = done_future.result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
response = request_handler(event, lambda_context)
File "/var/task/lambda_function.py", line 61, in lambda_handler
vectorstore.add_documents(docs[i:i+500])
File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
return self.add_texts(texts, metadatas, **kwargs)
File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
with self._client.batch as batch:
File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
self.flush()
File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
self._send_batch_requests(force_wait=True)
File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
response_objects, nr_objects = done_future.result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
response = request_handler(event, lambda_context)
File "/var/task/lambda_function.py", line 61, in lambda_handler
vectorstore.add_documents(docs[i:i+500])
File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
return self.add_texts(texts, metadatas, **kwargs)
File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
with self._client.batch as batch:
File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
self.flush()
File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
self._send_batch_requests(force_wait=True)
File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
response_objects, nr_objects = done_future.result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/var/runtime/awslambdaric/bootstrap.py", line 186, in handle_event_request
response = request_handler(event, lambda_context)
File "/var/task/lambda_function.py", line 61, in lambda_handler
vectorstore.add_documents(docs[i:i+500])
File "/opt/python/langchain/schema/vectorstore.py", line 122, in add_documents
return self.add_texts(texts, metadatas, **kwargs)
File "/opt/python/langchain/vectorstores/weaviate.py", line 145, in add_texts
with self._client.batch as batch:
File "/opt/python/weaviate/batch/crud_batch.py", line 1646, in __exit__
self.flush()
File "/opt/python/weaviate/batch/crud_batch.py", line 1252, in flush
self._send_batch_requests(force_wait=True)
File "/opt/python/weaviate/batch/crud_batch.py", line 1151, in _send_batch_requests
response_objects, nr_objects = done_future.result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/var/lang/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/var/lang/lib/python3.10/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/opt/python/weaviate/batch/crud_batch.py", line 1099, in _flush_in_thread
response = self._create_data(
File "/opt/python/weaviate/batch/crud_batch.py", line 742, in _create_data
raise RequestsConnectionError("Batch was not added to weaviate.") from conn_err
@DudaNogueira Can you please help with this? This is urgent, as our production is down!
Further findings:
When I tested locally, I hit the same error as in the discussion in the thread below.
Weaviate will “eat up” all the data you throw at it. After ingestion, it will do some internal work (indexing, compaction, cleaning, etc.). This stage is very CPU-bound.
How big is this batch? Are you running your own vectorization or using an API? What version are you running?
Have you seen any outstanding logs?
Another option is to increase the timeout_config when initiating the client. Its default is (2, 20): 2 seconds to connect and 20 seconds to read.
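To make the tuple’s meaning explicit, here is a minimal sketch; the URL is a placeholder, and the `client_kwargs` helper is just illustrative, not part of the client library:

```python
# (connect_timeout, read_timeout) in seconds; the v3 client default is (2, 20).
def client_kwargs(url, connect_timeout=2, read_timeout=120):
    return {"url": url, "timeout_config": (connect_timeout, read_timeout)}

# Usage (requires a running Weaviate instance):
# import weaviate
# client = weaviate.Client(**client_kwargs("http://localhost:8080"))
```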
Your indexing or vectorization is probably taking too much time, and the client is raising a request connection error. If that is the case, the question is: why is it taking that long to process a batch?
Also, have you seen this doc on how to do error handling in python client v3?
This may give us more info.
Going forward, upgrading to Weaviate 1.23 and using the new Python client v4 with it, which leverages the gRPC connection, async indexing, and PQ, might help you, as it will improve imports and startup time while reducing the memory footprint.
I don’t think it has anything to do with batching; it could simply be a memory issue or an availability issue in AWS. We are checking whether it’s a network issue.
As we are using Weaviate from the AWS Marketplace, it uses the pod autoscaling feature, which makes it difficult to troubleshoot as well.
For now, we have deployed another Weaviate prod instance in a different region, which is working fine, but we are not sure whether the issue will come back.
To answer your queries above:
Yes, this is a multi-node setup, and we have not yet explored sharding configs.
How big is this batch? Are you running your own vectorization or using an API? What version are you running?
We are just getting started, so there is no load as of now. Vectorization is through API calls, which we checked are working fine. The Weaviate version is 3.26.0.
Have you seen any outstanding logs?
This is the error I get when I try the URL locally in a sample script:
TimeoutError Traceback (most recent call last)
File /opt/homebrew/anaconda3/envs/Imagingeview_prod/lib/python3.10/site-packages/urllib3/connectionpool.py:467, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
463 except BaseException as e:
464 # Remove the TypeError from the exception chain in
465 # Python 3 (including for exceptions like SystemExit).
466 # Otherwise it looks like a bug in the code.
--> 467 six.raise_from(e, None)
468 except (SocketTimeout, BaseSSLError, SocketError) as e:
File <string>:3, in raise_from(value, from_value)
File /opt/homebrew/anaconda3/envs/Imagingeview_prod/lib/python3.10/site-packages/urllib3/connectionpool.py:462, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
461 try:
--> 462 httplib_response = conn.getresponse()
463 except BaseException as e:
464 # Remove the TypeError from the exception chain in
465 # Python 3 (including for exceptions like SystemExit).
466 # Otherwise it looks like a bug in the code.
File /opt/homebrew/anaconda3/envs/Imagingeview_prod/lib/python3.10/http/client.py:1375, in HTTPConnection.getresponse(self)
1374 try:
-> 1375 response.begin()
1376 except ConnectionError:
...
--> 532 raise ReadTimeout(e, request=request)
533 elif isinstance(e, _InvalidHeader):
534 raise InvalidHeader(e, request=request)
ReadTimeout: HTTPConnectionPool(host='a6XXXXXXXXXXXXX8-1246XXX1.us-XXXX-1.elb.amazonaws.com', port=80): Read timed out. (read timeout=15)
The error changes when I try, say, a 120-second read-timeout value:
File /opt/homebrew/anaconda3/envs/Imagingeview_prod/lib/python3.10/site-packages/urllib3/connectionpool.py:467, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
463 except BaseException as e:
464 # Remove the TypeError from the exception chain in
465 # Python 3 (including for exceptions like SystemExit).
466 # Otherwise it looks like a bug in the code.
--> 467 six.raise_from(e, None)
468 except (SocketTimeout, BaseSSLError, SocketError) as e:
...
--> 147 raise RequestsConnectionError("Object was not added to Weaviate.") from conn_err
148 if response.status_code == 200:
149 return str(response.json()["id"])
ConnectionError: Object was not added to Weaviate.
This looks like OpenAI taking too much time to return the vector.
Can you try running a vectorization command inside that very same server and compare it with running it on a different computer, or even on your own desktop?
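A quick way to make that comparison concrete is to time a single vectorization call in both environments. A minimal stdlib sketch; the `embed` function in the usage comment is a hypothetical stand-in for whatever issues the vectorization request:

```python
import time

def time_call(fn, *args, **kwargs):
    # Return the call's result together with its wall-clock latency in seconds.
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start

# Usage sketch:
# _, seconds = time_call(embed, "some sample text")
# print(f"vectorization took {seconds:.2f}s")
```

If the same call is markedly slower from inside the server, the bottleneck is the network path or CPU contention on that host rather than the vectorizer itself.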
I’m running into this problem while testing a Weaviate client program. I’m running against a local Docker container with a persistent volume and a hostname that is constant from the container’s own perspective.
I run several “jobs” (each consisting of a few small record-keeping writes plus a lot of batch writes to Weaviate). These work OK.
Then, while the next job’s batch-heavy portion is ongoing, I kill the Docker container. My client-side Python Weaviate client (weaviate-client package v3.x) eventually gives up.
I start a few more jobs, they all fail.
I then restart the persistent-volume Weaviate Docker container. At this point, for the next job, the small non-batch up-front records go through just fine, but the batch-heavy piece using the Weaviate batch facilities fails with the message “Batch was not added to weaviate”. This keeps going wrong on each subsequent job.
I suspect that reallocating the weaviate-client client on the next try, after running into this error, will fix the problem. (I’ll try this solution soon.)
UPDATE: yes, I tried the solution. To detect the error, in the batch-heavy portion of the job I have this in an exception handler …
except Exception as exc6:
    if isinstance(exc6, RequestsPackageConnectionError) and "batch was not added" in str(exc6).lower():
        self.weaviate_client_hit_batch_error = True
    ....
… and then at some other checkpoints, if I see that self.weaviate_client_hit_batch_error is True, I reallocate the weaviate-client.
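The flag-and-checkpoint logic described above can be sketched as a small holder class. The names here (`WeaviateClientHolder`, `ensure_client`, `client_factory`) are hypothetical; the idea is only that the flag set in the exception handler forces a fresh client at the next checkpoint:

```python
class WeaviateClientHolder:
    # Holds the current client and rebuilds it once the batch-error flag is set.
    def __init__(self, client_factory):
        self._factory = client_factory
        self._client = client_factory()
        self.weaviate_client_hit_batch_error = False

    def ensure_client(self):
        # Checkpoint: reallocate the client if a batch error was recorded.
        if self.weaviate_client_hit_batch_error:
            self._client = self._factory()
            self.weaviate_client_hit_batch_error = False
        return self._client
```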
We strongly suggest upgrading to the Python client v4 (as well as Weaviate 1.24.x), as batch ingestion is much better: not only does it leverage gRPC, it also offers dynamic batching, where the client adjusts the batch size according to the server load.
Also, be aware that the Python v4 package includes the v3 API, which means you can upgrade the package dependency in your project, keep using your legacy code, and migrate to v4 step by step.
Sorry for bringing back the thread, but we recently hosted a Weaviate cluster, version 1.24.4. Whenever I connect using the Python v4 client, I cannot query or push data to the DB. It returns errors related to gRPC. Do you have any idea?