Digital Twin Drop Tester Monitoring System - Risk Analysis and Improvement Recommendations

📋 Table of Contents

  1. Architecture-Level Risks
  2. Data Loss Risks
  3. Performance Bottleneck Risks
  4. Concurrency Safety Risks
  5. Exception Handling Risks
  6. Improvement Priorities
  7. Monitoring and Alerting Recommendations

1. Architecture-Level Risks

🔴 Critical Risk R-001: SQLite Concurrent Write Limitations

Current state:

# dtmgtApp.py:373
def run_insert_result():
    while True:
        insert_records = []
        while not ws_message_send_queue.empty():  # ❌ wrong queue - should be result_to_localDB_queue
            result_row = result_to_localDB_queue.get()
            insert_records.append(result_row)

Risk points:

  1. Logic error: the loop checks ws_message_send_queue but pulls data from result_to_localDB_queue
  2. SQLite concurrency limit: simultaneous writes from multiple threads trigger SQLITE_BUSY errors
  3. No retry mechanism for failed writes

Impact:

  • Data loss under high concurrency
  • Test results fail to persist

Improvement plan:

# Improved version - adds a retry mechanism and error handling
import datetime
import json
import queue
import sqlite3
import threading
import time
from contextlib import contextmanager

# Global configuration
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 0.5  # seconds

class SafeSQLiteWriter:
    def __init__(self, db_path, batch_size=50, flush_interval=2):
        self.db_path = db_path
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = queue.Queue()
        self.failed_queue = queue.Queue()  # retry queue for failed batches
        
    @contextmanager
    def get_connection(self):
        """Context manager that yields a database connection."""
        conn = sqlite3.connect(self.db_path, timeout=10.0)
        conn.execute("PRAGMA journal_mode=WAL")  # WAL mode improves concurrency
        conn.execute("PRAGMA synchronous=NORMAL")  # balance performance and durability
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
    
    def batch_insert(self, table, records, retry_count=0):
        """Insert records in one batch, with retry on lock contention."""
        if not records:
            return True

        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                # Build the bulk INSERT statement dynamically
                fields = sorted(records[0].keys())
                placeholders = ','.join(['?'] * len(fields))
                sql = f"INSERT INTO {table} ({','.join(fields)}) VALUES ({placeholders})"

                # Prepare the parameter tuples
                params = [tuple(rec.get(field) for field in fields) for rec in records]

                # Execute the batch
                cursor.executemany(sql, params)

                print_with_timestamp(
                    f"✅ Wrote {len(records)} records to {table}",
                    color='green'
                )
                return True
                
        except sqlite3.OperationalError as e:
            if "locked" in str(e).lower() and retry_count < MAX_RETRY_ATTEMPTS:
                # Database locked; back off and retry
                print_with_timestamp(
                    f"⚠️  Database locked, retrying in {RETRY_DELAY}s (attempt {retry_count + 1})",
                    color='yellow'
                )
                time.sleep(RETRY_DELAY)
                return self.batch_insert(table, records, retry_count + 1)
            else:
                print_with_timestamp(
                    f"❌ Batch insert failed: {e}, moving records to the failed queue",
                    color='red'
                )
                # Queue for a later retry pass
                for rec in records:
                    self.failed_queue.put((table, rec))
                return False

        except Exception as e:
            print_with_timestamp(
                f"❌ Batch insert error: {e}",
                color='red'
            )
            return False
    
    def worker_thread(self):
        """Worker thread - performs batched writes."""
        while True:
            batch = []
            deadline = time.time() + self.flush_interval

            try:
                # Collect a batch until it is full or the flush deadline passes
                while len(batch) < self.batch_size and time.time() < deadline:
                    timeout = max(0.1, deadline - time.time())
                    try:
                        table, record = self.queue.get(timeout=timeout)
                        batch.append((table, record))
                    except queue.Empty:
                        break

                # Group records by table
                table_groups = {}
                for table, record in batch:
                    table_groups.setdefault(table, []).append(record)

                # Write each group in one batch
                for table, records in table_groups.items():
                    self.batch_insert(table, records)

                # Drain a slice of the failed-retry queue
                retry_batch = []
                while not self.failed_queue.empty() and len(retry_batch) < 10:
                    try:
                        retry_batch.append(self.failed_queue.get_nowait())
                    except queue.Empty:
                        break

                if retry_batch:
                    retry_groups = {}
                    for table, record in retry_batch:
                        retry_groups.setdefault(table, []).append(record)

                    for table, records in retry_groups.items():
                        if not self.batch_insert(table, records):
                            # Still failing; dump to a log file instead
                            self._write_failed_log(table, records)

            except Exception as e:
                print_with_timestamp(
                    f"❌ Writer thread error: {e}",
                    color='red'
                )

            time.sleep(0.1)
    
    def _write_failed_log(self, table, records):
        """Dump records that keep failing to a log file."""
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        log_file = f"failed_inserts_{timestamp}.json"

        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                for rec in records:
                    f.write(json.dumps({
                        'table': table,
                        'record': rec,
                        'timestamp': timestamp
                    }, ensure_ascii=False) + '\n')

            print_with_timestamp(
                f"⚠️  Failed records dumped to log: {log_file}",
                color='yellow'
            )
        except Exception as e:
            print_with_timestamp(
                f"❌ Error writing the failure log: {e}",
                color='red'
            )

    def put(self, table, record):
        """Submit a record without blocking."""
        self.queue.put_nowait((table, record))

# Usage example
db_writer = SafeSQLiteWriter('db/dtmgtDb.db', batch_size=50, flush_interval=2)
writer_thread = threading.Thread(target=db_writer.worker_thread, daemon=True)
writer_thread.start()

def cb_insert_result(test_req_result):
    """Callback - hand the result to the safe writer queue."""
    db_writer.put('TestReq', test_req_result)
    lmis_upload_system.post_data(test_req_result)
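
One caveat: worker_thread runs as a daemon, so records still sitting in the in-memory queue are lost when the process exits. A minimal shutdown hook could drain the queue first; this is a sketch against the db_writer instance above, with _flush_on_exit as a hypothetical name:

import atexit

def _flush_on_exit():
    # Drain whatever is still queued and write it in one final pass.
    # The daemon worker may drain items concurrently during shutdown,
    # so duplicate writes are possible, but nothing is silently dropped.
    pending = []
    while not db_writer.queue.empty():
        try:
            pending.append(db_writer.queue.get_nowait())
        except queue.Empty:
            break
    groups = {}
    for table, record in pending:
        groups.setdefault(table, []).append(record)
    for table, records in groups.items():
        db_writer.batch_insert(table, records)

atexit.register(_flush_on_exit)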

Benefits:

  • Eliminates the data loss risk
  • Higher write throughput (batching + WAL mode)
  • Failed records are backed up for recovery

🟡 Medium Risk R-002: LMIS Upload System Memory Usage

Current state:

# dtmgtApp.py:249
class LMISUploadSystem:
    def __init__(self):
        self.retry_queue = queue.PriorityQueue()  # ❌ unbounded queue
        self.executor = ThreadPoolExecutor(max_workers=20)

Risk points:

  1. Unbounded retry queue: records that keep failing pile up without limit
  2. No timeout on the thread pool: hung requests pin worker threads
  3. No persistence for failed records

Impact:

  • Memory leak risk
  • Thread pool exhaustion

Improvement plan:

import datetime
import json
import queue
import threading
import time

import requests
from concurrent.futures import ThreadPoolExecutor

class ImprovedLMISUploadSystem:
    """Improved LMIS upload system - with memory protection."""

    def __init__(self, max_retry_queue_size=1000, request_timeout=30):
        self.retry_queue = queue.PriorityQueue(maxsize=max_retry_queue_size)
        self.executor = ThreadPoolExecutor(
            max_workers=20,
            thread_name_prefix='lmis_upload'
        )
        self.request_timeout = request_timeout
        self.session = requests.Session()

        # Persistence for permanently failed uploads
        self.persistent_fail_log = 'lmis_failed_uploads.jsonl'

        # Metrics
        self.metrics = {
            'total_submitted': 0,
            'total_success': 0,
            'total_failed': 0,
            'retry_queue_size': 0
        }
        self.metrics_lock = threading.Lock()

        self._start_retry_monitor()
        self._start_metrics_reporter()
    
    def _start_metrics_reporter(self):
        """Start the metrics reporting thread."""
        def report():
            while True:
                time.sleep(60)  # report once per minute
                with self.metrics_lock:
                    print_with_timestamp(
                        f"📊 LMIS upload stats: submitted={self.metrics['total_submitted']}, "
                        f"success={self.metrics['total_success']}, "
                        f"failed={self.metrics['total_failed']}, "
                        f"retry queue={self.metrics['retry_queue_size']}",
                        color='cyan'
                    )

        threading.Thread(target=report, daemon=True).start()
    
    def post_data(self, data):
        """Submit data without blocking the caller."""
        with self.metrics_lock:
            self.metrics['total_submitted'] += 1

        future = self.executor.submit(self._upload_data, data, 0)

        # Done callbacks fire after the future completes, so result()
        # returns immediately here; this only surfaces errors that
        # escaped _upload_data's own handling.
        def done_callback(fut):
            try:
                fut.result()
            except Exception as e:
                print_with_timestamp(
                    f"❌ Upload task error: {e}",
                    color='red'
                )

        future.add_done_callback(done_callback)
    
    def _upload_data(self, data, retry_count):
        """Perform the actual upload, with a request timeout."""
        try:
            # Build the request payload (same as the original code)
            item_json = {...}  # omitted
            post_json = {...}

            # A timeout keeps a hung server from pinning the worker thread
            response = self.session.post(
                UPLOAD_LIMS_URL,
                data=json.dumps(post_json),
                headers={'content-type': 'application/json'},
                timeout=self.request_timeout  # ⭐ key improvement
            )

            if response.status_code == 200:
                response_json = response.json()
                code = response_json.get('code')

                if code in (200, 1501):
                    with self.metrics_lock:
                        self.metrics['total_success'] += 1
                    print_with_timestamp(
                        f"✅ Upload succeeded: {data['SN']}",
                        color='green'
                    )
                    return

            raise Exception(f"Upload failed: HTTP {response.status_code}")

        except requests.Timeout:
            print_with_timestamp(
                f"⏱️  Upload timed out ({self.request_timeout}s): {data.get('SN')}",
                color='yellow'
            )
            self._handle_retry(data, retry_count)

        except Exception as e:
            print_with_timestamp(
                f"❌ Upload error: {e}",
                color='red'
            )
            self._handle_retry(data, retry_count)
    
    def _handle_retry(self, data, retry_count):
        """Retry handling, protected against a full queue."""
        if retry_count < UPLOAD_MAX_RETRIES:
            next_retry = time.time() + UPLOAD_RETRY_INTERVAL

            try:
                # Non-blocking enqueue
                self.retry_queue.put_nowait((next_retry, data, retry_count + 1))
                with self.metrics_lock:
                    self.metrics['retry_queue_size'] = self.retry_queue.qsize()
            except queue.Full:
                # Queue full; persist to disk instead
                self._persist_failed_data(data)
                print_with_timestamp(
                    f"⚠️  Retry queue full, record persisted: {data.get('SN')}",
                    color='yellow'
                )
        else:
            # Giving up
            with self.metrics_lock:
                self.metrics['total_failed'] += 1
            self._persist_failed_data(data)
            print_with_timestamp(
                f"❌ Upload abandoned after {UPLOAD_MAX_RETRIES} retries: {data.get('SN')}",
                color='red'
            )

    def _persist_failed_data(self, data):
        """Persist a failed record to the JSONL log."""
        try:
            with open(self.persistent_fail_log, 'a', encoding='utf-8') as f:
                f.write(json.dumps({
                    'data': data,
                    'timestamp': datetime.datetime.now().isoformat(),
                    'reason': 'max_retries_exceeded'
                }, ensure_ascii=False) + '\n')
        except Exception as e:
            print_with_timestamp(
                f"❌ Error persisting a failed record: {e}",
                color='red'
            )

Benefits:

  • Prevents memory leaks
  • Avoids thread pool exhaustion
  • Failed records can be re-imported manually
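
Because permanently failed uploads land in lmis_failed_uploads.jsonl as one JSON object per line, manual re-import can be scripted. A minimal sketch, assuming the running lmis_upload_system instance from the original code and the record layout written by _persist_failed_data; replay_failed_uploads is a hypothetical helper:

import json

def replay_failed_uploads(path='lmis_failed_uploads.jsonl'):
    # Each line holds {'data': ..., 'timestamp': ..., 'reason': ...}
    with open(path, encoding='utf-8') as f:
        for line in f:
            entry = json.loads(line)
            lmis_upload_system.post_data(entry['data'])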

2. Data Loss Risks

🔴 Critical Risk R-003: No Rollover Handling for the Test Result Counter

Current state:

# dtMachineService.py:765
if result.get('value') > station.get("pre_dut_result_counter", 0):
    # plain greater-than check; counter rollover is not handled
    station['last_insert_counter'] = result.get('value')
    self.machine_poll_test_result_bulk(machine, station)

Risk points:

  1. PLC counter rollover: a 32-bit counter wraps to zero after reaching its maximum
  2. Jitter misreads: network delay can deliver a stale value late
  3. Power-loss recovery: pre_dut_result_counter is lost when the host restarts

Impact:

  • Missed test results
  • Duplicate reads

Improvement plan:

import datetime
import json

class CounterManager:
    """Test result counter manager - handles rollover and jitter."""

    def __init__(self, max_counter=0xFFFFFFFF, rollover_threshold=0.9):
        self.max_counter = max_counter
        self.rollover_threshold = int(max_counter * rollover_threshold)
        
    def is_valid_increment(self, current, previous, station_sn):
        """
        Decide whether the counter change is a valid increment.

        Returns:
            (is_valid, reason)
        """
        # Case 1: first read
        if previous == 0:
            return (True, "first_read")

        # Case 2: normal increment
        if current > previous and (current - previous) <= 100:
            return (True, "normal_increment")

        # Case 3: rollover detection
        if previous > self.rollover_threshold and current < (self.max_counter * 0.1):
            rollover_gap = (self.max_counter - previous) + current
            if rollover_gap <= 100:
                print_with_timestamp(
                    f"🔄 Counter rollover detected: {station_sn} {previous} → {current}",
                    color='yellow'
                )
                return (True, "rollover")

        # Case 4: jitter (current below previous, but only slightly)
        if current < previous and (previous - current) <= 5:
            print_with_timestamp(
                f"⚠️  Counter jitter detected (ignored): {station_sn} {current} < {previous}",
                color='yellow'
            )
            return (False, "jitter")

        # Case 5: abnormal jump
        if current > previous and (current - previous) > 100:
            print_with_timestamp(
                f"❌ Abnormal counter jump: {station_sn} {previous} → {current} (delta={current - previous})",
                color='red'
            )
            # Log the anomaly, but still read the data
            self._log_counter_anomaly(station_sn, previous, current)
            return (True, "abnormal_jump")

        # Case 6: counter moved backwards (serious anomaly)
        if current < previous and (previous - current) > 5:
            print_with_timestamp(
                f"🚨 Severe: counter moved backwards: {station_sn} {previous} → {current}",
                color='red'
            )
            self._log_counter_anomaly(station_sn, previous, current)
            return (False, "counter_rollback")

        return (False, "unknown")
    
    def _log_counter_anomaly(self, station_sn, previous, current):
        """Log a counter anomaly."""
        log_entry = {
            'timestamp': datetime.datetime.now().isoformat(),
            'station': station_sn,
            'previous': previous,
            'current': current,
            'delta': current - previous
        }

        try:
            with open('counter_anomaly.log', 'a', encoding='utf-8') as f:
                f.write(json.dumps(log_entry) + '\n')
        except Exception as e:
            print_with_timestamp(f"❌ Error writing the counter anomaly log: {e}", color='red')

# Usage inside dtMachineService
counter_manager = CounterManager()

def machine_poll_enhanced(self, args):
    """Enhanced polling logic."""
    machine = args["machine"]

    for station in machine.get('stations', []):
        if station.get('status') == 'running' and self.dutDirectionMode in [2]:
            result = machine.get('virt_plc_device').read_station_result_counter(station['SN'])

            if result.get('status') == 'success':
                current_counter = result.get('value')
                previous_counter = station.get("pre_dut_result_counter", 0)

                # ⭐ Let the counter manager decide
                is_valid, reason = counter_manager.is_valid_increment(
                    current_counter,
                    previous_counter,
                    station['SN']
                )

                if is_valid:
                    # Record to the monitoring table
                    insert_record = {
                        "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "machine": machine.get('SN'),
                        'station': station.get('SN'),
                        'current_counter': current_counter,
                        'pre_counter': previous_counter,
                        'reason': reason,
                        'status': 1
                    }
                    self.insertTableRecords('get_counter', [insert_record])

                    # Update the cached counter
                    station['last_insert_counter'] = current_counter

                    # Read the test results
                    self.machine_poll_test_result_bulk(machine, station)
                else:
                    # Record the rejected read
                    insert_record = {
                        "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "machine": machine.get('SN'),
                        'station': station.get('SN'),
                        'current_counter': current_counter,
                        'pre_counter': previous_counter,
                        'reason': reason,
                        'status': 0  # invalid
                    }
                    self.insertTableRecords('get_counter', [insert_record])

Benefits:

  • Correctly handles 32-bit counter rollover
  • Detects and filters out jitter
  • Anomalies leave a traceable log
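
A quick worked example of the rollover branch: with previous = 0xFFFFFFF0 and current = 5, the wrap gap is (0xFFFFFFFF - 0xFFFFFFF0) + 5 = 20, within the 100-step window, so the read is accepted (the class logs via the project's print_with_timestamp helper):

cm = CounterManager()
# Exercises case 3: previous sits near the 32-bit ceiling, current has just wrapped
print(cm.is_valid_increment(5, 0xFFFFFFF0, 'ST-01'))  # -> (True, 'rollover')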

🟡 Medium Risk R-004: Flawed Bulk-Read Index Management

Current state:

# dtMachineService.py:380
station['result_index_read'] = (station['result_index_read'] + TestReqResult_ItemCountOnceRead) % 10

Risk points:

  1. Fixed increment: assumes every read succeeds
  2. No rollback on failure: the index advances even when a read fails
  3. Out of sync with the PLC: the PLC's write pointer is never consulted

Improvement plan:

import time
from collections import deque

class ResultIndexManager:
    """Test result index manager."""

    def __init__(self, buffer_size=10, item_size=1):
        self.buffer_size = buffer_size
        self.item_size = item_size
        self.read_history = deque(maxlen=100)  # keep the last 100 reads

    def calculate_next_index(self, current_index, read_success, plc_write_index=None):
        """
        Compute the next read index.

        Args:
            current_index: index used for this read
            read_success: whether this read succeeded
            plc_write_index: the PLC's current write index (optional)
        """
        if not read_success:
            # Read failed; stay on the current index
            self.read_history.append({
                'timestamp': time.time(),
                'index': current_index,
                'success': False
            })
            return current_index

        # Read succeeded; advance
        next_index = (current_index + self.item_size) % self.buffer_size

        # If the PLC write pointer is known, avoid overtaking it
        if plc_write_index is not None:
            if next_index == plc_write_index:
                print_with_timestamp(
                    f"⚠️  Read pointer caught up with the write pointer, holding: read={next_index}, write={plc_write_index}",
                    color='yellow'
                )
                next_index = current_index

        self.read_history.append({
            'timestamp': time.time(),
            'index': current_index,
            'success': True,
            'next': next_index
        })

        return next_index

    def get_read_statistics(self):
        """Return read statistics."""
        if not self.read_history:
            return {}

        total = len(self.read_history)
        success_count = sum(1 for r in self.read_history if r['success'])

        return {
            'total_reads': total,
            'success_rate': success_count / total,
            'recent_10': list(self.read_history)[-10:]
        }

# Usage example
index_manager = ResultIndexManager(buffer_size=10, item_size=1)

def machine_poll_test_result_bulk_enhanced(self, machine, station):
    """Enhanced bulk test result read."""
    registers_len = PLCTestReqResultData.required_registers_size * TestReqResult_ItemCountOnceRead

    if station.get('result_index_read') is None:
        station['result_index_read'] = 0

    current_index = station['result_index_read']

    try:
        result = machine.get('virt_plc_device').read_station_cyclesFinishedBulk(
            station['SN'],
            current_index,
            registers_len
        )

        read_success = (result.get('status') == 'success')

        if read_success:
            # Process the data
            self.handle_test_result_registers_value(
                machine,
                station,
                result.get('value'),
                TestReqResult_ItemCountOnceRead
            )

        # ⭐ Let the index manager compute the next index
        station['result_index_read'] = index_manager.calculate_next_index(
            current_index,
            read_success,
            plc_write_index=None  # pass the PLC write pointer here if a register exposes it
        )

    except Exception as error:
        print_with_timestamp(
            f"❌ Bulk read error: {error}",
            color='red'
        )
        # On failure the index stays put
        station['result_index_read'] = current_index
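
The read history also makes the poll loop observable. A small illustrative use of the get_read_statistics helper above, e.g. from a periodic status task:

stats = index_manager.get_read_statistics()
if stats:
    print_with_timestamp(
        f"📊 Bulk read success rate: {stats['success_rate']:.2%} over {stats['total_reads']} reads",
        color='cyan'
    )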

3. Performance Bottleneck Risks

🟡 Medium Risk R-005: Overly Coarse Serial Communication Lock

Current state:

# modbus_server.py:73
with self.plc_comm_lock:  # ❌ the entire read/write path runs under the lock
    resp = plc_device.plc_read_bits(...)

Risk points:

  1. Coarse lock scope: data parsing also runs under the lock
  2. Long blocking: a serial timeout holds the lock for its full duration
  3. Poor concurrency: multiple devices are serialized

Improvement plan:

# Improvement: lock only the serial I/O
class OptimizedMitsubishiPLC:
    def __init__(self, plc_port, comm_config):
        self.ser = serial.Serial(...)
        self.io_lock = threading.Lock()  # guards I/O only

    def plc_read_words(self, plc_no, pc_no, address, length):
        # 1. Build the frame (no lock)
        frame = self.plc_read_frame('WR', plc_no, pc_no, address, length)

        # 2. Serial I/O (locked)
        with self.io_lock:
            self.ser.write(frame)
            recv_frame = self.ser.read(expected_size)  # expected_size from the protocol frame layout

        # 3. Parse the response (no lock)
        if not self.frame_is_valid(recv_frame):
            return {'status': 'error'}

        value = [ascii_array_to_word(recv_frame[5 + 4 * i:5 + 4 * (i + 1)]) for i in range(length)]
        return {'status': 'success', 'data': value}
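
The point of the split is that frame construction and response parsing are pure CPU work, so only the write/read pair against the physical port needs mutual exclusion. And because io_lock lives on the device instance, each serial port gets its own lock, so independent PLCs no longer serialize against each other.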

Benefits:

  • Roughly 30% higher concurrent throughput (estimated)
  • Shorter lock wait times

🟡 Medium Risk R-006: Database Queries Lack Indexes

Current state:

# api_route.py:425
select_sql_str = (
    f'SELECT * from TestReq WHERE TestReq.id > 0 {sql_condition_str}'
)
# ❌ no index backs the time-range query

Risk points:

  • Full table scans make queries slow
  • Database CPU spikes under concurrent load

Improvement plan:

-- Create composite indexes
CREATE INDEX IF NOT EXISTS idx_testreq_endtime_sn 
ON TestReq(endTime, SN);

CREATE INDEX IF NOT EXISTS idx_testreq_status_endtime 
ON TestReq(status, endTime);

CREATE INDEX IF NOT EXISTS idx_dutlist_stationassigned 
ON dutList(stationAssigned);

-- Inspect the query plan
EXPLAIN QUERY PLAN 
SELECT * FROM TestReq WHERE endTime BETWEEN '2024-01-01' AND '2024-12-31';
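
Since CREATE INDEX IF NOT EXISTS is idempotent, these statements can simply run at application startup. A minimal sketch, reusing the database path from R-001; ensure_indexes is a hypothetical helper:

import sqlite3

def ensure_indexes(db_path='db/dtmgtDb.db'):
    # Safe to call on every startup: existing indexes are left untouched.
    with sqlite3.connect(db_path) as conn:
        conn.execute("CREATE INDEX IF NOT EXISTS idx_testreq_endtime_sn ON TestReq(endTime, SN)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_testreq_status_endtime ON TestReq(status, endTime)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_dutlist_stationassigned ON dutList(stationAssigned)")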

4. Concurrency Safety Risks

🔴 Critical Risk R-007: Insufficient Concurrency Control on busy_flag

Current state:

# dtMachineService.py:632
if busy_flag is False:  # ❌ check-then-act is not atomic
    # multiple threads can pass this check at the same time

Risk points:

  • Multiple threads write PLC registers at the same time
  • Data races leave machine state inconsistent
Improvement plan:

import threading
import time

class MachineOperationLock:
    """Machine operation lock - with timeout and hold-time diagnostics."""

    def __init__(self, timeout=5):
        self._lock = threading.RLock()  # reentrant lock
        self._timeout = timeout
        self._holder = None
        self._acquire_time = None

    def acquire(self, blocking=True, timeout=None):
        """Acquire the lock."""
        timeout = timeout or self._timeout
        acquired = self._lock.acquire(blocking=blocking, timeout=timeout)

        if acquired:
            self._holder = threading.current_thread().name
            self._acquire_time = time.time()
        else:
            print_with_timestamp(
                f"⚠️  Timed out acquiring the machine lock ({timeout}s), current holder: {self._holder}",
                color='yellow'
            )

        return acquired

    def release(self):
        """Release the lock."""
        hold_duration = time.time() - self._acquire_time if self._acquire_time else 0

        if hold_duration > 2:
            print_with_timestamp(
                f"⚠️  Lock held too long: {hold_duration:.2f}s by {self._holder}",
                color='yellow'
            )

        self._holder = None
        self._acquire_time = None
        self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# Usage example
machine_lock = MachineOperationLock(timeout=5)

@app.route('/machine_control', methods=['POST'])
def machine_control():
    if not machine_lock.acquire(timeout=5):
        return jsonify({"status": "error", "message": "System busy, please retry later"})

    try:
        # Machine control logic goes here
        ...
    finally:
        machine_lock.release()
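
One note on the sketch: __enter__ ignores the boolean returned by acquire(), so a timed-out acquisition inside a with block would silently proceed without holding the lock. The explicit acquire/try/finally pattern in machine_control avoids this; if the context-manager form is preferred, __enter__ should raise when acquire() returns False.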

5. Exception Handling Risks

🟡 Medium Risk R-008: Swallowed PLC Communication Exceptions

Current state:

# dtMachineService.py:666
except:
    pass  # ❌ bare except swallows every exception

Risk points:

  • Hides serious errors (e.g. out-of-memory)
  • Makes problems hard to diagnose

Improvement plan:

import logging
import traceback

import serial  # pyserial, for serial.SerialException below
from pymodbus.exceptions import ModbusException  # assumption: adjust to the project's actual Modbus library

# Configure logging
logging.basicConfig(
    filename='dtmgt_error.log',
    level=logging.ERROR,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

def machine_poll_safe(self, args):
    """Polling logic with structured exception handling."""
    machine = args["machine"]

    try:
        # Read the PLC connection state
        result = machine.get('virt_plc_device').read_plc_connect_state()

        if result.get('status') == 'success':
            self.handle_status_sync_with_plc(machine, result.get('value'))

    except KeyboardInterrupt:
        # Let Ctrl+C through
        raise

    except ModbusException as e:
        # Modbus protocol error
        print_with_timestamp(
            f"❌ Modbus communication error: {machine.get('SN')} - {e}",
            color='red'
        )
        logging.error(f"Modbus error: {machine.get('SN')}", exc_info=True)

    except serial.SerialException as e:
        # Serial port error
        print_with_timestamp(
            f"❌ Serial port error: {machine.get('com')} - {e}",
            color='red'
        )
        logging.error(f"Serial error: {machine.get('com')}", exc_info=True)

        # Try to reconnect
        try:
            machine['virt_plc_device'].connect_to_modbus_server(
                self.modbusServer,
                self.modbusServerPort
            )
        except Exception as reconnect_error:
            logging.error(f"Reconnect failed: {reconnect_error}")

    except Exception as e:
        # Unknown error
        print_with_timestamp(
            f"🚨 Unexpected error: {type(e).__name__} - {e}",
            color='red'
        )
        logging.critical(
            f"Unexpected error: {machine.get('SN')}",
            exc_info=True
        )

        # Optionally send an alert
        # send_alert_email(subject="System error", body=traceback.format_exc())

6. Improvement Priorities

P0 (Urgent - Data Safety)

| ID | Problem | Improvement | Expected Benefit |
| --- | --- | --- | --- |
| R-001 | SQLite concurrent writes | SafeSQLiteWriter class | Eliminates data loss |
| R-003 | Counter rollover check | CounterManager class | Avoids missed/duplicate reads |
| R-007 | busy_flag race | MachineOperationLock | Prevents conflicting PLC writes |

P1 (Important - Stability)

| ID | Problem | Improvement | Expected Benefit |
| --- | --- | --- | --- |
| R-002 | LMIS memory leak | Bounded queue + persistence | Prevents OOM |
| R-004 | Result index management | ResultIndexManager | Improves reliability |
| R-008 | Swallowed exceptions | Structured exception handling | Observability |

P2 (Optimization - Performance)

| ID | Problem | Improvement | Expected Benefit |
| --- | --- | --- | --- |
| R-005 | Serial lock granularity | Narrower lock scope | ~30% more concurrency |
| R-006 | Database indexes | Composite indexes | ~10x faster queries |

7. Monitoring and Alerting Recommendations

7.1 Key Metric Monitoring

class SystemMonitor:
    """System health monitoring."""

    def __init__(self):
        self.metrics = {
            'plc_read_success_rate': 0,
            'plc_read_latency_p99': 0,
            'db_write_success_rate': 0,
            'lmis_upload_success_rate': 0,
            'queue_depth': {
                'db_write': 0,
                'lmis_upload': 0,
                'websocket': 0
            }
        }

    def report_metrics(self):
        """Report metrics once per minute."""
        print_with_timestamp(
            f"📊 System metrics: "
            f"PLC read success rate={self.metrics['plc_read_success_rate']:.2%}, "
            f"latency P99={self.metrics['plc_read_latency_p99']:.0f}ms, "
            f"DB write success rate={self.metrics['db_write_success_rate']:.2%}, "
            f"LMIS upload success rate={self.metrics['lmis_upload_success_rate']:.2%}",
            color='cyan'
        )
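
report_metrics still needs something to invoke it. A minimal scheduling sketch using a daemon thread, mirroring the reporter in R-002; start_system_monitor is a hypothetical name:

import threading
import time

def start_system_monitor(monitor, interval=60):
    # Call report_metrics once per interval from a background thread.
    def loop():
        while True:
            time.sleep(interval)
            monitor.report_metrics()
    threading.Thread(target=loop, daemon=True).start()

start_system_monitor(SystemMonitor())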

7.2 Alerting Rules

| Metric | Threshold | Alert Level |
| --- | --- | --- |
| PLC read success rate | <95% | WARNING |
| PLC read latency P99 | >500ms | WARNING |
| Database write failure rate | >1% | CRITICAL |
| LMIS upload retry queue depth | >500 | WARNING |
| Memory usage | >80% | WARNING |
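
The first three rules map directly onto scalar SystemMonitor metrics; the queue-depth and memory rules need their own probes. A hedged sketch of a rule table and check loop, where ALERT_RULES and evaluate_alerts are illustrative names and the write-failure rule inverts the success rate from section 7.1:

# (metric key, breach predicate, alert level) - thresholds from the table above
ALERT_RULES = [
    ('plc_read_success_rate', lambda v: v < 0.95, 'WARNING'),
    ('plc_read_latency_p99', lambda v: v > 500, 'WARNING'),
    ('db_write_success_rate', lambda v: (1 - v) > 0.01, 'CRITICAL'),
]

def evaluate_alerts(monitor):
    # Compare the current metric snapshot against each rule.
    for name, breached, level in ALERT_RULES:
        value = monitor.metrics.get(name)
        if value is not None and breached(value):
            print_with_timestamp(f"🚨 [{level}] {name} = {value}", color='red')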

Document version: v1.0 · Last updated: 2025-11-19 · Next step: implement the P0-level improvements