""" YOLO detector module for human detection in video segments. Preserves the core detection logic from the original implementation. """ import os import cv2 import numpy as np import logging from typing import List, Dict, Any, Optional, Tuple from ultralytics import YOLO logger = logging.getLogger(__name__) class YOLODetector: """Handles YOLO-based human detection for video segments with support for both detection and segmentation modes.""" def __init__(self, detection_model_path: str = None, segmentation_model_path: str = None, mode: str = "detection", confidence_threshold: float = 0.6, human_class_id: int = 0): """ Initialize YOLO detector with support for both detection and segmentation modes. Args: detection_model_path: Path to YOLO detection model weights (e.g., yolov8n.pt) segmentation_model_path: Path to YOLO segmentation model weights (e.g., yolov8n-seg.pt) mode: Detection mode - "detection" for bboxes, "segmentation" for masks confidence_threshold: Detection confidence threshold human_class_id: COCO class ID for humans (0 = person) """ self.mode = mode self.confidence_threshold = confidence_threshold self.human_class_id = human_class_id # Select model path based on mode if mode == "segmentation": if not segmentation_model_path: raise ValueError("segmentation_model_path required for segmentation mode") self.model_path = segmentation_model_path self.supports_segmentation = True elif mode == "detection": if not detection_model_path: raise ValueError("detection_model_path required for detection mode") self.model_path = detection_model_path self.supports_segmentation = False else: raise ValueError(f"Invalid mode: {mode}. Must be 'detection' or 'segmentation'") # Load YOLO model try: self.model = YOLO(self.model_path) logger.info(f"Loaded YOLO model in {mode} mode from {self.model_path}") # Verify model capabilities if mode == "segmentation": # Test if model actually supports segmentation logger.info(f"YOLO Segmentation: Model loaded, will output direct masks") else: logger.info(f"YOLO Detection: Model loaded, will output bounding boxes") except Exception as e: logger.error(f"Failed to load YOLO model: {e}") raise def detect_humans_in_frame(self, frame: np.ndarray, confidence_override: Optional[float] = None, validate_with_detection: bool = False) -> List[Dict[str, Any]]: """ Detect humans in a single frame using YOLO. 

        Args:
            frame: Input frame (BGR format from OpenCV)
            confidence_override: Optional confidence to use instead of the default
            validate_with_detection: If True and in segmentation mode, validate masks against detection bboxes

        Returns:
            List of human detection dictionaries with bbox, confidence, and optionally masks
        """
        # Run YOLO detection/segmentation
        confidence = confidence_override if confidence_override is not None else self.confidence_threshold
        results = self.model(frame, conf=confidence, verbose=False)

        human_detections = []

        # Process results
        for result_idx, result in enumerate(results):
            boxes = result.boxes
            masks = result.masks if hasattr(result, 'masks') and result.masks is not None else None

            logger.debug(f"YOLO Result {result_idx}: boxes={boxes is not None}, masks={masks is not None}")
            if boxes is not None:
                logger.debug(f"  Found {len(boxes)} total boxes")
            if masks is not None:
                logger.debug(f"  Found {len(masks.data)} total masks")

            if boxes is not None:
                for i, box in enumerate(boxes):
                    # Get class ID
                    cls = int(box.cls.cpu().numpy()[0])

                    # Check if it's a person (human_class_id)
                    if cls == self.human_class_id:
                        # Get bounding box coordinates (x1, y1, x2, y2)
                        coords = box.xyxy[0].cpu().numpy()
                        conf = float(box.conf.cpu().numpy()[0])

                        detection = {
                            'bbox': coords,
                            'confidence': conf,
                            'has_mask': False,
                            'mask': None
                        }

                        # Extract mask if available (segmentation mode)
                        if masks is not None and i < len(masks.data):
                            # Resize the raw mask to match the input frame dimensions
                            raw_mask = masks.data[i].cpu().numpy()
                            resized_mask = cv2.resize(raw_mask,
                                                      (frame.shape[1], frame.shape[0]),
                                                      interpolation=cv2.INTER_NEAREST)
                            mask_area = np.sum(resized_mask > 0.5)

                            detection['has_mask'] = True
                            detection['mask'] = resized_mask

                            logger.info(f"YOLO Segmentation: Human {len(human_detections)} - conf={conf:.3f}, "
                                        f"raw_mask_shape={raw_mask.shape}, frame_shape={frame.shape}, "
                                        f"resized_mask_shape={resized_mask.shape}, mask_area={mask_area}px")
                        else:
                            logger.debug(f"YOLO Detection: Human {len(human_detections)} - conf={conf:.3f}, bbox={coords} (no mask)")

                        human_detections.append(detection)
                    else:
                        logger.debug(f"YOLO: Skipping non-human detection (class {cls})")

        if self.supports_segmentation:
            masks_found = sum(1 for d in human_detections if d['has_mask'])
            logger.info(f"YOLO Segmentation: Found {len(human_detections)} humans, {masks_found} with masks")

            # Optional validation with detection model
            if validate_with_detection and masks_found > 0:
                logger.info("Validating segmentation masks with detection model...")
                validated_detections = self._validate_masks_with_detection(frame, human_detections, confidence_override)
                return validated_detections
        else:
            logger.debug(f"YOLO Detection: Found {len(human_detections)} humans with bounding boxes")

        return human_detections

    def detect_humans_in_video_first_frame(self, video_path: str, scale: float = 1.0) -> List[Dict[str, Any]]:
        """
        Detect humans in the first frame of a video.

        Args:
            video_path: Path to video file
            scale: Scale factor for frame processing

        Returns:
            List of human detection dictionaries
        """
        if not os.path.exists(video_path):
            logger.error(f"Video file not found: {video_path}")
            return []

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"Could not open video: {video_path}")
            return []

        ret, frame = cap.read()
        cap.release()

        if not ret:
            logger.error(f"Could not read first frame from: {video_path}")
            return []

        # Scale frame if needed
        if scale != 1.0:
            frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)

        return self.detect_humans_in_frame(frame)

    def save_detections_to_file(self, detections: List[Dict[str, Any]], output_path: str) -> bool:
        """
        Save detection results to file.

        Args:
            detections: List of detection dictionaries
            output_path: Path to save detections

        Returns:
            True if saved successfully
        """
        try:
            with open(output_path, 'w') as f:
                f.write("# YOLO Human Detections\n")
                if detections:
                    for detection in detections:
                        bbox = detection['bbox']
                        conf = detection['confidence']
                        f.write(f"{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]},{conf}\n")
                    logger.info(f"Saved {len(detections)} detections to {output_path}")
                else:
                    f.write("# No humans detected\n")
                    logger.info(f"Saved empty detection file to {output_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to save detections to {output_path}: {e}")
            return False

    def load_detections_from_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Load detection results from file.

        Args:
            file_path: Path to detection file

        Returns:
            List of detection dictionaries
        """
        detections = []

        if not os.path.exists(file_path):
            logger.warning(f"Detection file not found: {file_path}")
            return detections

        try:
            with open(file_path, 'r') as f:
                content = f.read()

            # Handle legacy files that contain literal "\n" sequences instead of real newlines
            if '\\n' in content:
                lines = content.split('\\n')
            else:
                lines = content.split('\n')

            for line in lines:
                line = line.strip()

                # Skip comments and empty lines
                if line.startswith('#') or not line:
                    continue

                # Parse detection line: x1,y1,x2,y2,confidence
                parts = line.split(',')
                if len(parts) == 5:
                    try:
                        bbox = [float(x) for x in parts[:4]]
                        conf = float(parts[4])
                        detections.append({
                            'bbox': np.array(bbox),
                            'confidence': conf
                        })
                    except ValueError:
                        logger.warning(f"Invalid detection line: {line}")
                        continue

            logger.info(f"Loaded {len(detections)} detections from {file_path}")
        except Exception as e:
            logger.error(f"Failed to load detections from {file_path}: {e}")

        return detections

    def debug_detect_with_lower_confidence(self, frame: np.ndarray, debug_confidence: float = 0.3) -> List[Dict[str, Any]]:
        """
        Run YOLO detection with a lower confidence threshold for debugging.

        This helps identify if detections are being missed due to a high confidence threshold.

        Args:
            frame: Input frame (BGR format from OpenCV)
            debug_confidence: Lower confidence threshold for debugging

        Returns:
            List of human detection dictionaries with lower confidence threshold
        """
        logger.info(f"VR180 Debug: Running YOLO with lower confidence {debug_confidence} (vs normal {self.confidence_threshold})")

        # Run YOLO detection with lower confidence
        results = self.model(frame, conf=debug_confidence, verbose=False)

        debug_detections = []

        # Process results
        for result in results:
            boxes = result.boxes
            if boxes is not None:
                for box in boxes:
                    # Get class ID
                    cls = int(box.cls.cpu().numpy()[0])

                    # Check if it's a person (human_class_id)
                    if cls == self.human_class_id:
                        # Get bounding box coordinates (x1, y1, x2, y2)
                        coords = box.xyxy[0].cpu().numpy()
                        conf = float(box.conf.cpu().numpy()[0])
                        debug_detections.append({
                            'bbox': coords,
                            'confidence': conf
                        })

        logger.info(f"VR180 Debug: Lower confidence detection found {len(debug_detections)} total detections")
        return debug_detections

    def detect_humans_multi_frame(self,
                                  video_path: str,
                                  frame_indices: List[int],
                                  scale: float = 1.0) -> Dict[int, List[Dict[str, Any]]]:
        """
        Detect humans at multiple specific frame indices in a video.

        Used for mid-segment re-detection to improve SAM2 tracking.

        Args:
            video_path: Path to video file
            frame_indices: List of frame indices to run detection on (e.g., [0, 30, 60, 90])
            scale: Scale factor for frame processing

        Returns:
            Dictionary mapping frame_index -> list of detection dictionaries
        """
        if not frame_indices:
            logger.warning("No frame indices provided for multi-frame detection")
            return {}

        if not os.path.exists(video_path):
            logger.error(f"Video file not found: {video_path}")
            return {}

        logger.info(f"Mid-segment Detection: Running YOLO on {len(frame_indices)} frames: {frame_indices}")

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"Could not open video: {video_path}")
            return {}

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

        # Filter out frame indices that are beyond video length
        valid_frame_indices = [idx for idx in frame_indices if 0 <= idx < total_frames]
        if len(valid_frame_indices) != len(frame_indices):
            invalid_frames = [idx for idx in frame_indices if idx not in valid_frame_indices]
            logger.warning(f"Mid-segment Detection: Skipping invalid frame indices: {invalid_frames} (video has {total_frames} frames)")

        multi_frame_detections = {}

        for frame_idx in valid_frame_indices:
            # Seek to specific frame
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()

            if not ret:
                logger.warning(f"Mid-segment Detection: Could not read frame {frame_idx}")
                continue

            # Scale frame if needed
            if scale != 1.0:
                frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)

            # Run YOLO detection on this frame
            detections = self.detect_humans_in_frame(frame)
            multi_frame_detections[frame_idx] = detections

            # Log detection results
            time_seconds = frame_idx / fps
            logger.info(f"Mid-segment Detection: Frame {frame_idx} (t={time_seconds:.1f}s): {len(detections)} humans detected")

            for i, detection in enumerate(detections):
                bbox = detection['bbox']
                conf = detection['confidence']
                logger.debug(f"Mid-segment Detection: Frame {frame_idx}, Human {i+1}: bbox={bbox}, conf={conf:.3f}")

        cap.release()

        total_detections = sum(len(dets) for dets in multi_frame_detections.values())
        logger.info(f"Mid-segment Detection: Complete - {total_detections} total detections across {len(valid_frame_indices)} frames")

        return multi_frame_detections

    def process_segments_batch(self,
                               segments_info: List[dict],
                               detect_segments: Union[List[int], str],
                               scale: float = 0.5) -> Dict[int, List[Dict[str, Any]]]:
        """
        Process multiple segments for human detection.

        Args:
            segments_info: List of segment information dictionaries
            detect_segments: List of segment indices to process, or 'all' to process every segment
            scale: Scale factor for processing

        Returns:
            Dictionary mapping segment index to detection results
        """
        results = {}

        for segment_info in segments_info:
            segment_idx = segment_info['index']

            # Skip if not in detect_segments list
            if detect_segments != 'all' and segment_idx not in detect_segments:
                continue

            video_path = segment_info['video_file']
            detection_file = os.path.join(segment_info['directory'], "yolo_detections")

            # Skip if already processed
            if os.path.exists(detection_file):
                logger.info(f"Segment {segment_idx} already has detections, skipping")
                detections = self.load_detections_from_file(detection_file)
                results[segment_idx] = detections
                continue

            # Run detection
            logger.info(f"Processing segment {segment_idx} for human detection")
            detections = self.detect_humans_in_video_first_frame(video_path, scale)

            # Save results
            self.save_detections_to_file(detections, detection_file)
            results[segment_idx] = detections

        return results

    def convert_detections_to_sam2_prompts(self,
                                           detections: List[Dict[str, Any]],
                                           frame_width: int) -> List[Dict[str, Any]]:
        """
        Convert YOLO detections to SAM2-compatible prompts for VR180 SBS video.

        For VR180, we expect 2 real detections (left and right eye views), not mirrored ones.

        Args:
            detections: List of YOLO detection results
            frame_width: Width of the video frame

        Returns:
            List of SAM2 prompt dictionaries with obj_id and bbox
        """
        if not detections:
            logger.warning("No detections provided for SAM2 prompt conversion")
            return []

        half_frame_width = frame_width // 2
        prompts = []

        logger.info(f"VR180 SBS Debug: Converting {len(detections)} detections for frame width {frame_width}")
        logger.info(f"VR180 SBS Debug: Half frame width = {half_frame_width}")

        # Sort detections by x-coordinate to get consistent left/right assignment
        sorted_detections = sorted(detections, key=lambda x: x['bbox'][0])

        # Analyze detections by frame half
        left_detections = []
        right_detections = []

        for i, detection in enumerate(sorted_detections):
            bbox = detection['bbox'].copy()
            center_x = (bbox[0] + bbox[2]) / 2
            pixel_range = f"{bbox[0]:.0f}-{bbox[2]:.0f}"

            if center_x < half_frame_width:
                left_detections.append((detection, i, pixel_range))
                side = "LEFT"
            else:
                right_detections.append((detection, i, pixel_range))
                side = "RIGHT"

            logger.info(f"VR180 SBS Debug: Detection {i}: pixels {pixel_range}, center_x={center_x:.1f}, side={side}")

        # VR180 SBS format validation
        logger.info(f"VR180 SBS Debug: Found {len(left_detections)} LEFT detections, {len(right_detections)} RIGHT detections")

        # Analyze confidence scores
        if left_detections:
            left_confidences = [det[0]['confidence'] for det in left_detections]
            logger.info(f"VR180 SBS Debug: LEFT eye confidences: {[f'{c:.3f}' for c in left_confidences]}")
        if right_detections:
            right_confidences = [det[0]['confidence'] for det in right_detections]
            logger.info(f"VR180 SBS Debug: RIGHT eye confidences: {[f'{c:.3f}' for c in right_confidences]}")

        if len(right_detections) == 0:
            logger.warning(f"VR180 SBS Warning: No detections found in RIGHT eye view (pixels {half_frame_width}-{frame_width})")
            logger.warning("VR180 SBS Warning: This may indicate:")
            logger.warning("  1. Person not visible in right eye view")
            logger.warning(f"  2. YOLO confidence threshold ({self.confidence_threshold}) too high")
