| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
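- # @description: Invoice risk routes - risk decision, external evidence replenishment and similar-goods identification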
- # @author: licanglong
- # @date: 2025/11/20 14:22
- import json
- import re
- import dirtyjson
- import openai
- from app.client.VectorStoreClient import vector_store_client
- from app.constants.vector_store import VectorStoreCollection
- from app.core import BizException, CTX
- from app.models.Result import SysResult
- from app.models.dto import FinalDecisionResult, RiskEvidenceResult, SimilarIdentificationResult
- from app.prompt import person_consumption_prompt, external_evidence_search_prompt, similar_identification_prompt
- from app.routes.risk import risk_router
- from app.service.llm_client import llm_call
@risk_router.post('/decide')
async def risk_decide(invoice_data: dict):
    """
    Produce a risk decision for an invoice.

    The handler embeds a handful of invoice fields, uses that vector to query
    the rule, case, merchant and edge collections of the vector store, sends
    the invoice plus the retrieved payloads to the LLM and validates the
    reply as a ``FinalDecisionResult``.

    :param invoice_data: invoice fields. ``tdywlx``, ``gmfmc``, ``hwmc``,
        ``ggxh`` and ``kpr`` are embedded; a missing or empty field is
        embedded as an empty string.
    :return: ``SysResult.success`` wrapping the validated decision.
    :raises BizException: when the LLM returns no choices or empty content.
    """

    def field(key: str):
        # Missing keys are treated like empty values instead of raising KeyError.
        return invoice_data.get(key) or ''

    # The text layout (leading/trailing newline, colon spacing) is part of the
    # embedding input and is kept exactly as it was.
    embed_text = (
        "\n"
        f"特定业务类型:{field('tdywlx')}\n"
        f"购买方名称: {field('gmfmc')}\n"
        f"货物名称:{field('hwmc')}\n"
        f"规格型号:{field('ggxh')}\n"
        f"开票人:{field('kpr')}\n"
    )
    # Convert the embedding once and reuse it for every collection query.
    query_vector = vector_store_client.embedding.encode(embed_text).tolist()

    # Prompt key -> collection searched for it. Insertion order fixes the key
    # order of the JSON payload handed to the LLM.
    sources = {
        "rules": VectorStoreCollection.RULE_EMBED_STORE,
        "cases": VectorStoreCollection.CASE_EMBED_STORE,
        "industry": VectorStoreCollection.MERCHANTS_EMBED_STORE,
        "signals": VectorStoreCollection.EDGES_EMBED_STORE,
    }
    input_data = {"invoice_context": invoice_data}
    for prompt_key, collection in sources.items():
        hits = await vector_store_client.client.query_points(
            collection_name=collection,
            query=query_vector,
            limit=5,
            score_threshold=0.5,
        )
        input_data[prompt_key] = [hit.payload for hit in hits.points]

    final_user_prompt = person_consumption_prompt.get_person_consumption_user_prompt(
        json.dumps(input_data, ensure_ascii=False))
    # No extra input description is supplied, so the placeholder is blanked.
    final_user_prompt = final_user_prompt.replace("{{input_data_desc}}", "")

    # The client is created per request; the context manager closes its
    # underlying HTTP resources once the completion has been received.
    async with openai.AsyncOpenAI(
        api_key=CTX.ENV.getprop("llm.qwen.api_key", raise_error=True),
        base_url=CTX.ENV.getprop("llm.qwen.base_url", raise_error=True),
    ) as client:
        completion = await client.chat.completions.create(
            model="qwen-plus",
            messages=[
                {'role': 'system', 'content': person_consumption_prompt.system_prompt},
                {'role': 'user', 'content': final_user_prompt},
            ],
        )

    if not completion.choices:
        raise BizException("LLM响应异常")
    generate_content = completion.choices[0].message.content
    if not generate_content:
        # A choice without text cannot be parsed; report it as an LLM failure
        # rather than letting the JSON parser fail on None.
        raise BizException("LLM响应异常")

    decision_result: FinalDecisionResult = FinalDecisionResult.model_validate(
        dirtyjson.loads(generate_content))
    return SysResult.success(data=decision_result)
@risk_router.post('/evidence')
async def evidence_replenish(invoice_data: dict):
    """
    Ask the LLM for supplementary external evidence about an invoice.

    The invoice is wrapped as ``{"invoice_context": ...}``, rendered into the
    external-evidence user prompt and sent through ``llm_call`` together with
    the ``ali_search_tool`` function definition. The reply is parsed with
    ``dirtyjson`` and validated as a ``RiskEvidenceResult``.

    :param invoice_data: invoice fields forwarded unchanged to the prompt.
    :return: ``SysResult.success`` wrapping the validated evidence result.
    """
    payload_json = json.dumps({"invoice_context": invoice_data}, ensure_ascii=False)
    user_prompt = external_evidence_search_prompt.get_external_evidence_user_prompt(payload_json)

    # JSON-schema description of the single argument the search tool accepts.
    keyword_schema = {
        "type": "string",
        "description": "搜索关键词,如果需要限定搜索源可以在结尾加上 <+ 平台名称 >,例如: 如何判断一个企业的经营范围? + 税务局"
    }
    search_tool = {
        "type": "function",
        "function": {
            "name": "ali_search_tool",
            "description": "当需要从互联网获取额外信息时使用",
            "parameters": {
                "type": "object",
                "properties": {"keyword": keyword_schema},
                "required": ["keyword"]
            }
        }
    }

    conversation = [
        {'role': 'system', 'content': external_evidence_search_prompt.external_evidence_system_prompt},
        {'role': 'user', 'content': user_prompt},
    ]
    raw_reply = await llm_call(tools=[search_tool], messages=conversation)

    parsed_reply = dirtyjson.loads(raw_reply)
    evidence_result: RiskEvidenceResult = RiskEvidenceResult.model_validate(parsed_reply)
    return SysResult.success(data=evidence_result)
@risk_router.post('/similar')
async def similar_identification(invoice_data: dict):
    """
    Run similar-goods identification for the invoice's goods name.

    ``hwmc`` is expected to start with a ``*type*`` marker followed by the
    goods description. The marker and the remaining text are rendered into
    the similar-identification prompt, sent through ``llm_call`` together
    with the ``ali_search_tool`` function definition, and the reply is
    validated as a ``SimilarIdentificationResult``.

    :param invoice_data: invoice fields; only ``hwmc`` is read.
    :return: ``SysResult.success`` wrapping the validated result, or
        ``SysResult.fail`` when ``hwmc`` is missing, not a string, does not
        start with a ``*type*`` marker, or has no text besides the markers.
    """
    hwmc = invoice_data.get("hwmc")
    if not isinstance(hwmc, str):
        # A missing or non-text goods name is reported like any other
        # malformed goods name instead of surfacing as KeyError/TypeError.
        return SysResult.fail(msg="货物信息不符合规范")

    type_match = re.match(r'\*([^*]+)\*', hwmc)
    # NOTE(review): index 0 is the whole match, so hw_type keeps its
    # surrounding asterisks - confirm the prompt expects that and not group 1.
    hw_type = type_match[0] if type_match else None
    # Every *...* marker is stripped, not only the leading one.
    info = re.sub(r'\*([^*]+)\*', "", hwmc)
    if not hw_type or not info:
        return SysResult.fail(msg="货物信息不符合规范")

    system_prompt = similar_identification_prompt.similar_identification_system_prompt
    user_prompt = similar_identification_prompt.get_similar_identification_user_prompt(info, hw_type)

    tools = [
        {
            "type": "function",
            "function": {
                "name": "ali_search_tool",
                "description": "当需要从互联网获取额外信息时使用",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "keyword": {
                            "type": "string",
                            "description": "搜索关键词,如果需要限定搜索源可以在结尾加上 <+ 平台名称 >,例如: 行业指标 + 税务局"
                        }
                    },
                    "required": ["keyword"]
                }
            }
        }
    ]
    generate_content = await llm_call(tools=tools, messages=[
        {'role': 'system', 'content': system_prompt},
        {'role': 'user', 'content': user_prompt}])

    identification_result: SimilarIdentificationResult = SimilarIdentificationResult.model_validate(
        dirtyjson.loads(generate_content))
    return SysResult.success(data=identification_result)
|