calibration_tools_v1.0/lidar_driver/sdk_test.py
2025-02-20 10:45:17 +08:00

213 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ctypes
import time
from ctypes import *
# Load the C++ wrapper library; the "./" path is resolved against the current
# working directory, so run the script from the directory holding the .so.
livox_sdk = CDLL('./liblivox_sdk_wrapper.so')
# Predefined constants
kBroadcastCodeSize = 16  # length of the broadcast-code buffer, in bytes
# Enumeration types
class DeviceType(c_int):
    """Device type codes, hung on a ``c_int`` subclass.

    Subclassing ``c_int`` lets the class itself be used as a ctypes type,
    while the members are plain integer class attributes. Values 4 and 5 are
    not defined here.
    """

    kDeviceTypeHub = 0
    kDeviceTypeLidarMid40 = 1
    kDeviceTypeLidarTele = 2
    kDeviceTypeLidarHorizon = 3
    kDeviceTypeLidarMid70 = 6
    kDeviceTypeLidarAvia = 7
class LidarState(c_int):
    """LiDAR state codes (0-5), hung on a ``c_int`` subclass usable as a ctypes type."""

    kLidarStateInit = 0
    kLidarStateNormal = 1
    kLidarStatePowerSaving = 2
    kLidarStateStandBy = 3
    kLidarStateError = 4
    kLidarStateUnknown = 5
class LidarMode(c_int):
    """LiDAR mode codes, hung on a ``c_int`` subclass; numbering starts at 1."""

    kLidarModeNormal = 1
    kLidarModePowerSaving = 2
    kLidarModeStandby = 3
class LidarFeature(c_int):
    """LiDAR feature codes, hung on a ``c_int`` subclass usable as a ctypes type."""

    kLidarFeatureNone = 0
    kLidarFeatureRainFog = 1
class LidarIpMode(c_int):
    """IP mode codes (dynamic vs. static), hung on a ``c_int`` subclass."""

    kLidarDynamicIpMode = 0
    kLidarStaticIpMode = 1
class LidarScanPattern(c_int):
    """Scan pattern codes (non-repetitive vs. repetitive), hung on a ``c_int`` subclass."""

    kNoneRepetitiveScanPattern = 0
    kRepetitiveScanPattern = 1
class LivoxStatus(c_int):
    """Status codes, hung on a ``c_int`` subclass.

    ``kStatusSuccess`` is 0; every other member is a negative value in the
    range -1 .. -9.
    """

    kStatusSendFailed = -9
    kStatusHandlerImplNotExist = -8
    kStatusInvalidHandle = -7
    kStatusChannelNotExist = -6
    kStatusNotEnoughMemory = -5
    kStatusTimeout = -4
    kStatusNotSupported = -3
    kStatusNotConnected = -2
    kStatusFailure = -1
    kStatusSuccess = 0
# Structures and unions
class LidarErrorCode(Structure):
    """Bit-field view of a 32-bit LiDAR error word.

    All fields are declared on ``c_uint32`` and their widths add up to exactly
    32 bits, so the structure occupies a single 4-byte word.
    """

    _fields_ = [
        ("temp_status", c_uint32, 2),
        ("volt_status", c_uint32, 2),
        ("motor_status", c_uint32, 2),
        ("dirty_warn", c_uint32, 2),
        ("firmware_err", c_uint32, 1),
        ("pps_status", c_uint32, 1),
        ("device_status", c_uint32, 1),
        ("fan_status", c_uint32, 1),
        ("self_heating", c_uint32, 1),
        ("ptp_status", c_uint32, 1),
        ("time_sync_status", c_uint32, 3),
        ("rsvd", c_uint32, 13),  # reserved bits
        ("system_status", c_uint32, 2),
    ]
class HubErrorCode(Structure):
    """Bit-field view of a 32-bit hub error word.

    All fields are declared on ``c_uint32`` and their widths add up to exactly
    32 bits, so the structure occupies a single 4-byte word.
    """

    _fields_ = [
        ("sync_status", c_uint32, 2),
        ("temp_status", c_uint32, 2),
        ("lidar_status", c_uint32, 1),
        ("lidar_link_status", c_uint32, 1),
        ("firmware_err", c_uint32, 1),
        ("rsvd", c_uint32, 23),  # reserved bits
        ("system_status", c_uint32, 2),
    ]
class ErrorMessage(Union):
    """Union overlaying a raw 32-bit error code with its bit-field views.

    ``error_code`` exposes the whole word; ``lidar_error_code`` and
    ``hub_error_code`` reinterpret the same bytes as ``LidarErrorCode`` and
    ``HubErrorCode`` respectively.
    """

    _fields_ = [
        ("error_code", c_uint32),
        ("lidar_error_code", LidarErrorCode),
        ("hub_error_code", HubErrorCode),
    ]
class StatusUnion(Union):
    """Union holding a device's status word.

    The members overlay one another: a raw ``c_uint32`` plus the
    ``LidarErrorCode`` and ``HubErrorCode`` bit-field views of the same bytes.

    NOTE(review): the original author marked ``error_code`` as an assumed /
    example field. Only the overall size affects the layout of structures
    embedding this union; confirm the member names against the SDK header.
    """

    _fields_ = [
        ("error_code", c_uint32),  # assumed field, kept as an example
        ("lidar_error_code", LidarErrorCode),
        ("hub_error_code", HubErrorCode),
    ]
