# -*- coding: utf-8 -*-
"""
Created on Mon Mar  4 18:16:01 2024

@author: ym
"""
import numpy as np
import cv2
from pathlib import Path
from scipy.spatial.distance import cdist
from ytracking.tracking.utils.mergetrack import track_equal_track

# curpath = Path(__file__).resolve().parents[0]
from tools.config import cfg


class MoveState:
    """Flags describing the motion state of a product."""

    Static = 0
    DownWard = 1
    UpWard = 2
    FreeMove = 3
    HandHborder = 4
    Unknown = -1


class ShoppingCart:
    """Estimate how much of the in-cart region is covered by detection boxes.

    ``bboxes`` rows are indexed as
    ``[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]``
    (columns 0-4 and 7 are the ones read here).
    """

    def __init__(self, bboxes):
        self.bboxes = bboxes
        self.loadrate = self.load_rate()

    def load_rate(self):
        """Return the covered fraction of the in-cart mask for the first frame.

        Only the boxes whose frame index (column 7) equals the smallest frame
        index are considered.  Their rectangles are painted onto a blank image,
        intersected with the in-cart mask, and the ratio of non-zero pixels
        (intersection / mask) is returned.

        Returns:
            float: load rate; ``0.0`` when there are no boxes at all.
        """
        bboxes = self.bboxes
        if len(bboxes) == 0:
            # Nothing detected: no rectangle can cover the cart.
            return 0.0

        # The property reloads the mask from disk on every access, so read it
        # once and reuse it.
        incart = self.incart

        first_fid = np.min(bboxes[:, 7])
        boxes = bboxes[bboxes[:, 7] == first_fid]

        filled = np.zeros(incart.shape, np.uint8)
        for x1, y1, x2, y2 in boxes[:, 0:4]:
            cv2.rectangle(filled, (int(x1), int(y1)), (int(x2), int(y2)),
                          255, cv2.FILLED)

        # AND with the in-cart mask discards whatever lies outside it.
        loadstate = cv2.bitwise_and(incart, filled)

        num_loadstate = cv2.countNonZero(loadstate)
        num_incart = cv2.countNonZero(incart)

        # The small constant avoids a division by zero for an empty mask.
        return num_loadstate / (num_incart + 0.01)

    @staticmethod
    def _binary_mask(path):
        """Read ``path`` as grayscale and binarize it (values > 250 become 255).

        NOTE(review): ``cv2.imread`` returns ``None`` for an unreadable file,
        in which case ``cv2.threshold`` raises; the path is not validated here.
        """
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        _, binary = cv2.threshold(img, 250, 255, cv2.THRESH_BINARY)
        return binary

    @property
    def incart(self):
        """Binary mask loaded from ``cfg.incart`` (re-read on every access)."""
        return self._binary_mask(cfg.incart)

    @property
    def outcart(self):
        """Binary mask loaded from ``cfg.outcart`` (re-read on every access)."""
        return self._binary_mask(cfg.outcart)

    @property
    def cartedge(self):
        """Binary mask loaded from ``cfg.cartedge`` (re-read on every access)."""
        return self._binary_mask(cfg.cartedge)


