"""Similarity analysis between tracking images and the standard image library.

1. Generates the similarity between tracking images and the images of the
   standard library.
2. Helps judge the strategy used to select standard-library images for
   comparison against tracking images.
"""
from similar_analysis import SimilarAnalysis
import os
import pickle
from tools.image_joint import merge_imgs
import yaml
from PIL import Image
import torch
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import matplotlib.pyplot as plt

# Configuration file shared by the analysis class and the plotting entry point.
CONFIG_PATH = '../configs/pic_pic_similar.yml'

# File extensions (lower case) that are treated as images.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff')

# Names of the four statistics stored per result line, in column order.
STAT_LABELS = ('Mean', 'Std', 'Max', 'Min')


def _load_config(path=CONFIG_PATH):
    """Read the YAML configuration file and return the parsed object."""
    # NOTE(review): yaml.FullLoader is kept from the original code; the config
    # is assumed to be a local, trusted file. Do not point this at untrusted
    # input.
    with open(path, 'r') as f:
        return yaml.load(f, Loader=yaml.FullLoader)


def _value_at(values, index):
    """Return ``values[index]``, or NaN when the sequence is too short."""
    return values[index] if index < len(values) else np.nan


class picDirSimilarAnalysis(SimilarAnalysis):
    """Compare two image folders through features cached in a pickle file.

    The cache maps an image path to the feature returned by
    ``extract_features`` (or ``None`` when extraction failed). It is built on
    first use from ``conf['data']['data_dir']`` and stored at
    ``conf['data']['total_pkl']``.
    """

    def __init__(self):
        """Load the configuration and the feature cache, building it if absent."""
        super().__init__()
        self.conf = _load_config()
        pkl_path = self.conf['data']['total_pkl']
        if not os.path.exists(pkl_path):
            self.create_total_pkl()
        if os.path.exists(pkl_path):
            self.all_dicts = self.load_dict_from_pkl()
        else:
            # Keep the attribute defined even when the cache could not be
            # created, so later lookups fail per image instead of on the
            # attribute itself.
            self.all_dicts = {}

    def is_image_file(self, filename):
        """Return True when *filename* ends with a known image extension.

        The comparison is case-insensitive.
        """
        return filename.lower().endswith(IMAGE_EXTENSIONS)

    def create_total_pkl(self):
        """Extract features for every image under the data directory and cache them.

        Images whose extraction raises are stored with a ``None`` feature.
        The pickle file is only written when it does not exist yet; an
        existing cache is never overwritten.
        """
        features_by_path = {}
        # get_image_files builds paths with os.path.join, so the cache keys
        # have exactly the form that create_one_similarity_matrix looks up.
        for image_path in self.get_image_files(self.conf['data']['data_dir']):
            print(f"处理图像 {image_path}")
            try:
                feature = self.extract_features(image_path)
            except Exception as e:  # best effort: one bad image must not stop the scan
                print(f"处理图像 {image_path} 时出错: {e}")
                feature = None
            features_by_path[image_path] = feature

        pkl_path = self.conf['data']['total_pkl']
        if not os.path.exists(pkl_path):
            with open(pkl_path, 'wb') as f:
                pickle.dump(features_by_path, f)

    def load_dict_from_pkl(self):
        """Load and return the cached path-to-feature dictionary."""
        pkl_path = self.conf['data']['total_pkl']
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # only load cache files produced by this tool.
        with open(pkl_path, 'rb') as f:
            data = pickle.load(f)
        print(f"字典已从 {pkl_path} 加载")
        return data

    def get_image_files(self, folder_path):
        """Return the paths of all image files found recursively in *folder_path*.

        A missing folder yields an empty list because ``os.walk`` produces
        nothing for it.
        """
        image_files = []
        for root, _, names in os.walk(folder_path):
            image_files.extend(
                os.path.join(root, name)
                for name in names
                if self.is_image_file(name)
            )
        return image_files

    def extract_features(self, image_path):
        """Return the feature of a single image.

        Calls ``self.get_feature`` (not defined in this class; presumably
        inherited from ``SimilarAnalysis``) and picks the entry stored under
        *image_path* in the returned mapping.
        """
        feature_dict = self.get_feature(image_path)
        return feature_dict[image_path]

    def _load_cached_features(self, image_paths):
        """Look up cached features and drop the images whose lookup fails.

        Returns:
            ``(features, kept_paths)``: two lists aligned index by index,
            where each feature is the result of ``.cpu().numpy()`` on the
            cached value.
        """
        features = []
        kept_paths = []
        for img_path in image_paths:
            try:
                # Fails for paths missing from the cache and for entries
                # cached as None after a failed extraction.
                feature = self.all_dicts[img_path].cpu().numpy()
            except Exception as e:
                print(f"处理图像 {img_path} 时出错: {e}")
                continue
            features.append(feature)
            kept_paths.append(img_path)
        return features, kept_paths

    def create_one_similarity_matrix(self, folder1_path, folder2_path):
        """Compute the cosine similarity between the images of two folders.

        Args:
            folder1_path: Folder whose images form the rows of the matrix.
            folder2_path: Folder whose images form the columns of the matrix.

        Returns:
            ``(similarity_matrix, valid_images1, valid_images2)`` where the
            two lists hold the image paths that had a usable cached feature,
            in row and column order.

        Raises:
            ValueError: If either folder contains no image file, or if no
                usable feature remains for either folder.
        """
        images1 = self.get_image_files(folder1_path)
        images2 = self.get_image_files(folder2_path)
        print(f"文件夹1 ({folder1_path}) 包含 {len(images1)} 张图像")
        print(f"文件夹2 ({folder2_path}) 包含 {len(images2)} 张图像")
        if not images1 or not images2:
            raise ValueError("至少有一个文件夹中没有图像文件")

        print("正在提取文件夹1中的图像特征...")
        valid_features1, valid_images1 = self._load_cached_features(images1)
        print("正在提取文件夹2中的图像特征...")
        valid_features2, valid_images2 = self._load_cached_features(images2)

        if not valid_features1 or not valid_features2:
            raise ValueError("没有成功处理任何图像")

        print("正在计算相似度矩阵...")
        similarity_matrix = cosine_similarity(valid_features1, valid_features2)
        return similarity_matrix, valid_images1, valid_images2

    def get_group_similarity_matrix(self, folder_path, standard_subdir='standard_slim'):
        """Append similarity statistics for every group under *folder_path*.

        For each sub-directory ``<name>`` of ``<folder_path>/tracking`` the
        images are compared with ``<folder_path>/<standard_subdir>/<name>``
        and one line ``"<standard_subdir> <name> <mean> <std> <max> <min>"``
        is appended to ``conf['data']['result_txt']``.

        Args:
            folder_path: Root folder holding the ``tracking`` folder and the
                standard-library folder.
            standard_subdir: Name of the standard-library folder inside
                *folder_path*. Defaults to ``'standard_slim'``.

        Raises:
            ValueError: Propagated from ``create_one_similarity_matrix`` when
                a group has no images or no usable features.
        """
        # os.path.join avoids the doubled separator that plain string joining
        # produces when folder_path ends with one; such paths would not match
        # the keys stored in the feature cache.
        tracking_folder = os.path.join(folder_path, 'tracking')
        standard_folder = os.path.join(folder_path, standard_subdir)
        standard_label = os.path.basename(standard_folder)
        result_path = self.conf['data']['result_txt']

        # Sorted so the order of the result lines does not depend on the
        # arbitrary order returned by os.listdir.
        for dir_name in sorted(os.listdir(tracking_folder)):
            tracking_dir = os.path.join(tracking_folder, dir_name)
            if not os.path.isdir(tracking_dir):
                # Stray files next to the group folders are not groups.
                continue
            standard_dir = os.path.join(standard_folder, dir_name)
            similarity_matrix, _, _ = self.create_one_similarity_matrix(tracking_dir, standard_dir)

            mean_similarity = np.mean(similarity_matrix)
            std_similarity = np.std(similarity_matrix)
            max_similarity = np.max(similarity_matrix)
            min_similarity = np.min(similarity_matrix)
            print(f"文件夹 {dir_name} 的相似度矩阵已计算完成 "
                  f"均值:{mean_similarity} 标准差:{std_similarity} 最大值:{max_similarity} 最小值:{min_similarity}")

            result = (f"{standard_label} {dir_name} {mean_similarity:.3f} {std_similarity:.3f} "
                      f"{max_similarity:.3f} {min_similarity:.3f}")
            with open(result_path, 'a') as f:
                f.write(result + '\n')


