* ffmpeg platform-agnostic hardware-acceleration

* clear CUDA cache after swapping

on low VRAM + ffmpeg cuda acceleration, clearing cache prevent cuda out-of-memory error

* check torch gpu before clearing cache

* torch check nvidia only

* syntax error

* Adjust comment

* Normalize ARGS

* Remove path normalization

* Remove args overrides

* Run test on Linux and Windows

* Run test on Linux and Windows

* Run test on Linux and Windows

* Run test on Linux and Windows

* Run test on Linux and Windows

* Run test on Linux and Windows

* Run test on Linux and Windows

* Revert to Ubuntu test only as Windows hangs

* Simplified the way to maintain aspect ratio of the preview, and maintaining aspect ratio of the miniatures

* Change face and target images from contain to fit

* Improve status output

* Massive utilities and core refactoring

* Fix sound

* Fix sound part2

* Fix more

* Move every UI related thing to ui.py

* Refactor UI

* Introduce render_video_preview()

* Add preview back part1

* Add preview back part2, Introduce --video-quality for CLI

* Get the preview working

* Couple if minor UI fixes

* Add video encoder via CLI

* Change default video quality, Integrate recent directories for UI

* Move temporary files to temp/{target-name}

* Fix fps detection

* Rename method

* Introduce suggest methods for args defaults, output mode and core/threads count via postfix

* Fix max_memory and output memory in progress bar too

* Turns out mac has a different memory unit

* Add typing to swapper

* Fix FileNotFoundError while deleting temp

* Updated requirements.txt for macs.

(cherry picked from commit fd00a18771)

* Doing bunch of renaming and typing

* Just a cosmetic

* Doing bunch of renaming again

* Introduce execution provider to support DirectML

* enhancer update

* remove useless code

* remove useless code

* remove useless code

* fix

* reslove some errors in code review.

* methods rename

* del Unused import

* recover the conditional installation for darwin!

* recover the import module

* del try catch in unrelate codes

* fix error in argument and potential infinity loop

* remove the ROCM check before face-enhancer

* fix lint error

* add the process for image

* conditional process according to --frame-processor

* Hotfix usage of --frame-processor face-swapper face-enhancer

* Auto download models

* typo fixed

* Fix framerate and audio sync issues

* Limit the video encoders according to -crf support

* Limit the video encoders according to -crf support part2

* Migrate to theme based UI using customtkinter

* Show full preview according to video frames total

* Simplify is_image and is_video, close preview on source/target change, show preview slider on video only, fix start button error

* Fix linter

* Use float over int for more accurate fps

* introduce a pre_check() to enhancer...

* update

* Update utilities.py

* move the model_path to the method

* Fix model paths

* Fix linter

* Fix images scaling

* Update onnxruntime-silicon and monkey patch ssl for mac

* Downgrade onnxruntime-silicon again

* Introduce a first version of CONTRIBUTING.md

* Update CONTRIBUTING.md

* Add libvpx-vp9 to video encoder's choices

* Update CONTRIBUTING.md

* Migrate gpu flags to execution flags

* Fix linter

* Encode and decode execution providers for easier usage

* Fix comment

* Update CLI usage

* Fixed wrong frame colors for preview

* Introduce dynamic frame procesors

* Remove unused imports

* Use different structure

* modified:   roop/core.py , enhancer and swapper

* fix link error

* modified core.py ui.py frame/core.py

* fix get_frame_processors_modules()

* fix lint error in core.py

* fix face_enhancer.py

* fix enhancer.py

* fix ui.py

* modified:   roop/ui.py

* fix ui.py

* Update ui.py

* Fix preview to work with multiple frame processors

* Remove multi processing as the performance is equal but memory consumtion better

* Extract common methods to processors.frame.core

* Add comments and todos

* Minor fixes

* Minor fixes

* Limit enhancer to 1 thread but restore afterwards

* Limit enhancer to 1 thread but restore afterwards

* Update README and GUI demo

* Implementation check for frame processors

* Improve path validation

* Introduce normalize output path

* Fix GUI startup

* Introduce pre_start() and move globals check to pre_check()

* Flip the hooks and update CLI usage

* Introduce bool returns for pre_check() and pre_start(), Scope for terminal output

* Cast deprecated args and warn the user

* Migrate to ctk.CTkImage

* readme update: old keys fixed

* Unused import made CI test to fail

* Give gfpgan a shot

* Update dependencies

* Update dependencies

* Update dependencies

* Update dependencies

