__init__.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. # @description:
  2. # @author: licanglong
  3. # @date: 2025/9/24 14:07
  4. import hashlib
  5. import json
  6. import re
  7. from typing import Dict, Any
  8. class AIStreamJSONParser:
  9. """
  10. 用于解析 AI stream 返回的 chunk 流,
  11. 最终还原为完整 JSON 对象
  12. """
  13. def __init__(self):
  14. self._content_buffer: list[str] = []
  15. def feed_chunk(self, chunk_line: str) -> None:
  16. """
  17. 处理单行 stream 数据
  18. """
  19. if not chunk_line.strip():
  20. return
  21. try:
  22. event = json.loads(chunk_line)
  23. except json.JSONDecodeError:
  24. # 非法行直接忽略(极少见)
  25. return
  26. choices = event.get("choices", [])
  27. if not choices:
  28. return
  29. delta = choices[0].get("delta", {})
  30. content_piece = delta.get("content")
  31. if content_piece:
  32. self._content_buffer.append(content_piece)
  33. def is_finished(self, chunk_line: str) -> bool:
  34. """
  35. 判断 stream 是否结束
  36. """
  37. try:
  38. event = json.loads(chunk_line)
  39. except json.JSONDecodeError:
  40. return False
  41. choices = event.get("choices", [])
  42. if not choices:
  43. return False
  44. return choices[0].get("finish_reason") == "stop"
  45. def get_result(self) -> Dict[str, Any]:
  46. """
  47. 返回最终解析结果(JSON 对象)
  48. """
  49. full_text = "".join(self._content_buffer)
  50. return json.loads(full_text)
  51. def normalize_text(text: str) -> str:
  52. """
  53. 对文本进行规范化,确保 hash 稳定
  54. """
  55. text = text.strip().lower()
  56. text = re.sub(r"\s+", "", text)
  57. text = re.sub(r"[^\w\u4e00-\u9fff]", "", text)
  58. return text
  59. def compute_text_hash(text: str) -> str:
  60. """
  61. 计算文本的 SHA256 hash,用于完全重复去重
  62. """
  63. normalized = normalize_text(text)
  64. return hashlib.sha256(normalized.encode("utf-8")).hexdigest()