API Reference

The NV200 library API consists of the following modules:

  • Device Discovery and Connection

  • Device Abstractions

    • device_base — Generic piezosystem device base class.

    • nv200_device — High-level async client for NV200 piezo controllers.

    • shared_types — Shared data structures and enums.

  • Transport Protocols

  • Data and Signal Modules

  • Utilities

    • utils — General utility functions and classes.

Device Discovery and Connection

device_discovery

This module provides asynchronous device discovery functionality for the NV200 library. It concurrently scans for devices available via Telnet and Serial protocols, returning a unified list of detected devices. Each detected device is represented by a DetectedDevice instance, annotated with its transport type (TELNET or SERIAL), identifier (such as IP address or serial port), and optionally a MAC address.

async discover_devices(flags: ~nv200.shared_types.DiscoverFlags = <DiscoverFlags.ALL_INTERFACES: 3>, device_class: ~typing.Type[~nv200.device_base.PiezoDeviceBase] | None = None) List[DetectedDevice][source]

Asynchronously discovers devices on available interfaces based on the specified discovery flags and optional device class.

The discovery process can be customized using flags to enable or disable:

Parameters:
  • flags (DiscoverFlags, optional) – Flags indicating which interfaces to scan and whether to read device info. Defaults to DiscoverFlags.ALL_INTERFACES.

  • device_class (Optional[Type[PiezoDeviceBase]], optional) – If specified, only devices matching this class will be returned. Also ensures device info is read.

Returns:

A list of detected devices, optionally enriched with detailed information and filtered by device class.

Return type:

List[DetectedDevice]

Raises:

Any exceptions raised by underlying protocol discovery or enrichment functions.

Notes

  • Device discovery is performed in parallel for supported interfaces (Ethernet, Serial).

  • If READ_DEVICE_INFO is set, each detected device is enriched with additional information.

  • If device_class is specified, only devices matching the class’s DEVICE_ID are returned.

Examples

Discover all Devices on all interfaces

>>> device = await nv200.device_discovery.discover_devices(DiscoverFlags.ALL_INTERFACES | DiscoverFlags.READ_DEVICE_INFO)

Discover NV200 Devices connected via serial interface

>>> device = await nv200.device_discovery.discover_devices(DiscoverFlags.DETECT_SERIAL | DiscoverFlags.READ_DEVICE_INFO, NV200Device)

device_factory

create_device_from_detected_device(detected_device: DetectedDevice) PiezoDeviceBase[source]

Creates a device object from the given DetectedDevice parameters.

create_device_from_id(device_id: str, *args, **kwargs) PiezoDeviceBase[source]

Creates and returns an instance of a device class corresponding to the given device ID.

Parameters:
  • device_id (str) – The identifier for the device model to instantiate.

  • *args – Positional arguments to pass to the device class constructor.

  • **kwargs – Keyword arguments to pass to the device class constructor.

Returns:

An instance of the device class associated with the given device ID.

Return type:

PiezoDeviceBase

Raises:

ValueError – If the provided device_id is not supported or not found in the registry.

connection_utils

async connect_to_detected_device(detected_device: DetectedDevice, auto_adjust_comm_params: bool = False) PiezoDeviceBase[source]

Connects to a device using the provided DetectedDevice instance.

Parameters:

detected_device (DetectedDevice) – The detected device to connect to.

Returns:

An instance of PiezoDeviceBase connected to the specified device.

Return type:

PiezoDeviceBase

async connect_to_single_device(device_class: Type[PiezoDeviceType], transport_type: TransportType | None = None, interface_or_address: str = None) PiezoDeviceType[source]

Convenience function to quickly connect to a single device.

Use this function if you only have one single device connected and you would like to connect to it directly. If no transport type is specified, it will attempt to connect using the serial transport first, and if no devices are found, it will then attempt to connect using the Telnet transport. The function does not verify if the right device is connected, it simply connects to the first device found. So use this function only, if you really have only one device connected to your system.

If no transport type is specified, the function will perform auto discovery of devices. If transport type is specified, but no interface_or_address is provided, it will perform auto discovery of devices for that transport type. If both transport type and interface_or_address are specified, it will attempt to connect directly to the specified device.

Parameters:
  • transport_type (TransportType, optional) – The type of transport to use (e.g., Serial, Telnet). If not specified, it will attempt to connect using serial transport first, then Telnet.

  • interface_or_address (str, optional) – The serial port, IP address, or MAC address to connect to. If not specified, device discovery is performed. This parameter is only used if the transport type is specified. If no transport type is specified, it will attempt to discover devices automatically.

Returns:

An instance of DeviceClient connected to the NV200 device.

Return type:

DeviceClient

Examples

Auto Discovery

>>> device = await nv200.connection_utils.connect_to_single_device()

Serial Port Auto Discovery

>>> device = await nv200.connection_utils.connect_to_single_device(TransportType.SERIAL)

Ethernet Auto Discovery

>>> device = await nv200.connection_utils.connect_to_single_device(TransportType.TELNET)

Connect to specific MAC address

>>> device = await nv200.connection_utils.connect_to_single_device(TransportType.TELNET, "00:80:A3:79:C6:18")

Connect to specific IP address

>>> device = await nv200.connection_utils.connect_to_single_device(TransportType.TELNET, "192.168.102.3")

Connect to specific serial port

>>> device = await nv200.connection_utils.connect_to_single_device(TransportType.SERIAL, "COM3")

Device Abstractions

device_base

Provides classes and enumerations for communicating with and interpreting responses from NV200 devices.

This module includes an asynchronous client for issuing commands and parsing responses from NV200 devices over supported transport protocols (e.g., serial, Telnet).

class PiezoDeviceBase[source]

Bases: object

Generic piezosystem device base class.

PiezoDeviceBase provides an asynchronous interface for communicating with a piezoelectric device over various transport protocols (such as serial or telnet). It encapsulates low-level device commands, response parsing, synchronization mechanisms, and optional command result caching.

This class is intended to be subclassed by concrete device implementations (e.g., NV200Device), which define specific device behaviors and cacheable commands.

Concrete device classes support caching of command parameters/values. This mechanism is designed to reduce frequent read access over the physical communication interface by serving previously retrieved values from a local cache. Since each read operation over the interface introduces latency, caching can significantly improve the performance of parameter access — particularly in scenarios like graphical user interfaces where many values are queried repeatedly.

Note

Caching should only be used if it is guaranteed that no other external application modifies the device state in parallel. For example, if access is made via the Python library over a Telnet connection, no other software (e.g., via a serial interface) should modify the same parameters concurrently. In such multi-access scenarios, caching can lead to inconsistent or outdated data. To ensure correctness, disable caching globally by setting the class variable CMD_CACHE_ENABLED to False.

CMD_CACHE_ENABLED

Class-level flag that controls whether command-level caching is enabled. When set to True (default), values read from or written to the device for cacheable commands will be stored and retrieved from an internal cache. Setting this to False disables the caching behavior globally for all instances unless explicitly overridden at the instance level.

Type:

bool

CACHEABLE_COMMANDS: set[str] = {}
CMD_CACHE_ENABLED = True
DEFAULT_TIMEOUT_SECS = 0.6
DEVICE_ID = None
__init__(transport: TransportProtocol)[source]
async backup_parameters(backup_list: list[str]) Dict[str, str][source]

Asynchronously backs up device settings by reading response parameters for each command in the provided list.

Use the restore_parameters method to restore the settings later from the backup.

Parameters:

backup_list (list[str]) – A list of command strings for which to back up settings.

Returns:

A dictionary mapping each command to its corresponding response string from the device.

Return type:

Dict[str, str]

Example

>>> backup_list = [
>>>     "modsrc", "notchon", "sr", "poslpon", "setlpon", "cl", "reclen", "recstr"]
>>> await self.backup_settings(backup_list)
async check_device_type() tuple[bool, str][source]

Checks if the device type matches the given device ID.

Returns:

True if the device type matches, False otherwise.

Return type:

bool

clear_cmd_cache()[source]

Clears the cache of previously issued commands.

async close()[source]

Asynchronously closes the transport connection.

This method ensures that the transport layer is properly closed, releasing any resources associated with it.

async connect(auto_adjust_comm_params: bool = True)[source]

Establishes a connection using the transport layer.

This asynchronous method initiates the connection process by calling the connect method of the transport instance.

Parameters:

auto_adjust_comm_params (bool) – If True, the Telnet transport will automatically adjust the internal communication parameters of the XPORT ethernet module. It will set the flow control mode to# XON_XOFF_PASS_TO_HOST. This is required for the library to work+ properly.

