mirror of
http://git.xinwangdao.com/cnnc-embedded-parts-detect/detect-gui.git
synced 2025-06-24 13:14:11 +08:00
213 lines
7.4 KiB
Python
213 lines
7.4 KiB
Python
|
import ctypes
|
|||
|
from ctypes import *
|
|||
|
|
|||
|
# Load the C++ wrapper library.
# The path is relative, so the process is expected to be started from the
# directory that contains liblivox_sdk_wrapper.so.
livox_sdk = ctypes.CDLL('./liblivox_sdk_wrapper.so')

# Predefined constants.
kBroadcastCodeSize = 16  # Length of the broadcast-code buffer, in bytes.
|
|||
|
# Enumeration types.
class DeviceType(c_int):
    """Device type codes.

    Subclasses ``c_int`` so the class itself can be used as a ctypes field
    type; the named codes are plain integer class attributes.
    """

    # Hub.
    kDeviceTypeHub = 0

    # LiDAR models.
    kDeviceTypeLidarMid40 = 1
    kDeviceTypeLidarTele = 2
    kDeviceTypeLidarHorizon = 3
    kDeviceTypeLidarMid70 = 6
    kDeviceTypeLidarAvia = 7
|
|||
|
|
|||
|
class LidarState(c_int):
    """LiDAR state codes, stored as a ``c_int``."""

    kLidarStateInit = 0
    kLidarStateNormal = 1
    kLidarStatePowerSaving = 2
    kLidarStateStandBy = 3
    kLidarStateError = 4
    kLidarStateUnknown = 5
|
|||
|
|
|||
|
class LidarMode(c_int):
    """LiDAR mode codes, stored as a ``c_int``."""

    kLidarModeNormal = 1
    kLidarModePowerSaving = 2
    kLidarModeStandby = 3
|
|||
|
|
|||
|
class LidarFeature(c_int):
    """LiDAR feature codes, stored as a ``c_int``."""

    kLidarFeatureNone = 0
    kLidarFeatureRainFog = 1
|
|||
|
|
|||
|
class LidarIpMode(c_int):
    """LiDAR IP mode codes (dynamic or static), stored as a ``c_int``."""

    kLidarDynamicIpMode = 0
    kLidarStaticIpMode = 1
|
|||
|
|
|||
|
class LidarScanPattern(c_int):
    """LiDAR scan pattern codes, stored as a ``c_int``."""

    kNoneRepetitiveScanPattern = 0
    kRepetitiveScanPattern = 1
|
|||
|
|
|||
|
class LivoxStatus(c_int):
    """Status codes, stored as a ``c_int``.

    Zero is success; every failure code is negative.
    """

    kStatusSuccess = 0

    # Failure codes.
    kStatusFailure = -1
    kStatusNotConnected = -2
    kStatusNotSupported = -3
    kStatusTimeout = -4
    kStatusNotEnoughMemory = -5
    kStatusChannelNotExist = -6
    kStatusInvalidHandle = -7
    kStatusHandlerImplNotExist = -8
    kStatusSendFailed = -9
|
|||
|
|
|||
|
# Structures and unions.
class LidarErrorCode(Structure):
    """LiDAR error code expressed as bit-fields of one 32-bit word.

    Each entry below is ``(field name, width in bits)``; the widths add up
    to 32 and every field is carried in a ``c_uint32`` storage unit.
    """

    _fields_ = [
        (field_name, c_uint32, bit_width)
        for field_name, bit_width in (
            ("temp_status", 2),
            ("volt_status", 2),
            ("motor_status", 2),
            ("dirty_warn", 2),
            ("firmware_err", 1),
            ("pps_status", 1),
            ("device_status", 1),
            ("fan_status", 1),
            ("self_heating", 1),
            ("ptp_status", 1),
            ("time_sync_status", 3),
            ("rsvd", 13),
            ("system_status", 2),
        )
    ]
|
|||
|
|
|||
|
class HubErrorCode(Structure):
    """Hub error code expressed as bit-fields of one 32-bit word.

    Each entry below is ``(field name, width in bits)``; the widths add up
    to 32 and every field is carried in a ``c_uint32`` storage unit.
    """

    _fields_ = [
        (field_name, c_uint32, bit_width)
        for field_name, bit_width in (
            ("sync_status", 2),
            ("temp_status", 2),
            ("lidar_status", 1),
            ("lidar_link_status", 1),
            ("firmware_err", 1),
            ("rsvd", 23),
            ("system_status", 2),
        )
    ]
|
|||
|
|
|||
|
class ErrorMessage(Union):
    """One 32-bit error word viewable three ways.

    All members share the same storage: the raw integer, the LiDAR
    bit-field layout, and the hub bit-field layout.
    """

    _fields_ = [
        # Raw 32-bit value.
        ("error_code", c_uint32),
        # Same bits decoded as a LiDAR error code.
        ("lidar_error_code", LidarErrorCode),
        # Same bits decoded as a hub error code.
        ("hub_error_code", HubErrorCode),
    ]
