mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-09-26 03:21:28 +08:00
Implement stationary car classifier to improve parked car management (#20206)
* Implement stationary car classifier to base stationary state on visual changes and not just bounding box stability * Cleanup * Fix mypy * Move to new file and add config to disable if needed * Cleanup * Undo
This commit is contained in:
@@ -287,6 +287,9 @@ detect:
|
||||
max_disappeared: 25
|
||||
# Optional: Configuration for stationary object tracking
|
||||
stationary:
|
||||
# Optional: Stationary classifier that uses visual characteristics to determine if an object
|
||||
# is stationary even if the box changes enough to be considered motion (default: shown below).
|
||||
classifier: True
|
||||
# Optional: Frequency for confirming stationary objects (default: same as threshold)
|
||||
# When set to 1, object detection will run to confirm the object still exists on every frame.
|
||||
# If set to 10, object detection will run to confirm the object still exists on every 10th frame.
|
||||
|
@@ -29,6 +29,10 @@ class StationaryConfig(FrigateBaseModel):
|
||||
default_factory=StationaryMaxFramesConfig,
|
||||
title="Max frames for stationary objects.",
|
||||
)
|
||||
classifier: bool = Field(
|
||||
default=True,
|
||||
title="Enable visual classifier for determing if objects with jittery bounding boxes are stationary.",
|
||||
)
|
||||
|
||||
|
||||
class DetectConfig(FrigateBaseModel):
|
||||
|
@@ -1,7 +1,7 @@
|
||||
import logging
|
||||
import random
|
||||
import string
|
||||
from typing import Any, Sequence
|
||||
from typing import Any, Sequence, cast
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
@@ -17,6 +17,7 @@ from frigate.camera import PTZMetrics
|
||||
from frigate.config import CameraConfig
|
||||
from frigate.ptz.autotrack import PtzMotionEstimator
|
||||
from frigate.track import ObjectTracker
|
||||
from frigate.track.stationary_classifier import StationaryMotionClassifier
|
||||
from frigate.util.image import (
|
||||
SharedMemoryFrameManager,
|
||||
get_histogram,
|
||||
@@ -119,6 +120,7 @@ class NorfairTracker(ObjectTracker):
|
||||
self.ptz_motion_estimator: PtzMotionEstimator | None = None
|
||||
self.camera_name = config.name
|
||||
self.track_id_map: dict[str, str] = {}
|
||||
self.stationary_classifier = StationaryMotionClassifier()
|
||||
|
||||
# Define tracker configurations for static camera
|
||||
self.object_type_configs = {
|
||||
@@ -321,7 +323,13 @@ class NorfairTracker(ObjectTracker):
|
||||
|
||||
# tracks the current position of the object based on the last N bounding boxes
|
||||
# returns False if the object has moved outside its previous position
|
||||
def update_position(self, id: str, box: list[int], stationary: bool) -> bool:
|
||||
def update_position(
|
||||
self,
|
||||
id: str,
|
||||
box: list[int],
|
||||
stationary: bool,
|
||||
yuv_frame: np.ndarray | None,
|
||||
) -> bool:
|
||||
xmin, ymin, xmax, ymax = box
|
||||
position = self.positions[id]
|
||||
self.stationary_box_history[id].append(box)
|
||||
@@ -331,30 +339,44 @@ class NorfairTracker(ObjectTracker):
|
||||
-MAX_STATIONARY_HISTORY:
|
||||
]
|
||||
|
||||
avg_iou = intersection_over_union(
|
||||
box, average_boxes(self.stationary_box_history[id])
|
||||
)
|
||||
avg_box = average_boxes(self.stationary_box_history[id])
|
||||
avg_iou = intersection_over_union(box, avg_box)
|
||||
median_box = median_of_boxes(self.stationary_box_history[id])
|
||||
|
||||
# Establish anchor early when stationary and stable
|
||||
if stationary and yuv_frame is not None:
|
||||
history = self.stationary_box_history[id]
|
||||
if id not in self.stationary_classifier.anchor_crops and len(history) >= 5:
|
||||
stability_iou = intersection_over_union(avg_box, median_box)
|
||||
if stability_iou >= 0.7:
|
||||
self.stationary_classifier.ensure_anchor(
|
||||
id, yuv_frame, cast(tuple[int, int, int, int], median_box)
|
||||
)
|
||||
|
||||
# object has minimal or zero iou
|
||||
# assume object is active
|
||||
if avg_iou < THRESHOLD_KNOWN_ACTIVE_IOU:
|
||||
self.positions[id] = {
|
||||
"xmins": [xmin],
|
||||
"ymins": [ymin],
|
||||
"xmaxs": [xmax],
|
||||
"ymaxs": [ymax],
|
||||
"xmin": xmin,
|
||||
"ymin": ymin,
|
||||
"xmax": xmax,
|
||||
"ymax": ymax,
|
||||
}
|
||||
return False
|
||||
if stationary and yuv_frame is not None:
|
||||
if not self.stationary_classifier.evaluate(
|
||||
id, yuv_frame, cast(tuple[int, int, int, int], tuple(box))
|
||||
):
|
||||
self.positions[id] = {
|
||||
"xmins": [xmin],
|
||||
"ymins": [ymin],
|
||||
"xmaxs": [xmax],
|
||||
"ymaxs": [ymax],
|
||||
"xmin": xmin,
|
||||
"ymin": ymin,
|
||||
"xmax": xmax,
|
||||
"ymax": ymax,
|
||||
}
|
||||
return False
|
||||
|
||||
threshold = (
|
||||
THRESHOLD_STATIONARY_CHECK_IOU if stationary else THRESHOLD_ACTIVE_CHECK_IOU
|
||||
)
|
||||
|
||||
# object has iou below threshold, check median to reduce outliers
|
||||
# object has iou below threshold, check median and optionally crop similarity
|
||||
if avg_iou < threshold:
|
||||
median_iou = intersection_over_union(
|
||||
(
|
||||
@@ -363,23 +385,28 @@ class NorfairTracker(ObjectTracker):
|
||||
position["xmax"],
|
||||
position["ymax"],
|
||||
),
|
||||
median_of_boxes(self.stationary_box_history[id]),
|
||||
median_box,
|
||||
)
|
||||
|
||||
# if the median iou drops below the threshold
|
||||
# assume object is no longer stationary
|
||||
if median_iou < threshold:
|
||||
self.positions[id] = {
|
||||
"xmins": [xmin],
|
||||
"ymins": [ymin],
|
||||
"xmaxs": [xmax],
|
||||
"ymaxs": [ymax],
|
||||
"xmin": xmin,
|
||||
"ymin": ymin,
|
||||
"xmax": xmax,
|
||||
"ymax": ymax,
|
||||
}
|
||||
return False
|
||||
# Before flipping to active, check with classifier if we have YUV frame
|
||||
if stationary and yuv_frame is not None:
|
||||
if not self.stationary_classifier.evaluate(
|
||||
id, yuv_frame, cast(tuple[int, int, int, int], tuple(box))
|
||||
):
|
||||
self.positions[id] = {
|
||||
"xmins": [xmin],
|
||||
"ymins": [ymin],
|
||||
"xmaxs": [xmax],
|
||||
"ymaxs": [ymax],
|
||||
"xmin": xmin,
|
||||
"ymin": ymin,
|
||||
"xmax": xmax,
|
||||
"ymax": ymax,
|
||||
}
|
||||
return False
|
||||
|
||||
# if there are more than 5 and less than 10 entries for the position, add the bounding box
|
||||
# and recompute the position box
|
||||
@@ -416,7 +443,12 @@ class NorfairTracker(ObjectTracker):
|
||||
|
||||
return False
|
||||
|
||||
def update(self, track_id: str, obj: dict[str, Any]) -> None:
|
||||
def update(
|
||||
self,
|
||||
track_id: str,
|
||||
obj: dict[str, Any],
|
||||
yuv_frame: np.ndarray | None,
|
||||
) -> None:
|
||||
id = self.track_id_map[track_id]
|
||||
self.disappeared[id] = 0
|
||||
stationary = (
|
||||
@@ -424,7 +456,7 @@ class NorfairTracker(ObjectTracker):
|
||||
>= self.detect_config.stationary.threshold
|
||||
)
|
||||
# update the motionless count if the object has not moved to a new position
|
||||
if self.update_position(id, obj["box"], stationary):
|
||||
if self.update_position(id, obj["box"], stationary, yuv_frame):
|
||||
self.tracked_objects[id]["motionless_count"] += 1
|
||||
if self.is_expired(id):
|
||||
self.deregister(id, track_id)
|
||||
@@ -440,6 +472,7 @@ class NorfairTracker(ObjectTracker):
|
||||
self.tracked_objects[id]["position_changes"] += 1
|
||||
self.tracked_objects[id]["motionless_count"] = 0
|
||||
self.stationary_box_history[id] = []
|
||||
self.stationary_classifier.on_active(id)
|
||||
|
||||
self.tracked_objects[id].update(obj)
|
||||
|
||||
@@ -467,6 +500,15 @@ class NorfairTracker(ObjectTracker):
|
||||
) -> None:
|
||||
# Group detections by object type
|
||||
detections_by_type: dict[str, list[Detection]] = {}
|
||||
yuv_frame: np.ndarray | None = None
|
||||
|
||||
if self.ptz_metrics.autotracker_enabled.value or (
|
||||
self.detect_config.stationary.classifier
|
||||
and any(obj[0] == "car" for obj in detections)
|
||||
):
|
||||
yuv_frame = self.frame_manager.get(
|
||||
frame_name, self.camera_config.frame_shape_yuv
|
||||
)
|
||||
for obj in detections:
|
||||
label = obj[0]
|
||||
if label not in detections_by_type:
|
||||
@@ -481,9 +523,6 @@ class NorfairTracker(ObjectTracker):
|
||||
|
||||
embedding = None
|
||||
if self.ptz_metrics.autotracker_enabled.value:
|
||||
yuv_frame = self.frame_manager.get(
|
||||
frame_name, self.camera_config.frame_shape_yuv
|
||||
)
|
||||
embedding = get_histogram(
|
||||
yuv_frame, obj[2][0], obj[2][1], obj[2][2], obj[2][3]
|
||||
)
|
||||
@@ -575,7 +614,7 @@ class NorfairTracker(ObjectTracker):
|
||||
self.tracked_objects[id]["estimate"] = new_obj["estimate"]
|
||||
# else update it
|
||||
else:
|
||||
self.update(str(t.global_id), new_obj)
|
||||
self.update(str(t.global_id), new_obj, yuv_frame)
|
||||
|
||||
# clear expired tracks
|
||||
expired_ids = [k for k in self.track_id_map.keys() if k not in active_ids]
|
||||
|
202
frigate/track/stationary_classifier.py
Normal file
202
frigate/track/stationary_classifier.py
Normal file
@@ -0,0 +1,202 @@
|
||||
"""Tools for determining if an object is stationary."""
|
||||
|
||||
import logging
|
||||
from typing import Any, cast
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from scipy.ndimage import gaussian_filter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
THRESHOLD_KNOWN_ACTIVE_IOU = 0.2
|
||||
THRESHOLD_STATIONARY_CHECK_IOU = 0.6
|
||||
THRESHOLD_ACTIVE_CHECK_IOU = 0.9
|
||||
MAX_STATIONARY_HISTORY = 10
|
||||
|
||||
|
||||
class StationaryMotionClassifier:
|
||||
"""Fallback classifier to prevent false flips from stationary to active.
|
||||
|
||||
Uses appearance consistency on a fixed spatial region (historical median box)
|
||||
to detect actual movement, ignoring bounding box detection variations.
|
||||
"""
|
||||
|
||||
CROP_SIZE = 96
|
||||
NCC_KEEP_THRESHOLD = 0.90 # High correlation = keep stationary
|
||||
NCC_ACTIVE_THRESHOLD = 0.85 # Low correlation = consider active
|
||||
SHIFT_KEEP_THRESHOLD = 0.02 # Small shift = keep stationary
|
||||
SHIFT_ACTIVE_THRESHOLD = 0.04 # Large shift = consider active
|
||||
DRIFT_ACTIVE_THRESHOLD = 0.12 # Cumulative drift over 5 frames
|
||||
CHANGED_FRAMES_TO_FLIP = 2
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.anchor_crops: dict[str, np.ndarray] = {}
|
||||
self.anchor_boxes: dict[str, tuple[int, int, int, int]] = {}
|
||||
self.changed_counts: dict[str, int] = {}
|
||||
self.shift_histories: dict[str, list[float]] = {}
|
||||
|
||||
# Pre-compute Hanning window for phase correlation
|
||||
hann = np.hanning(self.CROP_SIZE).astype(np.float64)
|
||||
self._hann2d = np.outer(hann, hann)
|
||||
|
||||
def reset(self, id: str) -> None:
|
||||
logger.debug("StationaryMotionClassifier.reset: id=%s", id)
|
||||
if id in self.anchor_crops:
|
||||
del self.anchor_crops[id]
|
||||
if id in self.anchor_boxes:
|
||||
del self.anchor_boxes[id]
|
||||
self.changed_counts[id] = 0
|
||||
self.shift_histories[id] = []
|
||||
|
||||
def _extract_y_crop(
|
||||
self, yuv_frame: np.ndarray, box: tuple[int, int, int, int]
|
||||
) -> np.ndarray:
|
||||
"""Extract and normalize Y-plane crop from bounding box."""
|
||||
y_height = yuv_frame.shape[0] // 3 * 2
|
||||
width = yuv_frame.shape[1]
|
||||
x1 = max(0, min(width - 1, box[0]))
|
||||
y1 = max(0, min(y_height - 1, box[1]))
|
||||
x2 = max(0, min(width - 1, box[2]))
|
||||
y2 = max(0, min(y_height - 1, box[3]))
|
||||
|
||||
if x2 <= x1:
|
||||
x2 = min(width - 1, x1 + 1)
|
||||
if y2 <= y1:
|
||||
y2 = min(y_height - 1, y1 + 1)
|
||||
|
||||
# Extract Y-plane crop, resize, and blur
|
||||
y_plane = yuv_frame[0:y_height, 0:width]
|
||||
crop = y_plane[y1:y2, x1:x2]
|
||||
crop_resized = cv2.resize(
|
||||
crop, (self.CROP_SIZE, self.CROP_SIZE), interpolation=cv2.INTER_AREA
|
||||
)
|
||||
result = cast(np.ndarray[Any, Any], gaussian_filter(crop_resized, sigma=0.5))
|
||||
logger.debug(
|
||||
"_extract_y_crop: box=%s clamped=(%d,%d,%d,%d) crop_shape=%s",
|
||||
box,
|
||||
x1,
|
||||
y1,
|
||||
x2,
|
||||
y2,
|
||||
crop.shape if "crop" in locals() else None,
|
||||
)
|
||||
return result
|
||||
|
||||
def ensure_anchor(
|
||||
self, id: str, yuv_frame: np.ndarray, median_box: tuple[int, int, int, int]
|
||||
) -> None:
|
||||
"""Initialize anchor crop from stable median box when object becomes stationary."""
|
||||
if id not in self.anchor_crops:
|
||||
self.anchor_boxes[id] = median_box
|
||||
self.anchor_crops[id] = self._extract_y_crop(yuv_frame, median_box)
|
||||
self.changed_counts[id] = 0
|
||||
self.shift_histories[id] = []
|
||||
logger.debug(
|
||||
"ensure_anchor: initialized id=%s median_box=%s crop_shape=%s",
|
||||
id,
|
||||
median_box,
|
||||
self.anchor_crops[id].shape,
|
||||
)
|
||||
|
||||
def on_active(self, id: str) -> None:
|
||||
"""Reset state when object becomes active to allow re-anchoring."""
|
||||
logger.debug("on_active: id=%s became active; resetting state", id)
|
||||
self.reset(id)
|
||||
|
||||
def evaluate(
|
||||
self, id: str, yuv_frame: np.ndarray, current_box: tuple[int, int, int, int]
|
||||
) -> bool:
|
||||
"""Return True to keep stationary, False to flip to active.
|
||||
|
||||
Compares the same spatial region (historical median box) across frames
|
||||
to detect actual movement, ignoring bounding box variations.
|
||||
"""
|
||||
|
||||
if id not in self.anchor_crops or id not in self.anchor_boxes:
|
||||
logger.debug("evaluate: id=%s has no anchor; default keep stationary", id)
|
||||
return True
|
||||
|
||||
# Compare same spatial region across frames
|
||||
anchor_box = self.anchor_boxes[id]
|
||||
anchor_crop = self.anchor_crops[id]
|
||||
curr_crop = self._extract_y_crop(yuv_frame, anchor_box)
|
||||
|
||||
# Compute appearance and motion metrics
|
||||
ncc = cv2.matchTemplate(curr_crop, anchor_crop, cv2.TM_CCOEFF_NORMED)[0, 0]
|
||||
a64 = anchor_crop.astype(np.float64) * self._hann2d
|
||||
c64 = curr_crop.astype(np.float64) * self._hann2d
|
||||
(shift_x, shift_y), _ = cv2.phaseCorrelate(a64, c64)
|
||||
shift_norm = float(np.hypot(shift_x, shift_y)) / float(self.CROP_SIZE)
|
||||
|
||||
logger.debug(
|
||||
"evaluate: id=%s metrics ncc=%.4f shift_norm=%.4f (shift_x=%.3f, shift_y=%.3f)",
|
||||
id,
|
||||
float(ncc),
|
||||
shift_norm,
|
||||
float(shift_x),
|
||||
float(shift_y),
|
||||
)
|
||||
|
||||
# Update rolling shift history
|
||||
history = self.shift_histories.get(id, [])
|
||||
history.append(shift_norm)
|
||||
if len(history) > 5:
|
||||
history = history[-5:]
|
||||
self.shift_histories[id] = history
|
||||
drift_sum = float(sum(history))
|
||||
|
||||
logger.debug(
|
||||
"evaluate: id=%s history_len=%d last_shift=%.4f drift_sum=%.4f",
|
||||
id,
|
||||
len(history),
|
||||
history[-1] if history else -1.0,
|
||||
drift_sum,
|
||||
)
|
||||
|
||||
# Early exit for clear stationary case
|
||||
if ncc >= self.NCC_KEEP_THRESHOLD and shift_norm < self.SHIFT_KEEP_THRESHOLD:
|
||||
self.changed_counts[id] = 0
|
||||
logger.debug(
|
||||
"evaluate: id=%s early-stationary keep=True (ncc>=%.2f and shift<%.2f)",
|
||||
id,
|
||||
self.NCC_KEEP_THRESHOLD,
|
||||
self.SHIFT_KEEP_THRESHOLD,
|
||||
)
|
||||
return True
|
||||
|
||||
# Check for movement indicators
|
||||
movement_detected = (
|
||||
ncc < self.NCC_ACTIVE_THRESHOLD
|
||||
or shift_norm >= self.SHIFT_ACTIVE_THRESHOLD
|
||||
or drift_sum >= self.DRIFT_ACTIVE_THRESHOLD
|
||||
)
|
||||
|
||||
if movement_detected:
|
||||
cnt = self.changed_counts.get(id, 0) + 1
|
||||
self.changed_counts[id] = cnt
|
||||
if (
|
||||
cnt >= self.CHANGED_FRAMES_TO_FLIP
|
||||
or drift_sum >= self.DRIFT_ACTIVE_THRESHOLD
|
||||
):
|
||||
logger.debug(
|
||||
"evaluate: id=%s flip_to_active=True cnt=%d drift_sum=%.4f thresholds(changed>=%d drift>=%.2f)",
|
||||
id,
|
||||
cnt,
|
||||
drift_sum,
|
||||
self.CHANGED_FRAMES_TO_FLIP,
|
||||
self.DRIFT_ACTIVE_THRESHOLD,
|
||||
)
|
||||
return False
|
||||
logger.debug(
|
||||
"evaluate: id=%s movement_detected cnt=%d keep_until_cnt>=%d",
|
||||
id,
|
||||
cnt,
|
||||
self.CHANGED_FRAMES_TO_FLIP,
|
||||
)
|
||||
else:
|
||||
self.changed_counts[id] = 0
|
||||
logger.debug("evaluate: id=%s no_movement keep=True", id)
|
||||
|
||||
return True
|
Reference in New Issue
Block a user