350 lines
12 KiB
Python
350 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
Created on Mon Dec 16 18:56:18 2024
|
||
|
||
@author: ym
|
||
"""
|
||
import os
|
||
import cv2
|
||
import json
|
||
import numpy as np
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib import rcParams
|
||
from matplotlib.font_manager import FontProperties
|
||
from scipy.spatial.distance import cdist
|
||
from utils.event import ShoppingEvent, save_data
|
||
|
||
|
||
|
||
rcParams['font.sans-serif'] = ['SimHei'] # 用黑体显示中文
|
||
rcParams['axes.unicode_minus'] = False # 正确显示负号
|
||
|
||
|
||
'''*********** USearch ***********'''
|
||
def read_usearch():
|
||
stdFeaturePath = r"D:\contrast\stdlib\v11_test.json"
|
||
stdBarcode = []
|
||
stdlib = {}
|
||
with open(stdFeaturePath, 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
for dic in data['total']:
|
||
barcode = dic['key']
|
||
feature = np.array(dic['value'])
|
||
stdBarcode.append(barcode)
|
||
stdlib[barcode] = feature
|
||
|
||
return stdlib
|
||
|
||
def get_eventlist():
|
||
'''
|
||
读取一次测试中的错误事件
|
||
'''
|
||
evtpaths = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\images"
|
||
text1 = "one2n_Error.txt"
|
||
text2 = "one2SN_Error.txt"
|
||
events = []
|
||
text = (text1, text2)
|
||
for txt in text:
|
||
txtfile = os.path.join(evtpaths, txt)
|
||
with open(txtfile, "r") as f:
|
||
lines = f.readlines()
|
||
for i, line in enumerate(lines):
|
||
line = line.strip()
|
||
if line:
|
||
fpath=os.path.join(evtpaths, line)
|
||
events.append(fpath)
|
||
|
||
events = list(set(events))
|
||
|
||
return events
|
||
|
||
def single_event():
|
||
|
||
events = get_eventlist()
|
||
|
||
|
||
|
||
'''定义当前事件存储地址及生成相应文件件'''
|
||
resultPath = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\result\single_event"
|
||
for evtpath in events:
|
||
event = ShoppingEvent(evtpath)
|
||
save_data(event, resultPath)
|
||
|
||
print(event.evtname)
|
||
|
||
|
||
|
||
def get_topk_percent(data, k):
|
||
"""
|
||
获取数据中最大的 k% 的元素
|
||
"""
|
||
# 将数据转换为 NumPy 数组
|
||
if isinstance(data, list):
|
||
data = np.array(data)
|
||
|
||
percentile = np.percentile(data, 100-k)
|
||
top_k_percent = data[data >= percentile]
|
||
|
||
return top_k_percent
|
||
def cluster(data, thresh=0.15):
|
||
# data = np.array([0.1, 0.13, 0.7, 0.2, 0.8, 0.52, 0.3, 0.7, 0.85, 0.58])
|
||
# data = np.array([0.1, 0.13, 0.2, 0.3])
|
||
# data = np.array([0.1])
|
||
|
||
if isinstance(data, list):
|
||
data = np.array(data)
|
||
|
||
data1 = np.sort(data)
|
||
cluter, Cluters, = [data1[0]], []
|
||
for i in range(1, len(data1)):
|
||
if data1[i] - data1[i-1]< thresh:
|
||
cluter.append(data1[i])
|
||
else:
|
||
Cluters.append(cluter)
|
||
cluter = [data1[i]]
|
||
Cluters.append(cluter)
|
||
|
||
clt_center = []
|
||
for clt in Cluters:
|
||
## 是否应该在此处限制一个聚类中的最小轨迹样本数,应该将该因素放在轨迹分析中
|
||
# if len(clt)>=3:
|
||
# clt_center.append(np.mean(clt))
|
||
clt_center.append(np.mean(clt))
|
||
|
||
# print(clt_center)
|
||
|
||
return clt_center
|
||
|
||
def calc_simil(event, stdfeat):
|
||
|
||
def calsiml(feat1, feat2):
|
||
'''轨迹样本和标准特征集样本相似度的选择策略'''
|
||
matrix = 1 - cdist(feat1, feat2, 'cosine')
|
||
simi_max = []
|
||
for i in range(len(matrix)):
|
||
sim = np.mean(get_topk_percent(matrix[i, :], 75))
|
||
simi_max.append(sim)
|
||
cltc_max = cluster(simi_max)
|
||
Simi = max(cltc_max)
|
||
|
||
## cltc_max为空属于编程考虑不周,应予以排查解决
|
||
# if len(cltc_max):
|
||
# Simi = max(cltc_max)
|
||
# else:
|
||
# Simi = 0 #不应该走到该处
|
||
|
||
|
||
return Simi
|
||
|
||
|
||
front_boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
|
||
front_feats = np.empty((0, 256), dtype=np.float64) ##和类doTracks兼容
|
||
for i in range(len(event.front_boxes)):
|
||
front_boxes = np.concatenate((front_boxes, event.front_boxes[i]), axis=0)
|
||
front_feats = np.concatenate((front_feats, event.front_feats[i]), axis=0)
|
||
|
||
back_boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
|
||
back_feats = np.empty((0, 256), dtype=np.float64) ##和类doTracks兼容
|
||
for i in range(len(event.back_boxes)):
|
||
back_boxes = np.concatenate((back_boxes, event.back_boxes[i]), axis=0)
|
||
back_feats = np.concatenate((back_feats, event.back_feats[i]), axis=0)
|
||
|
||
if len(front_feats):
|
||
front_simi = calsiml(front_feats, stdfeat)
|
||
if len(back_feats):
|
||
back_simi = calsiml(back_feats, stdfeat)
|
||
|
||
'''前后摄相似度融合策略'''
|
||
if len(front_feats) and len(back_feats):
|
||
diff_simi = abs(front_simi - back_simi)
|
||
if diff_simi>0.15:
|
||
Similar = max([front_simi, back_simi])
|
||
else:
|
||
Similar = (front_simi+back_simi)/2
|
||
elif len(front_feats) and len(back_feats)==0:
|
||
Similar = front_simi
|
||
elif len(front_feats)==0 and len(back_feats):
|
||
Similar = back_simi
|
||
else:
|
||
Similar = None # 在event.front_feats和event.back_feats同时为空时
|
||
|
||
return Similar
|
||
|
||
|
||
def simi_matrix():
|
||
resultPath = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\result\single_event"
|
||
|
||
stdlib = read_usearch()
|
||
events = get_eventlist()
|
||
for evtpath in events:
|
||
evtname = os.path.basename(evtpath)
|
||
_, barcode = evtname.split("_")
|
||
|
||
# 生成事件与相应标准特征集
|
||
event = ShoppingEvent(evtpath)
|
||
stdfeat = stdlib[barcode]
|
||
|
||
Similar = calc_simil(event, stdfeat)
|
||
|
||
# 构造 boxes 子图存储路径
|
||
subimgpath = os.path.join(resultPath, f"{event.evtname}", "subimg")
|
||
if not os.path.exists(subimgpath):
|
||
os.makedirs(subimgpath)
|
||
histpath = os.path.join(resultPath, "simi_hist")
|
||
if not os.path.exists(histpath):
|
||
os.makedirs(histpath)
|
||
|
||
|
||
|
||
mean_values, max_values = [], []
|
||
cameras = ('front', 'back')
|
||
fig, ax = plt.subplots(2, 3, figsize=(16, 9), dpi=100)
|
||
kpercent = 25
|
||
for camera in cameras:
|
||
boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
|
||
evtfeat = np.empty((0, 256), dtype=np.float64) ##和类doTracks兼容
|
||
if camera == 'front':
|
||
for i in range(len(event.front_boxes)):
|
||
boxes = np.concatenate((boxes, event.front_boxes[i]), axis=0)
|
||
evtfeat = np.concatenate((evtfeat, event.front_feats[i]), axis=0)
|
||
imgpaths = event.front_imgpaths
|
||
|
||
else:
|
||
for i in range(len(event.back_boxes)):
|
||
boxes = np.concatenate((boxes, event.back_boxes[i]), axis=0)
|
||
evtfeat = np.concatenate((evtfeat, event.back_feats[i]), axis=0)
|
||
imgpaths = event.back_imgpaths
|
||
|
||
assert len(boxes)==len(evtfeat), f"Please check the Event: {evtname}"
|
||
if len(boxes)==0: continue
|
||
print(evtname)
|
||
|
||
matrix = 1 - cdist(evtfeat, stdfeat, 'cosine')
|
||
simi_1d = matrix.flatten()
|
||
simi_mean = np.mean(matrix, axis=1)
|
||
# simi_max = np.max(matrix, axis=1)
|
||
|
||
'''以相似度矩阵每一行最大的 k% 的相似度做均值计算'''
|
||
simi_max = []
|
||
for i in range(len(matrix)):
|
||
sim = np.mean(get_topk_percent(matrix[i, :], kpercent))
|
||
simi_max.append(sim)
|
||
|
||
|
||
mean_values.append(np.mean(matrix))
|
||
max_values.append(np.mean(simi_max))
|
||
|
||
diff_max_mean = np.mean(simi_max) - np.mean(matrix)
|
||
|
||
'''相似度统计特性图示'''
|
||
k =0
|
||
if camera == 'front': k = 1
|
||
|
||
'''********************* 相似度全体数据 *********************'''
|
||
ax[k, 0].hist(simi_1d, bins=60, range=(-0.2, 1), edgecolor='black')
|
||
ax[k, 0].set_xlim([-0.2, 1])
|
||
ax[k, 0].set_title(camera)
|
||
|
||
_, y_max = ax[k, 0].get_ylim() # 获取y轴范围
|
||
'''相似度变动范围'''
|
||
ax[k, 0].text(-0.1, 0.15*y_max, f"rng:{max(simi_1d)-min(simi_1d):.3f}", fontsize=18, color='b')
|
||
|
||
'''********************* 均值********************************'''
|
||
ax[k, 1].hist(simi_mean, bins=24, range=(-0.2, 1), edgecolor='black')
|
||
ax[k, 1].set_xlim([-0.2, 1])
|
||
ax[k, 1].set_title("mean")
|
||
_, y_max = ax[k, 1].get_ylim() # 获取y轴范围
|
||
'''相似度变动范围'''
|
||
ax[k, 1].text(-0.1, 0.15*y_max, f"rng:{max(simi_mean)-min(simi_mean):.3f}", fontsize=18, color='b')
|
||
|
||
|
||
'''********************* 最大值 ******************************'''
|
||
ax[k, 2].hist(simi_max, bins=24, range=(-0.2, 1), edgecolor='black')
|
||
ax[k, 2].set_xlim([-0.2, 1])
|
||
ax[k, 2].set_title("max")
|
||
_, y_max = ax[k, 2].get_ylim() # 获取y轴范围
|
||
'''相似度变动范围'''
|
||
ax[k, 2].text(-0.1, 0.15*y_max, f"rng:{max(simi_max)-min(simi_max):.3f}", fontsize=18, color='b')
|
||
|
||
|
||
'''绘制聚类中心'''
|
||
cltc_mean = cluster(simi_mean)
|
||
for value in cltc_mean:
|
||
ax[k, 1].axvline(x=value, color='m', linestyle='--', linewidth=3)
|
||
|
||
cltc_max = cluster(simi_max)
|
||
for value in cltc_max:
|
||
ax[k, 2].axvline(x=value, color='m', linestyle='--', linewidth=3)
|
||
|
||
'''绘制相似度均值与最大值均值'''
|
||
ax[k, 1].axvline(x=np.mean(matrix), color='r', linestyle='-', linewidth=3)
|
||
ax[k, 2].axvline(x=np.mean(simi_max), color='g', linestyle='-', linewidth=3)
|
||
|
||
'''绘制相似度最大值均值 - 均值'''
|
||
_, y_max = ax[k, 2].get_ylim() # 获取y轴范围
|
||
ax[k, 2].text(-0.1, 0.05*y_max, f"g-r={diff_max_mean:.3f}", fontsize=18, color='m')
|
||
|
||
plt.show()
|
||
|
||
# for i, box in enumerate(boxes):
|
||
# x1, y1, x2, y2, tid, score, cls, fid, bid = box
|
||
# imgpath = imgpaths[int(fid-1)]
|
||
# image = cv2.imread(imgpath)
|
||
# subimg = image[int(y1/2):int(y2/2), int(x1/2):int(x2/2), :]
|
||
# camerType, timeTamp, _, frameID = os.path.basename(imgpath).split('.')[0].split('_')
|
||
# subimgName = f"cam{camerType}_{i}_tid{int(tid)}_fid({int(fid)}, {frameID})_{simi_mean[i]:.3f}.png"
|
||
# imgpairs.append((subimgName, subimg))
|
||
# spath = os.path.join(subimgpath, subimgName)
|
||
# cv2.imwrite(spath, subimg)
|
||
|
||
# oldname = f"cam{camerType}_{i}_tid{int(tid)}_fid({int(fid)}, {frameID}).png"
|
||
# oldpath = os.path.join(subimgpath, oldname)
|
||
# if os.path.exists(oldpath):
|
||
# os.remove(oldpath)
|
||
|
||
|
||
if len(mean_values)==2:
|
||
mean_diff = abs(mean_values[1]-mean_values[0])
|
||
ax[0, 1].set_title(f"mean diff: {mean_diff:.3f}")
|
||
if len(max_values)==2:
|
||
max_values = abs(max_values[1]-max_values[0])
|
||
ax[0, 2].set_title(f"max diff: {max_values:.3f}")
|
||
try:
|
||
fig.suptitle(f"Similar: {Similar:.3f}", fontsize=16)
|
||
except Exception as e:
|
||
print(e)
|
||
print(f"Similar: {Similar}")
|
||
pltpath = os.path.join(subimgpath, f"hist_max_{kpercent}%_.png")
|
||
plt.savefig(pltpath)
|
||
|
||
pltpath1 = os.path.join(histpath, f"{evtname}_.png")
|
||
plt.savefig(pltpath1)
|
||
|
||
|
||
plt.close()
|
||
|
||
|
||
|
||
|
||
|
||
|
||
def main():
|
||
|
||
simi_matrix()
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|
||
# cluster()
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|