# -*- coding: utf-8 -*-
"""
Created on Sun Mar 2 14:15:57 2025

Debug utilities for the tracking pipeline: re-run a tracker on recorded
detections, dump video frames to images, and draw recorded track boxes /
trajectories onto those images.

@author: ym
"""
import os
import sys
from pathlib import Path

import cv2
import numpy as np

# The project packages imported below are resolved from this directory.
sys.path.append(r"D:\DetectTracking")

# from tracking.utils.read_data import extract_data_realtime, read_tracking_output_realtime
from tracking.utils.plotting import Annotator, colors
from tracking.utils import Boxes, IterableSimpleNamespace, yaml_load, boxes_add_fid
from tracking.trackers import BOTSORT, BYTETracker
from tracking.utils.showtrack import drawtracks
from hands.hand_inference import hand_pose
from tracking.utils.read_data import read_weight_sensor, extract_data_realtime, read_tracking_output_realtime

from contrast.feat_extract.config import config as conf
from contrast.feat_extract.inference import FeatsInterface
from tracking.utils.drawtracks import drawTrack

ReIDEncoder = FeatsInterface(conf)

W, H = 1024, 1280
Mode = 'front'  # 'back'
ImgFormat = ['.jpg', '.jpeg', '.png', '.bmp']

# Call tracking() to obtain the trajectory of every target with the local
# tracking algorithm, so that the local tracker can be compared with the
# tracker running on site.


def init_tracker(tracker_yaml=None, bs=1):
    """
    Initialize a single tracker for object tracking during prediction.

    Args:
        tracker_yaml: path of the tracker configuration file; it is loaded
            with ``yaml_load`` and must define ``tracker_type`` as either
            ``'bytetrack'`` or ``'botsort'``.
        bs: unused; kept so the signature parallels ``init_trackers``.

    Returns:
        The tracker instance built with ``frame_rate=30``.
    """
    TRACKER_MAP = {'bytetrack': BYTETracker, 'botsort': BOTSORT}
    cfg = IterableSimpleNamespace(**yaml_load(tracker_yaml))

    tracker = TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)

    return tracker


def init_trackers(tracker_yaml=None, bs=1):
    """
    Initialize trackers for object tracking during prediction.

    Args:
        tracker_yaml: path of the tracker configuration file (see
            ``init_tracker``).
        bs: number of independent tracker instances to create.

    Returns:
        A list with ``bs`` tracker instances, each built with
        ``frame_rate=30``.
    """
    # tracker_yaml = r"./tracking/trackers/cfg/botsort.yaml"

    TRACKER_MAP = {'bytetrack': BYTETracker, 'botsort': BOTSORT}
    cfg = IterableSimpleNamespace(**yaml_load(tracker_yaml))

    return [
        TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)
        for _ in range(bs)
    ]


def draw_box(img, tracks):
    """
    Draw every track box with an ``ID:<id> <cls> <score>`` label.

    Args:
        img: image to annotate; it is copied, the input is not modified.
        tracks: iterable of 9-element rows
            ``[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]``.

    Returns:
        The annotated image returned by ``Annotator.result()``.
    """
    annotator = Annotator(img.copy(), line_width=2)
    # for *xyxy, conf, cls in reversed(tracks):
    #     name = f'{int(cls)} {conf:.2f}'
    #     color = colors(int(cls), True)
    #     annotator.box_label(xyxy, name, color=color)

    # Local names avoid shadowing the builtin `id` and the module-level `conf`.
    for *xyxy, tid, score, cls, fid, bid in reversed(tracks):
        name = f'ID:{int(tid)} {int(cls)} {score:.2f}'
        color = colors(int(cls), True)
        annotator.box_label(xyxy, name, color=color)

    im0 = annotator.result()

    return im0


