# -*- coding: utf-8 -*-
"""
Created on Thu May 30 14:03:03 2024

Performance analysis of on-site trajectory-analysis tests:
    (1) read the trajectory data stored in the data files and plot the
        trajectories;
    (2) read the data produced by running Yolo + Resnet + Tracker + Tracking
        locally and plot the trajectories.

@author: ym
"""

import os
import cv2
import numpy as np
from pathlib import Path
import warnings
import sys
sys.path.append(r"D:\DetectTracking")

from tracking.utils.read_data import extract_data_realtime, read_tracking_output_realtime
from tracking.utils.plotting import Annotator, colors, draw_tracking_boxes
from tracking.utils import Boxes, IterableSimpleNamespace, yaml_load
from tracking.trackers import BOTSORT, BYTETracker
from tracking.dotrack.dotracks_back import doBackTracks
from tracking.dotrack.dotracks_front import doFrontTracks
from tracking.utils.drawtracks import plot_frameID_y2, draw_all_trajectories
from tracking.utils.read_data import extract_data, read_deletedBarcode_file, read_tracking_output, read_returnGoods_file

from contrast.one2n_contrast import get_contrast_paths, one2n_return
from tracking.utils.annotator import TrackAnnotator

W, H = 1024, 1280
Mode = 'front'  # 'back'
ImgFormat = ['.jpg', '.jpeg', '.png', '.bmp']

# Background template drawn under the trajectories, keyed by the camera prefix
# of the data file name ('1' is handled by doFrontTracks, '0' by doBackTracks).
_CART_TEMPLATE = {
    '1': "./shopcart/cart_tempt/board_ftmp_line.png",
    '0': "./shopcart/cart_tempt/edgeline.png",
}


# tracking() runs the local tracker so that its trajectories can be compared
# with the trajectories produced by the on-site tracker.
def init_tracker(tracker_yaml=None, bs=1):
    """
    Initialize tracker for object tracking during prediction.

    Args:
        tracker_yaml: path of the tracker YAML configuration; its
            ``tracker_type`` entry must be 'bytetrack' or 'botsort'.
        bs: unused by this function; kept for interface compatibility.

    Returns:
        A BYTETracker or BOTSORT instance built with ``frame_rate=30``.
    """
    TRACKER_MAP = {'bytetrack': BYTETracker, 'botsort': BOTSORT}
    cfg = IterableSimpleNamespace(**yaml_load(tracker_yaml))

    tracker = TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)

    return tracker


def tracking(bboxes, ffeats):
    """
    Run the local BoT-SORT tracker over per-frame detections.

    Args:
        bboxes: iterable of per-frame detection arrays.
        ffeats: iterable of per-frame feature arrays; must correspond
            one-to-one with ``bboxes``.

    Returns:
        TrackBoxes: float32 array with 9 columns, all frames concatenated:
            [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index].
        TracksDict: ``{"frame_<frameID>": {"feats": {box_index: feature}}}``
            for every frame that produced at least one track.
    """
    tracker_yaml = r"./trackers/cfg/botsort.yaml"
    tracker = init_tracker(tracker_yaml)
    TrackBoxes = np.empty((0, 9), dtype=np.float32)
    TracksDict = {}

    # dets and feats must stay strictly aligned.
    for dets, feats in zip(bboxes, ffeats):
        det_tracking = Boxes(dets).cpu().numpy()
        tracks = tracker.update(det_tracking, features=feats)

        # tracks: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
        #           0   1   2   3      4       5     6        7           8
        # frame_index could be replaced by the video frame ID; box_index
        # stays unchanged.
        if len(tracks):
            TrackBoxes = np.concatenate([TrackBoxes, tracks], axis=0)

            FeatDict = {}
            for track in tracks:
                # Column 8 (box_index) is used as the row index into feats.
                box_idx = int(track[8])
                FeatDict[box_idx] = feats[box_idx, :]

            frameID = tracks[0, 7]
            assert len(tracks) == len(FeatDict), f"Please check the func: tracker.update() at frameID({int(frameID)})"

            TracksDict[f"frame_{int(frameID)}"] = {"feats": FeatDict}

    return TrackBoxes, TracksDict


