ymj_data_collect/detect.py

273 lines
9.3 KiB
Python
Raw Normal View History

2025-02-25 10:15:21 +08:00
import os
import shutil
import sys
import threading
import time
from ctypes import *
import logging
from logging import makeLogRecord
from clog import logger
import cv2
import numpy as np
from imgProcess import detectionResProcessing
from util import change_config_ini
image_framework_sdk = None
data_callback = None
SYS_OK = 0
bStop = False
# 全局 msg 缓存
g_msg_cache = []
# 全局 frame 缓存
g_frame_cache = []
# 运行模式
RUNNING_MODE_NONE = -1 # 停止状态
RUNNING_MODE_LOCATION = 0 # 广角相机运行模式
RUNNING_MODE_DETECTION = 1 # 高清检测运行模式
g_running_mode = RUNNING_MODE_NONE # 默认是广角相机运行模式
TO_CLIENT_NOTIFY_BASE = 1000
# emit的结构体
MSG_ONLY_RECORD = -1
MSG_LOCATION_RECORD = 0
MSG_DETECTION_RECORD = 1
# 获取消息
def _get_msg_info(code) -> str:
global MSG_LIST
for msg in MSG_LIST:
for key, value in msg.items():
if code == key:
return value.encode("utf-8")
return "未知错误".encode("utf-8")
class Msg:
def __init__(self, msg_type, record, im2D):
self.msg_type = msg_type # 0刷新广角图片 1刷新高清 -1不刷新图片纯消息
self.record = record
self.im2D = im2D
class _SubRoiData_(Structure):
_fields_ = [
('mPparentId', c_long),
('mId', c_long),
('mInfo', c_char * 256),
('isGood', c_bool),
('vpOffset', c_float * 2),
('mRoiPoints', c_float * 8), # 4 * 2
('mInnerConners', c_float * 8), # 4 * 2
('mInnerConners3D', c_float * 12), # 4 * 3
('edges', c_float * 4),
('center', c_float * 2),
('cloud_size', c_int),
('mVc2D', POINTER(c_float)),
('mVc3D', POINTER(c_float)),
('H', c_float * 9),
]
SubRoiData = _SubRoiData_
class _CallbackInfo_(Structure):
_fields_ = [
('code', c_int),
('errorInfo', c_char * 256),
('bGetData', c_bool),
('mId', c_long),
('mInfo', c_char * 256),
('mImPath', c_char * 256),
('mIm2D3DPtr', c_char * 256),
('roi_size', c_int),
('subRois', SubRoiData * 16),
]
CallbackInfo = _CallbackInfo_
# 回调函数类型定义
CallbackType = CFUNCTYPE(None, POINTER(CallbackInfo))
def _copy_record(record):
new_record = CallbackInfo()
new_record.code = record.code
new_record.errorInfo = record.errorInfo
new_record.bGetData = record.bGetData
new_record.roi_size = record.roi_size
for i in range(record.roi_size):
if i > 15: # 超过16个ROI返回
break
subRois = record.subRois[i]
newSubRois = new_record.subRois[i]
for j in range(8):
newSubRois.mRoiPoints[j] = subRois.mRoiPoints[j]
newSubRois.mInnerConners[j] = subRois.mInnerConners[j]
for j in range(12):
newSubRois.mInnerConners3D[j] = subRois.mInnerConners3D[j]
return new_record
def init_callback():
global data_callback, bStop
def _callback(data):
print("_callback回调执行")
global g_msg_cache, g_frame_cache, g_running_mode, bStop
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
frame = None
ret = SYS_OK
msg = None
# 获取数据
data_type = CallbackInfo
data_ptr = cast(data, POINTER(data_type))
ckinfoCache = data_ptr.contents
record = _copy_record(ckinfoCache)
print(str(g_running_mode) + " : " + str(record.code) + " : " + bytes.decode(record.errorInfo))
# 当定位模式下,接收到【定位完成】消息
if g_running_mode == RUNNING_MODE_LOCATION and record.code == (6 + TO_CLIENT_NOTIFY_BASE):
if len(g_frame_cache) == 0:
print("当前没有广角数据")
return
frame = g_frame_cache.pop() # 从图像缓存中去除图像
g_frame_cache = [] # 清空缓存,避免无限增长
msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), frame)
# 当检测模式下,接收到【拍照完成】消息
elif record.code == (8 + TO_CLIENT_NOTIFY_BASE):
bStop = True
time.sleep(1)
elif record.code == (12 + TO_CLIENT_NOTIFY_BASE):
msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), None)
detectionResProcessing(msg,data_img_path,data_detect_res_save_path)
bStop = True
time.sleep(1)
# # frame = np.zeros((7000, 9344, 3), np.uint8)
# # # frame.zeros(9344, 7000, cv2.CV_8UC3)
# # # src_frame_ptr = c_char_p(frame.data.tobytes())
# # mv = memoryview(frame.data)
# # buffer_pointer = cast(mv.obj.ctypes.data, c_void_p)
# # ret = image_framework_sdk.LibapiGetHQImage(
# # buffer_pointer, frame.shape[1], frame.shape[0], 3, c_char_p(b"PythonProcessing"))
# if ret != SYS_OK:
# print("ret != SYS_OK:" + str(ret))
# record = CallbackInfo(code=ret, errorInfo=_get_msg_info(ret), bGetData=False)
# msg = Msg(MSG_DETECTION_RECORD, record, None)
# g_msg_cache.append(msg)
# # emit msg
# return
# msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), frame)
# elif g_running_mode == RUNNING_MODE_LOCATION:
# msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), None)
# elif g_running_mode == RUNNING_MODE_DETECTION:
# msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), None)
# else:
# print("未知回调")
# return
data_callback = CallbackType(_callback)
def init_image_framework_sdk():
global image_framework_sdk
try:
# 加载C++库
current_file_dir = os.path.dirname(os.path.abspath(__file__))
image_framework_sdk = CDLL(os.path.join(current_file_dir, f'./image_framework.dll'))
2025-03-03 10:55:46 +08:00
print(f"[image_framework_sdk] ====== [{image_framework_sdk}]")
2025-02-25 10:15:21 +08:00
print("Load Image framework sdk success")
except Exception as e:
print(f"Load Image framework sdk failed: {str(e)}")
def adjust():
global image_framework_sdk
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
ret = image_framework_sdk.LibapiStartDetection()
if ret != SYS_OK:
print(f"image_framework_sdk.LibapiStartDetection():执行失败 :{ret}")
return record
def check():
print("check====================")
global image_framework_sdk
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
ret = image_framework_sdk.LibapiContinuetDetection("{}")
if ret != SYS_OK:
print("image_framework_sdk.LibapiContinuetDetection执行失败")
return record
def start():
global bStop
# 开始检测
logger.debug("开始adjust")
adjust()
while not bStop:
logger.debug("adjust等待回调")
time.sleep(1)
logger.debug("开始check")
check()
bStop = False
while not bStop:
logger.debug("check等待回调")
time.sleep(1)
if __name__ == '__main__':
# 加载驱动
init_image_framework_sdk()
if image_framework_sdk is None:
raise RuntimeError("Image framework SDK未正确加载!")
# 设置回调函数
init_callback()
ret = image_framework_sdk.LibapiInit(1, "", data_callback)
if ret != 0:
raise RuntimeError(f"设置回调函数始设置失败, 错误码: {ret}")
2025-03-03 10:55:46 +08:00
dataPath = "./result" # 数据路径
2025-02-25 10:15:21 +08:00
data_img_path = None # 检测的图片路径
data_detect_res_save_path = None # 检测结果的保存路径
# 遍历目录
for root, dirs, files in os.walk(dataPath):
for dir in dirs:
logger.debug(f"开始处理子目录:{dir}")
data_detect_res_save_path = os.path.join(root, dir)
# 目录下的所有文件路径
for file in os.listdir(os.path.join(root, dir)):
file_path = os.path.join(root, dir, file)
# 将反斜杠替换为正斜杠
file_path = file_path.replace('\\', '/')
logger.debug(f"文件路径:{file_path}")
# 如果是jpg图片
if file.endswith("output.jpg"):
# 将文件路径修改到ini配置中
data_img_path = file_path
change_config_ini("sys", "fake_image_fpath", file_path)
if file.endswith(".ply"):
# 将文件路径修改到ini配置中
change_config_ini("sys", "fake_lidar_fpath", file_path)
if file.endswith(".txt"):
# txt roi坐标复制到更目录曹总懒得修改c代码只能从根目录读取。
shutil.copy(file_path, os.path.join("./", file))
logger.info(f"roi坐标文件已复制到更目录")
time.sleep(1)
# ...
# 开始检测
bStop = False
logger.debug("开始检测")
start()
exit()