import os from ctypes import * import numpy as np from PyQt5.QtCore import QObject, pyqtSignal from dynaconf.base import Settings from numpy.ctypeslib import as_array import platform import time from core.edge_component import action, EdgeComponent, service from util import get_system_and_library_suffix # 给回调函数使用 class _SubRoiData_(Structure): _fields_ = [ ('mPparentId', c_long), ('mId', c_long), ('mInfo', c_char * 256), ('isGood', c_bool), ('vpOffset', c_float * 2), ('mRoiPoints', c_float * 8), # 4 * 2 ('mInnerConners', c_float * 8), # 4 * 2 ('mInnerConners3D', c_float * 12), # 4 * 3 ('edges', c_float * 4), ('center', c_float * 2), ('cloud_size', c_int), ('mVc2D', POINTER(c_float)), ('mVc3D', POINTER(c_float)), ('H', c_float * 9), ] SubRoiData = _SubRoiData_ class _CallbackInfo_(Structure): _fields_ = [ ('code', c_int), ('errorInfo', c_char * 256), ('bGetData', c_bool), ('mId', c_long), ('mInfo', c_char * 256), ('mImPath', c_char * 256), ('mIm2D3DPtr', c_char * 256), ('roi_size', c_int), ('subRois', SubRoiData * 16), ] CallbackInfo = _CallbackInfo_ # 回调函数类型定义 CallbackType = CFUNCTYPE(None, POINTER(CallbackInfo)) class ImageFrameworkSignals(QObject): # QT 信号 on_image_result = pyqtSignal(object) @action("image-framework", auto_start=True) class ImageFramework(EdgeComponent): signals = ImageFrameworkSignals() def __init__(self, context): super().__init__(context) self.image_framework_sdk = None self.data_callback = None def configure(self, setting: Settings) -> None: self.init_image_framework_sdk() self.logger.info(f"Image framework configure done.") def init_image_framework_sdk(self): try: # 加载C++库 current_file_dir = os.path.dirname(os.path.abspath(__file__)) (arch, suffix) = get_system_and_library_suffix() self.image_framework_sdk = CDLL(os.path.join(current_file_dir, f'../vendors/image-framework/{arch}/image_framework.{suffix}')) # 设置回调函数 self.init_callback() except Exception as e: self.logger.error(f"Load Image framework sdk failed: {str(e)}") def init_callback(self): def _callback(data): contents = data.contents self.logger.info(f"image framework callback data, bGetData:{contents.bGetData}") if contents.bGetData: record = CallbackInfo() self.image_framework_sdk.LibapGetRecord(byref(record)) self.logger.info(f"image framework callback data, roi_size:{record.roi_size}") self.logger.info(f"image framework callback data, code:{record.code}") self.logger.info(f"image framework callback data, errorInfo:{record.errorInfo}") for i in range(record.roi_size): roi_data = record.subRois[i] self.logger.info(f"roi data, roi_data[{i}].mId:{roi_data.mId}") self.logger.info(f"roi data, roi_data[{i}].isGood:{roi_data.isGood}") self.logger.info(f"roi data, roi_data[{i}].mRoiPoints:{list(roi_data.mRoiPoints)}") self.logger.info("image framework callback done.") self.data_callback = CallbackType(_callback) @service() def start(self): super().start() if self.image_framework_sdk is None: raise RuntimeError("Image framework SDK未正确加载!") ret = self.image_framework_sdk.LibapiInit(1, "", self.data_callback) if ret != 0: raise RuntimeError("Image framework 初始化失败!") self.logger.info("Image framework started.") @service() def stop(self): self.image_framework_sdk.stop_sdk() self.logger.info("Image framework stopped.") @service() def start_detect(self): ret = self.image_framework_sdk.LibapiStartDetection() if ret != 0: raise RuntimeError("启动检测失败!") @service() def continue_detect(self): ret = self.image_framework_sdk.LibapiContinuetDetection() if ret != 0: raise RuntimeError("继续检测失败!") @service() def stop_detect(self): ret = self.image_framework_sdk.LibapStopDetection() if ret != 0: raise RuntimeError("停止检测失败!")