| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- # @description:
- # @author: licanglong
- # @date: 2025/9/24 14:07
- import hashlib
- import json
- import re
- from typing import Dict, Any
class AIStreamJSONParser:
    """Reassemble a complete JSON object from AI stream chunk lines.

    Each chunk line is expected to be a JSON event carrying a ``choices``
    list. The ``delta.content`` text of the first choice is accumulated
    piece by piece, and the concatenation is parsed as a single JSON
    document by :meth:`get_result`.
    """

    def __init__(self):
        # Content fragments in arrival order; joined once in get_result().
        self._content_buffer: list[str] = []

    @staticmethod
    def _first_choice(chunk_line: str) -> Dict[str, Any]:
        """Return the first ``choices`` entry of a chunk line.

        Returns an empty dict when the line is not valid JSON, is not a
        JSON object, has no non-empty ``choices`` list, or its first
        choice is not an object. This lets callers use ``.get`` without
        any further type checks.
        """
        try:
            event = json.loads(chunk_line)
        except json.JSONDecodeError:
            # Malformed lines are ignored (rare).
            return {}
        if not isinstance(event, dict):
            return {}
        choices = event.get("choices")
        if not isinstance(choices, list) or not choices:
            return {}
        first = choices[0]
        return first if isinstance(first, dict) else {}

    def feed_chunk(self, chunk_line: str) -> None:
        """Process a single line of stream data.

        Blank lines, malformed lines, and events without a usable
        ``delta.content`` string are silently skipped.

        Args:
            chunk_line: One raw line of the stream, expected to be JSON.
        """
        if not chunk_line.strip():
            return
        delta = self._first_choice(chunk_line).get("delta")
        if not isinstance(delta, dict):
            # Covers a missing delta as well as an explicit ``"delta": null``.
            return
        content_piece = delta.get("content")
        # Only non-empty strings are kept, so the final join cannot fail
        # on a stray non-text payload.
        if isinstance(content_piece, str) and content_piece:
            self._content_buffer.append(content_piece)

    def is_finished(self, chunk_line: str) -> bool:
        """Tell whether this chunk line marks the end of the stream.

        Args:
            chunk_line: One raw line of the stream, expected to be JSON.

        Returns:
            True only when the first choice's ``finish_reason`` equals
            ``"stop"``; False for anything else, including unparsable
            lines.
        """
        return self._first_choice(chunk_line).get("finish_reason") == "stop"

    def get_result(self) -> Dict[str, Any]:
        """Return the final parsed result (JSON object).

        Returns:
            The value obtained by parsing the concatenation of all
            buffered content pieces as JSON.

        Raises:
            json.JSONDecodeError: If the accumulated text is not valid
                JSON (for example when nothing has been fed yet).
        """
        full_text = "".join(self._content_buffer)
        return json.loads(full_text)
def normalize_text(text: str) -> str:
    """Normalize text so that its hash is stable.

    The input is stripped and lower-cased, then all whitespace runs are
    removed, followed by every character that is neither a word character
    nor in the ``\\u4e00``-``\\u9fff`` range.

    Args:
        text: The raw text to normalize.

    Returns:
        The normalized text.
    """
    cleaned = text.strip().lower()
    # Apply the removal patterns in order: whitespace first, then anything
    # that is not a word character or in the explicit CJK range.
    for pattern in (r"\s+", r"[^\w\u4e00-\u9fff]"):
        cleaned = re.sub(pattern, "", cleaned)
    return cleaned
def compute_text_hash(text: str) -> str:
    """Compute the SHA-256 hash of a text, for exact-duplicate detection.

    The text is passed through ``normalize_text`` before hashing, so
    inputs with the same normalized form produce the same digest.

    Args:
        text: The raw text to hash.

    Returns:
        The hexadecimal SHA-256 digest of the UTF-8 encoded normalized
        text.
    """
    payload = normalize_text(text).encode("utf-8")
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()
|