__init__.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # @description:
  2. # @author: licanglong
  3. # @date: 2025/9/24 14:07
  4. import datetime
  5. import hashlib
  6. import json
  7. import re
  8. from typing import Dict, Any
  9. from aiohttp import ClientSession
  10. from app.core import BizException
  class AIStreamJSONParser:
      """
      Reassemble the JSON answer of a streaming AI chat-completion response.

      Each streamed line is an event whose ``choices[0].delta.content`` carries
      the next fragment of the answer. Fragments are buffered in arrival order
      and decoded as one JSON document by ``get_result``.

      Typical use: call ``feed_chunk`` for every received line, stop when
      ``is_finished`` returns True, then call ``get_result``.
      """

      # Matches a whole answer wrapped in a Markdown code fence such as
      # ```json\n{...}\n``` and captures the payload between the fences.
      _CODE_FENCE = re.compile(r"```[\w-]*\s*(.*?)\s*```", re.DOTALL)

      def __init__(self):
          # Content fragments in arrival order; joined once in get_result().
          self._content_buffer: list[str] = []

      @staticmethod
      def _first_choice(chunk_line: str) -> Dict[str, Any]:
          """
          Decode one stream line and return its first ``choices`` entry.

          Accepts both bare JSON lines and SSE-framed ``data: {...}`` lines.
          Returns an empty dict for blank lines, the ``[DONE]`` sentinel,
          undecodable JSON, or events without a usable first choice, so callers
          can treat every such line as "nothing to do".
          """
          line = chunk_line.strip()
          if line.startswith("data:"):
              line = line[len("data:"):].strip()
          if not line or line == "[DONE]":
              return {}
          try:
              event = json.loads(line)
          except json.JSONDecodeError:
              # Malformed lines are rare and deliberately ignored.
              return {}
          # Valid JSON need not be an object (e.g. "123"), and "choices" may be null.
          choices = event.get("choices") if isinstance(event, dict) else None
          if not isinstance(choices, list) or not choices:
              return {}
          first = choices[0]
          return first if isinstance(first, dict) else {}

      def feed_chunk(self, chunk_line: str) -> None:
          """
          Buffer the content fragment carried by one stream line, if any.

          :param chunk_line: one raw line of the stream (bare JSON or ``data: ...``).
          """
          # "delta" may be absent or explicitly null (e.g. in the final chunk).
          delta = self._first_choice(chunk_line).get("delta") or {}
          content_piece = delta.get("content") if isinstance(delta, dict) else None
          # Only text fragments can be joined; skip null/empty/non-string content.
          if isinstance(content_piece, str) and content_piece:
              self._content_buffer.append(content_piece)

      def is_finished(self, chunk_line: str) -> bool:
          """
          Return True if this line is the chunk whose ``finish_reason`` is "stop".

          Other finish reasons (e.g. "length"), the ``[DONE]`` sentinel and
          malformed lines all return False.
          """
          return self._first_choice(chunk_line).get("finish_reason") == "stop"

      def get_result(self) -> Dict[str, Any]:
          """
          Decode the buffered fragments as a single JSON document.

          A Markdown code fence surrounding the whole answer is removed first.
          The decoded value is returned as-is (a dict when the model answered
          with a JSON object).

          :raises json.JSONDecodeError: if the accumulated text (including the
              empty string when nothing was buffered) is not valid JSON.
          """
          full_text = "".join(self._content_buffer).strip()
          fenced = self._CODE_FENCE.fullmatch(full_text)
          if fenced:
              full_text = fenced.group(1)
          return json.loads(full_text)
  def normalize_text(text: str) -> str:
      """
      Canonicalize text so that equivalent inputs produce the same hash.

      Lower-cases the input, removes all whitespace, then discards every
      character that is neither a regex word character (letters, digits,
      underscore) nor in the CJK range U+4E00..U+9FFF.
      """
      collapsed = re.sub(r"\s+", "", text.strip().lower())
      return re.sub(r"[^\w\u4e00-\u9fff]", "", collapsed)
  def compute_text_hash(text: str) -> str:
      """
      Return the hex SHA-256 digest of the normalized text.

      Used to detect exact duplicates: texts that normalize to the same
      string yield the same digest.
      """
      digest = hashlib.sha256()
      digest.update(normalize_text(text).encode("utf-8"))
      return digest.hexdigest()
  def get_current_time():
      """
      Describe the current local date and time, including its UTC offset.

      The offset is included so that whoever reads the text can interpret the
      timestamp unambiguously; a naive local time carries no timezone.

      :return: e.g. "The current date and time is 2025-09-24 14:07:00.123456+08:00".
      """
      # astimezone() with no argument attaches the system's local timezone.
      return f"The current date and time is {datetime.datetime.now().astimezone()}"
  async def ali_search_tool(text: str):
      """
      Query the Aliyun IQS unified search endpoint and return its JSON response.

      :param text: the search query.
      :return: the decoded JSON body of the search response.
      :raises BizException: if the endpoint answers with a non-200 status.
      """
      # Imported inside the function, presumably to avoid a circular import
      # with app.core -- TODO confirm.
      from app.core import CTX

      headers = {
          "Content-Type": "application/json",
          "Authorization": f"Bearer {CTX.ENV.getprop('aliyun.search.api_key')}",
      }
      payload = {
          "query": text,
          "engineType": "Generic",
          "contents": {
              "mainText": True,
              "markdownText": False,
              "summary": False,
              "rerankScore": True,
          },
      }
      async with ClientSession() as session:
          # Use the response as a context manager so its connection is released
          # deterministically on both the success and the error path.
          async with session.post(
              url="https://cloud-iqs.aliyuncs.com/search/unified",
              headers=headers,
              json=payload,
          ) as search_resp:
              if search_resp.status != 200:
                  raise BizException("搜索工具调用异常")
              return await search_resp.json(encoding="utf-8")