Files
detecttracking/tracking/module_analysis.py
王庆刚 9b5b135fa3 20250313
2025-03-13 15:36:29 +08:00

471 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Thu May 30 14:03:03 2024
轨迹分析现场测试性能分析:
(1) 读取 data 文件中的轨迹数据,绘制轨迹图
(2) 读取本地运行 Yolo+Rsenet+Tracker+Tracking 的数据,绘制轨迹图
@author: ym
"""
import os
import cv2
import numpy as np
from pathlib import Path
import warnings
import sys
sys.path.append(r"D:\DetectTracking")
from tracking.utils.read_data import extract_data_realtime, read_tracking_output_realtime
from tracking.utils.plotting import Annotator, colors, draw_tracking_boxes
from tracking.utils import Boxes, IterableSimpleNamespace, yaml_load
from tracking.trackers import BOTSORT, BYTETracker
from tracking.dotrack.dotracks_back import doBackTracks
from tracking.dotrack.dotracks_front import doFrontTracks
from tracking.utils.drawtracks import plot_frameID_y2, draw_all_trajectories
from tracking.utils.read_data import extract_data, read_deletedBarcode_file, read_tracking_output, read_returnGoods_file
from contrast.one2n_contrast import get_contrast_paths, one2n_return
from tracking.utils.annotator import TrackAnnotator
W, H = 1024, 1280
Mode = 'front' #'back'
ImgFormat = ['.jpg', '.jpeg', '.png', '.bmp']
'''调用tracking()函数,利用本地跟踪算法获取各目标轨迹,可以比较本地跟踪算法与现场跟踪算法的区别。'''
def init_tracker(tracker_yaml = None, bs=1):
"""
Initialize tracker for object tracking during prediction.
"""
TRACKER_MAP = {'bytetrack': BYTETracker, 'botsort': BOTSORT}
cfg = IterableSimpleNamespace(**yaml_load(tracker_yaml))
tracker = TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)
return tracker
def tracking(bboxes, ffeats):
tracker_yaml = r"./trackers/cfg/botsort.yaml"
tracker = init_tracker(tracker_yaml)
TrackBoxes = np.empty((0, 9), dtype = np.float32)
TracksDict = {}
'''========================== 执行跟踪处理 ============================='''
# dets 与 feats 应保持严格对应
for dets, feats in zip(bboxes, ffeats):
det_tracking = Boxes(dets).cpu().numpy()
tracks = tracker.update(det_tracking, features=feats)
'''tracks: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
0 1 2 3 4 5 6 7 8
这里frame_index 也可以用视频的 帧ID 代替, box_index 保持不变
'''
if len(tracks):
TrackBoxes = np.concatenate([TrackBoxes, tracks], axis=0)
FeatDict = {}
for track in tracks:
tid = int(track[8])
FeatDict.update({tid: feats[tid, :]})
frameID = tracks[0, 7]
# print(f"frameID: {int(frameID)}")
assert len(tracks) == len(FeatDict), f"Please check the func: tracker.update() at frameID({int(frameID)})"
TracksDict[f"frame_{int(frameID)}"] = {"feats":FeatDict}
return TrackBoxes, TracksDict
def read_imgs(imgspath, CamerType):
'''
inputs:
imgspath序列图像地址
CamerType相机类型0后摄1前摄
outputs
imgs图像序列
功能:
根据CamerType类型读取imgspath文件夹中的图像并根据帧索引进行排序。
do_tracking()中调用该函数实现1读取imgs并绘制各目标轨迹框2获取subimgs
'''
imgs, frmIDs = [], []
for filename in os.listdir(imgspath):
file, ext = os.path.splitext(filename)
flist = file.split('_')
if len(flist)==4 and ext in ImgFormat:
camID, frmID = flist[0], int(flist[-1])
if camID==CamerType:
img = cv2.imread(os.path.join(imgspath, filename))
imgs.append(img)
frmIDs.append(frmID)
if len(frmIDs):
indice = np.argsort(np.array(frmIDs))
imgs = [imgs[i] for i in indice]
return imgs
def do_tracking(fpath, savedir, event_name='images'):
'''
args:
fpath: 算法各模块输出的data文件地址匹配
savedir: 对 fpath 各模块输出的复现;
分析具体视频时,需指定 fpath 和 savedir
outputs:
img_tracking目标跟踪轨迹、本地轨迹分析算法的轨迹对比图
abimg现场轨迹分析算法、轨迹选择输出的对比图
'''
# fpath = r'D:\contrast\dataset\1_to_n\709\20240709-102758_6971558612189\1_track.data'
# savedir = r'D:\contrast\dataset\result\20240709-102843_6958770005357_6971558612189\error_6971558612189'
imgpath, dfname = os.path.split(fpath)
CamerType = dfname.split('_')[0]
'''1.1 构造 0/1_tracking_output.data 文件地址,读取文件数据'''
tracking_output_path = os.path.join(imgpath, CamerType + '_tracking_output.data')
basename = os.path.basename(imgpath)
if not os.path.isfile(fpath):
print(f"{basename}: Can't find {dfname} file!")
return None, None
if not os.path.isfile(tracking_output_path):
print(f"{basename}: Can't find {CamerType}_tracking_output.data file!")
return None, None
bboxes, ffeats, trackerboxes, tracker_feat_dict, trackingboxes, tracking_feat_dict = extract_data(fpath)
tracking_output_boxes, _ = read_tracking_output(tracking_output_path)
'''1.2 利用本地跟踪算法生成各商品轨迹'''
# trackerboxes, tracker_feat_dict = tracking(bboxes, ffeats)
'''1.3 分别构造 2 个文件夹,(1) 存储画框后的图像; (2) 运动轨迹对应的 boxes子图'''
save_dir = os.path.join(savedir, event_name + '_images')
subimg_dir = os.path.join(savedir, event_name + '_subimgs')
if not os.path.exists(save_dir):
os.makedirs(save_dir)
if not os.path.exists(subimg_dir):
os.makedirs(subimg_dir)
'''2. 执行轨迹分析, 保存轨迹分析前后的对比图示'''
traj_graphic = event_name + '_' + CamerType
if CamerType == '1':
vts = doFrontTracks(trackerboxes, tracker_feat_dict)
vts.classify()
plt = plot_frameID_y2(vts)
# ftpath = os.path.join(savedir, f"{traj_graphic}_front_y2.png")
# plt.savefig(ftpath)
plt.close()
edgeline = cv2.imread("./shopcart/cart_tempt/board_ftmp_line.png")
img_tracking = draw_all_trajectories(vts, edgeline, savedir, CamerType, draw5p=True)
elif CamerType == '0':
vts = doBackTracks(trackerboxes, tracker_feat_dict)
vts.classify()
edgeline = cv2.imread("./shopcart/cart_tempt/edgeline.png")
img_tracking = draw_all_trajectories(vts, edgeline, savedir, CamerType, draw5p=True)
# imgpth = os.path.join(savedir, f"{traj_graphic}_.png")
# cv2.imwrite(str(imgpth), img)
else:
print("Please check data file!")
'''3 tracking() 算法输出后多轨迹选择问题分析'''
if CamerType == '1':
aline = cv2.imread("./shopcart/cart_tempt/board_ftmp_line.png")
elif CamerType == '0':
aline = cv2.imread("./shopcart/cart_tempt/edgeline.png")
else:
print("Please check data file!")
bline = aline.copy()
annotator = TrackAnnotator(aline, line_width=2)
for track in trackingboxes:
annotator.plotting_track(track)
aline = annotator.result()
annotator = TrackAnnotator(bline, line_width=2)
if not isinstance(tracking_output_boxes, list):
tracking_output_boxes = [tracking_output_boxes]
for track in tracking_output_boxes:
annotator.plotting_track(track)
bline = annotator.result()
abimg = np.concatenate((aline, bline), axis = 1)
abH, abW = abimg.shape[:2]
cv2.line(abimg, (int(abW/2), 0), (int(abW/2), abH), (128, 255, 128), 2)
# algpath = os.path.join(savedir, f"{traj_graphic}_alg.png")
# cv2.imwrite(str(algpath), abimg)
'''4. 画框后的图像和子图保存若imgs数与tracker中fid数不匹配只保存原图不保存子图'''
'''4.0 读取 fpath 中对应的图像 imgs '''
imgs = read_imgs(imgpath, CamerType)
'''4.1 imgs数 < trackerboxes 的 max(fid),返回原图'''
if len(imgs) < np.max(trackerboxes[:,7]):
for i in range(len(imgs)):
img_savepath = os.path.join(save_dir, CamerType + "_" + f"{i}.png")
cv2.imwrite(img_savepath, imgs[i])
print(f"{basename}: len(imgs) = {len(imgs)} < Tracker max(fid) = {int(np.max(trackerboxes[:,7]))}, 无法匹配画框")
return img_tracking, abimg
'''4.2 在 imgs 上画框并保存'''
imgs_dw = draw_tracking_boxes(imgs, trackerboxes)
for fid, img in imgs_dw:
img_savepath = os.path.join(save_dir, CamerType + "_fid_" + f"{int(fid)}.png")
cv2.imwrite(img_savepath, img)
'''4.3.2 保存轨迹选择对应的子图'''
# for track in tracking_output_boxes:
for track in vts.Residual:
for *xyxy, tid, conf, cls, fid, bid in track.boxes:
img = imgs[int(fid-1)]
x1, y1, x2, y2 = int(xyxy[0]/2), int(xyxy[1]/2), int(xyxy[2]/2), int(xyxy[3]/2)
subimg = img[y1:y2, x1:x2]
subimg_path = os.path.join(subimg_dir, f'{CamerType}_tid{int(tid)}_{int(fid)}_{int(bid)}.png' )
cv2.imwrite(subimg_path, subimg)
for track in tracking_output_boxes:
for *xyxy, tid, conf, cls, fid, bid in track:
img = imgs[int(fid-1)]
x1, y1, x2, y2 = int(xyxy[0]/2), int(xyxy[1]/2), int(xyxy[2]/2), int(xyxy[3]/2)
subimg = img[y1:y2, x1:x2]
subimg_path = os.path.join(subimg_dir, f'x_{CamerType}_tid{int(tid)}_{int(fid)}_{int(bid)}.png' )
cv2.imwrite(subimg_path, subimg)
return img_tracking, abimg
def tracking_simulate(eventpath, savepath):
'''args:
eventpath: 事件文件夹
savepath: 存储文件夹
遍历eventpath
'''
# =============================================================================
# '''1. 获取事件名'''
# event_names = os.path.basename(eventpath).strip().split('_')
# if len(event_names)==2 and len(event_names[1])>=8:
# enent_name = event_names[1]
# elif len(event_names)==2 and len(event_names[1])==0:
# enent_name = event_names[0]
# else:
# return
# =============================================================================
enent_name = os.path.basename(eventpath)
## only for simplify the filename
idx = enent_name.find('2024')
if idx>=0:
enent_name = enent_name[idx:(idx+15)]
'''2. 依次读取 0/1_track.data 中数据,进行仿真'''
illu_tracking, illu_select = [], []
for filename in os.listdir(eventpath):
# filename = '1_track.data'
if filename.find("track.data") < 0: continue
fpath = os.path.join(eventpath, filename)
if not os.path.isfile(fpath): continue
img_tracking, img_select = do_tracking(fpath, savepath, enent_name)
if img_select is not None:
illu_select.append(img_select)
if img_tracking is not None:
illu_tracking.append(img_tracking)
'''3. 共幅8图上下子图显示的是前后摄每一行4个子图分别为
(1) tracker输出原始轨迹; (2)本地tracking输出; (3)现场算法轨迹选择前轨迹; (4)现场算法轨迹选择后的轨迹
'''
if len(illu_select)==2:
Img_s = np.concatenate((illu_select[0], illu_select[1]), axis = 0)
H, W = Img_s.shape[:2]
cv2.line(Img_s, (0, int(H/2)), (int(W), int(H/2)), (128, 255, 128), 2)
elif len(illu_select)==1:
Img_s = illu_select[0]
else:
Img_s = None
if len(illu_tracking)==2:
Img_t = np.concatenate((illu_tracking[0], illu_tracking[1]), axis = 0)
H, W = Img_t.shape[:2]
cv2.line(Img_t, (0, int(H/2)), (int(W), int(H/2)), (128, 255, 128), 2)
elif len(illu_tracking)==1:
Img_t = illu_tracking[0]
else:
Img_t = None
'''3.1 保存输出轨迹图若tracking、select的shape相同则合并输出否则单独输出'''
imgpath_tracking = os.path.join(savepath, enent_name + '_tracking.png')
imgpath_select = os.path.join(savepath, enent_name + '_select.png')
imgpath_ts = os.path.join(savepath, enent_name + '_tracking_select.png')
if Img_t is not None and Img_s is not None and np.all(Img_s.shape==Img_t.shape):
Img_ts = np.concatenate((Img_t, Img_s), axis = 1)
H, W = Img_ts.shape[:2]
cv2.line(Img_ts, (int(W/2), 0), (int(W/2), int(H)), (0, 0, 255), 4)
cv2.imwrite(imgpath_ts, Img_ts)
else:
if Img_s: cv2.imwrite(imgpath_select, Img_s) # 不会执行到该处
if Img_t: cv2.imwrite(imgpath_tracking, Img_t) # 不会执行到该处
Img_ts = None
'''3.2 单独另存保存完好的 8 轨迹图'''
if Img_ts is not None:
basepath, _ = os.path.split(savepath)
trajpath = os.path.join(basepath, 'trajs')
if not os.path.exists(trajpath):
os.makedirs(trajpath)
traj_path = os.path.join(trajpath, enent_name+'.png')
cv2.imwrite(traj_path, Img_ts)
return Img_ts
# warnings.simplefilter("error", category=np.VisibleDeprecationWarning)
def main_loop():
del_barcode_file = r'\\192.168.1.28\share\测试_202406\0723\0723_3\deletedBarcode.txt'
basepath = r'\\192.168.1.28\share\测试_202406\0723\0723_3' # 测试数据文件夹地址
# del_barcode_file = r'\\192.168.1.28\share\测试_202406\1030\images\returnGoods.txt'
# basepath = r'\\192.168.1.28\share\测试_202406\1030\images' # 测试数据文件夹地址
'''获取性能测试数据相关路径'''
SavePath = r'D:\contrast\dataset\resultx' # 结果保存地址
saveimgs = True
if os.path.basename(del_barcode_file).find('deletedBarcode'):
relative_paths = get_contrast_paths(del_barcode_file, basepath, SavePath, saveimgs)
elif os.path.basename(del_barcode_file).find('returnGoods'):
blist = read_returnGoods_file(del_barcode_file)
errpairs, corrpairs, err_similarity, correct_similarity = one2n_return(blist)
relative_paths = []
for getoutevent, inputevent, errevent in errpairs:
relative_paths.append(os.path.join(basepath, getoutevent))
relative_paths.append(os.path.join(basepath, inputevent))
relative_paths.append(os.path.join(basepath, errevent))
# prefix = ["getout_", "input_", "error_"]
'''开始循环执行每次测试过任务'''
k = 0
for tuple_paths in relative_paths:
'''1. 生成存储结果图像的文件夹'''
namedirs = []
for data_path in tuple_paths:
base_name = os.path.basename(data_path).strip().split('_')
if len(base_name[-1]):
name = base_name[-1]
else:
name = base_name[0]
namedirs.append(name)
sdir = "_".join(namedirs)
savepath = os.path.join(SavePath, sdir)
# if os.path.exists(savepath):
# continue
if not os.path.exists(savepath):
os.makedirs(savepath)
'''2. 循环执行操作事件:取出、放入、错误匹配'''
for eventpath in tuple_paths:
try:
tracking_simulate(eventpath, savepath)
except Exception as e:
print(f'Error! {eventpath}, {e}')
# k +=1
# if k==1:
# break
def main():
'''
eventPaths: data文件地址该 data 文件包括 Pipeline 各模块输出
SavePath: 包含二级目录,一级目录为轨迹图像;二级目录为与data文件对应的序列图像存储地址。
'''
# eventPaths = r'\\192.168.1.28\share\测试_202406\0723\0723_3'
eventPaths = r'\\192.168.1.28\share\测试视频数据以及日志\各模块测试记录\展厅测试\1120_展厅模型v801测试\扫A放A'
savePath = r'D:\exhibition\result'
k=0
for pathname in os.listdir(eventPaths):
pathname = "20241121-144901-fdba61c6-aefa-4b50-876d-5e05998befdc_6920459905012_6920459905012"
eventpath = os.path.join(eventPaths, pathname)
savepath = os.path.join(savePath, pathname)
if not os.path.exists(savepath):
os.makedirs(savepath)
tracking_simulate(eventpath, savepath)
# try:
# tracking_simulate(eventpath, savepath)
# except Exception as e:
# print(f'Error! {eventpath}, {e}')
k += 1
if k==1:
break
if __name__ == "__main__":
# main_loop()
main()
# try:
# main_loop()
# except Exception as e:
# print(f'Error: {e}')