import threading

from mhi_plc import MitsubishiPLC


class MhiDtMachine:
    """Drop-test machine driven directly through its Mitsubishi PLC.

    One instance per machine. This class talks to the PLC directly instead
    of going through a modbus client <---> modbus server pair.

    Every public method returns a status dict:

    * ``{'status': 'success'}`` (plus ``'value'`` for reads) when the PLC
      answered as expected;
    * ``{'status': 'error', 'message': ...}`` otherwise
      (``station_start_status`` returns the error dict without a message).

    ``drop_register`` maps a station number to a dict of register addresses
    keyed by ``'height'``, ``'cycles'``, ``'cyclesFinished'``, ``'start'``
    and ``'stop'``. An unknown station number or key raises ``KeyError``.
    """

    # Acknowledge byte expected as the first data element of a write reply.
    _ACK = 0x06
    # Second positional argument of every MitsubishiPLC call.
    # NOTE(review): presumably the "PC number" field of the frame - confirm
    # against the MitsubishiPLC implementation.
    _PC_NO = 0xFF

    def __init__(self, comm_port, comm_config=None, drop_register=None,
                 plc_address=0x01):
        """Open the PLC link and remember the register map.

        Args:
            comm_port: Port identifier handed to ``MitsubishiPLC``.
            comm_config: Communication settings handed to ``MitsubishiPLC``;
                defaults to an empty dict.
            drop_register: Per-station register map (see class docstring);
                defaults to an empty dict.
            plc_address: Address passed as first argument of every PLC call.
        """
        # ``None`` sentinels avoid sharing one mutable default dict between
        # all instances created without these arguments.
        if comm_config is None:
            comm_config = {}
        if drop_register is None:
            drop_register = {}
        self.machine_plc = MitsubishiPLC(comm_port, comm_config)
        self.drop_register = drop_register
        self.plc_address = plc_address
        # Serializes all PLC requests issued through this instance.
        self.plc_comm_lock = threading.Lock()

    # ------------------------------------------------------------------
    # Response validation helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _payload(resp):
        """Return the non-empty ``data`` of a successful response, else None.

        Assumes ``resp`` is a dict as produced by ``MitsubishiPLC``; a falsy
        response, a non-success status, or missing/empty data yields None.
        """
        if not resp or resp.get('status') != 'success':
            return None
        data = resp.get('data')
        if data is None or len(data) == 0:
            return None
        return data

    @classmethod
    def _is_ack(cls, resp):
        """Tell whether a write response carries the ACK byte."""
        data = cls._payload(resp)
        return data is not None and data[0] == cls._ACK

    @classmethod
    def _single_value(cls, resp):
        """Return ``(True, value)`` for a valid one-element read response.

        Returns ``(False, None)`` when the response is not successful, has no
        data, or does not report a length of exactly one.
        """
        data = cls._payload(resp)
        if data is None or resp.get('length') != 0x01:
            return False, None
        return True, data[0]

    # ------------------------------------------------------------------
    # Shared request helpers
    # ------------------------------------------------------------------
    def _write_word(self, station_no, key, value, label):
        """Write ``value`` to the station register ``key`` and check the ACK."""
        result = {'status': 'error',
                  'message': f"set station{station_no} {label} error"}
        register = self.drop_register[station_no][key]
        with self.plc_comm_lock:
            resp = self.machine_plc.plc_write_word(
                self.plc_address, self._PC_NO, register, value)
        if self._is_ack(resp):
            result = {'status': 'success'}
            print(f'write {station_no} {key} {value} success')
        return result

    def _read_word(self, station_no, key, label):
        """Read one word from the station register ``key``."""
        result = {'status': 'error',
                  'message': f"read station{station_no} {label} error"}
        register = self.drop_register[station_no][key]
        with self.plc_comm_lock:
            resp = self.machine_plc.plc_read_words(
                self.plc_address, self._PC_NO, register, 0x01)
        ok, value = self._single_value(resp)
        if ok:
            result = {'status': 'success', 'value': value}
            print(f'read {station_no} {key} success', value)
        return result

    def _write_start_stop(self, station_no, start_bit, stop_bit, action):
        """Write the ``start`` and ``stop`` bits of a station.

        Both writes are always issued, under a single lock acquisition so no
        other request from this instance can run between them. Success
        requires an ACK for each write.
        """
        result = {'status': 'error',
                  'message': f"{action} station{station_no} error"}
        registers = self.drop_register[station_no]
        start_register = registers['start']
        stop_register = registers['stop']
        with self.plc_comm_lock:
            start_resp = self.machine_plc.plc_write_bits(
                self.plc_address, self._PC_NO, start_register, 0x01,
                [start_bit])
            stop_resp = self.machine_plc.plc_write_bits(
                self.plc_address, self._PC_NO, stop_register, 0x01,
                [stop_bit])
        if self._is_ack(start_resp) and self._is_ack(stop_resp):
            result = {'status': 'success'}
            print(f'plc {action} {station_no} success')
        return result

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def set_station_dropheight(self, station_no, height):
        """Write ``height`` to the station's ``'height'`` register."""
        return self._write_word(station_no, 'height', height, 'dropheight')

    def set_station_cycles(self, station_no, cycles):
        """Write ``cycles`` to the station's ``'cycles'`` register."""
        return self._write_word(station_no, 'cycles', cycles, 'cycles')

    def set_station_finished(self, station_no, finished):
        """Write ``finished`` to the station's ``'cyclesFinished'`` register."""
        return self._write_word(station_no, 'cyclesFinished', finished,
                                'finished')

    def read_station_dropheight(self, station_no):
        """Read the station's ``'height'`` register; value under ``'value'``."""
        return self._read_word(station_no, 'height', 'dropheight')

    def read_station_cyclesFinished(self, station_no):
        """Read the station's ``'cyclesFinished'`` register."""
        # The error message used to say "set ..." although this is a read.
        return self._read_word(station_no, 'cyclesFinished', 'cyclesFinished')

    def start_station(self, station_no):
        """Set the station's ``start`` bit and clear its ``stop`` bit."""
        return self._write_start_stop(station_no, 0x01, 0x00, 'start')

    def resume_station(self, station_no):
        """Resume a station; writes the same bits as ``start_station``."""
        return self._write_start_stop(station_no, 0x01, 0x00, 'resume')

    def stop_station(self, station_no):
        """Clear the station's ``start`` bit and set its ``stop`` bit."""
        return self._write_start_stop(station_no, 0x00, 0x01, 'stop')

    def station_start_status(self, station_no):
        """Read the station's ``start`` bit; value under ``'value'``.

        On failure the returned dict is ``{'status': 'error'}`` with no
        message.
        """
        result = {'status': 'error'}
        register = self.drop_register[station_no]['start']
        with self.plc_comm_lock:
            resp = self.machine_plc.plc_read_bits(
                self.plc_address, self._PC_NO, register, 0x01)
        ok, value = self._single_value(resp)
        if ok:
            result = {'status': 'success', 'value': value}
            # The log line used to say "cyclesFinished" (copy-paste slip).
            print(f'read {station_no} start status success', value)
        return result