# -*- coding: utf-8 -*-
"""
Created on Fri Aug 30 17:53:03 2024

Purpose: performance test program for 1:1 (and 1:SN) feature comparison.

    1. Generate and save the standard feature set from the raw image samples
       that back it.
        func: generate_event_and_stdfeatures():
            (1) get_std_barcodeDict(stdSamplePath, stdBarcodePath)
                Collect the sample paths under stdSamplePath, build the dict
                {barcode: [imgpath1, imgpath1, ...]} and store it as a pickle
                file (barcode.pickle).
            (2) stdfeat_infer(stdBarcodePath, stdFeaturePath, bcdSet=None)
                Extract the standard features and save them in the folder
                stdFeaturePath; can also be restricted at run time to the
                intersection with the barcodes of the shopping events.

    2. 1:1 comparison performance test,
        func: one2one_simi()
            (1) Intersect the barcodes of the shopping events and of the
                standard feature set, build evtDict and stdDict.
            (2) Build the "scan A, put A" and "scan A, put B" pairs,
                mergePairs = AA_list + AB_list.
            (3) Loop over mergePairs and compute the similarity of each
                "(A, A) or (A, B)" element; save track images or standard
                barcode images that have not been saved yet.
            (4) Save the results.

    3. Precision, recall and related metrics.
        func: compute_one2one_pr(pickpath)

@author: ym
"""

import numpy as np
import cv2
import os
import sys
import random
import pickle
import json
import copy
# import torch
import time
# import json
from pathlib import Path
from scipy.spatial.distance import cdist
import matplotlib.pyplot as plt
import shutil
from datetime import datetime
# from openpyxl import load_workbook, Workbook

# from config import config as conf
# from model import resnet18 as resnet18
# from feat_inference import inference_image

sys.path.append(r"D:\DetectTracking")
from tracking.utils.read_data import extract_data, read_tracking_output, read_similar, read_deletedBarcode_file
from tracking.utils.plotting import Annotator, colors
from feat_extract.config import config as conf
from feat_extract.inference import FeatsInterface
from utils.event import ShoppingEvent, save_data
from genfeats import gen_bcd_features
from event_test import calc_simil


def int8_to_ft16(arr_uint8, amin, amax):
    """Map 8-bit quantised values back to float16.

    Args:
        arr_uint8: array of quantised values in the range 0..255.
        amin: value that quantised 0 represents.
        amax: value that quantised 255 represents.

    Returns:
        float16 array ``arr_uint8 / 255 * (amax - amin) + amin``.
    """
    arr_ft16 = (arr_uint8 / 255 * (amax-amin) + amin).astype(np.float16)
    return arr_ft16


def ft16_to_uint8(arr_ft16):
    """Quantise an array to uint8 using its own min/max and dequantise it again.

    Args:
        arr_ft16: non-empty numeric array (``np.min`` raises on empty input).

    Returns:
        (arr_uint8, arr_ft16_): the uint8 quantised array and the float16
        array reconstructed from it with ``int8_to_ft16``.
    """
    # pickpath = r"\\192.168.1.28\share\测试_202406\contrast\std_features_ft32vsft16\6902265587712_ft16.pickle"
    # with open(pickpath, 'rb') as f:
    #     edict = pickle.load(f)
    # arr_ft16 = edict['feats']

    amin = np.min(arr_ft16)
    amax = np.max(arr_ft16)
    span = amax - amin
    if span == 0:
        # Constant input: the original expression evaluated 0/0 (NaN) and cast
        # it to uint8.  Code 0 dequantises exactly to ``amin``.
        arr_uint8 = np.zeros(np.shape(arr_ft16), dtype=np.uint8)
    else:
        arr_uint8 = ((arr_ft16 - amin) * 255 / span).astype(np.uint8)

    arr_ft16_ = int8_to_ft16(arr_uint8, amin, amax)
    return arr_uint8, arr_ft16_


