Files
ieemoo-ai-detecttracking/realtime/event_time_specify.py
2025-04-18 14:41:53 +08:00

315 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Thu Oct 10 11:01:39 2024
@author: ym
"""
import os
import numpy as np
# from matplotlib.pylab import mpl
# mpl.use('Qt5Agg')
import matplotlib.pyplot as plt
import sys
sys.path.append(r"D:\DetectTracking")
from move_detect import MoveDetect
# from tracking.utils.read_data import extract_data, read_deletedBarcode_file, read_tracking_output, read_weight_timeConsuming
from tracking.utils.read_data import read_weight_timeConsuming
def str_to_float_arr(s):
    """Parse a comma-separated string of numbers into a list of floats.

    Leading/trailing whitespace is ignored and a single trailing comma is
    tolerated, so ``"1,2,3,"`` and ``" 1,2,3, \\n"`` both yield
    ``[1.0, 2.0, 3.0]``.

    Args:
        s: Text such as ``"0.5,12,3e2,"``.

    Returns:
        A list of floats in the order they appear. An empty (or
        whitespace-only, or lone-comma) string yields an empty list instead
        of raising.

    Raises:
        ValueError: If any remaining field is not a valid float literal,
            e.g. an empty field in the middle as in ``"1,,2"``.
    """
    # Trim first so that a trailing comma followed by whitespace/newline is
    # still recognised as a trailing comma.
    s = s.strip()
    if s.endswith(','):
        s = s[:-1]
    # Nothing left to parse: float('') would raise, so return early.
    if not s.strip():
        return []
    return [float(field) for field in s.split(",")]
def find_samebox_in_array(arr, target):
    """Locate the first entry of *arr* that matches *target* on its first four items.

    Args:
        arr: Sequence of sliceable entries (e.g. lists of box values).
        target: Sliceable entry to look for; only ``target[:4]`` is compared.

    Returns:
        The index of the first matching entry, or ``-1`` when none matches.
    """
    matches = (
        position
        for position, candidate in enumerate(arr)
        if candidate[:4] == target[:4]
    )
    return next(matches, -1)
def array2frame(bboxes):
    """Group box rows by frame and pair each frame id with a timestamp.

    Args:
        bboxes: 2-D array whose column 7 holds the frame id and whose
            column 9 holds the timestamp of each row.

    Returns:
        A tuple ``(fboxes, frameTstamp)``:
          * ``fboxes``: list with one array per distinct frame id (ascending),
            containing the rows of ``bboxes`` that belong to that frame.
          * ``frameTstamp``: two-column array ``[frame id, timestamp]`` with
            one row per frame; the timestamp is taken from the first row of
            the frame and truncated to int.
    """
    frame_column = bboxes[:, 7]
    # np.unique already returns its values in ascending order.
    frameID = np.unique(frame_column.astype(int))

    fboxes = [bboxes[frame_column == fid] for fid in frameID]
    ttamps = np.array([int(rows[0, 9]) for rows in fboxes])

    frameTstamp = np.concatenate((frameID[:, None], ttamps[:, None]), axis=1)
    return fboxes, frameTstamp
def extract_data_1(datapath):
    '''
    Parse a tracking data file into tracker boxes and their features.

    The file is read line by line. The output of every frame (including the
    last one) must be followed by a blank line acting as a separator; that
    separator is the marker line that triggers flushing the frame's data.

    Args:
        datapath: Path of the text file to read (opened as UTF-8).

    Returns:
        (trackerboxes, trackerfeats):
          * trackerboxes: float array with 10 columns; each row is an
            ``output_box:`` record with the frame timestamp appended as the
            last column.
          * trackerfeats: float array with 256 columns; each row is the
            L2-normalised ``feat:`` vector of the detection box matched to
            the corresponding ``output_box:`` record.
    '''
    trackerboxes = np.empty((0, 10), dtype=np.float64)
    trackerfeats = np.empty((0, 256), dtype=np.float64)
    # Per-frame accumulators: raw detection boxes / features, and the
    # tracker boxes / features matched within the current frame.
    boxes, feats, tboxes, tfeats = [], [], [], []
    timestamp = -1
    newframe = False
    with open(datapath, 'r', encoding='utf-8') as lines:
        for line in lines:
            # Frame header: every comma-separated field after the first must
            # be "name:int"; exactly two such fields are expected
            # (timestamp and frame id). frameId is parsed but not used below.
            if line.find("CameraId")>=0:
                newframe = True
                timestamp, frameId = [int(ln.split(":")[1]) for ln in line.split(",")[1:]]
                # boxes, feats, tboxes, tfeats = [], [], [], []
            # Detection box line ("box:" but not "output_box:").
            if line.find("box:") >= 0 and line.find("output_box:") < 0:
                line = line.strip()
                box = line[line.find("box:") + 4:].strip()
                # if len(box)==6:
                boxes.append(str_to_float_arr(box))
            # Feature line belonging to a detection box.
            if line.find("feat:") >= 0:
                line = line.strip()
                feat = line[line.find("feat:") + 5:].strip()
                # if len(feat)==256:
                feats.append(str_to_float_arr(feat))
            # Tracker output line.
            if line.find("output_box:") >= 0:
                line = line.strip()
                # Make sure boxes and feats correspond one-to-one, which in
                # turn guarantees tboxes and tfeats correspond one-to-one.
                if len(boxes)==0 or len(boxes)!=len(feats):
                    continue
                box = str_to_float_arr(line[line.find("output_box:") + 11:].strip())
                box.append(timestamp)
                # Match the tracker box to a detection box on the first four values.
                index = find_samebox_in_array(boxes, box)
                if index >= 0:
                    tboxes.append(box) # text after 'output_box:' with surrounding whitespace removed
                    # feat_f = str_to_float_arr(input_feats[index])
                    feat_f = feats[index]
                    # L2-normalise the matched feature vector.
                    # NOTE(review): an all-zero feature gives norm 0 and a
                    # division by zero here — confirm this cannot occur.
                    norm_f = np.linalg.norm(feat_f)
                    feat_f = feat_f / norm_f
                    tfeats.append(feat_f)
            # Marker-line (blank-line) check: a line containing none of
            # "timestamp", "box:" or "feat:" ends the current frame.
            # NOTE(review): the match on "timestamp" is case-sensitive; the
            # header line is presumably expected to contain lowercase
            # "timestamp" so that it is not itself treated as a separator —
            # confirm against the data format.
            '''标志行(空行)判断'''
            condt = line.find("timestamp")<0 and line.find("box:")<0 and line.find("feat:")<0
            if condt and newframe:
                # Flush the frame's matched boxes/features into the outputs.
                if len(tboxes) and len(tfeats):
                    trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)))
                    trackerfeats = np.concatenate((trackerfeats, np.array(tfeats)))
                # Reset the per-frame state for the next frame.
                timestamp = -1
                boxes, feats, tboxes, tfeats = [], [], [], []
                newframe = False
    # NOTE(review): data of a final frame that is not followed by a marker
    # line is never flushed and is therefore not returned.
    return trackerboxes, trackerfeats
def devide_motion_state(tboxes, width):
    """Label every frame with image-based motion and hand states.

    A window of ``width`` consecutive frames is slid over the sequence; the
    boxes of each window are handed to ``MoveDetect`` and the tracks it
    reports are projected back onto the per-frame tables.

    Args:
        tboxes: Box array accepted by ``array2frame``.
        width: Number of frames per sliding window.

    Returns:
        (frameState, handState), both int64 arrays with one row per frame
        whose first two columns are ``[frame id, timestamp]``:
          * frameState: column 2 is set to 1 for frames covered by a track in
            ``md.track_motion``; column 3 is set to 1 on the last frame of
            any window that reported such a track. One additional column per
            track id follows, holding that id on the frames the track covers.
            (Original note: frameState marks the cart state judged from the
            images, 0: static, 1: moving.)
          * handState: column 2 holds the hand state —
            0: absent, 1: hand present, 2: hand present and static;
            column 3 is left at 0.
        When there are fewer than ``width`` frames both tables are returned
        with all state columns zero.
    """
    fboxes, frameTstamp = array2frame(tboxes)
    fnum = len(frameTstamp)

    zero_cols = np.zeros((fnum, 2), dtype=np.int64)
    frameState = np.concatenate((frameTstamp, zero_cols), axis=1).astype(np.int64)
    handState = np.concatenate((frameTstamp, zero_cols), axis=1).astype(np.int64)
    if fnum < width:
        return frameState, handState

    def _rows_within(table, first_fid, last_fid):
        # Row indices whose frame id lies in [first_fid, last_fid].
        fids = table[:, 0]
        return np.where((fids >= first_fid) & (fids <= last_fid))[0]

    # track id -> set of row indices covered by that track in any window
    mtrackFid = {}

    for window_end in range(width, fnum + 1):
        window_boxes = np.concatenate(fboxes[window_end - width:window_end], axis=0)
        md = MoveDetect(window_boxes)
        md.classify()

        # track.during is a 2-tuple giving the first and last frame id
        # (values of boxes[:, 7]) of the track inside this window.
        for track in md.track_motion:
            start_fid, stop_fid = track.during
            rows = _rows_within(frameState, start_fid, stop_fid)

            mtrackFid.setdefault(track.tid, set()).update(rows)

            frameState[window_end - 1, 3] = 1
            frameState[rows, 2] = 1

        for track in md.hand_tracks:
            start_fid, stop_fid = track.during
            rows = _rows_within(handState, start_fid, stop_fid)

            # Hand present.
            handState[rows, 2] = 1

            # Possible future improvement: replace is_static with a dedicated
            # hand-state classifier.
            # NOTE(review): the meaning/unit of the threshold 70 is defined
            # by is_static and is not visible here.
            if track.is_static(70) and len(rows) > 1:
                # Hand static.
                handState[rows, 2] = 2

    # Output of the state changes: one extra column per motion track.
    for tid, covered_rows in mtrackFid.items():
        track_col = np.zeros((fnum, 1), dtype=np.int64)
        track_col[list(covered_rows), 0] = tid
        frameState = np.concatenate((frameState, track_col), axis=1).astype(np.int64)
    return frameState, handState
def state_measure(periods, weights, hands, spath=None):
    """Plot the weight signal and two cameras' frame states on a common time axis.

    Original design note: two states, static and motion, with an interval
    (t0, t1) where t0 is the static -> motion transition and t1 the
    motion -> static transition. This function currently only draws the
    signals; it does not compute those transitions.

    Args:
        periods: Sequence whose first two items are ``(camera type, frame
            state array)`` pairs; column 1 of each array is the timestamp and
            columns 2 and 3 are the plotted state columns.
        weights: 2-D array with timestamps in column 0 and weight values in
            column 1.
        hands: Accepted for interface compatibility; not used here.
        spath: Optional path; when truthy the figure is saved there.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    camtype_0, frstate_0 = periods[0]
    camtype_1, frstate_1 = periods[1]

    # Earliest timestamp over all three signals; used as the time origin.
    tmin = min(
        np.min(weights[:, 0]),
        np.min(frstate_0[:, 1]),
        np.min(frstate_1[:, 1]),
    )

    fig, (weight_ax, cam0_ax, cam1_ax) = plt.subplots(3, 1)

    weight_ax.plot(weights[:, 0] - tmin, weights[:, 1], 'bo-', linewidth=1, markersize=4)
    weight_ax.set_title('Weight (g)')

    camera_panels = (
        (cam0_ax, camtype_0, frstate_0),
        (cam1_ax, camtype_1, frstate_1),
    )
    for axis, camtype, frstate in camera_panels:
        elapsed = frstate[:, 1] - tmin
        axis.plot(elapsed, frstate[:, 2], 'rx-', linewidth=1, markersize=8)
        axis.plot(elapsed, frstate[:, 3], 'bo-', linewidth=1, markersize=4)
        axis.set_title(f'Camera: {int(camtype)}')

    if spath:
        plt.savefig(spath)
    plt.show()
