Description
I have written some code that ingests data into Weaviate from a Postgres database or a pandas DataFrame. This code relies mostly on methods in weaviate-client (4.6.1). See below for some basics on how it works (this is not a comprehensive working example):
import logging
from typing import List, Optional, Union

import pandas as pd
from sqlalchemy import create_engine
import weaviate.classes.config as wc
# Assumption: Client refers to the v4 client class.
from weaviate import WeaviateClient as Client


class BatchStream:
    '''
    a class that allows us to batch-stream
    data from a postgres db into weaviate.
    '''
    # maps Python type names to Weaviate property data types
    type_mapping = {
        'int': wc.DataType.INT,
        'str': wc.DataType.TEXT,
        'float': wc.DataType.NUMBER,
        'bool': wc.DataType.BOOL,
        'datetime': wc.DataType.DATE
    }

    def __init__(
        self,
        client: Client,
        vectorizer_config: wc.Configure.Vectorizer,
        generative_config: wc.Configure.Generative,
        postgres_string: str = 'test',
        weaviate_batch_size: int = 100,
        n_workers: int = 4,
    ) -> None:
        self.client = client
        self.vectorizer_config = vectorizer_config
        self.generative_config = generative_config
        self.postgres_string = postgres_string
        self.weaviate_batch_size = weaviate_batch_size
        self.n_workers = n_workers
        logging.info("connecting to postgres db...")
        self.engine = create_engine(self.postgres_string)
        logging.info("connected to database")

    def stream_insert_many(
            self,
            collection: str,
            create_collection: bool = True,
            query: str | None = None,
            df: pd.DataFrame | None = None,
            properties: Optional[Union[List[wc.Property], str]] = 'infer') -> None:
        '''
        method that streams data retrieved from a
        postgres query, or a df, into weaviate.
        '''
        # compare against None: the truth value of a DataFrame is ambiguous
        if df is not None and query is not None:
            raise ValueError("Supply either a dataframe or a sql query, not both.")
        if df is None and query is None:
            raise ValueError("Supply either a dataframe or a sql query.")
        if properties == 'infer':
            properties = self._infer_weaviate_properties(df, query)
        if create_collection:
            coll = self.client.collections.create(
                name=collection,
                properties=properties,
                # vectorizer
                vectorizer_config=self.vectorizer_config,
                # generative module
                generative_config=self.generative_config
            )
        else:
            coll = self.client.collections.get(collection)
        insert_obj = []
        if df is None and query is not None:
            # collect the chunked query results into one list of records
            for chunk in self._sql_query_to_chunked_df(query):
                insert_obj.extend(chunk.to_dict(orient='records'))
        else:
            insert_obj = df.to_dict(orient='records')
        coll.data.insert_many(insert_obj)
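As an aside, the class stores weaviate_batch_size, but the snippet above hands every record to insert_many in a single call. A minimal sketch of slicing the records into batches first, assuming that smaller requests are what the parameter is meant for; iter_batches is a hypothetical helper and not part of weaviate-client:

```python
from typing import Any, Dict, Iterator, List


def iter_batches(
    records: List[Dict[str, Any]], batch_size: int = 100
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of `records`, each with at most `batch_size` items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


# Hypothetical usage inside stream_insert_many, instead of one insert_many call:
# for batch in iter_batches(insert_obj, self.weaviate_batch_size):
#     coll.data.insert_many(batch)
```

Whether this matters depends on how much data is ingested at once; for small collections the single call is probably fine.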
Now I can easily access this data through weaviate-client, for example to count it or run similarity_search on it.
For the purpose of building a RAG application, I want to query it via langchain. LangChain seems pretty straightforward and is well integrated with Weaviate via the langchain-weaviate package for Python. So, following various tutorials, e.g. 1, 2 or 3, I am able to initialize all the required objects using langchain-weaviate:
from langchain_weaviate.vectorstores import WeaviateVectorStore
from langchain_core.messages import HumanMessage
from langchain_openai import OpenAIEmbeddings, ChatOpenAI, OpenAI
from langchain.chains import ChatVectorDBChain

client = . . .
client.is_ready()
>>> True

vectorstore = WeaviateVectorStore(
    embedding=embeddings,
    client=client,
    index_name="my_collection",
    text_key="my_target_key"
)
Now I want to query my collection via the methods on langchain_weaviate.vectorstores.WeaviateVectorStore. I attempt it like so:
vectorstore.similarity_search('test')
This returns an empty list, [].
If I instead run collection.query.near_text() from the weaviate client, I get relevant documents returned.
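To make the mismatch easy to reproduce, both paths can be run side by side with the same query and limit. This is only a sketch: compare_search_paths is a hypothetical helper, and collection is assumed to be the object returned by client.collections.get(...) for the same collection the vector store points at.

```python
from typing import Any, Dict


def compare_search_paths(
    vectorstore: Any, collection: Any, query: str, k: int = 4
) -> Dict[str, int]:
    """Run one query through LangChain and through the Weaviate client; return hit counts."""
    # LangChain path: similarity_search returns a list of Document objects.
    langchain_docs = vectorstore.similarity_search(query, k=k)
    # Weaviate client path: near_text returns a response whose .objects holds the hits.
    response = collection.query.near_text(query=query, limit=k)
    return {"langchain": len(langchain_docs), "weaviate": len(response.objects)}
```

In my case I would expect a call such as compare_search_paths(vectorstore, collection, 'test') to report zero hits for the langchain path and a non-zero count for the weaviate path.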
Hence the question:
How can I perform queries on a weaviate collection via langchain if data was ingested using the weaviate client library?
Server Setup Information
- Weaviate Server Version: 1.24.13
- Deployment Method: Weaviate Cloud
- Multi Node? Number of Running Nodes:
- Client Language and Version: Python 3.11.3, client v 4.6.1
- Multitenancy?: No