"""Serial Modbus access to the drop-test PLC, plus an in-memory mock.

``ModbusPLC(plc_port, comm_config)`` builds either ``ModbusPLC_Real`` (pymodbus
RTU client) or ``ModbusPLC_Mock`` (dictionary-backed registers with simulated
drop cycles) depending on the module-level ``MOCK_MODE`` flag.
"""
import array
import datetime
import re
import threading
import time

from pymodbus.client import ModbusSerialClient as ModbusSerialClient
from pymodbus.exceptions import ModbusException, ConnectionException

import utils
from utils import print_with_timestamp
from utils import PLCDropParamsData

# Real PLC by default; per the original note the --mock command-line argument
# switches this flag on (the argument parsing itself lives outside this file).
MOCK_MODE = False

# Drop parameters the mock preloads for every station and replays per direction.
PLC_DROP_PARAMS = [
    {'dropCode': 1, 'dropCycles': 10, 'dropHeight': 7500},
    {'dropCode': 2, 'dropCycles': 10, 'dropHeight': 7600},
    {'dropCode': 3, 'dropCycles': 10, 'dropHeight': 7500},
    {'dropCode': 4, 'dropCycles': 8, 'dropHeight': 7600},
    {'dropCode': 5, 'dropCycles': 7, 'dropHeight': 7500},
    {'dropCode': 6, 'dropCycles': 16, 'dropHeight': 7600},
]

# Register bases per station (the original note mentions four channels; three
# are defined here).
STATION_REG_BASE = [
    {'drop_params': 'D3020', 'dut_info': 'HD0300', 'station_status': 'D11',
     'station_cycles': 'HD0164', 'station_read_index': 'D15',
     'station_drop_result': 'HD0510'},
    {'drop_params': 'D4020', 'dut_info': 'HD2300', 'station_status': 'D12',
     'station_cycles': 'HD0264', 'station_read_index': 'D16',
     'station_drop_result': 'HD1110'},
    {'drop_params': 'D5020', 'dut_info': 'HD4300', 'station_status': 'D13',
     'station_cycles': 'HD0364', 'station_read_index': 'D17',
     'station_drop_result': 'HD1710'},
]

# Accepted PLC address syntax: prefix 'M', 'D' or 'HD' followed by 1-6 digits.
_PLC_ADDRESS_RE = re.compile(r'^([MD]|HD)\d{1,6}$')
# Offset added to an 'HD' register number to obtain its Modbus address.
_HD_MODBUS_OFFSET = 41088
# Number of holding registers fetched by every connect-state poll.
_CONNECT_STATE_REG_COUNT = 9
# Registers occupied by one drop-result record.
_RESULT_RECORD_SIZE = 44
# Total tries (first attempt + one retry) for Modbus requests that retry.
_MAX_IO_ATTEMPTS = 2


def _split_dword(value):
    """Return ``value`` as a (low word, high word) pair of 16-bit integers."""
    return value & 0xFFFF, (value >> 16) & 0xFFFF


