I’m migrating from a normal cluster to an HA (high-availability) one using the provided migration script:
```python
from tqdm import tqdm

# client_source, client_target, nodes_collection_source,
# nodes_collection_target and tenants_src_list are created earlier in the script.

def migrate_data(collection_src, collection_tgt):
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        for q in tqdm(collection_src.iterator(include_vector=True)):
            batch.add_object(
                properties=q.properties,
                vector=q.vector["default"],
                uuid=q.uuid
            )
    return True

print('Migrating # of tenants: ', len(tenants_src_list))
i = 0
for tenant in tenants_src_list:
    print('Migrating tenant: ', tenant.name)
    try:
        collection_src_tenant = nodes_collection_source.with_tenant(tenant.name)
        collection_tgt_tenant = nodes_collection_target.with_tenant(tenant.name)
        migrate_data(collection_src_tenant, collection_tgt_tenant)
    except Exception as e:
        print(e)
        continue
    i += 1
    print('Migrated tenants up to # ', i)

client_source.close()
client_target.close()
```
But for some tenants, the only error logged is just `default`.

`print(e)` shows nothing beyond that. I’m not sure whether this is serious, and I’d like to know before I go ahead with the migration. I’ve also sent a support email about this in case our specific node(s) are affected.
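One possible reading of the bare `default` message (an assumption, not confirmed in this thread): `print(e)` on a `KeyError` prints only the missing key, so `q.vector["default"]` raising on an object that has no vector under that name (for example, an object stored without a vector, or a collection using a differently named vector) would produce exactly this kind of output. A minimal, Weaviate-free illustration, where `read_default_vector` is a hypothetical stand-in for that lookup:

```python
def read_default_vector(vector_dict):
    # Hypothetical stand-in for q.vector["default"] in migrate_data
    return vector_dict["default"]

try:
    # Assumption: an object without a "default" vector yields a dict lacking that key
    read_default_vector({})
except Exception as e:
    message = str(e)
    print(message)  # prints 'default' (with quotes) and nothing else
```

Printing `repr(e)` or `type(e).__name__` in the `except` block would show `KeyError('default')` instead, which makes this case easy to tell apart from a server-side error.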
Thanks for the response. I did check, but no failed objects are printed: the script fails with that same error without reporting any failed objects. For some other tenants there were failed objects, but re-running the script for those seems to work.
```python
from tqdm import tqdm

def migrate_data(collection_src, collection_tgt):
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        for q in tqdm(collection_src.iterator(include_vector=True)):
            batch.add_object(properties=q.properties, vector=q.vector["default"], uuid=q.uuid)
        # Checked while still inside the batch context
        if collection_tgt.batch.failed_objects:
            print('!! FAILED OBJECTS: ')
            for failed_object in collection_tgt.batch.failed_objects:
                print(failed_object)
    return True
```
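Checking `failed_objects` while the batch is still running might not be the best idea: objects still queued in the batch have not been sent yet, so the list can be incomplete at that point. It’s safer to read it after the batch import finishes, i.e. once the `with` block has exited.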
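A sketch of `migrate_data` along those lines: `failed_objects` is read only after the `with` block has exited, and the list is returned so the caller can decide what to do with a tenant. It also swaps `q.vector["default"]` for `q.vector.get(...)`, on the unconfirmed assumption that the bare `default` error comes from objects with no vector under that name; such objects are passed with `vector=None`, which leaves vectorisation to the target collection’s configuration, and are counted. `vector_name` is a hypothetical parameter; the client calls are the same v4 Python client calls already used in this thread, and tqdm is dropped to keep the block self-contained.

```python
def migrate_data(collection_src, collection_tgt, vector_name="default"):
    """Copy every object from collection_src into collection_tgt.

    Returns the failed objects reported by the client after the batch
    has been flushed; an empty list means every object was accepted.
    """
    missing_vector = []
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        for q in collection_src.iterator(include_vector=True):
            # .get() avoids a bare KeyError when an object lacks this vector
            # (assumption: q.vector is a dict keyed by vector name)
            vector = q.vector.get(vector_name)
            if vector is None:
                missing_vector.append(q.uuid)
            batch.add_object(properties=q.properties, vector=vector, uuid=q.uuid)

    # The with-block has exited, so every queued object has been sent
    failed = list(collection_tgt.batch.failed_objects)
    if missing_vector:
        print(f"{len(missing_vector)} objects had no '{vector_name}' vector")
    for failed_object in failed:
        print(failed_object)
    return failed
```

Whether objects without a vector should be imported as-is, skipped, or flagged for review depends on your setup; this sketch only counts them.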
Here’s an internal example I use when testing batch imports; you might find it helpful. You’ll need to adapt it a little to your use case, but it shows how I approach the logic.
```python
import csv


def batch_upload(client, file_path, collection_name, batch_size=100):
    """
    Batch upload data from a CSV file into the specified collection.
    """
    if not client.collections.exists(collection_name):
        raise Exception(f"Collection '{collection_name}' does not exist. Cannot insert data.")

    added = 0  # counted explicitly so an empty CSV doesn't leave it undefined
    try:
        with open(file_path, mode='r', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)
            # Normalize column headers
            csv_reader.fieldnames = [header.strip().lower() for header in csv_reader.fieldnames]

            # Use the batch_size argument instead of a hard-coded value
            with client.batch.fixed_size(batch_size=batch_size, concurrent_requests=2) as batch:
                for row in csv_reader:
                    # Prepare object properties
                    obj_properties = {
                        "company_id": row.get("company_id", ""),
                        "last_name": row.get("last_name", ""),
                        "first_name": row.get("first_name", ""),
                        "job_title": row.get("job_title", ""),
                        "email_address": row.get("email_address", ""),
                        "country": row.get("country", ""),
                        "interaction_notes": row.get("interaction_notes", ""),
                    }
                    batch.add_object(
                        properties=obj_properties,
                        collection=collection_name
                    )
                    added += 1
            print(f"Batch processing completed. {added} objects added.")
    except Exception as e:
        raise Exception(f"Batch insertion failed: {e}") from e

    # Check for failed objects after the batch has closed and print the reason for each
    failed_objects = client.batch.failed_objects
    if failed_objects:
        print(f"Number of failed objects: {len(failed_objects)}")
        for i, failed_obj in enumerate(failed_objects, 1):
            print(f"Failed object {i}: {failed_obj}")
    else:
        print(f"All objects successfully inserted into '{collection_name}'.")
```
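Earlier in the thread, re-running the script fixed the tenants that had failed objects. A sketch of automating that retry per tenant: `migrate_tenants_with_retry` and its `migrate_one` callable are hypothetical names, and `migrate_one` is assumed to return the list of failed objects for one tenant (empty on success) or raise on a hard error. It records `repr(e)` rather than `str(e)`, so a `KeyError` shows up with its type instead of as a bare key name. Retrying assumes a re-run is safe to repeat, as the re-runs described above suggest; since the migration passes each object’s original `uuid`, a retry targets the same objects rather than minting new ones.

```python
import time


def migrate_tenants_with_retry(tenant_names, migrate_one, max_attempts=3, delay_s=0.0):
    """Run migrate_one(name) for each tenant, retrying tenants that fail.

    Returns {tenant_name: last_outcome} for tenants that never succeeded.
    """
    unresolved = {}
    for name in tenant_names:
        for attempt in range(1, max_attempts + 1):
            try:
                failed = migrate_one(name)
            except Exception as e:
                # repr keeps the exception type, e.g. KeyError('default')
                outcome = repr(e)
            else:
                if not failed:
                    unresolved.pop(name, None)  # succeeded on this attempt
                    break
                outcome = f"{len(failed)} failed objects"
            unresolved[name] = outcome
            print(f"Tenant {name}: attempt {attempt} failed ({outcome})")
            if attempt < max_attempts:
                time.sleep(delay_s)  # optional back-off between attempts
    return unresolved
```

For example, wrapping the tenant loop as `lambda name: migrate_data(src.with_tenant(name), tgt.with_tenant(name))`, with a `migrate_data` that returns its failed objects, would leave only the tenants that still fail after every attempt in the returned dict.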