Not able to ingest the batches of data

Hey Hi Guys, i am facing so many issues in adding the batch of data during the ingestion part in the weaviate. Tried all the things but there is not a single start to end script which can help me to ingest all the data in the batches…

I tried skimp with this link : Batch import | Weaviate - Vector Database

but nothing seems to be working.

Can anyone help me with it? I really need it…

hi @Vipul_Maheshwari !!

Welcome to our community.

Please, when opening a thread, fill in the requested info, like server version, deployment, etc.

Do you see any error logs? Can you share any code we can reproduce?

Thanks!

Hey @DudaNogueira thanks for reverting back.

From the next time, I will make sure to fill the requested info and other details.

So I have completed this script for batch ingestion, can you just skim through it fast and let me know if there is any kind of error in it:

import numpy as np
import logging
import time
import weaviate
from tqdm import tqdm
import weaviate.classes.config as wc

# Constants
COLLECTION_NAME = "weaviate_test_collection_part6"
NUM_BATCHES = 10
VECTORS_PER_BATCH = 100
VECTOR_SIZE = 1536

# Setup logging
logging.basicConfig(level=logging.INFO)

# Connect to Weaviate
client = weaviate.connect_to_embedded()

# Create Weaviate collection
weaviate_collection = client.collections.create(
    name=COLLECTION_NAME,
    properties=[
        wc.Property(name="item", data_type=wc.DataType.TEXT),
    ],
    vectorizer_config=None
)

# Define the batch generation function
def make_batches(num_batches, vectors_per_batch, vector_size):
    for i in range(num_batches):
        try:
            vectors = np.random.rand(vectors_per_batch, vector_size).astype(np.float32)
            vectors_list = vectors.tolist()
            items = [str(i * vectors_per_batch + j + 1) for j in range(vectors_per_batch)]
            batch = list(zip(items, vectors_list))
            logging.info(f"Successfully generated batch {i+1}/{num_batches}")
            yield batch
        except Exception as e:
            logging.error(f"Error in batch {i+1}: {str(e)}")
            raise

# Main processing loop
try:
    total_time = 0.0
    batch_times = []
    for _batch_index, _batch in enumerate(tqdm(make_batches(num_batches=NUM_BATCHES,  vectors_per_batch=VECTORS_PER_BATCH, vector_size=VECTOR_SIZE), desc="Processing batches", total=NUM_BATCHES)):
        ct = 0
        with weaviate_collection.batch.fixed_size(VECTORS_PER_BATCH) as batch:
            
            batch_start_time = time.time()
            for item, vector in _batch:

                batch.add_object(
                    properties={"item": item},
                    vector=vector
                )

                ct += 1
                
                # If the number of vectors reached VECTORS_PER_BATCH threshold, it means the batch is injected with the desired number of vectors. (Ingestion of one batch is completed)
                if ct % VECTORS_PER_BATCH == 0:
                    duration = time.time() - batch_start_time
                    batch_times.append(duration)
                    total_time += duration
                    print(f"Processed {ct} vectors in batch {_batch_index + 1} of {NUM_BATCHES} in {duration:.2f}s")
    
    print(f"Total processing time: {total_time:.2f}s")
    print(f"Average time per batch: {np.mean(batch_times):.2f}s")

except Exception as e:
    logging.error(f"An error occurred during processing: {str(e)}")
    raise

finally:
    pass

Hi!

Can you try catching those errors?

Check here:

Hey Hi! I think I figured it out, can you just confirm if this sounds good to you, Thanks in advance:

import numpy as np
import logging
import time
import weaviate
from tqdm import tqdm
import weaviate.classes.config as wc

# Constants
COLLECTION_NAME = "weaviate_test_collection_part6"
NUM_BATCHES = 10
VECTORS_PER_BATCH = 100
VECTOR_SIZE = 1536

# Setup logging
logging.basicConfig(level=logging.INFO)
# Connect to Weaviate
client = weaviate.connect_to_embedded()

# Create Weaviate collection
weaviate_collection = client.collections.create(
    name=COLLECTION_NAME,
    properties=[
        wc.Property(name="item", data_type=wc.DataType.TEXT),
    ],
    vectorizer_config=None
)

# Define the batch generation function
def make_batches(num_batches, vectors_per_batch, vector_size):
    for i in range(num_batches):
        try:
            vectors = np.random.rand(vectors_per_batch, vector_size).astype(np.float32)
            vectors_list = vectors.tolist()
            items = [str(i * vectors_per_batch + j + 1) for j in range(vectors_per_batch)]
            batch = list(zip(items, vectors_list))
            logging.info(f"Successfully generated batch {i+1}/{num_batches}")
            yield batch
        except Exception as e:
            logging.error(f"Error in batch {i+1}: {str(e)}")
            raise

# Main processing loop
try:
    total_time = 0.0
    batch_times = []
    for _batch_index, _batch in enumerate(tqdm(make_batches(num_batches=NUM_BATCHES,  vectors_per_batch=VECTORS_PER_BATCH, vector_size=VECTOR_SIZE), desc="Processing batches", total=NUM_BATCHES)):
        ct = 0
        with weaviate_collection.batch.fixed_size(VECTORS_PER_BATCH) as batch:
            
            batch_start_time = time.time()
            for item, vector in _batch:

                batch.add_object(
                    properties={"item": item},
                    vector=vector
                )

                ct += 1
                
                # If the number of vectors reached VECTORS_PER_BATCH threshold, it means the batch is injected with the desired number of vectors. (Ingestion of one batch is completed)
                if ct % VECTORS_PER_BATCH == 0:
                    duration = time.time() - batch_start_time
                    batch_times.append(duration)
                    total_time += duration
                    print(f"Processed {ct} vectors in batch {_batch_index + 1} of {NUM_BATCHES} in {duration:.2f}s")
    
    print(f"Total processing time: {total_time:.2f}s")
    print(f"Average time per batch: {np.mean(batch_times):.2f}s")

except Exception as e:
    logging.error(f"An error occurred during processing: {str(e)}")
    raise

finally:
    pass

Hi!

This seems fine.

Now the idea is that you can experiment with different batch sizes.

Here you can have more info on that:
https://weaviate-python-client.readthedocs.io/en/stable/weaviate.batch.html#module-weaviate.batch.crud_batch

Thanks!

Yes! But i wanted to take the VECTORS_PER_ BATCH variables as my go to for deciding the number of batches I want to ingest at a time…

Thanks for putting this through, sorry for the inconvenience! I am just glad you reviewed the snippet and I am good to go…

That’s ok.

You can probably get interesting results with dynamic, as it will adjust the batch size according to what the server reports back, taking into account the current server load.

Well to be honest, I am running a benchmark for various DBs to understand the time it takes for the ingestion as well as the bottleneck for the server load…

So it would be unfair to change the batch size dynamically

Fair enough :slight_smile:

You can also try enabling the ASYNC_INDEXING, so you don’t need to wait for the indexation step.

1 Like