# Repository viewer header (kept as comments so the listing stays valid Python):
#   File:    detecttracking/pipeline.py
#   Commit:  王庆刚 661489120b 20240102
#   Date:    2025-01-02 18:23:16 +08:00
#   Size:    295 lines, 9.7 KiB, Python
#
# Viewer warning: this file contains Unicode characters that might be confused
# with other characters. If this is intentional, the warning can be ignored.


# -*- coding: utf-8 -*-
"""
Created on Sun Sep 29 08:59:21 2024
@author: ym
"""
import os
import sys
import cv2
import pickle
import argparse
import numpy as np
from pathlib import Path
from track_reid import parse_opt
from track_reid import yolo_resnet_tracker
# FILE = Path(__file__).resolve()
# ROOT = FILE.parents[0] # YOLOv5 root directory
# if str(ROOT) not in sys.path:
# sys.path.append(str(ROOT)) # add ROOT to PATH
# ROOT = Path(os.path.relpath(ROOT, Path.cwd())) # relative
from tracking.dotrack.dotracks_back import doBackTracks
from tracking.dotrack.dotracks_front import doFrontTracks
from tracking.utils.drawtracks import plot_frameID_y2, draw_all_trajectories
from utils.getsource import get_image_pairs, get_video_pairs
def get_interbcd_inputenents(
        bcdpath=r"\\192.168.1.28\share\测试_202406\contrast\std_barcodes_2192",
        eventpath=r"\\192.168.1.28\share\测试_202406\0918",
        ):
    """Collect event folders whose trailing name field is a known barcode.

    Args:
        bcdpath: Directory of standard-feature files. The stem of every entry
            (file name without extension) is treated as a barcode.
        eventpath: Root directory that is walked recursively for event folders.

    Returns:
        A list of directory paths found anywhere below ``eventpath`` whose
        name, split on ``'_'``, ends with one of the barcodes.
    """
    # A set keeps the membership test O(1) for each directory visited below.
    barcodes = {os.path.splitext(featname)[0] for featname in os.listdir(bcdpath)}

    input_enents = []
    for root, dirs, _files in os.walk(eventpath):
        input_enents.extend(
            os.path.join(root, d) for d in dirs if d.split('_')[-1] in barcodes
        )
    return input_enents