VR180 SBS format issue") logger.warning(f" 4. Right eye view quality/lighting problems") logger.warning(f"VR180 SBS Suggestion: Try lowering yolo_confidence to 0.3-0.4 in config") if len(left_detections) == 0: logger.warning(f"VR180 SBS Warning: No detections found in LEFT eye view (pixels 0-{half_frame_width})") # Additional validation for VR180 SBS expectations total_detections = len(left_detections) + len(right_detections) if total_detections == 1: logger.warning(f"VR180 SBS Warning: Only 1 detection found - expected 2 for proper VR180 SBS") elif total_detections > 2: logger.warning(f"VR180 SBS Warning: {total_detections} detections found - will use only first 2") # Assign object IDs sequentially, regardless of which half they're in # This ensures we always get Object 1 and Object 2 for up to 2 detections obj_id = 1 # Process up to 2 detections total (left + right combined) all_detections = sorted_detections[:2] for i, detection in enumerate(all_detections): bbox = detection['bbox'].copy() center_x = (bbox[0] + bbox[2]) / 2 pixel_range = f"{bbox[0]:.0f}-{bbox[2]:.0f}" # Determine which eye view this detection is in if center_x < half_frame_width: eye_view = "LEFT" else: eye_view = "RIGHT" prompts.append({ 'obj_id': obj_id, 'bbox': bbox, 'confidence': detection['confidence'] }) logger.info(f"VR180 SBS Debug: Added {eye_view} eye detection as SAM2 Object {obj_id}") logger.info(f"VR180 SBS Debug: Object {obj_id} bbox: {bbox} (pixels {pixel_range})") obj_id += 1 logger.info(f"VR180 SBS Debug: Final result - {len(detections)} YOLO detections → {len(prompts)} SAM2 prompts") # Verify we have the expected objects obj_ids = [p['obj_id'] for p in prompts] logger.info(f"VR180 SBS Debug: SAM2 Object IDs created: {obj_ids}") return prompts def convert_yolo_masks_to_video_segments(self, detections: List[Dict[str, Any]], frame_width: int, target_frame_shape: Tuple[int, int] = None) -> Optional[Dict[int, Dict[int, np.ndarray]]]: """ Convert YOLO segmentation masks to SAM2-compatible video segments format. This allows using YOLO masks directly without SAM2 processing. 

        Args:
            detections: List of YOLO detection results with masks
            frame_width: Width of the video frame for VR180 object ID assignment
            target_frame_shape: Target shape (height, width) for mask resizing

        Returns:
            Video segments dictionary compatible with SAM2 output format, or None if no masks
        """
        if not detections:
            logger.warning("No detections provided for mask conversion")
            return None

        # Check if any detections have masks
        detections_with_masks = [d for d in detections if d.get('has_mask', False)]
        if not detections_with_masks:
            logger.warning("No detections have masks - YOLO segmentation may not be working")
            return None

        logger.info(f"YOLO Mask Conversion: Converting {len(detections_with_masks)} YOLO masks to video segments format")

        half_frame_width = frame_width // 2
        video_segments = {}

        # Create frame 0 with converted masks
        frame_masks = {}
        obj_id = 1

        # Sort detections by x-coordinate for consistent VR180 SBS assignment
        sorted_detections = sorted(detections_with_masks, key=lambda x: x['bbox'][0])

        for i, detection in enumerate(sorted_detections[:2]):  # Take up to 2 humans
            mask = detection['mask']
            bbox = detection['bbox']
            center_x = (bbox[0] + bbox[2]) / 2

            # Assign sequential object IDs (similar to prompt conversion logic)
            current_obj_id = obj_id

            # Determine which eye view for logging
            if center_x < half_frame_width:
                eye_view = "LEFT"
            else:
                eye_view = "RIGHT"

            # Resize mask to target frame shape if specified
            if target_frame_shape and mask.shape != target_frame_shape:
                mask_resized = cv2.resize(mask.astype(np.float32),
                                          (target_frame_shape[1], target_frame_shape[0]),
                                          interpolation=cv2.INTER_NEAREST)
                mask = (mask_resized > 0.5).astype(bool)
            else:
                mask = mask.astype(bool)

            frame_masks[current_obj_id] = mask
            logger.info(f"YOLO Mask Conversion: {eye_view} eye detection -> Object {current_obj_id}, "
                        f"mask_shape={mask.shape}, pixels={np.sum(mask)}")

            obj_id += 1  # Always increment for next detection

        # Store masks in video segments format (single frame)
        video_segments[0] = frame_masks

        total_objects = len(frame_masks)
        total_pixels = sum(np.sum(mask) for mask in frame_masks.values())
        logger.info(f"YOLO Mask Conversion: Created video segments with {total_objects} objects, {total_pixels} total mask pixels")

        return video_segments

    def save_debug_frame_with_detections(self,
                                         frame: np.ndarray,
                                         detections: List[Dict[str, Any]],
                                         output_path: str,
                                         prompts: Optional[List[Dict[str, Any]]] = None) -> bool:
        """
        Save a debug frame with YOLO detections and SAM2 prompts overlaid as bounding boxes.

        Args:
            frame: Input frame (BGR format from OpenCV)
            detections: List of detection dictionaries with bbox and confidence
            output_path: Path to save the debug image
            prompts: Optional list of SAM2 prompt dictionaries with obj_id and bbox

        Returns:
            True if saved successfully
        """
        try:
            debug_frame = frame.copy()

            # Draw masks (if available) or bounding boxes for each detection
            for i, detection in enumerate(detections):
                bbox = detection['bbox']
                confidence = detection['confidence']
                has_mask = detection.get('has_mask', False)

                # Extract coordinates
                x1, y1, x2, y2 = map(int, bbox)

                # Choose color based on confidence (green for high, yellow for medium, red for low)
                if confidence >= 0.8:
                    color = (0, 255, 0)    # Green
                elif confidence >= 0.6:
                    color = (0, 255, 255)  # Yellow
                else:
                    color = (0, 0, 255)    # Red

                if has_mask and 'mask' in detection:
                    # Draw segmentation mask
                    mask = detection['mask']

                    # Resize mask to match frame if needed
                    if mask.shape != debug_frame.shape[:2]:
                        mask = cv2.resize(mask.astype(np.float32),
                                          (debug_frame.shape[1], debug_frame.shape[0]),
                                          interpolation=cv2.INTER_NEAREST)
                        mask = mask > 0.5
                    mask = mask.astype(bool)

                    # Apply colored overlay with transparency
                    overlay = debug_frame.copy()
                    overlay[mask] = color
                    cv2.addWeighted(overlay, 0.3, debug_frame, 0.7, 0, debug_frame)

                    # Draw mask outline
                    contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                    cv2.drawContours(debug_frame, contours, -1, color, 2)

                    # Prepare label text for segmentation
                    label = f"Person {i+1}: {confidence:.2f} (MASK)"
                else:
                    # Draw bounding box (detection mode or no mask available)
                    cv2.rectangle(debug_frame, (x1, y1), (x2, y2), color, 2)

                    # Prepare label text for detection
                    label = f"Person {i+1}: {confidence:.2f} (BBOX)"

                label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]

                # Draw label background
                cv2.rectangle(debug_frame,
                              (x1, y1 - label_size[1] - 10),
                              (x1 + label_size[0], y1),
                              color, -1)

                # Draw label text
                cv2.putText(debug_frame, label, (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Draw SAM2 prompts if provided (with different colors/style)
            if prompts:
                for prompt in prompts:
                    obj_id = prompt['obj_id']
                    bbox = prompt['bbox']

                    # Extract coordinates
                    x1, y1, x2, y2 = map(int, bbox)

                    # Use different colors for each object ID
                    if obj_id == 1:
                        prompt_color = (0, 255, 0)    # Green for Object 1
                    elif obj_id == 2:
                        prompt_color = (255, 0, 0)    # Blue for Object 2
                    else:
                        prompt_color = (255, 255, 0)  # Cyan for others

                    # Draw a thicker border for SAM2 prompts
                    thickness = 3
                    cv2.rectangle(debug_frame, (x1 - 2, y1 - 2), (x2 + 2, y2 + 2), prompt_color, thickness)

                    # Add SAM2 object ID label
                    sam_label = f"SAM2 Obj {obj_id}"
                    label_size = cv2.getTextSize(sam_label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]

                    # Draw label background
                    cv2.rectangle(debug_frame,
                                  (x1 - 2, y2 + 5),
                                  (x1 - 2 + label_size[0], y2 + 5 + label_size[1] + 5),
                                  prompt_color, -1)

                    # Draw label text
                    cv2.putText(debug_frame, sam_label, (x1 - 2, y2 + 5 + label_size[1]),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

            # Draw VR180 SBS boundary line (center line separating left and right eye views)
            frame_height, frame_width = debug_frame.shape[:2]
            center_x = frame_width // 2
            cv2.line(debug_frame, (center_x, 0), (center_x, frame_height), (0, 255, 255), 3)  # Yellow line

            # Add VR180 SBS labels
            cv2.putText(debug_frame, "LEFT EYE", (10, frame_height - 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
            cv2.putText(debug_frame, "RIGHT EYE", (center_x + 10, frame_height - 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
            # Add summary text at top with mode information
            mode_text = f"YOLO Mode: {self.mode.upper()}"
            masks_available = sum(1 for d in detections if d.get('has_mask', False))

            if self.supports_segmentation and masks_available > 0:
                summary = f"VR180 SBS: {len(detections)} detections → {masks_available} MASKS (for SAM2 propagation)"
            else:
                summary = f"VR180 SBS: {len(detections)} detections → {len(prompts) if prompts else 0} SAM2 prompts"

            cv2.putText(debug_frame, mode_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)  # Yellow for mode
            cv2.putText(debug_frame, summary, (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)

            # Add frame dimensions info
            dims_info = f"Frame: {frame_width}x{frame_height}, Center: {center_x}"
            cv2.putText(debug_frame, dims_info, (10, 90),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Save debug frame
            success = cv2.imwrite(output_path, debug_frame)
            if success:
                logger.info(f"Saved YOLO debug frame to {output_path}")
            else:
                logger.error(f"Failed to save debug frame to {output_path}")

            return success

        except Exception as e:
            logger.error(f"Error creating debug frame: {e}")
            return False

    def detect_humans_in_single_eye(self, frame: np.ndarray, eye_side: str) -> List[Dict[str, Any]]:
        """
        Detect humans in a single eye frame (left or right).

        Args:
            frame: Input eye frame (BGR format)
            eye_side: 'left' or 'right' eye

        Returns:
            List of human detection dictionaries for the single eye
        """
        logger.info(f"Running YOLO detection on {eye_side} eye frame")

        # Run standard detection on the eye frame
        detections = self.detect_humans_in_frame(frame)

        logger.info(f"YOLO {eye_side.upper()} Eye: Found {len(detections)} human detections")
        for i, detection in enumerate(detections):
            bbox = detection['bbox']
            conf = detection['confidence']
            has_mask = detection.get('has_mask', False)
            logger.debug(f"YOLO {eye_side.upper()} Eye Detection {i+1}: bbox={bbox}, conf={conf:.3f}, has_mask={has_mask}")

        return detections

    def convert_eye_detections_to_sam2_prompts(self,
                                               detections: List[Dict[str, Any]],
                                               eye_side: str) -> List[Dict[str, Any]]:
        """
        Convert single eye detections to SAM2 prompts (always uses obj_id=1 for single eye processing).

        Args:
            detections: List of YOLO detection results for single eye
            eye_side: 'left' or 'right' eye

        Returns:
            List of SAM2 prompt dictionaries with obj_id=1 for single eye processing
        """
        if not detections:
            logger.warning(f"No detections provided for {eye_side} eye SAM2 prompt conversion")
            return []

        logger.info(f"Converting {len(detections)} {eye_side} eye detections to SAM2 prompts")

        prompts = []

        # For single eye processing, always use obj_id=1 and take the best detection
        best_detection = max(detections, key=lambda x: x['confidence'])
        prompts.append({
            'obj_id': 1,  # Always use obj_id=1 for single eye processing
            'bbox': best_detection['bbox'].copy(),
            'confidence': best_detection['confidence']
        })

        logger.info(f"{eye_side.upper()} Eye: Converted best detection (conf={best_detection['confidence']:.3f}) to SAM2 Object 1")

        return prompts

    def has_any_detections(self, detections_list: List[List[Dict[str, Any]]]) -> bool:
        """
        Check if any detections exist in a list of detection lists.

        Args:
            detections_list: List of detection lists (e.g., [left_detections, right_detections])

        Returns:
            True if any detections are found
        """
        for detections in detections_list:
            if detections:
                return True
        return False

    def split_detections_by_eye(self,
                                detections: List[Dict[str, Any]],
                                frame_width: int) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """
        Split VR180 detections into left and right eye detections with coordinate conversion.

        Args:
            detections: List of full-frame VR180 detections
            frame_width: Width of the full VR180 frame

        Returns:
            Tuple of (left_eye_detections, right_eye_detections) with converted coordinates
        """
        half_width = frame_width // 2
        left_detections = []
        right_detections = []

        logger.info(f"Splitting {len(detections)} VR180 detections by eye (frame_width={frame_width}, half_width={half_width})")

        for i, detection in enumerate(detections):
            bbox = detection['bbox']
            center_x = (bbox[0] + bbox[2]) / 2
            logger.info(f"Detection {i}: bbox={bbox}, center_x={center_x:.1f}")

            # Create a copy with converted coordinates
            converted_detection = detection.copy()
            converted_bbox = bbox.copy()

            if center_x < half_width:
                # Left eye detection - coordinates remain the same.
                # For segmentation mode, the mask is passed through unchanged;
                # cropping to the left eye is handled later in eye processing.
                if detection.get('has_mask', False) and 'mask' in detection:
                    original_mask = detection['mask']
                    converted_detection['mask'] = original_mask
                    logger.info(f"Detection {i}: LEFT eye mask shape: {original_mask.shape}")

                left_detections.append(converted_detection)
                logger.info(f"Detection {i}: Assigned to LEFT eye, center_x={center_x:.1f} < {half_width}, bbox={bbox}")
            else:
                # Right eye detection - shift coordinates to start from 0
                original_bbox = converted_bbox.copy()
                converted_bbox[0] -= half_width  # x1
                converted_bbox[2] -= half_width  # x2

                # Ensure coordinates are within bounds
                converted_bbox[0] = max(0, converted_bbox[0])
                converted_bbox[2] = max(0, min(converted_bbox[2], half_width))

                converted_detection['bbox'] = converted_bbox

                # For segmentation mode, the mask is passed through unchanged;
                # cropping/shifting to the right eye is handled later in eye processing.
                if detection.get('has_mask', False) and 'mask' in detection:
                    original_mask = detection['mask']
                    converted_detection['mask'] = original_mask
                    logger.info(f"Detection {i}: RIGHT eye mask shape: {original_mask.shape}")

                right_detections.append(converted_detection)
                logger.info(f"Detection {i}: Assigned to RIGHT eye, center_x={center_x:.1f} >= {half_width}, "
                            f"original_bbox={original_bbox}, converted_bbox={converted_bbox}")

        logger.info(f"Split result: {len(left_detections)} left eye, {len(right_detections)} right eye detections")
        return left_detections, right_detections

    def save_eye_debug_frames(self,
                              left_frame: np.ndarray,
                              right_frame: np.ndarray,
                              left_detections: List[Dict[str, Any]],
                              right_detections: List[Dict[str, Any]],
                              left_output_path: str,
                              right_output_path: str) -> Tuple[bool, bool]:
        """
        Save debug frames for both left and right eye detections.

        Args:
            left_frame: Left eye frame
            right_frame: Right eye frame
            left_detections: Left eye detections
            right_detections: Right eye detections
            left_output_path: Output path for left eye debug frame
            right_output_path: Output path for right eye debug frame

        Returns:
            Tuple of (left_success, right_success)
        """
        logger.info("Saving eye-specific debug frames")

        # Save left eye debug frame (eye-specific version)
        left_success = self._save_single_eye_debug_frame(
            left_frame, left_detections, left_output_path, "LEFT"
        )

        # Save right eye debug frame (eye-specific version)
        right_success = self._save_single_eye_debug_frame(
            right_frame, right_detections, right_output_path, "RIGHT"
        )

        if left_success:
            logger.info(f"Saved left eye debug frame: {left_output_path}")
        if right_success:
            logger.info(f"Saved right eye debug frame: {right_output_path}")

        return left_success, right_success

    def _save_single_eye_debug_frame(self,
                                     frame: np.ndarray,
                                     detections: List[Dict[str, Any]],
                                     output_path: str,
                                     eye_side: str) -> bool:
        """
        Save a debug frame for a single eye with eye-specific visualizations.

        Args:
            frame: Single eye frame (BGR format from OpenCV)
            detections: List of detection dictionaries for this eye
            output_path: Path to save the debug image
            eye_side: "LEFT" or "RIGHT"

        Returns:
            True if saved successfully
        """
        try:
            debug_frame = frame.copy()

            # Draw masks or bounding boxes for each detection
            for i, detection in enumerate(detections):
                bbox = detection['bbox']
                confidence = detection['confidence']
                has_mask = detection.get('has_mask', False)

                # Extract coordinates
                x1, y1, x2, y2 = map(int, bbox)

                # Choose color based on confidence (green for high, yellow for medium, red for low)
                if confidence >= 0.8:
                    color = (0, 255, 0)    # Green
                elif confidence >= 0.6:
                    color = (0, 255, 255)  # Yellow
                else:
                    color = (0, 0, 255)    # Red

                if has_mask and 'mask' in detection:
                    # Draw segmentation mask
                    mask = detection['mask']

                    # Resize mask to match frame if needed
                    if mask.shape != debug_frame.shape[:2]:
                        mask = cv2.resize(mask.astype(np.float32),
                                          (debug_frame.shape[1], debug_frame.shape[0]),
                                          interpolation=cv2.INTER_NEAREST)
                        mask = mask > 0.5
                    mask = mask.astype(bool)

                    # Apply colored overlay with transparency
                    overlay = debug_frame.copy()
                    overlay[mask] = color
                    cv2.addWeighted(overlay, 0.3, debug_frame, 0.7, 0, debug_frame)

                    # Draw mask outline
                    contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                    cv2.drawContours(debug_frame, contours, -1, color, 2)

                    # Prepare label text for segmentation
                    label = f"Person {i+1}: {confidence:.2f} (MASK)"
                else:
                    # Draw bounding box (detection mode or no mask available)
                    cv2.rectangle(debug_frame, (x1, y1), (x2, y2), color, 2)

                    # Prepare label text for detection
                    label = f"Person {i+1}: {confidence:.2f} (BBOX)"

                label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]

                # Draw label background
                cv2.rectangle(debug_frame,
                              (x1, y1 - label_size[1] - 10),
                              (x1 + label_size[0], y1),
                              color, -1)

                # Draw label text
                cv2.putText(debug_frame, label, (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Add title specific to this eye
            frame_height, frame_width = debug_frame.shape[:2]
            title = f"{eye_side} EYE: {len(detections)} detections"
            cv2.putText(debug_frame, title, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)

            # Add mode information
            mode_text = f"YOLO Mode: {self.mode.upper()}"
            masks_available = sum(1 for d in detections if d.get('has_mask', False))
MASKS" else: summary = f"{len(detections)} detections → BOUNDING BOXES" cv2.putText(debug_frame, mode_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) # Yellow for mode cv2.putText(debug_frame, summary, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) # Add frame dimensions info dims_info = f"Frame: {frame_width}x{frame_height}" cv2.putText(debug_frame, dims_info, (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Save debug frame success = cv2.imwrite(output_path, debug_frame) if success: logger.info(f"Saved {eye_side} eye debug frame to {output_path}") else: logger.error(f"Failed to save {eye_side} eye debug frame to {output_path}") return success except Exception as e: logger.error(f"Error creating {eye_side} eye debug frame: {e}") return False def _calculate_iou(self, mask1: np.ndarray, mask2: np.ndarray) -> float: """Calculate Intersection over Union for two masks of the same size.""" if mask1.shape != mask2.shape: return 0.0 intersection = np.logical_and(mask1, mask2).sum() union = np.logical_or(mask1, mask2).sum() return intersection / union if union > 0 else 0.0 def _calculate_stereo_similarity(self, left_mask: np.ndarray, right_mask: np.ndarray, left_bbox: np.ndarray, right_bbox: np.ndarray, left_idx: int = -1, right_idx: int = -1) -> float: """ Calculate stereo similarity for VR180 masks using spatial and size features. For VR180, left and right eye views won't overlap much, so we use other metrics. """ logger.info(f" Starting similarity calculation L{left_idx} vs R{right_idx}") logger.info(f" Left mask: shape={left_mask.shape}, dtype={left_mask.dtype}, min={left_mask.min()}, max={left_mask.max()}") logger.info(f" Right mask: shape={right_mask.shape}, dtype={right_mask.dtype}, min={right_mask.min()}, max={right_mask.max()}") logger.info(f" Left bbox: {left_bbox}") logger.info(f" Right bbox: {right_bbox}") if left_mask.shape != right_mask.shape: logger.info(f" L{left_idx} vs R{right_idx}: Shape mismatch - {left_mask.shape} vs {right_mask.shape} - attempting to resize") # Try to resize the smaller mask to match the larger one if left_mask.size < right_mask.size: left_mask = cv2.resize(left_mask.astype(np.float32), (right_mask.shape[1], right_mask.shape[0]), interpolation=cv2.INTER_NEAREST) left_mask = left_mask > 0.5 logger.info(f" Resized left mask to {left_mask.shape}") else: right_mask = cv2.resize(right_mask.astype(np.float32), (left_mask.shape[1], left_mask.shape[0]), interpolation=cv2.INTER_NEAREST) right_mask = right_mask > 0.5 logger.info(f" Resized right mask to {right_mask.shape}") if left_mask.shape != right_mask.shape: logger.warning(f" L{left_idx} vs R{right_idx}: Still shape mismatch after resize - {left_mask.shape} vs {right_mask.shape}") return 0.0 # 1. Size similarity (area ratio) left_area = np.sum(left_mask) right_area = np.sum(right_mask) if left_area == 0 or right_area == 0: logger.debug(f" L{left_idx} vs R{right_idx}: Zero area - left={left_area}, right={right_area}") return 0.0 area_ratio = min(left_area, right_area) / max(left_area, right_area) # 2. Vertical position similarity (y-coordinates should be similar) left_center_y = (left_bbox[1] + left_bbox[3]) / 2 right_center_y = (right_bbox[1] + right_bbox[3]) / 2 height = left_mask.shape[0] y_diff = abs(left_center_y - right_center_y) / height y_similarity = max(0, 1.0 - y_diff * 2) # Penalize vertical misalignment # 3. 
        # 3. Height similarity (bounding box heights should be similar)
        left_height = left_bbox[3] - left_bbox[1]
        right_height = right_bbox[3] - right_bbox[1]
        if left_height == 0 or right_height == 0:
            height_ratio = 0.0
        else:
            height_ratio = min(left_height, right_height) / max(left_height, right_height)

        # 4. Aspect ratio similarity
        left_width = left_bbox[2] - left_bbox[0]
        right_width = right_bbox[2] - right_bbox[0]
        left_aspect = right_aspect = 0.0  # Defaults for degenerate boxes (also used in logging below)
        if left_width == 0 or right_width == 0 or left_height == 0 or right_height == 0:
            aspect_similarity = 0.0
        else:
            left_aspect = left_width / left_height
            right_aspect = right_width / right_height
            aspect_diff = abs(left_aspect - right_aspect) / max(left_aspect, right_aspect)
            aspect_similarity = max(0, 1.0 - aspect_diff)

        # Combine metrics with weights
        similarity = (
            area_ratio * 0.3 +         # 30% weight on size similarity
            y_similarity * 0.4 +       # 40% weight on vertical alignment
            height_ratio * 0.2 +       # 20% weight on height similarity
            aspect_similarity * 0.1    # 10% weight on aspect ratio
        )

        # Detailed logging for each comparison
        logger.info(f"    L{left_idx} vs R{right_idx}: area_ratio={area_ratio:.3f} (L={left_area}px, R={right_area}px), "
                    f"y_sim={y_similarity:.3f} (L_y={left_center_y:.1f}, R_y={right_center_y:.1f}, diff={y_diff:.3f}), "
                    f"height_ratio={height_ratio:.3f} (L_h={left_height:.1f}, R_h={right_height:.1f}), "
                    f"aspect_sim={aspect_similarity:.3f} (L_asp={left_aspect:.2f}, R_asp={right_aspect:.2f}), "
                    f"FINAL_SIMILARITY={similarity:.3f}")

        return similarity

    def _find_matching_mask_pairs(self,
                                  left_masks: List[Dict[str, Any]],
                                  right_masks: List[Dict[str, Any]],
                                  similarity_threshold: float) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Find the best matching pairs of masks between left and right eyes using stereo similarity."""
        logger.info(f"Starting stereo mask matching with {len(left_masks)} left masks and {len(right_masks)} right masks.")

        if not left_masks or not right_masks:
            return [], left_masks, right_masks
        # 1. Calculate all similarity scores for every possible pair
        possible_pairs = []
        logger.info("--- Calculating all possible stereo similarity pairs ---")

        # First, log details about each mask
        logger.info(f"LEFT EYE MASKS ({len(left_masks)} total):")
        for i, left_detection in enumerate(left_masks):
            bbox = left_detection['bbox']
            mask_area = np.sum(left_detection['mask'])
            conf = left_detection['confidence']
            logger.info(f"  L{i}: bbox=[{bbox[0]:.1f},{bbox[1]:.1f},{bbox[2]:.1f},{bbox[3]:.1f}], area={mask_area}px, conf={conf:.3f}")

        logger.info(f"RIGHT EYE MASKS ({len(right_masks)} total):")
        for j, right_detection in enumerate(right_masks):
            bbox = right_detection['bbox']
            mask_area = np.sum(right_detection['mask'])
            conf = right_detection['confidence']
            logger.info(f"  R{j}: bbox=[{bbox[0]:.1f},{bbox[1]:.1f},{bbox[2]:.1f},{bbox[3]:.1f}], area={mask_area}px, conf={conf:.3f}")

        logger.info("--- Stereo Similarity Calculations ---")
        for i, left_detection in enumerate(left_masks):
            for j, right_detection in enumerate(right_masks):
                try:
                    # Use stereo similarity instead of IOU for VR180
                    similarity = self._calculate_stereo_similarity(
                        left_detection['mask'], right_detection['mask'],
                        left_detection['bbox'], right_detection['bbox'],
                        left_idx=i, right_idx=j
                    )

                    if similarity > similarity_threshold:
                        possible_pairs.append({'left_idx': i, 'right_idx': j, 'similarity': similarity})
                        logger.info(f"  ✓ L{i} vs R{j}: ABOVE THRESHOLD ({similarity:.4f} > {similarity_threshold:.4f})")
                    else:
                        logger.info(f"  ✗ L{i} vs R{j}: BELOW THRESHOLD ({similarity:.4f} <= {similarity_threshold:.4f})")
                except Exception as e:
                    logger.error(f"  ERROR L{i} vs R{j}: Exception in similarity calculation: {e}")

        # 2. Sort pairs by similarity score in descending order to prioritize the best matches
        possible_pairs.sort(key=lambda x: x['similarity'], reverse=True)

        logger.debug("--- Sorted similarity pairs above threshold ---")
        for pair in possible_pairs:
            logger.debug(f"  Pair (L{pair['left_idx']}, R{pair['right_idx']}) - Similarity: {pair['similarity']:.4f}")

        matched_pairs = []
        matched_left_indices = set()
        matched_right_indices = set()

        # 3. Iterate through sorted pairs and greedily select the best available ones
        logger.debug("--- Selecting best pairs ---")
        for pair in possible_pairs:
            left_idx, right_idx = pair['left_idx'], pair['right_idx']
            if left_idx not in matched_left_indices and right_idx not in matched_right_indices:
                logger.info(f"  MATCH FOUND: (L{left_idx}, R{right_idx}) with Similarity {pair['similarity']:.4f}")
                matched_pairs.append({
                    'left_mask': left_masks[left_idx],
                    'right_mask': right_masks[right_idx],
                    'similarity': pair['similarity']
                })
                matched_left_indices.add(left_idx)
                matched_right_indices.add(right_idx)
            else:
                logger.debug(f"  Skipping pair (L{left_idx}, R{right_idx}) because one mask is already matched.")

        # 4. Identify unmatched (orphan) masks
        unmatched_left = [mask for i, mask in enumerate(left_masks) if i not in matched_left_indices]
        unmatched_right = [mask for i, mask in enumerate(right_masks) if i not in matched_right_indices]
        logger.info(f"Matching complete: Found {len(matched_pairs)} pairs. "
                    f"Left orphans: {len(unmatched_left)}, Right orphans: {len(unmatched_right)}.")

        return matched_pairs, unmatched_left, unmatched_right

    def _save_stereo_agreement_debug_frame(self,
                                           left_frame: np.ndarray,
                                           right_frame: np.ndarray,
                                           left_detections: List[Dict[str, Any]],
                                           right_detections: List[Dict[str, Any]],
                                           matched_pairs: List[Dict[str, Any]],
                                           unmatched_left: List[Dict[str, Any]],
                                           unmatched_right: List[Dict[str, Any]],
                                           output_path: str,
                                           title: str):
        """Save a debug frame visualizing the stereo mask agreement process."""
        try:
            # Create a combined image
            h, w, _ = left_frame.shape
            combined_frame = np.hstack((left_frame, right_frame))

            def get_centroid(mask):
                m = cv2.moments(mask.astype(np.uint8), binaryImage=True)
                return (int(m["m10"] / m["m00"]), int(m["m01"] / m["m00"])) if m["m00"] != 0 else (0, 0)

            def draw_label(frame, text, pos, color):
                # Draw a black background rectangle
                cv2.rectangle(frame, (pos[0], pos[1] - 14), (pos[0] + len(text) * 8, pos[1] + 5), (0, 0, 0), -1)
                # Draw the text
                cv2.putText(frame, text, pos, cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

            # --- Draw ALL masks first (to ensure every mask gets a label) ---
            logger.info(f"Debug Frame: Drawing {len(left_detections)} left masks and {len(right_detections)} right masks")

            # Draw all left detections first
            for i, detection in enumerate(left_detections):
                mask = detection['mask']
                mask_area = np.sum(mask > 0.5)

                # Skip tiny masks that are likely noise
                if mask_area < 100:  # Less than 100 pixels
                    logger.debug(f"Skipping tiny left mask L{i} with area {mask_area}px")
                    continue

                contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                if contours:
                    cv2.drawContours(combined_frame, contours, -1, (0, 0, 255), 2)  # Default red for unmatched
                    c = get_centroid(mask)
                    if c[0] > 0 and c[1] > 0:  # Valid centroid
                        draw_label(combined_frame, f"L{i}", c, (0, 0, 255))
                    logger.debug(f"Drew left mask L{i} at centroid {c}, area={mask_area}px")

            # Draw all right detections
            for i, detection in enumerate(right_detections):
                mask = detection['mask']
                mask_area = np.sum(mask > 0.5)

                # Skip tiny masks that are likely noise
                if mask_area < 100:  # Less than 100 pixels
                    logger.debug(f"Skipping tiny right mask R{i} with area {mask_area}px")
                    continue

                contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                if contours:
                    for cnt in contours:
                        cnt[:, :, 0] += w  # Shift contour x-coordinates into the right half of the combined image
                    cv2.drawContours(combined_frame, contours, -1, (0, 0, 255), 2)  # Default red for unmatched
                    c_shifted = get_centroid(mask)
                    c = (c_shifted[0] + w, c_shifted[1])
                    if c[0] > w and c[1] > 0:  # Valid centroid in right half
                        draw_label(combined_frame, f"R{i}", c, (0, 0, 255))
                    logger.debug(f"Drew right mask R{i} at centroid {c}, area={mask_area}px")

            # --- Now overdraw matched pairs in green ---
            for pair in matched_pairs:
                left_mask = pair['left_mask']['mask']
                right_mask = pair['right_mask']['mask']

                # Recover the indices of the matched masks by comparing bbox and confidence
                left_idx = None
                right_idx = None

                for i, det in enumerate(left_detections):
                    if (np.array_equal(det['bbox'], pair['left_mask']['bbox']) and
                            abs(det['confidence'] - pair['left_mask']['confidence']) < 0.001):
                        left_idx = i
                        break

                for i, det in enumerate(right_detections):
                    if (np.array_equal(det['bbox'], pair['right_mask']['bbox']) and
                            abs(det['confidence'] - pair['right_mask']['confidence']) < 0.001):
                        right_idx = i
                        break

                # Draw left mask in green (matched)
                contours, _ = cv2.findContours(left_mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                c1 = get_centroid(left_mask)
                if contours:
                    cv2.drawContours(combined_frame, contours, -1, (0, 255, 0), 3)  # Thicker green line
                    if c1[0] > 0 and c1[1] > 0:
                        draw_label(combined_frame, f"L{left_idx if left_idx is not None else '?'}", c1, (0, 255, 0))

                # Draw right mask in green (matched)
                contours, _ = cv2.findContours(right_mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                c2_shifted = get_centroid(right_mask)
                c2 = (c2_shifted[0] + w, c2_shifted[1])
                if contours:
                    for cnt in contours:
                        cnt[:, :, 0] += w
                    cv2.drawContours(combined_frame, contours, -1, (0, 255, 0), 3)  # Thicker green line
                    if c2[0] > w and c2[1] > 0:
                        draw_label(combined_frame, f"R{right_idx if right_idx is not None else '?'}", c2, (0, 255, 0))

                # Draw line connecting centroids and similarity score
                cv2.line(combined_frame, c1, c2, (0, 255, 0), 2)
                similarity_text = f"Sim: {pair.get('similarity', pair.get('iou', 0)):.2f}"
                cv2.putText(combined_frame, similarity_text, (c1[0] + 10, c1[1] + 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Add title
            cv2.putText(combined_frame, title, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)

            cv2.imwrite(output_path, combined_frame)
            logger.info(f"Saved stereo agreement debug frame to {output_path}")

        except Exception as e:
            logger.error(f"Failed to create stereo agreement debug frame: {e}")

    def detect_and_match_stereo_pairs(self,
                                      frame: np.ndarray,
                                      confidence_reduction_factor: float,
                                      stereo_similarity_threshold: float,
                                      segment_info: dict,
                                      save_debug_frames: bool) -> List[Dict[str, Any]]:
        """The main method to detect and match stereo mask pairs."""
        frame_height, frame_width, _ = frame.shape
        half_width = frame_width // 2
        left_eye_frame = frame[:, :half_width]
        right_eye_frame = frame[:, half_width:half_width * 2]  # Ensure exact same width

        logger.info(f"VR180 Frame Split: Original={frame.shape}, Left={left_eye_frame.shape}, Right={right_eye_frame.shape}")

        # Initial detection with validation
        logger.info(f"Running initial stereo detection at {self.confidence_threshold} confidence.")
        left_detections = self.detect_humans_in_frame(left_eye_frame, validate_with_detection=True)
        right_detections = self.detect_humans_in_frame(right_eye_frame, validate_with_detection=True)

        # Convert IOU threshold to similarity threshold (IOU 0.5 ≈ similarity 0.3)
        similarity_threshold = max(0.2, stereo_similarity_threshold * 0.6)
        matched_pairs, unmatched_left, unmatched_right = self._find_matching_mask_pairs(
            left_detections, right_detections, similarity_threshold)

        if save_debug_frames:
            debug_path = os.path.join(segment_info['directory'], "yolo_stereo_agreement_initial.jpg")
            title = f"Initial Attempt (Conf: {self.confidence_threshold:.2f}) - {len(matched_pairs)} Pairs"
            self._save_stereo_agreement_debug_frame(left_eye_frame, right_eye_frame,
                                                    left_detections, right_detections,
                                                    matched_pairs, unmatched_left, unmatched_right,
                                                    debug_path, title)

        # Retry with lower confidence if no pairs found
        if not matched_pairs:
            new_confidence = self.confidence_threshold * confidence_reduction_factor
            logger.info(f"No valid pairs found. Reducing confidence to {new_confidence:.2f} and retrying.")
            left_detections = self.detect_humans_in_frame(left_eye_frame,
                                                          confidence_override=new_confidence,
                                                          validate_with_detection=True)
            right_detections = self.detect_humans_in_frame(right_eye_frame,
                                                           confidence_override=new_confidence,
                                                           validate_with_detection=True)
            matched_pairs, unmatched_left, unmatched_right = self._find_matching_mask_pairs(
                left_detections, right_detections, similarity_threshold)

            if save_debug_frames:
                debug_path = os.path.join(segment_info['directory'], "yolo_stereo_agreement_retry.jpg")
                title = f"Retry Attempt (Conf: {new_confidence:.2f}) - {len(matched_pairs)} Pairs"
                self._save_stereo_agreement_debug_frame(left_eye_frame, right_eye_frame,
                                                        left_detections, right_detections,
                                                        matched_pairs, unmatched_left, unmatched_right,
                                                        debug_path, title)

        # Prepare final results - convert to full-frame coordinates and masks
        final_prompts = []
        if matched_pairs:
            logger.info(f"Found {len(matched_pairs)} valid stereo pairs.")
            for i, pair in enumerate(matched_pairs):
                # Convert eye-specific coordinates and masks to full-frame
                left_bbox_full_frame, left_mask_full_frame = self._convert_eye_to_full_frame(
                    pair['left_mask']['bbox'], pair['left_mask']['mask'], 'left', frame_width, frame_height
                )
                right_bbox_full_frame, right_mask_full_frame = self._convert_eye_to_full_frame(
                    pair['right_mask']['bbox'], pair['right_mask']['mask'], 'right', frame_width, frame_height
                )

                logger.info(f"Stereo Pair {i}: Left bbox {pair['left_mask']['bbox']} -> {left_bbox_full_frame}")
                logger.info(f"Stereo Pair {i}: Right bbox {pair['right_mask']['bbox']} -> {right_bbox_full_frame}")

                # Create prompts for SAM2 with full-frame coordinates and masks
                final_prompts.append({
                    'obj_id': i * 2 + 1,
                    'bbox': left_bbox_full_frame,
                    'mask': left_mask_full_frame
                })
                final_prompts.append({
                    'obj_id': i * 2 + 2,
                    'bbox': right_bbox_full_frame,
                    'mask': right_mask_full_frame
                })
        else:
            logger.warning("No valid stereo pairs found after all attempts.")

        return final_prompts

    def _convert_eye_to_full_frame(self,
                                   eye_bbox: np.ndarray,
                                   eye_mask: np.ndarray,
                                   eye_side: str,
                                   full_frame_width: int,
                                   full_frame_height: int) -> tuple:
        """
        Convert eye-specific bounding box and mask to full-frame coordinates.

        Args:
            eye_bbox: Bounding box in eye coordinate system
            eye_mask: Mask in eye coordinate system
            eye_side: 'left' or 'right'
            full_frame_width: Width of the full VR180 frame
            full_frame_height: Height of the full VR180 frame

        Returns:
            Tuple of (full_frame_bbox, full_frame_mask)
        """
        half_width = full_frame_width // 2

        # Convert bounding box coordinates
        full_frame_bbox = eye_bbox.copy()
        if eye_side == 'right':
            # Shift right eye coordinates by half_width
            full_frame_bbox[0] += half_width  # x1
            full_frame_bbox[2] += half_width  # x2

        # Create full-frame mask
        full_frame_mask = np.zeros((full_frame_height, full_frame_width), dtype=eye_mask.dtype)

        if eye_side == 'left':
            # Place left eye mask in left half
            eye_height, eye_width = eye_mask.shape
            target_height = min(eye_height, full_frame_height)
            target_width = min(eye_width, half_width)
            full_frame_mask[:target_height, :target_width] = eye_mask[:target_height, :target_width]
        else:  # right
            # Place right eye mask in right half
            eye_height, eye_width = eye_mask.shape
            target_height = min(eye_height, full_frame_height)
            target_width = min(eye_width, half_width)
            full_frame_mask[:target_height, half_width:half_width + target_width] = eye_mask[:target_height, :target_width]

        logger.debug(f"Converted {eye_side} eye: bbox {eye_bbox} -> {full_frame_bbox}, "
                     f"mask {eye_mask.shape} -> {full_frame_mask.shape}, "
                     f"mask_pixels: {np.sum(eye_mask > 0.5)} -> {np.sum(full_frame_mask > 0.5)}")

        return full_frame_bbox, full_frame_mask

    def _validate_masks_with_detection(self,
                                       frame: np.ndarray,
                                       segmentation_detections: List[Dict[str, Any]],
                                       confidence_override: Optional[float] = None) -> List[Dict[str, Any]]:
        """
        Validate segmentation masks by checking if they overlap with detection bounding boxes.

        This helps filter out spurious mask regions that aren't actually humans.