* Fix ci (#498)

* Use different dependencies for CI

* Use different dependencies for CI

* Use different dependencies for CI

* Use different dependencies for CI

* Use different dependencies for CI

* Fix preview (#499)

* Minor changes

* Fix override of files using restore_audio()

* Host the models in our huggingface space

* Remove everything source face related from enhancer (#510)

* Remove everything source face related from enhancer

* Disable upscale for enhancer to double fps

* Using futures for multi threading

* Introduce predicter (#530)

* Hotfix predicter

* Fix square brackets in the target path (#532)

* fixes the problem with square brackets in the target file path

* fixes the problem with square brackets in the target file path

* Ok, here is the fix without regexps

* unused import

* fix for ci

* glob.escape fits here

* Make multiple processors work with images

* Fix output normalize for deprecated args

* Make roop more or less type-safe (#541)

* Make roop more or less type-safe

* Fix ci.yml

* Fix urllib type error

* Rename globals in ui

* Update utilities.py (#542)

Updated the extraction process with the ffmpeg command that corrects the colorspace while extracting to png, and corrected the ffmpeg command, adding '-pix_fmt', 'rgb24', '-sws_flags', '+accurate_rnd+full_chroma_int', '-colorspace', '1', '-color_primaries', '1', '-color_trc', '1'

'-pix_fmt rgb24', means treat the image as RGB (or RGBA)
'-sws_flags +accurate_rnd+full_chroma_int', means use full color and chroma subsampling instead of 4:2:0
'-colorspace 1', '-color_primaries 1', '-color_trc 1' put the metadata color tags to the png

* Use GFPGANv1.4 for enhancer

* Fixing the colorspace issue when writing the mp4 from the extracted pngs (#550)

* Update utilities.py

Updated the extraction process with the ffmpeg command that corrects the colorspace while extracting to png, and corrected the ffmpeg command, adding '-pix_fmt', 'rgb24', '-sws_flags', '+accurate_rnd+full_chroma_int', '-colorspace', '1', '-color_primaries', '1', '-color_trc', '1'

'-pix_fmt rgb24', means treat the image as RGB (or RGBA)
'-sws_flags +accurate_rnd+full_chroma_int', means use full color and chroma subsampling instead of 4:2:0
'-colorspace 1', '-color_primaries 1', '-color_trc 1' put the metadata color tags to the png

* Fixing color conversion from temp png sequence to mp4

'-sws_flags', 'spline+accurate_rnd+full_chroma_int', ' use full color and chroma subsampling
-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', keep the same rec709 colorspace
'-color_range', '1', '-colorspace', '1', '-color_primaries', '1', '-color_trc', '1', put the metadata color tags to the mp4

* Revert "Fixing the colorspace issue when writing the mp4 from the extracted pngs (#550)"

This reverts commit cf5f27d36a.

* Revert "Update utilities.py (#542)"

This reverts commit d57279ceb6.

* Restore colorspace restoring

* Add metadata to cli and ui

* Introduce Face and Frame typing

* Update CLI usage

---------

Co-authored-by: Phan Tuấn Anh <phantuananh@hotmail.com.vn>
Co-authored-by: Antoine Buchser <10513467+AntwaneB@users.noreply.github.com>
Co-authored-by: Eamonn A. Sweeney <mail@eamonnsweeney.ie>
Co-authored-by: Moeblack <Moeblack@kuroinekorachi@gmail.com>
Co-authored-by: Pozitronik <antikillerxxl@gmail.com>
Co-authored-by: Pikachu~~~ <Moeblack@users.noreply.github.com>
Co-authored-by: K1llM@n <k1llmanws@yandex.ru>
Co-authored-by: NickPittas <107440357+NickPittas@users.noreply.github.com>
This commit is contained in:
Henry Ruhs
2023-06-20 10:57:07 +02:00
committed by GitHub
parent 70108223f9
commit ec12665fb1
29 changed files with 1078 additions and 747 deletions

View File

@@ -1,2 +1,3 @@
[flake8]
select = E3, E4, F
select = E3, E4, F
per-file-ignores = roop/core.py:E402

View File

Before

Width:  |  Height:  |  Size: 537 KiB

After

Width:  |  Height:  |  Size: 537 KiB

View File

@@ -13,7 +13,9 @@ jobs:
with:
python-version: 3.9
- run: pip install flake8
- run: flake8 run.py core
- run: pip install mypy
- run: flake8 run.py roop
- run: mypy --config-file mypi.ini run.py roop
test:
runs-on: ubuntu-latest
steps:
@@ -25,8 +27,7 @@ jobs:
uses: actions/setup-python@v2
with:
python-version: 3.9
- run: pip install -r requirements.txt gdown
- run: gdown 13QpWFWJ37EB-nHrEOY64CEtQWY-tz7DZ
- run: ./run.py -f=.github/examples/face.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4
- run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex "psnr" -f null -
- run: pip install -r requirements-ci.txt
- run: python run.py -s=.github/examples/source.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4
- run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex psnr -f null -

3
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.idea
models
temp
__pycache__
*.onnx

21
CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,21 @@
## Pull Requests
### Do
- ...consider to fix bugs over adding features
- ...one pull request for one feature or improvement
- ...consult us about implementation details
- ...proper testing before you submit your code
- ...resolve failed CI pipelines
### Don't
- ...introduce fundamental changes in terms of software architecture
- ...introduce OOP - we accept functional programming only
- ...ignore given requirements or try to work around them
- ...submit code to a development branch without consulting us
- ...submit massive amount of code changes
- ...submit a proof of concept
- ...submit code that is using undocumented and private APIs
- ...solve third party issues in our project
- ...comment what your code does - use proper naming instead

View File

@@ -36,26 +36,32 @@ Additional command line arguments are given below:
```
options:
-h, --help show this help message and exit
-f SOURCE_IMG, --face SOURCE_IMG
use this face
-s SOURCE_PATH, --source SOURCE_PATH
select an source image
-t TARGET_PATH, --target TARGET_PATH
replace this face
-o OUTPUT_FILE, --output OUTPUT_FILE
save output to this file
--keep-fps maintain original fps
--keep-frames keep frames directory
--all-faces swap all faces in frame
select an target image or video
-o OUTPUT_PATH, --output OUTPUT_PATH
select output file or directory
--frame-processor {face_swapper,face_enhancer} [{face_swapper,face_enhancer} ...]
pipeline of frame processors
--keep-fps keep original fps
--keep-audio keep original audio
--keep-frames keep temporary frames
--many-faces process every face
--video-encoder {libx264,libx265,libvpx-vp9}
adjust output video encoder
--video-quality VIDEO_QUALITY
adjust output video quality
--max-memory MAX_MEMORY
maximum amount of RAM in GB to be used
--cpu-cores CPU_CORES
number of CPU cores to use
--gpu-threads GPU_THREADS
number of threads to be use for the GPU
--gpu-vendor {apple,amd,intel,nvidia}
choice your GPU vendor
maximum amount of RAM in GB
--execution-provider {cpu,...} [{cpu,...} ...]
execution provider
--execution-threads EXECUTION_THREADS
number of execution threads
-v, --version show program's version number and exit
```
Looking for a CLI mode? Using the -f/--face argument will make the program in cli mode.
Looking for a CLI mode? Using the -s/--source argument will make the program in cli mode.
## Credits
- [henryruhs](https://github.com/henryruhs): for being an irreplaceable contributor to the project

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

After

Width:  |  Height:  |  Size: 18 KiB

7
mypi.ini Normal file
View File

@@ -0,0 +1,7 @@
[mypy]
check_untyped_defs = True
disallow_any_generics = True
disallow_untyped_calls = True
disallow_untyped_defs = True
ignore_missing_imports = True
strict_optional = False

14
requirements-ci.txt Normal file
View File

@@ -0,0 +1,14 @@
numpy==1.23.5
opencv-python==4.7.0.72
onnx==1.14.0
insightface==0.7.3
psutil==5.9.5
tk==0.1.0
customtkinter==5.1.3
torch==2.0.1
torchvision==0.15.2
onnxruntime==1.15.0
tensorflow==2.12.0
opennsfw2==0.10.2
protobuf==4.23.2
tqdm==4.65.0

View File

@@ -6,9 +6,12 @@ onnx==1.14.0
insightface==0.7.3
psutil==5.9.5
tk==0.1.0
customtkinter==5.1.3
pillow==9.5.0
torch==2.0.1+cu118; sys_platform != 'darwin'
torch==2.0.1; sys_platform == 'darwin'
torchvision==0.15.2+cu118; sys_platform != 'darwin'
torchvision==0.15.2; sys_platform == 'darwin'
onnxruntime==1.15.0; sys_platform == 'darwin' and platform_machine != 'arm64'
onnxruntime-silicon==1.13.1; sys_platform == 'darwin' and platform_machine == 'arm64'
onnxruntime-gpu==1.15.0; sys_platform != 'darwin'
@@ -16,4 +19,5 @@ tensorflow==2.13.0rc1; sys_platform == 'darwin'
tensorflow==2.12.0; sys_platform != 'darwin'
opennsfw2==0.10.2
protobuf==4.23.2
tqdm==4.65.0
tqdm==4.65.0
gfpgan==1.3.8

View File

@@ -1 +0,0 @@

View File

@@ -1,27 +0,0 @@
import insightface
import roop.globals
FACE_ANALYSER = None
def get_face_analyser():
global FACE_ANALYSER
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=roop.globals.providers)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
return FACE_ANALYSER
def get_face_single(img_data):
face = get_face_analyser().get(img_data)
try:
return sorted(face, key=lambda x: x.bbox[0])[0]
except IndexError:
return None
def get_face_many(img_data):
try:
return get_face_analyser().get(img_data)
except IndexError:
return None

20
roop/capturer.py Normal file
View File

@@ -0,0 +1,20 @@
from typing import Any
import cv2
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
capture = cv2.VideoCapture(video_path)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
capture.release()
if has_frame:
return frame
return None
def get_video_frame_total(video_path: str) -> int:
capture = cv2.VideoCapture(video_path)
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
capture.release()
return video_frame_total

View File

@@ -2,78 +2,136 @@
import os
import sys
# single thread doubles performance of gpu-mode - needs to be set before torch import
if any(arg.startswith('--gpu-vendor') for arg in sys.argv):
# single thread doubles cuda performance - needs to be set before torch import
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
from typing import List
import platform
import signal
import shutil
import glob
import argparse
import psutil
import torch
import onnxruntime
import tensorflow
from pathlib import Path
import multiprocessing as mp
from opennsfw2 import predict_video_frames, predict_image
import cv2
import roop.globals
from roop.swapper import process_video, process_img, process_faces, process_frames
from roop.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames, rreplace
from roop.analyser import get_face_single
import roop.metadata
import roop.ui as ui
from roop.predicter import predict_image, predict_video
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
signal.signal(signal.SIGINT, lambda signal_number, frame: quit())
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--face', help='use this face', dest='source_img')
parser.add_argument('-t', '--target', help='replace this face', dest='target_path')
parser.add_argument('-o', '--output', help='save output to this file', dest='output_file')
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
parser.add_argument('--all-faces', help='swap all faces in frame', dest='all_faces', action='store_true', default=False)
parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', dest='max_memory', type=int)
parser.add_argument('--cpu-cores', help='number of CPU cores to use', dest='cpu_cores', type=int, default=max(psutil.cpu_count() / 2, 1))
parser.add_argument('--gpu-threads', help='number of threads to be use for the GPU', dest='gpu_threads', type=int, default=8)
parser.add_argument('--gpu-vendor', help='choice your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'intel', 'nvidia'])
if 'ROCMExecutionProvider' in roop.globals.execution_providers:
del torch
args = parser.parse_known_args()[0]
if 'all_faces' in args:
roop.globals.all_faces = True
if args.cpu_cores:
roop.globals.cpu_cores = int(args.cpu_cores)
# cpu thread fix for mac
if sys.platform == 'darwin':
roop.globals.cpu_cores = 1
if args.gpu_threads:
roop.globals.gpu_threads = int(args.gpu_threads)
# gpu thread fix for amd
if args.gpu_vendor == 'amd':
roop.globals.gpu_threads = 1
if args.gpu_vendor:
roop.globals.gpu_vendor = args.gpu_vendor
else:
roop.globals.providers = ['CPUExecutionProvider']
sep = "/"
if os.name == "nt":
sep = "\\"
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
def limit_resources():
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18)
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version', version=f'{roop.metadata.name} {roop.metadata.version}')
# register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
program.add_argument('--gpu-threads', help=argparse.SUPPRESS, dest='gpu_threads_deprecated', type=int)
args = program.parse_args()
roop.globals.source_path = args.source_path
roop.globals.target_path = args.target_path
roop.globals.output_path = normalize_output_path(roop.globals.source_path, roop.globals.target_path, args.output_path)
roop.globals.frame_processors = args.frame_processor
roop.globals.headless = args.source_path or args.target_path or args.output_path
roop.globals.keep_fps = args.keep_fps
roop.globals.keep_audio = args.keep_audio
roop.globals.keep_frames = args.keep_frames
roop.globals.many_faces = args.many_faces
roop.globals.video_encoder = args.video_encoder
roop.globals.video_quality = args.video_quality
roop.globals.max_memory = args.max_memory
roop.globals.execution_providers = decode_execution_providers(args.execution_provider)
roop.globals.execution_threads = args.execution_threads
# translate deprecated args
if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
roop.globals.source_path = args.source_path_deprecated
roop.globals.output_path = normalize_output_path(args.source_path_deprecated, roop.globals.target_path, args.output_path)
if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
roop.globals.execution_threads = args.cpu_cores_deprecated
if args.gpu_vendor_deprecated == 'apple':
print('\033[33mArgument --gpu-vendor apple is deprecated. Use --execution-provider coreml instead.\033[0m')
roop.globals.execution_providers = decode_execution_providers(['coreml'])
if args.gpu_vendor_deprecated == 'nvidia':
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
roop.globals.execution_providers = decode_execution_providers(['cuda'])
if args.gpu_vendor_deprecated == 'amd':
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m')
roop.globals.execution_providers = decode_execution_providers(['rocm'])
if args.gpu_threads_deprecated:
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
roop.globals.execution_threads = args.gpu_threads_deprecated
def encode_execution_providers(execution_providers: List[str]) -> List[str]:
return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers]
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers()))
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)]
def suggest_max_memory() -> int:
if platform.system().lower() == 'darwin':
return 4
return 16
def suggest_execution_providers() -> List[str]:
return encode_execution_providers(onnxruntime.get_available_providers())
def suggest_execution_threads() -> int:
if 'DmlExecutionProvider' in roop.globals.execution_providers:
return 1
if 'ROCMExecutionProvider' in roop.globals.execution_providers:
return 2
return 8
def limit_resources() -> None:
# prevent tensorflow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True)
if args.max_memory:
memory = args.max_memory * 1024 * 1024 * 1024
if str(platform.system()).lower() == 'windows':
# limit memory usage
if roop.globals.max_memory:
memory = roop.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin':
memory = roop.globals.max_memory * 1024 ** 6
if platform.system().lower() == 'windows':
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
@@ -82,199 +140,99 @@ def limit_resources():
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
def pre_check():
def release_resources() -> None:
if 'CUDAExecutionProvider' in roop.globals.execution_providers:
torch.cuda.empty_cache()
def pre_check() -> bool:
if sys.version_info < (3, 9):
quit('Python version is not supported - please upgrade to 3.9 or higher')
update_status('Python version is not supported - please upgrade to 3.9 or higher.')
return False
if not shutil.which('ffmpeg'):
quit('ffmpeg is not installed!')
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
if not os.path.isfile(model_path):
quit('File "inswapper_128.onnx" does not exist!')
if roop.globals.gpu_vendor == 'apple':
if 'CoreMLExecutionProvider' not in roop.globals.providers:
quit("You are using --gpu=apple flag but CoreML isn't available or properly installed on your system.")
if roop.globals.gpu_vendor == 'amd':
if 'ROCMExecutionProvider' not in roop.globals.providers:
quit("You are using --gpu=amd flag but ROCM isn't available or properly installed on your system.")
if roop.globals.gpu_vendor == 'nvidia':
CUDA_VERSION = torch.version.cuda
CUDNN_VERSION = torch.backends.cudnn.version()
if not torch.cuda.is_available():
quit("You are using --gpu=nvidia flag but CUDA isn't available or properly installed on your system.")
if CUDA_VERSION > '11.8':
quit(f"CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8")
if CUDA_VERSION < '11.4':
quit(f"CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8")
if CUDNN_VERSION < 8220:
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1")
if CUDNN_VERSION > 8910:
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1")
update_status('ffmpeg is not installed.')
return False
return True
def get_video_frame(video_path, frame_number = 1):
cap = cv2.VideoCapture(video_path)
amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
cap.set(cv2.CAP_PROP_POS_FRAMES, min(amount_of_frames, frame_number-1))
if not cap.isOpened():
print("Error opening video file")
def update_status(message: str, scope: str = 'ROOP.CORE') -> None:
print(f'[{scope}] {message}')
if not roop.globals.headless:
ui.update_status(message)
def start() -> None:
for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
if not frame_processor.pre_start():
return
# process image to image
if has_image_extension(roop.globals.target_path):
if predict_image(roop.globals.target_path):
destroy()
shutil.copy2(roop.globals.target_path, roop.globals.output_path)
for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_image(roop.globals.source_path, roop.globals.output_path, roop.globals.output_path)
release_resources()
if is_image(roop.globals.target_path):
update_status('Processing to image succeed!')
else:
update_status('Processing to image failed!')
return
ret, frame = cap.read()
if ret:
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
cap.release()
def preview_video(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("Error opening video file")
return 0
amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
ret, frame = cap.read()
if ret:
frame = get_video_frame(video_path)
cap.release()
return (amount_of_frames, frame)
def status(string):
value = "Status: " + string
if 'cli_mode' in args:
print(value)
# process image to videos
if predict_video(roop.globals.target_path):
destroy()
update_status('Creating temp resources...')
create_temp(roop.globals.target_path)
update_status('Extracting frames...')
extract_frames(roop.globals.target_path)
temp_frame_paths = get_temp_frame_paths(roop.globals.target_path)
for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_video(roop.globals.source_path, temp_frame_paths)
release_resources()
# handles fps
if roop.globals.keep_fps:
update_status('Detecting fps...')
fps = detect_fps(roop.globals.target_path)
update_status(f'Creating video with {fps} fps...')
create_video(roop.globals.target_path, fps)
else:
ui.update_status_label(value)
def process_video_multi_cores(source_img, frame_paths):
n = len(frame_paths) // roop.globals.cpu_cores
if n > 2:
processes = []
for i in range(0, len(frame_paths), n):
p = POOL.apply_async(process_video, args=(source_img, frame_paths[i:i + n],))
processes.append(p)
for p in processes:
p.get()
POOL.close()
POOL.join()
def start(preview_callback = None):
if not args.source_img or not os.path.isfile(args.source_img):
print("\n[WARNING] Please select an image containing a face.")
return
elif not args.target_path or not os.path.isfile(args.target_path):
print("\n[WARNING] Please select a video/image to swap face in.")
return
if not args.output_file:
target_path = args.target_path
args.output_file = rreplace(target_path, "/", "/swapped-", 1) if "/" in target_path else "swapped-" + target_path
target_path = args.target_path
test_face = get_face_single(cv2.imread(args.source_img))
if not test_face:
print("\n[WARNING] No face detected in source image. Please try with another one.\n")
return
if is_img(target_path):
if predict_image(target_path) > 0.85:
quit()
process_img(args.source_img, target_path, args.output_file)
status("swap successful!")
return
seconds, probabilities = predict_video_frames(video_path=args.target_path, frame_interval=100)
if any(probability > 0.85 for probability in probabilities):
quit()
video_name_full = target_path.split("/")[-1]
video_name = os.path.splitext(video_name_full)[0]
output_dir = os.path.dirname(target_path) + "/" + video_name if os.path.dirname(target_path) else video_name
Path(output_dir).mkdir(exist_ok=True)
status("detecting video's FPS...")
fps, exact_fps = detect_fps(target_path)
if not args.keep_fps and fps > 30:
this_path = output_dir + "/" + video_name + ".mp4"
set_fps(target_path, this_path, 30)
target_path, exact_fps = this_path, 30
update_status('Creating video with 30.0 fps...')
create_video(roop.globals.target_path)
# handle audio
if roop.globals.keep_audio:
if roop.globals.keep_fps:
update_status('Restoring audio...')
else:
update_status('Restoring audio might cause issues as fps are not kept...')
restore_audio(roop.globals.target_path, roop.globals.output_path)
else:
shutil.copy(target_path, output_dir)
status("extracting frames...")
extract_frames(target_path, output_dir)
args.frame_paths = tuple(sorted(
glob.glob(output_dir + "/*.png"),
key=lambda x: int(x.split(sep)[-1].replace(".png", ""))
))
status("swapping in progress...")
if roop.globals.gpu_vendor is None and roop.globals.cpu_cores > 1:
global POOL
POOL = mp.Pool(roop.globals.cpu_cores)
process_video_multi_cores(args.source_img, args.frame_paths)
move_temp(roop.globals.target_path, roop.globals.output_path)
# clean and validate
clean_temp(roop.globals.target_path)
if is_video(roop.globals.target_path):
update_status('Processing to video succeed!')
else:
process_video(args.source_img, args.frame_paths)
status("creating video...")
create_video(video_name, exact_fps, output_dir)
status("adding audio...")
add_audio(output_dir, target_path, video_name_full, args.keep_frames, args.output_file)
save_path = args.output_file if args.output_file else output_dir + "/" + video_name + ".mp4"
print("\n\nVideo saved as:", save_path, "\n\n")
status("swap successful!")
update_status('Processing to video failed!')
def select_face_handler(path: str):
args.source_img = path
def destroy() -> None:
if roop.globals.target_path:
clean_temp(roop.globals.target_path)
quit()
def select_target_handler(path: str):
args.target_path = path
return preview_video(args.target_path)
def toggle_all_faces_handler(value: int):
roop.globals.all_faces = True if value == 1 else False
def toggle_fps_limit_handler(value: int):
args.keep_fps = int(value != 1)
def toggle_keep_frames_handler(value: int):
args.keep_frames = value
def save_file_handler(path: str):
args.output_file = path
def create_test_preview(frame_number):
return process_faces(
get_face_single(cv2.imread(args.source_img)),
get_video_frame(args.target_path, frame_number)
)
def run():
global all_faces, keep_frames, limit_fps
pre_check()
def run() -> None:
parse_args()
if not pre_check():
return
for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
if not frame_processor.pre_check():
return
limit_resources()
if args.source_img:
args.cli_mode = True
if roop.globals.headless:
start()
quit()
window = ui.init(
{
'all_faces': roop.globals.all_faces,
'keep_fps': args.keep_fps,
'keep_frames': args.keep_frames
},
select_face_handler,
select_target_handler,
toggle_all_faces_handler,
toggle_fps_limit_handler,
toggle_keep_frames_handler,
save_file_handler,
start,
get_video_frame,
create_test_preview
)
window.mainloop()
else:
window = ui.init(start, destroy)
window.mainloop()

31
roop/face_analyser.py Normal file
View File

@@ -0,0 +1,31 @@
from typing import Any
import insightface
import roop.globals
from roop.typing import Frame
FACE_ANALYSER = None
def get_face_analyser() -> Any:
global FACE_ANALYSER
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=roop.globals.execution_providers)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
return FACE_ANALYSER
def get_one_face(frame: Frame) -> Any:
face = get_face_analyser().get(frame)
try:
return min(face, key=lambda x: x.bbox[0])
except ValueError:
return None
def get_many_faces(frame: Frame) -> Any:
try:
return get_face_analyser().get(frame)
except IndexError:
return None

View File

@@ -1,11 +1,17 @@
import onnxruntime
from typing import List
all_faces = None
source_path = None
target_path = None
output_path = None
frame_processors: List[str] = []
keep_fps = None
keep_audio = None
keep_frames = None
many_faces = None
video_encoder = None
video_quality = None
max_memory = None
execution_providers: List[str] = []
execution_threads = None
headless = None
log_level = 'error'
cpu_cores = None
gpu_threads = None
gpu_vendor = None
providers = onnxruntime.get_available_providers()
if 'TensorrtExecutionProvider' in providers:
providers.remove('TensorrtExecutionProvider')

2
roop/metadata.py Normal file
View File

@@ -0,0 +1,2 @@
name = 'roop'
version = '1.0.0'

25
roop/predicter.py Normal file
View File

@@ -0,0 +1,25 @@
import numpy
import opennsfw2
from PIL import Image
from roop.typing import Frame
MAX_PROBABILITY = 0.85
def predict_frame(target_frame: Frame) -> bool:
image = Image.fromarray(target_frame)
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
model = opennsfw2.make_open_nsfw_model()
views = numpy.expand_dims(image, axis=0)
_, probability = model.predict(views)[0]
return probability > MAX_PROBABILITY
def predict_image(target_path: str) -> bool:
return opennsfw2.predict_image(target_path) > MAX_PROBABILITY
def predict_video(target_path: str) -> bool:
_, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100)
return any(probability > MAX_PROBABILITY for probability in probabilities)

View File

View File

View File

@@ -0,0 +1,56 @@
import sys
import importlib
from concurrent.futures import ThreadPoolExecutor
from types import ModuleType
from typing import Any, List, Callable
from tqdm import tqdm
import roop
FRAME_PROCESSORS_MODULES: List[ModuleType] = []
FRAME_PROCESSORS_INTERFACE = [
'pre_check',
'pre_start',
'process_frame',
'process_image',
'process_video'
]
def load_frame_processor_module(frame_processor: str) -> Any:
try:
frame_processor_module = importlib.import_module(f'roop.processors.frame.{frame_processor}')
for method_name in FRAME_PROCESSORS_INTERFACE:
if not hasattr(frame_processor_module, method_name):
sys.exit()
except ImportError:
sys.exit()
return frame_processor_module
def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]:
global FRAME_PROCESSORS_MODULES
if not FRAME_PROCESSORS_MODULES:
for frame_processor in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
return FRAME_PROCESSORS_MODULES
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
with ThreadPoolExecutor(max_workers=roop.globals.execution_threads) as executor:
futures = []
for path in temp_frame_paths:
future = executor.submit(process_frames, source_path, [path], progress)
futures.append(future)
for future in futures:
future.result()
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
progress.set_postfix({'execution_providers': roop.globals.execution_providers, 'threads': roop.globals.execution_threads, 'memory': roop.globals.max_memory})
multi_process_frame(source_path, frame_paths, process_frames, progress)

View File

@@ -0,0 +1,75 @@
from typing import Any, List
import cv2
import threading
import gfpgan
import roop.globals
import roop.processors.frame.core
from roop.core import update_status
from roop.face_analyser import get_one_face
from roop.typing import Frame, Face
from roop.utilities import conditional_download, resolve_relative_path, is_image, is_video
FACE_ENHANCER = None
THREAD_SEMAPHORE = threading.Semaphore()
THREAD_LOCK = threading.Lock()
NAME = 'ROOP.FACE-ENHANCER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../models')
conditional_download(download_directory_path, ['https://huggingface.co/henryruhs/roop/resolve/main/GFPGANv1.4.pth'])
return True
def pre_start() -> bool:
if not is_image(roop.globals.target_path) and not is_video(roop.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def get_face_enhancer() -> Any:
global FACE_ENHANCER
with THREAD_LOCK:
if FACE_ENHANCER is None:
model_path = resolve_relative_path('../models/GFPGANv1.4.pth')
# todo: set models path https://github.com/TencentARC/GFPGAN/issues/399
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
return FACE_ENHANCER
def enhance_face(temp_frame: Frame) -> Frame:
with THREAD_SEMAPHORE:
_, _, temp_frame = get_face_enhancer().enhance(
temp_frame,
paste_back=True
)
return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
result = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result)
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
target_frame = cv2.imread(target_path)
result = process_frame(None, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
roop.processors.frame.core.process_video(None, temp_frame_paths, process_frames)

View File

@@ -0,0 +1,86 @@
from typing import Any, List
import cv2
import insightface
import threading
import roop.globals
import roop.processors.frame.core
from roop.core import update_status
from roop.face_analyser import get_one_face, get_many_faces
from roop.typing import Face, Frame
from roop.utilities import conditional_download, resolve_relative_path, is_image, is_video
FACE_SWAPPER = None
THREAD_LOCK = threading.Lock()
NAME = 'ROOP.FACE-SWAPPER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../models')
conditional_download(download_directory_path, ['https://huggingface.co/henryruhs/roop/resolve/main/inswapper_128.onnx'])
return True
def pre_start() -> bool:
if not is_image(roop.globals.source_path):
update_status('Select an image for source path.', NAME)
return False
elif not get_one_face(cv2.imread(roop.globals.source_path)):
update_status('No face in source path detected.', NAME)
return False
if not is_image(roop.globals.target_path) and not is_video(roop.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def get_face_swapper() -> Any:
global FACE_SWAPPER
with THREAD_LOCK:
if FACE_SWAPPER is None:
model_path = resolve_relative_path('../models/inswapper_128.onnx')
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=roop.globals.execution_providers)
return FACE_SWAPPER
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if roop.globals.many_faces:
many_faces = get_many_faces(temp_frame)
if many_faces:
for target_face in many_faces:
temp_frame = swap_face(source_face, target_face, temp_frame)
else:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = swap_face(source_face, target_face, temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
source_face = get_one_face(cv2.imread(source_path))
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
try:
result = process_frame(source_face, temp_frame)
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
print(exception)
pass
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
source_face = get_one_face(cv2.imread(source_path))
target_frame = cv2.imread(target_path)
result = process_frame(source_face, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
roop.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)

View File

@@ -1,96 +0,0 @@
import os
from tqdm import tqdm
import cv2
import insightface
import threading
import roop.globals
from roop.analyser import get_face_single, get_face_many
FACE_SWAPPER = None
THREAD_LOCK = threading.Lock()
def get_face_swapper():
global FACE_SWAPPER
with THREAD_LOCK:
if FACE_SWAPPER is None:
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=roop.globals.providers)
return FACE_SWAPPER
def swap_face_in_frame(source_face, target_face, frame):
if target_face:
return get_face_swapper().get(frame, target_face, source_face, paste_back=True)
return frame
def process_faces(source_face, target_frame):
if roop.globals.all_faces:
many_faces = get_face_many(target_frame)
if many_faces:
for face in many_faces:
target_frame = swap_face_in_frame(source_face, face, target_frame)
else:
face = get_face_single(target_frame)
if face:
target_frame = swap_face_in_frame(source_face, face, target_frame)
return target_frame
def process_frames(source_img, frame_paths, progress=None):
source_face = get_face_single(cv2.imread(source_img))
for frame_path in frame_paths:
frame = cv2.imread(frame_path)
try:
result = process_faces(source_face, frame)
cv2.imwrite(frame_path, result)
except Exception as exception:
print(exception)
pass
if progress:
progress.update(1)
def multi_process_frame(source_img, frame_paths, progress):
threads = []
num_threads = roop.globals.gpu_threads
num_frames_per_thread = len(frame_paths) // num_threads
remaining_frames = len(frame_paths) % num_threads
# create thread and launch
start_index = 0
for _ in range(num_threads):
end_index = start_index + num_frames_per_thread
if remaining_frames > 0:
end_index += 1
remaining_frames -= 1
thread_frame_paths = frame_paths[start_index:end_index]
thread = threading.Thread(target=process_frames, args=(source_img, thread_frame_paths, progress))
threads.append(thread)
thread.start()
start_index = end_index
# threading
for thread in threads:
thread.join()
def process_img(source_img, target_path, output_file):
frame = cv2.imread(target_path)
face = get_face_single(frame)
source_face = get_face_single(cv2.imread(source_img))
result = get_face_swapper().get(frame, face, source_face, paste_back=True)
cv2.imwrite(output_file, result)
print("\n\nImage saved as:", output_file, "\n\n")
def process_video(source_img, frame_paths):
do_multi = roop.globals.gpu_vendor is not None and roop.globals.gpu_threads > 1
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format=progress_bar_format) as progress:
if do_multi:
multi_process_frame(source_img, frame_paths, progress)
else:
process_frames(source_img, frame_paths, progress)

