"""Minimal ctypes driver for the Livox SDK wrapper shared library.

Loads ``./liblivox_sdk_wrapper.so``, declares the C enums/structs the wrapper
exchanges with Python, registers four callbacks (point data, error, device
change, device broadcast), initializes the SDK, starts device discovery and
then idles until interrupted with Ctrl-C.
"""
import ctypes
import time
from ctypes import *

# Load the C++ wrapper library (path is relative to the working directory).
livox_sdk = ctypes.CDLL('./liblivox_sdk_wrapper.so')

# Predefined constants.
kBroadcastCodeSize = 16  # Length of the broadcast-code buffer, in bytes.


# --- Enum types -------------------------------------------------------------
# Each enum is modelled as a c_int subclass carrying its named values as
# class attributes.

class DeviceType(c_int):
    """Device type identifiers."""
    kDeviceTypeHub = 0
    kDeviceTypeLidarMid40 = 1
    kDeviceTypeLidarTele = 2
    kDeviceTypeLidarHorizon = 3
    kDeviceTypeLidarMid70 = 6
    kDeviceTypeLidarAvia = 7


class LidarState(c_int):
    """LiDAR state values."""
    kLidarStateInit = 0
    kLidarStateNormal = 1
    kLidarStatePowerSaving = 2
    kLidarStateStandBy = 3
    kLidarStateError = 4
    kLidarStateUnknown = 5


class LidarMode(c_int):
    """LiDAR mode values."""
    kLidarModeNormal = 1
    kLidarModePowerSaving = 2
    kLidarModeStandby = 3


class LidarFeature(c_int):
    """LiDAR feature values."""
    kLidarFeatureNone = 0
    kLidarFeatureRainFog = 1


class LidarIpMode(c_int):
    """LiDAR IP mode values."""
    kLidarDynamicIpMode = 0
    kLidarStaticIpMode = 1


class LidarScanPattern(c_int):
    """LiDAR scan pattern values."""
    kNoneRepetitiveScanPattern = 0
    kRepetitiveScanPattern = 1


class LivoxStatus(c_int):
    """Status codes; zero is success, negative values are failures."""
    kStatusSendFailed = -9
    kStatusHandlerImplNotExist = -8
    kStatusInvalidHandle = -7
    kStatusChannelNotExist = -6
    kStatusNotEnoughMemory = -5
    kStatusTimeout = -4
    kStatusNotSupported = -3
    kStatusNotConnected = -2
    kStatusFailure = -1
    kStatusSuccess = 0


# --- Structures and unions --------------------------------------------------

class LidarErrorCode(Structure):
    """LiDAR error word as bit-fields (the widths below sum to 32 bits)."""
    _fields_ = [
        ("temp_status", c_uint32, 2),
        ("volt_status", c_uint32, 2),
        ("motor_status", c_uint32, 2),
        ("dirty_warn", c_uint32, 2),
        ("firmware_err", c_uint32, 1),
        ("pps_status", c_uint32, 1),
        ("device_status", c_uint32, 1),
        ("fan_status", c_uint32, 1),
        ("self_heating", c_uint32, 1),
        ("ptp_status", c_uint32, 1),
        ("time_sync_status", c_uint32, 3),
        ("rsvd", c_uint32, 13),
        ("system_status", c_uint32, 2),
    ]


class HubErrorCode(Structure):
    """Hub error word as bit-fields (the widths below sum to 32 bits)."""
    _fields_ = [
        ("sync_status", c_uint32, 2),
        ("temp_status", c_uint32, 2),
        ("lidar_status", c_uint32, 1),
        ("lidar_link_status", c_uint32, 1),
        ("firmware_err", c_uint32, 1),
        ("rsvd", c_uint32, 23),
        ("system_status", c_uint32, 2),
    ]


class ErrorMessage(Union):
    """Error word viewable as a raw integer or as LiDAR/hub bit-fields."""
    _fields_ = [
        ("error_code", c_uint32),
        ("lidar_error_code", LidarErrorCode),
        ("hub_error_code", HubErrorCode),
    ]


class StatusUnion(Union):
    """Work-state union embedded in DeviceInfo."""
    # NOTE(review): the original author marked this layout as an assumed
    # example; confirm it against the wrapper's C header before relying on it.
    _fields_ = [
        ("error_code", c_uint32),
        ("lidar_error_code", LidarErrorCode),
        ("hub_error_code", HubErrorCode),
    ]


class DeviceInfo(Structure):
    """Information about a connected device."""
    # NOTE(review): field types/packing are unverified against the C header;
    # this script only reads broadcast_code, which sits at offset 0.
    _fields_ = [
        ("broadcast_code", c_char * kBroadcastCodeSize),  # Broadcast code: up to 15 chars plus terminator.
        ("handle", c_uint8),                # Device handle.
        ("slot", c_uint8),                  # Slot number.
        ("id", c_uint8),                    # LiDAR id.
        ("type", DeviceType),               # Device type.
        ("data_port", c_uint16),            # UDP port for point cloud data.
        ("cmd_port", c_uint16),             # UDP port for control commands.
        ("sensor_port", c_uint16),          # UDP port for IMU data.
        ("ip", c_char * 16),                # IP address.
        ("state", LidarState),              # LiDAR state.
        ("feature", LidarFeature),          # LiDAR feature.
        ("status", StatusUnion),            # LiDAR work status.
        ("firmware_version", c_uint8 * 4),  # Firmware version.
    ]


