detect-gui/components/image_framework.py
2024-11-22 16:01:59 +08:00

366 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import threading
from ctypes import *
import numpy as np
from PyQt5.QtCore import QObject, pyqtSignal
from dynaconf.base import Settings
from numpy.ctypeslib import as_array
import platform
import time
import cv2
from core.edge_component import action, EdgeComponent, service
from util import get_system_and_library_suffix, imgProcess
# 给回调函数使用
class _SubRoiData_(Structure):
_fields_ = [
('mPparentId', c_long),
('mId', c_long),
('mInfo', c_char * 256),
('isGood', c_bool),
('vpOffset', c_float * 2),
('mRoiPoints', c_float * 8), # 4 * 2
('mInnerConners', c_float * 8), # 4 * 2
('mInnerConners3D', c_float * 12), # 4 * 3
('edges', c_float * 4),
('center', c_float * 2),
('cloud_size', c_int),
('mVc2D', POINTER(c_float)),
('mVc3D', POINTER(c_float)),
('H', c_float * 9),
]
SubRoiData = _SubRoiData_
class _CallbackInfo_(Structure):
_fields_ = [
('code', c_int),
('errorInfo', c_char * 256),
('bGetData', c_bool),
('mId', c_long),
('mInfo', c_char * 256),
('mImPath', c_char * 256),
('mIm2D3DPtr', c_char * 256),
('roi_size', c_int),
('subRois', SubRoiData * 16),
]
CallbackInfo = _CallbackInfo_
# 回调函数类型定义
CallbackType = CFUNCTYPE(None, POINTER(CallbackInfo))
# emit的结构体
MSG_ONLY_RECORD = -1
MSG_LOCATION_RECORD = 0
MSG_DETECTION_RECORD = 1
class Msg:
def __init__(self, msg_type, record, im2D):
self.msg_type = msg_type # 0刷新广角图片 1刷新高清 -1不刷新图片纯消息
self.record = record
self.im2D = im2D
TO_CLIENT_NOTIFY_BASE = 1000
SYS_BASE = 1000
CAL_2D3D_BASE = 2000
CAMERA_BASE = 4000
LIDAR_BASE = 5000
SYS_OK = 0
CAL_OK = 0
LIDAR_OK = 0
MSG_LIST = [
{ 1 + TO_CLIENT_NOTIFY_BASE : "开始相机拍摄" },
{ 2 + TO_CLIENT_NOTIFY_BASE : "相机拍摄已完成" },
{ 3 + TO_CLIENT_NOTIFY_BASE : "开始雷达采集数据" },
{ 4 + TO_CLIENT_NOTIFY_BASE : "雷达采集数据已完成" },
{ 5 + TO_CLIENT_NOTIFY_BASE : "开始定位检测" }, # 这个消息是相机实时定位,不用打印
{ 6 + TO_CLIENT_NOTIFY_BASE : "定位检测已完成" },
{ 7 + TO_CLIENT_NOTIFY_BASE : "开始角点2D检测" },
{ 8 + TO_CLIENT_NOTIFY_BASE : "角点2D检测已完成" },
{ 9 + TO_CLIENT_NOTIFY_BASE : "开始角点3D检测" },
{ 10 + TO_CLIENT_NOTIFY_BASE : "角点3D检测已完成" },
{ 11 + TO_CLIENT_NOTIFY_BASE : "开始计算预埋件尺寸" },
{ 12 + TO_CLIENT_NOTIFY_BASE : "预埋件尺寸计算已完成" },
{ 1 - SYS_BASE : "驱动初始化错误" },
{ 2 - SYS_BASE : "NPU计算单元初始化错误" },
{ 3 - SYS_BASE : "系统错误" },
{ 4 - SYS_BASE : "相机初始化错误" },
{ 5 - SYS_BASE : "当前没有可以读取的高清相片"},
{ 1 - CAL_2D3D_BASE : "读取图片文件错误" },
{ 2 - CAL_2D3D_BASE : "写图片文件错误" },
{ 3 - CAL_2D3D_BASE : "空指针" },
{ 4 - CAL_2D3D_BASE : "空参数" },
{ 5 - CAL_2D3D_BASE : "读取图像错误" },
{ 6 - CAL_2D3D_BASE : "NPU加载数据错误" },
{ 7 - CAL_2D3D_BASE : "NPU模型执行错误" },
{ 8 - CAL_2D3D_BASE : "NPU获取结果错误" },
{ 9 - CAL_2D3D_BASE : "NPU卸载数据错误" },
{ 10 - CAL_2D3D_BASE : "YOLO推理无可信ROI区域" },
{ 11 - CAL_2D3D_BASE : "ROI处理线程错误" },
{ 12 - CAL_2D3D_BASE : "H计算错误" },
{ 13 - CAL_2D3D_BASE : "3D角点检测错误" },
{ 14 - CAL_2D3D_BASE : "2D角点定位错误" },
{ 15 - CAL_2D3D_BASE : "3D投影计算错误" },
{ 16 - CAL_2D3D_BASE : "3D点云为空" },
{ 17 - CAL_2D3D_BASE : "未检测到2D角点" },
{ 18 - CAL_2D3D_BASE : "未检测到预埋件" },
{ 19 - CAL_2D3D_BASE : "相机捕获图像失败" },
{ 1 - CAMERA_BASE : "相机DLL初始化失败" },
{ 1 - LIDAR_BASE : "雷达初始化SDK失败" },
{ 2 - LIDAR_BASE : "雷达SO初始化失败" },
{ 3 - LIDAR_BASE : "雷达广播失败" },
{ 5 - LIDAR_BASE : "雷达开始采样失败" },
{ 6 - LIDAR_BASE : "雷达结束采样失败" },
{ -10001 : "已经在检测模式中"},
{ -10002 : "定位模式不支持执行该操作"}
]
####################################################################################
# 全局变量定义
####################################################################################
# 全局 msg 缓存
g_msg_cache = []
# 全局 frame 缓存
g_frame_cache = []
# 运行模式
RUNNING_MODE_NONE = -1 # 停止状态
RUNNING_MODE_LOCATION = 0 # 广角相机运行模式
RUNNING_MODE_DETECTION = 1 # 高清检测运行模式
g_running_mode = RUNNING_MODE_LOCATION # 默认是广角相机运行模式
####################################################################################
# 内部接口定义
####################################################################################
# 获取消息
def _get_msg_info(code) -> str:
global MSG_LIST
for msg in MSG_LIST:
if code == msg.keys()[0]:
return msg.values
return "未知错误"
# 加工检测图像
def _update_location_image(frame):
global needHandle, recordCache, ckinfoCache
def switch(choice):
if ckinfoCache.code == 1006: #TO_CLIENT_NOTIFY_DETECTION_END: 1006
for i in range(recordCache.roi_size):
subRois = recordCache.subRois[i]
s = ""
for j in range(8):
s += str(subRois.mRoiPoints[j])+ " "
else:
print("default Case")
# 在回调函数中将 record 数据拷贝出来
def _copy_record(record):
new_record = CallbackInfo()
new_record.code = record.code
new_record.errorInfo = record.errorInfo
new_record.bGetData = record.bGetData
for i in range(record.roi_size):
subRois = record.subRois[i]
newSubRois = new_record.subRois[i]
for j in range(8):
newSubRois.mRoiPoints[j] = subRois.mRoiPoints[j]
newSubRois.mInnerConners[j] = subRois.mInnerConners[j]
for j in range(12):
newSubRois.mInnerConners3D[j] = subRois.mInnerConners3D[j]
return new_record
class ImageFrameworkSignals(QObject):
# QT 信号
on_image_detect_result = pyqtSignal(Msg)
@action("image-framework", auto_start=True)
class ImageFramework(EdgeComponent):
signals = ImageFrameworkSignals()
def __init__(self, context):
super().__init__(context)
self.image_framework_sdk = None
self.data_callback = None
self.location_thread = None
self.process_thread = None
self.thread_running = False
def configure(self, setting: Settings) -> None:
self.init_image_framework_sdk()
self.location_thread = threading.Thread(target=self._location_processing_thread)
self.location_thread.daemon = True
self.process_thread = threading.Thread(target=self._main_processing_thread)
self.process_thread.daemon = True
self.logger.info(f"Image framework configure done.")
def init_image_framework_sdk(self):
try:
# 加载C++库
current_file_dir = os.path.dirname(os.path.abspath(__file__))
(arch, suffix) = get_system_and_library_suffix()
self.image_framework_sdk = CDLL(os.path.join(current_file_dir, f'../vendors/image-framework/{arch}/image_framework.{suffix}'))
# 设置回调函数
self.init_callback()
except Exception as e:
self.logger.error(f"Load Image framework sdk failed: {str(e)}")
def init_callback(self):
def _callback(data):
global g_msg_cache, g_frame_cache, g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
frame = None
ret = SYS_OK
record.code = data.contents.code
record.errorInfo = data.contents.errorInfo
# 当定位模式下,接收到【定位完成】消息
if g_running_mode == RUNNING_MODE_LOCATION and record.code == (6 + TO_CLIENT_NOTIFY_BASE):
if len(g_frame_cache) == 0:
self.logger.error("当前没有广角数据")
return
frame = g_frame_cache.pop() # 从图像缓存中去除图像
g_frame_cache = [] # 清空缓存,避免无限增长
msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), frame)
# 当检测模式下,接收到【拍照完成】消息
elif g_running_mode == RUNNING_MODE_DETECTION and record.code == (2 + TO_CLIENT_NOTIFY_BASE):
frame.zeros(9344, 7000, cv2.CV_8UC3)
src_frame_ptr = c_char_p(frame.data.tobytes())
ret = self.image_framework_sdk.LibapiGetHQImage(
src_frame_ptr, frame.shape[1], frame.shape[0], 3, c_char_p(b"PythonProcessing"))
if ret != SYS_OK:
record = CallbackInfo(code=ret, errorInfo=_get_msg_info(ret), bGetData=False)
msg = Msg(MSG_ONLY_RECORD, record, None)
# emit msg
return
msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), frame)
elif g_running_mode == RUNNING_MODE_LOCATION or g_running_mode == RUNNING_MODE_DETECTION:
msg = Msg(MSG_ONLY_RECORD, _copy_record(record), None)
else:
print("未知回调")
return
g_msg_cache.append(msg)
# self.logger.info("image framework callback done.")
self.data_callback = CallbackType(_callback)
@service()
def start(self):
super().start()
if self.image_framework_sdk is None:
raise RuntimeError("Image framework SDK未正确加载!")
ret = self.image_framework_sdk.LibapiInit(1, "", self.data_callback)
if ret != 0:
raise RuntimeError("Image framework 初始化失败!")
self.logger.info("Image framework started.")
@service()
def stop(self):
super().stop()
self.stop_location()
# self.image_framework_sdk.LibapiStop()
self.logger.info("Image framework stopped.")
@service()
def start_location(self):
self.thread_running = True
self.location_thread.start()
self.process_thread.start()
@service()
def stop_location(self):
self.thread_running = False
if self.location_thread is not None:
self.location_thread.join()
if self.process_thread is not None:
self.process_thread.join()
@service()
def start_detect(self):
global g_running_mode
if g_running_mode != RUNNING_MODE_DETECTION:
g_running_mode = RUNNING_MODE_DETECTION # 切换为检测模式
ret = self.image_framework_sdk.LibapiStartDetection()
if ret != 0:
raise RuntimeError("启动检测失败!")
@service()
def continue_detect(self):
ret = self.image_framework_sdk.LibapiContinuetDetection()
if ret != 0:
raise RuntimeError("继续检测失败!")
@service()
def stop_detect(self):
global g_running_mode
if g_running_mode == RUNNING_MODE_LOCATION: # 如果在定位任务模式下,该接口不执行
raise RuntimeError("正在检测,请稍后停止!")
g_running_mode = RUNNING_MODE_NONE # 切换为无模式
ret = self.image_framework_sdk.LibapStopDetection()
if ret != 0:
raise RuntimeError("停止检测失败!")
# 广角相机运行定位处理线程
def _location_processing_thread(self):
global g_msg_cache, g_frame_cache, g_running_mode
# 打开广角摄像头
capture = cv2.VideoCapture(0)
while self.thread_running:
# 如果当前不是定位模式,不处理任何事务
if g_running_mode != RUNNING_MODE_LOCATION:
time.sleep(0.5)
continue
# 获取一帧图像
_, frame = capture.read()
# 如果图像为空,等待一秒,上报消息
if frame is None:
time.sleep(0.1)
self.logger.error("[ERROR] 广角相机未捕获到图像")
continue
src_frame_ptr = c_char_p(frame.data.tobytes())
ret = self.image_framework_sdk.LibapiCameraSendMsgWithImage(
src_frame_ptr, frame.shape[1], frame.shape[0], 3,
c_char_p(b"PythonProcessing"), c_char_p(b"_t_PluginCameraPro"))
if ret != SYS_OK:
self.logger.error(_get_msg_info(ret))
# emit msg
continue
# 向 g_frame_cache 中加入消息,对图像进行二次加工
g_frame_cache.append(frame)
# 高清相机运行检测处理线程
def _main_processing_thread(self):
global g_msg_cache, g_frame_cache, g_running_mode
while self.thread_running:
if len(g_msg_cache) <= 0:
time.sleep(0.01)
continue
msg = g_msg_cache.pop()
if msg.msg_type == MSG_LOCATION_RECORD:
# cv2.imshow("LOCATION", msg.im2D)
# cv2.waitKey(1)
msg = imgProcess.process(msg)
self.signals.on_image_detect_result.emit(msg)