Usage

Importing the library automatically imports all the device modules into the library namespace to make their usage straightforward:

>>> import PyLabware as pl
>>> dir(pl)
['C3000SyringePump', 'C815FlashChromatographySystem', 'CF41Chiller',
'CVC3000VacuumPump', 'HeiTorque100PrecisionStirrer', 'Microstar75Stirrer',
'PetiteFleurChiller', 'R300Rotovap', 'RCTDigitalHotplate',
'RETControlViscHotplate', 'RV10Rotovap', 'RZR2052ControlStirrer',
'__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__',
'__package__', '__path__', '__spec__', 'connections', 'controllers', 'devices',
'exceptions', 'models', 'parsers']

Basic examples

Creating a device

Serial connection:

>>> import PyLabware as pl
>>> pump = pl.C3000SyringePump(device_name="reagent_pump", port="COM7",
connection_mode="serial", address=None, switch_address=4)
  • device_name is an arbitrary string used to identify a device.

  • port is a serial port name (platform-dependent).

  • connection_mode determines which connection adapter would be activated for the device.

  • address determines IP address/DNS name for socket-based or HTTP REST connection, it is not used for serial connection.

The rest of constructor parameters are device specific and are described in the corresponding module documentation.

Socket-based connection:

>>> import PyLabware as pl
>>> chiller = pl.CF41Chiller(device_name="jacket_chiller", port="5050",
connection_mode="tcpip", address="192.168.0.1")

HTTP connection:

>>> import PyLabware as pl
>>> rv = pl.R300Rotovap(device_name="rotavap", connection_mode="http",
address="r300.local", password="yourpass", port="4443")

Operating a device

A general sequence for an arbitrary device would be:

  1. Create device object.

    >>> import PyLabware as pl
    >>> pump = pl.C3000SyringePump(device_name="reagent_pump", port="COM7",
    connection_mode="serial", address=None, switch_address=4)
    
  2. Alter any settings if needed.

  3. Open connection.

    >>> pump.connect()                  # Open serial connection
    
  4. Check that device is alive.

    >>> pump.is_connected()             # Check that the pump is live
    True
    
  5. Check that device is initialized. initialize_device() handles any device-specific functionality needed for it to operate properly.

    >>> pump.is_initialized()           # Check that the pump has been initialized
    False
    >>> pump.initialize_device()
    
  6. Run whatever operations are needed.

    >>> pump.get_valve_position()       # Check pump distribution valve position
    'I'
    >>> pump.withdraw(200)              # Withdraw 200 units
    >>> pump.get_plunger_position()
    200
    >>> pump.set_valve_position("O")    # Switch valve
    >>> pump.get_valve_position()
    'O'
    >>> pump.dispense(200)              # Dispense 200 units to another output
    >>> pump.get_plunger_position()
    0
    
  7. Close the connection and exit.

    >>> pump.disconnect()               # Close connection before exiting
    

Warning

Gracefully closing connection before exiting is a good practice. Otherwise you are relying on a Python garbage collector for closing the connection when it destroys the device object. There are no warranties the latter would happen cleanly, so the physical device might get stuck with the half-open connection.

Running sequential commands

Often it is important to ensure that the device is idle before sending a command. A classical example would be a syringe pump running at low speed that would block any further commands until the current dispensing/withdrawing is complete.

Note

For the definition of what ‘device idle state’ means in the context of this library, please check the documentation for the is_idle():

To indicate whether a device is ready to receive further commands, every device driver implements a hardware-specific is_idle() method:

>>> import PyLabware as pl
>>> pump = pl.C3000SyringePump(device_name="reagent_pump", port="COM7",
connection_mode="serial", address=None, switch_address=4)
>>> pump.connect()
>>> pump.initialise_device()
>>> pump.is_idle()
True
###########################
# Set slow withdrawal speed
###########################
>>> pump.set_speed(20)
>>> pump.withdraw(200)
###############################################################
# From here the pump would give a hardware error if any further
# plunger movement commands are issued before it has finished move.
###############################################################

To make the life easier, a execute_when_ready() method is provided. For most of the device drivers it is internally used when necessary, so that the end user has nothing to worry about:

class C3000SyringePump(AbstractSyringePump, AbstractDistributionValve):
    ...
    def move_plunger_absolute(self, position: int, set_busy: bool = True):
        """Makes absolute plunger move.
        """

        if set_busy is True:
            cmd = self.cmd.SYR_MOVE_ABS
        else:
            cmd = self.cmd.SYR_MOVE_ABS_NOBUSY
        # Send command & check reply for errors
        self.execute_when_ready(self.send, cmd, position)

For the detailed syntax please check the corresponding documentation.

