This commit is contained in:
leon 2025-04-20 21:44:06 +08:00
parent 7d980ddadd
commit e6744ccc83
30 changed files with 1151 additions and 149 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
.idea/
.vscode/
0_roi_*.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.3 MiB

After

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 MiB

After

Width:  |  Height:  |  Size: 4.2 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.9 MiB

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.6 MiB

After

Width:  |  Height:  |  Size: 1.3 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.2 MiB

After

Width:  |  Height:  |  Size: 2.9 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 983 KiB

After

Width:  |  Height:  |  Size: 1014 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 MiB

After

Width:  |  Height:  |  Size: 1.7 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.3 MiB

After

Width:  |  Height:  |  Size: 738 KiB

296
bim_cal.py Normal file
View File

@ -0,0 +1,296 @@
import copy
from typing import Optional
from typing import Tuple
import math
def point_to_line_distance(px, py, x1, y1, x2, y2):
"""
计算一个点到另外两个点形成的直线上的垂直距离点到直线的距离公式
返回距离值
"""
# 分子部分
numerator = abs((y2 - y1) * px - (x2 - x1) * py + x2 * y1 - y2 * x1)
# 分母部分
denominator = math.hypot(y2 - y1, x2 - x1)
# 距离
return numerator / denominator
def from_main_get_other_point_distance(main_ymj, other_ymj):
"""
以main_ymj(主要预埋件)的四个边的中点
高度中点连线为x轴
宽度中点连线为y轴
计算other_ymj(其他预埋件)的中心点到主要预埋件x轴直线和y轴直线的的垂直距离
返回x垂直距离,y垂直距离
"""
# 其他预埋件的中心点坐标
px = other_ymj['x_center']
py = other_ymj['y_center']
# 计算其他预埋件中心点坐标到预埋件左右边中点坐标连线形成的直线的垂直距离
bottom_line_center_x = main_ymj['bottom_line_center_x']
bottom_line_center_y = main_ymj['bottom_line_center_y']
top_line_center_x = main_ymj['top_line_center_x']
top_line_center_y = main_ymj['top_line_center_y']
x_distance = point_to_line_distance(px, py, bottom_line_center_x, bottom_line_center_y, top_line_center_x,
top_line_center_y)
# 计算其他预埋件中心点坐标到预埋件左右边中点坐标连线形成的直线的垂直距离
left_line_center_x = main_ymj['left_line_center_x']
left_line_center_y = main_ymj['left_line_center_y']
right_line_center_x = main_ymj['right_line_center_x']
right_line_center_y = main_ymj['right_line_center_y']
y_distance = point_to_line_distance(px, py, left_line_center_x, left_line_center_y, right_line_center_x,
right_line_center_y)
return round(x_distance, 2), round(y_distance, 2)
def from_main_get_other_point_distance_bim(main_ymj, other_ymj):
"""
与from_main_get_other_point_distance函数类似
只不过计算的是bim图中其他预埋件相对于主要预埋件的x和y的距离
因为bim是标准的平面图所以可以直接使用主要预埋件的中心点作为坐标原点直接计算其他预埋件相对于这个坐标原点的x,y,
返回x垂直距离,y垂直距离
"""
main_x = main_ymj["x"]
main_x = float(main_x)
main_y = main_ymj["center"]
main_y = float(main_y) * 1000
other_x = other_ymj["x"]
other_x = float(other_x)
other_y = other_ymj["center"]
other_y = float(other_y) * 1000
x_distance = abs(other_x - main_x)
y_distance = abs(other_y - main_y)
return round(x_distance, 2), round(y_distance, 2)
def cal_deviation(main_ymj, other_ymj_list):
"""
计算偏差表
bim和图片中其他预埋件中心点到主要预埋件的偏差值
中心x轴直线偏差值
中心y轴直线的的垂直距离偏差值
欧几里德距离偏差值
@:return
[
{
'ymj_code': 预埋件code
'x_deviation': x偏差
'y_deviation': y偏差
'euclidean_deviation': 欧几里德距离的偏差
'average_deviation' : 平均误差
'x_deviation_s': x偏差值带正负符号
'y_deviation_s': y偏差值带正负符号
}
]
"""
# 1.计算图片和bim中其他预埋件中心点到主要预埋件中心x轴直线和中心y轴直线的的垂直距离
distance_data = [] # bim 和 图片中其他预埋件中心点到主要预埋件中心x轴直线和中心y轴直线的的垂直距离欧几里德距离
for other_ymj in other_ymj_list:
# 计算图片中其他预埋件中心点到主要预埋件中心x轴直线和中心y轴直线的的垂直距离
x_distance_pic, y_distance_pic = from_main_get_other_point_distance(main_ymj, other_ymj)
pic_data = {
'x_distance': x_distance_pic,
'y_distance': y_distance_pic,
# 欧几里得距离值
'euclidean_distance': math.sqrt(x_distance_pic ** 2 + y_distance_pic ** 2),
}
# 计算bim中其他预埋件中心点到主要预埋件中心x轴直线和中心y轴直线的的垂直距离
x_distance_bim, y_distance_bim = from_main_get_other_point_distance_bim(main_ymj, other_ymj)
bim_data = {
'x_distance': x_distance_bim,
'y_distance': y_distance_bim,
# 欧几里得距离值
'euclidean_distance': math.sqrt(x_distance_bim ** 2 + y_distance_bim ** 2),
}
distance_data.append(
{
'code': other_ymj['code'],
'pic_data': pic_data, # 图片中其他预埋件中心点到主要预埋件中心x轴直线和中心y轴直线的的垂直距离, 欧几里得距离值
'bim_data': bim_data, # bim中其他预埋件中心点到主要预埋件中心x轴直线和中心y轴直线的的垂直距离, 欧几里得距离值
}
)
# 2.对比bim和图片中数据的偏差值
deviation_data = [] # 偏差值列表
for distance in distance_data:
pic_data = distance['pic_data']
bim_data = distance['bim_data']
ymj_code = distance['code']
# 计算x偏差
x_deviation = pic_data['x_distance'] - bim_data['x_distance']
# 计算y偏差
y_deviation = pic_data['y_distance'] - bim_data['y_distance']
# 计算欧几里得距离偏差
sqrt_xx_yy_deviation = pic_data['euclidean_distance'] - bim_data['euclidean_distance']
# 计算平均误差
average_deviation = max(abs(x_deviation), abs(y_deviation), abs(sqrt_xx_yy_deviation))
deviation_data.append({
'ymj_code': ymj_code,
'x_deviation': abs(x_deviation),
'x_deviation_s': x_deviation,
'y_deviation': abs(y_deviation),
'y_deviation_s': y_deviation,
'euclidean_deviation': abs(sqrt_xx_yy_deviation),
'average_deviation': average_deviation,
})
# 3.返回计算偏差值列表
return deviation_data
def get_point_align_line(
px: float,
py: float,
x_axis: Optional[Tuple[float, float, float, float]],
y_axis: Optional[Tuple[float, float, float, float]],
x_move_distance: float,
y_move_distance: float
) -> Tuple[float, float]:
"""
计算一点沿着两点形成的直线方向移动指定距离之后的坐标值
注意方向为
起点 x_axis_x1,x_axis_y1 ==> 终点 x_axis_x2,x_axis_y2
起点 y_axis_x1,y_axis_y1 ==> 终点 y_axis_x2,y_axis_y2
相当于 px, py 沿着 x_axis y_axis 的方向各移动 x_move_distance y_move_distance
:return: 移动后的新坐标 (new_x, new_y)
"""
new_x, new_y = px, py
# 如果 x 方向有位移
if x_move_distance != 0:
x1, y1, x2, y2 = x_axis
dx = x2 - x1
dy = y2 - y1
length = math.hypot(dx, dy)
if length != 0:
x_unit_vec = (dx / length, dy / length)
new_x += x_move_distance * x_unit_vec[0]
new_y += x_move_distance * x_unit_vec[1]
# 如果 y 方向有位移
if y_move_distance != 0:
x1, y1, x2, y2 = y_axis
dx = x2 - x1
dy = y2 - y1
length = math.hypot(dx, dy)
if length != 0:
y_unit_vec = (dx / length, dy / length)
new_x += y_move_distance * y_unit_vec[0]
new_y += y_move_distance * y_unit_vec[1]
return new_x, new_y
def cal_deviation_table(all_ymj_data):
"""
计算每个预埋件的平均偏差表
:param all_ymj_data:
:return: [
{
'ymj_code': 主要预埋件
'average_deviation': 平均误差
}
]
"""
deviation_table = [] # 数值偏差表
for main_ymj in all_ymj_data:
other_ymj_list = copy.deepcopy(all_ymj_data)
other_ymj_list.remove(main_ymj) # 移除主要预埋件
# 1.计算偏差数据
deviation_data = cal_deviation(main_ymj, other_ymj_list)
print(f"main_ymj_code={main_ymj['code']}")
print(f"deviation_data={deviation_data}")
# 2.计算所有其他预埋件误差的平均值
all_average_deviation = 0
for deviation in deviation_data:
# 一个预埋件的误差平均值
average_deviation = deviation['average_deviation']
# 误差 累加起来
all_average_deviation += average_deviation
# 除以预埋件数量
all_average_deviation = all_average_deviation / len(deviation_data)
deviation_table.append({
'ymj_code': main_ymj['code'],
'average_deviation': all_average_deviation,
})
print(f"📋deviation_table={deviation_table}")
return deviation_table
def cal_with_manual_measurement(all_ymj_data):
"""
根据人工测量出来的偏差值计算预埋件偏差值
@:return
main_ymj_code : 使用了人工测量数据的预埋件
measure_deviation: 其他预埋件相当于人工测量出数据的实际偏差
"""
main_ymj_code = None
measure_deviation = []
# 深度拷贝一份全部预埋件数据, 防止源数据被修改
all_ymj_data_copy = copy.deepcopy(all_ymj_data)
# 遍历查找哪个数据有人工测量的值
is_find = False
for idx, ymj in enumerate(all_ymj_data_copy):
# 如果有人工测量数据
if 'x_measure_bias' in ymj:
is_find = True
x_measure_bias = ymj['x_measure_bias'] # x偏差
y_measure_bias = ymj['y_measure_bias'] # y偏差
# 先获取其他预埋件相对于这个预埋件的偏差
other_ymj_list = all_ymj_data_copy
other_ymj_list.remove(ymj)
deviation_data = cal_deviation(ymj, other_ymj_list)
print(f"主预埋件code={ymj['code']}")
print(f"其他预埋件偏差={deviation_data}")
# 给其他预埋件的偏差值 加上人工测量的偏差 以获得其真实偏差
for deviation in deviation_data:
# 带符号的偏差偏差值
x_deviation_s = deviation['x_deviation_s']
y_deviation_s = deviation['y_deviation_s']
# 找到这个预埋件的源数据
for ymj_data in other_ymj_list:
# code相同判断
if ymj_data['code'] == deviation['ymj_code']:
# 偏差值再加上人工测量的偏差 以获得其真实偏差
measure_deviation.append({
'ymj_code': deviation['ymj_code'],
'x_deviation': x_deviation_s + x_measure_bias,
'y_deviation': y_deviation_s + y_measure_bias,
})
break
if is_find:
# 只拿取第一个人工测量的预埋件的数据
main_ymj_code = ymj['code']
break
if not is_find:
print("❗️❗️没有任何一个预埋件存在人工测量的数据")
return main_ymj_code,measure_deviation

