import asyncio
import json
import os
import sqlite3
import traceback
from contextlib import contextmanager
from sqlite3 import Error
from typing import Any, Optional

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.base import STATE_RUNNING
from dynaconf.base import Settings

from core.edge_component import EdgeComponent, action, service
from core.logging import logger

# Component context used by scheduled jobs; assigned in Scheduler.__init__.
ctx = None


def execute_action(component, method, **args):
    """Job entry point: run ``method`` of the named component.

    The component is looked up through the module-level ``ctx`` and its
    ``execute`` coroutine is driven to completion with ``asyncio.run``.
    Every exception is logged (message plus traceback) and swallowed, so
    nothing propagates to the caller.

    :param component: name passed to ``ctx.get_component``.
    :param method: method name passed to the component's ``execute``.
    :param args: keyword arguments forwarded to ``execute``.
    """
    # NOTE(review): ``ctx`` is only set in the process that constructed the
    # Scheduler; presumably this will not work under the 'processpool'
    # executor - confirm before routing jobs there.
    try:
        comp = ctx.get_component(component)
        result = asyncio.run(comp.execute(method, **args))
        logger.info(result)
        logger.info(f'执行定时任务【{component}.{method}】')
    except Exception as e:
        logger.error(f'执行定时任务【{component}.{method}】出错:{str(e)}')
        logger.error(traceback.format_exc())


def job2dict(job):
    """Convert a scheduler job into a plain dict.

    :param job: job object exposing ``id``, ``name``, ``trigger``,
        ``next_run_time``, ``args`` and ``kwargs``; may be ``None``.
    :return: dict with those fields (the trigger as ``str``), or ``None``
        when ``job`` is ``None`` (e.g. the job id was not found).
    """
    if job is None:
        return None
    return {
        'id': job.id,
        'name': job.name,
        'trigger': str(job.trigger),
        'next_run_time': job.next_run_time,
        'args': job.args,
        'kwargs': job.kwargs,
    }


def check_table_exists(cursor, table_name):
    """Return ``True`` if a table called ``table_name`` is in ``sqlite_master``.

    :param cursor: open sqlite3 cursor.
    :param table_name: table name to look up.
    """
    # Bind the name as a parameter instead of formatting it into the SQL.
    cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?;",
        (table_name,),
    )
    return cursor.fetchone() is not None


@contextmanager
def _open_db(db_path):
    """Yield ``(connection, cursor)`` for ``db_path`` and close both on exit."""
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        try:
            yield conn, cursor
        finally:
            cursor.close()
    finally:
        conn.close()


