This commit is contained in:
418
asr-monitor/app/monitor_service.py
Normal file
418
asr-monitor/app/monitor_service.py
Normal file
@@ -0,0 +1,418 @@
|
||||
from fastapi import WebSocket, APIRouter,WebSocketDisconnect,Request,Body,Query
|
||||
from fastapi import FastAPI, UploadFile, File, Form
|
||||
from fastapi.responses import StreamingResponse,JSONResponse
|
||||
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Dict, Optional
|
||||
import logging,uuid
|
||||
from contextlib import suppress
|
||||
|
||||
monitor_router = APIRouter()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DeviceMonitor:
|
||||
def __init__(self):
|
||||
# 连接状态存储
|
||||
self.active_connections: Dict[str, WebSocket] = {}
|
||||
# 心跳时间记录
|
||||
self.client_last_ping: Dict[str, float] = {} # 客户端最后发送时间
|
||||
self.server_last_ping: Dict[str, float] = {} # 服务端最后发送时间
|
||||
self.server_last_ack: Dict[str, float] = {} # 服务端最后收到ACK时间
|
||||
# 心跳任务管理
|
||||
self.heartbeat_tasks: Dict[str, asyncio.Task] = {}
|
||||
# 新增:请求响应等待队列
|
||||
self.pending_requests: Dict[str, asyncio.Queue] = {}
|
||||
|
||||
async def connect(self, device_id: str, websocket: WebSocket, device_sn: str):
|
||||
"""处理新设备连接"""
|
||||
await websocket.accept()
|
||||
websocket.device_sn = device_sn
|
||||
self.active_connections[device_id] = websocket
|
||||
self._init_heartbeat(device_id)
|
||||
|
||||
logger.info(f"设备 {device_id} 连接成功")
|
||||
|
||||
def _init_heartbeat(self, device_id: str):
|
||||
"""初始化心跳记录"""
|
||||
now = time.time()
|
||||
self.client_last_ping[device_id] = now
|
||||
self.server_last_ping[device_id] = now
|
||||
self.server_last_ack[device_id] = now
|
||||
|
||||
# 启动双向心跳检测
|
||||
self.heartbeat_tasks[device_id] = asyncio.create_task(
|
||||
self._server_heartbeat_loop(device_id)
|
||||
)
|
||||
|
||||
async def _server_heartbeat_loop(self, device_id: str):
|
||||
"""服务端主动心跳循环"""
|
||||
try:
|
||||
while True:
|
||||
# 发送心跳间隔(秒)
|
||||
await asyncio.sleep(15)
|
||||
|
||||
if not self._is_connected(device_id):
|
||||
break
|
||||
|
||||
# 发送心跳并记录时间
|
||||
await self._send_heartbeat(device_id)
|
||||
self.server_last_ping[device_id] = time.time()
|
||||
|
||||
# 等待ACK响应
|
||||
await asyncio.sleep(5)
|
||||
|
||||
# 检查ACK响应
|
||||
if not self._check_server_ack(device_id):
|
||||
logger.warning(f"设备 {device_id} 服务端心跳未响应")
|
||||
await self.disconnect(device_id)
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"心跳任务异常: {str(e)}")
|
||||
await self.disconnect(device_id)
|
||||
|
||||
def _check_server_ack(self, device_id: str) -> bool:
|
||||
"""检查服务端心跳确认"""
|
||||
return (
|
||||
time.time() - self.server_last_ping[device_id] < 20 and
|
||||
self.server_last_ack[device_id] > self.server_last_ping[device_id]
|
||||
)
|
||||
|
||||
async def _send_heartbeat(self, device_id: str):
|
||||
"""发送服务端心跳"""
|
||||
try:
|
||||
await self.active_connections[device_id].send_json({
|
||||
"type": "server_heartbeat",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"发送服务端心跳失败: {str(e)}")
|
||||
await self.disconnect(device_id)
|
||||
|
||||
async def disconnect(self, device_id: str):
|
||||
"""安全断开连接"""
|
||||
if device_id in self.active_connections:
|
||||
# 关闭连接
|
||||
with suppress(Exception):
|
||||
await self.active_connections[device_id].close()
|
||||
|
||||
# 清理状态
|
||||
self._cleanup_device(device_id)
|
||||
logger.info(f"设备 {device_id} 已断开")
|
||||
|
||||
def _cleanup_device(self, device_id: str):
|
||||
"""清理设备相关数据"""
|
||||
# 取消心跳任务
|
||||
if task := self.heartbeat_tasks.get(device_id):
|
||||
task.cancel()
|
||||
del self.heartbeat_tasks[device_id]
|
||||
|
||||
# 删除记录
|
||||
for collection in [
|
||||
self.active_connections,
|
||||
self.client_last_ping,
|
||||
self.server_last_ping,
|
||||
self.server_last_ack
|
||||
]:
|
||||
if device_id in collection:
|
||||
del collection[device_id]
|
||||
|
||||
async def check_client_heartbeats(self):
|
||||
"""检查客户端心跳定时任务"""
|
||||
while True:
|
||||
await asyncio.sleep(10)
|
||||
current_time = time.time()
|
||||
for device_id in list(self.client_last_ping.keys()):
|
||||
if current_time - self.client_last_ping[device_id] > 30:
|
||||
logger.warning(f"设备 {device_id} 客户端心跳超时 {current_time - self.client_last_ping[device_id]}")
|
||||
await self.disconnect(device_id)
|
||||
|
||||
def _is_connected(self, device_id: str) -> bool:
|
||||
"""检查连接是否有效"""
|
||||
return device_id in self.active_connections
|
||||
|
||||
def get_device_socket(self,deviceId):
|
||||
return self.active_connections.get(deviceId)
|
||||
|
||||
async def get_log_list(self,deviceId):
|
||||
device_socket = self.active_connections.get(deviceId)
|
||||
if device_socket:
|
||||
print("send request_log_list")
|
||||
await device_socket.send_json({"type": "request_log_list"})
|
||||
|
||||
async def send_log_list_request(self, device_id: str) -> Optional[dict]:
|
||||
"""发送日志请求并等待响应"""
|
||||
if device_id not in self.active_connections:
|
||||
return None
|
||||
|
||||
# 生成唯一请求ID
|
||||
request_id = str(uuid.uuid4())
|
||||
response_queue = asyncio.Queue(maxsize=1)
|
||||
self.pending_requests[request_id] = response_queue
|
||||
|
||||
try:
|
||||
# 发送带请求ID的请求
|
||||
await self.active_connections[device_id].send_json({
|
||||
"type": "request_log_list",
|
||||
"request_id": request_id
|
||||
})
|
||||
|
||||
# 等待响应(最多等待10秒)
|
||||
return await asyncio.wait_for(response_queue.get(), timeout=10)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"设备 {device_id} 日志请求超时")
|
||||
return None
|
||||
finally:
|
||||
# 清理请求队列
|
||||
self.pending_requests.pop(request_id, None)
|
||||
|
||||
async def send_upload_log_command(self, device_id: str,file_name: str,request_id: str) -> Optional[dict]:
|
||||
"""发送日志请求并等待响应"""
|
||||
if device_id not in self.active_connections:
|
||||
return None
|
||||
if not file_name:
|
||||
return None
|
||||
|
||||
try:
|
||||
# 发送带请求ID的请求
|
||||
await self.active_connections[device_id].send_json({
|
||||
"type": "request_log_file",
|
||||
"request_id": request_id,
|
||||
"file_name" :file_name
|
||||
})
|
||||
|
||||
# 等待响应(最多等待10秒)
|
||||
return True
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"设备 {device_id} 日志请求超时")
|
||||
return False
|
||||
finally:
|
||||
# 清理请求队列
|
||||
pass
|
||||
|
||||
async def handle_websocket_message(self, device_id: str, data: dict):
|
||||
"""处理WebSocket消息"""
|
||||
if data["type"] == "log_list":
|
||||
# 提取请求ID并放入响应队列
|
||||
request_id = data.get("request_id")
|
||||
if request_id in self.pending_requests:
|
||||
await self.pending_requests[request_id].put({
|
||||
"files": data["files"],
|
||||
"status": "success"
|
||||
})
|
||||
|
||||
def get_device_online_list(self):
|
||||
return self.active_connections
|
||||
|
||||
monitor = DeviceMonitor()
|
||||
|
||||
|
||||
@monitor_router.on_event("startup")
|
||||
async def startup_event():
|
||||
# 启动客户端心跳检测
|
||||
asyncio.create_task(monitor.check_client_heartbeats())
|
||||
logger.info("监控服务已启动")
|
||||
|
||||
|
||||
@monitor_router.websocket("/ws")
|
||||
async def monitor_websocket(websocket: WebSocket):
|
||||
# await websocket.accept()
|
||||
# 设备身份验证
|
||||
#device_id = await websocket.receive_text()
|
||||
device_id= websocket.headers.get('x-device-id',"1234567")
|
||||
deivce_sn= websocket.headers.get('x-device-sn',"")
|
||||
try:
|
||||
await monitor.connect(device_id, websocket, deivce_sn)
|
||||
while True:
|
||||
data = await websocket.receive_json()
|
||||
|
||||
# 处理客户端心跳
|
||||
if data["type"] == "client_heartbeat":
|
||||
monitor.client_last_ping[device_id] = time.time()
|
||||
await websocket.send_json({"type": "client_heartbeat_ack"})
|
||||
|
||||
# 处理服务端心跳响应
|
||||
elif data["type"] == "server_heartbeat_ack":
|
||||
monitor.server_last_ack[device_id] = time.time()
|
||||
await websocket.send_json({"type": "server_heartbeat_confirm"})
|
||||
|
||||
if data["type"] == "log_metadata":
|
||||
# 初始化文件接收
|
||||
file_name = data["name"]
|
||||
current_file = open(f"received_logs/{file_name}", "wb")
|
||||
total_chunks = data["chunks"]
|
||||
|
||||
elif data["type"] == "log_chunk":
|
||||
index = data["index"]
|
||||
checksum = data["checksum"]
|
||||
|
||||
# 接收二进制数据
|
||||
chunk_data = await websocket.receive_bytes()
|
||||
|
||||
# 校验分片
|
||||
if hashlib.md5(chunk_data).hexdigest() != checksum:
|
||||
continue
|
||||
|
||||
# 写入文件
|
||||
current_file.seek(index * 512 * 1024)
|
||||
current_file.write(chunk_data)
|
||||
received_chunks.add(index)
|
||||
|
||||
# 发送确认
|
||||
await websocket.send_json({
|
||||
"type": "log_ack",
|
||||
"index": index
|
||||
})
|
||||
|
||||
elif data["type"] == "log_list":
|
||||
await monitor.handle_websocket_message(device_id, data)
|
||||
|
||||
except WebSocketDisconnect:
|
||||
logger.info(f"设备 {device_id} 主动断开")
|
||||
except Exception as e:
|
||||
logger.error(f"连接异常: {str(e)}")
|
||||
finally:
|
||||
await monitor.disconnect(device_id)
|
||||
|
||||
# REST API路由
|
||||
@monitor_router.get("/log_list")
|
||||
async def get_log_list(request:Request):
|
||||
# headers = request.headers
|
||||
params = dict(request.query_params)
|
||||
device_id = params.get('device_id')
|
||||
if not device_id:
|
||||
return {'success': False, "message": "缺少deviceId参数"}
|
||||
|
||||
try:
|
||||
# 发送请求并等待响应
|
||||
response = await monitor.send_log_list_request(device_id)
|
||||
|
||||
if response:
|
||||
return {
|
||||
'success': True,
|
||||
'files': response.get("files", []),
|
||||
'deviceId': device_id
|
||||
}
|
||||
return {'success': False, "message": "设备未响应"}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"日志请求失败: {str(e)}")
|
||||
return {'success': False, "message": "服务端错误"}
|
||||
|
||||
|
||||
# 异步存储上传事件和文件内容
|
||||
upload_events = {}
|
||||
file_cache = {}
|
||||
upload_events_lock = asyncio.Lock()
|
||||
file_cache_lock = asyncio.Lock()
|
||||
|
||||
|
||||
@monitor_router.get("/get_log_file")
|
||||
async def get_log_file_content(request: Request):
|
||||
params = dict(request.query_params)
|
||||
device_id = params.get('device_id')
|
||||
file_name = params.get('file_name')
|
||||
print(f"get_log_content {device_id} {file_name}")
|
||||
# 参数校验
|
||||
if not device_id or not file_name:
|
||||
return {'success': False, "message": "Missing device_id or file_name"}
|
||||
|
||||
# 生成唯一请求ID
|
||||
request_id = str(uuid.uuid4())
|
||||
event = asyncio.Event()
|
||||
|
||||
# 注册等待事件
|
||||
async with upload_events_lock:
|
||||
upload_events[request_id] = event
|
||||
async with file_cache_lock:
|
||||
file_cache[request_id] = None
|
||||
|
||||
try:
|
||||
# 通过WebSocket通知设备上传文件
|
||||
await monitor.send_upload_log_command(
|
||||
device_id=device_id,
|
||||
file_name=file_name,
|
||||
request_id=request_id
|
||||
)
|
||||
|
||||
# 等待文件上传(最多30秒)
|
||||
try:
|
||||
await asyncio.wait_for(event.wait(), timeout=30.0)
|
||||
except asyncio.TimeoutError:
|
||||
return {'success': False, "message": "Upload timeout"}
|
||||
|
||||
# 获取文件内容
|
||||
async with file_cache_lock:
|
||||
content = file_cache.get(request_id)
|
||||
|
||||
if not content:
|
||||
return {'success': False, "message": "Empty file content"}
|
||||
|
||||
# 返回文件流响应
|
||||
return StreamingResponse(
|
||||
iter([content]),
|
||||
media_type="application/octet-stream",
|
||||
headers={"Content-Disposition": f"attachment; filename={file_name}"}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing request: {str(e)}")
|
||||
return {'success': False, "message": "Server error"}
|
||||
|
||||
finally:
|
||||
# 清理资源
|
||||
async with upload_events_lock:
|
||||
upload_events.pop(request_id, None)
|
||||
async with file_cache_lock:
|
||||
file_cache.pop(request_id, None)
|
||||
|
||||
|
||||
@monitor_router.post("/logfile/upload")
|
||||
async def handle_log_upload(
|
||||
file: UploadFile = File(...,alias="file"),
|
||||
request_id: str = Form(...,alias="request_id"),
|
||||
file_name: str = Form(...,alias="file_name")
|
||||
):
|
||||
print("handle_log_upload",request_id)
|
||||
# 验证请求ID有效性
|
||||
async with upload_events_lock:
|
||||
event = upload_events.get(request_id)
|
||||
|
||||
if not event:
|
||||
return {"success": False, "message": "Invalid request ID"}
|
||||
|
||||
try:
|
||||
# 读取文件内容
|
||||
content = await file.read()
|
||||
|
||||
# 存储文件内容
|
||||
async with file_cache_lock:
|
||||
file_cache[request_id] = content
|
||||
|
||||
# 通知等待的请求
|
||||
event.set()
|
||||
|
||||
return JSONResponse(
|
||||
content={"success": True, "data": {"request_id": request_id}},
|
||||
status_code=200
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Upload failed: {str(e)}")
|
||||
return {"success": False, "message": "File processing error"}
|
||||
|
||||
@monitor_router.get("/device_list")
|
||||
async def get_device_list(request:Request):
|
||||
params = dict(request.query_params)
|
||||
#headers = request.headers
|
||||
device_list = []
|
||||
connections = monitor.get_device_online_list()
|
||||
for device_id in connections.keys():
|
||||
socket = connections.get(device_id)
|
||||
device_list.append({"device_id":device_id,"device_sn":socket.device_sn})
|
||||
|
||||
return {"success":True,"devices":device_list,"message": f"I device online list {params}"}
|
||||
Reference in New Issue
Block a user