Files
detecttracking/contrast/utils/event.py
王庆刚 39f94c7bd4 20241217
2024-12-17 17:32:09 +08:00

499 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Tue Nov 26 17:35:05 2024
@author: ym
"""
import os
import cv2
import pickle
import numpy as np
from pathlib import Path
import sys
sys.path.append(r"D:\DetectTracking")
from tracking.utils.plotting import Annotator, colors
from tracking.utils.drawtracks import drawTrack
from tracking.utils.read_data import extract_data, read_tracking_output, read_similar
IMG_FORMAT = ['.bmp', '.jpg', '.jpeg', '.png']
VID_FORMAT = ['.mp4', '.avi']
def array2list(bboxes):
'''
将 bboxes 变换为 track 列表
bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
Return
lboxes列表列表中元素具有同一 track_idx1y1x2y2 格式
[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
'''
lboxes = []
if len(bboxes)==0:
return []
trackID = np.unique(bboxes[:, 4].astype(int))
track_ids = bboxes[:, 4].astype(int)
for t_id in trackID:
idx = np.where(track_ids == t_id)[0]
box = bboxes[idx, :]
lboxes.append(box)
return lboxes
class ShoppingEvent:
def __init__(self, eventpath, stype="data"):
'''stype: str, 'pickle', 'data', '''
self.eventpath = eventpath
self.evtname = str(Path(eventpath).stem)
self.barcode = ''
self.evtType = ''
'''=========== path of image and video =========== '''
self.back_videopath = ''
self.front_videopath = ''
self.back_imgpaths = []
self.front_imgpaths = []
'''=========== process.data ==============================='''
self.one2one = None
self.one2n = None
'''=========== 0/1_track.data ============================='''
self.back_yolobboxes = []
self.back_yolofeats = []
self.back_trackerboxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
self.back_trackerfeats = {}
self.back_trackingboxes = []
self.back_trackingfeats = []
self.front_yolobboxes = []
self.front_yolofeats = []
self.front_trackerboxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
self.front_trackerfeats = {}
self.front_trackingboxes = []
self.front_trackingfeats = []
'''=========== 0/1_tracking_output.data ==================='''
self.back_boxes = []
self.back_feats = []
self.front_boxes = []
self.front_feats = []
if stype=="data":
self.from_datafile(eventpath)
if stype=="pickle":
self.from_pklfile(eventpath)
self.feats_select = []
self.feats_compose = np.empty((0, 256), dtype=np.float64)
self.select_feats()
self.compose_feats()
# if stype=="image":
# self.from_image(eventpath)
def kerndata(self, ShoppingDict, camtype="backCamera"):
'''
camtype: str, "backCamera" or "frontCamera"
'''
yoloboxes, resfeats = [], []
trackerboxes = np.empty((0, 9), dtype=np.float64)
trackefeats = {}
trackingboxes, trackingfeats = [], []
frameDictList = ShoppingDict[camtype]["yoloResnetTracker"]
for frameDict in frameDictList:
yoloboxes.append(frameDict["bboxes"])
tboxes = frameDict["tboxes"]
trackefeats.update(frameDict["feats"])
trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)), axis=0)
Residual = ShoppingDict[camtype]["tracking"].Residual
for track in Residual:
trackingboxes.append(track.boxes)
trackingfeats.append(track.features)
kdata = (yoloboxes, resfeats, trackerboxes, trackefeats, trackingboxes, trackingfeats)
tracking_out_boxes, tracking_out_feats = [], []
Confirmed = ShoppingDict[camtype]["tracking"].Confirmed
for track in Confirmed:
tracking_out_boxes.append(track.boxes)
tracking_out_feats.append(track.features)
outdata = (tracking_out_boxes, tracking_out_feats)
return kdata, outdata
def from_pklfile(self, eventpath):
with open(eventpath, 'rb') as f:
ShoppingDict = pickle.load(f)
self.eventpath = ShoppingDict["eventPath"]
self.evtname = ShoppingDict["eventName"]
self.barcode = ShoppingDict["barcode"]
'''=========== path of image and video =========== '''
self.back_videopath = ShoppingDict["backCamera"]["videoPath"]
self.front_videopath = ShoppingDict["frontCamera"]["videoPath"]
self.back_imgpaths = ShoppingDict["backCamera"]["imagePaths"]
self.front_imgpaths = ShoppingDict["frontCamera"]["imagePaths"]
'''===========对应于 0/1_track.data ============================='''
backdata, back_outdata = self.kerndata(ShoppingDict, "backCamera")
frontdata, front_outdata = self.kerndata(ShoppingDict, "frontCamera")
self.back_yolobboxes = backdata[0]
self.back_yolofeats = backdata[1]
self.back_trackerboxes = backdata[2]
self.back_trackerfeats = [3]
self.back_trackingboxes = [4]
self.back_trackingfeats = [5]
self.front_yolobboxes = frontdata[0]
self.front_yolofeats = frontdata[1]
self.front_trackerboxes = frontdata[2]
self.front_trackerfeats = frontdata[3]
self.front_trackingboxes = frontdata[4]
self.front_trackingfeats = frontdata[5]
'''===========对应于 0/1_tracking_output.data ============================='''
self.back_boxes = back_outdata[0]
self.back_feats = back_outdata[1]
self.front_boxes = front_outdata[0]
self.front_feats = front_outdata[1]
def from_datafile(self, eventpath):
evtList = self.evtname.split('_')
if len(evtList)>=2 and len(evtList[-1])>=10 and evtList[-1].isdigit():
self.barcode = evtList[-1]
if len(evtList)==3 and evtList[-1]== evtList[-2]:
self.evtType = 'input'
else:
self.evtType = 'other'
'''================ path of image ============='''
frontImgs, frontFid = [], []
backImgs, backFid = [], []
for imgname in os.listdir(eventpath):
name, ext = os.path.splitext(imgname)
if ext not in IMG_FORMAT or name.find('frameId') < 0: continue
if len(name.split('_')) != 3 and not name.split('_')[3].isdigit(): continue
CamerType = name.split('_')[0]
frameId = int(name.split('_')[3])
imgpath = os.path.join(eventpath, imgname)
if CamerType == '0':
backImgs.append(imgpath)
backFid.append(frameId)
if CamerType == '1':
frontImgs.append(imgpath)
frontFid.append(frameId)
## 生成依据帧 ID 排序的前后摄图像地址列表
frontIdx = np.argsort(np.array(frontFid))
backIdx = np.argsort(np.array(backFid))
self.front_imgpaths = [frontImgs[i] for i in frontIdx]
self.back_imgpaths = [backImgs[i] for i in backIdx]
'''================ path of video ============='''
for vidname in os.listdir(eventpath):
name, ext = os.path.splitext(vidname)
if ext not in VID_FORMAT: continue
vidpath = os.path.join(eventpath, vidname)
CamerType = name.split('_')[0]
if CamerType == '0':
self.back_videopath = vidpath
if CamerType == '1':
self.front_videopath = vidpath
'''================ process.data ============='''
procpath = Path(eventpath).joinpath('process.data')
if procpath.is_file():
SimiDict = read_similar(procpath)
self.one2one = SimiDict['one2one']
self.one2n = SimiDict['one2n']
'''=========== 0/1_track.data & 0/1_tracking_output.data ======='''
for dataname in os.listdir(eventpath):
datapath = os.path.join(eventpath, dataname)
if not os.path.isfile(datapath): continue
CamerType = dataname.split('_')[0]
'''========== 0/1_track.data =========='''
if dataname.find("_track.data")>0:
bboxes, ffeats, trackerboxes, trackerfeats, trackingboxes, trackingfeats = extract_data(datapath)
if CamerType == '0':
self.back_yolobboxes = bboxes
self.back_yolofeats = ffeats
self.back_trackerboxes = trackerboxes
self.back_trackerfeats = trackerfeats
self.back_trackingboxes = trackingboxes
self.back_trackingfeats = trackingfeats
if CamerType == '1':
self.front_yolobboxes = bboxes
self.front_yolofeats = ffeats
self.front_trackerboxes = trackerboxes
self.front_trackerfeats = trackerfeats
self.front_trackingboxes = trackingboxes
self.front_trackingfeats = trackingfeats
'''========== 0/1_tracking_output.data =========='''
if dataname.find("_tracking_output.data")>0:
tracking_output_boxes, tracking_output_feats = read_tracking_output(datapath)
if CamerType == '0':
self.back_boxes = tracking_output_boxes
self.back_feats = tracking_output_feats
elif CamerType == '1':
self.front_boxes = tracking_output_boxes
self.front_feats = tracking_output_feats
def compose_feats(self):
'''事件的特征集成'''
feats_compose = np.empty((0, 256), dtype=np.float64)
if len(self.front_feats):
for feat in self.front_feats:
feats_compose = np.concatenate((feats_compose, feat), axis=0)
if len(self.back_feats):
for feat in self.back_feats:
feats_compose = np.concatenate((feats_compose, feat), axis=0)
self.feats_compose = feats_compose
def select_feats(self):
'''事件的特征选择'''
self.feats_select = []
if len(self.front_feats):
self.feats_select = self.front_feats
elif len(self.back_feats):
self.feats_select = self.back_feats
def plot_save_image(self, savepath):
def array2list(bboxes):
'''[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]'''
frame_ids = bboxes[:, 7].astype(int)
fID = np.unique(bboxes[:, 7].astype(int))
fboxes = []
for f_id in fID:
idx = np.where(frame_ids==f_id)[0]
box = bboxes[idx, :]
fboxes.append((f_id, box))
return fboxes
imgpairs = []
cameras = ('front', 'back')
for camera in cameras:
if camera == 'front':
boxes = self.front_trackerboxes
imgpaths = self.front_imgpaths
else:
boxes = self.back_trackerboxes
imgpaths = self.back_imgpaths
fboxes = array2list(boxes)
for fid, fbox in fboxes:
imgpath = imgpaths[int(fid-1)]
image = cv2.imread(imgpath)
annotator = Annotator(image.copy(), line_width=2)
for i, box in enumerate(fbox):
x1, y1, x2, y2, tid, score, cls, fid, bid = box
label = f'{int(tid), int(cls)}'
if tid >=0 and cls==0:
color = colors(int(cls), True)
elif tid >=0 and cls!=0:
color = colors(int(tid), True)
else:
color = colors(19, True) # 19为调色板的最后一个元素
xyxy = (x1/2, y1/2, x2/2, y2/2)
annotator.box_label(xyxy, label, color=color)
im0 = annotator.result()
imgpairs.append((Path(imgpath).name, im0))
# spath = os.path.join(savepath, Path(imgpath).name)
# cv2.imwrite(spath, im0)
return imgpairs
def save_event_subimg(self, savepath):
'''
功能: 保存一次购物事件的轨迹子图
9 items: barcode, type, filepath, back_imgpaths, front_imgpaths,
back_boxes, front_boxes, back_feats, front_feats,
feats_compose, feats_select
子图保存次序:先前摄、后后摄,以 k 为编号,和 "feats_compose" 中次序相同
'''
imgpairs = []
cameras = ('front', 'back')
for camera in cameras:
boxes = np.empty((0, 9), dtype=np.float64) ##和类doTracks兼容
if camera == 'front':
for b in self.front_boxes:
boxes = np.concatenate((boxes, b), axis=0)
imgpaths = self.front_imgpaths
else:
for b in self.back_boxes:
boxes = np.concatenate((boxes, b), axis=0)
imgpaths = self.back_imgpaths
for i, box in enumerate(boxes):
x1, y1, x2, y2, tid, score, cls, fid, bid = box
imgpath = imgpaths[int(fid-1)]
image = cv2.imread(imgpath)
subimg = image[int(y1/2):int(y2/2), int(x1/2):int(x2/2), :]
camerType, timeTamp, _, frameID = os.path.basename(imgpath).split('.')[0].split('_')
subimgName = f"cam{camerType}_{i}_tid{int(tid)}_fid({int(fid)}, {frameID}).png"
imgpairs.append((subimgName, subimg))
# spath = os.path.join(savepath, subimgName)
# cv2.imwrite(spath, subimg)
return imgpairs
# basename = os.path.basename(event['filepath'])
print(f"Image saved: {os.path.basename(self.eventpath)}")
def draw_tracks(self):
front_edge = cv2.imread(r"D:\DetectTracking\tracking\shopcart\cart_tempt\board_ftmp_line.png")
back_edge = cv2.imread(r"D:\DetectTracking\tracking\shopcart\cart_tempt\edgeline.png")
front_trackerboxes = array2list(self.front_trackerboxes)
back_trackerboxes = array2list(self.back_trackerboxes)
# img1, img2 = edgeline.copy(), edgeline.copy()
img1 = drawTrack(front_trackerboxes, front_edge.copy())
img2 = drawTrack(self.front_trackingboxes, front_edge.copy())
img3 = drawTrack(back_trackerboxes, back_edge.copy())
img4 = drawTrack(self.back_trackingboxes, back_edge.copy())
imgcat1 = np.concatenate((img1, img2), axis = 1)
H, W = imgcat1.shape[:2]
cv2.line(imgcat1, (int(W/2), 0), (int(W/2), H), (128, 255, 128), 2)
imgcat2 = np.concatenate((img3, img4), axis = 1)
H, W = imgcat2.shape[:2]
cv2.line(imgcat2, (int(W/2), 0), (int(W/2), H), (128, 255, 128), 2)
illus = [imgcat1, imgcat2]
if len(illus):
img_cat = np.concatenate(illus, axis = 1)
if len(illus)==2:
H, W = img_cat.shape[:2]
cv2.line(img_cat, (int(W/2), 0), (int(W/2), int(H)), (128, 128, 255), 3)
return img_cat
def main():
# pklpath = r"D:\DetectTracking\evtresult\images2\ShoppingDict.pkl"
# evt = ShoppingEvent(pklpath, stype='pickle')
evtpath = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\images\20241209-160248-08edd5f6-1806-45ad-babf-7a4dd11cea60_6973226721445"
evt = ShoppingEvent(evtpath, stype='data')
img_cat = evt.draw_tracks()
cv2.imwrite("a.png", img_cat)
# =============================================================================
# def main1():
# evtpaths = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\images"
# text1 = "one2n_Error.txt"
# text2 = "one2SN_Error.txt"
# events = []
# text = (text1, text2)
# for txt in text:
# txtfile = os.path.join(evtpaths, txt)
# with open(txtfile, "r") as f:
# lines = f.readlines()
# for i, line in enumerate(lines):
# line = line.strip()
# if line:
# fpath=os.path.join(evtpaths, line)
# events.append(fpath)
#
#
# events = list(set(events))
#
# '''定义当前事件存储地址及生成相应文件件'''
# resultPath = r"\\192.168.1.28\share\测试视频数据以及日志\算法全流程测试\202412\result"
# # eventDataPath = os.path.join(resultPath, "evtobjs")
# # subimgPath = os.path.join(resultPath, "subimgs")
# # imagePath = os.path.join(resultPath, "image")
#
# # if not os.path.exists(eventDataPath):
# # os.makedirs(eventDataPath)
# # if not os.path.exists(subimgPath):
# # os.makedirs(subimgPath)
# # if not os.path.exists(imagePath):
# # os.makedirs(imagePath)
#
#
# for evtpath in events:
# event = ShoppingEvent(evtpath)
#
#
# evtname = os.path.basename(evtpath)
# subimgpath = os.path.join(resultPath, f"{evtname}", "subimg")
# imgspath = os.path.join(resultPath, f"{evtname}", "imgs")
# if not os.path.exists(subimgpath):
# os.makedirs(subimgpath)
# if not os.path.exists(imgspath):
# os.makedirs(imgspath)
#
# subimgpairs = event.save_event_subimg(subimgpath)
#
# for subimgName, subimg in subimgpairs:
# spath = os.path.join(subimgpath, subimgName)
# cv2.imwrite(spath, subimg)
#
# imgpairs = event.plot_save_image(imgspath)
# for imgname, img in imgpairs:
# spath = os.path.join(imgspath, imgname)
# cv2.imwrite(spath, img)
#
# =============================================================================
if __name__ == "__main__":
main()
# main1()