Raises:

Exception – If the connection fails, an exception may be raised depending on the implementation of the transport layer.

property device_info: DeviceInfo

Returns detailed information about the connected device.

This property provides a DeviceInfo object that includes the device’s identifier and transport metadata. It requires that the transport layer is initialized and connected.

Raises:

RuntimeError – If the transport is not initialized or the device is not connected.

Returns:

An object containing the device ID and transport info.

Return type:

DeviceInfo

async enrich_device_info(detected_device: DetectedDevice) None[source]

Get additional information about the device.

A derived class should implement this method to enrich the device information in the given detected_device object.

Parameters:

detected_device (DetectedDevice) – The detected device object to enrich with additional information.

async get_device_type() str[source]

Retrieves the type of the device. The device type is the string that is returned if you just press enter after connecting to the device.

classmethod help(cmd: str | None = None) str[source]

Returns help information for a specific command or all commands if no command is specified.

Parameters:

cmd (str, optional) – The command to get help for. If None, returns help for all commands.

Returns:

Help information for the specified command or all commands.

Return type:

str

classmethod help_dict()[source]

Returns the class-level help dictionary with a list of all commands and their descriptions.

property lock: _ReentrantAsyncLock

Lock that can be used by external code to synchronize access to the device.

Use with async with client.lock: to group operations atomically.

async read_cached_response_parameters_tring(cmd: str, timeout: float = 0.6) str[source]

Asynchronously reads the response parameters string for a given command, utilizing a cache if enabled. If caching is enabled and the command’s response is present in the cache, returns the cached response. Otherwise, reads the response using read_response_parameters_string, caches it if applicable, and returns it.

Parameters:
  • cmd (str) – The command string to send.

  • timeout (float, optional) – Timeout in seconds for the response. Defaults to DEFAULT_TIMEOUT_SECS.

Returns:

The parameters part of the response string (after the first comma), or an empty string if no parameters are present.

Return type:

str

async read_float_value(cmd: str, param_index: int = 0) float[source]

Asynchronously reads a single float value from device. For example, if you write the command set, to read the current setpoint, it will return 80.000 if the setpoint is 80.000. The response is parsed into a float value. Use this function for command that returns a single floating point value.

Parameters:
  • cmd (str) – The command string to be sent.

  • param_index (int) – Parameter index (default 0) to read from the response.

Returns:

The value as a floating-point number.

Return type:

float

Example

>>> value = await device_client.read_float_value('set')
>>> print(value)
80.000
async read_int_value(cmd: str, param_index: int = 0) int[source]

Asynchronously reads a single float value from device. For example, if you write cl to the device, the response will be 0 or 1 depending on the current PID mode. The response is parsed into an integer value.

Parameters:
  • cmd (str) – The command string to be sent.

  • param_index (int) – Parameter index (default 0) to read from the response

Returns:

The value as a floating-point number.

Return type:

float

Example

>>> value = await device_client.read_int_value('cl')
>>> print(value)
1
async read_response(cmd: str, timeout: float = 0.6) tuple[source]

Asynchronously sends a command to read values and returnes the response as a tuple. For example, if you write the command set, it will return set,80.000 if the setpoint is 80.000. The response is parsed into a tuple containing the command set and a list of parameter strings, in this case [80.000].

Parameters:

cmd (str) – The command string to be sent.

Returns:

A tuple containing the command (str) and a list of parameters (list of str).

Return type:

tuple

Example

>>> response = await device_client.read_response('set')
>>> print(response)
('set', ['80.000'])
async read_response_parameters_string(cmd: str, timeout: float = 0.6) str[source]

Asynchronously sends a command and retrieves the parameters portion of the response string.

Parameters:
  • cmd (str) – The command string to send.

  • timeout (float, optional) – The maximum time to wait for a response, in seconds. Defaults to DEFAULT_TIMEOUT_SECS.

Returns:

The parameters part of the response string (after the first comma), or an empty string if no parameters are present.

Return type:

str

async read_response_string(cmd: str, timeout: float = 0.6) str[source]

Sends a command to the transport layer and reads the response asynchronously. For example, if you write cl to the device, it will return cl,0 or cl,1 depending on the current PID mode. That means, this function returns the complete string cl,0\r\n or cl,1\r\n including the carriage return and line feed.

Parameters:
  • cmd (str) – The command string to be sent.

  • timeout – The timeout for reading the response in seconds.

Returns:

The response received from the transport layer.

Return type:

str

Example

>>> response = await device_client.read('cl')
>>> print(response)
b'cl,1\r\n'
async read_string_value(cmd: str, param_index: int = 0) str[source]

Asynchronously reads a single string value from device. For example, if you write the command desc, the device will return the name of the actuator i.e. TRITOR100SG . The response is parsed into a string value.

Parameters:
  • cmd (str) – The command string to be sent.

  • param_index (int) – Parameter index (default 0) to read from the response.

Returns:

The value as a string.

Return type:

str

Example

>>> await self.read_string_value('desc')
>>> print(value)
TRITOR100SG
async read_stripped_response_string(cmd: str, timeout: float = 0.6) str[source]

Reads a response string from the device, stripping any leading/trailing whitespace. This method is a placeholder and should be implemented based on actual response handling.

async read_values(cmd: str, timeout: float = 0.6) list[str][source]

Asynchronously sends a command and returns the values as a list of strings. For example, if you write the command recout,0,0,1, to read the first data recorder value, it will return ['0', '0', '0.029'] if the first data recorder value is 0.029. So it returns a list of 3 strings.

Parameters:

cmd (str) – The command string to be sent.

Returns:

A list of values (list of str)..

Example

>>> values = await device_client.read_values('recout,0,0,1')
>>> print(values)
['0', '0', '0.029']
async restore_parameters(backup: Dict[str, str])[source]

Asynchronously restores device parameters from a backup created with backup_parameters.

Iterates over the provided backup dictionary, writing each parameter value to the device.

property transport_protocol: TransportProtocol

Returns the transport protocol used by the device.

async write(cmd: str)[source]

Sends a command to the transport layer.

This asynchronous method writes a command string followed by a carriage return to the transport layer.

Parameters:

cmd (str) – The command string to be sent. No carriage return is needed.

Example

>>> await device_client.write('set,80')
async write_string_value(cmd: str, value: str)[source]

Sends a command with a string value to the transport layer.

This asynchronous method writes a command string followed by a carriage return to the transport layer. It is used for commands that require a string value.

Parameters:
  • cmd (str) – The command string to be sent.

  • value (str) – The string value to be included in the command.

Example

>>> await device_client.write_string_value('set', '80.000')
async write_value(cmd: str, value: int | float | str | bool)[source]

Asynchronously writes a value to the device using the specified command.

Parameters:
  • cmd (str) – The command string to send to the device.

  • value (Union[int, float, str, bool]) – The value to write, which can be an integer, float, string, or boolean.

Example

>>> await device_client.write('set', 80)

nv200_device

class NV200Device[source]

Bases: PiezoDeviceBase

A high-level asynchronous client for communicating with NV200 piezo controllers.

This class extends the PiezoDeviceBase base class and provides high-level methods for setting and getting various device parameters, such as PID mode, setpoint, etc.

setpoint_lpf

Interface for the setpoint low-pass filter.

Type:

NV200Device.LowpassFilter

position_lpf

Interface for the position low-pass filter.

Type:

NV200Device.LowpassFilter

notch_filter

Interface for the notch filter.

Type:

NV200Device.NotchFilter

pid

Interface for the PID controller to set the controller mode and PID gains.

Type:

NV200Device.PIDController

CACHEABLE_COMMANDS: set[str] = {'acmeasure', 'avmax', 'avmin', 'cl', 'ctrlmode', 'kd', 'ki', 'kp', 'modsrc', 'monsrc', 'notchb', 'notchon', 'poslpf', 'poslponnotchf', 'posmax', 'posmin', 'setlpf', 'setlpon', 'spisrc', 'sr', 'unitcl', 'unitol'}
DEVICE_ID = 'NV200/D_NET'
class LowpassFilter[source]

Bases: object

Interface to a low-pass filter configuration on the NV200 device.

This class is parameterized by the command strings for enabling the filter and setting/getting the cutoff frequency.

__init__(device: NV200Device, enable_cmd: str, cutoff_cmd: str, cutoff_range: ValueRange[float]) None[source]

Initialize the LowpassFilter.

Parameters:
  • device (NV200Device) – The parent device instance.

  • enable_cmd (str) – The device command to enable/disable the filter.

  • cutoff_cmd (str) – The device command to set/get the cutoff frequency.

