Data analysis
@@ -3,7 +3,7 @@ from tools.getHeatMap import cal_cam
 import os


-def merge_imgs(img1_path, img2_path, conf, similar=None, label=None, cam=None):
+def merge_imgs(img1_path, img2_path, conf, similar=None, label=None, cam=None, save_path=None):
     save = True
     position = (50, 50)  # top-left corner of the text
     color = (255, 0, 0)  # red text, in RGB format
@@ -11,22 +11,25 @@ def merge_imgs(img1_path, img2_path, conf, similar=None, label=None, cam=None):
     # os.makedirs(os.sep.join([save_path, str(label)]))
     # save_path = os.sep.join([save_path, str(label)])
     # img_name = os.path.basename(img1_path).split('.')[0] + '_' + os.path.basename(img2_path).split('.')[0] + '.png'
+    if save_path is None:
+        save_path = conf['data']['image_joint_pth']
     if not conf['heatmap']['show_heatmap']:
         img1 = Image.open(img1_path)
         img2 = Image.open(img2_path)
         img1 = img1.resize((224, 224))
         img2 = img2.resize((224, 224))
-        save_path = conf['data']['image_joint_pth']
+        # save_path = conf['data']['image_joint_pth']
     else:
         assert cam is not None, 'cam is None'
         img1 = cam.get_hot_map(img1_path)
         img2 = cam.get_hot_map(img2_path)
-        save_path = conf['heatmap']['image_joint_pth']
+        # save_path = conf['heatmap']['image_joint_pth']
     # print('img1_path', img1)
     # print('img2_path', img2)
     if not os.path.exists(os.sep.join([save_path, str(label)])):
         os.makedirs(os.sep.join([save_path, str(label)]))
-    save_path = os.sep.join([save_path, str(label)])
+    if save_path is None:
+        save_path = os.sep.join([save_path, str(label)])
     img_name = os.path.basename(img1_path).split('.')[0] + '_' + os.path.basename(img2_path).split('.')[0] + '.png'
     assert img1.height == img2.height

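Taken together, the two hunks let callers pass an explicit save_path to merge_imgs; when the argument is omitted, the function falls back to the image_joint_pth configured in conf, as before. A minimal usage sketch of the two call patterns follows; the config file name, image paths, and the 0.83 similarity score are placeholders for illustration, not part of the commit.

import yaml
from tools.image_joint import merge_imgs

with open('configs/similar_analysis.yml', 'r') as f:  # placeholder config path
    conf = yaml.load(f, Loader=yaml.FullLoader)

# No save_path: the merged image goes to conf['data']['image_joint_pth'].
# Assumes conf['heatmap']['show_heatmap'] is False; otherwise a cal_cam instance must be passed via cam=.
merge_imgs('samples/a.jpg', 'samples/b.jpg', conf, 0.83)

# Explicit save_path: overrides the config-derived location (new in this commit).
merge_imgs('samples/a.jpg', 'samples/b.jpg', conf, 0.83, save_path='output/pairs')
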
tools/similar_analysis.py (new file, 106 lines)
@@ -0,0 +1,106 @@
+from configs.utils import trainer_tools
+from test_ori import group_image, featurize, cosin_metric
+from tools.dataset import get_transform
+from tools.getHeatMap import cal_cam
+from tools.image_joint import merge_imgs
+import torch.nn as nn
+import torch
+from collections import ChainMap
+import yaml
+import os
+
+
+class analysis:
+    def __init__(self):
+        with open('../configs/similar_analysis.yml', 'r') as f:
+            self.conf = yaml.load(f, Loader=yaml.FullLoader)
+        self.model = self.initialize_model(self.conf)
+        _, self.test_transform = get_transform(self.conf)
+        self.cam = cal_cam(self.model, self.conf)
+
+    def initialize_model(self, conf):
+        """Initialize the model and the metric method"""
+        tr_tools = trainer_tools(conf)
+        backbone_mapping = tr_tools.get_backbone()
+
+        if conf['models']['backbone'] in backbone_mapping:
+            model = backbone_mapping[conf['models']['backbone']]()
+        else:
+            raise ValueError('Unsupported model: {}'.format(conf['models']['backbone']))
+        try:
+            model.load_state_dict(torch.load(conf['models']['model_path'],
+                                             map_location=conf['base']['device']))
+        except:
+            # fall back for checkpoints saved with nn.DataParallel ("module." key prefix)
+            state_dict = torch.load(conf['models']['model_path'],
+                                    map_location=conf['base']['device'])
+            new_state_dict = {}
+            for k, v in state_dict.items():
+                new_key = k.replace("module.", "")
+                new_state_dict[new_key] = v
+            model.load_state_dict(new_state_dict, strict=False)
+        return model.eval()
+
+    def get_feature(self, img_pth):
+        # featurize a single image (used as a {path: feature} dict by get_feature_map)
+        group = group_image([img_pth], self.conf['data']['val_batch_size'])
+        feature = featurize(group[0], self.test_transform, self.model, self.conf['base']['device'])
+        return feature
+
+    def get_similarity(self, feature_dict1, feature_dict2):
+        similarity = cosin_metric(feature_dict1, feature_dict2)
+        print(f"Similarity: {similarity}")
+        return similarity
+
+    def get_feature_map(self, all_imgs):
+        # merge the per-image feature dicts into one {path: feature} dict
+        feature_dicts = {}
+        for img_pth in all_imgs:
+            feature_dict = self.get_feature(img_pth)
+            feature_dicts = dict(ChainMap(feature_dict, feature_dicts))
+        return feature_dicts
+
+    def get_image_map(self):
+        # pair every image of the first sub-directory with every image of the second
+        all_compare_img = []
+        for root, dirs, files in os.walk(self.conf['data']['data_dir']):
+            if len(dirs) == 2:
+                dir_pth_1 = os.sep.join([root, dirs[0]])
+                dir_pth_2 = os.sep.join([root, dirs[1]])
+                for img_name_1 in os.listdir(dir_pth_1):
+                    for img_name_2 in os.listdir(dir_pth_2):
+                        all_compare_img.append((os.sep.join([dir_pth_1, img_name_1]),
+                                                os.sep.join([dir_pth_2, img_name_2])))
+        return all_compare_img
+
+    def create_total_feature(self):
+        # collect every image path below data_dir
+        all_imgs = []
+        for root, dirs, files in os.walk(self.conf['data']['data_dir']):
+            if len(dirs) == 2:
+                for dir_name in dirs:
+                    dir_pth = os.sep.join([root, dir_name])
+                    for img_name in os.listdir(dir_pth):
+                        all_imgs.append(os.sep.join([dir_pth, img_name]))
+        return all_imgs
+
+    def get_contrast_result(self, feature_dicts, all_compare_img):
+        # compare every pair and save the merged image when similarity > 0.7
+        for img_pth1, img_pth2 in all_compare_img:
+            feature_dict1 = feature_dicts[img_pth1]
+            feature_dict2 = feature_dicts[img_pth2]
+            similarity = self.get_similarity(feature_dict1.cpu().numpy(),
+                                             feature_dict2.cpu().numpy())
+            dir_name = img_pth1.split(os.sep)[-3]
+            save_path = os.sep.join([self.conf['data']['image_joint_pth'], dir_name])
+            if similarity > 0.7:
+                merge_imgs(img_pth1,
+                           img_pth2,
+                           self.conf,
+                           similarity,
+                           label=None,
+                           cam=self.cam,
+                           save_path=save_path)
+            print(similarity)
+
+
+if __name__ == '__main__':
+    ana = analysis()
+    all_imgs = ana.create_total_feature()
+    feature_dicts = ana.get_feature_map(all_imgs)
+    all_compare_img = ana.get_image_map()
+    ana.get_contrast_result(feature_dicts, all_compare_img)
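cosin_metric is imported from test_ori and its body is not part of this commit; the similarity > 0.7 threshold in get_contrast_result suggests a score bounded by 1. Assuming it is plain cosine similarity between the two feature vectors, it would behave like the sketch below (an assumption for illustration, not code from this repository).

import numpy as np

def cosine_similarity(x1, x2):
    # assumed behaviour of test_ori.cosin_metric: cosine of the angle between two 1-D feature vectors
    return float(np.dot(x1, x2) / (np.linalg.norm(x1) * np.linalg.norm(x2)))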
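For orientation, the configuration keys read by similar_analysis.py and by the updated merge_imgs are gathered below, written as the Python dict that yaml.load would return for configs/similar_analysis.yml. The keys come from the code above; every value is a placeholder, and the real config may contain additional sections.

conf = {
    'base': {'device': 'cpu'},                 # or 'cuda:0'
    'models': {
        'backbone': 'resnet18',                # placeholder; must be a key of trainer_tools.get_backbone()
        'model_path': 'checkpoints/best.pth',  # placeholder checkpoint path
    },
    'data': {
        'data_dir': 'data/compare_pairs',      # walked by create_total_feature / get_image_map
        'val_batch_size': 8,
        'image_joint_pth': 'output/joint',     # default save location for merged image pairs
    },
    'heatmap': {
        'show_heatmap': False,                 # when True, merge_imgs expects a cal_cam instance
        'image_joint_pth': 'output/joint_heatmap',
    },
}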