def _simi_record(stdfeat, evtfeat, label, stdbcd, evt):
    """Build one result record [label, stdbcd, evt, mean, max, mean-feature simi]."""
    matrix = 1 - cdist(stdfeat, evtfeat, 'cosine')
    stdfeatm = np.mean(stdfeat, axis=0, keepdims=True)
    evtfeatm = np.mean(evtfeat, axis=0, keepdims=True)
    simi_mfeat = 1 - np.maximum(0.0, cdist(stdfeatm, evtfeatm, 'cosine'))
    return [label, stdbcd, evt, np.mean(matrix), np.max(matrix), simi_mfeat[0, 0]]


def _save_simi_record(record, stem):
    """Write one record to ``similPath`` as ``<stem>.pickle`` and ``<stem>.txt``."""
    with open(os.path.join(similPath, f'{stem}.pickle'), 'wb') as f:
        pickle.dump(record, f)

    # The record is a single flat row, so it is written as exactly one line.
    parts = [f"{x:.3f}" if isinstance(x, float) else str(x) for x in record]
    with open(os.path.join(similPath, f'{stem}.txt'), 'w', encoding='utf-8') as f:
        f.write(', '.join(parts) + '\n')


def data_precision_compare(stdfeat, evtfeat, evtMessage, save=True):
    """Compare similarity results at float32, float16 and 8-bit precision.

    Three records ``[label, stdbcd, evt, simi_mean, simi_max, simi_mfeat]``
    are computed: from the inputs as given, from row-normalised float16
    copies, and from those copies after an int8 quantise/dequantise round
    trip (scale 128).  When ``save`` is true each record is written to the
    module-level ``similPath`` as ``<evt>_ft32``, ``<evt>_ft16`` and
    ``<evt>_uint8`` (``.pickle`` and ``.txt``).

    Args:
        stdfeat: 2-D array of standard features (one row per feature vector).
        evtfeat: 2-D array of event features (one row per feature vector).
        evtMessage: tuple ``(evt, stdbcd, label)``.
        save: write the records to disk when true.

    Returns:
        None.
    """
    evt, stdbcd, label = evtMessage

    rltdata = _simi_record(stdfeat, evtfeat, label, stdbcd, evt)

    ##================================================================= float16
    # astype() returns copies, so the in-place normalisation below does not
    # touch the caller's arrays.
    stdfeat_ft16 = stdfeat.astype(np.float16)
    evtfeat_ft16 = evtfeat.astype(np.float16)
    stdfeat_ft16 /= np.linalg.norm(stdfeat_ft16, axis=1)[:, None]
    evtfeat_ft16 /= np.linalg.norm(evtfeat_ft16, axis=1)[:, None]
    rltdata_ft16 = _simi_record(stdfeat_ft16, evtfeat_ft16, label, stdbcd, evt)

    '''****************** uint8 is ok!!!!!! ******************'''
    ##=================================================================== uint8
    # stdfeat_uint8, stdfeat_ft16_ = ft16_to_uint8(stdfeat_ft16)
    # evtfeat_uint8, evtfeat_ft16_ = ft16_to_uint8(evtfeat_ft16)

    # A normalised component equal to +1.0 scales to 128, which does not fit in
    # int8; clip to the representable range before casting.
    stdfeat_uint8 = np.clip(stdfeat_ft16*128, -128, 127).astype(np.int8)
    evtfeat_uint8 = np.clip(evtfeat_ft16*128, -128, 127).astype(np.int8)
    stdfeat_ft16_ = stdfeat_uint8.astype(np.float16)/128
    evtfeat_ft16_ = evtfeat_uint8.astype(np.float16)/128
    rltdata_ft16_ = _simi_record(stdfeat_ft16_, evtfeat_ft16_, label, stdbcd, evt)

    if not save:
        return

    ##========================================================= save as float32
    _save_simi_record(rltdata, f'{evt}_ft32')
    ##========================================================= save as float16
    _save_simi_record(rltdata_ft16, f'{evt}_ft16')
    ##=========================================================== save as uint8
    _save_simi_record(rltdata_ft16_, f'{evt}_uint8')


