115 lines
7.0 KiB
Python
115 lines
7.0 KiB
Python
from mhi_plc import MitsubishiPLC
|
|
import threading
|
|
|
|
class MhiDtMachine:
    """Talk to one machine's Mitsubishi PLC directly.

    One instance represents one machine. This class reads and writes the PLC
    itself instead of going through a modbus client <---> modbus server pair.

    Every public method returns a dict: ``{'status': 'success', ...}`` when the
    PLC answered as expected, otherwise ``{'status': 'error', ...}`` (with a
    ``'message'`` entry for all methods except ``station_start_status``).
    A ``station_no`` that is missing from ``drop_register`` (or lacks the
    needed register name) is not caught: the lookup error, a ``KeyError`` for a
    plain dict, propagates to the caller before any PLC traffic is sent.
    """

    # First data item the PLC returns to acknowledge a write.
    _ACK = 0x06
    # Passed unchanged as the second argument of every MitsubishiPLC call.
    # NOTE(review): presumably the "PC number" field of the link frame -
    # confirm against the MitsubishiPLC implementation.
    _FIXED_PLC_ARG = 0xFF

    def __init__(self, comm_port, comm_config=None, drop_register=None, plc_address=0x01):
        """Open the PLC connection wrapper and remember the register map.

        Args:
            comm_port: forwarded to ``MitsubishiPLC`` as its first argument.
            comm_config: forwarded to ``MitsubishiPLC``; an empty dict is used
                when omitted.
            drop_register: mapping ``station_no -> {name: register}``; the
                names used by this class are ``'height'``, ``'cycles'``,
                ``'cyclesFinished'``, ``'start'`` and ``'stop'``.
            plc_address: first argument of every PLC read/write call.
        """
        # None instead of ``{}`` defaults: a literal dict default is created
        # once and would be shared (and mutable) across all instances.
        self.machine_plc = MitsubishiPLC(comm_port, {} if comm_config is None else comm_config)
        self.drop_register = {} if drop_register is None else drop_register
        self.plc_address = plc_address
        # Serialises all traffic on the PLC link between threads.
        self.plc_comm_lock = threading.Lock()

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first_data_item(resp):
        """Return ``(True, resp['data'][0])`` for a successful response.

        Anything else (not a dict, status other than ``'success'``, missing
        or empty ``'data'``) yields ``(False, None)`` instead of raising.
        """
        if not isinstance(resp, dict) or resp.get('status') != 'success':
            return False, None
        data = resp.get('data')
        if data is None or len(data) == 0:
            return False, None
        return True, data[0]

    def _is_ack(self, resp):
        """Return True when *resp* is a successful response whose first item is ACK."""
        ok, first = self._first_data_item(resp)
        return ok and first == self._ACK

    # ------------------------------------------------------------------
    # PLC access helpers
    # ------------------------------------------------------------------
    def _write_word(self, station_no, register_key, value, error_label):
        """Write *value* to the station's *register_key* register and report the result."""
        # Resolve the register first so a bad station fails before any I/O.
        register = self.drop_register[station_no][register_key]
        with self.plc_comm_lock:
            resp = self.machine_plc.plc_write_word(
                self.plc_address, self._FIXED_PLC_ARG, register, value)
        if not self._is_ack(resp):
            return {'status': 'error', 'message': f"set station{station_no} {error_label} error"}
        print(f'write {station_no} {register_key} {value} success')
        return {'status': 'success'}

    def _read_single(self, read_call, station_no, register_key):
        """Read one item via *read_call*; return ``(True, value)`` or ``(False, None)``."""
        register = self.drop_register[station_no][register_key]
        with self.plc_comm_lock:
            resp = read_call(self.plc_address, self._FIXED_PLC_ARG, register, 0x01)
        ok, value = self._first_data_item(resp)
        # ``.get`` so a response without 'length' counts as a failed read
        # instead of raising KeyError.
        if ok and resp.get('length') == 0x01:
            return True, value
        return False, None

    def _write_run_bits(self, station_no, start_bit, stop_bit, action):
        """Write the station's start bit, then its stop bit, and report the result.

        Both writes are always issued, even if the first one is not
        acknowledged; success requires an ACK for each of them.
        """
        registers = self.drop_register[station_no]
        start_register = registers['start']
        stop_register = registers['stop']
        with self.plc_comm_lock:
            resp_start = self.machine_plc.plc_write_bits(
                self.plc_address, self._FIXED_PLC_ARG, start_register, 0x01, [start_bit])
            resp_stop = self.machine_plc.plc_write_bits(
                self.plc_address, self._FIXED_PLC_ARG, stop_register, 0x01, [stop_bit])
        if not (self._is_ack(resp_start) and self._is_ack(resp_stop)):
            return {'status': 'error', 'message': f"{action} station{station_no} error"}
        print(f'plc {action} {station_no} success')
        return {'status': 'success'}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def set_station_dropheight(self, station_no, height):
        """Write *height* to the station's ``'height'`` register."""
        return self._write_word(station_no, 'height', height, 'dropheight')

    def set_station_cycles(self, station_no, cycles):
        """Write *cycles* to the station's ``'cycles'`` register."""
        return self._write_word(station_no, 'cycles', cycles, 'cycles')

    def set_station_finished(self, station_no, finished):
        """Write *finished* to the station's ``'cyclesFinished'`` register."""
        return self._write_word(station_no, 'cyclesFinished', finished, 'finished')

    def read_station_dropheight(self, station_no):
        """Read one word from the station's ``'height'`` register.

        Returns ``{'status': 'success', 'value': <word>}`` on success.
        """
        ok, value = self._read_single(self.machine_plc.plc_read_words, station_no, 'height')
        if not ok:
            return {'status': 'error', 'message': f"read station{station_no} dropheight error"}
        print(f'read {station_no} height success', value)
        return {'status': 'success', 'value': value}

    def read_station_cyclesFinished(self, station_no):
        """Read one word from the station's ``'cyclesFinished'`` register.

        Returns ``{'status': 'success', 'value': <word>}`` on success.
        """
        ok, value = self._read_single(self.machine_plc.plc_read_words, station_no, 'cyclesFinished')
        if not ok:
            # This is a read; the message previously said "set" by mistake.
            return {'status': 'error', 'message': f"read station{station_no} cyclesFinished error"}
        print(f'read {station_no} cyclesFinished success', value)
        return {'status': 'success', 'value': value}

    def start_station(self, station_no):
        """Set the station's start bit to 1 and its stop bit to 0."""
        return self._write_run_bits(station_no, 0x01, 0x00, 'start')

    def resume_station(self, station_no):
        """Issue the same bit writes as ``start_station``, reported as "resume"."""
        return self._write_run_bits(station_no, 0x01, 0x00, 'resume')

    def stop_station(self, station_no):
        """Set the station's start bit to 0 and its stop bit to 1."""
        return self._write_run_bits(station_no, 0x00, 0x01, 'stop')

    def station_start_status(self, station_no):
        """Read the station's start bit.

        Returns ``{'status': 'success', 'value': <bit>}`` on success and
        ``{'status': 'error'}`` (no message) otherwise.
        """
        ok, value = self._read_single(self.machine_plc.plc_read_bits, station_no, 'start')
        if not ok:
            return {'status': 'error'}
        # The log line previously claimed "cyclesFinished" by mistake.
        print(f'read {station_no} start status success', value)
        return {'status': 'success', 'value': value}