494 lines
19 KiB
Python
494 lines
19 KiB
Python
![]() |
# -- coding: utf-8 --
|
|||
|
import os
|
|||
|
import sys
|
|||
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|||
|
sys.path.insert(0, current_dir)
|
|||
|
import inspect
|
|||
|
import logging
|
|||
|
import logging.config
|
|||
|
import numpy as np
|
|||
|
import json
|
|||
|
import numpy as np
|
|||
|
from scipy.spatial.transform import Rotation as R
|
|||
|
import cv2
|
|||
|
import config
|
|||
|
import open3d as o3d
|
|||
|
|
|||
|
class Log(object):
|
|||
|
|
|||
|
# 文字效果 0:终端默认,1:高亮,4:下划线,5:闪烁,7:反白显示
|
|||
|
# 前景色 30(黑),31(红),32(绿),33(黄),34(蓝),35(紫),36(青),37(灰)
|
|||
|
# 背景色 40(黑),41(红),42(绿),43(黄),44(蓝),45(紫),46(青),47(灰)
|
|||
|
class ColorHandler(logging.StreamHandler):
|
|||
|
COLOR_CODE = {
|
|||
|
'INFO': '\033[1;32m', # 绿色
|
|||
|
'WARNING': '\033[1;33m', # 黄色
|
|||
|
'ERROR': '\033[1;31m', # 红色
|
|||
|
'CRITICAL': '\033[1;34m', # 蓝色
|
|||
|
}
|
|||
|
|
|||
|
def emit(self, record):
|
|||
|
log_color = self.COLOR_CODE.get(record.levelname, '') # 默认不改变颜色
|
|||
|
log_msg = self.format(record)
|
|||
|
stream = self.stream
|
|||
|
stream.write(log_color)
|
|||
|
stream.write(log_msg)
|
|||
|
self.stream.write('\033[0m') # 重置颜色
|
|||
|
self.stream.write('\n')
|
|||
|
|
|||
|
logging.config.fileConfig(os.path.dirname(__file__) + '\\logging.conf')
|
|||
|
__logger = logging.getLogger('script')
|
|||
|
__logger.propagate = False # 避免重复打印
|
|||
|
color_handler = ColorHandler()
|
|||
|
color_handler.setFormatter(logging.Formatter('%(asctime)s - %(srcfilename)s - %(srcfunction)s - %(srclineno)d - %(levelname)s : %(message)s'))
|
|||
|
__logger.addHandler(color_handler)
|
|||
|
|
|||
|
@classmethod
|
|||
|
def set_level(cls, level):
|
|||
|
cls.__logger.setLevel(level)
|
|||
|
|
|||
|
@classmethod
|
|||
|
def debug(cls, msg, *args, **kwargs):
|
|||
|
cls._log(logging.DEBUG, msg, *args, **kwargs)
|
|||
|
|
|||
|
@classmethod
|
|||
|
def info(cls, msg, *args, **kwargs):
|
|||
|
cls._log(logging.INFO, msg, *args, **kwargs)
|
|||
|
|
|||
|
@classmethod
|
|||
|
def warning(cls, msg, *args, **kwargs):
|
|||
|
cls._log(logging.WARNING, msg, *args, **kwargs)
|
|||
|
|
|||
|
@classmethod
|
|||
|
def error(cls, msg, *args, **kwargs):
|
|||
|
cls._log(logging.ERROR, msg, *args, **kwargs)
|
|||
|
|
|||
|
@classmethod
|
|||
|
def critical(cls, msg, *args, **kwargs):
|
|||
|
cls._log(logging.CRITICAL, msg, *args, **kwargs)
|
|||
|
|
|||
|
@classmethod
|
|||
|
def _log(cls, level, msg, *args, **kwargs):
|
|||
|
# 获取调用者的信息
|
|||
|
frame = inspect.currentframe().f_back.f_back
|
|||
|
caller = inspect.getframeinfo(frame)
|
|||
|
# 记录日志,传入调用者的文件名和行号
|
|||
|
cls.__logger.log(level, msg, *args, **kwargs,
|
|||
|
extra={'srcfilename': os.path.basename(caller.filename), 'srcfunction': caller.function, 'srclineno': caller.lineno})
|
|||
|
|
|||
|
@classmethod
|
|||
|
def get_logger(cls):
|
|||
|
return cls.__logger
|
|||
|
|
|||
|
def color_map():
|
|||
|
# 创建一个空的色彩映射表,大小为256x1(因为我们将生成256种颜色),3个通道(BGR)
|
|||
|
color_map = np.zeros((256, 1, 3), dtype=np.uint8)
|
|||
|
|
|||
|
def blue_to_red(i):
|
|||
|
if i < 85:
|
|||
|
# 蓝色 -> 绿色 (B: 255->0, G: 0->255, R: 0)
|
|||
|
b = 255 - i * 3
|
|||
|
g = i * 3
|
|||
|
r = 0
|
|||
|
elif i < 170:
|
|||
|
# 绿色 -> 黄色 (B: 0, G: 255, R: 0->255)
|
|||
|
b = 0
|
|||
|
g = 255
|
|||
|
r = (i - 85) * 3
|
|||
|
else:
|
|||
|
# 黄色 -> 红色 (B: 0, G: 255->0, R: 255)
|
|||
|
b = 0
|
|||
|
g = 255 - (i - 170) * 3
|
|||
|
r = 255
|
|||
|
|
|||
|
# 使用 numpy.clip 来保证 RGB 值都在 [0, 255] 的范围内
|
|||
|
return tuple(np.clip([b, g, r], 0, 255).astype(int))
|
|||
|
|
|||
|
# 填充色彩映射表
|
|||
|
for i in range(256):
|
|||
|
color_map[i, 0] = blue_to_red(i)
|
|||
|
|
|||
|
return color_map
|
|||
|
|
|||
|
def show_color_map(color_map):
|
|||
|
color_map_vis = cv2.applyColorMap(np.arange(256, dtype=np.uint8).reshape(1, 256), color_map)
|
|||
|
color_map_vis = cv2.resize(color_map_vis, (512, 100), interpolation=cv2.INTER_LINEAR)
|
|||
|
cv2.imshow('Reflectivity Colormap', color_map_vis)
|
|||
|
cv2.waitKey(0)
|
|||
|
cv2.destroyAllWindows()
|
|||
|
|
|||
|
|
|||
|
def print_diff(aurco_pt1, aurco_pt2, lidar_pt1, lidar_pt2):
|
|||
|
aurco_distance = np.linalg.norm(aurco_pt1 - aurco_pt2)
|
|||
|
lidar_distance = np.linalg.norm(lidar_pt1 - lidar_pt2)
|
|||
|
print(aurco_distance)
|
|||
|
print(lidar_distance)
|
|||
|
print(abs(aurco_distance - lidar_distance))
|
|||
|
|
|||
|
def rotation_matrix_to_euler_angles(R):
|
|||
|
# 提取旋转矩阵的元素
|
|||
|
r11, r12, r13, r21, r22, r23, r31, r32, r33 = R.flatten()
|
|||
|
|
|||
|
# 计算旋转角度
|
|||
|
_r = np.arctan2(r32, r22)
|
|||
|
_p = np.arctan2(-r31, np.sqrt(r32**2 + r33**2))
|
|||
|
_y = np.arctan2(r13, r11)
|
|||
|
|
|||
|
return _r, _p, _y
|
|||
|
|
|||
|
def rotation_matrix_to_rpy(R):
|
|||
|
# 确保输入是一个有效的旋转矩阵
|
|||
|
if not np.allclose(np.dot(R, R.T), np.eye(3)) or not np.isclose(np.linalg.det(R), 1.0):
|
|||
|
raise ValueError("输入不是一个有效的旋转矩阵")
|
|||
|
|
|||
|
_pitch = -np.arcsin(R[2, 0])
|
|||
|
|
|||
|
if np.abs(_pitch - np.pi / 2) < 1e-6: # 处理接近90度的情况
|
|||
|
_yaw = 0
|
|||
|
_roll = np.arctan2(R[0, 1], R[0, 2])
|
|||
|
elif np.abs(_pitch + np.pi / 2) < 1e-6: # 处理接近-90度的情况
|
|||
|
_yaw = 0
|
|||
|
_roll = -np.arctan2(R[0, 1], R[0, 2])
|
|||
|
else:
|
|||
|
_yaw = np.arctan2(R[1, 0], R[0, 0])
|
|||
|
_roll = np.arctan2(R[2, 1], R[2, 2])
|
|||
|
|
|||
|
return _roll, _pitch, _yaw
|
|||
|
|
|||
|
def traverse_folder(folder_path):
|
|||
|
path = []
|
|||
|
for root, dirs, files in os.walk(folder_path):
|
|||
|
for file_name in files:
|
|||
|
abs_file_path = os.path.join(root, file_name)
|
|||
|
Log.critical(abs_file_path)
|
|||
|
path.append(abs_file_path)
|
|||
|
return path
|
|||
|
|
|||
|
def list_subdirs(directory):
|
|||
|
return [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))]
|
|||
|
|
|||
|
def save_to_json(file_path, array):
|
|||
|
str_tmp = '[\n'
|
|||
|
for i in range(len(array)):
|
|||
|
if i == len(array) -1: str_tmp += json.dumps(array[i]) + '\n'
|
|||
|
else: str_tmp += json.dumps(array[i]) + ',\n'
|
|||
|
str_tmp += ']'
|
|||
|
with open(file_path, 'w+') as f:
|
|||
|
f.write(str_tmp)
|
|||
|
|
|||
|
def read_from_json(file_path):
|
|||
|
with open(file_path, 'r') as f:
|
|||
|
loaded_array = json.load(f)
|
|||
|
return loaded_array
|
|||
|
|
|||
|
# def rotation_matrix_from_ypr(yaw, pitch, roll):
|
|||
|
# """Convert Yaw-Pitch-Roll angles to a rotation matrix."""
|
|||
|
# return R.from_euler('xyz', [yaw, pitch, roll], degrees=False).as_matrix()
|
|||
|
|
|||
|
# def combine_rotations(R1, R2):
|
|||
|
# """Combine two rotation matrices."""
|
|||
|
# return np.dot(R2, R1)
|
|||
|
|
|||
|
# def combine_ypr(yaw1, pitch1, roll1, yaw2, pitch2, roll2):
|
|||
|
# R1 = rotation_matrix_from_ypr(yaw1, pitch1, roll1)
|
|||
|
# R2 = rotation_matrix_from_ypr(yaw2, pitch2, roll2)
|
|||
|
# R_combined = combine_rotations(R1, R2)
|
|||
|
# r_combined = R.from_matrix(R_combined)
|
|||
|
# yaw_combined, pitch_combined, roll_combined = r_combined.as_euler('xyz', degrees=False)
|
|||
|
# return [yaw_combined, pitch_combined, roll_combined]
|
|||
|
|
|||
|
def quaternion_from_rpy(roll, pitch, yaw):
|
|||
|
"""Convert Roll-Pitch-Yaw angles to a quaternion."""
|
|||
|
return R.from_euler('xyz', [roll, pitch, yaw], degrees=False).as_quat()
|
|||
|
|
|||
|
def rpy_from_quaternion(q):
|
|||
|
"""Convert a quaternion to Roll-Pitch-Yaw angles."""
|
|||
|
return R.from_quat(q).as_euler('xyz', degrees=False)
|
|||
|
|
|||
|
def combine_quaternions(q1, q2):
|
|||
|
"""Combine two quaternions."""
|
|||
|
r1 = R.from_quat(q1)
|
|||
|
r2 = R.from_quat(q2)
|
|||
|
combined_rotation = r2 * r1
|
|||
|
return combined_rotation.as_quat()
|
|||
|
|
|||
|
def combine_rpy(roll1, pitch1, yaw1, roll2 ,pitch2, yaw2):
|
|||
|
q1 = quaternion_from_rpy(roll1, pitch1, yaw1)
|
|||
|
q2 = quaternion_from_rpy(roll2, pitch2, yaw2)
|
|||
|
q_combined = combine_quaternions(q1, q2)
|
|||
|
roll_combined, pitch_combined, yaw_combined = rpy_from_quaternion(q_combined)
|
|||
|
return [roll_combined, pitch_combined, yaw_combined]
|
|||
|
|
|||
|
def pnp3d(points3d, points2d):
|
|||
|
_, rvec, tvec = cv2.solvePnP(points3d, points2d, config.camera_matrix, config.dist_coeffs)
|
|||
|
_r, _p, _y = rotation_matrix_to_rpy(rvec)
|
|||
|
return tvec, _r, _p, _y
|
|||
|
|
|||
|
def compute_error(P, Q, R_matrix, t_vector):
|
|||
|
"""计算变换后的点云与目标点云之间的误差"""
|
|||
|
transformed_P = (R_matrix @ P.T).T + t_vector
|
|||
|
error = np.mean(np.linalg.norm(transformed_P - Q, axis=1))
|
|||
|
return error, transformed_P
|
|||
|
|
|||
|
def filter_and_fit_rts(RT_matrices, P, Q, threshold=None, use_ransac=False):
|
|||
|
"""
|
|||
|
筛选并拟合最优的 RT 矩阵。
|
|||
|
|
|||
|
参数:
|
|||
|
RT_matrices: list of tuples (R_matrix, t_vector)
|
|||
|
P: numpy.ndarray, 基准点云
|
|||
|
Q: numpy.ndarray, 目标点云
|
|||
|
threshold: float, 错误阈值,可选
|
|||
|
use_ransac: bool, 是否使用 RANSAC 方法,默认 False
|
|||
|
|
|||
|
返回:
|
|||
|
R_opt, t_opt: 最优的旋转矩阵和平移向量
|
|||
|
"""
|
|||
|
errors = []
|
|||
|
transformed_points = []
|
|||
|
|
|||
|
# 计算每个 RT 矩阵的误差
|
|||
|
for R_matrix, t_vector in RT_matrices:
|
|||
|
error, transformed_P = compute_error(P, Q, R_matrix, t_vector)
|
|||
|
errors.append(error)
|
|||
|
transformed_points.append(transformed_P)
|
|||
|
|
|||
|
errors = np.array(errors)
|
|||
|
transformed_points = np.array(transformed_points)
|
|||
|
|
|||
|
if use_ransac:
|
|||
|
from sklearn.linear_model import RANSACRegressor
|
|||
|
model = RANSACRegressor()
|
|||
|
model.fit(transformed_points.reshape(-1, 3), Q.repeat(len(RT_matrices), axis=0))
|
|||
|
inlier_mask = model.inlier_mask_
|
|||
|
valid_RTs = [RT_matrices[i] for i in range(len(RT_matrices)) if inlier_mask[i * len(P)]]
|
|||
|
else:
|
|||
|
if threshold is not None:
|
|||
|
valid_RTs = [RT_matrices[i] for i in range(len(RT_matrices)) if errors[i] < threshold]
|
|||
|
else:
|
|||
|
valid_RTs = RT_matrices
|
|||
|
|
|||
|
if not valid_RTs:
|
|||
|
raise ValueError("没有有效的 RT 矩阵")
|
|||
|
|
|||
|
# 使用简单的平均方法拟合最优 RT 矩阵
|
|||
|
Rs = np.array([R_matrix for R_matrix, _ in valid_RTs])
|
|||
|
ts = np.array([t_vector for _, t_vector in valid_RTs])
|
|||
|
|
|||
|
# 平均旋转矩阵可以通过四元数平均实现
|
|||
|
quats = R.from_matrix(Rs).as_quat().mean(axis=0)
|
|||
|
R_opt = R.from_quat(quats).as_matrix()
|
|||
|
|
|||
|
# 平均平移向量
|
|||
|
t_opt = ts.mean(axis=0)
|
|||
|
|
|||
|
return R_opt, t_opt
|
|||
|
|
|||
|
# def rpy_to_rotation_matrix(r, p, y):
|
|||
|
# """Convert RPY angles to a rotation matrix."""
|
|||
|
# _roll, _pitch, _yaw = r, p, y
|
|||
|
|
|||
|
# # 定义每个轴的旋转矩阵
|
|||
|
# Rx = np.array([[1, 0, 0],
|
|||
|
# [0, np.cos(_roll), -np.sin(_roll)],
|
|||
|
# [0, np.sin(_roll), np.cos(_roll)]])
|
|||
|
|
|||
|
# Ry = np.array([[np.cos(_pitch), 0, np.sin(_pitch)],
|
|||
|
# [0, 1, 0],
|
|||
|
# [-np.sin(_pitch), 0, np.cos(_pitch)]])
|
|||
|
|
|||
|
# Rz = np.array([[np.cos(_yaw), -np.sin(_yaw), 0],
|
|||
|
# [np.sin(_yaw), np.cos(_yaw), 0],
|
|||
|
# [0, 0, 1]])
|
|||
|
|
|||
|
# # 计算总的旋转矩阵(假设使用 ZYX 顺序)
|
|||
|
# R = Rz @ Ry @ Rx
|
|||
|
# return R
|
|||
|
|
|||
|
|
|||
|
def parse_custom_attribute(ply_filename, attribute_name):
|
|||
|
with open(ply_filename, 'r') as f:
|
|||
|
lines = f.readlines()
|
|||
|
|
|||
|
# 解析文件头以找到自定义属性的位置
|
|||
|
header_lines = []
|
|||
|
for line in lines:
|
|||
|
if line.strip() == 'end_header':
|
|||
|
header_lines.append(line)
|
|||
|
break
|
|||
|
header_lines.append(line)
|
|||
|
|
|||
|
vertex_count = None
|
|||
|
custom_index = None
|
|||
|
|
|||
|
for i, line in enumerate(header_lines):
|
|||
|
parts = line.strip().split()
|
|||
|
if len(parts) > 2 and parts[0] == 'element' and parts[1] == 'vertex':
|
|||
|
vertex_count = int(parts[2])
|
|||
|
elif len(parts) > 2 and parts[0] == 'property' and parts[-1] == attribute_name:
|
|||
|
# custom_index = i - len([l for l in header_lines if 'element' in l]) - 1
|
|||
|
custom_index = 3
|
|||
|
|
|||
|
if vertex_count is None or custom_index is None:
|
|||
|
raise ValueError(f"Failed to find {attribute_name} in the PLY file header.")
|
|||
|
|
|||
|
# 提取自定义属性的数据
|
|||
|
data_start = header_lines.index('end_header\n') + 1
|
|||
|
custom_data = []
|
|||
|
|
|||
|
for line in lines[data_start:data_start + vertex_count]:
|
|||
|
values = line.strip().split()
|
|||
|
try:
|
|||
|
custom_value = int(values[custom_index])
|
|||
|
custom_data.append(custom_value)
|
|||
|
except (IndexError, ValueError):
|
|||
|
raise ValueError(f"Failed to read {attribute_name} from data row.")
|
|||
|
|
|||
|
return np.array(custom_data)
|
|||
|
|
|||
|
def project_cloud(pcd_cloud):
|
|||
|
im_project = np.zeros((config.global_h, config.global_w, 3), dtype=np.uint8)
|
|||
|
im_2d_to_3d = np.zeros((config.global_h, config.global_w, 3), dtype=np.float64)
|
|||
|
rvec = np.array([np.array([0.]), np.array([0.]), np.array([0.])])
|
|||
|
tvec = np.float64([0.0, 0.0, 0.0])
|
|||
|
pts_2d, _ = cv2.projectPoints(np.array(pcd_cloud.points), rvec, tvec, config.camera_matrix, config.dist_coeffs)
|
|||
|
color = (255,255,255)
|
|||
|
list_pts_2d = pts_2d.tolist()
|
|||
|
list_pts_2d = [[[int(item) for item in subsublist] for subsublist in sublist] for sublist in list_pts_2d]
|
|||
|
points_np = np.asarray(pcd_cloud.points)
|
|||
|
for i, pt_2d in enumerate(list_pts_2d):
|
|||
|
# x, y = pt_2d.ravel()
|
|||
|
# if x == float('nan') or y == float('nan'): continue
|
|||
|
# x, y = int(x), int(y)
|
|||
|
# x, y = pt_2d[0][0], pt_2d[0][1]
|
|||
|
if 0 <= pt_2d[0][0] < config.global_w and 0 <= pt_2d[0][1] < config.global_h:
|
|||
|
im_project[pt_2d[0][1], pt_2d[0][0]] = color
|
|||
|
# im_2d_to_3d[y, x][0] = pcd_cloud.points[i][0]
|
|||
|
# im_2d_to_3d[y, x][1] = pcd_cloud.points[i][1]
|
|||
|
# im_2d_to_3d[y, x][2] = pcd_cloud.points[i][2]
|
|||
|
im_2d_to_3d[pt_2d[0][1], pt_2d[0][0]] = points_np[i]
|
|||
|
return im_project, im_2d_to_3d, pts_2d, list_pts_2d
|
|||
|
|
|||
|
def cal_H(im_2d_to_3d, pts_2d, list_pts_2d):
|
|||
|
pts_3d = []
|
|||
|
# list_pts_2d = pts_2d.tolist()
|
|||
|
# list_pts_2d = [[[int(item) for item in subsublist] for subsublist in sublist] for sublist in list_pts_2d]
|
|||
|
for i, pt_2d in enumerate(list_pts_2d):
|
|||
|
# x, y = pt_2d.ravel()
|
|||
|
# x, y = int(x), int(y)
|
|||
|
# x, y = pt_2d[0][0], pt_2d[0][1]
|
|||
|
pts_3d.append([im_2d_to_3d[pt_2d[1], pt_2d[0]][0],
|
|||
|
im_2d_to_3d[pt_2d[1], pt_2d[0]][1],
|
|||
|
im_2d_to_3d[pt_2d[1], pt_2d[0]][2]])
|
|||
|
ransacReprojThreshold = 3.0
|
|||
|
# 查找单应矩阵
|
|||
|
# 修改第三个参数 1.0 -> 5.0
|
|||
|
# 利用基于RANSAC的鲁棒算法选择最优的四组配对点,再计算转换矩阵H(3*3)并返回,以便于反向投影错误率达到最小
|
|||
|
# ransacReprojThreshold 设置为 1.0
|
|||
|
# H, mask = cv2.findHomography(np.array(src_points), np.array(dst_points), cv2.RANSAC, ransacReprojThreshold, maxIters=1500, confidence=0.998) ## 修改 1.0 -> 5.0
|
|||
|
pts_hom = []
|
|||
|
for i in range(len(pts_2d)):
|
|||
|
pt = pts_2d[i]
|
|||
|
x,y = pt[0], pt[1]
|
|||
|
pts_hom.append([x*1.0, y*1.0, 1.0])
|
|||
|
H, _ = cv2.findHomography(np.array(pts_3d), np.array(pts_hom), cv2.RANSAC, ransacReprojThreshold) ## 修改 1.0 -> 5.0
|
|||
|
Log.info("2D-3D 单应性矩阵计算结果为:")
|
|||
|
print(H)
|
|||
|
return H
|
|||
|
|
|||
|
def sort_conners_2d(conners):
|
|||
|
sort_conner = []
|
|||
|
left, top, right, bottom = 0, 0, 0, 0
|
|||
|
# maxx,maxy = 0,0
|
|||
|
maxx,maxy = -100000,-100000
|
|||
|
minx,miny = 100000,100000
|
|||
|
for i in range(len(conners)):
|
|||
|
conner = conners[i]
|
|||
|
if conner[0] < minx:
|
|||
|
minx = conner[0]
|
|||
|
left = i
|
|||
|
if conner[0] > maxx:
|
|||
|
maxx = conner[0]
|
|||
|
right = i
|
|||
|
if conner[1] < miny:
|
|||
|
miny = conner[1]
|
|||
|
top = i
|
|||
|
if conner[1] > maxy:
|
|||
|
maxy = conner[1]
|
|||
|
bottom = i
|
|||
|
sort_conner.append(conners[left])
|
|||
|
sort_conner.append(conners[top])
|
|||
|
sort_conner.append(conners[right])
|
|||
|
sort_conner.append(conners[bottom])
|
|||
|
return sort_conner
|
|||
|
|
|||
|
def select_and_sort_conners_2d(points):
|
|||
|
sort_conner = []
|
|||
|
left, top, right, bottom = 0, 0, 0, 0
|
|||
|
# maxx,maxy = 0,0
|
|||
|
maxx,maxy = -100000,-100000
|
|||
|
minx,miny = 100000,100000
|
|||
|
for i in range(len(points)):
|
|||
|
conner = list(points[i])[0]
|
|||
|
if conner[0] < minx:
|
|||
|
minx = conner[0]
|
|||
|
left = i
|
|||
|
if conner[0] > maxx:
|
|||
|
maxx = conner[0]
|
|||
|
right = i
|
|||
|
if conner[1] < miny:
|
|||
|
miny = conner[1]
|
|||
|
top = i
|
|||
|
if conner[1] > maxy:
|
|||
|
maxy = conner[1]
|
|||
|
bottom = i
|
|||
|
sort_conner.append([points[left][0][0], points[left][0][1]])
|
|||
|
sort_conner.append([points[top][0][0], points[top][0][1]])
|
|||
|
sort_conner.append([points[right][0][0], points[right][0][1]])
|
|||
|
sort_conner.append([points[bottom][0][0], points[bottom][0][1]])
|
|||
|
return sort_conner
|
|||
|
|
|||
|
def reproject_cloud(plane_model, cloud_load):
|
|||
|
reproject_pcd = o3d.geometry.PointCloud()
|
|||
|
A,B,C,D = plane_model
|
|||
|
for pts_3d in cloud_load.points:
|
|||
|
# 求线与平面的交点
|
|||
|
t = -D / (A * pts_3d[0] + B * pts_3d[1] + C * pts_3d[2])
|
|||
|
# pts_3d = np.array([t * pts_3d[0], t * pts_3d[1], t * pts_3d[2]])
|
|||
|
reproject_pcd.points.append((t * pts_3d[0], t * pts_3d[1], t * pts_3d[2]))
|
|||
|
return reproject_pcd
|
|||
|
|
|||
|
def correct_mark_cloud(mark_pcd):
|
|||
|
plane_model, inliers = mark_pcd.segment_plane(distance_threshold=0.0155,
|
|||
|
ransac_n=3,
|
|||
|
num_iterations=10000)
|
|||
|
inlier_pcd = mark_pcd.select_by_index(inliers)
|
|||
|
inlier_pcd = reproject_cloud(plane_model, inlier_pcd) #利用投影得到的图像
|
|||
|
inlier_pcd = inlier_pcd.paint_uniform_color([1.0, 0, 0])
|
|||
|
outlier_cloud = mark_pcd.select_by_index(inliers, invert=True)
|
|||
|
|
|||
|
# 暂时没有用到
|
|||
|
# radius = 0.05
|
|||
|
# inlier_pcd = smooth_point_cloud(inlier_pcd, radius) #利用拟合得到的图像
|
|||
|
|
|||
|
inlier_pcd_correct = reproject_cloud(plane_model, mark_pcd) #利用投影得到的图像
|
|||
|
inlier_pcd_correct = inlier_pcd_correct.paint_uniform_color([0.0, 1.0, 0])
|
|||
|
# o3d.visualization.draw_geometries([ inlier_pcd, inlier_pcd_correct ])
|
|||
|
# inlier_pcd_correct = inlier_pcd # ???
|
|||
|
return plane_model, inlier_pcd, inlier_pcd_correct
|
|||
|
|
|||
|
if __name__ == '__main__':
|
|||
|
# array = [["1","1",0.3213123,[0.323, 0.323, 0.32],[0.323, 0.323, 0.32]], ["1","1",0.3213123,[0.323, 0.323, 0.32],[0.323, 0.323, 0.32]]]
|
|||
|
# save_to_json('', array)
|
|||
|
_color_map = color_map()
|
|||
|
show_color_map(_color_map)
|
|||
|
|
|||
|
# 假设我们有如下 RT 矩阵列表和其他必要的点云数据
|
|||
|
RT_matrices = [...] # 这里应该是你提供的 RT 矩阵列表
|
|||
|
P = np.random.rand(100, 3) # 示例基准点云
|
|||
|
Q = np.random.rand(100, 3) # 示例目标点云
|
|||
|
|
|||
|
R_opt, t_opt = filter_and_fit_rts(RT_matrices, P, Q, threshold=0.1, use_ransac=True)
|
|||
|
|
|||
|
print("Optimal Rotation Matrix:\n", R_opt)
|
|||
|
print("Optimal Translation Vector:\n", t_opt)
|