Source code for PyLabware.controllers

"""PyLabware device controllers."""

import copy
import logging
import threading
from abc import abstractmethod, ABC
from functools import wraps
import sys
import queue
from time import sleep
from typing import Optional, Union, Callable, Any, List, Dict, Tuple

from .connections import (HTTPConnection, SerialConnection, TCPIPConnection)
from .exceptions import (PLConnectionError, PLConnectionTimeoutError, PLDeviceError, PLDeviceCommandError, PLDeviceReplyError)
from .models import (AbstractLabDevice, ConnectionParameters)
from . import parsers as parser


def in_simulation_device_returns(value):
    """Decorator that patches the device send() method to return the value passed.

    The patch is only applied when the object owning the decorated method
    (the first positional argument, i.e. ``self``) has its ``simulation``
    attribute set to ``True``. Otherwise the decorated method is called as is.

    Args:
        value: Value the patched send() should return. A string containing
            ``{$args[<number>]}`` is treated as a placeholder: the patched
            send() then returns the positional argument number ``<number>``
            of the decorated method call (``self`` being number 0).
    """

    def wrapper(func):
        @wraps(func)
        def wrapper_inner(*args, **kwargs):
            # Extract object to patch from self argument
            # Self is always passed first
            slf = args[0]

            # No simulation - return original function
            if getattr(slf, "simulation", False) is not True:
                return func(*args, **kwargs)

            # Make a copy so that the original decorator argument won't get
            # mutated between the calls.
            dec_retval = copy.copy(value)

            # In most cases the value to return is the decorator argument.
            # However, sometimes the wrapped function expects to read back one
            # of its own arguments - that is what the placeholder is for.
            if isinstance(value, str) and "{$args[" in value:
                argnum = value[value.find("[") + 1:value.find("]")]
                try:
                    # Get the actual wrapped method argument from the list
                    dec_retval = args[int(argnum)]
                except ValueError:
                    slf.logger.error("SIM:: Can't extract argument number from {$args[]}, check syntax!")
                    dec_retval = None
                except IndexError:
                    slf.logger.error("SIM:: Can't find argument number %s in arguments list <%s>!", argnum, args)
                    dec_retval = None

            # Save reference to original send()
            orig_send = slf.send
            # Replace it with lambda returning the value we want - either a
            # static decorator argument or dynamic value from the wrapped
            # function call
            slf.send = lambda *a, **k: dec_retval
            slf.logger.info("SIM :: Patched send() to return <%s>, calling <%s>", value, func.__name__)
            try:
                # Return value (if any) of the actual function - other
                # functions in the call chain may rely on it
                return func(*args, **kwargs)
            finally:
                # Restore original send() back even if the call has failed,
                # otherwise the object would stay patched forever
                slf.send = orig_send

        return wrapper_inner

    return wrapper
class LabDevice(AbstractLabDevice):
    """Base controller class for all labware devices."""

    @abstractmethod
    def __init__(self, device_name: str = None, connection_mode: str = None, connection_parameters: ConnectionParameters = None):
        """Default constructor.

        Only object initialization happens here. Device-specific hardware
        initialization belongs to the
        :py:meth:`~PyLabware.models.AbstractLabDevice.initialise_device()`
        method. This method has to be redefined in child classes.

        Args:
            device_name: Device name (for logging purposes). The class name is
                used when omitted.
            connection_mode: Physical connection mode, one of "serial",
                "tcpip" or "http" (defines the connection adapter used).
            connection_parameters: Dictionary with connection-specific
                settings. These vary depending on the connection_mode.

        Raises:
            PLDeviceError: If connection_mode is not one of the supported modes.
        """

        # Instance name falls back to the class name
        self.device_name = self.__class__.__name__ if device_name is None else device_name

        # Logger object, named <module>.<class>.<device name>
        self.logger = logging.getLogger(".".join((__name__, self.__class__.__name__, self.device_name)))

        # Simulation flag
        self._simulation = False

        # Re-entrant lock guarding access to the connection object
        self._lock = threading.RLock()

        # Keepalive/background task threads
        self._background_tasks: List[LabDeviceTask] = []

        # Protocol settings
        self.command_prefix = ""
        self.command_terminator = "\r\n"
        # Delimiter between the command and argument(s list)
        self.args_delimiter = " "
        self.reply_prefix = ""
        self.reply_terminator = "\r\n"

        if connection_parameters is None:
            connection_parameters = {}

        # Pick and instantiate the connection adapter matching the mode
        adapters = (
            ("serial", SerialConnection),
            ("tcpip", TCPIPConnection),
            ("http", HTTPConnection),
        )
        for mode, adapter in adapters:
            if connection_mode == mode:
                self.connection = adapter(connection_parameters)
                break
        else:
            raise PLDeviceError(f"Unsupported connection mode <{connection_mode}> for <{self.device_name}>")

    @property
    def simulation(self) -> bool:
        """Whether the device behaves as a real or a simulated one.

        A simulated device just logs all the commands.
        """

        return self._simulation

    @simulation.setter
    def simulation(self, sim: bool):
        """Setter for the simulation property; the value is coerced to bool."""

        self._simulation = bool(sim)
