# Files
# ieemoo-ai-isempty/ieemoo-ai-isempty.py
# 2022-11-09 03:39:36 +00:00
#
# 168 lines
# 6.3 KiB
# Python
# Executable File

# -*- coding: utf-8 -*-
from flask import request, Flask
import numpy as np
import json
import time
import cv2, base64
import argparse
import sys, os
import torch
from gevent.pywsgi import WSGIServer
from PIL import Image
from torchvision import transforms
from models.modeling import VisionTransformer, CONFIGS
from vit_pytorch import ViT
import lightrise
# import logging.config as log_config
sys.path.insert(0, ".")
# Flask-based external service interface
# from skywalking import agent, config
# SW_SERVER = os.environ.get('SW_AGENT_COLLECTOR_BACKEND_SERVICES')
# SW_SERVICE_NAME = os.environ.get('SW_AGENT_NAME')
# if SW_SERVER and SW_SERVICE_NAME:
# config.init() # collector service address; give this service its own name
# #config.init(collector="123.60.56.51:11800", service='ieemoo-ai-search') # collector service address; give this service its own name
# agent.start()
# def setup_logging(path):
# if os.path.exists(path):
# with open(path, 'r') as f:
# config = json.load(f)
# log_config.dictConfig(config)
# print = logging.getprint("root")
# return print
# print = setup_logging('utils/logging.json')
# WSGI application object; the /isempty route defined further down is registered on it.
app = Flask(__name__)
#app.use_reloader=False
# Startup banner: author tag followed by the installed torch version.
print("Autor:ieemoo_lc&ieemoo_lx")
print(torch.__version__)
def parse_args():
    """Build the command-line parser and return the recognised options.

    Options that the parser does not know are ignored rather than rejected,
    because ``parse_known_args`` is used and its leftover list is discarded.

    Returns:
        argparse.Namespace with ``img_size``, ``split``, ``slide_step``,
        ``smoothing_value`` and ``pretrained_model``.
    """
    # One (flag, keyword-arguments) entry per option, registered in this order.
    option_table = (
        ("--img_size", dict(default=600, type=int, help="Resolution size")),
        ("--split", dict(type=str, default='overlap', help="Split method")),
        ("--slide_step", dict(type=int, default=2, help="Slide step for overlap split")),
        ("--smoothing_value", dict(type=float, default=0.0, help="Label smoothing value")),
        (
            "--pretrained_model",
            dict(
                type=str,
                default="../module/ieemoo-ai-isempty/model/new/ieemooempty_vit_checkpoint.pth",
                help="load pretrained model",
            ),
        ),
    )

    cli = argparse.ArgumentParser()
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)

    recognised, _ignored = cli.parse_known_args()
    return recognised
