# -- coding: utf-8 -- import os import sys current_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, current_dir) import inspect import logging import logging.config import numpy as np import json import numpy as np from scipy.spatial.transform import Rotation as R import cv2 import config import open3d as o3d class Log(object): # 文字效果 0:终端默认,1:高亮,4:下划线,5:闪烁,7:反白显示 # 前景色 30(黑),31(红),32(绿),33(黄),34(蓝),35(紫),36(青),37(灰) # 背景色 40(黑),41(红),42(绿),43(黄),44(蓝),45(紫),46(青),47(灰) class ColorHandler(logging.StreamHandler): COLOR_CODE = { 'INFO': '\033[1;32m', # 绿色 'WARNING': '\033[1;33m', # 黄色 'ERROR': '\033[1;31m', # 红色 'CRITICAL': '\033[1;34m', # 蓝色 } def emit(self, record): log_color = self.COLOR_CODE.get(record.levelname, '') # 默认不改变颜色 log_msg = self.format(record) stream = self.stream stream.write(log_color) stream.write(log_msg) self.stream.write('\033[0m') # 重置颜色 self.stream.write('\n') logging.config.fileConfig(os.path.dirname(__file__) + '\\logging.conf') __logger = logging.getLogger('script') __logger.propagate = False # 避免重复打印 color_handler = ColorHandler() color_handler.setFormatter(logging.Formatter('%(asctime)s - %(srcfilename)s - %(srcfunction)s - %(srclineno)d - %(levelname)s : %(message)s')) __logger.addHandler(color_handler) @classmethod def set_level(cls, level): cls.__logger.setLevel(level) @classmethod def debug(cls, msg, *args, **kwargs): cls._log(logging.DEBUG, msg, *args, **kwargs) @classmethod def info(cls, msg, *args, **kwargs): cls._log(logging.INFO, msg, *args, **kwargs) @classmethod def warning(cls, msg, *args, **kwargs): cls._log(logging.WARNING, msg, *args, **kwargs) @classmethod def error(cls, msg, *args, **kwargs): cls._log(logging.ERROR, msg, *args, **kwargs) @classmethod def critical(cls, msg, *args, **kwargs): cls._log(logging.CRITICAL, msg, *args, **kwargs) @classmethod def _log(cls, level, msg, *args, **kwargs): # 获取调用者的信息 frame = inspect.currentframe().f_back.f_back caller = inspect.getframeinfo(frame) # 记录日志,传入调用者的文件名和行号 cls.__logger.log(level, msg, *args, **kwargs, extra={'srcfilename': os.path.basename(caller.filename), 'srcfunction': caller.function, 'srclineno': caller.lineno}) @classmethod def get_logger(cls): return cls.__logger def color_map(): # 创建一个空的色彩映射表,大小为256x1(因为我们将生成256种颜色),3个通道(BGR) color_map = np.zeros((256, 1, 3), dtype=np.uint8) def blue_to_red(i): if i < 85: # 蓝色 -> 绿色 (B: 255->0, G: 0->255, R: 0) b = 255 - i * 3 g = i * 3 r = 0 elif i < 170: # 绿色 -> 黄色 (B: 0, G: 255, R: 0->255) b = 0 g = 255 r = (i - 85) * 3 else: # 黄色 -> 红色 (B: 0, G: 255->0, R: 255) b = 0 g = 255 - (i - 170) * 3 r = 255 # 使用 numpy.clip 来保证 RGB 值都在 [0, 255] 的范围内 return tuple(np.clip([b, g, r], 0, 255).astype(int)) # 填充色彩映射表 for i in range(256): color_map[i, 0] = blue_to_red(i) return color_map def show_color_map(color_map): color_map_vis = cv2.applyColorMap(np.arange(256, dtype=np.uint8).reshape(1, 256), color_map) color_map_vis = cv2.resize(color_map_vis, (512, 100), interpolation=cv2.INTER_LINEAR) cv2.imshow('Reflectivity Colormap', color_map_vis) cv2.waitKey(0) cv2.destroyAllWindows() def print_diff(aurco_pt1, aurco_pt2, lidar_pt1, lidar_pt2): aurco_distance = np.linalg.norm(aurco_pt1 - aurco_pt2) lidar_distance = np.linalg.norm(lidar_pt1 - lidar_pt2) print(aurco_distance) print(lidar_distance) print(abs(aurco_distance - lidar_distance)) def rotation_matrix_to_euler_angles(R): # 提取旋转矩阵的元素 r11, r12, r13, r21, r22, r23, r31, r32, r33 = R.flatten() # 计算旋转角度 _r = np.arctan2(r32, r22) _p = np.arctan2(-r31, np.sqrt(r32**2 + r33**2)) _y = np.arctan2(r13, r11) return _r, _p, _y def rotation_matrix_to_rpy(R): # 确保输入是一个有效的旋转矩阵 if not np.allclose(np.dot(R, R.T), np.eye(3)) or not np.isclose(np.linalg.det(R), 1.0): raise ValueError("输入不是一个有效的旋转矩阵") _pitch = -np.arcsin(R[2, 0]) if np.abs(_pitch - np.pi / 2) < 1e-6: # 处理接近90度的情况 _yaw = 0 _roll = np.arctan2(R[0, 1], R[0, 2]) elif np.abs(_pitch + np.pi / 2) < 1e-6: # 处理接近-90度的情况 _yaw = 0 _roll = -np.arctan2(R[0, 1], R[0, 2]) else: _yaw = np.arctan2(R[1, 0], R[0, 0]) _roll = np.arctan2(R[2, 1], R[2, 2]) return _roll, _pitch, _yaw def traverse_folder(folder_path): path = [] for root, dirs, files in os.walk(folder_path): for file_name in files: abs_file_path = os.path.join(root, file_name) Log.critical(abs_file_path) path.append(abs_file_path) return path def list_subdirs(directory): return [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))] def save_to_json(file_path, array): str_tmp = '[\n' for i in range(len(array)): if i == len(array) -1: str_tmp += json.dumps(array[i]) + '\n' else: str_tmp += json.dumps(array[i]) + ',\n' str_tmp += ']' with open(file_path, 'w+') as f: f.write(str_tmp) def read_from_json(file_path): with open(file_path, 'r') as f: loaded_array = json.load(f) return loaded_array # def rotation_matrix_from_ypr(yaw, pitch, roll): # """Convert Yaw-Pitch-Roll angles to a rotation matrix.""" # return R.from_euler('xyz', [yaw, pitch, roll], degrees=False).as_matrix() # def combine_rotations(R1, R2): # """Combine two rotation matrices.""" # return np.dot(R2, R1) # def combine_ypr(yaw1, pitch1, roll1, yaw2, pitch2, roll2): # R1 = rotation_matrix_from_ypr(yaw1, pitch1, roll1) # R2 = rotation_matrix_from_ypr(yaw2, pitch2, roll2) # R_combined = combine_rotations(R1, R2) # r_combined = R.from_matrix(R_combined) # yaw_combined, pitch_combined, roll_combined = r_combined.as_euler('xyz', degrees=False) # return [yaw_combined, pitch_combined, roll_combined] def quaternion_from_rpy(roll, pitch, yaw): """Convert Roll-Pitch-Yaw angles to a quaternion.""" return R.from_euler('xyz', [roll, pitch, yaw], degrees=False).as_quat() def rpy_from_quaternion(q): """Convert a quaternion to Roll-Pitch-Yaw angles.""" return R.from_quat(q).as_euler('xyz', degrees=False) def combine_quaternions(q1, q2): """Combine two quaternions.""" r1 = R.from_quat(q1) r2 = R.from_quat(q2) combined_rotation = r2 * r1 return combined_rotation.as_quat() def combine_rpy(roll1, pitch1, yaw1, roll2 ,pitch2, yaw2): q1 = quaternion_from_rpy(roll1, pitch1, yaw1) q2 = quaternion_from_rpy(roll2, pitch2, yaw2) q_combined = combine_quaternions(q1, q2) roll_combined, pitch_combined, yaw_combined = rpy_from_quaternion(q_combined) return [roll_combined, pitch_combined, yaw_combined] def pnp3d(points3d, points2d): _, rvec, tvec = cv2.solvePnP(points3d, points2d, config.camera_matrix, config.dist_coeffs) _r, _p, _y = rotation_matrix_to_rpy(rvec) return tvec, _r, _p, _y def compute_error(P, Q, R_matrix, t_vector): """计算变换后的点云与目标点云之间的误差""" transformed_P = (R_matrix @ P.T).T + t_vector error = np.mean(np.linalg.norm(transformed_P - Q, axis=1)) return error, transformed_P def filter_and_fit_rts(RT_matrices, P, Q, threshold=None, use_ransac=False): """ 筛选并拟合最优的 RT 矩阵。 参数: RT_matrices: list of tuples (R_matrix, t_vector) P: numpy.ndarray, 基准点云 Q: numpy.ndarray, 目标点云 threshold: float, 错误阈值,可选 use_ransac: bool, 是否使用 RANSAC 方法,默认 False 返回: R_opt, t_opt: 最优的旋转矩阵和平移向量 """ errors = [] transformed_points = [] # 计算每个 RT 矩阵的误差 for R_matrix, t_vector in RT_matrices: error, transformed_P = compute_error(P, Q, R_matrix, t_vector) errors.append(error) transformed_points.append(transformed_P) errors = np.array(errors) transformed_points = np.array(transformed_points) if use_ransac: from sklearn.linear_model import RANSACRegressor model = RANSACRegressor() model.fit(transformed_points.reshape(-1, 3), Q.repeat(len(RT_matrices), axis=0)) inlier_mask = model.inlier_mask_ valid_RTs = [RT_matrices[i] for i in range(len(RT_matrices)) if inlier_mask[i * len(P)]] else: if threshold is not None: valid_RTs = [RT_matrices[i] for i in range(len(RT_matrices)) if errors[i] < threshold] else: valid_RTs = RT_matrices if not valid_RTs: raise ValueError("没有有效的 RT 矩阵") # 使用简单的平均方法拟合最优 RT 矩阵 Rs = np.array([R_matrix for R_matrix, _ in valid_RTs]) ts = np.array([t_vector for _, t_vector in valid_RTs]) # 平均旋转矩阵可以通过四元数平均实现 quats = R.from_matrix(Rs).as_quat().mean(axis=0) R_opt = R.from_quat(quats).as_matrix() # 平均平移向量 t_opt = ts.mean(axis=0) return R_opt, t_opt def rpy_to_rotation_matrix(r, p, y): """Convert RPY angles to a rotation matrix.""" _roll, _pitch, _yaw = r, p, y # 定义每个轴的旋转矩阵 Rx = np.array([[1, 0, 0], [0, np.cos(_roll), -np.sin(_roll)], [0, np.sin(_roll), np.cos(_roll)]]) Ry = np.array([[np.cos(_pitch), 0, np.sin(_pitch)], [0, 1, 0], [-np.sin(_pitch), 0, np.cos(_pitch)]]) Rz = np.array([[np.cos(_yaw), -np.sin(_yaw), 0], [np.sin(_yaw), np.cos(_yaw), 0], [0, 0, 1]]) # 计算总的旋转矩阵(假设使用 ZYX 顺序) R = Rz @ Ry @ Rx return R # def rpy_t_to_rt_matrix(r, p, y, t): # """Construct the RT matrix from RPY angles and translation vector.""" # R = rpy_to_rotation_matrix(r, p, y) # # 构建 4x4 齐次变换矩阵 # RT = np.eye(4) # RT[:3, :3] = R # RT[:3, 3] = t # return RT def parse_custom_attribute(ply_filename, attribute_name): with open(ply_filename, 'r') as f: lines = f.readlines() # 解析文件头以找到自定义属性的位置 header_lines = [] for line in lines: if line.strip() == 'end_header': header_lines.append(line) break header_lines.append(line) vertex_count = None custom_index = None for i, line in enumerate(header_lines): parts = line.strip().split() if len(parts) > 2 and parts[0] == 'element' and parts[1] == 'vertex': vertex_count = int(parts[2]) elif len(parts) > 2 and parts[0] == 'property' and parts[-1] == attribute_name: # custom_index = i - len([l for l in header_lines if 'element' in l]) - 1 custom_index = 3 if vertex_count is None or custom_index is None: raise ValueError(f"Failed to find {attribute_name} in the PLY file header.") # 提取自定义属性的数据 data_start = header_lines.index('end_header\n') + 1 custom_data = [] for line in lines[data_start:data_start + vertex_count]: values = line.strip().split() try: custom_value = int(values[custom_index]) custom_data.append(custom_value) except (IndexError, ValueError): raise ValueError(f"Failed to read {attribute_name} from data row.") return np.array(custom_data) def project_cloud(pcd_cloud): im_project = np.zeros((config.global_h, config.global_w, 3), dtype=np.uint8) im_2d_to_3d = np.zeros((config.global_h, config.global_w, 3), dtype=np.float64) rvec = np.array([np.array([0.]), np.array([0.]), np.array([0.])]) tvec = np.float64([0.0, 0.0, 0.0]) pts_2d, _ = cv2.projectPoints(np.array(pcd_cloud.points), rvec, tvec, config.camera_matrix, config.dist_coeffs) color = (255,255,255) list_pts_2d = pts_2d.tolist() list_pts_2d = [[[int(item) for item in subsublist] for subsublist in sublist] for sublist in list_pts_2d] points_np = np.asarray(pcd_cloud.points) for i, pt_2d in enumerate(list_pts_2d): # x, y = pt_2d.ravel() # if x == float('nan') or y == float('nan'): continue # x, y = int(x), int(y) # x, y = pt_2d[0][0], pt_2d[0][1] if 0 <= pt_2d[0][0] < config.global_w and 0 <= pt_2d[0][1] < config.global_h: im_project[pt_2d[0][1], pt_2d[0][0]] = color # im_2d_to_3d[y, x][0] = pcd_cloud.points[i][0] # im_2d_to_3d[y, x][1] = pcd_cloud.points[i][1] # im_2d_to_3d[y, x][2] = pcd_cloud.points[i][2] im_2d_to_3d[pt_2d[0][1], pt_2d[0][0]] = points_np[i] return im_project, im_2d_to_3d, pts_2d, list_pts_2d def cal_H(im_2d_to_3d, pts_2d, list_pts_2d): pts_3d = [] # list_pts_2d = pts_2d.tolist() # list_pts_2d = [[[int(item) for item in subsublist] for subsublist in sublist] for sublist in list_pts_2d] for i, pt_2d in enumerate(list_pts_2d): # x, y = pt_2d.ravel() # x, y = int(x), int(y) # x, y = pt_2d[0][0], pt_2d[0][1] pts_3d.append([im_2d_to_3d[pt_2d[1], pt_2d[0]][0], im_2d_to_3d[pt_2d[1], pt_2d[0]][1], im_2d_to_3d[pt_2d[1], pt_2d[0]][2]]) ransacReprojThreshold = 3.0 # 查找单应矩阵 # 修改第三个参数 1.0 -> 5.0 # 利用基于RANSAC的鲁棒算法选择最优的四组配对点,再计算转换矩阵H(3*3)并返回,以便于反向投影错误率达到最小 # ransacReprojThreshold 设置为 1.0 # H, mask = cv2.findHomography(np.array(src_points), np.array(dst_points), cv2.RANSAC, ransacReprojThreshold, maxIters=1500, confidence=0.998) ## 修改 1.0 -> 5.0 pts_hom = [] for i in range(len(pts_2d)): pt = pts_2d[i] x,y = pt[0], pt[1] pts_hom.append([x*1.0, y*1.0, 1.0]) H, _ = cv2.findHomography(np.array(pts_3d), np.array(pts_hom), cv2.RANSAC, ransacReprojThreshold) ## 修改 1.0 -> 5.0 Log.info("2D-3D 单应性矩阵计算结果为:") print(H) return H def sort_conners_2d(conners): sort_conner = [] left, top, right, bottom = 0, 0, 0, 0 # maxx,maxy = 0,0 maxx,maxy = -100000,-100000 minx,miny = 100000,100000 for i in range(len(conners)): conner = conners[i] if conner[0] < minx: minx = conner[0] left = i if conner[0] > maxx: maxx = conner[0] right = i if conner[1] < miny: miny = conner[1] top = i if conner[1] > maxy: maxy = conner[1] bottom = i sort_conner.append(conners[left]) sort_conner.append(conners[top]) sort_conner.append(conners[right]) sort_conner.append(conners[bottom]) return sort_conner def select_and_sort_conners_2d(points): sort_conner = [] left, top, right, bottom = 0, 0, 0, 0 # maxx,maxy = 0,0 maxx,maxy = -100000,-100000 minx,miny = 100000,100000 for i in range(len(points)): conner = list(points[i])[0] if conner[0] < minx: minx = conner[0] left = i if conner[0] > maxx: maxx = conner[0] right = i if conner[1] < miny: miny = conner[1] top = i if conner[1] > maxy: maxy = conner[1] bottom = i sort_conner.append([points[left][0][0], points[left][0][1]]) sort_conner.append([points[top][0][0], points[top][0][1]]) sort_conner.append([points[right][0][0], points[right][0][1]]) sort_conner.append([points[bottom][0][0], points[bottom][0][1]]) return sort_conner def reproject_cloud(plane_model, cloud_load): reproject_pcd = o3d.geometry.PointCloud() A,B,C,D = plane_model for pts_3d in cloud_load.points: # 求线与平面的交点 t = -D / (A * pts_3d[0] + B * pts_3d[1] + C * pts_3d[2]) # pts_3d = np.array([t * pts_3d[0], t * pts_3d[1], t * pts_3d[2]]) reproject_pcd.points.append((t * pts_3d[0], t * pts_3d[1], t * pts_3d[2])) return reproject_pcd def correct_mark_cloud(mark_pcd): plane_model, inliers = mark_pcd.segment_plane(distance_threshold=0.0155, ransac_n=3, num_iterations=10000) inlier_pcd = mark_pcd.select_by_index(inliers) inlier_pcd = reproject_cloud(plane_model, inlier_pcd) #利用投影得到的图像 inlier_pcd = inlier_pcd.paint_uniform_color([1.0, 0, 0]) outlier_cloud = mark_pcd.select_by_index(inliers, invert=True) # 暂时没有用到 # radius = 0.05 # inlier_pcd = smooth_point_cloud(inlier_pcd, radius) #利用拟合得到的图像 inlier_pcd_correct = reproject_cloud(plane_model, mark_pcd) #利用投影得到的图像 inlier_pcd_correct = inlier_pcd_correct.paint_uniform_color([0.0, 1.0, 0]) # o3d.visualization.draw_geometries([ inlier_pcd, inlier_pcd_correct ]) # inlier_pcd_correct = inlier_pcd # ??? return plane_model, inlier_pcd, inlier_pcd_correct if __name__ == '__main__': # array = [["1","1",0.3213123,[0.323, 0.323, 0.32],[0.323, 0.323, 0.32]], ["1","1",0.3213123,[0.323, 0.323, 0.32],[0.323, 0.323, 0.32]]] # save_to_json('', array) _color_map = color_map() show_color_map(_color_map) # 假设我们有如下 RT 矩阵列表和其他必要的点云数据 RT_matrices = [...] # 这里应该是你提供的 RT 矩阵列表 P = np.random.rand(100, 3) # 示例基准点云 Q = np.random.rand(100, 3) # 示例目标点云 R_opt, t_opt = filter_and_fit_rts(RT_matrices, P, Q, threshold=0.1, use_ransac=True) print("Optimal Rotation Matrix:\n", R_opt) print("Optimal Translation Vector:\n", t_opt)