Source code for stg._wrapper.dll

from pathlib import Path
from sys import platform
from typing import List, Union, Any, Callable
from time import sleep
from abc import ABC, abstractmethod

# Type alias for an integer argument that may also be None (i.e. "not given").
OptionalInt = Union[int, None]

# ----------------------------------------------------------------------------
# Use the real MCS .NET bindings on Windows; on every other platform (e.g.
# Linux or macOS) fall back to the mock implementations so the package can
# still be imported and tested.
# NOTE: a substring test such as `"win" in platform` would also match
# "darwin" (macOS) and "cygwin", hence the prefix test.
if platform.startswith("win"):  # pragma no cover
    # pylint: disable=import-error
    # presumably `clr` (pythonnet) must be imported first so that the .NET
    # namespaces below become importable -- TODO confirm
    import clr
    import System
    from stg.install import DLLPATH

    # Load the vendor assembly so that the `Mcs.Usb` namespace can be imported
    lib = System.Reflection.Assembly.LoadFile(str(DLLPATH))
    from Mcs.Usb import (
        CMcsUsbListNet,
        DeviceEnumNet,
        CStg200xStreamingNet,
        CStg200xDownloadNet,
        STG_DestinationEnumNet,
    )

    CURRENT = STG_DestinationEnumNet.channeldata_current
    VOLTAGE = STG_DestinationEnumNet.channeldata_voltage
    from Mcs.Usb import CMcsUsbListEntryNet as DeviceInfo
else:  # pragma no cover
    from stg._wrapper.mock import (
        CMcsUsbListNet,
        DeviceEnumNet,
        CStg200xStreamingNet,
        CStg200xDownloadNet,
        CURRENT,
        VOLTAGE,
        DeviceInfo,
        System,
    )
# The mock device is always available, independent of the platform
from stg._wrapper.mock import CStg200xMockNet
from stg._wrapper.mock import info as mockinfo

# ------------------------------------------------------------------------------
def available() -> List[DeviceInfo]:
    """List all available MCS STGs connected over USB with this PC.

    Returns:
        One USB list entry per device reported by the USB list, in index
        order. The list is empty when the USB list reports zero devices.
    """
    usb_list = CMcsUsbListNet()
    usb_list.Initialize(DeviceEnumNet.MCS_STG_DEVICE)
    return [
        usb_list.GetUsbListEntry(index)
        for index in range(usb_list.GetNumberOfDevices())
    ]


def select(serialnumber: OptionalInt = None) -> Union[DeviceInfo, None]:
    """Select an STG with a specific serial number from all connected devices.

    Args:
        serialnumber: Serial number of the desired device. The special value
            ``-1`` selects the mock device without touching the USB list.

    Returns:
        The USB list entry of the first device whose serial number matches,
        or ``None`` if no connected device matches (including when
        ``serialnumber`` is ``None``).
    """
    if serialnumber == -1:
        return mockinfo

    deviceList = CMcsUsbListNet()
    deviceList.Initialize(DeviceEnumNet.MCS_STG_DEVICE)
    for dev_num in range(deviceList.GetNumberOfDevices()):
        # Fetch the entry once and reuse it for comparison and return
        entry = deviceList.GetUsbListEntry(dev_num)
        if int(entry.SerialNumber) == serialnumber:
            return entry
    # No match: return None explicitly instead of failing on an unbound local
    return None


def bitmap(valuelist: list):
    """Fold a list of exponents into the sum of the matching powers of two.

    Args:
        valuelist: Exponents; every entry ``x`` contributes ``2 ** x``.

    Returns:
        The accumulated sum, or ``None`` when that sum equals zero (which is
        the case for an empty list).
    """
    total = 0
    for exponent in valuelist:
        total += 2 ** exponent
    if total == 0:
        return None
    return total


# ------------------------------------------------------------------------------
class BasicInterface(ABC):
    """Implements the `with` syntax for connecting to a CStg200xDownloadNet or CStg200xStreamingNet

    .. code-block:: python

       with BasicInterface() as interface:
           interface.start_stimulation()

    which is roughly equivalent to

    .. code-block:: python

       interface = BasicInterface()
       err = interface.connect()
       if err != 0:
           raise ConnectionRefusedError(f"{err}")
       try:
           interface.start_stimulation()
       finally:
           interface.disconnect()

    Exceptions raised inside the ``with`` body are *not* swallowed; the
    device is disconnected and the exception propagates.

    Subclasses must set ``self._info`` (the device to connect to) and
    ``self._interface`` (the wrapped device object) in ``__init__``. Any
    attribute not found on this wrapper is looked up on ``self._interface``.
    """

    # Class-level default, so `connected` is always defined on the wrapper
    # itself and never falls through `__getattr__` to the wrapped object.
    connected = False

    @abstractmethod
    def __init__(self, info: DeviceInfo, *args, **kwargs):  # pragma no cover
        pass

    def connect(self) -> int:
        """Connect with the device.

        Returns:
            The code returned by the wrapped ``Connect`` call; ``0`` is
            treated as success by :meth:`__enter__`.
        """
        err = self._interface.Connect(self._info)
        # Only report a connection once the device actually accepted it
        self.connected = err == 0
        return err

    def disconnect(self):
        "disconnect from the device"
        self.connected = False
        self._interface.Disconnect()

    def __enter__(self):
        """Connect and return ``self``.

        Raises:
            ConnectionRefusedError: if connecting returned a non-zero code.
        """
        err = self.connect()
        if err == 0:
            return self
        else:  # pragma no cover
            raise ConnectionRefusedError(f"{err}")

    def __exit__(self, type, value, tb):
        # Returns None, so an exception from the `with` body propagates
        self.disconnect()

    def __getattr__(self, item):
        """Delegate unknown attributes to the wrapped device object.

        Raises:
            AttributeError: if ``_interface`` itself is requested but has not
                been set yet.
        """
        # `__getattr__` is only called after normal lookup failed. Asking for
        # `_interface` here means it does not exist yet; delegating would
        # look it up again and recurse without end.
        if item == "_interface":
            raise AttributeError(item)
        return getattr(self._interface, item)


