Files
dtm-py-all/dtMachineService.py

1597 lines
88 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import re
import sys
import time
import pymodbus
import serial
import threading
import binascii
import datetime
import requests
from collections.abc import MutableMapping, MutableSequence
from dtMachine_modbus import ModbusDtMachine
from utils import find_item
from Observable import ObservableDict, ObservableList
from utils import PLCTestReqResultData, PLCDropParamsData, print_with_timestamp
"""
# 构建请求数据
data = {
'action': 'insert',
'records': [
# 插入记录的数据
{
'column1': 'value1',
'column2': 'value2'
},
# 更多记录...
]
}
# 发送POST请求
url = 'http://your-api-endpoint' # 替换为实际的API端点
response = requests.post(url, json=data, params=param_values)
"""
# NOTE(review): not referenced in the visible part of this file; presumably marks a
# database server running as a separate process — confirm.
dbServer_aloned = False
# Base URL of the HTTP database service; used by the table helpers when no
# in-process access function is supplied. Note that it ends with '/'.
url_base = 'http://127.0.0.1:5050/'
# url_base = 'http://106.52.71.204:5050/'
# Serial settings intended for 'xinjie' type PLCs: 19200 baud, 8 data bits, even parity, 1 stop bit.
comm_config_xinjie = {
"baudrate": 19200,
"stopbits": serial.STOPBITS_ONE,
"parity": serial.PARITY_EVEN,
"bytesize": serial.EIGHTBITS
}
# Default serial settings: 9600 baud, 7 data bits, even parity, 1 stop bit.
comm_config = {
"baudrate": 9600,
"stopbits": serial.STOPBITS_ONE,
"parity": serial.PARITY_EVEN,
"bytesize": serial.SEVENBITS
}
# Default PLC register names per station ('01'..'04'); used when a machine row
# carries no valid register configuration of its own.
drop_register = {
'01': {"height": "D0160", "cycles": "D0200", "cyclesFinished": "D0202", "start": "M0016", "stop": "M0008",
"runStatus": "M0010"},
'02': {"height": "D0162", "cycles": "D0204", "cyclesFinished": "D0206", "start": "M0017", "stop": "M0018",
"runStatus": "M0020"},
'03': {"height": "D0164", "cycles": "D0210", "cyclesFinished": "D0212", "start": "M0026", "stop": "M0028",
"runStatus": "M0030"},
'04': {"height": "D0166", "cycles": "D0214", "cyclesFinished": "D0216", "start": "M0027", "stop": "M0038",
"runStatus": "M0040"}
}
# Register name of the PLC connection state.
PLC_CONNECT_REGISTER = 'D10'
# Find the records in a JSON array whose `key` field equals `value`;
# all matching records are returned.
# NOTE(review): the two lines above describe a helper that is not defined here
# (find_item is imported from utils) — looks like a stale comment.
# Seconds slept between two iterations of a machine polling loop.
POLL_PLC_INTERVAL = 0.6
# When True the polling loop simulates the PLC connection-state reply instead of reading it.
OFF_LINE_FOR_DEBUG = False
# Direction code that marks a drop direction as disabled (see DROP_DIRECTIONS[21]).
INVALID_DROP_DIRECTION = 21
# Number of test-result items fetched from the PLC in one bulk read.
TestReqResult_ItemCountOnceRead = 1
# Drop direction code -> display name (the names are runtime strings shown to the user).
DROP_DIRECTIONS = {
0: "正面",  # front face
1: "背面",  # back face
2: "左面",  # left face
3: "右面",  # right face
4: "上面",  # upper face
5: "下面",  # lower face
6: "顶面",  # top face
7: "底面",  # bottom face
8: "前面",  # front
9: "后面",  # rear
10: "左侧边",  # left edge
11: "右侧边",  # right edge
12: "头部",  # head
13: "尾部",  # tail
14: "底部",  # base
15: "随机方向",  # random direction
16: "备用1",  # spare 1
17: "备用2",  # spare 2
18: "备用3",  # spare 3
19: "备用4",  # spare 4
20: "备用5",  # spare 5
21: "禁用"  # disabled
}
# ============================================================================
# 核心业务服务类: 跌落机测试管理服务
# ============================================================================
# 功能职责:
# 1. 管理多台跌落机及其工位状态
# 2. 与Modbus网关通信,读写PLC寄存器
# 3. 轮询PLC数据,同步设备状态
# 4. 解析测试结果,触发存储和上传
# 5. 提供REST API供前端调用
#
# 设计模式:
# - 每台跌落机独立线程轮询(0.6秒间隔)
# - 观察者模式: 数据变化自动回调前端
# - 工厂模式: 根据PLC类型创建不同通信对象
# ============================================================================
class DtmMachineService:
"""跌落机测试管理服务 - 核心业务逻辑"""
def __init__(self, modbusServer='127.0.0.1', modbusServerPort=5020, access_table_func=None, flask_app=None,
             dutDirectionMode=2, cb_station_result_change=None, cb_machine_data_change=None, cb_insert_result=None):
    """Create the drop-test machine service and build the runtime data of every configured machine.

    Args:
        modbusServer: IP address of the Modbus gateway.
        modbusServerPort: TCP port of the Modbus gateway.
        access_table_func: In-process database view function; when None the
            table helpers fall back to HTTP requests.
        flask_app: Flask application used to build a request context around
            ``access_table_func``.
        dutDirectionMode: Drop-direction mode; 2 means the drop parameters are
            read back from the PLC.
        cb_station_result_change: Called as ``cb(station, data)`` when a
            station result changes.
        cb_machine_data_change: Called as ``cb([machine_sn], values)`` when
            machine data changes.
        cb_insert_result: Called with each parsed test-result dict.
    """
    # Callbacks and business configuration are stored first: the polling
    # threads created by initMachineStationData() read them.
    self.cb_station_result_change = cb_station_result_change
    self.cb_machine_data_change = cb_machine_data_change
    self.cb_insert_result = cb_insert_result
    self.dutDirectionMode = dutDirectionMode

    # Modbus gateway endpoint.
    self.modbusServer, self.modbusServerPort = modbusServer, modbusServerPort
    self.modbus_loop = None

    # Database access path (view function invoked inside a Flask request context).
    self.access_table_func = access_table_func
    self.flask_app = flask_app

    # Plain boolean, not a lock: it provides no real mutual exclusion between threads.
    self.busy_flag = False

    # Runtime state, filled by initMachineStationData().
    self.dtMachineStationData = []
    self.poll_connect_thread = None

    # Machine configuration rows of the 'dtMachine' table; stays empty when
    # the reply carries no 'data' entry.
    table_reply = self.getTableRecords('dtMachine', {})
    self.machineConfigsListData = table_reply["data"] if 'data' in table_reply else []

    self.initMachineStationData()
# 从数据库中获取满足params条件的记录表格名为table
def getTableRecords(self, table, params):
result = {}
query_string = {**params, 'table': table}
# 确保所有查询参数都转换为字符串
query_string = {k: str(v) for k, v in query_string.items()}
if self.access_table_func is None:
try:
response = requests.get(url_base + '/dbTableAccess', params=query_string, json={})
if response.status_code == 200:
result = response.json()
else:
print_with_timestamp('请求失败,状态码:', response.status_code, color='red')
except requests.RequestException as e:
print_with_timestamp('请求过程中发生错误:', e, color='red')
else:
if self.flask_app is None:
print_with_timestamp('错误: flask_app 未初始化,无法执行内部调用', color='red')
return result
try:
# 这个只是准备环境
with self.flask_app.test_request_context('/dbTableAccess',
method='GET',
query_string=query_string,
data={},
content_type='application/json'):
# 触发测试视图函数
response = self.access_table_func()
if response.status_code == 200:
response_data = response.get_data(as_text=True)
# 解析为 JSON
try:
result = json.loads(response_data)
except json.JSONDecodeError as error:
print_with_timestamp(f'JSON 解析错误:{error}', color='red')
else:
print_with_timestamp(f'请求失败,状态码:{response.status_code}', color='red')
except Exception as error:
print_with_timestamp(f'测试请求过程中发生错误:{error}', color='red')
return result
def updateTableRecords(self, table, records):
result = {}
query_string = {'table': table}
data = {"action": "update", "records": records}
if self.access_table_func is None:
response = requests.post(url_base + '/dbTableAccess', params=query_string, json=data)
if response.status_code == 200:
result = response.json()
else:
print_with_timestamp('请求失败', color='red')
pass
else:
if self.flask_app is None:
print_with_timestamp('错误: flask_app 未初始化,无法执行内部调用', color='red')
return result
try:
# 这个只是准备环境
with self.flask_app.test_request_context('/dbTableAccess',
method='POST',
query_string=query_string,
json=data,
content_type='application/json'):
# 触发测试视图函数
response = self.access_table_func()
if response.status_code == 200:
response_data = response.get_data(as_text=True)
# 解析为 JSON
try:
result = json.loads(response_data)
except Exception as error:
pass
except Exception as error:
print_with_timestamp(f'测试请求过程中发生错误:{error}', color='red')
return result
def insertTableRecords(self, table, records):
result = {}
query_string = {'table': table}
data = {"action": "insert", "records": records}
if self.access_table_func is None:
response = requests.post(url_base + '/dbTableAccess', params=query_string, json=data)
if response.status_code == 200:
result = response.json()
else:
print_with_timestamp('请求失败', color='red')
pass
else:
if self.flask_app is None:
print_with_timestamp('错误: flask_app 未初始化,无法执行内部调用', color='red')
return result
try:
# 这个只是准备环境
with self.flask_app.test_request_context('/dbTableAccess',
method='POST',
query_string=query_string,
json=data,
content_type='application/json'):
# 触发测试视图函数
response = self.access_table_func()
if response.status_code == 200:
response_data = response.get_data(as_text=True)
# 解析为 JSON
try:
result = json.loads(response_data)
if result['status'] == 'error':
print_with_timestamp(f"----insert table error--0:{result}", color='red')
except Exception as error:
print_with_timestamp(f"----insert table error:{error}", color='red')
pass
except Exception as error:
print_with_timestamp(f'测试请求过程中发生错误:{error}', color='red')
return result
def initMachineStationData(self):
    """Build the runtime record of every configured machine and start its polling thread.

    For each row of ``self.machineConfigsListData`` this fills in defaults
    (PLC address '01', serial port 'com2'), creates the ModbusDtMachine
    client, builds up to four station records and appends the machine to
    ``self.dtMachineStationData``. The polling thread is started last, once
    the machine record is complete and registered.
    """
    # NOTE(review): never incremented, so every station gets index 0 — confirm intent.
    stationIndex = 0
    for index, machine_config in enumerate(self.machineConfigsListData):
        machine_config['selected'] = True  # runtime-only field, not stored in the database
        if machine_config.get('plcAddress') is None:  # PLC station address defaults to 0x01
            machine_config['plcAddress'] = '01'
        com_port = machine_config.get('com')
        # Serial port used to talk to the PLC; defaults to com2, always lower case.
        machine_config['com'] = 'com2' if com_port is None else com_port.lower()
        machine_config['tcp_modbus_unit'] = index + 1

        try:
            # Register layout for the four stations ('01'..'04') and the serial
            # settings are stored as JSON text in the machine row.
            machine_baud_register = json.loads(machine_config.get('baudrate'))
            machine_drop_register = {
                station_sn: json.loads(machine_config.get(f'register{station_sn}'))
                for station_sn in ('01', '02', '03', '04')
            }
            if machine_baud_register is None:
                machine_baud_register = comm_config
        except (TypeError, ValueError) as error:
            # Missing or malformed configuration: fall back to the defaults.
            machine_drop_register = drop_register
            machine_baud_register = comm_config
            print_with_timestamp(f"dtMachineService 从数据库表格中取得寄存器配置失败,使用默认值,{error}",
                                 color='red')

        # NOTE(review): the original assigned comm_config_xinjie for 'xinjie'
        # machines and then overwrote it unconditionally; the effective
        # behaviour (database/default settings always win) is kept here.
        plc_com_config = machine_baud_register

        machine = {'label': machine_config.get('label'), 'SN': machine_config.get('SN'), 'key': index,
                   'com': machine_config.get('com'), 'testType': machine_config.get('testType'),
                   'plcAddress': machine_config.get('plcAddress'), 'type': machine_config.get('type'),
                   'connectState': 0x00,  # offline until the PLC has been polled
                   'virt_plc_device': ModbusDtMachine(machine_config.get('type'), machine_config.get('com'),
                                                      plc_com_config, machine_drop_register,
                                                      int(machine_config.get('plcAddress')),
                                                      machine_config.get('tcp_modbus_unit'),
                                                      machine_config.get('SN')),
                   'comm_config': plc_com_config,
                   # Modbus TCP unit id of the virtual device; differs from the serial PLC address.
                   'tcp_modbus_unit': index + 1}
        # NOTE(review): busy_flag is handed over by value, so later calls to
        # setMachineBusyFlag() are not seen by the thread.
        machine['poll_thread'] = threading.Thread(target=self.machine_poll, daemon=True,
                                                  args=[{"machine": machine, "busy_flag": self.busy_flag}])

        stationsAvailable = [
            bool(machine_config.get(f'station{i}')) and machine_config[f'station{i}'] != 'NA'
            for i in range(1, 5)
        ]
        stations = []
        for i in range(1, 5):
            if stationsAvailable[i - 1]:
                station_inner = {'dtMachineSN': machine_config['SN'], 'dtMachineLabel': machine_config['label'],
                                 'label': f'工位{i}', 'SN': f'0{i}', 'key': index, 'formal': True,
                                 'status': 'idle', 'index': stationIndex}
            elif any(stationsAvailable[i:]):
                # Unused slot followed by a configured one: keep a placeholder
                # so later stations stay at their list position.
                station_inner = {'dtMachineSN': machine_config['SN'], 'label': 'NA', 'SN': -1}
            else:
                continue
            stations.append(ObservableDict(station_inner, self.machine_data_setter, 0.3))
        machine['stations'] = stations
        self.dtMachineStationData.append(machine)  # one more machine (at most 4 stations each)

        # Start polling only now: the thread looks the machine up in
        # self.dtMachineStationData and iterates machine['stations'].
        machine['poll_thread'].start()
def machine_data_setter(self, action, value):
    """Observer hook: forward a change of station data to ``cb_machine_data_change``.

    Nothing happens unless ``value`` is a dict with a truthy 'dtMachineSN'
    and a callback is registered. ``action`` is not used.
    """
    if not isinstance(value, dict):
        return
    if not value.get('dtMachineSN'):
        return
    if not self.cb_machine_data_change:
        return
    machine_sn = value.get('dtMachineSN')
    current_values = self.get_machines_value([machine_sn])
    self.cb_machine_data_change([machine_sn], current_values)
def get_machine_com_config(self, machine_sn=None):
    """Return the communication settings of one machine or of all machines.

    Args:
        machine_sn: Serial number of the wanted machine; a falsy value
            selects every machine.

    Returns:
        A list of dicts with the keys SN, com, comm_config, plcAddress, type
        and tcp_modbus_unit. The list is empty when ``machine_sn`` matches
        no machine.
    """
    exported_fields = ('SN', 'com', 'comm_config', 'plcAddress', 'type', 'tcp_modbus_unit')

    def summarize(entry):
        return {field: entry.get(field) for field in exported_fields}

    if not machine_sn:
        return [summarize(entry) for entry in self.dtMachineStationData]
    for entry in self.dtMachineStationData:
        if entry['SN'] == machine_sn:
            return [summarize(entry)]
    return []
def dtm_virt_machine_client_connect(self):
    """Connect the ModbusDtMachine client of every machine to the configured Modbus server."""
    for machine in self.dtMachineStationData:
        device = machine.get('virt_plc_device')
        # Machines without a client, or with an object of another type, are skipped.
        if not device or not isinstance(device, ModbusDtMachine):
            continue
        device.connect_to_modbus_server(self.modbusServer, self.modbusServerPort)
def data_change_callback(self, station, testReqId):  # not executed when dutDirectionMode is 2
    """Load the result record ``testReqId`` and hand it to ``cb_station_result_change``.

    The record is enriched with ``cyclesTotal`` (sum of ``dropCycles`` over
    the station's testReqList) and, when it names a machine, with that
    machine's ``deviceName`` and ``deviceStatus``. When the record cannot be
    found the callback is not invoked.

    Args:
        station: Station record whose ``dut.testReqList`` supplies the cycle counts.
        testReqId: Id of the 'testReqResult' record to load.
    """
    cyclesTotal = 0
    try:
        for testReq in station.get("dut").get("testReqList"):
            cyclesTotal += int(testReq.get("dropCycles"))
    except (AttributeError, TypeError, ValueError):
        # Best effort: no DUT, no request list, or a non-numeric count
        # leaves the total at whatever was summed so far.
        pass

    response = self.getTableRecords('testReqResult', {'id': testReqId, 'pageSize': 100, 'currentPage': 1})
    records = response.get('data') if isinstance(response, dict) else None
    if not isinstance(records, list) or not records:
        # Nothing to report; avoids indexing an empty result list.
        return

    data = records[0]  # several rows may come back; the first one is used
    data['cyclesTotal'] = cyclesTotal
    if 'stationAssigned' in data and 'dtMachine' in data:
        # Look up the machine's name and status.
        dtMachineResponse = self.getTableRecords('dtMachine', {'SN': data.get("dtMachine")})
        machines = dtMachineResponse.get('data') if isinstance(dtMachineResponse, dict) else None
        if isinstance(machines, list) and machines:
            data['deviceName'] = machines[0].get('label')  # the database stores the machine name as 'label'
            data['deviceStatus'] = machines[0].get('status')

    if self.cb_station_result_change:
        self.cb_station_result_change(station, data)
def drop_params_to_dut_req_list(self, station, drop_params):
    """Convert drop parameters read from the PLC into the station DUT's ``testReqList``.

    Directions whose ``dropCode`` equals INVALID_DROP_DIRECTION are skipped.
    The ``itemCurrentCycles`` already recorded for a direction name is
    carried over. The new list is built completely before it replaces the
    old one, so a failure never leaves a truncated list on the station.

    Args:
        station: Station record; nothing happens when it has no ``dut``.
        drop_params: Iterable of dicts with ``dropCode``, ``dropHeight`` and
            ``dropCycles``.
    """
    dut = station.get('dut')
    if not dut:
        return

    # Progress already recorded per direction name (first match wins).
    recorded_cycles = {}
    for old_req in dut.get('testReqList') or []:
        name = old_req.get('item')
        if 'itemCurrentCycles' in old_req and name not in recorded_cycles:
            recorded_cycles[name] = old_req['itemCurrentCycles']

    new_req_list = []
    dutDropCycles = 0
    for direction_drop_param in drop_params:
        drop_code = direction_drop_param.get('dropCode')
        if drop_code == INVALID_DROP_DIRECTION:  # disabled direction
            continue
        # 20251201: the direction name is passed to the front end directly.
        direction_name = DROP_DIRECTIONS.get(drop_code, "未知方向")
        drop_height = direction_drop_param.get('dropHeight')
        drop_cycles = direction_drop_param.get('dropCycles')
        drop_param_json = {
            'dropCode': direction_drop_param.get('dropCode', 0),
            'item': direction_name,
            # Integer heights from the PLC are divided by 100.
            'height': drop_height / 100 if isinstance(drop_height, int) else drop_height,
            'count': drop_cycles,
        }
        if direction_name in recorded_cycles:
            drop_param_json['itemCurrentCycles'] = recorded_cycles[direction_name]
        new_req_list.append(drop_param_json)
        dutDropCycles += drop_cycles or 0  # a missing count adds nothing

    dut['testReqList'] = new_req_list
    dut['dutDropCycles'] = dutDropCycles  # total number of drops configured for the sample
def test_result_add_dut_info(self, station, test_result, dutInfo): # 将dut 信息添加到测试结果中
if dutInfo:
test_result['dutProject'] = dutInfo.get("project", "") # 项目代码
test_result['dutPhase'] = dutInfo.get("projectPhase", "") # 项目阶段
test_result['dutProjectType'] = dutInfo.get("projectType", "") # 项目方案
test_result['dutWeeks'] = str(dutInfo.get("weeks", 0)) # 测试周次
test_result['dutWorkOrder'] = dutInfo.get("workOrder", "") # 工单
if 'SN' in dutInfo:
test_result['SN'] = dutInfo.get("SN", "") # 产品序列号
elif "dutSN" in dutInfo:
test_result['SN'] = dutInfo.get("dutSN", "") # 产品序列号
# 从PLC读出的测试结果(系列寄存器) 进行处理
def handle_test_result_registers_value(self, machine, station, registers_value, itemCount):
# print_with_timestamp(f"handle_test_result_registers_value {itemCount} {registers_value}")
registers_len = PLCTestReqResultData.required_registers_size
sub_registers = [registers_value[i * registers_len:(i + 1) * registers_len] for i in range(itemCount)]
for index, testReqResultRegisters in enumerate(sub_registers):
pre_result_sn = station.get("pre_dut_result_counter", 0)
try:
test_result, error_parse = PLCTestReqResultData.parse_registers(testReqResultRegisters)
dut_result_counter = test_result.get('stationDropCycles', 0)
print_with_timestamp(
f"handle_test_result_registers_value --1 stationDropCycles {dut_result_counter} {pre_result_sn} "
f"{station['result_index_read']}")
if dut_result_counter == 0:
print_with_timestamp(f"handle_test_result_registers_value --2 {test_result}")
if dut_result_counter > pre_result_sn: # 新的测试结果
station['pre_dut_result_counter'] = dut_result_counter
# station['result_index_read'] = (station['result_index_read'] + TestReqResult_ItemCountOnceRead) % 10
update_counter = {"dutDropCycles": test_result['dutDropCycles'],
"itemCurrentCycles": test_result['itemCurrentCycles'],
'current_counter': station['last_insert_counter']}
if error_parse:
update_counter['error2'] = error_parse
self.updateTableRecords('get_counter', [update_counter])
cb_test_result = {}
cb_test_result['dropDirection'] = test_result['directionCode'] # 跌落方向
cb_test_result['dropHeight_int'] = test_result['dropHeight'] # 跌落高度
cb_test_result['dropCycles'] = test_result['dutDropCycles'] # 样品跌落测试
cb_test_result['dropSpeed'] = test_result['dropSpeed'] # 跌落速度
cb_test_result['itemCurrentCycles'] = test_result['itemCurrentCycles'] # 当前方向跌落次数
cb_test_result['stationDropCycles'] = test_result['stationDropCycles'] # 木板跌落测试
cb_test_result['machine_sn'] = test_result['machine_sn']
cb_test_result['station_no'] = test_result['station_no']
time_text = (
f"begin:{test_result.get('beginYear')}-"
f"{test_result.get('beginMonth')}-{test_result.get('beginDay')}"
f" {test_result.get('beginHour')}:{test_result.get('beginMinute')}"
f":{test_result.get('beginSecond')} end:{test_result.get('endYear')}-"
f"{test_result.get('endMonth')}-{test_result.get('endDay')}"
f" {test_result.get('endHour')}:{test_result.get('endMinute')}"
f":{test_result.get('endSecond')} "
)
cb_test_result['time_text'] = time_text
try:
cb_test_result['startTime'] = test_result.get('startTime', "")
cb_test_result['endTime'] = test_result.get('endTime', "")
if station['dut']['duts']:
duts_on_station = station['dut']['duts'].to_list()
except Exception as error:
update_counter = {"error1": f"error!!!!--set startTime endTime {error}",
'current_counter': station['last_insert_counter']}
self.updateTableRecords('get_counter', [update_counter])
finally:
# self.insertTableRecords('TestReq', [cb_test_result])
# 20250901 修改兼容1个工位(通道) 可以放置多个测试样品 cyx
if duts_on_station and isinstance(duts_on_station, list):
for dut in duts_on_station:
cb_insert_result_copied = dict(cb_test_result)
self.test_result_add_dut_info(station, cb_insert_result_copied, dut)
# print_with_timestamp(f"handle_test_result_registers_value --2 {cb_insert_result_copied} ")
if self.cb_insert_result: # 将结果进行回调处理存入数据库和上传LIMS
self.cb_insert_result(cb_insert_result_copied)
else:
self.test_result_add_dut_info(station, cb_test_result, station['dut'])
if self.cb_insert_result: # 将结果进行回调处理存入数据库和上传LIMS
self.cb_insert_result(cb_test_result)
directionCodeOngoing = test_result.get('directionCode') # 此条记录的跌落方向码
if station['dut'] and station['dut']['dutDropCycles'] and 0: # 判断样品试验是否已经完成 202509 改判断PLC状态
# print_with_timestamp(f"result {test_result['dutDropCycles'] } {station['dut']['dutDropCycles'] }")
if int(test_result['dutDropCycles']) >= int(station['dut']['dutDropCycles']):
print_with_timestamp(
f"----station test finished!----", color='green')
station['status'] = 'finished' # 设置工位试验已经完成的标志
# print_with_timestamp(f"{itemCount} index -- {result_index + index} {test_result.get(
# 'itemCurrentCycles')}", test_result)
if station['dut'] and station['dut']['testReqList']:
station['lastDropResult'] = (
f"{test_result.get('itemSN')} 方向:{test_result.get('directionCode')} begin:{test_result.get('beginYear')}-"
f"{test_result.get('beginMonth')}-{test_result.get('beginDay')}"
f" {test_result.get('beginHour')}:{test_result.get('beginMinute')}"
f":{test_result.get('beginSecond')} end:{test_result.get('endYear')}-"
f"{test_result.get('endMonth')}-{test_result.get('endDay')}"
f" {test_result.get('endHour')}:{test_result.get('endMinute')}"
f":{test_result.get('endSecond')} 方向次数:{test_result.get('itemCurrentCycles')}"
f"样品总次数:{test_result.get('dutDropCycles')} 木板累积次数:{test_result.get('stationDropCycles')}"
f" 高度:{test_result.get('dropHeight') / 100} 高度:{test_result.get('dropSpeed') / 10}"
f" 通道:{test_result.get('station_no')} 设备编码:{test_result.get('machine_sn')}"
f" 设定总次数 {station['dut']['dutDropCycles']}"
)
testReqList = station['dut']['testReqList']
direction_name_ongoing = DROP_DIRECTIONS.get(directionCodeOngoing, "未知方向") # 将方向码转为方向名称
for index, testReq in enumerate(testReqList):
# if testReq.get('item') == direction_name_ongoing:
if testReq.get('dropCode') == directionCodeOngoing:
# 设置通道当前正在进行的跌落方向(码)
station['itemOnGoingIndex'] = index
# 设置通道当前跌落方向已经完成的跌落次数
testReq['itemCurrentCycles'] = test_result.get('itemCurrentCycles')
except Exception as error:
update_counter = {"error2": f"error!!!!--PLCTestReqResultData.parse_registers {error}",
'current_counter': station['last_insert_counter']}
self.updateTableRecords('get_counter', [update_counter])
pass
# 从PLC读取测试结果数据允许批量读取而不仅是一条参数见TestReqResult_ItemCountOnceRead
def machine_poll_test_result_bulk(self, machine, station):
    """Bulk-read the test-result registers of ``station`` from the PLC and process them.

    The PLC read index is synchronised first (20251201, cyx). The read
    itself fetches TestReqResult_ItemCountOnceRead result items at once;
    its outcome is logged to the 'get_counter' table and a successful read
    is passed to handle_test_result_registers_value().
    """
    index_reply = machine.get('virt_plc_device').read_plc_read_index()
    if index_reply.get('status') == 'success':
        self.handle_readIndex_sync_with_plc(machine, index_reply.get('value', []))

    started_at = datetime.datetime.now()
    registers_len = PLCTestReqResultData.required_registers_size * TestReqResult_ItemCountOnceRead
    if station.get('result_index_read') is None:
        station['result_index_read'] = 0

    try:
        # One request returns several result items.
        plc_device = machine.get('virt_plc_device')
        result = plc_device.read_station_cyclesFinishedBulk(
            station['SN'], station['result_index_read'], registers_len)
        finished_at = datetime.datetime.now()
        time_diff = (finished_at - started_at).total_seconds() * 1000  # milliseconds
        if time_diff > 300:
            # Reads slower than 300 ms are reported.
            print_with_timestamp(f"Operation read_station_cyclesFinishedBulk took too long: {time_diff} ms "
                                 f"{machine.get('SN')} {station['SN']}", color='red')

        if result.get('status') != 'success':
            failed_read = {"get_result": 0, 'current_counter': station['last_insert_counter']}
            self.updateTableRecords('get_counter', [failed_read])
        else:
            try:
                successful_read = {"get_result": station.get("last_insert_counter", 0),
                                   'current_counter': station['last_insert_counter'],
                                   'read_index': station['result_index_read'],
                                   'address': result.get('address'), 'access_time': time_diff}
                self.updateTableRecords('get_counter', [successful_read])
                self.handle_test_result_registers_value(machine, station, result.get('value'),
                                                        TestReqResult_ItemCountOnceRead)
            except Exception as error:
                print_with_timestamp(f"error!!!!--machine_poll_test_result_bulk "
                                     f"{machine.get('SN')} {station['SN']} {error}", color='red')
    except Exception as error:
        print_with_timestamp(f"error!!!!--read_station_cyclesFinishedBulk {error}", color='red', )
"""
# (plc_status, current_status) -> (action_type, action_name, need_read_dut)
TRANSITION_TABLE = {
# ==================================
# PLC 状态 0: 停止
# 目标station 状态应为 'idle'
# ==================================
(0, 'idle'): ('none', None, False),
(0, 'running'): ('detach', 'stop_by_plc', False),
(0, 'pause'): ('detach', 'stop_by_plc', False),
# ==================================
# PLC 状态 1: 运行中(试验中)
# 目标station 状态应为 'running'
# ==================================
(1, 'idle'): ('attach_dut', 'resync_with_plc_run', True),
(1, 'running'): ('none', None, False),
(1, 'pause'): ('attach', 'resume_by_plc', False),
# ==================================
# PLC 状态 2: 暂停中
# 目标station 状态应为 'pause'
# ==================================
(2, 'idle'): ('attach_dut', 'resync_with_plc_pause', True),
(2, 'running'): ('attach', 'pause_by_plc', False),
(2, 'pause'): ('none', None, False),
}
"""
def handle_readIndex_sync_with_plc(self, machine, read_index_values):
    """Store the PLC read index, minus one, as ``result_index_read`` on each station.

    Nothing is changed unless at least four index values were read. Any
    error is ignored (best effort); stations updated before the error keep
    their new value.
    """
    try:
        if len(read_index_values) < 4:
            return
        position = 0
        for station in machine.get('stations', []):
            station['result_index_read'] = read_index_values[position] - 1
            position += 1
    except Exception:
        return
def handle_status_sync_with_plc(self, machine, connect_state_values):  # Process the per-station states read back from the PLC
# Bring each station's status in line with the state the PLC reports for it.
#   machine: machine record; its 'stations' list is walked in order.
#   connect_state_values: list read from the PLC; element 0 is the connection
#       state (also passed on as sn_count when reading DUT info), element
#       1 + i is the state of station i.
# The (plc_status, station_status) pair selects an action from TRANSITION_TABLE;
# pairs that are not in the table are skipped.
debug = False
# if machine.get('com') == 'com7': debug = True
if debug:
print_with_timestamp(f"handle_status_sync_with_plc, {machine.get('com')} {connect_state_values}")
# Nested helper: writes `status` for every DUT on the current `station` (the
# loop variable below, captured by closure) to the 'dutList' table.
def updateDutStatus(status):
if station['dut']['duts']:
duts_on_station = station['dut']['duts'].to_list()
update_values = [{"SN": item.get('SN', ''), "status": status} for item in duts_on_station]
if len(update_values):
self.updateTableRecords('dutList', update_values)
for index, station in enumerate(machine.get('stations', [])):  # each station
plc_status = connect_state_values[1 + index]
# Current station status
station_status = station.get('status', 'unknown')
machine_sn = machine.get('SN')
station_sn = station['SN']
if plc_status == 0:  # PLC reports stopped: clear the per-direction drop counters in testReqList (cyx 20251201)
if debug:
print_with_timestamp(f"---{station_sn} handle_status_sync_with_plc--0")
testReqList = station.get('dut', {}).get('testReqList', [])
if not isinstance(testReqList, list):
testReqList = []
for index_testReq, testReq in enumerate(testReqList):
testReq['itemCurrentCycles'] = ""
# --- State transition table ---
# (plc_status, station_status) -> (action_type, action_name, need_read_dut_on_plc)
TRANSITION_TABLE = {
(0, 'idle'): ('none', None, False),
(0, 'running'): ('detach', 'stop_by_plc', False),
(0, 'pause'): ('detach', 'stop_by_plc', False),
(1, 'idle'): ('attach_dut', 'resync_with_plc_run', True),
(1, 'running'): ('none', None, False),
(1, 'pause'): ('attach', 'resume_by_plc', False),
(2, 'idle'): ('attach_dut', 'resync_with_plc_pause', True),
(2, 'running'): ('attach', 'pause_by_plc', False),
(2, 'pause'): ('none', None, False),
(3, 'pause'): ('finish', None, False), # station['status'] = 'finished'
(3, 'running'): ('finish', None, False), # station['status'] = 'finished'
}
key = (plc_status, station_status)
entry = TRANSITION_TABLE.get(key)
# print_with_timestamp(f"handle_status_sync_with_plc--1 {key} {entry}")
if not entry:
# Optional: log unknown state combinations
# logger.warning(f"Unsupported PLC-station state: {key}")
continue
action_type, action_name, need_read_dut_on_plc = entry
# --- Execute the action ---
if action_type == 'none':
continue  # states already agree, nothing to do
elif action_type == 'finish':
pass
station['status'] = 'finished'
print_with_timestamp(f"----station test finished!---- PLC_status = {plc_status}", color='green')
updateDutStatus('已完成')
elif action_type == 'detach':
self.dtStationDetachDut(machine_sn, station_sn, action_name)
elif action_type == 'attach':
self.dtStationAttachDut(
machine_sn, station_sn, action_name, None, False, None
)
# The DUT status strings below are runtime values written to the 'dutList' table.
if action_name in ['resume_by_plc']:
dut_status = "试验中"
elif action_name in ['pause_by_plc']:
dut_status = "暂停中"
updateDutStatus(dut_status)
elif action_type == 'attach_dut':
# Read the current DUT SN list from the PLC
dut_info = machine.get('virt_plc_device').read_station_dutInfo(
station_sn, sn_count=connect_state_values[0]
)
duts_sn = []
if dut_info.get('status') == 'success' and 'data' in dut_info:
data_content = dut_info.get('data', [])
if isinstance(data_content, dict):
# A single dict
duts_sn.append(data_content.get('SN', ''))
else:
# A list of dicts
for item in data_content:
duts_sn.append(item.get('SN', ''))
self.dtStationAttachDut(machine_sn, station_sn, action_name, duts_sn, False, None)
if action_name in ['resync_with_plc_run']:
dut_status = "试验中"
elif action_name in ['resync_with_plc_pause']:
dut_status = "暂停中"
else:
dut_status = "试验中"
updateDutStatus(dut_status)
# print_with_timestamp(f"handle_status_sync_with_plc--2\n")
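# ========================================================================
# Core polling function: runs in a dedicated thread for each machine
# ========================================================================
# Polling tick: the loop sleeps POLL_PLC_INTERVAL (0.6 s) between iterations
# Main tasks:
# 1. Read the PLC connection state and the station run states
#    (when the tick counter reaches 2 / POLL_PLC_INTERVAL, i.e. every 4th tick, about 2.4 s)
# 2. Read the drop parameters set on the PLC (every 4th tick, only when dutDirectionMode == 2)
# 3. Read the test-result counter and, when it has advanced, bulk-read the results (every tick)
# 4. Synchronise the PLC state into the station status
# ========================================================================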
def machine_poll(self, args):  # Thread worker that keeps one machine's data and state in sync with its PLC via Modbus (RS-485)
"""
PLC polling main loop - one dedicated thread per machine.
Args:
args: dict containing:
- machine: the machine record
- busy_flag: busy flag value passed in by the caller; the loop only works while it is False
Polling strategy:
- sleeps POLL_PLC_INTERVAL (0.6 s) between iterations
- per-task counters control how often each task runs
- the station statuses are checked on every iteration
"""
machine = args["machine"]
# NOTE(review): this is the value handed over in args, not a live reference
# to self.busy_flag, so setMachineBusyFlag() does not change it.
busy_flag = args["busy_flag"]
# Per-task tick counters
second_counter = 0  # ticks since the last PLC connection-state read
read_drop_params_counter = 0  # ticks since the last drop-parameter read
run_status_counter = 0  # run-status counter (only referenced by the disabled code further down)
# Safety check: a PLC communication object is required
if not machine or not machine.get('virt_plc_device'):
return
# Endless polling loop
while True:
machine_sn = machine.get('SN')
if busy_flag is True:
print_with_timestamp("machine is busy", color='red') # cyx
if busy_flag is False:
current_time = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]
second_counter = second_counter + 1
run_status_counter = run_status_counter + 1
read_drop_params_counter = read_drop_params_counter + 1
# ------------------------------------------------------------
# Task 1: read the PLC connection state and the station run states
# Frequency: whenever second_counter reaches 2 / POLL_PLC_INTERVAL
# Data: list of the form [connection flag, station 1, station 2, station 3, station 4]
# ------------------------------------------------------------
# NOTE(review): 2 / 0.6 is about 3.33, so this fires on every 4th iteration;
# the original "2 x 0.3 = 0.6 s" remark was stale.
if second_counter >= (2 / POLL_PLC_INTERVAL):
second_counter = 0
try:
start_time = datetime.datetime.now()
# Offline debug mode: simulate the PLC response
if OFF_LINE_FOR_DEBUG:  # 20250826: allows debugging without a connected PLC
result = {'status': 'success', 'message': f"get plc connect state success",
'value': [2, 0, 0, 0, 0, 0, 0, 0, 0]}  # [connection state, station 1, station 2, station 3, station 4, ...]
else:
# Normal mode: read the connection/state values from the PLC
result = machine.get('virt_plc_device').read_plc_connect_state()
end_time = datetime.datetime.now()
# Performance monitoring: measure how long the read took
time_diff = (end_time - start_time).total_seconds() * 1000  # convert to milliseconds
if time_diff > 300:  # warn above 300 ms
print_with_timestamp(
f"⚠️ PLC状态读取超时: {time_diff:.0f}ms (阈值300ms) "
f"设备: {machine.get('com')} 时间: {start_time}{end_time}",
color='red'
)
# Handle the read result
if result.get('status') == 'success' and result.get('value') and result.get('value')[0]:
# Read succeeded: update the machine connection state
# result['value'] = [connection state code, station 1 state, station 2 state, station 3 state, station 4 state]
machine['connectState'] = result.get('value')[0]
# Synchronise the PLC state into the stations (core state-machine handling)
self.handle_status_sync_with_plc(machine, result.get('value'))
else:
# Read failed: mark the machine as offline
machine['connectState'] = 0x00
self.handle_status_sync_with_plc(machine, [0x00, 0x00, 0x00, 0x00, 0x00])
# Notify the registered callback that the machine data changed
# NOTE(review): called without checking that the callback is set.
self.cb_machine_data_change([machine_sn], self.get_machines_value([machine_sn]))
except Exception as e:
# WARNING: the exception is swallowed here; should become structured error handling
print_with_timestamp(f"❌ 读取PLC状态异常: {machine.get('SN')} - {e}", color='red')
# ------------------------------------------------------------
# Task 2: read the drop parameters set on the PLC (when dutDirectionMode == 2)
# Frequency: whenever read_drop_params_counter exceeds 2 / POLL_PLC_INTERVAL
# Scenario: the operator sets drop direction, height and count on the PLC HMI
# Data: the drop parameters of the station (PLCDropParamsData.required_registers_size registers)
# ------------------------------------------------------------
if read_drop_params_counter > (2 / POLL_PLC_INTERVAL):  # read the drop parameters (direction, count) set on the PLC HMI
read_drop_params_counter = 0
# Only mode dutDirectionMode == 2 is handled (drop parameters come from the PLC)
for station in machine.get('stations', []):  # each station
if self.dutDirectionMode in [2]:
# Only stations in the running state are handled
if station.get('status') in ['running']: # ['running'] :
# Read the direction drop parameters set on the PLC
result = self.read_station_dropParamsBulk(
machine_sn,
station.get('SN'),
PLCDropParamsData.required_registers_size
)
if result.get('status') == 'success' and result.get('value'):
try:
# Parse the register data into a list of drop parameters
drop_params = PLCDropParamsData.parse_registers(result.get('value'))
# Convert it into the station's testReqList format
self.drop_params_to_dut_req_list(station, drop_params)
except Exception as error:
print_with_timestamp(
f"❌ 解析跌落参数失败: {machine_sn}/{station.get('SN')} - {error}",
color='red'
)
for station in machine.get('stations', []):  # each station
if station.get('status') == 'running' and self.dutDirectionMode in [0, 3]:  # skipped in mode 2
start_time = datetime.datetime.now()
result = machine.get('virt_plc_device').read_station_cyclesFinished(station['SN'])
end_time = datetime.datetime.now()
# Elapsed time
time_diff = (end_time - start_time).total_seconds() * 1000  # convert to milliseconds
# Check whether the read took longer than 300 ms
if time_diff > 300:
# Report the slow read
print_with_timestamp(
f"Operation read_station_cyclesFinished took too long: {time_diff} ms "
f"{machine.get('SN')} {station['SN']}", color='red')
if result.get('status') == 'success' and result.get('value'):
station['finished'] = result.get('value')
station['itemFinished'] = result.get('value')
dut = station.get('dut', {})
testReqList = dut.get('testReqList', [])
count = testReqList[station.get('itemOnGoingIndex')].get('count')
testReqId = testReqList[station.get('itemOnGoingIndex')].get('id')
if result.get('value') != testReqList[station.get('itemOnGoingIndex')].get(
"itemCurrentCycles", 0):
testReqList[station.get('itemOnGoingIndex')]['itemCurrentCycles'] = result.get('value')
if count and count != -1:
# Has the current test item finished? Decide whether to switch to the next test item.
# When a test item of the sample finishes, the result has to be uploaded to the LMIS (lab management) system.
if result.get('value') >= count and station['itemOnGoingIndex'] < len(testReqList) - 1:
self.dtStationAttachDut(station['dtMachineSN'], station['SN'], 'switch', None, True)
self.data_change_callback(station, testReqId)
if result.get('value') >= count and station['itemOnGoingIndex'] == len(testReqList) - 1:
self.dtStationDetachDut(station['dtMachineSN'], station['SN'], 'finish')
self.data_change_callback(station, testReqId)
# The two string literals below hold disabled code; they are kept unchanged.
"""
result = machine.get('virt_plc_device').station_start_status(station['SN'])
if result.get('status') == 'success' and result.get('value'):
start_status = result.get('value')
# 暂停状态为1时表示已经启动 0--没有启动
# TBD 需要更新工位的状态
result = machine.get('virt_plc_device').station_stop_status(station['SN'])
if result.get('status') == 'success' and result.get('value'):
stop_status = result.get('value')
# 暂停状态为1时表示已经停止 0--没有停止
# TBD 需要更新工位的状态
result = machine.get('virt_plc_device').station_pause_status(station['SN'])
if result.get('status') == 'success' and result.get('value'):
pause_status = result.get('value')
# 暂停状态为1时表示暂停 0--没有暂停
# TBD 需要更新工位的状态
"""
"""
if run_status_counter >= 5: # 5 x 0.3 =3 PLC 的运行状态 1.5秒
run_status_counter = 0
try:
if station.get('status') in ['running']:
result = machine.get('virt_plc_device').station_run_status(station['SN'])
if result.get('status') == 'success' and 'value' in result:
run_status = result.get('value')
if run_status is False:
station['status'] = 'pause'
if station.get('status') in ['stop' , 'pause']:
result = machine.get('virt_plc_device').station_run_status(station['SN'])
if result.get('status') == 'success' and 'value' in result:
run_status = result.get('value')
if run_status is True:
station['status'] = 'running'
finally:
pass
"""
if station.get('status') in ['running', 'pause', 'finished'] and self.dutDirectionMode in [2]:
if machine['connectState'] == 0x00:
station['lastDropResult'] = f"!!!error:设备未连接!!!"  # lets the UI show that the machine is not connected
# First read the PLC's current drop counter for the sample.
# It is compared with the counter the station stored from the previous read; only a larger value
# means there is a new drop result that has to be read.
now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
insert_record = {"time": now_str, "machine": machine.get('SN'), 'station': station.get('SN')}
result = machine.get('virt_plc_device').read_station_result_counter(station['SN'])
if result.get('status') == 'success':
raw_new = result.get('value')
prev = station.get("pre_dut_result_counter", 0)
last_insert_counter = station.get("last_insert_counter", 0)
# A counter that falls back by at most this much is treated as jitter, not as a rollover.
jitter_tolerance = 2
event = 'duplicate'
delta = 0
if raw_new > prev:
delta = raw_new - prev
event = 'increment'
elif raw_new == prev:
event = 'duplicate'
else:
if (prev - raw_new) <= jitter_tolerance:
event = 'jitter'
delta = 0
else:
event = 'rollover'
# No delta is computed across the wrap-around (avoids errors); a read is still triggered
delta = 0
should_read_results = (event in ['increment', 'rollover']) and (
raw_new != last_insert_counter)
# Record the counter event
insert_record['event'] = event
insert_record['delta'] = delta
insert_record['current_counter'] = raw_new
insert_record['pre_counter'] = prev
if should_read_results:
insert_record['status'] = 1
station['last_insert_counter'] = raw_new
self.insertTableRecords('get_counter', [insert_record])
# Read the complete test results from the PLC (possibly several items)
self.machine_poll_test_result_bulk(machine, station)
elif event in ['duplicate', 'jitter']:
insert_record['status'] = 0
self.insertTableRecords('get_counter', [insert_record])
else:
# Any other case: record it conservatively
insert_record['status'] = 0
self.insertTableRecords('get_counter', [insert_record])
else:
insert_record['status'] = 0
self.insertTableRecords('get_counter', [insert_record])
time.sleep(POLL_PLC_INTERVAL)
def setMachineBusyFlag(self, flag=False):
    """Store `flag` on the instance as `busy_flag`.

    Args:
        flag: value to store; defaults to False.
    """
    self.busy_flag = flag
# 前端通过RESTAPI(websocket备选)在工位上安排试验样品,样品需要在数据库中创建,并绑定到工位上
# machine_sn: 机器SN station_sn: 工位SN action: 动作 dut_sn: 样品SN updateTableFlag: 是否更新表格
def dtStationAttachDut(self, machine_sn, station_sn, action, dut_sn, updateTableFlag, userid=None):
    """Attach a DUT (sample under test) to a station, or apply `action` to the attached DUT.

    NOTE(review): the block nesting below was reconstructed from a copy of this
    file whose indentation had been stripped; confirm it against the repository.

    Args:
        machine_sn: 'SN' of the machine, looked up in self.dtMachineStationData.
        station_sn: 'SN' of the station, looked up in machine['stations'].
        action: action string. "create", "select", "update", "create_and_transfer",
            "select_and_transfer", "resync_with_plc_run" and "resync_with_plc_pause"
            fetch the DUT from table 'dutList'; every other action uses station['dut'].
        dut_sn: None, a dict, or a list of SNs (see dut_sn_type below).
        updateTableFlag: when truthy, the DUT status is written to table 'dutList'.
        userid: optional; stored in station['userid'] on the create/select/update path.

    Returns:
        dict with 'status' ("success"/"error") and 'message'. Once the station and
        DUT have been resolved it also carries 'station' (station.to_dict() when
        available, otherwise the station object). The result is "error" if any
        step recorded in result_array did not report success.
    """
    print_with_timestamp('dtMachineService dtStationAttachDut', action, station_sn, machine_sn, dut_sn, userid)
    value_return = {"status": "error", "message": "dtStationAttachDut error"}
    machine = find_item(self.dtMachineStationData, 'SN', machine_sn)
    if machine:
        station = find_item(machine['stations'], 'SN', station_sn)
        if station is None:
            return value_return
    else:
        return value_return
    # 20250827: classify what the UI passed: 2 = list of DUT SNs, 1 = dict, 0 = anything else.
    # NOTE(review): a plain string SN is classified as 0 and rejected below — confirm
    # that a dict really is the intended single-DUT form.
    dut_sn_type = 0 if dut_sn is None else 2 if isinstance(dut_sn, list) else 1 if isinstance(dut_sn,
                                                                                              dict) else 0
    all_duts = []
    if action in ["create", "select", "update", "create_and_transfer", "select_and_transfer",
                  "resync_with_plc_run", "resync_with_plc_pause"]:
        if dut_sn_type == 1:
            response = self.getTableRecords('dutList', {'SN': dut_sn})  # fetch the DUT from the database
            if 'data' in response:
                dut = response['data']
                if isinstance(dut, list):  # several results: take the first
                    # NOTE(review): an empty list raises IndexError here.
                    dut = dut[0]
            else:
                return value_return
        elif dut_sn_type == 2:
            # NOTE(review): an empty dut_sn list leaves `dut` unassigned, and the
            # `dut["SN"]` access further down then fails.
            for index, sn in enumerate(dut_sn):
                response = self.getTableRecords('dutList', {'SN': sn})  # fetch the DUT from the database
                if 'data' in response:
                    cur_dut = response['data']
                    if isinstance(response['data'], list):
                        cur_dut = response['data'][0]
                    if index == 0:  # 20250827: with several SNs, the first DUT acts as the default (single) DUT
                        dut = cur_dut
                    all_duts.append(cur_dut)  # collect every DUT that was found
                else:
                    return value_return
        elif dut_sn_type == 0:
            return value_return
    else:
        if station.get('dut') and station['dut'].get('SN'):  # use the DUT already arranged on the station
            dut = station['dut']
        else:
            return value_return
    value = {"SN": dut["SN"], "status": "待启动"}  # initial DUT status
    # Map the action to the DUT status.
    if action == "select" or action == "create" or action == "update":
        value["status"] = "待启动"
    elif action in ['select_and_transfer', 'create_and_transfer']:
        value["status"] = "待启动"
    elif action in ['resync_with_plc_run']:
        value["status"] = "试验中"
    elif action in ['resync_with_plc_pause']:
        value["status"] = "暂停中"
    elif action in ['pause_by_plc']:
        value["status"] = "暂停中"
    elif action in ['resume_by_plc']:
        value["status"] = "试验中"
    elif action == "start":
        value["status"] = "试验中"
    elif action == "resume":
        value["status"] = "试验中"
    elif action == "pause":
        value["status"] = "暂停中"
    elif action == "switch":
        value["status"] = "暂停中"
    elif action == "finish":
        value["status"] = "已完成"
    elif action == "update":
        # NOTE(review): unreachable — "update" is already matched by the first branch.
        updateTableFlag = False
    elif action == "init":
        value = {"SN": dut["SN"], "status": "待启动"}
        dut["status"] = "待启动"
    result_array = []  # per-step result dicts; any non-success entry makes the final status "error"
    station["dutStatus"] = value["status"]  # status of the DUT on the station
    if action in ["create", "select", "update"]:
        stationKeys = ["dutName", "dutSN", "dutProject", "dutPhase"]
        dutKeys = ["name", "SN", "project", "phase"]
        value["stationAssigned"] = json.dumps({"dtMachine": machine_sn, "station": station_sn})  # machine/station the DUT is assigned to
        # Copy some DUT fields into the station structure under the station's key names.
        for index, key in enumerate(dutKeys):
            if key in dut:
                station[stationKeys[index]] = dut[key]
        station["dut"] = dut  # associate the DUT object with the station
        station['dut']['duts'] = all_duts  # 20250827: keep the list of all DUTs in station['dut']['duts']
        station['status'] = 'dutAttached'  # 20240706: 'dutAttached' means a DUT has been arranged
        # A newly attached DUT starts with its first test item (index 0) as the current item.
        result = self.dtStationAssignTestReq(machine_sn, station_sn, dut_sn, 0)
        result_array.append({"status": result.get('status'), "message": result.get('message')})
        if result.get('status') == 'success':
            # Set the station status (an earlier version set 'idle' here).
            station['status'] = 'dutAttached'  # 20240706: 'dutAttached' means a DUT has been arranged
            if userid:
                station['userid'] = userid
    if action in ["create_and_transfer", "select_and_transfer",
                  "resync_with_plc_run", "resync_with_plc_pause"]:  # path used when drop direction/height are set on the PLC HMI and only DUT info is transferred
        stationKeys = ["dutName", "dutSN", "dutProject", "dutPhase", "projectName", "projectPhase"]
        dutKeys = ["name", "SN", "project", "phase", "projectName", "projectPhase"]
        value["stationAssigned"] = json.dumps({"dtMachine": machine_sn, "station": station_sn})  # machine/station the DUT is assigned to
        # Copy some DUT fields into the station structure under the station's key names.
        for index, key in enumerate(dutKeys):
            if key in dut:
                station[stationKeys[index]] = dut.get(key, None)
        station['pre_dut_result_counter'] = 0  # clear the previously saved result counter
        station['dut'] = dut  # associate the DUT object with the station
        station['dut']['duts'] = all_duts  # 20250827: keep the list of all DUTs in station['dut']['duts']
        station['itemOnGoingIndex'] = 99  # mark the current test item as invalid
        if OFF_LINE_FOR_DEBUG:
            result = {'status': 'success'}
        elif action in ["create_and_transfer", "select_and_transfer"]:
            result = self.set_station_dutInfo(station['dtMachineSN'], station['SN'], all_duts)  # send the DUT info to the PLC
        elif action in ['resync_with_plc_run', 'resync_with_plc_pause']:
            # resync_*: when the program started, this PLC channel was already running or paused,
            # so nothing is written to the PLC.
            result = {"status": "success"}
        else:
            pass
        result_array.append(result)
        if "status" in result and result["status"] == "success":
            # Set the station status from its previous state and the action.
            if station["status"] == "idle" or station["status"] == "dutAttached":
                station["status"] = "running"  # 20250101: the PLC starts the test, so go straight to running
            if action in ["create_and_transfer", "select_and_transfer"]:
                station["status"] = "idle"  # 20250905: the PLC starts the test
            if action in ['resync_with_plc_pause']:
                station["status"] = "pause"  # 20250905: the PLC was paused when the program started
        print_with_timestamp(f"dtStationAttachDut {action} status=", station["status"])
    if updateTableFlag:  # update the DUT status in database table dutList
        response = self.updateTableRecords('dutList', [value])
        if 'status' in response and response['status'] == 'success':
            result_array.append({"status": "success", "message": "update dutList success"})
        else:
            result_array.append({"status": "error", "message": "update dutList error"})
    if action == "pause_by_plc":
        result_array.append({"status": "success"})
        # Set the station status.
        station["status"] = "pause"
    if action == "resume_by_plc":
        result_array.append({"status": "success"})
        # Set the station status.
        station["status"] = "running"
    if action == "start":
        result = self.start_station(station['dtMachineSN'], station['SN'])
        result_array.append(result)
        if "status" in result and result["status"] == "success":
            # Coming from idle/dutAttached: record the start time of the current test item.
            if station["status"] == "idle" or station["status"] == "dutAttached":
                testReqItem = dut['testReqList'][station['itemOnGoingIndex']]  # current test item
                testReqItem['startTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                # Store the start time and the tester for this item in table testReq.
                self.updateTableRecords('testReq',
                                        [{"id": testReqItem['id'], "startTime": testReqItem['startTime'],
                                          "tester": station.get("userid", -1), "status": "running"}])
            # Set the station status.
            station["status"] = "running"
    if action == "resume":
        result = self.resume_station(station['dtMachineSN'], station['SN'])
        result_array.append(result)
        if "status" in result and result["status"] == "success":
            if station["status"] == "switch":
                testReqItem = dut['testReqList'][station['itemOnGoingIndex']]  # current test item
                testReqItem['startTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                # Store the start time and the tester for this item in table testReq.
                self.updateTableRecords('testReq', [{"id": testReqItem['id'], "startTime": testReqItem['startTime'],
                                                     "tester": station.get("userid", -1), "status": "running"}])
            else:
                testReqItem = dut['testReqList'][station['itemOnGoingIndex']]  # current test item
                testReqItem['startTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                # Only the status is written to table testReq here; the new startTime stays in memory.
                self.updateTableRecords('testReq', [{"id": testReqItem['id'], "status": "running"}])
            # Set the station status.
            station["status"] = "running"
    if action == "pause":
        result = self.stop_station(station['dtMachineSN'], station['SN'])
        result_array.append(result)
        if "status" in result and result["status"] == "success":
            testReqItem = dut['testReqList'][station['itemOnGoingIndex']]  # current test item
            # Store the item status in table testReq.
            self.updateTableRecords('testReq', [{"id": testReqItem['id'], "status": "pause"}])
            # Set the station status.
            station["status"] = "pause"
    if action == "switch":
        result = self.stop_station(station['dtMachineSN'], station['SN'])
        result_array.append(result)
        if "status" in result and result["status"] == "success":
            # Record the end time of the current test item.
            testReqItem = dut['testReqList'][station['itemOnGoingIndex']]  # current test item
            testReqItem['endTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Store the end time, the tester and the finished status in table testReq.
            self.updateTableRecords('testReq',
                                    [{"id": testReqItem['id'], "endTime": testReqItem['endTime'],
                                      "tester": station.get("userid", -1), "status": "finished"}])
            # Set the station status.
            station["status"] = "switch"
    # Return a plain-dict view of the station when it offers to_dict().
    station_return = station
    if hasattr(station, 'to_dict') and callable(getattr(station, 'to_dict')):
        station_return = station.to_dict()
    value_return = {"status": "success", "message": "dtStationAttachDut success", "station": station_return}
    for result in result_array:
        if "status" not in result or result["status"] != "success":
            value_return = {"status": "error", "message": "dtStationAttachDut error", "station": station_return}
    return value_return
def dtStationDetachDut(self, machine_sn, station_sn, action):
    """Reset or stop a station, or detach its DUT, depending on `action`.

    NOTE(review): block nesting was reconstructed from a copy of this file whose
    indentation had been stripped; confirm it against the repository.

    Args:
        machine_sn: 'SN' of the machine, looked up in self.dtMachineStationData.
        station_sn: 'SN' of the station, looked up in machine['stations'].
        action: 'reset' and 'stop_by_plc' only change station state and return at once.
            'stop', 'cancel' and 'finish' stop the station through stop_station and
            update the current test item in table 'testReq'. Every action other than
            'finish' and 'stop' also clears the DUT fields of the station.

    Returns:
        dict with 'status' and 'message'; once the station has been resolved and
        processed it also carries 'station' (station.to_dict() when available,
        otherwise the station object). The status is "error" when the machine or
        station is unknown, when the station has no 'SN'/'dtMachineSN', or when
        stop_station did not report success.
    """
    value_return = {"status": "error", "message": "dtStationDetachDut error"}
    print_with_timestamp('dtMachineService dtStationDetachDut', action, station_sn, machine_sn)
    machine = find_item(self.dtMachineStationData, 'SN', machine_sn)
    if not machine:
        return value_return
    station = find_item(machine['stations'], 'SN', station_sn)
    if station is None:
        return value_return

    def station_view():
        # Plain-dict view of the station when it offers to_dict(), else the object itself.
        to_dict = getattr(station, 'to_dict', None)
        return to_dict() if callable(to_dict) else station

    if action in ['reset']:  # reset confirmed by the operator, mainly after a test has finished
        station['status'] = 'idle'
        station['userid'] = None
        station['pre_dut_result_counter'] = 0  # clear the previously saved result counter
        station['itemOnGoingIndex'] = 99  # mark the current test item as invalid
        return {"status": "success", "message": "station reset successful", "station": station_view()}

    if action in ['stop_by_plc']:  # the PLC, acting as master, stopped the test
        station['status'] = 'idle'
        # Fix: the original only bound `station_return` when the station had a
        # to_dict() method, so a plain-dict station raised UnboundLocalError here.
        return {"status": "success", "message": "station stop_by_plc successful", "station": station_view()}

    if not (station.get('SN') and station.get('dtMachineSN')):
        return value_return

    # NOTE: the dutList status write-back (reselect/reCreate -> '待安排',
    # stop/cancel -> '已取消', finish -> '已完成') was already disabled in the
    # original code, so the DUT status is not written to the database here.
    result_array = []
    testReqStatus = None
    if action in ['stop', 'cancel', 'finish']:
        try:
            # For the PLC, stop, cancel and finish all mean: stop the machine.
            result = self.stop_station(station['dtMachineSN'], station['SN'])
            result_array.append(result)
            testReqItem = station['dut']['testReqList'][station['itemOnGoingIndex']]  # current test item
            if "status" in result and result["status"] == "success":
                if action == 'stop':
                    station['status'] = 'stop'
                    testReqStatus = "stop"
                elif action == 'cancel':
                    station['status'] = 'idle'  # full reset
                    station['userid'] = None
                    testReqStatus = "cancel"
                elif action == 'finish':
                    # Only a station that was running gets an end time recorded.
                    if station["status"] == "running":
                        testReqItem['endTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        testReqStatus = "finished"
                        # Store the end time and status of the item in table testReq.
                        self.updateTableRecords('testReq',
                                                [{"id": testReqItem['id'], "endTime": testReqItem['endTime'],
                                                  "status": testReqStatus}])
                    station['status'] = 'finished'
                    station['userid'] = None
                # Store the new item status in table testReq.
                if action in ['stop', 'cancel'] and testReqStatus:
                    self.updateTableRecords('testReq', [{"id": testReqItem['id'], "status": testReqStatus}])
        except Exception as error:
            # Still best-effort (the outcome is decided by result_array), but the
            # failure is now logged instead of being silently discarded.
            print_with_timestamp('dtMachineService dtStationDetachDut exception', action, error)

    if action not in ['finish', 'stop']:
        # Detach: blank the DUT-related fields of the station.
        for key in ['dutName', 'dutSN', 'dutProject', 'dutPhase', 'dutTestReq', 'itemOnGoing', 'itemFinished',
                    'dutStatus']:
            station[key] = ''
        station['itemOnGoingIndex'] = 0
        station['dut'] = None

    station_return = station_view()
    if any("status" not in result or result["status"] != "success" for result in result_array):
        return {"status": "error", "message": "dtStationDetachDut error", "station": station_return}
    return {"status": "success", "message": "dtStationDetachDut success", "station": station_return}
def dtStationAssignTestReq(self, machine_sn, station_sn, dut_sn, testReqIndex):
    """Load the test requirements of the station's DUT and make item `testReqIndex` current.

    The DUT is taken from station['dut'] (it must already be attached), its
    'testReq' records are read with getTableRecords and stored in
    dut['testReqList'], and the selected item's height, cycle count and drop
    direction are written to the PLC (skipped when OFF_LINE_FOR_DEBUG is set).

    Args:
        machine_sn: 'SN' of the machine, looked up in self.dtMachineStationData.
        station_sn: 'SN' of the station, looked up in machine['stations'].
        dut_sn: only logged; the DUT comes from station['dut'].
        testReqIndex: index into dut['testReqList'] of the item to activate.

    Returns:
        dict with 'status'. On success it also has 'message' and 'station'
        (station.to_dict() when available, otherwise the station object). On
        failure 'message' (and 'error' after an exception) may be present.
    """
    value_return = {'status': "error"}
    print_with_timestamp("DtmMachineService dtStationAssignTestReq", machine_sn, station_sn, dut_sn, testReqIndex)
    machine = find_item(self.dtMachineStationData, 'SN', machine_sn)
    if not machine:
        return value_return
    station = find_item(machine['stations'], 'SN', station_sn)
    # Fix: an unknown station or a station without a DUT used to raise TypeError
    # on the subscripts below; report the error dict like the sibling methods do.
    if station is None:
        return value_return
    dut = station.get('dut')
    if not dut:
        return value_return

    first_dut_sn = dut['SN']  # 20250827: a station (channel) may hold several DUTs; the first one's SN is used
    response = self.getTableRecords('testReq', {'SN': first_dut_sn})
    if 'data' not in response:
        return value_return
    dut['testReqList'] = response['data']
    # Add the key names used by the rest of the program and the front end next to
    # the database field names. 'dropDirection' keeps its name, so it needs no alias.
    for testReq in dut['testReqList']:
        for db_key, alias in (('dropHeight', 'height'), ('dropItem', 'item'), ('dropCycles', 'count')):
            if db_key in testReq:
                testReq[alias] = testReq[db_key]

    station['itemOnGoing'] = ""
    station['itemFinished'] = 0
    if not ('testReqList' in dut and isinstance(dut['testReqList'], list)
            and len(dut['testReqList']) > testReqIndex):
        return {'status': "error", 'message': "testReqList 不满足条件"}

    current = dut['testReqList'][testReqIndex]
    station['dutTestReq'] = current['item']
    station['itemCount'] = current['count']
    station['itemHeight'] = current['height']
    station['itemDirection'] = current['dropDirection']
    station['itemOnGoingIndex'] = testReqIndex  # index of the active test item
    if 'dtMachineSN' in station and station['dtMachineSN']:
        dtMachineSN = station['dtMachineSN']
        try:
            if OFF_LINE_FOR_DEBUG:
                results = [{'status': 'success'} for _ in range(4)]
            else:
                # All four writes are attempted even if an earlier one fails.
                results = [
                    self.set_station_dropheight(dtMachineSN, station['SN'], current['height']),
                    self.set_station_cycles(dtMachineSN, station['SN'], current['count']),
                    self.set_station_finished(dtMachineSN, station['SN'], 0),
                    self.set_station_dropdirection(dtMachineSN, station['SN'], current['dropDirection']),
                ]
            if all('status' in result and result['status'] == 'success' for result in results):
                station_return = station
                if hasattr(station, 'to_dict') and callable(getattr(station, 'to_dict')):
                    station_return = station.to_dict()
                value_return = {
                    'status': "success",
                    'message': "assignTestReq to station success",
                    "station": station_return
                }
            else:
                value_return['message'] = "Failed to set station parameters"
        except Exception as error:
            # NOTE(review): 'error' carries the exception object itself, which is
            # not JSON-serialisable; confirm no caller serialises this dict.
            value_return = {'status': "error", "message": "set station params ", "error": error}
    return value_return
def set_station_dropheight(self, dtMachineSN, station_no, height):
    """Forward a drop-height write for one station to the machine's PLC device.

    Returns the result of virt_plc_device.set_station_dropheight(station_no, height),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error', 'message': f"set station{dtMachineSN}:{station_no} dropheight error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.set_station_dropheight(station_no, height)
def set_station_cycles(self, dtMachineSN, station_no, cycles):
    """Forward a cycle-count write for one station to the machine's PLC device.

    Returns the result of virt_plc_device.set_station_cycles(station_no, cycles),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error', 'message': f"set station {dtMachineSN}:{station_no} cycles error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.set_station_cycles(station_no, cycles)
def set_station_finished(self, dtMachineSN, station_no, finished):
    """Forward a finished-count write for one station to the machine's PLC device.

    Returns the result of virt_plc_device.set_station_finished(station_no, finished),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error', 'message': f"set station{dtMachineSN}:{station_no} finished error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.set_station_finished(station_no, finished)
def set_station_dropdirection(self, dtMachineSN, station_no, direction):
    """Forward a drop-direction write for one station to the machine's PLC device.

    Returns the result of virt_plc_device.set_station_dropdirection(station_no, direction),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error', 'message': f"set station{dtMachineSN}:{station_no} direction error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.set_station_dropdirection(station_no, direction)
# 给PLC设置 待测试的工件信息
def set_station_dutInfo(self, dtMachineSN, station_no, dutInfo):  # dutInfo may be a list
    """Forward the information of the DUT(s) to be tested to the machine's PLC device.

    Returns the result of virt_plc_device.set_station_dutInfo(station_no, dutInfo),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error', 'message': f"set station{dtMachineSN}:{station_no} DUT info error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.set_station_dutInfo(station_no, dutInfo)
def read_station_dropheight(self, dtMachineSN, station_no):
    """Read the drop height of one station through the machine's PLC device.

    Returns the result of virt_plc_device.read_station_dropheight(station_no),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error', 'message': f"read station{dtMachineSN}:{station_no} dropheight error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.read_station_dropheight(station_no)
def read_station_cyclesFinished(self, dtMachineSN, station_no):
    """Read the finished-cycles value of one station through the machine's PLC device.

    Returns the result of virt_plc_device.read_station_cyclesFinished(station_no),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error', 'message': f"read station{dtMachineSN}:{station_no} cyclesFinished error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.read_station_cyclesFinished(station_no)
def read_station_cyclesFinishedBulk(self, dtMachineSN, station_no, result_index, length):
    """Bulk-read finished-cycle data of one station through the machine's PLC device.

    Returns the result of
    virt_plc_device.read_station_cyclesFinishedBulk(station_no, result_index, length),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error',
               'message': f"read station{dtMachineSN}:{station_no} cyclesFinishedBulk error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.read_station_cyclesFinishedBulk(station_no, result_index, length)
def read_station_dropParamsBulk(self, dtMachineSN, station_no, length):
    """Bulk-read drop parameters of one station through the machine's PLC device.

    Returns the result of virt_plc_device.read_station_dropParamsBulk(station_no, length),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error',
               'message': f"read station{dtMachineSN}:{station_no} dropParamsBulk error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.read_station_dropParamsBulk(station_no, length)
def start_station(self, machine_sn, station_no):
    """Ask the machine's PLC device to start one station.

    Returns a fresh {'status', 'message'} dict: "success" only when the machine is
    found, has a 'virt_plc_device' key and the device call reports status
    'success'; "error" otherwise. The device's own reply is not passed through.
    """
    failure = {'status': 'error', 'message': f"start station{machine_sn}:{station_no} error"}
    target = find_item(self.dtMachineStationData, 'SN', machine_sn)
    if not target or 'virt_plc_device' not in target:
        return failure
    reply = target['virt_plc_device'].start_station(station_no)
    if reply.get('status') and reply['status'] == 'success':
        return {'status': 'success', 'message': f"start station{machine_sn}:{station_no} success"}
    return failure
def resume_station(self, machine_sn, station_no):
    """Ask the machine's PLC device to resume one station.

    Returns a fresh {'status', 'message'} dict: "success" only when the machine is
    found, has a 'virt_plc_device' key and the device call reports status
    'success'; "error" otherwise. The device's own reply is not passed through.
    """
    failure = {'status': 'error', 'message': f"resume station{machine_sn}:{station_no} error"}
    target = find_item(self.dtMachineStationData, 'SN', machine_sn)
    if not target or 'virt_plc_device' not in target:
        return failure
    reply = target['virt_plc_device'].resume_station(station_no)
    if reply.get('status') and reply['status'] == 'success':
        return {'status': 'success', 'message': f"resume station{machine_sn}:{station_no} success"}
    return failure
def stop_station(self, machine_sn, station_no):
    """Ask the machine's PLC device to stop one station.

    Returns a fresh {'status', 'message'} dict: "success" only when the machine is
    found, has a 'virt_plc_device' key and the device call reports status
    'success'; "error" otherwise. The device's own reply is not passed through.
    """
    failure = {'status': 'error', 'message': f"stop {machine_sn}:{station_no} error"}
    target = find_item(self.dtMachineStationData, 'SN', machine_sn)
    if not target or 'virt_plc_device' not in target:
        return failure
    reply = target['virt_plc_device'].stop_station(station_no)
    if reply.get('status') and reply['status'] == 'success':
        return {'status': 'success', 'message': f"stop station{machine_sn}:{station_no} success"}
    return failure
def pause_station(self, machine_sn, station_no):
    """Pause one station; the request is sent to the PLC device as stop_station.

    Returns:
        {'status': 'success', ...} when the machine is found, has a
        'virt_plc_device' key and the device call reports status 'success';
        otherwise {'status': 'error', ...}.
    """
    # Fix: the original never initialised value_return, so an unknown machine, a
    # missing PLC device or a failed device call raised UnboundLocalError at the
    # return statement instead of reporting an error like the sibling methods.
    value_return = {'status': 'error', 'message': f"pause station{machine_sn}:{station_no} error"}
    machine = find_item(self.dtMachineStationData, 'SN', machine_sn)
    if machine and 'virt_plc_device' in machine:
        result = machine['virt_plc_device'].stop_station(station_no)
        if result.get('status') and result['status'] == 'success':
            # Success message kept unchanged from the original.
            value_return = {'status': 'success', 'message': f"stop station{machine_sn}:{station_no} success"}
    return value_return
def get_station_value(self, dtMachineSN, station_no):
    """Return the current data of one station.

    Returns a dict with 'status', 'station' and 'message'. On success 'station' is
    station.to_dict() when the station offers a callable to_dict, otherwise the
    station object itself. When the machine or the station is not found, 'status'
    is 'error' and 'station' is an empty dict.
    """
    outcome = {'status': 'error', 'station': {},
               'message': f"get station{dtMachineSN}:{station_no} value error"}
    owner = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not owner:
        return outcome
    found = find_item(owner.get('stations'), 'SN', station_no)
    if not found:
        return outcome
    exporter = getattr(found, 'to_dict', None)
    if callable(exporter):
        return {'status': 'success', 'station': exporter(),
                'message': f"get station{dtMachineSN}:{station_no} value success"}
    return {'status': 'success', 'station': found,
            'message': f"get station{dtMachineSN}:{station_no} value success"}
def get_all_machine_sn(self):
    """Return the 'SN' value of every entry in self.dtMachineStationData, in order."""
    serials = []
    for entry in self.dtMachineStationData:
        serials.append(entry['SN'])
    return serials
def get_machines_value(self, machines_sn_list):
    """Return a copy of the data of the machines matched by `machines_sn_list`.

    For every machine returned by find_item, all keys are copied except the ones
    in `hidden_keys`; 'stations' is replaced by a list of station.to_dict()
    results (or the station object itself when it has no callable to_dict).

    Returns:
        {'status': 'success', 'machines': [...], 'message': ...}
    """
    # Keys left out of the returned data.
    # NOTE(review): presumably because they hold live runtime objects — confirm.
    hidden_keys = {'virt_plc_device', 'poll_thread', 'register01', 'register02',
                   'register03', 'register04', 'stations'}
    machines_return = []
    machines_matched = find_item(self.dtMachineStationData, 'SN', machines_sn_list)
    for machine in machines_matched:
        new_data = {key: machine[key] for key in machine if key not in hidden_keys}
        if 'stations' in machine:
            # Fix (consistency): every other method checks for a callable to_dict
            # before calling it; here a plain-dict station raised AttributeError.
            new_data['stations'] = [
                station.to_dict() if callable(getattr(station, 'to_dict', None)) else station
                for station in machine.get('stations')
            ]
        machines_return.append(new_data)
    return {'status': 'success', 'machines': machines_return,
            'message': f"get machines{machines_sn_list} value success"}
def register_test(self, dtMachineSN, action, address, value):
    """Forward a register test request to the machine's PLC device.

    Returns the result of virt_plc_device.register_test(action, address, value),
    or an error dict when no machine with SN `dtMachineSN` is found or the machine
    has no 'virt_plc_device' key.
    """
    failure = {'status': 'error', 'message': f"register {action} {dtMachineSN}:{address} error"}
    target = find_item(self.dtMachineStationData, 'SN', dtMachineSN)
    if not target or 'virt_plc_device' not in target:
        return failure
    plc = target['virt_plc_device']
    return plc.register_test(action, address, value)
"""
跌落试验机
通过modbus client 与 modbus server 通讯
modbus server 通过串口 rs232 rs485 与 跌落机的控制器(PLC)通讯
"""
"""
station = {
'dtMachineSN': machine_config.SN,
'dtMachineLabel': machine_config.label,
'label': f"工位{i}",
'SN': f"0{i}",
'key': index,
'formal': True,
'testReqExpanded': False,
'status': 'idle',
'index': stationIndex
}
"""