calibration_tools_v1.0/calibration_mian.py

535 lines
22 KiB
Python
Raw Permalink Normal View History

2025-02-20 10:45:17 +08:00
# -- coding: utf-8 --
import aurco_3d_detect as aurco3d
import cloud_3d_detect as cloud3d
import svd
import utils
import os
import sys
from utils import Log
from config import *
CAMERA_FILE = os.path.dirname(__file__) + "\\samples\\1\\output.png"
LIDAR_FILE = os.path.dirname(__file__) + "\\samples\\1\\output.ply"
sys.path.append(r"./")
_dict = [
"LEFT",
"TOP",
"RIGHT",
"BOTTOM",
]
def do_calibration_with_one_sample(camera_file_path = CAMERA_FILE, lidar_file_path = LIDAR_FILE):
global CAMERA_FILE, LIDAR_FILE
if CAMERA_FILE != camera_file_path :
CAMERA_FILE = camera_file_path
if LIDAR_FILE != lidar_file_path :
LIDAR_FILE = lidar_file_path
frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners(CAMERA_FILE)
# 分割标定板
marks_pcd = cloud3d.seg_mark_cloud(LIDAR_FILE, kk, show=False)
if len(marks_pcd) != MARK_COUNT:
return -1, None, None
pcd_marks_conners = cloud3d.cal_marks_3d_conners(show=False)
pcd_marks_conners_new = []
for i in range(4):
tmmp = []
for j in range(4):
tmmp.append(pcd_marks_conners[i].points[j])
pcd_marks_conners_new.append(tmmp)
# 排除异常点
__exclude = []
while True:
__max = 0
__min = 10
__mean = 0
__cnt = 0
__conner_mean = 0
__conner_cnt = 0
__conners_mean_distance = []
for i in range(16):
src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4]
src_cloud_mark_conner = pcd_marks_conners_new[int(i/4)][i%4]
for j in range(16):
if i in __exclude or j in __exclude:
continue
dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4]
dst_cloud_mark_conner = pcd_marks_conners_new[int(j/4)][j%4]
aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner)
lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner)
diff_distance = abs(aurco_distance - lidar_distance)
if diff_distance > __max:
__max = diff_distance
if diff_distance < __min:
__min = diff_distance
__cnt += 1
__mean += diff_distance
__conner_cnt += 1
__conner_mean += diff_distance
# print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4]
# + " = " + str(diff_distance))
if __conner_cnt == 0: tmp=0
else : tmp = __conner_mean/__conner_cnt
__conners_mean_distance.append(tmp)
__conner_cnt = 0
__conner_mean = 0
__mean = __mean/__cnt
Log.info("16点对比结果")
print("__max:" + str(__max))
print("__min:" + str(__min))
print("__cnt:" + str(__cnt))
print("__mean:" + str(__mean))
print("__conners_mean_distance:")
# for test
# for i in range(len(__conners_mean_distance)):
# print(__conners_mean_distance[i])
max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0]
if len(__exclude) < conf_max_exclude_num:
__exclude.append(max_index)
else:
break
# 初始化SVD数据
list_marks_aurco = []
list_marks_pcd = []
for i in range(16):
# for test
print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners_new[int(i/4)][i%4][0],
aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners_new[int(i/4)][i%4][1],
aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners_new[int(i/4)][i%4][2])
if i in __exclude:
continue
list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4])
list_marks_pcd.append(pcd_marks_conners_new[int(i/4)][i%4])
# SVD 分解得到 RT
_R, _t, _ ,_ = svd.scipy_svd(np.asarray(list_marks_pcd), np.asarray(list_marks_aurco))
# _r, _p, _y = utils.rotation_matrix_to_euler_angles(_R)
_r, _p, _y = utils.rotation_matrix_to_rpy(_R)
print(_R)
print(_t)
print(_r, _p, _y)
return 0, [_t[0],_t[1],_t[2]], [_r, _p, _y], __mean
g_marks_pcd = []
def step1_get_aruco_and_lidar_marks_and_conners(
camera_file_path = CAMERA_FILE,
lidar_file_path = LIDAR_FILE,
times = 1,
simple=True):
global CAMERA_FILE, LIDAR_FILE, g_marks_pcd
if CAMERA_FILE != camera_file_path :
CAMERA_FILE = camera_file_path
if LIDAR_FILE != lidar_file_path :
LIDAR_FILE = lidar_file_path
frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners(CAMERA_FILE)
if simple:
# 分割标定板
if times == 1: # 第一次执行时分割
g_marks_pcd = cloud3d.seg_mark_cloud_simple(LIDAR_FILE, kk, show=False)
if len(g_marks_pcd) != MARK_COUNT:
return None, None
pcd_marks_conners = cloud3d.cal_marks_3d_conners_simple(g_marks_pcd, show=False)
else:
# 分割标定板
if times == 1: # 第一次执行时分割
g_marks_pcd = cloud3d.seg_mark_cloud(LIDAR_FILE, kk, show=False)
if len(g_marks_pcd) != MARK_COUNT:
return None, None
pcd_marks_conners = cloud3d.cal_marks_3d_conners(show=False)
pcd_marks_conners_new = []
for i in range(4):
tmmp = []
for j in range(4):
tmmp.append(pcd_marks_conners[i].points[j])
pcd_marks_conners_new.append(tmmp)
return aurco_marks_conners, pcd_marks_conners_new
def step1_get_aruco_and_lidar_marks_and_conners_v2(
camera_file_path = CAMERA_FILE,
lidar_file_path = LIDAR_FILE,
times = 1,
simple=True):
global CAMERA_FILE, LIDAR_FILE, g_marks_pcd
if CAMERA_FILE != camera_file_path :
CAMERA_FILE = camera_file_path
if LIDAR_FILE != lidar_file_path :
LIDAR_FILE = lidar_file_path
frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners_v2(CAMERA_FILE)
frame, pcd_marks_conners = aurco3d.cal_lidar_marks_3d_conner_v2(LIDAR_FILE)
pcd_marks_conners_new = []
for i in range(4):
tmmp = []
for j in range(4):
tmmp.append(pcd_marks_conners[i][j])
pcd_marks_conners_new.append(tmmp)
return aurco_marks_conners, pcd_marks_conners_new
def step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners):
# 排除异常点
__exclude = []
while True:
__max = 0
__min = 10
__mean = 0
__cnt = 0
__conner_mean = 0
__conner_cnt = 0
__conners_mean_distance = []
for i in range(16):
src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4]
src_cloud_mark_conner = pcd_marks_conners[int(i/4)][i%4]
for j in range(16):
if i in __exclude or j in __exclude:
continue
dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4]
dst_cloud_mark_conner = pcd_marks_conners[int(j/4)][j%4]
aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner)
lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner)
diff_distance = abs(aurco_distance - lidar_distance)
if diff_distance > __max:
__max = diff_distance
if diff_distance < __min:
__min = diff_distance
__cnt += 1
__mean += diff_distance
__conner_cnt += 1
__conner_mean += diff_distance
# print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4]
# + " = " + str(diff_distance))
if __conner_cnt == 0: tmp=0
else : tmp = __conner_mean/__conner_cnt
__conners_mean_distance.append(tmp)
__conner_cnt = 0
__conner_mean = 0
__mean = __mean/__cnt
Log.info("16点对比结果")
print("__max:" + str(__max))
print("__min:" + str(__min))
print("__cnt:" + str(__cnt))
print("__mean:" + str(__mean))
print("__conners_mean_distance:")
# for test
# for i in range(len(__conners_mean_distance)):
# print(__conners_mean_distance[i])
max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0]
if len(__exclude) < conf_max_exclude_num:
__exclude.append(max_index)
else:
break
# 初始化SVD数据
list_marks_aurco = []
list_marks_pcd = []
for i in range(16):
# for test
print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners[int(i/4)][i%4][0],
aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners[int(i/4)][i%4][1],
aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners[int(i/4)][i%4][2])
if i in __exclude:
continue
list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4])
list_marks_pcd.append(pcd_marks_conners[int(i/4)][i%4])
return list_marks_aurco, list_marks_pcd, __mean
def step2_exclude_bad_conners_v2(aurco_marks_conners, pcd_marks_conners):
# 排除异常点
__exclude = []
while True:
__max = 0
__min = 10
__mean = 0
__cnt = 0
__conner_mean = 0
__conner_cnt = 0
__conners_mean_distance = []
for i in range(16):
src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4]
src_cloud_mark_conner = pcd_marks_conners[int(i/4)][i%4]
for j in range(16):
if i in __exclude or j in __exclude:
continue
dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4]
dst_cloud_mark_conner = pcd_marks_conners[int(j/4)][j%4]
aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner)
lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner)
diff_distance = abs(aurco_distance - lidar_distance)
if diff_distance > __max:
__max = diff_distance
if diff_distance < __min:
__min = diff_distance
__cnt += 1
__mean += diff_distance
__conner_cnt += 1
__conner_mean += diff_distance
# print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4]
# + " = " + str(diff_distance))
if __conner_cnt == 0: tmp=0
else : tmp = __conner_mean/__conner_cnt
__conners_mean_distance.append(tmp)
__conner_cnt = 0
__conner_mean = 0
__mean = __mean/__cnt
Log.info("16点对比结果")
print("__max:" + str(__max))
print("__min:" + str(__min))
print("__cnt:" + str(__cnt))
print("__mean:" + str(__mean))
print("__conners_mean_distance:")
# for test
# for i in range(len(__conners_mean_distance)):
# print(__conners_mean_distance[i])
max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0]
if len(__exclude) < conf_max_exclude_num:
__exclude.append(max_index)
else:
break
# 初始化SVD数据
list_marks_aurco = []
list_marks_pcd = []
Log.info("16点对比结果")
for i in range(16):
# for test
print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners[int(i/4)][i%4][0],
aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners[int(i/4)][i%4][1],
aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners[int(i/4)][i%4][2])
list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4])
list_marks_pcd.append(pcd_marks_conners[int(i/4)][i%4])
return list_marks_aurco, list_marks_pcd, __mean
import open3d as o3d
def create_spheres(points, sizes=None, color=[0, 0.706, 1]):
"""
创建一系列球体以替代点云中的点允许每个点有不同的大小
参数:
points: N x 3 数组表示N个点的位置
sizes: N维数组或列表表示对应点的半径大小如果为None则所有球体大小相同
color: RGB颜色值默认是橙色
返回:
spheres: 包含所有球体的Open3D几何对象列表
"""
spheres = []
if sizes is None:
sizes = [0.006] * len(points) # 如果没有提供大小,则默认所有球体大小相同
for i, point in enumerate(points):
sphere = o3d.geometry.TriangleMesh.create_sphere(radius=sizes[i])
sphere.translate(point) # 将球体移动到点的位置
sphere.paint_uniform_color(color)
spheres.append(sphere)
return spheres
def step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners):
# SVD 分解得到 RT
array_aurco_marks_conners = np.asarray(aurco_marks_conners)
array_pcd_marks_conners = np.asarray(pcd_marks_conners)
_R, _t, _ ,_ = svd.scipy_svd(array_pcd_marks_conners, array_aurco_marks_conners)
# _r, _p, _y = utils.rotation_matrix_to_euler_angles(_R)
_r, _p, _y = utils.rotation_matrix_to_rpy(_R)
print(_R)
print(_t)
print(_r, _p, _y)
# for test
_pdc_rt = (_R @ np.asarray(array_pcd_marks_conners).T).T + _t
if True:
# 角点显示的圆球半径,单位(米)
spheres_2d = create_spheres(array_aurco_marks_conners[:16], None, color=[0, 0.706, 1])
spheres_3d = create_spheres(_pdc_rt[:16], None, color=[0, 0, 0])
points_list = []
for i in range(len(array_aurco_marks_conners[:16])):
points_list.append(spheres_2d[i])
points_list.append(spheres_3d[i])
# o3d.visualization.draw_geometries(points_list)
return 0, [_t[0],_t[1],_t[2]], [_r, _p, _y]
def calibration_by_one_samples_points():
calibration_result = []
bad_samples = []
samples_path_root = os.path.dirname(__file__) + "\\samples\\calibration"
# 标定使用的样本数量
samples_size = 1
# 每个样本执行的次数
times_per_one_sample = 2
for subdir in utils.list_subdirs(samples_path_root):
paths = utils.traverse_folder(samples_path_root + "\\" + subdir)
if len(paths) != 2:
print("[ERROR]\t Data Error Path : " + paths)
times = 0
while times < times_per_one_sample:
times += 1
ret, _t, _rpy, __mean = do_calibration_with_one_sample(paths[1], paths[0]) # 0 png, 1 ply
if ret != 0:
bad_samples.append(subdir)
continue
# 合并得到最终得旋转角
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy])
samples_size -= 1
if samples_size == 0:
break
for i in range(len(calibration_result)):
str_mean = str(calibration_result[i][2])
str_t = str(calibration_result[i][3])
str_ypr = str(calibration_result[i][4])
str_new_rpy = str(calibration_result[i][5])
print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy)
for i in range(len(bad_samples)):
print(bad_samples[i])
utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result)
def calibration_by_all_samples_points(samples_path_root, samples_size=1, times_per_one_sample=2):
calibration_result = []
bad_samples = []
all_points = []
# 定义总体的 aurco_marks_conners, pcd_marks_conners
aurco_marks_conners_all = []
pcd_marks_conners_all = []
for subdir in utils.list_subdirs(samples_path_root):
paths = utils.traverse_folder(samples_path_root + "\\" + subdir)
if len(paths) != 2:
print("[ERROR]\t Data Error Path : " + paths)
times = 0
while times < times_per_one_sample:
times += 1
aurco_marks_conners, pcd_marks_conners = \
step1_get_aruco_and_lidar_marks_and_conners(paths[1], paths[0], times) # 0 png, 1 ply
if aurco_marks_conners is None:
Log.error("subdir is " + subdir + " aurco_marks_conners has ERROR")
break
aurco_marks_conners, pcd_marks_conners, __mean = \
step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners)
_, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners)
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy])
for it in range(len(aurco_marks_conners)):
aurco_marks_conners_all.append(aurco_marks_conners[it].tolist())
pcd_marks_conners_all.append(pcd_marks_conners[it].tolist())
samples_size -= 1
if samples_size == 0:
break
# 将多样本多次计算的综合点统一进行SVD分解
if len(aurco_marks_conners_all) == 0:
return -1, None, None, None
_, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners_all, pcd_marks_conners_all)
#
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append(["all", str(len(aurco_marks_conners_all)) + " & " + str(len(pcd_marks_conners_all)), \
__mean, _t, _rpy, _new_rpy])
for i in range(len(calibration_result)):
str_mean = str(calibration_result[i][2])
str_t = str(calibration_result[i][3])
str_ypr = str(calibration_result[i][4])
str_new_rpy = str(calibration_result[i][5])
print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy)
for i in range(len(bad_samples)):
print(bad_samples[i])
utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result)
all_points.append(aurco_marks_conners_all)
all_points.append(pcd_marks_conners_all)
utils.save_to_json(os.path.dirname(__file__) + '\\record\\points.json', all_points)
return 0, _t, _rpy, __mean
def calibration_by_all_samples_points_v2(samples_path_root, samples_size=1, times_per_one_sample=2):
calibration_result = []
bad_samples = []
all_points = []
# 定义总体的 aurco_marks_conners, pcd_marks_conners
aurco_marks_conners_all = []
pcd_marks_conners_all = []
for subdir in utils.list_subdirs(samples_path_root):
paths = utils.traverse_folder(samples_path_root + "\\" + subdir)
if len(paths) != 2:
print("[ERROR]\t Data Error Path : " + paths)
times = 0
while times < times_per_one_sample:
times += 1
aurco_marks_conners, pcd_marks_conners = \
step1_get_aruco_and_lidar_marks_and_conners_v2(paths[1], paths[0], times) # 0 png, 1 ply
if aurco_marks_conners is None:
Log.error("subdir is " + subdir + " aurco_marks_conners has ERROR")
break
aurco_marks_conners, pcd_marks_conners, __mean = \
step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners)
_, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners)
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy])
for it in range(len(aurco_marks_conners)):
aurco_marks_conners_all.append(aurco_marks_conners[it].tolist())
pcd_marks_conners_all.append(pcd_marks_conners[it].tolist())
samples_size -= 1
if samples_size == 0:
break
# 将多样本多次计算的综合点统一进行SVD分解
if len(aurco_marks_conners_all) == 0:
return -1, None, None, None
_, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners_all, pcd_marks_conners_all)
_new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2])
calibration_result.append(["all", str(len(aurco_marks_conners_all)) + " & " + str(len(pcd_marks_conners_all)), \
__mean, _t, _rpy, _new_rpy])
for i in range(len(calibration_result)):
str_mean = str(calibration_result[i][2])
str_t = str(calibration_result[i][3])
str_ypr = str(calibration_result[i][4])
str_new_rpy = str(calibration_result[i][5])
print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy)
for i in range(len(bad_samples)):
print(bad_samples[i])
utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result)
all_points.append(aurco_marks_conners_all)
all_points.append(pcd_marks_conners_all)
utils.save_to_json(os.path.dirname(__file__) + '\\record\\points.json', all_points)
return 0, _t, _rpy, __mean
if __name__ == '__main__':
# calibration_by_one_samples_points()
samples_size = 5
times_per_one_sample = 1
# samples_path_root = os.path.dirname(__file__) + "\\ssh_tmp"
samples_path_root = os.path.dirname(__file__) + "\\samples\\20250214"
ret, _t, _rpy, __mean = calibration_by_all_samples_points(samples_path_root, samples_size, times_per_one_sample)
# ret, _t, _rpy, __mean = calibration_by_all_samples_points_v2(samples_path_root, samples_size, times_per_one_sample)
# rt = utils.read_from_json(os.path.dirname(__file__) + '\\record\\record.json')
# print(rt)