def read_imgs(imgspath, CamerType):
    """
    Read the images of one camera from a folder, ordered by frame index.

    Only files whose stem splits into exactly four '_'-separated fields, whose
    extension is listed in ``ImgFormat`` and whose first field equals
    ``CamerType`` are read; the last field is parsed as the frame index.

    Args:
        imgspath: folder holding the image sequence.
        CamerType: camera prefix as a string ('0' or '1', as passed by
            do_tracking()).

    Returns:
        List of images (as returned by ``cv2.imread``) sorted by frame index.

    Called by do_tracking() to (1) draw the track boxes on the images and
    (2) crop the sub-images.
    """
    imgs, frmIDs = [], []
    for filename in os.listdir(imgspath):
        file, ext = os.path.splitext(filename)
        flist = file.split('_')
        if len(flist) == 4 and ext in ImgFormat:
            camID, frmID = flist[0], int(flist[-1])
            if camID == CamerType:
                img = cv2.imread(os.path.join(imgspath, filename))
                imgs.append(img)
                frmIDs.append(frmID)

    if len(frmIDs):
        indice = np.argsort(np.array(frmIDs))
        imgs = [imgs[i] for i in indice]

    return imgs


def _read_cart_template(CamerType):
    """Load the background template of a camera, failing loudly if unreadable."""
    path = _CART_TEMPLATE[CamerType]
    img = cv2.imread(path)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Can't read cart template image: {path}")
    return img


def _plot_tracks(canvas, tracks):
    """Draw every track on canvas with TrackAnnotator and return the result."""
    annotator = TrackAnnotator(canvas, line_width=2)
    for track in tracks:
        annotator.plotting_track(track)
    return annotator.result()


def _save_subimgs(imgs, boxes, subimg_dir, prefix):
    """Crop every box of one track from its frame and save it as a PNG."""
    for *xyxy, tid, conf, cls, fid, bid in boxes:
        img = imgs[int(fid - 1)]
        # Box coordinates are halved before cropping
        # (presumably boxes are at twice the image resolution - TODO confirm).
        x1, y1, x2, y2 = [int(v / 2) for v in xyxy[:4]]
        subimg = img[y1:y2, x1:x2]
        if subimg.size == 0:
            # cv2.imwrite raises on an empty image; skip degenerate crops.
            continue
        subimg_path = os.path.join(subimg_dir, f'{prefix}_tid{int(tid)}_{int(fid)}_{int(bid)}.png')
        cv2.imwrite(subimg_path, subimg)


def do_tracking(fpath, savedir, event_name='images'):
    """
    Replay the trajectory analysis of one camera data file and save the plots.

    Args:
        fpath: path of the ``<cam>_track.data`` file holding the outputs of
            the pipeline modules; ``<cam>_tracking_output.data`` must sit in
            the same folder.
        savedir: folder in which the reproduced results are stored.
        event_name: prefix of the ``*_images`` / ``*_subimgs`` sub-folders.

    Returns:
        (img_tracking, abimg):
            img_tracking: trajectory picture returned by
                draw_all_trajectories() for the local trajectory analysis;
            abimg: side-by-side picture of the tracks read from the data file
                (left) and the tracks of ``*_tracking_output.data`` (right).
        (None, None) when a data file is missing or the camera prefix of the
        file name is neither '0' nor '1'.

    Raises:
        FileNotFoundError: the cart template image cannot be read.
    """
    imgpath, dfname = os.path.split(fpath)
    CamerType = dfname.split('_')[0]

    # 1.1 Build the 0/1_tracking_output.data path and read both data files.
    tracking_output_path = os.path.join(imgpath, CamerType + '_tracking_output.data')

    basename = os.path.basename(imgpath)
    if not os.path.isfile(fpath):
        print(f"{basename}: Can't find {dfname} file!")
        return None, None
    if not os.path.isfile(tracking_output_path):
        print(f"{basename}: Can't find {CamerType}_tracking_output.data file!")
        return None, None
    if CamerType not in _CART_TEMPLATE:
        # Bail out instead of continuing with undefined results.
        print("Please check data file!")
        return None, None

    bboxes, ffeats, trackerboxes, tracker_feat_dict, trackingboxes, tracking_feat_dict = extract_data(fpath)
    tracking_output_boxes, _ = read_tracking_output(tracking_output_path)

    # 1.2 To use the local tracker instead of the recorded tracker output:
    # trackerboxes, tracker_feat_dict = tracking(bboxes, ffeats)

    # 1.3 Two folders: (1) images with boxes drawn, (2) crops of the track boxes.
    save_dir = os.path.join(savedir, event_name + '_images')
    subimg_dir = os.path.join(savedir, event_name + '_subimgs')
    os.makedirs(save_dir, exist_ok=True)
    os.makedirs(subimg_dir, exist_ok=True)

    # 2. Run the trajectory analysis and draw its result.
    if CamerType == '1':
        vts = doFrontTracks(trackerboxes, tracker_feat_dict)
        vts.classify()

        plt = plot_frameID_y2(vts)
        plt.close()
    else:
        vts = doBackTracks(trackerboxes, tracker_feat_dict)
        vts.classify()

    edgeline = _read_cart_template(CamerType)
    img_tracking = draw_all_trajectories(vts, edgeline, savedir, CamerType, draw5p=True)

    # 3. Compare the tracks before and after the on-site track selection.
    aline = _read_cart_template(CamerType)
    bline = aline.copy()

    aline = _plot_tracks(aline, trackingboxes)

    if not isinstance(tracking_output_boxes, list):
        tracking_output_boxes = [tracking_output_boxes]
    bline = _plot_tracks(bline, tracking_output_boxes)

    abimg = np.concatenate((aline, bline), axis=1)
    abH, abW = abimg.shape[:2]
    cv2.line(abimg, (int(abW/2), 0), (int(abW/2), abH), (128, 255, 128), 2)

    # 4. Save boxed images and crops. When there are fewer images than the
    #    largest frame id of the tracker, only the raw images are saved.
    # 4.0 Read the images that belong to fpath.
    imgs = read_imgs(imgpath, CamerType)

    # 4.1 Fewer images than max(fid): save the raw images and return.
    max_fid = np.max(trackerboxes[:, 7])
    if len(imgs) < max_fid:
        for i, img in enumerate(imgs):
            img_savepath = os.path.join(save_dir, CamerType + "_" + f"{i}.png")
            cv2.imwrite(img_savepath, img)

        print(f"{basename}: len(imgs) = {len(imgs)} < Tracker max(fid) = {int(max_fid)}, 无法匹配画框")
        return img_tracking, abimg

    # 4.2 Draw the boxes on the images and save them.
    imgs_dw = draw_tracking_boxes(imgs, trackerboxes)
    for fid, img in imgs_dw:
        img_savepath = os.path.join(save_dir, CamerType + "_fid_" + f"{int(fid)}.png")
        cv2.imwrite(img_savepath, img)

    # 4.3 Save the crops of the locally selected tracks (vts.Residual) and of
    #     the on-site selection output (file names prefixed with 'x_').
    for track in vts.Residual:
        _save_subimgs(imgs, track.boxes, subimg_dir, CamerType)
    for track in tracking_output_boxes:
        _save_subimgs(imgs, track, subimg_dir, f'x_{CamerType}')

    return img_tracking, abimg


