calibration_tools_v1.0/calibration_mian.py
2025-02-20 10:45:17 +08:00

535 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -- coding: utf-8 --
import aurco_3d_detect as aurco3d
import cloud_3d_detect as cloud3d
import svd
import utils
import os
import sys
from utils import Log
from config import *
CAMERA_FILE = os.path.dirname(__file__) + "\\samples\\1\\output.png"
LIDAR_FILE = os.path.dirname(__file__) + "\\samples\\1\\output.ply"
sys.path.append(r"./")
_dict = [
"LEFT",
"TOP",
"RIGHT",
"BOTTOM",
]
def do_calibration_with_one_sample(camera_file_path = CAMERA_FILE, lidar_file_path = LIDAR_FILE):
global CAMERA_FILE, LIDAR_FILE
if CAMERA_FILE != camera_file_path :
CAMERA_FILE = camera_file_path
if LIDAR_FILE != lidar_file_path :
LIDAR_FILE = lidar_file_path
frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners(CAMERA_FILE)
# 分割标定板
marks_pcd = cloud3d.seg_mark_cloud(LIDAR_FILE, kk, show=False)
if len(marks_pcd) != MARK_COUNT:
return -1, None, None
pcd_marks_conners = cloud3d.cal_marks_3d_conners(show=False)
pcd_marks_conners_new = []
for i in range(4):
tmmp = []
for j in range(4):
tmmp.append(pcd_marks_conners[i].points[j])
pcd_marks_conners_new.append(tmmp)
# 排除异常点
__exclude = []
while True:
__max = 0
__min = 10
__mean = 0
__cnt = 0
__conner_mean = 0
__conner_cnt = 0
__conners_mean_distance = []
for i in range(16):
src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4]
src_cloud_mark_conner = pcd_marks_conners_new[int(i/4)][i%4]
for j in range(16):
if i in __exclude or j in __exclude:
continue
dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4]
dst_cloud_mark_conner = pcd_marks_conners_new[int(j/4)][j%4]
aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner)
lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner)
diff_distance = abs(aurco_distance - lidar_distance)
if diff_distance > __max:
__max = diff_distance
if diff_distance < __min:
__min = diff_distance
__cnt += 1
__mean += diff_distance
__conner_cnt += 1
__conner_mean += diff_distance
# print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4]
# + " = " + str(diff_distance))
if __conner_cnt == 0: tmp=0
else : tmp = __conner_mean/__conner_cnt
__conners_mean_distance.append(tmp)
__conner_cnt = 0
__conner_mean = 0
__mean = __mean/__cnt
Log.info("16点对比结果")
print("__max:" + str(__max))
print("__min:" + str(__min))
print("__cnt:" + str(__cnt))
print("__mean:" + str(__mean))
print("__conners_mean_distance:")
# for test
# for i in range(len(__conners_mean_distance)):
# print(__conners_mean_distance[i])
max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0]
if len(__exclude) < conf_max_exclude_num:
__exclude.append(max_index)
else:
break
# 初始化SVD数据
list_marks_aurco = []
list_marks_pcd = []
for i in range(16):
# for test
print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners_new[int(i/4)][i%4][0],
aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners_new[int(i/4)][i%4][1],
aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners_new[int(i/4)][i%4][2])
if i in __exclude:
continue
list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4])
list_marks_pcd.append(pcd_marks_conners_new[int(i/4)][i%4])
# SVD 分解得到 RT
_R, _t, _ ,_ = svd.scipy_svd(np.asarray(list_marks_pcd), np.asarray(list_marks_aurco))
# _r, _p, _y = utils.rotation_matrix_to_euler_angles(_R)
_r, _p, _y = utils.rotation_matrix_to_rpy(_R)
print(_R)
print(_t)
print(_r, _p, _y)
return 0, [_t[0],_t[1],_t[2]], [_r, _p, _y], __mean
g_marks_pcd = []
def step1_get_aruco_and_lidar_marks_and_conners(
camera_file_path = CAMERA_FILE,
lidar_file_path = LIDAR_FILE,
times = 1,
simple=True):
global CAMERA_FILE, LIDAR_FILE, g_marks_pcd
if CAMERA_FILE != camera_file_path :
CAMERA_FILE = camera_file_path
if LIDAR_FILE != lidar_file_path :
LIDAR_FILE = lidar_file_path
frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners(CAMERA_FILE)
if simple:
# 分割标定板
if times == 1: # 第一次执行时分割
g_marks_pcd = cloud3d.seg_mark_cloud_simple(LIDAR_FILE, kk, show=False)
if len(g_marks_pcd) != MARK_COUNT:
return None, None
pcd_marks_conners = cloud3d.cal_marks_3d_conners_simple(g_marks_pcd, show=False)
else:
# 分割标定板
if times == 1: # 第一次执行时分割
g_marks_pcd = cloud3d.seg_mark_cloud(LIDAR_FILE, kk, show=False)
if len(g_marks_pcd) != MARK_COUNT:
return None, None
pcd_marks_conners = cloud3d.cal_marks_3d_conners(show=False)
pcd_marks_conners_new = []
for i in range(4):
tmmp = []
for j in range(4):
tmmp.append(pcd_marks_conners[i].points[j])
pcd_marks_conners_new.append(tmmp)
return aurco_marks_conners, pcd_marks_conners_new
def step1_get_aruco_and_lidar_marks_and_conners_v2(
camera_file_path = CAMERA_FILE,
lidar_file_path = LIDAR_FILE,
times = 1,
simple=True):
global CAMERA_FILE, LIDAR_FILE, g_marks_pcd
if CAMERA_FILE != camera_file_path :
CAMERA_FILE = camera_file_path
if LIDAR_FILE != lidar_file_path :
LIDAR_FILE = lidar_file_path
frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners_v2(CAMERA_FILE)
frame, pcd_marks_conners = aurco3d.cal_lidar_marks_3d_conner_v2(LIDAR_FILE)
pcd_marks_conners_new = []
for i in range(4):
tmmp = []
for j in range(4):
tmmp.append(pcd_marks_conners[i][j])
pcd_marks_conners_new.append(tmmp)
return aurco_marks_conners, pcd_marks_conners_new
def step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners):
# 排除异常点
__exclude = []
while True:
__max = 0
__min = 10
__mean = 0
__cnt = 0
__conner_mean = 0
__conner_cnt = 0
__conners_mean_distance = []
for i in range(16):
src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4]
src_cloud_mark_conner = pcd_marks_conners[int(i/4)][i%4]
for j in range(16):
if i in __exclude or j in __exclude:
continue
dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4]
dst_cloud_mark_conner = pcd_marks_conners[int(j/4)][j%4]
aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner)
lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner)
diff_distance = abs(aurco_distance - lidar_distance)
if diff_distance > __max:
__max = diff_distance
if diff_distance < __min:
__min = diff_distance
__cnt += 1
__mean += diff_distance
__conner_cnt += 1
__conner_mean += diff_distance
# print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4]
# + " = " + str(diff_distance))
if __conner_cnt == 0: tmp=0
else : tmp = __conner_mean/__conner_cnt
__conners_mean_distance.append(tmp)
__conner_cnt = 0
__conner_mean = 0
__mean = __mean/__cnt
Log.info("16点对比结果")
print("__max:" + str(__max))
print("__min:" + str(__min))
print("__cnt:" + str(__cnt))
print("__mean:" + str(__mean))
print("__conners_mean_distance:")
# for test
# for i in range(len(__conners_mean_distance)):
# print(__conners_mean_distance[i])
max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0]
if len(__exclude) < conf_max_exclude_num:
__exclude.append(max_index)
else:
break
# 初始化SVD数据
list_marks_aurco = []
list_marks_pcd = []
for i in range(16):
# for test
print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners[int(i/4)][i%4][0],
aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners[int(i/4)][i%4][1],
aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners[int(i/4)][i%4][2])
if i in __exclude:
continue
list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4])
list_marks_pcd.append(pcd_marks_conners[int(i/4)][i%4])
return list_marks_aurco, list_marks_pcd, __mean
def step2_exclude_bad_conners_v2(aurco_marks_conners, pcd_marks_conners):
# 排除异常点
__exclude = []
while True:
__max = 0
__min = 10
__mean = 0
__cnt = 0
__conner_mean = 0
__conner_cnt = 0
__conners_mean_distance = []
for i in range(16):
src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4]
src_cloud_mark_conner = pcd_marks_conners[int(i/4)][i%4]
for j in range(16):
if i in __exclude or j in __exclude:
continue
dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4]
dst_cloud_mark_conner = pcd_marks_conners[int(j/4)][j%4]
aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner)
lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner)
diff_distance = abs(aurco_distance - lidar_distance)
if diff_distance > __max:
__max = diff_distance
if diff_distance < __min:
__min = diff_distance
__cnt += 1
__mean += diff_distance
__conner_cnt += 1
__conner_mean += diff_distance
# print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4]
# + " = " + str(diff_distance))
if __conner_cnt == 0: tmp=0
else : tmp = __conner_mean/__conner_cnt
__conners_mean_distance.append(tmp)
__conner_cnt = 0
__conner_mean = 0
__mean = __mean/__cnt
Log.info("16点对比结果")
print("__max:" + str(__max))
print("__min:" + str(__min))
print("__cnt:" + str(__cnt))
print("__mean:" + str(__mean))
print("__conners_mean_distance:")
# for test
# for i in range(len(__conners_mean_distance)):
# print(__conners_mean_distance[i])
max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0]
if len(__exclude) < conf_max_exclude_num:
__exclude.append(max_index)
else:
break
# 初始化SVD数据
list_marks_aurco = []
list_marks_pcd = []
Log.info("16点对比结果")
for i in range(16):
# for test
print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners[int(i/4)][i%4][0],
aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners[int(i/4)][i%4][1],
aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners[int(i/4)][i%4][2])
list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4])
list_marks_pcd.append(pcd_marks_conners[int(i/4)][i%4])
return list_marks_aurco, list_marks_pcd, __mean
import open3d as o3d
def create_spheres(points, sizes=None, color=[0, 0.706, 1]):
"""
创建一系列球体以替代点云中的点,允许每个点有不同的大小。
参数:
points: N x 3 数组表示N个点的位置。
sizes: N维数组或列表表示对应点的半径大小。如果为None则所有球体大小相同。
color: RGB颜色值默认是橙色。
返回:
spheres: 包含所有球体的Open3D几何对象列表。
"""
spheres = []
if sizes is None:
sizes = [0.006] * len(points) # 如果没有提供大小,则默认所有球体大小相同
for i, point in enumerate(points):
sphere = o3d.geometry.TriangleMesh.create_sphere(radius=sizes[i])
sphere.translate(point) # 将球体移动到点的位置
sphere.paint_uniform_color(color)
spheres.append(sphere)
return spheres
def step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners):
# SVD 分解得到 RT
array_aurco_marks_conners = np.asarray(aurco_marks_conners)
array_pcd_marks_conners = np.asarray(pcd_marks_conners)
_R, _t, _ ,_ = svd.scipy_svd(array_pcd_marks_conners, array_aurco_marks_conners)
# _r, _p, _y = utils.rotation_matrix_to_euler_angles(_R)
_r, _p, _y = utils.rotation_matrix_to_rpy(_R)
print(_R)
print(_t)
print(_r, _p, _y)
# for test
_pdc_rt = (_R @ np.asarray(array_pcd_marks_conners).T).T + _t
if True:
# 角点显示的圆球半径,单位(米)
spheres_2d = create_spheres(array_aurco_marks_conners[:16], None, color=[0, 0.706, 1])
spheres_3d = create_spheres(_pdc_rt[:16], None, color=[0, 0, 0])
points_list = []
for i in range(len(array_aurco_marks_conners[:16])):
points_list.append(spheres_2d[i])
points_list.append(spheres_3d[i])
# o3d.visualization.draw_geometries(points_list)
return 0, [_t[0],_t[1],_t[2]], [_r, _p, _y]
def calibration_by_one_samples_points():
calibration_result = []
bad_samples = []
samples_path_root = os.path.dirname(__file__) + "\\samples\\calibration"
# 标定使用的样本数量
samples_size = 1
# 每个样本执行的次数
times_per_one_sample = 2
for subdir in utils.list_subdirs(samples_path_root):
paths = utils.traverse_folder(samples_path_root + "\\" + subdir)
if len(paths) != 2:
print("[ERROR]\t Data Error Path : " + paths)
times = 0
while times < times_per_one_sample:
times += 1
ret, _t, _rpy, __mean = do_calibration_with_one_sample(paths[1], paths[0]) # 0 png, 1 ply
if ret != 0:
bad_samples.append(subdir)
continue
# 合并得到最终得旋转角
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy])
samples_size -= 1
if samples_size == 0:
break
for i in range(len(calibration_result)):
str_mean = str(calibration_result[i][2])
str_t = str(calibration_result[i][3])
str_ypr = str(calibration_result[i][4])
str_new_rpy = str(calibration_result[i][5])
print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy)
for i in range(len(bad_samples)):
print(bad_samples[i])
utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result)
def calibration_by_all_samples_points(samples_path_root, samples_size=1, times_per_one_sample=2):
calibration_result = []
bad_samples = []
all_points = []
# 定义总体的 aurco_marks_conners, pcd_marks_conners
aurco_marks_conners_all = []
pcd_marks_conners_all = []
for subdir in utils.list_subdirs(samples_path_root):
paths = utils.traverse_folder(samples_path_root + "\\" + subdir)
if len(paths) != 2:
print("[ERROR]\t Data Error Path : " + paths)
times = 0
while times < times_per_one_sample:
times += 1
aurco_marks_conners, pcd_marks_conners = \
step1_get_aruco_and_lidar_marks_and_conners(paths[1], paths[0], times) # 0 png, 1 ply
if aurco_marks_conners is None:
Log.error("subdir is " + subdir + " aurco_marks_conners has ERROR")
break
aurco_marks_conners, pcd_marks_conners, __mean = \
step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners)
_, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners)
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy])
for it in range(len(aurco_marks_conners)):
aurco_marks_conners_all.append(aurco_marks_conners[it].tolist())
pcd_marks_conners_all.append(pcd_marks_conners[it].tolist())
samples_size -= 1
if samples_size == 0:
break
# 将多样本多次计算的综合点统一进行SVD分解
if len(aurco_marks_conners_all) == 0:
return -1, None, None, None
_, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners_all, pcd_marks_conners_all)
#
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append(["all", str(len(aurco_marks_conners_all)) + " & " + str(len(pcd_marks_conners_all)), \
__mean, _t, _rpy, _new_rpy])
for i in range(len(calibration_result)):
str_mean = str(calibration_result[i][2])
str_t = str(calibration_result[i][3])
str_ypr = str(calibration_result[i][4])
str_new_rpy = str(calibration_result[i][5])
print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy)
for i in range(len(bad_samples)):
print(bad_samples[i])
utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result)
all_points.append(aurco_marks_conners_all)
all_points.append(pcd_marks_conners_all)
utils.save_to_json(os.path.dirname(__file__) + '\\record\\points.json', all_points)
return 0, _t, _rpy, __mean
def calibration_by_all_samples_points_v2(samples_path_root, samples_size=1, times_per_one_sample=2):
calibration_result = []
bad_samples = []
all_points = []
# 定义总体的 aurco_marks_conners, pcd_marks_conners
aurco_marks_conners_all = []
pcd_marks_conners_all = []
for subdir in utils.list_subdirs(samples_path_root):
paths = utils.traverse_folder(samples_path_root + "\\" + subdir)
if len(paths) != 2:
print("[ERROR]\t Data Error Path : " + paths)
times = 0
while times < times_per_one_sample:
times += 1
aurco_marks_conners, pcd_marks_conners = \
step1_get_aruco_and_lidar_marks_and_conners_v2(paths[1], paths[0], times) # 0 png, 1 ply
if aurco_marks_conners is None:
Log.error("subdir is " + subdir + " aurco_marks_conners has ERROR")
break
aurco_marks_conners, pcd_marks_conners, __mean = \
step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners)
_, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners)
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy])
for it in range(len(aurco_marks_conners)):
aurco_marks_conners_all.append(aurco_marks_conners[it].tolist())
pcd_marks_conners_all.append(pcd_marks_conners[it].tolist())
samples_size -= 1
if samples_size == 0:
break
# 将多样本多次计算的综合点统一进行SVD分解
if len(aurco_marks_conners_all) == 0:
return -1, None, None, None
_, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners_all, pcd_marks_conners_all)
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append(["all", str(len(aurco_marks_conners_all)) + " & " + str(len(pcd_marks_conners_all)), \
__mean, _t, _rpy, _new_rpy])
for i in range(len(calibration_result)):
str_mean = str(calibration_result[i][2])
str_t = str(calibration_result[i][3])
str_ypr = str(calibration_result[i][4])
str_new_rpy = str(calibration_result[i][5])
print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy)
for i in range(len(bad_samples)):
print(bad_samples[i])
utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result)
all_points.append(aurco_marks_conners_all)
all_points.append(pcd_marks_conners_all)
utils.save_to_json(os.path.dirname(__file__) + '\\record\\points.json', all_points)
return 0, _t, _rpy, __mean
if __name__ == '__main__':
# calibration_by_one_samples_points()
samples_size = 5
times_per_one_sample = 1
# samples_path_root = os.path.dirname(__file__) + "\\ssh_tmp"
samples_path_root = os.path.dirname(__file__) + "\\samples\\20250214"
ret, _t, _rpy, __mean = calibration_by_all_samples_points(samples_path_root, samples_size, times_per_one_sample)
# ret, _t, _rpy, __mean = calibration_by_all_samples_points_v2(samples_path_root, samples_size, times_per_one_sample)
# rt = utils.read_from_json(os.path.dirname(__file__) + '\\record\\record.json')
# print(rt)