Digital Twin Drop Tester Monitoring System - Risk Analysis and Improvement Recommendations
1. Architecture-Level Risks
🔴 Critical Risk R-001: SQLite Concurrent Write Limits
Current state:
# dtmgtApp.py:373
def run_insert_result():
    while True:
        insert_records = []
        while not ws_message_send_queue.empty():  # ❌ Bug: should poll result_to_localDB_queue
            result_row = result_to_localDB_queue.get()
            insert_records.append(result_records)  # ❌ result_records is undefined; should be result_row
Risk points:
- Logic bug: the loop checks ws_message_send_queue but dequeues from result_to_localDB_queue
- SQLite concurrency limits: simultaneous writes from multiple threads raise SQLITE_BUSY errors
- No retry mechanism for failed writes
Impact:
- Data loss under high-concurrency load
- Test results are never persisted
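Before the fuller rewrite below, note that the queue mismatch itself is a one-line fix. A minimal in-place sketch (insert_table_records stands in for whatever bulk-insert helper the loop already calls):

def run_insert_result():
    while True:
        insert_records = []
        while not result_to_localDB_queue.empty():  # poll the queue we actually drain
            result_row = result_to_localDB_queue.get()
            insert_records.append(result_row)       # append the dequeued row, not result_records
        if insert_records:
            insert_table_records('TestReq', insert_records)  # hypothetical bulk-insert helper
        time.sleep(0.5)  # avoid a busy loop while the queue is empty

This still leaves the SQLite concurrency and retry gaps, which the rewrite below addresses.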
Proposed improvement:
# Improved version - adds a retry mechanism and error handling.
# Sketch: print_with_timestamp is the project's existing logging helper.
import datetime
import json
import queue
import sqlite3
import threading
import time
from contextlib import contextmanager

# Global configuration
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 0.5  # seconds

class SafeSQLiteWriter:
    def __init__(self, db_path, batch_size=50, flush_interval=2):
        self.db_path = db_path
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = queue.Queue()
        self.failed_queue = queue.Queue()  # retry queue for failed writes

    @contextmanager
    def get_connection(self):
        """Context manager that yields a database connection."""
        conn = sqlite3.connect(self.db_path, timeout=10.0)
        conn.execute("PRAGMA journal_mode=WAL")    # WAL mode improves concurrency
        conn.execute("PRAGMA synchronous=NORMAL")  # balances performance and safety
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def batch_insert(self, table, records, retry_count=0):
        """Bulk insert with a retry mechanism."""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                if not records:
                    return True
                # Build the bulk INSERT statement dynamically
                fields = sorted(records[0].keys())
                placeholders = ','.join(['?'] * len(fields))
                sql = f"INSERT INTO {table} ({','.join(fields)}) VALUES ({placeholders})"
                # Prepare the parameters
                params = [tuple(rec.get(field) for field in fields) for rec in records]
                # Execute as one batch
                cursor.executemany(sql, params)
                print_with_timestamp(
                    f"✅ Wrote {len(records)} records to {table}",
                    color='green'
                )
                return True
        except sqlite3.OperationalError as e:
            if "locked" in str(e).lower() and retry_count < MAX_RETRY_ATTEMPTS:
                # Database locked; back off and retry
                print_with_timestamp(
                    f"⚠️ Database locked, retrying in {RETRY_DELAY}s (attempt {retry_count + 1})",
                    color='yellow'
                )
                time.sleep(RETRY_DELAY)
                return self.batch_insert(table, records, retry_count + 1)
            else:
                print_with_timestamp(
                    f"❌ Bulk insert failed: {e}, moving records to the failure queue",
                    color='red'
                )
                # Queue for a later retry
                for rec in records:
                    self.failed_queue.put((table, rec))
                return False
        except Exception as e:
            print_with_timestamp(
                f"❌ Bulk insert error: {e}",
                color='red'
            )
            return False

    def worker_thread(self):
        """Worker thread that performs batched writes."""
        while True:
            batch = []
            deadline = time.time() + self.flush_interval
            try:
                # Collect a batch
                while len(batch) < self.batch_size and time.time() < deadline:
                    timeout = max(0.1, deadline - time.time())
                    try:
                        table, record = self.queue.get(timeout=timeout)
                        batch.append((table, record))
                    except queue.Empty:
                        break
                # Group records by table
                table_groups = {}
                for table, record in batch:
                    table_groups.setdefault(table, []).append(record)
                # Write each group as one batch
                for table, records in table_groups.items():
                    self.batch_insert(table, records)
                # Drain part of the failure queue and retry
                retry_batch = []
                while not self.failed_queue.empty() and len(retry_batch) < 10:
                    try:
                        retry_batch.append(self.failed_queue.get_nowait())
                    except queue.Empty:
                        break
                if retry_batch:
                    retry_groups = {}
                    for table, record in retry_batch:
                        retry_groups.setdefault(table, []).append(record)
                    for table, records in retry_groups.items():
                        if not self.batch_insert(table, records):
                            # Still failing; dump to a log file
                            # (note: on OperationalError batch_insert re-queues the
                            # records itself, so this log entry may duplicate a retry)
                            self._write_failed_log(table, records)
            except Exception as e:
                print_with_timestamp(
                    f"❌ Writer thread error: {e}",
                    color='red'
                )
                time.sleep(0.1)

    def _write_failed_log(self, table, records):
        """Append persistently failing records to a log file."""
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        log_file = f"failed_inserts_{timestamp}.json"
        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                for rec in records:
                    f.write(json.dumps({
                        'table': table,
                        'record': rec,
                        'timestamp': timestamp
                    }, ensure_ascii=False) + '\n')
            print_with_timestamp(
                f"⚠️ Failed records written to log: {log_file}",
                color='yellow'
            )
        except Exception as e:
            print_with_timestamp(
                f"❌ Error writing the failure log: {e}",
                color='red'
            )

    def put(self, table, record):
        """Submit a record without blocking."""
        self.queue.put_nowait((table, record))

# Usage example
db_writer = SafeSQLiteWriter('db/dtmgtDb.db', batch_size=50, flush_interval=2)
writer_thread = threading.Thread(target=db_writer.worker_thread, daemon=True)
writer_thread.start()

def cb_insert_result(test_req_result):
    """Callback: submit to the safe write queue."""
    db_writer.put('TestReq', test_req_result)
    lmis_upload_system.post_data(test_req_result)
Benefits:
- ✅ Eliminates the data-loss risk
- ✅ Higher write throughput (batching + WAL mode)
- ✅ Failed records are backed up for recovery
🟡 Medium Risk R-002: LMIS Upload System Memory Growth
Current state:
# dtmgtApp.py:249
class LMISUploadSystem:
    def __init__(self):
        self.retry_queue = queue.PriorityQueue()  # ❌ unbounded queue
        self.executor = ThreadPoolExecutor(max_workers=20)
Risk points:
- Unbounded retry queue: persistently failing payloads pile up without limit
- No thread-pool timeout: hung requests pin worker threads
- No persistence for failed data
Impact:
- Memory-leak risk
- Thread pool exhaustion
Proposed improvement:
# Sketch: UPLOAD_LIMS_URL, UPLOAD_MAX_RETRIES, UPLOAD_RETRY_INTERVAL and
# print_with_timestamp are assumed to come from the existing module.
import datetime
import json
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests

class ImprovedLMISUploadSystem:
    """Improved LMIS upload system with memory protection."""

    def __init__(self, max_retry_queue_size=1000, request_timeout=30):
        self.retry_queue = queue.PriorityQueue(maxsize=max_retry_queue_size)
        self.executor = ThreadPoolExecutor(
            max_workers=20,
            thread_name_prefix='lmis_upload'
        )
        self.request_timeout = request_timeout
        self.session = requests.Session()
        # Persistence for failed uploads
        self.persistent_fail_log = 'lmis_failed_uploads.jsonl'
        # Metrics
        self.metrics = {
            'total_submitted': 0,
            'total_success': 0,
            'total_failed': 0,
            'retry_queue_size': 0
        }
        self.metrics_lock = threading.Lock()
        self._start_retry_monitor()  # same retry-drain loop as the original class (not shown)
        self._start_metrics_reporter()

    def _start_metrics_reporter(self):
        """Start the metrics reporting thread."""
        def report():
            while True:
                time.sleep(60)  # report once per minute
                with self.metrics_lock:
                    print_with_timestamp(
                        f"📊 LMIS upload stats: submitted={self.metrics['total_submitted']}, "
                        f"succeeded={self.metrics['total_success']}, "
                        f"failed={self.metrics['total_failed']}, "
                        f"retry queue={self.metrics['retry_queue_size']}",
                        color='cyan'
                    )
        threading.Thread(target=report, daemon=True).start()

    def post_data(self, data):
        """Submit data; the request timeout is enforced inside _upload_data."""
        with self.metrics_lock:
            self.metrics['total_submitted'] += 1
        future = self.executor.submit(self._upload_data, data, 0)

        # A done callback only fires after the task finishes, so it cannot
        # enforce a timeout itself; here it just surfaces unexpected errors.
        def log_result(fut):
            try:
                fut.result()
            except Exception as e:
                print_with_timestamp(
                    f"❌ Upload task error: {e}",
                    color='red'
                )
        future.add_done_callback(log_result)

    def _upload_data(self, data, retry_count):
        """Actual upload logic, with a request timeout."""
        try:
            # Build the request (same as the original code)
            item_json = {...}  # omitted
            post_json = {...}
            # Timeout protection
            response = self.session.post(
                UPLOAD_LIMS_URL,
                data=json.dumps(post_json),
                headers={'content-type': 'application/json'},
                timeout=self.request_timeout  # ⭐ key improvement
            )
            if response.status_code == 200:
                response_json = response.json()
                code = response_json.get('code')
                if code in (200, 1501):
                    with self.metrics_lock:
                        self.metrics['total_success'] += 1
                    print_with_timestamp(
                        f"✅ Upload succeeded: {data['SN']}",
                        color='green'
                    )
                    return
            raise Exception(f"Upload failed: HTTP {response.status_code}")
        except requests.Timeout:
            print_with_timestamp(
                f"⏱️ Upload timed out ({self.request_timeout}s): {data.get('SN')}",
                color='yellow'
            )
            self._handle_retry(data, retry_count)
        except Exception as e:
            print_with_timestamp(
                f"❌ Upload error: {e}",
                color='red'
            )
            self._handle_retry(data, retry_count)

    def _handle_retry(self, data, retry_count):
        """Retry handling, with queue-full protection."""
        if retry_count < UPLOAD_MAX_RETRIES:
            next_retry = time.time() + UPLOAD_RETRY_INTERVAL
            try:
                # Non-blocking enqueue
                self.retry_queue.put_nowait((next_retry, data, retry_count + 1))
                with self.metrics_lock:
                    self.metrics['retry_queue_size'] = self.retry_queue.qsize()
            except queue.Full:
                # Queue full; persist to file instead
                self._persist_failed_data(data)
                print_with_timestamp(
                    f"⚠️ Retry queue full, data persisted: {data.get('SN')}",
                    color='yellow'
                )
        else:
            # Final failure
            with self.metrics_lock:
                self.metrics['total_failed'] += 1
            self._persist_failed_data(data)
            print_with_timestamp(
                f"❌ Giving up after {UPLOAD_MAX_RETRIES} retries: {data.get('SN')}",
                color='red'
            )

    def _persist_failed_data(self, data):
        """Persist failed payloads."""
        try:
            with open(self.persistent_fail_log, 'a', encoding='utf-8') as f:
                f.write(json.dumps({
                    'data': data,
                    'timestamp': datetime.datetime.now().isoformat(),
                    'reason': 'max_retries_exceeded'
                }, ensure_ascii=False) + '\n')
        except Exception as e:
            print_with_timestamp(
                f"❌ Error persisting failed data: {e}",
                color='red'
            )
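A minimal usage sketch (mirroring the existing global lmis_upload_system; the instance swap is the only change callers see, and the payload below is hypothetical):

# Usage sketch - drop-in replacement for the existing global instance
lmis_upload_system = ImprovedLMISUploadSystem(
    max_retry_queue_size=1000,  # bounds memory even if LMIS stays down
    request_timeout=30
)
lmis_upload_system.post_data({'SN': 'DUT-0001'})  # hypothetical payload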
Benefits:
- ✅ Prevents memory leaks
- ✅ Avoids thread pool exhaustion
- ✅ Failed data can be re-imported manually
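The last benefit assumes someone actually re-imports the persisted failures; a small replay helper could close that loop (a sketch; the file format matches _persist_failed_data above):

import json
import os

def replay_failed_uploads(upload_system, log_path='lmis_failed_uploads.jsonl'):
    """Re-submit payloads that were persisted after exhausting retries."""
    if not os.path.exists(log_path):
        return 0
    count = 0
    with open(log_path, 'r', encoding='utf-8') as f:
        for line in f:
            entry = json.loads(line)
            upload_system.post_data(entry['data'])  # re-enters the normal retry path
            count += 1
    os.rename(log_path, log_path + '.replayed')  # keep the original file as an audit copy
    return count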
2. Data Loss Risks
🔴 Critical Risk R-003: No Handling of Test Result Counter Rollover
Current state:
# dtMachineService.py:765
if result.get('value') > station.get("pre_dut_result_counter", 0):
    # bare greater-than check; counter rollover is never handled
    station['last_insert_counter'] = result.get('value')
    self.machine_poll_test_result_bulk(machine, station)
Risk points:
- PLC counter rollover: a 32-bit counter wraps back to zero after hitting its maximum
- Jitter misreads: network delay can deliver stale values late
- Power-loss recovery: pre_dut_result_counter is lost when the host restarts
Impact:
- Missed test results
- Duplicate reads of the same data
Proposed improvement:
# Sketch: print_with_timestamp and the service methods below come from the project.
import datetime
import json

class CounterManager:
    """Test-result counter manager: handles rollover and jitter."""

    def __init__(self, max_counter=0xFFFFFFFF, rollover_threshold=0.9):
        self.max_counter = max_counter
        self.rollover_threshold = int(max_counter * rollover_threshold)

    def is_valid_increment(self, current, previous, station_sn):
        """
        Decide whether a counter change is a valid increment.
        Returns:
            (is_valid, reason)
        """
        # Case 1: first read
        if previous == 0:
            return (True, "first_read")
        # Case 2: normal increment
        if current > previous and (current - previous) <= 100:
            return (True, "normal_increment")
        # Case 3: rollover detection
        if previous > self.rollover_threshold and current < (self.max_counter * 0.1):
            rollover_gap = (self.max_counter - previous) + current
            if rollover_gap <= 100:
                print_with_timestamp(
                    f"🔄 Counter rollover detected: {station_sn} {previous} → {current}",
                    color='yellow'
                )
                return (True, "rollover")
        # Case 4: jitter (current slightly below the previous value)
        if current < previous and (previous - current) <= 5:
            print_with_timestamp(
                f"⚠️ Counter jitter detected (ignored): {station_sn} {current} < {previous}",
                color='yellow'
            )
            return (False, "jitter")
        # Case 5: abnormal jump
        if current > previous and (current - previous) > 100:
            print_with_timestamp(
                f"❌ Abnormal counter jump: {station_sn} {previous} → {current} (delta={current - previous})",
                color='red'
            )
            # Log the anomaly but still read the data
            self._log_counter_anomaly(station_sn, previous, current)
            return (True, "abnormal_jump")
        # Case 6: counter moved backwards (serious anomaly)
        if current < previous and (previous - current) > 5:
            print_with_timestamp(
                f"🚨 Severe: counter moved backwards: {station_sn} {previous} → {current}",
                color='red'
            )
            self._log_counter_anomaly(station_sn, previous, current)
            return (False, "counter_rollback")
        return (False, "unknown")

    def _log_counter_anomaly(self, station_sn, previous, current):
        """Record a counter anomaly."""
        log_entry = {
            'timestamp': datetime.datetime.now().isoformat(),
            'station': station_sn,
            'previous': previous,
            'current': current,
            'delta': current - previous
        }
        try:
            with open('counter_anomaly.log', 'a', encoding='utf-8') as f:
                f.write(json.dumps(log_entry) + '\n')
        except Exception as e:
            print_with_timestamp(f"❌ Error writing counter anomaly log: {e}", color='red')

# Used inside dtMachineService
counter_manager = CounterManager()

def machine_poll_enhanced(self, args):
    """Enhanced polling logic (intended as a dtMachineService method)."""
    machine = args["machine"]
    for station in machine.get('stations', []):
        if station.get('status') == 'running' and self.dutDirectionMode in [2]:
            result = machine.get('virt_plc_device').read_station_result_counter(station['SN'])
            if result.get('status') == 'success':
                current_counter = result.get('value')
                previous_counter = station.get("pre_dut_result_counter", 0)
                # ⭐ Let the counter manager decide
                is_valid, reason = counter_manager.is_valid_increment(
                    current_counter,
                    previous_counter,
                    station['SN']
                )
                if is_valid:
                    # Record to the monitoring table
                    insert_record = {
                        "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "machine": machine.get('SN'),
                        'station': station.get('SN'),
                        'current_counter': current_counter,
                        'pre_counter': previous_counter,
                        'reason': reason,
                        'status': 1
                    }
                    self.insertTableRecords('get_counter', [insert_record])
                    # Update the cached counters (pre_dut_result_counter must advance,
                    # otherwise the next comparison reuses a stale value)
                    station['pre_dut_result_counter'] = current_counter
                    station['last_insert_counter'] = current_counter
                    # Read the test results
                    self.machine_poll_test_result_bulk(machine, station)
                else:
                    # Record the rejected reading
                    insert_record = {
                        "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "machine": machine.get('SN'),
                        'station': station.get('SN'),
                        'current_counter': current_counter,
                        'pre_counter': previous_counter,
                        'reason': reason,
                        'status': 0  # invalid
                    }
                    self.insertTableRecords('get_counter', [insert_record])
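The classification logic is easy to sanity-check in isolation; a quick demonstration (assuming print_with_timestamp is importable in the test context; values are chosen to sit inside the rollover window):

# Quick check of the classification logic
cm = CounterManager()
print(cm.is_valid_increment(42, 41, 'ST01'))              # (True, 'normal_increment')
print(cm.is_valid_increment(5, 0xFFFFFFFF - 10, 'ST01'))  # (True, 'rollover'): gap = 15
print(cm.is_valid_increment(39, 41, 'ST01'))              # (False, 'jitter')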
Benefits:
- ✅ Correctly handles 32-bit counter rollover
- ✅ Detects and filters jitter
- ✅ Anomalous readings leave an audit trail
🟡 Medium Risk R-004: Flawed Index Management for Bulk Reads
Current state:
# dtMachineService.py:380
station['result_index_read'] = (station['result_index_read'] + TestReqResult_ItemCountOnceRead) % 10
Risk points:
- Fixed increment: assumes every read succeeds
- No rollback on failure: the index still advances when a read fails
- Out of sync with the PLC: the PLC write pointer is never consulted
Proposed improvement:
# Sketch: print_with_timestamp is the project helper; PLCTestReqResultData and
# TestReqResult_ItemCountOnceRead are existing module-level names.
import time
from collections import deque

class ResultIndexManager:
    """Test-result read-index manager."""

    def __init__(self, buffer_size=10, item_size=1):
        self.buffer_size = buffer_size
        self.item_size = item_size
        self.read_history = deque(maxlen=100)  # keep the last 100 reads

    def calculate_next_index(self, current_index, read_success, plc_write_index=None):
        """
        Compute the next read index.
        Args:
            current_index: index used for this read
            read_success: whether this read succeeded
            plc_write_index: the PLC's current write index (optional)
        """
        if not read_success:
            # Read failed; hold the index
            self.read_history.append({
                'timestamp': time.time(),
                'index': current_index,
                'success': False
            })
            return current_index
        # Read succeeded; advance
        next_index = (current_index + self.item_size) % self.buffer_size
        # If the PLC write pointer is known, avoid overrunning it
        if plc_write_index is not None:
            if next_index == plc_write_index:
                print_with_timestamp(
                    f"⚠️ Read pointer caught up with write pointer, holding: read={next_index}, write={plc_write_index}",
                    color='yellow'
                )
                next_index = current_index
        self.read_history.append({
            'timestamp': time.time(),
            'index': current_index,
            'success': True,
            'next': next_index
        })
        return next_index

    def get_read_statistics(self):
        """Read statistics over the recent history."""
        if not self.read_history:
            return {}
        total = len(self.read_history)
        success_count = sum(1 for r in self.read_history if r['success'])
        return {
            'total_reads': total,
            'success_rate': success_count / total,
            'recent_10': list(self.read_history)[-10:]
        }

# Usage example
index_manager = ResultIndexManager(buffer_size=10, item_size=1)

def machine_poll_test_result_bulk_enhanced(self, machine, station):
    """Enhanced bulk test-result read (intended as a dtMachineService method)."""
    registers_len = PLCTestReqResultData.required_registers_size * TestReqResult_ItemCountOnceRead
    if station.get('result_index_read') is None:
        station['result_index_read'] = 0
    current_index = station['result_index_read']
    try:
        result = machine.get('virt_plc_device').read_station_cyclesFinishedBulk(
            station['SN'],
            current_index,
            registers_len
        )
        read_success = (result.get('status') == 'success')
        if read_success:
            # Process the data
            self.handle_test_result_registers_value(
                machine,
                station,
                result.get('value'),
                TestReqResult_ItemCountOnceRead
            )
        # ⭐ Let the index manager compute the next index
        station['result_index_read'] = index_manager.calculate_next_index(
            current_index,
            read_success,
            plc_write_index=None  # pass the PLC write pointer here if one is exposed
        )
    except Exception as error:
        print_with_timestamp(
            f"❌ Bulk read error: {error}",
            color='red'
        )
        # On exception the index stays put
        station['result_index_read'] = current_index
3. Performance Bottleneck Risks
🟡 Medium Risk R-005: Overly Coarse Serial Communication Lock
Current state:
# modbus_server.py:73
with self.plc_comm_lock:  # ❌ the entire read/write path runs inside the lock
    resp = plc_device.plc_read_bits(...)
Risk points:
- Lock scope too wide: data parsing also runs inside the lock
- Long blocking: a serial timeout holds the lock for its full duration
- Poor concurrency: access to multiple devices is serialized
Proposed improvement:
# Improvement: lock only the serial I/O.
# Sketch: plc_read_frame, frame_is_valid, ascii_array_to_word and expected_size
# are the existing protocol helpers from modbus_server.py.
import threading
import serial

class OptimizedMitsubishiPLC:
    def __init__(self, plc_port, comm_config):
        self.ser = serial.Serial(...)
        self.io_lock = threading.Lock()  # protects only the I/O

    def plc_read_words(self, plc_no, pc_no, address, length):
        # 1. Build the frame (no lock)
        frame = self.plc_read_frame('WR', plc_no, pc_no, address, length)
        # 2. Serial I/O (locked)
        with self.io_lock:
            self.ser.write(frame)
            recv_frame = self.ser.read(expected_size)  # expected_size derived from the protocol
        # 3. Parse the response (no lock)
        if not self.frame_is_valid(recv_frame):
            return {'status': 'error'}
        value = [ascii_array_to_word(recv_frame[5 + 4 * i:5 + 4 * (i + 1)]) for i in range(length)]
        return {'status': 'success', 'data': value}
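A usage sketch under the assumption that two stations are polled from separate threads (the port name, protocol numbers, and register addresses are placeholders; this only runs against real hardware):

import time

# Hypothetical concurrent polling: only the I/O section serializes
plc = OptimizedMitsubishiPLC('/dev/ttyUSB0', comm_config={})

def poll(address):
    while True:
        plc.plc_read_words(0, 0xFF, address, 4)  # frame build + parse run outside io_lock
        time.sleep(0.2)

for addr in ('D100', 'D200'):
    threading.Thread(target=poll, args=(addr,), daemon=True).start()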
Benefits:
- ✅ Roughly 30% more concurrent throughput (estimated)
- ✅ Shorter lock wait times
🟡 Medium Risk R-006: Missing Database Indexes
Current state:
# api_route.py:425
select_sql_str = (
    f'SELECT * from TestReq WHERE TestReq.id > 0 {sql_condition_str}'
)
# ❌ no index backs the time-range query
Risk points:
- Full table scans make queries slow
- Database CPU spikes under concurrent load
Proposed improvement:
-- Create composite indexes
CREATE INDEX IF NOT EXISTS idx_testreq_endtime_sn
    ON TestReq(endTime, SN);
CREATE INDEX IF NOT EXISTS idx_testreq_status_endtime
    ON TestReq(status, endTime);
CREATE INDEX IF NOT EXISTS idx_dutlist_stationassigned
    ON dutList(stationAssigned);
-- Inspect the query plan
EXPLAIN QUERY PLAN
SELECT * FROM TestReq WHERE endTime BETWEEN '2024-01-01' AND '2024-12-31';
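Because the service owns its SQLite file, the indexes can be applied idempotently at startup; a minimal sketch (the db path reuses the R-001 example):

import sqlite3

INDEX_STATEMENTS = [
    "CREATE INDEX IF NOT EXISTS idx_testreq_endtime_sn ON TestReq(endTime, SN)",
    "CREATE INDEX IF NOT EXISTS idx_testreq_status_endtime ON TestReq(status, endTime)",
    "CREATE INDEX IF NOT EXISTS idx_dutlist_stationassigned ON dutList(stationAssigned)",
]

def ensure_indexes(db_path='db/dtmgtDb.db'):
    """Create the composite indexes if they do not exist yet (safe to re-run)."""
    conn = sqlite3.connect(db_path)
    try:
        for stmt in INDEX_STATEMENTS:
            conn.execute(stmt)  # IF NOT EXISTS makes this idempotent
        conn.commit()
    finally:
        conn.close()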
4. Concurrency Safety Risks
🔴 Critical Risk R-007: Insufficient Concurrency Control on busy_flag
Current state:
# dtMachineService.py:632
if busy_flag is False:  # ❌ check-then-act is not atomic
    # multiple threads can pass the check at the same time
Risk points:
- Multiple threads write PLC registers at the same time
- Data races leave machine state inconsistent
Proposed improvement:
# Sketch: the Flask app, jsonify, and print_with_timestamp come from the existing module.
import threading
import time

class MachineOperationLock:
    """Machine operation lock with timeout and hold-time diagnostics."""

    def __init__(self, timeout=5):
        self._lock = threading.RLock()  # re-entrant lock
        self._timeout = timeout
        self._holder = None
        self._acquire_time = None

    def acquire(self, blocking=True, timeout=None):
        """Acquire the lock."""
        timeout = timeout or self._timeout
        acquired = self._lock.acquire(blocking=blocking, timeout=timeout)
        if acquired:
            self._holder = threading.current_thread().name
            self._acquire_time = time.time()
        else:
            print_with_timestamp(
                f"⚠️ Timed out acquiring machine lock ({timeout}s), current holder: {self._holder}",
                color='yellow'
            )
        return acquired

    def release(self):
        """Release the lock."""
        hold_duration = time.time() - self._acquire_time if self._acquire_time else 0
        if hold_duration > 2:
            print_with_timestamp(
                f"⚠️ Lock held too long: {hold_duration:.2f}s by {self._holder}",
                color='yellow'
            )
        self._holder = None
        self._acquire_time = None
        self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# Usage example
machine_lock = MachineOperationLock(timeout=5)

@app.route('/machine_control', methods=['POST'])
def machine_control():
    if not machine_lock.acquire(timeout=5):
        return jsonify({"status": "error", "message": "System busy, please retry later"})
    try:
        # Machine control logic goes here
        ...
    finally:
        machine_lock.release()
5. Exception Handling Risks
🟡 Medium Risk R-008: Swallowed PLC Communication Exceptions
Current state:
# dtMachineService.py:666
except:
    pass  # ❌ a bare except swallows every exception
Risk points:
- Hides serious errors (e.g. memory exhaustion)
- Makes failures very hard to diagnose
Proposed improvement:
# Sketch: ModbusException is assumed to come from pymodbus.exceptions,
# SerialException from pyserial; print_with_timestamp is the project helper.
import logging
import traceback

import serial
from pymodbus.exceptions import ModbusException

# Configure logging
logging.basicConfig(
    filename='dtmgt_error.log',
    level=logging.ERROR,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

def machine_poll_safe(self, args):
    """Polling logic with structured exception handling (a dtMachineService method)."""
    machine = args["machine"]
    try:
        # Read the PLC connection state
        result = machine.get('virt_plc_device').read_plc_connect_state()
        if result.get('status') == 'success':
            self.handle_status_sync_with_plc(machine, result.get('value'))
    except KeyboardInterrupt:
        # Let Ctrl+C through
        raise
    except ModbusException as e:
        # Modbus protocol error
        print_with_timestamp(
            f"❌ Modbus communication error: {machine.get('SN')} - {e}",
            color='red'
        )
        logging.error(f"Modbus error: {machine.get('SN')}", exc_info=True)
    except serial.SerialException as e:
        # Serial port error
        print_with_timestamp(
            f"❌ Serial error: {machine.get('com')} - {e}",
            color='red'
        )
        logging.error(f"Serial error: {machine.get('com')}", exc_info=True)
        # Try to reconnect
        try:
            machine['virt_plc_device'].connect_to_modbus_server(
                self.modbusServer,
                self.modbusServerPort
            )
        except Exception as reconnect_error:
            logging.error(f"Reconnect failed: {reconnect_error}")
    except Exception as e:
        # Unknown exception
        print_with_timestamp(
            f"🚨 Unknown exception: {type(e).__name__} - {e}",
            color='red'
        )
        logging.critical(
            f"Unknown exception: {machine.get('SN')}",
            exc_info=True
        )
        # Optionally send an alert
        # send_alert_email(subject="System exception", body=traceback.format_exc())
6. Improvement Priorities
P0 (Urgent - Data Safety)
| ID | Issue | Improvement | Expected Benefit |
|---|---|---|---|
| R-001 | SQLite concurrent writes | SafeSQLiteWriter class | Eliminates data loss |
| R-003 | Counter rollover checks | CounterManager class | Avoids missed/duplicate reads |
| R-007 | busy_flag race | MachineOperationLock | Prevents conflicting PLC writes |
P1 (Important - Stability)
| ID | Issue | Improvement | Expected Benefit |
|---|---|---|---|
| R-002 | LMIS memory growth | Bounded queue + persistence | Prevents OOM |
| R-004 | Result index management | ResultIndexManager | Improved reliability |
| R-008 | Swallowed exceptions | Structured exception handling | Observability |
P2 (Optimization - Performance)
| ID | Issue | Improvement | Expected Benefit |
|---|---|---|---|
| R-005 | Serial lock granularity | Narrower lock scope | ~30% more concurrency |
| R-006 | Database indexes | Composite indexes | ~10x faster queries |
7. Monitoring and Alerting Recommendations
7.1 Key Metric Monitoring
class SystemMonitor:
    """System health monitor."""

    def __init__(self):
        self.metrics = {
            'plc_read_success_rate': 0,
            'plc_read_latency_p99': 0,
            'db_write_success_rate': 0,
            'lmis_upload_success_rate': 0,
            'queue_depth': {
                'db_write': 0,
                'lmis_upload': 0,
                'websocket': 0
            }
        }

    def report_metrics(self):
        """Report metrics once per minute."""
        print_with_timestamp(
            f"📊 System metrics: "
            f"PLC read success rate={self.metrics['plc_read_success_rate']:.2%}, "
            f"latency P99={self.metrics['plc_read_latency_p99']:.0f}ms, "
            f"DB write success rate={self.metrics['db_write_success_rate']:.2%}, "
            f"LMIS upload success rate={self.metrics['lmis_upload_success_rate']:.2%}",
            color='cyan'
        )
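report_metrics describes a once-per-minute report, but nothing in the class drives it; a small wiring sketch (assuming the monitor is a process-wide singleton):

import threading
import time

system_monitor = SystemMonitor()

def _metrics_loop():
    while True:
        time.sleep(60)  # matches the once-per-minute cadence described above
        system_monitor.report_metrics()

threading.Thread(target=_metrics_loop, daemon=True).start()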
7.2 Alert Rules
| Metric | Threshold | Severity |
|---|---|---|
| PLC read success rate | <95% | WARNING |
| PLC read latency P99 | >500ms | WARNING |
| DB write failure rate | >1% | CRITICAL |
| LMIS upload retry queue depth | >500 | WARNING |
| Memory usage | >80% | WARNING |
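One way to make this table executable is a rule list evaluated against the SystemMonitor metrics; a sketch (alert delivery, e.g. email, is out of scope; the memory and queue-depth rules would need their own collectors):

# Hypothetical rule evaluation against SystemMonitor.metrics
ALERT_RULES = [
    # (metric key, predicate, severity, description)
    ('plc_read_success_rate', lambda v: v < 0.95, 'WARNING', 'PLC read success rate below 95%'),
    ('plc_read_latency_p99', lambda v: v > 500, 'WARNING', 'PLC read latency P99 above 500ms'),
    ('db_write_success_rate', lambda v: (1 - v) > 0.01, 'CRITICAL', 'DB write failure rate above 1%'),
]

def evaluate_alerts(metrics):
    """Return (severity, description) pairs for every rule that currently fires."""
    fired = []
    for key, predicate, severity, description in ALERT_RULES:
        value = metrics.get(key)
        if value is not None and predicate(value):
            fired.append((severity, description))
    return fired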
Document version: v1.0 | Last updated: 2025-11-19 | Next step: implement the P0 improvements