from __future__ import annotations
import asyncio
import collections
import sys
import threading
import time as timemod
from collections.abc import Callable, Hashable, Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from functools import wraps
from math import nan
from typing import Literal
import psutil
from distributed.compatibility import WINDOWS
_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())
def _psutil_caller(method_name, default=_empty_namedtuple):
"""
Return a function calling the given psutil *method_name*,
or returning *default* if psutil fails.
"""
meth = getattr(psutil, method_name)
@wraps(meth)
def wrapper(): # pragma: no cover
try:
return meth()
except RuntimeError:
            # This can happen on some systems (e.g. no physical disk on the worker)
return default()
return wrapper
disk_io_counters = _psutil_caller("disk_io_counters")
net_io_counters = _psutil_caller("net_io_counters")
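# Illustrative sketch (not part of the module API): both wrappers degrade
# gracefully, returning an empty namedtuple when psutil raises RuntimeError,
# so attribute access needs a fallback:
#
#     counters = net_io_counters()
#     sent = getattr(counters, "bytes_sent", 0)  # 0 if psutil failed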
class _WindowsTime:
"""Combine time.time() or time.monotonic() with time.perf_counter() to get an
absolute clock with fine resolution.
"""
base_timer: Callable[[], float]
delta: float
previous: float | None
next_resync: float
resync_every: float
def __init__(
self, base: Callable[[], float], is_monotonic: bool, resync_every: float = 600.0
):
self.base_timer = base
self.previous = float("-inf") if is_monotonic else None
self.next_resync = float("-inf")
self.resync_every = resync_every
def time(self) -> float:
cur = timemod.perf_counter()
if cur > self.next_resync:
self.resync()
self.next_resync = cur + self.resync_every
cur += self.delta
if self.previous is not None:
# Monotonic timer
if cur <= self.previous:
cur = self.previous + 1e-9
self.previous = cur
return cur
def resync(self) -> None:
_time = self.base_timer
_perf_counter = timemod.perf_counter
min_samples = 5
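        # Sample (wall time, perf_counter) pairs in quick succession; the most
        # common wall-clock reading marks a single coarse clock tick, and
        # averaging the perf_counter values taken during that tick gives a
        # low-jitter estimate of the offset between the two clocks.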
while True:
times = [(_time(), _perf_counter()) for _ in range(min_samples * 2)]
abs_times = collections.Counter(t[0] for t in times)
first, nfirst = abs_times.most_common()[0]
if nfirst < min_samples:
# System too noisy? Start again
continue
perf_times = [t[1] for t in times if t[0] == first][:-1]
assert len(perf_times) >= min_samples - 1, perf_times
self.delta = first - sum(perf_times) / len(perf_times)
break
# A high-resolution wall clock timer measuring the seconds since Unix epoch
if WINDOWS and sys.version_info < (3, 13):
time = _WindowsTime(timemod.time, is_monotonic=False).time
monotonic = _WindowsTime(timemod.monotonic, is_monotonic=True).time
else:
# Under modern Unixes, time.time() and time.monotonic() should be good enough
time = timemod.time
monotonic = timemod.monotonic
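# Usage sketch: either branch yields drop-in replacements for time.time() and
# time.monotonic(), e.g.
#
#     t0 = monotonic()
#     do_work()                    # hypothetical workload
#     elapsed = monotonic() - t0   # fine-grained even on older Windows clocks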
process_time = timemod.process_time
# Get a per-thread CPU timer function if possible, otherwise
# use a per-process CPU timer function.
try:
# thread_time is not supported on all platforms
thread_time = timemod.thread_time
except (AttributeError, OSError): # pragma: no cover
thread_time = process_time
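# Sketch: thread_time only advances while the calling thread consumes CPU,
# which is what makes it preferable to process_time when available:
#
#     t0 = thread_time()
#     crunch_numbers()              # hypothetical CPU-bound work in this thread
#     cpu_s = thread_time() - t0    # excludes CPU time spent in other threads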
@dataclass
class MeterOutput:
start: float
stop: float
delta: float
__slots__ = tuple(__annotations__)
@contextmanager
def meter(
func: Callable[[], float] = timemod.perf_counter,
floor: float | Literal[False] = 0.0,
) -> Iterator[MeterOutput]:
"""Convenience context manager which calls func() before and after the wrapped
code and calculates the delta.
Parameters
----------
func: callable
function to call before and after, which must return a number.
Besides time, it could return e.g. cumulative network traffic or disk usage.
        Default: :func:`time.perf_counter`
floor: float or False, optional
Floor the delta to the given value (default: 0). This is useful for strictly
cumulative functions that can occasionally glitch and go backwards.
Set to False to disable.
Yields
------
:class:`MeterOutput` where the ``start`` attribute is populated straight away, while
``stop`` and ``delta`` are nan until context exit.
"""
out = MeterOutput(func(), nan, nan)
try:
yield out
finally:
out.stop = func()
out.delta = out.stop - out.start
if floor is not False:
out.delta = max(floor, out.delta)
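# Example (illustrative sketch, with a hypothetical do_work function):
#
#     with meter() as m:
#         do_work()
#     print(m.delta)  # elapsed seconds, floored at 0.0
#
# Any cumulative gauge works as func, e.g.
#
#     with meter(lambda: psutil.net_io_counters().bytes_sent) as m:
#         do_work()
#     print(m.delta)  # bytes sent while the block ran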
class ContextMeter:
"""Context-based general purpose meter.
Usage
-----
    1. In high-level code, call :meth:`add_callback` to install a hook that defines
       an activity
    2. In low-level code, typically many stack levels below, log quantitative events
       (e.g. elapsed time, transferred bytes, etc.) so that they will be attributed
       to the high-level code calling it, either with :meth:`meter`,
       :meth:`meter_function`, or :meth:`digest_metric`.
Examples
--------
In the code that e.g. sends a Python object from A to B over the network:
    >>> from functools import partial
    >>> from distributed.metrics import context_meter
>>> with context_meter.add_callback(partial(print, "A->B comms:")):
... await send_over_the_network(obj)
In the serialization utilities, called many stack levels below:
>>> with context_meter.meter("dumps"):
... pik = pickle.dumps(obj)
>>> with context_meter.meter("compress"):
... pik = lz4.compress(pik)
And finally, elsewhere, deep into the TCP stack:
>>> with context_meter.meter("network-write"):
... await comm.write(frames)
When you call the top-level code, you'll get::
A->B comms: dumps 0.012 seconds
A->B comms: compress 0.034 seconds
A->B comms: network-write 0.567 seconds
"""
_callbacks: ContextVar[dict[Hashable, Callable[[Hashable, float, str], None]]]
def __init__(self):
self._callbacks = ContextVar(
f"MetricHook<{id(self)}>._callbacks", default={} # noqa: B039
)
def __reduce__(self):
assert self is context_meter, "Found copy of singleton"
return self._unpickle_singleton, ()
@staticmethod
def _unpickle_singleton():
return context_meter
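    # Sketch: __reduce__ above makes the singleton survive pickling, e.g.
    #
    #     import pickle
    #     assert pickle.loads(pickle.dumps(context_meter)) is context_meter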
@contextmanager
def add_callback(
self,
callback: Callable[[Hashable, float, str], None],
*,
key: Hashable | None = None,
allow_offload: bool = False,
) -> Iterator[None]:
"""Add a callback when entering the context and remove it when exiting it.
The callback must accept the same parameters as :meth:`digest_metric`.
Parameters
----------
callback: Callable
``f(label, value, unit)`` to be executed
key: Hashable, optional
Unique key for the callback. If two nested calls to ``add_callback`` use the
same key, suppress the outermost callback.
allow_offload: bool, optional
If set to True, this context must be executed inside a running asyncio
event loop. If a call to :meth:`digest_metric` is performed from a different
thread, e.g. from inside :func:`distributed.utils.offload`, ensure that
the callback is executed in the event loop's thread instead.
"""
if allow_offload:
loop = asyncio.get_running_loop()
tid = threading.get_ident()
def safe_cb(label: Hashable, value: float, unit: str, /) -> None:
if threading.get_ident() == tid:
callback(label, value, unit)
else: # We're inside offload()
loop.call_soon_threadsafe(callback, label, value, unit)
else:
safe_cb = callback
if key is None:
key = object()
cbs = self._callbacks.get()
cbs = cbs.copy()
cbs[key] = safe_cb
tok = self._callbacks.set(cbs)
try:
yield
finally:
tok.var.reset(tok)
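    # Sketch of key-based deduplication (hypothetical callbacks cb1, cb2):
    #
    #     with context_meter.add_callback(cb1, key="x"):
    #         with context_meter.add_callback(cb2, key="x"):
    #             context_meter.digest_metric("foo", 1.0, "seconds")
    #     # Only cb2 fires: the inner callback shadows the outer one with the
    #     # same key for the duration of the inner context.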
@contextmanager
def clear_callbacks(self) -> Iterator[None]:
"""Do not trigger any callbacks set outside of this context"""
tok = self._callbacks.set({})
try:
yield
finally:
tok.var.reset(tok)
def digest_metric(self, label: Hashable, value: float, unit: str) -> None:
"""Invoke the currently set context callbacks for an arbitrary quantitative
metric.
"""
cbs = self._callbacks.get()
for cb in cbs.values():
cb(label, value, unit)
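    # E.g. attributing a non-time quantity to the enclosing context (sketch,
    # with a hypothetical nbytes variable):
    #
    #     context_meter.digest_metric("network-read", nbytes, "bytes")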
@contextmanager
def meter(
self,
label: Hashable,
unit: str = "seconds",
func: Callable[[], float] = timemod.perf_counter,
floor: float | Literal[False] = 0.0,
) -> Iterator[MeterOutput]:
"""Convenience context manager or decorator which calls func() before and after
the wrapped code, calculates the delta, and finally calls :meth:`digest_metric`.
        If unit=='seconds', it also subtracts the value metered by any other calls
        to :meth:`meter` or :meth:`digest_metric` with the same unit performed
        within the context, so that the total is strictly additive.
Parameters
----------
label: Hashable
label to pass to the callback
unit: str, optional
unit to pass to the callback. Default: seconds
func: callable
see :func:`meter`
        floor: float or False, optional
see :func:`meter`
Yields
------
:class:`MeterOutput` where the ``start`` attribute is populated straight away,
        while ``stop`` and ``delta`` are nan until context exit. In case of multiple
        nested calls to :meth:`meter`, the delta (for seconds only) is reduced by the
        inner metrics, down to a minimum of ``floor``.
"""
if unit != "seconds":
try:
with meter(func, floor=floor) as m:
yield m
finally:
self.digest_metric(label, m.delta, unit)
return
# If unit=="seconds", subtract time metered from the sub-contexts
offsets = []
def callback(label2: Hashable, value2: float, unit2: str) -> None:
if unit2 == unit:
# This must be threadsafe to support callbacks invoked from
# distributed.utils.offload; '+=' on a float would not be threadsafe!
offsets.append(value2)
try:
with self.add_callback(callback), meter(func, floor=False) as m:
yield m
finally:
delta = m.delta - sum(offsets)
if floor is not False:
delta = max(floor, delta)
m.delta = delta
self.digest_metric(label, delta, unit)
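    # Sketch: nested seconds meters are subtracted from the outer delta, so
    # the reported times stay additive (using print as a trivial callback):
    #
    #     with context_meter.add_callback(print):
    #         with context_meter.meter("total"):
    #             with context_meter.meter("inner"):
    #                 sleep(0.1)  # hypothetical work (time.sleep)
    #     # "inner" reports ~0.1s; "total" reports only the remainder.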
context_meter = ContextMeter()
class DelayedMetricsLedger:
"""Add-on to :class:`ContextMeter` that helps in the case where:
    - The code to be metered is not easily expressed as a self-contained code
      block; e.g. you want to measure latency in the asyncio event loop before
      and after running a task
- You want to alter the metrics depending on how the code ends; e.g. you want to
post them differently in case of failure.
Examples
--------
>>> ledger = DelayedMetricsLedger() # Metering starts here
>>> async def wrapper():
... with ledger.record():
... return await metered_function()
>>> task = asyncio.create_task(wrapper())
>>> # (later, elsewhere)
>>> try:
... await task
... coarse_time = False
... except Exception:
... coarse_time = "failed"
... raise
... finally:
... # Metering stops here
... for label, value, unit in ledger.finalize(coarse_time):
... # actually log metrics
"""
func: Callable[[], float]
start: float
metrics: list[tuple[Hashable, float, str]] # (label, value, unit)
def __init__(self, func: Callable[[], float] = timemod.perf_counter):
self.func = func
self.start = func()
self.metrics = []
def _callback(self, label: Hashable, value: float, unit: str) -> None:
self.metrics.append((label, value, unit))
@contextmanager
def record(self, *, key: Hashable | None = None) -> Iterator[None]:
"""Ingest metrics logged with :meth:`ContextMeter.digest_metric` or
        :meth:`ContextMeter.meter` and temporarily store them in :attr:`metrics`.
Parameters
----------
key: Hashable, optional
See :meth:`ContextMeter.add_callback`
"""
with context_meter.add_callback(self._callback, key=key):
yield
def finalize(
self,
coarse_time: str | Literal[False] = False,
floor: float | Literal[False] = 0.0,
) -> Iterator[tuple[Hashable, float, str]]:
"""The metered code is terminated, and we now know how to log it.
Parameters
----------
coarse_time: str | False, optional
False
Yield all acquired metrics, plus an extra time metric, labelled "other",
which is the time between creating the DelayedMetricsLedger and
calling this method, minus any time logged in the metrics.
            label
                Yield all acquired non-time metrics, plus a single time metric
                labelled <coarse_time>, which is the time between creating the
                DelayedMetricsLedger and calling this method.
floor: float | False, optional
Floor either the "other" or the <coarse_time> metric to this value
(default: 0). Set to False to disable.
"""
stop = self.func()
delta = stop - self.start
for label, value, unit in self.metrics:
if unit != "seconds" or not coarse_time:
yield label, value, unit
if unit == "seconds" and not coarse_time:
delta -= value
if floor is not False:
delta = max(floor, delta)
yield coarse_time or "other", delta, "seconds"
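    # Sketch of the two modes (hypothetical ledger contents):
    #
    #     ledger.metrics = [("dumps", 0.01, "seconds"), ("nbytes", 2048, "bytes")]
    #     list(ledger.finalize())
    #     # [("dumps", 0.01, "seconds"), ("nbytes", 2048, "bytes"),
    #     #  ("other", <elapsed - 0.01>, "seconds")]
    #     list(ledger.finalize(coarse_time="failed"))
    #     # [("nbytes", 2048, "bytes"), ("failed", <elapsed>, "seconds")]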