# @description:
# @author: licanglong
# @date: 2025/9/24 14:07
import datetime
import hashlib
import json
import re
from typing import Any, Dict, Optional

from aiohttp import ClientSession

from app.core import BizException

# Compiled once at import time; used by normalize_text().
_WHITESPACE_RE = re.compile(r"\s+")
_NON_WORD_RE = re.compile(r"[^\w\u4e00-\u9fff]")


class AIStreamJSONParser:
    """Reassemble the JSON object carried by an AI streaming response.

    Each line fed to the parser is expected to be a JSON object whose
    ``choices[0].delta.content`` holds a fragment of text.  The fragments
    are buffered in arrival order; once the stream is over, their
    concatenation is decoded as JSON by :meth:`get_result`.
    """

    def __init__(self):
        # Text fragments in arrival order; joined only in get_result().
        self._content_buffer: list[str] = []

    @staticmethod
    def _first_choice(chunk_line: str) -> Optional[Dict[str, Any]]:
        """Return ``choices[0]`` of a stream line, or ``None`` if unusable.

        A line is unusable when it is not valid JSON, is not a JSON object,
        has no non-empty ``choices`` list, or its first choice is not an
        object.
        """
        try:
            event = json.loads(chunk_line)
        except json.JSONDecodeError:
            # Malformed lines are ignored rather than aborting the stream.
            return None
        if not isinstance(event, dict):
            return None
        choices = event.get("choices")
        if not isinstance(choices, list) or not choices:
            return None
        first = choices[0]
        return first if isinstance(first, dict) else None

    def feed_chunk(self, chunk_line: str) -> None:
        """Consume one line of the stream and buffer its content fragment.

        Blank lines, lines that cannot be parsed, and lines without a
        non-empty ``delta.content`` are silently skipped.

        Args:
            chunk_line: One raw line of the stream, expected to be JSON.
        """
        if not chunk_line.strip():
            return
        choice = self._first_choice(chunk_line)
        if choice is None:
            return
        # ``delta`` may be absent or null; treat both as "no content".
        delta = choice.get("delta") or {}
        if not isinstance(delta, dict):
            return
        content_piece = delta.get("content")
        if content_piece:
            self._content_buffer.append(content_piece)

    def is_finished(self, chunk_line: str) -> bool:
        """Tell whether this line marks the end of the stream.

        Args:
            chunk_line: One raw line of the stream, expected to be JSON.

        Returns:
            ``True`` only when ``choices[0].finish_reason`` equals
            ``"stop"``; ``False`` for any other value or an unusable line.
        """
        choice = self._first_choice(chunk_line)
        return choice is not None and choice.get("finish_reason") == "stop"

    def get_result(self) -> Dict[str, Any]:
        """Decode the buffered fragments as a single JSON document.

        Returns:
            The value produced by ``json.loads`` on the concatenated text.

        Raises:
            json.JSONDecodeError: If the concatenated text is not valid
                JSON (including when nothing has been buffered).
        """
        full_text = "".join(self._content_buffer)
        return json.loads(full_text)


def normalize_text(text: str) -> str:
    """Normalize text so that its hash is stable.

    The text is stripped and lower-cased, all whitespace is removed, and
    every character that is neither a regex word character (``\\w``) nor in
    the range U+4E00-U+9FFF is dropped.

    Args:
        text: The raw text.

    Returns:
        The normalized text.
    """
    text = text.strip().lower()
    text = _WHITESPACE_RE.sub("", text)
    return _NON_WORD_RE.sub("", text)


def compute_text_hash(text: str) -> str:
    """Return the SHA-256 hex digest of the normalized text.

    Used to detect exact duplicates: texts that normalize to the same
    string (see :func:`normalize_text`) produce the same hash.

    Args:
        text: The raw text.

    Returns:
        The hexadecimal SHA-256 digest of the UTF-8 encoded normalized text.
    """
    normalized = normalize_text(text)
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def get_current_time() -> str:
    """Return a sentence stating the current local date and time."""
    return f"The current date and time is {datetime.datetime.now()}"


async def ali_search_tool(text: str):
    """Query the Aliyun IQS unified search endpoint.

    Args:
        text: The search query.

    Returns:
        The decoded JSON body of the search response.

    Raises:
        BizException: If the endpoint answers with a status other than 200.
    """
    # Kept as a function-level import, as in the original code.
    # NOTE(review): presumably avoids an import cycle or late init - confirm.
    from app.core import CTX

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {CTX.ENV.getprop('aliyun.search.api_key')}",
    }
    payload = {
        "query": text,
        "engineType": "Generic",
        "contents": {
            "mainText": True,
            "markdownText": False,
            "summary": False,
            "rerankScore": True,
        },
    }

    async with ClientSession() as session:
        # The context manager releases the response on every exit path,
        # including the error raised below.
        async with session.post(
            url="https://cloud-iqs.aliyuncs.com/search/unified",
            headers=headers,
            json=payload,
        ) as search_resp:
            if search_resp.status != 200:
                raise BizException("搜索工具调用异常")
            return await search_resp.json(encoding="utf-8")