Files
dtm-py-all/UI/models/system_model.py

607 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
系统设置模型,负责系统相关配置在 SQLite 数据库中的读写。
- 机台设置: 表 main.dtMachine
- 样品阶段设置: 表 main.projectPhase
- 用户及权限设置: 表 main.user
目前作为简单的同步模型使用,由各个视图在需要时调用 load_*/save_* 方法。
支持通过全局配置切换直接访问 SQLite 或通过 HTTP API 访问。
"""
import os
import sqlite3
import json
import requests
from typing import List, Dict, Any
from PyQt5.QtCore import QObject
# Import the global configuration; use built-in defaults when it is missing.
try:
    import sys

    # Put the directory one level above this file's directory at the front of
    # the import path so that ``config`` can be resolved from there.
    sys.path.insert(
        0,
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    )
    from config import HTTP_API_BASE_URL, USE_HTTP_API
except ImportError:
    # ``config`` (or one of the two names) is unavailable: HTTP access is
    # disabled and the base URL points at the local default.
    USE_HTTP_API = False
    HTTP_API_BASE_URL = "http://127.0.0.1:5050"
class SystemModel(QObject):
def __init__(self, db_path: str = None, parent=None):
    """Initialise the model and record how the data store is reached.

    Args:
        db_path: Path of the SQLite database file. When ``None``, the file
            ``dtmgtDb.db`` in the same directory as this module is used.
        parent: Optional parent object, forwarded to the base-class
            initialiser.
    """
    super().__init__(parent)
    # Default to the dtmgtDb.db that sits next to this module.
    self.db_path = (
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "dtmgtDb.db")
        if db_path is None
        else db_path
    )
    # Copy the module-level switches onto the instance.
    self.api_base_url = HTTP_API_BASE_URL
    self.use_http_api = USE_HTTP_API
# Basic utility methods
def _get_conn(self):
return sqlite3.connect(self.db_path)
def _http_get(self, table: str, params: dict = None) -> List[Dict[str, Any]]:
"""
通过 HTTP API 获取表数据
参数:
table: 表名
params: 查询参数(可选)
返回:
数据列表
"""
try:
url = f"{self.api_base_url}/dbTableAccess"
query_params = {"table": table}
if params:
query_params.update(params)
response = requests.get(url, params=query_params, timeout=5)
response.raise_for_status()
result = response.json()
return result.get("data", [])
except Exception as e:
print(f"HTTP API 获取数据失败: {e},回退到直接访问 SQLite")
# 回退到直接访问
return None
def _http_post(self, table: str, action: str, records: List[Dict[str, Any]]) -> bool:
"""
通过 HTTP API 提交数据变更
参数:
table: 表名
action: 操作类型insert/update/delete
records: 数据记录列表
返回:
是否成功
"""
import time # 添加性能监控
try:
start_time = time.time()
url = f"{self.api_base_url}/dbTableAccess"
params = {"table": table}
data = {
"action": action,
"records": records
}
# 记录请求准备时间
prepare_time = time.time() - start_time
# 发送请求
request_start = time.time()
response = requests.post(url, params=params, json=data, timeout=5)
request_time = time.time() - request_start
response.raise_for_status()
# 记录响应解析时间
parse_start = time.time()
result = response.json()
parse_time = time.time() - parse_start
total_time = time.time() - start_time
# 性能日志
print(f"[性能] HTTP POST {table}/{action}: 总计 {total_time*1000:.2f}ms (准备 {prepare_time*1000:.2f}ms | 请求 {request_time*1000:.2f}ms | 解析 {parse_time*1000:.2f}ms) | 记录数: {len(records)}")
return result.get("status") == "success"
except Exception as e:
print(f"HTTP API 提交数据失败: {e},回退到直接访问 SQLite")
return False
# ---------------------- Machine settings: dtMachine ----------------------
def load_machines(self) -> List[Dict[str, Any]]:
    """Load every machine row and decode its JSON-encoded columns.

    When ``self.use_http_api`` is set, the rows are requested through
    ``self._http_get("dtMachine")``; if that returns ``None`` the method falls
    back to reading the ``dtMachine`` table directly from SQLite, ordered by
    ``id``.

    In each row ``register01``..``register04`` and ``baudrate`` are replaced
    by their decoded JSON value. An empty or unparsable register becomes
    ``{}``; an empty or unparsable ``baudrate`` becomes the default serial
    settings dict.

    Returns:
        A list with one dict per machine.
    """
    default_serial = {"baudrate": 19200, "stopbits": 1, "bytesize": 8, "parity": "E"}

    def _decode(raw, fallback, keep_non_str):
        """Return ``raw`` decoded from JSON, or a fresh copy of ``fallback``."""
        if not raw:
            return dict(fallback)
        if keep_non_str and not isinstance(raw, str):
            # HTTP rows may already carry decoded values; leave them alone.
            return raw
        try:
            return json.loads(raw)
        except Exception:
            return dict(fallback)

    def _decode_row(machine, keep_non_str):
        """Decode the JSON columns of one machine dict in place."""
        for i in range(1, 5):
            key = f"register0{i}"
            machine[key] = _decode(machine.get(key, ""), {}, keep_non_str)
        machine["baudrate"] = _decode(
            machine.get("baudrate", ""), default_serial, keep_non_str
        )

    # Try the HTTP API first.
    if self.use_http_api:
        data = self._http_get("dtMachine")
        if data is not None:
            for machine in data:
                _decode_row(machine, keep_non_str=True)
            return data

    # Direct SQLite access (also the fallback when the HTTP call failed).
    conn = self._get_conn()
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()
        # Each column is selected exactly once.
        cur.execute(
            "SELECT SN, label, status, type, color, description, id, "
            "station1, station2, station3, station4, "
            "status1, status2, status3, status4, testType, "
            "com, plcAddress, register01, register02, register03, register04, "
            "baudrate FROM dtMachine ORDER BY id ASC"
        )
        machines = [dict(row) for row in cur.fetchall()]
        for machine in machines:
            _decode_row(machine, keep_non_str=False)
        return machines
    finally:
        conn.close()
def save_machines(self, machines: List[Dict[str, Any]], removed_ids: List[int] = None):
    """Persist a list of machine records, serialising JSON columns to strings.

    A record with a truthy ``id`` is written with UPDATE, any other record
    with INSERT. On UPDATE only the fields that are present and not ``None``
    are written, so fields that were not passed in (for example the PLC
    parameters) are left untouched. The caller's dicts are not modified; the
    method works on shallow copies.

    When ``self.use_http_api`` is set, the changes are sent through
    ``self._http_post`` first. If any HTTP step reports failure, the whole
    save is carried out directly against SQLite instead.

    Args:
        machines: Machine records to insert or update.
        removed_ids: Optional list of ``id`` values to delete.
    """
    columns = (
        "SN", "label", "status", "type", "color", "description",
        "station1", "station2", "station3", "station4",
        "status1", "status2", "status3", "status4",
        "com", "plcAddress", "register01", "register02", "register03", "register04",
        "baudrate", "testType",
    )
    default_serial = {"baudrate": 19200, "stopbits": 1, "bytesize": 8, "parity": "E"}
    # Replacement text for a JSON column whose value is neither None, str nor dict.
    json_fallbacks = {
        "register01": "{}",
        "register02": "{}",
        "register03": "{}",
        "register04": "{}",
        "baudrate": json.dumps(default_serial, ensure_ascii=False),
    }

    # Prepare copies with register01..04 and baudrate serialised to JSON text.
    prepared_machines = []
    for machine in machines:
        m_copy = machine.copy()
        for key, fallback in json_fallbacks.items():
            val = m_copy.get(key)
            if val is None or isinstance(val, str):
                # None: keep the stored value; str: already serialised.
                continue
            if isinstance(val, dict):
                m_copy[key] = json.dumps(val, ensure_ascii=False)
            else:
                m_copy[key] = fallback
        prepared_machines.append(m_copy)

    # Try the HTTP API first.
    if self.use_http_api:
        success = True
        if removed_ids:
            delete_records = [{"id": i} for i in removed_ids]
            success = self._http_post("dtMachine", "delete", delete_records)
        insert_records = [m for m in prepared_machines if not m.get("id")]
        update_records = [m for m in prepared_machines if m.get("id")]
        # The delete result is carried forward on purpose: a failed delete
        # must not be reported as a successful save.
        if insert_records:
            # INSERT records carry every column, defaulting to None.
            for m in insert_records:
                for key in columns:
                    m.setdefault(key, None)
            success = success and self._http_post("dtMachine", "insert", insert_records)
        if update_records:
            success = success and self._http_post("dtMachine", "update", update_records)
        if success:
            return
        # NOTE(review): when an earlier HTTP step succeeded and a later one
        # failed, the fallback below replays every step against SQLite.
        # Confirm whether the API and this file share the same database.

    # Direct SQLite access (also the fallback when the HTTP API failed).
    conn = self._get_conn()
    try:
        cur = conn.cursor()
        if removed_ids:
            cur.executemany("DELETE FROM dtMachine WHERE id=?", [(i,) for i in removed_ids])
        for m in prepared_machines:
            if m.get("id"):
                # UPDATE: only fields that were passed in with a non-None value.
                assignments = [(f, m[f]) for f in columns if m.get(f) is not None]
                if assignments:
                    # Column names come from the fixed tuple above; values are bound.
                    set_clause = ", ".join(f"{f}=?" for f, _ in assignments)
                    values = [v for _, v in assignments]
                    values.append(m["id"])
                    cur.execute(
                        f"UPDATE dtMachine SET {set_clause} WHERE id=?",
                        tuple(values),
                    )
            else:
                # INSERT: columns that were not passed in are stored as NULL.
                placeholders = ",".join("?" for _ in columns)
                cur.execute(
                    f"INSERT INTO dtMachine ({', '.join(columns)}) VALUES ({placeholders})",
                    tuple(m.get(f) for f in columns),
                )
        conn.commit()
    finally:
        conn.close()
def delete_machines(self, delete_records: List[Dict[str, Any]]) -> bool:
    """Delete machine records.

    The HTTP API is tried first when ``self.use_http_api`` is set; if it is
    disabled or reports failure, the rows are deleted directly in SQLite.

    Args:
        delete_records: Records to delete; each one must contain an ``id`` key.

    Returns:
        ``True`` on success, ``False`` when the SQLite delete raised (the
        error is printed, not propagated).
    """
    if self.use_http_api and self._http_post("dtMachine", "delete", delete_records):
        return True

    # HTTP disabled or failed: delete directly in SQLite.
    conn = self._get_conn()
    try:
        id_params = [(record["id"],) for record in delete_records]
        conn.cursor().executemany("DELETE FROM dtMachine WHERE id=?", id_params)
        conn.commit()
    except Exception as e:
        print(f"删除机台记录失败: {e}")
        return False
    else:
        return True
    finally:
        conn.close()
# ---------------------- Sample phase settings: projectPhase ----------------------
def load_project_phases(self) -> List[Dict[str, Any]]:
    """Load all project phase rows.

    The HTTP API is used when ``self.use_http_api`` is set and the call does
    not return ``None``; otherwise the ``projectPhase`` table is read
    directly from SQLite, ordered by ``id``.

    Returns:
        A list with one dict per phase. The SQLite path returns the keys
        ``name``, ``PN``, ``description`` and ``id``.
    """
    remote_rows = self._http_get("projectPhase") if self.use_http_api else None
    if remote_rows is not None:
        return remote_rows

    # Direct SQLite access.
    conn = self._get_conn()
    conn.row_factory = sqlite3.Row
    try:
        query = "SELECT name, PN, description, id FROM projectPhase ORDER BY id ASC"
        cursor = conn.cursor()
        cursor.execute(query)
        return [dict(record) for record in cursor.fetchall()]
    finally:
        conn.close()
def save_project_phases(self, phases: List[Dict[str, Any]], removed_ids: List[int] = None):
    """Persist the list of project phases.

    A phase with a truthy ``id`` is written with UPDATE, any other phase with
    INSERT. On UPDATE only the fields that are present and not ``None`` are
    written, so data is never cleared by accident. The caller's dicts are
    left unmodified on both the HTTP and the SQLite path.

    When ``self.use_http_api`` is set, the changes are sent through
    ``self._http_post`` first. If any HTTP step reports failure, the whole
    save is carried out directly against SQLite instead.

    Args:
        phases: All rows currently in the table view.
        removed_ids: Optional list of ``id`` values of deleted rows.
    """
    columns = ("name", "PN", "description")

    # Try the HTTP API first.
    if self.use_http_api:
        success = True
        if removed_ids:
            delete_records = [{"id": i} for i in removed_ids]
            success = self._http_post("projectPhase", "delete", delete_records)
        insert_records = []
        update_records = []
        for p in phases:
            if p.get("id"):
                update_records.append(p)
            else:
                # INSERT records carry every column, defaulting to None.
                p_copy = p.copy()
                for key in columns:
                    p_copy.setdefault(key, None)
                insert_records.append(p_copy)
        if insert_records:
            success = success and self._http_post("projectPhase", "insert", insert_records)
        if update_records:
            success = success and self._http_post("projectPhase", "update", update_records)
        if success:
            return
        # HTTP API failed: fall through to direct access.

    # Direct SQLite access.
    conn = self._get_conn()
    try:
        cur = conn.cursor()
        if removed_ids:
            cur.executemany("DELETE FROM projectPhase WHERE id=?", [(i,) for i in removed_ids])
        for p in phases:
            if p.get("id"):
                # UPDATE: only fields that were passed in with a non-None value.
                assignments = [(f, p[f]) for f in columns if p.get(f) is not None]
                if assignments:
                    # Column names come from the fixed tuple above; values are bound.
                    set_clause = ", ".join(f"{f}=?" for f, _ in assignments)
                    values = [v for _, v in assignments]
                    values.append(p["id"])
                    cur.execute(
                        f"UPDATE projectPhase SET {set_clause} WHERE id=?",
                        tuple(values),
                    )
            else:
                # INSERT: missing columns are stored as NULL. Reading with
                # .get() avoids writing default keys back into the caller's dict.
                cur.execute(
                    "INSERT INTO projectPhase (name, PN, description) VALUES (?,?,?)",
                    tuple(p.get(f) for f in columns),
                )
        conn.commit()
    finally:
        conn.close()
def delete_project_phases(self, delete_records: List[Dict[str, Any]]) -> bool:
    """Delete project phase records.

    The HTTP API is tried first when ``self.use_http_api`` is set; if it is
    disabled or reports failure, the rows are deleted directly in SQLite.

    Args:
        delete_records: Records to delete; each one must contain an ``id`` key.

    Returns:
        ``True`` on success, ``False`` when the SQLite delete raised (the
        error is printed, not propagated).
    """
    if self.use_http_api:
        if self._http_post("projectPhase", "delete", delete_records):
            return True
        # HTTP API failed: fall through to direct access.

    conn = self._get_conn()
    deleted = False
    try:
        cursor = conn.cursor()
        cursor.executemany(
            "DELETE FROM projectPhase WHERE id=?",
            [(entry["id"],) for entry in delete_records],
        )
        conn.commit()
        deleted = True
    except Exception as e:
        print(f"删除项目阶段记录失败: {e}")
    finally:
        conn.close()
    return deleted
# ---------------------- User and permission settings: user ----------------------
def load_users(self) -> List[Dict[str, Any]]:
    """Load all user rows.

    The HTTP API is used when ``self.use_http_api`` is set and the call does
    not return ``None``; otherwise the ``user`` table is read directly from
    SQLite, ordered by ``id``.

    Returns:
        A list with one dict per user. The SQLite path returns the keys
        ``username``, ``password``, ``role``, ``auth``, ``userid``,
        ``description`` and ``id``.
    """
    if self.use_http_api:
        fetched = self._http_get("user")
        if fetched is not None:
            return fetched

    # Direct SQLite access.
    conn = self._get_conn()
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT username, password, role, auth, userid, description, id "
            "FROM user ORDER BY id ASC"
        )
        users = []
        for record in cursor.fetchall():
            users.append(dict(record))
        return users
    finally:
        conn.close()
def save_users(self, users: List[Dict[str, Any]], removed_ids: List[int] = None):
    """Persist the list of users.

    A user with a truthy ``id`` is written with UPDATE, any other user with
    INSERT. On UPDATE only the fields that are present and not ``None`` are
    written, so data is never cleared by accident. The caller's dicts are
    left unmodified on both the HTTP and the SQLite path.

    When ``self.use_http_api`` is set, the changes are sent through
    ``self._http_post`` first. If any HTTP step reports failure, the whole
    save is carried out directly against SQLite instead.

    Args:
        users: User records to insert or update.
        removed_ids: Optional list of ``id`` values to delete.
    """
    columns = ("username", "password", "role", "auth", "userid", "description")

    # Try the HTTP API first.
    if self.use_http_api:
        success = True
        if removed_ids:
            delete_records = [{"id": i} for i in removed_ids]
            success = self._http_post("user", "delete", delete_records)
        insert_records = []
        update_records = []
        for u in users:
            if u.get("id"):
                update_records.append(u)
            else:
                # INSERT records carry every column, defaulting to None.
                u_copy = u.copy()
                for key in columns:
                    u_copy.setdefault(key, None)
                insert_records.append(u_copy)
        if insert_records:
            success = success and self._http_post("user", "insert", insert_records)
        if update_records:
            success = success and self._http_post("user", "update", update_records)
        if success:
            return
        # HTTP API failed: fall through to direct access.

    # Direct SQLite access.
    conn = self._get_conn()
    try:
        cur = conn.cursor()
        if removed_ids:
            cur.executemany("DELETE FROM user WHERE id=?", [(i,) for i in removed_ids])
        for u in users:
            if u.get("id"):
                # UPDATE: only fields that were passed in with a non-None value.
                assignments = [(f, u[f]) for f in columns if u.get(f) is not None]
                if assignments:
                    # Column names come from the fixed tuple above; values are bound.
                    set_clause = ", ".join(f"{f}=?" for f, _ in assignments)
                    values = [v for _, v in assignments]
                    values.append(u["id"])
                    cur.execute(
                        f"UPDATE user SET {set_clause} WHERE id=?",
                        tuple(values),
                    )
            else:
                # INSERT: missing columns are stored as NULL. Reading with
                # .get() avoids writing default keys back into the caller's dict.
                cur.execute(
                    """
                    INSERT INTO user (username, password, role, auth, userid, description)
                    VALUES (?,?,?,?,?,?)
                    """,
                    tuple(u.get(f) for f in columns),
                )
        conn.commit()
    finally:
        conn.close()
def delete_users(self, delete_records: List[Dict[str, Any]]) -> bool:
    """Delete user records.

    The HTTP API is tried first when ``self.use_http_api`` is set; if it is
    disabled or reports failure, the rows are deleted directly in SQLite.

    Args:
        delete_records: Records to delete; each one must contain an ``id`` key.

    Returns:
        ``True`` on success, ``False`` when the SQLite delete raised (the
        error is printed, not propagated).
    """
    handled_remotely = self.use_http_api and self._http_post("user", "delete", delete_records)
    if handled_remotely:
        return True

    # HTTP disabled or failed: delete directly in SQLite.
    conn = self._get_conn()
    try:
        statement = "DELETE FROM user WHERE id=?"
        bindings = [(item["id"],) for item in delete_records]
        conn.cursor().executemany(statement, bindings)
        conn.commit()
        return True
    except Exception as e:
        print(f"删除用户记录失败: {e}")
        return False
    finally:
        conn.close()