gRPC utilities

gRPC utilities: async client and server decorators for metrics and health.

The redup_servicekit.grpc package contains:

  • redup_servicekit.grpc.client.BasicAsyncClient — async gRPC client

  • redup_servicekit.grpc.decorators.grpc_init_wrapper() — init wrapper for servicer

  • redup_servicekit.grpc.decorators.aio_grpc_method_wrapper() — method wrapper for metrics

class BasicAsyncClient(host, ServiceStub, max_message_length=104857600, request_compression_algo='gzip', response_compression_algo='gzip')[source]

Bases: object

Async gRPC client with on-demand channels, TLS and compression.

Parameters:
  • host (str) – Server address with protocol: grpc://host:port (non-TLS) or https://host:port (TLS). Do not omit the scheme.

  • ServiceStub – Generated gRPC stub class (e.g. MyServiceStub).

  • max_message_length (int) – Max send/receive message length in bytes. Default 100 MiB.

  • request_compression_algo (str) – gzip, deflate, or no_compression.

  • response_compression_algo (str) – Same options; sent as metadata.

Example:

>>> from redup_servicekit.grpc import BasicAsyncClient
>>> client = BasicAsyncClient("grpc://localhost:50051", MyServiceStub, request_compression_algo="gzip")
>>> response = await client.send(MyRequest(field="value"), Method="MyRpc", timeout=30)
__init__(host, ServiceStub, max_message_length=104857600, request_compression_algo='gzip', response_compression_algo='gzip')[source]
async send(request, Method, metadata=(), max_message_length=104857600, timeout=None, stream=False)[source]

Send a request and return a single response (unary) or async stream.

Parameters:
  • request – The request message instance.

  • Method (str) – Method name string (e.g. "MyRpc").

  • metadata – Optional tuple of (key, value) metadata.

  • max_message_length – Override max message length for this call.

  • timeout – Optional timeout in seconds.

  • stream (bool) – If True, return an async generator of response messages.

Returns:

Single response message (unary) or async generator (stream).

aio_grpc_method_wrapper(func)[source]

Wrap an async gRPC handler to record metrics and health.

For each call: increments request counter, records task start/end, measures request/response size and duration, records success/failure and error type, updates health via ErrorParser.set_status(). Injects kwargs["info"] (context, time_remaining) and kwargs["metrics"] (dict for custom metrics to merge into stats).

Parameters:

func – Async handler with signature (self, request, context, **kwargs).

grpc_init_wrapper(func)[source]

Wrap the function that initializes and runs the gRPC server.

Calls ErrorParser.init() and pre-registers per-method stats for all public methods of the servicer instance (args[0]). Use on your init/run function that creates the server and adds the servicer.

Parameters:

func – Callable that takes servicer (or server) and runs the server.