detect-gui/components/ups.py

136 lines
4.4 KiB
Python
Raw Normal View History

2024-11-21 11:39:52 +08:00
from threading import Timer
from time import sleep
from dynaconf.base import Settings
from serial import Serial, SerialException
from PyQt5.QtCore import QObject, pyqtSignal
from core.edge_component import device, EdgeComponent, service
def get_crc(pc_rec, i_len=None):
    """Compute the 16-bit CRC used to frame UPS serial packets.

    The register starts at 0xFFFF and is updated bit by bit, LSB first,
    with the reflected polynomial 0xA001.

    Args:
        pc_rec: Indexable sequence of byte values (``bytes``, ``bytearray``
            or a list of ints in 0..255).
        i_len: Number of leading bytes of ``pc_rec`` to include. Defaults to
            the whole buffer when omitted.

    Returns:
        ``[high_byte, low_byte]`` of the CRC, or ``None`` when the requested
        length is zero or negative.

    Raises:
        IndexError: If ``i_len`` is larger than ``len(pc_rec)``.
    """
    if i_len is None:
        i_len = len(pc_rec)
    if i_len <= 0:
        return None  # nothing to checksum
    if i_len > len(pc_rec):
        # Fail up front rather than silently checksumming a shorter buffer.
        raise IndexError("i_len exceeds the length of pc_rec")

    crc = 0xFFFF  # initial CRC register value
    for byte in pc_rec[:i_len]:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001  # apply the CRC polynomial
            else:
                crc >>= 1
    # High byte first, then low byte.
    return [(crc >> 8) & 0xFF, crc & 0xFF]
def read_packet(addr):
    """Build the request frame that asks the UPS to read register ``addr``.

    Frame layout (9 bytes): ``AB CD 00 03 <addr> 00 00 <crc_hi> <crc_lo>``,
    where the CRC is computed by ``get_crc`` over the first seven bytes.

    Args:
        addr: Register address as an int in 0..255.

    Returns:
        A ``bytearray`` holding the complete frame, ready to be written to
        the serial port.
    """
    pack = bytearray([0xAB, 0xCD, 0x00, 0x03, addr, 0x00, 0x00])
    # Checksum exactly what has been assembled so far instead of repeating
    # the payload length as a magic number.
    pack.extend(get_crc(pack, len(pack)))
    return pack
class UpsSignals(QObject):
    """Qt signals through which UPS readings are published."""

    # Carries the value byte read from the battery-level register.
    battery_change = pyqtSignal(int)
    # Carries the value byte read from the state register.
    state_change = pyqtSignal(int)
@device("ups", auto_start=True)
class Ups(EdgeComponent):
    """Polls a UPS over a serial port and publishes its readings as Qt signals.

    After ``start()`` the component queries the state register (0x05) and the
    battery register (0x01), emits the value bytes through ``signals``, and
    re-arms a timer so the same poll repeats until ``stop()`` is called.
    """

    signals = UpsSignals()

    # Length in bytes of a reply frame.
    _REPLY_LEN = 9
    # Seconds between the end of one poll and the start of the next.
    _POLL_INTERVAL = 3

    def __init__(self, context):
        super(Ups, self).__init__(context)
        self.port = '/dev/ttyUSB0'
        self.bytesize = 8
        self.parity = 'N'
        self.stopbits = 1
        self.baudrate = 115200
        self.serial = None
        self.read_timer = None
        self.cmd_read_delay = 0.3
        # Set by stop() so that a poll already in flight does not re-arm the timer.
        self._stopped = False

    def configure(self, setting: Settings) -> None:
        """Load serial-port parameters and the command/reply delay from settings."""
        self.port = setting.get('ups.port', '/dev/ttyUSB0')
        self.baudrate = setting.get('ups.baudrate', 115200)
        self.bytesize = setting.get('ups.bytesize', 8)
        self.stopbits = setting.get('ups.stopbits', 1)
        self.parity = setting.get('ups.parity', 'N')
        self.cmd_read_delay = setting.get('ups.cmd.read.delay', 0.3)
        self.logger.info("Ups configure done.")

    @service()
    def start(self) -> None:
        """Open the serial port and begin polling.

        A failure to open the port is logged rather than raised; polling is
        still scheduled and simply does nothing while no port is open.
        """
        try:
            self.serial = Serial(port=self.port, baudrate=self.baudrate, stopbits=self.stopbits, parity=self.parity,
                                 bytesize=self.bytesize, timeout=2.0)
            self.logger.info(f"open serial port: {self.port} baudrate:{self.baudrate} stopbits:{self.stopbits} bytesize:{self.bytesize}")
        except SerialException as e:
            self.logger.error(f"Failed to open serial port: {e}")
        self._stopped = False
        # The first poll runs synchronously in the caller's thread; later
        # polls run on timer threads.
        self.read_battery()
        self.logger.info('Ups start!')
        super().start()

    def read_battery(self):
        """Poll state and battery once, emit the readings, schedule the next poll."""
        # Work on a snapshot: stop() may set self.serial to None while this
        # method is sleeping on a timer thread.
        ser = self.serial
        try:
            if ser is not None and ser.is_open:
                # Example state reply: AB CD 00 03 05 64 00 00 00
                state = self._query(ser, 0x05)
                if state is not None:
                    self.signals.state_change.emit(state)
                # Example battery reply: ab cd 00 03 01 00 3c 40 42
                battery = self._query(ser, 0x01)
                if battery is not None:
                    self.signals.battery_change.emit(battery)
        except (SerialException, OSError) as e:
            # An I/O error (e.g. the adapter was unplugged) must not kill
            # the polling loop; errors caused by stop() closing the port
            # underneath us are expected and not worth logging.
            if not self._stopped:
                self.logger.error(f"Failed to read from UPS: {e}")
        finally:
            self._schedule_next_read()

    def _query(self, ser, addr):
        """Request register ``addr`` and return its value byte, or None if no valid reply."""
        # Drop leftovers from a late or partial earlier reply so they cannot
        # be mistaken for the answer to this request.
        ser.reset_input_buffer()
        ser.write(read_packet(addr))
        sleep(self.cmd_read_delay)
        if not ser.is_open or ser.in_waiting < self._REPLY_LEN:
            return None
        data = ser.read(self._REPLY_LEN)
        if len(data) < self._REPLY_LEN or data[0] != 0xAB or data[1] != 0xCD:
            return None
        return data[6]

    def _schedule_next_read(self):
        """Arm the timer for the next poll unless the component has been stopped."""
        if self._stopped:
            return
        timer = Timer(self._POLL_INTERVAL, self.read_battery)
        # Daemon thread: a pending poll must not keep the process alive.
        timer.daemon = True
        self.read_timer = timer
        timer.start()

    @service()
    def stop(self) -> None:
        """Stop polling and close the serial port."""
        # Raise the flag first so a poll that is currently running does not
        # schedule another one after the timer is cancelled.
        self._stopped = True
        if self.read_timer is not None:
            self.read_timer.cancel()
            self.read_timer = None
        if self.serial is not None:
            self.serial.close()
            self.serial = None
        self.logger.info('Ups stop!')
        super().stop()
if __name__ == "__main__":
    # Manual smoke test: poll the UPS for 30 seconds, then shut down.
    # Ups.__init__ requires a context argument, which the original call omitted.
    # NOTE(review): None is assumed to be acceptable to EdgeComponent for a
    # standalone run - confirm, or pass a real context object.
    ups = Ups(None)
    ups.start()
    try:
        sleep(30)
    finally:
        # Always release the serial port and cancel the poll timer, even when
        # the wait is interrupted (e.g. Ctrl-C).
        ups.stop()