import asyncio import json import re import sys import time import pymodbus import serial import threading import binascii import datetime import requests from collections.abc import MutableMapping, MutableSequence from dtMachine_modbus import ModbusDtMachine from utils import find_item from Observable import ObservableDict, ObservableList from utils import PLCTestReqResultData, PLCDropParamsData, print_with_timestamp """ # 构建请求数据 data = { 'action': 'insert', 'records': [ # 插入记录的数据 { 'column1': 'value1', 'column2': 'value2' }, # 更多记录... ] } # 发送POST请求 url = 'http://your-api-endpoint' # 替换为实际的API端点 response = requests.post(url, json=data, params=param_values) """ dbServer_aloned = False url_base = 'http://127.0.0.1:5050/' # url_base = 'http://106.52.71.204:5050/' comm_config_xinjie = { "baudrate": 19200, "stopbits": serial.STOPBITS_ONE, "parity": serial.PARITY_EVEN, "bytesize": serial.EIGHTBITS } comm_config = { "baudrate": 9600, "stopbits": serial.STOPBITS_ONE, "parity": serial.PARITY_EVEN, "bytesize": serial.SEVENBITS } drop_register = { '01': {"height": "D0160", "cycles": "D0200", "cyclesFinished": "D0202", "start": "M0016", "stop": "M0008", "runStatus": "M0010"}, '02': {"height": "D0162", "cycles": "D0204", "cyclesFinished": "D0206", "start": "M0017", "stop": "M0018", "runStatus": "M0020"}, '03': {"height": "D0164", "cycles": "D0210", "cyclesFinished": "D0212", "start": "M0026", "stop": "M0028", "runStatus": "M0030"}, '04': {"height": "D0166", "cycles": "D0214", "cyclesFinished": "D0216", "start": "M0027", "stop": "M0038", "runStatus": "M0040"} } PLC_CONNECT_REGISTER = 'D10' # json 数组中找包含key的键值==value 的记录 # 返回的是满足条件的所有记录 POLL_PLC_INTERVAL = 0.6 OFF_LINE_FOR_DEBUG = False INVALID_DROP_DIRECTION = 21 TestReqResult_ItemCountOnceRead = 1 DROP_DIRECTIONS = { 0: "正面", 1: "背面", 2: "左面", 3: "右面", 4: "上面", 5: "下面", 6: "顶面", 7: "底面", 8: "前面", 9: "后面", 10: "左侧边", 11: "右侧边", 12: "头部", 13: "尾部", 14: "底部", 15: "随机方向", 16: "备用1", 17: "备用2", 18: "备用3", 19: "备用4", 20: "备用5", 21: "禁用" } # ============================================================================ # 核心业务服务类: 跌落机测试管理服务 # ============================================================================ # 功能职责: # 1. 管理多台跌落机及其工位状态 # 2. 与Modbus网关通信,读写PLC寄存器 # 3. 轮询PLC数据,同步设备状态 # 4. 解析测试结果,触发存储和上传 # 5. 提供REST API供前端调用 # # 设计模式: # - 每台跌落机独立线程轮询(0.6秒间隔) # - 观察者模式: 数据变化自动回调前端 # - 工厂模式: 根据PLC类型创建不同通信对象 # ============================================================================ class DtmMachineService: """跌落机测试管理服务 - 核心业务逻辑""" def __init__(self, modbusServer='127.0.0.1', modbusServerPort=5020, access_table_func=None, flask_app=None, dutDirectionMode=2, cb_station_result_change=None, cb_machine_data_change=None, cb_insert_result=None): """ 初始化跌落机管理服务 参数: modbusServer: Modbus网关IP地址 modbusServerPort: Modbus网关端口(默认5020) access_table_func: 数据库访问函数(可选,用于内部调用) flask_app: Flask应用实例(可选,用于测试) dutDirectionMode: 跌落方向模式(2=从PLC读取) cb_station_result_change: 工位结果变化回调 cb_machine_data_change: 设备数据变化回调 cb_insert_result: 测试结果插入回调 """ # 操作互斥标志 (WARNING: 非线程安全,需改进) self.busy_flag = False # Modbus网关连接配置 self.modbus_loop = None self.modbusServer = modbusServer self.modbusServerPort = modbusServerPort # 数据库访问接口 self.access_table_func = access_table_func # 直接调用数据库函数 self.flask_app = flask_app # Flask应用(用于测试上下文) # 业务配置 self.dutDirectionMode = dutDirectionMode # 2=从PLC HMI读取跌落参数 # 回调函数注册 self.cb_station_result_change = cb_station_result_change # 通知前端工位结果更新 self.cb_machine_data_change = cb_machine_data_change # 通知前端设备状态更新 self.cb_insert_result = cb_insert_result # 触发本地存储和LMIS上传 # 设备配置数据(从数据库加载) self.machineConfigsListData = [] result = self.getTableRecords('dtMachine', {}) # 从数据库中得到所有的已经配置的设备(跌落机) if 'data' in result: result = result["data"] self.machineConfigsListData = result # 设备运行时数据(包含工位状态、轮询线程等) self.dtMachineStationData = [] self.poll_connect_thread = None # 初始化设备和工位数据结构 self.initMachineStationData() # 从数据库中获取满足params条件的记录,表格名为table def getTableRecords(self, table, params): result = {} query_string = {**params, 'table': table} # 确保所有查询参数都转换为字符串 query_string = {k: str(v) for k, v in query_string.items()} if self.access_table_func is None: try: response = requests.get(url_base + '/dbTableAccess', params=query_string, json={}) if response.status_code == 200: result = response.json() else: print_with_timestamp('请求失败,状态码:', response.status_code, color='red') except requests.RequestException as e: print_with_timestamp('请求过程中发生错误:', e, color='red') else: if self.flask_app is None: print_with_timestamp('错误: flask_app 未初始化,无法执行内部调用', color='red') return result try: # 这个只是准备环境 with self.flask_app.test_request_context('/dbTableAccess', method='GET', query_string=query_string, data={}, content_type='application/json'): # 触发测试视图函数 response = self.access_table_func() if response.status_code == 200: response_data = response.get_data(as_text=True) # 解析为 JSON try: result = json.loads(response_data) except json.JSONDecodeError as error: print_with_timestamp(f'JSON 解析错误:{error}', color='red') else: print_with_timestamp(f'请求失败,状态码:{response.status_code}', color='red') except Exception as error: print_with_timestamp(f'测试请求过程中发生错误:{error}', color='red') return result def updateTableRecords(self, table, records): result = {} query_string = {'table': table} data = {"action": "update", "records": records} if self.access_table_func is None: response = requests.post(url_base + '/dbTableAccess', params=query_string, json=data) if response.status_code == 200: result = response.json() else: print_with_timestamp('请求失败', color='red') pass else: if self.flask_app is None: print_with_timestamp('错误: flask_app 未初始化,无法执行内部调用', color='red') return result try: # 这个只是准备环境 with self.flask_app.test_request_context('/dbTableAccess', method='POST', query_string=query_string, json=data, content_type='application/json'): # 触发测试视图函数 response = self.access_table_func() if response.status_code == 200: response_data = response.get_data(as_text=True) # 解析为 JSON try: result = json.loads(response_data) except Exception as error: pass except Exception as error: print_with_timestamp(f'测试请求过程中发生错误:{error}', color='red') return result def insertTableRecords(self, table, records): result = {} query_string = {'table': table} data = {"action": "insert", "records": records} if self.access_table_func is None: response = requests.post(url_base + '/dbTableAccess', params=query_string, json=data) if response.status_code == 200: result = response.json() else: print_with_timestamp('请求失败', color='red') pass else: if self.flask_app is None: print_with_timestamp('错误: flask_app 未初始化,无法执行内部调用', color='red') return result try: # 这个只是准备环境 with self.flask_app.test_request_context('/dbTableAccess', method='POST', query_string=query_string, json=data, content_type='application/json'): # 触发测试视图函数 response = self.access_table_func() if response.status_code == 200: response_data = response.get_data(as_text=True) # 解析为 JSON try: result = json.loads(response_data) if result['status'] == 'error': print_with_timestamp(f"----insert table error--0:{result}", color='red') except Exception as error: print_with_timestamp(f"----insert table error:{error}", color='red') pass except Exception as error: print_with_timestamp(f'测试请求过程中发生错误:{error}', color='red') return result def initMachineStationData(self): stationIndex = 0 for index, machine_config in enumerate(self.machineConfigsListData): machine_config['selected'] = True # 初始化选中 ,数据库中没有此字段 if 'plcAddress' not in machine_config: # PLC的站点地址,默认为0x01 machine_config['plcAddress'] = '01' if machine_config['plcAddress'] is None: machine_config['plcAddress'] = '01' if 'com' not in machine_config or machine_config['com'] is None: # 和plc 通讯的串口,如果没有设置,设置默认为com2 machine_config['com'] = 'com2' else: machine_config['com'] = machine_config['com'].lower() # 转成小写 pass machine_config['tcp_modbus_unit'] = index + 1 machine_drop_register = {} try: # 从machine 的数据库表格中取得 PLC 对应的寄存器配置 ,总共可以设置4个工作台01--04 machine_baud_register = json.loads(machine_config.get('baudrate')) machine_drop_register['01'] = json.loads(machine_config.get('register01')) machine_drop_register['02'] = json.loads(machine_config.get('register02')) machine_drop_register['03'] = json.loads(machine_config.get('register03')) machine_drop_register['04'] = json.loads(machine_config.get('register04')) if machine_baud_register is None: machine_baud_register = comm_config except Exception as error: # 如果从数据库表格中取得寄存器配置失败,则使用默认值 machine_drop_register = drop_register machine_baud_register = comm_config print_with_timestamp(f"dtMachineService 从数据库表格中取得寄存器配置失败,使用默认值,{error}", color='red') finally: pass if machine_config.get('type') == 'xinjie': plc_com_config = comm_config_xinjie plc_com_config = machine_baud_register machine = {'label': machine_config.get('label'), 'SN': machine_config.get('SN'), 'key': index, 'com': machine_config.get('com'), 'testType': machine_config.get('testType'), 'plcAddress': machine_config.get('plcAddress'), 'type': machine_config.get('type'), 'connectState': 0x00, # 初始化为离线状态,等待PLC检测后更新 'virt_plc_device': ModbusDtMachine(machine_config.get('type'), machine_config.get('com'), plc_com_config, machine_drop_register, int(machine_config.get('plcAddress')), machine_config.get('tcp_modbus_unit'), machine_config.get('SN')), 'comm_config': plc_com_config, 'tcp_modbus_unit': index + 1} # tcp_modbus_unit 对应modbus tcp的unitid 站点号,虚拟的,和串口的PLC 不一样 machine['poll_thread'] = threading.Thread(target=self.machine_poll, daemon=True, args=[{"machine": machine, "busy_flag": self.busy_flag}]) machine['poll_thread'].start() stations = [] stationsAvailable = [False, False, False, False] for i in range(1, 5): if f'station{i}' in machine_config and machine_config[f'station{i}'] \ and machine_config[f'station{i}'] != 'NA': stationsAvailable[i - 1] = True for i in range(1, 5): if stationsAvailable[i - 1]: station_inner = {'dtMachineSN': machine_config['SN'], 'dtMachineLabel': machine_config['label'], 'label': f'工位{i}', 'SN': f'0{i}', 'key': index, 'formal': True, 'status': 'idle', 'index': stationIndex} debugDutData = {'name': '二型电池组件', 'SN': '38383838', 'project': 'CB-IHU21', 'phase': 'A Sample', 'testReq': 'kdkakd', 'itemOnGoing': '正面-500', 'itemFinished': 8} # dtStationAttachDut('init', station, debugDutData, False) station = ObservableDict(station_inner, self.machine_data_setter, 0.3) stations.append(station) else: hasFollowed = False for j in range(i + 1, 5): hasFollowed = hasFollowed or stationsAvailable[j - 1] if hasFollowed: station_inner = {'dtMachineSN': machine_config['SN'], 'label': 'NA', 'SN': -1} station = ObservableDict(station_inner, self.machine_data_setter, 0.3) stations.append(station) machine['stations'] = stations self.dtMachineStationData.append(machine) # 增加1台机器(每台机器最多4个工作台) # print_with_timestamp(self.dtMachineStationData[1]) def machine_data_setter(self, action, value): if isinstance(value, dict) and value.get('dtMachineSN') and self.cb_machine_data_change: machine_sn = value.get('dtMachineSN') self.cb_machine_data_change([machine_sn], self.get_machines_value([machine_sn])) # print_with_timestamp(f"machine_data_setter {action} {station.get('dtMachineSN')} {value.get('SN')}") pass def get_machine_com_config(self, machine_sn=None): value = [] for machine in self.dtMachineStationData: if machine_sn and machine_sn == machine['SN']: return [{'SN': machine.get('SN'), 'com': machine.get('com'), 'comm_config': machine.get('comm_config'), 'plcAddress': machine.get('plcAddress'), 'type': machine.get('type'), 'tcp_modbus_unit': machine.get('tcp_modbus_unit')}] if not machine_sn: value.append({'SN': machine.get('SN'), 'com': machine.get('com'), 'comm_config': machine.get('comm_config'), 'plcAddress': machine.get('plcAddress'), 'type': machine.get('type'), 'tcp_modbus_unit': machine.get('tcp_modbus_unit')}) return value def dtm_virt_machine_client_connect(self): for machine in self.dtMachineStationData: if machine.get('virt_plc_device') and isinstance(machine.get('virt_plc_device'), ModbusDtMachine): machine['virt_plc_device'].connect_to_modbus_server(self.modbusServer, self.modbusServerPort) pass def data_change_callback(self, station, testReqId): # dutDirectionMode 为2时不执行 cyclesTotal = 0 try: dut = station.get("dut") testReqList = dut.get("testReqList") for testReq in testReqList: cyclesTotal = cyclesTotal + int(testReq.get("dropCycles")) except Exception as e: pass response = self.getTableRecords('testReqResult', {'id': testReqId, 'pageSize': 100, 'currentPage': 1}) if 'data' in response and isinstance(response['data'], list): # 如果返回多个结果,取第一个: data = response['data'][0] data['cyclesTotal'] = cyclesTotal if 'stationAssigned' in data and 'dtMachine' in data: dtMachineResponse = \ self.getTableRecords('dtMachine', {'SN': data.get("dtMachine")}) # 获得设备名称和状态 if 'data' in dtMachineResponse and isinstance(dtMachineResponse['data'], list): # 如果返回多个结果,取第一个: data['deviceName'] = dtMachineResponse['data'][0].get('label') # 数据库中设备名称为label data['deviceStatus'] = dtMachineResponse['data'][0].get('status') # 数据库中设备名称为label pass if self.cb_station_result_change: self.cb_station_result_change(station, data) pass def drop_params_to_dut_req_list(self, station, drop_params): # 从PLC取得的跌落参数 转成station 中的testReqList if station.get('dut'): dut = station.get('dut') dutDropCycles = 0 old_testReqList = dut.get('testReqList', []) # 保存旧的testReqList dut['testReqList'] = [] for direction_drop_param in drop_params: if direction_drop_param.get('dropCode') != INVALID_DROP_DIRECTION: # 表示无效的跌落方向 drop_param_json = {} drop_param_json['dropCode'] = direction_drop_param.get('dropCode', 0) # 20251201修改,直接方向名称给到前端 direction_name = DROP_DIRECTIONS.get(direction_drop_param.get('dropCode'), "未知方向") drop_param_json['item'] = direction_name drop_height = direction_drop_param.get('dropHeight') if isinstance(drop_height, int): # 需要进行除100,PLC中读回的参数是高度 x 100 (单位为mm) drop_param_json['height'] = drop_height / 100 else: drop_param_json['height'] = drop_height drop_param_json['count'] = direction_drop_param.get('dropCycles') # 保留之前的itemCurrentCycles for old_req in old_testReqList: if old_req.get('item') == direction_name and 'itemCurrentCycles' in old_req: drop_param_json['itemCurrentCycles'] = old_req['itemCurrentCycles'] break dut['testReqList'].append(drop_param_json) dutDropCycles = dutDropCycles + direction_drop_param.get('dropCycles') dut['dutDropCycles'] = dutDropCycles # 样品设定的跌落总次数 pass def test_result_add_dut_info(self, station, test_result, dutInfo): # 将dut 信息添加到测试结果中 if dutInfo: test_result['dutProject'] = dutInfo.get("project", "") # 项目代码 test_result['dutPhase'] = dutInfo.get("projectPhase", "") # 项目阶段 test_result['dutProjectType'] = dutInfo.get("projectType", "") # 项目方案 test_result['dutWeeks'] = str(dutInfo.get("weeks", 0)) # 测试周次 test_result['dutWorkOrder'] = dutInfo.get("workOrder", "") # 工单 if 'SN' in dutInfo: test_result['SN'] = dutInfo.get("SN", "") # 产品序列号 elif "dutSN" in dutInfo: test_result['SN'] = dutInfo.get("dutSN", "") # 产品序列号 # 从PLC读出的测试结果(系列寄存器) 进行处理 def handle_test_result_registers_value(self, machine, station, registers_value, itemCount): # print_with_timestamp(f"handle_test_result_registers_value {itemCount} {registers_value}") registers_len = PLCTestReqResultData.required_registers_size sub_registers = [registers_value[i * registers_len:(i + 1) * registers_len] for i in range(itemCount)] for index, testReqResultRegisters in enumerate(sub_registers): pre_result_sn = station.get("pre_dut_result_counter", 0) try: test_result, error_parse = PLCTestReqResultData.parse_registers(testReqResultRegisters) dut_result_counter = test_result.get('stationDropCycles', 0) print_with_timestamp( f"handle_test_result_registers_value --1 stationDropCycles {dut_result_counter} {pre_result_sn} " f"{station['result_index_read']}") if dut_result_counter == 0: print_with_timestamp(f"handle_test_result_registers_value --2 {test_result}") if dut_result_counter > pre_result_sn: # 新的测试结果 station['pre_dut_result_counter'] = dut_result_counter # station['result_index_read'] = (station['result_index_read'] + TestReqResult_ItemCountOnceRead) % 10 update_counter = {"dutDropCycles": test_result['dutDropCycles'], "itemCurrentCycles": test_result['itemCurrentCycles'], 'current_counter': station['last_insert_counter']} if error_parse: update_counter['error2'] = error_parse self.updateTableRecords('get_counter', [update_counter]) cb_test_result = {} cb_test_result['dropDirection'] = test_result['directionCode'] # 跌落方向 cb_test_result['dropHeight_int'] = test_result['dropHeight'] # 跌落高度 cb_test_result['dropCycles'] = test_result['dutDropCycles'] # 样品跌落测试 cb_test_result['dropSpeed'] = test_result['dropSpeed'] # 跌落速度 cb_test_result['itemCurrentCycles'] = test_result['itemCurrentCycles'] # 当前方向跌落次数 cb_test_result['stationDropCycles'] = test_result['stationDropCycles'] # 木板跌落测试 cb_test_result['machine_sn'] = test_result['machine_sn'] cb_test_result['station_no'] = test_result['station_no'] time_text = ( f"begin:{test_result.get('beginYear')}-" f"{test_result.get('beginMonth')}-{test_result.get('beginDay')}" f" {test_result.get('beginHour')}:{test_result.get('beginMinute')}" f":{test_result.get('beginSecond')} end:{test_result.get('endYear')}-" f"{test_result.get('endMonth')}-{test_result.get('endDay')}" f" {test_result.get('endHour')}:{test_result.get('endMinute')}" f":{test_result.get('endSecond')} " ) cb_test_result['time_text'] = time_text try: cb_test_result['startTime'] = test_result.get('startTime', "") cb_test_result['endTime'] = test_result.get('endTime', "") if station['dut']['duts']: duts_on_station = station['dut']['duts'].to_list() except Exception as error: update_counter = {"error1": f"error!!!!--set startTime endTime {error}", 'current_counter': station['last_insert_counter']} self.updateTableRecords('get_counter', [update_counter]) finally: # self.insertTableRecords('TestReq', [cb_test_result]) # 20250901 修改,兼容1个工位(通道) 可以放置多个测试样品 cyx if duts_on_station and isinstance(duts_on_station, list): for dut in duts_on_station: cb_insert_result_copied = dict(cb_test_result) self.test_result_add_dut_info(station, cb_insert_result_copied, dut) # print_with_timestamp(f"handle_test_result_registers_value --2 {cb_insert_result_copied} ") if self.cb_insert_result: # 将结果进行回调处理,存入数据库和上传LIMS self.cb_insert_result(cb_insert_result_copied) else: self.test_result_add_dut_info(station, cb_test_result, station['dut']) if self.cb_insert_result: # 将结果进行回调处理,存入数据库和上传LIMS self.cb_insert_result(cb_test_result) directionCodeOngoing = test_result.get('directionCode') # 此条记录的跌落方向码 if station['dut'] and station['dut']['dutDropCycles'] and 0: # 判断样品试验是否已经完成 202509 改判断PLC状态 # print_with_timestamp(f"result {test_result['dutDropCycles'] } {station['dut']['dutDropCycles'] }") if int(test_result['dutDropCycles']) >= int(station['dut']['dutDropCycles']): print_with_timestamp( f"----station test finished!----", color='green') station['status'] = 'finished' # 设置工位试验已经完成的标志 # print_with_timestamp(f"{itemCount} index -- {result_index + index} {test_result.get( # 'itemCurrentCycles')}", test_result) if station['dut'] and station['dut']['testReqList']: station['lastDropResult'] = ( f"{test_result.get('itemSN')} 方向:{test_result.get('directionCode')} begin:{test_result.get('beginYear')}-" f"{test_result.get('beginMonth')}-{test_result.get('beginDay')}" f" {test_result.get('beginHour')}:{test_result.get('beginMinute')}" f":{test_result.get('beginSecond')} end:{test_result.get('endYear')}-" f"{test_result.get('endMonth')}-{test_result.get('endDay')}" f" {test_result.get('endHour')}:{test_result.get('endMinute')}" f":{test_result.get('endSecond')} 方向次数:{test_result.get('itemCurrentCycles')}" f"样品总次数:{test_result.get('dutDropCycles')} 木板累积次数:{test_result.get('stationDropCycles')}" f" 高度:{test_result.get('dropHeight') / 100} 高度:{test_result.get('dropSpeed') / 10}" f" 通道:{test_result.get('station_no')} 设备编码:{test_result.get('machine_sn')}" f" 设定总次数 {station['dut']['dutDropCycles']}" ) testReqList = station['dut']['testReqList'] direction_name_ongoing = DROP_DIRECTIONS.get(directionCodeOngoing, "未知方向") # 将方向码转为方向名称 for index, testReq in enumerate(testReqList): # if testReq.get('item') == direction_name_ongoing: if testReq.get('dropCode') == directionCodeOngoing: # 设置通道当前正在进行的跌落方向(码) station['itemOnGoingIndex'] = index # 设置通道当前跌落方向已经完成的跌落次数 testReq['itemCurrentCycles'] = test_result.get('itemCurrentCycles') except Exception as error: update_counter = {"error2": f"error!!!!--PLCTestReqResultData.parse_registers {error}", 'current_counter': station['last_insert_counter']} self.updateTableRecords('get_counter', [update_counter]) pass # 从PLC读取测试结果数据,允许批量读取,而不仅是一条,参数见TestReqResult_ItemCountOnceRead def machine_poll_test_result_bulk(self, machine, station): # 2025 12 01 修改 cyx read_index_result = machine.get('virt_plc_device').read_plc_read_index() if read_index_result.get('status') == 'success': self.handle_readIndex_sync_with_plc(machine, read_index_result.get('value', [])) start_time = datetime.datetime.now() registers_len = PLCTestReqResultData.required_registers_size * TestReqResult_ItemCountOnceRead if station.get('result_index_read') is None: station['result_index_read'] = 0 try: result = machine.get('virt_plc_device'). \ read_station_cyclesFinishedBulk(station['SN'], station['result_index_read'], registers_len) # 一次性读取多条 end_time = datetime.datetime.now() # 计算时间差 time_diff = (end_time - start_time).total_seconds() * 1000 # 转换为毫秒 # 检查是否超过 300 毫秒 if time_diff > 300: # 进行处理 print_with_timestamp(f"Operation read_station_cyclesFinishedBulk took too long: {time_diff} ms " f"{machine.get('SN')} {station['SN']}", color='red') if result.get('status') == 'success': try: update_counter = {"get_result": station.get("last_insert_counter", 0), 'current_counter': station['last_insert_counter'], 'read_index': station['result_index_read'], 'address': result.get('address'), 'access_time': time_diff} self.updateTableRecords('get_counter', [update_counter]) self.handle_test_result_registers_value(machine, station, result.get('value'), TestReqResult_ItemCountOnceRead) except Exception as error: print_with_timestamp(f"error!!!!--machine_poll_test_result_bulk " f"{machine.get('SN')} {station['SN']} {error}", color='red') finally: pass else: update_counter = {"get_result": 0, 'current_counter': station['last_insert_counter']} self.updateTableRecords('get_counter', [update_counter]) except Exception as error: print_with_timestamp(f"error!!!!--read_station_cyclesFinishedBulk {error}", color='red', ) """ # (plc_status, current_status) -> (action_type, action_name, need_read_dut) TRANSITION_TABLE = { # ================================== # PLC 状态 0: 停止 # 目标:station 状态应为 'idle' # ================================== (0, 'idle'): ('none', None, False), (0, 'running'): ('detach', 'stop_by_plc', False), (0, 'pause'): ('detach', 'stop_by_plc', False), # ================================== # PLC 状态 1: 运行中(试验中) # 目标:station 状态应为 'running' # ================================== (1, 'idle'): ('attach_dut', 'resync_with_plc_run', True), (1, 'running'): ('none', None, False), (1, 'pause'): ('attach', 'resume_by_plc', False), # ================================== # PLC 状态 2: 暂停中 # 目标:station 状态应为 'pause' # ================================== (2, 'idle'): ('attach_dut', 'resync_with_plc_pause', True), (2, 'running'): ('attach', 'pause_by_plc', False), (2, 'pause'): ('none', None, False), } """ def handle_readIndex_sync_with_plc(self, machine, read_index_values): try: if len(read_index_values) >= 4: for index, station in enumerate(machine.get('stations', [])): # 每个工位 station['result_index_read'] = read_index_values[index] - 1 except Exception as e: pass def handle_status_sync_with_plc(self, machine, connect_state_values): # 处理从PLC读回的各个通道的状态 debug = False # if machine.get('com') == 'com7': debug = True if debug: print_with_timestamp(f"handle_status_sync_with_plc, {machine.get('com')} {connect_state_values}") def updateDutStatus(status): if station['dut']['duts']: duts_on_station = station['dut']['duts'].to_list() update_values = [{"SN": item.get('SN', ''), "status": status} for item in duts_on_station] if len(update_values): self.updateTableRecords('dutList', update_values) for index, station in enumerate(machine.get('stations', [])): # 每个工位 plc_status = connect_state_values[1 + index] # 获取当前工位状态 station_status = station.get('status', 'unknown') machine_sn = machine.get('SN') station_sn = station['SN'] if plc_status == 0: # 需要清除 testReqList 中的当前跌落次数计数 cyx 20251201 if debug: print_with_timestamp(f"---{station_sn} handle_status_sync_with_plc--0") testReqList = station.get('dut', {}).get('testReqList', []) if not isinstance(testReqList, list): testReqList = [] for index_testReq, testReq in enumerate(testReqList): testReq['itemCurrentCycles'] = "" # --- 状态转换表 --- # (plc_status, station_status) -> (action_type, action_name, need_read_dut_on_plc) TRANSITION_TABLE = { (0, 'idle'): ('none', None, False), (0, 'running'): ('detach', 'stop_by_plc', False), (0, 'pause'): ('detach', 'stop_by_plc', False), (1, 'idle'): ('attach_dut', 'resync_with_plc_run', True), (1, 'running'): ('none', None, False), (1, 'pause'): ('attach', 'resume_by_plc', False), (2, 'idle'): ('attach_dut', 'resync_with_plc_pause', True), (2, 'running'): ('attach', 'pause_by_plc', False), (2, 'pause'): ('none', None, False), (3, 'pause'): ('finish', None, False), # station['status'] = 'finished' (3, 'running'): ('finish', None, False), # station['status'] = 'finished' } key = (plc_status, station_status) entry = TRANSITION_TABLE.get(key) # print_with_timestamp(f"handle_status_sync_with_plc--1 {key} {entry}") if not entry: # 可选:记录未知状态组合 # logger.warning(f"Unsupported PLC-station state: {key}") continue action_type, action_name, need_read_dut_on_plc = entry # --- 执行动作 --- if action_type == 'none': continue # 状态一致,无需操作 elif action_type == 'finish': pass station['status'] = 'finished' print_with_timestamp(f"----station test finished!---- PLC_status = {plc_status}", color='green') updateDutStatus('已完成') elif action_type == 'detach': self.dtStationDetachDut(machine_sn, station_sn, action_name) elif action_type == 'attach': self.dtStationAttachDut( machine_sn, station_sn, action_name, None, False, None ) if action_name in ['resume_by_plc']: dut_status = "试验中" elif action_name in ['pause_by_plc']: dut_status = "暂停中" updateDutStatus(dut_status) elif action_type == 'attach_dut': # 需要从 PLC 读取当前 DUT SN 列表 dut_info = machine.get('virt_plc_device').read_station_dutInfo( station_sn, sn_count=connect_state_values[0] ) duts_sn = [] if dut_info.get('status') == 'success' and 'data' in dut_info: data_content = dut_info.get('data', []) if isinstance(data_content, dict): # 单个字典的情况 duts_sn.append(data_content.get('SN', '')) else: # 列表的情况 for item in data_content: duts_sn.append(item.get('SN', '')) self.dtStationAttachDut(machine_sn, station_sn, action_name, duts_sn, False, None) if action_name in ['resync_with_plc_run']: dut_status = "试验中" elif action_name in ['resync_with_plc_pause']: dut_status = "暂停中" else: dut_status = "试验中" updateDutStatus(dut_status) # print_with_timestamp(f"handle_status_sync_with_plc--2\n") # ======================================================================== # 核心轮询函数: 每台设备独立线程执行 # ======================================================================== # 轮询周期: 0.6秒 # 主要任务: # 1. 读取PLC连接状态和工位运行状态(1.2秒间隔) # 2. 读取PLC设置的跌落参数(1.2秒间隔,仅dutDirectionMode=2) # 3. 读取测试结果计数器并批量读取结果(每次轮询) # 4. 同步PLC状态到工位状态 # ======================================================================== def machine_poll(self, args): # 每个机器通过此线程工作函数来同步设备的数据与状态,通过modbus (RS-485) """ PLC数据轮询主循环 - 每台设备独立线程 参数: args: 字典,包含: - machine: 设备对象 - busy_flag: 全局繁忙标志(暂未使用) 轮询策略: - 0.6秒轮询间隔 (POLL_PLC_INTERVAL) - 分任务计数器控制执行频率 - 每次轮询检查所有工位的running状态 """ machine = args["machine"] busy_flag = args["busy_flag"] # 任务执行计数器 second_counter = 0 # PLC连接状态检测计数(每2秒) read_drop_params_counter = 0 # 跌落参数读取计数(每2秒) run_status_counter = 0 # 运行状态检测计数(暂未使用) # 安全检查: 必须有有效的PLC通信对象 if not machine or not machine.get('virt_plc_device'): return # 无限轮询循环 while True: machine_sn = machine.get('SN') if busy_flag is True: print_with_timestamp("machine is busy", color='red') # cyx if busy_flag is False: current_time = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3] second_counter = second_counter + 1 run_status_counter = run_status_counter + 1 read_drop_params_counter = read_drop_params_counter + 1 # ------------------------------------------------------------ # 任务1: 读取PLC连接状态和工位运行状态 # 频率: 每2秒 (second_counter达到阈值) # 读取内容: D10寄存器的5个值 [连接标志, 工位1状态, 工位2, 工位3, 工位4] # ------------------------------------------------------------ if second_counter >= (2 / POLL_PLC_INTERVAL): # 2 x 0.3 =0.6 秒 PLC 的连接状态 second_counter = 0 try: start_time = datetime.datetime.now() # 离线调试模式: 模拟PLC响应 if OFF_LINE_FOR_DEBUG: # 20250826 为了PLC没有连接时进行调试 result = {'status': 'success', 'message': f"get plc connect state success", 'value': [2, 0, 0, 0, 0, 0, 0, 0, 0]} # [连接状态, 工位1, 工位2, 工位3, 工位4] else: # 正常模式: 读取PLC的D10寄存器(5个字) result = machine.get('virt_plc_device').read_plc_connect_state() end_time = datetime.datetime.now() # 性能监控: 检测读取耗时 time_diff = (end_time - start_time).total_seconds() * 1000 # 转换为毫秒 if time_diff > 300: # 超过300ms告警 print_with_timestamp( f"⚠️ PLC状态读取超时: {time_diff:.0f}ms (阈值300ms) " f"设备: {machine.get('com')} 时间: {start_time} → {end_time}", color='red' ) # 处理读取结果 if result.get('status') == 'success' and result.get('value') and result.get('value')[0]: # 成功读取: 更新设备连接状态 # result['value'] = [连接状态码, 工位1状态, 工位2状态, 工位3状态, 工位4状态] machine['connectState'] = result.get('value')[0] # 同步PLC状态到各工位 (核心状态机处理) self.handle_status_sync_with_plc(machine, result.get('value')) else: # 读取失败: 设置为离线状态 machine['connectState'] = 0x00 self.handle_status_sync_with_plc(machine, [0x00, 0x00, 0x00, 0x00, 0x00]) # 通知前端设备状态变化 (通过WebSocket推送) self.cb_machine_data_change([machine_sn], self.get_machines_value([machine_sn])) except Exception as e: # WARNING: 此处吞噬异常,应改为结构化异常处理 print_with_timestamp(f"❌ 读取PLC状态异常: {machine.get('SN')} - {e}", color='red') # ------------------------------------------------------------ # 任务2: 读取PLC设置的跌落参数 (dutDirectionMode=2时) # 频率: 每2秒 # 场景: 操作员在PLC HMI上设置跌落方向、高度、次数 # 读取内容: 6个跌落方向的参数 (每个5个register) # ------------------------------------------------------------ if read_drop_params_counter > (2 / POLL_PLC_INTERVAL): # 读取在PLM HMI上设置的跌落参数(方向 次数) read_drop_params_counter = 0 # 只处理dutDirectionMode=2的模式 (从PLC读取跌落参数) for station in machine.get('stations', []): # 每个工位 if self.dutDirectionMode in [2]: # 只处理running状态的工位 if station.get('status') in ['running']: # ['running'] : # 读取在PLC上设置的方向跌落参数,最多6个方向 result = self.read_station_dropParamsBulk( machine_sn, station.get('SN'), PLCDropParamsData.required_registers_size ) if result.get('status') == 'success' and result.get('value'): try: # 解析寄存器数据为跌落参数列表 drop_params = PLCDropParamsData.parse_registers(result.get('value')) # 转换为工位的testReqList格式 self.drop_params_to_dut_req_list(station, drop_params) except Exception as error: print_with_timestamp( f"❌ 解析跌落参数失败: {machine_sn}/{station.get('SN')} - {error}", color='red' ) for station in machine.get('stations', []): # 每个工位 if station.get('status') == 'running' and self.dutDirectionMode in [0, 3]: # 2不执行 start_time = datetime.datetime.now() result = machine.get('virt_plc_device').read_station_cyclesFinished(station['SN']) end_time = datetime.datetime.now() # 计算时间差 time_diff = (end_time - start_time).total_seconds() * 1000 # 转换为毫秒 # 检查是否超过 300 毫秒 if time_diff > 300: # 进行处理 print_with_timestamp( f"Operation read_station_cyclesFinished took too long: {time_diff} ms " f"{machine.get('SN')} {station['SN']}", color='red') if result.get('status') == 'success' and result.get('value'): station['finished'] = result.get('value') station['itemFinished'] = result.get('value') dut = station.get('dut', {}) testReqList = dut.get('testReqList', []) count = testReqList[station.get('itemOnGoingIndex')].get('count') testReqId = testReqList[station.get('itemOnGoingIndex')].get('id') if result.get('value') != testReqList[station.get('itemOnGoingIndex')].get( "itemCurrentCycles", 0): testReqList[station.get('itemOnGoingIndex')]['itemCurrentCycles'] = result.get('value') if count and count != -1: # 本次测试项是不是已经结束,判断是否需要切换到下一个测试项 # 样品的每一个测试项结束时需要将结果上传至LMIS(实验室管理)系统 if result.get('value') >= count and station['itemOnGoingIndex'] < len(testReqList) - 1: self.dtStationAttachDut(station['dtMachineSN'], station['SN'], 'switch', None, True) self.data_change_callback(station, testReqId) if result.get('value') >= count and station['itemOnGoingIndex'] == len(testReqList) - 1: self.dtStationDetachDut(station['dtMachineSN'], station['SN'], 'finish') self.data_change_callback(station, testReqId) """ result = machine.get('virt_plc_device').station_start_status(station['SN']) if result.get('status') == 'success' and result.get('value'): start_status = result.get('value') # 暂停状态为1时,表示已经启动 0--没有启动 # TBD 需要更新工位的状态 result = machine.get('virt_plc_device').station_stop_status(station['SN']) if result.get('status') == 'success' and result.get('value'): stop_status = result.get('value') # 暂停状态为1时,表示已经停止 0--没有停止 # TBD 需要更新工位的状态 result = machine.get('virt_plc_device').station_pause_status(station['SN']) if result.get('status') == 'success' and result.get('value'): pause_status = result.get('value') # 暂停状态为1时,表示暂停 0--没有暂停 # TBD 需要更新工位的状态 """ """ if run_status_counter >= 5: # 5 x 0.3 =3 PLC 的运行状态 1.5秒 run_status_counter = 0 try: if station.get('status') in ['running']: result = machine.get('virt_plc_device').station_run_status(station['SN']) if result.get('status') == 'success' and 'value' in result: run_status = result.get('value') if run_status is False: station['status'] = 'pause' if station.get('status') in ['stop' , 'pause']: result = machine.get('virt_plc_device').station_run_status(station['SN']) if result.get('status') == 'success' and 'value' in result: run_status = result.get('value') if run_status is True: station['status'] = 'running' finally: pass """ if station.get('status') in ['running', 'pause', 'finished'] and self.dutDirectionMode in [2]: if machine['connectState'] == 0x00: station['lastDropResult'] = f"!!!error:设备未连接!!!" # 让UI上显示设备未连接 # 先读取PLC当前样品跌落次数计数 # 比较PLC当前样品跌落次数 和 station 中存的之前已经读取到的样品跌落计数,只有大于这个值, # 表示有新的跌落结果,才需要去读取 now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") insert_record = {"time": now_str, "machine": machine.get('SN'), 'station': station.get('SN')} result = machine.get('virt_plc_device').read_station_result_counter(station['SN']) if result.get('status') == 'success': raw_new = result.get('value') prev = station.get("pre_dut_result_counter", 0) last_insert_counter = station.get("last_insert_counter", 0) jitter_tolerance = 2 event = 'duplicate' delta = 0 if raw_new > prev: delta = raw_new - prev event = 'increment' elif raw_new == prev: event = 'duplicate' else: if (prev - raw_new) <= jitter_tolerance: event = 'jitter' delta = 0 else: event = 'rollover' # 不计算跨界delta,避免误差;保持触发读取 delta = 0 should_read_results = (event in ['increment', 'rollover']) and ( raw_new != last_insert_counter) # 记录计数事件 insert_record['event'] = event insert_record['delta'] = delta insert_record['current_counter'] = raw_new insert_record['pre_counter'] = prev if should_read_results: insert_record['status'] = 1 station['last_insert_counter'] = raw_new self.insertTableRecords('get_counter', [insert_record]) # 从PLC读取完整的测试结果(可能是多条) self.machine_poll_test_result_bulk(machine, station) elif event in ['duplicate', 'jitter']: insert_record['status'] = 0 self.insertTableRecords('get_counter', [insert_record]) else: # 其他情况,保守记录 insert_record['status'] = 0 self.insertTableRecords('get_counter', [insert_record]) else: insert_record['status'] = 0 self.insertTableRecords('get_counter', [insert_record]) time.sleep(POLL_PLC_INTERVAL) def setMachineBusyFlag(self, flag=False): self.busy_flag = flag pass # 前端通过RESTAPI(websocket备选)在工位上安排试验样品,样品需要在数据库中创建,并绑定到工位上 # machine_sn: 机器SN station_sn: 工位SN action: 动作 dut_sn: 样品SN updateTableFlag: 是否更新表格 def dtStationAttachDut(self, machine_sn, station_sn, action, dut_sn, updateTableFlag, userid=None): print_with_timestamp('dtMachineService dtStationAttachDut', action, station_sn, machine_sn, dut_sn, userid) value_return = {"status": "error", "message": "dtStationAttachDut error"} machine = find_item(self.dtMachineStationData, 'SN', machine_sn) if machine: station = find_item(machine['stations'], 'SN', station_sn) if station is None: return value_return else: return value_return # 20250827 增加判断从UI传下来的是单个dut的SN,还是多个dut的SN数组 dut_sn_type = 0 if dut_sn is None else 2 if isinstance(dut_sn, list) else 1 if isinstance(dut_sn, dict) else 0 all_duts = [] if action in ["create", "select", "update", "create_and_transfer", "select_and_transfer", "resync_with_plc_run", "resync_with_plc_pause"]: if dut_sn_type == 1: response = self.getTableRecords('dutList', {'SN': dut_sn}) # 从数据库取得dut if 'data' in response: dut = response['data'] if isinstance(dut, list): # 如果返回多个结果,取第一个 dut = dut[0] else: return value_return elif dut_sn_type == 2: for index, sn in enumerate(dut_sn): response = self.getTableRecords('dutList', {'SN': sn}) # 从数据库取得dut if 'data' in response: cur_dut = response['data'] if isinstance(response['data'], list): cur_dut = response['data'][0] if index == 0: # 20250827 如果UI传递多个DUT SNS,那么取第一个作为默认的测试样品,代替选择1个测试样品的作用 dut = cur_dut all_duts.append(cur_dut) # 将所有查询到的测试样品信息形成为数组 else: return value_return elif dut_sn_type == 0: return value_return else: if station.get('dut') and station['dut'].get('SN'): # 获取工作台当前已经安排的dut dut = station['dut'] # ---- else: return value_return value = {"SN": dut["SN"], "status": "待启动"} # 样品状态初始值设置 #---- # 将样品状态进行转换 if action == "select" or action == "create" or action == "update": value["status"] = "待启动" elif action in ['select_and_transfer', 'create_and_transfer']: value["status"] = "待启动" elif action in ['resync_with_plc_run']: value["status"] = "试验中" elif action in ['resync_with_plc_pause']: value["status"] = "暂停中" elif action in ['pause_by_plc']: value["status"] = "暂停中" elif action in ['resume_by_plc']: value["status"] = "试验中" elif action == "start": value["status"] = "试验中" elif action == "resume": value["status"] = "试验中" elif action == "pause": value["status"] = "暂停中" elif action == "switch": value["status"] = "暂停中" elif action == "finish": value["status"] = "已完成" elif action == "update": updateTableFlag = False elif action == "init": value = {"SN": dut["SN"], "status": "待启动"} dut["status"] = "待启动" result_array = [] station["dutStatus"] = value["status"] # 试验工位上的样品状态 if action in ["create", "select", "update"]: stationKeys = ["dutName", "dutSN", "dutProject", "dutPhase"] dutKeys = ["name", "SN", "project", "phase"] value["stationAssigned"] = json.dumps({"dtMachine": machine_sn, "station": station_sn}) # 更新样品安排的机台和通道 # 将样品的一些信息和数据 拷贝到工作位的数据结构中,并且有些重新命名 for index, key in enumerate(dutKeys): if key in dut: # ---- station[stationKeys[index]] = dut[key] # ---- station["dut"] = dut # 将样品对象关联到工位对象中 station['dut']['duts'] = all_duts # 20250827 将所有测试样品信息数组作为station['dut']['duts'] station['status'] = 'dutAttached' # 20240706 工位状态增加 "dutAttached" 表示已经安排了样品 # 如果是新的样品关联,默认需要将样品的第1个测试项设置为当前测试项 result = self.dtStationAssignTestReq(machine_sn, station_sn, dut_sn, 0) result_array.append({"status": result.get('status'), "message": result.get('message')}) if result.get('status') == 'success': # 设置工位状态 # station['status'] = 'idle' station['status'] = 'dutAttached' # 20240706 工位状态增加 "dutAttached" 表示已经安排了样品 if userid: station['userid'] = userid if action in ["create_and_transfer", "select_and_transfer", "resync_with_plc_run", "resync_with_plc_pause"]: # 专门为在PLC HMI界面上设置跌落反向和跌落高度传输DUT信息 stationKeys = ["dutName", "dutSN", "dutProject", "dutPhase", "projectName", "projectPhase"] dutKeys = ["name", "SN", "project", "phase", "projectName", "projectPhase"] value["stationAssigned"] = json.dumps({"dtMachine": machine_sn, "station": station_sn}) # 更新样品安排的机台和通道 # 将样品的一些信息和数据 拷贝到工作位的数据结构中,并且有些重新命名 for index, key in enumerate(dutKeys): if key in dut: station[stationKeys[index]] = dut.get(key, None) station['pre_dut_result_counter'] = 0 # 清除之前备份的计数 station['dut'] = dut # 将样品对象关联到工位对象中 station['dut']['duts'] = all_duts # 20250827 将所有测试样品信息数组作为station['dut']['duts'] station['itemOnGoingIndex'] = 99 # 将当前的测试项目设为无效 if OFF_LINE_FOR_DEBUG: result = {'status': 'success'} elif action in ["create_and_transfer", "select_and_transfer"]: result = self.set_station_dutInfo(station['dtMachineSN'], station['SN'], all_duts) # 给PLC设置测试样品信息 elif action in ['resync_with_plc_run', 'resync_with_plc_pause']: # 如果action 为 resync_with_plc_run ,resync_with_plc_pause说明程序进入时,PLC这个通道已经在试验中 ,暂停中 result = {"status": "success"} else: pass result_array.append(result) if "status" in result and result["status"] == "success": # 如果刚刚工位状态为idle,需要更新测试项的开始试验时间 if station["status"] == "idle" or station["status"] == "dutAttached": # 设置工位状态 station["status"] = "running" # 20250101 更改新方案,由PLC启动试验,状态直接改为running if action in ["create_and_transfer", "select_and_transfer"]: station["status"] = "idle" # 202500905 更改新方案,由PLC启动试验 if action in ['resync_with_plc_pause']: station["status"] = "pause" # 202500905 更改新方案,由PLC启动试验,程序启动时PLC处于pause print_with_timestamp(f"dtStationAttachDut {action} status=", station["status"]) # cyx if updateTableFlag: # 更新数据库 dutList 中样品状态 response = self.updateTableRecords('dutList', [value]) if 'status' in response and response['status'] == 'success': result_array.append({"status": "success", "message": "update dutList success"}) else: result_array.append({"status": "error", "message": "update dutList error"}) if action == "pause_by_plc": result_array.append({"status": "success"}) # 设置工位状态 station["status"] = "pause" if action == "resume_by_plc": result_array.append({"status": "success"}) # 设置工位状态 station["status"] = "running" if action == "start": result = self.start_station(station['dtMachineSN'], station['SN']) result_array.append(result) if "status" in result and result["status"] == "success": # 如果刚刚工位状态为idle,需要更新测试项的开始试验时间 if station["status"] == "idle" or station["status"] == "dutAttached": testReqItem = dut['testReqList'][station['itemOnGoingIndex']] # 当前的测试项 testReqItem['startTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 更新数据库中 testReq 测试项的试验开始时间 和 执行的测试人员 self.updateTableRecords('testReq', [{"id": testReqItem['id'], "startTime": testReqItem['startTime'], "tester": station.get("userid", -1), "status": "running"}]) # 设置工位状态 station["status"] = "running" if action == "resume": result = self.resume_station(station['dtMachineSN'], station['SN']) result_array.append(result) if "status" in result and result["status"] == "success": if station["status"] == "switch": testReqItem = dut['testReqList'][station['itemOnGoingIndex']] # 当前的测试项 testReqItem['startTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 更新数据库中 testReq 测试项的试验开始时间 和 执行的测试人员 self.updateTableRecords('testReq', [{"id": testReqItem['id'], "startTime": testReqItem['startTime'], "tester": station.get("userid", -1), "status": "running"}]) else: testReqItem = dut['testReqList'][station['itemOnGoingIndex']] # 当前的测试项 testReqItem['startTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 更新数据库中 testReq 测试项的试验开始时间 和 执行的测试人员 self.updateTableRecords('testReq', [{"id": testReqItem['id'], "status": "running"}]) # 设置工位状态 station["status"] = "running" if action == "pause": result = self.stop_station(station['dtMachineSN'], station['SN']) result_array.append(result) if "status" in result and result["status"] == "success": testReqItem = dut['testReqList'][station['itemOnGoingIndex']] # 当前的测试项 # 更新数据库中 testReq 测试项的状态 self.updateTableRecords('testReq', [{"id": testReqItem['id'], "status": "pause"}]) # 设置工位状态 station["status"] = "pause" if action == "switch": result = self.stop_station(station['dtMachineSN'], station['SN']) result_array.append(result) if "status" in result and result["status"] == "success": # 更新本测试项的结束时间 testReqItem = dut['testReqList'][station['itemOnGoingIndex']] # 当前的测试项 testReqItem['endTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 更新数据库中 testReq 测试项的试验完成时间 和 执行的测试人员 及 完成状态 self.updateTableRecords('testReq', [{"id": testReqItem['id'], "endTime": testReqItem['endTime'], "tester": station.get("userid", -1), "status": "finished"}]) # 设置工位状态 station["status"] = "switch" station_return = station if hasattr(station, 'to_dict') and callable(getattr(station, 'to_dict')): station_return = station.to_dict() value_return = {"status": "success", "message": "dtStationAttachDut success", "station": station_return} for result in result_array: if "status" not in result or result["status"] != "success": value_return = {"status": "error", "message": "dtStationAttachDut error", "station": station_return} return value_return def dtStationDetachDut(self, machine_sn, station_sn, action): value_return = {"status": "error", "message": "dtStationDetachDut error"} print_with_timestamp('dtMachineService dtStationDetachDut', action, station_sn, machine_sn) machine = find_item(self.dtMachineStationData, 'SN', machine_sn) if machine: station = find_item(machine['stations'], 'SN', station_sn) if station is None: return value_return else: return value_return if action in ['reset']: # 主要是试验完成时 ,经确认后的复位 station['status'] = 'idle' station['userid'] = None station_return = station station['pre_dut_result_counter'] = 0 # 清除之前备份的计数 station['itemOnGoingIndex'] = 99 # 将当前的测试项目设为无效 if hasattr(station, 'to_dict') and callable(getattr(station, 'to_dict')): station_return = station.to_dict() value_return = {"status": "success", "message": "station reset successful", "station": station_return} return value_return if action in ['stop_by_plc']: # 在PLC主控操作时,PLC可以停止试验 station['status'] = 'idle' if hasattr(station, 'to_dict') and callable(getattr(station, 'to_dict')): station_return = station.to_dict() value_return = {"status": "success", "message": "station stop_by_plc successful", "station": station_return} return value_return if station.get('SN') and station.get('dtMachineSN'): if action in ['reselect', 'reCreate']: value = {'SN': station['dutSN'], 'status': '待安排'} elif action == 'stop': value = {'SN': station['dutSN'], 'status': '已取消'} elif action == 'finish': value = {'SN': station['dutSN'], 'status': '已完成'} elif action == 'cancel': value = {'SN': station['dutSN'], 'status': '已取消'} result_array = [] # 更新数据库中 dutList 样品状态 """ if value and value.get('SN'): print_with_timestamp(f'dtMachineService dtStationDetachDut--3-0') response = self.updateTableRecords('dutList', [value]) if 'status' in response and response['status'] == 'success': result_array.append({"status": "success", "message": "update dutList success"}) else: result_array.append({"status": "error", "message": "update dutList error"}) station['dutStatus'] = value.get('status') """ testReqStatus = None if action in ['stop', 'cancel', 'finish']: try: # 对于PLC来说,不管是stop cancel finish 都是要停止机器的运行 result = self.stop_station(station['dtMachineSN'], station['SN']) result_array.append(result) testReqItem = station['dut']['testReqList'][station['itemOnGoingIndex']] # 当前的测试项 if "status" in result and result["status"] == "success": if action == 'stop': station['status'] = 'stop' testReqStatus = "stop" elif action == 'cancel': station['status'] = 'idle' # 完全重置 station['userid'] = None testReqStatus = "cancel" elif action == 'finish': # 如果刚刚工位状态为running,需要更新测试项的结束试验时间 if station["status"] == "running": testReqItem['endTime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") testReqStatus = "finished" # 更新数据库中 testReq 测试项的试验完成时间 self.updateTableRecords('testReq', [{"id": testReqItem['id'], "endTime": testReqItem['endTime'], "status": testReqStatus}]) station['status'] = 'finished' station['userid'] = None # 更新数据库中 testReq 测试项的状态 if action in ['stop', 'cancel'] and testReqStatus: self.updateTableRecords('testReq', [{"id": testReqItem['id'], "status": testReqStatus}]) except Exception as error: pass if action not in ['finish', 'stop']: stationKeys = ['dutName', 'dutSN', 'dutProject', 'dutPhase', 'dutTestReq', 'itemOnGoing', 'itemFinished', 'dutStatus'] for key in stationKeys: station[key] = '' station['itemOnGoingIndex'] = 0 station['dut'] = None station_return = station if hasattr(station, 'to_dict') and callable(getattr(station, 'to_dict')): station_return = station.to_dict() value_return = {"status": "success", "message": "dtStationDetachDut success", "station": station_return} for result in result_array: if "status" not in result or result["status"] != "success": value_return = {"status": "error", "message": "dtStationDetachDut error", "station": station_return} return value_return def dtStationAssignTestReq(self, machine_sn, station_sn, dut_sn, testReqIndex): value_return = {'status': "error"} print_with_timestamp("DtmMachineService dtStationAssignTestReq", machine_sn, station_sn, dut_sn, testReqIndex) machine = find_item(self.dtMachineStationData, 'SN', machine_sn) if machine: station = find_item(machine['stations'], 'SN', station_sn) else: return value_return dut = station['dut'] """ response = self.getTableRecords('dutList', {'SN': dut_sn}) if 'data' in response: dut = response['data'] # ---- if isinstance(dut, list): # ---- dut = dut[0] # ---- else: return value_return """ first_dut_sn = dut['SN'] # 20250827 cyx 为了兼容一个工位(通道)有多个测试样品 response = self.getTableRecords('testReq', {'SN': first_dut_sn}) if 'data' in response: dut['testReqList'] = response['data'] # 将数据库中的字段名,重新更改键值,方便后续程序和前端处理 for testReq in dut['testReqList']: if 'dropHeight' in testReq: testReq['height'] = testReq['dropHeight'] if 'dropItem' in testReq: testReq['item'] = testReq['dropItem'] if 'dropCycles' in testReq: testReq['count'] = testReq['dropCycles'] if 'dropDirection' in testReq: testReq['dropDirection'] = testReq['dropDirection'] else: return value_return station['itemOnGoing'] = "" station['itemFinished'] = 0 if dut and 'testReqList' in dut and isinstance(dut['testReqList'], list) and len( dut['testReqList']) > testReqIndex: # station['dut'] = dut # 20250827 cyx 修改,在dtStationAttachDut 已经执行station['dut'] = dut station['dutTestReq'] = dut['testReqList'][testReqIndex]['item'] station['itemCount'] = dut['testReqList'][testReqIndex]['count'] station['itemHeight'] = dut['testReqList'][testReqIndex]['height'] station['itemDirection'] = dut['testReqList'][testReqIndex]['dropDirection'] station['itemOnGoingIndex'] = testReqIndex # 第几项测试 if 'dtMachineSN' in station and station['dtMachineSN']: dtMachineSN = station['dtMachineSN'] try: if OFF_LINE_FOR_DEBUG: result1 = {'status': 'success'} result2 = {'status': 'success'} result3 = {'status': 'success'} result4 = {'status': 'success'} else: result1 = self.set_station_dropheight(dtMachineSN, station['SN'], dut['testReqList'][testReqIndex]['height']) result2 = self.set_station_cycles(dtMachineSN, station['SN'], dut['testReqList'][testReqIndex]['count']) result3 = self.set_station_finished(dtMachineSN, station['SN'], 0) result4 = self.set_station_dropdirection(dtMachineSN, station['SN'], dut['testReqList'][testReqIndex]['dropDirection']) if all('status' in result and result['status'] == 'success' for result in [result1, result2, result3, result4]): station_return = station if hasattr(station, 'to_dict') and callable(getattr(station, 'to_dict')): station_return = station.to_dict() value_return = { 'status': "success", 'message': "assignTestReq to station success", "station": station_return } else: value_return['message'] = "Failed to set station parameters" except Exception as error: value_return = {'status': "error", "message": "set station params ", "error": error} else: value_return = {'status': "error", 'message': "testReqList 不满足条件"} return value_return def set_station_dropheight(self, dtMachineSN, station_no, height): value_return = {'status': 'error', 'message': f"set station{dtMachineSN}:{station_no} dropheight error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].set_station_dropheight(station_no, height) else: return value_return def set_station_cycles(self, dtMachineSN, station_no, cycles): value_return = {'status': 'error', 'message': f"set station {dtMachineSN}:{station_no} cycles error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].set_station_cycles(station_no, cycles) else: return value_return def set_station_finished(self, dtMachineSN, station_no, finished): value_return = {'status': 'error', 'message': f"set station{dtMachineSN}:{station_no} finished error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].set_station_finished(station_no, finished) else: return value_return return value_return def set_station_dropdirection(self, dtMachineSN, station_no, direction): value_return = {'status': 'error', 'message': f"set station{dtMachineSN}:{station_no} direction error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].set_station_dropdirection(station_no, direction) else: return value_return return value_return # 给PLC设置 待测试的工件信息 def set_station_dutInfo(self, dtMachineSN, station_no, dutInfo): # dutInfo 有可能是数组 value_return = {'status': 'error', 'message': f"set station{dtMachineSN}:{station_no} DUT info error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].set_station_dutInfo(station_no, dutInfo) else: return value_return return value_return def read_station_dropheight(self, dtMachineSN, station_no): value_return = {'status': 'error', 'message': f"read station{dtMachineSN}:{station_no} dropheight error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].read_station_dropheight(station_no) else: return value_return return value_return def read_station_cyclesFinished(self, dtMachineSN, station_no): value_return = {'status': 'error', 'message': f"read station{dtMachineSN}:{station_no} cyclesFinished error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].read_station_cyclesFinished(station_no) else: return value_return return value_return def read_station_cyclesFinishedBulk(self, dtMachineSN, station_no, result_index, length): value_return = {'status': 'error', 'message': f"read station{dtMachineSN}:{station_no} cyclesFinishedBulk error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].read_station_cyclesFinishedBulk(station_no, result_index, length) else: return value_return return value_return def read_station_dropParamsBulk(self, dtMachineSN, station_no, length): value_return = {'status': 'error', 'message': f"read station{dtMachineSN}:{station_no} dropParamsBulk error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].read_station_dropParamsBulk(station_no, length) else: return value_return return value_return def start_station(self, machine_sn, station_no): value_return = {'status': 'error', 'message': f"start station{machine_sn}:{station_no} error"} machine = find_item(self.dtMachineStationData, 'SN', machine_sn) if machine and 'virt_plc_device' in machine: result = machine['virt_plc_device'].start_station(station_no) if result.get('status') and result['status'] == 'success': value_return = {'status': 'success', 'message': f"start station{machine_sn}:{station_no} success"} return value_return def resume_station(self, machine_sn, station_no): value_return = {'status': 'error', 'message': f"resume station{machine_sn}:{station_no} error"} machine = find_item(self.dtMachineStationData, 'SN', machine_sn) if machine and 'virt_plc_device' in machine: result = machine['virt_plc_device'].resume_station(station_no) if result.get('status') and result['status'] == 'success': value_return = {'status': 'success', 'message': f"resume station{machine_sn}:{station_no} success"} return value_return def stop_station(self, machine_sn, station_no): value_return = {'status': 'error', 'message': f"stop {machine_sn}:{station_no} error"} machine = find_item(self.dtMachineStationData, 'SN', machine_sn) if machine and 'virt_plc_device' in machine: result = machine['virt_plc_device'].stop_station(station_no) if result.get('status') and result['status'] == 'success': value_return = {'status': 'success', 'message': f"stop station{machine_sn}:{station_no} success"} return value_return def pause_station(self, machine_sn, station_no): machine = find_item(self.dtMachineStationData, 'SN', machine_sn) if machine and 'virt_plc_device' in machine: result = machine['virt_plc_device'].stop_station(station_no) if result.get('status') and result['status'] == 'success': value_return = {'status': 'success', 'message': f"stop station{machine_sn}:{station_no} success"} return value_return def get_station_value(self, dtMachineSN, station_no): value_return = {'status': 'error', 'station': {}, 'message': f"get station{dtMachineSN}:{station_no} value error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine: station = find_item(machine.get('stations'), 'SN', station_no) if station and hasattr(station, 'to_dict') and callable(getattr(station, 'to_dict')): value_return = {'status': 'success', 'station': station.to_dict(), 'message': f"get station{dtMachineSN}:{station_no} value success"} elif station: value_return = {'status': 'success', 'station': station, 'message': f"get station{dtMachineSN}:{station_no} value success"} return value_return def get_all_machine_sn(self): # 使用列表推导式提取所有的 'SN' 值 return [machine['SN'] for machine in self.dtMachineStationData] def get_machines_value(self, machines_sn_list): machines_return = [] machines_matched = find_item(self.dtMachineStationData, 'SN', machines_sn_list) for machine in machines_matched: new_data = {} for key in machine: if key != 'virt_plc_device' and key != 'poll_thread' and key != 'register01' and \ key != 'register02' and key != 'register03' and key != 'register04' and key != "stations": new_data[key] = machine[key] if 'stations' in machine: new_data['stations'] = [] for station in machine.get('stations'): new_data['stations'].append(station.to_dict()) machines_return.append(new_data) value_return = {'status': 'success', 'machines': machines_return, 'message': f"get machines{machines_sn_list} value success"} return value_return def register_test(self, dtMachineSN, action, address, value): value_return = {'status': 'error', 'message': f"register {action} {dtMachineSN}:{address} error"} machine = find_item(self.dtMachineStationData, 'SN', dtMachineSN) if machine and 'virt_plc_device' in machine: return machine['virt_plc_device'].register_test(action, address, value) else: return value_return return value_return """ 跌落试验机 通过modbus client 与 modbus server 通讯 modbus server 通过串口 rs232 rs485 与 跌落机的控制器(PLC)通讯 """ """ station = { 'dtMachineSN': machine_config.SN, 'dtMachineLabel': machine_config.label, 'label': f"工位{i}", 'SN': f"0{i}", 'key': index, 'formal': True, 'testReqExpanded': False, 'status': 'idle', 'index': stationIndex } """