Source code for nv200.waveform_generator
"""
WaveformGenerator module for controlling waveform generation on a connected NV200 device.
This module defines the `WaveformGenerator` class, which provides methods to configure and control waveform generation,
including the ability to generate sine waves, set cycles, adjust sampling times, and manage waveform buffers. It supports
asynchronous interaction with the device for real-time control of waveform generation.
Classes:
- :class:`.WaveformGenerator`: Manages waveform generation and configuration on the NV200 device.
- :class:`.WaveformData`: Represents waveform data with time, amplitude, and sample time.
"""
import math
import logging
import numpy as np
from typing import List, Union, Sequence, Optional
from enum import Enum
from nv200.nv200_device import NV200Device, ModulationSource
from nv200.shared_types import TimeSeries, ProgressCallback
from nv200.utils import wait_until
# Global module locker
logger = logging.getLogger(__name__)
[docs]
def calculate_sampling_time_ms(time_samples: Union[Sequence[float], np.ndarray]) -> float:
"""
Calculates the sampling time in milliseconds from a sequence of time samples.
Args:
time_samples (Union[Sequence[float], np.ndarray]): A list or NumPy array of time samples in milliseconds.
Returns:
float: The sampling time in milliseconds.
Raises:
ValueError: If the sequence has fewer than 2 time samples.
"""
if len(time_samples) < 2:
raise ValueError("At least two time samples are required to calculate sampling time.")
return (time_samples[-1] - time_samples[0]) / (len(time_samples) - 1) #
[docs]
class WaveformType(Enum):
"""
Enumeration for different waveform types supported by the generator.
"""
SINE = 0
TRIANGLE = 1
SQUARE = 2
CONSTANT = 3
[docs]
class WaveformUnit(Enum):
"""
Enumeration for different waveform units used in the generator.
"""
PERCENT = 0 # Percentage of the range (0-100%)
POSITION = 1 # Length units (e.g., micrometers, millimeters)
VOLTAGE = 2 # Voltage units (e.g., volts)
[docs]
class WaveformGenerator:
"""
WaveformGenerator is a class responsible for generating waveforms using a connected device.
"""
NV200_WAVEFORM_BUFFER_SIZE = 1024 # Size of the data buffer for waveform generator
NV200_BASE_SAMPLE_TIME_US = 50 # Base sample time in microseconds
NV200_INFINITE_CYCLES = 0 # Infinite cycles constant for the waveform generator
[docs]
class WaveformData(TimeSeries):
"""
WaveformData is a NamedTuple that represents waveform data.
"""
@property
def sample_factor(self):
"""
Returns the sample factor used to calculate the sample time from the base sample time.
"""
return (self.sample_time_ms * 1000) // WaveformGenerator.NV200_BASE_SAMPLE_TIME_US
@property
def cycle_time_ms(self):
"""
Returns the cycle of a single cycle in milliseconds.
"""
return len(self.values) * self.sample_time_ms
[docs]
def __init__(self, device: NV200Device):
"""
Initializes the WaveformGenerator instance with the specified device client.
Args:
device (DeviceClient): The device client used for communication with the hardware.
"""
self._dev = device
self._waveform : WaveformGenerator.WaveformData = None
[docs]
async def start(self, start : bool = True, cycles: int = -1, start_index: int = -1):
"""
Starts / stops the waveform generator
Args:
start (bool, optional): If True, starts the waveform generator. If False, stops it. Defaults to True.
cycles (int, optional): The number of cycles to run the waveform generator.
If set to -1, the value configured via set_cycles() will be used.
"""
if cycles > -1:
await self.set_cycles(cycles)
if start_index > -1:
await self.set_start_index(start_index)
await self._dev.set_modulation_source(ModulationSource.WAVEFORM_GENERATOR)
await self._dev.write(f"grun,{int(start)}")
[docs]
async def stop(self):
"""
Stops the waveform generator.
This is equivalent to calling start(False).
"""
await self._dev.write(f"grun,{0}")
[docs]
async def set_loop_start_index(self, start_index: int):
"""
Sets the start index for the waveform generator loop.
If you use multiple cycles, the loop start index is the index defines the index
where the waveform generator starts in the next cycle.
"""
await self._dev.write(f"gsarb,{start_index}")
[docs]
async def set_loop_end_index(self, end_index: int):
"""
Sets the end index for arbitrary waveform generator output.
The loop end index is the index where the waveform generator jumps to the next
cycle or finishes if only one cycle is used.
"""
await self._dev.write(f"gearb,{end_index}")
[docs]
async def set_start_index(self, index: int):
"""
Sets the offset index when arbitrary waveform generator
gets started. That means after the start() function is called, the arbitrary
waveform generator starts at the index defined by set_start_index() and runs
until the index defined by set_loop_end_index(). In all successive cycles, the arbitrary
waveform generator starts at set_loop_start_index(). This is repeated until the number
of cycles reaches the value given by set_cycles().
"""
await self._dev.write(f"goarb,{index}")
[docs]
async def set_cycles(self, cycles: int = 0):
"""
Sets the number of cycles to run.
- WaveformGenerator.NV200_INFINITE_CYCLES - 0 = infinitely
- 1…65535
"""
await self._dev.write(f"gcarb,{cycles}")
[docs]
async def configure_waveform_loop(self, start_index: int, loop_start_index: int, loop_end_index: int):
"""
Sets the start and end indices for the waveform loop.
The start index is the index where the waveform generator starts when it is started.
The loop start index is the index where the waveform generator starts in the next cycle
and the loop end index is the index where the waveform generator jumps to the next cycle.
"""
await self.set_start_index(start_index)
await self.set_loop_start_index(loop_start_index)
await self.set_loop_end_index(loop_end_index)
[docs]
async def set_output_sampling_time(self, sampling_time: int):
"""
Sets the output sampling time for the waveform generator.
The output sampling time can be given in multiples of 50 µs from
1 * 50µs to 65535 * 50µs. If the sampling time is not a multiple
of 50, it will be rounded to the nearest multiple of 50µs.
The calculated sampling time is returned in microseconds.
Returns:
int: The set sampling time in microseconds.
Note: Normally you do not need to set the sampling time manually because it is set automatically
calculated when the waveform is generated.
"""
rounded_sampling_time = round(sampling_time / 50) * 50
factor = rounded_sampling_time // 50
factor = max(1, min(factor, 65535))
await self._dev.write(f"gtarb,{factor}")
return rounded_sampling_time
[docs]
async def set_waveform_value_percent(self, index : int, percent : float):
"""
Sets the value of the waveform at the specified index in percent from 0 - 100%
In closed loop mode, the value is interpreted as a percentage of the position range (i.e. 0 - 80 mra)
and in open loop mode, the value is interpreted as a percentage of the voltage range (i.e. -20 - 130 V).
"""
if not 0 <= index < self.NV200_WAVEFORM_BUFFER_SIZE:
raise ValueError(f"Buffer index must be in the range from 0 to {self.NV200_WAVEFORM_BUFFER_SIZE} , got {index}")
if not 0 <= percent <= 100:
raise ValueError(f"Waveform value must be in the range from 0 to 100%, got {percent}")
await self._dev.write(f"gbarb,{index},{percent}")
[docs]
async def set_waveform_buffer(self, buffer: list[float], unit: WaveformUnit = WaveformUnit.PERCENT, on_progress: Optional[ProgressCallback] = None):
"""
Writes a full waveform buffer to the device by setting each value
using set_waveform_value.
The buffer should contain waveform values in percent (0-100).
In closed loop mode, the value is interpreted as a percentage of the position range (i.e. 0 - 80 mra)
and in open loop mode, the value is interpreted as a percentage of the voltage range (i.e. -20 - 130 V).
Parameters:
buffer (list of float): The waveform values in percent (0-100).
unit (WaveformUnit): The unit of the waveform values. Defaults to WaveformUnit.PERCENT.
on_progress (Optional[ProgressCallback]): Optional callback for progress updates.
Raises:
ValueError: If the buffer size exceeds the maximum buffer length.
"""
if len(buffer) > self.NV200_WAVEFORM_BUFFER_SIZE:
raise ValueError(
f"Buffer too large: max size is {self.NV200_WAVEFORM_BUFFER_SIZE}, got {len(buffer)}"
)
value_range = None
if unit == WaveformUnit.POSITION:
value_range = await self._dev.get_position_range()
elif unit == WaveformUnit.VOLTAGE:
value_range = await self._dev.get_voltage_range()
if value_range is not None:
# Scale values to to percent
buffer = [100 * (value - value_range[0]) / (value_range[1] - value_range[0]) for value in buffer]
total = len(buffer)
for index, percent in enumerate(buffer):
await self.set_waveform_value_percent(index, percent)
if on_progress:
await on_progress(index + 1, total)
[docs]
async def set_waveform(
self,
waveform: WaveformData,
unit: WaveformUnit = WaveformUnit.PERCENT,
adjust_loop: bool = True,
on_progress: Optional[ProgressCallback] = None,
):
"""
Sets the waveform data in the device.
The WaveformData object should contain the waveform values and the sample time.
Parameters:
waveform (WaveformData): The waveform data to be set.
unit (WaveformUnit): The unit of the waveform values. Defaults to WaveformUnit.PERCENT.
adjust_loop (bool): If True, adjusts the loop indices based on the
waveform data, if false, the loop indices are not adjusted.
If the loop indices are adjusted, then they will be set to
the following value:
- start_index = 0 (first waveform value)
- loop_start_index = 0 (first waveform value)
- loop_end_index = last waveform value
on_progress (Optional[ProgressCallback]): Optional callback for progress updates.
Raises:
ValueError: If the waveform data is invalid.
"""
await self.set_waveform_buffer(waveform.values, unit=unit, on_progress=on_progress)
self._waveform = waveform
await self.set_output_sampling_time(int(waveform.sample_time_ms * 1000))
if not adjust_loop:
return
# Adjust loop indices based on the waveform data
await self.configure_waveform_loop(
start_index=0,
loop_start_index=0,
loop_end_index=len(waveform.values) - 1,
)
[docs]
async def set_waveform_from_samples(
self,
time_samples: Union[Sequence[float], np.ndarray],
values: Union[Sequence[float], np.ndarray],
unit: WaveformUnit = WaveformUnit.PERCENT,
adjust_loop: bool = True
):
"""
Sets the waveform data in the device from separate time samples and values.
The waveform data should contain the time samples in milliseconds and the corresponding
amplitude values. in percent (0-100).
In closed loop mode, the value is interpreted as a percentage of the position range (i.e. 0 - 80 mra)
and in open loop mode, the value is interpreted as a percentage of the voltage range (i.e. -20 - 130 V).
Args:
time_samples (Sequence[float] or np.ndarray): Time samples in milliseconds.
values (Sequence[float] or np.ndarray): Corresponding waveform amplitude values in percent (0-100).
unit (WaveformUnit, optional): The unit of the waveform values. Defaults to WaveformUnit.PERCENT.
adjust_loop (bool): If True, adjusts loop indices based on data length.
Raises:
ValueError: If inputs are invalid or lengths mismatch.
"""
if len(time_samples) != len(values):
raise ValueError("Time samples and values must have the same length.")
if len(time_samples) == 0:
raise ValueError("Input arrays cannot be empty.")
# Calculate sample_time_ms from time_samples
sample_time_ms = calculate_sampling_time_ms(time_samples)
# Create WaveformData object (assuming it takes values and sample_time_ms)
waveform = self.WaveformData(values=list(values), sample_time_ms=sample_time_ms)
# Use existing set_waveform method to actually send data
await self.set_waveform(waveform, unit=unit, adjust_loop=adjust_loop)
[docs]
async def is_running(self) -> bool:
"""
Checks if the waveform generator is currently running.
Returns:
bool: True if the waveform generator is running, False otherwise.
"""
return bool(await self._dev.read_int_value('grun'))
[docs]
async def wait_until_finished(self, timeout_s: float = 10.0):
"""
Waits until the waveform generator is finished running.
Args:
timeout_s (float): The maximum time to wait in seconds. Defaults to 10 seconds.
Returns:
bool: True if the waveform generator finished running, False if timed out.
"""
return await wait_until(
self.is_running,
check_func=lambda x: not x,
poll_interval_s=0.1,
timeout_s=timeout_s
)
[docs]
@classmethod
def generate_time_samples_list(cls, freq_hz: float) -> List[float]:
"""
Generates a list of time samples (in milliseconds) for one period of a waveform at the specified frequency.
Sampling is adjusted based on hardware base sample time and buffer constraints.
Args:
freq_hz (float): The frequency of the waveform in Hertz.
Returns:
List[float]: Time samples (in milliseconds).
"""
if freq_hz <= 0:
raise ValueError("Frequency must be greater than zero.")
buf_size = cls.NV200_WAVEFORM_BUFFER_SIZE
base_sample_time_us = cls.NV200_BASE_SAMPLE_TIME_US
period_us = 1_000_000 / freq_hz
ideal_sample_time_us = period_us / buf_size
sample_factor = math.ceil(ideal_sample_time_us / base_sample_time_us)
sample_time_us = sample_factor * base_sample_time_us
sample_time_s = sample_time_us / 1_000_000
required_buffer = int(period_us / sample_time_us)
return [i * sample_time_s * 1000 for i in range(required_buffer)]
[docs]
@classmethod
def generate_time_samples_array(cls, freq_hz: float) -> np.ndarray:
"""
Generates a NumPy array of time samples (in milliseconds) for one period of a waveform at the specified frequency.
Args:
freq_hz (float): The frequency of the waveform in Hertz.
Returns:
np.ndarray: Time samples (in milliseconds).
"""
return np.array(cls.generate_time_samples_list(freq_hz))
[docs]
@classmethod
def generate_sine_wave(
cls,
freq_hz: float,
low_level: float,
high_level: float,
phase_shift_rad: float = 0.0,
) -> WaveformData:
"""
Generates a sine wave based on the specified frequency and amplitude levels.
Args:
freq_hz (float): The frequency of the sine wave in Hertz (Hz).
low_level (float): The minimum value (low level) of the sine wave.
high_level (float): The maximum value (high level) of the sine wave.
Returns:
WaveformData: An object containing the generated sine wave data, including:
- x_time (List[float]): A list of time points in ms corresponding to the sine wave samples.
- y_values (List[float]): A list of amplitude values for the sine wave at each time point.
- sample_time_us (float): The time interval between samples in microseconds (µs).
Notes:
- The method calculates an optimal sample time based on the desired frequency and the
hardware's base sample time.
- The buffer size is adjusted to ensure the generated waveform fits within one period
of the sine wave.
- The sine wave is scaled and offset to match the specified low and high levels.
"""
times_ms = cls.generate_time_samples_list(freq_hz)
amplitude = (high_level - low_level) / 2.0
offset = (high_level + low_level) / 2.0
values: List[float] = [
offset + amplitude * math.sin(2 * math.pi * freq_hz * (t_ms / 1000) + phase_shift_rad)
for t_ms in times_ms
]
return cls.WaveformData(
values=values,
sample_time_ms=calculate_sampling_time_ms(times_ms)
)
[docs]
@classmethod
def generate_triangle_wave(
cls,
freq_hz: float,
low_level: float,
high_level: float,
phase_shift_rad: float = 0.0,
) -> WaveformData:
"""
Generates a triangle wave based on the specified frequency and amplitude levels.
Args:
freq_hz (float): The frequency of the triangle wave in Hertz (Hz).
low_level (float): The minimum value (low level) of the triangle wave.
high_level (float): The maximum value (high level) of the triangle wave.
phase_shift_rad (float, optional): Phase shift in radians. Defaults to 0.0.
Returns:
WaveformData: An object containing the generated triangle wave data, including:
- values (List[float]): A list of amplitude values for the triangle wave at each time point.
- sample_time_ms (float): The time interval between samples in milliseconds (ms).
Notes:
- The method calculates an optimal sample time based on the desired frequency and the
hardware's base sample time.
- The waveform is normalized between -1 and 1, then scaled and offset to fit between
low_level and high_level.
- The waveform is generated over one full period.
"""
times_ms = cls.generate_time_samples_array(freq_hz) # NumPy array in ms
t = times_ms / 1000.0 # Convert to seconds
amplitude = (high_level - low_level) / 2.0
offset = (high_level + low_level) / 2.0
period_s = 1.0 / freq_hz
# Apply phase shift in time domain
phase_shift_t = phase_shift_rad / (2 * np.pi * freq_hz)
t_shifted = t + phase_shift_t
# Generate normalized triangle wave in [-1, 1]
normalized_t = ((t_shifted / period_s) - 0.25) % 1.0
triangle = 4 * np.abs(normalized_t - 0.5) - 1
y = offset + amplitude * triangle
return cls.WaveformData(
values=y.tolist(),
sample_time_ms=calculate_sampling_time_ms(times_ms)
)
[docs]
@classmethod
def generate_square_wave(
cls,
freq_hz: float,
low_level: float,
high_level: float,
phase_shift_rad: float = 0.0,
duty_cycle: float = 0.5,
) -> WaveformData:
"""
Generates a square wave (or PWM waveform) using NumPy for efficient computation.
Args:
freq_hz (float): Frequency of the waveform in Hz.
low_level (float): Output level during the "low" part of the cycle.
high_level (float): Output level during the "high" part of the cycle.
duty_cycle (float, optional): Duty cycle as a fraction between 0.0 and 1.0.
Defaults to 0.5 (i.e., 50%).
phase_shift_rad (float, optional): Phase shift in radians. Defaults to 0.0.
Returns:
WaveformData: An object containing:
- values (List[float]): Amplitude values of the waveform.
- sample_time_ms (float): Time between samples in milliseconds.
"""
if not (0.0 < duty_cycle <= 1.0):
raise ValueError("Duty cycle must be between 0.0 (exclusive) and 1.0 (inclusive).")
times_ms = cls.generate_time_samples_array(freq_hz) # NumPy array (ms)
t = times_ms / 1000.0 # Convert to seconds
period_s = 1.0 / freq_hz
# Apply phase shift in time domain
phase_shift_t = phase_shift_rad / (2 * np.pi * freq_hz)
t_shifted = t + phase_shift_t
# Time within the period (0 to 1)
normalized_t = (t_shifted / period_s) % 1.0
# Generate square wave with duty cycle
values = np.where(normalized_t < duty_cycle, high_level, low_level)
return cls.WaveformData(
values=values.tolist(),
sample_time_ms=calculate_sampling_time_ms(times_ms)
)
[docs]
@classmethod
def generate_constant_wave(
cls,
freq_hz: float,
constant_level: float
) -> WaveformData:
"""
Generates a constant waveform at a specified frequency and level.
This method creates a waveform where all sample values are set to a constant level,
sampled at intervals determined by the specified frequency.
Args:
freq_hz (float): The frequency in Hertz at which to generate the waveform samples.
constant_level (float): The constant value for all samples in the waveform.
Returns:
WaveformData: An object containing the generated constant waveform values and the sample time in milliseconds.
"""
times_ms = cls.generate_time_samples_array(freq_hz) # NumPy array of time points in ms
values = np.full_like(times_ms, constant_level, dtype=float)
return cls.WaveformData(
values=values.tolist(),
sample_time_ms=calculate_sampling_time_ms(times_ms),
)
[docs]
@classmethod
def generate_waveform(
cls,
waveform_type: WaveformType,
freq_hz: float,
low_level: float,
high_level: float,
phase_shift_rad: float = 0.0,
duty_cycle: float = 0.5,
) -> WaveformData:
"""
Generates a waveform based on the specified type and parameters.
Args:
waveform_type (Waveform): The type of waveform to generate (SINE, TRIANGLE, SQUARE).
freq_hz (float): Frequency of the waveform in Hertz.
low_level (float): Minimum value of the waveform.
high_level (float): Maximum value of the waveform.
phase_shift_rad (float, optional): Phase shift in radians. Defaults to 0.0.
duty_cycle (float, optional): Duty cycle for square wave. Defaults to 0.5.
Returns:
WaveformData: The generated waveform data.
"""
if waveform_type == WaveformType.SINE:
return cls.generate_sine_wave(freq_hz, low_level, high_level, phase_shift_rad)
elif waveform_type == WaveformType.TRIANGLE:
return cls.generate_triangle_wave(freq_hz, low_level, high_level, phase_shift_rad)
elif waveform_type == WaveformType.SQUARE:
return cls.generate_square_wave(freq_hz, low_level, high_level, phase_shift_rad, duty_cycle)
elif waveform_type == WaveformType.CONSTANT:
return cls.generate_constant_wave(freq_hz, high_level)
else:
raise ValueError(f"Unsupported waveform type: {waveform_type}")