# calibration_tools_v1.0/lidar_driver/sdk_test.py
# Repository viewer metadata: 213 lines, 7.4 KiB, Python; last changed 2025-02-20 10:45:17 +08:00.
import ctypes
from ctypes import *
# Load the C++ wrapper shared library; the './' path is resolved against the
# current working directory, so the script must be started from the directory
# that contains the .so file.
livox_sdk = ctypes.CDLL('./liblivox_sdk_wrapper.so')
# Predefined constants.
kBroadcastCodeSize = 16 # Size of the broadcast-code buffer.
# Enumeration types.
class DeviceType(c_int):
    """Device type codes, represented as a C ``int``.

    The members are plain integer class attributes; the class itself can be
    used wherever a ctypes type is expected.
    """

    kDeviceTypeHub = 0
    kDeviceTypeLidarMid40 = 1
    kDeviceTypeLidarTele = 2
    kDeviceTypeLidarHorizon = 3
    # Codes 4 and 5 are not defined here.
    kDeviceTypeLidarMid70 = 6
    kDeviceTypeLidarAvia = 7
class LidarState(c_int):
    """LiDAR state codes, represented as a C ``int``."""

    kLidarStateInit = 0
    kLidarStateNormal = 1
    kLidarStatePowerSaving = 2
    kLidarStateStandBy = 3
    kLidarStateError = 4
    kLidarStateUnknown = 5
class LidarMode(c_int):
    """LiDAR working-mode codes, represented as a C ``int``."""

    kLidarModeNormal = 1
    kLidarModePowerSaving = 2
    kLidarModeStandby = 3
class LidarFeature(c_int):
    """LiDAR feature codes, represented as a C ``int``."""

    kLidarFeatureNone = 0
    kLidarFeatureRainFog = 1
class LidarIpMode(c_int):
    """LiDAR IP configuration modes, represented as a C ``int``."""

    kLidarDynamicIpMode = 0
    kLidarStaticIpMode = 1
class LidarScanPattern(c_int):
    """LiDAR scan-pattern codes, represented as a C ``int``."""

    kNoneRepetitiveScanPattern = 0
    kRepetitiveScanPattern = 1
class LivoxStatus(c_int):
    """Status codes, represented as a C ``int``.

    Zero is success; every failure code is negative.
    """

    kStatusSendFailed = -9
    kStatusHandlerImplNotExist = -8
    kStatusInvalidHandle = -7
    kStatusChannelNotExist = -6
    kStatusNotEnoughMemory = -5
    kStatusTimeout = -4
    kStatusNotSupported = -3
    kStatusNotConnected = -2
    kStatusFailure = -1
    kStatusSuccess = 0
# Structures and unions.
class LidarErrorCode(Structure):
    """LiDAR error code: thirteen bit fields packed into one 32-bit word.

    The field widths add up to exactly 32 bits, so the structure occupies a
    single ``c_uint32``.
    """

    _fields_ = [
        ("temp_status", c_uint32, 2),
        ("volt_status", c_uint32, 2),
        ("motor_status", c_uint32, 2),
        ("dirty_warn", c_uint32, 2),
        ("firmware_err", c_uint32, 1),
        ("pps_status", c_uint32, 1),
        ("device_status", c_uint32, 1),
        ("fan_status", c_uint32, 1),
        ("self_heating", c_uint32, 1),
        ("ptp_status", c_uint32, 1),
        ("time_sync_status", c_uint32, 3),
        ("rsvd", c_uint32, 13),  # reserved bits
        ("system_status", c_uint32, 2),
    ]
class HubErrorCode(Structure):
    """Hub error code: seven bit fields packed into one 32-bit word.

    The field widths add up to exactly 32 bits, so the structure occupies a
    single ``c_uint32``.
    """

    _fields_ = [
        ("sync_status", c_uint32, 2),
        ("temp_status", c_uint32, 2),
        ("lidar_status", c_uint32, 1),
        ("lidar_link_status", c_uint32, 1),
        ("firmware_err", c_uint32, 1),
        ("rsvd", c_uint32, 23),  # reserved bits
        ("system_status", c_uint32, 2),
    ]
class ErrorMessage(Union):
    """Device error word, viewable raw or decoded.

    All three members overlay the same 32 bits: ``error_code`` is the raw
    value, while ``lidar_error_code`` and ``hub_error_code`` expose it as the
    LiDAR or hub bit-field layout.
    """

    _fields_ = [
        ("error_code", c_uint32),
        ("lidar_error_code", LidarErrorCode),
        ("hub_error_code", HubErrorCode),
    ]
class StatusUnion(Union):
    """Work-state word embedded in ``DeviceInfo.status``.

    NOTE(review): the members are identical to ``ErrorMessage``, and the
    original author marked the first one as an assumed example. Confirm the
    real member names against the SDK header before relying on them; the
    4-byte size is what matters for the layout of ``DeviceInfo``.
    """

    _fields_ = [
        ("error_code", c_uint32),  # placeholder member (assumed, see note above)
        ("lidar_error_code", LidarErrorCode),
        ("hub_error_code", HubErrorCode),
    ]
