Description
I’m migrating from a normal cluster to an HA one, using the migration script given:
def migrate_data(collection_src, collection_tgt):
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        for q in tqdm(collection_src.iterator(include_vector=True)):
            batch.add_object(
                properties=q.properties,
                vector=q.vector["default"],
                uuid=q.uuid
            )
    return True

print('Migrating # of tenants: ', len(tenants_src_list))
i = 0
for tenant in tenants_src_list:
    print('Migrating tenant: ', tenant.name)
    try:
        collection_src_tenant = nodes_collection_source.with_tenant(tenant.name)
        collection_tgt_tenant = nodes_collection_target.with_tenant(tenant.name)
        migrate_data(collection_src_tenant, collection_tgt_tenant)
    except Exception as e:
        print(e)
        continue
    i += 1
    print('Migrated tenants up to # ', i)

client_source.close()
client_target.close()
But for some tenants, the only error logged is: default
There is no other error message in the print(e) output, just that. I’m not sure whether this is anything serious, and I’d like to know before I go ahead with the migration. I also sent a support email about this in case our specific node(s) are affected.
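For anyone hitting the same thing: a bare key name is what print(e) shows when the exception is a KeyError, because Python prints only the missing key and not the exception type. One possible cause (an assumption, not confirmed in this thread) is that the q.vector["default"] lookup fails for objects that have no vector stored under the name "default". The sketch below uses a plain dict as a stand-in for q.vector, and describe_error is a hypothetical helper, not part of the Weaviate client.

```python
import traceback


def describe_error(exc: BaseException) -> str:
    """Return the exception type plus its repr, which print(e) alone hides."""
    return f"{type(exc).__name__}: {exc!r}"


# Stand-in for q.vector on an object with no "default" entry (assumption).
vectors = {"custom_name": [0.1, 0.2]}

try:
    vectors["default"]
except Exception as e:
    print(e)                  # only the missing key, no exception type
    print(describe_error(e))  # includes the exception type
    traceback.print_exc()     # shows which line raised
```

Swapping print(e) for something like this in the tenant loop should show whether the failure comes from the vector lookup or from somewhere else.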
Server Setup Information
- Weaviate Server Version: Serverless
- Deployment Method:
- Multi Node? Number of Running Nodes:
- Client Language and Version:
- Multitenancy?: Yes
Any additional Information
Hey @Tejas_Sharma,
Have you tried printing out the failed objects in the batch and checking the reason for each? You could also apply a retry at the end.
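As a rough illustration of the retry idea, here is a minimal, library-agnostic sketch. The names retry_failed and insert_one are hypothetical: insert_one stands for whatever single-object insert you use and is expected to raise on failure.

```python
from typing import Any, Callable, Iterable, List


def retry_failed(
    failed: Iterable[Any],
    insert_one: Callable[[Any], None],
    max_attempts: int = 3,
) -> List[Any]:
    """Re-insert failed items one by one and return those that still fail."""
    remaining = list(failed)
    for _ in range(max_attempts):
        if not remaining:
            break
        still_failing = []
        for item in remaining:
            try:
                insert_one(item)
            except Exception as exc:
                # Keep the reason visible so repeated failures can be investigated.
                print(f"Retry failed for {item!r}: {exc!r}")
                still_failing.append(item)
        remaining = still_failing
    return remaining
```

With the Python client, insert_one could for example wrap a single-object insert on the target collection, fed from the failed objects collected after the batch finishes. That wiring is an assumption and depends on your client version.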
Regards,
Mohamed Shahin,
Weaviate Support
Hey @Mohamed_Shahin ,
Thanks for the response. I did, but it prints no failed objects and it fails with that error without saying there were any failed objects. There are tenants for which there were failed objects, but for those after I re-run the script, it seems to work.
def migrate_data(collection_src, collection_tgt):
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        for q in tqdm(collection_src.iterator(include_vector=True)):
            batch.add_object(properties=q.properties, vector=q.vector["default"], uuid=q.uuid)
            if collection_tgt.batch.failed_objects:
                print('!! FAILED OBJECTS: ')
                for failed_object in collection_tgt.batch.failed_objects:
                    print(failed_object)
    return True
Is this code correct for printing it out?
Hey @Tejas_Sharma,
I hope you’re having a great weekend!
It might not be the best idea to check failed_objects in real-time during batch processing. It’s often safer to access them after the batch import finishes.
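Applied to the migration function from the question, that could look roughly like the sketch below. It is only an outline under a few assumptions: the collections are the tenant-scoped objects from the original script, tqdm is left out for brevity, and the batch_size parameter and boolean return value are additions for illustration.

```python
def migrate_data(collection_src, collection_tgt, batch_size=100):
    """Copy all objects, then report failures once the batch has finished."""
    with collection_tgt.batch.fixed_size(batch_size=batch_size) as batch:
        for q in collection_src.iterator(include_vector=True):
            batch.add_object(
                properties=q.properties,
                vector=q.vector["default"],
                uuid=q.uuid,
            )
    # Read failed_objects only after the `with` block has exited,
    # so that every queued object has been sent first.
    failed = collection_tgt.batch.failed_objects
    for failed_object in failed:
        print("FAILED OBJECT:", failed_object)
    return len(failed) == 0
```

Returning False when anything failed lets the tenant loop decide whether to retry that tenant instead of counting it as migrated.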
Here’s an internal example I use when testing batch imports; you might find it helpful. You’ll need to tweak it a little for your specific use case, but it should give you a good idea of how I approach the logic.
import csv

def batch_upload(client, file_path, collection_name, batch_size=10):
    """
    Batch upload data from a CSV file into the specified collection.
    """
    if not client.collections.exists(collection_name):
        raise Exception(f"Collection '{collection_name}' does not exist. Cannot insert data.")
    count = 0  # stays 0 if the CSV has no data rows
    try:
        with open(file_path, mode='r', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)
            # Normalize column headers
            csv_reader.fieldnames = [header.strip().lower() for header in csv_reader.fieldnames]
            with client.batch.fixed_size(batch_size=batch_size, concurrent_requests=2) as batch:
                for count, row in enumerate(csv_reader, 1):
                    # Prepare object properties
                    obj_properties = {
                        "company_id": row.get("company_id", ""),
                        "last_name": row.get("last_name", ""),
                        "first_name": row.get("first_name", ""),
                        "job_title": row.get("job_title", ""),
                        "email_address": row.get("email_address", ""),
                        "country": row.get("country", ""),
                        "interaction_notes": row.get("interaction_notes", ""),
                    }
                    batch.add_object(
                        properties=obj_properties,
                        collection=collection_name
                    )
        print(f"Batch processing completed. {count} objects added.")
    except Exception as e:
        raise Exception(f"Batch insertion failed: {e}")
    # After the batch has finished, check for failed objects and print the reason for each
    failed_objects = client.batch.failed_objects
    if failed_objects:
        print(f"Number of failed objects: {len(failed_objects)}")
        for i, failed_obj in enumerate(failed_objects, 1):
            print(f"Failed object {i}: {failed_obj}")
    else:
        print(f"All objects successfully inserted into '{collection_name}'.")
Try that and let me know
Regards,
Mohamed Shahin,
Weaviate Support