Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| | 0d8f3b1f82 | | |
| | 6e9e7addf2 | | |
| | 0c7e871bfc | | |
| | e340b0da8a | | |
| | d0f81ed755 | | |
| | de01b28802 | | |
| | b645d5e60b | | |
| | 31b3a97003 | | |
| | e3b46e83b7 | | |
| | e93fb95903 | | |
| | aabf41050a | | |
| | e57116de68 | | |
| | d5338a3eae | | |
| | 7ec3a4be29 | | |
| | ca6cba9311 | | |
| | d89385457e | | |
| | b015f0099f | | |
| | a1722c7b2e | | |
@@ -26,3 +26,4 @@ faceswap/
 .vscode/
 switch_states.json
 /models
+install.bat
@@ -1,4 +1,4 @@
-<h1 align="center">Deep-Live-Cam 2.0.4c</h1>
+<h1 align="center">Deep-Live-Cam 2.0.5c</h1>

 <p align="center">
 Real-time face swap and video deepfake with a single click and only a single image.
@@ -124,7 +124,7 @@ cd Deep-Live-Cam

 **3. Download the Models**

-1. [GFPGANv1.4](https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth)
+1. [GFPGANv1.4](https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.onnx)
 2. [inswapper\_128\_fp16.onnx](https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx)

 Place these files in the "**models**" folder.
@@ -338,23 +338,16 @@ Looking for a CLI mode? Using the -s/--source argument will make the run program
|
||||
|
||||
## Press
|
||||
|
||||
**We are always open to criticism and are ready to improve, that's why we didn't cherry-pick anything.**
|
||||
|
||||
- [*"Deep-Live-Cam goes viral, allowing anyone to become a digital doppelganger"*](https://arstechnica.com/information-technology/2024/08/new-ai-tool-enables-real-time-face-swapping-on-webcams-raising-fraud-concerns/) - Ars Technica
|
||||
- [*"Thanks Deep Live Cam, shapeshifters are among us now"*](https://dataconomy.com/2024/08/15/what-is-deep-live-cam-github-deepfake/) - Dataconomy
|
||||
- [*"This free AI tool lets you become anyone during video-calls"*](https://www.newsbytesapp.com/news/science/deep-live-cam-ai-impersonation-tool-goes-viral/story) - NewsBytes
|
||||
- [*"OK, this viral AI live stream software is truly terrifying"*](https://www.creativebloq.com/ai/ok-this-viral-ai-live-stream-software-is-truly-terrifying) - Creative Bloq
|
||||
- [*"Deepfake AI Tool Lets You Become Anyone in a Video Call With Single Photo"*](https://petapixel.com/2024/08/14/deep-live-cam-deepfake-ai-tool-lets-you-become-anyone-in-a-video-call-with-single-photo-mark-zuckerberg-jd-vance-elon-musk/) - PetaPixel
|
||||
- [*"Deep-Live-Cam Uses AI to Transform Your Face in Real-Time, Celebrities Included"*](https://www.techeblog.com/deep-live-cam-ai-transform-face/) - TechEBlog
|
||||
- [*"An AI tool that "makes you look like anyone" during a video call is going viral online"*](https://telegrafi.com/en/a-tool-that-makes-you-look-like-anyone-during-a-video-call-is-going-viral-on-the-Internet/) - Telegrafi
|
||||
- [*"This Deepfake Tool Turning Images Into Livestreams is Topping the GitHub Charts"*](https://decrypt.co/244565/this-deepfake-tool-turning-images-into-livestreams-is-topping-the-github-charts) - Emerge
|
||||
- [*"New Real-Time Face-Swapping AI Allows Anyone to Mimic Famous Faces"*](https://www.digitalmusicnews.com/2024/08/15/face-swapping-ai-real-time-mimic/) - Digital Music News
|
||||
- [*"This real-time webcam deepfake tool raises alarms about the future of identity theft"*](https://www.diyphotography.net/this-real-time-webcam-deepfake-tool-raises-alarms-about-the-future-of-identity-theft/) - DIYPhotography
|
||||
- [*"That's Crazy, Oh God. That's Fucking Freaky Dude... That's So Wild Dude"*](https://www.youtube.com/watch?time_continue=1074&v=py4Tc-Y8BcY) - SomeOrdinaryGamers
|
||||
- [*"Alright look look look, now look chat, we can do any face we want to look like chat"*](https://www.youtube.com/live/mFsCe7AIxq8?feature=shared&t=2686) - IShowSpeed
|
||||
- [*"They do a pretty good job matching poses, expression and even the lighting"*](https://www.youtube.com/watch?v=wnCghLjqv3s&t=551s) - TechLinked (LTT)
|
||||
- [*"Als Sean Connery an der Redaktionskonferenz teilnahm"*](https://www.golem.de/news/deepfakes-als-sean-connery-an-der-redaktionskonferenz-teilnahm-2408-188172.html) - Golem.de (German)
|
||||
- [*"What the F***! Why do I look like Vinny Jr? I look exactly like Vinny Jr!? No, this shit is crazy! Bro This is F*** Crazy! "*](https://youtu.be/JbUPRmXRUtE?t=3964) - IShowSpeed
|
||||
- [**Ars Technica**](https://arstechnica.com/information-technology/2024/08/new-ai-tool-enables-real-time-face-swapping-on-webcams-raising-fraud-concerns/) - *"Deep-Live-Cam goes viral, allowing anyone to become a digital doppelganger"*
|
||||
- [**Yahoo!**](https://www.yahoo.com/tech/ok-viral-ai-live-stream-080041056.html) - *"OK, this viral AI live stream software is truly terrifying"*
|
||||
- [**CNN Brasil**](https://www.cnnbrasil.com.br/tecnologia/ia-consegue-clonar-rostos-na-webcam-entenda-funcionamento/) - *"AI can clone faces on webcam; understand how it works"*
|
||||
- [**Bloomberg Technoz**](https://www.bloombergtechnoz.com/detail-news/71032/kenalan-dengan-teknologi-deep-live-cam-bisa-jadi-alat-menipu) - *"Get to know Deep Live Cam technology, it can be used as a tool for deception."*
|
||||
- [**TrendMicro**](https://www.trendmicro.com/vinfo/gb/security/news/cyber-attacks/ai-vs-ai-deepfakes-and-ekyc) - *"AI vs AI: DeepFakes and eKYC"*
|
||||
- [**PetaPixel**](https://petapixel.com/2024/08/14/deep-live-cam-deepfake-ai-tool-lets-you-become-anyone-in-a-video-call-with-single-photo-mark-zuckerberg-jd-vance-elon-musk/) - *"Deepfake AI Tool Lets You Become Anyone in a Video Call With Single Photo"*
|
||||
- [**SomeOrdinaryGamers**](https://www.youtube.com/watch?time_continue=1074&v=py4Tc-Y8BcY) - *"That's Crazy, Oh God. That's Fucking Freaky Dude... That's So Wild Dude"*
|
||||
- [**IShowSpeed**](https://www.youtube.com/live/mFsCe7AIxq8?feature=shared&t=2686) - *"Alright look look look, now look chat, we can do any face we want to look like chat"*
|
||||
- [**TechLinked (Linus Tech Tips)**](https://www.youtube.com/watch?v=wnCghLjqv3s&t=551s) - *"They do a pretty good job matching poses, expression and even the lighting"*
|
||||
- [**IShowSpeed**](https://youtu.be/JbUPRmXRUtE?t=3964) - *"What the F***! Why do I look like Vinny Jr? I look exactly like Vinny Jr!? No, this shit is crazy! Bro This is F*** Crazy!"*
|
||||
|
||||
|
||||
## Credits
|
||||
@@ -368,6 +361,7 @@ Looking for a CLI mode? Using the -s/--source argument will make the run program
|
||||
- [vic4key](https://github.com/vic4key): For supporting/contributing to this project
|
||||
- [kier007](https://github.com/kier007): for improving the user experience
|
||||
- [qitianai](https://github.com/qitianai): for multi-lingual support
|
||||
- [laurigates](https://github.com/laurigates): Decoupling stuffs to make everything faster!
|
||||
- and [all developers](https://github.com/hacksider/Deep-Live-Cam/graphs/contributors) behind libraries used in this project.
|
||||
- Footnote: Please be informed that the base author of the code is [s0md3v](https://github.com/s0md3v/roop)
|
||||
- All the wonderful users who helped make this project go viral by starring the repo ❤️
|
||||
|
||||
+4
-6
@@ -39,7 +39,7 @@ def parse_args() -> None:
|
||||
program.add_argument('-s', '--source', help='select an source image', dest='source_path')
|
||||
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
|
||||
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
|
||||
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
|
||||
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer', 'face_enhancer_gpen256', 'face_enhancer_gpen512'], nargs='+')
|
||||
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
|
||||
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
|
||||
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
|
||||
@@ -86,11 +86,9 @@ def parse_args() -> None:
|
||||
modules.globals.execution_threads = args.execution_threads
|
||||
modules.globals.lang = args.lang
|
||||
|
||||
#for ENHANCER tumbler:
|
||||
if 'face_enhancer' in args.frame_processor:
|
||||
modules.globals.fp_ui['face_enhancer'] = True
|
||||
else:
|
||||
modules.globals.fp_ui['face_enhancer'] = False
|
||||
#for ENHANCER tumblers:
|
||||
for enhancer_key in ('face_enhancer', 'face_enhancer_gpen256', 'face_enhancer_gpen512'):
|
||||
modules.globals.fp_ui[enhancer_key] = enhancer_key in args.frame_processor
|
||||
|
||||
# translate deprecated args
|
||||
if args.source_path_deprecated:
|
||||
|
||||
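A minimal sketch of the toggle logic in the hunk above, assuming only what the diff shows: the `--frame-processor` option accepts one or more of the listed choices, and each enhancer toggle is simply a membership test. `build_parser` and `enhancer_toggles` are hypothetical helper names for illustration, not functions from the project.

```python
import argparse
from typing import Dict, List

# Enhancer keys as listed in the updated `choices` of the diff.
ENHANCER_KEYS = ("face_enhancer", "face_enhancer_gpen256", "face_enhancer_gpen512")


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical stand-in for the relevant part of parse_args()."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--frame-processor",
        dest="frame_processor",
        default=["face_swapper"],
        choices=["face_swapper", *ENHANCER_KEYS],
        nargs="+",
    )
    return parser


def enhancer_toggles(frame_processors: List[str]) -> Dict[str, bool]:
    """One boolean per enhancer: True only if it was requested."""
    return {key: key in frame_processors for key in ENHANCER_KEYS}


args = build_parser().parse_args(
    ["--frame-processor", "face_swapper", "face_enhancer_gpen512"]
)
toggles = enhancer_toggles(args.frame_processor)
print(toggles)
```

Replacing the `if`/`else` pair with a single membership expression keeps the three toggles in step with the list of choices.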
+1
-1
@@ -50,7 +50,7 @@ headless: bool | None = None # Run without UI?
|
||||
log_level: str = "error" # Logging level (e.g., 'debug', 'info', 'warning', 'error')
|
||||
|
||||
# Face Processor UI Toggles (Example)
|
||||
fp_ui: Dict[str, bool] = {"face_enhancer": False}
|
||||
fp_ui: Dict[str, bool] = {"face_enhancer": False, "face_enhancer_gpen256": False, "face_enhancer_gpen512": False}
|
||||
|
||||
# Face Swapper Specific Options
|
||||
face_swapper_enabled: bool = True # General toggle for the swapper processor
|
||||
|
||||
@@ -0,0 +1,6 @@
"""Shared path constants for the Deep-Live-Cam project."""

import os

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
MODELS_DIR = os.path.join(ROOT_DIR, "models")
|
||||
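A small sketch of what the two `dirname` calls resolve to, and how that compares with the three-level walk the GPEN enhancer modules in this diff perform on their own. The file locations are assumptions: the diff does not name the shared module, so `/repo/modules/paths.py` is hypothetical, while the enhancer location follows the `modules.processors.frame.<name>` import path used by the loader. `ancestor` is an illustrative helper.

```python
import posixpath


def ancestor(path: str, levels: int) -> str:
    """Apply dirname() `levels` times, as the nested calls in the diff do."""
    for _ in range(levels):
        path = posixpath.dirname(path)
    return path


# Hypothetical absolute paths, used only to trace the arithmetic.
shared_module = "/repo/modules/paths.py"
enhancer_module = "/repo/modules/processors/frame/face_enhancer_gpen256.py"

# Shared constants: two dirname() calls from the module file.
root_dir = ancestor(shared_module, 2)
models_dir_shared = posixpath.join(root_dir, "models")

# Enhancer modules: dirname() once for abs_dir, then three more levels up.
abs_dir = posixpath.dirname(enhancer_module)
models_dir_enhancer = posixpath.join(ancestor(abs_dir, 3), "models")

print(models_dir_shared, models_dir_enhancer)
```

Under these assumed locations both computations land on the same `models` folder, which suggests the enhancers could import the shared constant instead of recomputing it.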
@@ -0,0 +1,145 @@
"""Shared ONNX-based face enhancement utilities for GPEN-BFR models.

Provides session creation, pre/post processing, and the core
enhance-face-via-ONNX pipeline.
"""

import os
import platform
import threading
from typing import Any

import cv2
import numpy as np
import onnxruntime

import modules.globals

IS_APPLE_SILICON = platform.system() == "Darwin" and platform.machine() == "arm64"

# Limit concurrent ONNX calls to avoid VRAM exhaustion on multi-face frames
THREAD_SEMAPHORE = threading.Semaphore(min(max(1, (os.cpu_count() or 1)), 8))


def create_onnx_session(model_path: str) -> onnxruntime.InferenceSession:
    """Create an ONNX Runtime session using the configured execution providers."""
    providers = modules.globals.execution_providers
    session = onnxruntime.InferenceSession(model_path, providers=providers)
    return session


def warmup_session(session: onnxruntime.InferenceSession) -> None:
    """Run a dummy inference pass to trigger JIT / compile caching."""
    try:
        input_feed = {
            inp.name: np.zeros(
                [d if isinstance(d, int) and d > 0 else 1 for d in inp.shape],
                dtype=np.float32,
            )
            for inp in session.get_inputs()
        }
        session.run(None, input_feed)
    except Exception as e:
        print(f"ONNX enhancer warmup skipped (non-fatal): {e}")


def preprocess_face(face_img: np.ndarray, input_size: int) -> np.ndarray:
    """Resize, normalize, and convert a BGR face crop to ONNX input blob.

    GPEN-BFR expects [1, 3, H, W] float32 in RGB, normalized to [-1, 1].
    """
    resized = cv2.resize(face_img, (input_size, input_size), interpolation=cv2.INTER_LINEAR)
    rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    blob = rgb.astype(np.float32) / 255.0 * 2.0 - 1.0
    blob = np.transpose(blob, (2, 0, 1))[np.newaxis, ...]
    return blob


def postprocess_face(output: np.ndarray) -> np.ndarray:
    """Convert ONNX output [1, 3, H, W] float32 back to BGR uint8 image."""
    img = output[0].transpose(1, 2, 0)
    img = ((img + 1.0) / 2.0 * 255.0)
    img = np.clip(img, 0, 255).astype(np.uint8)
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    return img
|
||||
|
||||
|
||||
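The value mapping used by `preprocess_face` and `postprocess_face` can be traced with a few scalars. This is a minimal sketch in plain Python (no NumPy or OpenCV), so the helper names `to_model_range`, `to_pixel`, and `hwc_to_chw` are illustrative assumptions rather than project functions; `int()` is used to mimic the truncating cast to `uint8`.

```python
from typing import List


def to_model_range(value: int) -> float:
    """Map an 8-bit pixel value in [0, 255] to the [-1, 1] model range."""
    return value / 255.0 * 2.0 - 1.0


def to_pixel(value: float) -> int:
    """Map a model output back to [0, 255], clipping before truncating."""
    scaled = (value + 1.0) / 2.0 * 255.0
    return int(min(255.0, max(0.0, scaled)))


def hwc_to_chw(image: List[List[List[int]]]) -> List[List[List[int]]]:
    """Reorder a height x width x channel image to channel x height x width."""
    height, width, channels = len(image), len(image[0]), len(image[0][0])
    return [
        [[image[y][x][c] for x in range(width)] for y in range(height)]
        for c in range(channels)
    ]


tiny = [[[1, 2, 3], [4, 5, 6]]]  # 1 row, 2 columns, 3 channels
print(hwc_to_chw(tiny))
print(to_model_range(0), to_model_range(255), to_pixel(5.0), to_pixel(-3.0))
```

Because the cast truncates rather than rounds, a value that goes through both mappings can come back one level lower, so exact round trips should not be assumed.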
def _get_face_affine(face: Any, input_size: int):
    """Compute affine transform to align a face to GPEN input space.

    Returns (M, inv_M), the forward and inverse affine matrices.
    """
    template = np.array([
        [0.31556875, 0.4615741],
        [0.68262291, 0.4615741],
        [0.50009375, 0.6405054],
        [0.34947187, 0.8246919],
        [0.65343645, 0.8246919],
    ], dtype=np.float32) * input_size

    landmarks = None
    if hasattr(face, "kps") and face.kps is not None:
        landmarks = face.kps.astype(np.float32)
    elif hasattr(face, "landmark_2d_106") and face.landmark_2d_106 is not None:
        lm106 = face.landmark_2d_106
        landmarks = np.array([
            lm106[38],  # left eye
            lm106[88],  # right eye
            lm106[86],  # nose tip
            lm106[52],  # left mouth
            lm106[61],  # right mouth
        ], dtype=np.float32)

    if landmarks is None or len(landmarks) < 5:
        return None, None

    M = cv2.estimateAffinePartial2D(landmarks, template, method=cv2.LMEDS)[0]
    if M is None:
        return None, None
    inv_M = cv2.invertAffineTransform(M)
    return M, inv_M


def enhance_face_onnx(
    frame: np.ndarray,
    face: Any,
    session: onnxruntime.InferenceSession,
    input_size: int,
) -> np.ndarray:
    """Enhance a single face in the frame using an ONNX face restoration model."""
    M, inv_M = _get_face_affine(face, input_size)
    if M is None:
        return frame

    face_crop = cv2.warpAffine(
        frame, M, (input_size, input_size),
        flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE,
    )

    blob = preprocess_face(face_crop, input_size)
    with THREAD_SEMAPHORE:
        output = session.run(None, {session.get_inputs()[0].name: blob})[0]
    enhanced = postprocess_face(output)

    # Create mask for blending (feathered edges)
    mask = np.ones((input_size, input_size), dtype=np.float32)
    border = max(1, input_size // 16)
    mask[:border, :] = np.linspace(0, 1, border)[:, np.newaxis]
    mask[-border:, :] = np.linspace(1, 0, border)[:, np.newaxis]
    mask[:, :border] = np.minimum(mask[:, :border], np.linspace(0, 1, border)[np.newaxis, :])
    mask[:, -border:] = np.minimum(mask[:, -border:], np.linspace(1, 0, border)[np.newaxis, :])

    h, w = frame.shape[:2]
    warped_enhanced = cv2.warpAffine(
        enhanced, inv_M, (w, h),
        flags=cv2.INTER_LINEAR, borderValue=(0, 0, 0),
    )
    warped_mask = cv2.warpAffine(
        mask, inv_M, (w, h),
        flags=cv2.INTER_LINEAR, borderValue=0,
    )

    mask_3ch = warped_mask[:, :, np.newaxis]
    result = (warped_enhanced.astype(np.float32) * mask_3ch +
              frame.astype(np.float32) * (1.0 - mask_3ch))
    return np.clip(result, 0, 255).astype(np.uint8)
|
||||
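The feathered mask and the final blend in `enhance_face_onnx` can be illustrated without NumPy. The sketch below rebuilds the same ramps with plain lists; `linspace`, `feather_mask`, and `blend` are illustrative names, and the 64-pixel size is chosen only to keep the example small (the enhancers in this diff use 256 and 512).

```python
from typing import List


def linspace(start: float, stop: float, count: int) -> List[float]:
    """Evenly spaced values including both endpoints (count >= 1)."""
    if count == 1:
        return [start]
    step = (stop - start) / (count - 1)
    return [start + i * step for i in range(count)]


def feather_mask(size: int) -> List[List[float]]:
    """Square mask of ones whose outer border ramps linearly down to zero."""
    border = max(1, size // 16)
    rising = linspace(0.0, 1.0, border)
    falling = linspace(1.0, 0.0, border)
    mask = [[1.0] * size for _ in range(size)]
    # Top and bottom bands: every pixel in a row shares the row's ramp value.
    for i in range(border):
        mask[i] = [rising[i]] * size
        mask[size - border + i] = [falling[i]] * size
    # Left and right bands: take the minimum so corners fade on both axes.
    for row in mask:
        for i in range(border):
            row[i] = min(row[i], rising[i])
            row[size - border + i] = min(row[size - border + i], falling[i])
    return mask


def blend(enhanced: float, original: float, weight: float) -> float:
    """Per-pixel mix: weight 1 keeps the enhanced value, 0 keeps the original."""
    return enhanced * weight + original * (1.0 - weight)


mask = feather_mask(64)
print(mask[0][32], mask[1][32], mask[32][32])
print(blend(200.0, 100.0, 0.25))
```

The outermost row and column get weight zero, so the edge of the enhanced crop contributes nothing and the seam with the untouched frame stays invisible.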
@@ -17,8 +17,17 @@ FRAME_PROCESSORS_INTERFACE = [
|
||||
'process_video'
|
||||
]
|
||||
|
||||
ALLOWED_PROCESSORS = {
|
||||
'face_swapper',
|
||||
'face_enhancer',
|
||||
'face_enhancer_gpen256',
|
||||
'face_enhancer_gpen512'
|
||||
}
|
||||
|
||||
def load_frame_processor_module(frame_processor: str) -> Any:
|
||||
if frame_processor not in ALLOWED_PROCESSORS:
|
||||
print(f"Frame processor {frame_processor} is not allowed")
|
||||
sys.exit()
|
||||
try:
|
||||
frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}')
|
||||
for method_name in FRAME_PROCESSORS_INTERFACE:
|
||||
|
||||
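The allow-list check above guards a dynamic import whose module name comes from user input. A minimal sketch of the same pattern against a standard-library package follows; the `json` package, the `ALLOWED_SUBMODULES` set, and `load_allowed` are stand-ins chosen so the example runs anywhere, and it raises `ValueError` instead of calling `sys.exit()` so the refusal can be observed.

```python
import importlib
from types import ModuleType

# Hypothetical allow-list: only these submodules may be loaded by name.
ALLOWED_SUBMODULES = {"decoder", "encoder"}


def load_allowed(name: str, package: str = "json") -> ModuleType:
    """Import `package.name` only if `name` is explicitly allowed."""
    if name not in ALLOWED_SUBMODULES:
        raise ValueError(f"Module {name!r} is not allowed")
    return importlib.import_module(f"{package}.{name}")


decoder = load_allowed("decoder")
print(decoder.__name__)

try:
    load_allowed("tool")  # json.tool exists, but it is not on the list
except ValueError as error:
    print(error)
```

Checking membership before building the import string means an existing but unlisted module is still refused.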
@@ -0,0 +1,125 @@
"""GPEN-BFR-256 face enhancer: ONNX-based face restoration at 256x256."""

from typing import Any, List
import os
import threading

import cv2
import numpy as np

import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
from modules.utilities import (
    is_image,
    is_video,
)
from modules.processors.frame._onnx_enhancer import (
    create_onnx_session,
    warmup_session,
    enhance_face_onnx,
)

NAME = "DLC.FACE-ENHANCER-GPEN256"
INPUT_SIZE = 256
MODEL_URL = "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR/GPEN-BFR-256.onnx"
MODEL_FILE = "GPEN-BFR-256.onnx"

ENHANCER = None
THREAD_LOCK = threading.Lock()

abs_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)


def pre_check() -> bool:
    model_path = os.path.join(models_dir, MODEL_FILE)
    if not os.path.exists(model_path):
        update_status(f"Downloading {MODEL_FILE}...", NAME)
        from modules.utilities import conditional_download
        conditional_download(models_dir, [MODEL_URL])
    return True


def pre_start() -> bool:
    if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
        update_status("Select an image or video for target path.", NAME)
        return False
    return True


def get_enhancer() -> Any:
    global ENHANCER
    with THREAD_LOCK:
        if ENHANCER is None:
            model_path = os.path.join(models_dir, MODEL_FILE)
            if not os.path.exists(model_path):
                from modules.utilities import conditional_download
                conditional_download(models_dir, [MODEL_URL])
            if not os.path.exists(model_path):
                raise FileNotFoundError(f"Model file not found: {model_path}")
            print(f"{NAME}: Loading ONNX model from {model_path}")
            ENHANCER = create_onnx_session(model_path)
            warmup_session(ENHANCER)
            print(f"{NAME}: Model loaded successfully.")
    return ENHANCER


def enhance_face(temp_frame: Frame, face: Face) -> Frame:
    try:
        session = get_enhancer()
    except Exception as e:
        print(f"{NAME}: {e}")
        return temp_frame
    try:
        return enhance_face_onnx(temp_frame, face, session, INPUT_SIZE)
    except Exception as e:
        print(f"{NAME}: Error during face enhancement: {e}")
        return temp_frame


def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
    target_face = get_one_face(temp_frame)
    if target_face is None:
        return temp_frame
    return enhance_face(temp_frame, target_face)


def process_frame_v2(temp_frame: Frame) -> Frame:
    target_face = get_one_face(temp_frame)
    if target_face:
        temp_frame = enhance_face(temp_frame, target_face)
    return temp_frame


def process_frames(
    source_path: str | None, temp_frame_paths: List[str], progress: Any = None
) -> None:
    for temp_frame_path in temp_frame_paths:
        temp_frame = cv2.imread(temp_frame_path)
        if temp_frame is None:
            if progress:
                progress.update(1)
            continue
        result = process_frame(None, temp_frame)
        cv2.imwrite(temp_frame_path, result)
        if progress:
            progress.update(1)


def process_image(source_path: str | None, target_path: str, output_path: str) -> None:
    target_frame = cv2.imread(target_path)
    if target_frame is None:
        print(f"{NAME}: Error: Failed to read target image {target_path}")
        return
    result_frame = process_frame(None, target_frame)
    cv2.imwrite(output_path, result_frame)
    print(f"{NAME}: Enhanced image saved to {output_path}")


def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None:
    modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
|
||||
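`get_enhancer` above follows a lock-guarded lazy initialization pattern: the session is built on first use and shared afterwards. The sketch below isolates that pattern with a fake factory standing in for the ONNX session creation; `get_session`, `fake_factory`, and the `calls` counter are hypothetical names used only to observe how often the expensive step runs.

```python
import threading
from typing import Any, Callable, List, Optional

_SESSION: Optional[Any] = None
_SESSION_LOCK = threading.Lock()


def get_session(factory: Callable[[], Any]) -> Any:
    """Create the shared object once; later callers reuse it."""
    global _SESSION
    with _SESSION_LOCK:
        if _SESSION is None:
            _SESSION = factory()
    return _SESSION


calls: List[int] = []


def fake_factory() -> object:
    """Stand-in for an expensive model load; records each invocation."""
    calls.append(1)
    return object()


threads = [
    threading.Thread(target=get_session, args=(fake_factory,)) for _ in range(8)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len(calls))
```

Holding the lock around both the check and the assignment is what prevents two threads from loading the model at the same time.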
@@ -0,0 +1,125 @@
"""GPEN-BFR-512 face enhancer: ONNX-based face restoration at 512x512."""

from typing import Any, List
import os
import threading

import cv2
import numpy as np

import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
from modules.utilities import (
    is_image,
    is_video,
)
from modules.processors.frame._onnx_enhancer import (
    create_onnx_session,
    warmup_session,
    enhance_face_onnx,
)

NAME = "DLC.FACE-ENHANCER-GPEN512"
INPUT_SIZE = 512
MODEL_URL = "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR/GPEN-BFR-512.onnx"
MODEL_FILE = "GPEN-BFR-512.onnx"

ENHANCER = None
THREAD_LOCK = threading.Lock()

abs_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)


def pre_check() -> bool:
    model_path = os.path.join(models_dir, MODEL_FILE)
    if not os.path.exists(model_path):
        update_status(f"Downloading {MODEL_FILE}...", NAME)
        from modules.utilities import conditional_download
        conditional_download(models_dir, [MODEL_URL])
    return True


def pre_start() -> bool:
    if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
        update_status("Select an image or video for target path.", NAME)
        return False
    return True


def get_enhancer() -> Any:
    global ENHANCER
    with THREAD_LOCK:
        if ENHANCER is None:
            model_path = os.path.join(models_dir, MODEL_FILE)
            if not os.path.exists(model_path):
                from modules.utilities import conditional_download
                conditional_download(models_dir, [MODEL_URL])
            if not os.path.exists(model_path):
                raise FileNotFoundError(f"Model file not found: {model_path}")
            print(f"{NAME}: Loading ONNX model from {model_path}")
            ENHANCER = create_onnx_session(model_path)
            warmup_session(ENHANCER)
            print(f"{NAME}: Model loaded successfully.")
    return ENHANCER


def enhance_face(temp_frame: Frame, face: Face) -> Frame:
    try:
        session = get_enhancer()
    except Exception as e:
        print(f"{NAME}: {e}")
        return temp_frame
    try:
        return enhance_face_onnx(temp_frame, face, session, INPUT_SIZE)
    except Exception as e:
        print(f"{NAME}: Error during face enhancement: {e}")
        return temp_frame


def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
    target_face = get_one_face(temp_frame)
    if target_face is None:
        return temp_frame
    return enhance_face(temp_frame, target_face)


def process_frame_v2(temp_frame: Frame) -> Frame:
    target_face = get_one_face(temp_frame)
    if target_face:
        temp_frame = enhance_face(temp_frame, target_face)
    return temp_frame


def process_frames(
    source_path: str | None, temp_frame_paths: List[str], progress: Any = None
) -> None:
    for temp_frame_path in temp_frame_paths:
        temp_frame = cv2.imread(temp_frame_path)
        if temp_frame is None:
            if progress:
                progress.update(1)
            continue
        result = process_frame(None, temp_frame)
        cv2.imwrite(temp_frame_path, result)
        if progress:
            progress.update(1)


def process_image(source_path: str | None, target_path: str, output_path: str) -> None:
    target_frame = cv2.imread(target_path)
    if target_frame is None:
        print(f"{NAME}: Error: Failed to read target image {target_path}")
        return
    result_frame = process_frame(None, target_frame)
    cv2.imwrite(output_path, result_frame)
    print(f"{NAME}: Enhanced image saved to {output_path}")


def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None:
    modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
|
||||
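The 256 and 512 modules above appear to differ only in four constants, all of which follow from the input size. As a purely hypothetical illustration (not something this diff does), the sketch below derives them from one value; `GpenSpec` and `RELEASE_BASE` are invented names, while the strings they produce are the ones shown in the two files.

```python
from dataclasses import dataclass

# Base of the download URL shared by both MODEL_URL constants in the diff.
RELEASE_BASE = (
    "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR"
)


@dataclass(frozen=True)
class GpenSpec:
    """Hypothetical container for the per-resolution constants."""

    input_size: int

    @property
    def name(self) -> str:
        return f"DLC.FACE-ENHANCER-GPEN{self.input_size}"

    @property
    def model_file(self) -> str:
        return f"GPEN-BFR-{self.input_size}.onnx"

    @property
    def model_url(self) -> str:
        return f"{RELEASE_BASE}/{self.model_file}"


for spec in (GpenSpec(256), GpenSpec(512)):
    print(spec.name, spec.model_file)
```

Whether such a consolidation is worthwhile depends on how the frame-processor loader expects modules to be laid out, which this diff does not show in full.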
@@ -6,24 +6,31 @@ from modules.gpu_processing import gpu_gaussian_blur, gpu_resize, gpu_cvt_color

 def apply_color_transfer(source, target):
     """
-    Apply color transfer from target to source image
+    Apply color transfer from target to source image using LAB color space.
+    Uses float32 throughout for performance (sufficient precision for 8-bit images).
     """
-    source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
-    target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
+    # Convert to float32 [0,1] range for proper LAB conversion
+    source_f32 = source.astype(np.float32) / 255.0
+    target_f32 = target.astype(np.float32) / 255.0

-    source_mean, source_std = cv2.meanStdDev(source)
-    target_mean, target_std = cv2.meanStdDev(target)
+    source_lab = cv2.cvtColor(source_f32, cv2.COLOR_BGR2LAB)
+    target_lab = cv2.cvtColor(target_f32, cv2.COLOR_BGR2LAB)

-    # Reshape mean and std to be broadcastable
-    source_mean = source_mean.reshape(1, 1, 3)
-    source_std = source_std.reshape(1, 1, 3)
-    target_mean = target_mean.reshape(1, 1, 3)
-    target_std = target_std.reshape(1, 1, 3)
+    source_mean, source_std = cv2.meanStdDev(source_lab)
+    target_mean, target_std = cv2.meanStdDev(target_lab)

-    # Perform the color transfer
-    source = (source - source_mean) * (target_std / source_std) + target_mean
+    # Reshape mean and std to be broadcastable (already float64 from meanStdDev, cast to f32)
+    source_mean = source_mean.reshape(1, 1, 3).astype(np.float32)
+    source_std = np.maximum(source_std.reshape(1, 1, 3), 1e-6).astype(np.float32)
+    target_mean = target_mean.reshape(1, 1, 3).astype(np.float32)
+    target_std = target_std.reshape(1, 1, 3).astype(np.float32)

-    return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)
+    # Perform the color transfer in LAB space
+    result_lab = (source_lab - source_mean) * (target_std / source_std) + target_mean
+
+    # Convert back to BGR and uint8
+    result_bgr = cv2.cvtColor(result_lab, cv2.COLOR_LAB2BGR)
+    return np.clip(result_bgr * 255.0, 0, 255).astype(np.uint8)

 def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
     mask = np.zeros(frame.shape[:2], dtype=np.uint8)
|
||||
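The statistics step of `apply_color_transfer` shifts and rescales each channel so its mean and spread match the target. A minimal single-channel sketch follows, using only the standard library; `transfer_channel` is an illustrative name, the population standard deviation is an assumption about what the image-library call computes, and the color-space conversion itself is left out.

```python
from statistics import fmean, pstdev
from typing import List, Sequence


def transfer_channel(
    source: Sequence[float], target: Sequence[float], eps: float = 1e-6
) -> List[float]:
    """Match the mean and standard deviation of `source` to those of `target`."""
    source_mean = fmean(source)
    # Clamp the divisor, as the diff does, so a flat channel cannot divide by zero.
    source_std = max(pstdev(source), eps)
    target_mean = fmean(target)
    target_std = pstdev(target)
    scale = target_std / source_std
    return [(value - source_mean) * scale + target_mean for value in source]


matched = transfer_channel([10.0, 20.0, 30.0], [100.0, 140.0, 180.0])
flat = transfer_channel([5.0, 5.0, 5.0], [100.0, 140.0, 180.0])
print(matched)
print(flat)
```

With a perfectly flat source channel every value equals the mean, so the clamped divisor only avoids the division error and the result collapses to the target mean.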
@@ -48,16 +55,14 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
|
||||
# Create a slightly larger convex hull for padding
|
||||
face_outline = landmarks[0:33]
|
||||
hull = cv2.convexHull(face_outline)
|
||||
hull_padded = []
|
||||
for point in hull:
|
||||
x, y = point[0]
|
||||
center = np.mean(face_outline, axis=0)
|
||||
direction = np.array([x, y]) - center
|
||||
direction = direction / np.linalg.norm(direction)
|
||||
padded_point = np.array([x, y]) + direction * padding
|
||||
hull_padded.append(padded_point)
|
||||
|
||||
hull_padded = np.array(hull_padded, dtype=np.int32)
|
||||
# Vectorized hull padding — expand each point outward from center
|
||||
center = np.mean(face_outline, axis=0, dtype=np.float32)
|
||||
hull_pts = hull.reshape(-1, 2).astype(np.float32)
|
||||
directions = hull_pts - center
|
||||
norms = np.linalg.norm(directions, axis=1, keepdims=True)
|
||||
norms = np.maximum(norms, 1e-6) # avoid division by zero
|
||||
directions /= norms
|
||||
hull_padded = (hull_pts + directions * padding).astype(np.int32)
|
||||
|
||||
# Fill the padded convex hull
|
||||
cv2.fillConvexPoly(mask, hull_padded, 255)
|
||||
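The hull padding above pushes each hull point away from the outline's center by a fixed distance. The sketch below performs the same arithmetic one point at a time in plain Python, which is what the vectorized NumPy version does in bulk; `pad_hull` is an illustrative name, and `int()` stands in for the truncating cast to `int32`.

```python
import math
from typing import List, Sequence, Tuple

Point = Tuple[float, float]


def pad_hull(
    points: Sequence[Point], center: Point, padding: float, eps: float = 1e-6
) -> List[Tuple[int, int]]:
    """Move each point `padding` units away from `center` along its own ray."""
    padded = []
    for x, y in points:
        dx, dy = x - center[0], y - center[1]
        # Clamp the length so a point sitting on the center does not divide by zero.
        norm = max(math.hypot(dx, dy), eps)
        padded.append((int(x + dx / norm * padding), int(y + dy / norm * padding)))
    return padded


hull = [(10.0, 0.0), (0.0, 10.0), (-10.0, 0.0)]
print(pad_hull(hull, (0.0, 0.0), 5.0))
print(pad_hull([(0.0, 0.0)], (0.0, 0.0), 5.0))
```

A point that coincides with the center has no direction to move in, so the clamp leaves it in place rather than producing an invalid coordinate.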
@@ -468,26 +473,28 @@ def apply_mask_area(
|
||||
box_height // modules.globals.mask_feather_ratio,
|
||||
)
|
||||
feathered_mask = cv2.GaussianBlur(
|
||||
polygon_mask.astype(float), (0, 0), feather_amount
|
||||
polygon_mask.astype(np.float32), (0, 0), feather_amount
|
||||
)
|
||||
feathered_mask = feathered_mask / feathered_mask.max()
|
||||
max_val = feathered_mask.max()
|
||||
if max_val > 1e-6:
|
||||
feathered_mask *= np.float32(1.0 / max_val)
|
||||
|
||||
# Apply additional smoothing to the mask edges
|
||||
feathered_mask = cv2.GaussianBlur(feathered_mask, (5, 5), 1)
|
||||
|
||||
face_mask_roi = face_mask[min_y:max_y, min_x:max_x]
|
||||
combined_mask = feathered_mask * (face_mask_roi / 255.0)
|
||||
combined_mask = feathered_mask * (face_mask_roi.astype(np.float32) * np.float32(1.0 / 255.0))
|
||||
|
||||
combined_mask = combined_mask[:, :, np.newaxis]
|
||||
combined_mask_3ch = combined_mask[:, :, np.newaxis]
|
||||
inv_mask = np.float32(1.0) - combined_mask_3ch
|
||||
blended = (
|
||||
color_corrected_area * combined_mask + roi * (1 - combined_mask)
|
||||
color_corrected_area * combined_mask_3ch + roi * inv_mask
|
||||
).astype(np.uint8)
|
||||
|
||||
# Apply face mask to blended result
|
||||
face_mask_3channel = (
|
||||
np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0
|
||||
)
|
||||
final_blend = blended * face_mask_3channel + roi * (1 - face_mask_3channel)
|
||||
face_mask_f32 = face_mask_roi[:, :, np.newaxis].astype(np.float32) * np.float32(1.0 / 255.0)
|
||||
face_mask_3channel = np.broadcast_to(face_mask_f32, blended.shape)
|
||||
final_blend = blended * face_mask_3channel + roi * (np.float32(1.0) - face_mask_3channel)
|
||||
|
||||
frame[min_y:max_y, min_x:max_x] = final_blend.astype(np.uint8)
|
||||
except Exception as e:
|
||||
|
||||
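The `max_val > 1e-6` guard above protects the mask normalization from an all-zero mask, where dividing by the maximum would otherwise produce invalid values. A minimal sketch of that guard on a nested list follows; `normalize_mask` is an illustrative name and the tiny masks are made-up inputs.

```python
from typing import List


def normalize_mask(mask: List[List[float]], eps: float = 1e-6) -> List[List[float]]:
    """Scale the mask so its peak becomes 1.0; leave an empty mask untouched."""
    peak = max(max(row) for row in mask)
    if peak <= eps:
        return [row[:] for row in mask]
    scale = 1.0 / peak
    return [[value * scale for value in row] for row in mask]


print(normalize_mask([[0.0, 0.25], [0.5, 0.1]]))
print(normalize_mask([[0.0, 0.0], [0.0, 0.0]]))
```

Multiplying by a precomputed reciprocal, as the diff does, also avoids one division per element.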
@@ -137,7 +137,9 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
|
||||
return temp_frame
|
||||
|
||||
# Store a copy of the original frame before swapping for opacity blending
|
||||
original_frame = temp_frame.copy()
|
||||
opacity = getattr(modules.globals, "opacity", 1.0)
|
||||
opacity = max(0.0, min(1.0, opacity))
|
||||
original_frame = temp_frame if opacity >= 1.0 else temp_frame.copy()
|
||||
|
||||
# Pre-swap Input Check with optimization
|
||||
if temp_frame.dtype != np.uint8:
|
||||
@@ -240,19 +242,13 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
|
||||
except Exception as e:
|
||||
print(f"Poisson blending failed: {e}")
|
||||
|
||||
# Apply opacity blend between the original frame and the swapped frame
|
||||
opacity = getattr(modules.globals, "opacity", 1.0)
|
||||
# Ensure opacity is within valid range [0.0, 1.0]
|
||||
opacity = max(0.0, min(1.0, opacity))
|
||||
# Apply opacity blend between the original frame and the swapped frame
|
||||
if opacity >= 1.0:
|
||||
return swapped_frame.astype(np.uint8)
|
||||
|
||||
# Blend the original_frame with the (potentially mouth-masked) swapped_frame
|
||||
# Ensure both frames are uint8 before blending
|
||||
final_swapped_frame = gpu_add_weighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0)
|
||||
|
||||
# Ensure final frame is uint8 after blending (addWeighted should preserve it, but belt-and-suspenders)
|
||||
final_swapped_frame = final_swapped_frame.astype(np.uint8)
|
||||
|
||||
return final_swapped_frame
|
||||
return final_swapped_frame.astype(np.uint8)
|
||||
|
||||
|
||||
# --- START: Mac M1-M5 Optimized Face Detection ---
|
||||
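The opacity change above clamps the setting once, skips the frame copy when the swap is fully opaque, and otherwise mixes the two frames with complementary weights. The sketch below shows that control flow on flat lists of pixel values; `clamp_opacity` and `blend_frames` are illustrative names, and rounding to the nearest integer is an assumption about how the project's weighted-add helper behaves.

```python
from typing import List, Sequence


def clamp_opacity(value: float) -> float:
    """Keep the opacity inside [0.0, 1.0]."""
    return max(0.0, min(1.0, value))


def blend_frames(
    original: Sequence[int], swapped: Sequence[int], opacity: float
) -> List[int]:
    """Weighted mix of two frames; full opacity returns the swapped frame as-is."""
    opacity = clamp_opacity(opacity)
    if opacity >= 1.0:
        # No blending needed, so the original frame is never read.
        return list(swapped)
    return [
        round(o * (1.0 - opacity) + s * opacity) for o, s in zip(original, swapped)
    ]


print(blend_frames([0, 100], [200, 100], 0.25))
print(blend_frames([0, 100], [200, 100], 1.7))
print(blend_frames([0, 100], [200, 100], -1.0))
```

Because the early return never touches the original frame, copying it beforehand is only useful when the opacity is below one, which matches the conditional copy in the diff.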
@@ -363,10 +359,8 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda
             pass
         PREVIOUS_FRAME_RESULT = processed_frame.copy()
     else:
-        # If interpolation is off or weight is invalid, just use the current frame
-        # Update state with the current (potentially sharpened) frame
-        # Reset previous frame state if interpolation was just turned off or weight is invalid
-        PREVIOUS_FRAME_RESULT = processed_frame.copy()
+        # Interpolation is off or weight is invalid — no need to cache
+        PREVIOUS_FRAME_RESULT = None


     return final_frame
@@ -1004,7 +998,7 @@ def apply_mouth_area(
         feather_amount = max(1, min(30, feather_base_dim // max(1, mask_feather_ratio))) # Avoid div by zero
         # Ensure kernel size is odd and positive
         kernel_size = 2 * feather_amount + 1
-        feathered_polygon_mask = cv2.GaussianBlur(polygon_mask_roi.astype(float), (kernel_size, kernel_size), 0)
+        feathered_polygon_mask = cv2.GaussianBlur(polygon_mask_roi.astype(np.float32), (kernel_size, kernel_size), 0)

         # Normalize feathered mask to [0.0, 1.0] range
         max_val = feathered_polygon_mask.max()
@@ -1019,9 +1013,9 @@ def apply_mouth_area(
         # Get the corresponding ROI from the *full face mask* (already blurred)
         # Ensure face_mask is float and normalized [0.0, 1.0]
         if face_mask.dtype != np.float64 and face_mask.dtype != np.float32:
-            face_mask_float = face_mask.astype(float) / 255.0
+            face_mask_float = face_mask.astype(np.float32) / 255.0
         else: # Assume already float [0,1] if type is float
-            face_mask_float = face_mask
+            face_mask_float = face_mask.astype(np.float32) if face_mask.dtype == np.float64 else face_mask
         face_mask_roi = face_mask_float[min_y:max_y, min_x:max_x]

         # Combine the feathered mouth polygon mask with the face mask ROI
@@ -1033,14 +1027,14 @@ def apply_mouth_area(
         if len(frame.shape) == 3 and frame.shape[2] == 3:
             combined_mask_3channel = combined_mask[:, :, np.newaxis]

-            # Ensure data types are compatible for blending (float or double for mask, uint8 for images)
-            color_corrected_mouth_uint8 = color_corrected_mouth.astype(np.uint8)
-            roi_uint8 = roi.astype(np.uint8)
-            combined_mask_float = combined_mask_3channel.astype(np.float64) # Use float64 for precision in mask
+            # Ensure data types are compatible for blending
+            # float32 provides sufficient precision for 8-bit image blending
+            combined_mask_f32 = combined_mask_3channel.astype(np.float32)
+            inv_mask = np.float32(1.0) - combined_mask_f32

             # Blend: (original_mouth * combined_mask) + (swapped_face_roi * (1 - combined_mask))
-            blended_roi = (color_corrected_mouth_uint8 * combined_mask_float +
-                           roi_uint8 * (1.0 - combined_mask_float))
+            blended_roi = (color_corrected_mouth * combined_mask_f32 +
+                           roi * inv_mask)

             # Place the blended ROI back into the frame
             frame[min_y:max_y, min_x:max_x] = blended_roi.astype(np.uint8)
+178
-59
@@ -3,7 +3,6 @@ import webbrowser
 import customtkinter as ctk
 from typing import Callable, Tuple
 import cv2
-from cv2_enumerate_cameras import enumerate_cameras # Add this import
 from modules.gpu_processing import gpu_cvt_color, gpu_resize, gpu_flip
 from PIL import Image, ImageOps
 import time
@@ -32,12 +31,36 @@ from modules.utilities import (
 )
 from modules.video_capture import VideoCapturer
 from modules.gettext import LanguageManager
+from modules.ui_tooltip import ToolTip
 from modules import globals
 import platform

 if platform.system() == "Windows":
     from pygrabber.dshow_graph import FilterGraph

+# --- Tk 9.0 compatibility patch ---
+# In Tk 9.0, Menu.index("end") returns "" instead of raising TclError
+# when the menu is empty. CustomTkinter's CTkOptionMenu doesn't handle
+# this, causing crashes. This patch adds the missing guard.
+try:
+    from customtkinter.windows.widgets.core_widget_classes import DropdownMenu as _DropdownMenu
+
+    _original_add_menu_commands = _DropdownMenu._add_menu_commands
+
+    def _patched_add_menu_commands(self, *args, **kwargs):
+        try:
+            end_index = self._menu.index("end")
+            if end_index == "" or end_index is None:
+                return
+        except Exception:
+            pass
+        _original_add_menu_commands(self, *args, **kwargs)
+
+    _DropdownMenu._add_menu_commands = _patched_add_menu_commands
+except (ImportError, AttributeError):
+    pass # CustomTkinter version doesn't have this class path
+# --- End Tk 9.0 patch ---
+
 ROOT = None
 POPUP = None
 POPUP_LIVE = None
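The guard-and-delegate pattern used by the Tk 9.0 patch above can be tried without Tk installed. The sketch below is only an illustration: `FakeMenu` and `FakeDropdown` are hypothetical stand-ins (not CustomTkinter classes), and `FakeMenu.index("end")` returning `""` for an empty menu mirrors the behaviour described in the patch comment rather than anything verified here.

```python
class FakeMenu:
    """Hypothetical stand-in for a Tk menu (not a real tkinter class)."""

    def __init__(self, entries):
        self.entries = list(entries)

    def index(self, what):
        # Mimic the behaviour the patch comment describes: "" for an empty menu.
        if what != "end":
            raise ValueError("only 'end' is supported in this sketch")
        return len(self.entries) - 1 if self.entries else ""


class FakeDropdown:
    """Hypothetical widget whose original method assumes an integer index."""

    def __init__(self, entries):
        self._menu = FakeMenu(entries)

    def _add_menu_commands(self):
        # Would raise TypeError if index("end") returned "".
        return self._menu.index("end") + 1


# Keep a reference to the original method before replacing it.
_original_add_menu_commands = FakeDropdown._add_menu_commands


def _patched_add_menu_commands(self, *args, **kwargs):
    try:
        end_index = self._menu.index("end")
        if end_index == "" or end_index is None:
            return None  # empty menu: skip the original method entirely
    except Exception:
        pass  # if the probe itself fails, fall through to the original
    return _original_add_menu_commands(self, *args, **kwargs)


FakeDropdown._add_menu_commands = _patched_add_menu_commands
```

Unlike the patch in the diff, this sketch returns the original method's result so the effect is observable: an empty `FakeDropdown([])` yields `None`, while a populated one still reaches the original code path.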
@@ -169,11 +192,13 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         root, text=_("Select a face"), cursor="hand2", command=lambda: select_source_path()
     )
     select_face_button.place(relx=0.1, rely=0.30, relwidth=0.3, relheight=0.1)
+    ToolTip(select_face_button, _("Choose the source face image to swap onto the target"))

     swap_faces_button = ctk.CTkButton(
         root, text="↔", cursor="hand2", command=lambda: swap_faces_paths()
     )
     swap_faces_button.place(relx=0.45, rely=0.30, relwidth=0.1, relheight=0.1)
+    ToolTip(swap_faces_button, _("Swap source and target images"))

     select_target_button = ctk.CTkButton(
         root,
@@ -182,6 +207,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         command=lambda: select_target_path(),
     )
     select_target_button.place(relx=0.6, rely=0.30, relwidth=0.3, relheight=0.1)
+    ToolTip(select_target_button, _("Choose the target image or video to apply face swap to"))

     keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
     keep_fps_checkbox = ctk.CTkSwitch(
@@ -195,6 +221,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     keep_fps_checkbox.place(relx=0.1, rely=0.5)
+    ToolTip(keep_fps_checkbox, _("Output video keeps the original frame rate"))

     keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames)
     keep_frames_switch = ctk.CTkSwitch(
@@ -208,6 +235,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     keep_frames_switch.place(relx=0.1, rely=0.55)
+    ToolTip(keep_frames_switch, _("Keep extracted frames on disk after processing"))

     enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui["face_enhancer"])
     enhancer_switch = ctk.CTkSwitch(
@@ -221,6 +249,35 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     enhancer_switch.place(relx=0.1, rely=0.6)
+    ToolTip(enhancer_switch, _("Improve face quality using the GFPGAN restoration model"))
+
+    gpen256_value = ctk.BooleanVar(value=modules.globals.fp_ui.get("face_enhancer_gpen256", False))
+    gpen256_switch = ctk.CTkSwitch(
+        root,
+        text=_("GPEN Enhancer 256"),
+        variable=gpen256_value,
+        cursor="hand2",
+        command=lambda: (
+            update_tumbler("face_enhancer_gpen256", gpen256_value.get()),
+            save_switch_states(),
+        ),
+    )
+    gpen256_switch.place(relx=0.1, rely=0.65)
+    ToolTip(gpen256_switch, _("Use GPEN face enhancement model at 256px resolution (faster)"))
+
+    gpen512_value = ctk.BooleanVar(value=modules.globals.fp_ui.get("face_enhancer_gpen512", False))
+    gpen512_switch = ctk.CTkSwitch(
+        root,
+        text=_("GPEN Enhancer 512"),
+        variable=gpen512_value,
+        cursor="hand2",
+        command=lambda: (
+            update_tumbler("face_enhancer_gpen512", gpen512_value.get()),
+            save_switch_states(),
+        ),
+    )
+    gpen512_switch.place(relx=0.1, rely=0.7)
+    ToolTip(gpen512_switch, _("Use GPEN face enhancement model at 512px resolution (higher quality)"))

     keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
     keep_audio_switch = ctk.CTkSwitch(
@@ -234,6 +291,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     keep_audio_switch.place(relx=0.6, rely=0.5)
+    ToolTip(keep_audio_switch, _("Copy audio track from the source video to output"))

     many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces)
     many_faces_switch = ctk.CTkSwitch(
@@ -247,6 +305,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     many_faces_switch.place(relx=0.6, rely=0.55)
+    ToolTip(many_faces_switch, _("Swap every detected face, not just the primary one"))

     color_correction_value = ctk.BooleanVar(value=modules.globals.color_correction)
     color_correction_switch = ctk.CTkSwitch(
@@ -260,6 +319,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     color_correction_switch.place(relx=0.6, rely=0.6)
+    ToolTip(color_correction_switch, _("Fix blue/green color cast from some webcams"))

     # nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw_filter)
     # nsfw_switch = ctk.CTkSwitch(root, text='NSFW filter', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw_filter', nsfw_value.get()))
@@ -277,7 +337,8 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
             close_mapper_window() if not map_faces.get() else None
         ),
     )
-    map_faces_switch.place(relx=0.1, rely=0.65)
+    map_faces_switch.place(relx=0.1, rely=0.75)
+    ToolTip(map_faces_switch, _("Manually assign which source face maps to which target face"))

     poisson_blend_value = ctk.BooleanVar(value=modules.globals.poisson_blend)
     poisson_blend_switch = ctk.CTkSwitch(
@@ -290,7 +351,8 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
             save_switch_states(),
         ),
     )
-    poisson_blend_switch.place(relx=0.1, rely=0.7)
+    poisson_blend_switch.place(relx=0.1, rely=0.8)
+    ToolTip(poisson_blend_switch, _("Blend face edges smoothly using Poisson blending"))

     show_fps_value = ctk.BooleanVar(value=modules.globals.show_fps)
     show_fps_switch = ctk.CTkSwitch(
@@ -304,6 +366,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     show_fps_switch.place(relx=0.6, rely=0.65)
+    ToolTip(show_fps_switch, _("Display frames-per-second counter on the live preview"))

     mouth_mask_var = ctk.BooleanVar(value=modules.globals.mouth_mask)
     mouth_mask_switch = ctk.CTkSwitch(
@@ -314,6 +377,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         command=lambda: setattr(modules.globals, "mouth_mask", mouth_mask_var.get()),
     )
     mouth_mask_switch.place(relx=0.1, rely=0.45)
+    ToolTip(mouth_mask_switch, _("Preserve original mouth movement in the swapped face"))

     show_mouth_mask_box_var = ctk.BooleanVar(value=modules.globals.show_mouth_mask_box)
     show_mouth_mask_box_switch = ctk.CTkSwitch(
@@ -326,21 +390,25 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     show_mouth_mask_box_switch.place(relx=0.6, rely=0.45)
+    ToolTip(show_mouth_mask_box_switch, _("Display the mouth mask boundary for debugging"))

     start_button = ctk.CTkButton(
         root, text=_("Start"), cursor="hand2", command=lambda: analyze_target(start, root)
     )
     start_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05)
+    ToolTip(start_button, _("Begin processing the target image/video with selected face"))

     stop_button = ctk.CTkButton(
         root, text=_("Destroy"), cursor="hand2", command=lambda: destroy()
     )
     stop_button.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05)
+    ToolTip(stop_button, _("Stop processing and close the application"))

     preview_button = ctk.CTkButton(
         root, text=_("Preview"), cursor="hand2", command=lambda: toggle_preview()
     )
     preview_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
+    ToolTip(preview_button, _("Show/hide a preview of the processed output"))

     # --- Camera Selection ---
     camera_label = ctk.CTkLabel(root, text=_("Select Camera:"))
@@ -364,6 +432,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
     )

     camera_optionmenu.place(relx=0.35, rely=0.92, relwidth=0.25, relheight=0.05)
+    ToolTip(camera_optionmenu, _("Select which camera to use for live mode"))

     live_button = ctk.CTkButton(
         root,
@@ -384,6 +453,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         ),
     )
     live_button.place(relx=0.65, rely=0.92, relwidth=0.2, relheight=0.05)
+    ToolTip(live_button, _("Start real-time face swap using webcam"))
     # --- End Camera Selection ---

     # 1) Define a DoubleVar for transparency (0 = fully transparent, 1 = fully opaque)
@@ -424,6 +494,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         corner_radius=3,
     )
     transparency_slider.place(relx=0.35, rely=0.77, relwidth=0.5, relheight=0.02)
+    ToolTip(transparency_slider, _("Blend between original and swapped face (0% = original, 100% = fully swapped)"))

     # 3) Sharpness label & slider
     sharpness_var = ctk.DoubleVar(value=0.0) # start at 0.0
@@ -449,6 +520,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
         corner_radius=3,
     )
     sharpness_slider.place(relx=0.35, rely=0.82, relwidth=0.5, relheight=0.02)
+    ToolTip(sharpness_slider, _("Sharpen the enhanced face output"))

     # Status and link at the bottom
     global status_label
@@ -922,21 +994,13 @@ def get_available_cameras():
         camera_indices = []
         camera_names = []

-        if platform.system() == "Darwin": # macOS specific handling
-            # Try to open the default FaceTime camera first
-            cap = cv2.VideoCapture(0)
-            if cap.isOpened():
-                camera_indices.append(0)
-                camera_names.append("FaceTime Camera")
-                cap.release()
-
-            # On macOS, additional cameras typically use indices 1 and 2
-            for i in [1, 2]:
-                cap = cv2.VideoCapture(i)
-                if cap.isOpened():
-                    camera_indices.append(i)
-                    camera_names.append(f"Camera {i}")
-                    cap.release()
+        if platform.system() == "Darwin":
+            # Do NOT probe cameras with cv2.VideoCapture on macOS — probing
+            # invalid indices triggers the OBSENSOR backend and causes SIGSEGV.
+            # Default to indices 0 and 1 (covers FaceTime + one USB camera).
+            # The user can select the correct index from the UI dropdown.
+            camera_indices = [0, 1]
+            camera_names = ["Camera 0", "Camera 1"]
         else:
             # Linux camera detection - test first 10 indices
             for i in range(10):
@@ -974,28 +1038,48 @@ def _capture_thread_func(cap, capture_queue, stop_event):
             pass


-# How often to run full face detection. On intermediate frames the last
-# detected face positions are reused, which significantly reduces the
-# per-frame cost of the processing thread.
-DETECT_EVERY_N = 2
+def _detection_thread_func(latest_frame_holder, detection_result, detection_lock, stop_event):
+    """Detection thread: continuously runs face detection on the latest
+    captured frame and stores results in detection_result under detection_lock.
+
+    This decouples face detection (~15-30ms) from face swapping (~5-10ms)
+    so the swap loop never blocks on detection, significantly improving
+    live mode FPS."""
+    while not stop_event.is_set():
+        with detection_lock:
+            frame = latest_frame_holder[0]
+
+        if frame is None:
+            time.sleep(0.005)
+            continue
+
+        if modules.globals.many_faces:
+            many = get_many_faces(frame)
+            with detection_lock:
+                detection_result['target_face'] = None
+                detection_result['many_faces'] = many
+        else:
+            face = get_one_face(frame)
+            with detection_lock:
+                detection_result['target_face'] = face
+                detection_result['many_faces'] = None


-def _processing_thread_func(capture_queue, processed_queue, stop_event):
-    """Processing thread: takes raw frames from capture_queue, applies face
-    processing, and puts results into processed_queue. Drops processed frames
-    when the output queue is full so the UI always gets the latest result.
+def _processing_thread_func(capture_queue, processed_queue, stop_event,
+                            latest_frame_holder, detection_result, detection_lock):
+    """Processing thread: takes raw frames from capture_queue, reads the
+    latest detection result from the shared detection_result dict, applies
+    face swap/enhancement, and puts results into processed_queue.

-    Uses DETECT_EVERY_N to skip expensive face detection on intermediate
-    frames, reusing cached face positions instead."""
+    Face detection runs concurrently in _detection_thread_func — this thread
+    only reads cached results so it never blocks on detection."""
     frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
     source_image = None
+    last_source_path = None
     prev_time = time.time()
     fps_update_interval = 0.5
     frame_count = 0
     fps = 0
-    proc_frame_index = 0
-    cached_target_face = None # cached single-face result
-    cached_many_faces = None # cached many-faces result

     while not stop_event.is_set():
         try:
@@ -1003,32 +1087,37 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event):
         except queue.Empty:
             continue

-        temp_frame = frame.copy()
-        run_detection = (proc_frame_index % DETECT_EVERY_N == 0)
-        proc_frame_index += 1
+        temp_frame = frame

         if modules.globals.live_mirror:
             temp_frame = gpu_flip(temp_frame, 1)

+        # Publish the mirrored frame for the detection thread to pick up
+        with detection_lock:
+            latest_frame_holder[0] = temp_frame
+
         if not modules.globals.map_faces:
-            if source_image is None and modules.globals.source_path:
+            if modules.globals.source_path and modules.globals.source_path != last_source_path:
+                last_source_path = modules.globals.source_path
                 source_image = get_one_face(cv2.imread(modules.globals.source_path))

-            # Update face detection cache on detection frames
-            if run_detection or (cached_target_face is None and cached_many_faces is None):
-                if modules.globals.many_faces:
-                    cached_many_faces = get_many_faces(temp_frame)
-                    cached_target_face = None
-                else:
-                    cached_target_face = get_one_face(temp_frame)
-                    cached_many_faces = None
+            # Read latest detection results (brief lock to avoid blocking detection thread)
+            with detection_lock:
+                cached_target_face = detection_result.get('target_face')
+                cached_many_faces = detection_result.get('many_faces')

             for frame_processor in frame_processors:
                 if frame_processor.NAME == "DLC.FACE-ENHANCER":
                     if modules.globals.fp_ui["face_enhancer"]:
                         temp_frame = frame_processor.process_frame(None, temp_frame)
+                elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN256":
+                    if modules.globals.fp_ui.get("face_enhancer_gpen256", False):
+                        temp_frame = frame_processor.process_frame(None, temp_frame)
+                elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN512":
+                    if modules.globals.fp_ui.get("face_enhancer_gpen512", False):
+                        temp_frame = frame_processor.process_frame(None, temp_frame)
                 elif frame_processor.NAME == "DLC.FACE-SWAPPER":
-                    # Use cached face positions to skip redundant detection
+                    # Use cached face positions from detection thread
                     swapped_bboxes = []
                     if modules.globals.many_faces and cached_many_faces:
                         result = temp_frame.copy()
@@ -1051,6 +1140,10 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event):
                 if frame_processor.NAME == "DLC.FACE-ENHANCER":
                     if modules.globals.fp_ui["face_enhancer"]:
                         temp_frame = frame_processor.process_frame_v2(temp_frame)
+                elif frame_processor.NAME in ("DLC.FACE-ENHANCER-GPEN256", "DLC.FACE-ENHANCER-GPEN512"):
+                    fp_key = frame_processor.NAME.split(".")[-1].lower().replace("-", "_")
+                    if modules.globals.fp_ui.get(fp_key, False):
+                        temp_frame = frame_processor.process_frame_v2(temp_frame)
                 else:
                     temp_frame = frame_processor.process_frame_v2(temp_frame)

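The `fp_key` expression in the hunk above derives the `fp_ui` dictionary key from the processor's `NAME`. A minimal sketch of that string transformation in isolation follows; the helper name `fp_ui_key` is hypothetical and does not appear in the diff.

```python
def fp_ui_key(processor_name: str) -> str:
    """Map a processor NAME such as "DLC.FACE-ENHANCER-GPEN256" to its fp_ui key.

    Hypothetical helper: takes the part after the last dot, lower-cases it,
    and replaces hyphens with underscores, as the diff's fp_key line does.
    """
    return processor_name.split(".")[-1].lower().replace("-", "_")


if __name__ == "__main__":
    for name in ("DLC.FACE-ENHANCER-GPEN256", "DLC.FACE-ENHANCER-GPEN512"):
        print(name, "->", fp_ui_key(name))
```

The resulting keys match the ones the GPEN switches write via `update_tumbler` (`face_enhancer_gpen256`, `face_enhancer_gpen512`), which is what lets one `elif` branch serve both enhancers.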
@@ -1104,6 +1197,14 @@ def create_webcam_preview(camera_index: int):
     processed_queue = queue.Queue(maxsize=2)
     stop_event = threading.Event()

+    # Shared state for the detection pipeline.
+    # latest_frame_holder[0] is the most recent raw frame for the detection
+    # thread; detection_result holds the last detected faces for the
+    # processing thread to read. Both are guarded by detection_lock.
+    detection_lock = threading.Lock()
+    latest_frame_holder = [None]
+    detection_result = {'target_face': None, 'many_faces': None}
+
     # Start capture thread
     cap_thread = threading.Thread(
         target=_capture_thread_func,
@@ -1112,21 +1213,45 @@ def create_webcam_preview(camera_index: int):
     )
     cap_thread.start()

+    # Start detection thread — runs face detection asynchronously so the
+    # processing/swap thread never blocks on it
+    det_thread = threading.Thread(
+        target=_detection_thread_func,
+        args=(latest_frame_holder, detection_result, detection_lock, stop_event),
+        daemon=True,
+    )
+    det_thread.start()
+
     # Start processing thread
     proc_thread = threading.Thread(
         target=_processing_thread_func,
-        args=(capture_queue, processed_queue, stop_event),
+        args=(capture_queue, processed_queue, stop_event,
+              latest_frame_holder, detection_result, detection_lock),
         daemon=True,
     )
     proc_thread.start()

-    # Main (UI) thread: pull processed frames and update the display
-    while not stop_event.is_set():
+    # Cleanup helper called from the display loop when preview closes
+    def _cleanup():
+        stop_event.set()
+        cap_thread.join(timeout=2.0)
+        det_thread.join(timeout=2.0)
+        proc_thread.join(timeout=2.0)
+        cap.release()
+        PREVIEW.withdraw()
+
+    # Non-blocking display loop using ROOT.after() — avoids blocking the
+    # Tk event loop which could cause UI freezes or re-entrancy issues
+    def _display_next_frame():
+        if stop_event.is_set() or PREVIEW.state() == "withdrawn":
+            _cleanup()
+            return
+
         try:
-            temp_frame = processed_queue.get(timeout=0.03)
+            temp_frame = processed_queue.get_nowait()
         except queue.Empty:
-            ROOT.update()
-            continue
+            ROOT.after(16, _display_next_frame)
+            return

         if modules.globals.live_resizable:
             temp_frame = fit_image_to_size(
@@ -1144,17 +1269,11 @@ def create_webcam_preview(camera_index: int):
         )
         image = ctk.CTkImage(image, size=image.size)
         preview_label.configure(image=image)
-        ROOT.update()

-        if PREVIEW.state() == "withdrawn":
-            break
+        ROOT.after(16, _display_next_frame)

-    # Signal threads to stop and wait for them
-    stop_event.set()
-    cap_thread.join(timeout=2.0)
-    proc_thread.join(timeout=2.0)
-    cap.release()
-    PREVIEW.withdraw()
+    # Kick off the non-blocking display loop
+    ROOT.after(0, _display_next_frame)


 def create_source_target_popup_for_webcam(
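The hand-off between the processing thread and the detection thread in this file follows a "latest value holder" pattern: one side publishes the newest frame under a lock, the other reads it, does the slow work outside the lock, and writes the result back under the same lock. Below is a minimal, self-contained sketch of that pattern. It assumes plain strings in place of frames and a caller-supplied `detect` callable in place of `get_one_face`; `detection_worker` and `run_demo` are hypothetical names, not functions from the diff.

```python
import threading
import time


def detection_worker(holder, result, lock, stop_event, detect):
    """Run `detect` on the newest published item until stop_event is set."""
    while not stop_event.is_set():
        with lock:
            frame = holder[0]  # read the latest published item
        if frame is None:
            time.sleep(0.005)  # nothing published yet; back off briefly
            continue
        found = detect(frame)  # the slow step runs outside the lock
        with lock:
            result["target_face"] = found


def run_demo():
    lock = threading.Lock()
    stop_event = threading.Event()
    holder = [None]
    result = {"target_face": None}

    worker = threading.Thread(
        target=detection_worker,
        args=(holder, result, lock, stop_event, lambda f: f"face-in-{f}"),
        daemon=True,
    )
    worker.start()

    # Publisher side: overwrite the single slot with the newest item.
    with lock:
        holder[0] = "frame-1"

    # Consumer side: poll the shared result, holding the lock only briefly.
    seen = None
    deadline = time.time() + 2.0
    while time.time() < deadline:
        with lock:
            seen = result["target_face"]
        if seen is not None:
            break
        time.sleep(0.005)

    stop_event.set()
    worker.join(timeout=2.0)
    return seen


if __name__ == "__main__":
    print(run_demo())
```

Because the holder has a single slot, the worker never builds a backlog: if detection is slower than capture, intermediate frames are simply skipped, and the consumer may apply a result that is a frame or two old.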
@@ -0,0 +1,74 @@
+"""Lightweight hover tooltip for CustomTkinter widgets."""
+
+import customtkinter as ctk
+
+
+class ToolTip:
+    """Show a floating tooltip popup when the user hovers over a widget.
+
+    Usage:
+        ToolTip(my_button, "Helpful description text")
+    """
+
+    def __init__(self, widget: ctk.CTkBaseClass, text: str, delay: int = 500):
+        self._widget = widget
+        self._text = text
+        self._delay = delay
+        self._tooltip_window = None
+        self._after_id = None
+
+        widget.bind("<Enter>", self._schedule_show, add="+")
+        widget.bind("<Leave>", self._hide, add="+")
+
+    def _schedule_show(self, event=None):
+        self._cancel()
+        self._after_id = self._widget.after(self._delay, self._show)
+
+    def _show(self):
+        if self._tooltip_window is not None:
+            return
+
+        x = self._widget.winfo_rootx() + 20
+        y = self._widget.winfo_rooty() + self._widget.winfo_height() + 5
+
+        self._tooltip_window = tw = ctk.CTkToplevel(self._widget)
+        tw.withdraw()
+        tw.overrideredirect(True)
+
+        label = ctk.CTkLabel(
+            tw,
+            text=self._text,
+            fg_color="#333333",
+            text_color="#EEEEEE",
+            corner_radius=6,
+            padx=8,
+            pady=4,
+        )
+        label.pack()
+
+        tw.update_idletasks()
+
+        # Clamp to screen bounds
+        screen_w = tw.winfo_screenwidth()
+        screen_h = tw.winfo_screenheight()
+        tip_w = tw.winfo_reqwidth()
+        tip_h = tw.winfo_reqheight()
+
+        if x + tip_w > screen_w:
+            x = screen_w - tip_w - 5
+        if y + tip_h > screen_h:
+            y = self._widget.winfo_rooty() - tip_h - 5
+
+        tw.geometry(f"+{x}+{y}")
+        tw.deiconify()
+
+    def _hide(self, event=None):
+        self._cancel()
+        if self._tooltip_window is not None:
+            self._tooltip_window.destroy()
+            self._tooltip_window = None
+
+    def _cancel(self):
+        if self._after_id is not None:
+            self._widget.after_cancel(self._after_id)
+            self._after_id = None
+16
-7
@@ -15,10 +15,6 @@ import modules.globals
 TEMP_FILE = "temp.mp4"
 TEMP_DIRECTORY = "temp"

-# monkey patch ssl for mac
-if platform.system().lower() == "darwin":
-    ssl._create_default_https_context = ssl._create_unverified_context
-

 def run_ffmpeg(args: List[str]) -> bool:
     """Run ffmpeg with hardware acceleration and optimized settings."""
@@ -286,8 +282,15 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None:
             download_directory_path, os.path.basename(url)
         )
         if not os.path.exists(download_file_path):
-            request = urllib.request.urlopen(url) # type: ignore[attr-defined]
-            total = int(request.headers.get("Content-Length", 0))
+            request = urllib.request.Request(url)
+
+            # Create a specific SSL context for macOS to avoid globally disabling verification
+            ctx = None
+            if platform.system().lower() == "darwin":
+                ctx = ssl._create_unverified_context()
+
+            response = urllib.request.urlopen(request, context=ctx)
+            total = int(response.headers.get("Content-Length", 0))
             with tqdm(
                 total=total,
                 desc="Downloading",
@@ -295,7 +298,13 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None:
                 unit_scale=True,
                 unit_divisor=1024,
             ) as progress:
-                urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
+                with open(download_file_path, "wb") as f:
+                    while True:
+                        buffer = response.read(8192)
+                        if not buffer:
+                            break
+                        f.write(buffer)
+                        progress.update(len(buffer))


 def resolve_relative_path(path: str) -> str:
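The download change above replaces a process-wide SSL override with a per-request context and a manual chunked copy. The sketch below isolates those two ideas so they can be exercised without network access. It is an illustration under stated assumptions: `make_context` and `copy_in_chunks` are hypothetical helper names, and the context is built with the public `ssl.create_default_context()` API rather than the private `ssl._create_unverified_context()` call used in the diff.

```python
import io
import ssl
from typing import BinaryIO, Callable, Optional


def make_context(skip_verification: bool) -> Optional[ssl.SSLContext]:
    """Return a context for a single request; None means default verification."""
    if not skip_verification:
        return None
    ctx = ssl.create_default_context()
    ctx.check_hostname = False  # must be disabled before relaxing verify_mode
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def copy_in_chunks(
    response: BinaryIO,
    out: BinaryIO,
    on_progress: Callable[[int], None],
    chunk_size: int = 8192,
) -> int:
    """Copy `response` to `out` chunk by chunk, reporting each chunk's real size."""
    total = 0
    while True:
        buffer = response.read(chunk_size)
        if not buffer:
            break
        out.write(buffer)
        total += len(buffer)
        on_progress(len(buffer))  # actual bytes read, so the last chunk is exact
    return total


if __name__ == "__main__":
    source = io.BytesIO(b"x" * 20000)  # stands in for an HTTP response body
    sink = io.BytesIO()
    steps = []
    copied = copy_in_chunks(source, sink, steps.append)
    print(copied, steps)
```

Reporting `len(buffer)` rather than a fixed block size keeps the progress total equal to the bytes written, which the earlier `urlretrieve` hook (updating by `block_size` on every call) did not guarantee for the final partial block. Skipping certificate verification remains a security trade-off even when scoped to one request.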