Search results get messed up under parallel write operations!

Description

It seems like Weaviate has some kind of buffer corruption in search under parallel write requests.
We have a service where search accuracy and recall are crucial, and under a high write load Weaviate starts messing up search results.

Each object in our Weaviate collection has a key property (key_id) that we match against afterwards.

For example:
Search vector:

  • key_id: 123456
  • vector (a fresh embedding generated by our model)
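
For reference, here is a minimal sketch of the kind of query we run, using the Python v4 client (the connection and the query vector are placeholders, not our real setup):

import weaviate
from weaviate.classes.query import MetadataQuery

query_vector = [0.1] * 512  # stand-in for a real model embedding

client = weaviate.connect_to_local()  # placeholder connection
try:
    collection = client.collections.get("KeyClass")
    result = collection.query.near_vector(
        near_vector=query_vector,
        limit=4,
        return_properties=["key_id"],
        return_metadata=MetadataQuery(distance=True),
    )
    for obj in result.objects:
        print(obj.uuid, obj.properties["key_id"], obj.metadata.distance)
finally:
    client.close()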

Search results with parallel write requests (messed up):

"similars":[
    {
      "created":"2025-04-13T14:49:32.243000Z",
      "id":"f7ed408c-cbde-4749-ac8e-96d0d8fc984b",
      "key_id":"**123456**",  # id same as we searched
      "updated":"2025-04-13T14:49:32.243000Z",
      "distance":**0.3067777156829834**
    },
    {
      "created":"2025-01-30T13:51:27.624000Z",
      "id":"5105ea9a-c9b6-4cc2-a044-e25b604729c7",
      "key_id":"`567890`",  # different id
      "updated":"2025-01-30T13:51:27.624000Z",
      "distance":`0.3187061548233032`
    },
    {
      "created":"2025-01-30T10:01:03.661000Z",
      "id":"16ec6bd3-d51e-4886-b9d5-d85a5bbf61f5",
      "key_id":"~098765~",  # another different id
      "updated":"2025-01-30T10:01:03.661000Z",
      "distance":~0.3222454786300659~
    },
    {
      "created":"2025-05-21T02:23:41.679000Z",
      "id":"de5d970f-461d-4765-a8ac-cb73a6b24f0a",
      "key_id":"573211",
      "updated":"2025-05-21T02:23:41.679000Z",
      "distance":0.6350002288818359
    }
]

Next I took the same vector I had used for the search and, without parallel writes, got the following (correct) results:

"similars": [
    {
      "created": "2025-01-30T12:23:28.038000Z",
      "id": "ab9957d5-8304-4099-9b4d-c47a71016c6c",
      "key_id": "**123456**",  # same key_id
      "updated": "2025-01-30T12:23:28.038000Z",
      "distance": **0.3067777156829834** # even the distance the same
    },
    {
      "created": "2025-06-06T10:26:41.979000Z",
      "id": "1d5e69d3-48ba-4f57-a612-0c7ca5790fbf",
      "key_id": "**123456**",  # now correct object
      "updated": "2025-06-06T10:26:41.979000Z",
      "distance": `0.3187061548233032`  # the same distance as before BUT correct obj now
    },
    {
      "created": "2025-04-13T14:49:32.243000Z",
      "id": "f7ed408c-cbde-4749-ac8e-96d0d8fc984b",
      "key_id": "**123456**",  # again correct key_id
      "updated": "2025-04-13T14:49:32.243000Z",
      "distance": ~0.3222454786300659~  # and again the same distance with correct obj
    },
    {
      "created": "2025-01-30T13:51:27.624000Z",
      "id": "5105ea9a-c9b6-4cc2-a044-e25b604729c7",
      "key_id": "`567890`",  # obj key_id that was on 2nd place now on 4th with correct distance
      "updated": "2025-01-30T13:51:27.624000Z",
      "distance": 0.6350002288818359  # correct distance for obj "key_id": "`567890`"
    }
]

So, as you can see, we expect the search results to look like the second set, but when write requests run in parallel with search requests we get these kinds of issues.

Please also note that the distances themselves were correct during the search; only the objects were mixed up!
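
To illustrate the check: with cosine distance, Weaviate reports 1 minus the cosine similarity, so the mismatch can be demonstrated by recomputing the distance against a returned object's stored vector (a self-contained sketch with made-up numbers):

import numpy as np

def cosine_distance(a, b):
    # cosine distance as Weaviate defines it: 1 - cosine similarity
    return 1.0 - float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

query_vec = np.array([0.1, 0.9, 0.3])   # vector we searched with (made up)
stored_vec = np.array([0.2, 0.8, 0.4])  # stored vector of a returned object (made up)
reported = 0.3187061548233032           # distance Weaviate returned for that hit

recomputed = cosine_distance(query_vec, stored_vec)
if abs(recomputed - reported) > 1e-3:
    print(f"mismatch: reported={reported:.6f}, recomputed={recomputed:.6f}")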

Server Setup Information

  • Weaviate Server Version: 1.29.4 (Also tested 1.25.27)
  • Deployment Method: k8s
  • Multi Node? Number of Running Nodes: 3
  • Client Language and Version: Python 3.11, weaviate-client==4.10.2
  • Multitenancy?: no

Any additional Information

At first we thought it was a problem with async replication, but we could not confirm that theory.

Here is the configuration we are using:

- name: ASYNC_REPLICATION_ALIVE_NODES_CHECKING_FREQUENCY
  value: 5s
- name: ASYNC_REPLICATION_DIFF_BATCH_SIZE
  value: '100'
- name: ASYNC_REPLICATION_DIFF_PER_NODE_TIMEOUT
  value: 10s
- name: ASYNC_REPLICATION_DISABLED
  value: 'false'
- name: ASYNC_REPLICATION_FREQUENCY
  value: 5s
- name: ASYNC_REPLICATION_FREQUENCY_WHILE_PROPAGATING
  value: 5s
- name: ASYNC_REPLICATION_HASHTREE_HEIGHT
  value: '20'
- name: ASYNC_REPLICATION_LOGGING_FREQUENCY
  value: 5s
- name: ASYNC_REPLICATION_PROPAGATION_BATCH_SIZE
  value: '1000'
- name: ASYNC_REPLICATION_PROPAGATION_CONCURRENCY
  value: '20'
- name: ASYNC_REPLICATION_PROPAGATION_DELAY
  value: 30s
- name: ASYNC_REPLICATION_PROPAGATION_LIMIT
  value: '1000000'
- name: ASYNC_REPLICATION_PROPAGATION_TIMEOUT
  value: 30s
- name: DISABLE_TELEMETRY
  value: 'true'
- name: TOMBSTONE_DELETION_CONCURRENCY
  value: '4'
- name: GO_PROFILING_DISABLE
  value: 'true'
- name: FORCE_FULL_REPLICAS_SEARCH
  value: 'false'
- name: ASYNC_INDEXING
  value: 'true'
- name: AUTOSCHEMA_ENABLED
  value: 'false'
- name: CLUSTER_DATA_BIND_PORT
  value: '7001'
- name: CLUSTER_GOSSIP_BIND_PORT
  value: '7000'
- name: DISABLE_LAZY_LOAD_SHARDS
  value: 'true'
- name: ENABLE_API_BASED_MODULES
  value: 'false'
- name: PERSISTENCE_HNSW_MAX_LOG_SIZE
  value: 16GiB
- name: QUERY_SLOW_LOG_ENABLED
  value: 'false'
- name: QUERY_SLOW_LOG_THRESHOLD
  value: 500ms
- name: RECOUNT_PROPERTIES_AT_STARTUP
  value: 'false'
- name: GOGC
  value: '85'
- name: GOMAXPROCS
  value: '39'
- name: HNSW_STARTUP_WAIT_FOR_VECTOR_CACHE
  value: 'true'
- name: LIMIT_RESOURCES
  value: 'true'
- name: LOG_LEVEL
  value: debug
- name: PROMETHEUS_MONITORING_ENABLED
  value: 'true'
- name: PROMETHEUS_MONITORING_GROUP
  value: 'true'
- name: QUERY_MAXIMUM_RESULTS
  value: '1000'
- name: REINDEX_SET_TO_ROARINGSET_AT_STARTUP
  value: 'true'
- name: REINDEX_VECTOR_DIMENSIONS_AT_STARTUP
  value: 'false'
- name: STANDALONE_MODE
  value: 'false'
- name: TRACK_VECTOR_DIMENSIONS
  value: 'false'
- name: PERSISTENCE_DATA_PATH
  value: /var/lib/weaviate
- name: PERSISTENCE_LSM_ACCESS_STRATEGY
  value: pread
- name: DEFAULT_VECTORIZER_MODULE
  value: none
- name: RAFT_BOOTSTRAP_TIMEOUT
  value: '7200'
- name: RAFT_BOOTSTRAP_EXPECT
  value: '3'
- name: RAFT_JOIN
  value: weaviate-0,weaviate-1,weaviate-2
- name: RAFT_ENABLE_ONE_NODE_RECOVERY
  value: 'false'
- name: RAFT_ENABLE_FQDN_RESOLVER
  value: 'true'

Schema:

{
    "class": "KeyClass",
    "invertedIndexConfig": {
        "bm25": {
            "b": 0.75,
            "k1": 1.2
        },
        "cleanupIntervalSeconds": 60,
        "stopwords": {
            "additions": null,
            "preset": "en",
            "removals": null
        }
    },
    "multiTenancyConfig": {
        "autoTenantActivation": false,
        "autoTenantCreation": false,
        "enabled": false
    },
    "properties": [
        {
            "dataType": [
                "text"
            ],
            "description": "key_id",
            "indexFilterable": true,
            "indexRangeFilters": false,
            "indexSearchable": true,
            "name": "key_id",
            "tokenization": "word"
        }
    ],
    "replicationConfig": {
        "asyncEnabled": false,
        "deletionStrategy": "NoAutomatedResolution",
        "factor": 3
    },
    "shardingConfig": {
        "actualCount": 3,
        "actualVirtualCount": 384,
        "desiredCount": 3,
        "desiredVirtualCount": 384,
        "function": "murmur3",
        "key": "_id",
        "strategy": "hash",
        "virtualPerPhysical": 128
    },
    "vectorIndexConfig": {
        "bq": {
            "enabled": false
        },
        "cleanupIntervalSeconds": 300,
        "distance": "cosine",
        "dynamicEfFactor": 8,
        "dynamicEfMax": 500,
        "dynamicEfMin": 100,
        "ef": 640,
        "efConstruction": 640,
        "filterStrategy": "sweeping",
        "flatSearchCutoff": 40000,
        "maxConnections": 64,
        "multivector": {
            "aggregation": "maxSim",
            "enabled": false
        },
        "pq": {
            "bitCompression": false,
            "centroids": 256,
            "enabled": false,
            "encoder": {
                "distribution": "log-normal",
                "type": "kmeans"
            },
            "segments": 0,
            "trainingLimit": 100000
        },
        "skip": false,
        "sq": {
            "enabled": false,
            "rescoreLimit": 20,
            "trainingLimit": 100000
        },
        "vectorCacheMaxObjects": 1000000000000
    },
    "vectorIndexType": "hnsw",
    "vectorizer": "none"
}

We also measured the number of errors that appear during load, and we have some graphs of it.

hi @d_khlebokazov !!

Let me know if I understood it correctly.

You insert some objects with key_id = 123456.

Then you do a near_vector search, filtering by that key_id.

You are then getting results from that filter with key_id != 123456?

As you are using ASYNC_INDEXING, it may take some time for the object to be fully written and indexed.
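
If async indexing were the cause, the per-shard vector queue should be non-empty during your load test. You can check it via the nodes API, for example (a sketch; attribute names may differ slightly between client versions):

import weaviate

client = weaviate.connect_to_local()  # adjust to your cluster
try:
    nodes = client.cluster.nodes(collection="KeyClass", output="verbose")
    for node in nodes:
        for shard in node.shards:
            # a non-zero queue length means vectors are still waiting to be indexed
            print(node.name, shard.name, shard.vector_indexing_status, shard.vector_queue_length)
finally:
    client.close()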

Can you provide some code where we could reproduce it?

Also, it is best to always use the latest release, so in your example, can you try version 1.29.latest, which is now 1.29.8?

Thanks!

Hello @DudaNogueira !

I inserted some objects (some with key_id = "123456" and some with other values).

Then I do a near_vector search, expecting key_id = "123456" in the top places because we used that same object to extract the vector (without any filtering).

As you can see, in the second (correct) search the top hits all have key_id = "123456".

All inserted objects were found, but when I perform the search with parallel write requests (both insert and delete operations) I get the messed-up results I described.

To be clear, the first result block above shows the search results with parallel requests.

There would be no problem if the vectors for key_id = "123456" and key_id = "567890" simply had high similarity, and that is why key_id = "567890" landed in one of the top places. The problem is that Weaviate seems to calculate the distance for key_id = "123456" but fetches the object for key_id = "567890" from storage (that is why the distances are the same but the objects differ).

Please notice that the first three hits have exactly the same distances in both searches, but in the parallel case the objects attached to them are wrong.
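
One way to confirm this from the client side is to fetch the stored vector of a suspicious hit by its id and recompute the distance ourselves (a sketch with the v4 Python client; the commented call would use the second hit from the first result block above, with query_vec being the vector we searched with):

import numpy as np
import weaviate

def recheck(collection, hit_uuid, query_vec, reported):
    # fetch the stored vector of the hit and recompute the cosine distance
    obj = collection.query.fetch_object_by_id(hit_uuid, include_vector=True)
    if obj is None:
        print(f"{hit_uuid}: object no longer exists")
        return
    stored = np.asarray(obj.vector["default"])  # the unnamed vector is keyed "default"
    recomputed = 1.0 - float(np.dot(query_vec, stored)
                             / (np.linalg.norm(query_vec) * np.linalg.norm(stored)))
    print(f"{hit_uuid}: key_id={obj.properties['key_id']} "
          f"reported={reported:.6f} recomputed={recomputed:.6f}")

client = weaviate.connect_to_local()  # placeholder connection
collection = client.collections.get("KeyClass")
# recheck(collection, "5105ea9a-c9b6-4cc2-a044-e25b604729c7", query_vec, 0.3187061548233032)
client.close()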

We actually use a FastAPI application to communicate with Weaviate (quite simple, it just mirrors the Weaviate interface for internal communication) and test it with Locust. Roughly, each endpoint just forwards the request to Weaviate; see the sketch below.
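
A simplified, hypothetical sketch of what the insert endpoint looks like (illustrative names, not our exact code):

import struct

import orjson
import weaviate
from fastapi import FastAPI, File, Form, UploadFile

app = FastAPI()
client = weaviate.connect_to_local()  # placeholder; the real app connects to the cluster

@app.post("/api/v1/{index}/")
async def create(index: str, json_data: str = Form(...), embedding: UploadFile = File(...)):
    payload = orjson.loads(json_data)
    raw = await embedding.read()
    # the client packs the vector as little-endian float32, so unpack it the same way
    vector = list(struct.unpack(f"<{len(raw) // 4}f", raw))
    collection = client.collections.get(index)
    collection.data.insert(properties={"key_id": payload["key_id"]},
                           uuid=payload["id"], vector=vector)
    return {"id": payload["id"]}

And here is the Locust script we run against it: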

import sys

if sys.platform == 'win32':
    import os

    # enable gevent support in the pydev/PyCharm debugger on Windows
    os.environ['GEVENT_SUPPORT'] = '1'
import uuid
from collections import namedtuple
import pandas as pd
import numpy as np
import struct
import orjson
from locust import FastHttpUser, task, between, run_single_user, events
import locust.stats
import datetime

EXPORT_PROP_ID = 'id'
EXPORT_PROP_KEY_ID = 'key_id'
EXPORT_PROP_VECTOR = 'vector'

SEARCH_API_FILE = "./15.pkl"
SEARCH_API_URL = "http://weaviate-test/api/v1/"
SEARCH_API_INDEX = 'KeyClass'

SEARCH_ERRORS_FILE = "./errors.csv"
"""
Error format
timestamp, KEY_ID, search_vector_id, is_found_top1, error msg
"""

locust.stats.PERCENTILES_TO_REPORT = [0.95, 0.98, 0.99, 0.999, 0.9999, 1.0]
locust.stats.PERCENTILES_TO_STATISTICS = [0.95, 0.99]
locust.stats.PERCENTILES_TO_CHART = [0.95, 0.99]

Row = namedtuple("Row",
                  [EXPORT_PROP_ID, EXPORT_PROP_KEY_ID, EXPORT_PROP_VECTOR])

ERROR_DISTANCE_THRESHOLD = 0.4

with open(SEARCH_ERRORS_FILE, "w+") as f:
    f.write("timestamp, KEY_ID, search_vector_id, is_found_top1, error msg\n")



@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    environment.stats.use_response_times_cache = True


class WebsiteUser(FastHttpUser):
    host = SEARCH_API_URL
    wait_time = between(0, 0)
    network_timeout = 3.0
    connection_timeout = 0.2 * network_timeout

    df = pd.read_pickle(SEARCH_API_FILE)
    df_arr = df.values
    cnt = 0

    def load_row(self):
        # pick a random row from the exported dataframe
        row_arr = self.df_arr[np.random.choice(self.df_arr.shape[0], 1, replace=False)][0]
        row = Row(*row_arr)

        key_id = str(getattr(row, EXPORT_PROP_KEY_ID))

        # pack the embedding as little-endian float32 bytes for the multipart upload
        vector = getattr(row, EXPORT_PROP_VECTOR)
        embedding = struct.pack(f"<{vector.size}f", *vector.flatten())

        # fresh uuid for every loaded row
        id = str(uuid.uuid4())

        return id, key_id, embedding

    @task
    def create_delete(self):
        id, key_id, embedding = self.load_row()
        weaviate_post_data = {
            'json_data': orjson.dumps({
                "key_id": key_id,
                "id": id
            })
        }
        weaviate_post_embedding = {'embedding': embedding}

        create_response = self.client.post(f"/{SEARCH_API_INDEX}/", data=weaviate_post_data,
                                           files=weaviate_post_embedding, headers={'Connection': 'keep-alive'})
        try:
            create_response.raise_for_status()
        except Exception:
            print("create raised with status", create_response.status_code)
            raise Exception(f"create status {create_response.status_code}")
        
        delete_response = self.client.delete(f"/{SEARCH_API_INDEX}/{id}",
                                             name=f"{SEARCH_API_URL}/{SEARCH_API_INDEX}/<id>",
                                             headers={'Connection': 'keep-alive'})

        try:
            delete_response.raise_for_status()
        except Exception:
            print("delete_response raised with status", delete_response.status_code)
            raise Exception(f"delete status {delete_response.status_code}")
    
    @task
    def search_compare(self):
        id, key_id, embedding = self.load_row()
        embedding_guid = str(uuid.uuid4())
        weaviate_search_data = {
            'json': orjson.dumps({"embeddings": [embedding_guid]}),
        }

        weaviate_search_files = {
            embedding_guid: embedding,
        }

        search_response = self.client.post(f"/{SEARCH_API_INDEX}/search",
                                           data=weaviate_search_data, files=weaviate_search_files,
                                           headers={'Connection': 'keep-alive'})
        try:
            search_response.raise_for_status()
        except Exception:
            print("search_response raised with status", search_response.status_code)
            raise Exception(f"search status {search_response.status_code}")
        
        # check search results
        ids = []
        wrong_case = {}
        is_found_top1 = False
        _said_about_is_found_top1 = False
        error_message = ""
        try:
            wrong_key_id_found = False
            last_found_key_id = None
            data = orjson.loads(search_response.content.decode("utf-8"))
            # walk the results in order, recording any hit whose key_id differs from the query's
            for i in range(0, len(data['searches'][0]['similars'])):
                i_key_id = data['searches'][0]['similars'][i]['key_id']
                if i_key_id == key_id:
                    if i == 0:
                        is_found_top1 = True

                    if wrong_key_id_found:
                        error_message += f"Correct key_id ({key_id}) found after incorrect ({last_found_key_id}). "
                        wrong_key_id_found = False
                else:
                    wrong_case[data['searches'][0]['similars'][i]['id']] = {
                        "key_id": i_key_id,
                        "distance": data['searches'][0]['similars'][i]['distance']
                    }
                    wrong_key_id_found = True
                ids.append(data['searches'][0]['similars'][i]['id'])
                last_found_key_id = i_key_id

                if not is_found_top1 and not _said_about_is_found_top1:
                    error_message += "Key ID not found top 1. "
                    _said_about_is_found_top1 = True

        except (IndexError, orjson.JSONDecodeError, KeyError) as e:
            print(str(e))

        weaviate_comparison = {
            'json': orjson.dumps(
                {"data_1": ["3fa85f64-5717-4562-b3fc-2c963f66afa6"], "data_2": ids}),
            '3fa85f64-5717-4562-b3fc-2c963f66afa6': embedding,
        }
        if ids:
            comparison_response = self.client.post(f"/{SEARCH_API_INDEX}/comparison",  # the API recomputes similarity for these pairs
                                                   files=weaviate_comparison, headers={'Connection': 'keep-alive'})
            try:
                comparison_response.raise_for_status()
            except Exception as e:
                print("comparison raised with status", comparison_response.status_code)
                raise Exception(f"comparison status {comparison_response.status_code}")
            
            data = orjson.loads(comparison_response.content.decode("utf-8"))
            _cidx = 1
            for comparison in data.get("comparisons", []):
                if comparison.get("data_2") in wrong_case.keys():
                    w_case = wrong_case[comparison.get("data_2")]

                    if ERROR_DISTANCE_THRESHOLD > w_case["distance"] or (1 - comparison["distance"]) < ERROR_DISTANCE_THRESHOLD: # (1 - d) because comparison returns similarity and not distance
                        error_message += f'Distance case {_cidx}: Needs {key_id} but Found {w_case["key_id"]}({comparison.get("data_2")}) with distance {w_case["distance"]} and internal as {1 - comparison["distance"]}. '
                        _cidx += 1

                        if ERROR_DISTANCE_THRESHOLD/2 <= abs(1 - comparison["distance"] - w_case["distance"]):
                            error_message += f"Weaviate cosine and API distances big missmatch (weaviate >: {1 - comparison['distance'] > w_case['distance']})! "

            if error_message != "":
                with open(SEARCH_ERRORS_FILE, "a") as f:
                    f.write(f"{datetime.datetime.now()},{key_id},{id},{is_found_top1},{error_message}\n")
                raise Exception(error_message)
        else:
            with open(SEARCH_ERRORS_FILE, "a") as f:
                f.write(f"{datetime.datetime.now()},{key_id},{id},{is_found_top1},{error_message}\n")
            raise Exception(f"Nothing found for key_id {key_id}, id: {id}")


if __name__ == "__main__":
    run_single_user(WebsiteUser)

I think you could use this script to perform the testing (accessing Weaviate directly, without the FastAPI layer, for example).
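
A minimal direct-to-Weaviate equivalent of the create/delete task could look like this (a sketch; the connection and the vector are placeholders):

import uuid
import weaviate

client = weaviate.connect_to_local()  # placeholder; our cluster runs in k8s
try:
    collection = client.collections.get("KeyClass")
    obj_id = uuid.uuid4()
    # mirror of the create_delete task: insert an object with its vector, then delete it
    collection.data.insert(
        properties={"key_id": "123456"},
        uuid=obj_id,
        vector=[0.1] * 512,  # stand-in for a real model embedding
    )
    collection.data.delete_by_id(obj_id)
finally:
    client.close()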

Yeah, I know, but we need to perform more testing before applying the updated version.

Please let me know if you can or cannot reproduce this error!
Thanks!