from similar_analysis import SimilarAnalysis import os import pickle from tools.image_joint import merge_imgs class EventSimilarAnalysis(SimilarAnalysis): def __init__(self): super(EventSimilarAnalysis, self).__init__() self.fn_one2one_event, self.fp_one2one_event = self.One2one_similar_analysis() self.fn_one2sn_event, self.fp_one2sn_event = self.One2Sn_similar_analysis() if os.path.exists(self.conf['event']['pickle_path']): print('pickle file exists') else: self.target_image = self.get_path() def get_path(self): events = [self.fn_one2one_event, self.fp_one2one_event, self.fn_one2sn_event, self.fp_one2sn_event] event_image_path = [] barcode_image_path = [] for event in events: for event_name, bcd in event: event_sub_image = os.sep.join([self.conf['event']['event_save_dir'], event_name, 'subimgs']) barcode_images = os.sep.join([self.conf['event']['stdlib_image_path'], bcd]) for image_name in os.listdir(event_sub_image): event_image_path.append(os.sep.join([event_sub_image, image_name])) for barcode in os.listdir(barcode_images): barcode_image_path.append(os.sep.join([barcode_images, barcode])) return list(set(event_image_path + barcode_image_path)) def write_dict_to_pickle(self, data): """将字典写入pickle文件.""" with open(self.conf['event']['pickle_path'], 'wb') as file: pickle.dump(data, file) def get_dict_to_pickle(self): with open(self.conf['event']['pickle_path'], 'rb') as f: data = pickle.load(f) return data def create_total_feature(self): feature_dicts = self.get_feature_map(self.target_image) self.write_dict_to_pickle(feature_dicts) print(feature_dicts) def One2one_similar_analysis(self): fn_event, fp_event = [], [] with open(self.conf['event']['oneToOneTxt'], 'r') as f: lines = f.readlines() for line in lines: print(line.strip().split(' ')) event_infor = line.strip().split(' ') label = event_infor[0] event_name = event_infor[1] bcd = event_infor[2] simi1 = event_infor[3] simi2 = event_infor[4] if label == 'same' and float(simi2) < self.conf['event']['oneToOne_max_th']: print(event_name, bcd, simi1) fn_event.append((event_name, bcd)) elif label == 'diff' and float(simi2) > self.conf['event']['oneToSn_min_th']: fp_event.append((event_name, bcd)) return fn_event, fp_event def One2Sn_similar_analysis(self): fn_event, fp_event = [], [] with open(self.conf['event']['oneToOneTxt'], 'r') as f: lines = f.readlines() for line in lines: print(line.strip().split(' ')) event_infor = line.strip().split(' ') label = event_infor[0] event_name = event_infor[1] bcd = event_infor[2] simi = event_infor[3] if label == 'fn': print(event_name, bcd, simi) fn_event.append((event_name, bcd)) elif label == 'fp': fp_event.append((event_name, bcd)) return fn_event, fp_event def save_joint_image(self, img_pth1, img_pth2, feature_dicts, record): feature_dict1 = feature_dicts[img_pth1] feature_dict2 = feature_dicts[img_pth2] similarity = self.get_similarity(feature_dict1.cpu().numpy(), feature_dict2.cpu().numpy()) dir_name = img_pth1.split('/')[-3] save_path = os.sep.join([self.conf['data']['image_joint_pth'], dir_name, record]) if "fp" in record: if similarity > 0.8: merge_imgs(img_pth1, img_pth2, self.conf, similarity, label=None, cam=self.cam, save_path=save_path) else: if similarity < 0.8: merge_imgs(img_pth1, img_pth2, self.conf, similarity, label=None, cam=self.cam, save_path=save_path) print(similarity) def get_contrast(self, feature_dicts): events_compare = [self.fp_one2one_event, self.fn_one2one_event, self.fp_one2sn_event, self.fn_one2sn_event] event_record = ['fp_one2one', 'fn_one2one', 'fp_one2sn', 'fn_one2sn'] for event_compare, record in zip(events_compare, event_record): for img, img_std in event_compare: imgs_pth1 = os.sep.join([self.conf['event']['event_save_dir'], img, 'subimgs']) imgs_pth2 = os.sep.join([self.conf['event']['stdlib_image_path'], img_std]) for img1 in os.listdir(imgs_pth1): for img2 in os.listdir(imgs_pth2): img_pth1 = os.sep.join([imgs_pth1, img1]) img_pth2 = os.sep.join([imgs_pth2, img2]) try: self.save_joint_image(img_pth1, img_pth2, feature_dicts, record) except Exception as e: continue print(e) if __name__ == '__main__': event_similar_analysis = EventSimilarAnalysis() if os.path.exists(event_similar_analysis.conf['event']['pickle_path']): print('pickle file exists') else: event_similar_analysis.create_total_feature() # 生成pickle文件, 生成时间较长,生成一个文件即可 feature_dicts = event_similar_analysis.get_dict_to_pickle() # all_compare_img = event_similar_analysis.get_image_map() event_similar_analysis.get_contrast(feature_dicts) # 获取比对结果