Files
dtm-py-all/modbus_plc.py

772 lines
36 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import array
import time
import datetime
import utils
from utils import print_with_timestamp
import re
from pymodbus.client import ModbusSerialClient as ModbusSerialClient
from pymodbus.exceptions import ModbusException, ConnectionException
import threading
from utils import PLCDropParamsData
# The real PLC is used by default. Per the original note, mock mode is
# switched on with the --mock command line flag (handled outside this file).
MOCK_MODE = False

# Drop parameters, one entry per drop direction: (code, cycles, height).
PLC_DROP_PARAMS = [
    {'dropCode': code, 'dropCycles': cycles, 'dropHeight': height}
    for code, cycles, height in (
        (1, 10, 7500),
        (2, 10, 7600),
        (3, 10, 7500),
        (4, 8, 7600),
        (5, 7, 7500),
        (6, 16, 7600),
    )
]

# Base register of every per-station area, one dict per station.
# NOTE(review): the original comment says this was extended to four channels,
# but only three stations are defined here - confirm.
STATION_REG_BASE = [
    {
        'drop_params': 'D3020',
        'dut_info': 'HD0300',
        'station_status': 'D11',
        'station_cycles': 'HD0164',
        'station_read_index': 'D15',
        'station_drop_result': 'HD0510',
    },
    {
        'drop_params': 'D4020',
        'dut_info': 'HD2300',
        'station_status': 'D12',
        'station_cycles': 'HD0264',
        'station_read_index': 'D16',
        'station_drop_result': 'HD1110',
    },
    {
        'drop_params': 'D5020',
        'dut_info': 'HD4300',
        'station_status': 'D13',
        'station_cycles': 'HD0364',
        'station_read_index': 'D17',
        'station_drop_result': 'HD1710',
    },
]
class ModbusPLC_Mock:
    """In-memory stand-in for the serial Modbus PLC.

    Registers are kept in a dict keyed by Modbus address. Writing ``1`` to a
    station's ``dut_info`` register starts a daemon thread that walks through
    ``PLC_DROP_PARAMS`` and updates that station's status, cycle counter,
    read index and result registers, so the test flow can be exercised
    without hardware. A second daemon thread refreshes the cached
    connect-state registers.
    """

    # Registers fetched by every connect-state poll.
    _CONNECT_STATE_REG_COUNT = 9
    # Offset added to an HD register number to obtain its Modbus address.
    _HD_ADDRESS_OFFSET = 41088
    # PLC address syntax: prefix M, D or HD followed by 1 to 6 digits.
    _ADDRESS_PATTERN = re.compile(r'^([MD]|HD)\d{1,6}$')
    # Simulation timing, counted in short ticks so a stop request is noticed
    # within one tick.
    _TICK_SECONDS = 0.1
    _DROP_TICKS = 15   # 1.5 s per simulated drop
    _PAUSE_TICKS = 30  # 3 s pause after each direction
    # Result area layout: a ring of 10 groups of 44 registers per station.
    _RESULT_GROUP_COUNT = 10
    _RESULT_GROUP_SIZE = 44
    # Registers reserved for the machine serial number inside a result group.
    _MACHINE_SN_REGISTERS = 18

    def __init__(self, plc_port, comm_config):
        """Create the mock and start the connect-state polling thread.

        Args:
            plc_port: Port name; only used in log messages.
            comm_config: Accepted for signature parity with the real class;
                not used by the mock.
        """
        self.plc_port = plc_port
        self.mock_mode = True
        # There is no transport in mock mode; the attribute exists so that
        # close() never hits a missing attribute.
        self.client = None
        self._mock_registers = {}
        self._initialize_mock_registers()
        # Same length as the data returned by every poll.
        self._connect_state_cache = [0] * self._CONNECT_STATE_REG_COUNT
        self._connect_state_address = self.conv_register_address('D10')
        self._slave = 1
        self._poll_connect_state_interval = 0.5  # seconds
        self._serial_lock = threading.Lock()
        self._stop_event = threading.Event()
        self._timer_thread = None
        # Per-station simulation thread and its {'stop': bool} control dict.
        self.station_threads = {}
        self.station_thread_controls = {}
        self.plc_com_state = 1  # the mock is always "connected"
        self._start_connect_state_polling()

    def _initialize_mock_registers(self):
        """Fill the register dict with its initial contents."""
        # Seed the drop parameters of every configured station.
        drop_params_registers = list(PLCDropParamsData.to_registers(PLC_DROP_PARAMS))
        for station_reg in STATION_REG_BASE:
            base = self.conv_register_address(station_reg.get('drop_params'))
            for offset, value in enumerate(drop_params_registers):
                self._mock_registers[base + offset] = value

        # D10 is the device connect state (set to 1); D11-D19 start at 0.
        for number in range(10, 20):
            address = self.conv_register_address(f"D{number}")
            self._mock_registers[address] = 0x0001 if number == 10 else 0x0000

        # Zeroed HD areas: drop parameters (HD3020-HD3048), sample info
        # (HD300-HD1851) and test results (HD510-HD2949).
        for first, stop in ((3020, 3049), (300, 463 * 4), (510, 2950)):
            for number in range(first, stop):
                address = self.conv_register_address(f"HD{number}")
                self._mock_registers[address] = 0x0000

    def conv_to_plc_address(self, type_char, value):
        """Return ``value`` as an int.

        The address translation is done elsewhere (the original note says in
        the Modbus client), so ``type_char`` is ignored here.
        """
        return int(value)

    def conv_register_address(self, s):
        """Convert a PLC register name such as ``'D10'`` to a Modbus address.

        ``M`` and ``D`` registers map to their own number; ``HD`` registers
        are shifted by 41088.

        Raises:
            ValueError: If ``s`` is not M/D/HD followed by 1 to 6 digits.
        """
        if not self._ADDRESS_PATTERN.match(s):
            raise ValueError(
                "Invalid format. The string must start with 'M', 'D', or 'HD' followed by 1 to 6 digits.")
        prefix = s[0] if s[1].isdigit() else s[:2]
        number = int(s[len(prefix):])
        if prefix == 'HD':
            return number + self._HD_ADDRESS_OFFSET
        return number

    def _start_connect_state_polling(self):
        """Start the daemon thread that refreshes the connect-state cache."""
        self._timer_thread = threading.Thread(target=self._poll_connect_state, daemon=True)
        self._timer_thread.start()

    def _poll_connect_state(self):
        """Thread body: refresh the connect-state cache until stopped."""
        reg_count = self._CONNECT_STATE_REG_COUNT
        while not self._stop_event.is_set():
            try:
                result = self.read_holding_registers(self._connect_state_address, reg_count, self._slave)
                if result and result.get('status') == 'success' and result.get('data'):
                    self._connect_state_cache = result['data']
                else:
                    self._connect_state_cache = [0] * reg_count
            except Exception as e:
                print_with_timestamp(f"Error polling connect state for {self.plc_port}: {e}", color='red')
                self._connect_state_cache = [0] * reg_count
            # Waiting on the event (instead of sleeping) ends the pause as
            # soon as stop_polling() is called.
            self._stop_event.wait(self._poll_connect_state_interval)

    def get_connect_state_cache(self, connect_state_address, slave):
        """Return the cached connect state without reading registers.

        The given address and slave are remembered and used by the polling
        thread for its subsequent reads.
        """
        self._connect_state_address = connect_state_address
        self._slave = slave
        return self._connect_state_cache

    def read_coils(self, address, count, unit=1):
        """Simulate a coil read: every coil reads as ``False``."""
        int(address)  # same validation as the register reads
        return {"status": "success", "data": [False] * count}

    def write_single_coil(self, address, value, unit=1):
        """Simulate a coil write; the value is only logged."""
        print("modbus plc write_single_coil ", address, value)
        addr = int(address)
        print_with_timestamp(f"Mock write coil {addr} = {value}", color='blue')
        return {"status": "success"}

    def stop_polling(self):
        """Stop the polling thread and every station simulation thread."""
        print_with_timestamp(f"stop_polling called for {self.plc_port}", color='yellow')
        self._stop_event.set()

        for station_idx in list(self.station_thread_controls.keys()):
            self.station_thread_controls[station_idx]['stop'] = True
            print_with_timestamp(f"Setting stop flag for station {station_idx}", color='yellow')

        for station_idx, thread in list(self.station_threads.items()):
            if thread and thread.is_alive():
                print_with_timestamp(f"Waiting for station {station_idx} thread to stop...", color='yellow')
                thread.join(timeout=3)
                if thread.is_alive():
                    print_with_timestamp(f"Station {station_idx} thread did not stop in time!", color='red')
                else:
                    print_with_timestamp(f"Stopped station {station_idx} simulation thread", color='green')

        if self._timer_thread and self._timer_thread.is_alive():
            self._timer_thread.join(timeout=1)
        print_with_timestamp(f"Stopped polling thread for {self.plc_port}", color='green')

    def read_holding_registers(self, address, count, unit=1):
        """Read ``count`` registers starting at Modbus address ``address``.

        Unknown registers read as 0. Once the mock has been stopped an error
        dict is returned instead of data.
        """
        if self._stop_event.is_set():
            # "msg" matches the key used by the real class; "message" is kept
            # for callers written against the earlier mock.
            return {"status": "error", "msg": "PLC stopped", "message": "PLC stopped"}
        start_addr = int(address)
        values = [self._mock_registers.get(start_addr + offset, 0x0000) for offset in range(count)]
        return {"status": "success", "data": values}

    def read_input_registers(self, address, count, unit=1):
        """Read ``count`` registers from the same store as the holding registers."""
        start_addr = int(address)
        values = [self._mock_registers.get(start_addr + offset, 0x0000) for offset in range(count)]
        return {"status": "success", "data": values}

    def write_single_register(self, address, value, unit=1):
        """Store one register; writing 1 to a DUT-info register starts a test."""
        print("modbus plc write_single_register-", address, value, unit)
        addr = int(address)
        self._mock_registers[addr] = value
        self._check_and_start_test_on_dut_write(addr, value)
        print_with_timestamp(f"Mock write register {addr} = {value}", color='blue')
        return {"status": "success"}

    def write_registers(self, address, values, unit=1):
        """Store consecutive registers starting at ``address``."""
        start_addr = int(address)
        # Store the whole block first so a simulation never starts while part
        # of the block is still missing.
        for offset, value in enumerate(values):
            self._mock_registers[start_addr + offset] = value
        for offset, value in enumerate(values):
            self._check_and_start_test_on_dut_write(start_addr + offset, value)
        print_with_timestamp(f"Mock write registers {start_addr} = {values}", color='blue')
        return {"status": "success"}

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read words; returns ``{'status', 'length', 'data'}`` or ``{'status': 'error'}``."""
        result = self.read_holding_registers(address, length, plc_no)
        if result and result.get('status') == 'success' and 'data' in result:
            return {'status': 'success', 'length': len(result['data']), 'data': result['data']}
        return {'status': 'error'}

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read coils; returns ``{'status', 'length', 'data'}`` or ``{'status': 'error'}``."""
        result = self.read_coils(address, length, plc_no)
        utils.condition_print('plc_read_bits result', result)
        if result.get('status') == 'success' and 'data' in result:
            bits = result['data']
            return {'status': 'success', 'length': len(bits), 'data': bits}
        return {'status': 'error'}

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one register."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write ``data`` with a single-register write.

        NOTE(review): despite the name, the value is not split into two
        words - confirm what callers expect for values above 0xFFFF.
        """
        return self.write_single_register(address, data, plc_no)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write the words in ``data``; ``length`` is not used."""
        return self.write_registers(start_addr, [] if data is None else data, plc_no)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x01):
        """Write one coil."""
        return self.write_single_coil(address, data, plc_no)

    def _check_and_start_test_on_dut_write(self, addr, value):
        """Start a station's simulated test when 1 is written to its DUT-info register."""
        if value != 1:
            return
        for station_idx, station_reg in enumerate(STATION_REG_BASE):
            dut_info_addr = self.conv_register_address(station_reg.get('dut_info', f'HD{station_idx}000'))
            if addr != dut_info_addr:
                continue
            station_status_addr = self.conv_register_address(
                station_reg.get('station_status', f'D{11+station_idx}'))
            self._mock_registers[station_status_addr] = 1  # status: testing
            self._start_station_simulation_thread(station_idx)
            break

    def _start_station_simulation_thread(self, station_idx):
        """(Re)start the simulation thread of one station."""
        previous = self.station_threads.get(station_idx)
        if previous is not None and previous.is_alive():
            previous_control = self.station_thread_controls.get(station_idx)
            if previous_control is not None:
                previous_control['stop'] = True
            previous.join(timeout=2)

        # Each thread gets its own control dict, so stopping an old thread
        # never affects the new one.
        control = {'stop': False}
        self.station_thread_controls[station_idx] = control
        thread = threading.Thread(target=self._simulate_station_test, args=(station_idx, control), daemon=True)
        self.station_threads[station_idx] = thread
        thread.start()
        print_with_timestamp(f"Started simulation thread for station {station_idx}", color='green')

    def _stop_requested(self, control):
        """Return True when this thread or the whole mock was asked to stop."""
        return control['stop'] or self._stop_event.is_set()

    def _wait_ticks(self, control, ticks):
        """Sleep ``ticks`` ticks; return False as soon as a stop is requested."""
        for _ in range(ticks):
            if self._stop_requested(control):
                return False
            time.sleep(self._TICK_SECONDS)
        return True

    @staticmethod
    def _split_dword(value):
        """Split a 32-bit value into ``(low_word, high_word)``."""
        return value & 0xFFFF, (value >> 16) & 0xFFFF

    def _simulate_station_test(self, station_idx, control):
        """Thread body: simulate every drop of every direction for one station.

        Status register values written: 1 while dropping, 2 while pausing
        after a direction, 3 once all directions are done.
        """
        station_reg = STATION_REG_BASE[station_idx]
        status_addr = self.conv_register_address(station_reg.get('station_status'))
        cycles_addr = self.conv_register_address(station_reg.get('station_cycles'))
        read_index_addr = self.conv_register_address(station_reg.get('station_read_index'))
        result_addr = self.conv_register_address(station_reg.get('station_drop_result'))

        print_with_timestamp(f"Starting simulation for station {station_idx+1}", color='blue')
        for direction in PLC_DROP_PARAMS:
            if self._stop_requested(control):
                return
            print_with_timestamp(f"Station {station_idx} starting direction {direction['dropCode']}", color='cyan')
            self._mock_registers[status_addr] = 1

            drops_done = 0
            while drops_done < direction['dropCycles'] and not self._stop_requested(control):
                if not self._wait_ticks(control, self._DROP_TICKS):
                    print_with_timestamp(f"Station {station_idx} 收到停止信号,退出测试", color='red')
                    return
                drops_done += 1

                # The station cycle counter only ever grows.
                total_cycles = self._mock_registers.get(cycles_addr, 0) + 1
                self._mock_registers[cycles_addr] = total_cycles
                # The read index cycles through 1..10.
                read_index = self._mock_registers.get(read_index_addr, 0) % self._RESULT_GROUP_COUNT + 1
                self._mock_registers[read_index_addr] = read_index

                result_data = self._generate_mock_result_data(station_idx, direction, drops_done, total_cycles)
                # The read index selects which result group is overwritten.
                group_base = result_addr + (read_index - 1) * self._RESULT_GROUP_SIZE
                for offset, value in enumerate(result_data):
                    self._mock_registers[group_base + offset] = value
                print_with_timestamp(
                    f"Station {station_idx} direction {direction['dropCode']} cycle {drops_done}/{direction['dropCycles']}, "
                    f"total cycles: {total_cycles}, read_index: {read_index}",
                    color='blue'
                )

            if self._stop_requested(control):
                return
            self._mock_registers[status_addr] = 2
            print_with_timestamp(
                f"Station {station_idx} direction {direction['dropCode']} completed, pausing for 3 seconds...",
                color='yellow'
            )
            if not self._wait_ticks(control, self._PAUSE_TICKS):
                return

        if not self._stop_requested(control):
            self._mock_registers[status_addr] = 3
            print_with_timestamp(f"Station {station_idx} all directions completed!", color='green')

    def _generate_mock_result_data(self, station_idx, direction, direction_cycle_count, total_cycles):
        """Build the 44 registers of one simulated drop result.

        Layout: 0-5 begin time, 6-11 end time (both "now"), 12 direction
        code, 13 station number (1-based), then low-word-first 32-bit pairs
        for item SN (14), drop height (16), drop speed (18), DUT drop cycles
        (20), cycles in this direction (22) and station drop cycles (24),
        followed by the machine serial number from register 26 on.
        """
        now = datetime.datetime.now()
        timestamp = [now.year, now.month, now.day, now.hour, now.minute, now.second]

        registers = [0] * self._RESULT_GROUP_SIZE
        registers[0:6] = timestamp
        registers[6:12] = timestamp
        registers[12] = direction['dropCode']
        registers[13] = station_idx + 1
        registers[14:16] = self._split_dword(total_cycles)
        registers[16:18] = self._split_dword(direction['dropHeight'])
        registers[18:20] = (1500 & 0xFFFF, 0)  # fixed simulated speed
        registers[20:22] = self._split_dword(total_cycles)
        registers[22:24] = self._split_dword(direction_cycle_count)
        registers[24:26] = self._split_dword(total_cycles)

        # At most 18 registers (36 bytes) are copied for the serial number.
        machine_sn = f"MOCK_MACHINE_{station_idx}"
        for offset, reg in enumerate(utils.string_to_registers(machine_sn, 36)):
            if offset >= self._MACHINE_SN_REGISTERS:
                break
            registers[26 + offset] = reg
        return registers

    def close(self):
        """Stop all background threads and close the client, if there is one."""
        # stop_polling() also ends the connect-state thread, which would
        # otherwise keep running after close().
        self.stop_polling()
        client = getattr(self, 'client', None)
        if not self.mock_mode and client is not None:
            client.close()
