How to run embedded Weaviate with FastAPI + Hypercorn?

I have a FastAPI app running a Weaviate client (weaviate-client==4.7.0). The setup looks like this:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from api.weaviate_client import weaviate_client

@asynccontextmanager
async def lifespan(app: FastAPI):
    await weaviate_client.connect()
    yield
    await weaviate_client.close()


app = FastAPI(lifespan=lifespan)

Where weaviate_client is instantiated depending on the environment:

import os

import weaviate
from weaviate.auth import AuthApiKey

from api import envvars  # path assumed; your env settings module


def get_weaviate_client():
    print("getting client")
    if os.environ.get("TEST"):
        return weaviate.use_async_with_embedded(
            version="1.26.1",
            headers={"X-OpenAI-Api-Key": envvars.OPENAI_API_KEY},
        )
    else:
        return weaviate.use_async_with_weaviate_cloud(
            cluster_url=envvars.WEAVIATE_HOST,
            auth_credentials=AuthApiKey(envvars.WEAVIATE_API_KEY),
            headers={"X-OpenAI-Api-Key": envvars.OPENAI_API_KEY},
        )


weaviate_client = get_weaviate_client()

And I run my FastAPI server using hypercorn like this: hypercorn api.main:app --reload.

The problem is that my app works correctly when the client comes from use_async_with_weaviate_cloud, but it fails to bootstrap when using use_async_with_embedded, complaining that the ports are already in use:

weaviate.exceptions.WeaviateStartUpError: Embedded DB did not start because processes are already listening on ports http:8079 and grpc:50050use weaviate.connect_to_local(port=8079, grpc_port=50050) to connect to the existing instance

My suspicion is that get_weaviate_client is getting called twice (I noticed because the print is logged twice, though I don’t know why; maybe it’s related to hypercorn?), so the embedded server is being started twice and the second attempt fails.
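
Looking at the traceback further down, the second call happens inside SpawnProcess-1, i.e. the module is imported once in the main process and again in the spawned worker, and the module-level weaviate_client = get_weaviate_client() runs each time. Assuming that diagnosis is right, a minimal sketch of one workaround (untested, and it requires dropping the module-level call) would be to create the client inside the lifespan instead of at import time:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from api.weaviate_client import get_weaviate_client

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Build the client here so the embedded DB only starts in the process
    # that actually serves requests, not in every process that imports
    # this module.
    client = get_weaviate_client()
    app.state.weaviate_client = client
    await client.connect()
    yield
    await client.close()

app = FastAPI(lifespan=lifespan)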

My whole goal is to mock my Weaviate cloud cluster when testing my FastAPI app. I was loosely following this guide, but now I’m not sure how to proceed, given that I might not be able to mock it.

Is there any recommended approach?

The full logs are here:

Using embedded weaviate client. Only for testing!
{"action":"startup","default_vectorizer_module":"none","level":"info","msg":"the default vectorizer modules is set to \"none\", as a result all new schema classes without an explicit vectorizer setting, will use this vectorizer","time":"2024-08-27T18:38:17-06:00"}
{"action":"startup","auto_schema_enabled":true,"level":"info","msg":"auto schema enabled setting is set to \"true\"","time":"2024-08-27T18:38:17-06:00"}
{"level":"info","msg":"No resource limits set, weaviate will use all available memory and CPU. To limit resources, set LIMIT_RESOURCES=true","time":"2024-08-27T18:38:17-06:00"}
{"level":"info","msg":"module offload-s3 is enabled","time":"2024-08-27T18:38:17-06:00"}
{"level":"warning","msg":"Multiple vector spaces are present, GraphQL Explore and REST API list objects endpoint module include params has been disabled as a result.","time":"2024-08-27T18:38:17-06:00"}
{"level":"info","msg":"open cluster service","servers":{"Embedded_at_8079":50766},"time":"2024-08-27T18:38:17-06:00"}
{"address":"192.168.0.39:50767","level":"info","msg":"starting cloud rpc server ...","time":"2024-08-27T18:38:17-06:00"}
{"level":"info","msg":"starting raft sub-system ...","time":"2024-08-27T18:38:17-06:00"}
{"address":"192.168.0.39:50766","level":"info","msg":"tcp transport","tcpMaxPool":3,"tcpTimeout":10000000000,"time":"2024-08-27T18:38:17-06:00"}
{"level":"info","msg":"loading local db","time":"2024-08-27T18:38:17-06:00"}
{"level":"info","msg":"local DB successfully loaded","time":"2024-08-27T18:38:17-06:00"}
{"level":"info","msg":"schema manager loaded","n":0,"time":"2024-08-27T18:38:17-06:00"}
{"level":"info","metadata_only_voters":false,"msg":"construct a new raft node","name":"Embedded_at_8079","time":"2024-08-27T18:38:17-06:00"}
{"action":"raft","index":1,"level":"info","msg":"raft initial configuration","servers":"[[{Suffrage:Voter ID:Embedded_at_8087 Address:192.168.0.39:52912}]]","time":"2024-08-27T18:38:17-06:00"}
{"last_snapshot_index":0,"last_store_applied_index":0,"last_store_log_applied_index":0,"level":"info","msg":"raft node constructed","raft_applied_index":0,"raft_last_index":4,"time":"2024-08-27T18:38:17-06:00"}
{"action":"raft","follower":{},"leader-address":"","leader-id":"","level":"info","msg":"raft entering follower state","time":"2024-08-27T18:38:17-06:00"}
{"level":"warning","msg":"raft heartbeat timeout reached, not part of a stable configuration or a non-voter, not triggering a leader election","time":"2024-08-27T18:38:19-06:00"}
{"docker_image_tag":"unknown","level":"info","msg":"configured versions","server_version":"1.26.1","time":"2024-08-27T18:38:19-06:00"}
{"action":"grpc_startup","level":"info","msg":"grpc server listening at [::]:50050","time":"2024-08-27T18:38:19-06:00"}
{"action":"restapi_management","docker_image_tag":"unknown","level":"info","msg":"Serving weaviate at http://127.0.0.1:8079","time":"2024-08-27T18:38:19-06:00"}
{"action":"bootstrap","error":"could not join a cluster from [192.168.0.39:50766]","level":"warning","msg":"failed to join cluster, will notify next if voter","servers":["192.168.0.39:50766"],"time":"2024-08-27T18:38:19-06:00","voter":true}
{"action":"bootstrap","candidates":[{"Suffrage":0,"ID":"Embedded_at_8079","Address":"192.168.0.39:50766"}],"level":"info","msg":"starting cluster bootstrapping","time":"2024-08-27T18:38:19-06:00"}
{"action":"bootstrap","error":"bootstrap only works on new clusters","level":"error","msg":"could not bootstrapping cluster","time":"2024-08-27T18:38:19-06:00"}
{"action":"bootstrap","level":"info","msg":"notified peers this node is ready to join as voter","servers":["192.168.0.39:50766"],"time":"2024-08-27T18:38:19-06:00"}
{"action":"telemetry_push","level":"info","msg":"telemetry started","payload":"\u0026{MachineID:HIDDEN Type:INIT Version:1.26.1 NumObjects:0 OS:darwin Arch:amd64 UsedModules:[]}","time":"2024-08-27T18:38:19-06:00"}
{"action":"bootstrap","error":"could not join a cluster from [192.168.0.39:50766]","level":"warning","msg":"failed to join cluster, will notify next if voter","servers":["192.168.0.39:50766"],"time":"2024-08-27T18:38:21-06:00","voter":true}
{"action":"bootstrap","level":"info","msg":"notified peers this node is ready to join as voter","servers":["192.168.0.39:50766"],"time":"2024-08-27T18:38:21-06:00"}
Using embedded weaviate client. Only for testing!
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/site-packages/hypercorn/asyncio/run.py", line 179, in asyncio_worker
    app = load_application(config.application_path, config.wsgi_max_body_size)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/site-packages/hypercorn/utils.py", line 115, in load_application
    module = import_module(import_name)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1147, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 690, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/Users/user/dev/project-api-py/api/main.py", line 4, in <module>
    from api.weaviate_client import weaviate_client
  File "/Users/user/dev/project-api-py/api/weaviate_client.py", line 22, in <module>
    weaviate_client = get_weaviate_client()
                      ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/dev/project-api-py/api/weaviate_client.py", line 10, in get_weaviate_client
    return weaviate.use_async_with_embedded(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/site-packages/weaviate/connect/helpers.py", line 637, in use_async_with_embedded
    client = WeaviateAsyncClient(
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/site-packages/weaviate/client.py", line 150, in __init__
    super().__init__(
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/site-packages/weaviate/client_base.py", line 68, in __init__
    connection_params, embedded_db = self.__parse_connection_params_and_embedded_db(
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/site-packages/weaviate/client_base.py", line 107, in __parse_connection_params_and_embedded_db
    embedded_db.start()
  File "/Users/user/miniforge3/envs/projectpy/lib/python3.11/site-packages/weaviate/embedded.py", line 305, in start
    raise WeaviateStartUpError(
weaviate.exceptions.WeaviateStartUpError: Embedded DB did not start because processes are already listening on ports http:8079 and grpc:50050use weaviate.connect_to_local(port=8079, grpc_port=50050) to connect to the existing instance

I was able to solve it.

In case someone finds it useful, my “solution” was to create the embedded DB in a pytest fixture along with the app, like this:

@pytest_asyncio.fixture(scope="session")
async def app_instance():
    print("Creating weaviate embedded instance to connect to")
    weaviate.use_async_with_embedded(
        version="1.26.1",
        headers={"X-OpenAI-Api-Key": envvars.OPENAI_API_KEY},
        port=8079,
        grpc_port=50050,
    )
    print("Creating app instance")
    async with LifespanManager(app) as manager:
        print("We're in!")
        yield manager.app


@pytest_asyncio.fixture(scope="session")
async def client(app_instance):
    async with AsyncClient(app=app_instance, base_url="http://app.io") as client:
        print("Client is ready")
        yield client

And I replaced my weaviate_client with just a connection to that instance, like this:

def get_weaviate_client():
    return weaviate.use_async_with_local(port=8079, grpc_port=50050)
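
If you still want the cloud branch outside of tests, here is a hedged sketch of how the two can coexist (same assumed envvars names as above):

import os

import weaviate
from weaviate.auth import AuthApiKey

from api import envvars  # path assumed; your env settings module


def get_weaviate_client():
    if os.environ.get("TEST"):
        # Tests: connect to the embedded instance the fixture started
        return weaviate.use_async_with_local(port=8079, grpc_port=50050)
    return weaviate.use_async_with_weaviate_cloud(
        cluster_url=envvars.WEAVIATE_HOST,
        auth_credentials=AuthApiKey(envvars.WEAVIATE_API_KEY),
        headers={"X-OpenAI-Api-Key": envvars.OPENAI_API_KEY},
    )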

Note that my rationale for using scope="session" was that I only want one app and one embedded DB per pytest run, but that forces me to run all my tests in the same event loop to avoid errors like "Task attached to a different loop":

import pytest


@pytest.mark.asyncio(loop_scope="session")
async def test_search_endpoint(client):
    pass

Not sure if this is optimal, though. Also note that the event-loop caveat applies not only to embedded instances but to any session-scoped async fixture.