The wait_until_ready() is a simplified wrapper over execute_when_ready() that just blocks until the device is idle.

Note

If using is_idle()/execute_when_ready() is not convenient (e.g. the device doesn’t report busy/idle state), there is a simple control flow mechanism built in that ensures there is a minimal delay between any two successive commands. Read more here.

Operating multiple devices

Operating multiple devices is similar to the single device example given above. The device_name attribute can be used to distinguish between device replies in the log files.

Note

Every device has its own connection object, so concurrent access to a single serial port from multiple devices is not supported.

Advanced examples

Running concurrent tasks for devices

PyLabware supports concurrent execution of commands if the device hardware itself supports it:

>>> import PyLabware as pl
>>> pump = pl.C3000SyringePump(device_name="reagent_pump", port="COM7",
connection_mode="serial", address=None, switch_address=4)
>>> pump.connect()
>>> pump.initialise_device()
>>> pump.is_idle()
True
>>> pump.set_speed(20)
>>> pump.withdraw(200)
>>> def print_plunger_position():
...:    print(f"Plunger position: {pump.get_plunger_position()}")
>>> pump.start_task(interval=5, method=print_plunger_position, args=None)
Plunger position: 0
Plunger position: 0
Plunger position: 0
>>> pump.withdraw(200)
Plunger position: 12
Plunger position: 62
Plunger position: 113
Plunger position: 163
Plunger position: 200
Plunger position: 200
>>> pump.get_all_tasks()
[<LabDeviceTask(Thread-2, started 7944)>]
>>> task = pump.get_all_tasks()[0]
>>> pump.stop_task(task)

In the example above the plunger position is constantly monitored and printed out while the pump is withdrawing the liquid. Any sensible number of tasks can be run in parallel and started/stopped independently. A common use case for this feature would be to issue keep-alive commands so that the device stays active.

More examples can be found in PyLabware/examples/concurrent_tasks.py

Simulation mode

Often it is desirable to make a dry run of a script before running it on actual hardware to avoid unnecessary time/material cost and/or to ease up debug and development. To fulfill this task, every device can be run in simulation mode.

The simulation mode is switched on by setting the simulation property to True. Simulation messages are printed to log at INFO level so to use it you need to configure logging first:

>>> import PyLabware as pl
>>> import logging
>>> logging.getLogger().setLevel(logging.INFO)
>>> pump = pl.C3000SyringePump(device_name="reagent_pump", port="COM7", connection_mode="serial", address=None, switch_address=4)
[INFO] :: PyLabware.connections.SerialConnection :: Creating connection object with the following settings:
{'address': None, 'port': 'COM7', 'encoding': 'UTF-8', 'command_delay': 0.5,
'receive_buffer_size': 128, 'receive_timeout': 1, 'transmit_timeout': 1,
'receiving_interval': 0.05, 'write_timeout': 0.5, 'baudrate': 9600,
'bytesize': 8, 'parity': 'N', 'stopbits': 1, 'xonxoff': False, 'rtscts': False,
'dsrdtr': False, 'inter_byte_timeout': False}

>>> pump.simulation = True

After that one can use the device as usual issuing any commands:

>>> pump.is_connected()
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Pretending to send message <'/5?23R\r\n'>
True
>>> pump.is_initialized()
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Patched send() to return <True>, calling <is_initialized>
True
>>> pump.get_valve_position()
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Pretending to send message <'/5?6R\r\n'>
>>> pump.get_plunger_position()
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Pretending to send message <'/5?R\r\n'>
>>> In [9]: pump.withdraw(200)
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Patched send() to return <<PyLabware.models.LabDeviceReply object at 0x05A8DA08>>, calling <is_idle>
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: Waiting done. Device <test> ready.
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Pretending to send message <'/5P200R\r\n'>
>>> pump.dispense(200)
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Patched send() to return <<PyLabware.models.LabDeviceReply object at 0x05A8DA08>>, calling <is_idle>
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: Waiting done. Device <test> ready.
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Pretending to send message <'/5D200R\r\n'>
>>> pump.set_valve_position("O")
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Patched send() to return <<PyLabware.models.LabDeviceReply object at 0x05A8DA08>>, calling <is_idle>
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: Waiting done. Device <test> ready.
[INFO] :: PyLabware.controllers.C3000SyringePump.test :: SIM :: Pretending to send message <'/5OR\r\n'>

All methods work without throwing an error, though, obviously, the methods that have to return the data from the device do not return anything. This behavior can be altered in the device driver, see below for more details.

The simulation mode is designed in such a way to facilitate device testing. Thus, all value checking in device methods still takes place even in simulation:

>>> pump.set_valve_position("X")
SLDeviceCommandError: Unknown valve position <X> requested!

