import array import asyncio import sys import time import datetime import keyboard import serial import requests import threading import argparse import re import os from pymodbus.exceptions import ModbusIOException from pymodbus.server import StartTcpServer, StartAsyncTcpServer from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext from multiprocessing import Process # 动态导入 ModbusPLC from modbus_plc import get_modbus_plc_class # 保持向后兼容的工厂函数 ModbusPLC = lambda port, config: get_modbus_plc_class()(port, config) from mhi_plc import MitsubishiPLC import utils from utils import print_with_timestamp from pymodbus.client import ModbusSerialClient as ModbusSerialClient from pymodbus.exceptions import ModbusException, ConnectionException def test_modbus_plc1(): # 创建Modbus RTU客户端 with ModbusSerialClient(method="rtu", port="com1", baudrate=19200, parity="E", timeout=1) as client: if client.connect(): print("Modbus RTU Client Connected") else: print("Failed to connect to Modbus RTU Client") result = client.write_coil(117, 0, 1) utils.condition_print('write coil result', result.isError()) result = client.write_coil(118, 1, 1) utils.condition_print('write coil result', result.isError()) client.close() from pymodbus.datastore import ModbusSequentialDataBlock class CustomDataBlock(ModbusSequentialDataBlock): def __init__(self, data_type, plc_devices=None, plc_address=0x01, com_port="com1", plc_type="mhi", *args, **kwargs): # print("CustomDataBlock init ", plc_address, com_port, plc_type) if plc_devices is None: plc_devices = [] self.com_port = com_port # PLC 通讯串口号 self.plc_address = plc_address # PLC的 站地址 如:1 3 4等 self.plc_type = plc_type # PLC 类型 self.data_type = data_type # 区分coil input_register holding_register self.address = 0 # 这个是client 进行读写的寄存器地址 self.plc_devices = plc_devices # plc 设备的列表,每一台机器对应一个 # plc_devices 中元素中的device 是 MitsubishiPLC的实例,用于和物理PLC的通讯 # 机器的COM口相同的话,会共享相同的MitsubishiPLC的实例 self.plc_comm_lock = threading.Lock() # 具体访问MitsubishiPLC的保护锁 super().__init__(*args, **kwargs) def getValues(self, address, count): # plc_device_obj = [item for item in self.plc_devices if item["slave"] == self.plc_address] # 按照站地址进行匹配 plc_device_obj = [item for item in self.plc_devices if item["slave"] == self.plc_address and item["com"] == self.com_port and item[ 'type'] == self.plc_type] # 按照站地址进行匹配 plc_device = None if len(plc_device_obj) and plc_device_obj[0].get('device'): plc_device = plc_device_obj[0].get('device') # 得到访问物理PLC的MitsubishiPLC的实例 if self.data_type == 'coil': # 处理线圈数据类型的逻辑 # print('read coils address=', address) # modbus client 要读的寄存器地址 if plc_device: with self.plc_comm_lock: # 保护访问资源,执行完缩进的代码块释放资源 resp = plc_device.plc_read_bits(self.plc_address, 0xFF, plc_device.conv_to_plc_address('M', address), count) if resp and resp.get('status') == 'success' and resp.get('data') and resp.get('length'): return resp['data'] else: return 0x00 # raise ModbusIOException("Modbus server read coil failed--PLC response error") else: if address == 18: return [True, False, True] else: raise ModbusIOException("Modbus server read coil failed--PLC not connected") pass elif self.data_type == 'input_register': # 处理输入寄存器数据类型的逻辑 default_value = [0x00 for _ in range(count)] if plc_device: with self.plc_comm_lock: # 保护访问资源,执行完缩进的代码块释放资源 resp = plc_device.plc_read_words(self.plc_address, 0xFF, plc_device.conv_to_plc_address('D', address), count) if resp and resp.get('status') == 'success' and resp.get('data') and resp.get('length'): return resp['data'] else: # raise ModbusIOException("Modbus server read input register failed--PLC response error") return default_value else: if address == 185: return [88, 89, 100] else: raise ModbusIOException("Modbus server read input register failed--PLC not connected") pass elif self.data_type == 'holding_register': # 处理保持寄存器数据类型的逻辑 if address == 10: # 10 作为特殊的寄存器,用来返回PLC(物理)的连接状态 if plc_device: # 直接返回缓存的值,无需实时读取 # return plc_device.get_connect_state(plc_device.conv_to_plc_address('D', 10), self.plc_address) #print_with_timestamp(f"modbus server begin--0 get_connect_state_cache {plc_device.plc_port}") connect_state = plc_device.get_connect_state_cache(plc_device.conv_to_plc_address('D', 10), self.plc_address) #print_with_timestamp(f"modbus server end -- 1 get_connect_state_cache {plc_device.plc_port} ") return connect_state else: return [0, 0, 0, 0, 0, 0, 0, 0, 0] if plc_device: default_value = [0x00 for _ in range(count)] with self.plc_comm_lock: # 保护访问资源,执行完缩进的代码块释放资源 resp = plc_device.plc_read_words(self.plc_address, 0xFF, plc_device.conv_to_plc_address('D', address), count) if resp and resp.get('status') == 'success' and resp.get('data') and resp.get('length'): return resp.get('data') else: # raise ModbusIOException("Modbus server read holding register failed--PLC response error") return default_value else: if address == 185: return [88, 89, 100] else: raise ModbusIOException("Modbus server read holding register failed--PLC not connected") pass def setValues(self, address, values): # print("CustomDataBlock setValues ", self.data_type, address, values) # plc_device_obj = [item for item in self.plc_devices if item["slave"] == self.plc_address] plc_device_obj = [item for item in self.plc_devices if item["slave"] == self.plc_address and item["com"] == self.com_port and item[ 'type'] == self.plc_type] # 按照站地址进行匹配 plc_device = None if len(plc_device_obj) and plc_device_obj[0].get('device'): plc_device = plc_device_obj[0].get('device') # print("CustomDataBlock setValues plc_devices", self.plc_devices,plc_device) # cyx if self.data_type == 'coil': # 处理线圈数据类型的逻辑 if plc_device: with self.plc_comm_lock: # 保护访问资源,执行完缩进的代码块释放资源 resp = plc_device.plc_write_bit(self.plc_address, 0xFF, plc_device.conv_to_plc_address('M', address), values[0]) if resp and resp.get('status') == 'success': # ACK pass else: # raise ModbusIOException("Modbus server set coil failed--PLC response error") pass else: # 如果读取不成功,抛出一个Modbus异常 # raise ModbusIOException("Modbus server set coil failed--PLC not connected") pass elif self.data_type == 'input_register': # 处理输入寄存器数据类型的逻辑 pass elif self.data_type == 'holding_register': # print("modbus server set holding register address=", address, len(values), values, self.plc_address) # cyx if plc_device: with self.plc_comm_lock: # 保护访问资源,执行完缩进的代码块释放资源 if len(values) == 1: resp = plc_device.plc_write_word(self.plc_address, 0xFF, plc_device.conv_to_plc_address('D', address), values[0]) if resp and resp.get('status') == 'success': # ACK pass else: raise ModbusIOException("Modbus server set holding register failed--PLC response error") if len(values) > 1: resp = plc_device.plc_write_words(self.plc_address, 0xFF, plc_device.conv_to_plc_address('D', address), len(values), values) if resp and resp.get('status') == 'success': # ACK pass else: raise ModbusIOException("Modbus server set holding register failed--PLC response error") # 处理保持寄存器数据类型的逻辑 else: # 如果读取不成功,抛出一个Modbus异常 # raise ModbusIOException("Modbus server set holding register failed--PLC not connected") pass exit_flag = False def getPlcConfig(serverUrl, params): result = {} query_string = params response = requests.get(serverUrl + '/get_machine_config', params=query_string, json={}) if response.status_code == 200: result = response.json() else: print('请求失败') pass return result class ModbusServer(Process): def __init__(self, host='127.0.0.1', port=5020, machines_config=None, ready_event=None, mock_mode=False): super().__init__() self.host = host self.port = port self.machines_config = machines_config or [] self.context = None self.server = None self.plc_devices = [] self.ready_event = ready_event # 用于通知主进程,服务器已启动 self.mock_mode = mock_mode # 添加模拟模式参数 def _create_plc_devices(self): """ Create PLC devices based on the machine configuration. """ try: for config in self.machines_config: if config.get('type') == 'mhi' and config.get('com').startswith("com") and config.get('plcAddress'): device_matched = [ device_json['device'] for device_json in self.plc_devices if device_json["com"] == config.get('com') and device_json['type'] == config.get('type') ] device_have_created = device_matched[0] if device_matched else None if device_have_created is None: device_have_created = MitsubishiPLC(config.get('com'), config.get('comm_config')) self.plc_devices.append({ 'slave': int(config.get('plcAddress')), 'com': config.get('com'), 'device': device_have_created, 'type': config.get('type') }) if config.get('type') == 'xinjie' and config.get('com').startswith("com") and config.get('plcAddress'): device_matched = [ device_json['device'] for device_json in self.plc_devices if device_json["com"] == config.get('com') and device_json['type'] == config.get('type') ] device_have_created = device_matched[0] if device_matched else None if device_have_created is None: device_have_created = ModbusPLC(config.get('com'), config.get('comm_config')) self.plc_devices.append({ 'slave': int(config.get('plcAddress')), 'com': config.get('com'), 'device': device_have_created, 'type': config.get('type') }) except Exception as error: print(f"Error creating PLC devices: {error}") def _create_context(self): """ Create Modbus context for each PLC device. """ stores = {} for config in self.machines_config: plc_address = int(config.get('plcAddress')) coil_block = CustomDataBlock('coil', self.plc_devices, plc_address, config.get("com"), config.get("type"), 0, [False] * 0x10000) holding_register_block = CustomDataBlock('holding_register', self.plc_devices, plc_address, config.get("com"), config.get("type"), 0, [0] * 0x10000) input_coil_block = CustomDataBlock('input_coil', self.plc_devices, plc_address, config.get("com"), config.get("type"), 0, [False] * 0x10000) input_register_block = CustomDataBlock('input_register', self.plc_devices, plc_address, config.get("com"), config.get("type"), 0, [0] * 0x10000) store = ModbusSlaveContext( di=input_coil_block, co=coil_block, hr=holding_register_block, ir=input_register_block, zero_mode=True ) tcp_unit_id = int(config.get('tcp_modbus_unit')) stores[tcp_unit_id] = store self.context = ModbusServerContext(slaves=stores, single=False) async def _start_server(self): """ Start the Modbus TCP Server. """ self._create_context() print(f"Starting Modbus server on {self.host}:{self.port}") try: self.server = await StartAsyncTcpServer(self.context, address=(self.host, self.port)) print(f"Modbus server started on {self.host}:{self.port}") # 通知主进程服务器已启动c if self.ready_event: self.ready_event.set() await self.server.serve_forever() except Exception as e: if self.ready_event: self.ready_event.set() print(f"Error starting Modbus server: {e}") def run(self): """ Run the Modbus server in the Process. """ # 设置模拟模式 if self.mock_mode: import modbus_plc modbus_plc.MOCK_MODE = True print("Modbus Server: 启用 PLC 模拟模式") else: print("Modbus Server: 使用真实 PLC 设备") print("Modbus server begin start in a separate process") # 创建 PLC 设备 self._create_plc_devices() # 创建新的事件循环 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # 启动 Modbus 服务器 loop.run_until_complete(self._start_server()) except Exception as e: print(f"Failed to start Modbus server: {e}") finally: print("Modbus server shutting down.") if self.server: self.server.shutdown() loop.close() def stop(self): """ Stop the Modbus server gracefully. """ try: for device_info in self.plc_devices: device = device_info.get('device') if hasattr(device, 'stop_polling'): device.stop_polling() if self.server: self.server.shutdown() print("Modbus server stopped gracefully.") finally: pass modbus_server = None # 声明全局变量 exit_flag = False def get_machine_configs(serverUrl='127.0.0.1:5050'): global modbus_server, exit_flag config_got = False while not config_got: try: resp = getPlcConfig(serverUrl, {}) if resp.get('status') == 'success': config_got = True machine_config = resp.get('data', []) modbus_server = ModbusServer(machine_config) modbus_server.modbus_thread = threading.Thread(target=modbus_server.run_modbus_server) modbus_server.modbus_thread.start() except Exception as error: pass time.sleep(3) pass def main(): global modbus_server # 引用全局变量 global exit_flag get_machine_configs_thread = threading.Thread(target=get_machine_configs, args=(args.serverUrl,)) get_machine_configs_thread.start() def stop_server(): # 停止服务器的函数 global exit_flag if modbus_server: modbus_server.stop_modbus_server() if modbus_server and modbus_server.modbus_thread: modbus_server.modbus_thread.join() get_machine_configs_thread.join() exit_flag = True try: while not exit_flag: if keyboard.is_pressed('z'): # stop_server() pass time.sleep(0.1) except KeyboardInterrupt: print("用户强制退出(Ctrl+C)") stop_server() sys.exit(0) if __name__ == "__main__": # 创建解析器 parser = argparse.ArgumentParser(description="My modbus and plc Description") # 添加参数 parser.add_argument('--serverUrl', default="http://127.0.0.1:5050", help="Server URL") # 添加参数 parser.add_argument('--host', default="127.0.0.1", help="Seqrver URL") # 添加参数 parser.add_argument('--port', default=5020, help="Server URL") parser.add_argument('--mock', action='store_true', help="启用 PLC 模拟模式") # 解析参数 args = parser.parse_args() # 设置 MOCK_MODE if hasattr(args, 'mock') and args.mock: import modbus_plc modbus_plc.MOCK_MODE = True print("启用 PLC 模拟模式") else: print("使用真实 PLC 设备") serverUrl = args.serverUrl main()