def connect(self):
    """Connects to the device.

    In simulation mode only a log record is made. For a serial connection
    without a port name set, port auto-discovery is attempted on Windows by
    probing COM1..COM254 and asking the device to identify itself through
    is_connected(). If nothing is found, the port parameter is reset to None
    and the method returns without raising.

    This method normally shouldn't be redefined in child classes.

    Raises:
        PLDeviceError: If the connection can't be opened.
    """

    if self._simulation is True:
        self.logger.info("SIM :: Opened connection.")
        return

    # Serial device auto-discovery (currently for serial connection only)
    port = self.connection.connection_parameters.get("port")
    if isinstance(self.connection, SerialConnection) and (port == "" or port is None):
        if sys.platform == "win32":
            self.logger.info("Serial port name not provided, trying autodiscovery.")
            for i in range(1, 255):
                port_name = f"COM{i}"
                # Check which ports are physically present
                try:
                    self.connection.connection_parameters["port"] = port_name
                    self.connection.open_connection()
                except PLConnectionError:
                    continue
                # Check if there is correct device on the port found
                self.logger.info("Found serial port %s, checking device...", port_name)
                # is_connected() usually checks an id string which devices
                # reply to rather fast, so the timeout is temporarily
                # decreased here to loop faster
                timeout = self.connection.receive_timeout
                self.connection.receive_timeout = 0.1
                try:
                    found = self.is_connected()
                except PLConnectionError:
                    found = False
                finally:
                    # Always put the user-defined timeout back
                    self.connection.receive_timeout = timeout
                if found:
                    self.logger.info("Device %s found on %s.", self.device_name, port_name)
                    self.logger.info("Opened connection.")
                    return
                # Wrong or silent device - release the port before moving on
                self.connection.close_connection()
                self.logger.info("Device not found.")
            self.connection.connection_parameters["port"] = None
            self.logger.info("No device found on any available serial port")
            return

    try:
        self.connection.open_connection()
    except (PLConnectionError, PLConnectionTimeoutError) as e:
        raise PLDeviceError(f"Can't connect to device <{self.__class__.__name__}.{self.device_name}>!") from e
    self.logger.info("Opened connection.")
def disconnect(self):
    """Disconnects from the device.

    Any background tasks still registered are stopped first, then the
    connection is closed. In simulation mode only a log record is made.

    This method normally shouldn't be redefined in child classes.
    """

    if self._simulation is True:
        self.logger.info("SIM :: Closed connection")
        return

    # Background tasks must not outlive the connection they use
    tasks_pending = bool(self._background_tasks)
    if tasks_pending:
        self.logger.info("Background tasks running, stopping them before disconnect.")
        self.stop_all_tasks()

    self.connection.close_connection()
    self.logger.info("Closed connection.")
def send(self, cmd, value=None):
    """Sends a command to the device and returns the reply, if one is expected.

    The command parameter (if given) is run through check_value(), then the
    message is built with prepare_message(). With the connection lock held
    the message is transmitted and, when the command definition has a truthy
    "reply" entry, _recv() is invoked to read the reply back. The lock is
    released only after that.

    This method normally shouldn't be redefined in child classes.

    Args:
        cmd: The command to send.
        value: Command parameter, if any.

    Returns:
        The result of _recv() when a reply is expected, None otherwise
        (including simulation mode).
    """

    checked = value if value is None else self.check_value(cmd, value)
    message = self.prepare_message(cmd, checked)

    if self._simulation is True:
        self.logger.info("SIM :: Pretending to send message <%r>", message)
        return None

    # Do we need to read a reply back for this command?
    wants_reply = cmd.get("reply", False)

    with self._lock:
        self.connection.transmit(message)
        self.logger.debug("Sent message <%r>", message)
        if not wants_reply:
            return None
        return self._recv(cmd)
