Files
dtm-py-all/Observable.py

208 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import binascii
import datetime
import requests
from collections.abc import MutableMapping, MutableSequence
class ObservableDict(MutableMapping):
def __init__(self, initial_data=None, callback=None, timeout=0.05):
self._data = {}
self._callback = callback
self._timeout = timeout # 延迟触发时间,单位:秒
self._timer = None # 定时器
self._lock = threading.Lock() # 用于保护线程安全
self._updated_keys = set() # 记录更新的键和值
self._deleted_keys = set() # 记录删除的键
self._forbid_callback = False
if initial_data:
for key, value in initial_data.items():
self[key] = value # 初始化时封装嵌套结构
def to_dict(self):
"""
将 ObservableDict 转换为普通的 dict。
如果其中包含嵌套的 ObservableDict 或 ObservableList也会递归转换。
"""
def convert(value):
if isinstance(value, ObservableDict):
return value.to_dict()
elif isinstance(value, ObservableList):
return value.to_list()
return value
# 转换 _data 中所有键值对
return {key: convert(value) for key, value in self._data.items()}
def __getitem__(self, key):
return self._data[key]
def __setitem__(self, key, value):
# 支持嵌套结构
if isinstance(value, dict):
value = ObservableDict(value, self._callback, self._timeout)
elif isinstance(value, list):
value = ObservableList(value, self._callback, self._timeout)
if self._forbid_callback is False:
with self._lock:
self._data[key] = value
self._updated_keys.add(key) # 记录更新的键和值
if key in self._deleted_keys: # 如果键之前是删除的,现在需要移出删除记录
self._deleted_keys.remove(key)
# 启动延迟触发
self._schedule_callback()
def __delitem__(self, key):
with self._lock:
del self._data[key]
self._deleted_keys.add(key) # 记录删除的键
if key in self._updated_keys: # 如果键之前是更新的,现在需要移出更新记录
self._updated_keys.remove(key)
# 启动延迟触发
self._schedule_callback()
def __iter__(self):
return iter(self._data)
def __len__(self):
return len(self._data)
def __repr__(self):
return repr(self._data)
def _schedule_callback(self):
"""
延时触发回调的机制,每次修改会重置定时器,直到最后一次修改后的 timeout 时间到达时触发回调。
"""
with self._lock:
if self._timer:
self._timer.cancel() # 如果已有定时器,取消它
self._timer = threading.Timer(self._timeout, self._trigger_callback)
self._timer.start()
def _trigger_callback(self):
"""
定时器到期时,触发回调,传递更新和删除的键及数据。
"""
with self._lock:
if self._callback:
# 将更新和删除的键和值传递给回调函数
self._callback(
{
"updated": list(self._updated_keys), # 复制更新的键和值
"deleted": list(self._deleted_keys) # 转换删除的键为列表
},
self._data.copy()
)
# 清空记录
self._updated_keys.clear()
self._deleted_keys.clear()
# 定义 ObservableList
class ObservableList(MutableSequence):
def __init__(self, initial_data=None, callback=None, timeout=0.05):
self._data = []
self._callback = callback
self._timeout = timeout # 延迟触发时间,单位:秒
self._timer = None # 定时器
self._lock = threading.Lock() # 用于保护线程安全
self._updated_indices = set() # 记录更新的索引
self._deleted_indices = set() # 记录删除的索引
self._inserted_indices = set() # 记录插入的索引
if initial_data:
for item in initial_data:
self.append(item)
def __getitem__(self, index):
return self._data[index]
def __setitem__(self, index, value):
if isinstance(value, dict):
value = ObservableDict(value, self._callback, self._timeout)
elif isinstance(value, list):
value = ObservableList(value, self._callback, self._timeout)
with self._lock:
self._data[index] = value
self._updated_indices.add(index) # 记录更新的索引
if index in self._deleted_indices:
self._deleted_indices.remove(index)
self._schedule_callback()
def __delitem__(self, index):
with self._lock:
del self._data[index]
self._deleted_indices.add(index) # 记录删除的索引
if index in self._updated_indices:
self._updated_indices.remove(index)
self._schedule_callback()
def insert(self, index, value):
if isinstance(value, dict):
value = ObservableDict(value, self._callback, self._timeout)
elif isinstance(value, list):
value = ObservableList(value, self._callback, self._timeout)
with self._lock:
self._data.insert(index, value)
self._inserted_indices.add(index) # 记录插入的索引
self._schedule_callback()
def __len__(self):
return len(self._data)
def __repr__(self):
return repr(self._data)
def to_list(self):
"""
将 ObservableList 转换为普通的 list。
如果其中包含嵌套的 ObservableDict 或 ObservableList也会递归转换。
"""
def convert(value):
if isinstance(value, ObservableDict):
return value.to_dict()
elif isinstance(value, ObservableList):
return value.to_list()
return value
return [convert(item) for item in self._data]
def _schedule_callback(self):
"""
延时触发回调的机制,每次修改会重置定时器,直到最后一次修改后的 timeout 时间到达时触发回调。
"""
with self._lock:
if self._timer:
self._timer.cancel() # 如果已有定时器,取消它
self._timer = threading.Timer(self._timeout, self._trigger_callback)
self._timer.start()
def _trigger_callback(self):
"""
定时器到期时,触发回调,传递更新、删除和插入的索引及数据。
"""
with self._lock:
if self._callback:
data_copy = self.to_list() # 传递当前完整的数据列表
self._callback(
{
"updated": list(self._updated_indices),
"deleted": list(self._deleted_indices),
"inserted": list(self._inserted_indices)
},
data_copy
)
# 清空记录
self._updated_indices.clear()
self._deleted_indices.clear()
self._inserted_indices.clear()