772 lines
36 KiB
Python
772 lines
36 KiB
Python
import array
|
||
import time
|
||
import datetime
|
||
import utils
|
||
from utils import print_with_timestamp
|
||
import re
|
||
from pymodbus.client import ModbusSerialClient as ModbusSerialClient
|
||
from pymodbus.exceptions import ModbusException, ConnectionException
|
||
import threading
|
||
from utils import PLCDropParamsData
|
||
|
||
# Use the real PLC by default; the --mock command-line flag switches to the
# simulated PLC.
MOCK_MODE = False

# One entry per drop direction: direction code, number of drops, drop height.
PLC_DROP_PARAMS = [
    {'dropCode': code, 'dropCycles': cycles, 'dropHeight': height}
    for code, cycles, height in (
        (1, 10, 7500),
        (2, 10, 7600),
        (3, 10, 7500),
        (4, 8, 7600),
        (5, 7, 7500),
        (6, 16, 7600),
    )
]

# Register names used by each station, one dict per station.
# NOTE(review): the original comment here said "extended to 4 channels", but
# only 3 stations are listed -- confirm whether a fourth entry is missing.
STATION_REG_BASE = [
    {
        'drop_params': 'D3020',
        'dut_info': 'HD0300',
        'station_status': 'D11',
        'station_cycles': 'HD0164',
        'station_read_index': 'D15',
        'station_drop_result': 'HD0510',
    },
    {
        'drop_params': 'D4020',
        'dut_info': 'HD2300',
        'station_status': 'D12',
        'station_cycles': 'HD0264',
        'station_read_index': 'D16',
        'station_drop_result': 'HD1110',
    },
    {
        'drop_params': 'D5020',
        'dut_info': 'HD4300',
        'station_status': 'D13',
        'station_cycles': 'HD0364',
        'station_read_index': 'D17',
        'station_drop_result': 'HD1710',
    },
]
|
||
|
||
|
||
class ModbusPLC_Mock:
    """In-memory simulation of the serial Modbus PLC.

    Registers are kept in a dict keyed by Modbus address. Writing the value 1
    to a station's ``dut_info`` register marks that station as "testing" and
    starts a background thread that simulates the drop test, updating the
    station's status, cycle counter, read index and result registers.
    A second background thread refreshes a cached copy of the connect-state
    registers at a fixed interval.
    """

    # 'M', 'D' or 'HD' followed by one to six digits.
    _ADDRESS_PATTERN = re.compile(r'^([MD]|HD)\d{1,6}$')
    # Offset added to an HD register number to get its Modbus address.
    _HD_ADDRESS_OFFSET = 41088
    # Number of registers read (and cached) on every connect-state poll.
    _CONNECT_STATE_COUNT = 9
    # Number of registers in one drop-result record.
    _RESULT_RECORD_SIZE = 44
    # The simulation sleeps in 0.1 s ticks so a stop request is noticed quickly.
    _TICK_SECONDS = 0.1
    _DROP_TICKS = 15   # 1.5 s per simulated drop
    _PAUSE_TICKS = 30  # 3 s pause after each direction (matches the log message)

    def __init__(self, plc_port, comm_config):
        """Create the mock, seed its registers and start connect-state polling.

        Args:
            plc_port: Port name; only used in log messages.
            comm_config: Accepted for signature parity with ModbusPLC_Real; unused.
        """
        self.plc_port = plc_port
        self.mock_mode = True
        self._mock_registers = {}
        self._initialize_mock_registers()
        # Same length as what the poller stores, so readers never see a
        # shorter list before the first poll completes.
        self._connect_state_cache = [0] * self._CONNECT_STATE_COUNT
        self._connect_state_address = self.conv_register_address('D10')
        self._slave = 1
        self._poll_connect_state_interval = 0.5  # seconds
        self._serial_lock = threading.Lock()
        self._stop_event = threading.Event()
        self._timer_thread = None
        self.station_threads = {}          # station index -> simulation thread
        self.station_thread_controls = {}  # station index -> {'stop': bool}
        self.plc_com_state = 1  # the mock always reports an open connection
        self._start_connect_state_polling()

    def _initialize_mock_registers(self):
        """Seed the register dict with drop parameters and zeroed work areas."""
        registers = self._mock_registers

        # Drop parameters for every configured station (materialised once so
        # the same values can be written for each station).
        drop_params_registers = list(PLCDropParamsData.to_registers(PLC_DROP_PARAMS))
        for station_reg in STATION_REG_BASE:
            base_addr = self.conv_register_address(station_reg.get('drop_params'))
            for index, value in enumerate(drop_params_registers):
                registers[base_addr + index] = value

        # D10 is the device connect state (set to 1); D11-D19 start at 0.
        for number in range(10, 20):
            addr = self.conv_register_address(f"D{number}")
            registers[addr] = 0x0001 if number == 10 else 0x0000

        # Zero the HD areas: HD3020-HD3048, HD300-HD1851 and HD510-HD2949.
        for hd_numbers in (range(3020, 3049), range(300, 463 * 4), range(510, 2950)):
            for number in hd_numbers:
                registers[self.conv_register_address(f"HD{number}")] = 0x0000

    def conv_to_plc_address(self, type_char, value):
        """Return ``value`` as an int; ``type_char`` is ignored.

        The original comment notes that address conversion is done in the
        Modbus client layer instead of here.
        """
        return int(value)

    def conv_register_address(self, s):
        """Convert a PLC register name such as 'D10' or 'HD0300' to a Modbus address.

        'M' and 'D' names map to their numeric part unchanged; 'HD' names map
        to the numeric part plus 41088.

        Raises:
            ValueError: If ``s`` is not 'M', 'D' or 'HD' followed by 1-6 digits.
        """
        matched = self._ADDRESS_PATTERN.match(s)
        if not matched:
            raise ValueError(
                "Invalid format. The string must start with 'M', 'D', or 'HD' followed by one to six digits.")

        prefix = matched.group(1)
        number = int(s[len(prefix):])
        if prefix == 'HD':
            return number + self._HD_ADDRESS_OFFSET
        return number

    def _start_connect_state_polling(self):
        """Start the daemon thread that refreshes the connect-state cache."""
        self._timer_thread = threading.Thread(target=self._poll_connect_state, daemon=True)
        self._timer_thread.start()

    def _poll_connect_state(self):
        """Thread body: refresh the connect-state cache until stop is requested."""
        empty_state = [0] * self._CONNECT_STATE_COUNT
        while not self._stop_event.is_set():
            try:
                result = self.read_holding_registers(
                    self._connect_state_address, self._CONNECT_STATE_COUNT, self._slave)
                if result and result.get('status') == 'success' and result.get('data'):
                    self._connect_state_cache = result['data']
                else:
                    self._connect_state_cache = list(empty_state)
            except Exception as e:
                print_with_timestamp(f"Error polling connect state for {self.plc_port}: {e}", color='red')
                self._connect_state_cache = list(empty_state)
            # Event.wait returns as soon as stop is requested, unlike time.sleep.
            self._stop_event.wait(self._poll_connect_state_interval)

    def get_connect_state_cache(self, connect_state_address, slave):
        """Return the cached connect state without reading the registers.

        The given address and slave are stored and used by subsequent polls.
        """
        self._connect_state_address = connect_state_address
        self._slave = slave
        return self._connect_state_cache

    def read_coils(self, address, count, unit=1):
        """Simulated coil read: always returns ``count`` False bits."""
        int(address)  # kept so a non-numeric address still raises as before
        return {"status": "success", "data": [False] * count}

    def write_single_coil(self, address, value, unit=1):
        """Simulated coil write: only logs the request."""
        print("modbus plc write_single_coil ", address, value)
        addr = int(address)
        print_with_timestamp(f"Mock write coil {addr} = {value}", color='blue')
        return {"status": "success"}

    def stop_polling(self):
        """Stop the connect-state poller and all station simulation threads."""
        print_with_timestamp(f"stop_polling called for {self.plc_port}", color='yellow')

        self._stop_event.set()

        # Ask every station simulation to stop.
        for station_idx in list(self.station_thread_controls.keys()):
            self.station_thread_controls[station_idx]['stop'] = True
            print_with_timestamp(f"Setting stop flag for station {station_idx}", color='yellow')

        # Give each station thread up to 3 seconds to finish.
        for station_idx, thread in list(self.station_threads.items()):
            if thread and thread.is_alive():
                print_with_timestamp(f"Waiting for station {station_idx} thread to stop...", color='yellow')
                thread.join(timeout=3)
                if thread.is_alive():
                    print_with_timestamp(f"Station {station_idx} thread did not stop in time!", color='red')
                else:
                    print_with_timestamp(f"Stopped station {station_idx} simulation thread", color='green')

        if self._timer_thread and self._timer_thread.is_alive():
            self._timer_thread.join(timeout=1)
            print_with_timestamp(f"Stopped polling thread for {self.plc_port}", color='green')

    def _read_register_values(self, address, count):
        """Return ``count`` register values from ``address``; unset registers read 0."""
        start_addr = int(address)
        return [self._mock_registers.get(start_addr + i, 0x0000) for i in range(count)]

    def read_holding_registers(self, address, count, unit=1):
        """Simulated holding-register read.

        Returns an error dict once stop has been requested, so callers do not
        act on data from a stopped PLC. The error text is provided under both
        'message' (original key) and 'msg' (key used by ModbusPLC_Real).
        """
        if self._stop_event.is_set():
            return {"status": "error", "message": "PLC stopped", "msg": "PLC stopped"}
        return {"status": "success", "data": self._read_register_values(address, count)}

    def read_input_registers(self, address, count, unit=1):
        """Simulated input-register read from the same register dict."""
        return {"status": "success", "data": self._read_register_values(address, count)}

    def write_single_register(self, address, value, unit=1):
        """Store one register value; writing 1 to a dut_info register starts a test."""
        print(f"modbus plc write_single_register-", address, value, unit)
        addr = int(address)
        self._mock_registers[addr] = value
        self._check_and_start_test_on_dut_write(addr, value)
        print_with_timestamp(f"Mock write register {addr} = {value}", color='blue')
        return {"status": "success"}

    def write_registers(self, address, values, unit=1):
        """Store consecutive register values starting at ``address``."""
        start_addr = int(address)
        for offset, value in enumerate(values):
            self._mock_registers[start_addr + offset] = value
            self._check_and_start_test_on_dut_write(start_addr + offset, value)
        print_with_timestamp(f"Mock write registers {start_addr} = {values}", color='blue')
        return {"status": "success"}

    # NOTE(review): the plc_* wrappers pass ``address`` straight through, and
    # the register methods call int(address); the 'D0160' defaults therefore
    # raise ValueError unless the caller supplies a numeric address.

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` holding registers; returns status, length and data."""
        resp = {'status': 'error'}
        result = self.read_holding_registers(address, length, plc_no)
        if result and 'status' in result and result['status'] == 'success' and 'data' in result:
            resp = {'status': 'success', 'length': len(result['data']), 'data': result['data']}
        return resp

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` coils; returns status, length and data."""
        resp = {'status': 'error'}
        result = self.read_coils(address, length, plc_no)
        utils.condition_print('plc_read_bits result', result)
        if 'status' in result and result['status'] == 'success' and 'data' in result:
            bits = result['data']
            resp = {'status': 'success', 'length': len(bits), 'data': bits}
        return resp

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one register."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write ``data`` to a single register.

        NOTE(review): despite the name, the value is not split into two
        16-bit words -- confirm this is what callers expect.
        """
        return self.write_single_register(address, data, plc_no)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write the values in ``data`` starting at ``start_addr``; ``length`` is unused."""
        return self.write_registers(start_addr, [] if data is None else data, plc_no)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x01):
        """Write one coil."""
        return self.write_single_coil(address, data, plc_no)

    def _check_and_start_test_on_dut_write(self, addr, value):
        """Start a station simulation when 1 is written to its dut_info register."""
        if value != 1:
            return

        for station_idx, station_reg in enumerate(STATION_REG_BASE):
            dut_info_addr = self.conv_register_address(station_reg.get('dut_info', f'HD{station_idx}000'))
            if addr != dut_info_addr:
                continue
            station_status_addr = self.conv_register_address(
                station_reg.get('station_status', f'D{11+station_idx}'))
            self._mock_registers[station_status_addr] = 1  # status 1 = testing
            self._start_station_simulation_thread(station_idx)
            break

    def _start_station_simulation_thread(self, station_idx):
        """Start (or restart) the simulation thread for one station."""
        # If a simulation is already running for this station, stop it first.
        running = self.station_threads.get(station_idx)
        if running is not None and running.is_alive():
            if station_idx in self.station_thread_controls:
                self.station_thread_controls[station_idx]['stop'] = True
            running.join(timeout=2)

        # Each run gets its own control dict so stopping an old run cannot
        # affect the new one.
        control = {'stop': False}
        self.station_thread_controls[station_idx] = control

        thread = threading.Thread(target=self._simulate_station_test, args=(station_idx, control), daemon=True)
        self.station_threads[station_idx] = thread
        thread.start()

        print_with_timestamp(f"Started simulation thread for station {station_idx}", color='green')

    def _stop_requested(self, control):
        """Return True when this run or the whole mock has been asked to stop."""
        return control['stop'] or self._stop_event.is_set()

    def _sleep_ticks(self, control, ticks):
        """Sleep ``ticks`` x 0.1 s; return False as soon as a stop is requested."""
        for _ in range(ticks):
            if self._stop_requested(control):
                return False
            time.sleep(self._TICK_SECONDS)
        return True

    def _simulate_station_test(self, station_idx, control):
        """Thread body: simulate the drop test of one station.

        For each entry of PLC_DROP_PARAMS the status register is set to 1,
        then one drop is simulated every 1.5 s: the cycle counter is
        incremented, the read index advances through 1..10 and a 44-register
        result record is written at the slot selected by the read index.
        After each direction the status becomes 2 for a 3 s pause; after the
        last one it becomes 3.
        """
        station_reg = STATION_REG_BASE[station_idx]
        status_addr = self.conv_register_address(station_reg.get('station_status'))
        cycles_addr = self.conv_register_address(station_reg.get('station_cycles'))
        read_index_addr = self.conv_register_address(station_reg.get('station_read_index'))
        result_base_addr = self.conv_register_address(station_reg.get('station_drop_result'))
        registers = self._mock_registers

        print_with_timestamp(f"Starting simulation for station {station_idx+1}", color='blue')

        for direction in PLC_DROP_PARAMS:
            if self._stop_requested(control):
                break

            direction_cycle_count = 0  # drops completed in this direction
            print_with_timestamp(f"Station {station_idx} starting direction {direction['dropCode']}", color='cyan')
            registers[status_addr] = 1  # testing

            while direction_cycle_count < direction['dropCycles'] and not self._stop_requested(control):
                if not self._sleep_ticks(control, self._DROP_TICKS):
                    print_with_timestamp(f"Station {station_idx} 收到停止信号,退出测试", color='red')
                    return

                direction_cycle_count += 1

                # Station cycle counter only ever increases.
                total_cycles = registers.get(cycles_addr, 0) + 1
                registers[cycles_addr] = total_cycles

                # Read index cycles through 1..10.
                read_index = (registers.get(read_index_addr, 0) % 10) + 1
                registers[read_index_addr] = read_index

                result_data = self._generate_mock_result_data(
                    station_idx, direction, direction_cycle_count, total_cycles)

                # The read index selects which result slot is overwritten.
                slot_addr = result_base_addr + (read_index - 1) * self._RESULT_RECORD_SIZE
                for offset, value in enumerate(result_data):
                    registers[slot_addr + offset] = value

                print_with_timestamp(
                    f"Station {station_idx} direction {direction['dropCode']} cycle {direction_cycle_count}/{direction['dropCycles']}, "
                    f"total cycles: {total_cycles}, read_index: {read_index}",
                    color='blue'
                )

            if not self._stop_requested(control):
                registers[status_addr] = 2  # paused
                print_with_timestamp(
                    f"Station {station_idx} direction {direction['dropCode']} completed, pausing for 3 seconds...",
                    color='yellow'
                )
                if not self._sleep_ticks(control, self._PAUSE_TICKS):
                    return

        if not self._stop_requested(control):
            registers[status_addr] = 3  # finished
            print_with_timestamp(f"Station {station_idx} all directions completed!", color='green')

    def _generate_mock_result_data(self, station_idx, direction, direction_cycle_count, total_cycles):
        """Build one 44-register result record for a simulated drop.

        Layout: 0-5 begin timestamp, 6-11 end timestamp, 12 direction code,
        13 station number (1-based), then 32-bit values stored low word first:
        14-15 serial number, 16-17 drop height, 18-19 drop speed (fixed 1500),
        20-21 sample total cycles, 22-23 cycles in this direction,
        24-25 station total cycles; 26-43 machine serial number string.
        """
        def split_words(value):
            """Return (low word, high word) of a 32-bit value."""
            return value & 0xFFFF, (value >> 16) & 0xFFFF

        now = datetime.datetime.now()
        timestamp = [now.year, now.month, now.day, now.hour, now.minute, now.second]

        result_registers = [0] * self._RESULT_RECORD_SIZE
        result_registers[0:6] = timestamp    # begin time
        result_registers[6:12] = timestamp   # end time (same instant in the mock)
        result_registers[12] = direction['dropCode']
        result_registers[13] = station_idx + 1
        result_registers[14:16] = split_words(total_cycles)              # itemSN
        result_registers[16:18] = split_words(direction['dropHeight'])   # dropHeight
        result_registers[18:20] = (1500 & 0xFFFF, 0)                     # dropSpeed
        result_registers[20:22] = split_words(total_cycles)              # dutDropCycles
        result_registers[22:24] = split_words(direction_cycle_count)     # itemCurrentCycles
        result_registers[24:26] = split_words(total_cycles)              # stationDropCycles

        # Machine serial number: at most 18 registers (36 bytes).
        machine_sn = f"MOCK_MACHINE_{station_idx}"
        sn_registers = utils.string_to_registers(machine_sn, 36)
        for offset, reg in enumerate(sn_registers):
            if offset >= 18:
                break
            result_registers[26 + offset] = reg

        return result_registers

    def close(self):
        """Stop all background threads; close the client if one exists."""
        for control in self.station_thread_controls.values():
            control['stop'] = True
        # Also stops the connect-state poller, which would otherwise keep running.
        self._stop_event.set()

        for thread in list(self.station_threads.values()):
            if thread.is_alive():
                thread.join(timeout=1)

        timer = self._timer_thread
        if timer is not None and timer is not threading.current_thread() and timer.is_alive():
            timer.join(timeout=1)

        if self.mock_mode:
            return
        # The mock never creates a client, so only close one if it was attached.
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()
|
||
|
||
class ModbusPLC_Real:
    """Serial (RTU) Modbus client for the PLC.

    All bus access goes through one lock. A background thread keeps a cached
    copy of the connect-state registers and retries opening the port while it
    is closed. Every read/write method returns a dict with a 'status' key
    ('success' or 'error'), 'data' on successful reads and 'msg' on errors.
    """

    # Number of registers read (and cached) on every connect-state poll.
    _CONNECT_STATE_COUNT = 9
    # Total tries for a request that is retried after a Modbus exception.
    _MAX_ATTEMPTS = 2

    def __init__(self, plc_port, comm_config):
        """Open the serial port and start connect-state polling.

        Args:
            plc_port: Serial port name.
            comm_config: Mapping with 'baudrate' and 'parity' entries.
        """
        self.plc_port = plc_port
        self.plc_com_state = 0  # 1 once the serial port could be opened
        self.client = None
        # Same length as what the poller stores, so readers never see a
        # shorter list before the first poll completes.
        self._connect_state_cache = [0] * self._CONNECT_STATE_COUNT
        self._connect_state_address = 10
        self._slave = 1
        self._poll_connect_state_interval = 0.5  # seconds
        self._serial_lock = threading.Lock()  # serialises access to the port
        self._stop_event = threading.Event()
        self._timer_thread = None
        self._initialize_client(comm_config)
        self._start_connect_state_polling()

    def _initialize_client(self, comm_config):
        """Create the pymodbus serial client and try to open the port.

        Failures are logged (with the exception text) and leave
        ``plc_com_state`` at 0; the poller retries the connection later.
        """
        baudrate = comm_config.get('baudrate')
        parity = comm_config.get('parity')
        try:
            self.client = ModbusSerialClient(method="rtu", port=self.plc_port,
                                             baudrate=baudrate,
                                             parity=parity, timeout=1)
            self.client.connect()
            if self.port_is_opened():
                self.plc_com_state = 1
                # Uses the .get() values so a missing key cannot raise after
                # the port has already been opened.
                print_with_timestamp(f"Modbus PLC open {self.client} {self.plc_port} {baudrate} "
                                     f"{parity} success", color='green')
        except Exception as e:
            print_with_timestamp(f"Mmodbus PLC open {self.plc_port} "
                                 f"{baudrate} {parity} error: {e}", color='red')

    def conv_to_plc_address(self, type_char, value):
        """Return ``value`` as an int; ``type_char`` is ignored.

        The original comment notes that address conversion is done in the
        Modbus client layer instead of here.
        """
        return int(value)

    def connect(self):
        """Connect the client; return True on success, False otherwise."""
        try:
            if not self.client.connect():
                print("Failed to connect to Modbus server in PLC")
                return False
            print("Modbus serial Client Connected to PLC by", self.plc_port)
            return True
        except ConnectionException as e:
            print(f"Connection failed: {str(e)}")
            return False

    def reconnect(self):
        """Always report success without touching the client.

        The close-and-connect logic that used to follow was made unreachable
        by an early return (marked 20250908); re-opening the port is left to
        the polling thread.
        """
        return True

    def port_is_opened(self):
        """Return True when a client exists and its ``socket`` attribute is set."""
        if self.client is None:
            return False
        return getattr(self.client, 'socket', None) is not None

    def _start_connect_state_polling(self):
        """Start the daemon thread that refreshes the connect-state cache."""
        self._timer_thread = threading.Thread(target=self._poll_connect_state, daemon=True)
        self._timer_thread.start()

    def _poll_connect_state(self):
        """Thread body: reopen the port when closed, otherwise refresh the cache.

        The interval is 0.5 s while the port is open and 5 s while it cannot
        be opened.
        """
        empty_state = [0] * self._CONNECT_STATE_COUNT
        while not self._stop_event.is_set():
            if self.plc_com_state == 0:
                try:
                    self.client.connect()
                    opened = self.port_is_opened()
                except Exception:
                    opened = False
                self.plc_com_state = 1 if opened else 0
                self._poll_connect_state_interval = 0.5 if opened else 5
            else:
                try:
                    result = self.read_holding_registers(
                        self._connect_state_address, self._CONNECT_STATE_COUNT, self._slave)
                    if result and result.get('status') == 'success' and result.get('data'):
                        self._connect_state_cache = result['data']
                    else:
                        self._connect_state_cache = list(empty_state)
                except Exception as e:
                    print_with_timestamp(f"Error polling connect state for {self.plc_port}: {e}", color='red')
                    self._connect_state_cache = list(empty_state)
            # Event.wait returns as soon as stop is requested, so stop_polling
            # does not have to outwait the 5 s interval.
            self._stop_event.wait(self._poll_connect_state_interval)

    def get_connect_state_cache(self, connect_state_address, slave):
        """Return the cached connect state without reading the registers.

        The given address and slave are stored and used by subsequent polls.
        """
        self._connect_state_address = connect_state_address
        self._slave = slave
        return self._connect_state_cache

    def stop_polling(self):
        """Stop the connect-state polling thread (waits up to 1 s for it)."""
        self._stop_event.set()
        timer = self._timer_thread
        if timer and timer is not threading.current_thread() and timer.is_alive():
            timer.join(timeout=1)

    def get_connect_state(self, connect_state_address, slave):
        """Read 5 connect/run-state registers directly from the PLC.

        Returns the register list on success and a list of five zeros when
        the read fails. NOTE: returns the int 0 (not a list) when the port is
        not open or an unexpected exception occurs; kept for compatibility.
        """
        if self.plc_com_state == 0:
            return 0x00
        try:
            result = self.read_holding_registers(connect_state_address, 0x05, slave)
            if result and 'status' in result and result['status'] == 'success' and 'data' in result:
                return result['data']
            return [0x00, 0x00, 0x00, 0x00, 0x00]
        except Exception:
            return 0x00

    def _request(self, call, data_attr=None, port_error=None, retry_label=None, error_prefix=None):
        """Run one pymodbus request under the serial lock and wrap the outcome.

        Args:
            call: Zero-argument callable that performs the request.
            data_attr: Response attribute returned under 'data' on success;
                None for writes.
            port_error: When set, the port state is checked first and this
                message is returned if it is closed and reconnect() fails.
            retry_label: 'Read' or 'Write'. When set, a Modbus exception is
                printed and the request is retried (up to _MAX_ATTEMPTS tries
                in total) if reconnect() succeeds.
            error_prefix: When set, printed together with an error response.

        The lock is released before every retry. The previous implementation
        retried by calling itself while still holding the non-reentrant lock,
        which deadlocked on the first Modbus exception.
        """
        attempt = 0
        while True:
            attempt += 1
            with self._serial_lock:
                if port_error and not self.port_is_opened() and not self.reconnect():
                    return {"status": "error", "msg": port_error}
                try:
                    result = call()
                except (ModbusException, ConnectionException) as e:
                    if retry_label:
                        print(f"{retry_label} error: {str(e)}")
                    give_up = (not retry_label
                               or attempt >= self._MAX_ATTEMPTS
                               or not self.reconnect())
                    if give_up:
                        return {"status": "error", "msg": str(e)}
                    continue  # leaves the with-block, releasing the lock
                if result.isError():
                    if error_prefix:
                        print(error_prefix, result)
                    return {"status": "error", "msg": str(result)}
                if data_attr is None:
                    return {"status": "success"}
                return {"status": "success", "data": getattr(result, data_attr)}

    def read_coils(self, address, count, unit=1):
        """Read ``count`` coils starting at ``address`` from slave ``unit``."""
        return self._request(
            lambda: self.client.read_coils(address, count, slave=unit),
            data_attr='bits', retry_label="Read")

    def write_single_coil(self, address, value, unit=1):
        """Write one coil on slave ``unit``."""
        print("modbus plc write_single_coil ", address, value)
        # The slave id was previously hard-coded to 1, ignoring ``unit``.
        return self._request(
            lambda: self.client.write_coil(address, value, slave=unit),
            retry_label="Write", error_prefix="modbus plc write_single_coil error")

    def read_holding_registers(self, address, count, unit=1):
        """Read ``count`` holding registers; Modbus exceptions are not retried."""
        return self._request(
            lambda: self.client.read_holding_registers(address, count, slave=unit),
            data_attr='registers', port_error="Unable to connect to open serial port")

    def read_input_registers(self, address, count, unit=1):
        """Read ``count`` input registers starting at ``address``."""
        return self._request(
            lambda: self.client.read_input_registers(address, count, slave=unit),
            data_attr='registers', port_error="Unable to open serial port", retry_label="Read")

    def write_single_register(self, address, value, unit=1):
        """Write one holding register.

        Unexpected (non-Modbus) exceptions are reported as an error dict;
        previously they were printed and followed by an UnboundLocalError.
        """
        print(f"modbus plc write_single_register-", address, value, unit)
        try:
            return self._request(
                lambda: self.client.write_register(address, value, slave=unit),
                port_error="Unable to open serial port", retry_label="Write",
                error_prefix="modbus plc write_single_register- error-2: ")
        except Exception as e:
            print(f"modbus plc write_single_register- error: {str(e)}")
            return {"status": "error", "msg": str(e)}

    def write_registers(self, address, values, unit=1):
        """Write consecutive holding registers starting at ``address``."""
        return self._request(
            lambda: self.client.write_registers(address, values, slave=unit),
            port_error="Unable to open serial port", retry_label="Write")

    # NOTE(review): the plc_* wrappers pass ``address`` straight to the
    # client; the 'D0160' string defaults look like placeholders and callers
    # appear to be expected to supply numeric Modbus addresses -- confirm.

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` holding registers; returns status, length and data."""
        resp = {'status': 'error'}
        result = self.read_holding_registers(address, length, plc_no)
        if result and 'status' in result and result['status'] == 'success' and 'data' in result:
            resp = {'status': 'success', 'length': len(result['data']), 'data': result['data']}
        return resp

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` coils; returns status, length and data."""
        resp = {'status': 'error'}
        result = self.read_coils(address, length, plc_no)
        utils.condition_print('plc_read_bits result', result)
        if 'status' in result and result['status'] == 'success' and 'data' in result:
            bits = result['data']
            resp = {'status': 'success', 'length': len(bits), 'data': bits}
        return resp

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one register."""
        return self.write_single_register(address, data, plc_no)

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write ``data`` to a single register.

        NOTE(review): despite the name, the value is not split into two
        16-bit words -- confirm this is what callers expect.
        """
        return self.write_single_register(address, data, plc_no)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write the values in ``data`` starting at ``start_addr``; ``length`` is unused."""
        return self.write_registers(start_addr, [] if data is None else data, plc_no)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x01):
        """Write one coil."""
        return self.write_single_coil(address, data, plc_no)

    def close(self):
        """Stop the polling thread, then close the serial client if present."""
        # Stop polling first so the thread does not keep using (or reopening)
        # the client after it has been closed.
        self.stop_polling()
        if self.client is not None:
            self.client.close()
|
||
|
||
# 根据 MOCK_MODE 的值确定实际使用的类
|
||
def get_modbus_plc_class():
    """Return ModbusPLC_Mock when MOCK_MODE is truthy, otherwise ModbusPLC_Real.

    MOCK_MODE is read at call time, so changing it after import takes effect
    on the next call.
    """
    return ModbusPLC_Mock if MOCK_MODE else ModbusPLC_Real
|
||
|
||
# 保持向后兼容,但使用函数动态获取
|
||
def ModbusPLC(plc_port, comm_config):
    """Create a PLC instance of the class selected by the current MOCK_MODE.

    Kept as a callable named like the former class so existing
    ``ModbusPLC(port, config)`` call sites keep working.

    Args:
        plc_port: Port name passed to the PLC class.
        comm_config: Communication settings passed to the PLC class.
    """
    return get_modbus_plc_class()(plc_port, comm_config)
|