Thoughts on upsert

I’m implementing some upsert logic in my code and would like feedback: is there a better way to do it, and are there any logic gaps?

The algorithm uses a doc_key, which uniquely identifies a document; a document may consist of multiple chunks/objects. Zero, some, or all of the objects associated with a document may need to be updated.

  1. Start with a list of documents to ingest, each with a doc_key and chunk uuids.
  2. For each doc_key, get the list of chunks to be ingested and the list of existing chunks for that doc_key. Using set difference, compute the uuids to be ingested (new chunks that are not in the existing set) and the uuids to be deleted (existing chunks that are not in the new set).
  3. Ingest all chunks to be ingested.
  4. Check for errors on ingest.
  5. If there are errors, remove (roll back) the newly ingested objects for that document.
  6. For any documents that don’t have errors, delete the old objects.
  7. If there are errors, throw an error and retry (in upstream code) as needed.
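The steps above can be sketched roughly as follows. This is a minimal illustration, not the actual code from ingest.py: the `ingest` and `delete` callables are hypothetical stand-ins for the real Weaviate write/delete calls, and chunk payloads are plain dicts.

```python
from typing import Callable, Dict, List, Set


def upsert_document(
    doc_key: str,
    new_chunks: Dict[str, dict],          # uuid -> chunk payload to ingest
    existing_uuids: Set[str],             # uuids already stored for this doc_key
    ingest: Callable[[str, dict], None],  # hypothetical: write one chunk, raises on error
    delete: Callable[[str], None],        # hypothetical: remove one chunk by uuid
) -> None:
    """Set-difference upsert: ingest missing chunks, roll back on failure,
    and delete stale chunks only after all ingests succeed."""
    to_ingest = set(new_chunks) - existing_uuids   # new chunks not yet stored
    to_delete = existing_uuids - set(new_chunks)   # stale chunks to remove

    ingested: List[str] = []
    try:
        for uuid in to_ingest:
            ingest(uuid, new_chunks[uuid])
            ingested.append(uuid)
    except Exception:
        # Roll back only what this run wrote; the old chunks are still intact.
        for uuid in ingested:
            delete(uuid)
        raise  # surface the error so upstream code can retry

    # All new chunks landed, so it is now safe to drop the old ones.
    for uuid in to_delete:
        delete(uuid)
```

Note that deleting the old objects last is what makes a retry after a mid-task crash workable: the old chunks are still present, so the retried task recomputes the same diff and re-attempts the upsert.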

Some known issues with this implementation are:

  1. It is only partially atomic: if the task fails between ingesting and checking for errors, the retried task will not know where to pick up. It will notice a difference between old and new chunks and SHOULD do the right thing and re-attempt the upsert, but this needs testing.
  2. It does not use the context manager with a callback for batching.
  3. It is designed for a single-node Weaviate instance. More work is needed to account for multi-node Weaviate, thread safety, and locking (i.e. when multiple tasks are updating the same documents).
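On the locking point, one possible mitigation within a single process is a per-doc_key lock so that two tasks never interleave upserts of the same document. This is a sketch under the stated single-worker/single-process assumption (`with_doc_lock` is a hypothetical helper, not part of the linked code); across multiple workers or nodes you would need a distributed lock or Weaviate-side coordination instead.

```python
import threading
from collections import defaultdict

# One lock per doc_key; defaultdict creates a Lock on first access.
_doc_locks = defaultdict(threading.Lock)
# Guards the registry itself so create-or-fetch of a lock is atomic.
_registry_lock = threading.Lock()


def with_doc_lock(doc_key, fn):
    """Run fn() while holding the lock for doc_key (process-local only)."""
    with _registry_lock:
        lock = _doc_locks[doc_key]
    with lock:
        return fn()
```

This only serializes writers inside one Python process; it does nothing for concurrent Airflow workers on separate machines, which is exactly the multi-node gap noted above.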

Is there anything else that I’m missing?

Code is at https://github.com/mpgreg/ask-astro/blob/add_baseline_test/airflow/include/tasks/ingest.py

This code also assumes there is one worker.

Hi @mgreg!

This is a really interesting question. We have started some discussion internally and will get back to you here once we have some insights.

Thanks!!