class MockingInterface(BasicInterface):
    """Interface that wraps a ``CStg200xMockNet`` instead of a real device.

    Extra positional and keyword arguments are forwarded to the
    ``CStg200xMockNet`` constructor.
    """

    def __init__(self, info: DeviceInfo, *args, **kwargs):
        mock_device = CStg200xMockNet(*args, **kwargs)
        self._interface = mock_device
        self._info = info
        self.connected = False


class DownloadInterface(BasicInterface):
    """Interface that wraps a ``CStg200xDownloadNet`` object.

    Args:
        info: The device this interface connects to.
    """

    def __init__(self, info: DeviceInfo):
        # Start out disconnected, consistent with MockingInterface
        self.connected = False
        self._info = info
        self._interface = CStg200xDownloadNet()


class StreamingInterface(BasicInterface):
    """Interface that wraps a ``CStg200xStreamingNet`` object.

    Args:
        info: The device this interface connects to.
        buffer_size: Passed to the ``CStg200xStreamingNet`` constructor as a
            ``System.UInt32`` (presumably the size of the streaming buffer;
            unit not visible here -- TODO confirm).
    """

    def __init__(self, info: DeviceInfo, buffer_size: int = 50_000):
        # Start out disconnected, consistent with MockingInterface
        self.connected = False
        self._info = info
        self._interface = CStg200xStreamingNet(System.UInt32(buffer_size))


