I am creating a Nameko service for ingesting data into a Weaviate collection. For this I am using the insert_many() function.
The code works fine if I run it directly from a Python file, but when it is run as a Nameko service I get the following error:
Error adding data to vectorstore. error: Query call with protocol GRPC batch failed with message invalid param ‘objects’: cannot be empty, need at least one object for batching.
Traceback (most recent call last):
File “/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/weaviate/collections/batch/grpc_batch_objects.py”, line 137, in __send_batch
res, _ = self._connection.grpc_stub.BatchObjects.with_call(
File “/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/grpc/_channel.py”, line 1194, in with_call
return _end_unary_response_blocking(state, call, True, None)
File “/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/grpc/_channel.py”, line 1006, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = “invalid param ‘objects’: cannot be empty, need at least one object for batching”
debug_error_string = “UNKNOWN:Error received from peer {grpc_message:“invalid param 'objects': cannot be empty, need at least one object for batching”, grpc_status:2, created_time:“2024-05-07T17:45:10.503954978+05:30”}”
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File “/home/prachi/code/gooru_github_repos/utils-gen-ai/./api/processors/weaviate_ingest_data.py”, line 220, in update_vectorstore_crosswalk_with_crossreference
response = comp_desc.data.insert_many(data_objects)
File “/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/weaviate/collections/data.py”, line 413, in insert_many
return self._batch_grpc.objects(
File “/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/weaviate/collections/batch/grpc_batch_objects.py”, line 97, in objects
errors = self.__send_batch(weaviate_objs, timeout=timeout)
File “/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/weaviate/collections/batch/grpc_batch_objects.py”, line 151, in __send_batch
raise WeaviateBatchError(e.details()) # pyright: ignore
weaviate.exceptions.WeaviateBatchError: Query call with protocol GRPC batch failed with message invalid param ‘objects’: cannot be empty, need at least one object for batching.
The code that I am using is as follows:
class weaviate_vectorstore:
    """Wrapper around a Weaviate v4 client used to ingest CSV rows with cross-references."""

    def __init__(self):
        """Build the Weaviate client from the configured connection parameters.

        Any error raised while building the client is logged instead of being
        propagated; in that case ``self.client`` stays ``None``.
        """
        # Assigned up front so other methods can detect a failed construction
        # instead of hitting AttributeError on a missing attribute.
        self.client = None
        try:
            connection = weaviate_db_url()
            self.client = weaviate.WeaviateClient(
                connection_params=ConnectionParams.from_params(
                    http_host=connection[0],
                    http_port=connection[1],
                    http_secure=False,
                    grpc_host=connection[2],
                    grpc_port=connection[3],
                    grpc_secure=False,
                ),
                additional_headers={
                    "X-OpenAI-Api-Key": openai_key()
                },
                additional_config=AdditionalConfig(
                    timeout=Timeout(init=2, query=45, insert=240)
                ),
                skip_init_checks=True,
            )
        except Exception as e:
            logger.error(f"Error connecting to vectorstore. error: {str(e)}")
            logger.error(traceback.format_exc())

    def update_vectorstore_crosswalk_with_crossreference(self, base_collection_name, collection_name, csv_file_path):
        """Insert the rows of a CSV file into ``collection_name`` with ``has_code`` references.

        The CSV is read in chunks of 200 rows. For every chunk, the objects of
        ``base_collection_name`` whose ``tx_comp_code`` matches one of the
        chunk's codes are fetched; each row is then inserted with references to
        the fetched objects that share its ``tx_comp_name``.

        Args:
            base_collection_name: Collection queried for the reference targets.
            collection_name: Collection that receives the new objects.
            csv_file_path: CSV file with the columns ``tx_comp_code``,
                ``tx_comp_name`` and ``tx_comp_desc``.

        Returns:
            None. Errors are logged, not raised.
        """
        if self.client is None:
            # __init__ failed and already logged the cause; there is nothing to
            # connect or close.
            logger.error("Error adding data to vectorstore. error: client is not initialised")
            return

        try:
            self.client.connect()
            comp_desc = self.client.collections.get(collection_name)
            comp_codes = self.client.collections.get(base_collection_name)
            counter = 0
            with pd.read_csv(
                csv_file_path,
                chunksize=200,  # number of rows per chunk
            ) as csv_iterator:
                for chunk in csv_iterator:
                    if chunk.empty:
                        # Nothing to insert; sending an empty batch is rejected
                        # by the server, so skip the chunk entirely.
                        logger.warning("Skipping empty CSV chunk.")
                        continue

                    chunk = chunk.fillna("Null")

                    # The filter expects a plain list, not a pandas Series;
                    # duplicates add nothing to a contains_any filter.
                    tx_comp_codes = chunk["tx_comp_code"].unique().tolist()
                    codes_object = comp_codes.query.fetch_objects(
                        filters=wvc.query.Filter.by_property("tx_comp_code").contains_any(tx_comp_codes),
                        limit=300,
                    )

                    # Index the fetched objects by name once per chunk so each
                    # row is a dict lookup rather than a scan over all objects.
                    uuids_by_name = {}
                    for fetched in codes_object.objects:
                        uuids_by_name.setdefault(
                            fetched.properties["tx_comp_name"], []
                        ).append(str(fetched.uuid))

                    data_objects = []
                    for _, row in chunk.iterrows():
                        ref_uuid = list(uuids_by_name.get(row["tx_comp_name"], []))
                        data_objects.append(
                            wvc.data.DataObject(
                                properties={
                                    "tx_comp_name": row["tx_comp_name"],
                                    "tx_comp_desc": row["tx_comp_desc"],
                                },
                                references={"has_code": ref_uuid},
                            )
                        )
                        counter += 1
                        print(counter)

                    response = comp_desc.data.insert_many(data_objects)
                    print(response)
                    # insert_many reports per-object failures in the response
                    # rather than raising, so surface them explicitly.
                    if response.has_errors:
                        logger.error(
                            f"insert_many failed for {len(response.errors)} objects: {response.errors}"
                        )
                    print(f"Imported {counter} objects...")
            print(f"Finished importing {counter} objects.")
        except Exception as e:
            logger.error(f"Error adding data to vectorstore. error: {str(e)}")
            logger.error(traceback.format_exc())
        finally:
            self.client.close()
How can I resolve this issue?