145
capture(1).py Normal file
View File

@ -0,0 +1,145 @@
# -- coding: utf-8 --
import time
import os
import paramiko
from scp import SCPClient
import threading
from os import abort
HOST = "192.168.100.254"
PORT = 22
USERNAME = "root"
PASSWORD = "ebaina"
EXPTIME = 4500000
CLOUDPOINTS = 30 * 10000
def ssh_execute_command(host, port, username, password, command):
ret = False
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, port, username, password)
# 执行命令
stdin, stdout, stderr = client.exec_command(command)
print(stdout.read().decode())
print(stderr.read().decode())
# 关闭连接
client.close()
return True
def scp_get_file(host, port, username, password, remote_path, local_path):
ret = False
# 创建SSH客户端
client = paramiko.Transport((host, port))
client.connect(username=username, password=password)
# 创建SCP客户端
scp_client = SCPClient(client)
# 拉取文件
try:
scp_client.get(remote_path, local_path)
except Exception as e:
print(e)
return False
# 关闭连接
scp_client.close()
client.close()
return True
def cameraCap():
start_time = time.time()
ret = ssh_execute_command(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
command = "cd /root/calibration_tools \n" +
"./camera " + str(EXPTIME) + "\n"
)
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"ssh_execute_command 程序执行时间:{execution_time}")
time.sleep(1) # 等待操作系统保存图像完毕
ret = scp_get_file(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
remote_path = "/root/calibration_tools/output.png",
local_path = os.path.dirname(__file__) + "\\ssh_tmp\\output.png"
)
if not ret:
return False
ret = ssh_execute_command(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
command = "cd /root/calibration_tools \n " +
"rm output.png \n"
)
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"cameraCap 程序执行时间:{execution_time}")
def lidarCap():
start_time = time.time()
ret = ssh_execute_command(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
command = "cd /root/calibration_tools \n" +
"./lidar_driver " + str(CLOUDPOINTS) + "\n"
)
ret = scp_get_file(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
remote_path = "/root/calibration_tools/output.ply",
local_path = os.path.dirname(__file__) + ".\\ssh_tmp\\output.ply"
)
ret = ssh_execute_command(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
command = "cd /root/calibration_tools \n " +
"rm output.ply \n"
)
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"lidarCap 程序执行时间:{execution_time}")
def cap():
directory = os.path.dirname(__file__) + ".\\ssh_tmp\\"
files_and_directories = os.listdir(directory)
for file in files_and_directories:
file_path = os.path.join(directory, file)
if os.path.isfile(file_path):
os.unlink(file_path)
t_lidar = threading.Thread(target=lidarCap)
t_lidar.start()
cameraCap()
# t_camera.join()
t_lidar.join()
if __name__ == '__main__':
cap()