def _stack_cameras(images):
    """Stack two pictures vertically with a separator line; pass one through."""
    if len(images) == 2:
        stacked = np.concatenate((images[0], images[1]), axis=0)
        h, w = stacked.shape[:2]
        cv2.line(stacked, (0, int(h/2)), (int(w), int(h/2)), (128, 255, 128), 2)
        return stacked
    if len(images) == 1:
        return images[0]
    return None


def tracking_simulate(eventpath, savepath):
    """
    Replay every ``*track.data`` file of one event and save the summary plots.

    Args:
        eventpath: event folder; every file in it whose name contains
            "track.data" is processed with do_tracking().
        savepath: folder in which the results are stored.

    Returns:
        The combined tracking/selection picture, or None when the two partial
        pictures are not both available with identical shapes (in which case
        whichever partial picture exists is saved on its own).
    """
    event_name = os.path.basename(eventpath)
    # Only to shorten the file name: keep the 15 characters starting at '2024'.
    idx = event_name.find('2024')
    if idx >= 0:
        event_name = event_name[idx:(idx+15)]

    # 2. Read the 0/1_track.data files one by one and run the simulation.
    illu_tracking, illu_select = [], []
    for filename in os.listdir(eventpath):
        if "track.data" not in filename:
            continue

        fpath = os.path.join(eventpath, filename)
        if not os.path.isfile(fpath):
            continue

        img_tracking, img_select = do_tracking(fpath, savepath, event_name)

        if img_select is not None:
            illu_select.append(img_select)
        if img_tracking is not None:
            illu_tracking.append(img_tracking)

    # 3. When both data files were processed, each picture stacks their two
    #    results vertically.
    Img_s = _stack_cameras(illu_select)
    Img_t = _stack_cameras(illu_tracking)

    # 3.1 Save the pictures: merged when tracking and select have the same
    #     shape, separately otherwise.
    imgpath_tracking = os.path.join(savepath, event_name + '_tracking.png')
    imgpath_select = os.path.join(savepath, event_name + '_select.png')
    imgpath_ts = os.path.join(savepath, event_name + '_tracking_select.png')

    if Img_t is not None and Img_s is not None and Img_s.shape == Img_t.shape:
        Img_ts = np.concatenate((Img_t, Img_s), axis=1)
        H, W = Img_ts.shape[:2]
        cv2.line(Img_ts, (int(W/2), 0), (int(W/2), int(H)), (0, 0, 255), 4)
        cv2.imwrite(imgpath_ts, Img_ts)
    else:
        # Test against None: the truth value of a NumPy array is ambiguous
        # and raises ValueError.
        if Img_s is not None:
            cv2.imwrite(imgpath_select, Img_s)
        if Img_t is not None:
            cv2.imwrite(imgpath_tracking, Img_t)
        Img_ts = None

    # 3.2 Additionally store every complete picture in a sibling 'trajs' folder.
    if Img_ts is not None:
        basepath, _ = os.path.split(savepath)
        trajpath = os.path.join(basepath, 'trajs')
        os.makedirs(trajpath, exist_ok=True)
        traj_path = os.path.join(trajpath, event_name + '.png')
        cv2.imwrite(traj_path, Img_ts)

    return Img_ts