async enable(enable: bool) None[source]

Enable or disable the low-pass filter.

Parameters:

enable (bool) – True to enable, False to disable.

async get_cutoff() float[source]

Get the cutoff frequency of the low-pass filter.

Returns:

The cutoff frequency in Hz.

Return type:

int

async is_enabled() bool[source]

Check if the low-pass filter is enabled.

Returns:

True if enabled, False otherwise.

Return type:

bool

async set_cutoff(frequency: float) None[source]

Set the cutoff frequency of the low-pass filter.

Parameters:

frequency (int) – The cutoff frequency in Hz (valid range 1..10000).

class NotchFilter[source]

Bases: object

Interface to a notch filter configuration on the NV200 device.

Allows enabling/disabling the notch filter and configuring its center frequency and -3 dB bandwidth.

__init__(device: NV200Device) None[source]

Initialize the NotchFilter interface.

Parameters:

device (NV200Device) – The parent device instance used for communication.

async enable(enable: bool) None[source]

Enable or disable the notch filter.

Parameters:

enable (bool) – True to enable the filter, False to disable.

async get_bandwidth() int[source]

Get the -3 dB bandwidth of the notch filter.

Returns:

Current bandwidth in Hz.

Return type:

int

async get_frequency() int[source]

Get the center frequency of the notch filter.

Returns:

Current center frequency in Hz.

Return type:

int

async is_enabled() bool[source]

Check if the notch filter is currently enabled.

Returns:

True if the filter is enabled, False otherwise.

Return type:

bool

async set_bandwidth(bandwidth: int) None[source]

Set the -3 dB bandwidth of the notch filter.

Parameters:

bandwidth (int) – Bandwidth in Hz. Valid range is 1 to 10,000 Hz, but must not exceed 2 × center frequency.

async set_frequency(frequency: int) None[source]

Set the center frequency of the notch filter.

Parameters:

frequency (int) – Center frequency in Hz. Valid range is 1 to 10,000 Hz.

class PIDController[source]

Bases: object

PIDController provides an interface for configuring and controlling the PID control loop of an NV200 device. This class allows enabling/disabling closed loop control, setting and retrieving PID gains, and configuring feed-forward control amplification factors for position, velocity, and acceleration.

__init__(device: NV200Device) None[source]

Initialize the NotchFilter interface.

Parameters:

device (NV200Device) – The parent device instance used for communication.

async get_mode() PidLoopMode[source]

Retrieves the current PID mode of the device.

async get_pcf_gains() PCFGains[source]

Retrieves the feed forward control amplification factors.

Returns:

The feed forward factors for position, velocity, and acceleration.

Return type:

PCFGains

async get_pid_gains() PIDGains[source]

Retrieves the PID gains (kp, ki, kd) for the device.

Returns:

A named tuple with fields kp, ki, and kd.

Return type:

PIDGains

async is_closed_loop() bool[source]

Check if the device is currently in closed loop control mode. :returns: True if in closed loop mode, False otherwise. :rtype: bool

async set_closed_loop(enable: bool) None[source]

Enable or disable closed loop control mode.

async set_mode(mode: PidLoopMode)[source]

Sets the PID mode of the device to either open loop or closed loop.

async set_pcf_gains(position: float | None = None, velocity: float | None = None, acceleration: float | None = None) None[source]

Sets the PID controllers feed forward control amplification factors.

Parameters:
  • position (float | None) – Factor for position feed-forward. Leave unchanged if None.

  • velocity (float | None) – Factor for velocity feed-forward. Leave unchanged if None.

  • acceleration (float | None) – Factor for acceleration feed-forward (scaled by 1/1_000_000 internally). Leave unchanged if None.

async set_pid_gains(kp: float | None = None, ki: float | None = None, kd: float | None = None) None[source]

Sets the PID gains for the device.

__init__(transport: TransportProtocol)[source]

Initialize NV200Device and its low-pass filter interfaces.

async enrich_device_info(detected_device: DetectedDevice) None[source]

Get additional information about the device.

A derived class should implement this method to enrich the device information in the given detected_device object.

Parameters:

detected_device (DetectedDevice) – The detected device object to enrich with additional information.

async export_actuator_config(path: str = '', filename: str = '') str[source]

Asynchronously exports the actuator configuration parameters to an INI file. This method reads a predefined set of actuator configuration parameters from the device, and writes them to an INI file. The file can be saved to a specified path and filename, or defaults will be used based on the actuator’s description and serial number.

Parameters:
  • path (str, optional) – Directory path where the configuration file will be saved. If not provided, the file will be saved in the current working directory.

  • filename (str, optional) – Name of the configuration file. If not provided, a default name in the format ‘actuator_conf_{desc}_{acserno}.ini’ will be used.

Returns:

The full path to the saved configuration file.

Raises:

Any exceptions raised during file writing or parameter reading will propagate.

static from_detected_device(detected_device: DetectedDevice) NV200Device[source]

Creates an NV200Device instance from a DetectedDevice object by selecting the appropriate transport protocol based on the detected device’s transport type. :param detected_device: The detected device containing transport type and identifier. :type detected_device: DetectedDevice

Returns:

An instance of NV200Device initialized with the correct transport protocol.

Return type:

NV200Device

async get_actuator_description() str[source]

Retrieves the description of the actuator that is connected to the NV200 device. The description consists of the actuator type and the serial number. For example: “TRITOR100SG, #85533”

async get_actuator_name() str[source]

Retrieves the name of the actuator that is connected to the NV200 device.

async get_actuator_sensor_type() PostionSensorType[source]

Retrieves the type of position sensor used by the actuator. Returns a PostionSensorType enum value.

async get_actuator_serial_number() str[source]

Retrieves the serial number of the actuator that is connected to the NV200 device.

async get_analog_monitor_source() AnalogMonitorSource[source]

Returns the source of data for analog output (monsrc).

async get_control_mode() CtrlMode[source]

Retrieves the current control mode of the device.

Returns:

The current control mode.

Return type:

CtrlMode

async get_current_position() float[source]

Retrieves the current position of the device. For actuators with sensor: Position in actuator units (μm or mrad) For actuators without sensor: Piezo voltage in V

async get_heat_sink_temperature() float[source]

Retrieves the heat sink temperature in degrees Celsius.

async get_max_position() float[source]

Retrieves the maximum position of the device. For actuators with sensor: Maximum position in actuator units (μm or mrad) For actuators without sensor: Maximum piezo voltage in V

async get_max_voltage() float[source]

Retrieves the maximum voltage of the device. This is the maximum voltage that can be applied to the piezo actuator.

async get_min_position() float[source]

Retrieves the minimum position of the device. For actuators with sensor: Minimum position in actuator units (μm or mrad) For actuators without sensor: Minimum piezo voltage in V

async get_min_voltage() float[source]

Retrieves the minimum voltage of the device. This is the minimum voltage that can be applied to the piezo actuator.

async get_modulation_source() ModulationSource[source]

Retrieves the current setpoint modulation source.

async get_position_range() Tuple[float, float][source]

Retrieves the position range of the device for closed loop control. Returns a tuple containing the minimum and maximum position.

async get_position_unit() str[source]

Retrieves the position unit of the device. This is typically “μm” for micrometers for linear actuatros or “mrad” for milliradians for tilting actuators.

async get_setpoint() float[source]

Retrieves the current setpoint of the device.

async get_setpoint_range() Tuple[float, float][source]

Retrieves the setpoint range of the device. Returns a tuple containing the minimum and maximum setpoint. The setpoint range is determined by the position range for closed loop control and the voltage range for open loop control.

async get_setpoint_unit() str[source]

Retrieves the current setpoint unit of the device. This is typically “V” for volts in open loop or the position unit in closed loop.

async get_slew_rate() float[source]

Retrieves the slew rate of the device. The slew rate is the maximum speed at which the device can move.

async get_spi_monitor_source() SPIMonitorSource[source]

Returns the source for the SPI/Monitor value returned via SPI MISO.

async get_status_register() StatusRegister[source]

Retrieves the status register of the device.

async get_voltage_range() Tuple[float, float][source]

Retrieves the voltage range of the device for open loop control. Returns a tuple containing the minimum and maximum voltage.

async get_voltage_unit() str[source]

Retrieves the voltage unit of the device. This is typically “V” for volts.

async import_actuator_config(filepath: str)[source]

Imports actuator configuration from an INI file.

Parameters:

filepath – Path to the INI file with the actuator configuration.

async is_status_flag_set(flag: StatusFlags) bool[source]

Checks if a specific status flag is set in the status register.