# DeviceInfo structure
class DeviceInfo(Structure):
    """Information about a connected LiDAR or hub, as filled in by the SDK.

    The layout is byte-packed (``_pack_ = 1``) and ``type`` is a single byte.
    NOTE(review): this follows the Livox SDK header (``livox_def.h``), where
    the struct is declared under ``#pragma pack(1)`` with ``uint8_t type``;
    confirm against the header the wrapper library was built with. With
    natural alignment and a 4-byte ``type`` every field after ``id`` would be
    read from the wrong offset.

    ``type`` therefore reads back as a plain ``int``; compare it with the
    ``DeviceType`` constants.
    """

    _pack_ = 1
    # ctypes only implements packing for the MSVC-compatible layout; naming it
    # explicitly avoids the Python 3.14+ deprecation warning on non-Windows
    # platforms. Older interpreters ignore this attribute.
    _layout_ = "ms"
    _fields_ = [
        ("broadcast_code", c_char * kBroadcastCodeSize),  # broadcast code: up to 15 characters plus terminator
        ("handle", c_uint8),  # device handle
        ("slot", c_uint8),  # slot number
        ("id", c_uint8),  # LiDAR id
        ("type", c_uint8),  # device type, see DeviceType
        ("data_port", c_uint16),  # point cloud data UDP port
        ("cmd_port", c_uint16),  # control command UDP port
        ("sensor_port", c_uint16),  # IMU data UDP port
        ("ip", c_char * 16),  # IP address
        ("state", LidarState),  # LiDAR state
        ("feature", LidarFeature),  # LiDAR feature
        ("status", StatusUnion),  # LiDAR working status
        ("firmware_version", c_uint8 * 4),  # firmware version
    ]
# BroadcastDeviceInfo structure
class BroadcastDeviceInfo(Structure):
    """Information carried by a device broadcast.

    The layout is byte-packed (``_pack_ = 1``).
    NOTE(review): this follows the Livox SDK header (``livox_def.h``), where
    the struct is declared under ``#pragma pack(1)``; confirm against the
    header the wrapper library was built with. With natural alignment
    ``reserved`` and ``ip`` would each be read one byte too late.
    """

    _pack_ = 1
    # ctypes only implements packing for the MSVC-compatible layout; naming it
    # explicitly avoids the Python 3.14+ deprecation warning on non-Windows
    # platforms. Older interpreters ignore this attribute.
    _layout_ = "ms"
    _fields_ = [
        ("broadcast_code", c_char * kBroadcastCodeSize),  # broadcast code: up to 15 characters plus terminator
        ("dev_type", c_uint8),  # device type, see DeviceType
        ("reserved", c_uint16),  # reserved field
        ("ip", c_char * 16),  # device IP address
    ]
# LivoxEthPacket structure
class LivoxEthPacket(Structure):
    """Header of a point cloud packet, followed by a variable-length payload.

    ``data`` is declared with a single element and only marks where the
    payload starts (byte offset 18); the real payload continues past the end
    of the structure.

    The layout is byte-packed (``_pack_ = 1``) so ``sizeof`` is 19 rather than
    the 20 produced by natural alignment; field offsets are the same either
    way. NOTE(review): packing follows the Livox SDK header
    (``#pragma pack(1)``); confirm against the wrapper's header.
    """

    _pack_ = 1
    # ctypes only implements packing for the MSVC-compatible layout; naming it
    # explicitly avoids the Python 3.14+ deprecation warning on non-Windows
    # platforms. Older interpreters ignore this attribute.
    _layout_ = "ms"
    _fields_ = [
        ("version", c_uint8),  # Packet protocol version.
        ("slot", c_uint8),  # Slot number used for connecting LiDAR.
        ("id", c_uint8),  # LiDAR id.
        ("rsvd", c_uint8),  # Reserved.
        ("err_code", c_uint32),  # Device error status indicator information.
        ("timestamp_type", c_uint8),  # Timestamp type.
        ("data_type", c_uint8),  # Point cloud coordinate format.
        ("timestamp", c_uint8 * 8),  # Nanosecond or UTC format timestamp.
        ("data", c_uint8 * 1),  # First byte of the point cloud payload.
    ]
# LivoxExtendRawPoint structure
class LivoxExtendRawPoint(Structure):
    """One extended Cartesian point: three ``int32`` coordinates plus two bytes.

    The structure is byte-packed (``_pack_ = 1``) so that it is 14 bytes long.
    With natural alignment ctypes pads it to 16 bytes, and indexing an array
    of points over a packet payload would then step with the wrong stride.
    NOTE(review): the 14-byte size follows the Livox SDK header
    (``#pragma pack(1)``); confirm against the wrapper's header.
    """

    _pack_ = 1
    # ctypes only implements packing for the MSVC-compatible layout; naming it
    # explicitly avoids the Python 3.14+ deprecation warning on non-Windows
    # platforms. Older interpreters ignore this attribute.
    _layout_ = "ms"
    _fields_ = [
        ("x", c_int32),  # X axis, Unit: mm
        ("y", c_int32),  # Y axis, Unit: mm
        ("z", c_int32),  # Z axis, Unit: mm
        ("reflectivity", c_uint8),  # Reflectivity
        ("tag", c_uint8),  # Tag
    ]