def read_yolo_weight_data(eventdir):
    """Load the tracker outputs and the weight signal of one event directory.

    Args:
        eventdir: Directory holding the event's ``.data`` files.

    Returns:
        ``None`` when the directory does not contain exactly five ``.data``
        files. Otherwise a tuple ``(tracker_boxes, weights)``:
          * ``tracker_boxes``: list of ``(camera type, boxes, feats)`` tuples,
            one per ``*_track.data`` file; the camera type is the file-name
            part before the first underscore.
          * ``weights``: array built from ``(float(timestamp), weight)`` pairs
            read from the file whose name starts with ``process.data``.
    """
    data_files = [
        os.path.join(eventdir, entry)
        for entry in os.listdir(eventdir)
        if os.path.splitext(entry)[1] == '.data'
    ]
    if len(data_files) != 5:
        return None

    tracker_boxes = []
    weight_by_time = {}
    for path in data_files:
        basename = os.path.basename(path)
        if basename.find('_track.data') > 0:
            camera_type = basename.split('_')[0]
            boxes, feats = extract_data_1(path)
            tracker_boxes.append((camera_type, boxes, feats))
        if basename.startswith('process.data'):
            # Only the weight dictionary is needed; the sensor and
            # process-time dictionaries are discarded.
            weight_by_time, _sensor, _process_time = read_weight_timeConsuming(path)

    # Weight-signal processing: turn {timestamp: weight} into an array.
    weights = np.array([(float(stamp), value) for stamp, value in weight_by_time.items()])
    return tracker_boxes, weights