def simi_calc(event, stdfeat):
    """Compute similarities between an event's features and a standard feature set.

    The event features are taken from ``event.feats_select``: its first
    element when it is a list, otherwise the attribute itself.  Both feature
    sets are L2-normalised row-wise on copies, so neither the event nor the
    caller's ``stdfeat`` is modified.

    Args:
        event: object exposing ``feats_select``.
        stdfeat: 2-D array of standard features (one row per feature vector).

    Returns:
        (simi_mean, simi_max, simi_mfeat): mean and max of the cosine
        similarity matrix (negatives clipped to 0) and the similarity of the
        two mean feature vectors; ``(None, None, None)`` when either feature
        set is empty.
    """
    selected = event.feats_select
    if isinstance(selected, list):
        if not (len(selected) and len(selected[0])):
            return None, None, None
        evtfeat = selected[0]
    else:
        evtfeat = selected

    if len(evtfeat) == 0 or len(stdfeat) == 0:
        return None, None, None

    # Out-of-place division: the original used "/=", which silently rewrote
    # the arrays stored in stdDict and in the event object.
    evtfeat = np.asarray(evtfeat)
    stdfeat = np.asarray(stdfeat)
    evtfeat = evtfeat / np.linalg.norm(evtfeat, axis=1)[:, None]
    stdfeat = stdfeat / np.linalg.norm(stdfeat, axis=1)[:, None]

    matrix = 1 - cdist(evtfeat, stdfeat, 'cosine')
    matrix[matrix < 0] = 0

    simi_mean = np.mean(matrix)
    simi_max = np.max(matrix)
    stdfeatm = np.mean(stdfeat, axis=0, keepdims=True)
    evtfeatm = np.mean(evtfeat, axis=0, keepdims=True)
    simi_mfeat = 1 - np.maximum(0.0, cdist(stdfeatm, evtfeatm, 'cosine'))

    return simi_mean, simi_max, simi_mfeat[0, 0]


def build_std_evt_dict():
    """Load the events and the standard features that share a barcode.

    Reads the module-level paths:
        stdFeaturePath: folder holding the standard feature files
                        (``<barcode>.json`` or ``<barcode>.pickle``).
        eventDataPath:  folder holding the pickled Event objects.

    Returns:
        (evtList, evtDict, stdDict):
            evtList: list of ``(event_name, barcode)``.
            evtDict: ``{event_name: unpickled event object}``.
            stdDict: ``{barcode: standard feature array}``.
    """
    # A set keeps the per-event membership test below O(1).
    stdBarcode = {p.stem for p in Path(stdFeaturePath).iterdir()
                  if p.is_file() and p.suffix in ('.json', '.pickle')}

    '''*********** USearch ***********'''
    # stdFeaturePath = r"D:\contrast\stdlib\v11_test.json"
    # stdBarcode = []
    # stdlib = {}
    # with open(stdFeaturePath, 'r', encoding='utf-8') as f:
    #     data = json.load(f)
    # for dic in data['total']:
    #     barcode = dic['key']
    #     feature = np.array(dic['value'])
    #     stdBarcode.append(barcode)
    #     stdlib[barcode] = feature

    '''====== 1. Shopping-event list; every barcode in it exists in stdBarcode ==='''
    evtList = []
    for p in Path(eventDataPath).iterdir():
        if not (p.is_file() and p.suffix == '.pickle'):
            continue
        parts = p.stem.split('_')
        if len(parts) in (2, 3) and parts[-1].isdigit() and parts[-1] in stdBarcode:
            evtList.append((p.stem, parts[-1]))

    barcodes = {bcd for _, bcd in evtList}

    '''====== 2. Standard-feature dict used for the comparison ============='''
    # NOTE: pickle files are assumed to be trusted, locally generated data.
    stdDict = {}
    for stdfile in os.listdir(stdFeaturePath):
        barcode, ext = os.path.splitext(stdfile)
        if barcode not in barcodes:
            continue
        stdpath = os.path.join(stdFeaturePath, stdfile)
        if ext == ".json":
            with open(stdpath, 'r', encoding='utf-8') as f:
                stddata = json.load(f)
            stdDict[barcode] = np.array(stddata["value"])
        elif ext == ".pickle":
            with open(stdpath, 'rb') as f:
                stddata = pickle.load(f)
            stdDict[barcode] = stddata["feats_ft32"]

    '''*********** USearch ***********'''
    # stdDict = {}
    # for barcode in barcodes:
    #     stdDict[barcode] = stdlib[barcode]

    '''====== 3. Event dict used for the comparison ============='''
    evtDict = {}
    for evtname, barcode in evtList:
        evtpath = os.path.join(eventDataPath, evtname+'.pickle')
        with open(evtpath, 'rb') as f:
            evtDict[evtname] = pickle.load(f)

    return evtList, evtDict, stdDict