async move(target: float)[source]

Moves the device to the specified target position or voltage. The target is interpreted as a position in closed loop or a voltage in open loop.

async move_to_position(position: float)[source]

Moves the device to the specified position in closed loop

async move_to_voltage(voltage: float)[source]

Moves the device to the specified voltage in open loop

async set_analog_monitor_source(source: AnalogMonitorSource)[source]

Sets the source of data for analog output (monsrc).

async set_control_mode(mode: CtrlMode) None[source]

Sets the control mode of the device.

Parameters:

mode (CtrlMode) – The control mode to set.

async set_modulation_source(source: ModulationSource)[source]

Sets the setpoint modulation source.

async set_setpoint(setpoint: float)[source]

Sets the setpoint value for the device.

async set_slew_rate(slew_rate: float)[source]

Sets the slew rate of the device. 0.0000008 … 2000.0 %ms⁄ (2000 = disabled)

async set_spi_monitor_source(source: SPIMonitorSource)[source]

Sets the source for the SPI/Monitor value returned via SPI MISO.

spibox_device

class SpiBoxDevice[source]

Bases: PiezoDeviceBase

A high-level asynchronous client for communicating with NV200 piezo controllers. This class extends the PiezoDeviceBase base class and provides high-level methods for setting and getting various device parameters, such as PID mode, setpoint,

DEVICE_ID = 'SPI Controller Box'
async connect(auto_adjust_comm_params: bool = True)[source]

Establishes a connection using the transport layer.

async set_setpoints_percent(ch1: float = 0, ch2: float = 0, ch3: float = 0) List[float][source]

Set device setpoints as percentages (0.0 to 100.0) for 3 channels.

Converts percent values to 16-bit hex strings and sends them as a formatted command.

async set_waveforms(ch1: ndarray | None, ch2: ndarray | None, ch3: ndarray | None) List[ndarray][source]

Set waveforms for the device channels.

Each channel can be set to a waveform represented as a numpy array of floats. The values are expected to be in the range [0.0, 100.0].

Parameters:
  • ch1 (Optional[np.ndarray]) – Waveform for channel 1.

  • ch2 (Optional[np.ndarray]) – Waveform for channel 2.

  • ch3 (Optional[np.ndarray]) – Waveform for channel 3.

Returns:

Response from the device after setting the waveforms.

Return type:

str

parse_hex_to_floats_percent(data: str) List[float][source]

Parses a comma-separated string of 4-character hexadecimal values into a list of floats.

The hex values are interpreted as unsigned 16-bit integers and converted to floats.

Parameters:

data (str) – A string of comma-separated 4-character hex values (e.g., “0000,FFFD,FFFD”).

Returns:

A list of float representations of the parsed unsigned integers.

Return type:

List[float]

percent_to_hex(value: float) str[source]

Converts a percentage value (0.0 to 100.0) to a 4-digit hexadecimal string.

shared_types

This module defines enumerations, classes, and data structures for representing device types, status flags, error codes, and related information for NV200 devices.

Classes and Enums:

  • PidLoopMode (Enum): Modes of operation for a PID control loop (open/closed loop).

  • ErrorCode (Enum): Error codes and descriptions for device errors.

  • StatusFlags (IntFlag): Bit flags representing the status register of a device.

  • ModulationSource (Enum): Sources for setpoint modulation.

  • StatusRegister: Class for parsing and representing a 16-bit status register.

  • DeviceError (Exception): Custom exception for device-related errors.

  • TransportType (Enum): Supported transport types (telnet, serial).

  • DetectedDevice (dataclass): Structure for detected device information.

Functionality:

  • Provides enums for device modes, errors, and status flags.

  • Offers utility methods for error code conversion and description lookup.

  • Parses and interprets status register values.

  • Defines a custom exception for device errors.

  • Structures device detection information for network or serial connections.

class AnalogMonitorSource[source]

Bases: Enum

Enum representing sources for SPI monitor return values via MISO.

Each value corresponds to a specific source of data returned over SPI.

ABS_POSITION_ERROR = 4

Absolute position error

CLOSED_LOOP_POS = 0

Position in closed-loop mode

OPEN_LOOP_POS = 5

Position in open-loop mode

PIEZO_CURRENT_1 = 6

Piezo current channel 1

PIEZO_CURRENT_2 = 7

Piezo current channel 2

PIEZO_VOLTAGE = 2

Piezo voltage (controller output)

POSITION_ERROR = 3

Position error

SETPOINT = 1

Setpoint value

class CtrlMode[source]

Bases: Enum

Enumeration for Controller Operation Modes. Defines how the controller operates in various configurations.

ILC_FEEDBACK = 3

Combines feedback with learned feedforward for improved performance.

Type:

ILC Feedback Mode

ILC_FEEDFORWARD = 2

Applies learned feedforward control based on previous iterations.

Type:

ILC Feedforward Mode

ILC_IDENTIFICATION = 1

Used for system identification in Iterative Learning Control.

Type:

ILC Identification Mode

PID = 0

Standard Proportional-Integral-Derivative control.

Type:

PID control mode

class DetectedDevice[source]

Bases: object

Represents a device detected on the network or via serial connection

transport

The transport type used to communicate with the device (e.g., Ethernet, Serial).

Type:

TransportType

identifier

A unique identifier for the device, such as an IP address or serial port name.

Type:

str

mac

The MAC address of the device, if available.

Type:

Optional[str]

device_id

A unique identifier for the device, if available. such as NV200/D_NET

Type:

Optional[str]

device_info

Dictionary with additional information about the device, such as actuator name and serial number.

Type:

Dict[str, str]

__init__(transport: ~nv200.shared_types.TransportType, identifier: str, mac: str | None = None, device_id: str | None = None, device_info: ~typing.Dict[str, str] = <factory>) None
exception DeviceError[source]

Bases: Exception

Custom exception class for handling device-related errors.

error_code

The error code associated with the exception.

Type:

ErrorCode

description

A human-readable description of the error.

Type:

str

Parameters:

error_code (ErrorCode) – An instance of the ErrorCode enum representing the error.

Raises:

ValueError – If the provided error_code is not a valid instance of the ErrorCode enum.

__init__(error_code: ErrorCode)[source]
class DeviceInfo[source]

Bases: object

Represents information about a device, including its transport type, identifier, and optional metadata.

transport

The type of transport used to communicate with the device.

Type:

TransportType

identifier

The primary identifier for the device (e.g., IP address or serial port).

Type:

str

mac

The MAC address of the device, if available.

Type:

Optional[str]

device_id

A unique identifier for the device, if available.

Type:

Optional[str]

__init__(transport_info: ~nv200.shared_types.TransportProtocolInfo, device_id: str | None = None, extended_info: ~typing.Dict[str, str] = <factory>) None
class DiscoverFlags[source]

Bases: Flag

Flags to configure the behavior of the device discovery process.

These flags can be combined using the bitwise OR (|) operator.

DETECT_SERIAL

Enables detection of serial devices.

DETECT_ETHERNET

Enables detection of ethernet devices.

READ_DEVICE_INFO

Enriches discovered devices with additional information such as actuator name and actuator serial number.

ADJUST_COMM_PARAMS

Automatically adjusts communication parameters for discovered devices. This may take some additional time, as it may involve reading and writing to the device or even resetting it.

ALL

Enables all discovery actions (serial, ethernet, and enrichment).

__new__(value)
static flags_for_transport(transport: TransportType | None = None) DiscoverFlags[source]

Maps a TransportType to the appropriate DiscoverFlags.

Parameters:

transport – The transport type (e.g., SERIAL or TELNET)

Returns:

DiscoverFlags corresponding to the selected transport type.

class ErrorCode[source]

Bases: Enum

ErrorCode(Enum):

An enumeration representing various error codes and their corresponding descriptions.

classmethod from_value(value: int)[source]

Convert an integer into an ErrorCode enum member.

classmethod get_description(error_code) str[source]

Retrieves a human-readable description for a given error code.

Parameters:

error_code (int) – The error code for which the description is requested.

Returns:

A string describing the error associated with the provided error code.

If the error code is not recognized, “Unknown error” is returned.

Return type:

str

class ModulationSource[source]

Bases: Enum

Enumeration for setpoint modulation source.

class NetworkEndpoint[source]

Bases: object

Represents a network endpoint identified by a MAC and IP address.

mac

The MAC (Media Access Control) address of the endpoint, typically in the format ‘00:1A:2B:3C:4D:5E’.

Type:

str

ip

The IPv4 or IPv6 address of the endpoint, e.g., ‘192.168.1.100’

Type:

str