def main(eventdir=None, win_width=12):
    """Run the image/weight analysis for one event directory and plot it.

    Args:
        eventdir: Event directory to analyse. Defaults to the test event on
            the shared drive that this script was originally written against.
        win_width: Number of frames per sliding window passed to
            ``devide_motion_state``.

    Returns:
        None. Prints a message and returns early when the event directory
        yields no data or fewer than two camera streams.
    """
    if eventdir is None:
        eventdir = r"\\192.168.1.28\share\测试_202406\0819\images\20240817-192549-6940120c-634c-481b-97a6-65042729f86b_null"

    # read_yolo_weight_data returns None when the directory does not hold the
    # expected set of files; unpacking that directly would raise TypeError.
    loaded = read_yolo_weight_data(eventdir)
    if loaded is None:
        print(f"no usable event data found in: {eventdir}")
        return
    tracker_boxes, weights = loaded

    # ==================== image motion analysis ====================
    periods, hands = [], []
    for ctype, tboxes, _ in tracker_boxes:
        period, handState = devide_motion_state(tboxes, win_width)
        periods.append((ctype, period))
        hands.append((ctype, handState))
    print('done!')

    # state_measure plots exactly two cameras (periods[0] and periods[1]).
    if len(periods) < 2:
        print(f"expected two camera streams, got {len(periods)}: {eventdir}")
        return

    # =============== fusion of weight and image information ===============
    state_measure(periods, weights, hands)


if __name__ == "__main__":
    main()