I'm using the WeaviateIngestOperator to populate a Weaviate DB using Airflow. This is my operator:
WeaviateIngestOperator(
    task_id="ingest_data",
    conn_id=WEAVIATE_CONN_ID,
    class_name=COLLECTION["class"],
    input_json=retrieve_chunks_df("{{ds}}"),
)
The operator's documentation mentions that we can pass the data via XComs or with a Python callable. I got the XCom solution working; however, it is not a best practice, since uploading many documents through XComs can clog the Airflow metadata DB, which makes me think the callable route is the better option.
This is my callable:
def retrieve_chunks_df(ds):
    blob_client = BLOB_SERVICE_CLIENT.get_blob_client(container="chunks", blob=f"{ds}.parquet")
    with BytesIO() as buffer:
        download_stream = blob_client.download_blob()
        download_stream.readinto(buffer)
        buffer.seek(0)
        return pd.read_parquet(buffer)
My question is: How can I pass the ds parameter from the operator to the callable?
The Jinja templating "{{ds}}" is not working: it passes the literal {{ds}} string to the callable. I'm running out of alternatives for doing this with a Python callable. Has anybody implemented something similar?
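To illustrate why the callable sees the literal string, here is a minimal sketch in plain Python with no Airflow involved. `StandInOperator` and `retrieve_chunks_stub` are hypothetical stand-ins, not real provider classes. The point is only that a function call written inside an argument list runs immediately, while the DAG file is being parsed, so it receives the raw template text before anything could render it.

```python
# Hypothetical stand-in for an operator: it just stores the value it is given.
class StandInOperator:
    def __init__(self, input_json):
        self.input_json = input_json


def retrieve_chunks_stub(ds):
    # Hypothetical stand-in for retrieve_chunks_df: report the argument received.
    return f"would load {ds}.parquet"


# The call below executes right here, at definition time, so the function
# is handed the unrendered template text rather than a date.
op = StandInOperator(input_json=retrieve_chunks_stub("{{ds}}"))
print(op.input_json)  # prints: would load {{ds}}.parquet
```

In other words, by the time the operator is constructed, it only ever sees the function's return value, never the "{{ds}}" argument itself.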
I see that in the example, get_data_with_vectors returns a static list of dictionaries. But how would I do it if I need a context parameter to fetch the data for one specific date, which is the whole purpose of Airflow? Any ideas on this?
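One direction that might work is to defer the loading until the task actually runs. Airflow passes a context dictionary to an operator's execute(context) method, and that dictionary includes "ds". The sketch below shows the pattern in plain Python only. `StandInIngestOperator`, `DeferredIngestOperator`, and `retrieve_chunks_stub` are hypothetical names; whether the real WeaviateIngestOperator can be subclassed this way, and which attribute it reads its data from, are assumptions that would need checking against the installed provider version.

```python
class StandInIngestOperator:
    """Hypothetical stand-in for an ingest operator; not the real provider class."""

    def __init__(self, task_id, input_json=None):
        self.task_id = task_id
        self.input_json = input_json

    def execute(self, context):
        # A real operator would write self.input_json to Weaviate here.
        return self.input_json


class DeferredIngestOperator(StandInIngestOperator):
    """Loads the data inside execute(), where the runtime context is available."""

    def __init__(self, task_id, loader):
        super().__init__(task_id=task_id)
        self.loader = loader  # function that takes the date string

    def execute(self, context):
        # context["ds"] holds the run's date as YYYY-MM-DD at execution time.
        self.input_json = self.loader(context["ds"])
        return super().execute(context)


def retrieve_chunks_stub(ds):
    # Hypothetical stand-in for retrieve_chunks_df: returns rows, reads no blob.
    return [{"source": f"{ds}.parquet"}]


op = DeferredIngestOperator(task_id="ingest_data", loader=retrieve_chunks_stub)
result = op.execute({"ds": "2024-01-01"})  # simulate Airflow calling execute
print(result)
```

With this shape the function itself is passed to the operator (no parentheses), so nothing is loaded at parse time and nothing goes through XCom.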