def pipeline(
        eventpath,
        savepath='',
        SourceType="image",  # video
        stdfeat_path=None
        ):
    """Run detection, tracking and trajectory drawing for one shopping event.

    For every source returned for the event, ``yolo_resnet_tracker`` is run,
    its per-frame output is merged, and the merged boxes/features are passed to
    ``doBackTracks`` or ``doFrontTracks`` depending on the camera type derived
    from the source file name. The resulting event dictionary is pickled to
    ``<savepath>/pickfile/<event name>.pkl`` and a trajectory overview image is
    written into ``<savepath>/<event name>/``.

    Args:
        eventpath: Storage path of a single event.
        savepath: Root directory for results. When empty, an ``evtresult``
            folder next to this file is used.
        SourceType: ``"image"`` or ``"video"``; selects how the event sources
            are enumerated.
        stdfeat_path: Accepted for call compatibility; not used by this
            function.

    Raises:
        ValueError: If ``SourceType`` is neither ``"image"`` nor ``"video"``.
        FileNotFoundError: If a trajectory background image cannot be read.
    """
    # Validate up front: previously an unknown SourceType left ``vpaths``
    # unbound and surfaced later as an UnboundLocalError.
    if SourceType == "video":
        vpaths = get_video_pairs(eventpath)
    elif SourceType == "image":
        vpaths = get_image_pairs(eventpath)
    else:
        raise ValueError(
            f"Unsupported SourceType {SourceType!r}; expected 'image' or 'video'"
        )

    # ======== Parameter dictionary for yolo_resnet_tracker() ========
    opt = parse_opt()
    optdict = vars(opt)
    optdict["weights"] = r'D:\DetectTracking\ckpts\best_cls10_0906.pt'
    optdict["is_save_img"] = True
    optdict["is_save_video"] = True

    event_tracks = []

    # Build the shopping-event dictionary.
    evtname = Path(eventpath).stem
    barcode = _parse_event_barcode(evtname)

    # Result folders for this event.
    if not savepath:
        savepath = Path(__file__).resolve().parents[0] / "evtresult"
    save_dir_event = Path(savepath) / evtname
    pickpath = Path(savepath) / "pickfile"
    pickpath.mkdir(parents=True, exist_ok=True)

    ShoppingDict = {"eventPath": eventpath,
                    "eventName": evtname,
                    "barcode": barcode,
                    "eventType": '',  # "input", "output", "other"
                    "frontCamera": {},
                    "backCamera": {}}

    for vpath in vpaths:
        # Per-camera event dictionary.
        CameraEvent = {"cameraType": '',  # "front", "back"
                       "videoPath": '',
                       "imagePaths": [],
                       "yoloResnetTracker": [],
                       "tracking": [],
                       }

        # A list is a sequence of image paths; anything else is one video path.
        if isinstance(vpath, list):
            CameraEvent["imagePaths"] = vpath
            bname = os.path.basename(vpath[0])
            save_dir_video = save_dir_event / Path("images")
        else:
            CameraEvent["videoPath"] = vpath
            bname = os.path.basename(vpath)
            save_dir_video = save_dir_event / Path(str(Path(vpath).stem))
        CameraEvent["cameraType"] = _camera_type_from_name(bname)
        save_dir_video.mkdir(parents=True, exist_ok=True)

        # Yolo + Resnet + Tracker
        optdict["source"] = vpath
        optdict["save_dir"] = save_dir_video
        yrtOut = yolo_resnet_tracker(**optdict)
        CameraEvent["yoloResnetTracker"] = yrtOut

        trackerboxes, trackefeats = _collect_tracker_output(yrtOut)

        # Tracking
        if CameraEvent["cameraType"] == "back":
            vts = doBackTracks(trackerboxes, trackefeats)
            vts.classify()
            event_tracks.append(("back", vts))
            CameraEvent["tracking"] = vts
            ShoppingDict["backCamera"] = CameraEvent

        if CameraEvent["cameraType"] == "front":
            vts = doFrontTracks(trackerboxes, trackefeats)
            vts.classify()
            event_tracks.append(("front", vts))
            CameraEvent["tracking"] = vts
            ShoppingDict["frontCamera"] = CameraEvent

    # Persist the event dictionary before drawing, so a drawing failure does
    # not lose the tracking result.
    pf_path = Path(pickpath) / Path(str(evtname) + ".pkl")
    with open(str(pf_path), 'wb') as f:
        pickle.dump(ShoppingDict, f)

    # Trajectory display module.
    _save_trajectory_figures(event_tracks, save_dir_event)


def _parse_event_barcode(evtname):
    """Return the last '_' field of the event name if it is a barcode, else ''."""
    fields = evtname.split('_')
    candidate = fields[-1]
    if len(fields) >= 2 and len(candidate) >= 8 and candidate.isdigit():
        return candidate
    return ''


def _camera_type_from_name(bname):
    """Map a source base name to 'back', 'front' or '' (front wins if both match)."""
    prefix = bname.split('_')[0]
    camera_type = ''
    if prefix == "0" or 'back' in bname:
        camera_type = "back"
    if prefix == "1" or 'front' in bname:
        camera_type = "front"
    return camera_type


def _collect_tracker_output(yrtOut):
    """Merge per-frame tracker output into one box array and one feature dict."""
    # Concatenate once at the end instead of once per frame, which re-copied
    # the accumulated array on every iteration.
    chunks = [np.empty((0, 9), dtype=np.float64)]
    trackefeats = {}
    for frameDict in yrtOut:
        tboxes = frameDict["tboxes"]
        ffeats = frameDict["feats"]
        chunks.append(np.array(tboxes))
        for box in tboxes:
            # Columns 7 and 8 are presumably frame id and box id (per the
            # original variable names) -- TODO confirm against the tracker.
            fid, bid = int(box[7]), int(box[8])
            key = f"{fid}_{bid}"
            trackefeats[key] = ffeats[key]
    return np.concatenate(chunks, axis=0), trackefeats


