# Digital Twin Drop Test Machine Monitoring System - Risk Analysis and Improvement Proposals

## 📋 Table of Contents

1. [Architecture-Level Risks](#1-architecture-level-risks)
2. [Data Loss Risks](#2-data-loss-risks)
3. [Performance Bottleneck Risks](#3-performance-bottleneck-risks)
4. [Concurrency Safety Risks](#4-concurrency-safety-risks)
5. [Exception Handling Risks](#5-exception-handling-risks)
6. [Improvement Priorities](#6-improvement-priorities)
7. [Monitoring and Alerting Recommendations](#7-monitoring-and-alerting-recommendations)

---

## 1. Architecture-Level Risks

### 🔴 Critical Risk R-001: SQLite Concurrent Write Limitations

**Current state**:
```python
# dtmgtApp.py:373
def run_insert_result():
    while True:
        insert_records = []
        while not ws_message_send_queue.empty():  # ❌ wrong queue: should be result_to_localDB_queue
            result_row = result_to_localDB_queue.get()
            insert_records.append(result_records)
```

**Risk points**:
1. **Logic error**: the loop checks `ws_message_send_queue` for emptiness but pulls data from `result_to_localDB_queue`
2. **SQLite concurrency limits**: simultaneous writes from multiple threads trigger `SQLITE_BUSY` errors
3. **No retry mechanism for failed writes**

**Impact**:
- Data loss under high concurrency
- Test results fail to persist

**Proposed improvement**:
```python
# Improved version - adds a retry mechanism and error handling
import datetime
import json
import queue
import sqlite3
import threading
import time
from contextlib import contextmanager

# Global configuration
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 0.5  # seconds


class SafeSQLiteWriter:
    def __init__(self, db_path, batch_size=50, flush_interval=2):
        self.db_path = db_path
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = queue.Queue()
        self.failed_queue = queue.Queue()  # retry queue for failed writes

    @contextmanager
    def get_connection(self):
        """Context manager that yields a database connection"""
        conn = sqlite3.connect(self.db_path, timeout=10.0)
        conn.execute("PRAGMA journal_mode=WAL")     # enable WAL mode for better concurrency
        conn.execute("PRAGMA synchronous=NORMAL")   # balance performance and safety
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()

    def batch_insert(self, table, records, retry_count=0):
        """Batch insert with retry support"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                # Build the bulk insert statement dynamically
                if not records:
                    return True
                fields = sorted(records[0].keys())
                placeholders = ','.join(['?'] * len(fields))
                sql = f"INSERT INTO {table} ({','.join(fields)}) VALUES ({placeholders})"
                # Prepare parameters
                params = [tuple(rec.get(field) for field in fields) for rec in records]
                # Execute in bulk
                cursor.executemany(sql, params)
                print_with_timestamp(
                    f"✅ Wrote {len(records)} records to {table}",
                    color='green'
                )
                return True
        except sqlite3.OperationalError as e:
            if "locked" in str(e).lower() and retry_count < MAX_RETRY_ATTEMPTS:
                # Database locked - wait and retry
                print_with_timestamp(
                    f"⚠️ Database locked, retrying in {RETRY_DELAY}s (attempt {retry_count+1})",
                    color='yellow'
                )
                time.sleep(RETRY_DELAY)
                return self.batch_insert(table, records, retry_count + 1)
            else:
                print_with_timestamp(
                    f"❌ Batch insert failed: {e}, moving records to the failure queue",
                    color='red'
                )
                # Queue the records for a later retry
                for rec in records:
                    self.failed_queue.put((table, rec))
                return False
        except Exception as e:
            print_with_timestamp(
                f"❌ Batch insert error: {e}",
                color='red'
            )
            return False

    def worker_thread(self):
        """Worker thread - batched writes"""
        while True:
            batch = []
            deadline = time.time() + self.flush_interval
            try:
                # Collect a batch
                while len(batch) < self.batch_size and time.time() < deadline:
                    timeout = max(0.1, deadline - time.time())
                    try:
                        table, record = self.queue.get(timeout=timeout)
                        batch.append((table, record))
                    except queue.Empty:
                        break
                # Group by table
                table_groups = {}
                for table, record in batch:
                    table_groups.setdefault(table, []).append(record)
                # Bulk write each group
                for table, records in table_groups.items():
                    self.batch_insert(table, records)
                # Drain part of the failure retry queue
                retry_batch = []
                while not self.failed_queue.empty() and len(retry_batch) < 10:
                    try:
                        retry_batch.append(self.failed_queue.get_nowait())
                    except queue.Empty:
                        break
                if retry_batch:
                    retry_groups = {}
                    for table, record in retry_batch:
                        retry_groups.setdefault(table, []).append(record)
                    for table, records in retry_groups.items():
                        if not self.batch_insert(table, records):
                            # Still failing - dump to a log file
                            self._write_failed_log(table, records)
            except Exception as e:
                print_with_timestamp(
                    f"❌ Writer thread error: {e}",
                    color='red'
                )
                time.sleep(0.1)

    def _write_failed_log(self, table, records):
        """Dump persistently failing records to a log file"""
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        log_file = f"failed_inserts_{timestamp}.json"
        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                for rec in records:
                    f.write(json.dumps({
                        'table': table,
                        'record': rec,
                        'timestamp': timestamp
                    }, ensure_ascii=False) + '\n')
            print_with_timestamp(
                f"⚠️ Failed records written to log: {log_file}",
                color='yellow'
            )
        except Exception as e:
            print_with_timestamp(
                f"❌ Error while writing the failure log: {e}",
                color='red'
            )

    def put(self, table, record):
        """Non-blocking submit"""
        self.queue.put_nowait((table, record))


# Usage example
db_writer = SafeSQLiteWriter('db/dtmgtDb.db', batch_size=50, flush_interval=2)
writer_thread = threading.Thread(target=db_writer.worker_thread, daemon=True)
writer_thread.start()

def cb_insert_result(test_req_result):
    """Callback - submit to the safe write queue"""
    db_writer.put('TestReq', test_req_result)
    lmis_upload_system.post_data(test_req_result)
```

**Benefits**:
- ✅ Eliminates the data-loss risk
- ✅ Higher write throughput (batching + WAL mode)
- ✅ Failed records have a backup path

---
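The worker thread above flushes on a timer and runs as a daemon, so records still sitting in the in-memory queue when the process exits would be lost. A minimal sketch of a flush-on-exit hook, assuming the `db_writer` instance and `SafeSQLiteWriter` internals from the usage example above; the `flush_remaining()` helper is a hypothetical addition, not existing project code:

```python
import atexit
import queue

def flush_remaining(writer):
    """Hypothetical shutdown hook: drain whatever is still queued and write it synchronously."""
    pending = []
    while not writer.queue.empty():
        try:
            pending.append(writer.queue.get_nowait())
        except queue.Empty:
            break
    groups = {}
    for table, record in pending:
        groups.setdefault(table, []).append(record)
    for table, records in groups.items():
        writer.batch_insert(table, records)

# Register against the writer created in the usage example above
atexit.register(flush_remaining, db_writer)
```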
### 🟡 Medium Risk R-002: LMIS Upload System Memory Growth

**Current state**:
```python
# dtmgtApp.py:249
class LMISUploadSystem:
    def __init__(self):
        self.retry_queue = queue.PriorityQueue()  # ❌ unbounded queue
        self.executor = ThreadPoolExecutor(max_workers=20)
```

**Risk points**:
1. **Unbounded retry queue**: records that keep failing accumulate without limit
2. **Thread pool without timeouts**: hung requests tie up worker threads
3. **No persistence of failed data**

**Impact**:
- Memory leak risk
- Thread pool exhaustion

**Proposed improvement**:
```python
import datetime
import json
import queue
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor, TimeoutError

import requests


class ImprovedLMISUploadSystem:
    """Improved LMIS upload system with memory protection"""

    def __init__(self, max_retry_queue_size=1000, request_timeout=30):
        self.retry_queue = queue.PriorityQueue(maxsize=max_retry_queue_size)
        self.executor = ThreadPoolExecutor(
            max_workers=20,
            thread_name_prefix='lmis_upload'
        )
        self.request_timeout = request_timeout
        self.session = requests.Session()
        # Persistence for failed uploads
        self.persistent_fail_log = 'lmis_failed_uploads.jsonl'
        # Metrics
        self.metrics = {
            'total_submitted': 0,
            'total_success': 0,
            'total_failed': 0,
            'retry_queue_size': 0
        }
        self.metrics_lock = threading.Lock()
        self._start_retry_monitor()
        self._start_metrics_reporter()

    def _start_metrics_reporter(self):
        """Start the metrics reporting thread"""
        def report():
            while True:
                time.sleep(60)  # report once per minute
                with self.metrics_lock:
                    print_with_timestamp(
                        f"📊 LMIS upload stats: submitted={self.metrics['total_submitted']}, "
                        f"success={self.metrics['total_success']}, "
                        f"failed={self.metrics['total_failed']}, "
                        f"retry queue={self.metrics['retry_queue_size']}",
                        color='cyan'
                    )
        threading.Thread(target=report, daemon=True).start()

    def post_data(self, data):
        """Submit data - with timeout protection"""
        with self.metrics_lock:
            self.metrics['total_submitted'] += 1
        future = self.executor.submit(self._upload_data, data, 0)

        # Do not block the caller; inspect the outcome in a done callback
        def timeout_callback(fut):
            try:
                fut.result(timeout=0.1)  # non-blocking check
            except TimeoutError:
                print_with_timestamp(
                    f"⚠️ Upload task timed out: {data.get('SN')}",
                    color='yellow'
                )
            except Exception as e:
                print_with_timestamp(
                    f"❌ Upload task error: {e}",
                    color='red'
                )
        future.add_done_callback(timeout_callback)

    def _upload_data(self, data, retry_count):
        """Actual upload logic - with a request timeout"""
        try:
            # Build the request (same as the original code)
            item_json = {...}  # omitted
            post_json = {...}
            # Add a timeout guard
            response = self.session.post(
                UPLOAD_LIMS_URL,
                data=json.dumps(post_json),
                headers={'content-type': 'application/json'},
                timeout=self.request_timeout  # ⭐ key change
            )
            if response.status_code == 200:
                response_json = response.json()
                code = response_json.get('code')
                if code in (200, 1501):
                    with self.metrics_lock:
                        self.metrics['total_success'] += 1
                    print_with_timestamp(
                        f"✅ Upload succeeded: {data['SN']}",
                        color='green'
                    )
                    return
            raise Exception(f"Upload failed: HTTP {response.status_code}")
        except requests.Timeout:
            print_with_timestamp(
                f"⏱️ Upload timed out ({self.request_timeout}s): {data.get('SN')}",
                color='yellow'
            )
            self._handle_retry(data, retry_count)
        except Exception as e:
            print_with_timestamp(
                f"❌ Upload error: {e}",
                color='red'
            )
            self._handle_retry(data, retry_count)

    def _handle_retry(self, data, retry_count):
        """Retry handling - with full-queue protection"""
        if retry_count < UPLOAD_MAX_RETRIES:
            next_retry = time.time() + UPLOAD_RETRY_INTERVAL
            try:
                # Non-blocking enqueue
                self.retry_queue.put_nowait((next_retry, data, retry_count + 1))
                with self.metrics_lock:
                    self.metrics['retry_queue_size'] = self.retry_queue.qsize()
            except queue.Full:
                # Queue full - persist to disk instead
                self._persist_failed_data(data)
                print_with_timestamp(
                    f"⚠️ Retry queue full, record persisted: {data.get('SN')}",
                    color='yellow'
                )
        else:
            # Final failure
            with self.metrics_lock:
                self.metrics['total_failed'] += 1
            self._persist_failed_data(data)
            print_with_timestamp(
                f"❌ Giving up after {UPLOAD_MAX_RETRIES} retries: {data.get('SN')}",
                color='red'
            )

    def _persist_failed_data(self, data):
        """Persist failed records"""
        try:
            with open(self.persistent_fail_log, 'a', encoding='utf-8') as f:
                f.write(json.dumps({
                    'data': data,
                    'timestamp': datetime.datetime.now().isoformat(),
                    'reason': 'max_retries_exceeded'
                }, ensure_ascii=False) + '\n')
        except Exception as e:
            print_with_timestamp(
                f"❌ Error persisting failed record: {e}",
                color='red'
            )
```

**Benefits**:
- ✅ Prevents memory leaks
- ✅ Avoids thread-pool exhaustion
- ✅ Failed records can be re-submitted manually

---
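The last benefit ("failed records can be re-submitted manually") implies an offline replay step that the section does not show. A minimal sketch of such a replay script, assuming the `lmis_failed_uploads.jsonl` format written by `_persist_failed_data()` above and an already-constructed upload system instance; the `replay_failed_uploads()` helper is a hypothetical addition, not existing project code:

```python
import json

def replay_failed_uploads(upload_system, log_path='lmis_failed_uploads.jsonl'):
    """Re-submit every record captured in the persistent failure log."""
    replayed = 0
    with open(log_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            entry = json.loads(line)  # {'data': ..., 'timestamp': ..., 'reason': ...}
            upload_system.post_data(entry['data'])
            replayed += 1
    print(f"Re-submitted {replayed} failed upload(s) from {log_path}")

# Example (run manually once the LMIS endpoint is reachable again):
# replay_failed_uploads(lmis_upload_system)
```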
## 2. Data Loss Risks

### 🔴 Critical Risk R-003: Missing Rollover Handling for the Test Result Counter

**Current state**:
```python
# dtMachineService.py:765
if result.get('value') > station.get("pre_dut_result_counter", 0):
    # Plain greater-than check; counter rollover is not handled
    station['last_insert_counter'] = result.get('value')
    self.machine_poll_test_result_bulk(machine, station)
```

**Risk points**:
1. **PLC counter rollover**: a 32-bit counter wraps back to zero after reaching its maximum
2. **Jitter misreads**: network delay can deliver an older value late
3. **Power-loss recovery**: `pre_dut_result_counter` is lost when the host application restarts (see the persistence sketch at the end of this item)

**Impact**:
- Missed test results
- Duplicate reads

**Proposed improvement**:
```python
class CounterManager:
    """Test-result counter manager - handles rollover and jitter"""

    def __init__(self, max_counter=0xFFFFFFFF, rollover_threshold=0.9):
        self.max_counter = max_counter
        self.rollover_threshold = int(max_counter * rollover_threshold)

    def is_valid_increment(self, current, previous, station_sn):
        """
        Decide whether the counter change is a valid increment.
        Returns: (is_valid, reason)
        """
        # Case 1: first read
        if previous == 0:
            return (True, "first_read")
        # Case 2: normal increment
        if current > previous and (current - previous) <= 100:
            return (True, "normal_increment")
        # Case 3: rollover detection
        if previous > self.rollover_threshold and current < (self.max_counter * 0.1):
            rollover_gap = (self.max_counter - previous) + current
            if rollover_gap <= 100:
                print_with_timestamp(
                    f"🔄 Counter rollover detected: {station_sn} {previous} → {current}",
                    color='yellow'
                )
                return (True, "rollover")
        # Case 4: jitter (current below previous, but only slightly)
        if current < previous and (previous - current) <= 5:
            print_with_timestamp(
                f"⚠️ Counter jitter detected (ignored): {station_sn} {current} < {previous}",
                color='yellow'
            )
            return (False, "jitter")
        # Case 5: abnormal jump
        if current > previous and (current - previous) > 100:
            print_with_timestamp(
                f"❌ Abnormal counter jump: {station_sn} {previous} → {current} (delta={current-previous})",
                color='red'
            )
            # Log the anomaly, but still read the data
            self._log_counter_anomaly(station_sn, previous, current)
            return (True, "abnormal_jump")
        # Case 6: counter rollback (severe anomaly)
        if current < previous and (previous - current) > 5:
            print_with_timestamp(
                f"🚨 Severe: counter rollback: {station_sn} {previous} → {current}",
                color='red'
            )
            self._log_counter_anomaly(station_sn, previous, current)
            return (False, "counter_rollback")
        return (False, "unknown")

    def _log_counter_anomaly(self, station_sn, previous, current):
        """Record a counter anomaly"""
        log_entry = {
            'timestamp': datetime.datetime.now().isoformat(),
            'station': station_sn,
            'previous': previous,
            'current': current,
            'delta': current - previous
        }
        try:
            with open('counter_anomaly.log', 'a', encoding='utf-8') as f:
                f.write(json.dumps(log_entry) + '\n')
        except Exception as e:
            print_with_timestamp(f"❌ Failed to write the counter anomaly log: {e}", color='red')


# Usage inside dtMachineService
counter_manager = CounterManager()

def machine_poll_enhanced(self, args):
    """Enhanced polling logic"""
    machine = args["machine"]
    for station in machine.get('stations', []):
        if station.get('status') == 'running' and self.dutDirectionMode in [2]:
            result = machine.get('virt_plc_device').read_station_result_counter(station['SN'])
            if result.get('status') == 'success':
                current_counter = result.get('value')
                previous_counter = station.get("pre_dut_result_counter", 0)
                # ⭐ Let the counter manager decide
                is_valid, reason = counter_manager.is_valid_increment(
                    current_counter,
                    previous_counter,
                    station['SN']
                )
                if is_valid:
                    # Record to the monitoring table
                    insert_record = {
                        "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "machine": machine.get('SN'),
                        'station': station.get('SN'),
                        'current_counter': current_counter,
                        'pre_counter': previous_counter,
                        'reason': reason,
                        'status': 1
                    }
                    self.insertTableRecords('get_counter', [insert_record])
                    # Update the cache
                    station['last_insert_counter'] = current_counter
                    # Read the test results
                    self.machine_poll_test_result_bulk(machine, station)
                else:
                    # Record the anomaly
                    insert_record = {
                        "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "machine": machine.get('SN'),
                        'station': station.get('SN'),
                        'current_counter': current_counter,
                        'pre_counter': previous_counter,
                        'reason': reason,
                        'status': 0  # invalid
                    }
                    self.insertTableRecords('get_counter', [insert_record])
```

**Benefits**:
- ✅ Correctly handles 32-bit counter rollover
- ✅ Detects and filters jitter
- ✅ Anomalies are logged and traceable

---
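Risk point 3 (counter state lost on restart) is not covered by `CounterManager` itself. A minimal sketch of persisting the last known counter per station to a small JSON file so it can be restored at startup; the `station_counters.json` file name and both helper functions are hypothetical additions, assuming stations are keyed by their `SN` field as in the polling code above:

```python
import json
import os

COUNTER_STATE_FILE = 'station_counters.json'  # hypothetical location

def save_counter_state(stations):
    """Persist the last seen counter for every station."""
    state = {s['SN']: s.get('pre_dut_result_counter', 0) for s in stations}
    tmp_path = COUNTER_STATE_FILE + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(state, f)
    os.replace(tmp_path, COUNTER_STATE_FILE)  # atomic swap avoids a torn file on power loss

def restore_counter_state(stations):
    """Restore counters at startup; missing entries fall back to 0 (treated as first_read)."""
    if not os.path.exists(COUNTER_STATE_FILE):
        return
    with open(COUNTER_STATE_FILE, 'r', encoding='utf-8') as f:
        state = json.load(f)
    for station in stations:
        station['pre_dut_result_counter'] = state.get(station['SN'], 0)
```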
print_with_timestamp(f"❌ 写入计数异常日志失败: {e}", color='red') # 在 dtMachineService 中使用 counter_manager = CounterManager() def machine_poll_enhanced(self, args): """增强的轮询逻辑""" machine = args["machine"] for station in machine.get('stations', []): if station.get('status') == 'running' and self.dutDirectionMode in [2]: result = machine.get('virt_plc_device').read_station_result_counter(station['SN']) if result.get('status') == 'success': current_counter = result.get('value') previous_counter = station.get("pre_dut_result_counter", 0) # ⭐ 使用计数管理器判断 is_valid, reason = counter_manager.is_valid_increment( current_counter, previous_counter, station['SN'] ) if is_valid: # 记录到监控表 insert_record = { "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "machine": machine.get('SN'), 'station': station.get('SN'), 'current_counter': current_counter, 'pre_counter': previous_counter, 'reason': reason, 'status': 1 } self.insertTableRecords('get_counter', [insert_record]) # 更新缓存 station['last_insert_counter'] = current_counter # 读取测试结果 self.machine_poll_test_result_bulk(machine, station) else: # 记录异常 insert_record = { "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "machine": machine.get('SN'), 'station': station.get('SN'), 'current_counter': current_counter, 'pre_counter': previous_counter, 'reason': reason, 'status': 0 # 无效 } self.insertTableRecords('get_counter', [insert_record]) ``` **收益**: - ✅ 正确处理32位计数器回绕 - ✅ 识别并过滤抖动 - ✅ 异常数据有日志可追溯 --- ### 🟡 中等风险 R-004: 批量读取索引管理缺陷 **现状**: ```python # dtMachineService.py:380 station['result_index_read'] = (station['result_index_read'] + TestReqResult_ItemCountOnceRead) % 10 ``` **风险点**: 1. **固定增量**: 假设每次必读成功 2. **无失败回退**: 读取失败时索引仍然前进 3. **与PLC不同步**: 没有读取PLC的写指针 **改进方案**: ```python class ResultIndexManager: """测试结果索引管理器""" def __init__(self, buffer_size=10, item_size=1): self.buffer_size = buffer_size self.item_size = item_size self.read_history = deque(maxlen=100) # 记录最近100次读取 def calculate_next_index(self, current_index, read_success, plc_write_index=None): """ 计算下一次读取索引 参数: current_index: 当前读取索引 read_success: 本次读取是否成功 plc_write_index: PLC当前写入索引 (可选) """ if not read_success: # 读取失败,保持当前索引 self.read_history.append({ 'timestamp': time.time(), 'index': current_index, 'success': False }) return current_index # 读取成功,前进 next_index = (current_index + self.item_size) % self.buffer_size # 如果知道PLC写指针,避免追尾 if plc_write_index is not None: if next_index == plc_write_index: print_with_timestamp( f"⚠️ 读指针追上写指针,暂停推进: read={next_index}, write={plc_write_index}", color='yellow' ) next_index = current_index self.read_history.append({ 'timestamp': time.time(), 'index': current_index, 'success': True, 'next': next_index }) return next_index def get_read_statistics(self): """获取读取统计""" if not self.read_history: return {} total = len(self.read_history) success_count = sum(1 for r in self.read_history if r['success']) return { 'total_reads': total, 'success_rate': success_count / total, 'recent_10': list(self.read_history)[-10:] } # 使用示例 index_manager = ResultIndexManager(buffer_size=10, item_size=1) def machine_poll_test_result_bulk_enhanced(self, machine, station): """增强的批量测试结果读取""" registers_len = PLCTestReqResultData.required_registers_size * TestReqResult_ItemCountOnceRead if station.get('result_index_read') is None: station['result_index_read'] = 0 current_index = station['result_index_read'] try: result = machine.get('virt_plc_device').read_station_cyclesFinishedBulk( station['SN'], current_index, registers_len ) read_success = (result.get('status') == 'success') if 
---

## 4. Concurrency Safety Risks

### 🔴 Critical Risk R-007: Insufficient Concurrency Control Around busy_flag

**Current state**:
```python
# dtMachineService.py:632
if busy_flag is False:  # ❌ not atomic
    # several threads can pass this check at the same time
```

**Risk points**:
- Multiple threads write PLC registers simultaneously
- Data races leave the state inconsistent

**Proposed improvement**:
```python
import threading
import time


class MachineOperationLock:
    """Machine operation lock - supports timeouts and long-hold detection"""

    def __init__(self, timeout=5):
        self._lock = threading.RLock()  # reentrant lock
        self._timeout = timeout
        self._holder = None
        self._acquire_time = None

    def acquire(self, blocking=True, timeout=None):
        """Acquire the lock"""
        timeout = timeout or self._timeout
        acquired = self._lock.acquire(blocking=blocking, timeout=timeout)
        if acquired:
            self._holder = threading.current_thread().name
            self._acquire_time = time.time()
        else:
            print_with_timestamp(
                f"⚠️ Timed out acquiring the machine lock ({timeout}s), current holder: {self._holder}",
                color='yellow'
            )
        return acquired

    def release(self):
        """Release the lock"""
        hold_duration = time.time() - self._acquire_time if self._acquire_time else 0
        if hold_duration > 2:
            print_with_timestamp(
                f"⚠️ Lock held too long: {hold_duration:.2f}s by {self._holder}",
                color='yellow'
            )
        self._holder = None
        self._acquire_time = None
        self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


# Usage example
machine_lock = MachineOperationLock(timeout=5)

@app.route('/machine_control', methods=['POST'])
def machine_control():
    if not machine_lock.acquire(timeout=5):
        return jsonify({"status": "error", "message": "System busy, please retry later"})
    try:
        # Perform the machine control logic
        ...
    finally:
        machine_lock.release()
```

---

## 5. Exception Handling Risks

### 🟡 Medium Risk R-008: PLC Communication Exceptions Swallowed

**Current state**:
```python
# dtMachineService.py:666
except:
    pass  # ❌ bare except swallows every exception
```

**Risk points**:
- Hides serious errors (such as out-of-memory)
- Makes problems hard to diagnose

**Proposed improvement**:
```python
import logging
import traceback

# Logging configuration
logging.basicConfig(
    filename='dtmgt_error.log',
    level=logging.ERROR,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

def machine_poll_safe(self, args):
    """Polling logic with structured exception handling"""
    machine = args["machine"]
    try:
        # Read the PLC connection state
        result = machine.get('virt_plc_device').read_plc_connect_state()
        if result.get('status') == 'success':
            self.handle_status_sync_with_plc(machine, result.get('value'))
    except KeyboardInterrupt:
        # Allow Ctrl+C to interrupt
        raise
    except ModbusException as e:
        # Modbus protocol error
        print_with_timestamp(
            f"❌ Modbus communication error: {machine.get('SN')} - {e}",
            color='red'
        )
        logging.error(f"Modbus error: {machine.get('SN')}", exc_info=True)
    except serial.SerialException as e:
        # Serial port error
        print_with_timestamp(
            f"❌ Serial port error: {machine.get('com')} - {e}",
            color='red'
        )
        logging.error(f"Serial error: {machine.get('com')}", exc_info=True)
        # Try to reconnect
        try:
            machine['virt_plc_device'].connect_to_modbus_server(
                self.modbusServer, self.modbusServerPort
            )
        except Exception as reconnect_error:
            logging.error(f"Reconnect failed: {reconnect_error}")
    except Exception as e:
        # Unknown exception
        print_with_timestamp(
            f"🚨 Unknown exception: {type(e).__name__} - {e}",
            color='red'
        )
        logging.critical(
            f"Unknown exception: {machine.get('SN')}",
            exc_info=True
        )
        # Send an alert (optional)
        # send_alert_email(subject="System exception", body=traceback.format_exc())
```

---

## 6. Improvement Priorities

### P0 (Urgent - data safety)

| ID | Issue | Improvement | Expected benefit |
|------|------|---------|---------|
| R-001 | SQLite concurrent writes | SafeSQLiteWriter class | Eliminates data loss |
| R-003 | Counter rollover handling | CounterManager class | Avoids missed/duplicate reads |
| R-007 | busy_flag race condition | MachineOperationLock | Prevents conflicting PLC writes |

### P1 (Important - stability)

| ID | Issue | Improvement | Expected benefit |
|------|------|---------|---------|
| R-002 | LMIS memory leak | Bounded queue + persistence | Prevents OOM |
| R-004 | Result index management | ResultIndexManager | Better reliability |
| R-008 | Swallowed exceptions | Structured exception handling | Observability |

### P2 (Optimization - performance)

| ID | Issue | Improvement | Expected benefit |
|------|------|---------|---------|
| R-005 | Serial lock granularity | Narrower lock scope | ~30% more concurrency |
| R-006 | Database indexes | Composite indexes | ~10x faster queries |

---

## 7. Monitoring and Alerting Recommendations

### 7.1 Key Metric Monitoring

```python
class SystemMonitor:
    """System health monitoring"""

    def __init__(self):
        self.metrics = {
            'plc_read_success_rate': 0,
            'plc_read_latency_p99': 0,
            'db_write_success_rate': 0,
            'lmis_upload_success_rate': 0,
            'queue_depth': {
                'db_write': 0,
                'lmis_upload': 0,
                'websocket': 0
            }
        }

    def report_metrics(self):
        """Report metrics once per minute"""
        print_with_timestamp(
            f"📊 System metrics: "
            f"PLC read success rate={self.metrics['plc_read_success_rate']:.2%}, "
            f"latency P99={self.metrics['plc_read_latency_p99']:.0f}ms, "
            f"DB write success rate={self.metrics['db_write_success_rate']:.2%}, "
            f"LMIS upload success rate={self.metrics['lmis_upload_success_rate']:.2%}",
            color='cyan'
        )
```

### 7.2 Alert Rules

| Metric | Threshold | Severity |
|------|------|---------|
| PLC read success rate | <95% | WARNING |
| PLC read latency P99 | >500ms | WARNING |
| Database write failure rate | >1% | CRITICAL |
| LMIS upload retry queue depth | >500 | WARNING |
| Memory usage | >80% | WARNING |
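The table above only states thresholds; a minimal sketch of how they might be evaluated against the `SystemMonitor.metrics` dictionary from 7.1. The rule list and the `check_alerts()` helper are hypothetical illustrations, not an existing project API; memory usage is not part of that dictionary and is therefore left out here (it would need a process-level probe such as psutil):

```python
# Hypothetical alert evaluation against the SystemMonitor.metrics layout shown in 7.1
ALERT_RULES = [
    ('PLC read success rate < 95%', 'WARNING',
     lambda m: m['plc_read_success_rate'] < 0.95),
    ('PLC read latency P99 > 500ms', 'WARNING',
     lambda m: m['plc_read_latency_p99'] > 500),
    ('DB write failure rate > 1%', 'CRITICAL',
     lambda m: m['db_write_success_rate'] < 0.99),
    ('LMIS upload retry queue > 500', 'WARNING',
     lambda m: m['queue_depth']['lmis_upload'] > 500),
]

def check_alerts(metrics):
    """Return (severity, rule name) for every alert rule that currently fires."""
    return [(severity, name) for name, severity, predicate in ALERT_RULES if predicate(metrics)]

# Example: run alongside report_metrics() in the once-per-minute loop
# for severity, name in check_alerts(system_monitor.metrics):
#     print_with_timestamp(f"🚨 [{severity}] {name}", color='red')
```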
---

**Document version**: v1.0
**Last updated**: 2025-11-19
**Next step**: implement the P0 improvements