14 Commits
master ... dev

Author SHA1 Message Date
9400ae904a llm_agent 2025-04-15 09:26:24 +08:00
ad850221c5 Fix CPU/GPU switching error in the tracking algorithm 2025-01-17 10:53:24 +08:00
31db54fa37 Add a check for the case where the standard image is None 2025-01-15 15:26:59 +08:00
38d962cd59 Add Qwen agent 2025-01-14 17:24:07 +08:00
f797aea5d0 Add Qwen agent 2025-01-14 17:20:52 +08:00
a3b65be6b1 add Qwen interface 2025-01-14 17:18:39 +08:00
6ede9580cb add Qwen interface 2025-01-14 17:15:24 +08:00
2feedd622d {commit_message} 2025-01-14 15:03:49 +08:00
544eac9cec add network crop 2025-01-14 15:03:49 +08:00
415a804c9b add network image cropping 2025-01-14 15:03:49 +08:00
a64dcb5a10 {commit_message} 2025-01-14 15:03:49 +08:00
47a384131e {commit_message} 2025-01-14 14:56:05 +08:00
c7859a7792 add network image crop pipeline 2025-01-14 14:56:05 +08:00
a16235a593 add network image crop pipeline 2025-01-14 13:38:17 +08:00
51 changed files with 53206 additions and 11 deletions

10
.gitignore vendored

@@ -1,14 +1,11 @@
# Repo-specific GitIgnore ----------------------------------------------------------------------------------------------
*.jpg
*.jpeg
*.png
*.bmp
*.tif
*.tiff
*.heic
*.JPG
*.JPEG
*.PNG
*.BMP
*.TIF
*.TIFF
@@ -26,7 +23,7 @@
*.pickle
*.npy
*.csv
*.pyc
# for tracking ---------------------------------------------------------------
@@ -39,6 +36,10 @@ tracking/data/boxes_imgs/*
tracking/data/trackfeats/*
tracking/data/tracks/*
tracking/data/handlocal/*
contrast/feat_extract/model/__pycache__/*
std_img*
.gitignore
*/__pycache__/*
ckpts/*
doc
@@ -54,7 +55,6 @@ VOC/
# Neural Network weights -----------------------------------------------------------------------------------------------
*.weights
*.pt
*.pth
*.pb
*.onnx

176
Qwen_agent.py Normal file

@@ -0,0 +1,176 @@
from transformers import Qwen2VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from accelerate import init_empty_weights, load_checkpoint_in_model
from stream_pipeline import stream_pipeline
from PIL import Image
from io import BytesIO
import torch
import ast
import requests
import random

# default: load the model on the available device(s)
model = Qwen2VLForConditionalGeneration.from_pretrained(
    "Qwen/Qwen2-VL-7B-Instruct",
    torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
    device_map="auto"
)

# default processor
processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-7B-Instruct", attn_implementation="flash_attention_2")


def qwen_prompt(img_list, messages):
    # Preparation for inference
    text = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = processor(
        text=[text],
        images=img_list,
        padding=True,
        return_tensors="pt",
    )
    inputs = inputs.to("cuda")

    # Inference: generation of the output
    with torch.no_grad():
        generated_ids = model.generate(**inputs, max_new_tokens=256)
    generated_ids_trimmed = [
        out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed, add_special_tokens=False, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )

    del inputs
    del generated_ids
    del generated_ids_trimmed
    torch.cuda.empty_cache()

    return output_text[0]


def get_best_image(track_imgs):
    if len(track_imgs) >= 5:
        track_imgs = random.sample(track_imgs, 5)

    img_frames = []
    for i in range(len(track_imgs)):
        content = {}
        content['type'] = 'image'
        content['min_pixels'] = 224 * 224
        content['max_pixels'] = 800 * 800
        img_frames.append(content)

    messages = [
        {
            "role": "system",
            "content": "You are a chatbot working in a supermarket, and you need to help the customer find the best-quality \
                product image. A good product image must satisfy the following conditions: \
                1. Its text is clear and coherent. \
                2. The product pattern is clearly recognizable. \
                3. The product offers rich extractable descriptive information. \
                Based on the conditions above, select the best image among the candidates, then output its index as a dict whose key is 'index'."
        },
        {
            "role": "system",
            "content": img_frames,
        },
    ]

    output_text = qwen_prompt(track_imgs, messages)
    # the model answers inside a ```python fence; strip it before literal_eval
    output_dict = ast.literal_eval(output_text.strip('```python\n'))
    if output_dict['index'] > len(track_imgs):
        output_dict['index'] = len(track_imgs)
    best_img = track_imgs[output_dict['index'] - 1]

    return best_img


def get_product_description(std_img, track_imgs):
    messages = [
        {
            "role": "system",
            "content": "You are a chatbot working in a supermarket. You need to extract the product information from the images \
                and output it in the following python dict format; if a piece of information is unclear, output 'Unknown': \
                { \
                'item1': {\
                    'Text': the text extracted from the product in the first image, \
                    'Color': the color of the product in the first image, \
                    'Shape': the shape of the product in the first image, \
                    'Material': the material of the product in the first image, \
                    'Category': the category of the product in the first image, \
                } \
                'item2': {\
                    'Text': the text extracted from the product in the second image, \
                    'Color': the color of the product in the second image, \
                    'Shape': the shape of the product in the second image, \
                    'Material': the material of the product in the second image, \
                    'Category': the category of the product in the second image, \
                } \
                'is_Same': first check whether 'Color' matches; if it does not, return False; if it does, check whether at least 3 of \
                the ['Text', 'Shape', 'Material', 'Category'] keys of the two dicts above are the same: if so, output True, \
                otherwise output False. \
                } \
                "
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "min_pixels": 224 * 224,
                    "max_pixels": 800 * 800,
                },
                {
                    "type": "image",
                    "min_pixels": 224 * 224,
                    "max_pixels": 800 * 800,
                },
            ],
        },
        # {
        #     "role": "user",
        #     "content": "Output the comparison information of the second image as a python dict."
        #     "content": "Output a list whose content is the dict information extracted from the two images."
        # }
    ]

    best_img = get_best_image(track_imgs)
    if std_img is not None:
        img_list = [std_img, best_img]
    else:
        img_list = [best_img, best_img]

    output_text = qwen_prompt(img_list, messages)
    contrast_pair = ast.literal_eval(output_text.strip('```python\n'))

    return contrast_pair


def item_analysis(stream_dict):
    track_imgs = stream_pipeline(stream_dict)
    if len(track_imgs) == 0:
        return {}
    std_img = None
    if stream_dict['goodsPic'] is not None:
        # response = requests.get(stream_dict['goodsPic'])
        # std_img = Image.open(BytesIO(response.content))
        std_img = Image.open(stream_dict['goodsPic']).convert("RGB")
    description_dict = get_product_description(std_img, track_imgs)
    return description_dict


def main():
    # sample input dict
    stream_dict = {
        "goodsName": "优诺优丝黄桃果粒风味发酵乳",
        "measureProperty": 0,
        "qty": 1,
        "price": 25.9,
        "weight": 560,  # unit: grams
        "barcode": "6931806801024",
        "video": "https://ieemoo-ai.obs.cn-east-3.myhuaweicloud.com/videos/20231009/04/04_20231009-082149_21f2ca35-f2c2-4386-8497-3e7a3b407f03_4901872831197.mp4",
        "goodsPic": "https://ieemoo-storage.obs.cn-east-3.myhuaweicloud.com/lhpic/6931806801024.jpg",
        "measureUnit": "",
        "goodsSpec": "405g"
    }

    result = item_analysis(stream_dict)
    print(result)


if __name__ == "__main__":
    main()
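A note on the `strip('```python\n')` calls above: `str.strip` removes a set of characters, not a prefix, and only works here because the payload begins with `{`. A more explicit fence-stripping helper (hypothetical, not part of this commit; `removeprefix` needs Python 3.9+) could look like:

import ast

def parse_fenced_dict(text):
    # Remove a surrounding Markdown code fence such as ```python ... ``` before parsing.
    text = text.strip()
    if text.startswith("```"):
        text = text.strip("`")                      # drop the backtick fences on both ends
        text = text.removeprefix("python").strip()  # drop the language tag, if present
    return ast.literal_eval(text)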

View File

@@ -61,7 +61,7 @@ class Config:
    test_val = "D:/比对/cl"
    # test_val = "./data/test_data_100"
    test_model = "checkpoints/best_resnet18_v12.pth"
    test_model = "checkpoints/zhanting_res_801.pth"
    # test_model = "checkpoints/zhanting_res_801.pth"

View File

@@ -108,7 +108,7 @@ class FeatsInterface:
        # patch = patch.to(device=self.device).half()
        if str(self.device) != "cpu":
            patch = patch.to(device=self.device).half()
            patch = patch.to(device=self.device)
        else:
            patch = patch.to(device=self.device)
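This hunk matches the commit "Fix CPU/GPU switching error in the tracking algorithm": half precision only makes sense off-CPU, since most CPU kernels lack fp16 support. A minimal sketch of the device-aware pattern (illustrative only, assuming `patch` is a tensor and `self.device` a `torch.device`; not necessarily the exact committed change):

patch = patch.to(device=self.device)
if str(self.device) != "cpu":
    patch = patch.half()  # fp16 on GPU; CPU inference stays in fp32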

23239
dataPair_test.ipynb Normal file

File diff suppressed because one or more lines are too long

BIN images/20250123160635.jpg Normal file (new, 55 KiB, binary not shown)
BIN images/34414.png Normal file (new, 46 KiB, binary not shown)
BIN images/6917935002150.png Normal file (new, 58 KiB, binary not shown)
BIN (filename not shown) (new, 416 KiB, binary not shown)
BIN images/6920584471215.png Normal file (new, 35 KiB, binary not shown)
BIN images/6925819700245.jpg Normal file (new, 190 KiB, binary not shown)
BIN images/6931941252224.png Normal file (new, 126 KiB, binary not shown)
BIN images/6934129300472.png Normal file (new, 69 KiB, binary not shown)
BIN images/6942070231936.jpg Normal file (new, 204 KiB, binary not shown)
BIN (filename not shown) (new, 181 KiB, binary not shown)
BIN images/image1.png Normal file (new, 434 KiB, binary not shown)
BIN images/output.png Normal file (new, 103 KiB, binary not shown)
BIN (filename not shown) (new, 45 KiB, binary not shown)
BIN (filename not shown) (new, 321 KiB, binary not shown)
BIN (filename not shown) (new, 47 KiB, binary not shown)

16326
minicpm.ipynb Normal file

File diff suppressed because one or more lines are too long

25
minicpm.py Normal file

@@ -0,0 +1,25 @@
# Load model directly
from transformers import AutoModel, AutoTokenizer
import torch
from PIL import Image

model = AutoModel.from_pretrained(
    "openbmb/MiniCPM-o-2_6",
    trust_remote_code=True,
    attn_implementation='flash_attention_2',
    torch_dtype=torch.bfloat16,
    # device_map="auto"
)
model = model.eval().cuda()
tokenizer = AutoTokenizer.from_pretrained('openbmb/MiniCPM-o-2_6', use_fast=True, trust_remote_code=True)

img1 = Image.open('/home/ieemoo0337/projects/datasets/constrast_pair/8850813311020/8850813311020.jpg')
img2 = Image.open('/home/ieemoo0337/projects/datasets/constrast_pair/8850511321499/8850511321499.jpg')

question = 'Describe the first image.'
msgs = [{'role': 'user', 'content': [img1, img2, question]}]

answer = model.chat(
    msgs=msgs,
    tokenizer=tokenizer
)
print(answer)

146
stream_pipeline.py Normal file

@@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
"""
Created on Tuesday Jan 14 2025
@author: liujiawei
@description: Read images from the network, optimize the trajectories, and crop sub-images
"""
import os
import sys
import cv2
import numpy as np
from pipeline import pipeline
from tracking import traclus as tr
from track_reid import parse_opt
from track_reid import yolo_resnet_tracker
from tracking.dotrack.dotracks_back import doBackTracks
from PIL import Image


def save_event_subimgs(imgs, bboxes):
    img_list = {}
    for i, box in enumerate(bboxes):
        x1, y1, x2, y2, tid, score, cls, fid, bid = box
        img_list[int(fid)] = imgs[int(fid)][int(y1):int(y2), int(x1):int(x2), :]
    return img_list


def get_optimized_bboxes(event_tracks):
    vts_back = event_tracks
    points = []
    labels = []
    for track in vts_back.Residual:
        for ele in track.boxes:
            points.append([int(ele[2]), int(ele[3])])
            labels.append(int(ele[4]))  # track_id
    points = np.array(points)
    if len(points) == 0:
        return []

    partitions, indices = tr.partition(points, progress_bar=False, w_perpendicular=100, w_angular=10)

    bboxes_opt = []
    for track in vts_back.Residual:
        for i in indices:
            if i >= len(track.boxes):
                continue
            if labels[i] == track.boxes[i][4]:
                bboxes_opt.append(track.boxes[i])
    return bboxes_opt


def get_tracking_info(
    vpath,
    SourceType = "video",  # video
    stdfeat_path = None
):
    optdict = {}
    optdict["weights"] = './tracking/ckpts/best_cls10_0906.pt'
    optdict["is_save_img"] = False
    optdict["is_save_video"] = False

    event_tracks = []
    video_frames = {}

    '''Yolo + Resnet + Tracker'''
    optdict["source"] = vpath
    optdict["video_frames"] = video_frames
    optdict["is_annotate"] = False
    yrtOut = yolo_resnet_tracker(**optdict)

    trackerboxes = np.empty((0, 9), dtype=np.float64)
    trackefeats = {}
    for frameDict in yrtOut:
        tboxes = frameDict["tboxes"]
        ffeats = frameDict["feats"]
        trackerboxes = np.concatenate((trackerboxes, np.array(tboxes)), axis=0)
        for i in range(len(tboxes)):
            fid, bid = int(tboxes[i, 7]), int(tboxes[i, 8])
            trackefeats.update({f"{fid}_{bid}": ffeats[f"{fid}_{bid}"]})

    vts = doBackTracks(trackerboxes, trackefeats)
    vts.classify()
    event_tracks.append(("back", vts))

    return event_tracks, video_frames


def stream_pipeline(stream_dict):
    parmDict = {}
    parmDict["vpath"] = stream_dict["video"]
    # parmDict["savepath"] = os.path.join('pipeline_output', info_dict["barcode"])
    parmDict["SourceType"] = "video"  # video, image
    parmDict["stdfeat_path"] = None

    event_tracks, video_frames = get_tracking_info(**parmDict)
    bboxes_opt = get_optimized_bboxes(event_tracks[0][1])
    if len(bboxes_opt) == 0:
        return []
    subimg_dict = save_event_subimgs(video_frames, bboxes_opt)

    sub_images = []
    for fid, img in subimg_dict.items():
        pil_image = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        sub_images.append(pil_image)
    return sub_images


def main():
    '''
    sample stream_dict:
    '''
    stream_dict = {
        "goodsName": "优诺优丝黄桃果粒风味发酵乳",
        "measureProperty": 0,
        "qty": 1,
        "price": 25.9,
        "weight": 560,  # unit: grams
        "barcode": "6931806801024",
        "video": "https://ieemoo-ai.obs.cn-east-3.myhuaweicloud.com/videos/20231009/04/04_20231009-082149_21f2ca35-f2c2-4386-8497-3e7a3b407f03_4901872831197.mp4",
        "goodsPic": "https://ieemoo-storage.obs.cn-east-3.myhuaweicloud.com/lhpic/6931806801024.jpg",
        "measureUnit": "",
        "goodsSpec": "405g"
    }

    subimg_list = stream_pipeline(stream_dict)

    save_path = os.path.join('subimg', stream_dict["barcode"])
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    else:
        for filename in os.listdir(save_path):
            file_path = os.path.join(save_path, filename)
            if os.path.isfile(file_path):
                os.unlink(file_path)

    for i, img in enumerate(subimg_list):
        img.save(f'{save_path}/frame_{i}.jpg')

    print(f'Finished cropping sub-images for {stream_dict["barcode"]}!')


if __name__ == "__main__":
    main()
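For context, a minimal sketch of how these crops feed the VLM stage (assumes `Qwen_agent.py` from this commit is importable and the video URL is reachable; the input dict is hypothetical):

from stream_pipeline import stream_pipeline
from Qwen_agent import get_best_image

stream_dict = {"video": "https://example.com/clip.mp4", "barcode": "0000000000000"}  # hypothetical input
track_imgs = stream_pipeline(stream_dict)  # list of PIL sub-images cropped from the track
if track_imgs:
    best = get_best_image(track_imgs)      # the VLM picks the clearest crop
    best.save("best_crop.jpg")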

12995
test.ipynb Normal file

File diff suppressed because one or more lines are too long

View File

@@ -143,6 +143,7 @@ def yolo_resnet_tracker(
        save_dir = '',
        is_save_img = True,
        is_save_video = True,
        is_annotate = True,
        tracker_yaml = "./tracking/trackers/cfg/botsort.yaml",
        imgsz=(640, 640),  # inference size (height, width)
@@ -162,6 +163,7 @@
        dnn=False,  # use OpenCV DNN for ONNX inference
        vid_stride=1,  # video frame-rate stride
        data=ROOT / 'data/coco128.yaml',  # dataset.yaml path
        video_frames = None
):
    # source = str(source)
    # Load model
@@ -215,7 +217,7 @@
                # Rescale boxes from img_size to im0 size
                det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im0.shape).round()
                # det = det.cpu().numpy()
                det = det.cpu().numpy()

                ## ================================================================ written by WQG
                '''tracks: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]
                            0   1   2   3   4         5      6    7            8
@@ -260,7 +262,8 @@
            '''====== Save results (image and video) ======'''
            # save_path = str(save_dir / Path(path).name)  # name with file extension
            im0 = annotator.result()
            if is_annotate:
                im0 = annotator.result()
            if is_save_img:
                save_path_img = str(save_dir / Path(path).stem)
                if dataset.mode == 'image':
@@ -268,6 +271,8 @@
                else:
                    imgpath = save_path_img + f"_{frameId}.png"
                    cv2.imwrite(Path(imgpath), im0)
                if video_frames is not None:
                    video_frames.update({frameId: im0})
            # if dataset.mode == 'video' and is_save_video:

BIN
tracking/ckpts/best_cls10_0906.pt Executable file

Binary file not shown.

BIN
tracking/ckpts/yolov5s.pt Normal file

Binary file not shown.

View File

@@ -153,8 +153,8 @@ class doBackTracks(doTracks):
        hand_ious = []
        hboxes = np.empty(shape=(0, 9), dtype = np.float)
        gboxes = np.empty(shape=(0, 9), dtype = np.float)
        hboxes = np.empty(shape=(0, 9), dtype = float)
        gboxes = np.empty(shape=(0, 9), dtype = float)
        # start and end are index values; the slice needs start:(end+1)
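`np.float` was deprecated in NumPy 1.20 and removed in 1.24, which is what this hunk addresses; the builtin `float` (interpreted as `np.float64` in dtype position) is the drop-in replacement:

import numpy as np

hboxes = np.empty(shape=(0, 9), dtype=float)  # same dtype as the removed np.empty(..., dtype=np.float)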

BIN 18 new binary image files, 4.0 KiB to 27 KiB each (filenames and binary content not shown)

280
tracking/traclus.py Normal file

@@ -0,0 +1,280 @@
"""
TRACLUS: A Trajectory Clustering Algorithm (A Partition and Group Framework)
Implemented for Python 3
This is an implementation of the TRACLUS algorithm as described in the paper:
"Trajectory Clustering: A Partition-and-Group Framework"
by Lee, Han, & Whang (2007) [http://hanj.cs.illinois.edu/pdf/sigmod07_jglee.pdf]
Implementation Author: Adriel Isaiah V. Amoguis (De La Salle University)
Implementation Date: 2023-03-19
This implementation was done as part of the algorithms required for the implementation author's
undergraduate thesis. The implementation is not guaranteed to be bug-free and may not be optimized
for certain use-cases. The implementation author is not responsible for any damages caused by the
use of this implementation. Use at your own risk. End-users are encouraged to examine the code
in the case of any issues. If you find any bugs, please report them to the implementation author
via the repository's issues page on GitHub.
"""
import argparse
import numpy as np
from sklearn.cluster import OPTICS
from sklearn.metrics.pairwise import cosine_similarity
from scipy.spatial.distance import euclidean as d_euclidean
import pickle
import os
import warnings
# UTILITY FUNCTIONS
def load_trajectories(filepath):
"""
Load the trajectories from a pickle file.
"""
if not os.path.exists(filepath):
raise FileNotFoundError("File not found at {}".format(filepath))
with open(filepath, 'rb') as f:
trajectories = pickle.load(f)
return trajectories
def get_point_projection_on_line(point, line):
"""
Get the projection of a point on a line.
"""
# Get the slope of the line using the start and end points
line_slope = (line[-1, 1] - line[0, 1]) / (line[-1, 0] - line[0, 0]) if line[-1, 0] != line[0, 0] else np.inf
# In case the slope is infinite, we can directly get the projection
if np.isinf(line_slope):
return np.array([line[0,0], point[1]])
# Convert the slope to a rotation matrix
R = slope_to_rotation_matrix(line_slope)
# Rotate the line and point
rot_line = np.matmul(line, R.T)
rot_point = np.matmul(point, R.T)
# Get the projection
proj = np.array([rot_point[0], rot_line[0,1]])
# Undo the rotation for the projection
R_inverse = np.linalg.inv(R)
proj = np.matmul(proj, R_inverse.T)
return proj
################# EQUATIONS #################
# Euclidean Distance : Accepts two points of type np.ndarray([x,y])
# DEPRECATED IN FAVOR OF THE SCIPY IMPLEMENTATION OF THE EUCLIDEAN DISTANCE
# d_euclidean = lambda p1, p2: np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)
# Perpendicular Distance
def d_perpendicular(l1, l2):
"""
Calculate the perpendicular distance between two lines.
"""
# Find the shorter line and assign that as l_shorter
l_shorter = l_longer = None
l1_len, l2_len = d_euclidean(l1[0], l1[-1]), d_euclidean(l2[0], l2[-1])
if l1_len < l2_len:
l_shorter = l1
l_longer = l2
else:
l_shorter = l2
l_longer = l1
ps = get_point_projection_on_line(l_shorter[0], l_longer)
pe = get_point_projection_on_line(l_shorter[-1], l_longer)
lehmer_1 = d_euclidean(l_shorter[0], ps)
lehmer_2 = d_euclidean(l_shorter[-1], pe)
if lehmer_1 == 0 and lehmer_2 == 0:
return 0
return (lehmer_1**2 + lehmer_2**2) / (lehmer_1 + lehmer_2)#, ps, pe, l_shorter[0], l_shorter[-1]
# Parallel Distance
def d_parallel(l1, l2):
"""
Calculate the parallel distance between two lines.
"""
# Find the shorter line and assign that as l_shorter
l_shorter = l_longer = None
l1_len, l2_len = d_euclidean(l1[0], l1[-1]), d_euclidean(l2[0], l2[-1])
if l1_len < l2_len:
l_shorter = l1
l_longer = l2
else:
l_shorter = l2
l_longer = l1
ps = get_point_projection_on_line(l_shorter[0], l_longer)
pe = get_point_projection_on_line(l_shorter[-1], l_longer)
parallel_1 = min(d_euclidean(l_longer[0], ps), d_euclidean(l_longer[-1], ps))
parallel_2 = min(d_euclidean(l_longer[0], pe), d_euclidean(l_longer[-1], pe))
return min(parallel_1, parallel_2)
# Angular Distance
def d_angular(l1, l2, directional=True):
"""
Calculate the angular distance between two lines.
"""
# Find the shorter line and assign that as l_shorter
l_shorter = l_longer = None
l1_len, l2_len = d_euclidean(l1[0], l1[-1]), d_euclidean(l2[0], l2[-1])
if l1_len < l2_len:
l_shorter = l1
l_longer = l2
else:
l_shorter = l2
l_longer = l1
# Get the minimum intersecting angle between both lines
shorter_slope = (l_shorter[-1,1] - l_shorter[0,1]) / (l_shorter[-1,0] - l_shorter[0,0]) if l_shorter[-1,0] - l_shorter[0,0] != 0 else np.inf
longer_slope = (l_longer[-1,1] - l_longer[0,1]) / (l_longer[-1,0] - l_longer[0,0]) if l_longer[-1,0] - l_longer[0,0] != 0 else np.inf
# The case of a vertical line
theta = None
if np.isinf(shorter_slope):
# Get the angle of the longer line with the x-axis and subtract it from 90 degrees
tan_theta0 = longer_slope
tan_theta1 = tan_theta0 * -1
theta0 = np.abs(np.arctan(tan_theta0))
theta1 = np.abs(np.arctan(tan_theta1))
theta = min(theta0, theta1)
elif np.isinf(longer_slope):
# Get the angle of the shorter line with the x-axis and subtract it from 90 degrees
tan_theta0 = shorter_slope
tan_theta1 = tan_theta0 * -1
theta0 = np.abs(np.arctan(tan_theta0))
theta1 = np.abs(np.arctan(tan_theta1))
theta = min(theta0, theta1)
else:
tan_theta0 = (shorter_slope - longer_slope) / (1 + shorter_slope * longer_slope)
tan_theta1 = tan_theta0 * -1
theta0 = np.abs(np.arctan(tan_theta0))
theta1 = np.abs(np.arctan(tan_theta1))
theta = min(theta0, theta1)
if directional:
return np.sin(theta) * d_euclidean(l_longer[0], l_longer[-1])
if 0 <= theta < (90 * np.pi / 180):
return np.sin(theta) * d_euclidean(l_longer[0], l_longer[-1])
elif (90 * np.pi / 180) <= theta <= np.pi:
return np.sin(theta)
else:
raise ValueError("Theta is not in the range of 0 to 180 degrees.")
# Total Trajectory Distance
def distance(l1, l2, directional=True, w_perpendicular=1, w_parallel=1, w_angular=1):
"""
Get the total trajectory distance using all three distance formulas.
"""
perpendicular_distance = d_perpendicular(l1, l2)
parallel_distance = d_parallel(l1, l2)
angular_distance = d_angular(l1, l2, directional=directional)
return (w_perpendicular * perpendicular_distance) + (w_parallel * parallel_distance) + (w_angular * angular_distance)
# Minimum Description Length
def minimum_desription_length(start_idx, curr_idx, trajectory, w_angular=1, w_perpendicular=1, par=True, directional=True):
"""
Calculate the minimum description length.
"""
LH = LDH = 0
for i in range(start_idx, curr_idx-1):
ed = d_euclidean(trajectory[i], trajectory[i+1])
# print("ed:", ed)
LH += max(0, np.log2(ed, where=ed>0))
if par:
for j in range(start_idx, i-1):
# print()
# print(np.array([trajectory[start_idx], trajectory[i]]))
# print(np.array([trajectory[j], trajectory[j+1]]))
LDH += w_perpendicular * d_perpendicular(np.array([trajectory[start_idx], trajectory[i]]), np.array([trajectory[j], trajectory[j+1]]))
LDH += w_angular * d_angular(np.array([trajectory[start_idx], trajectory[i]]), np.array([trajectory[j], trajectory[j+1]]), directional=directional)
# print("LDH:", LDH)
if par:
return LDH + LH
return LH
# Slope to rotation matrix
def slope_to_rotation_matrix(slope):
"""
Convert slope to rotation matrix.
"""
return np.array([[1, slope], [-slope, 1]])
#############################################
def partition(trajectory, directional=True, progress_bar=False, edis=30, w_perpendicular=1, w_angular=1):
"""
Partition a trajectory into segments.
"""
# Ensure that the trajectory is a numpy array of shape (n, 2)
if not isinstance(trajectory, np.ndarray):
raise TypeError("Trajectory must be a numpy array")
elif trajectory.shape[1] != 2:
raise ValueError("Trajectory must be a numpy array of shape (n, 2)")
# Initialize the characteristic points, add the first point as a characteristic point
cp_indices = []
cp_indices.append(0)
traj_len = trajectory.shape[0]
start_idx = 0
length = 1
while start_idx + length < traj_len:
if progress_bar:
print(f'\r{round(((start_idx + length) / traj_len) * 100, 2)}%', end='')
# print(f'Current Index: {start_idx + length}, Trajectory Length: {traj_len}')
curr_idx = start_idx + length
# print(start_idx, curr_idx)
# print(f"Current Index: {curr_idx}, Current point: {trajectory[curr_idx]}")
cost_par = minimum_desription_length(start_idx, curr_idx, trajectory, w_angular=w_angular, w_perpendicular=w_perpendicular, directional=directional)
cost_nopar = minimum_desription_length(start_idx, curr_idx, trajectory, par=False, directional=directional)
# cost_par += 0 if 1 - cos == 0 else w_feats / (1 - cos)
# print(f'Cost with partition: {cost_par}, Cost without partition: {cost_nopar}')
if cost_par > cost_nopar and d_euclidean(trajectory[start_idx], trajectory[curr_idx]) > edis:
# print('edp:', d_euclidean(trajectory[start_idx], trajectory[curr_idx]))
# print(f"Added characteristic point: {trajectory[curr_idx-1]} with index {curr_idx-1}")
cp_indices.append(curr_idx-1)
start_idx = curr_idx-1
length = 1
else:
length += 1
# Add last point to characteristic points
cp_indices.append(len(trajectory) - 1)
# print(cp_indices)
return np.array([trajectory[i] for i in cp_indices]), cp_indices
# Create the script version that takes in a file path for inputs
if __name__ == "__main__":
# Parse the arguments
parser = argparse.ArgumentParser(description="Trajectory Clustering Algorithm")
parser.add_argument("input_file", help="The input file path (pickle format)")
parser.add_argument("-p", "--progress_bar", help="Show the progress bar", action="store_true")
args = parser.parse_args()
# Load the trajectories
trajectories = load_trajectories(args.input_file)
# Run the partition algorithm
partitions, indices = tr.partition(points, progress_bar=args.progress_bar, w_perpendicular=100, w_angular=10)
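A minimal usage sketch of `partition` on a synthetic trajectory (values are illustrative; `edis` defaults to 30, so closely spaced points may yield only the two endpoints):

import numpy as np
from tracking import traclus as tr

traj = np.array([[0, 0], [40, 2], [80, 0], [80, 40], [80, 80], [120, 82]], dtype=float)
cps, idx = tr.partition(traj, progress_bar=False, w_perpendicular=100, w_angular=10)
print(idx)  # indices of the characteristic points within traj
print(cps)  # the simplified trajectory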

View File

@@ -243,6 +243,9 @@ class LoadImages:
            path = Path(path).read_text().rsplit()
        files = []
        for p in sorted(path) if isinstance(path, (list, tuple)) else [path]:
            if p.startswith('http'):
                files.append(p)
                continue
            p = str(Path(p).resolve())
            if '*' in p:
                files.extend(sorted(glob.glob(p, recursive=True)))  # glob
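The URL passthrough presumably works because the downstream OpenCV capture can open http(s) sources directly when built with the FFmpeg backend; a quick hedged check (hypothetical URL):

import cv2

cap = cv2.VideoCapture("https://example.com/clip.mp4")  # hypothetical URL
ok, frame = cap.read()
print(ok, None if frame is None else frame.shape)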