feat: add GPEN-BFR 256 and 512 ONNX face enhancers
Add two new face enhancement processors using GPEN-BFR ONNX models at 256x256 and 512x512 resolutions. Models auto-download on first use from GitHub releases. Integrates into existing frame processor pipeline alongside GFPGAN enhancer with UI toggle switches. - modules/paths.py: Shared path constants module - modules/processors/frame/_onnx_enhancer.py: ONNX enhancement utilities - modules/processors/frame/face_enhancer_gpen256.py: GPEN-BFR 256 processor - modules/processors/frame/face_enhancer_gpen512.py: GPEN-BFR 512 processor - modules/core.py: Add GPEN choices to --frame-processor CLI arg - modules/globals.py: Add GPEN entries to fp_ui toggle dict - modules/ui.py: Add GPEN toggle switches and processing integration Closes #1663 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,145 @@
|
||||
"""Shared ONNX-based face enhancement utilities for GPEN-BFR models.
|
||||
|
||||
Provides session creation, pre/post processing, and the core
|
||||
enhance-face-via-ONNX pipeline.
|
||||
"""
|
||||
|
||||
import os
|
||||
import platform
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import onnxruntime
|
||||
|
||||
import modules.globals
|
||||
|
||||
IS_APPLE_SILICON = platform.system() == "Darwin" and platform.machine() == "arm64"
|
||||
|
||||
# Limit concurrent ONNX calls to avoid VRAM exhaustion on multi-face frames
|
||||
THREAD_SEMAPHORE = threading.Semaphore(min(max(1, (os.cpu_count() or 1)), 8))
|
||||
|
||||
|
||||
def create_onnx_session(model_path: str) -> onnxruntime.InferenceSession:
|
||||
"""Create an ONNX Runtime session using the configured execution providers."""
|
||||
providers = modules.globals.execution_providers
|
||||
session = onnxruntime.InferenceSession(model_path, providers=providers)
|
||||
return session
|
||||
|
||||
|
||||
def warmup_session(session: onnxruntime.InferenceSession) -> None:
|
||||
"""Run a dummy inference pass to trigger JIT / compile caching."""
|
||||
try:
|
||||
input_feed = {
|
||||
inp.name: np.zeros(
|
||||
[d if isinstance(d, int) and d > 0 else 1 for d in inp.shape],
|
||||
dtype=np.float32,
|
||||
)
|
||||
for inp in session.get_inputs()
|
||||
}
|
||||
session.run(None, input_feed)
|
||||
except Exception as e:
|
||||
print(f"ONNX enhancer warmup skipped (non-fatal): {e}")
|
||||
|
||||
|
||||
def preprocess_face(face_img: np.ndarray, input_size: int) -> np.ndarray:
|
||||
"""Resize, normalize, and convert a BGR face crop to ONNX input blob.
|
||||
|
||||
GPEN-BFR expects [1, 3, H, W] float32 in RGB, normalized to [-1, 1].
|
||||
"""
|
||||
resized = cv2.resize(face_img, (input_size, input_size), interpolation=cv2.INTER_LINEAR)
|
||||
rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
|
||||
blob = rgb.astype(np.float32) / 255.0 * 2.0 - 1.0
|
||||
blob = np.transpose(blob, (2, 0, 1))[np.newaxis, ...]
|
||||
return blob
|
||||
|
||||
|
||||
def postprocess_face(output: np.ndarray) -> np.ndarray:
|
||||
"""Convert ONNX output [1, 3, H, W] float32 back to BGR uint8 image."""
|
||||
img = output[0].transpose(1, 2, 0)
|
||||
img = ((img + 1.0) / 2.0 * 255.0)
|
||||
img = np.clip(img, 0, 255).astype(np.uint8)
|
||||
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
|
||||
return img
|
||||
|
||||
|
||||
def _get_face_affine(face: Any, input_size: int):
|
||||
"""Compute affine transform to align a face to GPEN input space.
|
||||
|
||||
Returns (M, inv_M) — forward and inverse affine matrices.
|
||||
"""
|
||||
template = np.array([
|
||||
[0.31556875, 0.4615741],
|
||||
[0.68262291, 0.4615741],
|
||||
[0.50009375, 0.6405054],
|
||||
[0.34947187, 0.8246919],
|
||||
[0.65343645, 0.8246919],
|
||||
], dtype=np.float32) * input_size
|
||||
|
||||
landmarks = None
|
||||
if hasattr(face, "kps") and face.kps is not None:
|
||||
landmarks = face.kps.astype(np.float32)
|
||||
elif hasattr(face, "landmark_2d_106") and face.landmark_2d_106 is not None:
|
||||
lm106 = face.landmark_2d_106
|
||||
landmarks = np.array([
|
||||
lm106[38], # left eye
|
||||
lm106[88], # right eye
|
||||
lm106[86], # nose tip
|
||||
lm106[52], # left mouth
|
||||
lm106[61], # right mouth
|
||||
], dtype=np.float32)
|
||||
|
||||
if landmarks is None or len(landmarks) < 5:
|
||||
return None, None
|
||||
|
||||
M = cv2.estimateAffinePartial2D(landmarks, template, method=cv2.LMEDS)[0]
|
||||
if M is None:
|
||||
return None, None
|
||||
inv_M = cv2.invertAffineTransform(M)
|
||||
return M, inv_M
|
||||
|
||||
|
||||
def enhance_face_onnx(
|
||||
frame: np.ndarray,
|
||||
face: Any,
|
||||
session: onnxruntime.InferenceSession,
|
||||
input_size: int,
|
||||
) -> np.ndarray:
|
||||
"""Enhance a single face in the frame using an ONNX face restoration model."""
|
||||
M, inv_M = _get_face_affine(face, input_size)
|
||||
if M is None:
|
||||
return frame
|
||||
|
||||
face_crop = cv2.warpAffine(
|
||||
frame, M, (input_size, input_size),
|
||||
flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE,
|
||||
)
|
||||
|
||||
blob = preprocess_face(face_crop, input_size)
|
||||
with THREAD_SEMAPHORE:
|
||||
output = session.run(None, {session.get_inputs()[0].name: blob})[0]
|
||||
enhanced = postprocess_face(output)
|
||||
|
||||
# Create mask for blending (feathered edges)
|
||||
mask = np.ones((input_size, input_size), dtype=np.float32)
|
||||
border = max(1, input_size // 16)
|
||||
mask[:border, :] = np.linspace(0, 1, border)[:, np.newaxis]
|
||||
mask[-border:, :] = np.linspace(1, 0, border)[:, np.newaxis]
|
||||
mask[:, :border] = np.minimum(mask[:, :border], np.linspace(0, 1, border)[np.newaxis, :])
|
||||
mask[:, -border:] = np.minimum(mask[:, -border:], np.linspace(1, 0, border)[np.newaxis, :])
|
||||
|
||||
h, w = frame.shape[:2]
|
||||
warped_enhanced = cv2.warpAffine(
|
||||
enhanced, inv_M, (w, h),
|
||||
flags=cv2.INTER_LINEAR, borderValue=(0, 0, 0),
|
||||
)
|
||||
warped_mask = cv2.warpAffine(
|
||||
mask, inv_M, (w, h),
|
||||
flags=cv2.INTER_LINEAR, borderValue=0,
|
||||
)
|
||||
|
||||
mask_3ch = warped_mask[:, :, np.newaxis]
|
||||
result = (warped_enhanced.astype(np.float32) * mask_3ch +
|
||||
frame.astype(np.float32) * (1.0 - mask_3ch))
|
||||
return np.clip(result, 0, 255).astype(np.uint8)
|
||||
@@ -0,0 +1,125 @@
|
||||
"""GPEN-BFR-256 face enhancer — ONNX-based face restoration at 256x256."""
|
||||
|
||||
from typing import Any, List
|
||||
import os
|
||||
import threading
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
import modules.globals
|
||||
import modules.processors.frame.core
|
||||
from modules.core import update_status
|
||||
from modules.face_analyser import get_one_face
|
||||
from modules.typing import Frame, Face
|
||||
from modules.utilities import (
|
||||
is_image,
|
||||
is_video,
|
||||
)
|
||||
from modules.processors.frame._onnx_enhancer import (
|
||||
create_onnx_session,
|
||||
warmup_session,
|
||||
enhance_face_onnx,
|
||||
)
|
||||
|
||||
NAME = "DLC.FACE-ENHANCER-GPEN256"
|
||||
INPUT_SIZE = 256
|
||||
MODEL_URL = "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR/GPEN-BFR-256.onnx"
|
||||
MODEL_FILE = "GPEN-BFR-256.onnx"
|
||||
|
||||
ENHANCER = None
|
||||
THREAD_LOCK = threading.Lock()
|
||||
|
||||
abs_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
models_dir = os.path.join(
|
||||
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
|
||||
)
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
model_path = os.path.join(models_dir, MODEL_FILE)
|
||||
if not os.path.exists(model_path):
|
||||
update_status(f"Downloading {MODEL_FILE}...", NAME)
|
||||
from modules.utilities import conditional_download
|
||||
conditional_download(models_dir, [MODEL_URL])
|
||||
return True
|
||||
|
||||
|
||||
def pre_start() -> bool:
|
||||
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
|
||||
update_status("Select an image or video for target path.", NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def get_enhancer() -> Any:
|
||||
global ENHANCER
|
||||
with THREAD_LOCK:
|
||||
if ENHANCER is None:
|
||||
model_path = os.path.join(models_dir, MODEL_FILE)
|
||||
if not os.path.exists(model_path):
|
||||
from modules.utilities import conditional_download
|
||||
conditional_download(models_dir, [MODEL_URL])
|
||||
if not os.path.exists(model_path):
|
||||
raise FileNotFoundError(f"Model file not found: {model_path}")
|
||||
print(f"{NAME}: Loading ONNX model from {model_path}")
|
||||
ENHANCER = create_onnx_session(model_path)
|
||||
warmup_session(ENHANCER)
|
||||
print(f"{NAME}: Model loaded successfully.")
|
||||
return ENHANCER
|
||||
|
||||
|
||||
def enhance_face(temp_frame: Frame, face: Face) -> Frame:
|
||||
try:
|
||||
session = get_enhancer()
|
||||
except Exception as e:
|
||||
print(f"{NAME}: {e}")
|
||||
return temp_frame
|
||||
try:
|
||||
return enhance_face_onnx(temp_frame, face, session, INPUT_SIZE)
|
||||
except Exception as e:
|
||||
print(f"{NAME}: Error during face enhancement: {e}")
|
||||
return temp_frame
|
||||
|
||||
|
||||
def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
|
||||
target_face = get_one_face(temp_frame)
|
||||
if target_face is None:
|
||||
return temp_frame
|
||||
return enhance_face(temp_frame, target_face)
|
||||
|
||||
|
||||
def process_frame_v2(temp_frame: Frame) -> Frame:
|
||||
target_face = get_one_face(temp_frame)
|
||||
if target_face:
|
||||
temp_frame = enhance_face(temp_frame, target_face)
|
||||
return temp_frame
|
||||
|
||||
|
||||
def process_frames(
|
||||
source_path: str | None, temp_frame_paths: List[str], progress: Any = None
|
||||
) -> None:
|
||||
for temp_frame_path in temp_frame_paths:
|
||||
temp_frame = cv2.imread(temp_frame_path)
|
||||
if temp_frame is None:
|
||||
if progress:
|
||||
progress.update(1)
|
||||
continue
|
||||
result = process_frame(None, temp_frame)
|
||||
cv2.imwrite(temp_frame_path, result)
|
||||
if progress:
|
||||
progress.update(1)
|
||||
|
||||
|
||||
def process_image(source_path: str | None, target_path: str, output_path: str) -> None:
|
||||
target_frame = cv2.imread(target_path)
|
||||
if target_frame is None:
|
||||
print(f"{NAME}: Error: Failed to read target image {target_path}")
|
||||
return
|
||||
result_frame = process_frame(None, target_frame)
|
||||
cv2.imwrite(output_path, result_frame)
|
||||
print(f"{NAME}: Enhanced image saved to {output_path}")
|
||||
|
||||
|
||||
def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None:
|
||||
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
|
||||
@@ -0,0 +1,125 @@
|
||||
"""GPEN-BFR-512 face enhancer — ONNX-based face restoration at 512x512."""
|
||||
|
||||
from typing import Any, List
|
||||
import os
|
||||
import threading
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
import modules.globals
|
||||
import modules.processors.frame.core
|
||||
from modules.core import update_status
|
||||
from modules.face_analyser import get_one_face
|
||||
from modules.typing import Frame, Face
|
||||
from modules.utilities import (
|
||||
is_image,
|
||||
is_video,
|
||||
)
|
||||
from modules.processors.frame._onnx_enhancer import (
|
||||
create_onnx_session,
|
||||
warmup_session,
|
||||
enhance_face_onnx,
|
||||
)
|
||||
|
||||
NAME = "DLC.FACE-ENHANCER-GPEN512"
|
||||
INPUT_SIZE = 512
|
||||
MODEL_URL = "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR/GPEN-BFR-512.onnx"
|
||||
MODEL_FILE = "GPEN-BFR-512.onnx"
|
||||
|
||||
ENHANCER = None
|
||||
THREAD_LOCK = threading.Lock()
|
||||
|
||||
abs_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
models_dir = os.path.join(
|
||||
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
|
||||
)
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
model_path = os.path.join(models_dir, MODEL_FILE)
|
||||
if not os.path.exists(model_path):
|
||||
update_status(f"Downloading {MODEL_FILE}...", NAME)
|
||||
from modules.utilities import conditional_download
|
||||
conditional_download(models_dir, [MODEL_URL])
|
||||
return True
|
||||
|
||||
|
||||
def pre_start() -> bool:
|
||||
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
|
||||
update_status("Select an image or video for target path.", NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def get_enhancer() -> Any:
|
||||
global ENHANCER
|
||||
with THREAD_LOCK:
|
||||
if ENHANCER is None:
|
||||
model_path = os.path.join(models_dir, MODEL_FILE)
|
||||
if not os.path.exists(model_path):
|
||||
from modules.utilities import conditional_download
|
||||
conditional_download(models_dir, [MODEL_URL])
|
||||
if not os.path.exists(model_path):
|
||||
raise FileNotFoundError(f"Model file not found: {model_path}")
|
||||
print(f"{NAME}: Loading ONNX model from {model_path}")
|
||||
ENHANCER = create_onnx_session(model_path)
|
||||
warmup_session(ENHANCER)
|
||||
print(f"{NAME}: Model loaded successfully.")
|
||||
return ENHANCER
|
||||
|
||||
|
||||
def enhance_face(temp_frame: Frame, face: Face) -> Frame:
|
||||
try:
|
||||
session = get_enhancer()
|
||||
except Exception as e:
|
||||
print(f"{NAME}: {e}")
|
||||
return temp_frame
|
||||
try:
|
||||
return enhance_face_onnx(temp_frame, face, session, INPUT_SIZE)
|
||||
except Exception as e:
|
||||
print(f"{NAME}: Error during face enhancement: {e}")
|
||||
return temp_frame
|
||||
|
||||
|
||||
def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
|
||||
target_face = get_one_face(temp_frame)
|
||||
if target_face is None:
|
||||
return temp_frame
|
||||
return enhance_face(temp_frame, target_face)
|
||||
|
||||
|
||||
def process_frame_v2(temp_frame: Frame) -> Frame:
|
||||
target_face = get_one_face(temp_frame)
|
||||
if target_face:
|
||||
temp_frame = enhance_face(temp_frame, target_face)
|
||||
return temp_frame
|
||||
|
||||
|
||||
def process_frames(
|
||||
source_path: str | None, temp_frame_paths: List[str], progress: Any = None
|
||||
) -> None:
|
||||
for temp_frame_path in temp_frame_paths:
|
||||
temp_frame = cv2.imread(temp_frame_path)
|
||||
if temp_frame is None:
|
||||
if progress:
|
||||
progress.update(1)
|
||||
continue
|
||||
result = process_frame(None, temp_frame)
|
||||
cv2.imwrite(temp_frame_path, result)
|
||||
if progress:
|
||||
progress.update(1)
|
||||
|
||||
|
||||
def process_image(source_path: str | None, target_path: str, output_path: str) -> None:
|
||||
target_frame = cv2.imread(target_path)
|
||||
if target_frame is None:
|
||||
print(f"{NAME}: Error: Failed to read target image {target_path}")
|
||||
return
|
||||
result_frame = process_frame(None, target_frame)
|
||||
cv2.imwrite(output_path, result_frame)
|
||||
print(f"{NAME}: Enhanced image saved to {output_path}")
|
||||
|
||||
|
||||
def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None:
|
||||
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
|
||||
Reference in New Issue
Block a user