Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b7c3c9bc87 | |||
| e56a79222e | |||
| 5b0bf735b5 | |||
| c02bd519d8 | |||
| 36bb1a29b0 | |||
| 2bbc150bfb | |||
| 07b4d66965 | |||
| ff7cc3ac2f | |||
| f0ec0744f7 | |||
| 36b6ea0019 | |||
| 523ee53c34 | |||
| e544889805 | |||
| c6524facfb | |||
| 91baa6c0a5 | |||
| 647c5f250f | |||
| ae88412aae | |||
| b7e011f5e7 |
@@ -25,3 +25,4 @@ models/DMDNet.pth
|
||||
faceswap/
|
||||
.vscode/
|
||||
switch_states.json
|
||||
/models
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
<h1 align="center">Deep-Live-Cam 2.0.2c</h1>
|
||||
<h1 align="center">Deep-Live-Cam 2.0.4c</h1>
|
||||
|
||||
<p align="center">
|
||||
Real-time face swap and video deepfake with a single click and only a single image.
|
||||
@@ -30,7 +30,7 @@ By using this software, you agree to these terms and commit to using it in a man
|
||||
|
||||
Users are expected to use this software responsibly and legally. If using a real person's face, obtain their consent and clearly label any output as a deepfake when sharing online. We are not responsible for end-user actions.
|
||||
|
||||
## Exclusive v2.4 Quick Start - Pre-built (Windows/Mac Silicon)
|
||||
## Exclusive v2.6d Quick Start - Pre-built (Windows/Mac Silicon)
|
||||
|
||||
<a href="https://deeplivecam.net/index.php/quickstart"> <img src="media/Download.png" width="285" height="77" />
|
||||
|
||||
|
||||
+2
-1
@@ -1,6 +1,7 @@
|
||||
from typing import Any
|
||||
import cv2
|
||||
import modules.globals # Import the globals to check the color correction toggle
|
||||
from modules.gpu_processing import gpu_cvt_color
|
||||
|
||||
|
||||
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
|
||||
@@ -19,7 +20,7 @@ def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
|
||||
|
||||
if has_frame and modules.globals.color_correction:
|
||||
# Convert the frame color if necessary
|
||||
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
frame = gpu_cvt_color(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
capture.release()
|
||||
return frame if has_frame else None
|
||||
|
||||
+9
-4
@@ -11,7 +11,11 @@ import platform
|
||||
import signal
|
||||
import shutil
|
||||
import argparse
|
||||
import torch
|
||||
try:
|
||||
import torch
|
||||
HAS_TORCH = True
|
||||
except ImportError:
|
||||
HAS_TORCH = False
|
||||
import onnxruntime
|
||||
import tensorflow
|
||||
|
||||
@@ -21,11 +25,12 @@ import modules.ui as ui
|
||||
from modules.processors.frame.core import get_frame_processors_modules
|
||||
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
|
||||
|
||||
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
|
||||
if HAS_TORCH and 'ROCMExecutionProvider' in modules.globals.execution_providers:
|
||||
del torch
|
||||
|
||||
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
|
||||
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
|
||||
if HAS_TORCH:
|
||||
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
|
||||
|
||||
|
||||
def parse_args() -> None:
|
||||
@@ -167,7 +172,7 @@ def limit_resources() -> None:
|
||||
|
||||
|
||||
def release_resources() -> None:
|
||||
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
|
||||
if 'CUDAExecutionProvider' in modules.globals.execution_providers and HAS_TORCH:
|
||||
torch.cuda.empty_cache()
|
||||
|
||||
|
||||
|
||||
@@ -27,9 +27,10 @@ def get_face_analyser() -> Any:
|
||||
if FACE_ANALYSER is None:
|
||||
FACE_ANALYSER = insightface.app.FaceAnalysis(
|
||||
name='buffalo_l',
|
||||
providers=modules.globals.execution_providers
|
||||
providers=modules.globals.execution_providers,
|
||||
allowed_modules=['detection', 'recognition']
|
||||
)
|
||||
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
|
||||
FACE_ANALYSER.prepare(ctx_id=0, det_size=(320, 320))
|
||||
return FACE_ANALYSER
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,286 @@
|
||||
# --- START OF FILE gpu_processing.py ---
|
||||
"""
|
||||
GPU-accelerated image processing using OpenCV CUDA (cv2.cuda.GpuMat).
|
||||
|
||||
Provides drop-in replacements for common cv2 functions. When OpenCV is built
|
||||
with CUDA support the functions transparently upload → process → download via
|
||||
GpuMat; otherwise they fall back to the regular CPU path so the rest of the
|
||||
codebase never has to care whether CUDA is available.
|
||||
|
||||
Usage
|
||||
-----
|
||||
from modules.gpu_processing import (
|
||||
gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted,
|
||||
gpu_resize, gpu_cvt_color, gpu_flip,
|
||||
is_gpu_accelerated,
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from typing import Tuple, Optional
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CUDA availability detection (evaluated once at import time)
|
||||
# ---------------------------------------------------------------------------
|
||||
CUDA_AVAILABLE: bool = False
|
||||
|
||||
try:
|
||||
# cv2.cuda.GpuMat is only present when OpenCV is compiled with CUDA
|
||||
_test_mat = cv2.cuda.GpuMat()
|
||||
# Verify we have the required filter / image-processing functions
|
||||
_has_gauss = hasattr(cv2.cuda, "createGaussianFilter")
|
||||
_has_resize = hasattr(cv2.cuda, "resize")
|
||||
_has_cvt = hasattr(cv2.cuda, "cvtColor")
|
||||
if _has_gauss and _has_resize and _has_cvt:
|
||||
CUDA_AVAILABLE = True
|
||||
print("[gpu_processing] OpenCV CUDA support detected – GPU-accelerated processing enabled.")
|
||||
else:
|
||||
missing = []
|
||||
if not _has_gauss:
|
||||
missing.append("createGaussianFilter")
|
||||
if not _has_resize:
|
||||
missing.append("resize")
|
||||
if not _has_cvt:
|
||||
missing.append("cvtColor")
|
||||
print(f"[gpu_processing] cv2.cuda.GpuMat exists but missing: {', '.join(missing)} – falling back to CPU.")
|
||||
except Exception:
|
||||
print("[gpu_processing] OpenCV CUDA not available – using CPU fallback for all operations.")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ensure_uint8(img: np.ndarray) -> np.ndarray:
|
||||
"""Clip and convert to uint8 if necessary."""
|
||||
if img.dtype != np.uint8:
|
||||
return np.clip(img, 0, 255).astype(np.uint8)
|
||||
return img
|
||||
|
||||
|
||||
def _ksize_odd(ksize: Tuple[int, int]) -> Tuple[int, int]:
|
||||
"""Ensure kernel dimensions are positive and odd (required by GaussianBlur)."""
|
||||
kw = max(1, ksize[0] // 2 * 2 + 1) if ksize[0] > 0 else 0
|
||||
kh = max(1, ksize[1] // 2 * 2 + 1) if ksize[1] > 0 else 0
|
||||
return (kw, kh)
|
||||
|
||||
|
||||
def _cv_type_for(img: np.ndarray) -> int:
|
||||
"""Return the OpenCV type constant matching *img* (uint8 only)."""
|
||||
channels = 1 if img.ndim == 2 else img.shape[2]
|
||||
if channels == 1:
|
||||
return cv2.CV_8UC1
|
||||
elif channels == 3:
|
||||
return cv2.CV_8UC3
|
||||
elif channels == 4:
|
||||
return cv2.CV_8UC4
|
||||
return cv2.CV_8UC3 # fallback
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API – Gaussian Blur
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def gpu_gaussian_blur(
|
||||
src: np.ndarray,
|
||||
ksize: Tuple[int, int],
|
||||
sigma_x: float,
|
||||
sigma_y: float = 0,
|
||||
) -> np.ndarray:
|
||||
"""Drop-in replacement for ``cv2.GaussianBlur`` with CUDA acceleration.
|
||||
|
||||
Parameters match ``cv2.GaussianBlur(src, ksize, sigmaX, sigmaY)``.
|
||||
When *ksize* is ``(0, 0)`` OpenCV computes the kernel size from *sigma_x*.
|
||||
"""
|
||||
if CUDA_AVAILABLE:
|
||||
try:
|
||||
src_u8 = _ensure_uint8(src)
|
||||
cv_type = _cv_type_for(src_u8)
|
||||
ks = _ksize_odd(ksize) if ksize != (0, 0) else ksize
|
||||
|
||||
gauss = cv2.cuda.createGaussianFilter(cv_type, cv_type, ks, sigma_x, sigma_y)
|
||||
gpu_src = cv2.cuda.GpuMat()
|
||||
gpu_src.upload(src_u8)
|
||||
gpu_dst = gauss.apply(gpu_src)
|
||||
return gpu_dst.download()
|
||||
except cv2.error:
|
||||
pass
|
||||
|
||||
return cv2.GaussianBlur(src, ksize, sigma_x, sigmaY=sigma_y)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API – addWeighted
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def gpu_add_weighted(
|
||||
src1: np.ndarray,
|
||||
alpha: float,
|
||||
src2: np.ndarray,
|
||||
beta: float,
|
||||
gamma: float,
|
||||
) -> np.ndarray:
|
||||
"""Drop-in replacement for ``cv2.addWeighted`` with CUDA acceleration."""
|
||||
if CUDA_AVAILABLE:
|
||||
try:
|
||||
s1 = _ensure_uint8(src1)
|
||||
s2 = _ensure_uint8(src2)
|
||||
g1 = cv2.cuda.GpuMat()
|
||||
g2 = cv2.cuda.GpuMat()
|
||||
g1.upload(s1)
|
||||
g2.upload(s2)
|
||||
gpu_dst = cv2.cuda.addWeighted(g1, alpha, g2, beta, gamma)
|
||||
return gpu_dst.download()
|
||||
except cv2.error:
|
||||
pass
|
||||
|
||||
return cv2.addWeighted(src1, alpha, src2, beta, gamma)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API – Unsharp-mask sharpening
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def gpu_sharpen(
|
||||
src: np.ndarray,
|
||||
strength: float,
|
||||
sigma: float = 3,
|
||||
) -> np.ndarray:
|
||||
"""Unsharp-mask sharpening, optionally GPU-accelerated.
|
||||
|
||||
Equivalent to::
|
||||
|
||||
blurred = GaussianBlur(src, (0,0), sigma)
|
||||
result = addWeighted(src, 1+strength, blurred, -strength, 0)
|
||||
"""
|
||||
if strength <= 0:
|
||||
return src
|
||||
|
||||
if CUDA_AVAILABLE:
|
||||
try:
|
||||
src_u8 = _ensure_uint8(src)
|
||||
cv_type = _cv_type_for(src_u8)
|
||||
|
||||
gauss = cv2.cuda.createGaussianFilter(cv_type, cv_type, (0, 0), sigma)
|
||||
gpu_src = cv2.cuda.GpuMat()
|
||||
gpu_src.upload(src_u8)
|
||||
gpu_blurred = gauss.apply(gpu_src)
|
||||
gpu_sharp = cv2.cuda.addWeighted(gpu_src, 1.0 + strength, gpu_blurred, -strength, 0)
|
||||
result = gpu_sharp.download()
|
||||
return np.clip(result, 0, 255).astype(np.uint8)
|
||||
except cv2.error:
|
||||
pass
|
||||
|
||||
blurred = cv2.GaussianBlur(src, (0, 0), sigma)
|
||||
sharpened = cv2.addWeighted(src, 1.0 + strength, blurred, -strength, 0)
|
||||
return np.clip(sharpened, 0, 255).astype(np.uint8)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API – Resize
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Map common cv2 interpolation flags to their CUDA equivalents
|
||||
_INTERP_MAP = {
|
||||
cv2.INTER_NEAREST: cv2.INTER_NEAREST,
|
||||
cv2.INTER_LINEAR: cv2.INTER_LINEAR,
|
||||
cv2.INTER_CUBIC: cv2.INTER_CUBIC,
|
||||
cv2.INTER_AREA: cv2.INTER_AREA,
|
||||
cv2.INTER_LANCZOS4: cv2.INTER_LANCZOS4,
|
||||
}
|
||||
|
||||
|
||||
def gpu_resize(
|
||||
src: np.ndarray,
|
||||
dsize: Tuple[int, int],
|
||||
fx: float = 0,
|
||||
fy: float = 0,
|
||||
interpolation: int = cv2.INTER_LINEAR,
|
||||
) -> np.ndarray:
|
||||
"""Drop-in replacement for ``cv2.resize`` with CUDA acceleration.
|
||||
|
||||
Parameters match ``cv2.resize(src, dsize, fx=fx, fy=fy, interpolation=...)``.
|
||||
"""
|
||||
if CUDA_AVAILABLE:
|
||||
try:
|
||||
src_u8 = _ensure_uint8(src)
|
||||
gpu_src = cv2.cuda.GpuMat()
|
||||
gpu_src.upload(src_u8)
|
||||
|
||||
interp = _INTERP_MAP.get(interpolation, cv2.INTER_LINEAR)
|
||||
|
||||
if dsize and dsize[0] > 0 and dsize[1] > 0:
|
||||
gpu_dst = cv2.cuda.resize(gpu_src, dsize, interpolation=interp)
|
||||
else:
|
||||
gpu_dst = cv2.cuda.resize(gpu_src, (0, 0), fx=fx, fy=fy, interpolation=interp)
|
||||
|
||||
return gpu_dst.download()
|
||||
except cv2.error:
|
||||
pass
|
||||
|
||||
return cv2.resize(src, dsize, fx=fx, fy=fy, interpolation=interpolation)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API – Color conversion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def gpu_cvt_color(
|
||||
src: np.ndarray,
|
||||
code: int,
|
||||
) -> np.ndarray:
|
||||
"""Drop-in replacement for ``cv2.cvtColor`` with CUDA acceleration.
|
||||
|
||||
Parameters match ``cv2.cvtColor(src, code)``.
|
||||
"""
|
||||
if CUDA_AVAILABLE:
|
||||
try:
|
||||
src_u8 = _ensure_uint8(src)
|
||||
gpu_src = cv2.cuda.GpuMat()
|
||||
gpu_src.upload(src_u8)
|
||||
gpu_dst = cv2.cuda.cvtColor(gpu_src, code)
|
||||
return gpu_dst.download()
|
||||
except cv2.error:
|
||||
pass
|
||||
|
||||
return cv2.cvtColor(src, code)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API – Flip
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def gpu_flip(
|
||||
src: np.ndarray,
|
||||
flip_code: int,
|
||||
) -> np.ndarray:
|
||||
"""Drop-in replacement for ``cv2.flip`` with CUDA acceleration.
|
||||
|
||||
Parameters match ``cv2.flip(src, flipCode)``.
|
||||
*flip_code*: 0 = vertical, 1 = horizontal, -1 = both.
|
||||
"""
|
||||
if CUDA_AVAILABLE:
|
||||
try:
|
||||
src_u8 = _ensure_uint8(src)
|
||||
gpu_src = cv2.cuda.GpuMat()
|
||||
gpu_src.upload(src_u8)
|
||||
gpu_dst = cv2.cuda.flip(gpu_src, flip_code)
|
||||
return gpu_dst.download()
|
||||
except cv2.error:
|
||||
pass
|
||||
|
||||
return cv2.flip(src, flip_code)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Convenience: check at runtime whether GPU path is active
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def is_gpu_accelerated() -> bool:
|
||||
"""Return ``True`` when the CUDA path will be used."""
|
||||
return CUDA_AVAILABLE
|
||||
|
||||
# --- END OF FILE gpu_processing.py ---
|
||||
@@ -3,6 +3,7 @@ import opennsfw2
|
||||
from PIL import Image
|
||||
import cv2 # Add OpenCV import
|
||||
import modules.globals # Import globals to access the color correction toggle
|
||||
from modules.gpu_processing import gpu_cvt_color
|
||||
|
||||
from modules.typing import Frame
|
||||
|
||||
@@ -14,7 +15,7 @@ model = None
|
||||
def predict_frame(target_frame: Frame) -> bool:
|
||||
# Convert the frame to RGB before processing if color correction is enabled
|
||||
if modules.globals.color_correction:
|
||||
target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB)
|
||||
target_frame = gpu_cvt_color(target_frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
image = Image.fromarray(target_frame)
|
||||
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
|
||||
|
||||
@@ -1,20 +1,20 @@
|
||||
# --- START OF FILE face_enhancer.py ---
|
||||
# Uses ONNX Runtime for GFPGAN face enhancement (no torch/gfpgan dependency)
|
||||
|
||||
from typing import Any, List
|
||||
import cv2
|
||||
import threading
|
||||
import gfpgan
|
||||
import numpy as np
|
||||
import os
|
||||
import platform
|
||||
import torch # Make sure torch is imported
|
||||
|
||||
import onnxruntime
|
||||
|
||||
import modules.globals
|
||||
import modules.processors.frame.core
|
||||
from modules.core import update_status
|
||||
from modules.face_analyser import get_one_face
|
||||
from modules.face_analyser import get_one_face, get_many_faces
|
||||
from modules.typing import Frame, Face
|
||||
from modules.utilities import (
|
||||
conditional_download,
|
||||
is_image,
|
||||
is_video,
|
||||
)
|
||||
@@ -29,15 +29,29 @@ models_dir = os.path.join(
|
||||
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
|
||||
)
|
||||
|
||||
# Standard FFHQ 5-point face template for 512x512 resolution
|
||||
# Points: left_eye, right_eye, nose, left_mouth, right_mouth
|
||||
FFHQ_TEMPLATE_512 = np.array(
|
||||
[
|
||||
[192.98138, 239.94708],
|
||||
[318.90277, 240.19366],
|
||||
[256.63416, 314.01935],
|
||||
[201.26117, 371.41043],
|
||||
[313.08905, 371.15118],
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = models_dir
|
||||
conditional_download(
|
||||
download_directory_path,
|
||||
[
|
||||
"https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth"
|
||||
],
|
||||
)
|
||||
model_path = os.path.join(models_dir, "gfpgan-1024.onnx")
|
||||
if not os.path.exists(model_path):
|
||||
update_status(
|
||||
f"GFPGAN ONNX model not found at {model_path}. "
|
||||
"Please place gfpgan-1024.onnx in the models folder.",
|
||||
NAME,
|
||||
)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@@ -50,108 +64,257 @@ def pre_start() -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def get_face_enhancer() -> Any:
|
||||
def get_face_enhancer() -> onnxruntime.InferenceSession:
|
||||
"""
|
||||
Initializes and returns the GFPGAN face enhancer instance,
|
||||
prioritizing CUDA, then MPS (Mac), then CPU.
|
||||
Initializes and returns the GFPGAN ONNX Runtime inference session,
|
||||
using the execution providers configured in modules.globals.
|
||||
"""
|
||||
global FACE_ENHANCER
|
||||
|
||||
with THREAD_LOCK:
|
||||
if FACE_ENHANCER is None:
|
||||
model_path = os.path.join(models_dir, "GFPGANv1.4.pth")
|
||||
device = None
|
||||
try:
|
||||
# Priority 1: CUDA
|
||||
if torch.cuda.is_available():
|
||||
device = torch.device("cuda")
|
||||
print(f"{NAME}: Using CUDA device.")
|
||||
# Priority 2: MPS (Mac Silicon)
|
||||
elif platform.system() == "Darwin" and torch.backends.mps.is_available():
|
||||
device = torch.device("mps")
|
||||
print(f"{NAME}: Using MPS device.")
|
||||
# Priority 3: CPU
|
||||
else:
|
||||
device = torch.device("cpu")
|
||||
print(f"{NAME}: Using CPU device.")
|
||||
model_path = os.path.join(models_dir, "gfpgan-1024.onnx")
|
||||
|
||||
FACE_ENHANCER = gfpgan.GFPGANer(
|
||||
model_path=model_path,
|
||||
upscale=1, # upscale=1 means enhancement only, no resizing
|
||||
arch='clean',
|
||||
channel_multiplier=2,
|
||||
bg_upsampler=None,
|
||||
device=device
|
||||
if not os.path.exists(model_path):
|
||||
raise FileNotFoundError(
|
||||
f"{NAME}: Model not found at {model_path}"
|
||||
)
|
||||
print(f"{NAME}: GFPGANer initialized successfully on {device}.")
|
||||
|
||||
try:
|
||||
providers = modules.globals.execution_providers
|
||||
|
||||
session_options = onnxruntime.SessionOptions()
|
||||
session_options.graph_optimization_level = (
|
||||
onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
|
||||
)
|
||||
|
||||
FACE_ENHANCER = onnxruntime.InferenceSession(
|
||||
model_path,
|
||||
sess_options=session_options,
|
||||
providers=providers,
|
||||
)
|
||||
|
||||
input_info = FACE_ENHANCER.get_inputs()[0]
|
||||
output_info = FACE_ENHANCER.get_outputs()[0]
|
||||
active_providers = FACE_ENHANCER.get_providers()
|
||||
print(
|
||||
f"{NAME}: GFPGAN ONNX model loaded successfully."
|
||||
)
|
||||
print(
|
||||
f"{NAME}: Input: {input_info.name}, "
|
||||
f"shape: {input_info.shape}, type: {input_info.type}"
|
||||
)
|
||||
print(
|
||||
f"{NAME}: Output: {output_info.name}, "
|
||||
f"shape: {output_info.shape}, type: {output_info.type}"
|
||||
)
|
||||
print(f"{NAME}: Active providers: {active_providers}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"{NAME}: Error initializing GFPGANer: {e}")
|
||||
# Fallback to CPU if initialization with GPU fails for some reason
|
||||
if device is not None and device.type != 'cpu':
|
||||
print(f"{NAME}: Falling back to CPU due to error.")
|
||||
try:
|
||||
device = torch.device("cpu")
|
||||
FACE_ENHANCER = gfpgan.GFPGANer(
|
||||
model_path=model_path,
|
||||
upscale=1,
|
||||
arch='clean',
|
||||
channel_multiplier=2,
|
||||
bg_upsampler=None,
|
||||
device=device
|
||||
)
|
||||
print(f"{NAME}: GFPGANer initialized successfully on CPU after fallback.")
|
||||
except Exception as fallback_e:
|
||||
print(f"{NAME}: FATAL: Could not initialize GFPGANer even on CPU: {fallback_e}")
|
||||
FACE_ENHANCER = None # Ensure it's None if totally failed
|
||||
else:
|
||||
# If it failed even on the first CPU attempt or device was already CPU
|
||||
print(f"{NAME}: FATAL: Could not initialize GFPGANer on CPU: {e}")
|
||||
FACE_ENHANCER = None # Ensure it's None if totally failed
|
||||
print(f"{NAME}: Error loading GFPGAN ONNX model: {e}")
|
||||
FACE_ENHANCER = None
|
||||
raise RuntimeError(
|
||||
f"{NAME}: Failed to load GFPGAN ONNX model: {e}"
|
||||
)
|
||||
|
||||
|
||||
# Check if enhancer is still None after attempting initialization
|
||||
if FACE_ENHANCER is None:
|
||||
raise RuntimeError(f"{NAME}: Failed to initialize GFPGANer. Check logs for errors.")
|
||||
raise RuntimeError(
|
||||
f"{NAME}: Failed to initialize GFPGAN ONNX session. Check logs."
|
||||
)
|
||||
|
||||
return FACE_ENHANCER
|
||||
|
||||
|
||||
def _align_face(
|
||||
frame: Frame, landmarks_5: np.ndarray, output_size: int
|
||||
) -> tuple:
|
||||
"""
|
||||
Align and crop a face from the frame using 5-point landmarks and the
|
||||
standard FFHQ template.
|
||||
|
||||
Returns:
|
||||
(aligned_face, affine_matrix) or (None, None) on failure.
|
||||
"""
|
||||
# Scale the 512-base template to the desired output size
|
||||
scale = output_size / 512.0
|
||||
template = FFHQ_TEMPLATE_512 * scale
|
||||
|
||||
# Estimate a similarity transform (4 DOF: rotation, scale, tx, ty)
|
||||
affine_matrix, _ = cv2.estimateAffinePartial2D(
|
||||
landmarks_5, template, method=cv2.LMEDS
|
||||
)
|
||||
if affine_matrix is None:
|
||||
return None, None
|
||||
|
||||
# Warp the face to the aligned position
|
||||
aligned_face = cv2.warpAffine(
|
||||
frame,
|
||||
affine_matrix,
|
||||
(output_size, output_size),
|
||||
borderMode=cv2.BORDER_CONSTANT,
|
||||
borderValue=(135, 133, 132),
|
||||
)
|
||||
|
||||
return aligned_face, affine_matrix
|
||||
|
||||
|
||||
def _paste_back(
|
||||
frame: Frame,
|
||||
enhanced_face: np.ndarray,
|
||||
affine_matrix: np.ndarray,
|
||||
output_size: int,
|
||||
) -> Frame:
|
||||
"""
|
||||
Paste an enhanced (aligned) face back onto the original frame using the
|
||||
inverse affine transform with feathered-edge blending.
|
||||
"""
|
||||
h, w = frame.shape[:2]
|
||||
|
||||
# Inverse the affine warp
|
||||
inv_matrix = cv2.invertAffineTransform(affine_matrix)
|
||||
inv_restored = cv2.warpAffine(
|
||||
enhanced_face,
|
||||
inv_matrix,
|
||||
(w, h),
|
||||
borderMode=cv2.BORDER_CONSTANT,
|
||||
borderValue=(0, 0, 0),
|
||||
)
|
||||
|
||||
# Build a soft feathered mask in aligned space for edge blending
|
||||
face_mask = np.ones((output_size, output_size), dtype=np.float32)
|
||||
|
||||
# Feather the border (5 % of the size on each edge)
|
||||
border = max(1, int(output_size * 0.05))
|
||||
ramp_up = np.linspace(0.0, 1.0, border, dtype=np.float32)
|
||||
ramp_down = np.linspace(1.0, 0.0, border, dtype=np.float32)
|
||||
|
||||
# Top / bottom rows
|
||||
face_mask[:border, :] *= ramp_up[:, None]
|
||||
face_mask[-border:, :] *= ramp_down[:, None]
|
||||
# Left / right columns
|
||||
face_mask[:, :border] *= ramp_up[None, :]
|
||||
face_mask[:, -border:] *= ramp_down[None, :]
|
||||
|
||||
# Expand to 3-channel
|
||||
face_mask_3c = np.stack([face_mask] * 3, axis=-1)
|
||||
|
||||
# Warp mask back to original frame space
|
||||
inv_mask = cv2.warpAffine(
|
||||
face_mask_3c,
|
||||
inv_matrix,
|
||||
(w, h),
|
||||
borderMode=cv2.BORDER_CONSTANT,
|
||||
borderValue=(0, 0, 0),
|
||||
)
|
||||
inv_mask = np.clip(inv_mask, 0.0, 1.0)
|
||||
|
||||
# Alpha-blend
|
||||
result = (
|
||||
frame.astype(np.float32) * (1.0 - inv_mask)
|
||||
+ inv_restored.astype(np.float32) * inv_mask
|
||||
)
|
||||
return np.clip(result, 0, 255).astype(np.uint8)
|
||||
|
||||
|
||||
def _preprocess_face(aligned_face: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Convert an aligned BGR uint8 face image to the ONNX model input tensor.
|
||||
Format: NCHW float32, normalised to [-1, 1].
|
||||
"""
|
||||
# BGR -> RGB
|
||||
rgb = cv2.cvtColor(aligned_face, cv2.COLOR_BGR2RGB).astype(np.float32)
|
||||
# [0, 255] -> [0, 1] -> [-1, 1]
|
||||
rgb = rgb / 255.0
|
||||
rgb = (rgb - 0.5) / 0.5
|
||||
# HWC -> CHW, add batch dim
|
||||
chw = np.transpose(rgb, (2, 0, 1))
|
||||
return np.expand_dims(chw, axis=0) # shape: (1, 3, H, W)
|
||||
|
||||
|
||||
def _postprocess_face(output: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Convert the ONNX model output tensor back to a BGR uint8 image.
|
||||
Expects input in NCHW format with values in [-1, 1].
|
||||
"""
|
||||
face = np.squeeze(output) # remove batch dim -> (3, H, W)
|
||||
face = np.transpose(face, (1, 2, 0)) # CHW -> HWC
|
||||
# [-1, 1] -> [0, 1] -> [0, 255]
|
||||
face = (face + 1.0) / 2.0
|
||||
face = np.clip(face * 255.0, 0, 255).astype(np.uint8)
|
||||
# RGB -> BGR
|
||||
return cv2.cvtColor(face, cv2.COLOR_RGB2BGR)
|
||||
|
||||
|
||||
def enhance_face(temp_frame: Frame) -> Frame:
|
||||
"""Enhances faces in a single frame using the global GFPGANer instance."""
|
||||
# Ensure enhancer is ready
|
||||
enhancer = get_face_enhancer()
|
||||
"""Enhances all faces in a frame using the GFPGAN ONNX model."""
|
||||
session = get_face_enhancer()
|
||||
|
||||
# Determine model input resolution from the session metadata
|
||||
input_info = session.get_inputs()[0]
|
||||
input_name = input_info.name
|
||||
input_shape = input_info.shape # e.g. [1, 3, 512, 512]
|
||||
# Safely extract input size (handle dynamic / symbolic dimensions)
|
||||
try:
|
||||
with THREAD_SEMAPHORE:
|
||||
# The enhance method returns: _, restored_faces, restored_img
|
||||
_, _, restored_img = enhancer.enhance(
|
||||
temp_frame,
|
||||
has_aligned=False, # Assume faces are not pre-aligned
|
||||
only_center_face=False, # Enhance all detected faces
|
||||
paste_back=True # Paste enhanced faces back onto the original image
|
||||
)
|
||||
# GFPGAN might return None if no face is detected or an error occurs
|
||||
if restored_img is None:
|
||||
# print(f"{NAME}: Warning: GFPGAN enhancement returned None. Returning original frame.")
|
||||
return temp_frame
|
||||
return restored_img
|
||||
except Exception as e:
|
||||
print(f"{NAME}: Error during face enhancement: {e}")
|
||||
# Return the original frame in case of error during enhancement
|
||||
align_size = int(input_shape[2])
|
||||
if align_size <= 0:
|
||||
align_size = 512
|
||||
except (ValueError, TypeError, IndexError):
|
||||
align_size = 512
|
||||
|
||||
# Detect faces using InsightFace (already a project dependency)
|
||||
faces = get_many_faces(temp_frame)
|
||||
if not faces:
|
||||
return temp_frame
|
||||
|
||||
result_frame = temp_frame.copy()
|
||||
|
||||
for face in faces:
|
||||
# Need the 5-point key-points for alignment
|
||||
if not hasattr(face, "kps") or face.kps is None:
|
||||
continue
|
||||
|
||||
landmarks_5 = face.kps.astype(np.float32)
|
||||
if landmarks_5.shape[0] < 5:
|
||||
continue
|
||||
|
||||
# Align / crop the face at the model's INPUT resolution
|
||||
aligned_face, affine_matrix = _align_face(
|
||||
temp_frame, landmarks_5, output_size=align_size
|
||||
)
|
||||
if aligned_face is None or affine_matrix is None:
|
||||
continue
|
||||
|
||||
try:
|
||||
with THREAD_SEMAPHORE:
|
||||
input_tensor = _preprocess_face(aligned_face)
|
||||
output_tensor = session.run(None, {input_name: input_tensor})[0]
|
||||
enhanced_bgr = _postprocess_face(output_tensor)
|
||||
|
||||
# The model may output at a different resolution than its input
|
||||
# (e.g. input 512x512 → output 1024x1024). Resize the enhanced
|
||||
# face back to the alignment size so the inverse affine maps
|
||||
# correctly.
|
||||
eh, ew = enhanced_bgr.shape[:2]
|
||||
if eh != align_size or ew != align_size:
|
||||
enhanced_bgr = cv2.resize(
|
||||
enhanced_bgr,
|
||||
(align_size, align_size),
|
||||
interpolation=cv2.INTER_LANCZOS4,
|
||||
)
|
||||
|
||||
# Paste enhanced face back onto the frame
|
||||
result_frame = _paste_back(
|
||||
result_frame, enhanced_bgr, affine_matrix, output_size=align_size
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"{NAME}: Error enhancing a face: {e}")
|
||||
continue
|
||||
|
||||
return result_frame
|
||||
|
||||
|
||||
def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
|
||||
"""Processes a frame: enhances face if detected."""
|
||||
# We don't strictly need source_face for enhancement only
|
||||
# Check if any face exists to potentially save processing time, though GFPGAN also does detection.
|
||||
# For simplicity and ensuring enhancement is attempted if possible, we can rely on enhance_face.
|
||||
# target_face = get_one_face(temp_frame) # This gets only ONE face
|
||||
# If you want to enhance ONLY if a face is detected by your *own* analyser first:
|
||||
# has_face = get_one_face(temp_frame) is not None # Or use get_many_faces
|
||||
# if has_face:
|
||||
# temp_frame = enhance_face(temp_frame)
|
||||
# else: # Enhance regardless, let GFPGAN handle detection
|
||||
temp_frame = enhance_face(temp_frame)
|
||||
return temp_frame
|
||||
|
||||
@@ -162,14 +325,18 @@ def process_frames(
|
||||
"""Processes multiple frames from file paths."""
|
||||
for temp_frame_path in temp_frame_paths:
|
||||
if not os.path.exists(temp_frame_path):
|
||||
print(f"{NAME}: Warning: Frame path not found {temp_frame_path}, skipping.")
|
||||
print(
|
||||
f"{NAME}: Warning: Frame path not found {temp_frame_path}, skipping."
|
||||
)
|
||||
if progress:
|
||||
progress.update(1)
|
||||
continue
|
||||
|
||||
temp_frame = cv2.imread(temp_frame_path)
|
||||
if temp_frame is None:
|
||||
print(f"{NAME}: Warning: Failed to read frame {temp_frame_path}, skipping.")
|
||||
print(
|
||||
f"{NAME}: Warning: Failed to read frame {temp_frame_path}, skipping."
|
||||
)
|
||||
if progress:
|
||||
progress.update(1)
|
||||
continue
|
||||
@@ -180,7 +347,9 @@ def process_frames(
|
||||
progress.update(1)
|
||||
|
||||
|
||||
def process_image(source_path: str | None, target_path: str, output_path: str) -> None:
|
||||
def process_image(
|
||||
source_path: str | None, target_path: str, output_path: str
|
||||
) -> None:
|
||||
"""Processes a single image file."""
|
||||
target_frame = cv2.imread(target_path)
|
||||
if target_frame is None:
|
||||
@@ -191,16 +360,13 @@ def process_image(source_path: str | None, target_path: str, output_path: str) -
|
||||
print(f"{NAME}: Enhanced image saved to {output_path}")
|
||||
|
||||
|
||||
def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None:
|
||||
def process_video(
|
||||
source_path: str | None, temp_frame_paths: List[str]
|
||||
) -> None:
|
||||
"""Processes video frames using the frame processor core."""
|
||||
# source_path might be optional depending on how process_video is called
|
||||
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
|
||||
modules.processors.frame.core.process_video(
|
||||
source_path, temp_frame_paths, process_frames
|
||||
)
|
||||
|
||||
# Optional: Keep process_frame_v2 if it's used elsewhere, otherwise it's redundant
|
||||
# def process_frame_v2(temp_frame: Frame) -> Frame:
|
||||
# target_face = get_one_face(temp_frame)
|
||||
# if target_face:
|
||||
# temp_frame = enhance_face(temp_frame)
|
||||
# return temp_frame
|
||||
|
||||
# --- END OF FILE face_enhancer.py ---
|
||||
# --- END OF FILE face_enhancer.py ---
|
||||
|
||||
@@ -2,6 +2,7 @@ import cv2
|
||||
import numpy as np
|
||||
from modules.typing import Face, Frame
|
||||
import modules.globals
|
||||
from modules.gpu_processing import gpu_gaussian_blur, gpu_resize, gpu_cvt_color
|
||||
|
||||
def apply_color_transfer(source, target):
|
||||
"""
|
||||
@@ -61,8 +62,8 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
|
||||
# Fill the padded convex hull
|
||||
cv2.fillConvexPoly(mask, hull_padded, 255)
|
||||
|
||||
# Smooth the mask edges
|
||||
mask = cv2.GaussianBlur(mask, (5, 5), 3)
|
||||
# Smooth the mask edges (GPU-accelerated when available)
|
||||
mask = gpu_gaussian_blur(mask, (5, 5), 3)
|
||||
|
||||
return mask
|
||||
|
||||
@@ -123,8 +124,8 @@ def create_lower_mouth_mask(
|
||||
polygon_relative_to_roi = expanded_landmarks - [min_x, min_y]
|
||||
cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)
|
||||
|
||||
# Apply Gaussian blur to soften the mask edges
|
||||
mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5)
|
||||
# Apply Gaussian blur to soften the mask edges (GPU-accelerated when available)
|
||||
mask_roi = gpu_gaussian_blur(mask_roi, (15, 15), 5)
|
||||
|
||||
# Place the mask ROI in the full-sized mask
|
||||
mask[min_y:max_y, min_x:max_x] = mask_roi
|
||||
@@ -192,8 +193,8 @@ def create_eyes_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, tuple
|
||||
cv2.ellipse(mask_roi, left_center, left_axes, 0, 0, 360, 255, -1)
|
||||
cv2.ellipse(mask_roi, right_center, right_axes, 0, 0, 360, 255, -1)
|
||||
|
||||
# Apply Gaussian blur to soften mask edges
|
||||
mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5)
|
||||
# Apply Gaussian blur to soften mask edges (GPU-accelerated when available)
|
||||
mask_roi = gpu_gaussian_blur(mask_roi, (15, 15), 5)
|
||||
|
||||
# Place the mask ROI in the full-sized mask
|
||||
mask[min_y:max_y, min_x:max_x] = mask_roi
|
||||
@@ -374,15 +375,15 @@ def create_eyebrows_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, t
|
||||
left_shape = create_curved_eyebrow(left_local)
|
||||
right_shape = create_curved_eyebrow(right_local)
|
||||
|
||||
# Apply multi-stage blurring for natural feathering
|
||||
# Apply multi-stage blurring for natural feathering (GPU-accelerated when available)
|
||||
# First, strong Gaussian blur for initial softening
|
||||
mask_roi = cv2.GaussianBlur(mask_roi, (21, 21), 7)
|
||||
mask_roi = gpu_gaussian_blur(mask_roi, (21, 21), 7)
|
||||
|
||||
# Second, medium blur for transition areas
|
||||
mask_roi = cv2.GaussianBlur(mask_roi, (11, 11), 3)
|
||||
mask_roi = gpu_gaussian_blur(mask_roi, (11, 11), 3)
|
||||
|
||||
# Finally, light blur for fine details
|
||||
mask_roi = cv2.GaussianBlur(mask_roi, (5, 5), 1)
|
||||
mask_roi = gpu_gaussian_blur(mask_roi, (5, 5), 1)
|
||||
|
||||
# Normalize mask values
|
||||
mask_roi = cv2.normalize(mask_roi, None, 0, 255, cv2.NORM_MINMAX)
|
||||
@@ -405,7 +406,7 @@ def create_eyebrows_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, t
|
||||
right_local = right_eyebrow - [min_x, min_y]
|
||||
cv2.fillPoly(mask_roi, [left_local.astype(np.int32)], 255)
|
||||
cv2.fillPoly(mask_roi, [right_local.astype(np.int32)], 255)
|
||||
mask_roi = cv2.GaussianBlur(mask_roi, (21, 21), 7)
|
||||
mask_roi = gpu_gaussian_blur(mask_roi, (21, 21), 7)
|
||||
mask[min_y:max_y, min_x:max_x] = mask_roi
|
||||
eyebrows_cutout = frame[min_y:max_y, min_x:max_x].copy()
|
||||
eyebrows_polygon = np.vstack([left_eyebrow, right_eyebrow]).astype(np.int32)
|
||||
@@ -433,11 +434,11 @@ def apply_mask_area(
|
||||
return frame
|
||||
|
||||
try:
|
||||
resized_cutout = cv2.resize(cutout, (box_width, box_height))
|
||||
resized_cutout = gpu_resize(cutout, (box_width, box_height))
|
||||
roi = frame[min_y:max_y, min_x:max_x]
|
||||
|
||||
if roi.shape != resized_cutout.shape:
|
||||
resized_cutout = cv2.resize(
|
||||
resized_cutout = gpu_resize(
|
||||
resized_cutout, (roi.shape[1], roi.shape[0])
|
||||
)
|
||||
|
||||
@@ -457,8 +458,8 @@ def apply_mask_area(
|
||||
adjusted_polygon = polygon - [min_x, min_y]
|
||||
cv2.fillPoly(polygon_mask, [adjusted_polygon], 255)
|
||||
|
||||
# Apply strong initial feathering
|
||||
polygon_mask = cv2.GaussianBlur(polygon_mask, (21, 21), 7)
|
||||
# Apply strong initial feathering (GPU-accelerated when available)
|
||||
polygon_mask = gpu_gaussian_blur(polygon_mask, (21, 21), 7)
|
||||
|
||||
# Apply additional feathering
|
||||
feather_amount = min(
|
||||
|
||||
@@ -15,6 +15,7 @@ from modules.utilities import (
|
||||
is_video,
|
||||
)
|
||||
from modules.cluster_analysis import find_closest_centroid
|
||||
from modules.gpu_processing import gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted, gpu_resize, gpu_cvt_color
|
||||
import os
|
||||
from collections import deque
|
||||
import time
|
||||
@@ -43,11 +44,21 @@ models_dir = os.path.join(
|
||||
)
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = abs_dir
|
||||
# Use models_dir instead of abs_dir to save to the correct location
|
||||
download_directory_path = models_dir
|
||||
|
||||
# Make sure the models directory exists, catch permission errors if they occur
|
||||
try:
|
||||
os.makedirs(download_directory_path, exist_ok=True)
|
||||
except OSError as e:
|
||||
logging.error(f"Failed to create directory {download_directory_path} due to permission error: {e}")
|
||||
return False
|
||||
|
||||
# Use the direct download URL from Hugging Face
|
||||
conditional_download(
|
||||
download_directory_path,
|
||||
[
|
||||
"https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx"
|
||||
"https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx"
|
||||
],
|
||||
)
|
||||
return True
|
||||
@@ -158,7 +169,7 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
|
||||
# print(f"Warning: Swapped frame shape {swapped_frame_raw.shape} differs from input {temp_frame.shape}.") # Debug
|
||||
# Attempt resize (might distort if aspect ratio changed, but better than crashing)
|
||||
try:
|
||||
swapped_frame_raw = cv2.resize(swapped_frame_raw, (temp_frame.shape[1], temp_frame.shape[0]))
|
||||
swapped_frame_raw = gpu_resize(swapped_frame_raw, (temp_frame.shape[1], temp_frame.shape[0]))
|
||||
except Exception as resize_e:
|
||||
# print(f"Error resizing swapped frame: {resize_e}") # Debug
|
||||
return original_frame
|
||||
@@ -236,7 +247,7 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
|
||||
|
||||
# Blend the original_frame with the (potentially mouth-masked) swapped_frame
|
||||
# Ensure both frames are uint8 before blending
|
||||
final_swapped_frame = cv2.addWeighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0)
|
||||
final_swapped_frame = gpu_add_weighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0)
|
||||
|
||||
# Ensure final frame is uint8 after blending (addWeighted should preserve it, but belt-and-suspenders)
|
||||
final_swapped_frame = final_swapped_frame.astype(np.uint8)
|
||||
@@ -312,17 +323,10 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda
|
||||
face_region = processed_frame[y1:y2, x1:x2]
|
||||
if face_region.size == 0: continue
|
||||
|
||||
# Apply sharpening with optimized parameters for Apple Silicon
|
||||
# Apply sharpening (GPU-accelerated when CUDA OpenCV is available)
|
||||
try:
|
||||
# Use smaller sigma for faster processing on Apple Silicon
|
||||
sigma = 2 if IS_APPLE_SILICON else 3
|
||||
blurred = cv2.GaussianBlur(face_region, (0, 0), sigma)
|
||||
sharpened_region = cv2.addWeighted(
|
||||
face_region, 1.0 + sharpness_value,
|
||||
blurred, -sharpness_value,
|
||||
0
|
||||
)
|
||||
sharpened_region = np.clip(sharpened_region, 0, 255).astype(np.uint8)
|
||||
sharpened_region = gpu_sharpen(face_region, strength=sharpness_value, sigma=sigma)
|
||||
processed_frame[y1:y2, x1:x2] = sharpened_region
|
||||
except cv2.error:
|
||||
pass
|
||||
@@ -338,7 +342,7 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda
|
||||
if PREVIOUS_FRAME_RESULT is not None and PREVIOUS_FRAME_RESULT.shape == processed_frame.shape and PREVIOUS_FRAME_RESULT.dtype == processed_frame.dtype:
|
||||
# Perform interpolation
|
||||
try:
|
||||
final_frame = cv2.addWeighted(
|
||||
final_frame = gpu_add_weighted(
|
||||
PREVIOUS_FRAME_RESULT, 1.0 - interpolation_weight,
|
||||
processed_frame, interpolation_weight,
|
||||
0
|
||||
@@ -813,10 +817,10 @@ def create_lower_mouth_mask(
|
||||
# Draw polygon on the ROI mask
|
||||
cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)
|
||||
|
||||
# Apply Gaussian blur (ensure kernel size is odd and positive)
|
||||
# Apply Gaussian blur (GPU-accelerated when available)
|
||||
blur_k_size = getattr(modules.globals, "mask_blur_kernel", 15) # Default 15
|
||||
blur_k_size = max(1, blur_k_size // 2 * 2 + 1) # Ensure odd
|
||||
mask_roi = cv2.GaussianBlur(mask_roi, (blur_k_size, blur_k_size), 0) # Sigma=0 calculates from kernel
|
||||
mask_roi = gpu_gaussian_blur(mask_roi, (blur_k_size, blur_k_size), 0)
|
||||
|
||||
# Place the mask ROI in the full-sized mask
|
||||
mask[min_y:max_y, min_x:max_x] = mask_roi
|
||||
@@ -952,7 +956,7 @@ def apply_mouth_area(
|
||||
if roi.shape[:2] != mouth_cutout.shape[:2]:
|
||||
# Check if mouth_cutout has valid dimensions before resizing
|
||||
if mouth_cutout.shape[0] > 0 and mouth_cutout.shape[1] > 0:
|
||||
resized_mouth_cutout = cv2.resize(mouth_cutout, (box_width, box_height), interpolation=cv2.INTER_LINEAR)
|
||||
resized_mouth_cutout = gpu_resize(mouth_cutout, (box_width, box_height), interpolation=cv2.INTER_LINEAR)
|
||||
else:
|
||||
# print("Warning: mouth_cutout has invalid dimensions, cannot resize.")
|
||||
return frame # Cannot proceed without valid cutout
|
||||
@@ -1125,14 +1129,10 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
|
||||
return mask # Return empty mask on error
|
||||
|
||||
|
||||
# Apply Gaussian blur to feather the mask edges
|
||||
# Kernel size should be reasonably large, odd, and positive
|
||||
# Apply Gaussian blur to feather the mask edges (GPU-accelerated when available)
|
||||
blur_k_size = getattr(modules.globals, "face_mask_blur", 31) # Default 31
|
||||
blur_k_size = max(1, blur_k_size // 2 * 2 + 1) # Ensure odd and positive
|
||||
|
||||
# Use sigma=0 to let OpenCV calculate from kernel size
|
||||
# Apply blur to the uint8 mask directly
|
||||
mask = cv2.GaussianBlur(mask, (blur_k_size, blur_k_size), 0)
|
||||
mask = gpu_gaussian_blur(mask, (blur_k_size, blur_k_size), 0)
|
||||
|
||||
# --- Optional: Return float mask for apply_mouth_area ---
|
||||
# mask = mask.astype(float) / 255.0
|
||||
|
||||
+151
-33
@@ -4,13 +4,18 @@ import customtkinter as ctk
|
||||
from typing import Callable, Tuple
|
||||
import cv2
|
||||
from cv2_enumerate_cameras import enumerate_cameras # Add this import
|
||||
from modules.gpu_processing import gpu_cvt_color, gpu_resize, gpu_flip
|
||||
from PIL import Image, ImageOps
|
||||
import time
|
||||
import json
|
||||
import queue
|
||||
import threading
|
||||
import numpy as np
|
||||
import modules.globals
|
||||
import modules.metadata
|
||||
from modules.face_analyser import (
|
||||
get_one_face,
|
||||
get_many_faces,
|
||||
get_unique_faces_from_target_image,
|
||||
get_unique_faces_from_target_video,
|
||||
add_blank_map,
|
||||
@@ -542,7 +547,7 @@ def create_source_target_popup(
|
||||
)
|
||||
x_label.grid(row=id, column=2, padx=10, pady=10)
|
||||
|
||||
image = Image.fromarray(cv2.cvtColor(item["target"]["cv2"], cv2.COLOR_BGR2RGB))
|
||||
image = Image.fromarray(gpu_cvt_color(item["target"]["cv2"], cv2.COLOR_BGR2RGB))
|
||||
image = image.resize(
|
||||
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
|
||||
)
|
||||
@@ -597,7 +602,7 @@ def update_popup_source(
|
||||
}
|
||||
|
||||
image = Image.fromarray(
|
||||
cv2.cvtColor(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
gpu_cvt_color(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
)
|
||||
image = image.resize(
|
||||
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
|
||||
@@ -790,7 +795,7 @@ def fit_image_to_size(image, width: int, height: int):
|
||||
ratio_w = width / w
|
||||
ratio = max(ratio_w, ratio_h)
|
||||
new_size = (int(ratio * w), int(ratio * h))
|
||||
return cv2.resize(image, dsize=new_size)
|
||||
return gpu_resize(image, dsize=new_size)
|
||||
|
||||
|
||||
def render_image_preview(image_path: str, size: Tuple[int, int]) -> ctk.CTkImage:
|
||||
@@ -808,7 +813,7 @@ def render_video_preview(
|
||||
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
|
||||
has_frame, frame = capture.read()
|
||||
if has_frame:
|
||||
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
||||
image = Image.fromarray(gpu_cvt_color(frame, cv2.COLOR_BGR2RGB))
|
||||
if size:
|
||||
image = ImageOps.fit(image, size, Image.LANCZOS)
|
||||
return ctk.CTkImage(image, size=image.size)
|
||||
@@ -846,7 +851,7 @@ def update_preview(frame_number: int = 0) -> None:
|
||||
temp_frame = frame_processor.process_frame(
|
||||
get_one_face(cv2.imread(modules.globals.source_path)), temp_frame
|
||||
)
|
||||
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
|
||||
image = Image.fromarray(gpu_cvt_color(temp_frame, cv2.COLOR_BGR2RGB))
|
||||
image = ImageOps.contain(
|
||||
image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS
|
||||
)
|
||||
@@ -947,52 +952,97 @@ def get_available_cameras():
|
||||
return camera_indices, camera_names
|
||||
|
||||
|
||||
def create_webcam_preview(camera_index: int):
|
||||
global preview_label, PREVIEW
|
||||
def _capture_thread_func(cap, capture_queue, stop_event):
|
||||
"""Capture thread: reads frames from camera and puts them into the queue.
|
||||
Drops frames when the queue is full to avoid backpressure on the camera."""
|
||||
while not stop_event.is_set():
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
stop_event.set()
|
||||
break
|
||||
try:
|
||||
capture_queue.put_nowait(frame)
|
||||
except queue.Full:
|
||||
# Drop the oldest frame and enqueue the new one
|
||||
try:
|
||||
capture_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
pass
|
||||
try:
|
||||
capture_queue.put_nowait(frame)
|
||||
except queue.Full:
|
||||
pass
|
||||
|
||||
cap = VideoCapturer(camera_index)
|
||||
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
|
||||
update_status("Failed to start camera")
|
||||
return
|
||||
|
||||
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
|
||||
PREVIEW.deiconify()
|
||||
# How often to run full face detection. On intermediate frames the last
|
||||
# detected face positions are reused, which significantly reduces the
|
||||
# per-frame cost of the processing thread.
|
||||
DETECT_EVERY_N = 2
|
||||
|
||||
|
||||
def _processing_thread_func(capture_queue, processed_queue, stop_event):
|
||||
"""Processing thread: takes raw frames from capture_queue, applies face
|
||||
processing, and puts results into processed_queue. Drops processed frames
|
||||
when the output queue is full so the UI always gets the latest result.
|
||||
|
||||
Uses DETECT_EVERY_N to skip expensive face detection on intermediate
|
||||
frames, reusing cached face positions instead."""
|
||||
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
|
||||
source_image = None
|
||||
prev_time = time.time()
|
||||
fps_update_interval = 0.5
|
||||
frame_count = 0
|
||||
fps = 0
|
||||
proc_frame_index = 0
|
||||
cached_target_face = None # cached single-face result
|
||||
cached_many_faces = None # cached many-faces result
|
||||
|
||||
while True:
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
frame = capture_queue.get(timeout=0.05)
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
temp_frame = frame.copy()
|
||||
run_detection = (proc_frame_index % DETECT_EVERY_N == 0)
|
||||
proc_frame_index += 1
|
||||
|
||||
if modules.globals.live_mirror:
|
||||
temp_frame = cv2.flip(temp_frame, 1)
|
||||
|
||||
if modules.globals.live_resizable:
|
||||
temp_frame = fit_image_to_size(
|
||||
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
|
||||
)
|
||||
|
||||
else:
|
||||
temp_frame = fit_image_to_size(
|
||||
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
|
||||
)
|
||||
temp_frame = gpu_flip(temp_frame, 1)
|
||||
|
||||
if not modules.globals.map_faces:
|
||||
if source_image is None and modules.globals.source_path:
|
||||
source_image = get_one_face(cv2.imread(modules.globals.source_path))
|
||||
|
||||
# Update face detection cache on detection frames
|
||||
if run_detection or (cached_target_face is None and cached_many_faces is None):
|
||||
if modules.globals.many_faces:
|
||||
cached_many_faces = get_many_faces(temp_frame)
|
||||
cached_target_face = None
|
||||
else:
|
||||
cached_target_face = get_one_face(temp_frame)
|
||||
cached_many_faces = None
|
||||
|
||||
for frame_processor in frame_processors:
|
||||
if frame_processor.NAME == "DLC.FACE-ENHANCER":
|
||||
if modules.globals.fp_ui["face_enhancer"]:
|
||||
temp_frame = frame_processor.process_frame(None, temp_frame)
|
||||
elif frame_processor.NAME == "DLC.FACE-SWAPPER":
|
||||
# Use cached face positions to skip redundant detection
|
||||
swapped_bboxes = []
|
||||
if modules.globals.many_faces and cached_many_faces:
|
||||
result = temp_frame.copy()
|
||||
for t_face in cached_many_faces:
|
||||
result = frame_processor.swap_face(source_image, t_face, result)
|
||||
if hasattr(t_face, 'bbox') and t_face.bbox is not None:
|
||||
swapped_bboxes.append(t_face.bbox.astype(int))
|
||||
temp_frame = result
|
||||
elif cached_target_face is not None:
|
||||
temp_frame = frame_processor.swap_face(source_image, cached_target_face, temp_frame)
|
||||
if hasattr(cached_target_face, 'bbox') and cached_target_face.bbox is not None:
|
||||
swapped_bboxes.append(cached_target_face.bbox.astype(int))
|
||||
# Apply post-processing (sharpening, interpolation)
|
||||
temp_frame = frame_processor.apply_post_processing(temp_frame, swapped_bboxes)
|
||||
else:
|
||||
temp_frame = frame_processor.process_frame(source_image, temp_frame)
|
||||
else:
|
||||
@@ -1023,7 +1073,71 @@ def create_webcam_preview(camera_index: int):
|
||||
2,
|
||||
)
|
||||
|
||||
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
|
||||
# Put processed frame into output queue, dropping old frames if full
|
||||
try:
|
||||
processed_queue.put_nowait(temp_frame)
|
||||
except queue.Full:
|
||||
try:
|
||||
processed_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
pass
|
||||
try:
|
||||
processed_queue.put_nowait(temp_frame)
|
||||
except queue.Full:
|
||||
pass
|
||||
|
||||
|
||||
def create_webcam_preview(camera_index: int):
|
||||
global preview_label, PREVIEW
|
||||
|
||||
cap = VideoCapturer(camera_index)
|
||||
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
|
||||
update_status("Failed to start camera")
|
||||
return
|
||||
|
||||
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
|
||||
PREVIEW.deiconify()
|
||||
|
||||
# Queues for decoupling capture from processing and processing from display.
|
||||
# Small maxsize ensures we always work on recent frames and drop stale ones.
|
||||
capture_queue = queue.Queue(maxsize=2)
|
||||
processed_queue = queue.Queue(maxsize=2)
|
||||
stop_event = threading.Event()
|
||||
|
||||
# Start capture thread
|
||||
cap_thread = threading.Thread(
|
||||
target=_capture_thread_func,
|
||||
args=(cap, capture_queue, stop_event),
|
||||
daemon=True,
|
||||
)
|
||||
cap_thread.start()
|
||||
|
||||
# Start processing thread
|
||||
proc_thread = threading.Thread(
|
||||
target=_processing_thread_func,
|
||||
args=(capture_queue, processed_queue, stop_event),
|
||||
daemon=True,
|
||||
)
|
||||
proc_thread.start()
|
||||
|
||||
# Main (UI) thread: pull processed frames and update the display
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
temp_frame = processed_queue.get(timeout=0.03)
|
||||
except queue.Empty:
|
||||
ROOT.update()
|
||||
continue
|
||||
|
||||
if modules.globals.live_resizable:
|
||||
temp_frame = fit_image_to_size(
|
||||
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
|
||||
)
|
||||
else:
|
||||
temp_frame = fit_image_to_size(
|
||||
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
|
||||
)
|
||||
|
||||
image = gpu_cvt_color(temp_frame, cv2.COLOR_BGR2RGB)
|
||||
image = Image.fromarray(image)
|
||||
image = ImageOps.contain(
|
||||
image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
|
||||
@@ -1035,6 +1149,10 @@ def create_webcam_preview(camera_index: int):
|
||||
if PREVIEW.state() == "withdrawn":
|
||||
break
|
||||
|
||||
# Signal threads to stop and wait for them
|
||||
stop_event.set()
|
||||
cap_thread.join(timeout=2.0)
|
||||
proc_thread.join(timeout=2.0)
|
||||
cap.release()
|
||||
PREVIEW.withdraw()
|
||||
|
||||
@@ -1146,7 +1264,7 @@ def refresh_data(map: list):
|
||||
|
||||
if "source" in item:
|
||||
image = Image.fromarray(
|
||||
cv2.cvtColor(item["source"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
gpu_cvt_color(item["source"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
)
|
||||
image = image.resize(
|
||||
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
|
||||
@@ -1164,7 +1282,7 @@ def refresh_data(map: list):
|
||||
|
||||
if "target" in item:
|
||||
image = Image.fromarray(
|
||||
cv2.cvtColor(item["target"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
gpu_cvt_color(item["target"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
)
|
||||
image = image.resize(
|
||||
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
|
||||
@@ -1212,7 +1330,7 @@ def update_webcam_source(
|
||||
}
|
||||
|
||||
image = Image.fromarray(
|
||||
cv2.cvtColor(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
gpu_cvt_color(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
)
|
||||
image = image.resize(
|
||||
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
|
||||
@@ -1264,7 +1382,7 @@ def update_webcam_target(
|
||||
}
|
||||
|
||||
image = Image.fromarray(
|
||||
cv2.cvtColor(map[button_num]["target"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
gpu_cvt_color(map[button_num]["target"]["cv2"], cv2.COLOR_BGR2RGB)
|
||||
)
|
||||
image = image.resize(
|
||||
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
|
||||
|
||||
+3
-11
@@ -1,5 +1,3 @@
|
||||
--extra-index-url https://download.pytorch.org/whl/cu128
|
||||
|
||||
numpy>=1.23.5,<2
|
||||
typing-extensions>=4.8.0
|
||||
opencv-python==4.10.0.84
|
||||
@@ -9,16 +7,10 @@ insightface==0.7.3
|
||||
psutil==5.9.8
|
||||
tk==0.1.0
|
||||
customtkinter==5.2.2
|
||||
pillow==11.1.0
|
||||
torch; sys_platform != 'darwin'
|
||||
torch==2.8.0+cu128; sys_platform == 'darwin'
|
||||
torchvision; sys_platform != 'darwin'
|
||||
torchvision==0.20.1; sys_platform == 'darwin'
|
||||
pillow==12.1.1
|
||||
onnxruntime-silicon==1.16.3; sys_platform == 'darwin' and platform_machine == 'arm64'
|
||||
onnxruntime-gpu==1.22.0; sys_platform != 'darwin'
|
||||
onnxruntime-gpu==1.24.2; sys_platform != 'darwin'
|
||||
tensorflow; sys_platform != 'darwin'
|
||||
opennsfw2==0.10.2
|
||||
protobuf==4.25.1
|
||||
git+https://github.com/xinntao/BasicSR.git@master
|
||||
git+https://github.com/TencentARC/GFPGAN.git@master
|
||||
protobuf==5.29.6
|
||||
pygrabber
|
||||
|
||||
Reference in New Issue
Block a user