# -*- coding: utf-8 -*-
"""Trigger camera and lidar captures on a remote device and fetch the results.

The script runs the ``camera_driver`` and ``lidar_driver`` programs located in
``/root/calibration_tools`` on the remote host over SSH, copies the produced
``output.png`` / ``output.ply`` into a local ``ssh_tmp`` directory next to this
file via SCP, and finally removes the remote output files.
"""
import os
import threading
import time
from os import abort

import paramiko
from scp import SCPClient

# Connection settings of the remote capture device.
# NOTE(review): credentials are hard-coded in source; consider moving them to
# configuration or environment variables.
HOST = "192.168.100.254"
PORT = 22
USERNAME = "root"
PASSWORD = "ebaina"

# Argument passed to ./camera_driver (presumably the exposure time; the unit is
# not visible from this file -- confirm against the driver).
EXPTIME = 4500000
# Argument passed to ./lidar_driver (presumably the number of cloud points to
# collect -- confirm against the driver).
CLOUDPOINTS = 30 * 10000


def _local_tmp_dir():
    """Return the absolute path of the ``ssh_tmp`` directory beside this file."""
    # abspath() keeps the result valid even when the script is started with a
    # bare relative name, in which case dirname(__file__) would be empty.
    return os.path.join(os.path.dirname(os.path.abspath(__file__)), "ssh_tmp")


def ssh_execute_command(host, port, username, password, command):
    """Run *command* on the remote host over SSH and print its output.

    Args:
        host: Remote host name or IP address.
        port: SSH port.
        username: Login user name.
        password: Login password.
        command: Shell command text to execute remotely.

    Returns:
        Always ``True``; connection or execution errors raised by paramiko
        propagate to the caller.
    """
    client = paramiko.SSHClient()
    # Unknown host keys are accepted automatically.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(host, port, username, password)
        stdin, stdout, stderr = client.exec_command(command)
        print(stdout.read().decode())
        print(stderr.read().decode())
    finally:
        # Close the connection even when connect/exec_command raises.
        client.close()
    return True


def scp_get_file(host, port, username, password, remote_path, local_path):
    """Copy *remote_path* from the remote host to *local_path* via SCP.

    Args:
        host: Remote host name or IP address.
        port: SSH port.
        username: Login user name.
        password: Login password.
        remote_path: Path of the file on the remote host.
        local_path: Destination path on the local machine.

    Returns:
        ``True`` when the file was fetched, ``False`` when the transfer raised
        an exception (the exception is printed). Errors while establishing the
        transport propagate to the caller.
    """
    transport = paramiko.Transport((host, port))
    try:
        transport.connect(username=username, password=password)
        scp_client = SCPClient(transport)
        try:
            scp_client.get(remote_path, local_path)
        except Exception as e:
            print(e)
            return False
        finally:
            scp_client.close()
    finally:
        # Runs on success, on a failed transfer and on a failed connect, so
        # the transport is never leaked.
        transport.close()
    return True


def cameraCap():
    """Capture one camera image remotely and fetch it as ``ssh_tmp/output.png``.

    Returns:
        ``True`` on success, ``False`` when the image could not be fetched (in
        which case the remote file is left in place).
    """
    start_time = time.time()
    ssh_execute_command(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD,
        command="cd /root/calibration_tools \n" + "./camera_driver " + str(EXPTIME) + "\n",
    )
    execution_time = time.time() - start_time
    print(f"ssh_execute_command 程序执行时间:{execution_time}秒")

    # Give the remote operating system time to finish saving the image.
    time.sleep(1)

    fetched = scp_get_file(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD,
        remote_path="/root/calibration_tools/output.png",
        local_path=os.path.join(_local_tmp_dir(), "output.png"),
    )
    if not fetched:
        return False

    ssh_execute_command(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD,
        command="cd /root/calibration_tools \n " + "rm output.png \n",
    )
    execution_time = time.time() - start_time
    print(f"cameraCap 程序执行时间:{execution_time}秒")
    return True


def lidarCap():
    """Capture a lidar point cloud remotely and fetch it as ``ssh_tmp/output.ply``.

    The remote output file is removed regardless of whether the fetch
    succeeded.

    Returns:
        ``True`` when the point cloud was fetched, ``False`` otherwise.
    """
    start_time = time.time()
    ssh_execute_command(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD,
        command="cd /root/calibration_tools \n" + "./lidar_driver " + str(CLOUDPOINTS) + "\n",
    )
    fetched = scp_get_file(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD,
        remote_path="/root/calibration_tools/output.ply",
        local_path=os.path.join(_local_tmp_dir(), "output.ply"),
    )
    ssh_execute_command(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD,
        command="cd /root/calibration_tools \n " + "rm output.ply \n",
    )
    execution_time = time.time() - start_time
    print(f"lidarCap 程序执行时间:{execution_time}秒")
    return fetched


def cap():
    """Clear ``ssh_tmp`` and run the lidar and camera captures concurrently.

    The lidar capture runs on a worker thread while the camera capture runs on
    the calling thread; the function returns after both have finished.
    """
    directory = _local_tmp_dir()
    # Create the directory on first use so listing and downloads do not fail.
    os.makedirs(directory, exist_ok=True)

    # Remove files left over from a previous run (sub-directories are kept).
    for name in os.listdir(directory):
        file_path = os.path.join(directory, name)
        if os.path.isfile(file_path):
            os.unlink(file_path)

    t_lidar = threading.Thread(target=lidarCap)
    t_lidar.start()
    try:
        cameraCap()
    finally:
        # Always wait for the lidar thread, even if the camera capture raised.
        t_lidar.join()


if __name__ == '__main__':
    cap()