class BroadcastDeviceInfo(Structure):
    """Information carried by a device broadcast."""
    _fields_ = [
        ("broadcast_code", c_char * kBroadcastCodeSize),  # Broadcast code: up to 15 chars plus terminator.
        ("dev_type", c_uint8),   # Device type, see DeviceType.
        ("reserved", c_uint16),  # Reserved.
        ("ip", c_char * 16),     # Device IP address.
    ]


class LivoxEthPacket(Structure):
    """Header of a point cloud packet; the payload starts at ``data``."""
    # Byte-packed so the header matches the on-wire layout (18 bytes before
    # the payload). NOTE(review): confirm the wrapper forwards SDK packets
    # unchanged.
    _pack_ = 1
    _fields_ = [
        ("version", c_uint8),         # Packet protocol version.
        ("slot", c_uint8),            # Slot number used for connecting LiDAR.
        ("id", c_uint8),              # LiDAR id.
        ("rsvd", c_uint8),            # Reserved.
        ("err_code", c_uint32),       # Device error status indicator information.
        ("timestamp_type", c_uint8),  # Timestamp type.
        ("data_type", c_uint8),       # Point cloud coordinate format.
        ("timestamp", c_uint8 * 8),   # Nanosecond or UTC format timestamp.
        ("data", c_uint8 * 1),        # First byte of the variable-length point payload.
    ]


class LivoxExtendRawPoint(Structure):
    """One extended Cartesian point."""
    # Without byte packing ctypes pads this struct to 16 bytes, while points
    # are laid out back to back at 14 bytes each, so every point after the
    # first would be read from the wrong offset.
    _pack_ = 1
    _fields_ = [
        ('x', c_int32),             # X axis, unit: mm.
        ('y', c_int32),             # Y axis, unit: mm.
        ('z', c_int32),             # Z axis, unit: mm.
        ('reflectivity', c_uint8),  # Reflectivity.
        ('tag', c_uint8),           # Tag.
    ]


# --- Callback function types ------------------------------------------------
DataCallbackType = CFUNCTYPE(None, c_uint8, POINTER(LivoxEthPacket), c_uint32)
ErrorCallbackType = CFUNCTYPE(None, c_uint8, c_uint8, POINTER(ErrorMessage))
DeviceChangeCallbackType = CFUNCTYPE(None, POINTER(DeviceInfo), c_uint8)
DeviceBroadcastCallbackType = CFUNCTYPE(None, POINTER(BroadcastDeviceInfo))

# Size in bytes of a single point record.
point_size = sizeof(LivoxExtendRawPoint)


# The decorated callbacks below are module-level names, so the ctypes thunks
# stay referenced for as long as the native library may call them.

@DataCallbackType
def data_callback(handle, packet, data_num):
    """Print every point contained in one point cloud packet.

    Args:
        handle: Device handle the packet came from.
        packet: POINTER(LivoxEthPacket) to the received packet.
        data_num: Number of points in the packet payload.
    """
    # A NULL pointer is falsy in ctypes; nothing to read in that case.
    if not packet or data_num == 0:
        return

    # ctypes pointers do not forward attribute access, so the struct must be
    # reached through .contents; the payload begins at the `data` field.
    # NOTE(review): the payload is interpreted as LivoxExtendRawPoint without
    # checking data_type — confirm the device only emits that format.
    payload_address = addressof(packet.contents) + LivoxEthPacket.data.offset
    points = (LivoxExtendRawPoint * data_num).from_address(payload_address)

    for i, point in enumerate(points):
        print(f"Point {i}: X={point.x}, Y={point.y}, Z={point.z}, Reflectivity={point.reflectivity}, Tag={point.tag}")


@ErrorCallbackType
def error_callback(status, handle, error_message):
    """Report an error raised for a device handle."""
    print(f"Error from handle {handle}, status: {status}")


@DeviceChangeCallbackType
def device_change_callback(info, event_type):
    """Report a device state change, identified by its broadcast code."""
    if not info:
        return
    # errors="replace" keeps a malformed code from raising inside the callback.
    code = bytes(info.contents.broadcast_code).decode('utf-8', errors='replace')
    print(f"Device {code} changed state, event type: {event_type}")


@DeviceBroadcastCallbackType
def device_broadcast_callback(info):
    """Report a newly received device broadcast."""
    if not info:
        return
    code = bytes(info.contents.broadcast_code).decode('utf-8', errors='replace')
    print(f"New device broadcast received: {code}")


# Register the callbacks with the wrapper library.
livox_sdk.register_data_callback(data_callback)
livox_sdk.register_error_callback(error_callback)
livox_sdk.register_device_change_callback(device_change_callback)
livox_sdk.register_device_broadcast_callback(device_broadcast_callback)

# Initialize the SDK and start device discovery; a truthy return value is
# treated as success.
if livox_sdk.init_sdk():
    print("Livox SDK initialized.")
else:
    print("Failed to initialize Livox SDK.")

if livox_sdk.start_discovery():
    print("Started discovering Livox devices.")
else:
    print("Failed to discover devices.")

# Wait for callbacks. Sleeping instead of spinning keeps the main thread from
# burning a CPU core and from competing with callback threads for the
# interpreter.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    livox_sdk.stop_sdk()
    print("SDK stopped.")