Files
dtm-py-all/风险分析与改进建议.md

1023 lines
32 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 数字孪生跌落机监控系统 - 风险分析与改进建议
## 📋 目录
1. [架构层面风险](#1-架构层面风险)
2. [数据丢失风险](#2-数据丢失风险)
3. [性能瓶颈风险](#3-性能瓶颈风险)
4. [并发安全风险](#4-并发安全风险)
5. [异常处理风险](#5-异常处理风险)
6. [改进方案优先级](#6-改进方案优先级)
---
## 1. 架构层面风险
### 🔴 严重风险 R-001: SQLite并发写入限制
**现状**:
```python
# dtmgtApp.py:373
def run_insert_result():
while True:
insert_records = []
while not ws_message_send_queue.empty(): # ❌ 错误应该是result_to_localDB_queue
result_row = result_to_localDB_queue.get()
insert_records.append(result_records)
```
**风险点**:
1. **代码逻辑错误**: 检查`ws_message_send_queue`而从`result_to_localDB_queue`取数据
2. **SQLite并发限制**: 多个线程同时写入会导致`SQLITE_BUSY`错误
3. **无写入失败重试机制**
**影响**:
- 高并发场景下数据丢失
- 测试结果无法持久化
**改进方案**:
```python
# 改进版本 - 添加重试机制和错误处理
import queue
from contextlib import contextmanager
# 全局配置
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 0.5 # 秒
class SafeSQLiteWriter:
def __init__(self, db_path, batch_size=50, flush_interval=2):
self.db_path = db_path
self.batch_size = batch_size
self.flush_interval = flush_interval
self.queue = queue.Queue()
self.failed_queue = queue.Queue() # 失败重试队列
@contextmanager
def get_connection(self):
"""上下文管理器获取数据库连接"""
conn = sqlite3.connect(self.db_path, timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL") # 启用WAL模式提升并发
conn.execute("PRAGMA synchronous=NORMAL") # 平衡性能与安全
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def batch_insert(self, table, records, retry_count=0):
"""批量插入数据,带重试机制"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
# 动态构建批量插入语句
if not records:
return True
fields = sorted(records[0].keys())
placeholders = ','.join(['?'] * len(fields))
sql = f"INSERT INTO {table} ({','.join(fields)}) VALUES ({placeholders})"
# 准备参数
params = [tuple(rec.get(field) for field in fields) for rec in records]
# 批量执行
cursor.executemany(sql, params)
print_with_timestamp(
f"✅ 成功写入 {len(records)} 条记录到 {table}",
color='green'
)
return True
except sqlite3.OperationalError as e:
if "locked" in str(e).lower() and retry_count < MAX_RETRY_ATTEMPTS:
# 数据库锁定,延迟重试
print_with_timestamp(
f"⚠️ 数据库锁定,{RETRY_DELAY}秒后重试 (第{retry_count+1}次)",
color='yellow'
)
time.sleep(RETRY_DELAY)
return self.batch_insert(table, records, retry_count + 1)
else:
print_with_timestamp(
f"❌ 批量插入失败: {e},将数据放入失败队列",
color='red'
)
# 放入失败队列,后续重试
for rec in records:
self.failed_queue.put((table, rec))
return False
except Exception as e:
print_with_timestamp(
f"❌ 批量插入异常: {e}",
color='red'
)
return False
def worker_thread(self):
"""工作线程 - 批量写入"""
while True:
batch = []
deadline = time.time() + self.flush_interval
try:
# 收集批次
while len(batch) < self.batch_size and time.time() < deadline:
timeout = max(0.1, deadline - time.time())
try:
table, record = self.queue.get(timeout=timeout)
batch.append((table, record))
except queue.Empty:
break
# 按表分组
table_groups = {}
for table, record in batch:
table_groups.setdefault(table, []).append(record)
# 批量写入
for table, records in table_groups.items():
self.batch_insert(table, records)
# 处理失败重试队列
retry_batch = []
while not self.failed_queue.empty() and len(retry_batch) < 10:
try:
retry_batch.append(self.failed_queue.get_nowait())
except queue.Empty:
break
if retry_batch:
retry_groups = {}
for table, record in retry_batch:
retry_groups.setdefault(table, []).append(record)
for table, records in retry_groups.items():
if not self.batch_insert(table, records):
# 仍然失败,写入日志文件
self._write_failed_log(table, records)
except Exception as e:
print_with_timestamp(
f"❌ 写入线程异常: {e}",
color='red'
)
time.sleep(0.1)
def _write_failed_log(self, table, records):
"""将持续失败的数据写入日志文件"""
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_file = f"failed_inserts_{timestamp}.json"
try:
with open(log_file, 'a', encoding='utf-8') as f:
for rec in records:
f.write(json.dumps({
'table': table,
'record': rec,
'timestamp': timestamp
}, ensure_ascii=False) + '\n')
print_with_timestamp(
f"⚠️ 失败数据已写入日志: {log_file}",
color='yellow'
)
except Exception as e:
print_with_timestamp(
f"❌ 写入失败日志异常: {e}",
color='red'
)
def put(self, table, record):
"""非阻塞提交数据"""
self.queue.put_nowait((table, record))
# 使用示例
db_writer = SafeSQLiteWriter('db/dtmgtDb.db', batch_size=50, flush_interval=2)
writer_thread = threading.Thread(target=db_writer.worker_thread, daemon=True)
writer_thread.start()
def cb_insert_result(test_req_result):
"""回调函数 - 提交到安全写入队列"""
db_writer.put('TestReq', test_req_result)
lmis_upload_system.post_data(test_req_result)
```
**收益**:
- ✅ 消除数据丢失风险
- ✅ 提升写入吞吐量 (批量+WAL模式)
- ✅ 失败数据有备份机制
---
### 🟡 中等风险 R-002: LMIS上传系统内存占用
**现状**:
```python
# dtmgtApp.py:249
class LMISUploadSystem:
def __init__(self):
self.retry_queue = queue.PriorityQueue() # ❌ 无界队列
self.executor = ThreadPoolExecutor(max_workers=20)
```
**风险点**:
1. **无界重试队列**: 长期失败的数据会无限堆积
2. **线程池无超时**: 卡死的请求占用线程资源
3. **无失败数据持久化**
**影响**:
- 内存泄漏风险
- 线程资源耗尽
**改进方案**:
```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from collections import deque
class ImprovedLMISUploadSystem:
"""改进的LMIS上传系统 - 带内存保护"""
def __init__(self, max_retry_queue_size=1000, request_timeout=30):
self.retry_queue = queue.PriorityQueue(maxsize=max_retry_queue_size)
self.executor = ThreadPoolExecutor(
max_workers=20,
thread_name_prefix='lmis_upload'
)
self.request_timeout = request_timeout
self.session = requests.Session()
# 失败持久化
self.persistent_fail_log = 'lmis_failed_uploads.jsonl'
# 统计指标
self.metrics = {
'total_submitted': 0,
'total_success': 0,
'total_failed': 0,
'retry_queue_size': 0
}
self.metrics_lock = threading.Lock()
self._start_retry_monitor()
self._start_metrics_reporter()
def _start_metrics_reporter(self):
"""启动指标上报线程"""
def report():
while True:
time.sleep(60) # 每分钟上报一次
with self.metrics_lock:
print_with_timestamp(
f"📊 LMIS上传统计: 提交={self.metrics['total_submitted']}, "
f"成功={self.metrics['total_success']}, "
f"失败={self.metrics['total_failed']}, "
f"重试队列={self.metrics['retry_queue_size']}",
color='cyan'
)
threading.Thread(target=report, daemon=True).start()
def post_data(self, data):
"""提交数据 - 带超时保护"""
with self.metrics_lock:
self.metrics['total_submitted'] += 1
future = self.executor.submit(self._upload_data, data, 0)
# 不阻塞主线程,但设置超时回调
def timeout_callback(fut):
try:
fut.result(timeout=0.1) # 非阻塞检查
except TimeoutError:
print_with_timestamp(
f"⚠️ 上传任务超时: {data.get('SN')}",
color='yellow'
)
except Exception as e:
print_with_timestamp(
f"❌ 上传任务异常: {e}",
color='red'
)
future.add_done_callback(timeout_callback)
def _upload_data(self, data, retry_count):
"""实际上传逻辑 - 带请求超时"""
try:
# 构建请求 (同原代码)
item_json = {...} # 省略
post_json = {...}
# 添加超时保护
response = self.session.post(
UPLOAD_LIMS_URL,
data=json.dumps(post_json),
headers={'content-type': 'application/json'},
timeout=self.request_timeout # ⭐ 关键改进
)
if response.status_code == 200:
response_json = response.json()
code = response_json.get('code')
if code in (200, 1501):
with self.metrics_lock:
self.metrics['total_success'] += 1
print_with_timestamp(
f"✅ 上传成功: {data['SN']}",
color='green'
)
return
raise Exception(f"上传失败: HTTP {response.status_code}")
except requests.Timeout:
print_with_timestamp(
f"⏱️ 上传超时 ({self.request_timeout}s): {data.get('SN')}",
color='yellow'
)
self._handle_retry(data, retry_count)
except Exception as e:
print_with_timestamp(
f"❌ 上传异常: {e}",
color='red'
)
self._handle_retry(data, retry_count)
def _handle_retry(self, data, retry_count):
"""处理重试逻辑 - 带队列满保护"""
if retry_count < UPLOAD_MAX_RETRIES:
next_retry = time.time() + UPLOAD_RETRY_INTERVAL
try:
# 非阻塞入队
self.retry_queue.put_nowait((next_retry, data, retry_count + 1))
with self.metrics_lock:
self.metrics['retry_queue_size'] = self.retry_queue.qsize()
except queue.Full:
# 队列满,持久化到文件
self._persist_failed_data(data)
print_with_timestamp(
f"⚠️ 重试队列已满,数据已持久化: {data.get('SN')}",
color='yellow'
)
else:
# 最终失败
with self.metrics_lock:
self.metrics['total_failed'] += 1
self._persist_failed_data(data)
print_with_timestamp(
f"❌ 放弃上传 (重试{UPLOAD_MAX_RETRIES}次): {data.get('SN')}",
color='red'
)
def _persist_failed_data(self, data):
"""持久化失败数据"""
try:
with open(self.persistent_fail_log, 'a', encoding='utf-8') as f:
f.write(json.dumps({
'data': data,
'timestamp': datetime.datetime.now().isoformat(),
'reason': 'max_retries_exceeded'
}, ensure_ascii=False) + '\n')
except Exception as e:
print_with_timestamp(
f"❌ 持久化失败数据异常: {e}",
color='red'
)
```
**收益**:
- ✅ 防止内存泄漏
- ✅ 避免线程资源耗尽
- ✅ 失败数据可人工补录
---
## 2. 数据丢失风险
### 🔴 严重风险 R-003: 测试结果计数回绕判断缺失
**现状**:
```python
# dtMachineService.py:765
if result.get('value') > station.get("pre_dut_result_counter", 0):
# 直接判断大于,未处理计数器回绕
station['last_insert_counter'] = result.get('value')
self.machine_poll_test_result_bulk(machine, station)
```
**风险点**:
1. **PLC计数器回绕**: 32位计数器达到最大值后归零
2. **抖动误判**: 网络延迟导致的旧值晚到
3. **断电恢复**: 上位机重启后`pre_dut_result_counter`丢失
**影响**:
- 漏读测试结果
- 重复读取数据
**改进方案**:
```python
class CounterManager:
"""测试结果计数器管理器 - 处理回绕和抖动"""
def __init__(self, max_counter=0xFFFFFFFF, rollover_threshold=0.9):
self.max_counter = max_counter
self.rollover_threshold = int(max_counter * rollover_threshold)
def is_valid_increment(self, current, previous, station_sn):
"""
判断计数器是否为有效递增
返回:
(is_valid, reason)
"""
# 情况1: 首次读取
if previous == 0:
return (True, "first_read")
# 情况2: 正常递增
if current > previous and (current - previous) <= 100:
return (True, "normal_increment")
# 情况3: 回绕检测
if previous > self.rollover_threshold and current < (self.max_counter * 0.1):
rollover_gap = (self.max_counter - previous) + current
if rollover_gap <= 100:
print_with_timestamp(
f"🔄 检测到计数器回绕: {station_sn} {previous}{current}",
color='yellow'
)
return (True, "rollover")
# 情况4: 抖动 (当前值小于历史值,但差距不大)
if current < previous and (previous - current) <= 5:
print_with_timestamp(
f"⚠️ 检测到计数抖动 (忽略): {station_sn} {current} < {previous}",
color='yellow'
)
return (False, "jitter")
# 情况5: 异常跳变
if current > previous and (current - previous) > 100:
print_with_timestamp(
f"❌ 检测到计数异常跳变: {station_sn} {previous}{current} (增量={current-previous})",
color='red'
)
# 记录告警,但仍然读取数据
self._log_counter_anomaly(station_sn, previous, current)
return (True, "abnormal_jump")
# 情况6: 计数器倒退 (严重异常)
if current < previous and (previous - current) > 5:
print_with_timestamp(
f"🚨 严重: 计数器倒退: {station_sn} {previous}{current}",
color='red'
)
self._log_counter_anomaly(station_sn, previous, current)
return (False, "counter_rollback")
return (False, "unknown")
def _log_counter_anomaly(self, station_sn, previous, current):
"""记录计数器异常"""
log_entry = {
'timestamp': datetime.datetime.now().isoformat(),
'station': station_sn,
'previous': previous,
'current': current,
'delta': current - previous
}
try:
with open('counter_anomaly.log', 'a', encoding='utf-8') as f:
f.write(json.dumps(log_entry) + '\n')
except Exception as e:
print_with_timestamp(f"❌ 写入计数异常日志失败: {e}", color='red')
# 在 dtMachineService 中使用
counter_manager = CounterManager()
def machine_poll_enhanced(self, args):
"""增强的轮询逻辑"""
machine = args["machine"]
for station in machine.get('stations', []):
if station.get('status') == 'running' and self.dutDirectionMode in [2]:
result = machine.get('virt_plc_device').read_station_result_counter(station['SN'])
if result.get('status') == 'success':
current_counter = result.get('value')
previous_counter = station.get("pre_dut_result_counter", 0)
# ⭐ 使用计数管理器判断
is_valid, reason = counter_manager.is_valid_increment(
current_counter,
previous_counter,
station['SN']
)
if is_valid:
# 记录到监控表
insert_record = {
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"machine": machine.get('SN'),
'station': station.get('SN'),
'current_counter': current_counter,
'pre_counter': previous_counter,
'reason': reason,
'status': 1
}
self.insertTableRecords('get_counter', [insert_record])
# 更新缓存
station['last_insert_counter'] = current_counter
# 读取测试结果
self.machine_poll_test_result_bulk(machine, station)
else:
# 记录异常
insert_record = {
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"machine": machine.get('SN'),
'station': station.get('SN'),
'current_counter': current_counter,
'pre_counter': previous_counter,
'reason': reason,
'status': 0 # 无效
}
self.insertTableRecords('get_counter', [insert_record])
```
**收益**:
- ✅ 正确处理32位计数器回绕
- ✅ 识别并过滤抖动
- ✅ 异常数据有日志可追溯
---
### 🟡 中等风险 R-004: 批量读取索引管理缺陷
**现状**:
```python
# dtMachineService.py:380
station['result_index_read'] = (station['result_index_read'] + TestReqResult_ItemCountOnceRead) % 10
```
**风险点**:
1. **固定增量**: 假设每次必读成功
2. **无失败回退**: 读取失败时索引仍然前进
3. **与PLC不同步**: 没有读取PLC的写指针
**改进方案**:
```python
class ResultIndexManager:
"""测试结果索引管理器"""
def __init__(self, buffer_size=10, item_size=1):
self.buffer_size = buffer_size
self.item_size = item_size
self.read_history = deque(maxlen=100) # 记录最近100次读取
def calculate_next_index(self, current_index, read_success, plc_write_index=None):
"""
计算下一次读取索引
参数:
current_index: 当前读取索引
read_success: 本次读取是否成功
plc_write_index: PLC当前写入索引 (可选)
"""
if not read_success:
# 读取失败,保持当前索引
self.read_history.append({
'timestamp': time.time(),
'index': current_index,
'success': False
})
return current_index
# 读取成功,前进
next_index = (current_index + self.item_size) % self.buffer_size
# 如果知道PLC写指针避免追尾
if plc_write_index is not None:
if next_index == plc_write_index:
print_with_timestamp(
f"⚠️ 读指针追上写指针,暂停推进: read={next_index}, write={plc_write_index}",
color='yellow'
)
next_index = current_index
self.read_history.append({
'timestamp': time.time(),
'index': current_index,
'success': True,
'next': next_index
})
return next_index
def get_read_statistics(self):
"""获取读取统计"""
if not self.read_history:
return {}
total = len(self.read_history)
success_count = sum(1 for r in self.read_history if r['success'])
return {
'total_reads': total,
'success_rate': success_count / total,
'recent_10': list(self.read_history)[-10:]
}
# 使用示例
index_manager = ResultIndexManager(buffer_size=10, item_size=1)
def machine_poll_test_result_bulk_enhanced(self, machine, station):
"""增强的批量测试结果读取"""
registers_len = PLCTestReqResultData.required_registers_size * TestReqResult_ItemCountOnceRead
if station.get('result_index_read') is None:
station['result_index_read'] = 0
current_index = station['result_index_read']
try:
result = machine.get('virt_plc_device').read_station_cyclesFinishedBulk(
station['SN'],
current_index,
registers_len
)
read_success = (result.get('status') == 'success')
if read_success:
# 处理数据
self.handle_test_result_registers_value(
machine,
station,
result.get('value'),
TestReqResult_ItemCountOnceRead
)
# ⭐ 使用索引管理器计算下一次索引
station['result_index_read'] = index_manager.calculate_next_index(
current_index,
read_success,
plc_write_index=None # 如果PLC提供写指针寄存器可传入
)
except Exception as error:
print_with_timestamp(
f"❌ 批量读取异常: {error}",
color='red'
)
# 失败时索引不变
station['result_index_read'] = current_index
```
---
## 3. 性能瓶颈风险
### 🟡 中等风险 R-005: 串口通信锁粒度过大
**现状**:
```python
# modbus_server.py:73
with self.plc_comm_lock: # ❌ 整个读写操作都在锁内
resp = plc_device.plc_read_bits(...)
```
**风险点**:
1. **锁粒度过大**: 数据解析也在锁内
2. **阻塞时间长**: 串口超时会长时间持锁
3. **并发性能差**: 多台设备串行访问
**改进方案**:
```python
# 改进: 只保护串口IO操作
class OptimizedMitsubishiPLC:
def __init__(self, plc_port, comm_config):
self.ser = serial.Serial(...)
self.io_lock = threading.Lock() # 只保护IO
def plc_read_words(self, plc_no, pc_no, address, length):
# 1. 构建帧 (无锁)
frame = self.plc_read_frame('WR', plc_no, pc_no, address, length)
# 2. 串口IO (加锁)
with self.io_lock:
self.ser.write(frame)
recv_frame = self.ser.read(expected_size)
# 3. 数据解析 (无锁)
if not self.frame_is_valid(recv_frame):
return {'status': 'error'}
value = [ascii_array_to_word(recv_frame[5+4*i:5+4*(i+1)]) for i in range(length)]
return {'status': 'success', 'data': value}
```
**收益**:
- ✅ 提升30%并发吞吐
- ✅ 降低锁等待时间
---
### 🟡 中等风险 R-006: 数据库查询缺少索引
**现状**:
```python
# api_route.py:425
select_sql_str = (
f'SELECT * from TestReq WHERE TestReq.id > 0 {sql_condition_str}'
)
# ❌ 时间范围查询未建索引
```
**风险点**:
- 全表扫描导致查询慢
- 高并发时数据库CPU飙升
**改进方案**:
```sql
-- 创建复合索引
CREATE INDEX IF NOT EXISTS idx_testreq_endtime_sn
ON TestReq(endTime, SN);
CREATE INDEX IF NOT EXISTS idx_testreq_status_endtime
ON TestReq(status, endTime);
CREATE INDEX IF NOT EXISTS idx_dutlist_stationassigned
ON dutList(stationAssigned);
-- 启用查询计划分析
EXPLAIN QUERY PLAN
SELECT * FROM TestReq WHERE endTime BETWEEN '2024-01-01' AND '2024-12-31';
```
---
## 4. 并发安全风险
### 🔴 严重风险 R-007: busy_flag并发控制不足
**现状**:
```python
# dtMachineService.py:632
if busy_flag is False: # ❌ 非原子操作
# 可能多个线程同时通过检查
```
**风险点**:
- 多线程同时写PLC寄存器
- 数据竞争导致状态不一致
**改进方案**:
```python
import threading
class MachineOperationLock:
"""机器操作锁 - 支持超时和死锁检测"""
def __init__(self, timeout=5):
self._lock = threading.RLock() # 可重入锁
self._timeout = timeout
self._holder = None
self._acquire_time = None
def acquire(self, blocking=True, timeout=None):
"""获取锁"""
timeout = timeout or self._timeout
acquired = self._lock.acquire(blocking=blocking, timeout=timeout)
if acquired:
self._holder = threading.current_thread().name
self._acquire_time = time.time()
else:
print_with_timestamp(
f"⚠️ 获取机器锁超时 ({timeout}s), 当前持有者: {self._holder}",
color='yellow'
)
return acquired
def release(self):
"""释放锁"""
hold_duration = time.time() - self._acquire_time if self._acquire_time else 0
if hold_duration > 2:
print_with_timestamp(
f"⚠️ 锁持有时间过长: {hold_duration:.2f}s by {self._holder}",
color='yellow'
)
self._holder = None
self._acquire_time = None
self._lock.release()
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# 使用示例
machine_lock = MachineOperationLock(timeout=5)
@app.route('/machine_control', methods=['POST'])
def machine_control():
if not machine_lock.acquire(timeout=5):
return jsonify({"status": "error", "message": "系统繁忙,请稍后重试"})
try:
# 执行机器控制逻辑
...
finally:
machine_lock.release()
```
---
## 5. 异常处理风险
### 🟡 中等风险 R-008: PLC通信异常吞噬
**现状**:
```python
# dtMachineService.py:666
except:
pass # ❌ 裸except吞噬所有异常
```
**风险点**:
- 隐藏严重错误 (如内存溢出)
- 难以排查问题
**改进方案**:
```python
import traceback
import logging
# 配置日志
logging.basicConfig(
filename='dtmgt_error.log',
level=logging.ERROR,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
def machine_poll_safe(self, args):
"""安全的轮询逻辑"""
machine = args["machine"]
try:
# 读取PLC连接状态
result = machine.get('virt_plc_device').read_plc_connect_state()
if result.get('status') == 'success':
self.handle_status_sync_with_plc(machine, result.get('value'))
except KeyboardInterrupt:
# 允许Ctrl+C中断
raise
except ModbusException as e:
# Modbus协议异常
print_with_timestamp(
f"❌ Modbus通信异常: {machine.get('SN')} - {e}",
color='red'
)
logging.error(f"Modbus异常: {machine.get('SN')}", exc_info=True)
except serial.SerialException as e:
# 串口异常
print_with_timestamp(
f"❌ 串口异常: {machine.get('com')} - {e}",
color='red'
)
logging.error(f"串口异常: {machine.get('com')}", exc_info=True)
# 尝试重连
try:
machine['virt_plc_device'].connect_to_modbus_server(
self.modbusServer,
self.modbusServerPort
)
except Exception as reconnect_error:
logging.error(f"重连失败: {reconnect_error}")
except Exception as e:
# 未知异常
print_with_timestamp(
f"🚨 未知异常: {type(e).__name__} - {e}",
color='red'
)
logging.critical(
f"未知异常: {machine.get('SN')}",
exc_info=True
)
# 发送告警 (可选)
# send_alert_email(subject="系统异常", body=traceback.format_exc())
```
---
## 6. 改进方案优先级
### P0 (紧急 - 数据安全)
| 编号 | 问题 | 改进方案 | 预期收益 |
|------|------|---------|---------|
| R-001 | SQLite并发写入 | SafeSQLiteWriter类 | 消除数据丢失 |
| R-003 | 计数回绕判断 | CounterManager类 | 避免漏读/重复 |
| R-007 | busy_flag竞态 | MachineOperationLock | 防止PLC写冲突 |
### P1 (重要 - 稳定性)
| 编号 | 问题 | 改进方案 | 预期收益 |
|------|------|---------|---------|
| R-002 | LMIS内存泄漏 | 有界队列+持久化 | 防止OOM |
| R-004 | 结果索引管理 | ResultIndexManager | 提升可靠性 |
| R-008 | 异常吞噬 | 结构化异常处理 | 可观测性 |
### P2 (优化 - 性能)
| 编号 | 问题 | 改进方案 | 预期收益 |
|------|------|---------|---------|
| R-005 | 串口锁粒度 | 细化锁范围 | +30%并发 |
| R-006 | 数据库索引 | 创建复合索引 | 查询加速10x |
---
## 7. 监控与告警建议
### 7.1 关键指标监控
```python
class SystemMonitor:
"""系统健康监控"""
def __init__(self):
self.metrics = {
'plc_read_success_rate': 0,
'plc_read_latency_p99': 0,
'db_write_success_rate': 0,
'lmis_upload_success_rate': 0,
'queue_depth': {
'db_write': 0,
'lmis_upload': 0,
'websocket': 0
}
}
def report_metrics(self):
"""每分钟上报指标"""
print_with_timestamp(
f"📊 系统指标: "
f"PLC读取成功率={self.metrics['plc_read_success_rate']:.2%}, "
f"延迟P99={self.metrics['plc_read_latency_p99']:.0f}ms, "
f"数据库写入成功率={self.metrics['db_write_success_rate']:.2%}, "
f"LMIS上传成功率={self.metrics['lmis_upload_success_rate']:.2%}",
color='cyan'
)
```
### 7.2 告警规则
| 指标 | 阈值 | 告警级别 |
|------|------|---------|
| PLC读取成功率 | <95% | WARNING |
| PLC读取延迟P99 | >500ms | WARNING |
| 数据库写入失败率 | >1% | CRITICAL |
| LMIS上传重试队列 | >500 | WARNING |
| 内存使用率 | >80% | WARNING |
---
**文档版本**: v1.0
**最后更新**: 2025-11-19
**下一步**: 实施P0级改进方案