213 lines
6.6 KiB
Python
213 lines
6.6 KiB
Python
from fastapi import FastAPI, HTTPException
|
||
from pydantic import BaseModel, Field, validator
|
||
import sqlite3
|
||
import json,os
|
||
from datetime import datetime as dt
|
||
import uuid
|
||
from datetime import datetime
|
||
from typing import Optional, Dict, Any
|
||
|
||
# ASGI application object; the route decorators further down register their handlers on it.
app = FastAPI()
|
||
|
||
# 数据库文件路径
|
||
# SQLite file opened by the upload handler. The path is relative, so it is
# resolved against the process's current working directory.
# NOTE(review): the trailing "lms_data.db" looks like a leftover alternative
# filename — confirm it is unused and remove it.
DATABASE = os.path.join('db', 'dtmgtDb.db') #"lms_data.db"
|
||
|
||
|
||
# 创建数据库表
|
||
def init_db(db_path: Optional[str] = None) -> None:
    """Create the ``LMIS_result`` table if it does not exist yet.

    Args:
        db_path: SQLite file to initialise. When omitted, ``db/dtmgtDb.db``
            (relative to the current working directory) is used.

    Raises:
        sqlite3.Error: If the database cannot be opened or the DDL fails.
        OSError: If the parent directory cannot be created.
    """
    if db_path is None:
        db_path = os.path.join('db', 'dtmgtDb.db')

    # sqlite3.connect() creates the database file but not its parent
    # directory, so a fresh checkout without ``db/`` would fail to start.
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        # The envelope columns come first, then the columns split out of the
        # ``data`` payload, then a verbatim copy of the payload.
        conn.execute('''
            CREATE TABLE IF NOT EXISTS LMIS_result (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                requestId TEXT UNIQUE NOT NULL,
                module TEXT NOT NULL,
                buckSn TEXT,
                time TEXT NOT NULL,
                deviceCode TEXT NOT NULL,
                deviceName TEXT NOT NULL,
                deviceStatus TEXT NOT NULL,
                operatorId TEXT NOT NULL,

                -- 从data字段拆分的具体字段 --
                project TEXT,
                phase TEXT,
                channel TEXT,
                startTime TEXT,
                endTime TEXT,
                orientation TEXT,
                direction TEXT,
                cycles INTEGER,
                cyclesTotal INTEGER,
                height REAL,
                config TEXT,

                -- 原始JSON数据备份 --
                raw_data TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Release the file handle even when the DDL raises.
        conn.close()
|
||
|
||
|
||
# 初始化数据库
|
||
# Runs at import time, so the table is created as soon as this module is loaded.
init_db()
|
||
|
||
|
||
# Request model, shaped to match the payload format sent by the client.
|
||
class RequestData(BaseModel):
    """Body of an upload request, field-for-field as the client sends it."""

    # Unique request number: a UUID with the dashes removed.
    requestId: str = Field(
        ...,
        min_length=12,
        max_length=40,
        description="请求唯一编号uuid(去除横杠)",
    )
    module: str
    buckSn: Optional[str] = None
    time: str
    deviceCode: str
    deviceName: str
    deviceStatus: str
    operatorId: str
    # The client sends this as a JSON-encoded string, not a nested object.
    data: str

    @validator('time')
    def validate_time_format(cls, v):
        """Reject ``time`` values that are not ``YYYY-MM-DD HH:MM:SS``."""
        try:
            dt.strptime(v, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            raise ValueError("时间格式应为 YYYY-MM-DD HH:MM:SS")
        return v
|
||
|
||
|
||
def _int_or_none(value):
    """Return *value* as an ``int``, or ``None`` when missing or not convertible."""
    if value is None or isinstance(value, int):
        return value
    try:
        return int(value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float("inf")), which json.loads can produce.
        return None


def _float_or_none(value):
    """Return *value* as a ``float``, or ``None`` when missing or not convertible."""
    if value is None or isinstance(value, float):
        return value
    try:
        return float(value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers integers too large to represent as a float.
        return None


def _text_column_value(value):
    """Return *value* in a form sqlite3 can bind to a TEXT column."""
    # sqlite3 cannot bind dict/list parameters; store nested JSON as text.
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    return value


@app.post("/gateway/api/rel/upload/commonUploadData")
async def submit_lmis_data(request_data: RequestData):
    """Store one LMIS upload in the SQLite ``LMIS_result`` table.

    The envelope fields are written as received. The ``data`` JSON string is
    parsed and its known keys are written to dedicated columns, while the
    original string is kept unchanged in ``raw_data``.

    Once the database connection is open, every outcome is reported through
    the ``code`` field of the returned dict instead of an exception:

    * ``200``  - row inserted; ``data`` carries ``recordId`` and ``requestId``.
    * ``1501`` - a row with the same ``requestId`` already exists.
    * ``400``  - ``data`` is not valid JSON or is not a JSON object.
    * ``500``  - any other failure; the message includes the error text.

    Args:
        request_data: Validated request body.

    Returns:
        A dict with ``code`` and ``message`` (plus ``data`` on success).
    """
    print("submit_lmis_data /gateway/api/rel/upload/commonUploadData")
    duplicate_response = {
        "code": 1501,
        "message": "数据已存在,重复的requestId"
    }

    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()

        # Fast path for the duplicate check; the UNIQUE constraint handled
        # below covers two identical requests racing past this SELECT.
        c.execute("SELECT 1 FROM LMIS_result WHERE requestId = ?", (request_data.requestId,))
        if c.fetchone():
            return duplicate_response

        try:
            data_json = json.loads(request_data.data)
        except json.JSONDecodeError as e:
            return {
                "code": 400,
                "message": f"data字段不是有效的JSON: {str(e)}"
            }
        # Valid JSON that is not an object (list, number, ...) has no keys to
        # split into columns, so it is a client error rather than a crash.
        if not isinstance(data_json, dict):
            return {
                "code": 400,
                "message": "data字段必须是JSON对象"
            }

        text_keys = (
            "project", "phase", "channel", "startTime", "endTime",
            "orientation", "direction",
        )
        text_values = [_text_column_value(data_json.get(key)) for key in text_keys]

        # Numeric columns: unconvertible values are stored as NULL.
        cycles = _int_or_none(data_json.get("cycles"))
        cyclesTotal = _int_or_none(data_json.get("cyclesTotal"))
        height = _float_or_none(data_json.get("height"))
        config = _text_column_value(data_json.get("config"))

        print(f"json={data_json}")

        c.execute('''
            INSERT INTO LMIS_result (
                requestId, module, buckSn, time,
                deviceCode, deviceName, deviceStatus, operatorId,
                project, phase, channel, startTime, endTime,
                orientation, direction, cycles, cyclesTotal, height, config,
                raw_data
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            request_data.requestId,
            request_data.module,
            request_data.buckSn,
            request_data.time,
            request_data.deviceCode,
            request_data.deviceName,
            request_data.deviceStatus,
            request_data.operatorId,
            *text_values,
            cycles,
            cyclesTotal,
            height,
            config,
            request_data.data,  # original JSON string kept as a backup
        ))
        conn.commit()

        return {
            "code": 200,
            "message": "操作成功",
            "data": {
                "recordId": c.lastrowid,
                "requestId": request_data.requestId
            }
        }
    except sqlite3.IntegrityError:
        # requestId is the only UNIQUE column and the NOT NULL columns are
        # required strings in the request model, so this is a duplicate that
        # slipped past the SELECT above.
        conn.rollback()
        return duplicate_response
    except Exception as e:
        conn.rollback()
        return {
            "code": 500,
            "message": f"服务器内部错误: {str(e)}"
        }
    finally:
        conn.close()
|
||
|
||
|
||
# Simple connectivity-check route for testing (not the root path).
|
||
@app.get("/gateway/api/rel/upload/getconnect")
async def get_connect():
    """Return a fixed payload so a client can check that the service is reachable."""
    payload = dict(message="LMIS 模拟接收数据服务已运行", status="ok")
    return payload
|
||
|
||
|
||
if __name__ == "__main__":
    # uvicorn is imported only when this file is run as a script.
    from uvicorn import run as serve

    # Listen on all interfaces, port 8088.
    serve(app, host="0.0.0.0", port=8088)