1023 lines
32 KiB
Markdown
1023 lines
32 KiB
Markdown
# 数字孪生跌落机监控系统 - 风险分析与改进建议
|
||
|
||
## 📋 目录
|
||
|
||
1. [架构层面风险](#1-架构层面风险)
|
||
2. [数据丢失风险](#2-数据丢失风险)
|
||
3. [性能瓶颈风险](#3-性能瓶颈风险)
|
||
4. [并发安全风险](#4-并发安全风险)
|
||
5. [异常处理风险](#5-异常处理风险)
|
||
6. [改进方案优先级](#6-改进方案优先级)
7. [监控与告警建议](#7-监控与告警建议)
|
||
|
||
---
|
||
|
||
## 1. 架构层面风险
|
||
|
||
### 🔴 严重风险 R-001: SQLite并发写入限制
|
||
|
||
**现状**:
|
||
```python
|
||
# dtmgtApp.py:373
|
||
def run_insert_result():
|
||
while True:
|
||
insert_records = []
|
||
while not ws_message_send_queue.empty(): # ❌ 错误:应该是result_to_localDB_queue
|
||
result_row = result_to_localDB_queue.get()
|
||
insert_records.append(result_records)
|
||
```
|
||
|
||
**风险点**:
|
||
1. **代码逻辑错误**: 循环条件检查的是`ws_message_send_queue`,却从`result_to_localDB_queue`取数据;并且取出的变量是`result_row`,追加到列表的却是未定义的`result_records`(按上面摘录的代码,运行时会触发`NameError`,需对照源码确认)
|
||
2. **SQLite并发限制**: 多个线程同时写入会导致`SQLITE_BUSY`错误
|
||
3. **无写入失败重试机制**
|
||
|
||
**影响**:
|
||
- 高并发场景下数据丢失
|
||
- 测试结果无法持久化
|
||
|
||
**改进方案**:
|
||
|
||
```python
|
||
# 改进版本 - 添加重试机制和错误处理
|
||
import datetime
import json
import queue
import sqlite3
import threading  # needed by the usage example that starts the worker thread
import time
from collections import defaultdict
from contextlib import contextmanager

# Global configuration
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 0.5  # seconds


class SafeSQLiteWriter:
    """Queue-backed SQLite writer that batches inserts on one worker thread.

    Producers call :meth:`put`; a single thread running :meth:`worker_thread`
    drains the queue, groups records by table and inserts them in batches.

    Failure handling is staged so that no record is silently dropped:

    1. A batch that hits a "locked" error is retried in place up to
       ``MAX_RETRY_ATTEMPTS`` times.
    2. A batch that still fails (for any reason) is moved to ``failed_queue``.
    3. Records taken from ``failed_queue`` get one more attempt; if that
       fails too they are appended to a JSON-lines file for manual recovery
       and are NOT re-queued, so a permanently bad record cannot loop.
    """

    # Maximum number of previously failed records retried per worker cycle.
    RETRY_BATCH_SIZE = 10

    def __init__(self, db_path, batch_size=50, flush_interval=2):
        """Store the configuration and create the pending/failed queues.

        Args:
            db_path: Path of the SQLite database file.
            batch_size: Maximum number of records written per batch.
            flush_interval: Maximum seconds to wait while filling a batch.
        """
        self.db_path = db_path
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = queue.Queue()
        self.failed_queue = queue.Queue()  # records awaiting one more attempt

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a double-quoted SQL identifier.

        Embedded double quotes are doubled, so a table or column name can
        never terminate the identifier and inject SQL.
        """
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _group_by_table(items):
        """Group ``(table, record)`` pairs into ``{table: [record, ...]}``."""
        groups = defaultdict(list)
        for table, record in items:
            groups[table].append(record)
        return dict(groups)

    @contextmanager
    def get_connection(self):
        """Yield a connection; commit on success, roll back on error.

        The connection is always closed, including when a PRAGMA fails.
        """
        conn = sqlite3.connect(self.db_path, timeout=10.0)
        try:
            conn.execute("PRAGMA journal_mode=WAL")    # better read/write concurrency
            conn.execute("PRAGMA synchronous=NORMAL")  # balance speed and durability
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def batch_insert(self, table, records, retry_count=0, requeue_on_failure=True):
        """Insert *records* into *table* in a single transaction.

        Args:
            table: Target table name.
            records: List of dicts mapping column name to value.
            retry_count: Number of lock retries already consumed.
            requeue_on_failure: When True (default) a failed batch is moved to
                ``failed_queue``. Pass False when the caller persists the
                failure itself, to avoid queueing the same records again.

        Returns:
            True when the batch was committed (or was empty), else False.
        """
        if not records:
            return True

        # Use the union of keys so a column missing from the first record is
        # not silently dropped; records lacking a key insert NULL for it.
        fields = sorted(set().union(*(rec.keys() for rec in records)))
        columns = ','.join(self._quote_identifier(field) for field in fields)
        placeholders = ','.join('?' * len(fields))
        sql = (
            f"INSERT INTO {self._quote_identifier(table)} "
            f"({columns}) VALUES ({placeholders})"
        )
        params = [tuple(rec.get(field) for field in fields) for rec in records]

        attempt = retry_count
        while True:
            try:
                with self.get_connection() as conn:
                    conn.executemany(sql, params)
            except sqlite3.OperationalError as e:
                if "locked" in str(e).lower() and attempt < MAX_RETRY_ATTEMPTS:
                    # Another writer holds the database: back off and retry.
                    print_with_timestamp(
                        f"⚠️ 数据库锁定,{RETRY_DELAY}秒后重试 (第{attempt+1}次)",
                        color='yellow'
                    )
                    time.sleep(RETRY_DELAY)
                    attempt += 1
                    continue
                suffix = ",将数据放入失败队列" if requeue_on_failure else ""
                print_with_timestamp(
                    f"❌ 批量插入失败: {e}{suffix}",
                    color='red'
                )
                return self._give_up(table, records, requeue_on_failure)
            except Exception as e:
                # Previously this path returned False and lost the batch.
                print_with_timestamp(
                    f"❌ 批量插入异常: {e}",
                    color='red'
                )
                return self._give_up(table, records, requeue_on_failure)
            else:
                # Reported only after the transaction has been committed.
                print_with_timestamp(
                    f"✅ 成功写入 {len(records)} 条记录到 {table}",
                    color='green'
                )
                return True

    def _give_up(self, table, records, requeue):
        """Optionally move a failed batch to ``failed_queue``; return False."""
        if requeue:
            for rec in records:
                self.failed_queue.put((table, rec))
        return False

    def _collect_batch(self):
        """Take up to ``batch_size`` items, waiting at most ``flush_interval``."""
        batch = []
        deadline = time.monotonic() + self.flush_interval
        while len(batch) < self.batch_size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                batch.append(self.queue.get(timeout=max(0.1, remaining)))
            except queue.Empty:
                break
        return batch

    def _retry_failed(self):
        """Give previously failed records one more attempt, then log them."""
        retry_batch = []
        while len(retry_batch) < self.RETRY_BATCH_SIZE:
            try:
                retry_batch.append(self.failed_queue.get_nowait())
            except queue.Empty:
                break

        for table, records in self._group_by_table(retry_batch).items():
            # requeue_on_failure=False: the failure log is the final stop.
            if not self.batch_insert(table, records, requeue_on_failure=False):
                self._write_failed_log(table, records)

    def worker_thread(self):
        """Run forever: flush pending batches, then retry failed records."""
        while True:
            try:
                batch = self._collect_batch()
                for table, records in self._group_by_table(batch).items():
                    self.batch_insert(table, records)
                self._retry_failed()
            except Exception as e:
                # Keep the worker alive; one bad cycle must not stop writes.
                print_with_timestamp(
                    f"❌ 写入线程异常: {e}",
                    color='red'
                )

            time.sleep(0.1)

    def _write_failed_log(self, table, records):
        """Append records that could not be written to a JSON-lines file."""
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        log_file = f"failed_inserts_{timestamp}.json"

        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                for rec in records:
                    # default=str: this is a last-resort dump, so a value that
                    # is not JSON-serializable must not cost us the record.
                    f.write(json.dumps({
                        'table': table,
                        'record': rec,
                        'timestamp': timestamp
                    }, ensure_ascii=False, default=str) + '\n')

            print_with_timestamp(
                f"⚠️ 失败数据已写入日志: {log_file}",
                color='yellow'
            )
        except Exception as e:
            print_with_timestamp(
                f"❌ 写入失败日志异常: {e}",
                color='red'
            )

    def put(self, table, record):
        """Enqueue one record for *table* without blocking the caller."""
        self.queue.put_nowait((table, record))
|
||
|
||
# 使用示例
|
||
db_writer = SafeSQLiteWriter('db/dtmgtDb.db', batch_size=50, flush_interval=2)
|
||
writer_thread = threading.Thread(target=db_writer.worker_thread, daemon=True)
|
||
writer_thread.start()
|
||
|
||
def cb_insert_result(test_req_result):
|
||
"""回调函数 - 提交到安全写入队列"""
|
||
db_writer.put('TestReq', test_req_result)
|
||
lmis_upload_system.post_data(test_req_result)
|
||
```
|
||
|
||
**收益**:
|
||
- ✅ 显著降低数据丢失风险 (写入失败的数据会进入重试队列,最终落盘到失败日志)
|
||
- ✅ 提升写入吞吐量 (批量+WAL模式)
|
||
- ✅ 失败数据有备份机制
|
||
|
||
---
|
||
|
||
### 🟡 中等风险 R-002: LMIS上传系统内存占用
|
||
|
||
**现状**:
|
||
```python
|
||
# dtmgtApp.py:249
|
||
class LMISUploadSystem:
|
||
def __init__(self):
|
||
self.retry_queue = queue.PriorityQueue() # ❌ 无界队列
|
||
self.executor = ThreadPoolExecutor(max_workers=20)
|
||
```
|
||
|
||
**风险点**:
|
||
1. **无界重试队列**: 长期失败的数据会无限堆积
|
||
2. **HTTP请求无超时**: 请求未设置`timeout`,卡死的请求会长期占用线程池中的工作线程
|
||
3. **无失败数据持久化**
|
||
|
||
**影响**:
|
||
- 内存泄漏风险
|
||
- 线程资源耗尽
|
||
|
||
**改进方案**:
|
||
|
||
```python
|
||
import threading
|
||
from concurrent.futures import ThreadPoolExecutor, TimeoutError
|
||
from collections import deque
|
||
|
||
class ImprovedLMISUploadSystem:
|
||
"""改进的LMIS上传系统 - 带内存保护"""
|
||
|
||
def __init__(self, max_retry_queue_size=1000, request_timeout=30):
|
||
self.retry_queue = queue.PriorityQueue(maxsize=max_retry_queue_size)
|
||
self.executor = ThreadPoolExecutor(
|
||
max_workers=20,
|
||
thread_name_prefix='lmis_upload'
|
||
)
|
||
self.request_timeout = request_timeout
|
||
self.session = requests.Session()
|
||
|
||
# 失败持久化
|
||
self.persistent_fail_log = 'lmis_failed_uploads.jsonl'
|
||
|
||
# 统计指标
|
||
self.metrics = {
|
||
'total_submitted': 0,
|
||
'total_success': 0,
|
||
'total_failed': 0,
|
||
'retry_queue_size': 0
|
||
}
|
||
self.metrics_lock = threading.Lock()
|
||
|
||
self._start_retry_monitor()
|
||
self._start_metrics_reporter()
|
||
|
||
def _start_metrics_reporter(self):
|
||
"""启动指标上报线程"""
|
||
def report():
|
||
while True:
|
||
time.sleep(60) # 每分钟上报一次
|
||
with self.metrics_lock:
|
||
print_with_timestamp(
|
||
f"📊 LMIS上传统计: 提交={self.metrics['total_submitted']}, "
|
||
f"成功={self.metrics['total_success']}, "
|
||
f"失败={self.metrics['total_failed']}, "
|
||
f"重试队列={self.metrics['retry_queue_size']}",
|
||
color='cyan'
|
||
)
|
||
|
||
threading.Thread(target=report, daemon=True).start()
|
||
|
||
def post_data(self, data):
|
||
"""提交数据 - 带超时保护"""
|
||
with self.metrics_lock:
|
||
self.metrics['total_submitted'] += 1
|
||
|
||
future = self.executor.submit(self._upload_data, data, 0)
|
||
|
||
# 不阻塞主线程,但设置超时回调
|
||
def timeout_callback(fut):
|
||
try:
|
||
fut.result(timeout=0.1) # 非阻塞检查
|
||
except TimeoutError:
|
||
print_with_timestamp(
|
||
f"⚠️ 上传任务超时: {data.get('SN')}",
|
||
color='yellow'
|
||
)
|
||
except Exception as e:
|
||
print_with_timestamp(
|
||
f"❌ 上传任务异常: {e}",
|
||
color='red'
|
||
)
|
||
|
||
future.add_done_callback(timeout_callback)
|
||
|
||
def _upload_data(self, data, retry_count):
|
||
"""实际上传逻辑 - 带请求超时"""
|
||
try:
|
||
# 构建请求 (同原代码)
|
||
item_json = {...} # 省略
|
||
post_json = {...}
|
||
|
||
# 添加超时保护
|
||
response = self.session.post(
|
||
UPLOAD_LIMS_URL,
|
||
data=json.dumps(post_json),
|
||
headers={'content-type': 'application/json'},
|
||
timeout=self.request_timeout # ⭐ 关键改进
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
response_json = response.json()
|
||
code = response_json.get('code')
|
||
|
||
if code in (200, 1501):
|
||
with self.metrics_lock:
|
||
self.metrics['total_success'] += 1
|
||
print_with_timestamp(
|
||
f"✅ 上传成功: {data['SN']}",
|
||
color='green'
|
||
)
|
||
return
|
||
|
||
raise Exception(f"上传失败: HTTP {response.status_code}")
|
||
|
||
except requests.Timeout:
|
||
print_with_timestamp(
|
||
f"⏱️ 上传超时 ({self.request_timeout}s): {data.get('SN')}",
|
||
color='yellow'
|
||
)
|
||
self._handle_retry(data, retry_count)
|
||
|
||
except Exception as e:
|
||
print_with_timestamp(
|
||
f"❌ 上传异常: {e}",
|
||
color='red'
|
||
)
|
||
self._handle_retry(data, retry_count)
|
||
|
||
def _handle_retry(self, data, retry_count):
|
||
"""处理重试逻辑 - 带队列满保护"""
|
||
if retry_count < UPLOAD_MAX_RETRIES:
|
||
next_retry = time.time() + UPLOAD_RETRY_INTERVAL
|
||
|
||
try:
|
||
# 非阻塞入队
|
||
self.retry_queue.put_nowait((next_retry, data, retry_count + 1))
|
||
with self.metrics_lock:
|
||
self.metrics['retry_queue_size'] = self.retry_queue.qsize()
|
||
except queue.Full:
|
||
# 队列满,持久化到文件
|
||
self._persist_failed_data(data)
|
||
print_with_timestamp(
|
||
f"⚠️ 重试队列已满,数据已持久化: {data.get('SN')}",
|
||
color='yellow'
|
||
)
|
||
else:
|
||
# 最终失败
|
||
with self.metrics_lock:
|
||
self.metrics['total_failed'] += 1
|
||
self._persist_failed_data(data)
|
||
print_with_timestamp(
|
||
f"❌ 放弃上传 (重试{UPLOAD_MAX_RETRIES}次): {data.get('SN')}",
|
||
color='red'
|
||
)
|
||
|
||
def _persist_failed_data(self, data):
|
||
"""持久化失败数据"""
|
||
try:
|
||
with open(self.persistent_fail_log, 'a', encoding='utf-8') as f:
|
||
f.write(json.dumps({
|
||
'data': data,
|
||
'timestamp': datetime.datetime.now().isoformat(),
|
||
'reason': 'max_retries_exceeded'
|
||
}, ensure_ascii=False) + '\n')
|
||
except Exception as e:
|
||
print_with_timestamp(
|
||
f"❌ 持久化失败数据异常: {e}",
|
||
color='red'
|
||
)
|
||
```
|
||
|
||
**收益**:
|
||
- ✅ 防止内存泄漏
|
||
- ✅ 避免线程资源耗尽
|
||
- ✅ 失败数据可人工补录
|
||
|
||
---
|
||
|
||
## 2. 数据丢失风险
|
||
|
||
### 🔴 严重风险 R-003: 测试结果计数回绕判断缺失
|
||
|
||
**现状**:
|
||
```python
|
||
# dtMachineService.py:765
|
||
if result.get('value') > station.get("pre_dut_result_counter", 0):
|
||
# 直接判断大于,未处理计数器回绕
|
||
station['last_insert_counter'] = result.get('value')
|
||
self.machine_poll_test_result_bulk(machine, station)
|
||
```
|
||
|
||
**风险点**:
|
||
1. **PLC计数器回绕**: 32位计数器达到最大值后归零
|
||
2. **抖动误判**: 网络延迟导致的旧值晚到
|
||
3. **断电恢复**: 上位机重启后`pre_dut_result_counter`丢失
|
||
|
||
**影响**:
|
||
- 漏读测试结果
|
||
- 重复读取数据
|
||
|
||
**改进方案**:
|
||
|
||
```python
|
||
class CounterManager:
    """Classify changes of a PLC result counter (rollover, jitter, jumps)."""

    def __init__(self, max_counter=0xFFFFFFFF, rollover_threshold=0.9,
                 max_step=100, jitter_tolerance=5):
        """Configure the classification thresholds.

        Args:
            max_counter: Largest value the counter can hold before wrapping.
            rollover_threshold: Fraction of ``max_counter`` above which the
                previous value is considered close enough to wrap.
            max_step: Largest forward step treated as a normal increment.
            jitter_tolerance: Largest backward step treated as a late,
                stale reading rather than a real rollback.
        """
        self.max_counter = max_counter
        self.rollover_threshold = int(max_counter * rollover_threshold)
        self.max_step = max_step
        self.jitter_tolerance = jitter_tolerance

    def is_valid_increment(self, current, previous, station_sn):
        """Decide whether *current* means new results should be read.

        Args:
            current: Counter value just read.
            previous: Last counter value seen (0 when none is cached).
            station_sn: Station identifier, used only in log messages.

        Returns:
            ``(is_valid, reason)`` where *reason* is one of ``no_change``,
            ``first_read``, ``normal_increment``, ``abnormal_jump``,
            ``rollover``, ``jitter`` or ``counter_rollback``.
        """
        # Nothing new. Checked first so that a first poll reading 0 against a
        # cached 0 does not trigger a result read.
        if current == previous:
            return (False, "no_change")

        # No cached value yet: accept whatever the counter holds.
        if previous == 0:
            return (True, "first_read")

        if current > previous:
            delta = current - previous
            if delta <= self.max_step:
                return (True, "normal_increment")

            print_with_timestamp(
                f"❌ 检测到计数异常跳变: {station_sn} {previous} → {current} (增量={current-previous})",
                color='red'
            )
            # Record the anomaly but still read, so no results are skipped.
            self._log_counter_anomaly(station_sn, previous, current)
            return (True, "abnormal_jump")

        # From here on current < previous.
        if previous > self.rollover_threshold and current < (self.max_counter * 0.1):
            # +1 because stepping from max_counter to 0 is itself one count.
            rollover_gap = (self.max_counter - previous) + current + 1
            if rollover_gap <= self.max_step:
                print_with_timestamp(
                    f"🔄 检测到计数器回绕: {station_sn} {previous} → {current}",
                    color='yellow'
                )
                return (True, "rollover")

        if (previous - current) <= self.jitter_tolerance:
            print_with_timestamp(
                f"⚠️ 检测到计数抖动 (忽略): {station_sn} {current} < {previous}",
                color='yellow'
            )
            return (False, "jitter")

        print_with_timestamp(
            f"🚨 严重: 计数器倒退: {station_sn} {previous} → {current}",
            color='red'
        )
        self._log_counter_anomaly(station_sn, previous, current)
        return (False, "counter_rollback")

    def _log_counter_anomaly(self, station_sn, previous, current):
        """Append one anomaly record to ``counter_anomaly.log`` (JSON lines)."""
        log_entry = {
            'timestamp': datetime.datetime.now().isoformat(),
            'station': station_sn,
            'previous': previous,
            'current': current,
            'delta': current - previous
        }

        try:
            with open('counter_anomaly.log', 'a', encoding='utf-8') as f:
                f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')
        except Exception as e:
            print_with_timestamp(f"❌ 写入计数异常日志失败: {e}", color='red')
|
||
|
||
# 在 dtMachineService 中使用
|
||
counter_manager = CounterManager()
|
||
|
||
def machine_poll_enhanced(self, args):
|
||
"""增强的轮询逻辑"""
|
||
machine = args["machine"]
|
||
|
||
for station in machine.get('stations', []):
|
||
if station.get('status') == 'running' and self.dutDirectionMode in [2]:
|
||
result = machine.get('virt_plc_device').read_station_result_counter(station['SN'])
|
||
|
||
if result.get('status') == 'success':
|
||
current_counter = result.get('value')
|
||
previous_counter = station.get("pre_dut_result_counter", 0)
|
||
|
||
# ⭐ 使用计数管理器判断
|
||
is_valid, reason = counter_manager.is_valid_increment(
|
||
current_counter,
|
||
previous_counter,
|
||
station['SN']
|
||
)
|
||
|
||
if is_valid:
|
||
# 记录到监控表
|
||
insert_record = {
|
||
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
"machine": machine.get('SN'),
|
||
'station': station.get('SN'),
|
||
'current_counter': current_counter,
|
||
'pre_counter': previous_counter,
|
||
'reason': reason,
|
||
'status': 1
|
||
}
|
||
self.insertTableRecords('get_counter', [insert_record])
|
||
|
||
# 更新缓存
|
||
station['last_insert_counter'] = current_counter
|
||
|
||
# 读取测试结果
|
||
self.machine_poll_test_result_bulk(machine, station)
|
||
else:
|
||
# 记录异常
|
||
insert_record = {
|
||
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
"machine": machine.get('SN'),
|
||
'station': station.get('SN'),
|
||
'current_counter': current_counter,
|
||
'pre_counter': previous_counter,
|
||
'reason': reason,
|
||
'status': 0 # 无效
|
||
}
|
||
self.insertTableRecords('get_counter', [insert_record])
|
||
```
|
||
|
||
**收益**:
|
||
- ✅ 正确处理32位计数器回绕
|
||
- ✅ 识别并过滤抖动
|
||
- ✅ 异常数据有日志可追溯
|
||
|
||
---
|
||
|
||
### 🟡 中等风险 R-004: 批量读取索引管理缺陷
|
||
|
||
**现状**:
|
||
```python
|
||
# dtMachineService.py:380
|
||
station['result_index_read'] = (station['result_index_read'] + TestReqResult_ItemCountOnceRead) % 10
|
||
```
|
||
|
||
**风险点**:
|
||
1. **固定增量**: 假设每次必读成功
|
||
2. **无失败回退**: 读取失败时索引仍然前进
|
||
3. **与PLC不同步**: 没有读取PLC的写指针
|
||
|
||
**改进方案**:
|
||
|
||
```python
|
||
import time
from collections import deque


class ResultIndexManager:
    """Track the read position in a circular buffer of PLC test results."""

    def __init__(self, buffer_size=10, item_size=1):
        """Configure the ring geometry.

        Args:
            buffer_size: Number of slots in the circular buffer (must be > 0).
            item_size: Number of slots consumed by one successful read.

        Raises:
            ValueError: If *buffer_size* is not positive.
        """
        if buffer_size <= 0:
            raise ValueError("buffer_size must be positive")
        self.buffer_size = buffer_size
        self.item_size = item_size
        self.read_history = deque(maxlen=100)  # keep the 100 most recent reads

    @staticmethod
    def has_unread(read_index, plc_write_index=None):
        """Return False when the reader has caught up with the PLC writer.

        With no write pointer available the answer cannot be known, so the
        method returns True and leaves the decision to the caller.
        """
        return plc_write_index is None or read_index != plc_write_index

    def calculate_next_index(self, current_index, read_success, plc_write_index=None):
        """Return the index to read from on the next poll.

        Args:
            current_index: Index that was just read.
            read_success: Whether that read succeeded.
            plc_write_index: Current PLC write index, if the PLC exposes one.

        Returns:
            ``current_index`` after a failed read (so the slot is retried),
            otherwise the index advanced by ``item_size`` modulo
            ``buffer_size``.
        """
        if not read_success:
            # Failed read: stay on the same slot so it is retried.
            self.read_history.append({
                'timestamp': time.time(),
                'index': current_index,
                'success': False
            })
            return current_index

        # A successful read always consumes its slot. Holding the pointer back
        # when it would meet the write pointer would re-read the slot that was
        # just processed and produce duplicate results; instead, callers gate
        # the next read with has_unread().
        next_index = (current_index + self.item_size) % self.buffer_size

        entry = {
            'timestamp': time.time(),
            'index': current_index,
            'success': True,
            'next': next_index
        }
        if plc_write_index is not None:
            entry['caught_up'] = (next_index == plc_write_index)
        self.read_history.append(entry)

        return next_index

    def get_read_statistics(self):
        """Summarize the recorded reads; empty dict when none were recorded."""
        if not self.read_history:
            return {}

        total = len(self.read_history)
        success_count = sum(1 for r in self.read_history if r['success'])

        return {
            'total_reads': total,
            'success_rate': success_count / total,
            'recent_10': list(self.read_history)[-10:]
        }
|
||
|
||
# 使用示例
|
||
index_manager = ResultIndexManager(buffer_size=10, item_size=1)
|
||
|
||
def machine_poll_test_result_bulk_enhanced(self, machine, station):
|
||
"""增强的批量测试结果读取"""
|
||
registers_len = PLCTestReqResultData.required_registers_size * TestReqResult_ItemCountOnceRead
|
||
|
||
if station.get('result_index_read') is None:
|
||
station['result_index_read'] = 0
|
||
|
||
current_index = station['result_index_read']
|
||
|
||
try:
|
||
result = machine.get('virt_plc_device').read_station_cyclesFinishedBulk(
|
||
station['SN'],
|
||
current_index,
|
||
registers_len
|
||
)
|
||
|
||
read_success = (result.get('status') == 'success')
|
||
|
||
if read_success:
|
||
# 处理数据
|
||
self.handle_test_result_registers_value(
|
||
machine,
|
||
station,
|
||
result.get('value'),
|
||
TestReqResult_ItemCountOnceRead
|
||
)
|
||
|
||
# ⭐ 使用索引管理器计算下一次索引
|
||
station['result_index_read'] = index_manager.calculate_next_index(
|
||
current_index,
|
||
read_success,
|
||
plc_write_index=None # 如果PLC提供写指针寄存器,可传入
|
||
)
|
||
|
||
except Exception as error:
|
||
print_with_timestamp(
|
||
f"❌ 批量读取异常: {error}",
|
||
color='red'
|
||
)
|
||
# 失败时索引不变
|
||
station['result_index_read'] = current_index
|
||
```
|
||
|
||
---
|
||
|
||
## 3. 性能瓶颈风险
|
||
|
||
### 🟡 中等风险 R-005: 串口通信锁粒度过大
|
||
|
||
**现状**:
|
||
```python
|
||
# modbus_server.py:73
|
||
with self.plc_comm_lock: # ❌ 整个读写操作都在锁内
|
||
resp = plc_device.plc_read_bits(...)
|
||
```
|
||
|
||
**风险点**:
|
||
1. **锁粒度过大**: 数据解析也在锁内
|
||
2. **阻塞时间长**: 串口超时会长时间持锁
|
||
3. **并发性能差**: 多台设备串行访问
|
||
|
||
**改进方案**:
|
||
|
||
```python
|
||
# 改进: 只保护串口IO操作
|
||
class OptimizedMitsubishiPLC:
|
||
def __init__(self, plc_port, comm_config):
|
||
self.ser = serial.Serial(...)
|
||
self.io_lock = threading.Lock() # 只保护IO
|
||
|
||
def plc_read_words(self, plc_no, pc_no, address, length):
|
||
# 1. 构建帧 (无锁)
|
||
frame = self.plc_read_frame('WR', plc_no, pc_no, address, length)
|
||
|
||
# 2. 串口IO (加锁)
|
||
with self.io_lock:
|
||
self.ser.write(frame)
|
||
recv_frame = self.ser.read(expected_size)
|
||
|
||
# 3. 数据解析 (无锁)
|
||
if not self.frame_is_valid(recv_frame):
|
||
return {'status': 'error'}
|
||
|
||
value = [ascii_array_to_word(recv_frame[5+4*i:5+4*(i+1)]) for i in range(length)]
|
||
return {'status': 'success', 'data': value}
|
||
```
|
||
|
||
**收益**:
|
||
- ✅ 预计提升并发吞吐 (约30%为估算值,需在目标硬件上实测验证)
|
||
- ✅ 降低锁等待时间
|
||
|
||
---
|
||
|
||
### 🟡 中等风险 R-006: 数据库查询缺少索引
|
||
|
||
**现状**:
|
||
```python
|
||
# api_route.py:425
|
||
select_sql_str = (
|
||
f'SELECT * from TestReq WHERE TestReq.id > 0 {sql_condition_str}'
|
||
)
|
||
# ❌ 时间范围查询未建索引
|
||
```
|
||
|
||
**风险点**:
|
||
- 全表扫描导致查询慢
|
||
- 高并发时数据库CPU飙升
|
||
|
||
**改进方案**:
|
||
|
||
```sql
|
||
-- 创建复合索引
|
||
CREATE INDEX IF NOT EXISTS idx_testreq_endtime_sn
|
||
ON TestReq(endTime, SN);
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_testreq_status_endtime
|
||
ON TestReq(status, endTime);
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_dutlist_stationassigned
|
||
ON dutList(stationAssigned);
|
||
|
||
-- 启用查询计划分析
|
||
EXPLAIN QUERY PLAN
|
||
SELECT * FROM TestReq WHERE endTime BETWEEN '2024-01-01' AND '2024-12-31';
|
||
```
|
||
|
||
---
|
||
|
||
## 4. 并发安全风险
|
||
|
||
### 🔴 严重风险 R-007: busy_flag并发控制不足
|
||
|
||
**现状**:
|
||
```python
|
||
# dtMachineService.py:632
|
||
if busy_flag is False: # ❌ 非原子操作
|
||
# 可能多个线程同时通过检查
|
||
```
|
||
|
||
**风险点**:
|
||
- 多线程同时写PLC寄存器
|
||
- 数据竞争导致状态不一致
|
||
|
||
**改进方案**:
|
||
|
||
```python
|
||
import threading
import time


class MachineOperationLock:
    """Re-entrant machine lock with acquire timeout and hold-time warning.

    The lock does not detect deadlocks; it only bounds how long a caller
    waits and reports when the lock was held for unusually long.
    """

    def __init__(self, timeout=5, warn_hold_seconds=2):
        """Create the lock.

        Args:
            timeout: Default number of seconds a blocking acquire waits.
            warn_hold_seconds: Hold duration above which release() warns.
        """
        self._lock = threading.RLock()  # re-entrant for the owning thread
        self._timeout = timeout
        self._warn_hold_seconds = warn_hold_seconds
        self._holder = None         # name of the owning thread
        self._owner_ident = None    # ident of the owning thread
        self._depth = 0             # re-entrant nesting level
        self._acquire_time = None   # monotonic time of the outermost acquire

    def acquire(self, blocking=True, timeout=None):
        """Acquire the lock.

        Args:
            blocking: When False, return immediately instead of waiting.
            timeout: Seconds to wait; ``None`` uses the default given to the
                constructor. Ignored when *blocking* is False.

        Returns:
            True if the lock was acquired, otherwise False.
        """
        # "is None" rather than "or": an explicit timeout of 0 must be kept.
        if timeout is None:
            timeout = self._timeout

        if blocking:
            acquired = self._lock.acquire(timeout=timeout)
        else:
            # A timeout may not be combined with a non-blocking acquire.
            acquired = self._lock.acquire(blocking=False)

        if acquired:
            self._depth += 1
            if self._depth == 1:
                # Record ownership only on the outermost acquire so nested
                # acquires do not reset the hold timer.
                self._owner_ident = threading.get_ident()
                self._holder = threading.current_thread().name
                self._acquire_time = time.monotonic()
        else:
            print_with_timestamp(
                f"⚠️ 获取机器锁超时 ({timeout}s), 当前持有者: {self._holder}",
                color='yellow'
            )

        return acquired

    def release(self):
        """Release one level of the lock.

        Raises:
            RuntimeError: If the calling thread does not own the lock.
        """
        # Verify ownership before touching the bookkeeping, so a stray
        # release cannot wipe the real owner's state.
        if self._owner_ident != threading.get_ident():
            raise RuntimeError("cannot release un-acquired lock")

        self._depth -= 1
        if self._depth > 0:
            # Still held by an outer acquire of the same thread.
            self._lock.release()
            return

        hold_duration = time.monotonic() - self._acquire_time
        holder = self._holder
        self._holder = None
        self._owner_ident = None
        self._acquire_time = None
        self._lock.release()

        # Logged after releasing so slow output does not extend the hold.
        if hold_duration > self._warn_hold_seconds:
            print_with_timestamp(
                f"⚠️ 锁持有时间过长: {hold_duration:.2f}s by {holder}",
                color='yellow'
            )

    def __enter__(self):
        """Acquire with the default timeout.

        Raises:
            TimeoutError: If the lock could not be acquired; the ``with``
                body must not run unprotected.
        """
        if not self.acquire():
            raise TimeoutError(
                f"machine lock not acquired within {self._timeout}s"
            )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock; exceptions from the body propagate."""
        self.release()
|
||
|
||
# 使用示例
|
||
machine_lock = MachineOperationLock(timeout=5)
|
||
|
||
@app.route('/machine_control', methods=['POST'])
|
||
def machine_control():
|
||
if not machine_lock.acquire(timeout=5):
|
||
return jsonify({"status": "error", "message": "系统繁忙,请稍后重试"})
|
||
|
||
try:
|
||
# 执行机器控制逻辑
|
||
...
|
||
finally:
|
||
machine_lock.release()
|
||
```
|
||
|
||
---
|
||
|
||
## 5. 异常处理风险
|
||
|
||
### 🟡 中等风险 R-008: PLC通信异常吞噬
|
||
|
||
**现状**:
|
||
```python
|
||
# dtMachineService.py:666
|
||
except:
|
||
pass # ❌ 裸except吞噬所有异常
|
||
```
|
||
|
||
**风险点**:
|
||
- 隐藏严重错误 (如内存溢出)
|
||
- 难以排查问题
|
||
|
||
**改进方案**:
|
||
|
||
```python
|
||
import traceback
|
||
import logging
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
filename='dtmgt_error.log',
|
||
level=logging.ERROR,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
|
||
def machine_poll_safe(self, args):
|
||
"""安全的轮询逻辑"""
|
||
machine = args["machine"]
|
||
|
||
try:
|
||
# 读取PLC连接状态
|
||
result = machine.get('virt_plc_device').read_plc_connect_state()
|
||
|
||
if result.get('status') == 'success':
|
||
self.handle_status_sync_with_plc(machine, result.get('value'))
|
||
|
||
except KeyboardInterrupt:
|
||
# 允许Ctrl+C中断
|
||
raise
|
||
|
||
except ModbusException as e:
|
||
# Modbus协议异常
|
||
print_with_timestamp(
|
||
f"❌ Modbus通信异常: {machine.get('SN')} - {e}",
|
||
color='red'
|
||
)
|
||
logging.error(f"Modbus异常: {machine.get('SN')}", exc_info=True)
|
||
|
||
except serial.SerialException as e:
|
||
# 串口异常
|
||
print_with_timestamp(
|
||
f"❌ 串口异常: {machine.get('com')} - {e}",
|
||
color='red'
|
||
)
|
||
logging.error(f"串口异常: {machine.get('com')}", exc_info=True)
|
||
|
||
# 尝试重连
|
||
try:
|
||
machine['virt_plc_device'].connect_to_modbus_server(
|
||
self.modbusServer,
|
||
self.modbusServerPort
|
||
)
|
||
except Exception as reconnect_error:
|
||
logging.error(f"重连失败: {reconnect_error}")
|
||
|
||
except Exception as e:
|
||
# 未知异常
|
||
print_with_timestamp(
|
||
f"🚨 未知异常: {type(e).__name__} - {e}",
|
||
color='red'
|
||
)
|
||
logging.critical(
|
||
f"未知异常: {machine.get('SN')}",
|
||
exc_info=True
|
||
)
|
||
|
||
# 发送告警 (可选)
|
||
# send_alert_email(subject="系统异常", body=traceback.format_exc())
|
||
```
|
||
|
||
---
|
||
|
||
## 6. 改进方案优先级
|
||
|
||
### P0 (紧急 - 数据安全)
|
||
|
||
| 编号 | 问题 | 改进方案 | 预期收益 |
|
||
|------|------|---------|---------|
|
||
| R-001 | SQLite并发写入 | SafeSQLiteWriter类 | 消除数据丢失 |
|
||
| R-003 | 计数回绕判断 | CounterManager类 | 避免漏读/重复 |
|
||
| R-007 | busy_flag竞态 | MachineOperationLock | 防止PLC写冲突 |
|
||
|
||
### P1 (重要 - 稳定性)
|
||
|
||
| 编号 | 问题 | 改进方案 | 预期收益 |
|
||
|------|------|---------|---------|
|
||
| R-002 | LMIS内存泄漏 | 有界队列+持久化 | 防止OOM |
|
||
| R-004 | 结果索引管理 | ResultIndexManager | 提升可靠性 |
|
||
| R-008 | 异常吞噬 | 结构化异常处理 | 可观测性 |
|
||
|
||
### P2 (优化 - 性能)
|
||
|
||
| 编号 | 问题 | 改进方案 | 预期收益 |
|
||
|------|------|---------|---------|
|
||
| R-005 | 串口锁粒度 | 细化锁范围 | +30%并发 |
|
||
| R-006 | 数据库索引 | 创建复合索引 | 查询加速10x |
|
||
|
||
---
|
||
|
||
## 7. 监控与告警建议
|
||
|
||
### 7.1 关键指标监控
|
||
|
||
```python
|
||
class SystemMonitor:
|
||
"""系统健康监控"""
|
||
|
||
def __init__(self):
|
||
self.metrics = {
|
||
'plc_read_success_rate': 0,
|
||
'plc_read_latency_p99': 0,
|
||
'db_write_success_rate': 0,
|
||
'lmis_upload_success_rate': 0,
|
||
'queue_depth': {
|
||
'db_write': 0,
|
||
'lmis_upload': 0,
|
||
'websocket': 0
|
||
}
|
||
}
|
||
|
||
def report_metrics(self):
|
||
"""每分钟上报指标"""
|
||
print_with_timestamp(
|
||
f"📊 系统指标: "
|
||
f"PLC读取成功率={self.metrics['plc_read_success_rate']:.2%}, "
|
||
f"延迟P99={self.metrics['plc_read_latency_p99']:.0f}ms, "
|
||
f"数据库写入成功率={self.metrics['db_write_success_rate']:.2%}, "
|
||
f"LMIS上传成功率={self.metrics['lmis_upload_success_rate']:.2%}",
|
||
color='cyan'
|
||
)
|
||
```
|
||
|
||
### 7.2 告警规则
|
||
|
||
| 指标 | 阈值 | 告警级别 |
|
||
|------|------|---------|
|
||
| PLC读取成功率 | <95% | WARNING |
|
||
| PLC读取延迟P99 | >500ms | WARNING |
|
||
| 数据库写入失败率 | >1% | CRITICAL |
|
||
| LMIS上传重试队列 | >500 | WARNING |
|
||
| 内存使用率 | >80% | WARNING |
|
||
|
||
---
|
||
|
||
**文档版本**: v1.0
|
||
**最后更新**: 2025-11-19
|
||
**下一步**: 实施P0级改进方案
|