Compare commits

..

34 Commits

Author SHA1 Message Date
Kenneth Estanislao 0d8f3b1f82 Fix on vulnerability report
https://github.com/hacksider/Deep-Live-Cam/issues/1695
2026-03-06 23:26:48 +08:00
KRSHH 6e9e7addf2 Update press section with recent media mentions 2026-03-03 21:16:56 +05:30
Kenneth Estanislao 0c7e871bfc Merge pull request #1689 from laurigates/pr/base-ui-tooltips
feat(ui): add hover tooltips to all controls
2026-02-28 02:41:07 +08:00
Lauri Gates e340b0da8a feat(ui): add hover tooltips to all controls
Add ToolTip class (modules/ui_tooltip.py) and wire descriptive hover
tooltips onto every button, switch, slider, and dropdown in the main
window. Tooltips appear after a 500ms hover delay and are clamped to
screen bounds.

This requires no new dependencies — ToolTip uses only customtkinter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 21:41:24 +02:00
Kenneth Estanislao d0f81ed755 Merge pull request #1671 from laurigates/pr/fix-macos-camera-enum
fix(macos): replace cv2_enumerate_cameras with safe bounded loop
2026-02-24 14:29:00 +08:00
Kenneth Estanislao de01b28802 Merge pull request #1678 from laurigates/pr/perf-opacity-handling
perf(face-swapper): optimize opacity handling and frame copies
2026-02-24 14:28:17 +08:00
Lauri Gates b645d5e60b fix(macos): replace cv2_enumerate_cameras with safe bounded loop
cv2_enumerate_cameras(CAP_AVFOUNDATION) probes indices 0-99 through
OpenCV's AVFoundation backend, which intermittently segfaults (exit
code 139) when invalid device indices are probed. Replace with a
bounded cv2.VideoCapture loop (range(10)) that safely skips
unavailable indices.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 17:22:35 +02:00
Kenneth Estanislao 31b3a97003 Merge pull request #1680 from laurigates/pr/perf-float32-buffer-reuse
perf(processing): optimize post-processing with float32 and buffer reuse
2026-02-23 15:13:03 +08:00
Kenneth Estanislao e3b46e83b7 Merge pull request #1669 from laurigates/pr/feat-gpen-enhancers
feat: add GPEN-BFR 256 and 512 ONNX face enhancers
2026-02-23 15:05:44 +08:00
Lauri Gates e93fb95903 perf(processing): optimize post-processing with float32 and buffer reuse
- Replace float64 with float32 in apply_mouth_area() blending masks —
  float32 provides sufficient precision for 8-bit image blending and
  halves memory bandwidth
- Use float32 in apply_mask_area() mask computations
- Vectorize hull padding loop in create_face_mask() (face_masking.py)
  replacing per-point Python loop with NumPy array operations
- Fix apply_color_transfer() to use proper [0,1] LAB conversion —
  cv2.cvtColor with float32 input expects [0,1] range, not [0,255]
- Pre-compute inverse masks to avoid repeated (1.0 - mask) subtraction
- Use np.broadcast_to instead of np.repeat for face mask expansion

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 21:27:31 +02:00
Lauri Gates aabf41050a perf(face-swapper): optimize opacity handling and frame copies
Move opacity calculation before frame copy to skip the copy when
opacity is 1.0 (common case). Add early return path for full opacity.
Clear PREVIOUS_FRAME_RESULT instead of caching when interpolation
is disabled.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 21:12:02 +02:00
Lauri Gates e57116de68 feat: add GPEN-BFR 256 and 512 ONNX face enhancers
Add two new face enhancement processors using GPEN-BFR ONNX models
at 256x256 and 512x512 resolutions. Models auto-download on first
use from GitHub releases. Integrates into existing frame processor
pipeline alongside GFPGAN enhancer with UI toggle switches.

- modules/paths.py: Shared path constants module
- modules/processors/frame/_onnx_enhancer.py: ONNX enhancement utilities
- modules/processors/frame/face_enhancer_gpen256.py: GPEN-BFR 256 processor
- modules/processors/frame/face_enhancer_gpen512.py: GPEN-BFR 512 processor
- modules/core.py: Add GPEN choices to --frame-processor CLI arg
- modules/globals.py: Add GPEN entries to fp_ui toggle dict
- modules/ui.py: Add GPEN toggle switches and processing integration

Closes #1663

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 19:39:12 +02:00
Kenneth Estanislao d5338a3eae Update version in README and add contributor 2026-02-23 01:02:22 +08:00
Kenneth Estanislao 7ec3a4be29 Merge pull request #1665 from laurigates/pr/perf-pipeline-threading
perf(ui): decouple face detection from swap in live webcam pipeline
2026-02-23 00:59:22 +08:00
Lauri Gates ca6cba9311 perf(ui): decouple face detection from swap in live webcam pipeline
Add a dedicated detection thread that runs face detection continuously
on the latest captured frame and publishes results to a shared dict.
The processing/swap thread reads cached detection results instead of
running detection inline, so it never blocks on the 15-30ms detection
cost.

Architecture change: 2 threads → 3 threads
  Before: capture → [detect + swap] → display
  After:  capture → swap (uses cached detections) → display
                  ↘ detect (async, writes to shared cache) ↗

Also replaces the blocking while/ROOT.update() display loop with
ROOT.after()-based scheduling, which avoids Tk event loop re-entrancy
issues and UI freezes.

Closes #1664
2026-02-22 18:41:47 +02:00
Kenneth Estanislao d89385457e Merge pull request #1659 from laurigates/pr/fix-tk9-compat
fix(ui): patch CTkOptionMenu for Tk 9.0 compatibility
2026-02-23 00:13:47 +08:00
Kenneth Estanislao b015f0099f Update GFPGANv1.4 download link to ONNX format 2026-02-23 00:03:37 +08:00
Kenneth Estanislao e56a79222e Merge branch 'main' of https://github.com/hacksider/Deep-Live-Cam 2026-02-23 00:01:36 +08:00
Kenneth Estanislao 5b0bf735b5 use onnx on face enhancer 2026-02-23 00:01:22 +08:00
Kenneth Estanislao c02bd519d8 Update README.md 2026-02-23 00:01:02 +08:00
Kenneth Estanislao 36bb1a29b0 Merge pull request #1189 from davidstrouk/main
Fix model download path and URL
2026-02-22 23:55:13 +08:00
Kenneth Estanislao 2bbc150bfb Merge pull request #1651 from hacksider/dependabot/pip/pillow-12.1.1
Bump pillow from 11.1.0 to 12.1.1
2026-02-22 18:01:34 +08:00
Lauri Gates a1722c7b2e fix(ui): patch CTkOptionMenu for Tk 9.0 compatibility
In Tk 9.0, Menu.index("end") returns "" instead of raising TclError
on empty menus. CustomTkinter's DropdownMenu._add_menu_commands
doesn't handle this case, causing a crash when creating CTkOptionMenu
widgets (e.g., the camera selector dropdown).