View File

@ -17,8 +17,8 @@ CLOUD_COUNTS = [400000, 500000, 600000]
DEFAULT_COUNT = 300000
EXPS = [200 * 1000, 300 * 1000, 400 * 1000]
DEFAULT_EXP = 100 * 1000
PARENT_DIR_NAME = "test_2"
SAVE_DIR_NAME = ""
PARENT_DIR_NAME = "test_ymj"
SAVE_DIR_NAME = "test_ymj_1"
def ssh_execute_command(host, port, username, password, command):

View File

@ -15,7 +15,7 @@ USERNAME = "root"
PASSWORD = "ebaina"
# CLOUD_COUNTS = [600000,800000,1000000,1500000,2000000]
CLOUD_COUNTS = [1500000]
CLOUD_COUNTS = [300000]
DEFAULT_COUNT = 1500000
EXPS = [20*1000]
# EXPS = [100 * 1000]

View File

@ -15,12 +15,12 @@ USERNAME = "root"
PASSWORD = "ebaina"
# CLOUD_COUNTS = [600000,800000,1000000,1500000,2000000]
CLOUD_COUNTS = [1500000]
DEFAULT_COUNT = 1500000
EXPS = [10*1000]
CLOUD_COUNTS = []
DEFAULT_COUNT = 300000
EXPS = [4500000*1000]
# EXPS = [100 * 1000]
DEFAULT_EXP = int(8 * 1000)
PARENT_DIR_NAME = "宁德核电站0327-9-长条形状2"
DEFAULT_EXP = 0
PARENT_DIR_NAME = "测试2"
# PARENT_DIR_NAME = "test_1"
SAVE_DIR_NAME = ""
TEST_CLOUD = False

145
capture_new.py Normal file
View File

@ -0,0 +1,145 @@
# -- coding: utf-8 --
import time
import os
import paramiko
from scp import SCPClient
import threading
from os import abort
HOST = "192.168.100.254"
PORT = 22
USERNAME = "root"
PASSWORD = "ebaina"
EXPTIME = 4500000
CLOUDPOINTS = 30 * 10000
def ssh_execute_command(host, port, username, password, command):
ret = False
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, port, username, password)
# 执行命令
stdin, stdout, stderr = client.exec_command(command)
print(stdout.read().decode())
print(stderr.read().decode())
# 关闭连接
client.close()
return True
def scp_get_file(host, port, username, password, remote_path, local_path):
ret = False
# 创建SSH客户端
client = paramiko.Transport((host, port))
client.connect(username=username, password=password)
# 创建SCP客户端
scp_client = SCPClient(client)
# 拉取文件
try:
scp_client.get(remote_path, local_path)
except Exception as e:
print(e)
return False
# 关闭连接
scp_client.close()
client.close()
return True
def cameraCap():
start_time = time.time()
ret = ssh_execute_command(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
command = "cd /root/calibration_tools \n" +
"./camera_driver " + str(EXPTIME) + "\n"
)
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"ssh_execute_command 程序执行时间:{execution_time}")
time.sleep(1) # 等待操作系统保存图像完毕
ret = scp_get_file(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
remote_path = "/root/calibration_tools/output.png",
local_path = os.path.dirname(__file__) + "\\ssh_tmp\\output.png"
)
if not ret:
return False
ret = ssh_execute_command(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
command = "cd /root/calibration_tools \n " +
"rm output.png \n"
)
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"cameraCap 程序执行时间:{execution_time}")
def lidarCap():
start_time = time.time()
ret = ssh_execute_command(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
command = "cd /root/calibration_tools \n" +
"./lidar_driver " + str(CLOUDPOINTS) + "\n"
)
ret = scp_get_file(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
remote_path = "/root/calibration_tools/output.ply",
local_path = os.path.dirname(__file__) + ".\\ssh_tmp\\output.ply"
)
ret = ssh_execute_command(
host = HOST,
port = PORT,
username = USERNAME,
password = PASSWORD,
command = "cd /root/calibration_tools \n " +
"rm output.ply \n"
)
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"lidarCap 程序执行时间:{execution_time}")
def cap():
directory = os.path.dirname(__file__) + ".\\ssh_tmp\\"
files_and_directories = os.listdir(directory)
for file in files_and_directories:
file_path = os.path.join(directory, file)
if os.path.isfile(file_path):
os.unlink(file_path)
t_lidar = threading.Thread(target=lidarCap)
t_lidar.start()
cameraCap()
# t_camera.join()
t_lidar.join()
if __name__ == '__main__':
cap()

