# Source code for PyLabware.connections

"""PyLabware connection adapters."""

import logging
import socket
import sys
import threading
from abc import ABC, abstractmethod
from time import sleep, time
from urllib.parse import urljoin
from typing import Any, Dict

import requests
import serial

from .exceptions import PLConnectionError, PLConnectionProtocolError, PLConnectionTimeoutError
from .models import LabDeviceReply, ConnectionParameters


class AbstractConnection(ABC):
    """Abstract base that every connection adapter derives from.

    It merges user-supplied settings with the defaults below and creates the
    state shared by all adapters: logger, connection handle, lock, data-ready
    flag and reply buffer.
    """

    DEFAULT_CONNECTION_PARAMETERS = {
        # Device address; there is no sensible default.
        "address": None,
        # Device port; there is no sensible default.
        "port": None,
        # Encoding used for text exchanged with the device.
        "encoding": "UTF-8",
        # Minimum pause between two consecutive commands, in seconds.
        # It acts as a crude flow control that gives the device time to
        # process one command before the next one arrives. High-level
        # transports such as HTTP get that pause for free from network stack
        # latency; low-level ones (RS-232, RS-485) need it whenever hardware
        # flow control is not implemented.
        "command_delay": 0.5,
        # Size, in bytes, of a single read from the connection.
        # Serial communication with devices is usually reply based rather than
        # stream based, but not every device terminates its replies properly,
        # so end-of-message detection is unreliable at transport level. The
        # connection therefore behaves as a stream: it reads fixed-size chunks
        # quickly and hands them to the upper level (controller) for assembly.
        "receive_buffer_size": 128,
        # Receive timeout, seconds.
        "receive_timeout": 1,
        # Transmit timeout, seconds.
        "transmit_timeout": 1,
        # Pause between two polls of the connection listener loop, seconds.
        # TODO maybe this can be replaced by a multiple of receive_timeout
        "receiving_interval": 0.05
    }  # type: ConnectionParameters

    @abstractmethod
    def __init__(self, connection_parameters: ConnectionParameters):
        """Sets up the state common to all connection adapters.

        Args:
            connection_parameters: Dictionary with connection settings
                                   relevant for the concrete type of
                                   connection. Its entries override
                                   DEFAULT_CONNECTION_PARAMETERS.
        """

        # Per-class logger, named <module>.<ClassName>
        logger_name = f"{__name__}.{self.__class__.__name__}"
        self.logger = logging.getLogger(logger_name)

        # Provided settings take precedence over the defaults
        merged = {**self.DEFAULT_CONNECTION_PARAMETERS, **connection_parameters}
        self.connection_parameters = merged
        self.logger.debug("Creating connection object with the following settings: \n%s", merged)

        # The actual connection object is created later, in open_connection()
        self._connection: Any = None
        # Serializes access to the connection object between threads
        self._connection_lock = threading.Lock()
        # Raised once a reply is available in _last_reply
        self._data_ready = threading.Event()
        self._data_ready.clear()
        # Most recent reply received from the device
        self._last_reply = ""
        # Timestamp of the most recent command sent to the device
        self._last_command_time = time()

        # Expose the settings every adapter needs as plain attributes
        for key in ("encoding", "receive_buffer_size", "command_delay",
                    "transmit_timeout", "receive_timeout"):
            setattr(self, key, merged[key])

    @abstractmethod
    def open_connection(self):
        """Opens the connection.

        Implementations should create the self._connection object here, set
        up its parameters and open it. Creating the connection object in
        __init__() instead might make the instance impossible to reuse after
        a close()-open() sequence.

        This method has to be redefined in child classes.
        """

    @abstractmethod
    def close_connection(self):
        """Closes the connection.

        This method has to be redefined in child classes.
        """

    @abstractmethod
    def is_connection_open(self):
        """Tells whether the connection is open.

        This method has to be redefined in child classes.

        Returns:
            (bool): True if connection is open, False if connection is closed.
        """

    @abstractmethod
    def transmit(self, msg: str):
        """Sends data to the device.

        This method has to be redefined in child classes.

        Arguments:
            msg (str): Data to send.
        """

    @abstractmethod
    def receive(self):
        """Reads data from the device.

        This method has to be redefined in child classes.

        Returns:
            (str): Data from the device.
        """

    def _clear_data_buffer(self):
        """Debug helper that drops data accidentally left in the buffer
        between closing and re-opening the connection.
        """

        self._last_reply = ""
        self._data_ready.clear()
