Description
Whenever I instantiate a client connection to a weaviate database within a gRPC server (not even using the connection), I get such errors when making parallel calls to the gRPC route:
Exception in callback PollerCompletionQueue._handle_events(<_UnixSelecto...e debug=False>)()
handle: <Handle PollerCompletionQueue._handle_events(<_UnixSelecto...e debug=False>)()>
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi", line 147, in grpc._cython.cygrpc.PollerCompletionQueue._handle_events
BlockingIOError: [Errno 35] Resource temporarily unavailable
Server Setup Information
- Weaviate Server Version: 1.25.13
- Deployment Method: docker
- Multi Node? Number of Running Nodes: Single node
- Client Language and Version: Python client 4.10.4
- Multitenancy?: No
Any additional Information
Here is a minimal code that reproduces the issue:
test.proto
syntax = "proto3";
import "google/protobuf/empty.proto";
service TestService{
rpc TestRoute(google.protobuf.Empty) returns (TestMessage) {};
}
message TestMessage {
string test = 1;
}
test_server.py
import asyncio
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
import weaviate
class DocumentServer(pb2_grpc.TestServiceServicer):
def __init__(self):
self.client = weaviate.connect_to_custom(
http_host="localhost",
http_port=8080,
http_secure=False,
grpc_host="localhost",
grpc_port=50051,
grpc_secure=False,
)
def TestRoute(self, request, context):
return pb2.TestMessage(test="Hello, World!")
async def serve(self, port):
server = grpc.aio.server()
pb2_grpc.add_TestServiceServicer_to_server(self, server)
server.add_insecure_port(f"[::]:{port}")
await server.start()
await server.wait_for_termination()
if __name__ == "__main__":
server = DocumentServer()
asyncio.run(server.serve(50052))
test_client.py
from threading import Thread
import grpc
import test_pb2_grpc as pb2_grpc
from google.protobuf import empty_pb2
class TestClient(object):
def __init__(self, host, server_port):
self.host = host
self.server_port = server_port
def test_route(
self,
):
with grpc.insecure_channel(
"{}:{}".format(self.host, self.server_port)
) as channel:
stub = pb2_grpc.TestServiceStub(channel)
return stub.TestRoute(empty_pb2.Empty())
def run_test():
client = TestClient("localhost", 50052)
print(client.test_route())
if __name__ == "__main__":
threads = []
for _ in range(10):
threads.append(Thread(target=run_test))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
Commenting out the line self.client = weaviate.connect_to_custom(...
in the DocumentServer, the error does not appear anymore