""" YOLO detector module for human detection in video segments. Preserves the core detection logic from the original implementation. """ import os import cv2 import numpy as np import logging from typing import List, Dict, Any, Optional, Tuple from ultralytics import YOLO logger = logging.getLogger(__name__) class YOLODetector: """Handles YOLO-based human detection for video segments with support for both detection and segmentation modes.""" def __init__(self, detection_model_path: str = None, segmentation_model_path: str = None, mode: str = "detection", confidence_threshold: float = 0.6, human_class_id: int = 0): """ Initialize YOLO detector with support for both detection and segmentation modes. Args: detection_model_path: Path to YOLO detection model weights (e.g., yolov8n.pt) segmentation_model_path: Path to YOLO segmentation model weights (e.g., yolov8n-seg.pt) mode: Detection mode - "detection" for bboxes, "segmentation" for masks confidence_threshold: Detection confidence threshold human_class_id: COCO class ID for humans (0 = person) """ self.mode = mode self.confidence_threshold = confidence_threshold self.human_class_id = human_class_id # Select model path based on mode if mode == "segmentation": if not segmentation_model_path: raise ValueError("segmentation_model_path required for segmentation mode") self.model_path = segmentation_model_path self.supports_segmentation = True elif mode == "detection": if not detection_model_path: raise ValueError("detection_model_path required for detection mode") self.model_path = detection_model_path self.supports_segmentation = False else: raise ValueError(f"Invalid mode: {mode}. Must be 'detection' or 'segmentation'") # Load YOLO model try: self.model = YOLO(self.model_path) logger.info(f"Loaded YOLO model in {mode} mode from {self.model_path}") # Verify model capabilities if mode == "segmentation": # Test if model actually supports segmentation logger.info(f"YOLO Segmentation: Model loaded, will output direct masks") else: logger.info(f"YOLO Detection: Model loaded, will output bounding boxes") except Exception as e: logger.error(f"Failed to load YOLO model: {e}") raise def detect_humans_in_frame(self, frame: np.ndarray) -> List[Dict[str, Any]]: """ Detect humans in a single frame using YOLO. Args: frame: Input frame (BGR format from OpenCV) Returns: List of human detection dictionaries with bbox, confidence, and optionally masks """ # Run YOLO detection/segmentation results = self.model(frame, conf=self.confidence_threshold, verbose=False) human_detections = [] # Process results for result in results: boxes = result.boxes masks = result.masks if hasattr(result, 'masks') and result.masks is not None else None if boxes is not None: for i, box in enumerate(boxes): # Get class ID cls = int(box.cls.cpu().numpy()[0]) # Check if it's a person (human_class_id) if cls == self.human_class_id: # Get bounding box coordinates (x1, y1, x2, y2) coords = box.xyxy[0].cpu().numpy() conf = float(box.conf.cpu().numpy()[0]) detection = { 'bbox': coords, 'confidence': conf, 'has_mask': False, 'mask': None } # Extract mask if available (segmentation mode) if masks is not None and i < len(masks.data): mask_data = masks.data[i].cpu().numpy() # Get mask for this detection detection['has_mask'] = True detection['mask'] = mask_data logger.debug(f"YOLO Segmentation: Detected human with mask - conf={conf:.2f}, mask_shape={mask_data.shape}") else: logger.debug(f"YOLO Detection: Detected human with bbox - conf={conf:.2f}, bbox={coords}") human_detections.append(detection) if self.supports_segmentation: masks_found = sum(1 for d in human_detections if d['has_mask']) logger.info(f"YOLO Segmentation: Found {len(human_detections)} humans, {masks_found} with masks") else: logger.debug(f"YOLO Detection: Found {len(human_detections)} humans with bounding boxes") return human_detections def detect_humans_in_video_first_frame(self, video_path: str, scale: float = 1.0) -> List[Dict[str, Any]]: """ Detect humans in the first frame of a video. Args: video_path: Path to video file scale: Scale factor for frame processing Returns: List of human detection dictionaries """ if not os.path.exists(video_path): logger.error(f"Video file not found: {video_path}") return [] cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error(f"Could not open video: {video_path}") return [] ret, frame = cap.read() cap.release() if not ret: logger.error(f"Could not read first frame from: {video_path}") return [] # Scale frame if needed if scale != 1.0: frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR) return self.detect_humans_in_frame(frame) def save_detections_to_file(self, detections: List[Dict[str, Any]], output_path: str) -> bool: """ Save detection results to file. Args: detections: List of detection dictionaries output_path: Path to save detections Returns: True if saved successfully """ try: with open(output_path, 'w') as f: f.write("# YOLO Human Detections\\n") if detections: for detection in detections: bbox = detection['bbox'] conf = detection['confidence'] f.write(f"{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]},{conf}\\n") logger.info(f"Saved {len(detections)} detections to {output_path}") else: f.write("# No humans detected\\n") logger.info(f"Saved empty detection file to {output_path}") return True except Exception as e: logger.error(f"Failed to save detections to {output_path}: {e}") return False def load_detections_from_file(self, file_path: str) -> List[Dict[str, Any]]: """ Load detection results from file. Args: file_path: Path to detection file Returns: List of detection dictionaries """ detections = [] if not os.path.exists(file_path): logger.warning(f"Detection file not found: {file_path}") return detections try: with open(file_path, 'r') as f: content = f.read() # Handle files with literal \n characters if '\\n' in content: lines = content.split('\\n') else: lines = content.split('\n') for line in lines: line = line.strip() # Skip comments and empty lines if line.startswith('#') or not line: continue # Parse detection line: x1,y1,x2,y2,confidence parts = line.split(',') if len(parts) == 5: try: bbox = [float(x) for x in parts[:4]] conf = float(parts[4]) detections.append({ 'bbox': np.array(bbox), 'confidence': conf }) except ValueError: logger.warning(f"Invalid detection line: {line}") continue logger.info(f"Loaded {len(detections)} detections from {file_path}") except Exception as e: logger.error(f"Failed to load detections from {file_path}: {e}") return detections def debug_detect_with_lower_confidence(self, frame: np.ndarray, debug_confidence: float = 0.3) -> List[Dict[str, Any]]: """ Run YOLO detection with a lower confidence threshold for debugging. This helps identify if detections are being missed due to high confidence threshold. Args: frame: Input frame (BGR format from OpenCV) debug_confidence: Lower confidence threshold for debugging Returns: List of human detection dictionaries with lower confidence threshold """ logger.info(f"VR180 Debug: Running YOLO with lower confidence {debug_confidence} (vs normal {self.confidence_threshold})") # Run YOLO detection with lower confidence results = self.model(frame, conf=debug_confidence, verbose=False) debug_detections = [] # Process results for result in results: boxes = result.boxes if boxes is not None: for box in boxes: # Get class ID cls = int(box.cls.cpu().numpy()[0]) # Check if it's a person (human_class_id) if cls == self.human_class_id: # Get bounding box coordinates (x1, y1, x2, y2) coords = box.xyxy[0].cpu().numpy() conf = float(box.conf.cpu().numpy()[0]) debug_detections.append({ 'bbox': coords, 'confidence': conf }) logger.info(f"VR180 Debug: Lower confidence detection found {len(debug_detections)} total detections") return debug_detections def detect_humans_multi_frame(self, video_path: str, frame_indices: List[int], scale: float = 1.0) -> Dict[int, List[Dict[str, Any]]]: """ Detect humans at multiple specific frame indices in a video. Used for mid-segment re-detection to improve SAM2 tracking. Args: video_path: Path to video file frame_indices: List of frame indices to run detection on (e.g., [0, 30, 60, 90]) scale: Scale factor for frame processing Returns: Dictionary mapping frame_index -> list of detection dictionaries """ if not frame_indices: logger.warning("No frame indices provided for multi-frame detection") return {} if not os.path.exists(video_path): logger.error(f"Video file not found: {video_path}") return {} logger.info(f"Mid-segment Detection: Running YOLO on {len(frame_indices)} frames: {frame_indices}") cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error(f"Could not open video: {video_path}") return {} total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 # Filter out frame indices that are beyond video length valid_frame_indices = [idx for idx in frame_indices if 0 <= idx < total_frames] if len(valid_frame_indices) != len(frame_indices): invalid_frames = [idx for idx in frame_indices if idx not in valid_frame_indices] logger.warning(f"Mid-segment Detection: Skipping invalid frame indices: {invalid_frames} (video has {total_frames} frames)") multi_frame_detections = {} for frame_idx in valid_frame_indices: # Seek to specific frame cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() if not ret: logger.warning(f"Mid-segment Detection: Could not read frame {frame_idx}") continue # Scale frame if needed if scale != 1.0: frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR) # Run YOLO detection on this frame detections = self.detect_humans_in_frame(frame) multi_frame_detections[frame_idx] = detections # Log detection results time_seconds = frame_idx / fps logger.info(f"Mid-segment Detection: Frame {frame_idx} (t={time_seconds:.1f}s): {len(detections)} humans detected") for i, detection in enumerate(detections): bbox = detection['bbox'] conf = detection['confidence'] logger.debug(f"Mid-segment Detection: Frame {frame_idx}, Human {i+1}: bbox={bbox}, conf={conf:.3f}") cap.release() total_detections = sum(len(dets) for dets in multi_frame_detections.values()) logger.info(f"Mid-segment Detection: Complete - {total_detections} total detections across {len(valid_frame_indices)} frames") return multi_frame_detections def process_segments_batch(self, segments_info: List[dict], detect_segments: List[int], scale: float = 0.5) -> Dict[int, List[Dict[str, Any]]]: """ Process multiple segments for human detection. Args: segments_info: List of segment information dictionaries detect_segments: List of segment indices to process scale: Scale factor for processing Returns: Dictionary mapping segment index to detection results """ results = {} for segment_info in segments_info: segment_idx = segment_info['index'] # Skip if not in detect_segments list if detect_segments != 'all' and segment_idx not in detect_segments: continue video_path = segment_info['video_file'] detection_file = os.path.join(segment_info['directory'], "yolo_detections") # Skip if already processed if os.path.exists(detection_file): logger.info(f"Segment {segment_idx} already has detections, skipping") detections = self.load_detections_from_file(detection_file) results[segment_idx] = detections continue # Run detection logger.info(f"Processing segment {segment_idx} for human detection") detections = self.detect_humans_in_video_first_frame(video_path, scale) # Save results self.save_detections_to_file(detections, detection_file) results[segment_idx] = detections return results def convert_detections_to_sam2_prompts(self, detections: List[Dict[str, Any]], frame_width: int) -> List[Dict[str, Any]]: """ Convert YOLO detections to SAM2-compatible prompts for VR180 SBS video. For VR180, we expect 2 real detections (left and right eye views), not mirrored ones. Args: detections: List of YOLO detection results frame_width: Width of the video frame Returns: List of SAM2 prompt dictionaries with obj_id and bbox """ if not detections: logger.warning("No detections provided for SAM2 prompt conversion") return [] half_frame_width = frame_width // 2 prompts = [] logger.info(f"VR180 SBS Debug: Converting {len(detections)} detections for frame width {frame_width}") logger.info(f"VR180 SBS Debug: Half frame width = {half_frame_width}") # Sort detections by x-coordinate to get consistent left/right assignment sorted_detections = sorted(detections, key=lambda x: x['bbox'][0]) # Analyze detections by frame half left_detections = [] right_detections = [] for i, detection in enumerate(sorted_detections): bbox = detection['bbox'].copy() center_x = (bbox[0] + bbox[2]) / 2 pixel_range = f"{bbox[0]:.0f}-{bbox[2]:.0f}" if center_x < half_frame_width: left_detections.append((detection, i, pixel_range)) side = "LEFT" else: right_detections.append((detection, i, pixel_range)) side = "RIGHT" logger.info(f"VR180 SBS Debug: Detection {i}: pixels {pixel_range}, center_x={center_x:.1f}, side={side}") # VR180 SBS Format Validation logger.info(f"VR180 SBS Debug: Found {len(left_detections)} LEFT detections, {len(right_detections)} RIGHT detections") # Analyze confidence scores if left_detections: left_confidences = [det[0]['confidence'] for det in left_detections] logger.info(f"VR180 SBS Debug: LEFT eye confidences: {[f'{c:.3f}' for c in left_confidences]}") if right_detections: right_confidences = [det[0]['confidence'] for det in right_detections] logger.info(f"VR180 SBS Debug: RIGHT eye confidences: {[f'{c:.3f}' for c in right_confidences]}") if len(right_detections) == 0: logger.warning(f"VR180 SBS Warning: No detections found in RIGHT eye view (pixels {half_frame_width}-{frame_width})") logger.warning(f"VR180 SBS Warning: This may indicate:") logger.warning(f" 1. Person not visible in right eye view") logger.warning(f" 2. YOLO confidence threshold ({self.confidence_threshold}) too high") logger.warning(f" 3. VR180 SBS format issue") logger.warning(f" 4. Right eye view quality/lighting problems") logger.warning(f"VR180 SBS Suggestion: Try lowering yolo_confidence to 0.3-0.4 in config") if len(left_detections) == 0: logger.warning(f"VR180 SBS Warning: No detections found in LEFT eye view (pixels 0-{half_frame_width})") # Additional validation for VR180 SBS expectations total_detections = len(left_detections) + len(right_detections) if total_detections == 1: logger.warning(f"VR180 SBS Warning: Only 1 detection found - expected 2 for proper VR180 SBS") elif total_detections > 2: logger.warning(f"VR180 SBS Warning: {total_detections} detections found - will use only first 2") # Assign object IDs sequentially, regardless of which half they're in # This ensures we always get Object 1 and Object 2 for up to 2 detections obj_id = 1 # Process up to 2 detections total (left + right combined) all_detections = sorted_detections[:2] for i, detection in enumerate(all_detections): bbox = detection['bbox'].copy() center_x = (bbox[0] + bbox[2]) / 2 pixel_range = f"{bbox[0]:.0f}-{bbox[2]:.0f}" # Determine which eye view this detection is in if center_x < half_frame_width: eye_view = "LEFT" else: eye_view = "RIGHT" prompts.append({ 'obj_id': obj_id, 'bbox': bbox, 'confidence': detection['confidence'] }) logger.info(f"VR180 SBS Debug: Added {eye_view} eye detection as SAM2 Object {obj_id}") logger.info(f"VR180 SBS Debug: Object {obj_id} bbox: {bbox} (pixels {pixel_range})") obj_id += 1 logger.info(f"VR180 SBS Debug: Final result - {len(detections)} YOLO detections → {len(prompts)} SAM2 prompts") # Verify we have the expected objects obj_ids = [p['obj_id'] for p in prompts] logger.info(f"VR180 SBS Debug: SAM2 Object IDs created: {obj_ids}") return prompts def convert_yolo_masks_to_video_segments(self, detections: List[Dict[str, Any]], frame_width: int, target_frame_shape: Tuple[int, int] = None) -> Optional[Dict[int, Dict[int, np.ndarray]]]: """ Convert YOLO segmentation masks to SAM2-compatible video segments format. This allows using YOLO masks directly without SAM2 processing. Args: detections: List of YOLO detection results with masks frame_width: Width of the video frame for VR180 object ID assignment target_frame_shape: Target shape (height, width) for mask resizing Returns: Video segments dictionary compatible with SAM2 output format, or None if no masks """ if not detections: logger.warning("No detections provided for mask conversion") return None # Check if any detections have masks detections_with_masks = [d for d in detections if d.get('has_mask', False)] if not detections_with_masks: logger.warning("No detections have masks - YOLO segmentation may not be working") return None logger.info(f"YOLO Mask Conversion: Converting {len(detections_with_masks)} YOLO masks to video segments format") half_frame_width = frame_width // 2 video_segments = {} # Create frame 0 with converted masks frame_masks = {} obj_id = 1 # Sort detections by x-coordinate for consistent VR180 SBS assignment sorted_detections = sorted(detections_with_masks, key=lambda x: x['bbox'][0]) for i, detection in enumerate(sorted_detections[:2]): # Take up to 2 humans mask = detection['mask'] bbox = detection['bbox'] center_x = (bbox[0] + bbox[2]) / 2 # Assign sequential object IDs (similar to prompt conversion logic) current_obj_id = obj_id # Determine which eye view for logging if center_x < half_frame_width: eye_view = "LEFT" else: eye_view = "RIGHT" # Resize mask to target frame shape if specified if target_frame_shape and mask.shape != target_frame_shape: mask_resized = cv2.resize(mask.astype(np.float32), (target_frame_shape[1], target_frame_shape[0]), interpolation=cv2.INTER_NEAREST) mask = (mask_resized > 0.5).astype(bool) else: mask = mask.astype(bool) frame_masks[current_obj_id] = mask logger.info(f"YOLO Mask Conversion: {eye_view} eye detection -> Object {current_obj_id}, mask_shape={mask.shape}, pixels={np.sum(mask)}") obj_id += 1 # Always increment for next detection # Store masks in video segments format (single frame) video_segments[0] = frame_masks total_objects = len(frame_masks) total_pixels = sum(np.sum(mask) for mask in frame_masks.values()) logger.info(f"YOLO Mask Conversion: Created video segments with {total_objects} objects, {total_pixels} total mask pixels") return video_segments def save_debug_frame_with_detections(self, frame: np.ndarray, detections: List[Dict[str, Any]], output_path: str, prompts: List[Dict[str, Any]] = None) -> bool: """ Save a debug frame with YOLO detections and SAM2 prompts overlaid as bounding boxes. Args: frame: Input frame (BGR format from OpenCV) detections: List of detection dictionaries with bbox and confidence output_path: Path to save the debug image prompts: Optional list of SAM2 prompt dictionaries with obj_id and bbox Returns: True if saved successfully """ try: debug_frame = frame.copy() # Draw masks (if available) or bounding boxes for each detection for i, detection in enumerate(detections): bbox = detection['bbox'] confidence = detection['confidence'] has_mask = detection.get('has_mask', False) # Extract coordinates x1, y1, x2, y2 = map(int, bbox) # Choose color based on confidence (green for high, yellow for medium, red for low) if confidence >= 0.8: color = (0, 255, 0) # Green elif confidence >= 0.6: color = (0, 255, 255) # Yellow else: color = (0, 0, 255) # Red if has_mask and 'mask' in detection: # Draw segmentation mask mask = detection['mask'] # Resize mask to match frame if needed if mask.shape != debug_frame.shape[:2]: mask = cv2.resize(mask.astype(np.float32), (debug_frame.shape[1], debug_frame.shape[0]), interpolation=cv2.INTER_NEAREST) mask = mask > 0.5 mask = mask.astype(bool) # Apply colored overlay with transparency overlay = debug_frame.copy() overlay[mask] = color cv2.addWeighted(overlay, 0.3, debug_frame, 0.7, 0, debug_frame) # Draw mask outline contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) cv2.drawContours(debug_frame, contours, -1, color, 2) # Prepare label text for segmentation label = f"Person {i+1}: {confidence:.2f} (MASK)" else: # Draw bounding box (detection mode or no mask available) cv2.rectangle(debug_frame, (x1, y1), (x2, y2), color, 2) # Prepare label text for detection label = f"Person {i+1}: {confidence:.2f} (BBOX)" label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0] # Draw label background cv2.rectangle(debug_frame, (x1, y1 - label_size[1] - 10), (x1 + label_size[0], y1), color, -1) # Draw label text cv2.putText(debug_frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Draw SAM2 prompts if provided (with different colors/style) if prompts: for prompt in prompts: obj_id = prompt['obj_id'] bbox = prompt['bbox'] # Extract coordinates x1, y1, x2, y2 = map(int, bbox) # Use different colors for each object ID if obj_id == 1: prompt_color = (0, 255, 0) # Green for Object 1 elif obj_id == 2: prompt_color = (255, 0, 0) # Blue for Object 2 else: prompt_color = (255, 255, 0) # Cyan for others # Draw thicker, dashed-style border for SAM2 prompts thickness = 3 cv2.rectangle(debug_frame, (x1-2, y1-2), (x2+2, y2+2), prompt_color, thickness) # Add SAM2 object ID label sam_label = f"SAM2 Obj {obj_id}" label_size = cv2.getTextSize(sam_label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0] # Draw label background cv2.rectangle(debug_frame, (x1-2, y2+5), (x1-2 + label_size[0], y2+5 + label_size[1] + 5), prompt_color, -1) # Draw label text cv2.putText(debug_frame, sam_label, (x1-2, y2+5 + label_size[1]), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) # Draw VR180 SBS boundary line (center line separating left and right eye views) frame_height, frame_width = debug_frame.shape[:2] center_x = frame_width // 2 cv2.line(debug_frame, (center_x, 0), (center_x, frame_height), (0, 255, 255), 3) # Yellow line # Add VR180 SBS labels cv2.putText(debug_frame, "LEFT EYE", (10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) cv2.putText(debug_frame, "RIGHT EYE", (center_x + 10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) # Add summary text at top with mode information mode_text = f"YOLO Mode: {self.mode.upper()}" masks_available = sum(1 for d in detections if d.get('has_mask', False)) if self.supports_segmentation and masks_available > 0: summary = f"VR180 SBS: {len(detections)} detections → {masks_available} MASKS (for SAM2 propagation)" else: summary = f"VR180 SBS: {len(detections)} detections → {len(prompts) if prompts else 0} SAM2 prompts" cv2.putText(debug_frame, mode_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) # Yellow for mode cv2.putText(debug_frame, summary, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2) # Add frame dimensions info dims_info = f"Frame: {frame_width}x{frame_height}, Center: {center_x}" cv2.putText(debug_frame, dims_info, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Save debug frame success = cv2.imwrite(output_path, debug_frame) if success: logger.info(f"Saved YOLO debug frame to {output_path}") else: logger.error(f"Failed to save debug frame to {output_path}") return success except Exception as e: logger.error(f"Error creating debug frame: {e}") return False