Weaviate Batch Errors during Batch Insertion with v4 client

I’m trying to do a batch import of ~300k objects to my WCS vector DB. The cluster has asynchronous indexing enabled, and I’m using the v4 client.

import time

from weaviate.classes.config import ConsistencyLevel  # import path assumed for the v4 client

counter = 0       # assumed initialisation (not shown in the original post)
interval = 1000   # assumed progress-logging interval (not shown in the original post)

try:
    # with client.batch.fixed_size(batch_size=300, concurrent_requests=10, consistency_level=ConsistencyLevel.QUORUM) as batch:
    with client.batch.dynamic(consistency_level=ConsistencyLevel.QUORUM) as batch:
        batch_start_time = time.time()
        for data in final_users:
            # Skip objects whose serialized size is too large
            if len(str(data)) > 25000:
                print(f"Data too long! userId: {data['userId']}, name: {data['name']}")
                continue
            batch.add_object(
                collection="CollectionName",
                properties={
                    ...  # properties here as a dict, e.g.:
                    # "name": data['name'],
                    # ...
                }
            )
            if batch.number_errors > 100:
                print(f"Batch failed with {batch.number_errors} errors!")
                break
            else:
                counter += 1
                if counter % interval == 0:
                    batch_end_time = time.time()
                    print(f"Batch {counter}/{len(final_users)} done in {batch_end_time - batch_start_time:.2f} seconds, with {batch.number_errors} errors!")
                if counter % 8000 == 0:
                    print("Sleeping for 100 seconds...")
                    time.sleep(100)
                batch_start_time = time.time()  # Reset the start time for the next batch

finally:
    client.close()

I always get the following errors, which is really annoying:
ErrorObject(message='update vector: connection to: OpenAI API failed with status: 503 error: Service Unavailable.',...

ErrorObject(message="WeaviateBatchError('Query call with protocol GRPC batch failed with message Received http2 header with status: 502.')...

WeaviateBatchError: Query call with protocol GRPC batch failed with message Deadline Exceeded.

Here is how I am connecting to my cluster:

import weaviate
from weaviate.config import AdditionalConfig, ConnectionConfig  # import path assumed for client v4.5.x

client = weaviate.connect_to_wcs(
    cluster_url="71b8fuq1res4bsprkp4gjq.c0-1.us-east1.gcp.weaviate.cloud",
    auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_AUTH_KEY),
    headers={
        'X-OpenAI-Api-key': OPENAI_KEY,
        'X-Cohere-Api-key': COHERE_KEY
    },
    additional_config=AdditionalConfig(
        connection=ConnectionConfig(
            session_pool_connections=30,
            session_pool_maxsize=200,
            session_pool_max_retries=3,
        ),
        timeout=(60, 180),
    )
)

The Weaviate docs offer very little on error handling for batch imports. Is there a way I could fix these errors in particular?

Weaviate Client Version: 4.5.1
Weaviate Server version: 1.24.12
I’m using OpenAI’s text-embedding-3-large model for vectorization.

hi @aritraban !!

This error message indicates that OpenAI was the one that timed out, which led Weaviate to time out as well.

This can occasionally happen :grimacing:

Are you still facing this issue? It usually goes away once OpenAI stops misbehaving.

Thanks!

The other errors (apart from the OpenAI timeouts) are still happening: the gRPC Deadline Exceeded one and the 502 one.

Hi @aritraban,

  1. Are you using both OpenAI and Cohere for vectorization?
    Can you share the code you used to create your collections?

  2. When you run an import, do you load data into multiple collections or just one?
    If you are using only one collection, you could grab the collection object, and use it to send batches of data.

your_collection = client.collections.get("CollectionName")
with your_collection.batch.fixed_size(batch_size=300, concurrent_requests=2, consistency_level=ConsistencyLevel.QUORUM) as batch:
    batch.add_object(properties={...})  # no need for the collection name

  3. I recommend lowering concurrent_requests to 2-3.

Concurrent requests affect how many threads are used on the client side to push requests to the server. With 10 concurrent threads, each pushing hundreds of objects, you are asking Weaviate – on the server side – to send hundreds × 10 objects to vectorize on the OpenAI side. This can lead to OpenAI not responding fast enough, which in turn can result in timeouts on the client side.

With 2-3 threads you are a lot safer, and that is more than enough to run vectorization at the full speed at which OpenAI can return responses.
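
To put rough numbers on it (an illustrative calculation using the batch sizes mentioned above):

# Rough upper bound on objects queued for vectorization at once:
# objects_in_flight ≈ batch_size * concurrent_requests
print(300 * 10)  # 3000 objects in flight with 10 concurrent requests
print(300 * 2)   # 600 objects in flight with 2 – much gentler on the OpenAI module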

  1. No, I use Cohere only for reranking, not vectorization. I only use OpenAI as the vectorizer. I can remove the Cohere header from the client when inserting - but will that reduce overhead?
  2. I’m only loading data into a single collection. Is there a difference between using the client vs the collection object? The docs don’t say much beyond noting that there are two options.
  3. I was using the dynamic batcher rather than fixed_size. I can try the fixed-size one if that is safer.

@sebawita
Here is the code being used for creating the collection:

# Imports assumed for the v4 client; adjust to your client version
from weaviate.classes.config import Configure, DataType, Property, StopwordsPreset, Tokenization

try:
    client.collections.delete("AnonymizedCollection")
    print("Deleted!")
    collection = client.collections.create(
        name="AnonymizedCollection",
        vectorizer_config=Configure.Vectorizer.text2vec_openai(model="text-embedding-3-large", model_version="003"),
        reranker_config=Configure.Reranker.cohere(model="rerank-english-v3.0"),
        inverted_index_config=Configure.inverted_index(
            stopwords_preset=StopwordsPreset.EN,
            stopwords_additions=["word1", "word2", "word3", "word4", "word5", "word6", "word7", "word8",
                                 "word9", "word10", "word11", "word12", "word13", "word14", "word15",
                                 "word16", "word17", "word18", "word19", "word20", "word21", "word22",
                                 "word23", "word24", "word25", "word26", "word27", "word28", "word29",
                                 "word30", "word31", "word32"
            ]),
        properties=[
            Property(
                name="property1",
                description="Description of property1",
                data_type=DataType.TEXT,
                index_searchable=False,
                skip_vectorization=True
            ),
            Property(
                name="property2",
                description="Description of property2",
                data_type=DataType.TEXT,
                index_searchable=False,
                skip_vectorization=True
            ),
            Property(
                name="property3",
                description="Description of property3",
                data_type=DataType.UUID,
                index_searchable=False,
                skip_vectorization=True
            ),
            # ... rest of the properties in a similar way
            Property(
                name="property20",
                description="Description of property20",
                data_type=DataType.TEXT_ARRAY,
                tokenization=Tokenization.LOWERCASE
            ),
            Property(
                name="property21",
                description="Description of property21",
                data_type=DataType.TEXT_ARRAY,
                tokenization=Tokenization.LOWERCASE
            ),
        ]
    )
finally:
    print("Created and closed!")
    client.close()

So if I have a collection with the same name, I first delete it and re-create it.

Also, with collection-level inserts I get an error when trying to use the consistency-level argument:
TypeError: fixed_size() got an unexpected keyword argument 'consistency_level'

Does the collection-level batch not support that? Is it only available for client-level batch imports?

No need to change that. Your configuration is fine :wink:

I was asking because:
Weaviate allows you to work with multiple vectors per object (you can do that with named vectors), and if you had a multi-vector configuration, that could affect the speed of vectorization – more named vectors = more vectorizations per object :wink:

Technically, both approaches are correct.
However, if you are working with a single collection, it is easier to interact with it using the collection object :wink:

Occasionally, dynamic can get it wrong. But the key point with fixed_size will be to keep concurrent_requests low :wink:

Let me know how it goes.
I hope you make it work.

Small test (not a recommendation for full import)

Something else you can try as a test – this is not how I recommend inserting 300k objects – you could put 1000 objects into an array, then call insert_many, like this:

data1k = [] 
# add data to the array

anon_col = client.collections.get("AnonymizedCollection")
anon_col.data.insert_many(data1k)

This would be a good test to verify that you can insert 1k objects in one go.
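
A minimal sketch of that test, assuming client is already connected and build_properties is a hypothetical helper that turns one of your user records into a property dict matching the collection schema:

import time

# Hypothetical test: insert the first 1,000 objects in a single insert_many call
data1k = [build_properties(u) for u in final_users[:1000]]

anon_col = client.collections.get("AnonymizedCollection")

start = time.time()
result = anon_col.data.insert_many(data1k)
print(f"Inserted 1k objects in {time.time() - start:.2f}s, errors: {len(result.errors)}")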

Oh, I didn’t realise that. Apologies for the confusion. :pray:
I was typing the code from memory.

ConsistencyLevel.QUORUM is the default value, so you should be fine to skip that setting.

The following should be enough:

anon_col = client.collections.get("AnonymizedCollection")

with anon_col.batch.fixed_size(batch_size=300) as batch:
...
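
For reference, here is a fuller (hedged) sketch of that collection-level batch loop, reusing the size check and error counting from the original snippet; the property names are placeholders:

anon_col = client.collections.get("AnonymizedCollection")

with anon_col.batch.fixed_size(batch_size=300, concurrent_requests=2) as batch:
    for data in final_users:
        if len(str(data)) > 25000:
            continue  # skip oversized objects, as in the original loop
        batch.add_object(properties={
            "property1": data.get("property1"),  # placeholder property names
            # ...
        })
        if batch.number_errors > 100:
            print(f"Aborting: {batch.number_errors} errors so far")
            break

# After the context manager exits, inspect anything that failed
for failed in anon_col.batch.failed_objects:
    print(failed.message)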

@sebawita
Thanks. I’ll try with the reduced number and see. I guess inserting 300k objects will take a while at this speed, though. I’ll try doing it in batches of 20k for now.

I was hoping that enabling async indexing would reduce the OpenAI overhead and make imports faster.

Also, will there be issues with vectorizing objects that have most fields as NoneType?

Also, are named vectors better for expressiveness? Currently we implement hybrid search but use the default single vector per object. Would using named vectors improve field-specific searches? Suppose we have 4 main properties that we want to query on and include 4 named vectors, and route each query to the relevant named vector – will that improve relevancy?

hi @aritraban !!

ASYNC_INDEXING only affects the indexing, not the vectorization stage. Weaviate will still vectorize first, on ingestion, and then add objects to the index later, asynchronously. So I believe it will not help with that specific issue of OpenAI timing out.

If a property doesn’t have a value, it is not added to the payload that goes to the vectorizer model, so that should not affect it either.
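
If you build the property dict yourself, you can also drop the None values client-side before sending the object – a small sketch, assuming data is one of the dicts from final_users and you are inside the client-level batch loop shown earlier:

# Only send properties that actually have a value (illustrative; keys are your real field names)
properties = {k: v for k, v in data.items() if v is not None}
batch.add_object(collection="CollectionName", properties=properties)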

For the named vectors question: it can improve the relevance of results when a query targets that specific property.

If you add more properties to a named vector, all of those properties are used to generate the vector, whereas having one property (or a more focused set) per vector results in a vector that better represents that property alone.


Batch size

Just to be on the safe side: I didn’t suggest running col.insert_many(data_here) multiple times. It was designed as a test to make sure you can get 1k objects in without a problem, and also to estimate the time it takes for 1k objects.

Then you should be able to load all 300k objects with batch inserts, but with smaller batches. Tbh. you should be perfectly fine to set batch_size=1000, which should run at a pretty good speed.

with my_col.batch.fixed_size(batch_size=1000) as batch:

Please note that the main delay in the overall ingestion speed is with the embedding model (in your case, OpenAI text-embedding-3-large), as that is usually the slowest part.
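
As a rough back-of-the-envelope extrapolation from the 1k test (my own estimate, assuming throughput stays roughly constant):

seconds_per_1k = 45  # assumed: the measured time for the 1k insert_many test
total_objects = 300_000
est_minutes = total_objects / 1000 * seconds_per_1k / 60
print(f"Estimated import time: ~{est_minutes:.0f} minutes")  # ~225 minutes at this assumed rate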

Multiple vectors

Splitting is not always better
Splitting your main vector into multiple vectors is not always better. It all depends on what queries you are trying to run.

Please note: you can only query one vector space at a time, i.e. if you create one vector on title and another on description, then when you run a query you need to specify which vector space you want to search through (you can’t query both at once).

You can mix vector properties
You choose what properties are used for each named vector. And the properties can overlap. Let me explain more.

Say we have title, description, author, abstract, and image_url.
You could then create the following vectors, each with a different purpose:

vectorizer_config=[
    # Vectorize the key text properties - to allow searching with the key info, but ignoring image_url which is not useful
    Configure.NamedVectors.text2vec_openai(
        name="overall",
        source_properties=["title", "description", "abstract"],
        model="text-embedding-3-large"
    ),
    # Vectorize title - to allow searching specifically through the title
    Configure.NamedVectors.text2vec_openai(
        name="title",
        source_properties=["title"],
        model="text-embedding-3-large"
    ),
    # Vectorize author - to allow searching specifically based on the author name
    Configure.NamedVectors.text2vec_openai(
        name="author",
        source_properties=["author"],
        model="text-embedding-3-large"
    ),
],

This way, you could run queries based on all the key content, or in other cases just search through the titles and run a more targeted query. The trick is in figuring out which key properties you need for the different types of queries you expect.
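
To make the “one vector space at a time” point concrete, here is a hedged sketch of querying a specific named vector (the collection name is hypothetical; the vector names follow the example above):

articles = client.collections.get("Articles")  # hypothetical collection using the named vectors above

# Search only through the "title" vector space
response = articles.query.near_text(
    query="vector databases",
    target_vector="title",  # pick which named vector to search
    limit=5,
)
for obj in response.objects:
    print(obj.properties["title"])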

Please note: with 3 vectors per object, you will end up with 900k vectors across 300k objects, which will increase the time and cost of generating the embeddings at import, and will also increase the index size. I am not trying to discourage you :wink: I just want to make sure you are aware of this :wink:
