import asyncio
import inspect
import os
from abc import ABC, abstractmethod
import logging
from functools import wraps
from typing import Any

from dynaconf.base import Settings

from core.config import settings
from core.edge_logger import LoggingMixin


def device(name, auto_start=False):
    """Class decorator that tags a component class as a ``"device"``.

    Sets ``component_name``, ``component_type`` and ``component_auto_start``
    on the decorated class and returns the same class object.

    :param name: value stored in ``cls.component_name``
    :param auto_start: value stored in ``cls.component_auto_start``
    :return: the class decorator
    """

    def decorator(cls):
        cls.component_name = name
        cls.component_type = "device"
        cls.component_auto_start = auto_start
        return cls

    return decorator


def action(name, auto_start=True):
    """Class decorator that tags a component class as an ``"action"``.

    Sets ``component_name``, ``component_type`` and ``component_auto_start``
    on the decorated class and returns the same class object. Unlike
    ``device``, ``auto_start`` defaults to ``True``.

    :param name: value stored in ``cls.component_name``
    :param auto_start: value stored in ``cls.component_auto_start``
    :return: the class decorator
    """

    def decorator(cls):
        cls.component_name = name
        cls.component_type = "action"
        cls.component_auto_start = auto_start
        return cls

    return decorator


def service():
    """Decorator factory that marks a function as a service.

    The decorated function gets ``_component_service = True``;
    ``EdgeComponent.execute`` refuses to run attributes without that marker.
    (The original docstring describes this as the gate for remote invocation.)

    :return: the function decorator
    """

    def decorator(func):
        func._component_service = True

        if not inspect.iscoroutinefunction(func):
            # Plain (non-coroutine) functions are returned unchanged.
            return func

        # ``wraps`` copies ``func.__dict__``, so the marker set above is
        # carried over to the wrapper as well.
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await func(*args, **kwargs)

        return wrapper

    return decorator


def _coerce_service_argument(param_name, param_type, value):
    """Convert ``value`` to the annotated ``param_type`` when that is possible.

    Unannotated parameters, ``Any`` and annotations that are not callable
    (for example string annotations) are passed through unchanged, because
    calling them could never produce a converted value.

    :raises ValueError: if calling ``param_type(value)`` raises ``TypeError``
        or ``ValueError``
    """
    if (
        param_type is inspect.Parameter.empty
        or param_type is Any
        or not callable(param_type)
    ):
        return value
    try:
        return param_type(value)
    except (TypeError, ValueError) as e:
        raise ValueError(
            f"Invalid value '{value}' for parameter '{param_name}': {e}"
        ) from e


class EdgeComponent(LoggingMixin, ABC):
    """Abstract base class for components with a start/configure/stop lifecycle."""

    # Default used when a subclass is not decorated with ``device``/``action``.
    component_auto_start = False

    def __init__(self, context):
        super().__init__()
        self.context = context
        self.is_started = False

    async def execute(self, func_name, *args, **kwargs):
        """Look up the service method ``func_name`` and call it with ``kwargs``.

        Arguments are matched to the method's parameters by name. Values for
        annotated parameters are converted by calling the annotation on the
        value; parameters missing from ``kwargs`` fall back to their default.
        Keys in ``kwargs`` that do not name a parameter are forwarded as extra
        keyword arguments. Coroutine functions are awaited directly; other
        callables run via ``asyncio.to_thread``.

        NOTE: positional ``*args`` are accepted but are not forwarded to the
        service method; only ``kwargs`` are used.

        :param func_name: attribute name of the service method on this instance
        :param kwargs: arguments for the service method, keyed by parameter name
        :return: whatever the service method returns
        :raises RuntimeError: if the attribute is missing, is not callable, or
            is not marked with ``service``
        :raises ValueError: if a required parameter is missing or a value
            cannot be converted to the annotated type
        """
        func = getattr(self, func_name, None)
        if func is None:
            raise RuntimeError(f"Function {func_name} not found in {self.__class__.__name__}")

        # Use ``func_name`` in the messages below: the attribute may be a
        # non-callable object without ``__name__``.
        if not getattr(func, '_component_service', False):
            raise RuntimeError(f"Function {func_name} does not have the required annotation.")
        if not callable(func):
            raise RuntimeError(f"Function {func_name} not found in {self.__class__.__name__}")

        method_params = inspect.signature(func).parameters
        variadic_kinds = (
            inspect.Parameter.VAR_POSITIONAL,
            inspect.Parameter.VAR_KEYWORD,
        )

        call_args = []
        call_kwargs = {}
        for param_name, param_info in method_params.items():
            # ``*args`` / ``**kwargs`` collectors have no value of their own.
            if param_name == 'self' or param_info.kind in variadic_kinds:
                continue

            if param_name in kwargs:
                value = _coerce_service_argument(
                    param_name, param_info.annotation, kwargs[param_name]
                )
            elif param_info.default is inspect.Parameter.empty:
                # No default and not supplied by the caller.
                raise ValueError(f"Missing required parameter '{param_name}' for method '{func_name}'.")
            else:
                value = param_info.default

            # Keyword-only parameters cannot be supplied positionally.
            if param_info.kind is inspect.Parameter.KEYWORD_ONLY:
                call_kwargs[param_name] = value
            else:
                call_args.append(value)

        # Forward everything that does not name a declared parameter.
        call_kwargs.update(
            (key, value) for key, value in kwargs.items() if key not in method_params
        )

        if inspect.iscoroutinefunction(func):
            return await func(*call_args, **call_kwargs)
        return await asyncio.to_thread(func, *call_args, **call_kwargs)

    @abstractmethod
    def start(self):
        """Start the component.

        The base implementation sets ``is_started`` to ``True``.
        """
        self.is_started = True

    @abstractmethod
    def configure(self, setting: Settings) -> None:
        """Configure the component.

        :param setting: settings object to configure from
        """
        pass

    @abstractmethod
    def stop(self):
        """Stop the component.

        The base implementation sets ``is_started`` to ``False``.
        """
        self.is_started = False