"""Person detector using YOLOv8 for streaming pipeline."""

import warnings
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

try:
    from ultralytics import YOLO
except ImportError:
    warnings.warn(
        "Ultralytics YOLO not installed. "
        "Please install with: pip install ultralytics"
    )
    YOLO = None


class PersonDetector:
    """YOLO-based person detector for VR180 streaming.

    Wraps an Ultralytics YOLO model, keeps only detections whose class id
    is ``PERSON_CLASS_ID`` and maintains running detection statistics.
    """

    # Class id treated as "person" (class 0 in COCO).
    PERSON_CLASS_ID = 0

    def __init__(self, config: Dict[str, Any]):
        """Read the ``detection`` section of *config* and load the model.

        Args:
            config: Configuration mapping. The optional ``'detection'``
                sub-mapping may provide ``'confidence_threshold'``
                (default 0.7), ``'model'`` (default ``'yolov8n'``) and
                ``'device'`` (default ``'cuda'``).

        Raises:
            RuntimeError: If ultralytics is missing or the model fails
                to load.
        """
        self.config = config

        # Look the section up once; tolerate an explicit ``None`` section.
        detection_cfg = config.get('detection') or {}
        self.confidence_threshold = detection_cfg.get('confidence_threshold', 0.7)
        self.model_name = detection_cfg.get('model', 'yolov8n')
        self.device = detection_cfg.get('device', 'cuda')

        self.model = None
        self._load_model()

        # Statistics
        self.reset_stats()

    def _load_model(self) -> None:
        """Load the pretrained YOLO weights and move them to ``self.device``.

        Raises:
            RuntimeError: If ultralytics is not installed, or if creating
                the model or moving it to the device fails (the original
                exception is chained as the cause).
        """
        if YOLO is None:
            raise RuntimeError("YOLO not available. Please install ultralytics.")

        try:
            # Load pretrained model
            model = YOLO(f"{self.model_name}.pt")
            model.to(self.device)
        except Exception as e:
            raise RuntimeError(f"Failed to load YOLO model: {e}") from e

        # Publish the model only once it is fully set up.
        self.model = model

        print("🎯 Person detector initialized:")
        print(f" Model: {self.model_name}")
        print(f" Device: {self.device}")
        print(f" Confidence threshold: {self.confidence_threshold}")

    def _extract_person_detections(self, result: Any) -> List[Dict[str, Any]]:
        """Convert one YOLO result into a list of person detection dicts.

        Args:
            result: A single result object exposing a ``boxes`` attribute
                (possibly ``None``) whose items provide ``cls``, ``conf``
                and ``xyxy``.

        Returns:
            One dict per person box with ``'box'`` (int ``[x1, y1, x2, y2]``),
            ``'confidence'``, ``'area'`` and ``'center'``; all values are
            plain Python numbers.
        """
        boxes = result.boxes
        if boxes is None:
            return []

        detections = []
        for box in boxes:
            if int(box.cls) != self.PERSON_CLASS_ID:
                continue

            # ``tolist()`` yields native floats, so 'area' and 'center'
            # do not leak NumPy scalar types to callers.
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().tolist()
            detections.append({
                'box': [int(x1), int(y1), int(x2), int(y2)],
                'confidence': float(box.conf),
                'area': (x2 - x1) * (y2 - y1),
                'center': [(x1 + x2) / 2, (y1 + y2) / 2],
            })
        return detections

    def _record_stats(self, frame_count: int, detection_count: int) -> None:
        """Add processed frames/detections to the running statistics."""
        stats = self.stats
        stats['frames_processed'] += frame_count
        stats['total_detections'] += detection_count
        if stats['frames_processed']:
            stats['avg_detections_per_frame'] = (
                stats['total_detections'] / stats['frames_processed']
            )

    def detect_persons(self, frame: np.ndarray) -> List[Dict[str, Any]]:
        """Detect persons in a single frame.

        Args:
            frame: Input frame (BGR).

        Returns:
            List of detection dictionaries with 'box', 'confidence',
            'area' and 'center' keys. Empty when no model is loaded.
        """
        if self.model is None:
            return []

        # Run detection
        results = self.model(frame, verbose=False, conf=self.confidence_threshold)

        detections: List[Dict[str, Any]] = []
        for result in results:
            detections.extend(self._extract_person_detections(result))

        self._record_stats(1, len(detections))
        return detections

    def detect_persons_batch(self, frames: List[np.ndarray]) -> List[List[Dict[str, Any]]]:
        """Detect persons in a batch of frames.

        Args:
            frames: List of frames.

        Returns:
            One detection list per input frame, in input order. An empty
            batch yields ``[]``; when no model is loaded every frame gets
            an empty detection list so the result stays aligned with
            *frames*.
        """
        if not frames:
            return []
        if self.model is None:
            return [[] for _ in frames]

        # Process batch
        results_batch = self.model(frames, verbose=False, conf=self.confidence_threshold)
        all_detections = [
            self._extract_person_detections(result) for result in results_batch
        ]

        self._record_stats(len(frames), sum(len(d) for d in all_detections))
        return all_detections

    def filter_detections(self, detections: List[Dict[str, Any]],
                          min_area: Optional[float] = None,
                          max_detections: Optional[int] = None) -> List[Dict[str, Any]]:
        """Filter detections based on criteria.

        Args:
            detections: List of detections (not modified).
            min_area: Minimum bounding box area; ``None`` disables the check.
            max_detections: Maximum number of detections to keep; when the
                list is longer, the highest-confidence ones are kept.

        Returns:
            Filtered detections as a new list.
        """
        filtered = detections.copy()

        # Filter by minimum area
        if min_area is not None:
            filtered = [d for d in filtered if d['area'] >= min_area]

        # Sort by confidence and keep top N
        if max_detections is not None and len(filtered) > max_detections:
            filtered = sorted(filtered, key=lambda d: d['confidence'], reverse=True)
            filtered = filtered[:max_detections]

        return filtered

    def convert_to_sam_prompts(
        self, detections: List[Dict[str, Any]]
    ) -> Tuple[List[List[int]], List[int]]:
        """Convert detections to SAM2 prompt format.

        Args:
            detections: List of detections.

        Returns:
            Tuple of (boxes, labels) for SAM2; both empty when there are
            no detections.
        """
        if not detections:
            return [], []

        boxes = [d['box'] for d in detections]
        # All detections are positive prompts (label=1)
        labels = [1] * len(detections)
        return boxes, labels

    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the detection statistics."""
        return self.stats.copy()

    def reset_stats(self) -> None:
        """Reset statistics to their initial zero values."""
        self.stats = {
            'frames_processed': 0,
            'total_detections': 0,
            'avg_detections_per_frame': 0.0,
        }

    def warmup(self, input_shape: tuple = (1080, 1920, 3)) -> None:
        """Warm up the model with one inference on an all-zero frame.

        The dummy inference goes through ``detect_persons`` and therefore
        counts as a processed frame in the statistics.

        Args:
            input_shape: Shape of input frames.
        """
        if self.model is None:
            return

        print("🔥 Warming up detector...")
        dummy_frame = np.zeros(input_shape, dtype=np.uint8)
        _ = self.detect_persons(dummy_frame)
        print(" Detector ready!")

    def set_confidence_threshold(self, threshold: float) -> None:
        """Update the confidence threshold, clamped to [0.1, 0.99]."""
        self.confidence_threshold = max(0.1, min(0.99, threshold))

    def __del__(self):
        """Drop the model reference on cleanup."""
        self.model = None