# Files
# dtm-py-all/modbus_plc.py

# 772 lines
# 36 KiB
# Python
# Raw Permalink Normal View History

import array
import time
import datetime
import utils
from utils import print_with_timestamp
import re
from pymodbus.client import ModbusSerialClient as ModbusSerialClient
from pymodbus.exceptions import ModbusException, ConnectionException
import threading
from utils import PLCDropParamsData
# The real PLC is used by default; the --mock command-line flag turns on
# simulation mode.
MOCK_MODE = False

# One entry per drop direction: direction code, number of drops and height.
PLC_DROP_PARAMS = [
    {'dropCode': code, 'dropCycles': cycles, 'dropHeight': height}
    for code, cycles, height in (
        (1, 10, 7500),
        (2, 10, 7600),
        (3, 10, 7500),
        (4, 8, 7600),
        (5, 7, 7500),
        (6, 16, 7600),
    )
]

# Base register names for each station (channel).
# NOTE(review): the original comment says "extended to 4 channels", but only
# three station entries are defined here -- confirm whether a fourth is missing.
STATION_REG_BASE = [
    dict(
        drop_params='D3020',
        dut_info='HD0300',
        station_status='D11',
        station_cycles='HD0164',
        station_read_index='D15',
        station_drop_result='HD0510',
    ),
    dict(
        drop_params='D4020',
        dut_info='HD2300',
        station_status='D12',
        station_cycles='HD0264',
        station_read_index='D16',
        station_drop_result='HD1110',
    ),
    dict(
        drop_params='D5020',
        dut_info='HD4300',
        station_status='D13',
        station_cycles='HD0364',
        station_read_index='D17',
        station_drop_result='HD1710',
    ),
]
class ModbusPLC_Mock:
    """In-memory simulation of the drop-test PLC.

    Registers are kept in a plain dict keyed by Modbus address. Writing the
    value 1 to a station's ``dut_info`` register starts a background thread
    that walks through ``PLC_DROP_PARAMS`` and fills that station's status,
    cycle-counter, read-index and result registers.
    """

    # 'M', 'D' or 'HD' prefix followed by one to six digits.
    _ADDRESS_PATTERN = re.compile(r'^([MD]|HD)(\d{1,6})$')
    # Offset added to an HD register number to obtain its Modbus address.
    _HD_OFFSET = 41088
    # Number of registers fetched by every connect-state poll.
    _CONNECT_STATE_REG_COUNT = 9
    # Size of one result record and number of record slots per station.
    _RESULT_RECORD_SIZE = 44
    _RESULT_SLOTS = 10
    # Length of the between-direction pause, in 0.1 s ticks.
    # NOTE(review): the log message and the original comment both say
    # "3 seconds", but the loop has always run 50 ticks (5 s) -- confirm
    # which one is intended before changing either.
    _PAUSE_TICKS = 50

    def __init__(self, plc_port, comm_config):
        """Create the mock and start the connect-state polling thread.

        Args:
            plc_port: Port name; only used in log messages.
            comm_config: Accepted for signature parity with the real class;
                not used by the mock.
        """
        self.plc_port = plc_port
        self.mock_mode = True
        self._mock_registers = {}
        self._initialize_mock_registers()
        # Same length as the data returned by the poll, so callers see a
        # consistently sized list even before the first poll completes.
        self._connect_state_cache = [0] * self._CONNECT_STATE_REG_COUNT
        self._connect_state_address = self.conv_register_address('D10')
        self._slave = 1
        self._poll_connect_state_interval = 0.5  # seconds
        self._serial_lock = threading.Lock()
        self._stop_event = threading.Event()
        self._timer_thread = None
        # Per-station simulation threads and their stop-flag dicts.
        self.station_threads = {}
        self.station_thread_controls = {}
        self.plc_com_state = 1  # the mock is always "connected"
        self._start_connect_state_polling()

    def _initialize_mock_registers(self):
        """Populate the register dict with its initial contents."""
        # Seed the drop parameters of every configured station (the previous
        # code only seeded the first two entries of STATION_REG_BASE).
        drop_param_words = list(PLCDropParamsData.to_registers(PLC_DROP_PARAMS))
        for station_reg in STATION_REG_BASE:
            base_addr = self.conv_register_address(station_reg.get('drop_params'))
            for offset, word in enumerate(drop_param_words):
                self._mock_registers[base_addr + offset] = word

        # D10 is set to 1 (device connected); D11-D19 start at 0.
        for number in range(10, 20):
            modbus_addr = self.conv_register_address(f"D{number}")
            self._mock_registers[modbus_addr] = 0x0001 if number == 10 else 0x0000

        # Zero-fill the HD areas: HD3020-HD3048, the DUT-information area and
        # the test-result area.
        for first, stop in ((3020, 3049), (300, 463 * 4), (510, 2950)):
            for number in range(first, stop):
                self._mock_registers[self.conv_register_address(f"HD{number}")] = 0x0000

    def conv_to_plc_address(self, type_char, value):
        """Return ``value`` as an int; ``type_char`` is ignored.

        The prefix-based conversion is not done here (the original comment
        says it is handled in the Modbus TCP client).
        """
        return int(value)

    def conv_register_address(self, s):
        """Convert a PLC register name such as ``'D10'`` or ``'HD0300'``.

        Args:
            s: Register name: ``M``, ``D`` or ``HD`` followed by 1-6 digits.

        Returns:
            The numeric part for ``M`` and ``D`` names, and the numeric part
            plus 41088 for ``HD`` names.

        Raises:
            ValueError: If ``s`` does not match the expected format.
        """
        matched = self._ADDRESS_PATTERN.match(s)
        if not matched:
            raise ValueError(
                "Invalid format. The string must start with 'M', 'D', or 'HD' followed by one to six digits.")
        prefix, digits = matched.group(1), matched.group(2)
        number = int(digits)
        if prefix == 'HD':
            return number + self._HD_OFFSET
        return number

    def _start_connect_state_polling(self):
        """Start the daemon thread that refreshes the connect-state cache."""
        self._timer_thread = threading.Thread(target=self._poll_connect_state, daemon=True)
        self._timer_thread.start()

    def _poll_connect_state(self):
        """Thread body: refresh the connect-state cache until stopped."""
        count = self._CONNECT_STATE_REG_COUNT
        while not self._stop_event.is_set():
            try:
                result = self.read_holding_registers(self._connect_state_address, count, self._slave)
                if result and result.get('status') == 'success' and result.get('data'):
                    self._connect_state_cache = result['data']
                else:
                    self._connect_state_cache = [0] * count
            except Exception as e:
                print_with_timestamp(f"Error polling connect state for {self.plc_port}: {e}", color='red')
                self._connect_state_cache = [0] * count
            # Event.wait sleeps for the interval but wakes at once on stop.
            self._stop_event.wait(self._poll_connect_state_interval)

    def get_connect_state_cache(self, connect_state_address, slave):
        """Return the cached connect state without reading the registers.

        The given address and slave id are stored and used by the following
        background polls.
        """
        self._connect_state_address = connect_state_address
        self._slave = slave
        return self._connect_state_cache

    def read_coils(self, address, count, unit=1):
        """Simulated coil read: always returns ``count`` False bits."""
        int(address)  # kept so a non-numeric address still raises
        return {"status": "success", "data": [False] * count}

    def write_single_coil(self, address, value, unit=1):
        """Simulated coil write: logs the request and reports success."""
        print("modbus plc write_single_coil ", address, value)
        addr = int(address)
        print_with_timestamp(f"Mock write coil {addr} = {value}", color='blue')
        return {"status": "success"}

    def stop_polling(self):
        """Stop the polling thread and every station simulation thread."""
        print_with_timestamp(f"stop_polling called for {self.plc_port}", color='yellow')
        self._stop_event.set()

        # Flag every station thread first so they wind down in parallel.
        for station_idx in list(self.station_thread_controls.keys()):
            self.station_thread_controls[station_idx]['stop'] = True
            print_with_timestamp(f"Setting stop flag for station {station_idx}", color='yellow')

        for station_idx, thread in list(self.station_threads.items()):
            if not (thread and thread.is_alive()):
                continue
            print_with_timestamp(f"Waiting for station {station_idx} thread to stop...", color='yellow')
            thread.join(timeout=3)
            if thread.is_alive():
                print_with_timestamp(f"Station {station_idx} thread did not stop in time!", color='red')
            else:
                print_with_timestamp(f"Stopped station {station_idx} simulation thread", color='green')

        if self._timer_thread and self._timer_thread.is_alive():
            self._timer_thread.join(timeout=1)
        print_with_timestamp(f"Stopped polling thread for {self.plc_port}", color='green')

    def read_holding_registers(self, address, count, unit=1):
        """Read ``count`` registers starting at Modbus address ``address``.

        Unknown registers read as 0. Once the stop event is set an error
        dict is returned instead of data.
        """
        if self._stop_event.is_set():
            return {"status": "error", "message": "PLC stopped"}
        first = int(address)
        values = [self._mock_registers.get(first + i, 0x0000) for i in range(count)]
        return {"status": "success", "data": values}

    def read_input_registers(self, address, count, unit=1):
        """Read ``count`` registers from ``address``; unknown ones read as 0."""
        first = int(address)
        values = [self._mock_registers.get(first + i, 0x0000) for i in range(count)]
        return {"status": "success", "data": values}

    def write_single_register(self, address, value, unit=1):
        """Store ``value`` at ``address`` and start a test if it triggers one."""
        print("modbus plc write_single_register-", address, value, unit)
        addr = int(address)
        self._mock_registers[addr] = value
        self._check_and_start_test_on_dut_write(addr, value)
        print_with_timestamp(f"Mock write register {addr} = {value}", color='blue')
        return {"status": "success"}

    def write_registers(self, address, values, unit=1):
        """Store consecutive ``values`` starting at ``address``.

        Every written register is checked for the DUT-info start trigger.
        """
        start_addr = int(address)
        for offset, value in enumerate(values):
            self._mock_registers[start_addr + offset] = value
            self._check_and_start_test_on_dut_write(start_addr + offset, value)
        print_with_timestamp(f"Mock write registers {start_addr} = {values}", color='blue')
        return {"status": "success"}

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` holding registers and add a ``length`` field.

        ``address`` is passed through ``int()``, so it has to be a numeric
        Modbus address; the string default is not usable as-is.
        """
        result = self.read_holding_registers(address, length, plc_no)
        if result and result.get('status') == 'success' and 'data' in result:
            return {'status': 'success', 'length': len(result['data']), 'data': result['data']}
        return {'status': 'error'}

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` coils and add a ``length`` field."""
        result = self.read_coils(address, length, plc_no)
        utils.condition_print('plc_read_bits result', result)  # cyx
        if result.get('status') == 'success' and 'data' in result:
            bits = result['data']
            return {'status': 'success', 'length': len(bits), 'data': bits}
        return {'status': 'error'}

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one register; thin wrapper over ``write_single_register``."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write ``data`` into a single register (no split into two words)."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write the list ``data`` starting at ``start_addr``.

        ``length`` is accepted for interface compatibility and not used.
        """
        # None instead of a shared mutable [] default.
        words = [] if data is None else data
        return self.write_registers(start_addr, words, plc_no)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x01):
        """Write one coil; thin wrapper over ``write_single_coil``."""
        return self.write_single_coil(address, data, plc_no)

    def _check_and_start_test_on_dut_write(self, addr, value):
        """Start a station simulation when 1 is written to its DUT-info register."""
        if value != 1:
            return
        for station_idx, station_reg in enumerate(STATION_REG_BASE):
            dut_info_addr = self.conv_register_address(station_reg.get('dut_info', f'HD{station_idx}000'))
            if addr != dut_info_addr:
                continue
            station_status_addr = self.conv_register_address(
                station_reg.get('station_status', f'D{11+station_idx}'))
            self._mock_registers[station_status_addr] = 1  # status: testing
            self._start_station_simulation_thread(station_idx)
            break

    def _start_station_simulation_thread(self, station_idx):
        """(Re)start the simulation thread of one station."""
        # Ask a still-running thread for this station to stop first.
        running = self.station_threads.get(station_idx)
        if running is not None and running.is_alive():
            if station_idx in self.station_thread_controls:
                self.station_thread_controls[station_idx]['stop'] = True
            running.join(timeout=2)

        # A fresh control dict, so the old thread's stop flag stays set.
        control = {'stop': False}
        self.station_thread_controls[station_idx] = control
        thread = threading.Thread(target=self._simulate_station_test, args=(station_idx, control), daemon=True)
        self.station_threads[station_idx] = thread
        thread.start()
        print_with_timestamp(f"Started simulation thread for station {station_idx}", color='green')

    def _simulate_station_test(self, station_idx, control):
        """Thread body: simulate the drop test of one station.

        Runs through every entry of ``PLC_DROP_PARAMS``. Each drop takes
        15 x 0.1 s, then the cycle counter, read index (1-10, wrapping) and
        one result record are updated. Status register values written here:
        1 while testing, 2 while pausing between directions, 3 when all
        directions are done.

        Args:
            station_idx: Index into ``STATION_REG_BASE``.
            control: Dict whose ``'stop'`` entry requests termination.
        """
        station_reg = STATION_REG_BASE[station_idx]
        status_addr = self.conv_register_address(station_reg.get('station_status'))
        cycles_addr = self.conv_register_address(station_reg.get('station_cycles'))
        read_index_addr = self.conv_register_address(station_reg.get('station_read_index'))
        result_base_addr = self.conv_register_address(station_reg.get('station_drop_result'))
        directions = PLC_DROP_PARAMS

        def stop_requested():
            return control['stop'] or self._stop_event.is_set()

        print_with_timestamp(f"Starting simulation for station {station_idx+1}", color='blue')
        direction_index = 0
        while not stop_requested() and direction_index < len(directions):
            direction = directions[direction_index]
            direction_cycle_count = 0
            print_with_timestamp(f"Station {station_idx} starting direction {direction['dropCode']}", color='cyan')
            self._mock_registers[status_addr] = 1

            while direction_cycle_count < direction['dropCycles'] and not stop_requested():
                # One drop lasts 1.5 s, sliced so a stop request is seen quickly.
                for _ in range(15):
                    if stop_requested():
                        print_with_timestamp(f"Station {station_idx} 收到停止信号,退出测试", color='red')
                        return
                    time.sleep(0.1)

                direction_cycle_count += 1

                # Station cycle counter only ever increases.
                total_cycles = self._mock_registers.get(cycles_addr, 0) + 1
                self._mock_registers[cycles_addr] = total_cycles

                # Read index cycles through 1..10.
                previous_index = self._mock_registers.get(read_index_addr, 0)
                new_read_index = (previous_index % self._RESULT_SLOTS) + 1
                self._mock_registers[read_index_addr] = new_read_index

                result_data = self._generate_mock_result_data(
                    station_idx,
                    direction,
                    direction_cycle_count,
                    total_cycles
                )
                # The read index selects the record slot to overwrite.
                slot_addr = result_base_addr + (new_read_index - 1) * self._RESULT_RECORD_SIZE
                for offset, word in enumerate(result_data):
                    self._mock_registers[slot_addr + offset] = word

                print_with_timestamp(
                    f"Station {station_idx} direction {direction['dropCode']} cycle {direction_cycle_count}/{direction['dropCycles']}, "
                    f"total cycles: {total_cycles}, read_index: {new_read_index}",
                    color='blue'
                )

            # Direction finished: pause before the next one.
            if not stop_requested():
                self._mock_registers[status_addr] = 2
                print_with_timestamp(
                    f"Station {station_idx} direction {direction['dropCode']} completed, pausing for 3 seconds...",
                    color='yellow'
                )
                for _ in range(self._PAUSE_TICKS):
                    if stop_requested():
                        return
                    time.sleep(0.1)
            direction_index += 1

        if direction_index >= len(directions) and not stop_requested():
            self._mock_registers[status_addr] = 3
            print_with_timestamp(f"Station {station_idx} all directions completed!", color='green')

    def _generate_mock_result_data(self, station_idx, direction, direction_cycle_count, total_cycles):
        """Build one 44-register result record.

        Layout as written below: 0-5 begin time, 6-11 end time (both "now"),
        12 direction code, 13 station number (1-based), then 32-bit values
        stored low word first at 14, 16, 18, 20, 22 and 24, followed by up
        to 18 registers of machine serial number starting at 26.
        """
        now = datetime.datetime.now()
        stamp = [now.year, now.month, now.day, now.hour, now.minute, now.second]

        registers = [0] * self._RESULT_RECORD_SIZE
        registers[0:6] = stamp   # begin time
        registers[6:12] = stamp  # end time
        registers[12] = direction['dropCode']
        registers[13] = station_idx + 1

        dword_fields = (
            (14, total_cycles),            # itemSN (serial number)
            (16, direction['dropHeight']),  # dropHeight
            (18, 1500),                    # dropSpeed (fixed simulated value)
            (20, total_cycles),            # dutDropCycles
            (22, direction_cycle_count),   # itemCurrentCycles
            (24, total_cycles),            # stationDropCycles
        )
        for offset, value in dword_fields:
            registers[offset] = value & 0xFFFF
            registers[offset + 1] = (value >> 16) & 0xFFFF

        # machine_sn: 36 bytes requested, at most 18 registers copied.
        machine_sn = f"MOCK_MACHINE_{station_idx}"
        sn_registers = utils.string_to_registers(machine_sn, 36)
        for i, reg in enumerate(sn_registers):
            if i >= 18:
                break
            registers[26 + i] = reg
        return registers

    def close(self):
        """Stop all station simulation threads.

        The mock owns no serial client; a ``client`` attribute is only
        closed if one has been attached and ``mock_mode`` was switched off.
        """
        for control in list(self.station_thread_controls.values()):
            control['stop'] = True
        for thread in list(self.station_threads.values()):
            if thread.is_alive():
                thread.join(timeout=1)
        if self.mock_mode:
            return
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()
class ModbusPLC_Real:
    """Modbus RTU serial client wrapper for the PLC.

    All bus accesses are serialised through ``_serial_lock``. A daemon
    thread keeps a cached copy of the connect-state registers up to date
    and tries to reopen the port while ``plc_com_state`` is 0.
    """

    # Number of registers fetched by every connect-state poll.
    _CONNECT_STATE_REG_COUNT = 9

    def __init__(self, plc_port, comm_config):
        """Open the serial client and start the polling thread.

        Args:
            plc_port: Serial port name passed to the Modbus client.
            comm_config: Mapping read for the ``baudrate`` and ``parity`` keys.
        """
        self.plc_port = plc_port
        self.plc_com_state = 0  # 1 once the serial port could be opened
        self.client = None
        # Same length as the data returned by the poll.
        self._connect_state_cache = [0] * self._CONNECT_STATE_REG_COUNT
        self._connect_state_address = 10
        self._slave = 1
        self._poll_connect_state_interval = 0.5  # seconds
        self._serial_lock = threading.Lock()  # serialises serial-port access
        self._stop_event = threading.Event()
        self._timer_thread = None
        self._initialize_client(comm_config)
        self._start_connect_state_polling()

    def _initialize_client(self, comm_config):
        """Create the serial client and try to connect (best effort)."""
        baudrate = comm_config.get('baudrate')
        parity = comm_config.get('parity')
        try:
            self.client = ModbusSerialClient(method="rtu", port=self.plc_port,
                                             baudrate=baudrate,
                                             parity=parity, timeout=1)
            self.client.connect()
            if self.port_is_opened():
                self.plc_com_state = 1  # the serial port could be opened
                # .get() values are used here so a missing key cannot turn a
                # successful open into the error branch below.
                print_with_timestamp(f"Modbus PLC open {self.client} {self.plc_port} {baudrate} "
                                     f"{parity} success", color='green')  # cyx
        except Exception:
            # Deliberately non-fatal: the polling thread retries the open.
            print_with_timestamp(f"Mmodbus PLC open {self.plc_port} "
                                 f"{baudrate} {parity} error", color='red')

    def conv_to_plc_address(self, type_char, value):
        """Return ``value`` as an int; ``type_char`` is ignored.

        The prefix-based conversion is not done here (the original comment
        says it is handled in the Modbus TCP client).
        """
        return int(value)

    def connect(self):
        """Connect the client; return True on success, False otherwise."""
        try:
            if not self.client.connect():
                print("Failed to connect to Modbus server in PLC")
                return False
            print("Modbus serial Client Connected to PLC by", self.plc_port)
            return True
        except ConnectionException as e:
            print(f"Connection failed: {str(e)}")
            return False

    def reconnect(self):
        """Report success without touching the port.

        Reconnection has been disabled here since the change marked
        20250908; the former close-then-connect logic was unreachable and
        has been dropped. Reopening is done by the polling thread.
        """
        return True  # 20250908

    def port_is_opened(self):
        """Return True if the client exists and has a non-None ``socket``."""
        if self.client is None:
            return False
        return self.client.socket is not None if hasattr(self.client, 'socket') else False

    def _start_connect_state_polling(self):
        """Start the daemon thread that refreshes the connect-state cache."""
        self._timer_thread = threading.Thread(target=self._poll_connect_state, daemon=True)
        self._timer_thread.start()

    def _poll_connect_state(self):
        """Thread body: reopen the port or refresh the cache until stopped.

        While the port is closed the loop waits 5 s between attempts,
        otherwise 0.5 s.
        """
        count = self._CONNECT_STATE_REG_COUNT
        while not self._stop_event.is_set():
            if self.plc_com_state == 0:
                # Port is down: try to reopen it.
                try:
                    self.client.connect()
                    port_open = self.port_is_opened()
                except Exception:
                    port_open = False
                self.plc_com_state = 1 if port_open else 0
                self._poll_connect_state_interval = 0.5 if port_open else 5
            else:
                try:
                    result = self.read_holding_registers(self._connect_state_address, count, self._slave)
                    if result and result.get('status') == 'success' and result.get('data'):
                        self._connect_state_cache = result['data']
                    else:
                        self._connect_state_cache = [0] * count
                except Exception as e:
                    print_with_timestamp(f"Error polling connect state for {self.plc_port}: {e}", color='red')
                    self._connect_state_cache = [0] * count
            # Event.wait instead of time.sleep: stop_polling() joins with a
            # 1 s timeout, which a 5 s sleep would always outlast.
            self._stop_event.wait(self._poll_connect_state_interval)

    def get_connect_state_cache(self, connect_state_address, slave):
        """Return the cached connect state without touching the bus.

        The given address and slave id are stored and used by the following
        background polls.
        """
        self._connect_state_address = connect_state_address
        self._slave = slave
        return self._connect_state_cache

    def stop_polling(self):
        """Stop the background polling thread."""
        self._stop_event.set()
        if self._timer_thread and self._timer_thread.is_alive():
            self._timer_thread.join(timeout=1)

    def get_connect_state(self, connect_state_address, slave):
        """Read five connect/run-state registers directly from the PLC.

        Returns:
            The register list on success, a list of five zeros on a failed
            read, and the int ``0x00`` when the port is down or an exception
            occurs. NOTE(review): the mixed int/list return type is kept for
            compatibility with existing callers -- confirm before unifying.
        """
        if self.plc_com_state == 0:
            return 0x00
        try:
            # 20250905: five registers are read so the run state is included.
            result = self.read_holding_registers(connect_state_address, 0x05, slave)
            if result and result.get('status') == 'success' and 'data' in result:
                return result['data']
            return [0x00, 0x00, 0x00, 0x00, 0x00]
        except Exception:
            return 0x00

    def _transact(self, request, check_port=True, port_msg="Unable to open serial port",
                  log_label=None, catch_all=False):
        """Run one client call under the serial lock.

        The previous per-method code retried by calling itself recursively
        while still holding the non-reentrant lock, which blocks forever on
        the first bus error; failures are now reported to the caller instead.

        Args:
            request: Zero-argument callable performing the client call.
            check_port: Verify the port is open before calling.
            port_msg: Error text used when the port check fails.
            log_label: If given, exceptions are printed as ``label: text``.
            catch_all: Turn any exception into an error dict instead of only
                Modbus/connection exceptions.

        Returns:
            ``(response, None)`` on success or ``(None, error_dict)``.
        """
        with self._serial_lock:
            if check_port and not self.port_is_opened() and not self.reconnect():
                return None, {"status": "error", "msg": port_msg}
            try:
                return request(), None
            except Exception as exc:
                if not catch_all and not isinstance(exc, (ModbusException, ConnectionException)):
                    raise
                if log_label is not None:
                    print(f"{log_label}: {str(exc)}")
                return None, {"status": "error", "msg": str(exc)}

    def read_coils(self, address, count, unit=1):
        """Read ``count`` coils; returns a status dict with ``data`` bits."""
        response, failure = self._transact(
            lambda: self.client.read_coils(address, count, slave=unit),
            check_port=False, log_label="Read error")
        if failure is not None:
            return failure
        if response.isError():
            return {"status": "error", "msg": str(response)}
        return {"status": "success", "data": response.bits}

    def write_single_coil(self, address, value, unit=1):
        """Write one coil on slave ``unit``; returns a status dict."""
        print("modbus plc write_single_coil ", address, value)
        # The slave id used to be hard-coded to 1, silently ignoring `unit`.
        response, failure = self._transact(
            lambda: self.client.write_coil(address, value, slave=unit),
            check_port=False, log_label="Write error")
        if failure is not None:
            return failure
        if response.isError():
            print("modbus plc write_single_coil error", str(response))
            return {"status": "error", "msg": str(response)}
        return {"status": "success"}

    def read_holding_registers(self, address, count, unit=1):
        """Read ``count`` holding registers; returns a status dict."""
        response, failure = self._transact(
            lambda: self.client.read_holding_registers(address, count, slave=unit),
            port_msg="Unable to connect to open serial port")
        if failure is not None:
            return failure
        if response.isError():
            return {"status": "error", "msg": str(response)}
        return {"status": "success", "data": response.registers}

    def read_input_registers(self, address, count, unit=1):
        """Read ``count`` input registers; returns a status dict."""
        response, failure = self._transact(
            lambda: self.client.read_input_registers(address, count, slave=unit),
            log_label="Read error")
        if failure is not None:
            return failure
        if response.isError():
            return {"status": "error", "msg": str(response)}
        return {"status": "success", "data": response.registers}

    def write_single_register(self, address, value, unit=1):
        """Write one holding register; returns a status dict.

        Any exception raised by the client call is reported as an error
        dict. Previously it was printed and then followed by an
        UnboundLocalError on the never-assigned result.
        """
        print("modbus plc write_single_register-", address, value, unit)
        response, failure = self._transact(
            lambda: self.client.write_register(address, value, slave=unit),
            log_label="modbus plc write_single_register- error", catch_all=True)
        if failure is not None:
            return failure
        if response.isError():
            print("modbus plc write_single_register- error-2: ", response)
            return {"status": "error", "msg": str(response)}
        return {"status": "success"}

    def write_registers(self, address, values, unit=1):
        """Write consecutive holding registers; returns a status dict."""
        response, failure = self._transact(
            lambda: self.client.write_registers(address, values, slave=unit),
            log_label="Write error")
        if failure is not None:
            return failure
        if response.isError():
            # Errors are reported in the result dict, like the other methods.
            return {"status": "error", "msg": str(response)}
        return {"status": "success"}

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` holding registers and add a ``length`` field.

        ``address`` is handed to the Modbus client unchanged.
        NOTE(review): the string default looks unusable as a Modbus address
        -- callers presumably always pass a numeric address; confirm.
        """
        result = self.read_holding_registers(address, length, plc_no)
        if result and result.get('status') == 'success' and 'data' in result:
            return {'status': 'success', 'length': len(result['data']), 'data': result['data']}
        return {'status': 'error'}

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` coils and add a ``length`` field."""
        result = self.read_coils(address, length, plc_no)
        utils.condition_print('plc_read_bits result', result)  # cyx
        if result.get('status') == 'success' and 'data' in result:
            bits = result['data']
            return {'status': 'success', 'length': len(bits), 'data': bits}
        return {'status': 'error'}

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one register; thin wrapper over ``write_single_register``."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write ``data`` with a single-register write.

        NOTE(review): no split into two 16-bit words happens here, so values
        above 0xFFFF presumably cannot be written this way -- confirm the
        intended word order before extending it.
        """
        return self.write_single_register(address, data, plc_no)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write the list ``data`` starting at ``start_addr``.

        ``length`` is accepted for interface compatibility and not used.
        """
        # None instead of a shared mutable [] default.
        words = [] if data is None else data
        return self.write_registers(start_addr, words, plc_no)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x01):
        """Write one coil; thin wrapper over ``write_single_coil``."""
        return self.write_single_coil(address, data, plc_no)

    def close(self):
        """Close the serial client if one was created."""
        # client stays None when its construction failed in _initialize_client.
        if self.client is not None:
            self.client.close()
# The concrete class is chosen from the current value of MOCK_MODE.
def get_modbus_plc_class():
    """Return ModbusPLC_Mock when MOCK_MODE is truthy, else ModbusPLC_Real."""
    return ModbusPLC_Mock if MOCK_MODE else ModbusPLC_Real
# Kept for backward compatibility; the class is looked up on every call.
def ModbusPLC(plc_port, comm_config):
    """
    Create a PLC instance.

    The mock or the real implementation is selected from the value of
    MOCK_MODE at call time, and both arguments are passed to its constructor.
    """
    return get_modbus_plc_class()(plc_port, comm_config)