ymj_data_collect/capture_img_ply.py

168 lines
4.8 KiB
Python
Raw Normal View History

2025-02-25 10:15:21 +08:00
# -- coding: utf-8 --
import time
import os
import paramiko
from scp import SCPClient
import threading
from os import abort
from pathlib import Path
from clog import logger
HOST = "192.168.100.254"
PORT = 22
USERNAME = "root"
PASSWORD = "ebaina"
CLOUD_COUNTS = [400000, 500000, 600000]
DEFAULT_COUNT = 300000
EXPS = [200 * 1000, 300 * 1000, 400 * 1000]
DEFAULT_EXP = 100 * 1000
2025-04-20 21:44:06 +08:00
PARENT_DIR_NAME = "test_ymj"
SAVE_DIR_NAME = "test_ymj_1"
2025-02-25 10:15:21 +08:00
def ssh_execute_command(host, port, username, password, command):
ret = False
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, port, username, password)
# 执行命令
stdin, stdout, stderr = client.exec_command(command)
# 读取输出结果
output = stdout.read().decode()
error = stderr.read().decode()
if output:
logger.info(f"{output}")
if error:
logger.error(f"{error}")
# 关闭连接
client.close()
return True
def scp_get_file(host, port, username, password, remote_path, local_path):
ret = False
# 创建SSH客户端
client = paramiko.Transport((host, port))
client.connect(username=username, password=password)
# 创建SCP客户端
scp_client = SCPClient(client)
# 拉取文件
try:
scp_client.get(remote_path, local_path)
except Exception as e:
print(e)
return False
# 关闭连接
scp_client.close()
client.close()
return True
def cameraCap(exp):
start_time = time.time()
ret = ssh_execute_command(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD,
command="cd /root/app/detect-gui/ \n" +
f"/root/miniconda3/bin/python cap.py --exp={exp}\n"
)
if not ret:
return False
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"ssh_execute_command 程序执行时间:{execution_time}")
time.sleep(1) # 等待操作系统保存图像完毕
ret = scp_get_file(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD,
remote_path="/root/app/detect-gui/output.jpg",
local_path=Path(__file__).parent / "result" / PARENT_DIR_NAME / SAVE_DIR_NAME
)
if not ret:
return False
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"cameraCap 程序执行时间:{execution_time}")
def lidarCap(cloud_count):
start_time = time.time()
print(f"./lidar_driver {cloud_count}\n")
ret = ssh_execute_command(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD,
command="cd /root/calibration_tools \n" +
f"./lidar_driver {cloud_count}\n"
)
ret = scp_get_file(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD,
remote_path="/root/calibration_tools/output.ply",
local_path=Path(__file__).parent / "result" / PARENT_DIR_NAME / SAVE_DIR_NAME
)
ret = ssh_execute_command(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD,
command="cd /root/calibration_tools \n " +
"rm output.ply \n"
)
end_time = time.time() # 记录结束时间
execution_time = end_time - start_time # 计算执行时间
print(f"lidarCap 程序执行时间:{execution_time}")
def cap(exp=DEFAULT_EXP, cloud_count=DEFAULT_COUNT):
global SAVE_DIR_NAME
print(f"曝光时间{exp}")
print(f"点云数量:{cloud_count}")
SAVE_DIR_NAME = f"exp_{exp}_cn_{cloud_count}"
# 如果文件夹不存在则创建
if not os.path.exists(Path(__file__).parent / "result" / PARENT_DIR_NAME / SAVE_DIR_NAME):
os.makedirs(Path(__file__).parent / "result" / PARENT_DIR_NAME / SAVE_DIR_NAME)
# 雷达线程
t_lidar = threading.Thread(target=lidarCap, args=(cloud_count,))
t_lidar.start()
# 相机
cameraCap(exp)
# 等待雷达线程结束
t_lidar.join()
if __name__ == '__main__':
# 加载驱动
ssh_execute_command(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD,
command="/opt/HuarayTech/MVviewer/module/loadAllDrv.sh \n"
)
for cloud_count in CLOUD_COUNTS:
logger.info("开始采集")
cap(cloud_count = cloud_count)
logger.info("采集完毕===========")
for exp in EXPS:
logger.info("开始采集")
cap(exp = exp)
logger.info("采集完毕===========")