diff --git a/dtMachineService.py b/dtMachineService.py index 0361c80..821f54e 100644 --- a/dtMachineService.py +++ b/dtMachineService.py @@ -142,10 +142,10 @@ class DtmMachineService: # 业务配置 self.dutDirectionMode = dutDirectionMode # 2=从PLC HMI读取跌落参数 - # 回调函数注册 - self.cb_station_result_change = cb_station_result_change # 通知前端工位结果更新 - self.cb_machine_data_change = cb_machine_data_change # 通知前端设备状态更新 - self.cb_insert_result = cb_insert_result # 触发本地存储和LMIS上传 + # 回调函数注册(用于将解析后的领域事件分发给外部消费者) + self.cb_station_result_change = cb_station_result_change # 发布:工位结果变更事件 + self.cb_machine_data_change = cb_machine_data_change # 发布:设备状态变更事件 + self.cb_insert_result = cb_insert_result # 发布:测试结果产生事件(触发存储&上传) # 设备配置数据(从数据库加载) self.machineConfigsListData = [] @@ -521,11 +521,11 @@ class DtmMachineService: cb_insert_result_copied = dict(cb_test_result) self.test_result_add_dut_info(station, cb_insert_result_copied, dut) # print_with_timestamp(f"handle_test_result_registers_value --2 {cb_insert_result_copied} ") - if self.cb_insert_result: # 将结果进行回调处理,存入数据库和上传LIMS + if self.cb_insert_result: # 发布测试结果事件(外部处理存储&上传) self.cb_insert_result(cb_insert_result_copied) else: self.test_result_add_dut_info(station, cb_test_result, station['dut']) - if self.cb_insert_result: # 将结果进行回调处理,存入数据库和上传LIMS + if self.cb_insert_result: # 发布测试结果事件(外部处理存储&上传) self.cb_insert_result(cb_test_result) directionCodeOngoing = test_result.get('directionCode') # 此条记录的跌落方向码 diff --git a/dtmgtApp.py b/dtmgtApp.py index 9919912..429b1fd 100644 --- a/dtmgtApp.py +++ b/dtmgtApp.py @@ -370,11 +370,13 @@ SN TEXT, -- 测试样品序列号 """ -def run_insert_result(): +def local_db_result_consumer(): + """后台线程:消费测试结果队列并批量写入本地 SQLite(TestReq 表)""" while True: try: insert_records = [] - while not ws_message_send_queue.empty(): + # 从本地结果队列中批量取出待入库记录 + while not result_to_localDB_queue.empty(): # 尝试从队列中获取元素,如果队列为空,则阻塞等待 result_row = result_to_localDB_queue.get() insert_records.append(result_row) @@ -392,7 +394,16 @@ def run_insert_result(): def cb_insert_result(test_req_result): - """回调函数 - 处理测试结果""" + """ + 测试结果事件处理器(由 DtmMachineService 触发) + + 职责: + 1. 将结果推送到本地数据库入库队列 + 2. 提交至 LMIS 上传系统(异步处理) + + 参数: + test_req_result: 测试结果字典,包含所有测试字段 + """ # 1. 放入结果队列供本地数据库写入 result_to_localDB_queue.put_nowait(test_req_result) @@ -401,6 +412,15 @@ def cb_insert_result(test_req_result): def cb_station_result_change(station, data): + """ + 工位结果变更事件处理器(由 DtmMachineService 触发) + + 职责:将工位结果变更消息推送至 WebSocket 队列,通知前端 UI 更新 + + 参数: + station: 工位对象 + data: 结果数据字典 + """ if data is None: return json_data = {"status": "success", "command": "station_result_change", "data": data} @@ -412,7 +432,19 @@ def cb_station_result_change(station, data): print("Message queue is not initialized yet!") -def cb_machine_data_change(machine_sn_list, data): # 通知和station 无关的machine data,目前重点是connectState +def cb_machine_data_change(machine_sn_list, data): + """ + 设备状态变更事件处理器(由 DtmMachineService 触发) + + 职责:将设备状态变更消息推送至 WebSocket 队列,通知前端 UI 更新 + + 参数: + machine_sn_list: 设备编号列表 + data: 设备数据字典 + + 注意: + - 单台设备更新时会移除队列中该设备的旧消息,避免积压 + """ if data is None: return json_data = {"status": "success", "command": "machine_data_change", "data": data} @@ -432,7 +464,7 @@ def run_flask_server(flask_stop_event=None): # WebSocket 消息分发器:从队列中取出消息并发送给所有客户端 -async def ws_message_send_dispatcher(): +async def ws_message_dispatcher(): while True: # 创建一个字典来存储以 SN 为键的最新记录 latest_machines = {} @@ -516,8 +548,8 @@ def run_websockets(): asyncio.set_event_loop(loop) webSocket_server = websockets.serve(websocket_server_handle, 'localhost', 5080) loop.run_until_complete(webSocket_server) - # 在事件循环中启动消息分发器 - loop.create_task(ws_message_send_dispatcher()) + # 在事件循环中启动消息分发器(消费 ws_message_send_queue,将变更推送给所有前端客户端) + loop.create_task(ws_message_dispatcher()) try: loop.run_forever() except KeyboardInterrupt: @@ -840,8 +872,8 @@ def main(args): socketio_thread.daemon = True socketio_thread.start() - # 启动写入从PLC得到测试结果的thread - insert_result_thread = threading.Thread(target=run_insert_result) + # 启动写入从PLC得到测试结果的线程(后台消费本地结果队列并写入 SQLite) + insert_result_thread = threading.Thread(target=local_db_result_consumer) insert_result_thread.daemon = True insert_result_thread.start()