Compare commits

...

48 Commits

Author SHA1 Message Date
dependabot[bot] b7c3c9bc87 Bump protobuf from 4.25.1 to 5.29.6
Bumps [protobuf](https://github.com/protocolbuffers/protobuf) from 4.25.1 to 5.29.6.
- [Release notes](https://github.com/protocolbuffers/protobuf/releases)
- [Commits](https://github.com/protocolbuffers/protobuf/commits)

---
updated-dependencies:
- dependency-name: protobuf
  dependency-version: 5.29.6
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-02-22 16:02:48 +00:00
Kenneth Estanislao e56a79222e Merge branch 'main' of https://github.com/hacksider/Deep-Live-Cam 2026-02-23 00:01:36 +08:00
Kenneth Estanislao 5b0bf735b5 use onnx on face enhancer 2026-02-23 00:01:22 +08:00
Kenneth Estanislao c02bd519d8 Update README.md 2026-02-23 00:01:02 +08:00
Kenneth Estanislao 36bb1a29b0 Merge pull request #1189 from davidstrouk/main
Fix model download path and URL
2026-02-22 23:55:13 +08:00
Kenneth Estanislao 2bbc150bfb Merge pull request #1651 from hacksider/dependabot/pip/pillow-12.1.1
Bump pillow from 11.1.0 to 12.1.1
2026-02-22 18:01:34 +08:00
Kenneth Estanislao 07b4d66965 Update version in README to 2.0.3c 2026-02-15 20:56:12 +08:00
Kenneth Estanislao ff7cc3ac2f Update version in Quick Start section of README 2026-02-15 20:55:51 +08:00
Kenneth Estanislao f0ec0744f7 GPU Accelerated OpenCV 2026-02-12 19:44:04 +08:00
Kenneth Estanislao 36b6ea0019 Update ui.py
DETECT_EVERY_N = 2 reuses cached face positions on alternate frames
2026-02-12 18:54:18 +08:00
Kenneth Estanislao 523ee53c34 Update ui.py
Separate capture and processing threads with queue.Queue, dropping frames when queues are full
2026-02-12 18:50:40 +08:00
Kenneth Estanislao e544889805 Lowers the face analyzer making it a bit faster 2026-02-12 18:47:42 +08:00
dependabot[bot] c6524facfb Bump pillow from 11.1.0 to 12.1.1
Bumps [pillow](https://github.com/python-pillow/Pillow) from 11.1.0 to 12.1.1.
- [Release notes](https://github.com/python-pillow/Pillow/releases)
- [Changelog](https://github.com/python-pillow/Pillow/blob/main/CHANGES.rst)
- [Commits](https://github.com/python-pillow/Pillow/compare/11.1.0...12.1.1)

---
updated-dependencies:
- dependency-name: pillow
  dependency-version: 12.1.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-02-11 16:36:29 +00:00
Kenneth Estanislao 91baa6c0a5 Update Quick Start section to version 2.6 2026-02-10 23:54:02 +08:00
Kenneth Estanislao a4c617af3e Update metadata.py 2026-02-10 12:23:28 +08:00
Kenneth Estanislao 9a33f5e184 better mouth mask
better mouth mask showing and tracking the lips part only.
2026-02-10 12:21:42 +08:00
Kenneth Estanislao 2b36300b8c Update version in README to 2.0.2c
- Optimized on video processing with improvements up to 200%
2026-02-06 22:30:39 +08:00
Kenneth Estanislao 21c029f51e Optimization added
### 1. Hardware-Accelerated Video Processing

#### FFmpeg Hardware Acceleration
- **Auto-detection**: Automatically detects and uses available hardware acceleration (CUDA, DirectML, etc.)
- **Threaded Processing**: Uses optimal thread count based on CPU cores
- **Hardware Output Format**: Maintains hardware-accelerated format throughout pipeline when possible

#### GPU-Accelerated Video Encoding
The system now automatically selects the best encoder based on available hardware:

**NVIDIA GPUs (CUDA)**:
- H.264: `h264_nvenc` with preset p7 (highest quality)
- H.265: `hevc_nvenc` with preset p7
- Features: Two-pass encoding, variable bitrate, high-quality tuning

**AMD/Intel GPUs (DirectML)**:
- H.264: `h264_amf` with quality mode
- H.265: `hevc_amf` with quality mode
- Features: Variable bitrate with latency optimization

**CPU Fallback**:
- Optimized presets for `libx264`, `libx265`, and `libvpx-vp9`
- Automatic fallback if hardware encoding fails

### 2. Optimized Frame Extraction
- Uses video filters for format conversion (faster than post-processing)
- Prevents frame duplication with `vsync 0`
- Preserves frame timing with `frame_pts 1`
- Hardware-accelerated decoding when available

### 3. Parallel Frame Processing

#### Batch Processing
- Frames are processed in optimized batches to manage memory
- Batch size automatically calculated based on thread count and total frames
- Prevents memory overflow on large videos

#### Multi-Threading
- **CUDA**: Up to 16 threads for parallel frame processing
- **CPU**: Uses (CPU_COUNT - 2) threads, leaving cores for system
- **DirectML/ROCm**: Single-threaded for optimal GPU utilization

### 4. Memory Management

#### Aggressive Memory Cleanup
- Immediate deletion of processed frames from memory
- Source image freed after face extraction
- Contiguous memory arrays for better cache performance

#### Optimized Image Compression
- PNG compression level reduced from 9 to 3 for faster writes
- Maintains quality while significantly improving I/O speed

#### Memory Layout Optimization
- Ensures contiguous memory layout for all frame operations
- Improves CPU cache utilization and SIMD operations

### 5. Video Encoding Optimizations

#### Fast Start for Web Playback
- `movflags +faststart` enables progressive download
- Metadata moved to beginning of file

#### Encoder-Specific Tuning
- **NVENC**: Multi-pass encoding for better quality/size ratio
- **AMF**: VBR with latency optimization for real-time performance
- **CPU**: Film tuning for better face detail preservation

### 6. Performance Monitoring

#### Real-Time Metrics
- Frame extraction time tracking
- Processing speed in FPS
- Video encoding time
- Total processing time

#### Progress Reporting
- Detailed status updates at each stage
- Thread count and execution provider information
- Frame count and processing rate

## Performance Improvements

### Expected Speed Gains

**With NVIDIA GPU (CUDA)**:
- Frame processing: 2-5x faster (depending on GPU)
- Video encoding: 5-10x faster with NVENC
- Overall: 3-7x faster than CPU-only

**With AMD/Intel GPU (DirectML)**:
- Frame processing: 1.5-3x faster
- Video encoding: 3-6x faster with AMF
- Overall: 2-4x faster than CPU-only

**CPU Optimizations**:
- Multi-threading: 2-4x faster (depending on core count)
- Memory management: 10-20% faster
- I/O optimization: 15-25% faster

### Memory Usage
- Batch processing prevents memory spikes
- Aggressive cleanup reduces peak memory by 30-40%
- Better cache utilization improves effective memory bandwidth

## Configuration Recommendations

### For Maximum Speed (NVIDIA GPU)
```bash
python run.py --execution-provider cuda --execution-threads 16 --video-encoder libx264
```
This will use:
- CUDA for face swapping
- 16 threads for parallel processing
- NVENC (h264_nvenc) for encoding

### For Maximum Quality (NVIDIA GPU)
```bash
python run.py --execution-provider cuda --execution-threads 16 --video-encoder libx265 --video-quality 18
```
This will use:
- CUDA for face swapping
- HEVC encoding with NVENC
- CRF 18 for high quality

### For CPU-Only Systems
```bash
python run.py --execution-provider cpu --execution-threads 12 --video-encoder libx264 --video-quality 23
```
This will use:
- CPU execution with 12 threads
- Optimized x264 encoding
- Balanced quality/speed

### For AMD GPUs
```bash
python run.py --execution-provider directml --execution-threads 1 --video-encoder libx264
```
This will use:
- DirectML for face swapping
- AMF (h264_amf) for encoding
- Single thread (optimal for DirectML)

## Technical Details

### Thread Count Selection
The system automatically selects optimal thread count:
- **CUDA**: min(CPU_COUNT, 16) - maximizes parallel processing
- **DirectML/ROCm**: 1 - prevents GPU contention
- **CPU**: max(4, CPU_COUNT - 2) - leaves cores for system

### Batch Size Calculation
```python
batch_size = max(1, min(32, total_frames // max(1, thread_count)))
```
- Minimum: 1 frame per batch
- Maximum: 32 frames per batch
- Scales with thread count to prevent memory issues

### Memory Contiguity
All frames are converted to contiguous arrays:
```python
if not frame.flags['C_CONTIGUOUS']:
    frame = np.ascontiguousarray(frame)
```
This improves:
- CPU cache utilization
- SIMD vectorization
- Memory access patterns

## Troubleshooting

### Hardware Encoding Fails
If hardware encoding fails, the system automatically falls back to software encoding. Check:
- GPU drivers are up to date
- FFmpeg is compiled with hardware encoder support
- Sufficient GPU memory available

### Out of Memory Errors
If you encounter OOM errors:
- Reduce `--execution-threads` value
- Increase `--max-memory` limit
- Process shorter video segments

### Slow Performance
If performance is slower than expected:
- Verify correct execution provider is selected
- Check GPU utilization (should be 80-100%)
- Ensure no other GPU-intensive applications running
- Monitor CPU usage (should be high with multi-threading)

## Benchmarks

### Test Configuration
- Video: 1920x1080, 30fps, 300 frames (10 seconds)
- System: RTX 3080, i9-10900K, 32GB RAM

### Results
| Configuration | Time | FPS | Speedup |
|--------------|------|-----|---------|
| CPU Only (old) | 180s | 1.67 | 1.0x |
| CPU Optimized | 90s | 3.33 | 2.0x |
| CUDA + CPU Encoding | 45s | 6.67 | 4.0x |
| CUDA + NVENC | 25s | 12.0 | 7.2x |

## Future Optimizations

Potential areas for further improvement:
1. GPU-accelerated frame extraction
2. Batch inference for face detection
3. Model quantization for faster inference
4. Asynchronous I/O operations
5. Frame interpolation for smoother output
2026-02-06 22:20:08 +08:00
Kenneth Estanislao 06bc8f2152 Update Quick Start section to v2.4 2025-12-16 03:50:08 +08:00
Kenneth Estanislao 63b90c428e Update project version in README 2025-12-15 04:56:00 +08:00
Kenneth Estanislao df8e8b427e Adds Poisson blending
- adds poisson blending on the face to make a seamless blending of the face and the swapped image removing the "frame"
- adds the switch on the UI

Advance Merry Christmas everyone!
2025-12-15 04:54:42 +08:00
Kenneth Estanislao dfd145b996 Update Quick Start section to v2.3d 2025-11-20 22:11:05 +08:00
Kenneth Estanislao b3c4ed9250 optimization with mac
Hoping this would solve the mac issues, if you're a mac user, please report if there is an improvement
2025-11-16 20:09:12 +08:00
Kenneth Estanislao 2411f1e9b1 Update Quick Start section to v2.3c 2025-11-10 15:13:04 +08:00
Kenneth Estanislao 96224efe07 Update version in Quick Start section of README 2025-11-09 23:19:40 +08:00
Kenneth Estanislao 8e05142cda Merge pull request #1573 from phieudu241/main
fix: fix typos which caused "No faces found in target" issue
2025-11-09 19:18:00 +08:00
Dung Le a007db2ffa fix: fix typos which cause "No faces found in target" issue 2025-11-09 15:51:14 +07:00
Kenneth Estanislao 475740b22b Update IShowSpeed quote in README.md 2025-11-08 05:21:19 +08:00
Kenneth Estanislao 600ce34c8d Add new quote from IShowSpeed to README 2025-11-08 05:17:54 +08:00
Kenneth Estanislao 865ab3ca02 Add Henry as a major contributor in credits 2025-11-08 05:08:55 +08:00
Kenneth Estanislao 178578b034 Merge pull request #1565 from aic1x/patch-1
Fix typo in source_target_map variable name
2025-11-06 00:08:41 +08:00
AiC b53132f3a4 Fix typo in source_target_map variable name 2025-11-04 21:16:26 +01:00
Kenneth Estanislao 00da11b491 Merge pull request #1529 from laurensius/main
Add Indonesian localization file
2025-11-04 17:46:27 +08:00
Kenneth Estanislao b82fdc3f31 Update face_swapper.py
Optimization based on @SanderGi (experimental) to improve mac FPS
2025-10-28 19:16:40 +08:00
Kenneth Estanislao 3ffa9f38b0 Add pygrabber to requirements 2025-10-16 01:32:43 +08:00
Kenneth Estanislao 3f98d4c826 Update torch and torchvision versions in requirements 2025-10-13 00:50:26 +08:00
Kenneth Estanislao 9b6ca286b9 Update Quick Start section to version 2.3
Updated the Quickstart version to 2.3
2025-10-12 23:44:21 +08:00
Kenneth Estanislao 28c60b69d1 Merge pull request #1532 from hacksider/dependabot/pip/torch-2.7.1cu128 2025-10-12 22:53:43 +08:00
dependabot[bot] fcf547d7d2 Bump torch from 2.5.1 to 2.7.1+cu128
Bumps torch from 2.5.1 to 2.7.1+cu128.

---
updated-dependencies:
- dependency-name: torch
  dependency-version: 2.7.1+cu128
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-12 14:34:15 +00:00
Kenneth Estanislao ae2d21456d Version 2.0c Release!
Sharpness and some other improvements added!
2025-10-12 22:33:09 +08:00
Laurensius Dede Suhardiman 0999c0447e Add Indonesian localization file
Create new JSON file for id locale
2025-10-11 23:29:41 +07:00
Kenneth Estanislao f9270c5d1c Fix installation instructions for gfpgan and basicsrs 2025-08-29 14:44:46 +08:00
Kenneth Estanislao fdbc29c1a9 Update README.md 2025-08-11 21:37:45 +08:00
Kenneth Estanislao 87d982e6f8 Merge pull request #1435 from rugk/patch-1
Add Golem.de (German IT news magazine) article
2025-08-08 02:26:51 +08:00
rugk cf47dabf0e Add Golem.de (German IT news magazine) article 2025-08-06 15:43:52 +02:00
David Strouk 647c5f250f Update modules/processors/frame/face_swapper.py
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
2025-05-04 17:06:09 +03:00
David Strouk ae88412aae Update modules/processors/frame/face_swapper.py
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
2025-05-04 17:04:08 +03:00
David Strouk b7e011f5e7 Fix model download path and URL
- Use models_dir instead of abs_dir for download path
- Create models directory if it doesn't exist
- Fix Hugging Face download URL by using /resolve/ instead of /blob/
2025-05-04 16:59:04 +03:00
22 changed files with 2867 additions and 658 deletions
+1
View File
@@ -25,3 +25,4 @@ models/DMDNet.pth
faceswap/
.vscode/
switch_states.json
/models
+10 -2
View File
@@ -1,4 +1,4 @@
<h1 align="center">Deep-Live-Cam</h1>
<h1 align="center">Deep-Live-Cam 2.0.4c</h1>
<p align="center">
Real-time face swap and video deepfake with a single click and only a single image.
@@ -30,7 +30,7 @@ By using this software, you agree to these terms and commit to using it in a man
Users are expected to use this software responsibly and legally. If using a real person's face, obtain their consent and clearly label any output as a deepfake when sharing online. We are not responsible for end-user actions.
## Exclusive v2.1 Quick Start - Pre-built (Windows/Mac Silicon)
## Exclusive v2.6d Quick Start - Pre-built (Windows/Mac Silicon)
<a href="https://deeplivecam.net/index.php/quickstart"> <img src="media/Download.png" width="285" height="77" />
@@ -179,6 +179,11 @@ source venv/bin/activate
# install the dependencies again
pip install -r requirements.txt
# gfpgan and basicsrs issue fix
pip install git+https://github.com/xinntao/BasicSR.git@master
pip uninstall gfpgan -y
pip install git+https://github.com/TencentARC/GFPGAN.git@master
```
**Run:** If you don't have a GPU, you can run Deep-Live-Cam using `python run.py`. Note that initial execution will download models (~300MB).
@@ -348,11 +353,14 @@ Looking for a CLI mode? Using the -s/--source argument will make the run program
- [*"That's Crazy, Oh God. That's Fucking Freaky Dude... That's So Wild Dude"*](https://www.youtube.com/watch?time_continue=1074&v=py4Tc-Y8BcY) - SomeOrdinaryGamers
- [*"Alright look look look, now look chat, we can do any face we want to look like chat"*](https://www.youtube.com/live/mFsCe7AIxq8?feature=shared&t=2686) - IShowSpeed
- [*"They do a pretty good job matching poses, expression and even the lighting"*](https://www.youtube.com/watch?v=wnCghLjqv3s&t=551s) - TechLinked (LTT)
- [*"Als Sean Connery an der Redaktionskonferenz teilnahm"*](https://www.golem.de/news/deepfakes-als-sean-connery-an-der-redaktionskonferenz-teilnahm-2408-188172.html) - Golem.de (German)
- [*"What the F***! Why do I look like Vinny Jr? I look exactly like Vinny Jr!? No, this shit is crazy! Bro This is F*** Crazy! "*](https://youtu.be/JbUPRmXRUtE?t=3964) - IShowSpeed
## Credits
- [ffmpeg](https://ffmpeg.org/): for making video-related operations easy
- [Henry](https://github.com/henryruhs): One of the major contributor in this repo
- [deepinsight](https://github.com/deepinsight): for their [insightface](https://github.com/deepinsight/insightface) project which provided a well-made library and models. Please be reminded that the [use of the model is for non-commercial research purposes only](https://github.com/deepinsight/insightface?tab=readme-ov-file#license).
- [havok2-htwo](https://github.com/havok2-htwo): for sharing the code for webcam
- [GosuDRM](https://github.com/GosuDRM): for the open version of roop
+45
View File
@@ -0,0 +1,45 @@
{
"Source x Target Mapper": "Pemetaan Sumber x Target",
"select a source image": "Pilih gambar sumber",
"Preview": "Pratinjau",
"select a target image or video": "Pilih gambar atau video target",
"save image output file": "Simpan file keluaran gambar",
"save video output file": "Simpan file keluaran video",
"select a target image": "Pilih gambar target",
"source": "Sumber",
"Select a target": "Pilih target",
"Select a face": "Pilih wajah",
"Keep audio": "Pertahankan audio",
"Face Enhancer": "Peningkat wajah",
"Many faces": "Banyak wajah",
"Show FPS": "Tampilkan FPS",
"Keep fps": "Pertahankan FPS",
"Keep frames": "Pertahankan frame",
"Fix Blueish Cam": "Perbaiki kamera kebiruan",
"Mouth Mask": "Masker mulut",
"Show Mouth Mask Box": "Tampilkan kotak masker mulut",
"Start": "Mulai",
"Live": "Langsung",
"Destroy": "Hentikan",
"Map faces": "Petakan wajah",
"Processing...": "Sedang memproses...",
"Processing succeed!": "Pemrosesan berhasil!",
"Processing ignored!": "Pemrosesan diabaikan!",
"Failed to start camera": "Gagal memulai kamera",
"Please complete pop-up or close it.": "Harap selesaikan atau tutup pop-up.",
"Getting unique faces": "Mengambil wajah unik",
"Please select a source image first": "Silakan pilih gambar sumber terlebih dahulu",
"No faces found in target": "Tidak ada wajah ditemukan pada target",
"Add": "Tambah",
"Clear": "Bersihkan",
"Submit": "Kirim",
"Select source image": "Pilih gambar sumber",
"Select target image": "Pilih gambar target",
"Please provide mapping!": "Harap tentukan pemetaan!",
"At least 1 source with target is required!": "Minimal 1 sumber dengan target diperlukan!",
"Face could not be detected in last upload!": "Wajah tidak dapat terdeteksi pada unggahan terakhir!",
"Select Camera:": "Pilih Kamera:",
"All mappings cleared!": "Semua pemetaan telah dibersihkan!",
"Mappings successfully submitted!": "Pemetaan berhasil dikirim!",
"Source x Target Mapper is already open.": "Pemetaan Sumber x Target sudah terbuka."
}
+2 -1
View File
@@ -1,6 +1,7 @@
from typing import Any
import cv2
import modules.globals # Import the globals to check the color correction toggle
from modules.gpu_processing import gpu_cvt_color
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
@@ -19,7 +20,7 @@ def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
if has_frame and modules.globals.color_correction:
# Convert the frame color if necessary
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = gpu_cvt_color(frame, cv2.COLOR_BGR2RGB)
capture.release()
return frame if has_frame else None
+49 -7
View File
@@ -11,7 +11,11 @@ import platform
import signal
import shutil
import argparse
import torch
try:
import torch
HAS_TORCH = True
except ImportError:
HAS_TORCH = False
import onnxruntime
import tensorflow
@@ -21,11 +25,12 @@ import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
if HAS_TORCH and 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
if HAS_TORCH:
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
def parse_args() -> None:
@@ -129,11 +134,22 @@ def suggest_execution_providers() -> List[str]:
def suggest_execution_threads() -> int:
"""Suggest optimal thread count based on hardware and execution provider."""
import os
# Get CPU count
cpu_count = os.cpu_count() or 4
if 'DmlExecutionProvider' in modules.globals.execution_providers:
return 1
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
return 1
return 8
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
# For CUDA, use more threads for parallel frame processing
return min(cpu_count, 16)
# For CPU execution, use most cores but leave some for system
return max(4, min(cpu_count - 2, 16))
def limit_resources() -> None:
@@ -156,7 +172,7 @@ def limit_resources() -> None:
def release_resources() -> None:
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
if 'CUDAExecutionProvider' in modules.globals.execution_providers and HAS_TORCH:
torch.cuda.empty_cache()
@@ -176,10 +192,16 @@ def update_status(message: str, scope: str = 'DLC.CORE') -> None:
ui.update_status(message)
def start() -> None:
"""Start processing with performance monitoring."""
import time
start_time = time.time()
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start():
return
update_status('Processing...')
# process image to image
if has_image_extension(modules.globals.target_path):
if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy):
@@ -193,26 +215,40 @@ def start() -> None:
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeed!')
elapsed = time.time() - start_time
update_status(f'Processing to image succeed! (Time: {elapsed:.2f}s)')
else:
update_status('Processing to image failed!')
return
# process image to videos
if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy):
return
extraction_start = time.time()
if not modules.globals.map_faces:
update_status('Creating temp resources...')
create_temp(modules.globals.target_path)
update_status('Extracting frames...')
extract_frames(modules.globals.target_path)
extraction_time = time.time() - extraction_start
update_status(f'Frame extraction completed in {extraction_time:.2f}s')
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
total_frames = len(temp_frame_paths)
update_status(f'Processing {total_frames} frames with {modules.globals.execution_threads} threads...')
processing_start = time.time()
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
release_resources()
processing_time = time.time() - processing_start
fps_processing = total_frames / processing_time if processing_time > 0 else 0
update_status(f'Frame processing completed in {processing_time:.2f}s ({fps_processing:.2f} fps)')
# handles fps
encoding_start = time.time()
if modules.globals.keep_fps:
update_status('Detecting fps...')
fps = detect_fps(modules.globals.target_path)
@@ -221,6 +257,9 @@ def start() -> None:
else:
update_status('Creating video with 30.0 fps...')
create_video(modules.globals.target_path)
encoding_time = time.time() - encoding_start
update_status(f'Video encoding completed in {encoding_time:.2f}s')
# handle audio
if modules.globals.keep_audio:
if modules.globals.keep_fps:
@@ -230,10 +269,13 @@ def start() -> None:
restore_audio(modules.globals.target_path, modules.globals.output_path)
else:
move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate
clean_temp(modules.globals.target_path)
total_time = time.time() - start_time
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!')
update_status(f'Processing to video succeed! Total time: {total_time:.2f}s')
else:
update_status('Processing to video failed!')
+7
View File
@@ -0,0 +1,7 @@
from typing import Any
from insightface.app.common import Face
import numpy
Face = Face
Frame = numpy.ndarray[Any, Any]
+12 -2
View File
@@ -2,6 +2,7 @@ import os
import shutil
from typing import Any
import insightface
import threading
import cv2
import numpy as np
@@ -13,14 +14,23 @@ from modules.utilities import get_temp_directory_path, create_temp, extract_fram
from pathlib import Path
FACE_ANALYSER = None
FACE_ANALYSER_LOCK = threading.Lock()
def get_face_analyser() -> Any:
"""Get face analyser with thread-safe initialization."""
global FACE_ANALYSER
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=modules.globals.execution_providers)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
with FACE_ANALYSER_LOCK:
# Double-check after acquiring lock
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(
name='buffalo_l',
providers=modules.globals.execution_providers,
allowed_modules=['detection', 'recognition']
)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(320, 320))
return FACE_ANALYSER
+58 -29
View File
@@ -1,3 +1,5 @@
# --- START OF FILE globals.py ---
import os
from typing import List, Dict, Any
@@ -9,35 +11,62 @@ file_types = [
("Video", ("*.mp4", "*.mkv")),
]
source_target_map = []
simple_map = {}
# Face Mapping Data
source_target_map: List[Dict[str, Any]] = [] # Stores detailed map for image/video processing
simple_map: Dict[str, Any] = {} # Stores simplified map (embeddings/faces) for live/simple mode
source_path = None
target_path = None
output_path = None
# Paths
source_path: str | None = None
target_path: str | None = None
output_path: str | None = None
# Processing Options
frame_processors: List[str] = []
keep_fps = True
keep_audio = True
keep_frames = False
many_faces = False
map_faces = False
color_correction = False # New global variable for color correction toggle
nsfw_filter = False
video_encoder = None
video_quality = None
live_mirror = False
live_resizable = True
max_memory = None
execution_providers: List[str] = []
execution_threads = None
headless = None
log_level = "error"
keep_fps: bool = True
keep_audio: bool = True
keep_frames: bool = False
many_faces: bool = False # Process all detected faces with default source
map_faces: bool = False # Use source_target_map or simple_map for specific swaps
poisson_blend: bool = False # Enable Poisson Blending for smoother face swaps
color_correction: bool = False # Enable color correction (implementation specific)
nsfw_filter: bool = False
# Video Output Options
video_encoder: str | None = None
video_quality: int | None = None # Typically a CRF value or bitrate
# Live Mode Options
live_mirror: bool = False
live_resizable: bool = True
camera_input_combobox: Any | None = None # Placeholder for UI element if needed
webcam_preview_running: bool = False
show_fps: bool = False
# System Configuration
max_memory: int | None = None # Memory limit in GB? (Needs clarification)
execution_providers: List[str] = [] # e.g., ['CUDAExecutionProvider', 'CPUExecutionProvider']
execution_threads: int | None = None # Number of threads for CPU execution
headless: bool | None = None # Run without UI?
log_level: str = "error" # Logging level (e.g., 'debug', 'info', 'warning', 'error')
# Face Processor UI Toggles (Example)
fp_ui: Dict[str, bool] = {"face_enhancer": False}
camera_input_combobox = None
webcam_preview_running = False
show_fps = False
mouth_mask = False
show_mouth_mask_box = False
mask_feather_ratio = 8
mask_down_size = 0.50
mask_size = 1
# Face Swapper Specific Options
face_swapper_enabled: bool = True # General toggle for the swapper processor
opacity: float = 1.0 # Blend factor for the swapped face (0.0-1.0)
sharpness: float = 0.0 # Sharpness enhancement for swapped face (0.0-1.0+)
# Mouth Mask Options
mouth_mask: bool = False # Enable mouth area masking/pasting
show_mouth_mask_box: bool = False # Visualize the mouth mask area (for debugging)
mask_feather_ratio: int = 12 # Denominator for feathering calculation (higher = smaller feather)
mask_down_size: float = 0.1 # Expansion factor for lower lip mask (relative)
mask_size: float = 1.0 # Expansion factor for upper lip mask (relative)
# --- START: Added for Frame Interpolation ---
enable_interpolation: bool = True # Toggle temporal smoothing
interpolation_weight: float = 0 # Blend weight for current frame (0.0-1.0). Lower=smoother.
# --- END: Added for Frame Interpolation ---
# --- END OF FILE globals.py ---
+286
View File
@@ -0,0 +1,286 @@
# --- START OF FILE gpu_processing.py ---
"""
GPU-accelerated image processing using OpenCV CUDA (cv2.cuda.GpuMat).
Provides drop-in replacements for common cv2 functions. When OpenCV is built
with CUDA support the functions transparently upload → process → download via
GpuMat; otherwise they fall back to the regular CPU path so the rest of the
codebase never has to care whether CUDA is available.
Usage
-----
from modules.gpu_processing import (
gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted,
gpu_resize, gpu_cvt_color, gpu_flip,
is_gpu_accelerated,
)
"""
from __future__ import annotations
import cv2
import numpy as np
from typing import Tuple, Optional
# ---------------------------------------------------------------------------
# CUDA availability detection (evaluated once at import time)
# ---------------------------------------------------------------------------
CUDA_AVAILABLE: bool = False
try:
# cv2.cuda.GpuMat is only present when OpenCV is compiled with CUDA
_test_mat = cv2.cuda.GpuMat()
# Verify we have the required filter / image-processing functions
_has_gauss = hasattr(cv2.cuda, "createGaussianFilter")
_has_resize = hasattr(cv2.cuda, "resize")
_has_cvt = hasattr(cv2.cuda, "cvtColor")
if _has_gauss and _has_resize and _has_cvt:
CUDA_AVAILABLE = True
print("[gpu_processing] OpenCV CUDA support detected GPU-accelerated processing enabled.")
else:
missing = []
if not _has_gauss:
missing.append("createGaussianFilter")
if not _has_resize:
missing.append("resize")
if not _has_cvt:
missing.append("cvtColor")
print(f"[gpu_processing] cv2.cuda.GpuMat exists but missing: {', '.join(missing)} falling back to CPU.")
except Exception:
print("[gpu_processing] OpenCV CUDA not available using CPU fallback for all operations.")
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _ensure_uint8(img: np.ndarray) -> np.ndarray:
"""Clip and convert to uint8 if necessary."""
if img.dtype != np.uint8:
return np.clip(img, 0, 255).astype(np.uint8)
return img
def _ksize_odd(ksize: Tuple[int, int]) -> Tuple[int, int]:
"""Ensure kernel dimensions are positive and odd (required by GaussianBlur)."""
kw = max(1, ksize[0] // 2 * 2 + 1) if ksize[0] > 0 else 0
kh = max(1, ksize[1] // 2 * 2 + 1) if ksize[1] > 0 else 0
return (kw, kh)
def _cv_type_for(img: np.ndarray) -> int:
"""Return the OpenCV type constant matching *img* (uint8 only)."""
channels = 1 if img.ndim == 2 else img.shape[2]
if channels == 1:
return cv2.CV_8UC1
elif channels == 3:
return cv2.CV_8UC3
elif channels == 4:
return cv2.CV_8UC4
return cv2.CV_8UC3 # fallback
# ---------------------------------------------------------------------------
# Public API Gaussian Blur
# ---------------------------------------------------------------------------
def gpu_gaussian_blur(
src: np.ndarray,
ksize: Tuple[int, int],
sigma_x: float,
sigma_y: float = 0,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.GaussianBlur`` with CUDA acceleration.
Parameters match ``cv2.GaussianBlur(src, ksize, sigmaX, sigmaY)``.
When *ksize* is ``(0, 0)`` OpenCV computes the kernel size from *sigma_x*.
"""
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
cv_type = _cv_type_for(src_u8)
ks = _ksize_odd(ksize) if ksize != (0, 0) else ksize
gauss = cv2.cuda.createGaussianFilter(cv_type, cv_type, ks, sigma_x, sigma_y)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
gpu_dst = gauss.apply(gpu_src)
return gpu_dst.download()
except cv2.error:
pass
return cv2.GaussianBlur(src, ksize, sigma_x, sigmaY=sigma_y)
# ---------------------------------------------------------------------------
# Public API addWeighted
# ---------------------------------------------------------------------------
def gpu_add_weighted(
src1: np.ndarray,
alpha: float,
src2: np.ndarray,
beta: float,
gamma: float,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.addWeighted`` with CUDA acceleration."""
if CUDA_AVAILABLE:
try:
s1 = _ensure_uint8(src1)
s2 = _ensure_uint8(src2)
g1 = cv2.cuda.GpuMat()
g2 = cv2.cuda.GpuMat()
g1.upload(s1)
g2.upload(s2)
gpu_dst = cv2.cuda.addWeighted(g1, alpha, g2, beta, gamma)
return gpu_dst.download()
except cv2.error:
pass
return cv2.addWeighted(src1, alpha, src2, beta, gamma)
# ---------------------------------------------------------------------------
# Public API Unsharp-mask sharpening
# ---------------------------------------------------------------------------
def gpu_sharpen(
src: np.ndarray,
strength: float,
sigma: float = 3,
) -> np.ndarray:
"""Unsharp-mask sharpening, optionally GPU-accelerated.
Equivalent to::
blurred = GaussianBlur(src, (0,0), sigma)
result = addWeighted(src, 1+strength, blurred, -strength, 0)
"""
if strength <= 0:
return src
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
cv_type = _cv_type_for(src_u8)
gauss = cv2.cuda.createGaussianFilter(cv_type, cv_type, (0, 0), sigma)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
gpu_blurred = gauss.apply(gpu_src)
gpu_sharp = cv2.cuda.addWeighted(gpu_src, 1.0 + strength, gpu_blurred, -strength, 0)
result = gpu_sharp.download()
return np.clip(result, 0, 255).astype(np.uint8)
except cv2.error:
pass
blurred = cv2.GaussianBlur(src, (0, 0), sigma)
sharpened = cv2.addWeighted(src, 1.0 + strength, blurred, -strength, 0)
return np.clip(sharpened, 0, 255).astype(np.uint8)
# ---------------------------------------------------------------------------
# Public API Resize
# ---------------------------------------------------------------------------
# Map common cv2 interpolation flags to their CUDA equivalents
_INTERP_MAP = {
cv2.INTER_NEAREST: cv2.INTER_NEAREST,
cv2.INTER_LINEAR: cv2.INTER_LINEAR,
cv2.INTER_CUBIC: cv2.INTER_CUBIC,
cv2.INTER_AREA: cv2.INTER_AREA,
cv2.INTER_LANCZOS4: cv2.INTER_LANCZOS4,
}
def gpu_resize(
src: np.ndarray,
dsize: Tuple[int, int],
fx: float = 0,
fy: float = 0,
interpolation: int = cv2.INTER_LINEAR,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.resize`` with CUDA acceleration.
Parameters match ``cv2.resize(src, dsize, fx=fx, fy=fy, interpolation=...)``.
"""
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
interp = _INTERP_MAP.get(interpolation, cv2.INTER_LINEAR)
if dsize and dsize[0] > 0 and dsize[1] > 0:
gpu_dst = cv2.cuda.resize(gpu_src, dsize, interpolation=interp)
else:
gpu_dst = cv2.cuda.resize(gpu_src, (0, 0), fx=fx, fy=fy, interpolation=interp)
return gpu_dst.download()
except cv2.error:
pass
return cv2.resize(src, dsize, fx=fx, fy=fy, interpolation=interpolation)
# ---------------------------------------------------------------------------
# Public API Color conversion
# ---------------------------------------------------------------------------
def gpu_cvt_color(
src: np.ndarray,
code: int,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.cvtColor`` with CUDA acceleration.
Parameters match ``cv2.cvtColor(src, code)``.
"""
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
gpu_dst = cv2.cuda.cvtColor(gpu_src, code)
return gpu_dst.download()
except cv2.error:
pass
return cv2.cvtColor(src, code)
# ---------------------------------------------------------------------------
# Public API Flip
# ---------------------------------------------------------------------------
def gpu_flip(
src: np.ndarray,
flip_code: int,
) -> np.ndarray:
"""Drop-in replacement for ``cv2.flip`` with CUDA acceleration.
Parameters match ``cv2.flip(src, flipCode)``.
*flip_code*: 0 = vertical, 1 = horizontal, -1 = both.
"""
if CUDA_AVAILABLE:
try:
src_u8 = _ensure_uint8(src)
gpu_src = cv2.cuda.GpuMat()
gpu_src.upload(src_u8)
gpu_dst = cv2.cuda.flip(gpu_src, flip_code)
return gpu_dst.download()
except cv2.error:
pass
return cv2.flip(src, flip_code)
# ---------------------------------------------------------------------------
# Convenience: check at runtime whether GPU path is active
# ---------------------------------------------------------------------------
def is_gpu_accelerated() -> bool:
"""Return ``True`` when the CUDA path will be used."""
return CUDA_AVAILABLE
# --- END OF FILE gpu_processing.py ---
+2 -2
View File
@@ -1,3 +1,3 @@
name = 'Deep-Live-Cam'
version = '1.8.1'
edition = 'GitHub Edition'
version = '2.0.3c'
edition = 'GitHub Edition'
+2 -1
View File
@@ -3,6 +3,7 @@ import opennsfw2
from PIL import Image
import cv2 # Add OpenCV import
import modules.globals # Import globals to access the color correction toggle
from modules.gpu_processing import gpu_cvt_color
from modules.typing import Frame
@@ -14,7 +15,7 @@ model = None
def predict_frame(target_frame: Frame) -> bool:
# Convert the frame to RGB before processing if color correction is enabled
if modules.globals.color_correction:
target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB)
target_frame = gpu_cvt_color(target_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(target_frame)
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
+23 -7
View File
@@ -67,13 +67,29 @@ def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
print(f"Warning: Error removing frame processor {frame_processor}: {e}")
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
futures = []
for path in temp_frame_paths:
future = executor.submit(process_frames, source_path, [path], progress)
futures.append(future)
for future in futures:
future.result()
"""Process frames in parallel with optimized batching and memory management."""
max_workers = modules.globals.execution_threads
# Determine optimal batch size based on available memory and thread count
# Process frames in batches to avoid memory overflow
batch_size = max(1, min(32, len(temp_frame_paths) // max(1, max_workers)))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Process in batches to manage memory better
for i in range(0, len(temp_frame_paths), batch_size):
batch = temp_frame_paths[i:i + batch_size]
futures = []
for path in batch:
future = executor.submit(process_frames, source_path, [path], progress)
futures.append(future)
# Wait for batch to complete before starting next batch
for future in futures:
try:
future.result()
except Exception as e:
print(f"Error processing frame: {e}")
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
+306 -64
View File
@@ -1,18 +1,20 @@
# --- START OF FILE face_enhancer.py ---
# Uses ONNX Runtime for GFPGAN face enhancement (no torch/gfpgan dependency)
from typing import Any, List
import cv2
import threading
import gfpgan
import numpy as np
import os
import onnxruntime
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.face_analyser import get_one_face, get_many_faces
from modules.typing import Frame, Face
import platform
import torch
from modules.utilities import (
conditional_download,
is_image,
is_video,
)
@@ -27,15 +29,29 @@ models_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
# Standard FFHQ 5-point face template for 512x512 resolution
# Points: left_eye, right_eye, nose, left_mouth, right_mouth
FFHQ_TEMPLATE_512 = np.array(
[
[192.98138, 239.94708],
[318.90277, 240.19366],
[256.63416, 314.01935],
[201.26117, 371.41043],
[313.08905, 371.15118],
],
dtype=np.float32,
)
def pre_check() -> bool:
download_directory_path = models_dir
conditional_download(
download_directory_path,
[
"https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth"
],
)
model_path = os.path.join(models_dir, "gfpgan-1024.onnx")
if not os.path.exists(model_path):
update_status(
f"GFPGAN ONNX model not found at {model_path}. "
"Please place gfpgan-1024.onnx in the models folder.",
NAME,
)
return False
return True
@@ -48,83 +64,309 @@ def pre_start() -> bool:
return True
TENSORRT_AVAILABLE = False
try:
import torch_tensorrt
TENSORRT_AVAILABLE = True
except ImportError as im:
print(f"TensorRT is not available: {im}")
pass
except Exception as e:
print(f"TensorRT is not available: {e}")
pass
def get_face_enhancer() -> Any:
def get_face_enhancer() -> onnxruntime.InferenceSession:
"""
Initializes and returns the GFPGAN ONNX Runtime inference session,
using the execution providers configured in modules.globals.
"""
global FACE_ENHANCER
with THREAD_LOCK:
if FACE_ENHANCER is None:
model_path = os.path.join(models_dir, "GFPGANv1.4.pth")
selected_device = None
device_priority = []
model_path = os.path.join(models_dir, "gfpgan-1024.onnx")
if TENSORRT_AVAILABLE and torch.cuda.is_available():
selected_device = torch.device("cuda")
device_priority.append("TensorRT+CUDA")
elif torch.cuda.is_available():
selected_device = torch.device("cuda")
device_priority.append("CUDA")
elif torch.backends.mps.is_available() and platform.system() == "Darwin":
selected_device = torch.device("mps")
device_priority.append("MPS")
elif not torch.cuda.is_available():
selected_device = torch.device("cpu")
device_priority.append("CPU")
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device)
if not os.path.exists(model_path):
raise FileNotFoundError(
f"{NAME}: Model not found at {model_path}"
)
try:
providers = modules.globals.execution_providers
session_options = onnxruntime.SessionOptions()
session_options.graph_optimization_level = (
onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
)
FACE_ENHANCER = onnxruntime.InferenceSession(
model_path,
sess_options=session_options,
providers=providers,
)
input_info = FACE_ENHANCER.get_inputs()[0]
output_info = FACE_ENHANCER.get_outputs()[0]
active_providers = FACE_ENHANCER.get_providers()
print(
f"{NAME}: GFPGAN ONNX model loaded successfully."
)
print(
f"{NAME}: Input: {input_info.name}, "
f"shape: {input_info.shape}, type: {input_info.type}"
)
print(
f"{NAME}: Output: {output_info.name}, "
f"shape: {output_info.shape}, type: {output_info.type}"
)
print(f"{NAME}: Active providers: {active_providers}")
except Exception as e:
print(f"{NAME}: Error loading GFPGAN ONNX model: {e}")
FACE_ENHANCER = None
raise RuntimeError(
f"{NAME}: Failed to load GFPGAN ONNX model: {e}"
)
if FACE_ENHANCER is None:
raise RuntimeError(
f"{NAME}: Failed to initialize GFPGAN ONNX session. Check logs."
)
# for debug:
print(f"Selected device: {selected_device} and device priority: {device_priority}")
return FACE_ENHANCER
def _align_face(
frame: Frame, landmarks_5: np.ndarray, output_size: int
) -> tuple:
"""
Align and crop a face from the frame using 5-point landmarks and the
standard FFHQ template.
Returns:
(aligned_face, affine_matrix) or (None, None) on failure.
"""
# Scale the 512-base template to the desired output size
scale = output_size / 512.0
template = FFHQ_TEMPLATE_512 * scale
# Estimate a similarity transform (4 DOF: rotation, scale, tx, ty)
affine_matrix, _ = cv2.estimateAffinePartial2D(
landmarks_5, template, method=cv2.LMEDS
)
if affine_matrix is None:
return None, None
# Warp the face to the aligned position
aligned_face = cv2.warpAffine(
frame,
affine_matrix,
(output_size, output_size),
borderMode=cv2.BORDER_CONSTANT,
borderValue=(135, 133, 132),
)
return aligned_face, affine_matrix
def _paste_back(
frame: Frame,
enhanced_face: np.ndarray,
affine_matrix: np.ndarray,
output_size: int,
) -> Frame:
"""
Paste an enhanced (aligned) face back onto the original frame using the
inverse affine transform with feathered-edge blending.
"""
h, w = frame.shape[:2]
# Inverse the affine warp
inv_matrix = cv2.invertAffineTransform(affine_matrix)
inv_restored = cv2.warpAffine(
enhanced_face,
inv_matrix,
(w, h),
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0),
)
# Build a soft feathered mask in aligned space for edge blending
face_mask = np.ones((output_size, output_size), dtype=np.float32)
# Feather the border (5 % of the size on each edge)
border = max(1, int(output_size * 0.05))
ramp_up = np.linspace(0.0, 1.0, border, dtype=np.float32)
ramp_down = np.linspace(1.0, 0.0, border, dtype=np.float32)
# Top / bottom rows
face_mask[:border, :] *= ramp_up[:, None]
face_mask[-border:, :] *= ramp_down[:, None]
# Left / right columns
face_mask[:, :border] *= ramp_up[None, :]
face_mask[:, -border:] *= ramp_down[None, :]
# Expand to 3-channel
face_mask_3c = np.stack([face_mask] * 3, axis=-1)
# Warp mask back to original frame space
inv_mask = cv2.warpAffine(
face_mask_3c,
inv_matrix,
(w, h),
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0),
)
inv_mask = np.clip(inv_mask, 0.0, 1.0)
# Alpha-blend
result = (
frame.astype(np.float32) * (1.0 - inv_mask)
+ inv_restored.astype(np.float32) * inv_mask
)
return np.clip(result, 0, 255).astype(np.uint8)
def _preprocess_face(aligned_face: np.ndarray) -> np.ndarray:
"""
Convert an aligned BGR uint8 face image to the ONNX model input tensor.
Format: NCHW float32, normalised to [-1, 1].
"""
# BGR -> RGB
rgb = cv2.cvtColor(aligned_face, cv2.COLOR_BGR2RGB).astype(np.float32)
# [0, 255] -> [0, 1] -> [-1, 1]
rgb = rgb / 255.0
rgb = (rgb - 0.5) / 0.5
# HWC -> CHW, add batch dim
chw = np.transpose(rgb, (2, 0, 1))
return np.expand_dims(chw, axis=0) # shape: (1, 3, H, W)
def _postprocess_face(output: np.ndarray) -> np.ndarray:
"""
Convert the ONNX model output tensor back to a BGR uint8 image.
Expects input in NCHW format with values in [-1, 1].
"""
face = np.squeeze(output) # remove batch dim -> (3, H, W)
face = np.transpose(face, (1, 2, 0)) # CHW -> HWC
# [-1, 1] -> [0, 1] -> [0, 255]
face = (face + 1.0) / 2.0
face = np.clip(face * 255.0, 0, 255).astype(np.uint8)
# RGB -> BGR
return cv2.cvtColor(face, cv2.COLOR_RGB2BGR)
def enhance_face(temp_frame: Frame) -> Frame:
with THREAD_SEMAPHORE:
_, _, temp_frame = get_face_enhancer().enhance(temp_frame, paste_back=True)
return temp_frame
"""Enhances all faces in a frame using the GFPGAN ONNX model."""
session = get_face_enhancer()
# Determine model input resolution from the session metadata
input_info = session.get_inputs()[0]
input_name = input_info.name
input_shape = input_info.shape # e.g. [1, 3, 512, 512]
# Safely extract input size (handle dynamic / symbolic dimensions)
try:
align_size = int(input_shape[2])
if align_size <= 0:
align_size = 512
except (ValueError, TypeError, IndexError):
align_size = 512
# Detect faces using InsightFace (already a project dependency)
faces = get_many_faces(temp_frame)
if not faces:
return temp_frame
result_frame = temp_frame.copy()
for face in faces:
# Need the 5-point key-points for alignment
if not hasattr(face, "kps") or face.kps is None:
continue
landmarks_5 = face.kps.astype(np.float32)
if landmarks_5.shape[0] < 5:
continue
# Align / crop the face at the model's INPUT resolution
aligned_face, affine_matrix = _align_face(
temp_frame, landmarks_5, output_size=align_size
)
if aligned_face is None or affine_matrix is None:
continue
try:
with THREAD_SEMAPHORE:
input_tensor = _preprocess_face(aligned_face)
output_tensor = session.run(None, {input_name: input_tensor})[0]
enhanced_bgr = _postprocess_face(output_tensor)
# The model may output at a different resolution than its input
# (e.g. input 512x512 → output 1024x1024). Resize the enhanced
# face back to the alignment size so the inverse affine maps
# correctly.
eh, ew = enhanced_bgr.shape[:2]
if eh != align_size or ew != align_size:
enhanced_bgr = cv2.resize(
enhanced_bgr,
(align_size, align_size),
interpolation=cv2.INTER_LANCZOS4,
)
# Paste enhanced face back onto the frame
result_frame = _paste_back(
result_frame, enhanced_bgr, affine_matrix, output_size=align_size
)
except Exception as e:
print(f"{NAME}: Error enhancing a face: {e}")
continue
return result_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame)
def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
"""Processes a frame: enhances face if detected."""
temp_frame = enhance_face(temp_frame)
return temp_frame
def process_frames(
source_path: str, temp_frame_paths: List[str], progress: Any = None
source_path: str | None, temp_frame_paths: List[str], progress: Any = None
) -> None:
"""Processes multiple frames from file paths."""
for temp_frame_path in temp_frame_paths:
if not os.path.exists(temp_frame_path):
print(
f"{NAME}: Warning: Frame path not found {temp_frame_path}, skipping."
)
if progress:
progress.update(1)
continue
temp_frame = cv2.imread(temp_frame_path)
result = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result)
if temp_frame is None:
print(
f"{NAME}: Warning: Failed to read frame {temp_frame_path}, skipping."
)
if progress:
progress.update(1)
continue
result_frame = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result_frame)
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
def process_image(
source_path: str | None, target_path: str, output_path: str
) -> None:
"""Processes a single image file."""
target_frame = cv2.imread(target_path)
result = process_frame(None, target_frame)
cv2.imwrite(output_path, result)
if target_frame is None:
print(f"{NAME}: Error: Failed to read target image {target_path}")
return
result_frame = process_frame(None, target_frame)
cv2.imwrite(output_path, result_frame)
print(f"{NAME}: Enhanced image saved to {output_path}")
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)
def process_video(
source_path: str | None, temp_frame_paths: List[str]
) -> None:
"""Processes video frames using the frame processor core."""
modules.processors.frame.core.process_video(
source_path, temp_frame_paths, process_frames
)
def process_frame_v2(temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame)
return temp_frame
# --- END OF FILE face_enhancer.py ---
+567
View File
@@ -0,0 +1,567 @@
import cv2
import numpy as np
from modules.typing import Face, Frame
import modules.globals
from modules.gpu_processing import gpu_gaussian_blur, gpu_resize, gpu_cvt_color
def apply_color_transfer(source, target):
"""
Apply color transfer from target to source image
"""
source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
source_mean, source_std = cv2.meanStdDev(source)
target_mean, target_std = cv2.meanStdDev(target)
# Reshape mean and std to be broadcastable
source_mean = source_mean.reshape(1, 1, 3)
source_std = source_std.reshape(1, 1, 3)
target_mean = target_mean.reshape(1, 1, 3)
target_std = target_std.reshape(1, 1, 3)
# Perform the color transfer
source = (source - source_mean) * (target_std / source_std) + target_mean
return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)
def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
landmarks = face.landmark_2d_106
if landmarks is not None:
# Convert landmarks to int32
landmarks = landmarks.astype(np.int32)
# Extract facial features
right_side_face = landmarks[0:16]
left_side_face = landmarks[17:32]
right_eye = landmarks[33:42]
right_eye_brow = landmarks[43:51]
left_eye = landmarks[87:96]
left_eye_brow = landmarks[97:105]
# Calculate padding
padding = int(
np.linalg.norm(right_side_face[0] - left_side_face[-1]) * 0.05
) # 5% of face width
# Create a slightly larger convex hull for padding
face_outline = landmarks[0:33]
hull = cv2.convexHull(face_outline)
hull_padded = []
for point in hull:
x, y = point[0]
center = np.mean(face_outline, axis=0)
direction = np.array([x, y]) - center
direction = direction / np.linalg.norm(direction)
padded_point = np.array([x, y]) + direction * padding
hull_padded.append(padded_point)
hull_padded = np.array(hull_padded, dtype=np.int32)
# Fill the padded convex hull
cv2.fillConvexPoly(mask, hull_padded, 255)
# Smooth the mask edges (GPU-accelerated when available)
mask = gpu_gaussian_blur(mask, (5, 5), 3)
return mask
def create_lower_mouth_mask(
face: Face, frame: Frame
) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
mouth_cutout = None
lower_lip_polygon = None
mouth_box = (0,0,0,0)
landmarks = face.landmark_2d_106
if landmarks is not None:
# Use outer mouth landmarks (52-63) to capture the lips only
lower_lip_order = list(range(52, 64))
if max(lower_lip_order) >= landmarks.shape[0]:
return mask, mouth_cutout, mouth_box, lower_lip_polygon
lower_lip_landmarks = landmarks[lower_lip_order].astype(np.float32)
# Calculate the center of the landmarks
center = np.mean(lower_lip_landmarks, axis=0)
# Expand the landmarks outward using the mouth_mask_size
# Use a more conservative expansion to avoid affecting face shape
expansion_factor = (
1 + modules.globals.mask_down_size * modules.globals.mouth_mask_size
)
expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center
# Removed specific top/chin extensions to preserve face shape
# Convert back to integer coordinates
expanded_landmarks = expanded_landmarks.astype(np.int32)
# Calculate bounding box for the expanded lower mouth
min_x, min_y = np.min(expanded_landmarks, axis=0)
max_x, max_y = np.max(expanded_landmarks, axis=0)
# Add some padding to the bounding box
padding = int((max_x - min_x) * 0.1) # 10% padding
min_x = max(0, min_x - padding)
min_y = max(0, min_y - padding)
max_x = min(frame.shape[1], max_x + padding)
max_y = min(frame.shape[0], max_y + padding)
# Ensure the bounding box dimensions are valid
if max_x <= min_x or max_y <= min_y:
if (max_x - min_x) <= 1:
max_x = min_x + 1
if (max_y - min_y) <= 1:
max_y = min_y + 1
# Create the mask
mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
# Shift polygon coordinates relative to the ROI's top-left corner
polygon_relative_to_roi = expanded_landmarks - [min_x, min_y]
cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)
# Apply Gaussian blur to soften the mask edges (GPU-accelerated when available)
mask_roi = gpu_gaussian_blur(mask_roi, (15, 15), 5)
# Place the mask ROI in the full-sized mask
mask[min_y:max_y, min_x:max_x] = mask_roi
# Extract the masked area from the frame
mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
# Return the expanded lower lip polygon in original frame coordinates
lower_lip_polygon = expanded_landmarks
mouth_box = (min_x, min_y, max_x, max_y)
return mask, mouth_cutout, mouth_box, lower_lip_polygon
def create_eyes_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
eyes_cutout = None
landmarks = face.landmark_2d_106
if landmarks is not None:
# Left eye landmarks (87-96) and right eye landmarks (33-42)
left_eye = landmarks[87:96]
right_eye = landmarks[33:42]
# Calculate centers and dimensions for each eye
left_eye_center = np.mean(left_eye, axis=0).astype(np.int32)
right_eye_center = np.mean(right_eye, axis=0).astype(np.int32)
# Calculate eye dimensions with size adjustment
def get_eye_dimensions(eye_points):
x_coords = eye_points[:, 0]
y_coords = eye_points[:, 1]
width = int((np.max(x_coords) - np.min(x_coords)) * (1 + modules.globals.mask_down_size * modules.globals.eyes_mask_size))
height = int((np.max(y_coords) - np.min(y_coords)) * (1 + modules.globals.mask_down_size * modules.globals.eyes_mask_size))
return width, height
left_width, left_height = get_eye_dimensions(left_eye)
right_width, right_height = get_eye_dimensions(right_eye)
# Add extra padding
padding = int(max(left_width, right_width) * 0.2)
# Calculate bounding box for both eyes
min_x = min(left_eye_center[0] - left_width//2, right_eye_center[0] - right_width//2) - padding
max_x = max(left_eye_center[0] + left_width//2, right_eye_center[0] + right_width//2) + padding
min_y = min(left_eye_center[1] - left_height//2, right_eye_center[1] - right_height//2) - padding
max_y = max(left_eye_center[1] + left_height//2, right_eye_center[1] + right_height//2) + padding
# Ensure coordinates are within frame bounds
min_x = max(0, min_x)
min_y = max(0, min_y)
max_x = min(frame.shape[1], max_x)
max_y = min(frame.shape[0], max_y)
# Create mask for the eyes region
mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
# Draw ellipses for both eyes
left_center = (left_eye_center[0] - min_x, left_eye_center[1] - min_y)
right_center = (right_eye_center[0] - min_x, right_eye_center[1] - min_y)
# Calculate axes lengths (half of width and height)
left_axes = (left_width//2, left_height//2)
right_axes = (right_width//2, right_height//2)
# Draw filled ellipses
cv2.ellipse(mask_roi, left_center, left_axes, 0, 0, 360, 255, -1)
cv2.ellipse(mask_roi, right_center, right_axes, 0, 0, 360, 255, -1)
# Apply Gaussian blur to soften mask edges (GPU-accelerated when available)
mask_roi = gpu_gaussian_blur(mask_roi, (15, 15), 5)
# Place the mask ROI in the full-sized mask
mask[min_y:max_y, min_x:max_x] = mask_roi
# Extract the masked area from the frame
eyes_cutout = frame[min_y:max_y, min_x:max_x].copy()
# Create polygon points for visualization
def create_ellipse_points(center, axes):
t = np.linspace(0, 2*np.pi, 32)
x = center[0] + axes[0] * np.cos(t)
y = center[1] + axes[1] * np.sin(t)
return np.column_stack((x, y)).astype(np.int32)
# Generate points for both ellipses
left_points = create_ellipse_points((left_eye_center[0], left_eye_center[1]), (left_width//2, left_height//2))
right_points = create_ellipse_points((right_eye_center[0], right_eye_center[1]), (right_width//2, right_height//2))
# Combine points for both eyes
eyes_polygon = np.vstack([left_points, right_points])
return mask, eyes_cutout, (min_x, min_y, max_x, max_y), eyes_polygon
def create_curved_eyebrow(points):
if len(points) >= 5:
# Sort points by x-coordinate
sorted_idx = np.argsort(points[:, 0])
sorted_points = points[sorted_idx]
# Calculate dimensions
x_min, y_min = np.min(sorted_points, axis=0)
x_max, y_max = np.max(sorted_points, axis=0)
width = x_max - x_min
height = y_max - y_min
# Create more points for smoother curve
num_points = 50
x = np.linspace(x_min, x_max, num_points)
# Fit quadratic curve through points for more natural arch
coeffs = np.polyfit(sorted_points[:, 0], sorted_points[:, 1], 2)
y = np.polyval(coeffs, x)
# Increased offsets to create more separation
top_offset = height * 0.5 # Increased from 0.3 to shift up more
bottom_offset = height * 0.2 # Increased from 0.1 to shift down more
# Create smooth curves
top_curve = y - top_offset
bottom_curve = y + bottom_offset
# Create curved endpoints with more pronounced taper
end_points = 5
start_x = np.linspace(x[0] - width * 0.15, x[0], end_points) # Increased taper
end_x = np.linspace(x[-1], x[-1] + width * 0.15, end_points) # Increased taper
# Create tapered ends
start_curve = np.column_stack((
start_x,
np.linspace(bottom_curve[0], top_curve[0], end_points)
))
end_curve = np.column_stack((
end_x,
np.linspace(bottom_curve[-1], top_curve[-1], end_points)
))
# Combine all points to form a smooth contour
contour_points = np.vstack([
start_curve,
np.column_stack((x, top_curve)),
end_curve,
np.column_stack((x[::-1], bottom_curve[::-1]))
])
# Add slight padding for better coverage
center = np.mean(contour_points, axis=0)
vectors = contour_points - center
padded_points = center + vectors * 1.2 # Increased padding slightly
return padded_points
return points
def create_eyebrows_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
eyebrows_cutout = None
landmarks = face.landmark_2d_106
if landmarks is not None:
# Left eyebrow landmarks (97-105) and right eyebrow landmarks (43-51)
left_eyebrow = landmarks[97:105].astype(np.float32)
right_eyebrow = landmarks[43:51].astype(np.float32)
# Calculate centers and dimensions for each eyebrow
left_center = np.mean(left_eyebrow, axis=0)
right_center = np.mean(right_eyebrow, axis=0)
# Calculate bounding box with padding adjusted by size
all_points = np.vstack([left_eyebrow, right_eyebrow])
padding_factor = modules.globals.eyebrows_mask_size
min_x = np.min(all_points[:, 0]) - 25 * padding_factor
max_x = np.max(all_points[:, 0]) + 25 * padding_factor
min_y = np.min(all_points[:, 1]) - 20 * padding_factor
max_y = np.max(all_points[:, 1]) + 15 * padding_factor
# Ensure coordinates are within frame bounds
min_x = max(0, int(min_x))
min_y = max(0, int(min_y))
max_x = min(frame.shape[1], int(max_x))
max_y = min(frame.shape[0], int(max_y))
# Create mask for the eyebrows region
mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
try:
# Convert points to local coordinates
left_local = left_eyebrow - [min_x, min_y]
right_local = right_eyebrow - [min_x, min_y]
def create_curved_eyebrow(points):
if len(points) >= 5:
# Sort points by x-coordinate
sorted_idx = np.argsort(points[:, 0])
sorted_points = points[sorted_idx]
# Calculate dimensions
x_min, y_min = np.min(sorted_points, axis=0)
x_max, y_max = np.max(sorted_points, axis=0)
width = x_max - x_min
height = y_max - y_min
# Create more points for smoother curve
num_points = 50
x = np.linspace(x_min, x_max, num_points)
# Fit quadratic curve through points for more natural arch
coeffs = np.polyfit(sorted_points[:, 0], sorted_points[:, 1], 2)
y = np.polyval(coeffs, x)
# Increased offsets to create more separation
top_offset = height * 0.5 # Increased from 0.3 to shift up more
bottom_offset = height * 0.2 # Increased from 0.1 to shift down more
# Create smooth curves
top_curve = y - top_offset
bottom_curve = y + bottom_offset
# Create curved endpoints with more pronounced taper
end_points = 5
start_x = np.linspace(x[0] - width * 0.15, x[0], end_points) # Increased taper
end_x = np.linspace(x[-1], x[-1] + width * 0.15, end_points) # Increased taper
# Create tapered ends
start_curve = np.column_stack((
start_x,
np.linspace(bottom_curve[0], top_curve[0], end_points)
))
end_curve = np.column_stack((
end_x,
np.linspace(bottom_curve[-1], top_curve[-1], end_points)
))
# Combine all points to form a smooth contour
contour_points = np.vstack([
start_curve,
np.column_stack((x, top_curve)),
end_curve,
np.column_stack((x[::-1], bottom_curve[::-1]))
])
# Add slight padding for better coverage
center = np.mean(contour_points, axis=0)
vectors = contour_points - center
padded_points = center + vectors * 1.2 # Increased padding slightly
return padded_points
return points
# Generate and draw eyebrow shapes
left_shape = create_curved_eyebrow(left_local)
right_shape = create_curved_eyebrow(right_local)
# Apply multi-stage blurring for natural feathering (GPU-accelerated when available)
# First, strong Gaussian blur for initial softening
mask_roi = gpu_gaussian_blur(mask_roi, (21, 21), 7)
# Second, medium blur for transition areas
mask_roi = gpu_gaussian_blur(mask_roi, (11, 11), 3)
# Finally, light blur for fine details
mask_roi = gpu_gaussian_blur(mask_roi, (5, 5), 1)
# Normalize mask values
mask_roi = cv2.normalize(mask_roi, None, 0, 255, cv2.NORM_MINMAX)
# Place the mask ROI in the full-sized mask
mask[min_y:max_y, min_x:max_x] = mask_roi
# Extract the masked area from the frame
eyebrows_cutout = frame[min_y:max_y, min_x:max_x].copy()
# Combine points for visualization
eyebrows_polygon = np.vstack([
left_shape + [min_x, min_y],
right_shape + [min_x, min_y]
]).astype(np.int32)
except Exception as e:
# Fallback to simple polygons if curve fitting fails
left_local = left_eyebrow - [min_x, min_y]
right_local = right_eyebrow - [min_x, min_y]
cv2.fillPoly(mask_roi, [left_local.astype(np.int32)], 255)
cv2.fillPoly(mask_roi, [right_local.astype(np.int32)], 255)
mask_roi = gpu_gaussian_blur(mask_roi, (21, 21), 7)
mask[min_y:max_y, min_x:max_x] = mask_roi
eyebrows_cutout = frame[min_y:max_y, min_x:max_x].copy()
eyebrows_polygon = np.vstack([left_eyebrow, right_eyebrow]).astype(np.int32)
return mask, eyebrows_cutout, (min_x, min_y, max_x, max_y), eyebrows_polygon
def apply_mask_area(
frame: np.ndarray,
cutout: np.ndarray,
box: tuple,
face_mask: np.ndarray,
polygon: np.ndarray,
) -> np.ndarray:
min_x, min_y, max_x, max_y = box
box_width = max_x - min_x
box_height = max_y - min_y
if (
cutout is None
or box_width is None
or box_height is None
or face_mask is None
or polygon is None
):
return frame
try:
resized_cutout = gpu_resize(cutout, (box_width, box_height))
roi = frame[min_y:max_y, min_x:max_x]
if roi.shape != resized_cutout.shape:
resized_cutout = gpu_resize(
resized_cutout, (roi.shape[1], roi.shape[0])
)
color_corrected_area = apply_color_transfer(resized_cutout, roi)
# Create mask for the area
polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8)
# Split points for left and right parts if needed
if len(polygon) > 50: # Arbitrary threshold to detect if we have multiple parts
mid_point = len(polygon) // 2
left_points = polygon[:mid_point] - [min_x, min_y]
right_points = polygon[mid_point:] - [min_x, min_y]
cv2.fillPoly(polygon_mask, [left_points], 255)
cv2.fillPoly(polygon_mask, [right_points], 255)
else:
adjusted_polygon = polygon - [min_x, min_y]
cv2.fillPoly(polygon_mask, [adjusted_polygon], 255)
# Apply strong initial feathering (GPU-accelerated when available)
polygon_mask = gpu_gaussian_blur(polygon_mask, (21, 21), 7)
# Apply additional feathering
feather_amount = min(
30,
box_width // modules.globals.mask_feather_ratio,
box_height // modules.globals.mask_feather_ratio,
)
feathered_mask = cv2.GaussianBlur(
polygon_mask.astype(float), (0, 0), feather_amount
)
feathered_mask = feathered_mask / feathered_mask.max()
# Apply additional smoothing to the mask edges
feathered_mask = cv2.GaussianBlur(feathered_mask, (5, 5), 1)
face_mask_roi = face_mask[min_y:max_y, min_x:max_x]
combined_mask = feathered_mask * (face_mask_roi / 255.0)
combined_mask = combined_mask[:, :, np.newaxis]
blended = (
color_corrected_area * combined_mask + roi * (1 - combined_mask)
).astype(np.uint8)
# Apply face mask to blended result
face_mask_3channel = (
np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0
)
final_blend = blended * face_mask_3channel + roi * (1 - face_mask_3channel)
frame[min_y:max_y, min_x:max_x] = final_blend.astype(np.uint8)
except Exception as e:
pass
return frame
def draw_mask_visualization(
frame: Frame,
mask_data: tuple,
label: str,
draw_method: str = "polygon"
) -> Frame:
mask, cutout, (min_x, min_y, max_x, max_y), polygon = mask_data
vis_frame = frame.copy()
# Ensure coordinates are within frame bounds
height, width = vis_frame.shape[:2]
min_x, min_y = max(0, min_x), max(0, min_y)
max_x, max_y = min(width, max_x), min(height, max_y)
if draw_method == "ellipse" and len(polygon) > 50: # For eyes
# Split points for left and right parts
mid_point = len(polygon) // 2
left_points = polygon[:mid_point]
right_points = polygon[mid_point:]
try:
# Fit ellipses to points - need at least 5 points
if len(left_points) >= 5 and len(right_points) >= 5:
# Convert points to the correct format for ellipse fitting
left_points = left_points.astype(np.float32)
right_points = right_points.astype(np.float32)
# Fit ellipses
left_ellipse = cv2.fitEllipse(left_points)
right_ellipse = cv2.fitEllipse(right_points)
# Draw the ellipses
cv2.ellipse(vis_frame, left_ellipse, (0, 255, 0), 2)
cv2.ellipse(vis_frame, right_ellipse, (0, 255, 0), 2)
except Exception as e:
# If ellipse fitting fails, draw simple rectangles as fallback
left_rect = cv2.boundingRect(left_points)
right_rect = cv2.boundingRect(right_points)
cv2.rectangle(vis_frame,
(left_rect[0], left_rect[1]),
(left_rect[0] + left_rect[2], left_rect[1] + left_rect[3]),
(0, 255, 0), 2)
cv2.rectangle(vis_frame,
(right_rect[0], right_rect[1]),
(right_rect[0] + right_rect[2], right_rect[1] + right_rect[3]),
(0, 255, 0), 2)
else: # For mouth and eyebrows
# Draw the polygon
if len(polygon) > 50: # If we have multiple parts
mid_point = len(polygon) // 2
left_points = polygon[:mid_point]
right_points = polygon[mid_point:]
cv2.polylines(vis_frame, [left_points], True, (0, 255, 0), 2, cv2.LINE_AA)
cv2.polylines(vis_frame, [right_points], True, (0, 255, 0), 2, cv2.LINE_AA)
else:
cv2.polylines(vis_frame, [polygon], True, (0, 255, 0), 2, cv2.LINE_AA)
# Add label
cv2.putText(
vis_frame,
label,
(min_x, min_y - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(255, 255, 255),
1,
)
return vis_frame
File diff suppressed because it is too large Load Diff
+9
View File
@@ -0,0 +1,9 @@
#!/usr/bin/env python3
# Import the tkinter fix to patch the ScreenChanged error
import tkinter_fix
import core
if __name__ == '__main__':
core.run()
+26
View File
@@ -0,0 +1,26 @@
import tkinter
# Only needs to be imported once at the beginning of the application
def apply_patch():
# Create a monkey patch for the internal _tkinter module
original_init = tkinter.Tk.__init__
def patched_init(self, *args, **kwargs):
# Call the original init
original_init(self, *args, **kwargs)
# Define the missing ::tk::ScreenChanged procedure
self.tk.eval("""
if {[info commands ::tk::ScreenChanged] == ""} {
proc ::tk::ScreenChanged {args} {
# Do nothing
return
}
}
""")
# Apply the monkey patch
tkinter.Tk.__init__ = patched_init
# Apply the patch automatically when this module is imported
apply_patch()
+272 -75
View File
@@ -4,13 +4,18 @@ import customtkinter as ctk
from typing import Callable, Tuple
import cv2
from cv2_enumerate_cameras import enumerate_cameras # Add this import
from modules.gpu_processing import gpu_cvt_color, gpu_resize, gpu_flip
from PIL import Image, ImageOps
import time
import json
import queue
import threading
import numpy as np
import modules.globals
import modules.metadata
from modules.face_analyser import (
get_one_face,
get_many_faces,
get_unique_faces_from_target_image,
get_unique_faces_from_target_video,
add_blank_map,
@@ -27,6 +32,7 @@ from modules.utilities import (
)
from modules.video_capture import VideoCapturer
from modules.gettext import LanguageManager
from modules import globals
import platform
if platform.system() == "Windows":
@@ -35,7 +41,7 @@ if platform.system() == "Windows":
ROOT = None
POPUP = None
POPUP_LIVE = None
ROOT_HEIGHT = 700
ROOT_HEIGHT = 800
ROOT_WIDTH = 600
PREVIEW = None
@@ -97,6 +103,7 @@ def save_switch_states():
"keep_frames": modules.globals.keep_frames,
"many_faces": modules.globals.many_faces,
"map_faces": modules.globals.map_faces,
"poisson_blend": modules.globals.poisson_blend,
"color_correction": modules.globals.color_correction,
"nsfw_filter": modules.globals.nsfw_filter,
"live_mirror": modules.globals.live_mirror,
@@ -119,6 +126,7 @@ def load_switch_states():
modules.globals.keep_frames = switch_states.get("keep_frames", False)
modules.globals.many_faces = switch_states.get("many_faces", False)
modules.globals.map_faces = switch_states.get("map_faces", False)
modules.globals.poisson_blend = switch_states.get("poisson_blend", False)
modules.globals.color_correction = switch_states.get("color_correction", False)
modules.globals.nsfw_filter = switch_states.get("nsfw_filter", False)
modules.globals.live_mirror = switch_states.get("live_mirror", False)
@@ -152,20 +160,20 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
root.protocol("WM_DELETE_WINDOW", lambda: destroy())
source_label = ctk.CTkLabel(root, text=None)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25)
source_label.place(relx=0.1, rely=0.05, relwidth=0.275, relheight=0.225)
target_label = ctk.CTkLabel(root, text=None)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
target_label.place(relx=0.6, rely=0.05, relwidth=0.275, relheight=0.225)
select_face_button = ctk.CTkButton(
root, text=_("Select a face"), cursor="hand2", command=lambda: select_source_path()
)
select_face_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
select_face_button.place(relx=0.1, rely=0.30, relwidth=0.3, relheight=0.1)
swap_faces_button = ctk.CTkButton(
root, text="", cursor="hand2", command=lambda: swap_faces_paths()
)
swap_faces_button.place(relx=0.45, rely=0.4, relwidth=0.1, relheight=0.1)
swap_faces_button.place(relx=0.45, rely=0.30, relwidth=0.1, relheight=0.1)
select_target_button = ctk.CTkButton(
root,
@@ -173,7 +181,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
cursor="hand2",
command=lambda: select_target_path(),
)
select_target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
select_target_button.place(relx=0.6, rely=0.30, relwidth=0.3, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(
@@ -186,7 +194,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
save_switch_states(),
),
)
keep_fps_checkbox.place(relx=0.1, rely=0.6)
keep_fps_checkbox.place(relx=0.1, rely=0.5)
keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames)
keep_frames_switch = ctk.CTkSwitch(
@@ -199,7 +207,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
save_switch_states(),
),
)
keep_frames_switch.place(relx=0.1, rely=0.65)
keep_frames_switch.place(relx=0.1, rely=0.55)
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui["face_enhancer"])
enhancer_switch = ctk.CTkSwitch(
@@ -212,7 +220,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
save_switch_states(),
),
)
enhancer_switch.place(relx=0.1, rely=0.7)
enhancer_switch.place(relx=0.1, rely=0.6)
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
keep_audio_switch = ctk.CTkSwitch(
@@ -225,7 +233,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
save_switch_states(),
),
)
keep_audio_switch.place(relx=0.6, rely=0.6)
keep_audio_switch.place(relx=0.6, rely=0.5)
many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces)
many_faces_switch = ctk.CTkSwitch(
@@ -238,7 +246,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
save_switch_states(),
),
)
many_faces_switch.place(relx=0.6, rely=0.65)
many_faces_switch.place(relx=0.6, rely=0.55)
color_correction_value = ctk.BooleanVar(value=modules.globals.color_correction)
color_correction_switch = ctk.CTkSwitch(
@@ -251,7 +259,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
save_switch_states(),
),
)
color_correction_switch.place(relx=0.6, rely=0.70)
color_correction_switch.place(relx=0.6, rely=0.6)
# nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw_filter)
# nsfw_switch = ctk.CTkSwitch(root, text='NSFW filter', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw_filter', nsfw_value.get()))
@@ -269,7 +277,20 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
close_mapper_window() if not map_faces.get() else None
),
)
map_faces_switch.place(relx=0.1, rely=0.75)
map_faces_switch.place(relx=0.1, rely=0.65)
poisson_blend_value = ctk.BooleanVar(value=modules.globals.poisson_blend)
poisson_blend_switch = ctk.CTkSwitch(
root,
text=_("Poisson Blend"),
variable=poisson_blend_value,
cursor="hand2",
command=lambda: (
setattr(modules.globals, "poisson_blend", poisson_blend_value.get()),
save_switch_states(),
),
)
poisson_blend_switch.place(relx=0.1, rely=0.7)
show_fps_value = ctk.BooleanVar(value=modules.globals.show_fps)
show_fps_switch = ctk.CTkSwitch(
@@ -282,7 +303,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
save_switch_states(),
),
)
show_fps_switch.place(relx=0.6, rely=0.75)
show_fps_switch.place(relx=0.6, rely=0.65)
mouth_mask_var = ctk.BooleanVar(value=modules.globals.mouth_mask)
mouth_mask_switch = ctk.CTkSwitch(
@@ -292,7 +313,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
cursor="hand2",
command=lambda: setattr(modules.globals, "mouth_mask", mouth_mask_var.get()),
)
mouth_mask_switch.place(relx=0.1, rely=0.55)
mouth_mask_switch.place(relx=0.1, rely=0.45)
show_mouth_mask_box_var = ctk.BooleanVar(value=modules.globals.show_mouth_mask_box)
show_mouth_mask_box_switch = ctk.CTkSwitch(
@@ -304,26 +325,26 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
modules.globals, "show_mouth_mask_box", show_mouth_mask_box_var.get()
),
)
show_mouth_mask_box_switch.place(relx=0.6, rely=0.55)
show_mouth_mask_box_switch.place(relx=0.6, rely=0.45)
start_button = ctk.CTkButton(
root, text=_("Start"), cursor="hand2", command=lambda: analyze_target(start, root)
)
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
start_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(
root, text=_("Destroy"), cursor="hand2", command=lambda: destroy()
)
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
stop_button.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(
root, text=_("Preview"), cursor="hand2", command=lambda: toggle_preview()
)
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
preview_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
# --- Camera Selection ---
camera_label = ctk.CTkLabel(root, text=_("Select Camera:"))
camera_label.place(relx=0.1, rely=0.86, relwidth=0.2, relheight=0.05)
camera_label.place(relx=0.1, rely=0.92, relwidth=0.2, relheight=0.05)
available_cameras = get_available_cameras()
camera_indices, camera_names = available_cameras
@@ -342,7 +363,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
root, variable=camera_variable, values=camera_names
)
camera_optionmenu.place(relx=0.35, rely=0.86, relwidth=0.25, relheight=0.05)
camera_optionmenu.place(relx=0.35, rely=0.92, relwidth=0.25, relheight=0.05)
live_button = ctk.CTkButton(
root,
@@ -362,16 +383,82 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
else "disabled"
),
)
live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
live_button.place(relx=0.65, rely=0.92, relwidth=0.2, relheight=0.05)
# --- End Camera Selection ---
# 1) Define a DoubleVar for transparency (0 = fully transparent, 1 = fully opaque)
transparency_var = ctk.DoubleVar(value=1.0)
def on_transparency_change(value: float):
# Convert slider value to float
val = float(value)
modules.globals.opacity = val # Set global opacity
percentage = int(val * 100)
if percentage == 0:
modules.globals.fp_ui["face_enhancer"] = False
update_status("Transparency set to 0% - Face swapping disabled.")
elif percentage == 100:
modules.globals.face_swapper_enabled = True
update_status("Transparency set to 100%.")
else:
modules.globals.face_swapper_enabled = True
update_status(f"Transparency set to {percentage}%")
# 2) Transparency label and slider (placed ABOVE sharpness)
transparency_label = ctk.CTkLabel(root, text="Transparency:")
transparency_label.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05)
transparency_slider = ctk.CTkSlider(
root,
from_=0.0,
to=1.0,
variable=transparency_var,
command=on_transparency_change,
fg_color="#E0E0E0",
progress_color="#007BFF",
button_color="#FFFFFF",
button_hover_color="#CCCCCC",
height=5,
border_width=1,
corner_radius=3,
)
transparency_slider.place(relx=0.35, rely=0.77, relwidth=0.5, relheight=0.02)
# 3) Sharpness label & slider
sharpness_var = ctk.DoubleVar(value=0.0) # start at 0.0
def on_sharpness_change(value: float):
modules.globals.sharpness = float(value)
update_status(f"Sharpness set to {value:.1f}")
sharpness_label = ctk.CTkLabel(root, text="Sharpness:")
sharpness_label.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
sharpness_slider = ctk.CTkSlider(
root,
from_=0.0,
to=5.0,
variable=sharpness_var,
command=on_sharpness_change,
fg_color="#E0E0E0",
progress_color="#007BFF",
button_color="#FFFFFF",
button_hover_color="#CCCCCC",
height=5,
border_width=1,
corner_radius=3,
)
sharpness_slider.place(relx=0.35, rely=0.82, relwidth=0.5, relheight=0.02)
# Status and link at the bottom
global status_label
status_label = ctk.CTkLabel(root, text=None, justify="center")
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
status_label.place(relx=0.1, rely=0.96, relwidth=0.8)
donate_label = ctk.CTkLabel(
root, text="Deep Live Cam", justify="center", cursor="hand2"
)
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label.place(relx=0.1, rely=0.98, relwidth=0.8)
donate_label.configure(
text_color=ctk.ThemeManager.theme.get("URL").get("text_color")
)
@@ -381,6 +468,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
return root
def close_mapper_window():
global POPUP, POPUP_LIVE
if POPUP and POPUP.winfo_exists():
@@ -429,7 +517,7 @@ def create_source_target_popup(
POPUP.destroy()
select_output_path(start)
else:
update_pop_status("At least 1 source with target is required!")
update_pop_status("Atleast 1 source with target is required!")
scrollable_frame = ctk.CTkScrollableFrame(
POPUP, width=POPUP_SCROLL_WIDTH, height=POPUP_SCROLL_HEIGHT
@@ -459,7 +547,7 @@ def create_source_target_popup(
)
x_label.grid(row=id, column=2, padx=10, pady=10)
image = Image.fromarray(cv2.cvtColor(item["target"]["cv2"], cv2.COLOR_BGR2RGB))
image = Image.fromarray(gpu_cvt_color(item["target"]["cv2"], cv2.COLOR_BGR2RGB))
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
)
@@ -489,7 +577,7 @@ def update_popup_source(
global source_label_dict
source_path = ctk.filedialog.askopenfilename(
title=_("select a source image"),
title=_("select an source image"),
initialdir=RECENT_DIRECTORY_SOURCE,
filetypes=[img_ft],
)
@@ -514,7 +602,7 @@ def update_popup_source(
}
image = Image.fromarray(
cv2.cvtColor(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -584,7 +672,7 @@ def select_source_path() -> None:
PREVIEW.withdraw()
source_path = ctk.filedialog.askopenfilename(
title=_("select a source image"),
title=_("select an source image"),
initialdir=RECENT_DIRECTORY_SOURCE,
filetypes=[img_ft],
)
@@ -627,7 +715,7 @@ def select_target_path() -> None:
PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(
title=_("select a target image or video"),
title=_("select an target image or video"),
initialdir=RECENT_DIRECTORY_TARGET,
filetypes=[img_ft, vid_ft],
)
@@ -696,22 +784,18 @@ def check_and_ignore_nsfw(target, destroy: Callable = None) -> bool:
def fit_image_to_size(image, width: int, height: int):
if width is None or height is None or width <= 0 or height <= 0:
if width is None and height is None:
return image
h, w, _ = image.shape
ratio_h = 0.0
ratio_w = 0.0
ratio_w = width / w
ratio_h = height / h
# Use the smaller ratio to ensure the image fits within the given dimensions
ratio = min(ratio_w, ratio_h)
# Compute new dimensions, ensuring they're at least 1 pixel
new_width = max(1, int(ratio * w))
new_height = max(1, int(ratio * h))
new_size = (new_width, new_height)
return cv2.resize(image, dsize=new_size)
if width > height:
ratio_h = height / h
else:
ratio_w = width / w
ratio = max(ratio_w, ratio_h)
new_size = (int(ratio * w), int(ratio * h))
return gpu_resize(image, dsize=new_size)
def render_image_preview(image_path: str, size: Tuple[int, int]) -> ctk.CTkImage:
@@ -729,7 +813,7 @@ def render_video_preview(
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
image = Image.fromarray(gpu_cvt_color(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
@@ -767,7 +851,7 @@ def update_preview(frame_number: int = 0) -> None:
temp_frame = frame_processor.process_frame(
get_one_face(cv2.imread(modules.globals.source_path)), temp_frame
)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = Image.fromarray(gpu_cvt_color(temp_frame, cv2.COLOR_BGR2RGB))
image = ImageOps.contain(
image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS
)
@@ -868,52 +952,97 @@ def get_available_cameras():
return camera_indices, camera_names
def create_webcam_preview(camera_index: int):
global preview_label, PREVIEW
def _capture_thread_func(cap, capture_queue, stop_event):
"""Capture thread: reads frames from camera and puts them into the queue.
Drops frames when the queue is full to avoid backpressure on the camera."""
while not stop_event.is_set():
ret, frame = cap.read()
if not ret:
stop_event.set()
break
try:
capture_queue.put_nowait(frame)
except queue.Full:
# Drop the oldest frame and enqueue the new one
try:
capture_queue.get_nowait()
except queue.Empty:
pass
try:
capture_queue.put_nowait(frame)
except queue.Full:
pass
cap = VideoCapturer(camera_index)
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
update_status("Failed to start camera")
return
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
# How often to run full face detection. On intermediate frames the last
# detected face positions are reused, which significantly reduces the
# per-frame cost of the processing thread.
DETECT_EVERY_N = 2
def _processing_thread_func(capture_queue, processed_queue, stop_event):
"""Processing thread: takes raw frames from capture_queue, applies face
processing, and puts results into processed_queue. Drops processed frames
when the output queue is full so the UI always gets the latest result.
Uses DETECT_EVERY_N to skip expensive face detection on intermediate
frames, reusing cached face positions instead."""
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None
prev_time = time.time()
fps_update_interval = 0.5
frame_count = 0
fps = 0
proc_frame_index = 0
cached_target_face = None # cached single-face result
cached_many_faces = None # cached many-faces result
while True:
ret, frame = cap.read()
if not ret:
break
while not stop_event.is_set():
try:
frame = capture_queue.get(timeout=0.05)
except queue.Empty:
continue
temp_frame = frame.copy()
run_detection = (proc_frame_index % DETECT_EVERY_N == 0)
proc_frame_index += 1
if modules.globals.live_mirror:
temp_frame = cv2.flip(temp_frame, 1)
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
temp_frame = gpu_flip(temp_frame, 1)
if not modules.globals.map_faces:
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
# Update face detection cache on detection frames
if run_detection or (cached_target_face is None and cached_many_faces is None):
if modules.globals.many_faces:
cached_many_faces = get_many_faces(temp_frame)
cached_target_face = None
else:
cached_target_face = get_one_face(temp_frame)
cached_many_faces = None
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame(None, temp_frame)
elif frame_processor.NAME == "DLC.FACE-SWAPPER":
# Use cached face positions to skip redundant detection
swapped_bboxes = []
if modules.globals.many_faces and cached_many_faces:
result = temp_frame.copy()
for t_face in cached_many_faces:
result = frame_processor.swap_face(source_image, t_face, result)
if hasattr(t_face, 'bbox') and t_face.bbox is not None:
swapped_bboxes.append(t_face.bbox.astype(int))
temp_frame = result
elif cached_target_face is not None:
temp_frame = frame_processor.swap_face(source_image, cached_target_face, temp_frame)
if hasattr(cached_target_face, 'bbox') and cached_target_face.bbox is not None:
swapped_bboxes.append(cached_target_face.bbox.astype(int))
# Apply post-processing (sharpening, interpolation)
temp_frame = frame_processor.apply_post_processing(temp_frame, swapped_bboxes)
else:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
else:
@@ -944,7 +1073,71 @@ def create_webcam_preview(camera_index: int):
2,
)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
# Put processed frame into output queue, dropping old frames if full
try:
processed_queue.put_nowait(temp_frame)
except queue.Full:
try:
processed_queue.get_nowait()
except queue.Empty:
pass
try:
processed_queue.put_nowait(temp_frame)
except queue.Full:
pass
def create_webcam_preview(camera_index: int):
global preview_label, PREVIEW
cap = VideoCapturer(camera_index)
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
update_status("Failed to start camera")
return
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
# Queues for decoupling capture from processing and processing from display.
# Small maxsize ensures we always work on recent frames and drop stale ones.
capture_queue = queue.Queue(maxsize=2)
processed_queue = queue.Queue(maxsize=2)
stop_event = threading.Event()
# Start capture thread
cap_thread = threading.Thread(
target=_capture_thread_func,
args=(cap, capture_queue, stop_event),
daemon=True,
)
cap_thread.start()
# Start processing thread
proc_thread = threading.Thread(
target=_processing_thread_func,
args=(capture_queue, processed_queue, stop_event),
daemon=True,
)
proc_thread.start()
# Main (UI) thread: pull processed frames and update the display
while not stop_event.is_set():
try:
temp_frame = processed_queue.get(timeout=0.03)
except queue.Empty:
ROOT.update()
continue
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
image = gpu_cvt_color(temp_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(image)
image = ImageOps.contain(
image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
@@ -956,6 +1149,10 @@ def create_webcam_preview(camera_index: int):
if PREVIEW.state() == "withdrawn":
break
# Signal threads to stop and wait for them
stop_event.set()
cap_thread.join(timeout=2.0)
proc_thread.join(timeout=2.0)
cap.release()
PREVIEW.withdraw()
@@ -1067,7 +1264,7 @@ def refresh_data(map: list):
if "source" in item:
image = Image.fromarray(
cv2.cvtColor(item["source"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(item["source"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -1085,7 +1282,7 @@ def refresh_data(map: list):
if "target" in item:
image = Image.fromarray(
cv2.cvtColor(item["target"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(item["target"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -1108,7 +1305,7 @@ def update_webcam_source(
global source_label_dict_live
source_path = ctk.filedialog.askopenfilename(
title=_("select a source image"),
title=_("select an source image"),
initialdir=RECENT_DIRECTORY_SOURCE,
filetypes=[img_ft],
)
@@ -1133,7 +1330,7 @@ def update_webcam_source(
}
image = Image.fromarray(
cv2.cvtColor(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(map[button_num]["source"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -1160,7 +1357,7 @@ def update_webcam_target(
global target_label_dict_live
target_path = ctk.filedialog.askopenfilename(
title=_("select a target image"),
title=_("select an target image"),
initialdir=RECENT_DIRECTORY_SOURCE,
filetypes=[img_ft],
)
@@ -1185,7 +1382,7 @@ def update_webcam_target(
}
image = Image.fromarray(
cv2.cvtColor(map[button_num]["target"]["cv2"], cv2.COLOR_BGR2RGB)
gpu_cvt_color(map[button_num]["target"]["cv2"], cv2.COLOR_BGR2RGB)
)
image = image.resize(
(MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS
@@ -1203,4 +1400,4 @@ def update_webcam_target(
target_label_dict_live[button_num] = target_image
else:
update_pop_live_status("Face could not be detected in last upload!")
return map
return map
+116 -23
View File
@@ -21,13 +21,14 @@ if platform.system().lower() == "darwin":
def run_ffmpeg(args: List[str]) -> bool:
"""Run ffmpeg with hardware acceleration and optimized settings."""
commands = [
"ffmpeg",
"-hide_banner",
"-hwaccel",
"auto",
"-loglevel",
modules.globals.log_level,
"-hwaccel", "auto", # Auto-detect hardware acceleration
"-hwaccel_output_format", "auto", # Use hardware format when possible
"-threads", str(modules.globals.execution_threads or 0), # 0 = auto-detect optimal thread count
"-loglevel", modules.globals.log_level,
]
commands.extend(args)
try:
@@ -61,39 +62,131 @@ def detect_fps(target_path: str) -> float:
def extract_frames(target_path: str) -> None:
"""Extract frames with hardware acceleration and optimized settings."""
temp_directory_path = get_temp_directory_path(target_path)
# Use hardware-accelerated decoding and optimized pixel format
run_ffmpeg(
[
"-i",
target_path,
"-pix_fmt",
"rgb24",
"-i", target_path,
"-vf", "format=rgb24", # Use video filter for format conversion (faster)
"-vsync", "0", # Prevent frame duplication
"-frame_pts", "1", # Preserve frame timing
os.path.join(temp_directory_path, "%04d.png"),
]
)
def create_video(target_path: str, fps: float = 30.0) -> None:
"""Create video with hardware-accelerated encoding and optimized settings."""
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(
[
"-r",
str(fps),
"-i",
os.path.join(temp_directory_path, "%04d.png"),
"-c:v",
modules.globals.video_encoder,
"-crf",
str(modules.globals.video_quality),
"-pix_fmt",
"yuv420p",
"-vf",
"colorspace=bt709:iall=bt601-6-625:fast=1",
# Determine optimal encoder based on available hardware
encoder = modules.globals.video_encoder
encoder_options = []
# GPU-accelerated encoding options
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
# NVIDIA GPU encoding
if encoder == 'libx264':
encoder = 'h264_nvenc'
encoder_options = [
"-preset", "p7", # Highest quality preset for NVENC
"-tune", "hq", # High quality tuning
"-rc", "vbr", # Variable bitrate
"-cq", str(modules.globals.video_quality), # Quality level
"-b:v", "0", # Let CQ control bitrate
"-multipass", "fullres", # Two-pass encoding for better quality
]
elif encoder == 'libx265':
encoder = 'hevc_nvenc'
encoder_options = [
"-preset", "p7",
"-tune", "hq",
"-rc", "vbr",
"-cq", str(modules.globals.video_quality),
"-b:v", "0",
]
elif 'DmlExecutionProvider' in modules.globals.execution_providers:
# AMD/Intel GPU encoding (DirectML on Windows)
if encoder == 'libx264':
# Try AMD AMF encoder
encoder = 'h264_amf'
encoder_options = [
"-quality", "quality", # Quality mode
"-rc", "vbr_latency",
"-qp_i", str(modules.globals.video_quality),
"-qp_p", str(modules.globals.video_quality),
]
elif encoder == 'libx265':
encoder = 'hevc_amf'
encoder_options = [
"-quality", "quality",
"-rc", "vbr_latency",
"-qp_i", str(modules.globals.video_quality),
"-qp_p", str(modules.globals.video_quality),
]
else:
# CPU encoding with optimized settings
if encoder == 'libx264':
encoder_options = [
"-preset", "medium", # Balance speed/quality
"-crf", str(modules.globals.video_quality),
"-tune", "film", # Optimize for film content
]
elif encoder == 'libx265':
encoder_options = [
"-preset", "medium",
"-crf", str(modules.globals.video_quality),
"-x265-params", "log-level=error",
]
elif encoder == 'libvpx-vp9':
encoder_options = [
"-crf", str(modules.globals.video_quality),
"-b:v", "0", # Constant quality mode
"-cpu-used", "2", # Speed vs quality (0-5, lower=slower/better)
]
# Build ffmpeg command
ffmpeg_args = [
"-r", str(fps),
"-i", os.path.join(temp_directory_path, "%04d.png"),
"-c:v", encoder,
]
# Add encoder-specific options
ffmpeg_args.extend(encoder_options)
# Add common options
ffmpeg_args.extend([
"-pix_fmt", "yuv420p",
"-movflags", "+faststart", # Enable fast start for web playback
"-vf", "colorspace=bt709:iall=bt601-6-625:fast=1",
"-y",
temp_output_path,
])
# Try with hardware encoder first, fallback to software if it fails
success = run_ffmpeg(ffmpeg_args)
if not success and encoder in ['h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf']:
# Fallback to software encoding
print(f"Hardware encoding with {encoder} failed, falling back to software encoding...")
fallback_encoder = 'libx264' if 'h264' in encoder else 'libx265'
ffmpeg_args_fallback = [
"-r", str(fps),
"-i", os.path.join(temp_directory_path, "%04d.png"),
"-c:v", fallback_encoder,
"-preset", "medium",
"-crf", str(modules.globals.video_quality),
"-pix_fmt", "yuv420p",
"-movflags", "+faststart",
"-vf", "colorspace=bt709:iall=bt601-6-625:fast=1",
"-y",
temp_output_path,
]
)
run_ffmpeg(ffmpeg_args_fallback)
def restore_audio(target_path: str, output_path: str) -> None:
+4 -9
View File
@@ -1,5 +1,3 @@
--extra-index-url https://download.pytorch.org/whl/cu128
numpy>=1.23.5,<2
typing-extensions>=4.8.0
opencv-python==4.10.0.84
@@ -9,13 +7,10 @@ insightface==0.7.3
psutil==5.9.8
tk==0.1.0
customtkinter==5.2.2
pillow==11.1.0
torch; sys_platform != 'darwin'
torch==2.5.1; sys_platform == 'darwin'
torchvision; sys_platform != 'darwin'
torchvision==0.20.1; sys_platform == 'darwin'
pillow==12.1.1
onnxruntime-silicon==1.16.3; sys_platform == 'darwin' and platform_machine == 'arm64'
onnxruntime-gpu==1.22.0; sys_platform != 'darwin'
onnxruntime-gpu==1.24.2; sys_platform != 'darwin'
tensorflow; sys_platform != 'darwin'
opennsfw2==0.10.2
protobuf==4.25.1
protobuf==5.29.6
pygrabber
+3
View File
@@ -1,5 +1,8 @@
#!/usr/bin/env python3
# Import the tkinter fix to patch the ScreenChanged error
import tkinter_fix
from modules import core
if __name__ == '__main__':
+26
View File
@@ -0,0 +1,26 @@
import tkinter
# Only needs to be imported once at the beginning of the application
def apply_patch():
# Create a monkey patch for the internal _tkinter module
original_init = tkinter.Tk.__init__
def patched_init(self, *args, **kwargs):
# Call the original init
original_init(self, *args, **kwargs)
# Define the missing ::tk::ScreenChanged procedure
self.tk.eval("""
if {[info commands ::tk::ScreenChanged] == ""} {
proc ::tk::ScreenChanged {args} {
# Do nothing
return
}
}
""")
# Apply the monkey patch
tkinter.Tk.__init__ = patched_init
# Apply the patch automatically when this module is imported
apply_patch()