# -*- coding: utf-8 -*-
"""
Created on Tue Nov 26 17:35:05 2024

@author: ym
"""
|
|
import os
import pickle
import sys
from pathlib import Path

import numpy as np

# The project root must be on sys.path before the project-local import below.
sys.path.append(r"D:\DetectTracking")
from tracking.utils.read_data import extract_data, read_tracking_output, read_similar
|
|
|
|
# File extensions (with leading dot, compared case-sensitively) that are
# treated as frame images when scanning an event directory.
IMG_FORMAT = ['.bmp', '.jpg', '.jpeg', '.png']

# File extensions (with leading dot, compared case-sensitively) that are
# treated as camera videos when scanning an event directory.
VID_FORMAT = ['.mp4', '.avi']
|
|
|
|
class ShoppingEvent:
    """Data of one shopping event recorded by a back and a front camera.

    An event is loaded either from an event directory (``stype="data"``) or
    from a pickled ``ShoppingDict`` (``stype="pickle"``).  Files and records
    whose camera prefix is ``'0'`` fill the ``back_*`` attributes, those with
    prefix ``'1'`` fill the ``front_*`` attributes.  After loading,
    ``feats_select`` and ``feats_compose`` are derived from the tracking
    output features.
    """

    def __init__(self, eventpath, stype="data"):
        """Create the event and load its data.

        Args:
            eventpath: event directory when ``stype == "data"``, or the path
                of a pickled ``ShoppingDict`` when ``stype == "pickle"``.
            stype: ``"data"`` or ``"pickle"``.  Any other value leaves every
                attribute at its empty default.
        """
        self.eventpath = eventpath
        self.evtname = str(Path(eventpath).stem)
        self.barcode = ''
        self.evtType = ''

        # =========== path of image and video ===========
        self.back_videopath = ''
        self.front_videopath = ''
        self.back_imgpaths = []
        self.front_imgpaths = []

        # =========== process.data ===========
        self.one2one = None
        self.one2n = None

        # =========== 0/1_track.data ===========
        self.back_yolobboxes = []
        self.back_yolofeats = []
        # (0, 9) float64 array: kept compatible with class doTracks.
        self.back_trackerboxes = np.empty((0, 9), dtype=np.float64)
        self.back_trackerfeats = {}
        self.back_trackingboxes = []
        self.back_trackingfeats = []

        self.front_yolobboxes = []
        self.front_yolofeats = []
        # (0, 9) float64 array: kept compatible with class doTracks.
        self.front_trackerboxes = np.empty((0, 9), dtype=np.float64)
        self.front_trackerfeats = {}
        self.front_trackingboxes = []
        self.front_trackingfeats = []

        # =========== 0/1_tracking_output.data ===========
        self.back_boxes = []
        self.back_feats = []
        self.front_boxes = []
        self.front_feats = []

        if stype == "data":
            self.from_datafile(eventpath)
        elif stype == "pickle":
            self.from_pklfile(eventpath)

        self.feats_select = []
        self.feats_compose = np.empty((0, 256), dtype=np.float64)
        self.select_feats()
        self.compose_feats()

    def kerndata(self, ShoppingDict, camtype="backCamera"):
        """Extract per-camera tracking data from a ``ShoppingDict``.

        Args:
            ShoppingDict: dict holding, under ``camtype``, the keys
                ``"yoloResnetTracker"`` (list of per-frame dicts with
                ``"bboxes"``, ``"tboxes"`` and ``"feats"``) and ``"tracking"``
                (object with ``Residual`` and ``Confirmed`` track lists).
            camtype: str, "backCamera" or "frontCamera".

        Returns:
            ``(kdata, outdata)`` where ``kdata`` is the 6-tuple
            ``(yoloboxes, resfeats, trackerboxes, trackefeats, trackingboxes,
            trackingfeats)`` and ``outdata`` is
            ``(tracking_out_boxes, tracking_out_feats)``.  ``resfeats`` is
            always an empty list: nothing in this method fills it.
        """
        camdata = ShoppingDict[camtype]

        yoloboxes, resfeats = [], []
        trackefeats = {}
        # Collect the per-frame tracker boxes and concatenate once at the end
        # instead of re-allocating the whole array for every frame.
        box_chunks = [np.empty((0, 9), dtype=np.float64)]
        for frameDict in camdata["yoloResnetTracker"]:
            yoloboxes.append(frameDict["bboxes"])
            trackefeats.update(frameDict["feats"])

            tboxes = np.asarray(frameDict["tboxes"])
            # A frame without tracker boxes (e.g. an empty list, shape (0,))
            # contributes nothing and would break the 2-D concatenation.
            if tboxes.size:
                box_chunks.append(tboxes)
        trackerboxes = np.concatenate(box_chunks, axis=0)

        tracking = camdata["tracking"]

        trackingboxes, trackingfeats = [], []
        for track in tracking.Residual:
            trackingboxes.append(track.boxes)
            trackingfeats.append(track.features)
        kdata = (yoloboxes, resfeats, trackerboxes, trackefeats,
                 trackingboxes, trackingfeats)

        tracking_out_boxes, tracking_out_feats = [], []
        for track in tracking.Confirmed:
            tracking_out_boxes.append(track.boxes)
            tracking_out_feats.append(track.features)
        outdata = (tracking_out_boxes, tracking_out_feats)

        return kdata, outdata

    def _store_track_data(self, side, kdata):
        """Assign a 6-tuple of track data to the ``<side>_*`` attributes.

        ``side`` is ``"back"`` or ``"front"``; ``kdata`` has the layout
        returned by ``kerndata`` / ``extract_data``.
        """
        (yolobboxes, yolofeats, trackerboxes,
         trackerfeats, trackingboxes, trackingfeats) = kdata
        setattr(self, side + "_yolobboxes", yolobboxes)
        setattr(self, side + "_yolofeats", yolofeats)
        setattr(self, side + "_trackerboxes", trackerboxes)
        setattr(self, side + "_trackerfeats", trackerfeats)
        setattr(self, side + "_trackingboxes", trackingboxes)
        setattr(self, side + "_trackingfeats", trackingfeats)

    def from_pklfile(self, eventpath):
        """Populate the event from a pickled ``ShoppingDict``.

        ``eventpath``, ``evtname`` and ``barcode`` are overwritten with the
        values stored in the pickle.

        NOTE(security): ``pickle.load`` can execute arbitrary code contained
        in the file; only load pickles from a trusted source.
        """
        with open(eventpath, 'rb') as f:
            ShoppingDict = pickle.load(f)

        self.eventpath = ShoppingDict["eventPath"]
        self.evtname = ShoppingDict["eventName"]
        self.barcode = ShoppingDict["barcode"]

        # =========== path of image and video ===========
        self.back_videopath = ShoppingDict["backCamera"]["videoPath"]
        self.front_videopath = ShoppingDict["frontCamera"]["videoPath"]
        self.back_imgpaths = ShoppingDict["backCamera"]["imagePaths"]
        self.front_imgpaths = ShoppingDict["frontCamera"]["imagePaths"]

        backdata, back_outdata = self.kerndata(ShoppingDict, "backCamera")
        frontdata, front_outdata = self.kerndata(ShoppingDict, "frontCamera")

        # =========== corresponds to 0/1_track.data ===========
        self._store_track_data("back", backdata)
        self._store_track_data("front", frontdata)

        # =========== corresponds to 0/1_tracking_output.data ===========
        self.back_boxes, self.back_feats = back_outdata[0], back_outdata[1]
        self.front_boxes, self.front_feats = front_outdata[0], front_outdata[1]

    def from_datafile(self, eventpath):
        """Populate the event by scanning the event directory ``eventpath``.

        Reads, when present: frame images, camera videos, ``process.data``,
        ``0/1_track.data`` and ``0/1_tracking_output.data``.  The barcode and
        event type are derived from the event name (``self.evtname``).
        """
        evtList = self.evtname.split('_')
        if len(evtList) >= 2 and len(evtList[-1]) >= 10 and evtList[-1].isdigit():
            self.barcode = evtList[-1]
        if len(evtList) == 3 and evtList[-1] == evtList[-2]:
            self.evtType = 'input'
        else:
            self.evtType = 'other'

        filenames = os.listdir(eventpath)
        self._load_image_paths(eventpath, filenames)
        self._load_video_paths(eventpath, filenames)

        # ================ process.data =============
        procpath = Path(eventpath).joinpath('process.data')
        if procpath.is_file():
            SimiDict = read_similar(procpath)
            self.one2one = SimiDict['one2one']
            self.one2n = SimiDict['one2n']

        self._load_tracking_files(eventpath, filenames)

    def _load_image_paths(self, eventpath, filenames):
        """Collect frame-image paths per camera, ordered by frame id.

        An image name must contain ``frameId``, have at least four
        ``_``-separated fields, and carry a numeric frame id in the fourth
        field; names that do not match are skipped.
        """
        back, front = [], []
        for imgname in filenames:
            name, ext = os.path.splitext(imgname)
            if ext not in IMG_FORMAT or 'frameId' not in name:
                continue
            fields = name.split('_')
            # Field 3 is read as the frame id, so shorter names cannot match.
            if len(fields) < 4 or not fields[3].isdigit():
                continue

            entry = (int(fields[3]), os.path.join(eventpath, imgname))
            if fields[0] == '0':
                back.append(entry)
            elif fields[0] == '1':
                front.append(entry)

        # Sorting (frameId, path) pairs orders by frame id; equal ids fall
        # back to the path, which keeps the result deterministic.
        self.front_imgpaths = [path for _, path in sorted(front)]
        self.back_imgpaths = [path for _, path in sorted(back)]

    def _load_video_paths(self, eventpath, filenames):
        """Record the video path of each camera found in the directory."""
        for vidname in filenames:
            name, ext = os.path.splitext(vidname)
            if ext not in VID_FORMAT:
                continue
            vidpath = os.path.join(eventpath, vidname)

            CamerType = name.split('_')[0]
            if CamerType == '0':
                self.back_videopath = vidpath
            elif CamerType == '1':
                self.front_videopath = vidpath

    def _load_tracking_files(self, eventpath, filenames):
        """Read ``0/1_track.data`` and ``0/1_tracking_output.data`` files."""
        sides = {'0': 'back', '1': 'front'}
        for dataname in filenames:
            datapath = os.path.join(eventpath, dataname)
            if not os.path.isfile(datapath):
                continue
            side = sides.get(dataname.split('_')[0])
            if side is None:
                # Not a back ('0') or front ('1') camera file.
                continue

            # ========== 0/1_track.data ==========
            if dataname.find("_track.data") > 0:
                self._store_track_data(side, extract_data(datapath))

            # ========== 0/1_tracking_output.data ==========
            if dataname.find("_tracking_output.data") > 0:
                boxes, feats = read_tracking_output(datapath)
                setattr(self, side + "_boxes", boxes)
                setattr(self, side + "_feats", feats)

    def compose_feats(self):
        """Stack all tracking-output features into ``self.feats_compose``.

        Front-camera features come first, then back-camera features; the
        result has 256 columns and is empty when there are no features.
        """
        blocks = [np.empty((0, 256), dtype=np.float64)]
        blocks.extend(self.front_feats)
        blocks.extend(self.back_feats)
        self.feats_compose = np.concatenate(blocks, axis=0)

    def select_feats(self):
        """Pick the event features: front camera first, else back camera.

        ``self.feats_select`` becomes ``self.front_feats`` when it is
        non-empty, otherwise ``self.back_feats`` when that is non-empty,
        otherwise an empty list.
        """
        self.feats_select = []
        if len(self.front_feats):
            self.feats_select = self.front_feats
        elif len(self.back_feats):
            self.feats_select = self.back_feats
|
|
|
|
def main(pklpath=r"D:\DetectTracking\evtresult\images2\ShoppingDict.pkl"):
    """Build a ShoppingEvent from a pickled ShoppingDict and return it.

    Args:
        pklpath: path of the pickle file; defaults to the developer's local
            sample file.

    Returns:
        The loaded ``ShoppingEvent``.
    """
    evt = ShoppingEvent(pklpath, stype='pickle')
    return evt


if __name__ == "__main__":
    main()
|
|
|
|
|
|
|
|
|
|
|
|
|