# @description:
# @author: licanglong
# @date: 2025/11/20 14:22
import json
import re

import dirtyjson
import openai

from app.client.VectorStoreClient import vector_store_client
from app.constants.vector_store import VectorStoreCollection
from app.core import BizException, CTX
from app.models.Result import SysResult
from app.models.dto import FinalDecisionResult, RiskEvidenceResult, SimilarIdentificationResult
from app.prompt import person_consumption_prompt, external_evidence_search_prompt, similar_identification_prompt
from app.routes.risk import risk_router
from app.service.llm_client import llm_call

# Matches a goods-category tag of the form ``*category*``.
_GOODS_TYPE_PATTERN = re.compile(r'\*([^*]+)\*')

# Retrieval settings shared by every vector-store lookup in ``risk_decide``.
_SEARCH_LIMIT = 5
_SEARCH_SCORE_THRESHOLD = 0.5


def _web_search_tools(keyword_example):
    """Build the ``ali_search_tool`` function-calling spec handed to ``llm_call``.

    :param keyword_example: sample keyword appended to the parameter
        description shown to the model.
    :return: a one-element list holding the tool definition.
    """
    return [
        {
            "type": "function",
            "function": {
                "name": "ali_search_tool",
                "description": "当需要从互联网获取额外信息时使用",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "keyword": {
                            "type": "string",
                            "description": "搜索关键词,如果需要限定搜索源可以在结尾加上 <+ 平台名称 >,例如: "
                                           + keyword_example,
                        }
                    },
                    "required": ["keyword"],
                },
            },
        }
    ]


def _load_llm_json(content):
    """Parse the model's textual answer with ``dirtyjson``.

    :param content: raw text returned by the model.
    :return: the parsed object, ready for ``model_validate``.
    :raises BizException: when the model returned no content at all, so the
        caller gets the same error as for a response without choices instead
        of an unrelated parser failure.
    """
    if not content:
        raise BizException("LLM响应异常")
    return dirtyjson.loads(content)


async def _query_payloads(collection_name, query_vector):
    """Return the payloads of the points matching ``query_vector`` in one collection."""
    result = await vector_store_client.client.query_points(
        collection_name=collection_name,
        query=query_vector,
        limit=_SEARCH_LIMIT,
        score_threshold=_SEARCH_SCORE_THRESHOLD,
    )
    return [hit.payload for hit in result.points]


@risk_router.post('/decide')
async def risk_decide(invoice_data: dict):
    """
    Invoice risk decision.

    Embeds a textual summary of the invoice, looks up related rules, cases,
    merchants and edges in the vector store, and asks the ``qwen-plus`` model
    for a final decision.

    :param invoice_data: invoice fields; ``tdywlx``, ``gmfmc``, ``hwmc``,
        ``ggxh`` and ``kpr`` are read for the embedding text and a missing or
        empty field is treated as an empty string.
    :return: ``SysResult.success`` wrapping a ``FinalDecisionResult``.
    :raises BizException: when the model returns no choices or no content.
    """
    # ``.get`` keeps an absent key from raising KeyError; ``or ''`` still maps
    # an explicit None to an empty string, as before.
    embedding_text = f"""
    特定业务类型:{invoice_data.get('tdywlx') or ''}
    购买方名称: {invoice_data.get('gmfmc') or ''}
    货物名称:{invoice_data.get('hwmc') or ''}
    规格型号:{invoice_data.get('ggxh') or ''}
    开票人:{invoice_data.get('kpr') or ''}
    """
    # Convert the embedding once and reuse it for every collection.
    query_vector = vector_store_client.embedding.encode(embedding_text).tolist()

    rules = await _query_payloads(VectorStoreCollection.RULE_EMBED_STORE, query_vector)
    cases = await _query_payloads(VectorStoreCollection.CASE_EMBED_STORE, query_vector)
    merchants = await _query_payloads(VectorStoreCollection.MERCHANTS_EMBED_STORE, query_vector)
    edges = await _query_payloads(VectorStoreCollection.EDGES_EMBED_STORE, query_vector)

    input_data = {
        "invoice_context": invoice_data,
        "rules": rules,
        "cases": cases,
        "industry": merchants,
        "signals": edges,
    }
    final_user_prompt = person_consumption_prompt.get_person_consumption_user_prompt(
        json.dumps(input_data, ensure_ascii=False))
    # No extra input description is supplied, so the placeholder is blanked out.
    final_user_prompt = final_user_prompt.replace("{{input_data_desc}}", "")

    # The client is created per request; ``async with`` closes it once the
    # completion has been received instead of leaving it open.
    async with openai.AsyncOpenAI(
        api_key=CTX.ENV.getprop("llm.qwen.api_key", raise_error=True),
        base_url=CTX.ENV.getprop("llm.qwen.base_url", raise_error=True),
    ) as client:
        completion = await client.chat.completions.create(
            model="qwen-plus",
            messages=[{'role': 'system', 'content': person_consumption_prompt.system_prompt},
                      {'role': 'user', 'content': final_user_prompt}]
        )
    if not completion.choices:
        raise BizException("LLM响应异常")
    generate_content = completion.choices[0].message.content
    decision_result: FinalDecisionResult = FinalDecisionResult.model_validate(
        _load_llm_json(generate_content))
    return SysResult.success(data=decision_result)


