mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-09-26 19:41:29 +08:00
memryx: fix model download bug when using multiple detectors (#20030)
* Add locking for model download files * ruff format --------- Co-authored-by: Abinila Siva <abinila.siva@memryx.com>
This commit is contained in:
@@ -177,6 +177,29 @@ class MemryXDetector(DetectionApi):
|
|||||||
logger.error(f"Failed to initialize MemryX model: {e}")
|
logger.error(f"Failed to initialize MemryX model: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def _acquire_file_lock(self, lock_path: str, timeout: int = 60, poll: float = 0.2):
|
||||||
|
"""
|
||||||
|
Create an exclusive lock file. Blocks (with polling) until it can acquire,
|
||||||
|
or raises TimeoutError. Uses only stdlib (os.O_EXCL).
|
||||||
|
"""
|
||||||
|
start = time.time()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
|
||||||
|
os.close(fd)
|
||||||
|
return
|
||||||
|
except FileExistsError:
|
||||||
|
if time.time() - start > timeout:
|
||||||
|
raise TimeoutError(f"Timeout waiting for lock: {lock_path}")
|
||||||
|
time.sleep(poll)
|
||||||
|
|
||||||
|
def _release_file_lock(self, lock_path: str):
|
||||||
|
"""Best-effort removal of the lock file."""
|
||||||
|
try:
|
||||||
|
os.remove(lock_path)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
|
||||||
def load_yolo_constants(self):
|
def load_yolo_constants(self):
|
||||||
base = f"{self.cache_dir}/{self.model_folder}"
|
base = f"{self.cache_dir}/{self.model_folder}"
|
||||||
# constants for yolov9 post-processing
|
# constants for yolov9 post-processing
|
||||||
@@ -188,122 +211,135 @@ class MemryXDetector(DetectionApi):
|
|||||||
if not os.path.exists(self.cache_dir):
|
if not os.path.exists(self.cache_dir):
|
||||||
os.makedirs(self.cache_dir, exist_ok=True)
|
os.makedirs(self.cache_dir, exist_ok=True)
|
||||||
|
|
||||||
# ---------- CASE 1: user provided a custom model path ----------
|
lock_path = os.path.join(self.cache_dir, f".{self.model_folder}.lock")
|
||||||
if self.memx_model_path:
|
self._acquire_file_lock(lock_path)
|
||||||
if not self.memx_model_path.endswith(".zip"):
|
|
||||||
raise ValueError(
|
|
||||||
f"Invalid model path: {self.memx_model_path}. "
|
|
||||||
"Only .zip files are supported. Please provide a .zip model archive."
|
|
||||||
)
|
|
||||||
if not os.path.exists(self.memx_model_path):
|
|
||||||
raise FileNotFoundError(
|
|
||||||
f"Custom model zip not found: {self.memx_model_path}"
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(f"User provided zip model: {self.memx_model_path}")
|
|
||||||
|
|
||||||
# Extract custom zip into a separate area so it never clashes with MemryX cache
|
|
||||||
custom_dir = os.path.join(
|
|
||||||
self.cache_dir, "custom_models", self.model_folder
|
|
||||||
)
|
|
||||||
if os.path.isdir(custom_dir):
|
|
||||||
shutil.rmtree(custom_dir)
|
|
||||||
os.makedirs(custom_dir, exist_ok=True)
|
|
||||||
|
|
||||||
with zipfile.ZipFile(self.memx_model_path, "r") as zip_ref:
|
|
||||||
zip_ref.extractall(custom_dir)
|
|
||||||
logger.info(f"Custom model extracted to {custom_dir}.")
|
|
||||||
|
|
||||||
# Find .dfp and optional *_post.onnx recursively
|
|
||||||
dfp_candidates = glob.glob(
|
|
||||||
os.path.join(custom_dir, "**", "*.dfp"), recursive=True
|
|
||||||
)
|
|
||||||
post_candidates = glob.glob(
|
|
||||||
os.path.join(custom_dir, "**", "*_post.onnx"), recursive=True
|
|
||||||
)
|
|
||||||
|
|
||||||
if not dfp_candidates:
|
|
||||||
raise FileNotFoundError(
|
|
||||||
"No .dfp file found in custom model zip after extraction."
|
|
||||||
)
|
|
||||||
|
|
||||||
self.memx_model_path = dfp_candidates[0]
|
|
||||||
|
|
||||||
# Handle post model requirements by model type
|
|
||||||
if self.memx_model_type in [
|
|
||||||
ModelTypeEnum.yologeneric,
|
|
||||||
ModelTypeEnum.yolonas,
|
|
||||||
ModelTypeEnum.ssd,
|
|
||||||
]:
|
|
||||||
if not post_candidates:
|
|
||||||
raise FileNotFoundError(
|
|
||||||
f"No *_post.onnx file found in custom model zip for {self.memx_model_type.name}."
|
|
||||||
)
|
|
||||||
self.memx_post_model = post_candidates[0]
|
|
||||||
elif self.memx_model_type == ModelTypeEnum.yolox:
|
|
||||||
# Explicitly ignore any post model even if present
|
|
||||||
self.memx_post_model = None
|
|
||||||
else:
|
|
||||||
# Future model types can optionally use post if present
|
|
||||||
self.memx_post_model = post_candidates[0] if post_candidates else None
|
|
||||||
|
|
||||||
logger.info(f"Using custom model: {self.memx_model_path}")
|
|
||||||
return
|
|
||||||
|
|
||||||
# ---------- CASE 2: no custom model path -> use MemryX cached models ----------
|
|
||||||
model_subdir = os.path.join(self.cache_dir, self.model_folder)
|
|
||||||
dfp_path = os.path.join(model_subdir, self.expected_dfp_model)
|
|
||||||
post_path = (
|
|
||||||
os.path.join(model_subdir, self.expected_post_model)
|
|
||||||
if self.expected_post_model
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
|
|
||||||
dfp_exists = os.path.exists(dfp_path)
|
|
||||||
post_exists = os.path.exists(post_path) if post_path else True
|
|
||||||
|
|
||||||
if dfp_exists and post_exists:
|
|
||||||
logger.info("Using cached models.")
|
|
||||||
self.memx_model_path = dfp_path
|
|
||||||
self.memx_post_model = post_path
|
|
||||||
if self.memx_model_type == ModelTypeEnum.yologeneric:
|
|
||||||
self.load_yolo_constants()
|
|
||||||
return
|
|
||||||
|
|
||||||
# ---------- CASE 3: download MemryX model (no cache) ----------
|
|
||||||
logger.info(
|
|
||||||
f"Model files not found locally. Downloading from {self.model_url}..."
|
|
||||||
)
|
|
||||||
zip_path = os.path.join(self.cache_dir, f"{self.model_folder}.zip")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not os.path.exists(zip_path):
|
# ---------- CASE 1: user provided a custom model path ----------
|
||||||
urllib.request.urlretrieve(self.model_url, zip_path)
|
if self.memx_model_path:
|
||||||
logger.info(f"Model ZIP downloaded to {zip_path}. Extracting...")
|
if not self.memx_model_path.endswith(".zip"):
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid model path: {self.memx_model_path}. "
|
||||||
|
"Only .zip files are supported. Please provide a .zip model archive."
|
||||||
|
)
|
||||||
|
if not os.path.exists(self.memx_model_path):
|
||||||
|
raise FileNotFoundError(
|
||||||
|
f"Custom model zip not found: {self.memx_model_path}"
|
||||||
|
)
|
||||||
|
|
||||||
if not os.path.exists(model_subdir):
|
logger.info(f"User provided zip model: {self.memx_model_path}")
|
||||||
with zipfile.ZipFile(zip_path, "r") as zip_ref:
|
|
||||||
zip_ref.extractall(self.cache_dir)
|
|
||||||
logger.info(f"Model extracted to {self.cache_dir}.")
|
|
||||||
|
|
||||||
# Re-assign model paths after extraction
|
# Extract custom zip into a separate area so it never clashes with MemryX cache
|
||||||
self.memx_model_path = os.path.join(model_subdir, self.expected_dfp_model)
|
custom_dir = os.path.join(
|
||||||
self.memx_post_model = (
|
self.cache_dir, "custom_models", self.model_folder
|
||||||
|
)
|
||||||
|
if os.path.isdir(custom_dir):
|
||||||
|
shutil.rmtree(custom_dir)
|
||||||
|
os.makedirs(custom_dir, exist_ok=True)
|
||||||
|
|
||||||
|
with zipfile.ZipFile(self.memx_model_path, "r") as zip_ref:
|
||||||
|
zip_ref.extractall(custom_dir)
|
||||||
|
logger.info(f"Custom model extracted to {custom_dir}.")
|
||||||
|
|
||||||
|
# Find .dfp and optional *_post.onnx recursively
|
||||||
|
dfp_candidates = glob.glob(
|
||||||
|
os.path.join(custom_dir, "**", "*.dfp"), recursive=True
|
||||||
|
)
|
||||||
|
post_candidates = glob.glob(
|
||||||
|
os.path.join(custom_dir, "**", "*_post.onnx"), recursive=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if not dfp_candidates:
|
||||||
|
raise FileNotFoundError(
|
||||||
|
"No .dfp file found in custom model zip after extraction."
|
||||||
|
)
|
||||||
|
|
||||||
|
self.memx_model_path = dfp_candidates[0]
|
||||||
|
|
||||||
|
# Handle post model requirements by model type
|
||||||
|
if self.memx_model_type in [
|
||||||
|
ModelTypeEnum.yologeneric,
|
||||||
|
ModelTypeEnum.yolonas,
|
||||||
|
ModelTypeEnum.ssd,
|
||||||
|
]:
|
||||||
|
if not post_candidates:
|
||||||
|
raise FileNotFoundError(
|
||||||
|
f"No *_post.onnx file found in custom model zip for {self.memx_model_type.name}."
|
||||||
|
)
|
||||||
|
self.memx_post_model = post_candidates[0]
|
||||||
|
elif self.memx_model_type == ModelTypeEnum.yolox:
|
||||||
|
# Explicitly ignore any post model even if present
|
||||||
|
self.memx_post_model = None
|
||||||
|
else:
|
||||||
|
# Future model types can optionally use post if present
|
||||||
|
self.memx_post_model = (
|
||||||
|
post_candidates[0] if post_candidates else None
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"Using custom model: {self.memx_model_path}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# ---------- CASE 2: no custom model path -> use MemryX cached models ----------
|
||||||
|
model_subdir = os.path.join(self.cache_dir, self.model_folder)
|
||||||
|
dfp_path = os.path.join(model_subdir, self.expected_dfp_model)
|
||||||
|
post_path = (
|
||||||
os.path.join(model_subdir, self.expected_post_model)
|
os.path.join(model_subdir, self.expected_post_model)
|
||||||
if self.expected_post_model
|
if self.expected_post_model
|
||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.memx_model_type == ModelTypeEnum.yologeneric:
|
dfp_exists = os.path.exists(dfp_path)
|
||||||
self.load_yolo_constants()
|
post_exists = os.path.exists(post_path) if post_path else True
|
||||||
|
|
||||||
|
if dfp_exists and post_exists:
|
||||||
|
logger.info("Using cached models.")
|
||||||
|
self.memx_model_path = dfp_path
|
||||||
|
self.memx_post_model = post_path
|
||||||
|
if self.memx_model_type == ModelTypeEnum.yologeneric:
|
||||||
|
self.load_yolo_constants()
|
||||||
|
return
|
||||||
|
|
||||||
|
# ---------- CASE 3: download MemryX model (no cache) ----------
|
||||||
|
logger.info(
|
||||||
|
f"Model files not found locally. Downloading from {self.model_url}..."
|
||||||
|
)
|
||||||
|
zip_path = os.path.join(self.cache_dir, f"{self.model_folder}.zip")
|
||||||
|
|
||||||
|
try:
|
||||||
|
if not os.path.exists(zip_path):
|
||||||
|
urllib.request.urlretrieve(self.model_url, zip_path)
|
||||||
|
logger.info(f"Model ZIP downloaded to {zip_path}. Extracting...")
|
||||||
|
|
||||||
|
if not os.path.exists(model_subdir):
|
||||||
|
with zipfile.ZipFile(zip_path, "r") as zip_ref:
|
||||||
|
zip_ref.extractall(self.cache_dir)
|
||||||
|
logger.info(f"Model extracted to {self.cache_dir}.")
|
||||||
|
|
||||||
|
# Re-assign model paths after extraction
|
||||||
|
self.memx_model_path = os.path.join(
|
||||||
|
model_subdir, self.expected_dfp_model
|
||||||
|
)
|
||||||
|
self.memx_post_model = (
|
||||||
|
os.path.join(model_subdir, self.expected_post_model)
|
||||||
|
if self.expected_post_model
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.memx_model_type == ModelTypeEnum.yologeneric:
|
||||||
|
self.load_yolo_constants()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if os.path.exists(zip_path):
|
||||||
|
try:
|
||||||
|
os.remove(zip_path)
|
||||||
|
logger.info("Cleaned up ZIP file after extraction.")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
f"Failed to remove downloaded zip {zip_path}: {e}"
|
||||||
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if os.path.exists(zip_path):
|
self._release_file_lock(lock_path)
|
||||||
try:
|
|
||||||
os.remove(zip_path)
|
|
||||||
logger.info("Cleaned up ZIP file after extraction.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to remove downloaded zip {zip_path}: {e}")
|
|
||||||
|
|
||||||
def send_input(self, connection_id, tensor_input: np.ndarray):
|
def send_input(self, connection_id, tensor_input: np.ndarray):
|
||||||
"""Pre-process (if needed) and send frame to MemryX input queue"""
|
"""Pre-process (if needed) and send frame to MemryX input queue"""
|
||||||
|
Reference in New Issue
Block a user