# samyolo_on_segments/core/yolo_detector.py
"""
YOLO detector module for human detection in video segments.
Preserves the core detection logic from the original implementation.
"""
import os
import cv2
import numpy as np
import logging
from typing import List, Dict, Any, Optional, Tuple, Union
from ultralytics import YOLO
logger = logging.getLogger(__name__)
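# Minimal usage sketch (illustrative only; the model path, segment filename, and frame
# width below are placeholders rather than values required by this module):
#
#   detector = YOLODetector(detection_model_path="yolov8n.pt", mode="detection",
#                           confidence_threshold=0.6)
#   detections = detector.detect_humans_in_video_first_frame("segment_000.mp4", scale=0.5)
#   prompts = detector.convert_detections_to_sam2_prompts(detections, frame_width=3840)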
class YOLODetector:
"""Handles YOLO-based human detection for video segments with support for both detection and segmentation modes."""
    def __init__(self, detection_model_path: Optional[str] = None, segmentation_model_path: Optional[str] = None,
                 mode: str = "detection", confidence_threshold: float = 0.6, human_class_id: int = 0):
"""
Initialize YOLO detector with support for both detection and segmentation modes.
Args:
detection_model_path: Path to YOLO detection model weights (e.g., yolov8n.pt)
segmentation_model_path: Path to YOLO segmentation model weights (e.g., yolov8n-seg.pt)
mode: Detection mode - "detection" for bboxes, "segmentation" for masks
confidence_threshold: Detection confidence threshold
human_class_id: COCO class ID for humans (0 = person)
"""
self.mode = mode
self.confidence_threshold = confidence_threshold
self.human_class_id = human_class_id
# Select model path based on mode
if mode == "segmentation":
if not segmentation_model_path:
raise ValueError("segmentation_model_path required for segmentation mode")
self.model_path = segmentation_model_path
self.supports_segmentation = True
elif mode == "detection":
if not detection_model_path:
raise ValueError("detection_model_path required for detection mode")
self.model_path = detection_model_path
self.supports_segmentation = False
else:
raise ValueError(f"Invalid mode: {mode}. Must be 'detection' or 'segmentation'")
# Load YOLO model
try:
self.model = YOLO(self.model_path)
logger.info(f"Loaded YOLO model in {mode} mode from {self.model_path}")
            # Log the selected mode (no runtime capability check is performed here)
            if mode == "segmentation":
                logger.info("YOLO Segmentation: Model loaded, will output masks directly")
            else:
                logger.info("YOLO Detection: Model loaded, will output bounding boxes")
except Exception as e:
logger.error(f"Failed to load YOLO model: {e}")
raise
def detect_humans_in_frame(self, frame: np.ndarray) -> List[Dict[str, Any]]:
"""
Detect humans in a single frame using YOLO.
Args:
frame: Input frame (BGR format from OpenCV)
Returns:
List of human detection dictionaries with bbox, confidence, and optionally masks
"""
# Run YOLO detection/segmentation
results = self.model(frame, conf=self.confidence_threshold, verbose=False)
human_detections = []
# Process results
for result in results:
boxes = result.boxes
masks = result.masks if hasattr(result, 'masks') and result.masks is not None else None
if boxes is not None:
for i, box in enumerate(boxes):
# Get class ID
cls = int(box.cls.cpu().numpy()[0])
# Check if it's a person (human_class_id)
if cls == self.human_class_id:
# Get bounding box coordinates (x1, y1, x2, y2)
coords = box.xyxy[0].cpu().numpy()
conf = float(box.conf.cpu().numpy()[0])
detection = {
'bbox': coords,
'confidence': conf,
'has_mask': False,
'mask': None
}
# Extract mask if available (segmentation mode)
if masks is not None and i < len(masks.data):
mask_data = masks.data[i].cpu().numpy() # Get mask for this detection
detection['has_mask'] = True
detection['mask'] = mask_data
logger.debug(f"YOLO Segmentation: Detected human with mask - conf={conf:.2f}, mask_shape={mask_data.shape}")
else:
logger.debug(f"YOLO Detection: Detected human with bbox - conf={conf:.2f}, bbox={coords}")
human_detections.append(detection)
if self.supports_segmentation:
masks_found = sum(1 for d in human_detections if d['has_mask'])
logger.info(f"YOLO Segmentation: Found {len(human_detections)} humans, {masks_found} with masks")
else:
logger.debug(f"YOLO Detection: Found {len(human_detections)} humans with bounding boxes")
return human_detections
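    # Shape of each detection dict returned above (values illustrative):
    #   {'bbox': np.array([x1, y1, x2, y2]), 'confidence': 0.83,
    #    'has_mask': True, 'mask': <float mask array from the segmentation head>}
    # 'has_mask' is True and 'mask' is populated only in segmentation mode.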
def detect_humans_in_video_first_frame(self, video_path: str, scale: float = 1.0) -> List[Dict[str, Any]]:
"""
Detect humans in the first frame of a video.
Args:
video_path: Path to video file
scale: Scale factor for frame processing
Returns:
List of human detection dictionaries
"""
if not os.path.exists(video_path):
logger.error(f"Video file not found: {video_path}")
return []
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
logger.error(f"Could not open video: {video_path}")
return []
ret, frame = cap.read()
cap.release()
if not ret:
logger.error(f"Could not read first frame from: {video_path}")
return []
# Scale frame if needed
if scale != 1.0:
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)
return self.detect_humans_in_frame(frame)
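    # On-disk layout produced/consumed by the two file helpers below
    # (coordinates and confidences are illustrative):
    #   # YOLO Human Detections
    #   812.0,143.5,1040.2,897.0,0.87
    #   2732.4,150.1,2958.9,903.3,0.84
    # Each data line is x1,y1,x2,y2,confidence; lines starting with '#' are comments.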
def save_detections_to_file(self, detections: List[Dict[str, Any]], output_path: str) -> bool:
"""
Save detection results to file.
Args:
detections: List of detection dictionaries
output_path: Path to save detections
Returns:
True if saved successfully
"""
try:
with open(output_path, 'w') as f:
f.write("# YOLO Human Detections\\n")
if detections:
for detection in detections:
bbox = detection['bbox']
conf = detection['confidence']
f.write(f"{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]},{conf}\\n")
logger.info(f"Saved {len(detections)} detections to {output_path}")
else:
f.write("# No humans detected\\n")
logger.info(f"Saved empty detection file to {output_path}")
return True
except Exception as e:
logger.error(f"Failed to save detections to {output_path}: {e}")
return False
def load_detections_from_file(self, file_path: str) -> List[Dict[str, Any]]:
"""
Load detection results from file.
Args:
file_path: Path to detection file
Returns:
List of detection dictionaries
"""
detections = []
if not os.path.exists(file_path):
logger.warning(f"Detection file not found: {file_path}")
return detections
try:
with open(file_path, 'r') as f:
content = f.read()
# Handle files with literal \n characters
if '\\n' in content:
lines = content.split('\\n')
else:
lines = content.split('\n')
for line in lines:
line = line.strip()
# Skip comments and empty lines
if line.startswith('#') or not line:
continue
# Parse detection line: x1,y1,x2,y2,confidence
parts = line.split(',')
if len(parts) == 5:
try:
bbox = [float(x) for x in parts[:4]]
conf = float(parts[4])
detections.append({
'bbox': np.array(bbox),
'confidence': conf
})
except ValueError:
logger.warning(f"Invalid detection line: {line}")
continue
logger.info(f"Loaded {len(detections)} detections from {file_path}")
except Exception as e:
logger.error(f"Failed to load detections from {file_path}: {e}")
return detections
def debug_detect_with_lower_confidence(self, frame: np.ndarray, debug_confidence: float = 0.3) -> List[Dict[str, Any]]:
"""
Run YOLO detection with a lower confidence threshold for debugging.
This helps identify if detections are being missed due to high confidence threshold.
Args:
frame: Input frame (BGR format from OpenCV)
debug_confidence: Lower confidence threshold for debugging
Returns:
List of human detection dictionaries with lower confidence threshold
"""
logger.info(f"VR180 Debug: Running YOLO with lower confidence {debug_confidence} (vs normal {self.confidence_threshold})")
# Run YOLO detection with lower confidence
results = self.model(frame, conf=debug_confidence, verbose=False)
debug_detections = []
# Process results
for result in results:
boxes = result.boxes
if boxes is not None:
for box in boxes:
# Get class ID
cls = int(box.cls.cpu().numpy()[0])
# Check if it's a person (human_class_id)
if cls == self.human_class_id:
# Get bounding box coordinates (x1, y1, x2, y2)
coords = box.xyxy[0].cpu().numpy()
conf = float(box.conf.cpu().numpy()[0])
debug_detections.append({
'bbox': coords,
'confidence': conf
})
logger.info(f"VR180 Debug: Lower confidence detection found {len(debug_detections)} total detections")
return debug_detections
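    # Example call for the multi-frame helper below (video path and frame indices are
    # illustrative, e.g. one detection per second of 30 fps footage):
    #   per_frame = detector.detect_humans_multi_frame("segment_003.mp4", [0, 30, 60, 90], scale=0.5)
    #   # per_frame -> {0: [...], 30: [...], 60: [...], 90: [...]}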
def detect_humans_multi_frame(self, video_path: str, frame_indices: List[int],
scale: float = 1.0) -> Dict[int, List[Dict[str, Any]]]:
"""
Detect humans at multiple specific frame indices in a video.
Used for mid-segment re-detection to improve SAM2 tracking.
Args:
video_path: Path to video file
frame_indices: List of frame indices to run detection on (e.g., [0, 30, 60, 90])
scale: Scale factor for frame processing
Returns:
Dictionary mapping frame_index -> list of detection dictionaries
"""
if not frame_indices:
logger.warning("No frame indices provided for multi-frame detection")
return {}
if not os.path.exists(video_path):
logger.error(f"Video file not found: {video_path}")
return {}
logger.info(f"Mid-segment Detection: Running YOLO on {len(frame_indices)} frames: {frame_indices}")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
logger.error(f"Could not open video: {video_path}")
return {}
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
# Filter out frame indices that are beyond video length
valid_frame_indices = [idx for idx in frame_indices if 0 <= idx < total_frames]
if len(valid_frame_indices) != len(frame_indices):
invalid_frames = [idx for idx in frame_indices if idx not in valid_frame_indices]
logger.warning(f"Mid-segment Detection: Skipping invalid frame indices: {invalid_frames} (video has {total_frames} frames)")
multi_frame_detections = {}
for frame_idx in valid_frame_indices:
# Seek to specific frame
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
logger.warning(f"Mid-segment Detection: Could not read frame {frame_idx}")
continue
# Scale frame if needed
if scale != 1.0:
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)
# Run YOLO detection on this frame
detections = self.detect_humans_in_frame(frame)
multi_frame_detections[frame_idx] = detections
# Log detection results
time_seconds = frame_idx / fps
logger.info(f"Mid-segment Detection: Frame {frame_idx} (t={time_seconds:.1f}s): {len(detections)} humans detected")
for i, detection in enumerate(detections):
bbox = detection['bbox']
conf = detection['confidence']
logger.debug(f"Mid-segment Detection: Frame {frame_idx}, Human {i+1}: bbox={bbox}, conf={conf:.3f}")
cap.release()
total_detections = sum(len(dets) for dets in multi_frame_detections.values())
logger.info(f"Mid-segment Detection: Complete - {total_detections} total detections across {len(valid_frame_indices)} frames")
return multi_frame_detections
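    # Expected shape of each segments_info entry for the batch helper below
    # (paths are placeholders):
    #   {'index': 3, 'video_file': '/path/to/segment_003.mp4', 'directory': '/path/to/segment_003'}
    # Results are cached to a 'yolo_detections' file inside 'directory' and reloaded on reruns.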
    def process_segments_batch(self, segments_info: List[dict], detect_segments: Union[List[int], str],
                               scale: float = 0.5) -> Dict[int, List[Dict[str, Any]]]:
"""
Process multiple segments for human detection.
Args:
segments_info: List of segment information dictionaries
            detect_segments: List of segment indices to process, or the string 'all' to process every segment
scale: Scale factor for processing
Returns:
Dictionary mapping segment index to detection results
"""
results = {}
for segment_info in segments_info:
segment_idx = segment_info['index']
# Skip if not in detect_segments list
if detect_segments != 'all' and segment_idx not in detect_segments:
continue
video_path = segment_info['video_file']
detection_file = os.path.join(segment_info['directory'], "yolo_detections")
# Skip if already processed
if os.path.exists(detection_file):
logger.info(f"Segment {segment_idx} already has detections, skipping")
detections = self.load_detections_from_file(detection_file)
results[segment_idx] = detections
continue
# Run detection
logger.info(f"Processing segment {segment_idx} for human detection")
detections = self.detect_humans_in_video_first_frame(video_path, scale)
# Save results
self.save_detections_to_file(detections, detection_file)
results[segment_idx] = detections
return results
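    # SAM2 prompt structure produced by the converter below (values illustrative):
    #   {'obj_id': 1, 'bbox': np.array([x1, y1, x2, y2]), 'confidence': 0.91}
    # Object IDs 1 and 2 map to the first two detections sorted left-to-right across the SBS frame.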
def convert_detections_to_sam2_prompts(self, detections: List[Dict[str, Any]],
frame_width: int) -> List[Dict[str, Any]]:
"""
Convert YOLO detections to SAM2-compatible prompts for VR180 SBS video.
For VR180, we expect 2 real detections (left and right eye views), not mirrored ones.
Args:
detections: List of YOLO detection results
frame_width: Width of the video frame
Returns:
List of SAM2 prompt dictionaries with obj_id and bbox
"""
if not detections:
logger.warning("No detections provided for SAM2 prompt conversion")
return []
half_frame_width = frame_width // 2
prompts = []
logger.info(f"VR180 SBS Debug: Converting {len(detections)} detections for frame width {frame_width}")
logger.info(f"VR180 SBS Debug: Half frame width = {half_frame_width}")
# Sort detections by x-coordinate to get consistent left/right assignment
sorted_detections = sorted(detections, key=lambda x: x['bbox'][0])
# Analyze detections by frame half
left_detections = []
right_detections = []
for i, detection in enumerate(sorted_detections):
bbox = detection['bbox'].copy()
center_x = (bbox[0] + bbox[2]) / 2
pixel_range = f"{bbox[0]:.0f}-{bbox[2]:.0f}"
if center_x < half_frame_width:
left_detections.append((detection, i, pixel_range))
side = "LEFT"
else:
right_detections.append((detection, i, pixel_range))
side = "RIGHT"
logger.info(f"VR180 SBS Debug: Detection {i}: pixels {pixel_range}, center_x={center_x:.1f}, side={side}")
# VR180 SBS Format Validation
logger.info(f"VR180 SBS Debug: Found {len(left_detections)} LEFT detections, {len(right_detections)} RIGHT detections")
# Analyze confidence scores
if left_detections:
left_confidences = [det[0]['confidence'] for det in left_detections]
logger.info(f"VR180 SBS Debug: LEFT eye confidences: {[f'{c:.3f}' for c in left_confidences]}")
if right_detections:
right_confidences = [det[0]['confidence'] for det in right_detections]
logger.info(f"VR180 SBS Debug: RIGHT eye confidences: {[f'{c:.3f}' for c in right_confidences]}")
if len(right_detections) == 0:
logger.warning(f"VR180 SBS Warning: No detections found in RIGHT eye view (pixels {half_frame_width}-{frame_width})")
logger.warning(f"VR180 SBS Warning: This may indicate:")
logger.warning(f" 1. Person not visible in right eye view")
logger.warning(f" 2. YOLO confidence threshold ({self.confidence_threshold}) too high")
logger.warning(f" 3. VR180 SBS format issue")
logger.warning(f" 4. Right eye view quality/lighting problems")
logger.warning(f"VR180 SBS Suggestion: Try lowering yolo_confidence to 0.3-0.4 in config")
if len(left_detections) == 0:
logger.warning(f"VR180 SBS Warning: No detections found in LEFT eye view (pixels 0-{half_frame_width})")
# Additional validation for VR180 SBS expectations
total_detections = len(left_detections) + len(right_detections)
if total_detections == 1:
logger.warning(f"VR180 SBS Warning: Only 1 detection found - expected 2 for proper VR180 SBS")
elif total_detections > 2:
logger.warning(f"VR180 SBS Warning: {total_detections} detections found - will use only first 2")
# Assign object IDs sequentially, regardless of which half they're in
# This ensures we always get Object 1 and Object 2 for up to 2 detections
obj_id = 1
# Process up to 2 detections total (left + right combined)
all_detections = sorted_detections[:2]
for i, detection in enumerate(all_detections):
bbox = detection['bbox'].copy()
center_x = (bbox[0] + bbox[2]) / 2
pixel_range = f"{bbox[0]:.0f}-{bbox[2]:.0f}"
# Determine which eye view this detection is in
if center_x < half_frame_width:
eye_view = "LEFT"
else:
eye_view = "RIGHT"
prompts.append({
'obj_id': obj_id,
'bbox': bbox,
'confidence': detection['confidence']
})
logger.info(f"VR180 SBS Debug: Added {eye_view} eye detection as SAM2 Object {obj_id}")
logger.info(f"VR180 SBS Debug: Object {obj_id} bbox: {bbox} (pixels {pixel_range})")
obj_id += 1
logger.info(f"VR180 SBS Debug: Final result - {len(detections)} YOLO detections → {len(prompts)} SAM2 prompts")
# Verify we have the expected objects
obj_ids = [p['obj_id'] for p in prompts]
logger.info(f"VR180 SBS Debug: SAM2 Object IDs created: {obj_ids}")
return prompts
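    # Output structure of the mask converter below, mirroring SAM2's video_segments format:
    #   {frame_idx: {obj_id: bool_mask}}, e.g. {0: {1: left_eye_mask, 2: right_eye_mask}}
    # Only frame 0 is populated here; the result is shaped like SAM2 output so downstream
    # code can consume it unchanged.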
def convert_yolo_masks_to_video_segments(self, detections: List[Dict[str, Any]],
frame_width: int, target_frame_shape: Tuple[int, int] = None) -> Optional[Dict[int, Dict[int, np.ndarray]]]:
"""
Convert YOLO segmentation masks to SAM2-compatible video segments format.
This allows using YOLO masks directly without SAM2 processing.
Args:
detections: List of YOLO detection results with masks
frame_width: Width of the video frame for VR180 object ID assignment
target_frame_shape: Target shape (height, width) for mask resizing
Returns:
Video segments dictionary compatible with SAM2 output format, or None if no masks
"""
if not detections:
logger.warning("No detections provided for mask conversion")
return None
# Check if any detections have masks
detections_with_masks = [d for d in detections if d.get('has_mask', False)]
if not detections_with_masks:
logger.warning("No detections have masks - YOLO segmentation may not be working")
return None
logger.info(f"YOLO Mask Conversion: Converting {len(detections_with_masks)} YOLO masks to video segments format")
half_frame_width = frame_width // 2
video_segments = {}
# Create frame 0 with converted masks
frame_masks = {}
obj_id = 1
# Sort detections by x-coordinate for consistent VR180 SBS assignment
sorted_detections = sorted(detections_with_masks, key=lambda x: x['bbox'][0])
for i, detection in enumerate(sorted_detections[:2]): # Take up to 2 humans
mask = detection['mask']
bbox = detection['bbox']
center_x = (bbox[0] + bbox[2]) / 2
# Assign sequential object IDs (similar to prompt conversion logic)
current_obj_id = obj_id
# Determine which eye view for logging
if center_x < half_frame_width:
eye_view = "LEFT"
else:
eye_view = "RIGHT"
# Resize mask to target frame shape if specified
if target_frame_shape and mask.shape != target_frame_shape:
mask_resized = cv2.resize(mask.astype(np.float32), (target_frame_shape[1], target_frame_shape[0]), interpolation=cv2.INTER_NEAREST)
mask = (mask_resized > 0.5).astype(bool)
else:
mask = mask.astype(bool)
frame_masks[current_obj_id] = mask
logger.info(f"YOLO Mask Conversion: {eye_view} eye detection -> Object {current_obj_id}, mask_shape={mask.shape}, pixels={np.sum(mask)}")
obj_id += 1 # Always increment for next detection
# Store masks in video segments format (single frame)
video_segments[0] = frame_masks
total_objects = len(frame_masks)
total_pixels = sum(np.sum(mask) for mask in frame_masks.values())
logger.info(f"YOLO Mask Conversion: Created video segments with {total_objects} objects, {total_pixels} total mask pixels")
return video_segments
def save_debug_frame_with_detections(self, frame: np.ndarray, detections: List[Dict[str, Any]],
output_path: str, prompts: List[Dict[str, Any]] = None) -> bool:
"""
Save a debug frame with YOLO detections and SAM2 prompts overlaid as bounding boxes.
Args:
frame: Input frame (BGR format from OpenCV)
detections: List of detection dictionaries with bbox and confidence
output_path: Path to save the debug image
prompts: Optional list of SAM2 prompt dictionaries with obj_id and bbox
Returns:
True if saved successfully
"""
try:
debug_frame = frame.copy()
# Draw masks (if available) or bounding boxes for each detection
for i, detection in enumerate(detections):
bbox = detection['bbox']
confidence = detection['confidence']
has_mask = detection.get('has_mask', False)
# Extract coordinates
x1, y1, x2, y2 = map(int, bbox)
# Choose color based on confidence (green for high, yellow for medium, red for low)
if confidence >= 0.8:
color = (0, 255, 0) # Green
elif confidence >= 0.6:
color = (0, 255, 255) # Yellow
else:
color = (0, 0, 255) # Red
if has_mask and 'mask' in detection:
# Draw segmentation mask
mask = detection['mask']
# Resize mask to match frame if needed
if mask.shape != debug_frame.shape[:2]:
mask = cv2.resize(mask.astype(np.float32), (debug_frame.shape[1], debug_frame.shape[0]), interpolation=cv2.INTER_NEAREST)
mask = mask > 0.5
mask = mask.astype(bool)
# Apply colored overlay with transparency
overlay = debug_frame.copy()
overlay[mask] = color
cv2.addWeighted(overlay, 0.3, debug_frame, 0.7, 0, debug_frame)
# Draw mask outline
contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(debug_frame, contours, -1, color, 2)
# Prepare label text for segmentation
label = f"Person {i+1}: {confidence:.2f} (MASK)"
else:
# Draw bounding box (detection mode or no mask available)
cv2.rectangle(debug_frame, (x1, y1), (x2, y2), color, 2)
# Prepare label text for detection
label = f"Person {i+1}: {confidence:.2f} (BBOX)"
label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
# Draw label background
cv2.rectangle(debug_frame,
(x1, y1 - label_size[1] - 10),
(x1 + label_size[0], y1),
color, -1)
# Draw label text
cv2.putText(debug_frame, label,
(x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6,
(255, 255, 255), 2)
# Draw SAM2 prompts if provided (with different colors/style)
if prompts:
for prompt in prompts:
obj_id = prompt['obj_id']
bbox = prompt['bbox']
# Extract coordinates
x1, y1, x2, y2 = map(int, bbox)
# Use different colors for each object ID
if obj_id == 1:
prompt_color = (0, 255, 0) # Green for Object 1
elif obj_id == 2:
prompt_color = (255, 0, 0) # Blue for Object 2
else:
prompt_color = (255, 255, 0) # Cyan for others
# Draw thicker, dashed-style border for SAM2 prompts
thickness = 3
cv2.rectangle(debug_frame, (x1-2, y1-2), (x2+2, y2+2), prompt_color, thickness)
# Add SAM2 object ID label
sam_label = f"SAM2 Obj {obj_id}"
label_size = cv2.getTextSize(sam_label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
# Draw label background
cv2.rectangle(debug_frame,
(x1-2, y2+5),
(x1-2 + label_size[0], y2+5 + label_size[1] + 5),
prompt_color, -1)
# Draw label text
cv2.putText(debug_frame, sam_label,
(x1-2, y2+5 + label_size[1]),
cv2.FONT_HERSHEY_SIMPLEX, 0.5,
(255, 255, 255), 2)
# Draw VR180 SBS boundary line (center line separating left and right eye views)
frame_height, frame_width = debug_frame.shape[:2]
center_x = frame_width // 2
cv2.line(debug_frame, (center_x, 0), (center_x, frame_height), (0, 255, 255), 3) # Yellow line
# Add VR180 SBS labels
cv2.putText(debug_frame, "LEFT EYE", (10, frame_height - 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
cv2.putText(debug_frame, "RIGHT EYE", (center_x + 10, frame_height - 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
# Add summary text at top with mode information
mode_text = f"YOLO Mode: {self.mode.upper()}"
masks_available = sum(1 for d in detections if d.get('has_mask', False))
if self.supports_segmentation and masks_available > 0:
summary = f"VR180 SBS: {len(detections)} detections → {masks_available} MASKS (for SAM2 propagation)"
else:
summary = f"VR180 SBS: {len(detections)} detections → {len(prompts) if prompts else 0} SAM2 prompts"
cv2.putText(debug_frame, mode_text,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8,
(0, 255, 255), 2) # Yellow for mode
cv2.putText(debug_frame, summary,
(10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 1.0,
(255, 255, 255), 2)
# Add frame dimensions info
dims_info = f"Frame: {frame_width}x{frame_height}, Center: {center_x}"
cv2.putText(debug_frame, dims_info,
(10, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.6,
(255, 255, 255), 2)
# Save debug frame
success = cv2.imwrite(output_path, debug_frame)
if success:
logger.info(f"Saved YOLO debug frame to {output_path}")
else:
logger.error(f"Failed to save debug frame to {output_path}")
return success
except Exception as e:
logger.error(f"Error creating debug frame: {e}")
return False
def detect_humans_in_single_eye(self, frame: np.ndarray, eye_side: str) -> List[Dict[str, Any]]:
"""
Detect humans in a single eye frame (left or right).
Args:
frame: Input eye frame (BGR format)
eye_side: 'left' or 'right' eye
Returns:
List of human detection dictionaries for the single eye
"""
logger.info(f"Running YOLO detection on {eye_side} eye frame")
# Run standard detection on the eye frame
detections = self.detect_humans_in_frame(frame)
logger.info(f"YOLO {eye_side.upper()} Eye: Found {len(detections)} human detections")
for i, detection in enumerate(detections):
bbox = detection['bbox']
conf = detection['confidence']
has_mask = detection.get('has_mask', False)
logger.debug(f"YOLO {eye_side.upper()} Eye Detection {i+1}: bbox={bbox}, conf={conf:.3f}, has_mask={has_mask}")
return detections
def convert_eye_detections_to_sam2_prompts(self, detections: List[Dict[str, Any]],
eye_side: str) -> List[Dict[str, Any]]:
"""
Convert single eye detections to SAM2 prompts (always uses obj_id=1 for single eye processing).
Args:
detections: List of YOLO detection results for single eye
eye_side: 'left' or 'right' eye
Returns:
List of SAM2 prompt dictionaries with obj_id=1 for single eye processing
"""
if not detections:
logger.warning(f"No detections provided for {eye_side} eye SAM2 prompt conversion")
return []
logger.info(f"Converting {len(detections)} {eye_side} eye detections to SAM2 prompts")
prompts = []
# For single eye processing, always use obj_id=1 and take the best detection
best_detection = max(detections, key=lambda x: x['confidence'])
prompts.append({
'obj_id': 1, # Always use obj_id=1 for single eye processing
'bbox': best_detection['bbox'].copy(),
'confidence': best_detection['confidence']
})
logger.info(f"{eye_side.upper()} Eye: Converted best detection (conf={best_detection['confidence']:.3f}) to SAM2 Object 1")
return prompts
def has_any_detections(self, detections_list: List[List[Dict[str, Any]]]) -> bool:
"""
Check if any detections exist in a list of detection lists.
Args:
detections_list: List of detection lists (e.g., [left_detections, right_detections])
Returns:
True if any detections are found
"""
for detections in detections_list:
if detections:
return True
return False
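    # Worked example for the splitter below (numbers illustrative): with frame_width=3840 the
    # half_width is 1920, so a right-eye bbox with x1=2400, x2=2650 becomes x1=480, x2=730 after
    # the shift; left-eye bboxes keep their original coordinates.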
def split_detections_by_eye(self, detections: List[Dict[str, Any]], frame_width: int) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Split VR180 detections into left and right eye detections with coordinate conversion.
Args:
detections: List of full-frame VR180 detections
frame_width: Width of the full VR180 frame
Returns:
Tuple of (left_eye_detections, right_eye_detections) with converted coordinates
"""
half_width = frame_width // 2
left_detections = []
right_detections = []
logger.info(f"Splitting {len(detections)} VR180 detections by eye (frame_width={frame_width}, half_width={half_width})")
for i, detection in enumerate(detections):
bbox = detection['bbox']
center_x = (bbox[0] + bbox[2]) / 2
logger.info(f"Detection {i}: bbox={bbox}, center_x={center_x:.1f}")
# Create a copy with converted coordinates
converted_detection = detection.copy()
converted_bbox = bbox.copy()
if center_x < half_width:
# Left eye detection - coordinates remain the same
# For segmentation mode, we also need to crop the mask to the left eye
if detection.get('has_mask', False) and 'mask' in detection:
original_mask = detection['mask']
# Crop mask to left half (keep original coordinates for now, will be handled in eye processing)
converted_detection['mask'] = original_mask
logger.info(f"Detection {i}: LEFT eye mask shape: {original_mask.shape}")
left_detections.append(converted_detection)
logger.info(f"Detection {i}: Assigned to LEFT eye, center_x={center_x:.1f} < {half_width}, bbox={bbox}")
else:
# Right eye detection - shift coordinates to start from 0
original_bbox = converted_bbox.copy()
converted_bbox[0] -= half_width # x1
converted_bbox[2] -= half_width # x2
# Ensure coordinates are within bounds
converted_bbox[0] = max(0, converted_bbox[0])
converted_bbox[2] = max(0, min(converted_bbox[2], half_width))
converted_detection['bbox'] = converted_bbox
# For segmentation mode, we also need to crop the mask to the right eye
if detection.get('has_mask', False) and 'mask' in detection:
original_mask = detection['mask']
# Crop mask to right half and shift coordinates
# Note: This is a simplified approach - the mask coordinates need to be handled properly
converted_detection['mask'] = original_mask # Will be properly handled in eye processing
logger.info(f"Detection {i}: RIGHT eye mask shape: {original_mask.shape}")
right_detections.append(converted_detection)
logger.info(f"Detection {i}: Assigned to RIGHT eye, center_x={center_x:.1f} >= {half_width}, original_bbox={original_bbox}, converted_bbox={converted_bbox}")
logger.info(f"Split result: {len(left_detections)} left eye, {len(right_detections)} right eye detections")
return left_detections, right_detections
def save_eye_debug_frames(self, left_frame: np.ndarray, right_frame: np.ndarray,
left_detections: List[Dict[str, Any]], right_detections: List[Dict[str, Any]],
left_output_path: str, right_output_path: str) -> Tuple[bool, bool]:
"""
Save debug frames for both left and right eye detections.
Args:
left_frame: Left eye frame
right_frame: Right eye frame
left_detections: Left eye detections
right_detections: Right eye detections
left_output_path: Output path for left eye debug frame
right_output_path: Output path for right eye debug frame
Returns:
Tuple of (left_success, right_success)
"""
logger.info(f"Saving eye-specific debug frames")
# Save left eye debug frame (eye-specific version)
left_success = self._save_single_eye_debug_frame(
left_frame, left_detections, left_output_path, "LEFT"
)
# Save right eye debug frame (eye-specific version)
right_success = self._save_single_eye_debug_frame(
right_frame, right_detections, right_output_path, "RIGHT"
)
if left_success:
logger.info(f"Saved left eye debug frame: {left_output_path}")
if right_success:
logger.info(f"Saved right eye debug frame: {right_output_path}")
return left_success, right_success
def _save_single_eye_debug_frame(self, frame: np.ndarray, detections: List[Dict[str, Any]],
output_path: str, eye_side: str) -> bool:
"""
Save a debug frame for a single eye with eye-specific visualizations.
Args:
frame: Single eye frame (BGR format from OpenCV)
detections: List of detection dictionaries for this eye
output_path: Path to save the debug image
eye_side: "LEFT" or "RIGHT"
Returns:
True if saved successfully
"""
try:
debug_frame = frame.copy()
# Draw masks or bounding boxes for each detection
for i, detection in enumerate(detections):
bbox = detection['bbox']
confidence = detection['confidence']
has_mask = detection.get('has_mask', False)
# Extract coordinates
x1, y1, x2, y2 = map(int, bbox)
# Choose color based on confidence (green for high, yellow for medium, red for low)
if confidence >= 0.8:
color = (0, 255, 0) # Green
elif confidence >= 0.6:
color = (0, 255, 255) # Yellow
else:
color = (0, 0, 255) # Red
if has_mask and 'mask' in detection:
# Draw segmentation mask
mask = detection['mask']
# Resize mask to match frame if needed
if mask.shape != debug_frame.shape[:2]:
mask = cv2.resize(mask.astype(np.float32), (debug_frame.shape[1], debug_frame.shape[0]), interpolation=cv2.INTER_NEAREST)
mask = mask > 0.5
mask = mask.astype(bool)
# Apply colored overlay with transparency
overlay = debug_frame.copy()
overlay[mask] = color
cv2.addWeighted(overlay, 0.3, debug_frame, 0.7, 0, debug_frame)
# Draw mask outline
contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(debug_frame, contours, -1, color, 2)
# Prepare label text for segmentation
label = f"Person {i+1}: {confidence:.2f} (MASK)"
else:
# Draw bounding box (detection mode or no mask available)
cv2.rectangle(debug_frame, (x1, y1), (x2, y2), color, 2)
# Prepare label text for detection
label = f"Person {i+1}: {confidence:.2f} (BBOX)"
label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
# Draw label background
cv2.rectangle(debug_frame,
(x1, y1 - label_size[1] - 10),
(x1 + label_size[0], y1),
color, -1)
# Draw label text
cv2.putText(debug_frame, label,
(x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6,
(255, 255, 255), 2)
# Add title specific to this eye
frame_height, frame_width = debug_frame.shape[:2]
title = f"{eye_side} EYE: {len(detections)} detections"
cv2.putText(debug_frame, title, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
# Add mode information
mode_text = f"YOLO Mode: {self.mode.upper()}"
masks_available = sum(1 for d in detections if d.get('has_mask', False))
if self.supports_segmentation and masks_available > 0:
summary = f"{len(detections)} detections → {masks_available} MASKS"
else:
summary = f"{len(detections)} detections → BOUNDING BOXES"
cv2.putText(debug_frame, mode_text,
(10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.8,
(0, 255, 255), 2) # Yellow for mode
cv2.putText(debug_frame, summary,
(10, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.8,
(255, 255, 255), 2)
# Add frame dimensions info
dims_info = f"Frame: {frame_width}x{frame_height}"
cv2.putText(debug_frame, dims_info,
(10, 120),
cv2.FONT_HERSHEY_SIMPLEX, 0.6,
(255, 255, 255), 2)
# Save debug frame
success = cv2.imwrite(output_path, debug_frame)
if success:
logger.info(f"Saved {eye_side} eye debug frame to {output_path}")
else:
logger.error(f"Failed to save {eye_side} eye debug frame to {output_path}")
return success
except Exception as e:
logger.error(f"Error creating {eye_side} eye debug frame: {e}")
return False