def one2SN_pr(evtList, evtDict, stdDict):
    """Evaluate 1:SN matching and plot precision/recall curves and histograms.

    For every event the true barcode plus up to ``SN`` (9) randomly chosen
    other barcodes are compared with the event; the candidate with the highest
    mean similarity is the prediction.  Each (event, candidate) similarity is
    sorted into TP / FN / TN / FP and the metrics are plotted over thresholds
    in [-0.2, 1].

    Args:
        evtList: list of ``(event_name, barcode)``.
        evtDict: ``{event_name: event object}``.
        stdDict: ``{barcode: standard feature array}``.

    Returns:
        None; two matplotlib figures are shown.
    """
    std_barcodes = {bcd for _, bcd in evtList}

    tp_simi, fn_simi, tn_simi, fp_simi = [], [], [], []
    errorFile_one2SN = []
    SN = 9
    for evtname, barcode in evtList:
        others = list(std_barcodes - {barcode})
        if len(others) > SN:
            others = random.sample(others, SN)
        bcd_selected = [barcode] + others

        event = evtDict[evtname]
        ## Skip events that have no track features at all.
        if len(event.front_feats) + len(event.back_feats) == 0:
            print(evtname)
            continue

        barcodes, similars = [], []
        for stdbcd in bcd_selected:
            simi_mean, simi_max, simi_mfeat = simi_calc(event, stdDict[stdbcd])
            # simi_mean = calc_simil(event, stdfeat)

            # simi_calc returns None when a feature set is empty; comparing
            # None inside max() below would raise TypeError.
            if simi_mean is None:
                continue
            barcodes.append(stdbcd)
            similars.append(simi_mean)

        if not similars:
            print(evtname)
            continue

        max_sim = max(similars)
        for bcd, simi in zip(barcodes, similars):
            is_target = bcd == barcode
            is_top = simi == max_sim
            if is_target and is_top:
                tp_simi.append(simi)
            elif is_target:
                fn_simi.append(simi)
            elif not is_top:
                tn_simi.append(simi)
            elif barcode in barcodes:
                fp_simi.append(simi)
            else:
                # The true barcode could not be scored for this event.
                errorFile_one2SN.append(evtname)

    if errorFile_one2SN:
        print(f"one2SN: {len(errorFile_one2SN)} comparisons without a scored target barcode")

    # Metric definition used here: several similarities are computed and
    # ranked; "same barcode and ranked first" counts as TP.
    '''===================================== 1:SN '''
    tp = np.asarray(tp_simi, dtype=float)
    fp = np.asarray(fp_simi, dtype=float)
    fn = np.asarray(fn_simi, dtype=float)
    tn = np.asarray(tn_simi, dtype=float)

    Thresh = np.linspace(-0.2, 1, 100)
    col = Thresh[:, None]
    TPX = (tp[None, :] >= col).sum(axis=1)
    FPX = (fp[None, :] >= col).sum(axis=1)
    FNX = (fn[None, :] < col).sum(axis=1)
    TNX = (tn[None, :] < col).sum(axis=1)

    PPreciseX = TPX / (TPX + FPX + 1e-6)
    PRecallX = TPX / (len(tp) + len(fn) + 1e-6)
    NPreciseX = TNX / (TNX + FNX + 1e-6)
    NRecallX = TNX / (len(tn) + len(fp) + 1e-6)

    fig, ax = plt.subplots()
    ax.plot(Thresh, PPreciseX, 'r', label='Precise_Pos: TP/TPFP')
    ax.plot(Thresh, PRecallX, 'b', label='Recall_Pos: TP/TPFN')
    # Labels now match the formulas above (they were swapped before).
    ax.plot(Thresh, NPreciseX, 'g', label='Precise_Neg: TN/TNFN')
    ax.plot(Thresh, NRecallX, 'c', label='Recall_Neg: TN/TNFP')
    ax.set_xlim([0, 1])
    ax.set_ylim([0, 1])
    ax.grid(True)
    ax.set_title('1:SN Precise & Recall')
    ax.set_xlabel(f"Event Num: {len(evtList)}")
    ax.legend()
    plt.show()

    ## ============================= 1:N exhibition-hall histograms
    fig, axes = plt.subplots(2, 2)
    panels = (
        (axes[0, 0], tp_simi, 'TP'),
        (axes[0, 1], fp_simi, 'FP'),
        (axes[1, 0], tn_simi, 'TN'),
        (axes[1, 1], fn_simi, 'FN'),
    )
    for axis, values, tag in panels:
        axis.hist(values, bins=60, range=(-0.2, 1), edgecolor='black')
        axis.set_xlim([-0.2, 1])
        axis.set_title(f'{tag}({len(values)})')
    plt.show()