""" if not hasattr(self, '_detection_model'): # Load detection model for validation try: detection_model_path = self.model_path.replace('-seg.pt', '.pt') # Try to find detection version if not os.path.exists(detection_model_path): detection_model_path = "yolo11l.pt" # Fallback to default logger.info(f"Loading detection model for validation: {detection_model_path}") self._detection_model = YOLO(detection_model_path) except Exception as e: logger.warning(f"Could not load detection model for validation: {e}") return segmentation_detections # Run detection model confidence = confidence_override if confidence_override is not None else self.confidence_threshold detection_results = self._detection_model(frame, conf=confidence, verbose=False) # Extract detection bounding boxes detection_bboxes = [] for result in detection_results: if result.boxes is not None: for box in result.boxes: cls = int(box.cls.cpu().numpy()[0]) if cls == self.human_class_id: coords = box.xyxy[0].cpu().numpy() conf = float(box.conf.cpu().numpy()[0]) detection_bboxes.append({'bbox': coords, 'confidence': conf}) logger.info(f"Validation: Found {len(detection_bboxes)} detection bboxes vs {len(segmentation_detections)} segmentation masks") # Validate each segmentation mask against detection bboxes validated_detections = [] for seg_det in segmentation_detections: if not seg_det['has_mask']: validated_detections.append(seg_det) continue # Check if this mask overlaps significantly with any detection bbox mask = seg_det['mask'] seg_bbox = seg_det['bbox'] best_overlap = 0.0 best_detection = None for det_bbox_info in detection_bboxes: det_bbox = det_bbox_info['bbox'] overlap = self._calculate_bbox_overlap(seg_bbox, det_bbox) if overlap > best_overlap: best_overlap = overlap best_detection = det_bbox_info if best_overlap > 0.3: # 30% overlap threshold logger.info(f"Validation: Segmentation mask validated (overlap={best_overlap:.3f} with detection conf={best_detection['confidence']:.3f})") validated_detections.append(seg_det) else: mask_area = np.sum(mask > 0.5) logger.warning(f"Validation: Rejecting segmentation mask with low overlap ({best_overlap:.3f}) - area={mask_area}px") logger.info(f"Validation: Kept {len(validated_detections)}/{len(segmentation_detections)} segmentation masks") return validated_detections def _calculate_bbox_overlap(self, bbox1: np.ndarray, bbox2: np.ndarray) -> float: """Calculate the overlap ratio between two bounding boxes.""" # Calculate intersection x1 = max(bbox1[0], bbox2[0]) y1 = max(bbox1[1], bbox2[1]) x2 = min(bbox1[2], bbox2[2]) y2 = min(bbox1[3], bbox2[3]) if x2 <= x1 or y2 <= y1: return 0.0 intersection = (x2 - x1) * (y2 - y1) # Calculate areas area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1]) area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1]) # Return intersection over smaller area (more lenient than IoU) return intersection / min(area1, area2) if min(area1, area2) > 0 else 0.0