_configs.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. # @description:
  2. # @author: licanglong
  3. # @date: 2025/12/23 13:52
  4. import logging
  5. import threading
  6. from abc import ABC, abstractmethod
  7. from pathlib import Path
  8. import requests
  9. import yaml
  10. from app.core._events import Event, EM
  11. from app.core._property import PropertyDict
# Property key under which a configuration document lists further sources
# to import (looked up by ConfigEnvironment.extract_imports).
CONFIG_IMPORT = "config.imports"
# Module-level logger named after this module.
_log = logging.getLogger(__name__)
  14. class ConfigEnvironment(PropertyDict):
  15. def merge_source(self, source):
  16. """接受 ConfigDataResource 对象"""
  17. if not hasattr(source, "load"):
  18. raise TypeError(f"Source must have load() method, got {type(source)}")
  19. data = source.load()
  20. if not isinstance(data, dict):
  21. data = {}
  22. # 自动提取 import
  23. imports = self.extract_imports(data)
  24. if imports:
  25. ImportResolver.add_imports(imports)
  26. self.merge(data)
  27. def extract_imports(self, data: dict) -> list:
  28. """
  29. 从配置数据中提取 import 条目
  30. 返回列表,如果没有 import 返回空列表
  31. """
  32. if not isinstance(data, dict):
  33. return []
  34. data_config = PropertyDict(data)
  35. return data_config.getprop(CONFIG_IMPORT, [])
  36. class ConfigEnvironmentInstance:
  37. """
  38. ConfigEnvironment 的单例代理
  39. """
  40. _instance_lock = threading.Lock()
  41. _instance: ConfigEnvironment = None
  42. def __new__(cls, *args, **kwargs):
  43. if cls._instance is None:
  44. with cls._instance_lock:
  45. if cls._instance is None:
  46. cls._instance = ConfigEnvironment(*args, **kwargs)
  47. return cls._instance
# Shared environment for this module. ConfigEnvironmentInstance.__new__
# returns the single ConfigEnvironment object rather than a proxy instance.
_environment = ConfigEnvironmentInstance()
  49. class ConfigDataResource(ABC):
  50. @abstractmethod
  51. def load(self) -> dict:
  52. pass
  53. class ConfigDataLocationResolver(ABC):
  54. @abstractmethod
  55. def resolve(self, location: str) -> ConfigDataResource:
  56. pass
  57. class RegisterResolverEvent(Event):
  58. """
  59. 注册解析器事件
  60. """
  61. def __init__(self, protocol, **kwargs):
  62. super().__init__(**kwargs)
  63. self.protocol = protocol
  64. class ImportResolver:
  65. environment: ConfigEnvironment = None
  66. resolvers = {} # protocol -> resolver
  67. cached_imports = [] # 未解析的 import 条目
  68. @classmethod
  69. def register(cls, protocol: str, resolver: ConfigDataLocationResolver):
  70. """注册解析器"""
  71. _log.info(f"Register resolver for protocol: {protocol}")
  72. cls.resolvers[protocol] = resolver
  73. EM.emit(RegisterResolverEvent(protocol))
  74. @classmethod
  75. def add_imports(cls, imports: list[str]):
  76. """添加新的 import 条目到缓存"""
  77. cls.cached_imports.extend(imports)
  78. for protocol in cls.resolvers.keys():
  79. EM.emit(RegisterResolverEvent(protocol))
  80. @classmethod
  81. def resolve(cls, import_str: str):
  82. if ":" not in import_str:
  83. cls.cached_imports.append(import_str)
  84. return None
  85. protocol, target = import_str.split(":", 1)
  86. if protocol not in cls.resolvers:
  87. cls.cached_imports.append(import_str)
  88. return None
  89. resolver = cls.resolvers[protocol]
  90. return resolver.resolve(target.strip())
# Attach the shared environment to the resolver registry.
ImportResolver.environment = _environment
  92. class FileResource(ConfigDataResource):
  93. def __init__(self, path: str):
  94. self.path = Path(path)
  95. def load(self) -> dict:
  96. if not self.path.exists():
  97. return {}
  98. with open(self.path, "r", encoding="utf-8") as f:
  99. data = yaml.safe_load(f) or {}
  100. # if isinstance(data, dict) and "config" in data and isinstance(data["config"], dict):
  101. # return data["config"]
  102. return data
  103. class FileResolver(ConfigDataLocationResolver):
  104. def resolve(self, location: str) -> ConfigDataResource:
  105. return FileResource(location)
  106. class HttpResource(ConfigDataResource):
  107. def __init__(self, url: str):
  108. self.url = url
  109. def load(self) -> dict:
  110. resp = requests.get(self.url)
  111. resp.raise_for_status()
  112. import yaml
  113. data = yaml.safe_load(resp.text) or {}
  114. if isinstance(data, dict) and "config" in data and isinstance(data["config"], dict):
  115. return data["config"]
  116. return data
  117. class HttpResolver(ConfigDataLocationResolver):
  118. def resolve(self, location: str) -> ConfigDataResource:
  119. return HttpResource(location)
# Register the built-in resolvers for the "file" and "http" protocols when
# this module is imported; each call also emits a RegisterResolverEvent.
ImportResolver.register("file", FileResolver())
ImportResolver.register("http", HttpResolver())