2024-11-21 11:39:52 +08:00
|
|
|
|
import os
|
|
|
|
|
import ctypes
|
|
|
|
|
from ctypes import *
|
|
|
|
|
import numpy as np
|
|
|
|
|
from dynaconf.base import Settings
|
|
|
|
|
|
|
|
|
|
from core.edge_component import device, EdgeComponent, service
|
|
|
|
|
from core.logging import logger
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
from PyQt5.QtCore import QObject, pyqtSignal
|
|
|
|
|
|
|
|
|
|
from core.config import settings
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 定义 livox_status 的返回值类型
|
|
|
|
|
livox_status = ctypes.c_int
|
|
|
|
|
# 预定义的常量
|
|
|
|
|
kBroadcastCodeSize = 16 # 广播码长度
|
|
|
|
|
|
|
|
|
|
# 定义枚举类型
|
|
|
|
|
class DeviceType(c_int):
|
|
|
|
|
kDeviceTypeHub = 0
|
|
|
|
|
kDeviceTypeLidarMid40 = 1
|
|
|
|
|
kDeviceTypeLidarTele = 2
|
|
|
|
|
kDeviceTypeLidarHorizon = 3
|
|
|
|
|
kDeviceTypeLidarMid70 = 6
|
|
|
|
|
kDeviceTypeLidarAvia = 7
|
|
|
|
|
|
|
|
|
|
class LidarState(c_int):
|
|
|
|
|
kLidarStateInit = 0
|
|
|
|
|
kLidarStateNormal = 1
|
|
|
|
|
kLidarStatePowerSaving = 2
|
|
|
|
|
kLidarStateStandBy = 3
|
|
|
|
|
kLidarStateError = 4
|
|
|
|
|
kLidarStateUnknown = 5
|
|
|
|
|
|
|
|
|
|
class LidarMode(c_int):
|
|
|
|
|
kLidarModeNormal = 1
|
|
|
|
|
kLidarModePowerSaving = 2
|
|
|
|
|
kLidarModeStandby = 3
|
|
|
|
|
|
|
|
|
|
class LidarFeature(c_int):
|
|
|
|
|
kLidarFeatureNone = 0
|
|
|
|
|
kLidarFeatureRainFog = 1
|
|
|
|
|
|
|
|
|
|
class LidarIpMode(c_int):
|
|
|
|
|
kLidarDynamicIpMode = 0
|
|
|
|
|
kLidarStaticIpMode = 1
|
|
|
|
|
|
|
|
|
|
class LidarScanPattern(c_int):
|
|
|
|
|
kNoneRepetitiveScanPattern = 0
|
|
|
|
|
kRepetitiveScanPattern = 1
|
|
|
|
|
|
|
|
|
|
class LivoxStatus(c_int):
|
|
|
|
|
kStatusSendFailed = -9
|
|
|
|
|
kStatusHandlerImplNotExist = -8
|
|
|
|
|
kStatusInvalidHandle = -7
|
|
|
|
|
kStatusChannelNotExist = -6
|
|
|
|
|
kStatusNotEnoughMemory = -5
|
|
|
|
|
kStatusTimeout = -4
|
|
|
|
|
kStatusNotSupported = -3
|
|
|
|
|
kStatusNotConnected = -2
|
|
|
|
|
kStatusFailure = -1
|
|
|
|
|
kStatusSuccess = 0
|
|
|
|
|
|
|
|
|
|
# 定义结构体和联合体
|
|
|
|
|
class LidarErrorCode(Structure):
|
|
|
|
|
_fields_ = [
|
|
|
|
|
("temp_status", c_uint32, 2),
|
|
|
|
|
("volt_status", c_uint32, 2),
|
|
|
|
|
("motor_status", c_uint32, 2),
|
|
|
|
|
("dirty_warn", c_uint32, 2),
|
|
|
|
|
("firmware_err", c_uint32, 1),
|
|
|
|
|
("pps_status", c_uint32, 1),
|
|
|
|
|
("device_status", c_uint32, 1),
|
|
|
|
|
("fan_status", c_uint32, 1),
|
|
|
|
|
("self_heating", c_uint32, 1),
|
|
|
|
|
("ptp_status", c_uint32, 1),
|
|
|
|
|
("time_sync_status", c_uint32, 3),
|
|
|
|
|
("rsvd", c_uint32, 13),
|
|
|
|
|
("system_status", c_uint32, 2),
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
class HubErrorCode(Structure):
|
|
|
|
|
_fields_ = [
|
|
|
|
|
("sync_status", c_uint32, 2),
|
|
|
|
|
("temp_status", c_uint32, 2),
|
|
|
|
|
("lidar_status", c_uint32, 1),
|
|
|
|
|
("lidar_link_status", c_uint32, 1),
|
|
|
|
|
("firmware_err", c_uint32, 1),
|
|
|
|
|
("rsvd", c_uint32, 23),
|
|
|
|
|
("system_status", c_uint32, 2),
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
class ErrorMessage(Union):
|
|
|
|
|
_fields_ = [
|
|
|
|
|
("error_code", c_uint32),
|
|
|
|
|
("lidar_error_code", LidarErrorCode),
|
|
|
|
|
("hub_error_code", HubErrorCode)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
class StatusUnion(Union):
|
|
|
|
|
_fields_ = [
|
|
|
|
|
("error_code", c_uint32), # 假设这个字段为示例
|
|
|
|
|
("lidar_error_code", LidarErrorCode),
|
|
|
|
|
("hub_error_code", HubErrorCode)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# 定义 DeviceInfo 结构体
|
|
|
|
|
class DeviceInfo(Structure):
|
|
|
|
|
_fields_ = [
|
|
|
|
|
("broadcast_code", c_char * kBroadcastCodeSize), # 广播码,最大15个字符,带终止符
|
|
|
|
|
("handle", c_uint8), # 设备句柄
|
|
|
|
|
("slot", c_uint8), # 插槽编号
|
|
|
|
|
("id", c_uint8), # LiDAR id
|
|
|
|
|
("type", DeviceType), # 设备类型
|
|
|
|
|
("data_port", c_uint16), # 点云数据UDP端口
|
|
|
|
|
("cmd_port", c_uint16), # 控制命令UDP端口
|
|
|
|
|
("sensor_port", c_uint16), # IMU数据UDP端口
|
|
|
|
|
("ip", c_char * 16), # IP地址
|
|
|
|
|
("state", LidarState), # LiDAR状态
|
|
|
|
|
("feature", LidarFeature), # LiDAR特性
|
|
|
|
|
("status", StatusUnion), # LiDAR工作状态
|
|
|
|
|
("firmware_version", c_uint8 * 4) # 固件版本
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# 定义 BroadcastDeviceInfo 结构体
|
|
|
|
|
class BroadcastDeviceInfo(Structure):
|
|
|
|
|
_fields_ = [
|
|
|
|
|
("broadcast_code", c_char * kBroadcastCodeSize), # 广播码,最多15个字符,带终止符
|
|
|
|
|
("dev_type", c_uint8), # 设备类型,参考 DeviceType
|
|
|
|
|
("reserved", c_uint16), # 保留字段
|
|
|
|
|
("ip", c_char * 16) # 设备 IP 地址
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# 定义 LivoxEthPacket 结构体
|
|
|
|
|
class LivoxEthPacket(Structure):
|
|
|
|
|
_fields_ = [
|
|
|
|
|
("version", c_uint8), # Packet protocol version.
|
|
|
|
|
("slot", c_uint8), # Slot number used for connecting LiDAR.
|
|
|
|
|
("id", c_uint8), # LiDAR id.
|
|
|
|
|
("rsvd", c_uint8), # Reserved.
|
|
|
|
|
("err_code", c_uint32), # Device error status indicator information.
|
|
|
|
|
("timestamp_type", c_uint8), # Timestamp type.
|
|
|
|
|
("data_type", c_uint8), # Point cloud coordinate format.
|
|
|
|
|
("timestamp", c_uint8 * 8), # Nanosecond or UTC format timestamp.
|
|
|
|
|
("data", c_uint8 * 1) # Point cloud data (can be extended as needed).
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# Extend cartesian coordinate format.
|
|
|
|
|
class LivoxExtendRawPoint(Structure):
|
|
|
|
|
_fields_ = [
|
|
|
|
|
('x', c_int32), # X axis, Unit: mm
|
|
|
|
|
('y', c_int32), # Y axis, Unit: mm
|
|
|
|
|
('z', c_int32), # Z axis, Unit: mm
|
|
|
|
|
('reflectivity', c_uint8), # Reflectivity
|
|
|
|
|
('tag', c_uint8) # Tag
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# 回调函数类型定义
|
|
|
|
|
DataCallbackType = CFUNCTYPE(None, c_uint8, POINTER(c_int32), c_uint32)
|
|
|
|
|
ErrorCallbackType = CFUNCTYPE(None, c_uint8, c_uint8, POINTER(ErrorMessage))
|
|
|
|
|
DeviceChangeCallbackType = CFUNCTYPE(None, POINTER(DeviceInfo), c_uint8)
|
|
|
|
|
DeviceBroadcastCallbackType = CFUNCTYPE(None, POINTER(BroadcastDeviceInfo))
|
|
|
|
|
CommonCommandCallback = CFUNCTYPE(None, c_int, c_uint8, c_uint8)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LidarSignals(QObject):
|
|
|
|
|
# QT 信号
|
|
|
|
|
on_device_broadcast = pyqtSignal(str)
|
|
|
|
|
# 0 - 已连接,1 - 已断开
|
|
|
|
|
on_device_change = pyqtSignal(int)
|
|
|
|
|
on_device_error = pyqtSignal(ErrorMessage)
|
|
|
|
|
on_device_data = pyqtSignal(object)
|
|
|
|
|
|
|
|
|
|
|
2024-11-21 16:46:11 +08:00
|
|
|
|
@device("lidar", auto_start=False)
|
2024-11-21 11:39:52 +08:00
|
|
|
|
class LidarDevice(EdgeComponent):
|
|
|
|
|
signals = LidarSignals()
|
|
|
|
|
|
|
|
|
|
def __init__(self, context):
|
|
|
|
|
super().__init__(context)
|
|
|
|
|
self.livox_sdk = None
|
|
|
|
|
self.data_callback = None
|
|
|
|
|
self.error_callback = None
|
|
|
|
|
self.device_change_callback = None
|
|
|
|
|
self.device_broadcast_callback = None
|
|
|
|
|
self.common_command_callback = None
|
|
|
|
|
|
|
|
|
|
def configure(self, setting: Settings) -> None:
|
|
|
|
|
self.init_livox_sdk()
|
|
|
|
|
self.logger.info(f"Lidar configure done.")
|
|
|
|
|
|
|
|
|
|
def init_livox_sdk(self):
|
|
|
|
|
try:
|
|
|
|
|
# 加载C++库
|
|
|
|
|
current_file_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
self.livox_sdk = ctypes.CDLL(os.path.join(current_file_dir, '../vendors/livox/liblivox_sdk_wrapper.so'))
|
|
|
|
|
self.livox_sdk.connect.argtypes = [c_char_p] # 设置参数类型
|
|
|
|
|
self.livox_sdk.connect.restype = c_int # 设置返回值类型
|
|
|
|
|
# 设置回调函数
|
|
|
|
|
self.init_livox_callback()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
self.logger.error(f"load livox sdk failed: {str(e)}")
|
|
|
|
|
|
|
|
|
|
def init_livox_callback(self):
|
|
|
|
|
# 数据回调函数
|
|
|
|
|
def data_callback(handle, points, data_num):
|
|
|
|
|
# 将 ctypes 指针转换为 numpy 数组,点数 * 4 个 uint32 值 (x, y, z, reflectivity_and_tag)
|
|
|
|
|
np_data = np.ctypeslib.as_array(points, shape=(data_num * 4,))
|
|
|
|
|
# 将 x, y, z 组成新的二维 numpy 数组
|
|
|
|
|
xyz = np_data.reshape(data_num, 4)[:, :3] # 提取前 3 个数值 (x, y, z)
|
|
|
|
|
# 结果
|
|
|
|
|
self.signals.on_device_data.emit(xyz)
|
|
|
|
|
|
|
|
|
|
# 通用指令回调函数
|
|
|
|
|
def common_command_callback(status, handle, response):
|
|
|
|
|
logger.info(f"Common from handle {handle}, status: {status}, response: {response}")
|
|
|
|
|
|
|
|
|
|
# 错误回调函数
|
|
|
|
|
def error_callback(status, handle, error_message):
|
|
|
|
|
logger.error(f"Error from handle {handle}, status: {status}")
|
|
|
|
|
|
|
|
|
|
# 设备状态变化回调函数
|
|
|
|
|
def device_change_callback(info, event_type):
|
|
|
|
|
logger.info(
|
|
|
|
|
f"Device {bytes(info.contents.broadcast_code).decode('utf-8')} changed state, event type: {event_type}")
|
|
|
|
|
self.signals.on_device_change.emit(event_type)
|
|
|
|
|
|
|
|
|
|
# 设备广播回调函数
|
|
|
|
|
def device_broadcast_callback(info):
|
|
|
|
|
broadcast_code_bytes = bytes(info.contents.broadcast_code)
|
|
|
|
|
broadcast_code = broadcast_code_bytes.decode('utf-8')
|
|
|
|
|
logger.info(f"New device broadcast received: {broadcast_code}")
|
|
|
|
|
if broadcast_code == settings.get("lidar.target.barcode", "3GGDLCM00201561"):
|
|
|
|
|
logger.info(f"Connect device: {broadcast_code}")
|
|
|
|
|
self.livox_sdk.connect(broadcast_code_bytes)
|
|
|
|
|
self.signals.on_device_broadcast.emit(broadcast_code)
|
|
|
|
|
|
|
|
|
|
self.data_callback = DataCallbackType(data_callback)
|
|
|
|
|
self.error_callback = ErrorCallbackType(data_callback)
|
|
|
|
|
self.device_change_callback = DeviceChangeCallbackType(device_change_callback)
|
|
|
|
|
self.device_broadcast_callback = DeviceBroadcastCallbackType(device_broadcast_callback)
|
|
|
|
|
self.common_command_callback = CommonCommandCallback(common_command_callback)
|
|
|
|
|
# 注册回调函数
|
|
|
|
|
self.livox_sdk.register_data_callback(self.data_callback)
|
|
|
|
|
self.livox_sdk.register_error_callback(self.error_callback)
|
|
|
|
|
self.livox_sdk.register_device_change_callback(self.device_change_callback)
|
|
|
|
|
self.livox_sdk.register_device_broadcast_callback(self.device_broadcast_callback)
|
|
|
|
|
self.livox_sdk.register_common_command_callback(self.common_command_callback)
|
|
|
|
|
|
|
|
|
|
@service()
|
|
|
|
|
def start(self):
|
|
|
|
|
if self.livox_sdk is None:
|
|
|
|
|
raise RuntimeError("雷达设备SDK未正确加载")
|
|
|
|
|
|
|
|
|
|
if self.livox_sdk.init_sdk():
|
|
|
|
|
self.logger.info("Livox SDK initialized.")
|
|
|
|
|
if self.livox_sdk.start_discovery():
|
|
|
|
|
self.logger.info("Started discovering Livox devices.")
|
|
|
|
|
else:
|
|
|
|
|
self.logger.error("Failed to discover devices.")
|
|
|
|
|
else:
|
|
|
|
|
self.logger.error("Failed to initialize Livox SDK.")
|
|
|
|
|
|
|
|
|
|
@service()
|
|
|
|
|
def set_mode(self, mode):
|
|
|
|
|
"""
|
|
|
|
|
设置雷达模式:1-normal,2-power saving
|
|
|
|
|
"""
|
|
|
|
|
self.logger.info(f'set mode: {mode}')
|
|
|
|
|
return self.livox_sdk.set_mode(mode)
|
|
|
|
|
|
|
|
|
|
@service()
|
|
|
|
|
def stop(self):
|
|
|
|
|
self.livox_sdk.stop_sdk()
|
|
|
|
|
self.logger.info("Livox SDK stopped.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
pass
|