Here is the overall method right now.
def add_to_vectorstore(database_name: str, collection_name: str, json_object: json, state, property_field_name: str,
property_value_type: weaviate_datatypes, property_field_value: str, object_ids: [uuid] = None) -> [int, uuid, list]:
"""
This method stores a JSON document in the collection provided
Parameters
----------
database_name : str
Name of the databaseShould be set to the user token retrieved from MongDB during login
collection_name: str
Name of the collection to which this attachment should be added
json_object: json
The document to be stored - must be valid JSON
state: TypeDict
Current state object
property_field_name: str
The name of the field to add
property_value_type: weaviate_datatypes
The value type of the field to add
property_field_value: str
The value of the metadata field to add
object_ids: [int, uuid]
If this is an update then provide original UUIDs
Returns
-------
list
In the format [int, UUID]
uuid
Unique ID of the document added/updated
list
Current state object
"""
diff_count = 0
weaviate_client = weaviate.connect_to_local()
internal_name = clean_field_name(database_name + collection_name)
try:
if not weaviate_client.collections.exists(internal_name):
state["persistent_logs"].append("Creating the collection " + internal_name)
logging.debug("Creating the collection " + internal_name)
user_collection = weaviate_client.collections.create(
name=internal_name,
vectorizer_config=wvc.config.Configure.Vectorizer.text2vec_huggingface(),
properties=[
Property(name=property_field_name, data_type=get_weaviate_type(property_value_type), vectorize_property_name=False),
Property(name="date_added", data_type=get_weaviate_type(weaviate_datatypes.DateTime), vectorize_property_name=True),
],
# Configure the vector index
vector_index_config=wvc.config.Configure.VectorIndex.hnsw( # Or `flat` or `dynamic`
distance_metric=wvc.config.VectorDistances.COSINE,
quantizer=wvc.config.Configure.VectorIndex.Quantizer.bq(),
),
# Configure the inverted index
inverted_index_config=wvc.config.Configure.inverted_index(
index_null_state=True,
index_property_length=True,
index_timestamps=True,
)
)
initial_count = 0
else:
state["persistent_logs"].append("Collection already exists.")
logging.debug("Collection already exists.")
user_collection = weaviate_client.collections.get(internal_name)
aggregation = user_collection.aggregate.over_all(total_count=True)
initial_count = aggregation.total_count
datetime_now = datetime.now()
rfcc = datetime_now.strftime("%Y-%m-%dT%H:%M:%S+00:00")
json_splitter = RecursiveJsonSplitter(max_chunk_size=2000)
json_docs = json_splitter.split_json(json_object, True)
if object_ids is None:
doc_objs = list()
for doc in json_docs:
doc_objs.append(wvc.data.DataObject(
properties={property_field_name: property_field_value, "date_added": rfcc},
vector=doc
))
state["persistent_logs"].append("Adding document to Weaviate")
logging.debug("Adding document to Weaviate")
object_ids = user_collection.data.insert_many(doc_objs)
else:
state["persistent_logs"].append("Updating document in Weaviate")
logging.debug("Updating document in Weaviate")
if len(object_ids) != len(json_docs):
if len(object_ids) > len(json_docs):
while len(object_ids) > len(json_docs):
this_obj = object_ids.pop()
remove_by_id(database_name, collection_name, this_obj, state)
else:
current_count = len(object_ids)
while len(object_ids) < len(json_docs):
object_ids.append(current_count, uuid.uuid4())
current_count += 1
count = 0
while count < len(object_ids):
user_collection.data.replace(uuid=object_ids[count], properties={property_field_name: property_field_value, "date_added": rfcc}, vector=json_docs[count])
aggregation = user_collection.aggregate.over_all(total_count=True)
final_count = aggregation.total_count
diff_count = final_count - initial_count
state["persistent_logs"].append("Started with " + str(initial_count) + " documents, now have " + str(final_count) + " documents")
logging.debug("Started with " + str(initial_count) + " documents, now have " + str(final_count) + " documents")
except:
trace_back = traceback.format_exc()
logging.error("An unexpected error occurred attempting to add document to Weaviate collection: " + internal_name +
"\nHere is the document that failed: " + write_object_to_prompt(json_object) + " \nWith the error:\n " + trace_back)
state["persistent_logs"].append(
"An unexpected error occurred attempting to add document to Weaviate Collection: " + internal_name +
"\nHere is the document that failed: " + write_object_to_prompt(json_object) + " \nWith the error:\n " + trace_back)
finally:
weaviate_client.close()
return diff_count, object_ids, state
And the test call that fails:
state = {
"errors": "",
"persistent_logs": [],
}
test_topic = {
"topic_id": str(uuid.uuid4()),
"topic_name": "some random topic",
"topic_summary": " a test summary",
"conversations": [{
"conversation": {
"conversation_id": str(uuid.uuid4()),
"converation_sender": "email@noreply.com",
"conversation_text": "Some long boring conversation..."
}
}]}
add_number, doc_ids, state = add_to_vectorstore("user-x", "topics", test_topic, state, "topic_id",
weaviate_datatypes.Text, test_topic["topic_id"])