#!/usr/bin/env python3 """ Main entry point for YOLO + SAM2 video processing pipeline. Processes long videos by splitting into segments, detecting humans with YOLO, and creating green screen masks with SAM2. """ import os import sys import argparse import cv2 import numpy as np from typing import List # Add project root to path sys.path.append(os.path.dirname(__file__)) from core.config_loader import ConfigLoader from core.video_splitter import VideoSplitter from core.yolo_detector import YOLODetector from core.sam2_processor import SAM2Processor from core.mask_processor import MaskProcessor from core.video_assembler import VideoAssembler from utils.logging_utils import setup_logging, get_logger from utils.file_utils import ensure_directory from utils.status_utils import print_processing_status, cleanup_incomplete_segment logger = get_logger(__name__) def parse_arguments(): """Parse command line arguments.""" parser = argparse.ArgumentParser( description="YOLO + SAM2 Video Processing Pipeline" ) parser.add_argument( "--config", type=str, required=True, help="Path to YAML configuration file" ) parser.add_argument( "--log-file", type=str, help="Optional log file path" ) parser.add_argument( "--status", action="store_true", help="Show processing status and exit" ) parser.add_argument( "--cleanup-segment", type=int, help="Clean up a specific segment for restart (segment index)" ) return parser.parse_args() def validate_dependencies(): """Validate that required dependencies are available.""" try: import torch import cv2 import numpy as np import cupy as cp from ultralytics import YOLO from sam2.build_sam import build_sam2_video_predictor logger.info("All dependencies validated successfully") return True except ImportError as e: logger.error(f"Missing dependency: {e}") logger.error("Please install requirements: pip install -r requirements.txt") return False def create_yolo_mask_debug_frame(detections: List[dict], video_path: str, output_path: str, scale: float = 1.0) -> bool: """ Create debug visualization for YOLO direct masks. Args: detections: List of YOLO detections with masks video_path: Path to video file output_path: Path to save debug image scale: Scale factor for frame processing Returns: True if debug frame was created successfully """ try: # Load first frame cap = cv2.VideoCapture(video_path) ret, original_frame = cap.read() cap.release() if not ret: logger.error("Could not read first frame for YOLO mask debug") return False # Scale frame if needed if scale != 1.0: original_frame = cv2.resize(original_frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR) debug_frame = original_frame.copy() # Define colors for each object colors = { 1: (0, 255, 0), # Green for Object 1 (Left eye) 2: (255, 0, 0), # Blue for Object 2 (Right eye) } # Get detections with masks detections_with_masks = [d for d in detections if d.get('has_mask', False)] # Overlay masks with transparency obj_id = 1 for detection in detections_with_masks[:2]: # Up to 2 objects mask = detection['mask'] # Resize mask to match frame if needed if mask.shape != original_frame.shape[:2]: mask = cv2.resize(mask.astype(np.float32), (original_frame.shape[1], original_frame.shape[0]), interpolation=cv2.INTER_NEAREST) mask = mask > 0.5 mask = mask.astype(bool) # Apply colored overlay color = colors.get(obj_id, (128, 128, 128)) overlay = debug_frame.copy() overlay[mask] = color # Blend with original (30% overlay, 70% original) cv2.addWeighted(overlay, 0.3, debug_frame, 0.7, 0, debug_frame) # Draw outline contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) cv2.drawContours(debug_frame, contours, -1, color, 2) logger.info(f"YOLO Mask Debug: Object {obj_id} mask - shape: {mask.shape}, pixels: {np.sum(mask)}") obj_id += 1 # Add title and source info title = f"YOLO Direct Masks: {len(detections_with_masks)} objects detected" cv2.putText(debug_frame, title, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2) source_info = "Mask Source: YOLO Segmentation (DIRECT - No SAM2)" cv2.putText(debug_frame, source_info, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) # Green for YOLO # Add object legend y_offset = 90 for i, detection in enumerate(detections_with_masks[:2]): obj_id = i + 1 color = colors.get(obj_id, (128, 128, 128)) text = f"Object {obj_id}: {'Left Eye' if obj_id == 1 else 'Right Eye'} (YOLO Mask)" cv2.putText(debug_frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) y_offset += 30 # Save debug image success = cv2.imwrite(output_path, debug_frame) if success: logger.info(f"YOLO Mask Debug: Saved debug frame to {output_path}") else: logger.error(f"Failed to save YOLO mask debug frame to {output_path}") return success except Exception as e: logger.error(f"Error creating YOLO mask debug frame: {e}") return False def resolve_detect_segments(detect_segments, total_segments: int) -> List[int]: """ Resolve detect_segments configuration to list of segment indices. Args: detect_segments: Configuration value ("all", list, or None) total_segments: Total number of segments Returns: List of segment indices to process """ if detect_segments == "all" or detect_segments is None: return list(range(total_segments)) elif isinstance(detect_segments, list): # Filter out invalid segment indices valid_segments = [s for s in detect_segments if 0 <= s < total_segments] if len(valid_segments) != len(detect_segments): logger.warning(f"Some segment indices are invalid. Using: {valid_segments}") return valid_segments else: logger.warning(f"Invalid detect_segments format: {detect_segments}. Using all segments.") return list(range(total_segments)) def main(): """Main processing pipeline.""" args = parse_arguments() try: # Load configuration config = ConfigLoader(args.config) # Setup logging setup_logging(config.get_log_level(), args.log_file) # Handle status check if args.status: output_dir = config.get_output_directory() input_video = config.get_input_video_path() video_name = os.path.splitext(os.path.basename(input_video))[0] segments_dir = os.path.join(output_dir, f"{video_name}_segments") print_processing_status(segments_dir) return 0 # Handle segment cleanup if args.cleanup_segment is not None: output_dir = config.get_output_directory() input_video = config.get_input_video_path() video_name = os.path.splitext(os.path.basename(input_video))[0] segments_dir = os.path.join(output_dir, f"{video_name}_segments") segment_dir = os.path.join(segments_dir, f"segment_{args.cleanup_segment}") if cleanup_incomplete_segment(segment_dir): logger.info(f"Successfully cleaned up segment {args.cleanup_segment}") return 0 else: logger.error(f"Failed to clean up segment {args.cleanup_segment}") return 1 logger.info("Starting YOLO + SAM2 video processing pipeline") # Validate dependencies if not validate_dependencies(): return 1 # Validate input video exists input_video = config.get_input_video_path() if not os.path.exists(input_video): logger.error(f"Input video not found: {input_video}") return 1 # Setup output directory output_dir = config.get_output_directory() ensure_directory(output_dir) # Step 1: Split video into segments logger.info("Step 1: Splitting video into segments") splitter = VideoSplitter( segment_duration=config.get_segment_duration(), force_keyframes=config.get('video.force_keyframes', True) ) segments_dir, segment_dirs = splitter.split_video(input_video, output_dir) logger.info(f"Created {len(segment_dirs)} segments in {segments_dir}") # Get detailed segment information segments_info = splitter.get_segment_info(segments_dir) # Resolve which segments to process with YOLO detect_segments_config = config.get_detect_segments() detect_segments = resolve_detect_segments(detect_segments_config, len(segments_info)) # Initialize processors once logger.info("Step 2: Initializing YOLO detector") # Get YOLO mode and model paths yolo_mode = config.get('models.yolo_mode', 'detection') detection_model = config.get('models.yolo_detection_model', config.get_yolo_model_path()) segmentation_model = config.get('models.yolo_segmentation_model', None) logger.info(f"YOLO Mode: {yolo_mode}") detector = YOLODetector( detection_model_path=detection_model, segmentation_model_path=segmentation_model, mode=yolo_mode, confidence_threshold=config.get_yolo_confidence(), human_class_id=config.get_human_class_id() ) logger.info("Step 3: Initializing SAM2 processor") sam2_processor = SAM2Processor( checkpoint_path=config.get_sam2_checkpoint(), config_path=config.get_sam2_config(), vos_optimized=config.get('models.sam2_vos_optimized', False) ) # Initialize mask processor with quality enhancements mask_quality_config = config.get('mask_processing', {}) mask_processor = MaskProcessor( green_color=config.get_green_color(), blue_color=config.get_blue_color(), mask_quality_config=mask_quality_config ) # Process each segment sequentially (YOLO -> SAM2 -> Render) logger.info("Step 4: Processing segments sequentially") total_humans_detected = 0 for i, segment_info in enumerate(segments_info): segment_idx = segment_info['index'] logger.info(f"Processing segment {segment_idx}/{len(segments_info)-1}") # Reset temporal history for new segment mask_processor.reset_temporal_history() # Skip if segment output already exists output_video = os.path.join(segment_info['directory'], f"output_{segment_idx}.mp4") if os.path.exists(output_video): logger.info(f"Segment {segment_idx} already processed, skipping") continue # Determine if we should use YOLO detections or previous masks use_detections = segment_idx in detect_segments # First segment must use detections if segment_idx == 0 and not use_detections: logger.warning(f"First segment must use YOLO detection") use_detections = True # Get YOLO prompts or previous masks yolo_prompts = None previous_masks = None if use_detections: # Run YOLO detection on current segment logger.info(f"Running YOLO detection on segment {segment_idx}") detection_file = os.path.join(segment_info['directory'], "yolo_detections") # Check if detection already exists if os.path.exists(detection_file): logger.info(f"Loading existing YOLO detections for segment {segment_idx}") detections = detector.load_detections_from_file(detection_file) else: # Run YOLO detection on first frame detections = detector.detect_humans_in_video_first_frame( segment_info['video_file'], scale=config.get_inference_scale() ) # Save detections for future runs detector.save_detections_to_file(detections, detection_file) if detections: total_humans_detected += len(detections) logger.info(f"Found {len(detections)} humans in segment {segment_idx}") # Get frame width from video cap = cv2.VideoCapture(segment_info['video_file']) frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) cap.release() yolo_prompts = detector.convert_detections_to_sam2_prompts( detections, frame_width ) # If no right eye detections found, run debug analysis with lower confidence half_frame_width = frame_width // 2 right_eye_detections = [d for d in detections if (d['bbox'][0] + d['bbox'][2]) / 2 >= half_frame_width] if len(right_eye_detections) == 0 and config.get('advanced.save_yolo_debug_frames', False): logger.info(f"VR180 Debug: No right eye detections found, running lower confidence analysis...") # Load first frame for debug analysis cap = cv2.VideoCapture(segment_info['video_file']) ret, debug_frame = cap.read() cap.release() if ret: # Scale frame to match detection scale if config.get_inference_scale() != 1.0: scale = config.get_inference_scale() debug_frame = cv2.resize(debug_frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR) # Run debug detection with lower confidence debug_detections = detector.debug_detect_with_lower_confidence(debug_frame, debug_confidence=0.3) # Analyze where these lower confidence detections are debug_right_eye = [d for d in debug_detections if (d['bbox'][0] + d['bbox'][2]) / 2 >= half_frame_width] if len(debug_right_eye) > 0: logger.warning(f"VR180 Debug: Found {len(debug_right_eye)} right eye detections with lower confidence!") for i, det in enumerate(debug_right_eye): logger.warning(f"VR180 Debug: Right eye detection {i+1}: conf={det['confidence']:.3f}, bbox={det['bbox']}") logger.warning(f"VR180 Debug: Consider lowering yolo_confidence from {config.get_yolo_confidence()} to 0.3-0.4") else: logger.info(f"VR180 Debug: No right eye detections found even with confidence 0.3") logger.info(f"VR180 Debug: This confirms person is not visible in right eye view") logger.info(f"Pipeline Debug: Segment {segment_idx} - Generated {len(yolo_prompts)} SAM2 prompts from {len(detections)} YOLO detections") # Save debug frame with detections visualized (if enabled) if config.get('advanced.save_yolo_debug_frames', False): debug_frame_path = os.path.join(segment_info['directory'], "yolo_debug.jpg") # Load first frame for debug visualization cap = cv2.VideoCapture(segment_info['video_file']) ret, debug_frame = cap.read() cap.release() if ret: # Scale frame to match detection scale if config.get_inference_scale() != 1.0: scale = config.get_inference_scale() debug_frame = cv2.resize(debug_frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR) detector.save_debug_frame_with_detections(debug_frame, detections, debug_frame_path, yolo_prompts) else: logger.warning(f"Could not load frame for debug visualization in segment {segment_idx}") # Check if we have YOLO masks for debug visualization has_yolo_masks = False if detections and detector.supports_segmentation: has_yolo_masks = any(d.get('has_mask', False) for d in detections) # Generate first frame masks debug (SAM2 or YOLO) first_frame_debug_path = os.path.join(segment_info['directory'], "first_frame_detection.jpg") if has_yolo_masks: logger.info(f"Pipeline Debug: Generating YOLO first frame masks for segment {segment_idx}") # Create YOLO mask debug visualization create_yolo_mask_debug_frame(detections, segment_info['video_file'], first_frame_debug_path, config.get_inference_scale()) else: logger.info(f"Pipeline Debug: Generating SAM2 first frame masks for segment {segment_idx}") sam2_processor.generate_first_frame_debug_masks( segment_info['video_file'], yolo_prompts, first_frame_debug_path, config.get_inference_scale() ) else: logger.warning(f"No humans detected in segment {segment_idx}") # Save debug frame even when no detections (if enabled) if config.get('advanced.save_yolo_debug_frames', False): debug_frame_path = os.path.join(segment_info['directory'], "yolo_debug_no_detections.jpg") # Load first frame for debug visualization cap = cv2.VideoCapture(segment_info['video_file']) ret, debug_frame = cap.read() cap.release() if ret: # Scale frame to match detection scale if config.get_inference_scale() != 1.0: scale = config.get_inference_scale() debug_frame = cv2.resize(debug_frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR) # Add "No detections" text overlay cv2.putText(debug_frame, "YOLO: No humans detected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2) # Red text cv2.imwrite(debug_frame_path, debug_frame) logger.info(f"Saved no-detection debug frame to {debug_frame_path}") else: logger.warning(f"Could not load frame for no-detection debug visualization in segment {segment_idx}") elif segment_idx > 0: # Try to load previous segment mask for j in range(segment_idx - 1, -1, -1): prev_segment_dir = segments_info[j]['directory'] previous_masks = sam2_processor.load_previous_segment_mask(prev_segment_dir) if previous_masks: logger.info(f"Using masks from segment {j} for segment {segment_idx}") break if not yolo_prompts and not previous_masks: logger.error(f"No prompts or previous masks available for segment {segment_idx}") continue # Check if we have YOLO masks and can skip SAM2 (recheck in case detections were loaded from file) if not 'has_yolo_masks' in locals(): has_yolo_masks = False if detections and detector.supports_segmentation: has_yolo_masks = any(d.get('has_mask', False) for d in detections) if has_yolo_masks: logger.info(f"Pipeline Debug: YOLO segmentation provided masks - using as SAM2 initial masks for segment {segment_idx}") # Convert YOLO masks to initial masks for SAM2 cap = cv2.VideoCapture(segment_info['video_file']) frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() # Convert YOLO masks to the format expected by SAM2 add_previous_masks_to_predictor yolo_masks_dict = {} for i, detection in enumerate(detections[:2]): # Up to 2 objects if detection.get('has_mask', False): mask = detection['mask'] # Resize mask to match inference scale if config.get_inference_scale() != 1.0: scale = config.get_inference_scale() scaled_height = int(frame_height * scale) scaled_width = int(frame_width * scale) mask = cv2.resize(mask.astype(np.float32), (scaled_width, scaled_height), interpolation=cv2.INTER_NEAREST) mask = mask > 0.5 obj_id = i + 1 # Sequential object IDs yolo_masks_dict[obj_id] = mask.astype(bool) logger.info(f"Pipeline Debug: YOLO mask for Object {obj_id} - shape: {mask.shape}, pixels: {np.sum(mask)}") logger.info(f"Pipeline Debug: Using YOLO masks as SAM2 initial masks - {len(yolo_masks_dict)} objects") # Use traditional SAM2 pipeline with YOLO masks as initial masks previous_masks = yolo_masks_dict yolo_prompts = None # Don't use bounding box prompts when we have masks # Debug what we're passing to SAM2 if yolo_prompts: logger.info(f"Pipeline Debug: Passing {len(yolo_prompts)} YOLO prompts to SAM2 for segment {segment_idx}") for i, prompt in enumerate(yolo_prompts): logger.info(f"Pipeline Debug: Prompt {i+1}: Object {prompt['obj_id']}, bbox={prompt['bbox']}") if previous_masks: logger.info(f"Pipeline Debug: Using {len(previous_masks)} previous masks for segment {segment_idx}") logger.info(f"Pipeline Debug: Previous mask object IDs: {list(previous_masks.keys())}") # Handle mid-segment detection if enabled (works for both detection and segmentation modes) multi_frame_prompts = None if config.get('advanced.enable_mid_segment_detection', False) and (yolo_prompts or has_yolo_masks): logger.info(f"Mid-segment Detection: Enabled for segment {segment_idx}") # Calculate frame indices for re-detection cap = cv2.VideoCapture(segment_info['video_file']) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 cap.release() redetection_interval = config.get('advanced.redetection_interval', 30) max_redetections = config.get('advanced.max_redetections_per_segment', 10) # Generate frame indices: [30, 60, 90, ...] (skip frame 0 since we already have first frame prompts) frame_indices = [] frame_idx = redetection_interval while frame_idx < total_frames and len(frame_indices) < max_redetections: frame_indices.append(frame_idx) frame_idx += redetection_interval if frame_indices: logger.info(f"Mid-segment Detection: Running YOLO on frames {frame_indices} (interval={redetection_interval})") # Run multi-frame detection multi_frame_detections = detector.detect_humans_multi_frame( segment_info['video_file'], frame_indices, scale=config.get_inference_scale() ) # Convert detections to SAM2 prompts (different handling for segmentation vs detection mode) multi_frame_prompts = {} cap = cv2.VideoCapture(segment_info['video_file']) frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() for frame_idx, detections in multi_frame_detections.items(): if detections: if has_yolo_masks: # Segmentation mode: convert YOLO masks to SAM2 mask prompts frame_masks = {} for i, detection in enumerate(detections[:2]): # Up to 2 objects if detection.get('has_mask', False): mask = detection['mask'] # Resize mask to match inference scale if config.get_inference_scale() != 1.0: scale = config.get_inference_scale() scaled_height = int(frame_height * scale) scaled_width = int(frame_width * scale) mask = cv2.resize(mask.astype(np.float32), (scaled_width, scaled_height), interpolation=cv2.INTER_NEAREST) mask = mask > 0.5 obj_id = i + 1 # Sequential object IDs frame_masks[obj_id] = mask.astype(bool) logger.debug(f"Mid-segment Detection: Frame {frame_idx}, Object {obj_id} mask - shape: {mask.shape}, pixels: {np.sum(mask)}") if frame_masks: # Store as mask prompts (different format than bbox prompts) multi_frame_prompts[frame_idx] = {'masks': frame_masks} logger.info(f"Mid-segment Detection: Frame {frame_idx} -> {len(frame_masks)} YOLO masks") else: # Detection mode: convert to bounding box prompts (existing logic) prompts = detector.convert_detections_to_sam2_prompts(detections, frame_width) multi_frame_prompts[frame_idx] = prompts logger.info(f"Mid-segment Detection: Frame {frame_idx} -> {len(prompts)} SAM2 prompts") logger.info(f"Mid-segment Detection: Generated prompts for {len(multi_frame_prompts)} frames") else: logger.info(f"Mid-segment Detection: No additional frames to process (segment has {total_frames} frames)") elif config.get('advanced.enable_mid_segment_detection', False): logger.info(f"Mid-segment Detection: Skipped for segment {segment_idx} (no initial YOLO data)") # Process segment with SAM2 logger.info(f"Pipeline Debug: Starting SAM2 processing for segment {segment_idx}") video_segments = sam2_processor.process_single_segment( segment_info, yolo_prompts=yolo_prompts, previous_masks=previous_masks, inference_scale=config.get_inference_scale(), multi_frame_prompts=multi_frame_prompts ) if video_segments is None: logger.error(f"SAM2 processing failed for segment {segment_idx}") continue # Check if SAM2 produced adequate results if len(video_segments) == 0: logger.error(f"SAM2 produced no frames for segment {segment_idx}") continue elif len(video_segments) < 10: # Expected many frames for a 5-second segment logger.warning(f"SAM2 produced very few frames ({len(video_segments)}) for segment {segment_idx} - this may indicate propagation failure") # Debug what SAM2 produced logger.info(f"Pipeline Debug: SAM2 completed for segment {segment_idx}") logger.info(f"Pipeline Debug: Generated masks for {len(video_segments)} frames") if video_segments: # Check first frame to see what objects were tracked first_frame_idx = min(video_segments.keys()) first_frame_objects = video_segments[first_frame_idx] logger.info(f"Pipeline Debug: First frame contains {len(first_frame_objects)} tracked objects") logger.info(f"Pipeline Debug: Tracked object IDs: {list(first_frame_objects.keys())}") for obj_id, mask in first_frame_objects.items(): mask_pixels = np.sum(mask) logger.info(f"Pipeline Debug: Object {obj_id} mask has {mask_pixels} pixels") # Check last frame as well last_frame_idx = max(video_segments.keys()) last_frame_objects = video_segments[last_frame_idx] logger.info(f"Pipeline Debug: Last frame contains {len(last_frame_objects)} tracked objects") logger.info(f"Pipeline Debug: Final object IDs: {list(last_frame_objects.keys())}") # Save final masks for next segment mask_path = os.path.join(segment_info['directory'], "mask.png") sam2_processor.save_final_masks( video_segments, mask_path, green_color=config.get_green_color(), blue_color=config.get_blue_color() ) # Apply green screen and save output video success = mask_processor.process_segment( segment_info, video_segments, use_nvenc=config.get_use_nvenc(), bitrate=config.get_output_bitrate() ) if success: logger.info(f"Successfully processed segment {segment_idx}") else: logger.error(f"Failed to create green screen video for segment {segment_idx}") # Log processing summary logger.info(f"Sequential processing complete. Total humans detected: {total_humans_detected}") # Step 3: Assemble final video logger.info("Step 3: Assembling final video with audio") # Initialize video assembler assembler = VideoAssembler( preserve_audio=config.get_preserve_audio(), use_nvenc=config.get_use_nvenc() ) # Verify all segments are complete all_complete, missing = assembler.verify_segment_completeness(segments_dir) if not all_complete: logger.error(f"Cannot assemble video - missing segments: {missing}") return 1 # Assemble final video final_output = os.path.join(output_dir, config.get_output_filename()) success = assembler.assemble_final_video( segments_dir, input_video, final_output, bitrate=config.get_output_bitrate() ) if success: logger.info(f"Final video saved to: {final_output}") logger.info("Pipeline completed successfully") return 0 except Exception as e: logger.error(f"Pipeline failed: {e}", exc_info=True) return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)