541 lines
21 KiB
Python
541 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
Created on Tue Nov 26 17:35:05 2024
|
||
|
||
@author: ym
|
||
"""
|
||
import os
|
||
import cv2
|
||
import pickle
|
||
import numpy as np
|
||
from pathlib import Path
|
||
|
||
import sys
|
||
sys.path.append(r"D:\DetectTracking")
|
||
from tracking.utils.plotting import Annotator, colors
|
||
from tracking.utils.drawtracks import drawTrack
|
||
from tracking.utils.read_data import extract_data, read_tracking_output, read_similar
|
||
from tracking.utils.read_data import extract_data_realtime, read_tracking_output_realtime
|
||
|
||
|
||
# import platform
|
||
# import pathlib
|
||
# plt = platform.system()
|
||
|
||
|
||
IMG_FORMAT = ['.bmp', '.jpg', '.jpeg', '.png']
|
||
VID_FORMAT = ['.mp4', '.avi']
|
||
|
||
def save_data(event, resultPath=None):
|
||
'''事件轨迹子图保存'''
|
||
if resultPath is None:
|
||
resultPath = os.path.dirname(os.path.abspath(__file__))
|
||
|
||
|
||
subimgpath = os.path.join(resultPath, f"{event.evtname}", "subimg")
|
||
imgspath = os.path.join(resultPath, f"{event.evtname}", "imgs")
|
||
if not os.path.exists(subimgpath):
|
||
os.makedirs(subimgpath)
|
||
if not os.path.exists(imgspath):
|
||
os.makedirs(imgspath)
|
||
##(2) 保存轨迹中的子图
|
||
subimgpairs = event.save_event_subimg(subimgpath)
|
||
for subimgName, subimg in subimgpairs:
|
||
spath = os.path.join(subimgpath, subimgName)
|
||
cv2.imwrite(spath, subimg)
|
||
|
||
##(3) 保存序列图像
|
||
imgpairs = event.plot_save_image(imgspath)
|
||
for imgname, img in imgpairs:
|
||
spath = os.path.join(imgspath, imgname)
|
||
cv2.imwrite(spath, img)
|
||
##(4) 保存轨迹散点图
|
||
img_cat = event.draw_tracks()
|
||
trajpath = os.path.join(resultPath, "trajectory")
|
||
if not os.path.exists(trajpath):
|
||
os.makedirs(trajpath)
|
||
traj_imgpath = os.path.join(trajpath, event.evtname+".png")
|
||
cv2.imwrite(traj_imgpath, img_cat)
|
||
|
||
|
||
def array2list(bboxes):
|
||
'''
|
||
将 bboxes 变换为 track 列表
|
||
bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||
Return:
|
||
lboxes:列表,列表中元素具有同一 track_id,x1y1x2y2 格式
|
||
[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||
'''
|
||
lboxes = []
|
||
if len(bboxes)==0:
|
||
return []
|
||
|
||
trackID = np.unique(bboxes[:, 4].astype(int))
|
||
track_ids = bboxes[:, 4].astype(int)
|
||
for t_id in trackID:
|
||
idx = np.where(track_ids == t_id)[0]
|
||
box = bboxes[idx, :]
|
||
lboxes.append(box)
|
||
|
||
return lboxes
|
||
|
||
|
||
class ShoppingEvent:
|
||
def __init__(self, eventpath, stype="data"):
|
||
'''stype: str, 'source', 'data', 'realtime', 共三种 '''
|
||
|
||
self.eventpath = eventpath
|
||
self.evtname = str(Path(eventpath).stem)
|
||
self.barcode = ''
|
||
self.evtType = ''
|
||
|
||
'''=========== path of image and video =========== '''
|
||
self.back_videopath = ''
|
||
self.front_videopath = ''
|
||
self.back_imgpaths = []
|
||
self.front_imgpaths = []
|
||
|
||
'''=========== process.data ==============================='''
|
||
self.one2one = None
|
||
self.one2n = None
|
||
self.one2SN = None
|
||
|
||
'''=========== 0/1_track.data ============================='''
|
||
self.back_yolobboxes = []
|
||
self.back_yolofeats = []
|
||
self.back_trackerboxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
|
||
self.back_trackerfeats = {}
|
||
self.back_trackingboxes = []
|
||
self.back_trackingfeats = []
|
||
|
||
self.front_yolobboxes = []
|
||
self.front_yolofeats = []
|
||
self.front_trackerboxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
|
||
self.front_trackerfeats = {}
|
||
self.front_trackingboxes = []
|
||
self.front_trackingfeats = []
|
||
|
||
'''=========== 0/1_tracking_output.data ==================='''
|
||
self.back_boxes = []
|
||
self.back_feats = []
|
||
self.front_boxes = []
|
||
self.front_feats = []
|
||
|
||
|
||
if stype=="data":
|
||
self.from_datafile(eventpath)
|
||
if stype=="realtime":
|
||
self.from_realtime_datafile(eventpath)
|
||
if stype=="source":
|
||
self.from_source_pkl(eventpath)
|
||
|
||
self.feats_select = np.empty((0, 256), dtype=np.float64)
|
||
self.feats_compose = np.empty((0, 256), dtype=np.float64)
|
||
self.select_feats()
|
||
self.compose_feats()
|
||
|
||
# if stype=="image":
|
||
# self.from_image(eventpath)
|
||
|
||
def kerndata(self, ShoppingDict, camtype="backCamera"):
|
||
'''
|
||
camtype: str, "backCamera" or "frontCamera"
|
||
'''
|
||
yoloboxes, resfeats = [], []
|
||
trackerboxes = np.empty((0, 9), dtype=np.float64)
|
||
trackefeats = {}
|
||
trackingboxes, trackingfeats = [], []
|
||
|
||
frameDictList = ShoppingDict[camtype]["yoloResnetTracker"]
|
||
for frameDict in frameDictList:
|
||
yoloboxes.append(frameDict["bboxes"])
|
||
|
||
tboxes = frameDict["tboxes"]
|
||
trackefeats.update(frameDict["feats"])
|
||
|
||
trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)), axis=0)
|
||
|
||
Residual = ShoppingDict[camtype]["tracking"].Residual
|
||
for track in Residual:
|
||
trackingboxes.append(track.boxes)
|
||
trackingfeats.append(track.features)
|
||
kdata = (yoloboxes, resfeats, trackerboxes, trackefeats, trackingboxes, trackingfeats)
|
||
|
||
|
||
tracking_out_boxes, tracking_out_feats = [], []
|
||
Confirmed = ShoppingDict[camtype]["tracking"].Confirmed
|
||
for track in Confirmed:
|
||
tracking_out_boxes.append(track.boxes)
|
||
tracking_out_feats.append(track.features)
|
||
outdata = (tracking_out_boxes, tracking_out_feats)
|
||
|
||
return kdata, outdata
|
||
|
||
|
||
def from_source_pkl(self, eventpath):
|
||
# if plt == 'Windows':
|
||
# pathlib.PosixPath = pathlib.WindowsPath
|
||
with open(eventpath, 'rb') as f:
|
||
ShoppingDict = pickle.load(f)
|
||
|
||
self.eventpath = ShoppingDict["eventPath"]
|
||
self.evtname = ShoppingDict["eventName"]
|
||
self.barcode = ShoppingDict["barcode"]
|
||
|
||
if len(ShoppingDict["one2n"]):
|
||
self.one2n = ShoppingDict["one2n"]
|
||
|
||
'''=========== path of image and video =========== '''
|
||
self.back_videopath = ShoppingDict["backCamera"]["videoPath"]
|
||
self.front_videopath = ShoppingDict["frontCamera"]["videoPath"]
|
||
self.back_imgpaths = ShoppingDict["backCamera"]["imagePaths"]
|
||
self.front_imgpaths = ShoppingDict["frontCamera"]["imagePaths"]
|
||
|
||
|
||
'''===========对应于 0/1_track.data ============================='''
|
||
backdata, back_outdata = self.kerndata(ShoppingDict, "backCamera")
|
||
frontdata, front_outdata = self.kerndata(ShoppingDict, "frontCamera")
|
||
self.back_yolobboxes = backdata[0]
|
||
self.back_yolofeats = backdata[1]
|
||
self.back_trackerboxes = backdata[2]
|
||
self.back_trackerfeats = [3]
|
||
self.back_trackingboxes = [4]
|
||
self.back_trackingfeats = [5]
|
||
|
||
self.front_yolobboxes = frontdata[0]
|
||
self.front_yolofeats = frontdata[1]
|
||
self.front_trackerboxes = frontdata[2]
|
||
self.front_trackerfeats = frontdata[3]
|
||
self.front_trackingboxes = frontdata[4]
|
||
self.front_trackingfeats = frontdata[5]
|
||
|
||
'''===========对应于 0/1_tracking_output.data ============================='''
|
||
self.back_boxes = back_outdata[0]
|
||
self.back_feats = back_outdata[1]
|
||
self.front_boxes = front_outdata[0]
|
||
self.front_feats = front_outdata[1]
|
||
|
||
|
||
def from_datafile(self, eventpath):
|
||
evtList = self.evtname.split('_')
|
||
if len(evtList)>=2 and len(evtList[-1])>=10 and evtList[-1].isdigit():
|
||
self.barcode = evtList[-1]
|
||
if len(evtList)==3 and evtList[-1]== evtList[-2]:
|
||
self.evtType = 'input'
|
||
else:
|
||
self.evtType = 'other'
|
||
|
||
'''================ path of image ============='''
|
||
frontImgs, frontFid = [], []
|
||
backImgs, backFid = [], []
|
||
for imgname in os.listdir(eventpath):
|
||
name, ext = os.path.splitext(imgname)
|
||
if ext not in IMG_FORMAT or name.find('frameId') < 0: continue
|
||
if len(name.split('_')) != 3 and not name.split('_')[3].isdigit(): continue
|
||
|
||
CamerType = name.split('_')[0]
|
||
frameId = int(name.split('_')[3])
|
||
imgpath = os.path.join(eventpath, imgname)
|
||
if CamerType == '0':
|
||
backImgs.append(imgpath)
|
||
backFid.append(frameId)
|
||
if CamerType == '1':
|
||
frontImgs.append(imgpath)
|
||
frontFid.append(frameId)
|
||
## 生成依据帧 ID 排序的前后摄图像地址列表
|
||
frontIdx = np.argsort(np.array(frontFid))
|
||
backIdx = np.argsort(np.array(backFid))
|
||
self.front_imgpaths = [frontImgs[i] for i in frontIdx]
|
||
self.back_imgpaths = [backImgs[i] for i in backIdx]
|
||
|
||
|
||
'''================ path of video ============='''
|
||
for vidname in os.listdir(eventpath):
|
||
name, ext = os.path.splitext(vidname)
|
||
if ext not in VID_FORMAT: continue
|
||
vidpath = os.path.join(eventpath, vidname)
|
||
|
||
CamerType = name.split('_')[0]
|
||
if CamerType == '0':
|
||
self.back_videopath = vidpath
|
||
if CamerType == '1':
|
||
self.front_videopath = vidpath
|
||
|
||
'''================ process.data ============='''
|
||
procpath = Path(eventpath).joinpath('process.data')
|
||
if procpath.is_file():
|
||
SimiDict = read_similar(procpath)
|
||
self.one2one = SimiDict['one2one']
|
||
self.one2n = SimiDict['one2n']
|
||
self.one2SN = SimiDict['one2SN']
|
||
|
||
'''=========== 0/1_track.data & 0/1_tracking_output.data ======='''
|
||
for dataname in os.listdir(eventpath):
|
||
datapath = os.path.join(eventpath, dataname)
|
||
if not os.path.isfile(datapath): continue
|
||
CamerType = dataname.split('_')[0]
|
||
|
||
'''========== 0/1_track.data =========='''
|
||
if dataname.find("_track.data")>0:
|
||
bboxes, ffeats, trackerboxes, trackerfeats, trackingboxes, trackingfeats = extract_data(datapath)
|
||
if CamerType == '0':
|
||
self.back_yolobboxes = bboxes
|
||
self.back_yolofeats = ffeats
|
||
self.back_trackerboxes = trackerboxes
|
||
self.back_trackerfeats = trackerfeats
|
||
self.back_trackingboxes = trackingboxes
|
||
self.back_trackingfeats = trackingfeats
|
||
if CamerType == '1':
|
||
self.front_yolobboxes = bboxes
|
||
self.front_yolofeats = ffeats
|
||
self.front_trackerboxes = trackerboxes
|
||
self.front_trackerfeats = trackerfeats
|
||
self.front_trackingboxes = trackingboxes
|
||
self.front_trackingfeats = trackingfeats
|
||
|
||
'''========== 0/1_tracking_output.data =========='''
|
||
if dataname.find("_tracking_output.data")>0:
|
||
tracking_output_boxes, tracking_output_feats = read_tracking_output(datapath)
|
||
if CamerType == '0':
|
||
self.back_boxes = tracking_output_boxes
|
||
self.back_feats = tracking_output_feats
|
||
elif CamerType == '1':
|
||
self.front_boxes = tracking_output_boxes
|
||
self.front_feats = tracking_output_feats
|
||
|
||
def from_realtime_datafile(self, eventpath):
|
||
evtList = self.evtname.split('_')
|
||
if len(evtList)>=2 and len(evtList[-1])>=10 and evtList[-1].isdigit():
|
||
self.barcode = evtList[-1]
|
||
if len(evtList)==3 and evtList[-1]== evtList[-2]:
|
||
self.evtType = 'input'
|
||
else:
|
||
self.evtType = 'other'
|
||
|
||
'''================ path of video ============='''
|
||
for vidname in os.listdir(eventpath):
|
||
name, ext = os.path.splitext(vidname)
|
||
if ext not in VID_FORMAT: continue
|
||
vidpath = os.path.join(eventpath, vidname)
|
||
|
||
CamerType = name.split('_')[0]
|
||
if CamerType == '0':
|
||
self.back_videopath = vidpath
|
||
if CamerType == '1':
|
||
self.front_videopath = vidpath
|
||
|
||
'''================ process.data ============='''
|
||
procpath = Path(eventpath).joinpath('process.data')
|
||
if procpath.is_file():
|
||
SimiDict = read_similar(procpath)
|
||
self.one2one = SimiDict['one2one']
|
||
self.one2n = SimiDict['one2n']
|
||
self.one2SN = SimiDict['one2SN']
|
||
|
||
'''=========== 0/1_track.data & 0/1_tracking_output.data ======='''
|
||
for dataname in os.listdir(eventpath):
|
||
datapath = os.path.join(eventpath, dataname)
|
||
if not os.path.isfile(datapath): continue
|
||
CamerType = dataname.split('_')[0]
|
||
'''========== 0/1_track.data =========='''
|
||
if dataname.find("_tracker.data")>0:
|
||
trackerboxes, trackerfeats = extract_data_realtime(datapath)
|
||
if CamerType == '0':
|
||
self.back_trackerboxes = trackerboxes
|
||
self.back_trackerfeats = trackerfeats
|
||
|
||
if CamerType == '1':
|
||
self.front_trackerboxes = trackerboxes
|
||
self.front_trackerfeats = trackerfeats
|
||
'''========== 0/1_tracking_output.data =========='''
|
||
if dataname.find("_tracking_output.data")>0:
|
||
trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats = read_tracking_output_realtime(datapath)
|
||
if CamerType == '0':
|
||
self.back_trackingboxes = trackingboxes
|
||
self.back_trackingfeats = trackingfeats
|
||
self.back_boxes = tracking_outboxes
|
||
self.back_feats = tracking_outfeats
|
||
elif CamerType == '1':
|
||
self.front_trackingboxes = trackingboxes
|
||
self.front_trackingfeats = trackingfeats
|
||
self.front_boxes = tracking_outboxes
|
||
self.front_feats = tracking_outfeats
|
||
|
||
|
||
|
||
|
||
|
||
def compose_feats(self):
|
||
'''事件的特征集成'''
|
||
feats_compose = np.empty((0, 256), dtype=np.float64)
|
||
if len(self.front_feats):
|
||
for feat in self.front_feats:
|
||
feats_compose = np.concatenate((feats_compose, feat), axis=0)
|
||
if len(self.back_feats):
|
||
for feat in self.back_feats:
|
||
feats_compose = np.concatenate((feats_compose, feat), axis=0)
|
||
self.feats_compose = feats_compose
|
||
|
||
def select_feats(self):
|
||
'''事件的特征选择'''
|
||
if len(self.front_feats):
|
||
self.feats_select = self.front_feats[0]
|
||
elif len(self.back_feats):
|
||
self.feats_select = self.back_feats[0]
|
||
|
||
def plot_save_image(self, savepath):
|
||
|
||
def array2list(bboxes):
|
||
'''[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]'''
|
||
frame_ids = bboxes[:, 7].astype(int)
|
||
fID = np.unique(bboxes[:, 7].astype(int))
|
||
fboxes = []
|
||
for f_id in fID:
|
||
idx = np.where(frame_ids==f_id)[0]
|
||
box = bboxes[idx, :]
|
||
fboxes.append((f_id, box))
|
||
return fboxes
|
||
|
||
imgpairs = []
|
||
cameras = ('front', 'back')
|
||
for camera in cameras:
|
||
if camera == 'front':
|
||
boxes = self.front_trackerboxes
|
||
imgpaths = self.front_imgpaths
|
||
else:
|
||
boxes = self.back_trackerboxes
|
||
imgpaths = self.back_imgpaths
|
||
|
||
fboxes = array2list(boxes)
|
||
for fid, fbox in fboxes:
|
||
imgpath = imgpaths[int(fid-1)]
|
||
|
||
image = cv2.imread(imgpath)
|
||
|
||
annotator = Annotator(image.copy(), line_width=2)
|
||
for i, box in enumerate(fbox):
|
||
x1, y1, x2, y2, tid, score, cls, fid, bid = box
|
||
label = f'{int(tid), int(cls)}'
|
||
if tid >=0 and cls==0:
|
||
color = colors(int(cls), True)
|
||
elif tid >=0 and cls!=0:
|
||
color = colors(int(tid), True)
|
||
else:
|
||
color = colors(19, True) # 19为调色板的最后一个元素
|
||
xyxy = (x1/2, y1/2, x2/2, y2/2)
|
||
annotator.box_label(xyxy, label, color=color)
|
||
|
||
im0 = annotator.result()
|
||
|
||
imgpairs.append((Path(imgpath).name, im0))
|
||
|
||
# spath = os.path.join(savepath, Path(imgpath).name)
|
||
|
||
|
||
# cv2.imwrite(spath, im0)
|
||
return imgpairs
|
||
|
||
|
||
def save_event_subimg(self, savepath):
|
||
'''
|
||
功能: 保存一次购物事件的轨迹子图
|
||
9 items: barcode, type, filepath, back_imgpaths, front_imgpaths,
|
||
back_boxes, front_boxes, back_feats, front_feats,
|
||
feats_compose, feats_select
|
||
子图保存次序:先前摄、后后摄,以 k 为编号,和 "feats_compose" 中次序相同
|
||
'''
|
||
imgpairs = []
|
||
cameras = ('front', 'back')
|
||
for camera in cameras:
|
||
boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
|
||
if camera == 'front':
|
||
for b in self.front_boxes:
|
||
boxes = np.concatenate((boxes, b), axis=0)
|
||
imgpaths = self.front_imgpaths
|
||
else:
|
||
for b in self.back_boxes:
|
||
boxes = np.concatenate((boxes, b), axis=0)
|
||
imgpaths = self.back_imgpaths
|
||
|
||
for i, box in enumerate(boxes):
|
||
x1, y1, x2, y2, tid, score, cls, fid, bid = box
|
||
|
||
imgpath = imgpaths[int(fid-1)]
|
||
image = cv2.imread(imgpath)
|
||
|
||
subimg = image[int(y1/2):int(y2/2), int(x1/2):int(x2/2), :]
|
||
|
||
camerType, timeTamp, _, frameID = os.path.basename(imgpath).split('.')[0].split('_')
|
||
subimgName = f"cam{camerType}_{i}_tid{int(tid)}_fid({int(fid)}, {frameID}).png"
|
||
|
||
imgpairs.append((subimgName, subimg))
|
||
|
||
# spath = os.path.join(savepath, subimgName)
|
||
|
||
# cv2.imwrite(spath, subimg)
|
||
return imgpairs
|
||
# basename = os.path.basename(event['filepath'])
|
||
print(f"Image saved: {os.path.basename(self.eventpath)}")
|
||
|
||
def draw_tracks(self):
|
||
front_edge = cv2.imread(r"D:\DetectTracking\tracking\shopcart\cart_tempt\board_ftmp_line.png")
|
||
back_edge = cv2.imread(r"D:\DetectTracking\tracking\shopcart\cart_tempt\edgeline.png")
|
||
|
||
front_trackerboxes = array2list(self.front_trackerboxes)
|
||
back_trackerboxes = array2list(self.back_trackerboxes)
|
||
|
||
# img1, img2 = edgeline.copy(), edgeline.copy()
|
||
img1 = drawTrack(front_trackerboxes, front_edge.copy())
|
||
img2 = drawTrack(self.front_trackingboxes, front_edge.copy())
|
||
|
||
img3 = drawTrack(back_trackerboxes, back_edge.copy())
|
||
img4 = drawTrack(self.back_trackingboxes, back_edge.copy())
|
||
|
||
|
||
|
||
imgcat1 = np.concatenate((img1, img2), axis = 1)
|
||
H, W = imgcat1.shape[:2]
|
||
cv2.line(imgcat1, (int(W/2), 0), (int(W/2), H), (128, 255, 128), 2)
|
||
|
||
imgcat2 = np.concatenate((img3, img4), axis = 1)
|
||
H, W = imgcat2.shape[:2]
|
||
cv2.line(imgcat2, (int(W/2), 0), (int(W/2), H), (128, 255, 128), 2)
|
||
|
||
|
||
illus = [imgcat1, imgcat2]
|
||
if len(illus):
|
||
img_cat = np.concatenate(illus, axis = 1)
|
||
if len(illus)==2:
|
||
H, W = img_cat.shape[:2]
|
||
cv2.line(img_cat, (int(W/2), 0), (int(W/2), int(H)), (128, 128, 255), 3)
|
||
|
||
return img_cat
|
||
|
||
|
||
|
||
|
||
|
||
def main():
|
||
# pklpath = r"D:\DetectTracking\evtresult\images2\ShoppingDict.pkl"
|
||
# evt = ShoppingEvent(pklpath, stype='source')
|
||
|
||
evtpath = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\images\20241209-160248-08edd5f6-1806-45ad-babf-7a4dd11cea60_6973226721445"
|
||
evt = ShoppingEvent(evtpath, stype='data')
|
||
|
||
img_cat = evt.draw_tracks()
|
||
|
||
cv2.imwrite("a.png", img_cat)
|
||
|
||
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|
||
# main1()
|
||
|
||
|
||
|
||
|
||
|
||
|