Templating in an Airflow WeaviateIngestOperator

I’m using the WeaviateIngestOperator to populate a Weaviate DB with Airflow. This is my operator:


The operator’s documentation mentions that the data can be passed either via XComs or with a Python callable. I got the XCom approach working, but it isn’t a best practice: XComs are stored in Airflow’s metadata database by default, so uploading many documents that way can clog it. That makes me think the callable route is the better one.

This is my callable:

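```python
from io import BytesIO

import pandas as pd

# BLOB_SERVICE_CLIENT is an azure.storage.blob.BlobServiceClient defined elsewhere in the DAG file.


def retrieve_chunks_df(ds):
    # Each run's chunks are stored in the "chunks" container as "<ds>.parquet"
    blob_client = BLOB_SERVICE_CLIENT.get_blob_client(container="chunks", blob=f"{ds}.parquet")

    with BytesIO() as buffer:
        # Stream the blob into the in-memory buffer, then rewind so pandas reads from the start
        blob_client.download_blob().readinto(buffer)
        buffer.seek(0)
        return pd.read_parquet(buffer)
```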

My question is: How can I pass the ds parameter from the operator to the callable?

The Jinja template “{{ds}}” is not working: the callable receives the literal string {{ds}}. I’m running out of ideas for how to do this with a Python callable. Has anybody implemented something similar?
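A likely explanation: Airflow renders Jinja only in an operator’s template fields, and only just before the task executes. An expression such as retrieve_chunks_df("{{ds}}") written in the DAG file is ordinary Python, so it runs while the scheduler parses the file, before any rendering, and the function receives the literal string (it also re-downloads on every parse). The sketch below simulates the two moments without Airflow; BLOBS, retrieve_chunks and render are hypothetical stand-ins, and render is a toy, not Airflow’s real Jinja renderer.

```python
import re
from typing import Any

# Hypothetical in-memory stand-in for the "chunks" blob container.
BLOBS = {"2024-01-01.parquet": [{"text": "chunk for 2024-01-01"}]}


def retrieve_chunks(ds: str) -> list:
    # Stand-in for retrieve_chunks_df: looks up "<ds>.parquet" and returns its rows.
    return BLOBS.get(f"{ds}.parquet", [])


def render(value: Any, context: dict) -> Any:
    # Toy renderer: substitutes "{{ name }}" placeholders in plain strings only.
    if isinstance(value, str):
        return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), value)
    return value


# 1) DAG-parse time: the call runs immediately, so ds is the literal "{{ds}}".
parse_time_data = retrieve_chunks("{{ds}}")  # looks up "{{ds}}.parquet", finds nothing

# 2) Task run time: template strings are rendered with the run's context first.
context = {"ds": "2024-01-01"}
rendered_ds = render("{{ds}}", context)
run_time_data = retrieve_chunks(rendered_ds)

print(parse_time_data)  # []
print(run_time_data)    # [{'text': 'chunk for 2024-01-01'}]
```

The takeaway is that the date only exists at run time, so whatever fetches the data has to be called at run time too.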

I see that in the documentation example, get_data_with_vectors returns a static list of dictionaries. But how would I do this if I need a context parameter to fetch the data for one specific date, which is the whole point of Airflow? Any ideas?
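One way to use a context parameter is to store the callable itself instead of its result, and call it inside execute() with context["ds"]. In a real DAG that would mean subclassing WeaviateIngestOperator, overriding execute(self, context) to set the operator’s input data from the callable, and then calling super().execute(context). This assumes the installed provider version reads that data attribute inside execute(), so check the operator source for your version. The sketch below shows the pattern with hypothetical stand-ins (IngestOperatorStandIn, DatedIngestOperator, BLOBS) so it runs without Airflow or Weaviate.

```python
from typing import Any, Callable

# Hypothetical in-memory stand-in for the "chunks" blob container.
BLOBS = {
    "2024-01-01.parquet": [{"text": "chunk A"}],
    "2024-01-02.parquet": [{"text": "chunk B"}],
}


def retrieve_chunks(ds: str) -> list:
    # Stand-in for retrieve_chunks_df(ds).
    return BLOBS.get(f"{ds}.parquet", [])


class IngestOperatorStandIn:
    """Hypothetical stand-in for WeaviateIngestOperator: execute() ingests self.input_data."""

    def __init__(self, input_data: Any = None):
        self.input_data = input_data
        self.ingested: list = []

    def execute(self, context: dict) -> None:
        # The real operator would batch self.input_data into Weaviate here.
        self.ingested.extend(self.input_data)


class DatedIngestOperator(IngestOperatorStandIn):
    """Keeps the callable and only calls it when the task runs."""

    def __init__(self, data_callable: Callable[[str], Any], **kwargs: Any):
        super().__init__(**kwargs)
        self.data_callable = data_callable

    def execute(self, context: dict) -> None:
        # Resolve the data with this run's logical date, then delegate to the parent.
        self.input_data = self.data_callable(context["ds"])
        super().execute(context)


op = DatedIngestOperator(data_callable=retrieve_chunks)  # nothing is fetched at parse time
op.execute({"ds": "2024-01-02"})
print(op.ingested)  # [{'text': 'chunk B'}]
```

Because the fetch happens inside execute(), the data never passes through XCom and is downloaded once per run rather than on every DAG parse. A simpler alternative is a @task that downloads the file and writes it with the provider’s WeaviateHook in the same task; the hook’s batch method name and arguments differ between provider versions, so check its reference before relying on it.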

Hi Ismael! Welcome to our community :hugs:

I am afraid this is more an Airflow question than a Weaviate one :thinking:

I will ask internally to see if we have someone more familiar with Airflow.


Hi Duda!

Nice to talk to you again!

I agree this is more of an Airflow issue, but it’s closely tied to how the operator was implemented. Do you know whether the Weaviate team developed these Airflow operators?

It would be great to get in contact with them to better understand how they intended the operators to be used in practice.

Have a great day ahead!