class SerialConnection(AbstractConnection):
    """Serial connection adapter.

    Replies are collected by a background listener thread that polls the
    port, stores what it reads in self._last_reply and raises the
    self._data_ready flag; receive() hands that buffer to the caller.
    """

    SERIAL_DEFAULT_CONNECTION_PARAMETERS = {
        # NOTE: open_connection() takes the port write timeout from
        # transmit_timeout; this entry is not read anywhere in this class.
        "write_timeout": 0.5,
        "baudrate": 9600,
        "bytesize": serial.EIGHTBITS,
        "parity": serial.PARITY_NONE,
        "stopbits": serial.STOPBITS_ONE,
        "xonxoff": False,
        "rtscts": False,
        "dsrdtr": False,
        "inter_byte_timeout": False,
    }  # type: ConnectionParameters

    def __init__(self, connection_parameters: ConnectionParameters):
        """
        Args:
            connection_parameters: Dictionary with connection settings
                                   relevant for the serial connection.
        """

        # Init base class, set common parameters
        config = {**self.SERIAL_DEFAULT_CONNECTION_PARAMETERS, **connection_parameters}
        super().__init__(config)

        # Interval in seconds for connection listener to check for incoming data.
        # This is the time that connection listener sleeps between serial port
        # read attempts, so it determines the maximum delay between a reply
        # being received and that reply being read out.
        self.receiving_interval = self.connection_parameters.get("receiving_interval")

        # Connection listener thread settings
        self.listener = None
        self._connection_close_requested = threading.Event()
        self._connection_close_requested.clear()

    def open_connection(self):
        """Creates, sets up and opens serial connection, then starts the
        listener thread.

        Raises:
            PLConnectionError: If the serial port can't be opened.
        """

        if self.is_connection_open():
            self.logger.warning("Connection already open.")
            return

        # Create serial connection object
        # port=None is required to prevent port from being immediately opened
        self._connection = serial.Serial(port=None)
        self._clear_data_buffer()

        # Load settings
        self._connection.port = self.connection_parameters.get("port")
        self._connection.baudrate = self.connection_parameters.get("baudrate")
        self._connection.bytesize = self.connection_parameters.get("bytesize")
        self._connection.parity = self.connection_parameters.get("parity")
        self._connection.stopbits = self.connection_parameters.get("stopbits")
        self._connection.timeout = self.connection_parameters.get("timeout", self.receive_timeout)
        self._connection.xonxoff = self.connection_parameters.get("xonxoff")
        self._connection.rtscts = self.connection_parameters.get("rtscts")
        self._connection.dsrdtr = self.connection_parameters.get("dsrdtr")
        self._connection.write_timeout = self.transmit_timeout
        self._connection.inter_byte_timeout = self.connection_parameters.get("inter_byte_timeout")

        # Open connection
        try:
            self._connection.open()
        except serial.SerialException as e:
            raise PLConnectionError(f"Can't open serial port {self._connection.port}!") from e

        # Start connection listener
        self.listener = threading.Thread(target=self.connection_listener,
                                         name="{}_listener".format(__name__),
                                         daemon=True)
        self._connection_close_requested.clear()
        self.listener.start()
        self.logger.info("Port %s opened.", self._connection.port)

    def connection_listener(self):
        """Periodically checks for new data on the connection, reads it, puts
        the data read into receive buffer and raises data ready flag.

        Runs in the listener thread until close_connection() sets the
        close-requested flag.
        """

        self.logger.info("Starting connection listener...")
        while True:
            # Check connection close request
            if self._connection_close_requested.is_set():
                self.logger.info("Connection listener exiting.")
                return
            if self._connection.in_waiting > 0:
                # If the flag is still set it means receive() hasn't yet read it out
                if self._data_ready.is_set() is True:
                    self.logger.warning("Discarding unconsumed device reply <%r>", self._last_reply)
                self._last_reply = ""
                # Read data from connection into buffer
                while self._connection.in_waiting > 0 and len(self._last_reply) <= self.receive_buffer_size:
                    self.logger.debug("connection_listener()::<%s> bytes to read", self._connection.in_waiting)
                    # Lock connection only for the duration of the read
                    with self._connection_lock:
                        reply_bytes = self._connection.read(size=self.receive_buffer_size)
                    self.logger.debug("connection_listener()::got reply <%s>", reply_bytes)
                    try:
                        self._last_reply += reply_bytes.decode(self.encoding)
                    except UnicodeDecodeError:
                        self.logger.exception("Can't decode device reply!", exc_info=True)
                        # Stop reading; the undecodable chunk is not added to the buffer
                        break
                # Notify main thread that it can access _last_reply now
                self._data_ready.set()
            # Switch thread context to main
            sleep(self.receiving_interval)

    def close_connection(self):
        """Stops the listener thread and closes serial connection.
        """

        if self.is_connection_open():
            # Stop connection listener
            self._connection_close_requested.set()
            self.listener.join(timeout=self.receiving_interval * 5)
            self._connection.reset_input_buffer()
            self._connection.reset_output_buffer()
            self._connection.close()
            self.logger.info("Port %s closed.", self._connection.port)
        else:
            self.logger.warning("Trying to close connection that is not open")

    def is_connection_open(self) -> bool:
        """Checks whether the serial port is opened.

        Logs a warning when the port is open but the listener thread is not
        running.

        Returns:
            (bool): If the serial port is open.
        """

        # No connection object means open_connection() was never called
        if self._connection is None:
            return False
        is_open = self._connection.is_open
        # Check & warn if connection is open, but connection listener thread is not running
        if is_open and (self.listener is None or not self.listener.is_alive()):
            self.logger.warning("Connection listener thread seems to be dead!")
        return is_open

    def transmit(self, msg: str):
        """Sends message to the serial port.

        Waits first if less than command_delay seconds have passed since the
        previous command. Input and output buffers of the port are reset
        right before the command is written.

        Args:
            msg (str): Command to send.

        Raises:
            PLConnectionError: If the connection is not open.
            PLConnectionProtocolError: If the command can't be encoded with
                                       the configured encoding.
        """

        # Check if connection is alive
        if not self.is_connection_open():
            raise PLConnectionError("No connection to the device!")
        try:
            command = msg.encode(self.encoding)
        # str.encode() reports unencodable characters with UnicodeEncodeError
        except UnicodeError:
            raise PLConnectionProtocolError("Can't encode command <{}> to a byte-string!".format(msg)) from None
        # Calculate if we have to wait after the previous command was sent
        delta = time() - self._last_command_time
        if delta < self.command_delay:
            remaining = self.command_delay - delta
            self.logger.debug("Command rate too high! Delaying next command for %s seconds", remaining)
            sleep(remaining)
        if self._data_ready.is_set() is True:
            self.logger.warning("Previous reply <%s> has not been read.", self._last_reply)
        with self._connection_lock:
            self._connection.reset_input_buffer()
            self._connection.reset_output_buffer()
            self._connection.write(command)
            self._last_command_time = time()
        self.logger.debug("transmit()::sent command <%s>", command)

    def receive(self, retries: int = 3):
        """Gets the data from receive buffer, clears the data ready flag and
        passes the data back.

        Args:
            retries (int): Number of additional waits of receive_timeout
                           seconds each before giving up.

        Returns:
            (LabDeviceReply): Reply from the device packed into LabDeviceReply object.

        Raises:
            PLConnectionTimeoutError: If no reply became available in time.
        """

        if not self._data_ready.is_set():
            self.logger.debug("receive()::waiting for incoming data to become ready...")
        failed_attempts = 0
        # Timeout here ensures upper level code wouldn't lock forever if no reply is received
        # It has to be long enough to give time for connection_listener() thread to do its job
        while self._data_ready.wait(timeout=self.receive_timeout) is False:
            failed_attempts += 1
            if failed_attempts > retries:
                # No reply after timeout
                raise PLConnectionTimeoutError("No reply received from the device!")
        # Unset ready flag
        self._data_ready.clear()
        return LabDeviceReply(body=self._last_reply, content_type="chunked")
