import threading import binascii import datetime import requests from collections.abc import MutableMapping, MutableSequence class ObservableDict(MutableMapping): def __init__(self, initial_data=None, callback=None, timeout=0.05): self._data = {} self._callback = callback self._timeout = timeout # 延迟触发时间,单位:秒 self._timer = None # 定时器 self._lock = threading.Lock() # 用于保护线程安全 self._updated_keys = set() # 记录更新的键和值 self._deleted_keys = set() # 记录删除的键 self._forbid_callback = False if initial_data: for key, value in initial_data.items(): self[key] = value # 初始化时封装嵌套结构 def to_dict(self): """ 将 ObservableDict 转换为普通的 dict。 如果其中包含嵌套的 ObservableDict 或 ObservableList,也会递归转换。 """ def convert(value): if isinstance(value, ObservableDict): return value.to_dict() elif isinstance(value, ObservableList): return value.to_list() return value # 转换 _data 中所有键值对 return {key: convert(value) for key, value in self._data.items()} def __getitem__(self, key): return self._data[key] def __setitem__(self, key, value): # 支持嵌套结构 if isinstance(value, dict): value = ObservableDict(value, self._callback, self._timeout) elif isinstance(value, list): value = ObservableList(value, self._callback, self._timeout) if self._forbid_callback is False: with self._lock: self._data[key] = value self._updated_keys.add(key) # 记录更新的键和值 if key in self._deleted_keys: # 如果键之前是删除的,现在需要移出删除记录 self._deleted_keys.remove(key) # 启动延迟触发 self._schedule_callback() def __delitem__(self, key): with self._lock: del self._data[key] self._deleted_keys.add(key) # 记录删除的键 if key in self._updated_keys: # 如果键之前是更新的,现在需要移出更新记录 self._updated_keys.remove(key) # 启动延迟触发 self._schedule_callback() def __iter__(self): return iter(self._data) def __len__(self): return len(self._data) def __repr__(self): return repr(self._data) def _schedule_callback(self): """ 延时触发回调的机制,每次修改会重置定时器,直到最后一次修改后的 timeout 时间到达时触发回调。 """ with self._lock: if self._timer: self._timer.cancel() # 如果已有定时器,取消它 self._timer = threading.Timer(self._timeout, self._trigger_callback) self._timer.start() def _trigger_callback(self): """ 定时器到期时,触发回调,传递更新和删除的键及数据。 """ with self._lock: if self._callback: # 将更新和删除的键和值传递给回调函数 self._callback( { "updated": list(self._updated_keys), # 复制更新的键和值 "deleted": list(self._deleted_keys) # 转换删除的键为列表 }, self._data.copy() ) # 清空记录 self._updated_keys.clear() self._deleted_keys.clear() # 定义 ObservableList class ObservableList(MutableSequence): def __init__(self, initial_data=None, callback=None, timeout=0.05): self._data = [] self._callback = callback self._timeout = timeout # 延迟触发时间,单位:秒 self._timer = None # 定时器 self._lock = threading.Lock() # 用于保护线程安全 self._updated_indices = set() # 记录更新的索引 self._deleted_indices = set() # 记录删除的索引 self._inserted_indices = set() # 记录插入的索引 if initial_data: for item in initial_data: self.append(item) def __getitem__(self, index): return self._data[index] def __setitem__(self, index, value): if isinstance(value, dict): value = ObservableDict(value, self._callback, self._timeout) elif isinstance(value, list): value = ObservableList(value, self._callback, self._timeout) with self._lock: self._data[index] = value self._updated_indices.add(index) # 记录更新的索引 if index in self._deleted_indices: self._deleted_indices.remove(index) self._schedule_callback() def __delitem__(self, index): with self._lock: del self._data[index] self._deleted_indices.add(index) # 记录删除的索引 if index in self._updated_indices: self._updated_indices.remove(index) self._schedule_callback() def insert(self, index, value): if isinstance(value, dict): value = ObservableDict(value, self._callback, self._timeout) elif isinstance(value, list): value = ObservableList(value, self._callback, self._timeout) with self._lock: self._data.insert(index, value) self._inserted_indices.add(index) # 记录插入的索引 self._schedule_callback() def __len__(self): return len(self._data) def __repr__(self): return repr(self._data) def to_list(self): """ 将 ObservableList 转换为普通的 list。 如果其中包含嵌套的 ObservableDict 或 ObservableList,也会递归转换。 """ def convert(value): if isinstance(value, ObservableDict): return value.to_dict() elif isinstance(value, ObservableList): return value.to_list() return value return [convert(item) for item in self._data] def _schedule_callback(self): """ 延时触发回调的机制,每次修改会重置定时器,直到最后一次修改后的 timeout 时间到达时触发回调。 """ with self._lock: if self._timer: self._timer.cancel() # 如果已有定时器,取消它 self._timer = threading.Timer(self._timeout, self._trigger_callback) self._timer.start() def _trigger_callback(self): """ 定时器到期时,触发回调,传递更新、删除和插入的索引及数据。 """ with self._lock: if self._callback: data_copy = self.to_list() # 传递当前完整的数据列表 self._callback( { "updated": list(self._updated_indices), "deleted": list(self._deleted_indices), "inserted": list(self._inserted_indices) }, data_copy ) # 清空记录 self._updated_indices.clear() self._deleted_indices.clear() self._inserted_indices.clear()