Files
ragflow_python/asr-monitor-test/app/monitor_service.py

418 lines
14 KiB
Python
Raw Normal View History

import asyncio
import hashlib
import logging
import os
import time
import uuid
from contextlib import suppress
from typing import Dict, Optional
from urllib.parse import quote

from fastapi import WebSocket, APIRouter, WebSocketDisconnect, Request, Body, Query
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import StreamingResponse, JSONResponse
# Router on which this module registers its WebSocket endpoint and REST routes.
monitor_router = APIRouter()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class DeviceMonitor:
    """Track connected devices and supervise them with two-way heartbeats.

    Every device is keyed by ``device_id``. The server pings each device on a
    fixed interval and expects an ACK, while ``check_client_heartbeats``
    independently drops devices whose own heartbeats stop arriving. The class
    also relays log-related commands to devices and matches ``log_list``
    replies to waiting callers through per-request queues.
    """

    # All durations are in seconds.
    SERVER_HEARTBEAT_INTERVAL = 15  # pause between two server heartbeats
    SERVER_ACK_WAIT = 5             # grace period before the ACK is checked
    SERVER_ACK_WINDOW = 20          # maximum age of the ping being acknowledged
    CLIENT_CHECK_INTERVAL = 10      # period of the client-heartbeat sweep
    CLIENT_HEARTBEAT_TIMEOUT = 30   # silence after which a client is dropped
    LOG_LIST_TIMEOUT = 10           # how long to wait for a log_list reply

    def __init__(self):
        # Live sockets, keyed by device id.
        self.active_connections: Dict[str, "WebSocket"] = {}
        # Heartbeat bookkeeping (time.time() values), keyed by device id.
        self.client_last_ping: Dict[str, float] = {}  # last heartbeat received from the client
        self.server_last_ping: Dict[str, float] = {}  # last heartbeat sent by the server
        self.server_last_ack: Dict[str, float] = {}   # last ACK received for a server heartbeat
        # One server-heartbeat task per device.
        self.heartbeat_tasks: Dict[str, asyncio.Task] = {}
        # Response queues of in-flight requests, keyed by request id.
        self.pending_requests: Dict[str, asyncio.Queue] = {}

    async def connect(self, device_id: str, websocket: "WebSocket", device_sn: str):
        """Accept the socket, register it under ``device_id`` and start heartbeats.

        The serial number is stored on the socket object as ``device_sn``.
        """
        await websocket.accept()
        websocket.device_sn = device_sn
        self.active_connections[device_id] = websocket
        self._init_heartbeat(device_id)
        logger.info(f"设备 {device_id} 连接成功")

    def _init_heartbeat(self, device_id: str):
        """Reset the heartbeat timestamps and (re)start the server heartbeat task."""
        # A reconnect under the same id must not leave the previous loop
        # running next to the new one.
        previous = self.heartbeat_tasks.pop(device_id, None)
        if previous is not None:
            previous.cancel()

        now = time.time()
        self.client_last_ping[device_id] = now
        self.server_last_ping[device_id] = now
        self.server_last_ack[device_id] = now
        self.heartbeat_tasks[device_id] = asyncio.create_task(
            self._server_heartbeat_loop(device_id)
        )

    async def _server_heartbeat_loop(self, device_id: str):
        """Ping the device periodically and disconnect it when no ACK arrives."""
        try:
            while True:
                await asyncio.sleep(self.SERVER_HEARTBEAT_INTERVAL)
                if not self._is_connected(device_id):
                    break
                await self._send_heartbeat(device_id)
                # A failed send has already disconnected the device; stop here
                # instead of writing a fresh timestamp for a removed device.
                if not self._is_connected(device_id):
                    break
                self.server_last_ping[device_id] = time.time()
                # Give the device time to answer before checking.
                await asyncio.sleep(self.SERVER_ACK_WAIT)
                if not self._check_server_ack(device_id):
                    logger.warning(f"设备 {device_id} 服务端心跳未响应")
                    await self.disconnect(device_id)
                    break
        except asyncio.CancelledError:
            # Cancellation is the normal way this task is stopped.
            pass
        except Exception as e:
            logger.error(f"心跳任务异常: {str(e)}")
            await self.disconnect(device_id)

    def _check_server_ack(self, device_id: str) -> bool:
        """Return True when the latest server ping is recent and was acknowledged.

        A device without heartbeat records counts as not acknowledged.
        """
        last_ping = self.server_last_ping.get(device_id)
        last_ack = self.server_last_ack.get(device_id)
        if last_ping is None or last_ack is None:
            return False
        return (
            time.time() - last_ping < self.SERVER_ACK_WINDOW
            and last_ack > last_ping
        )

    async def _send_heartbeat(self, device_id: str):
        """Send one ``server_heartbeat`` message; disconnect the device on failure."""
        try:
            await self.active_connections[device_id].send_json({
                "type": "server_heartbeat",
                "timestamp": time.time()
            })
        except Exception as e:
            logger.error(f"发送服务端心跳失败: {str(e)}")
            await self.disconnect(device_id)

    async def disconnect(self, device_id: str):
        """Close the device's socket (best effort) and drop all of its state.

        Safe to call for a device that is not connected: leftover heartbeat
        records are still removed, but nothing is logged.
        """
        websocket = self.active_connections.get(device_id)
        if websocket is not None:
            # The peer may already be gone; a failing close must not stop cleanup.
            with suppress(Exception):
                await websocket.close()
        self._cleanup_device(device_id)
        if websocket is not None:
            logger.info(f"设备 {device_id} 已断开")

    def _cleanup_device(self, device_id: str):
        """Cancel the heartbeat task and delete every per-device record."""
        task = self.heartbeat_tasks.pop(device_id, None)
        if task is not None:
            task.cancel()
        for collection in (
            self.active_connections,
            self.client_last_ping,
            self.server_last_ping,
            self.server_last_ack,
        ):
            collection.pop(device_id, None)

    async def check_client_heartbeats(self):
        """Run forever, disconnecting devices whose client heartbeat timed out."""
        while True:
            await asyncio.sleep(self.CLIENT_CHECK_INTERVAL)
            current_time = time.time()
            # Iterate over a snapshot: disconnect() mutates the dict, and other
            # coroutines may remove devices while we are awaiting.
            for device_id in list(self.client_last_ping):
                last_ping = self.client_last_ping.get(device_id)
                if last_ping is None:
                    continue  # removed while a previous disconnect was awaited
                idle = current_time - last_ping
                if idle > self.CLIENT_HEARTBEAT_TIMEOUT:
                    logger.warning(f"设备 {device_id} 客户端心跳超时 {idle}")
                    await self.disconnect(device_id)

    def _is_connected(self, device_id: str) -> bool:
        """Return True when a socket is registered for ``device_id``."""
        return device_id in self.active_connections

    def get_device_socket(self, deviceId):
        """Return the socket registered for ``deviceId``, or None."""
        return self.active_connections.get(deviceId)

    async def get_log_list(self, deviceId):
        """Send a ``request_log_list`` message without waiting for the reply."""
        device_socket = self.active_connections.get(deviceId)
        if device_socket:
            logger.info("send request_log_list")
            await device_socket.send_json({"type": "request_log_list"})

    async def send_log_list_request(self, device_id: str) -> Optional[dict]:
        """Ask the device for its log list and wait for the reply.

        Returns the reply queued by ``handle_websocket_message``, or None when
        the device is not connected or does not answer in time.
        """
        if device_id not in self.active_connections:
            return None
        # The request id lets the reply be routed back to this caller.
        request_id = str(uuid.uuid4())
        response_queue = asyncio.Queue(maxsize=1)
        self.pending_requests[request_id] = response_queue
        try:
            await self.active_connections[device_id].send_json({
                "type": "request_log_list",
                "request_id": request_id
            })
            return await asyncio.wait_for(
                response_queue.get(), timeout=self.LOG_LIST_TIMEOUT
            )
        except asyncio.TimeoutError:
            logger.warning(f"设备 {device_id} 日志请求超时")
            return None
        finally:
            # Always unregister, so late replies are ignored.
            self.pending_requests.pop(request_id, None)

    async def send_upload_log_command(self, device_id: str, file_name: str, request_id: str) -> Optional[bool]:
        """Tell the device to upload ``file_name``; does not wait for the upload.

        Returns True once the command was sent, None when the device is not
        connected or ``file_name`` is empty, and False when sending timed out.
        """
        if device_id not in self.active_connections:
            return None
        if not file_name:
            return None
        try:
            await self.active_connections[device_id].send_json({
                "type": "request_log_file",
                "request_id": request_id,
                "file_name": file_name
            })
            return True
        except asyncio.TimeoutError:
            logger.warning(f"设备 {device_id} 日志请求超时")
            return False

    async def handle_websocket_message(self, device_id: str, data: dict):
        """Route a ``log_list`` reply to the caller waiting on its request id.

        Messages of another type, without a known request id, or arriving
        after a reply was already queued are ignored.
        """
        if data.get("type") != "log_list":
            return
        response_queue = self.pending_requests.get(data.get("request_id"))
        if response_queue is None:
            return
        # The queue holds a single reply. A duplicate must be dropped rather
        # than awaited, otherwise the socket's receive loop would stall.
        with suppress(asyncio.QueueFull):
            response_queue.put_nowait({
                "files": data.get("files", []),
                "status": "success"
            })

    def get_device_online_list(self):
        """Return the live mapping of device id to socket (not a copy)."""
        return self.active_connections