class TCPIPConnection(AbstractConnection):
    """Socket-based TCP/IP connection adapter.

    Replies are collected by a background listener thread that reads the
    socket, stores what it reads in self._last_reply and raises the
    self._data_ready flag; receive() hands that buffer to the caller.
    """

    TCPIP_DEFAULT_CONNECTION_PARAMETERS = {
        "protocol": "TCP",
    }  # type: ConnectionParameters

    def __init__(self, connection_parameters: ConnectionParameters):
        """Default constructor.

        Args:
            connection_parameters: Dictionary with connection settings
                                   for socket-based connection.
        """

        # Init base class, set common parameters
        config = {**self.TCPIP_DEFAULT_CONNECTION_PARAMETERS, **connection_parameters}
        super().__init__(config)

        # Default IP connection settings
        self.address = self.connection_parameters.get("address")
        self.port = self.connection_parameters.get("port")
        self.protocol = self.connection_parameters.get("protocol").upper()

        # Connection listener thread settings
        self.listener = None
        self._connection_close_requested = threading.Event()
        self._connection_close_requested.clear()

        # Interval in seconds for connection listener to check for incoming data.
        # This is the time that connection listener sleeps between socket read
        # attempts, so it determines the maximum delay between a reply being
        # received and that reply being read out.
        self.receiving_interval = self.connection_parameters.get("receiving_interval")

    def _release_socket(self):
        """Closes and forgets a socket whose setup has failed."""

        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def open_connection(self):
        """Creates, sets up and opens the socket, then starts the listener
        thread.

        Raises:
            PLConnectionProtocolError: If the configured protocol is neither
                                       TCP nor UDP.
            PLConnectionTimeoutError: If the remote host doesn't respond.
            PLConnectionError: If the socket can't be opened.
        """

        if self.is_connection_open():
            self.logger.warning("Connection already open.")
            return
        try:
            # Initialize corresponding socket connection object
            if self.protocol == "TCP":
                self._connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            elif self.protocol == "UDP":
                self._connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            else:
                raise PLConnectionProtocolError("Unknown transport layer protocol <{}> provided!".format(self.protocol))
            self._clear_data_buffer()
            self._connection.connect((self.address, int(self.port)))
            # Receive timeout in seconds.
            # This has to be done after opening the connection to avoid
            # connect() error on non-blocking socket, but before starting
            # connection listener which uses this timeout for socket recv().
            # Zero sets the socket to non-blocking mode. Non-blocking mode does
            # not provide any performance benefit compared to blocking socket
            # with small delay, but it makes it to raise platform-specific
            # exceptions which might be trickier to catch.
            self._connection.settimeout(self.receive_timeout)
            self._connection_close_requested.clear()
            self.listener = threading.Thread(target=self.connection_listener,
                                             name="{}_listener".format(__name__),
                                             daemon=True)
            self.listener.start()
            self.logger.info("Opened connection to <%s:%s>", self.address, self.port)
        except TimeoutError as e:
            # Don't leave a half-initialized socket behind
            self._release_socket()
            raise PLConnectionTimeoutError(f"Remote host {self.address}:{self.port} doesn't respond!") from e
        except (OSError, TypeError) as e:
            # Don't leave a half-initialized socket behind
            self._release_socket()
            raise PLConnectionError(f"Can't open {self.protocol} socket for {self.address}:{self.port}!") from e

    def connection_listener(self):
        """Periodically checks for new data on the connection, reads it, puts
        it into receive buffer and raises data ready flag.

        Runs in the listener thread until close_connection() sets the
        close-requested flag.

        Raises:
            PLConnectionError: If the connection gets aborted while reading.
        """

        self.logger.info("Starting connection listener...")
        while True:
            # Check connection close request
            if self._connection_close_requested.is_set():
                # Clear the flag and exit
                self._connection_close_requested.clear()
                self.logger.info("Connection listener exiting.")
                return
            # Lock connection object
            with self._connection_lock:
                # Try read from socket
                try:
                    # This either gives data back or timeouts after
                    # self.receive_timeout seconds
                    chunk = self._connection.recv(self.receive_buffer_size)
                    # If there's any data, clear buffer
                    if chunk:
                        # If the flag is still set it means receive()
                        # hasn't yet read it out
                        if self._data_ready.is_set() is True:
                            self.logger.warning("Discarding unconsumed device reply <%r>", self._last_reply)
                        self._last_reply = ""
                        try:
                            while chunk:
                                self.logger.debug("connection_listener()::decoding chunk <%s>", chunk)
                                self._last_reply += chunk.decode(self.encoding)
                                chunk = self._connection.recv(self.receive_buffer_size)
                        # socket.timeout is raised for blocking sockets after timeout
                        # BlockingIOError is raised for non-blocking sockets
                        # both on Windows and Linux
                        except (socket.timeout, BlockingIOError):
                            # Finished reading data from the socket
                            self._data_ready.set()
                        except UnicodeDecodeError:
                            self.logger.exception("Can't decode packet <%s>!", chunk, exc_info=True)
                        else:
                            # recv() returned an empty chunk, so no more data
                            # is coming; hand over what has been collected
                            # instead of leaving receive() to time out.
                            self._data_ready.set()
                except (socket.timeout, BlockingIOError):
                    # No data in socket
                    pass
                except ConnectionAbortedError:
                    raise PLConnectionError("Device disconnected!") from None
            # Release the lock and sleep
            sleep(self.receiving_interval)

    def close_connection(self):
        """Stops the listener thread and closes connection.
        """

        if self.is_connection_open():
            self._connection_close_requested.set()
            # There are two blocking calls in the listener loop
            # receive_timeout determines for how long it blocks on the socket to read
            # receiving_interval determines for how long it sleeps between socket reads
            self.listener.join(timeout=max(self.receiving_interval, self.receive_timeout) * 5)
            self._connection.close()
            self.logger.info("Connection to <%s:%s> closed.", self.address, self.port)
        else:
            self.logger.warning("Trying to close socket that is not open.")

    def is_connection_open(self):
        """Checks whether the socket is open.

        Returns:
            (bool): If the socket is open.

        Raises:
            NotImplementedError: On any platform other than Windows, once a
                                 socket object exists.
        """

        if self._connection is None:
            return False
        # There's no general way in Python to check whether the socket is active.
        # This code was checked to work on Windows
        # It tries to get the connection state by checking
        # socket._closed attribute & probing the socket with a one-byte read
        if not sys.platform.startswith("win"):
            raise NotImplementedError("This code was tested on Windows only!")
        if getattr(self._connection, "_closed", False) is True:
            # Socket was opened before, but now is closed
            return False
        # Otherwise socket has either been just created & connection not opened
        # or socket.connect() has been called, but connection is dead
        try:
            # MSG_PEEK leaves the probed byte in the socket buffer, so the
            # check doesn't eat the first byte of a pending device reply.
            self._connection.recv(1, socket.MSG_PEEK)
        except socket.timeout:
            return True
        except OSError:
            return False
        return True

    def transmit(self, msg: str):
        """Sends message to the socket.

        Waits first if less than command_delay seconds have passed since the
        previous command.

        Args:
            msg (str): Command to send.

        Raises:
            PLConnectionError: If the connection is not open.
            PLConnectionProtocolError: If the command can't be encoded with
                                       the configured encoding.
        """

        # Check if connection is alive
        if not self.is_connection_open():
            raise PLConnectionError("No connection to the device!")
        try:
            command = msg.encode(self.encoding)
        # str.encode() reports unencodable characters with UnicodeEncodeError
        except UnicodeError:
            raise PLConnectionProtocolError("Can't encode command <{}> to a byte-string!".format(msg)) from None
        # Calculate if we have to wait after the previous command was sent
        delta = time() - self._last_command_time
        if delta < self.command_delay:
            remaining = self.command_delay - delta
            self.logger.debug("Command rate too high! Delaying next command for <%s> seconds", remaining)
            sleep(remaining)
        if self._data_ready.is_set() is True:
            self.logger.warning("Previous reply <%r> has not been read.", self._last_reply)
        with self._connection_lock:
            # sendall() keeps writing until the whole command is out;
            # send() may transmit only a part of it.
            self._connection.sendall(command)
            self._last_command_time = time()
        self.logger.debug("transmit()::sent command <%s>", command)

    def receive(self, retries: int = 3):
        """Gets the data from receive buffer, clears the data ready flag and
        passes the data back.

        Args:
            retries (int): Number of times to retry if receive times out.
                           Introduced due to RCTDigitalHotplate test showing
                           that around 0.02 % of attempted `get_temperature`
                           calls timeout. These are spaced out and not all
                           happening at once so 1 retry should be sufficient
                           to protect against these anomalies and prevent
                           unnecessary crashes.

        Returns:
            (LabDeviceReply): Reply from the device packed into LabDeviceReply object.

        Raises:
            PLConnectionTimeoutError: If no reply became available in time.
        """

        if not self._data_ready.is_set():
            self.logger.debug("receive()::waiting for incoming data to become ready...")
        failed_attempts = 0
        # Timeout here ensures upper level code wouldn't block forever
        # if no reply is received. It has to be long enough to give time
        # for connection_listener() thread to do its job
        while self._data_ready.wait(timeout=self.receive_timeout * 50) is False:
            failed_attempts += 1
            if failed_attempts > retries:
                # No reply after timeout
                raise PLConnectionTimeoutError("No reply received from the device!")
        # Unset data ready flag
        self._data_ready.clear()
        return LabDeviceReply(body=self._last_reply, content_type="chunked")
