Files
detecttracking/contrast/event_test.py
2025-04-11 17:02:39 +08:00

375 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Mon Dec 16 18:56:18 2024
@author: ym
"""
import os
import cv2
import json
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from matplotlib import rcParams
from matplotlib.font_manager import FontProperties
from scipy.spatial.distance import cdist
from utils.event import ShoppingEvent, save_data
from utils.calsimi import calsimi_vs_stdfeat_new, get_topk_percent, cluster
from utils.tools import get_evtList
import pickle
rcParams['font.sans-serif'] = ['SimHei'] # 用黑体显示中文
rcParams['axes.unicode_minus'] = False # 正确显示负号
'''*********** USearch ***********'''
def read_usearch():
stdFeaturePath = r"D:\contrast\stdlib\v11_test.json"
stdBarcode = []
stdlib = {}
with open(stdFeaturePath, 'r', encoding='utf-8') as f:
data = json.load(f)
for dic in data['total']:
barcode = dic['key']
feature = np.array(dic['value'])
stdBarcode.append(barcode)
stdlib[barcode] = feature
return stdlib
def get_eventlist_errortxt(evtpaths):
'''
读取一次测试中的错误事件
'''
text1 = "one_2_Small_n_Error.txt"
text2 = "one_2_Big_N_Error.txt"
events = []
text = (text1, text2)
for txt in text:
txtfile = os.path.join(evtpaths, txt)
with open(txtfile, "r") as f:
lines = f.readlines()
for i, line in enumerate(lines):
line = line.strip()
if line:
fpath=os.path.join(evtpaths, line)
events.append(fpath)
events = list(set(events))
return events
def save_eventdata():
evtpaths = r"/home/wqg/dataset/test_dataset/performence_dataset/"
events = get_eventlist_errortxt(evtpaths)
'''定义当前事件存储地址及生成相应文件件'''
resultPath = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\result\single_event"
for evtpath in events:
event = ShoppingEvent(evtpath)
save_data(event, resultPath)
print(event.evtname)
# def get_topk_percent(data, k):
# """
# 获取数据中最大的 k% 的元素
# """
# # 将数据转换为 NumPy 数组
# if isinstance(data, list):
# data = np.array(data)
# percentile = np.percentile(data, 100-k)
# top_k_percent = data[data >= percentile]
# return top_k_percent
# def cluster(data, thresh=0.15):
# # data = np.array([0.1, 0.13, 0.7, 0.2, 0.8, 0.52, 0.3, 0.7, 0.85, 0.58])
# # data = np.array([0.1, 0.13, 0.2, 0.3])
# # data = np.array([0.1])
# if isinstance(data, list):
# data = np.array(data)
# data1 = np.sort(data)
# cluter, Cluters, = [data1[0]], []
# for i in range(1, len(data1)):
# if data1[i] - data1[i-1]< thresh:
# cluter.append(data1[i])
# else:
# Cluters.append(cluter)
# cluter = [data1[i]]
# Cluters.append(cluter)
# clt_center = []
# for clt in Cluters:
# ## 是否应该在此处限制一个聚类中的最小轨迹样本数,应该将该因素放在轨迹分析中
# # if len(clt)>=3:
# # clt_center.append(np.mean(clt))
# clt_center.append(np.mean(clt))
# # print(clt_center)
# return clt_center
# def calsimi_vs_stdfeat_new(event, stdfeat):
# '''事件与标准库的对比策略
# 该比对策略是否可以拓展到事件与事件的比对?
# '''
# def calsiml(feat1, feat2, topkp=75, cluth=0.15):
# '''轨迹样本和标准特征集样本相似度的选择策略'''
# matrix = 1 - cdist(feat1, feat2, 'cosine')
# simi_max = []
# for i in range(len(matrix)):
# sim = np.mean(get_topk_percent(matrix[i, :], topkp))
# simi_max.append(sim)
# cltc_max = cluster(simi_max, cluth)
# Simi = max(cltc_max)
# ## cltc_max为空属于编程考虑不周应予以排查解决
# # if len(cltc_max):
# # Simi = max(cltc_max)
# # else:
# # Simi = 0 #不应该走到该处
# return Simi
# front_boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
# front_feats = np.empty((0, 256), dtype=np.float64) ##和类doTracks兼容
# for i in range(len(event.front_boxes)):
# front_boxes = np.concatenate((front_boxes, event.front_boxes[i]), axis=0)
# front_feats = np.concatenate((front_feats, event.front_feats[i]), axis=0)
# back_boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
# back_feats = np.empty((0, 256), dtype=np.float64) ##和类doTracks兼容
# for i in range(len(event.back_boxes)):
# back_boxes = np.concatenate((back_boxes, event.back_boxes[i]), axis=0)
# back_feats = np.concatenate((back_feats, event.back_feats[i]), axis=0)
# if len(front_feats):
# front_simi = calsiml(front_feats, stdfeat)
# if len(back_feats):
# back_simi = calsiml(back_feats, stdfeat)
# '''前后摄相似度融合策略'''
# if len(front_feats) and len(back_feats):
# diff_simi = abs(front_simi - back_simi)
# if diff_simi>0.15:
# Similar = max([front_simi, back_simi])
# else:
# Similar = (front_simi+back_simi)/2
# elif len(front_feats) and len(back_feats)==0:
# Similar = front_simi
# elif len(front_feats)==0 and len(back_feats):
# Similar = back_simi
# else:
# Similar = None # 在event.front_feats和event.back_feats同时为空时
# return Similar
def simi_matrix():
evtpaths = r"/home/wqg/dataset/pipeline/contrast/single_event_V10/evtobjs/"
stdfeatPath = r"/home/wqg/dataset/test_dataset/total_barcode/features_json/v11_barcode_0304/"
resultPath = r"/home/wqg/dataset/performence_dataset/result/"
evt_paths, bcdSet = get_evtList(evtpaths)
## read std features
stdDict={}
evtDict = {}
for barcode in bcdSet:
stdpath = os.path.join(stdfeatPath, f"{barcode}.json")
if not os.path.isfile(stdpath):
continue
with open(stdpath, 'r', encoding='utf-8') as f:
stddata = json.load(f)
feat = np.array(stddata["value"])
stdDict[barcode] = feat
for evtpath in evt_paths:
barcode = Path(evtpath).stem.split("_")[-1]
if barcode not in stdDict.keys():
continue
# try:
# with open(evtpath, 'rb') as f:
# evtdata = pickle.load(f)
# except Exception as e:
# print(evtname)
with open(evtpath, 'rb') as f:
event = pickle.load(f)
stdfeat = stdDict[barcode]
Similar = calsimi_vs_stdfeat_new(event, stdfeat)
# 构造 boxes 子图存储路径
subimgpath = os.path.join(resultPath, f"{event.evtname}", "subimg")
if not os.path.exists(subimgpath):
os.makedirs(subimgpath)
histpath = os.path.join(resultPath, "simi_hist")
if not os.path.exists(histpath):
os.makedirs(histpath)
mean_values, max_values = [], []
cameras = ('front', 'back')
fig, ax = plt.subplots(2, 3, figsize=(16, 9), dpi=100)
kpercent = 25
for camera in cameras:
boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
evtfeat = np.empty((0, 256), dtype=np.float64) ##和类doTracks兼容
if camera == 'front':
for i in range(len(event.front_boxes)):
boxes = np.concatenate((boxes, event.front_boxes[i]), axis=0)
evtfeat = np.concatenate((evtfeat, event.front_feats[i]), axis=0)
imgpaths = event.front_imgpaths
else:
for i in range(len(event.back_boxes)):
boxes = np.concatenate((boxes, event.back_boxes[i]), axis=0)
evtfeat = np.concatenate((evtfeat, event.back_feats[i]), axis=0)
imgpaths = event.back_imgpaths
assert len(boxes)==len(evtfeat), f"Please check the Event: {event.evtname}"
if len(boxes)==0: continue
print(event.evtname)
matrix = 1 - cdist(evtfeat, stdfeat, 'cosine')
simi_1d = matrix.flatten()
simi_mean = np.mean(matrix, axis=1)
# simi_max = np.max(matrix, axis=1)
'''以相似度矩阵每一行最大的 k% 的相似度做均值计算'''
simi_max = []
for i in range(len(matrix)):
sim = np.mean(get_topk_percent(matrix[i, :], kpercent))
simi_max.append(sim)
mean_values.append(np.mean(matrix))
max_values.append(np.mean(simi_max))
diff_max_mean = np.mean(simi_max) - np.mean(matrix)
'''相似度统计特性图示'''
k =0
if camera == 'front': k = 1
'''********************* 相似度全体数据 *********************'''
ax[k, 0].hist(simi_1d, bins=60, range=(-0.2, 1), edgecolor='black')
ax[k, 0].set_xlim([-0.2, 1])
ax[k, 0].set_title(camera)
_, y_max = ax[k, 0].get_ylim() # 获取y轴范围
'''相似度变动范围'''
ax[k, 0].text(-0.1, 0.15*y_max, f"rng:{max(simi_1d)-min(simi_1d):.3f}", fontsize=18, color='b')
'''********************* 均值********************************'''
ax[k, 1].hist(simi_mean, bins=24, range=(-0.2, 1), edgecolor='black')
ax[k, 1].set_xlim([-0.2, 1])
ax[k, 1].set_title("mean")
_, y_max = ax[k, 1].get_ylim() # 获取y轴范围
'''相似度变动范围'''
ax[k, 1].text(-0.1, 0.15*y_max, f"rng:{max(simi_mean)-min(simi_mean):.3f}", fontsize=18, color='b')
'''********************* 最大值 ******************************'''
ax[k, 2].hist(simi_max, bins=24, range=(-0.2, 1), edgecolor='black')
ax[k, 2].set_xlim([-0.2, 1])
ax[k, 2].set_title("max")
_, y_max = ax[k, 2].get_ylim() # 获取y轴范围
'''相似度变动范围'''
ax[k, 2].text(-0.1, 0.15*y_max, f"rng:{max(simi_max)-min(simi_max):.3f}", fontsize=18, color='b')
'''绘制聚类中心'''
cltc_mean = cluster(simi_mean)
for value in cltc_mean:
ax[k, 1].axvline(x=value, color='m', linestyle='--', linewidth=3)
cltc_max = cluster(simi_max)
for value in cltc_max:
ax[k, 2].axvline(x=value, color='m', linestyle='--', linewidth=3)
'''绘制相似度均值与最大值均值'''
ax[k, 1].axvline(x=np.mean(matrix), color='r', linestyle='-', linewidth=3)
ax[k, 2].axvline(x=np.mean(simi_max), color='g', linestyle='-', linewidth=3)
'''绘制相似度最大值均值 - 均值'''
_, y_max = ax[k, 2].get_ylim() # 获取y轴范围
ax[k, 2].text(-0.1, 0.05*y_max, f"g-r={diff_max_mean:.3f}", fontsize=18, color='m')
plt.show()
# for i, box in enumerate(boxes):
# x1, y1, x2, y2, tid, score, cls, fid, bid = box
# imgpath = imgpaths[int(fid-1)]
# image = cv2.imread(imgpath)
# subimg = image[int(y1/2):int(y2/2), int(x1/2):int(x2/2), :]
# camerType, timeTamp, _, frameID = os.path.basename(imgpath).split('.')[0].split('_')
# subimgName = f"cam{camerType}_{i}_tid{int(tid)}_fid({int(fid)}, {frameID})_{simi_mean[i]:.3f}.png"
# imgpairs.append((subimgName, subimg))
# spath = os.path.join(subimgpath, subimgName)
# cv2.imwrite(spath, subimg)
# oldname = f"cam{camerType}_{i}_tid{int(tid)}_fid({int(fid)}, {frameID}).png"
# oldpath = os.path.join(subimgpath, oldname)
# if os.path.exists(oldpath):
# os.remove(oldpath)
if len(mean_values)==2:
mean_diff = abs(mean_values[1]-mean_values[0])
ax[0, 1].set_title(f"mean diff: {mean_diff:.3f}")
if len(max_values)==2:
max_diff = abs(max_values[1]-max_values[0])
ax[0, 2].set_title(f"max diff: {max_diff:.3f}")
try:
fig.suptitle(f"Similar: {Similar:.3f}", fontsize=16)
except Exception as e:
print(e)
print(f"Similar: {Similar}")
pltpath = os.path.join(subimgpath, f"hist_max_{kpercent}%_.png")
plt.savefig(pltpath)
pltpath1 = os.path.join(histpath, f"{event.evtname}_.png")
plt.savefig(pltpath1)
plt.close()
def main():
simi_matrix()
if __name__ == "__main__":
main()
# cluster()