def check_value(self, cmd: Dict, value: Any) -> Any:
    """Checks the value provided against the definitions in command dict.

    The value is first cast to ``cmd["type"]`` (if defined), then validated
    against the optional ``cmd["check"]`` entries: ``"min"``, ``"max"`` and
    ``"values"``. Each of them is optional and checked independently of the
    others.

    This method may be redefined in child classes.

    Args:
        cmd: Device command definition.
        value: Command parameter to check.

    Returns:
        (Any): Processed value.

    Raises:
        PLDeviceCommandError: If the value can't be cast to the required type,
            is outside the min/max limits or is not one of the allowed values.
    """

    # Value type casting
    # TODO think about moving type to check dictionary
    if cmd.get("type") is not None:
        try:
            value = cmd["type"](value)
            self.logger.debug("check_value()::type casted value <%s> to <%s>.", value, cmd["type"])
        # Invalid type definition
        except TypeError:
            self.logger.error("check_value()::Illegal type <%s> specification in command <%s> definition.", cmd["type"], cmd["name"])
        # Type cast error
        except ValueError as err:
            raise PLDeviceCommandError(f"Can't cast value <{value}> to type <{cmd['type']}>.") from err
    else:
        self.logger.debug("check_value()::type casting not required - skipped.")

    # Check if any checking/processing is required acc. to cmd definition
    checks = cmd.get("check", False)
    if checks is False or checks is None:
        return value

    # Min/max check. The bounds are looked up one by one, so a definition
    # with only "max" (or only "min") is still enforced.
    for bound in ("min", "max"):
        try:
            limit = checks[bound]
            violated = value < limit if bound == "min" else value > limit
        # No such bound in the definition
        except KeyError:
            self.logger.debug("check_value()::%s check not required - skipped.", bound)
            continue
        # Bound is not comparable with the value
        except TypeError:
            self.logger.error("Illegal min/max values specification in command <%s> definition!", cmd["name"])
            continue
        if violated:
            side = "below" if bound == "min" else "above"
            raise PLDeviceCommandError(f"Requested value <{value}> is {side} limit <{limit}> !")
        self.logger.debug("check_value()::%s check passed for <%s> against <%s>", bound, value, limit)

    # Allowed values check
    try:
        if value not in checks["values"]:
            raise PLDeviceCommandError(f"Requested value <{value}> not in the allowed range <{checks['values']}>.")
        self.logger.debug("check_value()::range check <%s> in range <%s>", value, checks["values"])
    # No cmd["check"]["values"]
    except KeyError:
        self.logger.debug("check_value()::range check not required - skipped.")
    except TypeError:
        self.logger.error("Illegal range specification in command <%s> definition.", cmd["name"])
    return value
def prepare_message(self, cmd: Dict, value: Any) -> str:
    """Wraps the command into a message compliant with the device protocol.

    The message is the command prefix, the command name, then - only when a
    value is given - the arguments delimiter followed by the value converted
    to string, and finally the command terminator.

    This method may be redefined in child classes.

    Args:
        cmd: Device command definition.
        value: Command parameter to set, if any.

    Returns:
        (str): Prepared command string.
    """

    payload = cmd["name"]
    if value is not None:
        payload = payload + self.args_delimiter + str(value)
    return self.command_prefix + payload + self.command_terminator
