# YOLOv5 🚀 by Ultralytics, AGPL-3.0 license
"""
Run YOLOv5 detection inference on images, videos, directories, globs, YouTube, webcam, streams, etc.

Usage - sources:
    $ python detect.py --weights yolov5s.pt --source 0                               # webcam
                                                     img.jpg                         # image
                                                     vid.mp4                         # video
                                                     screen                          # screenshot
                                                     path/                           # directory
                                                     list.txt                        # list of images
                                                     list.streams                    # list of streams
                                                     'path/*.jpg'                    # glob
                                                     'https://youtu.be/Zgi9g1ksQHc'  # YouTube
                                                     'rtsp://example.com/media.mp4'  # RTSP, RTMP, HTTP stream

Usage - formats:
    $ python detect.py --weights yolov5s.pt                 # PyTorch
                                 yolov5s.torchscript        # TorchScript
                                 yolov5s.onnx               # ONNX Runtime or OpenCV DNN with --dnn
                                 yolov5s_openvino_model     # OpenVINO
                                 yolov5s.engine             # TensorRT
                                 yolov5s.mlmodel            # CoreML (macOS-only)
                                 yolov5s_saved_model        # TensorFlow SavedModel
                                 yolov5s.pb                 # TensorFlow GraphDef
                                 yolov5s.tflite             # TensorFlow Lite
                                 yolov5s_edgetpu.tflite     # TensorFlow Edge TPU
                                 yolov5s_paddle_model       # PaddlePaddle
"""

import argparse
import csv
import os
import platform
import sys
from pathlib import Path
import glob

import numpy as np
import pickle
import torch
from scipy.spatial.distance import cdist

FILE = Path(__file__).resolve()
ROOT = FILE.parents[0]  # YOLOv5 root directory
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))  # add ROOT to PATH
ROOT = Path(os.path.relpath(ROOT, Path.cwd()))  # relative

from models.common import DetectMultiBackend
from utils.dataloaders import IMG_FORMATS, VID_FORMATS, LoadImages, LoadScreenshots, LoadStreams
from utils.general import (LOGGER, Profile, check_file, check_img_size, check_imshow, check_requirements, colorstr,
                           cv2, increment_path, non_max_suppression, print_args, scale_boxes, strip_optimizer,
                           xyxy2xywh)
from utils.torch_utils import select_device, smart_inference_mode

'''Integrate the tracking module and write the tracking result file (.npy).'''
# from ultralytics.engine.results import Boxes  # Results
# from ultralytics.utils import IterableSimpleNamespace, yaml_load
from tracking.utils.plotting import Annotator, colors
from tracking.utils import Boxes, IterableSimpleNamespace, yaml_load, boxes_add_fid
from tracking.trackers import BOTSORT, BYTETracker
from tracking.utils.showtrack import drawtracks
from hands.hand_inference import hand_pose

from contrast.feat_extract.config import config as conf
from contrast.feat_extract.inference import FeatsInterface
from ultralytics import YOLOv10

ReIDEncoder = FeatsInterface(conf)
print(f'load model {conf.testbackbone} in {Path(__file__).stem}')

IMG_FORMATS = '.bmp', '.dng', '.jpeg', '.jpg', '.mpo', '.png', '.tif', '.tiff', '.webp', '.pfm'  # include image suffixes
VID_FORMATS = '.asf', '.avi', '.gif', '.m4v', '.mkv', '.mov', '.mp4', '.mpeg', '.mpg', '.ts', '.wmv'  # include video suffixes

# from tracking.trackers.reid.reid_interface import ReIDInterface
# from tracking.trackers.reid.config import config as ReIDConfig
# ReIDEncoder = ReIDInterface(ReIDConfig)

# tracker_yaml = r"./tracking/trackers/cfg/botsort.yaml"
# def inference_image(image, detections):
#     H, W, _ = np.shape(image)
#     imgs = []
#     batch_patches = []
#     patches = []
#     for d in range(np.size(detections, 0)):
#         tlbr = detections[d, :4].astype(np.int_)
#         tlbr[0] = max(0, tlbr[0])
#         tlbr[1] = max(0, tlbr[1])
#         tlbr[2] = min(W - 1, tlbr[2])
#         tlbr[3] = min(H - 1, tlbr[3])
#         img1 = image[tlbr[1]:tlbr[3], tlbr[0]:tlbr[2], :]
#
#         img = img1[:, :, ::-1].copy()  # the model expects RGB inputs
#         patch = ReIDEncoder.transform(img)
#
#         imgs.append(img1)
#         # patch = patch.to(device=self.device).half()
#         if str(ReIDEncoder.device) != "cpu":
#             patch = patch.to(device=ReIDEncoder.device).half()
#         else:
#             patch = patch.to(device=ReIDEncoder.device)
#
#         patches.append(patch)
#         if (d + 1) % ReIDEncoder.batch_size == 0:
#             patches = torch.stack(patches, dim=0)
#             batch_patches.append(patches)
#             patches = []
#
#     if len(patches):
#         patches = torch.stack(patches, dim=0)
#         batch_patches.append(patches)
#
#     features = np.zeros((0, ReIDEncoder.embedding_size))
#     for patches in batch_patches:
#         pred = ReIDEncoder.model(patches)
#         pred[torch.isinf(pred)] = 1.0
#         feat = pred.cpu().data.numpy()
#         features = np.vstack((features, feat))
#
#     return imgs, features


def init_trackers(tracker_yaml=None, bs=1):
    """
    Initialize trackers for object tracking during prediction.
    """
    # tracker_yaml = r"./tracking/trackers/cfg/botsort.yaml"
    tracker_yaml = str(tracker_yaml)

    TRACKER_MAP = {'bytetrack': BYTETracker, 'botsort': BOTSORT}
    cfg = IterableSimpleNamespace(**yaml_load(tracker_yaml))

    trackers = []
    for _ in range(bs):
        tracker = TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)
        if cfg.with_reid:
            tracker.encoder = ReIDEncoder
        trackers.append(tracker)

    return trackers


'''=============== used in pipeline.py for Yolov10 =================='''
def yolov10_resnet_tracker(
        weights=ROOT / 'ckpts/best_v10s_width0375_1205.pt',  # model path or triton URL
        source='',  # file/dir/URL/glob/screen/0(webcam)
        save_dir='',
        is_save_img=True,
        is_save_video=True,
        tracker_yaml=ROOT / "tracking/trackers/cfg/botsort.yaml",
        line_thickness=3,  # bounding box thickness (pixels)
        hide_labels=False,  # hide labels
):
    ## load a custom model
    model = YOLOv10(weights)

    custom = {"conf": 0.1, "batch": 1, "save": False, "mode": "predict"}
    kwargs = {"save": True, "imgsz": 640, "conf": 0.1}
    args = {**model.overrides, **custom, **kwargs}
    predictor = model.task_map[model.task]["predictor"](overrides=args, _callbacks=model.callbacks)

    vid_path, vid_writer = None, None
    tracker = init_trackers(tracker_yaml)[0]

    yoloResnetTracker = []
    for i, result in enumerate(predictor.stream_inference(source)):
        datamode = predictor.dataset.mode

        det = result.boxes.data.cpu().numpy()
        im0 = result.orig_img
        names = result.names
        path = result.path
        im_array = result.plot()

        ## to do tracker.update()
        det_tracking = Boxes(det, im0.shape)
        tracks, outfeats = tracker.update(det_tracking, im0)

        if datamode == "video":
            frameId = predictor.dataset.frame
        elif datamode == "image":
            frameId = predictor.dataset.count
        annotator = Annotator(im0.copy(), line_width=line_thickness, example=str(names))

        simdict, simdict1 = {}, {}
        for fid, bid, mfeat, cfeat, features in outfeats:
            if mfeat is not None and cfeat is not None:
                simi = 1 - np.maximum(0.0, cdist(mfeat[None, :], cfeat[None, :], "cosine"))[0][0]
                simdict.update({f"{int(frameId)}_{int(bid)}": simi})

            if cfeat is not None and len(features) >= 2:
                mfeat = features[-2]
                simi = 1 - np.maximum(0.0, cdist(mfeat[None, :], cfeat[None, :], "cosine"))[0][0]
                simdict1.update({f"{int(frameId)}_{int(bid)}": simi})

        if len(tracks) > 0:
            tracks[:, 7] = frameId
            # trackerBoxes = np.concatenate([trackerBoxes, tracks], axis=0)
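            # For reference, each row of `tracks` follows the layout documented in
            # yolo_resnet_tracker() below: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index];
            # tracks[:, 8] is the per-frame box index used as part of the dict keys below.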
            '''================== 1. Store the dets/subimgs/features dict ============='''
            imgs, features = ReIDEncoder.inference(im0, tracks)
            imgdict, featdict = {}, {}
            for ii, bid in enumerate(tracks[:, 8]):
                featdict.update({f"{int(frameId)}_{int(bid)}": features[ii, :]})  # [f"feat_{int(bid)}"] = features[i, :]
                imgdict.update({f"{int(frameId)}_{int(bid)}": imgs[ii]})
            frameDict = {"path": path,
                         "fid": int(frameId),
                         "bboxes": det,
                         "tboxes": tracks,
                         "imgs": imgdict,
                         "feats": featdict,
                         "featsimi": simdict,   # similarity between the current box feature and the track's smooth_feat
                         "featsimi1": simdict1  # similarity between the current box feature and the track's previous box feature
                         }
            yoloResnetTracker.append(frameDict)

            # imgs, features = inference_image(im0, tracks)
            # TrackerFeats = np.concatenate([TrackerFeats, features], axis=0)

            '''================== 2. Extract hand positions ==================='''
            for *xyxy, id, conf, cls, fid, bid in reversed(tracks):
                name = ('' if id == -1 else f'id:{int(id)} ') + names[int(cls)]
                if f"{int(frameId)}_{int(bid)}" in simdict.keys():
                    sim = simdict[f"{int(frameId)}_{int(bid)}"]
                    label = f"{name} {sim:.2f}"
                else:
                    label = None if hide_labels else name
                # label = None if hide_labels else (name if hide_conf else f'{name} {conf:.1f}')

                if id >= 0 and cls == 0:
                    color = colors(int(cls), True)
                elif id >= 0 and cls != 0:
                    color = colors(int(id), True)
                else:
                    color = colors(19, True)  # 19 is the last color in the palette
                annotator.box_label(xyxy, label, color=color)

        '''====== Save results (image and video) ======'''
        # save_path = str(save_dir / Path(path).name)  # with file extension
        im0 = annotator.result()
        if is_save_img:
            save_path_img = str(save_dir / Path(path).stem)
            if datamode == 'image':
                imgpath = save_path_img + ".png"
            if datamode == 'video':
                imgpath = save_path_img + f"_{frameId}.png"
            cv2.imwrite(imgpath, im0)

        # if dataset.mode == 'video' and is_save_video:
        if is_save_video:
            if datamode == 'video':
                video_path = str(save_dir / Path(path).stem) + '.mp4'  # with file extension
            else:
                videoname = str(Path(path).stem).split('_')[0] + '.mp4'
                video_path = str(save_dir / videoname)

            if vid_path != video_path:  # new video
                vid_path = video_path
                vid_cap = predictor.dataset.cap
                if isinstance(vid_writer, cv2.VideoWriter):
                    vid_writer.release()  # release previous video writer
                if vid_cap:  # video
                    fps = vid_cap.get(cv2.CAP_PROP_FPS)
                    w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                else:  # stream
                    fps, w, h = 25, im0.shape[1], im0.shape[0]

                ## for image rotating in dataloader.LoadImages.__next__()
                w, h = im0.shape[1], im0.shape[0]

                video_path = str(Path(video_path).with_suffix('.mp4'))  # force *.mp4 suffix on results videos
                vid_writer = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            vid_writer.write(im0)

    return yoloResnetTracker
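
# A minimal usage sketch of yolov10_resnet_tracker(); the paths below are placeholders only,
# see main_v10() at the bottom of this file for an actual invocation:
#
#     frames = yolov10_resnet_tracker(
#         weights=ROOT / 'ckpts/best_v10s_width0375_1205.pt',
#         source='path/to/video.mp4',          # hypothetical input video
#         save_dir=Path('runs/v10_tracker'),   # hypothetical output directory
#         is_save_img=True,
#         is_save_video=True,
#     )
#     # each element of `frames` is the per-frame dict built above ("bboxes", "tboxes", "imgs", "feats", ...)
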
'''=============== used in pipeline.py for Yolov5 =================='''
@smart_inference_mode()
def yolo_resnet_tracker(
        weights=ROOT / 'yolov5s.pt',  # model path or triton URL
        source=ROOT / 'data/images',  # file/dir/URL/glob/screen/0(webcam)
        save_dir='',
        is_save_img=True,
        is_save_video=True,
        tracker_yaml=ROOT / "tracking/trackers/cfg/botsort.yaml",
        imgsz=(640, 640),  # inference size (height, width)
        conf_thres=0.25,  # confidence threshold
        iou_thres=0.45,  # NMS IOU threshold
        max_det=1000,  # maximum detections per image
        device='',  # cuda device, i.e. 0 or 0,1,2,3 or cpu
        classes=None,  # filter by class: --class 0, or --class 0 2 3
        agnostic_nms=False,  # class-agnostic NMS
        augment=False,  # augmented inference
        line_thickness=3,  # bounding box thickness (pixels)
        hide_labels=False,  # hide labels
        hide_conf=False,  # hide confidences
        half=False,  # use FP16 half-precision inference
        dnn=False,  # use OpenCV DNN for ONNX inference
        vid_stride=1,  # video frame-rate stride
        data=ROOT / 'data/coco128.yaml',  # dataset.yaml path
):
    # source = str(source)

    # Load model
    device = select_device(device)
    model = DetectMultiBackend(weights, device=device, dnn=dnn, data=data, fp16=half)
    stride, names, pt = model.stride, model.names, model.pt
    imgsz = check_img_size(imgsz, s=stride)  # check image size

    # Dataloader
    bs = 1  # batch_size
    dataset = LoadImages(source, img_size=imgsz, stride=stride, auto=pt, vid_stride=vid_stride)
    vid_path, vid_writer = [None] * bs, [None] * bs

    # Run inference
    model.warmup(imgsz=(1 if pt or model.triton else bs, 3, *imgsz))  # warmup

    tracker = init_trackers(tracker_yaml, bs)[0]
    dt = (Profile(), Profile(), Profile())

    # trackerBoxes = np.empty((0, 9), dtype=np.float32)
    yoloResnetTracker = []
    for path, im, im0s, vid_cap, s in dataset:
        with dt[0]:
            im = torch.from_numpy(im).to(model.device)
            im = im.half() if model.fp16 else im.float()  # uint8 to fp16/32
            im /= 255  # 0 - 255 to 0.0 - 1.0
            if len(im.shape) == 3:
                im = im[None]  # expand for batch dim

        # Inference
        with dt[1]:
            # visualize = increment_path(project / Path(path).stem, mkdir=True) if visualize else False
            pred = model(im, augment=augment, visualize=False)

        # NMS
        with dt[2]:
            pred = non_max_suppression(pred, conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det)

        if dataset.mode == "video":
            frameId = dataset.frame
        else:
            frameId = dataset.count

        # Process predictions
        for i, det in enumerate(pred):  # per image
            im0 = im0s.copy()
            annotator = Annotator(im0.copy(), line_width=line_thickness, example=str(names))
            s += '%gx%g ' % im.shape[2:]  # print string
            if len(det):
                # Rescale boxes from img_size to im0 size
                det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im0.shape).round()
                det = det.cpu().numpy()

                ## ================================================================ written by WQG
                '''tracks: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                             0   1   2   3   4         5      6    7            8
                   Here, frame_index can equally be the video frame ID; box_index stays unchanged.
                '''
                det_tracking = Boxes(det, im0.shape).cpu().numpy()
                tracks, outfeats = tracker.update(det_tracking, im0)

                simdict, simdict1 = {}, {}
                for fid, bid, mfeat, cfeat, features in outfeats:
                    if mfeat is not None and cfeat is not None:
                        simi = 1 - np.maximum(0.0, cdist(mfeat[None, :], cfeat[None, :], "cosine"))[0][0]
                        simdict.update({f"{int(frameId)}_{int(bid)}": simi})

                    if cfeat is not None and len(features) >= 2:
                        mfeat = features[-2]
                        simi = 1 - np.maximum(0.0, cdist(mfeat[None, :], cfeat[None, :], "cosine"))[0][0]
                        simdict1.update({f"{int(frameId)}_{int(bid)}": simi})
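                # A note on the similarity values above: scipy's cosine distance is
                # d(a, b) = 1 - a·b / (|a| |b|), so simi = 1 - d(a, b) is the plain cosine
                # similarity; np.maximum(0.0, ·) only guards against tiny negative distances
                # caused by floating-point error.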
                if len(tracks) > 0:
                    tracks[:, 7] = frameId
                    # trackerBoxes = np.concatenate([trackerBoxes, tracks], axis=0)

                    '''================== 1. Store the dets/subimgs/features dict ============='''
                    imgs, features = ReIDEncoder.inference(im0, tracks)
                    imgdict, featdict = {}, {}
                    for ii, bid in enumerate(tracks[:, 8]):
                        featdict.update({f"{int(frameId)}_{int(bid)}": features[ii, :]})  # [f"feat_{int(bid)}"] = features[i, :]
                        imgdict.update({f"{int(frameId)}_{int(bid)}": imgs[ii]})
                    frameDict = {"path": path,
                                 "fid": int(frameId),
                                 "bboxes": det,
                                 "tboxes": tracks,
                                 "imgs": imgdict,
                                 "feats": featdict,
                                 "featsimi": simdict,   # similarity between the current box feature and the track's smooth_feat
                                 "featsimi1": simdict1  # similarity between the current box feature and the track's previous box feature
                                 }
                    yoloResnetTracker.append(frameDict)

                    # imgs, features = inference_image(im0, tracks)
                    # TrackerFeats = np.concatenate([TrackerFeats, features], axis=0)

                '''================== 2. Extract hand positions ==================='''
                for *xyxy, id, conf, cls, fid, bid in reversed(tracks):
                    name = ('' if id == -1 else f'id:{int(id)} ') + names[int(cls)]
                    if f"{int(frameId)}_{int(bid)}" in simdict.keys():
                        sim = simdict[f"{int(frameId)}_{int(bid)}"]
                        label = f"{name} {sim:.2f}"
                    else:
                        label = None if hide_labels else name
                    # label = None if hide_labels else (name if hide_conf else f'{name} {conf:.1f}')

                    if id >= 0 and cls == 0:
                        color = colors(int(cls), True)
                    elif id >= 0 and cls != 0:
                        color = colors(int(id), True)
                    else:
                        color = colors(19, True)  # 19 is the last color in the palette
                    annotator.box_label(xyxy, label, color=color)

            '''====== Save results (image and video) ======'''
            # save_path = str(save_dir / Path(path).name)  # with file extension
            im0 = annotator.result()
            if is_save_img:
                save_path_img = str(save_dir / Path(path).stem)
                if dataset.mode == 'image':
                    imgpath = save_path_img + ".png"
                else:
                    imgpath = save_path_img + f"_{frameId}.png"
                cv2.imwrite(imgpath, im0)

            # if dataset.mode == 'video' and is_save_video:
            if is_save_video:
                if dataset.mode == 'video':
                    video_path = str(save_dir / Path(path).stem) + '.mp4'  # with file extension
                else:
                    videoname = str(Path(path).stem).split('_')[0] + '.mp4'
                    video_path = str(save_dir / videoname)

                if vid_path[i] != video_path:  # new video
                    vid_path[i] = video_path
                    if isinstance(vid_writer[i], cv2.VideoWriter):
                        vid_writer[i].release()  # release previous video writer
                    if vid_cap:  # video
                        fps = vid_cap.get(cv2.CAP_PROP_FPS)
                        w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                        h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                    else:  # stream
                        fps, w, h = 25, im0.shape[1], im0.shape[0]

                    ## for image rotating in dataloader.LoadImages.__next__()
                    w, h = im0.shape[1], im0.shape[0]

                    video_path = str(Path(video_path).with_suffix('.mp4'))  # force *.mp4 suffix on results videos
                    vid_writer[i] = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
                vid_writer[i].write(im0)

        # Print time (inference-only)
        LOGGER.info(f"{s}{'' if len(det) else '(no detections), '}{dt[1].dt * 1E3:.1f}ms")

    return yoloResnetTracker
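
# Both yolov10_resnet_tracker() and yolo_resnet_tracker() above return a list of per-frame dicts.
# The structure below is taken from the frameDict built in the loops above; crops and features
# are keyed by "<frameId>_<boxId>":
#
#     {
#         "path":      source path of the frame,
#         "fid":       frame id,
#         "bboxes":    detections after NMS, (n, 6) [x1, y1, x2, y2, conf, cls],
#         "tboxes":    tracker output, (m, 9) rows in the layout documented above,
#         "imgs":      {"<fid>_<bid>": cropped sub-image, ...},
#         "feats":     {"<fid>_<bid>": ReID feature vector, ...},
#         "featsimi":  {"<fid>_<bid>": similarity to the track's smooth_feat, ...},
#         "featsimi1": {"<fid>_<bid>": similarity to the track's previous box feature, ...},
#     }
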
@smart_inference_mode()
def run(
        weights=ROOT / 'yolov5s.pt',  # model path or triton URL
        source=ROOT / 'data/images',  # file/dir/URL/glob/screen/0(webcam)
        project=ROOT / 'runs/detect',  # save results to project/name
        name='exp',  # save results to project/name
        tracker_yaml=ROOT / "tracking/trackers/cfg/botsort.yaml",
        imgsz=(640, 640),  # inference size (height, width)
        conf_thres=0.25,  # confidence threshold
        iou_thres=0.45,  # NMS IOU threshold
        max_det=1000,  # maximum detections per image
        device='',  # cuda device, i.e. 0 or 0,1,2,3 or cpu
        view_img=False,  # show results
        save_txt=False,  # save results to *.txt
        save_csv=False,  # save results in CSV format
        save_conf=False,  # save confidences in --save-txt labels
        save_crop=False,  # save cropped prediction boxes
        nosave=False,  # do not save images/videos
        classes=None,  # filter by class: --class 0, or --class 0 2 3
        agnostic_nms=False,  # class-agnostic NMS
        augment=False,  # augmented inference
        visualize=False,  # visualize features
        update=False,  # update all models
        exist_ok=False,  # existing project/name ok, do not increment
        line_thickness=3,  # bounding box thickness (pixels)
        hide_labels=False,  # hide labels
        hide_conf=False,  # hide confidences
        half=False,  # use FP16 half-precision inference
        dnn=False,  # use OpenCV DNN for ONNX inference
        vid_stride=1,  # video frame-rate stride
        data=ROOT / 'data/coco128.yaml',  # dataset.yaml path
):
    '''
    source: a single video file or a list of images
    '''
    source = str(source)
    # filename = os.path.split(source)[-1]

    save_img = not nosave and not source.endswith('.txt')  # save inference images
    is_file = Path(source).suffix.lower() in (IMG_FORMATS + VID_FORMATS)
    is_url = source.lower().startswith(('rtsp://', 'rtmp://', 'http://', 'https://'))
    webcam = source.isnumeric() or source.endswith('.streams') or (is_url and not is_file)
    screenshot = source.lower().startswith('screen')
    if is_url and is_file:
        source = check_file(source)  # download

    # spth = source.split('\\')[-2] + "_" + Path(source).stem
    save_dir = Path(project) / Path(source.split('\\')[-2] + "_" + str(Path(source).stem))
    # save_dir = Path(project) / Path(source).stem
    if save_dir.exists():
        print(Path(source).stem)
        # return

        save_dir = increment_path(Path(project) / name, exist_ok=exist_ok)  # increment run
        (save_dir / 'labels' if save_txt else save_dir).mkdir(parents=True, exist_ok=True)  # make dir
    else:
        save_dir.mkdir(parents=True, exist_ok=True)

    # Load model
    device = select_device(device)
    model = DetectMultiBackend(weights, device=device, dnn=dnn, data=data, fp16=half)
    stride, names, pt = model.stride, model.names, model.pt
    imgsz = check_img_size(imgsz, s=stride)  # check image size

    # Dataloader
    bs = 1  # batch_size
    dataset = LoadImages(source, img_size=imgsz, stride=stride, auto=pt, vid_stride=vid_stride)
    vid_path, vid_writer = [None] * bs, [None] * bs

    # Run inference
    model.warmup(imgsz=(1 if pt or model.triton else bs, 3, *imgsz))  # warmup
    seen, dt = 0, (Profile(), Profile(), Profile())

    tracker = init_trackers(tracker_yaml, bs)[0]
    handpose = hand_pose()
    handlocals_dict = {}

    boxes_and_imgs = []
    BoxesFeats = []
    track_boxes = np.empty((0, 9), dtype=np.float32)
    det_boxes = np.empty((0, 9), dtype=np.float32)
    DetBoxes = np.empty((0, 6), dtype=np.float32)
    TrackerBoxes = np.empty((0, 9), dtype=np.float32)
    TrackerFeats = np.empty((0, 256), dtype=np.float32)
    features_dict = {}
    TracksDict = {}
    for path, im, im0s, vid_cap, s in dataset:
        if save_img and 'imgshow' not in locals().keys():
            imgshow = im0s.copy()

        ## ============================= the tracking feature only processes video, written by WQG
        # if dataset.mode == 'image':
        #     continue

        with dt[0]:
            im = torch.from_numpy(im).to(model.device)
            im = im.half() if model.fp16 else im.float()  # uint8 to fp16/32
            im /= 255  # 0 - 255 to 0.0 - 1.0
            if len(im.shape) == 3:
                im = im[None]  # expand for batch dim

        # Inference
        with dt[1]:
            visualize = increment_path(save_dir / Path(path).stem, mkdir=True) if visualize else False
            pred = model(im, augment=augment, visualize=visualize)

        # NMS
        with dt[2]:
            pred = non_max_suppression(pred, conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det)
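        # After NMS, `pred` is a list with one tensor per image of shape (n, 6):
        # [x1, y1, x2, y2, conf, cls], in pixel coordinates of the letterboxed input `im`.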
        # Process predictions
        for i, det in enumerate(pred):  # per image
            seen += 1
            if webcam:  # batch_size >= 1
                p, im0, frame = path[i], im0s[i].copy(), dataset.count
                s += f'{i}: '
            else:
                p, im0, frame = path, im0s.copy(), getattr(dataset, 'frame', 0)

            s += '%gx%g ' % im.shape[2:]  # print string
            # im0_ant = im0.copy()
            annotator = Annotator(im0.copy(), line_width=line_thickness, example=str(names))

            nd = len(det)
            if nd:
                # Rescale boxes from img_size to im0 size
                det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im0.shape).round()
                # det = det.cpu().numpy()

            ## ============================================================ assign features of identical boxes across consecutive frames
            # def static_estimate(box1, box2, TH1=8, TH2=12):
            #     dij_abs = max(np.abs(box1 - box2))
            #     dij_euc = max([np.linalg.norm((box1[:2] - box2[:2])),
            #                    np.linalg.norm((box1[2:4] - box2[2:4]))
            #                    ])
            #     if dij_abs < TH1 and dij_euc < TH2:
            #         return True
            #     else:
            #         return False

            # nw = 3                    # window size for looking back over previous frames
            # nf = len(BoxesFeats)      # number of frames already detected + feature-extracted
            # feat_curr = [None] * nd   # nd: number of boxes detected in the current frame
            # for ii in range(nd):
            #     box = det[ii, :4]
            #     kk = 1
            #     feat = None
            #     while kk <= nw and nf >= kk:
            #         ki = -1 * kk
            #         boxes_ = BoxesFeats[ki][0]
            #         feats_ = BoxesFeats[ki][1]
            #         flag = [jj for jj in range(len(boxes_)) if static_estimate(box, boxes_[jj, :4])]
            #         if len(flag) == 1:
            #             feat = feats_[flag[0]]
            #             break
            #         kk += 1
            #     if feat is not None:
            #         feat_curr[ii] = feat

            ## ================================================================ written by WQG
            '''tracks: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                         0   1   2   3   4         5      6    7            8
               Here, frame_index can equally be the video frame ID; box_index stays unchanged.
            '''
            det_tracking = Boxes(det, im0.shape).cpu().numpy()
            tracks, outfeats = tracker.update(det_tracking, im0)
            if len(tracks) == 0:
                continue

            if dataset.mode == "video":
                frameId = dataset.frame
            else:
                frameId = dataset.count
            tracks[:, 7] = frameId

            '''================== 1. Store the dets/subimgs/features dict ============='''
            # imgs, features = inference_image(im0, tracks)
            imgs, features = ReIDEncoder.inference(im0, tracks)
            TrackerFeats = np.concatenate([TrackerFeats, features], axis=0)

            imgdict = {}
            boxdict = {}
            featdict = {}
            for ii, bid in enumerate(tracks[:, 8]):
                imgdict.update({int(bid): imgs[ii]})          # [f"img_{int(bid)}"] = imgs[i]
                boxdict.update({int(bid): tracks[ii, :]})     # [f"box_{int(bid)}"] = tracks[i, :]
                featdict.update({int(bid): features[ii, :]})  # [f"feat_{int(bid)}"] = features[i, :]
            TracksDict[f"frame_{int(frameId)}"] = {"imgs": imgdict, "boxes": boxdict, "feats": featdict}

            track_boxes = np.concatenate([track_boxes, tracks], axis=0)
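            # TracksDict now maps "frame_<fid>" -> {"imgs": {bid: crop}, "boxes": {bid: track row},
            # "feats": {bid: 256-d ReID feature}}; a flat "TrackBoxes" array is added after the
            # loop, and everything is pickled in step 2 below.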
            '''================== 2. Extract hand positions ==================='''
            # idx_0 = tracks[:, 6].astype(np.int_) == 0
            # hn = 0
            # for j, index in enumerate(idx_0):
            #     if index:
            #         track = tracks[j, :]
            #         hand_local, imgshow = handpose.get_hand_local(track, im0)
            #         handlocals_dict.update({int(track[7]): {int(track[8]): hand_local}})
            #         # '''The recall of YOLOv5 and of the hand detector is not consistent; replacing the hand
            #         #    (x1, y1, x2, y2) in tracks with hand_local would mix the two coordinate conventions.'''
            #         # if hand_local: tracks[j, :4] = hand_local
            #         hn += 1
            #         cv2.imwrite(f"D:\DeepLearning\yolov5\hands\images\{Path(source).stem}_{int(track[7])}_{hn}.png", imgshow)

            for *xyxy, id, conf, cls, fid, bid in reversed(tracks):
                name = ('' if id == -1 else f'id:{int(id)} ') + names[int(cls)]
                label = None if hide_labels else (name if hide_conf else f'{name} {conf:.2f}')

                if id >= 0 and cls == 0:
                    color = colors(int(cls), True)
                elif id >= 0 and cls != 0:
                    color = colors(int(id), True)
                else:
                    color = colors(19, True)  # 19 is the last color in the palette
                annotator.box_label(xyxy, label, color=color)

            # Save results (image and video with tracking)
            im0 = annotator.result()
            p = Path(p)  # to Path
            save_path = str(save_dir / p.name)  # im.jpg
            if save_img:
                save_path_img, ext = os.path.splitext(save_path)
                if dataset.mode == 'image':
                    imgpath = save_path_img + ".png"
                else:
                    imgpath = save_path_img + f"_{frameId}.png"
                cv2.imwrite(imgpath, im0)

                if dataset.mode == 'video':
                    if vid_path[i] != save_path:  # new video
                        vid_path[i] = save_path
                        if isinstance(vid_writer[i], cv2.VideoWriter):
                            vid_writer[i].release()  # release previous video writer
                        if vid_cap:  # video
                            fps = vid_cap.get(cv2.CAP_PROP_FPS)
                            w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                            h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                        else:  # stream
                            fps, w, h = 30, im0.shape[1], im0.shape[0]

                        ## for image rotating in dataloader.LoadImages.__next__()
                        w, h = im0.shape[1], im0.shape[0]

                        save_path = str(Path(save_path).with_suffix('.mp4'))  # force *.mp4 suffix on results videos
                        vid_writer[i] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
                    vid_writer[i].write(im0)

        # Print time (inference-only)
        LOGGER.info(f"{s}{'' if len(det) else '(no detections), '}{dt[1].dt * 1E3:.1f}ms")

    if track_boxes.size == 0:
        return

    ## ======================================================================== written by WQG
    ## track_boxes: Array, [x1, y1, x2, y2, track_id, score, cls, frame_index, box_id]
    TracksDict.update({"TrackBoxes": track_boxes})

    '''Detection results were saved above as images and video; the blocks below save additional data types.'''
    filename = os.path.split(save_path_img)[-1]

    '''======================== 1. save in './run/detect/' ===================='''
    if source.find("front") >= 0 or Path(source).stem.split('_')[0] == '1':
        carttemp = cv2.imread("./tracking/shopcart/cart_tempt/board_ftmp_line.png")
    else:
        carttemp = cv2.imread("./tracking/shopcart/cart_tempt/edgeline.png")
    imgshow = drawtracks(track_boxes, carttemp)
    showpath_1 = save_path_img + "_show.png"
    cv2.imwrite(showpath_1, imgshow)

    '''======================== 2. save dets/subimgs/features Dict =================='''
    trackdicts_dir = Path('./tracking/data/trackdicts/')
    if not trackdicts_dir.exists():
        trackdicts_dir.mkdir(parents=True, exist_ok=True)
    trackdicts_dir = trackdicts_dir.joinpath(f'{filename}.pkl')
    with open(trackdicts_dir, 'wb') as file:
        pickle.dump(TracksDict, file)
    # np.save(f'{filename}.npy', DetBoxes)
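    # The pickle written above can be read back for offline analysis, e.g.:
    #     with open(trackdicts_dir, 'rb') as f:
    #         saved = pickle.load(f)
    #     all_boxes = saved["TrackBoxes"]      # (N, 9) array of every track box
    #     frame_dict = saved[f"frame_{fid}"]   # per-frame {"imgs", "boxes", "feats"}, fid from all_boxes[:, 7]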
    '''======================== 3. save hand_local data =================='''
    # handlocal_dir = Path('./tracking/data/handlocal/')
    # if not handlocal_dir.exists():
    #     handlocal_dir.mkdir(parents=True, exist_ok=True)
    # handlocal_path = handlocal_dir.joinpath(f'{filename}.pkl')
    # with open(handlocal_path, 'wb') as file:
    #     pickle.dump(handlocals_dict, file)

    # Print results
    t = tuple(x.t / seen * 1E3 for x in dt)  # speeds per image
    LOGGER.info(f'Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {(1, 3, *imgsz)}' % t)
    if save_txt or save_img:
        s = f"\n{len(list(save_dir.glob('labels/*.txt')))} labels saved to {save_dir / 'labels'}" if save_txt else ''
        LOGGER.info(f"Results saved to {colorstr('bold', save_dir)}{s}")
    if update:
        strip_optimizer(weights[0])  # update model (to fix SourceChangeWarning)
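
# run() can also be driven programmatically instead of via the CLI; main_v5() below does exactly
# that by mutating the argparse defaults. A minimal sketch (the paths are placeholders):
#
#     opt = parse_opt()
#     optdict = vars(opt)
#     optdict["source"] = "path/to/video.mp4"   # hypothetical input
#     optdict["project"] = "runs/detect"
#     run(**optdict)
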
def parse_opt():
    modelpath = ROOT / 'ckpts/best_cls10_0906.pt'  # 'ckpts/best_15000_0908.pt', 'ckpts/yolov5s.pt', 'ckpts/best_20000_cls30.pt, best_yolov5m_250000'

    '''datapath is a single video file or a directory of videos'''
    datapath = r"D:/datasets/ym/videos/标记视频/"  # ROOT/'data/videos', ROOT/'data/images' images
    # datapath = r"D:\datasets\ym\highvalue\videos"
    # datapath = r"D:/dcheng/videos/"
    # modelpath = ROOT / 'ckpts/yolov5s.pt'

    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', nargs='+', type=str, default=modelpath, help='model path or triton URL')  # 'yolov5s.pt', best_15000_0908.pt
    parser.add_argument('--source', type=str, default=datapath, help='file/dir/URL/glob/screen/0(webcam)')  # images, videos
    parser.add_argument('--data', type=str, default=ROOT / 'data/coco128.yaml', help='(optional) dataset.yaml path')
    parser.add_argument('--imgsz', '--img', '--img-size', nargs='+', type=int, default=[640], help='inference size h,w')
    parser.add_argument('--conf-thres', type=float, default=0.25, help='confidence threshold')
    parser.add_argument('--iou-thres', type=float, default=0.45, help='NMS IoU threshold')
    parser.add_argument('--max-det', type=int, default=1000, help='maximum detections per image')
    parser.add_argument('--device', default='', help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
    parser.add_argument('--view-img', action='store_true', help='show results')
    parser.add_argument('--save-txt', action='store_true', help='save results to *.txt')
    parser.add_argument('--save-csv', action='store_true', help='save results in CSV format')
    parser.add_argument('--save-conf', action='store_true', help='save confidences in --save-txt labels')
    parser.add_argument('--save-crop', action='store_true', help='save cropped prediction boxes')
    parser.add_argument('--nosave', action='store_true', help='do not save images/videos')
    parser.add_argument('--classes', nargs='+', type=int, help='filter by class: --classes 0, or --classes 0 2 3')
    parser.add_argument('--agnostic-nms', action='store_true', help='class-agnostic NMS')
    parser.add_argument('--augment', action='store_true', help='augmented inference')
    parser.add_argument('--visualize', action='store_true', help='visualize features')
    parser.add_argument('--update', action='store_true', help='update all models')
    parser.add_argument('--project', default=ROOT / 'runs/detect', help='save results to project/name')
    parser.add_argument('--name', default='exp', help='save results to project/name')
    parser.add_argument('--exist-ok', action='store_true', help='existing project/name ok, do not increment')
    parser.add_argument('--line-thickness', default=3, type=int, help='bounding box thickness (pixels)')
    parser.add_argument('--hide-labels', default=False, action='store_true', help='hide labels')
    parser.add_argument('--hide-conf', default=False, action='store_true', help='hide confidences')
    parser.add_argument('--half', action='store_true', help='use FP16 half-precision inference')
    parser.add_argument('--dnn', action='store_true', help='use OpenCV DNN for ONNX inference')
    parser.add_argument('--vid-stride', type=int, default=1, help='video frame-rate stride')
    opt = parser.parse_args()
    opt.imgsz *= 2 if len(opt.imgsz) == 1 else 1  # expand
    print_args(vars(opt))
    return opt


def find_video_imgs(root_dir):
    all_files = []
    extensions = ['.mp4']
    for dirpath, dirnames, filenames in os.walk(root_dir):
        for filename in filenames:
            file, ext = os.path.splitext(filename)
            if ext in IMG_FORMATS + VID_FORMATS:
                all_files.append(os.path.join(dirpath, filename))
    return all_files


def main_v5():
    '''
    run(): inference on a single image or a single video file; image sequences are not supported.
    '''
    check_requirements(ROOT / 'requirements.txt', exclude=('tensorboard', 'thop'))
    opt = parse_opt()
    optdict = vars(opt)

    # p = r"D:\datasets\ym\永辉测试数据_比对"
    # p = r"D:\datasets\ym\广告板遮挡测试\8"
    # p = r"D:\datasets\ym\videos\标记视频"
    # p = r"D:\datasets\ym\实验室测试"
    # p = r"D:\datasets\ym\永辉双摄视频\新建文件夹"
    # p = r"\\192.168.1.28\share\测试_202406\0723\0723_2\20240723-112522_"
    # p = r"D:\datasets\ym\联华中环"
    # p = r"D:\exhibition\images\153112511_0_seek_105.mp4"
    # p = r"D:\exhibition\images\image"
    p = r"D:\datasets\ym\后台数据\unzip\20250310-175352-741"

    optdict["project"] = r"D:\work\result"
    optdict["weights"] = ROOT / 'ckpts/best_cls10_0906.pt'

    if os.path.isdir(p):
        files = find_video_imgs(p)
        k = 0
        for file in files:
            optdict["source"] = file
            run(**optdict)

            k += 1
            if k == 2:
                break
    elif os.path.isfile(p):
        optdict["source"] = p
        run(**optdict)


def main_v10():
    datapath = r'D:\datasets\ym\后台数据\unzip\20250310-175352-741\0.mp4'
    savepath = r'D:\work\result'
    savepath = Path(savepath) / Path(datapath).stem
    if not savepath.exists():
        savepath.mkdir(parents=True, exist_ok=True)

    weightpath = ROOT / 'ckpts/best_v10s_width0375_1205.pt'

    optdict = {}
    optdict["weights"] = weightpath
    optdict["source"] = datapath
    optdict["save_dir"] = savepath
    optdict["is_save_img"] = True
    optdict["is_save_video"] = True

    yrtOut = yolov10_resnet_tracker(**optdict)


if __name__ == '__main__':
    # main_v5()
    main_v10()