|
|||
|
|
|||
|
class StatusUnion(Union):
    """Status word of a device, declared with the same members as ErrorMessage.

    All members share the same storage.
    """

    _fields_ = [
        # Raw 32-bit value (the original author marked this field as an
        # assumed/example member).
        ("error_code", c_uint32),
        # Same bits decoded as a LiDAR error code.
        ("lidar_error_code", LidarErrorCode),
        # Same bits decoded as a hub error code.
        ("hub_error_code", HubErrorCode),
    ]
|
|||
|
|
|||
|
# DeviceInfo structure.
class DeviceInfo(Structure):
    """Description of a device as handed to the device-change callback.

    NOTE(review): ``type`` is declared as ``DeviceType`` (a 4-byte c_int).
    The upstream Livox SDK header is believed to declare it as ``uint8_t``;
    if so, every field after ``id`` is read at the wrong offset. Confirm
    against the wrapper's C header before relying on those fields.
    """

    _fields_ = [
        # Broadcast code: up to 15 characters plus the terminator.
        ("broadcast_code", c_char * kBroadcastCodeSize),
        # Device handle.
        ("handle", c_uint8),
        # Slot number.
        ("slot", c_uint8),
        # LiDAR id.
        ("id", c_uint8),
        # Device type.
        ("type", DeviceType),
        # UDP port for point cloud data.
        ("data_port", c_uint16),
        # UDP port for control commands.
        ("cmd_port", c_uint16),
        # UDP port for IMU data.
        ("sensor_port", c_uint16),
        # IP address.
        ("ip", c_char * 16),
        # LiDAR state.
        ("state", LidarState),
        # LiDAR feature.
        ("feature", LidarFeature),
        # LiDAR work status.
        ("status", StatusUnion),
        # Firmware version.
        ("firmware_version", c_uint8 * 4),
    ]
|
|||
|
|
|||
|
# BroadcastDeviceInfo structure.
class BroadcastDeviceInfo(Structure):
    """Description of a device announced by broadcast.

    NOTE(review): no ``_pack_`` is set, so ctypes inserts one padding byte
    before ``reserved``; confirm this matches the C declaration used by the
    wrapper library.
    """

    _fields_ = [
        # Broadcast code: up to 15 characters plus the terminator.
        ("broadcast_code", c_char * kBroadcastCodeSize),
        # Device type, see DeviceType.
        ("dev_type", c_uint8),
        # Reserved.
        ("reserved", c_uint16),
        # Device IP address.
        ("ip", c_char * 16),
    ]
|
|||
|
|
|||
|
# LivoxEthPacket structure.
class LivoxEthPacket(Structure):
    """Header of a point cloud packet followed by its payload.

    The C declaration is byte-packed, so ``_pack_ = 1`` is set to keep
    ``sizeof`` at 19 bytes (without it ctypes pads the struct to 20).
    ``data`` is declared with length 1 only as a marker for where the
    variable-length payload starts; read the payload through a cast of that
    address rather than by indexing ``data`` directly.
    """

    _pack_ = 1  # Must precede _fields_; matches the byte-packed C layout.
    _fields_ = [
        ("version", c_uint8),         # Packet protocol version.
        ("slot", c_uint8),            # Slot number used for connecting LiDAR.
        ("id", c_uint8),              # LiDAR id.
        ("rsvd", c_uint8),            # Reserved.
        ("err_code", c_uint32),       # Device error status indicator information.
        ("timestamp_type", c_uint8),  # Timestamp type.
        ("data_type", c_uint8),       # Point cloud coordinate format.
        ("timestamp", c_uint8 * 8),   # Nanosecond or UTC format timestamp.
        ("data", c_uint8 * 1),        # First byte of the point cloud payload.
    ]
|
|||
|
|
|||
|
# LivoxExtendRawPoint structure.
class LivoxExtendRawPoint(Structure):
    """One extended Cartesian point as laid out in the packet payload.

    The C declaration is byte-packed (14 bytes per point). Without
    ``_pack_ = 1`` ctypes pads the struct to 16 bytes, so every point after
    the first in a payload array would be read at the wrong offset.
    """

    _pack_ = 1  # Must precede _fields_; matches the byte-packed C layout.
    _fields_ = [
        ('x', c_int32),             # X axis, Unit: mm
        ('y', c_int32),             # Y axis, Unit: mm
        ('z', c_int32),             # Z axis, Unit: mm
        ('reflectivity', c_uint8),  # Reflectivity
        ('tag', c_uint8),           # Tag
    ]
|
|||
|
|
|||
|
# Callback function prototypes (all return nothing).

