Source code for sbp.client.loggers.json_logger

# Copyright (C) 2015 Swift Navigation Inc.
# Contact: Mark Fine <mark@swiftnav.com>
#
# This source is subject to the license found in the file 'LICENSE' which must
# be be distributed together with this source. All other rights reserved.
#
# THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.

from ...msg import SBP
from ...table import dispatch
from .base_logger import BaseLogger, LogIterator
import base64
import json
import warnings


[docs]class JSONLogger(BaseLogger): """ JSONLogger The :class:`JSONLogger` logs JSON records. """
[docs] def fmt_msg(self, data, **metadata): metadata.update(self.tags) metadata['data'] = data return metadata
[docs] def dump(self, msg, **metadata): try: data = self.dispatch(msg).to_json_dict() return json.dumps(self.fmt_msg(data, **metadata), allow_nan=False) except (ValueError, UnicodeDecodeError): try: warn = "Bad values in JSON encoding for msg_type %d for msg %s" \ % (msg.msg_type, msg) warnings.warn(warn, RuntimeWarning) return json.dumps(self.fmt_msg(msg.to_json_dict(), **metadata)) except (ValueError, UnicodeDecodeError): return None
def __call__(self, msg, **metadata): output = self.dump(msg, **metadata) if output: self.handle.write(output + "\n")
[docs]class JSONBinLogger(BaseLogger): """ JSONBinLogger The :class:`JSONLogger` logs JSON records without expanding the fields. """
[docs] def fmt_msg(self, data, **metadata): metadata.update(self.tags) metadata['data'] = data return metadata
[docs] def dump(self, msg, **metadata): try: data = { 'preamble': msg.preamble, 'msg_type': msg.msg_type, 'sender': msg.sender, 'length': msg.length, 'payload': base64.standard_b64encode(msg.payload), 'crc': msg.crc } return json.dumps(self.fmt_msg(data, **metadata), allow_nan=False) except (ValueError, UnicodeDecodeError): try: warn = "Bad values in JSON encoding for msg_type %d for msg %s" \ % (msg.msg_type, msg) warnings.warn(warn, RuntimeWarning) return json.dumps(self.fmt_msg(msg.to_json_dict(), **metadata)) except (ValueError, UnicodeDecodeError): return None
def __call__(self, msg, **metadata): output = self.dump(msg, **metadata) if output: self.handle.write(output + "\n")
[docs]class JSONLogIterator(LogIterator): """ JSONLogIterator The :class:`JSONLogIterator` is an iterator for reading JSON logs of SBP data. Parameters ---------- filename : string Path to file to read SBP messages from. """
[docs] def next(self): """ Return the next record tuple from log file containing JSON-serialized SBP. If an unknown SBP message type is found, it'll return the raw SBP. If EOF, throws exception and then returns to start of file. Returns ------- Tuple(sbp MSG object, {'time':'ISO 8601 time'}) Second item is for metadata. There used to be multiple fields and there could be more in the future. Notes ----- In practice it seems like a lot of times JSONLogIterator objects return iterators instead of tuples due to weird usage of the class. """ for line in self.handle: try: data = json.loads(line) item = SBP.from_json_dict(data.pop('data', data)) msg = self.dispatch(item, line) yield (msg, data) except (ValueError, UnicodeDecodeError): warn = "Bad JSON decoding for line %s" % line warnings.warn(warn, RuntimeWarning) self.handle.seek(0, 0) raise StopIteration