# DeviceInfo structure.
class DeviceInfo(Structure):
    """Description of a device, as passed to the device-change callback.

    Layout notes:
      * ``type`` is a single byte holding a ``DeviceType`` code. Declaring it
        as the 4-byte ``DeviceType`` class would push every following field
        to the wrong offset.
      * The structure is byte-packed to match the ``#pragma pack(1)`` region
        of the SDK header.
        NOTE(review): confirm both points against the ``livox_def.h`` the
        wrapper library was compiled with.
    """

    _pack_ = 1
    # Explicit layout name for Python 3.14+, where relying on ``_pack_`` alone
    # is deprecated on non-Windows platforms; ignored by older versions.
    _layout_ = "ms"
    _fields_ = [
        ("broadcast_code", c_char * kBroadcastCodeSize),  # broadcast code, at most 15 characters plus terminator
        ("handle", c_uint8),                  # device handle
        ("slot", c_uint8),                    # slot number
        ("id", c_uint8),                      # LiDAR id
        ("type", c_uint8),                    # device type code, see DeviceType
        ("data_port", c_uint16),              # point cloud data UDP port
        ("cmd_port", c_uint16),               # control command UDP port
        ("sensor_port", c_uint16),            # IMU data UDP port
        ("ip", c_char * 16),                  # IP address
        ("state", LidarState),                # LiDAR state
        ("feature", LidarFeature),            # LiDAR feature
        ("status", StatusUnion),              # LiDAR work-state status
        ("firmware_version", c_uint8 * 4),    # firmware version
    ]
# BroadcastDeviceInfo structure.
class BroadcastDeviceInfo(Structure):
    """Description of a device announced by broadcast.

    The structure is byte-packed so that ``reserved`` directly follows the
    one-byte ``dev_type`` (offset 17) instead of being aligned to offset 18.
    NOTE(review): this matches the ``#pragma pack(1)`` region of the SDK
    header; confirm against the ``livox_def.h`` the wrapper was compiled with.
    """

    _pack_ = 1
    # Explicit layout name for Python 3.14+, where relying on ``_pack_`` alone
    # is deprecated on non-Windows platforms; ignored by older versions.
    _layout_ = "ms"
    _fields_ = [
        ("broadcast_code", c_char * kBroadcastCodeSize),  # broadcast code, at most 15 characters plus terminator
        ("dev_type", c_uint8),    # device type code, see DeviceType
        ("reserved", c_uint16),   # reserved
        ("ip", c_char * 16),      # device IP address
    ]
# LivoxEthPacket structure.
class LivoxEthPacket(Structure):
    """Header of one point-cloud packet followed by its payload.

    ``data`` is declared with a single element only to mark where the
    variable-length payload starts (byte offset 18); the real payload extends
    past the end of this structure.

    The structure is byte-packed for consistency with the other wire-format
    structures. Packing leaves every field offset unchanged and only removes
    the trailing padding byte.
    """

    _pack_ = 1
    # Explicit layout name for Python 3.14+, where relying on ``_pack_`` alone
    # is deprecated on non-Windows platforms; ignored by older versions.
    _layout_ = "ms"
    _fields_ = [
        ("version", c_uint8),         # packet protocol version
        ("slot", c_uint8),            # slot number used for connecting LiDAR
        ("id", c_uint8),              # LiDAR id
        ("rsvd", c_uint8),            # reserved
        ("err_code", c_uint32),       # device error status indicator
        ("timestamp_type", c_uint8),  # timestamp type
        ("data_type", c_uint8),       # point cloud coordinate format
        ("timestamp", c_uint8 * 8),   # nanosecond or UTC format timestamp
        ("data", c_uint8 * 1),        # first byte of the point cloud payload
    ]
# LivoxExtendRawPoint structure.
class LivoxExtendRawPoint(Structure):
    """One extended Cartesian point: three 32-bit coordinates plus two bytes.

    The structure is byte-packed so that it is exactly 14 bytes. Without
    packing ctypes pads it to 16 bytes, and indexing an array of points then
    reads every point after the first from the wrong offset.
    """

    _pack_ = 1
    # Explicit layout name for Python 3.14+, where relying on ``_pack_`` alone
    # is deprecated on non-Windows platforms; ignored by older versions.
    _layout_ = "ms"
    _fields_ = [
        ("x", c_int32),             # X axis, unit: mm
        ("y", c_int32),             # Y axis, unit: mm
        ("z", c_int32),             # Z axis, unit: mm
        ("reflectivity", c_uint8),  # reflectivity
        ("tag", c_uint8),           # tag
    ]
