import cv2 import numpy as np from typing import List, Dict, Any, Optional, Tuple from pathlib import Path import warnings from .video_processor import VideoProcessor from .config import VR180Config class VR180Processor(VideoProcessor): """Enhanced video processor with VR180-specific optimizations""" def __init__(self, config: VR180Config): super().__init__(config) # VR180 specific properties self.left_eye_width = 0 self.right_eye_width = 0 self.eye_height = 0 self.sbs_split_point = 0 def analyze_sbs_layout(self) -> Dict[str, Any]: """ Analyze side-by-side layout and determine eye regions Returns: Dictionary with eye region information """ if self.video_info is None: raise RuntimeError("Video info not loaded") total_width = self.video_info['width'] total_height = self.video_info['height'] # Assume equal split for VR180 SBS self.sbs_split_point = total_width // 2 self.left_eye_width = self.sbs_split_point self.right_eye_width = total_width - self.sbs_split_point self.eye_height = total_height layout_info = { 'total_width': total_width, 'total_height': total_height, 'split_point': self.sbs_split_point, 'left_eye_region': (0, 0, self.left_eye_width, self.eye_height), 'right_eye_region': (self.sbs_split_point, 0, self.right_eye_width, self.eye_height), 'eye_aspect_ratio': self.left_eye_width / self.eye_height } print(f"VR180 SBS Layout: {total_width}x{total_height}") print(f"Split point: {self.sbs_split_point}") print(f"Left eye: {self.left_eye_width}x{self.eye_height}") print(f"Right eye: {self.right_eye_width}x{self.eye_height}") return layout_info def split_sbs_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """ Split side-by-side frame into left and right eye views Args: frame: Input SBS frame Returns: Tuple of (left_eye_frame, right_eye_frame) """ # Always calculate split point based on current frame width # This handles scaled frames correctly frame_width = frame.shape[1] current_split_point = frame_width // 2 # Debug info on first use if self.sbs_split_point == 0: print(f"Frame dimensions: {frame.shape[1]}x{frame.shape[0]}") print(f"Split point: {current_split_point}") self.sbs_split_point = current_split_point # Store for reference left_eye = frame[:, :current_split_point] right_eye = frame[:, current_split_point:] # Validate both eyes have content if left_eye.size == 0: raise RuntimeError(f"Left eye frame is empty after split (frame width: {frame_width})") if right_eye.size == 0: raise RuntimeError(f"Right eye frame is empty after split (frame width: {frame_width})") return left_eye, right_eye def combine_sbs_frame(self, left_eye: np.ndarray, right_eye: np.ndarray) -> np.ndarray: """ Combine left and right eye frames back into side-by-side format Args: left_eye: Left eye frame right_eye: Right eye frame Returns: Combined SBS frame """ # Ensure frames have same height if left_eye.shape[0] != right_eye.shape[0]: target_height = min(left_eye.shape[0], right_eye.shape[0]) left_eye = cv2.resize(left_eye, (left_eye.shape[1], target_height)) right_eye = cv2.resize(right_eye, (right_eye.shape[1], target_height)) # Combine horizontally combined = np.hstack([left_eye, right_eye]) return combined def process_with_disparity_mapping(self, frames: List[np.ndarray], chunk_idx: int = 0) -> List[np.ndarray]: """ Process frames using disparity mapping optimization Args: frames: List of SBS frames chunk_idx: Chunk index Returns: List of processed SBS frames """ print(f"Processing chunk {chunk_idx} with disparity mapping ({len(frames)} frames)") # Split all frames into left/right eyes left_eye_frames = [] right_eye_frames = [] for i, frame in enumerate(frames): left, right = self.split_sbs_frame(frame) # Debug: Check if frames are valid if i == 0: # Only debug first frame print(f"Original frame shape: {frame.shape}") print(f"Left eye shape: {left.shape}") print(f"Right eye shape: {right.shape}") print(f"Left eye min/max: {left.min()}/{left.max()}") print(f"Right eye min/max: {right.min()}/{right.max()}") # Validate frames if left.size == 0: raise RuntimeError(f"Left eye frame {i} is empty") if right.size == 0: raise RuntimeError(f"Right eye frame {i} is empty") left_eye_frames.append(left) right_eye_frames.append(right) # Process left eye at full quality print("Processing left eye...") with self.memory_manager.memory_monitor(f"left eye chunk {chunk_idx}"): left_matted = self._process_eye_sequence(left_eye_frames, "left", chunk_idx) # Process right eye with cross-validation print("Processing right eye with cross-validation...") with self.memory_manager.memory_monitor(f"right eye chunk {chunk_idx}"): right_matted = self._process_eye_sequence_with_validation( right_eye_frames, left_matted, "right", chunk_idx ) # Combine results back to SBS format combined_frames = [] for left_frame, right_frame in zip(left_matted, right_matted): if self.config.output.maintain_sbs: combined = self.combine_sbs_frame(left_frame, right_frame) else: # Return as separate eye outputs combined = {'left': left_frame, 'right': right_frame} combined_frames.append(combined) return combined_frames def _process_eye_sequence(self, eye_frames: List[np.ndarray], eye_name: str, chunk_idx: int) -> List[np.ndarray]: """Process a single eye sequence""" if not eye_frames: return [] # Create a unique temporary video for this eye processing import uuid temp_video_name = f"temp_sam2_{eye_name}_chunk{chunk_idx}_{uuid.uuid4().hex[:8]}.mp4" temp_video_path = Path.cwd() / temp_video_name try: # Use ffmpeg approach since OpenCV video writer is failing height, width = eye_frames[0].shape[:2] temp_video_path = temp_video_path.with_suffix('.mp4') print(f"Creating temp video using ffmpeg: {temp_video_path}") print(f"Video params: size=({width}, {height}), frames={len(eye_frames)}") # Create a temporary directory for frame images temp_frames_dir = temp_video_path.parent / f"frames_{temp_video_path.stem}" temp_frames_dir.mkdir(exist_ok=True) # Save frames as individual images (using JPEG for smaller file size) print("Saving frames as images...") for i, frame in enumerate(eye_frames): # Check if frame is empty if frame.size == 0: raise RuntimeError(f"Frame {i} is empty (size=0)") # Ensure frame is uint8 if frame.dtype != np.uint8: frame = frame.astype(np.uint8) # Debug first frame if i == 0: print(f"First frame to save: shape={frame.shape}, dtype={frame.dtype}, empty={frame.size == 0}") # Use JPEG instead of PNG for smaller files (faster I/O, less disk space) frame_path = temp_frames_dir / f"frame_{i:06d}.jpg" # Use high quality JPEG to minimize compression artifacts success = cv2.imwrite(str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95]) if not success: print(f"Frame {i} details: shape={frame.shape}, dtype={frame.dtype}, size={frame.size}") raise RuntimeError(f"Failed to save frame {i} as image") if i % 50 == 0: print(f"Saved {i}/{len(eye_frames)} frames") # Force garbage collection every 100 frames to free memory if i % 100 == 0: import gc gc.collect() # Use ffmpeg to create video from images import subprocess # Use the original video's framerate - access through parent class original_fps = self.fps if hasattr(self, 'fps') else 30.0 print(f"Using framerate: {original_fps} fps") # Memory monitoring before ffmpeg self._print_memory_step(f"Before ffmpeg encoding ({eye_name} eye)") # Try GPU encoding first, fallback to CPU gpu_cmd = [ 'ffmpeg', '-y', # -y to overwrite output file '-framerate', str(original_fps), '-i', str(temp_frames_dir / 'frame_%06d.jpg'), '-c:v', 'h264_nvenc', # NVIDIA GPU encoder '-preset', 'fast', # GPU preset '-cq', '18', # Quality for GPU encoding '-pix_fmt', 'yuv420p', str(temp_video_path) ] cpu_cmd = [ 'ffmpeg', '-y', # -y to overwrite output file '-framerate', str(original_fps), '-i', str(temp_frames_dir / 'frame_%06d.jpg'), '-c:v', 'libx264', # CPU encoder '-pix_fmt', 'yuv420p', '-crf', '18', # Quality for CPU encoding '-preset', 'medium', str(temp_video_path) ] # Try GPU first print(f"Trying GPU encoding: {' '.join(gpu_cmd)}") result = subprocess.run(gpu_cmd, capture_output=True, text=True) if result.returncode != 0: print("GPU encoding failed, trying CPU...") print(f"GPU error: {result.stderr}") ffmpeg_cmd = cpu_cmd print(f"Using CPU encoding: {' '.join(ffmpeg_cmd)}") result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True) else: print("GPU encoding successful!") ffmpeg_cmd = gpu_cmd print(f"Running ffmpeg: {' '.join(ffmpeg_cmd)}") result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True) if result.returncode != 0: print(f"FFmpeg stdout: {result.stdout}") print(f"FFmpeg stderr: {result.stderr}") raise RuntimeError(f"FFmpeg failed with return code {result.returncode}") # Clean up frame images import shutil shutil.rmtree(temp_frames_dir) print(f"Created temp video successfully") # Memory monitoring after ffmpeg self._print_memory_step(f"After ffmpeg encoding ({eye_name} eye)") # Verify the file was created and has content if not temp_video_path.exists(): raise RuntimeError(f"Temporary video file was not created: {temp_video_path}") file_size = temp_video_path.stat().st_size if file_size == 0: raise RuntimeError(f"Temporary video file is empty: {temp_video_path}") print(f"Created temp video {temp_video_path} ({file_size / 1024 / 1024:.1f} MB)") # Memory monitoring and cleanup before SAM2 initialization num_frames = len(eye_frames) # Store count before freeing first_frame = eye_frames[0].copy() # Copy first frame for detection before freeing self._print_memory_step(f"Before SAM2 init ({eye_name} eye, {num_frames} frames)") # CRITICAL: Explicitly free eye_frames from memory before SAM2 loads the same video # This prevents the OOM issue where both Python frames and SAM2 frames exist simultaneously del eye_frames # Free the frames array self._aggressive_memory_cleanup(f"SAM2 init for {eye_name} eye") # Initialize SAM2 with video path self._print_memory_step(f"Starting SAM2 init ({eye_name} eye)") self.sam2_model.init_video_state(video_path=str(temp_video_path)) self._print_memory_step(f"SAM2 initialized ({eye_name} eye)") # Detect persons in first frame detections = self.detector.detect_persons(first_frame) if not detections: warnings.warn(f"No persons detected in {eye_name} eye, chunk {chunk_idx}") # Return empty masks for the number of frames return self._create_empty_masks_from_count(num_frames, first_frame.shape) print(f"Detected {len(detections)} persons in {eye_name} eye first frame") # Convert to SAM2 prompts box_prompts, labels = self.detector.convert_to_sam_prompts(detections) # Add prompts object_ids = self.sam2_model.add_person_prompts(0, box_prompts, labels) # Propagate masks (most expensive operation) self._print_memory_step(f"Before SAM2 propagation ({eye_name} eye, {num_frames} frames)") video_segments = self.sam2_model.propagate_masks( start_frame=0, max_frames=num_frames ) self._print_memory_step(f"After SAM2 propagation ({eye_name} eye)") # Apply masks - need to reload frames from temp video since we freed the original frames self._print_memory_step(f"Before reloading frames for mask application ({eye_name} eye)") # Read frames back from the temp video for mask application cap = cv2.VideoCapture(str(temp_video_path)) reloaded_frames = [] for frame_idx in range(num_frames): ret, frame = cap.read() if not ret: break reloaded_frames.append(frame) cap.release() self._print_memory_step(f"Reloaded {len(reloaded_frames)} frames for mask application") # Apply masks matted_frames = [] for frame_idx, frame in enumerate(reloaded_frames): if frame_idx in video_segments: frame_masks = video_segments[frame_idx] combined_mask = self.sam2_model.get_combined_mask(frame_masks) matted_frame = self.sam2_model.apply_mask_to_frame( frame, combined_mask, output_format=self.config.output.format, background_color=self.config.output.background_color ) else: matted_frame = self._create_empty_mask_frame(frame) matted_frames.append(matted_frame) # Free reloaded frames del reloaded_frames self._aggressive_memory_cleanup(f"After mask application ({eye_name} eye)") return matted_frames finally: # Always cleanup self.sam2_model.cleanup() # Remove temporary video file try: if temp_video_path.exists(): temp_video_path.unlink() except Exception as e: warnings.warn(f"Failed to cleanup temp video {temp_video_path}: {e}") def _process_eye_sequence_with_validation(self, right_eye_frames: List[np.ndarray], left_eye_results: List[np.ndarray], eye_name: str, chunk_idx: int) -> List[np.ndarray]: """ Process right eye with validation against left eye results Args: right_eye_frames: Right eye frame sequence left_eye_results: Processed left eye results for validation eye_name: Eye identifier chunk_idx: Chunk index Returns: Processed right eye frames """ # For now, process right eye independently # TODO: Implement stereo consistency validation right_matted = self._process_eye_sequence(right_eye_frames, eye_name, chunk_idx) # Apply stereo consistency checks validated_results = self._validate_stereo_consistency( left_eye_results, right_matted ) return validated_results def _validate_stereo_consistency(self, left_results: List[np.ndarray], right_results: List[np.ndarray]) -> List[np.ndarray]: """ Validate and correct stereo consistency between left and right eye results Args: left_results: Left eye processed frames right_results: Right eye processed frames Returns: Validated right eye frames """ validated_frames = [] for i, (left_frame, right_frame) in enumerate(zip(left_results, right_results)): # Simple validation: check if mask areas are similar left_mask_area = self._get_mask_area(left_frame) right_mask_area = self._get_mask_area(right_frame) # If areas differ significantly, apply correction area_ratio = right_mask_area / (left_mask_area + 1e-6) if area_ratio < 0.5 or area_ratio > 2.0: # Significant difference - apply correction corrected_frame = self._apply_stereo_correction( left_frame, right_frame, area_ratio ) validated_frames.append(corrected_frame) else: validated_frames.append(right_frame) return validated_frames def _create_empty_masks_from_count(self, num_frames: int, frame_shape: tuple) -> List[np.ndarray]: """Create empty masks when no persons detected (without frame array)""" empty_frames = [] for _ in range(num_frames): if self.config.output.format == "alpha": # Transparent output output = np.zeros((frame_shape[0], frame_shape[1], 4), dtype=np.uint8) else: # Green screen background output = np.full((frame_shape[0], frame_shape[1], 3), self.config.output.background_color, dtype=np.uint8) empty_frames.append(output) return empty_frames def _get_mask_area(self, frame: np.ndarray) -> float: """Get mask area from processed frame""" if frame.shape[2] == 4: # Alpha channel mask = frame[:, :, 3] > 0 else: # Green screen - detect non-background pixels bg_color = np.array(self.config.output.background_color) diff = np.abs(frame.astype(np.float32) - bg_color).sum(axis=2) mask = diff > 30 # Threshold for non-background return np.sum(mask) def _apply_stereo_correction(self, left_frame: np.ndarray, right_frame: np.ndarray, area_ratio: float) -> np.ndarray: """ Apply stereo correction to right frame based on left frame This is a simplified correction - in production, you'd use proper disparity mapping and stereo geometry """ # For now, return the right frame as-is # TODO: Implement proper stereo correction algorithm return right_frame def process_chunk(self, frames: List[np.ndarray], chunk_idx: int = 0) -> List[np.ndarray]: """ Override parent method to handle VR180-specific processing Args: frames: List of SBS frames to process chunk_idx: Chunk index for logging Returns: List of processed frames """ if not frames: return [] # Analyze SBS layout if not done yet if self.sbs_split_point == 0: sample_frame = frames[0] self.sbs_split_point = sample_frame.shape[1] // 2 # Choose processing method based on configuration if self.config.matting.use_disparity_mapping: return self.process_with_disparity_mapping(frames, chunk_idx) else: # Process each eye independently and combine return self._process_eyes_independently(frames, chunk_idx) def _process_eyes_independently(self, frames: List[np.ndarray], chunk_idx: int) -> List[np.ndarray]: """Process left and right eyes independently""" print(f"Processing chunk {chunk_idx} with independent eye processing") # Split frames left_eye_frames = [] right_eye_frames = [] for frame in frames: left, right = self.split_sbs_frame(frame) left_eye_frames.append(left) right_eye_frames.append(right) # Process each eye print("Processing left eye...") left_matted = self._process_eye_sequence(left_eye_frames, "left", chunk_idx) print("Processing right eye...") right_matted = self._process_eye_sequence(right_eye_frames, "right", chunk_idx) # Combine results combined_frames = [] for left_frame, right_frame in zip(left_matted, right_matted): if self.config.output.maintain_sbs: combined = self.combine_sbs_frame(left_frame, right_frame) else: combined = {'left': left_frame, 'right': right_frame} combined_frames.append(combined) return combined_frames def save_video(self, frames: List[np.ndarray], output_path: str): """ Override parent method to handle VR180-specific output formats Args: frames: List of processed frames output_path: Output path """ if not frames: raise ValueError("No frames to save") output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) # Check if frames are in separate eye format if isinstance(frames[0], dict) and 'left' in frames[0]: # Save separate eye videos self._save_separate_eye_videos(frames, output_path) else: # Save as combined SBS video super().save_video(frames, str(output_path)) def _save_separate_eye_videos(self, frames: List[Dict[str, np.ndarray]], output_path: Path): """Save left and right eye videos separately""" left_frames = [frame['left'] for frame in frames] right_frames = [frame['right'] for frame in frames] # Save left eye left_path = output_path.parent / f"{output_path.stem}_left{output_path.suffix}" super().save_video(left_frames, str(left_path)) # Save right eye right_path = output_path.parent / f"{output_path.stem}_right{output_path.suffix}" super().save_video(right_frames, str(right_path)) print(f"Saved separate eye videos: {left_path}, {right_path}") def process_video(self) -> None: """ Override parent method to add VR180-specific initialization """ print("Starting VR180 video processing...") # Load video info and analyze SBS layout self.load_video_info(self.config.input.video_path) self.analyze_sbs_layout() # Continue with parent processing super().process_video()