Files
detecttracking/realtime/intrude_detect.py
2025-04-11 17:02:39 +08:00

421 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 8 10:07:17 2025
@author: wqg
"""
import csv
import os
import platform
import sys
import pickle
import cv2
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
from typing import List, Tuple
from scipy.spatial.distance import cdist
from scipy.spatial import ConvexHull
from shapely.geometry import Point, Polygon
##################################################### for method: run_yrt()
# Resolve this file's absolute path and take the parent of its directory as ROOT.
# ROOT is prepended to sys.path (once) before the project-level imports below;
# presumably that is where track_reid / event_time_specify live - TODO confirm.
FILE = Path(__file__).resolve()
ROOT = FILE.parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
from track_reid import yolov10_resnet_tracker
from event_time_specify import devide_motion_state
def cross(o: Tuple[float, float], a: Tuple[float, float], b: Tuple[float, float]) -> float:
    """Return the 2-D cross product of the vectors OA and OB.

    The result is zero when the three points are collinear; its sign tells on
    which side of the directed line o -> a the point b lies (positive means a
    counter-clockwise turn in a y-up coordinate system).
    """
    oa_x, oa_y = a[0] - o[0], a[1] - o[1]
    ob_x, ob_y = b[0] - o[0], b[1] - o[1]
    return oa_x * ob_y - oa_y * ob_x
def compute_convex_hull(points: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """Compute the convex hull of a 2-D point set (Andrew's monotone chain).

    The points are de-duplicated and sorted lexicographically first. With zero
    or one distinct point that sorted list is returned as is. Otherwise the
    lower and upper chains are built and joined; points that are collinear with
    their hull neighbours are dropped because only strict left turns are kept.
    """
    ordered = sorted(set(points))
    if len(ordered) <= 1:
        return ordered

    def half_hull(sequence):
        # Keep a chain in which every consecutive triple makes a strict left turn.
        chain = []
        for candidate in sequence:
            while len(chain) >= 2 and cross(chain[-2], chain[-1], candidate) <= 0:
                chain.pop()
            chain.append(candidate)
        return chain

    lower = half_hull(ordered)
    upper = half_hull(reversed(ordered))
    # The last point of each chain is the first point of the other one.
    return lower[:-1] + upper[:-1]
def is_point_in_convex_hull(point: Tuple[float, float], hull: List[Tuple[float, float]]) -> bool:
    """Return True when *point* lies inside the convex hull or on its boundary.

    Args:
        point: Any indexable (x, y) pair - tuple, list or 1-D NumPy row.
        hull: Hull vertices in the order produced by compute_convex_hull.
            Degenerate hulls are supported: an empty hull contains nothing,
            a single vertex contains only itself and two vertices are treated
            as a closed line segment.

    Returns:
        A plain Python bool. A tolerance of 1e-10 is applied to the cross
        products so that points on an edge count as inside.
    """
    eps = 1e-10
    px, py = point[0], point[1]
    n = len(hull)

    if n == 0:
        return False
    if n == 1:
        # Compare coordinate by coordinate: `point == hull[0]` would give an
        # element-wise array for NumPy rows and False for lists.
        return bool(px == hull[0][0] and py == hull[0][1])
    if n == 2:
        a, b = hull
        collinear = abs(cross(a, b, point)) < eps
        within_x = min(a[0], b[0]) <= px <= max(a[0], b[0])
        within_y = min(a[1], b[1]) <= py <= max(a[1], b[1])
        return bool(collinear and within_x and within_y)

    for i in range(n):
        a = hull[i]
        b = hull[(i + 1) % n]
        # The point must be on the left of (or on) every directed edge.
        if cross(a, b, point) < -eps:
            return False
    return True
def plot_convex_hull(points: List[Tuple[float, float]], hull: List[Tuple[float, float]], test_points: List[Tuple[float, float]] = None):
    """Show the point set, the closed hull outline and optional test points.

    The axes are fixed to x in [0, 1024] and y in [1280, 0] (y axis inverted).
    Each test point is drawn as a square: green when is_point_in_convex_hull
    reports it inside the hull, black otherwise, with its coordinates as label.
    The figure is displayed with plt.show().
    """
    xs, ys = zip(*points)
    # Repeat the first vertex at the end so the outline is drawn closed.
    outline_x, outline_y = zip(*(hull + [hull[0]]))

    fig, ax = plt.subplots()
    ax.set_xlim(0, 1024)
    ax.set_ylim(1280, 0)
    ax.plot(xs, ys, 'o', label='Points')
    ax.plot(outline_x, outline_y, 'r-', linewidth=2, label='Convex Hull')

    for pt in test_points or []:
        if is_point_in_convex_hull(pt, hull):
            marker_color = 'green'
        else:
            marker_color = 'black'
        ax.plot(pt[0], pt[1], 's', color=marker_color, markersize=8)
        ax.text(pt[0] + 0.05, pt[1], f'{pt}', fontsize=9)

    ax.legend()
    ax.grid(True)
    plt.title("Convex Hull Visualization")
    plt.show()
def convex_scipy():
    """Demo: build a hull with scipy's ConvexHull and query it with shapely.

    Prints the hull vertex indices, the vertex coordinates, and whether the
    sample point (1, 1) is contained in the polygon built from the hull.
    """
    sample = np.array([[0, 0], [2, 0], [1, 1], [2, 2], [0, 2], [1, 0.5]])
    hull = ConvexHull(sample)

    # Indices (into `sample`) of the hull vertices.
    vertex_ids = hull.vertices
    print("凸包顶点索引:{}".format(vertex_ids))

    print("凸包顶点坐标:")
    hull_points = sample[vertex_ids]
    for vertex in hull_points:
        print(vertex)

    # Turn the hull vertices into a shapely polygon and test a sample point.
    polygon = Polygon(hull_points)
    probe = Point(1, 1)
    print("是否在凸包内:", polygon.contains(probe))  # True or False
def test_convex():
    """Manual check of the hand-written hull helpers on a small sample.

    Builds the hull of six sample points, evaluates three probe points
    (expected inside, outside and on an edge) and shows everything in a plot.
    The membership results are computed but not printed or asserted.
    """
    sample_points = [(0, 0), (1, 1), (2, 2), (2, 0), (0, 2), (1, 0.5)]
    hull = compute_convex_hull(sample_points)

    # Probe order: inside, outside, on edge.
    probes = [(1, 1), (3, 3), (1, 0)]
    inside, outside, on_edge = (is_point_in_convex_hull(p, hull) for p in probes)

    # Show the figure.
    plot_convex_hull(sample_points, hull, probes)
def array2frame(tboxes):
    """Split a stacked box array into one array per frame.

    tboxes rows: [x1, y1, x2, y2, track_id, score, cls, frame_index, box_index]

    Rows whose cls (column 6) equals 0 are discarded. The remaining rows are
    grouped by frame_index (column 7) and returned as a list ordered by
    ascending frame index; frames without any remaining row do not appear.
    """
    kept = tboxes[tboxes[:, 6] != 0]
    frame_col = kept[:, 7]
    # np.unique already returns the ids in ascending order.
    frame_ids = np.unique(frame_col.astype(int))
    return [kept[frame_col == fid] for fid in frame_ids]
def convex_based(tboxes, width, TH=40):
    """Flag frames whose boxes leave the reference hulls and jump far from their neighbours.

    A window of `width` frames slides over the per-frame box lists returned by
    array2frame. For the centre frame of each window two tests are combined:

    1. Hull test: for each of the first width//2 frames of the sequence, the
       hull of that frame's box corners is built; `inside` stays true only
       while every box centre of the centre frame lies in every hull checked.
    2. Distance test: for each corner of the centre frame's boxes, the distance
       to the nearest box corner in the other frames of the window is taken;
       the largest of these must exceed TH.

    The centre frame is flagged only when both the distance test passes and
    the hull test failed.

    NOTE(review): the hulls always come from fboxes[0 .. width//2 - 1], not
    from the frames just before the centre frame - confirm this is intended.
    NOTE(review): with an even `width` the first window has fewer than
    width//2 frames before its centre, so the hull loop raises IndexError.

    Returns:
        (n_frames, 2) int64 array; column 0 is the 1-based position of the
        frame in the grouped sequence, column 1 is 1 for flagged frames.
    """
    def corners_of(rows):
        # (4*n, 2) array: all (x1, y1), then (x1, y2), (x2, y1), (x2, y2).
        return np.concatenate(
            (rows[:, [0, 1]], rows[:, [0, 3]], rows[:, [2, 1]], rows[:, [2, 3]]),
            axis=0,
        )

    fboxes = array2frame(tboxes)
    fnum = len(fboxes)
    frame_state = np.zeros((fnum, 2), dtype=np.int64)
    frame_state[:, 0] = np.arange(1, fnum + 1)
    if fnum < width:
        return frame_state

    half = width // 2
    for win_end in range(width, fnum + 1):
        win_start = win_end - width
        centre = win_end - half - 1
        current = fboxes[centre]

        # Hull test on the box centres of the centre frame.
        centres_xy = (current[:, 0:2] + current[:, 2:4]) / 2
        reference = fboxes[:centre]
        for k in range(half):
            hull = compute_convex_hull([(x, y) for x, y in corners_of(reference[k])])
            inside = all(is_point_in_convex_hull(pt, hull) for pt in centres_xy)
            if not inside:
                break

        # Distance test between the corners of the centre frame's boxes and
        # the corners of the boxes in the other frames of the window.
        neighbours = fboxes[win_start:centre] + fboxes[centre + 1:win_end]
        neighbour_corners = corners_of(np.concatenate(neighbours, axis=0))
        dist = cdist(corners_of(current), neighbour_corners).round(2)
        max_dist = np.max(np.min(dist, axis=1))

        if max_dist > TH and not inside:
            frame_state[centre, 1] = 1
    return frame_state
def single_point(tboxes, width, TH=60):
    """Flag frames whose box centres are far from all centres in neighbouring frames.

    width: window width, >= 2.

    A window of `width` frames slides over the per-frame box lists returned by
    array2frame. For every box centre of the window's centre frame, the
    distance to the nearest box centre among the other frames of the window is
    computed (rounded to 2 decimals); the frame is flagged when the largest of
    these nearest distances exceeds TH.

    Returns:
        (n_frames, 2) int64 array; column 0 is the 1-based position of the
        frame in the grouped sequence, column 1 is 1 for flagged frames.
    """
    def centres_of(rows):
        # Box centres ((x1 + x2) / 2, (y1 + y2) / 2) as an (n, 2) array.
        return (rows[:, 0:2] + rows[:, 2:4]) / 2

    fboxes = array2frame(tboxes)
    fnum = len(fboxes)
    frame_state = np.zeros((fnum, 2), dtype=np.int64)
    frame_state[:, 0] = np.arange(1, fnum + 1)
    if fnum < width:
        return frame_state

    for win_end in range(width, fnum + 1):
        win_start = win_end - width
        centre = win_end - width // 2 - 1
        # Every frame of the window except the centre one (width - 1 frames).
        neighbours = fboxes[win_start:centre] + fboxes[centre + 1:win_end]
        cur_xy = centres_of(fboxes[centre])

        # One distance block per neighbouring frame, joined column-wise.
        blocks = [np.empty((len(cur_xy), 0))]
        blocks.extend(cdist(cur_xy, centres_of(other)).round(2) for other in neighbours)
        dist = np.concatenate(blocks, axis=1)

        max_dist = np.max(np.min(dist, axis=1))
        if max_dist > TH:
            frame_state[centre, 1] = 1
    return frame_state
def intrude():
    """Evaluate intrusion detection on every pickled tracker output.

    For each file in the hard-coded pickle directory this:
      * stacks the per-frame "tboxes" arrays into one (N, 10) array, appending
        a copy of column 7 as the tenth column;
      * runs the hull-based detector (convex_based, width 5, TH 60) and the
        trajectory-based one (devide_motion_state, window 12);
      * records the event name in err_trail / err_single / err_all when the
        trajectory result, the hull result, or both contain no detection;
      * saves a two-panel figure of both results as <event>.png.

    Finally the detection rates are printed and written, together with the
    failing event names, to error.txt next to the figure directory. When the
    pickle directory holds no entries a message is printed and nothing is
    written (the rates would otherwise divide by zero).
    """
    pkpath = Path("/home/wqg/dataset/small-goods/pkfiles")
    savepath = Path("/home/wqg/dataset/small-goods/illustration_convex")
    savepath.mkdir(parents=True, exist_ok=True)

    err_trail, err_single, err_all = [], [], []
    num = 0
    for pth in pkpath.iterdir():
        # NOTE: pickle.load can execute arbitrary code; only feed this
        # directory with files written by a trusted run (e.g. run_yrt()).
        with open(str(pth), 'rb') as f:
            yrt = pickle.load(f)
        evtname = pth.stem

        # Collect the per-frame arrays and concatenate once; the empty seed
        # keeps the (0, 10) float64 result for events without frames.
        frame_arrays = [np.empty((0, 10), dtype=np.float64)]
        for frameDict in yrt:
            tboxes = frameDict["tboxes"]
            tboxes = np.concatenate((tboxes, tboxes[:, 7][:, None]), axis=1)
            frame_arrays.append(np.array(tboxes))
        trackerboxes = np.concatenate(frame_arrays, axis=0)

        # Hull-based detection. single_point(trackerboxes, wd) is the
        # centre-distance alternative defined in this file.
        width = 5
        fstate = convex_based(trackerboxes, width, TH=60)

        # Trajectory-based detection.
        # period columns: 0 fid, 1 timestamp(fid), 2 sliding-window based tid
        # extension, 3 motion interval covered by the sliding window.
        win_width = 12
        period, _ = devide_motion_state(trackerboxes, win_width)

        num += 1
        no_trail = np.all(period[:, 2:4] == 0)
        no_single = np.all(fstate[:, 1] == 0)
        if no_trail:
            err_trail.append(evtname)
        if no_single:
            err_single.append(evtname)
        if no_trail and no_single:
            err_all.append(evtname)

        fig, (ax1, ax2) = plt.subplots(2, 1)
        ax1.plot(period[:, 1], period[:, 2], 'bo-', linewidth=1, markersize=4)
        ax1.plot(period[:, 1], period[:, 3], 'rx-', linewidth=1, markersize=8)
        ax2.plot(fstate[:, 0], fstate[:, 1], 'rx-', linewidth=1, markersize=8)
        plt.savefig(os.path.join(str(savepath), f"{evtname}.png"))
        plt.close()

    if num == 0:
        # Nothing was processed: the rates below would divide by zero.
        print(f"No pickle files found in {pkpath}")
        return

    rate_trail = 1 - len(err_trail) / num
    rate_single = 1 - len(err_single) / num
    rate_all = 1 - len(err_all) / num
    summary = [
        f"rate_trail: {rate_trail}",
        f"rate_single: {rate_single}",
        f"rate_all: {rate_all}",
    ]
    for line in summary:
        print(line)

    txtpath = savepath.parents[0] / "error.txt"
    with open(str(txtpath), "w") as f:
        f.writelines(line + "\n" for line in summary)
        for title, names in (
            ("err_trail", err_trail),
            ("err_single", err_single),
            ("err_all", err_all),
        ):
            f.write("\n" + title + "\n")
            f.writelines(name + "\n" for name in names)
    print("Done!")
def run_yrt():
    """Run the YOLOv10 + ResNet tracker on a video and pickle its output.

    The result and pickle directories are created when missing. The tracker is
    called with the checkpoint under ROOT, image saving off and video saving
    on; its return value is dumped to <pkpath>/<video stem>.pickle.

    NOTE(review): although the loop walks the video directory, every entry is
    replaced by the fixed file name below and the loop stops after the first
    pass, so at most that one video is processed - looks like debugging
    scaffolding; confirm before relying on it for batch runs.
    """
    datapath = Path("/home/wqg/dataset/small-goods/videos/")
    savepath = Path("/home/wqg/dataset/small-goods/result/")
    pkpath = Path("/home/wqg/dataset/small-goods/pkfiles/")
    for folder in (savepath, pkpath):
        if not folder.exists():
            folder.mkdir(parents=True, exist_ok=True)

    optdict = {
        "weights": ROOT / 'ckpts/best_v10s_width0375_1205.pt',
        "is_save_img": False,
        "is_save_video": True,
    }

    item = "69042386_20250407-145819_back_returnGood_b82d28427666_15_17700000001.mp4"
    for processed, entry in enumerate(datapath.iterdir(), start=1):
        video = entry.parents[0] / item
        optdict["source"] = video
        optdict["save_dir"] = savepath

        yrtOut = yolov10_resnet_tracker(**optdict)

        target = pkpath / f"{Path(video).stem}.pickle"
        with open(str(target), 'wb') as f:
            pickle.dump(yrtOut, f)

        if processed == 1:
            break
# Script entry point: only the intrusion evaluation runs; the commented-out
# calls are the other entry points defined in this file.
if __name__ == '__main__':
    # run_yrt()
    intrude()
    # test_convex()