213 lines
8.3 KiB
Python
213 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
WebSocket 客户端,用于接收后台推送的机台和工位实时数据
|
||
"""
|
||
|
||
from PyQt5.QtCore import QObject, pyqtSignal, QThread
|
||
import asyncio
|
||
import websockets
|
||
import json
|
||
import traceback
|
||
import time
|
||
from collections import defaultdict
|
||
|
||
|
||
class WebSocketClientThread(QThread):
|
||
"""WebSocket 客户端线程"""
|
||
|
||
# 信号定义
|
||
message_received = pyqtSignal(dict) # 接收到消息
|
||
connection_status_changed = pyqtSignal(bool) # 连接状态变化
|
||
error_occurred = pyqtSignal(str) # 发生错误
|
||
|
||
def __init__(self, url="ws://localhost:5080", parent=None):
|
||
super().__init__(parent)
|
||
self.url = url
|
||
self._running = False
|
||
self._websocket = None
|
||
self._loop = None
|
||
|
||
def run(self):
|
||
"""线程运行主函数"""
|
||
self._running = True
|
||
self._loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(self._loop)
|
||
|
||
try:
|
||
self._loop.run_until_complete(self._connect_and_listen())
|
||
except Exception as e:
|
||
self.error_occurred.emit(f"WebSocket 线程异常: {str(e)}")
|
||
traceback.print_exc()
|
||
finally:
|
||
self._loop.close()
|
||
|
||
async def _connect_and_listen(self):
|
||
"""连接 WebSocket 服务器并监听消息"""
|
||
retry_count = 0
|
||
max_retries = 5
|
||
retry_delay = 5 # 秒
|
||
|
||
while self._running:
|
||
try:
|
||
print(f"正在连接 WebSocket 服务器: {self.url}")
|
||
async with websockets.connect(self.url) as websocket:
|
||
self._websocket = websocket
|
||
self.connection_status_changed.emit(True)
|
||
print(f"WebSocket 连接成功: {self.url}")
|
||
retry_count = 0 # 重置重试计数
|
||
|
||
# 持续监听消息
|
||
async for message in websocket:
|
||
if not self._running:
|
||
break
|
||
|
||
try:
|
||
# 解析 JSON 消息
|
||
data = json.loads(message)
|
||
# 发射信号通知主线程
|
||
self.message_received.emit(data)
|
||
except json.JSONDecodeError as e:
|
||
self.error_occurred.emit(f"JSON 解析失败: {str(e)}")
|
||
except Exception as e:
|
||
self.error_occurred.emit(f"处理消息失败: {str(e)}")
|
||
|
||
except websockets.exceptions.ConnectionClosedError:
|
||
self.connection_status_changed.emit(False)
|
||
print("WebSocket 连接已关闭")
|
||
if self._running:
|
||
retry_count += 1
|
||
if retry_count <= max_retries:
|
||
print(f"将在 {retry_delay} 秒后重试连接 ({retry_count}/{max_retries})...")
|
||
await asyncio.sleep(retry_delay)
|
||
else:
|
||
self.error_occurred.emit(f"WebSocket 重连失败,已达到最大重试次数 ({max_retries})")
|
||
break
|
||
|
||
except Exception as e:
|
||
self.connection_status_changed.emit(False)
|
||
self.error_occurred.emit(f"WebSocket 连接错误: {str(e)}")
|
||
traceback.print_exc()
|
||
if self._running:
|
||
retry_count += 1
|
||
if retry_count <= max_retries:
|
||
print(f"将在 {retry_delay} 秒后重试连接 ({retry_count}/{max_retries})...")
|
||
await asyncio.sleep(retry_delay)
|
||
else:
|
||
break
|
||
|
||
def stop(self):
|
||
"""停止 WebSocket 客户端"""
|
||
self._running = False
|
||
if self._websocket:
|
||
try:
|
||
asyncio.run_coroutine_threadsafe(self._websocket.close(), self._loop)
|
||
except Exception as e:
|
||
print(f"关闭 WebSocket 连接失败: {e}")
|
||
self.quit()
|
||
self.wait()
|
||
|
||
|
||
class WebSocketClient(QObject):
|
||
"""WebSocket 客户端管理器"""
|
||
|
||
# 信号定义
|
||
machine_data_changed = pyqtSignal(dict) # 机台数据变化
|
||
station_data_changed = pyqtSignal(dict) # 工位数据变化(预留,待后续完善)
|
||
connection_status_changed = pyqtSignal(bool) # 连接状态
|
||
error_occurred = pyqtSignal(str) # 错误
|
||
|
||
def __init__(self, url="ws://localhost:5080", parent=None):
|
||
super().__init__(parent)
|
||
self.url = url
|
||
self._thread = None
|
||
self._connected = False
|
||
|
||
# 节流控制参数
|
||
self._message_buffer = defaultdict(dict) # 消息缓存
|
||
self._last_emit_time = defaultdict(float) # 上次发送时间
|
||
self._throttle_interval = 0.1 # 节流间隔 100ms,每秒最多 5 次更新
|
||
|
||
def start(self):
|
||
"""启动 WebSocket 客户端"""
|
||
if self._thread and self._thread.isRunning():
|
||
print("WebSocket 客户端已在运行")
|
||
return
|
||
|
||
self._thread = WebSocketClientThread(self.url)
|
||
self._thread.message_received.connect(self._handle_message)
|
||
self._thread.connection_status_changed.connect(self._on_connection_status_changed)
|
||
self._thread.error_occurred.connect(self.error_occurred.emit)
|
||
self._thread.start()
|
||
print("WebSocket 客户端已启动")
|
||
|
||
def stop(self):
|
||
"""停止 WebSocket 客户端"""
|
||
if self._thread:
|
||
self._thread.stop()
|
||
self._thread = None
|
||
print("WebSocket 客户端已停止")
|
||
|
||
def _handle_message(self, data):
|
||
"""处理接收到的消息(带节流控制)"""
|
||
try:
|
||
command = data.get('command', '')
|
||
|
||
if command == 'machine_data_change':
|
||
# 机台数据变化 - 使用节流机制
|
||
machine_sn = data.get('machine_sn') or data.get('dtMachineSN') or 'unknown'
|
||
self._throttle_emit('machine', machine_sn, data)
|
||
elif command in ('station_data_change', 'station_drop_result_change'):
|
||
# 工位(通道)数据变化或跌落结果变化 - 使用节流机制
|
||
msn = data.get('machine_sn') or data.get('dtMachineSN') or ''
|
||
ssn = data.get('station_sn') or data.get('SN') or ''
|
||
station_key = f"{msn}_{ssn}"
|
||
self._throttle_emit('station', station_key, data)
|
||
else:
|
||
print(f"收到未知命令: {command}")
|
||
|
||
except Exception as e:
|
||
self.error_occurred.emit(f"处理消息失败: {str(e)}")
|
||
traceback.print_exc()
|
||
|
||
def _throttle_emit(self, msg_type, key, data):
|
||
"""节流发送信号,避免高频更新阻塞UI
|
||
|
||
策略:
|
||
1. 缓存最新数据
|
||
2. 如果距离上次发送时间 < 节流间隔,仅更新缓存
|
||
3. 如果距离上次发送时间 >= 节流间隔,立即发送并清空缓存
|
||
"""
|
||
current_time = time.time()
|
||
buffer_key = f"{msg_type}_{key}"
|
||
|
||
# 更新缓存为最新数据
|
||
self._message_buffer[buffer_key] = data
|
||
|
||
# 检查是否需要发送
|
||
last_time = self._last_emit_time.get(buffer_key, 0)
|
||
time_since_last = current_time - last_time
|
||
|
||
if time_since_last >= self._throttle_interval:
|
||
# 可以发送:发送缓存的最新数据
|
||
buffered_data = self._message_buffer.pop(buffer_key)
|
||
self._last_emit_time[buffer_key] = current_time
|
||
|
||
if msg_type == 'machine':
|
||
self.machine_data_changed.emit(buffered_data)
|
||
elif msg_type == 'station':
|
||
self.station_data_changed.emit(buffered_data)
|
||
# 否则:仅缓存,不发送(等待下次节流窗口)
|
||
|
||
def _on_connection_status_changed(self, connected):
|
||
"""连接状态变化"""
|
||
self._connected = connected
|
||
self.connection_status_changed.emit(connected)
|
||
print(f"WebSocket 连接状态: {'已连接' if connected else '已断开'}")
|
||
|
||
@property
|
||
def is_connected(self):
|
||
"""是否已连接"""
|
||
return self._connected
|