Monitoring

Prometheus metrics collection and HTTP server for /metrics.

The redup_servicekit.monitoring module contains:

class TaskStatus(*values)[source]

Bases: Enum

Status of a task for health reporting.

Used to report the outcome of a task. Members: OK, FATAL, CONNECTION_ERROR, OTHER_EXCEPTION, INTERNAL, UNKNOWN. The configuration maps each error type to one of these statuses and to an unhealthy threshold.

OK = 1
FATAL = 2
CONNECTION_ERROR = 3
OTHER_EXCEPTION = -1
INTERNAL = 4
UNKNOWN = 5
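
The mapping from error types to statuses can be sketched as follows. This is a minimal illustration, not the library's configuration format: the enum values are copied from the listing above, while ERROR_TYPE_TO_STATUS, UNHEALTHY_THRESHOLD, status_for and is_unhealthy are hypothetical names, and the threshold semantics (a count of non-OK outcomes) is an assumption.

```python
from enum import Enum


class TaskStatus(Enum):
    # Values copied from the reference listing above.
    OK = 1
    FATAL = 2
    CONNECTION_ERROR = 3
    OTHER_EXCEPTION = -1
    INTERNAL = 4
    UNKNOWN = 5


# Hypothetical config: error type string -> status to report.
ERROR_TYPE_TO_STATUS = {
    "ConnectionError": TaskStatus.CONNECTION_ERROR,
    "MemoryError": TaskStatus.FATAL,
}

# Hypothetical threshold: this many non-OK outcomes means "unhealthy".
UNHEALTHY_THRESHOLD = 3


def status_for(error_type):
    """Return the status for an error type string; None means success."""
    if error_type is None:
        return TaskStatus.OK
    # Unmapped error types fall back to OTHER_EXCEPTION (an assumption).
    return ERROR_TYPE_TO_STATUS.get(error_type, TaskStatus.OTHER_EXCEPTION)


def is_unhealthy(recent_statuses):
    """True once the number of non-OK statuses reaches the threshold."""
    failures = sum(1 for s in recent_statuses if s is not TaskStatus.OK)
    return failures >= UNHEALTHY_THRESHOLD
```
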
class ErrorParser[source]

Bases: object

Maps exception tracebacks to error type strings and sets task status for health.

static init()[source]
static parse(error)[source]
async static set_status(error_type=None)[source]
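
One way to map a traceback to an error type string is to read the exception class name from its last line. The sketch below assumes that approach. parse_error_type and the "UnknownError" fallback are hypothetical and may differ from what ErrorParser.parse actually does.

```python
import traceback


def parse_error_type(error):
    """Return the exception class name for an exception or a traceback string."""
    if isinstance(error, BaseException):
        return type(error).__name__
    # A formatted traceback ends with a line like "ExcType: message".
    lines = [line for line in str(error).strip().splitlines() if line.strip()]
    if not lines:
        return "UnknownError"  # hypothetical fallback label
    name = lines[-1].strip().split(":", 1)[0]
    # Drop a module prefix such as "json.decoder." if present.
    return name.rsplit(".", 1)[-1]


def capture_traceback():
    """Produce a real traceback string for demonstration."""
    try:
        raise ConnectionError("boom")
    except ConnectionError:
        return traceback.format_exc()
```

Calling parse_error_type(capture_traceback()) yields the class name of the raised exception, which could then be looked up in the error configuration.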
class StatusParser[source]

Bases: object

Returns failure or success label for processed-request stats.

static parse(error=False)[source]
class MonitorStorage[source]

Bases: object

Async storage for monitoring state.

Holds stats and task status. The Prometheus registry is updated only when refresh_registry_for_metrics() is called (e.g. on each /metrics request). Methods: inc_stats, set_stats, set_task_status, append_stats, add_key_value, del_key_value, get_stats, get_statuses.

__init__()[source]
get_time()[source]
refresh_registry_for_metrics()[source]

Update PROMETHEUS_METRICS_REGISTRY from the current stats. MetricServer calls this on each /metrics request (e.g. every 30 s).

async inc_stats(stat_key, count=1)[source]
async set_stats(stat_key, value)[source]
async set_task_status(status, task_type='default')[source]
async append_stats(stat_key, stat_value, max_count=-1)[source]
async add_key_value(section_key, key_value_tuple)[source]
async del_key_value(section_key, key_to_remove)[source]
async get_stats()[source]
async get_statuses()[source]
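
The async accessors above suggest state guarded for concurrent coroutines. The following is a minimal sketch of that idea using an asyncio.Lock. SketchStorage is an illustrative stand-in, not MonitorStorage itself, and treating max_count=-1 as "no limit" is an assumption.

```python
import asyncio


class SketchStorage:
    """Illustrative stand-in for MonitorStorage; not the library's code."""

    def __init__(self):
        self._stats = {}
        self._lock = asyncio.Lock()

    async def inc_stats(self, stat_key, count=1):
        async with self._lock:
            self._stats[stat_key] = self._stats.get(stat_key, 0) + count

    async def set_stats(self, stat_key, value):
        async with self._lock:
            self._stats[stat_key] = value

    async def append_stats(self, stat_key, stat_value, max_count=-1):
        # Assumption: max_count=-1 keeps every value; a positive value
        # keeps only the most recent max_count entries.
        async with self._lock:
            values = self._stats.setdefault(stat_key, [])
            values.append(stat_value)
            if max_count > 0:
                del values[:-max_count]

    async def get_stats(self):
        async with self._lock:
            return dict(self._stats)  # shallow copy for the caller


async def demo():
    storage = SketchStorage()
    await storage.inc_stats("requests")
    await storage.inc_stats("requests", 2)
    for latency in (0.1, 0.2, 0.3):
        await storage.append_stats("latency", latency, max_count=2)
    return await storage.get_stats()
```

Running asyncio.run(demo()) returns the accumulated stats as a plain dict.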
class MetricServer(registry=prometheus_client.registry.REGISTRY, disable_compression=False, storage=None)[source]

Bases: object

HTTP server that serves /metrics.

On each /metrics request, refreshes the Prometheus registry from the provided storage (if any), then exports the metrics. Use run_in_thread() to start the server in a daemon thread on a given port.

Parameters:
  • registry – Prometheus registry to export. Defaults to prometheus_client.registry.REGISTRY.

  • disable_compression – If True, disable response compression.

  • storage – Optional MonitorStorage; if set, registry is refreshed from it on each request.

__init__(registry=prometheus_client.registry.REGISTRY, disable_compression=False, storage=None)[source]
get_wsgi_app()[source]

Return the WSGI app: on /metrics it refreshes the registry from storage, then renders the output.

run_in_thread(port)[source]

Start HTTP server in a daemon thread. Returns the server object.
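
The refresh-then-export flow and the daemon-thread startup can be sketched with the standard library alone. This is an assumption-laden illustration, not MetricServer's implementation: make_metrics_app, the refresh and render callables, and QuietHandler are hypothetical, and the real class exports a prometheus_client registry rather than a render() callback.

```python
import threading
from wsgiref.simple_server import WSGIRequestHandler, make_server


class QuietHandler(WSGIRequestHandler):
    def log_message(self, format, *args):
        pass  # suppress per-request logging to stderr


def make_metrics_app(refresh, render):
    """Build a WSGI app; refresh() updates state, render() returns bytes."""
    def app(environ, start_response):
        if environ.get("PATH_INFO") != "/metrics":
            start_response("404 Not Found", [("Content-Type", "text/plain")])
            return [b"not found\n"]
        refresh()        # bring exported values up to date first
        body = render()  # then serialise them for the scraper
        start_response("200 OK", [
            ("Content-Type", "text/plain; version=0.0.4; charset=utf-8"),
            ("Content-Length", str(len(body))),
        ])
        return [body]
    return app


def run_in_thread(app, port):
    """Serve app on localhost in a daemon thread; return the server object."""
    server = make_server("127.0.0.1", port, app, handler_class=QuietHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server


# Demo state: a counter that is bumped on every scrape.
counter = {"n": 0}


def refresh():
    counter["n"] += 1


def render():
    return ("demo_refreshes_total %d\n" % counter["n"]).encode()
```

Because the thread is a daemon, it does not keep the process alive. Call shutdown() on the returned server for a clean stop.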

class MonitorServer[source]

Bases: object

Singleton entry point for monitoring.

Holds MonitorStorage, runs MetricServer in a thread, forwards all stats/status calls to storage. Call run() with server config (port, errors, service info) to start the metrics HTTP server. Use get_instance() to get the singleton from decorators or app code.

Example:

>>> server = MonitorServer()
>>> server.run(server_config={"port": 9999, "errors": {...}}, max_workers=4)
>>> await server.inc_stats("request___method__MyMethod")
static get_instance()[source]
static get_time()[source]
async wait_for_ending_tasks()[source]

Wait until no tasks are in progress. Use: await server.wait_for_ending_tasks().

__init__()[source]
set_stat_sync(stat_key, value)[source]

Set one stat key (sync, for init from sync context).

run(server_config=None, max_workers=1, hpa_max_workers=None)[source]
async inc_stats(stat_key, value=1)[source]
async set_task_status(status, task_type='default')[source]
async append_stats(stat_key, stat_value, max_count=-1)[source]
async add_key_value(section_key, key_value_tuple)[source]
async del_key_value(section_key, key_to_remove)[source]
async get_stats()[source]
async get_statuses()[source]
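
Reaching the singleton from a decorator, as described for get_instance(), might look like the sketch below. SketchMonitor and count_calls are hypothetical stand-ins, and deriving the key "request___method__<name>" from the function name is an assumption based on the key used in the example above. A real implementation would forward to MonitorStorage instead of a plain dict.

```python
import asyncio
import functools


class SketchMonitor:
    """Illustrative singleton facade; not MonitorServer's actual code."""

    _instance = None

    def __init__(self):
        self._stats = {}  # stands in for MonitorStorage

    @staticmethod
    def get_instance():
        # Create the shared instance lazily on first access.
        if SketchMonitor._instance is None:
            SketchMonitor._instance = SketchMonitor()
        return SketchMonitor._instance

    async def inc_stats(self, stat_key, value=1):
        self._stats[stat_key] = self._stats.get(stat_key, 0) + value

    async def get_stats(self):
        return dict(self._stats)


def count_calls(func):
    """Hypothetical decorator: count each call under a per-method key."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        key = f"request___method__{func.__name__}"  # assumed key pattern
        await SketchMonitor.get_instance().inc_stats(key)
        return await func(*args, **kwargs)
    return wrapper


@count_calls
async def MyMethod():
    return "done"


async def demo():
    await MyMethod()
    await MyMethod()
    return await SketchMonitor.get_instance().get_stats()
```

Running asyncio.run(demo()) returns the call counts recorded by the decorator. Application code can read the same instance through get_instance().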