Files
ieemoo-ai-detecttracking/process_addWrite.py
2025-04-27 16:45:11 +08:00

542 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Sun Sep 29 08:59:21 2024
@author: ym
"""
import os
# import sys
import cv2
import pickle
import numpy as np
from pathlib import Path
from scipy.spatial.distance import cdist
from track_reid import yolo_resnet_tracker, yolov10_resnet_tracker
from tracking.dotrack.dotracks_back import doBackTracks
from tracking.dotrack.dotracks_front import doFrontTracks
from tracking.utils.drawtracks import plot_frameID_y2, draw_all_trajectories
from utils.getsource import get_image_pairs, get_video_pairs
from tracking.utils.read_data import read_similar, get_process_csv_data
from openpyxl import Workbook, load_workbook
def save_subimgs(imgdict, boxes, spath, ctype, featdict = None):
'''
当前 box 特征和该轨迹前一个 box 特征的相似度,可用于和跟踪序列中的相似度进行比较
'''
boxes = boxes[np.argsort(boxes[:, 7])]
for i in range(len(boxes)):
simi = None
tid, fid, bid = int(boxes[i, 4]), int(boxes[i, 7]), int(boxes[i, 8])
if i>0:
_, fid0, bid0 = int(boxes[i-1, 4]), int(boxes[i-1, 7]), int(boxes[i-1, 8])
if f"{fid0}_{bid0}" in featdict.keys() and f"{fid}_{bid}" in featdict.keys():
feat0 = featdict[f"{fid0}_{bid0}"]
feat1 = featdict[f"{fid}_{bid}"]
simi = 1 - np.maximum(0.0, cdist(feat0[None, :], feat1[None, :], "cosine"))[0][0]
img = imgdict[f"{fid}_{bid}"]
imgpath = spath / f"{ctype}_tid{tid}-{fid}-{bid}.png"
if simi is not None:
imgpath = spath / f"{ctype}_tid{tid}-{fid}-{bid}_sim{simi:.2f}.png"
cv2.imwrite(imgpath, img)
def save_subimgs_1(imgdict, boxes, spath, ctype, simidict = None):
'''
当前 box 特征和该轨迹 smooth_feat 特征的相似度, yolo_resnet_tracker 函数中,
采用该方式记录特征相似度
'''
for i in range(len(boxes)):
tid, fid, bid = int(boxes[i, 4]), int(boxes[i, 7]), int(boxes[i, 8])
key = f"{fid}_{bid}"
img = imgdict[key]
imgpath = spath / f"{ctype}_tid{tid}-{fid}-{bid}.png"
if simidict is not None and key in simidict.keys():
imgpath = spath / f"{ctype}_tid{tid}-{fid}-{bid}_sim{simidict[key]:.2f}.png"
cv2.imwrite(imgpath, img)
def show_result(eventpath, event_tracks, yrtDict, savepath_pipe):
'''保存 Tracking 输出的运动轨迹子图,并记录相似度'''
savepath_pipe_subimgs = savepath_pipe / Path("subimgs")
if not savepath_pipe_subimgs.exists():
savepath_pipe_subimgs.mkdir(parents=True, exist_ok=True)
for CamerType, vts in event_tracks:
if len(vts.tracks)==0: continue
if CamerType == 'front':
# yolos = ShoppingDict["frontCamera"]["yoloResnetTracker"]
yolos = yrtDict["frontyrt"]
ctype = 1
if CamerType == 'back':
# yolos = ShoppingDict["backCamera"]["yoloResnetTracker"]
yolos = yrtDict["backyrt"]
ctype = 0
imgdict, featdict, simidict = {}, {}, {}
for y in yolos:
imgdict.update(y["imgs"])
featdict.update(y["feats"])
simidict.update(y["featsimi"])
for track in vts.Residual:
if isinstance(track, np.ndarray):
save_subimgs(imgdict, track, savepath_pipe_subimgs, ctype, featdict)
else:
save_subimgs(imgdict, track.slt_boxes, savepath_pipe_subimgs, ctype, featdict)
'''(3) 轨迹显示与保存'''
illus = [None, None]
for CamerType, vts in event_tracks:
if len(vts.tracks)==0: continue
if CamerType == 'front':
edgeline = cv2.imread("./tracking/shopcart/cart_tempt/board_ftmp_line.png")
h, w = edgeline.shape[:2]
# nh, nw = h//2, w//2
# edgeline = cv2.resize(edgeline, (nw, nh), interpolation=cv2.INTER_AREA)
img_tracking = draw_all_trajectories(vts, edgeline, savepath_pipe, CamerType, draw5p=True)
illus[0] = img_tracking
plt = plot_frameID_y2(vts)
'''==========yj callbackdata========='''
plt.savefig(os.path.join(eventpath, "front_y2.png"))
'''========================================'''
if CamerType == 'back':
edgeline = cv2.imread("./tracking/shopcart/cart_tempt/edgeline.png")
h, w = edgeline.shape[:2]
# nh, nw = h//2, w//2
# edgeline = cv2.resize(edgeline, (nw, nh), interpolation=cv2.INTER_AREA)
img_tracking = draw_all_trajectories(vts, edgeline, savepath_pipe, CamerType, draw5p=True)
illus[1] = img_tracking
illus = [im for im in illus if im is not None]
if len(illus):
img_cat = np.concatenate(illus, axis = 1)
if len(illus)==2:
H, W = img_cat.shape[:2]
cv2.line(img_cat, (int(W/2), 0), (int(W/2), int(H)), (128, 128, 255), 3)
'''==========yj callbackdata========='''
trajpath = os.path.join(eventpath, "trajectory.png")
# trajpath = os.path.join(savepath_pipe, "trajectory.png")
'''======================================'''
cv2.imwrite(trajpath, img_cat)
def pipeline(dict_data,
pickle_exist,
eventpath,
SourceType,
weights,
DataType = "raw", #raw, pkl: images or videos, pkl, pickle file
YoloVersion="V5",
savepath = None,
saveimages = True,
):
## 构造购物事件字典
evtname = Path(eventpath).stem
barcode = evtname.split('_')[-1] if len(evtname.split('_'))>=2 \
and len(evtname.split('_')[-1])>=8 \
and evtname.split('_')[-1].isdigit() else ''
'''事件结果存储文件夹: savepath_pipe, savepath_pkl'''
if not savepath:
savepath = Path(__file__).resolve().parents[0] / "events_result"
savepath_pipe = Path(savepath) / Path("yolos_tracking") / evtname
savepath_pkl = Path(savepath) / "shopping_pkl"
if not savepath_pkl.exists():
savepath_pkl.mkdir(parents=True, exist_ok=True)
pklpath = Path(savepath_pkl) / Path(str(evtname)+".pickle")
yrt_out = []
if DataType == "raw":
if not pickle_exist:
### 不重复执行已经过yolo-resnet-tracker
if pklpath.exists():
print(f"Pickle file have saved: {evtname}.pickle")
return
if SourceType == "video":
vpaths = get_video_pairs(eventpath)
elif SourceType == "image":
vpaths = get_image_pairs(eventpath)
for vpath in vpaths:
'''================= 2. 事件结果存储文件夹 ================='''
if isinstance(vpath, list):
savepath_pipe_imgs = savepath_pipe / Path("images")
else:
savepath_pipe_imgs = savepath_pipe / Path(str(Path(vpath).stem))
if not savepath_pipe_imgs.exists():
savepath_pipe_imgs.mkdir(parents=True, exist_ok=True)
optdict = {}
optdict["weights"] = weights
optdict["source"] = vpath
optdict["save_dir"] = savepath_pipe_imgs
optdict["is_save_img"] = saveimages
optdict["is_save_video"] = True
if YoloVersion == "V5":
yrtOut = yolo_resnet_tracker(**optdict)
elif YoloVersion == "V10":
yrtOut = yolov10_resnet_tracker(**optdict)
yrt_out.append((vpath, yrtOut))
elif DataType == "pkl":
pass
else:
return
'''====================== 构造 ShoppingDict 模块 ======================='''
ShoppingDict = {"eventPath": eventpath,
"eventName": evtname,
"barcode": barcode,
"eventType": '', # "input", "output", "other"
"frontCamera": {},
"backCamera": {},
"one2n": [] #
}
procpath = Path(eventpath).joinpath('process.data')
if procpath.is_file():
SimiDict = read_similar(procpath)
ShoppingDict["one2n"] = SimiDict['one2n']
yrtDict = {}
event_tracks = []
for vpath, yrtOut in yrt_out:
'''================= 1. 构造相机事件字典 ================='''
CameraEvent = {"cameraType": '', # "front", "back"
"videoPath": '',
"imagePaths": [],
"yoloResnetTracker": [],
"tracking": [],
}
if isinstance(vpath, list):
CameraEvent["imagePaths"] = vpath
bname = os.path.basename(vpath[0])
if not isinstance(vpath, list):
CameraEvent["videoPath"] = vpath
bname = os.path.basename(vpath).split('.')[0]
if bname.split('_')[0] == "0" or bname.find('back')>=0:
CameraEvent["cameraType"] = "back"
if bname.split('_')[0] == "1" or bname.find('front')>=0:
CameraEvent["cameraType"] = "front"
'''2种保存方式: (1) no save subimg, (2) save img'''
###(1) save images
yrtOut_save = []
for frdict in yrtOut:
fr_dict = {}
for k, v in frdict.items():
if k != "imgs":
fr_dict[k]=v
yrtOut_save.append(fr_dict)
CameraEvent["yoloResnetTracker"] = yrtOut_save
###(2) no save images
# CameraEvent["yoloResnetTracker"] = yrtOut
'''================= 4. tracking ================='''
'''(1) 生成用于 tracking 模块的 boxes、feats'''
bboxes = np.empty((0, 6), dtype=np.float64)
trackerboxes = np.empty((0, 9), dtype=np.float64)
trackefeats = {}
for frameDict in yrtOut:
tboxes = frameDict["tboxes"]
ffeats = frameDict["feats"]
boxes = frameDict["bboxes"]
bboxes = np.concatenate((bboxes, np.array(boxes)), axis=0)
trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)), axis=0)
for i in range(len(tboxes)):
fid, bid = int(tboxes[i, 7]), int(tboxes[i, 8])
trackefeats.update({f"{fid}_{bid}": ffeats[f"{fid}_{bid}"]})
'''(2) tracking, 后摄'''
if CameraEvent["cameraType"] == "back":
vts = doBackTracks(trackerboxes, trackefeats)
vts.classify()
event_tracks.append(("back", vts))
CameraEvent["tracking"] = vts
ShoppingDict["backCamera"] = CameraEvent
'''====yj callbackdata======='''
back_cnts = len(vts.Residual)
dict_data['后摄轨迹数'] = back_cnts
print(f"back_cnts: {back_cnts}")
'''=============================='''
yrtDict["backyrt"] = yrtOut
'''(2) tracking, 前摄'''
if CameraEvent["cameraType"] == "front":
vts = doFrontTracks(trackerboxes, trackefeats)
vts.classify()
event_tracks.append(("front", vts))
CameraEvent["tracking"] = vts
ShoppingDict["frontCamera"] = CameraEvent
'''====yj callbackdata======='''
front_cnts = len(vts.Residual)
dict_data['前摄轨迹数'] = front_cnts
print(f"front_cnts: {front_cnts}")
'''=============================='''
yrtDict["frontyrt"] = yrtOut
'''========================== 保存模块 ================================='''
# 保存 ShoppingDict
with open(str(pklpath), 'wb') as f:
pickle.dump(ShoppingDict, f)
# 绘制并保存轨迹图
show_result(eventpath, event_tracks, yrtDict, savepath_pipe)
return dict_data
class WriteExcel:
def is_excel(self, input_excel):
# 若文件存在,加载工作簿
wb = load_workbook(input_excel)
sheet_name = wb.sheetnames[0] ##默认回传分析表只有一个sheet
# 获取活动工作表
ws = wb.active
sheet = wb[sheet_name]
##确定新增列的位置
# new_col_index = sheet.max_column
return wb, ws, sheet
def init_excel(self, input_excel, output_excel, headers, max_col=13):
if os.path.exists(output_excel):
wb, ws, sheet = self.is_excel(output_excel)
return wb, ws, sheet
elif os.path.exists(input_excel):
wb, ws, sheet = self.is_excel(input_excel)
self.add_header(wb, sheet, max_col, headers, output_excel)
return wb, ws, sheet
else:
raise FileNotFoundError(f"文件 '{input_excel}' 不存在")
'''在已有excel文件中新增列标题'''
def add_header(self, wb, sheet, column, headers, file_name):
write_data = {}
sub_data = {}
for i, head in enumerate(headers):
k = column + i + 1
sub_data[k] = head
write_data[1] = sub_data
self.add_data_to_excel(wb, sheet, write_data, file_name)
def add_data_to_excel(self, wb, sheet, write_data, file_name):
'''
示例写入数据,格式为 {行号: {列号: 值}}
write_data = {
5: {11: 1, 12: 2, 13: 3}
}
'''
# 写入指定行列的数据
for row_num, col_data in write_data.items():
for col_num, value in col_data.items():
# print('row_num', row_num, 'col_num', col_num, 'value', value)
sheet.cell(row=row_num, column=col_num, value=str(value))
wb.save(file_name)
def get_simiTitle_index(self, ws, title_name='事件名'):
'''获取excel文件中标题名称与追加内容相同部分的列索引
例如:默认以"事件名"为基准,追加统一事件名下不同组成信息'''
# 获取列标题
headers = [cell.value for cell in ws[1]]
# # 找到“事件名”所在的列索引
event_name_index = headers.index(title_name) if title_name in headers else None
if event_name_index is None:
print("未找到标题为'事件名'的列")
return
else:
return event_name_index
def get_event_row(self, sheet, event_name_index, event_name):
'''获得当前事件名event_name在excel文件中所在的行索引'''
row_index = 0
for row, content in enumerate(sheet.iter_rows(min_row=2, values_only=True)):
# print('row', row, content[event_name_index])
if content[event_name_index] == event_name:
row_index = row + 2 ### 默认只有一行标题,若有两行标题则需改为+3
# print('row_index', row_index)
break
return row_index
def write_simi_add(self, wb, ws, sheet, max_col, evtname, dict_data, headers, file_name):
'''
在已有excel文件中追加内容
找出事件名所在行索引和原excel最大列索引在原excel最大列索引后指定行写入新内容内容顺序与新增headers顺序一致
'''
event_index = self.get_simiTitle_index(ws)
if event_index is not None:
print('evtname', evtname)
row_index = self.get_event_row(sheet, event_index, evtname)
if row_index > 0:
sub_dict = {}
print('headers', headers)
for i, header in enumerate(headers):
col_index = max_col + i + 1
# print('list(dict_data.keys())', list(dict_data.keys()))
if header in list(dict_data.keys()):
sub_dict[col_index] = dict_data[header]
else:
sub_dict[col_index] = ''
write_data = {row_index: sub_dict}
self.add_data_to_excel(wb, sheet, write_data, file_name)
print("=========save excel===========")
else:
raise Exception(f"未找到事件名:{evtname}")
else:
raise Exception("未找到标题为'事件名'的列")
def execute_pipeline(evtdir = r"D:\datasets\ym\后台数据\unzip",
DataType = "raw", # raw, pkl
save_path = r"D:\work\result_pipeline",
kk=1,
source_type = "video", # video, image,
yolo_ver = "V10", # V10, V5
weight_yolo_v5 = r'./ckpts/best_cls10_0906.pt' ,
weight_yolo_v10 = r'./ckpts/best_v10s_width0375_1205.pt',
saveimages = True,
max_col = 12,
track_txt = ''
):
'''
运行函数 pipeline(),遍历事件文件夹,每个文件夹是一个事件
'''
parmDict = {}
parmDict["DataType"] = DataType
parmDict["savepath"] = save_path
parmDict["SourceType"] = source_type
parmDict["YoloVersion"] = yolo_ver
if parmDict["YoloVersion"] == "V5":
parmDict["weights"] = weight_yolo_v5
elif parmDict["YoloVersion"] == "V10":
parmDict["weights"] = weight_yolo_v10
parmDict["saveimages"] = saveimages
evtdir = Path(evtdir)
errEvents = []
# k = 0
'''=========change callbackdata=============='''
csv_name = 'data.csv'
xlsx_name = '现场回传数据分析表.xlsx'
output_name = '现场回传数据分析表_all.xlsx'
# headers = ['algroStartToEnd', 'one2one', 'one2SN', 'one2n', '前摄轨迹数', '后摄轨迹数']
headers = ['algroStartToEnd', 'one2one', 'one2SN', 'one2n']
excelWriter = WriteExcel() ## 实例化excel对象
for name in evtdir.iterdir(): ##人名
for date_file in name.iterdir(): ##2025-01-13
# try:
xlsx_data = os.path.join(date_file, xlsx_name)
csv_data = os.path.join(date_file, csv_name)
excel_name = os.path.join(date_file, output_name)
wb, ws, sheet = excelWriter.init_excel(xlsx_data, excel_name, headers, max_col)
if csv_data == '':
with open('no_datacsv.txt', 'a') as f:
f.write(str(date_file) + '\n')
for item in date_file.iterdir():
# dict_data = {}
if item.is_dir():
# item = evtdir/Path("20241212-171505-f0afe929-fdfe-4efa-94d0-2fa748d65fbb_6907992518930")
parmDict["eventpath"] = item
event_name = str(item.name)
dict_data = get_process_csv_data(csv_data, item)
print('dict_data', dict_data)
# dict_data_all = pipeline(dict_data, pickle_exist, **parmDict)
if dict_data is not None: #已保存pickle文件的事件返回为None
# print('dict_data_all', dict_data_all)
excelWriter.write_simi_add(wb, ws, sheet, max_col, event_name, dict_data, headers, excel_name)
# except Exception as e:
# with open('process_error.txt', 'a') as f:
# f.write(str(date_file) + ':' + str(e) + '\n')
if __name__ == "__main__":
# datapath = '/home/yujia/yj/gpu_code/callback_data_test/'
# datapath = '/home/yujia/yj/gpu_code/callback_data_test_0417/'
# savepath = '/home/yujia/yj/gpu_code/result_0417_v10/'
datapath = '/shareData/data/temp_data/tengXunCloud_data/code_test_0427/'
# savepath = '/shareData/data/temp_data/tengXunCloud_data/code_test/pipline_result/' ##保存pipline结果路径
max_col = 12 ##excel表格列索引从0开始从这列开始写入代码解析内容
# track_txt = '轨迹数为空.txt'
track_txt = '' ##第一次跑pipline
execute_pipeline(evtdir=datapath,
DataType = "raw", # raw, pkl
kk=1,
source_type = "video", # video, image,
save_path = '',
yolo_ver = "V5", # V10, V5 ##20250401之前使用V5 ressnet使用resv10
weight_yolo_v5 = './ckpts/best_cls10_0906.pt' ,
weight_yolo_v10 = './ckpts/best_v10s_width0375_1205.pt',
saveimages = False,
max_col = max_col,
track_txt = track_txt
)