Weaviate upload for 200k+ json files

Description

I am trying to load 200k+ JSON files into Weaviate (flatten and vectorize) and I am running into this error after 50k files:

{'message': 'Failed to send all objects in a batch of 5', 'error': "WeaviateInsertManyAllFailedError('Every object failed during insertion. Here is the set of all errors: read response body: context deadline exceeded (Client.Timeout or context cancellation while reading body)')"}
{'message': 'Failed to send 5 objects in a batch of 5. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
Batch had 5 failures at 57265
[WinError 10054] An existing connection was forcibly closed by the remote host
Weaviate not responding.

My Weaviate Docker container stops, and then during querying I get this error:
Traceback (most recent call last):
  File "C:\Users\j.kaur\AI\librechat-local\docker\redmine_weaviate\venv\rough.py", line 617, in <module>
    query_redmine_agent("what is the currently qualified life of the B2 kero pump")
  File "C:\Users\j.kaur\AI\librechat-local\docker\redmine_weaviate\venv\rough.py", line 581, in query_redmine_agent
    response = collection.generate.hybrid(
        query=query,
        ...<10 lines>...
        )
    )
  File "C:\Users\j.kaur\AI\librechat-local.venv\Lib\site-packages\weaviate\collections\queries\hybrid\generate\executor.py", line 527, in hybrid
    return executor.execute(
        response_callback=resp,
        method=self._connection.grpc_search,
        request=request,
    )
  File "C:\Users\j.kaur\AI\librechat-local.venv\Lib\site-packages\weaviate\connect\executor.py", line 99, in execute
    return cast(T, exception_callback(e))
  File "C:\Users\j.kaur\AI\librechat-local.venv\Lib\site-packages\weaviate\connect\executor.py", line 38, in raise_exception
    raise e
  File "C:\Users\j.kaur\AI\librechat-local.venv\Lib\site-packages\weaviate\connect\executor.py", line 80, in execute
    call = method(*args, **kwargs)
  File "C:\Users\j.kaur\AI\librechat-local.venv\Lib\site-packages\weaviate\connect\v4.py", line 968, in grpc_search
    raise WeaviateQueryError(str(error.details()), "GRPC search")  # pyright: ignore
weaviate.exceptions.WeaviateQueryError: Query call with protocol GRPC search failed with message Deadline Exceeded.

What should be done about this? I have also increased the timeout limits, but the upload has become very slow:

additional_config=AdditionalConfig(
    timeout=Timeout(
        init=60,
        query=180,
        insert=900
    )
)
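
For context, this is roughly how the timeouts are plugged into the client connection (a minimal sketch assuming a local Docker deployment on the default ports; only the timeout values above are from my actual setup):

import weaviate
from weaviate.classes.init import AdditionalConfig, Timeout

# Minimal sketch: pass the increased timeouts when connecting to a local instance
client = weaviate.connect_to_local(
    additional_config=AdditionalConfig(
        timeout=Timeout(init=60, query=180, insert=900)
    )
)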

Server Setup Information

  • Weaviate Server Version:
  • Deployment Method:
  • Multi Node? Number of Running Nodes:
  • Client Language and Version:
  • Multitenancy?:

Any additional Information

hi @jasnoor !!

How are you vectorizing your content? Are you running the vectorizer on the same machine, for example? This kind of issue can surface in that scenario.

Also, how are you using batch to insert, as described here?

If you do not have a fast service for vectorization, you can consider reducing the batch size and increasing the env var MODULES_CLIENT_TIMEOUT (defaults to 50s). Also, make sure async indexing is enabled (ASYNC_INDEXING=true), as it will spread the load while ingesting.

So, if your vectorizer takes more than 50s, the module client, that reaches out to the service, will time out.
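
For example, something along these lines on the client side (the batch size and concurrency values are only illustrative, and `records` stands in for your own flattened issues):

# Illustrative only: smaller batches and a single concurrent request put less
# load on a slow, locally hosted vectorizer per call.
with collection.batch.fixed_size(batch_size=50, concurrent_requests=1) as batch:
    for record in records:  # `records` is a placeholder for your flattened issues
        batch.add_object(properties=record)

if collection.batch.failed_objects:
    print(f"Failed objects: {len(collection.batch.failed_objects)}")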

Also, which versions are you running? What are the resources usage and limits for this cluster?

Thanks!

I am using the ada embedding model, and yes, I am running the vectorizer on the same machine.

I am using batch to insert. The load function is as follows:
import gc
import json
import time
from pathlib import Path

def load_issues(data_path="extract/data"):
    # `client` and the helpers (get_collection_count, check_weaviate_health,
    # estimate_issue_tokens, split_issue_into_records, flatten_json) are defined
    # elsewhere in the script; this function only handles the batched upload.
    collection = client.collections.get("RedmineIssue")
    json_files = sorted(list(Path(data_path).glob("*.json")))
    total_files = len(json_files)

    print(f"Found {total_files} JSON files")
    print("Loading issues...")

    batch_size = 200
    max_batch_errors = 20
    wait_time = 0.5
    error_count = 0
    small_issues = 0
    chunked_issues = 0
    consecutive_failures = 0
    max_consecutive_failures = 5

    # Resume from the last checkpoint if a previous run was interrupted
    checkpoint_file = Path("load_checkpoint.txt")
    start_index = 0
    if checkpoint_file.exists():
        start_index = int(checkpoint_file.read_text().strip())
        db_count = get_collection_count()
        print(f"Resuming from file {start_index}")
        print(f"Current DB count: {db_count} records")

    loaded_count = start_index

    for i in range(start_index, total_files, batch_size):
        chunk = json_files[i:i + batch_size]

        # If the previous chunk failed, make sure Weaviate is reachable before retrying
        if consecutive_failures > 0 and not check_weaviate_health():
            print(f"Weaviate not responding. Saving checkpoint at {i}")
            checkpoint_file.write_text(str(i))
            time.sleep(10)
            continue

        try:
            with collection.batch.fixed_size(batch_size=200) as batch:
                for json_file in chunk:
                    try:
                        with open(json_file) as f:
                            issue = json.load(f)

                        token_estimate = estimate_issue_tokens(issue)

                        if token_estimate > 5000:
                            # Large issues are split into several smaller records
                            split_records = split_issue_into_records(issue)
                            chunked_issues += 1
                            for record in split_records:
                                batch.add_object(properties=record)

                                if batch.number_errors > max_batch_errors:
                                    print(f"Batch import stopped due to excessive errors ({batch.number_errors} errors)")
                                    checkpoint_file.write_text(str(i))
                                    break
                        else:
                            # Small issues are flattened and inserted as a single record
                            flattened_issue = flatten_json(issue)
                            small_issues += 1
                            batch.add_object(properties=flattened_issue)

                            if batch.number_errors > max_batch_errors:
                                print(f"Batch import stopped due to excessive errors ({batch.number_errors} errors)")
                                checkpoint_file.write_text(str(i))
                                break

                        del issue

                    except Exception as e:
                        print(f"Error loading {json_file.name}: {str(e)}")
                        error_count += 1

                if batch.number_errors > max_batch_errors:
                    break

            loaded_count += len(chunk)

            # Inspect client-side failures after the batch context has flushed
            failed_objects = collection.batch.failed_objects
            if failed_objects:
                failed_count = len(failed_objects)
                error_count += failed_count
                consecutive_failures += 1
                print(f"Number of failed imports: {failed_count}")
                print(f"First failed object: {failed_objects[0]}")

                if consecutive_failures >= max_consecutive_failures:
                    print(f"Stopping due to {consecutive_failures} consecutive failures")
                    checkpoint_file.write_text(str(i))
                    break

                time.sleep(3)
            else:
                consecutive_failures = 0

            checkpoint_file.write_text(str(i + batch_size))

            db_count = get_collection_count()
            print(f"Progress: {loaded_count}/{total_files} files | DB: {db_count} records | Small: {small_issues} | Chunked: {chunked_issues} | Errors: {error_count}")

            if loaded_count % 200 == 0:
                gc.collect()
                time.sleep(1)

        except Exception as e:
            print(f"Batch error at {loaded_count}: {str(e)}")
            checkpoint_file.write_text(str(i))
            error_count += batch_size
            consecutive_failures += 1

            if consecutive_failures >= max_consecutive_failures:
                print("Too many errors. Stopping.")
                break

            time.sleep(5)

    if loaded_count >= total_files:
        checkpoint_file.unlink(missing_ok=True)
        print("All files processed successfully")

    final_count = get_collection_count()

Also, the versions are as follows:
"weaviate>=0.1.2",
"weaviate-client>=4.19.2",

What would be a good batch size to consider?

I believe it's less about the batch size and more about the resources available. Is it running on Docker? Can you observe resource consumption during this operation?

Also, which server version? Make sure to use the latest: 0.2.1 is not a current version, use 1.35+.
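
You can confirm what the server itself reports, for example (a quick sketch with the v4 Python client):

# Quick sketch: ask the running server for its reported version
meta = client.get_meta()
print(meta["version"])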

I am getting this error:
{'message': 'Failed to send all objects in a batch of 200', 'error': "WeaviateInsertManyAllFailedError('Every object failed during insertion. Here is the set of all errors: resolve node name \"40104336a1f0\" to host')"}
{'message': 'Failed to send 200 objects in a batch of 200. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
Number of failed imports: 200

The docker stats show:

    CONTAINER ID   NAME       CPU %   MEM USAGE / LIMIT     MEM %   NET I/O          BLOCK I/O        PIDS
    380b484f8d34   weaviate   1.07%   132.8MiB / 1.914GiB   6.77%   23.8MB / 902kB   70.4MB / 139kB   18

Version: 1.32.2
Is this version not good enough to work?

It is good. However, we always suggest running the latest version, or at least the latest 1.32 patch release, since we backport the most important fixes. We recommend staying no more than three minor versions behind the latest, and we are about to release 1.36 :)

Can you check the usage of the vectorizer?

Also, do you see any logs on the server? Can you print the contents of collection.batch.failed_objects when it fails?
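
Something along these lines should show both the error and the offending object (a sketch; the attribute names are from the v4 Python client's error objects, so double-check against your client version):

# Sketch: print the error message and the properties of each failed object
for failed in collection.batch.failed_objects:
    print(failed.message)             # error string returned for this object
    print(failed.object_.properties)  # the properties that were sent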

And finally, have you tried using an external service, like OpenAI for example? I believe the error is mostly coming from the vectorization step.
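
For reference, pointing the collection at OpenAI would look roughly like this (a sketch only; it assumes the text2vec-openai module is enabled on the server and OPENAI_APIKEY is available to it, and the exact config helpers can vary between client versions):

from weaviate.classes.config import Configure

# Sketch: create the collection with server-side OpenAI vectorization
client.collections.create(
    "RedmineIssue",
    vectorizer_config=Configure.Vectorizer.text2vec_openai(),
)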

Let me know if this helps!