def _recv(self, cmd: Dict) -> Union[int, float, str, bool]: """Locks the connection object, reads back the reply and re-assembles it if it is chunked. Then parses the reply if necessary and passes it back. This method normally shouldn't be redefined in child classes. Args: parse: If reply parsing is required. Returns: (str): Reply from the device. """ if self.simulation is True: self.logger.info("SIM :: Received reply.") return "" with self._lock: reply = self.connection.receive() # Check if we got complete reply in case it is chunked # If not, keep reading out data until the terminator # TODO how (any) parameters should be appended to reply object? if reply.content_type == "chunked": if self.reply_terminator is not None or self.reply_terminator != "": while not reply.body.endswith(self.reply_terminator): self.logger.debug("_recv()::reply terminator not found, reading next chunk.") reply_chunk = self.connection.receive() reply.body = reply.body + reply_chunk.body else: self.logger.warning("Received chunked reply, but reply terminator is not set - reassembly not possible!") self.logger.debug("Raw reply from the device: <%r>", reply.body) # Usually, we don't expect empty replies when we are waiting for them if reply.body == "": self.logger.warning("Empty reply from device!") # Run parsing reply = self.parse_reply(cmd, reply) # Run type casting # This would work properly only between base Python types if not isinstance(reply, (int, float, str, bool)): self.logger.debug("cast_reply_type()::complex data type <%s>, skipping casting.", type(reply)) return reply return self.cast_reply_type(cmd, reply)
def parse_reply(self, cmd: Dict, reply: Any) -> Any:
    """Extracts the reply body and parses it according to command definition.

    The reply prefix and terminator are always stripped off first. Then, if
    ``cmd["reply"]["parser"]`` is defined and callable, it is called with the
    reply and the optional ``cmd["reply"]["args"]`` list.

    This method may be redefined in child classes.

    Args:
        cmd: Command definition to look the parsing workflow in.
        reply: Reply from the device.

    Returns:
        (any): Processed reply.
    """

    # reply normally has a body attribute. It doesn't if parse_reply() has
    # been redefined in the child class and this is being executed in a
    # super() call after the actual string has already been extracted.
    reply = getattr(reply, "body", reply)

    # Always strip off prefix and terminator before proceeding
    reply = parser.stripper(reply, self.reply_prefix, self.reply_terminator)

    # Get parser function. Only the lookup is guarded, so that a KeyError
    # raised inside the parser itself is not mistaken for a missing parser.
    try:
        function = cmd["reply"]["parser"]
    except KeyError:
        # No parser found in command definition
        self.logger.debug("parse_reply()::parsing not defined for command <%s> - skipped.", cmd["name"])
        return reply

    # Then get parser function arguments
    args = cmd["reply"].get("args", [])
    self.logger.debug("parse_reply()::got parser <%s>, arguments <%s>", function, args)

    # Run parsing
    if callable(function):
        reply = function(reply, *args)
        self.logger.debug("parse_reply()::parsed reply <%s>", reply)
    else:
        self.logger.error("Parsing function <%s> defined for command <%s> is not callable!", function, cmd["name"])
    return reply
def cast_reply_type(self, cmd: Dict, reply: Any) -> Union[int, float, str, bool]:
    """Casts reply type based on the type provided in command definition.

    Args:
        cmd: Command definition to look the type value in.
        reply: Reply string.

    Returns:
        (any): Reply casted to ``cmd["reply"]["type"]``, or the reply
        unchanged if no usable type definition is found.

    Raises:
        PLDeviceReplyError: If the reply can't be cast to the required type.
    """

    try:
        target = cmd["reply"]["type"]
    # No cmd["reply"]["type"] node found
    except (KeyError, TypeError):
        self.logger.debug("cast_reply_type()::no type definition found - skipped.")
        return reply

    try:
        if target is bool and reply == "0":
            # Special case - "0" string should be casted to boolean False,
            # whereas bool("0") is True as any non-empty string
            casted_reply = False
        elif target is int:
            # Special case - returned value is a string representing float
            # (e.g. "0.0") and we need to cast it to int. int("0.0") would
            # give a ValueError, so we need to convert it to float first
            casted_reply = int(float(reply))
        else:
            casted_reply = target(reply)
    # cmd["reply"]["type"] does not point to a proper data type
    except TypeError:
        self.logger.error("Illegal parse type <%s> specification in command <%s> definition.", target, cmd["name"])
        return reply
    # Type casting error
    except ValueError as err:
        raise PLDeviceReplyError(f"Can't cast reply <{reply}> to type <{target}>.") from err

    self.logger.debug("cast_reply_type()::casted reply type to %s.", target)
    return casted_reply
def wait_until_ready(self, check_ready: Optional[Callable] = None):
    """Waits till the device is ready and returns.

    This is execute_when_ready() with an action that does nothing, so the
    choice of the default readiness check is left to that method.

    Args:
        check_ready: A method to use for checking whether the device is
            ready or not.
    """

    def _do_nothing(*_ignored):
        return None

    return self.execute_when_ready(action=_do_nothing, check_ready=check_ready)
