detect-gui/components/image_framework.py
2024-11-25 16:35:23 +08:00

434 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import threading
from ctypes import *
import numpy as np
from PyQt5.QtCore import QObject, pyqtSignal
from dynaconf.base import Settings
from numpy.ctypeslib import as_array
import platform
import time
import cv2
from core.edge_component import action, EdgeComponent, service
from util import get_system_and_library_suffix, imgProcess
# 给回调函数使用
class _SubRoiData_(Structure):
_fields_ = [
('mPparentId', c_long),
('mId', c_long),
('mInfo', c_char * 256),
('isGood', c_bool),
('vpOffset', c_float * 2),
('mRoiPoints', c_float * 8), # 4 * 2
('mInnerConners', c_float * 8), # 4 * 2
('mInnerConners3D', c_float * 12), # 4 * 3
('edges', c_float * 4),
('center', c_float * 2),
('cloud_size', c_int),
('mVc2D', POINTER(c_float)),
('mVc3D', POINTER(c_float)),
('H', c_float * 9),
]
SubRoiData = _SubRoiData_
class _CallbackInfo_(Structure):
_fields_ = [
('code', c_int),
('errorInfo', c_char * 256),
('bGetData', c_bool),
('mId', c_long),
('mInfo', c_char * 256),
('mImPath', c_char * 256),
('mIm2D3DPtr', c_char * 256),
('roi_size', c_int),
('subRois', SubRoiData * 16),
]
CallbackInfo = _CallbackInfo_
# 回调函数类型定义
CallbackType = CFUNCTYPE(None, POINTER(CallbackInfo))
# emit的结构体
MSG_ONLY_RECORD = -1
MSG_LOCATION_RECORD = 0
MSG_DETECTION_RECORD = 1
class Msg:
def __init__(self, msg_type, record, im2D):
self.msg_type = msg_type # 0刷新广角图片 1刷新高清 -1不刷新图片纯消息
self.record = record
self.im2D = im2D
TO_CLIENT_NOTIFY_BASE = 1000
SYS_BASE = 1000
CAL_2D3D_BASE = 2000
CAMERA_BASE = 4000
LIDAR_BASE = 5000
SYS_OK = 0
CAL_OK = 0
LIDAR_OK = 0
MSG_LIST = [
{1 + TO_CLIENT_NOTIFY_BASE: "开始相机拍摄"},
{2 + TO_CLIENT_NOTIFY_BASE: "相机拍摄已完成"},
{3 + TO_CLIENT_NOTIFY_BASE: "开始雷达采集数据"},
{4 + TO_CLIENT_NOTIFY_BASE: "雷达采集数据已完成"},
{5 + TO_CLIENT_NOTIFY_BASE: "开始定位检测"}, # 这个消息是相机实时定位,不用打印
{6 + TO_CLIENT_NOTIFY_BASE: "定位检测已完成"},
{7 + TO_CLIENT_NOTIFY_BASE: "开始角点2D检测"},
{8 + TO_CLIENT_NOTIFY_BASE: "角点2D检测已完成"},
{9 + TO_CLIENT_NOTIFY_BASE: "开始角点3D检测"},
{10 + TO_CLIENT_NOTIFY_BASE: "角点3D检测已完成"},
{11 + TO_CLIENT_NOTIFY_BASE: "开始计算预埋件尺寸"},
{12 + TO_CLIENT_NOTIFY_BASE: "预埋件尺寸计算已完成"},
{1 - SYS_BASE: "驱动初始化错误"},
{2 - SYS_BASE: "NPU计算单元初始化错误"},
{3 - SYS_BASE: "系统错误"},
{4 - SYS_BASE: "相机初始化错误"},
{5 - SYS_BASE: "当前没有可以读取的高清相片"},
{1 - CAL_2D3D_BASE: "读取图片文件错误"},
{2 - CAL_2D3D_BASE: "写图片文件错误"},
{3 - CAL_2D3D_BASE: "空指针"},
{4 - CAL_2D3D_BASE: "空参数"},
{5 - CAL_2D3D_BASE: "读取图像错误"},
{6 - CAL_2D3D_BASE: "NPU加载数据错误"},
{7 - CAL_2D3D_BASE: "NPU模型执行错误"},
{8 - CAL_2D3D_BASE: "NPU获取结果错误"},
{9 - CAL_2D3D_BASE: "NPU卸载数据错误"},
{10 - CAL_2D3D_BASE: "YOLO推理无可信ROI区域"},
{11 - CAL_2D3D_BASE: "ROI处理线程错误"},
{12 - CAL_2D3D_BASE: "H计算错误"},
{13 - CAL_2D3D_BASE: "3D角点检测错误"},
{14 - CAL_2D3D_BASE: "2D角点定位错误"},
{15 - CAL_2D3D_BASE: "3D投影计算错误"},
{16 - CAL_2D3D_BASE: "3D点云为空"},
{17 - CAL_2D3D_BASE: "未检测到2D角点"},
{18 - CAL_2D3D_BASE: "未检测到预埋件"},
{19 - CAL_2D3D_BASE: "相机捕获图像失败"},
{1 - CAMERA_BASE: "相机DLL初始化失败"},
{1 - LIDAR_BASE: "雷达初始化SDK失败"},
{2 - LIDAR_BASE: "雷达SO初始化失败"},
{3 - LIDAR_BASE: "雷达广播失败"},
{5 - LIDAR_BASE: "雷达开始采样失败"},
{6 - LIDAR_BASE: "雷达结束采样失败"},
{-10001: "已经在检测模式中"},
{-10002: "定位模式不支持执行该操作"}
]
####################################################################################
# 全局变量定义
####################################################################################
# 全局 msg 缓存
g_msg_cache = []
# 全局 frame 缓存
g_frame_cache = []
# 运行模式
RUNNING_MODE_NONE = -1 # 停止状态
RUNNING_MODE_LOCATION = 0 # 广角相机运行模式
RUNNING_MODE_DETECTION = 1 # 高清检测运行模式
g_running_mode = RUNNING_MODE_NONE # 默认是广角相机运行模式
####################################################################################
# 内部接口定义
####################################################################################
# 获取消息
def _get_msg_info(code) -> str:
global MSG_LIST
for msg in MSG_LIST:
for key, value in msg.items():
if code == key:
return value.encode("utf-8")
return "未知错误".encode("utf-8")
get_msg_info = _get_msg_info
# 加工检测图像
def _update_location_image(frame):
global needHandle, recordCache, ckinfoCache
def switch(choice):
if ckinfoCache.code == 1006: # TO_CLIENT_NOTIFY_DETECTION_END: 1006
for i in range(recordCache.roi_size):
subRois = recordCache.subRois[i]
s = ""
for j in range(8):
s += str(subRois.mRoiPoints[j]) + " "
else:
print("default Case")
# 在回调函数中将 record 数据拷贝出来
def _copy_record(record):
new_record = CallbackInfo()
new_record.code = record.code
new_record.errorInfo = record.errorInfo
new_record.bGetData = record.bGetData
new_record.roi_size = record.roi_size
for i in range(record.roi_size):
subRois = record.subRois[i]
newSubRois = new_record.subRois[i]
for j in range(8):
newSubRois.mRoiPoints[j] = subRois.mRoiPoints[j]
newSubRois.mInnerConners[j] = subRois.mInnerConners[j]
for j in range(12):
newSubRois.mInnerConners3D[j] = subRois.mInnerConners3D[j]
return new_record
class ImageFrameworkSignals(QObject):
# QT 信号
on_image_detect_result = pyqtSignal(Msg)
@action("image-framework", auto_start=True)
class ImageFramework(EdgeComponent):
signals = ImageFrameworkSignals()
def __init__(self, context):
super().__init__(context)
self.image_framework_sdk = None
self.data_callback = None
self.location_thread = None
self.process_thread = None
self.times = 0
def configure(self, setting: Settings) -> None:
self.init_image_framework_sdk()
self.location_thread = threading.Thread(target=self._location_processing_thread)
self.location_thread.daemon = True
self.process_thread = threading.Thread(target=self._main_processing_thread)
self.process_thread.daemon = True
self.location_thread.start()
self.process_thread.start()
self.logger.info(f"Image framework configure done.")
def init_image_framework_sdk(self):
try:
# 加载C++库
current_file_dir = os.path.dirname(os.path.abspath(__file__))
(arch, suffix) = get_system_and_library_suffix()
self.image_framework_sdk = CDLL(
os.path.join(current_file_dir, f'../vendors/image-framework/{arch}/image_framework.{suffix}'))
# 设置回调函数
self.init_callback()
except Exception as e:
self.logger.error(f"Load Image framework sdk failed: {str(e)}")
def init_callback(self):
def _callback(data):
global g_msg_cache, g_frame_cache, g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
frame = None
ret = SYS_OK
msg = None
# 获取数据
data_type = CallbackInfo
data_ptr = cast(data, POINTER(data_type))
ckinfoCache = data_ptr.contents
record = _copy_record(ckinfoCache)
print(str(g_running_mode) + " : " + str(record.code) + " : " + bytes.decode(record.errorInfo))
# 当定位模式下,接收到【定位完成】消息
if g_running_mode == RUNNING_MODE_LOCATION and record.code == (6 + TO_CLIENT_NOTIFY_BASE):
if len(g_frame_cache) == 0:
self.logger.error("当前没有广角数据")
return
frame = g_frame_cache.pop() # 从图像缓存中去除图像
g_frame_cache = [] # 清空缓存,避免无限增长
msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), frame)
# 当检测模式下,接收到【拍照完成】消息
elif (g_running_mode == RUNNING_MODE_DETECTION and record.code == (8 + TO_CLIENT_NOTIFY_BASE)) or \
(g_running_mode == RUNNING_MODE_DETECTION and record.code == (12 + TO_CLIENT_NOTIFY_BASE)):
frame = np.zeros((7000, 9344, 3), np.uint8)
# frame.zeros(9344, 7000, cv2.CV_8UC3)
# src_frame_ptr = c_char_p(frame.data.tobytes())
mv = memoryview(frame.data)
buffer_pointer = cast(mv.obj.ctypes.data, c_void_p)
ret = self.image_framework_sdk.LibapiGetHQImage(
buffer_pointer, frame.shape[1], frame.shape[0], 3, c_char_p(b"PythonProcessing"))
if ret != SYS_OK:
print("ret != SYS_OK:" + str(ret))
record = CallbackInfo(code=ret, errorInfo=_get_msg_info(ret), bGetData=False)
msg = Msg(MSG_DETECTION_RECORD, record, None)
g_msg_cache.append(msg)
# emit msg
return
msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), frame)
elif g_running_mode == RUNNING_MODE_LOCATION:
msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), None)
elif g_running_mode == RUNNING_MODE_DETECTION:
msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), None)
else:
print("未知回调")
return
g_msg_cache.append(msg)
# self.logger.info("image framework callback done.")
self.data_callback = CallbackType(_callback)
@service()
def start(self):
super().start()
if self.image_framework_sdk is None:
raise RuntimeError("Image framework SDK未正确加载!")
ret = self.image_framework_sdk.LibapiInit(1, "", self.data_callback)
if ret != 0:
self.logger.info("============================》" + str(ret))
raise RuntimeError("Image framework 初始化失败!")
self.logger.info("Image framework started.")
@service()
def stop(self):
if self.location_thread is not None:
self.location_thread.join()
if self.process_thread is not None:
self.process_thread.join()
self.image_framework_sdk.stop_sdk()
self.logger.info("Image framework stopped.")
@service()
def start_location(self):
global g_running_mode
self.logger.info("start_location")
if g_running_mode != RUNNING_MODE_LOCATION:
g_running_mode = RUNNING_MODE_LOCATION # 切换为定位模式
@service()
def stop_all(self):
global g_running_mode
if g_running_mode != RUNNING_MODE_NONE:
g_running_mode = RUNNING_MODE_NONE # 切换为静默模式
@service()
def start_adjust(self):
global g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
if g_running_mode != RUNNING_MODE_DETECTION:
g_running_mode = RUNNING_MODE_DETECTION # 切换为检测模式
else:
record.code = -10001
record.errorInfo = _get_msg_info(-10001)
return record
ret = self.image_framework_sdk.LibapiStartDetection()
if ret != SYS_OK:
record = CallbackInfo()
record.code = ret
record.errorInfo = _get_msg_info(ret)
return record
return record
@service()
def start_check(self):
global g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
if g_running_mode == RUNNING_MODE_LOCATION: # 如果在定位任务模式下,该接口不执行
record.code = -10002
record.errorInfo = _get_msg_info(-10002)
return MSG_DETECTION_RECORD
ret = self.image_framework_sdk.LibapiContinuetDetection()
if ret != SYS_OK:
record.code = ret
record.errorInfo = _get_msg_info(ret)
return record
return record
@service()
def stop_check(self):
global g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
if g_running_mode == RUNNING_MODE_LOCATION: # 如果在定位任务模式下,该接口不执行
record.code = -10002
record.errorInfo = _get_msg_info(-10002)
return record
ret = self.image_framework_sdk.LibapStopDetection()
if ret != SYS_OK:
record.code = ret
record.errorInfo = _get_msg_info(ret)
return record
g_running_mode = RUNNING_MODE_LOCATION # 切换为无模式
return record
# 广角相机运行定位处理线程
def _location_processing_thread(self):
global g_msg_cache, g_frame_cache, g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
# 打开广角摄像头
capture = cv2.VideoCapture(0)
while True:
# 如果当前不是定位模式,不处理任何事务
if g_running_mode != RUNNING_MODE_LOCATION:
time.sleep(0.5)
g_frame_cache = []
continue
# 上一帧还未处理完毕
if len(g_frame_cache) > 0:
continue
# 获取一帧图像
_, frame = capture.read()
# 如果图像为空,等待一秒,上报消息
if frame is None:
time.sleep(0.1)
self.logger.error("[ERROR] 广角相机未捕获到图像")
continue
print("location run")
if self.times % 3 == 0:
src_frame_ptr = c_char_p(frame.data.tobytes())
ret = self.image_framework_sdk.LibapiCameraSendMsgWithImage(
src_frame_ptr, frame.shape[1], frame.shape[0], 3,
c_char_p(b"PythonProcessing"), c_char_p(b"_t_PluginCameraPro"))
if ret != SYS_OK:
record = CallbackInfo(code=ret, errorInfo=_get_msg_info(ret), bGetData=False)
msg = Msg(MSG_ONLY_RECORD, record, None)
self.logger.error(_get_msg_info(ret))
# emit msg
continue
# 向 g_frame_cache 中加入消息,对图像进行二次加工
g_frame_cache.append(frame)
self.times += 1
if self.times == 999:
self.times = 0
# 高清相机运行检测处理线程
def _main_processing_thread(self):
global g_msg_cache, g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
while True:
if len(g_msg_cache) <= 0:
time.sleep(0.01)
continue
msg = g_msg_cache.pop()
msg = imgProcess.process(msg)
self.logger.info("msg ==> ui")
self.signals.on_image_detect_result.emit(msg)