mirror of
https://github.com/HM-RunningHub/ComfyUI_RH_DreamID-V.git
synced 2026-03-11 15:31:21 +08:00
swap to rh custom version
This commit is contained in:
37
cptorh01.sh
37
cptorh01.sh
@@ -1,37 +0,0 @@
|
||||
#!/bin/bash
# Copy the .py files of the current directory tree into a per-node directory
# on the shared NFS custom_nodes export, after asking for confirmation.

# Create the local mount point.
mkdir -p /root/custom_nodes

# Simplified mount command.
mount -t nfs4 -o rw rh-nfs.runninghub.cn:/data/rh_storage/global/custom_nodes_rel /root/custom_nodes

# Use the current directory name as NODE_NAME.
NODE_NAME=$(basename "$PWD")

# Create the destination directory.  The path is quoted so that a directory
# name containing spaces or glob characters is not word-split or expanded.
mkdir -p "/root/custom_nodes/${NODE_NAME}"

# Show the rsync command that is about to be executed.
echo "准备执行以下rsync命令:"
echo "rsync -av --include=\"*/\" --include=\"*.py\" --exclude=\"*\" ./ /root/custom_nodes/${NODE_NAME}/"
echo ""
echo "此命令将同步当前目录及子目录中的所有 .py 文件到 /root/custom_nodes/${NODE_NAME}/"
echo ""
read -p "是否继续执行?(输入 Y 确认): " -n 1 -r
echo ""

if [[ $REPLY =~ ^[Yy]$ ]]; then
    echo "开始同步 Python 文件..."
    # Test rsync's exit status directly instead of inspecting $? afterwards,
    # so no command can slip in between and overwrite the status.
    if rsync -av --include="*/" --include="*.py" --exclude="*" ./ "/root/custom_nodes/${NODE_NAME}/"; then
        echo "Python 文件同步完成!"
    else
        echo "Python 文件同步失败!"
        exit 1
    fi
else
    echo "取消同步操作。"
    exit 1
