Source code for redup_servicekit.grpc.decorators

"""gRPC server decorators for metrics and health integration.

The :mod:`redup_servicekit.grpc.decorators` module contains:

- :func:`redup_servicekit.grpc.decorators.grpc_init_wrapper` — wrap servicer init; register per-method stats
- :func:`redup_servicekit.grpc.decorators.aio_grpc_method_wrapper` — wrap async handlers; record metrics and health
"""
import logging
import time
import traceback
import uuid
from functools import wraps
from pickle import dumps

from ..metrics import PROMETHEUS_METRICS_REGISTRY
from ..monitoring import ErrorParser, MonitorServer, StatusParser


[docs] def grpc_init_wrapper(func): r"""Wrap the function that initializes and runs the gRPC server. Calls :meth:`ErrorParser.init` and pre-registers per-method stats for all public methods of the servicer instance (args[0]). Use on your init/run function that creates the server and adds the servicer. :param func: Callable that takes servicer (or server) and runs the server. """ @wraps(func) def init_and_run(*args, **kwargs): ErrorParser.init() server = MonitorServer.get_instance() service_instance = args[0] if server: for name in dir(service_instance): if name.startswith("_") or not callable(getattr(service_instance, name)): continue server.set_stat_sync("request___method__%s" % name, 0) for outcome_status in StatusParser._types: server.set_stat_sync( "processed___method__%s___status__%s" % (name, outcome_status), 0 ) for error_kind in ErrorParser._types: server.set_stat_sync( "errors___method__%s___type__%s" % (name, error_kind), 0 ) server.set_stat_sync("request_size___method__%s" % name, 0) server.set_stat_sync("response_size___method__%s" % name, 0) func(*args, **kwargs) return init_and_run
[docs] def aio_grpc_method_wrapper(func): r"""Wrap an async gRPC handler to record metrics and health. For each call: increments request counter, records task start/end, measures request/response size and duration, records success/failure and error type, updates health via :meth:`ErrorParser.set_status`. Injects ``kwargs["info"]`` (context, time_remaining) and ``kwargs["metrics"]`` (dict for custom metrics to merge into stats). :param func: Async handler with signature (self, request, context, ``**kwargs``). """ @wraps(func) async def run_with_metrics(*args, **kwargs): rpc_method_name = getattr(func, "__name__", "UnknownObject") server = MonitorServer.get_instance() request_trace_id = str(uuid.uuid4()) request_start_timestamp_seconds = time.time() if server: await server.inc_stats("request___method__%s" % rpc_method_name) await server.add_key_value("tasks", (request_trace_id, request_start_timestamp_seconds)) kwargs["info"] = {"context": args[2]} request_counters = {"request_size___method__%s" % rpc_method_name: len(dumps(args[1]))} grpc_servicer_context = args[2] if grpc_servicer_context is not None and "FakeContext" not in str(type(grpc_servicer_context)) and getattr(grpc_servicer_context, "time_remaining", lambda: None)() is not None: deadline_remaining_seconds = grpc_servicer_context.time_remaining() request_counters["time_remaining_time___method__%s" % rpc_method_name] = deadline_remaining_seconds kwargs["info"]["time_remaining"] = lambda: max(0, deadline_remaining_seconds - (time.time() - request_start_timestamp_seconds)) if time.time() - request_start_timestamp_seconds < deadline_remaining_seconds else 0 else: kwargs["info"]["time_remaining"] = lambda: None custom_metrics_from_handler = {} kwargs["metrics"] = custom_metrics_from_handler request_failed = False error_status_for_health = None try: with PROMETHEUS_METRICS_REGISTRY["stats_tasks_time_spent_quantile"].labels(*MonitorServer._get_labels()).time(): async with MonitorServer.async_service_threads[rpc_method_name]: handler_result = await func(*args, **kwargs) request_counters["response_size___method__%s" % rpc_method_name] = len(dumps(handler_result)) return handler_result except Exception as caught_exception: logging.error(traceback.format_exc()) request_failed = True error_status_for_health = ErrorParser.parse(caught_exception) if server: await server.inc_stats("errors___method__%s___type__%s" % (rpc_method_name, error_status_for_health)) raise finally: if server: await server.del_key_value("tasks", request_trace_id) for metric_name, metric_value in custom_metrics_from_handler.items(): request_counters["%s___method__%s" % (metric_name, rpc_method_name)] = metric_value request_counters["time full___method__%s" % rpc_method_name] = time.time() - request_start_timestamp_seconds if grpc_servicer_context is not None and "FakeContext" not in str(type(grpc_servicer_context)) and getattr(grpc_servicer_context, "time_remaining", lambda: None)() is not None: request_counters["time_remaining_after_time___method__%s" % rpc_method_name] = grpc_servicer_context.time_remaining() if server: await server.inc_stats("processed___method__%s___status__%s" % (rpc_method_name, StatusParser.parse(request_failed))) await server.append_stats("stats", request_counters, 100) await ErrorParser.set_status(error_status_for_health) return run_with_metrics