def execute_when_ready(self, action: Callable, *args, check_ready: Optional[Callable] = None, poll_interval: float = 0.5):
    """Acquires device lock, waits till device is ready and runs specified action.

    If no method is provided for checking, ``self.is_idle`` is used.

    Parameters:
        action: A method to run when the device is ready
        args: List of positional arguments for the method to run
        check_ready: A method to use for checking whether the device is
            ready or not. (keyword-only)
        poll_interval: Delay between two consecutive readiness checks, in
            seconds. (keyword-only)

    Returns:
        (any): The return value of action.
    """

    with self._lock:
        if check_ready is None:
            check_ready = self.is_idle
        while not check_ready():
            sleep(poll_interval)
        # TODO Think how to reduce the amount of repeated log messages here.
        # Maybe invert execute_when_ready() <-> wait_until_ready() logic
        # args is always a tuple (possibly empty), so it can be unpacked
        # unconditionally
        return action(*args)
def start_task(self, interval: int, method: Callable, args=None):
    """Creates and starts a background task and registers it with the device.

    Args:
        interval: How often the method should be executed, in seconds
        method: The function to run.
        args: Arguments for the function, if any.

    Returns:
        (LabDeviceTask): Created task object for further reference.
    """

    new_task = LabDeviceTask(interval, method, args)
    self.logger.info("Starting background task for <%s(%s)>", method.__name__, args)
    new_task.start()
    # Keep the reference so that the task can be stopped later on
    self._background_tasks.append(new_task)
    return new_task
def stop_task(self, task_to_stop=None):
    """Stops the LabDeviceTask object and removes the reference to it from the
    list of tasks.

    If no argument is provided and only a single task is running - stops
    that one.

    Args:
        task_to_stop: LabDeviceTask object to stop.
    """

    if task_to_stop is None:
        if not self._background_tasks:
            self.logger.error("No background tasks present, nothing to stop!")
            return
        if len(self._background_tasks) > 1:
            self.logger.error("More than one task present, don't know which to stop!")
            return
        # There's only a single task - go on with it, so that it is
        # joined and removed from the list just like an explicit one
        task_to_stop = self._background_tasks[0]

    for task in self._background_tasks:
        if task == task_to_stop:
            self.logger.info("Stopping task thread <%s>.", task.ident)
            task.stop()
            task.join(task.interval)
            # Returning right after remove() keeps the iteration safe
            self._background_tasks.remove(task)
            return
    self.logger.error("Task %s not found!", task_to_stop)
def stop_all_tasks(self):
    """Stops all registered tasks and resets the internal list of tasks.

    Every task is signalled to stop and then joined with its own interval
    as the timeout, one task after another.
    """

    for running in self._background_tasks:
        self.logger.info("Stopping background task <%s>.", running.ident)
        running.stop()
        running.join(running.interval)
    # Rebind to a fresh list rather than clearing the old one in place
    self._background_tasks = list()
def get_all_tasks(self):
    """Returns the internal list of tasks (the list object itself, not a copy)."""

    tasks = self._background_tasks
    return tasks
class LabDeviceTask(threading.Thread):
    """Simple class to implement periodically running device actions."""

    def __init__(self, interval: int, method: Callable, args=None):
        """Default constructor.

        Args:
            interval: Delay between two consecutive method calls, in seconds.
            method: The function to run.
            args: Positional arguments for the function, if any.
        """

        super().__init__()
        self.interval = interval
        self.method = method
        # Return values of the method that are not None, at most 100 of them
        self.results = queue.Queue(maxsize=100)
        # Kill all tasks upon exit
        self.daemon = True
        self.args = args if args is not None else []
        # Stop flag
        self._stop_requested = threading.Event()
        # Logger object
        self.logger = logging.getLogger(__name__ + "." + self.__class__.__name__ + "." + self.method.__name__)

    def run(self):
        """Starts task activity.

        Calls the method every interval seconds until stop() is called.
        Return values other than None are pushed to the results queue; if the
        queue is full, the value is dropped with a warning.
        """

        self.logger.info("Background task %s started. Executing <%s> command every <%s> seconds.",
                         threading.get_ident(), self.method.__name__, self.interval)
        while not self._stop_requested.is_set():
            retval = self.method(*self.args)
            if retval is not None:
                try:
                    # Must not block here: a blocking put() on a full queue
                    # never raises queue.Full and would hang the task until
                    # somebody reads the results out
                    self.results.put_nowait(retval)
                except queue.Full:
                    self.logger.warning("Can't push background task return value <%s> into the queue. The queue is full!", retval)
            # Sleeps for the interval, but wakes up at once on stop()
            self._stop_requested.wait(self.interval)
        self.logger.info("Background task <%s> exiting.", threading.get_ident())

    def stop(self):
        """Sets the stop flag to signal for the thread to exit."""

        self._stop_requested.set()
