145 lines
6.3 KiB
Python
145 lines
6.3 KiB
Python
from similar_analysis import SimilarAnalysis
|
|
import os
|
|
import pickle
|
|
from tools.image_joint import merge_imgs
|
|
|
|
|
|
class EventSimilarAnalysis(SimilarAnalysis):
    """Collect flagged events from result text files and render joint images.

    The class reads the one-to-one and one-to-SN result files named in
    ``self.conf['event']``, keeps the (event name, barcode) pairs that are
    flagged as ``fn`` / ``fp`` (presumably false negatives / false positives),
    and can then build a feature pickle for the involved images and save
    side-by-side comparison images.

    ``self.conf``, ``self.cam``, ``self.get_feature_map`` and
    ``self.get_similarity`` are not defined here; they are expected to be
    provided by ``SimilarAnalysis``.
    """

    # Similarity cut-off used by ``save_joint_image``: "fp" records are saved
    # when strictly above it, all other records when strictly below it.
    JOINT_SIMILARITY_THRESHOLD = 0.8

    def __init__(self):
        """Parse both result files and, if no pickle exists yet, list the images."""
        super(EventSimilarAnalysis, self).__init__()
        self.fn_one2one_event, self.fp_one2one_event = self.One2one_similar_analysis()
        self.fn_one2sn_event, self.fp_one2sn_event = self.One2Sn_similar_analysis()
        # Always define the attribute so later access cannot raise
        # AttributeError when the pickle file already exists.
        self.target_image = None
        if os.path.exists(self.conf['event']['pickle_path']):
            print('pickle file exists')
        else:
            self.target_image = self.get_path()

    def get_path(self):
        """Return every image path that belongs to a flagged event.

        For each (event name, barcode) pair the files inside
        ``<event_save_dir>/<event name>/subimgs`` and
        ``<stdlib_image_path>/<barcode>`` are collected.

        Returns:
            A sorted list of unique image paths (sorted so that repeated runs
            produce the same order).

        Raises:
            OSError: if one of the directories cannot be listed.
        """
        event_conf = self.conf['event']
        events = (self.fn_one2one_event, self.fp_one2one_event,
                  self.fn_one2sn_event, self.fp_one2sn_event)
        image_paths = set()
        for event in events:
            for event_name, bcd in event:
                event_sub_image = os.sep.join([event_conf['event_save_dir'],
                                               event_name,
                                               'subimgs'])
                barcode_images = os.sep.join([event_conf['stdlib_image_path'],
                                              bcd])
                for directory in (event_sub_image, barcode_images):
                    for image_name in os.listdir(directory):
                        image_paths.add(os.sep.join([directory, image_name]))
        return sorted(image_paths)

    def write_dict_to_pickle(self, data):
        """Write ``data`` to the pickle file at ``conf['event']['pickle_path']``."""
        with open(self.conf['event']['pickle_path'], 'wb') as file:
            pickle.dump(data, file)

    def get_dict_to_pickle(self):
        """Load and return the object stored in the configured pickle file.

        NOTE: ``pickle.load`` can execute arbitrary code; the file must only
        ever be one produced by ``write_dict_to_pickle`` from a trusted run.
        """
        with open(self.conf['event']['pickle_path'], 'rb') as f:
            return pickle.load(f)

    def create_total_feature(self):
        """Compute features for all target images, pickle them and print them."""
        # The image list is skipped in __init__ when a pickle already exists;
        # build it on demand so this method works in either case.
        if getattr(self, 'target_image', None) is None:
            self.target_image = self.get_path()
        feature_dicts = self.get_feature_map(self.target_image)
        self.write_dict_to_pickle(feature_dicts)
        print(feature_dicts)

    def _read_event_records(self, txt_path, min_fields):
        """Yield the space-separated fields of each usable line of ``txt_path``.

        Every line's fields are printed (as the original parsers did); lines
        with fewer than ``min_fields`` fields, e.g. blank lines, are skipped
        instead of raising ``IndexError``.
        """
        with open(txt_path, 'r') as f:
            for line in f:
                fields = line.strip().split(' ')
                print(fields)
                if len(fields) < min_fields:
                    continue
                yield fields

    def One2one_similar_analysis(self):
        """Parse the one-to-one result file into fn / fp event lists.

        Each line is expected to hold ``label event_name bcd simi1 simi2``
        separated by single spaces.

        Returns:
            ``(fn_event, fp_event)``: lists of ``(event_name, bcd)`` tuples.
            A 'same' line goes to ``fn_event`` when ``simi2`` is below
            ``oneToOne_max_th``; a 'diff' line goes to ``fp_event`` when
            ``simi2`` is above ``oneToSn_min_th``.
        """
        event_conf = self.conf['event']
        fn_event, fp_event = [], []
        for fields in self._read_event_records(event_conf['oneToOneTxt'], 5):
            label, event_name, bcd, simi1, simi2 = fields[:5]
            if label == 'same' and float(simi2) < event_conf['oneToOne_max_th']:
                print(event_name, bcd, simi1)
                fn_event.append((event_name, bcd))
            # NOTE(review): this one-to-one check reads the one-to-SN
            # threshold key; confirm whether a one-to-one key was intended.
            elif label == 'diff' and float(simi2) > event_conf['oneToSn_min_th']:
                fp_event.append((event_name, bcd))
        return fn_event, fp_event

    def One2Sn_similar_analysis(self):
        """Parse the one-to-SN result file into fn / fp event lists.

        Each line is expected to hold ``label event_name bcd simi`` separated
        by single spaces, where ``label`` is 'fn' or 'fp'.

        Returns:
            ``(fn_event, fp_event)``: lists of ``(event_name, bcd)`` tuples.
        """
        event_conf = self.conf['event']
        # NOTE(review): the original read 'oneToOneTxt' here, the same file as
        # the one-to-one analysis, which looks like a copy-paste slip. Prefer
        # a dedicated 'oneToSnTxt' entry when the config provides one (key
        # name assumed from the naming pattern - confirm); otherwise keep the
        # previous behaviour.
        txt_key = 'oneToSnTxt' if 'oneToSnTxt' in event_conf else 'oneToOneTxt'
        fn_event, fp_event = [], []
        for fields in self._read_event_records(event_conf[txt_key], 4):
            label, event_name, bcd, simi = fields[:4]
            if label == 'fn':
                print(event_name, bcd, simi)
                fn_event.append((event_name, bcd))
            elif label == 'fp':
                fp_event.append((event_name, bcd))
        return fn_event, fp_event

    def save_joint_image(self, img_pth1, img_pth2, feature_dicts, record):
        """Save a joint image of two pictures when their similarity is notable.

        Args:
            img_pth1: path of the event image; its grandparent directory name
                is used as the output sub-directory.
            img_pth2: path of the standard-library image.
            feature_dicts: mapping from image path to a feature object that
                supports ``.cpu().numpy()``.
            record: record tag such as 'fp_one2one'; tags containing "fp" are
                saved when the similarity exceeds the threshold, all others
                when it is below the threshold.

        Raises:
            KeyError: if either path is missing from ``feature_dicts``.
        """
        feature1 = feature_dicts[img_pth1]
        feature2 = feature_dicts[img_pth2]
        similarity = self.get_similarity(feature1.cpu().numpy(),
                                         feature2.cpu().numpy())
        # Paths are built with os.sep, so derive the event directory with
        # os.path instead of splitting on a hard-coded '/'.
        dir_name = os.path.basename(os.path.dirname(os.path.dirname(img_pth1)))
        save_path = os.sep.join([self.conf['data']['image_joint_pth'], dir_name, record])
        threshold = self.JOINT_SIMILARITY_THRESHOLD
        if "fp" in record:
            should_save = similarity > threshold
        else:
            should_save = similarity < threshold
        if should_save:
            merge_imgs(img_pth1,
                       img_pth2,
                       self.conf,
                       similarity,
                       label=None,
                       cam=self.cam,
                       save_path=save_path)
        print(similarity)

    def get_contrast(self, feature_dicts):
        """Compare every event image with every standard image of its barcode.

        For each flagged (event, barcode) pair, all images under the event's
        ``subimgs`` directory are paired with all images of the barcode's
        standard-library directory and passed to ``save_joint_image``. A
        failing pair is reported and skipped so one bad image does not stop
        the run.

        Args:
            feature_dicts: mapping from image path to feature, as loaded by
                ``get_dict_to_pickle``.
        """
        event_conf = self.conf['event']
        events_compare = [self.fp_one2one_event, self.fn_one2one_event,
                          self.fp_one2sn_event, self.fn_one2sn_event]
        event_record = ['fp_one2one', 'fn_one2one', 'fp_one2sn', 'fn_one2sn']
        for event_compare, record in zip(events_compare, event_record):
            for img, img_std in event_compare:
                imgs_pth1 = os.sep.join([event_conf['event_save_dir'],
                                         img,
                                         'subimgs'])
                imgs_pth2 = os.sep.join([event_conf['stdlib_image_path'],
                                         img_std])
                event_images = os.listdir(imgs_pth1)
                if not event_images:
                    continue
                # List the standard images once per event rather than once
                # per event image.
                std_paths = [os.sep.join([imgs_pth2, img2])
                             for img2 in os.listdir(imgs_pth2)]
                for img1 in event_images:
                    img_pth1 = os.sep.join([imgs_pth1, img1])
                    for img_pth2 in std_paths:
                        try:
                            self.save_joint_image(img_pth1, img_pth2, feature_dicts, record)
                        except Exception as e:
                            # Best effort: report the failure (previously
                            # unreachable after ``continue``) and move on.
                            print(e)
                            continue
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: make sure the feature pickle exists, load it, then
    # produce the comparison results.
    event_similar_analysis = EventSimilarAnalysis()
    pickle_path = event_similar_analysis.conf['event']['pickle_path']
    if not os.path.exists(pickle_path):
        # Generate the pickle file; this takes a long time, so producing the
        # file once is enough.
        event_similar_analysis.create_total_feature()
    else:
        print('pickle file exists')
    feature_dicts = event_similar_analysis.get_dict_to_pickle()
    # all_compare_img = event_similar_analysis.get_image_map()
    event_similar_analysis.get_contrast(feature_dicts)  # get the comparison results
|