fi
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -29,75 +29,29 @@ from IPython.display import display, Image as IPyImage
|
||||
import torchvision.transforms as T
|
||||
|
||||
import sys
|
||||
from .media_pipe.mp_utils import LMKExtractor
|
||||
from .media_pipe.mp_utils import LMKExtractor
|
||||
from .media_pipe.draw_util import FaceMeshVisualizer
|
||||
from .media_pipe.pose_util import project_points_with_trans, matrix_to_euler_and_translation, euler_and_translation_to_matrix
|
||||
|
||||
# Try to import hybrid detector (InsightFace + MediaPipe)
|
||||
HYBRID_AVAILABLE = False
|
||||
try:
|
||||
from .media_pipe.face_detector_hybrid import HybridLMKExtractor, INSIGHTFACE_AVAILABLE
|
||||
HYBRID_AVAILABLE = INSIGHTFACE_AVAILABLE
|
||||
if HYBRID_AVAILABLE:
|
||||
print("[get_video_npy] InsightFace hybrid detector available")
|
||||
except ImportError as e:
|
||||
print(f"[get_video_npy] Hybrid detector not available: {e}")
|
||||
|
||||
# Default global extractor for backward compatibility (used by get_video_npy)
|
||||
# Uses default threshold of 0.5
|
||||
lmk_extractor = LMKExtractor(min_detection_confidence=0.5, min_tracking_confidence=0.5)
|
||||
lmk_extractor = LMKExtractor()
|
||||
vis = FaceMeshVisualizer(forehead_edge=False)
|
||||
|
||||
def prehandle_video(video_path, save_path=None, fps=24, debug=False, min_detection_confidence=0.3, use_insightface=False):
|
||||
"""
|
||||
Detect faces in video and return face detection results.
|
||||
For frames without detectable faces, use the previous frame's result (interpolation).
|
||||
|
||||
NOTE: This function NO LONGER re-encodes the video. The original video is used directly.
|
||||
The save_path parameter is kept for backward compatibility but ignored.
|
||||
|
||||
Args:
|
||||
video_path: Path to input video
|
||||
save_path: DEPRECATED - kept for backward compatibility, ignored
|
||||
fps: Frames per second (not used, kept for backward compatibility)
|
||||
debug: Enable debug logging
|
||||
min_detection_confidence: Face detection threshold (only used if use_insightface=True)
|
||||
use_insightface: Use InsightFace + MediaPipe hybrid detection (default: False to match original behavior)
|
||||
|
||||
Returns:
|
||||
interpolated_frames: list of frame indices that used interpolated face results
|
||||
face_results: list of face detection results for ALL frames (with interpolation)
|
||||
"""
|
||||
# Use original LMKExtractor with default parameters (matches original project behavior)
|
||||
# Only use hybrid detector if explicitly requested
|
||||
if use_insightface and HYBRID_AVAILABLE:
|
||||
from .media_pipe.face_detector_hybrid import HybridLMKExtractor
|
||||
extractor = HybridLMKExtractor(
|
||||
min_detection_confidence=min_detection_confidence,
|
||||
min_tracking_confidence=min_detection_confidence,
|
||||
use_insightface=True,
|
||||
insightface_det_thresh=min_detection_confidence
|
||||
)
|
||||
print(f"[prehandle_video] Using InsightFace + MediaPipe hybrid detector (threshold: {min_detection_confidence})")
|
||||
else:
|
||||
# Use original LMKExtractor with DEFAULT parameters (no custom thresholds)
|
||||
# This matches the original project behavior exactly
|
||||
extractor = LMKExtractor() # No parameters = use MediaPipe defaults
|
||||
print(f"[prehandle_video] Using original MediaPipe detector (default thresholds)")
|
||||
|
||||
def prehandle_video(video_path, save_path, fps=24):
|
||||
frames = imageio.get_reader(video_path)
|
||||
|
||||
face_results = [] # Store face results for ALL frames
|
||||
interpolated_frames = [] # Track frames that used interpolation
|
||||
last_valid_result = None # Store last valid face result for interpolation
|
||||
total_frames = 0
|
||||
detected_count = 0
|
||||
|
||||
# Only enable debug for first few frames to avoid log spam
|
||||
debug_limit = 10 if debug else 0
|
||||
|
||||
meta = frames.get_meta_data()
|
||||
|
||||
# size = meta.get('size')
|
||||
codec = meta.get('codec', 'libx264')
|
||||
writer = imageio.get_writer(
|
||||
save_path,
|
||||
fps=fps,
|
||||
codec=codec,
|
||||
macro_block_size=1,
|
||||
quality=10
|
||||
)
|
||||
skip_frames_index = []
|
||||
skip_frames_data = {}
|
||||
for i, frame in enumerate(frames):
|
||||
total_frames += 1
|
||||
frame_bgr = cv2.cvtColor(np.array(frame), cv2.COLOR_RGB2BGR)
|
||||
face_result = lmk_extractor(frame_bgr)
|
||||
if face_result is None:
|
||||
@@ -111,11 +65,11 @@ def prehandle_video(video_path, save_path=None, fps=24, debug=False, min_detecti
|
||||
return skip_frames_index, skip_frames_data
|
||||
|
||||
def get_video_npy(video_path):
|
||||
"""
|
||||
Extract face landmarks from video frames.
|
||||
Frames without detectable faces are skipped.
|
||||
"""
|
||||
|
||||
|
||||
|
||||
frames = imageio.get_reader(video_path)
|
||||
# print(f'frames count: {len(frames)}')
|
||||
|
||||
face_results = []
|
||||
skip_frames_index = []
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,333 +0,0 @@
|
||||
# Copyright 2024-2025 RunningHub. All rights reserved.
|
||||
# Hybrid face detector: InsightFace detection + MediaPipe landmarks
|
||||
"""
|
||||
Hybrid face detector that uses InsightFace for robust face detection
|
||||
and MediaPipe for detailed landmark extraction.
|
||||
|
||||
This provides better face detection on challenging videos while
|
||||
maintaining compatibility with the existing landmark-based pipeline.
|
||||
"""
|
||||
|
||||
import os
|
||||
import numpy as np
|
||||
import cv2
|
||||
import time
|
||||
|
||||
# Check if insightface is available
|
||||
INSIGHTFACE_AVAILABLE = False
|
||||
try:
|
||||
from insightface.app import FaceAnalysis
|
||||
INSIGHTFACE_AVAILABLE = True
|
||||
except ImportError:
|
||||
print("[HybridDetector] InsightFace not available, falling back to MediaPipe only")
|
||||
|
||||
# Get ComfyUI models path for InsightFace models
|
||||
def get_insightface_model_root():
    """
    Pick the directory to use as the InsightFace model root.

    Lookup order:
    1. ``<folder_paths.models_dir>/insightface`` when the ``folder_paths``
       module can be imported and that directory exists
    2. ``models/insightface`` four, then three, directory levels above this file
    3. ``~/.insightface`` (returned without checking that it exists)

    Returns:
        The selected directory path as a string.
    """
    # folder_paths is only importable when running inside ComfyUI.
    try:
        import folder_paths
    except ImportError:
        folder_paths = None

    if folder_paths is not None:
        comfy_candidate = os.path.join(folder_paths.models_dir, "insightface")
        if os.path.exists(comfy_candidate):
            print(f"[HybridDetector] Using InsightFace models from: {comfy_candidate}")
            return comfy_candidate

    # Standalone use: walk up from this file. Four levels up is the standard
    # ComfyUI custom_nodes layout, three levels up is the alternative layout.
    here = os.path.dirname(__file__)
    for levels_up in (4, 3):
        hops = [".."] * levels_up
        candidate = os.path.abspath(os.path.join(here, *hops, "models", "insightface"))
        if os.path.exists(candidate):
            print(f"[HybridDetector] Using InsightFace models from: {candidate}")
            return candidate

    # Nothing found locally: hand back the default location.
    fallback = os.path.expanduser("~/.insightface")
    print(f"[HybridDetector] Using default InsightFace path: {fallback}")
    return fallback
|
||||
|
||||
import mediapipe as mp
|
||||
from mediapipe.tasks import python
|
||||
from mediapipe.tasks.python import vision
|
||||
from . import face_landmark
|
||||
|
||||
CUR_DIR = os.path.dirname(__file__)
|
||||
|
||||
|
||||
class InsightFaceDetector:
    """Face detector backed by InsightFace's ``FaceAnalysis`` (detection module only)."""

    def __init__(self, det_size=(640, 640), det_thresh=0.3, model_name='buffalo_l'):
        """
        Build and prepare the underlying ``FaceAnalysis`` app.

        Args:
            det_size: Detection input size passed to ``FaceAnalysis.prepare``
            det_thresh: Detection threshold passed to ``FaceAnalysis.prepare``
            model_name: InsightFace model pack name (default: buffalo_l)

        Raises:
            RuntimeError: If the insightface package could not be imported.
        """
        if not INSIGHTFACE_AVAILABLE:
            raise RuntimeError("InsightFace is not installed. Run: pip install insightface onnxruntime")

        model_root = get_insightface_model_root()

        # The model pack may live under <root>/models/<name> or directly under
        # <root>/<name>; this lookup only decides which message is logged.
        nested_location = os.path.join(model_root, "models", model_name)
        flat_location = os.path.join(model_root, model_name)
        model_path = nested_location if os.path.exists(nested_location) else flat_location

        if os.path.exists(model_path):
            print(f"[InsightFaceDetector] Found local model at: {model_path}")
        else:
            print(f"[InsightFaceDetector] WARNING: Model not found at {model_path}, may attempt download")

        self.app = FaceAnalysis(
            name=model_name,
            root=model_root,
            allowed_modules=['detection'],  # detection only, no other modules
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
        )
        self.app.prepare(ctx_id=0, det_size=det_size, det_thresh=det_thresh)
        self.det_thresh = det_thresh
        print(f"[InsightFaceDetector] Initialized with det_thresh={det_thresh}, model={model_name}")

    def detect(self, img_bgr):
        """
        Run detection on one image and keep only usable boxes.

        A box is dropped when any corner lies outside the image or when its
        width or height is below 30 pixels.

        Args:
            img_bgr: BGR image (OpenCV format)

        Returns:
            List of ``(bbox, score, area)`` tuples where ``bbox`` is the integer
            array ``(x1, y1, x2, y2)``; ``score`` falls back to 1.0 when the
            face object has no ``det_score`` attribute.
        """
        frame_h, frame_w = img_bgr.shape[:2]
        min_side = 30
        kept = []

        for face in self.app.get(img_bgr):
            box = face.bbox.astype(int)
            left, top, right, bottom = box

            # Reject boxes that stick out of the frame.
            fits_in_frame = left >= 0 and top >= 0 and right <= frame_w and bottom <= frame_h
            if not fits_in_frame:
                continue

            # Reject faces that are too small.
            box_w = right - left
            box_h = bottom - top
            if box_w < min_side or box_h < min_side:
                continue

            confidence = getattr(face, 'det_score', 1.0)
            kept.append((box, confidence, box_w * box_h))

        return kept
|
||||
|
||||
|
||||
class HybridLMKExtractor:
    """
    Hybrid face landmark extractor.

    Landmarks always come from MediaPipe run on the full image. InsightFace,
    when available, is only consulted for diagnostics after MediaPipe fails.
    Falls back to MediaPipe-only if InsightFace is not available or fails to
    initialize.
    """

    def __init__(self, FPS=25, min_detection_confidence=0.5, min_tracking_confidence=0.5,
                 use_insightface=True, insightface_det_thresh=0.3):
        """
        Initialize hybrid extractor.

        Args:
            FPS: Frames per second; only used to derive ``self.frame_ms``
            min_detection_confidence: Used for both MediaPipe's face detection
                confidence and face presence confidence
            min_tracking_confidence: MediaPipe tracking confidence
            use_insightface: Whether to try to set up the InsightFace detector
            insightface_det_thresh: InsightFace detection threshold
        """
        self.use_insightface = use_insightface and INSIGHTFACE_AVAILABLE

        # Initialize InsightFace detector; any failure downgrades to
        # MediaPipe-only mode instead of aborting construction.
        if self.use_insightface:
            try:
                self.insightface_detector = InsightFaceDetector(det_thresh=insightface_det_thresh)
                print("[HybridLMKExtractor] Using InsightFace + MediaPipe hybrid mode")
            except Exception as e:
                print(f"[HybridLMKExtractor] Failed to init InsightFace: {e}, falling back to MediaPipe only")
                self.use_insightface = False

        # Initialize MediaPipe FaceLandmarker (image mode, CPU delegate).
        self.mode = mp.tasks.vision.FaceDetectorOptions.running_mode.IMAGE
        base_options = python.BaseOptions(
            model_asset_path=os.path.join(CUR_DIR, 'mp_models/face_landmarker_v2_with_blendshapes.task')
        )
        base_options.delegate = mp.tasks.BaseOptions.Delegate.CPU
        options = vision.FaceLandmarkerOptions(
            base_options=base_options,
            running_mode=self.mode,
            output_face_blendshapes=True,
            output_facial_transformation_matrixes=True,
            num_faces=1,
            min_face_detection_confidence=min_detection_confidence,
            min_face_presence_confidence=min_detection_confidence,
            min_tracking_confidence=min_tracking_confidence
        )
        self.mp_detector = face_landmark.FaceLandmarker.create_from_options(options)
        self.last_ts = 0
        self.frame_ms = int(1000 / FPS)

        if not self.use_insightface:
            print("[HybridLMKExtractor] Using MediaPipe only mode")

    def _crop_and_pad_face(self, img_bgr, bbox, padding_ratio=0.3):
        """
        Crop face region with padding, clamped to the image bounds.

        Args:
            img_bgr: Original image
            bbox: Face bounding box (x1, y1, x2, y2)
            padding_ratio: Padding added on each side, as a fraction of the
                box width / height

        Returns:
            cropped_img: Copy of the padded face region
            offset: (x, y) of the crop's top-left corner in the original image
            size: (width, height) of the padded box after clamping
        """
        h, w = img_bgr.shape[:2]
        x1, y1, x2, y2 = bbox

        # Padding in pixels, derived from the unpadded box size.
        face_w, face_h = x2 - x1, y2 - y1
        pad_w = int(face_w * padding_ratio)
        pad_h = int(face_h * padding_ratio)

        # Expand bbox with padding without leaving the image.
        x1 = max(0, x1 - pad_w)
        y1 = max(0, y1 - pad_h)
        x2 = min(w, x2 + pad_w)
        y2 = min(h, y2 + pad_h)

        cropped = img_bgr[y1:y2, x1:x2].copy()

        return cropped, (x1, y1), (x2 - x1, y2 - y1)

    def _extract_landmarks_mediapipe(self, img_bgr, debug=False):
        """
        Extract landmarks using MediaPipe.

        Args:
            img_bgr: BGR image (OpenCV format)
            debug: Print the exception text when the detector raises

        Returns:
            dict with lmks, lmks3d, trans_mat, faces, bs; or None when the
            detector raises, returns no mesh, or does not report exactly one
            blendshape set.
        """
        frame = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
        image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)

        try:
            detection_result, mesh3d = self.mp_detector.detect(image)
        except Exception as e:
            if debug:
                print(f"[HybridLMKExtractor] MediaPipe detect exception: {e}")
            return None

        # Check if face was detected
        if mesh3d is None:
            return None

        bs_list = detection_result.face_blendshapes
        if len(bs_list) != 1:
            return None

        bs = bs_list[0]
        # Skip the first blendshape entry (neutral).
        bs_values = [bs[i].score for i in range(1, len(bs))]
        trans_mat = detection_result.facial_transformation_matrixes[0]
        face_landmarks = detection_result.face_landmarks[0]

        lmks = np.array([[lm.x, lm.y, lm.z] for lm in face_landmarks])
        # The vertex buffer is read as rows of 5 values; only the first 3 are kept.
        lmks3d = np.array(mesh3d.vertex_buffer).reshape(-1, 5)[:, :3]
        # Triangle indices shifted by one (presumably to make them 1-based — confirm with consumers).
        mp_tris = np.array(mesh3d.index_buffer).reshape(-1, 3) + 1

        return {
            "lmks": lmks,
            'lmks3d': lmks3d,
            "trans_mat": trans_mat,
            'faces': mp_tris,
            "bs": bs_values
        }

    def __call__(self, img_bgr, debug=False):
        """
        Extract face landmarks from image.

        MediaPipe is run on the FULL image so landmarks and transformation
        matrices are in full-image coordinates. When it finds nothing the
        result is None regardless of what InsightFace sees, so the InsightFace
        detector is only run when ``debug`` is set, purely to report whether
        it would have found a face.

        Args:
            img_bgr: BGR image (OpenCV format)
            debug: Enable debug logging

        Returns:
            dict with lmks, lmks3d, trans_mat, faces, bs or None if no face
        """
        result = self._extract_landmarks_mediapipe(img_bgr, debug)

        if result is not None:
            if debug:
                print("[HybridLMKExtractor] MediaPipe detected face on full image")
            return result

        # MediaPipe failed. The InsightFace pass cannot change the return
        # value, so skip that detection entirely unless diagnostics are wanted.
        if debug:
            if self.use_insightface:
                detections = self.insightface_detector.detect(img_bgr)
                if len(detections) > 0:
                    # InsightFace found a face but MediaPipe couldn't
                    # (possibly a difficult pose); the caller handles None.
                    print(f"[HybridLMKExtractor] InsightFace found {len(detections)} faces but MediaPipe failed")
                else:
                    print("[HybridLMKExtractor] No face detected by either InsightFace or MediaPipe")
            else:
                print("[HybridLMKExtractor] MediaPipe failed to detect face")

        return None
