Parse returned data; compatible with v5 and v10

This commit is contained in:
jiajie555
2025-04-18 14:41:53 +08:00
commit 010f5c445a
888 changed files with 93632 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 31 17:06:34 2023
@author: ym
"""
from .proBoxes import Boxes, boxes_add_fid
from .iterYaml import IterableSimpleNamespace, yaml_load
__all__ = "IterableSimpleNamespace", "yaml_load", "Boxes", "boxes_add_fid"

Binary file not shown.

View File

@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 20 14:21:13 2023
@author: ym
"""
import cv2
# import sys
# sys.path.append(r"D:\DeepLearning\yolov5")
# from ultralytics.utils.plotting import Annotator, colors
from .plotting import Annotator, colors
class TrackAnnotator(Annotator):
def plotting_track(self, track, names='abc'):
"""
track: [x, y, w, h, track_id, score, cls, frame_index]
boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
         0   1   2   3   4         5      6    7            8
track_id: tracking ID, counted from 1
frame_index: frame index, counted from 1
cls: class index, counted from 0; used as the key into `names`
"""
if track.size==0: return
id, cls = track[0, 4], track[0, 6]
if id >=0 and cls==0:
color = colors(int(cls), True)
elif id >=0 and cls!=0:
color = colors(int(id), True)  # id = 0 never occurs, so this cannot clash with the branch above
else:
color = colors(19, True)  # 19 is the last entry of the color palette
nb = track.shape[0]
for i in range(nb):
if i == 0:
# label = f'{int(track[i, 4])}:({int(track[i, 7])})'
label = f'ID_{int(track[i, 4])}'
elif i == nb-1:
label = ''
# label = f'{int(track[i, 4])}:({int(track[i, 7])})&{int(nb)}'
else:
label = ''
self.circle_label(track[i, :], label, color=color)
def circle_label(self, track, label='', color=(128, 128, 128), txt_color=(255, 255, 255)):
"""
Draw the trajectory of the selected track.
"""
if track.size==0: return
x, y = int((track[0]+track[2])/2), int((track[1]+track[3])/2)
cv2.circle(self.im, (x, y), 6, color, 2)
# txt_color = (0,0,0)
if label:
tf = max(self.lw - 1, 1) # font thickness
w, h = cv2.getTextSize(label, 0, fontScale=self.lw / 3, thickness=tf)[0] # text width, height
outside = x + w <= self.im.shape[1]-3
# p2 = x + w, y - h - 3 if outside else y + h + 3
# cv2.rectangle(self.im, p1, p2, color, -1, cv2.LINE_AA) # filled
cv2.putText(self.im,
label, (x-10 if outside else x-w+2, y-20),
0,
# self.lw / 3,
self.lw/2,
txt_color,
thickness=tf,
lineType=cv2.LINE_AA)
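# Illustrative usage sketch (not from the original commit; assumes this module lives at
# tracking/utils/annotator.py, so run it as `python -m tracking.utils.annotator` because of the
# relative import above). It draws one synthetic track on a blank canvas using the box layout
# documented in plotting_track().
if __name__ == "__main__":
    import numpy as np
    canvas = np.zeros((1280, 1024, 3), dtype=np.uint8)
    demo_track = np.array([[100, 900, 220, 1020, 3, 0.9, 1, 1, 0],
                           [140, 840, 260,  960, 3, 0.9, 1, 2, 0],
                           [180, 780, 300,  900, 3, 0.9, 1, 3, 0]], dtype=np.float32)
    annotator = TrackAnnotator(canvas, line_width=2)
    annotator.plotting_track(demo_track)
    cv2.imwrite("track_annotator_demo.png", annotator.result())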

View File

@@ -0,0 +1,387 @@
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 15 15:26:38 2024
@author: ym
"""
import numpy as np
import cv2
import os
from pathlib import Path
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from tracking.utils.annotator import TrackAnnotator
from tracking.utils.plotting import colors
def plot_frameID_y2(vts):
# boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
# 0, 1, 2, 3, 4, 5, 6, 7, 8
markers = ['o', 'v', '^', '<', '>', 's', 'p', 'P','*', '+', 'x', 'X', 'd', 'D', 'H']
colors = ['b', 'g', 'c', 'm', 'y', ]
bboxes = vts.bboxes
maxfid = max(vts.bboxes[:, 7])
CART_HIGH_THRESH1 = 430
TRACK_STATIC_THRESH = 8
fig = plt.figure(figsize=(16, 12))
gs = fig.add_gridspec(2, 1, left=0.1, right=0.9, bottom=0.1, top=0.9,
wspace=0.05, hspace=0.15)
# ax1, ax2 = axs
ax1 = fig.add_subplot(gs[0,0])
ax2 = fig.add_subplot(gs[1,0])
ax1.plot((0, maxfid+5), (1280-CART_HIGH_THRESH1, 1280-CART_HIGH_THRESH1), 'b--', linewidth=2 )
ax2.plot((0, maxfid+5), (1280-CART_HIGH_THRESH1, 1280-CART_HIGH_THRESH1), 'b--', linewidth=2 )
hands = [t for t in vts.Hands if not t.isHandStatic]
tracks = vts.join_tracks(vts.Residual, hands)
for i, track in enumerate(vts.tracks):
boxes = track.boxes
cls, tid = track.cls, track.tid
y2, fids = boxes[:, 3], boxes[:, 7]
if cls==0:
ax1.scatter(fids, 1280-y2, marker='4', s=50, color=colors[tid%len(colors)], label = f"ID_{tid}")
else:
ax1.scatter(fids, 1280-y2, marker=markers[tid%len(markers)], color=colors[tid%len(colors)],
s=50, label = f"ID_{tid}")
# hist, bins = np.histogram(1280-y2, bins='auto')
ax1.set_ylim([-50, 1350])
for i, track in enumerate(tracks):
boxes = track.boxes
cls, tid = track.cls, track.tid
y2, fids = boxes[:, 3], boxes[:, 7]
if cls==0:
ax2.scatter(fids, 1280-y2, marker='4', s=50, color=colors[tid%len(colors)], label = f"ID_{tid}")
else:
ax2.scatter(fids, 1280-y2, marker=markers[tid%len(markers)], color=colors[tid%len(colors)],
s=50, label = f"ID_{tid}")
# hist, bins = np.histogram(1280-y2, bins='auto')
ax2.set_ylim([-50, 1350])
ax1.grid(True), ax1.set_xlim(0, maxfid+5), ax1.set_title('y2')
ax1.legend()
ax2.grid(True), ax2.set_xlim(0, maxfid+5), ax2.set_title('y2')
ax2.legend()
# plt.show()
return plt
def draw_all_trajectories(vts, edgeline, save_dir, file, draw5p=False):
'''Display the four types of results'''
# file, ext = os.path.splitext(filename)
# edgeline = cv2.imread("./shopcart/cart_tempt/edgeline.png")
# edgeline2 = edgeline1.copy()
# edgeline = np.concatenate((edgeline1, edgeline2), exis = 1)
if not isinstance(save_dir, Path): save_dir = Path(save_dir)
''' center-point trajectories of all tracks '''
img1, img2 = edgeline.copy(), edgeline.copy()
img1 = drawTrack(vts.tracks, img1)
img2 = drawTrack(vts.Residual, img2)
imgcat = np.concatenate((img1, img2), axis = 1)
H, W = imgcat.shape[:2]
cv2.line(imgcat, (int(W/2), 0), (int(W/2), H), (128, 255, 128), 2)
# imgpth = save_dir.joinpath(f"{file}_show.png")
# cv2.imwrite(str(imgpth), img)
if not draw5p:
return imgcat
## yi write piclke change
''' 5-point trajectories of the tracks '''
# trackpth = save_dir / Path("trajectory")
# if not trackpth.exists():
# trackpth.mkdir(parents=True, exist_ok=True)
# for track in vts.tracks:
# # if track.cls != 0:
# img = edgeline.copy()
# img = draw5points(track, img)
#
# pth = trackpth.joinpath(f"{file}_{track.tid}_.png")
# cv2.imwrite(str(pth), img)
# for track in vts.Residual:
# # if track.cls != 0:
# img = edgeline.copy()
# img = draw5points(track, img)
# pth = trackpth.joinpath(f"{file}_{track.tid}_.png")
# cv2.imwrite(str(pth), img)
return imgcat
# =============================================================================
# '''3. moving tracks 中心轨迹'''
# filename2 = f"{file}_show_r.png"
# img = edgeline.copy()
# img = drawTrack(vts.Residual, img)
# pth = save_dir.joinpath(filename2)
# cv2.imwrite(pth, img)
# =============================================================================
'''5. Time series of trajmin, trajmax, arearate and incartrate for the tracks'''
# plt = drawtracefeat(vts)
# pth = save_dir.joinpath(f"{file}_x.png")
# plt.savefig(pth)
# plt.close('all')
def drawFeatures(allvts, save_dir):
# [trajlen_min, trajdist_max, trajlen_rate, trajist_rate]]
feats = [track.TrajFeat for vts in allvts for track in vts.tracks]
feats = np.array(feats)
fig, ax = plt.subplots()
ax.scatter(feats[:,3], feats[:, 1], s=10)
# ax.set_xlim(0, 2)
# ax.set_ylim(0, 100)
ax.grid(True)
plt.show()
pth = save_dir.joinpath("scatter.png")
plt.savefig(pth)
plt.close('all')
def drawtracefeat(vts):
'''
Feature extraction and classification of these curves still needs to be done.
boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
0 1 2 3 4 5 6 7 8
'''
# matplotlib.use('Agg')
fid = vts.frameid
fid1, fid2 = min(fid), max(fid)
fig, axs = plt.subplots(2, 2,figsize=(18, 8))
kernel = [0.15, 0.7, 0.15]
for i, track in enumerate(vts.tracks):
boxes = track.boxes
tid = int(track.tid)
cls = int(track.cls)
posState = track.posState
if track.frnum>=5:
x1 = boxes[1:, 7]
y1 = track.trajmin
x11 = [i for i in range(int(min(x1)), int(max(x1)+1))]
y11 = np.interp(x11, x1, y1)
y11[1:-1] = np.convolve(y11, kernel, 'valid')
x3 = boxes[1:, 7]
y3 = track.trajmax
x33 = [i for i in range(int(min(x3)), int(max(x3)+1))]
y33 = np.interp(x33, x3, y3)
y33[1:-1] = np.convolve(y33, kernel, 'valid')
x2 = boxes[:, 7]
# y2 = track.Area/max(track.Area) - min(track.Area/max(track.Area))
y2 = track.Area/max(track.Area)
x22 = [i for i in range(int(min(x2)), int(max(x2)+1))]
y22 = np.interp(x22, x2, y2)
y22[1:-1] = np.convolve(y22, kernel, 'valid')
x4 = boxes[:, 7]
y4 = track.incartrates
x44 = [i for i in range(int(min(x4)), int(max(x4)+1))]
y44 = np.interp(x44, x4, y4)
y44[1:-1] = np.convolve(y44, kernel, 'valid')
elif track.frnum>=2:
x11 = boxes[1:, 7]
y11 = track.trajmin
x33 = boxes[1:, 7]
y33 = track.trajmax
x22 = boxes[:, 7]
# y22 = track.Area/max(track.Area) - min(track.Area/max(track.Area))
y22 = track.Area/max(track.Area)
x44 = boxes[:, 7]
y44 = track.incartrates
else:
continue
# cls!=0, max(y)>20
if cls!=0 and cls!=9 and posState>=2 and max(y11)>10 and max(y33)>10 and max(y22)>0.1:
axs[0, 0].plot(x11, y11, label=f"ID_{tid}")
axs[0, 0].legend()
# axs[0].set_ylim(0, 100)
axs[0, 1].plot(x22, y22, label=f"ID_{tid}")
axs[0, 1].legend()
axs[1, 0].plot(x33, y33, label=f"ID_{tid}")
axs[1, 0].legend()
axs[1, 1].plot(x44, y44, label=f"ID_{tid}")
axs[1, 1].legend()
axs[0, 0].grid(True), axs[0, 0].set_xlim(fid1, fid2+10), axs[0, 0].set_title('trajmin')
axs[0, 1].grid(True), axs[0, 1].set_xlim(fid1, fid2+10), axs[0, 1].set_title('arearate')
axs[1, 0].grid(True), axs[1, 0].set_xlim(fid1, fid2+10), axs[1, 0].set_title('trajmax')
axs[1, 1].grid(True), axs[1, 1].set_xlim(fid1, fid2+10), axs[1, 1].set_ylim(-0.1, 1.1)
axs[1, 1].set_title('incartrate')
# pth = save_dir.joinpath(f"{file}_show_x.png")
# plt.savefig(pth)
# plt.savefig(f"./result/cls11_80212_time/{file}_show_x.png")
# plt.show()
return plt
def draw5points(track, img):
"""
Display the trajectories of the center point and the 4 corner points, together with the trajectory features.
"""
colorx = np.array([[255, 255, 255], [255, 255, 255], [255, 255, 255], [255, 255, 255], [255, 255, 255],
[0, 0, 255], [0, 255, 0], [255, 51, 255], [102, 178, 255], [51, 153, 255],[255, 153, 153],
[255, 102, 102], [255, 51, 51], [153, 255, 153], [102, 255, 102], [51, 255, 51],
[255, 102, 255], [153, 204, 255], [255, 0, 0], [255, 255, 255]], dtype=np.uint8)
color = ((0, 0, 255), (255, 128, 0))
# img = cv2.imread("./shopcart/cart_tempt/edgeline.png")
boxes = track.boxes
cornpoints = track.cornpoints
trajlens = [int(t) for t in track.trajlens]
trajdist = [int(t) for t in track.trajdist]
if len(track.trajmin):
trajstd = np.std(track.trajmin)
else:
trajstd = 0
trajlen_min, trajlen_max, trajdist_min, trajdist_max, trajlen_rate, trajdist_rate = track.TrajFeat
for i in range(boxes.shape[0]):
cv2.circle(img, (int(cornpoints[i, 0]), int(cornpoints[i, 1])), 6, (255, 255, 255), 2)
cv2.circle(img, (int(cornpoints[i, 2]), int(cornpoints[i, 3])), 6, (255, 0, 255), 2)
cv2.circle(img, (int(cornpoints[i, 4]), int(cornpoints[i, 5])), 6, (0, 255, 0), 2)
cv2.circle(img, (int(cornpoints[i, 6]), int(cornpoints[i, 7])), 6, (64, 128, 255), 2)
cv2.circle(img, (int(cornpoints[i, 8]), int(cornpoints[i, 9])), 6, (255, 128, 64), 2)
label_0 = f"ID: {track.tid}, Class: {track.cls}"
label_1 = f"trajlens: {trajlens}, trajlen_min: {int(trajlen_min)}"
label_2 = f"trajdist: {trajdist}: trajdist_max: {int(trajdist_max)}"
label_3 = "trajlen_min/trajlen_max: {:.2f}/{:.2f} = {:.2f}".format(trajlen_min, trajlen_max, trajlen_rate)
label_4 = "trajdist_min/mwh : {:.2f}/{:.2f} = {:.2f}".format(trajdist_min, track.mwh, trajdist_rate)
label_5 = "std(trajmin) : {:.2f}".format(trajstd)
label_6 = "PCA(variance_ratio) : "
label_7 = "Rect W&H&Ratio : "
label_8 = ""
# label_8 = "IOU of incart/maxbox/minbox: {:.2f}, {:.2f}, {:.2f}".format(
# track.feature_ious[0], track.feature_ious[3], track.feature_ious[4])
'''=============== index of the minimum trajectory length ===================='''
trajlens = [int(t) for t in track.trajrects_wh]
if track.isCornpoint:
idx = 0
else:
idx = trajlens.index(min(trajlens))
'''=============== PCA ===================='''
if trajlens[idx] > 12:
X = cornpoints[:, 2*idx:2*(idx+1)]
pca = PCA()
pca.fit(X)
label_6 = "PCA(variance_ratio): {:.2f}".format(pca.explained_variance_ratio_[0])
# if sum(np.isnan(pca.explained_variance_ratio_)) == 0:
for i, (comp, var) in enumerate(zip(pca.components_, pca.explained_variance_ratio_)):
pt1 = (pca.mean_ - comp*var*200).astype(np.int64)
pt2 = (pca.mean_ + comp*var*200).astype(np.int64)
cv2.line(img, pt1, pt2, color=color[i], thickness=2)
'''=============== RECT ===================='''
rect = track.trajrects[idx]
box = cv2.boxPoints(rect)
box = np.int0(box)
cv2.drawContours(img, [box], 0, (0, 255, 0), 2)
label_7 = "Rect W&H&Ratio: {}, {}, {:.2f}".format(int(rect[1][0]), int(rect[1][1]), min(rect[1])/(max(rect[1])+0.001))
'''=============== draw the text labels ===================='''
# label = [label_0, label_1, label_2, label_3, label_4, label_5, label_6, label_7, label_8]
# w, h = cv2.getTextSize('abc', 0, fontScale=2, thickness=1)[0]
# for i in range(len(label)):
# cv2.putText(img, label[i], (20, int((i+1)*1.1*h)), 0, 1,
# [int(x) for x in colorx[i]], 2, lineType=cv2.LINE_AA)
# pth = save_dir.joinpath(f"{file}_{track.tid}.png")
# cv2.imwrite(pth, img)
'''For patent writing: generate a black-and-white image'''
# imgbt = cv2.bitwise_not(img)
# for i in range(box.shape[0]):
# cv2.circle(imgbt, (int(cornpoints[i, 0]), int(cornpoints[i, 1])), 14, (0, 0, 0), 2)
# cv2.drawMarker(imgbt, (int(cornpoints[i, 2]), int(cornpoints[i, 3])), color= (0, 0, 0), markerType=3, markerSize = 30, thickness=2)
# cv2.drawMarker(imgbt, (int(cornpoints[i, 4]), int(cornpoints[i, 5])), color= (0, 0, 0), markerType=4, markerSize = 30, thickness=2)
# cv2.drawMarker(imgbt, (int(cornpoints[i, 6]), int(cornpoints[i, 7])), color= (0, 0, 0), markerType=5, markerSize = 30, thickness=2)
# cv2.drawMarker(imgbt, (int(cornpoints[i, 8]), int(cornpoints[i, 9])), color= (0, 0, 0), markerType=6, markerSize = 30, thickness=2)
# cv2.imwrite(pth + f"/zhuanli/{file}_{track.tid}.png", imgbt)
return img
def drawTrack(tracks, img):
# img = cv2.imread("./shopcart/cart_tempt/edgeline.png")
annotator = TrackAnnotator(img, line_width=2)
for track in tracks:
if isinstance(track, np.ndarray):
annotator.plotting_track(track)
else:
annotator.plotting_track(track.boxes)
img = annotator.result()
# pth = save_dir.joinpath(f"{filename}")
# cv2.imwrite(pth, img)
return img
if __name__ == "__main__":
y = np.array([5.0, 20, 40, 41, 42, 55, 56])
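    # Illustrative sketch (not from the original commit): render one synthetic track with
    # drawTrack() on a blank edge-line image; run from the repository root so the
    # tracking.utils imports above resolve.
    edgeline = np.zeros((1280, 1024, 3), dtype=np.uint8)
    demo_tracks = [np.array([[100, 900, 220, 1020, 3, 0.9, 1, 1, 0],
                             [180, 780, 300,  900, 3, 0.9, 1, 2, 0]], dtype=np.float32)]
    img = drawTrack(demo_tracks, edgeline)
    cv2.imwrite("drawTrack_demo.png", img)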

tracking/utils/gen.py Normal file
View File

@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 16 10:36:38 2024
@author: ym
"""
import contextlib
import time
class Profile(contextlib.ContextDecorator):
# YOLOv5 Profile class. Usage: @Profile() decorator or 'with Profile():' context manager
def __init__(self, t=0.0):
self.t = t
# self.cuda = torch.cuda.is_available()
def __enter__(self):
self.start = self.time()
return self
def __exit__(self, type, value, traceback):
self.dt = self.time() - self.start # delta-time
self.t += self.dt # accumulate dt
def time(self):
# if self.cuda:
# torch.cuda.synchronize()
return time.time()
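# Usage sketch (not from the original commit): Profile works both as a context manager and,
# through contextlib.ContextDecorator, as a decorator; elapsed time accumulates in .t.
if __name__ == "__main__":
    dt = Profile()
    with dt:
        sum(i * i for i in range(100000))  # some work to time
    print(f"elapsed: {dt.dt:.4f}s, accumulated: {dt.t:.4f}s")

    timer = Profile()
    @timer
    def work():
        return sum(range(1000000))
    work()
    print(f"decorated call took {timer.dt:.4f}s")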

View File

@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 31 17:07:09 2023
@author: ym
"""
from pathlib import Path
from types import SimpleNamespace
import re
import yaml
class IterableSimpleNamespace(SimpleNamespace):
"""
Ultralytics IterableSimpleNamespace is an extension class of SimpleNamespace that adds iterable functionality and
enables usage with dict() and for loops.
"""
def __iter__(self):
"""Return an iterator of key-value pairs from the namespace's attributes."""
return iter(vars(self).items())
def __str__(self):
"""Return a human-readable string representation of the object."""
return '\n'.join(f'{k}={v}' for k, v in vars(self).items())
def __getattr__(self, attr):
"""Custom attribute access error message with helpful information."""
name = self.__class__.__name__
raise AttributeError(f"""
'{name}' object has no attribute '{attr}'. This may be caused by a modified or out of date ultralytics
'default.yaml' file.\nPlease update your code with 'pip install -U ultralytics' and if necessary replace
DEFAULT_CFG_PATH with the latest version from
https://github.com/ultralytics/ultralytics/blob/main/ultralytics/cfg/default.yaml
""")
def get(self, key, default=None):
"""Return the value of the specified key if it exists; otherwise, return the default value."""
return getattr(self, key, default)
def yaml_load(file='data.yaml', append_filename=False):
"""
Load YAML data from a file.
Args:
file (str, optional): File name. Default is 'data.yaml'.
append_filename (bool): Add the YAML filename to the YAML dictionary. Default is False.
Returns:
(dict): YAML data and file name.
"""
assert Path(file).suffix in ('.yaml', '.yml'), f'Attempting to load non-YAML file {file} with yaml_load()'
with open(file, errors='ignore', encoding='utf-8') as f:
s = f.read() # string
# Remove special characters
if not s.isprintable():
s = re.sub(r'[^\x09\x0A\x0D\x20-\x7E\x85\xA0-\uD7FF\uE000-\uFFFD\U00010000-\U0010ffff]+', '', s)
# Add YAML filename to dict and return
data = yaml.safe_load(s) or {} # always return a dict (yaml.safe_load() may return None for empty files)
if append_filename:
data['yaml_file'] = str(file)
return data
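# Usage sketch (not from the original commit): load a small YAML file with yaml_load() and wrap
# it in IterableSimpleNamespace so the values support attribute access, .get(), dict() and iteration.
if __name__ == "__main__":
    import os
    import tempfile
    with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False, encoding="utf-8") as f:
        f.write("tracker: bytetrack\nconf: 0.25\n")
        tmp_path = f.name
    cfg = IterableSimpleNamespace(**yaml_load(tmp_path))
    print(cfg.tracker, cfg.get("conf"), dict(cfg))
    os.remove(tmp_path)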

View File

@@ -0,0 +1,213 @@
# -*- coding: utf-8 -*-
"""
Created on Fri Feb 23 11:04:48 2024
@author: ym
"""
import numpy as np
import cv2
from scipy.spatial.distance import cdist
# from trackers.utils import matching
# TracksDict
def readDict(boxes, TracksDict):
feats = []
for i in range(boxes.shape[0]):
tid, fid, bid = int(boxes[i, 4]), int(boxes[i, 7]), int(boxes[i, 8])
trackdict = TracksDict[f"frame_{fid}"]
if "feats" in trackdict:
feat = trackdict["feats"][bid]
feats.append(feat)
if "boxes" in trackdict:
box = trackdict["boxes"][bid]
assert (box[:4].astype(int) == boxes[i, :4].astype(int)).all(), f"Please check: frame_{fid}"
if "imgs" in trackdict:
img = trackdict["imgs"][bid]
cv2.imwrite(f'./data/imgs/{tid}_{fid}_{bid}.png', img)
return np.asarray(feats, dtype=np.float32)
def track_equal_track(atrack, btrack):
# boxes: [x, y, w, h, track_id, score, cls, frame_index, box_index]
# 0 1 2 3 4 5 6 7 8
aboxes = atrack.boxes
bboxes = btrack.boxes
afeat = atrack.features
bfeat = btrack.features
# afeat = readDict(aboxes, TracksDict)
# bfeat = readDict(bboxes, TracksDict)
''' 1. Check whether the two tracks overlap in time '''
afids = aboxes[:, 7].astype(np.int_)
bfids = bboxes[:, 7].astype(np.int_)
# intersection of the frame indices
interfid = set(afids).intersection(set(bfids))
# alternatively, test directly whether the frame index sets are disjoint (returns True or False)
# interfid = set(afids).isdisjoint(set(bfids))
if len(interfid):
return False
''' 2. Feature-similarity check between the two tracks '''
feat = np.concatenate((afeat, bfeat), axis=0)
emb_simil = 1 - np.maximum(0.0, cdist(feat, feat, 'cosine'))
emb_ = 1 - np.maximum(0.0, cdist(np.mean(afeat, axis=0)[None, :], np.mean(bfeat, axis=0)[None, :], 'cosine'))/2
if emb_[0, 0]<0.66:
return False
''' 3. Spatial IoU between the two tracks '''
alabel = np.array([0] * afids.size, dtype=np.int_)
blabel = np.array([1] * bfids.size, dtype=np.int_)
label = np.concatenate((alabel, blabel), axis=0)
fids = np.concatenate((afids, bfids), axis=0)
indices = np.argsort(fids)
idx_pair = []
for i in range(len(indices)-1):
idx1, idx2 = indices[i], indices[i+1]
if label[idx1] != label[idx2] and fids[idx2] - fids[idx1] <= 3:
if label[idx1] == 0:
a_idx = idx1
b_idx = idx2-alabel.size
else:
a_idx = idx2
b_idx = idx1-alabel.size
idx_pair.append((a_idx, b_idx))
ious = []
embs = []
for a, b in idx_pair:
abox, bbox = aboxes[a, :], bboxes[b, :]
af, bf = afeat[a, :], bfeat[b, :]
emb_ab = 1 - np.maximum(0.0, cdist(af[None, :], bf[None, :], 'cosine'))
xa1, ya1 = abox[0] - abox[2]/2, abox[1] - abox[3]/2
xa2, ya2 = abox[0] + abox[2]/2, abox[1] + abox[3]/2
xb1, yb1 = bbox[0] - bbox[2]/2, bbox[1] - bbox[3]/2
xb2, yb2 = bbox[0] + bbox[2]/2, bbox[1] + bbox[3]/2
inter = (np.minimum(xb2, xa2) - np.maximum(xb1, xa1)).clip(0) * \
(np.minimum(yb2, ya2) - np.maximum(yb1, ya1)).clip(0)
# Union Area
box1_area = abox[2] * abox[3]
box2_area = bbox[2] * bbox[3]
union = box1_area + box2_area - inter + 1e-6
ious.append(inter/union)
embs.append(emb_ab[0, 0])
''' 4. Association with the same hand; how to fuse this code with the IoU part still needs further work '''
# ahands = np.array(atrack.Hands)
# bhands = np.array(btrack.Hands)
# ahids = ahands[:, 0]
# bhids = bhands[:, 0]
# interhid = set(ahids).intersection(set(bhids))
# for hid in interhid:
# aidx = ahands[:, 0] == hid
# bidx = bhands[:, 0] == hid
# ahfids = ahids[aidx, 1]
# bhfids = bhids[bidx, 1]
cont = False if len(interfid) else True  # True when the frame indices have no overlap
cont1 = all(emb > 0.5 for emb in embs)
cont2 = all(iou > 0.5 for iou in ious)
# cont = cont and cont2 and cont3
cont = cont and cont1 and cont2
return cont
def track_equal_str(atrack, btrack):
if atrack == btrack:
return True
else:
return False
def merge_track(Residual):
out_list = []
alist = [t for t in Residual]
while alist:
atrack = alist[0]
cur_list = []
cur_list.append(atrack)
alist.pop(0)
blist = [b for b in alist]
alist = []
for btrack in blist:
if track_equal_str(atrack, btrack):
cur_list.append(btrack)
else:
alist.append(btrack)
out_list.append(cur_list)
return out_list
def main():
Residual = ['a', 'b', 'c', 'd', 'a', 'b', 'c', 'b', 'c', 'd']
out_list = merge_track(Residual)
print(Residual)
print(out_list)
if __name__ == "__main__":
main()
# =============================================================================
# for i, atrack in enumerate(input_list):
# cur_list = []
# cur_list.append(atrack)
# del input_list[i]
#
# for j, btrack in enumerate(input_list):
# if track_equal(atrack, btrack):
# cur_list.append(btrack)
# del input_list[j]
#
# out_list.append(cur_list)
# =============================================================================
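# Sketch (not from the original commit): check whether two short, temporally adjacent fragments
# of the same object would be merged by track_equal_track(). The mock tracks only need .boxes
# ([cx, cy, w, h, track_id, score, cls, frame_index, box_index]) and .features attributes.
if __name__ == "__main__":
    from types import SimpleNamespace
    feat = np.ones((2, 8), dtype=np.float32)   # identical features -> cosine similarity 1.0
    atrack = SimpleNamespace(
        boxes=np.array([[100, 100, 40, 40, 1, 0.9, 1, 1, 0],
                        [102, 102, 40, 40, 1, 0.9, 1, 2, 1]], dtype=np.float32),
        features=feat)
    btrack = SimpleNamespace(
        boxes=np.array([[104, 104, 40, 40, 2, 0.9, 1, 3, 0],
                        [106, 106, 40, 40, 2, 0.9, 1, 4, 1]], dtype=np.float32),
        features=feat.copy())
    print("merge fragments:", track_equal_track(atrack, btrack))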

tracking/utils/plotting.py Normal file
View File

@@ -0,0 +1,364 @@
# Ultralytics YOLO 🚀, AGPL-3.0 license
import contextlib
import math
import warnings
from pathlib import Path
import os
import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
from PIL import Image, ImageDraw, ImageFont
from PIL import __version__ as pil_version
# from utils.general import increment_path
# from ultralytics.utils import LOGGER, TryExcept, ops, plt_settings, threaded
# from .checks import check_font, check_version, is_ascii
# from .files import increment_path
class Colors:
"""
Ultralytics default color palette https://ultralytics.com/.
This class provides methods to work with the Ultralytics color palette, including converting hex color codes to
RGB values.
Attributes:
palette (list of tuple): List of RGB color values.
n (int): The number of colors in the palette.
pose_palette (np.array): A specific color palette array with dtype np.uint8.
"""
def __init__(self):
"""Initialize colors as hex = matplotlib.colors.TABLEAU_COLORS.values()."""
hexs = ('FF3838', 'FF9D97', 'FF701F', 'FFB21D', 'CFD231', '48F90A', '92CC17', '3DDB86', '1A9334', '00D4BB',
'2C99A8', '00C2FF', '344593', '6473FF', '0018EC', '8438FF', '520085', 'CB38FF', 'FF95C8', 'FF37C7')
self.palette = [self.hex2rgb(f'#{c}') for c in hexs]
self.n = len(self.palette)
self.pose_palette = np.array([[255, 128, 0], [255, 153, 51], [255, 178, 102], [230, 230, 0], [255, 153, 255],
[153, 204, 255], [255, 102, 255], [255, 51, 255], [102, 178, 255], [51, 153, 255],
[255, 153, 153], [255, 102, 102], [255, 51, 51], [153, 255, 153], [102, 255, 102],
[51, 255, 51], [0, 255, 0], [0, 0, 255], [255, 0, 0], [255, 255, 255]],
dtype=np.uint8)
def __call__(self, i, bgr=False):
"""Converts hex color codes to RGB values."""
c = self.palette[int(i) % self.n]
return (c[2], c[1], c[0]) if bgr else c
@staticmethod
def hex2rgb(h):
"""Converts hex color codes to RGB values (i.e. default PIL order)."""
return tuple(int(h[1 + i:1 + i + 2], 16) for i in (0, 2, 4))
colors = Colors() # create instance for 'from utils.plots import colors'
class Annotator:
"""
Ultralytics Annotator for train/val mosaics and JPGs and predictions annotations.
Attributes:
im (Image.Image or numpy array): The image to annotate.
pil (bool): Whether to use PIL or cv2 for drawing annotations.
font (ImageFont.truetype or ImageFont.load_default): Font used for text annotations.
lw (float): Line width for drawing.
skeleton (List[List[int]]): Skeleton structure for keypoints.
limb_color (List[int]): Color palette for limbs.
kpt_color (List[int]): Color palette for keypoints.
"""
def __init__(self, im, line_width=None, font_size=None, font='Arial.ttf', pil=False, example='abc'):
"""Initialize the Annotator class with image and line width along with color palette for keypoints and limbs."""
assert im.data.contiguous, 'Image not contiguous. Apply np.ascontiguousarray(im) to Annotator() input images.'
self.im = im
self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2) # line width
# Pose
self.skeleton = [[16, 14], [14, 12], [17, 15], [15, 13], [12, 13], [6, 12], [7, 13], [6, 7], [6, 8], [7, 9],
[8, 10], [9, 11], [2, 3], [1, 2], [1, 3], [2, 4], [3, 5], [4, 6], [5, 7]]
self.limb_color = colors.pose_palette[[9, 9, 9, 9, 7, 7, 7, 0, 0, 0, 0, 0, 16, 16, 16, 16, 16, 16, 16]]
self.kpt_color = colors.pose_palette[[16, 16, 16, 16, 16, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 9, 9]]
def box_label(self, box, label='', color=(128, 128, 128), txt_color=(255, 255, 255)):
"""Add one xyxy box to image with label."""
if isinstance(box, torch.Tensor):
box = box.tolist()
p1, p2 = (int(box[0]), int(box[1])), (int(box[2]), int(box[3]))
cv2.rectangle(self.im, p1, p2, color, thickness=self.lw, lineType=cv2.LINE_AA)
if label:
tf = max(self.lw - 1, 1) # font thickness
w, h = cv2.getTextSize(label, 0, fontScale=self.lw / 3, thickness=tf)[0] # text width, height
outside = p1[1] - h >= 3
p2 = p1[0] + w, p1[1] - h - 3 if outside else p1[1] + h + 3
cv2.rectangle(self.im, p1, p2, color, -1, cv2.LINE_AA) # filled
cv2.putText(self.im,
label, (p1[0], p1[1] - 2 if outside else p1[1] + h + 2),
0,
self.lw / 3,
txt_color,
thickness=tf,
lineType=cv2.LINE_AA)
def masks(self, masks, colors, im_gpu, alpha=0.5, retina_masks=False):
"""
Plot masks on image.
Args:
masks (tensor): Predicted masks on cuda, shape: [n, h, w]
colors (List[List[Int]]): Colors for predicted masks, [[r, g, b] * n]
im_gpu (tensor): Image is in cuda, shape: [3, h, w], range: [0, 1]
alpha (float): Mask transparency: 0.0 fully transparent, 1.0 opaque
retina_masks (bool): Whether to use high resolution masks or not. Defaults to False.
"""
if self.pil:
# Convert to numpy first
self.im = np.asarray(self.im).copy()
if len(masks) == 0:
self.im[:] = im_gpu.permute(1, 2, 0).contiguous().cpu().numpy() * 255
if im_gpu.device != masks.device:
im_gpu = im_gpu.to(masks.device)
colors = torch.tensor(colors, device=masks.device, dtype=torch.float32) / 255.0 # shape(n,3)
colors = colors[:, None, None] # shape(n,1,1,3)
masks = masks.unsqueeze(3) # shape(n,h,w,1)
masks_color = masks * (colors * alpha) # shape(n,h,w,3)
inv_alph_masks = (1 - masks * alpha).cumprod(0) # shape(n,h,w,1)
mcs = masks_color.max(dim=0).values # shape(n,h,w,3)
im_gpu = im_gpu.flip(dims=[0]) # flip channel
im_gpu = im_gpu.permute(1, 2, 0).contiguous() # shape(h,w,3)
im_gpu = im_gpu * inv_alph_masks[-1] + mcs
im_mask = (im_gpu * 255)
im_mask_np = im_mask.byte().cpu().numpy()
self.im[:] = im_mask_np if retina_masks else scale_image(im_mask_np, self.im.shape)
if self.pil:
# Convert im back to PIL and update draw
self.fromarray(self.im)
def kpts(self, kpts, shape=(640, 640), radius=5, kpt_line=True):
"""
Plot keypoints on the image.
Args:
kpts (tensor): Predicted keypoints with shape [17, 3]. Each keypoint has (x, y, confidence).
shape (tuple): Image shape as a tuple (h, w), where h is the height and w is the width.
radius (int, optional): Radius of the drawn keypoints. Default is 5.
kpt_line (bool, optional): If True, the function will draw lines connecting keypoints
for human pose. Default is True.
Note: `kpt_line=True` currently only supports human pose plotting.
"""
if self.pil:
# Convert to numpy first
self.im = np.asarray(self.im).copy()
nkpt, ndim = kpts.shape
is_pose = nkpt == 17 and ndim == 3
kpt_line &= is_pose # `kpt_line=True` for now only supports human pose plotting
for i, k in enumerate(kpts):
color_k = [int(x) for x in self.kpt_color[i]] if is_pose else colors(i)
x_coord, y_coord = k[0], k[1]
if x_coord % shape[1] != 0 and y_coord % shape[0] != 0:
if len(k) == 3:
conf = k[2]
if conf < 0.5:
continue
cv2.circle(self.im, (int(x_coord), int(y_coord)), radius, color_k, -1, lineType=cv2.LINE_AA)
if kpt_line:
ndim = kpts.shape[-1]
for i, sk in enumerate(self.skeleton):
pos1 = (int(kpts[(sk[0] - 1), 0]), int(kpts[(sk[0] - 1), 1]))
pos2 = (int(kpts[(sk[1] - 1), 0]), int(kpts[(sk[1] - 1), 1]))
if ndim == 3:
conf1 = kpts[(sk[0] - 1), 2]
conf2 = kpts[(sk[1] - 1), 2]
if conf1 < 0.5 or conf2 < 0.5:
continue
if pos1[0] % shape[1] == 0 or pos1[1] % shape[0] == 0 or pos1[0] < 0 or pos1[1] < 0:
continue
if pos2[0] % shape[1] == 0 or pos2[1] % shape[0] == 0 or pos2[0] < 0 or pos2[1] < 0:
continue
cv2.line(self.im, pos1, pos2, [int(x) for x in self.limb_color[i]], thickness=2, lineType=cv2.LINE_AA)
if self.pil:
# Convert im back to PIL and update draw
self.fromarray(self.im)
def rectangle(self, xy, fill=None, outline=None, width=1):
"""Add rectangle to image (PIL-only)."""
self.draw.rectangle(xy, fill, outline, width)
def text(self, xy, text, txt_color=(255, 255, 255), anchor='top', box_style=False):
"""Adds text to an image using PIL or cv2."""
if anchor == 'bottom': # start y from font bottom
w, h = self.font.getsize(text) # text width, height
xy[1] += 1 - h
if self.pil:
if box_style:
w, h = self.font.getsize(text)
self.draw.rectangle((xy[0], xy[1], xy[0] + w + 1, xy[1] + h + 1), fill=txt_color)
# Using `txt_color` for background and draw fg with white color
txt_color = (255, 255, 255)
if '\n' in text:
lines = text.split('\n')
_, h = self.font.getsize(text)
for line in lines:
self.draw.text(xy, line, fill=txt_color, font=self.font)
xy[1] += h
else:
self.draw.text(xy, text, fill=txt_color, font=self.font)
else:
if box_style:
tf = max(self.lw - 1, 1) # font thickness
w, h = cv2.getTextSize(text, 0, fontScale=self.lw / 3, thickness=tf)[0] # text width, height
outside = xy[1] - h >= 3
p2 = xy[0] + w, xy[1] - h - 3 if outside else xy[1] + h + 3
cv2.rectangle(self.im, xy, p2, txt_color, -1, cv2.LINE_AA) # filled
# Using `txt_color` for background and draw fg with white color
txt_color = (255, 255, 255)
tf = max(self.lw - 1, 1) # font thickness
cv2.putText(self.im, text, xy, 0, self.lw / 3, txt_color, thickness=tf, lineType=cv2.LINE_AA)
def fromarray(self, im):
"""Update self.im from a numpy array."""
self.im = im if isinstance(im, Image.Image) else Image.fromarray(im)
self.draw = ImageDraw.Draw(self.im)
def result(self):
"""Return annotated image as array."""
return np.asarray(self.im)
def scale_image(masks, im0_shape, ratio_pad=None):
"""
Takes a mask, and resizes it to the original image size
Args:
masks (np.ndarray): resized and padded masks/images, [h, w, num]/[h, w, 3].
im0_shape (tuple): the original image shape
ratio_pad (tuple): the ratio of the padding to the original image.
Returns:
masks (torch.Tensor): The masks that are being returned.
"""
# Rescale coordinates (xyxy) from im1_shape to im0_shape
im1_shape = masks.shape
if im1_shape[:2] == im0_shape[:2]:
return masks
if ratio_pad is None: # calculate from im0_shape
gain = min(im1_shape[0] / im0_shape[0], im1_shape[1] / im0_shape[1]) # gain = old / new
pad = (im1_shape[1] - im0_shape[1] * gain) / 2, (im1_shape[0] - im0_shape[0] * gain) / 2 # wh padding
else:
gain = ratio_pad[0][0]
pad = ratio_pad[1]
top, left = int(pad[1]), int(pad[0]) # y, x
bottom, right = int(im1_shape[0] - pad[1]), int(im1_shape[1] - pad[0])
if len(masks.shape) < 2:
raise ValueError(f'"len of masks shape" should be 2 or 3, but got {len(masks.shape)}')
masks = masks[top:bottom, left:right]
masks = cv2.resize(masks, (im0_shape[1], im0_shape[0]))
if len(masks.shape) == 2:
masks = masks[:, :, None]
return masks
def boxing_img(det, img, line_width=3):
annotator = Annotator(img, line_width)
for *xyxy, id, conf, cls, _, _ in reversed(det):
label = (f'id:{int(id)} '+str(int(cls)) +f' {conf:.2f}')
if cls==0:
color = colors(int(cls), True)
else:
color = colors(int(id), True)
annotator.box_label(xyxy, label, color=color)
# Save results (image and video with tracking)
imgx = annotator.result()
return imgx
def array2list(bboxes):
track_fids = np.unique(bboxes[:, 7].astype(int))
track_fids.sort()
lboxes = []
for f_id in track_fids:
# print(f"The ID is: {t_id}")
idx = np.where(bboxes[:, 7] == f_id)[0]
box = bboxes[idx, :]
lboxes.append(box)
assert len(set(box[:, 4])) == len(box), "Please check!!!"
return lboxes
def get_subimgs(imgs, tracks, scale=2):
bboxes = []
if len(tracks):
bboxes = array2list(tracks)
subimgs = []
for i, boxes in enumerate(bboxes):
fid = int(boxes[0, 7])
for *xyxy, tid, conf, cls, fid, bid in boxes:
pt2 = [p/scale for p in xyxy]
x1, y1, x2, y2 = int(pt2[0]), int(pt2[1]), int(pt2[2]), int(pt2[3])
subimgs.append((int(fid), int(bid), imgs[fid-1][y1:y2, x1:x2]))
return subimgs
def draw_tracking_boxes(imgs, tracks, scale=2):
'''Make sure imgs covers every frame ID that appears in tracks.
tracks: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
          0   1   2   3   4         5      6    7            8
Key points:
(1) the order of imgs corresponds to the fid values in tracks
(2) the images are downscaled, so the xyxy coordinates are divided by `scale` (halved by default)
'''
bboxes = []
if len(tracks):
bboxes = array2list(tracks)
# if len(bboxes)!=len(imgs):
# return False, imgs
annimgs = []
for i, boxes in enumerate(bboxes):
fid = int(boxes[0, 7])
annotator = Annotator(imgs[fid-1].copy())
for *xyxy, tid, conf, cls, fid, bid in boxes:
label = f'id:{int(tid)}_{int(cls)}_{conf:.2f}'
if cls==0:
color = colors(int(cls), True)
elif tid>0 and cls!=0:
color = colors(int(tid), True)
else:
color = colors(19, True)  # 19 is the last entry of the color palette
pt2 = [p/scale for p in xyxy]
annotator.box_label(pt2, label, color=color)
img = annotator.result()
annimgs.append((int(fid), img))
return annimgs
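# Usage sketch (not from the original commit): label a single detection box on a blank image
# with the Annotator and color palette defined above.
if __name__ == "__main__":
    canvas = np.zeros((640, 640, 3), dtype=np.uint8)
    ann = Annotator(canvas, line_width=2)
    ann.box_label([50, 60, 300, 400], label="id:1 0 0.93", color=colors(1, True))
    cv2.imwrite("annotator_box_demo.png", ann.result())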

View File

@@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 31 17:14:37 2023
@author: ym
"""
import numpy as np
class Boxes:
def __init__(self, boxes, orig_shape=None) -> None:
"""Initialize the Boxes class."""
if boxes.ndim == 1:
boxes = boxes[None, :]
m, n = boxes.shape
assert n in (6, 7), f'expected `n` in [6, 7], but got {n}' # xyxy, track_id, conf, cls
'''Assign an index to each box; this index can be used to look up the corresponding feature'''
self.data = np.concatenate([boxes[:, :4], np.arange(m).reshape(-1, 1), boxes[:, 4:]], axis=-1)
self.orig_shape = orig_shape
def cpu(self):
"""Return a copy of the tensor on CPU memory."""
return self if isinstance(self.data, np.ndarray) else self.__class__(self.data.cpu(), self.orig_shape)
def numpy(self):
"""Return a copy of the tensor as a numpy array."""
return self if isinstance(self.data, np.ndarray) else self.__class__(self.data.numpy(), self.orig_shape)
@property
def xyxy(self):
"""Return the boxes in xyxy format."""
return self.data[:, :4]
@property
def xyxyb(self):
"""Return the boxes in xyxyb format."""
return self.data[:, :5]
@property
def conf(self):
"""Return the confidence values of the boxes."""
return self.data[:, -2]
@property
def cls(self):
"""Return the class values of the boxes."""
return self.data[:, -1]
# def boxes_add_fid(tboxes):
# '''
# 将 bboxes 对应的帧索引添加到 boxes 最后一列
# Return
# bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index]
# '''
# bboxes = np.empty((0, 8), dtype = np.float32)
# for tbox, f in tboxes:
# data = tbox.numpy()
# frame = f * np.ones([data.shape[0], 1])
# bbox = np.concatenate([data, frame], axis=1)
# bboxes = np.concatenate([bboxes, bbox], axis=0)
# return bboxes
def boxes_add_fid(tboxes):
'''
Append the frame index of each box as the last column of bboxes.
Return:
bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index]
'''
bboxes = np.empty((0, 8), dtype = np.float32)
for data, f in tboxes:
frame = f * np.ones([data.shape[0], 1])
bbox = np.concatenate([data, frame], axis=1)
bboxes = np.concatenate([bboxes, bbox], axis=0)
return bboxes
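# Usage sketch (not from the original commit): wrap raw detections in Boxes; a per-box index is
# inserted after the xyxy columns so each box can later be matched to its feature vector.
if __name__ == "__main__":
    dets = np.array([[10, 20, 110, 220, 1, 0.90, 0],
                     [30, 40, 130, 240, 2, 0.85, 5]], dtype=np.float32)  # xyxy, track_id, conf, cls
    boxes = Boxes(dets, orig_shape=(1280, 1024))
    print(boxes.xyxyb)          # xyxy plus the inserted box index
    print(boxes.conf, boxes.cls)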

View File

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 20 14:28:20 2023
@author: ym
"""
import numpy as np
from scipy.spatial.distance import cdist
def boxes_add_fid(tboxes):
'''
Append the frame index of each box as the last column of bboxes.
Return:
bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index]
'''
bboxes = np.empty((0, 8), dtype = np.float32)
for tbox, f in tboxes:
data = tbox.numpy()
frame = f * np.ones([data.shape[0], 1])
bbox = np.concatenate([data, frame], axis=1)
bboxes = np.concatenate([bboxes, bbox], axis=0)
return bboxes
def array2list(bboxes):
'''
Convert bboxes into a list of tracks.
bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index]
Return:
    lboxes: a list whose elements each hold the boxes of one track_id, in xywh format:
[x, y, w, h, track_id, score, cls, frame_index]
'''
track_ids = set(bboxes[:, 4])
lboxes = []
for t_id in track_ids:
idx = np.where(bboxes[:, 4] == t_id)[0]
box = bboxes[idx, :]
x = (box[:, 0] + box[:, 2]) / 2
y = (box[:, 1] + box[:, 3]) / 2
# box: [x, y, w, h, track_id, score, cls, frame_index]
box[:, 2] = box[:, 2] - box[:, 0]
box[:, 3] = box[:, 3] - box[:, 1]
box[:, 0] = x
box[:, 1] = y
lboxes.append(box)
return lboxes
def max_dist_track(tboxes):
'''
Find the track in tboxes with the largest center displacement (dist).
Return:
'''
max_track_dist, max_dist = 0, 0
for track in tboxes:
box = track[:, :4].astype(int)
dist = cdist(box[:, :2], box[:, :2])
dm = np.max(dist)
if dm > max_dist:
max_dist = dm
max_track = track.copy()
max_track_dist = dist.copy()
# indices (ix1, ix2) of the two centers with the maximum displacement within this track_id
indx, indy = np.where(dist == dm)
ix1, ix2 = indx[0], indy[0]
# ensure ix1 < ix2, so that index ix1 corresponds to the earlier part of the video
if ix1 > ix2: ix1, ix2 = ix2, ix1
# =============================================================================
# # =============================================================================
# # 逻辑分析
# # =============================================================================
# Scanzone = ((0, int(Height/4)), (int(2*Weight/3), Weight))
# if max_track.shape[0] > 10:
#
# # max_track 视频序列的第一帧索引 idx1
# frame_1 = int(min(max_track[:, 7]))
# idx1 = np.where(max_track[:, 7] == frame_1)[0][0]
#
# # max_track 视频序列的最后一帧索引 idx2
# frame_2 = int(max(max_track[:, 7]))
# idx2 = np.where(max_track[:, 7] == frame_2)[0][0]
#
# # max_track 视频序列的第一帧目标位置中心 (x1, y1)
# x1, y1 = max_track[idx1, :2]
#
# # max_track 视频序列的第最后一帧目标位置中心 (x2, y2)
# x2, y2 = max_track[idx2, :2]
#
#
# # track序列第一帧和最后一帧的距离该距离和 mx_dist 不是一个概念
# dist_1_2 = max_track_dist[idx1, idx2]
#
# if max_dist < 3 * Height/10:
# State = Uncertain
#
# elif y1 > y2:
# State = TakeOut
#
# elif y1 < y2:
# State = PutIn
# =============================================================================
return max_track, max_dist
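# Usage sketch (not from the original commit): append frame indices with boxes_add_fid(), regroup
# the detections into per-track xywh lists with array2list(), and find the track with the largest
# displacement. SimpleNamespace stands in for objects exposing a .numpy() method.
if __name__ == "__main__":
    from types import SimpleNamespace
    f1 = np.array([[100., 100., 140., 160., 1., 0.9, 0.]])   # [x1, y1, x2, y2, track_id, score, cls]
    f2 = np.array([[104., 102., 144., 162., 1., 0.9, 0.]])
    tboxes = [(SimpleNamespace(numpy=lambda: f1), 1),
              (SimpleNamespace(numpy=lambda: f2), 2)]
    bboxes = boxes_add_fid(tboxes)      # frame_index appended as the last column
    ltracks = array2list(bboxes)        # one xywh track per track_id
    max_track, max_dist = max_dist_track(ltracks)
    print(max_dist, max_track.shape)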

tracking/utils/read_data.py Normal file
View File

@@ -0,0 +1,750 @@
# -*- coding: utf-8 -*-
"""
Created on Fri Jul 5 13:59:21 2024
Functions for reading the data files:
extract_data() 0/1_track.data
read_tracking_output() 0/1_tracking_output.data
read_similar() process.data
@author: ym
"""
import numpy as np
import re
import os
from collections import OrderedDict
import warnings
import matplotlib.pyplot as plt
def str_to_float_arr(s):
# strip a trailing comma, if present
if s.endswith(','):
s = s[:-1]
# split the string and convert each element to float
float_array = [float(x) for x in s.split(",")]
return float_array
def find_samebox_in_array(arr, target):
for i, st in enumerate(arr):
if st[:4] == target[:4]:
return i
return -1
def array2list(boxes, feats):
'''boxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]'''
trackID = np.unique(boxes[:, 4].astype(int))
track_ids = boxes[:, 4].astype(int)
lboxes = []
for t_id in trackID:
idx = np.where(track_ids == t_id)[0]
box = boxes[idx, :]
feat = feats[idx, :]
assert len(set(box[:, 7])) == len(box), "Please check!!!"
lboxes.append(box)
return lboxes
def extract_data(datapath):
'''
Read data from 0/1_track.data.
For the features, two objects need to be constructed:
(1) tracking output boxes: the corresponding feats are looked up by (fid, bid); a tracker_feat_dict makes this convenient
(2) the elements of frameDictList are frameDict objects exposing the same interface
'''
bboxes, ffeats = [], []
trackerboxes = np.empty((0, 9), dtype=np.float64)
trackerfeats = np.empty((0, 256), dtype=np.float64)
boxes, feats, tboxes, tfeats = [], [], [], []
timestamps, frameIds = [], []
with open(datapath, 'r', encoding='utf-8') as lines:
for line in lines:
line = line.strip()  # strip the trailing newline and any surrounding whitespace
if not line:  # skip empty lines
continue
if line.find("CameraId")>=0:
if len(boxes): bboxes.append(np.array(boxes))
if len(feats): ffeats.append(np.array(feats))
if len(tboxes):
trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)))
if len(tfeats):
trackerfeats = np.concatenate((trackerfeats, np.array(tfeats)))
timestamp, frameId = [int(ln.split(":")[1]) for ln in line.split(",")[1:]]
timestamps.append(timestamp)
frameIds.append(frameId)
boxes, feats, tboxes, tfeats = [], [], [], []
if line.find("box:") >= 0 and line.find("output_box:")<0 and line.find("out_boxes")<0:
box = line[line.find("box:") + 4:].strip()
# if len(box)==6:
boxes.append(str_to_float_arr(box))
if line.find("feat:") >= 0:
feat = line[line.find("feat:") + 5:].strip()
# if len(feat)==256:
feats.append(str_to_float_arr(feat))
if line.find("output_box:") >= 0:
assert len(boxes)==len(feats), f"{datapath}: len(boxes) != len(feats)"
box = str_to_float_arr(line[line.find("output_box:") + 11:].strip())
index = find_samebox_in_array(boxes, box)
if index >= 0:
tboxes.append(box)  # the 'output_box:' prefix and any whitespace were already stripped above
# feat_f = str_to_float_arr(input_feats[index])
feat_f = feats[index]
norm_f = np.linalg.norm(feat_f)
feat_f = feat_f / norm_f
tfeats.append(feat_f)
if len(boxes): bboxes.append(np.array(boxes))
if len(feats): ffeats.append(np.array(feats))
if len(tboxes): trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)))
if len(tfeats): trackerfeats = np.concatenate((trackerfeats, np.array(tfeats)))
assert(len(bboxes)==len(ffeats)), "Error at Yolo output!"
assert(len(trackerboxes)==len(trackerfeats)), "Error at tracker output!"
## build a per-frame feature lookup keyed by (frame_id, box_id)
tracker_feats = {}
frmIDs = np.sort(np.unique(trackerboxes[:, 7].astype(int)))
for fid in frmIDs:
idx = np.where(trackerboxes[:, 7] == fid)[0]
boxes = trackerboxes[idx, :]
feats = trackerfeats[idx, :]
for i in range(len(boxes)):
f, b = int(boxes[i, 7]), int(boxes[i, 8])
tracker_feats.update({f"{f}_{b}": feats[i, :]})
boxes, trackingboxes= [], []
tracking_flag = False
with open(datapath, 'r', encoding='utf-8') as lines:
for line in lines:
line = line.strip()  # strip the trailing newline and any surrounding whitespace
if not line:  # skip empty lines
tracking_flag = False
continue
if tracking_flag:
if line.find("tracking_") >= 0:
tracking_flag = False
else:
box = str_to_float_arr(line)
boxes.append(box)
if line.find("tracking_") >= 0:
tracking_flag = True
if len(boxes):
trackingboxes.append(np.array(boxes))
boxes = []
if len(boxes):
trackingboxes.append(np.array(boxes))
# tracking_feat_dict = {}
# try:
# for i, boxes in enumerate(trackingboxes):
# for box in boxes:
# tid, fid, bid = int(box[4]), int(box[7]), int(box[8])
# if f"track_{tid}" not in tracking_feat_dict:
# tracking_feat_dict[f"track_{tid}"]= {"feats": {}}
# tracking_feat_dict[f"track_{tid}"]["feats"].update({f"{fid}_{bid}": tracker_feat_dict[f"frame_{fid}"]["feats"][bid]})
# except Exception as e:
# print(f'Path: {datapath}, tracking_feat_dict can not be structured correcttly, Error: {e}')
# tracker_feat_dict = {}
# tracker_feat_dict["timestamps"] = timestamps
# tracker_feat_dict["frameIds"] = frameIds
# for i in range(len(trackerboxes)):
# fid, bid = int(trackerboxes[i, 7]), int(trackerboxes[i, 8])
# if f"frame_{fid}" not in tracker_feat_dict:
# tracker_feat_dict[f"frame_{fid}"]= {"feats": {}}
# tracker_feat_dict[f"frame_{fid}"]["feats"].update({bid: trackerfeats[i, :]})
trackingfeats = []
try:
for i, boxes in enumerate(trackingboxes):
feats = []
for box in boxes:
fid, bid = int(box[7]), int(box[8])
feat = tracker_feats[f"{fid}_{bid}"]
feats.append(feat)
trackingfeats.append(np.array(feats))
except Exception as e:
print(f'Path: {datapath}, trackingfeats cannot be structured correctly, Error: {e}')
# return bboxes, ffeats, trackerboxes, tracker_feat_dict, trackingboxes, trackingfeats
return bboxes, ffeats, trackerboxes, tracker_feats, trackingboxes, trackingfeats
def extract_data_realtime(datapath):
boxes, feats = [], []
tracker_feats = {}
with open(datapath, 'r', encoding='utf-8') as lines:
for line in lines:
line = line.strip()  # strip the trailing newline and any surrounding whitespace
if not line:  # skip empty lines
continue
if line.endswith(','):
line = line[:-1]
ftlist = [float(x) for x in line.split()]
if len(ftlist) != 265:
continue
boxes.append(ftlist[:9])
feats.append(ftlist[9:])
trackerboxes = np.array(boxes)
trackerfeats = np.array(feats)
if len(trackerboxes)==0 or len(trackerboxes) != len(trackerfeats):
return np.array([]), {}
frmIDs = np.sort(np.unique(trackerboxes[:, 7].astype(int)))
for fid in frmIDs:
idx = np.where(trackerboxes[:, 7] == fid)[0]
box = trackerboxes[idx, :]
feat = trackerfeats[idx, :]
for i in range(len(box)):
f, b = int(box[i, 7]), int(box[i, 8])
tracker_feats.update({f"{f}_{b}": feat[i, :]})
return trackerboxes, tracker_feats
def read_tracking_output_realtime(datapath):
trackingboxes, trackingfeats = [], []
tracking_outboxes, tracking_outfeats = [], []
with open(datapath, 'r', encoding='utf-8') as lines:
boxes, feats = [], []
Flag = False
for line in lines:
line = line.strip()  # strip the trailing newline and any surrounding whitespace
if not line:  # skip empty lines
continue
if line.endswith(','):
line = line[:-1]
ftlist = [float(x) for x in line.split()]
if len(ftlist) != 265: continue
Flag = all(elem == 0 for elem in ftlist)
if Flag:
trackingboxes.append(np.array(boxes))
trackingfeats.append(np.array(feats))
boxes, feats = [], []
continue
boxes.append(ftlist[:9])
feats.append(ftlist[9:])
if len(boxes):
trackingboxes.append(np.array(boxes))
trackingfeats.append(np.array(feats))
if len(trackingboxes):
tracking_outboxes = trackingboxes[:1]
tracking_outfeats = trackingfeats[:1]
return trackingboxes, trackingfeats, tracking_outboxes, tracking_outfeats
def read_tracking_output(filepath):
'''
Read data from 0/1_tracking_output.data.
'''
boxes = []
feats = []
if not os.path.isfile(filepath):
return boxes, feats
with open(filepath, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip()  # strip the trailing newline and any surrounding whitespace
if not line:
continue
if line.find("gift")>0: continue
if line.endswith(','):
line = line[:-1]
data = np.array([float(x) for x in line.split(",")])
if data.size == 9:
boxes.append(data)
if data.size == 256:
feats.append(data)
if len(feats) != len(boxes) or len(boxes)==0:
return [], []
return [np.array(boxes)], [np.array(feats)]
def read_process(filePath):
timeDict = {}
with open(filePath, 'r', encoding='utf-8') as f:
lines = f.readlines()
clines = [line.strip().replace("'", '').replace('"', '') for line in lines]
for i, line in enumerate(clines):
line = line.strip()
if line.endswith(','):
line = line[:-1]
if not line: continue
lnList = line.split(":")
if line.find("eventStart")>=0:
timeDict["eventStart"] = int(lnList[-1])
if line.find("eventEnd")>=0:
timeDict["eventEnd"] = int(lnList[-1])
if line.find("weightValue")>=0:
timeDict["weightValue"] = int(lnList[-1])
return timeDict
def read_similar(filePath):
'''For 1:n, extraction of the Dict['type'] field differs from the non-fully-realtime case: the value contains no "=" character '''
SimiDict = {}
SimiDict['one2one'] = []
SimiDict['one2SN'] = []
SimiDict['one2n'] = []
SimiDict['algroStartToEnd'] = -1
with open(filePath, 'r', encoding='utf-8') as f:
lines = f.readlines()
clean_lines = [line.strip().replace("'", '').replace('"', '') for line in lines]
one2one_list, one2SN_list, one2n_list = [], [], []
Flag_1to1, Flag_1toSN, Flag_1ton = False, False, False
for i, line in enumerate(clean_lines):
line = line.strip()
if line.endswith(','):
line = line[:-1]
Dict = {}
if not line:
if len(one2one_list): SimiDict['one2one'] = one2one_list
if len(one2SN_list): SimiDict['one2SN'] = one2SN_list
if len(one2n_list): SimiDict['one2n'] = one2n_list
one2one_list, one2SN_list, one2n_list = [], [], []
Flag_1to1, Flag_1toSN, Flag_1ton = False, False, False
continue
if line.find("algroStartToEnd")>=0:
alg_during = line.split(':')[1].strip()
SimiDict['algroStartToEnd'] = int(alg_during)
if line.find('oneToOne')>=0:
Flag_1to1, Flag_1toSN, Flag_1ton = True, False,False
continue
if line.find('oneToSN')>=0:
Flag_1to1, Flag_1toSN, Flag_1ton = False, True, False
continue
if line.find('oneTon')>=0:
Flag_1to1, Flag_1toSN, Flag_1ton = False, False, True
continue
if Flag_1to1:
barcode = line.split(',')[0].strip()
value = line.split(',')[1].split(':')[1].strip()
Dict['barcode'] = barcode
Dict['similar'] = float(value)
one2one_list.append(Dict)
continue
if Flag_1toSN:
barcode = line.split(',')[0].strip()
value = line.split(',')[1].split(':')[1].strip()
Dict['barcode'] = barcode
Dict['similar'] = float(value)
one2SN_list.append(Dict)
continue
if Flag_1ton:
label = line.split(':')[0].strip()
value = line.split(':')[1].strip()
Dict['barcode'] = ''
if label.find("_") > 0:
bcd = label.split('_')[-1]
if len(bcd)>=8 and bcd.isdigit():
Dict['barcode'] = bcd
Dict['event'] = label
Dict['similar'] = float(value.split(',')[0])
if value.find("=")>0:
Dict['type'] = value.split('=')[-1]
else:
Dict['type'] = value.split(',')[-1]
one2n_list.append(Dict)
if len(one2one_list): SimiDict['one2one'] = one2one_list
if len(one2n_list): SimiDict['one2n'] = one2n_list
if len(one2SN_list): SimiDict['one2SN'] = one2SN_list
return SimiDict
def read_weight_sensor(filepath):
WeightDict = OrderedDict()
with open(filepath, 'r', encoding='utf-8') as f:
lines = f.readlines()
clean_lines = [line.strip().replace("'", '').replace('"', '') for line in lines]
for i, line in enumerate(clean_lines):
line = line.strip()
if line.find(':') < 0: continue
if line.find("Weight") >= 0:
label = "Weight"
continue
keyword = line.split(':')[0]
value = line.split(':')[1]
if label == "Weight":
vdata = [float(s) for s in value.split(',') if len(s) and s.isdigit()]
WeightDict[keyword] = vdata[-1]
weights = [(float(t), w) for t, w in WeightDict.items()]
weights = np.array(weights).astype(np.int64)
return weights
def read_weight_timeConsuming(filePth):
WeightDict, SensorDict, ProcessTimeDict = OrderedDict(), OrderedDict(), OrderedDict()
with open(filePth, 'r', encoding='utf-8') as f:
lines = f.readlines()
# label = ''
for i, line in enumerate(lines):
line = line.strip()
if line.find(':') < 0: continue
if line.find("Weight") >= 0:
label = "Weight"
continue
if line.find("Sensor") >= 0:
label = "Sensor"
continue
if line.find("processTime") >= 0:
label = "ProcessTime"
continue
keyword = line.split(':')[0]
value = line.split(':')[1]
if label == "Weight":
WeightDict[keyword] = float(value.strip(','))
if label == "Sensor":
SensorDict[keyword] = [float(s) for s in value.split(',') if len(s)]
if label == "ProcessTime":
ProcessTimeDict[keyword] = float(value.strip(','))
# print("Done!")
return WeightDict, SensorDict, ProcessTimeDict
def read_deletedBarcode_file(filePath):
with open(filePath, 'r', encoding='utf-8') as f:
lines = f.readlines()
split_flag, all_list = False, []
dict, barcode_list, similarity_list = {}, [], []
clean_lines = [line.strip().replace("'", '').replace('"', '') for line in lines]
for i, line in enumerate(clean_lines):
if line.endswith(','):
line = line[:-1]
stripped_line = line.strip()
if not stripped_line:
if len(barcode_list): dict['barcode'] = barcode_list
if len(similarity_list): dict['similarity'] = similarity_list
if len(dict): all_list.append(dict)
split_flag = False
dict, barcode_list, similarity_list = {}, [], []
continue
if line.find(':')<0: continue
label = line.split(':')[0]
value = line.split(':')[1]
if label == 'SeqDir':
dict['SeqDir'] = value
dict['filetype'] = "deletedBarcode"
if label == 'Deleted':
dict['Deleted'] = value
if label == 'List':
split_flag = True
continue
if split_flag:
barcode_list.append(label)
similarity_list.append(value)
if len(barcode_list): dict['barcode'] = barcode_list
if len(similarity_list): dict['similarity'] = similarity_list
if len(dict): all_list.append(dict)
return all_list
def read_returnGoods_file(filePath):
'''
Starting from 2024-10-30, the data previously written to deletedBarcode.txt is stored in returnGoods.txt with a new format; the reading logic changes accordingly.
'''
with open(filePath, 'r', encoding='utf-8') as f:
lines = f.readlines()
clean_lines = [line.strip().replace("'", '').replace('"', '') for line in lines]
all_list = []
split_flag, dict = False, {}
barcode_list, similarity_list = [], []
event_list, type_list = [], []
for i, line in enumerate(clean_lines):
stripped_line = line.strip()
if line.endswith(','):
line = line[:-1]
if not stripped_line:
if len(barcode_list): dict['barcode'] = barcode_list
if len(similarity_list): dict['similarity'] = similarity_list
if len(event_list): dict['event'] = event_list
if len(type_list): dict['type'] = type_list
if len(dict) and dict['SeqDir'].find('*')<0:
all_list.append(dict)
split_flag, dict = False, {}
barcode_list, similarity_list = [], []
event_list, type_list = [], []
continue
if line.find(':')<0: continue
if line.find('1:n')==0: continue
label = line.split(':')[0].strip()
value = line.split(':')[1].strip()
if label == 'SeqDir':
dict['SeqDir'] = value
dict['Deleted'] = value.split('_')[-1]
dict['filetype'] = "returnGoods"
if label == 'List':
split_flag = True
continue
if split_flag:
bcd = label.split('_')[-1]
if len(bcd)<8: continue
# event_list.append(label + '_' + bcd)
event_list.append(label)
barcode_list.append(bcd)
similarity_list.append(value.split(',')[0])
type_list.append(value.split('=')[-1])
if len(barcode_list): dict['barcode'] = barcode_list
if len(similarity_list): dict['similarity'] = similarity_list
if len(event_list): dict['event'] = event_list
if len(type_list): dict['type'] = type_list
if len(dict) and dict['SeqDir'].find('*')<0:
all_list.append(dict)
return all_list
def analyse_simi(simi_list, one2n_flag=False):
dict_ = {}
# maxSimi_dict = sorted(simi_list, key=lambda x: x['similar'], reverse=True)[0]
for i, simi_dict in enumerate(simi_list):
if one2n_flag:
event = simi_dict['event']
simi = simi_dict['similar']
# print(f"barcode:{barcode}, simi:{simi}")
dict_[event] = simi
else:
barcode = simi_dict['barcode']
simi = simi_dict['similar']
# print(f"barcode:{barcode}, simi:{simi}")
dict_[barcode] = simi
return dict_
def get_process_csv_data(csv_path, event_path):
dict_data = {}
event = event_path.name
process_file = event_path / 'process.data'
# print('**********process_file', process_file)
simidct = read_similar(process_file)
# print('simidct', simidct)
###======================== parse each similarity score ======================
one2one_list = simidct['one2one']
one2SN_list = simidct['one2SN']
one2n_list = simidct['one2n']
# print('one2one_list', one2one_list)
if len(one2one_list):
one2one_dict = analyse_simi(one2one_list)
dict_data['one2one'] = str(list(one2one_dict.values())[0])
# print('one2one_dict', one2one_dict)
if len(one2SN_list):
one2SN_dict = analyse_simi(one2SN_list)
dict_data['one2SN'] = one2SN_dict
# print('one2SN_dict', one2SN_dict)
if len(one2n_list):
one2n_dict = analyse_simi(one2n_list, one2n_flag=True)
dict_data['one2n'] = one2n_dict
# print('one2n_dict', one2n_dict)
algroStartToEnd = str(simidct['algroStartToEnd'])
dict_data['algroStartToEnd'] = algroStartToEnd
###=========================================================
#### manual annotation: already present in the initialized Excel file
# barcode = read_csv(csv_path, event)
# dict_data['barcode'] = barcode
return dict_data
def plot_sensor_curve(WeightDict, SensorDict, ProcessTimeDict):
wtime, wdata = [], []
stime, sdata = [], []
for key, value in WeightDict.items():
wtime.append(int(key))
wdata.append(value)
for key, value in SensorDict.items():
if len(value) != 9: continue
stime.append(int(key))
sdata.append(np.array(value))
static_range = []
dynamic_range = []
windth = 8
nw = len(wdata)
assert(nw) >= 8, "The num of weight data is less than 8!"
# i1, i2 = 0, 7
# while i2 < nw:
# data = wdata[i1:(i2+1)]
# max(data) - min(data)
# if i2<7:
# i1 = 0
# else:
# i1 = i2-windth
min_t = min(wtime + stime)
wtime = [t-min_t for t in wtime]
stime = [t-min_t for t in stime]
max_t = max(wtime + stime)
fig = plt.figure(figsize=(16, 12))
gs = fig.add_gridspec(2, 1, left=0.1, right=0.9, bottom=0.1, top=0.9,
wspace=0.05, hspace=0.15)
# ax1, ax2 = axs
ax1 = fig.add_subplot(gs[0,0])
ax2 = fig.add_subplot(gs[1,0])
ax1.plot(wtime, wdata, 'b--', linewidth=2 )
for i in range(9):
ydata = [s[i] for s in sdata]
ax2.plot(stime, ydata, linewidth=2 )
ax1.grid(True), ax1.set_xlim(0, max_t), ax1.set_title('Weight')
ax1.set_label("(Time: ms)")
# ax1.legend()
ax2.grid(True), ax2.set_xlim(0, max_t), ax2.set_title('IMU')
# ax2.legend()
plt.show()
def test_process(file_path):
WeightDict, SensorDict, ProcessTimeDict = read_weight_timeConsuming(file_path)
plot_sensor_curve(WeightDict, SensorDict, ProcessTimeDict)
def main():
files_path = r'\\192.168.1.28\share\测试_202406\0814\0814\20240814-102227-62264578-a720-4eb9-b95e-cb8be009aa98_null'
k = 0
for filename in os.listdir(files_path):
filename = 'process.data'
file_path = os.path.join(files_path, filename)
if os.path.isfile(file_path) and filename.find("track.data")>0:
extract_data(file_path)
if os.path.isfile(file_path) and filename.find("process.data")>=0:
test_process(file_path)
k += 1
if k == 1:
break
def main1():
fpath = r'\\192.168.1.28\share\测试视频数据以及日志\各模块测试记录\比对测试\1209永辉超市测试\20241209-155924-117e1941-70f8-4287-8de1-4866868548a6_6926475209967\process.data'
simidct = read_similar(fpath)
print(simidct)
if __name__ == "__main__":
main1()
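# Usage sketch (not from the original commit; the file names below follow the docstring at the top
# of this module and the event directory is hypothetical): parse the per-event data files with the
# readers defined above.
def demo_read_event(event_dir):
    datapath = os.path.join(event_dir, "0_track.data")
    outpath = os.path.join(event_dir, "0_tracking_output.data")
    procpath = os.path.join(event_dir, "process.data")
    if os.path.isfile(datapath):
        bboxes, ffeats, trackerboxes, tracker_feats, trackingboxes, trackingfeats = extract_data(datapath)
        print(f"yolo frames: {len(bboxes)}, tracker boxes: {len(trackerboxes)}")
    if os.path.isfile(outpath):
        tboxes, tfeats = read_tracking_output(outpath)
        print(f"tracking_output tracks: {len(tboxes)}")
    if os.path.isfile(procpath):
        print(read_similar(procpath))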

View File

@@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
"""
Created on Tue May 21 15:25:23 2024
Read the data produced by each module of the Pipeline; main code written by 马晓慧
@author: ieemoo-zl003
"""
import os
import numpy as np
# replace with your directory path
files_path = 'D:/contrast/dataset/1_to_n/709/20240709-112658_6903148351833/'
def str_to_float_arr(s):
# strip a trailing comma, if present
if s.endswith(','):
s = s[:-1]
    # split the string with split(), then convert each element to float
float_array = np.array([float(x) for x in s.split(",")])
return float_array
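# Illustrative example (added, not in the original file): a trailing comma is
# tolerated by str_to_float_arr, e.g.
#   str_to_float_arr("0.1,0.2,0.3,")  ->  array([0.1, 0.2, 0.3])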
def extract_tracker_input_boxes_feats(file_name):
boxes = []
feats = []
with open(file_name, 'r', encoding='utf-8') as file:
for line in file:
            line = line.strip()  # strip the trailing newline and any surrounding whitespace
            # skip empty lines
            if not line:
                continue
            # check whether the line starts with 'box:' or 'feat:'
            if line.find("box:") >= 0 and line.find("output_box:") < 0:
                box = line[line.find("box:") + 4:].strip()
                boxes.append(str_to_float_arr(box))  # drop the 'box:' prefix and strip whitespace
            if line.find("feat:") >= 0:
                feat = line[line.find("feat:") + 5:].strip()
                feats.append(str_to_float_arr(feat))  # drop the 'feat:' prefix and strip whitespace
return np.array(boxes), np.array(feats)
def find_string_in_array(arr, target):
"""
在字符串数组中找到目标字符串对应的行(索引)。
参数:
arr -- 字符串数组
target -- 要查找的目标字符串
返回:
目标字符串在数组中的索引。如果未找到,则返回-1。
"""
tg = [float(t) for k, t in enumerate(target.split(',')) if k<4][:4]
for i, st in enumerate(arr):
        st = [float(s) for k, s in enumerate(st.split(',')) if k < 4][:4]
if st == tg:
return i
# if st[:20] == target[:20]:
# return i
return -1
def find_samebox_in_array(arr, target):
for i, st in enumerate(arr):
if all(st[:4] == target[:4]):
return i
return -1
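# Illustrative sketch (added, not in the original file): boxes are matched on
# their first four values (x1, y1, x2, y2), so a different track id or score
# does not prevent a match.
def _demo_find_samebox_in_array():
    arr = [np.array([10.0, 20.0, 30.0, 40.0, 1.0]),
           np.array([50.0, 60.0, 70.0, 80.0, 2.0])]
    target = np.array([50.0, 60.0, 70.0, 80.0, 9.0])
    assert find_samebox_in_array(arr, target) == 1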
def extract_tracker_output_boxes_feats(read_file_name):
input_boxes, input_feats = extract_tracker_input_boxes_feats(read_file_name)
boxes = []
feats = []
with open(read_file_name, 'r', encoding='utf-8') as file:
for line in file:
            line = line.strip()  # strip the trailing newline and any surrounding whitespace
            # skip empty lines
            if not line:
                continue
            # check whether the line starts with 'output_box:'
            if line.find("output_box:") >= 0:
                box = str_to_float_arr(line[line.find("output_box:") + 11:].strip())
                boxes.append(box)  # the 'output_box:' prefix has been dropped and whitespace stripped
index = find_samebox_in_array(input_boxes, box)
if index >= 0:
# feat_f = str_to_float_arr(input_feats[index])
feat_f = input_feats[index]
norm_f = np.linalg.norm(feat_f)
feat_f = feat_f / norm_f
feats.append(feat_f)
return input_boxes, input_feats, np.array(boxes), np.array(feats)
def extract_tracking_output_boxes_feats(read_file_name):
tracker_boxes, tracker_feats, input_boxes, input_feats = extract_tracker_output_boxes_feats(read_file_name)
boxes = []
feats = []
tracking_flag = False
with open(read_file_name, 'r', encoding='utf-8') as file:
for line in file:
            line = line.strip()  # strip the trailing newline and any surrounding whitespace
            # skip empty lines
if not line:
continue
if tracking_flag:
if line.find("tracking_") >= 0:
tracking_flag = False
else:
box = str_to_float_arr(line)
boxes.append(box)
index = find_samebox_in_array(input_boxes, box)
if index >= 0:
feats.append(input_feats[index])
            # check whether the line starts with 'tracking_'
if line.find("tracking_") >= 0:
tracking_flag = True
assert(len(tracker_boxes)==len(tracker_feats)), "Error at Yolo output"
assert(len(input_boxes)==len(input_feats)), "Error at tracker output"
assert(len(boxes)==len(feats)), "Error at tracking output"
return tracker_boxes, tracker_feats, input_boxes, input_feats, np.array(boxes), np.array(feats)
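# Usage sketch (added for illustration; the file name below is hypothetical).
# The function returns three box/feature pairs: detector boxes as fed to the
# tracker, tracker-output boxes, and the boxes kept by the tracking stage;
# tracker-output features are L2-normalized copies of the matching input features.
def _demo_extract_tracking_output():
    data_file = files_path + "demo_track.data"   # hypothetical file name
    (tracker_boxes, tracker_feats,
     input_boxes, input_feats,
     out_boxes, out_feats) = extract_tracking_output_boxes_feats(data_file)
    print(tracker_boxes.shape, input_boxes.shape, out_boxes.shape)
    if len(input_feats):
        print(np.linalg.norm(input_feats, axis=1))   # should be close to 1.0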
def read_tracking_input(datapath):
with open(datapath, 'r') as file:
lines = file.readlines()
data = []
for line in lines:
data.append([s for s in line.split(',') if len(s)>=3])
# data.append([float(s) for s in line.split(',') if len(s)>=3])
# data = np.array(data, dtype = np.float32)
try:
data = np.array(data, dtype = np.float32)
except Exception as e:
data = np.array([], dtype = np.float32)
print('DataError for func: read_tracking_input()')
return data
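# Usage sketch (added): each line of the input file is expected to hold
# comma-separated numeric fields; fragments shorter than 3 characters are
# dropped before the block is cast to a float32 array. The file name below is
# hypothetical.
#   data = read_tracking_input(files_path + "tracking_input.data")
#   print(data.shape, data.dtype)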
def read_tracker_input(datapath):
with open(datapath, 'r') as file:
lines = file.readlines()
Videos = []
FrameBoxes, FrameFeats = [], []
boxes, feats = [], []
timestamp = []
t1 = None
for line in lines:
if line.find('CameraId') >= 0:
t = int(line.split(',')[1].split(':')[1])
timestamp.append(t)
if len(boxes) and len(feats):
FrameBoxes.append(np.array(boxes, dtype = np.float32))
FrameFeats.append(np.array(feats, dtype = np.float32))
boxes, feats = [], []
if t1 and t - t1 > 1e3:
Videos.append((FrameBoxes, FrameFeats))
FrameBoxes, FrameFeats = [], []
t1 = int(line.split(',')[1].split(':')[1])
if line.find('box') >= 0:
box = line.split(':', )[1].split(',')[:-1]
boxes.append(box)
if line.find('feat') >= 0:
feat = line.split(':', )[1].split(',')[:-1]
feats.append(feat)
FrameBoxes.append(np.array(boxes, dtype = np.float32))
FrameFeats.append(np.array(feats, dtype = np.float32))
Videos.append((FrameBoxes, FrameFeats))
# TimeStamp = np.array(timestamp, dtype = np.int64)
# DimesDiff = np.diff((TimeStamp))
# sorted_indices = np.argsort(TimeStamp)
# TimeStamp_sorted = TimeStamp[sorted_indices]
# DimesDiff_sorted = np.diff((TimeStamp_sorted))
return Videos
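# Usage sketch (added for illustration; the file name is hypothetical). Frames
# are grouped into separate videos whenever consecutive 'CameraId' timestamps
# differ by more than one second (1e3 ms).
def _demo_read_tracker_input():
    videos = read_tracker_input(files_path + "demo_track.data")   # hypothetical file
    for frame_boxes, frame_feats in videos:
        print(len(frame_boxes), "frames,",
              sum(b.shape[0] for b in frame_boxes), "boxes in this video")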
def main():
files_path = 'D:/contrast/dataset/1_to_n/709/20240709-112658_6903148351833/'
    # iterate over all files and directories under the path
for filename in os.listdir(files_path):
        # build the full file path
file_path = os.path.join(files_path, filename)
if os.path.isfile(file_path) and filename.find("track.data")>0:
tracker_boxes, tracker_feats, tracking_boxes, tracking_feats, output_boxes, output_feats = extract_tracking_output_boxes_feats(file_path)
print("Done")
if __name__ == "__main__":
main()

35
tracking/utils/rename.py Normal file
View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
"""
Created on Sat Jun 8 09:51:59 2024
@author: ym
"""
import os
def main():
directory = r'D:\DetectTracking\runs\detect'
directory = r'D:\DetectTracking\tracking\result\tracks'
suffix = '_'
for root, dirs, files in os.walk(directory):
for name in dirs:
old_name = os.path.join(root, name)
new_name = os.path.join(root, f"{name}{suffix}")
try:
os.rename(old_name, new_name)
except Exception as e:
print(f"Failed to rename directory '{old_name}': {e}")
for name in files:
old_name = os.path.join(root, name)
file, ext = os.path.splitext(name)
new_name = os.path.join(root, f"{file}{suffix}{ext}")
try:
os.rename(old_name, new_name)
except Exception as e:
print(f"Failed to rename file '{old_name}': {e}")
if __name__ == "__main__":
main()

361
tracking/utils/showtrack.py Normal file
View File

@ -0,0 +1,361 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 5 10:01:11 2023
@author: ym
"""
import numpy as np
import os
import cv2
import sys
from scipy.spatial.distance import cdist
import matplotlib
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
# from ultralytics.utils.plotting import Annotator, colors
from .annotator import TrackAnnotator
# from .processboxes import array2list
# boxes Format: [x1, y1, x2, y2, track_id, score, cls, frame_index]
pth = r"D:/DeepLearning/yolov5/tracking/"
colors = np.array([[255, 255, 255], [255, 255, 255], [255, 255, 255], [255, 255, 255], [255, 255, 255],
[0, 0, 255], [0, 255, 0], [255, 51, 255], [102, 178, 255], [51, 153, 255],
[255, 153, 153], [255, 102, 102], [255, 51, 51], [153, 255, 153], [102, 255, 102],
[51, 255, 51], [255, 102, 255], [153, 204, 255], [255, 0, 0], [255, 255, 255]],
dtype=np.uint8)
def array2list(bboxes):
    '''
    Convert bboxes into a list of per-track arrays.
    bboxes: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
    Return:
        lboxes: a list whose elements each gather the boxes sharing one track_id,
                with rows in the format
                [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
    '''
tids = set(bboxes[:, 4])
track_ids = bboxes[:, 4].astype(int)
lboxes = []
for t_id in tids:
# print(f"The ID is: {t_id}")
idx = np.where(track_ids == t_id)[0]
box = bboxes[idx, :]
lboxes.append(box)
return lboxes
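# Illustrative check (added, not in the original file): array2list groups rows
# by track_id, so the total number of boxes is preserved across the per-track
# sub-arrays.
def _demo_array2list(bboxes):
    lboxes = array2list(bboxes)
    assert sum(b.shape[0] for b in lboxes) == bboxes.shape[0]
    for b in lboxes:
        assert len(set(b[:, 4])) == 1   # one track_id per element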
def draw5points(bboxes, file):
"""
    Plot the trajectories of the center point and the four corner points, together with the trajectory features.
"""
image = cv2.imread(pth + r"/shopcart/cart_tempt/edgeline.png")
imgx = image.copy()
annotator = TrackAnnotator(imgx, line_width=2)
lboxes = array2list(bboxes)
for k in range(len(lboxes)):
boxes = lboxes[k]
cls = int(boxes[0, 6])
tid = int(boxes[0, 4])
# print(tid)
frnum = boxes.shape[0]
cornpoints = np.zeros((frnum, 10))
cornpoints[:,0] = (boxes[:, 0] + boxes[:, 2]) / 2
cornpoints[:,1] = (boxes[:, 1] + boxes[:, 3]) / 2
cornpoints[:,2], cornpoints[:,3] = boxes[:, 0], boxes[:, 1]
cornpoints[:,4], cornpoints[:,5] = boxes[:, 2], boxes[:, 1]
cornpoints[:,6], cornpoints[:,7] = boxes[:, 0], boxes[:, 3]
cornpoints[:,8], cornpoints[:,9] = boxes[:, 2], boxes[:, 3]
x1, y1, x2, y2 = cornpoints[:,2],cornpoints[:,3],cornpoints[:,8],cornpoints[:,9]
BoundPixel = 10
BoundThresh = 0.4
cont1 = sum(abs(x1)<BoundPixel) / frnum > BoundThresh
cont2 = sum(abs(y1)<BoundPixel) / frnum > BoundThresh
cont3 = sum(abs(x2-1024)<BoundPixel) / frnum > BoundThresh
cont4 = sum(abs(y2-1280)<BoundPixel) / frnum > BoundThresh
isImgBorder = False
if cont1 or cont2 or cont3 or cont4:
isImgBorder = True
        # =============================================================================
        # '''Case 1: while the item moves, the bottom-left corner of its detection box
        #    stays on the bottom-left corner of the image; replace it with the center point.'''
        # lfcn_dist = np.linalg.norm(cornpoints[:, 6:8] - [0, 1280], axis=1)
        # idx1 = lfcn_dist<10
        # if sum(idx1)/frnum > 0.5:
        #     cornpoints[:, 6:8] = cornpoints[:, 0:2]
        #
        # '''Case 2: while the item moves, the bottom-right corner of its detection box
        #    stays on the bottom-right corner of the image; replace it with the center point.'''
        # rtcn_dist = np.linalg.norm(cornpoints[:, 8:10] - [1024, 1280], axis=1)
        # idx2 = rtcn_dist<10
        # if sum(idx2)/frnum > 0.5:
        #     cornpoints[:, 8:10] = cornpoints[:, 0:2]
        # =============================================================================
mwh = (np.mean(boxes[:, 2]) + np.mean(boxes[:, 3]))/2
trajectory = []
trajlens = []
trajdist = []
for k in range(5):
traj = np.linalg.norm(np.diff(cornpoints[:, 2*k:2*(k+1)], axis = 0), axis=1)
trajlen = np.sum(traj)
ptdist = np.max(cdist(cornpoints[:, 2*k:2*(k+1)], cornpoints[:, 2*k:2*(k+1)]))
trajectory.append(traj)
trajlens.append(trajlen)
trajdist.append(ptdist)
if not isImgBorder:
idx = trajlens.index(min(trajlens))
trajmin = trajectory[idx]
trajlen_min = min(trajlens)
trajdist_min = min(trajdist)
else:
trajmin = trajectory[0]
trajlen_min = trajlens[0]
trajdist_min = trajdist[0]
        '''min trajectory length / max trajectory length; the smaller it is, the smaller the motion'''
        trajlen_rate = trajlen_min/(max(trajlens)+0.0001)
        '''min trajectory Euclidean distance / mean box scale'''
        trajdist_rate = trajdist_min/(mwh+0.0001)
        # idx = trajlens.index(min(trajlens))
        # trajmin = trajectory[idx]
        # '''min trajectory length / max trajectory length; the smaller it is, the smaller the motion'''
        # trajlen_rate = min(trajlens)/(max(trajlens)+0.0001)
        # '''min trajectory Euclidean distance; the smaller it is, the smaller the motion'''
        # trajdist_min = min(trajdist)
        # '''min trajectory Euclidean distance / mean box scale'''
        # mindist_rate = min(trajdist)/(mwh+0.0001)
img = image.copy()
for i in range(boxes.shape[0]):
cv2.circle(img, (int(cornpoints[i, 0]), int(cornpoints[i, 1])), 6, (255, 255, 255), 2)
cv2.circle(img, (int(cornpoints[i, 2]), int(cornpoints[i, 3])), 6, (255, 0, 255), 2)
cv2.circle(img, (int(cornpoints[i, 4]), int(cornpoints[i, 5])), 6, (0, 255, 0), 2)
cv2.circle(img, (int(cornpoints[i, 6]), int(cornpoints[i, 7])), 6, (64, 128, 255), 2)
cv2.circle(img, (int(cornpoints[i, 8]), int(cornpoints[i, 9])), 6, (255, 128, 64), 2)
# if frnum>=3:
# cntpoints = cornpoints[:, 0:2].astype(np.int64)
# rect = cv2.minAreaRect(cntpoints)
# box = cv2.boxPoints(rect)
# box = np.int0(box)
# cv2.drawContours(img, [box], 0, (255, 0, 0), 2)
# img1 = image.copy()
# for i in range(boxes.shape[0]-1):
# pt1 = cornpoints[i, :].astype(np.int64)
# pt2 = cornpoints[i+1, :].astype(np.int64)
# cv2.line(img1, pt1, pt2, color=(255, 255, 255), thickness=2)
# gray = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
# _, binary = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
# contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
color = ((0, 0, 255), (255, 128, 0))
label_6 = "PCA(singular_values_) : "
label_7 = "Rect : "
if frnum>=3:
if isImgBorder:
X = cornpoints[:, 0:2]
else:
X = cornpoints[:, 2*idx:2*(idx+1)]
pca = PCA()
pca.fit(X)
label_6 = "PCA(variance_ratio) : {:.2f}".format(pca.explained_variance_ratio_[0])
for i, (comp, var) in enumerate(zip(pca.components_, pca.explained_variance_ratio_)):
pt1 = (pca.mean_ - comp*var*200).astype(np.int64)
pt2 = (pca.mean_ + comp*var*200).astype(np.int64)
cv2.line(img, pt1, pt2, color=color[i], thickness=2)
rect = cv2.minAreaRect(X.astype(np.int64))
box = cv2.boxPoints(rect)
box = np.int0(box)
cv2.drawContours(img, [box], 0, (0, 255, 0), 2)
label_7 = "Rect W&H&Ratio: {}, {}, {:.2f}".format(int(rect[1][0]), int(rect[1][1]), min(rect[1])/(max(rect[1])+0.001))
        '''Needed for patent writing: generate a black-and-white figure'''
# imgbt = cv2.bitwise_not(img)
# for i in range(box.shape[0]):
# cv2.circle(imgbt, (int(cornpoints[i, 0]), int(cornpoints[i, 1])), 14, (0, 0, 0), 2)
# cv2.drawMarker(imgbt, (int(cornpoints[i, 2]), int(cornpoints[i, 3])), color= (0, 0, 0), markerType=3, markerSize = 30, thickness=2)
# cv2.drawMarker(imgbt, (int(cornpoints[i, 4]), int(cornpoints[i, 5])), color= (0, 0, 0), markerType=4, markerSize = 30, thickness=2)
# cv2.drawMarker(imgbt, (int(cornpoints[i, 6]), int(cornpoints[i, 7])), color= (0, 0, 0), markerType=5, markerSize = 30, thickness=2)
# cv2.drawMarker(imgbt, (int(cornpoints[i, 8]), int(cornpoints[i, 9])), color= (0, 0, 0), markerType=6, markerSize = 30, thickness=2)
# cv2.imwrite(pth + f"/zhuanli/{file}_{tid}.png", imgbt)
if len(trajmin):
trajstd = np.std(trajmin)
else:
trajstd = 0
trajlens = [int(t) for t in trajlens]
trajdist = [int(t) for t in trajdist]
label_1 = f"trajlens: {trajlens}, trajlen_min: {int(trajlen_min)}"
label_2 = f"trajdist: {trajdist}: trajdist_min: {int(trajdist_min)}"
label_3 = "trajlen_min/max(trajlens): {:.2f}/{} = {:.2f}".format(trajlen_min, max(trajlens), trajlen_rate)
label_4 = "trajdist_min/mwh : {:.2f}/{} = {:.2f}".format(trajdist_min, int(mwh), trajdist_rate)
label_5 = "std(trajmin) : {:.2f}".format(trajstd)
label = [label_1, label_2, label_3, label_4, label_5, label_6, label_7]
        w, h = cv2.getTextSize('abc', 0, fontScale=2, thickness=1)[0]   # only the text height h is used below
for i in range(len(label)):
# color = [int(x) for x in colors[i]]
cv2.putText(img,
label[i],
(20, int(50+(i+1)*1.2*h)),
0,
1,
[int(x) for x in colors[i]],
2,
lineType=cv2.LINE_AA)
cv2.imwrite(pth + f"/result/cls11_80212/{file}_{tid}.png", img)
def drawtracks(bboxes, imgshow=None):
"""
    Inputs
        bboxes:  raw detection/tracking results, converted internally into tboxes
        imgshow: only used to obtain the image Width and Height (a canvas is built if None)
    Outputs:
        imgshow
"""
    if imgshow is None:
        edgeline = cv2.imread(pth + r"/shopcart/cart_tempt/edgeline.png")
        # edgeline = cv2.bitwise_not(edgeline)
        H, W = edgeline.shape[0:2]
        imgshow = np.zeros((H, W, 3), np.uint8)
        imgshow = cv2.add(imgshow, edgeline)
    ## ==== a list; each element has the format: [x, y, w, h, track_id, score, cls, frame_index]
tboxes = array2list(bboxes)
# imgshow = cv2.bitwise_not(imgshow)
annotator = TrackAnnotator(imgshow, line_width=2)
for boxes in tboxes:
annotator.plotting_track(boxes)
imgshow = annotator.result()
return imgshow
def writefilename():
npydir = r"D:\DeepLearning\yolov5\runs\boxes"
files = [f.split('.')[0] for f in os.listdir(npydir)]
with open('data.txt', 'w') as f:
[f.write(f"{file}:\n") for file in files]
print("len(files)")
# for filename in os.listdir(npydir):
# file, ext = os.path.splitext(filename)
def main():
npydir = r"D:\DeepLearning\yolov5\runs\boxes"
k = 0
fields = []
for filename in os.listdir(npydir):
# filename = "加购_快速置入_12.npy"
print(filename)
file, ext = os.path.splitext(filename)
filepath = os.path.join(npydir, filename)
try:
bboxes = np.load(filepath)
            imgshow = drawtracks(bboxes)
draw5points(bboxes, file)
cv2.imwrite(pth + f"/result/cls11_80212/{file}_show.png", imgshow)
except Exception as e:
# print(str(e))
pass
# k += 1
# if k == 1:
# break
if __name__ == "__main__":
main()
# writefilename()

111
tracking/utils/videot.py Normal file
View File

@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 20 14:10:09 2023
@author: ym
"""
import numpy as np
import os
import cv2
# from pathlib import Path
# import math
# import sys
# from scipy.spatial.distance import cdist
VideoFormat = ['.mp4', '.avi', '.ts']
def video2imgs(videof, imgdir):
cap = cv2.VideoCapture(videof)
i = 0
while True:
ret, frame = cap.read()
if not ret:
break
imgp = os.path.join(imgdir, f"{i}.png")
i += 1
cv2.imwrite(imgp, frame)
if i == 400:
break
cap.release()
    print(os.path.basename(videof) + " has been converted to image frames")
def videosave(bboxes, videopath="100_1688009697927.mp4"):
cap = cv2.VideoCapture(videopath)
fps = int(cap.get(cv2.CAP_PROP_FPS)) # integer required, floats produce error in MP4 codec
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    ## =========================================== save images and the video under this module's directory
path = os.path.split(os.path.realpath(__file__))[0]
_, filename = os.path.split(videopath)
file, ext = os.path.splitext(filename)
    ## ======================================================== video-writer settings
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
save_video_path = os.path.join(path, "{}_show_1.mp4".format(file))
vid_writer = cv2.VideoWriter(save_video_path, fourcc, fps, (width, height))
    ## ======================================================== image save-path settings
save_img_path = os.path.join(path, "{}_show".format(file))
if not os.path.exists(save_img_path):
os.makedirs(save_img_path)
cout = 0
while cap.isOpened():
ret, frame = cap.read()
if ret:
idx = np.where(bboxes[:, 7] == cout)[0]
box = bboxes[idx, 0:4].astype(int)
for i in range(box.shape[0]):
x1, y1 = box[i, :2]
x2, y2 = box[i, 2:4]
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 125, 255), 2)
cv2.imwrite(os.path.join(save_img_path, "{}.png".format(cout)), frame)
vid_writer.write(frame)
cout += 1
else:
print("end!!!!!!!!!!!!!!!!!!!")
break
vid_writer.release()
cap.release()
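# Usage sketch (added; the paths below are hypothetical). bboxes is expected in
# the [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index] layout;
# column 7 (frame_index) is compared against videosave's internal frame counter.
#   bboxes = np.load(r"D:\runs\boxes\100_1688009697927.npy")
#   videosave(bboxes, videopath=r"D:\videos\100_1688009697927.mp4")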
def main():
videopath = r'D:\videos'
savepath = r'D:\videos'
# video2imgs(videopath, savepath)
k = 0
for filename in os.listdir(videopath):
# filename = "20240929-155533.ts"
file, ext = os.path.splitext(filename)
if ext not in VideoFormat:
continue
basename = os.path.basename(videopath)
imgbase = basename + '-&-' + file
imgdir = os.path.join(savepath, imgbase)
if not os.path.exists(imgdir):
os.mkdir(imgdir)
videof = os.path.join(videopath, filename)
video2imgs(videof, imgdir)
# k += 1
# if k == 1:
# break
if __name__ == '__main__':
main()