View File

@ -12,7 +12,7 @@ p2 = -0.000424586368
k3 = 0.371045956
yolo_label = 0
yolo_prob = 0.601
yolo_modol_path = ./best_ep23_small_lab.om
yolo_modol_path = ./ymj_03-04-enhance-712-lab-seaSlat.om
save_image = true
roi_y_offset = 20
edge_min_line_len = 400
@ -24,18 +24,18 @@ kk = 0.0
r = 1.572895519320505
p = -1.5570521101629837
y = 0.024731696602587224
tx = 0.109253
ty = 0.0710213
tz = 0.0175978
dminx = -2.5
dmaxx = 2.5
dminy = -2.5
dmaxy = 2.5
tx = 0.110253
ty = -0.0095978
tz = 0.0910213
dminx = -1.5
dmaxx = 1.5
dminy = -1.5
dmaxy = 1.5
dminz = 0.0
dmaxz = 3.6
kk = 0.0
cloud_need_points_size = 600000
save_cload = true
cloud_need_points_size = 1500000
save_cload = false
[sys]
fake = true
@ -43,6 +43,12 @@ camera_cap_fake = true
lidar_cap_fake = true
npu_fake = true
conners_detect_fake = true
fake_image_fpath = ./result/hy_1/output.jpg
fake_lidar_fpath = ./result/hy_1/output.ply
fake_image_fpath = ./result2/test2/output.jpg
fake_lidar_fpath = ./result2/test2/output.ply
export_time = 24
bim_angle_offset_max = 0.08
bin_dist_offset_max = 0.03
[test]
apple = 234

80
conners.txt Normal file
View File

@ -0,0 +1,80 @@
6683
1675
7716
1675
7716
2654
6683
2654
5799
3730
7895
3730
7895
5796
5799
5796
4469
158
5638
158
5638
1310
4469
1310
6594
249
7773
249
7773
1384
6594
1384
2031
547
3680
547
3680
2151
2031
2151
5244
2321
6266
2321
6266
3314
5244
3314
3505
5140
4863
5140
4863
6515
3505
6515
0
18
675
18
675
1202
0
1202
0
1427
463
1427
463
2529
0
2529
2132
3680
4847
3680
4847
4909
2132
4909

262
detect.py
View File