def one2one_simi(evtList, evtDict, stdDict):
    """Compute similarities for "same barcode" and "different barcode" pairs.

    Each event is paired once with its own barcode (label "same") and, when
    another barcode exists, once with a randomly chosen different barcode
    (label "diff").

    Args:
        evtList: list of ``(event_name, barcode)``.
        evtDict: ``{event_name: event object}``.
        stdDict: ``{barcode: standard feature array}``.

    Returns:
        List of tuples
        ``(label, stdbcd, evtname, simi_mean, simi_max, simi_mfeat)``; pairs
        whose similarity could not be computed are omitted.
    """
    barcodes = {bcd for _, bcd in evtList}

    '''====== 1. Build the pairs: scan A put A, scan A put B, merged ======='''
    AA_list = [(evtname, barcode, "same") for evtname, barcode in evtList]
    AB_list = []
    for evtname, barcode in evtList:
        others = list(barcodes - {barcode})
        if others:
            AB_list.append((evtname, random.choice(others), "diff"))

    mergePairs = AA_list + AB_list

    '''====== 2. Similarity between each event and the standard features ==='''
    rltdata = []
    for evtname, stdbcd, label in mergePairs:
        event = evtDict[evtname]
        if len(event.feats_compose) == 0:
            continue

        stdfeat = stdDict[stdbcd]  # float32
        simi_mean, simi_max, simi_mfeat = simi_calc(event, stdfeat)
        if simi_mean is None:
            continue
        rltdata.append((label, stdbcd, evtname, simi_mean, simi_max, simi_mfeat))

        '''========= float32 / float16 / int8 precision comparison and storage ========='''
        # data_precision_compare(stdfeat, evtfeat, mergePairs[i], save=True)

    return rltdata


