"[Errno 35] Resource temporarily unavailable" when client instantiated in a gRPC server

Description

Whenever I instantiate a Weaviate client inside a gRPC server (without even using the connection), I get errors like the following when making parallel calls to the gRPC route:

Exception in callback PollerCompletionQueue._handle_events(<_UnixSelecto...e debug=False>)()
handle: <Handle PollerCompletionQueue._handle_events(<_UnixSelecto...e debug=False>)()>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi", line 147, in grpc._cython.cygrpc.PollerCompletionQueue._handle_events
BlockingIOError: [Errno 35] Resource temporarily unavailable

Server Setup Information

  • Weaviate Server Version: 1.25.13
  • Deployment Method: docker
  • Multi Node? Number of Running Nodes: Single node
  • Client Language and Version: Python client 4.10.4
  • Multitenancy?: No

Any additional Information

Here is a minimal example that reproduces the issue:

test.proto

syntax = "proto3";
import "google/protobuf/empty.proto";

service TestService{
 rpc TestRoute(google.protobuf.Empty) returns (TestMessage) {};
}

message TestMessage {
 string test = 1;
}

test_server.py

import asyncio

import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
import weaviate


class DocumentServer(pb2_grpc.TestServiceServicer):

    def __init__(self):
        self.client = weaviate.connect_to_custom(
            http_host="localhost",
            http_port=8080,
            http_secure=False,
            grpc_host="localhost",
            grpc_port=50051,
            grpc_secure=False,
        )

    def TestRoute(self, request, context):
        return pb2.TestMessage(test="Hello, World!")

    async def serve(self, port):
        server = grpc.aio.server()
        pb2_grpc.add_TestServiceServicer_to_server(self, server)
        server.add_insecure_port(f"[::]:{port}")
        await server.start()
        await server.wait_for_termination()


if __name__ == "__main__":
    server = DocumentServer()
    asyncio.run(server.serve(50052))

test_client.py

from threading import Thread

import grpc
import test_pb2_grpc as pb2_grpc
from google.protobuf import empty_pb2


class TestClient(object):
    def __init__(self, host, server_port):
        self.host = host
        self.server_port = server_port

    def test_route(
        self,
    ):
        with grpc.insecure_channel(
            "{}:{}".format(self.host, self.server_port)
        ) as channel:
            stub = pb2_grpc.TestServiceStub(channel)
            return stub.TestRoute(empty_pb2.Empty())


def run_test():
    client = TestClient("localhost", 50052)
    print(client.test_route())


if __name__ == "__main__":
    threads = []
    for _ in range(10):
        threads.append(Thread(target=run_test))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

If I comment out the line self.client = weaviate.connect_to_custom(... in the DocumentServer, the error no longer appears.

hi @Nicolas_Lurkin !!

Welcome to our community :hugs:

Can you check if this also happens on latest (1.28.3)?

I was not able to run it because of the missing import test_pb2_grpc as pb2_grpc

Is that a lib?

Thanks!

Thanks for your help.

Those files are generated automatically by the gRPC tools, with the command
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./test.proto

I can try a newer version of the Weaviate database, but I suspect this is linked to the Python client. If I revert to an older client version (4.6.5 for instance), the problem is not present.

I confirm that switching to Weaviate database version 1.28.3 changes nothing.

Hi @Nicolas_Lurkin, thanks for raising this one!

This specific error is emitted by the grpc library under the hood, but it is benign and doesn’t signify a serious problem. Here is the open issue (nearly four years old at this point :sob:)
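For anyone wondering what the message itself means: a BlockingIOError with EAGAIN (errno 35 on macOS, 11 on Linux) is what Python raises when a non-blocking read finds nothing to read. My understanding, which I have not verified against the grpc source, is that the completion queue poller hits the same situation on its wake-up socket. The sketch below only reproduces the generic condition with the standard library; read_nonblocking is a hypothetical helper, not anything from grpc or the Weaviate client.

```python
import errno
import socket


def read_nonblocking(sock):
    """Try to read one byte; return None when nothing is available yet."""
    try:
        return sock.recv(1)
    except BlockingIOError as exc:
        # Same error class as in the traceback: EAGAIN / EWOULDBLOCK
        assert exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK)
        return None


# A connected pair of Unix sockets; the reader is switched to non-blocking mode
reader, writer = socket.socketpair()
reader.setblocking(False)

first = read_nonblocking(reader)   # nothing has been written yet
writer.send(b"x")
second = read_nonblocking(reader)  # one byte is waiting
third = read_nonblocking(reader)   # the byte was already consumed

reader.close()
writer.close()
```

Nothing is lost in the empty-read case: the reader simply has no data at that moment, which is why the error can be treated as noise.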

The reason you see this with our client is that the sync client creates a sidecar event loop thread in which it runs its underlying async methods. I see in your MRE that you run your gRPC server with asyncio. To avoid these errors, I’d recommend using the async client with it. E.g.:

class DocumentServer(pb2_grpc.TestServiceServicer):

    def __init__(self):
        self.client = weaviate.use_async_with_custom(
            http_host="localhost",
            http_port=8080,
            http_secure=False,
            grpc_host="localhost",
            grpc_port=50051,
            grpc_secure=False,
        )

    def TestRoute(self, request, context):
        return pb2.TestMessage(test="Hello, World!")

    async def serve(self, port):
        # Must connect explicitly: use_async_with_custom does no I/O itself,
        # and connect() is a coroutine that has to be awaited
        await self.client.connect()

        server = grpc.aio.server()
        pb2_grpc.add_TestServiceServicer_to_server(self, server)
        server.add_insecure_port(f"[::]:{port}")
        await server.start()
        await server.wait_for_termination()
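To make the sidecar event loop idea a bit more concrete, here is a minimal sketch of the general pattern: a second event loop runs in a background thread, and synchronous callers submit coroutines to it. This is not the client's actual implementation; SidecarLoop and fetch_answer are hypothetical names used only for illustration.

```python
import asyncio
import threading


class SidecarLoop:
    """Hypothetical helper: owns an event loop running in a background thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro):
        # Schedule the coroutine on the background loop and block for its result
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self):
        # Stop the loop from its own thread, then wait for the thread to exit
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fetch_answer():
    # Stand-in for an async network call
    await asyncio.sleep(0.01)
    return 42


sidecar = SidecarLoop()
result = sidecar.run(fetch_answer())
sidecar.close()
```

With a setup like this, a process that also runs asyncio.run(...) for its server ends up with two event loops alive at once, which is the situation the async client avoids.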

I hope that works for you, let me know if it doesn’t!