@action("_database", auto_start=True)
class Database(EdgeComponent):
    """SQLite-backed component offering system, user and task services.

    Each operation opens its own connection to ``db_path`` and closes it
    before returning. ``sqlite3.Error`` is logged and swallowed, in which
    case the method returns ``None``.
    """

    def __init__(self, context):
        super().__init__(context)
        self.device_info = None
        self.sql_path = None
        self.db_path = None

    def start(self):
        """Apply pending SQL scripts, then start the component."""
        self.logger.info("Database started")
        self.init_db()
        super().start()

    def configure(self, setting: Settings) -> None:
        """Read ``sqlite3.path``, ``sqlite3.sql`` and ``device`` from settings."""
        self.db_path = setting.get("sqlite3.path")
        self.sql_path = setting.get("sqlite3.sql")
        self.device_info = setting.get("device")
        self.logger.info("Database configure done.")

    def stop(self):
        """Stop the component."""
        super().stop()
        self.logger.info("Database stopped")

    def init_db(self):
        """Execute, in sorted file-name order, every script in ``sql_path``
        that is not yet recorded in ``sys_sql_history``.

        When the ``sys_system`` table is missing, no history is read and all
        files are executed. Each executed file name is inserted into
        ``sys_sql_history`` and committed.
        """
        try:
            with _open_db(self.db_path) as (conn, cursor):
                sql_files = sorted(os.listdir(self.sql_path))
                executed_sqls = set()
                if check_table_exists(cursor, 'sys_system'):
                    self.logger.info("Table sys_system already exists, no need to initialize.")
                    cursor.execute("select file from sys_sql_history order by id")
                    executed_sqls = {row[0] for row in cursor.fetchall()}
                for sql in sql_files:
                    if sql in executed_sqls:
                        continue
                    sql_file = os.path.join(self.sql_path, sql)
                    with open(sql_file, 'r') as file:
                        sql_script = file.read()
                    cursor.executescript(sql_script)
                    self.logger.info(f"Execute SQL:{sql_file}")
                    cursor.execute("INSERT INTO sys_sql_history (file) VALUES (?)", (sql,))
                    conn.commit()
                    self.logger.info(f"SQL[{sql_file}] initialized successfully.")
                self.logger.info("Database initialized successfully.")
        except Error as e:
            self.logger.error(f"Error occurred: {e}")

    @service()
    def system_info(self):
        """Return the ``code -> val`` rows of ``sys_system`` as a dict,
        updated with ``device_info`` when that is configured.
        """
        try:
            with _open_db(self.db_path) as (_, cursor):
                cursor.execute("select code, val from sys_system")
                rows = cursor.fetchall()
                self.logger.info("List system info successfully.")
                sys_info = dict(rows)
                if self.device_info is not None:
                    sys_info.update(self.device_info)
                return sys_info
        except Error as e:
            self.logger.error(f"Error occurred: {e}")

    @service()
    def insert_user(self, username, password):
        """Insert a row into ``sys_user``.

        :param username: value for the ``username`` column.
        :param password: value for the ``password`` column, stored exactly
            as given.
        """
        # NOTE(review): the password is written unchanged; confirm callers
        # hash it before calling.
        try:
            with _open_db(self.db_path) as (conn, cursor):
                cursor.execute(
                    "INSERT INTO sys_user (username, password) VALUES (?, ?)",
                    (username, password),
                )
                conn.commit()
                self.logger.info("User inserted successfully.")
        except Error as e:
            self.logger.error(f"Error occurred: {e}")

    @service()
    def delete_user(self, username):
        """Delete the ``sys_user`` rows whose username equals ``username``."""
        try:
            with _open_db(self.db_path) as (conn, cursor):
                cursor.execute("DELETE FROM sys_user WHERE username = ?", (username,))
                conn.commit()
                self.logger.info(f"User {username} deleted successfully.")
        except Error as e:
            self.logger.error(f"Error occurred: {e}")

    @service()
    def list_users(self, username=None):
        """List users as dicts with ``id``, ``username`` and ``created_at``.

        :param username: when given, only rows with this username are
            returned; otherwise all rows ordered by ``id``.
        """
        try:
            with _open_db(self.db_path) as (_, cursor):
                if username is None:
                    cursor.execute("SELECT id, username, created_at FROM sys_user order by id")
                else:
                    cursor.execute(
                        "SELECT id, username, created_at FROM sys_user WHERE username = ?",
                        (username,),
                    )
                rows = cursor.fetchall()
                keys = ['id', 'username', 'created_at']
                return [dict(zip(keys, values)) for values in rows]
        except Error as e:
            self.logger.error(f"Error occurred: {e}")

    @service()
    def update_user(self, username, password):
        """Set the password of the ``sys_user`` rows matching ``username``."""
        try:
            with _open_db(self.db_path) as (conn, cursor):
                cursor.execute(
                    "UPDATE sys_user set password = ? WHERE username = ?",
                    (password, username,),
                )
                conn.commit()
                self.logger.info(f"User {username} update successfully.")
        except Error as e:
            self.logger.error(f"Error occurred: {e}")

    @service()
    def add_task(self, name, steps, result=None, state=0, creator='admin'):
        """Insert a task record into ``sys_task``.

        :param name: task name.
        :param steps: stored as its JSON serialization.
        :param result: stored as its JSON serialization.
        :param state: value for the ``state`` column.
        :param creator: value for the ``creator`` column.
        """
        try:
            with _open_db(self.db_path) as (conn, cursor):
                cursor.execute(
                    "insert into sys_task(name, creator, steps, result, state)values(?,?,?,?,?)",
                    (name, creator, json.dumps(steps), json.dumps(result), state),
                )
                # execute() returns the cursor itself; the new row's id is
                # exposed through lastrowid.
                task_id = cursor.lastrowid
                conn.commit()
                self.logger.info(f"Task {task_id} insert successfully.")
        except Error as e:
            self.logger.error(f"Error occurred: {e}")

    @service()
    def list_tasks(self, id=None):
        """List task records as dicts.

        :param id: when given, only the row with this id is returned;
            otherwise all rows ordered by ``id`` descending.
        :return: list of dicts with ``id``, ``name``, ``creator``, ``steps``,
            ``result``, ``state`` and ``created_at`` as read from the table.
        """
        try:
            with _open_db(self.db_path) as (_, cursor):
                if id is None:
                    cursor.execute(
                        "SELECT id, name, creator, steps, result, state, created_at FROM sys_task order by id "
                        "desc"
                    )
                else:
                    cursor.execute(
                        "SELECT id, name, creator, steps, result, state, created_at FROM sys_task WHERE id = ?",
                        (id,),
                    )
                rows = cursor.fetchall()
                keys = ['id', 'name', 'creator', 'steps', 'result', 'state', 'created_at']
                return [dict(zip(keys, values)) for values in rows]
        except Error as e:
            self.logger.error(f"Error occurred: {e}")