def one2one_pr(rltdata):
    """Plot and save precision/recall curves and histograms for 1:1 matching.

    The decision score is ``simi_max``.  Figures are written to the
    module-level ``similPath`` as 'pr.png' and 'hist.png' and then shown.

    Args:
        rltdata: iterable of
            ``(label, stdbcd, evtname, simi_mean, simi_max, simi_mft)`` with
            label "same" or "diff".

    Returns:
        None.
    """
    Same = np.array([rec[4] for rec in rltdata if rec[0] == "same"], dtype=float)
    Cross = np.array([rec[4] for rec in rltdata if rec[0] == "diff"], dtype=float)
    TPFN = len(Same)
    TNFP = len(Cross)

    # fig, axs = plt.subplots(2, 1)
    # axs[0].hist(Same, bins=60, range=(-0.2, 1), edgecolor='black')
    # axs[0].set_xlim([-0.2, 1])
    # axs[0].set_title(f'Same Barcode, Num: {TPFN}')
    # axs[1].hist(Cross, bins=60, range=(-0.2, 1), edgecolor='black')
    # axs[1].set_xlim([-0.2, 1])
    # axs[1].set_title(f'Cross Barcode, Num: {TNFP}')
    # plt.savefig(f'./result/{file}_hist.png') # svg, png, pdf

    Thresh = np.linspace(-0.2, 1, 100)
    col = Thresh[:, None]
    TP = (Same[None, :] > col).sum(axis=1)
    FN = TPFN - TP
    TN = (Cross[None, :] < col).sum(axis=1)
    FP = TNFP - TN

    # max(..., 1) only matters when a class is empty; it avoids 0/0 -> NaN
    # curves and leaves every non-degenerate value unchanged.
    Recall_Pos = TP / max(TPFN, 1)
    Recall_Neg = TN / max(TNFP, 1)
    Precision_Pos = TP / (TP + FP + 1e-6)
    Precision_Neg = TN / (TN + FN + 1e-6)
    Correct = (TN + TP) / max(TPFN + TNFP, 1)

    fig, ax = plt.subplots()
    ax.plot(Thresh, Correct, 'r', label='Correct: (TN+TP)/(TPFN+TNFP)')
    ax.plot(Thresh, Recall_Pos, 'b', label='Recall_Pos: TP/TPFN')
    ax.plot(Thresh, Recall_Neg, 'g', label='Recall_Neg: TN/TNFP')
    ax.plot(Thresh, Precision_Pos, 'c', label='Precision_Pos: TP/(TP+FP)')
    ax.plot(Thresh, Precision_Neg, 'm', label='Precision_Neg: TN/(TN+FN)')
    ax.set_xlim([0, 1])
    ax.set_ylim([0, 1])
    ax.grid(True)
    ax.set_title('PrecisePos & PreciseNeg')
    ax.set_xlabel(f"Same Num: {TPFN}, Cross Num: {TNFP}")
    ax.legend()

    # Save BEFORE show(): once the window is closed the current figure is
    # gone and plt.savefig() would write an empty image.
    rltpath = os.path.join(similPath, 'pr.png')
    fig.savefig(rltpath)  # svg, png, pdf
    plt.show()

    fig, axes = plt.subplots(2, 1)
    axes[0].hist(Same, bins=60, range=(-0.2, 1), edgecolor='black')
    axes[0].set_xlim([-0.2, 1])
    axes[0].set_title(f'TP({len(Same)})')

    axes[1].hist(Cross, bins=60, range=(-0.2, 1), edgecolor='black')
    axes[1].set_xlim([-0.2, 1])
    axes[1].set_title(f'TN({len(Cross)})')

    rltpath = os.path.join(similPath, 'hist.png')
    fig.savefig(rltpath)
    plt.show()


def gen_eventdict(sourcePath, saveimg=True):
    """Build a ShoppingEvent for every source path and pickle it.

    Events are written to ``<eventDataPath>/<name>.pickle``; sources whose
    pickle already exists are skipped.  Paths that fail are listed in
    ``<resultPath>/error_events.txt``.

    Args:
        sourcePath: iterable of event sources; each is either a folder or a
            file produced by the Yolo-Resnet-Tracker pipeline.
        saveimg: unused; kept for interface compatibility.

    Returns:
        None.
    """
    errEvents = []
    for source_path in sourcePath:
        evtpath, bname = os.path.split(source_path)

        ## Support both kinds of event source: a folder, or a
        ## Yolo-Resnet-Tracker output file (strip its extension).
        if os.path.isfile(source_path):
            bname, ext = os.path.splitext(bname)

        evt = bname.split('_')
        if not (len(evt) >= 2 and evt[-1].isdigit() and len(evt[-1]) >= 10):
            continue

        # bname = r"20241126-135911-bdf91cf9-3e9a-426d-94e8-ddf92238e175_6923555210479"
        # source_path = os.path.join(evtpath, bname)

        # Nothing to do when the event has already been generated.
        pickpath = os.path.join(eventDataPath, f"{bname}.pickle")
        if os.path.isfile(pickpath):
            continue

        # event = ShoppingEvent(source_path, stype="data")
        # with open(pickpath, 'wb') as f:
        #     pickle.dump(event, f)

        # Deliberately broad: one broken event must not stop the batch.
        try:
            event = ShoppingEvent(source_path, stype="source")
            # save_data(event, resultPath)

            with open(pickpath, 'wb') as f:
                pickle.dump(event, f)
            print(bname)
        except Exception as e:
            errEvents.append(source_path)
            print(e)
            # A failed dump leaves a truncated file that later runs would
            # treat as "already generated"; remove it.
            if os.path.isfile(pickpath):
                try:
                    os.remove(pickpath)
                except OSError:
                    pass

    errfile = os.path.join(resultPath, 'error_events.txt')
    with open(errfile, 'w', encoding='utf-8') as f:
        f.writelines(line + '\n' for line in errEvents)


