Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a4c617af3e | |||
| 9a33f5e184 | |||
| 2b36300b8c | |||
| 21c029f51e | |||
| 06bc8f2152 | |||
| 63b90c428e | |||
| df8e8b427e | |||
| dfd145b996 | |||
| b3c4ed9250 |
@@ -1,4 +1,4 @@
|
||||
<h1 align="center">Deep-Live-Cam</h1>
|
||||
<h1 align="center">Deep-Live-Cam 2.0.2c</h1>
|
||||
|
||||
<p align="center">
|
||||
Real-time face swap and video deepfake with a single click and only a single image.
|
||||
@@ -30,7 +30,7 @@ By using this software, you agree to these terms and commit to using it in a man
|
||||
|
||||
Users are expected to use this software responsibly and legally. If using a real person's face, obtain their consent and clearly label any output as a deepfake when sharing online. We are not responsible for end-user actions.
|
||||
|
||||
## Exclusive v2.3c Quick Start - Pre-built (Windows/Mac Silicon)
|
||||
## Exclusive v2.4 Quick Start - Pre-built (Windows/Mac Silicon)
|
||||
|
||||
<a href="https://deeplivecam.net/index.php/quickstart"> <img src="media/Download.png" width="285" height="77" />
|
||||
|
||||
|
||||
+40
-3
@@ -129,11 +129,22 @@ def suggest_execution_providers() -> List[str]:
|
||||
|
||||
|
||||
def suggest_execution_threads() -> int:
|
||||
"""Suggest optimal thread count based on hardware and execution provider."""
|
||||
import os
|
||||
|
||||
# Get CPU count
|
||||
cpu_count = os.cpu_count() or 4
|
||||
|
||||
if 'DmlExecutionProvider' in modules.globals.execution_providers:
|
||||
return 1
|
||||
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
|
||||
return 1
|
||||
return 8
|
||||
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
|
||||
# For CUDA, use more threads for parallel frame processing
|
||||
return min(cpu_count, 16)
|
||||
|
||||
# For CPU execution, use most cores but leave some for system
|
||||
return max(4, min(cpu_count - 2, 16))
|
||||
|
||||
|
||||
def limit_resources() -> None:
|
||||
@@ -176,10 +187,16 @@ def update_status(message: str, scope: str = 'DLC.CORE') -> None:
|
||||
ui.update_status(message)
|
||||
|
||||
def start() -> None:
|
||||
"""Start processing with performance monitoring."""
|
||||
import time
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||
if not frame_processor.pre_start():
|
||||
return
|
||||
update_status('Processing...')
|
||||
|
||||
# process image to image
|
||||
if has_image_extension(modules.globals.target_path):
|
||||
if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy):
|
||||
@@ -193,26 +210,40 @@ def start() -> None:
|
||||
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
|
||||
release_resources()
|
||||
if is_image(modules.globals.target_path):
|
||||
update_status('Processing to image succeed!')
|
||||
elapsed = time.time() - start_time
|
||||
update_status(f'Processing to image succeed! (Time: {elapsed:.2f}s)')
|
||||
else:
|
||||
update_status('Processing to image failed!')
|
||||
return
|
||||
|
||||
# process image to videos
|
||||
if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy):
|
||||
return
|
||||
|
||||
extraction_start = time.time()
|
||||
if not modules.globals.map_faces:
|
||||
update_status('Creating temp resources...')
|
||||
create_temp(modules.globals.target_path)
|
||||
update_status('Extracting frames...')
|
||||
extract_frames(modules.globals.target_path)
|
||||
extraction_time = time.time() - extraction_start
|
||||
update_status(f'Frame extraction completed in {extraction_time:.2f}s')
|
||||
|
||||
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
|
||||
total_frames = len(temp_frame_paths)
|
||||
update_status(f'Processing {total_frames} frames with {modules.globals.execution_threads} threads...')
|
||||
|
||||
processing_start = time.time()
|
||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||
update_status('Progressing...', frame_processor.NAME)
|
||||
frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
|
||||
release_resources()
|
||||
processing_time = time.time() - processing_start
|
||||
fps_processing = total_frames / processing_time if processing_time > 0 else 0
|
||||
update_status(f'Frame processing completed in {processing_time:.2f}s ({fps_processing:.2f} fps)')
|
||||
|
||||
# handles fps
|
||||
encoding_start = time.time()
|
||||
if modules.globals.keep_fps:
|
||||
update_status('Detecting fps...')
|
||||
fps = detect_fps(modules.globals.target_path)
|
||||
@@ -221,6 +252,9 @@ def start() -> None:
|
||||
else:
|
||||
update_status('Creating video with 30.0 fps...')
|
||||
create_video(modules.globals.target_path)
|
||||
encoding_time = time.time() - encoding_start
|
||||
update_status(f'Video encoding completed in {encoding_time:.2f}s')
|
||||
|
||||
# handle audio
|
||||
if modules.globals.keep_audio:
|
||||
if modules.globals.keep_fps:
|
||||
@@ -230,10 +264,13 @@ def start() -> None:
|
||||
restore_audio(modules.globals.target_path, modules.globals.output_path)
|
||||
else:
|
||||
move_temp(modules.globals.target_path, modules.globals.output_path)
|
||||
|
||||
# clean and validate
|
||||
clean_temp(modules.globals.target_path)
|
||||
|
||||
total_time = time.time() - start_time
|
||||
if is_video(modules.globals.target_path):
|
||||
update_status('Processing to video succeed!')
|
||||
update_status(f'Processing to video succeed! Total time: {total_time:.2f}s')
|
||||
else:
|
||||
update_status('Processing to video failed!')
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
import shutil
|
||||
from typing import Any
|
||||
import insightface
|
||||
import threading
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
@@ -13,14 +14,22 @@ from modules.utilities import get_temp_directory_path, create_temp, extract_fram
|
||||
from pathlib import Path
|
||||
|
||||
FACE_ANALYSER = None
|
||||
FACE_ANALYSER_LOCK = threading.Lock()
|
||||
|
||||
|
||||
def get_face_analyser() -> Any:
|
||||
"""Get face analyser with thread-safe initialization."""
|
||||
global FACE_ANALYSER
|
||||
|
||||
if FACE_ANALYSER is None:
|
||||
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=modules.globals.execution_providers)
|
||||
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
|
||||
with FACE_ANALYSER_LOCK:
|
||||
# Double-check after acquiring lock
|
||||
if FACE_ANALYSER is None:
|
||||
FACE_ANALYSER = insightface.app.FaceAnalysis(
|
||||
name='buffalo_l',
|
||||
providers=modules.globals.execution_providers
|
||||
)
|
||||
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
|
||||
return FACE_ANALYSER
|
||||
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ keep_audio: bool = True
|
||||
keep_frames: bool = False
|
||||
many_faces: bool = False # Process all detected faces with default source
|
||||
map_faces: bool = False # Use source_target_map or simple_map for specific swaps
|
||||
poisson_blend: bool = False # Enable Poisson Blending for smoother face swaps
|
||||
color_correction: bool = False # Enable color correction (implementation specific)
|
||||
nsfw_filter: bool = False
|
||||
|
||||
|
||||
+2
-2
@@ -1,3 +1,3 @@
|
||||
name = 'Deep-Live-Cam'
|
||||
version = '2.0c'
|
||||
edition = 'GitHub Edition'
|
||||
version = '2.0.3c'
|
||||
edition = 'GitHub Edition'
|
||||
@@ -67,13 +67,29 @@ def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
|
||||
print(f"Warning: Error removing frame processor {frame_processor}: {e}")
|
||||
|
||||
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
|
||||
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
|
||||
futures = []
|
||||
for path in temp_frame_paths:
|
||||
future = executor.submit(process_frames, source_path, [path], progress)
|
||||
futures.append(future)
|
||||
for future in futures:
|
||||
future.result()
|
||||
"""Process frames in parallel with optimized batching and memory management."""
|
||||
max_workers = modules.globals.execution_threads
|
||||
|
||||
# Determine optimal batch size based on available memory and thread count
|
||||
# Process frames in batches to avoid memory overflow
|
||||
batch_size = max(1, min(32, len(temp_frame_paths) // max(1, max_workers)))
|
||||
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
# Process in batches to manage memory better
|
||||
for i in range(0, len(temp_frame_paths), batch_size):
|
||||
batch = temp_frame_paths[i:i + batch_size]
|
||||
futures = []
|
||||
|
||||
for path in batch:
|
||||
future = executor.submit(process_frames, source_path, [path], progress)
|
||||
futures.append(future)
|
||||
|
||||
# Wait for batch to complete before starting next batch
|
||||
for future in futures:
|
||||
try:
|
||||
future.result()
|
||||
except Exception as e:
|
||||
print(f"Error processing frame: {e}")
|
||||
|
||||
|
||||
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
|
||||
|
||||
@@ -45,6 +45,7 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
|
||||
) # 5% of face width
|
||||
|
||||
# Create a slightly larger convex hull for padding
|
||||
face_outline = landmarks[0:33]
|
||||
hull = cv2.convexHull(face_outline)
|
||||
hull_padded = []
|
||||
for point in hull:
|
||||
@@ -70,77 +71,30 @@ def create_lower_mouth_mask(
|
||||
) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
|
||||
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
|
||||
mouth_cutout = None
|
||||
lower_lip_polygon = None
|
||||
mouth_box = (0,0,0,0)
|
||||
|
||||
landmarks = face.landmark_2d_106
|
||||
if landmarks is not None:
|
||||
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
||||
lower_lip_order = [
|
||||
65,
|
||||
66,
|
||||
62,
|
||||
70,
|
||||
69,
|
||||
18,
|
||||
19,
|
||||
20,
|
||||
21,
|
||||
22,
|
||||
23,
|
||||
24,
|
||||
0,
|
||||
8,
|
||||
7,
|
||||
6,
|
||||
5,
|
||||
4,
|
||||
3,
|
||||
2,
|
||||
65,
|
||||
]
|
||||
lower_lip_landmarks = landmarks[lower_lip_order].astype(
|
||||
np.float32
|
||||
) # Use float for precise calculations
|
||||
# Use outer mouth landmarks (52-63) to capture the lips only
|
||||
lower_lip_order = list(range(52, 64))
|
||||
|
||||
if max(lower_lip_order) >= landmarks.shape[0]:
|
||||
return mask, mouth_cutout, mouth_box, lower_lip_polygon
|
||||
|
||||
lower_lip_landmarks = landmarks[lower_lip_order].astype(np.float32)
|
||||
|
||||
# Calculate the center of the landmarks
|
||||
center = np.mean(lower_lip_landmarks, axis=0)
|
||||
|
||||
# Expand the landmarks outward using the mouth_mask_size
|
||||
# Use a more conservative expansion to avoid affecting face shape
|
||||
expansion_factor = (
|
||||
1 + modules.globals.mask_down_size * modules.globals.mouth_mask_size
|
||||
) # Adjust expansion based on slider
|
||||
)
|
||||
expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center
|
||||
|
||||
# Extend the top lip part
|
||||
toplip_indices = [
|
||||
20,
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
] # Indices for landmarks 2, 65, 66, 62, 70, 69, 18
|
||||
toplip_extension = (
|
||||
modules.globals.mask_size * modules.globals.mouth_mask_size * 0.5
|
||||
) # Adjust extension based on slider
|
||||
for idx in toplip_indices:
|
||||
direction = expanded_landmarks[idx] - center
|
||||
direction = direction / np.linalg.norm(direction)
|
||||
expanded_landmarks[idx] += direction * toplip_extension
|
||||
|
||||
# Extend the bottom part (chin area)
|
||||
chin_indices = [
|
||||
11,
|
||||
12,
|
||||
13,
|
||||
14,
|
||||
15,
|
||||
16,
|
||||
] # Indices for landmarks 21, 22, 23, 24, 0, 8
|
||||
chin_extension = 2 * 0.2 # Adjust this factor to control the extension
|
||||
for idx in chin_indices:
|
||||
expanded_landmarks[idx][1] += (
|
||||
expanded_landmarks[idx][1] - center[1]
|
||||
) * chin_extension
|
||||
# Removed specific top/chin extensions to preserve face shape
|
||||
|
||||
# Convert back to integer coordinates
|
||||
expanded_landmarks = expanded_landmarks.astype(np.int32)
|
||||
@@ -165,7 +119,9 @@ def create_lower_mouth_mask(
|
||||
|
||||
# Create the mask
|
||||
mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
|
||||
cv2.fillPoly(mask_roi, [expanded_landmarks - [min_x, min_y]], 255)
|
||||
# Shift polygon coordinates relative to the ROI's top-left corner
|
||||
polygon_relative_to_roi = expanded_landmarks - [min_x, min_y]
|
||||
cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)
|
||||
|
||||
# Apply Gaussian blur to soften the mask edges
|
||||
mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5)
|
||||
@@ -178,8 +134,9 @@ def create_lower_mouth_mask(
|
||||
|
||||
# Return the expanded lower lip polygon in original frame coordinates
|
||||
lower_lip_polygon = expanded_landmarks
|
||||
mouth_box = (min_x, min_y, max_x, max_y)
|
||||
|
||||
return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon
|
||||
return mask, mouth_cutout, mouth_box, lower_lip_polygon
|
||||
|
||||
def create_eyes_mask(face: Face, frame: Frame) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
|
||||
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
|
||||
@@ -606,4 +563,4 @@ def draw_mask_visualization(
|
||||
1,
|
||||
)
|
||||
|
||||
return vis_frame
|
||||
return vis_frame
|
||||
@@ -1,8 +1,9 @@
|
||||
from typing import Any, List
|
||||
from typing import Any, List, Optional
|
||||
import cv2
|
||||
import insightface
|
||||
import threading
|
||||
import numpy as np
|
||||
import platform
|
||||
import modules.globals
|
||||
import modules.processors.frame.core
|
||||
from modules.core import update_status
|
||||
@@ -14,9 +15,9 @@ from modules.utilities import (
|
||||
is_video,
|
||||
)
|
||||
from modules.cluster_analysis import find_closest_centroid
|
||||
# Removed modules.globals.face_swapper_enabled - assuming controlled elsewhere or implicitly true if used
|
||||
# Removed modules.globals.opacity - accessed via getattr
|
||||
import os
|
||||
from collections import deque
|
||||
import time
|
||||
|
||||
FACE_SWAPPER = None
|
||||
THREAD_LOCK = threading.Lock()
|
||||
@@ -26,6 +27,16 @@ NAME = "DLC.FACE-SWAPPER"
|
||||
PREVIOUS_FRAME_RESULT = None # Stores the final processed frame from the previous step
|
||||
# --- END: Added for Interpolation ---
|
||||
|
||||
# --- START: Mac M1-M5 Optimizations ---
|
||||
IS_APPLE_SILICON = platform.system() == 'Darwin' and platform.machine() == 'arm64'
|
||||
FRAME_CACHE = deque(maxlen=3) # Cache for frame reuse
|
||||
FACE_DETECTION_CACHE = {} # Cache face detections
|
||||
LAST_DETECTION_TIME = 0
|
||||
DETECTION_INTERVAL = 0.033 # ~30 FPS detection rate for live mode
|
||||
FRAME_SKIP_COUNTER = 0
|
||||
ADAPTIVE_QUALITY = True
|
||||
# --- END: Mac M1-M5 Optimizations ---
|
||||
|
||||
abs_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
models_dir = os.path.join(
|
||||
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
|
||||
@@ -69,55 +80,64 @@ def get_face_swapper() -> Any:
|
||||
model_path = os.path.join(models_dir, model_name)
|
||||
update_status(f"Loading face swapper model from: {model_path}", NAME)
|
||||
try:
|
||||
# Ensure the providers list is correctly passed
|
||||
# Apply CoreML optimization for Mac systems
|
||||
# Optimized provider configuration for Apple Silicon
|
||||
providers_config = []
|
||||
for p in modules.globals.execution_providers:
|
||||
if p == "CoreMLExecutionProvider" and IS_APPLE_SILICON:
|
||||
# Enhanced CoreML configuration for M1-M5
|
||||
providers_config.append((
|
||||
"CoreMLExecutionProvider",
|
||||
{
|
||||
"ModelFormat": "MLProgram",
|
||||
"MLComputeUnits": "ALL", # Use Neural Engine + GPU + CPU
|
||||
"SpecializationStrategy": "FastPrediction",
|
||||
"AllowLowPrecisionAccumulationOnGPU": 1,
|
||||
"EnableOnSubgraphs": 1,
|
||||
"RequireStaticShapes": 0,
|
||||
"MaximumCacheSize": 1024 * 1024 * 512, # 512MB cache
|
||||
}
|
||||
))
|
||||
else:
|
||||
providers_config.append(p)
|
||||
|
||||
FACE_SWAPPER = insightface.model_zoo.get_model(
|
||||
model_path,
|
||||
providers=[
|
||||
(
|
||||
(
|
||||
"CoreMLExecutionProvider",
|
||||
{
|
||||
"ModelFormat": "MLProgram",
|
||||
"MLComputeUnits": "CPUAndGPU",
|
||||
"SpecializationStrategy": "FastPrediction",
|
||||
"AllowLowPrecisionAccumulationOnGPU": 1,
|
||||
},
|
||||
)
|
||||
if p == "CoreMLExecutionProvider"
|
||||
else p
|
||||
)
|
||||
for p in modules.globals.execution_providers
|
||||
],
|
||||
providers=providers_config,
|
||||
)
|
||||
update_status("Face swapper model loaded successfully.", NAME)
|
||||
except Exception as e:
|
||||
update_status(f"Error loading face swapper model: {e}", NAME)
|
||||
# print traceback maybe?
|
||||
# import traceback
|
||||
# traceback.print_exc()
|
||||
FACE_SWAPPER = None # Ensure it remains None on failure
|
||||
FACE_SWAPPER = None
|
||||
return None
|
||||
return FACE_SWAPPER
|
||||
|
||||
|
||||
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
|
||||
"""Optimized face swapping with better memory management and performance."""
|
||||
face_swapper = get_face_swapper()
|
||||
if face_swapper is None:
|
||||
update_status("Face swapper model not loaded or failed to load. Skipping swap.", NAME)
|
||||
return temp_frame # Return original frame if model failed or not loaded
|
||||
return temp_frame
|
||||
|
||||
# Safety check for faces
|
||||
if source_face is None or target_face is None:
|
||||
return temp_frame
|
||||
if not hasattr(source_face, 'normed_embedding') or source_face.normed_embedding is None:
|
||||
return temp_frame
|
||||
|
||||
# Store a copy of the original frame before swapping for opacity blending
|
||||
original_frame = temp_frame.copy()
|
||||
|
||||
# --- Pre-swap Input Check (Optional but good practice) ---
|
||||
# Pre-swap Input Check with optimization
|
||||
if temp_frame.dtype != np.uint8:
|
||||
# print(f"Warning: Input frame is {temp_frame.dtype}, converting to uint8 before swap.")
|
||||
temp_frame = np.clip(temp_frame, 0, 255).astype(np.uint8)
|
||||
# --- End Input Check ---
|
||||
|
||||
# Apply the face swap
|
||||
# Apply the face swap with optimized memory handling
|
||||
try:
|
||||
# Ensure contiguous memory layout for better performance on all platforms
|
||||
if not temp_frame.flags['C_CONTIGUOUS']:
|
||||
temp_frame = np.ascontiguousarray(temp_frame)
|
||||
|
||||
swapped_frame_raw = face_swapper.get(
|
||||
temp_frame, target_face, source_face, paste_back=True
|
||||
)
|
||||
@@ -174,13 +194,42 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
|
||||
)
|
||||
|
||||
if getattr(modules.globals, "show_mouth_mask_box", False):
|
||||
mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
|
||||
# Draw visualization on the swapped_frame *before* opacity blending
|
||||
swapped_frame = draw_mouth_mask_visualization(
|
||||
swapped_frame, target_face, mouth_mask_data
|
||||
)
|
||||
mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
|
||||
# Draw visualization on the swapped_frame *before* opacity blending
|
||||
swapped_frame = draw_mouth_mask_visualization(
|
||||
swapped_frame, target_face, mouth_mask_data
|
||||
)
|
||||
|
||||
# --- Poisson Blending ---
|
||||
if getattr(modules.globals, "poisson_blend", False):
|
||||
face_mask = create_face_mask(target_face, temp_frame)
|
||||
if face_mask is not None:
|
||||
# Find bounding box of the mask
|
||||
y_indices, x_indices = np.where(face_mask > 0)
|
||||
if len(x_indices) > 0 and len(y_indices) > 0:
|
||||
x_min, x_max = np.min(x_indices), np.max(x_indices)
|
||||
y_min, y_max = np.min(y_indices), np.max(y_indices)
|
||||
|
||||
# Apply opacity blend between the original frame and the swapped frame
|
||||
# Calculate center
|
||||
center = (int((x_min + x_max) / 2), int((y_min + y_max) / 2))
|
||||
|
||||
# Crop src and mask
|
||||
src_crop = swapped_frame[y_min : y_max + 1, x_min : x_max + 1]
|
||||
mask_crop = face_mask[y_min : y_max + 1, x_min : x_max + 1]
|
||||
|
||||
try:
|
||||
# Use original_frame as destination to blend the swapped face onto it
|
||||
swapped_frame = cv2.seamlessClone(
|
||||
src_crop,
|
||||
original_frame,
|
||||
mask_crop,
|
||||
center,
|
||||
cv2.NORMAL_CLONE,
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Poisson blending failed: {e}")
|
||||
|
||||
# Apply opacity blend between the original frame and the swapped frame
|
||||
opacity = getattr(modules.globals, "opacity", 1.0)
|
||||
# Ensure opacity is within valid range [0.0, 1.0]
|
||||
opacity = max(0.0, min(1.0, opacity))
|
||||
@@ -195,14 +244,50 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
|
||||
return final_swapped_frame
|
||||
|
||||
|
||||
# --- START: Mac M1-M5 Optimized Face Detection ---
|
||||
def get_faces_optimized(frame: Frame, use_cache: bool = True) -> Optional[List[Face]]:
|
||||
"""Optimized face detection for live mode on Apple Silicon"""
|
||||
global LAST_DETECTION_TIME, FACE_DETECTION_CACHE
|
||||
|
||||
if not use_cache or not IS_APPLE_SILICON:
|
||||
# Standard detection
|
||||
if modules.globals.many_faces:
|
||||
return get_many_faces(frame)
|
||||
else:
|
||||
face = get_one_face(frame)
|
||||
return [face] if face else None
|
||||
|
||||
# Adaptive detection rate for live mode
|
||||
current_time = time.time()
|
||||
time_since_last = current_time - LAST_DETECTION_TIME
|
||||
|
||||
# Skip detection if too soon (adaptive frame skipping)
|
||||
if time_since_last < DETECTION_INTERVAL and FACE_DETECTION_CACHE:
|
||||
return FACE_DETECTION_CACHE.get('faces')
|
||||
|
||||
# Perform detection
|
||||
LAST_DETECTION_TIME = current_time
|
||||
if modules.globals.many_faces:
|
||||
faces = get_many_faces(frame)
|
||||
else:
|
||||
face = get_one_face(frame)
|
||||
faces = [face] if face else None
|
||||
|
||||
# Cache results
|
||||
FACE_DETECTION_CACHE['faces'] = faces
|
||||
FACE_DETECTION_CACHE['timestamp'] = current_time
|
||||
|
||||
return faces
|
||||
# --- END: Mac M1-M5 Optimized Face Detection ---
|
||||
|
||||
# --- START: Helper function for interpolation and sharpening ---
|
||||
def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.ndarray]) -> Frame:
|
||||
"""Applies sharpening and interpolation."""
|
||||
"""Applies sharpening and interpolation with Apple Silicon optimizations."""
|
||||
global PREVIOUS_FRAME_RESULT
|
||||
|
||||
processed_frame = current_frame.copy()
|
||||
|
||||
# 1. Apply Sharpening (if enabled)
|
||||
# 1. Apply Sharpening (if enabled) with optimized kernel for Apple Silicon
|
||||
sharpness_value = getattr(modules.globals, "sharpness", 0.0)
|
||||
if sharpness_value > 0.0 and swapped_face_bboxes:
|
||||
height, width = processed_frame.shape[:2]
|
||||
@@ -225,23 +310,21 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda
|
||||
continue
|
||||
|
||||
face_region = processed_frame[y1:y2, x1:x2]
|
||||
if face_region.size == 0: continue # Skip empty regions
|
||||
if face_region.size == 0: continue
|
||||
|
||||
# Apply sharpening using addWeighted for smoother control
|
||||
# Use try-except for GaussianBlur and addWeighted as they can fail on invalid inputs
|
||||
# Apply sharpening with optimized parameters for Apple Silicon
|
||||
try:
|
||||
blurred = cv2.GaussianBlur(face_region, (0, 0), 3) # sigma=3, kernel size auto
|
||||
sharpened_region = cv2.addWeighted(
|
||||
# Use smaller sigma for faster processing on Apple Silicon
|
||||
sigma = 2 if IS_APPLE_SILICON else 3
|
||||
blurred = cv2.GaussianBlur(face_region, (0, 0), sigma)
|
||||
sharpened_region = cv2.addWeighted(
|
||||
face_region, 1.0 + sharpness_value,
|
||||
blurred, -sharpness_value,
|
||||
0
|
||||
)
|
||||
# Ensure the sharpened region doesn't have invalid values
|
||||
sharpened_region = np.clip(sharpened_region, 0, 255).astype(np.uint8)
|
||||
processed_frame[y1:y2, x1:x2] = sharpened_region
|
||||
except cv2.error as sharpen_e:
|
||||
# print(f"Warning: OpenCV error during sharpening: {sharpen_e} for bbox {bbox}") # Debug
|
||||
# Skip sharpening for this region if it fails
|
||||
)
|
||||
sharpened_region = np.clip(sharpened_region, 0, 255).astype(np.uint8)
|
||||
processed_frame[y1:y2, x1:x2] = sharpened_region
|
||||
except cv2.error:
|
||||
pass
|
||||
|
||||
|
||||
@@ -455,6 +538,7 @@ def process_frames(
|
||||
) -> None:
|
||||
"""
|
||||
Processes a list of frame paths (typically for video).
|
||||
Optimized with better memory management and caching.
|
||||
Iterates through frames, applies the appropriate swapping logic based on globals,
|
||||
and saves the result back to the frame path. Handles multi-threading via caller.
|
||||
"""
|
||||
@@ -478,6 +562,8 @@ def process_frames(
|
||||
if source_face is None:
|
||||
# Specific message for no face detected after successful read
|
||||
update_status(f"Warning: Successfully read source image {source_path}, but no face was detected. Swaps will be skipped.", NAME)
|
||||
# Free memory immediately after extracting face
|
||||
del source_img
|
||||
except Exception as e:
|
||||
# Print the specific exception caught
|
||||
import traceback
|
||||
@@ -505,6 +591,7 @@ def process_frames(
|
||||
# update_status(f"Processing frame {i+1}/{total_frames}: {os.path.basename(temp_frame_path)}", NAME) # Optional Debug
|
||||
|
||||
# Read the target frame
|
||||
temp_frame = None
|
||||
try:
|
||||
temp_frame = cv2.imread(temp_frame_path)
|
||||
if temp_frame is None:
|
||||
@@ -539,13 +626,19 @@ def process_frames(
|
||||
# traceback.print_exc()
|
||||
result_frame = temp_frame # Use original frame on processing error
|
||||
|
||||
# Write the result back to the same frame path
|
||||
# Write the result back to the same frame path with optimized compression
|
||||
try:
|
||||
write_success = cv2.imwrite(temp_frame_path, result_frame)
|
||||
# Use PNG compression level 3 (faster) instead of default 9
|
||||
write_success = cv2.imwrite(temp_frame_path, result_frame, [cv2.IMWRITE_PNG_COMPRESSION, 3])
|
||||
if not write_success:
|
||||
print(f"{NAME}: Error: Failed to write processed frame to {temp_frame_path}")
|
||||
except Exception as write_e:
|
||||
print(f"{NAME}: Error writing frame {temp_frame_path}: {write_e}")
|
||||
|
||||
# Free memory immediately after processing
|
||||
del temp_frame
|
||||
if result_frame is not None:
|
||||
del result_frame
|
||||
|
||||
# Update progress bar
|
||||
if progress:
|
||||
@@ -659,8 +752,9 @@ def create_lower_mouth_mask(
|
||||
return mask, mouth_cutout, mouth_box, lower_lip_polygon
|
||||
|
||||
try: # Wrap main logic in try-except
|
||||
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
||||
lower_lip_order = [65, 66, 62, 70, 69, 18, 19, 20, 21, 22, 23, 24, 0, 8, 7, 6, 5, 4, 3, 2, 65] # 21 points
|
||||
# Use outer mouth landmarks (52-63) to capture the lips only
|
||||
# This avoids including the chin/jawline, preserving the face shape from the swap
|
||||
lower_lip_order = list(range(52, 64))
|
||||
|
||||
# Check if all indices are valid for the loaded landmarks (already partially done by < 106 check)
|
||||
if max(lower_lip_order) >= landmarks.shape[0]:
|
||||
@@ -684,31 +778,6 @@ def create_lower_mouth_mask(
|
||||
expansion_factor = 1 + mask_down_size
|
||||
expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center
|
||||
|
||||
mask_size = getattr(modules.globals, "mask_size", 1.0) # Default 1.0
|
||||
toplip_extension = mask_size * 0.5
|
||||
|
||||
# Define toplip indices relative to lower_lip_order (safer)
|
||||
toplip_local_indices = [0, 1, 2, 3, 4, 5, 19] # Indices in lower_lip_order for [65, 66, 62, 70, 69, 18, 2]
|
||||
|
||||
for idx in toplip_local_indices:
|
||||
if idx < len(expanded_landmarks): # Boundary check
|
||||
direction = expanded_landmarks[idx] - center
|
||||
norm = np.linalg.norm(direction)
|
||||
if norm > 1e-6: # Avoid division by zero
|
||||
direction_normalized = direction / norm
|
||||
expanded_landmarks[idx] += direction_normalized * toplip_extension
|
||||
|
||||
# Define chin indices relative to lower_lip_order
|
||||
chin_local_indices = [9, 10, 11, 12, 13, 14] # Indices for [22, 23, 24, 0, 8, 7]
|
||||
chin_extension = 2 * 0.2
|
||||
|
||||
for idx in chin_local_indices:
|
||||
if idx < len(expanded_landmarks): # Boundary check
|
||||
# Extend vertically based on distance from center y
|
||||
y_diff = expanded_landmarks[idx][1] - center[1]
|
||||
expanded_landmarks[idx][1] += y_diff * chin_extension
|
||||
|
||||
|
||||
# Ensure landmarks are finite after adjustments
|
||||
if not np.all(np.isfinite(expanded_landmarks)):
|
||||
# print("Warning: Non-finite values detected after expanding landmarks.")
|
||||
@@ -1007,13 +1076,43 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
|
||||
landmarks_int = landmarks.astype(np.int32)
|
||||
|
||||
# Use standard face outline landmarks (0-32)
|
||||
face_outline_points = landmarks_int[0:33] # Points 0 to 32 cover chin and sides
|
||||
# Use standard face outline (0-32)
|
||||
face_outline = landmarks_int[0:33]
|
||||
|
||||
# Estimate forehead points to ensure mask covers the whole face (including forehead)
|
||||
# This is critical for Poisson blending to work correctly on the forehead
|
||||
eyebrows = landmarks_int[33:43]
|
||||
if eyebrows.shape[0] > 0:
|
||||
chin = landmarks_int[16]
|
||||
eyebrow_center = np.mean(eyebrows, axis=0)
|
||||
|
||||
# Vector from chin to eyebrows (upwards)
|
||||
up_vector = eyebrow_center - chin
|
||||
norm = np.linalg.norm(up_vector)
|
||||
if norm > 0:
|
||||
up_vector /= norm
|
||||
|
||||
# Extend upwards by 1.0 of the chin-to-eyebrow distance (aggressive coverage)
|
||||
# This ensures the mask covers the entire forehead for proper blending
|
||||
forehead_offset = up_vector * (norm * 1.0)
|
||||
|
||||
# Shift eyebrows up to create forehead points
|
||||
forehead_points = eyebrows + forehead_offset
|
||||
|
||||
# Expand the top points slightly outwards to cover forehead corners
|
||||
# Calculate the center of the new top points
|
||||
top_center = np.mean(forehead_points, axis=0)
|
||||
|
||||
# Expand outwards by 20%
|
||||
forehead_points = (forehead_points - top_center) * 1.2 + top_center
|
||||
|
||||
# Combine outline and forehead points
|
||||
face_outline = np.concatenate((face_outline, forehead_points.astype(np.int32)), axis=0)
|
||||
|
||||
# Calculate convex hull of these points
|
||||
# Use try-except as convexHull can fail on degenerate input
|
||||
try:
|
||||
hull = cv2.convexHull(full_face_poly.astype(np.float32)) # Use float for accuracy
|
||||
hull = cv2.convexHull(face_outline.astype(np.float32)) # Use float for accuracy
|
||||
if hull is None or len(hull) < 3:
|
||||
# print("Warning: Convex hull calculation failed or returned too few points.")
|
||||
# Fallback: use bounding box of landmarks? Or just return empty mask?
|
||||
|
||||
+28
-13
@@ -36,7 +36,7 @@ if platform.system() == "Windows":
|
||||
ROOT = None
|
||||
POPUP = None
|
||||
POPUP_LIVE = None
|
||||
ROOT_HEIGHT = 750
|
||||
ROOT_HEIGHT = 800
|
||||
ROOT_WIDTH = 600
|
||||
|
||||
PREVIEW = None
|
||||
@@ -98,6 +98,7 @@ def save_switch_states():
|
||||
"keep_frames": modules.globals.keep_frames,
|
||||
"many_faces": modules.globals.many_faces,
|
||||
"map_faces": modules.globals.map_faces,
|
||||
"poisson_blend": modules.globals.poisson_blend,
|
||||
"color_correction": modules.globals.color_correction,
|
||||
"nsfw_filter": modules.globals.nsfw_filter,
|
||||
"live_mirror": modules.globals.live_mirror,
|
||||
@@ -120,6 +121,7 @@ def load_switch_states():
|
||||
modules.globals.keep_frames = switch_states.get("keep_frames", False)
|
||||
modules.globals.many_faces = switch_states.get("many_faces", False)
|
||||
modules.globals.map_faces = switch_states.get("map_faces", False)
|
||||
modules.globals.poisson_blend = switch_states.get("poisson_blend", False)
|
||||
modules.globals.color_correction = switch_states.get("color_correction", False)
|
||||
modules.globals.nsfw_filter = switch_states.get("nsfw_filter", False)
|
||||
modules.globals.live_mirror = switch_states.get("live_mirror", False)
|
||||
@@ -272,6 +274,19 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
)
|
||||
map_faces_switch.place(relx=0.1, rely=0.65)
|
||||
|
||||
poisson_blend_value = ctk.BooleanVar(value=modules.globals.poisson_blend)
|
||||
poisson_blend_switch = ctk.CTkSwitch(
|
||||
root,
|
||||
text=_("Poisson Blend"),
|
||||
variable=poisson_blend_value,
|
||||
cursor="hand2",
|
||||
command=lambda: (
|
||||
setattr(modules.globals, "poisson_blend", poisson_blend_value.get()),
|
||||
save_switch_states(),
|
||||
),
|
||||
)
|
||||
poisson_blend_switch.place(relx=0.1, rely=0.7)
|
||||
|
||||
show_fps_value = ctk.BooleanVar(value=modules.globals.show_fps)
|
||||
show_fps_switch = ctk.CTkSwitch(
|
||||
root,
|
||||
@@ -310,21 +325,21 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
start_button = ctk.CTkButton(
|
||||
root, text=_("Start"), cursor="hand2", command=lambda: analyze_target(start, root)
|
||||
)
|
||||
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
|
||||
start_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||
|
||||
stop_button = ctk.CTkButton(
|
||||
root, text=_("Destroy"), cursor="hand2", command=lambda: destroy()
|
||||
)
|
||||
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
|
||||
stop_button.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||
|
||||
preview_button = ctk.CTkButton(
|
||||
root, text=_("Preview"), cursor="hand2", command=lambda: toggle_preview()
|
||||
)
|
||||
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
|
||||
preview_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||
|
||||
# --- Camera Selection ---
|
||||
camera_label = ctk.CTkLabel(root, text=_("Select Camera:"))
|
||||
camera_label.place(relx=0.1, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||
camera_label.place(relx=0.1, rely=0.92, relwidth=0.2, relheight=0.05)
|
||||
|
||||
available_cameras = get_available_cameras()
|
||||
camera_indices, camera_names = available_cameras
|
||||
@@ -343,7 +358,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
root, variable=camera_variable, values=camera_names
|
||||
)
|
||||
|
||||
camera_optionmenu.place(relx=0.35, rely=0.86, relwidth=0.25, relheight=0.05)
|
||||
camera_optionmenu.place(relx=0.35, rely=0.92, relwidth=0.25, relheight=0.05)
|
||||
|
||||
live_button = ctk.CTkButton(
|
||||
root,
|
||||
@@ -363,7 +378,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
else "disabled"
|
||||
),
|
||||
)
|
||||
live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||
live_button.place(relx=0.65, rely=0.92, relwidth=0.2, relheight=0.05)
|
||||
# --- End Camera Selection ---
|
||||
|
||||
# 1) Define a DoubleVar for transparency (0 = fully transparent, 1 = fully opaque)
|
||||
@@ -387,7 +402,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
|
||||
# 2) Transparency label and slider (placed ABOVE sharpness)
|
||||
transparency_label = ctk.CTkLabel(root, text="Transparency:")
|
||||
transparency_label.place(relx=0.15, rely=0.69, relwidth=0.2, relheight=0.05)
|
||||
transparency_label.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05)
|
||||
|
||||
transparency_slider = ctk.CTkSlider(
|
||||
root,
|
||||
@@ -403,7 +418,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
border_width=1,
|
||||
corner_radius=3,
|
||||
)
|
||||
transparency_slider.place(relx=0.35, rely=0.71, relwidth=0.5, relheight=0.02)
|
||||
transparency_slider.place(relx=0.35, rely=0.77, relwidth=0.5, relheight=0.02)
|
||||
|
||||
# 3) Sharpness label & slider
|
||||
sharpness_var = ctk.DoubleVar(value=0.0) # start at 0.0
|
||||
@@ -412,7 +427,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
update_status(f"Sharpness set to {value:.1f}")
|
||||
|
||||
sharpness_label = ctk.CTkLabel(root, text="Sharpness:")
|
||||
sharpness_label.place(relx=0.15, rely=0.74, relwidth=0.2, relheight=0.05)
|
||||
sharpness_label.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
|
||||
|
||||
sharpness_slider = ctk.CTkSlider(
|
||||
root,
|
||||
@@ -428,17 +443,17 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||
border_width=1,
|
||||
corner_radius=3,
|
||||
)
|
||||
sharpness_slider.place(relx=0.35, rely=0.76, relwidth=0.5, relheight=0.02)
|
||||
sharpness_slider.place(relx=0.35, rely=0.82, relwidth=0.5, relheight=0.02)
|
||||
|
||||
# Status and link at the bottom
|
||||
global status_label
|
||||
status_label = ctk.CTkLabel(root, text=None, justify="center")
|
||||
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
|
||||
status_label.place(relx=0.1, rely=0.96, relwidth=0.8)
|
||||
|
||||
donate_label = ctk.CTkLabel(
|
||||
root, text="Deep Live Cam", justify="center", cursor="hand2"
|
||||
)
|
||||
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
|
||||
donate_label.place(relx=0.1, rely=0.98, relwidth=0.8)
|
||||
donate_label.configure(
|
||||
text_color=ctk.ThemeManager.theme.get("URL").get("text_color")
|
||||
)
|
||||
|
||||
+116
-23
@@ -21,13 +21,14 @@ if platform.system().lower() == "darwin":
|
||||
|
||||
|
||||
def run_ffmpeg(args: List[str]) -> bool:
|
||||
"""Run ffmpeg with hardware acceleration and optimized settings."""
|
||||
commands = [
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-hwaccel",
|
||||
"auto",
|
||||
"-loglevel",
|
||||
modules.globals.log_level,
|
||||
"-hwaccel", "auto", # Auto-detect hardware acceleration
|
||||
"-hwaccel_output_format", "auto", # Use hardware format when possible
|
||||
"-threads", str(modules.globals.execution_threads or 0), # 0 = auto-detect optimal thread count
|
||||
"-loglevel", modules.globals.log_level,
|
||||
]
|
||||
commands.extend(args)
|
||||
try:
|
||||
@@ -61,39 +62,131 @@ def detect_fps(target_path: str) -> float:
|
||||
|
||||
|
||||
def extract_frames(target_path: str) -> None:
|
||||
"""Extract frames with hardware acceleration and optimized settings."""
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
|
||||
# Use hardware-accelerated decoding and optimized pixel format
|
||||
run_ffmpeg(
|
||||
[
|
||||
"-i",
|
||||
target_path,
|
||||
"-pix_fmt",
|
||||
"rgb24",
|
||||
"-i", target_path,
|
||||
"-vf", "format=rgb24", # Use video filter for format conversion (faster)
|
||||
"-vsync", "0", # Prevent frame duplication
|
||||
"-frame_pts", "1", # Preserve frame timing
|
||||
os.path.join(temp_directory_path, "%04d.png"),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def create_video(target_path: str, fps: float = 30.0) -> None:
|
||||
"""Create video with hardware-accelerated encoding and optimized settings."""
|
||||
temp_output_path = get_temp_output_path(target_path)
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
run_ffmpeg(
|
||||
[
|
||||
"-r",
|
||||
str(fps),
|
||||
"-i",
|
||||
os.path.join(temp_directory_path, "%04d.png"),
|
||||
"-c:v",
|
||||
modules.globals.video_encoder,
|
||||
"-crf",
|
||||
str(modules.globals.video_quality),
|
||||
"-pix_fmt",
|
||||
"yuv420p",
|
||||
"-vf",
|
||||
"colorspace=bt709:iall=bt601-6-625:fast=1",
|
||||
|
||||
# Determine optimal encoder based on available hardware
|
||||
encoder = modules.globals.video_encoder
|
||||
encoder_options = []
|
||||
|
||||
# GPU-accelerated encoding options
|
||||
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
|
||||
# NVIDIA GPU encoding
|
||||
if encoder == 'libx264':
|
||||
encoder = 'h264_nvenc'
|
||||
encoder_options = [
|
||||
"-preset", "p7", # Highest quality preset for NVENC
|
||||
"-tune", "hq", # High quality tuning
|
||||
"-rc", "vbr", # Variable bitrate
|
||||
"-cq", str(modules.globals.video_quality), # Quality level
|
||||
"-b:v", "0", # Let CQ control bitrate
|
||||
"-multipass", "fullres", # Two-pass encoding for better quality
|
||||
]
|
||||
elif encoder == 'libx265':
|
||||
encoder = 'hevc_nvenc'
|
||||
encoder_options = [
|
||||
"-preset", "p7",
|
||||
"-tune", "hq",
|
||||
"-rc", "vbr",
|
||||
"-cq", str(modules.globals.video_quality),
|
||||
"-b:v", "0",
|
||||
]
|
||||
elif 'DmlExecutionProvider' in modules.globals.execution_providers:
|
||||
# AMD/Intel GPU encoding (DirectML on Windows)
|
||||
if encoder == 'libx264':
|
||||
# Try AMD AMF encoder
|
||||
encoder = 'h264_amf'
|
||||
encoder_options = [
|
||||
"-quality", "quality", # Quality mode
|
||||
"-rc", "vbr_latency",
|
||||
"-qp_i", str(modules.globals.video_quality),
|
||||
"-qp_p", str(modules.globals.video_quality),
|
||||
]
|
||||
elif encoder == 'libx265':
|
||||
encoder = 'hevc_amf'
|
||||
encoder_options = [
|
||||
"-quality", "quality",
|
||||
"-rc", "vbr_latency",
|
||||
"-qp_i", str(modules.globals.video_quality),
|
||||
"-qp_p", str(modules.globals.video_quality),
|
||||
]
|
||||
else:
|
||||
# CPU encoding with optimized settings
|
||||
if encoder == 'libx264':
|
||||
encoder_options = [
|
||||
"-preset", "medium", # Balance speed/quality
|
||||
"-crf", str(modules.globals.video_quality),
|
||||
"-tune", "film", # Optimize for film content
|
||||
]
|
||||
elif encoder == 'libx265':
|
||||
encoder_options = [
|
||||
"-preset", "medium",
|
||||
"-crf", str(modules.globals.video_quality),
|
||||
"-x265-params", "log-level=error",
|
||||
]
|
||||
elif encoder == 'libvpx-vp9':
|
||||
encoder_options = [
|
||||
"-crf", str(modules.globals.video_quality),
|
||||
"-b:v", "0", # Constant quality mode
|
||||
"-cpu-used", "2", # Speed vs quality (0-5, lower=slower/better)
|
||||
]
|
||||
|
||||
# Build ffmpeg command
|
||||
ffmpeg_args = [
|
||||
"-r", str(fps),
|
||||
"-i", os.path.join(temp_directory_path, "%04d.png"),
|
||||
"-c:v", encoder,
|
||||
]
|
||||
|
||||
# Add encoder-specific options
|
||||
ffmpeg_args.extend(encoder_options)
|
||||
|
||||
# Add common options
|
||||
ffmpeg_args.extend([
|
||||
"-pix_fmt", "yuv420p",
|
||||
"-movflags", "+faststart", # Enable fast start for web playback
|
||||
"-vf", "colorspace=bt709:iall=bt601-6-625:fast=1",
|
||||
"-y",
|
||||
temp_output_path,
|
||||
])
|
||||
|
||||
# Try with hardware encoder first, fallback to software if it fails
|
||||
success = run_ffmpeg(ffmpeg_args)
|
||||
|
||||
if not success and encoder in ['h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf']:
|
||||
# Fallback to software encoding
|
||||
print(f"Hardware encoding with {encoder} failed, falling back to software encoding...")
|
||||
fallback_encoder = 'libx264' if 'h264' in encoder else 'libx265'
|
||||
ffmpeg_args_fallback = [
|
||||
"-r", str(fps),
|
||||
"-i", os.path.join(temp_directory_path, "%04d.png"),
|
||||
"-c:v", fallback_encoder,
|
||||
"-preset", "medium",
|
||||
"-crf", str(modules.globals.video_quality),
|
||||
"-pix_fmt", "yuv420p",
|
||||
"-movflags", "+faststart",
|
||||
"-vf", "colorspace=bt709:iall=bt601-6-625:fast=1",
|
||||
"-y",
|
||||
temp_output_path,
|
||||
]
|
||||
)
|
||||
run_ffmpeg(ffmpeg_args_fallback)
|
||||
|
||||
|
||||
def restore_audio(target_path: str, output_path: str) -> None:
|
||||
|
||||
Reference in New Issue
Block a user