Files
2025-01-22 13:16:44 +08:00

280 lines
11 KiB
Python

"""
TRACLUS: A Trajectory Clustering Algorithm (A Partition and Group Framework)
Implemented for Python 3
This is an implementation of the TRACLUS algorithm as described in the paper:
"Trajectory Clustering: A Partition-and-Group Framework"
by Lee, Han, & Whang (2007) [http://hanj.cs.illinois.edu/pdf/sigmod07_jglee.pdf]
Implementation Author: Adriel Isaiah V. Amoguis (De La Salle University)
Implementation Date: 2023-03-19
This implementation was done as part of the algorithms required for the implementation author's
undergraduate thesis. The implementation is not guaranteed to be bug-free and may not be optimized
for certain use-cases. The implementation author is not responsible for any damages caused by the
use of this implementation. Use at your own risk. End-users are encouraged to examine the code
in the case of any issues. If you find any bugs, please report them to the implementation author
via the repository's issues page on GitHub.
"""
import argparse
import numpy as np
from sklearn.cluster import OPTICS
from sklearn.metrics.pairwise import cosine_similarity
from scipy.spatial.distance import euclidean as d_euclidean
import pickle
import os
import warnings
# UTILITY FUNCTIONS
def load_trajectories(filepath):
"""
Load the trajectories from a pickle file.
"""
if not os.path.exists(filepath):
raise FileNotFoundError("File not found at {}".format(filepath))
with open(filepath, 'rb') as f:
trajectories = pickle.load(f)
return trajectories
def get_point_projection_on_line(point, line):
"""
Get the projection of a point on a line.
"""
# Get the slope of the line using the start and end points
line_slope = (line[-1, 1] - line[0, 1]) / (line[-1, 0] - line[0, 0]) if line[-1, 0] != line[0, 0] else np.inf
# In case the slope is infinite, we can directly get the projection
if np.isinf(line_slope):
return np.array([line[0,0], point[1]])
# Convert the slope to a rotation matrix
R = slope_to_rotation_matrix(line_slope)
# Rotate the line and point
rot_line = np.matmul(line, R.T)
rot_point = np.matmul(point, R.T)
# Get the projection
proj = np.array([rot_point[0], rot_line[0,1]])
# Undo the rotation for the projection
R_inverse = np.linalg.inv(R)
proj = np.matmul(proj, R_inverse.T)
return proj
################# EQUATIONS #################
# Euclidean Distance : Accepts two points of type np.ndarray([x,y])
# DEPRECATED IN FAVOR OF THE SCIPY IMPLEMENTATION OF THE EUCLIDEAN DISTANCE
# d_euclidean = lambda p1, p2: np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)
# Perpendicular Distance
def d_perpendicular(l1, l2):
"""
Calculate the perpendicular distance between two lines.
"""
# Find the shorter line and assign that as l_shorter
l_shorter = l_longer = None
l1_len, l2_len = d_euclidean(l1[0], l1[-1]), d_euclidean(l2[0], l2[-1])
if l1_len < l2_len:
l_shorter = l1
l_longer = l2
else:
l_shorter = l2
l_longer = l1
ps = get_point_projection_on_line(l_shorter[0], l_longer)
pe = get_point_projection_on_line(l_shorter[-1], l_longer)
lehmer_1 = d_euclidean(l_shorter[0], ps)
lehmer_2 = d_euclidean(l_shorter[-1], pe)
if lehmer_1 == 0 and lehmer_2 == 0:
return 0
return (lehmer_1**2 + lehmer_2**2) / (lehmer_1 + lehmer_2)#, ps, pe, l_shorter[0], l_shorter[-1]
# Parallel Distance
def d_parallel(l1, l2):
"""
Calculate the parallel distance between two lines.
"""
# Find the shorter line and assign that as l_shorter
l_shorter = l_longer = None
l1_len, l2_len = d_euclidean(l1[0], l1[-1]), d_euclidean(l2[0], l2[-1])
if l1_len < l2_len:
l_shorter = l1
l_longer = l2
else:
l_shorter = l2
l_longer = l1
ps = get_point_projection_on_line(l_shorter[0], l_longer)
pe = get_point_projection_on_line(l_shorter[-1], l_longer)
parallel_1 = min(d_euclidean(l_longer[0], ps), d_euclidean(l_longer[-1], ps))
parallel_2 = min(d_euclidean(l_longer[0], pe), d_euclidean(l_longer[-1], pe))
return min(parallel_1, parallel_2)
# Angular Distance
def d_angular(l1, l2, directional=True):
"""
Calculate the angular distance between two lines.
"""
# Find the shorter line and assign that as l_shorter
l_shorter = l_longer = None
l1_len, l2_len = d_euclidean(l1[0], l1[-1]), d_euclidean(l2[0], l2[-1])
if l1_len < l2_len:
l_shorter = l1
l_longer = l2
else:
l_shorter = l2
l_longer = l1
# Get the minimum intersecting angle between both lines
shorter_slope = (l_shorter[-1,1] - l_shorter[0,1]) / (l_shorter[-1,0] - l_shorter[0,0]) if l_shorter[-1,0] - l_shorter[0,0] != 0 else np.inf
longer_slope = (l_longer[-1,1] - l_longer[0,1]) / (l_longer[-1,0] - l_longer[0,0]) if l_longer[-1,0] - l_longer[0,0] != 0 else np.inf
# The case of a vertical line
theta = None
if np.isinf(shorter_slope):
# Get the angle of the longer line with the x-axis and subtract it from 90 degrees
tan_theta0 = longer_slope
tan_theta1 = tan_theta0 * -1
theta0 = np.abs(np.arctan(tan_theta0))
theta1 = np.abs(np.arctan(tan_theta1))
theta = min(theta0, theta1)
elif np.isinf(longer_slope):
# Get the angle of the shorter line with the x-axis and subtract it from 90 degrees
tan_theta0 = shorter_slope
tan_theta1 = tan_theta0 * -1
theta0 = np.abs(np.arctan(tan_theta0))
theta1 = np.abs(np.arctan(tan_theta1))
theta = min(theta0, theta1)
else:
tan_theta0 = (shorter_slope - longer_slope) / (1 + shorter_slope * longer_slope)
tan_theta1 = tan_theta0 * -1
theta0 = np.abs(np.arctan(tan_theta0))
theta1 = np.abs(np.arctan(tan_theta1))
theta = min(theta0, theta1)
if directional:
return np.sin(theta) * d_euclidean(l_longer[0], l_longer[-1])
if 0 <= theta < (90 * np.pi / 180):
return np.sin(theta) * d_euclidean(l_longer[0], l_longer[-1])
elif (90 * np.pi / 180) <= theta <= np.pi:
return np.sin(theta)
else:
raise ValueError("Theta is not in the range of 0 to 180 degrees.")
# Total Trajectory Distance
def distance(l1, l2, directional=True, w_perpendicular=1, w_parallel=1, w_angular=1):
"""
Get the total trajectory distance using all three distance formulas.
"""
perpendicular_distance = d_perpendicular(l1, l2)
parallel_distance = d_parallel(l1, l2)
angular_distance = d_angular(l1, l2, directional=directional)
return (w_perpendicular * perpendicular_distance) + (w_parallel * parallel_distance) + (w_angular * angular_distance)
# Minimum Description Length
def minimum_desription_length(start_idx, curr_idx, trajectory, w_angular=1, w_perpendicular=1, par=True, directional=True):
"""
Calculate the minimum description length.
"""
LH = LDH = 0
for i in range(start_idx, curr_idx-1):
ed = d_euclidean(trajectory[i], trajectory[i+1])
# print("ed:", ed)
LH += max(0, np.log2(ed, where=ed>0))
if par:
for j in range(start_idx, i-1):
# print()
# print(np.array([trajectory[start_idx], trajectory[i]]))
# print(np.array([trajectory[j], trajectory[j+1]]))
LDH += w_perpendicular * d_perpendicular(np.array([trajectory[start_idx], trajectory[i]]), np.array([trajectory[j], trajectory[j+1]]))
LDH += w_angular * d_angular(np.array([trajectory[start_idx], trajectory[i]]), np.array([trajectory[j], trajectory[j+1]]), directional=directional)
# print("LDH:", LDH)
if par:
return LDH + LH
return LH
# Slope to rotation matrix
def slope_to_rotation_matrix(slope):
"""
Convert slope to rotation matrix.
"""
return np.array([[1, slope], [-slope, 1]])
#############################################
def partition(trajectory, directional=True, progress_bar=False, edis=30, w_perpendicular=1, w_angular=1):
"""
Partition a trajectory into segments.
"""
# Ensure that the trajectory is a numpy array of shape (n, 2)
if not isinstance(trajectory, np.ndarray):
raise TypeError("Trajectory must be a numpy array")
elif trajectory.shape[1] != 2:
raise ValueError("Trajectory must be a numpy array of shape (n, 2)")
# Initialize the characteristic points, add the first point as a characteristic point
cp_indices = []
cp_indices.append(0)
traj_len = trajectory.shape[0]
start_idx = 0
length = 1
while start_idx + length < traj_len:
if progress_bar:
print(f'\r{round(((start_idx + length) / traj_len) * 100, 2)}%', end='')
# print(f'Current Index: {start_idx + length}, Trajectory Length: {traj_len}')
curr_idx = start_idx + length
# print(start_idx, curr_idx)
# print(f"Current Index: {curr_idx}, Current point: {trajectory[curr_idx]}")
cost_par = minimum_desription_length(start_idx, curr_idx, trajectory, w_angular=w_angular, w_perpendicular=w_perpendicular, directional=directional)
cost_nopar = minimum_desription_length(start_idx, curr_idx, trajectory, par=False, directional=directional)
# cost_par += 0 if 1 - cos == 0 else w_feats / (1 - cos)
# print(f'Cost with partition: {cost_par}, Cost without partition: {cost_nopar}')
if cost_par > cost_nopar and d_euclidean(trajectory[start_idx], trajectory[curr_idx]) > edis:
# print('edp:', d_euclidean(trajectory[start_idx], trajectory[curr_idx]))
# print(f"Added characteristic point: {trajectory[curr_idx-1]} with index {curr_idx-1}")
cp_indices.append(curr_idx-1)
start_idx = curr_idx-1
length = 1
else:
length += 1
# Add last point to characteristic points
cp_indices.append(len(trajectory) - 1)
# print(cp_indices)
return np.array([trajectory[i] for i in cp_indices]), cp_indices
# Create the script version that takes in a file path for inputs
if __name__ == "__main__":
# Parse the arguments
parser = argparse.ArgumentParser(description="Trajectory Clustering Algorithm")
parser.add_argument("input_file", help="The input file path (pickle format)")
parser.add_argument("-p", "--progress_bar", help="Show the progress bar", action="store_true")
args = parser.parse_args()
# Load the trajectories
trajectories = load_trajectories(args.input_file)
# Run the partition algorithm
partitions, indices = tr.partition(points, progress_bar=args.progress_bar, w_perpendicular=100, w_angular=10)