calibration_tools_v1.0/utils copy.py

504 lines
19 KiB
Python
Raw Normal View History

2025-02-20 10:45:17 +08:00
# -- coding: utf-8 --
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, current_dir)
import inspect
import logging
import logging.config
import numpy as np
import json
import numpy as np
from scipy.spatial.transform import Rotation as R
import cv2
import config
import open3d as o3d
class Log(object):
# 文字效果 0终端默认1高亮4下划线5闪烁7反白显示
# 前景色 303132绿3334353637
# 背景色 404142绿4344454647
class ColorHandler(logging.StreamHandler):
COLOR_CODE = {
'INFO': '\033[1;32m', # 绿色
'WARNING': '\033[1;33m', # 黄色
'ERROR': '\033[1;31m', # 红色
'CRITICAL': '\033[1;34m', # 蓝色
}
def emit(self, record):
log_color = self.COLOR_CODE.get(record.levelname, '') # 默认不改变颜色
log_msg = self.format(record)
stream = self.stream
stream.write(log_color)
stream.write(log_msg)
self.stream.write('\033[0m') # 重置颜色
self.stream.write('\n')
logging.config.fileConfig(os.path.dirname(__file__) + '\\logging.conf')
__logger = logging.getLogger('script')
__logger.propagate = False # 避免重复打印
color_handler = ColorHandler()
color_handler.setFormatter(logging.Formatter('%(asctime)s - %(srcfilename)s - %(srcfunction)s - %(srclineno)d - %(levelname)s : %(message)s'))
__logger.addHandler(color_handler)
@classmethod
def set_level(cls, level):
cls.__logger.setLevel(level)
@classmethod
def debug(cls, msg, *args, **kwargs):
cls._log(logging.DEBUG, msg, *args, **kwargs)
@classmethod
def info(cls, msg, *args, **kwargs):
cls._log(logging.INFO, msg, *args, **kwargs)
@classmethod
def warning(cls, msg, *args, **kwargs):
cls._log(logging.WARNING, msg, *args, **kwargs)
@classmethod
def error(cls, msg, *args, **kwargs):
cls._log(logging.ERROR, msg, *args, **kwargs)
@classmethod
def critical(cls, msg, *args, **kwargs):
cls._log(logging.CRITICAL, msg, *args, **kwargs)
@classmethod
def _log(cls, level, msg, *args, **kwargs):
# 获取调用者的信息
frame = inspect.currentframe().f_back.f_back
caller = inspect.getframeinfo(frame)
# 记录日志,传入调用者的文件名和行号
cls.__logger.log(level, msg, *args, **kwargs,
extra={'srcfilename': os.path.basename(caller.filename), 'srcfunction': caller.function, 'srclineno': caller.lineno})
@classmethod
def get_logger(cls):
return cls.__logger
def color_map():
# 创建一个空的色彩映射表大小为256x1因为我们将生成256种颜色3个通道BGR
color_map = np.zeros((256, 1, 3), dtype=np.uint8)
def blue_to_red(i):
if i < 85:
# 蓝色 -> 绿色 (B: 255->0, G: 0->255, R: 0)
b = 255 - i * 3
g = i * 3
r = 0
elif i < 170:
# 绿色 -> 黄色 (B: 0, G: 255, R: 0->255)
b = 0
g = 255
r = (i - 85) * 3
else:
# 黄色 -> 红色 (B: 0, G: 255->0, R: 255)
b = 0
g = 255 - (i - 170) * 3
r = 255
# 使用 numpy.clip 来保证 RGB 值都在 [0, 255] 的范围内
return tuple(np.clip([b, g, r], 0, 255).astype(int))
# 填充色彩映射表
for i in range(256):
color_map[i, 0] = blue_to_red(i)
return color_map
def show_color_map(color_map):
color_map_vis = cv2.applyColorMap(np.arange(256, dtype=np.uint8).reshape(1, 256), color_map)
color_map_vis = cv2.resize(color_map_vis, (512, 100), interpolation=cv2.INTER_LINEAR)
cv2.imshow('Reflectivity Colormap', color_map_vis)
cv2.waitKey(0)
cv2.destroyAllWindows()
def print_diff(aurco_pt1, aurco_pt2, lidar_pt1, lidar_pt2):
aurco_distance = np.linalg.norm(aurco_pt1 - aurco_pt2)
lidar_distance = np.linalg.norm(lidar_pt1 - lidar_pt2)
print(aurco_distance)
print(lidar_distance)
print(abs(aurco_distance - lidar_distance))
def rotation_matrix_to_euler_angles(R):
# 提取旋转矩阵的元素
r11, r12, r13, r21, r22, r23, r31, r32, r33 = R.flatten()
# 计算旋转角度
_r = np.arctan2(r32, r22)
_p = np.arctan2(-r31, np.sqrt(r32**2 + r33**2))
_y = np.arctan2(r13, r11)
return _r, _p, _y
def rotation_matrix_to_rpy(R):
# 确保输入是一个有效的旋转矩阵
if not np.allclose(np.dot(R, R.T), np.eye(3)) or not np.isclose(np.linalg.det(R), 1.0):
raise ValueError("输入不是一个有效的旋转矩阵")
_pitch = -np.arcsin(R[2, 0])
if np.abs(_pitch - np.pi / 2) < 1e-6: # 处理接近90度的情况
_yaw = 0
_roll = np.arctan2(R[0, 1], R[0, 2])
elif np.abs(_pitch + np.pi / 2) < 1e-6: # 处理接近-90度的情况
_yaw = 0
_roll = -np.arctan2(R[0, 1], R[0, 2])
else:
_yaw = np.arctan2(R[1, 0], R[0, 0])
_roll = np.arctan2(R[2, 1], R[2, 2])
return _roll, _pitch, _yaw
def traverse_folder(folder_path):
path = []
for root, dirs, files in os.walk(folder_path):
for file_name in files:
abs_file_path = os.path.join(root, file_name)
Log.critical(abs_file_path)
path.append(abs_file_path)
return path
def list_subdirs(directory):
return [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))]
def save_to_json(file_path, array):
str_tmp = '[\n'
for i in range(len(array)):
if i == len(array) -1: str_tmp += json.dumps(array[i]) + '\n'
else: str_tmp += json.dumps(array[i]) + ',\n'
str_tmp += ']'
with open(file_path, 'w+') as f:
f.write(str_tmp)
def read_from_json(file_path):
with open(file_path, 'r') as f:
loaded_array = json.load(f)
return loaded_array
# def rotation_matrix_from_ypr(yaw, pitch, roll):
# """Convert Yaw-Pitch-Roll angles to a rotation matrix."""
# return R.from_euler('xyz', [yaw, pitch, roll], degrees=False).as_matrix()
# def combine_rotations(R1, R2):
# """Combine two rotation matrices."""
# return np.dot(R2, R1)
# def combine_ypr(yaw1, pitch1, roll1, yaw2, pitch2, roll2):
# R1 = rotation_matrix_from_ypr(yaw1, pitch1, roll1)
# R2 = rotation_matrix_from_ypr(yaw2, pitch2, roll2)
# R_combined = combine_rotations(R1, R2)
# r_combined = R.from_matrix(R_combined)
# yaw_combined, pitch_combined, roll_combined = r_combined.as_euler('xyz', degrees=False)
# return [yaw_combined, pitch_combined, roll_combined]
def quaternion_from_rpy(roll, pitch, yaw):
"""Convert Roll-Pitch-Yaw angles to a quaternion."""
return R.from_euler('xyz', [roll, pitch, yaw], degrees=False).as_quat()
def rpy_from_quaternion(q):
"""Convert a quaternion to Roll-Pitch-Yaw angles."""
return R.from_quat(q).as_euler('xyz', degrees=False)
def combine_quaternions(q1, q2):
"""Combine two quaternions."""
r1 = R.from_quat(q1)
r2 = R.from_quat(q2)
combined_rotation = r2 * r1
return combined_rotation.as_quat()
def combine_rpy(roll1, pitch1, yaw1, roll2 ,pitch2, yaw2):
q1 = quaternion_from_rpy(roll1, pitch1, yaw1)
q2 = quaternion_from_rpy(roll2, pitch2, yaw2)
q_combined = combine_quaternions(q1, q2)
roll_combined, pitch_combined, yaw_combined = rpy_from_quaternion(q_combined)
return [roll_combined, pitch_combined, yaw_combined]
def pnp3d(points3d, points2d):
_, rvec, tvec = cv2.solvePnP(points3d, points2d, config.camera_matrix, config.dist_coeffs)
_r, _p, _y = rotation_matrix_to_rpy(rvec)
return tvec, _r, _p, _y
def compute_error(P, Q, R_matrix, t_vector):
"""计算变换后的点云与目标点云之间的误差"""
transformed_P = (R_matrix @ P.T).T + t_vector
error = np.mean(np.linalg.norm(transformed_P - Q, axis=1))
return error, transformed_P
def filter_and_fit_rts(RT_matrices, P, Q, threshold=None, use_ransac=False):
"""
筛选并拟合最优的 RT 矩阵
参数:
RT_matrices: list of tuples (R_matrix, t_vector)
P: numpy.ndarray, 基准点云
Q: numpy.ndarray, 目标点云
threshold: float, 错误阈值可选
use_ransac: bool, 是否使用 RANSAC 方法默认 False
返回:
R_opt, t_opt: 最优的旋转矩阵和平移向量
"""
errors = []
transformed_points = []
# 计算每个 RT 矩阵的误差
for R_matrix, t_vector in RT_matrices:
error, transformed_P = compute_error(P, Q, R_matrix, t_vector)
errors.append(error)
transformed_points.append(transformed_P)
errors = np.array(errors)
transformed_points = np.array(transformed_points)
if use_ransac:
from sklearn.linear_model import RANSACRegressor
model = RANSACRegressor()
model.fit(transformed_points.reshape(-1, 3), Q.repeat(len(RT_matrices), axis=0))
inlier_mask = model.inlier_mask_
valid_RTs = [RT_matrices[i] for i in range(len(RT_matrices)) if inlier_mask[i * len(P)]]
else:
if threshold is not None:
valid_RTs = [RT_matrices[i] for i in range(len(RT_matrices)) if errors[i] < threshold]
else:
valid_RTs = RT_matrices
if not valid_RTs:
raise ValueError("没有有效的 RT 矩阵")
# 使用简单的平均方法拟合最优 RT 矩阵
Rs = np.array([R_matrix for R_matrix, _ in valid_RTs])
ts = np.array([t_vector for _, t_vector in valid_RTs])
# 平均旋转矩阵可以通过四元数平均实现
quats = R.from_matrix(Rs).as_quat().mean(axis=0)
R_opt = R.from_quat(quats).as_matrix()
# 平均平移向量
t_opt = ts.mean(axis=0)
return R_opt, t_opt
def rpy_to_rotation_matrix(r, p, y):
"""Convert RPY angles to a rotation matrix."""
_roll, _pitch, _yaw = r, p, y
# 定义每个轴的旋转矩阵
Rx = np.array([[1, 0, 0],
[0, np.cos(_roll), -np.sin(_roll)],
[0, np.sin(_roll), np.cos(_roll)]])
Ry = np.array([[np.cos(_pitch), 0, np.sin(_pitch)],
[0, 1, 0],
[-np.sin(_pitch), 0, np.cos(_pitch)]])
Rz = np.array([[np.cos(_yaw), -np.sin(_yaw), 0],
[np.sin(_yaw), np.cos(_yaw), 0],
[0, 0, 1]])
# 计算总的旋转矩阵(假设使用 ZYX 顺序)
R = Rz @ Ry @ Rx
return R
# def rpy_t_to_rt_matrix(r, p, y, t):
# """Construct the RT matrix from RPY angles and translation vector."""
# R = rpy_to_rotation_matrix(r, p, y)
# # 构建 4x4 齐次变换矩阵
# RT = np.eye(4)
# RT[:3, :3] = R
# RT[:3, 3] = t
# return RT
def parse_custom_attribute(ply_filename, attribute_name):
with open(ply_filename, 'r') as f:
lines = f.readlines()
# 解析文件头以找到自定义属性的位置
header_lines = []
for line in lines:
if line.strip() == 'end_header':
header_lines.append(line)
break
header_lines.append(line)
vertex_count = None
custom_index = None
for i, line in enumerate(header_lines):
parts = line.strip().split()
if len(parts) > 2 and parts[0] == 'element' and parts[1] == 'vertex':
vertex_count = int(parts[2])
elif len(parts) > 2 and parts[0] == 'property' and parts[-1] == attribute_name:
# custom_index = i - len([l for l in header_lines if 'element' in l]) - 1
custom_index = 3
if vertex_count is None or custom_index is None:
raise ValueError(f"Failed to find {attribute_name} in the PLY file header.")
# 提取自定义属性的数据
data_start = header_lines.index('end_header\n') + 1
custom_data = []
for line in lines[data_start:data_start + vertex_count]:
values = line.strip().split()
try:
custom_value = int(values[custom_index])
custom_data.append(custom_value)
except (IndexError, ValueError):
raise ValueError(f"Failed to read {attribute_name} from data row.")
return np.array(custom_data)
def project_cloud(pcd_cloud):
im_project = np.zeros((config.global_h, config.global_w, 3), dtype=np.uint8)
im_2d_to_3d = np.zeros((config.global_h, config.global_w, 3), dtype=np.float64)
rvec = np.array([np.array([0.]), np.array([0.]), np.array([0.])])
tvec = np.float64([0.0, 0.0, 0.0])
pts_2d, _ = cv2.projectPoints(np.array(pcd_cloud.points), rvec, tvec, config.camera_matrix, config.dist_coeffs)
color = (255,255,255)
list_pts_2d = pts_2d.tolist()
list_pts_2d = [[[int(item) for item in subsublist] for subsublist in sublist] for sublist in list_pts_2d]
points_np = np.asarray(pcd_cloud.points)
for i, pt_2d in enumerate(list_pts_2d):
# x, y = pt_2d.ravel()
# if x == float('nan') or y == float('nan'): continue
# x, y = int(x), int(y)
# x, y = pt_2d[0][0], pt_2d[0][1]
if 0 <= pt_2d[0][0] < config.global_w and 0 <= pt_2d[0][1] < config.global_h:
im_project[pt_2d[0][1], pt_2d[0][0]] = color
# im_2d_to_3d[y, x][0] = pcd_cloud.points[i][0]
# im_2d_to_3d[y, x][1] = pcd_cloud.points[i][1]
# im_2d_to_3d[y, x][2] = pcd_cloud.points[i][2]
im_2d_to_3d[pt_2d[0][1], pt_2d[0][0]] = points_np[i]
return im_project, im_2d_to_3d, pts_2d, list_pts_2d
def cal_H(im_2d_to_3d, pts_2d, list_pts_2d):
pts_3d = []
# list_pts_2d = pts_2d.tolist()
# list_pts_2d = [[[int(item) for item in subsublist] for subsublist in sublist] for sublist in list_pts_2d]
for i, pt_2d in enumerate(list_pts_2d):
# x, y = pt_2d.ravel()
# x, y = int(x), int(y)
# x, y = pt_2d[0][0], pt_2d[0][1]
pts_3d.append([im_2d_to_3d[pt_2d[1], pt_2d[0]][0],
im_2d_to_3d[pt_2d[1], pt_2d[0]][1],
im_2d_to_3d[pt_2d[1], pt_2d[0]][2]])
ransacReprojThreshold = 3.0
# 查找单应矩阵
# 修改第三个参数 1.0 -> 5.0
# 利用基于RANSAC的鲁棒算法选择最优的四组配对点再计算转换矩阵H(3*3)并返回,以便于反向投影错误率达到最小
# ransacReprojThreshold 设置为 1.0
# H, mask = cv2.findHomography(np.array(src_points), np.array(dst_points), cv2.RANSAC, ransacReprojThreshold, maxIters=1500, confidence=0.998) ## 修改 1.0 -> 5.0
pts_hom = []
for i in range(len(pts_2d)):
pt = pts_2d[i]
x,y = pt[0], pt[1]
pts_hom.append([x*1.0, y*1.0, 1.0])
H, _ = cv2.findHomography(np.array(pts_3d), np.array(pts_hom), cv2.RANSAC, ransacReprojThreshold) ## 修改 1.0 -> 5.0
Log.info("2D-3D 单应性矩阵计算结果为:")
print(H)
return H
def sort_conners_2d(conners):
sort_conner = []
left, top, right, bottom = 0, 0, 0, 0
# maxx,maxy = 0,0
maxx,maxy = -100000,-100000
minx,miny = 100000,100000
for i in range(len(conners)):
conner = conners[i]
if conner[0] < minx:
minx = conner[0]
left = i
if conner[0] > maxx:
maxx = conner[0]
right = i
if conner[1] < miny:
miny = conner[1]
top = i
if conner[1] > maxy:
maxy = conner[1]
bottom = i
sort_conner.append(conners[left])
sort_conner.append(conners[top])
sort_conner.append(conners[right])
sort_conner.append(conners[bottom])
return sort_conner
def select_and_sort_conners_2d(points):
sort_conner = []
left, top, right, bottom = 0, 0, 0, 0
# maxx,maxy = 0,0
maxx,maxy = -100000,-100000
minx,miny = 100000,100000
for i in range(len(points)):
conner = list(points[i])[0]
if conner[0] < minx:
minx = conner[0]
left = i
if conner[0] > maxx:
maxx = conner[0]
right = i
if conner[1] < miny:
miny = conner[1]
top = i
if conner[1] > maxy:
maxy = conner[1]
bottom = i
sort_conner.append([points[left][0][0], points[left][0][1]])
sort_conner.append([points[top][0][0], points[top][0][1]])
sort_conner.append([points[right][0][0], points[right][0][1]])
sort_conner.append([points[bottom][0][0], points[bottom][0][1]])
return sort_conner
def reproject_cloud(plane_model, cloud_load):
reproject_pcd = o3d.geometry.PointCloud()
A,B,C,D = plane_model
for pts_3d in cloud_load.points:
# 求线与平面的交点
t = -D / (A * pts_3d[0] + B * pts_3d[1] + C * pts_3d[2])
# pts_3d = np.array([t * pts_3d[0], t * pts_3d[1], t * pts_3d[2]])
reproject_pcd.points.append((t * pts_3d[0], t * pts_3d[1], t * pts_3d[2]))
return reproject_pcd
def correct_mark_cloud(mark_pcd):
plane_model, inliers = mark_pcd.segment_plane(distance_threshold=0.0155,
ransac_n=3,
num_iterations=10000)
inlier_pcd = mark_pcd.select_by_index(inliers)
inlier_pcd = reproject_cloud(plane_model, inlier_pcd) #利用投影得到的图像
inlier_pcd = inlier_pcd.paint_uniform_color([1.0, 0, 0])
outlier_cloud = mark_pcd.select_by_index(inliers, invert=True)
# 暂时没有用到
# radius = 0.05
# inlier_pcd = smooth_point_cloud(inlier_pcd, radius) #利用拟合得到的图像
inlier_pcd_correct = reproject_cloud(plane_model, mark_pcd) #利用投影得到的图像
inlier_pcd_correct = inlier_pcd_correct.paint_uniform_color([0.0, 1.0, 0])
# o3d.visualization.draw_geometries([ inlier_pcd, inlier_pcd_correct ])
# inlier_pcd_correct = inlier_pcd # ???
return plane_model, inlier_pcd, inlier_pcd_correct
if __name__ == '__main__':
# array = [["1","1",0.3213123,[0.323, 0.323, 0.32],[0.323, 0.323, 0.32]], ["1","1",0.3213123,[0.323, 0.323, 0.32],[0.323, 0.323, 0.32]]]
# save_to_json('', array)
_color_map = color_map()
show_color_map(_color_map)
# 假设我们有如下 RT 矩阵列表和其他必要的点云数据
RT_matrices = [...] # 这里应该是你提供的 RT 矩阵列表
P = np.random.rand(100, 3) # 示例基准点云
Q = np.random.rand(100, 3) # 示例目标点云
R_opt, t_opt = filter_and_fit_rts(RT_matrices, P, Q, threshold=0.1, use_ransac=True)
print("Optimal Rotation Matrix:\n", R_opt)
print("Optimal Translation Vector:\n", t_opt)