import asyncio
import binascii
import datetime
import json
import re
import sys
import threading
import time

import pymodbus
import requests
import serial
from pymodbus import ModbusException
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException

import utils
from utils import PLCDropParamsData, PLCTestReqResultData, print_with_timestamp

# First holding register of the PLC state block; read_plc_connect_state reads
# 9 consecutive registers starting here.
PLC_CONNECT_REGISTER = 'D10'
# First holding register of the 4-register block read by read_plc_read_index.
PLC_READ_INDEX_REGISTER = 'D15'
# Original author's notes (translated; which register the first note refers to
# is not clear from the code -- TODO confirm):
#   - the maximum number of samples the fixture of one channel (station) can
#     hold, allowed values [1 2 4]
#   - D11..D14: running state of a channel, 1 = testing, 0 = stopped.
#     NOTE(review): the original labelled all four registers "channel 1";
#     presumably they are channels 1..4 -- confirm against the PLC program.

# PLC register names accepted by conv_register_address: prefix M, D or HD
# followed by 1 to 6 digits.
_REGISTER_NAME_PATTERN = re.compile(r'^([MD]|HD)(\d{1,6})$')


def get_visible_string(s):
    """Strip non-printable characters from *s*.

    Keeps ASCII printable characters (codes 32-126) plus newline, carriage
    return and tab, then removes leading/trailing whitespace.

    :param s: raw string (may be empty or None)
    :return: cleaned string; "" when *s* is falsy
    """
    if not s:
        return ""
    kept = [char for char in s if 32 <= ord(char) <= 126 or char in '\n\r\t']
    return ''.join(kept).strip()