# Single module-wide DeviceMonitor instance shared by the route handlers below.
monitor = DeviceMonitor()
@monitor_router.on_event("startup")
async def startup_event():
    """Start the background sweep that drops devices with stale client heartbeats."""
    # The event loop only keeps weak references to tasks, so hold a strong one
    # for as long as the monitor lives; otherwise the sweep may be garbage
    # collected mid-execution.
    monitor.client_heartbeat_task = asyncio.create_task(
        monitor.check_client_heartbeats()
    )
    logger.info("监控服务已启动")
@monitor_router.websocket("/ws")
async def monitor_websocket(websocket: WebSocket):
    """Serve one device connection: heartbeats, log-list replies and log upload.

    Handled message types:
      * ``client_heartbeat``     - record the time and answer with an ack.
      * ``server_heartbeat_ack`` - record the time and answer with a confirm.
      * ``log_metadata``         - open the target file for a chunked upload.
      * ``log_chunk``            - JSON header followed by one binary frame;
                                   written at ``index * chunk_size`` once the
                                   MD5 checksum matches, then acknowledged.
      * ``log_list``             - forwarded to the monitor.

    The device is always unregistered when the handler exits.
    """
    chunk_size = 512 * 1024
    log_dir = "received_logs"

    # NOTE(review): every device that omits x-device-id shares the fallback id
    # below and will therefore replace each other's registration - confirm
    # whether anonymous devices should be rejected instead.
    device_id = websocket.headers.get('x-device-id', "1234567")
    device_sn = websocket.headers.get('x-device-sn', "")

    # State of the log file currently being received on this connection.
    current_file = None
    received_chunks = set()
    total_chunks = None

    try:
        await monitor.connect(device_id, websocket, device_sn)
        while True:
            data = await websocket.receive_json()
            msg_type = data.get("type")

            if msg_type == "client_heartbeat":
                monitor.client_last_ping[device_id] = time.time()
                await websocket.send_json({"type": "client_heartbeat_ack"})

            elif msg_type == "server_heartbeat_ack":
                monitor.server_last_ack[device_id] = time.time()
                await websocket.send_json({"type": "server_heartbeat_confirm"})

            elif msg_type == "log_metadata":
                # A new transfer replaces an unfinished one.
                if current_file is not None:
                    current_file.close()
                    current_file = None
                received_chunks = set()
                total_chunks = data["chunks"]
                # The name comes from the device: keep only its last path
                # component so it cannot escape log_dir.
                file_name = os.path.basename(str(data["name"]))
                if file_name in ("", ".", ".."):
                    logger.warning(f"Device {device_id} sent an unusable log file name")
                    continue
                os.makedirs(log_dir, exist_ok=True)
                current_file = open(os.path.join(log_dir, file_name), "wb")

            elif msg_type == "log_chunk":
                index = data["index"]
                checksum = data["checksum"]
                # Always consume the binary frame that follows the header so
                # the message stream stays in sync, even if the chunk is dropped.
                chunk_data = await websocket.receive_bytes()
                if current_file is None:
                    logger.warning(f"Device {device_id} sent a log chunk before log_metadata")
                    continue
                if not isinstance(index, int) or index < 0:
                    logger.warning(f"Device {device_id} sent an invalid chunk index")
                    continue
                # Corrupted chunks are not acknowledged.
                if hashlib.md5(chunk_data).hexdigest() != checksum:
                    continue
                current_file.seek(index * chunk_size)
                current_file.write(chunk_data)
                received_chunks.add(index)
                await websocket.send_json({
                    "type": "log_ack",
                    "index": index
                })
                # Flush and release the file as soon as every chunk arrived.
                if isinstance(total_chunks, int) and len(received_chunks) >= total_chunks:
                    current_file.close()
                    current_file = None

            elif msg_type == "log_list":
                await monitor.handle_websocket_message(device_id, data)
    except WebSocketDisconnect:
        logger.info(f"设备 {device_id} 主动断开")
    except Exception as e:
        logger.error(f"连接异常: {str(e)}")
    finally:
        try:
            if current_file is not None:
                current_file.close()
        finally:
            await monitor.disconnect(device_id)