@action("_scheduler", auto_start=True)
class Scheduler(EdgeComponent):
    """Component wrapping an APScheduler ``BackgroundScheduler``.

    Jobs added through :meth:`add_job` run :func:`execute_action`. Every
    service method raises ``RuntimeError`` while the scheduler is not
    running.
    """

    def __init__(self, context):
        super().__init__(context)
        global ctx
        self.scheduler = None
        # Publish the context so execute_action can resolve components.
        ctx = self.context

    def start(self):
        """Start the scheduler, then the component; start failures are
        logged and re-raised.
        """
        try:
            self.scheduler.start()
        except Exception as e:
            self.logger.error(f'定时器启动失败:{str(e)}')
            raise
        super().start()
        self.logger.info("Scheduler started")

    def configure(self, setting: Settings) -> None:
        """Build the scheduler from the ``sched.*`` settings.

        Reads ``sched.db`` (job store URL), ``sched.thread_pool_workers``
        (default 10), ``sched.process_pool_workers`` (default 3),
        ``sched.coalesce`` (default ``False``) and ``sched.max_instances``
        (default 3).
        """
        db_url = setting.get('sched.db')
        job_stores = {
            'default': SQLAlchemyJobStore(url=db_url),
        }
        executors = {
            'default': ThreadPoolExecutor(setting.get('sched.thread_pool_workers', 10)),
            'processpool': ProcessPoolExecutor(setting.get('sched.process_pool_workers', 3)),
        }
        job_defaults = {
            'coalesce': setting.get('sched.coalesce', False),
            'max_instances': setting.get('sched.max_instances', 3),
        }
        self.scheduler = BackgroundScheduler(
            jobstores=job_stores,
            executors=executors,
            job_defaults=job_defaults,
            daemon=True,
        )
        self.logger.info("Scheduler configure done.")

    def stop(self):
        """Shut the scheduler down if it is running, then stop the component."""
        if self.scheduler is not None and self.scheduler.state == STATE_RUNNING:
            self.scheduler.shutdown()
        self.logger.info("Scheduler stopped")
        super().stop()

    @service()
    def add_job(self, schedule_comp: str, schedule_method: str, trigger: str = 'cron',
                schedule_kwargs: Optional[dict[str, Any]] = None, **kwargs) -> str:
        """Schedule a call to ``schedule_comp.schedule_method``.

        :param schedule_comp: component name handed to ``execute_action``.
        :param schedule_method: method name handed to ``execute_action``.
        :param trigger: trigger name passed to the scheduler's ``add_job``.
        :param schedule_kwargs: keyword arguments the job passes to the method.
        :param kwargs: extra arguments forwarded to the scheduler's ``add_job``.
        :return: id of the created job.
        :raises RuntimeError: if the scheduler is not running.
        """
        if not self.scheduler.running:
            raise RuntimeError('定时器未启动')
        # default=str keeps this debug line from raising on values that are
        # not JSON-serializable.
        self.logger.debug(
            f'设置定时任务:{schedule_comp}.{schedule_method}:{json.dumps(schedule_kwargs, default=str)}'
        )
        job = self.scheduler.add_job(
            func=execute_action,
            args=[schedule_comp, schedule_method],
            kwargs=schedule_kwargs,
            trigger=trigger,
            **kwargs,
        )
        return job.id

    @service()
    def get_jobs(self):
        """Return all scheduled jobs as dicts (see ``job2dict``)."""
        if not self.scheduler.running:
            raise RuntimeError('定时器未启动')
        return [job2dict(job) for job in self.scheduler.get_jobs()]

    @service()
    def get_job(self, job_id):
        """Return the job with ``job_id`` as a dict, or ``None`` if the
        scheduler returns no job.
        """
        if not self.scheduler.running:
            raise RuntimeError('定时器未启动')
        job = self.scheduler.get_job(job_id=job_id)
        return job2dict(job)

    @service()
    def pause_job(self, job_id):
        """Pause the job with ``job_id`` and return it as a dict."""
        if not self.scheduler.running:
            raise RuntimeError('定时器未启动')
        job = self.scheduler.pause_job(job_id=job_id)
        return job2dict(job)

    @service()
    def resume_job(self, job_id):
        """Resume the job with ``job_id`` and return it as a dict, or
        ``None`` if the scheduler returns no job.
        """
        if not self.scheduler.running:
            raise RuntimeError('定时器未启动')
        job = self.scheduler.resume_job(job_id=job_id)
        return job2dict(job)

    @service()
    def remove_job(self, job_id):
        """Remove the job with ``job_id``."""
        if not self.scheduler.running:
            raise RuntimeError('定时器未启动')
        self.scheduler.remove_job(job_id=job_id)

    @service()
    def remove_jobs(self):
        """Remove all jobs."""
        if not self.scheduler.running:
            raise RuntimeError('定时器未启动')
        self.scheduler.remove_all_jobs()