__init__(mac: str, ip: str) None
class PCFGains[source]

Bases: NamedTuple

Represents feed-forward control amplification factors for position, velocity, and acceleration.

static __new__(_cls, position: float, velocity: float, acceleration: float)

Create new instance of PCFGains(position, velocity, acceleration)

acceleration: float

Alias for field number 2

position: float

Alias for field number 0

velocity: float

Alias for field number 1

class PIDGains[source]

Bases: NamedTuple

A NamedTuple representing the proportional, integral, and derivative gains for a PID controller.

static __new__(_cls, kp: float, ki: float, kd: float)

Create new instance of PIDGains(kp, ki, kd)

kd: float

Alias for field number 2

ki: float

Alias for field number 1

kp: float

Alias for field number 0

class PidLoopMode[source]

Bases: Enum

PidLoopMode is an enumeration that defines the modes of operation for a PID control loop.

class PostionSensorType[source]

Bases: Enum

Enum representing the type of position sensor used in the actuator.

NONE

No position sensor is connected.

STRAIN_GAUGE

A strain gauge sensor is connected.

CAPACITIVE

A capacitive sensor is connected.

LVDT_INDUCTIVE

An inductive LVDT (Linear Variable Differential Transformer) sensor is connected.

class ProgressCallback[source]

Bases: Protocol

A callback to report progress during long running tasks

Parameters:
  • current_index (int) – The current item being processed (1-based).

  • total_count (int) – The total number of items.

__init__(*args, **kwargs)
class SPIMonitorSource[source]

Bases: Enum

Enum representing sources for SPI monitor return values via MISO.

Each value corresponds to a specific source of data returned over SPI.

ABS_POSITION_ERROR = 5

Absolute position error

CLOSED_LOOP_POS = 1

Position in closed-loop mode

OPEN_LOOP_POS = 6

Position in open-loop mode

PIEZO_CURRENT_1 = 7

Piezo current channel 1

PIEZO_CURRENT_2 = 8

Piezo current channel 2

PIEZO_VOLTAGE = 3

Piezo voltage (controller output)

POSITION_ERROR = 4

Position error

SETPOINT = 2

Setpoint value

TEST_VALUE_0x5A5A = 9

Test value (0x5A5A)

ZERO = 0

0x0000 (constant zero value)

class StatusFlags[source]

Bases: IntFlag

Enum representing the individual status flags within a 16-bit status register.

__new__(value)
static get_sensor_type(value)[source]

Determines the type of sensor based on the sensor bits in the status register.

Parameters:

value – The 16-bit status register value.

Returns:

A string describing the sensor type.

class StatusRegister[source]

Bases: object

A class representing the 16-bit status register of an actuator or amplifier.

__init__(value: int)[source]

Initializes the StatusRegister with a given 16-bit value.

Parameters:

value – The 16-bit status register value.

has_flag(flag: StatusFlags)[source]

Checks if a given status flag is set in the register.

Parameters:

flag – A StatusFlags enum value to check.

Returns:

True if the flag is set, False otherwise.

class TimeSeries[source]

Bases: object

TimeSeries represents waveform data with amplitude values (values) and corresponding sample times (sample_times_ms). It also includes a sample time in milliseconds.

__init__(values: list, sample_time_ms: float)[source]

Initialize the TimeSeries instance with amplitude values and sample time.

Parameters:
  • values (list) – The amplitude values corresponding to the waveform.

  • sample_time_ms (int) – The sample time in milliseconds (sampling interval).

generate_sample_times_ms() Generator[float, None, None][source]

Generator function to return time (sample_times_ms) values as they are requested. This will calculate and yield the corresponding time values based on sample_time_us.

property sample_freq_hz: float

Returns the sample frequency in Hertz (Hz), calculated as the inverse of the sample time.

property sample_period_ms: float

Returns the sample period in milliseconds, which is the same as sample_time_ms. This property is provided for compatibility with other systems that may expect this terminology.

property sample_time_ms: float

Returns the sample time in milliseconds.

property sample_times_ms: list

Return all time (sample_times_ms) values as a list, calculated based on the sample time.

set_value_at_index(index: int, value: float) None[source]

Set the amplitude value at a specific index.

Parameters:
  • index (int) – The index at which to set the value.

  • value (float) – The new amplitude value.

Raises:

IndexError – If the index is out of range.

property values: list

Return the amplitude values (values) as a list.

class TransportProtocolInfo[source]

Bases: object

Represents the protocol information for a transport type.

__init__(transport: TransportType, identifier: str, mac: str | None = None) None
class TransportType[source]

Bases: str, Enum

Enumeration of supported transport types for device communication.

TELNET

Represents the Telnet protocol for network communication.

SERIAL

Represents serial communication (e.g., RS-232).

__new__(value)
class ValueRange[source]

Bases: Generic[ValueRangeType]

An immutable generic value range with a minimum and maximum value.

min

The minimum value of the range.

Type:

ValueRangeType

max

The maximum value of the range.

Type:

ValueRangeType

__init__(min: ValueRangeType, max: ValueRangeType) None
contains(value: ValueRangeType) bool[source]

Check whether a value is within the range.

Parameters:

value (ValueRangeType) – The value to check.

Returns:

True if value is within the range (inclusive), False otherwise.

Return type:

bool

Transport Protocols

transport_protocol

This module defines the transport protocols for communicating with NV200 devices, including Telnet and Serial interfaces.

Classes:

Example

import asyncio
from nv200.device_interface import DeviceClient
from nv200.transport_protocols import SerialProtocol

async def serial_port_auto_detect():
    transport = SerialProtocol()
    client = DeviceClient(transport)
    await client.connect()
    print(f"Connected to device on serial port: {transport.port}")
    await client.close()

if __name__ == "__main__":
    asyncio.run(serial_port_auto_detect())
class TransportProtocol[source]

Bases: ABC

Abstract base class representing a transport protocol interface for a device.

CR = b'\r'
CRLF = b'\r\n'
DEFAULT_TIMEOUT_SECS = 0.6
LF = b'\n'
XOFF = b'\x13'
XON = b'\x11'
__init__()[source]

Initializes the TransportProtocol base class. Subclasses may extend this constructor to initialize protocol-specific state.

abstractmethod async close()[source]

Asynchronously closes the connection or resource associated with this instance.

This method should be used to release any resources or connections that were opened during the lifetime of the instance. Ensure that this method is called to avoid resource leaks.

Raises:

Exception – If an error occurs while attempting to close the resource.

abstractmethod async connect(auto_adjust_comm_params: bool = True, device: PiezoDeviceBase | None = None)[source]

Establishes an asynchronous connection to the NV200 device.

This method is intended to handle the initialization of a connection to the NV200 device. The implementation should include the necessary steps to ensure the connection is successfully established.

Raises:

Exception – If the connection fails or encounters an error.

abstractmethod async flush_input()[source]

Asynchronously flushes or clears the input buffer of the transport protocol.

This method is intended to remove any pending or unread data from the input stream, ensuring that subsequent read operations start with a clean buffer. It is typically used to prevent processing of stale or unwanted data.

abstractmethod get_info() TransportProtocolInfo[source]

Returns metadata about the transport protocol, such as type and identifier.

async read_message(timeout: float = 0.6) str[source]

Asynchronously reads a complete delimited message from the device

Returns:

The response read from the source.

Return type:

str

abstractmethod async read_until(expected: bytes = b'\x11', timeout: float = 0.6) str[source]

Asynchronously reads data from the connection until the specified expected byte sequence is encountered.

Parameters:

expected (bytes, optional) – The byte sequence to read until. Defaults to serial.XON.

Returns:

The data read from the serial connection, decoded as a string, .

Return type:

str

abstractmethod async write(cmd: str)[source]

Sends a command to the NV200 device asynchronously.

Parameters:

cmd (str) – The command string to be sent to the device.

Raises:

Exception – If there is an error while sending the command.

serial_protocol

class SerialProtocol[source]

Bases: TransportProtocol

A class to handle serial communication with an NV200 device using the AioSerial library. .. attribute:: port

The serial port to connect to. Defaults to None. If port is None, the class

type:

str

will try to auto detect the port.
baudrate

The baud rate for the serial connection. Defaults to 115200.

Type:

int

serial

The AioSerial instance for asynchronous serial communication.

Type:

AioSerial

__init__(port: str | None = None, baudrate: int = 115200)[source]

Initializes the NV200 driver with the specified serial port settings.

Parameters:
  • port (str, optional) – The serial port to connect to. Defaults to None. If port is None, the class will try to auto detect the port.

  • baudrate (int, optional) – The baud rate for the serial connection. Defaults to 115200.

