| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- # @description:
- # @author: licanglong
- # @date: 2025/9/24 14:07
- import datetime
- import hashlib
- import json
- import re
- from typing import Dict, Any
- from aiohttp import ClientSession
- from app.core import BizException
class AIStreamJSONParser:
    """Reassemble a complete JSON object from the chunk lines of an AI stream.

    Each chunk line is expected to be a JSON object carrying a ``choices``
    list. ``feed_chunk`` buffers the ``delta.content`` fragment of the first
    choice, and ``get_result`` joins the buffered fragments and decodes the
    joined text as JSON.
    """

    def __init__(self):
        # Content fragments in the order they were fed.
        self._content_buffer: list[str] = []

    @staticmethod
    def _first_choice(chunk_line: str) -> Dict[str, Any]:
        """Return the first ``choices`` entry of a chunk line, or ``{}``.

        An empty dict is returned when the line is not valid JSON, is valid
        JSON but not an object, has no non-empty ``choices`` list, or when
        the first choice is not itself an object.
        """
        try:
            event = json.loads(chunk_line)
        except json.JSONDecodeError:
            # Malformed lines are simply ignored (rare).
            return {}
        # Valid JSON is not necessarily an object (e.g. ``null`` or a list);
        # such payloads must be skipped rather than crash on ``.get``.
        if not isinstance(event, dict):
            return {}
        choices = event.get("choices")
        if not isinstance(choices, list) or not choices:
            return {}
        first = choices[0]
        return first if isinstance(first, dict) else {}

    def feed_chunk(self, chunk_line: str) -> None:
        """Process one line of stream data.

        Blank lines, malformed lines, and lines without a usable
        ``choices[0].delta.content`` string are ignored.

        Args:
            chunk_line: One raw line of the stream, expected to be JSON text.
        """
        if not chunk_line.strip():
            return
        delta = self._first_choice(chunk_line).get("delta")
        # ``delta`` may be absent or explicitly null.
        if not isinstance(delta, dict):
            return
        content_piece = delta.get("content")
        # Only non-empty strings are buffered so that the final join in
        # ``get_result`` cannot fail on a stray non-string value.
        if isinstance(content_piece, str) and content_piece:
            self._content_buffer.append(content_piece)

    def is_finished(self, chunk_line: str) -> bool:
        """Tell whether a chunk line marks the end of the stream.

        Args:
            chunk_line: One raw line of the stream, expected to be JSON text.

        Returns:
            True only when the first choice has ``finish_reason == "stop"``;
            False for malformed or unexpectedly shaped lines.
        """
        return self._first_choice(chunk_line).get("finish_reason") == "stop"

    def get_result(self) -> Dict[str, Any]:
        """Return the final parsed result.

        Returns:
            The value obtained by decoding the concatenated content fragments
            as JSON (annotated as a JSON object).

        Raises:
            json.JSONDecodeError: If the concatenated text is not valid JSON,
                which includes the case where nothing has been buffered.
        """
        full_text = "".join(self._content_buffer)
        return json.loads(full_text)
def normalize_text(text: str) -> str:
    """Normalize text so that hashing it gives a stable result.

    The input is stripped of surrounding whitespace and lower-cased, then all
    whitespace runs are removed, then every remaining character that is
    neither a word character nor in the range U+4E00..U+9FFF is removed.

    Args:
        text: The raw text.

    Returns:
        The normalized text.
    """
    lowered = text.strip().lower()
    without_whitespace = re.sub(r"\s+", "", lowered)
    return re.sub(r"[^\w\u4e00-\u9fff]", "", without_whitespace)
def compute_text_hash(text: str) -> str:
    """Compute the SHA-256 hash of a text, for exact-duplicate detection.

    The text is first passed through ``normalize_text`` and the result is
    encoded as UTF-8 before hashing.

    Args:
        text: The raw text.

    Returns:
        The hexadecimal SHA-256 digest of the normalized text.
    """
    digest = hashlib.sha256()
    digest.update(normalize_text(text).encode("utf-8"))
    return digest.hexdigest()
def get_current_time():
    """Return an English sentence stating the current date and time.

    The timestamp comes from ``datetime.datetime.now()`` called without a
    timezone argument and is rendered with its default string form.
    """
    now = datetime.datetime.now()
    return f"The current date and time is {now}"
async def ali_search_tool(text: str):
    """Run a search query against the Aliyun unified search endpoint.

    POSTs a JSON request to ``https://cloud-iqs.aliyuncs.com/search/unified``,
    authenticated with a bearer token read from the
    ``aliyun.search.api_key`` property.

    Args:
        text: The query string, sent as the ``query`` field.

    Returns:
        The response body decoded as JSON.

    Raises:
        BizException: If the HTTP status of the response is not 200.
    """
    # Imported at call time rather than at module load — presumably to avoid
    # a circular import with app.core; TODO confirm.
    from app.core import CTX

    endpoint = "https://cloud-iqs.aliyuncs.com/search/unified"
    # NOTE(review): the "contents" flags look like they select which result
    # fields the service returns — confirm against the service documentation.
    payload = {
        "query": text,
        "engineType": "Generic",
        "contents": {
            "mainText": True,
            "markdownText": False,
            "summary": False,
            "rerankScore": True
        }
    }
    async with ClientSession() as session:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {CTX.ENV.getprop('aliyun.search.api_key')}",
        }
        search_resp = await session.post(url=endpoint, headers=headers, json=payload)
        if search_resp.status != 200:
            raise BizException("搜索工具调用异常")
        return await search_resp.json(encoding="utf-8")
|