# -- coding: utf-8 -- import aurco_3d_detect as aurco3d import cloud_3d_detect as cloud3d import svd import utils import os import sys from utils import Log from config import * CAMERA_FILE = os.path.dirname(__file__) + "\\samples\\1\\output.png" LIDAR_FILE = os.path.dirname(__file__) + "\\samples\\1\\output.ply" sys.path.append(r"./") _dict = [ "LEFT", "TOP", "RIGHT", "BOTTOM", ] def do_calibration_with_one_sample(camera_file_path = CAMERA_FILE, lidar_file_path = LIDAR_FILE): global CAMERA_FILE, LIDAR_FILE if CAMERA_FILE != camera_file_path : CAMERA_FILE = camera_file_path if LIDAR_FILE != lidar_file_path : LIDAR_FILE = lidar_file_path frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners(CAMERA_FILE) # 分割标定板 marks_pcd = cloud3d.seg_mark_cloud(LIDAR_FILE, kk, show=False) if len(marks_pcd) != MARK_COUNT: return -1, None, None pcd_marks_conners = cloud3d.cal_marks_3d_conners(show=False) pcd_marks_conners_new = [] for i in range(4): tmmp = [] for j in range(4): tmmp.append(pcd_marks_conners[i].points[j]) pcd_marks_conners_new.append(tmmp) # 排除异常点 __exclude = [] while True: __max = 0 __min = 10 __mean = 0 __cnt = 0 __conner_mean = 0 __conner_cnt = 0 __conners_mean_distance = [] for i in range(16): src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4] src_cloud_mark_conner = pcd_marks_conners_new[int(i/4)][i%4] for j in range(16): if i in __exclude or j in __exclude: continue dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4] dst_cloud_mark_conner = pcd_marks_conners_new[int(j/4)][j%4] aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner) lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner) diff_distance = abs(aurco_distance - lidar_distance) if diff_distance > __max: __max = diff_distance if diff_distance < __min: __min = diff_distance __cnt += 1 __mean += diff_distance __conner_cnt += 1 __conner_mean += diff_distance # print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4] # + " = " + str(diff_distance)) if __conner_cnt == 0: tmp=0 else : tmp = __conner_mean/__conner_cnt __conners_mean_distance.append(tmp) __conner_cnt = 0 __conner_mean = 0 __mean = __mean/__cnt Log.info("16点对比结果") print("__max:" + str(__max)) print("__min:" + str(__min)) print("__cnt:" + str(__cnt)) print("__mean:" + str(__mean)) print("__conners_mean_distance:") # for test # for i in range(len(__conners_mean_distance)): # print(__conners_mean_distance[i]) max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0] if len(__exclude) < conf_max_exclude_num: __exclude.append(max_index) else: break # 初始化SVD数据 list_marks_aurco = [] list_marks_pcd = [] for i in range(16): # for test print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners_new[int(i/4)][i%4][0], aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners_new[int(i/4)][i%4][1], aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners_new[int(i/4)][i%4][2]) if i in __exclude: continue list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4]) list_marks_pcd.append(pcd_marks_conners_new[int(i/4)][i%4]) # SVD 分解得到 RT _R, _t, _ ,_ = svd.scipy_svd(np.asarray(list_marks_pcd), np.asarray(list_marks_aurco)) # _r, _p, _y = utils.rotation_matrix_to_euler_angles(_R) _r, _p, _y = utils.rotation_matrix_to_rpy(_R) print(_R) print(_t) print(_r, _p, _y) return 0, [_t[0],_t[1],_t[2]], [_r, _p, _y], __mean g_marks_pcd = [] def step1_get_aruco_and_lidar_marks_and_conners( camera_file_path = CAMERA_FILE, lidar_file_path = LIDAR_FILE, times = 1, simple=True): global CAMERA_FILE, LIDAR_FILE, g_marks_pcd if CAMERA_FILE != camera_file_path : CAMERA_FILE = camera_file_path if LIDAR_FILE != lidar_file_path : LIDAR_FILE = lidar_file_path frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners(CAMERA_FILE) if simple: # 分割标定板 if times == 1: # 第一次执行时分割 g_marks_pcd = cloud3d.seg_mark_cloud_simple(LIDAR_FILE, kk, show=False) if len(g_marks_pcd) != MARK_COUNT: return None, None pcd_marks_conners = cloud3d.cal_marks_3d_conners_simple(g_marks_pcd, show=False) else: # 分割标定板 if times == 1: # 第一次执行时分割 g_marks_pcd = cloud3d.seg_mark_cloud(LIDAR_FILE, kk, show=False) if len(g_marks_pcd) != MARK_COUNT: return None, None pcd_marks_conners = cloud3d.cal_marks_3d_conners(show=False) pcd_marks_conners_new = [] for i in range(4): tmmp = [] for j in range(4): tmmp.append(pcd_marks_conners[i].points[j]) pcd_marks_conners_new.append(tmmp) return aurco_marks_conners, pcd_marks_conners_new def step1_get_aruco_and_lidar_marks_and_conners_v2( camera_file_path = CAMERA_FILE, lidar_file_path = LIDAR_FILE, times = 1, simple=True): global CAMERA_FILE, LIDAR_FILE, g_marks_pcd if CAMERA_FILE != camera_file_path : CAMERA_FILE = camera_file_path if LIDAR_FILE != lidar_file_path : LIDAR_FILE = lidar_file_path frame, aurco_marks_conners = aurco3d.cal_camera_marks_3d_conners_v2(CAMERA_FILE) frame, pcd_marks_conners = aurco3d.cal_lidar_marks_3d_conner_v2(LIDAR_FILE) pcd_marks_conners_new = [] for i in range(4): tmmp = [] for j in range(4): tmmp.append(pcd_marks_conners[i][j]) pcd_marks_conners_new.append(tmmp) return aurco_marks_conners, pcd_marks_conners_new def step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners): # 排除异常点 __exclude = [] while True: __max = 0 __min = 10 __mean = 0 __cnt = 0 __conner_mean = 0 __conner_cnt = 0 __conners_mean_distance = [] for i in range(16): src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4] src_cloud_mark_conner = pcd_marks_conners[int(i/4)][i%4] for j in range(16): if i in __exclude or j in __exclude: continue dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4] dst_cloud_mark_conner = pcd_marks_conners[int(j/4)][j%4] aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner) lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner) diff_distance = abs(aurco_distance - lidar_distance) if diff_distance > __max: __max = diff_distance if diff_distance < __min: __min = diff_distance __cnt += 1 __mean += diff_distance __conner_cnt += 1 __conner_mean += diff_distance # print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4] # + " = " + str(diff_distance)) if __conner_cnt == 0: tmp=0 else : tmp = __conner_mean/__conner_cnt __conners_mean_distance.append(tmp) __conner_cnt = 0 __conner_mean = 0 __mean = __mean/__cnt Log.info("16点对比结果") print("__max:" + str(__max)) print("__min:" + str(__min)) print("__cnt:" + str(__cnt)) print("__mean:" + str(__mean)) print("__conners_mean_distance:") # for test # for i in range(len(__conners_mean_distance)): # print(__conners_mean_distance[i]) max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0] if len(__exclude) < conf_max_exclude_num: __exclude.append(max_index) else: break # 初始化SVD数据 list_marks_aurco = [] list_marks_pcd = [] for i in range(16): # for test print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners[int(i/4)][i%4][0], aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners[int(i/4)][i%4][1], aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners[int(i/4)][i%4][2]) if i in __exclude: continue list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4]) list_marks_pcd.append(pcd_marks_conners[int(i/4)][i%4]) return list_marks_aurco, list_marks_pcd, __mean def step2_exclude_bad_conners_v2(aurco_marks_conners, pcd_marks_conners): # 排除异常点 __exclude = [] while True: __max = 0 __min = 10 __mean = 0 __cnt = 0 __conner_mean = 0 __conner_cnt = 0 __conners_mean_distance = [] for i in range(16): src_aruco_mark_conner = aurco_marks_conners[int(i/4)][i%4] src_cloud_mark_conner = pcd_marks_conners[int(i/4)][i%4] for j in range(16): if i in __exclude or j in __exclude: continue dst_aruco_mark_conner = aurco_marks_conners[int(j/4)][j%4] dst_cloud_mark_conner = pcd_marks_conners[int(j/4)][j%4] aurco_distance = np.linalg.norm(src_aruco_mark_conner - dst_aruco_mark_conner) lidar_distance = np.linalg.norm(src_cloud_mark_conner - dst_cloud_mark_conner) diff_distance = abs(aurco_distance - lidar_distance) if diff_distance > __max: __max = diff_distance if diff_distance < __min: __min = diff_distance __cnt += 1 __mean += diff_distance __conner_cnt += 1 __conner_mean += diff_distance # print(str(int(i/4)) + "-" + _dict[i%4] + " : " + str(int(j/4)) + "-" + _dict[j%4] # + " = " + str(diff_distance)) if __conner_cnt == 0: tmp=0 else : tmp = __conner_mean/__conner_cnt __conners_mean_distance.append(tmp) __conner_cnt = 0 __conner_mean = 0 __mean = __mean/__cnt Log.info("16点对比结果") print("__max:" + str(__max)) print("__min:" + str(__min)) print("__cnt:" + str(__cnt)) print("__mean:" + str(__mean)) print("__conners_mean_distance:") # for test # for i in range(len(__conners_mean_distance)): # print(__conners_mean_distance[i]) max_index = max(enumerate(__conners_mean_distance), key=lambda x: x[1])[0] if len(__exclude) < conf_max_exclude_num: __exclude.append(max_index) else: break # 初始化SVD数据 list_marks_aurco = [] list_marks_pcd = [] Log.info("16点对比结果") for i in range(16): # for test print(aurco_marks_conners[int(i/4)][i%4][0] - pcd_marks_conners[int(i/4)][i%4][0], aurco_marks_conners[int(i/4)][i%4][1] - pcd_marks_conners[int(i/4)][i%4][1], aurco_marks_conners[int(i/4)][i%4][2] - pcd_marks_conners[int(i/4)][i%4][2]) list_marks_aurco.append(aurco_marks_conners[int(i/4)][i%4]) list_marks_pcd.append(pcd_marks_conners[int(i/4)][i%4]) return list_marks_aurco, list_marks_pcd, __mean import open3d as o3d def create_spheres(points, sizes=None, color=[0, 0.706, 1]): """ 创建一系列球体以替代点云中的点,允许每个点有不同的大小。 参数: points: N x 3 数组,表示N个点的位置。 sizes: N维数组或列表,表示对应点的半径大小。如果为None,则所有球体大小相同。 color: RGB颜色值,默认是橙色。 返回: spheres: 包含所有球体的Open3D几何对象列表。 """ spheres = [] if sizes is None: sizes = [0.006] * len(points) # 如果没有提供大小,则默认所有球体大小相同 for i, point in enumerate(points): sphere = o3d.geometry.TriangleMesh.create_sphere(radius=sizes[i]) sphere.translate(point) # 将球体移动到点的位置 sphere.paint_uniform_color(color) spheres.append(sphere) return spheres def step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners): # SVD 分解得到 RT array_aurco_marks_conners = np.asarray(aurco_marks_conners) array_pcd_marks_conners = np.asarray(pcd_marks_conners) _R, _t, _ ,_ = svd.scipy_svd(array_pcd_marks_conners, array_aurco_marks_conners) # _r, _p, _y = utils.rotation_matrix_to_euler_angles(_R) _r, _p, _y = utils.rotation_matrix_to_rpy(_R) print(_R) print(_t) print(_r, _p, _y) # for test _pdc_rt = (_R @ np.asarray(array_pcd_marks_conners).T).T + _t if True: # 角点显示的圆球半径,单位(米) spheres_2d = create_spheres(array_aurco_marks_conners[:16], None, color=[0, 0.706, 1]) spheres_3d = create_spheres(_pdc_rt[:16], None, color=[0, 0, 0]) points_list = [] for i in range(len(array_aurco_marks_conners[:16])): points_list.append(spheres_2d[i]) points_list.append(spheres_3d[i]) # o3d.visualization.draw_geometries(points_list) return 0, [_t[0],_t[1],_t[2]], [_r, _p, _y] def calibration_by_one_samples_points(): calibration_result = [] bad_samples = [] samples_path_root = os.path.dirname(__file__) + "\\samples\\calibration" # 标定使用的样本数量 samples_size = 1 # 每个样本执行的次数 times_per_one_sample = 2 for subdir in utils.list_subdirs(samples_path_root): paths = utils.traverse_folder(samples_path_root + "\\" + subdir) if len(paths) != 2: print("[ERROR]\t Data Error Path : " + paths) times = 0 while times < times_per_one_sample: times += 1 ret, _t, _rpy, __mean = do_calibration_with_one_sample(paths[1], paths[0]) # 0 png, 1 ply if ret != 0: bad_samples.append(subdir) continue # 合并得到最终得旋转角 _new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2]) calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy]) samples_size -= 1 if samples_size == 0: break for i in range(len(calibration_result)): str_mean = str(calibration_result[i][2]) str_t = str(calibration_result[i][3]) str_ypr = str(calibration_result[i][4]) str_new_rpy = str(calibration_result[i][5]) print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy) for i in range(len(bad_samples)): print(bad_samples[i]) utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result) def calibration_by_all_samples_points(samples_path_root, samples_size=1, times_per_one_sample=2): calibration_result = [] bad_samples = [] all_points = [] # 定义总体的 aurco_marks_conners, pcd_marks_conners aurco_marks_conners_all = [] pcd_marks_conners_all = [] for subdir in utils.list_subdirs(samples_path_root): paths = utils.traverse_folder(samples_path_root + "\\" + subdir) if len(paths) != 2: print("[ERROR]\t Data Error Path : " + paths) times = 0 while times < times_per_one_sample: times += 1 aurco_marks_conners, pcd_marks_conners = \ step1_get_aruco_and_lidar_marks_and_conners(paths[1], paths[0], times) # 0 png, 1 ply if aurco_marks_conners is None: Log.error("subdir is " + subdir + " aurco_marks_conners has ERROR") break aurco_marks_conners, pcd_marks_conners, __mean = \ step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners) _, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners) _new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2]) calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy]) for it in range(len(aurco_marks_conners)): aurco_marks_conners_all.append(aurco_marks_conners[it].tolist()) pcd_marks_conners_all.append(pcd_marks_conners[it].tolist()) samples_size -= 1 if samples_size == 0: break # 将多样本多次计算的综合点统一进行SVD分解 if len(aurco_marks_conners_all) == 0: return -1, None, None, None _, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners_all, pcd_marks_conners_all) # _new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2]) calibration_result.append(["all", str(len(aurco_marks_conners_all)) + " & " + str(len(pcd_marks_conners_all)), \ __mean, _t, _rpy, _new_rpy]) for i in range(len(calibration_result)): str_mean = str(calibration_result[i][2]) str_t = str(calibration_result[i][3]) str_ypr = str(calibration_result[i][4]) str_new_rpy = str(calibration_result[i][5]) print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy) for i in range(len(bad_samples)): print(bad_samples[i]) utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result) all_points.append(aurco_marks_conners_all) all_points.append(pcd_marks_conners_all) utils.save_to_json(os.path.dirname(__file__) + '\\record\\points.json', all_points) return 0, _t, _rpy, __mean def calibration_by_all_samples_points_v2(samples_path_root, samples_size=1, times_per_one_sample=2): calibration_result = [] bad_samples = [] all_points = [] # 定义总体的 aurco_marks_conners, pcd_marks_conners aurco_marks_conners_all = [] pcd_marks_conners_all = [] for subdir in utils.list_subdirs(samples_path_root): paths = utils.traverse_folder(samples_path_root + "\\" + subdir) if len(paths) != 2: print("[ERROR]\t Data Error Path : " + paths) times = 0 while times < times_per_one_sample: times += 1 aurco_marks_conners, pcd_marks_conners = \ step1_get_aruco_and_lidar_marks_and_conners_v2(paths[1], paths[0], times) # 0 png, 1 ply if aurco_marks_conners is None: Log.error("subdir is " + subdir + " aurco_marks_conners has ERROR") break aurco_marks_conners, pcd_marks_conners, __mean = \ step2_exclude_bad_conners(aurco_marks_conners, pcd_marks_conners) _, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners, pcd_marks_conners) _new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2]) calibration_result.append([subdir, str(times), __mean, _t, _rpy, _new_rpy]) for it in range(len(aurco_marks_conners)): aurco_marks_conners_all.append(aurco_marks_conners[it].tolist()) pcd_marks_conners_all.append(pcd_marks_conners[it].tolist()) samples_size -= 1 if samples_size == 0: break # 将多样本多次计算的综合点统一进行SVD分解 if len(aurco_marks_conners_all) == 0: return -1, None, None, None _, _t, _rpy = step3_cal_aruco_and_lidar_marks_rt(aurco_marks_conners_all, pcd_marks_conners_all) _new_rpy = utils.combine_rpy(roll, pitch, yaw, _rpy[0], _rpy[1], _rpy[2]) calibration_result.append(["all", str(len(aurco_marks_conners_all)) + " & " + str(len(pcd_marks_conners_all)), \ __mean, _t, _rpy, _new_rpy]) for i in range(len(calibration_result)): str_mean = str(calibration_result[i][2]) str_t = str(calibration_result[i][3]) str_ypr = str(calibration_result[i][4]) str_new_rpy = str(calibration_result[i][5]) print(calibration_result[i][0], calibration_result[i][1], str_mean, str_t, str_ypr, str_new_rpy) for i in range(len(bad_samples)): print(bad_samples[i]) utils.save_to_json(os.path.dirname(__file__) + '\\record\\record.json', calibration_result) all_points.append(aurco_marks_conners_all) all_points.append(pcd_marks_conners_all) utils.save_to_json(os.path.dirname(__file__) + '\\record\\points.json', all_points) return 0, _t, _rpy, __mean if __name__ == '__main__': # calibration_by_one_samples_points() samples_size = 5 times_per_one_sample = 1 # samples_path_root = os.path.dirname(__file__) + "\\ssh_tmp" samples_path_root = os.path.dirname(__file__) + "\\samples\\20250214" ret, _t, _rpy, __mean = calibration_by_all_samples_points(samples_path_root, samples_size, times_per_one_sample) # ret, _t, _rpy, __mean = calibration_by_all_samples_points_v2(samples_path_root, samples_size, times_per_one_sample) # rt = utils.read_from_json(os.path.dirname(__file__) + '\\record\\record.json') # print(rt)