Slow queries to fetch all objects

Description

The database has 140k objects of ~1 KB each. I am trying to iterate over all of them and get the IDs. I am using the vector database as a key part of my dataloader, and I need to be able to get all IDs so that I can randomly split them into train/test sets.

The queries for getting all objects are very slow: 95 seconds for 100k objects, and 101 seconds for 140k objects. This won't scale, because I intend to have 1M objects in the near future.

Additionally, I hit the max gRPC limits when using pagination at 100k objects, so I am stuck with just the iterator and applying filters myself afterwards.

How can I complete queries like this faster? Should I be sharding or using multi-tenancy to reduce the query size needed to get all objects? Or is there a different way to go through all the objects?

I’ve tried techniques like reducing the number of fields that are indexed, increasing the gRPC timeouts, and increasing the gRPC default limits to get more objects. Nothing has made these queries faster.

Server Setup Information

  • Weaviate Server Version: 1.29
  • Deployment Method: Weaviate Docker in EKS
  • Multi Node? Number of Running Nodes: 1
  • Client Language and Version: 4.11.x
  • Multitenancy?: no
  • The server setup matched the ANN (approximate nearest neighbor) benchmark, so hopefully it’s well configured.

Any additional Information

Fastest query I can create:

import os

import weaviate
from weaviate.config import AdditionalConfig, ConnectionConfig, Timeout

def _query_once(shard):
    weaviate_api_key = os.environ["WEAVIATE_API_KEY"]
    auth_credentials = weaviate.auth.AuthApiKey(api_key=weaviate_api_key)
    http_forwarded_port = 5000
    grpc_forwarded_port = 6000

    use_local_host = False
    http_host = "localhost" if use_local_host else "weaviate.weaviate.svc.cluster.local"
    grpc_host = "localhost" if use_local_host else "weaviate-grpc.weaviate.svc.cluster.local"
    grpc_port = grpc_forwarded_port if use_local_host else 50051
    http_port = http_forwarded_port if use_local_host else 80
    client = weaviate.connect_to_custom(
        http_host=http_host,
        http_port=http_port,
        http_secure=False,
        grpc_host=grpc_host,
        grpc_port=grpc_port,
        grpc_secure=False,
        auth_credentials=auth_credentials,
        additional_config=AdditionalConfig(
            connection=ConnectionConfig(
                session_pool_connections=30,
                session_pool_maxsize=200,
                session_pool_max_retries=3,
            ),
            timeout=Timeout(query=60, insert=120, init=30),
        ),
    )
    # db_name is assumed to be defined at module scope.
    materialized_episodes_db = client.collections.get(db_name)

    # Copy the query template so shards don't mutate the shared dict.
    offset, query_args = shard[0], dict(shard[1])
    if offset > len(materialized_episodes_db):
        return []
    query_args["offset"] = offset
    try:
        data = materialized_episodes_db.query.fetch_objects(**query_args)
    except Exception as err:
        print(err)
        return []

    return data.objects

import time
from datetime import datetime, timezone

from tqdm import tqdm
from weaviate.classes.query import Filter

def run_map_query(
    database,
    tasks: list[str],
    h5s_after_datetime: datetime | None = None,
    h5s_before_datetime: datetime | None = None,
):
    # Match any of the requested tasks, optionally bounded by creation time.
    filter_on_tasks = Filter.any_of(
        [Filter.by_property("task").equal(task_name) for task_name in tasks]
    )
    filter_set = [filter_on_tasks]
    if h5s_after_datetime is not None:
        h5s_after_datetime = h5s_after_datetime.replace(tzinfo=timezone.utc)
        filter_set.append(Filter.by_creation_time().greater_or_equal(h5s_after_datetime))
    if h5s_before_datetime is not None:
        h5s_before_datetime = h5s_before_datetime.replace(tzinfo=timezone.utc)
        filter_set.append(Filter.by_creation_time().less_or_equal(h5s_before_datetime))
    and_filter = Filter.all_of(filter_set)

    print(and_filter)
    limit = 1000
    query = {
        "return_properties": [
            "unique_id",
            "left_video_s3_url",
            "right_video_s3_url",
            "episode",
        ],
        "filters": and_filter,
        "limit": limit,
        "offset": 0,
    }

    # Round up so the final partial page is not dropped.
    num_shards = (len(database) + limit - 1) // limit
    shards = [(limit * i, query) for i in range(num_shards)]
    start = time.time()
    results = list(tqdm(map(_query_once, shards), total=len(shards)))
    end = time.time()
    print(len(database), " in ", end - start)

    # Flatten the per-shard lists into a single list of objects.
    output_vids = [obj for shard_objects in results for obj in shard_objects]
    return output_vids

Hi @kathleenb45!!

Welcome to our community :hugs:

To fetch all objects, the best way is to use the iterator, for instance:

collection = client.collections.get("WineReview")

for item in collection.iterator():
    print(item.uuid, item.properties)

Using fetch_objects will eventually hit the limit imposed by the QUERY_MAXIMUM_RESULTS environment variable (documented in the Weaviate environment variables reference).

Depending on the dataset size, you could try increasing that variable and getting all the objects at once. :thinking:

I am not sure how that would compare to running the iterator, and while it is not scalable, it may be worth a try.
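If you do try it, a single large fetch could look like this rough sketch; the 200_000 limit is just a placeholder that assumes you raised QUERY_MAXIMUM_RESULTS at least that high, and unique_id is the property from your query above:

collection = client.collections.get(db_name)

# Rough sketch: limit must not exceed the raised QUERY_MAXIMUM_RESULTS value.
result = collection.query.fetch_objects(
    return_properties=["unique_id"],  # keep the payload small
    limit=200_000,
)
uuids = [obj.uuid for obj in result.objects]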

Let me know if this helps!

Thanks!

Hi Duda,

Thank you for the response! I will try increasing the query limit environment variable and see how that goes!

I’ve tested the iterator as well, and it works well and can exceed that gRPC limit; however, I wish I could apply filters to it like the regular fetch_objects queries!
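For now I’m applying the filter client-side while iterating, returning only the property the filter needs, roughly like this sketch (the task values are just examples):

wanted_tasks = {"pick_and_place", "folding"}  # example task names

matching_uuids = []
for item in collection.iterator(return_properties=["task"]):
    if item.properties.get("task") in wanted_tasks:
        matching_uuids.append(item.uuid)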

Hi @kathleenb45,

If you are only interested in UUIDs, you can specify which properties you would like to get back, or even request that no properties be returned to make the response smaller, like this:

Iterator

collection = client.collections.get("WineReview")

for item in collection.iterator(return_properties=["name", "description"]):
    print(item.uuid, item.properties)

for item in collection.iterator(return_properties=""):  # no properties
    print(item.uuid, item.properties)  # item.properties will return {}

Fetch

result = collection.query.fetch_objects(
    limit=5,
    return_properties=""
)
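
And since the goal is a random train/test split over IDs, here is a minimal sketch on top of the property-free iterator (the 80/20 ratio is just an example):

import random

# Collect every UUID without transferring properties, then split randomly.
uuids = [item.uuid for item in collection.iterator(return_properties="")]
random.shuffle(uuids)
cut = int(0.8 * len(uuids))  # example 80/20 split
train_ids, test_ids = uuids[:cut], uuids[cut:]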