initial project version!
This commit is contained in:
0
tracking/dotrack/__init__.py
Normal file
0
tracking/dotrack/__init__.py
Normal file
BIN
tracking/dotrack/__pycache__/__init__.cpython-39.pyc
Normal file
BIN
tracking/dotrack/__pycache__/__init__.cpython-39.pyc
Normal file
Binary file not shown.
BIN
tracking/dotrack/__pycache__/dotracks.cpython-39.pyc
Normal file
BIN
tracking/dotrack/__pycache__/dotracks.cpython-39.pyc
Normal file
Binary file not shown.
BIN
tracking/dotrack/__pycache__/dotracks_back.cpython-39.pyc
Normal file
BIN
tracking/dotrack/__pycache__/dotracks_back.cpython-39.pyc
Normal file
Binary file not shown.
BIN
tracking/dotrack/__pycache__/dotracks_front.cpython-39.pyc
Normal file
BIN
tracking/dotrack/__pycache__/dotracks_front.cpython-39.pyc
Normal file
Binary file not shown.
BIN
tracking/dotrack/__pycache__/track_back.cpython-39.pyc
Normal file
BIN
tracking/dotrack/__pycache__/track_back.cpython-39.pyc
Normal file
Binary file not shown.
BIN
tracking/dotrack/__pycache__/track_front.cpython-39.pyc
Normal file
BIN
tracking/dotrack/__pycache__/track_front.cpython-39.pyc
Normal file
Binary file not shown.
466
tracking/dotrack/dotracks.py
Normal file
466
tracking/dotrack/dotracks.py
Normal file
@ -0,0 +1,466 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Created on Mon Mar 4 18:16:01 2024
|
||||
|
||||
@author: ym
|
||||
"""
|
||||
import numpy as np
|
||||
import cv2
|
||||
from pathlib import Path
|
||||
from scipy.spatial.distance import cdist
|
||||
from utils.mergetrack import track_equal_track
|
||||
curpath = Path(__file__).resolve().parents[0]
|
||||
|
||||
curpath = Path(curpath)
|
||||
|
||||
class MoveState:
|
||||
"""商品运动状态标志"""
|
||||
Static = 0
|
||||
DownWard = 1
|
||||
UpWard = 2
|
||||
FreeMove = 3
|
||||
Unknown = -1
|
||||
|
||||
class ShoppingCart:
|
||||
|
||||
def __init__(self, bboxes):
|
||||
self.bboxes = bboxes
|
||||
self.loadrate = self.load_rate()
|
||||
|
||||
def load_rate(self):
|
||||
bboxes = self.bboxes
|
||||
|
||||
fid = min(bboxes[:, 7])
|
||||
idx = bboxes[:, 7] == fid
|
||||
boxes = bboxes[idx]
|
||||
|
||||
temp = np.zeros(self.incart.shape, np.uint8)
|
||||
for i in range(boxes.shape[0]):
|
||||
x1, y1, x2, y2, tid = boxes[i, 0:5]
|
||||
cv2.rectangle(temp, (int(x1), int(y1)), (int(x2), int(y2)), 255, cv2.FILLED)
|
||||
|
||||
'''1. and 滤除购物车边框外的干扰'''
|
||||
loadstate = cv2.bitwise_and(self.incart, temp)
|
||||
|
||||
'''2. xor 得到购物车内内被填充的区域'''
|
||||
# loadstate = cv2.bitwise_xor(self.incart, temp1)
|
||||
|
||||
num_loadstate = cv2.countNonZero(loadstate)
|
||||
num_incart = cv2.countNonZero(self.incart)
|
||||
loadrate = num_loadstate / (num_incart+0.01)
|
||||
|
||||
# edgeline = cv2.imread("./shopcart/cart_tempt/edgeline.png", cv2.IMREAD_GRAYSCALE)
|
||||
# cv2.imwrite(f"./test/temp.png", cv2.add(temp, edgeline))
|
||||
# cv2.imwrite(f"./test/incart.png", cv2.add(self.incart, edgeline))
|
||||
# cv2.imwrite(f"./test/loadstate.png", cv2.add(loadstate, edgeline))
|
||||
|
||||
return loadrate
|
||||
|
||||
@property
|
||||
def incart(self):
|
||||
img = cv2.imread(str(curpath/'cart_tempt/back_incart.png'), cv2.IMREAD_GRAYSCALE)
|
||||
ret, binary = cv2.threshold(img, 250, 255, cv2.THRESH_BINARY)
|
||||
|
||||
return binary
|
||||
|
||||
@property
|
||||
def outcart(self):
|
||||
img = cv2.imread(str(curpath/'cart_tempt/back_outcart.png'), cv2.IMREAD_GRAYSCALE)
|
||||
ret, binary = cv2.threshold(img, 250, 255, cv2.THRESH_BINARY)
|
||||
|
||||
return binary
|
||||
|
||||
@property
|
||||
def cartedge(self):
|
||||
img = cv2.imread(str(curpath/'cart_tempt/back_cartedge.png'), cv2.IMREAD_GRAYSCALE)
|
||||
ret, binary = cv2.threshold(img, 250, 255, cv2.THRESH_BINARY)
|
||||
|
||||
return binary
|
||||
|
||||
class Track:
|
||||
'''抽象基类,不能实例化对象'''
|
||||
def __init__(self, boxes, imgshape=(1024, 1280)):
|
||||
'''
|
||||
boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
0 1 2 3 4 5 6 7 8
|
||||
'''
|
||||
# 不满足以下条件时会如何?
|
||||
assert len(set(boxes[:, 4].astype(int))) == 1, "For a Track, track_id more than 1"
|
||||
assert len(set(boxes[:, 6].astype(int))) == 1, "For a Track, class number more than 1"
|
||||
|
||||
self.boxes = boxes
|
||||
self.tid = int(boxes[0, 4])
|
||||
self.cls = int(boxes[0, 6])
|
||||
self.frnum = boxes.shape[0]
|
||||
self.imgBorder = False
|
||||
self.imgshape = imgshape
|
||||
self.state = MoveState.Unknown
|
||||
|
||||
'''轨迹开始帧、结束帧 ID'''
|
||||
self.start_fid = int(np.min(boxes[:, 7]))
|
||||
self.end_fid = int(np.max(boxes[:, 7]))
|
||||
|
||||
|
||||
|
||||
'''5个关键点(中心点、左上点、右上点、左下点、右下点 )坐标'''
|
||||
self.compute_cornpoints()
|
||||
|
||||
'''5个关键点轨迹特征,可以在子类中实现,降低顺序处理时的计算量'''
|
||||
self.compute_cornpts_feats()
|
||||
|
||||
|
||||
mw, mh = np.mean(boxes[:, 2]-boxes[:, 0]), np.mean((boxes[:, 3]-boxes[:, 1]))
|
||||
self.mwh = np.mean((mw, mh))
|
||||
self.Area = mw * mh
|
||||
|
||||
'''
|
||||
最后一帧与第一帧间的位移:
|
||||
vshift: 正值为向下,负值为向上
|
||||
hshift: 负值为向购物车边框两边移动,正值为物品向中心移动
|
||||
'''
|
||||
self.vshift = self.cornpoints[-1, 1] - self.cornpoints[0, 1] # 纵向位移
|
||||
self.hshift = abs(self.cornpoints[0, 0] - self.imgshape[0]/2) - \
|
||||
abs(self.cornpoints[-1, 0] - self.imgshape[0]/2)
|
||||
|
||||
'''手部状态分析'''
|
||||
self.HAND_STATIC_THRESH = 100
|
||||
if self.cls == 0:
|
||||
self.extract_hand_features()
|
||||
|
||||
|
||||
def compute_cornpoints(self):
|
||||
'''
|
||||
cornpoints 共10项,分别是个点的坐标值(x, y)
|
||||
(center, top_left, top_right, bottom_left, bottom_right)
|
||||
'''
|
||||
boxes = self.boxes
|
||||
cornpoints = np.zeros((self.frnum, 10))
|
||||
cornpoints[:,0] = (boxes[:, 0] + boxes[:, 2]) / 2
|
||||
cornpoints[:,1] = (boxes[:, 1] + boxes[:, 3]) / 2
|
||||
cornpoints[:,2], cornpoints[:,3] = boxes[:, 0], boxes[:, 1]
|
||||
cornpoints[:,4], cornpoints[:,5] = boxes[:, 2], boxes[:, 1]
|
||||
cornpoints[:,6], cornpoints[:,7] = boxes[:, 0], boxes[:, 3]
|
||||
cornpoints[:,8], cornpoints[:,9] = boxes[:, 2], boxes[:, 3]
|
||||
|
||||
self.cornpoints = cornpoints
|
||||
def compute_cornpts_feats(self):
|
||||
'''
|
||||
'''
|
||||
trajectory = []
|
||||
trajlens = []
|
||||
trajdist = []
|
||||
trajrects = []
|
||||
for k in range(5):
|
||||
# diff_xy2 = np.power(np.diff(self.cornpoints[:, 2*k:2*(k+1)], axis = 0), 2)
|
||||
# trajlen = np.sum(np.sqrt(np.sum(diff_xy2, axis = 1)))
|
||||
|
||||
X = self.cornpoints[:, 2*k:2*(k+1)]
|
||||
|
||||
traj = np.linalg.norm(np.diff(X, axis=0), axis=1)
|
||||
trajectory.append(traj)
|
||||
|
||||
trajlen = np.sum(traj)
|
||||
trajlens.append(trajlen)
|
||||
|
||||
ptdist = np.max(cdist(X, X))
|
||||
trajdist.append(ptdist)
|
||||
|
||||
'''最小外接矩形:
|
||||
rect[0]: 中心(x, y)
|
||||
rect[1]: (w, h)
|
||||
rect[0]: 旋转角度 (-90°, 0]
|
||||
'''
|
||||
rect = cv2.minAreaRect(X.astype(np.int64))
|
||||
trajrects.append(rect)
|
||||
|
||||
self.trajectory = trajectory
|
||||
self.trajlens = trajlens
|
||||
self.trajdist = trajdist
|
||||
self.trajrects = trajrects
|
||||
|
||||
|
||||
|
||||
def trajfeature(self):
|
||||
'''
|
||||
分两种情况计算轨迹特征(检测框边界不在图像边界范围内,在图像边界范围内):
|
||||
-最小长度轨迹:trajmin
|
||||
-最小轨迹长度:trajlen_min
|
||||
-最小轨迹欧氏距离:trajdist_max
|
||||
'''
|
||||
idx1 = self.trajlens.index(max(self.trajlens))
|
||||
trajmax = self.trajectory[idx1]
|
||||
trajlen_max = self.trajlens[idx1]
|
||||
trajdist_max = self.trajdist[idx1]
|
||||
if not self.isCornpoint:
|
||||
idx2 = self.trajlens.index(min(self.trajlens))
|
||||
trajmin = self.trajectory[idx2]
|
||||
trajlen_min = self.trajlens[idx2]
|
||||
trajdist_min = self.trajdist[idx2]
|
||||
else:
|
||||
trajmin = self.trajectory[0]
|
||||
trajlen_min = self.trajlens[0]
|
||||
trajdist_min = self.trajdist[0]
|
||||
|
||||
|
||||
'''最小轨迹长度/最大轨迹长度,越小,代表运动幅度越小'''
|
||||
trajlen_rate = trajlen_min/(trajlen_max+0.0001)
|
||||
|
||||
'''最小轨迹欧氏距离/目标框尺度均值'''
|
||||
trajdist_rate = trajdist_min/(self.mwh+0.0001)
|
||||
|
||||
|
||||
|
||||
self.trajmin = trajmin
|
||||
self.trajmax = trajmax
|
||||
self.feature = [trajlen_min, trajlen_max,
|
||||
trajdist_min, trajdist_max,
|
||||
trajlen_rate, trajdist_rate]
|
||||
|
||||
def compute_static_fids(self, det_y, STATIC_THRESH = 8):
|
||||
'''
|
||||
前摄时,y一般选择为 box 的 y1 坐标,且需限定商品在购物车内。
|
||||
inputs:
|
||||
y:1D array,
|
||||
parameters:
|
||||
STATIC_THRESH:轨迹处于静止状态的阈值。
|
||||
outputs:
|
||||
输出为差分值小于 STATIC_THRESH 的y中元素的(start, end)索引
|
||||
ranges = [(x1, y1),
|
||||
(x1, y1),
|
||||
...]
|
||||
'''
|
||||
# print(f"The ID is: {self.tid}")
|
||||
|
||||
# det_y = np.diff(y, axis=0)
|
||||
ranges, rangex = [], []
|
||||
|
||||
static_indices = np.where(np.abs(det_y) < STATIC_THRESH)[0]
|
||||
|
||||
if len(static_indices) == 0:
|
||||
rangex.append((0, len(det_y)))
|
||||
return ranges, rangex
|
||||
|
||||
start_index = static_indices[0]
|
||||
|
||||
for i in range(1, len(static_indices)):
|
||||
if static_indices[i] != static_indices[i-1] + 1:
|
||||
ranges.append((start_index, static_indices[i-1] + 1))
|
||||
start_index = static_indices[i]
|
||||
ranges.append((start_index, static_indices[-1] + 1))
|
||||
|
||||
if len(ranges) == 0:
|
||||
rangex.append((0, len(det_y)))
|
||||
return ranges, rangex
|
||||
|
||||
idx1, idx2 = ranges[0][0], ranges[-1][1]
|
||||
|
||||
if idx1 != 0:
|
||||
rangex.append((0, idx1))
|
||||
|
||||
# 轨迹的最后阶段是运动状态
|
||||
for k in range(1, len(ranges)):
|
||||
index1 = ranges[k-1][1]
|
||||
index2 = ranges[k][0]
|
||||
rangex.append((index1, index2))
|
||||
|
||||
if idx2 != len(det_y):
|
||||
rangex.append((idx2, len(det_y)))
|
||||
|
||||
return ranges, rangex
|
||||
|
||||
|
||||
def extract_hand_features(self):
|
||||
assert self.cls == 0, "The class of traj must be HAND!"
|
||||
|
||||
self.isHandStatic = False
|
||||
|
||||
x0 = (self.boxes[:, 0] + self.boxes[:, 2]) / 2
|
||||
y0 = (self.boxes[:, 1] + self.boxes[:, 3]) / 2
|
||||
|
||||
handXY = np.stack((x0, y0), axis=-1)
|
||||
# handMaxY0 = np.max(y0)
|
||||
|
||||
handCenter = np.array([(max(x0)+min(x0))/2, (max(y0)+min(y0))/2])
|
||||
|
||||
handMaxDist = np.max(np.linalg.norm(handXY - handCenter))
|
||||
|
||||
if handMaxDist < self.HAND_STATIC_THRESH:
|
||||
self.isHandStatic = True
|
||||
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
class doTracks:
|
||||
def __init__(self, bboxes, TracksDict):
|
||||
'''fundamental property'''
|
||||
self.bboxes = bboxes
|
||||
self.TracksDict = TracksDict
|
||||
self.frameID = np.unique(bboxes[:, 7].astype(int))
|
||||
self.trackID = np.unique(bboxes[:, 4].astype(int))
|
||||
self.lboxes = self.array2list()
|
||||
|
||||
'''对 self.tracks 中的元素进行分类,将 track 归入相应列表中'''
|
||||
self.Hands = []
|
||||
self.Kids = []
|
||||
self.Static = []
|
||||
self.Residual = []
|
||||
self.DownWard = [] # subset of self.Residual
|
||||
self.UpWard = [] # subset of self.Residual
|
||||
self.FreeMove = [] # subset of self.Residual
|
||||
|
||||
|
||||
def array2list(self):
|
||||
'''
|
||||
将 bboxes 变换为 track 列表
|
||||
bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
Return:
|
||||
lboxes:列表,列表中元素具有同一 track_id,x1y1x2y2 格式
|
||||
[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
'''
|
||||
track_ids = self.bboxes[:, 4].astype(int)
|
||||
lboxes = []
|
||||
for t_id in self.trackID:
|
||||
# print(f"The ID is: {t_id}")
|
||||
idx = np.where(track_ids == t_id)[0]
|
||||
box = self.bboxes[idx, :]
|
||||
|
||||
lboxes.append(box)
|
||||
|
||||
return lboxes
|
||||
|
||||
def classify(self):
|
||||
|
||||
tracks = self.tracks
|
||||
|
||||
|
||||
# 提取手的frame_id,并和动目标的frame_id 进行关联
|
||||
hand_tracks = [t for t in tracks if t.cls==0]
|
||||
self.Hands.extend(hand_tracks)
|
||||
tracks = self.sub_tracks(tracks, hand_tracks)
|
||||
|
||||
|
||||
|
||||
# 提取小孩的track,并计算状态:left, right, incart
|
||||
kid_tracks = [t for t in tracks if t.cls==9]
|
||||
kid_states = [self.kid_state(t) for t in kid_tracks]
|
||||
self.Kids = [x for x in zip(kid_tracks, kid_states)]
|
||||
|
||||
tracks = self.sub_tracks(tracks, kid_tracks)
|
||||
|
||||
|
||||
static_tracks = [t for t in tracks if t.frnum>1 and t.is_static()]
|
||||
self.Static.extend(static_tracks)
|
||||
|
||||
|
||||
'''剔除静止目标后的 tracks'''
|
||||
tracks = self.sub_tracks(tracks, static_tracks)
|
||||
|
||||
return tracks
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def similarity(self):
|
||||
nt = len(self.tracks)
|
||||
similar_dict = {}
|
||||
if nt >= 2:
|
||||
for i in range(nt):
|
||||
for j in range(i, nt):
|
||||
tracka = self.tracks[i]
|
||||
trackb = self.tracks[j]
|
||||
similar = self.feat_similarity(tracka, trackb)
|
||||
similar_dict.update({(tracka.tid, trackb.tid): similar})
|
||||
return similar_dict
|
||||
|
||||
|
||||
def feat_similarity(self, tracka, trackb, metric='cosine'):
|
||||
boxes_a, boxes_b = tracka.boxes, trackb.boxes
|
||||
na, nb = tracka.boxes.shape[0], trackb.boxes.shape[0]
|
||||
feata, featb = [], []
|
||||
for i in range(na):
|
||||
fid, bid = tracka.boxes[i, 7:9]
|
||||
feata.append(self.features_dict[fid][bid])
|
||||
for i in range(nb):
|
||||
fid, bid = trackb.boxes[i, 7:9]
|
||||
featb.append(self.features_dict[fid][bid])
|
||||
|
||||
feata = np.asarray(feata, dtype=np.float32)
|
||||
featb = np.asarray(featb, dtype=np.float32)
|
||||
similarity_matrix = 1-np.maximum(0.0, cdist(feata, featb, metric))
|
||||
|
||||
feata_m = np.mean(feata, axis =0)[None,:]
|
||||
featb_m = np.mean(featb, axis =0)[None,:]
|
||||
simi_ab = 1 - cdist(feata_m, featb_m, metric)
|
||||
print(f'tid {int(boxes_a[0, 4])} vs {int(boxes_b[0, 4])}: {simi_ab[0][0]}')
|
||||
|
||||
# return np.max(similarity_matrix)
|
||||
return simi_ab
|
||||
|
||||
def merge_tracks_loop(self, alist):
|
||||
na, nb = len(alist), 0
|
||||
while na!=nb:
|
||||
na = len(alist)
|
||||
alist = self.merge_tracks(alist) #func is from subclass
|
||||
nb = len(alist)
|
||||
return alist
|
||||
|
||||
def base_merge_tracks(self, Residual):
|
||||
"""
|
||||
对不同id,但可能是同一商品的目标进行归并
|
||||
"""
|
||||
mergedTracks = []
|
||||
alist = [t for t in Residual]
|
||||
while alist:
|
||||
atrack = alist[0]
|
||||
cur_list = []
|
||||
cur_list.append(atrack)
|
||||
alist.pop(0)
|
||||
|
||||
blist = [b for b in alist]
|
||||
alist = []
|
||||
for btrack in blist:
|
||||
if track_equal_track(atrack, btrack, self.TracksDict):
|
||||
cur_list.append(btrack)
|
||||
else:
|
||||
alist.append(btrack)
|
||||
|
||||
mergedTracks.append(cur_list)
|
||||
|
||||
return mergedTracks
|
||||
|
||||
|
||||
@staticmethod
|
||||
def join_tracks(tlista, tlistb):
|
||||
"""Combine two lists of stracks into a single one."""
|
||||
exists = {}
|
||||
res = []
|
||||
for t in tlista:
|
||||
exists[t.tid] = 1
|
||||
res.append(t)
|
||||
for t in tlistb:
|
||||
tid = t.tid
|
||||
if not exists.get(tid, 0):
|
||||
exists[tid] = 1
|
||||
res.append(t)
|
||||
return res
|
||||
|
||||
@staticmethod
|
||||
def sub_tracks(tlista, tlistb):
|
||||
track_ids_b = {t.tid for t in tlistb}
|
||||
return [t for t in tlista if t.tid not in track_ids_b]
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
179
tracking/dotrack/dotracks_back.py
Normal file
179
tracking/dotrack/dotracks_back.py
Normal file
@ -0,0 +1,179 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Created on Mon Mar 4 18:36:31 2024
|
||||
|
||||
@author: ym
|
||||
"""
|
||||
import numpy as np
|
||||
from utils.mergetrack import track_equal_track
|
||||
from scipy.spatial.distance import cdist
|
||||
from .dotracks import doTracks, ShoppingCart
|
||||
from .track_back import backTrack
|
||||
|
||||
|
||||
class doBackTracks(doTracks):
|
||||
def __init__(self, bboxes, TracksDict):
|
||||
|
||||
super().__init__(bboxes, TracksDict)
|
||||
|
||||
self.tracks = [backTrack(b) for b in self.lboxes]
|
||||
|
||||
# self.similar_dict = self.similarity()
|
||||
|
||||
|
||||
self.shopcart = ShoppingCart(bboxes)
|
||||
|
||||
# =============================================================================
|
||||
# def array2list(self):
|
||||
# ''' 0, 1, 2, 3, 4, 5, 6, 7, 8
|
||||
# bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
# lboxes:[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
# '''
|
||||
#
|
||||
# track_ids = set(self.bboxes[:, 4])
|
||||
# lboxes = []
|
||||
# for t_id in track_ids:
|
||||
# idx = np.where(self.bboxes[:, 4] == t_id)[0]
|
||||
# box = self.bboxes[idx, :]
|
||||
#
|
||||
# x = (box[:, 0] + box[:, 2]) / 2
|
||||
# y = (box[:, 1] + box[:, 3]) / 2
|
||||
#
|
||||
# # box: [x, y, w, h, track_id, score, cls, frame_index]
|
||||
# box[:, 2] = box[:, 2] - box[:, 0]
|
||||
# box[:, 3] = box[:, 3] - box[:, 1]
|
||||
# box[:, 0] = x
|
||||
# box[:, 1] = y
|
||||
#
|
||||
# lboxes.append(box)
|
||||
#
|
||||
#
|
||||
# return lboxes
|
||||
# =============================================================================
|
||||
|
||||
|
||||
|
||||
def classify(self):
|
||||
'''
|
||||
功能:对 tracks 中元素分类
|
||||
|
||||
'''
|
||||
tracks = super().classify()
|
||||
|
||||
# tracks = self.tracks
|
||||
# shopcart = self.shopcart
|
||||
|
||||
# # 提取手的frame_id,并和动目标的frame_id 进行关联
|
||||
# hand_tracks = [t for t in tracks if t.cls==0]
|
||||
# self.Hands.extend(hand_tracks)
|
||||
# tracks = self.sub_tracks(tracks, hand_tracks)
|
||||
|
||||
|
||||
|
||||
# # 提取小孩的track,并计算状态:left, right, incart
|
||||
# kid_tracks = [t for t in tracks if t.cls==9]
|
||||
# kid_states = [self.kid_state(t) for t in kid_tracks]
|
||||
# self.Kids = [x for x in zip(kid_tracks, kid_states)]
|
||||
|
||||
# tracks = self.sub_tracks(tracks, kid_tracks)
|
||||
|
||||
|
||||
# static_tracks = [t for t in tracks if t.frnum>1 and t.is_static()]
|
||||
# self.Static.extend(static_tracks)
|
||||
|
||||
|
||||
# '''剔除静止目标后的 tracks'''
|
||||
# tracks = self.sub_tracks(tracks, static_tracks)
|
||||
|
||||
|
||||
|
||||
'''购物框边界外具有运动状态的干扰目标'''
|
||||
out_trcak = [t for t in tracks if t.is_OutTrack()]
|
||||
|
||||
tracks = self.sub_tracks(tracks, out_trcak)
|
||||
|
||||
|
||||
'''轨迹循环归并'''
|
||||
# merged_tracks = self.merge_tracks(tracks)
|
||||
merged_tracks = self.merge_tracks_loop(tracks)
|
||||
tracks = [t for t in merged_tracks if t.frnum > 1]
|
||||
|
||||
self.Residual = tracks
|
||||
|
||||
|
||||
|
||||
def merge_tracks(self, Residual):
|
||||
"""
|
||||
对不同id,但可能是同一商品的目标进行归并
|
||||
"""
|
||||
mergedTracks = self.base_merge_tracks(Residual)
|
||||
|
||||
oldtracks, newtracks = [], []
|
||||
for tracklist in mergedTracks:
|
||||
if len(tracklist) > 1:
|
||||
boxes = np.empty((0, 9), dtype=np.float32)
|
||||
for i, track in enumerate(tracklist):
|
||||
if i==0: ntid, ncls=track.boxes[0, 4], track.boxes[0, 6]
|
||||
iboxes = track.boxes.copy()
|
||||
|
||||
iboxes[:, 4], iboxes[:, 6] = ntid, ncls
|
||||
boxes = np.concatenate((boxes, iboxes), axis=0)
|
||||
|
||||
oldtracks.append(track)
|
||||
|
||||
fid_indices = np.argsort(boxes[:, 7])
|
||||
boxes_fid = boxes[fid_indices]
|
||||
|
||||
newtracks.append(backTrack(boxes_fid))
|
||||
elif len(tracklist) == 1:
|
||||
oldtracks.append(tracklist[0])
|
||||
newtracks.append(tracklist[0])
|
||||
|
||||
|
||||
redu = self.sub_tracks(Residual, oldtracks)
|
||||
merged = self.join_tracks(redu, newtracks)
|
||||
|
||||
return merged
|
||||
|
||||
def kid_state(self, track):
|
||||
|
||||
left_dist = track.cornpoints[:, 2]
|
||||
right_dist = 1024 - track.cornpoints[:, 4]
|
||||
|
||||
if np.sum(left_dist<30)/track.frnum>0.8 and np.sum(right_dist>512)/track.frnum>0.7:
|
||||
kidstate = "left"
|
||||
elif np.sum(left_dist>512)/track.frnum>0.7 and np.sum(right_dist<30)/track.frnum>0.8:
|
||||
kidstate = "right"
|
||||
else:
|
||||
kidstate = "incart"
|
||||
|
||||
return kidstate
|
||||
|
||||
|
||||
|
||||
def is_associate_with_hand(self):
|
||||
"""
|
||||
分析商品和手之间的关联性
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def isuptrack(self, track):
|
||||
Flag = False
|
||||
|
||||
return Flag
|
||||
|
||||
def isdowntrack(self, track):
|
||||
Flag = False
|
||||
|
||||
return Flag
|
||||
|
||||
def isfreetrack(self, track):
|
||||
Flag = False
|
||||
|
||||
return Flag
|
184
tracking/dotrack/dotracks_front.py
Normal file
184
tracking/dotrack/dotracks_front.py
Normal file
@ -0,0 +1,184 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Created on Mon Mar 4 18:38:20 2024
|
||||
|
||||
@author: ym
|
||||
"""
|
||||
import numpy as np
|
||||
from utils.mergetrack import track_equal_track
|
||||
from .dotracks import doTracks
|
||||
from .track_front import frontTrack
|
||||
|
||||
class doFrontTracks(doTracks):
|
||||
def __init__(self, bboxes, TracksDict):
|
||||
super().__init__(bboxes, TracksDict)
|
||||
|
||||
self.tracks = [frontTrack(b) for b in self.lboxes]
|
||||
|
||||
def classify(self):
|
||||
'''功能:对 tracks 中元素分类 '''
|
||||
|
||||
tracks = self.tracks
|
||||
|
||||
'''提取手的 tracks'''
|
||||
hand_tracks = [t for t in tracks if t.cls==0]
|
||||
|
||||
|
||||
self.Hands.extend(hand_tracks)
|
||||
tracks = self.sub_tracks(tracks, hand_tracks)
|
||||
|
||||
|
||||
|
||||
'''提取小孩的 tracks'''
|
||||
kid_tracks = [t for t in tracks if t.cls==9]
|
||||
tracks = self.sub_tracks(tracks, kid_tracks)
|
||||
|
||||
|
||||
'''静态 tracks'''
|
||||
static_tracks = [t for t in tracks if t.frnum>1 and t.is_static()]
|
||||
|
||||
|
||||
'''剔除静止目标后的 tracks'''
|
||||
tracks = self.sub_tracks(tracks, static_tracks)
|
||||
|
||||
'''轨迹循环归并'''
|
||||
merged_tracks = self.merge_tracks_loop(tracks)
|
||||
|
||||
tracks = [t for t in merged_tracks if t.frnum > 1]
|
||||
|
||||
for gtrack in tracks:
|
||||
# print(f"Goods ID:{gtrack.tid}")
|
||||
for htrack in hand_tracks:
|
||||
if self.is_associate_with_hand(htrack, gtrack):
|
||||
gtrack.hands.append(htrack)
|
||||
|
||||
freemoved_tracks = [t for t in tracks if t.is_free_move()]
|
||||
|
||||
tracks = self.sub_tracks(tracks, freemoved_tracks)
|
||||
|
||||
|
||||
self.Residual = tracks
|
||||
|
||||
def is_associate_with_hand(self, htrack, gtrack):
|
||||
'''手部 Track、商品 Track 建立关联的依据:
|
||||
a. 运动帧的帧索引有交集
|
||||
b. 帧索引交集部分iou均大于0
|
||||
'''
|
||||
|
||||
assert htrack.cls==0 and gtrack.cls!=0 and gtrack.cls!=9, 'Track cls is Error!'
|
||||
|
||||
hboxes = np.empty(shape=(0, 9), dtype = np.float)
|
||||
gboxes = np.empty(shape=(0, 9), dtype = np.float)
|
||||
|
||||
# start, end 为索引值,需要 start:(end+1)
|
||||
for start, end in htrack.dynamic_y2:
|
||||
hboxes = np.concatenate((hboxes, htrack.boxes[start:end+1, :]), axis=0)
|
||||
for start, end in gtrack.dynamic_y1:
|
||||
gboxes = np.concatenate((gboxes, gtrack.boxes[start:end+1, :]), axis=0)
|
||||
|
||||
hfids, gfids = hboxes[:, 7], gboxes[:, 7]
|
||||
fids = set(hfids).intersection(set(gfids))
|
||||
|
||||
if len(fids)==0:
|
||||
return False
|
||||
|
||||
|
||||
# print(f"Goods ID: {gtrack.tid}, Hand ID: {htrack.tid}")
|
||||
ious = []
|
||||
for f in fids:
|
||||
h = np.where(hfids==f)[0][0]
|
||||
g = np.where(gfids==f)[0][0]
|
||||
|
||||
x11, y11, x12, y12 = hboxes[h, 0:4]
|
||||
x21, y21, x22, y22 = gboxes[g, 0:4]
|
||||
|
||||
x1, y1 = max((x11, x21)), max((y11, y21))
|
||||
x2, y2 = min((x12, x22)), min((y12, y22))
|
||||
|
||||
union = (x2 - x1).clip(0) * (y2 - y1).clip(0)
|
||||
area1 = (x12 - x11) * (y12 - y11)
|
||||
area2 = (x22 - x21) * (y22 - y21)
|
||||
|
||||
iou = union / (area1 + area2 - union + 1e-6)
|
||||
|
||||
if iou>0:
|
||||
ious.append(iou)
|
||||
|
||||
return len(ious)
|
||||
|
||||
|
||||
|
||||
def merge_tracks(self, Residual):
|
||||
"""
|
||||
对不同id,但可能是同一商品的目标进行归并
|
||||
"""
|
||||
# =============================================================================
|
||||
# mergedTracks = []
|
||||
# alist = [t for t in Residual]
|
||||
# while alist:
|
||||
# atrack = alist[0]
|
||||
# cur_list = []
|
||||
# cur_list.append(atrack)
|
||||
# alist.pop(0)
|
||||
#
|
||||
# blist = [b for b in alist]
|
||||
# alist = []
|
||||
# for btrack in blist:
|
||||
# if track_equal_track(atrack, btrack, self.TracksDict):
|
||||
# cur_list.append(btrack)
|
||||
# else:
|
||||
# alist.append(btrack)
|
||||
#
|
||||
# mergedTracks.append(cur_list)
|
||||
# =============================================================================
|
||||
mergedTracks = self.base_merge_tracks(Residual)
|
||||
|
||||
oldtracks, newtracks = [], []
|
||||
for tracklist in mergedTracks:
|
||||
if len(tracklist) > 1:
|
||||
boxes = np.empty((0, 9), dtype=np.float32)
|
||||
for i, track in enumerate(tracklist):
|
||||
if i==0: ntid, ncls=track.boxes[0, 4], track.boxes[0, 6]
|
||||
iboxes = track.boxes.copy()
|
||||
iboxes[:, 4], iboxes[:, 6] = ntid, ncls
|
||||
boxes = np.concatenate((boxes, iboxes), axis=0)
|
||||
oldtracks.append(track)
|
||||
|
||||
fid_indices = np.argsort(boxes[:, 7])
|
||||
boxes_fid = boxes[fid_indices]
|
||||
|
||||
newtracks.append(frontTrack(boxes_fid))
|
||||
elif len(tracklist) == 1:
|
||||
oldtracks.append(tracklist[0])
|
||||
newtracks.append(tracklist[0])
|
||||
|
||||
|
||||
redu = self.sub_tracks(Residual, oldtracks)
|
||||
merged = self.join_tracks(redu, newtracks)
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# def array2list(self):
|
||||
# '''
|
||||
# 将 bboxes 变换为 track 列表
|
||||
# bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
# Return:
|
||||
# lboxes:列表,列表中元素具有同一 track_id,x1y1x2y2 格式
|
||||
# [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
# '''
|
||||
# track_ids = set(self.bboxes[:, 4])
|
||||
# lboxes = []
|
||||
# for t_id in track_ids:
|
||||
# # print(f"The ID is: {t_id}")
|
||||
# idx = np.where(self.bboxes[:, 4] == t_id)[0]
|
||||
# box = self.bboxes[idx, :]
|
||||
#
|
||||
# lboxes.append(box)
|
||||
#
|
||||
# return lboxes
|
||||
# =============================================================================
|
||||
|
329
tracking/dotrack/track_back.py
Normal file
329
tracking/dotrack/track_back.py
Normal file
@ -0,0 +1,329 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Created on Mon Mar 4 18:28:47 2024
|
||||
|
||||
@author: ym
|
||||
"""
|
||||
import cv2
|
||||
import numpy as np
|
||||
from scipy.spatial.distance import cdist
|
||||
from sklearn.decomposition import PCA
|
||||
from .dotracks import MoveState, Track
|
||||
|
||||
|
||||
class backTrack(Track):
|
||||
# boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
# 0, 1, 2, 3, 4, 5, 6, 7, 8
|
||||
def __init__(self, boxes, imgshape=(1024, 1280)):
|
||||
|
||||
super().__init__(boxes, imgshape)
|
||||
|
||||
'''该函数依赖项: self.cornpoints'''
|
||||
self.isCornpoint = self.isimgborder()
|
||||
|
||||
'''该函数依赖项: self.isCornpoint,不能在父类中初始化'''
|
||||
self.trajfeature()
|
||||
|
||||
|
||||
'''静止点帧索引'''
|
||||
self.static_index = self.compute_static_fids()
|
||||
|
||||
'''运动点帧索引(运动帧两端的静止帧索引)'''
|
||||
self.moving_index = self.compute_dynamic_fids()
|
||||
|
||||
self.static_dynamic_fids = self.compute_static_dynamic_fids()
|
||||
|
||||
'''该函数依赖项: self.cornpoints,定义 4 个商品位置变量:
|
||||
self.Cent_isIncart, self.LB_isIncart, self.RB_isIncart
|
||||
self.posState = self.Cent_isIncart+self.LB_isIncart+self.RB_isIncart'''
|
||||
self.PositionState()
|
||||
|
||||
'''self.feature_ious = (incart_iou, outcart_iou, cartboarder_iou, maxbox_iou, minbox_iou)
|
||||
self.incartrates = incartrates'''
|
||||
self.compute_ious_feat()
|
||||
|
||||
# self.PCA()
|
||||
|
||||
|
||||
|
||||
def isimgborder(self, BoundPixel=10, BoundThresh=0.3):
|
||||
|
||||
x1, y1 = self.cornpoints[:,2], self.cornpoints[:,3],
|
||||
x2, y2 = self.cornpoints[:,8], self.cornpoints[:,9]
|
||||
|
||||
cont1 = sum(abs(x1)<BoundPixel) / self.frnum > BoundThresh
|
||||
cont2 = sum(abs(y1)<BoundPixel) / self.frnum > BoundThresh
|
||||
cont3 = sum(abs(x2-self.imgshape[0])<BoundPixel) / self.frnum > BoundThresh
|
||||
cont4 = sum(abs(y2-self.imgshape[1])<BoundPixel) / self.frnum > BoundThresh
|
||||
|
||||
cont = cont1 or cont2 or cont3 or cont4
|
||||
isCornpoint = False
|
||||
if cont:
|
||||
isCornpoint = True
|
||||
|
||||
return isCornpoint
|
||||
|
||||
|
||||
def PositionState(self, camerType="back"):
|
||||
'''
|
||||
camerType: back, 后置摄像头
|
||||
front, 前置摄像头
|
||||
'''
|
||||
if camerType=="front":
|
||||
incart = cv2.imread("./shopcart/cart_tempt/incart.png", cv2.IMREAD_GRAYSCALE)
|
||||
else:
|
||||
incart = cv2.imread("./shopcart/cart_tempt/incart_ftmp.png", cv2.IMREAD_GRAYSCALE)
|
||||
|
||||
xc, yc = self.cornpoints[:,0].clip(0,self.imgshape[0]-1).astype(np.int64), self.cornpoints[:,1].clip(0,self.imgshape[1]-1).astype(np.int64)
|
||||
x1, y1 = self.cornpoints[:,6].clip(0,self.imgshape[0]-1).astype(np.int64), self.cornpoints[:,7].clip(0,self.imgshape[1]-1).astype(np.int64)
|
||||
x2, y2 = self.cornpoints[:,8].clip(0,self.imgshape[0]-1).astype(np.int64), self.cornpoints[:,9].clip(0,self.imgshape[1]-1).astype(np.int64)
|
||||
|
||||
# print(self.tid)
|
||||
Cent_inCartnum = np.count_nonzero(incart[(yc, xc)])
|
||||
LB_inCartnum = np.count_nonzero(incart[(y1, x1)])
|
||||
RB_inCartnum = np.count_nonzero(incart[(y2, x2)])
|
||||
|
||||
self.Cent_isIncart = False
|
||||
self.LB_isIncart = False
|
||||
self.RB_isIncart = False
|
||||
if Cent_inCartnum: self.Cent_isIncart = True
|
||||
if LB_inCartnum: self.LB_isIncart = True
|
||||
if RB_inCartnum: self.RB_isIncart = True
|
||||
|
||||
self.posState = self.Cent_isIncart+self.LB_isIncart+self.RB_isIncart
|
||||
|
||||
|
||||
|
||||
def PCA(self):
|
||||
self.pca = PCA()
|
||||
|
||||
X = self.cornpoints[:, 0:2]
|
||||
self.pca.fit(X)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def compute_ious_feat(self):
|
||||
'''输出:
|
||||
self.feature_ious = (incart_iou, outcart_iou, cartboarder_iou, maxbox_iou, minbox_iou)
|
||||
self.incartrates = incartrates,
|
||||
其中:
|
||||
boxes流:track中所有boxes形成的轨迹图,可分为三部分:incart, outcart, cartboarder
|
||||
incart_iou, outcart_iou, cartboarder_iou:各部分和 boxes流的 iou。
|
||||
incart_iou = 0,track在购物车外,
|
||||
outcart_iou = 0,track在购物车内,也可能是通过左下角、右下角置入购物车,
|
||||
maxbox_iou, minbox_iou:track中最大、最小 box 和boxes流的iou,二者差值越小,越接近 1,表明track的运动型越小。
|
||||
incartrates: 各box和incart的iou时序,由小变大,反应的是置入过程,由大变小,反应的是取出过程
|
||||
'''
|
||||
incart = cv2.imread("./shopcart/cart_tempt/incart.png", cv2.IMREAD_GRAYSCALE)
|
||||
outcart = cv2.imread("./shopcart/cart_tempt/outcart.png", cv2.IMREAD_GRAYSCALE)
|
||||
cartboarder = cv2.imread("./shopcart/cart_tempt/cartboarder.png", cv2.IMREAD_GRAYSCALE)
|
||||
|
||||
incartrates = []
|
||||
temp = np.zeros(incart.shape, np.uint8)
|
||||
maxarea, minarea = 0, self.imgshape[0]*self.imgshape[1]
|
||||
for i in range(self.frnum):
|
||||
# x, y, w, h = self.boxes[i, 0:4]
|
||||
|
||||
x = (self.boxes[i, 2] + self.boxes[i, 0]) / 2
|
||||
w = (self.boxes[i, 2] - self.boxes[i, 0]) / 2
|
||||
y = (self.boxes[i, 3] + self.boxes[i, 1]) / 2
|
||||
h = (self.boxes[i, 3] - self.boxes[i, 1]) / 2
|
||||
|
||||
|
||||
if w*h > maxarea: maxarea = w*h
|
||||
if w*h < minarea: minarea = w*h
|
||||
cv2.rectangle(temp, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), 255, cv2.FILLED)
|
||||
|
||||
temp1 = np.zeros(incart.shape, np.uint8)
|
||||
cv2.rectangle(temp1, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), 255, cv2.FILLED)
|
||||
temp2 = cv2.bitwise_and(incart, temp1)
|
||||
inrate = cv2.countNonZero(temp1)/(w*h)
|
||||
incartrates.append(inrate)
|
||||
|
||||
isincart = cv2.bitwise_and(incart, temp)
|
||||
isoutcart = cv2.bitwise_and(outcart, temp)
|
||||
iscartboarder = cv2.bitwise_and(cartboarder, temp)
|
||||
|
||||
num_temp = cv2.countNonZero(temp)
|
||||
num_incart = cv2.countNonZero(isincart)
|
||||
num_outcart = cv2.countNonZero(isoutcart)
|
||||
num_cartboarder = cv2.countNonZero(iscartboarder)
|
||||
|
||||
incart_iou = num_incart/num_temp
|
||||
outcart_iou = num_outcart/num_temp
|
||||
cartboarder_iou = num_cartboarder/num_temp
|
||||
maxbox_iou = maxarea/num_temp
|
||||
minbox_iou = minarea/num_temp
|
||||
|
||||
self.feature_ious = (incart_iou, outcart_iou, cartboarder_iou, maxbox_iou, minbox_iou)
|
||||
self.incartrates = incartrates
|
||||
|
||||
|
||||
|
||||
|
||||
def compute_static_fids(self, thresh1 = 12, thresh2 = 3):
|
||||
'''
|
||||
计算 track 的轨迹中相对处于静止状态的轨迹点的(start_frame_id, end_frame_id)
|
||||
thresh1: 相邻两帧目标中心点是否静止的的阈值,以像素为单位,
|
||||
thresh2: 连续捕捉到目标处于静止状态的帧数,当 thresh2 = 3时,至少连续 4个点,
|
||||
产生3个相邻点差值均小于 thresh1 时,判定为连续静止.
|
||||
处理过程中利用了插值技术,因此start、end并非 self.boxes 中对应的帧索引
|
||||
'''
|
||||
|
||||
BoundPixel = 8
|
||||
x1, y1 = self.cornpoints[:,2], self.cornpoints[:,3],
|
||||
x2, y2 = self.cornpoints[:,8], self.cornpoints[:,9]
|
||||
cont1 = sum(abs(x1)<BoundPixel) > 3
|
||||
# cont2 = sum(abs(y1)<BoundPixel) > 3
|
||||
cont3 = sum(abs(x2-self.imgshape[0])<BoundPixel) > 3
|
||||
# cont4 = sum(abs(y2-self.imgshape[1])<BoundPixel) > 3
|
||||
cont = not(cont1 or cont3)
|
||||
|
||||
## ============== 下一步,启用中心点,选择具有最小运动幅度的角点作为参考点
|
||||
static_index = []
|
||||
if self.frnum>=2 and cont:
|
||||
x1 = self.boxes[1:,7]
|
||||
x2 = [i for i in range(int(min(x1)), int(max(x1)+1))]
|
||||
dist_adjc = np.interp(x2, x1, self.trajmin)
|
||||
|
||||
|
||||
# dist_adjc = self.trajmin
|
||||
|
||||
static_thresh = (dist_adjc < thresh1)[:, None].astype(np.uint8)
|
||||
static_cnts, _ = cv2.findContours(static_thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
|
||||
|
||||
for cnt in static_cnts:
|
||||
_, start, _, num = cv2.boundingRect(cnt)
|
||||
end = start + num
|
||||
if num < thresh2:
|
||||
continue
|
||||
static_index.append((start, end))
|
||||
|
||||
static_index = np.array(static_index)
|
||||
if static_index.size:
|
||||
indx = np.argsort(static_index[:, 0])
|
||||
static_index = static_index[indx]
|
||||
|
||||
return static_index
|
||||
|
||||
def compute_dynamic_fids(self, thresh1 = 12, thresh2 = 3):
|
||||
'''
|
||||
计算 track 的轨迹中运动轨迹点的(start_frame_id, end_frame_id)
|
||||
thresh1: 相邻两帧目标中心点是否运动的阈值,以像素为单位,
|
||||
thresh2: 连续捕捉到目标连续运动的帧数
|
||||
目标:
|
||||
1. 计算轨迹方向
|
||||
2. 计算和手部运动的关联性
|
||||
'''
|
||||
moving_index = []
|
||||
if self.frnum>=2:
|
||||
x1 = self.boxes[1:,7]
|
||||
x2 = [i for i in range(int(min(x1)), int(max(x1)+1))]
|
||||
dist_adjc = np.interp(x2, x1, self.trajmin)
|
||||
|
||||
moving_thresh = (dist_adjc >= thresh1)[:, None].astype(np.uint8)
|
||||
moving_cnts, _ = cv2.findContours(moving_thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
|
||||
|
||||
for cnt in moving_cnts:
|
||||
_, start, _, num = cv2.boundingRect(cnt)
|
||||
if num < thresh2:
|
||||
continue
|
||||
end = start + num
|
||||
moving_index.append((start, end))
|
||||
|
||||
# =============================================================================
|
||||
# '''========= 输出帧id,不太合适 ========='''
|
||||
# moving_fids = []
|
||||
# for i in range(len(moving_index)):
|
||||
# i1, i2 = moving_index[i]
|
||||
# fid1, fid2 = boxes[i1, 7], boxes[i2, 7]
|
||||
# moving_fids.append([fid1, fid2])
|
||||
# moving_fids = np.array(moving_fids)
|
||||
# =============================================================================
|
||||
moving_index = np.array(moving_index)
|
||||
if moving_index.size:
|
||||
indx = np.argsort(moving_index[:, 0])
|
||||
moving_index = moving_index[indx]
|
||||
|
||||
return moving_index
|
||||
|
||||
def compute_static_dynamic_fids(self):
|
||||
static_dynamic_fids = []
|
||||
for traj in self.trajectory:
|
||||
static, dynamic = self.compute_static_fids(traj)
|
||||
|
||||
static_dynamic_fids.append((static, dynamic))
|
||||
|
||||
return static_dynamic_fids
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def is_static(self):
|
||||
|
||||
'''静态情况 1: 目标关键点最小相对运动轨迹 < 0.2, 指标值偏大
|
||||
feature = [trajlen_min, trajlen_max,
|
||||
trajdist_min, trajdist_max,
|
||||
trajlen_rate, trajdist_rate]
|
||||
'''
|
||||
|
||||
condt1 = self.feature[5] < 0.2 or self.feature[3] < 120
|
||||
|
||||
'''静态情况 2: 目标初始状态为静止,适当放宽关键点最小相对运动轨迹 < 0.5'''
|
||||
condt2 = self.static_index.size > 0 \
|
||||
and self.static_index[0, 0] <= 2 \
|
||||
and self.feature[5] < 0.5
|
||||
|
||||
'''静态情况 3: 目标初始状态和最终状态均为静止'''
|
||||
condt3 = self.static_index.shape[0] >= 2 \
|
||||
and self.static_index[0, 0] <= 2 \
|
||||
and self.static_index[-1, 1] >= self.frnum-3 \
|
||||
|
||||
condt = condt1 or condt2 or condt3
|
||||
|
||||
return condt
|
||||
|
||||
# =============================================================================
|
||||
# track1 = [t for t in tracks if t.feature[5] < 0.2
|
||||
# or t.feature[3] < 120
|
||||
# ]
|
||||
#
|
||||
# track2 = [t for t in tracks if t.static_index.size > 0
|
||||
# and t.static_index[0, 0] <= 2
|
||||
# and t.feature[5] < 0.5]
|
||||
#
|
||||
# track3 = [t for t in tracks if t.static_index.shape[0] >= 2
|
||||
# and t.static_index[0, 0] <= 2
|
||||
# and t.static_index[-1, 1] >= t.frnum-3]
|
||||
#
|
||||
# track12 = self.join_tracks(track1, track2)
|
||||
#
|
||||
# '''提取静止状态的 track'''
|
||||
# static_tracks = self.join_tracks(track12, track3)
|
||||
# self.Static.extend(static_tracks)
|
||||
#
|
||||
# =============================================================================
|
||||
|
||||
def is_OutTrack(self):
|
||||
if self.posState <= 1:
|
||||
isout = True
|
||||
else:
|
||||
isout = False
|
||||
return isout
|
||||
|
||||
|
||||
|
||||
|
||||
def compute_distance(self):
|
||||
pass
|
||||
|
||||
|
||||
def move_start_fid(self):
|
||||
pass
|
||||
|
||||
|
||||
def move_end_fid(self):
|
||||
pass
|
271
tracking/dotrack/track_front.py
Normal file
271
tracking/dotrack/track_front.py
Normal file
@ -0,0 +1,271 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Created on Mon Mar 4 18:33:01 2024
|
||||
|
||||
@author: ym
|
||||
"""
|
||||
import numpy as np
|
||||
from sklearn.cluster import KMeans
|
||||
from .dotracks import MoveState, Track
|
||||
|
||||
|
||||
class frontTrack(Track):
|
||||
# boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
|
||||
# 0, 1, 2, 3, 4, 5, 6, 7, 8
|
||||
def __init__(self, boxes, imgshape=(1024, 1280)):
|
||||
|
||||
super().__init__(boxes, imgshape)
|
||||
self.hands = []
|
||||
|
||||
|
||||
'''5个关键点(中心点、左上点、右上点、左下点、右下点 )轨迹特征'''
|
||||
# self.compute_cornpts_feats()
|
||||
|
||||
|
||||
self.CART_HIGH_THRESH1 = imgshape[1]/2.98
|
||||
|
||||
|
||||
# if self.tid==10:
|
||||
# print(f"ID: {self.tid}")
|
||||
|
||||
'''y1、y2静止状态区间,值是 boxes 中对 axis=0 的索引,不是帧索引'''
|
||||
det_y1 = np.diff(boxes[:, 1], axis=0)
|
||||
det_y2 = np.diff(boxes[:, 3], axis=0)
|
||||
self.static_y1, self.dynamic_y1 = self.compute_static_fids(det_y1)
|
||||
self.static_y2, self.dynamic_y2 = self.compute_static_fids(det_y2)
|
||||
|
||||
self.isCornpoint = self.is_left_or_right_cornpoint()
|
||||
self.isBotmpoint = self.is_bottom_cornpoint()
|
||||
|
||||
'''该函数依赖项: self.isCornpoint,不能在父类中初始化'''
|
||||
self.trajfeature()
|
||||
|
||||
|
||||
'''手部状态分析'''
|
||||
self.HAND_STATIC_THRESH = 100
|
||||
self.CART_POSIT_0 = 430
|
||||
self.CART_POSIT_1 = 620
|
||||
|
||||
|
||||
|
||||
|
||||
def is_left_or_right_cornpoint(self):
|
||||
''' 基于 all(boxes),
|
||||
boxes左下角点和图像左下角点重叠 或
|
||||
boxes右下角点和图像左下角点重叠
|
||||
'''
|
||||
x1, y1 = self.boxes[:, 0], self.boxes[:, 1]
|
||||
x2, y2 = self.boxes[:, 2], self.boxes[:, 3]
|
||||
|
||||
# Left-Bottom cornpoint
|
||||
condt1 = all(x1 < 5) and all(y2 > self.imgshape[1]-5)
|
||||
|
||||
# Right-Bottom cornpoint
|
||||
condt2 = all(x2 > self.imgshape[0]-5) and all(y2 > self.imgshape[1]-5)
|
||||
|
||||
condt = condt1 or condt2
|
||||
|
||||
return condt
|
||||
|
||||
def is_edge_cornpoint(self):
|
||||
'''基于 all(boxes),boxes是否和图像左右边缘重叠'''
|
||||
x1, x2 = self.boxes[:, 0], self.boxes[:, 2]
|
||||
condt = all(x1 < 3) or all(x2 > self.imgshape[0]-3)
|
||||
|
||||
return condt
|
||||
|
||||
def is_bottom_cornpoint(self):
|
||||
'''基于 all(boxes),boxes是否和图像下边缘重叠'''
|
||||
condt = all(self.boxes[:, 3] > self.imgshape[1]-20)
|
||||
|
||||
return condt
|
||||
|
||||
# def is_OutTrack(self):
|
||||
# isout = False
|
||||
# if self.posState <= 1:
|
||||
# isout = True
|
||||
# return isout
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# def compute_static_fids(self, det_y, STATIC_THRESH = 8):
|
||||
# '''
|
||||
# 前摄时,y一般选择为 box 的 y1 坐标,且需限定商品在购物车内。
|
||||
# inputs:
|
||||
# y:1D array,
|
||||
# parameters:
|
||||
# STATIC_THRESH:轨迹处于静止状态的阈值。
|
||||
# outputs:
|
||||
# 输出为差分值小于 STATIC_THRESH 的y中元素的(start, end)索引
|
||||
# ranges = [(x1, y1),
|
||||
# (x1, y1),
|
||||
# ...]
|
||||
# '''
|
||||
# # print(f"The ID is: {self.tid}")
|
||||
#
|
||||
# # det_y = np.diff(y, axis=0)
|
||||
# ranges, rangex = [], []
|
||||
#
|
||||
# static_indices = np.where(np.abs(det_y) < STATIC_THRESH)[0]
|
||||
#
|
||||
# if len(static_indices) == 0:
|
||||
# rangex.append((0, len(det_y)))
|
||||
# return ranges, rangex
|
||||
#
|
||||
# start_index = static_indices[0]
|
||||
#
|
||||
# for i in range(1, len(static_indices)):
|
||||
# if static_indices[i] != static_indices[i-1] + 1:
|
||||
# ranges.append((start_index, static_indices[i-1] + 1))
|
||||
# start_index = static_indices[i]
|
||||
# ranges.append((start_index, static_indices[-1] + 1))
|
||||
#
|
||||
# if len(ranges) == 0:
|
||||
# rangex.append((0, len(det_y)))
|
||||
# return ranges, rangex
|
||||
#
|
||||
# idx1, idx2 = ranges[0][0], ranges[-1][1]
|
||||
#
|
||||
# if idx1 != 0:
|
||||
# rangex.append((0, idx1))
|
||||
#
|
||||
# # 轨迹的最后阶段是运动状态
|
||||
# for k in range(1, len(ranges)):
|
||||
# index1 = ranges[k-1][1]
|
||||
# index2 = ranges[k][0]
|
||||
# rangex.append((index1, index2))
|
||||
#
|
||||
# if idx2 != len(det_y):
|
||||
# rangex.append((idx2, len(det_y)))
|
||||
#
|
||||
# return ranges, rangex
|
||||
#
|
||||
# =============================================================================
|
||||
|
||||
|
||||
|
||||
def is_static(self):
|
||||
assert self.frnum > 1, "boxes number must greater than 1"
|
||||
# print(f"The ID is: {self.tid}")
|
||||
|
||||
# 手部和小孩目标不考虑
|
||||
if self.cls == 0 or self.cls == 9:
|
||||
return False
|
||||
|
||||
# boxes 全部 y2=1280
|
||||
if self.isBotmpoint:
|
||||
return True
|
||||
|
||||
boxes = self.boxes
|
||||
y0 = (boxes[:, 1]+boxes[:, 3])/2
|
||||
|
||||
## 纵轴矢量和
|
||||
sum_y0 = y0[-1] - y0[0]
|
||||
sum_y1 = boxes[-1, 1]-boxes[0, 1]
|
||||
sum_y2 = boxes[-1, 3]-boxes[0, 3]
|
||||
|
||||
# 一些需要考虑的特殊情况
|
||||
isbottom = max(boxes[:, 3]) > 1280-3
|
||||
istop = min(boxes[:, 1]) < 3
|
||||
isincart = min(y0) > self.CART_HIGH_THRESH1
|
||||
uncert = abs(sum_y1)<100 and abs(sum_y2)<100
|
||||
|
||||
'''初始条件:商品中心点始终在购物车内、'''
|
||||
condt0 = max((boxes[:, 1]+boxes[:, 3])/2) > self.CART_HIGH_THRESH1
|
||||
|
||||
'''条件1:轨迹运动纵向和(y1 或 y2)描述商品轨迹长度,存在情况:
|
||||
(1). 检测框可能与图像上下边缘重合,
|
||||
(2). 上边或下边存在跳动
|
||||
'''
|
||||
if isbottom and istop:
|
||||
condt1 = abs(sum_y0) < 300
|
||||
elif isbottom: # y2在底部,用y1表征运动
|
||||
condt1 = sum_y1 > -120 and abs(sum_y0)<80 # 有底部点,方向向上阈值小于100
|
||||
elif istop: # y1在顶部,用y2表征运动
|
||||
condt1 = abs(sum_y2) < 100
|
||||
else:
|
||||
condt1 = (abs(sum_y1) < 30 or abs(sum_y2)<30)
|
||||
|
||||
'''条件2:轨迹的开始和结束阶段均处于静止状态, 利用静止状态区间判断,用 y1
|
||||
a. 商品在购物车内,
|
||||
b. 检测框的起始阶段和结束阶段均为静止状态
|
||||
c. 静止帧长度 > 3'''
|
||||
|
||||
condt2 = False
|
||||
if len(self.static_y1)>=2:
|
||||
condt_s0 = self.static_y1[0][0]==0 and self.static_y1[0][1] - self.static_y1[0][0] >= 3
|
||||
condt_s1 = self.static_y1[-1][1]==self.frnum-1 and self.static_y1[-1][1] - self.static_y1[-1][0] >= 3
|
||||
condt2 = condt_s0 and condt_s1 and isincart
|
||||
|
||||
|
||||
condt = condt0 and (condt1 or condt2)
|
||||
|
||||
return condt
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def is_upward(self):
|
||||
'''判断商品是否取出,'''
|
||||
print(f"The ID is: {self.tid}")
|
||||
|
||||
def is_free_move(self):
|
||||
if self.frnum == 1:
|
||||
return True
|
||||
# print(f"The ID is: {self.tid}")
|
||||
|
||||
|
||||
y0 = (self.boxes[:, 1] + self.boxes[:, 3]) / 2
|
||||
det_y0 = np.diff(y0, axis=0)
|
||||
sum_y0 = y0[-1] - y0[0]
|
||||
|
||||
'''情况1:中心点向下 '''
|
||||
## 初始条件:商品第一次检测到在购物车内
|
||||
condt0 = y0[0] > self.CART_HIGH_THRESH1
|
||||
|
||||
condt_a = False
|
||||
## 条件1:商品初始为静止状态,静止条件应严格一些
|
||||
condt11, condt12 = False, False
|
||||
if len(self.static_y1)>0:
|
||||
condt11 = self.static_y1[0][0]==0 and self.static_y1[0][1] - self.static_y1[0][0] >= 5
|
||||
if len(self.static_y2)>0:
|
||||
condt12 = self.static_y2[0][0]==0 and self.static_y2[0][1] - self.static_y2[0][0] >= 5
|
||||
|
||||
# 条件2:商品中心发生向下移动
|
||||
condt2 = y0[-1] > y0[0]
|
||||
|
||||
# 综合判断a
|
||||
condt_a = condt0 and (condt11 or condt12) and condt2
|
||||
|
||||
'''情况2:中心点向上 '''
|
||||
## 商品中心点向上移动,但没有关联的Hand轨迹,也不是左右边界点
|
||||
condt_b = condt0 and len(self.hands)==0 and y0[-1] < y0[0] and (not self.is_edge_cornpoint())
|
||||
|
||||
|
||||
'''情况3: 商品在购物车内,但运动方向无序'''
|
||||
## 中心点在购物车内,纵向轨迹和小于轨迹差中绝对值最大的两个值的和,说明运动没有主方向
|
||||
condt_c = False
|
||||
if self.frnum > 3:
|
||||
condt_c = all(y0>self.CART_HIGH_THRESH1) and \
|
||||
(abs(sum_y0) < sum(np.sort(np.abs(det_y0))[::-1][:2])-1)
|
||||
|
||||
condt = (condt_a or condt_b or condt_c) and self.cls!=0
|
||||
|
||||
return condt
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user