import json
import sqlite3
from contextlib import closing, contextmanager

from dynaconf.base import Settings
from exceptiongroup import catch

from core.edge_component import EdgeComponent, action, service


@contextmanager
def _open_cursor(db_path):
    """Yield ``(connection, cursor)`` for *db_path* and close both on exit.

    ``sqlite3.Connection`` used directly as a context manager only manages
    the transaction; ``closing`` is what actually closes the handles.
    """
    with closing(sqlite3.connect(db_path)) as conn:
        with closing(conn.cursor()) as cursor:
            yield conn, cursor


@action("dat_task", auto_start=True)
class TaskTable(EdgeComponent):
    """Data-access component for the ``dat_task`` SQLite table.

    Each service method opens its own short-lived connection to
    ``self.db_path`` and closes it before returning. Errors raised while
    talking to the database are logged through ``self.logger`` instead of
    being propagated to the caller.
    """

    def __init__(self, context):
        super().__init__(context)
        # Filled in by configure(); stays None until then.
        self.db_path = None

    def configure(self, setting: Settings) -> None:
        """Read the database path from the ``_database`` component.

        :param setting: settings object; not used by this component.
        """
        self.db_path = self.context.get_component("_database").db_path

    @service()
    def start(self) -> None:
        """Start the component by delegating to the base class."""
        super().start()

    @service()
    def stop(self) -> None:
        """Stop the component by delegating to the base class."""
        super().stop()

    @service()
    def insert_task(self, entity) -> None:
        """Insert one row into ``dat_task``.

        Every key of *entity* whose value is not None becomes a column,
        except ``id``, which this method never writes.

        :param entity: mapping of column name to value.
        """
        columns = [k for k, v in entity.items() if v is not None and k != "id"]
        if not columns:
            # Nothing to write; an empty column list is not valid SQL.
            return
        values = [entity[k] for k in columns]
        # Only the values are bound parameters. Column names are interpolated
        # as-is, so the keys of `entity` must come from trusted code.
        sql = "INSERT INTO dat_task ({}) VALUES ({})".format(
            ", ".join(map(str, columns)),
            ", ".join("?" for _ in columns),
        )
        try:
            with _open_cursor(self.db_path) as (conn, cursor):
                cursor.execute(sql, values)
                conn.commit()
        except Exception as e:
            self.logger.error(e)

    @service()
    def update_task(self, entity) -> None:
        """Update the ``dat_task`` row whose id equals ``entity["id"]``.

        Keys with a None value are left untouched. The call is a no-op when
        ``id`` is missing or None, or when there is nothing to update.

        :param entity: mapping of column name to value, including ``id``.
        """
        task_id = entity.get("id")
        if task_id is None:
            return
        fields = {k: v for k, v in entity.items() if v is not None and k != "id"}
        if not fields:
            return
        # Column names are interpolated as-is (see insert_task); values and
        # the id are bound parameters.
        assignments = ", ".join("{}=?".format(k) for k in fields)
        sql = "UPDATE dat_task SET {} WHERE id=?".format(assignments)
        params = [*fields.values(), task_id]
        try:
            with _open_cursor(self.db_path) as (conn, cursor):
                cursor.execute(sql, params)
                conn.commit()
        except Exception as e:
            self.logger.error(e)

    @service()
    def list_task(self, state):
        """Return the tasks whose ``state`` is less than or equal to *state*.

        :param state: upper bound (inclusive) for the ``state`` column.
        :return: list of row tuples ordered by ``create_time``, or None when
            the query failed.
        """
        rows = None
        try:
            with _open_cursor(self.db_path) as (_conn, cursor):
                cursor.execute(
                    "SELECT * FROM dat_task WHERE state <= ? order by create_time",
                    (state,),
                )
                rows = cursor.fetchall()
        except Exception as e:
            self.logger.error(e)
        return rows

    @service()
    def add_server_tasks(self, tasks) -> None:
        """Receive tasks from the PC and store the ones not yet present.

        A task is inserted only when no row with the same ``id`` and
        ``device_sn`` exists. All inserts are committed together at the end;
        if any statement fails, the error is logged and nothing is committed.

        :param tasks: sequence of dicts with the keys ``id``, ``name``,
            ``deviceSn``, ``paramJson``, ``state``, ``createTime`` and
            ``updateTime``.
        """
        if not tasks:
            self.logger.warn("没有接收到任务数据!")
            return
        try:
            with _open_cursor(self.db_path) as (conn, cursor):
                for task in tasks:
                    # Existence probe only: no need to fetch the full row.
                    cursor.execute(
                        "SELECT 1 FROM dat_task WHERE id = ? and device_sn = ? LIMIT 1",
                        (task["id"], task["deviceSn"]),
                    )
                    if cursor.fetchone() is not None:
                        continue
                    cursor.execute(
                        "INSERT INTO dat_task (id, name, device_sn, param_json, state, create_time, update_time) VALUES (?, ?, ?, ?, ?, ?, ?)",
                        (
                            task["id"],
                            task["name"],
                            task["deviceSn"],
                            task["paramJson"],
                            task["state"],
                            task["createTime"],
                            task["updateTime"],
                        ),
                    )
                conn.commit()
        except Exception as e:
            self.logger.error(e)

    @service()
    def sync_client_tasks(self, device_id, device_sn):
        """Load the local data of one task so it can be uploaded.

        :param device_id: value matched against the ``id`` column.
        :param device_sn: value matched against the ``device_sn`` column.
        :return: dict with the keys ``id``, ``result_json``, ``start_time``,
            ``end_time`` and ``state``, or None when an argument is None,
            no row matches, or the query failed.
        """
        result = None
        if device_id is None or device_sn is None:
            self.logger.warn("没有接收到任务数据!")
            return result
        try:
            with _open_cursor(self.db_path) as (_conn, cursor):
                cursor.execute(
                    "SELECT id, device_sn, result_json, start_time, create_time, state FROM dat_task WHERE id = ? and device_sn = ?",
                    (device_id, device_sn),
                )
                row = cursor.fetchone()
                if row is not None:
                    result = {
                        "id": row[0],
                        "result_json": row[2],
                        "start_time": row[3],
                        # NOTE(review): "end_time" is filled from the
                        # create_time column (row[4]); confirm whether the
                        # table has a dedicated end-time column.
                        "end_time": row[4],
                        "state": row[5],
                    }
        except Exception as e:
            self.logger.error(e)
        return result