class HTTPConnection(AbstractConnection):
    """HTTP REST connection adapter based on Requests library.

    transmit() performs the request and stores the response; receive()
    hands the stored response back to the caller.
    """

    HTTP_DEFAULT_CONNECTION_PARAMETERS = {
        "user": None,
        "password": None,
        "schema": "http",
        "verify_ssl": True,
        "headers": ""
    }  # type: ConnectionParameters

    def __init__(self, connection_parameters: ConnectionParameters):
        """
        Args:
            connection_parameters (Dict): Dictionary with connection
                                          settings for HTTP REST connection.

        Raises:
            PLConnectionError: If no address is provided.
        """

        # Init base class, set common parameters
        config = {**self.HTTP_DEFAULT_CONNECTION_PARAMETERS, **connection_parameters}
        super().__init__(config)

        # Reply for HTTP response
        self._last_reply = ""
        self._last_reply_headers = {}  # type: ignore

        # Default HTTP connection settings
        self.user = self.connection_parameters["user"]
        self.password = self.connection_parameters["password"]
        self.schema = self.connection_parameters["schema"]
        self.verify_ssl = self.connection_parameters["verify_ssl"]
        self.headers = self.connection_parameters["headers"]

        # Address settings. Both values come from the merged parameters, so
        # a missing "port" falls back to the default (None) instead of
        # raising KeyError, and a None port is left out of the URL.
        address = self.connection_parameters["address"]
        if address is None:
            raise PLConnectionError("No address provided for HTTP connection!")
        port = self.connection_parameters["port"]
        self.address = address if port is None else address + ":" + str(port)
        # Append / to host address
        if not self.address.endswith("/"):
            self.address += "/"
        # Strip :// from schema, if present
        self.schema = self.schema.strip("/")
        self.schema = self.schema.strip(":")
        # Make base URL
        self.base_url = self.schema + "://" + self.address
        self.logger.debug("HTTPConnection.__init__()::constructed base URL <%s>", self.base_url)

    def open_connection(self):
        """Creates requests.Session() object & sets its parameters.
        """

        self._connection = requests.Session()
        if self.user is not None:
            self._connection.auth = (self.user, self.password)
        self._connection.verify = self.verify_ssl
        # NOTE(review): this replaces the session's default headers, and the
        # default value here is an empty string rather than a mapping -
        # confirm that this is intended.
        self._connection.headers = self.headers
        self.logger.info("Session to <%s> initialized.", self.base_url)

    def close_connection(self):
        """Closes the session, if one has been created.
        """

        if self._connection is None:
            self.logger.warning("Trying to close connection that is not open")
            return
        self._connection.close()
        self.logger.info("Session to <%s> closed.", self.base_url)

    def transmit(self, message: Dict):  # type: ignore
        """Sends request to the server and stores the response for receive().

        Args:
            message: Dictionary containing method, endpoint and request data
                     under the keys "method", "endpoint" and "data". "data"
                     may be omitted.

        Raises:
            PLConnectionError: If open_connection() has not been called.
            PLConnectionTimeoutError: If the request times out.
            PLConnectionProtocolError: If the server replies with an HTTP
                                       error code (400 or above).
        """

        if self._connection is None:
            raise PLConnectionError("No connection to the device!")
        data = message.get("data")
        # Make complete URL from base API URL and endpoint
        url = urljoin(self.base_url, message["endpoint"])
        self.logger.debug("transmit()::trying to invoke <%s> with method <%s>, data=<%s>",
                          url, message["method"], data)
        try:
            reply = self._connection.request(method=message["method"].upper(),
                                             url=url,
                                             data=data,
                                             timeout=self.transmit_timeout)
        except requests.exceptions.Timeout as e:
            raise PLConnectionTimeoutError("Can't connect to host!") from e
        # Log headers
        self.logger.debug("transmit()::reply headers::<%r>", reply.headers)
        # Check HTTP reply code; 400 itself is an error too
        if reply.status_code >= 400:
            raise PLConnectionProtocolError(f"Server replied with HTTP code {reply.status_code} ({reply.text})")
        self._last_reply = reply.content
        self._last_reply_headers = dict(reply.headers)

    def receive(self):
        """Passes the request response back.

        The stored response is decoded with the configured encoding the
        first time it is read; reading it again returns the same text.

        Returns:
            (LabDeviceReply): HTTP reply from the device packed into
                              LabDeviceReply object together with reply headers.

        Raises:
            PLConnectionProtocolError: If the response can't be decoded.
        """

        # The buffer holds bytes right after transmit(), but text before the
        # first request and after a previous receive() - only bytes need decoding.
        if isinstance(self._last_reply, (bytes, bytearray)):
            try:
                self._last_reply = self._last_reply.decode(self.encoding)
            except UnicodeDecodeError as e:
                raise PLConnectionProtocolError(f"Can't decode device reply!\n {self._last_reply}") from e
        return LabDeviceReply(body=self._last_reply, parameters=self._last_reply_headers, content_type="json")

    def is_connection_open(self):
        """Dummy method as REST is stateless.

        Returns:
            (bool): Always True.
        """

        return True