# -- coding: utf-8 -- import time import os import paramiko from scp import SCPClient import threading from os import abort from pathlib import Path from clog import logger HOST = "192.168.100.254" PORT = 22 USERNAME = "root" PASSWORD = "ebaina" CLOUD_COUNTS = [400000, 500000, 600000] DEFAULT_COUNT = 300000 EXPS = [200 * 1000, 300 * 1000, 400 * 1000] DEFAULT_EXP = 100 * 1000 PARENT_DIR_NAME = "test_2" SAVE_DIR_NAME = "" def ssh_execute_command(host, port, username, password, command): ret = False # 创建SSH客户端 client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(host, port, username, password) # 执行命令 stdin, stdout, stderr = client.exec_command(command) # 读取输出结果 output = stdout.read().decode() error = stderr.read().decode() if output: logger.info(f"{output}") if error: logger.error(f"{error}") # 关闭连接 client.close() return True def scp_get_file(host, port, username, password, remote_path, local_path): ret = False # 创建SSH客户端 client = paramiko.Transport((host, port)) client.connect(username=username, password=password) # 创建SCP客户端 scp_client = SCPClient(client) # 拉取文件 try: scp_client.get(remote_path, local_path) except Exception as e: print(e) return False # 关闭连接 scp_client.close() client.close() return True def cameraCap(exp): start_time = time.time() ret = ssh_execute_command( host=HOST, port=PORT, username=USERNAME, password=PASSWORD, command="cd /root/app/detect-gui/ \n" + f"/root/miniconda3/bin/python cap.py --exp={exp}\n" ) if not ret: return False end_time = time.time() # 记录结束时间 execution_time = end_time - start_time # 计算执行时间 print(f"ssh_execute_command 程序执行时间:{execution_time}秒") time.sleep(1) # 等待操作系统保存图像完毕 ret = scp_get_file( host=HOST, port=PORT, username=USERNAME, password=PASSWORD, remote_path="/root/app/detect-gui/output.jpg", local_path=Path(__file__).parent / "result" / PARENT_DIR_NAME / SAVE_DIR_NAME ) if not ret: return False end_time = time.time() # 记录结束时间 execution_time = end_time - start_time # 计算执行时间 print(f"cameraCap 程序执行时间:{execution_time}秒") def lidarCap(cloud_count): start_time = time.time() print(f"./lidar_driver {cloud_count}\n") ret = ssh_execute_command( host=HOST, port=PORT, username=USERNAME, password=PASSWORD, command="cd /root/calibration_tools \n" + f"./lidar_driver {cloud_count}\n" ) ret = scp_get_file( host=HOST, port=PORT, username=USERNAME, password=PASSWORD, remote_path="/root/calibration_tools/output.ply", local_path=Path(__file__).parent / "result" / PARENT_DIR_NAME / SAVE_DIR_NAME ) ret = ssh_execute_command( host=HOST, port=PORT, username=USERNAME, password=PASSWORD, command="cd /root/calibration_tools \n " + "rm output.ply \n" ) end_time = time.time() # 记录结束时间 execution_time = end_time - start_time # 计算执行时间 print(f"lidarCap 程序执行时间:{execution_time}秒") def cap(exp=DEFAULT_EXP, cloud_count=DEFAULT_COUNT): global SAVE_DIR_NAME print(f"曝光时间{exp}") print(f"点云数量:{cloud_count}") SAVE_DIR_NAME = f"exp_{exp}_cn_{cloud_count}" # 如果文件夹不存在则创建 if not os.path.exists(Path(__file__).parent / "result" / PARENT_DIR_NAME / SAVE_DIR_NAME): os.makedirs(Path(__file__).parent / "result" / PARENT_DIR_NAME / SAVE_DIR_NAME) # 雷达线程 t_lidar = threading.Thread(target=lidarCap, args=(cloud_count,)) t_lidar.start() # 相机 cameraCap(exp) # 等待雷达线程结束 t_lidar.join() if __name__ == '__main__': # 加载驱动 ssh_execute_command( host=HOST, port=PORT, username=USERNAME, password=PASSWORD, command="/opt/HuarayTech/MVviewer/module/loadAllDrv.sh \n" ) for cloud_count in CLOUD_COUNTS: logger.info("开始采集") cap(cloud_count = cloud_count) logger.info("采集完毕===========") for exp in EXPS: logger.info("开始采集") cap(exp = exp) logger.info("采集完毕===========")