Files
dtm-py-all/UI/models/dut_model.py

478 lines
21 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DUT模型,处理试验样品(DUT: Device Under Test)信息的新建与编辑
支持通过全局配置切换直接访问 SQLite 或通过 HTTP API 访问
"""
from PyQt5.QtCore import QObject, pyqtProperty, pyqtSignal, QVariant, pyqtSlot
import sqlite3
import json
import os
import requests
# 尝试导入配置
try:
from config import USE_HTTP_API, HTTP_API_BASE_URL
except ImportError:
USE_HTTP_API = False
HTTP_API_BASE_URL = "http://127.0.0.1:5050"
class DUTModel(QObject):
def __init__(self, parent=None):
super().__init__(parent)
self._duts = []
self._current_dut = None
# HTTP API 配置
self.use_http_api = USE_HTTP_API
self.api_base_url = HTTP_API_BASE_URL
# 数据库路径
self.db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dtmgtDb.db')
self._load_duts()
# 信号定义
dutsChanged = pyqtSignal()
currentDUTChanged = pyqtSignal()
def _load_duts(self):
"""从数据库加载DUT数据"""
try:
# 尝试使用 HTTP API
if self.use_http_api:
try:
url = f"{self.api_base_url}/dbTableAccess"
# 尝试添加参数获取所有记录(避免后端分页限制)
response = requests.get(
url,
params={
"table": "dutList",
"limit": 1000, # 设置较大的限制值
"offset": 0
},
timeout=5
)
response.raise_for_status()
result = response.json()
data = result.get("data", [])
print(f"\n========== DUT 数据加载调试 ==========")
print(f"从 HTTP API 获取到 {len(data)} 条原始记录")
# 检查是否有分页信息
if 'total' in result:
print(f"后端数据总数: {result.get('total')}")
if 'limit' in result:
print(f"后端分页限制: limit={result.get('limit')}, offset={result.get('offset', 0)}")
if data is not None:
self._duts = []
skipped_count = 0
for idx, row in enumerate(data):
try:
# 保留原始的 stationAssigned JSON 字符串
# 安全处理:即使为 None 或空字符串也保留记录
station_assigned_str = row.get("stationAssigned") or ""
# 如果是 None转为空字符串
if station_assigned_str is None:
station_assigned_str = ""
dtm = ""
station = ""
# 解析 JSON 获取 dtMachine 和 station 字段供其他模块使用
# 只有当 stationAssigned 为非空且以 '{' 开头时才尝试解析
if station_assigned_str and isinstance(station_assigned_str, str) and station_assigned_str.strip().startswith('{'):
try:
obj = json.loads(station_assigned_str)
dtm = (obj.get('dtMachine') or '').strip()
station = (obj.get('station') or '').strip()
except Exception as e:
print(f" [警告] 记录 {idx+1} 解析 stationAssigned JSON 失败: {e}, 值: {station_assigned_str}")
dut = {
"SN": row.get("SN", ""),
"name": row.get("name", ""),
"project": row.get("project", ""),
"projectType": row.get("projectType", ""),
"phase": row.get("phase", ""),
"weeks": row.get("weeks", 0) if row.get("weeks") is not None else 0,
"stationAssigned": station_assigned_str, # 保留原始 JSON 字符串(即使为空)
"dtMachine": dtm, # 解析出的机台编码
"station": station, # 解析出的通道编号
"itemOnGoing": row.get("itemOnGoing", ""),
"itemsFinished": row.get("itemsFinished", 0) if row.get("itemsFinished") is not None else 0,
"status": row.get("status", ""),
"testReq": row.get("testReq", ""),
"inspector": row.get("inspector", ""),
"id": row.get("id", 0) if row.get("id") is not None else 0,
"description": row.get("description", ""),
"deadLine": row.get("deadLine", ""),
"createdate": row.get("createdate", ""),
"workOrder": row.get("workOrder", ""),
"direction_codes": row.get("direction_codes", "")
}
self._duts.append(dut)
except Exception as e:
skipped_count += 1
print(f" [错误] 记录 {idx+1} 处理失败,已跳过: {e}")
print(f" 原始数据: {row}")
import traceback
traceback.print_exc()
print(f"\n总计: 获取 {len(data)} 条 -> 成功加载 {len(self._duts)} 条 -> 跳过 {skipped_count}")
print(f"========================================\n")
self.dutsChanged.emit()
return
except Exception as e:
print(f"HTTP API 加载DUT失败: {e},回退到直接访问 SQLite")
import traceback
traceback.print_exc()
# 直接访问 SQLite
if os.path.exists(self.db_path):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 查询 dutList 表中的所有 DUT
cursor.execute("""
SELECT SN, name, project, projectType, phase, weeks, stationAssigned,
itemOnGoing, itemsFinished, status, testReq, inspector, id,
description, deadLine, createdate, workOrder, direction_codes
FROM dutList
""")
rows = cursor.fetchall()
self._duts = []
for row in rows:
# 保留原始的 stationAssigned JSON 字符串
# 安全处理:即使为 None 或空字符串也保留记录
station_assigned_str = row[6] if row[6] else ""
# 如果是 None转为空字符串
if station_assigned_str is None:
station_assigned_str = ""
dtm = ""
station = ""
# 解析 JSON 获取 dtMachine 和 station 字段供其他模块使用
# 只有当 stationAssigned 为非空且以 '{' 开头时才尝试解析
if station_assigned_str and isinstance(station_assigned_str, str) and station_assigned_str.strip().startswith('{'):
try:
obj = json.loads(station_assigned_str)
dtm = (obj.get('dtMachine') or '').strip()
station = (obj.get('station') or '').strip()
except Exception as e:
print(f"解析 stationAssigned JSON 失败: {e}, 值: {station_assigned_str}")
dut = {
"SN": row[0] if row[0] else "",
"name": row[1] if row[1] else "",
"project": row[2] if row[2] else "",
"projectType": row[3] if row[3] else "",
"phase": row[4] if row[4] else "",
"weeks": row[5] if row[5] is not None else 0,
"stationAssigned": station_assigned_str, # 保留原始 JSON 字符串(即使为空)
"dtMachine": dtm, # 解析出的机台编码
"station": station, # 解析出的通道编号
"itemOnGoing": row[7] if row[7] else "",
"itemsFinished": row[8] if row[8] is not None else 0,
"status": row[9] if row[9] else "",
"testReq": row[10] if row[10] else "",
"inspector": row[11] if row[11] else "",
"id": row[12] if row[12] is not None else 0,
"description": row[13] if row[13] else "",
"deadLine": row[14] if row[14] else "",
"createdate": row[15] if row[15] else "",
"workOrder": row[16] if row[16] else "",
"direction_codes": row[17] if row[17] else ""
}
self._duts.append(dut)
print(f"加载了 {len(self._duts)} 条 DUT 记录SQLite")
conn.close()
else:
# 如果数据库不存在,使用空列表
self._duts = []
print("数据库文件不存在")
self.dutsChanged.emit()
except Exception as e:
print(f"加载DUT数据时出错: {e}")
self._duts = []
@pyqtProperty('QVariant', notify=dutsChanged)
def duts(self):
"""返回所有DUT数据"""
return self._duts
@pyqtProperty('QVariant', notify=currentDUTChanged)
def currentDUT(self):
"""返回当前选中的DUT"""
return self._current_dut
@currentDUT.setter
def setCurrentDUT(self, dut):
"""设置当前选中的DUT"""
if self._current_dut != dut:
self._current_dut = dut
self.currentDUTChanged.emit()
@pyqtSlot(int, result='QVariant')
def getDUTById(self, dut_id):
"""根据ID获取DUT信息"""
for dut in self._duts:
if dut.get("id") == dut_id:
return dut
return None
@pyqtSlot('QVariant')
def addDUT(self, dut):
"""添加新DUT
会将 dtMachine station 字段合成为 JSON 字符串存入 stationAssigned
虚拟字段dtMachine, station不会传给后台
"""
if dut:
try:
# 合成 stationAssigned JSON 字符串
station_assigned_str = self._compose_station_assigned(dut)
# 准备数据库记录(移除虚拟字段)
db_record = self._prepare_db_record(dut, station_assigned_str)
# 尝试使用 HTTP API
if self.use_http_api:
try:
url = f"{self.api_base_url}/dbTableAccess"
response = requests.post(
url,
params={"table": "dutList"}, # 注意:表名必须是 dutList
json={"action": "insert", "records": [db_record]}, # 使用清理后的记录
timeout=5
)
response.raise_for_status()
result = response.json()
if result.get("status") == "success":
# 重新加载数据
self._load_duts()
return
except Exception as e:
print(f"HTTP API 添加DUT失败: {e},回退到直接访问 SQLite")
# 直接访问 SQLite
if os.path.exists(self.db_path):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 插入新DUT
cursor.execute("""
INSERT INTO dutList
(SN, name, project, projectType, phase, weeks, stationAssigned,
itemOnGoing, itemsFinished, status, testReq, inspector,
description, deadLine, createdate, workOrder, direction_codes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
db_record.get("SN", ""),
db_record.get("name", ""),
db_record.get("project", ""),
db_record.get("projectType", ""),
db_record.get("phase", ""),
db_record.get("weeks", 0),
db_record.get("stationAssigned", ""),
db_record.get("itemOnGoing", ""),
db_record.get("itemsFinished", 0),
db_record.get("status", ""),
db_record.get("testReq", ""),
db_record.get("inspector", ""),
db_record.get("description", ""),
db_record.get("deadLine", ""),
db_record.get("createdate", ""),
db_record.get("workOrder", ""),
db_record.get("direction_codes", "")
))
conn.commit()
conn.close()
# 重新加载数据
self._load_duts()
except Exception as e:
print(f"添加DUT时出错: {e}")
@pyqtSlot(int)
def removeDUT(self, dut_id):
"""删除DUT"""
try:
# 从数据库删除
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dtmgtDb.db')
if os.path.exists(db_path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM dutList WHERE id = ?", (dut_id,))
conn.commit()
conn.close()
# 从内存列表删除
self._duts = [d for d in self._duts if d.get("id") != dut_id]
if self._current_dut and self._current_dut.get("id") == dut_id:
self.setCurrentDUT = None
self.dutsChanged.emit()
except Exception as e:
print(f"删除DUT时出错: {e}")
def _compose_station_assigned(self, dut):
"""将 dtMachine 和 station 字段合成为 JSON 字符串
Args:
dut: DUT 数据字典包含 dtMachine station 字段
Returns:
str: JSON 格式字符串 '{"dtMachine": "SUN-LAB1-TOP-002", "station": "01"}'
"""
dtm = dut.get("dtMachine", "").strip()
station = dut.get("station", "").strip()
if dtm or station:
obj = {"dtMachine": dtm, "station": station}
return json.dumps(obj, ensure_ascii=False)
return ""
def _prepare_db_record(self, dut, station_assigned_str=None):
"""准备数据库记录移除虚拟字段dtMachine, station
Args:
dut: DUT 数据字典
station_assigned_str: 已合成的 stationAssigned JSON 字符串可选
Returns:
dict: 清理后的数据库记录
"""
# 虚拟字段列表(不存在于数据库表中)
virtual_fields = {"dtMachine", "station"}
# 复制 dut 字典并移除虚拟字段
db_record = {k: v for k, v in dut.items() if k not in virtual_fields}
# 如果提供了 stationAssigned 字符串,使用它
if station_assigned_str is not None:
db_record["stationAssigned"] = station_assigned_str
return db_record
@pyqtSlot('QVariant')
def updateDUT(self, dut):
"""更新DUT信息
重要
1. 只更新传入的非 None 字段避免误清空数据
2. 如果传入了 dtMachine station 字段会自动合成 stationAssigned JSON 字符串
3. 虚拟字段dtMachine, station不会传给数据库
"""
if not dut or "id" not in dut:
return
try:
# 如果传入了 dtMachine 或 station合成 stationAssigned
station_assigned_str = None
if "dtMachine" in dut or "station" in dut:
station_assigned_str = self._compose_station_assigned(dut)
dut["stationAssigned"] = station_assigned_str
# 准备数据库记录(移除虚拟字段)
db_record = self._prepare_db_record(dut)
# 尝试使用 HTTP API
if self.use_http_api:
try:
url = f"{self.api_base_url}/dbTableAccess"
response = requests.post(
url,
params={"table": "dutList"}, # 注意:表名必须是 dutList
json={"action": "update", "records": [db_record]}, # 使用清理后的记录
timeout=5
)
response.raise_for_status()
result = response.json()
if result.get("status") == "success":
# 重新加载数据
self._load_duts()
return
except Exception as e:
print(f"HTTP API 更新DUT失败: {e},回退到直接访问 SQLite")
# 直接访问 SQLite
if os.path.exists(self.db_path):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 动态构建 UPDATE 语句,只更新传入的非 None 字段
update_fields = []
update_values = []
# 数据库真实字段(排除虚拟字段)
db_fields = [
"SN", "name", "project", "projectType", "phase",
"weeks", "stationAssigned", "itemOnGoing", "itemsFinished",
"status", "testReq", "inspector", "description",
"deadLine", "createdate", "workOrder", "direction_codes"
]
for field in db_fields:
if field in db_record and db_record[field] is not None:
update_fields.append(f"{field}=?")
# weeks 和 itemsFinished 需要转换为整数
if field in ["weeks", "itemsFinished"]:
update_values.append(int(db_record[field]))
else:
update_values.append(db_record[field])
if update_fields:
update_values.append(dut["id"])
sql = f"UPDATE dutList SET {', '.join(update_fields)} WHERE id=?"
cursor.execute(sql, tuple(update_values))
conn.commit()
conn.close()
# 重新加载数据
self._load_duts()
except Exception as e:
print(f"更新DUT时出错: {e}")
@pyqtSlot(result=int)
def getDUTCount(self):
"""获取DUT总数"""
return len(self._duts)
def getAllDUTs(self):
"""获取所有DUT列表
Returns:
list: 所有DUT数据列表
"""
print(f"getAllDUTs() 调用,当前共 {len(self._duts)} 条记录")
return self._duts
def filterByStatus(self, status):
"""按状态过滤DUT
Args:
status: 状态字符串为空或'全部'时返回全部
Returns:
list: 过滤后的DUT列表
"""
if not status or status == '全部':
return self._duts
return [d for d in self._duts if (d.get('status') or '').strip() == status]