async close()[source]

Asynchronously closes the connection or resource associated with this instance.

This method should be used to release any resources or connections that were opened during the lifetime of the instance. Ensure that this method is called to avoid resource leaks.

Raises:

Exception – If an error occurs while attempting to close the resource.

async connect(auto_adjust_comm_params: bool = True, device: PiezoDeviceBase | None = None)[source]

Establishes an asynchronous connection to the NV200 device using the specified serial port settings.

This method initializes the serial connection with the given port, baud rate, and flow control settings. If the port is not specified, it attempts to automatically detect the NV200 device’s port. If the device cannot be found, a RuntimeError is raised.

Raises:

RuntimeError – If the NV200 device cannot be detected or connected to.

async detect_port(device: PiezoDeviceBase) str | None[source]

Asynchronously detects and configures the serial port for the NV200 device.

This method scans through all available serial ports to find one with a manufacturer matching “FTDI”. If such a port is found, it attempts to communicate with the device to verify if it is an NV200 device. If the device is successfully detected, the port is configured and returned.

Returns:

The device name of the detected port if successful, otherwise None.

Return type:

str

async static discover_devices(flags: DiscoverFlags) List[DetectedDevice][source]

Asynchronously discovers all devices connected via serial interface.

Returns:

A list of serial port strings where a device has been detected.

Return type:

list

async flush_input()[source]

Discard all available input within a short timeout window.

get_info() TransportProtocolInfo[source]

Returns metadata about the transport protocol, such as type and identifier.

property port: str

Returns the serial port the device is connected to

async read_until(expected: bytes = b'\x11', timeout: float = 0.6) str[source]

Asynchronously reads data from the connection until the specified expected byte sequence is encountered.

Parameters:

expected (bytes, optional) – The byte sequence to read until. Defaults to serial.XON.

Returns:

The data read from the serial connection, decoded as a string, .

Return type:

str

property serial: AioSerial | None

Provides access to the internal AioSerial interface. :returns: The internal AioSerial instance. :rtype: AioSerial

async write(cmd: str)[source]

Sends a command to the NV200 device asynchronously.

Parameters:

cmd (str) – The command string to be sent to the device.

Raises:

Exception – If there is an error while sending the command.

telnet_protocol

class TelnetProtocol[source]

Bases: TransportProtocol

TelnetTransport is a class that implements a transport protocol for communicating with piezosystem devices over Telnet. It provides methods to establish a connection, send commands, read responses, and close the connection.

property MAC: str

Returns the MAC address.

__init__(host: str = '', port: int = 23, MAC: str = '')[source]

Initializes the transport protocol.

Parameters:
  • host (str, optional) – The hostname or IP address of the NV200 device. Defaults to None.

  • port (int, optional) – The port number to connect to. Defaults to 23.

  • MAC (str, optional) – The MAC address of the NV200 device. Defaults to None.

async close()[source]

Asynchronously closes the connection or resource associated with this instance.

This method should be used to release any resources or connections that were opened during the lifetime of the instance. Ensure that this method is called to avoid resource leaks.

Raises:

Exception – If an error occurs while attempting to close the resource.

async static configure_flow_control_mode(host: str) bool[source]

Configures the flow control mode for the device to pass XON/XOFF characters to host

async connect(auto_adjust_comm_params: bool = True, device: PiezoDeviceBase | None = None)[source]

Establishes a connection to a Lantronix device.

This asynchronous method attempts to connect to a Lantronix device using either the provided MAC address or by discovering devices on the network.

  • If self.host is None and self.MAC is provided, it discovers the device’s IP address using the MAC address.

  • If both self.host and self.MAC are None, it discovers all available Lantronix devices on the network and selects the first one.

Once the device’s IP address is determined, it establishes a Telnet connection to the device using the specified host and port.

Raises:

RuntimeError – If no devices are found during discovery.

async classmethod discover_devices(flags: DiscoverFlags) List[DetectedDevice][source]

Asynchronously discovers all devices connected via ethernet interface

Returns:

A list of dictionaries containing device information (IP and MAC addresses).

Return type:

list

async flush_input()[source]

Discard all available input within a short timeout window.

get_info() TransportProtocolInfo[source]

Returns metadata about the transport protocol, such as type and identifier.

property host: str

Returns the host address.

async is_xon_xoff_forwared_to_host() bool[source]

Checks if XON/XOFF flow control is forwarded to the host.

This method sends a command to the device and checks the response to determine if XON/XOFF flow control is enabled. The detection is based on whether the response starts with the byte sequence “XON/XOFF”.

Returns:

True if XON/XOFF flow control is forwarded to the host, False otherwise.

Return type:

bool

async read_until(expected: bytes = b'\x11', timeout: float = 0.6) str[source]

Asynchronously reads data from the connection until the specified expected byte sequence is encountered.

Parameters:

expected (bytes, optional) – The byte sequence to read until. Defaults to serial.XON.

Returns:

The data read from the serial connection, decoded as a string, .

Return type:

str

async write(cmd: str)[source]

Sends a command to the NV200 device asynchronously.

Parameters:

cmd (str) – The command string to be sent to the device.

Raises:

Exception – If there is an error while sending the command.

Data and Signal Modules

data_recorder

This module provides access to the NV200 data recorder functionality.

class DataRecorder[source]

Bases: object

Data recorder class provides an interface for NV200 data recorder. The data recorder consists of two memory banks that are written to in parallel. In this way, two individual signals can be stored synchronously.

ALL_CHANNELS = -1
BUFFER_READ_TIMEOUT_SECS = 6
class ChannelRecordingData[source]

Bases: TimeSeries

WaveformData is a NamedTuple that represents waveform data.

x_time

A list of time values (in seconds) corresponding to the waveform.

Type:

List[float]

y_values

A list of amplitude values corresponding to the waveform.

Type:

List[float]

sample_time_us

The sampling time in microseconds.

Type:

int

sample_factor

A factor used to calculate the sample time from the base sample time.

Type:

int

__init__(values: list, sample_time_ms: int, source: DataRecorderSource)[source]

Initialize the ChannelData instance with amplitude values, sample time, and source.

Parameters:
  • values (list) – The amplitude values corresponding to the waveform.

  • sample_time_ms (int) – The sample time in milliseconds (sampling interval).

  • source (str) – The data recorder source

property source: DataRecorderSource

Read-only property to get the maximum sample buffer size for the data recorder.

INFINITE_RECORDING_DURATION = 0
NV200_RECORDER_BUFFER_SIZE = 6144
NV200_RECORDER_SAMPLE_RATE_HZ = 20000
class RecorderParam

Bases: tuple

RecorderParam(bufsize, stride, sample_freq)

static __new__(_cls, bufsize, stride, sample_freq)

Create new instance of RecorderParam(bufsize, stride, sample_freq)

bufsize

Alias for field number 0

sample_freq

Alias for field number 2

stride

Alias for field number 1

__init__(device: NV200Device)[source]

Initializes the data recorder with the specified NV200 device.

Parameters:

device (NV200Device) – The NV200 device instance to be used by the data recorder.

_dev

Stores the provided NV200 device instance.

Type:

NV200Device

_sample_rate

The sample rate for data recording, initially set to None.

Type:

int | None

classmethod get_sample_period_ms_for_duration(milliseconds: float) float[source]

Calculates the sample period in seconds that is possible with the specified duration in milliseconds.

classmethod get_sample_rate_for_duration(milliseconds: float) float[source]

Calculates the sample rate that is possible with the specified duration in milliseconds.

async is_recording() bool[source]

Check if the recoder is currently recording.

property max_sample_buffer_size: int

Read-only property to get the maximum sample buffer size for the data recorder.

async read_recorded_data() List[ChannelRecordingData][source]

Asynchronously reads recorded data for two channels and returns it as a list.

This method retrieves the recorded data for channel 0 and channel 1 by calling read_recorded_data_of_channel for each channel. The results are returned as a list of ChannelRecordingData objects.

Returns:

A list containing the recorded data for channel 0 and channel 1.

Return type:

List[ChannelRecordingData]

async read_recorded_data_of_channel(channel: int) ChannelRecordingData[source]

Asynchronously reads recorded data from a specified channel.

Parameters:

channel (int) – The channel number from which to read the recorded data.

Returns:

An object containing the recording source as a string and a list of floating-point numbers representing the recorded data.

Return type:

ChannelRecordingData

Raises:

Any exceptions raised by the underlying device communication methods.

async set_autostart_mode(mode: RecorderAutoStartMode)[source]

