import os.path
import shutil
import numpy as np
from ytracking.track_ import *
from contrast.test_logic import group_image, inference
from tools.Interface import AiInterface, AiClass
from tools.config import cfg, gvalue
from tools.initModel import models
from scipy.spatial.distance import cdist
from dealdata import get_keams
import matplotlib.pyplot as plt
from contrast.model.resnet_pre import resnet18
from prettytable import PrettyTable
from sklearn.cluster import KMeans

models.initModel()
ai_obj = AiClass()

# Upper bounds (0.1, 0.2, ..., 1.0) used when counting how many similarity
# scores fall below each level.
_SIMILARITY_THRESHOLDS = np.linspace(start=0.1, stop=1.0, num=10, endpoint=True).tolist()

# Folder-name markers; each one maps to the ``gvalue.<marker>_similarity`` list.
_CATEGORIES = ('back_return', 'back_add', 'front_return', 'front_add')


def _similarity_series(include_comprehensive=True):
    """Return ``(name, scores)`` pairs for every similarity list kept on ``gvalue``."""
    series = [(name, getattr(gvalue, f'{name}_similarity')) for name in _CATEGORIES]
    if include_comprehensive:
        series.append(('comprehensive', gvalue.comprehensive_similarity))
    return series


def _accumulate_similarity(dirname, scores):
    """Append ``scores`` to the category list matching ``dirname`` and to the overall list.

    The first marker of ``_CATEGORIES`` contained in ``dirname`` selects the
    category; a name matching no marker only contributes to
    ``gvalue.comprehensive_similarity``.
    """
    for marker in _CATEGORIES:
        if marker in dirname:
            attr = f'{marker}_similarity'
            bucket = getattr(gvalue, attr)
            bucket += scores
            setattr(gvalue, attr, bucket)
            break
    gvalue.comprehensive_similarity += scores


def _list_image_paths(folder):
    """Return the full path of every entry directly inside ``folder``."""
    return [os.sep.join([folder, name]) for name in os.listdir(folder)]


def _cosine(vec_a, vec_b):
    """Cosine similarity between two 1-D numpy vectors."""
    return vec_a.dot(vec_b) / (np.linalg.norm(vec_b) * np.linalg.norm(vec_a))


def showComprehensiveHistogram(data, title):
    """Save a histogram of ``data`` (0.1-wide bins on [0, 1]) to ``<title>.png``.

    A dedicated figure is created and closed so the plot neither draws on top
    of an unrelated open figure nor leaks one.
    """
    bins = np.arange(0, 1.01, 0.1)
    fig = plt.figure()
    plt.hist(data, bins, edgecolor='black')
    plt.title(title)
    plt.xlabel('Similarity')
    plt.ylabel('Frequency')
    plt.savefig(title + '.png')
    plt.close(fig)


def showHistogram():
    """Save a 2x2 grid of per-category similarity histograms to ``multiple_histograms.png``."""
    fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(10, 8))
    bins = np.arange(0, 1.01, 0.1)
    # axs.flat walks the grid row by row, matching the category order.
    for ax, (name, scores) in zip(axs.flat, _similarity_series(include_comprehensive=False)):
        title = f'{name}_similarity'
        ax.hist(scores, bins=bins, edgecolor='black')
        ax.set_title(title)
        ax.set_xlabel('Similarity')
        ax.set_ylabel('Frequency')
        ax.legend(labels=[title])
    fig.tight_layout()
    fig.savefig('multiple_histograms.png')
    plt.close(fig)


def showgrid():
    """Save a line chart of below-threshold counts per category to ``multiple_grid.png``."""
    colors = ('red', 'blue', 'green', 'purple', 'orange')
    counts = [(name, get_count_number(scores)) for name, scores in _similarity_series()]
    fig = plt.figure(figsize=(10, 6))
    for (name, y_values), color in zip(counts, colors):
        plt.plot(_SIMILARITY_THRESHOLDS, y_values, color=color, label=name)
    plt.legend()
    plt.xlabel('Similarity')
    plt.ylabel('Frequency')
    plt.grid(True, linestyle='--', alpha=0.5)
    plt.savefig('multiple_grid.png')
    plt.close(fig)


