__all__ = [
"MetricId",
"Recorder",
"LogTransport",
"InMemoryTransport",
"RiemannTransport",
"CompositeTransport",
"Gauge",
"Range",
"Counter",
"Timer",
"Metrics",
]
import collections
import json
import logging
import timeit
from collections import defaultdict, namedtuple
from riemann_client.client import Client
from riemann_client.riemann_pb2 import Msg
from riemann_client.transport import TCPTransport
from ._deprecation import deprecated
MetricId = namedtuple("MetricId", "name tags attributes")
MetricId.__doc__ = "Used to uniquely identify a metric"
MetricId.name.__doc__ = "(str): The 'service' of the metric."
MetricId.tags.__doc__ = "(List[str]): A list of string tags associated with the metric."
MetricId.attributes.__doc__ = (
"(Dict[str, Any]): Arbitrary key-value pairs associated with the metric"
)
class Metric:
""" A single measurement.
Args:
service (str): The name of the metric.
value (SupportsFloat): The numeric value recorded.
ttl (Optional[int]): The time-to-live of the metric in seconds.
tags (List[str]): A list of string tags to apply to the metric.
fields (Dict[str, Any]): A set of key-value pairs to apply to the metric.
"""
def __init__(self, service, value, ttl, tags, fields):
self.tags = tags or []
self.value = value
self.name = service
self.ttl = ttl
self.id = self._id(self.name, self.tags, fields)
self.attributes = {str(k): str(v) for k, v in fields.items()}
def _id(self, service_name, tags, fields):
return MetricId(service_name, frozenset(tags), frozenset(fields.items()))
[docs]class Recorder:
"""
Collects metrics and flushes them to a transport
"""
[docs] def send(self, metric, value, transport, suffix=""):
ttl = metric.ttl
event = {
"tags": metric.tags,
"attributes": metric.attributes,
"service": metric.name + suffix,
"metric_f": value,
}
if ttl is not None:
event["ttl"] = ttl
transport.send_event(event)
class Transport:
""" Sends metrics for storage or processing
"""
def send_event(self, event):
""" Buffer a single event for sending.
"""
pass
def flush(self, is_closing):
""" Send all buffered metrics
Args:
is_closing (bool): True if the transport should tear down any resources,
eg. Sockets or file handles.
"""
[docs]class LogTransport(Transport):
"""
Simple Transport that sprints metrics to the log. Useful for development
environments
"""
def __init__(self):
self._logger = logging.getLogger("metrics")
[docs] def send_event(self, event):
self._logger.info(
"metric %s=%s (%s)",
event["service"],
event["metric_f"],
json.dumps(event.get("attributes")),
)
[docs] def flush(self, is_closing):
pass
[docs]class InMemoryTransport(Transport):
"""
Dummy transport that keeps a copy of the last flushed batch of events. This
is used to store the data for the stats endpoints.
"""
def __init__(self):
self.current_batch = []
self.last_batch = []
[docs] def send_event(self, event):
self.current_batch.append(event)
[docs] def flush(self, is_closing):
self.last_batch = list(self.current_batch)
self.current_batch = []
[docs]class RiemannTransport(Transport):
"""
Transport that sends metrics to a Riemann server.
"""
def __init__(self, host="localhost", port="5555", timeout=5):
self.host = host
self.port = port
self.transport = TCPTransport(self.host, self.port, timeout)
self._new_message()
self._connected = False
[docs] def send_event(self, event):
riemann_event = Client.create_event(event)
self._message.events.add().MergeFrom(riemann_event)
def _connect(self):
self.transport.connect()
self._connected = True
def _disconnect(self):
self.transport.disconnect()
self._connected = False
def _ensure_connected(self):
# This is just to avoid logging about failure on the first try
if not self._connected:
self._connect()
[docs] def flush(self, is_closing):
self._ensure_connected()
try:
self.transport.send(self._message)
except Exception as e:
self._disconnect()
logging.warning("Failed to flush metrics to riemann")
try:
self._connect()
self.transport.send(self._message)
except Exception as e:
logging.error("Failed twice to flush metrics to riemann", exc_info=True)
if is_closing:
self._disconnect()
self._new_message()
def _new_message(self):
self._message = Msg()
[docs] @deprecated("Should be no need to check, will reconnect automatically")
def is_connected(self):
"""Check whether the transport is connected."""
return True
[docs]class CompositeTransport(Transport):
"""
Transport that wraps two or more transports and forwards events to all of
them.
"""
def __init__(self, *args):
self._transports = args
[docs] def send_event(self, event):
for t in self._transports:
t.send_event(event)
[docs] def flush(self, is_closing):
for t in self._transports:
t.flush(is_closing)
[docs]class Range(Recorder):
"""
Summarys record the range of a value across a set of datapoints,
eg response time, items cleared from cache, and forward aggregated
metrics to describe that range.
"""
def __init__(self, source):
self._source = source
self._reset()
def _reset(self):
self._metrics = defaultdict(list)
[docs] def record(self, service_name, value, ttl=None, tags=[], attributes=dict()):
if self._source:
attributes["source"] = self._source
metric = Metric(service_name, value, ttl, tags, attributes)
self._metrics[metric.id].append(metric)
[docs] def flush(self, transport):
for metric in self._metrics.values():
first = metric[0]
_min = first.value
_max = first.value
_mean = 0
_count = 0
_total = 0
for measurement in metric:
_count = _count + 1
_total = _total + measurement.value
_max = max(_max, measurement.value)
_min = min(_min, measurement.value)
_mean = _total / _count
self.send(first, _min, transport, ".min")
self.send(first, _max, transport, ".max")
self.send(first, _mean, transport, ".mean")
self.send(first, _count, transport, ".count")
self._reset()
[docs]class Counter(Recorder):
"""
Counters record incrementing or decrementing values, eg. Events Processed,
error count, cache hits.
"""
def __init__(self, source):
self._source = source
self._counters = collections.defaultdict(list)
[docs] def record(self, service_name, value, ttl, tags, attributes):
if self._source:
attributes["source"] = self._source
metric = Metric(service_name, value, ttl, tags, attributes)
self._counters[metric.id].append(metric)
[docs] def flush(self, transport):
for counter in self._counters.values():
count = sum(m.value for m in counter)
self.send(counter[0], count, transport)
self._counters = defaultdict(list)
[docs]class Gauge(Recorder):
"""
Gauges record scalar values at a single point in time, eg. queue size,
active sessions, and forward only the latest value.
"""
def __init__(self, source):
self._source = source
self._gauges = dict()
[docs] def record(self, service_name, value, ttl, tags, attributes):
if self._source:
attributes["source"] = self._source
metric = Metric(service_name, value, ttl, tags, attributes)
self._gauges[metric.id] = metric
[docs] def flush(self, transport):
for gauge in self._gauges.values():
self.send(gauge, gauge.value, transport)
self._gauges = dict()
[docs]class Timer:
"""
Timers provide a context manager that times an operation and records a
gauge with the elapsed time.
"""
def __init__(self, service_name, ttl, tags, attributes, histogram):
self.service_name = service_name
self.ttl = ttl
self.tags = tags
self.attributes = attributes
self.recorder = histogram
def __enter__(self):
self.start = timeit.default_timer()
def __exit__(self, exc_type, exc_value, exc_traceback):
elapsed = timeit.default_timer() - self.start
self.recorder.record(
self.service_name, elapsed, self.ttl, self.tags, self.attributes
)
[docs]class Metrics:
"""Buffers metrics and forwards them to a :class:`~striemann.metrics.Transport`
Args:
transport(Transport): The transport used to send metrics.
source(Optional[str]): If provided, this value will be added to all outbound metrics
as the `source` attribute. The value may still be overridden on a
per-metric basis.
Examples:
>>> from striemann.metrics import InMemoryTransport, Metrics
>>> import pprint
>>>
>>> pp = pprint.PrettyPrinter(indent=4)
>>>
>>> transport = InMemoryTransport()
>>> metrics = Metrics(transport)
>>>
>>> metrics.incrementCounter("Burgers sold")
>>> metrics.flush()
>>> print(transport.last_batch)
[{'tags': [], 'attributes': {}, 'service': 'metrics written', 'metric_f': 1}]
"""
def __init__(self, transport, source=None):
self._transport = transport
self._gauges = Gauge(source)
self._ranges = Range(source)
self._counters = Counter(source)
[docs] def recordGauge(self, service_name, value, ttl=None, tags=None, **kwargs):
""" Record a single scalar value, eg. Queue Depth, Current Uptime, Disk Free
Args:
service_name (str): The name of the recorded metric.
value (SupportsFloat): The numeric value to record.
ttl (Optional[int]): An optional time-to-live for the metric, measured in seconds.
tags (Optional[List[str]]): A list of strings to associate with the metric.
**kwargs (Any): Additional key-value pairs to associate with the metric.
Examples:
In the simplest case, we just want to record a single number.
Each time we record a Gauge, we replace the previous value.
>>> metrics.recordGauge("Customers in restaurant", 10)
>>> metrics.recordGauge("Customers in restaurant", 8)
>>> metrics.flush()
>>> print(transport.last_batch)
[{'tags': [], 'attributes': {}, 'service': 'Customers in restaurant', 'metric_f': 8}]
We might want to segregate our metrics by some other attribute so
so that we can aggregate and drill-down.
>>> metrics.recordGauge("Customers in restaurant", 6, hair="brown")
>>> metrics.recordGauge("Customers in restaurant", 2, hair="blonde")
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[ { 'attributes': {'hair': 'blonde'},
'metric_f': 2,
'service': 'Customers in restaurant',
'tags': []},
{ 'attributes': {'hair': 'brown'},
'metric_f': 6,
'service': 'Customers in restaurant',
'tags': []}]
"""
self._gauges.record(service_name, value, ttl, tags, kwargs)
[docs] def incrementCounter(self, service_name, value=1, ttl=None, tags=None, **kwargs):
""" Record an increase in a value, eg. Cache Hits, Files Written
Args:
service_name (str): The name of the recorded metric.
value (SupportsFloat): The numeric value to record.
ttl (Optional[int]): An optional time-to-live for the metric, measured in seconds.
tags (Optional[List[str]]): A list of strings to associate with the metric.
**kwargs (Any): Additional key-value pairs to associate with the metric.
Examples:
Counters are useful when we don't know an absolute value, but we want
to record that something happened.
>>> metrics.incrementCounter("Burgers sold")
>>> metrics.incrementCounter("Burgers sold")
>>> metrics.incrementCounter("Burgers sold", value=2)
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[{'attributes': {}, 'metric_f': 4, 'service': 'Burgers sold', 'tags': []}]
Counters reset after each flush.
>>> metrics.incrementCounter("Burgers sold")
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[{'attributes': {}, 'metric_f': 1, 'service': 'Burgers sold', 'tags': []}]
Counters can have tags and attributes associated with them.
Each unique set of tags and attributes is flushed as a separate metric.
>>> metrics.incrementCounter("Burgers sold", tags=["drive-thru"], name="cheeseburger")
>>> metrics.incrementCounter("Burgers sold", name="cheeseburger")
>>> metrics.incrementCounter("Burgers sold", value=2, name="whopper")
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[ { 'attributes': {'name': 'cheeseburger'},
'metric_f': 1,
'service': 'Burgers sold',
'tags': ['drive-thru']},
{ 'attributes': {'name': 'cheeseburger'},
'metric_f': 1,
'service': 'Burgers sold',
'tags': []},
{ 'attributes': {'name': 'whopper'},
'metric_f': 2,
'service': 'Burgers sold',
'tags': []}]
"""
self._counters.record(service_name, value, ttl, tags, kwargs)
[docs] def decrementCounter(self, service_name, value=1, ttl=None, tags=None, **kwargs):
""" Record an decrease in a value, eg. Cache Hits, Files Written
Args:
service_name (str): The name of the recorded metric.
value (SupportsFloat): The numeric value to record.
ttl (Optional[int]): An optional time-to-live for the metric, measured in seconds.
tags (Optional[List[str]]): A list of strings to associate with the metric.
**kwargs (Any): Additional key-value pairs to associate with the metric.
Examples:
Occasionally, we want to record a decrease in a value.
>>> metrics.incrementCounter("Customers waiting")
>>> metrics.incrementCounter("Customers waiting")
>>>
>>> metrics.decrementCounter("Customers waiting")
>>>
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[{'attributes': {}, 'metric_f': 1, 'service': 'Customers waiting', 'tags': []}]
"""
self._counters.record(service_name, 0 - value, ttl, tags, kwargs)
[docs] def time(self, service_name, ttl=None, tags=None, **kwargs):
""" Record the time taken for an operation.
The time method returns a context manager that can be used for timing
an operation. The timer uses the `default timer<https://docs.python.org/2/library/timeit.html#timeit.default_timer>_` for the operating system.
Under the hood, the time method uses a :class:`~striemann.metrics.Range`
to record its values.
Args:
service_name (str): The name of the recorded metric.
value (SupportsFloat): The numeric value to record.
ttl (Optional[int]): An optional time-to-live for the metric, measured in seconds.
tags (Optional[List[str]]): A list of strings to associate with the metric.
**kwargs (Any): Additional key-value pairs to associate with the metric.
Examples:
Since timers use a :class:`striemann.metrics.Range` to record their values
they send a summary of all the values recorded since the last flush.
>>> import time
>>> with metrics.time("Burger Cooking Time"):
>>> time.sleep(1)
>>> with metrics.time("Burger Cooking Time"):
>>> time.sleep(5)
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[ { 'attributes': {},
'metric_f': 1.0011436779996075,
'service': 'Burger cooking time.min',
'tags': []},
{ 'attributes': {},
'metric_f': 5.00513941600002,
'service': 'Burger cooking time.max',
'tags': []},
{ 'attributes': {},
'metric_f': 3.0031415469998137,
'service': 'Burger cooking time.mean',
'tags': []},
{ 'attributes': {},
'metric_f': 2,
'service': 'Burger cooking time.count',
'tags': []}]
Timers respect tags and attributes when aggregating.
>>> with metrics.time("Burger Cooking Time", type="whopper"):
>>> time.sleep(1)
>>> with metrics.time("Burger Cooking Time", type="cheeseburger"):
>>> time.sleep(5)
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[ { 'attributes': {'type': 'whopper'},
'metric_f': 1.001190301999486,
'service': 'Burger cooking time.min',
'tags': []},
{ 'attributes': {'type': 'whopper'},
'metric_f': 1.001190301999486,
'service': 'Burger cooking time.max',
'tags': []},
{ 'attributes': {'type': 'whopper'},
'metric_f': 1.001190301999486,
'service': 'Burger cooking time.mean',
'tags': []},
{ 'attributes': {'type': 'whopper'},
'metric_f': 1,
'service': 'Burger cooking time.count',
'tags': []},
{ 'attributes': {'type': 'cheeseburger'},
'metric_f': 5.005140869999195,
'service': 'Burger cooking time.min',
'tags': []},
{ 'attributes': {'type': 'cheeseburger'},
'metric_f': 5.005140869999195,
'service': 'Burger cooking time.max',
'tags': []},
{ 'attributes': {'type': 'cheeseburger'},
'metric_f': 5.005140869999195,
'service': 'Burger cooking time.mean',
'tags': []},
{ 'attributes': {'type': 'cheeseburger'},
'metric_f': 1,
'service': 'Burger cooking time.count',
'tags': []}]
"""
return Timer(service_name, ttl, tags, kwargs, self._ranges)
[docs] def recordRange(self, service_name, value, ttl=None, tags=[], **kwargs):
""" Record statistics about a range of values
Ranges are useful when we care about a metric in aggregate rather than
recording each individual event. When flushed, a Range sends the minimum,
maximum, and mean of each recorded metric, and the count of values recorded.
Args:
service_name (str): The name of the recorded metric.
value (SupportsFloat): The numeric value to record.
ttl (Optional[int]): An optional time-to-live for the metric, measured in seconds.
tags (Optional[List[str]]): A list of strings to associate with the metric.
**kwargs (Any): Additional key-value pairs to associate with the metric.
Examples:
Ranges are useful when we want to know the distribution of a value.
They're used by the :meth:`~striemann.metrics.Metrics.time` method internally.
>>> metrics.recordRange("Customer height", 163)
>>> metrics.recordRange("Customer height", 185)
>>> metrics.recordRange("Customer height", 134)
>>> metrics.recordRange("Customer height", 158)
>>> metrics.recordRange("Customer height", 170)
>>>
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[ { 'attributes': {},
'metric_f': -185,
'service': 'Customer height.min',
'tags': []},
{ 'attributes': {},
'metric_f': -134,
'service': 'Customer height.max',
'tags': []},
{ 'attributes': {},
'metric_f': -162.0,
'service': 'Customer height.mean',
'tags': []},
{ 'attributes': {},
'metric_f': 5,
'service': 'Customer height.count',
'tags': []}]
Ranges respect tags an attributes when aggregating their values.
>>> metrics.recordRange("Customer height", 163, sex='female')
>>> metrics.recordRange("Customer height", 185, sex='male')
>>> metrics.recordRange("Customer height", 134, tags='child', sex='male')
>>> metrics.recordRange("Customer height", 158, sex='female')
>>> metrics.recordRange("Customer height", 170, sex='male')
>>>
>>> metrics.flush()
>>> pp.pprint(transport.last_batch)
[ { 'attributes': {'sex': 'female'},
'metric_f': 158,
'service': 'Customer height.min',
'tags': []},
{ 'attributes': {'sex': 'female'},
'metric_f': 163,
'service': 'Customer height.max',
'tags': []},
{ 'attributes': {'sex': 'female'},
'metric_f': 160.5,
'service': 'Customer height.mean',
'tags': []},
{ 'attributes': {'sex': 'female'},
'metric_f': 2,
'service': 'Customer height.count',
'tags': []},
{ 'attributes': {'sex': 'male'},
'metric_f': 170,
'service': 'Customer height.min',
'tags': []},
{ 'attributes': {'sex': 'male'},
'metric_f': 185,
'service': 'Customer height.max',
'tags': []},
{ 'attributes': {'sex': 'male'},
'metric_f': 177.5,
'service': 'Customer height.mean',
'tags': []},
{ 'attributes': {'sex': 'male'},
'metric_f': 2,
'service': 'Customer height.count',
'tags': []},
{ 'attributes': {'sex': 'male'},
'metric_f': 134,
'service': 'Customer height.min',
'tags': 'child'},
{ 'attributes': {'sex': 'male'},
'metric_f': 134,
'service': 'Customer height.max',
'tags': 'child'},
{ 'attributes': {'sex': 'male'},
'metric_f': 134.0,
'service': 'Customer height.mean',
'tags': 'child'},
{ 'attributes': {'sex': 'male'},
'metric_f': 1,
'service': 'Customer height.count',
'tags': 'child'}]
"""
self._ranges.record(service_name, value, ttl, tags, kwargs)
[docs] def flush(self, is_closing=False):
""" Flush all metrics to the underlying transport.
Args:
is_closing (bool): True if the transport should be shut down.
"""
self._gauges.flush(self._transport)
self._counters.flush(self._transport)
self._ranges.flush(self._transport)
self._transport.flush(is_closing)