_events.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # @description:
  2. # @author: licanglong
  3. # @date: 2025/11/20 11:46
  4. import logging
  5. import threading
  6. import time
  7. from collections import defaultdict
  8. from typing import Any, Dict, Type, List, Callable, Optional
  9. _log = logging.getLogger(__name__)
  10. class Event:
  11. """事件基类,支持携带任意数据"""
  12. def __init__(self, source: Any = None, tags: Optional[List[str]] = None):
  13. self.source = source
  14. self.tags = tags or []
  15. self.timestamp = time.time()
  16. class Subscriber:
  17. """
  18. # ---------------------------
  19. # 订阅者信息
  20. # ---------------------------
  21. """
  22. def __init__(self, callback: Callable, event_type: Type[Event],
  23. condition: Optional[Callable[[Event], bool]] = None,
  24. priority: int = 0,
  25. async_: bool = False,
  26. once: bool = False):
  27. self.callback = callback
  28. self.event_type = event_type
  29. self.condition = condition
  30. self.priority = priority
  31. self.async_ = async_
  32. self.once = once
  33. class EventBus:
  34. """
  35. # ---------------------------
  36. # 事件总线
  37. # ---------------------------
  38. """
  39. def __init__(self):
  40. # 事件类型 -> list[Subscriber]
  41. self._subscribers: Dict[Type[Event], List[Subscriber]] = defaultdict(list)
  42. self._lock = threading.RLock()
  43. def subscribe(self, event_type: Type[Event], *,
  44. condition: Optional[Callable[[Event], bool]] = None,
  45. priority: int = 0,
  46. async_: bool = False,
  47. once: bool = False):
  48. """
  49. # -----------------------
  50. # 订阅装饰器
  51. # -----------------------
  52. :param event_type: 事件类型
  53. :param condition: 条件
  54. :param priority: 优先级
  55. :param async_: 异步模式
  56. :param once: 一次订阅
  57. :return:
  58. """
  59. def decorator(func: Callable):
  60. sub = Subscriber(func, event_type, condition, priority, async_, once)
  61. with self._lock:
  62. self._subscribers[event_type].append(sub)
  63. # 按优先级排序,优先级高先执行
  64. self._subscribers[event_type].sort(key=lambda s: -s.priority)
  65. return func
  66. return decorator
  67. def unsubscribe(self, event_type: Type[Event], callback: Callable):
  68. """
  69. # -----------------------
  70. # 注销订阅者
  71. # -----------------------
  72. :param event_type: 事件类型
  73. :param callback: 回调
  74. :return:
  75. """
  76. with self._lock:
  77. self._subscribers[event_type] = [
  78. s for s in self._subscribers[event_type] if s.callback != callback
  79. ]
  80. def emit(self, event: Event):
  81. """
  82. # -----------------------
  83. # 触发事件
  84. # -----------------------
  85. :param event: 事件
  86. :return:
  87. """
  88. event_type = type(event)
  89. subscribers_to_remove = []
  90. with self._lock:
  91. # 支持子类事件触发父类订阅
  92. applicable_subs = []
  93. for etype, subs in self._subscribers.items():
  94. if issubclass(event_type, etype):
  95. applicable_subs.extend(subs)
  96. # 执行回调
  97. for sub in sorted(applicable_subs, key=lambda s: -s.priority):
  98. if sub.condition and not sub.condition(event):
  99. continue
  100. try:
  101. if sub.async_:
  102. threading.Thread(target=sub.callback, args=(event,), daemon=True).start()
  103. else:
  104. sub.callback(event)
  105. except Exception as e:
  106. _log.info(f"[EventBus] error in subscriber {sub.callback}: {e}")
  107. if sub.once:
  108. subscribers_to_remove.append(sub)
  109. # 移除一次性订阅
  110. for sub in subscribers_to_remove:
  111. self.unsubscribe(sub.event_type, sub.callback)
  112. def clear(self, event_type: Optional[Type[Event]] = None):
  113. """
  114. # -----------------------
  115. # 清理订阅者
  116. # -----------------------
  117. :param event_type:
  118. :return:
  119. """
  120. with self._lock:
  121. if event_type:
  122. self._subscribers[event_type] = []
  123. else:
  124. self._subscribers.clear()
  125. class EventBusInstance:
  126. """
  127. EventBus 的单例类
  128. """
  129. _instance_lock = threading.Lock()
  130. _instance: Optional[EventBus] = None
  131. def __new__(cls, *args, **kwargs):
  132. if not cls._instance:
  133. with cls._instance_lock:
  134. if not cls._instance:
  135. cls._instance = EventBus()
  136. return cls._instance
  137. EM = EventBusInstance()