class STGX(ABC):
    """Base class for the STG4000, wrapping the basic USB interface.

    The STGX is the base class for the STG4000 and wraps the basic USB
    interface and reads all the properties that are determined by the
    specific STG connected to your PC. Additionally, methods for downloading
    or streaming are implemented by subclasses. Ideally, just use
    :code:`from stg.api import STG4000`, and you will get the class
    implementing all bells and whistles.

    When initialized without arguments, i.e. :code:`stg = STG4000()`, it
    looks through your USB ports and connects with the first STG it finds.
    If you want to use a specific STG, initialize the class with the serial
    number of the device, e.g. using :code:`stg = STG4000(serial=12345)`.

    Immediately after the connection is established, we eagerly read all
    properties from the stimulator, e.g. the number of channels or the
    output resolution. These read-only properties are then cached, to
    prevent any later overhead. This means it might take a few seconds until
    the :code:`stg` is initialized, but it saves you precious milliseconds
    when you later stimulate.

    Because at any time, only one process can be connected with a specific
    STG, the connection is implemented using a :code:`with ... as` idiom.
    This should therefore be relatively safe. It is still possible that the
    STG can get into a weird state. In that case, try turning it off and on
    again.

    .. note::

       * Properties are eagerly loaded and cached during initialization

    """

    def __init__(self, serial: OptionalInt = None):
        """Pick a device, cache its properties and set up the trigger map.

        Args:
            serial: Serial number of the STG to use. ``None`` picks the
                first device returned by ``available()``.

        Raises:
            ConnectionError: if no device is available, or if ``select``
                returns no device for ``serial``.
        """
        if serial is None:
            try:
                info = available()[0]
            except IndexError as e:  # pragma no cover
                raise ConnectionError(
                    "No STG connected, or connected but not switched on."
                ) from e
        else:  # pragma no cover
            info = select(serial)
            if info is None:
                # Fail with a clear message instead of an AttributeError on
                # `None.DeviceName` below
                raise ConnectionError(
                    f"No STG with serial number {serial} connected."
                )
            print(
                "Selecting {0:s}:SN {1:s}".format(
                    info.DeviceName, info.SerialNumber
                )
            )
        self._info = info
        self._collect_properties()
        self.diagonalize_triggermap()

    def _collect_properties(self):
        "read all device properties once and cache them on the instance"
        # These come from the USB list entry and need no open connection
        self._str = self._info.ToString()
        self._name = self._info.DeviceName
        self._manufacturer = self._info.Manufacturer
        self._serial_number = int(self._info.SerialNumber)
        with self.interface() as interface:
            _, soft, hard = interface.GetStgVersionInfo("", "")
            self._version = (soft, hard)

            # All per-channel queries are made for channel index 0
            channel = System.UInt32(0)

            # Query each value once and derive both units from it
            resolution_in_na = interface.GetCurrentResolutionInNanoAmp(channel)
            self._crinua = resolution_in_na / 1000
            self._crinma = resolution_in_na / (1000 * 1000)

            range_in_na = interface.GetCurrentRangeInNanoAmp(channel)
            self._crngma = range_in_na / (1000 * 1000)
            self._crngua = range_in_na / (1000)

            self._vinuv = interface.GetVoltageResolutionInMicroVolt(channel)
            # NOTE(review): the microvolt value is divided by 1000 but is
            # exposed as `voltage_range_in_uV` -- looks like a unit mismatch
            # (mV vs. uV); kept as-is to not change values callers may rely
            # on. Confirm the intended unit.
            self._vrnguv = interface.GetVoltageRangeInMicroVolt(channel) / (
                1000
            )
            self._dacr = interface.GetDACResolution()
            self._achancnt = interface.GetNumberOfAnalogChannels()
            self._trgincnt = interface.GetNumberOfTriggerInputs()

    @abstractmethod
    def diagonalize_triggermap(self):  # pragma no cover
        # Implemented by subclasses; called at the end of __init__
        pass

    def interface(self):  # pragma no cover
        "return a new DownloadInterface for the selected device"
        return DownloadInterface(self._info)

    def sleep(self, duration_in_ms: float):
        "sleep for duration in milliseconds"
        # `sleep` resolves to the module-level `time.sleep`, not this method
        sleep(duration_in_ms / 1000)

    @property
    def name(self) -> str:
        "returns the model name, i.e. STG4002/4/8"
        return self._name

    @property
    def version(self) -> str:
        "Returns the current hardware and software version"
        soft, hard = self._version
        return "Hardware - {0} : Software - Version: {1}".format(
            hard.replace("Rev.", "Revision"), soft
        )

    @property
    def serial_number(self) -> int:
        "Returns the serial number of the device"
        return self._serial_number

    @property
    def manufacturer(self) -> str:
        "Returns the name of the manufacturer"
        return self._manufacturer

    @property
    def current_resolution_in_uA(self) -> float:
        "Return the current resolution in µA"
        return self._crinua

    @property
    def current_resolution_in_mA(self) -> float:
        "Return the current resolution in mA"
        return self._crinma

    @property
    def current_range_in_mA(self) -> float:
        "Return the current range in mA"
        return self._crngma

    @property
    def current_range_in_uA(self) -> float:
        "Return the current range in uA"
        return self._crngua

    @property
    def voltage_resolution_in_uV(self) -> float:
        "Return the voltage resolution in µV"
        return self._vinuv

    @property
    def voltage_range_in_uV(self) -> float:
        "Return the voltage range in uV"
        return self._vrnguv

    @property
    def time_resolution_in_us(self) -> float:
        "Return the time resolution in µs"
        return 20

    @property
    def time_resolution_in_ms(self) -> float:
        "Return the time resolution in ms"
        return 0.02

    @property
    def DAC_resolution(self) -> int:
        "Return the DAC resolution in bits"
        return self._dacr

    @property
    def channel_count(self) -> int:
        "returns the number of stimulation channels"
        return self._achancnt

    @property
    def trigin_count(self) -> int:
        "returns the number of trigger inputs"
        return self._trgincnt

    def __repr__(self) -> str:
        return f"{str(self)} at {hex(id(self))}"

    def __str__(self) -> str:
        return self._str

    def _set_current_mode(self, channel_index: Union[List[int], None] = None):
        """set a single or all channels to current mode

        args
        ----
        channel_index:list
            defaults to all, which sets all channels to current mode
            otherwise, takes a list of integers with the target channels.
            Indexing starts at 0. An empty list also means all channels.
        """
        with self.interface() as interface:
            # None replaces the former mutable `[]` default; an explicitly
            # passed empty list keeps its old "all channels" meaning
            if channel_index is None or channel_index == []:
                interface.SetCurrentMode()
            else:
                for chan in channel_index:
                    interface.SetCurrentMode(System.UInt32(chan))

    def _set_voltage_mode(self, channel_index: Union[List[int], None] = None):
        """set a single or all channels to voltage mode

        args
        ----
        channel_index:list
            defaults to all, which sets all channels to voltage mode
            otherwise, takes a list of integers with the target channels.
            Indexing starts at 0. An empty list also means all channels.
        """
        with self.interface() as interface:
            # None replaces the former mutable `[]` default; an explicitly
            # passed empty list keeps its old "all channels" meaning
            if channel_index is None or channel_index == []:
                interface.SetVoltageMode()
            else:
                for chan in channel_index:
                    interface.SetVoltageMode(System.UInt32(chan))