def showtable():
    """Print a table of outlier ratios per category at each similarity threshold.

    Each cell is the count of scores below that threshold divided by the count
    of scores below 1.0 (the last threshold), rounded to 3 decimals.
    """
    table = PrettyTable()
    table.field_names = ['name', '0.1', '0.2', '0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '1.0']
    rows = []
    for name, scores in _similarity_series():
        counts = get_count_number(scores)
        total = counts[-1]
        # An empty category (or one with nothing below 1.0) has a zero
        # denominator; report 0.0 instead of raising ZeroDivisionError.
        ratios = [round(count / total, 3) if total else 0.0 for count in counts]
        rows.append([name] + ratios)
    table.add_rows(rows)
    print(table)


def compute_similarity_matrix(featurelists):
    """Compute the pairwise cosine-similarity matrix of the given feature vectors.

    Args:
        featurelists: sequence of equal-length feature vectors.

    Returns:
        Square numpy array rounded to 3 decimals; an empty (0, 0) array when
        no features are given.
    """
    features = np.asarray(featurelists)
    if features.size == 0:
        # cdist rejects empty input; an empty matrix is the natural answer.
        return np.empty((0, 0))
    # cdist returns cosine *distance*, so similarity is its complement.
    cosine_similarities = 1 - cdist(features, features, metric='cosine')
    return np.around(cosine_similarities, decimals=3)


def remove_empty_folders(root_dir):
    """Delete every folder under ``root_dir`` that has no files and no subfolders.

    The walk is top-down, so ``root_dir`` itself is removed when it is empty,
    while a folder that only becomes empty because its children were removed
    is left in place.
    """
    for foldername, subfolders, files in os.walk(root_dir):
        if subfolders or files:
            continue
        print(f"Removing empty folder: {foldername}")
        try:
            # os.rmdir only succeeds on an empty directory, so nothing can be
            # lost if files appear between the check and the removal.
            os.rmdir(foldername)
        except OSError as e:
            print(f"Error removing folder {foldername}: {e}")


def cosine_similarity(vec_mean, vecs, k=False, y_pred=None):
    """Cosine similarity of each vector in ``vecs`` to its reference vector.

    Args:
        vec_mean: a single reference vector when ``k`` is false; otherwise a
            sequence of reference vectors (cluster centres) indexed by label.
        vecs: sequence of feature vectors.
        k: when true, compare ``vecs[i]`` against ``vec_mean[y_pred[i]]``.
        y_pred: per-vector label; required when ``k`` is true.

    Returns:
        List with one similarity value per compared vector.

    Raises:
        ValueError: if ``k`` is true and ``y_pred`` is not provided.
    """
    if not k:
        reference = np.array(vec_mean)
        return [_cosine(np.array(vec), reference) for vec in vecs]
    if y_pred is None:
        raise ValueError("y_pred is required when k=True")
    return [
        _cosine(np.array(vecs[index]), np.array(vec_mean[label]))
        for index, label in enumerate(y_pred)
    ]


def get_count_number(numbers):
    """Count, for each threshold 0.1..1.0, how many values are strictly below it.

    The list of ten counts is printed and returned.
    """
    count_less = [
        sum(1 for value in numbers if value < threshold)
        for threshold in _SIMILARITY_THRESHOLDS
    ]
    print(count_less)
    return count_less


def shuntVideo_imgs(obj: AiInterface, rootpth, vpth, ):
    """Print the similarity matrix of the tracked images of each ``.mp4`` video.

    Videos are read from ``<rootpth>/<vpth>``; the image directory
    ``<rootpth>/images/<video name before the first dot>`` is created for each
    one and handed to ``obj.getTrackingBox``.
    """
    videospth = os.sep.join([rootpth, vpth])
    for videoname in os.listdir(videospth):
        if not videoname.endswith('mp4'):
            continue
        # The third underscore-separated field of the file name selects the camera.
        cameraId = '0' if videoname.split('_')[2] == 'back' else '1'
        videopth = os.sep.join([videospth, videoname])
        save_imgs_dir = os.sep.join([rootpth, 'images', videoname.split('.')[0]])
        os.makedirs(save_imgs_dir, exist_ok=True)
        track_boxes, features_dict, frame_id_img = run(models, source=videopth)
        allimages, _ = obj.getTrackingBox(track_boxes, features_dict, cameraId, frame_id_img,
                                          save_imgs_dir)
        featList = get_feature_list(allimages)
        cosine_similarities = compute_similarity_matrix(featList)
        print(len(cosine_similarities))
        print(cosine_similarities)


def get_feature_list(allimages, actionModel=True):
    """Run the similarity model over ``allimages`` and collect one feature array per output.

    Args:
        allimages: images (or image paths) accepted by ``group_image``.
        actionModel: forwarded to ``inference``; when false, the result of
            ``group_image`` is wrapped in a single-element list before iterating.

    Returns:
        List of numpy feature arrays.
    """
    featList = []
    groups = group_image(allimages, batch=64)
    if not actionModel:
        groups = [groups]
    for group in groups:
        for img in group:
            feat_tensor = inference(img, models.similarityModel, actionModel)
            for fe in feat_tensor:
                # NOTE(review): assumes torch tensors - .cpu() hands back a
                # tensor already on the CPU unchanged, so no device check is
                # needed (and comparing a device object to the string 'cpu'
                # is not reliable).
                featList.append(fe.squeeze().detach().cpu().numpy())
    return featList


def k_similarity(imgs_pth, k, actionModel=False):
    """Accumulate similarities between each image and its k-means cluster centre.

    Every sub-folder of ``imgs_pth`` yielding at least ``k`` features is
    clustered into ``k`` groups; each image's cosine similarity to its own
    centre is added to the ``gvalue`` lists, then the outlier table is printed.
    """
    remove_empty_folders(imgs_pth)
    for imgdirs in os.listdir(imgs_pth):
        folder = os.sep.join([imgs_pth, imgdirs])
        if not os.path.isdir(folder):
            # Stray files next to the image folders cannot be listed.
            continue
        featList = get_feature_list(_list_image_paths(folder), actionModel)
        if len(featList) < k:
            continue
        featList = np.array(featList)
        kmeans = KMeans(n_clusters=k)
        y_pred = kmeans.fit_predict(featList)
        ores = cosine_similarity(kmeans.cluster_centers_, featList, k=True, y_pred=y_pred)
        _accumulate_similarity(imgdirs, ores)
    showtable()  # outlier table


def average_similarity(imgs_pth, actionModel=False, min_images=10):
    """Accumulate similarities between each image and its folder's mean feature vector.

    Args:
        imgs_pth: directory whose sub-folders each hold the images of one track.
        actionModel: forwarded to ``get_feature_list``.
        min_images: sub-folders with fewer entries than this are skipped.

    Raises:
        ValueError: if the features of one folder differ in length.

    Side effects: extends the ``gvalue`` similarity lists, writes the histogram
    and line-chart PNG files and prints the outlier table.
    """
    remove_empty_folders(imgs_pth)
    for imgdirs in os.listdir(imgs_pth):
        folder = os.sep.join([imgs_pth, imgdirs])
        if not os.path.isdir(folder):
            # Stray files next to the image folders cannot be listed.
            continue
        imgpth = _list_image_paths(folder)
        if len(imgpth) < min_images:
            continue
        featList = get_feature_list(imgpth, actionModel)
        if not featList:
            continue
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if len({len(feat) for feat in featList}) > 1:
            raise ValueError(f"Feature vectors of {imgdirs} differ in length")
        vec_mean = np.mean(np.asarray(featList), axis=0)
        ores = cosine_similarity(vec_mean, featList)
        _accumulate_similarity(imgdirs, ores)
    showHistogram()  # histograms
    showgrid()  # line chart
    showtable()  # outlier table
    showComprehensiveHistogram(gvalue.comprehensive_similarity, 'comprehensive_similarity')


def barcode_similarity(rootpths):
    """Copy, per barcode folder, the largest group of mutually similar images.

    For each sub-folder of ``rootpths`` (other than ``barcode_similarity``) not
    yet present under ``<rootpths>/barcode_similarity``, the image whose row
    has the most similarities above 0.5 is located and every image in that row
    above 0.5 is copied to ``<rootpths>/barcode_similarity/<folder>``.
    """
    for barcode in os.listdir(rootpths):
        if barcode == 'barcode_similarity':
            continue
        rootpth = os.sep.join([rootpths, barcode])  # e.g. 6934660520292
        if not os.path.isdir(rootpth):
            continue
        new_dir = os.sep.join([rootpths, 'barcode_similarity', barcode])
        if os.path.exists(new_dir):
            # An existing output folder marks the barcode as already processed.
            continue
        os.makedirs(new_dir)

        # List once: row/column indices of the matrix refer to this exact order.
        names = os.listdir(rootpth)
        imgs_pth = [os.sep.join([rootpth, name]) for name in names]
        featList = get_feature_list(imgs_pth, False)
        cosine_similarities = compute_similarity_matrix(featList)
        if cosine_similarities.size == 0:
            continue

        # Selection is reset for every folder so a previous folder's result
        # can never leak into this one.
        above = cosine_similarities > 0.5
        per_row = above.sum(axis=1)
        best_row = int(per_row.argmax())  # first row wins on ties
        max_cols = np.where(above[best_row])[0] if per_row[best_row] > 0 else []

        for col in max_cols:
            img = os.sep.join([rootpth, names[col]])
            try:
                shutil.copy(img, new_dir)
            except OSError as e:
                print(e)


def compare_two_img(img1, img2):
    """Print and return the cosine similarity between the first features of two image lists.

    Args:
        img1: list holding the path of the first image.
        img2: list holding the path of the second image.
    """
    img1_feature = get_feature_list(img1, False)[0]
    img2_feature = get_feature_list(img2, False)[0]
    cos_sim = img1_feature.dot(img2_feature) / (np.linalg.norm(img1_feature) * np.linalg.norm(img2_feature))
    print(cos_sim)
    return cos_sim


def main():
    """Run the currently selected analysis; alternatives are kept as commented calls."""
    rootpth = 'Single_purchase_data'

    # Build the similarity matrix for each single track id:
    # vpth = 'test_video'
    # shuntVideo_imgs(ai_obj, rootpth, vpth)

    # Similarity between the mean vector and every image.
    imgs_pth = os.sep.join([rootpth, 'images'])
    average_similarity(imgs_pth)

    # Similarity between the k cluster-centre vectors and every image:
    # imgs_pth = os.sep.join([rootpth, 'images'])
    # k_similarity(imgs_pth, k=3)

    # Select, per barcode, the images where similarity is most concentrated:
    # rootpths = 'data_test'
    # barcode_similarity(rootpths)

    # Compare the similarity of two images:
    # img1 = ['C:/Users/HP/Desktop/maskBackImg.jpg']
    # img2 = ['C:/Users/HP/Desktop/frontImgMask.jpg']
    # compare_two_img(img1, img2)


if __name__ == '__main__':
    main()