Source code for PyLabware.devices.kern_kdp3000

"""PyLabware driver for Kern KDP3000 digital weighting platform (balance)."""

import re
from typing import Optional, Union, Dict, Any
import serial

# Core imports
from .. import parsers as parser
from ..controllers import AbstractBalance, in_simulation_device_returns
from ..exceptions import PLConnectionError, PLDeviceCommandError, PLDeviceError
from ..models import LabDeviceCommands, ConnectionParameters


[docs] class KDP3000BalanceCommands(LabDeviceCommands): """Collection of commands for Kern KDP3000 balance. """ # ########################## Constants ################################## # ID string to check that correct device is connected DEFAULT_NAME = "KDP3000" # Maximum allowed weight, in grams MAX_WEIGHT = 3000 # Response and error codes. These differ a bit from command to command, unfortunately. RESPONSE_CODES = { "A": "Acknowledged.", "B": "More data to follow.", "S": "Stable.", "D": "Dynamic." } ERROR_CODES = { "L": "Logical error or invalid parameter.", "I": "Internal error.", "ES": "Syntax error.", "+": "Overweight!", "-": "Underweight!" } # ################### Control commands ################################### # Get list of commands GET_CMD_LIST = {"name": "I0", "reply":{"type": str, "parser": str.split, "args":[" "]}} # Get command interface level and version GET_CMD_VER = {"name": "I1", "reply":{"type": str, "parser": str.split, "args":[" "]}} # Get balance name GET_NAME = {"name": "I2", "reply":{"type": str, "parser": parser.splitter, "args":[" ", 1]}} # Get software version GET_SW_VER = {"name": "I3", "reply":{"type": str, "parser": parser.splitter, "args":[" ", 1]}} # Get serial number GET_SER_NUM = {"name": "I4", "reply":{"type": str, "parser": parser.splitter, "args":[" ", 1]}} # Get software id number GET_SW_ID = {"name": "I5", "reply":{"type": str, "parser": str.split, "args":[" "]}} # Set tare weight SET_TARE = {"name": "T", "reply":{"type": str}} # Set tare weight immediately SET_TARE_IMMEDIATE = {"name": "TI", "reply":{"type": str}} # Zero weight SET_ZERO = {"name": "Z", "reply":{"type": str}} # Zero weight immediately SET_ZERO_IMMEDIATE = {"name": "ZI", "reply":{"type": str}} # Get stable weight GET_WEIGHT = {"name": "S", "reply":{"type": float, "parser": str.split, "args":[" "]}} # Get weight immediately GET_WEIGHT_IMMEDIATE = {"name": "SI", "reply":{"type": float, "parser": str.split, "args":[" "]}} # Record calibration zero point CALIBRATE_ZERO = {"name": "JZ", "reply":{"type": str}} # Record max weight calibration point CALIBRATE_MAX = {"name": "JG", "reply":{"type": str}} # Save calibration data CALIBRATION_SAVE = {"name": "JS", "reply":{"type": str}} # ################### Configuration commands ############################# # Reset device RESET = {"name": "@", "reply": {"type": str}}
[docs] class KDP3000Balance(AbstractBalance): """ This provides a Python class for the Kern KDP3000 balance using KERN KCP protocol version 1.01 based on the operation manual KDP-BA_IA-e-1710 Version 1.0 2017-11 GB """ def __init__(self, device_name: str, connection_mode: str, address: Optional[str] = None, port: Optional[Union[str, int]] = None): """Default constructor """ # Load commands from helper class self.cmd = KDP3000BalanceCommands # Connection settings connection_parameters: ConnectionParameters = {} connection_parameters["port"] = port connection_parameters["address"] = address connection_parameters["baudrate"] = 9600 connection_parameters["bytesize"] = serial.EIGHTBITS connection_parameters["parity"] = serial.PARITY_NONE super().__init__(device_name, connection_mode, connection_parameters) # Protocol settings self.command_terminator = "\r\n" self.reply_terminator = "\r\n" self.args_delimiter = " "
[docs] def initialize_device(self): """Issue reset command. """ self.send(self.cmd.RESET) self.logger.info("Device initialized.")
[docs] @in_simulation_device_returns([KDP3000BalanceCommands.DEFAULT_NAME]) def is_connected(self) -> bool: """ Check if the device is connected via GET_NAME command. """ try: reply = self.send(self.cmd.GET_NAME) except PLConnectionError: return False return self.cmd.DEFAULT_NAME in reply
[docs] def parse_reply(self, cmd: Dict, reply: Any) -> str: """Overloaded method from base class. We need to do chop off command echo and status before geting to the actual reply. """ # Strip reply terminator and prefix reply = parser.stripper(reply.body, self.reply_prefix, self.reply_terminator) # Split into parts command, response_code, *reply = str.split(reply, " ") # Analyze response code if response_code in self.cmd.ERROR_CODES: if response_code in ("L", "I", "+", "-"): raise PLDeviceError(self.cmd.ERROR_CODES[response_code]) elif response_code == "ES": raise PLDeviceCommandError(self.cmd.ERROR_CODES[response_code]) elif response_code in self.cmd.RESPONSE_CODES: # All fine # Glue reply back into string and strip empty spaces and quotes reply = " ".join(reply).strip(" ").strip('"') self.logger.debug("Invoking parse_reply() of the base class with argument <%s>", reply) return super().parse_reply(cmd, reply) else: raise PLDeviceError(f"Unknown response code <{response_code}> received from the device!")
# TODO Maybe put power on/off in start/stop ?
[docs] def start(self): """Not supported on this device. """
[docs] def stop(self) -> bool: """Not supported on this device. """
[docs] def is_idle(self) -> bool: """Not supported on this device. """ return self.is_connected()
[docs] def get_status(self): """Not supported on this device. """
[docs] def check_errors(self): """Not supported on this device. """
[docs] def clear_errors(self): """Not supported on this device. """
[docs] def calibrate(self, internal: bool = False) -> bool: """Runs the balance calibration according to the manufacturer specifications. This is an interactive method requiring user to putting on/off the weights for external calibration. The procedure is based on the original user manual page, section 9 "Adjustment", page 14. The commands listed there differ from the ones in the latest KCP protocol specification! Args: internal (bool): Calibrate using internal weight (if available). Returns: bool: True if calibration has completed successfully. """ if internal is True: self.logger.error("This balance doesn't support internal calibration!") return input("Please insure that the balance are leveled on a hard surface and the weighting pan is empty. Press <Enter> when ready.") self.logger.info("Running zero point calibration...") try: self.send(self.cmd.CALIBRATE_ZERO) except PLDeviceError as e: self.logger.error("Zero calibration failed! %s", e.args[0]) return input("Please put the calibration weight (3kg, class F1) in the weighting pan and press <Enter>.") self.logger.info("Running max point calibration...") try: self.send(self.cmd.CALIBRATE_MAX) except PLDeviceError as e: self.logger.error("Maximum weight calibration failed! %s", e.args[0]) return input("Please remove the calibration weight from the weighting pan and press <Enter>.") self.logger.info("Storing calibration data...") try: self.send(self.cmd.CALIBRATION_SAVE) except PLDeviceError as e: self.logger.error("Storing calibration data failed! %s", e.args[0]) return self.logger.info("Calibration done!")
[docs] def set_tare(self, stable=True) -> None: """Sets tare weight to the currently measured value. Args: stable (bool, optional): Whether to wait for the reading to stable. Defaults to True. """ if stable is True: self.send(self.cmd.SET_TARE) else: self.send(self.cmd.SET_TARE_IMMEDIATE)
[docs] def set_zero(self, stable=True) -> None: """Zeros current balance reading. Args: stable (bool, optional): Whether to wait for the reading to stable. Defaults to True. """ if stable is True: self.send(self.cmd.SET_ZERO) else: self.send(self.cmd.SET_ZERO_IMMEDIATE)
[docs] @in_simulation_device_returns(42) def get_weight(self, stable=True) -> float: """Gets current weight reading. Args: stable (bool, optional): Whether to wait for the reading to stable. Defaults to True. Returns: float: current weight reading. """ if stable is True: weight, unit = self.send(self.cmd.GET_WEIGHT) else: weight, unit = self.send(self.cmd.GET_WEIGHT_IMMEDIATE) return (float(weight), unit)