Description
The database has 140k objects of ~1 KB each. I am trying to iterate over all of them and collect the IDs. The vector database is a key part of my dataloader, and I need all IDs so that I can randomly split them into train/test sets.
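What I ultimately need looks roughly like this — a minimal sketch using the client's built-in cursor iterator (the collection name here is hypothetical, and I am assuming an empty return_properties list skips the property payloads; the real connection setup is shown further down):

import random
import weaviate

client = weaviate.connect_to_local()  # stand-in; actual connection code is below
collection = client.collections.get("MaterializedEpisodes")  # hypothetical name

# Walk the whole collection with the cursor-based iterator, keeping only UUIDs.
ids = [obj.uuid for obj in collection.iterator(return_properties=[])]

# Random train/test split over the collected IDs.
random.shuffle(ids)
split = int(0.9 * len(ids))
train_ids, test_ids = ids[:split], ids[split:]

client.close()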
The queries for getting all objects are very slow: 95 seconds for 100k objects, and 101 seconds for 140k objects. This will not scale, because I intend to have 1M objects in the near future.
Additionally, I hit the max gRPC limits when using pagination at 100k objects, so I am stuck with just the iterator and applying filters myself afterwards.
How can I make queries like this faster? Should I shard or use multi-tenancy to reduce the per-query size, or is there a different way to go through all the objects?
I've tried reducing the number of indexed fields, increasing the gRPC timeouts, and raising the gRPC default limits to return more objects (see the sketch below). Nothing is making these queries faster.
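For concreteness, these are roughly the knobs I turned (the exact values here are illustrative). The client-side timeouts mirror the connection code below; on the server side, the QUERY_MAXIMUM_RESULTS environment variable caps how deep offset+limit pagination can go, and I am assuming it is related to the ceiling I hit around 100k:

# Client side: longer query timeouts (same values as in the code below).
from weaviate.config import Timeout
timeout = Timeout(query=60, insert=120, init=30)  # seconds

# Server side (Weaviate container env, e.g. in the EKS pod spec):
# QUERY_MAXIMUM_RESULTS caps offset+limit pagination depth.
# Assumption on my part that this is the limit behind the 100k ceiling.
#   QUERY_MAXIMUM_RESULTS: "200000"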
Server Setup Information
- Weaviate Server Version: 1.29
- Deployment Method: Weaviate Docker in EKS
- Multi Node? Number of Running Nodes: 1
- Client Language and Version: 4.11.x
- Multitenancy?: no
- The server setup matched the ANN nearest-neighbor benchmarks, so hopefully it's well configured.
Any additional Information
Fastest query I can create:
import math
import os
import time
from datetime import datetime, timezone

import weaviate
from weaviate.classes.query import Filter
from weaviate.config import AdditionalConfig, ConnectionConfig, Timeout
from tqdm import tqdm


def _query_once(shard):
    weaviate_api_key = os.environ["WEAVIATE_API_KEY"]
    auth_credentials = weaviate.auth.AuthApiKey(api_key=weaviate_api_key)
    http_forwarded_port = 5000
    grpc_forwarded_port = 6000
    use_local_host = False
    http_host = "localhost" if use_local_host else "weaviate.weaviate.svc.cluster.local"
    grpc_host = "localhost" if use_local_host else "weaviate-grpc.weaviate.svc.cluster.local"
    grpc_port = grpc_forwarded_port if use_local_host else 50051
    http_port = http_forwarded_port if use_local_host else 80
    client = weaviate.connect_to_custom(
        http_host=http_host,
        http_port=http_port,
        http_secure=False,
        grpc_host=grpc_host,
        grpc_port=grpc_port,
        grpc_secure=False,
        auth_credentials=auth_credentials,
        additional_config=AdditionalConfig(
            connection=ConnectionConfig(
                session_pool_connections=30,
                session_pool_maxsize=200,
                session_pool_max_retries=3,
            ),
            timeout=Timeout(query=60, insert=120, init=30),
        ),
    )
    try:
        # db_name is defined elsewhere in the module.
        materialized_episodes_db = client.collections.get(db_name)
        offset, query_args = shard
        # len(collection) issues an aggregate count under the hood.
        if offset > len(materialized_episodes_db):
            return []
        query_args["offset"] = offset
        try:
            data = materialized_episodes_db.query.fetch_objects(**query_args)
        except Exception as err:
            print(err)
            return []
        return data.objects
    finally:
        client.close()
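For comparison, offset pagination forces the server to walk past all skipped objects on every call, so a full scan costs roughly O(n²) overall, while a cursor-based scan with after should stay linear. A minimal sketch, assuming a collection object obtained as above (as far as I know the after cursor cannot be combined with filters, so filtering has to happen client-side afterwards — the situation I described above):

def _scan_with_cursor(collection, page_size=1000):
    # Each page resumes after the last UUID of the previous page, so the
    # server never re-skips earlier objects.
    objects = []
    cursor = None
    while True:
        page = collection.query.fetch_objects(
            limit=page_size,
            after=cursor,  # None fetches the first page
            return_properties=["unique_id"],
        )
        if not page.objects:
            break
        objects.extend(page.objects)
        cursor = page.objects[-1].uuid
    return objects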
def run_map_query(
    database,
    tasks: list[str],
    h5s_after_datetime: datetime | None = None,
    h5s_before_datetime: datetime | None = None,
):
    filter_on_tasks = Filter.any_of(
        [Filter.by_property("task").equal(task_name) for task_name in tasks]
    )
    filter_set = [filter_on_tasks]
    if h5s_after_datetime is not None:
        h5s_after_datetime = h5s_after_datetime.replace(tzinfo=timezone.utc)
        filter_set.append(Filter.by_creation_time().greater_or_equal(h5s_after_datetime))
    if h5s_before_datetime is not None:
        h5s_before_datetime = h5s_before_datetime.replace(tzinfo=timezone.utc)
        filter_set.append(Filter.by_creation_time().less_or_equal(h5s_before_datetime))
    and_filter = Filter.all_of(filter_set)
    print(and_filter)
    query = {
        "return_properties": [
            "unique_id",
            "left_video_s3_url",
            "right_video_s3_url",
            "episode",
        ],
        "filters": and_filter,
        "limit": 1000,
        "offset": 0,
    }
    limit = 1000
    # Round up so the final partial page is not dropped.
    num_shards = math.ceil(len(database) / limit)
    # Every shard shares the same query dict; this is safe only because
    # map() runs the shards sequentially.
    shards = [(limit * i, query) for i in range(num_shards)]
    start = time.time()
    results = list(tqdm(map(_query_once, shards), total=len(shards)))
    end = time.time()
    print(len(database), " in ", end - start)
    return results
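Because each shard opens its own client, the shards can also be issued in parallel. A hedged sketch (run_map_query_parallel is my own name; each shard gets a private copy of the query dict so concurrent workers don't race on the shared "offset" key):

from concurrent.futures import ThreadPoolExecutor

def run_map_query_parallel(database, query, limit=1000, max_workers=8):
    num_shards = math.ceil(len(database) / limit)
    # dict(query) gives every shard its own copy, so workers can set
    # "offset" without clobbering one another.
    shards = [(limit * i, dict(query)) for i in range(num_shards)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pages = list(tqdm(pool.map(_query_once, shards), total=len(shards)))
    return [obj for page in pages for obj in page]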