# -*- coding: utf-8 -*-
"""
Created on Thu Oct 10 11:01:39 2024

@author: ym
"""
import os
import numpy as np
# from matplotlib.pylab import mpl
# mpl.use('Qt5Agg')
import matplotlib.pyplot as plt
from move_detect import MoveDetect

import sys
sys.path.append(r"D:\DetectTracking")

# from tracking.utils.read_data import extract_data, read_deletedBarcode_file, read_tracking_output, read_weight_timeConsuming
from tracking.utils.read_data import read_weight_timeConsuming


def str_to_float_arr(s):
    """Parse a comma-separated string of numbers into a list of floats.

    A single trailing comma, if present, is dropped before splitting.
    Raises ValueError (from ``float``) if any remaining field is not a
    number, including an empty field.
    """
    if s.endswith(','):
        s = s[:-1]
    return [float(field) for field in s.split(",")]


def find_samebox_in_array(arr, target):
    """Return the index of the first item in *arr* whose first four values
    equal the first four values of *target*, or -1 if there is none."""
    key = target[:4]
    for i, candidate in enumerate(arr):
        if candidate[:4] == key:
            return i
    return -1


def array2frame(bboxes):
    """Group box rows by frame.

    Column 7 of *bboxes* is used as the frame id (truncated to int) and
    column 9 as the timestamp.

    Returns:
        fboxes: list of arrays, one per distinct frame id in ascending order,
            each holding the rows of that frame.
        frameTstamp: array with one row per frame: (frame id, timestamp of
            the first row of that frame, truncated to int).
    """
    # Group on the integer-cast column so that the ids we iterate over and
    # the rows we select are derived from exactly the same values.
    frame_col = bboxes[:, 7].astype(int)
    frameID = np.unique(frame_col)  # np.unique already returns sorted values

    fboxes, ttamps = [], []
    for fid in frameID:
        box = bboxes[frame_col == fid, :]
        fboxes.append(box)
        ttamps.append(int(box[0, 9]))

    frameTstamp = np.concatenate((frameID[:, None], np.array(ttamps)[:, None]), axis=1)

    return fboxes, frameTstamp


def extract_data_1(datapath):
    """Read tracker boxes and their normalised features from a track file.

    The file is expected to contain, per frame, a header line containing
    "CameraId", followed by "box:", "feat:" and "output_box:" lines.
    Every frame (including the last one) must be followed by a separator
    line (e.g. an empty line); a frame is only committed to the result when
    that separator is seen.

    Returns:
        trackerboxes: float array, 10 columns; each row is an "output_box:"
            record with the frame timestamp appended as the last column.
        trackerfeats: float array, 256 columns; the L2-normalised feature of
            the detection box matched to the corresponding tracker box.
    """
    trackerboxes = np.empty((0, 10), dtype=np.float64)
    trackerfeats = np.empty((0, 256), dtype=np.float64)

    boxes, feats, tboxes, tfeats = [], [], [], []
    timestamp = -1
    newframe = False

    with open(datapath, 'r', encoding='utf-8') as lines:
        for raw in lines:
            line = raw.strip()

            if "CameraId" in line:
                newframe = True
                # Header fields after the first one are "name:value" pairs;
                # exactly two are expected (timestamp, frame id).
                timestamp, _frame_id = [int(ln.split(":")[1]) for ln in line.split(",")[1:]]

            if "box:" in line and "output_box:" not in line:
                boxes.append(str_to_float_arr(line[line.find("box:") + 4:].strip()))

            if "feat:" in line:
                feats.append(str_to_float_arr(line[line.find("feat:") + 5:].strip()))

            if "output_box:" in line:
                # boxes and feats must correspond one-to-one, which in turn
                # keeps tboxes and tfeats aligned.
                if len(boxes) == 0 or len(boxes) != len(feats):
                    continue

                box = str_to_float_arr(line[line.find("output_box:") + 11:].strip())
                box.append(timestamp)
                index = find_samebox_in_array(boxes, box)
                if index >= 0:
                    tboxes.append(box)

                    feat_f = feats[index]
                    norm_f = np.linalg.norm(feat_f)
                    # NOTE(review): an all-zero feature gives a zero norm and
                    # therefore NaN values here - confirm this cannot occur.
                    feat_f = feat_f / norm_f
                    tfeats.append(feat_f)

            # Separator (blank) line detection: any line that carries none of
            # the known keywords ends the current frame.
            is_separator = ("timestamp" not in line
                            and "box:" not in line
                            and "feat:" not in line)
            if is_separator and newframe:
                if len(tboxes) and len(tfeats):
                    trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)))
                    trackerfeats = np.concatenate((trackerfeats, np.array(tfeats)))

                timestamp = -1
                boxes, feats, tboxes, tfeats = [], [], [], []
                newframe = False

    return trackerboxes, trackerfeats


