API Reference
Complete API reference for psj-lib.
Package Structure
psj_lib/
├── devices/
│ ├── base/ # Base device classes (public: PiezoDevice, PiezoChannel)
│ │ ├── piezo_device.py
│ │ ├── piezo_channel.py
│ │ ├── exceptions.py
│ │ └── capabilities/ # Base capabilities (all public)
│ ├── d_drive_family/ # d-Drive family (d-Drive + 30DV series)
│ │ ├── d_drive/ # d-Drive modular system
│ │ │ ├── d_drive_device.py
│ │ │ └── d_drive_channel.py
│ │ ├── psj_30dv/ # PSJ 30DV single-channel device
│ │ │ ├── psj_30dv_device.py
│ │ │ └── psj_30dv_channel.py
│ │ └── capabilities/ # d-Drive family capabilities (all public)
│ │ ├── d_drive_status_register.py
│ │ ├── d_drive_waveform_generator.py
│ │ └── ...
│ ├── nv_family/ # NV family (NV403, OL + CLE variants)
│ │ ├── nv_family_device.py
│ │ ├── nv_family_channel.py
│ │ ├── nv403/
│ │ └── nv403_cle/
│ └── transport_protocol/ # Internal (only TransportType, DiscoverFlags, TransportProtocolInfo exported)
└── _internal/ # Internal utilities
Note: Only classes and types exported in psj_lib.__init__.py are part of the public API.
Internal modules like device_factory and transport_protocol implementation details should not be accessed directly by end users.
Quick Reference
Main Classes
from psj_lib import DDriveDevice, TransportType, DiscoverFlags
Core Modules
Devices
Base Device Classes
Base classes for piezoelectric device communication and control.
This module provides the generic PiezoDevice base class that handles low-level communication, command caching, synchronization, and channel management for piezoelectric amplifiers and controllers.
The PiezoDevice class is designed to be subclassed by device-specific implementations (e.g., d-Drive, NV200) which define their specific command sets, capabilities, and behaviors.
- Key Features:
Async command execution with automatic response parsing
Optional command caching to reduce communication overhead
Reentrant async locking for thread-safe access
Multi-channel device support
Backup/restore of device configurations
- class PiezoDevice[source]
Bases:
objectGeneric base class for piezoelectric amplifier and controller devices.
PiezoDevice provides a comprehensive async interface for communicating with piezoelectric devices over various transport protocols (serial or Telnet). It encapsulates low-level device commands, response parsing, synchronization, and optional result caching.
This class is designed to be subclassed by concrete device implementations (e.g., DDriveDevice, NV200Device) which define specific command sets, capabilities, and channel configurations.
- Command Caching:
PiezoDevice supports intelligent caching of command parameters/values to reduce latency from frequent read operations. Each read over serial/Telnet can add several milliseconds of latency, so caching significantly improves performance for applications that repeatedly query device state (e.g., GUI monitoring).
Caching behavior: - Only commands in CACHEABLE_COMMANDS set are cached - Cache is automatically invalidated on write operations - Enable/disable per-instance via enable_cmd_cache() method
- Thread Safety:
All device operations use a reentrant async lock, ensuring safe concurrent access from multiple async tasks or threads. Use
async with device.lock:to group multiple operations atomically.- Multi-Channel Support:
Devices with multiple actuator channels expose them via the
channelsproperty. Each channel can be controlled independently.
Important
Caching should ONLY be used when the device has exclusive access (no other applications modifying device state). If multiple applications can access the device (e.g., both serial and Telnet), disable caching to prevent stale data issues (e.g. by calling enable_cmd_cache(False)).
- Class Attributes:
- DEVICE_ID (str | None): Unique identifier for this device model. Subclasses
must set this to auto-register with the DeviceFactory.
MAX_CHANNEL_COUNT (int): Maximum number of channels supported by the device. Subclasses must set this. CACHEABLE_COMMANDS (set[str]): Commands whose results can be cached. BACKUP_COMMANDS (set[str]): Commands to include in device backup operations. DEFAULT_TIMEOUT_SECS (float): Default timeout for command operations (0.6s). FRAME_DELIMITER_WRITE (bytes): Byte sequence appended to commands (default: CRLF). FRAME_DELIMITER_READ (bytes): Byte sequence expected at end of responses (default: CRLF).
- Instance Attributes:
_transport: TransportProtocol instance handling low-level communication _cache: CommandCache instance for caching read operations _lock: Reentrant async lock for thread-safe access _channels: Dictionary mapping channel IDs to PiezoChannel instances
Example
>>> # Discover and connect to a device >>> devices = await PiezoDevice.discover_devices() >>> device = devices[0] >>> await device.connect() >>> >>> # Access device information >>> info = device.device_info >>> print(f"Device: {info.device_id} on {info.transport_info.identifier}") >>> >>> # Execute commands with caching >>> voltage = await device.write('voltage', None) # Read (cached) >>> await device.write('voltage', [10.5]) # Write (invalidates cache) >>> >>> # Thread-safe grouped operations >>> async with device.lock: ... await device.write('cmd1', [100]) ... await device.write('cmd2', [200]) >>> >>> # Backup and restore configuration >>> backup = await device.backup(['voltage', 'frequency']) >>> # ... change settings ... >>> await device.restore(backup) >>> >>> # Clean shutdown >>> await device.close()
Note
Subclasses must implement: - _discover_channels(): Initialize the _channels dictionary - _is_device_type(): Verify connected device matches expected type - Set DEVICE_ID to a unique string for factory registration
- BACKUP_COMMANDS: set[str] = {}
- CACHEABLE_COMMANDS: set[str] = {}
- DEFAULT_TIMEOUT_SECS = 0.6
- DEVICE_ID = None
- FRAME_DELIMITER_READ = b'\r\n'
- FRAME_DELIMITER_WRITE = b'\r\n'
- MAX_CHANNEL_COUNT = 0
- SERIAL_BAUDRATE = 115200
Baudrate for serial communication. Default is 115200 baud.
- __init__(transport_type: TransportType, identifier: str)[source]
Initialize a piezo device with the specified transport connection.
Creates a new device instance configured to communicate via the specified transport protocol. The device is not connected until connect() is called.
- Parameters:
transport_type – Communication protocol to use (SERIAL or TELNET). SERIAL: Direct connection via RS-232, USB-serial adapter TELNET: Network connection via TCP/IP (typically using Lantronix adapter)
identifier – Connection identifier specific to the transport type: For SERIAL: Port name (e.g., ‘COM3’, ‘/dev/ttyUSB0’) For TELNET: IP address or hostname (e.g., ‘192.168.1.100’)
Example
>>> # Create device with serial connection >>> device = DDriveDevice( ... transport_type=TransportType.SERIAL, ... identifier='/dev/ttyUSB0' ... ) >>> >>> # Create device with Telnet connection >>> device = DDriveDevice( ... transport_type=TransportType.TELNET, ... identifier='192.168.1.100' ... ) >>> >>> # Must call connect() before using >>> await device.connect()
Note
The device is not connected after initialization. Call connect() to establish communication before issuing commands.
- async backup(backup_list: list[str] | None = None, backup_channels: bool = True) dict[str, str][source]
Create a backup of device and channel settings.
Reads and stores current values of specified commands and all channel settings. The backup can later be restored using restore() method. Cache is automatically cleared before backup to ensure fresh values.
The backup includes: 1. All channel-specific settings (from each channel’s BACKUP_COMMANDS) 2. Device-level commands specified in backup_list
- Parameters:
backup_list – List of device-level command names to backup. If None, the default BACKUP_COMMANDS list of the device is used. Example: [‘modsrc’, ‘notchon’, ‘sr’, ‘reclen’]
backup_channels – If True, backs up all channel settings automatically. (Default: True)
- Returns:
Dictionary mapping command strings to their response values. Channel commands are formatted as ‘command,channel_id’. Can be passed to restore() to recreate this configuration.
Example
>>> # Backup specific device settings >>> backup = await device.backup(['voltage', 'frequency', 'mode']) >>> >>> # Make changes >>> await device.write('voltage', [15.0]) >>> await device.write('mode', [2]) >>> >>> # Restore original settings >>> await device.restore(backup) >>> >>> # Backup includes all channels automatically >>> backup = await device.backup(['global_setting']) >>> # backup contains: {'voltage,0': ['10.0'], 'voltage,1': ['12.0'], ...}
Note
Cache is cleared before backup to ensure fresh values
Channel settings are backed up automatically
Backup format matches restore() input requirements
- property channels: dict[int, PiezoChannel]
Get dictionary of available device channels.
Multi-channel devices expose their channels through this property, allowing independent control of each actuator output. Channel IDs are typically 0-based integers (0, 1, 2, etc.).
- Returns:
Dictionary mapping channel IDs (int) to PiezoChannel instances. Returns empty dict before channels are discovered during connect().
Example
>>> await device.connect() # Discovers channels >>> >>> # Access specific channel >>> channel_0 = device.channels[0] >>> print(f"Channel ID: {channel_0.id}") >>> >>> # Iterate all channels >>> for ch_id, channel in device.channels.items(): ... print(f"Channel {ch_id}: {channel}") >>> >>> # Check number of channels >>> num_channels = len(device.channels) >>> print(f"Device has {num_channels} channels")
Note
Channels are populated during connect() via _discover_channels()
Single-channel devices may only have one channel (ID 0)
Channel objects provide access to channel-specific capabilities
Each channel can be controlled independently
- clear_cmd_cache()[source]
Clear all cached command results.
Removes all entries from the command cache, forcing subsequent read operations to query the device hardware. Use this when: - Device state may have changed externally - Recovering from errors - Switching operation modes - Debugging cache-related issues
Example
>>> # After external device modification >>> device.clear_cmd_cache() >>> voltage = await device.write('voltage', None) # Fresh read >>> >>> # In error recovery >>> try: ... await device.write('something', [val]) ... except DeviceError: ... device.clear_cmd_cache() # Start fresh
Note
Does not disable caching, only clears current entries
Next read operations will rebuild cache as normal
To disable caching entirely, use enable_cmd_cache(False)
- async close()[source]
Close the connection to the device and release resources.
This method properly shuts down the transport connection and releases any associated resources (serial ports, network sockets, etc.). Always call this when finished with the device to prevent resource leaks.
The method is idempotent - it’s safe to call multiple times or on an already-closed connection.
- Raises:
Exception – May propagate transport-specific exceptions if close fails
Example
>>> device = await DDriveDevice.connect(TransportType.SERIAL, 'COM3') >>> try: ... await device.write('voltage', [10.0]) ... finally: ... await device.close() # Always close in finally block >>> >>> # Or use async context manager (preferred) >>> async with DDriveDevice(...) as device: ... await device.connect() ... await device.write('voltage', [10.0]) >>> # Automatically closed on exit
Note
Safe to call on already-closed or non-initialized transport
Logs debug message if already closed
Recommended to use in finally block or async context manager
Automatically clears command cache on close
- async connect(auto_adjust_comm_params: bool = True)[source]
Establish connection to the device and initialize channels.
This method connects to the device via the transport layer, verifies the device type matches expectations, and discovers available channels. After successful connection, the device is ready for command execution.
Connection Process: 1. Check if already connected (skip if yes) 2. Establish transport-layer connection 3. Auto-adjust communication parameters if enabled (Telnet only) 4. Verify device type matches expected DEVICE_ID 5. Discover and initialize device channels
- Parameters:
auto_adjust_comm_params –
For Telnet connections only, automatically configure the internal Lantronix XPORT ethernet module for communication. Sets flow control to XON_XOFF_PASS_TO_HOST mode, which is required for library to function correctly. Default: True (recommended)
Set to False only if: - Using serial connection (parameter is ignored) - XPORT is already properly configured - Manual configuration is preferred
- Raises:
DeviceUnavailableException – If: - Transport is not initialized - Connection to device fails - Device type verification fails (wrong device connected)
TransportException – If transport-specific connection errors occur
Example
>>> # Standard connection (recommended) >>> device = DDriveDevice(TransportType.SERIAL, 'COM3') >>> await device.connect() >>> >>> # Connect without auto-configuration >>> device = DDriveDevice(TransportType.TELNET, '192.168.1.100') >>> await device.connect(auto_adjust_comm_params=False) >>> >>> # Verify connection >>> info = device.device_info >>> print(f"Connected to {info.device_id}")
Note
Safe to call multiple times (idempotent if already connected)
Device type mismatch causes connection to be closed
Must be called before any device commands
Channels are not available until connection completes
- property device_info: DeviceInfo
Get comprehensive information about the connected device.
Provides a DeviceInfo object containing the device’s model identifier and transport connection details. This property is useful for logging, diagnostics, and displaying device information in user interfaces.
- Returns:
device_id: Model identifier (e.g., ‘d-drive’, ‘nv200’)
- transport_info: TransportProtocolInfo with connection details
(transport type, identifier, etc.)
- Return type:
DeviceInfo object containing
- Raises:
DeviceUnavailableException – If the transport is not initialized or the device is not connected. Call connect() before accessing.
Example
>>> await device.connect() >>> info = device.device_info >>> print(f"Connected to {info.device_id}") >>> print(f" Transport: {info.transport_info.transport_type.name}") >>> print(f" Identifier: {info.transport_info.identifier}") >>> >>> # Example output: >>> # Connected to d-drive >>> # Transport: SERIAL >>> # Identifier: COM3
Note
Device must be connected before accessing this property. The device_id comes from the class’s DEVICE_ID attribute, while transport_info is provided by the active transport protocol.
- async classmethod discover_devices(flags: ~psj_lib.devices.transport_protocol.device_discovery.DiscoverFlags = <DiscoverFlags.ALL_INTERFACES: 3>) list[Self][source]
Discover and create instances of devices accessible via available transports.
This class method scans for devices using the specified discovery flags, identifies devices matching the class’s DEVICE_ID, and returns a list of ready-to-connect device instances.
The discovery process: 1. Scans serial ports and/or network interfaces based on flags 2. Attempts basic communication to identify device type 3. Filters for devices matching this class’s DEVICE_ID (if defined) 4. Creates appropriate device instances via DeviceFactory
- Parameters:
flags – Discovery scope controlling which interfaces to scan. DiscoverFlags.ALL_INTERFACES: Scan both serial and network DiscoverFlags.SERIAL_ONLY: Scan only serial/USB ports DiscoverFlags.TELNET_ONLY: Scan only network interfaces Default: ALL_INTERFACES
- Returns:
List of device instances found during discovery. Each device is created but not connected (call connect() on each device to establish communication).
Example
>>> # Discover all available devices >>> devices = await DDriveDevice.discover_devices() >>> print(f"Found {len(devices)} devices") >>> >>> # Discover only network-connected devices >>> network_devices = await DDriveDevice.discover_devices( ... flags=DiscoverFlags.TELNET_ONLY ... ) >>> >>> # Connect to first discovered device >>> if devices: ... await devices[0].connect()
Note
Base PiezoDevice class discovers all registered device types
Subclasses only discover devices matching their DEVICE_ID
Discovery can take several seconds, especially for network scanning
Devices must be powered on and properly connected to be discovered
- enable_cmd_cache(enable: bool)[source]
Enable or disable command caching for this device instance.
Controls whether command results are cached. Disabling cache also automatically clears all cached entries.
- Parameters:
enable – True to enable caching, False to disable and clear cache
Example
>>> # Disable caching for debugging >>> device.enable_cmd_cache(False) >>> voltage = await device.write('voltage', None) # Always fresh >>> >>> # Re-enable for better performance >>> device.enable_cmd_cache(True) >>> >>> # Temporarily disable for critical operations >>> device.enable_cmd_cache(False) >>> await perform_calibration() >>> device.enable_cmd_cache(True)
Note
Disabling cache automatically clears all cached values
Setting to False is important when other apps access device
Per-instance setting (doesn’t affect other device instances)
See CommandCache class for detailed caching behavior
- property lock: _ReentrantAsyncLock
Reentrant async lock for thread-safe device access.
This lock ensures that device operations are atomic and thread-safe when multiple async tasks or threads access the device concurrently. The lock is reentrant, meaning the same task can acquire it multiple times.
All internal write/read operations automatically use this lock. Use it explicitly when you need to group multiple operations atomically.
- Returns:
Reentrant async context manager lock
Example
>>> # Atomic multi-command sequence >>> async with device.lock: ... await device.write('mode', [1]) # Switch to mode 1 ... await device.write('voltage', [10.0]) # Set voltage ... await device.write('enable', [True]) # Enable output >>> >>> # Guarantees no other task can execute commands between these
Note
Lock is automatically used by write(), write_raw(), and other methods
Explicit locking is only needed for grouping multiple operations
Lock is reentrant: nested acquisitions from same task are allowed
Always use with async context manager (async with)
- async restore(backup: dict[str, list[str]])[source]
Restore device settings from a backup created with backup().
Iterates through the backup dictionary and writes each command with its saved values back to the device. This restores both device-level and channel-specific settings.
- Parameters:
backup – Dictionary created by backup() method, mapping command strings to their parameter value lists.
- Raises:
DeviceError – If any command in the backup fails to restore (e.g., parameter out of range, read-only parameter)
Example
>>> # Create backup before experiment >>> backup = await device.backup(['voltage', 'frequency']) >>> >>> # Run experiment with different settings >>> await device.write('voltage', [20.0]) >>> await device.write('frequency', [100]) >>> # ... perform measurements ... >>> >>> # Restore original configuration >>> await device.restore(backup) >>> >>> # All settings now back to original values >>> voltage = await device.write('voltage', None) >>> # voltage matches original backed-up value
Note
Commands are restored in dictionary iteration order
If a command fails, the exception propagates immediately
Works with backups containing both device and channel commands
- async write(cmd: str, params: list[int | float | str | bool] | None = None, timeout: float | None = None) list[str][source]
Execute a device command with optional parameters and caching.
This is the primary method for device interaction. It automatically handles: - Read vs write operations (based on params being None or not) - Command result caching for read operations - Cache invalidation for write operations - Parameter type conversion (bool -> int, all -> str) - Response parsing and error handling
- Read Operation (params=None):
Checks cache first, returns cached value if available
If cache miss, sends command to device
Caches result if command is cacheable
Returns response parameter values
- Write Operation (params provided):
Converts parameters to proper string format
Sends command with parameters to device
Caches written values if command is cacheable
Returns device response
- Parameters:
cmd – Command name to execute (e.g., ‘voltage’, ‘position’, ‘identify’)
params – Parameters for the command. Pass None for read operations, list of values for write operations. Boolean values are automatically converted to integers (True=1, False=0).
timeout – Maximum time to wait for device response in seconds. Default: 0.6 seconds
- Returns:
List of response strings from the device. The command name is stripped, only parameter values are returned.
- Raises:
DeviceUnavailableException – If device is not connected or transport fails
DeviceError – If device returns an error response (see exceptions.py)
TimeoutException – If device does not respond within timeout period
Example
>>> # Read operation (returns cached value if available) >>> voltage = await device.write('voltage', None) >>> print(f"Voltage: {voltage[0]} {voltage[1]}") # ['10.5', 'V'] >>> >>> # Write operation (invalidates cache) >>> await device.write('voltage', [12.5]) >>> >>> # Boolean parameters (converted to 0/1) >>> await device.write('enable', [True]) # Sends 'enable,1' >>> >>> # Multiple parameters >>> await device.write('pid', [5.0, 0.1, 0.2]) # P, I, D values >>> >>> # Custom timeout for slow operations >>> result = await device.write('calibrate', None, timeout=5.0)
Note
Caching only applies to commands in CACHEABLE_COMMANDS set
Cache is automatically invalidated on write operations
All operations use the device’s reentrant lock for thread safety
Boolean parameters: True becomes ‘1’, False becomes ‘0’
- async write_raw(cmd: str, timeout: float | None = None, rx_delimiter: bytes | None = None) Awaitable[str][source]
Send a raw command string and return unparsed device response.
This low-level method sends a command directly to the device without: - Parameter formatting or conversion - Response parsing or error checking - Caching of any kind
Use this method when you need direct access to raw device responses, such as during device identification, debugging, or when implementing new command support.
The method automatically: - Adds the appropriate frame delimiter (CRLF by default) - Uses the device lock for thread safety - Waits for complete device response with timeout
- Args:
- cmd: Complete command string to send (e.g., ‘identify’ or ‘voltage,0,10.5’).
Frame delimiters are added automatically.
- timeout: Maximum time to wait for device response in seconds.
Default: 0.6 seconds
- rx_delimiter: Optional custom receive delimiter bytes.
Default: Device configured FRAME_DELIMITER_READ
- Returns:
Raw response string from device including any delimiters or control characters
- Raises:
DeviceUnavailableException: If device is not connected or transport error occurs TimeoutException: If device does not respond within timeout period
- Example:
>>> # Send raw identification command >>> response = await device.write_raw('identify') >>> print(f"Raw response: {repr(response)}") >>> # Raw response: 'identify,d-drive,v1.2.3
- ‘
>>> >>> # Debugging - see exact device response >>> raw = await device.write_raw('voltage,0') >>> print(f"Response bytes: {raw.encode()}")
- Note:
Prefer write() method for normal operations
No error checking is performed on responses
Response must be manually parsed
Useful for device discovery and debugging
Thread-safe via automatic lock acquisition
- class PiezoChannel[source]
Bases:
objectBase class representing a single channel in a multi-channel piezo device.
A PiezoChannel encapsulates the functionality for a single actuator channel in devices that support multiple independent piezo outputs. Each channel can typically be controlled independently with its own voltage, position, PID settings, and other parameters.
The channel communicates with the parent device via a write callback function, which handles the low-level command transmission. This design allows channels to be device-agnostic while providing a consistent interface.
Specific device implementations typically define PiezoCapability instances as attributes of the channel class to expose functionality like setpoint, position, and PID control.
- Type Aliases:
ChannelID: Integer identifier for the channel (typically 0-based) Command: String representing a device command name Param: Union type for command parameters (float | int | bool | str) WriteCallback: Async function signature for sending commands to the device
- BACKUP_COMMANDS
Set of command names that should be backed up when saving channel configuration. Subclasses should override this to specify which commands preserve channel state.
- Type:
set[str]
Example
>>> # Typically created by device class, not directly by users >>> channel = device.channels[0] # Get first channel >>> backup = await channel.backup() # Save channel settings >>> channel_id = channel.id # Get channel identifier
Note
This is a base class. Device-specific implementations should inherit from this and add capability-specific methods and properties.
- BACKUP_COMMANDS: set[str] = {}
- ChannelID = ChannelID
- Command = Command
- Param = Param
- WriteCallback = WriteCallback
- __init__(channel_id: int, write_cb: WriteCallback)[source]
Initialize a piezo channel.
- Parameters:
channel_id – Numeric identifier for this channel. Typically corresponds to the physical channel number on the device (0-based indexing).
write_cb – Async callback function that transmits commands to the device. The callback receives the command string, parameter list, and channel ID, and returns the device’s response as a list of strings.
Example
>>> async def device_write(cmd, params, ch_id): ... return await device.write_channel(ch_id, cmd, params) >>> channel = PiezoChannel(channel_id=0, write_cb=device_write)
- async backup() dict[str, list[str]][source]
Backup current channel configuration by reading all backup commands.
This method queries all commands listed in the BACKUP_COMMANDS class attribute and returns their current values as a dictionary. The backup can later be restored using the parent device class’s restore functionality.
The BACKUP_COMMANDS set should be defined in device-specific channel subclasses and typically includes commands for: - Voltage/position setpoints - PID controller parameters - Filter settings - Control mode configurations
- Returns:
Dictionary mapping command names to their response values (as string lists). The dictionary can be passed to the device’s restore method to recreate this channel configuration.
Example
>>> backup = await channel.backup() >>> # backup = {'voltage': ['10.5'], 'pid_p': ['5.0'], ...} >>> # ... later, restore the configuration ... >>> await device.restore_channel(channel.id, backup)
Note
Base class has an empty BACKUP_COMMANDS set. Subclasses must override this attribute to specify which commands should be backed up.
- property id: int
Get the numeric identifier for this channel.
The channel ID typically corresponds to the physical channel number on the device hardware. Most devices use 0-based indexing (0, 1, 2, etc.).
- Returns:
Integer channel identifier
Example
>>> channel = device.channels[0] >>> print(f"Channel ID: {channel.id}") # Channel ID: 0
d-Drive Family
Base classes for the piezosystem jena d-Drive device family.
This module provides the shared implementation for d-Drive family devices, including the multi-channel d-Drive modular amplifiers and the single-channel 30DV series (30DV50/300).
- Common family traits:
20-bit resolution with 50 kHz sampling rate (50 kSPS)
Digital PID controllers with filter stages
Integrated waveform generator and data recorder
RS-232/USB
For detailed hardware specifications, refer to the d-Drive Instruction Manual: https://www.piezosystem.com/products/amplifiers/modular/50ma-300ma-ddrive-digital-systems/
- class DDriveFamilyDevice[source]
Bases:
PiezoDeviceBase class for d-Drive family devices.
This class defines common behavior for d-Drive family devices, including the multi-channel d-Drive modular amplifier and the single-channel 30DV series (30DV50/300). Subclasses provide the concrete channel discovery and device identifiers.
Family features: - 20-bit resolution - 50 kHz sampling rate (20 µs control loop period) - Digital PID controllers with feedforward - Multiple filter stages (notch, low-pass, error filter) - Integrated waveform generator and data recorder - Hardware trigger output - Analog monitor output - Modulation input
- DEVICE_ID
Device type identifier string
- BACKUP_COMMANDS
Commands excluded from backup operations
- Type:
set[str]
- CACHEABLE_COMMANDS
Commands whose responses can be cached
- Type:
set[str]
Example
>>> from psj_lib import DDriveDevice, TransportType >>> device = DDriveDevice(TransportType.SERIAL, 'COM3') >>> await device.connect() >>> print(f"Found {len(device.channels)} channels") >>> channel = device.channels[0] >>> await channel.closed_loop_controller.set(True) >>> await channel.setpoint.set(50.0) >>> pos = await channel.position.get() >>> print(f"Position: {pos:.3f} µm")
Note
d-Drive modular systems support 1-6 channels (hardware dependent)
PSJ 30DV devices expose a single channel (ID 0)
Channels are numbered 0-5; not all IDs may be populated
Use device.channels dict to access available channels
- BACKUP_COMMANDS: set[str] = {}
Global device commands to include in backup operations (currently none for d-Drive).
- CACHEABLE_COMMANDS: set[str] = {'acclmas', 'acdescr', 'acolmas', 'bright', 'cl', 'elpor', 'errlpf', 'fan', 'ganoi', 'garec', 'gasin', 'gaswe', 'gatri', 'gfkt', 'gfrec', 'gfsin', 'gftri', 'gonoi', 'gorec', 'gosin', 'goswe', 'gotri', 'gsrec', 'gstri', 'gtswe', 'kd', 'ki', 'kp', 'lpf', 'lpon', 'modon', 'monsrc', 'notchb', 'notchf', 'notchon', 'pcf', 'recstride', 'sct', 'set', 'sr', 'tf', 'trgedge', 'trglen', 'trgos', 'trgse', 'trgsi', 'trgsrc', 'trgss'}
Commands whose responses can be cached for performance optimization.
These commands return relatively static configuration values that don’t change frequently. Caching reduces communication overhead for reads.
- DEFAULT_TIMEOUT_SECS = 0.5
- DEVICE_ID = 'd-Drive Family Device'
Device type identifier used for device discovery and type checking.
- D_DRIVE_IDENTIFIER = 'INVALID_STRING'
Internal identifier string used to recognize different d-Drive family devices. Overridden in subclasses.
- ERROR_MAP = {' not present': ErrorCode.UNKNOWN_CHANNEL, 'command mismatch': ErrorCode.COMMAND_PARAMETER_COUNT_EXCEEDED, 'command not found': ErrorCode.UNKNOWN_COMMAND, 'unit not available': ErrorCode.ACTUATOR_NOT_CONNECTED}
- FRAME_DELIMITER_MAP = {'elpor': b'\r', 'errlpf': b'\r', 'ganoi': b'\r', 'garec': b'\r', 'gasin': b'\r', 'gaswe': b'\r', 'gatri': b'\r', 'gfkt': b'\r', 'gfrec': b'\r', 'gfsin': b'\r', 'gftri': b'\r', 'gonoi': b'\r', 'gorec': b'\r', 'gosin': b'\r', 'goswe': b'\r', 'gotri': b'\r', 'gsrec': b'\r', 'gstri': b'\r', 'gtswe': b'\r', 'kd': b'\r', 'ki': b'\r', 'kp': b'\r', 'ktemp': b'\r', 'lpf': b'\r', 'lpon': b'\r', 'm': b'\r', 'modon': b'\r', 'monsrc': b'\r', 'notchb': b'\r', 'notchf': b'\r', 'notchon': b'\r', 'pcf': b'\r', 'sct': b'\r', 'sr': b'\r', 'tf': b'\r', 'trgedge': b'\r', 'trglen': b'\r', 'trgos': b'\r', 'trgse': b'\r', 'trgsi': b'\r', 'trgsrc': b'\r', 'trgss': b'\r', 'u': b'\r'}
- FRAME_DELIMITER_READ = b'\x11'
- FRAME_DELIMITER_WRITE = b'\r\n'
- property channels: dict[int, DDriveFamilyChannel]
Get dictionary of available d-Drive amplifier channels.
- Returns:
Dictionary mapping channel number (0-5) to DDriveFamilyChannel instance, or None for unpopulated slots
Example
>>> # Iterate over all available channels >>> for ch_num, channel in device.channels.items(): ... if channel is not None: ... pos = await channel.position.get() ... print(f"Channel {ch_num}: {pos} µm") >>> >>> # Access specific channel >>> if device.channels[0] is not None: ... await device.channels[0].setpoint.set(75.0)
Note
Channel numbers 0-5
None values indicate empty amplifier slots
Check for None before accessing channel
- async write_raw(cmd, timeout: float = 0.5, rx_delimiter: bytes = b'\x11') str[source]
Send a raw command string and return unparsed device response.
This low-level method sends a command directly to the device without: - Parameter formatting or conversion - Response parsing or error checking - Caching of any kind
Use this method when you need direct access to raw device responses, such as during device identification, debugging, or when implementing new command support.
The method automatically: - Adds the appropriate frame delimiter (CRLF by default) - Uses the device lock for thread safety - Waits for complete device response with timeout
- Args:
- cmd: Complete command string to send (e.g., ‘identify’ or ‘voltage,0,10.5’).
Frame delimiters are added automatically.
- timeout: Maximum time to wait for device response in seconds.
Default: 0.6 seconds
- rx_delimiter: Optional custom receive delimiter bytes.
Default: Device configured FRAME_DELIMITER_READ
- Returns:
Raw response string from device including any delimiters or control characters
- Raises:
DeviceUnavailableException: If device is not connected or transport error occurs TimeoutException: If device does not respond within timeout period
- Example:
>>> # Send raw identification command >>> response = await device.write_raw('identify') >>> print(f"Raw response: {repr(response)}") >>> # Raw response: 'identify,d-drive,v1.2.3
- ‘
>>> >>> # Debugging - see exact device response >>> raw = await device.write_raw('voltage,0') >>> print(f"Response bytes: {raw.encode()}")
- Note:
Prefer write() method for normal operations
No error checking is performed on responses
Response must be manually parsed
Useful for device discovery and debugging
Thread-safe via automatic lock acquisition
d-Drive amplifier channel implementation with all capabilities.
Each d-Drive channel is an independent piezo amplifier with digital control, providing high-precision positioning with 20-bit resolution and 50 kHz sampling.
- class DDriveFamilyChannel[source]
Bases:
PiezoChannelSingle d-Drive amplifier channel with all control capabilities.
Each d-Drive channel provides complete control of one piezoelectric actuator with:
Hardware Specifications (from d-Drive manual): - 20-bit resolution - 50 kHz sampling rate (20µs control loop period) - Digital PID controller with feedforward (PCF) - Multiple filter stages:
Notch filter (resonance suppression)
Low-pass filter (noise reduction)
Error low-pass filter (PID stability)
Signal Generation: - Waveform generator (sine, triangle, rectangle, sweep, noise) - Modulation input (external analog or internal waveform) - Analog monitor output (configurable signal routing)
Data Acquisition: - Two-channel data recorder - 500,000 samples per channel maximum - 50 kHz max sample rate with stride (decimation) support - Channel 1: position signal, Channel 2: actuator voltage
Trigger System: - Hardware trigger output (TTL) - Window mode with start/stop thresholds - Periodic triggers with configurable interval - Edge detection (rising, falling, both)
- SAMPLE_PERIOD
Control loop period in microseconds (20µs = 50kHz)
- Type:
int
- BACKUP_COMMANDS
Channel-specific commands included in backup
- Type:
set[str]
Example
>>> channel = device.channels[0] >>> >>> # Configure closed-loop control with PID tuning >>> await channel.closed_loop_controller.set(True) >>> await channel.pid_controller.set(p=10.0, i=5.0, d=0.5) >>> >>> # Enable notch filter to suppress 500 Hz resonance >>> await channel.notch.set( ... enabled=True, ... frequency=500.0, ... bandwidth=50.0 ... ) >>> >>> # Generate 10 Hz sine wave for scanning >>> await channel.waveform_generator.sine.set( ... amplitude=20.0, ... offset=50.0, ... frequency=10.0 ... ) >>> await channel.waveform_generator.set_waveform_type( ... WaveformType.SINE ... ) >>> >>> # Configure data recorder for 1 second capture >>> await channel.data_recorder.set( ... memory_length=50000, # 50k samples at 50kHz = 1 sec ... stride=1 # No decimation ... )
Note
Sample period defines control loop and data recorder timing
Capabilities are accessed as properties (e.g., channel.setpoint)
Hardware-specific enums defined in d_drive/capabilities/
- BACKUP_COMMANDS: set[str] = {'cl', 'elpor', 'errlpf', 'ganoi', 'garec', 'gasin', 'gaswe', 'gatri', 'gfkt', 'gfrec', 'gfsin', 'gftri', 'gonoi', 'gorec', 'gosin', 'goswe', 'gotri', 'gsrec', 'gstri', 'gtswe', 'kd', 'ki', 'kp', 'lpf', 'lpon', 'modon', 'monsrc', 'notchb', 'notchf', 'notchon', 'pcf', 'sct', 'sr', 'tf', 'trgedge', 'trglen', 'trgos', 'trgse', 'trgsi', 'trgsrc', 'trgss'}
d-Drive commands to include in backup operations.
These commands represent channel configuration that should be saved and restored during backup/restore operations. Includes: - Control parameters (PID, slew rate, PCF) - Filter settings (notch, low-pass, error filter) - Waveform generator configuration - Trigger and recorder settings - Display and monitoring configuration
Note
Dynamic state (position, setpoint) not included
Only configuration parameters are backed up
- SAMPLE_PERIOD: int = 20
Control loop sample period in microseconds.
The d-Drive control loop runs at 50 kHz (20µs period). This timing affects: - PID controller update rate - Data recorder maximum sample rate (50 kHz) - Waveform generator resolution - Trigger output timing
Note
20µs period = 50 kHz sampling frequency
Actual system bandwidth depends on actuator and filters
- actuator_description: ActuatorDescription
Actuator description and identification.
Retrieves human-readable description of connected piezo actuator.
Example
>>> desc = await channel.actuator_description.get() >>> print(f"Actuator: {desc}")
- async backup() dict[str, list[str]][source]
Backup current channel configuration by reading all backup commands.
This method queries all commands listed in the BACKUP_COMMANDS class attribute and returns their current values as a dictionary. The backup can later be restored using the parent device class’s restore functionality.
The BACKUP_COMMANDS set should be defined in device-specific channel subclasses and typically includes commands for: - Voltage/position setpoints - PID controller parameters - Filter settings - Control mode configurations
- Returns:
Dictionary mapping command names to their response values (as string lists). The dictionary can be passed to the device’s restore method to recreate this channel configuration.
Example
>>> backup = await channel.backup() >>> # backup = {'voltage': ['10.5'], 'pid_p': ['5.0'], ...} >>> # ... later, restore the configuration ... >>> await device.restore_channel(channel.id, backup)
Note
Base class has an empty BACKUP_COMMANDS set. Subclasses must override this attribute to specify which commands should be backed up.
- closed_loop_controller: DDriveClosedLoopController
Closed-loop position control enable/disable.
Enable sensor-based feedback control for precise positioning.
To configure the closed-loop PID controller, use the
pid_controllerproperty.Example
>>> await channel.closed_loop_controller.set(True) >>> is_closed = await channel.closed_loop_controller.get_enabled()
- data_recorder: DDriveDataRecorder
Two-channel data recorder for signal capture.
Records position (channel 1) and voltage (channel 2) at up to 50 kHz (20 µs period). Maximum 500,000 samples per channel.
Channel mapping: - Channel 1: Position sensor signal - Channel 2: Actuator voltage
The waveform generator is automatically started when sending a new setpoint, starting a waveform output or starting a scan.
Example
>>> from psj_lib import DDriveDataRecorderChannel >>> # Configure for 1 second at 50 kHz >>> await channel.data_recorder.set( ... memory_length=50000, ... stride=1 # No decimation ... ) >>> await channel.data_recorder.start() >>> # ... perform motion ... >>> position_data = await channel.data_recorder.get_all_data( ... DDriveDataRecorderChannel.POSITION ... ) >>> voltage_data = await channel.data_recorder.get_all_data( ... DDriveDataRecorderChannel.VOLTAGE ... )
Note
Sample period = 20 µs (50 kHz max rate)
Both channels always record same length
Use stride for longer durations at lower rate
- error_lpf: ErrorLowPassFilter
Error signal low-pass filter for PID input.
Filters the position error signal before PID controller to reduce noise and improve stability.
Example
>>> await channel.error_lpf.set( ... cutoff_frequency=200.0, # 200 Hz ... order=2 # 2nd order filter ... )
- fan: Fan
Cooling fan control for thermal management.
Enable or disable the channel’s cooling fan.
Example
>>> await channel.fan.set(True) # Enable fan >>> is_on = await channel.fan.get_enabled()
- lpf: LowPassFilter
Low-pass filter for signal conditioning.
Reduces high-frequency noise in position or control signals.
Example
>>> await channel.lpf.set( ... enabled=True, ... cutoff_frequency=100.0 # 100 Hz cutoff ... )
- modulation_source: ModulationSource
Modulation input source selection.
Select modulation source from d-Drive specific options: - SERIAL_ENCODER: Serial encoder input - SERIAL_ENCODER_ANALOG: Analog serial encoder
Example
>>> from psj_lib import DDriveModulationSourceTypes >>> await channel.modulation_source.set_source( ... DDriveModulationSourceTypes.SERIAL_ENCODER ... )
- monitor_output: MonitorOutput
Analog monitor output source routing.
Route internal signals to analog monitor output connector. Available sources: - CLOSED_LOOP_POSITION: Closed-loop position value - SETPOINT: Commanded setpoint - CONTROLLER_VOLTAGE: PID controller output - POSITION_ERROR: Setpoint - position error - POSITION_ERROR_ABS: Absolute position error - ACTUATOR_VOLTAGE: Output voltage to actuator - OPEN_LOOP_POSITION: Open-loop position estimate
Example
>>> from psj_lib import DDriveMonitorOutputSource >>> await channel.monitor_output.set_source( ... DDriveMonitorOutputSource.POSITION_ERROR ... )
- notch: NotchFilter
Notch filter for mechanical resonance suppression.
Attenuates specific frequencies to suppress resonances that can cause instability in closed-loop systems.
Example
>>> await channel.notch.set( ... enabled=True, ... frequency=500.0, # Suppress 500 Hz resonance ... bandwidth=50.0 # 50 Hz bandwidth ... )
- pcf: PreControlFactor
Pre-control factor for feedforward compensation.
Provides feedforward control to improve response speed and reduce tracking error in closed-loop operation.
Example
>>> await channel.pcf.set(0.5) # 50% feedforward >>> pcf_val = await channel.pcf.get()
- pid_controller: PIDController
PID controller parameter configuration.
Configure Proportional, Integral, Derivative, and differential filter (TF) parameters for closed-loop control.
Example
>>> await channel.pid_controller.set( ... p=10.0, # Proportional gain ... i=5.0, # Integral gain ... d=0.5, # Derivative gain ... diff_filter=100.0 # Diff filter time constant ... )
- position: Position
Actual position readback from sensor.
Reads the current measured position from the actuator’s position sensor. In closed-loop mode, this is the feedback value. In open loop mode, this is the output voltage.
Please note that for d-Drive devices, the position value is only updated every 500ms.
Example
>>> pos = await channel.position.get() >>> print(f"Current position: {pos:.3f} µm")
- setpoint: DDriveSetpoint
Target position control (commanded position).
Set and read the desired actuator position. In closed-loop mode, the controller actively drives to this position. In open loop mode, the setpoint acts as the voltage output to the actuator.
Example
>>> await channel.setpoint.set(75.0) # Move to 75 µm >>> target = await channel.setpoint.get()
- slew_rate: SlewRate
Maximum rate of change limiting.
Limits how fast the actuator can change position, providing smooth motion and protecting actuators and samples.
Example
>>> await channel.slew_rate.set(10.0) # 10 V/ms >>> rate = await channel.slew_rate.get()
- status_register: Status
d-Drive status register with hardware state information.
Provides real-time status including: - actor_plugged: Whether actuator is connected - sensor_type: Type of position sensor (STRAIN_GAUGE, CAPACITIVE, etc.) - piezo_voltage_enabled: Output voltage enable state - closed_loop: Closed-loop control active status - waveform_generator_status: Current waveform type (SINE, TRIANGLE, etc.) - notch_filter_active: Notch filter enable state - low_pass_filter_active: Low-pass filter enable state
Example
>>> status = await channel.status_register.get() >>> if status.actor_plugged: ... print(f"Sensor: {status.sensor_type.name}") ... print(f"Closed-loop: {status.closed_loop}")
- temperature: Temperature
Channel electronics temperature monitoring.
Reads internal temperature of the amplifier electronics for thermal management and diagnostics.
Example
>>> temp = await channel.temperature.get() >>> print(f"Temperature: {temp:.1f}°C")
- trigger_out: DDriveTriggerOut
Hardware trigger output generation.
d-Drive specific trigger output with additional offset parameter. Generates TTL pulses synchronized with actuator position or setpoint.
Example
>>> from psj_lib import TriggerEdge, TriggerDataSource >>> await channel.trigger_out.set( ... start_value=20.0, # Start at 20 µm ... stop_value=80.0, # Stop at 80 µm ... interval=10.0, # Trigger every 10 µm ... length=1000, # 1000 µs pulse ... edge=TriggerEdge.BOTH, ... src=TriggerDataSource.POSITION, ... offset=0.0 # d-Drive specific offset parameter ... )
- waveform_generator: DDriveWaveformGenerator
Multi-waveform generator for scanning and modulation.
d-Drive waveform generator supporting multiple waveform types: - SINE: Sinusoidal waveform - TRIANGLE: Triangular waveform with adjustable duty cycle - RECTANGLE: Square/rectangle wave with duty cycle - NOISE: Random noise for dithering - SWEEP: Linear sweep (ramp)
Each waveform type has dedicated configuration via properties: - waveform_generator.sine: Sine wave parameters - waveform_generator.triangle: Triangle wave parameters - waveform_generator.rectangle: Rectangle wave parameters - waveform_generator.noise: Noise parameters - waveform_generator.sweep: Sweep/ramp parameters
For sweep waveform, the frequency parameter acts as the time in seconds to complete one full sweep cycle (0.1 Hz to 10kHz).
Scan function for automated single or double scans.
Example
>>> from psj_lib import DDriveWaveformType, DDriveScanType >>> # Configure sine wave >>> await channel.waveform_generator.sine.set( ... amplitude=20.0, # 20 µm peak-to-peak ... offset=50.0, # Centered at 50 µm ... frequency=10.0 # 10 Hz ... ) >>> # Activate sine waveform >>> await channel.waveform_generator.set_waveform_type( ... DDriveWaveformType.SINE ... ) >>> >>> # Configure triangle with asymmetric duty cycle >>> await channel.waveform_generator.triangle.set( ... amplitude=30.0, ... offset=50.0, ... frequency=5.0, ... duty_cycle=70.0 # 70% rise, 30% fall ... ) >>> await channel.waveform_generator.set_waveform_type( ... DDriveWaveformType.TRIANGLE ... ) >>> >>> # Start automated scan >>> from psj_lib import DDriveScanType >>> await channel.waveform_generator.start_scan( ... DDriveScanType.TRIANGLE_ONCE ... )
Note
Set waveform parameters before activating type
By setting a waveform type, the generator is started. To stop, set type to NONE.
Only one waveform type active at a time
Scan function provides automated scanning sequences
Check status with get_waveform_type() or is_scan_running()
Piezosystem Jena d-Drive modular piezo amplifier device implementation.
The d-Drive is a modular, expandable piezoelectric amplifier system designed for high-precision nanopositioning. It features:
20-bit resolution with 50 kHz sampling rate (50 kSPS)
1 to 6 amplifier channels per system
Digital PID controllers with several filter stages
Integrated waveform generator (sine, triangle, rectangle, sweep, noise)
Two-channel data recorder (500k maximum samples per channel)
Modular architecture with hot-swappable amplifier modules
RS-232/USB connectivity options
For detailed hardware specifications, refer to the d-Drive Instruction Manual. (https://www.piezosystem.com/products/amplifiers/modular/50ma-300ma-ddrive-digital-systems/)
- class DDriveDevice[source]
Bases:
DDriveFamilyDevicePiezosystem Jena d-Drive modular amplifier system.
Represents a complete d-Drive system with 1-6 amplifier channels. Each channel provides independent control of a piezoelectric actuator with digital PID control, waveform generation, and data recording.
The d-Drive system features: - 20-bit resolution - 50 kHz sampling rate (20µs control loop period) - Digital PID controllers with feedforward - Multiple filter stages (notch, low-pass, error filter) - Integrated waveform generator (sine, triangle, rectangle, sweep, noise) - Two-channel data recorder (500k samples per channel) - Hardware trigger output - Analog monitor output - Modulation input
- DEVICE_ID
Device type identifier string
- BACKUP_COMMANDS
Commands excluded from backup operations
- CACHEABLE_COMMANDS
Commands whose responses can be cached
Example
>>> from psj_lib import DDriveDevice, TransportType >>> # Connect to d-Drive via serial port >>> device = DDriveDevice(TransportType.SERIAL, 'COM3') >>> await device.connect() >>> print(f"Found {len(device.channels)} channels") >>> >>> # Access channel 0 >>> channel = device.channels[0] >>> # Enable closed-loop control >>> await channel.closed_loop_controller.set(True) >>> # Move to position >>> await channel.setpoint.set(50.0) # 50 µm >>> # Read actual position >>> pos = await channel.position.get() >>> print(f"Position: {pos:.3f} µm")
Note
System supports 1-6 channels (hardware dependent)
Channels are numbered 0-5
Not all channel numbers may be populated
Use device.channels dict to access available channels
- DEVICE_ID = 'd-Drive'
Device type identifier used for device discovery and type checking.
- D_DRIVE_IDENTIFIER = 'DSM'
Internal identifier string used to recognize different d-Drive family devices.
- MAX_CHANNEL_COUNT = 6
The d-Drive system can support up to 6 amplifier channels, but actual count depends on hardware configuration.
- property channels: dict[int, DDriveChannel]
Get dictionary of available d-Drive amplifier channels.
- Returns:
Dictionary mapping channel number (0-5) to DDriveChannel instance, or None for unpopulated slots
Example
>>> # Iterate over all available channels >>> for ch_num, channel in device.channels.items(): ... if channel is not None: ... pos = await channel.position.get() ... print(f"Channel {ch_num}: {pos} µm") >>> >>> # Access specific channel >>> if device.channels[0] is not None: ... await device.channels[0].setpoint.set(75.0)
Note
Channel numbers 0-5
None values indicate empty amplifier slots
Check for None before accessing channel
d-Drive amplifier channel implementation with all capabilities.
Each d-Drive channel is an independent piezo amplifier with digital control, providing high-precision positioning with 20-bit resolution and 50 kHz sampling.
- class DDriveChannel[source]
Bases:
DDriveFamilyChannelSingle d-Drive amplifier channel with all control capabilities.
Each d-Drive channel provides complete control of one piezoelectric actuator with:
Hardware Specifications (from d-Drive manual): - 20-bit resolution - 50 kHz sampling rate (20µs control loop period) - Digital PID controller with feedforward (PCF) - Multiple filter stages:
Notch filter (resonance suppression)
Low-pass filter (noise reduction)
Error low-pass filter (PID stability)
Signal Generation: - Waveform generator (sine, triangle, rectangle, sweep, noise) - Modulation input (external analog or internal waveform) - Analog monitor output (configurable signal routing)
Data Acquisition: - Two-channel data recorder - 500,000 samples per channel maximum - 50 kHz max sample rate with stride (decimation) support - Channel 1: position signal, Channel 2: actuator voltage
Trigger System: - Hardware trigger output (TTL) - Window mode with start/stop thresholds - Periodic triggers with configurable interval - Edge detection (rising, falling, both)
- SAMPLE_PERIOD
Control loop period in microseconds (20µs = 50kHz)
- Type:
int
- BACKUP_COMMANDS
Channel-specific commands included in backup
- Type:
set[str]
Example
>>> channel = device.channels[0] >>> >>> # Configure closed-loop control with PID tuning >>> await channel.closed_loop_controller.set(True) >>> await channel.pid_controller.set(p=10.0, i=5.0, d=0.5) >>> >>> # Enable notch filter to suppress 500 Hz resonance >>> await channel.notch.set( ... enabled=True, ... frequency=500.0, ... bandwidth=50.0 ... ) >>> >>> # Generate 10 Hz sine wave for scanning >>> await channel.waveform_generator.sine.set( ... amplitude=20.0, ... offset=50.0, ... frequency=10.0 ... ) >>> await channel.waveform_generator.set_waveform_type( ... WaveformType.SINE ... ) >>> >>> # Configure data recorder for 1 second capture >>> await channel.data_recorder.set( ... memory_length=50000, # 50k samples at 50kHz = 1 sec ... stride=1 # No decimation ... )
Note
Sample period defines control loop and data recorder timing
Capabilities are accessed as properties (e.g., channel.setpoint)
Hardware-specific enums defined in d_drive_family/capabilities/
30DV device implementation (single-channel d-Drive family device).
- class PSJ30DVDevice[source]
Bases:
DDriveFamilyDevicePiezosystem Jena PSJ 30DV50/300 piezo amplifier device.
The PSJ 30DV50/300 is a standalone, single-channel amplifier that is protocol compatible with the d-Drive family. It exposes one channel (ID 0) with the same capability set as a d-Drive channel.
- DEVICE_ID
Device type identifier string
- D_DRIVE_IDENTIFIER
Internal identifier string used for discovery
Example
>>> from psj_lib import PSJ30DVDevice, TransportType >>> device = PSJ30DVDevice(TransportType.SERIAL, 'COM3') >>> await device.connect() >>> channel = device.channels[0] >>> await channel.setpoint.set(10.0)
- DEVICE_ID = '30DV50/300'
Device type identifier used for device discovery and type checking.
- D_DRIVE_IDENTIFIER = 'AP'
Internal identifier string used to recognize different d-Drive family devices.
- MAX_CHANNEL_COUNT = 1
Indicates that PSJ 30DV is a single-channel device.
- property channels: dict[int, PSJ30DVChannel]
Typed channel mapping for PSJ 30DV (single channel at ID 0).
PSJ 30DV channel implementation (single-channel d-Drive family).
- class PSJ30DVChannel[source]
Bases:
DDriveFamilyChannelChannel class for PSJ 30DV devices.
Inherits all d-Drive family capabilities and behavior. The PSJ 30DV exposes a single channel (ID 0) with the same capability set as d-Drive channels.
NV Family
NV family base device implementation.
This module defines shared transport, discovery, error handling, and global capabilities for NV-series amplifiers.
- class NVFamilyDevice[source]
Bases:
PiezoDeviceBase class for all supported NV-series devices.
Implements shared NV-family behavior: - Device identification at NV serial baudrate - NV frame delimiters and error parsing - Channel discovery from
MAX_CHANNEL_COUNT- Common device-level capabilitiesExample
>>> device = NV403CLEDevice(TransportType.SERIAL, "COM10") >>> async with device: ... await device.display.set(brightness=35.0) ... ch0 = device.channels[0] ... await ch0.setpoint.set(20.0)
- BACKUP_COMMANDS: set[str] = {'encexp', 'enclim', 'encmode', 'encstol', 'enctime', 'light'}
Commands to include in backup operations for state restoration.
- CACHEABLE_COMMANDS: set[str] = {'dspclmax', 'dspclmin', 'dspvmax', 'dspvmin', 'encexp', 'enclim', 'encmode', 'encstol', 'enctime', 'light', 'monwpa', 'setk', 'unitcl', 'unitol'}
Commands whose responses can be cached to optimize performance.
- DEVICE_ID = 'NV Family Device'
Device type identifier used for device discovery and type checking.
- ERROR_MAP = {11: ErrorCode.UNKNOWN_COMMAND, 15: ErrorCode.UNKNOWN_CHANNEL, 16: ErrorCode.UNKNOWN_CHANNEL, 17: ErrorCode.PARAMETER_MISSING, 18: ErrorCode.ADMISSIBLE_PARAMETER_RANGE_EXCEEDED, 25: ErrorCode.ACTUATOR_NOT_CONNECTED}
- FRAME_DELIMITER_READ = b'\x11'
- FRAME_DELIMITER_WRITE = b'\r'
- MAX_CHANNEL_COUNT = 0
Maximum number of channels supported by this device family. Overridden in subclasses.
- NV_CHANNEL_TYPE
Channel class type used for instantiating channels in this device family. Overridden in subclasses.
alias of
NVFamilyChannel
- NV_FAMILY_IDENTIFIER = 'INVALID_STRING'
Internal identifier string used to recognize different NV Family devices. Overridden in subclasses.
- SERIAL_BAUDRATE = 19200
Baudrate for serial communication during device identification. NV Family devices respond at 19200 baud.
- property channels: dict[int, NVFamilyChannel]
Typed access to NV family channels.
- Returns:
Dictionary mapping channel IDs to
NVFamilyChannelinstances (or subclass instances on model-specific devices).
NV family channel model and capability mapping.
This module defines the shared channel-level capabilities for supported NV-series amplifiers.
- class NVFamilyChannel[source]
Bases:
PiezoChannelBase channel for NV-series amplifiers.
This class provides common channel capabilities across open-loop and closed-loop NV variants.
Common capabilities include: - Setpoint and position control/readback - Status register access with NV-specific flag decoding - Modulation source selection - Monitor output routing - Open-loop unit and limit queries
Example
>>> channel = device.channels[0] >>> await channel.setpoint.set(25.0) >>> pos = await channel.position.get() >>> status = await channel.status.get() >>> print(pos, status.over_temperature)
- BACKUP_COMMANDS: set[str] = {'cloop', 'monwpa', 'setk'}
NV channel commands included in backup/restore operations.
- GLOBAL_COMMANDS: set[str] = {'ERROR', 'dspvmax', 'dspvmin'}
Commands that should belong to a channel, but the device expects them without a channel ID prefix. Handled specially in the _write method.
- modulation_source: NVModulationSource
Channel modulation source selection.
Expects
NVModulationSourceTypesvalues. Readback value is always the last set value, due to the lack of direct readback for this value in NV devices.
- monitor_output: NVMonitorOutput
Analog monitor output source routing.
Expects
NVMonitorOutputSourcevalues. Readback value is always the last set value, due to the lack of direct readback for this value in NV devices.
- openloop_limits: Limits
Open-loop lower and upper limit queries.
Example
>>> lower, upper = await channel.openloop_limits.get_range()
- openloop_unit: Unit
Open-loop unit readback capability.
Example
>>> unit = await channel.openloop_unit.get()
- position: Position
Channel position readback capability.
Example
>>> actual = await channel.position.get()
- setpoint: NVSetpoint
Channel setpoint capability.
Readback value is always the last set value, due to the lack of direct readback for this value in NV devices.
Example
>>> await channel.setpoint.set(40.0) >>> target = await channel.setpoint.get()
- status: Status
NV status register access.
Returns
NVStatusRegisterfor interpreted per-channel flags.
- class NV403Device[source]
Bases:
NVFamilyDeviceDevice class for NV40/3 open-loop 3-channel amplifier.
Provides three channel instances plus coordinated multi-channel helpers.
Example
>>> device = NV403Device(TransportType.SERIAL, "COM10") >>> async with device: ... await device.multi_setpoint.set([10.0, 20.0, 30.0]) ... print(await device.multi_position.get())
- DEVICE_ID = 'NV40/3'
Device type identifier used for device discovery and type checking.
- MAX_CHANNEL_COUNT = 3
Maximum number of channels supported by this device family. Overridden in subclasses.
- NV_CHANNEL_TYPE
alias of
NV403Channel
- NV_FAMILY_IDENTIFIER = 'NV403'
Internal identifier string used to recognize different NV Family devices. Overridden in subclasses.
- property channels: dict[int, NV403Channel]
Typed channel mapping for NV40/3.
- multi_position: MultiPosition
Single-command position readback for all three channels.
- multi_setpoint: MultiSetpoint
Synchronous setpoint write for all three channels.
Note: To use this capability, all 3 channels must have an actuator connected and their modulation mode set to SERIAL. Otherwise, the amplifier will ignore the command.
- class NV403Channel[source]
Bases:
NVFamilyChannelOpen-loop channel model used by NV40/3 devices.
Each NV40/3 channel exposes the shared NV channel capabilities for open-loop operation.
Example
>>> ch1 = device.channels[1] >>> await ch1.setpoint.set(18.0) >>> print(await ch1.openloop_unit.get())
- class NV403CLEDevice[source]
Bases:
NVFamilyDeviceDevice class for NV40/3CLE closed-loop 3-channel amplifier.
Provides three closed-loop capable channels and coordinated multi-channel helper capabilities.
Example
>>> device = NV403CLEDevice(TransportType.SERIAL, "COM10") >>> async with device: ... ch0 = device.channels[0] ... await ch0.closed_loop_controller.set(True) ... await device.multi_setpoint.set([5.0, 10.0, 15.0])
- DEVICE_ID = 'NV40/3CLE'
Device type identifier used for device discovery and type checking.
- MAX_CHANNEL_COUNT = 3
Maximum number of channels supported by this device family. Overridden in subclasses.
- NV_CHANNEL_TYPE
alias of
NV403CLEChannel
- NV_FAMILY_IDENTIFIER = 'NV403CLE'
Internal identifier string used to recognize different NV Family devices. Overridden in subclasses.
- property channels: dict[int, NV403CLEChannel]
Typed channel mapping for NV40/3CLE.
- multi_position: MultiPosition
Single-command position readback for all three channels.
- multi_setpoint: MultiSetpoint
Synchronous setpoint write for all three channels.
Note: To use this capability, all 3 channels must have an actuator connected and their modulation mode set to SERIAL. Otherwise, the amplifier will ignore the command.
- class NV403CLEChannel[source]
Bases:
NVFamilyChannelClosed-loop channel model used by NV40/3CLE devices.
Extends
NVFamilyChannelwith closed-loop specific capabilities for each of the three channels.Example
>>> ch2 = device.channels[2] >>> await ch2.closed_loop_controller.set(True) >>> await ch2.setpoint.set(9.0) >>> limits = await ch2.closedloop_limits.get_range()
- closed_loop_controller: ClosedLoopController
Closed-loop feedback control enable/disable.
Base Capabilities
Status and Monitoring
Device status register capability.
- class Status[source]
Bases:
PiezoCapabilityQuery device status register.
Retrieves the current status of the device, returning a status register object that interprets device state, error conditions, and operational flags.
The status register class is device-specific and provided during capability initialization.
Example
>>> status_cap = device.status >>> status_reg = await status_cap.get() >>> # Device-specific status properties >>> if hasattr(status_reg, 'has_error'): ... if status_reg.has_error: ... print("Device error detected")
Note
Status format is device-specific
Register type specified at capability creation
Provides real-time device state information
- CMD_STATUS = 'STATUS'
- __init__(*args, register_type: type[StatusRegister], **kwargs) None[source]
Initialize status capability with register interpreter.
- Parameters:
write_cb – Command execution callback
device_commands – Command mapping dictionary
register_type – Device-specific StatusRegister subclass
- async get() StatusRegister[source]
Query device status register.
- Returns:
StatusRegister instance with device state information
Example
>>> status = await device.status.get() >>> # Access device-specific properties >>> print(f"Status: {status.raw}")
Note
Return type depends on device-specific register class
Status interpretation varies by device model
- class StatusRegister[source]
Bases:
objectBase class for device status register interpretation.
Holds raw status data from device. Device-specific subclasses interpret the raw status bits to provide meaningful properties and error conditions.
- _raw
Raw status response from device
Example
>>> # Subclass provides bit interpretation >>> class DeviceStatusRegister(StatusRegister): ... @property ... def is_moving(self) -> bool: ... return bool(int(self._raw) & 0x01) >>> >>> status = await device.status.get() >>> if status.is_moving: ... print("Device is moving")
Note
Device-specific implementations decode status bits
Raw value format varies by device model
- __init__(value: list[str], channel_id: int | None = None) None[source]
Initialize status register with raw device response.
- Parameters:
value – Raw status response from device
channel_id – Optional channel identifier for multi-channel devices (might be used by subclasses)
- property raw: list[str]
Get the raw status register value from the device.
- Returns:
Raw status response as received from the device
Example
>>> status = await device.status.get() >>> print(f"Raw status: {status.raw}")
Temperature monitoring capability.
- class Temperature[source]
Bases:
PiezoCapabilityRead device internal temperature.
Monitors the temperature of device electronics or power stages. Temperature monitoring helps prevent overheating and can be used for thermal management or diagnostic purposes.
Example
>>> temp_cap = device.temperature >>> temp = await temp_cap.get() >>> print(f"Device temperature: {temp:.1f}°C") >>> if temp > 60: ... print("Warning: High temperature")
Note
Temperature unit is typically degrees Celsius
Sensor location varies by device (electronics, power stage)
- CMD_TEMPERATURE = 'TEMPERATURE'
- async get() float[source]
Get the current device temperature.
- Returns:
Temperature value in degrees Celsius
Example
>>> temperature = await device.temperature.get() >>> print(f"Current temp: {temperature:.2f}°C")
Note
Reading frequency depends on device update rate
Use for thermal monitoring and diagnostics
Actuator description capability.
- class ActuatorDescription[source]
Bases:
PiezoCapabilityQuery descriptive information about the connected actuator.
Retrieves a human-readable description of the piezoelectric actuator attached to a channel. This may include model number, specifications, or other identifying information.
Example
>>> desc_cap = channel.actuator_description >>> description = await desc_cap.get() >>> print(f"Actuator: {description}") >>> # Actuator: MIPOS 100
Note
Description format is actuator-specific
May include model, travel range, resolution
Useful for logging and system documentation
Some devices return empty string if not configured
- CMD_DESCRIPTION = 'actuator_description'
Position Control
Position readback capability for piezoelectric actuators.
- class Position[source]
Bases:
PiezoCapabilityRead the current position of a piezoelectric actuator.
Provides access to the current measured position of the actuator. In closed-loop systems, this represents the sensor feedback value. In open-loop systems, this may represent the output voltage.
Example
>>> position_cap = channel.position >>> current_pos = await position_cap.get() >>> print(f"Current position: {current_pos} µm")
Note
Position units depend on device configuration
In closed-loop mode: sensor feedback value
In open-loop mode: output voltage representation
Value range depends on actuator specifications
- CMD_POSITION = 'POSITION'
- async get() float[source]
Get the current actuator position.
- Returns:
Current position value in device-configured units (typically µm or volts)
Example
>>> pos = await channel.position.get() >>> print(f"Actuator at {pos:.2f} µm")
Note
Units can be queried via the channels units capability
Update rate depends on device capabilities
Setpoint control capability for piezoelectric actuators.
- class Setpoint[source]
Bases:
PiezoCapabilityControl the target setpoint (desired position) of an actuator.
The setpoint represents the target position that the actuator should move to. In closed-loop mode, the controller adjusts voltage to achieve this setpoint. In open-loop mode, this directly controls the output voltage.
Example
>>> setpoint_cap = channel.setpoint >>> # Move to 50 micrometers >>> await setpoint_cap.set(50.0) >>> # Read current setpoint >>> current = await setpoint_cap.get() >>> print(f"Target position: {current} µm")
Note
Setpoint units match position units (typically µm in closed-loop, V in open-loop)
Range limited by actuator travel range
In closed-loop: controller drives to this position
In open-loop: maps to output voltage
- CMD_SETPOINT = 'SETPOINT'
- async set(setpoint: float) None[source]
Set the target position for the actuator.
- Parameters:
setpoint – Desired position in device units (typically µm in closed-loop, V in open-loop)
Example
>>> # Move to 75.5 micrometers (closed-loop) or volts (open-loop) >>> await channel.setpoint.set(75.5)
Note
Value is clamped to actuator range by device
Movement speed affected by slew rate settings
In closed-loop mode, position is actively controlled by the amplifier
Closed-loop control capability.
- class ClosedLoopController[source]
Bases:
PiezoCapabilityEnable or disable closed-loop position control.
Closed-loop control uses sensor feedback to actively maintain the actuator at the desired setpoint position. When enabled, the controller compensates for drift, hysteresis, and external loads.
When disabled (open-loop), the actuator operates with direct voltage control without position feedback.
Depending on the device, different closed loop algorithms may be available. In this case, the device will provide a derived controller class with additional methods to configure the specific algorithm type.
Example
>>> controller = channel.closed_loop_controller >>> # Enable closed-loop control >>> await controller.set(True) >>> # Check if enabled >>> is_enabled = await controller.get_enabled() >>> print(f"Closed-loop: {'On' if is_enabled else 'Off'}")
Note
Requires position sensor for feedback
Provides better accuracy and stability than open-loop
May have slower response than open-loop
PID parameters affect closed-loop performance
- CMD_ENABLE = 'CLOSED_LOOP_CONTROLLER_ENABLE'
- __init__(*args, sample_period: int, **kwargs)[source]
Initialize the capability with command execution callback.
- Parameters:
write_cb – Function to execute device commands
device_commands – Mapping of command IDs to device strings
channel_id – Optional channel identifier for multi-channel devices (might be used by subclasses)
- async get_enabled() bool[source]
Check if closed-loop control is currently enabled.
- Returns:
True if closed-loop enabled, False if open-loop
Example
>>> if await channel.closed_loop_controller.get_enabled(): ... print("Using closed-loop control") ... else: ... print("Using open-loop control")
- property sample_period: int
Get the closed-loop controller sampling period in microseconds.
The sampling period defines how often the controller updates its output based on the position feedback. A shorter period allows for faster response but may increase noise sensitivity.
- Returns:
Sampling period in microseconds
Example
>>> period = channel.closed_loop_controller.sample_period >>> print(f"Controller sampling period: {period} µs") >>> freq = 1000000 / period >>> print(f"Control loop frequency: {freq:.0f} Hz")
- async set(enabled: bool) None[source]
Enable or disable closed-loop control.
- Parameters:
enabled – True to enable closed-loop, False for open-loop
Example
>>> # Enable closed-loop for precise positioning >>> await channel.closed_loop_controller.set(True) >>> # Disable for faster response (open-loop) >>> await channel.closed_loop_controller.set(False)
Note
Changing mode may cause position jump
Closed-loop requires properly tuned PID parameters
Slew rate limiting capability.
- class SlewRate[source]
Bases:
PiezoCapabilityControl the maximum rate of change for actuator movement.
Slew rate limiting controls how quickly the actuator can change position or voltage. This prevents mechanical shock, reduces vibration, and protects delicate samples.
Lower slew rates result in slower, smoother movements. Higher rates allow faster response but may cause oscillation or overshoot.
Example
>>> slew_rate_cap = channel.slew_rate >>> # Set gentle slew rate for delicate positioning >>> await slew_rate_cap.set(10.0) # 10 units/second >>> # Query current setting >>> rate = await slew_rate_cap.get() >>> print(f"Slew rate: {rate} units/second")
Note
Units typically V/ms or %/ms, depending on device
Lower values = smoother movement
Zero or maximum may disable rate limiting (device-specific)
Affects both commanded movements and waveform generation
- CMD_RATE = 'SLEW_RATE'
- async get() float[source]
Get the current slew rate limit.
- Returns:
Current slew rate in device units (typically V/ms or %/ms)
Example
>>> current_rate = await channel.slew_rate.get() >>> print(f"Max movement speed: {current_rate} V/ms")
- async set(rate: float) None[source]
Set the slew rate limit.
- Parameters:
rate – Maximum rate of change (units/second, typically V/ms or %/ms)
Example
>>> # Set moderate slew rate >>> await channel.slew_rate.set(50.0) >>> # Set very slow for ultra-smooth motion >>> await channel.slew_rate.set(1.0)
Note
Range is device-specific
Zero may disable limiting (instant response)
Very low values result in slow movements
Control System
PID controller parameter configuration.
- class PIDController[source]
Bases:
PiezoCapabilityConfigure PID controller parameters for closed-loop operation.
Provides access to Proportional, Integral, Derivative, and differential filter parameters that control closed-loop positioning behavior.
PID Control Theory: - P (Proportional): Response proportional to error. Higher = faster but may overshoot - I (Integral): Eliminates steady-state error. Too high causes oscillation - D (Derivative): Dampens oscillation. Higher = more damping but noise sensitive - TF (Diff Filter): Filters derivative term to reduce noise amplification
Example
>>> pid = channel.pid_controller >>> # Set aggressive PID for fast response >>> await pid.set(p=10.0, i=5.0, d=0.5, diff_filter=100.0) >>> # Read current parameters >>> p = await pid.get_p() >>> i = await pid.get_i() >>> print(f"PID: P={p}, I={i}")
Note
Improper tuning causes poor performance and possible damage to actuator!
Start with conservative values and tune incrementally
Only active when closed-loop control is enabled
Parameter ranges are device-specific
- CMD_D = 'PID_CONTROLLER_D'
- CMD_I = 'PID_CONTROLLER_I'
- CMD_P = 'PID_CONTROLLER_P'
- CMD_TF = 'PID_CONTROLLER_TF'
- async get_d() float[source]
Get derivative gain parameter.
- Returns:
Current D (derivative) gain value
Example
>>> d_gain = await channel.pid_controller.get_d()
- async get_diff_filter() float[source]
Get differential filter time constant.
- Returns:
Current differential filter value
Example
>>> tf = await channel.pid_controller.get_diff_filter()
- async get_i() float[source]
Get integral gain parameter.
- Returns:
Current I (integral) gain value
Example
>>> i_gain = await channel.pid_controller.get_i()
- async get_p() float[source]
Get proportional gain parameter.
- Returns:
Current P (proportional) gain value
Example
>>> p_gain = await channel.pid_controller.get_p()
- async set(p: float | None = None, i: float | None = None, d: float | None = None, diff_filter: float | None = None) None[source]
Set PID controller parameters.
Parameters are set independently - only provided values are updated. Omitted parameters remain unchanged.
- Parameters:
p – Proportional gain (higher = faster response, more overshoot)
i – Integral gain (eliminates steady-state error)
d – Derivative gain (dampens oscillation)
diff_filter – Differential filter time constant (noise reduction)
Example
>>> # Set only P and I, leave D and filter unchanged >>> await channel.pid_controller.set(p=8.0, i=4.0) >>> >>> # Fine-tune all parameters >>> await channel.pid_controller.set( ... p=12.0, i=6.0, d=0.01, diff_filter=150.0 ... )
Note
Units and ranges are device-specific
Test parameter changes with small movements first
Higher D gains amplify sensor noise (use diff_filter)
Pre-control factor capability.
- class PreControlFactor[source]
Bases:
PiezoCapabilityConfigure pre-control factor for feedforward control.
The Pre-Control Factor (PCF) provides feedforward compensation to improve control system response. It anticipates required control action based on the setpoint change, reducing settling time and tracking error.
PCF is added to the PID controller output to provide faster initial response to setpoint changes.
Example
>>> pcf = channel.pre_control_factor >>> # Set moderate pre-control >>> await pcf.set(0.5) >>> # Query current value >>> value = await pcf.get() >>> print(f"PCF: {value}")
Note
Typical range: 0.0 (no feedforward) to 1.0 (full feedforward)
Higher values = faster response but potential overshoot
Only active in closed-loop mode
Interacts with PID parameters
Device-specific implementation and range
- CMD_VALUE = 'PCF_VALUE'
- async get() None[source]
Get the current pre-control factor value.
- Returns:
Current PCF value (device-specific units, typically 0.0-1.0)
Example
>>> pcf_value = await channel.pre_control_factor.get() >>> print(f"Feedforward factor: {pcf_value}")
- async set(value: float) None[source]
Set the pre-control factor value.
- Parameters:
value – Pre-control factor (typically 0.0 to 1.0)
Example
>>> # Conservative feedforward >>> await channel.pre_control_factor.set(0.3) >>> # Aggressive feedforward >>> await channel.pre_control_factor.set(0.8)
Note
Exact range is device-specific
Higher values improve response speed
Too high may cause overshoot
Tune in conjunction with PID parameters
Filters
Notch filter capability for resonance suppression.
- class NotchFilter[source]
Bases:
PiezoCapabilityConfigure notch filtering to suppress specific frequency components.
Notch filters (band-stop filters) attenuate signals at a specific frequency while passing all other frequencies. They are used to suppress mechanical resonances that can cause instability or oscillation in closed-loop systems.
Parameters: - Center frequency: The frequency to suppress (resonance peak) - Bandwidth: Width of suppressed frequency band
Example
>>> notch = channel.notch_filter >>> # Suppress 500 Hz resonance >>> await notch.set( ... enabled=True, ... frequency=500.0, ... bandwidth=50.0 ... ) >>> # Check configuration >>> freq = await notch.get_frequency() >>> bw = await notch.get_bandwidth() >>> print(f"Notch at {freq}±{bw/2} Hz")
Note
Center frequency should match mechanical resonance
Narrow bandwidth = precise suppression, may be insufficient
Wide bandwidth = broader suppression, affects more frequencies
Multiple resonances may require cascaded notch filters
- CMD_BANDWIDTH = 'NOTCH_FILTER_BANDWIDTH'
- CMD_ENABLE = 'NOTCH_FILTER_ENABLE'
- CMD_FREQUENCY = 'NOTCH_FILTER_FREQUENCY'
- async get_bandwidth() float[source]
Get the notch filter bandwidth.
- Returns:
Bandwidth in Hz
Example
>>> bw = await channel.notch_filter.get_bandwidth() >>> freq = await channel.notch_filter.get_frequency() >>> print(f"Suppressing {freq-bw/2:.0f}-{freq+bw/2:.0f} Hz")
- async get_enabled() bool[source]
Check if notch filter is enabled.
- Returns:
True if filter is active, False if bypassed
Example
>>> if await channel.notch_filter.get_enabled(): ... print("Notch filter active")
- async get_frequency() float[source]
Get the notch center frequency.
- Returns:
Center frequency in Hz
Example
>>> freq = await channel.notch_filter.get_frequency() >>> print(f"Suppressing {freq} Hz")
- async set(enabled: bool | None = None, frequency: float | None = None, bandwidth: float | None = None) None[source]
Configure notch filter parameters.
- Parameters:
enabled – True to enable filtering, False to bypass
frequency – Center frequency to suppress (Hz)
bandwidth – Width of suppression band (Hz)
Example
>>> # Enable notch at 750 Hz with 100 Hz bandwidth >>> await channel.notch_filter.set( ... enabled=True, ... frequency=750.0, ... bandwidth=100.0 ... ) >>> # Just adjust bandwidth >>> await channel.notch_filter.set(bandwidth=80.0)
Note
Only provided parameters are updated
Frequency range is device-specific
Bandwidth affects depth and width of suppression
Low-pass filter capability for signal conditioning.
- class LowPassFilter[source]
Bases:
PiezoCapabilityConfigure low-pass filtering of position or control signals.
Low-pass filters attenuate high-frequency noise and oscillations while allowing low-frequency signals to pass. This improves signal quality and reduces noise in position measurements or control output.
The cutoff frequency determines which frequencies are filtered: - Signals below cutoff: pass through unaffected - Signals above cutoff: progressively attenuated
Example
>>> lpf = channel.low_pass_filter >>> # Enable with 100 Hz cutoff >>> await lpf.set(enabled=True, cutoff_frequency=100.0) >>> # Check settings >>> enabled = await lpf.get_enabled() >>> freq = await lpf.get_cutoff_frequency() >>> print(f"LPF: {'On' if enabled else 'Off'}, {freq} Hz")
Note
Lower cutoff = more filtering, slower response
Higher cutoff = less filtering, faster response
Can be applied to sensor input, control output, or both
Adds phase lag proportional to filtering strength
- CMD_CUTOFF_FREQUENCY = 'LOW_PASS_FILTER_CUTOFF_FREQUENCY'
- CMD_ENABLE = 'LOW_PASS_FILTER_ENABLE'
- async get_cutoff_frequency() float[source]
Get the current cutoff frequency.
- Returns:
Cutoff frequency in Hz
Example
>>> freq = await channel.low_pass_filter.get_cutoff_frequency() >>> print(f"Filtering above {freq} Hz")
- async get_enabled() bool[source]
Check if low-pass filter is enabled.
- Returns:
True if filter is active, False if bypassed
Example
>>> if await channel.low_pass_filter.get_enabled(): ... print("Filtering active")
- async set(enabled: bool | None = None, cutoff_frequency: float | None = None) None[source]
Configure low-pass filter parameters.
- Parameters:
enabled – True to enable filtering, False to bypass
cutoff_frequency – -3dB cutoff frequency in Hz
Example
>>> # Enable with specific cutoff >>> await channel.low_pass_filter.set( ... enabled=True, ... cutoff_frequency=50.0 ... ) >>> # Just change frequency >>> await channel.low_pass_filter.set(cutoff_frequency=200.0)
Note
Only provided parameters are updated
Cutoff range is device-specific
Typical range: 1 Hz to several kHz
Error signal low-pass filter for PID controller.
- class ErrorLowPassFilter[source]
Bases:
PiezoCapabilityFilter the position error signal in closed-loop control.
Applies low-pass filtering to the error signal (setpoint - position) before it enters the PID controller. This reduces high-frequency noise in the error signal that could cause unstable control behavior.
Unlike the general low-pass filter, this specifically filters the error signal used by the PID controller, helping to stabilize control without filtering the position feedback directly.
Parameters: - Cutoff frequency: Where filtering begins (-3dB point) - Order: Filter steepness (1st, 2nd order, etc.)
Example
>>> err_lpf = channel.error_low_pass_filter >>> # Configure 2nd-order filter at 200 Hz >>> await err_lpf.set( ... cutoff_frequency=200.0, ... order=2 ... ) >>> # Query settings >>> freq = await err_lpf.get_cutoff_frequency() >>> order = await err_lpf.get_order() >>> print(f"{order}-order error filter at {freq} Hz")
Note
Only affects closed-loop control
Higher order = steeper rolloff, more phase lag
Lower cutoff = more filtering, slower response
Helps stabilize noisy systems
- CMD_CUTOFF_FREQUENCY = 'ERROR_LOW_PASS_FILTER_CUTOFF_FREQUENCY'
- CMD_ORDER = 'ERROR_LOW_PASS_FILTER_ORDER'
- async get_cutoff_frequency() float[source]
Get the current error filter cutoff frequency.
- Returns:
Cutoff frequency in Hz
Example
>>> freq = await channel.error_low_pass_filter.get_cutoff_frequency() >>> print(f"Error signal filtered above {freq} Hz")
- async get_order() int[source]
Get the current filter order.
- Returns:
Filter order (1, 2, 3, or 4 typically)
Example
>>> order = await channel.error_low_pass_filter.get_order() >>> rolloff = order * 20 # dB/decade >>> print(f"{order}-order filter ({rolloff} dB/decade rolloff)")
- async set(cutoff_frequency: float | None = None, order: int | None = None) None[source]
Configure error signal filter parameters.
- Parameters:
cutoff_frequency – -3dB cutoff frequency in Hz
order – Filter order (1 = 1st order, 2 = 2nd order, etc.)
Example
>>> # Set moderate filtering >>> await channel.error_low_pass_filter.set( ... cutoff_frequency=150.0, ... order=1 ... ) >>> # Increase filter steepness >>> await channel.error_low_pass_filter.set(order=2)
Note
Only provided parameters are updated
Higher order provides sharper cutoff but more phase lag
Typical orders: 1 (gentlest) to 4 (steepest)
Coordinate with PID tuning for best stability
Signal Generation
Modulation input source selection capability.
- class ModulationSource[source]
Bases:
PiezoCapabilitySelect the source for setpoint modulation input.
Configures which signal source is used to modulate the actuator position or voltage. Modulation allows dynamic control from external signals or internal waveform generators.
Common modulation sources: - External analog input (0-10V) - Internal waveform generator - Serial commands
Example
>>> # Device-specific enum >>> from psj_lib import ModulationSourceTypes >>> >>> mod = channel.modulation_source >>> # Use internal waveform generator >>> await mod.set_source(ModulationSourceTypes.INTERNAL_WAVEFORM) >>> # Check current source >>> source = await mod.get_source() >>> print(f"Modulation from: {source.name}")
Note
Modulation mode must be enabled separately
Input range and scaling are device-specific
External input typically 0-10V
Source enum is device-specific
- CMD_SOURCE = 'MODULATION_SOURCE'
- __init__(*args, sources: type[ModulationSourceTypes], **kwargs) None[source]
Initialize modulation source capability.
- Parameters:
sources – Device-specific ModulationSourceTypes enum type
- async get_source() ModulationSourceTypes[source]
Get the current modulation input source.
- Returns:
Current modulation source enum value, or UNKNOWN if device returns unrecognized value
Example
>>> source = await channel.modulation_source.get_source() >>> if source == ModulationSourceTypes.EXTERNAL_INPUT: ... print("Using external modulation input")
Note
Returns UNKNOWN for unrecognized device responses
Source enum is device-specific
- async set_source(source: ModulationSourceTypes) None[source]
Set the modulation input source.
- Parameters:
source – Modulation source from device-specific enum
- Raises:
ValueError – If source is not from the correct device-specific enum
Example
>>> from psj_lib import ModulationSourceTypes >>> await channel.modulation_source.set_source( ... ModulationSourceTypes.EXTERNAL_INPUT ... )
Note
Source must be from device-specific enum
Invalid sources raise ValueError
May need to enable modulation mode separately
- class ModulationSourceTypes[source]
Bases:
EnumBase enumeration for modulation input sources.
Device-specific implementations define available modulation sources. Common sources include external analog input, internal waveform generator, or digital sources.
- UNKNOWN
Unrecognized or invalid source value
Note
Device-specific subclasses define actual sources
UNKNOWN used for unrecognized device responses
- property UNKNOWN
Monitor output source selection capability.
- class MonitorOutput[source]
Bases:
PiezoCapabilitySelect signal source for analog monitor output.
Configures which internal signal is routed to the device’s analog monitor output connector. This allows real-time observation of various internal signals using an oscilloscope or data acquisition system.
Available sources depend on device model and may include: - Position sensor value - Commanded setpoint - Output voltage - Control error signal - And more…
Example
>>> # Device-specific enum >>> from psj_lib import MonitorOutputSource >>> >>> monitor = channel.monitor_output >>> # Route position to monitor output >>> await monitor.set_source(MonitorOutputSource.POSITION) >>> # Check current source >>> source = await monitor.get_source() >>> print(f"Monitoring: {source.name}")
Note
Output is typically 0-10V
Scaling depends on device and selected source
Useful for debugging and real-time monitoring
Source enum is device-specific
- CMD_OUTPUT_SRC = 'MONITOR_OUTPUT_SRC'
- __init__(*args, sources: type[MonitorOutputSource], **kwargs) None[source]
Initialize monitor output capability.
- Parameters:
sources – Device-specific MonitorOutputSource enum type
- async get_source() MonitorOutputSource[source]
Get the current monitor output source.
- Returns:
Current monitor output source enum value, or UNKNOWN if device returns unrecognized value
Example
>>> source = await channel.monitor_output.get_source() >>> if source == MonitorOutputSource.POSITION: ... print("Monitoring position sensor")
Note
Returns UNKNOWN for unrecognized device responses
Source enum is device-specific
- async set_source(source: MonitorOutputSource) None[source]
Set the monitor output signal source.
- Parameters:
source – Monitor output source from device-specific enum
- Raises:
ValueError – If source is not from the correct device-specific enum
Example
>>> from psj_lib import MonitorOutputSource >>> await channel.monitor_output.set_source( ... MonitorOutputSource.SETPOINT ... )
Note
Source must be from device-specific enum
Invalid sources raise ValueError
Monitor output updates in real-time
- class MonitorOutputSource[source]
Bases:
EnumBase enumeration for monitor output signal sources.
Device-specific implementations define available monitor sources. Common sources include position, setpoint, voltage, error signal, etc.
- UNKNOWN
Unrecognized or invalid source value
Note
Device-specific subclasses define actual sources
UNKNOWN used for unrecognized device responses
- property UNKNOWN
Static waveform generator for periodic signal generation.
- class StaticWaveformGenerator[source]
Bases:
PiezoCapabilityGenerate periodic waveforms for actuator modulation.
The static waveform generator produces continuous periodic signals (sine, square, triangle, etc.) that can be used for: - Scanning applications - Vibration testing - Frequency response characterization - Dynamic positioning
Configurable parameters: - Frequency: Rate of oscillation (Hz) - Amplitude: Peak-to-peak magnitude - Offset: DC bias level - Duty cycle: Pulse width for square/pulse waveforms
The generated waveform can typically be used as the primary control signal.
Example
>>> wfg = channel.static_waveform_generator >>> # Generate 10 Hz sine wave, 20µm amplitude, centered at 50µm >>> await wfg.set( ... frequency=10.0, ... amplitude=20.0, ... offset=50.0 ... ) >>> # Create square wave with 30% duty cycle >>> await wfg.set( ... frequency=5.0, ... duty_cycle=30.0 # 30% high time ... ) >>> # Query current settings >>> freq = await wfg.get_frequency() >>> amp = await wfg.get_amplitude() >>> print(f"Generating {freq} Hz, ±{amp/2} µm")
Note
May require modulation source selection to use waveform output
Waveform type (sine/square/triangle) is device-specific
Frequency is limited by device output current and actuator resonance frequency
Amplitude is limited by actuator travel range
- CMD_AMPLITUDE = 'STATIC_WAVEFORM_AMPLITUDE'
- CMD_DUTY_CYCLE = 'STATIC_WAVEFORM_DUTY_CYCLE'
- CMD_FREQUENCY = 'STATIC_WAVEFORM_FREQUENCY'
- CMD_OFFSET = 'STATIC_WAVEFORM_OFFSET'
- async get_amplitude() float[source]
Get waveform amplitude.
- Returns:
Amplitude in position units (typically µm or V)
Example
>>> amp = await wfg.get_amplitude() >>> print(f"Oscillating ±{amp:.1f} µm from center")
- async get_duty_cycle() float[source]
Get pulse duty cycle percentage.
- Returns:
Duty cycle in percent (0-100%)
Example
>>> duty = await wfg.get_duty_cycle() >>> print(f"Pulse: {duty:.0f}% high, {100-duty:.0f}% low")
Note
50% = symmetric wave
- async get_frequency() float[source]
Get waveform frequency.
- Returns:
Frequency in Hz
Example
>>> freq = await wfg.get_frequency() >>> period = 1.0 / freq >>> print(f"Period: {period*1000:.1f} ms")
- async get_offset() float[source]
Get waveform DC offset (center position).
- Returns:
Offset in position units (typically µm or V)
Example
>>> offset = await wfg.get_offset() >>> amp = await wfg.get_amplitude() >>> print(f"Range: {offset-amp:.1f} to {offset+amp:.1f} µm")
- async set(frequency: float | None = None, amplitude: float | None = None, offset: float | None = None, duty_cycle: float | None = None) None[source]
Configure waveform generator parameters.
- Parameters:
frequency – Oscillation frequency in Hz
amplitude – Amplitude in position units (typically µm or V)
offset – DC offset/center position in position units (typically µm or V)
duty_cycle – Pulse width percentage (0-100%)
Example
>>> # Slow sine wave scan >>> await wfg.set( ... frequency=0.5, # 0.5 Hz (2 second period) ... amplitude=100.0, # 100µm amplitude ... offset=50.0 # Centered at 50µm ... ) >>> >>> # Fast square wave with asymmetric duty cycle >>> await wfg.set( ... frequency=100.0, ... amplitude=10.0, ... duty_cycle=25.0 # 25% high, 75% low ... )
Note
Only provided parameters are updated
Amplitude is amplitude
Actual range: [offset - amplitude, offset + amplitude]
Frequency is limited by device output current and actuator resonance frequency
Amplitude is limited by actuator travel range
Data Acquisition
Data recorder capability for capturing device signals.
- class DataRecorder[source]
Bases:
PiezoCapabilityRecord device signals to internal memory for later retrieval.
The data recorder captures signals (position, setpoint, voltage, etc.) at high speed into device memory. Data can be retrieved after recording completes for analysis and plotting.
Features: - Multiple channels (device dependent, typically 2) - Configurable memory length - Stride (decimation) for longer time spans
Typical workflow: 1. Configure memory length and stride 2. Start recording with start() or by using device specific autostart modes
(setpoint, waveform generator)
Wait for recording to complete
Retrieve data with get_all_data() or get_data()
Example
>>> recorder = device.data_recorder >>> # Configure for 10000 samples, no decimation >>> await recorder.set(memory_length=10000, stride=1) >>> # Start recording >>> await recorder.start() >>> # ... perform motion ... >>> # Retrieve channel 1 data with progress callback >>> def progress(current, total): ... print(f"Downloaded {current}/{total} samples") >>> data = await recorder.get_all_data( ... DataRecorderChannel.CHANNEL_1, ... callback=progress ... ) >>> print(f"Captured {len(data)} samples")
Note
Memory length limits total capture time
Stride reduces data rate (stride=10 keeps only every 10th sample)
Channels record different signals (device-specific)
Data units depend on recorded signal type
Large data transfers may take several seconds
- CHANNEL_1_IDX = 1
- CHANNEL_2_IDX = 2
- CMD_GET_DATA_1 = 'DATA_RECORDER_GET_DATA_1'
- CMD_GET_DATA_2 = 'DATA_RECORDER_GET_DATA_2'
- CMD_MEMORY_LENGTH = 'DATA_RECORDER_MEMORY_LENGTH'
- CMD_PTR = 'DATA_RECORDER_POINTER'
- CMD_START_RECORDING = 'DATA_RECORDER_START_RECORDING'
- CMD_STRIDE = 'DATA_RECORDER_RECORD_STRIDE'
- __init__(*args, sample_period: int, **kwargs) None[source]
Initialize the capability with command execution callback.
- Parameters:
write_cb – Function to execute device commands
device_commands – Mapping of command IDs to device strings
channel_id – Optional channel identifier for multi-channel devices (might be used by subclasses)
- async get_all_data(channel: DataRecorderChannel, max_length: int | None = None, callback: ProgressCallback | None = None) list[float][source]
Retrieve all recorded data from specified channel.
Downloads entire recording buffer from device memory. This may take several seconds for large datasets.
- Args:
channel: Which channel to retrieve max_length: Maximum number of samples to retrieve. If None,
retrieves full configured length.
callback: Optional progress callback function(current, total)
- Returns:
List of all recorded samples
- Example:
>>> # Simple retrieval >>> data = await recorder.get_all_data( ... DataRecorderChannel.CHANNEL_1 ... ) >>> >>> # With progress updates >>> def show_progress(current, total): ... percent = 100 * current / total ... print(f"
- Download: {percent:.0f}%”, end=””)
>>> >>> data = await recorder.get_all_data( ... DataRecorderChannel.CHANNEL_2, ... callback=show_progress ... ) >>> print(f"
Retrieved {len(data)} samples”)
- Note:
Transfer time depends on memory_length
Progress callback useful for long transfers
Returns data in chronological order
Units depend on recorded signal type
- async get_data(channel: DataRecorderChannel, index: int | None = None) float[source]
Read a single data sample from specified channel.
- Parameters:
channel – Which channel to read from
index – Sample index (0 to memory_length-1), or None to read next sequential sample
- Returns:
Single data sample value
Example
>>> # Read sample at index 100 >>> value = await recorder.get_data( ... DataRecorderChannel.CHANNEL_1, ... index=100 ... ) >>> # Read next sequential sample >>> value2 = await recorder.get_data( ... DataRecorderChannel.CHANNEL_1 ... )
Note
Setting index resets read pointer
Omitting index reads sequentially
More efficient to use get_all_data() for bulk retrieval
- async get_memory_length() int[source]
Get configured recording length.
- Returns:
Number of samples per channel
Example
>>> length = await recorder.get_memory_length() >>> print(f"Will record {length} samples")
- async get_stride() int[source]
Get decimation stride factor.
- Returns:
Current stride value (1=no decimation)
Example
>>> stride = await recorder.get_stride() >>> print(f"Recording every {stride} sample(s)")
- property sample_period: int
Get base sample period in microseconds.
- Returns:
Sample period in microseconds
Example
>>> period = recorder.sample_period >>> print(f"Base sample period: {period} µs")
- property sample_rate: float
Get base sample rate in Hz.
- Returns:
Sample rate in Hz
- async set(memory_length: int | None = None, stride: int | None = None) None[source]
Configure data recorder parameters.
- Parameters:
memory_length – Number of samples to record per channel
stride – Decimation factor (1=all samples, 10=every 10th sample)
Example
>>> # Record 5000 samples at full rate >>> await recorder.set(memory_length=5000, stride=1) >>> # Record 50000 samples, keeping every 100th >>> await recorder.set(memory_length=50000, stride=100)
Note
Maximum memory_length is device-specific
Stride>1 allows longer time spans at lower data rate
Effective sample period = sample_period * stride
- async start() None[source]
Start data recording.
Begins capturing data to device memory. Recording continues until memory is full or device is reset.
Some devices may support automatic recording start based on setpoint changes or waveform generator activity.
Example
>>> await recorder.start() >>> # Recording now active >>> await asyncio.sleep(1.0) # Record for 1 second >>> # Retrieve data...
Note
Previous recording data is overwritten
Recording may stop automatically when buffer full
Check device status to determine if still recording
- class DataRecorderChannel[source]
Bases:
EnumData recorder input channels.
Identifies which recorder channel to access for configuration or data retrieval.
Subclasses may define specific channel meanings.
Trigger output generation capability.
- class TriggerDataSource[source]
Bases:
EnumData source for trigger generation.
Determines which signal is monitored for trigger condition.
- POSITION
Monitor actual position sensor value
- SETPOINT
Monitor commanded setpoint value
- POSITION = 0
- SETPOINT = 1
- class TriggerEdge[source]
Bases:
EnumTrigger edge detection mode.
Configures which signal transitions generate trigger pulses.
- DISABLED
No trigger output
- RISING
Trigger on upward crossings only
- FALLING
Trigger on downward crossings only
- BOTH
Trigger on both upward and downward crossings
- BOTH = 3
- DISABLED = 0
- FALLING = 2
- RISING = 1
- class TriggerOut[source]
Bases:
PiezoCapabilityConfigure hardware trigger output generation.
Generates digital trigger pulses when monitored signal crosses threshold values. Useful for synchronizing external equipment (cameras, data acquisition, etc.) with actuator movement.
Trigger Options: - Window: Triggers when signal enters/exits range [start, stop] - Interval: Periodic triggers at fixed intervals within window - Edge sensitivity: Rising, falling, or both edges
Example
>>> trigger = channel.trigger_out >>> # Trigger every 10µm from 20µm to 80µm >>> await trigger.set( ... start_value=20.0, ... stop_value=80.0, ... interval=10.0, ... length=100, # 100 cycles pulse ... edge=TriggerEdge.BOTH, ... src=TriggerDataSource.POSITION ... ) >>> # Query configuration >>> start = await trigger.get_start_value() >>> interval = await trigger.get_interval() >>> print(f"Trigger every {interval}µm from {start}µm")
Note
Trigger output is typically 0V/5V TTL signal
Pulse length in cycles (device-specific)
Interval generates periodic pulses in window
DISABLED edge stops all trigger output
- CMD_EDGE = 'TRIGGER_OUT_EDGE'
- CMD_INTERVAL = 'TRIGGER_OUT_INTERVAL'
- CMD_LENGTH = 'TRIGGER_OUT_LENGTH'
- CMD_SRC = 'TRIGGER_OUT_SRC'
- CMD_START = 'TRIGGER_OUT_START'
- CMD_STOP = 'TRIGGER_OUT_STOP'
- async get_edge() TriggerEdge[source]
Get trigger edge detection mode.
- Returns:
Current edge detection mode
- async get_interval() float[source]
Get periodic trigger interval.
- Returns:
Interval spacing in source signal units
- async get_src() TriggerDataSource[source]
Get trigger data source.
- Returns:
Current monitored signal source
- async get_start_value() float[source]
Get trigger window start threshold.
- Returns:
Start value in source signal units
- async get_stop_value() float[source]
Get trigger window stop threshold.
- Returns:
Stop value in source signal units
- async set(start_value: float | None = None, stop_value: float | None = None, interval: float | None = None, length: int | None = None, edge: TriggerEdge | None = None, src: TriggerDataSource | None = None) None[source]
Configure trigger output parameters.
- Parameters:
start_value – Window start threshold (units match source)
stop_value – Window stop threshold (units match source)
interval – Periodic trigger spacing within window
length – Trigger pulse duration cycles
edge – Trigger edge detection mode
src – Data source to monitor (position or setpoint)
Example
>>> # Periodic triggers every 5µm >>> await channel.trigger_out.set( ... start_value=0.0, ... stop_value=100.0, ... interval=5.0, ... length=500 # 500 cycles pulse ... )
Note
Only provided parameters are updated
start_value should be less than stop_value
interval=0 disables periodic triggers
Units depend on selected source (typically µm or V)
Configuration
Unit information capability for device measurements.
- class Unit[source]
Bases:
PiezoCapabilityQuery device measurement units for voltage and position.
Provides methods to retrieve the units of measurement for a device’s operation mode.
Example
>>> unit = device.openloop_unit >>> openloop_unit = await unit.get() >>> print(f"Voltage: {openloop_unit}") >>> # Voltage: V or mV depending on device configuration
Note
Units are device-specific and may be configurable
Common voltage units: V, mV
Common position units: µm, mrad
- CMD_UNIT = 'UNIT'
- class Limits[source]
Bases:
PiezoCapabilityRead-only lower/upper limit capability.
Used for commands that expose admissible parameter bounds, for example voltage or position ranges.
- CMD_LOWER_LIMIT = 'LOWER_LIMIT'
- CMD_UPPER_LIMIT = 'UPPER_LIMIT'
- async get_lower() float[source]
Get the lower limit for the specified capability.
- Returns:
The lower limit value as a float.
Display capabilities for PSJ devices.
- class Display[source]
Bases:
PiezoCapabilityDisplay settings capability.
Provides access to the device display brightness in percent.
- CMD_BRIGHTNESS = 'DISPLAY_BRIGHTNESS'
- class MultiSetpoint[source]
Bases:
PiezoCapabilityCapability for devices with multiple independent setpoints.
Some devices allow synchronous control of multiple channel setpoints. This capability provides a unified interface for setting these setpoints together, ensuring coordinated updates and consistent timing.
- CMD_SETPOINTS = 'SETPOINTS'
- class MultiPosition[source]
Bases:
PiezoCapabilityCapability for reading multiple channel positions synchronously.
- CMD_POSITIONS = 'POSITIONS'
Factory reset capability.
- class FactoryReset[source]
Bases:
PiezoCapabilityReset device to factory default settings.
Restores all device parameters to their factory default values. Depending on the device implementation, this includes PID parameters, filters, control modes, and all other configurable settings.
WARNING: This operation cannot be undone!
Example
>>> reset = device.factory_reset >>> # Save current settings first! >>> await device.backup("settings_backup.txt") >>> # Reset to factory defaults >>> await reset.execute() >>> # Device parameters now at factory values
Note
All custom settings are lost
Use device.backup() to save configuration first
- CMD_RESET = 'FACTORY_RESET'
- async execute() None[source]
Execute factory reset operation.
WARNING: This permanently erases all custom settings!
Example
>>> # Backup first! >>> await device.backup("config_backup.txt") >>> # Then reset >>> await device.factory_reset.execute() >>> print("Device reset to factory defaults")
Note
Cannot be undone
Cooling fan control capability.
- class Fan[source]
Bases:
PiezoCapabilityControl device/channel cooling fan operation.
Enables or disables the internal cooling fan for thermal management. The fan helps dissipate heat from power electronics during operation.
Example
>>> fan = channel.fan >>> # Enable cooling fan >>> await fan.set(True) >>> # Check fan status >>> is_running = await fan.get_enabled() >>> print(f"Fan: {'On' if is_running else 'Off'}")
Note
Not all devices have controllable fans
Some fans run automatically based on temperature
Disabling may cause thermal shutdown under heavy load
Fan noise may affect sensitive measurements
- CMD_ENABLE = 'FAN_ENABLE'
- async get_enabled() bool[source]
Check if the cooling fan is enabled.
- Returns:
True if fan is running, False if stopped
Example
>>> if await device.fan.get_enabled(): ... print("Fan is running") ... else: ... print("Fan is off")
- async set(enabled: bool) None[source]
Enable or disable the cooling fan.
- Parameters:
enabled – True to turn fan on, False to turn off
Example
>>> # Turn fan on for active cooling >>> await channel.fan.set(True) >>> # Turn off for quiet operation >>> await channel.fan.set(False)
Note
Disabling may limit operating power
Monitor temperature if fan is disabled
d-Drive Specific Capabilities
- class DDriveStatusRegister[source]
Bases:
StatusRegisterd-Drive hardware status register with bit-mapped state information.
Decodes the d-Drive status register word into individual boolean and enumerated properties representing the amplifier channel’s current state.
The status register provides real-time information about: - Actuator connection status - Position sensor type - Output voltage enable state - Closed-loop control activation - Active waveform type - Filter enable states
- Properties:
actor_plugged (bool): Actuator physically connected sensor_type (SensorType): Position sensor type (STRAIN_GAUGE, CAPACITIVE, etc.) piezo_voltage_enabled (bool): High voltage output enabled closed_loop (bool): Closed-loop control active waveform_generator_status (DDriveWaveformGeneratorStatus): Active waveform type notch_filter_active (bool): Notch filter enabled low_pass_filter_active (bool): Low-pass filter enabled
Example
>>> status = await channel.status_register.get() >>> print(f"Actuator plugged: {status.actor_plugged}") >>> print(f"Sensor type: {status.sensor_type.name}") >>> print(f"Closed-loop: {status.closed_loop}") >>> print(f"Waveform: {status.waveform_generator_status.name}") >>> print(f"Notch filter: {status.notch_filter_active}")
Note
Status is read-only (reports hardware state)
Decoded from 16-bit status register word
Bit positions defined by d-Drive firmware
- property actor_plugged: bool
Actuator connection status (bit 0).
- Returns:
True if piezo actuator is physically connected and detected.
- Return type:
bool
Note
The d-Drive automatically detects actuator connection on power-up or when an actuator is plugged in.
- property closed_loop: bool
Closed-loop control activation status (bit 7).
- Returns:
True if sensor-based feedback control is active.
- Return type:
bool
Note
When True: PID controller drives actuator to match setpoint
When False: Open-loop operation (direct voltage control)
Requires valid sensor signal to enable
- property low_pass_filter_active: bool
Low-pass filter enable status (bit 13).
- Returns:
True if low-pass filter is enabled and actively filtering.
- Return type:
bool
Note
Low-pass filter reduces high-frequency noise in position or control signals for smoother operation.
- property notch_filter_active: bool
Notch filter enable status (bit 12).
- Returns:
True if notch filter is enabled and actively filtering.
- Return type:
bool
Note
Notch filter suppresses specific frequencies (typically mechanical resonances) to improve closed-loop stability.
- property piezo_voltage_enabled: bool
High voltage output enable status (bit 6).
- Returns:
- True if amplifier output stage is enabled and driving
the piezo actuator.
- Return type:
bool
Note
When False, the output is disabled for safety. This may occur: - After power-up when no actuator is detected - On error conditions - When explicitly disabled by user
- property sensor_type: SensorType
Position sensor type identification (bits 1-2).
- Returns:
- Type of position sensor in connected actuator.
Common types: STRAIN_GAUGE, CAPACITIVE, LVDT, etc.
- Return type:
Note
Sensor type is auto-detected from actuator’s identification. Different sensor types may have different characteristics (resolution, linearity, temperature sensitivity).
- property waveform_generator_status: DDriveWaveformGeneratorStatus
Active waveform generator type (bits 9-11).
- Returns:
- Currently active waveform type.
INACTIVE if no waveform is running, otherwise SINE, TRIANGLE, RECTANGLE, NOISE, or SWEEP.
- Return type:
Example
>>> status = await channel.status_register.get() >>> wfg = status.waveform_generator_status >>> if wfg != DDriveWaveformGeneratorStatus.INACTIVE: ... print(f"Active waveform: {wfg.name}")
Note
Returns UNKNOWN if hardware reports unrecognized value.
- class DDriveWaveformGeneratorStatus[source]
Bases:
Enumd-Drive waveform generator status from hardware status register.
Indicates which waveform type is currently active in the waveform generator. This is read from bits 9-11 of the d-Drive status register.
- INACTIVE
No waveform generation active
- SINE
Sinusoidal waveform active
- TRIANGLE
Triangle waveform active
- RECTANGLE
Rectangle/square waveform active
- NOISE
Noise waveform active
- SWEEP
Sweep/ramp waveform active
- UNKNOWN
Status value not recognized (error condition)
Example
>>> status = await channel.status_register.get() >>> if status.waveform_generator_status == DDriveWaveformGeneratorStatus.SINE: ... print("Sine wave is running")
- INACTIVE = 0
- NOISE = 4
- RECTANGLE = 3
- SINE = 1
- SWEEP = 5
- TRIANGLE = 2
- UNKNOWN = 99
- class DDriveScanType[source]
Bases:
Enumd-Drive automated scan patterns.
Defines automated scan sequences for the waveform generator, providing single or double scan cycles with different waveform shapes.
- OFF
No automated scanning
- SINE_ONCE
Single sinusoidal scan (one complete cycle)
- TRIANGLE_ONCE
Single triangular scan (up and down)
- SINE_TWICE
Double sinusoidal scan (two complete cycles)
- TRIANGLE_TWICE
Double triangular scan (two up/down cycles)
- UNKNOWN
Unrecognized scan type (error condition)
Example
>>> from psj_lib import DDriveScanType >>> # Start single triangle scan >>> await channel.waveform_generator.start_scan( ... DDriveScanType.TRIANGLE_ONCE ... ) >>> # Wait for completion >>> while await channel.waveform_generator.is_scan_running(): ... await asyncio.sleep(0.1)
Note
Scan automatically stops after completing specified cycles.
- OFF = 0
- SINE_ONCE = 1
- SINE_TWICE = 3
- TRIANGLE_ONCE = 2
- TRIANGLE_TWICE = 4
- UNKNOWN = 99
- class DDriveWaveformGenerator[source]
Bases:
PiezoCapabilityd-Drive multi-waveform generator for scanning and modulation.
Provides built-in waveform generation with multiple waveform types for automated scanning, periodic motion, and signal modulation. Each waveform type has independent parameter configuration.
Available waveforms: - SINE: Sinusoidal motion with amplitude, offset, and frequency control - TRIANGLE: Triangular scanning with adjustable duty cycle - RECTANGLE: Square wave with duty cycle control - NOISE: Random noise - SWEEP: Linear frequency sweep/ramp
Features: - Multiple waveform types with independent parameters - Sub-generator properties for each waveform type - Automated scan sequences (single or double cycles) - Synchronous with 50 kHz control loop
Example
>>> from psj_lib import DDriveWaveformType, DDriveScanType >>> wfg = channel.waveform_generator >>> >>> # Configure sine wave for 10 Hz scanning >>> await wfg.sine.set( ... amplitude=20.0, # 20 µm peak-to-peak ... offset=50.0, # Center at 50 µm ... frequency=10.0 # 10 Hz ... ) >>> await wfg.set_waveform_type(DDriveWaveformType.SINE) >>> >>> # Configure triangle with asymmetric duty cycle >>> await wfg.triangle.set( ... amplitude=30.0, ... offset=50.0, ... frequency=5.0, ... duty_cycle=70.0 # 70% rise, 30% fall ... ) >>> await wfg.set_waveform_type(DDriveWaveformType.TRIANGLE) >>> >>> # Start automated single scan >>> await wfg.start_scan(DDriveScanType.TRIANGLE_ONCE) >>> while await wfg.is_scan_running(): ... await asyncio.sleep(0.1)
- Properties:
sine: Sine waveform sub-generator configuration triangle: Triangle waveform sub-generator configuration rectangle: Rectangle waveform sub-generator configuration noise: Noise generator configuration sweep: Sweep/ramp generator configuration
Note
Configure waveform parameters before activating type
Only one waveform type active at a time
Waveform generation rate synchronized with control loop (50 kHz)
- CMD_NOISE_AMPLITUDE = 'WFG_NOISE_AMPLITUDE'
- CMD_NOISE_OFFSET = 'WFG_NOISE_OFFSET'
- CMD_REC_AMPLITUDE = 'WFG_RECTANGLE_AMPLITUDE'
- CMD_REC_DUTY_CYCLE = 'WFG_RECTANGLE_DUTY_CYCLE'
- CMD_REC_FREQUENCY = 'WFG_RECTANGLE_FREQUENCY'
- CMD_REC_OFFSET = 'WFG_RECTANGLE_OFFSET'
- CMD_SCAN_START = 'WFG_SCAN_START'
- CMD_SCAN_TYPE = 'WFG_SCAN_TYPE'
- CMD_SINE_AMPLITUDE = 'WFG_SINE_AMPLITUDE'
- CMD_SINE_FREQUENCY = 'WFG_SINE_FREQUENCY'
- CMD_SINE_OFFSET = 'WFG_SINE_OFFSET'
- CMD_SWEEP_AMPLITUDE = 'WFG_SWEEP_AMPLITUDE'
- CMD_SWEEP_OFFSET = 'WFG_SWEEP_OFFSET'
- CMD_SWEEP_TIME = 'WFG_SWEEP_TIME'
- CMD_TRI_AMPLITUDE = 'WFG_TRIANGLE_AMPLITUDE'
- CMD_TRI_DUTY_CYCLE = 'WFG_TRIANGLE_DUTY_CYCLE'
- CMD_TRI_FREQUENCY = 'WFG_TRIANGLE_FREQUENCY'
- CMD_TRI_OFFSET = 'WFG_TRIANGLE_OFFSET'
- CMD_WFG_TYPE = 'WFG_TYPE'
- __init__(*args, **kwargs) None[source]
Initialize the capability with command execution callback.
- Parameters:
write_cb – Function to execute device commands
device_commands – Mapping of command IDs to device strings
channel_id – Optional channel identifier for multi-channel devices (might be used by subclasses)
- async get_waveform_type() DDriveWaveformType[source]
Query currently active waveform type.
- Returns:
- Currently active waveform type, or NONE
if generator is disabled.
- Return type:
Example
>>> wfg_type = await wfg.get_waveform_type() >>> print(f"Active waveform: {wfg_type.name}")
- async is_scan_running() bool[source]
Check if automated scan is currently active.
- Returns:
True if scan is running, False if idle or completed.
- Return type:
bool
Example
>>> await wfg.start_scan(DDriveScanType.SINE_ONCE) >>> # Wait for scan to complete >>> while await wfg.is_scan_running(): ... await asyncio.sleep(0.05) >>> print("Scan completed")
- property noise: StaticWaveformGenerator
Noise generator configuration.
- Returns:
- Configuration interface for random
noise with amplitude and offset parameters.
- Return type:
Example
>>> await wfg.noise.set( ... amplitude=2.0, # ±2 µm random variation ... offset=50.0 # Centered at 50 µm ... )
- property rectangle: StaticWaveformGenerator
Rectangle waveform sub-generator configuration.
- Returns:
- Configuration interface for rectangular
(square) waveform with amplitude, offset, frequency, and duty cycle.
- Return type:
Example
>>> await wfg.rectangle.set( ... amplitude=25.0, ... offset=50.0, ... frequency=10.0, ... duty_cycle=30.0 # 30% high, 70% low ... )
Note
duty_cycle controls high/low ratio: 50% is square wave.
- async set_waveform_type(waveform_type: DDriveWaveformType) None[source]
Activate specific waveform type.
Switches the active waveform generator to the specified type. The waveform parameters should be configured via the corresponding sub-generator property before activation.
- Parameters:
waveform_type – Type of waveform to activate (SINE, TRIANGLE, RECTANGLE, NOISE, SWEEP, or NONE to disable).
Example
>>> from psj_lib import DDriveWaveformType >>> # Configure then activate sine wave >>> await wfg.sine.set(amplitude=20.0, offset=50.0, frequency=10.0) >>> await wfg.set_waveform_type(DDriveWaveformType.SINE) >>> # Disable waveform >>> await wfg.set_waveform_type(DDriveWaveformType.NONE)
Note
Setting to NONE disables waveform generation.
- property sine: StaticWaveformGenerator
Sine waveform sub-generator configuration.
- Returns:
- Configuration interface for sinusoidal
waveform with amplitude, offset, and frequency parameters.
- Return type:
Example
>>> await wfg.sine.set( ... amplitude=15.0, # 15 µm amplitude ... offset=50.0, # Centered at 50 µm ... frequency=20.0 # 20 Hz ... )
- async start_scan(scan_type: DDriveScanType) None[source]
Start automated scan sequence.
Initiates an automated scan with the specified pattern. The scan uses the currently configured waveform parameters and runs for the specified number of cycles before stopping automatically.
- Parameters:
scan_type – Type of scan to perform (SINE_ONCE, TRIANGLE_ONCE, SINE_TWICE, TRIANGLE_TWICE, or OFF to stop).
Example
>>> from psj_lib import DDriveScanType >>> # Configure triangle waveform first >>> await wfg.triangle.set( ... amplitude=40.0, ... offset=50.0, ... frequency=2.0 ... ) >>> await wfg.set_waveform_type(DDriveWaveformType.TRIANGLE) >>> # Start single scan >>> await wfg.start_scan(DDriveScanType.TRIANGLE_ONCE) >>> # Monitor progress >>> while await wfg.is_scan_running(): ... pos = await channel.position.get() ... print(f"Position: {pos:.2f}") ... await asyncio.sleep(0.1)
Note
Scan stops automatically after completing cycles
Check status with is_scan_running()
Waveform must be configured before starting scan
- property sweep: StaticWaveformGenerator
Sweep (ramp) generator configuration.
- Returns:
- Configuration interface for linear
sweep with amplitude, offset, and time (instead of frequency).
- Return type:
Example
>>> await wfg.sweep.set( ... amplitude=80.0, # Sweep range 80 µm ... offset=10.0, # Start at 10 µm ... frequency=2.0 # Actually sweep time: 2 seconds ... )
Note
For sweep, ‘frequency’ parameter actually represents sweep time in seconds (linear ramp duration).
- property triangle: StaticWaveformGenerator
Triangle waveform sub-generator configuration.
- Returns:
- Configuration interface for triangular
waveform with amplitude, offset, frequency, and duty cycle.
- Return type:
Example
>>> await wfg.triangle.set( ... amplitude=30.0, ... offset=50.0, ... frequency=5.0, ... duty_cycle=70.0 # 70% rise time, 30% fall time ... )
Note
duty_cycle controls asymmetry: 50% is symmetric, >50% is slower rise, <50% is faster rise.
- class DDriveWaveformType[source]
Bases:
Enumd-Drive waveform generator output types.
Defines the types of waveforms available from the d-Drive’s built-in waveform generator for scanning and modulation applications.
- NONE
No waveform output (generator disabled)
- SINE
Sinusoidal waveform
- TRIANGLE
Triangular waveform
- RECTANGLE
Square/rectangular waveform
- NOISE
Random noise
- SWEEP
Linear frequency sweep/ramp
- UNKNOWN
Unrecognized waveform type (error condition)
Example
>>> from psj_lib import DDriveWaveformType >>> await channel.waveform_generator.set_waveform_type( ... DDriveWaveformType.SINE ... )
- NOISE = 4
- NONE = 0
- RECTANGLE = 3
- SINE = 1
- SWEEP = 5
- TRIANGLE = 2
- UNKNOWN = 99
- class DDriveDataRecorder[source]
Bases:
DataRecorderd-Drive specific data recorder implementation.
The d-Drive data recorder captures two channels simultaneously: - Channel 1 (POSITION): Position sensor signal in device units (µm, mrad, etc.) - Channel 2 (VOLTAGE): Actuator voltage in volts
Recording specifications: - Maximum 500,000 samples per channel - 50 kHz sample rate (20 µs period) maximum - Stride (decimation) from 1 to 65535 - Both channels always record the same length
The d-Drive returns data in a device-specific format that requires special parsing, which this class handles automatically.
Example
>>> recorder = channel.data_recorder >>> # Configure for 1 second at full rate >>> await recorder.set(memory_length=50000, stride=1) >>> await recorder.start() >>> # ... perform motion ... >>> # Get position data >>> pos_data = await recorder.get_all_data( ... DDriveDataRecorderChannel.POSITION ... ) >>> # Get voltage data >>> vol_data = await recorder.get_all_data( ... DDriveDataRecorderChannel.VOLTAGE ... )
Note
get_memory_length() and get_stride() not supported by d-Drive
Use the values you configured with set() to track settings
Data format is automatically parsed from d-Drive response
- async get_data(channel: DataRecorderChannel, index: int | None = None) float[source]
Read a single data sample from specified channel.
The d-Drive returns data in format “m,value” (position) or “u,value” (voltage) which is automatically parsed.
- Parameters:
channel – Which channel to read (POSITION or VOLTAGE)
index – Sample index (0 to memory_length-1), or None for next sequential sample
- Returns:
Single data sample value as float
Example
>>> # Read position at index 100 >>> pos = await recorder.get_data( ... DDriveDataRecorderChannel.POSITION, ... index=100 ... ) >>> # Read next voltage sample >>> voltage = await recorder.get_data( ... DDriveDataRecorderChannel.VOLTAGE ... )
Note
The d-Drive returns “m,value” or “u,value” format. This method automatically extracts the numeric value.
- async get_memory_length() int[source]
Get configured recording length.
- Returns:
Always returns 500000 (hardware maximum) as d-Drive does not support reading back this configuration value.
Note
The d-Drive hardware does not provide a read command for memory length. This returns the maximum hardware capacity. Use recorder.sample_rate to calculate recording duration.
- class DDriveDataRecorderChannel[source]
Bases:
DataRecorderChanneld-Drive data recorder channel aliases with semantic names.
Provides meaningful names for the two hardware data recorder channels in d-Drive amplifiers. The d-Drive always records position on channel 1 and actuator voltage on channel 2.
- POSITION
Position sensor signal (alias for CHANNEL_1). Records actual measured position from the sensor.
- VOLTAGE
Actuator voltage signal (alias for CHANNEL_2). Records the output voltage applied to the piezo actuator.
Example
>>> from psj_lib import DDriveDataRecorderChannel >>> # Configure recorder >>> await channel.data_recorder.set( ... memory_length=50000, # 1 second at 50 kHz ... stride=1 ... ) >>> await channel.data_recorder.start() >>> # ... perform motion ... >>> # Retrieve position data using semantic name >>> pos_data = await channel.data_recorder.get_all_data( ... DDriveDataRecorderChannel.POSITION ... ) >>> # Retrieve voltage data >>> vol_data = await channel.data_recorder.get_all_data( ... DDriveDataRecorderChannel.VOLTAGE ... ) >>> print(f"Recorded {len(pos_data)} position samples") >>> print(f"Recorded {len(vol_data)} voltage samples")
Note
Both channels always record simultaneously
Sample rate: 50 kHz (20 µs period) maximum
Maximum 500,000 samples per channel
POSITION and VOLTAGE are semantic aliases for hardware channels
- POSITION = 1
- VOLTAGE = 2
- __new__(value)
- class DDriveTriggerOut[source]
Bases:
TriggerOutd-Drive hardware trigger output with offset compensation.
Extends base TriggerOut capability with an additional offset parameter for phase compensation. Generates TTL pulses synchronized with actuator position or setpoint values.
The trigger output can generate pulses: - At specific position or voltage ranges - At regular intervals within those ranges - On rising, falling, or both edges - With configurable pulse width - With offset for phase/timing adjustment (d-Drive specific for setpoints)
Example
>>> from psj_lib import TriggerEdge, TriggerDataSource >>> # Generate trigger every 10 µm from 20 to 80 µm >>> await channel.trigger_out.set( ... start_value=20.0, ... stop_value=80.0, ... interval=10.0, # Trigger every 10 µm ... length=1000, # 1000 cycle width (20 ms) ... edge=TriggerEdge.BOTH, # Trigger on up and down motion ... src=TriggerDataSource.POSITION, ... offset=0.5 # 0.5 µm phase compensation ... ) >>> >>> # Read current offset >>> current_offset = await channel.trigger_out.get_offset()
Note
Offset parameter is d-Drive specific extension
Trigger timing resolution depends on control loop rate (50 kHz)
Length specified in cycles (actual time = cycles * 20µs)
- CMD_OFFSET = 'TRIGGER_OUT_OFFSET'
- async get_offset() float[source]
Read current trigger offset value.
- Returns:
Trigger offset in position units.
- Return type:
float
Example
>>> offset = await channel.trigger_out.get_offset() >>> print(f"Trigger offset: {offset:.3f} µm")
Note
Offset is d-Drive specific parameter for phase compensation.
- async set(start_value: float | None = None, stop_value: float | None = None, interval: float | None = None, length: int | None = None, edge: TriggerEdge | None = None, src: TriggerDataSource | None = None, offset: float | None = None) None[source]
Configure trigger output parameters including d-Drive offset.
- Parameters:
start_value – Starting position/voltage for trigger window (in current units).
stop_value – Ending position/voltage for trigger window (in current units).
interval – Spacing between triggers within window (in current units). If None, single trigger at start_value.
length – Trigger pulse width in microseconds.
edge – Trigger on RISING, FALLING, or BOTH position crossings.
src – Data source for trigger comparison (POSITION or SETPOINT).
offset – Phase/timing offset in position units (d-Drive specific, see device manual).
Example
>>> from psj_lib import TriggerEdge, TriggerDataSource >>> # Trigger every 5 µm during upward scan >>> await trigger_out.set( ... start_value=10.0, ... stop_value=90.0, ... interval=5.0, ... length=500, # 500 cycle width (10 ms) ... edge=TriggerEdge.RISING, ... src=TriggerDataSource.POSITION, ... offset=0.2 # Compensate 0.2 µm delay ... )
Note
All parameters optional; only specified values are updated
Offset unique to d-Drive
Trigger activates automatically when actuator enters window
- class DDriveModulationSourceTypes[source]
Bases:
ModulationSourceTypesd-Drive modulation source types for external control input.
Defines available modulation input sources specific to d-Drive amplifiers. These sources allow external control of the actuator position or voltage.
- SERIAL_ENCODER
Serial interface + encoder knob is used for modulation. When sending a setpoint, the encoder offset is set to zero. Rotate the knob to apply an offset.
- SERIAL_ENCODER_ANALOG
Serial interface + encoder knob + analog input is used for modulation. When sending a setpoint, the encoder offset is set to zero. Rotate the knob to apply an offset. Additionally, an analog voltage input (0-10 V) can be used to apply a further offset.
Example
>>> from psj_lib import DDriveModulationSourceTypes >>> await channel.modulation_source.set_source( ... DDriveModulationSourceTypes.SERIAL_ENCODER ... ) >>> current = await channel.modulation_source.get_source() >>> print(f"Active source: {current.name}")
- SERIAL_ENCODER = 0
- SERIAL_ENCODER_ANALOG = 1
- __new__(value)
- class DDriveMonitorOutputSource[source]
Bases:
MonitorOutputSourced-Drive analog monitor output source selection.
Defines internal signals that can be routed to the analog monitor output connector for real-time observation with oscilloscopes or other measurement equipment.
- CLOSED_LOOP_POSITION
Closed-loop position value from sensor feedback.
- SETPOINT
Commanded target position (setpoint value).
- CONTROLLER_VOLTAGE
Internal amplifier control voltage.
- POSITION_ERROR
Position error signal (setpoint - actual position).
- POSITION_ERROR_ABS
Absolute value of position error.
- ACTUATOR_VOLTAGE
Actual voltage at piezo actuator.
- OPEN_LOOP_POSITION
Open-loop position value from sensor feedback.
Example
>>> from psj_lib import DDriveMonitorOutputSource >>> # Monitor position error for tuning >>> await channel.monitor_output.set_source( ... DDriveMonitorOutputSource.POSITION_ERROR ... ) >>> # Monitor final output voltage >>> await channel.monitor_output.set_source( ... DDriveMonitorOutputSource.ACTUATOR_VOLTAGE ... )
Note
Monitor output is a 0-10V analog signal
Useful for debugging, tuning PID, and system analysis
See device manual for detailed signal descriptions
- ACTUATOR_VOLTAGE = 5
- CLOSED_LOOP_POSITION = 0
- CONTROLLER_VOLTAGE = 2
- OPEN_LOOP_POSITION = 6
- POSITION_ERROR = 3
- POSITION_ERROR_ABS = 4
- SETPOINT = 1
- __new__(value)
NV Specific Capabilities
NV-specific display capability implementation.
- class NVDisplay[source]
Bases:
DisplayDisplay capability for NV devices.
Internally scale brightness to NV’s expected 0-255 range.
NV-series encoder knob capabilities.
- class NVCLEKnob[source]
Bases:
NVKnobEncoder knob capability variant with closed-loop step settings.
- CMD_STEP_CLOSED_LOOP = 'KNOB_STEP_CLOSED_LOOP'
- async get_step_closed_loop() float[source]
Read closed-loop knob step value.
- Returns:
Closed-loop step size (typically in µm).
- async set(mode: NVKnobMode | None = None, sample_time: float | None = None, accel_exponent: int | None = None, step_limit: int | None = None, step_open_loop: float | None = None, step_closed_loop: float | None = None) None[source]
Set encoder knob parameters. Only parameters that are not None will be updated.
- Parameters:
mode – Encoder knob mode (acceleration, interval, or interval with acceleration)
sample_time – Sample time in seconds (NV expects multiples of 20ms)
accel_exponent – Exponent for acceleration mode (higher = more aggressive)
step_limit – Maximum step size for interval mode
step_open_loop – Step size for open loop control (V)
step_closed_loop – Step size for closed loop control (µm)
- Returns:
None
- class NVKnob[source]
Bases:
PiezoCapabilityEncoder knob configuration for open-loop and shared settings.
- CMD_ACCEL_EXPONENT = 'KNOB_ACCEL_EXPONENT'
- CMD_MODE = 'KNOB_MODE'
- CMD_SAMPLE_TIME = 'KNOB_SAMPLE_TIME'
- CMD_STEP_LIMIT = 'KNOB_STEP_LIMIT'
- CMD_STEP_OPEN_LOOP = 'KNOB_STEP_OPEN_LOOP'
- async get_accel_exponent() int[source]
Read acceleration exponent.
- Returns:
Configured acceleration exponent.
- async get_mode() NVKnobMode[source]
Read the currently configured knob mode.
- Returns:
Current
NVKnobMode. ReturnsNVKnobMode.UNKNOWNif parsing fails.
- async get_sample_time() float[source]
Read encoder sampling period.
- Returns:
Sampling period in seconds.
- async get_step_limit() int[source]
Read configured knob step limit.
- Returns:
Maximum step limit value.
- async get_step_open_loop() float[source]
Read open-loop knob step value.
- Returns:
Open-loop step size (typically in volts).
- async set(mode: NVKnobMode | None = None, sample_time: float | None = None, accel_exponent: int | None = None, step_limit: int | None = None, step_open_loop: float | None = None) None[source]
Set encoder knob parameters. Only parameters that are not None will be updated.
- Parameters:
mode – Encoder knob mode (acceleration, interval, or interval with acceleration)
sample_time – Sample time in seconds (NV expects multiples of 20ms)
accel_exponent – Exponent for acceleration mode (higher = more aggressive)
step_limit – Maximum step size for interval mode
step_open_loop – Step size for open loop control (V)
- Returns:
None
- class NVKnobMode[source]
Bases:
EnumNV encoder knob operating modes.
Defines the operating behavior of the front-panel encoder knob for setpoint adjustment.
- UNKNOWN
Unrecognized mode (fallback when parsing fails).
- ACCELERATION
Step size scales with rotation speed.
- INTERVAL
Fixed step interval mode.
- INTERVAL_ACCELERATION
Interval mode with acceleration behavior.
Example
>>> from psj_lib import NVKnobMode >>> await device.knob.set(mode=NVKnobMode.INTERVAL) >>> mode = await device.knob.get_mode() >>> print(mode.name)
- ACCELERATION = 0
- INTERVAL = 1
- INTERVAL_ACCELERATION = 2
- UNKNOWN = -1
NV-specific modulation source capability and source enum.
- class NVModulationSource[source]
Bases:
ModulationSourceModulation source capability with client-side readback cache.
NV devices do not expose readback for modulation source, therefore the last written value is returned by
get_source().- __init__(*args, **kwargs)[source]
Initialize modulation source capability.
- Parameters:
*args – Positional arguments forwarded to
ModulationSource.**kwargs – Keyword arguments forwarded to
ModulationSource.
- async get_source() NVModulationSourceTypes[source]
Get current modulation source.
NV hardware does not provide source readback. This returns the last value set via
set_source().- Returns:
Last configured
NVModulationSourceTypesvalue.
- async set_source(source: NVModulationSourceTypes) None[source]
Set modulation source.
- Parameters:
source – Desired NV modulation source.
- Returns:
None
- class NVModulationSourceTypes[source]
Bases:
ModulationSourceTypesNV modulation source selection.
Defines available control sources for setpoint modulation on NV devices.
- ENCODER_ANALOG
Front-panel encoder + analog control path.
- SERIAL
Serial command path.
Example
>>> from psj_lib import NVModulationSourceTypes >>> await channel.modulation_source.set_source(NVModulationSourceTypes.SERIAL) >>> source = await channel.modulation_source.get_source() >>> print(source.name)
- ENCODER_ANALOG = 0
- SERIAL = 1
- __new__(value)
NV-specific monitor output capability and source enum.
- class NVMonitorOutput[source]
Bases:
MonitorOutputMonitor output capability with cached readback behavior.
- __init__(*args, **kwargs)[source]
Initialize monitor output capability.
- Parameters:
sources – Device-specific MonitorOutputSource enum type
- async get_source() NVMonitorOutputSource[source]
Get monitor output source.
NV hardware does not provide source readback. This returns the last value set via
set_source().- Returns:
Last configured
NVMonitorOutputSourcevalue.
- async set_source(source: NVMonitorOutputSource) None[source]
Set monitor output source.
- Parameters:
source – Desired monitor output source.
- Returns:
None
- Raises:
ValueError – If
sourceis not anNVMonitorOutputSource.
- class NVMonitorOutputSource[source]
Bases:
MonitorOutputSourceNV analog monitor output routing.
Defines internal signals that can be routed to the analog monitor output connector for live observation.
- ACTUATOR_VOLTAGE
Actuator drive voltage signal.
- POSITION_VOLTAGE
Position-proportional voltage signal.
- MODE_DEPENDENT
Source selected by current operation mode. Voltage in open loop, position in closed loop.
Example
>>> from psj_lib import NVMonitorOutputSource >>> await channel.monitor_output.set_source(NVMonitorOutputSource.POSITION_VOLTAGE) >>> source = await channel.monitor_output.get_source() >>> print(source.name)
- ACTUATOR_VOLTAGE = 0
- MODE_DEPENDENT = 2
- POSITION_VOLTAGE = 1
- __new__(value)
NV setpoint capability with client-side readback cache.
- class NVSetpoint[source]
Bases:
SetpointSetpoint capability for NV devices.
NV devices do not support direct setpoint readback, so the last written value is cached and returned by
get().- __init__(*args, **kwargs)[source]
Initialize the capability with command execution callback.
- Parameters:
write_cb – Function to execute device commands
device_commands – Mapping of command IDs to device strings
channel_id – Optional channel identifier for multi-channel devices (might be used by subclasses)
NV status register interpretation helpers.
This module provides flag constants and an NV-specific
StatusRegister
implementation for decoding per-channel status words.
- class NVStatusRegister[source]
Bases:
StatusRegisterNV-specific interpretation of per-channel error/status flags.
The raw NV
ERRORresponse contains one hexadecimal status value per channel. This class interprets those bit fields.Example
>>> status = await channel.status.get() >>> if not status.actuator_plugged: ... print("No actuator connected") >>> if status.over_temperature: ... print("Thermal warning")
- property actuator_plugged: bool
Indicates whether the actuator is plugged in.
- Returns:
True if the actuator is detected, otherwise False.
- property actuator_short: bool
Indicates whether a short circuit is detected on the actuator.
- Returns:
True if a short condition is present, otherwise False.
- property eeprom_error: bool
Indicates whether an EEPROM error is detected.
- Returns:
True if EEPROM error flag is set, otherwise False.
- interpret_status_register(flag: int) bool[source]
Interpret a specific flag from the raw status register value.
- Parameters:
flag – Specific flag bit to interpret (e.g. FLAG_ACTUATOR_NOT_PLUGGED)
- Returns:
True if the specified bit is set for the current channel, otherwise False.
- Raises:
ValueError – If no channel ID context is available.
- property invalid_actuator: bool
Indicates whether an invalid actuator is detected.
- Returns:
True if actuator identification is invalid, otherwise False.
- property over_temperature: bool
Indicates whether an over-temperature condition is detected.
- Returns:
True if over-temperature flag is set, otherwise False.
- property overload: bool
Indicates whether an overload condition is detected.
In this case the actuator is not able to reach the setpoint even with maximum voltage.
- Returns:
True if overload condition is active, otherwise False.
- property underload: bool
Indicates whether an underload condition is detected.
In this case the actuator is not able to reach the setpoint even with minimum voltage.
- Returns:
True if underload condition is active, otherwise False.
Exceptions
- exception ActuatorNotConnected[source]
Bases:
DeviceErrorPiezo actuator is not physically connected to the device.
Raised when the device detects that the piezo actuator is not plugged in or properly connected. This may prevent certain operations that require an active actuator connection.
- exception AdmissibleParameterRangeExceeded[source]
Bases:
DeviceErrorParameter value is outside the acceptable range for the command.
Raised when a parameter value exceeds the valid range defined by the device for that particular command. Check device documentation for valid parameter ranges.
Example
Setting a voltage beyond the device’s maximum output capability.
- exception CommandParameterCountExceeded[source]
Bases:
DeviceErrorToo many parameters were provided for the command.
Raised when more parameters are sent than the command expects. Each command has a defined number of parameters; sending extra parameters results in this error.
- exception DeviceError[source]
Bases:
ExceptionBase exception class for all device-related errors.
This is the parent exception for all errors that occur during device communication and operation. Catch this exception to handle any device error generically, or catch specific subclasses for targeted error handling.
Device errors typically result from: - Invalid commands or parameters sent to the device - Hardware conditions (overload, underload) - Communication protocol violations - Device configuration issues
Example
>>> try: ... await device.write("invalid_command") ... except DeviceError as e: ... print(f"Device error occurred: {e}")
- class ErrorCode[source]
Bases:
EnumEnumeration of device error codes with exception class mapping.
This enum defines all possible error codes that can be returned by piezoelectric devices. Each error code corresponds to a specific DeviceError exception subclass.
Error codes are returned by the device in response to invalid commands, parameter violations, or hardware conditions. The ErrorCode class provides utilities to convert error codes to exceptions and raise appropriate errors.
- ERROR_NOT_SPECIFIED
Generic unspecified error (code 1)
- Type:
int
- UNKNOWN_COMMAND
Command not recognized by device (code 2)
- Type:
int
- PARAMETER_MISSING
Required parameter not provided (code 3)
- Type:
int
- ADMISSIBLE_PARAMETER_RANGE_EXCEEDED
Parameter outside valid range (code 4)
- Type:
int
- COMMAND_PARAMETER_COUNT_EXCEEDED
Too many parameters provided (code 5)
- Type:
int
- PARAMETER_LOCKED_OR_READ_ONLY
Cannot modify locked/read-only parameter (code 6)
- Type:
int
- UNDERLOAD
Device detected underload condition (code 7)
- Type:
int
- OVERLOAD
Device detected overload condition (code 8)
- Type:
int
- PARAMETER_TOO_LOW
Parameter below minimum value (code 9)
- Type:
int
- PARAMETER_TOO_HIGH
Parameter above maximum value (code 10)
- Type:
int
- ACTUATOR_NOT_CONNECTED
Actuator not physically connected (code 98)
- Type:
int
- UNKNOWN_CHANNEL
Specified channel does not exist (code 99)
- Type:
int
Example
>>> error_code = ErrorCode.from_value(2) >>> print(error_code) # ErrorCode.UNKNOWN_COMMAND >>> ErrorCode.raise_error(error_code) # Raises UnknownCommand exception
- ACTUATOR_NOT_CONNECTED = 98
- ADMISSIBLE_PARAMETER_RANGE_EXCEEDED = 4
- COMMAND_PARAMETER_COUNT_EXCEEDED = 5
- ERROR_NOT_SPECIFIED = 1
- OVERLOAD = 8
- PARAMETER_LOCKED_OR_READ_ONLY = 6
- PARAMETER_MISSING = 3
- PARAMETER_TOO_HIGH = 10
- PARAMETER_TOO_LOW = 9
- UNDERLOAD = 7
- UNKNOWN_CHANNEL = 99
- UNKNOWN_COMMAND = 2
- classmethod from_value(value: int) ErrorCode[source]
Converts an integer value to its corresponding ErrorCode enum member.
- Parameters:
value (int) – The integer value representing the error code.
- Returns:
The corresponding ErrorCode enum member. If the value does not match any defined error code, returns ErrorCode.ERROR_NOT_SPECIFIED.
- Return type:
- classmethod get_exception_class(error_code) Type[DeviceError][source]
Returns the appropriate exception class for a given error code.
- Parameters:
error_code (ErrorCode or int) – The error code for which to get the exception class.
- Returns:
The exception class corresponding to the error code.
- Return type:
Type[DeviceError]
- classmethod raise_error(error_code, message=None)[source]
Raises the appropriate exception for a given error code.
- Parameters:
error_code (ErrorCode or int) – The error code to raise.
message (str, optional) – Custom error message. If not provided, uses default description.
- Raises:
DeviceError – The specific exception corresponding to the error code.
- exception ErrorNotSpecified[source]
Bases:
DeviceErrorGeneric error when the device does not provide specific error details.
Raised when the device reports an error condition but does not provide a specific error code. This typically indicates an internal device error or a communication protocol issue.
- exception Overload[source]
Bases:
DeviceErrorDevice detected an overload condition.
- exception ParameterLockedOrReadOnly[source]
Bases:
DeviceErrorAttempted to modify a read-only or locked parameter.
Raised when trying to write to a parameter that: - Is inherently read-only (e.g., hardware status, sensor values) - Is currently locked by device configuration or security settings - Cannot be changed in the current device operation mode
Example
Attempting to change a factory calibration value that is locked.
- exception ParameterMissing[source]
Bases:
DeviceErrorRequired parameter was not provided with the command.
Raised when a command that requires one or more parameters is sent without the necessary parameter values.
Example
A command expecting a channel number was sent without specifying which channel to operate on.
- exception ParameterTooHigh[source]
Bases:
DeviceErrorParameter value exceeds the maximum acceptable value.
Raised when a parameter value exceeds the upper bound of the acceptable range for that parameter.
This is more specific than AdmissibleParameterRangeExceeded, indicating specifically that the value is too high.
- exception ParameterTooLow[source]
Bases:
DeviceErrorParameter value is below the minimum acceptable value.
Raised when a parameter value is below the lower bound of the acceptable range for that parameter.
This is more specific than AdmissibleParameterRangeExceeded, indicating specifically that the value is too low.
- exception Underload[source]
Bases:
DeviceErrorDevice detected an underload condition.
- exception UnknownChannel[source]
Bases:
DeviceErrorSpecified channel does not exist on the device.
Raised when attempting to access or configure a channel number that is not present on the connected device.
Example
Accessing channel 3 on a single-channel device.
- exception UnknownCommand[source]
Bases:
DeviceErrorThe device does not recognize the command sent.
Raised when attempting to send a command that is not supported by the device or is not valid for the current device firmware version.
Common causes: - Typo in command name - Command not supported by device model - Firmware version incompatibility
Type Definitions
- class ActorType[source]
Bases:
EnumEnumeration of piezoelectric actuator connection types and geometries.
Different actuator designs require different electrical driving schemes and have different characteristics in terms of stroke, force, and response time.
- NANOX
piezosystem jena nanoX® actuators.
- Type:
0
- PSH
PSH style piezo actuators (not used anymore).
- Type:
1
- PARALLEL
Normal piezo actuators with parallel output stage configuration.
- Type:
2
- UNKNOWN
Actuator type could not be determined. Fallback value when actuator information is unavailable or unrecognized.
- Type:
99
- NANOX = 0
- PARALLEL = 2
- PSH = 1
- UNKNOWN = 99
- class DeviceInfo[source]
Bases:
objectComplete information about a connected piezoelectric device.
This dataclass aggregates all identification and configuration information for a connected device. It combines transport-layer information with device-specific metadata, providing a complete picture of the device’s identity and capabilities.
This information is typically populated during device discovery or initial connection and is used throughout the device’s lifecycle for logging, identification, and configuration management.
- transport_info
Transport layer connection details. Contains the transport type (SERIAL/TELNET), connection identifier (COM port or IP address), and MAC address if applicable.
- Type:
- device_id
Device model or family identifier. Examples: - “d-Drive”: Modular d-Drive piezo amplifier - “NV200/D_NET”: Network-enabled NV200 compact amplifier - “SPI Controller Box”: Multi-channel SPI controller This field is None if device identification has not been performed.
- Type:
Optional[str]
- extended_info
Device-specific metadata dictionary. Additional information that varies by device type. Common keys include: - “actuator_name”: Model name of connected actuator (e.g., “PSH15SG_Y”) - “actuator_serial”: Serial number of connected actuator - “firmware_version”: Device firmware version string - “channels”: Number of available channels - “capabilities”: List of supported features Empty dict if extended information was not requested during discovery.
- Type:
Dict[str, str]
Example
>>> device = await DDriveDevice.discover_devices()[0] >>> info = device.device_info >>> print(f"Connected to {info.device_id} on {info.transport_info}") >>> print(f"Actuator: {info.extended_info.get('actuator_name', 'Unknown')}")
- __init__(transport_info: ~psj_lib.devices.transport_protocol.transport_types.TransportProtocolInfo, device_id: str | None = None, extended_info: ~typing.Dict[str, str] = <factory>) None
- device_id: str | None = None
- extended_info: Dict[str, str]
- transport_info: TransportProtocolInfo
- class SensorType[source]
Bases:
EnumEnumeration of position sensor types used in piezoelectric actuators.
Different actuator models use different sensor technologies for position feedback in closed-loop control systems. The sensor type affects measurement accuracy, resolution, and environmental sensitivity.
- NONE
No position sensor present. Actuator operates in open-loop mode only. Position control relies solely on the voltage-displacement relationship of the piezo stack.
- Type:
0
- STRAIN_GAUGE
Strain gauge based position sensor. Measures mechanical deformation. Good for compact designs but can be less accurate and more environmentally sensitive than other sensor types.
- Type:
1
- CAPACITIVE
Capacitive position sensor. Non-contact measurement with high resolution and excellent linearity. Common in high-precision nanopositioning actuators.
- Type:
2
- INDUCTIVE
Inductive position sensor. Measures position through electromagnetic induction. Robust against environmental disturbances but typically lower resolution than capacitive.
- Type:
3
- UNKNOWN
Sensor type could not be determined. Fallback value when sensor information is unavailable or unrecognized.
- Type:
99
- CAPACITIVE = 2
- INDUCTIVE = 3
- NONE = 0
- STRAIN_GAUGE = 1
- UNKNOWN = 99
- class DetectedDevice[source]
Bases:
objectInformation about a device discovered during network or serial scanning.
This class represents a device that has been found during the discovery process. It contains all the information needed to identify the device and establish a connection. After discovery, this information can be used to create a device instance and connect to it.
- transport
The transport protocol type for this device. Indicates whether the device was found via serial or network.
- Type:
- identifier
Transport-specific connection identifier. - For SERIAL: COM port or device path (e.g., “COM3”, “/dev/ttyUSB0”) - For TELNET: IP address (e.g., “192.168.1.100”)
- Type:
str
- mac
MAC address for network devices. Only populated for TELNET transport. Used for unique device identification in network environments. Format: “XX:XX:XX:XX:XX:XX”
- Type:
Optional[str]
- device_id
Device model identifier if detected. Examples: “d-Drive”, “SPI Controller Box” This is only available if device identification was requested during discovery.
- Type:
Optional[str]
- device_info
Extended device information dictionary. May contain keys such as: - “actuator_name”: Connected actuator model - “actuator_serial”: Actuator serial number - “firmware_version”: Device firmware version Only populated if DiscoverFlags.READ_DEVICE_INFO was used.
- Type:
Dict[str, str]
Example
>>> from psj_lib import DDriveDevice >>> devices = await DDriveDevice.discover_devices() >>> if devices: ... print(devices[0]) # "Serial @ COM3 - d-Drive - {...}" ... device = DDriveDevice.from_detected_device(devices[0])
- __init__(transport: TransportType, identifier: str, mac: str | None = None, device_id: str | None = None) None
- device_id: str | None = None
- identifier: str
- mac: str | None = None
- transport: TransportType
Bases:
ProtocolExceptionException raised when a device cannot be accessed or is unavailable.
This exception is typically raised when: - A device cannot be found on the specified port or network address - A device is physically disconnected during operation - Network connectivity to a device is lost - A serial port cannot be opened due to permissions or other issues
Example
>>> try: ... device = DDriveDevice(TransportType.SERIAL, "COM99") ... await device.connect() ... except DeviceUnavailableException: ... print("Device not found on COM99")
- exception ProtocolException[source]
Bases:
ExceptionBase exception class for transport protocol communication errors.
This exception serves as the parent class for all protocol-related exceptions, allowing users to catch any transport protocol error with a single except clause.
Raised when there are general communication errors that don’t fit into more specific exception categories.
- exception TimeoutException[source]
Bases:
ProtocolExceptionException raised when a communication operation times out.
This exception occurs when a read or write operation does not complete within the specified timeout period. This typically indicates: - The device is not responding - The device is busy processing another command - Communication interference or hardware issues - Incorrect communication parameters
The default timeout for most operations is 0.6 seconds, but this can be adjusted based on the specific operation and device response time.
- class TransportProtocolInfo[source]
Bases:
objectContainer for transport protocol metadata and connection information.
This class encapsulates the essential information needed to identify and describe a transport protocol connection to a device. It is used throughout the library to pass connection information between discovery, connection, and device management functions.
- transport
The type of transport protocol in use.
- Type:
- identifier
The connection identifier specific to the transport type. - For SERIAL: COM port name (e.g., “COM3”, “/dev/ttyUSB0”) - For TELNET: IP address (e.g., “192.168.1.100”)
- Type:
str
- mac
MAC address of the device, if available and applicable. Only relevant for network-connected devices (TELNET transport).
- Type:
Optional[str]
Example
>>> info = TransportProtocolInfo( ... transport=TransportType.TELNET, ... identifier="192.168.1.100", ... mac="00:80:A3:12:34:56" ... ) >>> print(info) # "Telnet @ 192.168.1.100"
- __init__(transport: TransportType, identifier: str, mac: str | None = None) None
- identifier: str
- mac: str | None = None
- transport: TransportType
- class TransportType[source]
Bases:
str,EnumEnumeration of supported transport layer protocols for device communication.
This enum defines the available methods for communicating with piezosystem jena devices. Each transport type has its own characteristics, advantages, and use cases.
- TELNET
Network communication via Telnet protocol (TCP/IP). - Used for Ethernet-connected devices - Supports remote operation over LAN/WAN - Typical identifier: IP address (e.g., “192.168.1.100”) - Default port: 23
- SERIAL
Direct serial communication (RS-232/USB-Serial). - Used for USB or RS-232 connected devices - Lower latency than network communication - Typical identifier: COM port (Windows) or /dev/ttyUSB* (Linux) - Default baud rate: 115200
Example
>>> # Connect via serial >>> device = DDriveDevice(TransportType.SERIAL, "COM3") >>> # Connect via network >>> device = DDriveDevice(TransportType.TELNET, "192.168.1.100")
- SERIAL = 'serial'
- TELNET = 'telnet'
- __new__(value)
Note: TransportType, DiscoverFlags, and TransportProtocolInfo are exported
from the main psj_lib module, not from transport_protocol directly.
Common Patterns
Device Creation
# Direct instantiation (recommended)
device = DDriveDevice(TransportType.SERIAL, "COM3")
# From discovery (devices are already instantiated)
devices = await DDriveDevice.discover_devices()
device = devices[0] # Already a DDriveDevice instance
Connection Management
# Context manager (recommended)
async with device:
# Use device
pass
# Manual
await device.open()
try:
# Use device
pass
finally:
await device.close()
Channel Access
# Get all channels
channels = device.channels
# Access by index
channel = device.channels[0]
# Iterate
for channel in device.channels:
# Use channel
pass
Capability Access
# Capabilities are channel attributes
position = channel.position
setpoint = channel.setpoint
pid = channel.pid_controller
recorder = channel.data_recorder
# Use capabilities
await setpoint.set(50.0)
await pid.set(p=0.5, i=0.1, d=0.05)
await recorder.start()
Error Handling
from psj_lib import DeviceError, DeviceUnavailableException
try:
async with device:
await channel.setpoint.set(50.0)
except DeviceUnavailableException as e:
print(f"Connection failed: {e}")
except DeviceError as e:
print(f"Device error: {e}")
Type Hints
psj-lib includes comprehensive type hints for IDE autocomplete:
from psj_lib import DDriveDevice, TransportType, Position, PIDController
async def typed_function(device: DDriveDevice) -> float:
channel = device.channels[0]
# IDE provides autocomplete for all methods
position: Position = channel.position
value: float = await position.get()
return value
Async Patterns
Sequential Operations
# Operations execute one after another
await channel.setpoint.set(30.0)
await channel.setpoint.set(50.0)
await channel.setpoint.set(70.0)
Parallel Operations
# Operations execute concurrently
await asyncio.gather(
channel1.setpoint.set(30.0),
channel2.setpoint.set(60.0),
channel3.setpoint.set(90.0)
)
Timeouts
# With timeout
async with asyncio.timeout(5.0):
await channel.setpoint.set(50.0)