#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
|
|||
|
|
系统设置模型,负责系统相关配置在 SQLite 数据库中的读写。
- 机台设置: 表 main.dtMachine
- 样品阶段设置: 表 main.projectPhase
- 用户及权限设置: 表 main.user

目前作为简单的同步模型使用,由各个视图在需要时调用 load_*/save_* 方法。
支持通过全局配置切换直接访问 SQLite 或通过 HTTP API 访问。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import json
import os
import sqlite3
import time
from typing import Any, Dict, List, Optional, Sequence

import requests
from PyQt5.QtCore import QObject
|
|||
|
|
|
|||
|
|
# Load the global configuration.
# The parent of this file's directory is placed at the front of sys.path so
# that a project-level ``config`` module can be imported. When that import
# fails, the HTTP API is disabled and a local default base URL is used.
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
    from config import USE_HTTP_API, HTTP_API_BASE_URL
except ImportError:
    USE_HTTP_API, HTTP_API_BASE_URL = False, "http://127.0.0.1:5050"
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SystemModel(QObject):
    """Synchronous model for system settings kept in the SQLite database.

    Three tables are handled:

    - ``dtMachine``: machine settings
    - ``projectPhase``: sample-phase settings
    - ``user``: users and permissions

    Each public ``load_*`` / ``save_*`` / ``delete_*`` method first tries the
    HTTP API when ``use_http_api`` is true and falls back to direct SQLite
    access when the API call fails.
    """

    # dtMachine columns that hold JSON text.
    _MACHINE_REGISTER_KEYS = ("register01", "register02", "register03", "register04")

    # Writable dtMachine columns (everything except ``id``), in INSERT order.
    _MACHINE_FIELDS = (
        "SN", "label", "status", "type", "color", "description",
        "station1", "station2", "station3", "station4",
        "status1", "status2", "status3", "status4",
        "com", "plcAddress",
        "register01", "register02", "register03", "register04",
        "baudrate", "testType",
    )

    # Column order of the dicts returned by load_machines() on the SQLite path.
    _MACHINE_SELECT = (
        "SN", "label", "status", "type", "color", "description", "id",
        "station1", "station2", "station3", "station4",
        "status1", "status2", "status3", "status4", "testType",
        "com", "plcAddress",
        "register01", "register02", "register03", "register04",
        "baudrate",
    )

    _PHASE_FIELDS = ("name", "PN", "description")

    # NOTE(review): ``password`` is read and written exactly as passed in; this
    # module performs no hashing. Confirm that is handled elsewhere.
    _USER_FIELDS = ("username", "password", "role", "auth", "userid", "description")

    def __init__(self, db_path: Optional[str] = None, parent=None):
        """Create the model.

        Args:
            db_path: Path of the SQLite file. Defaults to ``dtmgtDb.db`` in the
                directory that contains this module.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        if db_path is None:
            base_dir = os.path.dirname(os.path.abspath(__file__))
            db_path = os.path.join(base_dir, "dtmgtDb.db")
        self.db_path = db_path
        self.use_http_api = USE_HTTP_API
        self.api_base_url = HTTP_API_BASE_URL

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    def _get_conn(self):
        """Open a new SQLite connection to ``self.db_path``."""
        return sqlite3.connect(self.db_path)

    def _http_get(self, table: str, params: Optional[dict] = None) -> Optional[List[Dict[str, Any]]]:
        """Fetch the rows of ``table`` through the HTTP API.

        Args:
            table: Table name, sent as the ``table`` query parameter.
            params: Optional extra query parameters.

        Returns:
            The ``data`` entry of the JSON response (``[]`` when absent), or
            ``None`` when the request failed so the caller can fall back to
            SQLite.
        """
        try:
            query_params = {"table": table}
            if params:
                query_params.update(params)
            response = requests.get(
                f"{self.api_base_url}/dbTableAccess", params=query_params, timeout=5
            )
            response.raise_for_status()
            return response.json().get("data", [])
        except Exception as e:
            # Deliberately broad: any failure (network, HTTP status, bad JSON,
            # unexpected payload shape) must lead to the SQLite fallback.
            print(f"HTTP API 获取数据失败: {e},回退到直接访问 SQLite")
            return None

    def _http_post(self, table: str, action: str, records: List[Dict[str, Any]]) -> bool:
        """Submit a data change through the HTTP API and log its timing.

        Args:
            table: Table name, sent as the ``table`` query parameter.
            action: Operation type (``insert`` / ``update`` / ``delete``).
            records: Records to send in the JSON body.

        Returns:
            ``True`` when the response JSON has ``status == "success"``;
            ``False`` on any other status or on any error.
        """
        try:
            start_time = time.perf_counter()
            url = f"{self.api_base_url}/dbTableAccess"
            payload = {"action": action, "records": records}
            prepare_time = time.perf_counter() - start_time

            request_start = time.perf_counter()
            response = requests.post(url, params={"table": table}, json=payload, timeout=5)
            request_time = time.perf_counter() - request_start
            response.raise_for_status()

            parse_start = time.perf_counter()
            result = response.json()
            parse_time = time.perf_counter() - parse_start
            total_time = time.perf_counter() - start_time

            print(
                f"[性能] HTTP POST {table}/{action}: 总计 {total_time*1000:.2f}ms "
                f"(准备 {prepare_time*1000:.2f}ms | 请求 {request_time*1000:.2f}ms "
                f"| 解析 {parse_time*1000:.2f}ms) | 记录数: {len(records)}"
            )
            return result.get("status") == "success"
        except Exception as e:
            # Deliberately broad so that callers can always fall back to SQLite.
            print(f"HTTP API 提交数据失败: {e},回退到直接访问 SQLite")
            return False

    @staticmethod
    def _default_baudrate() -> Dict[str, Any]:
        """Return a fresh copy of the default serial settings."""
        return {"baudrate": 19200, "stopbits": 1, "bytesize": 8, "parity": "E"}

    @staticmethod
    def _decode_json(raw: Any, fallback: Any) -> Any:
        """Decode one JSON column value.

        Falsy values and undecodable text yield ``fallback``. Values that are
        not text (for example already-decoded dicts from the HTTP API) are
        returned unchanged; the same rule is applied to both data sources.
        """
        if not raw:
            return fallback
        if not isinstance(raw, (str, bytes, bytearray)):
            return raw
        try:
            return json.loads(raw)
        except ValueError:
            return fallback

    def _decode_machine_json(self, machine: Dict[str, Any]) -> Dict[str, Any]:
        """Decode ``register01..04`` and ``baudrate`` of ``machine`` in place."""
        for key in self._MACHINE_REGISTER_KEYS:
            machine[key] = self._decode_json(machine.get(key), {})
        machine["baudrate"] = self._decode_json(
            machine.get("baudrate"), self._default_baudrate()
        )
        return machine

    def _encode_machine_json(self, machine: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``machine`` with its JSON columns serialized.

        ``None`` and ``str`` values are left untouched (``None`` means "do not
        update this column"). Dicts are dumped as JSON. Any other type becomes
        ``"{}"`` for registers and the default serial settings for
        ``baudrate``.
        """
        row = machine.copy()
        for key in self._MACHINE_REGISTER_KEYS:
            value = row.get(key)
            if value is None or isinstance(value, str):
                continue
            row[key] = json.dumps(value, ensure_ascii=False) if isinstance(value, dict) else "{}"

        baud = row.get("baudrate")
        if baud is not None and not isinstance(baud, str):
            if not isinstance(baud, dict):
                baud = self._default_baudrate()
            row["baudrate"] = json.dumps(baud, ensure_ascii=False)
        return row

    def _load_rows(self, table: str, columns: Sequence[str]) -> List[Dict[str, Any]]:
        """Read ``columns`` of every row of ``table`` from SQLite, ordered by id.

        ``table`` and ``columns`` come from class constants, never from user
        input, so interpolating them into the SQL text is safe.
        """
        conn = self._get_conn()
        conn.row_factory = sqlite3.Row
        try:
            cur = conn.cursor()
            cur.execute(f"SELECT {', '.join(columns)} FROM {table} ORDER BY id ASC")
            return [dict(r) for r in cur.fetchall()]
        finally:
            conn.close()

    def _save_rows_http(self, table: str, fields: Sequence[str],
                        rows: List[Dict[str, Any]],
                        removed_ids: Optional[List[int]]) -> bool:
        """Apply delete / insert / update through the HTTP API.

        Stops at the first failing step and returns ``False``; returns
        ``True`` only when every attempted step succeeded.
        """
        if removed_ids:
            delete_records = [{"id": i} for i in removed_ids]
            if not self._http_post(table, "delete", delete_records):
                return False

        insert_records = []
        update_records = []
        for row in rows:
            if row.get("id"):
                update_records.append(row)
            else:
                # Inserts are sent with every column present (missing -> None).
                record = dict(row)
                for key in fields:
                    record.setdefault(key, None)
                insert_records.append(record)

        if insert_records and not self._http_post(table, "insert", insert_records):
            return False
        if update_records and not self._http_post(table, "update", update_records):
            return False
        return True

    def _save_rows_sqlite(self, table: str, fields: Sequence[str],
                          rows: List[Dict[str, Any]],
                          removed_ids: Optional[List[int]]) -> None:
        """Apply delete / insert / update directly in SQLite in one transaction.

        Rows with a truthy ``id`` are updated, and only columns whose value is
        present and not ``None`` are written, so omitted columns keep their
        stored value. Other rows are inserted with missing columns as NULL.
        The caller's dicts are not modified.
        """
        insert_sql = (
            f"INSERT INTO {table} ({', '.join(fields)}) "
            f"VALUES ({', '.join('?' * len(fields))})"
        )
        conn = self._get_conn()
        try:
            # ``with conn`` commits on success and rolls back on an exception.
            with conn:
                cur = conn.cursor()
                if removed_ids:
                    cur.executemany(
                        f"DELETE FROM {table} WHERE id=?", [(i,) for i in removed_ids]
                    )
                for row in rows:
                    if row.get("id"):
                        present = [f for f in fields if row.get(f) is not None]
                        if present:
                            assignments = ", ".join(f"{f}=?" for f in present)
                            values = [row[f] for f in present]
                            values.append(row["id"])  # bound to the WHERE clause
                            cur.execute(
                                f"UPDATE {table} SET {assignments} WHERE id=?",
                                tuple(values),
                            )
                    else:
                        cur.execute(insert_sql, tuple(row.get(f) for f in fields))
        finally:
            conn.close()

    def _save_rows(self, table: str, fields: Sequence[str],
                   rows: List[Dict[str, Any]],
                   removed_ids: Optional[List[int]]) -> None:
        """Save ``rows`` via the HTTP API when enabled, else (or on failure) via SQLite.

        NOTE(review): when an earlier HTTP step succeeded before a later one
        failed, the SQLite fallback replays every step; confirm this is
        acceptable if both paths target the same database.
        """
        if self.use_http_api and self._save_rows_http(table, fields, rows, removed_ids):
            return
        self._save_rows_sqlite(table, fields, rows, removed_ids)

    def _delete_rows(self, table: str, delete_records: List[Dict[str, Any]],
                     label: str) -> bool:
        """Delete the rows whose ids are listed in ``delete_records``.

        ``label`` is the table's display name used in the failure message.
        Returns ``True`` on success and ``False`` when the SQLite delete fails.
        """
        if self.use_http_api and self._http_post(table, "delete", delete_records):
            return True

        conn = self._get_conn()
        try:
            cur = conn.cursor()
            cur.executemany(
                f"DELETE FROM {table} WHERE id=?", [(r["id"],) for r in delete_records]
            )
            conn.commit()
            return True
        except Exception as e:
            print(f"删除{label}记录失败: {e}")
            return False
        finally:
            conn.close()

    # ---------------------- Machine settings: dtMachine ----------------------
    def load_machines(self) -> List[Dict[str, Any]]:
        """Load all machines with their JSON columns decoded.

        ``register01..04`` default to ``{}`` and ``baudrate`` to the default
        serial settings when empty or not valid JSON.
        """
        if self.use_http_api:
            data = self._http_get("dtMachine")
            if data is not None:
                for machine in data:
                    self._decode_machine_json(machine)
                return data

        rows = self._load_rows("dtMachine", self._MACHINE_SELECT)
        return [self._decode_machine_json(m) for m in rows]

    def save_machines(self, machines: List[Dict[str, Any]],
                      removed_ids: Optional[List[int]] = None):
        """Save machines, serializing the JSON columns to text first.

        An item with a truthy ``id`` is updated, otherwise it is inserted.
        Updates only touch columns that are present and not ``None``, so
        columns that are not passed in (such as the PLC parameters) are kept.

        Args:
            machines: Machine dicts to insert or update; not modified.
            removed_ids: Ids of rows to delete before saving.
        """
        prepared = [self._encode_machine_json(m) for m in machines]
        self._save_rows("dtMachine", self._MACHINE_FIELDS, prepared, removed_ids)

    def delete_machines(self, delete_records: List[Dict[str, Any]]) -> bool:
        """Delete machine rows.

        Args:
            delete_records: Records that each contain an ``id`` key.

        Returns:
            Whether the delete succeeded.
        """
        return self._delete_rows("dtMachine", delete_records, "机台")

    # ---------------------- Sample phases: projectPhase ----------------------
    def load_project_phases(self) -> List[Dict[str, Any]]:
        """Load all project phases, ordered by id on the SQLite path."""
        if self.use_http_api:
            data = self._http_get("projectPhase")
            if data is not None:
                return data
        return self._load_rows("projectPhase", (*self._PHASE_FIELDS, "id"))

    def save_project_phases(self, phases: List[Dict[str, Any]],
                            removed_ids: Optional[List[int]] = None):
        """Save the phase list.

        Args:
            phases: Phase dicts; a truthy ``id`` means update, otherwise insert.
                Updates only touch columns that are present and not ``None``.
            removed_ids: Ids of rows to delete before saving.
        """
        self._save_rows("projectPhase", self._PHASE_FIELDS, phases, removed_ids)

    def delete_project_phases(self, delete_records: List[Dict[str, Any]]) -> bool:
        """Delete project-phase rows.

        Args:
            delete_records: Records that each contain an ``id`` key.

        Returns:
            Whether the delete succeeded.
        """
        return self._delete_rows("projectPhase", delete_records, "项目阶段")

    # ---------------------- Users and permissions: user ----------------------
    def load_users(self) -> List[Dict[str, Any]]:
        """Load all users, ordered by id on the SQLite path."""
        if self.use_http_api:
            data = self._http_get("user")
            if data is not None:
                return data
        return self._load_rows("user", (*self._USER_FIELDS, "id"))

    def save_users(self, users: List[Dict[str, Any]],
                   removed_ids: Optional[List[int]] = None):
        """Save the user list.

        Args:
            users: User dicts; a truthy ``id`` means update, otherwise insert.
                Updates only touch columns that are present and not ``None``.
            removed_ids: Ids of rows to delete before saving.
        """
        self._save_rows("user", self._USER_FIELDS, users, removed_ids)

    def delete_users(self, delete_records: List[Dict[str, Any]]) -> bool:
        """Delete user rows.

        Args:
            delete_records: Records that each contain an ``id`` key.

        Returns:
            Whether the delete succeeded.
        """
        return self._delete_rows("user", delete_records, "用户")
|