detect-gui/components/lidar.py
2024-11-21 16:46:11 +08:00

283 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import ctypes
from ctypes import *
import numpy as np
from dynaconf.base import Settings
from core.edge_component import device, EdgeComponent, service
from core.logging import logger
import time
from PyQt5.QtCore import QObject, pyqtSignal
from core.config import settings
# 定义 livox_status 的返回值类型
livox_status = ctypes.c_int
# 预定义的常量
kBroadcastCodeSize = 16 # 广播码长度
# 定义枚举类型
class DeviceType(c_int):
kDeviceTypeHub = 0
kDeviceTypeLidarMid40 = 1
kDeviceTypeLidarTele = 2
kDeviceTypeLidarHorizon = 3
kDeviceTypeLidarMid70 = 6
kDeviceTypeLidarAvia = 7
class LidarState(c_int):
kLidarStateInit = 0
kLidarStateNormal = 1
kLidarStatePowerSaving = 2
kLidarStateStandBy = 3
kLidarStateError = 4
kLidarStateUnknown = 5
class LidarMode(c_int):
kLidarModeNormal = 1
kLidarModePowerSaving = 2
kLidarModeStandby = 3
class LidarFeature(c_int):
kLidarFeatureNone = 0
kLidarFeatureRainFog = 1
class LidarIpMode(c_int):
kLidarDynamicIpMode = 0
kLidarStaticIpMode = 1
class LidarScanPattern(c_int):
kNoneRepetitiveScanPattern = 0
kRepetitiveScanPattern = 1
class LivoxStatus(c_int):
kStatusSendFailed = -9
kStatusHandlerImplNotExist = -8
kStatusInvalidHandle = -7
kStatusChannelNotExist = -6
kStatusNotEnoughMemory = -5
kStatusTimeout = -4
kStatusNotSupported = -3
kStatusNotConnected = -2
kStatusFailure = -1
kStatusSuccess = 0
# 定义结构体和联合体
class LidarErrorCode(Structure):
_fields_ = [
("temp_status", c_uint32, 2),
("volt_status", c_uint32, 2),
("motor_status", c_uint32, 2),
("dirty_warn", c_uint32, 2),
("firmware_err", c_uint32, 1),
("pps_status", c_uint32, 1),
("device_status", c_uint32, 1),
("fan_status", c_uint32, 1),
("self_heating", c_uint32, 1),
("ptp_status", c_uint32, 1),
("time_sync_status", c_uint32, 3),
("rsvd", c_uint32, 13),
("system_status", c_uint32, 2),
]
class HubErrorCode(Structure):
_fields_ = [
("sync_status", c_uint32, 2),
("temp_status", c_uint32, 2),
("lidar_status", c_uint32, 1),
("lidar_link_status", c_uint32, 1),
("firmware_err", c_uint32, 1),
("rsvd", c_uint32, 23),
("system_status", c_uint32, 2),
]
class ErrorMessage(Union):
_fields_ = [
("error_code", c_uint32),
("lidar_error_code", LidarErrorCode),
("hub_error_code", HubErrorCode)
]
class StatusUnion(Union):
_fields_ = [
("error_code", c_uint32), # 假设这个字段为示例
("lidar_error_code", LidarErrorCode),
("hub_error_code", HubErrorCode)
]
# 定义 DeviceInfo 结构体
class DeviceInfo(Structure):
_fields_ = [
("broadcast_code", c_char * kBroadcastCodeSize), # 广播码最大15个字符带终止符
("handle", c_uint8), # 设备句柄
("slot", c_uint8), # 插槽编号
("id", c_uint8), # LiDAR id
("type", DeviceType), # 设备类型
("data_port", c_uint16), # 点云数据UDP端口
("cmd_port", c_uint16), # 控制命令UDP端口
("sensor_port", c_uint16), # IMU数据UDP端口
("ip", c_char * 16), # IP地址
("state", LidarState), # LiDAR状态
("feature", LidarFeature), # LiDAR特性
("status", StatusUnion), # LiDAR工作状态
("firmware_version", c_uint8 * 4) # 固件版本
]
# 定义 BroadcastDeviceInfo 结构体
class BroadcastDeviceInfo(Structure):
_fields_ = [
("broadcast_code", c_char * kBroadcastCodeSize), # 广播码最多15个字符带终止符
("dev_type", c_uint8), # 设备类型,参考 DeviceType
("reserved", c_uint16), # 保留字段
("ip", c_char * 16) # 设备 IP 地址
]
# 定义 LivoxEthPacket 结构体
class LivoxEthPacket(Structure):
_fields_ = [
("version", c_uint8), # Packet protocol version.
("slot", c_uint8), # Slot number used for connecting LiDAR.
("id", c_uint8), # LiDAR id.
("rsvd", c_uint8), # Reserved.
("err_code", c_uint32), # Device error status indicator information.
("timestamp_type", c_uint8), # Timestamp type.
("data_type", c_uint8), # Point cloud coordinate format.
("timestamp", c_uint8 * 8), # Nanosecond or UTC format timestamp.
("data", c_uint8 * 1) # Point cloud data (can be extended as needed).
]
# Extend cartesian coordinate format.
class LivoxExtendRawPoint(Structure):
_fields_ = [
('x', c_int32), # X axis, Unit: mm
('y', c_int32), # Y axis, Unit: mm
('z', c_int32), # Z axis, Unit: mm
('reflectivity', c_uint8), # Reflectivity
('tag', c_uint8) # Tag
]
# 回调函数类型定义
DataCallbackType = CFUNCTYPE(None, c_uint8, POINTER(c_int32), c_uint32)
ErrorCallbackType = CFUNCTYPE(None, c_uint8, c_uint8, POINTER(ErrorMessage))
DeviceChangeCallbackType = CFUNCTYPE(None, POINTER(DeviceInfo), c_uint8)
DeviceBroadcastCallbackType = CFUNCTYPE(None, POINTER(BroadcastDeviceInfo))
CommonCommandCallback = CFUNCTYPE(None, c_int, c_uint8, c_uint8)
class LidarSignals(QObject):
# QT 信号
on_device_broadcast = pyqtSignal(str)
# 0 - 已连接1 - 已断开
on_device_change = pyqtSignal(int)
on_device_error = pyqtSignal(ErrorMessage)
on_device_data = pyqtSignal(object)
@device("lidar", auto_start=False)
class LidarDevice(EdgeComponent):
signals = LidarSignals()
def __init__(self, context):
super().__init__(context)
self.livox_sdk = None
self.data_callback = None
self.error_callback = None
self.device_change_callback = None
self.device_broadcast_callback = None
self.common_command_callback = None
def configure(self, setting: Settings) -> None:
self.init_livox_sdk()
self.logger.info(f"Lidar configure done.")
def init_livox_sdk(self):
try:
# 加载C++库
current_file_dir = os.path.dirname(os.path.abspath(__file__))
self.livox_sdk = ctypes.CDLL(os.path.join(current_file_dir, '../vendors/livox/liblivox_sdk_wrapper.so'))
self.livox_sdk.connect.argtypes = [c_char_p] # 设置参数类型
self.livox_sdk.connect.restype = c_int # 设置返回值类型
# 设置回调函数
self.init_livox_callback()
except Exception as e:
self.logger.error(f"load livox sdk failed: {str(e)}")
def init_livox_callback(self):
# 数据回调函数
def data_callback(handle, points, data_num):
# 将 ctypes 指针转换为 numpy 数组,点数 * 4 个 uint32 值 (x, y, z, reflectivity_and_tag)
np_data = np.ctypeslib.as_array(points, shape=(data_num * 4,))
# 将 x, y, z 组成新的二维 numpy 数组
xyz = np_data.reshape(data_num, 4)[:, :3] # 提取前 3 个数值 (x, y, z)
# 结果
self.signals.on_device_data.emit(xyz)
# 通用指令回调函数
def common_command_callback(status, handle, response):
logger.info(f"Common from handle {handle}, status: {status}, response: {response}")
# 错误回调函数
def error_callback(status, handle, error_message):
logger.error(f"Error from handle {handle}, status: {status}")
# 设备状态变化回调函数
def device_change_callback(info, event_type):
logger.info(
f"Device {bytes(info.contents.broadcast_code).decode('utf-8')} changed state, event type: {event_type}")
self.signals.on_device_change.emit(event_type)
# 设备广播回调函数
def device_broadcast_callback(info):
broadcast_code_bytes = bytes(info.contents.broadcast_code)
broadcast_code = broadcast_code_bytes.decode('utf-8')
logger.info(f"New device broadcast received: {broadcast_code}")
if broadcast_code == settings.get("lidar.target.barcode", "3GGDLCM00201561"):
logger.info(f"Connect device: {broadcast_code}")
self.livox_sdk.connect(broadcast_code_bytes)
self.signals.on_device_broadcast.emit(broadcast_code)
self.data_callback = DataCallbackType(data_callback)
self.error_callback = ErrorCallbackType(data_callback)
self.device_change_callback = DeviceChangeCallbackType(device_change_callback)
self.device_broadcast_callback = DeviceBroadcastCallbackType(device_broadcast_callback)
self.common_command_callback = CommonCommandCallback(common_command_callback)
# 注册回调函数
self.livox_sdk.register_data_callback(self.data_callback)
self.livox_sdk.register_error_callback(self.error_callback)
self.livox_sdk.register_device_change_callback(self.device_change_callback)
self.livox_sdk.register_device_broadcast_callback(self.device_broadcast_callback)
self.livox_sdk.register_common_command_callback(self.common_command_callback)
@service()
def start(self):
if self.livox_sdk is None:
raise RuntimeError("雷达设备SDK未正确加载")
if self.livox_sdk.init_sdk():
self.logger.info("Livox SDK initialized.")
if self.livox_sdk.start_discovery():
self.logger.info("Started discovering Livox devices.")
else:
self.logger.error("Failed to discover devices.")
else:
self.logger.error("Failed to initialize Livox SDK.")
@service()
def set_mode(self, mode):
"""
设置雷达模式1-normal2-power saving
"""
self.logger.info(f'set mode: {mode}')
return self.livox_sdk.set_mode(mode)
@service()
def stop(self):
self.livox_sdk.stop_sdk()
self.logger.info("Livox SDK stopped.")
if __name__ == '__main__':
pass