增加queueing对接

main
million 2024-11-21 22:52:02 +08:00
parent 6e66ccffd6
commit 26ea7e395d
8 changed files with 175 additions and 58 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
/venv/
/__pycache__/
/__pycache__/
/logs

22
config.json 100644
View File

@ -0,0 +1,22 @@
{
"abp": {
"api_base": "https://stag-abp-api.iconsz.com",
"tenant": "ics",
"username": "ics",
"password": "1qaz@WSX",
"client_id": "engine_InfoExtractor",
"client_secret": "7ii7U9AabN2*3000",
"scope": "FX"
},
"engine": {
},
"queue_processor": {
"get_pending_queue_interval_seconds": 2
},
"queue_client": {
"check_queue_interval_seconds": 1,
"call_wait_return_default_timeout_seconds": 60
}
}

View File

@ -1,3 +1,4 @@
API_KEY = 'ollama'
MODEL = 'qwen2.5:32b-instruct-q5_K_M'
LLM_BASE_URL = 'http://localhost:11434/v1'
# LLM connection settings for the local Ollama OpenAI-compatible endpoint.
API_KEY = "ollama"  # presumably a placeholder — Ollama does not appear to validate it; confirm
MODEL = "qwen2.5:32b-instruct-q5_K_M"
# MODEL = "qwen2.5:7b"
LLM_BASE_URL = "http://localhost:11434/v1"

View File

@ -0,0 +1,39 @@
from dotenv import load_dotenv
# Load .env before the remaining imports so they see the environment overrides.
load_dotenv()
import time
from openai import OpenAI
from datetime import datetime
import config, debug, prompt, json
# Translation of the note below: the model currently deployed via Ollama is
# qwen2.5:32b-instruct-q5_K_M; if the input string is too long (roughly over
# 1000 characters / 1500 tokens) the prompt falls out of the short context
# window and is "forgotten", producing output that violates the spec.
"""
备注
目前使用ollama部署的qwen2.5:32b-instruct-q5_K_M,
如果输入字符串过长(大致超过1000个字符或1500token)会因上下文窗口过短导致提示词被遗忘
进而输出不合规定的结果
"""
# Shared OpenAI-compatible client pointed at the configured LLM endpoint.
llm = OpenAI(base_url=config.LLM_BASE_URL, api_key=config.API_KEY)
class entityExtractionProcess:
    """Wraps the LLM call that extracts structured entity data from free text."""

    # Fix: callers use entityExtractionProcess.entity_extract(text); without
    # @staticmethod that only works for class-level calls — the decorator
    # makes instance-level calls safe too and states the intent explicitly.
    @staticmethod
    def entity_extract(input_text):
        """Extract entities from ``input_text`` via the configured LLM.

        Parameters:
            input_text: Raw text to analyse. Keep it short — see the module
                note about the model's limited context window.

        Returns:
            The model reply parsed as JSON (a dict/list per the prompt).

        Raises:
            json.JSONDecodeError: If the model reply is not valid JSON.
        """
        messages = [
            {"role": "system", "content": prompt.ENTITY_EXTRACT},
            # The prompt involves dates; anchor "today" so relative dates resolve.
            {"role": "system", "content": f"今天的日期是:{str(datetime.today())}"},
            {"role": "user", "content": f"{input_text}"},
        ]
        timenow = time.time()
        response = (
            llm.chat.completions.create(
                model=config.MODEL, messages=messages, temperature=0, max_tokens=128_000
            )
            .choices[0]
            .message.content
        )
        print(f"本次输出花费时间:{time.time() - timenow}")
        return json.loads(response)

View File

@ -0,0 +1,2 @@
rem Fetch the iCloudEngine dependency next to this project; main.py appends
rem iCloudEngine/src to sys.path and imports QueueProcessor from it.
git clone https://gitea.iconsz.com/iCON/iCloudEngine.git
rem Keep the console window open so clone errors stay visible.
pause

95
main.py
View File

@ -1,70 +1,57 @@
from dotenv import load_dotenv
load_dotenv()
import time
from openai import OpenAI
from datetime import datetime
import config, debug, prompt
from entity_extraction_process import entityExtractionProcess
import os, sys, time, json, traceback
from utils.logger import logger
DEBUG = True
'''
备注
目前使用ollama部署的qwen2.5:32b-instruct-q5_K_M,
如果输入字符串过长(大致超过1000个字符或1500token)会因上下文窗口过短导致提示词被遗忘
进而输出不合规定的结果
'''
base_dir = os.path.dirname(os.path.abspath(__file__))
relative_path = os.path.join(base_dir, "iCloudEngine/src")
sys.path.append(relative_path)
from queue_processor import QueueProcessor
llm = OpenAI(
base_url=config.LLM_BASE_URL,
api_key=config.API_KEY
)
DEBUG = False
def get_task() -> str:
# 从引擎中获取任务的接口
# 返回的类型应为字符串或None
raise NotImplementedError
def push_result(result):
# 将提取结果返回给引擎的接口
raise NotImplementedError
class Main(QueueProcessor):
    """Queue worker that routes incoming engine tasks to local processors."""

    def processor_handle(self, input):
        """Handle one queue task: parse its payload and dispatch by engine id.

        Returns the processor's result, or ``None`` for engine ids that have
        no registered handler (the caller sees an unhandled fall-through).
        """
        print("input:", input)
        engine_id = input["currentEngineId"]
        payload = json.loads(input["inputData"])
        # 3000 == InfoExtractor (entity information extraction)
        if engine_id == 3000:
            return entityExtractionProcess.entity_extract(payload["text"])
