# File: dtm-py-all/dtMachine_mhi.py (Python, 115 lines, 7.0 KiB)

from mhi_plc import MitsubishiPLC
import threading
class MhiDtMachine:
    """One drop-test machine driven by talking to its PLC directly.

    This class does not go through a Modbus client <-> Modbus server pair;
    every operation is issued straight to the PLC via ``MitsubishiPLC``.

    All PLC traffic is serialized with ``plc_comm_lock``. Every public method
    returns a dict whose ``'status'`` is ``'success'`` or ``'error'``; an
    unknown ``station_no`` (or a register name missing from
    ``drop_register``) raises ``KeyError``.
    """

    # First data item the PLC reply must carry for a write to count as done.
    _ACK = 0x06

    def __init__(self, comm_port, comm_config=None, drop_register=None, plc_address=0x01):
        """Open the PLC link and remember the register map.

        Args:
            comm_port: Port passed through to ``MitsubishiPLC``.
            comm_config: Settings passed through to ``MitsubishiPLC``;
                an empty dict is used when omitted.
            drop_register: Mapping ``station_no -> {'height', 'cycles',
                'cyclesFinished', 'start', 'stop'}`` of register addresses;
                an empty dict is used when omitted.
            plc_address: Address sent as the first argument of every PLC call.
        """
        # None sentinels instead of ``{}`` defaults: a literal dict default is
        # created once and would be shared by every instance built without
        # the argument.
        if comm_config is None:
            comm_config = {}
        if drop_register is None:
            drop_register = {}
        self.machine_plc = MitsubishiPLC(comm_port, comm_config)
        self.drop_register = drop_register
        self.plc_address = plc_address
        self.plc_comm_lock = threading.Lock()

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _first_item(resp, expected_length=None):
        """Return ``(True, resp['data'][0])`` for a usable success reply.

        A reply is usable when it is a dict with ``status == 'success'``,
        non-empty ``data`` and, if *expected_length* is given, a matching
        ``length``. Anything else yields ``(False, None)`` instead of raising.
        """
        if not isinstance(resp, dict) or resp.get('status') != 'success':
            return False, None
        if expected_length is not None and resp.get('length') != expected_length:
            return False, None
        data = resp.get('data')
        if data is None or len(data) == 0:
            return False, None
        return True, data[0]

    def _is_ack(self, resp):
        """Return True when *resp* is a success reply whose first item is ACK."""
        usable, first = self._first_item(resp)
        return usable and first == self._ACK

    def _write_word(self, register, value):
        """Write one word to *register*; return True when the PLC ACKs it."""
        with self.plc_comm_lock:
            # 0xFF is forwarded unchanged; its meaning is defined by MitsubishiPLC.
            resp = self.machine_plc.plc_write_word(self.plc_address, 0xFF, register, value)
        return self._is_ack(resp)

    def _write_start_stop(self, station_no, start_bit, stop_bit):
        """Write the station's start bit, then its stop bit, under one lock.

        Both writes are always attempted; the result is True only when both
        are ACKed.
        """
        registers = self.drop_register[station_no]
        with self.plc_comm_lock:
            start_resp = self.machine_plc.plc_write_bits(
                self.plc_address, 0xFF, registers['start'], 0x01, [start_bit])
            stop_resp = self.machine_plc.plc_write_bits(
                self.plc_address, 0xFF, registers['stop'], 0x01, [stop_bit])
        return self._is_ack(start_resp) and self._is_ack(stop_resp)

    def _read_one(self, read_call, register):
        """Read a single item with *read_call*; return ``(ok, value)``."""
        with self.plc_comm_lock:
            resp = read_call(self.plc_address, 0xFF, register, 0x01)
        return self._first_item(resp, expected_length=0x01)

    # ------------------------------------------------------------------ #
    # Word writes
    # ------------------------------------------------------------------ #

    def set_station_dropheight(self, station_no, height):
        """Write *height* to the station's ``'height'`` register.

        Returns:
            ``{'status': 'success'}`` when the PLC ACKs the write, otherwise
            ``{'status': 'error', 'message': ...}``.
        """
        register = self.drop_register[station_no]['height']
        if not self._write_word(register, height):
            return {'status': 'error', 'message': f"set station{station_no} dropheight error"}
        print(f'write {station_no} height {height} success')
        return {'status': 'success'}

    def set_station_cycles(self, station_no, cycles):
        """Write *cycles* to the station's ``'cycles'`` register.

        Returns:
            ``{'status': 'success'}`` when the PLC ACKs the write, otherwise
            ``{'status': 'error', 'message': ...}``.
        """
        register = self.drop_register[station_no]['cycles']
        if not self._write_word(register, cycles):
            return {'status': 'error', 'message': f"set station{station_no} cycles error"}
        print(f'write {station_no} cycles {cycles} success')
        return {'status': 'success'}

    def set_station_finished(self, station_no, finished):
        """Write *finished* to the station's ``'cyclesFinished'`` register.

        Returns:
            ``{'status': 'success'}`` when the PLC ACKs the write, otherwise
            ``{'status': 'error', 'message': ...}``.
        """
        register = self.drop_register[station_no]['cyclesFinished']
        if not self._write_word(register, finished):
            return {'status': 'error', 'message': f"set station{station_no} finished error"}
        print(f'write {station_no} cyclesFinished {finished} success')
        return {'status': 'success'}

    # ------------------------------------------------------------------ #
    # Word reads
    # ------------------------------------------------------------------ #

    def read_station_dropheight(self, station_no):
        """Read one word from the station's ``'height'`` register.

        Returns:
            ``{'status': 'success', 'value': <word>}`` on a valid one-word
            reply, otherwise ``{'status': 'error', 'message': ...}``.
        """
        register = self.drop_register[station_no]['height']
        ok, value = self._read_one(self.machine_plc.plc_read_words, register)
        if not ok:
            return {'status': 'error', 'message': f"read station{station_no} dropheight error"}
        print(f'read {station_no} height success', value)
        return {'status': 'success', 'value': value}

    def read_station_cyclesFinished(self, station_no):
        """Read one word from the station's ``'cyclesFinished'`` register.

        Returns:
            ``{'status': 'success', 'value': <word>}`` on a valid one-word
            reply, otherwise ``{'status': 'error', 'message': ...}``.
        """
        register = self.drop_register[station_no]['cyclesFinished']
        ok, value = self._read_one(self.machine_plc.plc_read_words, register)
        if not ok:
            # This is a read, so the message says "read" (it used to say "set").
            return {'status': 'error', 'message': f"read station{station_no} cyclesFinished error"}
        print(f'read {station_no} cyclesFinished success', value)
        return {'status': 'success', 'value': value}

    # ------------------------------------------------------------------ #
    # Start / stop control bits
    # ------------------------------------------------------------------ #

    def start_station(self, station_no):
        """Set the station's start bit to 1 and its stop bit to 0.

        Returns:
            ``{'status': 'success'}`` when both writes are ACKed, otherwise
            ``{'status': 'error', 'message': ...}``.
        """
        if not self._write_start_stop(station_no, 0x01, 0x00):
            return {'status': 'error', 'message': f"start station{station_no} error"}
        print(f'plc start {station_no} success')
        return {'status': 'success'}

    def resume_station(self, station_no):
        """Resume the station: same bit pattern as start (start=1, stop=0).

        Returns:
            ``{'status': 'success'}`` when both writes are ACKed, otherwise
            ``{'status': 'error', 'message': ...}``.
        """
        if not self._write_start_stop(station_no, 0x01, 0x00):
            return {'status': 'error', 'message': f"resume station{station_no} error"}
        print(f'plc resume {station_no} success')
        return {'status': 'success'}

    def stop_station(self, station_no):
        """Set the station's start bit to 0 and its stop bit to 1.

        Returns:
            ``{'status': 'success'}`` when both writes are ACKed, otherwise
            ``{'status': 'error', 'message': ...}``.
        """
        if not self._write_start_stop(station_no, 0x00, 0x01):
            return {'status': 'error', 'message': f"stop station{station_no} error"}
        print(f'plc stop {station_no} success')
        return {'status': 'success'}

    def station_start_status(self, station_no):
        """Read the current value of the station's start bit.

        Returns:
            ``{'status': 'success', 'value': <bit>}`` on a valid one-bit
            reply, otherwise ``{'status': 'error'}`` (no message key).
        """
        register = self.drop_register[station_no]['start']
        ok, value = self._read_one(self.machine_plc.plc_read_bits, register)
        if not ok:
            return {'status': 'error'}
        print(f'read {station_no} start status success', value)
        return {'status': 'success', 'value': value}