Source code for sbp.client.loggers.base_logger

# Copyright (C) 2015 Swift Navigation Inc.
# Contact: Mark Fine <mark@swiftnav.com>
#
# This source is subject to the license found in the file 'LICENSE' which must
# be be distributed together with this source. All other rights reserved.
#
# THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.

from ...table import dispatch
from construct.core import ConstructError
import warnings


[docs]class BaseLogger(object): """ BaseLogger The :class:`BaseLogger` provides log helper functions and context management. Parameters ---------- handle : filehandle File to log to. """ def __init__(self, handle, tags={}, dispatcher=None): self.handle = handle self.dispatcher = dispatcher self.tags = tags def __enter__(self): return self def __exit__(self, *args): self.flush() self.close()
[docs] def flush(self): self.handle.flush()
[docs] def close(self): self.handle.close()
[docs] def dispatch(self, msg): if self.dispatcher is None: return msg try: data = self.dispatcher(msg) except KeyError: warn = "Unknown message type" warnings.warn(warn, RuntimeWarning) data = msg except ConstructError: warn = "Bad message parsing" warnings.warn(warn, RuntimeWarning) data = msg return data
[docs]class LogIterator(object): """ LogIterator The :class: `LogIterator` provides an abstract interface for reading serialized logs of SBP data. Parameters ---------- handle : filehandle File to read SBP messages from. """ def __init__(self, handle, dispatcher=dispatch): self.handle = handle self.dispatcher = dispatcher def __enter__(self): return self def __exit__(self, *args): self.flush() self.close() def __iter__(self): return self
[docs] def flush(self): self.handle.flush()
[docs] def close(self): self.handle.close()
[docs] def dispatch(self, msg, line=None): try: data = self.dispatcher(msg) except KeyError: warn = "Unknown message type" if line is not None: warn += " for line %s" % line warnings.warn(warn, RuntimeWarning) data = msg except ConstructError: warn = "Bad message parsing" if line is not None: warn += " for line %s" % line warnings.warn(warn, RuntimeWarning) data = msg return data
[docs] def next(self): """Return the next record tuple from the log file. If an unknown SBP message type is found, it'll return the raw SBP. If EOF, throws exception and then returns to start of file. Returns ------- (float, float, object) (elapsed msec since beginning of log, UTC timestamp, msg) """ raise NotImplementedError("next() not implemented!")