# REST API路由
@monitor_router.get("/log_list")
async def get_log_list(request:Request):
    """Return the log-file list reported by the device named in ``device_id``.

    Responds with ``success: False`` and a message when the parameter is
    missing, the device gives no answer, or the request fails.
    """
    device_id = dict(request.query_params).get('device_id')
    if not device_id:
        return {'success': False, "message": "缺少deviceId参数"}

    try:
        # Round-trip to the device; a falsy reply means it did not answer.
        reply = await monitor.send_log_list_request(device_id)
        if not reply:
            return {'success': False, "message": "设备未响应"}
        files = reply.get("files", [])
        return {
            'success': True,
            'files': files,
            'deviceId': device_id
        }
    except Exception as exc:
        logger.error(f"日志请求失败: {exc}")
        return {'success': False, "message": "服务端错误"}
# Shared state for pending log-file transfers, keyed by request id.
# upload_events: event set by the upload route once the file content is stored.
upload_events: Dict[str, asyncio.Event] = {}
# file_cache: uploaded content; None until the device has uploaded the file.
file_cache: Dict[str, Optional[bytes]] = {}
# One lock per dictionary; the routes below hold it around every access.
upload_events_lock = asyncio.Lock()
file_cache_lock = asyncio.Lock()
@monitor_router.get("/get_log_file")
async def get_log_file_content(request: Request):
    """Fetch one log file from a device and stream it back to the caller.

    Query parameters: ``device_id`` and ``file_name``. The device is told over
    its WebSocket to upload the file; this handler then waits up to 30 seconds
    for the upload route to deliver the content. Every failure is reported as
    ``{'success': False, 'message': ...}``.
    """
    params = request.query_params
    device_id = params.get('device_id')
    file_name = params.get('file_name')
    logger.info(f"get_log_content {device_id} {file_name}")

    if not device_id or not file_name:
        return {'success': False, "message": "Missing device_id or file_name"}

    # The request id ties the device's upload back to this waiting request.
    request_id = str(uuid.uuid4())
    event = asyncio.Event()
    try:
        # Register inside the try block so the finally clause always undoes it.
        async with upload_events_lock:
            upload_events[request_id] = event
        async with file_cache_lock:
            file_cache[request_id] = None

        sent = await monitor.send_upload_log_command(
            device_id=device_id,
            file_name=file_name,
            request_id=request_id
        )
        if not sent:
            # Nothing was sent, so waiting for an upload would only burn the
            # whole timeout.
            return {'success': False, "message": "Device offline or command not sent"}

        try:
            await asyncio.wait_for(event.wait(), timeout=30.0)
        except asyncio.TimeoutError:
            return {'success': False, "message": "Upload timeout"}

        async with file_cache_lock:
            content = file_cache.get(request_id)
        if not content:
            return {'success': False, "message": "Empty file content"}

        # Percent-encode the name: raw non-latin-1 or control characters are
        # not valid in a header value. Plain ASCII names are left unchanged.
        encoded_name = quote(file_name, safe="")
        return StreamingResponse(
            iter([content]),
            media_type="application/octet-stream",
            headers={
                "Content-Disposition": (
                    f"attachment; filename={encoded_name}; "
                    f"filename*=UTF-8''{encoded_name}"
                )
            }
        )
    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        return {'success': False, "message": "Server error"}
    finally:
        # Drop the per-request state whatever the outcome.
        async with upload_events_lock:
            upload_events.pop(request_id, None)
        async with file_cache_lock:
            file_cache.pop(request_id, None)