# Callback function type definitions (all return void).
# NOTE(review): these signatures must match the function-pointer types the
# wrapper library expects; the wrapper's header is not visible here.
# Data callback: (handle, packet pointer, point count).
DataCallbackType = CFUNCTYPE(None, c_uint8, POINTER(LivoxEthPacket), c_uint32)
# Error callback: (status, handle, error message pointer).
# NOTE(review): status is declared as c_uint8 although the LivoxStatus codes
# are negative — confirm the wrapper really passes an unsigned byte.
ErrorCallbackType = CFUNCTYPE(None, c_uint8, c_uint8, POINTER(ErrorMessage))
# Device-change callback: (device info pointer, event type).
DeviceChangeCallbackType = CFUNCTYPE(None, POINTER(DeviceInfo), c_uint8)
# Device-broadcast callback: (broadcast device info pointer).
DeviceBroadcastCallbackType = CFUNCTYPE(None, POINTER(BroadcastDeviceInfo))
# Size in bytes of one LivoxExtendRawPoint as laid out by ctypes.
point_size = sizeof(LivoxExtendRawPoint)
# Point-cloud data callback.
@DataCallbackType
def data_callback(handle, packet, data_num):
    """Print every point carried by one point-cloud packet.

    Args:
        handle: Handle of the device the packet came from.
        packet: ``POINTER(LivoxEthPacket)`` to the received packet.
        data_num: Number of points in the packet payload.

    NOTE(review): the payload is always decoded as ``LivoxExtendRawPoint``;
    ``packet.contents.data_type`` is not checked. Confirm the wrapper only
    forwards packets in that format.
    """
    # A NULL ctypes pointer is falsy; dereferencing it would raise.
    if not packet or data_num == 0:
        return
    # ``packet`` is a pointer object, so the structure fields live on
    # ``packet.contents``. ``data`` marks the first payload byte; reinterpret
    # that address as a sequence of points.
    points = cast(packet.contents.data, POINTER(LivoxExtendRawPoint))
    for i in range(data_num):
        point = points[i]
        print(f"Point {i}: X={point.x}, Y={point.y}, Z={point.z}, Reflectivity={point.reflectivity}, Tag={point.tag}")
# Error callback.
@ErrorCallbackType
def error_callback(status, handle, error_message):
    """Print the handle and status of an error notification.

    Args:
        status: Status value passed by the library.
        handle: Handle of the device that reported the error.
        error_message: ``POINTER(ErrorMessage)`` with the error word; it is
            not inspected here.
    """
    print(f"Error from handle {handle}, status: {status}")
# Device state-change callback.
@DeviceChangeCallbackType
def device_change_callback(info, event_type):
    """Print the broadcast code of a device whose state changed.

    Args:
        info: ``POINTER(DeviceInfo)`` describing the device.
        event_type: Event code passed by the library.
    """
    # Undecodable bytes are replaced rather than raising inside the callback,
    # where the exception would only be printed and the message lost.
    broadcast_code = bytes(info.contents.broadcast_code).decode('utf-8', errors='replace')
    print(f"Device {broadcast_code} changed state, event type: {event_type}")
# Device broadcast callback.
@DeviceBroadcastCallbackType
def device_broadcast_callback(info):
    """Print the broadcast code of a newly announced device.

    Args:
        info: ``POINTER(BroadcastDeviceInfo)`` describing the device.
    """
    # Undecodable bytes are replaced rather than raising inside the callback,
    # where the exception would only be printed and the message lost.
    broadcast_code = bytes(info.contents.broadcast_code).decode('utf-8', errors='replace')
    print(f"New device broadcast received: {broadcast_code}")
import time

# Register the callbacks with the wrapper library. The callback objects are
# module-level names, so they stay alive for as long as the library may call
# them.
livox_sdk.register_data_callback(data_callback)
livox_sdk.register_error_callback(error_callback)
livox_sdk.register_device_change_callback(device_change_callback)
livox_sdk.register_device_broadcast_callback(device_broadcast_callback)

# Initialise the SDK; there is nothing useful to do if this fails.
# NOTE(review): no restype is set, so the return values below are read as a
# C int — confirm that matches the wrapper's declarations.
if not livox_sdk.init_sdk():
    print("Failed to initialize Livox SDK.")
    raise SystemExit(1)
print("Livox SDK initialized.")

try:
    # Start device discovery.
    if not livox_sdk.start_discovery():
        print("Failed to discover devices.")
        raise SystemExit(1)
    print("Started discovering Livox devices.")
    # Wait for callbacks. Sleeping instead of spinning keeps the CPU idle and
    # leaves the interpreter free to run callbacks from library threads.
    while True:
        time.sleep(0.1)
except KeyboardInterrupt:
    pass
finally:
    # Always shut the SDK down once it has been initialised.
    livox_sdk.stop_sdk()
    print("SDK stopped.")