Files
detecttracking/tracking/dotrack/dotracks.py
2024-07-18 17:52:12 +08:00

500 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Mon Mar 4 18:16:01 2024
@author: ym
"""
import numpy as np
import cv2
from pathlib import Path
from scipy.spatial.distance import cdist
from utils.mergetrack import track_equal_track, readDict
# Absolute directory containing this module; the cart mask images are
# resolved relative to it (see the ShoppingCart properties).
curpath = Path(__file__).resolve().parent
class MoveState:
    """Integer flags describing the motion state of a tracked item."""

    # Not classified yet.
    Unknown = -1
    # Static=0, DownWard=1, UpWard=2, FreeMove=3
    Static, DownWard, UpWard, FreeMove = range(4)
class ShoppingCart:
    """Estimate how much of the in-cart region is covered by detection boxes.

    ``bboxes`` rows are indexed as
    ``[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]``;
    only columns 0-3 (box corners) and 7 (frame index) are read here.
    The computed ratio is stored in ``self.loadrate`` at construction time.
    """

    def __init__(self, bboxes):
        self.bboxes = bboxes
        self.loadrate = self.load_rate()

    def load_rate(self):
        """Return the fraction of the in-cart mask covered by first-frame boxes.

        Only the boxes belonging to the smallest frame index are used.  They
        are rasterised as filled rectangles, intersected with the in-cart mask,
        and the number of overlapping pixels is divided by the number of mask
        pixels (plus 0.01 to avoid division by zero).
        """
        bboxes = self.bboxes
        first_fid = bboxes[:, 7].min()
        boxes = bboxes[bboxes[:, 7] == first_fid]

        # The property re-reads the PNG from disk on every access, so fetch
        # the mask once and reuse it for shape, intersection and pixel count.
        incart = self.incart

        filled = np.zeros(incart.shape, np.uint8)
        for x1, y1, x2, y2 in boxes[:, 0:4]:
            cv2.rectangle(filled, (int(x1), int(y1)), (int(x2), int(y2)),
                          255, cv2.FILLED)

        # AND with the in-cart mask discards anything outside the cart frame.
        loadstate = cv2.bitwise_and(incart, filled)

        num_loadstate = cv2.countNonZero(loadstate)
        num_incart = cv2.countNonZero(incart)
        return num_loadstate / (num_incart + 0.01)

    @staticmethod
    def _load_mask(filename):
        """Read ``cart_tempt/<filename>`` next to this module as a binary mask.

        Pixels with a grey value above 250 become 255, all others 0.

        Raises:
            FileNotFoundError: if the image cannot be read (``cv2.imread``
                returns ``None`` instead of raising).
        """
        path = curpath / 'cart_tempt' / filename
        img = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
        if img is None:
            raise FileNotFoundError(f"Cannot read cart mask image: {path}")
        _, binary = cv2.threshold(img, 250, 255, cv2.THRESH_BINARY)
        return binary

    @property
    def incart(self):
        """Binary mask loaded from ``cart_tempt/back_incart.png``."""
        return self._load_mask('back_incart.png')

    @property
    def outcart(self):
        """Binary mask loaded from ``cart_tempt/back_outcart.png``."""
        return self._load_mask('back_outcart.png')

    @property
    def cartedge(self):
        """Binary mask loaded from ``cart_tempt/back_cartedge.png``."""
        return self._load_mask('back_cartedge.png')
class Track:
    """Base description of one track (all boxes that share a track id).

    The original author intends this as an abstract base class that should
    not be instantiated directly; nothing in the class enforces that.
    """

    def __init__(self, boxes, features, imgshape=(1024, 1280)):
        """Build the per-track geometry features.

        Args:
            boxes: 2-D array whose columns are
                [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                 0   1   2   3   4         5      6    7            8
            features: stored unchanged on ``self.features``.
            imgshape: ``imgshape[0]`` is used as the x extent and
                ``imgshape[1]`` as the y extent throughout this class.
        """
        # assert len(set(boxes[:, 4].astype(int))) == 1, "For a Track, track_id more than 1"
        # assert len(set(boxes[:, 6].astype(int))) == 1, "For a Track, class number more than 1"
        self.boxes = boxes
        self.features = features
        self.tid = int(boxes[0, 4])
        self.cls = int(boxes[0, 6])
        self.frnum = boxes.shape[0]
        self.imgBorder = False
        self.isCornpoint = False
        self.imgshape = imgshape
        self.state = MoveState.Unknown

        # First and last frame id covered by the track.
        self.start_fid = int(np.min(boxes[:, 7]))
        self.end_fid = int(np.max(boxes[:, 7]))

        self.Hands = []
        self.HandsIou = []
        self.Goods = []
        self.GoodsIou = []

        # Coordinates of the 5 key points (center, top-left, top-right,
        # bottom-left, bottom-right) for every frame.
        self.compute_cornpoints()

        # Trajectory features of the 5 key points.  (Original note: this
        # could be moved to subclasses to reduce sequential processing cost.)
        self.compute_cornpts_feats()

        # Mean box width/height over the track.  (Original note: per-corner
        # areas and the mean area should be computed here.)
        mw = np.mean(boxes[:, 2] - boxes[:, 0])
        mh = np.mean(boxes[:, 3] - boxes[:, 1])
        self.mwh = np.mean((mw, mh))
        self.Area = mw * mh

        # Displacement of the box center between the first and last frame:
        #   vshift: positive = downward, negative = upward
        #   hshift: positive = moved toward the horizontal image center,
        #           negative = moved toward the sides of the cart frame
        self.vshift = self.cornpoints[-1, 1] - self.cornpoints[0, 1]
        half_w = self.imgshape[0] / 2
        self.hshift = abs(self.cornpoints[0, 0] - half_w) - \
            abs(self.cornpoints[-1, 0] - half_w)

        # Hand state analysis (class id 0 is treated as a hand).
        self.HAND_STATIC_THRESH = 100
        if self.cls == 0:
            self.extract_hand_features()

    def compute_cornpoints(self):
        """Fill ``self.cornpoints`` with the 5 key points of every box.

        The array has shape (frnum, 10); columns are the (x, y) pairs of
        (center, top_left, top_right, bottom_left, bottom_right).
        """
        boxes = self.boxes
        cornpoints = np.zeros((self.frnum, 10))
        cornpoints[:, 0] = (boxes[:, 0] + boxes[:, 2]) / 2
        cornpoints[:, 1] = (boxes[:, 1] + boxes[:, 3]) / 2
        cornpoints[:, 2], cornpoints[:, 3] = boxes[:, 0], boxes[:, 1]
        cornpoints[:, 4], cornpoints[:, 5] = boxes[:, 2], boxes[:, 1]
        cornpoints[:, 6], cornpoints[:, 7] = boxes[:, 0], boxes[:, 3]
        cornpoints[:, 8], cornpoints[:, 9] = boxes[:, 2], boxes[:, 3]
        self.cornpoints = cornpoints

    def compute_cornpts_feats(self):
        """Compute trajectory features for each of the 5 key points.

        Sets four lists of length 5 (same key-point order as ``cornpoints``):
            trajectory: per-step Euclidean displacement between frames
            trajlens:   total path length (sum of ``trajectory``)
            trajdist:   largest Euclidean distance between any two positions
            trajrects:  ``cv2.minAreaRect`` of the positions, i.e.
                        rect[0] = center (x, y), rect[1] = (w, h),
                        rect[2] = rotation angle
        """
        trajectory = []
        trajlens = []
        trajdist = []
        trajrects = []
        for k in range(5):
            X = self.cornpoints[:, 2 * k:2 * (k + 1)]

            traj = np.linalg.norm(np.diff(X, axis=0), axis=1)
            trajectory.append(traj)
            trajlens.append(np.sum(traj))

            trajdist.append(np.max(cdist(X, X)))

            trajrects.append(cv2.minAreaRect(X.astype(np.int64)))

        self.trajectory = trajectory
        self.trajlens = trajlens
        self.trajdist = trajdist
        self.trajrects = trajrects

    def trajfeature(self):
        """Summarise the key-point trajectories into ``self.TrajFeat``.

        The "max" trajectory is the key point with the longest path.  The
        "min" trajectory is the key point with the shortest path, unless
        ``self.isCornpoint`` is set, in which case the center point (index 0)
        is used instead.

        Sets:
            trajmin, trajmax: per-step displacement arrays of those points
            TrajFeat: [trajlen_min, trajlen_max, trajdist_min, trajdist_max,
                       trajlen_rate, trajdist_rate]
        """
        idx_max = self.trajlens.index(max(self.trajlens))
        trajmax = self.trajectory[idx_max]
        trajlen_max = self.trajlens[idx_max]
        trajdist_max = self.trajdist[idx_max]

        idx_min = 0 if self.isCornpoint else self.trajlens.index(min(self.trajlens))
        trajmin = self.trajectory[idx_min]
        trajlen_min = self.trajlens[idx_min]
        trajdist_min = self.trajdist[idx_min]

        # Shortest / longest path length: the smaller, the smaller the motion.
        trajlen_rate = trajlen_min / (trajlen_max + 0.0001)

        # Extent of the "min" trajectory relative to the mean box size.
        trajdist_rate = trajdist_min / (self.mwh + 0.0001)

        self.trajmin = trajmin
        self.trajmax = trajmax
        self.TrajFeat = [trajlen_min, trajlen_max,
                         trajdist_min, trajdist_max,
                         trajlen_rate, trajdist_rate]

    def pt_state_fids(self, det_y, STATIC_THRESH = 8):
        """Split a 1-D difference signal into static and moving index ranges.

        Original note: for the front camera ``y`` is usually the box ``y1``
        coordinate, and the item should be restricted to inside the cart.

        Args:
            det_y: 1-D array of frame-to-frame differences.
            STATIC_THRESH: an element counts as static when its absolute
                value is below this threshold.

        Returns:
            (ranges, rangex): two lists of half-open ``(start, end)`` index
            pairs into ``det_y``.  ``ranges`` holds the maximal runs of
            consecutive static elements, ``rangex`` the gaps before, between
            and after them.  With no static element at all, ``ranges`` is
            empty and ``rangex`` is ``[(0, len(det_y))]``.
        """
        ranges, rangex = [], []
        static_indices = np.where(np.abs(det_y) < STATIC_THRESH)[0]
        if len(static_indices) == 0:
            rangex.append((0, len(det_y)))
            return ranges, rangex

        # Group consecutive static indices into runs.
        start_index = static_indices[0]
        for i in range(1, len(static_indices)):
            if static_indices[i] != static_indices[i - 1] + 1:
                ranges.append((start_index, static_indices[i - 1] + 1))
                start_index = static_indices[i]
        ranges.append((start_index, static_indices[-1] + 1))

        # Everything not covered by a static run is a moving segment.
        idx1, idx2 = ranges[0][0], ranges[-1][1]
        if idx1 != 0:
            rangex.append((0, idx1))
        for k in range(1, len(ranges)):
            rangex.append((ranges[k - 1][1], ranges[k][0]))
        # The final stage of the trajectory is a moving segment.
        if idx2 != len(det_y):
            rangex.append((idx2, len(det_y)))

        return ranges, rangex

    def PositionState(self, camerType="back"):
        """Classify the track position against the in-cart / out-cart masks.

        Args:
            camerType: "back" for the rear camera, "front" for the front one.

        Sets ``isWholeInCart``, ``isWholeOutCart``, ``Cent_isIncart``,
        ``LB_isIncart``, ``RB_isIncart`` and ``posState`` (how many of the
        three sampled points hit the in-cart mask in at least one frame).
        """
        # NOTE(review): the "front" branch loads the un-suffixed masks while
        # the other branch loads the "*_ftmp" ones - confirm this is intended.
        # Paths are relative to the current working directory.
        if camerType=="front":
            incart = cv2.imread("./shopcart/cart_tempt/incart.png", cv2.IMREAD_GRAYSCALE)
            outcart = cv2.imread("./shopcart/cart_tempt/outcart.png", cv2.IMREAD_GRAYSCALE)
        else:
            incart = cv2.imread("./shopcart/cart_tempt/incart_ftmp.png", cv2.IMREAD_GRAYSCALE)
            outcart = cv2.imread("./shopcart/cart_tempt/outcart_ftmp.png", cv2.IMREAD_GRAYSCALE)

        x_max, y_max = self.imgshape[0] - 1, self.imgshape[1] - 1

        def _pixels(col, upper):
            # Clamp a cornpoints column into the image and make it an index.
            return self.cornpoints[:, col].clip(0, upper).astype(np.int64)

        # Center, bottom-left and bottom-right points of every frame.
        xc, yc = _pixels(0, x_max), _pixels(1, y_max)
        x1, y1 = _pixels(6, x_max), _pixels(7, y_max)
        x2, y2 = _pixels(8, x_max), _pixels(9, y_max)

        Cent_inCartnum = np.count_nonzero(incart[(yc, xc)])
        LB_inCartnum = np.count_nonzero(incart[(y1, x1)])
        RB_inCartnum = np.count_nonzero(incart[(y2, x2)])

        Cent_outCartnum = np.count_nonzero(outcart[(yc, xc)])
        LB_outCartnum = np.count_nonzero(outcart[(y1, x1)])
        RB_outCartnum = np.count_nonzero(outcart[(y2, x2)])

        # Entirely inside the cart: neither bottom corner ever touches the
        # out-cart mask.
        self.isWholeInCart = False
        if LB_outCartnum + RB_outCartnum == 0:
            self.isWholeInCart = True

        # Entirely outside the cart: the center together with the bottom-left
        # corner, or the center together with the bottom-right corner, never
        # touches the in-cart mask.
        self.isWholeOutCart = False
        if Cent_inCartnum + LB_inCartnum == 0 or Cent_inCartnum + RB_inCartnum == 0:
            self.isWholeOutCart = True

        self.Cent_isIncart = False
        self.LB_isIncart = False
        self.RB_isIncart = False
        if Cent_inCartnum: self.Cent_isIncart = True
        if LB_inCartnum: self.LB_isIncart = True
        if RB_inCartnum: self.RB_isIncart = True

        self.posState = self.Cent_isIncart+self.LB_isIncart+self.RB_isIncart

    def extract_hand_features(self):
        """Set ``self.isHandStatic`` for a hand track (``cls == 0``).

        The hand is considered static when every box center stays within
        ``HAND_STATIC_THRESH`` of the midpoint of the centers' bounding range.
        """
        assert self.cls == 0, "The class of traj must be HAND!"

        self.isHandStatic = False

        x0 = (self.boxes[:, 0] + self.boxes[:, 2]) / 2
        y0 = (self.boxes[:, 1] + self.boxes[:, 3]) / 2

        handXY = np.stack((x0, y0), axis=-1)

        handCenter = np.array([(max(x0)+min(x0))/2, (max(y0)+min(y0))/2])

        # axis=1 gives one distance per frame.  Without it the call returns a
        # single Frobenius norm over all frames, which grows with the track
        # length and makes long stationary tracks look like they moved.
        handMaxDist = np.max(np.linalg.norm(handXY - handCenter, axis=1))

        if handMaxDist < self.HAND_STATIC_THRESH:
            self.isHandStatic = True

        return
class doTracks:
    """Group detection boxes into per-track lists and offer merge helpers.

    NOTE(review): several methods rely on attributes that this class does not
    define (``self.tracks``, ``self.features_dict``, ``self.merge_tracks``);
    presumably subclasses provide them - confirm against the callers.
    """

    def __init__(self, bboxes, TracksDict):
        """Split ``bboxes`` by track id and fetch the matching features.

        Args:
            bboxes: 2-D array with columns
                [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
            TracksDict: passed through to ``readDict`` together with each
                track's boxes; its structure is not visible in this file.
        """
        # Fundamental properties.
        self.bboxes = bboxes
        # self.TracksDict = TracksDict
        self.frameID = np.unique(bboxes[:, 7].astype(int))
        self.trackID = np.unique(bboxes[:, 4].astype(int))

        self.lboxes = self.array2list()
        self.lfeats = self.getfeats(TracksDict)

        # Buckets into which the tracks are classified later on.
        self.Hands = []
        self.Kids = []
        self.Static = []
        self.Residual = []
        self.DownWard = []  # subset of self.Residual
        self.UpWard = []    # subset of self.Residual
        self.FreeMove = []  # subset of self.Residual

    def array2list(self):
        """Convert ``self.bboxes`` into a list with one box array per track.

        Returns:
            list of arrays, ordered like ``self.trackID``; every array keeps
            the column layout
            [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
            and contains only rows of a single track id.
        """
        track_ids = self.bboxes[:, 4].astype(int)
        lboxes = []
        for t_id in self.trackID:
            box = self.bboxes[np.where(track_ids == t_id)[0], :]
            # A track must not have two boxes in the same frame.
            assert len(set(box[:, 7])) == len(box), "Please check!!!"
            lboxes.append(box)
        return lboxes

    def getfeats(self, TracksDict):
        """Return ``readDict(boxes, TracksDict)`` for every entry of ``lboxes``."""
        return [readDict(boxes, TracksDict) for boxes in self.lboxes]

    def similarity(self):
        """Return ``{(tid_a, tid_b): similarity}`` over pairs of ``self.tracks``.

        Pairs are taken with ``j >= i``, so each track is also compared with
        itself.  With fewer than two tracks the result is an empty dict.
        """
        nt = len(self.tracks)
        similar_dict = {}
        if nt >= 2:
            for i in range(nt):
                for j in range(i, nt):
                    tracka = self.tracks[i]
                    trackb = self.tracks[j]
                    similar = self.feat_similarity(tracka, trackb)
                    similar_dict.update({(tracka.tid, trackb.tid): similar})
        return similar_dict

    def feat_similarity(self, tracka, trackb, metric='cosine'):
        """Similarity between the mean feature vectors of two tracks.

        Features are looked up as ``self.features_dict[frame_index][box_index]``
        for every box of each track, averaged per track, and compared with
        ``1 - cdist(mean_a, mean_b, metric)``.

        Returns:
            array of shape (1, 1) holding the similarity; it is also printed.
        """
        boxes_a, boxes_b = tracka.boxes, trackb.boxes

        def _track_feats(boxes):
            # Columns 7 and 8 are frame_index and box_index.
            rows = [self.features_dict[fid][bid] for fid, bid in boxes[:, 7:9]]
            return np.asarray(rows, dtype=np.float32)

        feata = _track_feats(boxes_a)
        featb = _track_feats(boxes_b)

        # Only the mean-vs-mean similarity is returned, so the full pairwise
        # similarity matrix is not computed.
        feata_m = np.mean(feata, axis =0)[None,:]
        featb_m = np.mean(featb, axis =0)[None,:]
        simi_ab = 1 - cdist(feata_m, featb_m, metric)

        print(f'tid {int(boxes_a[0, 4])} vs {int(boxes_b[0, 4])}: {simi_ab[0][0]}')
        return simi_ab

    def merge_tracks_loop(self, alist):
        """Apply ``self.merge_tracks`` repeatedly until the length stops changing.

        ``merge_tracks`` is expected to come from a subclass.
        """
        na, nb = len(alist), 0
        while na != nb:
            na = len(alist)
            alist = self.merge_tracks(alist)  # func is from subclass
            nb = len(alist)
        return alist

    def base_merge_tracks(self, Residual):
        """Group tracks with different ids that may be the same item.

        The first remaining track is taken as reference; every later track for
        which ``track_equal_track(reference, other)`` is true joins its group.
        The rest are processed the same way until none are left.

        Returns:
            list of groups, each a list of tracks starting with its reference.
        """
        mergedTracks = []
        pending = list(Residual)
        while pending:
            atrack, rest = pending[0], pending[1:]
            cur_list = [atrack]
            pending = []
            for btrack in rest:
                if track_equal_track(atrack, btrack):
                    cur_list.append(btrack)
                else:
                    pending.append(btrack)
            mergedTracks.append(cur_list)
        return mergedTracks

    @staticmethod
    def join_tracks(tlista, tlistb):
        """Combine two lists of tracks into a single one.

        All of ``tlista`` is kept in order; tracks of ``tlistb`` are appended
        only when their ``tid`` has not been seen yet.
        """
        res = list(tlista)
        seen = {t.tid for t in tlista}
        for t in tlistb:
            if t.tid not in seen:
                seen.add(t.tid)
                res.append(t)
        return res

    @staticmethod
    def sub_tracks(tlista, tlistb):
        """Return the tracks of ``tlista`` whose ``tid`` is not in ``tlistb``."""
        track_ids_b = {t.tid for t in tlistb}
        return [t for t in tlista if t.tid not in track_ids_b]