# -*- coding: utf-8 -*-
"""
Created on Mon Mar  4 18:16:01 2024

Base classes for analysing detection tracks of a shopping-cart camera:
a load-rate estimator (ShoppingCart), a per-track feature container (Track)
and a per-sequence track organiser (doTracks).

@author: ym
"""
import numpy as np
import cv2
from pathlib import Path
from scipy.spatial.distance import cdist
from tracking.utils.mergetrack import track_equal_track, readDict

curpath = Path(__file__).resolve().parents[0]
curpath = Path(curpath)
parpath = curpath.parent


class MoveState:
    """Flags describing the motion state of a product track."""
    Static = 0
    DownWard = 1
    UpWard = 2
    FreeMove = 3
    Unknown = -1


class ShoppingCart:
    """Estimate how full the cart is from the boxes of the earliest frame.

    bboxes rows follow the layout used throughout this module:
    [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
    """

    def __init__(self, bboxes):
        self.bboxes = bboxes
        self.loadrate = self.load_rate()

    def load_rate(self):
        """Return the fraction of the in-cart mask covered by first-frame boxes.

        Only boxes whose frame index (column 7) equals the minimum frame index
        are used. The result is (#covered in-cart pixels) / (#in-cart pixels
        + 0.01); the small constant avoids division by zero for an empty mask.
        """
        bboxes = self.bboxes
        # The mask is read from disk by the property; fetch it only once.
        incart = self.incart

        first_fid = np.min(bboxes[:, 7])
        boxes = bboxes[bboxes[:, 7] == first_fid]

        filled = np.zeros(incart.shape, np.uint8)
        for x1, y1, x2, y2 in boxes[:, 0:4]:
            cv2.rectangle(filled, (int(x1), int(y1)), (int(x2), int(y2)), 255, cv2.FILLED)

        # AND with the in-cart mask discards everything outside the cart frame.
        loadstate = cv2.bitwise_and(incart, filled)

        num_loadstate = cv2.countNonZero(loadstate)
        num_incart = cv2.countNonZero(incart)
        return num_loadstate / (num_incart + 0.01)

    @staticmethod
    def _load_binary_mask(filename):
        """Read a grayscale template from shopcart/cart_tempt and binarise it at 250."""
        img = cv2.imread(str(parpath / f'shopcart/cart_tempt/{filename}'), cv2.IMREAD_GRAYSCALE)
        _, binary = cv2.threshold(img, 250, 255, cv2.THRESH_BINARY)
        return binary

    @property
    def incart(self):
        """Binary mask loaded from incart.png (re-read on every access)."""
        return self._load_binary_mask('incart.png')

    @property
    def outcart(self):
        """Binary mask loaded from outcart.png (re-read on every access)."""
        return self._load_binary_mask('outcart.png')

    @property
    def cartedge(self):
        """Binary mask loaded from cartedge.png (re-read on every access)."""
        return self._load_binary_mask('cartedge.png')


