detect-gui/core/edge_internal.py
2024-11-21 11:39:52 +08:00

346 lines
12 KiB
Python

import asyncio
import json
import os
import sqlite3
import traceback
from sqlite3 import Error
from typing import Any
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.base import STATE_RUNNING
from dynaconf.base import Settings
from core.edge_component import EdgeComponent, action, service
from core.logging import logger
ctx = None
def execute_action(component, method, **args):
try:
comp = ctx.get_component(component)
result = asyncio.run(comp.execute(method, **args))
logger.info(result)
logger.info(f'执行定时任务【{component}.{method}')
except Exception as e:
logger.error(f'执行定时任务【{component}.{method}】出错:{str(e)}')
logger.error(traceback.format_exc())
def job2dict(job):
return {'id': job.id, 'name': job.name, 'trigger': str(job.trigger), 'next_run_time': job.next_run_time,
'args': job.args, 'kwargs': job.kwargs}
# 检查表是否存在
def check_table_exists(cursor, table_name):
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
return cursor.fetchone() is not None
@action("_database", auto_start=True)
class Database(EdgeComponent):
def __init__(self, context):
super().__init__(context)
self.device_info = None
self.sql_path = None
self.db_path = None
def start(self):
self.logger.info("Database started")
self.init_db()
super().start()
def configure(self, setting: Settings) -> None:
self.db_path = setting.get("sqlite3.path")
self.sql_path = setting.get("sqlite3.sql")
self.device_info = setting.get("device")
self.logger.info(f"Database configure done.")
def stop(self):
super().stop()
self.logger.info("Database stopped")
# 读取SQL文件并执行建表语句
def init_db(self):
global conn, cursor
try:
conn = sqlite3.connect(self.db_path)
sql_files = sorted(os.listdir(self.sql_path))
executed_sqls = []
cursor = conn.cursor()
if check_table_exists(cursor, 'sys_system'):
self.logger.info(f"Table sys_system already exists, no need to initialize.")
cursor.execute("select file from sys_sql_history order by id")
rows = cursor.fetchall()
executed_sqls = [row[0] for row in rows]
for sql in sql_files:
if not sql in executed_sqls:
sql_file = os.path.join(self.sql_path, sql)
with open(sql_file, 'r') as file:
sql_script = file.read()
cursor.executescript(sql_script)
self.logger.info(f"Execute SQL:{sql_file}")
cursor.execute(f"INSERT INTO sys_sql_history (file) VALUES (?)", (sql,))
conn.commit()
self.logger.info(f"SQL[{sql_file}] initialized successfully.")
self.logger.info("Database initialized successfully.")
except Error as e:
self.logger.error(f"Error occurred: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
# 系统信息
@service()
def system_info(self):
global conn, cursor
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("select code, val from sys_system")
rows = cursor.fetchall()
self.logger.info("List system info successfully.")
sys_info = dict(rows)
if self.device_info is not None:
sys_info.update(self.device_info)
return sys_info
except Error as e:
self.logger.error(f"Error occurred: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
# 插入用户数据
@service()
def insert_user(self, username, password):
global conn, cursor
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
f"INSERT INTO sys_user (username, password) VALUES (?, ?)",
(username, password)
)
conn.commit()
self.logger.info("User inserted successfully.")
except Error as e:
self.logger.error(f"Error occurred: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
# 根据用户名删除用户
@service()
def delete_user(self, username):
global conn, cursor
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM sys_user WHERE username = ?", (username,))
conn.commit()
self.logger.info(f"User {username} deleted successfully.")
except Error as e:
self.logger.error(f"Error occurred: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
@service()
def list_users(self, username=None):
global conn, cursor
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if username is None:
cursor.execute("SELECT id, username, created_at FROM sys_user order by id")
else:
cursor.execute("SELECT id, username, created_at FROM sys_user WHERE username = ?", (username,))
rows = cursor.fetchall()
keys = ['id', 'username', 'created_at']
return [dict(zip(keys, values)) for values in rows]
except Error as e:
self.logger.error(f"Error occurred: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
# 根据用户名更新用户密码
@service()
def update_user(self, username, password):
global conn, cursor
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("UPDATE sys_user set password = ? WHERE username = ?", (password, username,))
conn.commit()
self.logger.info(f"User {username} update successfully.")
except Error as e:
self.logger.error(f"Error occurred: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
# 新增任务记录
@service()
def add_task(self, name, steps, result=None, state=0, creator='admin'):
global conn, cursor
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
id = cursor.execute(
"insert into sys_task(name, creator, steps, result, state)values(?,?,?,?,?)",
(name, creator, json.dumps(steps), json.dumps(result), state)
)
conn.commit()
self.logger.info(f"Task {id} insert successfully.")
except Error as e:
self.logger.error(f"Error occurred: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
# 列出任务记录
@service()
def list_tasks(self, id=None):
global conn, cursor
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if id is None:
cursor.execute("SELECT id, name, creator, steps, result, state, created_at FROM sys_task order by id "
"desc")
else:
cursor.execute("SELECT id, name, creator, steps, result, state, created_at FROM sys_task WHERE id = ?", (id,))
rows = cursor.fetchall()
keys = ['id', 'name', 'creator', 'steps', 'result', 'state', 'created_at']
return [dict(zip(keys, values)) for values in rows]
except Error as e:
self.logger.error(f"Error occurred: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
@action("_scheduler", auto_start=True)
class Scheduler(EdgeComponent):
def __init__(self, context):
super().__init__(context)
global ctx
self.scheduler = None
ctx = self.context
def start(self):
try:
self.scheduler.start()
except Exception as e:
self.logger.error(f'定时器启动失败:{str(e)}')
raise e
super().start()
self.logger.info("Scheduler started")
def configure(self, setting: Settings) -> None:
db_url = setting.get('sched.db')
job_stores = {
'default': SQLAlchemyJobStore(url=db_url)
}
executors = {
'default': ThreadPoolExecutor(setting.get('sched.thread_pool_workers', 10)),
'processpool': ProcessPoolExecutor(setting.get('sched.process_pool_workers', 3))
}
job_defaults = {
'coalesce': setting.get('sched.coalesce', False),
'max_instances': setting.get('sched.max_instances', 3)
}
self.scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults, daemon=True)
self.logger.info(f"Scheduler configure done.")
def stop(self):
if self.scheduler is not None and self.scheduler.state == STATE_RUNNING:
self.scheduler.shutdown()
self.logger.info("Scheduler stopped")
super().stop()
@service()
def add_job(self, schedule_comp: str, schedule_method: str, trigger: str = 'cron',
schedule_kwargs: dict[str, Any] = None, **kwargs) -> None:
"""
设置定时任务
:param schedule_comp:
:param schedule_method:
:param trigger:
:param schedule_kwargs:
:param kwargs:
:return:
"""
if not self.scheduler.running:
raise RuntimeError('定时器未启动')
self.logger.debug(f'设置定时任务:{schedule_comp}.{schedule_method}:{json.dumps(schedule_kwargs)}')
job = self.scheduler.add_job(func=execute_action, args=[schedule_comp, schedule_method], kwargs=schedule_kwargs,
trigger=trigger, **kwargs)
return job.id
@service()
def get_jobs(self):
if not self.scheduler.running:
raise RuntimeError('定时器未启动')
return [job2dict(job) for job in self.scheduler.get_jobs()]
@service()
def get_job(self, job_id):
if not self.scheduler.running:
raise RuntimeError('定时器未启动')
job = self.scheduler.get_job(job_id=job_id)
return job2dict(job)
@service()
def pause_job(self, job_id):
if not self.scheduler.running:
raise RuntimeError('定时器未启动')
job = self.scheduler.pause_job(job_id=job_id)
return job2dict(job)
@service()
def resume_job(self, job_id):
if not self.scheduler.running:
raise RuntimeError('定时器未启动')
job = self.scheduler.resume_job(job_id=job_id)
return job2dict(job)
@service()
def remove_job(self, job_id):
if not self.scheduler.running:
raise RuntimeError('定时器未启动')
self.scheduler.remove_job(job_id=job_id)
@service()
def remove_jobs(self):
if not self.scheduler.running:
raise RuntimeError('定时器未启动')
self.scheduler.remove_all_jobs()