From b3c4ed925011dba5b8b7cb0a3048ef6a47f4e9de Mon Sep 17 00:00:00 2001 From: Kenneth Estanislao Date: Sun, 16 Nov 2025 20:09:12 +0800 Subject: [PATCH] optimization with mac Hoping this would solve the mac issues, if you're a mac user, please report if there is an improvement --- modules/processors/frame/face_swapper.py | 136 +++++++++++++++-------- 1 file changed, 92 insertions(+), 44 deletions(-) diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index 61b4ca3..a4ce20d 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -1,8 +1,9 @@ -from typing import Any, List +from typing import Any, List, Optional import cv2 import insightface import threading import numpy as np +import platform import modules.globals import modules.processors.frame.core from modules.core import update_status @@ -14,9 +15,9 @@ from modules.utilities import ( is_video, ) from modules.cluster_analysis import find_closest_centroid -# Removed modules.globals.face_swapper_enabled - assuming controlled elsewhere or implicitly true if used -# Removed modules.globals.opacity - accessed via getattr import os +from collections import deque +import time FACE_SWAPPER = None THREAD_LOCK = threading.Lock() @@ -26,6 +27,16 @@ NAME = "DLC.FACE-SWAPPER" PREVIOUS_FRAME_RESULT = None # Stores the final processed frame from the previous step # --- END: Added for Interpolation --- +# --- START: Mac M1-M5 Optimizations --- +IS_APPLE_SILICON = platform.system() == 'Darwin' and platform.machine() == 'arm64' +FRAME_CACHE = deque(maxlen=3) # Cache for frame reuse +FACE_DETECTION_CACHE = {} # Cache face detections +LAST_DETECTION_TIME = 0 +DETECTION_INTERVAL = 0.033 # ~30 FPS detection rate for live mode +FRAME_SKIP_COUNTER = 0 +ADAPTIVE_QUALITY = True +# --- END: Mac M1-M5 Optimizations --- + abs_dir = os.path.dirname(os.path.abspath(__file__)) models_dir = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models" @@ -69,34 +80,34 @@ def get_face_swapper() -> Any: model_path = os.path.join(models_dir, model_name) update_status(f"Loading face swapper model from: {model_path}", NAME) try: - # Ensure the providers list is correctly passed - # Apply CoreML optimization for Mac systems + # Optimized provider configuration for Apple Silicon + providers_config = [] + for p in modules.globals.execution_providers: + if p == "CoreMLExecutionProvider" and IS_APPLE_SILICON: + # Enhanced CoreML configuration for M1-M5 + providers_config.append(( + "CoreMLExecutionProvider", + { + "ModelFormat": "MLProgram", + "MLComputeUnits": "ALL", # Use Neural Engine + GPU + CPU + "SpecializationStrategy": "FastPrediction", + "AllowLowPrecisionAccumulationOnGPU": 1, + "EnableOnSubgraphs": 1, + "RequireStaticShapes": 0, + "MaximumCacheSize": 1024 * 1024 * 512, # 512MB cache + } + )) + else: + providers_config.append(p) + FACE_SWAPPER = insightface.model_zoo.get_model( model_path, - providers=[ - ( - ( - "CoreMLExecutionProvider", - { - "ModelFormat": "MLProgram", - "MLComputeUnits": "CPUAndGPU", - "SpecializationStrategy": "FastPrediction", - "AllowLowPrecisionAccumulationOnGPU": 1, - }, - ) - if p == "CoreMLExecutionProvider" - else p - ) - for p in modules.globals.execution_providers - ], + providers=providers_config, ) update_status("Face swapper model loaded successfully.", NAME) except Exception as e: update_status(f"Error loading face swapper model: {e}", NAME) - # print traceback maybe? - # import traceback - # traceback.print_exc() - FACE_SWAPPER = None # Ensure it remains None on failure + FACE_SWAPPER = None return None return FACE_SWAPPER @@ -105,19 +116,22 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: face_swapper = get_face_swapper() if face_swapper is None: update_status("Face swapper model not loaded or failed to load. Skipping swap.", NAME) - return temp_frame # Return original frame if model failed or not loaded + return temp_frame # Store a copy of the original frame before swapping for opacity blending original_frame = temp_frame.copy() - # --- Pre-swap Input Check (Optional but good practice) --- + # Pre-swap Input Check with optimization if temp_frame.dtype != np.uint8: - # print(f"Warning: Input frame is {temp_frame.dtype}, converting to uint8 before swap.") temp_frame = np.clip(temp_frame, 0, 255).astype(np.uint8) - # --- End Input Check --- - # Apply the face swap + # Apply the face swap with optimized memory handling try: + # For Apple Silicon, use optimized inference + if IS_APPLE_SILICON: + # Ensure contiguous memory layout for better performance + temp_frame = np.ascontiguousarray(temp_frame) + swapped_frame_raw = face_swapper.get( temp_frame, target_face, source_face, paste_back=True ) @@ -195,14 +209,50 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: return final_swapped_frame +# --- START: Mac M1-M5 Optimized Face Detection --- +def get_faces_optimized(frame: Frame, use_cache: bool = True) -> Optional[List[Face]]: + """Optimized face detection for live mode on Apple Silicon""" + global LAST_DETECTION_TIME, FACE_DETECTION_CACHE + + if not use_cache or not IS_APPLE_SILICON: + # Standard detection + if modules.globals.many_faces: + return get_many_faces(frame) + else: + face = get_one_face(frame) + return [face] if face else None + + # Adaptive detection rate for live mode + current_time = time.time() + time_since_last = current_time - LAST_DETECTION_TIME + + # Skip detection if too soon (adaptive frame skipping) + if time_since_last < DETECTION_INTERVAL and FACE_DETECTION_CACHE: + return FACE_DETECTION_CACHE.get('faces') + + # Perform detection + LAST_DETECTION_TIME = current_time + if modules.globals.many_faces: + faces = get_many_faces(frame) + else: + face = get_one_face(frame) + faces = [face] if face else None + + # Cache results + FACE_DETECTION_CACHE['faces'] = faces + FACE_DETECTION_CACHE['timestamp'] = current_time + + return faces +# --- END: Mac M1-M5 Optimized Face Detection --- + # --- START: Helper function for interpolation and sharpening --- def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.ndarray]) -> Frame: - """Applies sharpening and interpolation.""" + """Applies sharpening and interpolation with Apple Silicon optimizations.""" global PREVIOUS_FRAME_RESULT processed_frame = current_frame.copy() - # 1. Apply Sharpening (if enabled) + # 1. Apply Sharpening (if enabled) with optimized kernel for Apple Silicon sharpness_value = getattr(modules.globals, "sharpness", 0.0) if sharpness_value > 0.0 and swapped_face_bboxes: height, width = processed_frame.shape[:2] @@ -225,23 +275,21 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda continue face_region = processed_frame[y1:y2, x1:x2] - if face_region.size == 0: continue # Skip empty regions + if face_region.size == 0: continue - # Apply sharpening using addWeighted for smoother control - # Use try-except for GaussianBlur and addWeighted as they can fail on invalid inputs + # Apply sharpening with optimized parameters for Apple Silicon try: - blurred = cv2.GaussianBlur(face_region, (0, 0), 3) # sigma=3, kernel size auto - sharpened_region = cv2.addWeighted( + # Use smaller sigma for faster processing on Apple Silicon + sigma = 2 if IS_APPLE_SILICON else 3 + blurred = cv2.GaussianBlur(face_region, (0, 0), sigma) + sharpened_region = cv2.addWeighted( face_region, 1.0 + sharpness_value, blurred, -sharpness_value, 0 - ) - # Ensure the sharpened region doesn't have invalid values - sharpened_region = np.clip(sharpened_region, 0, 255).astype(np.uint8) - processed_frame[y1:y2, x1:x2] = sharpened_region - except cv2.error as sharpen_e: - # print(f"Warning: OpenCV error during sharpening: {sharpen_e} for bbox {bbox}") # Debug - # Skip sharpening for this region if it fails + ) + sharpened_region = np.clip(sharpened_region, 0, 255).astype(np.uint8) + processed_frame[y1:y2, x1:x2] = sharpened_region + except cv2.error: pass