def devide_motion_state(tboxes, width):
    """Mark, frame by frame, the motion state seen by one camera.

    A window of *width* consecutive frames is slid over the sequence; each
    window is classified with ``MoveDetect`` and every track whose ``cls``
    is non-zero is treated as moving.

    Returns:
        An empty list when there are fewer than *width* rows or frames.
        Otherwise an int64 array with one row per frame and columns:
            0: frame id
            1: timestamp
            2: 1 if the frame lies inside the [start, end] span
               (``track.during``) of any moving track, else 0
            3: 1 if the frame is the last frame of a window that contained
               a moving track, else 0
            4+: one column per moving track id, holding that id on the
                frames covered by the track and 0 elsewhere
    """
    periods = []
    if len(tboxes) < width:
        return periods

    fboxes, frameTstamp = array2frame(tboxes)
    fnum = len(frameTstamp)
    if fnum < width:
        return periods

    state = np.zeros((fnum, 2), dtype=np.int64)
    frameState = np.concatenate((frameTstamp, state), axis=1).astype(np.int64)

    # track id -> set of row indices (into frameState) covered by the track
    mtrackFid = {}

    # frameState records the state judged from images: 0 static, 1 moving.
    for idx in range(width, fnum + 1):
        lboxes = np.concatenate(fboxes[idx - width:idx], axis=0)

        md = MoveDetect(lboxes)
        md.classify()

        # track.during is a 2-tuple giving the first and last frame of the
        # track inside this window, expressed as values of boxes[:, 7].
        for track in md.track_motion:
            if track.cls == 0:
                continue
            f1, f2 = track.during

            in_span = (frameState[:, 0] >= f1) & (frameState[:, 0] <= f2)
            rows = np.where(in_span)[0]

            mtrackFid.setdefault(track.tid, set()).update(rows.tolist())

            frameState[idx - 1, 3] = 1
            frameState[rows, 2] = 1

    # Append one column per moving track.
    for tid, fid in mtrackFid.items():
        fstate = np.zeros((fnum, 1), dtype=np.int64)
        fstate[sorted(fid), 0] = tid
        frameState = np.concatenate((frameState, fstate), axis=1).astype(np.int64)

    return frameState