@monitor_router.post("/logfile/upload")
async def handle_log_upload(
    file: UploadFile = File(..., alias="file"),
    request_id: str = Form(..., alias="request_id"),
    file_name: str = Form(..., alias="file_name")
):
    """Receive a log file uploaded by a device and hand it to the waiting request.

    ``request_id`` must belong to a request that is still waiting; the content
    is stored in ``file_cache`` and the request's event is set. ``file_name``
    is accepted as a form field but not used here.
    """
    logger.info(f"handle_log_upload {request_id}")

    async with upload_events_lock:
        event = upload_events.get(request_id)
    if event is None:
        return {"success": False, "message": "Invalid request ID"}

    try:
        content = await file.read()
        async with file_cache_lock:
            # The requester may have timed out while the body was being read.
            # Storing the content now would create an entry nobody removes.
            if request_id not in file_cache:
                return {"success": False, "message": "Request expired"}
            file_cache[request_id] = content
        # Wake the request waiting for this file.
        event.set()
        return JSONResponse(
            content={"success": True, "data": {"request_id": request_id}},
            status_code=200
        )
    except Exception as e:
        logger.error(f"Upload failed: {str(e)}")
        return {"success": False, "message": "File processing error"}
@monitor_router.get("/device_list")
async def get_device_list(request:Request):
    """List every connected device with its id and serial number.

    The query parameters are echoed back inside the ``message`` field.
    """
    query = dict(request.query_params)
    online = monitor.get_device_online_list()
    devices = [
        {"device_id": dev_id, "device_sn": ws.device_sn}
        for dev_id, ws in online.items()
    ]
    return {"success":True,"devices":devices,"message": f"I device online list {query}"}