Source code for sbp.client.handler

# Copyright (C) 2015 Swift Navigation Inc.
# Contact: Mark Fine <mark@swiftnav.com>
#
# This source is subject to the license found in the file 'LICENSE' which must
# be be distributed together with this source. All other rights reserved.
#
# THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
"""
The :mod:`sbp.client.handler` module contains classes related to
SBP message handling.
"""

import collections
import threading
import weakref
from Queue import Queue


[docs]class Handler(object): """ Handler The :class:`Handler` class provides an interface for connecting handlers to a driver providing SBP messages. Also provides queued and filtered iterators for synchronous, blocking use in other threads. Parameters ---------- source : Iterable of tuple(SBP message, {'time':'ISO 8601 str'}) Stream of SBP messages """ def __init__(self, source): self._source = source self._callbacks = collections.defaultdict(set) self._receive_thread = threading.Thread( target=self._recv_thread, name="Handler") self._receive_thread.daemon = True self._sinks = [] # This is a list of weakrefs to upstream iterators self._dead = False self._write_lock = threading.Lock() def _recv_thread(self): """ Internal thread to iterate over source messages and dispatch callbacks. """ for msg, metadata in self._source: if msg.msg_type: self._call(msg, **metadata) # Break any upstream iterators for sink in self._sinks: i = sink() if i is not None: i.breakiter() self._dead = True def __enter__(self): self.start() return self def __exit__(self, *args): self.stop() # This exception is raised when a message is dispatched to a garbage # collected upstream iterator. class _DeadCallbackException(Exception): pass
[docs] def filter(self, msg_type=None, maxsize=0): """ Get a filtered iterator of messages for synchronous, blocking use in another thread. """ if self._dead: return iter(()) iterator = Handler._SBPQueueIterator(maxsize) # We use a weakref so that the iterator may be garbage collected if it's # consumer no longer has a reference. ref = weakref.ref(iterator) self._sinks.append(ref) def feediter(msg, **metadata): i = ref() if i is not None: i(msg, **metadata) else: raise Handler._DeadCallbackException self.add_callback(feediter, msg_type) return iterator
def __iter__(self): """ Get a queued iterator that will provide the same unfiltered messages read from the source iterator. """ return self.filter()
[docs] def add_callback(self, callback, msg_type=None): """ Add per message type or global callback. Parameters ---------- callback : fn Callback function msg_type : int | iterable Message type to register callback against. Default `None` means global callback. Iterable type adds the callback to all the message types. """ try: for mt in iter(msg_type): self._callbacks[mt].add(callback) except TypeError: self._callbacks[msg_type].add(callback)
[docs] def remove_callback(self, callback, msg_type=None): """ Remove per message type of global callback. Parameters ---------- callback : fn Callback function msg_type : int | iterable Message type to remove callback from. Default `None` means global callback. Iterable type removes the callback from all the message types. """ if msg_type is None: msg_type = self._callbacks.keys() try: for mt in iter(msg_type): try: self._callbacks[mt].remove(callback) except KeyError: pass except TypeError as e: self._callbacks[msg_type].remove(callback)
def _gc_dead_sinks(self): """ Remove any dead weakrefs. """ deadsinks = [] for i in self._sinks: if i() is None: deadsinks.append(i) for i in deadsinks: self._sinks.remove(i) def _get_callbacks(self, msg_type): """ Return all callbacks (global and per message type) for a message type. Parameters ---------- msg_type : int Message type to return callbacks for. """ return self._callbacks[None] | self._callbacks[msg_type] def _call(self, msg, **metadata): """ Process message with all callbacks (global and per message type). """ if msg.msg_type: for callback in self._get_callbacks(msg.msg_type): try: callback(msg, **metadata) except Handler._DeadCallbackException: # The callback was an upstream iterator that has been garbage # collected. Remove it from our internal structures. self.remove_callback(callback) self._gc_dead_sinks() except SystemExit: raise except: import traceback traceback.print_exc()
[docs] def start(self): """ Start processing SBP messages with handlers. """ self._receive_thread.start()
[docs] def stop(self): """ Stop processing SBP messages. """ try: self._source.breakiter() self._receive_thread.join(0.1) except: pass
[docs] def is_alive(self): """ Return whether the processes thread is alive. """ return self._receive_thread.is_alive()
[docs] def wait(self, msg_type, timeout=1.0): """ Wait for a SBP message. Parameters ---------- msg_type : int SBP message type. timeout : float Waiting period """ event = threading.Event() payload = {'data': None} def cb(sbp_msg, **metadata): payload['data'] = sbp_msg event.set() self.add_callback(cb, msg_type) event.wait(timeout) self.remove_callback(cb, msg_type) return payload['data']
[docs] def wait_callback(self, callback, msg_type=None, timeout=1.0): """ Wait for a SBP message with a callback. Parameters ---------- callback : fn Callback function msg_type : int | iterable Message type to register callback against. Default `None` means global callback. Iterable type adds the callback to all the message types. timeout : float Waiting period """ event = threading.Event() def cb(msg, **metadata): callback(msg, **metadata) event.set() self.add_callback(cb, msg_type) event.wait(timeout) self.remove_callback(cb, msg_type)
def __call__(self, msg, **metadata): with self._write_lock: self._source(msg, **metadata) class _SBPQueueIterator(object): """ Class for upstream iterators. Implements callable interface for adding messages into the queue, and iterable interface for getting them out. """ def __init__(self, maxsize): self._queue = Queue(maxsize) self._broken = False def __iter__(self): return self def __call__(self, msg, **metadata): self._queue.put((msg, metadata), False) def breakiter(self): self._broken = True self._queue.put(None, True, 1.0) def next(self): if self._broken and self._queue.empty(): raise StopIteration m = self._queue.get(True) if self._broken and m is None: raise StopIteration return m