# Source code for sbp.client.drivers.pyserial_driver

# Copyright (C) 2015 Swift Navigation Inc.
# Contact: Mark Fine <mark@swiftnav.com>
#
# This source is subject to the license found in the file 'LICENSE' which must
# be distributed together with this source. All other rights reserved.
#
# THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.

from .base_driver import BaseDriver
import serial
import serial.tools.list_ports
# termios may be unavailable (the ImportError is handled below). When it does
# import, its error type is aliased as SerialError for use in exception
# handlers further down.
try:
    import termios
    SerialError = termios.error
except ImportError:
    # No termios: SerialError is left as None, so any code that uses it must
    # tolerate a None value.
    SerialError = None


class PySerialDriver(BaseDriver):
    """
    PySerialDriver

    The :class:`PySerialDriver` class reads SBP messages from a serial port
    using the pyserial driver.  This is mostly redundant, as the Serial
    object's read and write methods can be used directly.

    Parameters
    ----------
    port : string
      URI to port to read SBP messages from. Accepts the following types
      of URLs:

      - rfc2217://<host>:<port>[/<option>[/<option>]]
      - socket://<host>:<port>[/<option>[/<option>]]
      - loop://[<option>[/<option>]]

      and device names, such as /dev/ttyUSB0 (Linux) and COM3 (Windows). See
      http://pyserial.sourceforge.net/pyserial_api.html#urls for more details.
    baud : int
      Baud rate of serial port (defaults to 115200)
    rtscts : bool
      Value assigned to the serial handle's ``rtscts`` attribute (pyserial's
      RTS/CTS hardware flow control setting). Defaults to False.

    Raises
    ------
    SystemExit
      If the port cannot be opened or configured. A diagnostic listing of the
      detected serial devices is printed first.
    """

    def __init__(self, port, baud=115200, rtscts=False):
        try:
            handle = serial.serial_for_url(port)
            handle.baudrate = baud
            handle.timeout = 1
            handle.rtscts = rtscts
            super(PySerialDriver, self).__init__(handle)
        except (OSError, serial.SerialException) as e:
            self._report_open_failure(port, e)
            raise SystemExit

    @staticmethod
    def _report_open_failure(port, error):
        """Print why `port` could not be opened and list detected ports."""
        # Single-argument print(...) calls produce identical output on
        # Python 2 (print statement) and Python 3 (print function).
        print("")
        print("Error opening serial device '%s':" % port)
        print(error)
        print("")
        print("The following serial devices were detected:")
        print("")
        for (name, desc, _) in serial.tools.list_ports.comports():
            # Entries whose description starts with "ttyS" are left out of
            # the listing.
            if desc[0:4] == "ttyS":
                continue
            if name == desc:
                print("\t%s" % name)
            else:
                print("\t%s (%s)" % (name, desc))
        print("")

    @staticmethod
    def _report_disconnect():
        """Print the notice emitted when the serial link fails."""
        print("")
        print("Piksi disconnected")
        print("")

    def read(self, size):
        """
        Read wrapper.

        Parameters
        ----------
        size : int
          Number of bytes to read.

        Returns
        -------
        Whatever the underlying handle's ``read`` returns.

        Raises
        ------
        IOError
          If the underlying handle raises OSError or serial.SerialException.
        """
        try:
            return self.handle.read(size)
        except (OSError, serial.SerialException):
            self._report_disconnect()
            raise IOError

    def write(self, s):
        """
        Write wrapper.

        Parameters
        ----------
        s : bytes
          Bytes to write

        Returns
        -------
        Whatever the underlying handle's ``write`` returns, or 0 if the write
        timed out.

        Raises
        ------
        IOError
          If the underlying handle raises OSError or a non-timeout
          serial.SerialException.
        """
        try:
            return self.handle.write(s)
        except serial.SerialTimeoutException:
            # Must precede the SerialException handler: the timeout class is
            # a subclass of it.  Matching on the class (rather than comparing
            # against the serial.writeTimeoutError instance) is valid in an
            # except clause on Python 3 and does not depend on that
            # module-level instance being exported.
            print("sbp pyserial_driver: writeTimeoutError")
            return 0
        except (OSError, serial.SerialException):
            self._report_disconnect()
            raise IOError

    def __enter__(self):
        """Flush the driver and return it for use in a ``with`` block."""
        self.flush()
        return self

    def __exit__(self, *args):
        """Flush and close the driver, ignoring serial/OS errors."""
        # SerialError is None when termios could not be imported; None is not
        # a valid entry in an except clause on Python 3, so add it only when
        # it is a real exception type.
        ignorable = (OSError, serial.SerialException)
        if SerialError is not None:
            ignorable += (SerialError,)
        try:
            self.flush()
            self.close()
        except ignorable:
            # Best-effort cleanup: the port may already be gone.
            pass