# (handle, packet pointer, number of points)
DataCallbackType = CFUNCTYPE(
    None,
    c_uint8,
    POINTER(LivoxEthPacket),
    c_uint32,
)

# (status, handle, error message pointer)
# NOTE(review): status is declared c_uint8 although the LivoxStatus codes in
# this file are negative ints - confirm against the wrapper's C signature.
ErrorCallbackType = CFUNCTYPE(
    None,
    c_uint8,
    c_uint8,
    POINTER(ErrorMessage),
)

# (device info pointer, event type)
DeviceChangeCallbackType = CFUNCTYPE(
    None,
    POINTER(DeviceInfo),
    c_uint8,
)

# (broadcast device info pointer)
DeviceBroadcastCallbackType = CFUNCTYPE(
    None,
    POINTER(BroadcastDeviceInfo),
)

# Size in bytes of one LivoxExtendRawPoint as ctypes lays it out.
point_size = sizeof(LivoxExtendRawPoint)
|
|||
|
|
|||
|
# Data callback.
@DataCallbackType
def data_callback(handle, packet, data_num):
    """Print every point carried by one incoming packet.

    Args:
        handle: Device handle the packet came from (unused here).
        packet: ctypes pointer to a LivoxEthPacket.
        data_num: Number of points in the packet payload.

    ``packet`` is a pointer object, so the structure fields are only
    reachable through ``packet.contents``; reading ``packet.data`` directly
    raises AttributeError inside the callback.

    Assumes the payload holds LivoxExtendRawPoint records - TODO confirm by
    checking ``data_type`` against the SDK's data-type codes.
    """
    # A NULL pointer is falsy in ctypes; nothing to do for an empty packet.
    if not packet or data_num == 0:
        return

    # Reinterpret the start of the payload as an array of points. Indexing
    # the resulting pointer walks the payload in sizeof(point) steps.
    points = cast(packet.contents.data, POINTER(LivoxExtendRawPoint))

    for i in range(data_num):
        point = points[i]
        print(f"Point {i}: X={point.x}, Y={point.y}, Z={point.z}, Reflectivity={point.reflectivity}, Tag={point.tag}")
|
|||
|
|
|||
|
# Error callback.
@ErrorCallbackType
def error_callback(status, handle, error_message):
    """Print the handle and status of a reported error.

    ``error_message`` is accepted to satisfy the callback prototype but is
    not inspected.
    """
    report = f"Error from handle {handle}, status: {status}"
    print(report)
|
|||
|
|
|||
|
# Device state change callback.
@DeviceChangeCallbackType
def device_change_callback(info, event_type):
    """Print the broadcast code of the device that changed and the event type.

    ``info`` is a pointer to a DeviceInfo; its broadcast code is decoded as
    UTF-8 for display.
    """
    device = info.contents
    code_text = bytes(device.broadcast_code).decode('utf-8')
    print(f"Device {code_text} changed state, event type: {event_type}")
|
|||
|
|
|||
|
# Device broadcast callback.
@DeviceBroadcastCallbackType
def device_broadcast_callback(info):
    """Print the broadcast code of a newly announced device.

    ``info`` is a pointer to a BroadcastDeviceInfo; its broadcast code is
    decoded as UTF-8 for display.
    """
    announced = info.contents
    code_text = bytes(announced.broadcast_code).decode('utf-8')
    print(f"New device broadcast received: {code_text}")
|
|||
|
|
|||
|
# Register the callbacks with the wrapper library, in this order.
for _register_name, _callback in (
    ("register_data_callback", data_callback),
    ("register_error_callback", error_callback),
    ("register_device_change_callback", device_change_callback),
    ("register_device_broadcast_callback", device_broadcast_callback),
):
    # Look up each exported function only when it is about to be called.
    getattr(livox_sdk, _register_name)(_callback)
del _register_name, _callback
|
|||
|
|
|||
|
# Initialise the SDK and, only if that succeeded, start device discovery.
# Discovery is skipped after a failed initialisation instead of being
# attempted against an SDK that reported it is not ready.
# NOTE(review): no restype is declared, so ctypes treats both return values
# as C int - confirm this matches the wrapper's signatures.
if livox_sdk.init_sdk():
    print("Livox SDK initialized.")

    if livox_sdk.start_discovery():
        print("Started discovering Livox devices.")
    else:
        print("Failed to discover devices.")
else:
    print("Failed to initialize Livox SDK.")
|
|||
|
|
|||
|
# Keep the process alive while the callbacks run; Ctrl+C stops the SDK.
import time  # Imported here because only this wait loop needs it.

try:
    while True:
        # Sleep instead of spinning: an empty loop pins a CPU core and keeps
        # contending for the interpreter lock that the callbacks also need.
        time.sleep(1.0)
except KeyboardInterrupt:
    livox_sdk.stop_sdk()
    print("SDK stopped.")
|