Files
detecttracking/contrast/utils/event.py
王庆刚 9b5b135fa3 20250313
2025-03-13 15:36:29 +08:00

545 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Tue Nov 26 17:35:05 2024
@author: ym
"""
import os
import cv2
import pickle
import numpy as np
from pathlib import Path
import sys
sys.path.append(r"D:\DetectTracking")
from tracking.utils.plotting import Annotator, colors
from tracking.utils.drawtracks import drawTrack
from tracking.utils.read_data import extract_data, read_tracking_output, read_similar
from tracking.utils.read_data import extract_data_realtime, read_tracking_output_realtime
# import platform
# import pathlib
# plt = platform.system()
IMG_FORMAT = ['.bmp', '.jpg', '.jpeg', '.png']
VID_FORMAT = ['.mp4', '.avi']
def save_data(event, resultPath=None):
'''事件轨迹子图保存'''
if resultPath is None:
resultPath = os.path.dirname(os.path.abspath(__file__))
subimgpath = os.path.join(resultPath, f"{event.evtname}", "subimg")
imgspath = os.path.join(resultPath, f"{event.evtname}", "imgs")
if not os.path.exists(subimgpath):
os.makedirs(subimgpath)
if not os.path.exists(imgspath):
os.makedirs(imgspath)
##(2) 保存轨迹中的子图
subimgpairs = event.save_event_subimg(subimgpath)
for subimgName, subimg in subimgpairs:
spath = os.path.join(subimgpath, subimgName)
cv2.imwrite(spath, subimg)
##(3) 保存序列图像
imgpairs = event.plot_save_image(imgspath)
for imgname, img in imgpairs:
spath = os.path.join(imgspath, imgname)
cv2.imwrite(spath, img)
##(4) 保存轨迹散点图
img_cat = event.draw_tracks()
trajpath = os.path.join(resultPath, "trajectory")
if not os.path.exists(trajpath):
os.makedirs(trajpath)
traj_imgpath = os.path.join(trajpath, event.evtname+".png")
cv2.imwrite(traj_imgpath, img_cat)
def array2list(bboxes):
'''
将 bboxes 变换为 track 列表
bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
Return
lboxes列表列表中元素具有同一 track_idx1y1x2y2 格式
[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
'''
lboxes = []
if len(bboxes)==0:
return []
trackID = np.unique(bboxes[:, 4].astype(int))
track_ids = bboxes[:, 4].astype(int)
for t_id in trackID:
idx = np.where(track_ids == t_id)[0]
box = bboxes[idx, :]
lboxes.append(box)
return lboxes
class ShoppingEvent:
def __init__(self, eventpath, stype="data"):
'''stype: str, 'source', 'data', 'realtime', 共三种
source: 前后摄视频经 pipeline 生成的文件
data: 基于事件切分的原 data 文件版本
realtime: 全实时生成的 data 文件
'''
self.eventpath = eventpath
self.evtname = str(Path(eventpath).stem)
self.barcode = ''
self.evtType = ''
'''=========== path of image and video =========== '''
self.back_videopath = ''
self.front_videopath = ''
self.back_imgpaths = []
self.front_imgpaths = []
'''=========== process.data ==============================='''
self.one2one = None
self.one2n = None
self.one2SN = None
'''=========== 0/1_track.data ============================='''
self.back_yolobboxes = []
self.back_yolofeats = []
self.back_trackerboxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
self.back_trackerfeats = {}
self.back_trackingboxes = []
self.back_trackingfeats = []
self.front_yolobboxes = []
self.front_yolofeats = []
self.front_trackerboxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
self.front_trackerfeats = {}
self.front_trackingboxes = []
self.front_trackingfeats = []
'''=========== 0/1_tracking_output.data ==================='''
self.back_boxes = []
self.back_feats = []
self.front_boxes = []
self.front_feats = []
if stype=="data":
self.from_datafile(eventpath)
if stype=="realtime":
self.from_realtime_datafile(eventpath)
if stype=="source":
self.from_source_pkl(eventpath)
self.feats_select = np.empty((0, 256), dtype=np.float64)
self.feats_compose = np.empty((0, 256), dtype=np.float64)
self.select_feats()
self.compose_feats()
# if stype=="image":
# self.from_image(eventpath)
def kerndata(self, ShoppingDict, camtype="backCamera"):
'''
camtype: str, "backCamera" or "frontCamera"
'''
yoloboxes, resfeats = [], []
trackerboxes = np.empty((0, 9), dtype=np.float64)
trackefeats = {}
trackingboxes, trackingfeats = [], []
frameDictList = ShoppingDict[camtype]["yoloResnetTracker"]
for frameDict in frameDictList:
yoloboxes.append(frameDict["bboxes"])
tboxes = frameDict["tboxes"]
trackefeats.update(frameDict["feats"])
trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)), axis=0)
Residual = ShoppingDict[camtype]["tracking"].Residual
for track in Residual:
trackingboxes.append(track.boxes)
trackingfeats.append(track.features)
kdata = (yoloboxes, resfeats, trackerboxes, trackefeats, trackingboxes, trackingfeats)
tracking_out_boxes, tracking_out_feats = [], []
Confirmed = ShoppingDict[camtype]["tracking"].Confirmed
for track in Confirmed:
tracking_out_boxes.append(track.boxes)
tracking_out_feats.append(track.features)
outdata = (tracking_out_boxes, tracking_out_feats)
return kdata, outdata
def from_source_pkl(self, eventpath):
# if plt == 'Windows':
# pathlib.PosixPath = pathlib.WindowsPath
with open(eventpath, 'rb') as f:
ShoppingDict = pickle.load(f)
self.eventpath = ShoppingDict["eventPath"]
self.evtname = ShoppingDict["eventName"]
self.barcode = ShoppingDict["barcode"]
if len(ShoppingDict["one2n"]):
self.one2n = ShoppingDict["one2n"]
'''=========== path of image and video =========== '''
self.back_videopath = ShoppingDict["backCamera"]["videoPath"]
self.front_videopath = ShoppingDict["frontCamera"]["videoPath"]
self.back_imgpaths = ShoppingDict["backCamera"]["imagePaths"]
self.front_imgpaths = ShoppingDict["frontCamera"]["imagePaths"]
'''===========对应于 0/1_track.data ============================='''
backdata, back_outdata = self.kerndata(ShoppingDict, "backCamera")
frontdata, front_outdata = self.kerndata(ShoppingDict, "frontCamera")
self.back_yolobboxes = backdata[0]
self.back_yolofeats = backdata[1]
self.back_trackerboxes = backdata[2]
self.back_trackerfeats = [3]
self.back_trackingboxes = [4]
self.back_trackingfeats = [5]
self.front_yolobboxes = frontdata[0]
self.front_yolofeats = frontdata[1]
self.front_trackerboxes = frontdata[2]
self.front_trackerfeats = frontdata[3]
self.front_trackingboxes = frontdata[4]
self.front_trackingfeats = frontdata[5]
'''===========对应于 0/1_tracking_output.data ============================='''
self.back_boxes = back_outdata[0]
self.back_feats = back_outdata[1]
self.front_boxes = front_outdata[0]
self.front_feats = front_outdata[1]
def from_datafile(self, eventpath):
evtList = self.evtname.split('_')
if len(evtList)>=2 and len(evtList[-1])>=10 and evtList[-1].isdigit():
self.barcode = evtList[-1]
if len(evtList)==3 and evtList[-1]== evtList[-2]:
self.evtType = 'input'
else:
self.evtType = 'other'
'''================ path of image ============='''
frontImgs, frontFid = [], []
backImgs, backFid = [], []
for imgname in os.listdir(eventpath):
name, ext = os.path.splitext(imgname)
if ext not in IMG_FORMAT or name.find('frameId') < 0: continue
if len(name.split('_')) != 3 and not name.split('_')[3].isdigit(): continue
CamerType = name.split('_')[0]
frameId = int(name.split('_')[3])
imgpath = os.path.join(eventpath, imgname)
if CamerType == '0':
backImgs.append(imgpath)
backFid.append(frameId)
if CamerType == '1':
frontImgs.append(imgpath)
frontFid.append(frameId)
## 生成依据帧 ID 排序的前后摄图像地址列表
frontIdx = np.argsort(np.array(frontFid))
backIdx = np.argsort(np.array(backFid))
self.front_imgpaths = [frontImgs[i] for i in frontIdx]
self.back_imgpaths = [backImgs[i] for i in backIdx]
'''================ path of video ============='''
for vidname in os.listdir(eventpath):
name, ext = os.path.splitext(vidname)
if ext not in VID_FORMAT: continue
vidpath = os.path.join(eventpath, vidname)
CamerType = name.split('_')[0]
if CamerType == '0':
self.back_videopath = vidpath
if CamerType == '1':
self.front_videopath = vidpath
'''================ process.data ============='''
procpath = Path(eventpath).joinpath('process.data')
if procpath.is_file():
SimiDict = read_similar(procpath)
self.one2one = SimiDict['one2one']
self.one2n = SimiDict['one2n']
self.one2SN = SimiDict['one2SN']
'''=========== 0/1_track.data & 0/1_tracking_output.data ======='''
for dataname in os.listdir(eventpath):
datapath = os.path.join(eventpath, dataname)
if not os.path.isfile(datapath): continue
CamerType = dataname.split('_')[0]
'''========== 0/1_track.data =========='''
if dataname.find("_track.data")>0:
bboxes, ffeats, trackerboxes, trackerfeats, trackingboxes, trackingfeats = extract_data(datapath)
if CamerType == '0':
self.back_yolobboxes = bboxes
self.back_yolofeats = ffeats
self.back_trackerboxes = trackerboxes
self.back_trackerfeats = trackerfeats
self.back_trackingboxes = trackingboxes
self.back_trackingfeats = trackingfeats
if CamerType == '1':
self.front_yolobboxes = bboxes
self.front_yolofeats = ffeats
self.front_trackerboxes = trackerboxes
self.front_trackerfeats = trackerfeats
self.front_trackingboxes = trackingboxes
self.front_trackingfeats = trackingfeats
'''========== 0/1_tracking_output.data =========='''
if dataname.find("_tracking_output.data")>0:
tracking_output_boxes, tracking_output_feats = read_tracking_output(datapath)
if CamerType == '0':
self.back_boxes = tracking_output_boxes
self.back_feats = tracking_output_feats
elif CamerType == '1':
self.front_boxes = tracking_output_boxes
self.front_feats = tracking_output_feats
def from_realtime_datafile(self, eventpath):
evtList = self.evtname.split('_')
if len(evtList)>=2 and len(evtList[-1])>=10 and evtList[-1].isdigit():
self.barcode = evtList[-1]
if len(evtList)==3 and evtList[-1]== evtList[-2]:
self.evtType = 'input'
else:
self.evtType = 'other'
'''================ path of video ============='''
for vidname in os.listdir(eventpath):
name, ext = os.path.splitext(vidname)
if ext not in VID_FORMAT: continue
vidpath = os.path.join(eventpath, vidname)
CamerType = name.split('_')[0]
if CamerType == '0':
self.back_videopath = vidpath
if CamerType == '1':
self.front_videopath = vidpath
'''================ process.data ============='''
procpath = Path(eventpath).joinpath('process.data')
if procpath.is_file():
SimiDict = read_similar(procpath)
self.one2one = SimiDict['one2one']
self.one2n = SimiDict['one2n']
self.one2SN = SimiDict['one2SN']
'''=========== 0/1_track.data & 0/1_tracking_output.data ======='''
for dataname in os.listdir(eventpath):
datapath = os.path.join(eventpath, dataname)
if not os.path.isfile(datapath): continue
CamerType = dataname.split('_')[0]
'''========== 0/1_track.data =========='''
if dataname.find("_tracker.data")>0:
trackerboxes, trackerfeats = extract_data_realtime(datapath)
if CamerType == '0':
self.back_trackerboxes = trackerboxes
self.back_trackerfeats = trackerfeats
if CamerType == '1':
self.front_trackerboxes = trackerboxes
self.front_trackerfeats = trackerfeats
'''========== 0/1_tracking_output.data =========='''
if dataname.find("_tracking_output.data")>0:
trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats = read_tracking_output_realtime(datapath)
if CamerType == '0':
self.back_trackingboxes = trackingboxes
self.back_trackingfeats = trackingfeats
self.back_boxes = tracking_outboxes
self.back_feats = tracking_outfeats
elif CamerType == '1':
self.front_trackingboxes = trackingboxes
self.front_trackingfeats = trackingfeats
self.front_boxes = tracking_outboxes
self.front_feats = tracking_outfeats
def compose_feats(self):
'''事件的特征集成'''
feats_compose = np.empty((0, 256), dtype=np.float64)
if len(self.front_feats):
for feat in self.front_feats:
feats_compose = np.concatenate((feats_compose, feat), axis=0)
if len(self.back_feats):
for feat in self.back_feats:
feats_compose = np.concatenate((feats_compose, feat), axis=0)
self.feats_compose = feats_compose
def select_feats(self):
'''事件的特征选择'''
if len(self.front_feats):
self.feats_select = self.front_feats[0]
elif len(self.back_feats):
self.feats_select = self.back_feats[0]
def plot_save_image(self, savepath):
def array2list(bboxes):
'''[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]'''
frame_ids = bboxes[:, 7].astype(int)
fID = np.unique(bboxes[:, 7].astype(int))
fboxes = []
for f_id in fID:
idx = np.where(frame_ids==f_id)[0]
box = bboxes[idx, :]
fboxes.append((f_id, box))
return fboxes
imgpairs = []
cameras = ('front', 'back')
for camera in cameras:
if camera == 'front':
boxes = self.front_trackerboxes
imgpaths = self.front_imgpaths
else:
boxes = self.back_trackerboxes
imgpaths = self.back_imgpaths
fboxes = array2list(boxes)
for fid, fbox in fboxes:
imgpath = imgpaths[int(fid-1)]
image = cv2.imread(imgpath)
annotator = Annotator(image.copy(), line_width=2)
for i, box in enumerate(fbox):
x1, y1, x2, y2, tid, score, cls, fid, bid = box
label = f'{int(tid), int(cls)}'
if tid >=0 and cls==0:
color = colors(int(cls), True)
elif tid >=0 and cls!=0:
color = colors(int(tid), True)
else:
color = colors(19, True) # 19为调色板的最后一个元素
xyxy = (x1/2, y1/2, x2/2, y2/2)
annotator.box_label(xyxy, label, color=color)
im0 = annotator.result()
imgpairs.append((Path(imgpath).name, im0))
# spath = os.path.join(savepath, Path(imgpath).name)
# cv2.imwrite(spath, im0)
return imgpairs
def save_event_subimg(self, savepath):
'''
功能: 保存一次购物事件的轨迹子图
9 items: barcode, type, filepath, back_imgpaths, front_imgpaths,
back_boxes, front_boxes, back_feats, front_feats,
feats_compose, feats_select
子图保存次序:先前摄、后后摄,以 k 为编号,和 "feats_compose" 中次序相同
'''
imgpairs = []
cameras = ('front', 'back')
for camera in cameras:
boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
if camera == 'front':
for b in self.front_boxes:
boxes = np.concatenate((boxes, b), axis=0)
imgpaths = self.front_imgpaths
else:
for b in self.back_boxes:
boxes = np.concatenate((boxes, b), axis=0)
imgpaths = self.back_imgpaths
for i, box in enumerate(boxes):
x1, y1, x2, y2, tid, score, cls, fid, bid = box
imgpath = imgpaths[int(fid-1)]
image = cv2.imread(imgpath)
subimg = image[int(y1/2):int(y2/2), int(x1/2):int(x2/2), :]
camerType, timeTamp, _, frameID = os.path.basename(imgpath).split('.')[0].split('_')
subimgName = f"cam{camerType}_{i}_tid{int(tid)}_fid({int(fid)}, {frameID}).png"
imgpairs.append((subimgName, subimg))
# spath = os.path.join(savepath, subimgName)
# cv2.imwrite(spath, subimg)
return imgpairs
# basename = os.path.basename(event['filepath'])
print(f"Image saved: {os.path.basename(self.eventpath)}")
def draw_tracks(self):
front_edge = cv2.imread(r"D:\DetectTracking\tracking\shopcart\cart_tempt\board_ftmp_line.png")
back_edge = cv2.imread(r"D:\DetectTracking\tracking\shopcart\cart_tempt\edgeline.png")
front_trackerboxes = array2list(self.front_trackerboxes)
back_trackerboxes = array2list(self.back_trackerboxes)
# img1, img2 = edgeline.copy(), edgeline.copy()
img1 = drawTrack(front_trackerboxes, front_edge.copy())
img2 = drawTrack(self.front_trackingboxes, front_edge.copy())
img3 = drawTrack(back_trackerboxes, back_edge.copy())
img4 = drawTrack(self.back_trackingboxes, back_edge.copy())
imgcat1 = np.concatenate((img1, img2), axis = 1)
H, W = imgcat1.shape[:2]
cv2.line(imgcat1, (int(W/2), 0), (int(W/2), H), (128, 255, 128), 2)
imgcat2 = np.concatenate((img3, img4), axis = 1)
H, W = imgcat2.shape[:2]
cv2.line(imgcat2, (int(W/2), 0), (int(W/2), H), (128, 255, 128), 2)
illus = [imgcat1, imgcat2]
if len(illus):
img_cat = np.concatenate(illus, axis = 1)
if len(illus)==2:
H, W = img_cat.shape[:2]
cv2.line(img_cat, (int(W/2), 0), (int(W/2), int(H)), (128, 128, 255), 3)
return img_cat
def main():
# pklpath = r"D:\DetectTracking\evtresult\images2\ShoppingDict.pkl"
# evt = ShoppingEvent(pklpath, stype='source')
evtpath = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\images\20241209-160248-08edd5f6-1806-45ad-babf-7a4dd11cea60_6973226721445"
evt = ShoppingEvent(evtpath, stype='data')
img_cat = evt.draw_tracks()
cv2.imwrite("a.png", img_cat)
if __name__ == "__main__":
main()
# main1()