Files
dtm-py-all/bk/plc_comm.py

475 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import array
import time
import serial
import threading
import binascii
import datetime
import struct
import requests
from queue import Queue
# Debug switch read by condition_print(): messages are printed only when this is True.
print_condition = False
# NOTE(review): not read anywhere in the visible code; DevicePLC.__init__ only declares it global.
all_plc_recv = 0
# ANSI terminal color escape codes
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
RESET = '\033[m'
def bcd_coder_array(value=0, lenght=4):
    """Encode *value* as packed-BCD bytes, zero-padded on the left to *lenght* entries.

    Thin alias of ``int_to_bcd``; the (misspelled) ``lenght`` keyword is part of
    the public signature and is therefore kept as-is.
    """
    encoded = int_to_bcd(number=value, length=lenght)
    return encoded
def int_to_bcd(number=0, length=5):
    """Return *number* as a list of packed-BCD bytes (two decimal digits per byte).

    The value is rounded to an integer with the built-in ``round`` (which rounds
    exact halves to the nearest even integer), split into base-100 groups from the
    least significant end, and each group is packed as ``tens << 4 | ones``.

    Args:
        number: Value to encode. Anything that cannot be rounded to an integer
            (non-numeric objects, NaN, infinity) is encoded as zero, as are
            zero and negative values.
        length: Minimum number of bytes in the result; shorter encodings are
            left-padded with 0. Longer encodings are NOT truncated.

    Returns:
        List of ints, most significant byte first.
    """
    try:
        remaining = int(round(number))
    except (TypeError, ValueError, ArithmeticError):
        # Best-effort contract kept from the original: unusable input encodes as
        # all zeros, but interpreter-level signals (KeyboardInterrupt, ...) are
        # no longer swallowed.
        remaining = 0

    packed = []
    while remaining > 0:
        remaining, pair = divmod(remaining, 100)
        packed.append((pair // 10) << 4 | (pair % 10))
    packed.reverse()  # groups were collected least-significant first

    padding = []
    while len(padding) + len(packed) < length:
        padding.append(0)
    return padding + packed
def int_to_ascii_array(number, length):
    """Return the character codes of ``str(number)``, left-padded with '0' to *length*.

    Text that is already *length* characters or longer is returned unpadded and
    untruncated.
    """
    padded = str(number).rjust(length, '0')
    return list(map(ord, padded))
def str_to_ascii_array(str, length):
    """Return the character codes of *str*, left-padded with '0' to *length*.

    The original implementation computed the padded text but then encoded the
    unpadded argument, so short inputs were never padded; the padded text is
    now the one that gets encoded. Inputs already *length* characters or longer
    are returned unchanged (no truncation).

    Args:
        str: Text to encode (the parameter name shadows the builtin and is kept
            only for backward compatibility with keyword callers).
        length: Minimum number of characters in the result.

    Returns:
        List of ``ord`` values, one per character.
    """
    padded = str.rjust(length, '0')
    return [ord(ch) for ch in padded]
def hex_to_ascii_arr(hex_num, lenth):
    """Return the character codes of *hex_num* written as upper-case hex digits.

    The digit string is left-padded with '0' to *lenth* characters; values that
    need more digits than *lenth* are not truncated.
    """
    digits = hex(hex_num)[2:].upper()
    padded = digits.rjust(lenth, '0')
    return list(map(ord, padded))
def convert_ascii_to_num(ascii_val):
    """Map the character code of one hex digit to its value 0-15.

    Accepts '0'-'9', 'A'-'F' and 'a'-'f'; every other code yields 0.
    """
    digit_ranges = (
        (ord('0'), ord('9'), 0),
        (ord('A'), ord('F'), 10),
        (ord('a'), ord('f'), 10),
    )
    for low, high, offset in digit_ranges:
        if low <= ascii_val <= high:
            return ascii_val - low + offset
    return 0
def ascii_array_to_word(ascii_array):
    """Decode four hex-digit character codes (most significant first) into an int.

    Returns 0 when *ascii_array* does not contain exactly four entries.
    """
    if len(ascii_array) != 4:
        return 0
    word = 0
    for code in ascii_array:
        # Shift the accumulated nibbles up and append the next one.
        word = (word << 4) + convert_ascii_to_num(code)
    return word
def date_to_bcd(date_str):
    """Convert a dash-separated date string into a list of BCD-style bytes.

    Dashes are dropped, the remaining text is cut into two-character groups and
    each group is read as a hexadecimal number, so "2024-01-15" becomes
    [0x20, 0x24, 0x01, 0x15].
    """
    compact = "".join(date_str.split("-"))
    encoded = []
    for start in range(0, len(compact), 2):
        encoded.append(int(compact[start:start + 2], 16))
    return encoded
def time_to_bcd(time_str):
    """Convert a time string such as "10:06:20.585233" into BCD-style bytes.

    The fractional part after the first '.' is discarded; each ':'-separated
    field is parsed as a decimal integer and its decimal digits are then
    re-read as hexadecimal, so the example yields [0x10, 0x06, 0x20].
    """
    clock, _, _ = time_str.partition('.')
    return [int(str(int(field)), 16) for field in clock.split(':')]
def condition_print(*args, **kwargs):
    """Print a debug message when the module-level ``print_condition`` flag is set.

    Positional arguments are joined with single spaces; keyword arguments, if
    any, are appended as ", key=value" pairs. The message is always built, and
    only printed when ``print_condition`` is truthy.
    """
    text = ' '.join(map(str, args))
    if kwargs:
        pairs = ["{}={}".format(key, value) for key, value in kwargs.items()]
        text = text + ', ' + ', '.join(pairs)
    if print_condition:
        print(text)
class DevicePLC:
    """Serial link to a PLC that speaks an ASCII, sum-checked request/response protocol.

    Requests start with ENQ (0x05) followed by ASCII station number, PC number,
    a two-letter command, a wait character, a device address, a point count,
    optional data and a two-hex-digit sum check. Replies are validated with
    ``frame_is_valid``.
    NOTE(review): the frame layout looks like the Mitsubishi FX "computer link"
    format -- confirm against the PLC manual.

    Control codes used by the protocol:
        Symbol  Description            Code (hexadecimal)
        STX     Start of Text          02H
        ETX     End of Text            03H
        EOT     End of Transmission    04H
        ENQ     Enquiry                05H
        ACK     Acknowledge            06H
        LF      Line Feed              0AH
        CL      Clear                  0CH
        CR      Carriage Return        0DH
        NAK     Negative Acknowledge   15H
    """

    # First-byte values accepted by frame_is_valid().
    _CONTROL_CODES = (0x05, 0x02, 0x03, 0x04, 0x06, 0x15)
    # Characters allowed between the control code and the trailer of a reply.
    _HEX_CHARS = '0123456789abcdefABCDEF'
    # Pause after the first reply byte so the rest of the frame can arrive.
    _SETTLE_DELAY_S = 0.06
    # Upper bound on the time spent draining the receive buffer.
    _REPLY_TIMEOUT_S = 2
    # Period of the link-down watchdog timer.
    _LINK_DOWN_PERIOD_S = 5.0

    def __init__(self, plc_port, comm_config):
        """Open *plc_port*; if that fails, fall back to "com1".

        Args:
            plc_port: Name of the serial port to open.
            comm_config: Mapping with 'baudrate', 'parity', 'bytesize' and
                'stopbits' entries that are passed to ``serial.Serial``.

        Failure to open either port is logged through ``condition_print`` and
        leaves the instance without a ``ser`` attribute; it is not raised.
        """
        print("DevicePLC init")
        self.event_format_long = False
        # Created unconditionally so link_down_timeout()/reset_link_down_timer()
        # never hit a missing attribute when no port could be opened.
        self.link_down_timer = None
        self.link_down_event = threading.Event()

        port_opened = False
        try:
            self.ser = serial.Serial(port=plc_port,
                                     baudrate=comm_config['baudrate'],
                                     parity=comm_config['parity'],
                                     bytesize=comm_config['bytesize'],
                                     stopbits=comm_config['stopbits'],
                                     timeout=1, rtscts=False)
        except Exception:
            # Deliberate best effort: any failure (bad config, busy/missing
            # port) triggers the fallback instead of aborting construction.
            condition_print("\033[91m {}\033[00m".format("plc {} open error".format(plc_port)))
            try:
                # The original referenced an undefined name ``baudrate`` here,
                # so the fallback could never succeed; use the configured rate.
                self.ser = serial.Serial(port="com1",
                                         baudrate=comm_config['baudrate'],
                                         parity=serial.PARITY_EVEN,
                                         bytesize=serial.EIGHTBITS,
                                         stopbits=serial.STOPBITS_ONE,
                                         timeout=2, rtscts=False)
            except Exception:
                condition_print("\033[91m {}\033[00m".format("plc " + "com1" + " open error"))
            else:
                port_opened = True
                condition_print("\033[92m {}\033[00m".format("plc " + "com1" + " open success"))
        else:
            port_opened = True
            condition_print("\033[92m {}\033[00m".format("plc {} open success--1".format(plc_port)))

        if port_opened:
            # Watchdog timer is prepared but intentionally not started here.
            self.link_down_timer = self._new_link_down_timer()

    def _new_link_down_timer(self):
        """Return a fresh, unstarted watchdog timer that fires link_down_timeout()."""
        return threading.Timer(self._LINK_DOWN_PERIOD_S, self.link_down_timeout)

    def port_is_opened(self):
        """Return True when a serial port object exists on this instance."""
        return bool(getattr(self, 'ser', None))

    def link_down_timeout(self):
        """Timer callback: signal link-down and re-arm the watchdog."""
        self.link_down_event.set()
        self.link_down_timer = self._new_link_down_timer()
        self.link_down_timer.start()

    def reset_link_down_timer(self):
        """Cancel the pending watchdog timer (if any) and start a new one."""
        if self.link_down_timer:
            self.link_down_timer.cancel()
        self.link_down_timer = self._new_link_down_timer()
        self.link_down_timer.start()

    def send_heartBit(self):
        """Placeholder for a heartbeat message; currently does nothing."""
        if self.port_is_opened():
            pass

    def crc16_ccitt(self, data: bytearray):  # CRC16/CCITT
        """Return a 16-bit CRC of *data*, processed one nibble at a time.

        Starts from 0x0000 and folds the low nibble, then the high nibble, of
        each byte using the multiplier 0x1081 (written 0o10201 in the original).
        """
        crcval = 0x0000
        for c in data:
            q = (crcval ^ c) & 0x0F
            crcval = (crcval >> 4) ^ (q * 0x1081)
            q = (crcval ^ (c >> 4)) & 0x0F
            crcval = (crcval >> 4) ^ (q * 0x1081)
        return crcval

    @staticmethod
    def _hex_dump(frame):
        """Format *frame* as comma-separated two-digit hex values for logging."""
        return ', '.join('{:02x}'.format(x) for x in frame)

    @staticmethod
    def _frame_header(command, plc_no, pc_no, address, points):
        """Build ENQ + station + PC number + command + wait + address + point count."""
        frame = [0x05]
        frame.extend(hex_to_ascii_arr(plc_no, 2))
        frame.extend(hex_to_ascii_arr(pc_no, 2))
        frame.extend(ord(c) for c in command)
        frame.append(0x30)  # presumably the "message wait" character '0' -- TODO confirm
        frame.extend(str_to_ascii_array(address, 5))  # e.g. D0160
        # NOTE(review): the count is written as DECIMAL digits; if the protocol
        # expects hexadecimal here, counts >= 10 are encoded wrongly -- confirm.
        frame.extend(int_to_ascii_array(points, 2))
        return frame

    @staticmethod
    def _append_sum_check(frame):
        """Append the low byte of the sum of everything after ENQ as two hex digits."""
        checksum = sum(frame[1:]) & 0xFF
        frame.extend(hex_to_ascii_arr(checksum, 2))

    def plc_read_frame(self, command, plc_no=0x01, pc_no=0xFF, address='D0160', len=0x02):
        """Build a read request frame.

        Args:
            command: Two-letter command, e.g. 'WR' (words) or 'BR' (bits).
            plc_no: Station number, sent as two hex digits.
            pc_no: PC number, sent as two hex digits.
            address: Device address, five characters such as 'D0160'.
            len: Number of points to read.

        Returns:
            The frame as a list of byte values, sum check included.
        """
        frame = self._frame_header(command, plc_no, pc_no, address, len)
        self._append_sum_check(frame)
        condition_print('frame intent to read plc: [', self._hex_dump(frame), ']')
        return frame

    def plc_write_frame(self, command, plc_no=0x01, pc_no=0xFF, address='D0160', date_len=0x02, data=()):
        """Build a write request frame.

        Args:
            command: 'BW' writes one character per point; any other command
                writes four hex digits per point.
            plc_no: Station number, sent as two hex digits.
            pc_no: PC number, sent as two hex digits.
            address: Device address, five characters such as 'D0160'.
            date_len: Number of points to write.
            data: Sequence holding at least *date_len* values.

        Returns:
            The frame as a list of byte values, sum check included.

        Raises:
            IndexError: If *data* holds fewer than *date_len* values.
        """
        frame = self._frame_header(command, plc_no, pc_no, address, date_len)
        digits_per_point = 1 if command == 'BW' else 4
        for index in range(date_len):
            frame.extend(hex_to_ascii_arr(data[index], digits_per_point))
        self._append_sum_check(frame)
        condition_print('frame intend to write plc: [', self._hex_dump(frame), ']')
        return frame

    def frame_is_valid(self, frame):
        """Check a received frame for a known control code, trailer and payload.

        Returns False (instead of raising, as the original did) for empty or
        too-short frames and for frames whose last two bytes are not hex
        digits -- an empty frame is what a serial read timeout produces.
        ACK/NAK frames are accepted once control code and trailer parse; other
        frames must also match the sum check and contain only hex digits
        between the control code and the trailer.
        """
        # Shortest frame this method can inspect: control code + 2 trailing chars.
        if len(frame) < 3:
            return False
        control_code = frame[0]
        if control_code not in self._CONTROL_CODES:
            condition_print("First byte is not a control code.")
            return False
        # The last two bytes are read as a two-digit hexadecimal sum check.
        try:
            checksum = int(''.join(chr(byte) for byte in frame[-2:]), 16)
        except ValueError:
            return False
        has_etx = frame[-3] == 0x03
        if control_code == 0x02 and not has_etx:
            return False
        if control_code in (0x06, 0x15):
            return True
        checksum_calc = sum(frame[1:-2]) & 0xFF  # low eight bits of the byte sum
        if checksum != checksum_calc:
            condition_print("Last two bytes do not match checksum.", checksum, checksum_calc,
                            frame[-2] - 0x30, frame[-1] - 0x30)
            return False
        # Exclude ETX (when present) and the sum check from the character test.
        end_position = -3 if has_etx else -2
        for code in frame[1:end_position]:
            char = chr(code)
            if char not in self._HEX_CHARS:
                condition_print(f'{char} ({code}) is not a valid character.')
                return False
        return True

    def _send_and_receive(self, request, success_label):
        """Write *request*, collect the reply and validate it.

        Reads one byte (bounded by the port's own timeout), waits briefly, then
        drains whatever is buffered. Returns the same result dictionaries that
        plc_read()/plc_write() have always returned.
        """
        if not self.port_is_opened():
            return {"status": "error", "msg": "serial com is not open!"}
        self.ser.write(request)
        reply = self.ser.read(1)
        time.sleep(self._SETTLE_DELAY_S)
        started = time.monotonic()
        while self.ser.in_waiting:
            elapsed = time.monotonic() - started
            if elapsed > self._REPLY_TIMEOUT_S:
                condition_print("serial com read Time's out!--", elapsed)
                return {"status": "error", "msg": "serial com read Time's out!"}
            reply += self.ser.read(self.ser.in_waiting)  # read the rest of the data
        if not self.frame_is_valid(reply):
            condition_print("recv frame is not valid", '[', self._hex_dump(reply), ']')
            return {"status": "error", "msg": "recv frame is not valid"}
        condition_print(success_label, '[', self._hex_dump(reply), ']')
        return {"status": "success", "data": reply}

    def plc_write(self, command='WW', plc_no=0x01, pc_no=0xFF, address='D0160', len=0x02, data=0x00000000):
        """Send a write request and return the validated reply.

        *data* must be a sequence with at least *len* values; the integer
        default is kept for interface compatibility but cannot be encoded.

        Returns:
            {"status": "success", "data": reply_bytes} or
            {"status": "error", "msg": reason}.
        """
        request = self.plc_write_frame(command, plc_no, pc_no, address, len, data)
        return self._send_and_receive(request, "--write data to plc success resp")

    def plc_read(self, command, plc_no=0x01, pc_no=0xFF, address='D0160', len=0x01):
        """Send a read request and return the validated reply.

        Returns:
            {"status": "success", "data": reply_bytes} or
            {"status": "error", "msg": reason}.
        """
        request = self.plc_read_frame(command, plc_no, pc_no, address, len)
        return self._send_and_receive(request, '--read data return:')

    def plc_read_words(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read *length* words starting at *address*.

        The reply is expected to be 8 framing bytes plus four hex digits per
        word, with data starting at offset 5.

        Returns:
            {'status': 'success', 'length': n, 'data': [int, ...]} or
            {'status': 'error'}.
        """
        result = self.plc_read('WR', plc_no, pc_no, address, length)
        if result and result.get('status') == 'success' and 'data' in result:
            frame = result['data']
            count, leftover = divmod(len(frame) - 8, 4)
            if leftover == 0 and count == length:
                values = [ascii_array_to_word(frame[5 + 4 * i:5 + 4 * (i + 1)]) for i in range(length)]
                return {'status': 'success', 'length': count, 'data': values}
        return {'status': 'error'}

    def plc_read_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01):
        """Read *length* bits starting at *address*.

        The reply is expected to be 8 framing bytes plus one character per bit,
        with data starting at offset 5.

        Returns:
            {'status': 'success', 'length': n, 'data': [int, ...]} or
            {'status': 'error'}.
        """
        result = self.plc_read('BR', plc_no, pc_no, address, length)
        condition_print('plc_read_bits result', result)
        if result.get('status') == 'success' and 'data' in result:
            frame = result['data']
            count = len(frame) - 8
            if count == length:
                values = [frame[5 + i] - 0x30 for i in range(length)]
                return {'status': 'success', 'length': count, 'data': values}
        return {'status': 'error'}

    def plc_write_word(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x0000):
        """Write a single word to *address*; returns the plc_write() result."""
        return self.plc_write('WW', plc_no, pc_no, address, 0x01, [data])

    def plc_write_dword(self, plc_no=0x01, pc_no=0xFF, address='D0160', data=0x00000000):
        """Write a 32-bit value as two words, high word first.

        Both halves are masked to 16 bits so an oversized value cannot produce
        more than four hex digits per word.
        """
        return self.plc_write('WW', plc_no, pc_no, address, 0x02, [(data >> 16) & 0xFFFF, data & 0xFFFF])

    def plc_write_words(self, plc_no=0x01, pc_no=0xFF, start_addr='D0160', length=0x04, data=()):
        """Write *length* words from *data* starting at *start_addr*."""
        return self.plc_write('WW', plc_no, pc_no, start_addr, length, data)

    def plc_write_bits(self, plc_no=0x01, pc_no=0xFF, address='D0160', length=0x01, data=()):
        """Write *length* bits from *data* starting at *address*."""
        return self.plc_write('BW', plc_no, pc_no, address, length, data)
class DtmMachine:
    """Station-level operations on top of a ``DevicePLC`` serial link.

    ``drop_register`` maps a station identifier to a mapping of register
    addresses with the keys 'height', 'cycles', 'cyclesFinished', 'start' and
    'stop'. Every PLC exchange is serialized through ``plc_comm_lock``.
    All public methods return {'status': 'success', ...} or {'status': 'error'};
    an unknown station or register key raises KeyError.
    """

    def __init__(self, comm_port, comm_config=None, drop_register=None, plc_address=0x01):
        """Open the PLC link and remember the register map.

        Args:
            comm_port: Serial port name handed to ``DevicePLC``.
            comm_config: Serial settings handed to ``DevicePLC``.
            drop_register: Per-station register address map (see class docs).
            plc_address: Station number used in every request.
        """
        # None defaults avoid sharing one mutable dict between all instances.
        self.machine_plc = DevicePLC(comm_port, {} if comm_config is None else comm_config)
        self.drop_register = {} if drop_register is None else drop_register
        self.plc_address = plc_address
        self.plc_comm_lock = threading.Lock()

    @staticmethod
    def _acknowledged(resp):
        """Return True when *resp* is a successful exchange whose reply starts with ACK."""
        return resp.get('status') == 'success' and 'data' in resp and resp['data'][0] == 0x06

    def _write_word_register(self, station_no, field, value):
        """Write *value* to the station's *field* register and report the outcome."""
        with self.plc_comm_lock:
            resp = self.machine_plc.plc_write_word(self.plc_address, 0xFF,
                                                   self.drop_register[station_no][field], value)
        if self._acknowledged(resp):
            condition_print(f'write {station_no} {field} {value} success')
            return {'status': 'success'}
        return {'status': 'error'}

    def _read_word_register(self, station_no, field):
        """Read one word from the station's *field* register."""
        with self.plc_comm_lock:
            resp = self.machine_plc.plc_read_words(self.plc_address, 0xFF,
                                                   self.drop_register[station_no][field], 0x01)
        if resp.get('status') == 'success' and 'data' in resp and resp.get('length') == 0x01:
            value = resp['data'][0]
            condition_print(f'read {station_no} {field} success', value)
            return {'status': 'success', 'value': value}
        return {'status': 'error'}

    def _write_run_bits(self, station_no, start_bit, stop_bit, action):
        """Write the 'start' bit, then the 'stop' bit; succeed only if both are ACKed.

        The second write is issued even when the first one fails, as before.
        """
        with self.plc_comm_lock:
            resp1 = self.machine_plc.plc_write_bits(self.plc_address, 0xFF,
                                                    self.drop_register[station_no]['start'],
                                                    0x01, [start_bit])
            resp2 = self.machine_plc.plc_write_bits(self.plc_address, 0xFF,
                                                    self.drop_register[station_no]['stop'],
                                                    0x01, [stop_bit])
        if self._acknowledged(resp1) and self._acknowledged(resp2):
            condition_print(f'plc {action} {station_no} success')
            return {'status': 'success'}
        return {'status': 'error'}

    def set_station_dropheight(self, station_no, height):
        """Write *height* to the station's 'height' register."""
        return self._write_word_register(station_no, 'height', height)

    def set_station_cycles(self, station_no, cycles):
        """Write *cycles* to the station's 'cycles' register."""
        return self._write_word_register(station_no, 'cycles', cycles)

    def set_station_finished(self, station_no, finished):
        """Write *finished* to the station's 'cyclesFinished' register."""
        return self._write_word_register(station_no, 'cyclesFinished', finished)

    def read_station_dropheight(self, station_no):
        """Read the station's 'height' register; the value is returned under 'value'."""
        return self._read_word_register(station_no, 'height')

    def read_station_cyclesFinished(self, station_no):
        """Read the station's 'cyclesFinished' register; the value is returned under 'value'."""
        return self._read_word_register(station_no, 'cyclesFinished')

    def start_station(self, station_no):
        """Set the station's 'start' bit to 1 and its 'stop' bit to 0."""
        return self._write_run_bits(station_no, 0x01, 0x00, 'start')

    def resume_station(self, station_no):
        """Resume a station; writes the same bits as start_station()."""
        return self._write_run_bits(station_no, 0x01, 0x00, 'resume')

    def stop_station(self, station_no):
        """Set the station's 'start' bit to 0 and its 'stop' bit to 1."""
        return self._write_run_bits(station_no, 0x00, 0x01, 'stop')

    def station_start_status(self, station_no):
        """Read the station's 'start' bit; the value is returned under 'value'."""
        with self.plc_comm_lock:
            resp = self.machine_plc.plc_read_bits(self.plc_address, 0xFF,
                                                  self.drop_register[station_no]['start'], 0x01)
        if resp.get('status') == 'success' and 'data' in resp and resp.get('length') == 0x01:
            value = resp['data'][0]
            # The original logged "cyclesFinished" here (copy-paste slip).
            condition_print(f'plc read {station_no} start status success', value)
            return {'status': 'success', 'value': value}
        return {'status': 'error'}