243 lines
9.4 KiB
Python
243 lines
9.4 KiB
Python
from similar_analysis import SimilarAnalysis
|
|
import os
|
|
import pickle
|
|
from tools.image_joint import merge_imgs
|
|
import yaml
|
|
from PIL import Image
|
|
import torch
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
|
|
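"""
Similarity analysis between tracking images and the standard image library.

1. Computes the similarity between tracking images and every image in the
   standard library.
2. Supports evaluating the selection strategy used when comparing tracking
   images against the standard library.
"""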
|
|
|
|
|
|
class picDirSimilarAnalysis(SimilarAnalysis):
    """Compare the images of two folders through cached feature vectors.

    Features for every image under ``conf['data']['data_dir']`` are extracted
    once and pickled to ``conf['data']['total_pkl']``; the similarity methods
    then look features up in that cache by image path instead of recomputing
    them.
    """

    def __init__(self):
        """Load the YAML configuration and the feature cache.

        The cache file is built first when it does not exist yet.
        """
        super().__init__()
        with open('../configs/pic_pic_similar.yml', 'r') as f:
            self.conf = yaml.load(f, Loader=yaml.FullLoader)

        # Default to an empty cache so the attribute always exists, even when
        # the cache file could not be created.
        self.all_dicts = {}
        if not os.path.exists(self.conf['data']['total_pkl']):
            self.create_total_pkl()
        if os.path.exists(self.conf['data']['total_pkl']):
            self.all_dicts = self.load_dict_from_pkl()

    def is_image_file(self, filename):
        """Return True when ``filename`` has a known image extension.

        The comparison is case-insensitive.
        """
        image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff')
        return filename.lower().endswith(image_extensions)

    def create_total_pkl(self):
        """Extract features for every image under ``data_dir`` and pickle them.

        The pickled object is a dict mapping image path to its feature, or to
        ``None`` when extraction failed for that image. An already existing
        cache file is never overwritten.
        """
        features_by_path = {}
        for root, _dirs, files in os.walk(self.conf['data']['data_dir']):
            for file_name in files:
                if not self.is_image_file(file_name):
                    continue
                # Build the key exactly like get_image_files() builds its
                # paths, otherwise later cache lookups can miss (os.sep.join
                # yields a doubled separator when root ends with one).
                image_path = os.path.join(root, file_name)
                try:
                    print(f"处理图像 {image_path}")
                    feature = self.extract_features(image_path)
                except Exception as e:
                    # Best effort: remember the failure and keep going.
                    print(f"处理图像 {image_path} 时出错: {e}")
                    feature = None
                features_by_path[image_path] = feature

        if not os.path.exists(self.conf['data']['total_pkl']):
            with open(self.conf['data']['total_pkl'], 'wb') as f:
                pickle.dump(features_by_path, f)

    def load_dict_from_pkl(self):
        """Load and return the path -> feature dict from the cache file."""
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # only point total_pkl at files produced by create_total_pkl().
        with open(self.conf['data']['total_pkl'], 'rb') as f:
            data = pickle.load(f)
        print(f"字典已从 {self.conf['data']['total_pkl']} 加载")
        return data

    def get_image_files(self, folder_path):
        """Return the paths of all image files below ``folder_path``.

        Sub-folders are searched recursively.
        """
        return [
            os.path.join(root, file)
            for root, _, files in os.walk(folder_path)
            for file in files
            if self.is_image_file(file)
        ]

    def extract_features(self, image_path):
        """Return the feature of a single image.

        ``get_feature`` is not defined in this class (presumably inherited
        from ``SimilarAnalysis``); its result is indexed by ``image_path``.
        """
        feature_dict = self.get_feature(image_path)
        return feature_dict[image_path]

    def _load_cached_features(self, image_paths):
        """Look up cached features for ``image_paths``.

        Returns ``(features, paths)`` containing only the images whose
        feature could be read and converted to a NumPy array.
        """
        features = []
        usable_paths = []
        for img_path in image_paths:
            try:
                # Presumably a torch tensor (it is moved to CPU and converted
                # to NumPy); a missing key or a cached None ends up in the
                # except branch and the image is dropped.
                feature = self.all_dicts[img_path].cpu().numpy()
            except Exception as e:
                print(f"处理图像 {img_path} 时出错: {e}")
                continue
            features.append(feature)
            usable_paths.append(img_path)
        return features, usable_paths

    def create_one_similarity_matrix(self, folder1_path, folder2_path):
        """Compute the cosine-similarity matrix between two image folders.

        Returns:
            ``(similarity_matrix, valid_images1, valid_images2)`` where row i
            corresponds to ``valid_images1[i]`` and column j to
            ``valid_images2[j]``.

        Raises:
            ValueError: if either folder contains no image file, or if no
                cached feature could be used for either folder.
        """
        images1 = self.get_image_files(folder1_path)
        images2 = self.get_image_files(folder2_path)

        print(f"文件夹1 ({folder1_path}) 包含 {len(images1)} 张图像")
        print(f"文件夹2 ({folder2_path}) 包含 {len(images2)} 张图像")

        if not images1 or not images2:
            raise ValueError("至少有一个文件夹中没有图像文件")

        print("正在提取文件夹1中的图像特征...")
        valid_features1, valid_images1 = self._load_cached_features(images1)
        print("正在提取文件夹2中的图像特征...")
        valid_features2, valid_images2 = self._load_cached_features(images2)

        if not valid_features1 or not valid_features2:
            raise ValueError("没有成功处理任何图像")

        print("正在计算相似度矩阵...")
        similarity_matrix = cosine_similarity(valid_features1, valid_features2)

        return similarity_matrix, valid_images1, valid_images2

    def get_group_similarity_matrix(self, folder_path, tracking_name='tracking',
                                    standard_name='standard_slim'):
        """Compare every tracking sub-folder with its same-named standard one.

        For each directory ``<folder_path>/<tracking_name>/<name>`` the
        similarity matrix against ``<folder_path>/<standard_name>/<name>`` is
        computed, and its mean, standard deviation, maximum and minimum are
        appended as one line to ``conf['data']['result_txt']``.

        Args:
            folder_path: root folder holding the tracking and standard folders.
            tracking_name: name of the tracking sub-folder.
            standard_name: name of the standard-library sub-folder; it is also
                written as the first field of every result line.

        Raises:
            ValueError: propagated from ``create_one_similarity_matrix``.
        """
        tracking_folder = os.path.join(folder_path, tracking_name)
        standard_folder = os.path.join(folder_path, standard_name)

        for dir_name in os.listdir(tracking_folder):
            tracking_dir = os.path.join(tracking_folder, dir_name)
            if not os.path.isdir(tracking_dir):
                # A stray file next to the group folders is not a group.
                continue
            standard_dir = os.path.join(standard_folder, dir_name)

            similarity_matrix, _, _ = self.create_one_similarity_matrix(
                tracking_dir, standard_dir)

            mean_similarity = np.mean(similarity_matrix)
            std_similarity = np.std(similarity_matrix)
            max_similarity = np.max(similarity_matrix)
            min_similarity = np.min(similarity_matrix)
            print(f"文件夹 {dir_name} 的相似度矩阵已计算完成 "
                  f"均值:{mean_similarity} 标准差:{std_similarity} 最大值:{max_similarity} 最小值:{min_similarity}")

            result = (f"{os.path.basename(standard_folder)} {dir_name} "
                      f"{mean_similarity:.3f} {std_similarity:.3f} "
                      f"{max_similarity:.3f} {min_similarity:.3f}")
            # Append right away so finished groups survive a later failure.
            with open(self.conf['data']['result_txt'], 'a') as f:
                f.write(result + '\n')
|
|
|
|
|
|
def read_result_txt():
    """Plot one histogram per statistic from the similarity result file.

    Reads ``conf['data']['result_txt']`` (path taken from
    ``../configs/pic_pic_similar.yml``). Every usable line has six
    whitespace-separated fields: standard-folder name, barcode, mean, std,
    max and min. For each of the four statistics the values are grouped by
    barcode in file order and passed to ``get_histogram``.

    Lines that do not have exactly six fields are skipped; when nothing
    usable is left the function returns without plotting.
    """
    field_count = 6
    labels = ('Mean', 'Std', 'Max', 'Min')

    with open('../configs/pic_pic_similar.yml', 'r') as f:
        conf = yaml.load(f, Loader=yaml.FullLoader)

    rows = []
    with open(conf['data']['result_txt'], 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                continue
            if len(fields) != field_count:
                # A ragged row would break the column slicing below.
                print(f"skipping malformed result line: {line.strip()!r}")
                continue
            rows.append(fields)

    if not rows:
        print(f"no usable result lines in {conf['data']['result_txt']}")
        return

    parts = np.array(rows)
    print(parts)

    # Columns 2..5 hold mean/std/max/min, column 1 holds the barcode.
    for column, label in enumerate(labels, start=2):
        values_by_barcode = {}
        for barcode, value in zip(parts[:, 1], parts[:, column]):
            values_by_barcode.setdefault(barcode, []).append(float(value))
        get_histogram(values_by_barcode, label)
|
|
|
|
|
|
def get_histogram(data, label=None):
    """Show a grouped bar chart with up to two values per category.

    Args:
        data: mapping of category (barcode) to a list of values. The first
            value is drawn as the red 'standard' bar, the second as the green
            'standard_slim' bar; further values are ignored. A category that
            lacks one of them gets a zero-height bar annotated 'n/a' instead
            of raising ``IndexError``.
        label: chart title; ``None`` is shown as an empty title.
    """
    categories = list(data.keys())
    x = np.arange(len(categories))  # one tick per category
    width = 0.35  # bar width

    fig, ax = plt.subplots(figsize=(10, 6))

    # (index into each value list, horizontal offset, legend label, colour)
    series = (
        (0, -width / 2, 'standard', 'red'),
        (1, width / 2, 'standard_slim', 'green'),
    )
    for index, offset, name, color in series:
        present = [len(data[cat]) > index for cat in categories]
        heights = [
            data[cat][index] if has_value else 0.0
            for cat, has_value in zip(categories, present)
        ]
        bars = ax.bar(x + offset, heights, width, label=name, color=color, alpha=0.7)

        # Print each value just above its bar.
        for bar, has_value in zip(bars, present):
            height = bar.get_height()
            ax.annotate(f'{height:.3f}' if has_value else 'n/a',
                        xy=(bar.get_x() + bar.get_width() / 2, height),
                        xytext=(0, 3),  # 3 points of vertical offset
                        textcoords="offset points",
                        ha='center', va='bottom',
                        fontsize=12)

    if label is None:
        label = ''
    ax.set_xlabel('barcode')
    ax.set_ylabel('Values')
    ax.set_title(label)
    ax.set_xticks(x)
    ax.set_xticklabels(categories)
    ax.legend()

    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: compute the per-group similarity statistics for the
    # data set below and append them to the configured result file.
    analysis_root = '/home/lc/data_center/image_analysis/pic_pic_similar_maxtrix'
    analyzer = picDirSimilarAnalysis()
    analyzer.get_group_similarity_matrix(analysis_root)
    # To plot the collected results afterwards, call read_result_txt().
|