Files
dtm-py-all/Observable.py

208 lines
7.4 KiB
Python
Raw Permalink Normal View History

import threading
import binascii
import datetime
import requests
from collections.abc import MutableMapping, MutableSequence
class ObservableDict(MutableMapping):
def __init__(self, initial_data=None, callback=None, timeout=0.05):
self._data = {}
self._callback = callback
self._timeout = timeout # 延迟触发时间,单位:秒
self._timer = None # 定时器
self._lock = threading.Lock() # 用于保护线程安全
self._updated_keys = set() # 记录更新的键和值
self._deleted_keys = set() # 记录删除的键
self._forbid_callback = False
if initial_data:
for key, value in initial_data.items():
self[key] = value # 初始化时封装嵌套结构
def to_dict(self):
"""
ObservableDict 转换为普通的 dict
如果其中包含嵌套的 ObservableDict ObservableList也会递归转换
"""
def convert(value):
if isinstance(value, ObservableDict):
return value.to_dict()
elif isinstance(value, ObservableList):
return value.to_list()
return value
# 转换 _data 中所有键值对
return {key: convert(value) for key, value in self._data.items()}
def __getitem__(self, key):
return self._data[key]
def __setitem__(self, key, value):
# 支持嵌套结构
if isinstance(value, dict):
value = ObservableDict(value, self._callback, self._timeout)
elif isinstance(value, list):
value = ObservableList(value, self._callback, self._timeout)
if self._forbid_callback is False:
with self._lock:
self._data[key] = value
self._updated_keys.add(key) # 记录更新的键和值
if key in self._deleted_keys: # 如果键之前是删除的,现在需要移出删除记录
self._deleted_keys.remove(key)
# 启动延迟触发
self._schedule_callback()
def __delitem__(self, key):
with self._lock:
del self._data[key]
self._deleted_keys.add(key) # 记录删除的键
if key in self._updated_keys: # 如果键之前是更新的,现在需要移出更新记录
self._updated_keys.remove(key)
# 启动延迟触发
self._schedule_callback()
def __iter__(self):
return iter(self._data)
def __len__(self):
return len(self._data)
def __repr__(self):
return repr(self._data)
def _schedule_callback(self):
"""
延时触发回调的机制每次修改会重置定时器直到最后一次修改后的 timeout 时间到达时触发回调
"""
with self._lock:
if self._timer:
self._timer.cancel() # 如果已有定时器,取消它
self._timer = threading.Timer(self._timeout, self._trigger_callback)
self._timer.start()
def _trigger_callback(self):
"""
定时器到期时触发回调传递更新和删除的键及数据
"""
with self._lock:
if self._callback:
# 将更新和删除的键和值传递给回调函数
self._callback(
{
"updated": list(self._updated_keys), # 复制更新的键和值
"deleted": list(self._deleted_keys) # 转换删除的键为列表
},
self._data.copy()
)
# 清空记录
self._updated_keys.clear()
self._deleted_keys.clear()
# 定义 ObservableList
class ObservableList(MutableSequence):
def __init__(self, initial_data=None, callback=None, timeout=0.05):
self._data = []
self._callback = callback
self._timeout = timeout # 延迟触发时间,单位:秒
self._timer = None # 定时器
self._lock = threading.Lock() # 用于保护线程安全
self._updated_indices = set() # 记录更新的索引
self._deleted_indices = set() # 记录删除的索引
self._inserted_indices = set() # 记录插入的索引
if initial_data:
for item in initial_data:
self.append(item)
def __getitem__(self, index):
return self._data[index]
def __setitem__(self, index, value):
if isinstance(value, dict):
value = ObservableDict(value, self._callback, self._timeout)
elif isinstance(value, list):
value = ObservableList(value, self._callback, self._timeout)
with self._lock:
self._data[index] = value
self._updated_indices.add(index) # 记录更新的索引
if index in self._deleted_indices:
self._deleted_indices.remove(index)
self._schedule_callback()
def __delitem__(self, index):
with self._lock:
del self._data[index]
self._deleted_indices.add(index) # 记录删除的索引
if index in self._updated_indices:
self._updated_indices.remove(index)
self._schedule_callback()
def insert(self, index, value):
if isinstance(value, dict):
value = ObservableDict(value, self._callback, self._timeout)
elif isinstance(value, list):
value = ObservableList(value, self._callback, self._timeout)
with self._lock:
self._data.insert(index, value)
self._inserted_indices.add(index) # 记录插入的索引
self._schedule_callback()
def __len__(self):
return len(self._data)
def __repr__(self):
return repr(self._data)
def to_list(self):
"""
ObservableList 转换为普通的 list
如果其中包含嵌套的 ObservableDict ObservableList也会递归转换
"""
def convert(value):
if isinstance(value, ObservableDict):
return value.to_dict()
elif isinstance(value, ObservableList):
return value.to_list()
return value
return [convert(item) for item in self._data]
def _schedule_callback(self):
"""
延时触发回调的机制每次修改会重置定时器直到最后一次修改后的 timeout 时间到达时触发回调
"""
with self._lock:
if self._timer:
self._timer.cancel() # 如果已有定时器,取消它
self._timer = threading.Timer(self._timeout, self._trigger_callback)
self._timer.start()
def _trigger_callback(self):
"""
定时器到期时触发回调传递更新删除和插入的索引及数据
"""
with self._lock:
if self._callback:
data_copy = self.to_list() # 传递当前完整的数据列表
self._callback(
{
"updated": list(self._updated_indices),
"deleted": list(self._deleted_indices),
"inserted": list(self._inserted_indices)
},
data_copy
)
# 清空记录
self._updated_indices.clear()
self._deleted_indices.clear()
self._inserted_indices.clear()