Not able to ingest the batches of data

Hi everyone, I am running into a lot of issues when adding data in batches during ingestion into Weaviate. I have tried everything I could find, but there is no single end-to-end script that shows how to ingest all the data in batches…

I tried following this link: Batch import | Weaviate - Vector Database

but nothing seems to be working.

Can anyone help me with it? I really need it…

hi @Vipul_Maheshwari !!

Welcome to our community.

Please, when opening a thread, fill in the requested info, like server version, deployment, etc.

Do you see any error logs? Can you share any code we can reproduce?

Thanks!

Hey @DudaNogueira, thanks for getting back to me.

Next time, I will make sure to fill in the requested info and other details.

I have put together this script for batch ingestion. Could you skim through it and let me know if you spot any errors?

import numpy as np
import logging
import time
import weaviate
from tqdm import tqdm
import weaviate.classes.config as wc

# Constants
COLLECTION_NAME = "weaviate_test_collection_part6"
NUM_BATCHES = 10
VECTORS_PER_BATCH = 100
VECTOR_SIZE = 1536

# Setup logging
logging.basicConfig(level=logging.INFO)

# Connect to Weaviate
client = weaviate.connect_to_embedded()

# Create Weaviate collection
weaviate_collection = client.collections.create(
    name=COLLECTION_NAME,
    properties=[
        wc.Property(name="item", data_type=wc.DataType.TEXT),
    ],
    vectorizer_config=None
)

# Define the batch generation function
def make_batches(num_batches, vectors_per_batch, vector_size):
    for i in range(num_batches):
        try:
            vectors = np.random.rand(vectors_per_batch, vector_size).astype(np.float32)
            vectors_list = vectors.tolist()
            items = [str(i * vectors_per_batch + j + 1) for j in range(vectors_per_batch)]
            batch = list(zip(items, vectors_list))
            logging.info(f"Successfully generated batch {i+1}/{num_batches}")
            yield batch
        except Exception as e:
            logging.error(f"Error in batch {i+1}: {str(e)}")
            raise

# Main processing loop
try:
    total_time = 0.0
    batch_times = []
    for _batch_index, _batch in enumerate(tqdm(
        make_batches(num_batches=NUM_BATCHES, vectors_per_batch=VECTORS_PER_BATCH, vector_size=VECTOR_SIZE),
        desc="Processing batches",
        total=NUM_BATCHES,
    )):
        batch_start_time = time.time()
        with weaviate_collection.batch.fixed_size(VECTORS_PER_BATCH) as batch:
            for item, vector in _batch:
                batch.add_object(
                    properties={"item": item},
                    vector=vector
                )

        # Exiting the `with` block flushes any pending objects, so stop the timer
        # only now; otherwise the measurement covers queuing, not the actual import.
        duration = time.time() - batch_start_time
        batch_times.append(duration)
        total_time += duration

        # Per-object failures are collected rather than raised, so check for them explicitly.
        failed_objects = weaviate_collection.batch.failed_objects
        if failed_objects:
            logging.error(f"{len(failed_objects)} objects failed in batch {_batch_index + 1}; first: {failed_objects[0]}")

        print(f"Processed {len(_batch)} vectors in batch {_batch_index + 1} of {NUM_BATCHES} in {duration:.2f}s")
    
    print(f"Total processing time: {total_time:.2f}s")
    print(f"Average time per batch: {np.mean(batch_times):.2f}s")

except Exception as e:
    logging.error(f"An error occurred during processing: {str(e)}")
    raise

finally:
    # Always close the client so the embedded instance and its connections are released
    client.close()

Hi!

Can you try catching those errors?

Check here:
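
As a rough sketch of what "catching those errors" could look like for the script above: the helper below turns a list of failed objects into a few log lines. It assumes each entry exposes a `message` attribute and falls back to the entry itself otherwise. The `FailedObject` class, the `summarize_failures` name, and the demo messages are made up for illustration and are not part of the Weaviate client.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class FailedObject:
    """Hypothetical stand-in for one entry of `collection.batch.failed_objects`."""
    message: str


def summarize_failures(failed: Iterable, limit: int = 3) -> List[str]:
    """Return a count line plus up to `limit` error messages, or [] if nothing failed."""
    failed = list(failed)
    if not failed:
        return []
    lines = [f"{len(failed)} object(s) failed to import"]
    # Fall back to the entry itself if it has no `message` attribute (assumption)
    lines += [str(getattr(entry, "message", entry)) for entry in failed[:limit]]
    return lines


# Demo with made-up failures; in the script above the input would instead be
# `weaviate_collection.batch.failed_objects`, read after each `with` block exits.
demo = [FailedObject("vector length mismatch"), FailedObject("invalid property type")]
for line in summarize_failures(demo):
    print(line)
```

Calling it once per batch, right after the `with` block, keeps each report tied to the batch that produced it.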