import logging from PyQt5.QtCore import QObject, pyqtSignal from dynaconf.base import Settings from core.edge_component import device, EdgeComponent, service class GpioSignals(QObject): gpio_lidar_opened = pyqtSignal(int) gpio_lidar_closed = pyqtSignal(int) gpio_camera_opened = pyqtSignal(int) gpio_camera_closed = pyqtSignal(int) @device("gpio", auto_start=True) class GPIOManager(EdgeComponent): signals = GpioSignals() def __init__(self, context): super(GPIOManager, self).__init__(context) self.lidar_pin = 80 self.camera_pin = 91 self.export_path = '/sys/class/gpio/export' self.unexport_path = '/sys/class/gpio/unexport' self.lidar_value_path = None self.lidar_direction_path = None self.camera_value_path = None self.camera_direction_path = None def configure(self, setting: Settings) -> None: self.lidar_pin = setting.get('gpio.lidar.pin', 80) self.camera_pin = setting.get('gpio.camera.pin', 91) self.lidar_value_path = f'/sys/class/gpio/gpio{self.lidar_pin}/value' self.lidar_direction_path = f'/sys/class/gpio/gpio{self.lidar_pin}/direction' self.camera_value_path = f'/sys/class/gpio/gpio{self.camera_pin}/value' self.camera_direction_path = f'/sys/class/gpio/gpio{self.camera_pin}/direction' self.logger.info(f"Gpio configure done.") @service() def start(self) -> None: """导出 GPIO 引脚并设置为输出""" self.stop() try: with open(self.export_path, 'w') as f: f.write(str(self.lidar_pin)) with open(self.lidar_direction_path, 'w') as f: f.write('out') self.logger.info(f"Export GPIO {self.lidar_pin}") with open(self.export_path, 'w') as f: f.write(str(self.camera_pin)) with open(self.camera_direction_path, 'w') as f: f.write('out') self.logger.info(f"Export GPIO {self.camera_pin}") except IOError as e: self.logger.error(f"Error exporting GPIO {self.lidar_pin}/{self.camera_pin}: {e}") @service() def stop(self) -> None: """释放 GPIO 引脚""" try: with open(self.unexport_path, 'w') as f: f.write(str(self.lidar_pin)) self.logger.info(f"Release GPIO {self.lidar_pin}") with open(self.unexport_path, 'w') as f: f.write(str(self.camera_pin)) self.logger.info(f"Release GPIO {self.camera_pin}") except IOError as e: self.logger.error(f"Error unexporting GPIO {self.lidar_pin}/{self.camera_pin}: {e}") @property def lidar_value(self): try: with open(self.lidar_value_path, 'r') as f: return int(f.read().strip()) except Exception as e: self.logger.error(f"Error reading GPIO value: {e}") return -1 @lidar_value.setter def lidar_value(self, val): try: with open(self.lidar_value_path, 'w') as f: f.write(str(val)) except Exception as e: self.logger.error(f"Error setting GPIO value: {e}") @property def camera_value(self): try: with open(self.camera_value_path, 'r') as f: return int(f.read().strip()) except Exception as e: self.logger.error(f"Error reading GPIO value: {e}") return -1 @camera_value.setter def camera_value(self, val): try: with open(self.camera_value_path, 'w') as f: f.write(str(val)) except Exception as e: self.logger.error(f"Error setting GPIO value: {e}") @service() def open_lidar(self): """设置 GPIO 为低电平并发出信号""" self.lidar_value = 0 # 设置为低电平 self.signals.gpio_lidar_opened.emit(self.lidar_pin) self.logger.info(f"Setting GPIO {self.lidar_pin}: LOW") @service() def close_lidar(self): """设置 GPIO 为高电平并发出信号""" self.lidar_value = 1 # 设置为高电平 self.signals.gpio_lidar_closed.emit(self.lidar_pin) self.logger.info(f"Setting GPIO {self.lidar_pin}: HIGH") @service() def open_camera(self): """设置 GPIO 为低电平并发出信号""" self.camera_value = 0 # 设置为低电平 self.signals.gpio_camera_opened.emit(self.camera_pin) self.logger.info(f"Setting GPIO {self.camera_pin}: LOW") @service() def close_camera(self): """设置 GPIO 为高电平并发出信号""" self.camera_value = 1 # 设置为高电平 self.signals.gpio_camera_closed.emit(self.camera_pin) self.logger.info(f"Setting GPIO {self.camera_pin}: HIGH")