class Track:
    """Base class holding the boxes and derived features of one track.

    The original author intends this as an abstract base that is not
    instantiated directly (this is not enforced in code).
    """

    def __init__(self, boxes, features=None, imgshape=(1024, 1280)):
        """
        boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                0   1   2   3      4       5     6        7           8
        features: optional per-box feature rows; required by is_freemove().
        imgshape: index 0 is used as the x extent and index 1 as the y extent
                  (see hshift below and PositionState()).
        """
        self.boxes = boxes
        self.features = features
        self.tid = int(boxes[0, 4])
        self.cls = int(boxes[0, 6])
        self.frnum = boxes.shape[0]
        self.isCornpoint = False
        self.imgshape = imgshape

        self.Hands = []
        self.HandsIou = []
        self.Goods = []
        self.GoodsIou = []

        # Coordinates of 5 key points per frame:
        # centre, top-left, top-right, bottom-left, bottom-right.
        self.compute_cornpoints()

        # Trajectory features of those 5 key points. Could be moved into
        # subclasses to reduce the cost of sequential processing.
        self.compute_cornpts_feats()

        # Mean box width / height, their average, and the mean-size area.
        mw = np.mean(boxes[:, 2] - boxes[:, 0])
        mh = np.mean(boxes[:, 3] - boxes[:, 1])
        self.mwh = np.mean((mw, mh))
        self.Area = mw * mh

        # Displacement of the centre point between the first and last frame:
        #   vshift: positive = moved down, negative = moved up.
        #   hshift: positive = moved towards the horizontal centre,
        #           negative = moved towards the sides of the cart frame.
        half_width = self.imgshape[0] / 2
        self.vshift = self.cornpoints[-1, 1] - self.cornpoints[0, 1]
        self.hshift = abs(self.cornpoints[0, 0] - half_width) - \
            abs(self.cornpoints[-1, 0] - half_width)

        # Hand-state analysis; class id 0 is treated as HAND here.
        self.HAND_STATIC_THRESH = 100
        if self.cls == 0:
            self.extract_hand_features()

    def compute_cornpoints(self):
        """Fill self.cornpoints with shape (frnum, 10).

        Columns are the (x, y) pairs of, in order:
        centre, top_left, top_right, bottom_left, bottom_right.
        """
        boxes = self.boxes
        x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]

        cornpoints = np.zeros((self.frnum, 10))
        cornpoints[:, 0] = (x1 + x2) / 2
        cornpoints[:, 1] = (y1 + y2) / 2
        cornpoints[:, 2], cornpoints[:, 3] = x1, y1
        cornpoints[:, 4], cornpoints[:, 5] = x2, y1
        cornpoints[:, 6], cornpoints[:, 7] = x1, y2
        cornpoints[:, 8], cornpoints[:, 9] = x2, y2

        self.cornpoints = cornpoints

    def compute_cornpts_feats(self):
        """Compute per-key-point trajectory features.

        For each of the 5 key points the following are stored (lists of
        length 5, same order as in compute_cornpoints):
          trajectory   : frame-to-frame step lengths
          trajlens     : total path length (sum of the steps)
          trajdist     : largest pairwise distance between visited positions
          trajrects    : cv2.minAreaRect of the visited positions
                         (rect[0] centre (x, y), rect[1] (w, h), rect[2] angle)
          trajrects_wh : longer side of that rectangle
        """
        trajectory = []
        trajlens = []
        trajdist = []
        trajrects = []
        trajrects_wh = []
        for k in range(5):
            X = self.cornpoints[:, 2 * k:2 * (k + 1)]

            traj = np.linalg.norm(np.diff(X, axis=0), axis=1)
            trajectory.append(traj)
            trajlens.append(np.sum(traj))

            trajdist.append(np.max(cdist(X, X)))

            rect = cv2.minAreaRect(X.astype(np.int64))
            trajrects_wh.append(max(rect[1]))
            trajrects.append(rect)

        self.trajectory = trajectory
        self.trajlens = trajlens
        self.trajdist = trajdist
        self.trajrects = trajrects
        self.trajrects_wh = trajrects_wh

    def trajfeature(self):
        """Derive min/max trajectory features across the 5 key points.

        The "max" key point is the one whose min-area rectangle has the
        longest side. The "min" key point is the one with the shortest such
        side, unless self.isCornpoint is set, in which case the centre point
        (index 0) is used instead.

        Sets self.trajmin, self.trajmax and
        self.TrajFeat = [trajlen_min, trajlen_max, trajdist_min,
                         trajdist_max, trajlen_rate, trajdist_rate].
        """
        idx_max = self.trajrects_wh.index(max(self.trajrects_wh))
        trajmax = self.trajectory[idx_max]
        trajlen_max = self.trajlens[idx_max]
        trajdist_max = self.trajdist[idx_max]

        if self.isCornpoint:
            idx_min = 0
        else:
            idx_min = self.trajrects_wh.index(min(self.trajrects_wh))
        trajmin = self.trajectory[idx_min]
        trajlen_min = self.trajlens[idx_min]
        trajdist_min = self.trajdist[idx_min]

        # min path length / max path length: smaller means less movement.
        trajlen_rate = trajlen_min / (trajlen_max + 0.0001)

        # min trajectory extent / mean box scale.
        trajdist_rate = trajdist_min / (self.mwh + 0.0001)

        self.trajmin = trajmin
        self.trajmax = trajmax
        self.TrajFeat = [trajlen_min, trajlen_max, trajdist_min, trajdist_max,
                         trajlen_rate, trajdist_rate]

    def pt_state_fids(self, det_y, STATIC_THRESH=8):
        """Split a difference sequence into static and moving index ranges.

        Per the original note: for the front camera, y is usually the box y1
        coordinate and the product should be restricted to inside the cart.

        inputs:
            det_y: 1D array of differences (e.g. np.diff of a coordinate).
        parameters:
            STATIC_THRESH: |det_y| below this value counts as static.
        outputs:
            ranges: [(start, end), ...] half-open index ranges where
                    |det_y| < STATIC_THRESH holds for consecutive elements.
            rangex: [(start, end), ...] the complementary (moving) ranges.
        """
        ranges, rangex = [], []
        num = len(det_y)

        static_indices = np.where(np.abs(det_y) < STATIC_THRESH)[0]
        if len(static_indices) == 0:
            rangex.append((0, num))
            return ranges, rangex

        # Group consecutive static indices into half-open ranges.
        start_index = static_indices[0]
        for prev, cur in zip(static_indices[:-1], static_indices[1:]):
            if cur != prev + 1:
                ranges.append((start_index, prev + 1))
                start_index = cur
        ranges.append((start_index, static_indices[-1] + 1))

        # The moving ranges are the gaps before, between and after them.
        first_start, last_end = ranges[0][0], ranges[-1][1]
        if first_start != 0:
            rangex.append((0, first_start))
        for (_, prev_end), (next_start, _) in zip(ranges[:-1], ranges[1:]):
            rangex.append((prev_end, next_start))
        if last_end != num:
            # The track ends in a moving phase.
            rangex.append((last_end, num))

        return ranges, rangex

    def PositionState(self, camerType="back"):
        """Classify the track position relative to the cart templates.

        camerType: "back" selects incart.png / outcart.png; any other value
                   (front camera) selects incart_ftmp.png / outcart_ftmp.png.

        Sets isWholeInCart, isWholeOutCart, Cent_isIncart, LB_isIncart,
        RB_isIncart and posState (number of those three points that touch
        the in-cart mask in at least one frame).
        """
        suffix = "" if camerType == "back" else "_ftmp"
        incart = cv2.imread(str(parpath / f'shopcart/cart_tempt/incart{suffix}.png'),
                            cv2.IMREAD_GRAYSCALE)
        outcart = cv2.imread(str(parpath / f'shopcart/cart_tempt/outcart{suffix}.png'),
                             cv2.IMREAD_GRAYSCALE)

        xmax, ymax = self.imgshape[0] - 1, self.imgshape[1] - 1

        def _pixel_index(col_x, col_y):
            """Clip one key point's coordinates to the image and cast to int."""
            xs = self.cornpoints[:, col_x].clip(0, xmax).astype(np.int64)
            ys = self.cornpoints[:, col_y].clip(0, ymax).astype(np.int64)
            return ys, xs

        centre = _pixel_index(0, 1)
        left_bottom = _pixel_index(6, 7)
        right_bottom = _pixel_index(8, 9)

        Cent_inCartnum = np.count_nonzero(incart[centre])
        LB_inCartnum = np.count_nonzero(incart[left_bottom])
        RB_inCartnum = np.count_nonzero(incart[right_bottom])

        LB_outCartnum = np.count_nonzero(outcart[left_bottom])
        RB_outCartnum = np.count_nonzero(outcart[right_bottom])

        # Wholly inside: neither bottom corner ever touches the out-cart mask.
        self.isWholeInCart = LB_outCartnum + RB_outCartnum == 0

        # Wholly outside: the centre and at least one bottom corner never
        # touch the in-cart mask.
        self.isWholeOutCart = (Cent_inCartnum + LB_inCartnum == 0
                               or Cent_inCartnum + RB_inCartnum == 0)

        self.Cent_isIncart = bool(Cent_inCartnum)
        self.LB_isIncart = bool(LB_inCartnum)
        self.RB_isIncart = bool(RB_inCartnum)

        self.posState = self.Cent_isIncart + self.LB_isIncart + self.RB_isIncart

    def is_freemove(self):
        """Return True when the first and last frame look alike and are close.

        Both conditions must hold:
          a) cosine similarity of the first and last feature rows > 0.8;
          b) each of the four box corners moved less than 50 between the
             first and the last frame.

        Requires self.features to be set. Always returns a Python bool.
        """
        box1 = self.boxes[0, :4]
        box2 = self.boxes[-1, :4]

        # Similarity between the first-frame and last-frame features.
        feat1 = self.features[0, :][None, :]
        feat2 = self.features[-1, :][None, :]
        similar = 1 - np.maximum(0.0, cdist(feat1, feat2, metric='cosine'))[0, 0]
        condta = similar > 0.8

        # Displacement of each of the four corners.
        dx1, dy1, dx2, dy2 = box2 - box1
        corner_shifts = (
            np.linalg.norm((dx1, dy1)),
            np.linalg.norm((dx2, dy1)),
            np.linalg.norm((dx1, dy2)),
            np.linalg.norm((dx2, dy2)),
        )
        condtb = all(shift < 50 for shift in corner_shifts)

        return bool(condta and condtb)

    def extract_hand_features(self):
        """Set self.isHandStatic for a hand track.

        The hand is static when every box centre lies within
        HAND_STATIC_THRESH of the midpoint of the centres' bounding range.
        """
        assert self.cls == 0, "The class of traj must be HAND!"

        self.isHandStatic = False

        x0 = (self.boxes[:, 0] + self.boxes[:, 2]) / 2
        y0 = (self.boxes[:, 1] + self.boxes[:, 3]) / 2
        handXY = np.stack((x0, y0), axis=-1)

        handCenter = np.array([(max(x0) + min(x0)) / 2, (max(y0) + min(y0)) / 2])

        # axis=1 gives one distance per frame; without it the norm is taken
        # over the whole matrix and grows with the number of frames.
        handMaxDist = np.max(np.linalg.norm(handXY - handCenter, axis=1))

        if handMaxDist < self.HAND_STATIC_THRESH:
            self.isHandStatic = True