Tweaking simulation mode

How simulation mode works

Simulation mode works by intercepting the execution workflow in the following four places:

  • LabDevice.connect()

  • LabDevice.disconnect()

  • LabDevice.send()

  • LabDevice._recv()

A typical implementation just replaces the actual invocation of the underlying connection adapter method with a logging.info() call:

def connect(self):
    """ Connects to the device.

    This method normally shouldn't be redefined in child classes.
    """

    if self._simulation is True:
        self.logger.info("SIM :: Opened connection.")
        return
    self.connection.open_connection()
    self.logger.info("Opened connection.")

This results in all high-level code (e.g. value checking and other device-specific logic) to be executed as usual in the simulation mode, but all the command strings prepared are just logged instead of being sent to the device.

Using @in_simulation_device_returns decorator

Sometimes it is necessary to tune the simulated device behavior more granularly. The possible examples are:

  • A device that echoes the command back upon successful action. The device driver code checks the device reply to determine whether the command has been interpreted and ran correctly and raises an error if not.

  • A device with higher-level logic that relies on a particular value being returned from the device before the execution can continue, e.g. waiting for a certain temperature to be reached.

Both of these examples would be impossible to implement with the simple logic described above. To work around this issue and avoid patching every complex device method with if self.simulation is True clause, a special method decorator is used.

@in_simulation_device_returns decorator should be used to wrap any function that relies on a particular value that device should return. This value should be passed as the decorator argument. Here is an example from Tricontinent C3000 syringe pump driver:

class C3000SyringePump(AbstractSyringePump, AbstractDistributionValve):
    ...
    @in_simulation_device_returns(LabDeviceReply(body=C3000SyringePumpCommands.DEFAULT_STATUS))
        def is_idle(self) -> bool:
        """Checks if pump is in idle state.
        """

        # Send status request command and read back reply with no parsing
        # Parsing manipulates status byte to get error flags, we need it here
        try:

            ########################################################
            # send() patching takes place here
            ########################################################
            reply = self.send(self.cmd.GET_STATUS, parse_reply=False)

        except SLConnectionError:
            return False
        # Chop off prefix/terminator & cut out status byte
        reply = parser.stripper(reply.body, self.reply_prefix, self.reply_terminator)
        status_byte = ord(reply[0])
        # Busy/idle bit is 6th bit of the status byte. 0 - busy, 1 - idle
        if status_byte & 1 << 5 == 0:
            self.logger.debug("is_idle()::false.")
            return False
        # Check for errors if any
        try:
            self.check_errors(status_byte)
        except SLDeviceInternalError:
            self.logger.debug("is_idle()::false, errors present.")
            return False
        self.logger.debug("is_idle()::true.")
        return True

The decorator works as following:

  • Gets the object reference from the wrapped bound method (passed as self in the arguments list).

  • Checks self.simulation() to proceed.

  • Stores reference to original self.send() and replaces self.send() with a lambda returning decorator argument.

  • Runs the wrapped function and stores the return value.

  • Replaces self.send() back with original reference and returns the return value from previous step.

Simulating dynamic return values

Sometimes just the decorator is also not enough to achieve desirable behavior. A typical example would be a device that echoes back not the command, but the command argument and the code that relies on checking this reply for proper operation, e.g.:

Command to device >>> SET_TEMP 25.0
Device reply      <<< 25.0

This is quite typical echo mode often encountered in different devices. In order to support this mode of operation the following special syntax is used:

@in_simulation_device_returns("{$args[1]}")
def some_method(arg1, arg2, arg3):

1 is the number of positional argument that you want to extract from the some_method() call. In the case above the decorator will extract arg2 from the arguments list and return it as a return value for the send() call. Here’s a specific example from the Heidolph overhead stirrer driver:

class HeiTorque100PrecisionStirrer(AbstractStirringController):
    ...
    @in_simulation_device_returns("{$args[1]}")
    def set_speed(self, speed: int):
        """Sets rotation speed in rpm.
        """

        # If the stirrer is not running, just update internal variable
        if not self._running:
            # Check value against limits before updating
            self.check_value(self.cmd.SET_SPEED, speed)
            self._speed_setpoint = speed
        else:

            ###############################################################
            # Here send() will be replaced with a lambda returning args[1]
            # from set_speed(), which is speed
            ###############################################################
            readback_setpoint = self.send(self.cmd.SET_SPEED, speed)

            if readback_setpoint != speed:
                self.stop()
                raise SLDeviceReplyError(f"Error setting stirrer speed. Requested setpoint <{self._speed_setpoint}> "
                                        f"RPM, read back setpoint <{readback_setpoint}> RPM")
            self._speed_setpoint = speed