721 lines
25 KiB
Python
721 lines
25 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
Created on Mon Mar 4 18:16:01 2024
|
||
|
||
@author: ym
|
||
"""
|
||
import numpy as np
|
||
import cv2
|
||
from pathlib import Path
|
||
from scipy.spatial.distance import cdist
|
||
from tracking.utils.mergetrack import track_equal_track, readDict
|
||
# Directory containing this module; ``parpath`` (its parent directory) is the
# root that the 'shopcart/cart_tempt/*.png' template paths below are joined to.
curpath = Path(__file__).resolve().parent
parpath = curpath.parent
|
||
|
||
class MoveState:
    """Integer flags naming the motion state of a product (goods) track."""

    # Sentinel used when no state has been determined.
    Unknown = -1

    # Motion categories.
    Static = 0
    DownWard = 1
    UpWard = 2
    FreeMove = 3
|
||
|
||
def bbox_ioa(box1, box2, iou=False, eps=1e-7):
    """
    Pairwise overlap ratio between two sets of boxes in x1y1x2y2 format.

    By default the ratio is intersection / area(box2); with ``iou=True`` it is
    the standard intersection-over-union.

    Args:
        box1 (np.array): Array of shape (n, 4) holding n boxes.
        box2 (np.array): Array of shape (m, 4) holding m boxes.
        iou (bool): Return the standard IoU if True, otherwise
            intersection area / box2 area.
        eps (float, optional): Added to the denominator to avoid division by
            zero. Defaults to 1e-7.

    Returns:
        (np.array): Array of shape (n, m) with the overlap ratio of every
            (box1[i], box2[j]) pair.
    """
    left1, top1, right1, bottom1 = box1.T
    left2, top2, right2, bottom2 = box2.T

    # Broadcast box1 along rows and box2 along columns; negative extents
    # (no overlap) are clipped to zero.
    overlap_w = (np.minimum(right1[:, None], right2) - np.maximum(left1[:, None], left2)).clip(0)
    overlap_h = (np.minimum(bottom1[:, None], bottom2) - np.maximum(top1[:, None], top2)).clip(0)
    inter_area = overlap_w * overlap_h

    # Denominator: box2 area, or the union area when a true IoU is requested.
    denom = (right2 - left2) * (bottom2 - top2)
    if iou:
        area1 = (right1 - left1) * (bottom1 - top1)
        denom = denom + area1[:, None] - inter_area

    return inter_area / (denom + eps)
|
||
|
||
class ShoppingCart:
    """Cart load-rate estimate built from detection boxes and cart template masks.

    ``bboxes`` is a 2-D array whose first four columns are x1, y1, x2, y2 and
    whose column 7 selects the frame (the box layout documented elsewhere in
    this file lists column 7 as frame_index).
    """

    def __init__(self, bboxes):
        self.bboxes = bboxes
        self.loadrate = self.load_rate()

    def load_rate(self):
        """Return the fraction of in-cart mask pixels covered by boxes.

        Only the boxes whose column-7 value equals the minimum of that column
        are used.  The result is covered_pixels / (incart_pixels + 0.01).
        """
        bboxes = self.bboxes

        # ``self.incart`` re-reads and re-thresholds the template image on
        # every access, so fetch it once and reuse it.
        incart = self.incart

        first_fid = min(bboxes[:, 7])
        boxes = bboxes[bboxes[:, 7] == first_fid]

        # Paint every selected box as a filled rectangle on an empty canvas.
        occupied = np.zeros(incart.shape, np.uint8)
        for x1, y1, x2, y2 in boxes[:, 0:4]:
            cv2.rectangle(occupied, (int(x1), int(y1)), (int(x2), int(y2)), 255, cv2.FILLED)

        # AND with the in-cart mask removes anything painted outside the cart.
        loadstate = cv2.bitwise_and(incart, occupied)

        num_loadstate = cv2.countNonZero(loadstate)
        num_incart = cv2.countNonZero(incart)

        # The 0.01 keeps the division defined when the mask is empty.
        return num_loadstate / (num_incart + 0.01)

    @staticmethod
    def _load_mask(filename):
        """Read a template from shopcart/cart_tempt as grayscale and binarize it at 250."""
        img = cv2.imread(str(parpath / 'shopcart/cart_tempt' / filename), cv2.IMREAD_GRAYSCALE)
        _, binary = cv2.threshold(img, 250, 255, cv2.THRESH_BINARY)
        return binary

    @property
    def incart(self):
        """Binary mask loaded from incart.png (read from disk on every access)."""
        return self._load_mask('incart.png')

    @property
    def outcart(self):
        """Binary mask loaded from outcart.png (read from disk on every access)."""
        return self._load_mask('outcart.png')

    @property
    def cartedge(self):
        """Binary mask loaded from cartedge.png (read from disk on every access)."""
        return self._load_mask('cartedge.png')
|
||
|
||
class Track:
    """Per-track container of boxes, appearance features and trajectory statistics.

    The original author marks this as an abstract base class that should not
    be instantiated directly; nothing in the code enforces that.
    """

    def __init__(self, boxes, features=None, imgshape=(1024, 1280)):
        '''
        boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                0   1   2   3      4        5     6        7           8
        features: per-box feature vectors, one row per row of ``boxes``
                  (used by select_boxes and is_freemove).
        imgshape: image extent; index 0 is used as the x extent and index 1
                  as the y extent (see hshift below and PositionState).
        '''
        self.boxes = boxes
        self.features = features
        self.slt_boxes = self.select_boxes()

        # Track id and class are taken from the first row only.
        self.tid = int(boxes[0, 4])
        self.cls = int(boxes[0, 6])
        self.frnum = boxes.shape[0]

        self.isCornpoint = False
        self.imgshape = imgshape

        # Associated hand / goods tracks and their IoU values; filled elsewhere.
        self.Hands = []
        self.HandsIou = []
        self.Goods = []
        self.GoodsIou = []

        # Coordinates of the 5 key points (centre, top-left, top-right,
        # bottom-left, bottom-right) for every box.
        self.compute_cornpoints()

        # Trajectory features of the 5 key points.
        self.compute_cornpts_feats()

        # Mean box width/height, their average, and the mean-size area.
        mw, mh = np.mean(boxes[:, 2] - boxes[:, 0]), np.mean(boxes[:, 3] - boxes[:, 1])
        self.mwh = np.mean((mw, mh))
        self.Area = mw * mh

        # Displacement between the last and the first row of ``boxes``:
        #   vshift: positive = centre moved down (y grew), negative = up.
        #   hshift: positive = centre ended closer to the vertical centre
        #           line x = imgshape[0]/2, negative = further away from it.
        self.vshift = self.cornpoints[-1, 1] - self.cornpoints[0, 1]
        half_x = self.imgshape[0] / 2
        self.hshift = abs(self.cornpoints[0, 0] - half_x) - \
            abs(self.cornpoints[-1, 0] - half_x)

        # Hand-state analysis (class 0 is treated as "hand").
        self.HAND_STATIC_THRESH = 100
        if self.cls == 0:
            self.extract_hand_features()

    def select_boxes(self):
        """Return the boxes, ordered by frame index, with near-duplicates dropped.

        A box is dropped when, compared with the box immediately before it in
        frame order, both the overlap ratio from ``bbox_ioa`` (intersection
        over the previous box's area) and the cosine similarity of the two
        feature vectors exceed 0.85.

        When ``self.features`` is None or does not have one row per box, the
        similarity test cannot be evaluated: "check!" is printed and only the
        first box (in frame order) is returned.
        """
        order = np.argsort(self.boxes[:, 7])
        boxes = self.boxes[order]
        if len(boxes) == 0:
            return np.array([])

        features = self.features
        # This guard must run BEFORE features are indexed with ``order``;
        # otherwise a missing/short feature array raises instead of being
        # reported.
        if features is None or len(features) != len(boxes):
            print("check!")
            return np.array([boxes[0, :]])
        features = features[order]

        slt_boxes = [boxes[0, :]]
        for i in range(1, len(boxes)):
            # Overlap of the current box with the previous box of the track.
            overlap = bbox_ioa(boxes[i, :4][None, :], boxes[i - 1, :4][None, :])[0, 0]

            # Cosine similarity between the current and previous features.
            feat_cur = features[i, :][None, :]
            feat_prev = features[i - 1, :][None, :]
            simi = 1 - np.maximum(0.0, cdist(feat_cur, feat_prev, "cosine"))[0][0]

            if overlap > 0.85 and simi > 0.85:
                continue
            slt_boxes.append(boxes[i, :])

        return np.array(slt_boxes)

    def compute_cornpoints(self):
        '''
        Build ``self.cornpoints`` with 10 columns per box: the (x, y) pairs of
        (center, top_left, top_right, bottom_left, bottom_right).
        '''
        boxes = self.boxes
        x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]

        cornpoints = np.zeros((self.frnum, 10))
        cornpoints[:, 0] = (x1 + x2) / 2
        cornpoints[:, 1] = (y1 + y2) / 2
        cornpoints[:, 2], cornpoints[:, 3] = x1, y1
        cornpoints[:, 4], cornpoints[:, 5] = x2, y1
        cornpoints[:, 6], cornpoints[:, 7] = x1, y2
        cornpoints[:, 8], cornpoints[:, 9] = x2, y2

        self.cornpoints = cornpoints

    def compute_cornpts_feats(self):
        '''
        For each of the 5 key points compute and store (as 5-element lists):
            trajectory:   per-step Euclidean displacement between consecutive rows
            trajlens:     sum of those displacements
            trajdist:     largest pairwise distance between any two positions
            trajrects:    cv2.minAreaRect of the positions
            trajrects_wh: the longer side of that rectangle
        '''
        trajectory, trajlens, trajdist = [], [], []
        trajrects, trajrects_wh = [], []

        for k in range(5):
            X = self.cornpoints[:, 2 * k:2 * (k + 1)]

            traj = np.linalg.norm(np.diff(X, axis=0), axis=1)
            trajectory.append(traj)
            trajlens.append(np.sum(traj))
            trajdist.append(np.max(cdist(X, X)))

            # Minimum-area bounding rectangle of the point set:
            #   rect[0]: centre (x, y)
            #   rect[1]: (w, h)
            #   rect[2]: rotation angle
            rect = cv2.minAreaRect(X.astype(np.int64))
            trajrects_wh.append(max(rect[1]))
            trajrects.append(rect)

        self.trajectory = trajectory
        self.trajlens = trajlens
        self.trajdist = trajdist
        self.trajrects = trajrects
        self.trajrects_wh = trajrects_wh

    def trajfeature(self):
        '''
        Derive summary trajectory features and store them in
        ``self.trajmin``, ``self.trajmax`` and ``self.TrajFeat``.

        The "max" key point is the one whose min-area rectangle has the
        largest long side.  The "min" key point is the one with the smallest
        long side, unless ``self.isCornpoint`` is set, in which case the
        centre point (index 0) is used.

        TrajFeat = [trajlen_min, trajlen_max, trajdist_min, trajdist_max,
                    trajlen_rate, trajdist_rate]
        '''
        idx_max = self.trajrects_wh.index(max(self.trajrects_wh))
        if self.isCornpoint:
            idx_min = 0
        else:
            idx_min = self.trajrects_wh.index(min(self.trajrects_wh))

        trajlen_max = self.trajlens[idx_max]
        trajdist_max = self.trajdist[idx_max]
        trajlen_min = self.trajlens[idx_min]
        trajdist_min = self.trajdist[idx_min]

        # min length / max length: the smaller, the smaller the motion range.
        trajlen_rate = trajlen_min / (trajlen_max + 0.0001)

        # min trajectory extent relative to the mean box scale.
        trajdist_rate = trajdist_min / (self.mwh + 0.0001)

        self.trajmin = self.trajectory[idx_min]
        self.trajmax = self.trajectory[idx_max]
        self.TrajFeat = [trajlen_min, trajlen_max,
                         trajdist_min, trajdist_max,
                         trajlen_rate, trajdist_rate]

    def pt_state_fids(self, det_y, STATIC_THRESH=8):
        '''
        Split the index range of ``det_y`` into static and moving segments.

        Author's note: for the front camera the underlying y is usually the
        box y1 coordinate, and the product should be restricted to the cart.

        inputs:
            det_y: 1-D array of differences (already differenced by the caller).
        parameters:
            STATIC_THRESH: an element counts as static when its absolute
                           value is below this threshold.
        outputs:
            ranges: list of (start, end) index pairs, end exclusive, covering
                    each run of consecutive static elements.
            rangex: list of (start, end) index pairs, end exclusive, covering
                    the remaining (non-static) stretches.
        '''
        total = len(det_y)
        static_indices = np.where(np.abs(det_y) < STATIC_THRESH)[0]

        if len(static_indices) == 0:
            # Nothing is static: the whole sequence is one moving segment.
            return [], [(0, total)]

        # Group consecutive static indices into half-open runs.
        ranges = []
        start_index = static_indices[0]
        for prev, cur in zip(static_indices[:-1], static_indices[1:]):
            if cur != prev + 1:
                ranges.append((start_index, prev + 1))
                start_index = cur
        ranges.append((start_index, static_indices[-1] + 1))

        # The moving segments are the gaps before, between and after the runs.
        rangex = []
        first_start, last_end = ranges[0][0], ranges[-1][1]
        if first_start != 0:
            rangex.append((0, first_start))
        for left, right in zip(ranges[:-1], ranges[1:]):
            rangex.append((left[1], right[0]))
        if last_end != total:
            rangex.append((last_end, total))

        return ranges, rangex

    def PositionState(self, camerType="back"):
        '''
        Classify the track position against the in-cart / out-cart templates.

        camerType: "back"  -> back camera templates (incart.png / outcart.png)
                   other   -> front camera templates (*_ftmp.png)

        Sets isWholeInCart, isWholeOutCart, Cent_isIncart, LB_isIncart,
        RB_isIncart and posState (number of those three points that hit the
        in-cart mask in at least one frame).
        '''
        if camerType == "back":
            incart_name, outcart_name = 'incart.png', 'outcart.png'
        else:
            incart_name, outcart_name = 'incart_ftmp.png', 'outcart_ftmp.png'
        incart = cv2.imread(str(parpath / 'shopcart/cart_tempt' / incart_name), cv2.IMREAD_GRAYSCALE)
        outcart = cv2.imread(str(parpath / 'shopcart/cart_tempt' / outcart_name), cv2.IMREAD_GRAYSCALE)

        max_x, max_y = self.imgshape[0] - 1, self.imgshape[1] - 1

        def pixel_index(col_x, col_y):
            # Clamp the point into the image and return it as a (rows, cols) index.
            xs = self.cornpoints[:, col_x].clip(0, max_x).astype(np.int64)
            ys = self.cornpoints[:, col_y].clip(0, max_y).astype(np.int64)
            return ys, xs

        centre = pixel_index(0, 1)
        left_bottom = pixel_index(6, 7)
        right_bottom = pixel_index(8, 9)

        Cent_inCartnum = np.count_nonzero(incart[centre])
        LB_inCartnum = np.count_nonzero(incart[left_bottom])
        RB_inCartnum = np.count_nonzero(incart[right_bottom])

        LB_outCartnum = np.count_nonzero(outcart[left_bottom])
        RB_outCartnum = np.count_nonzero(outcart[right_bottom])

        # Entirely inside the cart: neither bottom corner ever touches the
        # out-cart mask.
        self.isWholeInCart = (LB_outCartnum + RB_outCartnum == 0)

        # Entirely outside the cart: the centre plus the bottom-left corner,
        # or the centre plus the bottom-right corner, never touch the in-cart
        # mask.
        self.isWholeOutCart = (Cent_inCartnum + LB_inCartnum == 0
                               or Cent_inCartnum + RB_inCartnum == 0)

        self.Cent_isIncart = bool(Cent_inCartnum)
        self.LB_isIncart = bool(LB_inCartnum)
        self.RB_isIncart = bool(RB_inCartnum)

        self.posState = self.Cent_isIncart + self.LB_isIncart + self.RB_isIncart

    def is_freemove(self):
        """Return True when the first and last boxes look alike and nearly coincide.

        Both conditions must hold:
          * cosine similarity of the first and last feature vectors > 0.8;
          * each of the four corners moved less than 50 between the first and
            last box.

        Always returns a plain ``bool``.
        """
        box_first = self.boxes[0, :4]
        box_last = self.boxes[-1, :4]

        # Appearance similarity of the first and last frame.
        feat_first = self.features[0, :][None, :]
        feat_last = self.features[-1, :][None, :]
        similar = 1 - np.maximum(0.0, cdist(feat_first, feat_last, metric='cosine'))[0, 0]
        if not similar > 0.8:
            return False

        # Corner displacements: (dx1, dy1), (dx2, dy1), (dx1, dy2), (dx2, dy2).
        dx1, dy1, dx2, dy2 = box_last - box_first
        corner_moves = (np.linalg.norm((dx1, dy1)), np.linalg.norm((dx2, dy1)),
                        np.linalg.norm((dx1, dy2)), np.linalg.norm((dx2, dy2)))
        return bool(all(d < 50 for d in corner_moves))

    def extract_hand_features(self):
        """Set ``self.isHandStatic`` for a hand track (class 0).

        The hand is static when every box centre lies closer than
        ``HAND_STATIC_THRESH`` to the midpoint of the centres' bounding range.
        """
        assert self.cls == 0, "The class of traj must be HAND!"

        self.isHandStatic = False

        x0 = (self.boxes[:, 0] + self.boxes[:, 2]) / 2
        y0 = (self.boxes[:, 1] + self.boxes[:, 3]) / 2
        handXY = np.stack((x0, y0), axis=-1)

        handCenter = np.array([(max(x0) + min(x0)) / 2, (max(y0) + min(y0)) / 2])

        # axis=1 gives one distance per frame; without it the norm collapses
        # the whole (N, 2) matrix into a single value that grows with N.
        handMaxDist = np.max(np.linalg.norm(handXY - handCenter, axis=1))

        if handMaxDist < self.HAND_STATIC_THRESH:
            self.isHandStatic = True

        return
|
||
|
||
|
||
class doTracks:
    """Groups detection boxes into per-track arrays and offers track-list utilities.

    NOTE(review): ``similarity``/``feat_similarity`` read ``self.tracks`` and
    ``self.features_dict``, ``merge_tracks_loop`` calls ``self.merge_tracks``
    and ``isintrude`` reads ``self.incart``; none of these are defined in this
    class, so they are presumably supplied by a subclass -- confirm.
    """

    def __init__(self, bboxes, trackefeats):
        '''fundamental property
        bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
        trackefeats: dict, key format "fid_bid"
        '''
        self.bboxes = bboxes
        self.frameID = np.unique(bboxes[:, 7].astype(int))
        self.trackID = np.unique(bboxes[:, 4].astype(int))

        self.lboxes = self.array2list()
        self.lfeats = self.getfeats(trackefeats)

        # Category lists that tracks are sorted into (filled elsewhere).
        self.Hands = []
        self.Kids = []
        self.Static = []
        self.Residual = []
        self.Confirmed = []
        self.DownWard = []   # subset of self.Residual
        self.UpWard = []     # subset of self.Residual
        self.FreeMove = []   # subset of self.Residual

    def array2list(self):
        '''
        Split ``self.bboxes`` into a list of per-track arrays.

        bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
        Return:
            lboxes: list ordered like ``self.trackID``; every element holds
                    the rows sharing one track_id, same column layout.
        Raises AssertionError when a track has two boxes in the same frame.
        '''
        track_ids = self.bboxes[:, 4].astype(int)
        lboxes = []
        for t_id in self.trackID:
            box = self.bboxes[track_ids == t_id]

            # A track may own at most one box per frame.
            assert len(set(box[:, 7])) == len(box), "Please check!!!"

            lboxes.append(box)

        return lboxes

    def getfeats(self, trackefeats):
        """Collect, per track in ``self.lboxes``, the features keyed "fid_bid".

        Returns a list of float32 arrays, one per track.  A box whose key is
        absent from ``trackefeats`` is skipped, so the returned array can be
        shorter than (and misaligned with) the track's boxes.
        """
        lfeats = []
        for boxes in self.lboxes:
            keys = (f"{int(fid)}_{int(bid)}" for fid, bid in boxes[:, 7:9])
            feats = [trackefeats[key] for key in keys if key in trackefeats]
            lfeats.append(np.asarray(feats, dtype=np.float32))

        return lfeats

    def similarity(self):
        """Return {(tid_a, tid_b): similarity} for every pair i <= j of ``self.tracks``.

        Pairs of a track with itself are included; with fewer than two tracks
        the result is an empty dict.
        """
        nt = len(self.tracks)
        similar_dict = {}
        if nt >= 2:
            for i in range(nt):
                tracka = self.tracks[i]
                for j in range(i, nt):
                    trackb = self.tracks[j]
                    similar_dict[(tracka.tid, trackb.tid)] = self.feat_similarity(tracka, trackb)
        return similar_dict

    def feat_similarity(self, tracka, trackb, metric='cosine'):
        """Return 1 - cdist between the mean feature vectors of two tracks.

        Features are looked up as ``self.features_dict[fid][bid]`` for every
        box of each track.  The result is a (1, 1) array; it is also printed.
        """
        boxes_a, boxes_b = tracka.boxes, trackb.boxes

        def gather(boxes):
            # Columns 7 and 8 are frame_index and box_index.
            return np.asarray(
                [self.features_dict[fid][bid] for fid, bid in boxes[:, 7:9]],
                dtype=np.float32)

        feata_m = np.mean(gather(boxes_a), axis=0)[None, :]
        featb_m = np.mean(gather(boxes_b), axis=0)[None, :]
        simi_ab = 1 - cdist(feata_m, featb_m, metric)
        print(f'tid {int(boxes_a[0, 4])} vs {int(boxes_b[0, 4])}: {simi_ab[0][0]}')

        return simi_ab

    def merge_tracks_loop(self, alist):
        """Apply ``self.merge_tracks`` repeatedly until the list length stops changing.

        An empty list is returned untouched without calling ``merge_tracks``.
        """
        len_before, len_after = len(alist), 0
        while len_before != len_after:
            len_before = len(alist)
            alist = self.merge_tracks(alist)  # func is from subclass
            len_after = len(alist)
        return alist

    def base_merge_tracks(self, Residual):
        """
        Group tracks that have different ids but may be the same product.

        Repeatedly takes the first remaining track as a seed and collects
        every remaining track for which ``track_equal_track(seed, other)`` is
        true.  Returns a list of groups (each a list of tracks).
        """
        mergedTracks = []
        pending = list(Residual)
        while pending:
            seed, rest = pending[0], pending[1:]
            group = [seed]
            pending = []
            for other in rest:
                if track_equal_track(seed, other):
                    group.append(other)
                else:
                    pending.append(other)
            mergedTracks.append(group)

        return mergedTracks

    @staticmethod
    def join_tracks(tlista, tlistb):
        """Combine two lists of stracks into a single one.

        All of ``tlista`` is kept; a track from ``tlistb`` is appended only if
        its ``tid`` has not been seen yet.
        """
        seen = set()
        res = []
        for t in tlista:
            seen.add(t.tid)
            res.append(t)
        for t in tlistb:
            if t.tid not in seen:
                seen.add(t.tid)
                res.append(t)
        return res

    @staticmethod
    def sub_tracks(tlista, tlistb):
        """Return the tracks of ``tlista`` whose ``tid`` does not occur in ``tlistb``."""
        track_ids_b = {t.tid for t in tlistb}
        return [t for t in tlista if t.tid not in track_ids_b]

    def array2frame(self, bboxes):
        """Split ``bboxes`` into a list of per-frame arrays, ordered by frame index."""
        # np.unique already returns its values sorted.
        frame_ids = np.unique(bboxes[:, 7].astype(int))
        return [bboxes[bboxes[:, 7] == fid] for fid in frame_ids]

    @staticmethod
    def _coverage_count(bboxes, shape):
        """Count, per pixel of an array of ``shape``, how many boxes cover it.

        Box coordinates are clamped at 0 so that a box starting left of /
        above the image contributes its visible part instead of producing a
        wrapped-around (empty) slice.
        """
        counts = np.zeros(shape, dtype=np.int64)
        for x1, y1, x2, y2 in bboxes[:, 0:4]:
            counts[max(y1, 0):max(y2, 0), max(x1, 0):max(x2, 0)] += 1
        return counts

    def isintrude(self):
        '''
        boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                0   1   2   3      4        5     6        7           8

        Accumulate all boxes into a per-pixel coverage count, then look for
        connected covered regions that contain no area covered at least
        ``OverlapNum`` times.  Such regions are drawn into
        "./result/intrude/invasion.png"; returns True when at least one exists.
        '''
        OverlapNum = 3
        bboxes = self.bboxes.astype(np.int64)

        # The cart mask is only needed for its shape.
        sum_incart = self._coverage_count(bboxes, self.incart.shape)

        # Pixels covered at least OverlapNum times.
        sumincart = np.zeros(sum_incart.shape, dtype=np.uint8)
        sumincart[np.where(sum_incart >= OverlapNum)] = 255

        # Pixels covered at least once.
        base = np.zeros(sum_incart.shape, dtype=np.uint8)
        base[np.where(sum_incart != 0)] = 255

        contours_sum, _ = cv2.findContours(sumincart, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        contours_base, _ = cv2.findContours(base, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        have_existed = set()
        for k, ct_temp in enumerate(contours_base):
            tmp1 = np.zeros(sum_incart.shape, dtype=np.uint8)
            cv2.drawContours(tmp1, [ct_temp], -1, 255, cv2.FILLED)

            # Determine whether a heavily covered contour lies fully inside
            # this base contour.
            for ct_sum in contours_sum:
                tmp2 = np.zeros(sum_incart.shape, dtype=np.uint8)
                cv2.drawContours(tmp2, [ct_sum], -1, 255, cv2.FILLED)
                tmp = cv2.bitwise_and(tmp1, tmp2)
                if np.count_nonzero(tmp) == np.count_nonzero(tmp2):
                    have_existed.add(k)

        inIdx = [i for i in range(len(contours_base)) if i not in have_existed]
        invasion = np.zeros(sum_incart.shape, dtype=np.uint8)

        for i in inIdx:
            cv2.drawContours(invasion, [contours_base[i]], -1, 255, cv2.FILLED)
        cv2.imwrite("./result/intrude/invasion.png", invasion)

        Intrude = len(inIdx) >= 1
        print(f"is intruded: {Intrude}")

        return Intrude
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|