Sets the autostart mode of the data recorder.

async set_data_source(channel: int, source: DataRecorderSource)[source]

Sets the channel and the source of data to be stored in the data recorder channel.

async set_recorder_stride(stride: int)[source]

Sets the recorder stride.

async set_recording_duration_ms(milliseconds: float) RecorderParam[source]

Sets the recording duration in milliseconds and adjusts the recorder parameters accordingly.

This method calculates the appropriate stride, sample rate, and buffer size based on the specified recording duration and the recorder’s configuration. It then updates the recorder settings to match these calculated values.

Parameters:

milliseconds (float) – The desired recording duration in milliseconds.

Returns:

An object containing the updated buffer length, stride, and sample rate.

Return type:

RecorderParam

Raises:

ValueError – If the calculated buffer size or stride is invalid.

async set_sample_buffer_size(buffer_size: int)[source]

Sets the sample buffer size for each of the two data recorder channels (0..6144) A value of 0 means infinite loop over maximum length until recorder is stopped manually. If you would like to have an infinite loop, use the constant DataRecorder.INFINITE_RECORDING_DURATION. You can get the maximum buffer size using the max_sample_buffer_size property.

async start_recording(start: bool = True)[source]

Starts / stops the data recorder.

async stop_recording()[source]

Stops the recording process by invoking the start_recording method with False.

async wait_until_finished(timeout_s: float = 10.0)[source]

Waits asynchronously until the recording process has finished or the specified timeout is reached.

Parameters:

timeout_s (float) – The maximum time to wait in seconds. Defaults to 10.0.

Returns:

True if the recording process finished within the timeout, False otherwise.

Return type:

bool

Raises:

asyncio.TimeoutError – If the timeout is reached before the recording process finishes.

class DataRecorderSource[source]

Bases: Enum

Enum representing the source of data to be stored in the data recorder channel, ignoring the buffer (A or B) distinction.

ABS_POSITION_ERROR = 4

Absolute position error

PIEZO_CURRENT_1 = 6

Piezo current 1 (A)

PIEZO_CURRENT_2 = 7

Piezo current 2 (A)

PIEZO_POSITION = 0

Piezo position (μm or mrad)

PIEZO_VOLTAGE = 2

Piezo voltage (V)

POSITION_ERROR = 3

Position error

SETPOINT = 1

Setpoint (μm or mrad)

classmethod from_value(value: int)[source]
class RecorderAutoStartMode[source]

Bases: Enum

Enum representing the autostart mode of the data recorder.

OFF = 0

Autostart off - start recording manually with recorder start command

START_ON_SET_COMMAND = 1

Start on set-command

START_ON_WAVEFORM_GEN_RUN = 2

Start on waveform generator run

classmethod get_mode(value: int) RecorderAutoStartMode[source]

Given a mode value, return the corresponding RecorderAutoStartMode enum.

waveform_generator

WaveformGenerator module for controlling waveform generation on a connected NV200 device.

This module defines the WaveformGenerator class, which provides methods to configure and control waveform generation, including the ability to generate sine waves, set cycles, adjust sampling times, and manage waveform buffers. It supports asynchronous interaction with the device for real-time control of waveform generation.

Classes:
  • WaveformGenerator: Manages waveform generation and configuration on the NV200 device.

  • WaveformData: Represents waveform data with time, amplitude, and sample time.

class WaveformGenerator[source]

Bases: object

WaveformGenerator is a class responsible for generating waveforms using a connected device.

class WaveformData[source]

Bases: TimeSeries

WaveformData is a NamedTuple that represents waveform data.

property cycle_time_ms

Returns the cycle of a single cycle in milliseconds.

property sample_factor

Returns the sample factor used to calculate the sample time from the base sample time.

__init__(device: NV200Device)[source]

Initializes the WaveformGenerator instance with the specified device client.

Parameters:

device (DeviceClient) – The device client used for communication with the hardware.

async configure_waveform_loop(start_index: int, loop_start_index: int, loop_end_index: int)[source]

Sets the start and end indices for the waveform loop. The start index is the index where the waveform generator starts when it is started. The loop start index is the index where the waveform generator starts in the next cycle and the loop end index is the index where the waveform generator jumps to the next cycle.

classmethod generate_constant_wave(freq_hz: float, constant_level: float) WaveformData[source]

Generates a constant waveform at a specified frequency and level. This method creates a waveform where all sample values are set to a constant level, sampled at intervals determined by the specified frequency.

Parameters:
  • freq_hz (float) – The frequency in Hertz at which to generate the waveform samples.

  • constant_level (float) – The constant value for all samples in the waveform.

Returns:

An object containing the generated constant waveform values and the sample time in milliseconds.

Return type:

WaveformData

classmethod generate_sine_wave(freq_hz: float, low_level: float, high_level: float, phase_shift_rad: float = 0.0) WaveformData[source]

Generates a sine wave based on the specified frequency and amplitude levels.

Parameters:
  • freq_hz (float) – The frequency of the sine wave in Hertz (Hz).

  • low_level (float) – The minimum value (low level) of the sine wave.

  • high_level (float) – The maximum value (high level) of the sine wave.

Returns:

An object containing the generated sine wave data, including:
  • x_time (List[float]): A list of time points in ms corresponding to the sine wave samples.

  • y_values (List[float]): A list of amplitude values for the sine wave at each time point.

  • sample_time_us (float): The time interval between samples in microseconds (µs).

Return type:

WaveformData

Notes

  • The method calculates an optimal sample time based on the desired frequency and the hardware’s base sample time.

  • The buffer size is adjusted to ensure the generated waveform fits within one period of the sine wave.

  • The sine wave is scaled and offset to match the specified low and high levels.

classmethod generate_square_wave(freq_hz: float, low_level: float, high_level: float, phase_shift_rad: float = 0.0, duty_cycle: float = 0.5) WaveformData[source]

Generates a square wave (or PWM waveform) using NumPy for efficient computation.

Parameters:
  • freq_hz (float) – Frequency of the waveform in Hz.

  • low_level (float) – Output level during the “low” part of the cycle.

  • high_level (float) – Output level during the “high” part of the cycle.

  • duty_cycle (float, optional) – Duty cycle as a fraction between 0.0 and 1.0. Defaults to 0.5 (i.e., 50%).

  • phase_shift_rad (float, optional) – Phase shift in radians. Defaults to 0.0.

Returns:

An object containing:
  • values (List[float]): Amplitude values of the waveform.

  • sample_time_ms (float): Time between samples in milliseconds.

Return type:

WaveformData

classmethod generate_time_samples_array(freq_hz: float) ndarray[source]

Generates a NumPy array of time samples (in milliseconds) for one period of a waveform at the specified frequency.

Parameters:

freq_hz (float) – The frequency of the waveform in Hertz.

Returns:

Time samples (in milliseconds).

Return type:

np.ndarray

classmethod generate_time_samples_list(freq_hz: float) List[float][source]

Generates a list of time samples (in milliseconds) for one period of a waveform at the specified frequency. Sampling is adjusted based on hardware base sample time and buffer constraints.

Parameters:

freq_hz (float) – The frequency of the waveform in Hertz.

Returns:

Time samples (in milliseconds).

Return type:

List[float]

classmethod generate_triangle_wave(freq_hz: float, low_level: float, high_level: float, phase_shift_rad: float = 0.0) WaveformData[source]

Generates a triangle wave based on the specified frequency and amplitude levels.

Parameters:
  • freq_hz (float) – The frequency of the triangle wave in Hertz (Hz).

  • low_level (float) – The minimum value (low level) of the triangle wave.

  • high_level (float) – The maximum value (high level) of the triangle wave.

  • phase_shift_rad (float, optional) – Phase shift in radians. Defaults to 0.0.

Returns:

An object containing the generated triangle wave data, including:
  • values (List[float]): A list of amplitude values for the triangle wave at each time point.

  • sample_time_ms (float): The time interval between samples in milliseconds (ms).

Return type:

WaveformData

Notes

  • The method calculates an optimal sample time based on the desired frequency and the

hardware’s base sample time. - The waveform is normalized between -1 and 1, then scaled and offset to fit between low_level and high_level. - The waveform is generated over one full period.

classmethod generate_waveform(waveform_type: WaveformType, freq_hz: float, low_level: float, high_level: float, phase_shift_rad: float = 0.0, duty_cycle: float = 0.5) WaveformData[source]

Generates a waveform based on the specified type and parameters.

