I’m implementing some upsert logic in my code and want thoughts on whether there is a better way to do it, or whether there are logic gaps.
The algorithm uses a doc_key, which uniquely identifies a document; a document may consist of multiple chunks/objects. Zero, some, or all of the objects associated with a document may need to be updated.
- Given a list of documents to ingest, each with a doc_key and chunk UUIDs.
- For each doc_key, I get the list of chunks to be ingested and the list of existing chunks for that doc_key. Using set difference, I compute the UUIDs to be ingested (new chunks not in the existing set) and the UUIDs to be deleted (existing chunks not in the new set).
- Ingest all docs to be ingested.
- Check for errors on ingest.
- If there are errors remove (rollback) the newly ingested objects for that document.
- For any documents that don’t have errors delete the old objects.
- If there are errors, throw an error and retry (in upstream code) as needed.
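The set-difference step above can be sketched roughly as follows. The function name and the tuple shape are assumptions for illustration, not the actual code in ingest.py:

```python
# Hypothetical sketch of the per-doc_key diff described above.
# Chunks whose UUIDs match are assumed unchanged (e.g. content-hashed UUIDs),
# so they are neither re-ingested nor deleted.

def plan_upsert(new_uuids: set, existing_uuids: set):
    """Return (to_ingest, to_delete, unchanged) for one doc_key."""
    to_ingest = new_uuids - existing_uuids   # new chunks not yet stored
    to_delete = existing_uuids - new_uuids   # stale chunks, removed only after a clean ingest
    unchanged = new_uuids & existing_uuids   # identical chunks, left alone
    return to_ingest, to_delete, unchanged
```

For example, `plan_upsert({"a", "b"}, {"b", "c"})` returns `({"a"}, {"c"}, {"b"})`: chunk "a" is ingested, "c" is deleted after the ingest succeeds, and "b" is untouched.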
Some known issues with this implementation are:
- It is only partially atomic: if the task fails between ingesting and checking for errors, the retried task will not know where to pick up. It will notice a difference between old and new chunks and SHOULD do the right thing and re-attempt the upsert, but this needs testing.
- It does not use the batching context manager with an error callback.
- It is designed for a single-node Weaviate instance. More work is needed to account for multi-node Weaviate, thread safety, and locking (i.e., multiple tasks updating the same docs).
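On the batching point: one way this might look with the Weaviate v3 Python client is a callback that collects per-object errors as the batch flushes. This is a hedged sketch, not the actual code; `collect_batch_errors` is a hypothetical helper, and the attach step is shown only in comments because it needs a live Weaviate instance:

```python
# Hypothetical error-collecting callback for the v3 client's batch context
# manager. The result payload shape ({"result": {"errors": {"error": [...]}}})
# follows the v3 batch response format.

def collect_batch_errors(results) -> list:
    """Gather error messages from one batch flush's results, if any."""
    errors = []
    for item in results or []:
        result = item.get("result", {})
        for err in result.get("errors", {}).get("error", []):
            errors.append(err.get("message", "unknown batch error"))
    return errors

# Attaching it (requires a running Weaviate and the v3 weaviate-client):
#
#   failed = []
#   client.batch.configure(
#       batch_size=50,
#       callback=lambda results: failed.extend(collect_batch_errors(results)),
#   )
#   with client.batch as batch:
#       for chunk in to_ingest:
#           batch.add_data_object(chunk["properties"], "Docs", uuid=chunk["uuid"])
#   # After the context manager exits, `failed` drives the rollback decision.
```

Exiting the context manager flushes any remaining objects, so the error check would naturally land in one place after the `with` block rather than being interleaved with the ingest loop.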
Is there anything else that I’m missing?
Code is at https://github.com/mpgreg/ask-astro/blob/add_baseline_test/airflow/include/tasks/ingest.py