Files
dtm-py-all/UI/models/monitor_model.py

138 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
监控模型,处理试验安排与监控相关的数据
"""
from PyQt5.QtCore import QObject, pyqtProperty, pyqtSignal, QVariant, pyqtSlot
import sqlite3
import os
import json
import ast
class MonitorModel(QObject):
    """Qt model exposing DUT monitoring records to views / QML.

    Records are read from the ``dutList`` table of the ``dtmgtDb.db`` SQLite
    file located next to this module.  When that file does not exist, a small
    built-in sample data set is used instead.  Two collections are kept:

    * ``_monitor_data``  - every record that was loaded;
    * ``_filtered_data`` - only the records whose ``dutDirectionMode`` is 2.
    """

    # Emitted after the data has been (re)loaded successfully; it is also the
    # notify signal of the ``monitorData`` / ``filteredMonitorData`` properties.
    dataChanged = pyqtSignal()

    def __init__(self, parent=None):
        """Create the model and immediately load its data.

        Args:
            parent: Optional parent ``QObject``.
        """
        super().__init__(parent)
        self._monitor_data = []
        self._filtered_data = []  # only records with dutDirectionMode == 2
        self._load_data()

    @staticmethod
    def _parse_direction_mode(direction_codes):
        """Derive ``dutDirectionMode`` from a raw ``direction_codes`` value.

        Accepted forms:

        * a list literal such as ``"[33, 35, 36]"`` - the first element is used;
        * a plain digit string such as ``"2"`` - converted with ``int``;
        * anything else (``None``, empty, unparsable) - yields ``0``.

        Args:
            direction_codes: Raw column value.  SQLite columns are dynamically
                typed, so the value is coerced to ``str`` before parsing
                instead of assuming it already is one.

        Returns:
            The direction mode, or ``0`` when it cannot be determined.
        """
        if not direction_codes:
            return 0

        text = str(direction_codes).strip()

        if text.startswith('[') and text.endswith(']'):
            try:
                # literal_eval only evaluates Python literals, never code.
                codes = ast.literal_eval(text)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                # Not a valid literal (e.g. "[33, abc]"): fall back to a
                # manual split that keeps only the purely numeric items.
                try:
                    codes = [
                        int(part.strip())
                        for part in text.strip('[]').split(',')
                        if part.strip().isdigit()
                    ]
                except ValueError:
                    # isdigit() accepts a few characters int() rejects.
                    return 0
            if isinstance(codes, list) and codes:
                return codes[0]
            return 0

        if text.isdigit():
            try:
                return int(text)
            except ValueError:
                return 0

        return 0

    def _row_to_monitor(self, row):
        """Convert one ``dutList`` row into the dict exposed by the model.

        Args:
            row: Result tuple in the column order of the SELECT statement in
                ``_read_monitors`` (``SN`` first, ``direction_codes`` last).

        Returns:
            A dict with the keys ``id``, ``name``, ``status``,
            ``dutDirectionMode``, ``project``, ``SN`` and ``description``.
        """
        return {
            "id": row[12] if row[12] is not None else 0,
            "name": row[1] if row[1] else "",
            "status": row[9] if row[9] else "",
            "dutDirectionMode": self._parse_direction_mode(row[17]),
            "project": row[2] if row[2] else "",
            "SN": row[0] if row[0] else "",
            "description": row[13] if row[13] else "",
        }

    def _read_monitors(self, db_path):
        """Read every row of the ``dutList`` table as a monitor dict.

        Args:
            db_path: Path of the SQLite database file.

        Returns:
            A list of monitor dicts, one per table row.

        Raises:
            sqlite3.Error: If the database cannot be queried.
        """
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT SN, name, project, projectType, phase, weeks, stationAssigned,
                       itemOnGoing, itemsFinished, status, testReq, inspector, id,
                       description, deadLine, createdate, workOrder, direction_codes
                FROM dutList
            """)
            rows = cursor.fetchall()
        finally:
            # Always release the connection, even when the query fails.
            conn.close()
        return [self._row_to_monitor(row) for row in rows]

    def _load_data(self):
        """Load the monitoring data and rebuild the filtered view.

        Uses the database when it exists and the built-in sample data
        otherwise.  ``dataChanged`` is emitted after a successful load.  On
        any error the problem is printed with its traceback and both
        collections are reset to empty lists.
        """
        try:
            db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dtmgtDb.db')
            if os.path.exists(db_path):
                self._monitor_data = self._read_monitors(db_path)
            else:
                # No database available: fall back to sample data.
                self._monitor_data = [
                    {"id": 1, "name": "测试1", "dutDirectionMode": 2, "status": "运行中", "project": "项目A"},
                    {"id": 2, "name": "测试2", "dutDirectionMode": 1, "status": "待开始", "project": "项目B"},
                    {"id": 3, "name": "测试3", "dutDirectionMode": 2, "status": "已完成", "project": "项目C"}
                ]
            # Keep only the records with dutDirectionMode == 2.
            self._filtered_data = [
                item for item in self._monitor_data if item.get("dutDirectionMode") == 2
            ]
            self.dataChanged.emit()
        except Exception as e:
            # Top-level boundary: a broken database must not crash the UI.
            print(f"加载监控数据时出错: {e}")
            import traceback
            traceback.print_exc()
            self._monitor_data = []
            self._filtered_data = []

    @pyqtProperty('QVariant', notify=dataChanged)
    def monitorData(self):
        """Return all monitoring records."""
        return self._monitor_data

    @pyqtProperty('QVariant', notify=dataChanged)
    def filteredMonitorData(self):
        """Return only the records whose dutDirectionMode is 2."""
        return self._filtered_data

    @pyqtSlot(int, result='QVariant')
    def getMonitorById(self, monitor_id):
        """Return the record whose ``id`` equals *monitor_id*, or ``None``."""
        for monitor in self._monitor_data:
            if monitor.get("id") == monitor_id:
                return monitor
        return None

    @pyqtSlot(result=int)
    def getMonitorCount(self):
        """Return the total number of monitoring records."""
        return len(self._monitor_data)

    @pyqtSlot(result=int)
    def getFilteredMonitorCount(self):
        """Return the number of records in the filtered collection."""
        return len(self._filtered_data)

    def get_filtered_monitors(self):
        """Return the filtered record list (for direct use by Python views)."""
        return self._filtered_data

    def get_all_monitors(self):
        """Return the full record list (for direct use by Python views)."""
        return self._monitor_data