def read_result_txt():
    """Read the result file and draw one bar chart per statistic.

    Each non-empty line is expected to look like
    ``"<label> <group> <mean> <std> <max> <min>"``. The values are grouped by
    ``<group>`` in file order and passed to ``get_histogram`` once for each
    of Mean, Std, Max and Min. Lines with fewer than six fields are skipped,
    and nothing is plotted when no usable line is found.
    """
    conf = _load_config()

    rows = []
    with open(conf['data']['result_txt'], 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            fields = line.split(' ')
            if len(fields) < 6:
                print(f"跳过格式不正确的行: {line}")
                continue
            # The four statistics are always the last fields; everything
            # between the label and them is the group name, which may itself
            # contain spaces.
            rows.append([fields[0], ' '.join(fields[1:-4])] + fields[-4:])

    if not rows:
        print("结果文件中没有可用的数据")
        return

    parts = np.array(rows)
    print(parts)

    # Columns 2..5 hold mean, std, max and min respectively.
    for column, label in enumerate(STAT_LABELS, start=2):
        grouped = {}
        for barcode, value in zip(parts[:, 1], parts[:, column]):
            grouped.setdefault(barcode, []).append(float(value))
        get_histogram(grouped, label)


def get_histogram(data, label=None):
    """Show a grouped bar chart with two bars per category.

    Args:
        data: Mapping from category name to a sequence of numbers. The first
            value is drawn as the ``standard`` bar and the second as the
            ``standard_slim`` bar; further values are ignored. A missing
            value is drawn as NaN (no bar) and is not annotated.
        label: Chart title. ``None`` is shown as an empty title.
    """
    categories = list(data.keys())
    values1 = [_value_at(data[cat], 0) for cat in categories]
    values2 = [_value_at(data[cat], 1) for cat in categories]

    x = np.arange(len(categories))  # tick positions
    width = 0.35  # width of one bar

    _, ax = plt.subplots(figsize=(10, 6))
    bars1 = ax.bar(x - width / 2, values1, width, label='standard', color='red', alpha=0.7)
    bars2 = ax.bar(x + width / 2, values2, width, label='standard_slim', color='green', alpha=0.7)

    # Print the value on top of every bar that actually has one.
    for bar in list(bars1) + list(bars2):
        height = bar.get_height()
        if not np.isfinite(height):
            continue
        ax.annotate(f'{height:.3f}',
                    xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points of vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom', fontsize=12)

    if label is None:
        label = ''
    ax.set_xlabel('barcode')
    ax.set_ylabel('Values')
    ax.set_title(label)
    ax.set_xticks(x)
    ax.set_xticklabels(categories)
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()


if __name__ == '__main__':
    # To regenerate the result file before plotting, run:
    # picTopic_matrix = picDirSimilarAnalysis()
    # picTopic_matrix.get_group_similarity_matrix('/home/lc/data_center/image_analysis/pic_pic_similar_maxtrix')
    read_result_txt()