# ############## Base abstract controller classes ###############
class AbstractTemperatureController(LabDevice):
    """Any device capable of heating or cooling with temperature regulation."""

    @abstractmethod
    def start_temperature_regulation(self) -> None:
        """Starts regulating the temperature."""

    def start(self) -> None:
        """Starts the device, i.e. starts temperature regulation."""

        return self.start_temperature_regulation()

    @abstractmethod
    def stop_temperature_regulation(self) -> None:
        """Stops regulating the temperature."""

    def stop(self) -> None:
        """Stops the device, i.e. stops temperature regulation."""

        return self.stop_temperature_regulation()

    @abstractmethod
    def set_temperature(self, temperature: float, sensor: int = 0) -> None:
        """Sets desired temperature.

        Args:
            temperature (float): Temperature setpoint in °C.
            sensor (int): Specify which temperature probe the setpoint
                applies to. Default (0) is the internal probe.
        """

    @abstractmethod
    def get_temperature(self, sensor: int = 0) -> float:
        """Gets the actual temperature.

        Args:
            sensor (int): Specify which temperature probe to read.
                Default (0) is the internal probe.
        """

    @abstractmethod
    def get_temperature_setpoint(self, sensor: int = 0) -> float:
        """Gets desired temperature setpoint.

        Args:
            sensor (int): Specify which temperature probe the setpoint
                applies to. Default (0) is the internal probe.
        """
class AbstractStirringController(LabDevice):
    """Any device capable of stirring."""

    @abstractmethod
    def start_stirring(self) -> None:
        """Starts the stirring."""

    def start(self) -> None:
        """Starts the device, i.e. starts stirring."""

        return self.start_stirring()

    @abstractmethod
    def stop_stirring(self) -> None:
        """Stops the stirring."""

    def stop(self) -> None:
        """Stops the device, i.e. stops stirring."""

        return self.stop_stirring()

    @abstractmethod
    def set_speed(self, speed: int) -> None:
        """Sets desired stirring speed, in RPM."""

    @abstractmethod
    def get_speed(self) -> int:
        """Gets the actual stirring speed, in RPM."""

    @abstractmethod
    def get_speed_setpoint(self) -> int:
        """Gets desired stirring speed setpoint, in RPM."""
class AbstractPressureController(LabDevice):
    """Any device capable of regulating the pressure."""

    @abstractmethod
    def start_pressure_regulation(self) -> None:
        """Starts pressure regulation."""

    def start(self) -> None:
        """Starts the device, i.e. starts pressure regulation."""

        return self.start_pressure_regulation()

    @abstractmethod
    def stop_pressure_regulation(self) -> None:
        """Stops pressure regulation."""

    def stop(self) -> None:
        """Stops the device, i.e. stops pressure regulation."""

        return self.stop_pressure_regulation()

    @abstractmethod
    def vent_on(self) -> None:
        """Starts venting the system to atmosphere."""

    @abstractmethod
    def vent_off(self) -> None:
        """Stops venting the system to atmosphere."""

    @abstractmethod
    def set_pressure(self, pressure: float) -> None:
        """Sets desired pressure."""

    @abstractmethod
    def get_pressure(self) -> float:
        """Gets the actual pressure."""

    @abstractmethod
    def get_pressure_setpoint(self) -> float:
        """Gets desired pressure setpoint."""
class AbstractDispensingController(LabDevice):
    """Any device capable of withdrawing and dispensing the material."""

    @abstractmethod
    def set_speed(self, speed: int) -> None:
        """Sets the speed of dispensing."""

    @abstractmethod
    def get_speed(self) -> int:
        """Gets the speed of dispensing."""

    @abstractmethod
    def withdraw(self, amount: int) -> None:
        """Withdraws the given amount of material."""

    @abstractmethod
    def dispense(self, amount: int) -> None:
        """Dispenses the given amount of material."""
class AbstractDistributionController(LabDevice):
    """An N-to-M distribution device."""

    @abstractmethod
    def move_home(self) -> None:
        """Resets the device distribution machinery to its power on state."""

    @abstractmethod
    def connect_ports(self, port1: Any, port2: Any) -> None:
        """Makes connection between the provided input and output."""

    @abstractmethod
    def disconnect_ports(self, port1: Any, port2: Any) -> None:
        """Breaks connection between the provided input and output."""

    @abstractmethod
    def get_port_connections(self) -> List[Tuple]:
        """Gets a list of tuples (portX, portY) representing current connectivity."""
