Files
dtm-py-all/bk/dtmgtServer.py

882 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, request, jsonify, g, Response
from flask_cors import CORS, cross_origin
import sqlite3, datetime
from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ConnectionException
import eel
import threading, os, sys, platform
import serial
import plc_comm as dtm_machine
dtm_machines = {}
# 初始化 Flask 应用
app = Flask(__name__)
CORS(app) # 这会让所有的资源都支持跨域访问
CORS(app, resources=r"/*")
# 初始化 Modbus 通信
client = ModbusSerialClient(method='rtu', port='<Your COM Port>', baudrate=9600)
# 连接SQLite数据库
try:
dbConn = sqlite3.connect('db\dtmgtDb.db')
dbCursor = dbConn.cursor()
# 创建表
dbCursor.execute('''CREATE TABLE IF NOT EXISTS TEST
(ID INT PRIMARY KEY NOT NULL,
NAME TEXT NOT NULL,
RESULT TEXT NOT NULL);''')
except sqlite3.Error as e:
print(f"Database error: {e}")
# 或者你可以选择抛出异常,让程序停止运行
# raise e
@app.before_request
def before_request():
g.db = sqlite3.connect('db\dtmgtDb.db')
# print("open database", g.db)
# 设置跌落机的试验参数
@app.route('/machine_control', methods=['POST'])
@cross_origin()
def set_params():
global dtm_machines
params = request.json
json_return = {"status": "error"}
actionParam = {}
"""
let control_reqs={
start:{machine:'HYSZ-DTM-002',com:'com2',station:'01'},
stop:{machine:'HYSZ-DTM-002',com:'com2',station:'01'},
setParams: {
machine: 'HYSZ-DTM-002', com: 'com2', station: '01', dropHeight: 67, dropCycles: 89,
},
getParams:{machine: 'HYSZ-DTM-002', com: 'com2', station: '01'}
}
"""
"""
dtm_machine1.read_station_dropheight('01')
dtm_machine1.set_station_dropheight('02',531)
dtm_machine1.start_station('01')
"""
# 检测键是否存在
if 'setParams' in params:
setParams = params['setParams']
actionParam = setParams
# 使用 set 检测多个键是否存在
for setParam in setParams:
required_keys = {'machine', 'com', 'station'}
if required_keys.issubset(setParam.keys()):
if setParam['com'].lower() in dtm_machines:
dtm_machine = dtm_machines[setParam['com'].lower()]
# print("set machine params", setParam, setParam['station'], setParam['dropHeight'])
try:
if 'dropHeight' in setParam:
resp = dtm_machine.set_station_dropheight(setParam['station'], int(setParam['dropHeight']))
if "status" in resp and resp["status"] == "success":
json_return["status"] = "success"
json_return["action"] = "set machine params"
pass
if 'dropCycles' in setParam:
resp = dtm_machine.set_station_cycles(setParam['station'], int(setParam['dropCycles']))
if "status" in resp and resp["status"] == "success":
json_return["status"] = "success"
json_return["action"] = "set machine params"
pass
if 'cyclesFinished' in setParam:
resp = dtm_machine.set_station_finished(setParam['station'],
int(setParam['cyclesFinished']))
if "status" in resp and resp["status"] == "success":
json_return["status"] = "success"
json_return["action"] = "set machine params"
pass
except Exception as error:
return jsonify({"status": "error", "msg": error})
pass
else:
# print("键 'machine', 'com', 'station', 'params' 不完全存在于 JSON 中")
pass
if 'start' in params or 'stop' in params or 'resume' in params:
param = None
if 'start' in params:
param = params['start']
if 'stop' in params:
param = params['stop']
if 'resume' in params:
param = params['resume']
actionParam = param
# 使用 set 检测多个键是否存在
required_keys = {'machine', 'com', 'station'}
if required_keys.issubset(param.keys()):
if param['com'].lower() in dtm_machines:
dtm_machine = dtm_machines[param['com'].lower()]
# print("set machine params", setParam, setParam['station'], setParam['dropHeight'])
try:
if 'start' in params:
resp = dtm_machine.start_station(param['station'])
if "status" in resp and resp["status"] == "success":
json_return["status"] = "success"
else:
pass
json_return["action"] = "start"
if 'resume' in params:
resp = dtm_machine.resume_station(param['station'])
if "status" in resp and resp["status"] == "success":
json_return["status"] = "success"
json_return["action"] = "resume"
if 'stop' in params:
resp = dtm_machine.stop_station(param['station'])
if "status" in resp and resp["status"] == "success":
json_return["status"] = "success"
json_return["action"] = "stop"
except Exception as error:
return jsonify({"status": "error", "msg": error})
pass
else:
# print("键 'machine', 'com', 'station', 'params' 不完全存在于 JSON 中")
pass
if 'getParams' in params:
param = params['getParams']
actionParam = param
# 使用 set 检测多个键是否存在q
required_keys = {'machine', 'com', 'station'}
if required_keys.issubset(param.keys()):
if param['com'].lower() in dtm_machines:
dtm_machine = dtm_machines[param['com'].lower()]
# print("set machine params", setParam, setParam['station'], setParam['dropHeight'])
try:
finished = dtm_machine.read_station_cyclesFinished(param['station'])
if 'status' in finished and finished['status'] == 'success' and 'value' in finished:
json_return["status"] = "success"
json_return["action"] = "getParams"
json_return["finished"] = finished['value']
except Exception as error:
return jsonify({"status": "error", "msg": error})
pass
else:
# print("键 'machine', 'com', 'station', 'params' 不完全存在于 JSON 中")
pass
return json_return
"""
CREATE TABLE DUTList (
SN TEXT (20) UNIQUE,
name TEXT (20),
project TEXT (20),
phase TEXT (16),
stationAssigned TEXT (18),
itemOnGoing TEXT (10),
itemsFinished NUMERIC (5),
status TEXT (10),
testReq TEXT (10),
inspector TEXT (20),
id INTEGER PRIMARY KEY AUTOINCREMENT
);
"""
import sqlite3
from flask import jsonify, request
@app.route('/dbTableAccess', methods=['GET', 'POST'])
@app.route('/dbTableAccess', methods=['GET', 'POST'])
@cross_origin()
def process_table_data():
print("dbTableAccess request",request) #cyx
table = request.args.get('table')
if table is None:
return jsonify({"error": "Table parameter is missing"}), 400
if request.method == 'GET':
sql_str = f"SELECT * FROM {table} WHERE id > 0"
sql_params = []
parameters_to_check = []
if table == 'dutList':
parameters_to_check = ['name', 'PN', 'description']
if table == 'testReq':
parameters_to_check = ['SN', 'dropItem', 'dropHeight', 'dropCycles', 'description']
if table == 'project':
parameters_to_check = ['name', 'key', 'manager', 'type', 'description']
if table == 'projectPhase':
parameters_to_check = ['name', 'PN', 'description']
if table == 'dtMachine':
parameters_to_check = ['label', 'SN', 'status', 'type', 'color', 'description']
def get_parameter(param):
param_sql = ""
param_value = request.args.getlist(param)
if param_value:
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
return param_sql, param_value
for param in parameters_to_check:
param_sql, param_value = get_parameter(param)
sql_str += param_sql
sql_params.extend(param_value)
try:
if table == 'dutList':
sql_str = sql_str + ' ORDER BY date(createdate) DESC'
cursor = g.db.cursor()
cursor.execute(sql_str, sql_params)
except sqlite3.Error as e:
return jsonify({"error": f"Database error: {e}"}), 500
rows = cursor.fetchall()
column_names = [description[0] for description in cursor.description]
result = []
for row in rows:
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
result.append(dict_row)
return jsonify({'data': result})
elif request.method == 'POST':
data = request.get_json()
action = data['action']
records = data['records']
if action is None:
return jsonify({"status": "error", "message": "action is not specified"})
elif action == "insert":
try:
cursor = g.db.cursor()
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
for record in records:
if 'id' in record:
del record['id']
columns = ', '.join(record.keys())
placeholders = ', '.join('?' for _ in record)
sql = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})'
cursor.execute(sql, tuple(record.values()))
g.db.commit()
return jsonify(
{"status": "success", "action": "insert", "message": f"insert records into {table} success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('create duts error', error)
return jsonify({"status": "error", "action": "insert", "message": f"insert records into {table} error"})
elif action == "update":
try:
cursor = g.db.cursor()
update_key = 'PN'
if table == 'dutList':
update_key = 'SN'
if table == 'testReq':
update_key = 'id'
if table == 'project':
update_key = 'key'
if table == 'projectPhase':
update_key = 'PN'
if table == 'dtMachine':
update_key = 'SN'
for record in records:
pn_value = record.get(update_key, None)
if pn_value is not None:
cursor.execute(
f'UPDATE {table} SET {", ".join(["{} = ?".format(field) for field in record])} WHERE {update_key} = ?',
tuple(record.values()) + (record[update_key],))
g.db.commit()
return jsonify(
{"status": "success", "action": "update", "message": f"update records into {table} success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('update duts error', error)
return jsonify(
{"status": "error", "action": "update", "message": f"update records into {table} error{error}"})
elif action == "delete":
try:
cursor = g.db.cursor()
cursor.executemany(f"DELETE FROM {table} WHERE id = ?", [(record['id'],) for record in records])
g.db.commit()
return jsonify({"status": "success", "action": "delete", "message": f"delete {table} records success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('delete', error)
return jsonify(
{"status": "error", "action": "delete", "message": f"delete {table} records error{error}"})
def get_parameter(param_name):
param = request.args.get(param_name)
if param is not None:
return f" AND {param_name} = ?", param
return "", None
@app.route('/dutlist', methods=['GET', 'POST'])
@app.route('/dutList', methods=['GET', 'POST'])
@cross_origin()
def dut_info_list():
# 在这里实现你的数据处理逻辑
# start_time = request.args.get('start_time')
# end_time = request.args.get('end_time')
if request.method == 'GET':
sql_str = "SELECT * FROM DUTList WHERE id > 0"
sql_params = []
parameters_to_check = ['SN', 'status', 'inspector', 'project']
def get_parameter(param):
param_sql = ""
param_value = request.args.getlist(param)
if param_value:
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
return param_sql, param_value
for param in parameters_to_check:
param_sql, param_value = get_parameter(param)
sql_str += param_sql
sql_params.extend(param_value)
# print("select dutList params", param_value) # cyx
try:
# 从数据库查询数据
sql_str = sql_str + ' ORDER BY date(createdate) DESC'
cursor = g.db.cursor()
cursor.execute(sql_str, sql_params)
except sqlite3.Error as e:
return jsonify({"error": f"Database error: {e}"}), 500
rows = cursor.fetchall()
# 获取列名
column_names = [description[0] for description in cursor.description]
result = []
for row in rows:
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
result.append(dict_row)
return jsonify({'data': result})
elif request.method == 'POST':
data = request.get_json() # 从POST请求中获取数据
action = data['action']
records = data['records']
if action is None:
return jsonify({"status": "error", "message": "action is not specified"})
elif action == "insert":
try:
# 获取并格式化当前日期
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
cursor = g.db.cursor()
for record in records:
# 将当前日期添加到记录中作为创建日期
record['createdate'] = current_date
columns = ', '.join(record.keys())
placeholders = ', '.join('?' for _ in record)
sql = 'INSERT INTO DUTList ({}) VALUES ({})'.format(columns, placeholders)
cursor.execute(sql, tuple(record.values()))
g.db.commit()
return jsonify(
{"status": "success", "action": "insert", "message": "insert records into dutList success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('create duts error', error)
return jsonify({"status": "error", "action": "insert", "message": "insert records into dutList error"})
pass
elif action == "update":
try:
cursor = g.db.cursor()
# 遍历数据,从数据库中删除相应的记录
for record in records:
sn_value = record.get('SN', None)
if sn_value is not None:
cursor.execute(
'UPDATE DUTList SET %s WHERE SN = ?' % ', '.join(['%s = ?' % field for field in record]),
tuple(record.values()) + (record['SN'],))
g.db.commit()
return jsonify({"status": "success", "action": "update", "message": "update dutList success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('update duts error', error)
return jsonify({"status": "error", "action": "update", "message": f"update dutList records error{error}"})
pass
elif action == "delete":
try:
cursor = g.db.cursor()
cursor.executemany("DELETE FROM DUTList WHERE SN = ?", [(record['SN'],) for record in records])
g.db.commit()
return jsonify({"status": "success", "action": "delete", "message": "delete dutList records success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('delete', error)
return jsonify(
{"status": "error", "action": "delete", "message": f"delete dutList records error{error}"})
pass
@app.route('/testReq', methods=['GET', 'POST'])
@app.route('/testReq', methods=['GET', 'POST'])
@cross_origin()
def test_requirements():
# 在这里实现你的数据处理逻辑
if request.method == 'GET':
sql_str = "SELECT * FROM TestReq WHERE id > 0"
sql_params = []
parameters_to_check = ['SN', 'dropItem', 'dropHeight', 'dropCycles', 'description']
def get_parameter(param):
param_sql = ""
param_value = request.args.getlist(param)
if param_value:
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
return param_sql, param_value
for param in parameters_to_check:
param_sql, param_value = get_parameter(param)
sql_str += param_sql
sql_params.extend(param_value)
# print("select testReq params", param_value) # cyx
try:
# 从数据库查询数据
sql_str = sql_str + ' ORDER BY id ASC'
cursor = g.db.cursor()
cursor.execute(sql_str, sql_params)
except sqlite3.Error as e:
return jsonify({"error": f"Database error: {e}"}), 500
rows = cursor.fetchall()
# 获取列名
column_names = [description[0] for description in cursor.description]
result = []
for row in rows:
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
result.append(dict_row)
return jsonify({'data': result})
elif request.method == 'POST':
data = request.get_json() # 从POST请求中获取数据
action = data['action']
records = data['records']
if action is None:
return jsonify({"status": "error", "message": "action is not specified"})
elif action == "insert":
try:
# 获取并格式化当前日期
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
cursor = g.db.cursor()
for record in records:
# 将当前日期添加到记录中作为创建日期
# record['createdate'] = current_date
if 'id' in record:
del record['id']
columns = ', '.join(record.keys())
placeholders = ', '.join('?' for _ in record)
sql = 'INSERT INTO TestReq ({}) VALUES ({})'.format(columns, placeholders)
cursor.execute(sql, tuple(record.values()))
g.db.commit()
return jsonify(
{"status": "success", "action": "insert", "message": "insert records into testReq success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('create duts error', error)
return jsonify({"status": "error", "action": "insert", "message": "insert records into testReq error"})
pass
elif action == "update":
try:
cursor = g.db.cursor()
# 遍历数据,从数据库中删除相应的记录
for record in records:
id_value = record.get('id', None)
if id_value is not None:
cursor.execute(
'UPDATE TestReq SET %s WHERE id = ?' % ', '.join(['%s = ?' % field for field in record]),
tuple(record.values()) + (record['id'],))
g.db.commit()
return jsonify(
{"status": "success", "action": "update", "message": "update record into testReq success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('update duts error', error)
return jsonify(
{"status": "error", "action": "update", "message": f"update records into testReq error{error}"})
pass
elif action == "delete":
try:
cursor = g.db.cursor()
cursor.executemany("DELETE FROM TestReq WHERE id = ?", [(record['id'],) for record in records])
g.db.commit()
return jsonify({"status": "success", "action": "delete", "message": "delete testReq records success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('delete', error)
return jsonify(
{"status": "error", "action": "delete", "message": f"delete testReq records error{error}"})
pass
@app.route('/project', methods=['GET', 'POST'])
@app.route('/project', methods=['GET', 'POST'])
@cross_origin()
def project_db():
# 在这里实现你的数据处理逻辑
if request.method == 'GET':
sql_str = "SELECT * FROM project WHERE id > 0"
sql_params = []
parameters_to_check = ['name', 'key', 'manager', 'type', 'description']
def get_parameter(param):
param_sql = ""
param_value = request.args.getlist(param)
if param_value:
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
return param_sql, param_value
for param in parameters_to_check:
param_sql, param_value = get_parameter(param)
sql_str += param_sql
sql_params.extend(param_value)
# print("select project params", param_value) # cyx
try:
# 从数据库查询数据
sql_str = sql_str + ' ORDER BY id ASC'
cursor = g.db.cursor()
cursor.execute(sql_str, sql_params)
except sqlite3.Error as e:
return jsonify({"error": f"Database error: {e}"}), 500
rows = cursor.fetchall()
# 获取列名
column_names = [description[0] for description in cursor.description]
result = []
for row in rows:
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
result.append(dict_row)
return jsonify({'data': result})
elif request.method == 'POST':
data = request.get_json() # 从POST请求中获取数据
action = data['action']
records = data['records']
if action is None:
return jsonify({"status": "error", "message": "action is not specified"})
elif action == "insert":
try:
# 获取并格式化当前日期
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
cursor = g.db.cursor()
for record in records:
# 将当前日期添加到记录中作为创建日期
# record['createdate'] = current_date
if 'id' in record:
del record['id']
columns = ', '.join(record.keys())
placeholders = ', '.join('?' for _ in record)
sql = 'INSERT INTO project ({}) VALUES ({})'.format(columns, placeholders)
cursor.execute(sql, tuple(record.values()))
g.db.commit()
return jsonify(
{"status": "success", "action": "insert", "message": "insert records into project success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('create duts error', error)
return jsonify({"status": "error", "action": "insert", "message": "insert records into project error"})
pass
elif action == "update":
try:
cursor = g.db.cursor()
# 遍历数据,从数据库中删除相应的记录
for record in records:
key_value = record.get('key', None)
if key_value is not None:
cursor.execute(
'UPDATE project SET %s WHERE key = ?' % ', '.join(['%s = ?' % field for field in record]),
tuple(record.values()) + (record['key'],))
g.db.commit()
return jsonify(
{"status": "success", "action": "update", "message": "update records into project success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('update duts error', error)
return jsonify(
{"status": "error", "action": "update", "message": f"update records into project error{error}"})
pass
elif action == "delete":
try:
cursor = g.db.cursor()
cursor.executemany("DELETE FROM project WHERE id = ?", [(record['id'],) for record in records])
g.db.commit()
return jsonify({"status": "success", "action": "delete", "message": "delete project records success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('delete project', error, records)
return jsonify(
{"status": "error", "action": "delete", "message": f"delete project records error{error}"})
pass
@app.route('/projectPhase', methods=['GET', 'POST'])
@app.route('/projectPhase', methods=['GET', 'POST'])
@cross_origin()
def projecPhase_db():
# 在这里实现你的数据处理逻辑
if request.method == 'GET':
sql_str = "SELECT * FROM projectPhase WHERE id > 0"
sql_params = []
parameters_to_check = ['name', 'PN', 'description']
def get_parameter(param):
param_sql = ""
param_value = request.args.getlist(param)
if param_value:
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
return param_sql, param_value
for param in parameters_to_check:
param_sql, param_value = get_parameter(param)
sql_str += param_sql
sql_params.extend(param_value)
# print("select projectPhase params", param_value) # cyx
try:
# 从数据库查询数据
sql_str = sql_str + ' ORDER BY id ASC'
cursor = g.db.cursor()
cursor.execute(sql_str, sql_params)
except sqlite3.Error as e:
return jsonify({"error": f"Database error: {e}"}), 500
rows = cursor.fetchall()
# 获取列名
column_names = [description[0] for description in cursor.description]
result = []
for row in rows:
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
result.append(dict_row)
return jsonify({'data': result})
elif request.method == 'POST':
data = request.get_json() # 从POST请求中获取数据
action = data['action']
records = data['records']
if action is None:
return jsonify({"status": "error", "message": "action is not specified"})
elif action == "insert":
try:
# 获取并格式化当前日期
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
cursor = g.db.cursor()
for record in records:
# 将当前日期添加到记录中作为创建日期
# record['createdate'] = current_date
if 'id' in record:
del record['id']
columns = ', '.join(record.keys())
placeholders = ', '.join('?' for _ in record)
sql = 'INSERT INTO projectPhase ({}) VALUES ({})'.format(columns, placeholders)
cursor.execute(sql, tuple(record.values()))
g.db.commit()
return jsonify(
{"status": "success", "action": "insert", "message": "insert records into projectPhase success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('create duts error', error)
return jsonify(
{"status": "error", "action": "insert", "message": "insert records into projectPhase error"})
pass
elif action == "update":
try:
cursor = g.db.cursor()
# 遍历数据,从数据库中删除相应的记录
for record in records:
pn_value = record.get('PN', None)
if pn_value is not None:
cursor.execute(
'UPDATE projectPhase SET %s WHERE PN = ?' % ', '.join(
['%s = ?' % field for field in record]),
tuple(record.values()) + (record['PN'],))
g.db.commit()
return jsonify(
{"status": "success", "action": "update", "message": "update records into projectPhase success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('update duts error', error)
return jsonify(
{"status": "error", "action": "update", "message": f"update records into projectPhase error{error}"})
pass
elif action == "delete":
try:
cursor = g.db.cursor()
cursor.executemany("DELETE FROM projectPhase WHERE id = ?", [(record['id'],) for record in records])
g.db.commit()
return jsonify(
{"status": "success", "action": "delete", "message": "delete projectPhase records success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('delete', error)
return jsonify(
{"status": "error", "action": "delete", "message": f"delete projectPhase records error{error}"})
pass
@app.route('/dtMachine', methods=['GET', 'POST'])
@app.route('/dtMachine', methods=['GET', 'POST'])
@cross_origin()
def station_db():
# 在这里实现你的数据处理逻辑
if request.method == 'GET':
sql_str = "SELECT * FROM dtMachine WHERE id > 0"
sql_params = []
parameters_to_check = ['label', 'SN', 'status', 'type', 'color', 'description']
def get_parameter(param):
param_sql = ""
param_value = request.args.getlist(param)
if param_value:
param_sql = f" AND {param} IN ({','.join(['?'] * len(param_value))})"
return param_sql, param_value
for param in parameters_to_check:
param_sql, param_value = get_parameter(param)
sql_str += param_sql
sql_params.extend(param_value)
print("select station params", param_value) # cyx
try:
# 从数据库查询数据
sql_str = sql_str + ' ORDER BY id ASC'
cursor = g.db.cursor()
cursor.execute(sql_str, sql_params)
except sqlite3.Error as e:
return jsonify({"error": f"Database error: {e}"}), 500
rows = cursor.fetchall()
# 获取列名
column_names = [description[0] for description in cursor.description]
result = []
for row in rows:
dict_row = {col: row[idx] if row[idx] is not None else None for idx, col in enumerate(column_names)}
result.append(dict_row)
return jsonify({'data': result})
elif request.method == 'POST':
data = request.get_json() # 从POST请求中获取数据
action = data['action']
records = data['records']
if action is None:
return jsonify({"status": "error", "message": "action is not specified"})
elif action == "insert":
try:
# 获取并格式化当前日期
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
cursor = g.db.cursor()
for record in records:
# 将当前日期添加到记录中作为创建日期
# record['createdate'] = current_date
if 'id' in record:
del record['id']
columns = ', '.join(record.keys())
placeholders = ', '.join('?' for _ in record)
sql = 'INSERT INTO dtMachine ({}) VALUES ({})'.format(columns, placeholders)
cursor.execute(sql, tuple(record.values()))
g.db.commit()
return jsonify(
{"status": "success", "action": "insert", "message": "insert records into dtMachine success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('create duts error', error)
return jsonify(
{"status": "error", "action": "insert", "message": "insert records into dtMachine error"})
pass
elif action == "update":
try:
cursor = g.db.cursor()
# 遍历数据,从数据库中删除相应的记录
for record in records:
sn_value = record.get('SN', None)
if sn_value is not None:
cursor.execute(
'UPDATE dtMachine SET %s WHERE SN = ?' % ', '.join(['%s = ?' % field for field in record]),
tuple(record.values()) + (record['SN'],))
g.db.commit()
return jsonify(
{"status": "success", "action": "update", "message": "update records into dtMachine success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('update duts error', error)
return jsonify(
{"status": "error", "action": "update", "message": f"update records into dtMachine error{error}"})
pass
elif action == "delete":
try:
cursor = g.db.cursor()
cursor.executemany("DELETE FROM dtMachine WHERE id = ?", [(record['id'],) for record in records])
g.db.commit()
return jsonify({"status": "success", "action": "delete", "message": "delete dtMachine records success",
"recordsCnt": len(records)})
except sqlite3.Error as error:
print('delete', error)
return jsonify(
{"status": "error", "action": "delete", "message": f"delete dtMachine records error{error}"})
pass
# 添加一个全局错误处理器
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': 'Internal server error'}), 500
def run_server():
app.run()
def run_webview():
eel.init('web') # 告诉 Eel 查找 'web' 文件夹
eel.start('dtm_entry.html', size=(3000, 2000), position=(0, 0), suppress_error=True)
def exit_program():
sys.exit(0)
if __name__ == "__main__":
comm_config = {
"baudrate": 9600,
"stopbits": serial.STOPBITS_ONE,
"parity": serial.PARITY_EVEN,
"bytesize": serial.SEVENBITS
}
drop_register = {
'01': {"height": 'D0160', "cycles": "D0200", "cyclesFinished": "D0202", "start": 'M0016', "stop": "M0008"},
'02': {"height": 'D0162', "cycles": "D0204", "cyclesFinished": "D0206", "start": 'M0017', "stop": "M0018"}
}
dtm_machine1 = dtm_machine.DtmMachine("com2", comm_config, drop_register, 0x01)
dtm_machines['com2'] = dtm_machine1
# dtm_machine1.read_station_dropheight('01')
# dtm_machine1.set_station_dropheight('02',531)
# dtm_machine1.set_station_cycles('02',78)
# dtm_machine1.start_station('01')
dtm_machine1.stop_station('01')
print(dtm_machine1.station_start_status('01'))
# 创建并启动Flask的线程
t = threading.Thread(target=run_server)
t.daemon = True
t.start()
# 创建并启动Webview的线程
# print("start webview")
# run_webview()
# 监听键盘输入
while True:
key = input("按下q键退出程序")
if key.lower() == 'q':
exit_program()