@ -1,3 +1,4 @@
import ctypes
import os
import shutil
import sys
@ -6,6 +7,8 @@ import time
from ctypes import *
import logging
from logging import makeLogRecord
import imgProcess
from clog import logger
import cv2
import numpy as np
@ -37,6 +40,46 @@ MSG_ONLY_RECORD = -1
MSG_LOCATION_RECORD = 0
MSG_DETECTION_RECORD = 1
# bim_data = [
# {"code": "BFX1101VBPT001", "type": "248x248", "x": "325", "y": "250", "center": "1.269", "w": "248", "h": "248",
# "angle": "0°"},
# {"code": "BFX1101VBPT002", "type": "150x149", "x": "717", "y": "250", "center": "0.849", "w": "150", "h": "149",
# "angle": "0°"},
# {"code": "BFX1101VBPT003", "type": "300x248", "x": "778", "y": "250", "center": "0.317", "w": "300", "h": "248",
# "angle": "0°"},
# {"code": "BFX1101VBPT004", "type": "248x248", "x": "1668", "y": "250", "center": "0.296", "w": "248", "h": "248",
# "angle": "90°"},
# {"code": "BFX1101VBPT005", "type": "248x248", "x": "2454", "y": "250", "center": "1.307", "w": "248", "h": "248",
# "angle": "0°"}]
bim_data = [
{"code": "BFX1101VBPT002", "type": "150x149", "x": "717", "y": "250", "center": "0.960", "w": "150", "h": "149",
"angle": ""},
{"code": "BFX1101VBPT003", "type": "300x248", "x": "778", "y": "250", "center": "0.317", "w": "300", "h": "248",
"angle": ""},
{"code": "BFX1101VBPT004", "type": "248x248", "x": "1668", "y": "250", "center": "0.296", "w": "248", "h": "248",
"angle": "90°"}]
MAX_OBJ = 32
class OjectInfo(ctypes.Structure):
_fields_ = [
("id", ctypes.c_int),
("name", ctypes.c_char * 256),
("centerX", ctypes.c_float),
("centerY", ctypes.c_float),
("w", ctypes.c_float),
("h", ctypes.c_float)
]
class OjectsInfo(ctypes.Structure):
_fields_ = [
("count", ctypes.c_int),
("objs", OjectInfo * MAX_OBJ)
]
# 获取消息
def _get_msg_info(code) -> str:
@ -71,6 +114,13 @@ class _SubRoiData_(Structure):
('mVc2D', POINTER(c_float)),
('mVc3D', POINTER(c_float)),
('H', c_float * 9),
# 新增部分
('mInnerLineCenter', c_float * 8), # 4 * 2
('mInnerLineCenter3D', c_float * 12), # 4 * 3
('mInnerCenter', c_float * 2), # 1 * 2
('mInnerCenter3D', c_float * 3), # 1 * 3
('mEdgesLen', c_float * 4), # 4
# 新增字段之后需同步在_copy_record函数中进行添加
]
@ -106,22 +156,94 @@ def _copy_record(record):
if i > 15: # 超过16个ROI返回
break
subRois = record.subRois[i]
newSubRois = new_record.subRois[i]
newSubRois.isGood = subRois.isGood
newSubRois.mInfo = subRois.mInfo
for j in range(8):
newSubRois.mRoiPoints[j] = subRois.mRoiPoints[j]
newSubRois.mInnerConners[j] = subRois.mInnerConners[j]
newSubRois.mInnerLineCenter[j] = subRois.mInnerLineCenter[j]
for j in range(12):
newSubRois.mInnerConners3D[j] = subRois.mInnerConners3D[j]
newSubRois.mInnerLineCenter3D[j] = subRois.mInnerLineCenter3D[j]
for j in range(4):
newSubRois.mEdgesLen[j] = subRois.mEdgesLen[j]
for j in range(2):
newSubRois.mInnerCenter[j] = subRois.mInnerCenter[j]
for j in range(3):
newSubRois.mInnerCenter3D[j] = subRois.mInnerCenter3D[j]
return new_record
# def init_callback():
# global data_callback, bStop
#
# def _callback(data):
# print("_callback回调执行")
# global g_msg_cache, g_frame_cache, g_running_mode, bStop
# record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
# frame = None
# ret = SYS_OK
# msg = None
#
# # 获取数据
# data_type = CallbackInfo
# data_ptr = cast(data, POINTER(data_type))
# ckinfoCache = data_ptr.contents
# record = _copy_record(ckinfoCache)
# print(str(g_running_mode) + " : " + str(record.code) + " : " + bytes.decode(record.errorInfo))
# # 当定位模式下,接收到【定位完成】消息
# if g_running_mode == RUNNING_MODE_LOCATION and record.code == (6 + TO_CLIENT_NOTIFY_BASE):
# if len(g_frame_cache) == 0:
# print("当前没有广角数据")
# return
# frame = g_frame_cache.pop() # 从图像缓存中去除图像
# g_frame_cache = [] # 清空缓存,避免无限增长
# msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), frame)
# # 当检测模式下,接收到【拍照完成】消息
# elif record.code == (8 + TO_CLIENT_NOTIFY_BASE):
# bStop = True
# time.sleep(1)
# elif record.code == (12 + TO_CLIENT_NOTIFY_BASE):
# msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), None)
# detectionResProcessing(msg,data_img_path,data_detect_res_save_path)
# bStop = True
# time.sleep(1)
# # # frame = np.zeros((7000, 9344, 3), np.uint8)
# # # # frame.zeros(9344, 7000, cv2.CV_8UC3)
# # # # src_frame_ptr = c_char_p(frame.data.tobytes())
# # # mv = memoryview(frame.data)
# # # buffer_pointer = cast(mv.obj.ctypes.data, c_void_p)
# # # ret = image_framework_sdk.LibapiGetHQImage(
# # # buffer_pointer, frame.shape[1], frame.shape[0], 3, c_char_p(b"PythonProcessing"))
# # if ret != SYS_OK:
# # print("ret != SYS_OK:" + str(ret))
# # record = CallbackInfo(code=ret, errorInfo=_get_msg_info(ret), bGetData=False)
# # msg = Msg(MSG_DETECTION_RECORD, record, None)
# # g_msg_cache.append(msg)
# # # emit msg
# # return
# # msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), frame)
# # elif g_running_mode == RUNNING_MODE_LOCATION:
# # msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), None)
# # elif g_running_mode == RUNNING_MODE_DETECTION:
# # msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), None)
# # else:
# # print("未知回调")
# # return
#
# data_callback = CallbackType(_callback)
def init_callback():
global data_callback, bStop
global data_callback
def _callback(data):
print("_callback回调执行")
global g_msg_cache, g_frame_cache, g_running_mode, bStop
global g_msg_cache, g_frame_cache, g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
frame = None
ret = SYS_OK
@ -132,46 +254,55 @@ def init_callback():
data_ptr = cast(data, POINTER(data_type))
ckinfoCache = data_ptr.contents
record = _copy_record(ckinfoCache)
print(str(g_running_mode) + " : " + str(record.code) + " : " + bytes.decode(record.errorInfo))
print(f"g_running_mode===={g_running_mode}")
print(f"record===={record.code}")
# print("callback ============ > " + str(record.code) + " mode " + str(g_running_mode))
# print(str(g_running_mode) + " : " + str(record.code) + " : " + bytes.decode(record.errorInfo))
# 当定位模式下,接收到【定位完成】消息
if g_running_mode == RUNNING_MODE_LOCATION and record.code == (6 + TO_CLIENT_NOTIFY_BASE):
if len(g_frame_cache) == 0:
print("当前没有广角数据")
# self.logger.error("当前没有广角数据")
return
frame = g_frame_cache.pop() # 从图像缓存中去除图像
g_frame_cache = [] # 清空缓存,避免无限增长
msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), frame)
msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), frame, None)
# 当检测模式下,接收到【拍照完成】消息
elif record.code == (8 + TO_CLIENT_NOTIFY_BASE):
bStop = True
time.sleep(1)
elif record.code == (12 + TO_CLIENT_NOTIFY_BASE):
msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), None)
detectionResProcessing(msg,data_img_path,data_detect_res_save_path)
bStop = True
time.sleep(1)
# # frame = np.zeros((7000, 9344, 3), np.uint8)
# # # frame.zeros(9344, 7000, cv2.CV_8UC3)
# # # src_frame_ptr = c_char_p(frame.data.tobytes())
# # mv = memoryview(frame.data)
# # buffer_pointer = cast(mv.obj.ctypes.data, c_void_p)
# # ret = image_framework_sdk.LibapiGetHQImage(
# # buffer_pointer, frame.shape[1], frame.shape[0], 3, c_char_p(b"PythonProcessing"))
# if ret != SYS_OK:
# print("ret != SYS_OK:" + str(ret))
# record = CallbackInfo(code=ret, errorInfo=_get_msg_info(ret), bGetData=False)
# msg = Msg(MSG_DETECTION_RECORD, record, None)
# g_msg_cache.append(msg)
# # emit msg
# return
# msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), frame)
# elif g_running_mode == RUNNING_MODE_LOCATION:
# msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), None)
# elif g_running_mode == RUNNING_MODE_DETECTION:
# msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), None)
# else:
# print("未知回调")
# return
elif (g_running_mode == RUNNING_MODE_DETECTION and record.code == (2 + TO_CLIENT_NOTIFY_BASE)) or \
(g_running_mode == RUNNING_MODE_DETECTION and record.code == (8 + TO_CLIENT_NOTIFY_BASE)) or \
(g_running_mode == RUNNING_MODE_DETECTION and record.code == (12 + TO_CLIENT_NOTIFY_BASE)):
frame = np.zeros((7000, 9344, 3), np.uint8)
print("1")
# frame.zeros(9344, 7000, cv2.CV_8UC3)
# src_frame_ptr = c_char_p(frame.data.tobytes())
mv = memoryview(frame.data)
buffer_pointer = cast(mv.obj.ctypes.data, c_void_p)
print("12")
# ret = self.image_framework_sdk.LibapiGetHQImage(
# buffer_pointer, frame.shape[1], frame.shape[0], 3, c_char_p(b"PythonProcessing"))
print("13")
if ret != SYS_OK:
print("ret != SYS_OK:" + str(ret))
record = CallbackInfo(code=ret, errorInfo=_get_msg_info(ret), bGetData=False)
msg = Msg(MSG_DETECTION_RECORD, record, None)
print("14")
g_msg_cache.append(msg)
# emit msg
return
msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), frame)
elif g_running_mode == RUNNING_MODE_LOCATION:
# msg = Msg(MSG_LOCATION_RECORD, _copy_record(record), None, None)
print("")
elif g_running_mode == RUNNING_MODE_DETECTION:
# msg = Msg(MSG_DETECTION_RECORD, _copy_record(record), None, None)
print("")
else:
print("未知回调")
return
print("15")
g_msg_cache.append(msg)
# self.logger.info("image framework callback done.")
data_callback = CallbackType(_callback)
@ -188,10 +319,25 @@ def init_image_framework_sdk():
print(f"Load Image framework sdk failed: {str(e)}")
def adjust():
global image_framework_sdk
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
ret = image_framework_sdk.LibapiStartDetection()
# 创建一个 OjectsInfo 实例
info = OjectsInfo()
for idx, bim in enumerate(bim_data):
print(bim["center"])
info.objs[idx].id = idx
info.objs[idx].name = bim["code"].encode("utf-8")
info.objs[idx].centerX = float(bim["x"]) / 1000
info.objs[idx].centerY = float(bim["center"])
info.objs[idx].w = float(bim["w"]) / 1000
info.objs[idx].h = float(bim["h"]) / 1000
ret = image_framework_sdk.LibapiStartDetectionV2(ctypes.byref(info))
if ret != SYS_OK:
print(f"image_framework_sdk.LibapiStartDetection():执行失败 :{ret}")
return record
@ -203,19 +349,23 @@ def check():
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
ret = image_framework_sdk.LibapiContinuetDetection("{}")
print("返回值:", ret)
if ret != SYS_OK:
print("image_framework_sdk.LibapiContinuetDetection执行失败")
return record
def start():
global g_running_mode
global bStop
g_running_mode = RUNNING_MODE_DETECTION
# 开始检测
logger.debug("开始adjust")
adjust()
while not bStop:
logger.debug("adjust等待回调")
time.sleep(1)
# while not bStop:
# logger.debug("adjust等待回调")
# time.sleep(1)
time.sleep(5)
logger.debug("开始check")
check()
bStop = False
@ -224,7 +374,34 @@ def start():
time.sleep(1)
def _main_processing_thread():
global g_msg_cache, g_running_mode
record = CallbackInfo(code=SYS_OK, errorInfo=b"", bGetData=False)
times = 0
while True:
if len(g_msg_cache) <= 0:
times += 1
time.sleep(0.01)
# if times % 200 == 0:
# print("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%")
continue
msg = g_msg_cache.pop()
# g_msg_cache = []
# print("===============> start" + str(msg.record.code))
if msg is None:
continue
msg = imgProcess.process(msg)
# print("===============> end" + str(msg.record.code))
# self.logger.info("msg ==> ui")
# self.signals.on_image_detect_result.emit(msg)
if __name__ == '__main__':
process_thread = threading.Thread(target=_main_processing_thread)
process_thread.daemon = True
process_thread.start()
# 加载驱动
init_image_framework_sdk()
if image_framework_sdk is None:
@ -236,10 +413,9 @@ if __name__ == '__main__':
if ret != 0:
raise RuntimeError(f"设置回调函数始设置失败, 错误码: {ret}")
dataPath = "./result" # 数据路径
data_img_path = None # 检测的图片路径
data_detect_res_save_path = None # 检测结果的保存路径
dataPath = "./result2" # 数据路径
data_img_path = None # 检测的图片路径
data_detect_res_save_path = None # 检测结果的保存路径
# 遍历目录
for root, dirs, files in os.walk(dataPath):
for dir in dirs:

