From 70044e1b10fd4ac4850c23ffc69e302929ae2a9a Mon Sep 17 00:00:00 2001 From: Scott Register Date: Wed, 30 Jul 2025 18:07:26 -0700 Subject: [PATCH] sbs working phase 1 --- config.yaml | 21 +- core/__init__.py | 4 +- core/eye_processor.py | 266 ++++++++++++ core/mask_processor.py | 942 ++++++++++++++++++++++++++++++++++++++++ core/sam2_processor.py | 264 ++++++++++- core/video_assembler.py | 306 +++++++++++++ core/yolo_detector.py | 296 +++++++++++++ main.py | 325 +++++++++++++- 8 files changed, 2417 insertions(+), 7 deletions(-) create mode 100644 core/eye_processor.py create mode 100644 core/mask_processor.py create mode 100644 core/video_assembler.py diff --git a/config.yaml b/config.yaml index 791d29f..fefe355 100644 --- a/config.yaml +++ b/config.yaml @@ -21,13 +21,26 @@ processing: # Options: "all", [0, 5, 10], or [] for default (all) detect_segments: "all" + # VR180 separate eye processing mode (default: false for backward compatibility) + separate_eye_processing: false + + # Enable full greenscreen fallback when no humans detected (only used with separate_eye_processing) + enable_greenscreen_fallback: true + + # Pixel overlap between left/right eyes for blending (optional, default: 0) + eye_overlap_pixels: 0 + models: - # YOLO model path - can be pretrained (yolov8n.pt) or custom path - yolo_model: "models/yolo/yolov8n.pt" + # YOLO detection mode: "detection" (bounding boxes) or "segmentation" (direct masks) + yolo_mode: "segmentation" # Default: existing behavior, Options: "detection", "segmentation" + + # YOLO model paths for different modes + yolo_detection_model: "models/yolo/yolo11l.pt" # Regular YOLO for detection mode + yolo_segmentation_model: "models/yolo/yolo11x-seg.pt" # Segmentation YOLO for segmentation mode # SAM2 model configuration - sam2_checkpoint: "models/sam2/checkpoints/sam2.1_hiera_large.pt" - sam2_config: "models/sam2/configs/sam2.1/sam2.1_hiera_l.yaml" + sam2_checkpoint: "models/sam2/checkpoints/sam2.1_hiera_small.pt" + sam2_config: "models/sam2/configs/sam2.1/sam2.1_hiera_s.yaml" video: # Use NVIDIA hardware encoding (requires NVENC-capable GPU) diff --git a/core/__init__.py b/core/__init__.py index 6639c1b..83c6fc5 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,2 +1,4 @@ # YOLO + SAM2 Video Processing Pipeline -# Core modules for video processing with human detection and segmentation \ No newline at end of file +# Core modules for video processing with human detection and segmentation + +from .eye_processor import EyeProcessor \ No newline at end of file diff --git a/core/eye_processor.py b/core/eye_processor.py new file mode 100644 index 0000000..4e3b248 --- /dev/null +++ b/core/eye_processor.py @@ -0,0 +1,266 @@ +""" +Eye processor module for VR180 separate eye processing. +Handles splitting VR180 side-by-side frames into separate left/right eyes and recombining. +""" + +import os +import cv2 +import numpy as np +import logging +import subprocess +from typing import Dict, List, Any, Optional, Tuple + +logger = logging.getLogger(__name__) + +class EyeProcessor: + """Handles VR180 eye-specific processing operations.""" + + def __init__(self, eye_overlap_pixels: int = 0): + """ + Initialize eye processor. + + Args: + eye_overlap_pixels: Number of pixels to overlap between eyes for blending + """ + self.eye_overlap_pixels = eye_overlap_pixels + + def split_frame_into_eyes(self, frame: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """ + Split a VR180 side-by-side frame into separate left and right eye frames. 
+ + Args: + frame: Input VR180 frame (BGR format) + + Returns: + Tuple of (left_eye_frame, right_eye_frame) + """ + if len(frame.shape) != 3: + raise ValueError("Frame must be a 3-channel BGR image") + + height, width, channels = frame.shape + half_width = width // 2 + + # Extract left and right eye frames + left_eye = frame[:, :half_width + self.eye_overlap_pixels, :] + right_eye = frame[:, half_width - self.eye_overlap_pixels:, :] + + logger.debug(f"Split frame {width}x{height} into left: {left_eye.shape} and right: {right_eye.shape}") + + return left_eye, right_eye + + def split_video_into_eyes(self, input_video_path: str, left_output_path: str, + right_output_path: str, scale: float = 1.0) -> bool: + """ + Split a VR180 video into separate left and right eye videos using FFmpeg. + + Args: + input_video_path: Path to input VR180 video + left_output_path: Output path for left eye video + right_output_path: Output path for right eye video + scale: Scale factor for output videos (default: 1.0) + + Returns: + True if successful, False otherwise + """ + try: + # Get video properties + cap = cv2.VideoCapture(input_video_path) + if not cap.isOpened(): + logger.error(f"Could not open video: {input_video_path}") + return False + + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) + cap.release() + + # Calculate output dimensions + half_width = int((width // 2) * scale) + output_height = int(height * scale) + + # Create output directories if they don't exist + os.makedirs(os.path.dirname(left_output_path), exist_ok=True) + os.makedirs(os.path.dirname(right_output_path), exist_ok=True) + + # FFmpeg command for left eye (crop left half) + left_command = [ + 'ffmpeg', '-y', + '-i', input_video_path, + '-vf', f'crop={width//2 + self.eye_overlap_pixels}:{height}:0:0,scale={half_width}:{output_height}', + '-c:v', 'libx264', + '-preset', 'fast', + '-crf', '18', + left_output_path + ] + + # FFmpeg command for right eye (crop right half) + right_command = [ + 'ffmpeg', '-y', + '-i', input_video_path, + '-vf', f'crop={width//2 + self.eye_overlap_pixels}:{height}:{width//2 - self.eye_overlap_pixels}:0,scale={half_width}:{output_height}', + '-c:v', 'libx264', + '-preset', 'fast', + '-crf', '18', + right_output_path + ] + + logger.info(f"Splitting video into left eye: {left_output_path}") + result_left = subprocess.run(left_command, capture_output=True, text=True) + if result_left.returncode != 0: + logger.error(f"FFmpeg failed for left eye: {result_left.stderr}") + return False + + logger.info(f"Splitting video into right eye: {right_output_path}") + result_right = subprocess.run(right_command, capture_output=True, text=True) + if result_right.returncode != 0: + logger.error(f"FFmpeg failed for right eye: {result_right.stderr}") + return False + + logger.info(f"Successfully split video into separate eye videos") + return True + + except Exception as e: + logger.error(f"Error splitting video into eyes: {e}") + return False + + def combine_eye_masks(self, left_masks: Optional[Dict[int, np.ndarray]], + right_masks: Optional[Dict[int, np.ndarray]], + full_frame_shape: Tuple[int, int]) -> Dict[int, np.ndarray]: + """ + Combine left and right eye masks back into full-frame format. 
+ + Args: + left_masks: Dictionary of masks from left eye processing (frame_idx -> mask) + right_masks: Dictionary of masks from right eye processing (frame_idx -> mask) + full_frame_shape: Shape of the full VR180 frame (height, width) + + Returns: + Dictionary of combined masks in full-frame format + """ + combined_masks = {} + full_height, full_width = full_frame_shape + half_width = full_width // 2 + + # Get all frame indices from both eyes + left_frames = set(left_masks.keys()) if left_masks else set() + right_frames = set(right_masks.keys()) if right_masks else set() + all_frames = left_frames.union(right_frames) + + for frame_idx in all_frames: + # Create full-frame mask + combined_mask = np.zeros((full_height, full_width), dtype=np.uint8) + + # Add left eye mask to left half of frame + if left_masks and frame_idx in left_masks: + left_mask = left_masks[frame_idx] + if len(left_mask.shape) == 3: + left_mask = left_mask.squeeze() + + # Resize left mask to fit left half of full frame + left_target_width = half_width + self.eye_overlap_pixels + if left_mask.shape != (full_height, left_target_width): + left_mask = cv2.resize(left_mask.astype(np.uint8), + (left_target_width, full_height), + interpolation=cv2.INTER_NEAREST) + + # Place in left half of combined mask + combined_mask[:, :left_target_width] = left_mask[:, :left_target_width] + + # Add right eye mask to right half of frame + if right_masks and frame_idx in right_masks: + right_mask = right_masks[frame_idx] + if len(right_mask.shape) == 3: + right_mask = right_mask.squeeze() + + # Resize right mask to fit right half of full frame + right_target_width = half_width + self.eye_overlap_pixels + right_start_x = half_width - self.eye_overlap_pixels + + if right_mask.shape != (full_height, right_target_width): + right_mask = cv2.resize(right_mask.astype(np.uint8), + (right_target_width, full_height), + interpolation=cv2.INTER_NEAREST) + + # Place in right half of combined mask + combined_mask[:, right_start_x:] = right_mask + + # Store combined mask for this frame (using object ID 1 for simplicity) + combined_masks[frame_idx] = {1: combined_mask} + + logger.debug(f"Combined {len(combined_masks)} frame masks from left/right eyes") + return combined_masks + + def is_in_left_half(self, detection: Dict[str, Any], frame_width: int) -> bool: + """ + Check if a detection is in the left half of a VR180 frame. + + Args: + detection: YOLO detection dictionary with 'bbox' key + frame_width: Width of the full VR180 frame + + Returns: + True if detection center is in left half + """ + bbox = detection['bbox'] + center_x = (bbox[0] + bbox[2]) / 2 + return center_x < (frame_width // 2) + + def is_in_right_half(self, detection: Dict[str, Any], frame_width: int) -> bool: + """ + Check if a detection is in the right half of a VR180 frame. + + Args: + detection: YOLO detection dictionary with 'bbox' key + frame_width: Width of the full VR180 frame + + Returns: + True if detection center is in right half + """ + return not self.is_in_left_half(detection, frame_width) + + def convert_detection_to_eye_coordinates(self, detection: Dict[str, Any], + eye_side: str, frame_width: int) -> Dict[str, Any]: + """ + Convert a full-frame detection to eye-specific coordinates. 
+ + Args: + detection: YOLO detection dictionary with 'bbox' key + eye_side: 'left' or 'right' + frame_width: Width of the full VR180 frame + + Returns: + Detection with converted coordinates for the specific eye + """ + bbox = detection['bbox'].copy() + half_width = frame_width // 2 + + if eye_side == 'right': + # Shift right eye coordinates to start from 0 + bbox[0] -= (half_width - self.eye_overlap_pixels) # x1 + bbox[2] -= (half_width - self.eye_overlap_pixels) # x2 + + # Ensure coordinates are within bounds + eye_width = half_width + self.eye_overlap_pixels + bbox[0] = max(0, min(bbox[0], eye_width - 1)) + bbox[2] = max(0, min(bbox[2], eye_width - 1)) + + converted_detection = detection.copy() + converted_detection['bbox'] = bbox + + return converted_detection + + def create_full_greenscreen_frame(self, frame_shape: Tuple[int, int, int], + green_color: List[int] = [0, 255, 0]) -> np.ndarray: + """ + Create a full greenscreen frame for fallback when no humans are detected. + + Args: + frame_shape: Shape of the frame (height, width, channels) + green_color: RGB values for green screen color + + Returns: + Full greenscreen frame + """ + greenscreen_frame = np.full(frame_shape, green_color, dtype=np.uint8) + logger.debug(f"Created full greenscreen frame with shape {frame_shape}") + return greenscreen_frame \ No newline at end of file diff --git a/core/mask_processor.py b/core/mask_processor.py new file mode 100644 index 0000000..0770383 --- /dev/null +++ b/core/mask_processor.py @@ -0,0 +1,942 @@ +""" +Mask processor module for applying green screen effects. +Handles applying masks to video frames to create green screen output. +""" + +import os +import cv2 +import numpy as np +import cupy as cp +import subprocess +import sys +import logging +from typing import Dict, List, Any, Optional, Tuple +from collections import deque + +logger = logging.getLogger(__name__) + +class MaskProcessor: + """Handles mask application and green screen processing with quality enhancements.""" + + def __init__(self, green_color: List[int] = [0, 255, 0], blue_color: List[int] = [255, 0, 0], + mask_quality_config: Optional[Dict[str, Any]] = None, + output_mode: str = "green_screen"): + """ + Initialize mask processor with quality enhancement options. 
+ + Args: + green_color: RGB color for green screen background + blue_color: RGB color for second object (if needed) + mask_quality_config: Configuration dictionary for mask quality improvements + output_mode: Output mode - "green_screen" or "alpha_channel" + """ + self.green_color = green_color + self.blue_color = blue_color + self.output_mode = output_mode + self.use_gpu = self._check_gpu_availability() + + # Mask quality configuration with defaults + if mask_quality_config is None: + mask_quality_config = {} + + self.enable_edge_blur = mask_quality_config.get('enable_edge_blur', False) + self.edge_blur_radius = mask_quality_config.get('edge_blur_radius', 3) + self.edge_blur_sigma = mask_quality_config.get('edge_blur_sigma', 1.5) + + self.enable_temporal_smoothing = mask_quality_config.get('enable_temporal_smoothing', False) + self.temporal_blend_weight = mask_quality_config.get('temporal_blend_weight', 0.3) + self.temporal_history_frames = mask_quality_config.get('temporal_history_frames', 3) + + self.enable_morphological_cleaning = mask_quality_config.get('enable_morphological_cleaning', False) + self.morphology_kernel_size = mask_quality_config.get('morphology_kernel_size', 5) + self.min_component_size = mask_quality_config.get('min_component_size', 500) + + self.alpha_blending_mode = mask_quality_config.get('alpha_blending_mode', 'gaussian') + self.alpha_transition_width = mask_quality_config.get('alpha_transition_width', 10) + + self.enable_bilateral_filter = mask_quality_config.get('enable_bilateral_filter', False) + self.bilateral_d = mask_quality_config.get('bilateral_d', 9) + self.bilateral_sigma_color = mask_quality_config.get('bilateral_sigma_color', 75) + self.bilateral_sigma_space = mask_quality_config.get('bilateral_sigma_space', 75) + + # Temporal history buffer for mask smoothing + self.mask_history = deque(maxlen=self.temporal_history_frames) + + # Log configuration + if any([self.enable_edge_blur, self.enable_temporal_smoothing, self.enable_morphological_cleaning]): + logger.info("Mask quality enhancements enabled:") + if self.enable_edge_blur: + logger.info(f" Edge blur: radius={self.edge_blur_radius}, sigma={self.edge_blur_sigma}") + if self.enable_temporal_smoothing: + logger.info(f" Temporal smoothing: weight={self.temporal_blend_weight}, history={self.temporal_history_frames}") + if self.enable_morphological_cleaning: + logger.info(f" Morphological cleaning: kernel={self.morphology_kernel_size}, min_size={self.min_component_size}") + logger.info(f" Alpha blending: mode={self.alpha_blending_mode}, width={self.alpha_transition_width}") + else: + logger.info("Mask quality enhancements disabled - using standard binary masking") + + logger.info(f"Output mode: {self.output_mode}") + + def _check_gpu_availability(self) -> bool: + """Check if CuPy GPU acceleration is available.""" + try: + import cupy as cp + # Test GPU availability + test_array = cp.array([1, 2, 3]) + _ = test_array * 2 + logger.info("GPU acceleration available via CuPy") + return True + except Exception as e: + logger.warning(f"GPU acceleration not available, using CPU: {e}") + return False + + def enhance_mask_quality(self, mask: np.ndarray) -> np.ndarray: + """ + Apply all enabled mask quality enhancements. + + Args: + mask: Input binary mask + + Returns: + Enhanced mask with quality improvements applied + """ + enhanced_mask = mask.copy() + + # 1. Morphological cleaning + if self.enable_morphological_cleaning: + enhanced_mask = self._clean_mask_morphologically(enhanced_mask) + + # 2. 
Temporal smoothing + if self.enable_temporal_smoothing: + enhanced_mask = self._apply_temporal_smoothing(enhanced_mask) + + # 3. Edge enhancement and blurring + if self.enable_edge_blur: + enhanced_mask = self._apply_edge_blur(enhanced_mask) + + # 4. Bilateral filtering (if enabled) + if self.enable_bilateral_filter: + enhanced_mask = self._apply_bilateral_filter(enhanced_mask) + + return enhanced_mask + + def _clean_mask_morphologically(self, mask: np.ndarray) -> np.ndarray: + """ + Clean mask using morphological operations to remove noise and small artifacts. + + Args: + mask: Input binary mask + + Returns: + Cleaned mask + """ + # Convert to uint8 for OpenCV operations + mask_uint8 = (mask * 255).astype(np.uint8) + + # Create morphological kernel + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, + (self.morphology_kernel_size, self.morphology_kernel_size)) + + # Opening operation (erosion followed by dilation) to remove small noise + cleaned = cv2.morphologyEx(mask_uint8, cv2.MORPH_OPEN, kernel) + + # Closing operation (dilation followed by erosion) to fill small holes + cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel) + + # Remove small connected components + if self.min_component_size > 0: + cleaned = self._remove_small_components(cleaned) + + return (cleaned / 255.0).astype(np.float32) + + def _remove_small_components(self, mask: np.ndarray) -> np.ndarray: + """ + Remove connected components smaller than minimum size. + + Args: + mask: Input binary mask (uint8) + + Returns: + Mask with small components removed + """ + # Find connected components + num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(mask, connectivity=8) + + # Create output mask + output_mask = np.zeros_like(mask) + + # Keep components larger than minimum size (skip background label 0) + for i in range(1, num_labels): + component_size = stats[i, cv2.CC_STAT_AREA] + if component_size >= self.min_component_size: + output_mask[labels == i] = 255 + + return output_mask + + def _apply_temporal_smoothing(self, mask: np.ndarray) -> np.ndarray: + """ + Apply temporal smoothing using mask history. + + Args: + mask: Current frame mask + + Returns: + Temporally smoothed mask + """ + if len(self.mask_history) == 0: + # First frame, no history to blend with + self.mask_history.append(mask.copy()) + return mask + + # Blend with previous frames using weighted average + smoothed_mask = mask.astype(np.float32) + total_weight = 1.0 + + for i, hist_mask in enumerate(reversed(self.mask_history)): + # Exponential decay: more recent frames have higher weight + frame_weight = self.temporal_blend_weight * (0.8 ** i) + smoothed_mask += hist_mask.astype(np.float32) * frame_weight + total_weight += frame_weight + + # Normalize by total weight + smoothed_mask /= total_weight + + # Update history + self.mask_history.append(mask.copy()) + + return smoothed_mask + + def _apply_edge_blur(self, mask: np.ndarray) -> np.ndarray: + """ + Apply Gaussian blur to mask edges for smooth transitions. + + Args: + mask: Input mask + + Returns: + Mask with blurred edges + """ + # Apply Gaussian blur + kernel_size = 2 * self.edge_blur_radius + 1 + blurred_mask = cv2.GaussianBlur(mask.astype(np.float32), + (kernel_size, kernel_size), + self.edge_blur_sigma) + + return blurred_mask + + def _apply_bilateral_filter(self, mask: np.ndarray) -> np.ndarray: + """ + Apply bilateral filtering for edge-preserving smoothing. 
+ + Args: + mask: Input mask + + Returns: + Filtered mask + """ + # Convert to uint8 for bilateral filter + mask_uint8 = (mask * 255).astype(np.uint8) + + # Apply bilateral filter + filtered = cv2.bilateralFilter(mask_uint8, self.bilateral_d, + self.bilateral_sigma_color, + self.bilateral_sigma_space) + + return (filtered / 255.0).astype(np.float32) + + def _create_alpha_mask(self, mask: np.ndarray) -> np.ndarray: + """ + Create alpha mask with smooth transitions based on blending mode. + + Args: + mask: Input binary/float mask + + Returns: + Alpha mask with smooth transitions + """ + if self.alpha_blending_mode == "linear": + return mask + elif self.alpha_blending_mode == "gaussian": + # Use distance transform for smooth falloff + binary_mask = (mask > 0.5).astype(np.uint8) + + # Distance transform from mask edges + dist_inside = cv2.distanceTransform(binary_mask, cv2.DIST_L2, 5) + dist_outside = cv2.distanceTransform(1 - binary_mask, cv2.DIST_L2, 5) + + # Create smooth alpha based on distance + alpha = np.zeros_like(mask, dtype=np.float32) + transition_width = self.alpha_transition_width + + # Inside mask: fade from edge + alpha[binary_mask > 0] = np.minimum(1.0, dist_inside[binary_mask > 0] / transition_width) + + # Outside mask: fade to zero + alpha[binary_mask == 0] = np.maximum(0.0, 1.0 - dist_outside[binary_mask == 0] / transition_width) + + return alpha + elif self.alpha_blending_mode == "sigmoid": + # Sigmoid-based smooth transition + return 1.0 / (1.0 + np.exp(-10 * (mask - 0.5))) + else: + return mask + + def apply_green_mask(self, frame: np.ndarray, masks: List[np.ndarray]) -> np.ndarray: + """ + Apply green screen mask to a frame with quality enhancements. + + Args: + frame: Input video frame (BGR format) + masks: List of object masks to apply + + Returns: + Frame with green screen background and enhanced mask quality + """ + # Combine all masks into a single mask + combined_mask = self._combine_masks(masks) + + # Apply quality enhancements + enhanced_mask = self.enhance_mask_quality(combined_mask) + + # Create alpha mask for smooth blending + alpha_mask = self._create_alpha_mask(enhanced_mask) + + # Apply mask using alpha blending + if self.use_gpu: + return self._apply_green_mask_gpu_enhanced(frame, alpha_mask) + else: + return self._apply_green_mask_cpu_enhanced(frame, alpha_mask) + + def apply_mask_with_alpha(self, frame: np.ndarray, masks: List[np.ndarray]) -> np.ndarray: + """ + Apply mask to create RGBA frame with alpha channel. + + Args: + frame: Input video frame (BGR format) + masks: List of object masks to apply + + Returns: + RGBA frame with alpha channel + """ + # Combine all masks into a single mask + combined_mask = self._combine_masks(masks) + + # Apply quality enhancements + enhanced_mask = self.enhance_mask_quality(combined_mask) + + # Create alpha mask for smooth blending + alpha_mask = self._create_alpha_mask(enhanced_mask) + + # Resize alpha mask to match frame if needed + if alpha_mask.shape != frame.shape[:2]: + alpha_mask = cv2.resize(alpha_mask, (frame.shape[1], frame.shape[0])) + + # Convert BGR to BGRA + bgra_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2BGRA) + + # Set alpha channel + bgra_frame[:, :, 3] = (alpha_mask * 255).astype(np.uint8) + + return bgra_frame + + def _combine_masks(self, masks: List[np.ndarray]) -> np.ndarray: + """ + Combine multiple object masks into a single mask. 
+ + Args: + masks: List of object masks + + Returns: + Combined mask + """ + if not masks: + return np.zeros((0, 0), dtype=np.float32) + + # Start with first mask + combined_mask = masks[0].squeeze().astype(np.float32) + + # Combine with remaining masks using logical OR + for mask in masks[1:]: + mask_squeezed = mask.squeeze().astype(np.float32) + if mask_squeezed.shape != combined_mask.shape: + # Resize mask to match combined mask + mask_squeezed = cv2.resize(mask_squeezed, + (combined_mask.shape[1], combined_mask.shape[0]), + interpolation=cv2.INTER_NEAREST) + combined_mask = np.maximum(combined_mask, mask_squeezed) + + return combined_mask + + def reset_temporal_history(self): + """Reset temporal history buffer. Call this when starting a new segment.""" + self.mask_history.clear() + logger.debug("Temporal history buffer reset") + + def _apply_green_mask_gpu_enhanced(self, frame: np.ndarray, alpha_mask: np.ndarray) -> np.ndarray: + """GPU-accelerated green mask application with alpha blending using CuPy (Phase 1 optimized).""" + try: + # Convert to CuPy arrays with optimized data transfer + frame_gpu = cp.asarray(frame, dtype=cp.uint8) + alpha_gpu = cp.asarray(alpha_mask, dtype=cp.float32) + + # Resize alpha mask to match frame if needed (vectorized operation) + if alpha_gpu.shape != frame_gpu.shape[:2]: + # Use CuPy's resize instead of OpenCV for GPU optimization + alpha_gpu = cp.array(cv2.resize(cp.asnumpy(alpha_gpu), + (frame_gpu.shape[1], frame_gpu.shape[0]))) + + # Create green background (optimized broadcasting) + green_color_gpu = cp.array(self.green_color, dtype=cp.uint8) + green_background = cp.broadcast_to(green_color_gpu, frame_gpu.shape) + + # Apply vectorized alpha blending with optimized memory access + alpha_3d = cp.expand_dims(alpha_gpu, axis=2) + + # Use more efficient computation with explicit typing + frame_float = frame_gpu.astype(cp.float32) + green_float = green_background.astype(cp.float32) + + # Vectorized blending operation + result_frame = cp.clip(alpha_3d * frame_float + (1.0 - alpha_3d) * green_float, 0, 255) + + return cp.asnumpy(result_frame.astype(cp.uint8)) + + except Exception as e: + logger.error(f"GPU enhanced processing failed, falling back to CPU: {e}") + return self._apply_green_mask_cpu_enhanced(frame, alpha_mask) + + def _apply_green_mask_cpu_enhanced(self, frame: np.ndarray, alpha_mask: np.ndarray) -> np.ndarray: + """CPU-based green mask application with alpha blending (Phase 1 optimized).""" + # Resize alpha mask to match frame if needed + if alpha_mask.shape != frame.shape[:2]: + alpha_mask = cv2.resize(alpha_mask, (frame.shape[1], frame.shape[0])) + + # Create green background with broadcasting (more efficient) + green_color = np.array(self.green_color, dtype=np.uint8) + green_background = np.broadcast_to(green_color, frame.shape) + + # Apply optimized alpha blending with explicit data types + alpha_3d = np.expand_dims(alpha_mask.astype(np.float32), axis=2) + + # Vectorized blending with optimized memory access + frame_float = frame.astype(np.float32) + green_float = green_background.astype(np.float32) + + result_frame = np.clip(alpha_3d * frame_float + (1.0 - alpha_3d) * green_float, 0, 255) + + return result_frame.astype(np.uint8) + + def apply_colored_mask(self, frame: np.ndarray, masks_a: List[np.ndarray], + masks_b: List[np.ndarray]) -> np.ndarray: + """ + Apply colored masks for visualization (green and blue). 
+ + Args: + frame: Input video frame + masks_a: Masks for object A (green) + masks_b: Masks for object B (blue) + + Returns: + Frame with colored masks applied + """ + colored_mask = np.zeros_like(frame) + + # Apply green color to masks_a + for mask in masks_a: + mask = mask.squeeze() + if mask.shape != frame.shape[:2]: + mask = cv2.resize(mask, (frame.shape[1], frame.shape[0]), + interpolation=cv2.INTER_NEAREST) + colored_mask[mask > 0] = self.green_color + + # Apply blue color to masks_b + for mask in masks_b: + mask = mask.squeeze() + if mask.shape != frame.shape[:2]: + mask = cv2.resize(mask, (frame.shape[1], frame.shape[0]), + interpolation=cv2.INTER_NEAREST) + colored_mask[mask > 0] = self.blue_color + + return colored_mask + + def _precompute_upscaled_masks(self, video_segments: Dict[int, Dict[int, np.ndarray]], + target_width: int, target_height: int) -> Dict[int, Dict[int, np.ndarray]]: + """ + Pre-compute all upscaled masks to avoid per-frame upscaling. + + Args: + video_segments: Dictionary of frame masks from SAM2 + target_width: Target frame width + target_height: Target frame height + + Returns: + Dictionary with pre-upscaled masks + """ + logger.info(f"Pre-computing upscaled masks for {len(video_segments)} frames") + upscaled_segments = {} + + for frame_idx, frame_masks in video_segments.items(): + upscaled_frame_masks = {} + for obj_id, mask in frame_masks.items(): + mask = mask.squeeze() + if mask.shape != (target_height, target_width): + upscaled_mask = cv2.resize(mask.astype(np.uint8), + (target_width, target_height), + interpolation=cv2.INTER_NEAREST) + upscaled_frame_masks[obj_id] = upscaled_mask + else: + upscaled_frame_masks[obj_id] = mask.astype(np.uint8) + upscaled_segments[frame_idx] = upscaled_frame_masks + + logger.info(f"Pre-computed upscaled masks for {len(upscaled_segments)} frames") + return upscaled_segments + + def process_and_save_output_video(self, video_path: str, output_video_path: str, + video_segments: Dict[int, Dict[int, np.ndarray]], + use_nvenc: bool = False, bitrate: str = "50M") -> bool: + """ + Process high-resolution frames, apply upscaled masks, and save the output video. 
+ + Args: + video_path: Path to input video + output_video_path: Path to save output video + video_segments: Dictionary of frame masks + use_nvenc: Whether to use NVIDIA hardware encoding + bitrate: Output video bitrate + + Returns: + True if successful + """ + try: + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + logger.error(f"Could not open video: {video_path}") + return False + + frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + logger.info(f"Processing video: {frame_width}x{frame_height} @ {fps}fps, {total_frames} frames") + + # Pre-compute all upscaled masks (Phase 1 optimization) + upscaled_segments = self._precompute_upscaled_masks(video_segments, frame_width, frame_height) + + # Setup VideoWriter + if self.output_mode == "alpha_channel": + # For alpha channel, we need a codec that supports transparency + success = self._setup_alpha_encoder(output_video_path, frame_width, frame_height, fps, bitrate) + if not success: + logger.error("Failed to setup alpha channel encoder") + cap.release() + return False + use_nvenc = False # Override NVENC for alpha channel + elif use_nvenc: + success = self._setup_nvenc_encoder(output_video_path, frame_width, frame_height, fps, bitrate) + if not success: + logger.warning("NVENC setup failed, falling back to OpenCV") + use_nvenc = False + + if not use_nvenc and self.output_mode != "alpha_channel": + # Use OpenCV VideoWriter + fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Use mp4v for better compatibility + out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height)) + if not out.isOpened(): + logger.error("Failed to create output video writer") + cap.release() + return False + + # Process frames with batch reading (Phase 1 optimization) + frame_idx = 0 + processed_frames = 0 + batch_size = 10 # Process frames in batches for better I/O performance + frame_buffer = [] + + # Pre-fill frame buffer + for _ in range(min(batch_size, len(upscaled_segments))): + ret, frame = cap.read() + if ret: + frame_buffer.append(frame) + else: + break + + buffer_idx = 0 + while frame_idx < len(upscaled_segments) and buffer_idx < len(frame_buffer): + frame = frame_buffer[buffer_idx] + + if frame_idx in upscaled_segments: + # Get pre-computed upscaled masks for this frame (Phase 1 optimization) + upscaled_masks = [upscaled_segments[frame_idx][obj_id] + for obj_id in upscaled_segments[frame_idx]] + + # Apply mask based on output mode (no upscaling needed - already done) + if self.output_mode == "alpha_channel": + result_frame = self.apply_mask_with_alpha(frame, upscaled_masks) + else: + result_frame = self.apply_green_mask(frame, upscaled_masks) + else: + # No mask for this frame + if self.output_mode == "alpha_channel": + # Create fully transparent frame for alpha channel mode + bgra_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2BGRA) + bgra_frame[:, :, 3] = 0 # Fully transparent + result_frame = bgra_frame + logger.warning(f"No mask for frame {frame_idx}, using transparent frame") + else: + # Use original frame for green screen mode + logger.warning(f"No mask for frame {frame_idx}, using original") + result_frame = frame + + # Write frame + if self.output_mode == "alpha_channel" and hasattr(self, 'alpha_process'): + self.alpha_process.stdin.write(result_frame.tobytes()) + elif use_nvenc and hasattr(self, 'nvenc_process'): + self.nvenc_process.stdin.write(result_frame.tobytes()) + 
else: + out.write(result_frame) + + processed_frames += 1 + frame_idx += 1 + buffer_idx += 1 + + # Refill buffer when needed + if buffer_idx >= len(frame_buffer) and frame_idx < len(upscaled_segments): + frame_buffer.clear() + buffer_idx = 0 + # Read next batch + for _ in range(min(batch_size, len(upscaled_segments) - frame_idx)): + ret, frame = cap.read() + if ret: + frame_buffer.append(frame) + else: + break + + # Progress logging + if processed_frames % 100 == 0: + logger.info(f"Processed {processed_frames}/{total_frames} frames") + + # Cleanup + cap.release() + + if self.output_mode == "alpha_channel" and hasattr(self, 'alpha_process'): + self.alpha_process.stdin.close() + self.alpha_process.wait() + if self.alpha_process.returncode != 0: + logger.error("Alpha channel encoding failed") + return False + elif use_nvenc and hasattr(self, 'nvenc_process'): + self.nvenc_process.stdin.close() + self.nvenc_process.wait() + if self.nvenc_process.returncode != 0: + logger.error("NVENC encoding failed") + return False + else: + out.release() + + logger.info(f"Successfully processed {processed_frames} frames to {output_video_path}") + return True + + except Exception as e: + logger.error(f"Error processing video: {e}") + return False + + def _setup_nvenc_encoder(self, output_path: str, width: int, height: int, + fps: float, bitrate: str) -> bool: + """Setup NVENC hardware encoder using FFmpeg.""" + try: + # Determine encoder based on platform + if sys.platform == 'darwin': + encoder = 'hevc_videotoolbox' + else: + encoder = 'hevc_nvenc' + + command = [ + 'ffmpeg', + '-y', # Overwrite output file + '-f', 'rawvideo', + '-vcodec', 'rawvideo', + '-pix_fmt', 'bgr24', + '-s', f'{width}x{height}', + '-r', str(fps), + '-i', '-', # Input from stdin + '-an', # No audio (will be added later) + '-vcodec', encoder, + '-pix_fmt', 'yuv420p', # Changed from nv12 for better compatibility + '-preset', 'slow', + '-b:v', bitrate, + output_path + ] + + self.nvenc_process = subprocess.Popen(command, stdin=subprocess.PIPE, + stderr=subprocess.PIPE) + logger.info(f"Initialized {encoder} hardware encoder") + return True + + except Exception as e: + logger.error(f"Failed to setup NVENC encoder: {e}") + return False + + def _setup_alpha_encoder(self, output_path: str, width: int, height: int, + fps: float, bitrate: str) -> bool: + """Setup encoder for alpha channel video using FFmpeg with H.264/H.265.""" + try: + # For VR180 SBS, we'll use H.265 (HEVC) with alpha channel + # Note: Standard H.264/H.265 don't support alpha directly, + # so we'll encode the alpha as a separate grayscale channel or use a special pixel format + + # Determine encoder based on platform + if sys.platform == 'darwin': + encoder = 'hevc_videotoolbox' + else: + encoder = 'hevc_nvenc' + + command = [ + 'ffmpeg', + '-y', # Overwrite output file + '-f', 'rawvideo', + '-vcodec', 'rawvideo', + '-pix_fmt', 'bgra', # BGRA for alpha channel + '-s', f'{width}x{height}', + '-r', str(fps), + '-i', '-', # Input from stdin + '-an', # No audio (will be added later) + '-c:v', encoder, + '-pix_fmt', 'yuv420p', # Standard pixel format + '-preset', 'slow', + '-b:v', bitrate, + '-tag:v', 'hvc1', # Required for some players + output_path + ] + + self.alpha_process = subprocess.Popen(command, stdin=subprocess.PIPE, + stderr=subprocess.PIPE) + self.alpha_output_path = output_path + logger.info(f"Initialized {encoder} for alpha channel output (will be encoded as transparency in RGB)") + return True + + except Exception as e: + logger.error(f"Failed to setup alpha encoder: 
{e}") + return False + + def process_segment(self, segment_info: dict, video_segments: Dict[int, Dict[int, np.ndarray]], + use_nvenc: bool = False, bitrate: str = "50M") -> bool: + """ + Process a single segment and save the output video. + + Args: + segment_info: Segment information dictionary + video_segments: Dictionary of frame masks from SAM2 + use_nvenc: Whether to use hardware encoding + bitrate: Output video bitrate + + Returns: + True if successful + """ + input_video = segment_info['video_file'] + if self.output_mode == "alpha_channel": + output_video = os.path.join(segment_info['directory'], f"output_{segment_info['index']}.mov") + else: + output_video = os.path.join(segment_info['directory'], f"output_{segment_info['index']}.mp4") + + logger.info(f"Processing segment {segment_info['index']} with {self.output_mode}") + + success = self.process_and_save_output_video( + input_video, + output_video, + video_segments, + use_nvenc, + bitrate + ) + + if success: + logger.info(f"Successfully created {self.output_mode} video: {output_video}") + else: + logger.error(f"Failed to process segment {segment_info['index']}") + + return success + + def create_full_greenscreen_frame(self, frame_shape: Tuple[int, int, int], + green_color: Optional[List[int]] = None) -> np.ndarray: + """ + Create a full greenscreen frame for fallback when no humans are detected. + + Args: + frame_shape: Shape of the frame (height, width, channels) + green_color: RGB values for green screen color (uses default if None) + + Returns: + Full greenscreen frame + """ + if green_color is None: + green_color = self.green_color + + greenscreen_frame = np.full(frame_shape, green_color, dtype=np.uint8) + logger.debug(f"Created full greenscreen frame with shape {frame_shape}") + return greenscreen_frame + + def process_greenscreen_only_segment(self, segment_info: dict, + green_color: Optional[List[int]] = None, + use_nvenc: bool = False, bitrate: str = "50M") -> bool: + """ + Create a full greenscreen segment when no humans are detected. + Used as fallback in separate eye processing mode. 
+ + Args: + segment_info: Segment information dictionary + green_color: RGB values for green screen color (uses default if None) + use_nvenc: Whether to use hardware encoding + bitrate: Output video bitrate + + Returns: + True if greenscreen segment was created successfully + """ + segment_dir = segment_info['directory'] + video_path = segment_info['video_file'] + segment_idx = segment_info['index'] + + logger.info(f"Creating full greenscreen segment {segment_idx} (no humans detected)") + + try: + # Get video properties + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + logger.error(f"Could not open video: {video_path}") + return False + + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 + frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + cap.release() + + # Create output video path + if self.output_mode == "alpha_channel": + output_video_path = os.path.join(segment_dir, f"output_{segment_idx}.mov") + else: + output_video_path = os.path.join(segment_dir, f"output_{segment_idx}.mp4") + + # Create greenscreen frame + if green_color is None: + green_color = self.green_color + + greenscreen_frame = self.create_full_greenscreen_frame( + (height, width, 3), green_color + ) + + # Setup video writer based on mode and hardware encoding preference + if use_nvenc: + success = self._write_greenscreen_with_nvenc( + output_video_path, greenscreen_frame, frame_count, fps, bitrate + ) + else: + success = self._write_greenscreen_with_opencv( + output_video_path, greenscreen_frame, frame_count, fps + ) + + if not success: + logger.error(f"Failed to write greenscreen video for segment {segment_idx}") + return False + + # Create empty mask file (black mask since no humans detected) + mask_output_path = os.path.join(segment_dir, "mask.png") + black_mask = np.zeros((height, width, 3), dtype=np.uint8) + cv2.imwrite(mask_output_path, black_mask) + + # Mark segment as completed + output_done_file = os.path.join(segment_dir, "output_frames_done") + with open(output_done_file, 'w') as f: + f.write(f"Greenscreen segment {segment_idx} completed successfully\n") + + logger.info(f"Successfully created greenscreen segment {segment_idx}") + return True + + except Exception as e: + logger.error(f"Error creating greenscreen segment {segment_idx}: {e}") + return False + + def _write_greenscreen_with_opencv(self, output_path: str, greenscreen_frame: np.ndarray, + frame_count: int, fps: float) -> bool: + """Write greenscreen video using OpenCV VideoWriter.""" + try: + if self.output_mode == "alpha_channel": + # For alpha channel mode, create fully transparent frames + bgra_frame = cv2.cvtColor(greenscreen_frame, cv2.COLOR_BGR2BGRA) + bgra_frame[:, :, 3] = 0 # Fully transparent + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, + (greenscreen_frame.shape[1], greenscreen_frame.shape[0]), True) + frame_to_write = bgra_frame[:, :, :3] # OpenCV expects BGR for mp4v + else: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(output_path, fourcc, fps, + (greenscreen_frame.shape[1], greenscreen_frame.shape[0])) + frame_to_write = greenscreen_frame + + if not out.isOpened(): + logger.error(f"Failed to open video writer for {output_path}") + return False + + # Write identical greenscreen frames + for _ in range(frame_count): + out.write(frame_to_write) + + out.release() + logger.debug(f"Wrote {frame_count} greenscreen frames using OpenCV") + return True + + except Exception 
as e: + logger.error(f"Error writing greenscreen with OpenCV: {e}") + return False + + def _write_greenscreen_with_nvenc(self, output_path: str, greenscreen_frame: np.ndarray, + frame_count: int, fps: float, bitrate: str) -> bool: + """Write greenscreen video using NVENC hardware encoding.""" + try: + # Setup NVENC encoder + if not self._setup_nvenc_encoder(output_path, + greenscreen_frame.shape[1], + greenscreen_frame.shape[0], + fps, bitrate): + logger.warning("NVENC setup failed for greenscreen, falling back to OpenCV") + return self._write_greenscreen_with_opencv(output_path, greenscreen_frame, frame_count, fps) + + # Write identical greenscreen frames + for _ in range(frame_count): + self.nvenc_process.stdin.write(greenscreen_frame.tobytes()) + + # Finalize encoding + self.nvenc_process.stdin.close() + self.nvenc_process.wait() + + if self.nvenc_process.returncode != 0: + logger.error("NVENC encoding failed for greenscreen") + return False + + logger.debug(f"Wrote {frame_count} greenscreen frames using NVENC") + return True + + except Exception as e: + logger.error(f"Error writing greenscreen with NVENC: {e}") + return False + + def has_valid_masks(self, video_segments: Optional[Dict[int, Dict[int, np.ndarray]]]) -> bool: + """ + Check if video segments contain valid masks. + + Args: + video_segments: Video segments dictionary from SAM2 + + Returns: + True if valid masks are found + """ + if not video_segments: + return False + + # Check if any frame has non-empty masks + for frame_idx, frame_masks in video_segments.items(): + for obj_id, mask in frame_masks.items(): + if mask is not None and np.any(mask): + return True + + return False \ No newline at end of file diff --git a/core/sam2_processor.py b/core/sam2_processor.py index aed7aeb..923e126 100644 --- a/core/sam2_processor.py +++ b/core/sam2_processor.py @@ -11,13 +11,15 @@ import logging import gc from typing import Dict, List, Any, Optional, Tuple from sam2.build_sam import build_sam2_video_predictor +from .eye_processor import EyeProcessor logger = logging.getLogger(__name__) class SAM2Processor: """Handles SAM2-based video segmentation for human tracking.""" - def __init__(self, checkpoint_path: str, config_path: str, vos_optimized: bool = False): + def __init__(self, checkpoint_path: str, config_path: str, vos_optimized: bool = False, + separate_eye_processing: bool = False, eye_overlap_pixels: int = 0): """ Initialize SAM2 processor. 
@@ -25,11 +27,21 @@ class SAM2Processor: checkpoint_path: Path to SAM2 checkpoint config_path: Path to SAM2 config file vos_optimized: Enable VOS optimization for speedup (requires PyTorch 2.5.1+) + separate_eye_processing: Enable VR180 separate eye processing mode + eye_overlap_pixels: Pixel overlap between eyes for blending """ self.checkpoint_path = checkpoint_path self.config_path = config_path self.vos_optimized = vos_optimized + self.separate_eye_processing = separate_eye_processing self.predictor = None + + # Initialize eye processor if separate eye processing is enabled + if separate_eye_processing: + self.eye_processor = EyeProcessor(eye_overlap_pixels=eye_overlap_pixels) + else: + self.eye_processor = None + self._initialize_predictor() def _initialize_predictor(self): @@ -650,3 +662,253 @@ class SAM2Processor: else: logger.error("SAM2 Mid-segment: FAILED - No prompts were successfully added") return False + + def process_single_eye_segment(self, segment_info: dict, eye_side: str, + yolo_prompts: Optional[List[Dict[str, Any]]] = None, + previous_masks: Optional[Dict[int, np.ndarray]] = None, + inference_scale: float = 0.5) -> Optional[Dict[int, np.ndarray]]: + """ + Process a single eye of a VR180 segment with SAM2. + + Args: + segment_info: Segment information dictionary + eye_side: 'left' or 'right' eye + yolo_prompts: Optional YOLO detection prompts for first frame + previous_masks: Optional masks from previous segment + inference_scale: Scale factor for inference + + Returns: + Dictionary mapping frame indices to masks, or None if failed + """ + if not self.eye_processor: + logger.error("Eye processor not initialized - separate_eye_processing must be enabled") + return None + + segment_dir = segment_info['directory'] + video_path = segment_info['video_file'] + segment_idx = segment_info['index'] + + logger.info(f"Processing {eye_side} eye for segment {segment_idx}") + + # Use the video path directly (it should already be the eye-specific video) + eye_video_path = video_path + + # Verify the eye video exists + if not os.path.exists(eye_video_path): + logger.error(f"Eye video not found: {eye_video_path}") + return None + + # Create low-resolution eye video for inference + low_res_eye_video_path = os.path.join(segment_dir, f"low_res_{eye_side}_eye_video.mp4") + if not os.path.exists(low_res_eye_video_path): + try: + self.create_low_res_video(eye_video_path, low_res_eye_video_path, inference_scale) + except Exception as e: + logger.error(f"Failed to create low-res {eye_side} eye video for segment {segment_idx}: {e}") + return None + + try: + # Initialize inference state with eye-specific video + inference_state = self.predictor.init_state(video_path=low_res_eye_video_path, async_loading_frames=True) + + # Add prompts or previous masks (always use obj_id=1 for single eye processing) + if yolo_prompts: + # Convert prompts to use obj_id=1 for single eye processing + eye_prompts = [] + for prompt in yolo_prompts: + eye_prompt = prompt.copy() + eye_prompt['obj_id'] = 1 # Always use obj_id=1 for single eye + eye_prompts.append(eye_prompt) + + if not self.add_yolo_prompts_to_predictor(inference_state, eye_prompts): + logger.error(f"Failed to add prompts for {eye_side} eye") + return None + + elif previous_masks: + # Convert previous masks to use obj_id=1 for single eye processing + eye_masks = {1: list(previous_masks.values())[0]} if previous_masks else {} + if not self.add_previous_masks_to_predictor(inference_state, eye_masks): + logger.error(f"Failed to add previous masks for 
{eye_side} eye") + return None + else: + logger.error(f"No prompts or previous masks available for {eye_side} eye of segment {segment_idx}") + return None + + # Propagate masks + logger.info(f"Propagating masks for {eye_side} eye") + video_segments = self.propagate_masks(inference_state) + + # Extract just the masks (remove obj_id structure since we only use obj_id=1) + eye_masks = {} + for frame_idx, frame_masks in video_segments.items(): + if 1 in frame_masks: # We always use obj_id=1 for single eye processing + eye_masks[frame_idx] = frame_masks[1] + + # Clean up + self.predictor.reset_state(inference_state) + del inference_state + gc.collect() + + # Remove temporary low-res video + try: + os.remove(low_res_eye_video_path) + logger.debug(f"Removed low-res {eye_side} eye video: {low_res_eye_video_path}") + except Exception as e: + logger.warning(f"Could not remove low-res {eye_side} eye video: {e}") + + logger.info(f"Successfully processed {eye_side} eye with {len(eye_masks)} frames") + return eye_masks + + except Exception as e: + logger.error(f"Error processing {eye_side} eye for segment {segment_idx}: {e}") + return None + + def process_segment_with_separate_eyes(self, segment_info: dict, + left_prompts: Optional[List[Dict[str, Any]]] = None, + right_prompts: Optional[List[Dict[str, Any]]] = None, + previous_left_masks: Optional[Dict[int, np.ndarray]] = None, + previous_right_masks: Optional[Dict[int, np.ndarray]] = None, + inference_scale: float = 0.5, + full_frame_shape: Optional[Tuple[int, int]] = None) -> Optional[Dict[int, Dict[int, np.ndarray]]]: + """ + Process a VR180 segment with separate left and right eye processing. + + Args: + segment_info: Segment information dictionary + left_prompts: Optional YOLO prompts for left eye + right_prompts: Optional YOLO prompts for right eye + previous_left_masks: Optional previous masks for left eye + previous_right_masks: Optional previous masks for right eye + inference_scale: Scale factor for inference + full_frame_shape: Shape of full VR180 frame (height, width) + + Returns: + Combined video segments dictionary or None if failed + """ + if not self.eye_processor: + logger.error("Eye processor not initialized - separate_eye_processing must be enabled") + return None + + segment_idx = segment_info['index'] + logger.info(f"Processing segment {segment_idx} with separate eye processing") + + # Get full frame shape if not provided + if full_frame_shape is None: + try: + cap = cv2.VideoCapture(segment_info['video_file']) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + cap.release() + full_frame_shape = (height, width) + except Exception as e: + logger.error(f"Could not determine frame shape: {e}") + return None + + # Process left eye if prompts or previous masks are available + left_masks = None + if left_prompts or previous_left_masks: + logger.info(f"Processing left eye for segment {segment_idx}") + left_masks = self.process_single_eye_segment( + segment_info, 'left', left_prompts, previous_left_masks, inference_scale + ) + + # Process right eye if prompts or previous masks are available + right_masks = None + if right_prompts or previous_right_masks: + logger.info(f"Processing right eye for segment {segment_idx}") + right_masks = self.process_single_eye_segment( + segment_info, 'right', right_prompts, previous_right_masks, inference_scale + ) + + # Combine masks back to full frame format + if left_masks or right_masks: + logger.info(f"Combining eye masks for segment {segment_idx}") + 
combined_masks = self.eye_processor.combine_eye_masks( + left_masks, right_masks, full_frame_shape + ) + + # Clean up eye-specific videos to save space + try: + left_eye_path = os.path.join(segment_info['directory'], "left_eye_video.mp4") + right_eye_path = os.path.join(segment_info['directory'], "right_eye_video.mp4") + + if os.path.exists(left_eye_path): + os.remove(left_eye_path) + logger.debug(f"Removed left eye video: {left_eye_path}") + + if os.path.exists(right_eye_path): + os.remove(right_eye_path) + logger.debug(f"Removed right eye video: {right_eye_path}") + + except Exception as e: + logger.warning(f"Could not clean up eye videos: {e}") + + logger.info(f"Successfully processed segment {segment_idx} with separate eyes") + return combined_masks + else: + logger.warning(f"No masks generated for either eye in segment {segment_idx}") + return None + + def create_greenscreen_segment(self, segment_info: dict, green_color: List[int] = [0, 255, 0]) -> bool: + """ + Create a full greenscreen segment when no humans are detected. + + Args: + segment_info: Segment information dictionary + green_color: RGB values for green screen color + + Returns: + True if greenscreen segment was created successfully + """ + segment_dir = segment_info['directory'] + video_path = segment_info['video_file'] + segment_idx = segment_info['index'] + + logger.info(f"Creating full greenscreen segment {segment_idx}") + + try: + # Get video properties + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + logger.error(f"Could not open video: {video_path}") + return False + + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) + frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + cap.release() + + # Create output video path + output_video_path = os.path.join(segment_dir, f"output_{segment_idx}.mp4") + + # Create greenscreen frames + greenscreen_frame = self.eye_processor.create_full_greenscreen_frame( + (height, width, 3), green_color + ) + + # Write greenscreen video + fourcc = cv2.VideoWriter_fourcc(*'HEVC') + out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) + + for _ in range(frame_count): + out.write(greenscreen_frame) + + out.release() + + # Create mask file (empty/black mask since no humans detected) + mask_output_path = os.path.join(segment_dir, "mask.png") + black_mask = np.zeros((height, width, 3), dtype=np.uint8) + cv2.imwrite(mask_output_path, black_mask) + + # Mark segment as completed + output_done_file = os.path.join(segment_dir, "output_frames_done") + with open(output_done_file, 'w') as f: + f.write(f"Greenscreen segment {segment_idx} completed successfully\n") + + logger.info(f"Successfully created greenscreen segment {segment_idx}") + return True + + except Exception as e: + logger.error(f"Error creating greenscreen segment {segment_idx}: {e}") + return False diff --git a/core/video_assembler.py b/core/video_assembler.py new file mode 100644 index 0000000..9e3b72d --- /dev/null +++ b/core/video_assembler.py @@ -0,0 +1,306 @@ +""" +Video assembler module for concatenating processed segments. +Handles merging processed segments and adding audio from original video. 
+""" + +import os +import subprocess +import logging +from typing import List, Optional +from utils.file_utils import get_segments_directories, file_exists + +logger = logging.getLogger(__name__) + +class VideoAssembler: + """Handles final video assembly from processed segments.""" + + def __init__(self, preserve_audio: bool = True, use_nvenc: bool = False, + output_mode: str = "green_screen"): + """ + Initialize video assembler. + + Args: + preserve_audio: Whether to preserve audio from original video + use_nvenc: Whether to use hardware encoding for final output + output_mode: Output mode - "green_screen" or "alpha_channel" + """ + self.preserve_audio = preserve_audio + self.use_nvenc = use_nvenc + self.output_mode = output_mode + + def create_concat_file(self, segments_dir: str, output_filename: str = "concat_list.txt") -> Optional[str]: + """ + Create a concatenation file for FFmpeg. + + Args: + segments_dir: Directory containing processed segments + output_filename: Name for the concat file + + Returns: + Path to concat file or None if no valid segments found + """ + concat_path = os.path.join(segments_dir, output_filename) + valid_segments = 0 + + try: + segments = get_segments_directories(segments_dir) + + with open(concat_path, 'w') as f: + for i, segment in enumerate(segments): + segment_dir = os.path.join(segments_dir, segment) + if self.output_mode == "alpha_channel": + output_video = os.path.join(segment_dir, f"output_{i}.mov") + else: + output_video = os.path.join(segment_dir, f"output_{i}.mp4") + + if file_exists(output_video): + # Use relative path for FFmpeg + relative_path = os.path.relpath(output_video, segments_dir) + f.write(f"file '{relative_path}'\n") + valid_segments += 1 + else: + logger.warning(f"Output video not found for segment {i}: {output_video}") + + if valid_segments == 0: + logger.error("No valid output segments found for concatenation") + os.remove(concat_path) + return None + + logger.info(f"Created concatenation file with {valid_segments} segments: {concat_path}") + return concat_path + + except Exception as e: + logger.error(f"Error creating concatenation file: {e}") + return None + + def concatenate_segments(self, segments_dir: str, output_path: str, + bitrate: str = "50M") -> bool: + """ + Concatenate video segments using FFmpeg. 
+ + Args: + segments_dir: Directory containing processed segments + output_path: Path for final concatenated video + bitrate: Output video bitrate + + Returns: + True if successful + """ + # Create concatenation file + concat_file = self.create_concat_file(segments_dir) + if not concat_file: + return False + + try: + # Build FFmpeg command + if self.output_mode == "alpha_channel": + # For alpha channel, we need to maintain the ProRes codec + cmd = [ + 'ffmpeg', + '-y', # Overwrite output + '-f', 'concat', + '-safe', '0', + '-i', concat_file, + '-c:v', 'copy', # Copy video codec to preserve alpha + '-an', # No audio for now + output_path + ] + else: + cmd = [ + 'ffmpeg', + '-y', # Overwrite output + '-f', 'concat', + '-safe', '0', + '-i', concat_file, + '-c:v', 'copy', # Copy video codec (no re-encoding) + '-an', # No audio for now + output_path + ] + + # Use hardware encoding if requested + if self.use_nvenc: + import sys + if sys.platform == 'darwin': + encoder = 'hevc_videotoolbox' + else: + encoder = 'hevc_nvenc' + + # Re-encode with hardware acceleration + cmd = [ + 'ffmpeg', + '-y', + '-f', 'concat', + '-safe', '0', + '-i', concat_file, + '-c:v', encoder, + '-preset', 'slow', + '-b:v', bitrate, + '-pix_fmt', 'yuv420p', + '-an', + output_path + ] + + logger.info(f"Running concatenation command: {' '.join(cmd)}") + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + logger.error(f"FFmpeg concatenation failed: {result.stderr}") + return False + + logger.info(f"Successfully concatenated segments to: {output_path}") + + # Clean up concat file + try: + os.remove(concat_file) + except: + pass + + return True + + except Exception as e: + logger.error(f"Error during concatenation: {e}") + return False + + def copy_audio_from_original(self, original_video: str, processed_video: str, + final_output: str) -> bool: + """ + Copy audio track from original video to processed video. 
+ + Args: + original_video: Path to original video with audio + processed_video: Path to processed video without audio + final_output: Path for final output with audio + + Returns: + True if successful + """ + if not self.preserve_audio: + logger.info("Audio preservation disabled, skipping audio copy") + return True + + try: + # Check if original video has audio + probe_cmd = [ + 'ffprobe', + '-v', 'error', + '-select_streams', 'a:0', + '-show_entries', 'stream=codec_type', + '-of', 'csv=p=0', + original_video + ] + + result = subprocess.run(probe_cmd, capture_output=True, text=True) + + if result.returncode != 0 or result.stdout.strip() != 'audio': + logger.warning("Original video has no audio track") + # Just copy the processed video + import shutil + shutil.copy2(processed_video, final_output) + return True + + # Copy audio from original to processed video + cmd = [ + 'ffmpeg', + '-y', + '-i', processed_video, # Video input + '-i', original_video, # Audio input + '-c:v', 'copy', # Copy video stream + '-c:a', 'copy', # Copy audio stream + '-map', '0:v:0', # Map video from first input + '-map', '1:a:0', # Map audio from second input + '-shortest', # Match duration to shortest stream + final_output + ] + + logger.info("Copying audio from original video...") + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + logger.error(f"FFmpeg audio copy failed: {result.stderr}") + return False + + logger.info(f"Successfully added audio to final video: {final_output}") + return True + + except Exception as e: + logger.error(f"Error copying audio: {e}") + return False + + def assemble_final_video(self, segments_dir: str, original_video: str, + output_path: str, bitrate: str = "50M") -> bool: + """ + Complete pipeline to assemble final video with audio. + + Args: + segments_dir: Directory containing processed segments + original_video: Path to original video (for audio) + output_path: Path for final output video + bitrate: Output video bitrate + + Returns: + True if successful + """ + logger.info("Starting final video assembly...") + + # Step 1: Concatenate segments + temp_concat_path = os.path.join(os.path.dirname(output_path), "temp_concat.mp4") + + if not self.concatenate_segments(segments_dir, temp_concat_path, bitrate): + logger.error("Failed to concatenate segments") + return False + + # Step 2: Add audio from original + if self.preserve_audio and file_exists(original_video): + success = self.copy_audio_from_original(original_video, temp_concat_path, output_path) + + # Clean up temp file + try: + os.remove(temp_concat_path) + except: + pass + + return success + else: + # No audio to add, just rename temp file + import shutil + try: + shutil.move(temp_concat_path, output_path) + logger.info(f"Final video saved to: {output_path}") + return True + except Exception as e: + logger.error(f"Error moving final video: {e}") + return False + + def verify_segment_completeness(self, segments_dir: str) -> tuple[bool, List[int]]: + """ + Verify all segments have been processed. 
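+
+        Example return values:
+
+            (True, [])       # every segment directory contains its output video
+            (False, [2, 5])  # segments 2 and 5 are missing output videos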
+ + Args: + segments_dir: Directory containing segments + + Returns: + Tuple of (all_complete, missing_segments) + """ + segments = get_segments_directories(segments_dir) + missing_segments = [] + + for i, segment in enumerate(segments): + segment_dir = os.path.join(segments_dir, segment) + if self.output_mode == "alpha_channel": + output_video = os.path.join(segment_dir, f"output_{i}.mov") + else: + output_video = os.path.join(segment_dir, f"output_{i}.mp4") + + if not file_exists(output_video): + missing_segments.append(i) + + all_complete = len(missing_segments) == 0 + + if all_complete: + logger.info(f"All {len(segments)} segments have been processed") + else: + logger.warning(f"Missing output for segments: {missing_segments}") + + return all_complete, missing_segments \ No newline at end of file diff --git a/core/yolo_detector.py b/core/yolo_detector.py index c03fa46..0eaf550 100644 --- a/core/yolo_detector.py +++ b/core/yolo_detector.py @@ -732,4 +732,300 @@ class YOLODetector: except Exception as e: logger.error(f"Error creating debug frame: {e}") + return False + + def detect_humans_in_single_eye(self, frame: np.ndarray, eye_side: str) -> List[Dict[str, Any]]: + """ + Detect humans in a single eye frame (left or right). + + Args: + frame: Input eye frame (BGR format) + eye_side: 'left' or 'right' eye + + Returns: + List of human detection dictionaries for the single eye + """ + logger.info(f"Running YOLO detection on {eye_side} eye frame") + + # Run standard detection on the eye frame + detections = self.detect_humans_in_frame(frame) + + logger.info(f"YOLO {eye_side.upper()} Eye: Found {len(detections)} human detections") + + for i, detection in enumerate(detections): + bbox = detection['bbox'] + conf = detection['confidence'] + has_mask = detection.get('has_mask', False) + logger.debug(f"YOLO {eye_side.upper()} Eye Detection {i+1}: bbox={bbox}, conf={conf:.3f}, has_mask={has_mask}") + + return detections + + def convert_eye_detections_to_sam2_prompts(self, detections: List[Dict[str, Any]], + eye_side: str) -> List[Dict[str, Any]]: + """ + Convert single eye detections to SAM2 prompts (always uses obj_id=1 for single eye processing). + + Args: + detections: List of YOLO detection results for single eye + eye_side: 'left' or 'right' eye + + Returns: + List of SAM2 prompt dictionaries with obj_id=1 for single eye processing + """ + if not detections: + logger.warning(f"No detections provided for {eye_side} eye SAM2 prompt conversion") + return [] + + logger.info(f"Converting {len(detections)} {eye_side} eye detections to SAM2 prompts") + + prompts = [] + + # For single eye processing, always use obj_id=1 and take the best detection + best_detection = max(detections, key=lambda x: x['confidence']) + + prompts.append({ + 'obj_id': 1, # Always use obj_id=1 for single eye processing + 'bbox': best_detection['bbox'].copy(), + 'confidence': best_detection['confidence'] + }) + + logger.info(f"{eye_side.upper()} Eye: Converted best detection (conf={best_detection['confidence']:.3f}) to SAM2 Object 1") + + return prompts + + def has_any_detections(self, detections_list: List[List[Dict[str, Any]]]) -> bool: + """ + Check if any detections exist in a list of detection lists. 
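+
+        Example:
+
+            # True if either eye produced at least one detection
+            detector.has_any_detections([left_detections, right_detections])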
+ + Args: + detections_list: List of detection lists (e.g., [left_detections, right_detections]) + + Returns: + True if any detections are found + """ + for detections in detections_list: + if detections: + return True + return False + + def split_detections_by_eye(self, detections: List[Dict[str, Any]], frame_width: int) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """ + Split VR180 detections into left and right eye detections with coordinate conversion. + + Args: + detections: List of full-frame VR180 detections + frame_width: Width of the full VR180 frame + + Returns: + Tuple of (left_eye_detections, right_eye_detections) with converted coordinates + """ + half_width = frame_width // 2 + left_detections = [] + right_detections = [] + + logger.info(f"Splitting {len(detections)} VR180 detections by eye (frame_width={frame_width}, half_width={half_width})") + + for i, detection in enumerate(detections): + bbox = detection['bbox'] + center_x = (bbox[0] + bbox[2]) / 2 + + logger.info(f"Detection {i}: bbox={bbox}, center_x={center_x:.1f}") + + # Create a copy with converted coordinates + converted_detection = detection.copy() + converted_bbox = bbox.copy() + + if center_x < half_width: + # Left eye detection - coordinates remain the same + # For segmentation mode, we also need to crop the mask to the left eye + if detection.get('has_mask', False) and 'mask' in detection: + original_mask = detection['mask'] + # Crop mask to left half (keep original coordinates for now, will be handled in eye processing) + converted_detection['mask'] = original_mask + logger.info(f"Detection {i}: LEFT eye mask shape: {original_mask.shape}") + + left_detections.append(converted_detection) + logger.info(f"Detection {i}: Assigned to LEFT eye, center_x={center_x:.1f} < {half_width}, bbox={bbox}") + else: + # Right eye detection - shift coordinates to start from 0 + original_bbox = converted_bbox.copy() + converted_bbox[0] -= half_width # x1 + converted_bbox[2] -= half_width # x2 + + # Ensure coordinates are within bounds + converted_bbox[0] = max(0, converted_bbox[0]) + converted_bbox[2] = max(0, min(converted_bbox[2], half_width)) + + converted_detection['bbox'] = converted_bbox + + # For segmentation mode, we also need to crop the mask to the right eye + if detection.get('has_mask', False) and 'mask' in detection: + original_mask = detection['mask'] + # Crop mask to right half and shift coordinates + # Note: This is a simplified approach - the mask coordinates need to be handled properly + converted_detection['mask'] = original_mask # Will be properly handled in eye processing + logger.info(f"Detection {i}: RIGHT eye mask shape: {original_mask.shape}") + + right_detections.append(converted_detection) + + logger.info(f"Detection {i}: Assigned to RIGHT eye, center_x={center_x:.1f} >= {half_width}, original_bbox={original_bbox}, converted_bbox={converted_bbox}") + + logger.info(f"Split result: {len(left_detections)} left eye, {len(right_detections)} right eye detections") + + return left_detections, right_detections + + def save_eye_debug_frames(self, left_frame: np.ndarray, right_frame: np.ndarray, + left_detections: List[Dict[str, Any]], right_detections: List[Dict[str, Any]], + left_output_path: str, right_output_path: str) -> Tuple[bool, bool]: + """ + Save debug frames for both left and right eye detections. 
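+
+        Typical usage (output paths illustrative):
+
+            detector.save_eye_debug_frames(left_frame, right_frame,
+                                           left_detections, right_detections,
+                                           "left_eye_debug.jpg", "right_eye_debug.jpg")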
+ + Args: + left_frame: Left eye frame + right_frame: Right eye frame + left_detections: Left eye detections + right_detections: Right eye detections + left_output_path: Output path for left eye debug frame + right_output_path: Output path for right eye debug frame + + Returns: + Tuple of (left_success, right_success) + """ + logger.info(f"Saving eye-specific debug frames") + + # Save left eye debug frame (eye-specific version) + left_success = self._save_single_eye_debug_frame( + left_frame, left_detections, left_output_path, "LEFT" + ) + + # Save right eye debug frame (eye-specific version) + right_success = self._save_single_eye_debug_frame( + right_frame, right_detections, right_output_path, "RIGHT" + ) + + if left_success: + logger.info(f"Saved left eye debug frame: {left_output_path}") + if right_success: + logger.info(f"Saved right eye debug frame: {right_output_path}") + + return left_success, right_success + + def _save_single_eye_debug_frame(self, frame: np.ndarray, detections: List[Dict[str, Any]], + output_path: str, eye_side: str) -> bool: + """ + Save a debug frame for a single eye with eye-specific visualizations. + + Args: + frame: Single eye frame (BGR format from OpenCV) + detections: List of detection dictionaries for this eye + output_path: Path to save the debug image + eye_side: "LEFT" or "RIGHT" + + Returns: + True if saved successfully + """ + try: + debug_frame = frame.copy() + + # Draw masks or bounding boxes for each detection + for i, detection in enumerate(detections): + bbox = detection['bbox'] + confidence = detection['confidence'] + has_mask = detection.get('has_mask', False) + + # Extract coordinates + x1, y1, x2, y2 = map(int, bbox) + + # Choose color based on confidence (green for high, yellow for medium, red for low) + if confidence >= 0.8: + color = (0, 255, 0) # Green + elif confidence >= 0.6: + color = (0, 255, 255) # Yellow + else: + color = (0, 0, 255) # Red + + if has_mask and 'mask' in detection: + # Draw segmentation mask + mask = detection['mask'] + + # Resize mask to match frame if needed + if mask.shape != debug_frame.shape[:2]: + mask = cv2.resize(mask.astype(np.float32), (debug_frame.shape[1], debug_frame.shape[0]), interpolation=cv2.INTER_NEAREST) + mask = mask > 0.5 + + mask = mask.astype(bool) + + # Apply colored overlay with transparency + overlay = debug_frame.copy() + overlay[mask] = color + cv2.addWeighted(overlay, 0.3, debug_frame, 0.7, 0, debug_frame) + + # Draw mask outline + contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + cv2.drawContours(debug_frame, contours, -1, color, 2) + + # Prepare label text for segmentation + label = f"Person {i+1}: {confidence:.2f} (MASK)" + else: + # Draw bounding box (detection mode or no mask available) + cv2.rectangle(debug_frame, (x1, y1), (x2, y2), color, 2) + + # Prepare label text for detection + label = f"Person {i+1}: {confidence:.2f} (BBOX)" + + label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0] + + # Draw label background + cv2.rectangle(debug_frame, + (x1, y1 - label_size[1] - 10), + (x1 + label_size[0], y1), + color, -1) + + # Draw label text + cv2.putText(debug_frame, label, + (x1, y1 - 5), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, + (255, 255, 255), 2) + + # Add title specific to this eye + frame_height, frame_width = debug_frame.shape[:2] + title = f"{eye_side} EYE: {len(detections)} detections" + cv2.putText(debug_frame, title, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2) + + # Add mode information + 
mode_text = f"YOLO Mode: {self.mode.upper()}" + masks_available = sum(1 for d in detections if d.get('has_mask', False)) + + if self.supports_segmentation and masks_available > 0: + summary = f"{len(detections)} detections → {masks_available} MASKS" + else: + summary = f"{len(detections)} detections → BOUNDING BOXES" + + cv2.putText(debug_frame, mode_text, + (10, 60), + cv2.FONT_HERSHEY_SIMPLEX, 0.8, + (0, 255, 255), 2) # Yellow for mode + cv2.putText(debug_frame, summary, + (10, 90), + cv2.FONT_HERSHEY_SIMPLEX, 0.8, + (255, 255, 255), 2) + + # Add frame dimensions info + dims_info = f"Frame: {frame_width}x{frame_height}" + cv2.putText(debug_frame, dims_info, + (10, 120), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, + (255, 255, 255), 2) + + # Save debug frame + success = cv2.imwrite(output_path, debug_frame) + if success: + logger.info(f"Saved {eye_side} eye debug frame to {output_path}") + else: + logger.error(f"Failed to save {eye_side} eye debug frame to {output_path}") + + return success + + except Exception as e: + logger.error(f"Error creating {eye_side} eye debug frame: {e}") return False \ No newline at end of file diff --git a/main.py b/main.py index dadde3f..b870ac2 100644 --- a/main.py +++ b/main.py @@ -188,6 +188,293 @@ def resolve_detect_segments(detect_segments, total_segments: int) -> List[int]: logger.warning(f"Invalid detect_segments format: {detect_segments}. Using all segments.") return list(range(total_segments)) +def process_segment_with_separate_eyes(segment_info, detector, sam2_processor, mask_processor, config, + previous_left_masks=None, previous_right_masks=None): + """ + Process a single segment using separate eye processing mode. + Split video first, then run YOLO independently on each eye. + + Args: + segment_info: Segment information dictionary + detector: YOLO detector instance + sam2_processor: SAM2 processor with eye processing enabled + mask_processor: Mask processor instance + config: Configuration loader instance + previous_left_masks: Previous masks for left eye + previous_right_masks: Previous masks for right eye + + Returns: + Tuple of (success, left_masks, right_masks) + """ + segment_idx = segment_info['index'] + logger.info(f"VR180 Separate Eyes: Processing segment {segment_idx} (video-split approach)") + + # Get video properties + cap = cv2.VideoCapture(segment_info['video_file']) + frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + cap.release() + + full_frame_shape = (frame_height, frame_width) + + # Step 1: Split the segment video into left and right eye videos + left_eye_video = os.path.join(segment_info['directory'], "left_eye.mp4") + right_eye_video = os.path.join(segment_info['directory'], "right_eye.mp4") + + logger.info(f"VR180 Separate Eyes: Splitting segment video into eye videos") + success = sam2_processor.eye_processor.split_video_into_eyes( + segment_info['video_file'], + left_eye_video, + right_eye_video, + scale=config.get_inference_scale() + ) + + if not success: + logger.error(f"VR180 Separate Eyes: Failed to split video for segment {segment_idx}") + return False, None, None + + # Check if both eye videos were created + if not os.path.exists(left_eye_video) or not os.path.exists(right_eye_video): + logger.error(f"VR180 Separate Eyes: Eye video files not created for segment {segment_idx}") + return False, None, None + + logger.info(f"VR180 Separate Eyes: Created eye videos - left: {left_eye_video}, right: {right_eye_video}") + + # Step 2: Run YOLO independently on each eye video + 
left_detections = detector.detect_humans_in_video_first_frame( + left_eye_video, scale=1.0 # Already scaled during video splitting + ) + + right_detections = detector.detect_humans_in_video_first_frame( + right_eye_video, scale=1.0 # Already scaled during video splitting + ) + + logger.info(f"VR180 Separate Eyes: YOLO detections - left: {len(left_detections)}, right: {len(right_detections)}") + + # Check if we have YOLO segmentation masks + has_yolo_masks = False + if detector.supports_segmentation: + has_yolo_masks = any(d.get('has_mask', False) for d in (left_detections + right_detections)) + + if has_yolo_masks: + logger.info(f"VR180 Separate Eyes: YOLO segmentation mode - using direct masks instead of bounding boxes") + + # Save eye-specific debug frames if enabled + if config.get('advanced.save_yolo_debug_frames', False) and (left_detections or right_detections): + try: + # Load first frames from each eye video + left_cap = cv2.VideoCapture(left_eye_video) + ret_left, left_frame = left_cap.read() + left_cap.release() + + right_cap = cv2.VideoCapture(right_eye_video) + ret_right, right_frame = right_cap.read() + right_cap.release() + + if ret_left and ret_right: + # Save eye-specific debug frames + left_debug_path = os.path.join(segment_info['directory'], "left_eye_debug.jpg") + right_debug_path = os.path.join(segment_info['directory'], "right_eye_debug.jpg") + + detector.save_eye_debug_frames( + left_frame, right_frame, + left_detections, right_detections, + left_debug_path, right_debug_path + ) + + logger.info(f"VR180 Separate Eyes: Saved eye-specific debug frames for segment {segment_idx}") + else: + logger.warning(f"VR180 Separate Eyes: Could not load eye frames for debug visualization") + + except Exception as e: + logger.warning(f"VR180 Separate Eyes: Failed to create eye debug frames: {e}") + + # Step 3: Process left eye if detections exist or we have previous masks + left_masks = None + if left_detections or previous_left_masks: + try: + left_prompts = None + left_initial_masks = None + + if left_detections: + if has_yolo_masks: + # YOLO segmentation mode: convert masks to initial masks for SAM2 + left_initial_masks = {} + for i, detection in enumerate(left_detections): + if detection.get('has_mask', False): + mask = detection['mask'] + left_initial_masks[1] = mask.astype(bool) # Always use obj_id=1 for single eye + logger.info(f"VR180 Separate Eyes: Left eye YOLO mask - shape: {mask.shape}, pixels: {np.sum(mask)}") + break # Only take the first/best mask for single eye processing + + if left_initial_masks: + logger.info(f"VR180 Separate Eyes: Left eye - using YOLO segmentation masks as initial masks") + else: + # YOLO detection mode: convert bounding boxes to prompts + left_prompts = detector.convert_detections_to_sam2_prompts(left_detections, frame_width // 2) + logger.info(f"VR180 Separate Eyes: Left eye - {len(left_prompts)} SAM2 prompts") + + # Create temporary segment info for left eye processing + left_segment_info = segment_info.copy() + left_segment_info['video_file'] = left_eye_video + + left_masks = sam2_processor.process_single_eye_segment( + left_segment_info, 'left', left_prompts, + left_initial_masks or previous_left_masks, + 1.0 # Scale already applied during video splitting + ) + + if left_masks: + logger.info(f"VR180 Separate Eyes: Left eye processed - {len(left_masks)} frame masks") + else: + logger.warning(f"VR180 Separate Eyes: Left eye processing failed") + + except Exception as e: + logger.error(f"VR180 Separate Eyes: Error processing left eye for segment 
{segment_idx}: {e}") + left_masks = None + + # Step 4: Process right eye if detections exist or we have previous masks + right_masks = None + if right_detections or previous_right_masks: + try: + right_prompts = None + right_initial_masks = None + + if right_detections: + if has_yolo_masks: + # YOLO segmentation mode: convert masks to initial masks for SAM2 + right_initial_masks = {} + for i, detection in enumerate(right_detections): + if detection.get('has_mask', False): + mask = detection['mask'] + right_initial_masks[1] = mask.astype(bool) # Always use obj_id=1 for single eye + logger.info(f"VR180 Separate Eyes: Right eye YOLO mask - shape: {mask.shape}, pixels: {np.sum(mask)}") + break # Only take the first/best mask for single eye processing + + if right_initial_masks: + logger.info(f"VR180 Separate Eyes: Right eye - using YOLO segmentation masks as initial masks") + else: + # YOLO detection mode: convert bounding boxes to prompts + right_prompts = detector.convert_detections_to_sam2_prompts(right_detections, frame_width // 2) + logger.info(f"VR180 Separate Eyes: Right eye - {len(right_prompts)} SAM2 prompts") + + # Create temporary segment info for right eye processing + right_segment_info = segment_info.copy() + right_segment_info['video_file'] = right_eye_video + + right_masks = sam2_processor.process_single_eye_segment( + right_segment_info, 'right', right_prompts, + right_initial_masks or previous_right_masks, + 1.0 # Scale already applied during video splitting + ) + + if right_masks: + logger.info(f"VR180 Separate Eyes: Right eye processed - {len(right_masks)} frame masks") + else: + logger.warning(f"VR180 Separate Eyes: Right eye processing failed") + + except Exception as e: + logger.error(f"VR180 Separate Eyes: Error processing right eye for segment {segment_idx}: {e}") + right_masks = None + + # Step 5: Check if we got any valid masks + if not left_masks and not right_masks: + logger.warning(f"VR180 Separate Eyes: Neither eye produced valid masks for segment {segment_idx}") + + if config.get('processing.enable_greenscreen_fallback', True): + logger.info(f"VR180 Separate Eyes: Using greenscreen fallback for segment {segment_idx}") + success = mask_processor.process_greenscreen_only_segment( + segment_info, + green_color=config.get_green_color(), + use_nvenc=config.get_use_nvenc(), + bitrate=config.get_output_bitrate() + ) + return success, None, None + else: + logger.error(f"VR180 Separate Eyes: No masks generated and greenscreen fallback disabled") + return False, None, None + + # Step 6: Combine masks back to full frame format + try: + logger.info(f"VR180 Separate Eyes: Combining eye masks for segment {segment_idx}") + combined_masks = sam2_processor.eye_processor.combine_eye_masks( + left_masks, right_masks, full_frame_shape + ) + + if not combined_masks: + logger.error(f"VR180 Separate Eyes: Failed to combine eye masks for segment {segment_idx}") + return False, left_masks, right_masks + + # Validate combined masks have reasonable content + total_mask_pixels = 0 + for frame_idx, frame_masks in combined_masks.items(): + for obj_id, mask in frame_masks.items(): + if mask is not None: + total_mask_pixels += np.sum(mask) + + if total_mask_pixels == 0: + logger.warning(f"VR180 Separate Eyes: Combined masks are empty for segment {segment_idx}") + if config.get('processing.enable_greenscreen_fallback', True): + logger.info(f"VR180 Separate Eyes: Using greenscreen fallback due to empty masks") + success = mask_processor.process_greenscreen_only_segment( + segment_info, + 
green_color=config.get_green_color(), + use_nvenc=config.get_use_nvenc(), + bitrate=config.get_output_bitrate() + ) + return success, left_masks, right_masks + + logger.info(f"VR180 Separate Eyes: Combined masks contain {total_mask_pixels} total pixels") + + except Exception as e: + logger.error(f"VR180 Separate Eyes: Error combining eye masks for segment {segment_idx}: {e}") + # Try greenscreen fallback if mask combination fails + if config.get('processing.enable_greenscreen_fallback', True): + logger.info(f"VR180 Separate Eyes: Using greenscreen fallback due to mask combination error") + success = mask_processor.process_greenscreen_only_segment( + segment_info, + green_color=config.get_green_color(), + use_nvenc=config.get_use_nvenc(), + bitrate=config.get_output_bitrate() + ) + return success, left_masks, right_masks + else: + return False, left_masks, right_masks + + # Step 7: Save combined masks + mask_path = os.path.join(segment_info['directory'], "mask.png") + sam2_processor.save_final_masks( + combined_masks, + mask_path, + green_color=config.get_green_color(), + blue_color=config.get_blue_color() + ) + + # Step 8: Apply green screen and save output video + success = mask_processor.process_segment( + segment_info, + combined_masks, + use_nvenc=config.get_use_nvenc(), + bitrate=config.get_output_bitrate() + ) + + if success: + logger.info(f"VR180 Separate Eyes: Successfully processed segment {segment_idx}") + else: + logger.error(f"VR180 Separate Eyes: Failed to create output video for segment {segment_idx}") + + # Clean up temporary eye video files + try: + if os.path.exists(left_eye_video): + os.remove(left_eye_video) + if os.path.exists(right_eye_video): + os.remove(right_eye_video) + logger.debug(f"VR180 Separate Eyes: Cleaned up temporary eye videos for segment {segment_idx}") + except Exception as e: + logger.warning(f"VR180 Separate Eyes: Failed to clean up temporary eye videos: {e}") + + return success, left_masks, right_masks + def main(): """Main processing pipeline.""" args = parse_arguments() @@ -275,10 +562,23 @@ def main(): ) logger.info("Step 3: Initializing SAM2 processor") + + # Check if separate eye processing is enabled + separate_eye_processing = config.get('processing.separate_eye_processing', False) + eye_overlap_pixels = config.get('processing.eye_overlap_pixels', 0) + enable_greenscreen_fallback = config.get('processing.enable_greenscreen_fallback', True) + + if separate_eye_processing: + logger.info("VR180 Separate Eye Processing: ENABLED") + logger.info(f"Eye overlap pixels: {eye_overlap_pixels}") + logger.info(f"Greenscreen fallback: {enable_greenscreen_fallback}") + sam2_processor = SAM2Processor( checkpoint_path=config.get_sam2_checkpoint(), config_path=config.get_sam2_config(), - vos_optimized=config.get('models.sam2_vos_optimized', False) + vos_optimized=config.get('models.sam2_vos_optimized', False), + separate_eye_processing=separate_eye_processing, + eye_overlap_pixels=eye_overlap_pixels ) # Initialize mask processor with quality enhancements @@ -293,6 +593,10 @@ def main(): logger.info("Step 4: Processing segments sequentially") total_humans_detected = 0 + # Initialize previous masks for separate eye processing + previous_left_masks = None + previous_right_masks = None + for i, segment_info in enumerate(segments_info): segment_idx = segment_info['index'] @@ -307,6 +611,25 @@ def main(): logger.info(f"Segment {segment_idx} already processed, skipping") continue + # Branch based on processing mode + if separate_eye_processing: + # Use separate eye 
processing mode + success, left_masks, right_masks = process_segment_with_separate_eyes( + segment_info, detector, sam2_processor, mask_processor, config, + previous_left_masks, previous_right_masks + ) + + # Update previous masks for next segment + previous_left_masks = left_masks + previous_right_masks = right_masks + + if success: + logger.info(f"Successfully processed segment {segment_idx} with separate eye processing") + else: + logger.error(f"Failed to process segment {segment_idx} with separate eye processing") + + continue # Skip the original processing logic + # Determine if we should use YOLO detections or previous masks use_detections = segment_idx in detect_segments