721 lines
36 KiB
Python
721 lines
36 KiB
Python
|
|
import asyncio
|
|||
|
|
import json
|
|||
|
|
import re
|
|||
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
|
|||
|
|
import pymodbus
|
|||
|
|
import serial
|
|||
|
|
import threading
|
|||
|
|
import binascii
|
|||
|
|
import datetime
|
|||
|
|
import requests
|
|||
|
|
from pymodbus.client import ModbusTcpClient
|
|||
|
|
from pymodbus import ModbusException
|
|||
|
|
from pymodbus.exceptions import ModbusIOException
|
|||
|
|
import utils
|
|||
|
|
from utils import PLCDropParamsData, PLCTestReqResultData, print_with_timestamp
|
|||
|
|
|
|||
|
|
PLC_CONNECT_REGISTER = 'D10'
|
|||
|
|
PLC_READ_INDEX_REGISTER = 'D15'
|
|||
|
|
|
|||
|
|
# 代表1个通道(工位)的夹具最多可放多少个样品,可选数 [1 2 4]
|
|||
|
|
# D11 代表第1通道 的运行状态 1--试验中 0--停止中
|
|||
|
|
# D12 代表第1通道 的运行状态 1--试验中 0--停止中
|
|||
|
|
# D13 代表第1通道 的运行状态 1--试验中 0--停止中
|
|||
|
|
# D14 代表第1通道 的运行状态 1--试验中 0--停止中
|
|||
|
|
|
|||
|
|
def get_visible_string(s):
|
|||
|
|
"""
|
|||
|
|
清理字符串,移除非法字符和空字符
|
|||
|
|
|
|||
|
|
:param s: 原始字符串
|
|||
|
|
:return: 清理后的字符串
|
|||
|
|
"""
|
|||
|
|
if not s:
|
|||
|
|
return ""
|
|||
|
|
|
|||
|
|
# 移除空字符和不可打印字符
|
|||
|
|
# 只保留ASCII可打印字符 (32-126)
|
|||
|
|
visible_str = ''.join(
|
|||
|
|
char for char in s
|
|||
|
|
if 32 <= ord(char) <= 126 or char in '\n\r\t'
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 移除首尾空白字符
|
|||
|
|
visible_str = visible_str.strip()
|
|||
|
|
|
|||
|
|
return visible_str
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ModbusDtMachine:
|
|||
|
|
def __init__(self, plc_type, comm_port, comm_config={}, drop_register={}, plc_address=0x01,
|
|||
|
|
tcp_modbus_unit=0x01, machine_SN='DTM_STA_00001'):
|
|||
|
|
self.connect_thread = None
|
|||
|
|
self.drop_register = drop_register
|
|||
|
|
self.plc_address = plc_address
|
|||
|
|
self.tcp_modbus_unit = tcp_modbus_unit
|
|||
|
|
self.DTM_SN = machine_SN
|
|||
|
|
self.plc_address = tcp_modbus_unit
|
|||
|
|
self.plc_comm_lock = threading.Lock()
|
|||
|
|
self.client = None
|
|||
|
|
self.connection = None
|
|||
|
|
self.preConnection = None
|
|||
|
|
self.plc_type = plc_type
|
|||
|
|
self.comm = comm_port
|
|||
|
|
self.comm_config = comm_config
|
|||
|
|
self.exit_connect_thread = False
|
|||
|
|
self.nextTestResultBufferFlag = 0 # 从PLC读取测试结果,采用A B 双buffer制,上位机读A时,PLC写B,反过来成立
|
|||
|
|
# print_with_timestamp("dtmmodbus client init", comm_port, comm_config)#cyx
|
|||
|
|
|
|||
|
|
# 将三菱PLC使用的MXXX Dxxxx的寄存器地址转换为modbus 的数值寄存器地址
|
|||
|
|
# D0160 --> 160
|
|||
|
|
def conv_register_address(self, s):
|
|||
|
|
address_type = None
|
|||
|
|
# 使用正则表达式检查格式,确保是一个或两个字母后跟三或四位数字
|
|||
|
|
pattern = re.compile(r'^([MD]|HD)\d{1,6}$')
|
|||
|
|
if not pattern.match(s):
|
|||
|
|
raise ValueError(
|
|||
|
|
"Invalid format. The string must start with 'M', 'D', or 'HD' followed by three or four digits.")
|
|||
|
|
|
|||
|
|
address_type = s[0] if s[1].isdigit() else s[:2] # 获取前缀,可以是'M', 'D', 或者 'HD'
|
|||
|
|
address_value = int(s[len(address_type):]) # 获取后面的数字部分
|
|||
|
|
converted_address = address_value
|
|||
|
|
# 根据PLC的前缀来决定如何映射到Modbus地址
|
|||
|
|
if address_type == 'M':
|
|||
|
|
converted_address = address_value
|
|||
|
|
# 假设'M'类型的地址映射到Modbus的Coil
|
|||
|
|
modbus_type = "Coil"
|
|||
|
|
elif address_type == 'HD':
|
|||
|
|
# 假设'D'类型的地址映射到Modbus的Holding Register
|
|||
|
|
if self.plc_type == 'xinjie':
|
|||
|
|
converted_address = address_value + 41088
|
|||
|
|
else: # mhi
|
|||
|
|
converted_address = address_value
|
|||
|
|
modbus_type = "Holding_Register"
|
|||
|
|
elif address_type == 'D':
|
|||
|
|
# 'HD'可能指的是特殊功能或更高位数据的地址
|
|||
|
|
if self.plc_type == 'xinjie':
|
|||
|
|
converted_address = address_value
|
|||
|
|
else: # mhi
|
|||
|
|
converted_address = address_value
|
|||
|
|
modbus_type = "Data_Register"
|
|||
|
|
|
|||
|
|
return modbus_type, converted_address
|
|||
|
|
|
|||
|
|
def conv_drop_height(self, height):
|
|||
|
|
if self.plc_type == 'xinjie':
|
|||
|
|
height_register_value = int(100 * height)
|
|||
|
|
else: # mhi
|
|||
|
|
height_register_value = int(100 * height)
|
|||
|
|
return height_register_value
|
|||
|
|
|
|||
|
|
def is_connected_to_tcp_server(self):
|
|||
|
|
if self.connection:
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def connect_to_modbus_server(self, host='127.0.0.1', port=5020):
|
|||
|
|
if self.client: # 已经连接到modbus Server ,则立即返回
|
|||
|
|
print_with_timestamp(f"Modbus client--plc {self.comm} {self.plc_address} connectted state is OK",
|
|||
|
|
color='blue')
|
|||
|
|
return
|
|||
|
|
# 创建 Modbus 客户端实例
|
|||
|
|
self.client = ModbusTcpClient(host, port=port)
|
|||
|
|
# 连接到 Modbus 服务器
|
|||
|
|
self.connection = self.client.connect()
|
|||
|
|
if self.connection:
|
|||
|
|
print_with_timestamp(
|
|||
|
|
f"Modbus tcp client-- {self.tcp_modbus_unit}-{self.plc_address}-{self.comm} connected modbus tcp server successfully"
|
|||
|
|
, color='green')
|
|||
|
|
else:
|
|||
|
|
print_with_timestamp(
|
|||
|
|
f"Modbus tcp client-- {self.tcp_modbus_unit}-{self.plc_address}--{self.comm} connect to modbus tcp server error",
|
|||
|
|
color='red')
|
|||
|
|
|
|||
|
|
def connect_monitor(self):
|
|||
|
|
while not self.exit_connect_thread:
|
|||
|
|
if not self.is_connected_to_tcp_server():
|
|||
|
|
self.connect_to_modbus_server()
|
|||
|
|
# 等待 5 秒后继续下一次连接尝试
|
|||
|
|
time.sleep(5)
|
|||
|
|
|
|||
|
|
def start_connect_thread(self):
|
|||
|
|
self.connect_thread = threading.Thread(target=self.connect_monitor)
|
|||
|
|
self.connect_thread.start()
|
|||
|
|
|
|||
|
|
def stop_connect_thread(self):
|
|||
|
|
self.exit_connect_thread = True
|
|||
|
|
self.connect_thread.join() # 等待线程结束
|
|||
|
|
|
|||
|
|
def set_station_dropheight(self, station_no, height): # 数据库中的height 是float 并且mm为单位
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} dropheight error"}
|
|||
|
|
height_register_value = self.conv_drop_height(height) # 将mm 单位 的跌落高度转换 寄存器的值
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['height'])
|
|||
|
|
print_with_timestamp(f'ModbusDtMachine set {self.DTM_SN} station {station_no} dropheight {height} mm',
|
|||
|
|
address_type,
|
|||
|
|
address_value, height_register_value) # cyx
|
|||
|
|
resp = self.client.write_registers(address_value, [height_register_value], self.plc_address)
|
|||
|
|
if resp.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"set station {station_no} dropheight success"}
|
|||
|
|
else:
|
|||
|
|
print_with_timestamp(f"ModbusDtMachine set station {station_no} dropheight success error")
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def set_station_cycles(self, station_no, cycles):
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} cycles error"}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['cycles'])
|
|||
|
|
print_with_timestamp(f"ModbusDtMachine set {self.DTM_SN} station {station_no} cycles {cycles} ",
|
|||
|
|
address_type,
|
|||
|
|
address_value, cycles) # cyx
|
|||
|
|
resp = self.client.write_register(address_value, cycles, self.plc_address)
|
|||
|
|
if resp.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"set station{station_no} cycles success"}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def set_station_finished(self, station_no, finished):
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} finished error"}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['cyclesFinished'])
|
|||
|
|
resp = self.client.write_register(address_value, finished, self.plc_address)
|
|||
|
|
if resp.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"set station{station_no} finished success"}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def set_station_dropdirection(self, station_no, direction):
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} finished error"}
|
|||
|
|
direction_register = self.drop_register[station_no].get('direction')
|
|||
|
|
print_with_timestamp(f"ModbusDtMachine set_station_dropdirection direction register {direction_register} ")
|
|||
|
|
if direction_register is None or direction is None or direction_register == "":
|
|||
|
|
value_return = {'status': 'success', 'message': f"drop direction set is not demand"}
|
|||
|
|
return value_return
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['direction'])
|
|||
|
|
resp = self.client.write_register(address_value, direction, self.plc_address)
|
|||
|
|
|
|||
|
|
if resp.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"set station{station_no} finished success"}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def set_station_dutInfo(self, station_no, dutInfo):
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} dutInfo error"}
|
|||
|
|
dutBasic_register = self.drop_register[station_no].get('dutBasic')
|
|||
|
|
# print_with_timestamp(f"ModbusDtMachine set_station_dutInfo --0 {dutInfo} {dutBasic_register} {type(dutInfo)} ")
|
|||
|
|
if dutBasic_register is None or dutInfo is None or dutBasic_register == "":
|
|||
|
|
value_return = {'status': 'success', 'message': f"dut info set is not demand"}
|
|||
|
|
return value_return
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['dutBasic'])
|
|||
|
|
# 20250827 增加判断充UI传下来的是单个dut的SN,还是多个dut的SN数组
|
|||
|
|
dut_info_type = 0 if dutInfo is None else 2 if isinstance(dutInfo, list) else 1 if isinstance(dutInfo,
|
|||
|
|
dict) else 0
|
|||
|
|
if dut_info_type == 1:
|
|||
|
|
dut0 = dutInfo
|
|||
|
|
elif dut_info_type == 2:
|
|||
|
|
dut0 = dutInfo[0]
|
|||
|
|
else:
|
|||
|
|
dut0 = {}
|
|||
|
|
# print_with_timestamp(f"ModbusDtMachine set_station_dutInfo --1 {dut0} {dut_info_type} ")
|
|||
|
|
dutProject = dut0.get("project", "") # 项目代码
|
|||
|
|
dutPhase = dut0.get("phase", "") # 项目阶段
|
|||
|
|
dutProjectType = dut0.get("projectType", "") # 项目方案
|
|||
|
|
dutWeeks = str(dut0.get("weeks", 0)) # 测试周次
|
|||
|
|
dutWorkOrder = dut0.get("workOrder", "") # 工单
|
|||
|
|
registers_value = [0x0001] # 为产品信息有效标识
|
|||
|
|
registers_value.extend(utils.string_to_registers(dutProject, 36))
|
|||
|
|
registers_value.extend(utils.string_to_registers(dutPhase, 36))
|
|||
|
|
registers_value.extend(utils.string_to_registers(dutProjectType, 36))
|
|||
|
|
registers_value.extend(utils.string_to_registers(dutWeeks, 36))
|
|||
|
|
registers_value.extend(utils.string_to_registers(dutWorkOrder, 36))
|
|||
|
|
registers_value_bottom = []
|
|||
|
|
if dut_info_type == 1: # 20250830 为了兼容1个工位(通道) 可能同时放置4个被测样品
|
|||
|
|
registers_value_bottom.extend(utils.string_to_registers(dut0.get('SN', " "), 36)) # 产品序列号
|
|||
|
|
elif dut_info_type == 2:
|
|||
|
|
pass
|
|||
|
|
# registers_value.extend(utils.string_to_registers(dut0.get('SN', " "), 36)) # 产品序列号
|
|||
|
|
for dut in dutInfo: # 20250830 为了兼容1个工位(通道) 可能同时放置4个被测样品
|
|||
|
|
registers_value_bottom.extend(utils.string_to_registers(dut.get('SN', " "), 36))
|
|||
|
|
else: # dufInfo 参数非法
|
|||
|
|
return value_return
|
|||
|
|
# self.client_write 不能写太多数量的寄存器
|
|||
|
|
resp1 = self.client.write_registers(address_value, registers_value, self.plc_address)
|
|||
|
|
resp2 = self.client.write_registers(address_value + len(registers_value), registers_value_bottom,
|
|||
|
|
self.plc_address)
|
|||
|
|
# print_with_timestamp(f"ModbusDtMachine set_station_dutInfo dutInfo register--6 {dutBasic_register} {registers_value} "
|
|||
|
|
# f"{resp1} {resp2}")
|
|||
|
|
if resp1.isError() is False and resp2.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"set station{station_no} dutInfo success"}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def read_station_dutInfo(self, station_no, sn_count=1):
|
|||
|
|
"""
|
|||
|
|
从 Modbus 寄存器读取试验样品信息并转换为字符串
|
|||
|
|
|
|||
|
|
:param station_no: 工位号
|
|||
|
|
:param sn_count: SN 数量,默认为1(单个产品)
|
|||
|
|
:return: 包含产品信息的字典
|
|||
|
|
"""
|
|||
|
|
value_return = {'status': 'error', 'message': f"get station{station_no} dutInfo error"}
|
|||
|
|
try:
|
|||
|
|
# 获取寄存器地址
|
|||
|
|
dutBasic_register = self.drop_register[station_no].get('dutBasic')
|
|||
|
|
if dutBasic_register is None or dutBasic_register == "":
|
|||
|
|
value_return = {'status': 'success', 'message': f"dut info get is not demand"}
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
address_type, address_value = self.conv_register_address(dutBasic_register)
|
|||
|
|
|
|||
|
|
# 计算需要读取的寄存器数量
|
|||
|
|
# 固定部分: 1个标识符 + 5个字段 × 18个寄存器/字段 = 91个寄存器
|
|||
|
|
fixed_registers_count = 1 + 5 * 18 # 标识符 + 5个字段(每个36字节=18寄存器)
|
|||
|
|
|
|||
|
|
# 第一次读取:固定部分
|
|||
|
|
response_fixed = self.client.read_holding_registers(
|
|||
|
|
address_value,
|
|||
|
|
fixed_registers_count,
|
|||
|
|
self.plc_address
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if response_fixed.isError():
|
|||
|
|
value_return['message'] = f"Failed to read fixed registers: {response_fixed}"
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
fixed_registers = response_fixed.registers
|
|||
|
|
|
|||
|
|
# 解析标识符
|
|||
|
|
identifier = fixed_registers[0]
|
|||
|
|
if identifier != 0x0001:
|
|||
|
|
value_return['message'] = f"Invalid identifier: {identifier}"
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
# 解析固定字段
|
|||
|
|
dut_info = {}
|
|||
|
|
index = 1 # 跳过标识符
|
|||
|
|
|
|||
|
|
# 项目代码 (36字节 = 18寄存器)
|
|||
|
|
project_registers = fixed_registers[index:index + 18]
|
|||
|
|
dut_info['project'] = utils.registers_to_string(project_registers).strip()
|
|||
|
|
index += 18
|
|||
|
|
|
|||
|
|
# 项目阶段 (36字节 = 18寄存器)
|
|||
|
|
phase_registers = fixed_registers[index:index + 18]
|
|||
|
|
dut_info['phase'] = utils.registers_to_string(phase_registers).strip()
|
|||
|
|
index += 18
|
|||
|
|
|
|||
|
|
# 项目方案 (36字节 = 18寄存器)
|
|||
|
|
projectType_registers = fixed_registers[index:index + 18]
|
|||
|
|
dut_info['projectType'] = utils.registers_to_string(projectType_registers).strip()
|
|||
|
|
index += 18
|
|||
|
|
|
|||
|
|
# 测试周次 (36字节 = 18寄存器)
|
|||
|
|
weeks_registers = fixed_registers[index:index + 18]
|
|||
|
|
weeks_str = utils.registers_to_string(weeks_registers).strip()
|
|||
|
|
dut_info['weeks'] = int(weeks_str) if weeks_str.isdigit() else 0
|
|||
|
|
index += 18
|
|||
|
|
|
|||
|
|
# 工单 (36字节 = 18寄存器)
|
|||
|
|
workOrder_registers = fixed_registers[index:index + 18]
|
|||
|
|
dut_info['workOrder'] = utils.registers_to_string(workOrder_registers).strip()
|
|||
|
|
|
|||
|
|
# 第二次读取:SN部分
|
|||
|
|
sn_list = []
|
|||
|
|
if sn_count > 0:
|
|||
|
|
# 计算SN部分的起始地址和数量
|
|||
|
|
sn_start_address = address_value + fixed_registers_count
|
|||
|
|
sn_registers_count = sn_count * 18 # 每个SN占36字节=18寄存器
|
|||
|
|
|
|||
|
|
response_sn = self.client.read_holding_registers(
|
|||
|
|
sn_start_address,
|
|||
|
|
sn_registers_count,
|
|||
|
|
self.plc_address
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if response_sn.isError():
|
|||
|
|
value_return['message'] = f"Failed to read SN registers: {response_sn}"
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
sn_registers = response_sn.registers
|
|||
|
|
|
|||
|
|
# 解析SN列表
|
|||
|
|
for i in range(sn_count):
|
|||
|
|
start_idx = i * 18
|
|||
|
|
end_idx = start_idx + 18
|
|||
|
|
if end_idx <= len(sn_registers):
|
|||
|
|
sn_part = sn_registers[start_idx:end_idx]
|
|||
|
|
# 检查是否全为0
|
|||
|
|
if all(reg == 0 for reg in sn_part):
|
|||
|
|
continue # 全0则放弃
|
|||
|
|
|
|||
|
|
sn_raw = utils.registers_to_string(sn_part).strip()
|
|||
|
|
sn = get_visible_string(sn_raw)
|
|||
|
|
if not sn:
|
|||
|
|
continue
|
|||
|
|
else:
|
|||
|
|
sn_list.append(sn)
|
|||
|
|
|
|||
|
|
# 根据SN数量组织返回数据
|
|||
|
|
if sn_count == 1:
|
|||
|
|
dut_info['SN'] = sn_list[0] if sn_list else ""
|
|||
|
|
value_return = {
|
|||
|
|
'status': 'success',
|
|||
|
|
'message': f"read station{station_no} dutInfo success",
|
|||
|
|
'data': dut_info
|
|||
|
|
}
|
|||
|
|
else:
|
|||
|
|
# 为每个SN创建一个完整的信息副本
|
|||
|
|
duts_info = []
|
|||
|
|
for sn in sn_list:
|
|||
|
|
dut_copy = dut_info.copy()
|
|||
|
|
dut_copy['SN'] = sn
|
|||
|
|
duts_info.append(dut_copy)
|
|||
|
|
|
|||
|
|
value_return = {
|
|||
|
|
'status': 'success',
|
|||
|
|
'message': f"read station{station_no} dutInfo success",
|
|||
|
|
'data': duts_info
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
value_return['error'] = str(e)
|
|||
|
|
print_with_timestamp(f"Error in read_station_dutInfo: {e}", color='red')
|
|||
|
|
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def read_station_dropheight(self, station_no):
|
|||
|
|
value_return = {'status': 'error', 'message': f"read station{station_no} dropheight error"}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['height'])
|
|||
|
|
resp = self.client.read_holding_registers(address_value, 0x1, self.plc_address)
|
|||
|
|
if resp.isError() is False and len(resp.registers) >= 1:
|
|||
|
|
value_return = {'status': 'success', 'message': f"read station{station_no} height success",
|
|||
|
|
'value': resp.registers[0]}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def read_station_result_counter(self, station_no):
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} result counter error"}
|
|||
|
|
try:
|
|||
|
|
if self.drop_register[station_no].get('stationResultCounter'):
|
|||
|
|
address_type, address_value = self.conv_register_address(
|
|||
|
|
self.drop_register[station_no]['stationResultCounter'])
|
|||
|
|
resp = self.client.read_holding_registers(address_value, 0x2, self.plc_address)
|
|||
|
|
# print_with_timestamp(f"dtMachine modbus read_station_result_counter "
|
|||
|
|
# f"{self.drop_register[station_no].get('stationResultCounter')} {resp}")
|
|||
|
|
if resp.isError() is False and len(resp.registers) >= 2:
|
|||
|
|
value_return = {'status': 'success', 'message': f"read station{station_no} result counter success",
|
|||
|
|
'value': (resp.registers[0] | resp.registers[1] * 0x10000)}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def read_station_cyclesFinished(self, station_no):
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} cyclesFinished error"}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['cyclesFinished'])
|
|||
|
|
resp = self.client.read_holding_registers(address_value, 0x1, self.plc_address)
|
|||
|
|
if resp.isError() is False and len(resp.registers) >= 1:
|
|||
|
|
value_return = {'status': 'success', 'message': f"read station{station_no} cyclesFinished success",
|
|||
|
|
'value': resp.registers[0]}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def read_station_cyclesFinishedBulk(self, station_no, result_index=0, length=33):
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} cyclesFinishedBulk error"}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['cyclesFinished'])
|
|||
|
|
address_value = address_value + PLCTestReqResultData.required_registers_size * result_index
|
|||
|
|
resp = self.client.read_holding_registers(address_value, length, self.plc_address)
|
|||
|
|
if resp.isError() is False and len(resp.registers) >= length:
|
|||
|
|
value_return = {'status': 'success', 'message': f"read station{station_no} cyclesFinishedBulk success",
|
|||
|
|
'value': resp.registers, 'address': address_value}
|
|||
|
|
"""
|
|||
|
|
nextTestResultBufferFlag = 0
|
|||
|
|
if result_index == 6:
|
|||
|
|
nextTestResultBufferFlag = 1
|
|||
|
|
if self.drop_register[station_no].get('resultSwitch') and result_index in [2, 6]:
|
|||
|
|
try:
|
|||
|
|
address_type, flag_address_value = self.conv_register_address(self.drop_register[station_no]['resultSwitch'])
|
|||
|
|
resp = self.client.write_register(flag_address_value, nextTestResultBufferFlag, self.plc_address)
|
|||
|
|
if resp.isError() is True:
|
|||
|
|
print_with_timestamp(f"set station {station_no} nextTestResultBufferFlag error!!!!!")
|
|||
|
|
except Exception as error:
|
|||
|
|
print_with_timestamp(f"write switch register error {error}", color='red') # cyx
|
|||
|
|
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
print_with_timestamp(f"read_station_cyclesFinishedBulk error {error}", color='red')
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def read_station_dropParamsBulk(self, station_no, length=33):
|
|||
|
|
value_return = {'status': 'error', 'message': f"set station{station_no} dropParamsBulk error"}
|
|||
|
|
direction_register = self.drop_register[station_no].get('direction')
|
|||
|
|
if direction_register is None or direction_register == "":
|
|||
|
|
value_return = {'status': 'success', 'message': f"drop params read is not demand"}
|
|||
|
|
return value_return
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['direction'])
|
|||
|
|
resp = self.client.read_holding_registers(address_value, length, self.plc_address)
|
|||
|
|
if resp.isError() is False and len(resp.registers) >= length:
|
|||
|
|
value_return = {'status': 'success', 'message': f"read station{station_no} dropParamsBulk success",
|
|||
|
|
'value': resp.registers}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def start_station(self, station_no):
|
|||
|
|
value_return = {'status': 'error', 'message': f"start station{station_no} error"}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['stop'])
|
|||
|
|
resp1 = self.client.write_coils(address_value, [0x00], self.plc_address)
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['start'])
|
|||
|
|
resp2 = self.client.write_coils(address_value, [0x01], self.plc_address)
|
|||
|
|
if resp1.isError() is False and resp2.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"start station{station_no} success"}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def resume_station(self, station_no):
|
|||
|
|
value_return = {'status': 'error', 'message': f"resume station{station_no} error"}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['stop'])
|
|||
|
|
resp1 = self.client.write_coils(address_value, [0x00], self.plc_address)
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['start'])
|
|||
|
|
resp2 = self.client.write_coils(address_value, [0x01], self.plc_address)
|
|||
|
|
if resp1.isError() is False and resp2.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"resume station{station_no} success"}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def stop_station(self, station_no):
|
|||
|
|
value_return = {'status': 'error', 'message': f"stop station{station_no} error"}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['start'])
|
|||
|
|
resp1 = self.client.write_coils(address_value, [0x00], self.plc_address)
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['stop'])
|
|||
|
|
resp2 = self.client.write_coils(address_value, [0x01], self.plc_address)
|
|||
|
|
if resp1.isError() is False and resp2.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"stop station{station_no} success"}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def station_start_status(self, station_no):
|
|||
|
|
value_return = {'status': 'error'}
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['start'])
|
|||
|
|
resp1 = self.client.read_coils(address_value, 0x01, self.plc_address)
|
|||
|
|
if resp1.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f" station{station_no} status success",
|
|||
|
|
'value': resp1.bits[0]}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def station_pause_status(self, station_no): # 不管是上位机还是HMI,如果暂停了机器,需要去设置这个值
|
|||
|
|
value_return = {'status': 'error'}
|
|||
|
|
if self.drop_register[station_no].get('pause') is None:
|
|||
|
|
value_return = {'status': 'error', 'message': "register is not exist"}
|
|||
|
|
return value_return
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['pause'])
|
|||
|
|
resp1 = self.client.read_coils(address_value, 0x01, self.plc_address)
|
|||
|
|
if resp1.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f" station{station_no} status success",
|
|||
|
|
'value': resp1.bits[0]}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def station_stop_status(self, station_no): # 读PLC 停止试验的寄存器
|
|||
|
|
value_return = {'status': 'error'}
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['stop'])
|
|||
|
|
resp1 = self.client.read_coils(address_value, 0x01, self.plc_address)
|
|||
|
|
if resp1.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f" station{station_no} status success",
|
|||
|
|
'value': resp1.bits[0]}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def station_run_status(self, station_no): # 读PLC 停止试验的寄存器
|
|||
|
|
value_return = {'status': 'error', 'value': 0x00}
|
|||
|
|
|
|||
|
|
if 'runStatus' not in self.drop_register[station_no]:
|
|||
|
|
return value_return
|
|||
|
|
try:
|
|||
|
|
address_type, address_value = self.conv_register_address(self.drop_register[station_no]['runStatus'])
|
|||
|
|
resp1 = self.client.read_coils(address_value, 0x01, self.plc_address)
|
|||
|
|
if resp1.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f" station{station_no} status success",
|
|||
|
|
'value': resp1.bits[0]}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def read_plc_connect_state(self):
|
|||
|
|
value_return = {'status': 'error', 'message': f"get plc connect state error--0", 'value': 0x00}
|
|||
|
|
"""
|
|||
|
|
if self.is_connected_to_tcp_server() is False and self.preConnection:
|
|||
|
|
print_with_timestamp(f"Plc {self.comm}-{self.tcp_modbus_unit}-{self.plc_address} disconnect", color='red')
|
|||
|
|
if self.is_connected_to_tcp_server() and self.preConnection is None:
|
|||
|
|
print_with_timestamp(f"Plc {self.comm}-{self.tcp_modbus_unit}-{self.plc_address} connectted", color='green')
|
|||
|
|
self.preConnection = self.connection
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
if self.is_connected_to_tcp_server() is False:
|
|||
|
|
value_return = {'status': 'error',
|
|||
|
|
'message': f"get plc connect state error modbus tcp client is not connectted"}
|
|||
|
|
return value_return
|
|||
|
|
try:
|
|||
|
|
type, address_value = self.conv_register_address(PLC_CONNECT_REGISTER)
|
|||
|
|
resp = self.client.read_holding_registers(address_value, 0x9, self.plc_address) # 10 定义为PLC 连接状态寄存器
|
|||
|
|
# print_with_timestamp(f"dtMachine_modbus read_plc_connect_state {self.comm} {resp.registers}")
|
|||
|
|
if resp.isError() is False and len(resp.registers) >= 9: # 20250905 读取5个 叠加运行状态
|
|||
|
|
value_return = {'status': 'success', 'message': f"get plc connect state success",
|
|||
|
|
'value': resp.registers}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
except Exception as error:
|
|||
|
|
value_return['error1'] = error
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def read_plc_read_index(self):
|
|||
|
|
value_return = {'status': 'error', 'message': f"get plc connect state error--0", 'value': 0x00}
|
|||
|
|
"""
|
|||
|
|
if self.is_connected_to_tcp_server() is False and self.preConnection:
|
|||
|
|
print_with_timestamp(f"Plc {self.comm}-{self.tcp_modbus_unit}-{self.plc_address} disconnect", color='red')
|
|||
|
|
if self.is_connected_to_tcp_server() and self.preConnection is None:
|
|||
|
|
print_with_timestamp(f"Plc {self.comm}-{self.tcp_modbus_unit}-{self.plc_address} connectted", color='green')
|
|||
|
|
self.preConnection = self.connection
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
if self.is_connected_to_tcp_server() is False:
|
|||
|
|
value_return = {'status': 'error',
|
|||
|
|
'message': f"get plc connect state error modbus tcp client is not connectted"}
|
|||
|
|
return value_return
|
|||
|
|
try:
|
|||
|
|
type, address_value = self.conv_register_address(PLC_READ_INDEX_REGISTER)
|
|||
|
|
resp = self.client.read_holding_registers(address_value, 0x4, self.plc_address) # 15 定义为PLC
|
|||
|
|
# print_with_timestamp(f"dtMachine_modbus read_plc_connect_state {self.comm} {resp.registers}")
|
|||
|
|
if resp.isError() is False and len(resp.registers) >= 4:
|
|||
|
|
value_return = {'status': 'success', 'message': f"get plc read index success",
|
|||
|
|
'value': resp.registers}
|
|||
|
|
except ModbusException as error:
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
except Exception as error:
|
|||
|
|
value_return['error1'] = error
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
def register_test(self, action, address, value):
|
|||
|
|
type, address_value = self.conv_register_address(address)
|
|||
|
|
if (type == 'Data_Register' or type == 'Holding_Register') and action.lower() == 'read':
|
|||
|
|
value_return = {'status': 'error', 'message': f"register test {action}:{address} error"}
|
|||
|
|
try:
|
|||
|
|
resp = self.client.read_holding_registers(address_value, 0x1, self.plc_address)
|
|||
|
|
if resp.isError() is False and len(resp.registers) >= 1:
|
|||
|
|
value_return = {'status': 'success', 'message': f"register test {action}:{address} success",
|
|||
|
|
'value': resp.registers[0]}
|
|||
|
|
except pymodbus.exceptions.ModbusIOExceptio as error:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value,
|
|||
|
|
self.client.get_last_error()) # cyx
|
|||
|
|
value_return['error'] = error
|
|||
|
|
except Exception as error:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value,
|
|||
|
|
self.client.get_last_error()) # cyx
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
if (type == 'Data_Register' or type == 'Holding_Register') and action.lower() == 'write':
|
|||
|
|
try:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value, address_value) # cyx
|
|||
|
|
resp = self.client.write_register(address_value, int(value), self.plc_address)
|
|||
|
|
if resp.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"register test {action}:{address} success"}
|
|||
|
|
except ModbusIOException as error:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value,
|
|||
|
|
self.client.get_last_error()) # cyx
|
|||
|
|
value_return['error'] = error
|
|||
|
|
except Exception as error:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value,
|
|||
|
|
self.client.get_last_error()) # cyx
|
|||
|
|
value_return['error'] = error
|
|||
|
|
pass
|
|||
|
|
return value_return
|
|||
|
|
|
|||
|
|
if type == 'Coil' and action.lower() == 'read':
|
|||
|
|
try:
|
|||
|
|
resp = self.client.read_coils(address_value, 0x01, self.plc_address)
|
|||
|
|
if resp.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f" register test {action}:{address} success",
|
|||
|
|
'value': resp.bits[0]}
|
|||
|
|
except ModbusIOException as error:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value,
|
|||
|
|
self.client.get_last_error()) # cyx
|
|||
|
|
value_return['error'] = error
|
|||
|
|
except Exception as error:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value,
|
|||
|
|
self.client.get_last_error()) # cyx
|
|||
|
|
value_return['error'] = error
|
|||
|
|
return value_return
|
|||
|
|
if type == 'Coil' and action.lower() == 'write':
|
|||
|
|
try:
|
|||
|
|
resp = self.client.write_coils(address_value, [value], self.plc_address)
|
|||
|
|
if resp.isError() is False:
|
|||
|
|
value_return = {'status': 'success', 'message': f"register test {action}:{address} success"}
|
|||
|
|
except ModbusIOException as error:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value,
|
|||
|
|
self.client.get_last_error()) # cyx
|
|||
|
|
value_return['error'] = error
|
|||
|
|
except Exception as error:
|
|||
|
|
print_with_timestamp('ModbusDtMachine register_test', action, address, value,
|
|||
|
|
self.client.get_last_error()) # cyx
|
|||
|
|
value_return['error'] = error
|
|||
|
|
return value_return
|
|||
|
|
pass
|