Files
dtm-py-all/mhi_plc.py

323 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import array
import asyncio
import sys
import time
import datetime
import keyboard
import serial
import requests
import threading
import argparse
import re
import utils
class MitsubishiPLC:
    """Serial-port client that exchanges ASCII request/reply frames with a PLC.

    Request frames are built as::

        ENQ | station (2 hex chars) | PC no. (2 hex chars) | command (e.g. 'WR')
            | '0' | address (5 chars, e.g. 'D0160') | count (2 chars)
            | [data ...] | checksum (2 hex chars)

    where the checksum is the low byte of the sum of every byte after ENQ.
    Data replies are parsed as ``STX | 4 header chars | data | ETX | checksum``
    (8 bytes of overhead around the data).

    NOTE(review): this layout looks like the Mitsubishi computer-link
    (dedicated protocol) ASCII format - confirm against the PLC manual.

    All ``plc_*`` read/write methods report failures through a result dict
    (``{"status": "error", "msg": ...}``) instead of raising.
    """

    # Control codes that may appear as the first byte of a frame.
    STX = 0x02  # Start of Text
    ETX = 0x03  # End of Text
    EOT = 0x04  # End of Transmission
    ENQ = 0x05  # Enquiry
    ACK = 0x06  # Acknowledge
    NAK = 0x15  # Negative Acknowledge
    # Also part of the protocol's code table but not used by this class:
    # LF = 0x0A (Line Feed), CL = 0x0C (Clear), CR = 0x0D (Carriage Return).

    _CONTROL_CODES = (ENQ, STX, ETX, EOT, ACK, NAK)
    _HEX_DIGITS = frozenset(b'0123456789abcdefABCDEF')

    # Port tried when the requested port cannot be opened.
    FALLBACK_PORT = "com3"
    # Period (seconds) of the link-down watchdog timer.
    LINK_DOWN_INTERVAL = 5.0
    # Smallest number of bytes requested from the port for any reply.
    MIN_READ_BYTES = 20
    # Bytes surrounding the data in a data reply: STX + 4 header chars
    # + ETX + 2 checksum chars.
    _REPLY_OVERHEAD = 8
    # Characters per item in the reply data, keyed by read command.
    _REPLY_ITEM_CHARS = {'WR': 4, 'BR': 1}

    def __init__(self, plc_port, comm_config):
        """Open the serial port, falling back to ``FALLBACK_PORT`` on failure.

        Args:
            plc_port: name of the serial port to open (e.g. ``"com1"``).
            comm_config: mapping with the keys ``'baudrate'``, ``'parity'``,
                ``'bytesize'`` and ``'stopbits'`` passed to ``serial.Serial``.

        Opening is best-effort: if neither port can be opened the instance is
        still created, ``port_is_opened()`` returns False and every I/O method
        returns an error result.  ``self.plc_port`` always keeps the requested
        name, even when the fallback port is the one that was opened.
        """
        self.event_format_long = False
        self.plc_port = plc_port
        self.baudrate = comm_config['baudrate']
        # Always define these so later calls never hit AttributeError when
        # the port could not be opened.
        self.ser = None
        self.link_down_timer = None
        self.link_down_event = threading.Event()

        port_opened = False
        try:
            self.ser = self._open_serial(plc_port, comm_config)
        except Exception as error:
            # Broad on purpose: a bad config (KeyError/ValueError) and a
            # missing device (SerialException) both trigger the fallback.
            print("Mhi DevicePLC open error", error)  # cyx
            utils.condition_print("\033[91m {}\033[00m".format("plc {} open error".format(plc_port)))
            try:
                self.ser = self._open_serial(self.FALLBACK_PORT, comm_config)
            except Exception:
                print("\033[91m {}\033[00m".format("plc " + self.FALLBACK_PORT + " open error"))
            else:
                port_opened = True
                print("\033[92m {}\033[00m".format("plc " + self.FALLBACK_PORT + " open success"))
        else:
            port_opened = True
            print(f"Mhi DevicePLC open {plc_port} {comm_config['baudrate']} {comm_config['bytesize']} success")  # cyx
            print("\033[92m {}\033[00m".format("plc {} open success--1".format(plc_port)))

        if port_opened:
            # The watchdog is prepared but deliberately not started here;
            # reset_link_down_timer() arms it.
            self.link_down_timer = self._new_link_down_timer()

    @staticmethod
    def _open_serial(port, comm_config):
        """Open ``port`` with the settings in ``comm_config`` (1 s timeout, no RTS/CTS)."""
        return serial.Serial(port=port,
                             baudrate=comm_config['baudrate'],
                             parity=comm_config['parity'],
                             bytesize=comm_config['bytesize'],
                             stopbits=comm_config['stopbits'],
                             timeout=1, rtscts=False)

    def _new_link_down_timer(self):
        """Create (without starting) a one-shot timer that calls link_down_timeout()."""
        return threading.Timer(self.LINK_DOWN_INTERVAL, self.link_down_timeout)

    def conv_to_plc_address(self, type_char, value):
        """Format a device type and number as a 5-character address.

        Args:
            type_char: ``'M'`` or ``'D'``.
            value: device number, an integer in ``0..9999``.

        Returns:
            The type letter followed by the zero-padded 4-digit number,
            e.g. ``('D', 160) -> 'D0160'``.

        Raises:
            ValueError: if the type or value is not acceptable.
        """
        if type_char not in ('M', 'D') or not isinstance(value, int):
            raise ValueError("Type must be 'M' or 'D' and value must be an integer.")
        # Frames carry exactly 5 address characters, so the number has to
        # fit in 4 digits and cannot be negative.
        if not 0 <= value <= 9999:
            raise ValueError("Address value must be in the range 0..9999.")
        return "{type}{value:04d}".format(type=type_char, value=value)

    def readWithTimeOut(self, size, timeout):
        """Read up to ``size`` bytes using a temporary port timeout.

        The port's previous timeout is always restored.

        Args:
            size: maximum number of bytes to read.
            timeout: read timeout in seconds for this call only.

        Returns:
            ``(success, data, message)``.  ``success`` is False when the read
            raised a serial error or returned no bytes at all; a partial read
            counts as success because replies are shorter than ``size``.
        """
        origin_timeout = self.ser.timeout
        data = None
        success = True
        message = ""
        try:
            self.ser.timeout = timeout
            data = self.ser.read(size)
        except serial.SerialException as error:
            success = False
            message = f"serial read error: {error}"
        else:
            # pyserial does not raise on a read timeout; it simply returns
            # fewer bytes than requested (possibly none).
            if not data:
                success = False
                message = "serial read timeout"
        finally:
            self.ser.timeout = origin_timeout
        return success, data, message

    def port_is_opened(self):
        """Return True when a serial port object exists and reports itself open."""
        ser = getattr(self, 'ser', None)
        if not ser:
            return False
        # Objects without an ``is_open`` attribute are treated as open.
        return bool(getattr(ser, 'is_open', True))

    def get_connect_state(self, connect_state_address, slave):
        """Probe the PLC by reading one word and report whether it answered.

        Args:
            connect_state_address: address of the word to read.
            slave: PLC station number to address.

        Returns:
            1 if a valid one-word reply was received, otherwise 0.
            Never raises.
        """
        if not self.port_is_opened():
            return 0
        try:
            result = self.plc_read('WR', slave, 0xff, connect_state_address, 0x01)
            if result and result.get('status') == 'success' and 'data' in result:
                # A one-word reply is the fixed overhead plus 4 data chars.
                if len(result['data']) == self._REPLY_OVERHEAD + 4:
                    return 1
        except Exception:
            # Connection probing is best-effort: any failure means "down".
            return 0
        return 0

    def link_down_timeout(self):
        """Timer callback: flag the link as down and re-arm the watchdog."""
        self.link_down_event.set()
        self.link_down_timer = self._new_link_down_timer()
        self.link_down_timer.start()

    def reset_link_down_timer(self):
        """Cancel any pending watchdog timer and start a fresh one."""
        if self.link_down_timer:
            self.link_down_timer.cancel()
        self.link_down_timer = self._new_link_down_timer()
        self.link_down_timer.start()

    def send_heartBit(self):
        """Placeholder for a heartbeat message; currently does nothing."""
        return None

    def crc16_ccitt(self, data: bytearray):  # CRC16/CCITT
        """Compute a 16-bit CRC over ``data``, processing one nibble at a time.

        Starts from 0 and, for each byte, folds in the low nibble and then
        the high nibble using the multiplier 0x1081.

        Args:
            data: iterable of byte values (0..255).

        Returns:
            The CRC as an int.
        """
        crcval = 0x0000
        for byte in data:
            for nibble_source in (byte, byte >> 4):
                q = (crcval ^ nibble_source) & 0x0F
                crcval = (crcval >> 4) ^ (q * 0x1081)
        return crcval

    def _frame_header(self, command, plc_no, pc_no, address, count):
        """Build the part common to read and write requests (ENQ .. count)."""
        frame = [self.ENQ]
        frame.extend(utils.hex_to_ascii_arr(plc_no, 2))
        frame.extend(utils.hex_to_ascii_arr(pc_no, 2))
        frame.extend(ord(c) for c in command)
        frame.append(0x30)  # ASCII '0', placed between command and address
        frame.extend(utils.str_to_ascii_array(address, 5))  # eg.D0160
        frame.extend(utils.int_to_ascii_array(count, 2))
        return frame

    def _append_checksum(self, frame):
        """Append the 2-char checksum: low byte of the sum of all bytes after ENQ."""
        checksum = sum(frame[1:]) & 0xFF
        frame.extend(utils.hex_to_ascii_arr(checksum, 2))
        return frame

    def plc_read_frame(self, command, plc_no=0x01, pc_no=0xFF, address='D0160', lenght=0x02):
        """Build a read request frame.

        Args:
            command: two-letter command, e.g. ``'WR'`` or ``'BR'``.
            plc_no: PLC station number.
            pc_no: PC number.
            address: 5-character start address, e.g. ``'D0160'``.
            lenght: number of items to read (parameter name kept for callers).

        Returns:
            The frame as a list of byte values.
        """
        frame = self._append_checksum(
            self._frame_header(command, plc_no, pc_no, address, lenght))
        utils.condition_print('frame intent to read plc: [', ', '.join('{:02x}'.format(x) for x in frame), ']')
        return frame

    def plc_write_frame(self, command, plc_no=0x01, pc_no=0xFF, address='D0160', date_len=0x02, data=None):
        """Build a write request frame.

        Args:
            command: two-letter command; ``'BW'`` encodes each value as one
                character, any other command as four.
            plc_no: PLC station number.
            pc_no: PC number.
            address: 5-character start address, e.g. ``'D0160'``.
            date_len: number of values to write (parameter name kept for callers).
            data: sequence holding at least ``date_len`` values; a single int
                is accepted as a one-element sequence, ``None`` as empty.

        Returns:
            The frame as a list of byte values.

        Raises:
            ValueError: if ``data`` holds fewer than ``date_len`` values.
        """
        if data is None:
            values = []
        elif isinstance(data, int):
            values = [data]
        else:
            values = list(data)
        count = max(date_len, 0)
        if len(values) < count:
            raise ValueError(
                "data holds {} value(s) but date_len is {}".format(len(values), date_len))

        chars_per_value = 1 if command == 'BW' else 4
        frame = self._frame_header(command, plc_no, pc_no, address, date_len)
        for value in values[:count]:
            frame.extend(utils.hex_to_ascii_arr(value, chars_per_value))
        self._append_checksum(frame)
        utils.condition_print('frame intend to write plc: [', ', '.join('{:02x}'.format(x) for x in frame), ']')
        return frame

    def frame_is_valid(self, frame):
        """Check the structure and checksum of a received frame.

        A frame is accepted when:
          * it has at least 5 bytes and starts with a known control code;
          * its last two bytes parse as a hexadecimal number;
          * an STX frame has ETX as its third-from-last byte;
          * for anything other than ACK/NAK, the trailing number equals the
            low byte of the sum of bytes ``frame[1:-2]`` and every byte
            between the control code and the ETX/checksum is a hex digit.

        ACK and NAK frames are accepted without the checksum comparison.

        Args:
            frame: bytes-like or list of byte values; ``None`` is invalid.

        Returns:
            True if the frame passes all checks, else False.
        """
        if not frame or len(frame) < 5:
            return False
        try:
            control_code = frame[0]
            if control_code not in self._CONTROL_CODES:
                utils.condition_print("First byte is not a control code.")
                return False

            # The last two bytes are ASCII hex digits holding the checksum.
            checksum = int(''.join(chr(b) for b in frame[-2:]), 16)
            has_etx = frame[-3] == self.ETX

            if control_code == self.STX and not has_etx:
                return False
            if control_code in (self.ACK, self.NAK):
                return True

            # Low 8 bits of the sum of everything between the control code
            # and the checksum characters (ETX included when present).
            checksum_calc = sum(frame[1:-2]) & 0xFF
            if checksum != checksum_calc:
                utils.condition_print("Last two bytes do not match checksum.", checksum, checksum_calc, frame[-2] - 0x30,
                                      frame[-1] - 0x30)
                return False

            payload_end = -3 if has_etx else -2
            for byte in frame[1:payload_end]:
                if byte not in self._HEX_DIGITS:
                    utils.condition_print(f'{chr(byte)} ({byte}) is not a valid character.')
                    return False
        except (ValueError, TypeError, IndexError):
            # Unparsable checksum or non-byte content: treat as invalid.
            return False
        return True

    def _reply_size(self, command, length):
        """Return how many bytes to request for the reply to a read command.

        At least ``MIN_READ_BYTES``; larger when the expected data reply
        (overhead plus per-item characters) would not fit in that.
        """
        item_chars = self._REPLY_ITEM_CHARS.get(command)
        if item_chars is None or not isinstance(length, int):
            return self.MIN_READ_BYTES
        return max(self.MIN_READ_BYTES, self._REPLY_OVERHEAD + item_chars * length)

    def _send_and_receive(self, frame, read_bytes):
        """Write ``frame`` and read the reply; returns readWithTimeOut()'s tuple."""
        self.ser.write(frame)
        # Timeout scales with the reply size and baud rate.
        # NOTE(review): 11 is presumably bits per character and 3 a safety
        # factor - confirm.
        timeout = read_bytes * 11 * 1 / self.baudrate * 3
        return self.readWithTimeOut(read_bytes, timeout)

    def plc_write(self, command='WW', plc_no=0x01, pc_no=0xFF, address='D0160', lenght=0x02, data=0x00000000):
        """Send a write command and wait for the PLC's reply.

        Args:
            command: ``'WW'`` (words) or ``'BW'`` (bits).
            plc_no: PLC station number.
            pc_no: PC number.
            address: 5-character start address.
            lenght: number of values to write (parameter name kept for callers).
            data: sequence of values to write (see plc_write_frame()).

        Returns:
            ``{"status": "success"}`` or ``{"status": "error", "msg": ...}``.
            Never raises.
        """
        if not self.port_is_opened():
            return {"status": "error", "msg": "serial com is not open!"}
        try:
            frame = self.plc_write_frame(command, plc_no, pc_no, address, lenght, data)
            success, recv_frame, message = self._send_and_receive(frame, self.MIN_READ_BYTES)
            if not success:
                print("mhi plc write error", plc_no, address, lenght, data)  # cyx
                return {"status": "error", "msg": message}
            if not self.frame_is_valid(recv_frame):
                utils.condition_print(f"recv frame is not valid recv frame len={len(recv_frame)}")
                return {"status": "error", "msg": "recv frame is not valid"}
            utils.condition_print('--read data return: [', ', '.join('{:02x}'.format(x) for x in recv_frame), ']')
            return {"status": "success"}
        except Exception as error:
            # Report the real cause instead of claiming the port is closed.
            return {"status": "error", "msg": f"plc write failed: {error}"}

    def plc_read(self, command, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Send a read command and return the raw, validated reply frame.

        Args:
            command: ``'WR'`` (words) or ``'BR'`` (bits).
            plc_no: PLC station number.
            pc_no: PC number.
            address: 5-character start address.
            length: number of items to read.

        Returns:
            ``{"status": "success", "data": <reply frame>}`` or
            ``{"status": "error", "msg": ...}``.  Never raises.
        """
        if not self.port_is_opened():
            return {"status": "error", "msg": "serial com is not open!"}
        try:
            frame = self.plc_read_frame(command, plc_no, pc_no, address, length)
            # Ask for enough bytes to hold the whole expected reply so that
            # longer reads are not truncated.
            read_bytes = self._reply_size(command, length)
            success, recv_frame, message = self._send_and_receive(frame, read_bytes)
            if not success:
                return {"status": "error", "msg": message}
            if not self.frame_is_valid(recv_frame):
                utils.condition_print(f"recv frame is not valid frame len={len(recv_frame)}")
                return {"status": "error", "msg": "recv frame is not valid"}
            utils.condition_print('--read data return: [', ', '.join('{:02x}'.format(x) for x in recv_frame), ']')
            return {"status": "success", "data": recv_frame}
        except Exception as error:
            # Report the real cause instead of claiming the port is closed.
            return {"status": "error", "msg": f"plc read failed: {error}"}

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` words starting at ``address``.

        Returns:
            ``{'status': 'success', 'length': <int>, 'data': [word, ...]}``
            when the reply holds exactly ``length`` words, otherwise
            ``{'status': 'error'}``.
        """
        utils.condition_print(f'mhi read words {address} {length} ')
        resp = {'status': 'error'}
        result = self.plc_read('WR', plc_no, pc_no, address, length)
        if result and result.get('status') == 'success' and 'data' in result:
            frame = result['data']
            # Each word occupies 4 characters, starting at offset 5.
            if len(frame) - self._REPLY_OVERHEAD == 4 * length:
                value = [utils.ascii_array_to_word(frame[5 + 4 * i:5 + 4 * (i + 1)]) for i in range(length)]
                resp = {'status': 'success', 'length': length, 'data': value}
        return resp

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read ``length`` bits starting at ``address``.

        Returns:
            ``{'status': 'success', 'length': <int>, 'data': [bit, ...]}``
            when the reply holds exactly ``length`` bit characters, otherwise
            ``{'status': 'error'}``.
        """
        utils.condition_print(f'mhi read bits {address} {length} ')
        resp = {'status': 'error'}
        result = self.plc_read('BR', plc_no, pc_no, address, length)
        utils.condition_print('plc_read_bits result', result)  # cyx
        if result and result.get('status') == 'success' and 'data' in result:
            frame = result['data']
            # Each bit is one ASCII digit, starting at offset 5.
            if len(frame) - self._REPLY_OVERHEAD == length:
                value = [frame[5 + i] - 0x30 for i in range(length)]
                resp = {'status': 'success', 'length': length, 'data': value}
        return resp

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write one word to ``address``; returns plc_write()'s result dict."""
        utils.condition_print(f'mhi write word {address} {data} ')
        return self.plc_write('WW', plc_no, pc_no, address, 0x01, [data])

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write a 32-bit value as two words, high word first.

        Returns plc_write()'s result dict.
        """
        utils.condition_print(f'mhi write dword {address} {data} ')
        # Mask both halves so each is a single 16-bit word.
        words = [(data >> 16) & 0xFFFF, data & 0xFFFF]
        return self.plc_write('WW', plc_no, pc_no, address, 0x02, words)

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=None):
        """Write ``length`` words from ``data`` starting at ``start_addr``.

        ``data`` defaults to an empty sequence.  Returns plc_write()'s
        result dict.
        """
        if data is None:
            data = []
        utils.condition_print(f'mhi write words {start_addr} {length} ', data)
        return self.plc_write('WW', plc_no, pc_no, start_addr, length, data)

    def plc_write_bit(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00):
        """Write one bit to ``address``; returns plc_write()'s result dict."""
        utils.condition_print(f'mhi write bit {address} {data} ')
        return self.plc_write('BW', plc_no, pc_no, address, 0x1, [data])