547 lines
20 KiB
Python
547 lines
20 KiB
Python
import datetime
|
||
|
||
print_condition = False
|
||
all_plc_recv = 0
|
||
serverUrl = 'http://106.52.71.204:5050'
|
||
# 定义颜色代码
|
||
RED = '\033[31m'
|
||
GREEN = '\033[32m'
|
||
YELLOW = '\033[33m'
|
||
BLUE = '\033[34m'
|
||
MAGENTA = '\033[35m'
|
||
CYAN = '\033[36m'
|
||
WHITE = '\033[37m'
|
||
RESET = '\033[m'
|
||
|
||
|
||
def find_item(json_array, key, value):
|
||
if json_array is None:
|
||
return None
|
||
if isinstance(value, list):
|
||
result = []
|
||
for matched_value in value:
|
||
for item in json_array:
|
||
if item.get(key) == matched_value:
|
||
result.append(item)
|
||
return result
|
||
else:
|
||
for item in json_array:
|
||
if item.get(key) == value:
|
||
return item
|
||
return None # 如果没有匹配项,则返回 None
|
||
|
||
|
||
def bcd_coder_array(value=0, lenght=4):
|
||
return int_to_bcd(value, lenght)
|
||
|
||
|
||
def int_to_bcd(number=0, length=5):
|
||
bcd_array = []
|
||
try:
|
||
int_number = int(round(number)) # 使用round函数四舍五入,然后转成整数
|
||
while int_number > 0:
|
||
bcd = int_number % 100
|
||
bcd_array.insert(0, (bcd // 10) << 4 | (bcd % 10))
|
||
int_number = int_number // 100
|
||
except:
|
||
pass
|
||
|
||
while len(bcd_array) < length:
|
||
bcd_array.insert(0, 0)
|
||
|
||
return bcd_array
|
||
|
||
|
||
def int_to_ascii_array(number, length):
|
||
# 将整数转换为ASCII码字符串
|
||
ascii_str = str(number)
|
||
# 获取ASCII码字符串的长度
|
||
ascii_str_length = len(ascii_str)
|
||
# 如果ASCII码字符串的长度小于指定长度,则在前面补0
|
||
if ascii_str_length < length:
|
||
ascii_str = '0' * (length - ascii_str_length) + ascii_str
|
||
# 将ASCII码字符串转换为ASCII码数组
|
||
ascii_array = [ord(c) for c in ascii_str]
|
||
return ascii_array
|
||
|
||
|
||
def str_to_ascii_array(str, length):
|
||
# 获取ASCII码字符串的长度
|
||
ascii_str_length = len(str)
|
||
# 如果ASCII码字符串的长度小于指定长度,则在前面补0
|
||
if ascii_str_length < length:
|
||
ascii_str = '0' * (length - ascii_str_length) + str
|
||
# 将ASCII码字符串转换为ASCII码数组
|
||
ascii_array = [ord(c) for c in str]
|
||
return ascii_array
|
||
|
||
|
||
def hex_to_ascii_arr(hex_num, lenth):
|
||
str_num = hex(hex_num)[2:].upper().rjust(lenth, '0')
|
||
array = [ord(c) for c in str_num]
|
||
return array
|
||
|
||
|
||
def convert_ascii_to_num(ascii_val):
|
||
if ord('0') <= ascii_val <= ord('9'):
|
||
return ascii_val - ord('0')
|
||
elif ord('A') <= ascii_val <= ord('F'):
|
||
return ascii_val - ord('A') + 10
|
||
elif ord('a') <= ascii_val <= ord('f'):
|
||
return ascii_val - ord('a') + 10
|
||
else:
|
||
return 0
|
||
|
||
|
||
def ascii_array_to_word(ascii_array):
|
||
if len(ascii_array) != 4:
|
||
return 0
|
||
return convert_ascii_to_num(ascii_array[0]) * 0x1000 + \
|
||
convert_ascii_to_num(ascii_array[1]) * 0x0100 + \
|
||
convert_ascii_to_num(ascii_array[2]) * 0x010 + \
|
||
convert_ascii_to_num(ascii_array[3])
|
||
|
||
|
||
def date_to_bcd(date_str): # 0:06:20.585233
|
||
# condition_print("date=",date_str,type(date_str))
|
||
# 移除日期中的"-"字符
|
||
date_str = date_str.replace("-", "")
|
||
# 按照两位数字进行分割
|
||
date_parts = [date_str[i:i + 2] for i in range(0, len(date_str), 2)]
|
||
# 将每个部分转换为十六进制数
|
||
bcd_array = [int(part, 16) for part in date_parts]
|
||
return bcd_array
|
||
|
||
|
||
def time_to_bcd(time_str): # 10:06:20.585233
|
||
time_str = time_str.split('.')[0] # 去除掉后面的毫秒
|
||
time_parts = time_str.split(':') # 按":"分割
|
||
bcd = [int(part) for part in time_parts] # 分别转为整数
|
||
bcd = [int(str(i), 16) for i in bcd] # 分别转为16进制
|
||
return bcd
|
||
|
||
|
||
def condition_print(*args, **kwargs):
|
||
global print_condition # 使用全局变量
|
||
message = ' '.join(str(arg) for arg in args)
|
||
if kwargs:
|
||
message += ', ' + ', '.join("{}={}".format(key, value) for key, value in kwargs.items())
|
||
|
||
if print_condition:
|
||
print(message)
|
||
|
||
|
||
def string_to_registers(string, length=None, encoding='ascii'):
|
||
"""
|
||
将字符串编码为 Modbus 寄存器格式(16 位寄存器)。
|
||
如果未指定长度,则自动按照字符串最小长度(字节数,奇数会补齐)返回寄存器。
|
||
如果指定长度,不够的部分用 \x00 填充,多余的部分会被截断。
|
||
如果输入字符串为空("")或 None,并且指定了长度,则返回全为 0 的寄存器。
|
||
在将 2 个 ASCII 字节组合成 1 个寄存器时,交换高字节和低字节的顺序。
|
||
对非ASCII字符(如中文)使用空格替换。
|
||
|
||
无论出现什么错误,返回的数组都会满足长度要求。
|
||
|
||
:param string: 原始字符串
|
||
:param length: (可选)目标字节长度。如果为 None,则按最小长度返回寄存器。
|
||
:param encoding: 字符串编码格式(默认 ascii)
|
||
:return: 16 位寄存器列表,确保满足长度要求
|
||
"""
|
||
try:
|
||
# 如果字符串为 None 或空,设置为空字节流
|
||
if not string:
|
||
byte_data = b''
|
||
else:
|
||
# 处理非ASCII字符:用空格替换
|
||
if encoding == 'ascii':
|
||
# 过滤非ASCII字符,用空格替换
|
||
cleaned_string = ''.join([c if ord(c) < 128 else ' ' for c in string])
|
||
byte_data = cleaned_string.encode(encoding)
|
||
else:
|
||
# 对于其他编码,尝试编码,如果失败则使用空格替换
|
||
try:
|
||
byte_data = string.encode(encoding)
|
||
except UnicodeEncodeError:
|
||
# 如果编码失败,用空格替换所有非ASCII字符
|
||
cleaned_string = ''.join([c if ord(c) < 128 else ' ' for c in string])
|
||
byte_data = cleaned_string.encode('ascii')
|
||
|
||
# 计算需要的寄存器数量
|
||
if length is None:
|
||
# 未指定长度,使用字节数据的实际长度
|
||
byte_length = len(byte_data)
|
||
if byte_length % 2 != 0:
|
||
byte_length += 1 # 补齐到偶数字节
|
||
num_registers = byte_length // 2
|
||
else:
|
||
# 指定了长度,计算需要的寄存器数量
|
||
num_registers = (length + 1) // 2 # 向上取整
|
||
|
||
# 如果未指定长度,按照最小长度返回寄存器(确保字节对齐)
|
||
if length is None:
|
||
if len(byte_data) % 2 != 0:
|
||
byte_data += b'\x00' # 补齐到偶数字节
|
||
else:
|
||
# 如果指定了长度
|
||
if len(byte_data) > length:
|
||
# 截断字符串到指定长度
|
||
byte_data = byte_data[:length]
|
||
elif len(byte_data) < length:
|
||
# 使用 \x00 填充到指定长度
|
||
byte_data += b'\x00' * (length - len(byte_data))
|
||
|
||
# 如果字符串为空或 None 且指定了长度,填充全为 0 的寄存器
|
||
if not string and length is not None:
|
||
byte_data = b'\x00' * length
|
||
|
||
# 将字节流分为两个字节一组,并交换字节顺序
|
||
registers = [
|
||
int.from_bytes(byte_data[i:i + 2][::-1], byteorder='big') # 交换高字节和低字节
|
||
for i in range(0, len(byte_data), 2)
|
||
]
|
||
|
||
# 确保寄存器数量正确(处理可能的截断情况)
|
||
if len(registers) < num_registers:
|
||
registers.extend([0] * (num_registers - len(registers)))
|
||
elif len(registers) > num_registers:
|
||
registers = registers[:num_registers]
|
||
|
||
return registers
|
||
|
||
except Exception as e:
|
||
# 记录错误日志(实际应用中可以使用 logging 模块)
|
||
print(f"Error in string_to_registers: {e}")
|
||
|
||
# 确保返回的寄存器数量满足要求
|
||
if length is None:
|
||
# 未指定长度,返回空列表
|
||
return []
|
||
else:
|
||
# 指定了长度,返回全为 0 的寄存器列表
|
||
num_registers = (length + 1) // 2 # 向上取整
|
||
return [0] * num_registers
|
||
|
||
|
||
def registers_to_string(registers, encoding='ascii'):
|
||
"""
|
||
将 Modbus 寄存器格式(16 位寄存器)解码为字符串。
|
||
处理字节顺序交换(高字节和低字节交换)。
|
||
自动处理空字节填充,去除末尾的空字符。
|
||
|
||
:param registers: 16 位寄存器列表
|
||
:param encoding: 字符串编码格式(默认 ascii)
|
||
:return: 解码后的字符串
|
||
"""
|
||
try:
|
||
# 如果寄存器为空或None,返回空字符串
|
||
if not registers:
|
||
return ""
|
||
|
||
# 将每个寄存器转换为两个字节,并交换字节顺序
|
||
byte_data = bytearray()
|
||
for reg in registers:
|
||
# 将16位寄存器转换为字节,并交换字节顺序
|
||
bytes_val = reg.to_bytes(2, byteorder='big')
|
||
# 交换高字节和低字节
|
||
byte_data.append(bytes_val[1])
|
||
byte_data.append(bytes_val[0])
|
||
|
||
# 解码字节数据为字符串
|
||
result = byte_data.decode(encoding)
|
||
|
||
# 去除末尾的空字符(\x00)
|
||
result = result.rstrip('\x00')
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
# 记录错误日志(实际应用中可以使用 logging 模块)
|
||
print(f"Error in registers_to_string: {e}")
|
||
return ""
|
||
|
||
def string_to_registers1(string, length=None, encoding='ascii'):
|
||
"""
|
||
将字符串编码为 Modbus 寄存器格式(16 位寄存器)。
|
||
如果未指定长度,则自动按照字符串最小长度(字节数,奇数会补齐)返回寄存器。
|
||
如果指定长度,不够的部分用 \x00 填充,多余的部分会被截断。
|
||
如果输入字符串为空("")或 None,并且指定了长度,则返回全为 0 的寄存器。
|
||
在将 2 个 ASCII 字节组合成 1 个寄存器时,交换高字节和低字节的顺序。
|
||
|
||
:param string: 原始字符串
|
||
:param length: (可选)目标字节长度。如果为 None,则按最小长度返回寄存器。
|
||
:param encoding: 字符串编码格式(默认 ascii)
|
||
:return: 16 位寄存器列表
|
||
"""
|
||
# 如果字符串为 None 或空,设置为空字节流
|
||
if not string:
|
||
byte_data = b''
|
||
else:
|
||
# 将字符串编码为字节流
|
||
byte_data = string.encode(encoding)
|
||
|
||
# 如果未指定长度,按照最小长度返回寄存器(确保字节对齐)
|
||
if length is None:
|
||
if len(byte_data) % 2 != 0:
|
||
byte_data += b'\x00' # 补齐到偶数字节
|
||
else:
|
||
# 如果指定了长度
|
||
if len(byte_data) > length:
|
||
# 截断字符串到指定长度
|
||
byte_data = byte_data[:length]
|
||
elif len(byte_data) < length:
|
||
# 使用 \x00 填充到指定长度
|
||
byte_data += b'\x00' * (length - len(byte_data))
|
||
|
||
# 如果字符串为空或 None 且指定了长度,填充全为 0 的寄存器
|
||
if not string and length is not None:
|
||
byte_data = b'\x00' * length
|
||
|
||
# 将字节流分为两个字节一组,并交换字节顺序
|
||
registers = [
|
||
int.from_bytes(byte_data[i:i + 2][::-1], byteorder='big') # 交换高字节和低字节
|
||
for i in range(0, len(byte_data), 2)
|
||
]
|
||
return registers
|
||
|
||
|
||
class PLCDropParamsData:
|
||
# 定义类属性来存储结构体定义
|
||
structure = [
|
||
('dropCode', 0, 1),
|
||
('dropCycles', 1, 2),
|
||
('dropHeight', 3, 2),
|
||
]
|
||
|
||
# 定义类属性来存储所需的寄存器数量
|
||
required_registers_size_per_group = max(start + length for _, start, length in structure)
|
||
number_of_groups = 6
|
||
required_registers_size = required_registers_size_per_group * number_of_groups
|
||
|
||
@classmethod
|
||
def parse_registers(cls, registers):
|
||
if len(registers) < cls.required_registers_size:
|
||
raise ValueError("寄存器数量不足,无法解析数据")
|
||
|
||
data = []
|
||
for group_index in range(cls.number_of_groups):
|
||
group_data = {}
|
||
for field, start, length in cls.structure:
|
||
actual_start = start + group_index * cls.required_registers_size_per_group
|
||
if length == 1:
|
||
value = registers[actual_start]
|
||
else:
|
||
value = 0
|
||
for i in range(length):
|
||
value |= registers[actual_start + i] << (16 * i)
|
||
group_data[field] = value
|
||
data.append(group_data)
|
||
return data
|
||
|
||
@classmethod
|
||
def to_registers(cls, data):
|
||
"""
|
||
将跌落参数数据转换回PLC寄存器值
|
||
|
||
参数:
|
||
data: list of dict, 跌落参数数据,每个dict包含dropCode, dropCycles, dropHeight
|
||
|
||
返回:
|
||
list: 连续的寄存器值列表
|
||
|
||
示例:
|
||
data = [
|
||
{'dropCode': 1, 'dropCycles': 100, 'dropHeight': 500},
|
||
{'dropCode': 2, 'dropCycles': 200, 'dropHeight': 600},
|
||
# ... 最多6组
|
||
]
|
||
registers = PLCDropParamsData.to_registers(data)
|
||
"""
|
||
# 验证数据数量
|
||
if len(data) > cls.number_of_groups:
|
||
raise ValueError(f"数据组数不能超过{cls.number_of_groups}")
|
||
|
||
# 创建足够长度的寄存器列表,初始化为0
|
||
registers = [0] * cls.required_registers_size
|
||
|
||
# 遍历每组数据
|
||
for group_index, group_data in enumerate(data):
|
||
# 验证每组数据是否包含必要的字段
|
||
for field, _, _ in cls.structure:
|
||
if field not in group_data:
|
||
raise ValueError(f"第{group_index + 1}组数据缺少字段: {field}")
|
||
|
||
# 处理每个字段
|
||
for field, start, length in cls.structure:
|
||
actual_start = start + group_index * cls.required_registers_size_per_group
|
||
value = group_data[field]
|
||
|
||
# 验证值范围(16位寄存器,值应在0-65535之间)
|
||
if not isinstance(value, int):
|
||
raise ValueError(f"第{group_index + 1}组数据的{field}必须是整数")
|
||
|
||
if value < 0 or value > (1 << (16 * length)) - 1:
|
||
raise ValueError(f"第{group_index + 1}组数据的{field}超出范围: 0-{1 << (16 * length) - 1}")
|
||
|
||
# 写入寄存器
|
||
if length == 1:
|
||
registers[actual_start] = value
|
||
else:
|
||
# 将长整数值拆分为多个16位寄存器
|
||
for i in range(length):
|
||
registers[actual_start + i] = (value >> (16 * i)) & 0xFFFF
|
||
|
||
return registers
|
||
|
||
class PLCTestReqResultData:
|
||
# 定义类属性来存储结构体定义
|
||
structure = [
|
||
# ('flag', 0, 1), # 标志位 0--进行性中 1--已完成 2--无效
|
||
|
||
('beginYear', 0, 1), # 开始 年 月 日 时 分 秒
|
||
('beginMonth', 1, 1),
|
||
('beginDay', 2, 1),
|
||
('beginHour', 3, 1),
|
||
('beginMinute', 4, 1),
|
||
('beginSecond', 5, 1),
|
||
('endYear', 6, 1), # 结束 年 月 日 时 分 秒
|
||
('endMonth', 7, 1),
|
||
('endDay', 8, 1),
|
||
('endHour', 9, 1),
|
||
('endMinute', 10, 1),
|
||
('endSecond', 11, 1),
|
||
('directionCode', 12, 1), # 正在跌落的方向代码
|
||
('station_no', 13, 1), # 通道号 1 2 3
|
||
('itemSN', 14, 2), # 流水号,判断是否重复读取用
|
||
('dropHeight', 16, 2), # 跌落高度
|
||
('dropSpeed', 18, 2), # 跌落速度
|
||
('dutDropCycles', 20, 2), # 样品累计已完成跌落次数
|
||
('itemCurrentCycles', 22, 2), # 样品此方向已经完成跌落次数
|
||
('stationDropCycles', 24, 2), # 本通道自从维护以来累计跌落次数
|
||
('machine_sn', 26, 18), # 16个寄存器代表32个字节 设备编码
|
||
]
|
||
|
||
# 定义类属性来存储所需的寄存器数量
|
||
required_registers_size = max(start + length for _, start, length in structure)
|
||
|
||
@classmethod
|
||
def get_itemSN_register_index(cls):
|
||
for field, start, length in cls.structure:
|
||
if field == 'itemSN':
|
||
return start
|
||
return 1
|
||
|
||
@classmethod
|
||
def get_dut_result_counter_index(cls):
|
||
for field, start, length in cls.structure:
|
||
if field == 'dutDropCycles':
|
||
return start
|
||
return 1
|
||
|
||
@classmethod
|
||
def get_flag_register_index(cls):
|
||
for field, start, length in cls.structure:
|
||
if field == 'flag':
|
||
return start
|
||
return 1
|
||
|
||
@classmethod
|
||
def parse_registers(cls, registers):
|
||
if len(registers) < cls.required_registers_size:
|
||
raise ValueError("寄存器数量不足,无法解析数据")
|
||
data = {}
|
||
error_return = None
|
||
try:
|
||
for field, start, length in cls.structure:
|
||
if length == 1:
|
||
value = registers[start]
|
||
else:
|
||
value = 0
|
||
for i in range(length):
|
||
value |= registers[start + i] << (16 * i)
|
||
if field == 'machine_sn':
|
||
value = b''.join(reg.to_bytes(2, byteorder='little') for reg in registers[start:start + length])
|
||
value = value.decode('ascii').rstrip('\x00')
|
||
data[field] = value
|
||
except Exception as error:
|
||
print("parse_registers error", error)
|
||
error_return = f"parse_registers error {error}"
|
||
|
||
try:
|
||
data['startTime'] = (
|
||
f"{data.get('beginYear')}-{data.get('beginMonth'):02d}-{data.get('beginDay'):02d} "
|
||
f"{data.get('beginHour'):02d}:{data.get('beginMinute'):02d}:{data.get('beginSecond'):02d}"
|
||
)
|
||
data['endTime'] = (
|
||
f"{data.get('endYear')}-{data.get('endMonth'):02d}-{data.get('endDay'):02d} "
|
||
f"{data.get('endHour'):02d}:{data.get('endMinute'):02d}:{data.get('endSecond'):02d}"
|
||
)
|
||
|
||
except Exception as error:
|
||
if error_return:
|
||
error_return = error_return + f"conv time {error}"
|
||
else:
|
||
f"conv time {error}"
|
||
|
||
return data, error_return
|
||
|
||
|
||
import colorama
|
||
from colorama import Fore, Back, Style
|
||
# 定义print颜色代码
|
||
# 定义颜色代码(使用 colorama)
|
||
# 初始化 colorama
|
||
colorama.init(autoreset=True)
|
||
class Colors:
|
||
RESET = Style.RESET_ALL
|
||
BOLD = Style.BRIGHT
|
||
|
||
# 前景色
|
||
BLACK = Fore.BLACK
|
||
RED = Fore.RED
|
||
GREEN = Fore.GREEN
|
||
YELLOW = Fore.YELLOW
|
||
BLUE = Fore.BLUE
|
||
MAGENTA = Fore.MAGENTA
|
||
CYAN = Fore.CYAN
|
||
WHITE = Fore.WHITE
|
||
|
||
# 背景色
|
||
BG_BLACK = Back.BLACK
|
||
BG_RED = Back.RED
|
||
BG_GREEN = Back.GREEN
|
||
BG_YELLOW = Back.YELLOW
|
||
BG_BLUE = Back.BLUE
|
||
BG_MAGENTA = Back.MAGENTA
|
||
BG_CYAN = Back.CYAN
|
||
BG_WHITE = Back.WHITE
|
||
|
||
|
||
# 自定义 print 函数,带时间戳和颜色
|
||
def print_with_timestamp(*args, color=None, **kwargs):
|
||
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
# 选择颜色
|
||
color_code = Colors.RESET
|
||
if not color:
|
||
pass
|
||
elif isinstance(color, str) is False:
|
||
pass
|
||
elif color.lower() == 'red':
|
||
color_code = Colors.RED
|
||
elif color.lower() == 'green':
|
||
color_code = Colors.GREEN
|
||
elif color.lower() == 'yellow':
|
||
color_code = Colors.YELLOW
|
||
elif color.lower() == 'blue':
|
||
color_code = Colors.BLUE
|
||
elif color.lower() == 'magenta':
|
||
color_code = Colors.MAGENTA
|
||
elif color.lower() == 'cyan':
|
||
color_code = Colors.CYAN
|
||
|
||
# 构建带时间戳和颜色的消息
|
||
message = ' '.join(str(arg) for arg in args)
|
||
colored_message = f"{Colors.BLUE}{timestamp}{Colors.RESET} {color_code}{message}{Colors.RESET}"
|
||
|
||
print(colored_message, **kwargs)
|
||
|