mirror of
https://github.com/hacksider/Deep-Live-Cam.git
synced 2025-10-05 00:12:49 +08:00
Compare commits
35 Commits
refactorin
...
experiment
Author | SHA1 | Date | |
---|---|---|---|
![]() |
61dae91439 | ||
![]() |
5d450b4352 | ||
![]() |
a11ccf9c49 | ||
![]() |
59cc742197 | ||
![]() |
2066560a95 | ||
![]() |
1af9abda2f | ||
![]() |
91884eebf7 | ||
![]() |
4686716c59 | ||
![]() |
f4028d3949 | ||
![]() |
07c735e9d2 | ||
![]() |
aa021b6aa0 | ||
![]() |
0e3805e200 | ||
![]() |
5cabbffda8 | ||
![]() |
0d4676591e | ||
![]() |
c2cc885672 | ||
![]() |
e36c746c81 | ||
![]() |
14ab470dcc | ||
![]() |
4dc4746235 | ||
![]() |
ac8feff652 | ||
![]() |
a90c4facc5 | ||
![]() |
575373beac | ||
![]() |
b8cdad5cce | ||
![]() |
137ac597ef | ||
![]() |
f976885456 | ||
![]() |
cd2c3c2103 | ||
![]() |
3fbc1d0433 | ||
![]() |
b4cf8854f8 | ||
![]() |
eb733ad8c5 | ||
![]() |
c6c41b8d0d | ||
![]() |
55c8d8181c | ||
![]() |
4ddcd60c49 | ||
![]() |
408b0f4cf7 | ||
![]() |
78c808ef36 | ||
![]() |
6b0cc74957 | ||
![]() |
8d3072d906 |
1
.python-version
Normal file
1
.python-version
Normal file
@@ -0,0 +1 @@
|
||||
3.10.14
|
17
README.md
17
README.md
@@ -24,7 +24,7 @@ Users of this software are expected to use this software responsibly while abidi
|
||||
#### 3. Download Models
|
||||
|
||||
1. [GFPGANv1.4](https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth)
|
||||
2. [inswapper_128_fp16.onnx](https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx)
|
||||
2. [inswapper_128_fp16.onnx](https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128.onnx)
|
||||
|
||||
Then put those 2 files on the "**models**" folder
|
||||
|
||||
@@ -142,30 +142,43 @@ Just follow the clicks on the screenshot
|
||||
Just use your favorite screencapture to stream like OBS
|
||||
> Note: In case you want to change your face, just select another picture, the preview mode will then restart (so just wait a bit).
|
||||
|
||||
You can now use the virtual camera output (uses pyvirtualcam) by turning on the `Virtual Cam Output (OBS)` toggle which should output to the OBS Virtual Camera. Note: this may not work on macOS. You will get a preview as before, but now you will also have a virtual camera output which can be used in applications like Zoom.
|
||||
|
||||
Additional command line arguments are given below. To learn out what they do, check [this guide](https://github.com/s0md3v/roop/wiki/Advanced-Options).
|
||||
|
||||
|
||||
```
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
-s SOURCE_PATH, --source SOURCE_PATH select an source image
|
||||
-t TARGET_PATH, --target TARGET_PATH select an target image or video
|
||||
-o OUTPUT_PATH, --output OUTPUT_PATH select output file or directory
|
||||
--frame-processor FRAME_PROCESSOR [FRAME_PROCESSOR ...] frame processors (choices: face_swapper, face_enhancer, ...)
|
||||
--frame-processor FRAME_PROCESSOR [FRAME_PROCESSOR ...] frame processors (choices: face_swapper, face_enhancer, super_resolution...)
|
||||
--keep-fps keep original fps
|
||||
--keep-audio keep original audio
|
||||
--keep-frames keep temporary frames
|
||||
--many-faces process every face
|
||||
--video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder
|
||||
--video-quality [0-51] adjust output video quality
|
||||
--live-mirror the live camera display as you see it in the front-facing camera frame
|
||||
--live-resizable the live camera frame is resizable
|
||||
--max-memory MAX_MEMORY maximum amount of RAM in GB
|
||||
--execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...)
|
||||
--execution-threads EXECUTION_THREADS number of execution threads
|
||||
--headless run in headless mode
|
||||
--enhancer-upscale-factor Sets the upscale factor for the enhancer. Only applies if `face_enhancer` is set as a frame-processor
|
||||
--source-image-scaling-factor Set the upscale factor for source images. Only applies if `face_swapper` is set as a frame-processor
|
||||
-r SCALE, --super-resolution-scale-factor SCALE Super resolution scale factor, choices are 2, 3, 4
|
||||
-v, --version show program's version number and exit
|
||||
```
|
||||
|
||||
Looking for a CLI mode? Using the -s/--source argument will make the run program in cli mode.
|
||||
|
||||
To improve the video quality, you can use the `super_resolution` frame processor after swapping the faces. It will enhance the video quality by 2x, 3x or 4x. You can set the upscale factor using the `-r` or `--super-resolution-scale-factor` argument.
|
||||
Processing time will increase with the upscale factor, but it's quite quick.
|
||||
|
||||
```
|
||||
|
||||
## Credits
|
||||
- [henryruhs](https://github.com/henryruhs): for being an irreplaceable contributor to the project
|
||||
- [ffmpeg](https://ffmpeg.org/): for making video related operations easy
|
||||
|
@@ -1,20 +1,38 @@
|
||||
from typing import Any
|
||||
from typing import Any, Optional
|
||||
import cv2
|
||||
|
||||
|
||||
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
|
||||
def get_video_frame(video_path: str, frame_number: int = 0) -> Optional[Any]:
|
||||
"""Retrieve a specific frame from a video."""
|
||||
capture = cv2.VideoCapture(video_path)
|
||||
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
|
||||
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
|
||||
|
||||
if not capture.isOpened():
|
||||
print(f"Error: Cannot open video file {video_path}")
|
||||
return None
|
||||
|
||||
frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
|
||||
# Ensure frame_number is within the valid range
|
||||
frame_number = max(0, min(frame_number, frame_total - 1))
|
||||
|
||||
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
|
||||
has_frame, frame = capture.read()
|
||||
capture.release()
|
||||
if has_frame:
|
||||
return frame
|
||||
return None
|
||||
|
||||
if not has_frame:
|
||||
print(f"Error: Cannot read frame {frame_number} from {video_path}")
|
||||
return None
|
||||
|
||||
return frame
|
||||
|
||||
def get_video_frame_total(video_path: str) -> int:
|
||||
"""Get the total number of frames in a video."""
|
||||
capture = cv2.VideoCapture(video_path)
|
||||
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
|
||||
if not capture.isOpened():
|
||||
print(f"Error: Cannot open video file {video_path}")
|
||||
return 0
|
||||
|
||||
frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
capture.release()
|
||||
return video_frame_total
|
||||
|
||||
return frame_total
|
||||
|
279
modules/core.py
279
modules/core.py
@@ -1,16 +1,17 @@
|
||||
import os
|
||||
import sys
|
||||
# single thread doubles cuda performance - needs to be set before torch import
|
||||
if any(arg.startswith('--execution-provider') for arg in sys.argv):
|
||||
os.environ['OMP_NUM_THREADS'] = '1'
|
||||
# reduce tensorflow log level
|
||||
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
|
||||
import warnings
|
||||
from typing import List
|
||||
import platform
|
||||
import signal
|
||||
import shutil
|
||||
import argparse
|
||||
from typing import List
|
||||
|
||||
# Set environment variables for CUDA performance and TensorFlow logging
|
||||
if any(arg.startswith('--execution-provider') for arg in sys.argv):
|
||||
os.environ['OMP_NUM_THREADS'] = '1'
|
||||
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
|
||||
|
||||
import torch
|
||||
import onnxruntime
|
||||
import tensorflow
|
||||
@@ -19,34 +20,73 @@ import modules.globals
|
||||
import modules.metadata
|
||||
import modules.ui as ui
|
||||
from modules.processors.frame.core import get_frame_processors_modules
|
||||
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
|
||||
|
||||
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
|
||||
del torch
|
||||
from modules.utilities import (
|
||||
has_image_extension,
|
||||
is_image,
|
||||
is_video,
|
||||
detect_fps,
|
||||
create_video,
|
||||
extract_frames,
|
||||
get_temp_frame_paths,
|
||||
restore_audio,
|
||||
create_temp,
|
||||
move_temp,
|
||||
clean_temp,
|
||||
normalize_output_path
|
||||
)
|
||||
|
||||
# Filter warnings
|
||||
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
|
||||
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
|
||||
|
||||
# Cross-platform resource management
|
||||
if platform.system() == 'Darwin' and 'ROCMExecutionProvider' in modules.globals.execution_providers:
|
||||
del torch
|
||||
|
||||
|
||||
def parse_args() -> None:
|
||||
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
|
||||
program = argparse.ArgumentParser()
|
||||
program.add_argument('-s', '--source', help='select an source image', dest='source_path')
|
||||
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
|
||||
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
|
||||
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
|
||||
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
|
||||
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
|
||||
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
|
||||
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
|
||||
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
|
||||
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]')
|
||||
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
|
||||
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
|
||||
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
|
||||
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
|
||||
program.add_argument('-s', '--source', help='Select a source image', dest='source_path')
|
||||
program.add_argument('-t', '--target', help='Select a target image or video', dest='target_path')
|
||||
program.add_argument('-o', '--output', help='Select output file or directory', dest='output_path')
|
||||
program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor',
|
||||
default=['face_swapper'], choices=['face_swapper', 'face_enhancer', 'super_resolution'],
|
||||
nargs='+')
|
||||
program.add_argument('--keep-fps', help='Keep original fps', dest='keep_fps', action='store_true', default=False)
|
||||
program.add_argument('--keep-audio', help='Keep original audio', dest='keep_audio', action='store_true',
|
||||
default=True)
|
||||
program.add_argument('--keep-frames', help='Keep temporary frames', dest='keep_frames', action='store_true',
|
||||
default=False)
|
||||
program.add_argument('--many-faces', help='Process every face', dest='many_faces', action='store_true',
|
||||
default=False)
|
||||
program.add_argument('--video-encoder', help='Adjust output video encoder', dest='video_encoder', default='libx264',
|
||||
choices=['libx264', 'libx265', 'libvpx-vp9'])
|
||||
program.add_argument('--video-quality', help='Adjust output video quality', dest='video_quality', type=int,
|
||||
default=18,
|
||||
choices=range(52), metavar='[0-51]')
|
||||
program.add_argument('--live-mirror', help='The live camera display as you see it in the front-facing camera frame',
|
||||
dest='live_mirror', action='store_true', default=False)
|
||||
program.add_argument('--live-resizable', help='The live camera frame is resizable',
|
||||
dest='live_resizable', action='store_true', default=False)
|
||||
program.add_argument('--max-memory', help='Maximum amount of RAM in GB', dest='max_memory', type=int,
|
||||
default=suggest_max_memory())
|
||||
program.add_argument('--execution-provider', help='Execution provider', dest='execution_provider', default=['cpu'],
|
||||
choices=suggest_execution_providers(), nargs='+')
|
||||
program.add_argument('--execution-threads', help='Number of execution threads', dest='execution_threads', type=int,
|
||||
default=suggest_execution_threads())
|
||||
program.add_argument('--headless', help='Run in headless mode', dest='headless', default=False, action='store_true')
|
||||
program.add_argument('--enhancer-upscale-factor',
|
||||
help='Sets the upscale factor for the enhancer. Only applies if `face_enhancer` is set as a frame-processor',
|
||||
dest='enhancer_upscale_factor', type=int, default=1)
|
||||
program.add_argument('--source-image-scaling-factor', help='Set the upscale factor for source images',
|
||||
dest='source_image_scaling_factor', default=2, type=int)
|
||||
program.add_argument('-r', '--super-resolution-scale-factor', dest='super_resolution_scale_factor',
|
||||
help='Set the upscale factor for super resolution', default=4, choices=[2, 3, 4], type=int)
|
||||
program.add_argument('-v', '--version', action='version',
|
||||
version=f'{modules.metadata.name} {modules.metadata.version}')
|
||||
|
||||
# register deprecated args
|
||||
# Register deprecated args
|
||||
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
|
||||
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
|
||||
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
|
||||
@@ -56,7 +96,8 @@ def parse_args() -> None:
|
||||
|
||||
modules.globals.source_path = args.source_path
|
||||
modules.globals.target_path = args.target_path
|
||||
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path)
|
||||
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path,
|
||||
args.output_path)
|
||||
modules.globals.frame_processors = args.frame_processor
|
||||
modules.globals.headless = args.source_path or args.target_path or args.output_path
|
||||
modules.globals.keep_fps = args.keep_fps
|
||||
@@ -65,23 +106,31 @@ def parse_args() -> None:
|
||||
modules.globals.many_faces = args.many_faces
|
||||
modules.globals.video_encoder = args.video_encoder
|
||||
modules.globals.video_quality = args.video_quality
|
||||
modules.globals.live_mirror = args.live_mirror
|
||||
modules.globals.live_resizable = args.live_resizable
|
||||
modules.globals.max_memory = args.max_memory
|
||||
modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
|
||||
modules.globals.execution_threads = args.execution_threads
|
||||
modules.globals.headless = args.headless
|
||||
modules.globals.enhancer_upscale_factor = args.enhancer_upscale_factor
|
||||
modules.globals.source_image_scaling_factor = args.source_image_scaling_factor
|
||||
modules.globals.sr_scale_factor = args.super_resolution_scale_factor
|
||||
# Handle face enhancer tumbler
|
||||
modules.globals.fp_ui['face_enhancer'] = 'face_enhancer' in args.frame_processor
|
||||
|
||||
#for ENHANCER tumbler:
|
||||
if 'face_enhancer' in args.frame_processor:
|
||||
modules.globals.fp_ui['face_enhancer'] = True
|
||||
else:
|
||||
modules.globals.fp_ui['face_enhancer'] = False
|
||||
|
||||
modules.globals.nsfw = False
|
||||
|
||||
# translate deprecated args
|
||||
# Handle deprecated arguments
|
||||
handle_deprecated_args(args)
|
||||
|
||||
|
||||
def handle_deprecated_args(args) -> None:
|
||||
"""Handle deprecated arguments by translating them to the new format."""
|
||||
if args.source_path_deprecated:
|
||||
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
|
||||
modules.globals.source_path = args.source_path_deprecated
|
||||
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path)
|
||||
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path,
|
||||
args.output_path)
|
||||
if args.cpu_cores_deprecated:
|
||||
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
|
||||
modules.globals.execution_threads = args.cpu_cores_deprecated
|
||||
@@ -92,7 +141,7 @@ def parse_args() -> None:
|
||||
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
|
||||
modules.globals.execution_providers = decode_execution_providers(['cuda'])
|
||||
if args.gpu_vendor_deprecated == 'amd':
|
||||
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m')
|
||||
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider rocm instead.\033[0m')
|
||||
modules.globals.execution_providers = decode_execution_providers(['rocm'])
|
||||
if args.gpu_threads_deprecated:
|
||||
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
|
||||
@@ -100,18 +149,22 @@ def parse_args() -> None:
|
||||
|
||||
|
||||
def encode_execution_providers(execution_providers: List[str]) -> List[str]:
|
||||
return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers]
|
||||
return [provider.replace('ExecutionProvider', '').lower() for provider in execution_providers]
|
||||
|
||||
|
||||
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
|
||||
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers()))
|
||||
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)]
|
||||
available_providers = onnxruntime.get_available_providers()
|
||||
encoded_providers = encode_execution_providers(available_providers)
|
||||
|
||||
selected_providers = [available_providers[encoded_providers.index(req)] for req in execution_providers
|
||||
if req in encoded_providers]
|
||||
|
||||
# Default to CPU if no suitable providers are found
|
||||
return selected_providers if selected_providers else ['CPUExecutionProvider']
|
||||
|
||||
|
||||
def suggest_max_memory() -> int:
|
||||
if platform.system().lower() == 'darwin':
|
||||
return 4
|
||||
return 16
|
||||
return 4 if platform.system().lower() == 'darwin' else 16
|
||||
|
||||
|
||||
def suggest_execution_providers() -> List[str]:
|
||||
@@ -119,34 +172,43 @@ def suggest_execution_providers() -> List[str]:
|
||||
|
||||
|
||||
def suggest_execution_threads() -> int:
|
||||
if 'DmlExecutionProvider' in modules.globals.execution_providers:
|
||||
if 'dml' in modules.globals.execution_providers:
|
||||
return 1
|
||||
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
|
||||
if 'rocm' in modules.globals.execution_providers:
|
||||
return 1
|
||||
return 8
|
||||
|
||||
|
||||
def limit_resources() -> None:
|
||||
# prevent tensorflow memory leak
|
||||
# Prevent TensorFlow memory leak
|
||||
gpus = tensorflow.config.experimental.list_physical_devices('GPU')
|
||||
for gpu in gpus:
|
||||
tensorflow.config.experimental.set_memory_growth(gpu, True)
|
||||
# limit memory usage
|
||||
|
||||
# Limit memory usage
|
||||
if modules.globals.max_memory:
|
||||
memory = modules.globals.max_memory * 1024 ** 3
|
||||
if platform.system().lower() == 'darwin':
|
||||
memory = modules.globals.max_memory * 1024 ** 6
|
||||
if platform.system().lower() == 'windows':
|
||||
memory = modules.globals.max_memory * 1024 ** 3
|
||||
elif platform.system().lower() == 'windows':
|
||||
import ctypes
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
|
||||
else:
|
||||
import resource
|
||||
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
|
||||
try:
|
||||
soft, hard = resource.getrlimit(resource.RLIMIT_DATA)
|
||||
if memory > hard:
|
||||
print(
|
||||
f"Warning: Requested memory limit {memory / (1024 ** 3)} GB exceeds system's hard limit. Setting to maximum allowed {hard / (1024 ** 3)} GB.")
|
||||
memory = hard
|
||||
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
|
||||
except ValueError as e:
|
||||
print(f"Warning: Could not set memory limit: {e}. Continuing with default limits.")
|
||||
|
||||
|
||||
def release_resources() -> None:
|
||||
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
|
||||
if 'cuda' in modules.globals.execution_providers:
|
||||
torch.cuda.empty_cache()
|
||||
|
||||
|
||||
@@ -157,12 +219,15 @@ def pre_check() -> bool:
|
||||
if not shutil.which('ffmpeg'):
|
||||
update_status('ffmpeg is not installed.')
|
||||
return False
|
||||
if 'cuda' in modules.globals.execution_providers and not torch.cuda.is_available():
|
||||
update_status('CUDA is not available. Please check your GPU or CUDA installation.')
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def update_status(message: str, scope: str = 'DLC.CORE') -> None:
|
||||
print(f'[{scope}] {message}')
|
||||
if not modules.globals.headless:
|
||||
if not modules.globals.headless and ui.status_label:
|
||||
ui.update_status(message)
|
||||
|
||||
|
||||
@@ -170,37 +235,70 @@ def start() -> None:
|
||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||
if not frame_processor.pre_start():
|
||||
return
|
||||
# process image to image
|
||||
|
||||
# Process image to image
|
||||
if has_image_extension(modules.globals.target_path):
|
||||
if modules.globals.nsfw == False:
|
||||
from modules.predicter import predict_image
|
||||
if predict_image(modules.globals.target_path):
|
||||
destroy()
|
||||
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
|
||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||
update_status('Progressing...', frame_processor.NAME)
|
||||
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
|
||||
release_resources()
|
||||
if is_image(modules.globals.target_path):
|
||||
update_status('Processing to image succeed!')
|
||||
else:
|
||||
update_status('Processing to image failed!')
|
||||
process_image_to_image()
|
||||
return
|
||||
# process image to videos
|
||||
if modules.globals.nsfw == False:
|
||||
|
||||
# Process image to video
|
||||
process_image_to_video()
|
||||
|
||||
|
||||
def process_image_to_image() -> None:
|
||||
if modules.globals.nsfw:
|
||||
from modules.predicter import predict_image
|
||||
if predict_image(modules.globals.target_path):
|
||||
destroy(to_quit=False)
|
||||
update_status('Processing to image ignored!')
|
||||
return
|
||||
|
||||
try:
|
||||
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
|
||||
except Exception as e:
|
||||
print("Error copying file:", str(e))
|
||||
|
||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||
update_status('Processing...', frame_processor.NAME)
|
||||
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path,
|
||||
modules.globals.output_path)
|
||||
release_resources()
|
||||
|
||||
if is_image(modules.globals.target_path):
|
||||
update_status('Processing to image succeeded!')
|
||||
else:
|
||||
update_status('Processing to image failed!')
|
||||
|
||||
|
||||
def process_image_to_video() -> None:
|
||||
if modules.globals.nsfw:
|
||||
from modules.predicter import predict_video
|
||||
if predict_video(modules.globals.target_path):
|
||||
destroy()
|
||||
update_status('Creating temp resources...')
|
||||
destroy(to_quit=False)
|
||||
update_status('Processing to video ignored!')
|
||||
return
|
||||
|
||||
update_status('Creating temporary resources...')
|
||||
create_temp(modules.globals.target_path)
|
||||
update_status('Extracting frames...')
|
||||
extract_frames(modules.globals.target_path)
|
||||
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
|
||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||
update_status('Progressing...', frame_processor.NAME)
|
||||
update_status('Processing...', frame_processor.NAME)
|
||||
frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
|
||||
release_resources()
|
||||
# handles fps
|
||||
|
||||
handle_video_fps()
|
||||
handle_video_audio()
|
||||
clean_temp(modules.globals.target_path)
|
||||
|
||||
if is_video(modules.globals.target_path):
|
||||
update_status('Processing to video succeeded!')
|
||||
else:
|
||||
update_status('Processing to video failed!')
|
||||
|
||||
|
||||
def handle_video_fps() -> None:
|
||||
if modules.globals.keep_fps:
|
||||
update_status('Detecting fps...')
|
||||
fps = detect_fps(modules.globals.target_path)
|
||||
@@ -209,7 +307,9 @@ def start() -> None:
|
||||
else:
|
||||
update_status('Creating video with 30.0 fps...')
|
||||
create_video(modules.globals.target_path)
|
||||
# handle audio
|
||||
|
||||
|
||||
def handle_video_audio() -> None:
|
||||
if modules.globals.keep_audio:
|
||||
if modules.globals.keep_fps:
|
||||
update_status('Restoring audio...')
|
||||
@@ -218,30 +318,29 @@ def start() -> None:
|
||||
restore_audio(modules.globals.target_path, modules.globals.output_path)
|
||||
else:
|
||||
move_temp(modules.globals.target_path, modules.globals.output_path)
|
||||
# clean and validate
|
||||
clean_temp(modules.globals.target_path)
|
||||
if is_video(modules.globals.target_path):
|
||||
update_status('Processing to video succeed!')
|
||||
else:
|
||||
update_status('Processing to video failed!')
|
||||
|
||||
|
||||
def destroy() -> None:
|
||||
def destroy(to_quit=True) -> None:
|
||||
if modules.globals.target_path:
|
||||
clean_temp(modules.globals.target_path)
|
||||
quit()
|
||||
if to_quit: quit()
|
||||
|
||||
|
||||
def run() -> None:
|
||||
parse_args()
|
||||
if not pre_check():
|
||||
return
|
||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||
if not frame_processor.pre_check():
|
||||
try:
|
||||
parse_args()
|
||||
if not pre_check():
|
||||
return
|
||||
limit_resources()
|
||||
if modules.globals.headless:
|
||||
start()
|
||||
else:
|
||||
window = ui.init(start, destroy)
|
||||
window.mainloop()
|
||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||
if not frame_processor.pre_check():
|
||||
return
|
||||
limit_resources()
|
||||
if modules.globals.headless:
|
||||
start()
|
||||
else:
|
||||
window = ui.init(start, destroy)
|
||||
window.mainloop()
|
||||
except Exception as e:
|
||||
print(f"UI initialization failed: {str(e)}")
|
||||
update_status(f"UI initialization failed: {str(e)}")
|
||||
destroy() # Ensure any resources are cleaned up on failure
|
||||
|
@@ -1,31 +1,27 @@
|
||||
from typing import Any
|
||||
from typing import Any, Optional
|
||||
import insightface
|
||||
|
||||
import modules.globals
|
||||
from modules.typing import Frame
|
||||
|
||||
FACE_ANALYSER = None
|
||||
FACE_ANALYSER: Optional[insightface.app.FaceAnalysis] = None
|
||||
|
||||
|
||||
def get_face_analyser() -> Any:
|
||||
def get_face_analyser() -> insightface.app.FaceAnalysis:
|
||||
global FACE_ANALYSER
|
||||
|
||||
if FACE_ANALYSER is None:
|
||||
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=modules.globals.execution_providers)
|
||||
FACE_ANALYSER = insightface.app.FaceAnalysis(
|
||||
name='buffalo_l',
|
||||
providers=modules.globals.execution_providers
|
||||
)
|
||||
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
|
||||
|
||||
return FACE_ANALYSER
|
||||
|
||||
def get_one_face(frame: Frame) -> Optional[Any]:
|
||||
faces = get_face_analyser().get(frame)
|
||||
return min(faces, key=lambda x: x.bbox[0], default=None)
|
||||
|
||||
def get_one_face(frame: Frame) -> Any:
|
||||
face = get_face_analyser().get(frame)
|
||||
try:
|
||||
return min(face, key=lambda x: x.bbox[0])
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def get_many_faces(frame: Frame) -> Any:
|
||||
try:
|
||||
return get_face_analyser().get(frame)
|
||||
except IndexError:
|
||||
return None
|
||||
def get_many_faces(frame: Frame) -> Optional[Any]:
|
||||
faces = get_face_analyser().get(frame)
|
||||
return faces if faces else None
|
||||
|
@@ -19,6 +19,8 @@ keep_frames = None
|
||||
many_faces = None
|
||||
video_encoder = None
|
||||
video_quality = None
|
||||
live_mirror = None
|
||||
live_resizable = None
|
||||
max_memory = None
|
||||
execution_providers: List[str] = []
|
||||
execution_threads = None
|
||||
@@ -27,4 +29,7 @@ log_level = 'error'
|
||||
fp_ui: Dict[str, bool] = {}
|
||||
nsfw = None
|
||||
camera_input_combobox = None
|
||||
webcam_preview_running = False
|
||||
webcam_preview_running = False
|
||||
enhancer_upscale_factor = 1
|
||||
source_image_scaling_factor = 2
|
||||
sr_scale_factor = 4
|
@@ -1,24 +1,25 @@
|
||||
import numpy
|
||||
import numpy as np
|
||||
import opennsfw2
|
||||
from PIL import Image
|
||||
|
||||
from modules.typing import Frame
|
||||
|
||||
MAX_PROBABILITY = 0.85
|
||||
|
||||
# Preload the model once for efficiency
|
||||
model = None
|
||||
|
||||
def predict_frame(target_frame: Frame) -> bool:
|
||||
global model
|
||||
if model is None: model = opennsfw2.make_open_nsfw_model()
|
||||
image = Image.fromarray(target_frame)
|
||||
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
|
||||
model = opennsfw2.make_open_nsfw_model()
|
||||
views = numpy.expand_dims(image, axis=0)
|
||||
views = np.expand_dims(image, axis=0)
|
||||
_, probability = model.predict(views)[0]
|
||||
return probability > MAX_PROBABILITY
|
||||
|
||||
|
||||
def predict_image(target_path: str) -> bool:
|
||||
return opennsfw2.predict_image(target_path) > MAX_PROBABILITY
|
||||
|
||||
probability = opennsfw2.predict_image(target_path)
|
||||
return probability > MAX_PROBABILITY
|
||||
|
||||
def predict_video(target_path: str) -> bool:
|
||||
_, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100)
|
||||
|
@@ -17,57 +17,56 @@ FRAME_PROCESSORS_INTERFACE = [
|
||||
'process_video'
|
||||
]
|
||||
|
||||
|
||||
def load_frame_processor_module(frame_processor: str) -> Any:
|
||||
def load_frame_processor_module(frame_processor: str) -> ModuleType:
|
||||
try:
|
||||
frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}')
|
||||
# Ensure all required methods are present
|
||||
for method_name in FRAME_PROCESSORS_INTERFACE:
|
||||
if not hasattr(frame_processor_module, method_name):
|
||||
sys.exit()
|
||||
raise AttributeError(f"Missing required method {method_name} in {frame_processor} module.")
|
||||
except ImportError:
|
||||
print(f"Frame processor {frame_processor} not found")
|
||||
sys.exit()
|
||||
print(f"Error: Frame processor '{frame_processor}' not found.")
|
||||
sys.exit(1)
|
||||
except AttributeError as e:
|
||||
print(e)
|
||||
sys.exit(1)
|
||||
|
||||
return frame_processor_module
|
||||
|
||||
|
||||
def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]:
|
||||
global FRAME_PROCESSORS_MODULES
|
||||
|
||||
if not FRAME_PROCESSORS_MODULES:
|
||||
for frame_processor in frame_processors:
|
||||
frame_processor_module = load_frame_processor_module(frame_processor)
|
||||
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
|
||||
FRAME_PROCESSORS_MODULES = [load_frame_processor_module(fp) for fp in frame_processors]
|
||||
|
||||
set_frame_processors_modules_from_ui(frame_processors)
|
||||
return FRAME_PROCESSORS_MODULES
|
||||
|
||||
def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
|
||||
global FRAME_PROCESSORS_MODULES
|
||||
for frame_processor, state in modules.globals.fp_ui.items():
|
||||
if state == True and frame_processor not in frame_processors:
|
||||
frame_processor_module = load_frame_processor_module(frame_processor)
|
||||
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
|
||||
if state and frame_processor not in frame_processors:
|
||||
module = load_frame_processor_module(frame_processor)
|
||||
FRAME_PROCESSORS_MODULES.append(module)
|
||||
modules.globals.frame_processors.append(frame_processor)
|
||||
if state == False:
|
||||
try:
|
||||
frame_processor_module = load_frame_processor_module(frame_processor)
|
||||
FRAME_PROCESSORS_MODULES.remove(frame_processor_module)
|
||||
modules.globals.frame_processors.remove(frame_processor)
|
||||
except:
|
||||
pass
|
||||
elif not state and frame_processor in frame_processors:
|
||||
module = load_frame_processor_module(frame_processor)
|
||||
FRAME_PROCESSORS_MODULES.remove(module)
|
||||
modules.globals.frame_processors.remove(frame_processor)
|
||||
|
||||
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
|
||||
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
|
||||
futures = []
|
||||
for path in temp_frame_paths:
|
||||
future = executor.submit(process_frames, source_path, [path], progress)
|
||||
futures.append(future)
|
||||
futures = [executor.submit(process_frames, source_path, [path], progress) for path in temp_frame_paths]
|
||||
for future in futures:
|
||||
future.result()
|
||||
|
||||
|
||||
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
|
||||
def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
|
||||
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
|
||||
total = len(frame_paths)
|
||||
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
|
||||
progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory})
|
||||
progress.set_postfix({
|
||||
'execution_providers': modules.globals.execution_providers,
|
||||
'execution_threads': modules.globals.execution_threads,
|
||||
'max_memory': modules.globals.max_memory
|
||||
})
|
||||
multi_process_frame(source_path, frame_paths, process_frames, progress)
|
||||
|
@@ -8,7 +8,7 @@ import modules.globals
|
||||
import modules.processors.frame.core
|
||||
from modules.core import update_status
|
||||
from modules.face_analyser import get_one_face
|
||||
from modules.typing import Frame, Face
|
||||
from modules.typing import Frame, Face # Ensure these are imported
|
||||
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
|
||||
|
||||
FACE_ENHANCER = None
|
||||
@@ -16,34 +16,29 @@ THREAD_SEMAPHORE = threading.Semaphore()
|
||||
THREAD_LOCK = threading.Lock()
|
||||
NAME = 'DLC.FACE-ENHANCER'
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = resolve_relative_path('..\models')
|
||||
conditional_download(download_directory_path, ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth'])
|
||||
return True
|
||||
|
||||
|
||||
def pre_start() -> bool:
|
||||
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
|
||||
update_status('Select an image or video for target path.', NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def get_face_enhancer() -> Any:
|
||||
global FACE_ENHANCER
|
||||
|
||||
with THREAD_LOCK:
|
||||
if FACE_ENHANCER is None:
|
||||
if os.name == 'nt':
|
||||
model_path = resolve_relative_path('..\models\GFPGANv1.4.pth')
|
||||
# todo: set models path https://github.com/TencentARC/GFPGAN/issues/399
|
||||
else:
|
||||
model_path = resolve_relative_path('../models/GFPGANv1.4.pth')
|
||||
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
|
||||
model_path = resolve_relative_path('../models/GFPGANv1.4.pth')
|
||||
FACE_ENHANCER = gfpgan.GFPGANer(
|
||||
model_path=model_path,
|
||||
upscale=modules.globals.enhancer_upscale_factor
|
||||
) # type: ignore[attr-defined]
|
||||
return FACE_ENHANCER
|
||||
|
||||
|
||||
def enhance_face(temp_frame: Frame) -> Frame:
|
||||
with THREAD_SEMAPHORE:
|
||||
_, _, temp_frame = get_face_enhancer().enhance(
|
||||
@@ -52,14 +47,12 @@ def enhance_face(temp_frame: Frame) -> Frame:
|
||||
)
|
||||
return temp_frame
|
||||
|
||||
|
||||
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
|
||||
target_face = get_one_face(temp_frame)
|
||||
if target_face:
|
||||
temp_frame = enhance_face(temp_frame)
|
||||
return temp_frame
|
||||
|
||||
|
||||
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
|
||||
for temp_frame_path in temp_frame_paths:
|
||||
temp_frame = cv2.imread(temp_frame_path)
|
||||
@@ -68,12 +61,10 @@ def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any
|
||||
if progress:
|
||||
progress.update(1)
|
||||
|
||||
|
||||
def process_image(source_path: str, target_path: str, output_path: str) -> None:
|
||||
target_frame = cv2.imread(target_path)
|
||||
result = process_frame(None, target_frame)
|
||||
cv2.imwrite(output_path, result)
|
||||
|
||||
|
||||
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
|
||||
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)
|
||||
|
@@ -2,6 +2,7 @@ from typing import Any, List
|
||||
import cv2
|
||||
import insightface
|
||||
import threading
|
||||
import os
|
||||
|
||||
import modules.globals
|
||||
import modules.processors.frame.core
|
||||
@@ -9,45 +10,59 @@ from modules.core import update_status
|
||||
from modules.face_analyser import get_one_face, get_many_faces
|
||||
from modules.typing import Face, Frame
|
||||
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
|
||||
import numpy as np
|
||||
|
||||
FACE_SWAPPER = None
|
||||
THREAD_LOCK = threading.Lock()
|
||||
NAME = 'DLC.FACE-SWAPPER'
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = resolve_relative_path('../models')
|
||||
conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx'])
|
||||
conditional_download(download_directory_path, [
|
||||
'https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128.onnx'
|
||||
])
|
||||
return True
|
||||
|
||||
|
||||
def pre_start() -> bool:
|
||||
if not is_image(modules.globals.source_path):
|
||||
update_status('Select an image for source path.', NAME)
|
||||
return False
|
||||
elif not get_one_face(cv2.imread(modules.globals.source_path)):
|
||||
update_status('No face in source path detected.', NAME)
|
||||
update_status('No face detected in the source path.', NAME)
|
||||
return False
|
||||
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
|
||||
update_status('Select an image or video for target path.', NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def get_face_swapper() -> Any:
|
||||
global FACE_SWAPPER
|
||||
|
||||
with THREAD_LOCK:
|
||||
if FACE_SWAPPER is None:
|
||||
model_path = resolve_relative_path('../models/inswapper_128_fp16.onnx')
|
||||
model_path = resolve_relative_path('../models/inswapper_128.onnx')
|
||||
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=modules.globals.execution_providers)
|
||||
return FACE_SWAPPER
|
||||
|
||||
def upscale_image(image: np.ndarray, scaling_factor: int = modules.globals.source_image_scaling_factor) -> np.ndarray:
|
||||
"""
|
||||
Upscales the given image by the specified scaling factor.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): The input image to upscale.
|
||||
scaling_factor (int): The factor by which to upscale the image.
|
||||
|
||||
Returns:
|
||||
np.ndarray: The upscaled image.
|
||||
"""
|
||||
height, width = image.shape[:2]
|
||||
new_size = (width * scaling_factor, height * scaling_factor)
|
||||
upscaled_image = cv2.resize(image, new_size, interpolation=cv2.INTER_CUBIC)
|
||||
return upscaled_image
|
||||
|
||||
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
|
||||
return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
|
||||
|
||||
|
||||
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
|
||||
if modules.globals.many_faces:
|
||||
many_faces = get_many_faces(temp_frame)
|
||||
@@ -60,27 +75,30 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
|
||||
temp_frame = swap_face(source_face, target_face, temp_frame)
|
||||
return temp_frame
|
||||
|
||||
|
||||
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
|
||||
source_face = get_one_face(cv2.imread(source_path))
|
||||
source_image = cv2.imread(source_path)
|
||||
if source_image is None:
|
||||
print(f"Failed to load source image from {source_path}")
|
||||
return
|
||||
# Upscale the source image for better quality
|
||||
source_image_upscaled = upscale_image(source_image, scaling_factor=2)
|
||||
source_face = get_one_face(source_image_upscaled)
|
||||
|
||||
for temp_frame_path in temp_frame_paths:
|
||||
temp_frame = cv2.imread(temp_frame_path)
|
||||
try:
|
||||
result = process_frame(source_face, temp_frame)
|
||||
cv2.imwrite(temp_frame_path, result)
|
||||
except Exception as exception:
|
||||
print(exception)
|
||||
pass
|
||||
print(f"Error processing frame {temp_frame_path}: {exception}")
|
||||
if progress:
|
||||
progress.update(1)
|
||||
|
||||
|
||||
def process_image(source_path: str, target_path: str, output_path: str) -> None:
|
||||
source_face = get_one_face(cv2.imread(source_path))
|
||||
target_frame = cv2.imread(target_path)
|
||||
result = process_frame(source_face, target_frame)
|
||||
cv2.imwrite(output_path, result)
|
||||
|
||||
|
||||
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
|
||||
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
|
||||
|
197
modules/processors/frame/super_resolution.py
Normal file
197
modules/processors/frame/super_resolution.py
Normal file
@@ -0,0 +1,197 @@
|
||||
import threading
|
||||
import traceback
|
||||
from typing import Any, List
|
||||
import cv2
|
||||
|
||||
import os
|
||||
|
||||
import modules.globals
|
||||
import modules.processors.frame.core
|
||||
from modules.core import update_status
|
||||
from modules.face_analyser import get_one_face
|
||||
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
|
||||
import numpy as np
|
||||
|
||||
NAME = 'DLC.SUPER-RESOLUTION'
|
||||
THREAD_SEMAPHORE = threading.Semaphore()
|
||||
|
||||
# Singleton class for Super-Resolution
|
||||
class SuperResolutionModel:
|
||||
_instance = None
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __init__(self, sr_model_path: str = f'ESPCN_x{modules.globals.sr_scale_factor}.pb'):
|
||||
if SuperResolutionModel._instance is not None:
|
||||
raise Exception("This class is a singleton!")
|
||||
self.sr = cv2.dnn_superres.DnnSuperResImpl_create()
|
||||
self.model_path = os.path.join(resolve_relative_path('../models'), sr_model_path)
|
||||
if not os.path.exists(self.model_path):
|
||||
raise FileNotFoundError(f"Super-resolution model not found at {self.model_path}")
|
||||
try:
|
||||
self.sr.readModel(self.model_path)
|
||||
self.sr.setModel("espcn", modules.globals.sr_scale_factor) # Using ESPCN with 2,3 or 4x upscaling
|
||||
except Exception as e:
|
||||
print(f"Error during super-resolution model initialization: {e}")
|
||||
raise e
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls, sr_model_path: str = f'ESPCN_x{modules.globals.sr_scale_factor}.pb'):
|
||||
if cls._instance is None:
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
try:
|
||||
cls._instance = cls(sr_model_path)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to initialize SuperResolution: {str(e)}")
|
||||
return cls._instance
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
"""
|
||||
Checks and downloads necessary models before starting the face swapper.
|
||||
"""
|
||||
download_directory_path = resolve_relative_path('../models')
|
||||
# Download the super-resolution model as well
|
||||
conditional_download(download_directory_path, [
|
||||
f'https://huggingface.co/spaces/PabloGabrielSch/AI_Resolution_Upscaler_And_Resizer/resolve/bcd13b766a9499196e8becbe453c4a848673b3b6/models/ESPCN_x{modules.globals.sr_scale_factor}.pb'
|
||||
])
|
||||
return True
|
||||
|
||||
def pre_start() -> bool:
|
||||
if not is_image(modules.globals.source_path):
|
||||
update_status('Select an image for source path.', NAME)
|
||||
return False
|
||||
elif not get_one_face(cv2.imread(modules.globals.source_path)):
|
||||
update_status('No face detected in the source path.', NAME)
|
||||
return False
|
||||
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
|
||||
update_status('Select an image or video for target path.', NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def apply_super_resolution(image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Applies super-resolution to the given image using the provided super-resolver.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): The input image to enhance.
|
||||
sr_model_path (str): ESPCN model path for super-resolution.
|
||||
|
||||
Returns:
|
||||
np.ndarray: The super-resolved image.
|
||||
"""
|
||||
with THREAD_SEMAPHORE:
|
||||
sr_model = SuperResolutionModel.get_instance()
|
||||
|
||||
if sr_model is None:
|
||||
print("Super-resolution model is not initialized.")
|
||||
return image
|
||||
try:
|
||||
upscaled_image = sr_model.sr.upsample(image)
|
||||
return upscaled_image
|
||||
except Exception as e:
|
||||
print(f"Error during super-resolution: {e}")
|
||||
return image
|
||||
|
||||
|
||||
def process_frame(frame: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Processes a single frame by swapping the source face into detected target faces.
|
||||
|
||||
Args:
|
||||
|
||||
frame (np.ndarray): The target frame image.
|
||||
|
||||
Returns:
|
||||
np.ndarray: The processed frame with swapped faces.
|
||||
"""
|
||||
|
||||
# Apply super-resolution to the entire frame
|
||||
frame = apply_super_resolution(frame)
|
||||
|
||||
return frame
|
||||
|
||||
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
|
||||
"""
|
||||
Processes multiple frames by swapping the source face into each target frame.
|
||||
|
||||
Args:
|
||||
source_path (str): Path to the source image.
|
||||
temp_frame_paths (List[str]): List of paths to target frame images.
|
||||
progress (Any, optional): Progress tracker. Defaults to None.
|
||||
"""
|
||||
for idx, temp_frame_path in enumerate(temp_frame_paths):
|
||||
frame = cv2.imread(temp_frame_path)
|
||||
if frame is None:
|
||||
print(f"Failed to load frame from {temp_frame_path}")
|
||||
continue
|
||||
try:
|
||||
result = process_frame(frame)
|
||||
cv2.imwrite(temp_frame_path, result)
|
||||
except Exception as exception:
|
||||
traceback.print_exc()
|
||||
print(f"Error processing frame {temp_frame_path}: {exception}")
|
||||
if progress:
|
||||
progress.update(1)
|
||||
|
||||
def upscale_image(image: np.ndarray, scaling_factor: int = 2) -> np.ndarray:
|
||||
"""
|
||||
Upscales the given image by the specified scaling factor.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): The input image to upscale.
|
||||
scaling_factor (int): The factor by which to upscale the image.
|
||||
|
||||
Returns:
|
||||
np.ndarray: The upscaled image.
|
||||
"""
|
||||
height, width = image.shape[:2]
|
||||
new_size = (width * scaling_factor, height * scaling_factor)
|
||||
upscaled_image = cv2.resize(image, new_size, interpolation=cv2.INTER_CUBIC)
|
||||
return upscaled_image
|
||||
|
||||
def process_image(source_path: str, target_path: str, output_path: str) -> None:
|
||||
"""
|
||||
Processes a single image by swapping the source face into the target image.
|
||||
|
||||
Args:
|
||||
source_path (str): Path to the source image.
|
||||
target_path (str): Path to the target image.
|
||||
output_path (str): Path to save the output image.
|
||||
"""
|
||||
source_image = cv2.imread(source_path)
|
||||
if source_image is None:
|
||||
print(f"Failed to load source image from {source_path}")
|
||||
return
|
||||
|
||||
# Upscale the source image for better quality before face detection
|
||||
source_image_upscaled = upscale_image(source_image, scaling_factor=2)
|
||||
|
||||
# Detect source face from the upscaled image
|
||||
source_face = get_one_face(source_image_upscaled)
|
||||
if source_face is None:
|
||||
print("No source face detected.")
|
||||
return
|
||||
|
||||
target_frame = cv2.imread(target_path)
|
||||
if target_frame is None:
|
||||
print(f"Failed to load target image from {target_path}")
|
||||
return
|
||||
|
||||
# Process the frame
|
||||
result = process_frame(target_frame)
|
||||
|
||||
# Save the processed frame
|
||||
cv2.imwrite(output_path, result)
|
||||
|
||||
|
||||
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
|
||||
"""
|
||||
Processes a video by swapping the source face into each frame.
|
||||
|
||||
Args:
|
||||
source_path (str): Path to the source image.
|
||||
temp_frame_paths (List[str]): List of paths to video frame images.
|
||||
"""
|
||||
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)
|
123
modules/ui.json
123
modules/ui.json
@@ -1,76 +1,57 @@
|
||||
{
|
||||
"CTk": {
|
||||
"fg_color": ["gray95", "gray10"]
|
||||
"fg_color": ["#FFFFFF", "#2D2D2D"]
|
||||
},
|
||||
"CTkToplevel": {
|
||||
"fg_color": ["gray95", "gray10"]
|
||||
"fg_color": ["#FFFFFF", "#2D2D2D"]
|
||||
},
|
||||
"CTkFrame": {
|
||||
"corner_radius": 0,
|
||||
"border_width": 0,
|
||||
"fg_color": ["gray90", "gray13"],
|
||||
"top_fg_color": ["gray85", "gray16"],
|
||||
"border_color": ["gray65", "gray28"]
|
||||
"fg_color": ["#F0F0F0", "#3C3C3C"],
|
||||
"top_fg_color": ["#E0E0E0", "#4B4B4B"],
|
||||
"border_color": ["#B0B0B0", "#5A5A5A"]
|
||||
},
|
||||
"CTkButton": {
|
||||
"corner_radius": 0,
|
||||
"border_width": 0,
|
||||
"fg_color": ["#2aa666", "#1f538d"],
|
||||
"hover_color": ["#3cb666", "#14375e"],
|
||||
"border_color": ["#3e4a40", "#949A9F"],
|
||||
"text_color": ["#f3faf6", "#f3faf6"],
|
||||
"fg_color": ["#007ACC", "#007ACC"],
|
||||
"hover_color": ["#005EA3", "#005EA3"],
|
||||
"border_color": ["#004C8A", "#004C8A"],
|
||||
"text_color": ["#FFFFFF", "#FFFFFF"],
|
||||
"text_color_disabled": ["gray74", "gray60"]
|
||||
},
|
||||
"CTkLabel": {
|
||||
"corner_radius": 0,
|
||||
"fg_color": "transparent",
|
||||
"text_color": ["gray14", "gray84"]
|
||||
"text_color": ["#000000", "#FFFFFF"]
|
||||
},
|
||||
"CTkEntry": {
|
||||
"corner_radius": 0,
|
||||
"border_width": 2,
|
||||
"fg_color": ["#F9F9FA", "#343638"],
|
||||
"border_color": ["#979DA2", "#565B5E"],
|
||||
"text_color": ["gray14", "gray84"],
|
||||
"fg_color": ["#FFFFFF", "#333333"],
|
||||
"border_color": ["#A0A0A0", "#5A5A5A"],
|
||||
"text_color": ["#000000", "#FFFFFF"],
|
||||
"placeholder_text_color": ["gray52", "gray62"]
|
||||
},
|
||||
"CTkCheckbox": {
|
||||
"corner_radius": 0,
|
||||
"border_width": 3,
|
||||
"fg_color": ["#2aa666", "#1f538d"],
|
||||
"border_color": ["#3e4a40", "#949A9F"],
|
||||
"hover_color": ["#3cb666", "#14375e"],
|
||||
"checkmark_color": ["#f3faf6", "gray90"],
|
||||
"text_color": ["gray14", "gray84"],
|
||||
"text_color_disabled": ["gray60", "gray45"]
|
||||
},
|
||||
"CTkSwitch": {
|
||||
"corner_radius": 1000,
|
||||
"border_width": 3,
|
||||
"button_length": 0,
|
||||
"fg_color": ["#939BA2", "#4A4D50"],
|
||||
"progress_color": ["#2aa666", "#1f538d"],
|
||||
"button_color": ["gray36", "#D5D9DE"],
|
||||
"button_hover_color": ["gray20", "gray100"],
|
||||
"text_color": ["gray14", "gray84"],
|
||||
"button_color": ["#444444", "#D5D9DE"],
|
||||
"button_hover_color": ["#333333", "#FFFFFF"],
|
||||
"text_color": ["#000000", "#FFFFFF"],
|
||||
"text_color_disabled": ["gray60", "gray45"]
|
||||
},
|
||||
"CTkRadiobutton": {
|
||||
"corner_radius": 1000,
|
||||
"border_width_checked": 6,
|
||||
"border_width_unchecked": 3,
|
||||
"CTkOptionMenu": {
|
||||
"corner_radius": 0,
|
||||
"fg_color": ["#2aa666", "#1f538d"],
|
||||
"border_color": ["#3e4a40", "#949A9F"],
|
||||
"hover_color": ["#3cb666", "#14375e"],
|
||||
"text_color": ["gray14", "gray84"],
|
||||
"text_color_disabled": ["gray60", "gray45"]
|
||||
},
|
||||
"CTkProgressBar": {
|
||||
"corner_radius": 1000,
|
||||
"border_width": 0,
|
||||
"fg_color": ["#939BA2", "#4A4D50"],
|
||||
"progress_color": ["#2aa666", "#1f538d"],
|
||||
"border_color": ["gray", "gray"]
|
||||
"button_color": ["#3cb666", "#14375e"],
|
||||
"button_hover_color": ["#234567", "#1e2c40"],
|
||||
"text_color": ["#FFFFFF", "#FFFFFF"],
|
||||
"text_color_disabled": ["gray74", "gray60"]
|
||||
},
|
||||
"CTkSlider": {
|
||||
"corner_radius": 1000,
|
||||
@@ -82,59 +63,6 @@
|
||||
"button_color": ["#2aa666", "#1f538d"],
|
||||
"button_hover_color": ["#3cb666", "#14375e"]
|
||||
},
|
||||
"CTkOptionMenu": {
|
||||
"corner_radius": 0,
|
||||
"fg_color": ["#2aa666", "#1f538d"],
|
||||
"button_color": ["#3cb666", "#14375e"],
|
||||
"button_hover_color": ["#234567", "#1e2c40"],
|
||||
"text_color": ["#f3faf6", "#f3faf6"],
|
||||
"text_color_disabled": ["gray74", "gray60"]
|
||||
},
|
||||
"CTkComboBox": {
|
||||
"corner_radius": 0,
|
||||
"border_width": 2,
|
||||
"fg_color": ["#F9F9FA", "#343638"],
|
||||
"border_color": ["#979DA2", "#565B5E"],
|
||||
"button_color": ["#979DA2", "#565B5E"],
|
||||
"button_hover_color": ["#6E7174", "#7A848D"],
|
||||
"text_color": ["gray14", "gray84"],
|
||||
"text_color_disabled": ["gray50", "gray45"]
|
||||
},
|
||||
"CTkScrollbar": {
|
||||
"corner_radius": 1000,
|
||||
"border_spacing": 4,
|
||||
"fg_color": "transparent",
|
||||
"button_color": ["gray55", "gray41"],
|
||||
"button_hover_color": ["gray40", "gray53"]
|
||||
},
|
||||
"CTkSegmentedButton": {
|
||||
"corner_radius": 0,
|
||||
"border_width": 2,
|
||||
"fg_color": ["#979DA2", "gray29"],
|
||||
"selected_color": ["#2aa666", "#1f538d"],
|
||||
"selected_hover_color": ["#3cb666", "#14375e"],
|
||||
"unselected_color": ["#979DA2", "gray29"],
|
||||
"unselected_hover_color": ["gray70", "gray41"],
|
||||
"text_color": ["#f3faf6", "#f3faf6"],
|
||||
"text_color_disabled": ["gray74", "gray60"]
|
||||
},
|
||||
"CTkTextbox": {
|
||||
"corner_radius": 0,
|
||||
"border_width": 0,
|
||||
"fg_color": ["gray100", "gray20"],
|
||||
"border_color": ["#979DA2", "#565B5E"],
|
||||
"text_color": ["gray14", "gray84"],
|
||||
"scrollbar_button_color": ["gray55", "gray41"],
|
||||
"scrollbar_button_hover_color": ["gray40", "gray53"]
|
||||
},
|
||||
"CTkScrollableFrame": {
|
||||
"label_fg_color": ["gray80", "gray21"]
|
||||
},
|
||||
"DropdownMenu": {
|
||||
"fg_color": ["gray90", "gray20"],
|
||||
"hover_color": ["gray75", "gray28"],
|
||||
"text_color": ["gray14", "gray84"]
|
||||
},
|
||||
"CTkFont": {
|
||||
"macOS": {
|
||||
"family": "Avenir",
|
||||
@@ -152,7 +80,12 @@
|
||||
"weight": "normal"
|
||||
}
|
||||
},
|
||||
"DropdownMenu": {
|
||||
"fg_color": ["#FFFFFF", "#2D2D2D"],
|
||||
"hover_color": ["#E0E0E0", "#4B4B4B"],
|
||||
"text_color": ["#000000", "#FFFFFF"]
|
||||
},
|
||||
"URL": {
|
||||
"text_color": ["gray74", "gray60"]
|
||||
"text_color": ["#007ACC", "#1E90FF"]
|
||||
}
|
||||
}
|
||||
|
349
modules/ui.py
349
modules/ui.py
@@ -1,9 +1,21 @@
|
||||
import os
|
||||
import platform
|
||||
import webbrowser
|
||||
import customtkinter as ctk
|
||||
from typing import Callable, Tuple
|
||||
from typing import Callable, Tuple, List, Any
|
||||
from types import ModuleType
|
||||
import cv2
|
||||
from PIL import Image, ImageOps
|
||||
import pyvirtualcam
|
||||
|
||||
# Import OS-specific modules only when necessary
|
||||
if platform.system() == 'Darwin': # macOS
|
||||
import AVFoundation
|
||||
|
||||
# Import Windows specific modules only when on windows platform
|
||||
if platform.system() == 'Windows' or platform.system() == 'Linux': # Windows or Linux
|
||||
from pygrabber.dshow_graph import FilterGraph
|
||||
|
||||
|
||||
import modules.globals
|
||||
import modules.metadata
|
||||
@@ -13,12 +25,14 @@ from modules.processors.frame.core import get_frame_processors_modules
|
||||
from modules.utilities import is_image, is_video, resolve_relative_path
|
||||
|
||||
ROOT = None
|
||||
ROOT_HEIGHT = 700
|
||||
ROOT_HEIGHT = 800
|
||||
ROOT_WIDTH = 600
|
||||
|
||||
PREVIEW = None
|
||||
PREVIEW_MAX_HEIGHT = 700
|
||||
PREVIEW_MAX_WIDTH = 1200
|
||||
PREVIEW_MAX_WIDTH = 1200
|
||||
PREVIEW_DEFAULT_WIDTH = 960
|
||||
PREVIEW_DEFAULT_HEIGHT = 540
|
||||
|
||||
RECENT_DIRECTORY_SOURCE = None
|
||||
RECENT_DIRECTORY_TARGET = None
|
||||
@@ -32,10 +46,49 @@ status_label = None
|
||||
|
||||
img_ft, vid_ft = modules.globals.file_types
|
||||
|
||||
camera = None
|
||||
|
||||
def check_camera_permissions():
|
||||
"""Check and request camera access permission on macOS."""
|
||||
if platform.system() == 'Darwin': # macOS-specific
|
||||
auth_status = AVFoundation.AVCaptureDevice.authorizationStatusForMediaType_(AVFoundation.AVMediaTypeVideo)
|
||||
|
||||
if auth_status == AVFoundation.AVAuthorizationStatusNotDetermined:
|
||||
# Request access to the camera
|
||||
def completion_handler(granted):
|
||||
if granted:
|
||||
print("Access granted to the camera.")
|
||||
else:
|
||||
print("Access denied to the camera.")
|
||||
|
||||
AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(AVFoundation.AVMediaTypeVideo, completion_handler)
|
||||
elif auth_status == AVFoundation.AVAuthorizationStatusAuthorized:
|
||||
print("Camera access already authorized.")
|
||||
elif auth_status == AVFoundation.AVAuthorizationStatusDenied:
|
||||
print("Camera access denied. Please enable it in System Preferences.")
|
||||
elif auth_status == AVFoundation.AVAuthorizationStatusRestricted:
|
||||
print("Camera access restricted. The app is not allowed to use the camera.")
|
||||
|
||||
|
||||
def select_camera(camera_name: str):
|
||||
"""Select the appropriate camera based on its name (cross-platform)."""
|
||||
if platform.system() == 'Darwin': # macOS-specific
|
||||
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
|
||||
for device in devices:
|
||||
if device.localizedName() == camera_name:
|
||||
return device
|
||||
elif platform.system() == 'Windows' or platform.system() == 'Linux':
|
||||
# On Windows/Linux, simply return the camera name as OpenCV can handle it by index
|
||||
return camera_name
|
||||
return None
|
||||
|
||||
|
||||
def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
|
||||
global ROOT, PREVIEW
|
||||
|
||||
if platform.system() == 'Darwin': # macOS-specific
|
||||
check_camera_permissions() # Check camera permissions before initializing the UI
|
||||
|
||||
ROOT = create_root(start, destroy)
|
||||
PREVIEW = create_preview(ROOT)
|
||||
|
||||
@@ -49,101 +102,102 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
ctk.set_appearance_mode('system')
|
||||
ctk.set_default_color_theme(resolve_relative_path('ui.json'))
|
||||
|
||||
print("Creating root window...")
|
||||
|
||||
root = ctk.CTk()
|
||||
root.minsize(ROOT_WIDTH, ROOT_HEIGHT)
|
||||
root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}')
|
||||
root.configure()
|
||||
root.protocol('WM_DELETE_WINDOW', lambda: destroy())
|
||||
|
||||
source_label = ctk.CTkLabel(root, text=None)
|
||||
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25)
|
||||
source_label.place(relx=0.1, rely=0.0875, relwidth=0.3, relheight=0.25)
|
||||
|
||||
target_label = ctk.CTkLabel(root, text=None)
|
||||
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
|
||||
target_label.place(relx=0.6, rely=0.0875, relwidth=0.3, relheight=0.25)
|
||||
|
||||
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path())
|
||||
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
|
||||
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=select_source_path)
|
||||
source_button.place(relx=0.1, rely=0.35, relwidth=0.3, relheight=0.1)
|
||||
|
||||
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path())
|
||||
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
|
||||
swap_faces_button = ctk.CTkButton(root, text='↔', cursor='hand2', command=lambda: swap_faces_paths())
|
||||
swap_faces_button.place(relx=0.45, rely=0.4, relwidth=0.1, relheight=0.1)
|
||||
|
||||
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=select_target_path)
|
||||
target_button.place(relx=0.6, rely=0.35, relwidth=0.3, relheight=0.1)
|
||||
|
||||
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
|
||||
keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps))
|
||||
keep_fps_checkbox.place(relx=0.1, rely=0.6)
|
||||
keep_fps_checkbox.place(relx=0.1, rely=0.525)
|
||||
|
||||
keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames)
|
||||
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
|
||||
keep_frames_switch.place(relx=0.1, rely=0.65)
|
||||
keep_frames_switch.place(relx=0.1, rely=0.56875)
|
||||
|
||||
# for FRAME PROCESSOR ENHANCER tumbler:
|
||||
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
|
||||
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get()))
|
||||
enhancer_switch.place(relx=0.1, rely=0.7)
|
||||
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer', enhancer_value.get()))
|
||||
enhancer_switch.place(relx=0.1, rely=0.6125)
|
||||
|
||||
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
|
||||
keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get()))
|
||||
keep_audio_switch.place(relx=0.6, rely=0.6)
|
||||
keep_audio_switch.place(relx=0.6, rely=0.525)
|
||||
|
||||
many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces)
|
||||
many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get()))
|
||||
many_faces_switch.place(relx=0.6, rely=0.65)
|
||||
many_faces_switch.place(relx=0.6, rely=0.56875)
|
||||
|
||||
nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw)
|
||||
nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
|
||||
nsfw_switch.place(relx=0.6, rely=0.7)
|
||||
nsfw_switch.place(relx=0.6, rely=0.6125)
|
||||
|
||||
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
|
||||
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
|
||||
start_button.place(relx=0.15, rely=0.7, relwidth=0.2, relheight=0.05)
|
||||
|
||||
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy())
|
||||
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
|
||||
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy)
|
||||
stop_button.place(relx=0.4, rely=0.7, relwidth=0.2, relheight=0.05)
|
||||
|
||||
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview())
|
||||
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
|
||||
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview)
|
||||
preview_button.place(relx=0.65, rely=0.7, relwidth=0.2, relheight=0.05)
|
||||
|
||||
# --- Camera Selection ---
|
||||
camera_label = ctk.CTkLabel(root, text="Select Camera:")
|
||||
camera_label.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||
camera_label.place(relx=0.4, rely=0.7525, relwidth=0.2, relheight=0.05)
|
||||
|
||||
available_cameras = get_available_cameras()
|
||||
|
||||
# Convert camera indices to strings for CTkOptionMenu
|
||||
available_camera_strings = [str(cam) for cam in available_cameras]
|
||||
|
||||
camera_variable = ctk.StringVar(value=available_camera_strings[0] if available_camera_strings else "No cameras found")
|
||||
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable,
|
||||
values=available_camera_strings)
|
||||
camera_optionmenu.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, values=available_camera_strings)
|
||||
camera_optionmenu.place(relx=0.65, rely=0.7525, relwidth=0.2, relheight=0.05)
|
||||
|
||||
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(int(camera_variable.get())))
|
||||
live_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||
# --- End Camera Selection ---
|
||||
virtual_cam_out_value = ctk.BooleanVar(value=False)
|
||||
virtual_cam_out_switch = ctk.CTkSwitch(root, text='Virtual Cam Output (OBS)', variable=virtual_cam_out_value, cursor='hand2')
|
||||
virtual_cam_out_switch.place(relx=0.4, rely=0.805)
|
||||
|
||||
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(camera_variable.get(), virtual_cam_out_value.get()))
|
||||
live_button.place(relx=0.15, rely=0.7525, relwidth=0.2, relheight=0.05)
|
||||
|
||||
status_label = ctk.CTkLabel(root, text=None, justify='center')
|
||||
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
|
||||
status_label.place(relx=0.1, relwidth=0.8, rely=0.875)
|
||||
|
||||
donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2')
|
||||
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
|
||||
donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color'))
|
||||
donate_label.bind('<Button>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
|
||||
donate_label.bind('<Button-1>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
|
||||
|
||||
return root
|
||||
|
||||
|
||||
def create_preview(parent: ctk.CTkToplevel) -> ctk.CTkToplevel:
|
||||
def create_preview(parent: ctk.CTk) -> ctk.CTkToplevel:
|
||||
global preview_label, preview_slider
|
||||
|
||||
preview = ctk.CTkToplevel(parent)
|
||||
preview.withdraw()
|
||||
preview.title('Preview')
|
||||
preview.configure()
|
||||
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
|
||||
preview.resizable(width=False, height=False)
|
||||
preview.protocol('WM_DELETE_WINDOW', toggle_preview)
|
||||
preview.resizable(width=True, height=True)
|
||||
|
||||
preview_label = ctk.CTkLabel(preview, text=None)
|
||||
preview_label.pack(fill='both', expand=True)
|
||||
|
||||
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=lambda frame_value: update_preview(frame_value))
|
||||
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=update_preview)
|
||||
|
||||
return preview
|
||||
|
||||
@@ -158,10 +212,10 @@ def update_tumbler(var: str, value: bool) -> None:
|
||||
|
||||
|
||||
def select_source_path() -> None:
|
||||
global RECENT_DIRECTORY_SOURCE, img_ft, vid_ft
|
||||
global RECENT_DIRECTORY_SOURCE
|
||||
|
||||
PREVIEW.withdraw()
|
||||
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
|
||||
source_path = ctk.filedialog.askopenfilename(title='Select a source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
|
||||
if is_image(source_path):
|
||||
modules.globals.source_path = source_path
|
||||
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
|
||||
@@ -172,11 +226,35 @@ def select_source_path() -> None:
|
||||
source_label.configure(image=None)
|
||||
|
||||
|
||||
def select_target_path() -> None:
|
||||
global RECENT_DIRECTORY_TARGET, img_ft, vid_ft
|
||||
def swap_faces_paths() -> None:
|
||||
global RECENT_DIRECTORY_SOURCE, RECENT_DIRECTORY_TARGET
|
||||
|
||||
source_path = modules.globals.source_path
|
||||
target_path = modules.globals.target_path
|
||||
|
||||
if not is_image(source_path) or not is_image(target_path):
|
||||
return
|
||||
|
||||
modules.globals.source_path = target_path
|
||||
modules.globals.target_path = source_path
|
||||
|
||||
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
|
||||
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
|
||||
|
||||
PREVIEW.withdraw()
|
||||
target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
|
||||
|
||||
source_image = render_image_preview(modules.globals.source_path, (200, 200))
|
||||
source_label.configure(image=source_image)
|
||||
|
||||
target_image = render_image_preview(modules.globals.target_path, (200, 200))
|
||||
target_label.configure(image=target_image)
|
||||
|
||||
|
||||
def select_target_path() -> None:
|
||||
global RECENT_DIRECTORY_TARGET
|
||||
|
||||
PREVIEW.withdraw()
|
||||
target_path = ctk.filedialog.askopenfilename(title='Select a target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
|
||||
if is_image(target_path):
|
||||
modules.globals.target_path = target_path
|
||||
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
|
||||
@@ -193,12 +271,12 @@ def select_target_path() -> None:
|
||||
|
||||
|
||||
def select_output_path(start: Callable[[], None]) -> None:
|
||||
global RECENT_DIRECTORY_OUTPUT, img_ft, vid_ft
|
||||
global RECENT_DIRECTORY_OUTPUT
|
||||
|
||||
if is_image(modules.globals.target_path):
|
||||
output_path = ctk.filedialog.asksaveasfilename(title='save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
|
||||
output_path = ctk.filedialog.asksaveasfilename(title='Save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
|
||||
elif is_video(modules.globals.target_path):
|
||||
output_path = ctk.filedialog.asksaveasfilename(title='save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
|
||||
output_path = ctk.filedialog.asksaveasfilename(title='Save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
|
||||
else:
|
||||
output_path = None
|
||||
if output_path:
|
||||
@@ -219,13 +297,13 @@ def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: i
|
||||
if frame_number:
|
||||
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
|
||||
has_frame, frame = capture.read()
|
||||
capture.release()
|
||||
if has_frame:
|
||||
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
||||
if size:
|
||||
image = ImageOps.fit(image, size, Image.LANCZOS)
|
||||
return ctk.CTkImage(image, size=image.size)
|
||||
capture.release()
|
||||
cv2.destroyAllWindows()
|
||||
return None
|
||||
|
||||
|
||||
def toggle_preview() -> None:
|
||||
@@ -235,12 +313,17 @@ def toggle_preview() -> None:
|
||||
init_preview()
|
||||
update_preview()
|
||||
PREVIEW.deiconify()
|
||||
global camera
|
||||
if PREVIEW.state() == 'withdrawn':
|
||||
if camera and camera.isOpened():
|
||||
camera.release()
|
||||
camera = None
|
||||
|
||||
|
||||
def init_preview() -> None:
|
||||
if is_image(modules.globals.target_path):
|
||||
preview_slider.pack_forget()
|
||||
if is_video(modules.globals.target_path):
|
||||
elif is_video(modules.globals.target_path):
|
||||
video_frame_total = get_video_frame_total(modules.globals.target_path)
|
||||
preview_slider.configure(to=video_frame_total)
|
||||
preview_slider.pack(fill='x')
|
||||
@@ -250,7 +333,7 @@ def init_preview() -> None:
|
||||
def update_preview(frame_number: int = 0) -> None:
|
||||
if modules.globals.source_path and modules.globals.target_path:
|
||||
temp_frame = get_video_frame(modules.globals.target_path, frame_number)
|
||||
if modules.globals.nsfw == False:
|
||||
if not modules.globals.nsfw:
|
||||
from modules.predicter import predict_frame
|
||||
if predict_frame(temp_frame):
|
||||
quit()
|
||||
@@ -264,62 +347,148 @@ def update_preview(frame_number: int = 0) -> None:
|
||||
image = ctk.CTkImage(image, size=image.size)
|
||||
preview_label.configure(image=image)
|
||||
|
||||
def webcam_preview(camera_index: int):
|
||||
def webcam_preview_loop(camera: cv2.VideoCapture, source_image: Any, frame_processors: List[ModuleType], virtual_cam: pyvirtualcam.Camera = None) -> bool:
|
||||
global preview_label, PREVIEW
|
||||
|
||||
ret, frame = camera.read()
|
||||
if not ret:
|
||||
update_status(f"Error: Frame not received from camera.")
|
||||
return False
|
||||
|
||||
temp_frame = frame.copy()
|
||||
|
||||
if modules.globals.live_mirror:
|
||||
temp_frame = cv2.flip(temp_frame, 1) # horizontal flipping
|
||||
|
||||
if modules.globals.live_resizable:
|
||||
temp_frame = fit_image_to_size(temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height())
|
||||
|
||||
for frame_processor in frame_processors:
|
||||
temp_frame = frame_processor.process_frame(source_image, temp_frame)
|
||||
|
||||
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
|
||||
image = ImageOps.contain(image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS)
|
||||
image = ctk.CTkImage(image, size=image.size)
|
||||
preview_label.configure(image=image)
|
||||
if virtual_cam:
|
||||
virtual_cam.send(temp_frame)
|
||||
virtual_cam.sleep_until_next_frame()
|
||||
ROOT.update()
|
||||
|
||||
if PREVIEW.state() == 'withdrawn':
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def fit_image_to_size(image, width: int, height: int):
|
||||
if width is None and height is None:
|
||||
return image
|
||||
h, w, _ = image.shape
|
||||
ratio_h = 0.0
|
||||
ratio_w = 0.0
|
||||
if width > height:
|
||||
ratio_h = height / h
|
||||
else:
|
||||
ratio_w = width / w
|
||||
ratio = max(ratio_w, ratio_h)
|
||||
new_size = (int(ratio * w), int(ratio * h))
|
||||
return cv2.resize(image, dsize=new_size)
|
||||
|
||||
def webcam_preview(camera_name: str, virtual_cam_output: bool):
|
||||
if modules.globals.source_path is None:
|
||||
# No image selected
|
||||
return
|
||||
|
||||
global preview_label, PREVIEW
|
||||
|
||||
cap = cv2.VideoCapture(camera_index)
|
||||
if not cap.isOpened():
|
||||
update_status(f"Error: Could not open camera with index {camera_index}")
|
||||
WIDTH = 960
|
||||
HEIGHT = 540
|
||||
FPS = 60
|
||||
|
||||
# Select the camera by its name
|
||||
selected_camera = select_camera(camera_name)
|
||||
if selected_camera is None:
|
||||
update_status(f"No suitable camera found.")
|
||||
return
|
||||
|
||||
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution
|
||||
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution
|
||||
cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
|
||||
PREVIEW_MAX_WIDTH = 960
|
||||
PREVIEW_MAX_HEIGHT = 540
|
||||
# Use OpenCV's camera index for cross-platform compatibility
|
||||
camera_index = get_camera_index_by_name(camera_name)
|
||||
|
||||
preview_label.configure(image=None) # Reset the preview image before startup
|
||||
global camera
|
||||
camera = cv2.VideoCapture(camera_index)
|
||||
|
||||
PREVIEW.deiconify() # Open preview window
|
||||
if not camera.isOpened():
|
||||
update_status(f"Error: Could not open camera {camera_name}")
|
||||
return
|
||||
|
||||
camera.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
|
||||
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
|
||||
camera.set(cv2.CAP_PROP_FPS, FPS)
|
||||
|
||||
PREVIEW_MAX_WIDTH = WIDTH
|
||||
PREVIEW_MAX_HEIGHT = HEIGHT
|
||||
|
||||
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
|
||||
PREVIEW.deiconify()
|
||||
|
||||
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
|
||||
source_image = get_one_face(cv2.imread(modules.globals.source_path))
|
||||
|
||||
source_image = None # Initialize variable for the selected face image
|
||||
preview_running = True
|
||||
|
||||
while True:
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
if virtual_cam_output:
|
||||
with pyvirtualcam.Camera(width=WIDTH, height=HEIGHT, fps=FPS, fmt=pyvirtualcam.PixelFormat.BGR) as virtual_cam:
|
||||
while preview_running:
|
||||
preview_running = webcam_preview_loop(camera, source_image, frame_processors, virtual_cam)
|
||||
|
||||
# Select and save face image only once
|
||||
if source_image is None and modules.globals.source_path:
|
||||
source_image = get_one_face(cv2.imread(modules.globals.source_path))
|
||||
while preview_running:
|
||||
preview_running = webcam_preview_loop(camera, source_image, frame_processors)
|
||||
|
||||
temp_frame = frame.copy() #Create a copy of the frame
|
||||
if camera: camera.release()
|
||||
PREVIEW.withdraw()
|
||||
|
||||
for frame_processor in frame_processors:
|
||||
temp_frame = frame_processor.process_frame(source_image, temp_frame)
|
||||
|
||||
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) # Convert the image to RGB format to display it with Tkinter
|
||||
image = Image.fromarray(image)
|
||||
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
|
||||
image = ctk.CTkImage(image, size=image.size)
|
||||
preview_label.configure(image=image)
|
||||
ROOT.update()
|
||||
def get_camera_index_by_name(camera_name: str) -> int:
|
||||
"""Map camera name to index for OpenCV."""
|
||||
if platform.system() == 'Darwin': # macOS-specific
|
||||
if "FaceTime" in camera_name:
|
||||
return 0 # Assuming FaceTime is at index 0
|
||||
elif "iPhone" in camera_name:
|
||||
return 1 # Assuming iPhone camera is at index 1
|
||||
elif platform.system() == 'Windows' or platform.system() == 'Linux':
|
||||
# Map camera name to index dynamically (OpenCV on these platforms usually starts with 0)
|
||||
return get_available_cameras().index(camera_name)
|
||||
return -1
|
||||
|
||||
cap.release()
|
||||
PREVIEW.withdraw() # Close preview window when loop is finished
|
||||
|
||||
def get_available_cameras():
|
||||
"""Returns a list of available camera indices."""
|
||||
"""Get available camera names (cross-platform)."""
|
||||
available_cameras = []
|
||||
for index in range(10): # Check for cameras with index 0 to 9
|
||||
cap = cv2.VideoCapture(index)
|
||||
if cap.isOpened():
|
||||
available_cameras.append(index)
|
||||
cap.release()
|
||||
return available_cameras
|
||||
if platform.system() == 'Darwin': # macOS-specific
|
||||
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
|
||||
|
||||
for device in devices:
|
||||
if device.deviceType() == AVFoundation.AVCaptureDeviceTypeBuiltInWideAngleCamera:
|
||||
print(f"Found Built-In Camera: {device.localizedName()}")
|
||||
available_cameras.append(device.localizedName())
|
||||
elif device.deviceType() == "AVCaptureDeviceTypeExternal":
|
||||
print(f"Found External Camera: {device.localizedName()}")
|
||||
available_cameras.append(device.localizedName())
|
||||
elif device.deviceType() == "AVCaptureDeviceTypeContinuityCamera":
|
||||
print(f"Skipping Continuity Camera: {device.localizedName()}")
|
||||
elif platform.system() == 'Windows' or platform.system() == 'Linux':
|
||||
try:
|
||||
devices = FilterGraph().get_input_devices()
|
||||
except Exception as e:
|
||||
# Use OpenCV to detect camera indexes
|
||||
index = 0
|
||||
devices = []
|
||||
while True:
|
||||
cap = cv2.VideoCapture(index)
|
||||
if not cap.isOpened():
|
||||
break
|
||||
devices.append(f"Camera {index}")
|
||||
cap.release()
|
||||
index += 1
|
||||
|
||||
available_cameras = devices
|
||||
return available_cameras
|
||||
|
@@ -5,7 +5,7 @@ import platform
|
||||
import shutil
|
||||
import ssl
|
||||
import subprocess
|
||||
import urllib
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
from typing import List, Any
|
||||
from tqdm import tqdm
|
||||
@@ -15,127 +15,123 @@ import modules.globals
|
||||
TEMP_FILE = 'temp.mp4'
|
||||
TEMP_DIRECTORY = 'temp'
|
||||
|
||||
# monkey patch ssl for mac
|
||||
# Monkey patch SSL for macOS to handle issues with some HTTPS requests
|
||||
if platform.system().lower() == 'darwin':
|
||||
ssl._create_default_https_context = ssl._create_unverified_context
|
||||
|
||||
|
||||
def run_ffmpeg(args: List[str]) -> bool:
|
||||
commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', modules.globals.log_level]
|
||||
commands.extend(args)
|
||||
try:
|
||||
subprocess.check_output(commands, stderr=subprocess.STDOUT)
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"FFmpeg error: {e.output.decode()}")
|
||||
return False
|
||||
|
||||
|
||||
def detect_fps(target_path: str) -> float:
|
||||
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
|
||||
output = subprocess.check_output(command).decode().strip().split('/')
|
||||
command = [
|
||||
'ffprobe', '-v', 'error', '-select_streams', 'v:0',
|
||||
'-show_entries', 'stream=r_frame_rate',
|
||||
'-of', 'default=noprint_wrappers=1:nokey=1', target_path
|
||||
]
|
||||
try:
|
||||
output = subprocess.check_output(command).decode().strip().split('/')
|
||||
numerator, denominator = map(int, output)
|
||||
return numerator / denominator
|
||||
except Exception:
|
||||
pass
|
||||
except (subprocess.CalledProcessError, ValueError):
|
||||
print("Failed to detect FPS, defaulting to 30.0 FPS.")
|
||||
return 30.0
|
||||
|
||||
|
||||
def extract_frames(target_path: str) -> None:
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
create_temp(target_path)
|
||||
run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')])
|
||||
|
||||
|
||||
def create_video(target_path: str, fps: float = 30.0) -> None:
|
||||
temp_output_path = get_temp_output_path(target_path)
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
|
||||
|
||||
run_ffmpeg([
|
||||
'-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'),
|
||||
'-c:v', modules.globals.video_encoder,
|
||||
'-crf', str(modules.globals.video_quality),
|
||||
'-pix_fmt', 'yuv420p',
|
||||
'-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1',
|
||||
'-y', temp_output_path
|
||||
])
|
||||
|
||||
def restore_audio(target_path: str, output_path: str) -> None:
|
||||
temp_output_path = get_temp_output_path(target_path)
|
||||
done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
|
||||
done = run_ffmpeg([
|
||||
'-i', temp_output_path, '-i', target_path,
|
||||
'-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path
|
||||
])
|
||||
if not done:
|
||||
move_temp(target_path, output_path)
|
||||
|
||||
|
||||
def get_temp_frame_paths(target_path: str) -> List[str]:
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png')))
|
||||
|
||||
return glob.glob(os.path.join(glob.escape(temp_directory_path), '*.png'))
|
||||
|
||||
def get_temp_directory_path(target_path: str) -> str:
|
||||
target_name, _ = os.path.splitext(os.path.basename(target_path))
|
||||
target_directory_path = os.path.dirname(target_path)
|
||||
return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name)
|
||||
|
||||
target_name = Path(target_path).stem
|
||||
target_directory_path = Path(target_path).parent
|
||||
return str(target_directory_path / TEMP_DIRECTORY / target_name)
|
||||
|
||||
def get_temp_output_path(target_path: str) -> str:
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
return os.path.join(temp_directory_path, TEMP_FILE)
|
||||
return str(Path(temp_directory_path) / TEMP_FILE)
|
||||
|
||||
|
||||
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any:
|
||||
if source_path and target_path:
|
||||
source_name, _ = os.path.splitext(os.path.basename(source_path))
|
||||
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
|
||||
if os.path.isdir(output_path):
|
||||
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
|
||||
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> str:
|
||||
if source_path and target_path and os.path.isdir(output_path):
|
||||
source_name = Path(source_path).stem
|
||||
target_name = Path(target_path).stem
|
||||
target_extension = Path(target_path).suffix
|
||||
return str(Path(output_path) / f"{source_name}-{target_name}{target_extension}")
|
||||
return output_path
|
||||
|
||||
|
||||
def create_temp(target_path: str) -> None:
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
Path(temp_directory_path).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def move_temp(target_path: str, output_path: str) -> None:
|
||||
temp_output_path = get_temp_output_path(target_path)
|
||||
if os.path.isfile(temp_output_path):
|
||||
if os.path.isfile(output_path):
|
||||
os.remove(output_path)
|
||||
shutil.move(temp_output_path, output_path)
|
||||
|
||||
|
||||
def clean_temp(target_path: str) -> None:
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
parent_directory_path = os.path.dirname(temp_directory_path)
|
||||
parent_directory_path = Path(temp_directory_path).parent
|
||||
if not modules.globals.keep_frames and os.path.isdir(temp_directory_path):
|
||||
shutil.rmtree(temp_directory_path)
|
||||
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
|
||||
os.rmdir(parent_directory_path)
|
||||
|
||||
if parent_directory_path.exists() and not list(parent_directory_path.iterdir()):
|
||||
parent_directory_path.rmdir()
|
||||
|
||||
def has_image_extension(image_path: str) -> bool:
|
||||
return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
|
||||
|
||||
|
||||
def is_image(image_path: str) -> bool:
|
||||
if image_path and os.path.isfile(image_path):
|
||||
mimetype, _ = mimetypes.guess_type(image_path)
|
||||
return bool(mimetype and mimetype.startswith('image/'))
|
||||
return mimetype and mimetype.startswith('image/')
|
||||
return False
|
||||
|
||||
|
||||
def is_video(video_path: str) -> bool:
|
||||
if video_path and os.path.isfile(video_path):
|
||||
mimetype, _ = mimetypes.guess_type(video_path)
|
||||
return bool(mimetype and mimetype.startswith('video/'))
|
||||
return mimetype and mimetype.startswith('video/')
|
||||
return False
|
||||
|
||||
|
||||
def conditional_download(download_directory_path: str, urls: List[str]) -> None:
|
||||
if not os.path.exists(download_directory_path):
|
||||
os.makedirs(download_directory_path)
|
||||
download_directory = Path(download_directory_path)
|
||||
download_directory.mkdir(parents=True, exist_ok=True)
|
||||
for url in urls:
|
||||
download_file_path = os.path.join(download_directory_path, os.path.basename(url))
|
||||
if not os.path.exists(download_file_path):
|
||||
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
|
||||
total = int(request.headers.get('Content-Length', 0))
|
||||
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
|
||||
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
|
||||
|
||||
download_file_path = download_directory / Path(url).name
|
||||
if not download_file_path.exists():
|
||||
with urllib.request.urlopen(url) as request:
|
||||
total = int(request.headers.get('Content-Length', 0))
|
||||
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
|
||||
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size))
|
||||
|
||||
def resolve_relative_path(path: str) -> str:
|
||||
return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
|
||||
return str(Path(__file__).parent / path)
|
||||
|
@@ -1,7 +1,7 @@
|
||||
--extra-index-url https://download.pytorch.org/whl/cu118
|
||||
|
||||
numpy==1.23.5
|
||||
opencv-python==4.8.1.78
|
||||
opencv-contrib-python==4.10.0.84
|
||||
onnx==1.16.0
|
||||
insightface==0.7.3
|
||||
psutil==5.9.8
|
||||
@@ -21,3 +21,7 @@ opennsfw2==0.10.2
|
||||
protobuf==4.23.2
|
||||
tqdm==4.66.4
|
||||
gfpgan==1.3.8
|
||||
pyobjc==9.1; sys_platform == 'darwin'
|
||||
pygrabber==0.2
|
||||
pyvirtualcam==0.12.0
|
||||
pyobjc-framework-AVFoundation==10.3.1; sys_platform == 'darwin'
|
:: 1. Setup your platform
echo Setting up your platform...

:: Tooling checks: fail fast on Python/Pip, auto-install Git/FFMPEG via winget.
call :check_installation python "Python 3.10 or later"
call :check_installation pip "Pip"
call :install_if_missing git "Git" "winget install --id Git.Git -e --source winget"
call :install_if_missing ffmpeg "FFMPEG" "winget install --id Gyan.FFmpeg -e --source winget"

:: Visual Studio 2022 Runtimes
echo Installing Visual Studio 2022 Runtimes...
winget install --id Microsoft.VC++2015-2022Redist-x64 -e --source winget

:: 2. Clone Repository
call :clone_repository "https://github.com/iVideoGameBoss/iRoopDeepFaceCam.git" "iRoopDeepFaceCam"
:: NOTE(review): the diff shows both "cd Deep-Live-Cam" and the iRoopDeepFaceCam
:: clone; the directory below matches the repository actually cloned - confirm.
cd iRoopDeepFaceCam

:: 3. Download Models
echo Downloading models...
if not exist models mkdir models
curl -L -o models\GFPGANv1.4.pth https://huggingface.co/ivideogameboss/iroopdeepfacecam/resolve/main/GFPGANv1.4.pth
curl -L -o models\inswapper_128_fp16.onnx https://huggingface.co/ivideogameboss/iroopdeepfacecam/resolve/main/inswapper_128_fp16.onnx

:: 4. Install dependencies
echo Creating a virtual environment...
python -m venv venv
call venv\Scripts\activate.bat

echo Installing required Python packages...
pip install --upgrade pip
pip install -r requirements.txt

echo Setup complete. You can now run the application.

:menu
:: GPU Acceleration Options
echo.
echo Choose the GPU Acceleration Option if applicable:
echo 5. OpenVINO (Intel)
echo 6. None
set /p choice="Enter your choice (1-6): "

:: Clear any stale value, then dispatch to the provider-specific installer.
:: Invalid input loops back to :menu from inside :set_execution_provider.
set "exec_provider="
call :set_execution_provider %choice%

:end_choice
echo.
echo GPU Acceleration setup complete.
echo Selected provider: !exec_provider!
echo.

:: Run the application. Delayed expansion (!exec_provider!) is required because
:: the variable is set inside a subroutine called from a parenthesized context.
if defined exec_provider (
    echo Running the application with !exec_provider! execution provider...
    python run.py --execution-provider !exec_provider!
) else (
    echo Running the application...
    python run.py
)

:: Deactivate the virtual environment
call venv\Scripts\deactivate.bat

echo.
echo Script execution completed.
pause
exit /b
:: Verify %1 is on PATH; %2 is the human-readable name for the error message.
:: Fixes: %~2 strips the caller's quotes; the terminating "exit /b 0" stops the
:: subroutine from falling through into :install_if_missing.
:check_installation
where %1 >nul 2>&1
if %ERRORLEVEL% neq 0 (
    echo %~2 is not installed. Please install %~2.
    pause
    rem NOTE(review): exit /b only returns to the caller; the caller does not
    rem check ERRORLEVEL, so setup continues - confirm intended abort behavior.
    exit /b 1
)
exit /b 0
:: If %1 is not on PATH, run the install command passed as %3.
:: Fixes: %~3 strips the surrounding quotes so the winget command actually
:: executes (a quoted "%3" is treated as a single unrunnable token), and the
:: terminating "exit /b 0" stops fall-through into :clone_repository.
:install_if_missing
where %1 >nul 2>&1
if %ERRORLEVEL% neq 0 (
    echo %~2 is not installed. Installing %~2...
    %~3
)
exit /b 0
:: Clone repo %1 into directory %2, optionally overwriting an existing copy.
:: Fixes: "overwrite" is read with set /p INSIDE a parenthesized block, so it
:: must be expanded with !overwrite! (the script enables delayed expansion);
:: %overwrite% would be expanded at parse time and always be empty/stale.
:: A terminating "exit /b 0" stops fall-through into :set_execution_provider.
:clone_repository
if exist %2 (
    echo %~2 directory already exists.
    set /p overwrite="Do you want to overwrite? (Y/N): "
    if /i "!overwrite!"=="Y" (
        rmdir /s /q %2
        git clone %1
    ) else (
        echo Skipping clone, using existing directory.
    )
) else (
    git clone %1
)
exit /b 0
:: Map the menu choice (%1) to an onnxruntime package + provider name.
:: Fixes: a terminating "exit /b 0" stops fall-through into :install_onnxruntime
:: (which would re-run it with empty arguments); choice 6 no longer sets
:: exec_provider=none, so the caller's "if defined exec_provider" correctly
:: launches run.py WITHOUT a bogus "--execution-provider none" argument.
:set_execution_provider
if "%1"=="1" (
    call :install_onnxruntime "onnxruntime-gpu" "1.16.3" "cuda"
) else if "%1"=="2" (
    call :install_onnxruntime "onnxruntime-silicon" "1.13.1" "coreml"
) else if "%1"=="3" (
    call :install_onnxruntime "onnxruntime-coreml" "1.13.1" "coreml"
) else if "%1"=="4" (
    call :install_onnxruntime "onnxruntime-directml" "1.15.1" "directml"
) else if "%1"=="5" (
    call :install_onnxruntime "onnxruntime-openvino" "1.15.0" "openvino"
) else if "%1"=="6" (
    echo Skipping GPU acceleration setup.
) else (
    echo Invalid choice. Please try again.
    goto menu
)
exit /b 0
:: Install onnxruntime variant: %1 = package, %2 = version, %3 = provider name.
:: Fixes: %~1/%~2/%~3 strip the quotes the caller passes (pip would otherwise
:: see "onnxruntime-gpu"==1.16.3); "exit /b 0" replaces "goto end_choice",
:: which jumped to the main script from inside a nested call context and left
:: both call frames live, causing the application to be launched repeatedly as
:: each frame unwound and fell through the labels.
:install_onnxruntime
echo Installing %~1 dependencies...
pip uninstall -y onnxruntime %~1
pip install %~1==%~2
set "exec_provider=%~3"
exit /b 0
Reference in New Issue
Block a user