def state_measure(periods, weights, spath=None):
    """Plot the weight signal together with the motion state of two cameras.

    Two states are distinguished, static and motion:
        (t0, t1)
        t0: static ----> motion
        t1: motion ----> static

    Args:
        periods: sequence of ``(camera_type, frame_state)`` pairs as built
            from ``devide_motion_state``; only the first two entries are
            plotted. A missing entry or an empty ``frame_state`` leaves the
            corresponding axis without curves instead of raising.
        weights: array whose column 0 is time and column 1 is weight; an
            empty array leaves the weight axis without a curve.
        spath: optional path; when given the figure is saved there.

    The figure is always shown with ``plt.show()``.
    """
    weights = np.asarray(weights)
    has_weights = weights.ndim == 2 and weights.shape[0] > 0

    cameras = []
    for camtype, frstate in list(periods)[:2]:
        frstate = np.asarray(frstate)
        if frstate.ndim != 2 or frstate.shape[0] == 0:
            frstate = None  # nothing to plot for this camera
        cameras.append((camtype, frstate))

    # Common time origin: the earliest timestamp over all available signals.
    starts = []
    if has_weights:
        starts.append(np.min(weights[:, 0]))
    starts.extend(np.min(frstate[:, 1]) for _, frstate in cameras if frstate is not None)
    tmin = min(starts) if starts else 0

    # for ctype, tboxes, _ in tracker_boxes:
    #     t_min, t_max = np.min(tboxes[:, 9]), np.max(tboxes[:, 9])
    #     if t_min < tmin:
    #         tmin = t_min
    #     if t_max > tmax:
    #         tmax = t_max
    # during = tmax - tmin

    fig, axes = plt.subplots(3, 1)

    ax1 = axes[0]
    if has_weights:
        ax1.plot(weights[:, 0] - tmin, weights[:, 1], 'bo-', linewidth=1, markersize=4)
    # ax1.set_xlim([0, during])
    ax1.set_title('Weight (g)')

    for ax, (camtype, frstate) in zip(axes[1:], cameras):
        if frstate is not None:
            ax.plot(frstate[:, 1] - tmin, frstate[:, 2], 'rx-', linewidth=1, markersize=8)
            ax.plot(frstate[:, 1] - tmin, frstate[:, 3], 'bo-', linewidth=1, markersize=4)
        # ax.set_xlim([0, during])
        ax.set_title(f'Camera: {int(camtype)}')

    if spath:
        plt.savefig(spath)
    plt.show()


def read_yolo_weight_data(eventdir):
    """Load tracker output and weight data from an event directory.

    The directory must contain exactly five ``.data`` files; otherwise
    ``None`` is returned.

    Returns:
        tracker_boxes: list of ``(camera_type, trackerboxes, trackerfeats)``,
            one per file whose name contains ``_track.data`` (not at
            position 0); ``camera_type`` is the file name part before the
            first underscore.
        weights: array of ``(time, weight)`` rows taken from the weight
            dictionary read from the file whose name starts with
            ``process.data``; shape ``(0, 2)`` when no weights were read.
    """
    filepaths = []
    for filename in os.listdir(eventdir):
        _, ext = os.path.splitext(filename)
        if ext == '.data':
            filepaths.append(os.path.join(eventdir, filename))

    if len(filepaths) != 5:
        return

    tracker_boxes = []
    WeightDict, SensorDict, ProcessTimeDict = {}, {}, {}
    for filepath in filepaths:
        filename = os.path.basename(filepath)

        if filename.find('_track.data') > 0:
            CamerType = filename.split('_')[0]
            trackerboxes, trackerfeats = extract_data_1(filepath)
            tracker_boxes.append((CamerType, trackerboxes, trackerfeats))

        if filename.find('process.data') == 0:
            WeightDict, SensorDict, ProcessTimeDict = read_weight_timeConsuming(filepath)

    # ==================== weight signal ====================
    weights = np.array([(float(t), w) for t, w in WeightDict.items()])
    if weights.size == 0:
        # Keep a 2-D shape so that column indexing downstream still works.
        weights = np.empty((0, 2), dtype=np.float64)

    return tracker_boxes, weights


def main():
    """Run the image-motion / weight analysis on one hard-coded event."""
    eventdir = r"\\192.168.1.28\share\测试_202406\0819\images\20240817-192549-6940120c-634c-481b-97a6-65042729f86b_null"

    data = read_yolo_weight_data(eventdir)
    if data is None:
        # read_yolo_weight_data returns None unless exactly five .data files
        # are present; unpacking that would raise a TypeError.
        print(f'expected exactly 5 .data files in: {eventdir}')
        return
    tracker_boxes, weights = data

    # ==================== image motion analysis ====================
    win_width = 12
    periods = []
    for ctype, tboxes, _ in tracker_boxes:
        period = devide_motion_state(tboxes, win_width)
        periods.append((ctype, period))

    print('done!')

    # =============== fusion of weight and image information ===============
    state_measure(periods, weights)


if __name__ == "__main__":
    main()