Source code for nv200.device_discovery

"""
This module provides asynchronous device discovery functionality for the NV200 library.
It concurrently scans for devices available via Telnet and Serial protocols, returning
a unified list of detected devices. Each detected device is represented by a :class:`.DetectedDevice`
instance, annotated with its transport type (TELNET or SERIAL), identifier (such as IP address
or serial port), and optionally a MAC address.
"""

import asyncio
import logging
from typing import List, Type, Optional
from nv200.device_factory import create_device_from_id
from nv200.transport_protocol import TransportProtocol
from nv200.telnet_protocol import TelnetProtocol  
from nv200.serial_protocol import SerialProtocol
from nv200.shared_types import DetectedDevice, TransportType, DiscoverFlags
from nv200.device_base import PiezoDeviceBase
from nv200.transport_factory import transport_from_detected_device


# Global module locker
logger = logging.getLogger(__name__)



async def _enrich_device_info(detected_device: DetectedDevice, flags: DiscoverFlags) -> DetectedDevice:
    """
    Asynchronously enriches a DetectedDevice object with additional actuator information.

    Returns:
        DetectedDevice: The enriched device information object with actuator name and serial number populated.
    """
    try:
        logger.debug("Reading device ID from: %s", detected_device.identifier)
        protocol = transport_from_detected_device(detected_device)
        await protocol.connect(auto_adjust_comm_params=bool(flags & DiscoverFlags.ADJUST_COMM_PARAMS))
        dev = PiezoDeviceBase(protocol)
        detected_device.device_id = await dev.get_device_type()
        logger.debug("Device ID detected: %s", detected_device.device_id)
        dev = create_device_from_id(detected_device.device_id, protocol)
        await dev.enrich_device_info(detected_device)
        await protocol.close()
        return detected_device
    except Exception:
        return None
      

[docs] async def discover_devices(flags: DiscoverFlags = DiscoverFlags.ALL_INTERFACES, device_class: Optional[Type[PiezoDeviceBase]] = None) -> List[DetectedDevice]: """ Asynchronously discovers devices on available interfaces based on the specified discovery flags and optional device class. The discovery process can be customized using flags to enable or disable: - `DiscoverFlags.DETECT_ETHERNET` - detect devices connected via Ethernet - `DiscoverFlags.DETECT_SERIAL` - detect devices connected via Serial - `DiscoverFlags.READ_DEVICE_INFO` - enrich device information with additional details such as actuator name and actuator serial number Args: flags (DiscoverFlags, optional): Flags indicating which interfaces to scan and whether to read device info. Defaults to DiscoverFlags.ALL_INTERFACES. device_class (Optional[Type[PiezoDeviceBase]], optional): If specified, only devices matching this class will be returned. Also ensures device info is read. Returns: List[DetectedDevice]: A list of detected devices, optionally enriched with detailed information and filtered by device class. Raises: Any exceptions raised by underlying protocol discovery or enrichment functions. Notes: - Device discovery is performed in parallel for supported interfaces (Ethernet, Serial). - If READ_DEVICE_INFO is set, each detected device is enriched with additional information. - If device_class is specified, only devices matching the class's DEVICE_ID are returned. Examples: Discover all Devices on all interfaces >>> device = await nv200.device_discovery.discover_devices(DiscoverFlags.ALL_INTERFACES | DiscoverFlags.READ_DEVICE_INFO) Discover NV200 Devices connected via serial interface >>> device = await nv200.device_discovery.discover_devices(DiscoverFlags.DETECT_SERIAL | DiscoverFlags.READ_DEVICE_INFO, NV200Device) """ devices: List[DetectedDevice] = [] tasks = [] if device_class: flags |= DiscoverFlags.READ_DEVICE_INFO # Ensure we read device info if a specific class is requested if flags & DiscoverFlags.DETECT_ETHERNET: tasks.append(TelnetProtocol.discover_devices(flags)) else: tasks.append(asyncio.sleep(0, result=[])) # Placeholder for parallel await if flags & DiscoverFlags.DETECT_SERIAL: tasks.append(SerialProtocol.discover_devices(flags)) else: tasks.append(asyncio.sleep(0, result=[])) # Placeholder for parallel await eth_devs, serial_devs = await asyncio.gather(*tasks) if flags & DiscoverFlags.DETECT_ETHERNET: devices.extend(eth_devs) if flags & DiscoverFlags.DETECT_SERIAL: devices.extend(serial_devs) if flags & DiscoverFlags.READ_DEVICE_INFO: # Enrich each device with detailed info logger.debug("Enriching %d devices with detailed info...", len(devices)) raw_results = await asyncio.gather(*(_enrich_device_info(d, flags) for d in devices)) devices = [d for d in raw_results if d is not None] if device_class: # Filter devices by the specified device class devices = [d for d in devices if d.device_id == device_class.DEVICE_ID] return devices