882 lines
38 KiB
Python
882 lines
38 KiB
Python
from flask import Flask, request, jsonify, g, Response
|
||
from flask_cors import CORS, cross_origin
|
||
import sqlite3, datetime
|
||
from pymodbus.client import ModbusSerialClient
|
||
from pymodbus.exceptions import ConnectionException
|
||
import eel
|
||
import threading, os, sys, platform
|
||
import serial
|
||
|
||
import plc_comm as dtm_machine
|
||
|
||
dtm_machines = {}
|
||
# 初始化 Flask 应用
|
||
app = Flask(__name__)
|
||
CORS(app) # 这会让所有的资源都支持跨域访问
|
||
CORS(app, resources=r"/*")
|
||
|
||
# 初始化 Modbus 通信
|
||
client = ModbusSerialClient(method='rtu', port='<Your COM Port>', baudrate=9600)
|
||
|
||
# 连接SQLite数据库
|
||
try:
|
||
dbConn = sqlite3.connect('db\dtmgtDb.db')
|
||
dbCursor = dbConn.cursor()
|
||
|
||
# 创建表
|
||
dbCursor.execute('''CREATE TABLE IF NOT EXISTS TEST
|
||
(ID INT PRIMARY KEY NOT NULL,
|
||
NAME TEXT NOT NULL,
|
||
RESULT TEXT NOT NULL);''')
|
||
except sqlite3.Error as e:
|
||
print(f"Database error: {e}")
|
||
# 或者你可以选择抛出异常,让程序停止运行
|
||
# raise e
|
||
|
||
|
||
@app.before_request
|
||
def before_request():
|
||
g.db = sqlite3.connect('db\dtmgtDb.db')
|
||
# print("open database", g.db)
|
||
|
||
|
||
# 设置跌落机的试验参数
|
||
@app.route('/machine_control', methods=['POST'])
|
||
@cross_origin()
|
||
def set_params():
|
||
global dtm_machines
|
||
params = request.json
|
||
json_return = {"status": "error"}
|
||
actionParam = {}
|
||
"""
|
||
let control_reqs={
|
||
start:{machine:'HYSZ-DTM-002',com:'com2',station:'01'},
|
||
stop:{machine:'HYSZ-DTM-002',com:'com2',station:'01'},
|
||
setParams: {
|
||
machine: 'HYSZ-DTM-002', com: 'com2', station: '01', dropHeight: 67, dropCycles: 89,
|
||
},
|
||
getParams:{machine: 'HYSZ-DTM-002', com: 'com2', station: '01'}
|
||
}
|
||
"""
|
||
"""
|
||
dtm_machine1.read_station_dropheight('01')
|
||
dtm_machine1.set_station_dropheight('02',531)
|
||
dtm_machine1.start_station('01')
|
||
"""
|
||
# 检测键是否存在
|
||
if 'setParams' in params:
|
||
setParams = params['setParams']
|
||
actionParam = setParams
|
||
# 使用 set 检测多个键是否存在
|
||
for setParam in setParams:
|
||
required_keys = {'machine', 'com', 'station'}
|
||
if required_keys.issubset(setParam.keys()):
|
||
if setParam['com'].lower() in dtm_machines:
|
||
dtm_machine = dtm_machines[setParam['com'].lower()]
|
||
# print("set machine params", setParam, setParam['station'], setParam['dropHeight'])
|
||
try:
|
||
if 'dropHeight' in setParam:
|
||
resp = dtm_machine.set_station_dropheight(setParam['station'], int(setParam['dropHeight']))
|
||
if "status" in resp and resp["status"] == "success":
|
||
json_return["status"] = "success"
|
||
json_return["action"] = "set machine params"
|
||
pass
|
||
if 'dropCycles' in setParam:
|
||
resp = dtm_machine.set_station_cycles(setParam['station'], int(setParam['dropCycles']))
|
||
if "status" in resp and resp["status"] == "success":
|
||
json_return["status"] = "success"
|
||
json_return["action"] = "set machine params"
|
||
pass
|
||
if 'cyclesFinished' in setParam:
|
||
resp = dtm_machine.set_station_finished(setParam['station'],
|
||
int(setParam['cyclesFinished']))
|
||
if "status" in resp and resp["status"] == "success":
|
||
json_return["status"] = "success"
|
||
json_return["action"] = "set machine params"
|
||
pass
|
||
except Exception as error:
|
||
return jsonify({"status": "error", "msg": error})
|
||
pass
|
||
else:
|
||
# print("键 'machine', 'com', 'station', 'params' 不完全存在于 JSON 中")
|
||
pass
|
||
if 'start' in params or 'stop' in params or 'resume' in params:
|
||
param = None
|
||
if 'start' in params:
|
||
param = params['start']
|
||
if 'stop' in params:
|
||
param = params['stop']
|
||
if 'resume' in params:
|
||
param = params['resume']
|
||
actionParam = param
|
||
# 使用 set 检测多个键是否存在
|
||
required_keys = {'machine', 'com', 'station'}
|
||
if required_keys.issubset(param.keys()):
|
||
if param['com'].lower() in dtm_machines:
|
||
dtm_machine = dtm_machines[param['com'].lower()]
|
||
# print("set machine params", setParam, setParam['station'], setParam['dropHeight'])
|
||
try:
|
||
if 'start' in params:
|
||
resp = dtm_machine.start_station(param['station'])
|
||
if "status" in resp and resp["status"] == "success":
|
||
json_return["status"] = "success"
|
||
else:
|
||
pass
|
||
json_return["action"] = "start"
|
||
if 'resume' in params:
|
||
resp = dtm_machine.resume_station(param['station'])
|
||
if "status" in resp and resp["status"] == "success":
|
||
json_return["status"] = "success"
|
||
json_return["action"] = "resume"
|
||
if 'stop' in params:
|
||
resp = dtm_machine.stop_station(param['station'])
|
||
if "status" in resp and resp["status"] == "success":
|
||
json_return["status"] = "success"
|
||
json_return["action"] = "stop"
|
||
except Exception as error:
|
||
return jsonify({"status": "error", "msg": error})
|
||
pass
|
||
else:
|
||
# print("键 'machine', 'com', 'station', 'params' 不完全存在于 JSON 中")
|
||
pass
|
||
if 'getParams' in params:
|
||
param = params['getParams']
|
||
actionParam = param
|
||
# 使用 set 检测多个键是否存在q
|
||
required_keys = {'machine', 'com', 'station'}
|
||
if required_keys.issubset(param.keys()):
|
||
if param['com'].lower() in dtm_machines:
|
||
dtm_machine = dtm_machines[param['com'].lower()]
|
||
# print("set machine params", setParam, setParam['station'], setParam['dropHeight'])
|
||
try:
|
||
finished = dtm_machine.read_station_cyclesFinished(param['station'])
|
||
if 'status' in finished and finished['status'] == 'success' and 'value' in finished:
|
||
json_return["status"] = "success"
|
||
json_return["action"] = "getParams"
|
||
json_return["finished"] = finished['value']
|
||
except Exception as error:
|
||
return jsonify({"status": "error", "msg": error})
|
||
pass
|
||
else:
|
||
# print("键 'machine', 'com', 'station', 'params' 不完全存在于 JSON 中")
|
||
pass
|
||
return json_return
|
||
|
||
|
||
"""
|
||
CREATE TABLE DUTList (
|
||
SN TEXT (20) UNIQUE,
|
||
name TEXT (20),
|
||
project TEXT (20),
|
||
phase TEXT (16),
|
||
stationAssigned TEXT (18),
|
||
itemOnGoing TEXT (10),
|
||
itemsFinished NUMERIC (5),
|
||
status TEXT (10),
|
||
testReq TEXT (10),
|
||
inspector TEXT (20),
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT
|
||
);
|
||
"""
|
||
import sqlite3
|
||
from flask import jsonify, request
|
||
|
||
@app.route('/dbTableAccess', methods=['GET', 'POST'])
|
||
@app.route('/dbTableAccess', methods=['GET', 'POST'])
|
||
@cross_origin()
|
||
def process_table_data():
|
||
print("dbTableAccess request",request) #cyx
|
||
table = request.args.get('table')
|
||
if table is None:
|
||
return jsonify({"error": "Table parameter is missing"}), 400
|
||
|
||
if request.method == 'GET':
|
||
sql_str = f"SELECT * FROM {table} WHERE id > 0"
|
||
sql_params = []
|
||
parameters_to_check = []
|
||
if table == 'dutList':
|
||
parameters_to_check = ['name', 'PN', 'description']
|
||
if table == 'testReq':
|
||
parameters_to_check = ['SN', 'dropItem', 'dropHeight', 'dropCycles', 'description']
|
||
if table == 'project':
|
||
parameters_to_check = ['name', 'key', 'manager', 'type', 'description']
|
||
if table == 'projectPhase':
|
||
parameters_to_check = ['name', 'PN', 'description']
|
||
if table == 'dtMachine':
|
||
parameters_to_check = ['label', 'SN', 'status', 'type', 'color', 'description']
|
||
|
||
def get_parameter(param):
|
||
param_sql = ""
|
||
param_value = request.args.getlist(param)
|
||
if param_value:
|
||
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
|
||
return param_sql, param_value
|
||
|
||
for param in parameters_to_check:
|
||
param_sql, param_value = get_parameter(param)
|
||
sql_str += param_sql
|
||
sql_params.extend(param_value)
|
||
|
||
try:
|
||
if table == 'dutList':
|
||
sql_str = sql_str + ' ORDER BY date(createdate) DESC'
|
||
cursor = g.db.cursor()
|
||
cursor.execute(sql_str, sql_params)
|
||
except sqlite3.Error as e:
|
||
return jsonify({"error": f"Database error: {e}"}), 500
|
||
|
||
rows = cursor.fetchall()
|
||
column_names = [description[0] for description in cursor.description]
|
||
|
||
result = []
|
||
for row in rows:
|
||
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
|
||
result.append(dict_row)
|
||
|
||
return jsonify({'data': result})
|
||
|
||
elif request.method == 'POST':
|
||
data = request.get_json()
|
||
action = data['action']
|
||
records = data['records']
|
||
if action is None:
|
||
return jsonify({"status": "error", "message": "action is not specified"})
|
||
elif action == "insert":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
||
for record in records:
|
||
if 'id' in record:
|
||
del record['id']
|
||
columns = ', '.join(record.keys())
|
||
placeholders = ', '.join('?' for _ in record)
|
||
sql = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})'
|
||
cursor.execute(sql, tuple(record.values()))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "insert", "message": f"insert records into {table} success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('create duts error', error)
|
||
return jsonify({"status": "error", "action": "insert", "message": f"insert records into {table} error"})
|
||
|
||
elif action == "update":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
update_key = 'PN'
|
||
if table == 'dutList':
|
||
update_key = 'SN'
|
||
if table == 'testReq':
|
||
update_key = 'id'
|
||
if table == 'project':
|
||
update_key = 'key'
|
||
if table == 'projectPhase':
|
||
update_key = 'PN'
|
||
if table == 'dtMachine':
|
||
update_key = 'SN'
|
||
|
||
for record in records:
|
||
pn_value = record.get(update_key, None)
|
||
if pn_value is not None:
|
||
cursor.execute(
|
||
f'UPDATE {table} SET {", ".join(["{} = ?".format(field) for field in record])} WHERE {update_key} = ?',
|
||
tuple(record.values()) + (record[update_key],))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "update", "message": f"update records into {table} success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('update duts error', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "update", "message": f"update records into {table} error{error}"})
|
||
|
||
elif action == "delete":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
cursor.executemany(f"DELETE FROM {table} WHERE id = ?", [(record['id'],) for record in records])
|
||
g.db.commit()
|
||
return jsonify({"status": "success", "action": "delete", "message": f"delete {table} records success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('delete', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "delete", "message": f"delete {table} records error{error}"})
|
||
|
||
|
||
def get_parameter(param_name):
|
||
param = request.args.get(param_name)
|
||
if param is not None:
|
||
return f" AND {param_name} = ?", param
|
||
return "", None
|
||
|
||
|
||
@app.route('/dutlist', methods=['GET', 'POST'])
|
||
@app.route('/dutList', methods=['GET', 'POST'])
|
||
@cross_origin()
|
||
def dut_info_list():
|
||
# 在这里实现你的数据处理逻辑
|
||
|
||
# start_time = request.args.get('start_time')
|
||
# end_time = request.args.get('end_time')
|
||
if request.method == 'GET':
|
||
sql_str = "SELECT * FROM DUTList WHERE id > 0"
|
||
sql_params = []
|
||
parameters_to_check = ['SN', 'status', 'inspector', 'project']
|
||
|
||
def get_parameter(param):
|
||
param_sql = ""
|
||
param_value = request.args.getlist(param)
|
||
if param_value:
|
||
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
|
||
return param_sql, param_value
|
||
|
||
for param in parameters_to_check:
|
||
param_sql, param_value = get_parameter(param)
|
||
sql_str += param_sql
|
||
sql_params.extend(param_value)
|
||
# print("select dutList params", param_value) # cyx
|
||
|
||
try:
|
||
# 从数据库查询数据
|
||
sql_str = sql_str + ' ORDER BY date(createdate) DESC'
|
||
cursor = g.db.cursor()
|
||
cursor.execute(sql_str, sql_params)
|
||
except sqlite3.Error as e:
|
||
return jsonify({"error": f"Database error: {e}"}), 500
|
||
|
||
rows = cursor.fetchall()
|
||
# 获取列名
|
||
column_names = [description[0] for description in cursor.description]
|
||
|
||
result = []
|
||
for row in rows:
|
||
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
|
||
result.append(dict_row)
|
||
return jsonify({'data': result})
|
||
|
||
elif request.method == 'POST':
|
||
data = request.get_json() # 从POST请求中获取数据
|
||
action = data['action']
|
||
records = data['records']
|
||
if action is None:
|
||
return jsonify({"status": "error", "message": "action is not specified"})
|
||
elif action == "insert":
|
||
try:
|
||
# 获取并格式化当前日期
|
||
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
||
cursor = g.db.cursor()
|
||
for record in records:
|
||
# 将当前日期添加到记录中作为创建日期
|
||
record['createdate'] = current_date
|
||
columns = ', '.join(record.keys())
|
||
placeholders = ', '.join('?' for _ in record)
|
||
sql = 'INSERT INTO DUTList ({}) VALUES ({})'.format(columns, placeholders)
|
||
cursor.execute(sql, tuple(record.values()))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "insert", "message": "insert records into dutList success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('create duts error', error)
|
||
return jsonify({"status": "error", "action": "insert", "message": "insert records into dutList error"})
|
||
pass
|
||
|
||
elif action == "update":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
# 遍历数据,从数据库中删除相应的记录
|
||
for record in records:
|
||
sn_value = record.get('SN', None)
|
||
if sn_value is not None:
|
||
cursor.execute(
|
||
'UPDATE DUTList SET %s WHERE SN = ?' % ', '.join(['%s = ?' % field for field in record]),
|
||
tuple(record.values()) + (record['SN'],))
|
||
g.db.commit()
|
||
return jsonify({"status": "success", "action": "update", "message": "update dutList success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('update duts error', error)
|
||
return jsonify({"status": "error", "action": "update", "message": f"update dutList records error{error}"})
|
||
pass
|
||
elif action == "delete":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
cursor.executemany("DELETE FROM DUTList WHERE SN = ?", [(record['SN'],) for record in records])
|
||
g.db.commit()
|
||
return jsonify({"status": "success", "action": "delete", "message": "delete dutList records success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('delete', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "delete", "message": f"delete dutList records error{error}"})
|
||
pass
|
||
|
||
|
||
@app.route('/testReq', methods=['GET', 'POST'])
|
||
@app.route('/testReq', methods=['GET', 'POST'])
|
||
@cross_origin()
|
||
def test_requirements():
|
||
# 在这里实现你的数据处理逻辑
|
||
if request.method == 'GET':
|
||
sql_str = "SELECT * FROM TestReq WHERE id > 0"
|
||
sql_params = []
|
||
parameters_to_check = ['SN', 'dropItem', 'dropHeight', 'dropCycles', 'description']
|
||
|
||
def get_parameter(param):
|
||
param_sql = ""
|
||
param_value = request.args.getlist(param)
|
||
if param_value:
|
||
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
|
||
return param_sql, param_value
|
||
|
||
for param in parameters_to_check:
|
||
param_sql, param_value = get_parameter(param)
|
||
sql_str += param_sql
|
||
sql_params.extend(param_value)
|
||
# print("select testReq params", param_value) # cyx
|
||
|
||
try:
|
||
# 从数据库查询数据
|
||
sql_str = sql_str + ' ORDER BY id ASC'
|
||
cursor = g.db.cursor()
|
||
cursor.execute(sql_str, sql_params)
|
||
except sqlite3.Error as e:
|
||
return jsonify({"error": f"Database error: {e}"}), 500
|
||
|
||
rows = cursor.fetchall()
|
||
# 获取列名
|
||
column_names = [description[0] for description in cursor.description]
|
||
|
||
result = []
|
||
for row in rows:
|
||
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
|
||
result.append(dict_row)
|
||
return jsonify({'data': result})
|
||
|
||
elif request.method == 'POST':
|
||
data = request.get_json() # 从POST请求中获取数据
|
||
action = data['action']
|
||
records = data['records']
|
||
if action is None:
|
||
return jsonify({"status": "error", "message": "action is not specified"})
|
||
elif action == "insert":
|
||
try:
|
||
# 获取并格式化当前日期
|
||
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
||
cursor = g.db.cursor()
|
||
for record in records:
|
||
# 将当前日期添加到记录中作为创建日期
|
||
# record['createdate'] = current_date
|
||
if 'id' in record:
|
||
del record['id']
|
||
columns = ', '.join(record.keys())
|
||
placeholders = ', '.join('?' for _ in record)
|
||
sql = 'INSERT INTO TestReq ({}) VALUES ({})'.format(columns, placeholders)
|
||
cursor.execute(sql, tuple(record.values()))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "insert", "message": "insert records into testReq success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('create duts error', error)
|
||
return jsonify({"status": "error", "action": "insert", "message": "insert records into testReq error"})
|
||
pass
|
||
|
||
elif action == "update":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
# 遍历数据,从数据库中删除相应的记录
|
||
for record in records:
|
||
id_value = record.get('id', None)
|
||
if id_value is not None:
|
||
cursor.execute(
|
||
'UPDATE TestReq SET %s WHERE id = ?' % ', '.join(['%s = ?' % field for field in record]),
|
||
tuple(record.values()) + (record['id'],))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "update", "message": "update record into testReq success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('update duts error', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "update", "message": f"update records into testReq error{error}"})
|
||
pass
|
||
elif action == "delete":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
cursor.executemany("DELETE FROM TestReq WHERE id = ?", [(record['id'],) for record in records])
|
||
g.db.commit()
|
||
return jsonify({"status": "success", "action": "delete", "message": "delete testReq records success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('delete', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "delete", "message": f"delete testReq records error{error}"})
|
||
pass
|
||
|
||
|
||
@app.route('/project', methods=['GET', 'POST'])
|
||
@app.route('/project', methods=['GET', 'POST'])
|
||
@cross_origin()
|
||
def project_db():
|
||
# 在这里实现你的数据处理逻辑
|
||
if request.method == 'GET':
|
||
sql_str = "SELECT * FROM project WHERE id > 0"
|
||
sql_params = []
|
||
parameters_to_check = ['name', 'key', 'manager', 'type', 'description']
|
||
|
||
def get_parameter(param):
|
||
param_sql = ""
|
||
param_value = request.args.getlist(param)
|
||
if param_value:
|
||
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
|
||
return param_sql, param_value
|
||
|
||
for param in parameters_to_check:
|
||
param_sql, param_value = get_parameter(param)
|
||
sql_str += param_sql
|
||
sql_params.extend(param_value)
|
||
# print("select project params", param_value) # cyx
|
||
|
||
try:
|
||
# 从数据库查询数据
|
||
sql_str = sql_str + ' ORDER BY id ASC'
|
||
cursor = g.db.cursor()
|
||
cursor.execute(sql_str, sql_params)
|
||
except sqlite3.Error as e:
|
||
return jsonify({"error": f"Database error: {e}"}), 500
|
||
|
||
rows = cursor.fetchall()
|
||
# 获取列名
|
||
column_names = [description[0] for description in cursor.description]
|
||
|
||
result = []
|
||
for row in rows:
|
||
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
|
||
result.append(dict_row)
|
||
return jsonify({'data': result})
|
||
|
||
elif request.method == 'POST':
|
||
data = request.get_json() # 从POST请求中获取数据
|
||
action = data['action']
|
||
records = data['records']
|
||
if action is None:
|
||
return jsonify({"status": "error", "message": "action is not specified"})
|
||
elif action == "insert":
|
||
try:
|
||
# 获取并格式化当前日期
|
||
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
||
cursor = g.db.cursor()
|
||
for record in records:
|
||
# 将当前日期添加到记录中作为创建日期
|
||
# record['createdate'] = current_date
|
||
if 'id' in record:
|
||
del record['id']
|
||
columns = ', '.join(record.keys())
|
||
placeholders = ', '.join('?' for _ in record)
|
||
sql = 'INSERT INTO project ({}) VALUES ({})'.format(columns, placeholders)
|
||
cursor.execute(sql, tuple(record.values()))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "insert", "message": "insert records into project success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('create duts error', error)
|
||
return jsonify({"status": "error", "action": "insert", "message": "insert records into project error"})
|
||
pass
|
||
|
||
elif action == "update":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
# 遍历数据,从数据库中删除相应的记录
|
||
for record in records:
|
||
key_value = record.get('key', None)
|
||
if key_value is not None:
|
||
cursor.execute(
|
||
'UPDATE project SET %s WHERE key = ?' % ', '.join(['%s = ?' % field for field in record]),
|
||
tuple(record.values()) + (record['key'],))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "update", "message": "update records into project success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('update duts error', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "update", "message": f"update records into project error{error}"})
|
||
pass
|
||
elif action == "delete":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
cursor.executemany("DELETE FROM project WHERE id = ?", [(record['id'],) for record in records])
|
||
g.db.commit()
|
||
return jsonify({"status": "success", "action": "delete", "message": "delete project records success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('delete project', error, records)
|
||
return jsonify(
|
||
{"status": "error", "action": "delete", "message": f"delete project records error{error}"})
|
||
pass
|
||
|
||
|
||
@app.route('/projectPhase', methods=['GET', 'POST'])
|
||
@app.route('/projectPhase', methods=['GET', 'POST'])
|
||
@cross_origin()
|
||
def projecPhase_db():
|
||
# 在这里实现你的数据处理逻辑
|
||
if request.method == 'GET':
|
||
sql_str = "SELECT * FROM projectPhase WHERE id > 0"
|
||
sql_params = []
|
||
parameters_to_check = ['name', 'PN', 'description']
|
||
|
||
def get_parameter(param):
|
||
param_sql = ""
|
||
param_value = request.args.getlist(param)
|
||
if param_value:
|
||
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
|
||
return param_sql, param_value
|
||
|
||
for param in parameters_to_check:
|
||
param_sql, param_value = get_parameter(param)
|
||
sql_str += param_sql
|
||
sql_params.extend(param_value)
|
||
# print("select projectPhase params", param_value) # cyx
|
||
|
||
try:
|
||
# 从数据库查询数据
|
||
sql_str = sql_str + ' ORDER BY id ASC'
|
||
cursor = g.db.cursor()
|
||
cursor.execute(sql_str, sql_params)
|
||
except sqlite3.Error as e:
|
||
return jsonify({"error": f"Database error: {e}"}), 500
|
||
|
||
rows = cursor.fetchall()
|
||
# 获取列名
|
||
column_names = [description[0] for description in cursor.description]
|
||
|
||
result = []
|
||
for row in rows:
|
||
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
|
||
result.append(dict_row)
|
||
return jsonify({'data': result})
|
||
|
||
elif request.method == 'POST':
|
||
data = request.get_json() # 从POST请求中获取数据
|
||
action = data['action']
|
||
records = data['records']
|
||
if action is None:
|
||
return jsonify({"status": "error", "message": "action is not specified"})
|
||
elif action == "insert":
|
||
try:
|
||
# 获取并格式化当前日期
|
||
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
||
cursor = g.db.cursor()
|
||
for record in records:
|
||
# 将当前日期添加到记录中作为创建日期
|
||
# record['createdate'] = current_date
|
||
if 'id' in record:
|
||
del record['id']
|
||
columns = ', '.join(record.keys())
|
||
placeholders = ', '.join('?' for _ in record)
|
||
sql = 'INSERT INTO projectPhase ({}) VALUES ({})'.format(columns, placeholders)
|
||
cursor.execute(sql, tuple(record.values()))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "insert", "message": "insert records into projectPhase success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('create duts error', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "insert", "message": "insert records into projectPhase error"})
|
||
pass
|
||
|
||
elif action == "update":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
# 遍历数据,从数据库中删除相应的记录
|
||
for record in records:
|
||
pn_value = record.get('PN', None)
|
||
if pn_value is not None:
|
||
cursor.execute(
|
||
'UPDATE projectPhase SET %s WHERE PN = ?' % ', '.join(
|
||
['%s = ?' % field for field in record]),
|
||
tuple(record.values()) + (record['PN'],))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "update", "message": "update records into projectPhase success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('update duts error', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "update", "message": f"update records into projectPhase error{error}"})
|
||
pass
|
||
elif action == "delete":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
cursor.executemany("DELETE FROM projectPhase WHERE id = ?", [(record['id'],) for record in records])
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "delete", "message": "delete projectPhase records success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('delete', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "delete", "message": f"delete projectPhase records error{error}"})
|
||
pass
|
||
|
||
|
||
@app.route('/dtMachine', methods=['GET', 'POST'])
|
||
@app.route('/dtMachine', methods=['GET', 'POST'])
|
||
@cross_origin()
|
||
def station_db():
|
||
# 在这里实现你的数据处理逻辑
|
||
if request.method == 'GET':
|
||
sql_str = "SELECT * FROM dtMachine WHERE id > 0"
|
||
sql_params = []
|
||
parameters_to_check = ['label', 'SN', 'status', 'type', 'color', 'description']
|
||
|
||
def get_parameter(param):
|
||
param_sql = ""
|
||
param_value = request.args.getlist(param)
|
||
if param_value:
|
||
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
|
||
return param_sql, param_value
|
||
|
||
for param in parameters_to_check:
|
||
param_sql, param_value = get_parameter(param)
|
||
sql_str += param_sql
|
||
sql_params.extend(param_value)
|
||
print("select station params", param_value) # cyx
|
||
try:
|
||
# 从数据库查询数据
|
||
sql_str = sql_str + ' ORDER BY id ASC'
|
||
cursor = g.db.cursor()
|
||
cursor.execute(sql_str, sql_params)
|
||
except sqlite3.Error as e:
|
||
return jsonify({"error": f"Database error: {e}"}), 500
|
||
|
||
rows = cursor.fetchall()
|
||
# 获取列名
|
||
column_names = [description[0] for description in cursor.description]
|
||
|
||
result = []
|
||
for row in rows:
|
||
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
|
||
result.append(dict_row)
|
||
return jsonify({'data': result})
|
||
|
||
elif request.method == 'POST':
|
||
data = request.get_json() # 从POST请求中获取数据
|
||
action = data['action']
|
||
records = data['records']
|
||
if action is None:
|
||
return jsonify({"status": "error", "message": "action is not specified"})
|
||
elif action == "insert":
|
||
try:
|
||
# 获取并格式化当前日期
|
||
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
||
cursor = g.db.cursor()
|
||
for record in records:
|
||
# 将当前日期添加到记录中作为创建日期
|
||
# record['createdate'] = current_date
|
||
if 'id' in record:
|
||
del record['id']
|
||
columns = ', '.join(record.keys())
|
||
placeholders = ', '.join('?' for _ in record)
|
||
sql = 'INSERT INTO dtMachine ({}) VALUES ({})'.format(columns, placeholders)
|
||
cursor.execute(sql, tuple(record.values()))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "insert", "message": "insert records into dtMachine success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('create duts error', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "insert", "message": "insert records into dtMachine error"})
|
||
pass
|
||
|
||
elif action == "update":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
# 遍历数据,从数据库中删除相应的记录
|
||
for record in records:
|
||
sn_value = record.get('SN', None)
|
||
if sn_value is not None:
|
||
cursor.execute(
|
||
'UPDATE dtMachine SET %s WHERE SN = ?' % ', '.join(['%s = ?' % field for field in record]),
|
||
tuple(record.values()) + (record['SN'],))
|
||
g.db.commit()
|
||
return jsonify(
|
||
{"status": "success", "action": "update", "message": "update records into dtMachine success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('update duts error', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "update", "message": f"update records into dtMachine error{error}"})
|
||
pass
|
||
elif action == "delete":
|
||
try:
|
||
cursor = g.db.cursor()
|
||
cursor.executemany("DELETE FROM dtMachine WHERE id = ?", [(record['id'],) for record in records])
|
||
g.db.commit()
|
||
return jsonify({"status": "success", "action": "delete", "message": "delete dtMachine records success",
|
||
"recordsCnt": len(records)})
|
||
except sqlite3.Error as error:
|
||
print('delete', error)
|
||
return jsonify(
|
||
{"status": "error", "action": "delete", "message": f"delete dtMachine records error{error}"})
|
||
pass
|
||
|
||
|
||
# 添加一个全局错误处理器
|
||
@app.errorhandler(500)
|
||
def internal_error(error):
|
||
return jsonify({'error': 'Internal server error'}), 500
|
||
|
||
|
||
def run_server():
|
||
app.run()
|
||
|
||
|
||
def run_webview():
|
||
eel.init('web') # 告诉 Eel 查找 'web' 文件夹
|
||
eel.start('dtm_entry.html', size=(3000, 2000), position=(0, 0), suppress_error=True)
|
||
|
||
|
||
def exit_program():
|
||
sys.exit(0)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
|
||
comm_config = {
|
||
"baudrate": 9600,
|
||
"stopbits": serial.STOPBITS_ONE,
|
||
"parity": serial.PARITY_EVEN,
|
||
"bytesize": serial.SEVENBITS
|
||
}
|
||
drop_register = {
|
||
'01': {"height": 'D0160', "cycles": "D0200", "cyclesFinished": "D0202", "start": 'M0016', "stop": "M0008"},
|
||
'02': {"height": 'D0162', "cycles": "D0204", "cyclesFinished": "D0206", "start": 'M0017', "stop": "M0018"}
|
||
|
||
}
|
||
dtm_machine1 = dtm_machine.DtmMachine("com2", comm_config, drop_register, 0x01)
|
||
dtm_machines['com2'] = dtm_machine1
|
||
# dtm_machine1.read_station_dropheight('01')
|
||
# dtm_machine1.set_station_dropheight('02',531)
|
||
# dtm_machine1.set_station_cycles('02',78)
|
||
# dtm_machine1.start_station('01')
|
||
dtm_machine1.stop_station('01')
|
||
print(dtm_machine1.station_start_status('01'))
|
||
# 创建并启动Flask的线程
|
||
t = threading.Thread(target=run_server)
|
||
t.daemon = True
|
||
t.start()
|
||
# 创建并启动Webview的线程
|
||
# print("start webview")
|
||
# run_webview()
|
||
# 监听键盘输入
|
||
while True:
|
||
key = input("按下q键退出程序:")
|
||
if key.lower() == 'q':
|
||
exit_program()
|