# Callback function prototypes. Every callback returns nothing (restype None).
# NOTE(review): argument types must match the wrapper library's C
# declarations — confirm against its header.

# (c_uint8, POINTER(LivoxEthPacket), c_uint32)
DataCallbackType = ctypes.CFUNCTYPE(
    None,
    ctypes.c_uint8,
    ctypes.POINTER(LivoxEthPacket),
    ctypes.c_uint32,
)
# (c_uint8, c_uint8, POINTER(ErrorMessage))
ErrorCallbackType = ctypes.CFUNCTYPE(
    None,
    ctypes.c_uint8,
    ctypes.c_uint8,
    ctypes.POINTER(ErrorMessage),
)
# (POINTER(DeviceInfo), c_uint8)
DeviceChangeCallbackType = ctypes.CFUNCTYPE(
    None,
    ctypes.POINTER(DeviceInfo),
    ctypes.c_uint8,
)
# (POINTER(BroadcastDeviceInfo),)
DeviceBroadcastCallbackType = ctypes.CFUNCTYPE(
    None,
    ctypes.POINTER(BroadcastDeviceInfo),
)
# Size in bytes of one LivoxExtendRawPoint as laid out by ctypes.
point_size = ctypes.sizeof(LivoxExtendRawPoint)
# Data callback
@DataCallbackType
def data_callback(handle, packet, data_num):
    """Print every point carried by one received packet.

    Args:
        handle: first callback argument (``c_uint8``); unused here.
        packet: ``POINTER(LivoxEthPacket)`` for the received packet.
        data_num: number of points to read from the packet payload.

    NOTE(review): the payload is always decoded as ``LivoxExtendRawPoint``
    records and ``packet.contents.data_type`` is not checked — confirm the
    device actually sends that format.
    """
    # A NULL pointer is falsy in ctypes; there is nothing to decode then.
    if not packet:
        return
    # `packet` is a pointer object, so the structure's fields are reached via
    # `.contents`; reading `packet.data` directly raises AttributeError.
    payload = packet.contents.data
    # Reinterpret the payload bytes as an array of `data_num` points.
    points_type = LivoxExtendRawPoint * data_num
    points = cast(payload, POINTER(points_type)).contents
    for i, point in enumerate(points):
        print(f"Point {i}: X={point.x}, Y={point.y}, Z={point.z}, Reflectivity={point.reflectivity}, Tag={point.tag}")
# Error callback
@ErrorCallbackType
def error_callback(status, handle, error_message):
    """Report an error notification by printing its handle and status.

    Args:
        status: first callback argument (``c_uint8`` in ``ErrorCallbackType``).
        handle: second callback argument (``c_uint8``).
        error_message: ``POINTER(ErrorMessage)``; not inspected here.
    """
    print(f"Error from handle {handle}, status: {status}")
# Device state change callback
@DeviceChangeCallbackType
def device_change_callback(info, event_type):
    """Print the broadcast code of the device whose state changed.

    Args:
        info: ``POINTER(DeviceInfo)`` describing the device.
        event_type: second callback argument (``c_uint8``), printed as-is.
    """
    broadcast_code = bytes(info.contents.broadcast_code).decode('utf-8')
    print(f"Device {broadcast_code} changed state, event type: {event_type}")
# Device broadcast callback
@DeviceBroadcastCallbackType
def device_broadcast_callback(info):
    """Print the broadcast code announced by a device broadcast.

    Args:
        info: ``POINTER(BroadcastDeviceInfo)`` for the broadcasting device.
    """
    broadcast_code = bytes(info.contents.broadcast_code).decode('utf-8')
    print(f"New device broadcast received: {broadcast_code}")
# Register the callbacks. The decorated functions are module-level objects,
# so they stay referenced for as long as the library may call them.
livox_sdk.register_data_callback(data_callback)
livox_sdk.register_error_callback(error_callback)
livox_sdk.register_device_change_callback(device_change_callback)
livox_sdk.register_device_broadcast_callback(device_broadcast_callback)

# Initialise the SDK, then start device discovery. A truthy return value is
# treated as success.
# NOTE(review): discovery is still attempted after a failed init, as in the
# original flow — consider exiting instead.
if livox_sdk.init_sdk():
    print("Livox SDK initialized.")
else:
    print("Failed to initialize Livox SDK.")

if livox_sdk.start_discovery():
    print("Started discovering Livox devices.")
else:
    print("Failed to discover devices.")

# Wait for callbacks until Ctrl-C. Sleeping instead of spinning on `pass`
# keeps the main thread from pinning a CPU core and from holding the GIL,
# which ctypes callbacks must acquire before they can run.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    livox_sdk.stop_sdk()
    print("SDK stopped.")