7
roop/typing.py Normal file
View File

@@ -0,0 +1,7 @@
from typing import Any
from insightface.app.common import Face
import numpy
Face = Face
Frame = numpy.ndarray[Any, Any]

155
roop/ui.json Normal file
View File

@@ -0,0 +1,155 @@
{
"CTk": {
"fg_color": ["gray95", "gray10"]
},
"CTkToplevel": {
"fg_color": ["gray95", "gray10"]
},
"CTkFrame": {
"corner_radius": 6,
"border_width": 0,
"fg_color": ["gray90", "gray13"],
"top_fg_color": ["gray85", "gray16"],
"border_color": ["gray65", "gray28"]
},
"CTkButton": {
"corner_radius": 6,
"border_width": 0,
"fg_color": ["#3a7ebf", "#1f538d"],
"hover_color": ["#325882", "#14375e"],
"border_color": ["#3E454A", "#949A9F"],
"text_color": ["#DCE4EE", "#DCE4EE"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkLabel": {
"corner_radius": 0,
"fg_color": "transparent",
"text_color": ["gray14", "gray84"]
},
"CTkEntry": {
"corner_radius": 6,
"border_width": 2,
"fg_color": ["#F9F9FA", "#343638"],
"border_color": ["#979DA2", "#565B5E"],
"text_color": ["gray14", "gray84"],
"placeholder_text_color": ["gray52", "gray62"]
},
"CTkCheckbox": {
"corner_radius": 6,
"border_width": 3,
"fg_color": ["#3a7ebf", "#1f538d"],
"border_color": ["#3E454A", "#949A9F"],
"hover_color": ["#325882", "#14375e"],
"checkmark_color": ["#DCE4EE", "gray90"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkSwitch": {
"corner_radius": 1000,
"border_width": 3,
"button_length": 0,
"fg_color": ["#939BA2", "#4A4D50"],
"progress_color": ["#3a7ebf", "#1f538d"],
"button_color": ["gray36", "#D5D9DE"],
"button_hover_color": ["gray20", "gray100"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkRadiobutton": {
"corner_radius": 1000,
"border_width_checked": 6,
"border_width_unchecked": 3,
"fg_color": ["#3a7ebf", "#1f538d"],
"border_color": ["#3E454A", "#949A9F"],
"hover_color": ["#325882", "#14375e"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkProgressBar": {
"corner_radius": 1000,
"border_width": 0,
"fg_color": ["#939BA2", "#4A4D50"],
"progress_color": ["#3a7ebf", "#1f538d"],
"border_color": ["gray", "gray"]
},
"CTkSlider": {
"corner_radius": 1000,
"button_corner_radius": 1000,
"border_width": 6,
"button_length": 0,
"fg_color": ["#939BA2", "#4A4D50"],
"progress_color": ["gray40", "#AAB0B5"],
"button_color": ["#3a7ebf", "#1f538d"],
"button_hover_color": ["#325882", "#14375e"]
},
"CTkOptionMenu": {
"corner_radius": 6,
"fg_color": ["#3a7ebf", "#1f538d"],
"button_color": ["#325882", "#14375e"],
"button_hover_color": ["#234567", "#1e2c40"],
"text_color": ["#DCE4EE", "#DCE4EE"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkComboBox": {
"corner_radius": 6,
"border_width": 2,
"fg_color": ["#F9F9FA", "#343638"],
"border_color": ["#979DA2", "#565B5E"],
"button_color": ["#979DA2", "#565B5E"],
"button_hover_color": ["#6E7174", "#7A848D"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray50", "gray45"]
},
"CTkScrollbar": {
"corner_radius": 1000,
"border_spacing": 4,
"fg_color": "transparent",
"button_color": ["gray55", "gray41"],
"button_hover_color": ["gray40", "gray53"]
},
"CTkSegmentedButton": {
"corner_radius": 6,
"border_width": 2,
"fg_color": ["#979DA2", "gray29"],
"selected_color": ["#3a7ebf", "#1f538d"],
"selected_hover_color": ["#325882", "#14375e"],
"unselected_color": ["#979DA2", "gray29"],
"unselected_hover_color": ["gray70", "gray41"],
"text_color": ["#DCE4EE", "#DCE4EE"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkTextbox": {
"corner_radius": 6,
"border_width": 0,
"fg_color": ["gray100", "gray20"],
"border_color": ["#979DA2", "#565B5E"],
"text_color": ["gray14", "gray84"],
"scrollbar_button_color": ["gray55", "gray41"],
"scrollbar_button_hover_color": ["gray40", "gray53"]
},
"CTkScrollableFrame": {
"label_fg_color": ["gray80", "gray21"]
},
"DropdownMenu": {
"fg_color": ["gray90", "gray20"],
"hover_color": ["gray75", "gray28"],
"text_color": ["gray14", "gray84"]
},
"CTkFont": {
"macOS": {
"family": "Avenir",
"size": 12,
"weight": "normal"
},
"Windows": {
"family": "Corbel",
"size": 12,
"weight": "normal"
},
"Linux": {
"family": "Montserrat",
"size": 12,
"weight": "normal"
}
}
}

View File

@@ -1,315 +1,225 @@
import tkinter as tk
from typing import Any, Callable, Tuple
from PIL import Image, ImageTk
import webbrowser
from tkinter import filedialog
from tkinter.filedialog import asksaveasfilename
import threading
import os
import customtkinter as ctk
from typing import Callable, Tuple
from roop.utils import is_img
import cv2
from PIL import Image, ImageOps
max_preview_size = 800
import roop.globals
import roop.metadata
from roop.face_analyser import get_one_face
from roop.capturer import get_video_frame, get_video_frame_total
from roop.predicter import predict_frame
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import is_image, is_video, resolve_relative_path
ROOT = None
ROOT_HEIGHT = 700
ROOT_WIDTH = 600
PREVIEW = None
PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200
RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None
RECENT_DIRECTORY_OUTPUT = None
preview_label = None
preview_slider = None
source_label = None
target_label = None
status_label = None
def create_preview(parent):
global preview_image_frame, preview_frame_slider, test_button
def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
global ROOT, PREVIEW
preview_window = tk.Toplevel(parent)
# Override close button
preview_window.protocol("WM_DELETE_WINDOW", hide_preview)
preview_window.withdraw()
preview_window.title("Preview")
preview_window.configure(bg="red")
preview_window.resizable(width=False, height=False)
ROOT = create_root(start, destroy)
PREVIEW = create_preview(ROOT)
frame = tk.Frame(preview_window, background="#2d3436")
frame.pack(fill='both', side='left', expand='True')
# Preview image
preview_image_frame = tk.Label(frame)
preview_image_frame.pack(side='top')
# Bottom frame
buttons_frame = tk.Frame(frame, background="#2d3436")
buttons_frame.pack(fill='both', side='bottom')
current_frame = tk.IntVar()
preview_frame_slider = tk.Scale(
buttons_frame,
from_=0,
to=0,
orient='horizontal',
variable=current_frame
)
preview_frame_slider.pack(fill='both', side='left', expand='True')
test_button = tk.Button(buttons_frame, text="Test", bg="#f1c40f", relief="flat", width=15, borderwidth=0, highlightthickness=0)
test_button.pack(side='right', fill='y')
return preview_window
return ROOT
def show_preview():
preview.deiconify()
preview_visible.set(True)
def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
global source_label, target_label, status_label
ctk.deactivate_automatic_dpi_awareness()
ctk.set_appearance_mode('system')
ctk.set_default_color_theme(resolve_relative_path('ui.json'))
root = ctk.CTk()
root.minsize(ROOT_WIDTH, ROOT_HEIGHT)
root.title(f'{roop.metadata.name} {roop.metadata.version}')
root.configure()
root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = ctk.CTkLabel(root, text=None)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25)
target_label = ctk.CTkLabel(root, text=None)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
source_button = ctk.CTkButton(root, text='Select a face', command=lambda: select_source_path())
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
target_button = ctk.CTkButton(root, text='Select a target', command=lambda: select_target_path())
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=roop.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, command=lambda: setattr(roop.globals, 'keep_fps', not roop.globals.keep_fps))
keep_fps_checkbox.place(relx=0.1, rely=0.6)
keep_frames_value = ctk.BooleanVar(value=roop.globals.keep_frames)
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, command=lambda: setattr(roop.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch.place(relx=0.1, rely=0.65)
keep_audio_value = ctk.BooleanVar(value=roop.globals.keep_audio)
keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, command=lambda: setattr(roop.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_switch.place(relx=0.6, rely=0.6)
many_faces_value = ctk.BooleanVar(value=roop.globals.many_faces)
many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, command=lambda: setattr(roop.globals, 'many_faces', many_faces_value.get()))
many_faces_switch.place(relx=0.6, rely=0.65)
start_button = ctk.CTkButton(root, text='Start', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', command=lambda: destroy())
stop_button.place(relx=0.4, rely=0.75, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', command=lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.75, relwidth=0.2, relheight=0.05)
status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
return root
def hide_preview():
def create_preview(parent: ctk.CTkToplevel) -> ctk.CTkToplevel:
global preview_label, preview_slider
preview = ctk.CTkToplevel(parent)
preview.withdraw()
preview_visible.set(False)
preview.title('Preview')
preview.configure()
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
preview.resizable(width=False, height=False)
preview_label = ctk.CTkLabel(preview, text=None)
preview_label.pack(fill='both', expand=True)
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=lambda frame_value: update_preview(frame_value))
return preview
def set_preview_handler(test_handler):
test_button.config(command = test_handler)
def update_status(text: str) -> None:
status_label.configure(text=text)
ROOT.update()
def init_slider(frames_count, change_handler):
preview_frame_slider.configure(to=frames_count, command=lambda value: change_handler(preview_frame_slider.get()))
preview_frame_slider.set(0)
def select_source_path() -> None:
global RECENT_DIRECTORY_SOURCE
def update_preview(frame):
img = Image.fromarray(frame)
width, height = img.size
aspect_ratio = 1
if width > height:
aspect_ratio = max_preview_size / width
PREVIEW.withdraw()
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE)
if is_image(source_path):
roop.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(roop.globals.source_path)
image = render_image_preview(roop.globals.source_path, (200, 200))
source_label.configure(image=image)
else:
aspect_ratio = max_preview_size / height
img = img.resize(
(
int(width * aspect_ratio),
int(height * aspect_ratio)
),
Image.ANTIALIAS
)
photo_img = ImageTk.PhotoImage(img)
preview_image_frame.configure(image=photo_img)
preview_image_frame.image = photo_img
roop.globals.source_path = None
source_label.configure(image=None)
def select_face(select_face_handler: Callable[[str], None]):
if select_face_handler:
path = filedialog.askopenfilename(title="Select a face")
preview_face(path)
return select_face_handler(path)
return None
def select_target_path() -> None:
global RECENT_DIRECTORY_TARGET
def update_slider_handler(get_video_frame, video_path):
return lambda frame_number: update_preview(get_video_frame(video_path, frame_number))
def test_preview(create_test_preview):
frame = create_test_preview(preview_frame_slider.get())
update_preview(frame)
def update_slider(get_video_frame, create_test_preview, video_path, frames_amount):
init_slider(frames_amount, update_slider_handler(get_video_frame, video_path))
set_preview_handler(lambda: preview_thread(lambda: test_preview(create_test_preview)))
def analyze_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar):
path = filedialog.askopenfilename(title="Select a target")
target_path.set(path)
amount, frame = select_target_handler(path)
frames_amount.set(amount)
preview_target(frame)
update_preview(frame)
def select_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar):
if select_target_handler:
analyze_target(select_target_handler, target_path, frames_amount)
def save_file(save_file_handler: Callable[[str], None], target_path: str):
filename, ext = 'output.mp4', '.mp4'
if is_img(target_path):
filename, ext = 'output.png', '.png'
if save_file_handler:
return save_file_handler(asksaveasfilename(initialfile=filename, defaultextension=ext, filetypes=[("All Files","*.*"),("Videos","*.mp4")]))
return None
def toggle_all_faces(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar):
if toggle_all_faces_handler:
return lambda: toggle_all_faces_handler(variable.get())
return None
def toggle_fps_limit(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar):
if toggle_all_faces_handler:
return lambda: toggle_all_faces_handler(variable.get())
return None
def toggle_keep_frames(toggle_keep_frames_handler: Callable[[int], None], variable: tk.IntVar):
if toggle_keep_frames_handler:
return lambda: toggle_keep_frames_handler(variable.get())
return None
def create_button(parent, text, command):
return tk.Button(
parent,
text=text,
command=command,
bg="#f1c40f",
relief="flat",
borderwidth=0,
highlightthickness=0
)
def create_background_button(parent, text, command):
button = create_button(parent, text, command)
button.configure(
bg="#2d3436",
fg="#74b9ff",
highlightthickness=4,
highlightbackground="#74b9ff",
activebackground="#74b9ff",
borderwidth=4
)
return button
def create_check(parent, text, variable, command):
return tk.Checkbutton(
parent,
anchor="w",
relief="groove",
activebackground="#2d3436",
activeforeground="#74b9ff",
selectcolor="black",
text=text,
fg="#dfe6e9",
borderwidth=0,
highlightthickness=0,
bg="#2d3436",
variable=variable,
command=command
)
def preview_thread(thread_function):
threading.Thread(target=thread_function).start()
def open_preview_window(get_video_frame, target_path):
if preview_visible.get():
hide_preview()
PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET)
if is_image(target_path):
roop.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(roop.globals.target_path)
image = render_image_preview(roop.globals.target_path, (200, 200))
target_label.configure(image=image)
elif is_video(target_path):
roop.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(roop.globals.target_path)
video_frame = render_video_preview(target_path, (200, 200))
target_label.configure(image=video_frame)
else:
show_preview()
if target_path:
frame = get_video_frame(target_path)
update_preview(frame)
roop.globals.target_path = None
target_label.configure(image=None)
def preview_face(path):
img = Image.open(path)
img = img.resize((180, 180), Image.ANTIALIAS)
photo_img = ImageTk.PhotoImage(img)
face_label.configure(image=photo_img)
face_label.image = photo_img
def select_output_path(start: Callable[[], None]) -> None:
global RECENT_DIRECTORY_OUTPUT
if is_image(roop.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save image output file', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
elif is_video(roop.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save video output file', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
else:
output_path = None
if output_path:
roop.globals.output_path = output_path
RECENT_DIRECTORY_OUTPUT = os.path.dirname(roop.globals.output_path)
start()
def preview_target(frame):
img = Image.fromarray(frame)
img = img.resize((180, 180), Image.ANTIALIAS)
photo_img = ImageTk.PhotoImage(img)
target_label.configure(image=photo_img)
target_label.image = photo_img
def render_image_preview(image_path: str, size: Tuple[int, int]) -> ctk.CTkImage:
image = Image.open(image_path)
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
def update_status_label(value):
status_label["text"] = value
window.update()
def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: int = 0) -> ctk.CTkImage:
capture = cv2.VideoCapture(video_path)
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
capture.release()
cv2.destroyAllWindows()
def init(
initial_values: dict,
select_face_handler: Callable[[str], None],
select_target_handler: Callable[[str], Tuple[int, Any]],
toggle_all_faces_handler: Callable[[int], None],
toggle_fps_limit_handler: Callable[[int], None],
toggle_keep_frames_handler: Callable[[int], None],
save_file_handler: Callable[[str], None],
start: Callable[[], None],
get_video_frame: Callable[[str, int], None],
create_test_preview: Callable[[int], Any],
):
global window, preview, preview_visible, face_label, target_label, status_label
def toggle_preview() -> None:
if PREVIEW.state() == 'normal':
PREVIEW.withdraw()
elif roop.globals.source_path and roop.globals.target_path:
init_preview()
update_preview()
PREVIEW.deiconify()
window = tk.Tk()
window.geometry("600x700")
window.title("roop")
window.configure(bg="#2d3436")
window.resizable(width=False, height=False)
preview_visible = tk.BooleanVar(window, False)
target_path = tk.StringVar()
frames_amount = tk.IntVar()
def init_preview() -> None:
if is_image(roop.globals.target_path):
preview_slider.pack_forget()
if is_video(roop.globals.target_path):
video_frame_total = get_video_frame_total(roop.globals.target_path)
preview_slider.configure(to=video_frame_total)
preview_slider.pack(fill='x')
preview_slider.set(0)
# Preview window
preview = create_preview(window)
# Contact information
support_link = tk.Label(window, text="Donate to project <3", fg="#fd79a8", bg="#2d3436", cursor="hand2", font=("Arial", 8))
support_link.place(x=180,y=20,width=250,height=30)
support_link.bind("<Button-1>", lambda e: webbrowser.open("https://github.com/sponsors/s0md3v"))
left_frame = tk.Frame(window)
left_frame.place(x=60, y=100, width=180, height=180)
face_label = tk.Label(left_frame)
face_label.pack(fill='both', side='top', expand=True)
right_frame = tk.Frame(window)
right_frame.place(x=360, y=100, width=180, height=180)
target_label = tk.Label(right_frame)
target_label.pack(fill='both', side='top', expand=True)
# Select a face button
face_button = create_background_button(window, "Select a face", lambda: [
select_face(select_face_handler)
])
face_button.place(x=60,y=320,width=180,height=80)
# Select a target button
target_button = create_background_button(window, "Select a target", lambda: [
select_target(select_target_handler, target_path, frames_amount),
update_slider(get_video_frame, create_test_preview, target_path.get(), frames_amount.get())
])
target_button.place(x=360,y=320,width=180,height=80)
# All faces checkbox
all_faces = tk.IntVar(None, initial_values['all_faces'])
all_faces_checkbox = create_check(window, "Process all faces in frame", all_faces, toggle_all_faces(toggle_all_faces_handler, all_faces))
all_faces_checkbox.place(x=60,y=500,width=240,height=31)
# FPS limit checkbox
limit_fps = tk.IntVar(None, not initial_values['keep_fps'])
fps_checkbox = create_check(window, "Limit FPS to 30", limit_fps, toggle_fps_limit(toggle_fps_limit_handler, limit_fps))
fps_checkbox.place(x=60,y=475,width=240,height=31)
# Keep frames checkbox
keep_frames = tk.IntVar(None, initial_values['keep_frames'])
frames_checkbox = create_check(window, "Keep frames dir", keep_frames, toggle_keep_frames(toggle_keep_frames_handler, keep_frames))
frames_checkbox.place(x=60,y=450,width=240,height=31)
# Start button
start_button = create_button(window, "Start", lambda: [save_file(save_file_handler, target_path.get()), preview_thread(lambda: start(update_preview))])
start_button.place(x=170,y=560,width=120,height=49)
# Preview button
preview_button = create_button(window, "Preview", lambda: open_preview_window(get_video_frame, target_path.get()))
preview_button.place(x=310,y=560,width=120,height=49)
# Status label
status_label = tk.Label(window, width=580, justify="center", text="Status: waiting for input...", fg="#2ecc71", bg="#2d3436")
status_label.place(x=10,y=640,width=580,height=30)
return window
def update_preview(frame_number: int = 0) -> None:
if roop.globals.source_path and roop.globals.target_path:
temp_frame = get_video_frame(roop.globals.target_path, frame_number)
if predict_frame(temp_frame):
quit()
for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
temp_frame = frame_processor.process_frame(
get_one_face(cv2.imread(roop.globals.source_path)),
temp_frame
)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)

141
roop/utilities.py Normal file
View File

@@ -0,0 +1,141 @@
import glob
import mimetypes
import os
import platform
import shutil
import ssl
import subprocess
import urllib
from pathlib import Path
from typing import List, Any
from tqdm import tqdm
import roop.globals
TEMP_FILE = 'temp.mp4'
TEMP_DIRECTORY = 'temp'
# monkey patch ssl for mac
if platform.system().lower() == 'darwin':
ssl._create_default_https_context = ssl._create_unverified_context
def run_ffmpeg(args: List[str]) -> bool:
commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', roop.globals.log_level]
commands.extend(args)
try:
subprocess.check_output(commands, stderr=subprocess.STDOUT)
return True
except Exception:
pass
return False
def detect_fps(target_path: str) -> float:
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
output = subprocess.check_output(command).decode().strip().split('/')
try:
numerator, denominator = map(int, output)
return numerator / denominator
except Exception:
pass
return 30.0
def extract_frames(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')])
def create_video(target_path: str, fps: float = 30.0) -> None:
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', roop.globals.video_encoder, '-crf', str(roop.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
def restore_audio(target_path: str, output_path: str) -> None:
temp_output_path = get_temp_output_path(target_path)
done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
if not done:
move_temp(target_path, output_path)
def get_temp_frame_paths(target_path: str) -> List[str]:
temp_directory_path = get_temp_directory_path(target_path)
return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png')))
def get_temp_directory_path(target_path: str) -> str:
target_name, _ = os.path.splitext(os.path.basename(target_path))
target_directory_path = os.path.dirname(target_path)
return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name)
def get_temp_output_path(target_path: str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, TEMP_FILE)
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any:
if source_path and target_path:
source_name, _ = os.path.splitext(os.path.basename(source_path))
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
if os.path.isdir(output_path):
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
return output_path
def create_temp(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
Path(temp_directory_path).mkdir(parents=True, exist_ok=True)
def move_temp(target_path: str, output_path: str) -> None:
temp_output_path = get_temp_output_path(target_path)
if os.path.isfile(temp_output_path):
if os.path.isfile(output_path):
os.remove(output_path)
shutil.move(temp_output_path, output_path)
def clean_temp(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
parent_directory_path = os.path.dirname(temp_directory_path)
if not roop.globals.keep_frames and os.path.isdir(temp_directory_path):
shutil.rmtree(temp_directory_path)
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
os.rmdir(parent_directory_path)
def has_image_extension(image_path: str) -> bool:
return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
def is_image(image_path: str) -> bool:
if image_path and os.path.isfile(image_path):
mimetype, _ = mimetypes.guess_type(image_path)
return bool(mimetype and mimetype.startswith('image/'))
return False
def is_video(video_path: str) -> bool:
if video_path and os.path.isfile(video_path):
mimetype, _ = mimetypes.guess_type(video_path)
return bool(mimetype and mimetype.startswith('video/'))
return False
def conditional_download(download_directory_path: str, urls: List[str]) -> None:
if not os.path.exists(download_directory_path):
os.makedirs(download_directory_path)
for url in urls:
download_file_path = os.path.join(download_directory_path, os.path.basename(url))
if not os.path.exists(download_file_path):
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
total = int(request.headers.get('Content-Length', 0))
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
def resolve_relative_path(path: str) -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), path))

View File

@@ -1,72 +0,0 @@
import os
import shutil
import roop.globals
sep = "/"
if os.name == "nt":
sep = "\\"
def path(string):
if sep == "\\":
return string.replace("/", "\\")
return string
def run_command(command, mode="silent"):
if mode == "debug":
return os.system(command)
return os.popen(command).read()
def detect_fps(input_path):
input_path = path(input_path)
output = os.popen(f'ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate "{input_path}"').read()
if "/" in output:
try:
return int(output.split("/")[0]) // int(output.split("/")[1].strip()), output.strip()
except:
pass
return 30, 30
def run_ffmpeg(args):
log_level = f'-loglevel {roop.globals.log_level}'
run_command(f'ffmpeg {log_level} {args}')
def set_fps(input_path, output_path, fps):
input_path, output_path = path(input_path), path(output_path)
run_ffmpeg(f'-i "{input_path}" -filter:v fps=fps={fps} "{output_path}"')
def create_video(video_name, fps, output_dir):
hwaccel_option = '-hwaccel cuda' if roop.globals.gpu_vendor == 'nvidia' else ''
output_dir = path(output_dir)
run_ffmpeg(f'{hwaccel_option} -framerate "{fps}" -i "{output_dir}{sep}%04d.png" -c:v libx264 -crf 7 -pix_fmt yuv420p -y "{output_dir}{sep}output.mp4"')
def extract_frames(input_path, output_dir):
hwaccel_option = '-hwaccel cuda' if roop.globals.gpu_vendor == 'nvidia' else ''
input_path, output_dir = path(input_path), path(output_dir)
run_ffmpeg(f' {hwaccel_option} -i "{input_path}" "{output_dir}{sep}%04d.png"')
def add_audio(output_dir, target_path, video, keep_frames, output_file):
video_name = os.path.splitext(video)[0]
save_to = output_file if output_file else output_dir + "/swapped-" + video_name + ".mp4"
save_to_ff, output_dir_ff = path(save_to), path(output_dir)
run_ffmpeg(f'-i "{output_dir_ff}{sep}output.mp4" -i "{output_dir_ff}{sep}{video}" -c:v copy -map 0:v:0 -map 1:a:0 -y "{save_to_ff}"')
if not os.path.isfile(save_to):
shutil.move(output_dir + "/output.mp4", save_to)
if not keep_frames:
shutil.rmtree(output_dir)
def is_img(path):
return path.lower().endswith(("png", "jpg", "jpeg", "bmp"))
def rreplace(s, old, new, occurrence):
li = s.rsplit(old, occurrence)
return new.join(li)