class ModbusPLC_Real:
    """Serial Modbus RTU access to the PLC through pymodbus.

    Every request is serialized with ``_serial_lock``. A daemon thread keeps
    ``_connect_state_cache`` up to date and retries opening the port while
    ``plc_com_state`` is 0. Request methods return
    ``{"status": "success", ...}`` or ``{"status": "error", "msg": ...}``.
    """

    # Registers fetched by every connect-state poll.
    _CONNECT_STATE_REG_COUNT = 9

    def __init__(self, plc_port, comm_config):
        """Open the port and start the connect-state polling thread.

        Args:
            plc_port: Serial port name passed to pymodbus.
            comm_config: Mapping read for the ``baudrate`` and ``parity`` keys.
        """
        self.plc_port = plc_port
        self.plc_com_state = 0  # 1 once the serial port could be opened
        self.client = None
        # Same length as the data returned by every poll.
        self._connect_state_cache = [0] * self._CONNECT_STATE_REG_COUNT
        self._connect_state_address = 10
        self._slave = 1
        self._poll_connect_state_interval = 0.5  # seconds
        self._serial_lock = threading.Lock()  # serializes access to the port
        self._stop_event = threading.Event()
        self._timer_thread = None
        self._initialize_client(comm_config)
        self._start_connect_state_polling()

    def _initialize_client(self, comm_config):
        """Create the pymodbus client and try to open the port.

        Failures are logged, not raised; ``plc_com_state`` then stays 0 and
        the polling thread keeps retrying.
        """
        baudrate = comm_config.get('baudrate')
        parity = comm_config.get('parity')
        try:
            # NOTE(review): the ``method`` keyword is not accepted by every
            # pymodbus release - confirm against the pinned version.
            self.client = ModbusSerialClient(method="rtu", port=self.plc_port,
                                             baudrate=baudrate,
                                             parity=parity, timeout=1)
            self.client.connect()
            if self.port_is_opened():
                self.plc_com_state = 1
                print_with_timestamp(f"Modbus PLC open {self.client} {self.plc_port} {baudrate} "
                                     f"{parity} success", color='green')
        except Exception as e:
            # Include the cause so a failed open can be diagnosed from the log.
            print_with_timestamp(f"Modbus PLC open {self.plc_port} "
                                 f"{baudrate} {parity} error: {e}", color='red')

    def conv_to_plc_address(self, type_char, value):
        """Return ``value`` as an int.

        The address translation is done elsewhere (the original note says in
        the Modbus client), so ``type_char`` is ignored here.
        """
        return int(value)

    def connect(self):
        """Open the connection; return True on success, False otherwise."""
        if self.client is None:
            print("Failed to connect to Modbus server in PLC")
            return False
        try:
            if not self.client.connect():
                print("Failed to connect to Modbus server in PLC")
                return False
            print("Modbus serial Client Connected to PLC by", self.plc_port)
            return True
        except ConnectionException as e:
            print(f"Connection failed: {str(e)}")
            return False

    def reconnect(self):
        """Report success without touching the port.

        The original close-then-connect logic sat unreachable behind this
        early return (comment dated 20250908), so it is left out here.
        """
        return True

    def port_is_opened(self):
        """Return True when a client exists and its ``socket`` is set."""
        return getattr(self.client, 'socket', None) is not None

    def _start_connect_state_polling(self):
        """Start the daemon thread that refreshes the connect-state cache."""
        self._timer_thread = threading.Thread(target=self._poll_connect_state, daemon=True)
        self._timer_thread.start()

    def _try_open_port(self):
        """Try to open the port and adapt the polling interval to the result."""
        try:
            self.client.connect()
            opened = self.port_is_opened()
        except Exception:
            opened = False
        self.plc_com_state = 1 if opened else 0
        # Poll every 0.5 s while connected, retry every 5 s while the port is down.
        self._poll_connect_state_interval = 0.5 if opened else 5

    def _refresh_connect_state_cache(self):
        """Read the connect-state registers into the cache (zeros on failure)."""
        reg_count = self._CONNECT_STATE_REG_COUNT
        try:
            result = self.read_holding_registers(self._connect_state_address, reg_count, self._slave)
            if result and result.get('status') == 'success' and result.get('data'):
                self._connect_state_cache = result['data']
            else:
                self._connect_state_cache = [0] * reg_count
        except Exception as e:
            print_with_timestamp(f"Error polling connect state for {self.plc_port}: {e}", color='red')
            self._connect_state_cache = [0] * reg_count

    def _poll_connect_state(self):
        """Thread body: reopen the port or refresh the cache until stopped."""
        while not self._stop_event.is_set():
            if self.plc_com_state == 0:
                self._try_open_port()
            else:
                self._refresh_connect_state_cache()
            # Waiting on the event (instead of sleeping) lets stop_polling()
            # end even the 5 s retry pause immediately.
            self._stop_event.wait(self._poll_connect_state_interval)

    def get_connect_state_cache(self, connect_state_address, slave):
        """Return the cached connect state without touching the port.

        The given address and slave are remembered and used by the polling
        thread for its subsequent reads.
        """
        self._connect_state_address = connect_state_address
        self._slave = slave
        return self._connect_state_cache

    def stop_polling(self):
        """Stop the connect-state polling thread."""
        self._stop_event.set()
        if self._timer_thread and self._timer_thread.is_alive():
            self._timer_thread.join(timeout=1)

    def get_connect_state(self, connect_state_address, slave):
        """Read five connect-state registers directly from the PLC.

        Returns the register list on success and five zeros on a failed read.
        NOTE(review): returns the int ``0`` (not a list) when the port is
        down or an exception occurs - kept as-is for existing callers.
        """
        if self.plc_com_state == 0:
            return 0x00
        try:
            result = self.read_holding_registers(connect_state_address, 0x05, slave)
            if result and result.get('status') == 'success' and 'data' in result:
                return result['data']
            return [0x00, 0x00, 0x00, 0x00, 0x00]
        except Exception:
            return 0x00

    def read_coils(self, address, count, unit=1):
        """Read ``count`` coils from slave ``unit``."""
        with self._serial_lock:
            try:
                result = self.client.read_coils(address, count, slave=unit)
            except (ModbusException, ConnectionException) as e:
                # No recursive retry: the lock is not reentrant, so retrying
                # from inside this block would deadlock.
                print(f"Read error: {str(e)}")
                return {"status": "error", "msg": str(e)}
            if result.isError():
                return {"status": "error", "msg": str(result)}
            return {"status": "success", "data": result.bits}

    def write_single_coil(self, address, value, unit=1):
        """Write one coil on slave ``unit``."""
        print("modbus plc write_single_coil ", address, value)
        with self._serial_lock:
            try:
                # Address the requested slave instead of a hard-coded 1.
                result = self.client.write_coil(address, value, slave=unit)
            except (ModbusException, ConnectionException) as e:
                print(f"Write error: {str(e)}")
                return {"status": "error", "msg": str(e)}
            if result.isError():
                print("modbus plc write_single_coil error", str(result))
                return {"status": "error", "msg": str(result)}
            return {"status": "success"}

    def read_holding_registers(self, address, count, unit=1):
        """Read ``count`` holding registers from slave ``unit``."""
        with self._serial_lock:
            if not self.port_is_opened() and not self.reconnect():
                return {"status": "error", "msg": "Unable to connect to open serial port"}
            try:
                result = self.client.read_holding_registers(address, count, slave=unit)
            except (ModbusException, ConnectionException) as e:
                return {"status": "error", "msg": str(e)}
            if result.isError():
                return {"status": "error", "msg": str(result)}
            return {"status": "success", "data": result.registers}

    def read_input_registers(self, address, count, unit=1):
        """Read ``count`` input registers from slave ``unit``."""
        with self._serial_lock:
            if not self.port_is_opened() and not self.reconnect():
                return {"status": "error", "msg": "Unable to open serial port"}
            try:
                result = self.client.read_input_registers(address, count, slave=unit)
            except (ModbusException, ConnectionException) as e:
                print(f"Read error: {str(e)}")
                return {"status": "error", "msg": str(e)}
            if result.isError():
                return {"status": "error", "msg": str(result)}
            return {"status": "success", "data": result.registers}

    def write_single_register(self, address, value, unit=1):
        """Write one holding register on slave ``unit``."""
        print("modbus plc write_single_register-", address, value, unit)
        with self._serial_lock:
            if not self.port_is_opened() and not self.reconnect():
                return {"status": "error", "msg": "Unable to open serial port"}
            try:
                result = self.client.write_register(address, value, slave=unit)
            except (ModbusException, ConnectionException) as e:
                print(f"Write error: {str(e)}")
                return {"status": "error", "msg": str(e)}
            except Exception as e:
                # Any other failure used to be logged and then crash on an
                # unbound ``result``; report it as an error result instead.
                print(f"modbus plc write_single_register- error: {str(e)}")
                return {"status": "error", "msg": str(e)}
            if result.isError():
                print("modbus plc write_single_register- error-2: ", result)
                return {"status": "error", "msg": str(result)}
            return {"status": "success"}

    def write_registers(self, address, values, unit=1):
        """Write consecutive holding registers on slave ``unit``."""
        with self._serial_lock:
            if not self.port_is_opened() and not self.reconnect():
                return {"status": "error", "msg": "Unable to open serial port"}
            try:
                result = self.client.write_registers(address, values, slave=unit)
            except (ModbusException, ConnectionException) as e:
                print(f"Write error: {str(e)}")
                return {"status": "error", "msg": str(e)}
            if result.isError():
                return {"status": "error", "msg": str(result)}
            return {"status": "success"}

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read words; returns ``{'status', 'length', 'data'}`` or ``{'status': 'error'}``."""
        result = self.read_holding_registers(address, length, plc_no)
        if result and result.get('status') == 'success' and 'data' in result:
            return {'status': 'success', 'length': len(result['data']), 'data': result['data']}
        return {'status': 'error'}

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read coils; returns ``{'status', 'length', 'data'}`` or ``{'status': 'error'}``."""
        result = self.read_coils(address, length, plc_no)
        utils.condition_print('plc_read_bits result', result)
        if result.get('status') == 'success' and 'data' in result:
            bits = result['data']
            return {'status': 'success', 'length': len(bits), 'data': bits}
        return {'status': 'error'}

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one register."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write ``data`` with a single-register write.

        NOTE(review): despite the name, the value is not split into two
        words - confirm what callers expect for values above 0xFFFF.
        """
        return self.write_single_register(address, data, plc_no)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write the words in ``data``; ``length`` is not used."""
        return self.write_registers(start_addr, [] if data is None else data, plc_no)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x01):
        """Write one coil."""
        return self.write_single_coil(address, data, plc_no)

    def close(self):
        """Stop the polling thread, then close the serial client."""
        # Stop polling first so the background thread cannot use (and thereby
        # reopen) the port after it has been closed.
        self.stop_polling()
        if self.client is not None:
            # Wait for any request that is still in flight.
            with self._serial_lock:
                self.client.close()
# The class actually used is selected from the current value of MOCK_MODE.
def get_modbus_plc_class():
    """Return the mock PLC class when MOCK_MODE is truthy, else the real one."""
    return ModbusPLC_Mock if MOCK_MODE else ModbusPLC_Real
# Kept for backward compatibility: callers still "construct" ModbusPLC, but
# the concrete class is looked up at call time.
def ModbusPLC(plc_port, comm_config):
    """Create a PLC instance.

    The mock or the real class is chosen from the value MOCK_MODE has at the
    moment of the call, and is constructed with the given arguments.
    """
    return get_modbus_plc_class()(plc_port, comm_config)