def init_std_evt_dict():
    """One-off preparation: generate the standard features and event pickles.

    Scans every folder in the module-level ``eventSourcePath`` for events
    named ``..._<barcode>`` (barcode: at least 10 digits), then calls
    ``gen_bcd_features`` for those barcodes and ``gen_eventdict`` for those
    events.
    """
    '''==== 0. Build the event list and the matching barcode list ==========='''
    bcdList, event_spath = [], []
    for evtpath in eventSourcePath:
        for evtname in os.listdir(evtpath):
            bname, ext = os.path.splitext(evtname)

            ## Two kinds of event source: a folder, or a
            ## Yolo-Resnet-Tracker output file.
            fpath = os.path.join(evtpath, evtname)
            if os.path.isfile(fpath) and ext in (".pkl", ".pickle"):
                evt = bname.split('_')
            elif os.path.isdir(fpath):
                evt = evtname.split('_')
            else:
                continue

            if len(evt) >= 2 and evt[-1].isdigit() and len(evt[-1]) >= 10:
                bcdList.append(evt[-1])
                event_spath.append(fpath)

    '''==== 1. Standard feature set; run once; implemented in genfeats.py ==='''
    bcdSet = set(bcdList)
    gen_bcd_features(stdSamplePath, stdBarcodePath, stdFeaturePath, bcdSet)
    print("stdFeats have generated and saved!")

    '''==== 2. Event dict; run once ==============='''
    gen_eventdict(event_spath)
    print("eventList have generated and saved!")


def test_one2one():
    '''1:1 performance evaluation.'''

    # 1. Run once: generate the event pickles and the standard feature files.
    init_std_evt_dict()

    # 2. Build the event set from the intersection of the event barcodes and
    #    the standard-library barcodes.
    evtList, evtDict, stdDict = build_std_evt_dict()

    rltdata = one2one_simi(evtList, evtDict, stdDict)

    one2one_pr(rltdata)


def test_one2SN():
    '''1:SN performance evaluation.'''

    # 1. Run once: generate the event pickles and the standard feature files.
    init_std_evt_dict()

    # 2. Intersect the event barcodes with the standard-library barcodes.
    evtList, evtDict, stdDict = build_std_evt_dict()

    one2SN_pr(evtList, evtDict, stdDict)


if __name__ == '__main__':
    # Seven locations are used in total:
    #   (1) stdSamplePath:   raw images used to build the standard feature set
    #   (2) stdBarcodePath:  pickle storage of the raw image paths,
    #                        {barcode: [imgpath1, imgpath1, ...]}
    #   (3) stdFeaturePath:  where the standard features are stored
    #   (4) eventSourcePath: event sources
    #   (5) resultPath:      where results are stored
    #   (6) eventDataPath:   pickled shopping events for the 1:1 comparison,
    #                        below resultPath
    #   (7) similPath:       event-level 1:1 comparison results, below resultPath

    # stdSamplePath = r"\\192.168.1.28\share\数据\已完成数据\展厅数据\v1.0\比对数据\整理\zhantingBase"
    # stdBarcodePath = r"D:\exhibition\dataset\bcdpath"
    # stdFeaturePath = r"\\192.168.1.28\share\数据\已完成数据\比对数据\barcode\all_totalBarocde\features_json\v11_barcode_11592"
    # eventSourcePath = [r'D:\exhibition\images\20241202']
    # eventSourcePath = [r"\\192.168.1.28\share\测试视频数据以及日志\各模块测试记录\展厅测试\1129_展厅模型v801测试组测试"]

    stdSamplePath = r"\\192.168.1.28\share\数据\已完成数据\比对数据\barcode\all_totalBarocde\totalBarcode"
    stdBarcodePath = r"D:\全实时\source_data\bcdpath"
    stdFeaturePath = r"D:\全实时\source_data\stdfeats"

    eventSourcePath = [r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\result\ShoppingDict_pkfile"]
    resultPath = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\result\contrast"

    eventDataPath = os.path.join(resultPath, "evtobjs")
    similPath = os.path.join(resultPath, "simidata")
    os.makedirs(eventDataPath, exist_ok=True)
    os.makedirs(similPath, exist_ok=True)

    # test_one2one()

    test_one2SN()