View File

@ -1,4 +1,4 @@
name: ymj_test
name: ymj_data_collect
channels:
- defaults
- https://repo.anaconda.com/pkgs/main
@ -57,4 +57,3 @@ dependencies:
- typing-extensions==4.12.2
- tzdata==2025.1
- zipp==3.21.0
prefix: C:\Users\leon\miniconda3\envs\ymj_test

BIN
image_framework-back.dll Normal file

Binary file not shown.

Binary file not shown.

View File

@ -121,27 +121,158 @@ def doDetectionProcessing(msg):
print("================== cv2.imwrite success =================")
if record.code == 12 + TO_CLIENT_NOTIFY_BASE:
for j in range(record.roi_size):
roi = record.subRois[j]
if not roi.isGood: continue
ptsInnerConners = ptsInnerConnerslist[j]
ptsInnerConners3D = ptsInnerConners3Dlist[j]
if len(ptsInnerConners) == 4:
for i in range(4):
p3d = ptsInnerConners3D[i]
p3d_next = ptsInnerConners3D[(i + 1) % 4]
p2d = ptsInnerConners[i]
p2d_next = ptsInnerConners[(i + 1) % 4]
edge = np.sqrt(pow((p3d[0] - p3d_next[0]), 2)
+ pow((p3d[1] - p3d_next[1]), 2)
+ pow((p3d[2] - p3d_next[2]), 2))
cv2.putText(im, str(edge)[:6], \
(int((ptsInnerConners[0][0] + ptsInnerConners[(0 + 1) % 4][0]) / 2), \
int((ptsInnerConners[0][1] + ptsInnerConners[(0 + 1) % 4][1]) / 2 + i * 20)), \
cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (255, 0, 0), 2)
bim_result = []
for i in range(record.roi_size):
roi2 = record.subRois[i]
if not roi2.isGood: continue
mInfo = roi2.mInfo.decode('utf-8')
mInnerConners3D = list(roi2.mInnerConners3D)
mInnerLineCenter = list(roi2.mInnerLineCenter)
mInnerLineCenter3D = list(roi2.mInnerLineCenter3D)
mInnerCenter = list(roi2.mInnerCenter)
mInnerCenter3D = list(roi2.mInnerCenter3D)
mEdgesLen = list(roi2.mEdgesLen)
print(f"mEdgesLen={mEdgesLen}")
print(f"mInfo={mInfo}")
# 单位换算
for i in range(3):
mInnerCenter3D[i] = mInnerCenter3D[i] * 1000
mInnerCenter3D[i] = round(mInnerCenter3D[i], 2)
for i in range(12):
# 米 单位 换算成毫米。# 留下小数
mInnerConners3D[i] = mInnerConners3D[i] * 1000
mInnerConners3D[i] = round(mInnerConners3D[i], 2)
mInnerLineCenter3D[i] = mInnerLineCenter3D[i] * 1000
mInnerLineCenter3D[i] = round(mInnerLineCenter3D[i], 2)
# mEdgesLen换算
for i in range(4):
# 米 单位 换算成毫米
mEdgesLen[i] = mEdgesLen[i] * 1000
# 留下小数
mEdgesLen[i] = round(mEdgesLen[i], 2)
# 从bim数据数组中获取code等于mInfo的元素
# this_bim_data = None
# for bim_data_item in bim_data:
# if str(bim_data_item["code"]) == mInfo:
# this_bim_data = bim_data_item
# break
# if this_bim_data is None:
# print(f"🟡返回的值为mInfo={mInfo},无法在bim数据中找到code相同的")
# continue
# else:
# print(f"🟢{mInfo}预埋件匹配成功")
# 临时测试
bim_data = [
{'code': 'BFX1101VBPT003', 'type': '300x248', 'x': '778', 'y': '250', 'center': '0.317', 'w': '300',
'h': '248', 'angle': '', 'm_x': '0', 'm_y': '0', 'm_z': '0', 'base': '3'}]
this_bim_data = bim_data[0]
print(f"🟢{mInfo}预埋件匹配成功")
# bim的长度 和 计算出来的长度进行计算,取优的值
bim_data_width = float(this_bim_data["w"])
bim_data_height = float(this_bim_data["h"])
print(f"bim_data_width:{bim_data_width}")
print(f"bim_data_height:{bim_data_height}")
# 上右下左
cal_up_width = mEdgesLen[0] # 上边宽度
cal_down_width = mEdgesLen[2] # 下边宽度
cal_left_height = mEdgesLen[3] # 左边高度
cal_right_height = mEdgesLen[1] # 右边宽度
print("mEdgesLen:==", mEdgesLen)
# 取差值最小width 和 height的值
if abs(cal_up_width - bim_data_width) < abs(cal_down_width - bim_data_width):
best_cal_width = cal_up_width
else:
best_cal_width = cal_down_width
if abs(cal_left_height - bim_data_height) < abs(cal_right_height - bim_data_height):
best_cal_height = cal_left_height
else:
best_cal_height = cal_right_height
print("best_cal_width:==", best_cal_width)
print("best_cal_height:==", best_cal_height)
# 画到图上
cv2.putText(im, "w:" + str(best_cal_width), \
(int((ptsInnerConners[0][0] + ptsInnerConners[(0 + 1) % 4][0]) / 2), \
int((ptsInnerConners[0][1] + ptsInnerConners[(0 + 1) % 4][1]) / 2 + 0 * 20)), \
cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (255, 0, 0), 2)
cv2.putText(im, "h:" + str(best_cal_height), \
(int((ptsInnerConners[0][0] + ptsInnerConners[(0 + 1) % 4][0]) / 2), \
int((ptsInnerConners[0][1] + ptsInnerConners[(0 + 1) % 4][1]) / 2 + 1 * 20)), \
cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (255, 0, 0), 2)
# 预埋件基础数据
dict = this_bim_data
# 四个点的3d坐标
dict["x1"] = mInnerConners3D[0]
dict["y1"] = mInnerConners3D[1]
dict["z1"] = mInnerConners3D[2]
dict["x2"] = mInnerConners3D[3]
dict["y2"] = mInnerConners3D[4]
dict["z2"] = mInnerConners3D[5]
dict["x3"] = mInnerConners3D[6]
dict["y3"] = mInnerConners3D[7]
dict["z3"] = mInnerConners3D[8]
dict["x4"] = mInnerConners3D[9]
dict["y4"] = mInnerConners3D[10]
dict["z4"] = mInnerConners3D[11]
# 四条边中各自中点的3d坐标
dict["top_line_center_x"] = mInnerLineCenter3D[0]
dict["top_line_center_y"] = mInnerLineCenter3D[1]
dict["top_line_center_z"] = mInnerLineCenter3D[2]
dict["right_line_center_x"] = mInnerLineCenter3D[3]
dict["right_line_center_y"] = mInnerLineCenter3D[4]
dict["right_line_center_z"] = mInnerLineCenter3D[5]
dict["bottom_line_center_x"] = mInnerLineCenter3D[6]
dict["bottom_line_center_y"] = mInnerLineCenter3D[7]
dict["bottom_line_center_z"] = mInnerLineCenter3D[8]
dict["left_line_center_x"] = mInnerLineCenter3D[9]
dict["left_line_center_y"] = mInnerLineCenter3D[10]
dict["left_line_center_z"] = mInnerLineCenter3D[11]
# 计算出的宽度和高度
dict["best_cal_width"] = best_cal_width
dict["best_cal_height"] = best_cal_height
# 中心点3d坐标
dict["x_center"] = mInnerCenter3D[0]
dict["y_center"] = mInnerCenter3D[1]
dict["z_center"] = mInnerCenter3D[2]
dict["actual_value"] = ""
dict["status"] = "good"
dict["base"] = "5"
# dict["x_center"] = mInnerConners3D[0]
# dict["y_center"] = mInnerConners3D[1]
# dict["z_center"] = mInnerConners3D[2]
bim_result.append(dict)
print(f"bim_result🟠={bim_result}")
# for j in range(record.roi_size):
# roi = record.subRois[j]
# if not roi.isGood: continue
# ptsInnerConners = ptsInnerConnerslist[j]
# ptsInnerConners3D = ptsInnerConners3Dlist[j]
# if len(ptsInnerConners) == 4:
# for i in range(4):
# p3d = ptsInnerConners3D[i]
# p3d_next = ptsInnerConners3D[(i + 1) % 4]
# p2d = ptsInnerConners[i]
# p2d_next = ptsInnerConners[(i + 1) % 4]
# edge = np.sqrt(pow((p3d[0] - p3d_next[0]), 2)
# + pow((p3d[1] - p3d_next[1]), 2)
# + pow((p3d[2] - p3d_next[2]), 2))
# cv2.putText(im, str(edge)[:6], \
# (int((ptsInnerConners[0][0] + ptsInnerConners[(0 + 1) % 4][0]) / 2), \
# int((ptsInnerConners[0][1] + ptsInnerConners[(0 + 1) % 4][1]) / 2 + i * 20)), \
# cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (255, 0, 0), 2)
print("================== draw =================")
msg.im2D = im
cv2.imshow("HQImage", im)
# cv2.imshow("HQImage", im)
# cv2.waitKey(1)
return msg
@ -210,6 +341,11 @@ def detectionResProcessing(msg,data_img_path,data_detect_res_save_path):
logger.success("================== cv2.imwrite result.jpg success =================")
if record.code == 12 + TO_CLIENT_NOTIFY_BASE:
for i in range(record.roi_size):
roi2 = record.subRois[i]
if not roi2.isGood: continue
mInfo = roi2.mInfo
print(f"mInfo====={mInfo}")
for j in range(record.roi_size):
roi = record.subRois[j]
# if not roi.isGood: continue

