"""Clients manage the main user facing API, and provide functions for sending
events and querying the Riemann server. UDP, TCP and TLS transports are
provided by the :py:mod:`riemann_client.transport` module, and the protocol
buffer objects are provided by the :py:mod:`riemann_client.riemann_pb2` module.
"""
from __future__ import absolute_import
import logging
try:
from logging import NullHandler
except ImportError:
# Create a NullHandler class in logging for python 2.6
class NullHandler(logging.Handler):
def emit(self, record):
pass
import socket
try:
from threading import RLock
from threading import Timer
except ImportError:
RLock = None
Timer = None
import time
import riemann_client.riemann_pb2
import riemann_client.transport
logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())
[docs]class Client(object):
"""A client for sending events and querying a Riemann server.
Two sets of methods are provided - an API dealing directly with protocol
buffer objects and an extended API that takes and returns dictionaries
representing events.
Protocol buffer API:
- :py:meth:`.send_event`
- :py:meth:`.send_events`
- :py:meth:`.send_query`
Extended API:
- :py:meth:`.event`
- :py:meth:`.events`
- :py:meth:`.query`
Clients do not directly manage connections to a Riemann server - these are
managed by :py:class:`riemann_client.transport.Transport` instances, which
provide methods to read and write messages to the server. Client instances
can be used as a context manager, and will connect and disconnect the
transport when entering and exiting the context.
>>> with Client(transport) as client:
... # Calls transport.connect()
... client.query('true')
... # Calls transport.disconnect()
"""
def __init__(self, transport=None):
if transport is None:
transport = riemann_client.transport.TCPTransport()
self.transport = transport
def __enter__(self):
self.transport.connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.transport.disconnect()
@staticmethod
[docs] def create_event(data):
"""Translates a dictionary of event attributes to an Event object
:param dict data: The attributes to be set on the event
:returns: A protocol buffer ``Event`` object
"""
event = riemann_client.riemann_pb2.Event()
event.host = socket.gethostname()
event.tags.extend(data.pop('tags', []))
for key, value in data.pop('attributes', {}).items():
attribute = event.attributes.add()
attribute.key, attribute.value = key, value
for name, value in data.items():
if value is not None:
setattr(event, name, value)
return event
[docs] def send_events(self, events):
"""Sends multiple events to Riemann in a single message
:param events: A list or iterable of ``Event`` objects
:returns: The response message from Riemann
"""
message = riemann_client.riemann_pb2.Msg()
for event in events:
message.events.add().MergeFrom(event)
return self.transport.send(message)
[docs] def send_event(self, event):
"""Sends a single event to Riemann
:param event: An ``Event`` protocol buffer object
:returns: The response message from Riemann
"""
return self.send_events((event,))
[docs] def events(self, *events):
"""Sends multiple events in a single message
>>> client.events({'service': 'riemann-client', 'state': 'awesome'})
:param \*events: event dictionaries for :py:func:`create_event`
:returns: The response message from Riemann
"""
return self.send_events(self.create_event(e) for e in events)
[docs] def event(self, **data):
"""Sends an event, using keyword arguments to create an Event
>>> client.event(service='riemann-client', state='awesome')
:param \*\*data: keyword arguments used for :py:func:`create_event`
:returns: The response message from Riemann
"""
return self.send_event(self.create_event(data))
@staticmethod
[docs] def create_dict(event):
"""Translates an Event object to a dictionary of event attributes
All attributes are included, so ``create_dict(create_event(input))``
may return more attributes than were present in the input.
:param event: A protocol buffer ``Event`` object
:returns: A dictionary of event attributes
"""
data = dict()
for descriptor, value in event.ListFields():
if descriptor.name == 'tags':
value = list(value)
elif descriptor.name == 'attributes':
value = dict(((a.key, a.value) for a in value))
data[descriptor.name] = value
return data
[docs] def send_query(self, query):
"""Sends a query to the Riemann server
:returns: The response message from Riemann
"""
message = riemann_client.riemann_pb2.Msg()
message.query.string = query
return self.transport.send(message)
[docs] def query(self, query):
"""Sends a query to the Riemann server
>>> client.query('true')
:returns: A list of event dictionaries taken from the response
:raises Exception: if used with a :py:class:`.UDPTransport`
"""
if isinstance(self.transport, riemann_client.transport.UDPTransport):
raise Exception('Cannot query the Riemann server over UDP')
response = self.send_query(query)
return [self.create_dict(e) for e in response.events]
[docs]class QueuedClient(Client):
"""A Riemann client using a queue that can be used to batch send events.
A message object is used as a queue, with the :py:meth:`.send_event` and
:py:meth:`.send_events` methods adding new events to the message and the
:py:meth:`.flush` sending the message.
"""
def __init__(self, transport=None):
super(QueuedClient, self).__init__(transport)
self.clear_queue()
[docs] def flush(self):
"""Sends the waiting message to Riemann
:returns: The response message from Riemann
"""
response = self.transport.send(self.queue)
self.clear_queue()
return response
[docs] def send_event(self, event):
"""Adds a single event to the queued message
:returns: None - nothing has been sent to the Riemann server yet
"""
self.send_events((event,))
return None
[docs] def send_events(self, events):
"""Adds multiple events to the queued message
:returns: None - nothing has been sent to the Riemann server yet
"""
for event in events:
self.queue.events.add().MergeFrom(event)
return None
[docs] def clear_queue(self):
"""Resets the message/queue to a blank :py:class:`.Msg` object"""
self.queue = riemann_client.riemann_pb2.Msg()
if RLock and Timer: # noqa
[docs] class AutoFlushingQueuedClient(QueuedClient):
"""A Riemann client using a queue and a timer that will automatically
flush its contents if either:
- the queue size exceeds :param max_batch_size: or
- more than :param max_delay: has elapsed since the last flush and
the queue is non-empty.
if :param stay_connected: is False, then the transport will be
disconnected after each flush and reconnected at the beginning of
the next flush.
if :param clear_on_fail: is True, then the client will discard its
buffer after the second retry in the event of a socket error.
A message object is used as a queue, and the following methods are
given:
- :py:meth:`.send_event` - add a new event to the queue
- :py:meth:`.send_events` add a tuple of new events to the queue
- :py:meth:`.event` - add a new event to the queue from
keyword arguments
- :py:meth:`.events` - add new events to the queue from
dictionaries
- :py:meth:`.flush` - manually force flush the queue to the
transport
"""
def __init__(self, transport, max_delay=0.5, max_batch_size=100,
stay_connected=False, clear_on_fail=False):
super(AutoFlushingQueuedClient, self).__init__(transport)
self.stay_connected = stay_connected
self.clear_on_fail = clear_on_fail
self.max_delay = max_delay
self.max_batch_size = max_batch_size
self.lock = RLock()
self.event_counter = 0
self.last_flush = time.time()
self.timer = None
# start the timer
self.start_timer()
[docs] def connect(self):
"""Connect the transport if it is not already connected."""
if not self.is_connected():
self.transport.connect()
[docs] def is_connected(self):
"""Check whether the transport is connected."""
try:
# this will throw an exception whenever socket isn't connected
self.transport.socket.type
return True
except (AttributeError, RuntimeError, socket.error):
return False
[docs] def event(self, **data):
"""Enqueues an event, using keyword arguments to create an Event
>>> client.event(service='riemann-client', state='awesome')
:param \*\*data: keyword arguments used for :py:func:`create_event`
"""
self.send_events((self.create_event(data),))
[docs] def events(self, *events):
"""Enqueues multiple events in a single message
>>> client.events({'service': 'riemann-client',
>>> 'state': 'awesome'})
:param \*events: event dictionaries for :py:func:`create_event`
:returns: The response message from Riemann
"""
self.send_events(self.create_event(evd) for evd in events)
[docs] def send_events(self, events):
"""Enqueues multiple events
:param events: A list or iterable of ``Event`` objects
:returns: The response message from Riemann
"""
with self.lock:
for event in events:
self.queue.events.add().MergeFrom(event)
self.event_counter += 1
self.check_for_flush()
[docs] def flush(self):
"""Sends the events in the queue to Riemann in a single protobuf
message
:returns: The response message from Riemann
"""
response = None
with self.lock:
if not self.is_connected():
self.connect()
try:
response = super(AutoFlushingQueuedClient, self).flush()
except socket.error:
# log and retry
logger.warn("Socket error on flushing. "
"Attempting reconnect and retry...")
try:
self.transport.disconnect()
self.connect()
response = (
super(AutoFlushingQueuedClient, self).flush())
except:
logger.warn("Socket error on flushing "
"second attempt. Batch discarded.")
self.transport.disconnect()
if self.clear_on_fail:
self.clear_queue()
self.event_counter = 0
if not self.stay_connected:
self.transport.disconnect()
self.last_flush = time.time()
self.start_timer()
return response
[docs] def check_for_flush(self):
"""Checks the conditions for flushing the queue"""
if (self.event_counter >= self.max_batch_size or
(time.time() - self.last_flush) >= self.max_delay):
self.flush()
[docs] def start_timer(self):
"""Cycle the timer responsible for periodically flushing the queue
"""
if self.timer:
self.timer.cancel()
self.timer = Timer(self.max_delay, self.check_for_flush)
self.timer.daemon = True
self.timer.start()
[docs] def stop_timer(self):
"""Stops the current timer
a :py:meth:`.flush` event will reactviate the timer
"""
self.timer.cancel()