208 lines
7.4 KiB
Python
208 lines
7.4 KiB
Python
import threading
|
||
import binascii
|
||
import datetime
|
||
import requests
|
||
from collections.abc import MutableMapping, MutableSequence
|
||
|
||
|
||
class ObservableDict(MutableMapping):
|
||
def __init__(self, initial_data=None, callback=None, timeout=0.05):
|
||
self._data = {}
|
||
self._callback = callback
|
||
self._timeout = timeout # 延迟触发时间,单位:秒
|
||
self._timer = None # 定时器
|
||
self._lock = threading.Lock() # 用于保护线程安全
|
||
self._updated_keys = set() # 记录更新的键和值
|
||
self._deleted_keys = set() # 记录删除的键
|
||
self._forbid_callback = False
|
||
if initial_data:
|
||
for key, value in initial_data.items():
|
||
self[key] = value # 初始化时封装嵌套结构
|
||
|
||
def to_dict(self):
|
||
"""
|
||
将 ObservableDict 转换为普通的 dict。
|
||
如果其中包含嵌套的 ObservableDict 或 ObservableList,也会递归转换。
|
||
"""
|
||
|
||
def convert(value):
|
||
if isinstance(value, ObservableDict):
|
||
return value.to_dict()
|
||
elif isinstance(value, ObservableList):
|
||
return value.to_list()
|
||
return value
|
||
|
||
# 转换 _data 中所有键值对
|
||
return {key: convert(value) for key, value in self._data.items()}
|
||
|
||
def __getitem__(self, key):
|
||
return self._data[key]
|
||
|
||
def __setitem__(self, key, value):
|
||
# 支持嵌套结构
|
||
if isinstance(value, dict):
|
||
value = ObservableDict(value, self._callback, self._timeout)
|
||
elif isinstance(value, list):
|
||
value = ObservableList(value, self._callback, self._timeout)
|
||
if self._forbid_callback is False:
|
||
with self._lock:
|
||
self._data[key] = value
|
||
self._updated_keys.add(key) # 记录更新的键和值
|
||
if key in self._deleted_keys: # 如果键之前是删除的,现在需要移出删除记录
|
||
self._deleted_keys.remove(key)
|
||
# 启动延迟触发
|
||
self._schedule_callback()
|
||
|
||
def __delitem__(self, key):
|
||
with self._lock:
|
||
del self._data[key]
|
||
self._deleted_keys.add(key) # 记录删除的键
|
||
if key in self._updated_keys: # 如果键之前是更新的,现在需要移出更新记录
|
||
self._updated_keys.remove(key)
|
||
|
||
# 启动延迟触发
|
||
self._schedule_callback()
|
||
|
||
def __iter__(self):
|
||
return iter(self._data)
|
||
|
||
def __len__(self):
|
||
return len(self._data)
|
||
|
||
def __repr__(self):
|
||
return repr(self._data)
|
||
|
||
def _schedule_callback(self):
|
||
"""
|
||
延时触发回调的机制,每次修改会重置定时器,直到最后一次修改后的 timeout 时间到达时触发回调。
|
||
"""
|
||
with self._lock:
|
||
if self._timer:
|
||
self._timer.cancel() # 如果已有定时器,取消它
|
||
self._timer = threading.Timer(self._timeout, self._trigger_callback)
|
||
self._timer.start()
|
||
|
||
def _trigger_callback(self):
|
||
"""
|
||
定时器到期时,触发回调,传递更新和删除的键及数据。
|
||
"""
|
||
with self._lock:
|
||
if self._callback:
|
||
# 将更新和删除的键和值传递给回调函数
|
||
self._callback(
|
||
{
|
||
"updated": list(self._updated_keys), # 复制更新的键和值
|
||
"deleted": list(self._deleted_keys) # 转换删除的键为列表
|
||
},
|
||
self._data.copy()
|
||
)
|
||
# 清空记录
|
||
self._updated_keys.clear()
|
||
self._deleted_keys.clear()
|
||
|
||
|
||
# 定义 ObservableList
|
||
class ObservableList(MutableSequence):
|
||
def __init__(self, initial_data=None, callback=None, timeout=0.05):
|
||
self._data = []
|
||
self._callback = callback
|
||
self._timeout = timeout # 延迟触发时间,单位:秒
|
||
self._timer = None # 定时器
|
||
self._lock = threading.Lock() # 用于保护线程安全
|
||
self._updated_indices = set() # 记录更新的索引
|
||
self._deleted_indices = set() # 记录删除的索引
|
||
self._inserted_indices = set() # 记录插入的索引
|
||
|
||
if initial_data:
|
||
for item in initial_data:
|
||
self.append(item)
|
||
|
||
def __getitem__(self, index):
|
||
return self._data[index]
|
||
|
||
def __setitem__(self, index, value):
|
||
if isinstance(value, dict):
|
||
value = ObservableDict(value, self._callback, self._timeout)
|
||
elif isinstance(value, list):
|
||
value = ObservableList(value, self._callback, self._timeout)
|
||
|
||
with self._lock:
|
||
self._data[index] = value
|
||
self._updated_indices.add(index) # 记录更新的索引
|
||
if index in self._deleted_indices:
|
||
self._deleted_indices.remove(index)
|
||
|
||
self._schedule_callback()
|
||
|
||
def __delitem__(self, index):
|
||
with self._lock:
|
||
del self._data[index]
|
||
self._deleted_indices.add(index) # 记录删除的索引
|
||
if index in self._updated_indices:
|
||
self._updated_indices.remove(index)
|
||
|
||
self._schedule_callback()
|
||
|
||
def insert(self, index, value):
|
||
if isinstance(value, dict):
|
||
value = ObservableDict(value, self._callback, self._timeout)
|
||
elif isinstance(value, list):
|
||
value = ObservableList(value, self._callback, self._timeout)
|
||
|
||
with self._lock:
|
||
self._data.insert(index, value)
|
||
self._inserted_indices.add(index) # 记录插入的索引
|
||
|
||
self._schedule_callback()
|
||
|
||
def __len__(self):
|
||
return len(self._data)
|
||
|
||
def __repr__(self):
|
||
return repr(self._data)
|
||
|
||
def to_list(self):
|
||
"""
|
||
将 ObservableList 转换为普通的 list。
|
||
如果其中包含嵌套的 ObservableDict 或 ObservableList,也会递归转换。
|
||
"""
|
||
|
||
def convert(value):
|
||
if isinstance(value, ObservableDict):
|
||
return value.to_dict()
|
||
elif isinstance(value, ObservableList):
|
||
return value.to_list()
|
||
return value
|
||
|
||
return [convert(item) for item in self._data]
|
||
|
||
def _schedule_callback(self):
|
||
"""
|
||
延时触发回调的机制,每次修改会重置定时器,直到最后一次修改后的 timeout 时间到达时触发回调。
|
||
"""
|
||
with self._lock:
|
||
if self._timer:
|
||
self._timer.cancel() # 如果已有定时器,取消它
|
||
self._timer = threading.Timer(self._timeout, self._trigger_callback)
|
||
self._timer.start()
|
||
|
||
def _trigger_callback(self):
|
||
"""
|
||
定时器到期时,触发回调,传递更新、删除和插入的索引及数据。
|
||
"""
|
||
with self._lock:
|
||
if self._callback:
|
||
data_copy = self.to_list() # 传递当前完整的数据列表
|
||
self._callback(
|
||
{
|
||
"updated": list(self._updated_indices),
|
||
"deleted": list(self._deleted_indices),
|
||
"inserted": list(self._inserted_indices)
|
||
},
|
||
data_copy
|
||
)
|
||
# 清空记录
|
||
self._updated_indices.clear()
|
||
self._deleted_indices.clear()
|
||
self._inserted_indices.clear()
|