class ModbusPLC_Mock:
    """Dictionary-backed PLC replacement that simulates drop-test stations.

    Registers live in ``self._mock_registers`` keyed by Modbus address.
    Writing the value 1 to a station's ``dut_info`` register starts a daemon
    thread that plays through ``PLC_DROP_PARAMS`` for that station.
    """

    def __init__(self, plc_port, comm_config):
        """Create the mock and start the connect-state polling thread.

        Args:
            plc_port: Port name; only stored and used in log messages.
            comm_config: Accepted for signature parity with the real class;
                not used by the mock.
        """
        self.plc_port = plc_port
        self.plc_com_state = 0
        self.mock_mode = True

        self._mock_registers = {}
        self._initialize_mock_registers()

        # Same length as the data stored by each poll, so readers see a
        # consistently sized list before and after the first poll.
        self._connect_state_cache = [0] * _CONNECT_STATE_REG_COUNT
        self._connect_state_address = self.conv_register_address('D10')
        self._slave = 1
        self._poll_connect_state_interval = 0.5  # seconds
        self._serial_lock = threading.Lock()
        self._stop_event = threading.Event()
        self._timer_thread = None

        # Per-station simulation threads and their {'stop': bool} controls.
        self.station_threads = {}
        self.station_thread_controls = {}

        self.plc_com_state = 1  # the mock always reports a working link
        self._start_connect_state_polling()

    def _initialize_mock_registers(self):
        """Populate the simulated register map with its initial values."""
        # Seed the drop parameters for every station listed in
        # STATION_REG_BASE so each station that can be simulated has them.
        drop_params_registers = PLCDropParamsData.to_registers(PLC_DROP_PARAMS)
        for station_reg in STATION_REG_BASE:
            base_addr = self.conv_register_address(station_reg.get('drop_params'))
            for offset, value in enumerate(drop_params_registers):
                self._mock_registers[base_addr + offset] = value

        # D10..D19: D10 is set to 1 (device connected); the rest start at 0.
        for number in range(10, 20):
            modbus_addr = self.conv_register_address(f"D{number}")
            self._mock_registers[modbus_addr] = 0x0001 if number == 10 else 0x0000

        # Zero the HD areas, in the original order: drop parameters
        # (HD3020-HD3048), sample information (HD300 up to 463*4) and test
        # results (HD510-HD2949).
        for first, stop in ((3020, 3049), (300, 463 * 4), (510, 2950)):
            for number in range(first, stop):
                modbus_addr = self.conv_register_address(f"HD{number}")
                self._mock_registers[modbus_addr] = 0x0000

    def conv_to_plc_address(self, type_char, value):
        """Return ``value`` as an int; ``type_char`` is ignored.

        Per the original note, prefix-based conversion is done in the Modbus
        TCP client rather than here.
        """
        return int(value)

    def conv_register_address(self, s):
        """Translate a PLC address such as ``'D10'`` or ``'HD0300'``.

        Args:
            s: Address string with prefix 'M', 'D' or 'HD' and 1-6 digits.

        Returns:
            The numeric part for 'M' and 'D'; the numeric part plus 41088
            for 'HD'.

        Raises:
            ValueError: If ``s`` does not match the expected syntax.
        """
        matched = _PLC_ADDRESS_RE.match(s)
        if not matched:
            raise ValueError(
                "Invalid format. The string must start with 'M', 'D', or 'HD' "
                "followed by 1 to 6 digits.")
        prefix = matched.group(1)
        number = int(s[len(prefix):])
        if prefix == 'HD':
            return number + _HD_MODBUS_OFFSET
        return number

    def _start_connect_state_polling(self):
        """Start the daemon thread that refreshes the connect-state cache."""
        self._timer_thread = threading.Thread(target=self._poll_connect_state, daemon=True)
        self._timer_thread.start()

    def _poll_connect_state(self):
        """Refresh the connect-state cache until the stop event is set."""
        empty_cache = [0] * _CONNECT_STATE_REG_COUNT
        while not self._stop_event.is_set():
            try:
                result = self.read_holding_registers(
                    self._connect_state_address, _CONNECT_STATE_REG_COUNT, self._slave)
                if result and result.get('status') == 'success' and result.get('data'):
                    self._connect_state_cache = result['data']
                else:
                    self._connect_state_cache = list(empty_cache)
            except Exception as e:
                print_with_timestamp(f"Error polling connect state for {self.plc_port}: {e}", color='red')
                self._connect_state_cache = list(empty_cache)
            # Event.wait instead of time.sleep so stop_polling() takes effect
            # immediately rather than after a full interval.
            self._stop_event.wait(self._poll_connect_state_interval)

    def get_connect_state_cache(self, connect_state_address, slave):
        """Return the cached connect state without reading registers.

        The address and slave given here are stored and used by subsequent
        background polls.
        """
        self._connect_state_address = connect_state_address
        self._slave = slave
        return self._connect_state_cache

    def read_coils(self, address, count, unit=1):
        """Return ``count`` coils, all ``False`` (coils are not simulated)."""
        int(address)  # kept so a non-numeric address still raises ValueError
        return {"status": "success", "data": [False for _ in range(count)]}

    def write_single_coil(self, address, value, unit=1):
        """Log a coil write; the value is not stored."""
        print("modbus plc write_single_coil ", address, value)
        addr = int(address)
        print_with_timestamp(f"Mock write coil {addr} = {value}", color='blue')
        return {"status": "success"}

    def stop_polling(self):
        """Stop the polling thread and every station simulation thread."""
        print_with_timestamp(f"stop_polling called for {self.plc_port}", color='yellow')
        self._stop_event.set()

        for station_idx in list(self.station_thread_controls.keys()):
            self.station_thread_controls[station_idx]['stop'] = True
            print_with_timestamp(f"Setting stop flag for station {station_idx}", color='yellow')

        # Give each simulation thread up to 3 seconds to finish.
        for station_idx, thread in list(self.station_threads.items()):
            if thread and thread.is_alive():
                print_with_timestamp(f"Waiting for station {station_idx} thread to stop...", color='yellow')
                thread.join(timeout=3)
                if thread.is_alive():
                    print_with_timestamp(f"Station {station_idx} thread did not stop in time!", color='red')
                else:
                    print_with_timestamp(f"Stopped station {station_idx} simulation thread", color='green')

        if self._timer_thread and self._timer_thread.is_alive():
            self._timer_thread.join(timeout=1)
            print_with_timestamp(f"Stopped polling thread for {self.plc_port}", color='green')

    def read_holding_registers(self, address, count, unit=1):
        """Read ``count`` simulated registers starting at ``address``.

        Unknown addresses read as 0. Once the stop event is set an error
        dict is returned instead of data.
        """
        if self._stop_event.is_set():
            return {"status": "error", "message": "PLC stopped"}
        start_addr = int(address)
        values = [self._mock_registers.get(start_addr + i, 0x0000) for i in range(count)]
        return {"status": "success", "data": values}

    def read_input_registers(self, address, count, unit=1):
        """Read ``count`` simulated registers; unknown addresses read as 0."""
        start_addr = int(address)
        values = [self._mock_registers.get(start_addr + i, 0x0000) for i in range(count)]
        return {"status": "success", "data": values}

    def write_single_register(self, address, value, unit=1):
        """Store one register and start a station test if it is a DUT flag."""
        print("modbus plc write_single_register-", address, value, unit)
        addr = int(address)
        self._mock_registers[addr] = value
        self._check_and_start_test_on_dut_write(addr, value)
        print_with_timestamp(f"Mock write register {addr} = {value}", color='blue')
        return {"status": "success"}

    def write_registers(self, address, values, unit=1):
        """Store consecutive registers, checking each one for a DUT flag."""
        start_addr = int(address)
        for offset, value in enumerate(values):
            self._mock_registers[start_addr + offset] = value
            self._check_and_start_test_on_dut_write(start_addr + offset, value)
        print_with_timestamp(f"Mock write registers {start_addr} = {values}", color='blue')
        return {"status": "success"}

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read holding registers and report them with their length."""
        resp = {'status': 'error'}
        result = self.read_holding_registers(address, length, plc_no)
        if result and 'status' in result and result['status'] == 'success' and 'data' in result:
            resp = {'status': 'success', 'length': len(result['data']), 'data': result['data']}
        return resp

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read coils and report them with their length."""
        resp = {'status': 'error'}
        result = self.read_coils(address, length, plc_no)
        utils.condition_print('plc_read_bits result', result)
        if 'status' in result and result['status'] == 'success' and 'data' in result:
            bits = result['data']
            resp = {'status': 'success', 'length': len(bits), 'data': bits}
        return resp

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one register."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write ``data`` with a single-register write (no word splitting)."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write the registers in ``data``; ``length`` is not used."""
        values = [] if data is None else data
        return self.write_registers(start_addr, values, plc_no)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x01):
        """Write one coil."""
        return self.write_single_coil(address, data, plc_no)

    def _check_and_start_test_on_dut_write(self, addr, value):
        """Start a station simulation when 1 is written to its DUT flag."""
        if value != 1:
            return
        for station_idx, station_reg in enumerate(STATION_REG_BASE):
            dut_info_addr = self.conv_register_address(
                station_reg.get('dut_info', f'HD{station_idx}000'))
            if addr != dut_info_addr:
                continue
            station_status_addr = self.conv_register_address(
                station_reg.get('station_status', f'D{11+station_idx}'))
            self._mock_registers[station_status_addr] = 1  # status: test running
            self._start_station_simulation_thread(station_idx)
            break

    def _start_station_simulation_thread(self, station_idx):
        """Start (or restart) the simulation thread of one station."""
        running = self.station_threads.get(station_idx)
        if running is not None and running.is_alive():
            # Ask the previous run to stop and give it up to 2 seconds.
            if station_idx in self.station_thread_controls:
                self.station_thread_controls[station_idx]['stop'] = True
            running.join(timeout=2)

        # A fresh control dict, so the old thread's stop flag stays set.
        control = {'stop': False}
        self.station_thread_controls[station_idx] = control

        thread = threading.Thread(target=self._simulate_station_test,
                                  args=(station_idx, control), daemon=True)
        self.station_threads[station_idx] = thread
        thread.start()
        print_with_timestamp(f"Started simulation thread for station {station_idx}", color='green')

    def _simulate_station_test(self, station_idx, control):
        """Play through every direction in ``PLC_DROP_PARAMS`` for a station.

        Each simulated drop takes about 1.5 seconds, increments the station
        cycle counter, advances the read index (1..10, wrapping) and writes
        one result record at the slot selected by the read index. After the
        last direction the station status is set to 3.

        Args:
            station_idx: Index into ``STATION_REG_BASE``.
            control: Dict whose ``'stop'`` entry aborts this run when True.
        """
        station_reg = STATION_REG_BASE[station_idx]
        station_status_addr = self.conv_register_address(station_reg.get('station_status'))
        station_cycles_addr = self.conv_register_address(station_reg.get('station_cycles'))
        station_read_index_addr = self.conv_register_address(station_reg.get('station_read_index'))
        station_drop_result_addr = self.conv_register_address(station_reg.get('station_drop_result'))

        def stop_requested():
            return control['stop'] or self._stop_event.is_set()

        directions = PLC_DROP_PARAMS
        direction_index = 0

        print_with_timestamp(f"Starting simulation for station {station_idx+1}", color='blue')

        while not stop_requested() and direction_index < len(directions):
            direction = directions[direction_index]
            direction_cycle_count = 0

            print_with_timestamp(f"Station {station_idx} starting direction {direction['dropCode']}", color='cyan')
            self._mock_registers[station_status_addr] = 1  # status: test running

            while direction_cycle_count < direction['dropCycles'] and not stop_requested():
                # 1.5 s drop cycle, slept in 0.1 s slices so a stop request
                # is noticed quickly.
                for _ in range(15):
                    if stop_requested():
                        print_with_timestamp(f"Station {station_idx} 收到停止信号,退出测试", color='red')
                        return
                    time.sleep(0.1)

                direction_cycle_count += 1

                # Station cycle counter only ever increases.
                current_cycles = self._mock_registers.get(station_cycles_addr, 0)
                self._mock_registers[station_cycles_addr] = current_cycles + 1

                # Read index cycles through 1..10.
                current_read_index = self._mock_registers.get(station_read_index_addr, 0)
                new_read_index = (current_read_index % 10) + 1
                self._mock_registers[station_read_index_addr] = new_read_index

                result_data = self._generate_mock_result_data(
                    station_idx, direction, direction_cycle_count, current_cycles + 1)

                # The read index selects which result slot is overwritten.
                result_offset = (new_read_index - 1) * _RESULT_RECORD_SIZE
                for i, value in enumerate(result_data):
                    self._mock_registers[station_drop_result_addr + result_offset + i] = value

                print_with_timestamp(
                    f"Station {station_idx} direction {direction['dropCode']} cycle {direction_cycle_count}/{direction['dropCycles']}, "
                    f"total cycles: {current_cycles + 1}, read_index: {new_read_index}",
                    color='blue'
                )

            # Current direction finished: move on to the next one. Without
            # this the loop would replay the first direction forever.
            direction_index += 1

        if direction_index >= len(directions) and not stop_requested():
            self._mock_registers[station_status_addr] = 3  # status: finished
            print_with_timestamp(f"Station {station_idx} all directions completed!", color='green')

    def _generate_mock_result_data(self, station_idx, direction, direction_cycle_count, total_cycles):
        """Build one 44-register result record for a simulated drop.

        Layout by register index: 0-5 begin time, 6-11 end time (both the
        current time), 12 direction code, 13 station number (1-based),
        14-15 item serial, 16-17 drop height, 18-19 drop speed (fixed 1500),
        20-21 DUT drop cycles, 22-23 cycles in this direction, 24-25 station
        drop cycles, 26-43 machine serial text. 32-bit values are stored low
        word first.
        """
        now = datetime.datetime.now()
        stamp = [now.year, now.month, now.day, now.hour, now.minute, now.second]

        result_registers = [0] * _RESULT_RECORD_SIZE
        result_registers[0:6] = stamp
        result_registers[6:12] = stamp
        result_registers[12] = direction['dropCode']
        result_registers[13] = station_idx + 1
        result_registers[14], result_registers[15] = _split_dword(total_cycles)
        result_registers[16], result_registers[17] = _split_dword(direction['dropHeight'])
        result_registers[18] = 1500 & 0xFFFF
        result_registers[19] = 0
        result_registers[20], result_registers[21] = _split_dword(total_cycles)
        result_registers[22], result_registers[23] = _split_dword(direction_cycle_count)
        result_registers[24], result_registers[25] = _split_dword(total_cycles)

        # Machine serial: at most 18 registers (36 bytes) are copied.
        machine_sn = f"MOCK_MACHINE_{station_idx}"
        sn_registers = utils.string_to_registers(machine_sn, 36)
        for i, reg in zip(range(18), sn_registers):
            result_registers[26 + i] = reg

        return result_registers

    def close(self):
        """Stop the station simulation threads (waiting up to 1 s each)."""
        for control in self.station_thread_controls.values():
            control['stop'] = True
        for thread in list(self.station_threads.values()):
            if thread.is_alive():
                thread.join(timeout=1)

        if self.mock_mode:
            return
        # The mock never creates a client; guard so this path cannot raise
        # AttributeError if mock_mode is ever switched off.
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()


class ModbusPLC_Real:
    """PLC access over a pymodbus RTU serial client.

    All requests are serialised through ``self._serial_lock``. A daemon
    thread keeps ``_connect_state_cache`` up to date.
    """

    def __init__(self, plc_port, comm_config):
        """Open the serial client and start connect-state polling.

        Args:
            plc_port: Serial port name handed to pymodbus.
            comm_config: Mapping read for ``'baudrate'`` and ``'parity'``.
        """
        self.plc_port = plc_port
        self.plc_com_state = 0
        self.client = None
        # Same length as the data stored by each poll.
        self._connect_state_cache = [0] * _CONNECT_STATE_REG_COUNT
        self._connect_state_address = 10
        self._slave = 1
        self._poll_connect_state_interval = 0.5  # seconds
        self._serial_lock = threading.Lock()
        self._stop_event = threading.Event()
        self._timer_thread = None

        self._initialize_client(comm_config)
        self._start_connect_state_polling()

    def _initialize_client(self, comm_config):
        """Create and connect the serial client; failures are only logged."""
        baudrate = comm_config.get('baudrate')
        parity = comm_config.get('parity')
        try:
            self.client = ModbusSerialClient(method="rtu", port=self.plc_port,
                                             baudrate=baudrate, parity=parity,
                                             timeout=1)
            self.client.connect()
            if self.port_is_opened():
                self.plc_com_state = 1  # the serial port could be opened
                print_with_timestamp(f"Modbus PLC open {self.client} {self.plc_port} {baudrate} "
                                     f"{parity} success", color='green')
        except Exception as e:
            # Best effort: the polling thread retries while plc_com_state is 0.
            print_with_timestamp(f"Mmodbus PLC open {self.plc_port} "
                                 f"{baudrate} {parity} error: {e}", color='red')

    def conv_to_plc_address(self, type_char, value):
        """Return ``value`` as an int; ``type_char`` is ignored.

        Per the original note, prefix-based conversion is done in the Modbus
        TCP client rather than here.
        """
        return int(value)

    def connect(self):
        """Connect the client; return True on success, False otherwise."""
        if self.client is None:
            print("Failed to connect to Modbus server in PLC")
            return False
        try:
            if not self.client.connect():
                print("Failed to connect to Modbus server in PLC")
                return False
            print("Modbus serial Client Connected to PLC by", self.plc_port)
            return True
        except ConnectionException as e:
            print(f"Connection failed: {str(e)}")
            return False

    def reconnect(self):
        """Return True without touching the port.

        NOTE(review): the original carried a "20250908" marker on the early
        return and an unreachable close-then-connect sequence after it, so
        the real reconnect looks deliberately disabled. Confirm before
        restoring ``self.client.close(); return self.connect()``.
        """
        return True

    def port_is_opened(self):
        """Return True when the client exists and has a non-None ``socket``."""
        if self.client is None:
            return False
        return getattr(self.client, 'socket', None) is not None

    def _start_connect_state_polling(self):
        """Start the daemon thread that refreshes the connect-state cache."""
        self._timer_thread = threading.Thread(target=self._poll_connect_state, daemon=True)
        self._timer_thread.start()

    def _poll_connect_state(self):
        """Reconnect or refresh the connect-state cache until stopped.

        While ``plc_com_state`` is 0 the loop tries to open the port and
        waits 5 s between tries; otherwise it reads the state registers
        every 0.5 s.
        """
        empty_cache = [0] * _CONNECT_STATE_REG_COUNT
        while not self._stop_event.is_set():
            if self.plc_com_state == 0:
                try:
                    self.client.connect()
                    if self.port_is_opened():
                        self.plc_com_state = 1
                        self._poll_connect_state_interval = 0.5
                    else:
                        self.plc_com_state = 0
                        self._poll_connect_state_interval = 5
                except Exception:
                    self.plc_com_state = 0
                    self._poll_connect_state_interval = 5
            else:
                try:
                    result = self.read_holding_registers(
                        self._connect_state_address, _CONNECT_STATE_REG_COUNT, self._slave)
                    if result and result.get('status') == 'success' and result.get('data'):
                        self._connect_state_cache = result['data']
                    else:
                        self._connect_state_cache = list(empty_cache)
                except Exception as e:
                    print_with_timestamp(f"Error polling connect state for {self.plc_port}: {e}", color='red')
                    self._connect_state_cache = list(empty_cache)
            # Event.wait instead of time.sleep: the interval can be 5 s while
            # stop_polling() only joins for 1 s.
            self._stop_event.wait(self._poll_connect_state_interval)

    def get_connect_state_cache(self, connect_state_address, slave):
        """Return the cached connect state without a live read.

        The address and slave given here are stored and used by subsequent
        background polls.
        """
        self._connect_state_address = connect_state_address
        self._slave = slave
        return self._connect_state_cache

    def stop_polling(self):
        """Signal the polling thread to stop and wait up to 1 s for it."""
        self._stop_event.set()
        if self._timer_thread and self._timer_thread.is_alive():
            self._timer_thread.join(timeout=1)

    def get_connect_state(self, connect_state_address, slave):
        """Read five state registers directly from the PLC.

        Returns:
            The register list on success, a list of five zeros when the read
            reports an error, or the integer 0 when the port is marked down
            or an exception occurs. The mixed return types are kept for
            existing callers.
        """
        if self.plc_com_state == 0:
            return 0x00
        try:
            result = self.read_holding_registers(connect_state_address, 0x05, slave)
            if result and 'status' in result and result['status'] == 'success' and 'data' in result:
                return result['data']
            return [0x00, 0x00, 0x00, 0x00, 0x00]
        except Exception:
            return 0x00

    def _run_request(self, request, retry_label=None, require_port=True,
                     port_error="Unable to open serial port"):
        """Execute one Modbus request while holding the serial lock.

        The retry happens inside a single lock hold. Re-entering the public
        method from its own ``except`` branch, as the code previously did,
        blocks forever on the non-reentrant lock.

        Args:
            request: Zero-argument callable performing the pymodbus call.
            retry_label: ``"Read"``/``"Write"`` to log the exception and
                retry once after ``reconnect()``; ``None`` for no retry.
            require_port: Fail early when the port is not open.
            port_error: Message used for that early failure.

        Returns:
            ``(response, error)``; ``error`` is ``None`` on success and an
            error dict otherwise. ``response`` is ``None`` when the request
            raised or was never sent.
        """
        attempts_left = _MAX_IO_ATTEMPTS if retry_label else 1
        with self._serial_lock:
            if require_port and not self.port_is_opened() and not self.reconnect():
                return None, {"status": "error", "msg": port_error}
            while True:
                attempts_left -= 1
                try:
                    response = request()
                except (ModbusException, ConnectionException) as e:
                    failure = {"status": "error", "msg": str(e)}
                    if retry_label is None:
                        return None, failure
                    print(f"{retry_label} error: {str(e)}")
                    if attempts_left <= 0 or not self.reconnect():
                        return None, failure
                    continue
                if response.isError():
                    return response, {"status": "error", "msg": str(response)}
                return response, None

    def read_coils(self, address, count, unit=1):
        """Read ``count`` coils; returns a status dict with ``data`` bits."""
        response, error = self._run_request(
            lambda: self.client.read_coils(address, count, slave=unit),
            retry_label="Read", require_port=False)
        if error is not None:
            return error
        return {"status": "success", "data": response.bits}

    def write_single_coil(self, address, value, unit=1):
        """Write one coil on slave ``unit``; returns a status dict."""
        print("modbus plc write_single_coil ", address, value)
        response, error = self._run_request(
            lambda: self.client.write_coil(address, value, slave=unit),
            retry_label="Write", require_port=False)
        if error is not None:
            if response is not None:
                print("modbus plc write_single_coil error", str(response))
            return error
        return {"status": "success"}

    def read_holding_registers(self, address, count, unit=1):
        """Read holding registers once (no retry); returns a status dict."""
        response, error = self._run_request(
            lambda: self.client.read_holding_registers(address, count, slave=unit),
            port_error="Unable to connect to open serial port")
        if error is not None:
            return error
        return {"status": "success", "data": response.registers}

    def read_input_registers(self, address, count, unit=1):
        """Read input registers; returns a status dict with ``data``."""
        response, error = self._run_request(
            lambda: self.client.read_input_registers(address, count, slave=unit),
            retry_label="Read")
        if error is not None:
            return error
        return {"status": "success", "data": response.registers}

    def write_single_register(self, address, value, unit=1):
        """Write one holding register; returns a status dict."""
        print("modbus plc write_single_register-", address, value, unit)
        try:
            response, error = self._run_request(
                lambda: self.client.write_register(address, value, slave=unit),
                retry_label="Write")
        except Exception as e:
            # Non-Modbus failure (e.g. a value that cannot be encoded):
            # report it instead of continuing with no response object.
            print(f"modbus plc write_single_register- error: {str(e)}")
            return {"status": "error", "msg": str(e)}
        if error is not None:
            if response is not None:
                print("modbus plc write_single_register- error-2: ", response)
            return error
        return {"status": "success"}

    def write_registers(self, address, values, unit=1):
        """Write consecutive holding registers; returns a status dict."""
        _, error = self._run_request(
            lambda: self.client.write_registers(address, values, slave=unit),
            retry_label="Write")
        if error is not None:
            return error
        return {"status": "success"}

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read holding registers and report them with their length."""
        resp = {'status': 'error'}
        result = self.read_holding_registers(address, length, plc_no)
        if result and 'status' in result and result['status'] == 'success' and 'data' in result:
            resp = {'status': 'success', 'length': len(result['data']), 'data': result['data']}
        return resp

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read coils and report them with their length."""
        resp = {'status': 'error'}
        result = self.read_coils(address, length, plc_no)
        utils.condition_print('plc_read_bits result', result)
        if 'status' in result and result['status'] == 'success' and 'data' in result:
            bits = result['data']
            resp = {'status': 'success', 'length': len(bits), 'data': bits}
        return resp

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one register."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write ``data`` with a single-register write (no word splitting).

        NOTE(review): a value above 0xFFFF presumably cannot be sent in one
        register; confirm the intended word order before splitting it.
        """
        return self.write_single_register(address, data, plc_no)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write the registers in ``data``; ``length`` is not used."""
        values = [] if data is None else data
        return self.write_registers(start_addr, values, plc_no)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x01):
        """Write one coil."""
        return self.write_single_coil(address, data, plc_no)

    def close(self):
        """Stop background polling and close the serial client."""
        # Signal the poller first so it stops issuing requests on the
        # client that is about to be closed.
        self._stop_event.set()
        if self.client is None:
            return
        # Take the serial lock so an in-flight request finishes first.
        with self._serial_lock:
            self.client.close()


def get_modbus_plc_class():
    """Return ``ModbusPLC_Mock`` when ``MOCK_MODE`` is truthy, else ``ModbusPLC_Real``."""
    if MOCK_MODE:
        return ModbusPLC_Mock
    return ModbusPLC_Real


def ModbusPLC(plc_port, comm_config):
    """Create a PLC instance of the class selected by the current ``MOCK_MODE``.

    Kept as a callable named like the former class for backward
    compatibility; the class is chosen at call time.
    """
    plc_class = get_modbus_plc_class()
    return plc_class(plc_port, comm_config)