BIN
result2/test2/output.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.2 MiB

BIN
result2/test2/output.ply Normal file

Binary file not shown.

View File

@ -0,0 +1,24 @@
2373
866
3068
827
3096
1505
2407
1533
2452
3809
3839
3674
3959
4832
2548
4966
6562
3602
7714
3634
7693
4794
6536
4767

BIN
result2/test_3/output.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 MiB

BIN
result2/test_3/output.ply Normal file

Binary file not shown.

View File

@ -0,0 +1,26 @@
2269
754
3219
754
3219
1641
2269
1633
2306
3544
4109
3544
4109
5135
2311
5140
6363
3463
7873
3468
7882
4910
6350
4897

View File

@ -1,64 +1,32 @@
6911
3175
7935
3175
7935
4142
6911
4142
5578
3808
6623
3808
6623
4761
5578
4761
2677
5089
5214
5089
5214
6179
2677
6179
2613
2198
4142
2198
4142
3607
2613
3607
73
1746
1314
1746
1314
2799
73
2799
52
3066
1092
3066
1092
3997
52
3997
4815
1805
5951
1805
5951
2849
4815
2849
6774
1829
7896
1829
7896
2869
6774
2869
5708
4469
6930
4469
6930
5660
5708
5660
5
1293
1078
1293
1078
2349
5
2349
2551
999
3662
999
3662
2054
2551
2054
4666
1307
5807
1307
5807
2406
4666
2406

BIN
u2metp_ymj_small.onnx Normal file

Binary file not shown.