|
||||
|
||||
|
||||
def get_extractor(use_insightface=True, min_detection_confidence=0.5, insightface_det_thresh=0.3):
    """
    Build a landmark extractor, preferring the hybrid one when possible.

    Args:
        use_insightface: Request the InsightFace + MediaPipe hybrid extractor
        min_detection_confidence: MediaPipe detection confidence; the plain
            LMKExtractor fallback also receives it as its tracking confidence
        insightface_det_thresh: InsightFace detection threshold (hybrid only)

    Returns:
        HybridLMKExtractor when requested and InsightFace is importable,
        otherwise an LMKExtractor instance
    """
    hybrid_wanted = use_insightface and INSIGHTFACE_AVAILABLE

    if not hybrid_wanted:
        # Imported lazily, only when the MediaPipe-only path is taken.
        from .mp_utils import LMKExtractor
        fallback_kwargs = {
            "min_detection_confidence": min_detection_confidence,
            "min_tracking_confidence": min_detection_confidence,
        }
        return LMKExtractor(**fallback_kwargs)

    return HybridLMKExtractor(
        min_detection_confidence=min_detection_confidence,
        use_insightface=True,
        insightface_det_thresh=insightface_det_thresh,
    )
|
||||
|
||||
@@ -3207,7 +3207,7 @@ class FaceLandmarker(base_vision_task_api.BaseVisionTaskApi):
|
||||
})
|
||||
|
||||
if output_packets[_NORM_LANDMARKS_STREAM_NAME].is_empty():
|
||||
return FaceLandmarkerResult([], [], []), None
|
||||
return FaceLandmarkerResult([], [], [])
|
||||
|
||||
return _build_landmarker_result2(output_packets)
|
||||
|
||||
@@ -3252,7 +3252,7 @@ class FaceLandmarker(base_vision_task_api.BaseVisionTaskApi):
|
||||
})
|
||||
|
||||
if output_packets[_NORM_LANDMARKS_STREAM_NAME].is_empty():
|
||||
return FaceLandmarkerResult([], [], []), None
|
||||
return FaceLandmarkerResult([], [], [])
|
||||
|
||||
return _build_landmarker_result2(output_packets)
|
||||
|
||||
|
||||
@@ -33,24 +33,15 @@ import folder_paths
|
||||
|
||||
class LMKExtractor():
|
||||
def __init__(self, FPS=25):
|
||||
"""
|
||||
Initialize face landmark extractor.
|
||||
Uses MediaPipe default thresholds to match original project behavior.
|
||||
|
||||
Args:
|
||||
FPS: Frames per second for video mode
|
||||
"""
|
||||
# Create an FaceLandmarker object.
|
||||
self.mode = mp.tasks.vision.FaceDetectorOptions.running_mode.IMAGE
|
||||
base_options = python.BaseOptions(model_asset_path=os.path.join(CUR_DIR, 'mp_models/face_landmarker_v2_with_blendshapes.task'))
|
||||
base_options.delegate = mp.tasks.BaseOptions.Delegate.CPU
|
||||
options = vision.FaceLandmarkerOptions(
|
||||
base_options=base_options,
|
||||
running_mode=self.mode,
|
||||
output_face_blendshapes=True,
|
||||
output_facial_transformation_matrixes=True,
|
||||
num_faces=1
|
||||
)
|
||||
options = vision.FaceLandmarkerOptions(base_options=base_options,
|
||||
running_mode=self.mode,
|
||||
output_face_blendshapes=True,
|
||||
output_facial_transformation_matrixes=True,
|
||||
num_faces=1)
|
||||
self.detector = face_landmark.FaceLandmarker.create_from_options(options)
|
||||
self.last_ts = 0
|
||||
self.frame_ms = int(1000 / FPS)
|
||||
@@ -67,17 +58,7 @@ class LMKExtractor():
|
||||
self.handler.prepare(ctx_id=0, det_size=(640, 640))
|
||||
|
||||
|
||||
def __call__(self, img, debug=False):
|
||||
"""
|
||||
Extract face landmarks from image.
|
||||
|
||||
Args:
|
||||
img: BGR image (OpenCV format)
|
||||
debug: Enable debug logging (optional, for compatibility)
|
||||
|
||||
Returns:
|
||||
dict with lmks, lmks3d, trans_mat, faces, bs or None if no face
|
||||
"""
|
||||
def __call__(self, img):
|
||||
frame = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
|
||||
image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
|
||||
t0 = time.time()
|
||||
@@ -94,6 +75,10 @@ class LMKExtractor():
|
||||
except:
|
||||
return None
|
||||
elif self.mode == mp.tasks.vision.FaceDetectorOptions.running_mode.IMAGE:
|
||||
# det_result = self.det_detector.detect(image)
|
||||
|
||||
# if len(det_result.detections) != 1:
|
||||
# return None
|
||||
try:
|
||||
detection_result, mesh3d = self.detector.detect(image)
|
||||
except:
|
||||
|
||||
29
nodes.py
29
nodes.py
@@ -31,16 +31,8 @@ try:
|
||||
except ImportError:
|
||||
VideoFromFile = None
|
||||
|
||||
def generate_pose_and_mask_videos(ref_video_path, ref_image_path, face_results=None, fps=None):
|
||||
"""
|
||||
Generate pose and mask videos from reference video.
|
||||
|
||||
Args:
|
||||
ref_video_path: Path to the reference video
|
||||
ref_image_path: Path to the reference image
|
||||
face_results: Pre-computed face detection results (optional, avoids re-detection)
|
||||
fps: Video fps (optional, will be read from video if not provided)
|
||||
"""
|
||||
def generate_pose_and_mask_videos(ref_video_path, ref_image_path):
|
||||
|
||||
print("Starting online generation of pose and mask videos...")
|
||||
detector = FaceMeshDetector()
|
||||
get_align_motion = FaceMeshAlign_dreamidv()
|
||||
@@ -183,7 +175,6 @@ class RunningHub_DreamID_V_Sampler:
|
||||
"optional": {
|
||||
"custom_width": ("INT", {"default": 832, "min": 64, "max": 2048, "step": 8}),
|
||||
"custom_height": ("INT", {"default": 480, "min": 64, "max": 2048, "step": 8}),
|
||||
"face_detection_threshold": ("FLOAT", {"default": 0.3, "min": 0.1, "max": 1.0, "step": 0.05}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -329,9 +320,7 @@ class RunningHub_DreamID_V_Sampler:
|
||||
try:
|
||||
detected_frames, pose_frames, mask_frames, skip_frames_index, skip_frames_data = generate_pose_and_mask_videos(
|
||||
ref_video_path=ref_video_path,
|
||||
ref_image_path=ref_image_path,
|
||||
face_results=face_results,
|
||||
fps=fps
|
||||
ref_image_path=ref_image_path
|
||||
)
|
||||
except:
|
||||
raise ValueError("Pose and mask video generation failed. no pose detected in the reference video.")
|
||||
@@ -368,7 +357,16 @@ class RunningHub_DreamID_V_Sampler:
|
||||
|
||||
# Convert to frames tensor (N, H, W, C) with values in [0, 1]
|
||||
frames = (generated.clamp(-1, 1).cpu().permute(1, 2, 3, 0) + 1.0) / 2.0
|
||||
print(f'[DreamID-V] Output frames shape: {frames.shape}')
|
||||
print(frames.shape)
|
||||
|
||||
frames_list = list(torch.unbind(frames, dim=0))
|
||||
target_w, target_h = frames.shape[2], frames.shape[1]
|
||||
for i in skip_frames_index:
|
||||
if i < frame_num:
|
||||
frames_list.insert(i, self.frame_2_tensor(skip_frames_data[i], target_w, target_h))
|
||||
frames_list = frames_list[:frame_num]
|
||||
frames = torch.stack(frames_list, dim=0)
|
||||
# print(frames.shape)
|
||||
|
||||
# Create output video with audio from source
|
||||
fps = kwargs.get('fps')
|
||||
@@ -406,7 +404,6 @@ class RunningHub_DreamID_V_Sampler_Test:
|
||||
"optional": {
|
||||
"custom_width": ("INT", {"default": 832, "min": 64, "max": 2048, "step": 8}),
|
||||
"custom_height": ("INT", {"default": 480, "min": 64, "max": 2048, "step": 8}),
|
||||
"face_detection_threshold": ("FLOAT", {"default": 0.3, "min": 0.1, "max": 1.0, "step": 0.05}),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,3 @@ decord
|
||||
tqdm
|
||||
mediapipe
|
||||
|
||||
# Optional: InsightFace for better face detection on difficult videos
|
||||
# Uncomment the lines below to enable InsightFace hybrid detection
|
||||
# insightface>=0.7.3
|
||||
# onnxruntime-gpu>=1.16.0
|
||||
|
||||
@@ -1,99 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
Standalone test script for face detection.
|
||||
Run this directly without starting ComfyUI:
|
||||
python test_face_detection.py /path/to/your/video.mp4
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add project to path
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import imageio
|
||||
|
||||
def test_face_detection(video_path, max_frames=30, min_detection_confidence=0.3):
    """
    Test face detection on a video file and print a per-frame report.

    Args:
        video_path: Path of the video to read
        max_frames: Maximum number of frames to examine
        min_detection_confidence: Threshold passed to LMKExtractor as both
            detection and tracking confidence

    Returns:
        (detected, total): number of frames with a face and number of frames
        examined. Both are 0 when no frame was read.
    """
    # Import after path setup
    from express_adaption.media_pipe.mp_utils import LMKExtractor

    print(f"Testing face detection on: {video_path}")
    print(f"min_detection_confidence: {min_detection_confidence}")
    print("-" * 50)

    # Create extractor with custom threshold
    lmk_extractor = LMKExtractor(
        min_detection_confidence=min_detection_confidence,
        min_tracking_confidence=min_detection_confidence
    )

    detected = 0
    skipped = 0
    total = 0

    # Read video; always release the reader, even if detection raises.
    frames = imageio.get_reader(video_path)
    try:
        for i, frame in enumerate(frames):
            if i >= max_frames:
                break

            total += 1
            frame_bgr = cv2.cvtColor(np.array(frame), cv2.COLOR_RGB2BGR)

            # Enable debug for all frames
            face_result = lmk_extractor(frame_bgr, debug=True)

            if face_result is not None:
                detected += 1
                print(f"Frame {i}: ✓ Face detected")
            else:
                skipped += 1
                print(f"Frame {i}: ✗ No face")
    finally:
        frames.close()

    # An empty video or max_frames <= 0 leaves total at 0; report 0.0% rather
    # than dividing by zero.
    percent = 100 * detected / total if total else 0.0

    print("-" * 50)
    print(f"Results: {detected}/{total} frames with faces ({percent:.1f}%)")
    print(f"Skipped: {skipped} frames")

    return detected, total
|
||||
|
||||
def test_different_thresholds(video_path, max_frames=30):
    """
    Run test_face_detection once per threshold and print a summary table.

    Args:
        video_path: Path of the video to read
        max_frames: Maximum number of frames examined per threshold
    """
    thresholds = [0.5, 0.4, 0.3, 0.2, 0.1]

    print("=" * 60)
    print("Testing different detection thresholds")
    print("=" * 60)

    results = []
    for thresh in thresholds:
        print(f"\n>>> Testing threshold: {thresh}")
        detected, total = test_face_detection(video_path, max_frames, thresh)
        results.append((thresh, detected, total))

    print("\n" + "=" * 60)
    print("Summary:")
    print("=" * 60)
    for thresh, detected, total in results:
        # A run that examined no frames reports 0.0% instead of raising
        # ZeroDivisionError.
        percent = 100 * detected / total if total else 0.0
        print(f" Threshold {thresh}: {detected}/{total} ({percent:.1f}%)")
|
||||
|
||||
if __name__ == "__main__":
    # Command line: <video_path> [max_frames]
    cli_args = sys.argv[1:]

    if not cli_args:
        print("Usage: python test_face_detection.py <video_path> [max_frames]")
        print("Example: python test_face_detection.py /path/to/video.mp4 50")
        sys.exit(1)

    video_path = cli_args[0]
    # Second argument is optional; 30 frames when it is omitted.
    if len(cli_args) > 1:
        max_frames = int(cli_args[1])
    else:
        max_frames = 30

    if not os.path.exists(video_path):
        print(f"Error: Video file not found: {video_path}")
        sys.exit(1)

    # Sweep the detection thresholds to find the best one.
    test_different_thresholds(video_path, max_frames)
|
||||
|
||||
Reference in New Issue
Block a user