# warnings.simplefilter("error", category=np.VisibleDeprecationWarning)

def main_loop():
    """
    Replay all events referenced by a deletedBarcode / returnGoods record file.

    The paths are hard-coded below. For every group of event folders a result
    folder named after the events is created under ``SavePath`` and
    tracking_simulate() is run on each event; failures are printed and the
    loop continues.

    Raises:
        ValueError: the record file is neither a deletedBarcode nor a
            returnGoods file.
    """
    del_barcode_file = r'\\192.168.1.28\share\测试_202406\0723\0723_3\deletedBarcode.txt'
    basepath = r'\\192.168.1.28\share\测试_202406\0723\0723_3'   # test data folder

    # del_barcode_file = r'\\192.168.1.28\share\测试_202406\1030\images\returnGoods.txt'
    # basepath = r'\\192.168.1.28\share\测试_202406\1030\images'   # test data folder

    # Collect the paths of the performance-test data.
    SavePath = r'D:\contrast\dataset\resultx'   # result folder
    saveimgs = True

    # Use `in`, not str.find(): find() returns 0 (falsy) for a match at the
    # start of the name and -1 (truthy) when there is no match.
    record_name = os.path.basename(del_barcode_file)
    if 'deletedBarcode' in record_name:
        relative_paths = get_contrast_paths(del_barcode_file, basepath, SavePath, saveimgs)
    elif 'returnGoods' in record_name:
        blist = read_returnGoods_file(del_barcode_file)
        errpairs, corrpairs, err_similarity, correct_similarity = one2n_return(blist)
        # One tuple (getout, input, error) per entry: the loop below iterates
        # each entry as a group of event paths.
        relative_paths = [
            (os.path.join(basepath, getoutevent),
             os.path.join(basepath, inputevent),
             os.path.join(basepath, errevent))
            for getoutevent, inputevent, errevent in errpairs
        ]
    else:
        raise ValueError(f"Unsupported record file: {del_barcode_file}")

    # Run every test task.
    for tuple_paths in relative_paths:
        # 1. Create the folder that stores the result pictures; its name joins
        #    the last '_' field of each event folder (or the first field when
        #    the last one is empty).
        namedirs = []
        for data_path in tuple_paths:
            base_name = os.path.basename(data_path).strip().split('_')
            namedirs.append(base_name[-1] if base_name[-1] else base_name[0])

        sdir = "_".join(namedirs)
        savepath = os.path.join(SavePath, sdir)
        os.makedirs(savepath, exist_ok=True)

        # 2. Run the events one by one: take-out, put-in, wrong match.
        for eventpath in tuple_paths:
            try:
                tracking_simulate(eventpath, savepath)
            except Exception as e:
                # Best effort: report the failing event and go on.
                print(f'Error! {eventpath}, {e}')


def main():
    """
    Replay a single, hard-coded event (debug entry point).

    eventPaths: folder of event folders; each event folder holds the data
        files with the outputs of the pipeline modules.
    savePath: result root; one sub-folder per event receives the trajectory
        pictures and the image-sequence folders created by do_tracking().

    The folder listing is only used to check that ``eventPaths`` is readable
    and non-empty: the processed event is always ``debug_event`` and only one
    pass is made.
    """
    # eventPaths = r'\\192.168.1.28\share\测试_202406\0723\0723_3'
    eventPaths = r'\\192.168.1.28\share\测试视频数据以及日志\各模块测试记录\展厅测试\1120_展厅模型v801测试\扫A放A'
    savePath = r'D:\exhibition\result'

    debug_event = "20241121-144901-fdba61c6-aefa-4b50-876d-5e05998befdc_6920459905012_6920459905012"
    max_events = 1

    for _ in os.listdir(eventPaths)[:max_events]:
        pathname = debug_event

        eventpath = os.path.join(eventPaths, pathname)
        savepath = os.path.join(savePath, pathname)
        os.makedirs(savepath, exist_ok=True)

        tracking_simulate(eventpath, savepath)


if __name__ == "__main__":
    # main_loop()
    main()