class ModbusDtMachine:
    """Modbus-TCP client wrapper for one drop-test machine PLC.

    Every public read/write method returns a result dict with at least a
    'status' key ('success' or 'error'); most also carry 'message', and
    optionally 'value' / 'data' / 'error'.
    """

    # Each text field of the DUT info block is 36 bytes, i.e. 18 registers.
    _FIELD_BYTES = 36
    _FIELD_REGISTERS = 18
    # Text fields of the DUT info block, in register order after the flag word.
    _DUT_FIELDS = ('project', 'phase', 'projectType', 'weeks', 'workOrder')
    # Value of the first register marking the DUT info block as valid.
    _DUT_VALID_FLAG = 0x0001

    def __init__(self, plc_type, comm_port, comm_config=None, drop_register=None, plc_address=0x01,
                 tcp_modbus_unit=0x01, machine_SN='DTM_STA_00001'):
        """
        :param plc_type: PLC family; 'xinjie' selects the xinjie address mapping,
            anything else is treated as the default ("mhi") mapping
        :param comm_port: communication port identifier (stored, used in log output)
        :param comm_config: communication settings dict (stored only)
        :param drop_register: mapping station_no -> {logical name: PLC register name}
        :param plc_address: accepted for compatibility; see NOTE below
        :param tcp_modbus_unit: Modbus unit id sent with every request
        :param machine_SN: machine serial number, used in log output
        """
        self.connect_thread = None
        # None defaults avoid sharing one dict between instances.
        self.drop_register = {} if drop_register is None else drop_register
        self.tcp_modbus_unit = tcp_modbus_unit
        self.DTM_SN = machine_SN
        # NOTE(review): the original code assigned plc_address and then
        # immediately overwrote it with tcp_modbus_unit, so the plc_address
        # argument has never been used on the wire. That behavior is kept
        # because changing the unit id would break existing deployments --
        # confirm whether plc_address is meant to be used.
        self.plc_address = tcp_modbus_unit
        self.plc_comm_lock = threading.Lock()
        self.client = None
        self.connection = None
        self.preConnection = None
        self.plc_type = plc_type
        self.comm = comm_port
        self.comm_config = {} if comm_config is None else comm_config
        self.exit_connect_thread = False
        # Original author's note: test results are read from the PLC with an
        # A/B double buffer -- while the host reads A the PLC writes B, and
        # vice versa.
        self.nextTestResultBufferFlag = 0

    def conv_register_address(self, s):
        """Convert a PLC register name (e.g. 'D0160') to a Modbus address.

        :param s: register name: 'M', 'D' or 'HD' followed by 1-6 digits
        :return: tuple (modbus_type, address) where modbus_type is 'Coil' for M,
            'Holding_Register' for HD and 'Data_Register' for D
        :raises ValueError: if *s* does not match the expected format
        """
        matched = _REGISTER_NAME_PATTERN.match(s)
        if not matched:
            raise ValueError(
                "Invalid format. The string must start with 'M', 'D', or 'HD' followed by 1 to 6 digits.")
        address_type = matched.group(1)
        address_value = int(matched.group(2))

        if address_type == 'M':
            return "Coil", address_value
        if address_type == 'HD':
            # On xinjie PLCs the HD area is offset by 41088 in Modbus space;
            # other PLC types use the number unchanged.
            if self.plc_type == 'xinjie':
                return "Holding_Register", address_value + 41088
            return "Holding_Register", address_value
        # 'D': the number is used unchanged for every PLC type.
        return "Data_Register", address_value

    def conv_drop_height(self, height):
        """Convert a drop height to its register value (height * 100).

        The result is rounded rather than truncated so binary float error
        cannot lose a unit (int(100 * 1.15) is 114, round gives 115). The
        conversion is the same for every PLC type.

        :param height: drop height as stored in the database (float)
        :return: integer register value
        """
        return int(round(100 * height))

    def is_connected_to_tcp_server(self):
        """Return True if the last connect attempt to the Modbus server succeeded."""
        return bool(self.connection)

    def connect_to_modbus_server(self, host='127.0.0.1', port=5020):
        """Connect to the Modbus TCP server, creating the client on first use.

        If a previous attempt failed, the existing client is asked to connect
        again, so the monitor thread really retries. *host* / *port* only
        take effect when the client is first created.
        """
        if self.client is not None and self.connection:
            # Already connected: nothing to do.
            print_with_timestamp(f"Modbus client--plc {self.comm} {self.plc_address} connectted state is OK",
                                 color='blue')
            return

        if self.client is None:
            self.client = ModbusTcpClient(host, port=port)
        self.connection = self.client.connect()
        if self.connection:
            print_with_timestamp(
                f"Modbus tcp client-- {self.tcp_modbus_unit}-{self.plc_address}-{self.comm} connected modbus tcp server successfully",
                color='green')
        else:
            print_with_timestamp(
                f"Modbus tcp client-- {self.tcp_modbus_unit}-{self.plc_address}--{self.comm} connect to modbus tcp server error",
                color='red')

    def connect_monitor(self):
        """Thread body: retry the connection every 5 seconds until asked to exit."""
        while not self.exit_connect_thread:
            if not self.is_connected_to_tcp_server():
                self.connect_to_modbus_server()
            # Wait 5 seconds before the next check.
            time.sleep(5)

    def start_connect_thread(self):
        """Start the background connection monitor (no-op if already running)."""
        if self.connect_thread is not None and self.connect_thread.is_alive():
            return
        # Reset the flag so the monitor can be restarted after a stop.
        self.exit_connect_thread = False
        self.connect_thread = threading.Thread(target=self.connect_monitor)
        self.connect_thread.start()

    def stop_connect_thread(self):
        """Ask the monitor thread to exit and wait for it to finish."""
        self.exit_connect_thread = True
        if self.connect_thread is not None:
            self.connect_thread.join()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _write_station_register(self, station_no, register_key, value, label):
        """Write one holding register of a station and build the result dict."""
        value_return = {'status': 'error', 'message': f"set station{station_no} {label} error"}
        try:
            _, address_value = self.conv_register_address(self.drop_register[station_no][register_key])
            resp = self.client.write_register(address_value, value, self.plc_address)
            if not resp.isError():
                value_return = {'status': 'success', 'message': f"set station{station_no} {label} success"}
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    def _write_coil_pair(self, station_no, release_key, assert_key, action):
        """Clear the *release_key* coil, then set the *assert_key* coil."""
        value_return = {'status': 'error', 'message': f"{action} station{station_no} error"}
        try:
            _, address_value = self.conv_register_address(self.drop_register[station_no][release_key])
            resp1 = self.client.write_coils(address_value, [0x00], self.plc_address)
            _, address_value = self.conv_register_address(self.drop_register[station_no][assert_key])
            resp2 = self.client.write_coils(address_value, [0x01], self.plc_address)
            if not resp1.isError() and not resp2.isError():
                value_return = {'status': 'success', 'message': f"{action} station{station_no} success"}
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    def _read_coil_status(self, station_no, register_key, value_return):
        """Read one coil of a station; *value_return* is the result used on failure."""
        try:
            _, address_value = self.conv_register_address(self.drop_register[station_no][register_key])
            resp = self.client.read_coils(address_value, 0x01, self.plc_address)
            if not resp.isError():
                value_return = {'status': 'success', 'message': f" station{station_no} status success",
                                'value': resp.bits[0]}
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    # ------------------------------------------------------------------
    # station parameter writes
    # ------------------------------------------------------------------
    def set_station_dropheight(self, station_no, height):
        """Write the drop height of a station.

        :param height: height as stored in the database (float; the original
            comment says the unit is mm), converted with conv_drop_height
        """
        value_return = {'status': 'error', 'message': f"set station{station_no} dropheight error"}
        height_register_value = self.conv_drop_height(height)
        try:
            address_type, address_value = self.conv_register_address(self.drop_register[station_no]['height'])
            print_with_timestamp(f'ModbusDtMachine set {self.DTM_SN} station {station_no} dropheight {height} mm',
                                 address_type, address_value, height_register_value)
            resp = self.client.write_registers(address_value, [height_register_value], self.plc_address)
            if not resp.isError():
                value_return = {'status': 'success', 'message': f"set station {station_no} dropheight success"}
            else:
                print_with_timestamp(f"ModbusDtMachine set station {station_no} dropheight error")
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    def set_station_cycles(self, station_no, cycles):
        """Write the number of test cycles of a station."""
        address_type, address_value = self.conv_register_address(self.drop_register[station_no]['cycles'])
        print_with_timestamp(f"ModbusDtMachine set {self.DTM_SN} station {station_no} cycles {cycles} ",
                             address_type, address_value, cycles)
        return self._write_station_register(station_no, 'cycles', cycles, 'cycles')

    def set_station_finished(self, station_no, finished):
        """Write the finished-cycles counter of a station."""
        return self._write_station_register(station_no, 'cyclesFinished', finished, 'finished')

    def set_station_dropdirection(self, station_no, direction):
        """Write the drop direction of a station.

        Returns a success result without writing when no 'direction' register
        is configured or *direction* is None.
        """
        direction_register = self.drop_register[station_no].get('direction')
        print_with_timestamp(f"ModbusDtMachine set_station_dropdirection direction register {direction_register} ")
        if direction_register is None or direction is None or direction_register == "":
            return {'status': 'success', 'message': "drop direction set is not demand"}
        return self._write_station_register(station_no, 'direction', direction, 'dropdirection')

    def set_station_dutInfo(self, station_no, dutInfo):
        """Write the DUT (device under test) information block of a station.

        Register layout starting at the 'dutBasic' register: one valid-flag
        word, then project, phase, projectType, weeks and workOrder (36 bytes
        each), then one 36-byte SN per DUT.

        :param dutInfo: a dict for a single DUT, or a non-empty list of dicts
            when one station holds several samples; the shared fields are taken
            from the first entry and each entry contributes its own 'SN'
        :return: result dict; success without writing when no 'dutBasic'
            register is configured or *dutInfo* is None; error when *dutInfo*
            is neither a dict nor a non-empty list
        """
        value_return = {'status': 'error', 'message': f"set station{station_no} dutInfo error"}
        dutBasic_register = self.drop_register[station_no].get('dutBasic')
        if dutBasic_register is None or dutInfo is None or dutBasic_register == "":
            return {'status': 'success', 'message': "dut info set is not demand"}

        try:
            _, address_value = self.conv_register_address(dutBasic_register)

            if isinstance(dutInfo, list):
                if not dutInfo:
                    # An empty list carries no DUT at all (previously an
                    # uncaught IndexError).
                    value_return['message'] = f"set station{station_no} dutInfo error: empty dut list"
                    return value_return
                duts = dutInfo
            elif isinstance(dutInfo, dict):
                duts = [dutInfo]
            else:
                # Unsupported dutInfo type.
                return value_return

            dut0 = duts[0]
            header_texts = (
                dut0.get("project", ""),        # project code
                dut0.get("phase", ""),          # project phase
                dut0.get("projectType", ""),    # project scheme
                str(dut0.get("weeks", 0)),      # test week
                dut0.get("workOrder", ""),      # work order
            )
            registers_value = [self._DUT_VALID_FLAG]
            for text in header_texts:
                registers_value.extend(utils.string_to_registers(text, self._FIELD_BYTES))

            # One SN per DUT (a station may hold several samples at once).
            registers_value_bottom = []
            for dut in duts:
                registers_value_bottom.extend(utils.string_to_registers(dut.get('SN', " "), self._FIELD_BYTES))

            # Written in two requests: a single write cannot carry too many registers.
            resp1 = self.client.write_registers(address_value, registers_value, self.plc_address)
            resp2 = self.client.write_registers(address_value + len(registers_value), registers_value_bottom,
                                                self.plc_address)
            if not resp1.isError() and not resp2.isError():
                value_return = {'status': 'success', 'message': f"set station{station_no} dutInfo success"}
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    # ------------------------------------------------------------------
    # station reads
    # ------------------------------------------------------------------
    def read_station_dutInfo(self, station_no, sn_count=1):
        """Read the DUT information block of a station and decode it to text.

        :param station_no: station number
        :param sn_count: number of SN slots to read (default 1)
        :return: result dict; on success 'data' is a dict when sn_count == 1,
            otherwise a list with one dict per non-empty SN
        """
        value_return = {'status': 'error', 'message': f"get station{station_no} dutInfo error"}
        try:
            dutBasic_register = self.drop_register[station_no].get('dutBasic')
            if dutBasic_register is None or dutBasic_register == "":
                return {'status': 'success', 'message': "dut info get is not demand"}

            _, address_value = self.conv_register_address(dutBasic_register)
            field_size = self._FIELD_REGISTERS
            # Flag word + 5 text fields of 18 registers each = 91 registers.
            fixed_registers_count = 1 + len(self._DUT_FIELDS) * field_size

            # First read: the fixed part.
            response_fixed = self.client.read_holding_registers(
                address_value, fixed_registers_count, self.plc_address)
            if response_fixed.isError():
                value_return['message'] = f"Failed to read fixed registers: {response_fixed}"
                return value_return
            fixed_registers = response_fixed.registers

            identifier = fixed_registers[0]
            if identifier != self._DUT_VALID_FLAG:
                value_return['message'] = f"Invalid identifier: {identifier}"
                return value_return

            dut_info = {}
            for position, field_name in enumerate(self._DUT_FIELDS):
                start = 1 + position * field_size  # skip the flag word
                text = utils.registers_to_string(fixed_registers[start:start + field_size]).strip()
                if field_name == 'weeks':
                    dut_info[field_name] = int(text) if text.isdigit() else 0
                else:
                    dut_info[field_name] = text

            # Second read: the SN slots that follow the fixed part.
            sn_list = []
            if sn_count > 0:
                response_sn = self.client.read_holding_registers(
                    address_value + fixed_registers_count, sn_count * field_size, self.plc_address)
                if response_sn.isError():
                    value_return['message'] = f"Failed to read SN registers: {response_sn}"
                    return value_return
                sn_registers = response_sn.registers

                for slot in range(sn_count):
                    start_idx = slot * field_size
                    end_idx = start_idx + field_size
                    if end_idx > len(sn_registers):
                        continue
                    sn_part = sn_registers[start_idx:end_idx]
                    if all(reg == 0 for reg in sn_part):
                        continue  # all-zero slot: nothing stored
                    sn = get_visible_string(utils.registers_to_string(sn_part).strip())
                    if sn:
                        sn_list.append(sn)

            if sn_count == 1:
                dut_info['SN'] = sn_list[0] if sn_list else ""
                data = dut_info
            else:
                # One full copy of the shared fields per SN.
                data = []
                for sn in sn_list:
                    dut_copy = dut_info.copy()
                    dut_copy['SN'] = sn
                    data.append(dut_copy)
            value_return = {
                'status': 'success',
                'message': f"read station{station_no} dutInfo success",
                'data': data
            }
        except Exception as e:
            value_return['error'] = str(e)
            print_with_timestamp(f"Error in read_station_dutInfo: {e}", color='red')
        return value_return

    def read_station_dropheight(self, station_no):
        """Read the raw drop-height register of a station ('value' is the register value)."""
        value_return = {'status': 'error', 'message': f"read station{station_no} dropheight error"}
        try:
            _, address_value = self.conv_register_address(self.drop_register[station_no]['height'])
            resp = self.client.read_holding_registers(address_value, 0x1, self.plc_address)
            if not resp.isError() and len(resp.registers) >= 1:
                value_return = {'status': 'success', 'message': f"read station{station_no} height success",
                                'value': resp.registers[0]}
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    def read_station_result_counter(self, station_no):
        """Read the 32-bit result counter of a station (low word first).

        Returns the error result when no 'stationResultCounter' register is configured.
        """
        value_return = {'status': 'error', 'message': f"read station{station_no} result counter error"}
        try:
            counter_register = self.drop_register[station_no].get('stationResultCounter')
            if counter_register:
                _, address_value = self.conv_register_address(counter_register)
                resp = self.client.read_holding_registers(address_value, 0x2, self.plc_address)
                if not resp.isError() and len(resp.registers) >= 2:
                    value_return = {'status': 'success',
                                    'message': f"read station{station_no} result counter success",
                                    'value': (resp.registers[0] | resp.registers[1] * 0x10000)}
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    def read_station_cyclesFinished(self, station_no):
        """Read the finished-cycles register of a station."""
        value_return = {'status': 'error', 'message': f"read station{station_no} cyclesFinished error"}
        try:
            _, address_value = self.conv_register_address(self.drop_register[station_no]['cyclesFinished'])
            resp = self.client.read_holding_registers(address_value, 0x1, self.plc_address)
            if not resp.isError() and len(resp.registers) >= 1:
                value_return = {'status': 'success', 'message': f"read station{station_no} cyclesFinished success",
                                'value': resp.registers[0]}
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    def read_station_cyclesFinishedBulk(self, station_no, result_index=0, length=33):
        """Read *length* registers of one result record of a station.

        The record starts at the 'cyclesFinished' register plus
        PLCTestReqResultData.required_registers_size * result_index.

        :return: result dict; on success 'value' is the register list and
            'address' the Modbus address that was read
        """
        value_return = {'status': 'error', 'message': f"read station{station_no} cyclesFinishedBulk error"}
        try:
            _, address_value = self.conv_register_address(self.drop_register[station_no]['cyclesFinished'])
            address_value = address_value + PLCTestReqResultData.required_registers_size * result_index
            resp = self.client.read_holding_registers(address_value, length, self.plc_address)
            if not resp.isError() and len(resp.registers) >= length:
                value_return = {'status': 'success',
                                'message': f"read station{station_no} cyclesFinishedBulk success",
                                'value': resp.registers, 'address': address_value}
            # NOTE: an earlier revision wrote the 'resultSwitch' register here
            # to flip the A/B result buffer after reading index 2 or 6; that
            # write was already disabled in the original code.
        except ModbusException as error:
            value_return['error'] = error
            print_with_timestamp(f"read_station_cyclesFinishedBulk error {error}", color='red')
        return value_return

    def read_station_dropParamsBulk(self, station_no, length=33):
        """Read *length* registers starting at the station's 'direction' register.

        Returns a success result without reading when no 'direction' register
        is configured.
        """
        value_return = {'status': 'error', 'message': f"read station{station_no} dropParamsBulk error"}
        direction_register = self.drop_register[station_no].get('direction')
        if direction_register is None or direction_register == "":
            return {'status': 'success', 'message': "drop params read is not demand"}
        try:
            _, address_value = self.conv_register_address(direction_register)
            resp = self.client.read_holding_registers(address_value, length, self.plc_address)
            if not resp.isError() and len(resp.registers) >= length:
                value_return = {'status': 'success', 'message': f"read station{station_no} dropParamsBulk success",
                                'value': resp.registers}
        except ModbusException as error:
            value_return['error'] = error
        return value_return

    # ------------------------------------------------------------------
    # station control / status coils
    # ------------------------------------------------------------------
    def start_station(self, station_no):
        """Clear the 'stop' coil, then set the 'start' coil of a station."""
        return self._write_coil_pair(station_no, 'stop', 'start', 'start')

    def resume_station(self, station_no):
        """Resume a station: same coil sequence as start_station."""
        return self._write_coil_pair(station_no, 'stop', 'start', 'resume')

    def stop_station(self, station_no):
        """Clear the 'start' coil, then set the 'stop' coil of a station."""
        return self._write_coil_pair(station_no, 'start', 'stop', 'stop')

    def station_start_status(self, station_no):
        """Read the 'start' coil of a station."""
        return self._read_coil_status(station_no, 'start', {'status': 'error'})

    def station_pause_status(self, station_no):
        """Read the 'pause' coil of a station.

        Original author's note: whoever pauses the machine (host or HMI) must
        set this coil. Returns an error result when no 'pause' register is
        configured.
        """
        if self.drop_register[station_no].get('pause') is None:
            return {'status': 'error', 'message': "register is not exist"}
        return self._read_coil_status(station_no, 'pause', {'status': 'error'})

    def station_stop_status(self, station_no):
        """Read the 'stop' coil of a station."""
        return self._read_coil_status(station_no, 'stop', {'status': 'error'})

    def station_run_status(self, station_no):
        """Read the 'runStatus' coil of a station.

        Returns an error result with 'value' 0 when no 'runStatus' register is
        configured.
        """
        value_return = {'status': 'error', 'value': 0x00}
        if 'runStatus' not in self.drop_register[station_no]:
            return value_return
        return self._read_coil_status(station_no, 'runStatus', value_return)

    # ------------------------------------------------------------------
    # PLC-level reads
    # ------------------------------------------------------------------
    def read_plc_connect_state(self):
        """Read the 9-register PLC state block starting at PLC_CONNECT_REGISTER.

        Original author's note: the first register is the PLC connection state
        and the running states are read together with it.

        :return: result dict; on success 'value' is the list of 9 registers
        """
        value_return = {'status': 'error', 'message': "get plc connect state error--0", 'value': 0x00}
        if not self.is_connected_to_tcp_server():
            return {'status': 'error',
                    'message': "get plc connect state error modbus tcp client is not connectted"}
        try:
            _, address_value = self.conv_register_address(PLC_CONNECT_REGISTER)
            resp = self.client.read_holding_registers(address_value, 0x9, self.plc_address)
            if not resp.isError() and len(resp.registers) >= 9:
                value_return = {'status': 'success', 'message': "get plc connect state success",
                                'value': resp.registers}
        except ModbusException as error:
            value_return['error'] = error
        except Exception as error:
            # Non-Modbus failures are reported under a separate key.
            value_return['error1'] = error
        return value_return

    def read_plc_read_index(self):
        """Read the 4-register block starting at PLC_READ_INDEX_REGISTER.

        :return: result dict; on success 'value' is the list of 4 registers
        """
        value_return = {'status': 'error', 'message': "get plc read index error--0", 'value': 0x00}
        if not self.is_connected_to_tcp_server():
            return {'status': 'error',
                    'message': "get plc read index error modbus tcp client is not connectted"}
        try:
            _, address_value = self.conv_register_address(PLC_READ_INDEX_REGISTER)
            resp = self.client.read_holding_registers(address_value, 0x4, self.plc_address)
            if not resp.isError() and len(resp.registers) >= 4:
                value_return = {'status': 'success', 'message': "get plc read index success",
                                'value': resp.registers}
        except ModbusException as error:
            value_return['error'] = error
        except Exception as error:
            # Non-Modbus failures are reported under a separate key.
            value_return['error1'] = error
        return value_return

    def register_test(self, action, address, value):
        """Read or write a single register/coil by PLC register name (diagnostics).

        :param action: 'read' or 'write' (case-insensitive)
        :param address: PLC register name, e.g. 'D100', 'HD20', 'M5'
        :param value: value to write; converted with int() for registers and
            passed unchanged for coils; ignored on read
        :return: result dict; 'value' holds the data read; an unsupported
            action yields an error result
        :raises ValueError: if *address* is not a valid register name
        """
        modbus_type, address_value = self.conv_register_address(address)
        operation = action.lower()
        is_register = modbus_type in ('Data_Register', 'Holding_Register')
        is_coil = modbus_type == 'Coil'
        # Initialised up front so every path returns a well-formed result.
        value_return = {'status': 'error', 'message': f"register test {action}:{address} error"}

        try:
            if is_register and operation == 'read':
                resp = self.client.read_holding_registers(address_value, 0x1, self.plc_address)
                if not resp.isError() and len(resp.registers) >= 1:
                    value_return = {'status': 'success', 'message': f"register test {action}:{address} success",
                                    'value': resp.registers[0]}
            elif is_register and operation == 'write':
                print_with_timestamp('ModbusDtMachine register_test', action, address, value, address_value)
                resp = self.client.write_register(address_value, int(value), self.plc_address)
                if not resp.isError():
                    value_return = {'status': 'success', 'message': f"register test {action}:{address} success"}
            elif is_coil and operation == 'read':
                resp = self.client.read_coils(address_value, 0x01, self.plc_address)
                if not resp.isError():
                    value_return = {'status': 'success', 'message': f" register test {action}:{address} success",
                                    'value': resp.bits[0]}
            elif is_coil and operation == 'write':
                resp = self.client.write_coils(address_value, [value], self.plc_address)
                if not resp.isError():
                    value_return = {'status': 'success', 'message': f"register test {action}:{address} success"}
            else:
                value_return['message'] = f"register test {action}:{address} error: unsupported action"
        except Exception as error:
            # Diagnostic entry point: report every failure in the result
            # instead of raising (ModbusIOException included).
            print_with_timestamp('ModbusDtMachine register_test', action, address, value, error)
            value_return['error'] = error
        return value_return