import datetime print_condition = False all_plc_recv = 0 serverUrl = 'http://106.52.71.204:5050' # 定义颜色代码 RED = '\033[31m' GREEN = '\033[32m' YELLOW = '\033[33m' BLUE = '\033[34m' MAGENTA = '\033[35m' CYAN = '\033[36m' WHITE = '\033[37m' RESET = '\033[m' def find_item(json_array, key, value): if json_array is None: return None if isinstance(value, list): result = [] for matched_value in value: for item in json_array: if item.get(key) == matched_value: result.append(item) return result else: for item in json_array: if item.get(key) == value: return item return None # 如果没有匹配项,则返回 None def bcd_coder_array(value=0, lenght=4): return int_to_bcd(value, lenght) def int_to_bcd(number=0, length=5): bcd_array = [] try: int_number = int(round(number)) # 使用round函数四舍五入,然后转成整数 while int_number > 0: bcd = int_number % 100 bcd_array.insert(0, (bcd // 10) << 4 | (bcd % 10)) int_number = int_number // 100 except: pass while len(bcd_array) < length: bcd_array.insert(0, 0) return bcd_array def int_to_ascii_array(number, length): # 将整数转换为ASCII码字符串 ascii_str = str(number) # 获取ASCII码字符串的长度 ascii_str_length = len(ascii_str) # 如果ASCII码字符串的长度小于指定长度,则在前面补0 if ascii_str_length < length: ascii_str = '0' * (length - ascii_str_length) + ascii_str # 将ASCII码字符串转换为ASCII码数组 ascii_array = [ord(c) for c in ascii_str] return ascii_array def str_to_ascii_array(str, length): # 获取ASCII码字符串的长度 ascii_str_length = len(str) # 如果ASCII码字符串的长度小于指定长度,则在前面补0 if ascii_str_length < length: ascii_str = '0' * (length - ascii_str_length) + str # 将ASCII码字符串转换为ASCII码数组 ascii_array = [ord(c) for c in str] return ascii_array def hex_to_ascii_arr(hex_num, lenth): str_num = hex(hex_num)[2:].upper().rjust(lenth, '0') array = [ord(c) for c in str_num] return array def convert_ascii_to_num(ascii_val): if ord('0') <= ascii_val <= ord('9'): return ascii_val - ord('0') elif ord('A') <= ascii_val <= ord('F'): return ascii_val - ord('A') + 10 elif ord('a') <= ascii_val <= ord('f'): return ascii_val - ord('a') + 10 else: return 0 def ascii_array_to_word(ascii_array): if len(ascii_array) != 4: return 0 return convert_ascii_to_num(ascii_array[0]) * 0x1000 + \ convert_ascii_to_num(ascii_array[1]) * 0x0100 + \ convert_ascii_to_num(ascii_array[2]) * 0x010 + \ convert_ascii_to_num(ascii_array[3]) def date_to_bcd(date_str): # 0:06:20.585233 # condition_print("date=",date_str,type(date_str)) # 移除日期中的"-"字符 date_str = date_str.replace("-", "") # 按照两位数字进行分割 date_parts = [date_str[i:i + 2] for i in range(0, len(date_str), 2)] # 将每个部分转换为十六进制数 bcd_array = [int(part, 16) for part in date_parts] return bcd_array def time_to_bcd(time_str): # 10:06:20.585233 time_str = time_str.split('.')[0] # 去除掉后面的毫秒 time_parts = time_str.split(':') # 按":"分割 bcd = [int(part) for part in time_parts] # 分别转为整数 bcd = [int(str(i), 16) for i in bcd] # 分别转为16进制 return bcd def condition_print(*args, **kwargs): global print_condition # 使用全局变量 message = ' '.join(str(arg) for arg in args) if kwargs: message += ', ' + ', '.join("{}={}".format(key, value) for key, value in kwargs.items()) if print_condition: print(message) def string_to_registers(string, length=None, encoding='ascii'): """ 将字符串编码为 Modbus 寄存器格式(16 位寄存器)。 如果未指定长度,则自动按照字符串最小长度(字节数,奇数会补齐)返回寄存器。 如果指定长度,不够的部分用 \x00 填充,多余的部分会被截断。 如果输入字符串为空("")或 None,并且指定了长度,则返回全为 0 的寄存器。 在将 2 个 ASCII 字节组合成 1 个寄存器时,交换高字节和低字节的顺序。 对非ASCII字符(如中文)使用空格替换。 无论出现什么错误,返回的数组都会满足长度要求。 :param string: 原始字符串 :param length: (可选)目标字节长度。如果为 None,则按最小长度返回寄存器。 :param encoding: 字符串编码格式(默认 ascii) :return: 16 位寄存器列表,确保满足长度要求 """ try: # 如果字符串为 None 或空,设置为空字节流 if not string: byte_data = b'' else: # 处理非ASCII字符:用空格替换 if encoding == 'ascii': # 过滤非ASCII字符,用空格替换 cleaned_string = ''.join([c if ord(c) < 128 else ' ' for c in string]) byte_data = cleaned_string.encode(encoding) else: # 对于其他编码,尝试编码,如果失败则使用空格替换 try: byte_data = string.encode(encoding) except UnicodeEncodeError: # 如果编码失败,用空格替换所有非ASCII字符 cleaned_string = ''.join([c if ord(c) < 128 else ' ' for c in string]) byte_data = cleaned_string.encode('ascii') # 计算需要的寄存器数量 if length is None: # 未指定长度,使用字节数据的实际长度 byte_length = len(byte_data) if byte_length % 2 != 0: byte_length += 1 # 补齐到偶数字节 num_registers = byte_length // 2 else: # 指定了长度,计算需要的寄存器数量 num_registers = (length + 1) // 2 # 向上取整 # 如果未指定长度,按照最小长度返回寄存器(确保字节对齐) if length is None: if len(byte_data) % 2 != 0: byte_data += b'\x00' # 补齐到偶数字节 else: # 如果指定了长度 if len(byte_data) > length: # 截断字符串到指定长度 byte_data = byte_data[:length] elif len(byte_data) < length: # 使用 \x00 填充到指定长度 byte_data += b'\x00' * (length - len(byte_data)) # 如果字符串为空或 None 且指定了长度,填充全为 0 的寄存器 if not string and length is not None: byte_data = b'\x00' * length # 将字节流分为两个字节一组,并交换字节顺序 registers = [ int.from_bytes(byte_data[i:i + 2][::-1], byteorder='big') # 交换高字节和低字节 for i in range(0, len(byte_data), 2) ] # 确保寄存器数量正确(处理可能的截断情况) if len(registers) < num_registers: registers.extend([0] * (num_registers - len(registers))) elif len(registers) > num_registers: registers = registers[:num_registers] return registers except Exception as e: # 记录错误日志(实际应用中可以使用 logging 模块) print(f"Error in string_to_registers: {e}") # 确保返回的寄存器数量满足要求 if length is None: # 未指定长度,返回空列表 return [] else: # 指定了长度,返回全为 0 的寄存器列表 num_registers = (length + 1) // 2 # 向上取整 return [0] * num_registers def registers_to_string(registers, encoding='ascii'): """ 将 Modbus 寄存器格式(16 位寄存器)解码为字符串。 处理字节顺序交换(高字节和低字节交换)。 自动处理空字节填充,去除末尾的空字符。 :param registers: 16 位寄存器列表 :param encoding: 字符串编码格式(默认 ascii) :return: 解码后的字符串 """ try: # 如果寄存器为空或None,返回空字符串 if not registers: return "" # 将每个寄存器转换为两个字节,并交换字节顺序 byte_data = bytearray() for reg in registers: # 将16位寄存器转换为字节,并交换字节顺序 bytes_val = reg.to_bytes(2, byteorder='big') # 交换高字节和低字节 byte_data.append(bytes_val[1]) byte_data.append(bytes_val[0]) # 解码字节数据为字符串 result = byte_data.decode(encoding) # 去除末尾的空字符(\x00) result = result.rstrip('\x00') return result except Exception as e: # 记录错误日志(实际应用中可以使用 logging 模块) print(f"Error in registers_to_string: {e}") return "" def string_to_registers1(string, length=None, encoding='ascii'): """ 将字符串编码为 Modbus 寄存器格式(16 位寄存器)。 如果未指定长度,则自动按照字符串最小长度(字节数,奇数会补齐)返回寄存器。 如果指定长度,不够的部分用 \x00 填充,多余的部分会被截断。 如果输入字符串为空("")或 None,并且指定了长度,则返回全为 0 的寄存器。 在将 2 个 ASCII 字节组合成 1 个寄存器时,交换高字节和低字节的顺序。 :param string: 原始字符串 :param length: (可选)目标字节长度。如果为 None,则按最小长度返回寄存器。 :param encoding: 字符串编码格式(默认 ascii) :return: 16 位寄存器列表 """ # 如果字符串为 None 或空,设置为空字节流 if not string: byte_data = b'' else: # 将字符串编码为字节流 byte_data = string.encode(encoding) # 如果未指定长度,按照最小长度返回寄存器(确保字节对齐) if length is None: if len(byte_data) % 2 != 0: byte_data += b'\x00' # 补齐到偶数字节 else: # 如果指定了长度 if len(byte_data) > length: # 截断字符串到指定长度 byte_data = byte_data[:length] elif len(byte_data) < length: # 使用 \x00 填充到指定长度 byte_data += b'\x00' * (length - len(byte_data)) # 如果字符串为空或 None 且指定了长度,填充全为 0 的寄存器 if not string and length is not None: byte_data = b'\x00' * length # 将字节流分为两个字节一组,并交换字节顺序 registers = [ int.from_bytes(byte_data[i:i + 2][::-1], byteorder='big') # 交换高字节和低字节 for i in range(0, len(byte_data), 2) ] return registers class PLCDropParamsData: # 定义类属性来存储结构体定义 structure = [ ('dropCode', 0, 1), ('dropCycles', 1, 2), ('dropHeight', 3, 2), ] # 定义类属性来存储所需的寄存器数量 required_registers_size_per_group = max(start + length for _, start, length in structure) number_of_groups = 6 required_registers_size = required_registers_size_per_group * number_of_groups @classmethod def parse_registers(cls, registers): if len(registers) < cls.required_registers_size: raise ValueError("寄存器数量不足,无法解析数据") data = [] for group_index in range(cls.number_of_groups): group_data = {} for field, start, length in cls.structure: actual_start = start + group_index * cls.required_registers_size_per_group if length == 1: value = registers[actual_start] else: value = 0 for i in range(length): value |= registers[actual_start + i] << (16 * i) group_data[field] = value data.append(group_data) return data @classmethod def to_registers(cls, data): """ 将跌落参数数据转换回PLC寄存器值 参数: data: list of dict, 跌落参数数据,每个dict包含dropCode, dropCycles, dropHeight 返回: list: 连续的寄存器值列表 示例: data = [ {'dropCode': 1, 'dropCycles': 100, 'dropHeight': 500}, {'dropCode': 2, 'dropCycles': 200, 'dropHeight': 600}, # ... 最多6组 ] registers = PLCDropParamsData.to_registers(data) """ # 验证数据数量 if len(data) > cls.number_of_groups: raise ValueError(f"数据组数不能超过{cls.number_of_groups}") # 创建足够长度的寄存器列表,初始化为0 registers = [0] * cls.required_registers_size # 遍历每组数据 for group_index, group_data in enumerate(data): # 验证每组数据是否包含必要的字段 for field, _, _ in cls.structure: if field not in group_data: raise ValueError(f"第{group_index + 1}组数据缺少字段: {field}") # 处理每个字段 for field, start, length in cls.structure: actual_start = start + group_index * cls.required_registers_size_per_group value = group_data[field] # 验证值范围(16位寄存器,值应在0-65535之间) if not isinstance(value, int): raise ValueError(f"第{group_index + 1}组数据的{field}必须是整数") if value < 0 or value > (1 << (16 * length)) - 1: raise ValueError(f"第{group_index + 1}组数据的{field}超出范围: 0-{1 << (16 * length) - 1}") # 写入寄存器 if length == 1: registers[actual_start] = value else: # 将长整数值拆分为多个16位寄存器 for i in range(length): registers[actual_start + i] = (value >> (16 * i)) & 0xFFFF return registers class PLCTestReqResultData: # 定义类属性来存储结构体定义 structure = [ # ('flag', 0, 1), # 标志位 0--进行性中 1--已完成 2--无效 ('beginYear', 0, 1), # 开始 年 月 日 时 分 秒 ('beginMonth', 1, 1), ('beginDay', 2, 1), ('beginHour', 3, 1), ('beginMinute', 4, 1), ('beginSecond', 5, 1), ('endYear', 6, 1), # 结束 年 月 日 时 分 秒 ('endMonth', 7, 1), ('endDay', 8, 1), ('endHour', 9, 1), ('endMinute', 10, 1), ('endSecond', 11, 1), ('directionCode', 12, 1), # 正在跌落的方向代码 ('station_no', 13, 1), # 通道号 1 2 3 ('itemSN', 14, 2), # 流水号,判断是否重复读取用 ('dropHeight', 16, 2), # 跌落高度 ('dropSpeed', 18, 2), # 跌落速度 ('dutDropCycles', 20, 2), # 样品累计已完成跌落次数 ('itemCurrentCycles', 22, 2), # 样品此方向已经完成跌落次数 ('stationDropCycles', 24, 2), # 本通道自从维护以来累计跌落次数 ('machine_sn', 26, 18), # 16个寄存器代表32个字节 设备编码 ] # 定义类属性来存储所需的寄存器数量 required_registers_size = max(start + length for _, start, length in structure) @classmethod def get_itemSN_register_index(cls): for field, start, length in cls.structure: if field == 'itemSN': return start return 1 @classmethod def get_dut_result_counter_index(cls): for field, start, length in cls.structure: if field == 'dutDropCycles': return start return 1 @classmethod def get_flag_register_index(cls): for field, start, length in cls.structure: if field == 'flag': return start return 1 @classmethod def parse_registers(cls, registers): if len(registers) < cls.required_registers_size: raise ValueError("寄存器数量不足,无法解析数据") data = {} error_return = None try: for field, start, length in cls.structure: if length == 1: value = registers[start] else: value = 0 for i in range(length): value |= registers[start + i] << (16 * i) if field == 'machine_sn': value = b''.join(reg.to_bytes(2, byteorder='little') for reg in registers[start:start + length]) value = value.decode('ascii').rstrip('\x00') data[field] = value except Exception as error: print("parse_registers error", error) error_return = f"parse_registers error {error}" try: data['startTime'] = ( f"{data.get('beginYear')}-{data.get('beginMonth'):02d}-{data.get('beginDay'):02d} " f"{data.get('beginHour'):02d}:{data.get('beginMinute'):02d}:{data.get('beginSecond'):02d}" ) data['endTime'] = ( f"{data.get('endYear')}-{data.get('endMonth'):02d}-{data.get('endDay'):02d} " f"{data.get('endHour'):02d}:{data.get('endMinute'):02d}:{data.get('endSecond'):02d}" ) except Exception as error: if error_return: error_return = error_return + f"conv time {error}" else: f"conv time {error}" return data, error_return import colorama from colorama import Fore, Back, Style # 定义print颜色代码 # 定义颜色代码(使用 colorama) # 初始化 colorama colorama.init(autoreset=True) class Colors: RESET = Style.RESET_ALL BOLD = Style.BRIGHT # 前景色 BLACK = Fore.BLACK RED = Fore.RED GREEN = Fore.GREEN YELLOW = Fore.YELLOW BLUE = Fore.BLUE MAGENTA = Fore.MAGENTA CYAN = Fore.CYAN WHITE = Fore.WHITE # 背景色 BG_BLACK = Back.BLACK BG_RED = Back.RED BG_GREEN = Back.GREEN BG_YELLOW = Back.YELLOW BG_BLUE = Back.BLUE BG_MAGENTA = Back.MAGENTA BG_CYAN = Back.CYAN BG_WHITE = Back.WHITE # 自定义 print 函数,带时间戳和颜色 def print_with_timestamp(*args, color=None, **kwargs): timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 选择颜色 color_code = Colors.RESET if not color: pass elif isinstance(color, str) is False: pass elif color.lower() == 'red': color_code = Colors.RED elif color.lower() == 'green': color_code = Colors.GREEN elif color.lower() == 'yellow': color_code = Colors.YELLOW elif color.lower() == 'blue': color_code = Colors.BLUE elif color.lower() == 'magenta': color_code = Colors.MAGENTA elif color.lower() == 'cyan': color_code = Colors.CYAN # 构建带时间戳和颜色的消息 message = ' '.join(str(arg) for arg in args) colored_message = f"{Colors.BLUE}{timestamp}{Colors.RESET} {color_code}{message}{Colors.RESET}" print(colored_message, **kwargs)