calibration_tools_v1.0/utils.py
2025-02-20 10:45:17 +08:00

494 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -- coding: utf-8 --
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, current_dir)
import inspect
import logging
import logging.config
import numpy as np
import json
import numpy as np
from scipy.spatial.transform import Rotation as R
import cv2
import config
import open3d as o3d
class Log(object):
# 文字效果 0终端默认1高亮4下划线5闪烁7反白显示
# 前景色 303132绿3334353637
# 背景色 404142绿4344454647
class ColorHandler(logging.StreamHandler):
COLOR_CODE = {
'INFO': '\033[1;32m', # 绿色
'WARNING': '\033[1;33m', # 黄色
'ERROR': '\033[1;31m', # 红色
'CRITICAL': '\033[1;34m', # 蓝色
}
def emit(self, record):
log_color = self.COLOR_CODE.get(record.levelname, '') # 默认不改变颜色
log_msg = self.format(record)
stream = self.stream
stream.write(log_color)
stream.write(log_msg)
self.stream.write('\033[0m') # 重置颜色
self.stream.write('\n')
logging.config.fileConfig(os.path.dirname(__file__) + '\\logging.conf')
__logger = logging.getLogger('script')
__logger.propagate = False # 避免重复打印
color_handler = ColorHandler()
color_handler.setFormatter(logging.Formatter('%(asctime)s - %(srcfilename)s - %(srcfunction)s - %(srclineno)d - %(levelname)s : %(message)s'))
__logger.addHandler(color_handler)
@classmethod
def set_level(cls, level):
cls.__logger.setLevel(level)
@classmethod
def debug(cls, msg, *args, **kwargs):
cls._log(logging.DEBUG, msg, *args, **kwargs)
@classmethod
def info(cls, msg, *args, **kwargs):
cls._log(logging.INFO, msg, *args, **kwargs)
@classmethod
def warning(cls, msg, *args, **kwargs):
cls._log(logging.WARNING, msg, *args, **kwargs)
@classmethod
def error(cls, msg, *args, **kwargs):
cls._log(logging.ERROR, msg, *args, **kwargs)
@classmethod
def critical(cls, msg, *args, **kwargs):
cls._log(logging.CRITICAL, msg, *args, **kwargs)
@classmethod
def _log(cls, level, msg, *args, **kwargs):
# 获取调用者的信息
frame = inspect.currentframe().f_back.f_back
caller = inspect.getframeinfo(frame)
# 记录日志,传入调用者的文件名和行号
cls.__logger.log(level, msg, *args, **kwargs,
extra={'srcfilename': os.path.basename(caller.filename), 'srcfunction': caller.function, 'srclineno': caller.lineno})
@classmethod
def get_logger(cls):
return cls.__logger
def color_map():
# 创建一个空的色彩映射表大小为256x1因为我们将生成256种颜色3个通道BGR
color_map = np.zeros((256, 1, 3), dtype=np.uint8)
def blue_to_red(i):
if i < 85:
# 蓝色 -> 绿色 (B: 255->0, G: 0->255, R: 0)
b = 255 - i * 3
g = i * 3
r = 0
elif i < 170:
# 绿色 -> 黄色 (B: 0, G: 255, R: 0->255)
b = 0
g = 255
r = (i - 85) * 3
else:
# 黄色 -> 红色 (B: 0, G: 255->0, R: 255)
b = 0
g = 255 - (i - 170) * 3
r = 255
# 使用 numpy.clip 来保证 RGB 值都在 [0, 255] 的范围内
return tuple(np.clip([b, g, r], 0, 255).astype(int))
# 填充色彩映射表
for i in range(256):
color_map[i, 0] = blue_to_red(i)
return color_map
def show_color_map(color_map):
color_map_vis = cv2.applyColorMap(np.arange(256, dtype=np.uint8).reshape(1, 256), color_map)
color_map_vis = cv2.resize(color_map_vis, (512, 100), interpolation=cv2.INTER_LINEAR)
cv2.imshow('Reflectivity Colormap', color_map_vis)
cv2.waitKey(0)
cv2.destroyAllWindows()
def print_diff(aurco_pt1, aurco_pt2, lidar_pt1, lidar_pt2):
aurco_distance = np.linalg.norm(aurco_pt1 - aurco_pt2)
lidar_distance = np.linalg.norm(lidar_pt1 - lidar_pt2)
print(aurco_distance)
print(lidar_distance)
print(abs(aurco_distance - lidar_distance))
def rotation_matrix_to_euler_angles(R):
# 提取旋转矩阵的元素
r11, r12, r13, r21, r22, r23, r31, r32, r33 = R.flatten()
# 计算旋转角度
_r = np.arctan2(r32, r22)
_p = np.arctan2(-r31, np.sqrt(r32**2 + r33**2))
_y = np.arctan2(r13, r11)
return _r, _p, _y
def rotation_matrix_to_rpy(R):
# 确保输入是一个有效的旋转矩阵
if not np.allclose(np.dot(R, R.T), np.eye(3)) or not np.isclose(np.linalg.det(R), 1.0):
raise ValueError("输入不是一个有效的旋转矩阵")
_pitch = -np.arcsin(R[2, 0])
if np.abs(_pitch - np.pi / 2) < 1e-6: # 处理接近90度的情况
_yaw = 0
_roll = np.arctan2(R[0, 1], R[0, 2])
elif np.abs(_pitch + np.pi / 2) < 1e-6: # 处理接近-90度的情况
_yaw = 0
_roll = -np.arctan2(R[0, 1], R[0, 2])
else:
_yaw = np.arctan2(R[1, 0], R[0, 0])
_roll = np.arctan2(R[2, 1], R[2, 2])
return _roll, _pitch, _yaw
def traverse_folder(folder_path):
path = []
for root, dirs, files in os.walk(folder_path):
for file_name in files:
abs_file_path = os.path.join(root, file_name)
Log.critical(abs_file_path)
path.append(abs_file_path)
return path
def list_subdirs(directory):
return [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))]
def save_to_json(file_path, array):
str_tmp = '[\n'
for i in range(len(array)):
if i == len(array) -1: str_tmp += json.dumps(array[i]) + '\n'
else: str_tmp += json.dumps(array[i]) + ',\n'
str_tmp += ']'
with open(file_path, 'w+') as f:
f.write(str_tmp)
def read_from_json(file_path):
with open(file_path, 'r') as f:
loaded_array = json.load(f)
return loaded_array
# def rotation_matrix_from_ypr(yaw, pitch, roll):
# """Convert Yaw-Pitch-Roll angles to a rotation matrix."""
# return R.from_euler('xyz', [yaw, pitch, roll], degrees=False).as_matrix()
# def combine_rotations(R1, R2):
# """Combine two rotation matrices."""
# return np.dot(R2, R1)
# def combine_ypr(yaw1, pitch1, roll1, yaw2, pitch2, roll2):
# R1 = rotation_matrix_from_ypr(yaw1, pitch1, roll1)
# R2 = rotation_matrix_from_ypr(yaw2, pitch2, roll2)
# R_combined = combine_rotations(R1, R2)
# r_combined = R.from_matrix(R_combined)
# yaw_combined, pitch_combined, roll_combined = r_combined.as_euler('xyz', degrees=False)
# return [yaw_combined, pitch_combined, roll_combined]
def quaternion_from_rpy(roll, pitch, yaw):
"""Convert Roll-Pitch-Yaw angles to a quaternion."""
return R.from_euler('xyz', [roll, pitch, yaw], degrees=False).as_quat()
def rpy_from_quaternion(q):
"""Convert a quaternion to Roll-Pitch-Yaw angles."""
return R.from_quat(q).as_euler('xyz', degrees=False)
def combine_quaternions(q1, q2):
"""Combine two quaternions."""
r1 = R.from_quat(q1)
r2 = R.from_quat(q2)
combined_rotation = r2 * r1
return combined_rotation.as_quat()
def combine_rpy(roll1, pitch1, yaw1, roll2 ,pitch2, yaw2):
q1 = quaternion_from_rpy(roll1, pitch1, yaw1)
q2 = quaternion_from_rpy(roll2, pitch2, yaw2)
q_combined = combine_quaternions(q1, q2)
roll_combined, pitch_combined, yaw_combined = rpy_from_quaternion(q_combined)
return [roll_combined, pitch_combined, yaw_combined]
def pnp3d(points3d, points2d):
_, rvec, tvec = cv2.solvePnP(points3d, points2d, config.camera_matrix, config.dist_coeffs)
_r, _p, _y = rotation_matrix_to_rpy(rvec)
return tvec, _r, _p, _y
def compute_error(P, Q, R_matrix, t_vector):
"""计算变换后的点云与目标点云之间的误差"""
transformed_P = (R_matrix @ P.T).T + t_vector
error = np.mean(np.linalg.norm(transformed_P - Q, axis=1))
return error, transformed_P
def filter_and_fit_rts(RT_matrices, P, Q, threshold=None, use_ransac=False):
"""
筛选并拟合最优的 RT 矩阵。
参数:
RT_matrices: list of tuples (R_matrix, t_vector)
P: numpy.ndarray, 基准点云
Q: numpy.ndarray, 目标点云
threshold: float, 错误阈值,可选
use_ransac: bool, 是否使用 RANSAC 方法,默认 False
返回:
R_opt, t_opt: 最优的旋转矩阵和平移向量
"""
errors = []
transformed_points = []
# 计算每个 RT 矩阵的误差
for R_matrix, t_vector in RT_matrices:
error, transformed_P = compute_error(P, Q, R_matrix, t_vector)
errors.append(error)
transformed_points.append(transformed_P)
errors = np.array(errors)
transformed_points = np.array(transformed_points)
if use_ransac:
from sklearn.linear_model import RANSACRegressor
model = RANSACRegressor()
model.fit(transformed_points.reshape(-1, 3), Q.repeat(len(RT_matrices), axis=0))
inlier_mask = model.inlier_mask_
valid_RTs = [RT_matrices[i] for i in range(len(RT_matrices)) if inlier_mask[i * len(P)]]
else:
if threshold is not None:
valid_RTs = [RT_matrices[i] for i in range(len(RT_matrices)) if errors[i] < threshold]
else:
valid_RTs = RT_matrices
if not valid_RTs:
raise ValueError("没有有效的 RT 矩阵")
# 使用简单的平均方法拟合最优 RT 矩阵
Rs = np.array([R_matrix for R_matrix, _ in valid_RTs])
ts = np.array([t_vector for _, t_vector in valid_RTs])
# 平均旋转矩阵可以通过四元数平均实现
quats = R.from_matrix(Rs).as_quat().mean(axis=0)
R_opt = R.from_quat(quats).as_matrix()
# 平均平移向量
t_opt = ts.mean(axis=0)
return R_opt, t_opt
# def rpy_to_rotation_matrix(r, p, y):
# """Convert RPY angles to a rotation matrix."""
# _roll, _pitch, _yaw = r, p, y
# # 定义每个轴的旋转矩阵
# Rx = np.array([[1, 0, 0],
# [0, np.cos(_roll), -np.sin(_roll)],
# [0, np.sin(_roll), np.cos(_roll)]])
# Ry = np.array([[np.cos(_pitch), 0, np.sin(_pitch)],
# [0, 1, 0],
# [-np.sin(_pitch), 0, np.cos(_pitch)]])
# Rz = np.array([[np.cos(_yaw), -np.sin(_yaw), 0],
# [np.sin(_yaw), np.cos(_yaw), 0],
# [0, 0, 1]])
# # 计算总的旋转矩阵(假设使用 ZYX 顺序)
# R = Rz @ Ry @ Rx
# return R
def parse_custom_attribute(ply_filename, attribute_name):
with open(ply_filename, 'r') as f:
lines = f.readlines()
# 解析文件头以找到自定义属性的位置
header_lines = []
for line in lines:
if line.strip() == 'end_header':
header_lines.append(line)
break
header_lines.append(line)
vertex_count = None
custom_index = None
for i, line in enumerate(header_lines):
parts = line.strip().split()
if len(parts) > 2 and parts[0] == 'element' and parts[1] == 'vertex':
vertex_count = int(parts[2])
elif len(parts) > 2 and parts[0] == 'property' and parts[-1] == attribute_name:
# custom_index = i - len([l for l in header_lines if 'element' in l]) - 1
custom_index = 3
if vertex_count is None or custom_index is None:
raise ValueError(f"Failed to find {attribute_name} in the PLY file header.")
# 提取自定义属性的数据
data_start = header_lines.index('end_header\n') + 1
custom_data = []
for line in lines[data_start:data_start + vertex_count]:
values = line.strip().split()
try:
custom_value = int(values[custom_index])
custom_data.append(custom_value)
except (IndexError, ValueError):
raise ValueError(f"Failed to read {attribute_name} from data row.")
return np.array(custom_data)
def project_cloud(pcd_cloud):
im_project = np.zeros((config.global_h, config.global_w, 3), dtype=np.uint8)
im_2d_to_3d = np.zeros((config.global_h, config.global_w, 3), dtype=np.float64)
rvec = np.array([np.array([0.]), np.array([0.]), np.array([0.])])
tvec = np.float64([0.0, 0.0, 0.0])
pts_2d, _ = cv2.projectPoints(np.array(pcd_cloud.points), rvec, tvec, config.camera_matrix, config.dist_coeffs)
color = (255,255,255)
list_pts_2d = pts_2d.tolist()
list_pts_2d = [[[int(item) for item in subsublist] for subsublist in sublist] for sublist in list_pts_2d]
points_np = np.asarray(pcd_cloud.points)
for i, pt_2d in enumerate(list_pts_2d):
# x, y = pt_2d.ravel()
# if x == float('nan') or y == float('nan'): continue
# x, y = int(x), int(y)
# x, y = pt_2d[0][0], pt_2d[0][1]
if 0 <= pt_2d[0][0] < config.global_w and 0 <= pt_2d[0][1] < config.global_h:
im_project[pt_2d[0][1], pt_2d[0][0]] = color
# im_2d_to_3d[y, x][0] = pcd_cloud.points[i][0]
# im_2d_to_3d[y, x][1] = pcd_cloud.points[i][1]
# im_2d_to_3d[y, x][2] = pcd_cloud.points[i][2]
im_2d_to_3d[pt_2d[0][1], pt_2d[0][0]] = points_np[i]
return im_project, im_2d_to_3d, pts_2d, list_pts_2d
def cal_H(im_2d_to_3d, pts_2d, list_pts_2d):
pts_3d = []
# list_pts_2d = pts_2d.tolist()
# list_pts_2d = [[[int(item) for item in subsublist] for subsublist in sublist] for sublist in list_pts_2d]
for i, pt_2d in enumerate(list_pts_2d):
# x, y = pt_2d.ravel()
# x, y = int(x), int(y)
# x, y = pt_2d[0][0], pt_2d[0][1]
pts_3d.append([im_2d_to_3d[pt_2d[1], pt_2d[0]][0],
im_2d_to_3d[pt_2d[1], pt_2d[0]][1],
im_2d_to_3d[pt_2d[1], pt_2d[0]][2]])
ransacReprojThreshold = 3.0
# 查找单应矩阵
# 修改第三个参数 1.0 -> 5.0
# 利用基于RANSAC的鲁棒算法选择最优的四组配对点再计算转换矩阵H(3*3)并返回,以便于反向投影错误率达到最小
# ransacReprojThreshold 设置为 1.0
# H, mask = cv2.findHomography(np.array(src_points), np.array(dst_points), cv2.RANSAC, ransacReprojThreshold, maxIters=1500, confidence=0.998) ## 修改 1.0 -> 5.0
pts_hom = []
for i in range(len(pts_2d)):
pt = pts_2d[i]
x,y = pt[0], pt[1]
pts_hom.append([x*1.0, y*1.0, 1.0])
H, _ = cv2.findHomography(np.array(pts_3d), np.array(pts_hom), cv2.RANSAC, ransacReprojThreshold) ## 修改 1.0 -> 5.0
Log.info("2D-3D 单应性矩阵计算结果为:")
print(H)
return H
def sort_conners_2d(conners):
sort_conner = []
left, top, right, bottom = 0, 0, 0, 0
# maxx,maxy = 0,0
maxx,maxy = -100000,-100000
minx,miny = 100000,100000
for i in range(len(conners)):
conner = conners[i]
if conner[0] < minx:
minx = conner[0]
left = i
if conner[0] > maxx:
maxx = conner[0]
right = i
if conner[1] < miny:
miny = conner[1]
top = i
if conner[1] > maxy:
maxy = conner[1]
bottom = i
sort_conner.append(conners[left])
sort_conner.append(conners[top])
sort_conner.append(conners[right])
sort_conner.append(conners[bottom])
return sort_conner
def select_and_sort_conners_2d(points):
sort_conner = []
left, top, right, bottom = 0, 0, 0, 0
# maxx,maxy = 0,0
maxx,maxy = -100000,-100000
minx,miny = 100000,100000
for i in range(len(points)):
conner = list(points[i])[0]
if conner[0] < minx:
minx = conner[0]
left = i
if conner[0] > maxx:
maxx = conner[0]
right = i
if conner[1] < miny:
miny = conner[1]
top = i
if conner[1] > maxy:
maxy = conner[1]
bottom = i
sort_conner.append([points[left][0][0], points[left][0][1]])
sort_conner.append([points[top][0][0], points[top][0][1]])
sort_conner.append([points[right][0][0], points[right][0][1]])
sort_conner.append([points[bottom][0][0], points[bottom][0][1]])
return sort_conner
def reproject_cloud(plane_model, cloud_load):
reproject_pcd = o3d.geometry.PointCloud()
A,B,C,D = plane_model
for pts_3d in cloud_load.points:
# 求线与平面的交点
t = -D / (A * pts_3d[0] + B * pts_3d[1] + C * pts_3d[2])
# pts_3d = np.array([t * pts_3d[0], t * pts_3d[1], t * pts_3d[2]])
reproject_pcd.points.append((t * pts_3d[0], t * pts_3d[1], t * pts_3d[2]))
return reproject_pcd
def correct_mark_cloud(mark_pcd):
plane_model, inliers = mark_pcd.segment_plane(distance_threshold=0.0155,
ransac_n=3,
num_iterations=10000)
inlier_pcd = mark_pcd.select_by_index(inliers)
inlier_pcd = reproject_cloud(plane_model, inlier_pcd) #利用投影得到的图像
inlier_pcd = inlier_pcd.paint_uniform_color([1.0, 0, 0])
outlier_cloud = mark_pcd.select_by_index(inliers, invert=True)
# 暂时没有用到
# radius = 0.05
# inlier_pcd = smooth_point_cloud(inlier_pcd, radius) #利用拟合得到的图像
inlier_pcd_correct = reproject_cloud(plane_model, mark_pcd) #利用投影得到的图像
inlier_pcd_correct = inlier_pcd_correct.paint_uniform_color([0.0, 1.0, 0])
# o3d.visualization.draw_geometries([ inlier_pcd, inlier_pcd_correct ])
# inlier_pcd_correct = inlier_pcd # ???
return plane_model, inlier_pcd, inlier_pcd_correct
if __name__ == '__main__':
# array = [["1","1",0.3213123,[0.323, 0.323, 0.32],[0.323, 0.323, 0.32]], ["1","1",0.3213123,[0.323, 0.323, 0.32],[0.323, 0.323, 0.32]]]
# save_to_json('', array)
_color_map = color_map()
show_color_map(_color_map)
# 假设我们有如下 RT 矩阵列表和其他必要的点云数据
RT_matrices = [...] # 这里应该是你提供的 RT 矩阵列表
P = np.random.rand(100, 3) # 示例基准点云
Q = np.random.rand(100, 3) # 示例目标点云
R_opt, t_opt = filter_and_fit_rts(RT_matrices, P, Q, threshold=0.1, use_ransac=True)
print("Optimal Rotation Matrix:\n", R_opt)
print("Optimal Translation Vector:\n", t_opt)