Files
dtm-py-all/UI/ui_utils/websocket_client.py

213 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WebSocket 客户端,用于接收后台推送的机台和工位实时数据
"""
from PyQt5.QtCore import QObject, pyqtSignal, QThread
import asyncio
import websockets
import json
import traceback
import time
from collections import defaultdict
class WebSocketClientThread(QThread):
"""WebSocket 客户端线程"""
# 信号定义
message_received = pyqtSignal(dict) # 接收到消息
connection_status_changed = pyqtSignal(bool) # 连接状态变化
error_occurred = pyqtSignal(str) # 发生错误
def __init__(self, url="ws://localhost:5080", parent=None):
super().__init__(parent)
self.url = url
self._running = False
self._websocket = None
self._loop = None
def run(self):
"""线程运行主函数"""
self._running = True
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._loop.run_until_complete(self._connect_and_listen())
except Exception as e:
self.error_occurred.emit(f"WebSocket 线程异常: {str(e)}")
traceback.print_exc()
finally:
self._loop.close()
async def _connect_and_listen(self):
"""连接 WebSocket 服务器并监听消息"""
retry_count = 0
max_retries = 5
retry_delay = 5 # 秒
while self._running:
try:
print(f"正在连接 WebSocket 服务器: {self.url}")
async with websockets.connect(self.url) as websocket:
self._websocket = websocket
self.connection_status_changed.emit(True)
print(f"WebSocket 连接成功: {self.url}")
retry_count = 0 # 重置重试计数
# 持续监听消息
async for message in websocket:
if not self._running:
break
try:
# 解析 JSON 消息
data = json.loads(message)
# 发射信号通知主线程
self.message_received.emit(data)
except json.JSONDecodeError as e:
self.error_occurred.emit(f"JSON 解析失败: {str(e)}")
except Exception as e:
self.error_occurred.emit(f"处理消息失败: {str(e)}")
except websockets.exceptions.ConnectionClosedError:
self.connection_status_changed.emit(False)
print("WebSocket 连接已关闭")
if self._running:
retry_count += 1
if retry_count <= max_retries:
print(f"将在 {retry_delay} 秒后重试连接 ({retry_count}/{max_retries})...")
await asyncio.sleep(retry_delay)
else:
self.error_occurred.emit(f"WebSocket 重连失败,已达到最大重试次数 ({max_retries})")
break
except Exception as e:
self.connection_status_changed.emit(False)
self.error_occurred.emit(f"WebSocket 连接错误: {str(e)}")
traceback.print_exc()
if self._running:
retry_count += 1
if retry_count <= max_retries:
print(f"将在 {retry_delay} 秒后重试连接 ({retry_count}/{max_retries})...")
await asyncio.sleep(retry_delay)
else:
break
def stop(self):
"""停止 WebSocket 客户端"""
self._running = False
if self._websocket:
try:
asyncio.run_coroutine_threadsafe(self._websocket.close(), self._loop)
except Exception as e:
print(f"关闭 WebSocket 连接失败: {e}")
self.quit()
self.wait()
class WebSocketClient(QObject):
"""WebSocket 客户端管理器"""
# 信号定义
machine_data_changed = pyqtSignal(dict) # 机台数据变化
station_data_changed = pyqtSignal(dict) # 工位数据变化(预留,待后续完善)
connection_status_changed = pyqtSignal(bool) # 连接状态
error_occurred = pyqtSignal(str) # 错误
def __init__(self, url="ws://localhost:5080", parent=None):
super().__init__(parent)
self.url = url
self._thread = None
self._connected = False
# 节流控制参数
self._message_buffer = defaultdict(dict) # 消息缓存
self._last_emit_time = defaultdict(float) # 上次发送时间
self._throttle_interval = 0.1 # 节流间隔 100ms每秒最多 5 次更新
def start(self):
"""启动 WebSocket 客户端"""
if self._thread and self._thread.isRunning():
print("WebSocket 客户端已在运行")
return
self._thread = WebSocketClientThread(self.url)
self._thread.message_received.connect(self._handle_message)
self._thread.connection_status_changed.connect(self._on_connection_status_changed)
self._thread.error_occurred.connect(self.error_occurred.emit)
self._thread.start()
print("WebSocket 客户端已启动")
def stop(self):
"""停止 WebSocket 客户端"""
if self._thread:
self._thread.stop()
self._thread = None
print("WebSocket 客户端已停止")
def _handle_message(self, data):
"""处理接收到的消息(带节流控制)"""
try:
command = data.get('command', '')
if command == 'machine_data_change':
# 机台数据变化 - 使用节流机制
machine_sn = data.get('machine_sn') or data.get('dtMachineSN') or 'unknown'
self._throttle_emit('machine', machine_sn, data)
elif command in ('station_data_change', 'station_drop_result_change'):
# 工位(通道)数据变化或跌落结果变化 - 使用节流机制
msn = data.get('machine_sn') or data.get('dtMachineSN') or ''
ssn = data.get('station_sn') or data.get('SN') or ''
station_key = f"{msn}_{ssn}"
self._throttle_emit('station', station_key, data)
else:
print(f"收到未知命令: {command}")
except Exception as e:
self.error_occurred.emit(f"处理消息失败: {str(e)}")
traceback.print_exc()
def _throttle_emit(self, msg_type, key, data):
"""节流发送信号避免高频更新阻塞UI
策略:
1. 缓存最新数据
2. 如果距离上次发送时间 < 节流间隔,仅更新缓存
3. 如果距离上次发送时间 >= 节流间隔,立即发送并清空缓存
"""
current_time = time.time()
buffer_key = f"{msg_type}_{key}"
# 更新缓存为最新数据
self._message_buffer[buffer_key] = data
# 检查是否需要发送
last_time = self._last_emit_time.get(buffer_key, 0)
time_since_last = current_time - last_time
if time_since_last >= self._throttle_interval:
# 可以发送:发送缓存的最新数据
buffered_data = self._message_buffer.pop(buffer_key)
self._last_emit_time[buffer_key] = current_time
if msg_type == 'machine':
self.machine_data_changed.emit(buffered_data)
elif msg_type == 'station':
self.station_data_changed.emit(buffered_data)
# 否则:仅缓存,不发送(等待下次节流窗口)
def _on_connection_status_changed(self, connected):
"""连接状态变化"""
self._connected = connected
self.connection_status_changed.emit(connected)
print(f"WebSocket 连接状态: {'已连接' if connected else '已断开'}")
@property
def is_connected(self):
"""是否已连接"""
return self._connected