Description
I am trying to use weaviate v4 python client to batch import data into my weaviate. This is the code setup:
client = weaviate.connect_to_local(WEAVIATE_HOST, WEAVIATE_PORT)
data_jsons = ... # a list of dict of key/values that match up with the collection schema
collection = client.collections.get('my_collection')
try:
with collection.batch.dynamic() as batch:
for a_json in tqdm(data_jsons[:10000]):
key = create_key(a_json) # could be a hash of the data
vector = a_json.pop('vector') # bring my own vector use case
batch.add_object(properties=a_json,
uuid=key,
vector=vector)
failed_objects = collection.batch.failed_objects
if len(failed_objects) > 0:
raise Exception(f"Failed to insert {len(failed_objects)} objects")
except Exception as e:
print(f"Error: {e}")
when thereās intermittent failure, it will complete and failed_objects will indeed be >0, such that I can raise the error to the caller.
However, if the weaviate instance is permanently down (I just pause it to simulate this), then the above code will take a long time to complete and slowly printing out something like:
UserWarning: Bat003: The dynamic batch-size could not be refreshed successfully: error WeaviateTimeoutError('The request to Weaviate timed out while awaiting a response. Try adjusting the timeout config for your client. Details: ')
warnings.warn(
{'message': 'Failed to send 260 objects in a batch of 260. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
{'message': 'Failed to send 260 objects in a batch of 260. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
{'message': 'Failed to send 260 objects in a batch of 260. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
{'message': 'Failed to send 260 objects in a batch of 260. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
{'message': 'Failed to send 260 objects in a batch of 260. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
{'message': 'Failed to send 260 objects in a batch of 260. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
{'message': 'Failed to send 260 objects in a batch of 260. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
{'message': 'Failed to send 110 objects in a batch of 110. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}
Error: Failed to insert 1930 objects
I think these are from the logger in weaviate and it seems add_object never throw any exceptions all along (so the try/except is actually useless above). What I want to achieve is if there are 3 messages like this getting triggered, I want it to just quit and throw exception. Right now, it seems to be waiting for a timeout, then do something, trigger that message, then timeout again, which result in this code running for a very long time before it hits my raise Exception.
Is there a proper way to handle connection error (e.g. if weaviate instance just died)? my goal is I dont want a very large batch import job to get stuck forever.
Server Setup Information
- Weaviate Server Version: 1.27.0
- Deployment Method: docker on Mac OS
- Multi Node? Number of Running Nodes: 1 (no multi tenancy, no replication, no cluster)
- Client Language and Version: En
- Multitenancy?: No
Any additional Information
I didnt specify any specific timeout in the client. its just plain simple connect_to_local(WEAVIATE_HOST, WEAVIATE_PORT)
As a part of the solution, in the broad context of batch import job monitoring, I launched this with inside a celery task and then build something else to track the status of the task(_id). And then send off an alarm if it takes longer than expected.
Still it would be better to have the the batch import stops on its own and not hog the cpu until our devops/support come check it out.
hi @00.lope.naughts !!
I believe that the batch import will timeout according to the timeout configuration, as documented here:
maybe, for that batch process, you can initiate the client with a configured timeout?
Will check it out. Although, in my setup, I provided no timeout config, Just:
client = weaviate.connect_to_local(WEAVIATE_HOST, WEAVIATE_PORT).
I now set it to timeout in 1 sec explicitly like this:
client = weaviate.connect_to_local(host=WEAVIATE_HOST,
port=WEAVIATE_PORT,
additional_config=AdditionalConfig(
timeout=Timeout(init=1, query=1, insert=1) # Values in seconds
)
)
I still see the same behavior when I hit Pause in the middle of the import. it immediately printed:
ā¦/python39_env/lib/python3.9/site-packages/weaviate/warnings.py:295](weaviate/warnings.py:295): UserWarning: Bat003: The dynamic batch-size could not be refreshed successfully: error WeaviateTimeoutError('The request to Weaviate timed out while awaiting a response. Try adjusting the timeout config for your client. Details: ') warnings.warn(
but then stuck for 2-3 min before printing:
{āmessageā: āFailed to send 62 objects in a batch of 62. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.ā}
{āmessageā: āFailed to send 62 objects in a batch of 62. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.ā}
and then went on for a long while without apparent progress. I hit start the weaviate container and then it proceeded successfully.
It seems to me that while it is robust against temporary downtime, like a few min, and able to proceed, i canāt seem to control this error condition such that it returns control back to the rest of my code.
while this isnt necessary severe in my case since I run this in a fork process like a celery task, and I can always ping its backend for status. I am still curious how to convince the batch import to just give up (e.g. after several rounds of whatever it was trying to do).
Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.
Have you inspected this as the error message suggest?
here you find some documentation on proper error handling:
My issue is not being unable to handle message. For one, I know what the error is, it is a disconnect + timeout, 'cos I intentionally caused it in the test.
The more severe issue is that it will take a long long time for it to even deliver you those failed_objects.
as I mentioned, I have no problem with this if the disconnect is only intermittent, and I did observed it before in a realistic run, and capable of inspecting those failed_objects, and do the appropriate retry.
I am not sure if I have made the issue clear enough.
Oh, I think I got it.
You mean you are are facing a timeout error, and cannot see those objects returning as failed?
Not entirely (partially true). But to be precise, it is the timeout mechanism thats problematic, but the ātimeout errorā is fully expected since I myself āunplugā my weaviate as a sanity test.
client = weaviate.connect_to_local(host=WEAVIATE_HOST,
port=WEAVIATE_PORT,
additional_config=AdditionalConfig(
timeout=Timeout(init=1, query=1, insert=1) # Values in seconds
)
)
I have used a timeout of 1 sec. I noticed this indeed work for querying and objects counting, just not during batch import. So timeout erroring behaviour are correct for query/count (maybe others), but incorrect for batch import.
what I observed is the batch import will effectively stall, and it seems to āworkā through the entire data (which can be very large), and I have to sometimes wait >1 hr before it get to the code where I actually went to read the failed objects, and that part worked (I am able to get a specific failed count, attribute of the json, the full objects).
So what you said ācannot see those objects returning as failedā is NOT true, but you do have to wait for a long time. And ātimeout errorā seems to be failure of timeout mechanism itself, but in my experiment, the timeout error is to be expected, just not happening in 1sec as promised.
I hope this characterize the condition I am facing. While this isnt a showstopper, and I dont even know what the right design should be, given that I donāt know all the constraint.