Add a monkey-patch that guards against the empty-string return value.
2026-02-22 11:59:51 +02:00
Kenneth Estanislao 07b4d66965 Update version in README to 2.0.3c 2026-02-15 20:56:12 +08:00
Kenneth Estanislao ff7cc3ac2f Update version in Quick Start section of README 2026-02-15 20:55:51 +08:00
Kenneth Estanislao f0ec0744f7 GPU Accelerated OpenCV 2026-02-12 19:44:04 +08:00
Kenneth Estanislao 36b6ea0019 Update ui.py
DETECT_EVERY_N = 2 reuses cached face positions on alternate frames
2026-02-12 18:54:18 +08:00
Kenneth Estanislao 523ee53c34 Update ui.py
Separate capture and processing threads with queue.Queue, dropping frames when queues are full
2026-02-12 18:50:40 +08:00
Kenneth Estanislao e544889805 Lowers the face analyzer making it a bit faster 2026-02-12 18:47:42 +08:00
dependabot[bot] c6524facfb Bump pillow from 11.1.0 to 12.1.1
Bumps [pillow](https://github.com/python-pillow/Pillow) from 11.1.0 to 12.1.1.
- [Release notes](https://github.com/python-pillow/Pillow/releases)
- [Changelog](https://github.com/python-pillow/Pillow/blob/main/CHANGES.rst)
- [Commits](https://github.com/python-pillow/Pillow/compare/11.1.0...12.1.1)

---
updated-dependencies:
- dependency-name: pillow
  dependency-version: 12.1.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-02-11 16:36:29 +00:00
Kenneth Estanislao 91baa6c0a5 Update Quick Start section to version 2.6 2026-02-10 23:54:02 +08:00
David Strouk 647c5f250f Update modules/processors/frame/face_swapper.py
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
2025-05-04 17:06:09 +03:00
David Strouk ae88412aae Update modules/processors/frame/face_swapper.py
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
2025-05-04 17:04:08 +03:00
David Strouk b7e011f5e7 Fix model download path and URL
- Use models_dir instead of abs_dir for download path
- Create models directory if it doesn't exist
- Fix Hugging Face download URL by using /resolve/ instead of /blob/
2025-05-04 16:59:04 +03:00
20 changed files with 1485 additions and 307 deletions
+2
View File
@@ -25,3 +25,5 @@ models/DMDNet.pth
faceswap/
.vscode/
switch_states.json
/models
install.bat
+14 -20
View File
@@ -1,4 +1,4 @@
<h1 align="center">Deep-Live-Cam 2.0.2c</h1>
<h1 align="center">Deep-Live-Cam 2.0.5c</h1>
<p align="center">
Real-time face swap and video deepfake with a single click and only a single image.
@@ -30,7 +30,7 @@ By using this software, you agree to these terms and commit to using it in a man
Users are expected to use this software responsibly and legally. If using a real person's face, obtain their consent and clearly label any output as a deepfake when sharing online. We are not responsible for end-user actions.
## Exclusive v2.4 Quick Start - Pre-built (Windows/Mac Silicon)
## Exclusive v2.6d Quick Start - Pre-built (Windows/Mac Silicon)
<a href="https://deeplivecam.net/index.php/quickstart"> <img src="media/Download.png" width="285" height="77" />
@@ -124,7 +124,7 @@ cd Deep-Live-Cam
**3. Download the Models**
1. [GFPGANv1.4](https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth)
1. [GFPGANv1.4](https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.onnx)
2. [inswapper\_128\_fp16.onnx](https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx)
Place these files in the "**models**" folder.
@@ -338,23 +338,16 @@ Looking for a CLI mode? Using the -s/--source argument will make the run program
## Press
**We are always open to criticism and are ready to improve, that's why we didn't cherry-pick anything.**
- [*"Deep-Live-Cam goes viral, allowing anyone to become a digital doppelganger"*](https://arstechnica.com/information-technology/2024/08/new-ai-tool-enables-real-time-face-swapping-on-webcams-raising-fraud-concerns/) - Ars Technica
- [*"Thanks Deep Live Cam, shapeshifters are among us now"*](https://dataconomy.com/2024/08/15/what-is-deep-live-cam-github-deepfake/) - Dataconomy
- [*"This free AI tool lets you become anyone during video-calls"*](https://www.newsbytesapp.com/news/science/deep-live-cam-ai-impersonation-tool-goes-viral/story) - NewsBytes
- [*"OK, this viral AI live stream software is truly terrifying"*](https://www.creativebloq.com/ai/ok-this-viral-ai-live-stream-software-is-truly-terrifying) - Creative Bloq
- [*"Deepfake AI Tool Lets You Become Anyone in a Video Call With Single Photo"*](https://petapixel.com/2024/08/14/deep-live-cam-deepfake-ai-tool-lets-you-become-anyone-in-a-video-call-with-single-photo-mark-zuckerberg-jd-vance-elon-musk/) - PetaPixel
- [*"Deep-Live-Cam Uses AI to Transform Your Face in Real-Time, Celebrities Included"*](https://www.techeblog.com/deep-live-cam-ai-transform-face/) - TechEBlog
- [*"An AI tool that "makes you look like anyone" during a video call is going viral online"*](https://telegrafi.com/en/a-tool-that-makes-you-look-like-anyone-during-a-video-call-is-going-viral-on-the-Internet/) - Telegrafi
- [*"This Deepfake Tool Turning Images Into Livestreams is Topping the GitHub Charts"*](https://decrypt.co/244565/this-deepfake-tool-turning-images-into-livestreams-is-topping-the-github-charts) - Emerge
- [*"New Real-Time Face-Swapping AI Allows Anyone to Mimic Famous Faces"*](https://www.digitalmusicnews.com/2024/08/15/face-swapping-ai-real-time-mimic/) - Digital Music News
- [*"This real-time webcam deepfake tool raises alarms about the future of identity theft"*](https://www.diyphotography.net/this-real-time-webcam-deepfake-tool-raises-alarms-about-the-future-of-identity-theft/) - DIYPhotography
- [*"That's Crazy, Oh God. That's Fucking Freaky Dude... That's So Wild Dude"*](https://www.youtube.com/watch?time_continue=1074&v=py4Tc-Y8BcY) - SomeOrdinaryGamers
- [*"Alright look look look, now look chat, we can do any face we want to look like chat"*](https://www.youtube.com/live/mFsCe7AIxq8?feature=shared&t=2686) - IShowSpeed
- [*"They do a pretty good job matching poses, expression and even the lighting"*](https://www.youtube.com/watch?v=wnCghLjqv3s&t=551s) - TechLinked (LTT)
- [*"Als Sean Connery an der Redaktionskonferenz teilnahm"*](https://www.golem.de/news/deepfakes-als-sean-connery-an-der-redaktionskonferenz-teilnahm-2408-188172.html) - Golem.de (German)
- [*"What the F***! Why do I look like Vinny Jr? I look exactly like Vinny Jr!? No, this shit is crazy! Bro This is F*** Crazy! "*](https://youtu.be/JbUPRmXRUtE?t=3964) - IShowSpeed
- [**Ars Technica**](https://arstechnica.com/information-technology/2024/08/new-ai-tool-enables-real-time-face-swapping-on-webcams-raising-fraud-concerns/) - *"Deep-Live-Cam goes viral, allowing anyone to become a digital doppelganger"*
- [**Yahoo!**](https://www.yahoo.com/tech/ok-viral-ai-live-stream-080041056.html) - *"OK, this viral AI live stream software is truly terrifying"*
- [**CNN Brasil**](https://www.cnnbrasil.com.br/tecnologia/ia-consegue-clonar-rostos-na-webcam-entenda-funcionamento/) - *"AI can clone faces on webcam; understand how it works"*
- [**Bloomberg Technoz**](https://www.bloombergtechnoz.com/detail-news/71032/kenalan-dengan-teknologi-deep-live-cam-bisa-jadi-alat-menipu) - *"Get to know Deep Live Cam technology, it can be used as a tool for deception."*
- [**TrendMicro**](https://www.trendmicro.com/vinfo/gb/security/news/cyber-attacks/ai-vs-ai-deepfakes-and-ekyc) - *"AI vs AI: DeepFakes and eKYC"*
- [**PetaPixel**](https://petapixel.com/2024/08/14/deep-live-cam-deepfake-ai-tool-lets-you-become-anyone-in-a-video-call-with-single-photo-mark-zuckerberg-jd-vance-elon-musk/) - *"Deepfake AI Tool Lets You Become Anyone in a Video Call With Single Photo"*
- [**SomeOrdinaryGamers**](https://www.youtube.com/watch?time_continue=1074&v=py4Tc-Y8BcY) - *"That's Crazy, Oh God. That's Fucking Freaky Dude... That's So Wild Dude"*
- [**IShowSpeed**](https://www.youtube.com/live/mFsCe7AIxq8?feature=shared&t=2686) - *"Alright look look look, now look chat, we can do any face we want to look like chat"*
- [**TechLinked (Linus Tech Tips)**](https://www.youtube.com/watch?v=wnCghLjqv3s&t=551s) - *"They do a pretty good job matching poses, expression and even the lighting"*
- [**IShowSpeed**](https://youtu.be/JbUPRmXRUtE?t=3964) - *"What the F***! Why do I look like Vinny Jr? I look exactly like Vinny Jr!? No, this shit is crazy! Bro This is F*** Crazy!"*
## Credits
@@ -368,6 +361,7 @@ Looking for a CLI mode? Using the -s/--source argument will make the run program
- [vic4key](https://github.com/vic4key): For supporting/contributing to this project
- [kier007](https://github.com/kier007): for improving the user experience
- [qitianai](https://github.com/qitianai): for multi-lingual support
- [laurigates](https://github.com/laurigates): Decoupling stuffs to make everything faster!
- and [all developers](https://github.com/hacksider/Deep-Live-Cam/graphs/contributors) behind libraries used in this project.
- Footnote: Please be informed that the base author of the code is [s0md3v](https://github.com/s0md3v/roop)
- All the wonderful users who helped make this project go viral by starring the repo ❤️
+2 -1
View File
@@ -1,6 +1,7 @@
from typing import Any
import cv2
import modules.globals # Import the globals to check the color correction toggle
from modules.gpu_processing import gpu_cvt_color
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
@@ -19,7 +20,7 @@ def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
if has_frame and modules.globals.color_correction:
# Convert the frame color if necessary
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = gpu_cvt_color(frame, cv2.COLOR_BGR2RGB)
capture.release()
return frame if has_frame else None
+13 -10
View File
@@ -11,7 +11,11 @@ import platform
import signal
import shutil
import argparse
import torch
try:
import torch
HAS_TORCH = True
except ImportError:
HAS_TORCH = False
import onnxruntime
import tensorflow
@@ -21,11 +25,12 @@ import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
if HAS_TORCH and 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
if HAS_TORCH:
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
def parse_args() -> None:
@@ -34,7 +39,7 @@ def parse_args() -> None:
program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer', 'face_enhancer_gpen256', 'face_enhancer_gpen512'], nargs='+')
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
@@ -81,11 +86,9 @@ def parse_args() -> None:
modules.globals.execution_threads = args.execution_threads
modules.globals.lang = args.lang
#for ENHANCER tumbler:
if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
else:
modules.globals.fp_ui['face_enhancer'] = False
#for ENHANCER tumblers:
for enhancer_key in ('face_enhancer', 'face_enhancer_gpen256', 'face_enhancer_gpen512'):
modules.globals.fp_ui[enhancer_key] = enhancer_key in args.frame_processor
# translate deprecated args
if args.source_path_deprecated:
@@ -167,7 +170,7 @@ def limit_resources() -> None:
def release_resources() -> None:
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
if 'CUDAExecutionProvider' in modules.globals.execution_providers and HAS_TORCH:
torch.cuda.empty_cache()
+3 -2
View File
@@ -27,9 +27,10 @@ def get_face_analyser() -> Any:
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(
name='buffalo_l',
providers=modules.globals.execution_providers
providers=modules.globals.execution_providers,
allowed_modules=['detection', 'recognition']
)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
FACE_ANALYSER.prepare(ctx_id=0, det_size=(320, 320))
return FACE_ANALYSER
+1 -1
View File
@@ -50,7 +50,7 @@ headless: bool | None = None # Run without UI?
log_level: str = "error" # Logging level (e.g., 'debug', 'info', 'warning', 'error')
# Face Processor UI Toggles (Example)
fp_ui: Dict[str, bool] = {"face_enhancer": False}
fp_ui: Dict[str, bool] = {"face_enhancer": False, "face_enhancer_gpen256": False, "face_enhancer_gpen512": False}
# Face Swapper Specific Options
face_swapper_enabled: bool = True # General toggle for the swapper processor
+286
View File
@@ -0,0 +1,286 @@
# --- START OF FILE gpu_processing.py ---
"""
GPU-accelerated image processing using OpenCV CUDA (cv2.cuda.GpuMat).
Provides drop-in replacements for common cv2 functions. When OpenCV is built
with CUDA support the functions transparently upload → process → download via
GpuMat; otherwise they fall back to the regular CPU path so the rest of the
codebase never has to care whether CUDA is available.
Usage
-----
from modules.gpu_processing import (
gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted,
gpu_resize, gpu_cvt_color, gpu_flip,
is_gpu_accelerated,
)
"""
from __future__ import annotations
import cv2
import numpy as np
from typing import Tuple, Optional
# ---------------------------------------------------------------------------
# CUDA availability detection (evaluated once at import time)
# ---------------------------------------------------------------------------
CUDA_AVAILABLE: bool = False
try:
# cv2.cuda.GpuMat is only present when OpenCV is compiled with CUDA
_test_mat = cv2.cuda.GpuMat()
# Verify we have the required filter / image-processing functions
_has_gauss = hasattr(cv2.cuda, "createGaussianFilter")
_has_resize = hasattr(cv2.cuda, "resize")
_has_cvt = hasattr(cv2.cuda, "cvtColor")
if _has_gauss and _has_resize and _has_cvt:
CUDA_AVAILABLE = True
print("[gpu_processing] OpenCV CUDA support detected GPU-accelerated processing enabled.")
else:
missing = []
if not _has_gauss:
missing.append("createGaussianFilter")
if not _has_resize:
missing.append("resize")
if not _has_cvt:
missing.append("cvtColor")
print(f"[gpu_processing] cv2.cuda.GpuMat exists but missing: {', '.join(missing)} falling back to CPU.")
except Exception:
print("[gpu_processing] OpenCV CUDA not available using CPU fallback for all operations.")
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _ensure_uint8(img: np.ndarray) -> np.ndarray:
"""Clip and convert to uint8 if necessary."""
if img.dtype != np.uint8:
return np.clip(img, 0, 255).astype(np.uint8)
return img
def _ksize_odd(ksize: Tuple[int, int]) -> Tuple[int, int]:
"""Ensure kernel dimensions are positive and odd (required by GaussianBlur)."""
kw = max(1, ksize[0] // 2 * 2 + 1) if ksize[0] > 0 else 0
kh = max(1, ksize[1] // 2 * 2 + 1) if ksize[1] > 0 else 0
return (kw, kh)
def _cv_type_for(img: np.ndarray) -> int:
"""Return the OpenCV type constant matching *img* (uint8 only)."""
channels = 1 if img.ndim == 2 else img.shape[2]
if channels == 1:
return cv2.CV_8UC1
elif channels == 3:
return cv2.CV_8UC3
elif channels == 4:
return cv2.CV_8UC4
return cv2.CV_8UC3 # fallback
# ---------------------------------------------------------------------------
# Public API Gaussian Blur
# ---------------------------------------------------------------------------
def gpu_gaussian_blur(
src: np.ndarray,
ksize: Tuple[int, int],
sigma_x: float,
sigma_y: float = 0,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.GaussianBlur`` with CUDA acceleration.
Parameters match ``cv2.GaussianBlur(src, ksize, sigmaX, sigmaY)``.
When *ksize* is ``(0, 0)`` OpenCV computes the kernel size from *sigma_x*.
"""
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
cv_type = _cv_type_for(src_u8)
ks = _ksize_odd(ksize) if ksize != (0, 0) else ksize
gauss = cv2.cuda.createGaussianFilter(cv_type, cv_type, ks, sigma_x, sigma_y)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
gpu_dst = gauss.apply(gpu_src)
return gpu_dst.download()
except cv2.error:
pass
return cv2.GaussianBlur(src, ksize, sigma_x, sigmaY=sigma_y)
# ---------------------------------------------------------------------------
# Public API addWeighted
# ---------------------------------------------------------------------------
def gpu_add_weighted(
src1: np.ndarray,
alpha: float,
src2: np.ndarray,
beta: float,
gamma: float,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.addWeighted`` with CUDA acceleration."""
if CUDA_AVAILABLE:
try:
s1 = _ensure_uint8(src1)
s2 = _ensure_uint8(src2)
g1 = cv2.cuda.GpuMat()
g2 = cv2.cuda.GpuMat()
g1.upload(s1)
g2.upload(s2)
gpu_dst = cv2.cuda.addWeighted(g1, alpha, g2, beta, gamma)
return gpu_dst.download()
except cv2.error:
pass
return cv2.addWeighted(src1, alpha, src2, beta, gamma)
# ---------------------------------------------------------------------------
# Public API Unsharp-mask sharpening
# ---------------------------------------------------------------------------
def gpu_sharpen(
src: np.ndarray,
strength: float,
sigma: float = 3,
) -> np.ndarray:
"""Unsharp-mask sharpening, optionally GPU-accelerated.
Equivalent to::
blurred = GaussianBlur(src, (0,0), sigma)
result = addWeighted(src, 1+strength, blurred, -strength, 0)
"""
if strength <= 0:
return src
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
cv_type = _cv_type_for(src_u8)
gauss = cv2.cuda.createGaussianFilter(cv_type, cv_type, (0, 0), sigma)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
gpu_blurred = gauss.apply(gpu_src)
gpu_sharp = cv2.cuda.addWeighted(gpu_src, 1.0 + strength, gpu_blurred, -strength, 0)
result = gpu_sharp.download()
return np.clip(result, 0, 255).astype(np.uint8)
except cv2.error:
pass
blurred = cv2.GaussianBlur(src, (0, 0), sigma)
sharpened = cv2.addWeighted(src, 1.0 + strength, blurred, -strength, 0)
return np.clip(sharpened, 0, 255).astype(np.uint8)
# ---------------------------------------------------------------------------
# Public API Resize
# ---------------------------------------------------------------------------
# Map common cv2 interpolation flags to their CUDA equivalents
_INTERP_MAP = {
cv2.INTER_NEAREST: cv2.INTER_NEAREST,
cv2.INTER_LINEAR: cv2.INTER_LINEAR,
cv2.INTER_CUBIC: cv2.INTER_CUBIC,
cv2.INTER_AREA: cv2.INTER_AREA,
cv2.INTER_LANCZOS4: cv2.INTER_LANCZOS4,
}
def gpu_resize(
src: np.ndarray,
dsize: Tuple[int, int],
fx: float = 0,
fy: float = 0,
interpolation: int = cv2.INTER_LINEAR,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.resize`` with CUDA acceleration.
Parameters match ``cv2.resize(src, dsize, fx=fx, fy=fy, interpolation=...)``.
"""
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
interp = _INTERP_MAP.get(interpolation, cv2.INTER_LINEAR)
if dsize and dsize[0] > 0 and dsize[1] > 0:
gpu_dst = cv2.cuda.resize(gpu_src, dsize, interpolation=interp)
else:
gpu_dst = cv2.cuda.resize(gpu_src, (0, 0), fx=fx, fy=fy, interpolation=interp)
return gpu_dst.download()
except cv2.error:
pass
return cv2.resize(src, dsize, fx=fx, fy=fy, interpolation=interpolation)
# ---------------------------------------------------------------------------
# Public API Color conversion
# ---------------------------------------------------------------------------
def gpu_cvt_color(
src: np.ndarray,
code: int,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.cvtColor`` with CUDA acceleration.
Parameters match ``cv2.cvtColor(src, code)``.
"""
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
gpu_dst = cv2.cuda.cvtColor(gpu_src, code)
return gpu_dst.download()
except cv2.error:
pass
return cv2.cvtColor(src, code)
# ---------------------------------------------------------------------------
# Public API Flip
# ---------------------------------------------------------------------------
def gpu_flip(
src: np.ndarray,
flip_code: int,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.flip`` with CUDA acceleration.
Parameters match ``cv2.flip(src, flipCode)``.
*flip_code*: 0 = vertical, 1 = horizontal, -1 = both.
"""
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
gpu_dst = cv2.cuda.flip(gpu_src, flip_code)
return gpu_dst.download()
except cv2.error:
pass
return cv2.flip(src, flip_code)
# ---------------------------------------------------------------------------
# Convenience: check at runtime whether GPU path is active
# ---------------------------------------------------------------------------
def is_gpu_accelerated() -> bool:
"""Return ``True`` when the CUDA path will be used."""
return CUDA_AVAILABLE
# --- END OF FILE gpu_processing.py ---
+6
View File
@@ -0,0 +1,6 @@
"""Shared path constants for the Deep-Live-Cam project."""
import os
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
MODELS_DIR = os.path.join(ROOT_DIR, "models")
+2 -1
View File
@@ -3,6 +3,7 @@ import opennsfw2
from PIL import Image
import cv2 # Add OpenCV import
import modules.globals # Import globals to access the color correction toggle
from modules.gpu_processing import gpu_cvt_color
from modules.typing import Frame
@@ -14,7 +15,7 @@ model = None
def predict_frame(target_frame: Frame) -> bool:
# Convert the frame to RGB before processing if color correction is enabled
if modules.globals.color_correction:
target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB)
target_frame = gpu_cvt_color(target_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(target_frame)
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
+145
View File
@@ -0,0 +1,145 @@
"""Shared ONNX-based face enhancement utilities for GPEN-BFR models.
Provides session creation, pre/post processing, and the core
enhance-face-via-ONNX pipeline.
"""
import os
import platform
import threading
from typing import Any
import cv2
import numpy as np
import onnxruntime
import modules.globals
IS_APPLE_SILICON = platform.system() == "Darwin" and platform.machine() == "arm64"
# Limit concurrent ONNX calls to avoid VRAM exhaustion on multi-face frames
THREAD_SEMAPHORE = threading.Semaphore(min(max(1, (os.cpu_count() or 1)), 8))
def create_onnx_session(model_path: str) -> onnxruntime.InferenceSession:
"""Create an ONNX Runtime session using the configured execution providers."""
providers = modules.globals.execution_providers
session = onnxruntime.InferenceSession(model_path, providers=providers)
return session
def warmup_session(session: onnxruntime.InferenceSession) -> None:
"""Run a dummy inference pass to trigger JIT / compile caching."""
try:
input_feed = {
inp.name: np.zeros(
[d if isinstance(d, int) and d > 0 else 1 for d in inp.shape],
dtype=np.float32,
)
for inp in session.get_inputs()
}
session.run(None, input_feed)
except Exception as e:
print(f"ONNX enhancer warmup skipped (non-fatal): {e}")
def preprocess_face(face_img: np.ndarray, input_size: int) -> np.ndarray:
"""Resize, normalize, and convert a BGR face crop to ONNX input blob.
GPEN-BFR expects [1, 3, H, W] float32 in RGB, normalized to [-1, 1].
"""
resized = cv2.resize(face_img, (input_size, input_size), interpolation=cv2.INTER_LINEAR)
rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
blob = rgb.astype(np.float32) / 255.0 * 2.0 - 1.0
blob = np.transpose(blob, (2, 0, 1))[np.newaxis, ...]
return blob
def postprocess_face(output: np.ndarray) -> np.ndarray:
"""Convert ONNX output [1, 3, H, W] float32 back to BGR uint8 image."""
img = output[0].transpose(1, 2, 0)
img = ((img + 1.0) / 2.0 * 255.0)
img = np.clip(img, 0, 255).astype(np.uint8)
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
return img
def _get_face_affine(face: Any, input_size: int):
"""Compute affine transform to align a face to GPEN input space.
Returns (M, inv_M) — forward and inverse affine matrices.
"""
template = np.array([
[0.31556875, 0.4615741],
[0.68262291, 0.4615741],
[0.50009375, 0.6405054],
[0.34947187, 0.8246919],
[0.65343645, 0.8246919],
], dtype=np.float32) * input_size
landmarks = None
if hasattr(face, "kps") and face.kps is not None:
landmarks = face.kps.astype(np.float32)
elif hasattr(face, "landmark_2d_106") and face.landmark_2d_106 is not None:
lm106 = face.landmark_2d_106
landmarks = np.array([
lm106[38], # left eye
lm106[88], # right eye
lm106[86], # nose tip
lm106[52], # left mouth
lm106[61], # right mouth
], dtype=np.float32)
if landmarks is None or len(landmarks) < 5:
return None, None
M = cv2.estimateAffinePartial2D(landmarks, template, method=cv2.LMEDS)[0]
if M is None:
return None, None
inv_M = cv2.invertAffineTransform(M)
return M, inv_M
def enhance_face_onnx(
frame: np.ndarray,
face: Any,
session: onnxruntime.InferenceSession,
input_size: int,
) -> np.ndarray:
"""Enhance a single face in the frame using an ONNX face restoration model."""
M, inv_M = _get_face_affine(face, input_size)
if M is None:
return frame
face_crop = cv2.warpAffine(
frame, M, (input_size, input_size),
flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE,
)
blob = preprocess_face(face_crop, input_size)
with THREAD_SEMAPHORE:
output = session.run(None, {session.get_inputs()[0].name: blob})[0]
enhanced = postprocess_face(output)
# Create mask for blending (feathered edges)
mask = np.ones((input_size, input_size), dtype=np.float32)
border = max(1, input_size // 16)
mask[:border, :] = np.linspace(0, 1, border)[:, np.newaxis]
mask[-border:, :] = np.linspace(1, 0, border)[:, np.newaxis]
mask[:, :border] = np.minimum(mask[:, :border], np.linspace(0, 1, border)[np.newaxis, :])
mask[:, -border:] = np.minimum(mask[:, -border:], np.linspace(1, 0, border)[np.newaxis, :])
h, w = frame.shape[:2]
warped_enhanced = cv2.warpAffine(
enhanced, inv_M, (w, h),
flags=cv2.INTER_LINEAR, borderValue=(0, 0, 0),
)
warped_mask = cv2.warpAffine(
mask, inv_M, (w, h),
flags=cv2.INTER_LINEAR, borderValue=0,
)
mask_3ch = warped_mask[:, :, np.newaxis]
result = (warped_enhanced.astype(np.float32) * mask_3ch +
frame.astype(np.float32) * (1.0 - mask_3ch))
return np.clip(result, 0, 255).astype(np.uint8)
+9
View File
@@ -17,8 +17,17 @@ FRAME_PROCESSORS_INTERFACE = [
'process_video'
]
ALLOWED_PROCESSORS = {
'face_swapper',
'face_enhancer',
'face_enhancer_gpen256',
'face_enhancer_gpen512'
}
def load_frame_processor_module(frame_processor: str) -> Any:
if frame_processor not in ALLOWED_PROCESSORS:
print(f"Frame processor {frame_processor} is not allowed")
sys.exit()
try:
frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}')
for method_name in FRAME_PROCESSORS_INTERFACE:
+270 -104
View File
@@ -1,20 +1,20 @@
# --- START OF FILE face_enhancer.py ---
# Uses ONNX Runtime for GFPGAN face enhancement (no torch/gfpgan dependency)
from typing import Any, List
import cv2
import threading
import gfpgan
import numpy as np
import os
import platform
import torch # Make sure torch is imported
import onnxruntime
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.face_analyser import get_one_face, get_many_faces
from modules.typing import Frame, Face
from modules.utilities import (
conditional_download,
is_image,
is_video,
)
@@ -29,15 +29,29 @@ models_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
# Standard FFHQ 5-point face template for 512x512 resolution
# Points: left_eye, right_eye, nose, left_mouth, right_mouth
FFHQ_TEMPLATE_512 = np.array(
[
[192.98138, 239.94708],
[318.90277, 240.19366],
[256.63416, 314.01935],
[201.26117, 371.41043],
[313.08905, 371.15118],
],
dtype=np.float32,
)
def pre_check() -> bool:
download_directory_path = models_dir
conditional_download(
download_directory_path,
[
"https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth"
],
)
model_path = os.path.join(models_dir, "gfpgan-1024.onnx")
if not os.path.exists(model_path):
update_status(
f"GFPGAN ONNX model not found at {model_path}. "
"Please place gfpgan-1024.onnx in the models folder.",
NAME,
)
return False
return True
@@ -50,108 +64,257 @@ def pre_start() -> bool:
return True
def get_face_enhancer() -> Any:
def get_face_enhancer() -> onnxruntime.InferenceSession:
"""
Initializes and returns the GFPGAN face enhancer instance,
prioritizing CUDA, then MPS (Mac), then CPU.
Initializes and returns the GFPGAN ONNX Runtime inference session,
using the execution providers configured in modules.globals.
"""
global FACE_ENHANCER
with THREAD_LOCK:
if FACE_ENHANCER is None:
model_path = os.path.join(models_dir, "GFPGANv1.4.pth")
device = None
try:
# Priority 1: CUDA
if torch.cuda.is_available():
device = torch.device("cuda")
print(f"{NAME}: Using CUDA device.")
# Priority 2: MPS (Mac Silicon)
elif platform.system() == "Darwin" and torch.backends.mps.is_available():
device = torch.device("mps")
print(f"{NAME}: Using MPS device.")
# Priority 3: CPU
else:
device = torch.device("cpu")
print(f"{NAME}: Using CPU device.")
model_path = os.path.join(models_dir, "gfpgan-1024.onnx")
FACE_ENHANCER = gfpgan.GFPGANer(
model_path=model_path,
upscale=1, # upscale=1 means enhancement only, no resizing
arch='clean',
channel_multiplier=2,
bg_upsampler=None,
device=device
if not os.path.exists(model_path):
raise FileNotFoundError(
f"{NAME}: Model not found at {model_path}"
)
print(f"{NAME}: GFPGANer initialized successfully on {device}.")
try:
providers = modules.globals.execution_providers
session_options = onnxruntime.SessionOptions()
session_options.graph_optimization_level = (
onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
)
FACE_ENHANCER = onnxruntime.InferenceSession(
model_path,
sess_options=session_options,
providers=providers,
)
input_info = FACE_ENHANCER.get_inputs()[0]
output_info = FACE_ENHANCER.get_outputs()[0]
active_providers = FACE_ENHANCER.get_providers()
print(
f"{NAME}: GFPGAN ONNX model loaded successfully."
)
print(
f"{NAME}: Input: {input_info.name}, "
f"shape: {input_info.shape}, type: {input_info.type}"
)
print(
f"{NAME}: Output: {output_info.name}, "
f"shape: {output_info.shape}, type: {output_info.type}"
)
print(f"{NAME}: Active providers: {active_providers}")
except Exception as e:
print(f"{NAME}: Error initializing GFPGANer: {e}")
# Fallback to CPU if initialization with GPU fails for some reason
if device is not None and device.type != 'cpu':
print(f"{NAME}: Falling back to CPU due to error.")
try:
device = torch.device("cpu")
FACE_ENHANCER = gfpgan.GFPGANer(
model_path=model_path,
upscale=1,
arch='clean',
channel_multiplier=2,
bg_upsampler=None,
device=device
)
print(f"{NAME}: GFPGANer initialized successfully on CPU after fallback.")
except Exception as fallback_e:
print(f"{NAME}: FATAL: Could not initialize GFPGANer even on CPU: {fallback_e}")
FACE_ENHANCER = None # Ensure it's None if totally failed
else:
# If it failed even on the first CPU attempt or device was already CPU
print(f"{NAME}: FATAL: Could not initialize GFPGANer on CPU: {e}")
FACE_ENHANCER = None # Ensure it's None if totally failed
print(f"{NAME}: Error loading GFPGAN ONNX model: {e}")
FACE_ENHANCER = None
raise RuntimeError(
f"{NAME}: Failed to load GFPGAN ONNX model: {e}"
)
# Check if enhancer is still None after attempting initialization
if FACE_ENHANCER is None:
raise RuntimeError(f"{NAME}: Failed to initialize GFPGANer. Check logs for errors.")
raise RuntimeError(
f"{NAME}: Failed to initialize GFPGAN ONNX session. Check logs."
)
return FACE_ENHANCER
def _align_face(
frame: Frame, landmarks_5: np.ndarray, output_size: int
) -> tuple:
"""
Align and crop a face from the frame using 5-point landmarks and the
standard FFHQ template.
Returns:
(aligned_face, affine_matrix) or (None, None) on failure.
"""
# Scale the 512-base template to the desired output size
scale = output_size / 512.0
template = FFHQ_TEMPLATE_512 * scale
# Estimate a similarity transform (4 DOF: rotation, scale, tx, ty)
affine_matrix, _ = cv2.estimateAffinePartial2D(
landmarks_5, template, method=cv2.LMEDS
)
if affine_matrix is None:
return None, None
# Warp the face to the aligned position
aligned_face = cv2.warpAffine(
frame,
affine_matrix,
(output_size, output_size),
borderMode=cv2.BORDER_CONSTANT,
borderValue=(135, 133, 132),
)
return aligned_face, affine_matrix
def _paste_back(
frame: Frame,
enhanced_face: np.ndarray,
affine_matrix: np.ndarray,
output_size: int,
) -> Frame:
"""
Paste an enhanced (aligned) face back onto the original frame using the
inverse affine transform with feathered-edge blending.
"""
h, w = frame.shape[:2]
# Inverse the affine warp
inv_matrix = cv2.invertAffineTransform(affine_matrix)
inv_restored = cv2.warpAffine(
enhanced_face,
inv_matrix,
(w, h),
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0),
)
# Build a soft feathered mask in aligned space for edge blending
face_mask = np.ones((output_size, output_size), dtype=np.float32)
# Feather the border (5 % of the size on each edge)
border = max(1, int(output_size * 0.05))
ramp_up = np.linspace(0.0, 1.0, border, dtype=np.float32)
ramp_down = np.linspace(1.0, 0.0, border, dtype=np.float32)
# Top / bottom rows
face_mask[:border, :] *= ramp_up[:, None]
face_mask[-border:, :] *= ramp_down[:, None]
# Left / right columns
face_mask[:, :border] *= ramp_up[None, :]
face_mask[:, -border:] *= ramp_down[None, :]
# Expand to 3-channel
face_mask_3c = np.stack([face_mask] * 3, axis=-1)
# Warp mask back to original frame space
inv_mask = cv2.warpAffine(
face_mask_3c,
inv_matrix,
(w, h),
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0),
)
inv_mask = np.clip(inv_mask, 0.0, 1.0)
# Alpha-blend
result = (
frame.astype(np.float32) * (1.0 - inv_mask)
+ inv_restored.astype(np.float32) * inv_mask
)
return np.clip(result, 0, 255).astype(np.uint8)
def _preprocess_face(aligned_face: np.ndarray) -> np.ndarray:
"""
Convert an aligned BGR uint8 face image to the ONNX model input tensor.
Format: NCHW float32, normalised to [-1, 1].
"""
# BGR -> RGB
rgb = cv2.cvtColor(aligned_face, cv2.COLOR_BGR2RGB).astype(np.float32)
# [0, 255] -> [0, 1] -> [-1, 1]
rgb = rgb / 255.0
rgb = (rgb - 0.5) / 0.5
# HWC -> CHW, add batch dim
chw = np.transpose(rgb, (2, 0, 1))
return np.expand_dims(chw, axis=0) # shape: (1, 3, H, W)
def _postprocess_face(output: np.ndarray) -> np.ndarray:
"""
Convert the ONNX model output tensor back to a BGR uint8 image.
Expects input in NCHW format with values in [-1, 1].
"""
face = np.squeeze(output) # remove batch dim -> (3, H, W)
face = np.transpose(face, (1, 2, 0)) # CHW -> HWC
# [-1, 1] -> [0, 1] -> [0, 255]
face = (face + 1.0) / 2.0
face = np.clip(face * 255.0, 0, 255).astype(np.uint8)
# RGB -> BGR
return cv2.cvtColor(face, cv2.COLOR_RGB2BGR)
def enhance_face(temp_frame: Frame) -> Frame:
"""Enhances faces in a single frame using the global GFPGANer instance."""
# Ensure enhancer is ready
enhancer = get_face_enhancer()
"""Enhances all faces in a frame using the GFPGAN ONNX model."""
session = get_face_enhancer()
# Determine model input resolution from the session metadata
input_info = session.get_inputs()[0]
input_name = input_info.name
input_shape = input_info.shape # e.g. [1, 3, 512, 512]
# Safely extract input size (handle dynamic / symbolic dimensions)
try:
with THREAD_SEMAPHORE:
# The enhance method returns: _, restored_faces, restored_img
_, _, restored_img = enhancer.enhance(
temp_frame,
has_aligned=False, # Assume faces are not pre-aligned
only_center_face=False, # Enhance all detected faces
paste_back=True # Paste enhanced faces back onto the original image
)
# GFPGAN might return None if no face is detected or an error occurs
if restored_img is None:
# print(f"{NAME}: Warning: GFPGAN enhancement returned None. Returning original frame.")
return temp_frame
return restored_img
except Exception as e:
print(f"{NAME}: Error during face enhancement: {e}")
# Return the original frame in case of error during enhancement
align_size = int(input_shape[2])
if align_size <= 0:
align_size = 512
except (ValueError, TypeError, IndexError):
align_size = 512
# Detect faces using InsightFace (already a project dependency)
faces = get_many_faces(temp_frame)
if not faces:
return temp_frame
result_frame = temp_frame.copy()
for face in faces:
# Need the 5-point key-points for alignment
if not hasattr(face, "kps") or face.kps is None:
continue
landmarks_5 = face.kps.astype(np.float32)
if landmarks_5.shape[0] < 5:
continue
# Align / crop the face at the model's INPUT resolution
aligned_face, affine_matrix = _align_face(
temp_frame, landmarks_5, output_size=align_size
)
if aligned_face is None or affine_matrix is None:
continue
try:
with THREAD_SEMAPHORE:
input_tensor = _preprocess_face(aligned_face)
output_tensor = session.run(None, {input_name: input_tensor})[0]
enhanced_bgr = _postprocess_face(output_tensor)
# The model may output at a different resolution than its input
# (e.g. input 512x512 → output 1024x1024). Resize the enhanced
# face back to the alignment size so the inverse affine maps
# correctly.
eh, ew = enhanced_bgr.shape[:2]
if eh != align_size or ew != align_size:
enhanced_bgr = cv2.resize(
enhanced_bgr,
(align_size, align_size),
interpolation=cv2.INTER_LANCZOS4,
)
# Paste enhanced face back onto the frame
result_frame = _paste_back(
result_frame, enhanced_bgr, affine_matrix, output_size=align_size
)
except Exception as e:
print(f"{NAME}: Error enhancing a face: {e}")
continue
return result_frame
def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
"""Processes a frame: enhances face if detected."""
# We don't strictly need source_face for enhancement only
# Check if any face exists to potentially save processing time, though GFPGAN also does detection.
# For simplicity and ensuring enhancement is attempted if possible, we can rely on enhance_face.
# target_face = get_one_face(temp_frame) # This gets only ONE face
# If you want to enhance ONLY if a face is detected by your *own* analyser first:
# has_face = get_one_face(temp_frame) is not None # Or use get_many_faces
# if has_face:
# temp_frame = enhance_face(temp_frame)
# else: # Enhance regardless, let GFPGAN handle detection
temp_frame = enhance_face(temp_frame)
return temp_frame
@@ -162,14 +325,18 @@ def process_frames(
"""Processes multiple frames from file paths."""
for temp_frame_path in temp_frame_paths:
if not os.path.exists(temp_frame_path):
print(f"{NAME}: Warning: Frame path not found {temp_frame_path}, skipping.")
print(
f"{NAME}: Warning: Frame path not found {temp_frame_path}, skipping."
)
if progress:
progress.update(1)
continue
temp_frame = cv2.imread(temp_frame_path)
if temp_frame is None:
print(f"{NAME}: Warning: Failed to read frame {temp_frame_path}, skipping.")
print(
f"{NAME}: Warning: Failed to read frame {temp_frame_path}, skipping."
)
if progress:
progress.update(1)
continue
@@ -180,7 +347,9 @@ def process_frames(
progress.update(1)
def process_image(source_path: str | None, target_path: str, output_path: str) -> None:
def process_image(
source_path: str | None, target_path: str, output_path: str
) -> None:
"""Processes a single image file."""
target_frame = cv2.imread(target_path)
if target_frame is None:
@@ -191,16 +360,13 @@ def process_image(source_path: str | None, target_path: str, output_path: str) -
print(f"{NAME}: Enhanced image saved to {output_path}")
def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None:
def process_video(
source_path: str | None, temp_frame_paths: List[str]
) -> None:
"""Processes video frames using the frame processor core."""
# source_path might be optional depending on how process_video is called
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
modules.processors.frame.core.process_video(
source_path, temp_frame_paths, process_frames
)
# Optional: Keep process_frame_v2 if it's used elsewhere, otherwise it's redundant
# def process_frame_v2(temp_frame: Frame) -> Frame:
# target_face = get_one_face(temp_frame)
# if target_face:
# temp_frame = enhance_face(temp_frame)
# return temp_frame
# --- END OF FILE face_enhancer.py ---
# --- END OF FILE face_enhancer.py ---
@@ -0,0 +1,125 @@
"""GPEN-BFR-256 face enhancer — ONNX-based face restoration at 256x256."""
from typing import Any, List
import os
import threading
import cv2
import numpy as np
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
from modules.utilities import (
is_image,
is_video,
)
from modules.processors.frame._onnx_enhancer import (
create_onnx_session,
warmup_session,
enhance_face_onnx,
)
NAME = "DLC.FACE-ENHANCER-GPEN256"
INPUT_SIZE = 256
MODEL_URL = "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR/GPEN-BFR-256.onnx"
MODEL_FILE = "GPEN-BFR-256.onnx"
ENHANCER = None
THREAD_LOCK = threading.Lock()
abs_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
def pre_check() -> bool:
model_path = os.path.join(models_dir, MODEL_FILE)
if not os.path.exists(model_path):
update_status(f"Downloading {MODEL_FILE}...", NAME)
from modules.utilities import conditional_download
conditional_download(models_dir, [MODEL_URL])
return True
def pre_start() -> bool:
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status("Select an image or video for target path.", NAME)
return False
return True
def get_enhancer() -> Any:
global ENHANCER
with THREAD_LOCK:
if ENHANCER is None:
model_path = os.path.join(models_dir, MODEL_FILE)
if not os.path.exists(model_path):
from modules.utilities import conditional_download
conditional_download(models_dir, [MODEL_URL])
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
print(f"{NAME}: Loading ONNX model from {model_path}")
ENHANCER = create_onnx_session(model_path)
warmup_session(ENHANCER)
print(f"{NAME}: Model loaded successfully.")
return ENHANCER
def enhance_face(temp_frame: Frame, face: Face) -> Frame:
try:
session = get_enhancer()
except Exception as e:
print(f"{NAME}: {e}")
return temp_frame
try:
return enhance_face_onnx(temp_frame, face, session, INPUT_SIZE)
except Exception as e:
print(f"{NAME}: Error during face enhancement: {e}")
return temp_frame
def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face is None:
return temp_frame
return enhance_face(temp_frame, target_face)
def process_frame_v2(temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame, target_face)
return temp_frame
def process_frames(
source_path: str | None, temp_frame_paths: List[str], progress: Any = None
) -> None:
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
if temp_frame is None:
if progress:
progress.update(1)
continue
result = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result)
if progress:
progress.update(1)
def process_image(source_path: str | None, target_path: str, output_path: str) -> None:
target_frame = cv2.imread(target_path)
if target_frame is None:
print(f"{NAME}: Error: Failed to read target image {target_path}")
return
result_frame = process_frame(None, target_frame)
cv2.imwrite(output_path, result_frame)
print(f"{NAME}: Enhanced image saved to {output_path}")
def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
@@ -0,0 +1,125 @@
"""GPEN-BFR-512 face enhancer — ONNX-based face restoration at 512x512."""
from typing import Any, List
import os
import threading
import cv2
import numpy as np
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
from modules.utilities import (
is_image,
is_video,
)
from modules.processors.frame._onnx_enhancer import (
create_onnx_session,
warmup_session,
enhance_face_onnx,
)
NAME = "DLC.FACE-ENHANCER-GPEN512"
INPUT_SIZE = 512
MODEL_URL = "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR/GPEN-BFR-512.onnx"
MODEL_FILE = "GPEN-BFR-512.onnx"
ENHANCER = None
THREAD_LOCK = threading.Lock()
abs_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
def pre_check() -> bool:
model_path = os.path.join(models_dir, MODEL_FILE)
if not os.path.exists(model_path):
update_status(f"Downloading {MODEL_FILE}...", NAME)
from modules.utilities import conditional_download
conditional_download(models_dir, [MODEL_URL])
return True
def pre_start() -> bool:
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status("Select an image or video for target path.", NAME)
return False
return True
def get_enhancer() -> Any:
global ENHANCER
with THREAD_LOCK:
if ENHANCER is None:
model_path = os.path.join(models_dir, MODEL_FILE)
if not os.path.exists(model_path):
from modules.utilities import conditional_download
conditional_download(models_dir, [MODEL_URL])
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
print(f"{NAME}: Loading ONNX model from {model_path}")
ENHANCER = create_onnx_session(model_path)
warmup_session(ENHANCER)
print(f"{NAME}: Model loaded successfully.")
return ENHANCER
def enhance_face(temp_frame: Frame, face: Face) -> Frame:
try:
session = get_enhancer()
except Exception as e:
print(f"{NAME}: {e}")
return temp_frame
try:
return enhance_face_onnx(temp_frame, face, session, INPUT_SIZE)
except Exception as e:
print(f"{NAME}: Error during face enhancement: {e}")
return temp_frame
def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face is None:
return temp_frame
return enhance_face(temp_frame, target_face)
def process_frame_v2(temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame, target_face)
return temp_frame
def process_frames(
source_path: str | None, temp_frame_paths: List[str], progress: Any = None
) -> None:
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
if temp_frame is None:
if progress:
progress.update(1)
continue
result = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result)
if progress:
progress.update(1)
def process_image(source_path: str | None, target_path: str, output_path: str) -> None:
target_frame = cv2.imread(target_path)
if target_frame is None:
print(f"{NAME}: Error: Failed to read target image {target_path}")
return
result_frame = process_frame(None, target_frame)
cv2.imwrite(output_path, result_frame)
print(f"{NAME}: Enhanced image saved to {output_path}")
def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
+55 -47
View File
@@ -2,27 +2,35 @@ import cv2
import numpy as np
from modules.typing import Face, Frame
import modules.globals
from modules.gpu_processing import gpu_gaussian_blur, gpu_resize, gpu_cvt_color
def apply_color_transfer(source, target):
"""
Apply color transfer from target to source image
Apply color transfer from target to source image using LAB color space.
Uses float32 throughout for performance (sufficient precision for 8-bit images).
"""
source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
# Convert to float32 [0,1] range for proper LAB conversion
source_f32 = source.astype(np.float32) / 255.0
target_f32 = target.astype(np.float32) / 255.0
source_mean, source_std = cv2.meanStdDev(source)
target_mean, target_std = cv2.meanStdDev(target)
source_lab = cv2.cvtColor(source_f32, cv2.COLOR_BGR2LAB)
target_lab = cv2.cvtColor(target_f32, cv2.COLOR_BGR2LAB)
# Reshape mean and std to be broadcastable
source_mean = source_mean.reshape(1, 1, 3)
source_std = source_std.reshape(1, 1, 3)
target_mean = target_mean.reshape(1, 1, 3)
target_std = target_std.reshape(1, 1, 3)
source_mean, source_std = cv2.meanStdDev(source_lab)
target_mean, target_std = cv2.meanStdDev(target_lab)
# Perform the color transfer
source = (source - source_mean) * (target_std / source_std) + target_mean
# Reshape mean and std to be broadcastable (already float64 from meanStdDev, cast to f32)
source_mean = source_mean.reshape(1, 1, 3).astype(np.float32)
source_std = np.maximum(source_std.reshape(1, 1, 3), 1e-6).astype(np.float32)
target_mean = target_mean.reshape(1, 1, 3).astype(np.float32)
target_std = target_std.reshape(1, 1, 3).astype(np.float32)
return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)
# Perform the color transfer in LAB space
result_lab = (source_lab - source_mean) * (target_std / source_std) + target_mean
# Convert back to BGR and uint8
result_bgr = cv2.cvtColor(result_lab, cv2.COLOR_LAB2BGR)
return np.clip(result_bgr * 255.0, 0, 255).astype(np.uint8)
def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
@@ -47,22 +55,20 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
# Create a slightly larger convex hull for padding
face_outline = landmarks[0:33]
hull = cv2.convexHull(face_outline)
hull_padded = []
for point in hull:
x, y = point[0]
center = np.mean(face_outline, axis=0)
direction = np.array([x, y]) - center
direction = direction / np.linalg.norm(direction)
padded_point = np.array([x, y]) + direction * padding
hull_padded.append(padded_point)
hull_padded = np.array(hull_padded, dtype=np.int32)
# Vectorized hull padding — expand each point outward from center
center = np.mean(face_outline, axis=0, dtype=np.float32)
hull_pts = hull.reshape(-1, 2).astype(np.float32)
directions = hull_pts - center
norms = np.linalg.norm(directions, axis=1, keepdims=True)
norms = np.maximum(norms, 1e-6) # avoid division by zero
directions /= norms
hull_padded = (hull_pts + directions * padding).astype(np.int32)
# Fill the padded convex hull
cv2.fillConvexPoly(mask, hull_padded, 255)
# Smooth the mask edges
mask = cv2.GaussianBlur(mask, (5, 5), 3)
# Smooth the mask edges (GPU-accelerated when available)
mask = gpu_gaussian_blur(mask, (5, 5), 3)
return mask
@@ -123,8 +129,8 @@ def create_lower_mouth_mask(
polygon_relative_to_roi = expanded_landmarks - [min_x, min_y]
cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)
# Apply Gaussian blur to soften the mask edges
mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5)
# Apply Gaussian blur to soften the mask edges (GPU-accelerated when available)
mask_roi = gpu_gaussian_blur(mask_roi, (15, 15), 5)
# Place the mask ROI in the full-sized mask
mask[min_y:max_y, min_x:max_x] = mask_roi
@@ -192,8 +198,8 @@ def create_eyes_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, tuple
cv2.ellipse(mask_roi, left_center, left_axes, 0, 0, 360, 255, -1)
cv2.ellipse(mask_roi, right_center, right_axes, 0, 0, 360, 255, -1)
# Apply Gaussian blur to soften mask edges
mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5)
# Apply Gaussian blur to soften mask edges (GPU-accelerated when available)
mask_roi = gpu_gaussian_blur(mask_roi, (15, 15), 5)
# Place the mask ROI in the full-sized mask
mask[min_y:max_y, min_x:max_x] = mask_roi
@@ -374,15 +380,15 @@ def create_eyebrows_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, t
left_shape = create_curved_eyebrow(left_local)
right_shape = create_curved_eyebrow(right_local)
# Apply multi-stage blurring for natural feathering
# Apply multi-stage blurring for natural feathering (GPU-accelerated when available)
# First, strong Gaussian blur for initial softening
mask_roi = cv2.GaussianBlur(mask_roi, (21, 21), 7)
mask_roi = gpu_gaussian_blur(mask_roi, (21, 21), 7)
# Second, medium blur for transition areas
mask_roi = cv2.GaussianBlur(mask_roi, (11, 11), 3)
mask_roi = gpu_gaussian_blur(mask_roi, (11, 11), 3)
# Finally, light blur for fine details
mask_roi = cv2.GaussianBlur(mask_roi, (5, 5), 1)
mask_roi = gpu_gaussian_blur(mask_roi, (5, 5), 1)
# Normalize mask values
mask_roi = cv2.normalize(mask_roi, None, 0, 255, cv2.NORM_MINMAX)
@@ -405,7 +411,7 @@ def create_eyebrows_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, t
right_local = right_eyebrow - [min_x, min_y]
cv2.fillPoly(mask_roi, [left_local.astype(np.int32)], 255)
cv2.fillPoly(mask_roi, [right_local.astype(np.int32)], 255)
mask_roi = cv2.GaussianBlur(mask_roi, (21, 21), 7)
mask_roi = gpu_gaussian_blur(mask_roi, (21, 21), 7)
mask[min_y:max_y, min_x:max_x] = mask_roi
eyebrows_cutout = frame[min_y:max_y, min_x:max_x].copy()
eyebrows_polygon = np.vstack([left_eyebrow, right_eyebrow]).astype(np.int32)
@@ -433,11 +439,11 @@ def apply_mask_area(
return frame
try:
resized_cutout = cv2.resize(cutout, (box_width, box_height))
resized_cutout = gpu_resize(cutout, (box_width, box_height))
roi = frame[min_y:max_y, min_x:max_x]
if roi.shape != resized_cutout.shape:
resized_cutout = cv2.resize(
resized_cutout = gpu_resize(
resized_cutout, (roi.shape[1], roi.shape[0])
)
@@ -457,8 +463,8 @@ def apply_mask_area(
adjusted_polygon = polygon - [min_x, min_y]
cv2.fillPoly(polygon_mask, [adjusted_polygon], 255)
# Apply strong initial feathering
polygon_mask = cv2.GaussianBlur(polygon_mask, (21, 21), 7)
# Apply strong initial feathering (GPU-accelerated when available)
polygon_mask = gpu_gaussian_blur(polygon_mask, (21, 21), 7)
# Apply additional feathering
feather_amount = min(
@@ -467,26 +473,28 @@ def apply_mask_area(
box_height // modules.globals.mask_feather_ratio,
)
feathered_mask = cv2.GaussianBlur(
polygon_mask.astype(float), (0, 0), feather_amount
polygon_mask.astype(np.float32), (0, 0), feather_amount
)
feathered_mask = feathered_mask / feathered_mask.max()
max_val = feathered_mask.max()
if max_val > 1e-6:
feathered_mask *= np.float32(1.0 / max_val)
# Apply additional smoothing to the mask edges
feathered_mask = cv2.GaussianBlur(feathered_mask, (5, 5), 1)
face_mask_roi = face_mask[min_y:max_y, min_x:max_x]
combined_mask = feathered_mask * (face_mask_roi / 255.0)
combined_mask = feathered_mask * (face_mask_roi.astype(np.float32) * np.float32(1.0 / 255.0))
combined_mask = combined_mask[:, :, np.newaxis]
combined_mask_3ch = combined_mask[:, :, np.newaxis]
inv_mask = np.float32(1.0) - combined_mask_3ch
blended = (
color_corrected_area * combined_mask + roi * (1 - combined_mask)
color_corrected_area * combined_mask_3ch + roi * inv_mask
).astype(np.uint8)
# Apply face mask to blended result
face_mask_3channel = (
np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0
)
final_blend = blended * face_mask_3channel + roi * (1 - face_mask_3channel)
face_mask_f32 = face_mask_roi[:, :, np.newaxis].astype(np.float32) * np.float32(1.0 / 255.0)
face_mask_3channel = np.broadcast_to(face_mask_f32, blended.shape)
final_blend = blended * face_mask_3channel + roi * (np.float32(1.0) - face_mask_3channel)
frame[min_y:max_y, min_x:max_x] = final_blend.astype(np.uint8)
except Exception as e:
+41 -47
View File
@@ -15,6 +15,7 @@ from modules.utilities import (
is_video,
)
from modules.cluster_analysis import find_closest_centroid
from modules.gpu_processing import gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted, gpu_resize, gpu_cvt_color
import os
from collections import deque
import time
@@ -43,11 +44,21 @@ models_dir = os.path.join(
)
def pre_check() -> bool:
download_directory_path = abs_dir
# Use models_dir instead of abs_dir to save to the correct location
download_directory_path = models_dir
# Make sure the models directory exists, catch permission errors if they occur
try:
os.makedirs(download_directory_path, exist_ok=True)
except OSError as e:
logging.error(f"Failed to create directory {download_directory_path} due to permission error: {e}")
return False
# Use the direct download URL from Hugging Face
conditional_download(
download_directory_path,
[
"https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx"
"https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx"
],
)
return True
@@ -126,7 +137,9 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
return temp_frame
# Store a copy of the original frame before swapping for opacity blending
original_frame = temp_frame.copy()
opacity = getattr(modules.globals, "opacity", 1.0)
opacity = max(0.0, min(1.0, opacity))
original_frame = temp_frame if opacity >= 1.0 else temp_frame.copy()
# Pre-swap Input Check with optimization
if temp_frame.dtype != np.uint8:
@@ -158,7 +171,7 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
# print(f"Warning: Swapped frame shape {swapped_frame_raw.shape} differs from input {temp_frame.shape}.") # Debug
# Attempt resize (might distort if aspect ratio changed, but better than crashing)
try:
swapped_frame_raw = cv2.resize(swapped_frame_raw, (temp_frame.shape[1], temp_frame.shape[0]))
swapped_frame_raw = gpu_resize(swapped_frame_raw, (temp_frame.shape[1], temp_frame.shape[0]))
except Exception as resize_e:
# print(f"Error resizing swapped frame: {resize_e}") # Debug
return original_frame
@@ -229,19 +242,13 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
except Exception as e:
print(f"Poisson blending failed: {e}")
# Apply opacity blend between the original frame and the swapped frame
opacity = getattr(modules.globals, "opacity", 1.0)
# Ensure opacity is within valid range [0.0, 1.0]
opacity = max(0.0, min(1.0, opacity))
# Apply opacity blend between the original frame and the swapped frame
if opacity >= 1.0:
return swapped_frame.astype(np.uint8)
# Blend the original_frame with the (potentially mouth-masked) swapped_frame
# Ensure both frames are uint8 before blending
final_swapped_frame = cv2.addWeighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0)
# Ensure final frame is uint8 after blending (addWeighted should preserve it, but belt-and-suspenders)
final_swapped_frame = final_swapped_frame.astype(np.uint8)
return final_swapped_frame
final_swapped_frame = gpu_add_weighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0)
return final_swapped_frame.astype(np.uint8)
# --- START: Mac M1-M5 Optimized Face Detection ---
@@ -312,17 +319,10 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda
face_region = processed_frame[y1:y2, x1:x2]
if face_region.size == 0: continue
# Apply sharpening with optimized parameters for Apple Silicon
# Apply sharpening (GPU-accelerated when CUDA OpenCV is available)
try:
# Use smaller sigma for faster processing on Apple Silicon
sigma = 2 if IS_APPLE_SILICON else 3
blurred = cv2.GaussianBlur(face_region, (0, 0), sigma)
sharpened_region = cv2.addWeighted(
face_region, 1.0 + sharpness_value,
blurred, -sharpness_value,
0
)
sharpened_region = np.clip(sharpened_region, 0, 255).astype(np.uint8)
sharpened_region = gpu_sharpen(face_region, strength=sharpness_value, sigma=sigma)
processed_frame[y1:y2, x1:x2] = sharpened_region
except cv2.error:
pass
@@ -338,7 +338,7 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda
if PREVIOUS_FRAME_RESULT is not None and PREVIOUS_FRAME_RESULT.shape == processed_frame.shape and PREVIOUS_FRAME_RESULT.dtype == processed_frame.dtype:
# Perform interpolation
try:
final_frame = cv2.addWeighted(
final_frame = gpu_add_weighted(
PREVIOUS_FRAME_RESULT, 1.0 - interpolation_weight,
processed_frame, interpolation_weight,
0
@@ -359,10 +359,8 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda
pass
PREVIOUS_FRAME_RESULT = processed_frame.copy()
else:
# If interpolation is off or weight is invalid, just use the current frame
# Update state with the current (potentially sharpened) frame
# Reset previous frame state if interpolation was just turned off or weight is invalid
PREVIOUS_FRAME_RESULT = processed_frame.copy()
# Interpolation is off or weight is invalid — no need to cache
PREVIOUS_FRAME_RESULT = None
return final_frame
@@ -813,10 +811,10 @@ def create_lower_mouth_mask(
# Draw polygon on the ROI mask
cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)
# Apply Gaussian blur (ensure kernel size is odd and positive)
# Apply Gaussian blur (GPU-accelerated when available)
blur_k_size = getattr(modules.globals, "mask_blur_kernel", 15) # Default 15
blur_k_size = max(1, blur_k_size // 2 * 2 + 1) # Ensure odd
mask_roi = cv2.GaussianBlur(mask_roi, (blur_k_size, blur_k_size), 0) # Sigma=0 calculates from kernel
mask_roi = gpu_gaussian_blur(mask_roi, (blur_k_size, blur_k_size), 0)
# Place the mask ROI in the full-sized mask
mask[min_y:max_y, min_x:max_x] = mask_roi
@@ -952,7 +950,7 @@ def apply_mouth_area(
if roi.shape[:2] != mouth_cutout.shape[:2]:
# Check if mouth_cutout has valid dimensions before resizing
if mouth_cutout.shape[0] > 0 and mouth_cutout.shape[1] > 0:
resized_mouth_cutout = cv2.resize(mouth_cutout, (box_width, box_height), interpolation=cv2.INTER_LINEAR)
resized_mouth_cutout = gpu_resize(mouth_cutout, (box_width, box_height), interpolation=cv2.INTER_LINEAR)
else:
# print("Warning: mouth_cutout has invalid dimensions, cannot resize.")
return frame # Cannot proceed without valid cutout
@@ -1000,7 +998,7 @@ def apply_mouth_area(
feather_amount = max(1, min(30, feather_base_dim // max(1, mask_feather_ratio))) # Avoid div by zero
# Ensure kernel size is odd and positive
kernel_size = 2 * feather_amount + 1
feathered_polygon_mask = cv2.GaussianBlur(polygon_mask_roi.astype(float), (kernel_size, kernel_size), 0)
feathered_polygon_mask = cv2.GaussianBlur(polygon_mask_roi.astype(np.float32), (kernel_size, kernel_size), 0)
# Normalize feathered mask to [0.0, 1.0] range
max_val = feathered_polygon_mask.max()
@@ -1015,9 +1013,9 @@ def apply_mouth_area(
# Get the corresponding ROI from the *full face mask* (already blurred)
# Ensure face_mask is float and normalized [0.0, 1.0]
if face_mask.dtype != np.float64 and face_mask.dtype != np.float32:
face_mask_float = face_mask.astype(float) / 255.0
face_mask_float = face_mask.astype(np.float32) / 255.0
else: # Assume already float [0,1] if type is float
face_mask_float = face_mask
face_mask_float = face_mask.astype(np.float32) if face_mask.dtype == np.float64 else face_mask
face_mask_roi = face_mask_float[min_y:max_y, min_x:max_x]
# Combine the feathered mouth polygon mask with the face mask ROI
@@ -1029,14 +1027,14 @@ def apply_mouth_area(
if len(frame.shape) == 3 and frame.shape[2] == 3:
combined_mask_3channel = combined_mask[:, :, np.newaxis]
# Ensure data types are compatible for blending (float or double for mask, uint8 for images)
color_corrected_mouth_uint8 = color_corrected_mouth.astype(np.uint8)
roi_uint8 = roi.astype(np.uint8)
combined_mask_float = combined_mask_3channel.astype(np.float64) # Use float64 for precision in mask
# Ensure data types are compatible for blending
# float32 provides sufficient precision for 8-bit image blending
combined_mask_f32 = combined_mask_3channel.astype(np.float32)
inv_mask = np.float32(1.0) - combined_mask_f32
# Blend: (original_mouth * combined_mask) + (swapped_face_roi * (1 - combined_mask))
blended_roi = (color_corrected_mouth_uint8 * combined_mask_float +
roi_uint8 * (1.0 - combined_mask_float))
blended_roi = (color_corrected_mouth * combined_mask_f32 +
roi * inv_mask)
# Place the blended ROI back into the frame
frame[min_y:max_y, min_x:max_x] = blended_roi.astype(np.uint8)
@@ -1125,14 +1123,10 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
return mask # Return empty mask on error
# Apply Gaussian blur to feather the mask edges
# Kernel size should be reasonably large, odd, and positive
# Apply Gaussian blur to feather the mask edges (GPU-accelerated when available)
blur_k_size = getattr(modules.globals, "face_mask_blur", 31) # Default 31
blur_k_size = max(1, blur_k_size // 2 * 2 + 1) # Ensure odd and positive
# Use sigma=0 to let OpenCV calculate from kernel size
# Apply blur to the uint8 mask directly
mask = cv2.GaussianBlur(mask, (blur_k_size, blur_k_size), 0)
mask = gpu_gaussian_blur(mask, (blur_k_size, blur_k_size), 0)
# --- Optional: Return float mask for apply_mouth_area ---
# mask = mask.astype(float) / 255.0
+294 -57
View File
@@ -3,14 +3,18 @@ import webbrowser
import customtkinter as ctk
from typing import Callable, Tuple
import cv2
from cv2_enumerate_cameras import enumerate_cameras # Add this import
from modules.gpu_processing import gpu_cvt_color, gpu_resize, gpu_flip
from PIL import Image, ImageOps
import time
import json
import queue
import threading
import numpy as np
import modules.globals
import modules.metadata
from modules.face_analyser import (
get_one_face,
get_many_faces,
get_unique_faces_from_target_image,
get_unique_faces_from_target_video,
add_blank_map,
@@ -27,12 +31,36 @@ from modules.utilities import (
)
from modules.video_capture import VideoCapturer
from modules.gettext import LanguageManager
from modules.ui_tooltip import ToolTip
from modules import globals
import platform
if platform.system() == "Windows":
from pygrabber.dshow_graph import FilterGraph
# --- Tk 9.0 compatibility patch ---
# In Tk 9.0, Menu.index("end") returns "" instead of raising TclError
# when the menu is empty. CustomTkinter's CTkOptionMenu doesn't handle
# this, causing crashes. This patch adds the missing guard.
try:
from customtkinter.windows.widgets.core_widget_classes import DropdownMenu as _DropdownMenu
_original_add_menu_commands = _DropdownMenu._add_menu_commands
def _patched_add_menu_commands(self, *args, **kwargs):
try:
end_index = self._menu.index("end")
if end_index == "" or end_index is None:
return
except Exception:
pass
_original_add_menu_commands(self, *args, **kwargs)
_DropdownMenu._add_menu_commands = _patched_add_menu_commands
except (ImportError, AttributeError):
pass # CustomTkinter version doesn't have this class path
# --- End Tk 9.0 patch ---
ROOT = None
POPUP = None
POPUP_LIVE = None
@@ -164,11 +192,13 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
root, text=_("Select a face"), cursor="hand2", command=lambda: select_source_path()
)
select_face_button.place(relx=0.1, rely=0.30, relwidth=0.3, relheight=0.1)
ToolTip(select_face_button, _("Choose the source face image to swap onto the target"))
swap_faces_button = ctk.CTkButton(
root, text="", cursor="hand2", command=lambda: swap_faces_paths()
)
swap_faces_button.place(relx=0.45, rely=0.30, relwidth=0.1, relheight=0.1)
ToolTip(swap_faces_button, _("Swap source and target images"))
select_target_button = ctk.CTkButton(
root,
@@ -177,6 +207,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
command=lambda: select_target_path(),
)
select_target_button.place(relx=0.6, rely=0.30, relwidth=0.3, relheight=0.1)
ToolTip(select_target_button, _("Choose the target image or video to apply face swap to"))
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(
@@ -190,6 +221,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
keep_fps_checkbox.place(relx=0.1, rely=0.5)
ToolTip(keep_fps_checkbox, _("Output video keeps the original frame rate"))
keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames)
keep_frames_switch = ctk.CTkSwitch(
@@ -203,6 +235,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
keep_frames_switch.place(relx=0.1, rely=0.55)
ToolTip(keep_frames_switch, _("Keep extracted frames on disk after processing"))
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui["face_enhancer"])
enhancer_switch = ctk.CTkSwitch(
@@ -216,6 +249,35 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
enhancer_switch.place(relx=0.1, rely=0.6)
ToolTip(enhancer_switch, _("Improve face quality using the GFPGAN restoration model"))
gpen256_value = ctk.BooleanVar(value=modules.globals.fp_ui.get("face_enhancer_gpen256", False))
gpen256_switch = ctk.CTkSwitch(
root,
text=_("GPEN Enhancer 256"),
variable=gpen256_value,
cursor="hand2",
command=lambda: (
update_tumbler("face_enhancer_gpen256", gpen256_value.get()),
save_switch_states(),
),
)
gpen256_switch.place(relx=0.1, rely=0.65)
ToolTip(gpen256_switch, _("Use GPEN face enhancement model at 256px resolution (faster)"))
gpen512_value = ctk.BooleanVar(value=modules.globals.fp_ui.get("face_enhancer_gpen512", False))
gpen512_switch = ctk.CTkSwitch(
root,
text=_("GPEN Enhancer 512"),
variable=gpen512_value,
cursor="hand2",
command=lambda: (
update_tumbler("face_enhancer_gpen512", gpen512_value.get()),
save_switch_states(),
),
)
gpen512_switch.place(relx=0.1, rely=0.7)
ToolTip(gpen512_switch, _("Use GPEN face enhancement model at 512px resolution (higher quality)"))
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
keep_audio_switch = ctk.CTkSwitch(
@@ -229,6 +291,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
keep_audio_switch.place(relx=0.6, rely=0.5)
ToolTip(keep_audio_switch, _("Copy audio track from the source video to output"))
many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces)
many_faces_switch = ctk.CTkSwitch(
@@ -242,6 +305,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
many_faces_switch.place(relx=0.6, rely=0.55)
ToolTip(many_faces_switch, _("Swap every detected face, not just the primary one"))
color_correction_value = ctk.BooleanVar(value=modules.globals.color_correction)
color_correction_switch = ctk.CTkSwitch(
@@ -255,6 +319,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
color_correction_switch.place(relx=0.6, rely=0.6)
ToolTip(color_correction_switch, _("Fix blue/green color cast from some webcams"))
# nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw_filter)
# nsfw_switch = ctk.CTkSwitch(root, text='NSFW filter', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw_filter', nsfw_value.get()))
@@ -272,7 +337,8 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
close_mapper_window() if not map_faces.get() else None
),
)
map_faces_switch.place(relx=0.1, rely=0.65)
map_faces_switch.place(relx=0.1, rely=0.75)
ToolTip(map_faces_switch, _("Manually assign which source face maps to which target face"))
poisson_blend_value = ctk.BooleanVar(value=modules.globals.poisson_blend)
poisson_blend_switch = ctk.CTkSwitch(
@@ -285,7 +351,8 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
save_switch_states(),
),
)
poisson_blend_switch.place(relx=0.1, rely=0.7)
poisson_blend_switch.place(relx=0.1, rely=0.8)
ToolTip(poisson_blend_switch, _("Blend face edges smoothly using Poisson blending"))
show_fps_value = ctk.BooleanVar(value=modules.globals.show_fps)
show_fps_switch = ctk.CTkSwitch(
@@ -299,6 +366,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
show_fps_switch.place(relx=0.6, rely=0.65)
ToolTip(show_fps_switch, _("Display frames-per-second counter on the live preview"))
mouth_mask_var = ctk.BooleanVar(value=modules.globals.mouth_mask)
mouth_mask_switch = ctk.CTkSwitch(
@@ -309,6 +377,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
command=lambda: setattr(modules.globals, "mouth_mask", mouth_mask_var.get()),
)
mouth_mask_switch.place(relx=0.1, rely=0.45)
ToolTip(mouth_mask_switch, _("Preserve original mouth movement in the swapped face"))
show_mouth_mask_box_var = ctk.BooleanVar(value=modules.globals.show_mouth_mask_box)
show_mouth_mask_box_switch = ctk.CTkSwitch(
@@ -321,21 +390,25 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
show_mouth_mask_box_switch.place(relx=0.6, rely=0.45)
ToolTip(show_mouth_mask_box_switch, _("Display the mouth mask boundary for debugging"))
start_button = ctk.CTkButton(
root, text=_("Start"), cursor="hand2", command=lambda: analyze_target(start, root)
)
start_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05)
ToolTip(start_button, _("Begin processing the target image/video with selected face"))
stop_button = ctk.CTkButton(
root, text=_("Destroy"), cursor="hand2", command=lambda: destroy()
)
stop_button.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05)
ToolTip(stop_button, _("Stop processing and close the application"))
preview_button = ctk.CTkButton(
root, text=_("Preview"), cursor="hand2", command=lambda: toggle_preview()
)
preview_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
ToolTip(preview_button, _("Show/hide a preview of the processed output"))
# --- Camera Selection ---
camera_label = ctk.CTkLabel(root, text=_("Select Camera:"))
@@ -359,6 +432,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
)
camera_optionmenu.place(relx=0.35, rely=0.92, relwidth=0.25, relheight=0.05)
ToolTip(camera_optionmenu, _("Select which camera to use for live mode"))
live_button = ctk.CTkButton(
root,
@@ -379,6 +453,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
live_button.place(relx=0.65, rely=0.92, relwidth=0.2, relheight=0.05)
ToolTip(live_button, _("Start real-time face swap using webcam"))
# --- End Camera Selection ---
# 1) Define a DoubleVar for transparency (0 = fully transparent, 1 = fully opaque)
@@ -419,6 +494,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
corner_radius=3,
)
transparency_slider.place(relx=0.35, rely=0.77, relwidth=0.5, relheight=0.02)
ToolTip(transparency_slider, _("Blend between original and swapped face (0% = original, 100% = fully swapped)"))
# 3) Sharpness label & slider
sharpness_var = ctk.DoubleVar(value=0.0) # start at 0.0
@@ -444,6 +520,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
corner_radius=3,
)
sharpness_slider.place(relx=0.35, rely=0.82, relwidth=0.5, relheight=0.02)
ToolTip(sharpness_slider, _("Sharpen the enhanced face output"))
# Status and link at the bottom
global status_label
@@ -542,7 +619,7 @@ def create_source_target_popup(
)
x_label.grid(row=id, column=2, padx=10, pady=10)
image = Image.fromarray(cv2.cvtColor(item["target"]["cv2"], cv2.COLOR_BGR2RGB))
image = Image.fromarray(gpu_cvt_color(item["target"]["cv2"], cv2.COLOR_BGR2RGB))
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
)
@@ -597,7 +674,7 @@ def update_popup_source(
}
image = Image.fromarray(
cv2.cvtColor(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -790,7 +867,7 @@ def fit_image_to_size(image, width: int, height: int):
ratio_w = width / w
ratio = max(ratio_w, ratio_h)
new_size = (int(ratio * w), int(ratio * h))
return cv2.resize(image, dsize=new_size)
return gpu_resize(image, dsize=new_size)
def render_image_preview(image_path: str, size: Tuple[int, int]) -> ctk.CTkImage:
@@ -808,7 +885,7 @@ def render_video_preview(
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
image = Image.fromarray(gpu_cvt_color(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
@@ -846,7 +923,7 @@ def update_preview(frame_number: int = 0) -> None:
temp_frame = frame_processor.process_frame(
get_one_face(cv2.imread(modules.globals.source_path)), temp_frame
)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = Image.fromarray(gpu_cvt_color(temp_frame, cv2.COLOR_BGR2RGB))
image = ImageOps.contain(
image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS
)
@@ -917,21 +994,13 @@ def get_available_cameras():
camera_indices = []
camera_names = []
if platform.system() == "Darwin": # macOS specific handling
# Try to open the default FaceTime camera first
cap = cv2.VideoCapture(0)
if cap.isOpened():
camera_indices.append(0)
camera_names.append("FaceTime Camera")
cap.release()
# On macOS, additional cameras typically use indices 1 and 2
for i in [1, 2]:
cap = cv2.VideoCapture(i)
if cap.isOpened():
camera_indices.append(i)
camera_names.append(f"Camera {i}")
cap.release()
if platform.system() == "Darwin":
# Do NOT probe cameras with cv2.VideoCapture on macOS — probing
# invalid indices triggers the OBSENSOR backend and causes SIGSEGV.
# Default to indices 0 and 1 (covers FaceTime + one USB camera).
# The user can select the correct index from the UI dropdown.
camera_indices = [0, 1]
camera_names = ["Camera 0", "Camera 1"]
else:
# Linux camera detection - test first 10 indices
for i in range(10):
@@ -947,52 +1016,122 @@ def get_available_cameras():
return camera_indices, camera_names
def create_webcam_preview(camera_index: int):
global preview_label, PREVIEW
def _capture_thread_func(cap, capture_queue, stop_event):
"""Capture thread: reads frames from camera and puts them into the queue.
Drops frames when the queue is full to avoid backpressure on the camera."""
while not stop_event.is_set():
ret, frame = cap.read()
if not ret:
stop_event.set()
break
try:
capture_queue.put_nowait(frame)
except queue.Full:
# Drop the oldest frame and enqueue the new one
try:
capture_queue.get_nowait()
except queue.Empty:
pass
try:
capture_queue.put_nowait(frame)
except queue.Full:
pass
cap = VideoCapturer(camera_index)
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
update_status("Failed to start camera")
return
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
def _detection_thread_func(latest_frame_holder, detection_result, detection_lock, stop_event):
"""Detection thread: continuously runs face detection on the latest
captured frame and stores results in detection_result under detection_lock.
This decouples face detection (~15-30ms) from face swapping (~5-10ms)
so the swap loop never blocks on detection, significantly improving
live mode FPS."""
while not stop_event.is_set():
with detection_lock:
frame = latest_frame_holder[0]
if frame is None:
time.sleep(0.005)
continue
if modules.globals.many_faces:
many = get_many_faces(frame)
with detection_lock:
detection_result['target_face'] = None
detection_result['many_faces'] = many
else:
face = get_one_face(frame)
with detection_lock:
detection_result['target_face'] = face
detection_result['many_faces'] = None
def _processing_thread_func(capture_queue, processed_queue, stop_event,
latest_frame_holder, detection_result, detection_lock):
"""Processing thread: takes raw frames from capture_queue, reads the
latest detection result from the shared detection_result dict, applies
face swap/enhancement, and puts results into processed_queue.
Face detection runs concurrently in _detection_thread_func — this thread
only reads cached results so it never blocks on detection."""
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None
last_source_path = None
prev_time = time.time()
fps_update_interval = 0.5
frame_count = 0
fps = 0
while True:
ret, frame = cap.read()
if not ret:
break
while not stop_event.is_set():
try:
frame = capture_queue.get(timeout=0.05)
except queue.Empty:
continue
temp_frame = frame.copy()
temp_frame = frame
if modules.globals.live_mirror:
temp_frame = cv2.flip(temp_frame, 1)
temp_frame = gpu_flip(temp_frame, 1)
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
# Publish the mirrored frame for the detection thread to pick up
with detection_lock:
latest_frame_holder[0] = temp_frame
if not modules.globals.map_faces:
if source_image is None and modules.globals.source_path:
if modules.globals.source_path and modules.globals.source_path != last_source_path:
last_source_path = modules.globals.source_path
source_image = get_one_face(cv2.imread(modules.globals.source_path))
# Read latest detection results (brief lock to avoid blocking detection thread)
with detection_lock:
cached_target_face = detection_result.get('target_face')
cached_many_faces = detection_result.get('many_faces')
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame(None, temp_frame)
elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN256":
if modules.globals.fp_ui.get("face_enhancer_gpen256", False):
temp_frame = frame_processor.process_frame(None, temp_frame)
elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN512":
if modules.globals.fp_ui.get("face_enhancer_gpen512", False):
temp_frame = frame_processor.process_frame(None, temp_frame)
elif frame_processor.NAME == "DLC.FACE-SWAPPER":
# Use cached face positions from detection thread
swapped_bboxes = []
if modules.globals.many_faces and cached_many_faces:
result = temp_frame.copy()
for t_face in cached_many_faces:
result = frame_processor.swap_face(source_image, t_face, result)
if hasattr(t_face, 'bbox') and t_face.bbox is not None:
swapped_bboxes.append(t_face.bbox.astype(int))
temp_frame = result
elif cached_target_face is not None:
temp_frame = frame_processor.swap_face(source_image, cached_target_face, temp_frame)
if hasattr(cached_target_face, 'bbox') and cached_target_face.bbox is not None:
swapped_bboxes.append(cached_target_face.bbox.astype(int))
# Apply post-processing (sharpening, interpolation)
temp_frame = frame_processor.apply_post_processing(temp_frame, swapped_bboxes)
else:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
else:
@@ -1001,6 +1140,10 @@ def create_webcam_preview(camera_index: int):
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame_v2(temp_frame)
elif frame_processor.NAME in ("DLC.FACE-ENHANCER-GPEN256", "DLC.FACE-ENHANCER-GPEN512"):
fp_key = frame_processor.NAME.split(".")[-1].lower().replace("-", "_")
if modules.globals.fp_ui.get(fp_key, False):
temp_frame = frame_processor.process_frame_v2(temp_frame)
else:
temp_frame = frame_processor.process_frame_v2(temp_frame)
@@ -1023,20 +1166,114 @@ def create_webcam_preview(camera_index: int):
2,
)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
# Put processed frame into output queue, dropping old frames if full
try:
processed_queue.put_nowait(temp_frame)
except queue.Full:
try:
processed_queue.get_nowait()
except queue.Empty:
pass
try:
processed_queue.put_nowait(temp_frame)
except queue.Full:
pass
def create_webcam_preview(camera_index: int):
global preview_label, PREVIEW
cap = VideoCapturer(camera_index)
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
update_status("Failed to start camera")
return
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
# Queues for decoupling capture from processing and processing from display.
# Small maxsize ensures we always work on recent frames and drop stale ones.
capture_queue = queue.Queue(maxsize=2)
processed_queue = queue.Queue(maxsize=2)
stop_event = threading.Event()
# Shared state for the detection pipeline.
# latest_frame_holder[0] is the most recent raw frame for the detection
# thread; detection_result holds the last detected faces for the
# processing thread to read. Both are guarded by detection_lock.
detection_lock = threading.Lock()
latest_frame_holder = [None]
detection_result = {'target_face': None, 'many_faces': None}
# Start capture thread
cap_thread = threading.Thread(
target=_capture_thread_func,
args=(cap, capture_queue, stop_event),
daemon=True,
)
cap_thread.start()
# Start detection thread — runs face detection asynchronously so the
# processing/swap thread never blocks on it
det_thread = threading.Thread(
target=_detection_thread_func,
args=(latest_frame_holder, detection_result, detection_lock, stop_event),
daemon=True,
)
det_thread.start()
# Start processing thread
proc_thread = threading.Thread(
target=_processing_thread_func,
args=(capture_queue, processed_queue, stop_event,
latest_frame_holder, detection_result, detection_lock),
daemon=True,
)
proc_thread.start()
# Cleanup helper called from the display loop when preview closes
def _cleanup():
stop_event.set()
cap_thread.join(timeout=2.0)
det_thread.join(timeout=2.0)
proc_thread.join(timeout=2.0)
cap.release()
PREVIEW.withdraw()
# Non-blocking display loop using ROOT.after() — avoids blocking the
# Tk event loop which could cause UI freezes or re-entrancy issues
def _display_next_frame():
if stop_event.is_set() or PREVIEW.state() == "withdrawn":
_cleanup()
return
try:
temp_frame = processed_queue.get_nowait()
except queue.Empty:
ROOT.after(16, _display_next_frame)
return
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
image = gpu_cvt_color(temp_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(image)
image = ImageOps.contain(
image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
ROOT.update()
if PREVIEW.state() == "withdrawn":
break
ROOT.after(16, _display_next_frame)
cap.release()
PREVIEW.withdraw()
# Kick off the non-blocking display loop
ROOT.after(0, _display_next_frame)
def create_source_target_popup_for_webcam(
@@ -1146,7 +1383,7 @@ def refresh_data(map: list):
if "source" in item:
image = Image.fromarray(
cv2.cvtColor(item["source"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(item["source"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -1164,7 +1401,7 @@ def refresh_data(map: list):
if "target" in item:
image = Image.fromarray(
cv2.cvtColor(item["target"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(item["target"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -1212,7 +1449,7 @@ def update_webcam_source(
}
image = Image.fromarray(
cv2.cvtColor(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -1264,7 +1501,7 @@ def update_webcam_target(
}
image = Image.fromarray(
cv2.cvtColor(map[button_num]["target"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(map[button_num]["target"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
+74
View File
@@ -0,0 +1,74 @@
"""Lightweight hover tooltip for CustomTkinter widgets."""
import customtkinter as ctk
class ToolTip:
"""Show a floating tooltip popup when the user hovers over a widget.
Usage:
ToolTip(my_button, "Helpful description text")
"""
def __init__(self, widget: ctk.CTkBaseClass, text: str, delay: int = 500):
self._widget = widget
self._text = text
self._delay = delay
self._tooltip_window = None
self._after_id = None
widget.bind("<Enter>", self._schedule_show, add="+")
widget.bind("<Leave>", self._hide, add="+")
def _schedule_show(self, event=None):
self._cancel()
self._after_id = self._widget.after(self._delay, self._show)
def _show(self):
if self._tooltip_window is not None:
return
x = self._widget.winfo_rootx() + 20
y = self._widget.winfo_rooty() + self._widget.winfo_height() + 5
self._tooltip_window = tw = ctk.CTkToplevel(self._widget)
tw.withdraw()
tw.overrideredirect(True)
label = ctk.CTkLabel(
tw,
text=self._text,
fg_color="#333333",
text_color="#EEEEEE",
corner_radius=6,
padx=8,
pady=4,
)
label.pack()
tw.update_idletasks()
# Clamp to screen bounds
screen_w = tw.winfo_screenwidth()
screen_h = tw.winfo_screenheight()
tip_w = tw.winfo_reqwidth()
tip_h = tw.winfo_reqheight()
if x + tip_w > screen_w:
x = screen_w - tip_w - 5
if y + tip_h > screen_h:
y = self._widget.winfo_rooty() - tip_h - 5
tw.geometry(f"+{x}+{y}")
tw.deiconify()
def _hide(self, event=None):
self._cancel()
if self._tooltip_window is not None:
self._tooltip_window.destroy()
self._tooltip_window = None
def _cancel(self):
if self._after_id is not None:
self._widget.after_cancel(self._after_id)
self._after_id = None
+16 -7
View File
@@ -15,10 +15,6 @@ import modules.globals
TEMP_FILE = "temp.mp4"
TEMP_DIRECTORY = "temp"
# monkey patch ssl for mac
if platform.system().lower() == "darwin":
ssl._create_default_https_context = ssl._create_unverified_context
def run_ffmpeg(args: List[str]) -> bool:
"""Run ffmpeg with hardware acceleration and optimized settings."""
@@ -286,8 +282,15 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None:
download_directory_path, os.path.basename(url)
)
if not os.path.exists(download_file_path):
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
total = int(request.headers.get("Content-Length", 0))
request = urllib.request.Request(url)
# Create a specific SSL context for macOS to avoid globally disabling verification
ctx = None
if platform.system().lower() == "darwin":
ctx = ssl._create_unverified_context()
response = urllib.request.urlopen(request, context=ctx)
total = int(response.headers.get("Content-Length", 0))
with tqdm(
total=total,
desc="Downloading",
@@ -295,7 +298,13 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None:
unit_scale=True,
unit_divisor=1024,
) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
with open(download_file_path, "wb") as f:
while True:
buffer = response.read(8192)
if not buffer:
break
f.write(buffer)
progress.update(len(buffer))
def resolve_relative_path(path: str) -> str:
+2 -10
View File
@@ -1,5 +1,3 @@
--extra-index-url https://download.pytorch.org/whl/cu128
numpy>=1.23.5,<2
typing-extensions>=4.8.0
opencv-python==4.10.0.84
@@ -9,16 +7,10 @@ insightface==0.7.3
psutil==5.9.8
tk==0.1.0
customtkinter==5.2.2
pillow==11.1.0
torch; sys_platform != 'darwin'
torch==2.8.0+cu128; sys_platform == 'darwin'
torchvision; sys_platform != 'darwin'
torchvision==0.20.1; sys_platform == 'darwin'
pillow==12.1.1
onnxruntime-silicon==1.16.3; sys_platform == 'darwin' and platform_machine == 'arm64'
onnxruntime-gpu==1.22.0; sys_platform != 'darwin'
onnxruntime-gpu==1.24.2; sys_platform != 'darwin'
tensorflow; sys_platform != 'darwin'
opennsfw2==0.10.2
protobuf==4.25.1
git+https://github.com/xinntao/BasicSR.git@master
git+https://github.com/TencentARC/GFPGAN.git@master
pygrabber