class AbstractDistributionValve(LabDevice):
    """A 1-to-N distribution device."""

    @abstractmethod
    def move_home(self) -> None:
        """Resets the device distribution machinery to its power on state."""

    @abstractmethod
    def set_valve_position(self, position: Any) -> None:
        """Connects the chosen distribution valve output to its input."""

    @abstractmethod
    def get_valve_position(self) -> Any:
        """Gets the currently selected distribution valve output."""
class AbstractBalance(LabDevice):
    """Any device capable of typical weighing operations."""

    @abstractmethod
    def set_zero(self, stable: bool = True) -> None:
        """Zeroes out current weight reading.

        Args:
            stable (bool, optional): Wait for the weight reading to stabilize.
                Defaults to True.
        """

    @abstractmethod
    def set_tare(self, stable: bool = False) -> None:
        """Stores current weight reading and zeroes the scale.

        Args:
            stable (bool, optional): Wait for the weight reading to stabilize.
                Defaults to False.
        """

    @abstractmethod
    def calibrate(self, internal: bool) -> bool:
        """Runs the balance calibration according to the manufacturer
        specifications.

        This might be an interactive method requiring user actions, e.g.
        putting on/off the weights for external calibration.

        Args:
            internal (bool): Calibrate using internal weight (if available).

        Returns:
            bool: True if calibration has completed successfully.
        """

    @abstractmethod
    def get_weight(self, stable: bool = False) -> Tuple[float, str]:
        """Gets current weight value.

        Args:
            stable (bool, optional): Wait for the weight reading to stabilize.
                Defaults to False.

        Returns:
            Tuple[float, str]: Weight value and weighing unit.
        """
class AbstractFlashChromatographySystem(LabDevice):
    """A flash chromatography system."""
# ############## Derived abstract controller classes ###############
class AbstractHotplate(AbstractTemperatureController, AbstractStirringController):
    """A typical hotplate capable of heating and stirring simultaneously."""

    def start(self) -> None:
        """Starts the device: stirring first, then temperature regulation."""

        self.start_stirring()
        self.start_temperature_regulation()

    def stop(self) -> None:
        """Stops the device: temperature regulation first, then stirring."""

        self.stop_temperature_regulation()
        self.stop_stirring()
class AbstractRotavap(AbstractTemperatureController, AbstractStirringController):
    """A typical rotary evaporator, without integrated vacuum controller/pump."""

    @abstractmethod
    def lift_up(self) -> None:
        """Lifts the evaporation flask up."""

    @abstractmethod
    def lift_down(self) -> None:
        """Lowers the evaporation flask down."""

    @abstractmethod
    def start_rotation(self) -> None:
        """Human-friendly counterpart of start_stirring(), which delegates here."""

    @abstractmethod
    def stop_rotation(self) -> None:
        """Human-friendly counterpart of stop_stirring(), which delegates here."""

    def start_stirring(self) -> None:
        """Mandatory method inherited from the AbstractStirringController."""

        return self.start_rotation()

    def stop_stirring(self) -> None:
        """Mandatory method inherited from the AbstractStirringController."""

        return self.stop_rotation()

    @abstractmethod
    def start_bath(self) -> None:
        """Human-friendly counterpart of start_temperature_regulation(), which delegates here."""

    @abstractmethod
    def stop_bath(self) -> None:
        """Human-friendly counterpart of stop_temperature_regulation(), which delegates here."""

    def start_temperature_regulation(self) -> None:
        """Mandatory method inherited from the AbstractTemperatureController."""

        return self.start_bath()

    def stop_temperature_regulation(self) -> None:
        """Mandatory method inherited from the AbstractTemperatureController."""

        return self.stop_bath()
class AbstractSyringePump(AbstractDispensingController):
    """A syringe pump device."""

    @abstractmethod
    def move_home(self) -> None:
        """Moves the plunger to the home position."""

    @abstractmethod
    def move_plunger_absolute(self, position: int) -> None:
        """Moves the plunger to an absolute position."""

    @abstractmethod
    def move_plunger_relative(self, position: int) -> None:
        """Moves the plunger relative (+/-) to the current position."""

    @abstractmethod
    def get_plunger_position(self) -> int:
        """Gets the actual position of the plunger."""