class doTracks:
    """Group the boxes of a sequence by track and hold the classified tracks."""

    def __init__(self, bboxes, trackefeats):
        """
        bboxes: rows of
            [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
        trackefeats: dict whose keys have the form "fid_bid".
        """
        self.bboxes = bboxes
        self.frameID = np.unique(bboxes[:, 7].astype(int))
        self.trackID = np.unique(bboxes[:, 4].astype(int))

        self.lboxes = self.array2list()
        self.lfeats = self.getfeats(trackefeats)

        # Containers into which tracks are sorted.
        self.Hands = []
        self.Kids = []
        self.Static = []
        self.Residual = []
        self.Confirmed = []
        self.DownWard = []      # subset of self.Residual
        self.UpWard = []        # subset of self.Residual
        self.FreeMove = []      # subset of self.Residual

    def array2list(self):
        """Split self.bboxes into one array per track id.

        Return:
            lboxes: list ordered like self.trackID; each element holds the
                    rows sharing one track_id, in
                    [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                    layout. A track must not contain two boxes of one frame.
        """
        track_ids = self.bboxes[:, 4].astype(int)
        lboxes = []
        for t_id in self.trackID:
            box = self.bboxes[track_ids == t_id, :]
            assert len(set(box[:, 7])) == len(box), "Please check!!!"
            lboxes.append(box)
        return lboxes

    def getfeats(self, trackefeats):
        """Collect the features of every track from the "fid_bid" keyed dict.

        Boxes whose key is missing from trackefeats are skipped, so a feature
        array can have fewer rows than the matching entry of self.lboxes.
        """
        lfeats = []
        for boxes in self.lboxes:
            feats = []
            for row in boxes:
                key = f"{int(row[7])}_{int(row[8])}"
                if key in trackefeats:
                    feats.append(trackefeats[key])
            lfeats.append(np.asarray(feats, dtype=np.float32))
        return lfeats

    def similarity(self):
        """Return {(tid_a, tid_b): similarity} for all track pairs (a <= b).

        NOTE(review): self.tracks is not defined in this class; presumably a
        subclass sets it - confirm. With fewer than two tracks the result is
        an empty dict.
        """
        nt = len(self.tracks)
        similar_dict = {}
        if nt >= 2:
            for i in range(nt):
                for j in range(i, nt):
                    tracka = self.tracks[i]
                    trackb = self.tracks[j]
                    similar = self.feat_similarity(tracka, trackb)
                    similar_dict.update({(tracka.tid, trackb.tid): similar})
        return similar_dict

    def feat_similarity(self, tracka, trackb, metric='cosine'):
        """Return 1 - distance between the mean features of two tracks.

        The result is the 1x1 array produced by cdist; the value is also
        printed.

        NOTE(review): self.features_dict is not defined in this class and is
        indexed as features_dict[fid][bid]; presumably a subclass provides
        it - confirm.
        """
        boxes_a, boxes_b = tracka.boxes, trackb.boxes

        def _collect(boxes):
            """Gather the feature vector of every box of one track."""
            feats = []
            for i in range(boxes.shape[0]):
                fid, bid = boxes[i, 7:9]
                feats.append(self.features_dict[fid][bid])
            return np.asarray(feats, dtype=np.float32)

        feata = _collect(boxes_a)
        featb = _collect(boxes_b)

        feata_m = np.mean(feata, axis=0)[None, :]
        featb_m = np.mean(featb, axis=0)[None, :]
        simi_ab = 1 - cdist(feata_m, featb_m, metric)

        print(f'tid {int(boxes_a[0, 4])} vs {int(boxes_b[0, 4])}: {simi_ab[0][0]}')

        return simi_ab

    def merge_tracks_loop(self, alist):
        """Apply self.merge_tracks repeatedly until the list length is stable.

        self.merge_tracks is expected to come from a subclass.
        """
        na, nb = len(alist), 0
        while na != nb:
            na = len(alist)
            alist = self.merge_tracks(alist)
            nb = len(alist)
        return alist

    def base_merge_tracks(self, Residual):
        """Group tracks with different ids that may be the same product.

        The first remaining track is taken as the seed; every other remaining
        track for which track_equal_track(seed, other) holds joins its group.
        This repeats on the leftovers until none remain.

        Returns a list of groups (each a list of tracks).
        """
        mergedTracks = []
        remaining = list(Residual)
        while remaining:
            atrack = remaining[0]
            cur_list = [atrack]
            leftovers = []
            for btrack in remaining[1:]:
                if track_equal_track(atrack, btrack):
                    cur_list.append(btrack)
                else:
                    leftovers.append(btrack)
            mergedTracks.append(cur_list)
            remaining = leftovers
        return mergedTracks

    @staticmethod
    def join_tracks(tlista, tlistb):
        """Combine two lists of tracks into a single one.

        All of tlista is kept; tracks of tlistb are appended only when their
        tid has not been seen yet.
        """
        seen = set()
        res = []
        for t in tlista:
            seen.add(t.tid)
            res.append(t)
        for t in tlistb:
            if t.tid not in seen:
                seen.add(t.tid)
                res.append(t)
        return res

    @staticmethod
    def sub_tracks(tlista, tlistb):
        """Return the tracks of tlista whose tid does not occur in tlistb."""
        track_ids_b = {t.tid for t in tlistb}
        return [t for t in tlista if t.tid not in track_ids_b]

    def array2frame(self, bboxes):
        """Split bboxes into one array per frame index, in ascending order."""
        frameID = np.sort(np.unique(bboxes[:, 7].astype(int)))
        return [bboxes[bboxes[:, 7] == fid, :] for fid in frameID]

    def isintrude(self):
        """Detect regions covered by boxes that never reach 3 overlaps.

        boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                0   1   2   3      4       5     6        7           8

        Every box of every frame adds 1 to an accumulator image. Connected
        regions of the non-zero area that contain no connected region with a
        count >= OverlapNum are drawn into ./result/intrude/invasion.png and
        reported as an intrusion. Returns True when at least one exists.

        NOTE(review): self.incart is not defined in this class; presumably a
        subclass provides a mask with the image shape - confirm.
        """
        OverlapNum = 3
        bboxes = self.bboxes.astype(np.int64)
        fboxes = self.array2frame(bboxes)

        sum_incart = np.zeros(self.incart.shape, dtype=np.int64)
        for boxes in fboxes:
            # Negative coordinates would wrap around when used as slice bounds.
            for x1, y1, x2, y2 in np.clip(boxes[:, 0:4], 0, None):
                sum_incart[y1:y2, x1:x2] += 1

        sumincart = np.zeros(sum_incart.shape, dtype=np.uint8)
        sumincart[sum_incart >= OverlapNum] = 255

        base = np.zeros(sum_incart.shape, dtype=np.uint8)
        base[sum_incart != 0] = 255

        contours_sum, _ = cv2.findContours(sumincart, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        contours_base, _ = cv2.findContours(base, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        def _filled(contour):
            """Draw one contour as a filled 255 mask."""
            mask = np.zeros(sum_incart.shape, dtype=np.uint8)
            cv2.drawContours(mask, [contour], -1, 255, cv2.FILLED)
            return mask

        def _contains(base_mask, contour):
            """True when the filled contour lies entirely inside base_mask."""
            inner = _filled(contour)
            overlap = cv2.bitwise_and(base_mask, inner)
            return np.count_nonzero(overlap) == np.count_nonzero(inner)

        # Base regions that contain no high-overlap region are intrusions.
        inIdx = []
        for k, ct_base in enumerate(contours_base):
            base_mask = _filled(ct_base)
            if not any(_contains(base_mask, ct_sum) for ct_sum in contours_sum):
                inIdx.append(k)

        invasion = np.zeros(sum_incart.shape, dtype=np.uint8)
        for i in inIdx:
            cv2.drawContours(invasion, [contours_base[i]], -1, 255, cv2.FILLED)
        cv2.imwrite("./result/intrude/invasion.png", invasion)

        Intrude = len(inIdx) >= 1
        print(f"is intruded: {Intrude}")

        return Intrude