API Reference
The NV200 library API consists of the following modules:
Device Discovery and Connection
device_discovery — The NV200 device discovery module.
connection_utils — Helper functions for device connection.
device_factory — Factory for creating device instances from discovery results.
Device Abstractions
device_base — Generic piezosystem device base class.
nv200_device — High-level async client for NV200 piezo controllers.
shared_types — Shared data structures and enums.
Transport Protocols
transport_protocol — Base class for the device transport protocol.
serial_protocol — Serial communication implementation.
telnet_protocol — Telnet protocol over Ethernet.
Data and Signal Modules
data_recorder — Module for recording experimental data.
waveform_generator — Interface to arbitrary waveform generator.
analysis — Module for resonance analysis
Utilities
utils — General utility functions and classes.
Device Discovery and Connection
device_discovery
This module provides asynchronous device discovery functionality for the NV200 library.
It concurrently scans for devices available via Telnet and Serial protocols, returning
a unified list of detected devices. Each detected device is represented by a DetectedDevice
instance, annotated with its transport type (TELNET or SERIAL), identifier (such as IP address
or serial port), and optionally a MAC address.
- async discover_devices(flags: ~nv200.shared_types.DiscoverFlags = <DiscoverFlags.ALL_INTERFACES: 3>, device_class: ~typing.Type[~nv200.device_base.PiezoDeviceBase] | None = None) List[DetectedDevice] [source]
Asynchronously discovers devices on available interfaces based on the specified discovery flags and optional device class.
The discovery process can be customized using flags to enable or disable:
- DiscoverFlags.DETECT_ETHERNET: detect devices connected via Ethernet
- DiscoverFlags.DETECT_SERIAL: detect devices connected via Serial
- DiscoverFlags.READ_DEVICE_INFO: enrich device information with additional details such as actuator name and actuator serial number
- Parameters:
flags (DiscoverFlags, optional) – Flags indicating which interfaces to scan and whether to read device info. Defaults to DiscoverFlags.ALL_INTERFACES.
device_class (Optional[Type[PiezoDeviceBase]], optional) – If specified, only devices matching this class will be returned. Also ensures device info is read.
- Returns:
A list of detected devices, optionally enriched with detailed information and filtered by device class.
- Return type:
List[DetectedDevice]
- Raises:
Any exceptions raised by underlying protocol discovery or enrichment functions.
Notes
Device discovery is performed in parallel for supported interfaces (Ethernet, Serial).
If READ_DEVICE_INFO is set, each detected device is enriched with additional information.
If device_class is specified, only devices matching the class’s DEVICE_ID are returned.
Examples
Discover all devices on all interfaces
>>> devices = await nv200.device_discovery.discover_devices(DiscoverFlags.ALL_INTERFACES | DiscoverFlags.READ_DEVICE_INFO)
Discover NV200 devices connected via the serial interface
>>> devices = await nv200.device_discovery.discover_devices(DiscoverFlags.DETECT_SERIAL | DiscoverFlags.READ_DEVICE_INFO, NV200Device)
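The flag combination, parallel scanning, and class filtering described in the notes can be illustrated with a self-contained sketch. Everything in it is a stand-in rather than the library's implementation: only ALL_INTERFACES = 3 comes from the signature above, while the other flag values, FakeDetectedDevice, scan_serial, scan_ethernet, and discover are hypothetical names chosen for illustration.

```python
import asyncio
from dataclasses import dataclass
from enum import IntFlag
from typing import List, Optional


class DiscoverFlags(IntFlag):
    # Stand-in enum: only ALL_INTERFACES == 3 is taken from the signature above.
    DETECT_SERIAL = 1       # assumed value
    DETECT_ETHERNET = 2     # assumed value
    READ_DEVICE_INFO = 4    # assumed value
    ALL_INTERFACES = DETECT_SERIAL | DETECT_ETHERNET


@dataclass
class FakeDetectedDevice:
    transport: str
    identifier: str
    device_id: Optional[str] = None  # filled in only when device info is read


async def scan_serial() -> List[FakeDetectedDevice]:
    await asyncio.sleep(0.01)  # pretend to probe serial ports
    return [FakeDetectedDevice("SERIAL", "COM3")]


async def scan_ethernet() -> List[FakeDetectedDevice]:
    await asyncio.sleep(0.01)  # pretend to send a network discovery request
    return [FakeDetectedDevice("TELNET", "192.168.102.3")]


async def discover(flags: DiscoverFlags,
                   device_id: Optional[str] = None) -> List[FakeDetectedDevice]:
    # Start one scan per requested interface and run them concurrently.
    scans = []
    if flags & DiscoverFlags.DETECT_SERIAL:
        scans.append(scan_serial())
    if flags & DiscoverFlags.DETECT_ETHERNET:
        scans.append(scan_ethernet())
    groups = await asyncio.gather(*scans)
    devices = [dev for group in groups for dev in group]

    # Filtering by class needs the device ID, so it implies reading device info.
    if device_id is not None or flags & DiscoverFlags.READ_DEVICE_INFO:
        for dev in devices:
            dev.device_id = "NV200/D_NET"  # every fake device answers as an NV200
    if device_id is not None:
        devices = [dev for dev in devices if dev.device_id == device_id]
    return devices


if __name__ == "__main__":
    print(asyncio.run(discover(DiscoverFlags.ALL_INTERFACES)))
```

Because the scans are awaited with asyncio.gather, the total time is roughly that of the slowest interface rather than the sum of both.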
device_factory
- create_device_from_detected_device(detected_device: DetectedDevice) PiezoDeviceBase [source]
Creates a device object from the given DetectedDevice parameters.
- create_device_from_id(device_id: str, *args, **kwargs) PiezoDeviceBase [source]
Creates and returns an instance of a device class corresponding to the given device ID.
- Parameters:
device_id (str) – The identifier for the device model to instantiate.
*args – Positional arguments to pass to the device class constructor.
**kwargs – Keyword arguments to pass to the device class constructor.
- Returns:
An instance of the device class associated with the given device ID.
- Return type:
PiezoDeviceBase
- Raises:
ValueError – If the provided device_id is not supported or not found in the registry.
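A registry-based factory of this kind can be sketched as follows. This is a minimal illustration under assumptions, not the library's code: the _REGISTRY dictionary, the register decorator, and the FakeNV200 class are hypothetical, and only the ValueError behavior and the DEVICE_ID string come from this reference.

```python
from typing import Dict

# Hypothetical registry mapping device ID strings to device classes.
_REGISTRY: Dict[str, type] = {}


def register(cls):
    """Class decorator that records a device class under its DEVICE_ID."""
    _REGISTRY[cls.DEVICE_ID] = cls
    return cls


class DeviceBase:
    DEVICE_ID = None

    def __init__(self, transport=None):
        self.transport = transport


@register
class FakeNV200(DeviceBase):
    DEVICE_ID = "NV200/D_NET"


def create_from_id(device_id: str, *args, **kwargs) -> DeviceBase:
    """Instantiate the class registered for device_id, forwarding all arguments."""
    try:
        cls = _REGISTRY[device_id]
    except KeyError:
        raise ValueError(f"Unsupported device ID: {device_id!r}") from None
    return cls(*args, **kwargs)


if __name__ == "__main__":
    device = create_from_id("NV200/D_NET", transport="dummy-transport")
    print(type(device).__name__)
```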
connection_utils
- async connect_to_detected_device(detected_device: DetectedDevice, auto_adjust_comm_params: bool = False) PiezoDeviceBase [source]
Connects to a device using the provided DetectedDevice instance.
- Parameters:
detected_device (DetectedDevice) – The detected device to connect to.
- Returns:
An instance of PiezoDeviceBase connected to the specified device.
- Return type:
PiezoDeviceBase
- async connect_to_single_device(device_class: Type[PiezoDeviceType], transport_type: TransportType | None = None, interface_or_address: str = None) PiezoDeviceType [source]
Convenience function to quickly connect to a single device.
Use this function if you have exactly one device connected and want to connect to it directly. If no transport type is specified, it attempts to connect using the serial transport first and, if no devices are found, then tries the Telnet transport. The function does not verify that the right device is connected; it simply connects to the first device found. Use it only if a single device is connected to your system.
If no transport type is specified, the function will perform auto discovery of devices. If transport type is specified, but no interface_or_address is provided, it will perform auto discovery of devices for that transport type. If both transport type and interface_or_address are specified, it will attempt to connect directly to the specified device.
- Parameters:
transport_type (TransportType, optional) – The type of transport to use (e.g., Serial, Telnet). If not specified, it will attempt to connect using serial transport first, then Telnet.
interface_or_address (str, optional) – The serial port, IP address, or MAC address to connect to. If not specified, device discovery is performed. This parameter is only used if the transport type is specified. If no transport type is specified, it will attempt to discover devices automatically.
- Returns:
An instance of the given device class (PiezoDeviceType) connected to the device.
- Return type:
PiezoDeviceType
Examples
Auto Discovery
>>> device = await nv200.connection_utils.connect_to_single_device(NV200Device)
Serial Port Auto Discovery
>>> device = await nv200.connection_utils.connect_to_single_device(NV200Device, TransportType.SERIAL)
Ethernet Auto Discovery
>>> device = await nv200.connection_utils.connect_to_single_device(NV200Device, TransportType.TELNET)
Connect to specific MAC address
>>> device = await nv200.connection_utils.connect_to_single_device(NV200Device, TransportType.TELNET, "00:80:A3:79:C6:18")
Connect to specific IP address
>>> device = await nv200.connection_utils.connect_to_single_device(NV200Device, TransportType.TELNET, "192.168.102.3")
Connect to specific serial port
>>> device = await nv200.connection_utils.connect_to_single_device(NV200Device, TransportType.SERIAL, "COM3")
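The three cases described for connect_to_single_device (no transport, transport only, transport plus address) amount to a small decision table. The sketch below only models that decision; plan_connection and its tuple format are hypothetical helpers for illustration and are not part of the library.

```python
from typing import List, Optional, Tuple

# Each step is (action, transport, target); target is None for discovery steps.
Step = Tuple[str, str, Optional[str]]


def plan_connection(transport: Optional[str] = None,
                    interface_or_address: Optional[str] = None) -> List[Step]:
    """Return the ordered steps the convenience function is described to take."""
    if transport is None:
        # Without a transport type the address is ignored: serial first, then Telnet.
        return [("discover", "SERIAL", None), ("discover", "TELNET", None)]
    if interface_or_address is None:
        # Transport given, no address: discovery restricted to that transport.
        return [("discover", transport, None)]
    # Both given: connect directly without discovery.
    return [("connect", transport, interface_or_address)]


if __name__ == "__main__":
    print(plan_connection())
    print(plan_connection("TELNET"))
    print(plan_connection("SERIAL", "COM3"))
```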
Device Abstractions
device_base
Provides classes and enumerations for communicating with and interpreting responses from NV200 devices.
This module includes an asynchronous client for issuing commands and parsing responses from NV200 devices over supported transport protocols (e.g., serial, Telnet).
- class PiezoDeviceBase[source]
Bases:
object
Generic piezosystem device base class.
PiezoDeviceBase provides an asynchronous interface for communicating with a piezoelectric device over various transport protocols (such as serial or telnet). It encapsulates low-level device commands, response parsing, synchronization mechanisms, and optional command result caching.
This class is intended to be subclassed by concrete device implementations (e.g., NV200Device), which define specific device behaviors and cacheable commands.
Concrete device classes support caching of command parameters/values. This mechanism is designed to reduce frequent read access over the physical communication interface by serving previously retrieved values from a local cache. Since each read operation over the interface introduces latency, caching can significantly improve the performance of parameter access, particularly in scenarios like graphical user interfaces where many values are queried repeatedly.
Note
Caching should only be used if it is guaranteed that no other external application modifies the device state in parallel. For example, if access is made via the Python library over a Telnet connection, no other software (e.g., via a serial interface) should modify the same parameters concurrently. In such multi-access scenarios, caching can lead to inconsistent or outdated data. To ensure correctness, disable caching globally by setting the class variable CMD_CACHE_ENABLED to False.
- CMD_CACHE_ENABLED
Class-level flag that controls whether command-level caching is enabled. When set to True (default), values read from or written to the device for cacheable commands will be stored and retrieved from an internal cache. Setting this to False disables the caching behavior globally for all instances unless explicitly overridden at the instance level.
- Type:
bool
- CACHEABLE_COMMANDS: set[str] = {}
- CMD_CACHE_ENABLED = True
- DEFAULT_TIMEOUT_SECS = 0.6
- DEVICE_ID = None
- __init__(transport: TransportProtocol)[source]
- async backup_parameters(backup_list: list[str]) Dict[str, str] [source]
Asynchronously backs up device settings by reading response parameters for each command in the provided list.
Use the restore_parameters method to restore the settings later from the backup.
- Parameters:
backup_list (list[str]) – A list of command strings for which to back up settings.
- Returns:
A dictionary mapping each command to its corresponding response string from the device.
- Return type:
Dict[str, str]
Example
>>> backup_list = ["modsrc", "notchon", "sr", "poslpon", "setlpon", "cl", "reclen", "recstr"]
>>> backup = await self.backup_parameters(backup_list)
- async check_device_type() tuple[bool, str] [source]
Checks if the device type matches the given device ID.
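- Returns:
A tuple whose first element is True if the device type matches and False otherwise; the second element is a string, as given in the signature.
- Return type:
tuple[bool, str]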
- async close()[source]
Asynchronously closes the transport connection.
This method ensures that the transport layer is properly closed, releasing any resources associated with it.
- async connect(auto_adjust_comm_params: bool = True)[source]
Establishes a connection using the transport layer.
This asynchronous method initiates the connection process by calling the connect method of the transport instance.
- Parameters:
auto_adjust_comm_params (bool) – If True, the Telnet transport will automatically adjust the internal communication parameters of the XPORT ethernet module. It will set the flow control mode to XON_XOFF_PASS_TO_HOST. This is required for the library to work properly.
- Raises:
Exception – If the connection fails, an exception may be raised depending on the implementation of the transport layer.
- property device_info: DeviceInfo
Returns detailed information about the connected device.
This property provides a DeviceInfo object that includes the device’s identifier and transport metadata. It requires that the transport layer is initialized and connected.
- Raises:
RuntimeError – If the transport is not initialized or the device is not connected.
- Returns:
An object containing the device ID and transport info.
- Return type:
DeviceInfo
- async enrich_device_info(detected_device: DetectedDevice) None [source]
Get additional information about the device.
A derived class should implement this method to enrich the device information in the given detected_device object.
- Parameters:
detected_device (DetectedDevice) – The detected device object to enrich with additional information.
- async get_device_type() str [source]
Retrieves the type of the device. The device type is the string that is returned if you just press enter after connecting to the device.
- classmethod help(cmd: str | None = None) str [source]
Returns help information for a specific command or all commands if no command is specified.
- Parameters:
cmd (str, optional) – The command to get help for. If None, returns help for all commands.
- Returns:
Help information for the specified command or all commands.
- Return type:
str
- classmethod help_dict()[source]
Returns the class-level help dictionary with a list of all commands and their descriptions.
- property lock: _ReentrantAsyncLock
Lock that can be used by external code to synchronize access to the device.
Use with async with client.lock: to group operations atomically.
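How a reentrant asynchronous lock lets one task group several operations, including nested acquisitions, can be sketched with asyncio alone. The ReentrantAsyncLock class below is an illustrative stand-in written for this example; it is not the library's _ReentrantAsyncLock, whose implementation is not shown in this reference.

```python
import asyncio


class ReentrantAsyncLock:
    """Async lock that the owning task may acquire again without deadlocking."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None   # task currently holding the lock
        self._depth = 0      # how many times the owner has entered

    async def __aenter__(self):
        task = asyncio.current_task()
        if self._owner is task:
            self._depth += 1          # nested entry by the owner: just count it
            return self
        await self._lock.acquire()    # other tasks wait here
        self._owner = task
        self._depth = 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._depth -= 1
        if self._depth == 0:          # outermost exit releases the real lock
            self._owner = None
            self._lock.release()


async def grouped_operation(lock, log, name):
    async with lock:
        log.append(f"{name}:start")
        async with lock:              # re-entering from the same task
            await asyncio.sleep(0)
            log.append(f"{name}:inner")
        await asyncio.sleep(0)
        log.append(f"{name}:end")


async def demo():
    lock = ReentrantAsyncLock()
    log = []
    await asyncio.gather(grouped_operation(lock, log, "a"),
                         grouped_operation(lock, log, "b"))
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Even though both tasks yield control inside the locked region, the log entries of one task are never interleaved with those of the other.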
- async read_cached_response_parameters_tring(cmd: str, timeout: float = 0.6) str [source]
Asynchronously reads the response parameters string for a given command, utilizing a cache if enabled. If caching is enabled and the command’s response is present in the cache, returns the cached response. Otherwise, reads the response using read_response_parameters_string, caches it if applicable, and returns it.
- Parameters:
cmd (str) – The command string to send.
timeout (float, optional) – Timeout in seconds for the response. Defaults to DEFAULT_TIMEOUT_SECS.
- Returns:
The parameters part of the response string (after the first comma), or an empty string if no parameters are present.
- Return type:
str
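The read-through caching behavior, and the stale-data risk mentioned in the note on CMD_CACHE_ENABLED, can be illustrated with a small stand-in class. CachingReader, its fake device state, and the two-command cacheable set are assumptions made for this sketch and do not reproduce the library's internals.

```python
import asyncio


class CachingReader:
    CMD_CACHE_ENABLED = True
    CACHEABLE_COMMANDS = {"cl", "sr"}   # illustrative subset, not the real list

    def __init__(self, fake_device_state):
        self._state = fake_device_state  # dict standing in for the real device
        self._cache = {}
        self.reads = 0                   # counts accesses to the "device"

    async def _read_from_device(self, cmd):
        self.reads += 1
        await asyncio.sleep(0)           # stands in for interface latency
        return self._state[cmd]

    async def read_cached(self, cmd):
        use_cache = self.CMD_CACHE_ENABLED and cmd in self.CACHEABLE_COMMANDS
        if use_cache and cmd in self._cache:
            return self._cache[cmd]      # served without touching the device
        value = await self._read_from_device(cmd)
        if use_cache:
            self._cache[cmd] = value
        return value


async def demo():
    state = {"cl": "1", "set": "80.000"}
    reader = CachingReader(state)
    first = await reader.read_cached("cl")
    state["cl"] = "0"                        # another application changes the device
    second = await reader.read_cached("cl")  # stale value comes from the cache
    await reader.read_cached("set")
    await reader.read_cached("set")          # not cacheable: reads the device again
    return first, second, reader.reads


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second read of cl still yields the old value after the external change, which is the inconsistency the note warns about.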
- async read_float_value(cmd: str, param_index: int = 0) float [source]
Asynchronously reads a single float value from the device. For example, if you write the command set to read the current setpoint, it will return 80.000 if the setpoint is 80.000. The response is parsed into a float value. Use this function for commands that return a single floating point value.
- Parameters:
cmd (str) – The command string to be sent.
param_index (int) – Parameter index (default 0) to read from the response.
- Returns:
The value as a floating-point number.
- Return type:
float
Example
>>> value = await device_client.read_float_value('set')
>>> print(value)
80.0
- async read_int_value(cmd: str, param_index: int = 0) int [source]
Asynchronously reads a single integer value from the device. For example, if you write cl to the device, the response will be 0 or 1 depending on the current PID mode. The response is parsed into an integer value.
- Parameters:
cmd (str) – The command string to be sent.
param_index (int) – Parameter index (default 0) to read from the response
- Returns:
The value as an integer.
- Return type:
int
Example
>>> value = await device_client.read_int_value('cl')
>>> print(value)
1
- async read_response(cmd: str, timeout: float = 0.6) tuple [source]
Asynchronously sends a command to read values and returns the response as a tuple. For example, if you write the command set, it will return set,80.000 if the setpoint is 80.000. The response is parsed into a tuple containing the command set and a list of parameter strings, in this case ['80.000'].
- Parameters:
cmd (str) – The command string to be sent.
- Returns:
A tuple containing the command (str) and a list of parameters (list of str).
- Return type:
tuple
Example
>>> response = await device_client.read_response('set')
>>> print(response)
('set', ['80.000'])
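The split of a raw response line into a command and a parameter list can be sketched in a few lines. parse_response is a hypothetical helper written for this illustration; it assumes the comma-separated, CRLF-terminated format shown in the examples of this reference.

```python
from typing import List, Tuple


def parse_response(raw: str) -> Tuple[str, List[str]]:
    """Split 'cmd,p1,p2\r\n' into ('cmd', ['p1', 'p2'])."""
    parts = raw.strip().split(",")   # strip() removes the trailing CR/LF
    return parts[0], parts[1:]


if __name__ == "__main__":
    cmd, params = parse_response("set,80.000\r\n")
    print(cmd, params)
    print(float(params[0]))          # single float value, as read_float_value would return
```

The typed readers described in this section (float, int, string) can be thought of as picking the entry at param_index from this list and converting it.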
- async read_response_parameters_string(cmd: str, timeout: float = 0.6) str [source]
Asynchronously sends a command and retrieves the parameters portion of the response string.
- Parameters:
cmd (str) – The command string to send.
timeout (float, optional) – The maximum time to wait for a response, in seconds. Defaults to DEFAULT_TIMEOUT_SECS.
- Returns:
The parameters part of the response string (after the first comma), or an empty string if no parameters are present.
- Return type:
str
- async read_response_string(cmd: str, timeout: float = 0.6) str [source]
Sends a command to the transport layer and reads the response asynchronously. For example, if you write cl to the device, it will return cl,0 or cl,1 depending on the current PID mode. That means this function returns the complete string cl,0\r\n or cl,1\r\n including the carriage return and line feed.
- Parameters:
cmd (str) – The command string to be sent.
timeout – The timeout for reading the response in seconds.
- Returns:
The response received from the transport layer.
- Return type:
str
Example
>>> response = await device_client.read_response_string('cl')
>>> print(repr(response))
'cl,1\r\n'
- async read_string_value(cmd: str, param_index: int = 0) str [source]
Asynchronously reads a single string value from the device. For example, if you write the command desc, the device will return the name of the actuator, e.g. TRITOR100SG. The response is parsed into a string value.
- Parameters:
cmd (str) – The command string to be sent.
param_index (int) – Parameter index (default 0) to read from the response.
- Returns:
The value as a string.
- Return type:
str
Example
>>> value = await self.read_string_value('desc')
>>> print(value)
TRITOR100SG
- async read_stripped_response_string(cmd: str, timeout: float = 0.6) str [source]
Reads a response string from the device, stripping any leading/trailing whitespace. This method is a placeholder and should be implemented based on actual response handling.
- async read_values(cmd: str, timeout: float = 0.6) list[str] [source]
Asynchronously sends a command and returns the values as a list of strings. For example, if you write the command recout,0,0,1 to read the first data recorder value, it will return ['0', '0', '0.029'] if the first data recorder value is 0.029. So it returns a list of 3 strings.
- Parameters:
cmd (str) – The command string to be sent.
- Returns:
A list of values (list of str).
Example
>>> values = await device_client.read_values('recout,0,0,1')
>>> print(values)
['0', '0', '0.029']
- async restore_parameters(backup: Dict[str, str])[source]
Asynchronously restores device parameters from a backup created with backup_parameters.
Iterates over the provided backup dictionary, writing each parameter value to the device.
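The backup and restore round trip can be illustrated against an in-memory stand-in. FakeDevice and the two free functions below are assumptions for this sketch; they mirror the described behavior (read each command into a dictionary, later write each value back) without using the library itself.

```python
import asyncio
from typing import Dict, List


class FakeDevice:
    """In-memory stand-in that stores parameters as strings."""

    def __init__(self):
        self.params = {"modsrc": "0", "sr": "2000.0", "cl": "1"}

    async def read_response_parameters_string(self, cmd: str) -> str:
        return self.params[cmd]

    async def write(self, cmd: str) -> None:
        name, _, value = cmd.partition(",")   # 'cl,0' -> ('cl', ',', '0')
        self.params[name] = value


async def backup_parameters(device, backup_list: List[str]) -> Dict[str, str]:
    backup = {}
    for cmd in backup_list:
        backup[cmd] = await device.read_response_parameters_string(cmd)
    return backup


async def restore_parameters(device, backup: Dict[str, str]) -> None:
    for cmd, value in backup.items():
        await device.write(f"{cmd},{value}")


async def demo():
    device = FakeDevice()
    backup = await backup_parameters(device, ["modsrc", "cl"])
    await device.write("cl,0")                 # temporary change, e.g. for a measurement
    changed = device.params["cl"]
    await restore_parameters(device, backup)   # original values are written back
    return backup, changed, device.params["cl"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```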
- property transport_protocol: TransportProtocol
Returns the transport protocol used by the device.
- async write(cmd: str)[source]
Sends a command to the transport layer.
This asynchronous method writes a command string followed by a carriage return to the transport layer.
- Parameters:
cmd (str) – The command string to be sent. No carriage return is needed.
Example
>>> await device_client.write('set,80')
- async write_string_value(cmd: str, value: str)[source]
Sends a command with a string value to the transport layer.
This asynchronous method writes a command string followed by a carriage return to the transport layer. It is used for commands that require a string value.
- Parameters:
cmd (str) – The command string to be sent.
value (str) – The string value to be included in the command.
Example
>>> await device_client.write_string_value('set', '80.000')
- async write_value(cmd: str, value: int | float | str | bool)[source]
Asynchronously writes a value to the device using the specified command.
- Parameters:
cmd (str) – The command string to send to the device.
value (Union[int, float, str, bool]) – The value to write, which can be an integer, float, string, or boolean.
Example
>>> await device_client.write_value('set', 80)
nv200_device
- class NV200Device[source]
Bases:
PiezoDeviceBase
A high-level asynchronous client for communicating with NV200 piezo controllers.
This class extends the PiezoDeviceBase base class and provides high-level methods for setting and getting various device parameters, such as PID mode, setpoint, etc.
- setpoint_lpf
Interface for the setpoint low-pass filter.
- position_lpf
Interface for the position low-pass filter.
- notch_filter
Interface for the notch filter.
- pid
Interface for the PID controller to set the controller mode and PID gains.
- CACHEABLE_COMMANDS: set[str] = {'acmeasure', 'avmax', 'avmin', 'cl', 'ctrlmode', 'kd', 'ki', 'kp', 'modsrc', 'monsrc', 'notchb', 'notchon', 'poslpf', 'poslponnotchf', 'posmax', 'posmin', 'setlpf', 'setlpon', 'spisrc', 'sr', 'unitcl', 'unitol'}
- DEVICE_ID = 'NV200/D_NET'
- class LowpassFilter[source]
Bases:
object
Interface to a low-pass filter configuration on the NV200 device.
This class is parameterized by the command strings for enabling the filter and setting/getting the cutoff frequency.
- __init__(device: NV200Device, enable_cmd: str, cutoff_cmd: str, cutoff_range: ValueRange[float]) None [source]
Initialize the LowpassFilter.
- Parameters:
device (NV200Device) – The parent device instance.
enable_cmd (str) – The device command to enable/disable the filter.
cutoff_cmd (str) – The device command to set/get the cutoff frequency.
- async enable(enable: bool) None [source]
Enable or disable the low-pass filter.
- Parameters:
enable (bool) – True to enable, False to disable.
- async get_cutoff() float [source]
Get the cutoff frequency of the low-pass filter.
- Returns:
The cutoff frequency in Hz.
- Return type:
float
- class NotchFilter[source]
Bases:
object
Interface to a notch filter configuration on the NV200 device.
Allows enabling/disabling the notch filter and configuring its center frequency and -3 dB bandwidth.
- __init__(device: NV200Device) None [source]
Initialize the NotchFilter interface.
- Parameters:
device (NV200Device) – The parent device instance used for communication.
- async enable(enable: bool) None [source]
Enable or disable the notch filter.
- Parameters:
enable (bool) – True to enable the filter, False to disable.
- async get_bandwidth() int [source]
Get the -3 dB bandwidth of the notch filter.
- Returns:
Current bandwidth in Hz.
- Return type:
int
- async get_frequency() int [source]
Get the center frequency of the notch filter.
- Returns:
Current center frequency in Hz.
- Return type:
int
- async is_enabled() bool [source]
Check if the notch filter is currently enabled.
- Returns:
True if the filter is enabled, False otherwise.
- Return type:
bool
- class PIDController[source]
Bases:
object
PIDController provides an interface for configuring and controlling the PID control loop of an NV200 device. This class allows enabling/disabling closed loop control, setting and retrieving PID gains, and configuring feed-forward control amplification factors for position, velocity, and acceleration.
- __init__(device: NV200Device) None [source]
Initialize the PIDController interface.
- Parameters:
device (NV200Device) – The parent device instance used for communication.
- async get_mode() PidLoopMode [source]
Retrieves the current PID mode of the device.
- async get_pcf_gains() PCFGains [source]
Retrieves the feed forward control amplification factors.
- Returns:
The feed forward factors for position, velocity, and acceleration.
- Return type:
PCFGains
- async get_pid_gains() PIDGains [source]
Retrieves the PID gains (kp, ki, kd) for the device.
- Returns:
A named tuple with fields kp, ki, and kd.
- Return type:
PIDGains
- async is_closed_loop() bool [source]
Check if the device is currently in closed loop control mode.
- Returns:
True if in closed loop mode, False otherwise.
- Return type:
bool
- async set_mode(mode: PidLoopMode)[source]
Sets the PID mode of the device to either open loop or closed loop.
- async set_pcf_gains(position: float | None = None, velocity: float | None = None, acceleration: float | None = None) None [source]
Sets the PID controller's feed forward control amplification factors.
- Parameters:
position (float | None) – Factor for position feed-forward. Leave unchanged if None.
velocity (float | None) – Factor for velocity feed-forward. Leave unchanged if None.
acceleration (float | None) – Factor for acceleration feed-forward (scaled by 1/1_000_000 internally). Leave unchanged if None.
- __init__(transport: TransportProtocol)[source]
Initialize NV200Device and its low-pass filter interfaces.
- async enrich_device_info(detected_device: DetectedDevice) None [source]
Get additional information about the device.
A derived class should implement this method to enrich the device information in the given detected_device object.
- Parameters:
detected_device (DetectedDevice) – The detected device object to enrich with additional information.
- async export_actuator_config(path: str = '', filename: str = '') str [source]
Asynchronously exports the actuator configuration parameters to an INI file. This method reads a predefined set of actuator configuration parameters from the device, and writes them to an INI file. The file can be saved to a specified path and filename, or defaults will be used based on the actuator’s description and serial number.
- Parameters:
path (str, optional) – Directory path where the configuration file will be saved. If not provided, the file will be saved in the current working directory.
filename (str, optional) – Name of the configuration file. If not provided, a default name in the format ‘actuator_conf_{desc}_{acserno}.ini’ will be used.
- Returns:
The full path to the saved configuration file.
- Raises:
Any exceptions raised during file writing or parameter reading will propagate.
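Writing parameters to an INI file with the default name format described above can be sketched with the standard library. The export_config helper, the section name, and the sample parameter values are assumptions for this illustration; only the 'actuator_conf_{desc}_{acserno}.ini' naming pattern and the path/filename defaults come from this reference.

```python
import configparser
import os
import tempfile
from typing import Dict


def export_config(params: Dict[str, str], desc: str, acserno: str,
                  path: str = "", filename: str = "") -> str:
    """Write params to an INI file and return the full path of that file."""
    if not filename:
        filename = f"actuator_conf_{desc}_{acserno}.ini"
    full_path = os.path.join(path, filename)   # empty path -> current working directory

    config = configparser.ConfigParser()
    config["Actuator Configuration"] = params  # section name is an assumption
    with open(full_path, "w") as ini_file:
        config.write(ini_file)
    return full_path


if __name__ == "__main__":
    target_dir = tempfile.mkdtemp()
    saved = export_config({"kp": "0.5", "sr": "2000.0"},
                          "TRITOR100SG", "85533", path=target_dir)
    print(saved)
```

Reading the file back with configparser.ConfigParser().read(saved) gives the same key/value pairs, which is the basis for a matching import step.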
- static from_detected_device(detected_device: DetectedDevice) NV200Device [source]
Creates an NV200Device instance from a DetectedDevice object by selecting the appropriate transport protocol based on the detected device’s transport type.
- Parameters:
detected_device (DetectedDevice) – The detected device containing transport type and identifier.
- Returns:
An instance of NV200Device initialized with the correct transport protocol.
- Return type:
NV200Device
- async get_actuator_description() str [source]
Retrieves the description of the actuator that is connected to the NV200 device. The description consists of the actuator type and the serial number. For example: “TRITOR100SG, #85533”
- async get_actuator_name() str [source]
Retrieves the name of the actuator that is connected to the NV200 device.
- async get_actuator_sensor_type() PostionSensorType [source]
Retrieves the type of position sensor used by the actuator. Returns a PostionSensorType enum value.
- async get_actuator_serial_number() str [source]
Retrieves the serial number of the actuator that is connected to the NV200 device.
- async get_analog_monitor_source() AnalogMonitorSource [source]
Returns the source of data for analog output (monsrc).
- async get_control_mode() CtrlMode [source]
Retrieves the current control mode of the device.
- Returns:
The current control mode.
- Return type:
CtrlMode
- async get_current_position() float [source]
Retrieves the current position of the device. For actuators with sensor: Position in actuator units (μm or mrad) For actuators without sensor: Piezo voltage in V
- async get_heat_sink_temperature() float [source]
Retrieves the heat sink temperature in degrees Celsius.
- async get_max_position() float [source]
Retrieves the maximum position of the device. For actuators with sensor: Maximum position in actuator units (μm or mrad) For actuators without sensor: Maximum piezo voltage in V
- async get_max_voltage() float [source]
Retrieves the maximum voltage of the device. This is the maximum voltage that can be applied to the piezo actuator.
- async get_min_position() float [source]
Retrieves the minimum position of the device. For actuators with sensor: Minimum position in actuator units (μm or mrad) For actuators without sensor: Minimum piezo voltage in V
- async get_min_voltage() float [source]
Retrieves the minimum voltage of the device. This is the minimum voltage that can be applied to the piezo actuator.
- async get_modulation_source() ModulationSource [source]
Retrieves the current setpoint modulation source.
- async get_position_range() Tuple[float, float] [source]
Retrieves the position range of the device for closed loop control. Returns a tuple containing the minimum and maximum position.
- async get_position_unit() str [source]
Retrieves the position unit of the device. This is typically “μm” (micrometers) for linear actuators or “mrad” (milliradians) for tilting actuators.
- async get_setpoint_range() Tuple[float, float] [source]
Retrieves the setpoint range of the device. Returns a tuple containing the minimum and maximum setpoint. The setpoint range is determined by the position range for closed loop control and the voltage range for open loop control.
- async get_setpoint_unit() str [source]
Retrieves the current setpoint unit of the device. This is typically “V” for volts in open loop or the position unit in closed loop.
- async get_slew_rate() float [source]
Retrieves the slew rate of the device. The slew rate is the maximum speed at which the device can move.
- async get_spi_monitor_source() SPIMonitorSource [source]
Returns the source for the SPI/Monitor value returned via SPI MISO.
- async get_status_register() StatusRegister [source]
Retrieves the status register of the device.
- async get_voltage_range() Tuple[float, float] [source]
Retrieves the voltage range of the device for open loop control. Returns a tuple containing the minimum and maximum voltage.
- async get_voltage_unit() str [source]
Retrieves the voltage unit of the device. This is typically “V” for volts.
- async import_actuator_config(filepath: str)[source]
Imports actuator configuration from an INI file.
- Parameters:
filepath – Path to the INI file with the actuator configuration.
- async is_status_flag_set(flag: StatusFlags) bool [source]
Checks if a specific status flag is set in the status register.
- async move(target: float)[source]
Moves the device to the specified target position or voltage. The target is interpreted as a position in closed loop or a voltage in open loop.
- async move_to_position(position: float)[source]
Moves the device to the specified position in closed loop.
- async move_to_voltage(voltage: float)[source]
Moves the device to the specified voltage in open loop.
- async set_analog_monitor_source(source: AnalogMonitorSource)[source]
Sets the source of data for analog output (monsrc).
- async set_control_mode(mode: CtrlMode) None [source]
Sets the control mode of the device.
- Parameters:
mode (CtrlMode) – The control mode to set.
- async set_modulation_source(source: ModulationSource)[source]
Sets the setpoint modulation source.
- async set_slew_rate(slew_rate: float)[source]
Sets the slew rate of the device. Valid range: 0.0000008 … 2000.0 %/ms (2000 = disabled).
- async set_spi_monitor_source(source: SPIMonitorSource)[source]
Sets the source for the SPI/Monitor value returned via SPI MISO.
spibox_device
- class SpiBoxDevice[source]
Bases:
PiezoDeviceBase
A high-level asynchronous client for communicating with NV200 piezo controllers. This class extends the PiezoDeviceBase base class and provides high-level methods for setting and getting various device parameters, such as PID mode, setpoint,
- DEVICE_ID = 'SPI Controller Box'
- async connect(auto_adjust_comm_params: bool = True)[source]
Establishes a connection using the transport layer.
- async set_setpoints_percent(ch1: float = 0, ch2: float = 0, ch3: float = 0) List[float] [source]
Set device setpoints as percentages (0.0 to 100.0) for 3 channels.
Converts percent values to 16-bit hex strings and sends them as a formatted command.
- async set_waveforms(ch1: ndarray | None, ch2: ndarray | None, ch3: ndarray | None) List[ndarray] [source]
Set waveforms for the device channels.
Each channel can be set to a waveform represented as a numpy array of floats. The values are expected to be in the range [0.0, 100.0].
- Parameters:
ch1 (Optional[np.ndarray]) – Waveform for channel 1.
ch2 (Optional[np.ndarray]) – Waveform for channel 2.
ch3 (Optional[np.ndarray]) – Waveform for channel 3.
- Returns:
Response from the device after setting the waveforms.
- Return type:
List[ndarray]
- parse_hex_to_floats_percent(data: str) List[float] [source]
Parses a comma-separated string of 4-character hexadecimal values into a list of floats.
The hex values are interpreted as unsigned 16-bit integers and converted to floats.
- Parameters:
data (str) – A string of comma-separated 4-character hex values (e.g., “0000,FFFD,FFFD”).
- Returns:
A list of float representations of the parsed unsigned integers.
- Return type:
List[float]
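The conversion between percentages and 4-character hexadecimal values mentioned for set_setpoints_percent and parse_hex_to_floats_percent can be sketched as below. The scaling is an assumption: this example maps 0.0 to 100.0 percent linearly onto 0x0000 to 0xFFFF, which may differ from what the device actually uses, and both function names are hypothetical.

```python
from typing import List

FULL_SCALE = 0xFFFF  # assumption: 100 % corresponds to the largest unsigned 16-bit value


def percent_to_hex(percent: float) -> str:
    """Convert a percentage to a 4-character upper-case hex string."""
    clamped = min(max(percent, 0.0), 100.0)          # keep the value inside 0..100
    return f"{round(clamped / 100.0 * FULL_SCALE):04X}"


def parse_hex_to_percent(data: str) -> List[float]:
    """Convert comma-separated 4-character hex values back to percentages."""
    return [int(item, 16) / FULL_SCALE * 100.0 for item in data.split(",")]


if __name__ == "__main__":
    command_values = ",".join(percent_to_hex(p) for p in (0.0, 25.0, 100.0))
    print(command_values)
    print(parse_hex_to_percent(command_values))
```

Because a 16-bit integer cannot represent every percentage exactly, a round trip returns values that are close to, but not always identical with, the input.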
Transport Protocols
transport_protocol
This module defines the transport protocols for communicating with NV200 devices, including Telnet and Serial interfaces.
- Classes:
TransportProtocol: Abstract base class for transport protocols.
TelnetProtocol: Implements Telnet-based communication with NV200 devices.
SerialProtocol: Implements serial communication with NV200 devices.
Example
import asyncio
from nv200.device_interface import DeviceClient
from nv200.transport_protocols import SerialProtocol
async def serial_port_auto_detect():
    # No port is given, so the transport tries to auto detect it.
    transport = SerialProtocol()
    client = DeviceClient(transport)
    await client.connect()
    print(f"Connected to device on serial port: {transport.port}")
    await client.close()
if __name__ == "__main__":
    asyncio.run(serial_port_auto_detect())
- class TransportProtocol[source]
Bases:
ABC
Abstract base class representing a transport protocol interface for a device.
- CR = b'\r'
- CRLF = b'\r\n'
- DEFAULT_TIMEOUT_SECS = 0.6
- LF = b'\n'
- XOFF = b'\x13'
- XON = b'\x11'
- __init__()[source]
Initializes the TransportProtocol base class. Subclasses may extend this constructor to initialize protocol-specific state.
- abstractmethod async close()[source]
Asynchronously closes the connection or resource associated with this instance.
This method should be used to release any resources or connections that were opened during the lifetime of the instance. Ensure that this method is called to avoid resource leaks.
- Raises:
Exception – If an error occurs while attempting to close the resource.
- abstractmethod async connect(auto_adjust_comm_params: bool = True, device: PiezoDeviceBase | None = None)[source]
Establishes an asynchronous connection to the NV200 device.
This method is intended to handle the initialization of a connection to the NV200 device. The implementation should include the necessary steps to ensure the connection is successfully established.
- Raises:
Exception – If the connection fails or encounters an error.
- abstractmethod async flush_input()[source]
Asynchronously flushes or clears the input buffer of the transport protocol.
This method is intended to remove any pending or unread data from the input stream, ensuring that subsequent read operations start with a clean buffer. It is typically used to prevent processing of stale or unwanted data.
- abstractmethod get_info() TransportProtocolInfo [source]
Returns metadata about the transport protocol, such as type and identifier.
- async read_message(timeout: float = 0.6) str [source]
Asynchronously reads a complete delimited message from the device.
- Returns:
The response read from the source.
- Return type:
str
- abstractmethod async read_until(expected: bytes = b'\x11', timeout: float = 0.6) str [source]
Asynchronously reads data from the connection until the specified expected byte sequence is encountered.
- Parameters:
expected (bytes, optional) – The byte sequence to read until. Defaults to serial.XON.
- Returns:
The data read from the connection, decoded as a string.
- Return type:
str
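The read-until-XON behaviour described for read_until can be sketched with a plain asyncio.StreamReader. This is an illustration under stated assumptions, not the library's code: the free function read_until below, the simulated reply "set,40.000", and the stripping of the terminator and line endings are all hypothetical.

```python
import asyncio

XON = b"\x11"                # same value as TransportProtocol.XON
DEFAULT_TIMEOUT_SECS = 0.6   # same value as TransportProtocol.DEFAULT_TIMEOUT_SECS


async def read_until(reader: asyncio.StreamReader,
                     expected: bytes = XON,
                     timeout: float = DEFAULT_TIMEOUT_SECS) -> str:
    """Read from the stream until `expected` arrives, then decode the data."""
    raw = await asyncio.wait_for(reader.readuntil(expected), timeout)
    # Assumption: the terminator and the trailing line ending are stripped.
    return raw[: -len(expected)].decode("latin-1").strip("\r\n")


async def demo() -> str:
    reader = asyncio.StreamReader()
    # Simulate a device reply that ends with CR LF followed by XON.
    reader.feed_data(b"set,40.000\r\n" + XON)
    return await read_until(reader)


reply = asyncio.run(demo())
print(reply)
```

If no terminator arrives within the timeout, asyncio.wait_for raises a timeout error; how the library reports that case is not specified here.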
serial_protocol
- class SerialProtocol[source]
Bases:
TransportProtocol
A class to handle serial communication with an NV200 device using the AioSerial library.
- port
The serial port to connect to. Defaults to None. If port is None, the class will try to auto detect the port.
- Type:
str
- baudrate
The baud rate for the serial connection. Defaults to 115200.
- Type:
int
- serial
The AioSerial instance for asynchronous serial communication.
- Type:
AioSerial
- __init__(port: str | None = None, baudrate: int = 115200)[source]
Initializes the NV200 driver with the specified serial port settings.
- Parameters:
port (str, optional) – The serial port to connect to. Defaults to None. If port is None, the class will try to auto detect the port.
baudrate (int, optional) – The baud rate for the serial connection. Defaults to 115200.
- async close()[source]
Asynchronously closes the connection or resource associated with this instance.
This method should be used to release any resources or connections that were opened during the lifetime of the instance. Ensure that this method is called to avoid resource leaks.
- Raises:
Exception – If an error occurs while attempting to close the resource.
- async connect(auto_adjust_comm_params: bool = True, device: PiezoDeviceBase | None = None)[source]
Establishes an asynchronous connection to the NV200 device using the specified serial port settings.
This method initializes the serial connection with the given port, baud rate, and flow control settings. If the port is not specified, it attempts to automatically detect the NV200 device’s port. If the device cannot be found, a RuntimeError is raised.
- Raises:
RuntimeError – If the NV200 device cannot be detected or connected to.
- async detect_port(device: PiezoDeviceBase) str | None [source]
Asynchronously detects and configures the serial port for the NV200 device.
This method scans through all available serial ports to find one with a manufacturer matching “FTDI”. If such a port is found, it attempts to communicate with the device to verify if it is an NV200 device. If the device is successfully detected, the port is configured and returned.
- Returns:
The device name of the detected port if successful, otherwise None.
- Return type:
str | None
- async static discover_devices(flags: DiscoverFlags) List[DetectedDevice] [source]
Asynchronously discovers all devices connected via serial interface.
- Returns:
A list of DetectedDevice entries, one for each serial port where a device has been detected.
- Return type:
List[DetectedDevice]
- get_info() TransportProtocolInfo [source]
Returns metadata about the transport protocol, such as type and identifier.
- property port: str
Returns the serial port the device is connected to
- async read_until(expected: bytes = b'\x11', timeout: float = 0.6) str [source]
Asynchronously reads data from the connection until the specified expected byte sequence is encountered.
- Parameters:
expected (bytes, optional) – The byte sequence to read until. Defaults to serial.XON.
- Returns:
The data read from the serial connection, decoded as a string.
- Return type:
str
- property serial: AioSerial | None
Provides access to the internal AioSerial interface.
- Returns:
The internal AioSerial instance.
- Return type:
AioSerial
telnet_protocol
- class TelnetProtocol[source]
Bases:
TransportProtocol
TelnetProtocol implements a transport protocol for communicating with piezosystem devices over Telnet. It provides methods to establish a connection, send commands, read responses, and close the connection.
- property MAC: str
Returns the MAC address.
- __init__(host: str = '', port: int = 23, MAC: str = '')[source]
Initializes the transport protocol.
- Parameters:
host (str, optional) – The hostname or IP address of the NV200 device. Defaults to an empty string.
port (int, optional) – The port number to connect to. Defaults to 23.
MAC (str, optional) – The MAC address of the NV200 device. Defaults to an empty string.
- async close()[source]
Asynchronously closes the connection or resource associated with this instance.
This method should be used to release any resources or connections that were opened during the lifetime of the instance. Ensure that this method is called to avoid resource leaks.
- Raises:
Exception – If an error occurs while attempting to close the resource.
- async static configure_flow_control_mode(host: str) bool [source]
Configures the flow control mode of the device so that XON/XOFF characters are passed to the host.
- async connect(auto_adjust_comm_params: bool = True, device: PiezoDeviceBase | None = None)[source]
Establishes a connection to a Lantronix device.
This asynchronous method attempts to connect to a Lantronix device using either the provided MAC address or by discovering devices on the network.
If self.host is not set and self.MAC is provided, it discovers the device’s IP address using the MAC address.
If neither self.host nor self.MAC is set, it discovers all available Lantronix devices on the network and selects the first one.
Once the device’s IP address is determined, it establishes a Telnet connection to the device using the specified host and port.
- Raises:
RuntimeError – If no devices are found during discovery.
- async classmethod discover_devices(flags: DiscoverFlags) List[DetectedDevice] [source]
Asynchronously discovers all devices connected via the Ethernet interface.
- Returns:
A list of DetectedDevice entries containing device information (IP and MAC addresses).
- Return type:
List[DetectedDevice]
- get_info() TransportProtocolInfo [source]
Returns metadata about the transport protocol, such as type and identifier.
- property host: str
Returns the host address.
- async is_xon_xoff_forwared_to_host() bool [source]
Checks if XON/XOFF flow control is forwarded to the host.
This method sends a command to the device and checks the response to determine if XON/XOFF flow control is enabled. The detection is based on whether the response starts with the byte sequence “XON/XOFF”.
- Returns:
True if XON/XOFF flow control is forwarded to the host, False otherwise.
- Return type:
bool
- async read_until(expected: bytes = b'\x11', timeout: float = 0.6) str [source]
Asynchronously reads data from the connection until the specified expected byte sequence is encountered.
- Parameters:
expected (bytes, optional) – The byte sequence to read until. Defaults to serial.XON.
- Returns:
The data read from the connection, decoded as a string.
- Return type:
str
Data and Signal Modules
data_recorder
This module provides access to the NV200 data recorder functionality.
- class DataRecorder[source]
Bases:
object
The DataRecorder class provides an interface to the NV200 data recorder. The data recorder consists of two memory banks that are written to in parallel, so two individual signals can be stored synchronously.
- ALL_CHANNELS = -1
- BUFFER_READ_TIMEOUT_SECS = 6
- class ChannelRecordingData[source]
Bases:
TimeSeries
ChannelRecordingData is a TimeSeries that represents the recorded data of one data recorder channel.
- x_time
A list of time values (in seconds) corresponding to the waveform.
- Type:
List[float]
- y_values
A list of amplitude values corresponding to the waveform.
- Type:
List[float]
- sample_time_us
The sampling time in microseconds.
- Type:
int
- sample_factor
A factor used to calculate the sample time from the base sample time.
- Type:
int
- __init__(values: list, sample_time_ms: int, source: DataRecorderSource)[source]
Initialize the ChannelRecordingData instance with amplitude values, sample time, and source.
- Parameters:
values (list) – The amplitude values corresponding to the waveform.
sample_time_ms (int) – The sample time in milliseconds (sampling interval).
source (DataRecorderSource) – The data recorder source.
- property source: DataRecorderSource
Read-only property to get the data recorder source of the recorded data.
- INFINITE_RECORDING_DURATION = 0
- NV200_RECORDER_BUFFER_SIZE = 6144
- NV200_RECORDER_SAMPLE_RATE_HZ = 20000
- class RecorderParam
Bases:
tuple
RecorderParam(bufsize, stride, sample_freq)
- static __new__(_cls, bufsize, stride, sample_freq)
Create new instance of RecorderParam(bufsize, stride, sample_freq)
- bufsize
Alias for field number 0
- sample_freq
Alias for field number 2
- stride
Alias for field number 1
- __init__(device: NV200Device)[source]
Initializes the data recorder with the specified NV200 device.
- Parameters:
device (NV200Device) – The NV200 device instance to be used by the data recorder.
- _dev
Stores the provided NV200 device instance.
- Type:
NV200Device
- _sample_rate
The sample rate for data recording, initially set to None.
- Type:
int | None
- classmethod get_sample_period_ms_for_duration(milliseconds: float) float [source]
Calculates the sample period in milliseconds that is possible with the specified duration in milliseconds.
- classmethod get_sample_rate_for_duration(milliseconds: float) float [source]
Calculates the sample rate that is possible with the specified duration in milliseconds.
- property max_sample_buffer_size: int
Read-only property to get the maximum sample buffer size for the data recorder.
- async read_recorded_data() List[ChannelRecordingData] [source]
Asynchronously reads recorded data for two channels and returns it as a list.
This method retrieves the recorded data for channel 0 and channel 1 by calling read_recorded_data_of_channel for each channel. The results are returned as a list of ChannelRecordingData objects.
- Returns:
A list containing the recorded data for channel 0 and channel 1.
- Return type:
List[ChannelRecordingData]
- async read_recorded_data_of_channel(channel: int) ChannelRecordingData [source]
Asynchronously reads recorded data from a specified channel.
- Parameters:
channel (int) – The channel number from which to read the recorded data.
- Returns:
An object containing the recording source as a string and a list of floating-point numbers representing the recorded data.
- Return type:
- Raises:
Any exceptions raised by the underlying device communication methods. –
- async set_autostart_mode(mode: RecorderAutoStartMode)[source]
Sets the autostart mode of the data recorder.
- async set_data_source(channel: int, source: DataRecorderSource)[source]
Sets the channel and the source of data to be stored in the data recorder channel.
- async set_recording_duration_ms(milliseconds: float) RecorderParam [source]
Sets the recording duration in milliseconds and adjusts the recorder parameters accordingly.
This method calculates the appropriate stride, sample rate, and buffer size based on the specified recording duration and the recorder’s configuration. It then updates the recorder settings to match these calculated values.
- Parameters:
milliseconds (float) – The desired recording duration in milliseconds.
- Returns:
An object containing the updated buffer length, stride, and sample rate.
- Return type:
RecorderParam
- Raises:
ValueError – If the calculated buffer size or stride is invalid.
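How a stride, sample rate, and buffer size might be derived from a recording duration can be sketched as follows. The constants mirror NV200_RECORDER_BUFFER_SIZE and NV200_RECORDER_SAMPLE_RATE_HZ from this class, but the formula and the function name recorder_params_for_duration are assumptions made for illustration; the library may compute the values differently.

```python
import math
from collections import namedtuple

RecorderParam = namedtuple("RecorderParam", ["bufsize", "stride", "sample_freq"])

BUFFER_SIZE = 6144     # DataRecorder.NV200_RECORDER_BUFFER_SIZE
BASE_RATE_HZ = 20000   # DataRecorder.NV200_RECORDER_SAMPLE_RATE_HZ


def recorder_params_for_duration(milliseconds: float) -> RecorderParam:
    """Pick the smallest stride whose recording fits into the buffer."""
    if milliseconds <= 0:
        raise ValueError("duration must be positive")
    samples_at_full_rate = milliseconds * BASE_RATE_HZ / 1000.0
    # Assumption: keeping every stride-th sample stretches the recording time.
    stride = max(1, math.ceil(samples_at_full_rate / BUFFER_SIZE))
    sample_freq = BASE_RATE_HZ / stride
    bufsize = min(BUFFER_SIZE, int(milliseconds * sample_freq / 1000.0))
    if bufsize < 1:
        raise ValueError("duration is too short for a single sample")
    return RecorderParam(bufsize, stride, sample_freq)


print(recorder_params_for_duration(100))
print(recorder_params_for_duration(1000))
```

In this sketch a 100 ms recording fits at the full rate, while a 1000 ms recording needs a stride of 4 to stay within 6144 samples.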
- async set_sample_buffer_size(buffer_size: int)[source]
Sets the sample buffer size for each of the two data recorder channels (0..6144). A value of 0 means infinite loop over maximum length until the recorder is stopped manually. If you would like to have an infinite loop, use the constant DataRecorder.INFINITE_RECORDING_DURATION. You can get the maximum buffer size using the max_sample_buffer_size property.
- async stop_recording()[source]
Stops the recording process by invoking the start_recording method with False.
- async wait_until_finished(timeout_s: float = 10.0)[source]
Waits asynchronously until the recording process has finished or the specified timeout is reached.
- Parameters:
timeout_s (float) – The maximum time to wait in seconds. Defaults to 10.0.
- Returns:
True if the recording process finished within the timeout, False otherwise.
- Return type:
bool
- Raises:
asyncio.TimeoutError – If the timeout is reached before the recording process finishes.
- class DataRecorderSource[source]
Bases:
Enum
Enum representing the source of data to be stored in the data recorder channel, ignoring the buffer (A or B) distinction.
- ABS_POSITION_ERROR = 4
Absolute position error
- PIEZO_CURRENT_1 = 6
Piezo current 1 (A)
- PIEZO_CURRENT_2 = 7
Piezo current 2 (A)
- PIEZO_POSITION = 0
Piezo position (μm or mrad)
- PIEZO_VOLTAGE = 2
Piezo voltage (V)
- POSITION_ERROR = 3
Position error
- SETPOINT = 1
Setpoint (μm or mrad)
- class RecorderAutoStartMode[source]
Bases:
Enum
Enum representing the autostart mode of the data recorder.
- OFF = 0
Autostart off - start recording manually with recorder start command
- START_ON_SET_COMMAND = 1
Start on set-command
- START_ON_WAVEFORM_GEN_RUN = 2
Start on waveform generator run
- classmethod get_mode(value: int) RecorderAutoStartMode [source]
Given a mode value, return the corresponding RecorderAutoStartMode enum.
waveform_generator
WaveformGenerator module for controlling waveform generation on a connected NV200 device.
This module defines the WaveformGenerator class, which provides methods to configure and control waveform generation, including the ability to generate sine waves, set cycles, adjust sampling times, and manage waveform buffers. It supports asynchronous interaction with the device for real-time control of waveform generation.
- Classes:
WaveformGenerator: Manages waveform generation and configuration on the NV200 device.
WaveformData: Represents waveform data with time, amplitude, and sample time.
- class WaveformGenerator[source]
Bases:
object
WaveformGenerator is a class responsible for generating waveforms using a connected device.
- class WaveformData[source]
Bases:
TimeSeries
WaveformData is a TimeSeries that represents waveform data.
- property cycle_time_ms
Returns the duration of a single cycle in milliseconds.
- property sample_factor
Returns the sample factor used to calculate the sample time from the base sample time.
- __init__(device: NV200Device)[source]
Initializes the WaveformGenerator instance with the specified device client.
- Parameters:
device (NV200Device) – The device client used for communication with the hardware.
- async configure_waveform_loop(start_index: int, loop_start_index: int, loop_end_index: int)[source]
Sets the start and end indices for the waveform loop. The start index is the index where the waveform generator starts when it is started. The loop start index is the index where the waveform generator starts in the next cycle and the loop end index is the index where the waveform generator jumps to the next cycle.
- classmethod generate_constant_wave(freq_hz: float, constant_level: float) WaveformData [source]
Generates a constant waveform at a specified frequency and level. This method creates a waveform where all sample values are set to a constant level, sampled at intervals determined by the specified frequency.
- Parameters:
freq_hz (float) – The frequency in Hertz at which to generate the waveform samples.
constant_level (float) – The constant value for all samples in the waveform.
- Returns:
An object containing the generated constant waveform values and the sample time in milliseconds.
- Return type:
WaveformData
- classmethod generate_sine_wave(freq_hz: float, low_level: float, high_level: float, phase_shift_rad: float = 0.0) WaveformData [source]
Generates a sine wave based on the specified frequency and amplitude levels.
- Parameters:
freq_hz (float) – The frequency of the sine wave in Hertz (Hz).
low_level (float) – The minimum value (low level) of the sine wave.
high_level (float) – The maximum value (high level) of the sine wave.
phase_shift_rad (float, optional) – Phase shift in radians. Defaults to 0.0.
- Returns:
An object containing the generated sine wave data, including:
x_time (List[float]): A list of time points in ms corresponding to the sine wave samples.
y_values (List[float]): A list of amplitude values for the sine wave at each time point.
sample_time_us (float): The time interval between samples in microseconds (µs).
- Return type:
WaveformData
Notes
The method calculates an optimal sample time based on the desired frequency and the hardware’s base sample time.
The buffer size is adjusted to ensure the generated waveform fits within one period of the sine wave.
The sine wave is scaled and offset to match the specified low and high levels.
- classmethod generate_square_wave(freq_hz: float, low_level: float, high_level: float, phase_shift_rad: float = 0.0, duty_cycle: float = 0.5) WaveformData [source]
Generates a square wave (or PWM waveform) using NumPy for efficient computation.
- Parameters:
freq_hz (float) – Frequency of the waveform in Hz.
low_level (float) – Output level during the “low” part of the cycle.
high_level (float) – Output level during the “high” part of the cycle.
duty_cycle (float, optional) – Duty cycle as a fraction between 0.0 and 1.0. Defaults to 0.5 (i.e., 50%).
phase_shift_rad (float, optional) – Phase shift in radians. Defaults to 0.0.
- Returns:
An object containing:
values (List[float]): Amplitude values of the waveform.
sample_time_ms (float): Time between samples in milliseconds.
- Return type:
WaveformData
- classmethod generate_time_samples_array(freq_hz: float) ndarray [source]
Generates a NumPy array of time samples (in milliseconds) for one period of a waveform at the specified frequency.
- Parameters:
freq_hz (float) – The frequency of the waveform in Hertz.
- Returns:
Time samples (in milliseconds).
- Return type:
np.ndarray
- classmethod generate_time_samples_list(freq_hz: float) List[float] [source]
Generates a list of time samples (in milliseconds) for one period of a waveform at the specified frequency. Sampling is adjusted based on hardware base sample time and buffer constraints.
- Parameters:
freq_hz (float) – The frequency of the waveform in Hertz.
- Returns:
Time samples (in milliseconds).
- Return type:
List[float]
- classmethod generate_triangle_wave(freq_hz: float, low_level: float, high_level: float, phase_shift_rad: float = 0.0) WaveformData [source]
Generates a triangle wave based on the specified frequency and amplitude levels.
- Parameters:
freq_hz (float) – The frequency of the triangle wave in Hertz (Hz).
low_level (float) – The minimum value (low level) of the triangle wave.
high_level (float) – The maximum value (high level) of the triangle wave.
phase_shift_rad (float, optional) – Phase shift in radians. Defaults to 0.0.
- Returns:
An object containing the generated triangle wave data, including:
values (List[float]): A list of amplitude values for the triangle wave at each time point.
sample_time_ms (float): The time interval between samples in milliseconds (ms).
- Return type:
WaveformData
Notes
The method calculates an optimal sample time based on the desired frequency and the hardware’s base sample time.
The waveform is normalized between -1 and 1, then scaled and offset to fit between low_level and high_level.
The waveform is generated over one full period.
- classmethod generate_waveform(waveform_type: WaveformType, freq_hz: float, low_level: float, high_level: float, phase_shift_rad: float = 0.0, duty_cycle: float = 0.5) WaveformData [source]
Generates a waveform based on the specified type and parameters.
- Parameters:
waveform_type (WaveformType) – The type of waveform to generate (SINE, TRIANGLE, SQUARE).
freq_hz (float) – Frequency of the waveform in Hertz.
low_level (float) – Minimum value of the waveform.
high_level (float) – Maximum value of the waveform.
phase_shift_rad (float, optional) – Phase shift in radians. Defaults to 0.0.
duty_cycle (float, optional) – Duty cycle for square wave. Defaults to 0.5.
- Returns:
The generated waveform data.
- Return type:
WaveformData
- async is_running() bool [source]
Checks if the waveform generator is currently running.
- Returns:
True if the waveform generator is running, False otherwise.
- Return type:
bool
- async set_cycles(cycles: int = 0)[source]
Sets the number of cycles to run. A value of 0 (WaveformGenerator.NV200_INFINITE_CYCLES) runs the waveform infinitely; values from 1 to 65535 run the given number of cycles.
- async set_loop_end_index(end_index: int)[source]
Sets the end index for arbitrary waveform generator output. The loop end index is the index where the waveform generator jumps to the next cycle or finishes if only one cycle is used.
- async set_loop_start_index(start_index: int)[source]
Sets the start index for the waveform generator loop. If you use multiple cycles, the loop start index defines the index where the waveform generator starts in the next cycle.
- async set_output_sampling_time(sampling_time: int)[source]
Sets the output sampling time for the waveform generator. The output sampling time can be given in multiples of 50 µs from 1 * 50µs to 65535 * 50µs. If the sampling time is not a multiple of 50, it will be rounded to the nearest multiple of 50µs. The calculated sampling time is returned in microseconds.
- Returns:
The set sampling time in microseconds.
- Return type:
int
Note: Normally you do not need to set the sampling time manually because it is calculated automatically when the waveform is generated.
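The rounding of the output sampling time to a multiple of 50 µs can be sketched as follows. The function name round_sampling_time_us is hypothetical, and both the tie-breaking (halves round up) and the clamping to the 1 to 65535 range are assumptions rather than documented device behaviour.

```python
BASE_SAMPLE_TIME_US = 50    # sampling times are multiples of 50 µs
MAX_SAMPLE_FACTOR = 65535   # largest multiple given in the text


def round_sampling_time_us(sampling_time_us: float) -> int:
    """Round to the nearest multiple of 50 µs and clamp to the valid range."""
    # int(x + 0.5) rounds halves up; the device's tie-breaking is an assumption.
    factor = int(sampling_time_us / BASE_SAMPLE_TIME_US + 0.5)
    # Assumption: out-of-range requests are clamped instead of rejected.
    factor = max(1, min(MAX_SAMPLE_FACTOR, factor))
    return factor * BASE_SAMPLE_TIME_US


print(round_sampling_time_us(120))
print(round_sampling_time_us(130))
```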
- async set_start_index(index: int)[source]
Sets the offset index when arbitrary waveform generator gets started. That means after the start() function is called, the arbitrary waveform generator starts at the index defined by set_start_index() and runs until the index defined by set_loop_end_index(). In all successive cycles, the arbitrary waveform generator starts at set_loop_start_index(). This is repeated until the number of cycles reaches the value given by set_cycles().
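The interplay of start index, loop start index, loop end index, and cycle count can be illustrated by listing the buffer indices that would be output. This is a sketch of the description above, not device firmware behaviour: the function playback_indices is hypothetical, and treating the loop end index as inclusive is an assumption.

```python
from typing import List


def playback_indices(start_index: int, loop_start_index: int,
                     loop_end_index: int, cycles: int) -> List[int]:
    """Return the buffer indices visited over a finite number of cycles."""
    if cycles < 1:
        raise ValueError("this sketch only models a finite number of cycles")
    indices: List[int] = []
    first = start_index  # the first cycle begins at the start index
    for _ in range(cycles):
        # Assumption: the loop end index is the last index that is output.
        indices.extend(range(first, loop_end_index + 1))
        first = loop_start_index  # later cycles begin at the loop start index
    return indices


print(playback_indices(start_index=2, loop_start_index=0,
                       loop_end_index=4, cycles=2))
```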
- async set_waveform(waveform: WaveformData, unit: WaveformUnit = WaveformUnit.PERCENT, adjust_loop: bool = True, on_progress: ProgressCallback | None = None)[source]
Sets the waveform data in the device. The WaveformData object should contain the waveform values and the sample time.
- Parameters:
waveform (WaveformData) – The waveform data to be set.
unit (WaveformUnit) – The unit of the waveform values. Defaults to WaveformUnit.PERCENT.
adjust_loop (bool) – If True, adjusts the loop indices based on the waveform data; if False, the loop indices are left unchanged. When adjusted, the indices are set to: start_index = 0 (first waveform value), loop_start_index = 0 (first waveform value), loop_end_index = last waveform value.
on_progress (Optional[ProgressCallback]) – Optional callback for progress updates.
- Raises:
ValueError – If the waveform data is invalid.
- async set_waveform_buffer(buffer: list[float], unit: WaveformUnit = WaveformUnit.PERCENT, on_progress: ProgressCallback | None = None)[source]
Writes a full waveform buffer to the device by setting each value using set_waveform_value. The buffer should contain waveform values in percent (0-100). In closed loop mode, the value is interpreted as a percentage of the position range (e.g. 0 - 80 mrad) and in open loop mode, the value is interpreted as a percentage of the voltage range (e.g. -20 - 130 V).
- Parameters:
buffer (list of float) – The waveform values in percent (0-100).
unit (WaveformUnit) – The unit of the waveform values. Defaults to WaveformUnit.PERCENT.
on_progress (Optional[ProgressCallback]) – Optional callback for progress updates.
- Raises:
ValueError – If the buffer size exceeds the maximum buffer length.
- async set_waveform_from_samples(time_samples: Sequence[float] | ndarray, values: Sequence[float] | ndarray, unit: WaveformUnit = WaveformUnit.PERCENT, adjust_loop: bool = True)[source]
Sets the waveform data in the device from separate time samples and values. The waveform data should contain the time samples in milliseconds and the corresponding amplitude values in percent (0-100). In closed loop mode, the value is interpreted as a percentage of the position range (e.g. 0 - 80 mrad) and in open loop mode, the value is interpreted as a percentage of the voltage range (e.g. -20 - 130 V).
- Parameters:
time_samples (Sequence[float] or np.ndarray) – Time samples in milliseconds.
values (Sequence[float] or np.ndarray) – Corresponding waveform amplitude values in percent (0-100).
unit (WaveformUnit, optional) – The unit of the waveform values. Defaults to WaveformUnit.PERCENT.
adjust_loop (bool) – If True, adjusts loop indices based on data length.
- Raises:
ValueError – If inputs are invalid or lengths mismatch.
- async set_waveform_value_percent(index: int, percent: float)[source]
Sets the value of the waveform at the specified index in percent from 0 - 100%. In closed loop mode, the value is interpreted as a percentage of the position range (e.g. 0 - 80 mrad) and in open loop mode, the value is interpreted as a percentage of the voltage range (e.g. -20 - 130 V).
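How a percent value relates to a position or voltage range can be sketched as a linear mapping. The helper percent_to_physical is hypothetical, the linear interpretation is an assumption, and the ranges used below are only the example figures from the text; the actual ranges depend on the connected actuator.

```python
def percent_to_physical(percent: float, range_min: float, range_max: float) -> float:
    """Map 0..100 % linearly onto the interval [range_min, range_max]."""
    if not 0.0 <= percent <= 100.0:
        raise ValueError(f"percent must be within 0..100, got {percent}")
    return range_min + (range_max - range_min) * percent / 100.0


# Open loop: percentage of the example voltage range (-20 V to 130 V).
print(percent_to_physical(50.0, -20.0, 130.0))
# Closed loop: percentage of the example position range (0 to 80).
print(percent_to_physical(25.0, 0.0, 80.0))
```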
- async start(start: bool = True, cycles: int = -1, start_index: int = -1)[source]
Starts or stops the waveform generator.
- Parameters:
start (bool, optional) – If True, starts the waveform generator. If False, stops it. Defaults to True.
cycles (int, optional) – The number of cycles to run the waveform generator. If set to -1, the value configured via set_cycles() will be used.
- async wait_until_finished(timeout_s: float = 10.0)[source]
Waits until the waveform generator is finished running.
- Parameters:
timeout_s (float) – The maximum time to wait in seconds. Defaults to 10 seconds.
- Returns:
True if the waveform generator finished running, False if timed out.
- Return type:
bool
- class WaveformType[source]
Bases:
Enum
Enumeration for different waveform types supported by the generator.
- class WaveformUnit[source]
Bases:
Enum
Enumeration for different waveform units used in the generator.
- calculate_sampling_time_ms(time_samples: Sequence[float] | ndarray) float [source]
Calculates the sampling time in milliseconds from a sequence of time samples.
- Parameters:
time_samples (Union[Sequence[float], np.ndarray]) – A list or NumPy array of time samples in milliseconds.
- Returns:
The sampling time in milliseconds.
- Return type:
float
- Raises:
ValueError – If the sequence has fewer than 2 time samples.
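One way to derive a sampling time from a sequence of time samples is to average the spacing between the first and last sample. The sketch below assumes evenly spaced samples; the function name sampling_time_ms is hypothetical, and the library may use a different rule (for example the difference of the first two samples).

```python
from typing import Sequence


def sampling_time_ms(time_samples: Sequence[float]) -> float:
    """Return the average spacing between consecutive time samples."""
    if len(time_samples) < 2:
        raise ValueError("at least 2 time samples are required")
    # Assumption: the samples are sorted and evenly spaced.
    return (time_samples[-1] - time_samples[0]) / (len(time_samples) - 1)


print(sampling_time_ms([0.0, 0.5, 1.0, 1.5]))
```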
analysis
- class ResonanceAnalyzer[source]
Bases:
object
A utility class for measuring and analyzing the resonance behavior of a piezoelectric system.
This class encapsulates both the hardware interaction needed to acquire an impulse response from the device and the signal processing needed to compute the resonance spectrum.
- __init__(device: NV200Device)[source]
Initializes the ResonanceAnalyzer with the required hardware components.
- Parameters:
device – The device object used to restore parameters and get voltage range.
recorder – The data recorder used to capture the piezo response.
waveform_generator – The waveform generator used to generate the impulse.
- static compute_resonance_spectrum(signal: ndarray, sample_freq: float) Tuple[ndarray, ndarray, float] [source]
Computes the frequency spectrum of a signal and extracts the resonance frequency.
- Parameters:
signal – The time-domain signal (e.g., piezo position) as a NumPy array.
sample_freq – The sampling frequency in Hz.
- Returns:
Tuple containing:
Frequencies (xf): NumPy array of frequency bins.
Spectrum magnitude (yf): NumPy array of FFT magnitudes.
Resonance frequency (res_freq): Peak frequency in Hz.
- Return type:
Tuple[ndarray, ndarray, float]
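The idea behind extracting a resonance frequency from a spectrum can be sketched with a plain-Python discrete Fourier transform, so that the example runs without NumPy. This is not the library's implementation: the function resonance_spectrum, the removal of the DC offset, and the synthetic test signal are assumptions. With NumPy, numpy.fft.rfft and numpy.fft.rfftfreq would be the usual choice.

```python
import cmath
import math
from typing import List, Tuple


def resonance_spectrum(signal: List[float],
                       sample_freq: float) -> Tuple[List[float], List[float], float]:
    """Return frequency bins, magnitudes, and the frequency of the largest peak."""
    n = len(signal)
    if n < 2:
        raise ValueError("need at least two samples")
    mean = sum(signal) / n
    centered = [s - mean for s in signal]  # remove the DC offset
    freqs: List[float] = []
    mags: List[float] = []
    for k in range(n // 2 + 1):  # one-sided spectrum up to the Nyquist bin
        acc = sum(x * cmath.exp(-2j * math.pi * k * i / n)
                  for i, x in enumerate(centered))
        freqs.append(k * sample_freq / n)
        mags.append(abs(acc) * 2.0 / n)
    peak = max(range(len(mags)), key=mags.__getitem__)
    return freqs, mags, freqs[peak]


# Synthetic decaying 50 Hz oscillation sampled at 1 kHz (made-up test signal).
fs = 1000.0
samples = [math.exp(-3.0 * i / fs) * math.sin(2 * math.pi * 50.0 * i / fs)
           for i in range(200)]
xf, yf, res_freq = resonance_spectrum(samples, fs)
print(res_freq)
```

The frequency resolution of such a spectrum is sample_freq divided by the number of samples, so longer recordings locate the peak more precisely.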
- async measure_impulse_response(baseline_voltage: float) Tuple[ndarray, float] [source]
Measures the impulse response of the system by generating a waveform and recording the resulting piezo position signal.
- Returns:
Tuple containing:
The recorded piezo signal as a NumPy array.
The sample frequency in Hz.
- Return type:
Tuple[ndarray, float]
Utilities
utils
- async wait_until(condition_func: Callable[[], Awaitable], check_func: Callable[[any], bool], poll_interval_s: float = 0.1, timeout_s: float | None = None) bool [source]
Wait until an asynchronous condition function returns a value that satisfies a check function.
- Parameters:
condition_func (Callable[[], Awaitable]) – An async function returning a value of any type.
check_func (Callable[[any], bool]) – A function that checks if the value returned by condition_func satisfies the condition.
poll_interval_s (float) – Time in seconds to wait between condition checks.
timeout_s (float | None) – Optional timeout in seconds. If None, wait indefinitely.
- Returns:
True if the condition matched within the timeout, False otherwise.
- Return type:
bool
Example
>>> async def get_result():
...     return 3
>>> await wait_until(get_result, check_func=lambda x: x > 2, timeout_s=5.0)
True
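The polling pattern behind wait_until can be sketched with asyncio as follows. The function poll_until is a hypothetical stand-in written for illustration; it follows the documented parameters, but the exact timing and timeout handling of the library function are assumptions.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional


async def poll_until(condition_func: Callable[[], Awaitable[Any]],
                     check_func: Callable[[Any], bool],
                     poll_interval_s: float = 0.1,
                     timeout_s: Optional[float] = None) -> bool:
    """Poll an async function until check_func accepts its result."""
    deadline = None if timeout_s is None else time.monotonic() + timeout_s
    while True:
        if check_func(await condition_func()):
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False  # timed out without a matching value
        await asyncio.sleep(poll_interval_s)


async def demo() -> bool:
    counter = {"value": 0}

    async def read_counter() -> int:
        counter["value"] += 1  # stands in for a device status query
        return counter["value"]

    return await poll_until(read_counter, lambda v: v >= 3,
                            poll_interval_s=0.01, timeout_s=1.0)


finished = asyncio.run(demo())
print(finished)
```

Using a monotonic clock for the deadline keeps the timeout unaffected by system clock adjustments.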