def _load_half_size_image(imgpath):
    """Read an image and shrink it to half its width and height."""
    image = cv2.imread(imgpath)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Cannot read trajectory background image: {imgpath}")
    h, w = image.shape[:2]
    return cv2.resize(image, (w // 2, h // 2), interpolation=cv2.INTER_AREA)


def _save_trajectory_figures(event_tracks, save_dir_event):
    """Draw front/back trajectories and write the combined 'traj.png'."""
    illus = [None, None]  # slot 0: front camera, slot 1: back camera
    for CamerType, vts in event_tracks:
        if len(vts.tracks) == 0:
            continue

        if CamerType == 'front':
            edgeline = _load_half_size_image(
                "./tracking/shopcart/cart_tempt/board_ftmp_line.png")
            illus[0] = draw_all_trajectories(
                vts, edgeline, save_dir_event, CamerType, draw5p=True)

            # NOTE(review): the returned object only needs a savefig() method
            # here; whether it must also be closed is not visible from this file.
            plt = plot_frameID_y2(vts)
            plt.savefig(os.path.join(save_dir_event, "front_y2.png"))

        if CamerType == 'back':
            edgeline = _load_half_size_image(
                "./tracking/shopcart/cart_tempt/edgeline.png")
            illus[1] = draw_all_trajectories(
                vts, edgeline, save_dir_event, CamerType, draw5p=True)

    illus = [im for im in illus if im is not None]
    if not illus:
        return

    img_cat = np.concatenate(illus, axis=1)
    if len(illus) == 2:
        # Vertical separator between the two side-by-side views.
        H, W = img_cat.shape[:2]
        cv2.line(img_cat, (int(W/2), 0), (int(W/2), int(H)), (128, 128, 255), 3)

    trajpath = os.path.join(save_dir_event, "traj.png")
    cv2.imwrite(trajpath, img_cat)
def main_loop(
        bcdpath=r"\\192.168.1.28\share\测试_202406\contrast\std_barcodes_2192",
        eventpath=r"\\192.168.1.28\share\测试_202406\0918\images1",
        savepath=r"D:\contrast\detect",
        SourceType="image",  # video, image
        ):
    """Run pipeline() on every entry of an event folder.

    Each entry of ``eventpath`` is paired with the standard-feature file of its
    barcode when one exists in ``bcdpath``, otherwise with ``None``.

    Args:
        bcdpath: Directory holding ``<barcode>.pickle`` standard-feature files.
        eventpath: Directory whose entries are the events to process.
        savepath: Result root directory forwarded to pipeline().
        SourceType: ``"image"`` or ``"video"``, forwarded to pipeline().
    """
    # 1. Collect the barcodes that have a standard-feature file.
    # NOTE(review): this requires more than 8 digits, while pipeline() accepts
    # barcodes of length >= 8 -- confirm which bound is intended.
    barcodes = set()
    for featname in os.listdir(bcdpath):
        barcode, ext = os.path.splitext(featname)
        if barcode.isdigit() and len(barcode) > 8 and ext == ".pickle":
            barcodes.add(barcode)

    # 2. Build (event path, standard-feature path) pairs.
    input_enents = []
    for filename in os.listdir(eventpath):
        # The barcode is the last '_' separated field of the event folder name.
        bcd = filename.split('_')[-1]
        stdfeat_path = os.path.join(bcdpath, f"{bcd}.pickle") if bcd in barcodes else None
        input_enents.append((os.path.join(eventpath, filename), stdfeat_path))

    parmDict = {"SourceType": SourceType, "savepath": savepath}
    for event_path, stdfeat_path in input_enents:
        parmDict["eventpath"] = event_path
        parmDict["stdfeat_path"] = stdfeat_path
        pipeline(**parmDict)
def main(
        evtdir=r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\images",
        savepath=r"D:\contrast\202412测试",
        SourceType="video",  # video, image
        ):
    """Run pipeline() on every sub-directory of an event folder.

    Events whose processing raises are skipped; their paths are written, one
    per line, to ``error_events.txt`` inside ``savepath``.

    Args:
        evtdir: Directory whose sub-directories are the events to process.
        savepath: Result root directory forwarded to pipeline(); also receives
            the error list.
        SourceType: ``"image"`` or ``"video"``, forwarded to pipeline().
    """
    evtdir = Path(evtdir)

    parmDict = {}
    parmDict["savepath"] = savepath
    parmDict["SourceType"] = SourceType
    parmDict["stdfeat_path"] = None

    errEvents = []
    for item in evtdir.iterdir():
        if not item.is_dir():
            continue
        parmDict["eventpath"] = item
        try:
            pipeline(**parmDict)
        except Exception as e:
            # Best-effort batch run: keep going, but leave a trace of the cause
            # instead of discarding it silently.
            print(f"pipeline failed for {item}: {e!r}", file=sys.stderr)
            errEvents.append(str(item))

    # The result directory may not exist yet (e.g. no event was processed
    # successfully), so create it before writing the error list.
    Path(parmDict["savepath"]).mkdir(parents=True, exist_ok=True)
    errfile = os.path.join(parmDict["savepath"], 'error_events.txt')
    with open(errfile, 'w', encoding='utf-8') as f:
        f.writelines(line + '\n' for line in errEvents)
# Run the batch entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()