147 lines
4.0 KiB
Python
147 lines
4.0 KiB
Python
# -- coding: utf-8 --
|
|
import time
|
|
import os
|
|
import paramiko
|
|
from scp import SCPClient
|
|
import threading
|
|
from os import abort
|
|
|
|
|
|
HOST = "192.168.100.254"
|
|
PORT = 22
|
|
USERNAME = "root"
|
|
PASSWORD = "ebaina"
|
|
|
|
|
|
def ssh_execute_command(host, port, username, password, command):
|
|
ret = False
|
|
# 创建SSH客户端
|
|
client = paramiko.SSHClient()
|
|
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
client.connect(host, port, username, password)
|
|
|
|
# 执行命令
|
|
stdin, stdout, stderr = client.exec_command(command)
|
|
print(stdout.read().decode())
|
|
print(stderr.read().decode())
|
|
|
|
# 关闭连接
|
|
client.close()
|
|
return True
|
|
|
|
|
|
def scp_get_file(host, port, username, password, remote_path, local_path):
|
|
ret = False
|
|
# 创建SSH客户端
|
|
client = paramiko.Transport((host, port))
|
|
client.connect(username=username, password=password)
|
|
|
|
# 创建SCP客户端
|
|
scp_client = SCPClient(client)
|
|
|
|
# 拉取文件
|
|
try:
|
|
scp_client.get(remote_path, local_path)
|
|
except Exception as e:
|
|
print(e)
|
|
return False
|
|
|
|
# 关闭连接
|
|
scp_client.close()
|
|
client.close()
|
|
|
|
return True
|
|
|
|
def cameraCap():
|
|
start_time = time.time()
|
|
ret = ssh_execute_command(
|
|
host = HOST,
|
|
port = PORT,
|
|
username = USERNAME,
|
|
password = PASSWORD,
|
|
command = "cd /root/calibration_tools \n" +
|
|
"./camera_driver 240000 \n"
|
|
)
|
|
end_time = time.time() # 记录结束时间
|
|
execution_time = end_time - start_time # 计算执行时间
|
|
print(f"ssh_execute_command 程序执行时间:{execution_time}秒")
|
|
time.sleep(1) # 等待操作系统保存图像完毕
|
|
ret = scp_get_file(
|
|
host = HOST,
|
|
port = PORT,
|
|
username = USERNAME,
|
|
password = PASSWORD,
|
|
remote_path = "/root/calibration_tools/output.png",
|
|
local_path = os.path.dirname(__file__) + "\\ssh_tmp\\1\\output.png"
|
|
)
|
|
if not ret:
|
|
return False
|
|
ret = ssh_execute_command(
|
|
host = HOST,
|
|
port = PORT,
|
|
username = USERNAME,
|
|
password = PASSWORD,
|
|
command = "cd /root/calibration_tools \n " +
|
|
"rm output.png \n"
|
|
)
|
|
end_time = time.time() # 记录结束时间
|
|
execution_time = end_time - start_time # 计算执行时间
|
|
print(f"cameraCap 程序执行时间:{execution_time}秒")
|
|
|
|
|
|
def lidarCap():
|
|
start_time = time.time()
|
|
ret = ssh_execute_command(
|
|
host = HOST,
|
|
port = PORT,
|
|
username = USERNAME,
|
|
password = PASSWORD,
|
|
command = "cd /root/calibration_tools \n" +
|
|
"./lidar_driver \n"
|
|
)
|
|
ret = scp_get_file(
|
|
host = HOST,
|
|
port = PORT,
|
|
username = USERNAME,
|
|
password = PASSWORD,
|
|
remote_path = "/root/calibration_tools/output.ply",
|
|
local_path = os.path.dirname(__file__) + ".\\ssh_tmp\\1\\output.ply"
|
|
)
|
|
ret = ssh_execute_command(
|
|
host = HOST,
|
|
port = PORT,
|
|
username = USERNAME,
|
|
password = PASSWORD,
|
|
command = "cd /root/calibration_tools \n " +
|
|
"rm output.ply \n"
|
|
)
|
|
end_time = time.time() # 记录结束时间
|
|
execution_time = end_time - start_time # 计算执行时间
|
|
print(f"lidarCap 程序执行时间:{execution_time}秒")
|
|
|
|
|
|
def cap():
|
|
|
|
directory = os.path.dirname(__file__) + ".\\ssh_tmp\\"
|
|
files_and_directories = os.listdir(directory)
|
|
for file in files_and_directories:
|
|
file_path = os.path.join(directory, file)
|
|
if os.path.isfile(file_path):
|
|
os.unlink(file_path)
|
|
|
|
t_lidar = threading.Thread(target=lidarCap)
|
|
t_lidar.start()
|
|
cameraCap()
|
|
|
|
# t_camera.join()
|
|
|
|
t_lidar.join()
|
|
|
|
if __name__ == '__main__':
|
|
# 1 设置条件
|
|
# 2 cap
|
|
cap()
|
|
# 3 修改 config.ini
|
|
# 4 启动 image_framework.py
|
|
# 5 接收 回调,把结果保存下来
|