class Predictor(object):
    """Load a pickled classifier checkpoint and reduce its prediction to a 0/1 flag.

    The file named by ``args.pretrained_model`` must contain a whole pickled
    model object: ``eval()`` is called directly on what ``torch.load`` returns,
    so a bare ``state_dict`` would not work here.
    """

    # Raw class ids that are reported as "0"; every other id is reported as "1".
    # NOTE(review): a previous, commented-out setup labelled the ids
    # {0: "noemp", 1: "yesemp", 2: "hard", 3: "fly", 4: "stack"} -- confirm
    # that this still matches the deployed checkpoint.
    _ZERO_FLAG_IDS = (0, 2, 3)

    def __init__(self, args):
        """Load the model and build the preprocessing pipeline.

        Args:
            args: Namespace-like object; only ``pretrained_model`` (path to the
                checkpoint) is read.
        """
        self.args = args
        self.cls_dict = {}
        self.num_classes = 0
        self.model = None
        self.device = None  # filled in by prepare_model()
        self.prepare_model()
        # NOTE(review): the input resolution is fixed at 600x600 here; the
        # ``img_size`` option is not consulted. Left as-is because the
        # checkpoint may depend on this exact size.
        self.test_transform = transforms.Compose([
            transforms.Resize((600, 600), Image.BILINEAR),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ])

    def prepare_model(self):
        """Load the checkpoint onto the available device and switch it to eval mode.

        Sets ``self.device`` (CUDA when ``torch.cuda.is_available()``, otherwise
        CPU) and ``self.model``. Using one device for loading, for the model and
        for the inputs keeps CPU-only hosts working instead of failing on an
        unconditional move to "cuda".
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        checkpoint_path = self.args.pretrained_model

        # SECURITY: this unpickles an arbitrary Python object. Only point
        # ``pretrained_model`` at checkpoint files you trust.
        try:
            # Newer torch releases default to weights_only=True, which refuses
            # whole-model pickles; ask for the full unpickler explicitly.
            loaded = torch.load(checkpoint_path, map_location=self.device, weights_only=False)
        except TypeError:
            # Older torch releases do not know the weights_only keyword.
            loaded = torch.load(checkpoint_path, map_location=self.device)

        self.model = loaded.eval().to(self.device)

    def normal_predict(self, img_data, result):
        """Classify one image and return the binary flag as a response dict.

        Args:
            img_data: Image object accepted by ``self.test_transform``, or None.
            result: Dict handed back untouched when ``img_data`` is None.

        Returns:
            ``result`` itself when ``img_data`` is None; otherwise a new dict
            ``{"success": "true", "rst_cls": "0" or "1"}``.
        """
        if img_data is None:
            print('error, img data is None')
            return result

        with torch.no_grad():
            batch = self.test_transform(img_data).unsqueeze(0).to(self.device)
            part_logits = self.model(batch)
            # Softmax is monotonic, so the top-scoring class can be read
            # straight from the logits.
            top_id = int(torch.argmax(part_logits, dim=-1)[0])

        clas_ids = 0 if top_id in self._ZERO_FLAG_IDS else 1
        return {"success": "true", "rst_cls": str(clas_ids)}
# Parse the command-line options and build the shared predictor at import
# time, so the checkpoint is loaded once instead of on every request.
args = parse_args()
predictor = Predictor(args)
@app.route("/isempty", methods=['POST'])
def get_isempty():
    """Handle ``POST /isempty``: classify the base64 image carried in the JSON body.

    Request body: UTF-8 JSON object whose ``"pic"`` value is a base64-encoded
    image that OpenCV can decode.

    Returns:
        ``repr()`` of the result dict from ``predictor.normal_predict``, e.g.
        ``"{'success': 'true', 'rst_cls': '1'}"`` (1 == empty, 0 == nonEmpty).
        The Python-repr wire format is kept for existing clients. When the
        payload cannot be parsed or decoded, ``"{'success': 'false'}"`` is
        returned with HTTP status 400 instead of an unhandled exception.
    """
    import io  # local import: only this handler needs it

    print(request.remote_addr)

    bad_request = (repr({"success": "false"}), 400)

    try:
        json_data = json.loads(request.get_data().decode("utf-8"))
        imgdata = base64.b64decode(json_data.get("pic"))
    except (ValueError, TypeError, AttributeError):
        # ValueError covers invalid UTF-8, invalid JSON and invalid base64;
        # TypeError a missing "pic"; AttributeError a non-object JSON body.
        return bad_request
    if not imgdata:
        return bad_request

    img_src = cv2.imdecode(np.frombuffer(imgdata, dtype='uint8'), cv2.IMREAD_COLOR)
    if img_src is None:
        return bad_request

    # The image used to be written to the fixed file 'huanyuan.jpg' and read
    # back, which let concurrent requests overwrite each other's picture.
    # The same JPEG round trip is now done in memory.
    # Do not replace it with Image.fromarray(np.uint8(img_src)): the original
    # author noted that this conversion increased empty-detection errors
    # (presumably because OpenCV arrays are BGR -- TODO confirm).
    encoded_ok, jpeg_buffer = cv2.imencode('.jpg', img_src)
    if not encoded_ok:
        return bad_request
    img_data = Image.open(io.BytesIO(jpeg_buffer.tobytes()))

    # A second opinion from lightrise.riseempty used to be combined with this
    # result; that path is currently disabled.
    result = predictor.normal_predict(img_data, {})  # 1==empty, 0==nonEmpty
    return repr(result)
if __name__ == "__main__":
    # Script entry point: start Flask's built-in server on every interface, port 8888.
    app.run(port=8888, host='0.0.0.0')