def tracking(bboxes, ffeats):
    """
    Run the local BoT-SORT tracker over recorded detections.

    For every frame the annotated debug image is also written next to the
    hard-coded source frames, when the source frame can be read.

    Args:
        bboxes: per-frame detection arrays; the first six columns are fed to
            the tracker and column 6 is read as the frame id
            (NOTE(review): column meaning inferred from usage - confirm).
        ffeats: per-frame feature arrays, aligned one-to-one with ``bboxes``.

    Returns:
        ``(TrackBoxes, TracksDict)`` where ``TrackBoxes`` is an ``(N, 9)``
        float array of all track rows and ``TracksDict`` is currently always
        an empty dict (its population is commented out below).
    """
    tracker_yaml = "./tracking/trackers/cfg/botsort.yaml"
    tracker = init_tracker(tracker_yaml)

    TrackBoxes = np.empty((0, 9), dtype=np.float32)
    TracksDict = {}
    frmIds = []

    # Debug image locations; the frame counter is substituted for "{}".
    src_pattern = r"D:\全实时\202502\tracker\Yolos_Tracking\tracker\1_1740891284792\1_1740891284792_{}.png"
    dst_pattern = r"D:\全实时\202502\tracker\Yolos_Tracking\tracker\1_1740891284792\b\1_1740891284792_{}_b.png"

    # ========================== run tracking =============================
    # dets and feats must correspond strictly one-to-one.
    k = 0
    for dets, feats in zip(bboxes, ffeats):
        frmIds.append(np.unique(dets[:, 6]).astype(np.int64)[0])

        boxes = dets[:, :6]
        det_tracking = Boxes(boxes).cpu().numpy()
        tracks, outfeats = tracker.update(det_tracking, features=feats)

        # tracks: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
        #           0   1   2   3      4       5     6        7           8
        # Here frame_index may also be replaced by the video frame ID;
        # box_index stays unchanged.
        k += 1
        imgpath = src_pattern.format(int(k))
        savepath = dst_pattern.format(k)

        img = cv2.imread(imgpath)
        # cv2.imread returns None for a missing/unreadable file; skip only the
        # debug drawing in that case instead of aborting the tracking run.
        if img is not None:
            im0 = draw_box(img, tracks)
            cv2.imwrite(savepath, im0)

        if len(tracks):
            TrackBoxes = np.concatenate([TrackBoxes, tracks], axis=0)

# =============================================================================
#             FeatDict = {}
#             for track in tracks:
#                 tid = int(track[8])
#                 FeatDict.update({tid: feats[tid, :]})
#
#             frameID = tracks[0, 7]
#
#             # print(f"frameID: {int(frameID)}")
#             assert len(tracks) == len(FeatDict), f"Please check the func: tracker.update() at frameID({int(frameID)})"
#
#             TracksDict[f"frame_{int(frameID)}"] = {"feats":FeatDict}
# =============================================================================

    return TrackBoxes, TracksDict


def dotrack():
    """Load the recorded tracker input and run ``tracking()`` on it."""
    datapath = r"D:\全实时\202502\tracker\1_tracker_in.data"

    bboxes, ffeats = extract_data_realtime(datapath)
    trackerboxes, tracker_feat_dict = tracking(bboxes, ffeats)

    print("done!")


# def plotbox():
#     fpath = r"\\192.168.1.28\share\测试视频数据以及日志\全实时测试\V12\2025-3-3\20250303-103833-338_6928804010091_6928804010091\1_tracking_output.data"
#     imgpath = r"D:\全实时\202502\result\Yolos_Tracking\20250303-103833-338_6928804010091_6928804010091\1_1740969517953"
#     trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats = read_tracking_output_realtime(fpath)
#     for *xyxy, id, conf, cls, fid, bid in tracking_outboxes[0]:
#         imgname = f"1_1740969517953_{int(fid)}.png"
#         img_path = os.path.join(imgpath, imgname)
#         img = cv2.imread(img_path)
#         annotator = Annotator(img.copy(), line_width=2)
#         name = f'ID:{int(id)} {int(cls)} {conf:.2f}'
#         color = colors(int(cls), True)
#         annotator.box_label(xyxy, name, color=color)
#         im0 = annotator.result()
#         cv2.imwrite(os.path.join(imgpath, f"1_1740969517953_{int(fid)}_.png"), im0)
#         print(f"1_1740969676295_{int(fid)}_.png")
#     print("done")


def video2imgs(videopath):
    """
    Dump every frame of a video as ``<stem>_<k>.png`` next to the video.

    Args:
        videopath: ``pathlib.Path`` of the video file; frames are numbered
            from 1 in reading order.
    """
    cap = cv2.VideoCapture(str(videopath))
    try:
        k = 0
        while True:
            ret, frame = cap.read()
            if not ret or frame is None:
                break
            k += 1
            imgpath = videopath.parent / f"{videopath.stem}_{k}.png"
            cv2.imwrite(str(imgpath), frame)
    finally:
        # Release the capture handle even if writing a frame fails.
        cap.release()


