35 Commits

Author SHA1 Message Date
Kenneth Estanislao
61dae91439 Revert "Merge pull request #566 from pereiraroland26/main"
This reverts commit 5d450b4352.
2024-10-04 15:57:48 +08:00
Kenneth Estanislao
5d450b4352 Merge pull request #566 from pereiraroland26/main
Added support for multiple faces
2024-10-04 15:55:31 +08:00
Kenneth Estanislao
a11ccf9c49 Merge pull request #621 from saleweaver/experimental
Significantly improve video resolution/quality using ESPCN_x4 model
2024-09-23 11:59:23 +08:00
Michael
59cc742197 Add option to change the scale factor.
Add readme information about super resolution.
2024-09-23 02:49:48 +01:00
Michael
2066560a95 Significantly improve video resolution/quality using ESPCN_x4 model 2024-09-23 02:48:59 +01:00
Michael
1af9abda2f Significantly improve video resolution/quality using ESPCN_x4 model 2024-09-23 02:25:14 +01:00
Kenneth Estanislao
91884eebf7 Merge pull request #615 from saleweaver/experimental
Adding headless parameter to arguments to run from the cli, reenabling macOS compatibility
2024-09-23 04:01:23 +08:00
Michael
4686716c59 add to README.md 2024-09-22 18:39:00 +01:00
Michael
f4028d3949 Fix underscore/hyphen 2024-09-22 18:33:02 +01:00
Michael
07c735e9d2 Allows to set the upscale factor for gfpgan face_enhancer 2024-09-22 18:31:06 +01:00
Michael
aa021b6aa0 better import condition 2024-09-22 18:11:02 +01:00
Michael
0e3805e200 added headless argument to readme 2024-09-22 17:57:46 +01:00
Michael
5cabbffda8 - removed unused import statements
- added macOS specific required library to requirements.txt
- conditional import of pygrabber, which is unavailable for macOS
2024-09-22 17:55:26 +01:00
Michael
0d4676591e - removed unused import statements
- added macOS specific required library to requirements.txt
- conditional import of pygrabber, which is unavailable for macOS
2024-09-22 17:54:44 +01:00
Michael
c2cc885672 Adding headless parameter to arguments to run from the cli 2024-09-21 22:41:47 +01:00
Kenneth Estanislao
e36c746c81 Update setup_deep_live_cam.bat 2024-09-08 20:31:36 +08:00
barongello
14ab470dcc Adding a swap faces button to easily swap source/target images 2024-08-27 12:44:47 +08:00
Kenneth Estanislao
4dc4746235 update inswapper 2024-08-21 14:40:15 +08:00
Kenneth Estanislao
ac8feff652 Merge pull request #329 from bit-wrangler/experimental
Added virtual camera output and fetching of input camera devices with names using pygrabber on windows and linux
2024-08-16 00:58:50 +08:00
Aleksandr Spiridonov
a90c4facc5 added a note to README to document new virtual cam output feature 2024-08-15 12:27:52 -04:00
Aleksandr Spiridonov
575373beac fixed variable names not matching after merge conflict resolution from upstream 2024-08-15 12:22:19 -04:00
Aleksandr Spiridonov
b8cdad5cce Merge remote-tracking branch 'parent/experimental' into experimental 2024-08-15 12:15:53 -04:00
Kenneth Estanislao
137ac597ef Merge pull request #293 from vic4key/experimental
To fix bugs and support more options for the Live function (see details in Commits tab)
2024-08-15 13:44:53 +08:00
Aleksandr Spiridonov
f976885456 updated rely coords for the taller window 2024-08-15 01:36:24 -04:00
Aleksandr Spiridonov
cd2c3c2103 added virtual cam output 2024-08-15 01:31:10 -04:00
Aleksandr Spiridonov
3fbc1d0433 added virtual cam output toggle 2024-08-15 01:04:57 -04:00
Aleksandr Spiridonov
b4cf8854f8 refactored camera preview to use a loop function 2024-08-15 00:50:14 -04:00
Aleksandr Spiridonov
eb733ad8c5 started using pygrabber to get input cameras with names; fixed issue with webcam preview not stopping when the preview window is closed 2024-08-15 00:42:53 -04:00
Vic P
c6c41b8d0d Support the following options:
- The live camera display as you see it in the front-facing camera frame (like iPhone's Mirror Front Camera).
- The live camera frame is resizable.
Note: These options are turned off by default. Enabling both options may reduce performance by ~2%.

Signed-off-by: Vic P <vic4key@gmail.com>
2024-08-15 02:25:29 +07:00
Vic P
55c8d8181c Fix an issue that the Live function where the camera was not released when the user closed the live window.
Signed-off-by: Vic P <vic4key@gmail.com>
2024-08-14 00:48:01 +07:00
Kenneth Estanislao
4ddcd60c49 Merge pull request #237 from vic4key/experimental
Fix & Improve the NSFW function
2024-08-13 12:10:14 +08:00
Vic P
408b0f4cf7 ## Fix & Improve the NSFW function
- Fixed incorrect state usage.
- Removed the redundant argument that caused exceptions.
- Prevented the app from closing when an image is flagged as NSFW.
2024-08-13 04:16:34 +07:00
Kenneth Estanislao
78c808ef36 Merge pull request #166 from zoharbabin/experimental
Refactor and Optimize Cross-Platform Support
2024-08-12 12:27:35 +08:00
Zohar Babin
6b0cc74957 Refactor and Optimize Cross-Platform Support, Error Handling, and UI Enhancements 2024-08-10 22:41:45 -04:00
Dmitry Samoylenko
8d3072d906 Enable to choose a camera device in UI
Signed-off-by: samoylenkodmitry <samoylenkodmitry@gmail.com>
2024-08-10 14:08:29 +08:00
16 changed files with 943 additions and 500 deletions

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.10.14

View File

@@ -24,7 +24,7 @@ Users of this software are expected to use this software responsibly while abidi
#### 3. Download Models
1. [GFPGANv1.4](https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth)
2. [inswapper_128_fp16.onnx](https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx)
2. [inswapper_128_fp16.onnx](https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128.onnx)
Then put those 2 files on the "**models**" folder
@@ -142,30 +142,43 @@ Just follow the clicks on the screenshot
Just use your favorite screencapture to stream like OBS
> Note: In case you want to change your face, just select another picture, the preview mode will then restart (so just wait a bit).
You can now use the virtual camera output (uses pyvirtualcam) by turning on the `Virtual Cam Output (OBS)` toggle which should output to the OBS Virtual Camera. Note: this may not work on macOS. You will get a preview as before, but now you will also have a virtual camera output which can be used in applications like Zoom.
Additional command line arguments are given below. To learn out what they do, check [this guide](https://github.com/s0md3v/roop/wiki/Advanced-Options).
```
options:
-h, --help show this help message and exit
-s SOURCE_PATH, --source SOURCE_PATH select an source image
-t TARGET_PATH, --target TARGET_PATH select an target image or video
-o OUTPUT_PATH, --output OUTPUT_PATH select output file or directory
--frame-processor FRAME_PROCESSOR [FRAME_PROCESSOR ...] frame processors (choices: face_swapper, face_enhancer, ...)
--frame-processor FRAME_PROCESSOR [FRAME_PROCESSOR ...] frame processors (choices: face_swapper, face_enhancer, super_resolution...)
--keep-fps keep original fps
--keep-audio keep original audio
--keep-frames keep temporary frames
--many-faces process every face
--video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder
--video-quality [0-51] adjust output video quality
--live-mirror the live camera display as you see it in the front-facing camera frame
--live-resizable the live camera frame is resizable
--max-memory MAX_MEMORY maximum amount of RAM in GB
--execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...)
--execution-threads EXECUTION_THREADS number of execution threads
--headless run in headless mode
--enhancer-upscale-factor Sets the upscale factor for the enhancer. Only applies if `face_enhancer` is set as a frame-processor
--source-image-scaling-factor Set the upscale factor for source images. Only applies if `face_swapper` is set as a frame-processor
-r SCALE, --super-resolution-scale-factor SCALE Super resolution scale factor, choices are 2, 3, 4
-v, --version show program's version number and exit
```
Looking for a CLI mode? Using the -s/--source argument will make the run program in cli mode.
To improve the video quality, you can use the `super_resolution` frame processor after swapping the faces. It will enhance the video quality by 2x, 3x or 4x. You can set the upscale factor using the `-r` or `--super-resolution-scale-factor` argument.
Processing time will increase with the upscale factor, but it's quite quick.
```
## Credits
- [henryruhs](https://github.com/henryruhs): for being an irreplaceable contributor to the project
- [ffmpeg](https://ffmpeg.org/): for making video related operations easy

View File

@@ -1,20 +1,38 @@
from typing import Any
from typing import Any, Optional
import cv2
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
def get_video_frame(video_path: str, frame_number: int = 0) -> Optional[Any]:
"""Retrieve a specific frame from a video."""
capture = cv2.VideoCapture(video_path)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
if not capture.isOpened():
print(f"Error: Cannot open video file {video_path}")
return None
frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
# Ensure frame_number is within the valid range
frame_number = max(0, min(frame_number, frame_total - 1))
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
capture.release()
if has_frame:
return frame
return None
if not has_frame:
print(f"Error: Cannot read frame {frame_number} from {video_path}")
return None
return frame
def get_video_frame_total(video_path: str) -> int:
"""Get the total number of frames in a video."""
capture = cv2.VideoCapture(video_path)
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
if not capture.isOpened():
print(f"Error: Cannot open video file {video_path}")
return 0
frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
capture.release()
return video_frame_total
return frame_total

View File

@@ -1,16 +1,17 @@
import os
import sys
# single thread doubles cuda performance - needs to be set before torch import
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
from typing import List
import platform
import signal
import shutil
import argparse
from typing import List
# Set environment variables for CUDA performance and TensorFlow logging
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import torch
import onnxruntime
import tensorflow
@@ -19,34 +20,73 @@ import modules.globals
import modules.metadata
import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
from modules.utilities import (
has_image_extension,
is_image,
is_video,
detect_fps,
create_video,
extract_frames,
get_temp_frame_paths,
restore_audio,
create_temp,
move_temp,
clean_temp,
normalize_output_path
)
# Filter warnings
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
# Cross-platform resource management
if platform.system() == 'Darwin' and 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]')
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
program.add_argument('-s', '--source', help='Select a source image', dest='source_path')
program.add_argument('-t', '--target', help='Select a target image or video', dest='target_path')
program.add_argument('-o', '--output', help='Select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor',
default=['face_swapper'], choices=['face_swapper', 'face_enhancer', 'super_resolution'],
nargs='+')
program.add_argument('--keep-fps', help='Keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='Keep original audio', dest='keep_audio', action='store_true',
default=True)
program.add_argument('--keep-frames', help='Keep temporary frames', dest='keep_frames', action='store_true',
default=False)
program.add_argument('--many-faces', help='Process every face', dest='many_faces', action='store_true',
default=False)
program.add_argument('--video-encoder', help='Adjust output video encoder', dest='video_encoder', default='libx264',
choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='Adjust output video quality', dest='video_quality', type=int,
default=18,
choices=range(52), metavar='[0-51]')
program.add_argument('--live-mirror', help='The live camera display as you see it in the front-facing camera frame',
dest='live_mirror', action='store_true', default=False)
program.add_argument('--live-resizable', help='The live camera frame is resizable',
dest='live_resizable', action='store_true', default=False)
program.add_argument('--max-memory', help='Maximum amount of RAM in GB', dest='max_memory', type=int,
default=suggest_max_memory())
program.add_argument('--execution-provider', help='Execution provider', dest='execution_provider', default=['cpu'],
choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='Number of execution threads', dest='execution_threads', type=int,
default=suggest_execution_threads())
program.add_argument('--headless', help='Run in headless mode', dest='headless', default=False, action='store_true')
program.add_argument('--enhancer-upscale-factor',
help='Sets the upscale factor for the enhancer. Only applies if `face_enhancer` is set as a frame-processor',
dest='enhancer_upscale_factor', type=int, default=1)
program.add_argument('--source-image-scaling-factor', help='Set the upscale factor for source images',
dest='source_image_scaling_factor', default=2, type=int)
program.add_argument('-r', '--super-resolution-scale-factor', dest='super_resolution_scale_factor',
help='Set the upscale factor for super resolution', default=4, choices=[2, 3, 4], type=int)
program.add_argument('-v', '--version', action='version',
version=f'{modules.metadata.name} {modules.metadata.version}')
# register deprecated args
# Register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
@@ -56,7 +96,8 @@ def parse_args() -> None:
modules.globals.source_path = args.source_path
modules.globals.target_path = args.target_path
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path)
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path,
args.output_path)
modules.globals.frame_processors = args.frame_processor
modules.globals.headless = args.source_path or args.target_path or args.output_path
modules.globals.keep_fps = args.keep_fps
@@ -65,23 +106,31 @@ def parse_args() -> None:
modules.globals.many_faces = args.many_faces
modules.globals.video_encoder = args.video_encoder
modules.globals.video_quality = args.video_quality
modules.globals.live_mirror = args.live_mirror
modules.globals.live_resizable = args.live_resizable
modules.globals.max_memory = args.max_memory
modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
modules.globals.execution_threads = args.execution_threads
modules.globals.headless = args.headless
modules.globals.enhancer_upscale_factor = args.enhancer_upscale_factor
modules.globals.source_image_scaling_factor = args.source_image_scaling_factor
modules.globals.sr_scale_factor = args.super_resolution_scale_factor
# Handle face enhancer tumbler
modules.globals.fp_ui['face_enhancer'] = 'face_enhancer' in args.frame_processor
#for ENHANCER tumbler:
if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
else:
modules.globals.fp_ui['face_enhancer'] = False
modules.globals.nsfw = False
# translate deprecated args
# Handle deprecated arguments
handle_deprecated_args(args)
def handle_deprecated_args(args) -> None:
"""Handle deprecated arguments by translating them to the new format."""
if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
modules.globals.source_path = args.source_path_deprecated
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path)
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path,
args.output_path)
if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.cpu_cores_deprecated
@@ -92,7 +141,7 @@ def parse_args() -> None:
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['cuda'])
if args.gpu_vendor_deprecated == 'amd':
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m')
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider rocm instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['rocm'])
if args.gpu_threads_deprecated:
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
@@ -100,18 +149,22 @@ def parse_args() -> None:
def encode_execution_providers(execution_providers: List[str]) -> List[str]:
return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers]
return [provider.replace('ExecutionProvider', '').lower() for provider in execution_providers]
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers()))
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)]
available_providers = onnxruntime.get_available_providers()
encoded_providers = encode_execution_providers(available_providers)
selected_providers = [available_providers[encoded_providers.index(req)] for req in execution_providers
if req in encoded_providers]
# Default to CPU if no suitable providers are found
return selected_providers if selected_providers else ['CPUExecutionProvider']
def suggest_max_memory() -> int:
if platform.system().lower() == 'darwin':
return 4
return 16
return 4 if platform.system().lower() == 'darwin' else 16
def suggest_execution_providers() -> List[str]:
@@ -119,34 +172,43 @@ def suggest_execution_providers() -> List[str]:
def suggest_execution_threads() -> int:
if 'DmlExecutionProvider' in modules.globals.execution_providers:
if 'dml' in modules.globals.execution_providers:
return 1
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
if 'rocm' in modules.globals.execution_providers:
return 1
return 8
def limit_resources() -> None:
# prevent tensorflow memory leak
# Prevent TensorFlow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True)
# limit memory usage
# Limit memory usage
if modules.globals.max_memory:
memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin':
memory = modules.globals.max_memory * 1024 ** 6
if platform.system().lower() == 'windows':
memory = modules.globals.max_memory * 1024 ** 3
elif platform.system().lower() == 'windows':
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else:
import resource
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
try:
soft, hard = resource.getrlimit(resource.RLIMIT_DATA)
if memory > hard:
print(
f"Warning: Requested memory limit {memory / (1024 ** 3)} GB exceeds system's hard limit. Setting to maximum allowed {hard / (1024 ** 3)} GB.")
memory = hard
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
except ValueError as e:
print(f"Warning: Could not set memory limit: {e}. Continuing with default limits.")
def release_resources() -> None:
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
if 'cuda' in modules.globals.execution_providers:
torch.cuda.empty_cache()
@@ -157,12 +219,15 @@ def pre_check() -> bool:
if not shutil.which('ffmpeg'):
update_status('ffmpeg is not installed.')
return False
if 'cuda' in modules.globals.execution_providers and not torch.cuda.is_available():
update_status('CUDA is not available. Please check your GPU or CUDA installation.')
return False
return True
def update_status(message: str, scope: str = 'DLC.CORE') -> None:
print(f'[{scope}] {message}')
if not modules.globals.headless:
if not modules.globals.headless and ui.status_label:
ui.update_status(message)
@@ -170,37 +235,70 @@ def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start():
return
# process image to image
# Process image to image
if has_image_extension(modules.globals.target_path):
if modules.globals.nsfw == False:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy()
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeed!')
else:
update_status('Processing to image failed!')
process_image_to_image()
return
# process image to videos
if modules.globals.nsfw == False:
# Process image to video
process_image_to_video()
def process_image_to_image() -> None:
if modules.globals.nsfw:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy(to_quit=False)
update_status('Processing to image ignored!')
return
try:
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
except Exception as e:
print("Error copying file:", str(e))
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Processing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path,
modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeeded!')
else:
update_status('Processing to image failed!')
def process_image_to_video() -> None:
if modules.globals.nsfw:
from modules.predicter import predict_video
if predict_video(modules.globals.target_path):
destroy()
update_status('Creating temp resources...')
destroy(to_quit=False)
update_status('Processing to video ignored!')
return
update_status('Creating temporary resources...')
create_temp(modules.globals.target_path)
update_status('Extracting frames...')
extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
update_status('Processing...', frame_processor.NAME)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
release_resources()
# handles fps
handle_video_fps()
handle_video_audio()
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeeded!')
else:
update_status('Processing to video failed!')
def handle_video_fps() -> None:
if modules.globals.keep_fps:
update_status('Detecting fps...')
fps = detect_fps(modules.globals.target_path)
@@ -209,7 +307,9 @@ def start() -> None:
else:
update_status('Creating video with 30.0 fps...')
create_video(modules.globals.target_path)
# handle audio
def handle_video_audio() -> None:
if modules.globals.keep_audio:
if modules.globals.keep_fps:
update_status('Restoring audio...')
@@ -218,30 +318,29 @@ def start() -> None:
restore_audio(modules.globals.target_path, modules.globals.output_path)
else:
move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!')
else:
update_status('Processing to video failed!')
def destroy() -> None:
def destroy(to_quit=True) -> None:
if modules.globals.target_path:
clean_temp(modules.globals.target_path)
quit()
if to_quit: quit()
def run() -> None:
parse_args()
if not pre_check():
return
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_check():
try:
parse_args()
if not pre_check():
return
limit_resources()
if modules.globals.headless:
start()
else:
window = ui.init(start, destroy)
window.mainloop()
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_check():
return
limit_resources()
if modules.globals.headless:
start()
else:
window = ui.init(start, destroy)
window.mainloop()
except Exception as e:
print(f"UI initialization failed: {str(e)}")
update_status(f"UI initialization failed: {str(e)}")
destroy() # Ensure any resources are cleaned up on failure

View File

@@ -1,31 +1,27 @@
from typing import Any
from typing import Any, Optional
import insightface
import modules.globals
from modules.typing import Frame
FACE_ANALYSER = None
FACE_ANALYSER: Optional[insightface.app.FaceAnalysis] = None
def get_face_analyser() -> Any:
def get_face_analyser() -> insightface.app.FaceAnalysis:
global FACE_ANALYSER
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=modules.globals.execution_providers)
FACE_ANALYSER = insightface.app.FaceAnalysis(
name='buffalo_l',
providers=modules.globals.execution_providers
)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
return FACE_ANALYSER
def get_one_face(frame: Frame) -> Optional[Any]:
faces = get_face_analyser().get(frame)
return min(faces, key=lambda x: x.bbox[0], default=None)
def get_one_face(frame: Frame) -> Any:
face = get_face_analyser().get(frame)
try:
return min(face, key=lambda x: x.bbox[0])
except ValueError:
return None
def get_many_faces(frame: Frame) -> Any:
try:
return get_face_analyser().get(frame)
except IndexError:
return None
def get_many_faces(frame: Frame) -> Optional[Any]:
faces = get_face_analyser().get(frame)
return faces if faces else None

View File

@@ -19,6 +19,8 @@ keep_frames = None
many_faces = None
video_encoder = None
video_quality = None
live_mirror = None
live_resizable = None
max_memory = None
execution_providers: List[str] = []
execution_threads = None
@@ -27,4 +29,7 @@ log_level = 'error'
fp_ui: Dict[str, bool] = {}
nsfw = None
camera_input_combobox = None
webcam_preview_running = False
webcam_preview_running = False
enhancer_upscale_factor = 1
source_image_scaling_factor = 2
sr_scale_factor = 4

View File

@@ -1,24 +1,25 @@
import numpy
import numpy as np
import opennsfw2
from PIL import Image
from modules.typing import Frame
MAX_PROBABILITY = 0.85
# Preload the model once for efficiency
model = None
def predict_frame(target_frame: Frame) -> bool:
global model
if model is None: model = opennsfw2.make_open_nsfw_model()
image = Image.fromarray(target_frame)
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
model = opennsfw2.make_open_nsfw_model()
views = numpy.expand_dims(image, axis=0)
views = np.expand_dims(image, axis=0)
_, probability = model.predict(views)[0]
return probability > MAX_PROBABILITY
def predict_image(target_path: str) -> bool:
return opennsfw2.predict_image(target_path) > MAX_PROBABILITY
probability = opennsfw2.predict_image(target_path)
return probability > MAX_PROBABILITY
def predict_video(target_path: str) -> bool:
_, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100)

View File

@@ -17,57 +17,56 @@ FRAME_PROCESSORS_INTERFACE = [
'process_video'
]
def load_frame_processor_module(frame_processor: str) -> Any:
def load_frame_processor_module(frame_processor: str) -> ModuleType:
try:
frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}')
# Ensure all required methods are present
for method_name in FRAME_PROCESSORS_INTERFACE:
if not hasattr(frame_processor_module, method_name):
sys.exit()
raise AttributeError(f"Missing required method {method_name} in {frame_processor} module.")
except ImportError:
print(f"Frame processor {frame_processor} not found")
sys.exit()
print(f"Error: Frame processor '{frame_processor}' not found.")
sys.exit(1)
except AttributeError as e:
print(e)
sys.exit(1)
return frame_processor_module
def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]:
global FRAME_PROCESSORS_MODULES
if not FRAME_PROCESSORS_MODULES:
for frame_processor in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
FRAME_PROCESSORS_MODULES = [load_frame_processor_module(fp) for fp in frame_processors]
set_frame_processors_modules_from_ui(frame_processors)
return FRAME_PROCESSORS_MODULES
def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
global FRAME_PROCESSORS_MODULES
for frame_processor, state in modules.globals.fp_ui.items():
if state == True and frame_processor not in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
if state and frame_processor not in frame_processors:
module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(module)
modules.globals.frame_processors.append(frame_processor)
if state == False:
try:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.remove(frame_processor_module)
modules.globals.frame_processors.remove(frame_processor)
except:
pass
elif not state and frame_processor in frame_processors:
module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.remove(module)
modules.globals.frame_processors.remove(frame_processor)
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
futures = []
for path in temp_frame_paths:
future = executor.submit(process_frames, source_path, [path], progress)
futures.append(future)
futures = [executor.submit(process_frames, source_path, [path], progress) for path in temp_frame_paths]
for future in futures:
future.result()
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory})
progress.set_postfix({
'execution_providers': modules.globals.execution_providers,
'execution_threads': modules.globals.execution_threads,
'max_memory': modules.globals.max_memory
})
multi_process_frame(source_path, frame_paths, process_frames, progress)

View File

@@ -8,7 +8,7 @@ import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
from modules.typing import Frame, Face # Ensure these are imported
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
FACE_ENHANCER = None
@@ -16,34 +16,29 @@ THREAD_SEMAPHORE = threading.Semaphore()
THREAD_LOCK = threading.Lock()
NAME = 'DLC.FACE-ENHANCER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('..\models')
conditional_download(download_directory_path, ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth'])
return True
def pre_start() -> bool:
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def get_face_enhancer() -> Any:
global FACE_ENHANCER
with THREAD_LOCK:
if FACE_ENHANCER is None:
if os.name == 'nt':
model_path = resolve_relative_path('..\models\GFPGANv1.4.pth')
# todo: set models path https://github.com/TencentARC/GFPGAN/issues/399
else:
model_path = resolve_relative_path('../models/GFPGANv1.4.pth')
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
model_path = resolve_relative_path('../models/GFPGANv1.4.pth')
FACE_ENHANCER = gfpgan.GFPGANer(
model_path=model_path,
upscale=modules.globals.enhancer_upscale_factor
) # type: ignore[attr-defined]
return FACE_ENHANCER
def enhance_face(temp_frame: Frame) -> Frame:
with THREAD_SEMAPHORE:
_, _, temp_frame = get_face_enhancer().enhance(
@@ -52,14 +47,12 @@ def enhance_face(temp_frame: Frame) -> Frame:
)
return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
@@ -68,12 +61,10 @@ def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
target_frame = cv2.imread(target_path)
result = process_frame(None, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)

View File

@@ -2,6 +2,7 @@ from typing import Any, List
import cv2
import insightface
import threading
import os
import modules.globals
import modules.processors.frame.core
@@ -9,45 +10,59 @@ from modules.core import update_status
from modules.face_analyser import get_one_face, get_many_faces
from modules.typing import Face, Frame
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
import numpy as np
FACE_SWAPPER = None
THREAD_LOCK = threading.Lock()
NAME = 'DLC.FACE-SWAPPER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../models')
conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx'])
conditional_download(download_directory_path, [
'https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128.onnx'
])
return True
def pre_start() -> bool:
if not is_image(modules.globals.source_path):
update_status('Select an image for source path.', NAME)
return False
elif not get_one_face(cv2.imread(modules.globals.source_path)):
update_status('No face in source path detected.', NAME)
update_status('No face detected in the source path.', NAME)
return False
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def get_face_swapper() -> Any:
global FACE_SWAPPER
with THREAD_LOCK:
if FACE_SWAPPER is None:
model_path = resolve_relative_path('../models/inswapper_128_fp16.onnx')
model_path = resolve_relative_path('../models/inswapper_128.onnx')
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=modules.globals.execution_providers)
return FACE_SWAPPER
def upscale_image(image: np.ndarray, scaling_factor: int = modules.globals.source_image_scaling_factor) -> np.ndarray:
"""
Upscales the given image by the specified scaling factor.
Args:
image (np.ndarray): The input image to upscale.
scaling_factor (int): The factor by which to upscale the image.
Returns:
np.ndarray: The upscaled image.
"""
height, width = image.shape[:2]
new_size = (width * scaling_factor, height * scaling_factor)
upscaled_image = cv2.resize(image, new_size, interpolation=cv2.INTER_CUBIC)
return upscaled_image
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if modules.globals.many_faces:
many_faces = get_many_faces(temp_frame)
@@ -60,27 +75,30 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
temp_frame = swap_face(source_face, target_face, temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
source_face = get_one_face(cv2.imread(source_path))
source_image = cv2.imread(source_path)
if source_image is None:
print(f"Failed to load source image from {source_path}")
return
# Upscale the source image for better quality
source_image_upscaled = upscale_image(source_image, scaling_factor=2)
source_face = get_one_face(source_image_upscaled)
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
try:
result = process_frame(source_face, temp_frame)
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
print(exception)
pass
print(f"Error processing frame {temp_frame_path}: {exception}")
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
source_face = get_one_face(cv2.imread(source_path))
target_frame = cv2.imread(target_path)
result = process_frame(source_face, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)

View File

@@ -0,0 +1,197 @@
import threading
import traceback
from typing import Any, List
import cv2
import os
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
import numpy as np
NAME = 'DLC.SUPER-RESOLUTION'
THREAD_SEMAPHORE = threading.Semaphore()
# Singleton class for Super-Resolution
class SuperResolutionModel:
_instance = None
_lock = threading.Lock()
def __init__(self, sr_model_path: str = f'ESPCN_x{modules.globals.sr_scale_factor}.pb'):
if SuperResolutionModel._instance is not None:
raise Exception("This class is a singleton!")
self.sr = cv2.dnn_superres.DnnSuperResImpl_create()
self.model_path = os.path.join(resolve_relative_path('../models'), sr_model_path)
if not os.path.exists(self.model_path):
raise FileNotFoundError(f"Super-resolution model not found at {self.model_path}")
try:
self.sr.readModel(self.model_path)
self.sr.setModel("espcn", modules.globals.sr_scale_factor) # Using ESPCN with 2,3 or 4x upscaling
except Exception as e:
print(f"Error during super-resolution model initialization: {e}")
raise e
@classmethod
def get_instance(cls, sr_model_path: str = f'ESPCN_x{modules.globals.sr_scale_factor}.pb'):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
try:
cls._instance = cls(sr_model_path)
except Exception as e:
raise RuntimeError(f"Failed to initialize SuperResolution: {str(e)}")
return cls._instance
def pre_check() -> bool:
"""
Checks and downloads necessary models before starting the face swapper.
"""
download_directory_path = resolve_relative_path('../models')
# Download the super-resolution model as well
conditional_download(download_directory_path, [
f'https://huggingface.co/spaces/PabloGabrielSch/AI_Resolution_Upscaler_And_Resizer/resolve/bcd13b766a9499196e8becbe453c4a848673b3b6/models/ESPCN_x{modules.globals.sr_scale_factor}.pb'
])
return True
def pre_start() -> bool:
if not is_image(modules.globals.source_path):
update_status('Select an image for source path.', NAME)
return False
elif not get_one_face(cv2.imread(modules.globals.source_path)):
update_status('No face detected in the source path.', NAME)
return False
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def apply_super_resolution(image: np.ndarray) -> np.ndarray:
"""
Applies super-resolution to the given image using the provided super-resolver.
Args:
image (np.ndarray): The input image to enhance.
sr_model_path (str): ESPCN model path for super-resolution.
Returns:
np.ndarray: The super-resolved image.
"""
with THREAD_SEMAPHORE:
sr_model = SuperResolutionModel.get_instance()
if sr_model is None:
print("Super-resolution model is not initialized.")
return image
try:
upscaled_image = sr_model.sr.upsample(image)
return upscaled_image
except Exception as e:
print(f"Error during super-resolution: {e}")
return image
def process_frame(frame: np.ndarray) -> np.ndarray:
"""
Processes a single frame by swapping the source face into detected target faces.
Args:
frame (np.ndarray): The target frame image.
Returns:
np.ndarray: The processed frame with swapped faces.
"""
# Apply super-resolution to the entire frame
frame = apply_super_resolution(frame)
return frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
"""
Processes multiple frames by swapping the source face into each target frame.
Args:
source_path (str): Path to the source image.
temp_frame_paths (List[str]): List of paths to target frame images.
progress (Any, optional): Progress tracker. Defaults to None.
"""
for idx, temp_frame_path in enumerate(temp_frame_paths):
frame = cv2.imread(temp_frame_path)
if frame is None:
print(f"Failed to load frame from {temp_frame_path}")
continue
try:
result = process_frame(frame)
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
traceback.print_exc()
print(f"Error processing frame {temp_frame_path}: {exception}")
if progress:
progress.update(1)
def upscale_image(image: np.ndarray, scaling_factor: int = 2) -> np.ndarray:
"""
Upscales the given image by the specified scaling factor.
Args:
image (np.ndarray): The input image to upscale.
scaling_factor (int): The factor by which to upscale the image.
Returns:
np.ndarray: The upscaled image.
"""
height, width = image.shape[:2]
new_size = (width * scaling_factor, height * scaling_factor)
upscaled_image = cv2.resize(image, new_size, interpolation=cv2.INTER_CUBIC)
return upscaled_image
def process_image(source_path: str, target_path: str, output_path: str) -> None:
"""
Processes a single image by swapping the source face into the target image.
Args:
source_path (str): Path to the source image.
target_path (str): Path to the target image.
output_path (str): Path to save the output image.
"""
source_image = cv2.imread(source_path)
if source_image is None:
print(f"Failed to load source image from {source_path}")
return
# Upscale the source image for better quality before face detection
source_image_upscaled = upscale_image(source_image, scaling_factor=2)
# Detect source face from the upscaled image
source_face = get_one_face(source_image_upscaled)
if source_face is None:
print("No source face detected.")
return
target_frame = cv2.imread(target_path)
if target_frame is None:
print(f"Failed to load target image from {target_path}")
return
# Process the frame
result = process_frame(target_frame)
# Save the processed frame
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
"""
Processes a video by swapping the source face into each frame.
Args:
source_path (str): Path to the source image.
temp_frame_paths (List[str]): List of paths to video frame images.
"""
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)

View File

@@ -1,76 +1,57 @@
{
"CTk": {
"fg_color": ["gray95", "gray10"]
"fg_color": ["#FFFFFF", "#2D2D2D"]
},
"CTkToplevel": {
"fg_color": ["gray95", "gray10"]
"fg_color": ["#FFFFFF", "#2D2D2D"]
},
"CTkFrame": {
"corner_radius": 0,
"border_width": 0,
"fg_color": ["gray90", "gray13"],
"top_fg_color": ["gray85", "gray16"],
"border_color": ["gray65", "gray28"]
"fg_color": ["#F0F0F0", "#3C3C3C"],
"top_fg_color": ["#E0E0E0", "#4B4B4B"],
"border_color": ["#B0B0B0", "#5A5A5A"]
},
"CTkButton": {
"corner_radius": 0,
"border_width": 0,
"fg_color": ["#2aa666", "#1f538d"],
"hover_color": ["#3cb666", "#14375e"],
"border_color": ["#3e4a40", "#949A9F"],
"text_color": ["#f3faf6", "#f3faf6"],
"fg_color": ["#007ACC", "#007ACC"],
"hover_color": ["#005EA3", "#005EA3"],
"border_color": ["#004C8A", "#004C8A"],
"text_color": ["#FFFFFF", "#FFFFFF"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkLabel": {
"corner_radius": 0,
"fg_color": "transparent",
"text_color": ["gray14", "gray84"]
"text_color": ["#000000", "#FFFFFF"]
},
"CTkEntry": {
"corner_radius": 0,
"border_width": 2,
"fg_color": ["#F9F9FA", "#343638"],
"border_color": ["#979DA2", "#565B5E"],
"text_color": ["gray14", "gray84"],
"fg_color": ["#FFFFFF", "#333333"],
"border_color": ["#A0A0A0", "#5A5A5A"],
"text_color": ["#000000", "#FFFFFF"],
"placeholder_text_color": ["gray52", "gray62"]
},
"CTkCheckbox": {
"corner_radius": 0,
"border_width": 3,
"fg_color": ["#2aa666", "#1f538d"],
"border_color": ["#3e4a40", "#949A9F"],
"hover_color": ["#3cb666", "#14375e"],
"checkmark_color": ["#f3faf6", "gray90"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkSwitch": {
"corner_radius": 1000,
"border_width": 3,
"button_length": 0,
"fg_color": ["#939BA2", "#4A4D50"],
"progress_color": ["#2aa666", "#1f538d"],
"button_color": ["gray36", "#D5D9DE"],
"button_hover_color": ["gray20", "gray100"],
"text_color": ["gray14", "gray84"],
"button_color": ["#444444", "#D5D9DE"],
"button_hover_color": ["#333333", "#FFFFFF"],
"text_color": ["#000000", "#FFFFFF"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkRadiobutton": {
"corner_radius": 1000,
"border_width_checked": 6,
"border_width_unchecked": 3,
"CTkOptionMenu": {
"corner_radius": 0,
"fg_color": ["#2aa666", "#1f538d"],
"border_color": ["#3e4a40", "#949A9F"],
"hover_color": ["#3cb666", "#14375e"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkProgressBar": {
"corner_radius": 1000,
"border_width": 0,
"fg_color": ["#939BA2", "#4A4D50"],
"progress_color": ["#2aa666", "#1f538d"],
"border_color": ["gray", "gray"]
"button_color": ["#3cb666", "#14375e"],
"button_hover_color": ["#234567", "#1e2c40"],
"text_color": ["#FFFFFF", "#FFFFFF"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkSlider": {
"corner_radius": 1000,
@@ -82,59 +63,6 @@
"button_color": ["#2aa666", "#1f538d"],
"button_hover_color": ["#3cb666", "#14375e"]
},
"CTkOptionMenu": {
"corner_radius": 0,
"fg_color": ["#2aa666", "#1f538d"],
"button_color": ["#3cb666", "#14375e"],
"button_hover_color": ["#234567", "#1e2c40"],
"text_color": ["#f3faf6", "#f3faf6"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkComboBox": {
"corner_radius": 0,
"border_width": 2,
"fg_color": ["#F9F9FA", "#343638"],
"border_color": ["#979DA2", "#565B5E"],
"button_color": ["#979DA2", "#565B5E"],
"button_hover_color": ["#6E7174", "#7A848D"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray50", "gray45"]
},
"CTkScrollbar": {
"corner_radius": 1000,
"border_spacing": 4,
"fg_color": "transparent",
"button_color": ["gray55", "gray41"],
"button_hover_color": ["gray40", "gray53"]
},
"CTkSegmentedButton": {
"corner_radius": 0,
"border_width": 2,
"fg_color": ["#979DA2", "gray29"],
"selected_color": ["#2aa666", "#1f538d"],
"selected_hover_color": ["#3cb666", "#14375e"],
"unselected_color": ["#979DA2", "gray29"],
"unselected_hover_color": ["gray70", "gray41"],
"text_color": ["#f3faf6", "#f3faf6"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkTextbox": {
"corner_radius": 0,
"border_width": 0,
"fg_color": ["gray100", "gray20"],
"border_color": ["#979DA2", "#565B5E"],
"text_color": ["gray14", "gray84"],
"scrollbar_button_color": ["gray55", "gray41"],
"scrollbar_button_hover_color": ["gray40", "gray53"]
},
"CTkScrollableFrame": {
"label_fg_color": ["gray80", "gray21"]
},
"DropdownMenu": {
"fg_color": ["gray90", "gray20"],
"hover_color": ["gray75", "gray28"],
"text_color": ["gray14", "gray84"]
},
"CTkFont": {
"macOS": {
"family": "Avenir",
@@ -152,7 +80,12 @@
"weight": "normal"
}
},
"DropdownMenu": {
"fg_color": ["#FFFFFF", "#2D2D2D"],
"hover_color": ["#E0E0E0", "#4B4B4B"],
"text_color": ["#000000", "#FFFFFF"]
},
"URL": {
"text_color": ["gray74", "gray60"]
"text_color": ["#007ACC", "#1E90FF"]
}
}

View File

@@ -1,9 +1,21 @@
import os
import platform
import webbrowser
import customtkinter as ctk
from typing import Callable, Tuple
from typing import Callable, Tuple, List, Any
from types import ModuleType
import cv2
from PIL import Image, ImageOps
import pyvirtualcam
# Import OS-specific modules only when necessary
if platform.system() == 'Darwin': # macOS
import AVFoundation
# Import Windows specific modules only when on windows platform
if platform.system() == 'Windows' or platform.system() == 'Linux': # Windows or Linux
from pygrabber.dshow_graph import FilterGraph
import modules.globals
import modules.metadata
@@ -13,12 +25,14 @@ from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import is_image, is_video, resolve_relative_path
ROOT = None
ROOT_HEIGHT = 700
ROOT_HEIGHT = 800
ROOT_WIDTH = 600
PREVIEW = None
PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200
PREVIEW_MAX_WIDTH = 1200
PREVIEW_DEFAULT_WIDTH = 960
PREVIEW_DEFAULT_HEIGHT = 540
RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None
@@ -32,10 +46,49 @@ status_label = None
img_ft, vid_ft = modules.globals.file_types
camera = None
def check_camera_permissions():
"""Check and request camera access permission on macOS."""
if platform.system() == 'Darwin': # macOS-specific
auth_status = AVFoundation.AVCaptureDevice.authorizationStatusForMediaType_(AVFoundation.AVMediaTypeVideo)
if auth_status == AVFoundation.AVAuthorizationStatusNotDetermined:
# Request access to the camera
def completion_handler(granted):
if granted:
print("Access granted to the camera.")
else:
print("Access denied to the camera.")
AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(AVFoundation.AVMediaTypeVideo, completion_handler)
elif auth_status == AVFoundation.AVAuthorizationStatusAuthorized:
print("Camera access already authorized.")
elif auth_status == AVFoundation.AVAuthorizationStatusDenied:
print("Camera access denied. Please enable it in System Preferences.")
elif auth_status == AVFoundation.AVAuthorizationStatusRestricted:
print("Camera access restricted. The app is not allowed to use the camera.")
def select_camera(camera_name: str):
"""Select the appropriate camera based on its name (cross-platform)."""
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.localizedName() == camera_name:
return device
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# On Windows/Linux, simply return the camera name as OpenCV can handle it by index
return camera_name
return None
def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
global ROOT, PREVIEW
if platform.system() == 'Darwin': # macOS-specific
check_camera_permissions() # Check camera permissions before initializing the UI
ROOT = create_root(start, destroy)
PREVIEW = create_preview(ROOT)
@@ -49,101 +102,102 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
ctk.set_appearance_mode('system')
ctk.set_default_color_theme(resolve_relative_path('ui.json'))
print("Creating root window...")
root = ctk.CTk()
root.minsize(ROOT_WIDTH, ROOT_HEIGHT)
root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}')
root.configure()
root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = ctk.CTkLabel(root, text=None)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25)
source_label.place(relx=0.1, rely=0.0875, relwidth=0.3, relheight=0.25)
target_label = ctk.CTkLabel(root, text=None)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
target_label.place(relx=0.6, rely=0.0875, relwidth=0.3, relheight=0.25)
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path())
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=select_source_path)
source_button.place(relx=0.1, rely=0.35, relwidth=0.3, relheight=0.1)
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path())
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
swap_faces_button = ctk.CTkButton(root, text='', cursor='hand2', command=lambda: swap_faces_paths())
swap_faces_button.place(relx=0.45, rely=0.4, relwidth=0.1, relheight=0.1)
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=select_target_path)
target_button.place(relx=0.6, rely=0.35, relwidth=0.3, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps))
keep_fps_checkbox.place(relx=0.1, rely=0.6)
keep_fps_checkbox.place(relx=0.1, rely=0.525)
keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames)
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch.place(relx=0.1, rely=0.65)
keep_frames_switch.place(relx=0.1, rely=0.56875)
# for FRAME PROCESSOR ENHANCER tumbler:
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.7)
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer', enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.6125)
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_switch.place(relx=0.6, rely=0.6)
keep_audio_switch.place(relx=0.6, rely=0.525)
many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces)
many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get()))
many_faces_switch.place(relx=0.6, rely=0.65)
many_faces_switch.place(relx=0.6, rely=0.56875)
nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw)
nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
nsfw_switch.place(relx=0.6, rely=0.7)
nsfw_switch.place(relx=0.6, rely=0.6125)
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
start_button.place(relx=0.15, rely=0.7, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy())
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy)
stop_button.place(relx=0.4, rely=0.7, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview)
preview_button.place(relx=0.65, rely=0.7, relwidth=0.2, relheight=0.05)
# --- Camera Selection ---
camera_label = ctk.CTkLabel(root, text="Select Camera:")
camera_label.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05)
camera_label.place(relx=0.4, rely=0.7525, relwidth=0.2, relheight=0.05)
available_cameras = get_available_cameras()
# Convert camera indices to strings for CTkOptionMenu
available_camera_strings = [str(cam) for cam in available_cameras]
camera_variable = ctk.StringVar(value=available_camera_strings[0] if available_camera_strings else "No cameras found")
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable,
values=available_camera_strings)
camera_optionmenu.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, values=available_camera_strings)
camera_optionmenu.place(relx=0.65, rely=0.7525, relwidth=0.2, relheight=0.05)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(int(camera_variable.get())))
live_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05)
# --- End Camera Selection ---
virtual_cam_out_value = ctk.BooleanVar(value=False)
virtual_cam_out_switch = ctk.CTkSwitch(root, text='Virtual Cam Output (OBS)', variable=virtual_cam_out_value, cursor='hand2')
virtual_cam_out_switch.place(relx=0.4, rely=0.805)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(camera_variable.get(), virtual_cam_out_value.get()))
live_button.place(relx=0.15, rely=0.7525, relwidth=0.2, relheight=0.05)
status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
status_label.place(relx=0.1, relwidth=0.8, rely=0.875)
donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2')
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color'))
donate_label.bind('<Button>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
donate_label.bind('<Button-1>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
return root
def create_preview(parent: ctk.CTkToplevel) -> ctk.CTkToplevel:
def create_preview(parent: ctk.CTk) -> ctk.CTkToplevel:
global preview_label, preview_slider
preview = ctk.CTkToplevel(parent)
preview.withdraw()
preview.title('Preview')
preview.configure()
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
preview.resizable(width=False, height=False)
preview.protocol('WM_DELETE_WINDOW', toggle_preview)
preview.resizable(width=True, height=True)
preview_label = ctk.CTkLabel(preview, text=None)
preview_label.pack(fill='both', expand=True)
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=lambda frame_value: update_preview(frame_value))
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=update_preview)
return preview
@@ -158,10 +212,10 @@ def update_tumbler(var: str, value: bool) -> None:
def select_source_path() -> None:
global RECENT_DIRECTORY_SOURCE, img_ft, vid_ft
global RECENT_DIRECTORY_SOURCE
PREVIEW.withdraw()
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
source_path = ctk.filedialog.askopenfilename(title='Select a source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
if is_image(source_path):
modules.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
@@ -172,11 +226,35 @@ def select_source_path() -> None:
source_label.configure(image=None)
def select_target_path() -> None:
global RECENT_DIRECTORY_TARGET, img_ft, vid_ft
def swap_faces_paths() -> None:
global RECENT_DIRECTORY_SOURCE, RECENT_DIRECTORY_TARGET
source_path = modules.globals.source_path
target_path = modules.globals.target_path
if not is_image(source_path) or not is_image(target_path):
return
modules.globals.source_path = target_path
modules.globals.target_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
source_image = render_image_preview(modules.globals.source_path, (200, 200))
source_label.configure(image=source_image)
target_image = render_image_preview(modules.globals.target_path, (200, 200))
target_label.configure(image=target_image)
def select_target_path() -> None:
global RECENT_DIRECTORY_TARGET
PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(title='Select a target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
if is_image(target_path):
modules.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
@@ -193,12 +271,12 @@ def select_target_path() -> None:
def select_output_path(start: Callable[[], None]) -> None:
global RECENT_DIRECTORY_OUTPUT, img_ft, vid_ft
global RECENT_DIRECTORY_OUTPUT
if is_image(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
output_path = ctk.filedialog.asksaveasfilename(title='Save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
elif is_video(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
output_path = ctk.filedialog.asksaveasfilename(title='Save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
else:
output_path = None
if output_path:
@@ -219,13 +297,13 @@ def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: i
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
capture.release()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
capture.release()
cv2.destroyAllWindows()
return None
def toggle_preview() -> None:
@@ -235,12 +313,17 @@ def toggle_preview() -> None:
init_preview()
update_preview()
PREVIEW.deiconify()
global camera
if PREVIEW.state() == 'withdrawn':
if camera and camera.isOpened():
camera.release()
camera = None
def init_preview() -> None:
if is_image(modules.globals.target_path):
preview_slider.pack_forget()
if is_video(modules.globals.target_path):
elif is_video(modules.globals.target_path):
video_frame_total = get_video_frame_total(modules.globals.target_path)
preview_slider.configure(to=video_frame_total)
preview_slider.pack(fill='x')
@@ -250,7 +333,7 @@ def init_preview() -> None:
def update_preview(frame_number: int = 0) -> None:
if modules.globals.source_path and modules.globals.target_path:
temp_frame = get_video_frame(modules.globals.target_path, frame_number)
if modules.globals.nsfw == False:
if not modules.globals.nsfw:
from modules.predicter import predict_frame
if predict_frame(temp_frame):
quit()
@@ -264,62 +347,148 @@ def update_preview(frame_number: int = 0) -> None:
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
def webcam_preview(camera_index: int):
def webcam_preview_loop(camera: cv2.VideoCapture, source_image: Any, frame_processors: List[ModuleType], virtual_cam: pyvirtualcam.Camera = None) -> bool:
global preview_label, PREVIEW
ret, frame = camera.read()
if not ret:
update_status(f"Error: Frame not received from camera.")
return False
temp_frame = frame.copy()
if modules.globals.live_mirror:
temp_frame = cv2.flip(temp_frame, 1) # horizontal flipping
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height())
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = ImageOps.contain(image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
if virtual_cam:
virtual_cam.send(temp_frame)
virtual_cam.sleep_until_next_frame()
ROOT.update()
if PREVIEW.state() == 'withdrawn':
return False
return True
def fit_image_to_size(image, width: int, height: int):
if width is None and height is None:
return image
h, w, _ = image.shape
ratio_h = 0.0
ratio_w = 0.0
if width > height:
ratio_h = height / h
else:
ratio_w = width / w
ratio = max(ratio_w, ratio_h)
new_size = (int(ratio * w), int(ratio * h))
return cv2.resize(image, dsize=new_size)
def webcam_preview(camera_name: str, virtual_cam_output: bool):
if modules.globals.source_path is None:
# No image selected
return
global preview_label, PREVIEW
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():
update_status(f"Error: Could not open camera with index {camera_index}")
WIDTH = 960
HEIGHT = 540
FPS = 60
# Select the camera by its name
selected_camera = select_camera(camera_name)
if selected_camera is None:
update_status(f"No suitable camera found.")
return
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
PREVIEW_MAX_WIDTH = 960
PREVIEW_MAX_HEIGHT = 540
# Use OpenCV's camera index for cross-platform compatibility
camera_index = get_camera_index_by_name(camera_name)
preview_label.configure(image=None) # Reset the preview image before startup
global camera
camera = cv2.VideoCapture(camera_index)
PREVIEW.deiconify() # Open preview window
if not camera.isOpened():
update_status(f"Error: Could not open camera {camera_name}")
return
camera.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
camera.set(cv2.CAP_PROP_FPS, FPS)
PREVIEW_MAX_WIDTH = WIDTH
PREVIEW_MAX_HEIGHT = HEIGHT
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = get_one_face(cv2.imread(modules.globals.source_path))
source_image = None # Initialize variable for the selected face image
preview_running = True
while True:
ret, frame = cap.read()
if not ret:
break
if virtual_cam_output:
with pyvirtualcam.Camera(width=WIDTH, height=HEIGHT, fps=FPS, fmt=pyvirtualcam.PixelFormat.BGR) as virtual_cam:
while preview_running:
preview_running = webcam_preview_loop(camera, source_image, frame_processors, virtual_cam)
# Select and save face image only once
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
while preview_running:
preview_running = webcam_preview_loop(camera, source_image, frame_processors)
temp_frame = frame.copy() #Create a copy of the frame
if camera: camera.release()
PREVIEW.withdraw()
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) # Convert the image to RGB format to display it with Tkinter
image = Image.fromarray(image)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
ROOT.update()
def get_camera_index_by_name(camera_name: str) -> int:
"""Map camera name to index for OpenCV."""
if platform.system() == 'Darwin': # macOS-specific
if "FaceTime" in camera_name:
return 0 # Assuming FaceTime is at index 0
elif "iPhone" in camera_name:
return 1 # Assuming iPhone camera is at index 1
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# Map camera name to index dynamically (OpenCV on these platforms usually starts with 0)
return get_available_cameras().index(camera_name)
return -1
cap.release()
PREVIEW.withdraw() # Close preview window when loop is finished
def get_available_cameras():
"""Returns a list of available camera indices."""
"""Get available camera names (cross-platform)."""
available_cameras = []
for index in range(10): # Check for cameras with index 0 to 9
cap = cv2.VideoCapture(index)
if cap.isOpened():
available_cameras.append(index)
cap.release()
return available_cameras
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.deviceType() == AVFoundation.AVCaptureDeviceTypeBuiltInWideAngleCamera:
print(f"Found Built-In Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeExternal":
print(f"Found External Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeContinuityCamera":
print(f"Skipping Continuity Camera: {device.localizedName()}")
elif platform.system() == 'Windows' or platform.system() == 'Linux':
try:
devices = FilterGraph().get_input_devices()
except Exception as e:
# Use OpenCV to detect camera indexes
index = 0
devices = []
while True:
cap = cv2.VideoCapture(index)
if not cap.isOpened():
break
devices.append(f"Camera {index}")
cap.release()
index += 1
available_cameras = devices
return available_cameras

View File

@@ -5,7 +5,7 @@ import platform
import shutil
import ssl
import subprocess
import urllib
import urllib.request
from pathlib import Path
from typing import List, Any
from tqdm import tqdm
@@ -15,127 +15,123 @@ import modules.globals
TEMP_FILE = 'temp.mp4'
TEMP_DIRECTORY = 'temp'
# monkey patch ssl for mac
# Monkey patch SSL for macOS to handle issues with some HTTPS requests
if platform.system().lower() == 'darwin':
ssl._create_default_https_context = ssl._create_unverified_context
def run_ffmpeg(args: List[str]) -> bool:
commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', modules.globals.log_level]
commands.extend(args)
try:
subprocess.check_output(commands, stderr=subprocess.STDOUT)
return True
except Exception:
pass
except subprocess.CalledProcessError as e:
print(f"FFmpeg error: {e.output.decode()}")
return False
def detect_fps(target_path: str) -> float:
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
output = subprocess.check_output(command).decode().strip().split('/')
command = [
'ffprobe', '-v', 'error', '-select_streams', 'v:0',
'-show_entries', 'stream=r_frame_rate',
'-of', 'default=noprint_wrappers=1:nokey=1', target_path
]
try:
output = subprocess.check_output(command).decode().strip().split('/')
numerator, denominator = map(int, output)
return numerator / denominator
except Exception:
pass
except (subprocess.CalledProcessError, ValueError):
print("Failed to detect FPS, defaulting to 30.0 FPS.")
return 30.0
def extract_frames(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
create_temp(target_path)
run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')])
def create_video(target_path: str, fps: float = 30.0) -> None:
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
run_ffmpeg([
'-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'),
'-c:v', modules.globals.video_encoder,
'-crf', str(modules.globals.video_quality),
'-pix_fmt', 'yuv420p',
'-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1',
'-y', temp_output_path
])
def restore_audio(target_path: str, output_path: str) -> None:
temp_output_path = get_temp_output_path(target_path)
done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
done = run_ffmpeg([
'-i', temp_output_path, '-i', target_path,
'-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path
])
if not done:
move_temp(target_path, output_path)
def get_temp_frame_paths(target_path: str) -> List[str]:
temp_directory_path = get_temp_directory_path(target_path)
return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png')))
return glob.glob(os.path.join(glob.escape(temp_directory_path), '*.png'))
def get_temp_directory_path(target_path: str) -> str:
target_name, _ = os.path.splitext(os.path.basename(target_path))
target_directory_path = os.path.dirname(target_path)
return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name)
target_name = Path(target_path).stem
target_directory_path = Path(target_path).parent
return str(target_directory_path / TEMP_DIRECTORY / target_name)
def get_temp_output_path(target_path: str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, TEMP_FILE)
return str(Path(temp_directory_path) / TEMP_FILE)
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any:
if source_path and target_path:
source_name, _ = os.path.splitext(os.path.basename(source_path))
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
if os.path.isdir(output_path):
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> str:
if source_path and target_path and os.path.isdir(output_path):
source_name = Path(source_path).stem
target_name = Path(target_path).stem
target_extension = Path(target_path).suffix
return str(Path(output_path) / f"{source_name}-{target_name}{target_extension}")
return output_path
def create_temp(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
Path(temp_directory_path).mkdir(parents=True, exist_ok=True)
def move_temp(target_path: str, output_path: str) -> None:
temp_output_path = get_temp_output_path(target_path)
if os.path.isfile(temp_output_path):
if os.path.isfile(output_path):
os.remove(output_path)
shutil.move(temp_output_path, output_path)
def clean_temp(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
parent_directory_path = os.path.dirname(temp_directory_path)
parent_directory_path = Path(temp_directory_path).parent
if not modules.globals.keep_frames and os.path.isdir(temp_directory_path):
shutil.rmtree(temp_directory_path)
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
os.rmdir(parent_directory_path)
if parent_directory_path.exists() and not list(parent_directory_path.iterdir()):
parent_directory_path.rmdir()
def has_image_extension(image_path: str) -> bool:
return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
def is_image(image_path: str) -> bool:
if image_path and os.path.isfile(image_path):
mimetype, _ = mimetypes.guess_type(image_path)
return bool(mimetype and mimetype.startswith('image/'))
return mimetype and mimetype.startswith('image/')
return False
def is_video(video_path: str) -> bool:
if video_path and os.path.isfile(video_path):
mimetype, _ = mimetypes.guess_type(video_path)
return bool(mimetype and mimetype.startswith('video/'))
return mimetype and mimetype.startswith('video/')
return False
def conditional_download(download_directory_path: str, urls: List[str]) -> None:
if not os.path.exists(download_directory_path):
os.makedirs(download_directory_path)
download_directory = Path(download_directory_path)
download_directory.mkdir(parents=True, exist_ok=True)
for url in urls:
download_file_path = os.path.join(download_directory_path, os.path.basename(url))
if not os.path.exists(download_file_path):
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
total = int(request.headers.get('Content-Length', 0))
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
download_file_path = download_directory / Path(url).name
if not download_file_path.exists():
with urllib.request.urlopen(url) as request:
total = int(request.headers.get('Content-Length', 0))
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size))
def resolve_relative_path(path: str) -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
return str(Path(__file__).parent / path)

View File

@@ -1,7 +1,7 @@
--extra-index-url https://download.pytorch.org/whl/cu118
numpy==1.23.5
opencv-python==4.8.1.78
opencv-contrib-python==4.10.0.84
onnx==1.16.0
insightface==0.7.3
psutil==5.9.8
@@ -21,3 +21,7 @@ opennsfw2==0.10.2
protobuf==4.23.2
tqdm==4.66.4
gfpgan==1.3.8
pyobjc==9.1; sys_platform == 'darwin'
pygrabber==0.2
pyvirtualcam==0.12.0
pyobjc-framework-AVFoundation==10.3.1; sys_platform == 'darwin'

View File

@@ -3,73 +3,35 @@ setlocal EnableDelayedExpansion
:: 1. Setup your platform
echo Setting up your platform...
:: Python
where python >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo Python is not installed. Please install Python 3.10 or later.
pause
exit /b
)
:: Pip
where pip >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo Pip is not installed. Please install Pip.
pause
exit /b
)
:: Git
where git >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo Git is not installed. Installing Git...
winget install --id Git.Git -e --source winget
)
:: FFMPEG
where ffmpeg >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo FFMPEG is not installed. Installing FFMPEG...
winget install --id Gyan.FFmpeg -e --source winget
)
call :check_installation python "Python 3.10 or later"
call :check_installation pip "Pip"
call :install_if_missing git "Git" "winget install --id Git.Git -e --source winget"
call :install_if_missing ffmpeg "FFMPEG" "winget install --id Gyan.FFmpeg -e --source winget"
:: Visual Studio 2022 Runtimes
echo Installing Visual Studio 2022 Runtimes...
winget install --id Microsoft.VC++2015-2022Redist-x64 -e --source winget
:: 2. Clone Repository
if exist Deep-Live-Cam (
echo Deep-Live-Cam directory already exists.
set /p overwrite="Do you want to overwrite? (Y/N): "
if /i "%overwrite%"=="Y" (
rmdir /s /q Deep-Live-Cam
git clone https://github.com/hacksider/Deep-Live-Cam.git
) else (
echo Skipping clone, using existing directory.
)
) else (
git clone https://github.com/hacksider/Deep-Live-Cam.git
)
cd Deep-Live-Cam
call :clone_repository "https://github.com/iVideoGameBoss/iRoopDeepFaceCam.git" "iRoopDeepFaceCam"
:: 3. Download Models
echo Downloading models...
mkdir models
curl -L -o models/GFPGANv1.4.pth https://path.to.model/GFPGANv1.4.pth
curl -L -o models/inswapper_128_fp16.onnx https://path.to.model/inswapper_128_fp16.onnx
if not exist models mkdir models
curl -L -o models\GFPGANv1.4.pth https://huggingface.co/ivideogameboss/iroopdeepfacecam/resolve/main/GFPGANv1.4.pth
curl -L -o models\inswapper_128_fp16.onnx https://huggingface.co/ivideogameboss/iroopdeepfacecam/resolve/main/inswapper_128_fp16.onnx
:: 4. Install dependencies
echo Creating a virtual environment...
python -m venv venv
call venv\Scripts\activate
call venv\Scripts\activate.bat
echo Installing required Python packages...
pip install --upgrade pip
pip install -r requirements.txt
echo Setup complete. You can now run the application.
:menu
:: GPU Acceleration Options
echo.
echo Choose the GPU Acceleration Option if applicable:
@@ -81,42 +43,83 @@ echo 5. OpenVINO (Intel)
echo 6. None
set /p choice="Enter your choice (1-6): "
if "%choice%"=="1" (
echo Installing CUDA dependencies...
pip uninstall -y onnxruntime onnxruntime-gpu
pip install onnxruntime-gpu==1.16.3
set exec_provider="cuda"
) else if "%choice%"=="2" (
echo Installing CoreML (Apple Silicon) dependencies...
pip uninstall -y onnxruntime onnxruntime-silicon
pip install onnxruntime-silicon==1.13.1
set exec_provider="coreml"
) else if "%choice%"=="3" (
echo Installing CoreML (Apple Legacy) dependencies...
pip uninstall -y onnxruntime onnxruntime-coreml
pip install onnxruntime-coreml==1.13.1
set exec_provider="coreml"
) else if "%choice%"=="4" (
echo Installing DirectML dependencies...
pip uninstall -y onnxruntime onnxruntime-directml
pip install onnxruntime-directml==1.15.1
set exec_provider="directml"
) else if "%choice%"=="5" (
echo Installing OpenVINO dependencies...
pip uninstall -y onnxruntime onnxruntime-openvino
pip install onnxruntime-openvino==1.15.0
set exec_provider="openvino"
) else (
echo Skipping GPU acceleration setup.
)
set "exec_provider="
call :set_execution_provider %choice%
:end_choice
echo.
echo GPU Acceleration setup complete.
echo Selected provider: !exec_provider!
echo.
:: Run the application
if defined exec_provider (
echo Running the application with %exec_provider% execution provider...
python run.py --execution-provider %exec_provider%
echo Running the application with !exec_provider! execution provider...
python run.py --execution-provider !exec_provider!
) else (
echo Running the application...
python run.py
)
:: Deactivate the virtual environment
call venv\Scripts\deactivate.bat
echo.
echo Script execution completed.
pause
exit /b
:check_installation
where %1 >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo %2 is not installed. Please install %2.
pause
exit /b
)
:install_if_missing
where %1 >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo %2 is not installed. Installing %2...
%3
)
:clone_repository
if exist %2 (
echo %2 directory already exists.
set /p overwrite="Do you want to overwrite? (Y/N): "
if /i "%overwrite%"=="Y" (
rmdir /s /q %2
git clone %1
) else (
echo Skipping clone, using existing directory.
)
) else (
git clone %1
)
:set_execution_provider
if "%1"=="1" (
call :install_onnxruntime "onnxruntime-gpu" "1.16.3" "cuda"
) else if "%1"=="2" (
call :install_onnxruntime "onnxruntime-silicon" "1.13.1" "coreml"
) else if "%1"=="3" (
call :install_onnxruntime "onnxruntime-coreml" "1.13.1" "coreml"
) else if "%1"=="4" (
call :install_onnxruntime "onnxruntime-directml" "1.15.1" "directml"
) else if "%1"=="5" (
call :install_onnxruntime "onnxruntime-openvino" "1.15.0" "openvino"
) else if "%1"=="6" (
echo Skipping GPU acceleration setup.
set "exec_provider=none"
) else (
echo Invalid choice. Please try again.
goto menu
)
:install_onnxruntime
echo Installing %1 dependencies...
pip uninstall -y onnxruntime %1
pip install %1==%2
set "exec_provider=%3"
goto end_choice