class Track:
    """Base class holding the boxes of one track and its trajectory features.

    Described by the original author as an abstract base class that is not
    meant to be instantiated directly.  ``trajfeature`` reads
    ``self.isCornpoint``, which is not assigned anywhere in this class, so a
    subclass (or the caller) has to provide it before calling that method.
    """

    def __init__(self, boxes, imgshape=(1024, 1280)):
        """Store the boxes and precompute corner-point trajectories.

        Args:
            boxes: 2-D array, one row per frame, columns
                ``[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]``
                (indices 0..8).  Must contain at least one row.
            imgshape: image size used to locate the horizontal centre.
        """
        self.boxes = boxes
        self.tid = int(boxes[0, 4])
        self.cls = int(boxes[0, 6])
        self.frnum = boxes.shape[0]
        self.imgBorder = False
        self.imgshape = imgshape
        self.state = MoveState.Unknown

        # Frame ids at which the track starts and ends.
        self.start_fid = int(np.min(boxes[:, 7]))
        self.end_fid = int(np.max(boxes[:, 7]))

        # These could instead be done in subclasses when needed, to reduce the
        # amount of computation during sequential processing.
        self.compute_cornpoints()
        self.compute_cornpts_feats()

        # Quantities derived from box centre / size (marked deprecated by the
        # original author).  Displacement between the last and the first frame:
        #   vshift: positive means downward, negative means upward
        #   hshift: positive means the item moved toward the centre, negative
        #           means it moved toward the sides of the cart frame
        x0 = (boxes[:, 0] + boxes[:, 2]) / 2
        y0 = (boxes[:, 1] + boxes[:, 3]) / 2
        mw = np.mean(boxes[:, 2] - boxes[:, 0])
        mh = np.mean(boxes[:, 3] - boxes[:, 1])

        self.mwh = np.mean((mw, mh))
        self.Area = mw * mh

        # Fixed: the original subtracted the whole first box row (boxes[0])
        # from a scalar, producing an array instead of the vertical shift of
        # the box centre.
        self.vshift = y0[-1] - y0[0]

        # NOTE(review): imgshape[0] is used as the image width here; confirm
        # that imgshape is ordered (width, height).
        half_width = self.imgshape[0] / 2
        self.hshift = abs(x0[0] - half_width) - abs(x0[-1] - half_width)

    def compute_cornpoints(self):
        """Build ``self.cornpoints``: five (x, y) points per frame.

        The 10 columns are, in order: centre, top-left, top-right,
        bottom-left and bottom-right of each box.
        """
        boxes = self.boxes
        cornpoints = np.zeros((self.frnum, 10))

        cornpoints[:, 0] = (boxes[:, 0] + boxes[:, 2]) / 2
        cornpoints[:, 1] = (boxes[:, 1] + boxes[:, 3]) / 2
        cornpoints[:, 2], cornpoints[:, 3] = boxes[:, 0], boxes[:, 1]
        cornpoints[:, 4], cornpoints[:, 5] = boxes[:, 2], boxes[:, 1]
        cornpoints[:, 6], cornpoints[:, 7] = boxes[:, 0], boxes[:, 3]
        cornpoints[:, 8], cornpoints[:, 9] = boxes[:, 2], boxes[:, 3]

        self.cornpoints = cornpoints

    def compute_cornpts_feats(self):
        """Compute per-point trajectory features from ``self.cornpoints``.

        For each of the five points (centre and four corners) this stores:

        * ``trajectory``: frame-to-frame displacement lengths,
        * ``trajlens``: sum of those lengths (path length),
        * ``trajdist``: largest pairwise distance between any two positions,
        * ``trajrects``: ``cv2.minAreaRect`` of the positions cast to integers.
        """
        trajectory = []
        trajlens = []
        trajdist = []
        trajrects = []

        for k in range(5):
            pts = self.cornpoints[:, 2 * k:2 * (k + 1)]

            steps = np.linalg.norm(np.diff(pts, axis=0), axis=1)
            trajectory.append(steps)
            trajlens.append(np.sum(steps))

            trajdist.append(np.max(cdist(pts, pts)))
            trajrects.append(cv2.minAreaRect(pts.astype(np.int64)))

        self.trajectory = trajectory
        self.trajlens = trajlens
        self.trajdist = trajdist
        self.trajrects = trajrects

    def trajfeature(self):
        """Derive summary trajectory features and store them on the instance.

        The "max" values come from the point with the longest path.  The
        "min" values come from the point with the shortest path when
        ``self.isCornpoint`` is falsy, and from the centre point (index 0)
        otherwise.

        Sets:
            trajmin, trajmax: displacement sequences of the chosen points.
            feature: ``[trajlen_min, trajlen_max, trajdist_min, trajdist_max,
                trajlen_rate, trajdist_rate]``.
        """
        idx_max = self.trajlens.index(max(self.trajlens))
        trajmax = self.trajectory[idx_max]
        trajlen_max = self.trajlens[idx_max]
        trajdist_max = self.trajdist[idx_max]

        if not self.isCornpoint:
            idx_min = self.trajlens.index(min(self.trajlens))
        else:
            idx_min = 0
        trajmin = self.trajectory[idx_min]
        trajlen_min = self.trajlens[idx_min]
        trajdist_min = self.trajdist[idx_min]

        # Shortest / longest path length: the smaller, the smaller the motion.
        trajlen_rate = trajlen_min / (trajlen_max + 0.0001)

        # Extent of the "min" trajectory relative to the mean box size.
        trajdist_rate = trajdist_min / (self.mwh + 0.0001)

        self.trajmin = trajmin
        self.trajmax = trajmax
        self.feature = [trajlen_min, trajlen_max,
                        trajdist_min, trajdist_max,
                        trajlen_rate, trajdist_rate]


