Error while using the insert_many() function inside a nameko API

I am creating a nameko service for ingesting data into a Weaviate collection, using the insert_many() function.
The code works fine if I run it directly from a Python file, but when it runs as a nameko service I get the following error:

Error adding data to vectorstore. error: Query call with protocol GRPC batch failed with message invalid param 'objects': cannot be empty, need at least one object for batching.
Traceback (most recent call last):
  File "/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/weaviate/collections/batch/grpc_batch_objects.py", line 137, in __send_batch
    res, _ = self._connection.grpc_stub.BatchObjects.with_call(
  File "/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/grpc/_channel.py", line 1194, in with_call
    return _end_unary_response_blocking(state, call, True, None)
  File "/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "invalid param 'objects': cannot be empty, need at least one object for batching"
    debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"invalid param 'objects': cannot be empty, need at least one object for batching", grpc_status:2, created_time:"2024-05-07T17:45:10.503954978+05:30"}"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/prachi/code/gooru_github_repos/utils-gen-ai/./api/processors/weaviate_ingest_data.py", line 220, in update_vectorstore_crosswalk_with_crossreference
    response = comp_desc.data.insert_many(data_objects)
  File "/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/weaviate/collections/data.py", line 413, in insert_many
    return self._batch_grpc.objects(
  File "/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/weaviate/collections/batch/grpc_batch_objects.py", line 97, in objects
    errors = self.__send_batch(weaviate_objs, timeout=timeout)
  File "/home/prachi/code/gooru_github_repos/utils-gen-ai/venv-crosswalk/lib/python3.8/site-packages/weaviate/collections/batch/grpc_batch_objects.py", line 151, in __send_batch
    raise WeaviateBatchError(e.details())  # pyright: ignore
weaviate.exceptions.WeaviateBatchError: Query call with protocol GRPC batch failed with message invalid param 'objects': cannot be empty, need at least one object for batching.

The code that I am using is as follows:

import logging
import traceback

import pandas as pd
import weaviate
import weaviate.classes as wvc
from weaviate.classes.init import AdditionalConfig, Timeout
from weaviate.connect import ConnectionParams

# weaviate_db_url() and openai_key() are helpers from our config module
logger = logging.getLogger(__name__)


class weaviate_vectorstore:

    def __init__(self):
        try:
            connection = weaviate_db_url()
            self.client = weaviate.WeaviateClient(
                connection_params=ConnectionParams.from_params(
                    http_host=connection[0],
                    http_port=connection[1],
                    http_secure=False,
                    grpc_host=connection[2],
                    grpc_port=connection[3],
                    grpc_secure=False,
                ),
                additional_headers={
                    "X-OpenAI-Api-Key": openai_key()
                },
                additional_config=AdditionalConfig(
                    timeout=Timeout(init=2, query=45, insert=240)
                ),
                skip_init_checks=True,
            )
        except Exception as e:
            logger.error(f"Error connecting to vectorstore. error: {str(e)}")
            logger.error(traceback.format_exc())

    def update_vectorstore_crosswalk_with_crossreference(self, base_collection_name, collection_name, csv_file_path):
        try:
            self.client.connect()
            comp_desc = self.client.collections.get(collection_name)
            comp_codes = self.client.collections.get(base_collection_name)
            counter = 0

            with pd.read_csv(
                csv_file_path,
                chunksize=200,  # number of rows per chunk
            ) as csv_iterator:

                # Iterate through the dataframe chunks and add each CSV record to the batch
                for chunk in csv_iterator:
                    data_objects = list()
                    ref_uuids = {}
                    chunk.fillna("Null", inplace=True)
                    tx_comp_codes = chunk["tx_comp_code"]
                    codes_object = comp_codes.query.fetch_objects(
                        filters=wvc.query.Filter.by_property("tx_comp_code").contains_any(tx_comp_codes),
                        limit=300,
                    )
                    ref_uuids = {str(obj.uuid): obj.properties["tx_comp_name"] for obj in codes_object.objects}
                    # print(ref_uuids)

                    for idx, row in chunk.iterrows():
                        ref_uuid = [key for key, val in ref_uuids.items() if val == row["tx_comp_name"]]
                        data_object = wvc.data.DataObject(
                            properties={
                                "tx_comp_name": row["tx_comp_name"],
                                "tx_comp_desc": row["tx_comp_desc"],
                            },
                            references={"has_code": ref_uuid},
                        )
                        data_objects.append(data_object)

                        counter += 1
                        print(counter)

                    # print(data_objects)
                    if not data_objects:
                        raise ValueError("The 'objects' parameter cannot be empty.")
                    else:
                        response = comp_desc.data.insert_many(data_objects)
                        print(response)
                        print(f"Imported {counter} objects...")

            print(f"Finished importing {counter} objects.")

        except Exception as e:
            logger.error(f"Error adding data to vectorstore. error: {str(e)}")
            logger.error(traceback.format_exc())
        finally:
            self.client.close()
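For reference, when I run the same thing directly from a Python file it is just a plain call like this, and it works (the collection names and CSV path here are placeholders; the real values come from our config):

if __name__ == "__main__":
    store = weaviate_vectorstore()
    store.update_vectorstore_crosswalk_with_crossreference(
        base_collection_name="CompCodes",   # placeholder
        collection_name="CompDesc",         # placeholder
        csv_file_path="crosswalk.csv",      # placeholder
    )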

How can I resolve this issue?

Hi @P_K! Welcome to our community :hugs:

I have never used nameko :thinking:

From the error message, it seems that the data_objects variable sent to the server is empty, which is weird, as I can see you check for that right before calling insert_many.
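
Just to rule that out, you could log the batch size right before the call and inspect the batch result, something like this (a minimal sketch against the v4 client, reusing the logger and variables from your code):

logger.debug(f"Calling insert_many with {len(data_objects)} objects")
if data_objects:
    response = comp_desc.data.insert_many(data_objects)
    # in the v4 client, the BatchObjectReturn exposes per-object errors
    if response.has_errors:
        logger.error(f"Batch insert errors: {response.errors}")
else:
    logger.warning("Skipping insert_many for an empty chunk")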

Could you produce a Python notebook with reproducible code?

Thanks!

Hi @DudaNogueira, whenever I run it from a Python notebook the code works well. The data_objects variable is also created properly inside the nameko service. I don't understand why the gRPC status code is UNKNOWN.

Are you running nameko locally or using their cloud service?

If you can share some nameko code and instructions, I would love to try reproducing this on my side.

Thanks!

I am running it locally. This is a sample of the nameko service code.

data_ingestion_service.py

import json
import os
import traceback
from werkzeug.wrappers import Response
from werkzeug.utils import secure_filename
from nameko_http import api
from nameko.web.handlers import http
import ast
import time
import sys
import threading

class WeaviateIngestDataService:
    name = "Weaviate Data Ingestion Service"
    # upload_folder() and allowed_extensions() are helpers from our config module
    upload_folder = upload_folder()
    allowed_extensions = set(ast.literal_eval(allowed_extensions()))

    def allowed_file(self, filename):
        return '.' in filename and filename.rsplit('.', 1)[1].lower() in self.allowed_extensions


    @http('POST', '/ingestData')
    def update_vectorstore_async_crossref(self, request):
        try:
            base_collection_name = request.values.get('base_collection_name')
            collection_name = request.values.get('collection_name')
            payload = request.files.get('file_data')

            if payload and self.allowed_file(payload.filename):
                if not os.path.exists(self.upload_folder):
                    os.makedirs(self.upload_folder)

                filename = secure_filename(payload.filename)
                file_path_on_server = os.path.join(self.upload_folder, filename)

                with open(file_path_on_server, 'wb') as f:
                    payload.save(f)
                    logger.debug(f"Saved the file '{filename}' to the uploads folder")

                # Start a new thread for data ingestion
                threading.Thread(
                    target=self.perform_data_ingestion_cr,
                    args=(base_collection_name, collection_name, file_path_on_server),
                ).start()

                return Response(
                    json.dumps({'result': 'Data ingestion process started'}),
                    status=200,
                    mimetype='application/json'
                )

            else:
                logger.error("Invalid file format or no file provided.")
                return Response(
                    json.dumps({'error': 'Invalid file format or no file provided'}),
                    status=400,
                    mimetype='application/json'
                )
        except Exception as e:
            logger.error(f"Something went wrong! error: {str(e)}")
            logger.error(traceback.format_exc())
            return Response(
                json.dumps({'error': 'Something went wrong!'}),
                status=500,
                mimetype='application/json'
            )

    def perform_data_ingestion_cr(self, base_collection_name, collection_name, file_path):
        try:
            # Perform data ingestion
            start_time = time.time()
            weaviate_vectorstore().update_vectorstore_crosswalk_with_crossreference(
                base_collection_name=base_collection_name,
                collection_name=collection_name,
                csv_file_path=file_path,
            )
            end_time = time.time()
            logger.debug(f"Time required for data ingestion : {end_time - start_time}")

        except Exception as e:
            logger.error(f"Error during data ingestion: {str(e)}")
            logger.error(traceback.format_exc())

You can run data_ingestion_service in a terminal to expose the API:
nameko run data_ingestion_service
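
Once the service is running, the endpoint can be called like this (a sketch; nameko's web server listens on port 8000 by default, and the collection names and file name below are placeholders):

import requests

url = "http://localhost:8000/ingestData"
data = {
    "base_collection_name": "CompCodes",  # placeholder
    "collection_name": "CompDesc",        # placeholder
}
with open("crosswalk.csv", "rb") as f:    # placeholder CSV
    resp = requests.post(url, data=data, files={"file_data": f})
print(resp.status_code, resp.json())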