Files
dtm-py-all/utils.py

547 lines
20 KiB
Python
Raw Normal View History

import datetime
print_condition = False
all_plc_recv = 0
serverUrl = 'http://106.52.71.204:5050'
# 定义颜色代码
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
RESET = '\033[m'
def find_item(json_array, key, value):
if json_array is None:
return None
if isinstance(value, list):
result = []
for matched_value in value:
for item in json_array:
if item.get(key) == matched_value:
result.append(item)
return result
else:
for item in json_array:
if item.get(key) == value:
return item
return None # 如果没有匹配项,则返回 None
def bcd_coder_array(value=0, lenght=4):
return int_to_bcd(value, lenght)
def int_to_bcd(number=0, length=5):
bcd_array = []
try:
int_number = int(round(number)) # 使用round函数四舍五入然后转成整数
while int_number > 0:
bcd = int_number % 100
bcd_array.insert(0, (bcd // 10) << 4 | (bcd % 10))
int_number = int_number // 100
except:
pass
while len(bcd_array) < length:
bcd_array.insert(0, 0)
return bcd_array
def int_to_ascii_array(number, length):
# 将整数转换为ASCII码字符串
ascii_str = str(number)
# 获取ASCII码字符串的长度
ascii_str_length = len(ascii_str)
# 如果ASCII码字符串的长度小于指定长度则在前面补0
if ascii_str_length < length:
ascii_str = '0' * (length - ascii_str_length) + ascii_str
# 将ASCII码字符串转换为ASCII码数组
ascii_array = [ord(c) for c in ascii_str]
return ascii_array
def str_to_ascii_array(str, length):
# 获取ASCII码字符串的长度
ascii_str_length = len(str)
# 如果ASCII码字符串的长度小于指定长度则在前面补0
if ascii_str_length < length:
ascii_str = '0' * (length - ascii_str_length) + str
# 将ASCII码字符串转换为ASCII码数组
ascii_array = [ord(c) for c in str]
return ascii_array
def hex_to_ascii_arr(hex_num, lenth):
str_num = hex(hex_num)[2:].upper().rjust(lenth, '0')
array = [ord(c) for c in str_num]
return array
def convert_ascii_to_num(ascii_val):
if ord('0') <= ascii_val <= ord('9'):
return ascii_val - ord('0')
elif ord('A') <= ascii_val <= ord('F'):
return ascii_val - ord('A') + 10
elif ord('a') <= ascii_val <= ord('f'):
return ascii_val - ord('a') + 10
else:
return 0
def ascii_array_to_word(ascii_array):
if len(ascii_array) != 4:
return 0
return convert_ascii_to_num(ascii_array[0]) * 0x1000 + \
convert_ascii_to_num(ascii_array[1]) * 0x0100 + \
convert_ascii_to_num(ascii_array[2]) * 0x010 + \
convert_ascii_to_num(ascii_array[3])
def date_to_bcd(date_str): # 0:06:20.585233
# condition_print("date=",date_str,type(date_str))
# 移除日期中的"-"字符
date_str = date_str.replace("-", "")
# 按照两位数字进行分割
date_parts = [date_str[i:i + 2] for i in range(0, len(date_str), 2)]
# 将每个部分转换为十六进制数
bcd_array = [int(part, 16) for part in date_parts]
return bcd_array
def time_to_bcd(time_str): # 10:06:20.585233
time_str = time_str.split('.')[0] # 去除掉后面的毫秒
time_parts = time_str.split(':') # 按":"分割
bcd = [int(part) for part in time_parts] # 分别转为整数
bcd = [int(str(i), 16) for i in bcd] # 分别转为16进制
return bcd
def condition_print(*args, **kwargs):
global print_condition # 使用全局变量
message = ' '.join(str(arg) for arg in args)
if kwargs:
message += ', ' + ', '.join("{}={}".format(key, value) for key, value in kwargs.items())
if print_condition:
print(message)
def string_to_registers(string, length=None, encoding='ascii'):
"""
将字符串编码为 Modbus 寄存器格式16 位寄存器
如果未指定长度则自动按照字符串最小长度字节数奇数会补齐返回寄存器
如果指定长度不够的部分用 \x00 填充多余的部分会被截断
如果输入字符串为空"" None并且指定了长度则返回全为 0 的寄存器
在将 2 ASCII 字节组合成 1 个寄存器时交换高字节和低字节的顺序
对非ASCII字符如中文使用空格替换
无论出现什么错误返回的数组都会满足长度要求
:param string: 原始字符串
:param length: 可选目标字节长度如果为 None则按最小长度返回寄存器
:param encoding: 字符串编码格式默认 ascii
:return: 16 位寄存器列表确保满足长度要求
"""
try:
# 如果字符串为 None 或空,设置为空字节流
if not string:
byte_data = b''
else:
# 处理非ASCII字符用空格替换
if encoding == 'ascii':
# 过滤非ASCII字符用空格替换
cleaned_string = ''.join([c if ord(c) < 128 else ' ' for c in string])
byte_data = cleaned_string.encode(encoding)
else:
# 对于其他编码,尝试编码,如果失败则使用空格替换
try:
byte_data = string.encode(encoding)
except UnicodeEncodeError:
# 如果编码失败用空格替换所有非ASCII字符
cleaned_string = ''.join([c if ord(c) < 128 else ' ' for c in string])
byte_data = cleaned_string.encode('ascii')
# 计算需要的寄存器数量
if length is None:
# 未指定长度,使用字节数据的实际长度
byte_length = len(byte_data)
if byte_length % 2 != 0:
byte_length += 1 # 补齐到偶数字节
num_registers = byte_length // 2
else:
# 指定了长度,计算需要的寄存器数量
num_registers = (length + 1) // 2 # 向上取整
# 如果未指定长度,按照最小长度返回寄存器(确保字节对齐)
if length is None:
if len(byte_data) % 2 != 0:
byte_data += b'\x00' # 补齐到偶数字节
else:
# 如果指定了长度
if len(byte_data) > length:
# 截断字符串到指定长度
byte_data = byte_data[:length]
elif len(byte_data) < length:
# 使用 \x00 填充到指定长度
byte_data += b'\x00' * (length - len(byte_data))
# 如果字符串为空或 None 且指定了长度,填充全为 0 的寄存器
if not string and length is not None:
byte_data = b'\x00' * length
# 将字节流分为两个字节一组,并交换字节顺序
registers = [
int.from_bytes(byte_data[i:i + 2][::-1], byteorder='big') # 交换高字节和低字节
for i in range(0, len(byte_data), 2)
]
# 确保寄存器数量正确(处理可能的截断情况)
if len(registers) < num_registers:
registers.extend([0] * (num_registers - len(registers)))
elif len(registers) > num_registers:
registers = registers[:num_registers]
return registers
except Exception as e:
# 记录错误日志(实际应用中可以使用 logging 模块)
print(f"Error in string_to_registers: {e}")
# 确保返回的寄存器数量满足要求
if length is None:
# 未指定长度,返回空列表
return []
else:
# 指定了长度,返回全为 0 的寄存器列表
num_registers = (length + 1) // 2 # 向上取整
return [0] * num_registers
def registers_to_string(registers, encoding='ascii'):
"""
Modbus 寄存器格式16 位寄存器解码为字符串
处理字节顺序交换高字节和低字节交换
自动处理空字节填充去除末尾的空字符
:param registers: 16 位寄存器列表
:param encoding: 字符串编码格式默认 ascii
:return: 解码后的字符串
"""
try:
# 如果寄存器为空或None返回空字符串
if not registers:
return ""
# 将每个寄存器转换为两个字节,并交换字节顺序
byte_data = bytearray()
for reg in registers:
# 将16位寄存器转换为字节并交换字节顺序
bytes_val = reg.to_bytes(2, byteorder='big')
# 交换高字节和低字节
byte_data.append(bytes_val[1])
byte_data.append(bytes_val[0])
# 解码字节数据为字符串
result = byte_data.decode(encoding)
# 去除末尾的空字符(\x00
result = result.rstrip('\x00')
return result
except Exception as e:
# 记录错误日志(实际应用中可以使用 logging 模块)
print(f"Error in registers_to_string: {e}")
return ""
def string_to_registers1(string, length=None, encoding='ascii'):
"""
将字符串编码为 Modbus 寄存器格式16 位寄存器
如果未指定长度则自动按照字符串最小长度字节数奇数会补齐返回寄存器
如果指定长度不够的部分用 \x00 填充多余的部分会被截断
如果输入字符串为空"" None并且指定了长度则返回全为 0 的寄存器
在将 2 ASCII 字节组合成 1 个寄存器时交换高字节和低字节的顺序
:param string: 原始字符串
:param length: 可选目标字节长度如果为 None则按最小长度返回寄存器
:param encoding: 字符串编码格式默认 ascii
:return: 16 位寄存器列表
"""
# 如果字符串为 None 或空,设置为空字节流
if not string:
byte_data = b''
else:
# 将字符串编码为字节流
byte_data = string.encode(encoding)
# 如果未指定长度,按照最小长度返回寄存器(确保字节对齐)
if length is None:
if len(byte_data) % 2 != 0:
byte_data += b'\x00' # 补齐到偶数字节
else:
# 如果指定了长度
if len(byte_data) > length:
# 截断字符串到指定长度
byte_data = byte_data[:length]
elif len(byte_data) < length:
# 使用 \x00 填充到指定长度
byte_data += b'\x00' * (length - len(byte_data))
# 如果字符串为空或 None 且指定了长度,填充全为 0 的寄存器
if not string and length is not None:
byte_data = b'\x00' * length
# 将字节流分为两个字节一组,并交换字节顺序
registers = [
int.from_bytes(byte_data[i:i + 2][::-1], byteorder='big') # 交换高字节和低字节
for i in range(0, len(byte_data), 2)
]
return registers
class PLCDropParamsData:
# 定义类属性来存储结构体定义
structure = [
('dropCode', 0, 1),
('dropCycles', 1, 2),
('dropHeight', 3, 2),
]
# 定义类属性来存储所需的寄存器数量
required_registers_size_per_group = max(start + length for _, start, length in structure)
number_of_groups = 6
required_registers_size = required_registers_size_per_group * number_of_groups
@classmethod
def parse_registers(cls, registers):
if len(registers) < cls.required_registers_size:
raise ValueError("寄存器数量不足,无法解析数据")
data = []
for group_index in range(cls.number_of_groups):
group_data = {}
for field, start, length in cls.structure:
actual_start = start + group_index * cls.required_registers_size_per_group
if length == 1:
value = registers[actual_start]
else:
value = 0
for i in range(length):
value |= registers[actual_start + i] << (16 * i)
group_data[field] = value
data.append(group_data)
return data
@classmethod
def to_registers(cls, data):
"""
将跌落参数数据转换回PLC寄存器值
参数:
data: list of dict, 跌落参数数据每个dict包含dropCode, dropCycles, dropHeight
返回:
list: 连续的寄存器值列表
示例:
data = [
{'dropCode': 1, 'dropCycles': 100, 'dropHeight': 500},
{'dropCode': 2, 'dropCycles': 200, 'dropHeight': 600},
# ... 最多6组
]
registers = PLCDropParamsData.to_registers(data)
"""
# 验证数据数量
if len(data) > cls.number_of_groups:
raise ValueError(f"数据组数不能超过{cls.number_of_groups}")
# 创建足够长度的寄存器列表初始化为0
registers = [0] * cls.required_registers_size
# 遍历每组数据
for group_index, group_data in enumerate(data):
# 验证每组数据是否包含必要的字段
for field, _, _ in cls.structure:
if field not in group_data:
raise ValueError(f"{group_index + 1}组数据缺少字段: {field}")
# 处理每个字段
for field, start, length in cls.structure:
actual_start = start + group_index * cls.required_registers_size_per_group
value = group_data[field]
# 验证值范围16位寄存器值应在0-65535之间
if not isinstance(value, int):
raise ValueError(f"{group_index + 1}组数据的{field}必须是整数")
if value < 0 or value > (1 << (16 * length)) - 1:
raise ValueError(f"{group_index + 1}组数据的{field}超出范围: 0-{1 << (16 * length) - 1}")
# 写入寄存器
if length == 1:
registers[actual_start] = value
else:
# 将长整数值拆分为多个16位寄存器
for i in range(length):
registers[actual_start + i] = (value >> (16 * i)) & 0xFFFF
return registers
class PLCTestReqResultData:
# 定义类属性来存储结构体定义
structure = [
# ('flag', 0, 1), # 标志位 0--进行性中 1--已完成 2--无效
('beginYear', 0, 1), # 开始 年 月 日 时 分 秒
('beginMonth', 1, 1),
('beginDay', 2, 1),
('beginHour', 3, 1),
('beginMinute', 4, 1),
('beginSecond', 5, 1),
('endYear', 6, 1), # 结束 年 月 日 时 分 秒
('endMonth', 7, 1),
('endDay', 8, 1),
('endHour', 9, 1),
('endMinute', 10, 1),
('endSecond', 11, 1),
('directionCode', 12, 1), # 正在跌落的方向代码
('station_no', 13, 1), # 通道号 1 2 3
('itemSN', 14, 2), # 流水号,判断是否重复读取用
('dropHeight', 16, 2), # 跌落高度
('dropSpeed', 18, 2), # 跌落速度
('dutDropCycles', 20, 2), # 样品累计已完成跌落次数
('itemCurrentCycles', 22, 2), # 样品此方向已经完成跌落次数
('stationDropCycles', 24, 2), # 本通道自从维护以来累计跌落次数
('machine_sn', 26, 18), # 16个寄存器代表32个字节 设备编码
]
# 定义类属性来存储所需的寄存器数量
required_registers_size = max(start + length for _, start, length in structure)
@classmethod
def get_itemSN_register_index(cls):
for field, start, length in cls.structure:
if field == 'itemSN':
return start
return 1
@classmethod
def get_dut_result_counter_index(cls):
for field, start, length in cls.structure:
if field == 'dutDropCycles':
return start
return 1
@classmethod
def get_flag_register_index(cls):
for field, start, length in cls.structure:
if field == 'flag':
return start
return 1
@classmethod
def parse_registers(cls, registers):
if len(registers) < cls.required_registers_size:
raise ValueError("寄存器数量不足,无法解析数据")
data = {}
error_return = None
try:
for field, start, length in cls.structure:
if length == 1:
value = registers[start]
else:
value = 0
for i in range(length):
value |= registers[start + i] << (16 * i)
if field == 'machine_sn':
value = b''.join(reg.to_bytes(2, byteorder='little') for reg in registers[start:start + length])
value = value.decode('ascii').rstrip('\x00')
data[field] = value
except Exception as error:
print("parse_registers error", error)
error_return = f"parse_registers error {error}"
try:
data['startTime'] = (
f"{data.get('beginYear')}-{data.get('beginMonth'):02d}-{data.get('beginDay'):02d} "
f"{data.get('beginHour'):02d}:{data.get('beginMinute'):02d}:{data.get('beginSecond'):02d}"
)
data['endTime'] = (
f"{data.get('endYear')}-{data.get('endMonth'):02d}-{data.get('endDay'):02d} "
f"{data.get('endHour'):02d}:{data.get('endMinute'):02d}:{data.get('endSecond'):02d}"
)
except Exception as error:
if error_return:
error_return = error_return + f"conv time {error}"
else:
f"conv time {error}"
return data, error_return
import colorama
from colorama import Fore, Back, Style
# 定义print颜色代码
# 定义颜色代码(使用 colorama
# 初始化 colorama
colorama.init(autoreset=True)
class Colors:
RESET = Style.RESET_ALL
BOLD = Style.BRIGHT
# 前景色
BLACK = Fore.BLACK
RED = Fore.RED
GREEN = Fore.GREEN
YELLOW = Fore.YELLOW
BLUE = Fore.BLUE
MAGENTA = Fore.MAGENTA
CYAN = Fore.CYAN
WHITE = Fore.WHITE
# 背景色
BG_BLACK = Back.BLACK
BG_RED = Back.RED
BG_GREEN = Back.GREEN
BG_YELLOW = Back.YELLOW
BG_BLUE = Back.BLUE
BG_MAGENTA = Back.MAGENTA
BG_CYAN = Back.CYAN
BG_WHITE = Back.WHITE
# 自定义 print 函数,带时间戳和颜色
def print_with_timestamp(*args, color=None, **kwargs):
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 选择颜色
color_code = Colors.RESET
if not color:
pass
elif isinstance(color, str) is False:
pass
elif color.lower() == 'red':
color_code = Colors.RED
elif color.lower() == 'green':
color_code = Colors.GREEN
elif color.lower() == 'yellow':
color_code = Colors.YELLOW
elif color.lower() == 'blue':
color_code = Colors.BLUE
elif color.lower() == 'magenta':
color_code = Colors.MAGENTA
elif color.lower() == 'cyan':
color_code = Colors.CYAN
# 构建带时间戳和颜色的消息
message = ' '.join(str(arg) for arg in args)
colored_message = f"{Colors.BLUE}{timestamp}{Colors.RESET} {color_code}{message}{Colors.RESET}"
print(colored_message, **kwargs)