Parameters:
  • waveform_type (Waveform) – The type of waveform to generate (SINE, TRIANGLE, SQUARE).

  • freq_hz (float) – Frequency of the waveform in Hertz.

  • low_level (float) – Minimum value of the waveform.

  • high_level (float) – Maximum value of the waveform.

  • phase_shift_rad (float, optional) – Phase shift in radians. Defaults to 0.0.

  • duty_cycle (float, optional) – Duty cycle for square wave. Defaults to 0.5.

Returns:

The generated waveform data.

Return type:

WaveformData

async is_running() bool[source]

Checks if the waveform generator is currently running.

Returns:

True if the waveform generator is running, False otherwise.

Return type:

bool

async set_cycles(cycles: int = 0)[source]

Sets the number of cycles to run. - WaveformGenerator.NV200_INFINITE_CYCLES - 0 = infinitely - 1…65535

async set_loop_end_index(end_index: int)[source]

Sets the end index for arbitrary waveform generator output. The loop end index is the index where the waveform generator jumps to the next cycle or finishes if only one cycle is used.

async set_loop_start_index(start_index: int)[source]

Sets the start index for the waveform generator loop. If you use multiple cycles, the loop start index is the index defines the index where the waveform generator starts in the next cycle.

async set_output_sampling_time(sampling_time: int)[source]

Sets the output sampling time for the waveform generator. The output sampling time can be given in multiples of 50 µs from 1 * 50µs to 65535 * 50µs. If the sampling time is not a multiple of 50, it will be rounded to the nearest multiple of 50µs. The calculated sampling time is returned in microseconds.

Returns:

The set sampling time in microseconds.

Return type:

int

Note: Normally you do not need to set the sampling time manually because it is set automatically calculated when the waveform is generated.

async set_start_index(index: int)[source]

Sets the offset index when arbitrary waveform generator gets started. That means after the start() function is called, the arbitrary waveform generator starts at the index defined by set_start_index() and runs until the index defined by set_loop_end_index(). In all successive cycles, the arbitrary waveform generator starts at set_loop_start_index(). This is repeated until the number of cycles reaches the value given by set_cycles().

async set_waveform(waveform: WaveformData, unit: WaveformUnit = WaveformUnit.PERCENT, adjust_loop: bool = True, on_progress: ProgressCallback | None = None)[source]

Sets the waveform data in the device. The WaveformData object should contain the waveform values and the sample time.

Parameters:
  • waveform (WaveformData) – The waveform data to be set.

  • unit (WaveformUnit) – The unit of the waveform values. Defaults to WaveformUnit.PERCENT.

  • adjust_loop (bool) – If True, adjusts the loop indices based on the waveform data, if false, the loop indices are not adjusted. If the loop indices are adjusted, then they will be set to the following value: - start_index = 0 (first waveform value) - loop_start_index = 0 (first waveform value) - loop_end_index = last waveform value

  • on_progress (Optional[ProgressCallback]) – Optional callback for progress updates.

Raises:

ValueError – If the waveform data is invalid.

async set_waveform_buffer(buffer: list[float], unit: WaveformUnit = WaveformUnit.PERCENT, on_progress: ProgressCallback | None = None)[source]

Writes a full waveform buffer to the device by setting each value using set_waveform_value. The buffer should contain waveform values in percent (0-100). In closed loop mode, the value is interpreted as a percentage of the position range (i.e. 0 - 80 mra) and in open loop mode, the value is interpreted as a percentage of the voltage range (i.e. -20 - 130 V).

Parameters:
  • buffer (list of float) – The waveform values in percent (0-100).

  • unit (WaveformUnit) – The unit of the waveform values. Defaults to WaveformUnit.PERCENT.

  • on_progress (Optional[ProgressCallback]) – Optional callback for progress updates.

Raises:

ValueError – If the buffer size exceeds the maximum buffer length.

async set_waveform_from_samples(time_samples: Sequence[float] | ndarray, values: Sequence[float] | ndarray, unit: WaveformUnit = WaveformUnit.PERCENT, adjust_loop: bool = True)[source]

Sets the waveform data in the device from separate time samples and values. The waveform data should contain the time samples in milliseconds and the corresponding amplitude values. in percent (0-100). In closed loop mode, the value is interpreted as a percentage of the position range (i.e. 0 - 80 mra) and in open loop mode, the value is interpreted as a percentage of the voltage range (i.e. -20 - 130 V).

Parameters:
  • time_samples (Sequence[float] or np.ndarray) – Time samples in milliseconds.

  • values (Sequence[float] or np.ndarray) – Corresponding waveform amplitude values in percent (0-100).

  • unit (WaveformUnit, optional) – The unit of the waveform values. Defaults to WaveformUnit.PERCENT.

  • adjust_loop (bool) – If True, adjusts loop indices based on data length.

Raises:

ValueError – If inputs are invalid or lengths mismatch.

async set_waveform_value_percent(index: int, percent: float)[source]

Sets the value of the waveform at the specified index in percent from 0 - 100% In closed loop mode, the value is interpreted as a percentage of the position range (i.e. 0 - 80 mra) and in open loop mode, the value is interpreted as a percentage of the voltage range (i.e. -20 - 130 V).

async start(start: bool = True, cycles: int = -1, start_index: int = -1)[source]

Starts / stops the waveform generator

Parameters:
  • start (bool, optional) – If True, starts the waveform generator. If False, stops it. Defaults to True.

  • cycles (int, optional) – The number of cycles to run the waveform generator. If set to -1, the value configured via set_cycles() will be used.

async stop()[source]

Stops the waveform generator. This is equivalent to calling start(False).

async wait_until_finished(timeout_s: float = 10.0)[source]

Waits until the waveform generator is finished running.

Parameters:

timeout_s (float) – The maximum time to wait in seconds. Defaults to 10 seconds.

Returns:

True if the waveform generator finished running, False if timed out.

Return type:

bool

class WaveformType[source]

Bases: Enum

Enumeration for different waveform types supported by the generator.

class WaveformUnit[source]

Bases: Enum

Enumeration for different waveform units used in the generator.

calculate_sampling_time_ms(time_samples: Sequence[float] | ndarray) float[source]

Calculates the sampling time in milliseconds from a sequence of time samples.

Parameters:

time_samples (Union[Sequence[float], np.ndarray]) – A list or NumPy array of time samples in milliseconds.

Returns:

The sampling time in milliseconds.

Return type:

float

Raises:

ValueError – If the sequence has fewer than 2 time samples.

analysis

class ResonanceAnalyzer[source]

Bases: object

A utility class for measuring and analyzing the resonance behavior of a piezoelectric system.

This class encapsulates both the hardware interaction needed to acquire an impulse response from the device and the signal processing needed to compute the resonance spectrum.

__init__(device: NV200Device)[source]

Initializes the ResonanceAnalyzer with the required hardware components.

Parameters:
  • device – The device object used to restore parameters and get voltage range.

  • recorder – The data recorder used to capture the piezo response.

  • waveform_generator – The waveform generator used to generate the impulse.

static compute_resonance_spectrum(signal: ndarray, sample_freq: float) Tuple[ndarray, ndarray, float][source]

Computes the frequency spectrum of a signal and extracts the resonance frequency.

Parameters:
  • signal – The time-domain signal (e.g., piezo position) as a NumPy array.

  • sample_freq – The sampling frequency in Hz.

Returns:

  • Frequencies (xf): NumPy array of frequency bins.

  • Spectrum magnitude (yf): NumPy array of FFT magnitudes.

  • Resonance frequency (res_freq): Peak frequency in Hz.

Return type:

Tuple containing

async measure_impulse_response(baseline_voltage: float) Tuple[ndarray, float][source]

Measures the impulse response of the system by generating a waveform and recording the resulting piezo position signal.

Returns:

  • The recorded piezo signal as a NumPy array.

  • The sample frequency in Hz.

Return type:

Tuple containing

Utilities

utils

async wait_until(condition_func: Callable[[], Awaitable], check_func: Callable[[any], bool], poll_interval_s: float = 0.1, timeout_s: float | None = None) bool[source]

Wait until an asynchronous condition function returns a value that satisfies a check function.

Parameters:
  • condition_func (Callable[[], Awaitable]) – An async function returning a value of any type.

  • check_func (Callable[[any], bool]) – A function that checks if the value returned by condition_func satisfies the condition.

  • poll_interval_s (float) – Time in seconds to wait between condition checks.

  • timeout_s (float | None) – Optional timeout in seconds. If None, wait indefinitely.

Returns:

True if the condition matched within the timeout, False otherwise.

Return type:

bool

Example

>>> async def get_result():
...     return 3
>>> await wait_until(get_result, check_func=lambda x: x > 2, timeout_s=5.0)
True