@risk_router.post('/evidence')
async def evidence_replenish(invoice_data: dict):
    """
    Collect external evidence for an invoice.

    Sends the invoice to the model together with the ``ali_search_tool``
    definition and validates the answer as a ``RiskEvidenceResult``.

    :param invoice_data: invoice fields, forwarded to the prompt unchanged.
    :return: ``SysResult.success`` wrapping a ``RiskEvidenceResult``.
    :raises BizException: when the model returns no content.
    """
    input_data = {
        "invoice_context": invoice_data
    }
    final_external_evidence_user_prompt = external_evidence_search_prompt.get_external_evidence_user_prompt(
        json.dumps(input_data, ensure_ascii=False))
    generate_content = await llm_call(
        tools=_web_search_tools("如何判断一个企业的经营范围? + 税务局"),
        messages=[
            {'role': 'system', 'content': external_evidence_search_prompt.external_evidence_system_prompt},
            {'role': 'user', 'content': final_external_evidence_user_prompt}])
    evidence_result: RiskEvidenceResult = RiskEvidenceResult.model_validate(
        _load_llm_json(generate_content))
    return SysResult.success(data=evidence_result)


@risk_router.post('/similar')
async def similar_identification(invoice_data: dict):
    """
    Identify similar goods for the invoice's goods name.

    The goods name (``hwmc``) must start with a ``*category*`` tag followed by
    descriptive text; both parts are passed to the model.

    :param invoice_data: invoice fields; only ``hwmc`` is read.
    :return: ``SysResult.success`` wrapping a ``SimilarIdentificationResult``,
        or ``SysResult.fail`` when ``hwmc`` is missing, not a string, has no
        leading category tag, or has no text besides its tags.
    :raises BizException: when the model returns no content.
    """
    hwmc = invoice_data.get("hwmc")
    # A missing or non-string goods name is reported as malformed input rather
    # than surfacing as KeyError/TypeError.
    if not isinstance(hwmc, str):
        return SysResult.fail(msg="货物信息不符合规范")

    type_match = _GOODS_TYPE_PATTERN.match(hwmc)
    # Group 0 is the whole match, i.e. the category with its asterisks.
    # NOTE(review): confirm the prompt expects the asterisks rather than group 1.
    hw_type = type_match[0] if type_match else None
    # Every ``*...*`` tag is stripped; what remains is the goods description.
    info = _GOODS_TYPE_PATTERN.sub("", hwmc)
    if not hw_type or not info:
        return SysResult.fail(msg="货物信息不符合规范")

    system_prompt = similar_identification_prompt.similar_identification_system_prompt
    user_prompt = similar_identification_prompt.get_similar_identification_user_prompt(info, hw_type)
    generate_content = await llm_call(
        tools=_web_search_tools("行业指标 + 税务局"),
        messages=[
            {'role': 'system', 'content': system_prompt},
            {'role': 'user', 'content': user_prompt}])
    identification_result: SimilarIdentificationResult = SimilarIdentificationResult.model_validate(
        _load_llm_json(generate_content))
    return SysResult.success(data=identification_result)