def entity_extract(input_text):
messages = [
{"role": "system", "content": prompt.ENTITY_EXTRACT},
{"role": "system", "content": f"今天的日期是:{str(datetime.today())}"},
{"role": "user", "content": f"{input_text}"}
]
timenow = time.time()
response = llm.chat.completions.create(
model=config.MODEL,
messages=messages,
temperature=0,
max_tokens=128_000
).choices[0].message.content
print(f"本次输出花费时间:{time.time() - timenow}")
return response
def run():
if DEBUG:
input_text = debug.input_text
else:
input_text = get_task()
if not input_text:
return
response = entity_extract(input_text)
if DEBUG:
print(response)
else:
push_result(response)
if __name__ == "__main__":
    if DEBUG:
        # Debug path: run one extraction against canned input, bypass the queue.
        input_text = debug.input_text
        response = entityExtractionProcess.entity_extract(input_text)
        print(response)
        exit()
    else:
        logger.warning("application start")
        try:
            Main().startV2()
        except Exception as e:
            # Fixes vs. previous version: "excute" typo; the location line
            # logged a hard-coded "(unknown)" instead of the extracted
            # filename; a redundant local `import traceback` (already
            # imported at module top); and a "Crawler_NewsCN" tag
            # copy-pasted from an unrelated project.
            logger.warning(f"execute exception:{e}")
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback_details = traceback.extract_tb(exc_traceback)
            filename, line_number, function_name, text = traceback_details[-1]
            logger.error(f"Exception occurred in {filename} at line {line_number}: {e}")
            logger.error(f"Function name: {function_name}")
            logger.error(f"Text: {text}")
            logger.warning(traceback.format_exc())
        finally:
            logger.warning("application completed")

23
submit_test.py 100644
View File

@ -0,0 +1,23 @@
import os, sys
base_dir = os.path.dirname(os.path.abspath(__file__))
relative_path = os.path.join(base_dir, "iCloudEngine/src")
sys.path.append(relative_path)
from queue_client import QueueClient
class submit_test:
    """Manual smoke-test helper that pushes one task onto the engine queue."""

    # Fix: invoked as submit_test.submit(code, data); the original omitted
    # @staticmethod, which would break instance-level calls.
    @staticmethod
    def submit(code, data):
        """Submit ``data`` to queue ``code`` and print the engine's reply.

        Parameters:
            code: Numeric queue / engine id (e.g. 3000 == InfoExtractor).
            data: JSON-serialisable payload for the engine.
        """
        client = QueueClient()
        returnData = client.call(code, data)
        print(returnData)
# Submit one test task to queue 3000 (entity/relation extraction): given a
# passage, extract the person's name, gender, date of birth, age, title, etc.
submit_test.submit(
    3000,
    {
        "text": "范九伦简历\n\n范九伦汉族1964年11月生研究生民盟盟员、中共党员现任民盟中央常委、陕西省委会主委省政协副主席西安邮电大学校长"
    },
)

42
utils/logger.py 100644
View File

@ -0,0 +1,42 @@
import logging
import os
from datetime import datetime
import requests  # NOTE(review): appears unused below — getLogger("requests") needs no import; confirm before removing.
# Create the log directory up front so basicConfig's filename is writable.
if not os.path.exists("logs"):
    os.makedirs("logs")
# Basic logging setup: one file per day, WARNING and above, append mode, UTF-8.
logging.basicConfig(
    filename=os.path.join("logs", datetime.now().strftime("%Y-%m-%d") + ".log"),
    level=logging.WARNING,
    format="%(asctime)s:%(levelname)s:%(message)s",
    filemode="a",
    encoding="utf-8",
)
# Raise third-party loggers to WARNING or above so INFO/DEBUG chatter from
# the HTTP/search libraries does not flood the log file.
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("elasticsearch").setLevel(logging.WARNING)
class logger:
    """Minimal logging facade, used as a namespace (never instantiated).

    Writes through the module-level logging configuration and optionally
    echoes each message to stdout.
    """

    # Fixes: methods are called as logger.warning(...)/logger.error(...) but
    # lacked @staticmethod; `printLog == True` replaced with truthiness test;
    # commented-out debug/info/critical stubs removed as dead code.
    @staticmethod
    def warning(msg, printLog=True):
        """Log ``msg`` at WARNING level; echo to stdout unless printLog is falsy."""
        logging.warning(msg)
        if printLog:
            print(msg)

    @staticmethod
    def error(msg: str, printLog=True):
        """Log ``msg`` at ERROR level; echo to stdout unless printLog is falsy."""
        logging.error(msg)
        if printLog:
            print(msg)