def extract_evtimgs(evtpath):
    """
    Extract the frames of every ``.mp4`` video found directly in ``evtpath``.

    Args:
        evtpath: ``pathlib.Path`` of the event directory.

    Returns:
        The second ``_``-separated field of the video file stems when it is
        identical for all videos, otherwise ``None`` (also when the directory
        holds no video).
    """
    vidpaths = [v for v in evtpath.iterdir() if v.suffix == '.mp4']

    for vidpath in vidpaths:
        video2imgs(vidpath)

    stamps = [name.stem.split('_')[1] for name in vidpaths]
    # Fixed: the original `len(set(stamps)==1)` called len() on a bool.
    if len(set(stamps)) == 1:
        return stamps[0]
    return None


def draw_tracking_boxes(evtpath, stamp):
    """
    Draw the recorded output track boxes onto the extracted frame images.

    For every ``<camera>_tracking_output.data`` file in ``evtpath`` the boxes
    of ``tracking_outboxes[0]`` are drawn, one at a time, onto
    ``<camera>_<stamp>_<frame_id>.png`` and the image is overwritten in place.

    Args:
        evtpath: ``pathlib.Path`` of the event directory.
        stamp: stamp string used in the frame image names (as returned by
            ``extract_evtimgs``).
    """
    for datapath in evtpath.iterdir():
        if datapath.name.find('_tracking_output.data') <= 0:
            continue
        camera = datapath.stem.split('_')[0]
        trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats = read_tracking_output_realtime(str(datapath))

        # The track data is read first, then the matching image is loaded by
        # frame ID.
        for *xyxy, tid, score, cls, fid, bid in tracking_outboxes[0]:
            imgpath = evtpath / f"{camera}_{stamp}_{int(fid)}.png"

            img = cv2.imread(str(imgpath))
            if img is None:
                # Frame image missing or unreadable: nothing to draw on.
                continue

            annotator = Annotator(img.copy(), line_width=2)
            name = f'ID:{int(tid)} {int(cls)} {score:.2f}'
            color = colors(int(cls), True)
            annotator.box_label(xyxy, name, color=color)

            im0 = annotator.result()
            # Pass a plain string path, as the other imread/imwrite calls do.
            cv2.imwrite(str(imgpath), im0)
        print(datapath.name)


def draw_traj(evtpath):
    """
    Draw the recorded output tracks onto the camera's template image.

    For every ``<camera>_tracking_output.data`` file in ``evtpath`` the result
    is saved as ``<data file stem>.png`` in the same directory. Files whose
    camera prefix is neither ``'0'`` nor ``'1'`` are skipped.

    Args:
        evtpath: ``pathlib.Path`` of the event directory.
    """
    # Template image per camera prefix of the data file name.
    templates = {
        '1': "./CartTemp/board_ftmp_line.png",
        '0': "./CartTemp/edgeline.png",
    }

    for datapath in evtpath.iterdir():
        if datapath.name.find('_tracking_output.data') <= 0:
            continue

        fname = datapath.name
        CamerType = fname.split('_')[0]
        template = templates.get(CamerType)
        if template is None:
            # Unknown camera prefix: the original left `edgeline` unbound (or
            # stale from the previous file) here.
            continue

        trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats = read_tracking_output_realtime(str(datapath))

        edgeline = cv2.imread(template)
        if edgeline is None:
            print(f"template image not found: {template}")
            continue

        edgeline = drawTrack(tracking_outboxes, edgeline)

        imgpath = datapath.parent / f"{datapath.stem}.png"
        cv2.imwrite(str(imgpath), edgeline)


def main():
    """Process every event directory found under the hard-coded share path."""
    path = r"\\192.168.1.28\share\测试视频数据以及日志\全实时测试\V12\2025-3-3\20250303-104225-381_6920459958674"

    evtpaths = [p for p in Path(path).iterdir() if p.is_dir()]
    for evtpath in evtpaths:
        # 1. Extract images from the event's front and back camera videos.
        stamp = extract_evtimgs(evtpath)

        # 2. Draw boxes on the images from the tracks read from
        #    0/1_tracking_output.data.
        draw_tracking_boxes(evtpath, stamp)

        # 3. Draw the tracks read from 0/1_tracking_output.data on the
        #    edgeline template image.
        draw_traj(evtpath)


if __name__ == '__main__':
    # dotrack()
    # plotbox()

    vpath = r"D:\datasets\ym\VID_20250307_105606"
    extract_evtimgs(Path(vpath))