from threading import Timer from time import sleep from dynaconf.base import Settings from serial import Serial, SerialException from PyQt5.QtCore import QObject, pyqtSignal from core.edge_component import device, EdgeComponent, service def get_crc(pc_rec, i_len): i_crc = 0xFFFF # 初始化CRC值 if i_len <= 0: return None # 长度为0时返回None for ix in range(i_len): i_crc ^= pc_rec[ix] # 进行异或运算 for iy in range(8): if (i_crc & 1) != 0: i_crc = (i_crc >> 1) ^ 0xA001 # CRC多项式 else: i_crc >>= 1 # 右移一位 # 返回高字节和低字节 return [(i_crc >> 8) & 0xFF, i_crc & 0xFF] def read_packet(addr): pack = bytearray([0xAB, 0xCD, 0x00, 0x03, addr, 0x00, 0x00]) crc = get_crc(pack, 7) pack.append(crc[0]) pack.append(crc[1]) return pack class UpsSignals(QObject): battery_change = pyqtSignal(int) state_change = pyqtSignal(int) @device("ups", auto_start=True) class Ups(EdgeComponent): signals = UpsSignals() def __init__(self, context): super(Ups, self).__init__(context) self.port = '/dev/ttyUSB0' self.bytesize = 8 self.parity = 'N' self.stopbits = 1 self.baudrate = 115200 self.serial = None self.read_timer = None self.cmd_read_delay = 0.3 def configure(self, setting: Settings) -> None: self.port = setting.get('ups.port', '/dev/ttyUSB0') self.baudrate = setting.get('ups.baudrate', 115200) self.bytesize = setting.get('ups.bytesize', 8) self.stopbits = setting.get('ups.stopbits', 1) self.parity = setting.get('ups.parity', 'N') self.cmd_read_delay = setting.get('ups.cmd.read.delay', 0.3) self.logger.info(f"Ups configure done.") @service() def start(self) -> None: # 打开串口 try: self.serial = Serial(port=self.port, baudrate=self.baudrate, stopbits=self.stopbits, parity=self.parity, bytesize=self.bytesize, timeout=2.0) self.logger.info(f"open serial port: {self.port} baudrate:{self.baudrate} stopbits:{self.stopbits} bytesize:{self.bytesize}") except SerialException as e: self.logger.error(f"Failed to open serial port: {e}") # 启动读取数据 self.read_battery() self.logger.info('Ups start!') super().start() def read_battery(self): if self.serial is not None and self.serial.isOpen(): # 读取状态 read_state_pack = read_packet(0x05) self.serial.write(read_state_pack) # self.logger.info(f"发送查询状态指令: {read_state_pack.hex()}") sleep(self.cmd_read_delay) if self.serial.in_waiting >= 9: # 返回指令释义: # AB CD 00 03 05 64 00 00 00 data = self.serial.read(9) # self.logger.info(f'收到查询状态数据:{data.hex()}') if data[0] == 0xAB and data[1] == 0xCD: state = data[6] # self.logger.info(f'当前状态:{state}') self.signals.state_change.emit(state) # 读取电量 read_battery_pack = read_packet(0x01) self.serial.write(read_battery_pack) # self.logger.info(f"发送查询电量指令: {read_battery_pack.hex()}") sleep(self.cmd_read_delay) if self.serial.in_waiting >= 9: # 返回指令释义: # abcd000301003c4042 data = self.serial.read(9) # self.logger.info(f'收到查询电量数据:{data.hex()}') if data[0] == 0xAB and data[1] == 0xCD: battery = data[6] # self.logger.info(f'当前电量:{battery}') self.signals.battery_change.emit(battery) self.read_timer = Timer(3, self.read_battery) self.read_timer.start() @service() def stop(self) -> None: if self.read_timer is not None: self.read_timer.cancel() if self.serial is not None: self.serial.close() self.serial = None self.logger.info('Ups stop!') super().stop() if __name__=="__main__": ups = Ups() ups.start() sleep(30) ups.stop()