class doTracks:
    """Container that groups detection boxes by track and classifies tracks.

    NOTE(review): ``similarity`` reads ``self.tracks`` and
    ``merge_tracks_loop`` calls ``self.merge_tracks``; neither is defined in
    the code visible here, so they are presumably supplied by a subclass.
    """

    def __init__(self, bboxes, features_dict):
        """Store inputs and create the (initially empty) category lists.

        Args:
            bboxes: 2-D array with columns
                ``[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]``.
            features_dict: nested mapping indexed as
                ``features_dict[frame_index][box_index]`` -> feature vector.
        """
        self.bboxes = bboxes
        self.features_dict = features_dict
        self.frameid = set(bboxes[:, 7])
        self.trackid = set(bboxes[:, 4])
        self.lboxes = self.array2list()

        # Buckets into which the elements of self.tracks get classified.
        self.Static = []
        self.DownWard = []
        self.UpWard = []
        self.FreeMove = []
        self.Hands = []
        self.Kids = []
        self.HandHborder = []
        self.Disruptors = []
        self.Residual = []
        self.Merged = []

    def array2list(self):
        """Split ``self.bboxes`` into one array per track id.

        Returns:
            list: arrays in x1y1x2y2 format, each holding the rows
            ``[x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]``
            that share a single ``track_id``.
        """
        track_ids = set(self.bboxes[:, 4])
        lboxes = []
        for t_id in track_ids:
            rows = np.where(self.bboxes[:, 4] == t_id)[0]
            lboxes.append(self.bboxes[rows, :])
        return lboxes

    def similarity(self):
        """Return ``{(tid_a, tid_b): similarity}`` for pairs of ``self.tracks``.

        Pairs are taken with ``j >= i``, so each track is also compared with
        itself.  An empty dict is returned when there are fewer than two
        tracks.
        """
        nt = len(self.tracks)
        similar_dict = {}
        if nt >= 2:
            for i in range(nt):
                tracka = self.tracks[i]
                for j in range(i, nt):
                    trackb = self.tracks[j]
                    similar = self.feat_similarity(tracka, trackb)
                    similar_dict[(tracka.tid, trackb.tid)] = similar
        return similar_dict

    def _track_features(self, track):
        """Collect the feature vectors of every box of ``track`` as float32."""
        feats = [self.features_dict[fid][bid] for fid, bid in track.boxes[:, 7:9]]
        return np.asarray(feats, dtype=np.float32)

    def feat_similarity(self, tracka, trackb, metric='cosine'):
        """Similarity between the mean feature vectors of two tracks.

        Args:
            tracka, trackb: objects exposing ``boxes`` (columns 7 and 8 are
                the frame index and box index used to look up features).
            metric: distance metric name passed to ``cdist``.

        Returns:
            numpy.ndarray: 1x1 array holding ``1 - distance`` between the two
            mean feature vectors.  The value is also printed.
        """
        boxes_a, boxes_b = tracka.boxes, trackb.boxes

        feata = self._track_features(tracka)
        featb = self._track_features(trackb)

        # Compare the per-track mean features (kept 2-D for cdist).
        feata_m = np.mean(feata, axis=0)[None, :]
        featb_m = np.mean(featb, axis=0)[None, :]
        simi_ab = 1 - cdist(feata_m, featb_m, metric)

        print(f'tid {int(boxes_a[0, 4])} vs {int(boxes_b[0, 4])}: {simi_ab[0][0]}')

        return simi_ab

    def merge_tracks_loop(self, alist):
        """Apply ``self.merge_tracks`` repeatedly until the length stops changing.

        An empty ``alist`` is returned as is, without calling ``merge_tracks``.
        """
        na, nb = len(alist), 0
        while na != nb:
            na = len(alist)
            alist = self.merge_tracks(alist)
            nb = len(alist)
        return alist

    def base_merge_tracks(self, Residual):
        """Group tracks that have different ids but may be the same product.

        The first remaining track is taken as a seed; every other remaining
        track for which ``track_equal_track(seed, other, features_dict)`` is
        truthy joins the seed's group, the rest are retried with the next
        seed.

        Returns:
            list[list]: groups of tracks, each starting with its seed.
        """
        mergedTracks = []
        remaining = list(Residual)
        while remaining:
            atrack, candidates = remaining[0], remaining[1:]

            cur_list = [atrack]
            remaining = []
            for btrack in candidates:
                if track_equal_track(atrack, btrack, self.features_dict):
                    cur_list.append(btrack)
                else:
                    remaining.append(btrack)

            mergedTracks.append(cur_list)
        return mergedTracks

    @staticmethod
    def join_tracks(tlista, tlistb):
        """Combine two lists of tracks into a single one.

        All of ``tlista`` is kept; a track from ``tlistb`` is appended only if
        its ``tid`` has not been seen yet.
        """
        seen = set()
        res = []
        for t in tlista:
            seen.add(t.tid)
            res.append(t)
        for t in tlistb:
            if t.tid not in seen:
                seen.add(t.tid)
                res.append(t)
        return res

    @staticmethod
    def sub_tracks(tlista, tlistb):
        """Return the tracks of ``tlista`` whose ``tid`` is not in ``tlistb``."""
        track_ids_b = {t.tid for t in tlistb}
        return [t for t in tlista if t.tid not in track_ids_b]