import os import ctypes from ctypes import * import numpy as np from dynaconf.base import Settings from core.edge_component import device, EdgeComponent, service from core.logging import logger import time from PyQt5.QtCore import QObject, pyqtSignal from core.config import settings # 定义 livox_status 的返回值类型 livox_status = ctypes.c_int # 预定义的常量 kBroadcastCodeSize = 16 # 广播码长度 # 定义枚举类型 class DeviceType(c_int): kDeviceTypeHub = 0 kDeviceTypeLidarMid40 = 1 kDeviceTypeLidarTele = 2 kDeviceTypeLidarHorizon = 3 kDeviceTypeLidarMid70 = 6 kDeviceTypeLidarAvia = 7 class LidarState(c_int): kLidarStateInit = 0 kLidarStateNormal = 1 kLidarStatePowerSaving = 2 kLidarStateStandBy = 3 kLidarStateError = 4 kLidarStateUnknown = 5 class LidarMode(c_int): kLidarModeNormal = 1 kLidarModePowerSaving = 2 kLidarModeStandby = 3 class LidarFeature(c_int): kLidarFeatureNone = 0 kLidarFeatureRainFog = 1 class LidarIpMode(c_int): kLidarDynamicIpMode = 0 kLidarStaticIpMode = 1 class LidarScanPattern(c_int): kNoneRepetitiveScanPattern = 0 kRepetitiveScanPattern = 1 class LivoxStatus(c_int): kStatusSendFailed = -9 kStatusHandlerImplNotExist = -8 kStatusInvalidHandle = -7 kStatusChannelNotExist = -6 kStatusNotEnoughMemory = -5 kStatusTimeout = -4 kStatusNotSupported = -3 kStatusNotConnected = -2 kStatusFailure = -1 kStatusSuccess = 0 # 定义结构体和联合体 class LidarErrorCode(Structure): _fields_ = [ ("temp_status", c_uint32, 2), ("volt_status", c_uint32, 2), ("motor_status", c_uint32, 2), ("dirty_warn", c_uint32, 2), ("firmware_err", c_uint32, 1), ("pps_status", c_uint32, 1), ("device_status", c_uint32, 1), ("fan_status", c_uint32, 1), ("self_heating", c_uint32, 1), ("ptp_status", c_uint32, 1), ("time_sync_status", c_uint32, 3), ("rsvd", c_uint32, 13), ("system_status", c_uint32, 2), ] class HubErrorCode(Structure): _fields_ = [ ("sync_status", c_uint32, 2), ("temp_status", c_uint32, 2), ("lidar_status", c_uint32, 1), ("lidar_link_status", c_uint32, 1), ("firmware_err", c_uint32, 1), ("rsvd", c_uint32, 23), ("system_status", c_uint32, 2), ] class ErrorMessage(Union): _fields_ = [ ("error_code", c_uint32), ("lidar_error_code", LidarErrorCode), ("hub_error_code", HubErrorCode) ] class StatusUnion(Union): _fields_ = [ ("error_code", c_uint32), # 假设这个字段为示例 ("lidar_error_code", LidarErrorCode), ("hub_error_code", HubErrorCode) ] # 定义 DeviceInfo 结构体 class DeviceInfo(Structure): _fields_ = [ ("broadcast_code", c_char * kBroadcastCodeSize), # 广播码,最大15个字符,带终止符 ("handle", c_uint8), # 设备句柄 ("slot", c_uint8), # 插槽编号 ("id", c_uint8), # LiDAR id ("type", DeviceType), # 设备类型 ("data_port", c_uint16), # 点云数据UDP端口 ("cmd_port", c_uint16), # 控制命令UDP端口 ("sensor_port", c_uint16), # IMU数据UDP端口 ("ip", c_char * 16), # IP地址 ("state", LidarState), # LiDAR状态 ("feature", LidarFeature), # LiDAR特性 ("status", StatusUnion), # LiDAR工作状态 ("firmware_version", c_uint8 * 4) # 固件版本 ] # 定义 BroadcastDeviceInfo 结构体 class BroadcastDeviceInfo(Structure): _fields_ = [ ("broadcast_code", c_char * kBroadcastCodeSize), # 广播码,最多15个字符,带终止符 ("dev_type", c_uint8), # 设备类型,参考 DeviceType ("reserved", c_uint16), # 保留字段 ("ip", c_char * 16) # 设备 IP 地址 ] # 定义 LivoxEthPacket 结构体 class LivoxEthPacket(Structure): _fields_ = [ ("version", c_uint8), # Packet protocol version. ("slot", c_uint8), # Slot number used for connecting LiDAR. ("id", c_uint8), # LiDAR id. ("rsvd", c_uint8), # Reserved. ("err_code", c_uint32), # Device error status indicator information. ("timestamp_type", c_uint8), # Timestamp type. ("data_type", c_uint8), # Point cloud coordinate format. ("timestamp", c_uint8 * 8), # Nanosecond or UTC format timestamp. ("data", c_uint8 * 1) # Point cloud data (can be extended as needed). ] # Extend cartesian coordinate format. class LivoxExtendRawPoint(Structure): _fields_ = [ ('x', c_int32), # X axis, Unit: mm ('y', c_int32), # Y axis, Unit: mm ('z', c_int32), # Z axis, Unit: mm ('reflectivity', c_uint8), # Reflectivity ('tag', c_uint8) # Tag ] # 回调函数类型定义 DataCallbackType = CFUNCTYPE(None, c_uint8, POINTER(c_int32), c_uint32) ErrorCallbackType = CFUNCTYPE(None, c_uint8, c_uint8, POINTER(ErrorMessage)) DeviceChangeCallbackType = CFUNCTYPE(None, POINTER(DeviceInfo), c_uint8) DeviceBroadcastCallbackType = CFUNCTYPE(None, POINTER(BroadcastDeviceInfo)) CommonCommandCallback = CFUNCTYPE(None, c_int, c_uint8, c_uint8) class LidarSignals(QObject): # QT 信号 on_device_broadcast = pyqtSignal(str) # 0 - 已连接,1 - 已断开 on_device_change = pyqtSignal(int) on_device_error = pyqtSignal(ErrorMessage) on_device_data = pyqtSignal(object) @device("lidar", auto_start=True) class LidarDevice(EdgeComponent): signals = LidarSignals() def __init__(self, context): super().__init__(context) self.livox_sdk = None self.data_callback = None self.error_callback = None self.device_change_callback = None self.device_broadcast_callback = None self.common_command_callback = None def configure(self, setting: Settings) -> None: self.init_livox_sdk() self.logger.info(f"Lidar configure done.") def init_livox_sdk(self): try: # 加载C++库 current_file_dir = os.path.dirname(os.path.abspath(__file__)) self.livox_sdk = ctypes.CDLL(os.path.join(current_file_dir, '../vendors/livox/liblivox_sdk_wrapper.so')) self.livox_sdk.connect.argtypes = [c_char_p] # 设置参数类型 self.livox_sdk.connect.restype = c_int # 设置返回值类型 # 设置回调函数 self.init_livox_callback() except Exception as e: self.logger.error(f"load livox sdk failed: {str(e)}") def init_livox_callback(self): # 数据回调函数 def data_callback(handle, points, data_num): # 将 ctypes 指针转换为 numpy 数组,点数 * 4 个 uint32 值 (x, y, z, reflectivity_and_tag) np_data = np.ctypeslib.as_array(points, shape=(data_num * 4,)) # 将 x, y, z 组成新的二维 numpy 数组 xyz = np_data.reshape(data_num, 4)[:, :3] # 提取前 3 个数值 (x, y, z) # 结果 self.signals.on_device_data.emit(xyz) # 通用指令回调函数 def common_command_callback(status, handle, response): logger.info(f"Common from handle {handle}, status: {status}, response: {response}") # 错误回调函数 def error_callback(status, handle, error_message): logger.error(f"Error from handle {handle}, status: {status}") # 设备状态变化回调函数 def device_change_callback(info, event_type): logger.info( f"Device {bytes(info.contents.broadcast_code).decode('utf-8')} changed state, event type: {event_type}") self.signals.on_device_change.emit(event_type) # 设备广播回调函数 def device_broadcast_callback(info): broadcast_code_bytes = bytes(info.contents.broadcast_code) broadcast_code = broadcast_code_bytes.decode('utf-8') logger.info(f"New device broadcast received: {broadcast_code}") if broadcast_code == settings.get("lidar.target.barcode", "3GGDLCM00201561"): logger.info(f"Connect device: {broadcast_code}") self.livox_sdk.connect(broadcast_code_bytes) self.signals.on_device_broadcast.emit(broadcast_code) self.data_callback = DataCallbackType(data_callback) self.error_callback = ErrorCallbackType(data_callback) self.device_change_callback = DeviceChangeCallbackType(device_change_callback) self.device_broadcast_callback = DeviceBroadcastCallbackType(device_broadcast_callback) self.common_command_callback = CommonCommandCallback(common_command_callback) # 注册回调函数 self.livox_sdk.register_data_callback(self.data_callback) self.livox_sdk.register_error_callback(self.error_callback) self.livox_sdk.register_device_change_callback(self.device_change_callback) self.livox_sdk.register_device_broadcast_callback(self.device_broadcast_callback) self.livox_sdk.register_common_command_callback(self.common_command_callback) @service() def start(self): if self.livox_sdk is None: raise RuntimeError("雷达设备SDK未正确加载") if self.livox_sdk.init_sdk(): self.logger.info("Livox SDK initialized.") if self.livox_sdk.start_discovery(): self.logger.info("Started discovering Livox devices.") else: self.logger.error("Failed to discover devices.") else: self.logger.error("Failed to initialize Livox SDK.") @service() def set_mode(self, mode): """ 设置雷达模式:1-normal,2-power saving """ self.logger.info(f'set mode: {mode}') return self.livox_sdk.set_mode(mode) @service() def stop(self): self.livox_sdk.stop_sdk() self.logger.info("Livox SDK stopped.") if __name__ == '__main__': pass