282 lines
8.8 KiB
Python
282 lines
8.8 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
Created on Sun Mar 2 14:15:57 2025
|
||
|
||
@author: ym
|
||
"""
|
||
import numpy as np
|
||
import cv2
|
||
import os
|
||
from pathlib import Path
|
||
|
||
import sys
|
||
sys.path.append(r"D:\DetectTracking")
|
||
|
||
# from tracking.utils.read_data import extract_data_realtime, read_tracking_output_realtime
|
||
from tracking.utils.plotting import Annotator, colors
|
||
from tracking.utils import Boxes, IterableSimpleNamespace, yaml_load, boxes_add_fid
|
||
from tracking.trackers import BOTSORT, BYTETracker
|
||
from tracking.utils.showtrack import drawtracks
|
||
from hands.hand_inference import hand_pose
|
||
from tracking.utils.read_data import read_weight_sensor, extract_data_realtime, read_tracking_output_realtime
|
||
from contrast.feat_extract.config import config as conf
|
||
from contrast.feat_extract.inference import FeatsInterface
|
||
from tracking.utils.drawtracks import drawTrack
|
||
|
||
|
||
ReIDEncoder = FeatsInterface(conf)
|
||
|
||
|
||
W, H = 1024, 1280
|
||
Mode = 'front' #'back'
|
||
ImgFormat = ['.jpg', '.jpeg', '.png', '.bmp']
|
||
|
||
|
||
'''调用tracking()函数,利用本地跟踪算法获取各目标轨迹,可以比较本地跟踪算法与现场跟踪算法的区别。'''
|
||
def init_tracker(tracker_yaml = None, bs=1):
|
||
"""
|
||
Initialize tracker for object tracking during prediction.
|
||
"""
|
||
TRACKER_MAP = {'bytetrack': BYTETracker, 'botsort': BOTSORT}
|
||
cfg = IterableSimpleNamespace(**yaml_load(tracker_yaml))
|
||
|
||
tracker = TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)
|
||
|
||
return tracker
|
||
|
||
|
||
def init_trackers(tracker_yaml = None, bs=1):
|
||
"""
|
||
Initialize trackers for object tracking during prediction.
|
||
"""
|
||
# tracker_yaml = r"./tracking/trackers/cfg/botsort.yaml"
|
||
|
||
TRACKER_MAP = {'bytetrack': BYTETracker, 'botsort': BOTSORT}
|
||
|
||
cfg = IterableSimpleNamespace(**yaml_load(tracker_yaml))
|
||
trackers = []
|
||
for _ in range(bs):
|
||
tracker = TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)
|
||
trackers.append(tracker)
|
||
|
||
return trackers
|
||
|
||
|
||
def draw_box(img, tracks):
|
||
annotator = Annotator(img.copy(), line_width=2)
|
||
# for *xyxy, conf, cls in reversed(tracks):
|
||
# name = f'{int(cls)} {conf:.2f}'
|
||
# color = colors(int(cls), True)
|
||
# annotator.box_label(xyxy, name, color=color)
|
||
|
||
for *xyxy, id, conf, cls, fid, bid in reversed(tracks):
|
||
name = f'ID:{int(id)} {int(cls)} {conf:.2f}'
|
||
color = colors(int(cls), True)
|
||
annotator.box_label(xyxy, name, color=color)
|
||
|
||
|
||
|
||
im0 = annotator.result()
|
||
|
||
return im0
|
||
|
||
|
||
|
||
|
||
def tracking(bboxes, ffeats):
|
||
tracker_yaml = "./tracking/trackers/cfg/botsort.yaml"
|
||
|
||
tracker = init_tracker(tracker_yaml)
|
||
|
||
TrackBoxes = np.empty((0, 9), dtype = np.float32)
|
||
TracksDict = {}
|
||
|
||
frmIds = []
|
||
'''========================== 执行跟踪处理 ============================='''
|
||
# dets 与 feats 应保持严格对应
|
||
k=0
|
||
for dets, feats in zip(bboxes, ffeats):
|
||
|
||
frmIds.append(np.unique(dets[:, 6]).astype(np.int64)[0])
|
||
|
||
boxes = dets[:, :6]
|
||
det_tracking = Boxes(boxes).cpu().numpy()
|
||
tracks, outfeats = tracker.update(det_tracking, features=feats)
|
||
'''tracks: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||
0 1 2 3 4 5 6 7 8
|
||
这里,frame_index 也可以用视频的 帧ID 代替, box_index 保持不变
|
||
'''
|
||
k += 1
|
||
imgpath = r"D:\全实时\202502\tracker\Yolos_Tracking\tracker\1_1740891284792\1_1740891284792_{}.png".format(int(k))
|
||
img = cv2.imread(imgpath)
|
||
|
||
im0 = draw_box(img, tracks)
|
||
savepath = r"D:\全实时\202502\tracker\Yolos_Tracking\tracker\1_1740891284792\b\1_1740891284792_{}_b.png".format(k)
|
||
cv2.imwrite(savepath, im0)
|
||
|
||
|
||
if len(tracks):
|
||
TrackBoxes = np.concatenate([TrackBoxes, tracks], axis=0)
|
||
|
||
# =============================================================================
|
||
# FeatDict = {}
|
||
# for track in tracks:
|
||
# tid = int(track[8])
|
||
# FeatDict.update({tid: feats[tid, :]})
|
||
#
|
||
# frameID = tracks[0, 7]
|
||
#
|
||
# # print(f"frameID: {int(frameID)}")
|
||
# assert len(tracks) == len(FeatDict), f"Please check the func: tracker.update() at frameID({int(frameID)})"
|
||
#
|
||
# TracksDict[f"frame_{int(frameID)}"] = {"feats":FeatDict}
|
||
# =============================================================================
|
||
|
||
|
||
return TrackBoxes, TracksDict
|
||
|
||
def dotrack():
|
||
|
||
datapath = r"D:\全实时\202502\tracker\1_tracker_in.data"
|
||
|
||
bboxes, ffeats = extract_data_realtime(datapath)
|
||
trackerboxes, tracker_feat_dict = tracking(bboxes, ffeats)
|
||
|
||
print("done!")
|
||
|
||
|
||
# def plotbox():
|
||
|
||
# fpath = r"\\192.168.1.28\share\测试视频数据以及日志\全实时测试\V12\2025-3-3\20250303-103833-338_6928804010091_6928804010091\1_tracking_output.data"
|
||
# imgpath = r"D:\全实时\202502\result\Yolos_Tracking\20250303-103833-338_6928804010091_6928804010091\1_1740969517953"
|
||
# trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats = read_tracking_output_realtime(fpath)
|
||
|
||
# for *xyxy, id, conf, cls, fid, bid in tracking_outboxes[0]:
|
||
# imgname = f"1_1740969517953_{int(fid)}.png"
|
||
|
||
# img_path = os.path.join(imgpath, imgname)
|
||
# img = cv2.imread(img_path)
|
||
# annotator = Annotator(img.copy(), line_width=2)
|
||
|
||
# name = f'ID:{int(id)} {int(cls)} {conf:.2f}'
|
||
# color = colors(int(cls), True)
|
||
# annotator.box_label(xyxy, name, color=color)
|
||
# im0 = annotator.result()
|
||
# cv2.imwrite(os.path.join(imgpath, f"1_1740969517953_{int(fid)}_.png"), im0)
|
||
|
||
# print(f"1_1740969676295_{int(fid)}_.png")
|
||
|
||
# print("done")
|
||
|
||
def video2imgs(videopath):
|
||
cap = cv2.VideoCapture(str(videopath))
|
||
k = 0
|
||
while True:
|
||
ret, frame = cap.read()
|
||
if frame is None:
|
||
break
|
||
k += 1
|
||
imgpath = videopath.parent / f"{videopath.stem}_{k}.png"
|
||
cv2.imwrite(str(imgpath), frame)
|
||
|
||
|
||
|
||
def extract_evtimgs(evtpath):
|
||
vidpaths = [v for v in evtpath.iterdir() if v.suffix == '.mp4']
|
||
for vidpath in vidpaths:
|
||
video2imgs(vidpath)
|
||
|
||
stamps = [name.stem.split('_')[1] for name in vidpaths]
|
||
|
||
if len(set(stamps)==1):
|
||
return stamps[0]
|
||
return None
|
||
|
||
def draw_tracking_boxes(evtpath, stamp):
|
||
for datapath in evtpath.iterdir():
|
||
if datapath.name.find('_tracking_output.data')<=0:
|
||
continue
|
||
|
||
camera = datapath.stem.split('_')[0]
|
||
trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats = read_tracking_output_realtime(str(datapath))
|
||
|
||
## 该模块先读取轨迹数据,再根据帧ID读取相应图像
|
||
for *xyxy, id, conf, cls, fid, bid in tracking_outboxes[0]:
|
||
imgpath = evtpath / f"{camera}_{stamp}_{int(fid)}.png"
|
||
|
||
img = cv2.imread(str(imgpath))
|
||
annotator = Annotator(img.copy(), line_width=2)
|
||
|
||
name = f'ID:{int(id)} {int(cls)} {conf:.2f}'
|
||
color = colors(int(cls), True)
|
||
annotator.box_label(xyxy, name, color=color)
|
||
im0 = annotator.result()
|
||
cv2.imwrite(imgpath, im0)
|
||
|
||
print(datapath.name)
|
||
|
||
def draw_traj(evtpath):
|
||
for datapath in evtpath.iterdir():
|
||
if datapath.name.find('_tracking_output.data')<=0:
|
||
continue
|
||
|
||
fname = datapath.name
|
||
trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats = read_tracking_output_realtime(datapath)
|
||
|
||
CamerType = fname.split('_')[0]
|
||
if CamerType == '1':
|
||
edgeline = cv2.imread("./CartTemp/board_ftmp_line.png")
|
||
if CamerType == '0':
|
||
edgeline = cv2.imread("./CartTemp/edgeline.png")
|
||
edgeline = drawTrack(tracking_outboxes, edgeline)
|
||
|
||
|
||
imgpath = datapath.parent / f"{datapath.stem}.png"
|
||
cv2.imwrite(str(imgpath), edgeline)
|
||
|
||
|
||
def main():
|
||
|
||
path = r"\\192.168.1.28\share\测试视频数据以及日志\全实时测试\V12\2025-3-3\20250303-104225-381_6920459958674"
|
||
evtpaths = [p for p in Path(path).iterdir() if p.is_dir()]
|
||
for evtpath in evtpaths:
|
||
#1. 从事件的前后摄视频提取图像
|
||
stamp = extract_evtimgs(evtpath)
|
||
|
||
#2. 根据 0/1_tracking_output.data 中提取的轨迹在img中绘制box
|
||
draw_tracking_boxes(evtpath, stamp)
|
||
|
||
#3. 根据 0/1_tracking_output.data 中提取的轨迹在edgeline中绘制box
|
||
draw_traj(evtpath)
|
||
|
||
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# dotrack()
|
||
# plotbox()
|
||
|
||
vpath = r"D:\datasets\ym\VID_20250307_105606"
|
||
extract_evtimgs(Path(vpath))
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|