gRPC utilities
gRPC utilities: async client and server decorators for metrics and health.
The redup_servicekit.grpc package contains:
redup_servicekit.grpc.client.BasicAsyncClient— async gRPC clientredup_servicekit.grpc.decorators.grpc_init_wrapper()— init wrapper for servicerredup_servicekit.grpc.decorators.aio_grpc_method_wrapper()— method wrapper for metrics
- class BasicAsyncClient(host, ServiceStub, max_message_length=104857600, request_compression_algo='gzip', response_compression_algo='gzip')[source]
Bases:
objectAsync gRPC client with on-demand channels, TLS and compression.
- Parameters:
host (str) – Server address with protocol:
grpc://host:port(non-TLS) orhttps://host:port(TLS). Do not omit the scheme.ServiceStub – Generated gRPC stub class (e.g.
MyServiceStub).max_message_length (int) – Max send/receive message length in bytes. Default 100 MiB.
request_compression_algo (str) –
gzip,deflate, orno_compression.response_compression_algo (str) – Same options; sent as metadata.
Example:
>>> from redup_servicekit.grpc import BasicAsyncClient >>> client = BasicAsyncClient("grpc://localhost:50051", MyServiceStub, request_compression_algo="gzip") >>> response = await client.send(MyRequest(field="value"), Method="MyRpc", timeout=30)
- __init__(host, ServiceStub, max_message_length=104857600, request_compression_algo='gzip', response_compression_algo='gzip')[source]
- async send(request, Method, metadata=(), max_message_length=104857600, timeout=None, stream=False)[source]
Send a request and return a single response (unary) or async stream.
- Parameters:
request – The request message instance.
Method (str) – Method name string (e.g.
"MyRpc").metadata – Optional tuple of (key, value) metadata.
max_message_length – Override max message length for this call.
timeout – Optional timeout in seconds.
stream (bool) – If True, return an async generator of response messages.
- Returns:
Single response message (unary) or async generator (stream).
- aio_grpc_method_wrapper(func)[source]
Wrap an async gRPC handler to record metrics and health.
For each call: increments request counter, records task start/end, measures request/response size and duration, records success/failure and error type, updates health via
ErrorParser.set_status(). Injectskwargs["info"](context, time_remaining) andkwargs["metrics"](dict for custom metrics to merge into stats).- Parameters:
func – Async handler with signature (self, request, context,
**kwargs).
- grpc_init_wrapper(func)[source]
Wrap the function that initializes and runs the gRPC server.
Calls
ErrorParser.init()and pre-registers per-method stats for all public methods of the servicer instance (args[0]). Use on your init/run function that creates the server and adds the servicer.- Parameters:
func – Callable that takes servicer (or server) and runs the server.