# test2/vr180_streaming/detector.py
"""
Person detector using YOLOv8 for streaming pipeline
"""
import numpy as np
from typing import List, Dict, Any, Optional
import warnings
try:
from ultralytics import YOLO
except ImportError:
warnings.warn("Ultralytics YOLO not installed. Please install with: pip install ultralytics")
YOLO = None


class PersonDetector:
    """YOLO-based person detector for VR180 streaming"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.confidence_threshold = config.get('detection', {}).get('confidence_threshold', 0.7)
        self.model_name = config.get('detection', {}).get('model', 'yolov8n')
        self.device = config.get('detection', {}).get('device', 'cuda')
        self.model = None
        self._load_model()

        # Statistics
        self.stats = {
            'frames_processed': 0,
            'total_detections': 0,
            'avg_detections_per_frame': 0.0
        }

    def _load_model(self) -> None:
        """Load YOLO model"""
        if YOLO is None:
            raise RuntimeError("YOLO not available. Please install ultralytics.")

        try:
            # Load pretrained model
            model_file = f"{self.model_name}.pt"
            self.model = YOLO(model_file)
            self.model.to(self.device)

            print("🎯 Person detector initialized:")
            print(f"   Model: {self.model_name}")
            print(f"   Device: {self.device}")
            print(f"   Confidence threshold: {self.confidence_threshold}")
        except Exception as e:
            raise RuntimeError(f"Failed to load YOLO model: {e}")

    def detect_persons(self, frame: np.ndarray) -> List[Dict[str, Any]]:
        """
        Detect persons in frame

        Args:
            frame: Input frame (BGR)

        Returns:
            List of detection dictionaries with 'box', 'confidence' keys
        """
        if self.model is None:
            return []

        # Run detection
        results = self.model(frame, verbose=False, conf=self.confidence_threshold)

        detections = []
        for r in results:
            if r.boxes is not None:
                for box in r.boxes:
                    # Check if detection is person (class 0 in COCO)
                    if int(box.cls) == 0:
                        # Get box coordinates
                        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                        confidence = float(box.conf)

                        detection = {
                            'box': [int(x1), int(y1), int(x2), int(y2)],
                            'confidence': confidence,
                            'area': (x2 - x1) * (y2 - y1),
                            'center': [(x1 + x2) / 2, (y1 + y2) / 2]
                        }
                        detections.append(detection)

        # Update statistics
        self.stats['frames_processed'] += 1
        self.stats['total_detections'] += len(detections)
        self.stats['avg_detections_per_frame'] = (
            self.stats['total_detections'] / self.stats['frames_processed']
        )

        return detections

    def detect_persons_batch(self, frames: List[np.ndarray]) -> List[List[Dict[str, Any]]]:
        """
        Detect persons in a batch of frames

        Args:
            frames: List of frames

        Returns:
            List of detection lists, one per frame
        """
        if not frames or self.model is None:
            return []

        # Process batch
        results_batch = self.model(frames, verbose=False, conf=self.confidence_threshold)

        all_detections = []
        for results in results_batch:
            frame_detections = []
            if results.boxes is not None:
                for box in results.boxes:
                    if int(box.cls) == 0:  # Person class
                        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                        confidence = float(box.conf)

                        detection = {
                            'box': [int(x1), int(y1), int(x2), int(y2)],
                            'confidence': confidence,
                            'area': (x2 - x1) * (y2 - y1),
                            'center': [(x1 + x2) / 2, (y1 + y2) / 2]
                        }
                        frame_detections.append(detection)
            all_detections.append(frame_detections)

        # Update statistics
        self.stats['frames_processed'] += len(frames)
        self.stats['total_detections'] += sum(len(d) for d in all_detections)
        self.stats['avg_detections_per_frame'] = (
            self.stats['total_detections'] / self.stats['frames_processed']
        )

        return all_detections

    def filter_detections(self,
                          detections: List[Dict[str, Any]],
                          min_area: Optional[float] = None,
                          max_detections: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Filter detections based on criteria

        Args:
            detections: List of detections
            min_area: Minimum bounding box area
            max_detections: Maximum number of detections to keep

        Returns:
            Filtered detections
        """
        filtered = detections.copy()

        # Filter by minimum area
        if min_area is not None:
            filtered = [d for d in filtered if d['area'] >= min_area]

        # Sort by confidence and keep top N
        if max_detections is not None and len(filtered) > max_detections:
            filtered = sorted(filtered, key=lambda x: x['confidence'], reverse=True)
            filtered = filtered[:max_detections]

        return filtered

    def convert_to_sam_prompts(self,
                               detections: List[Dict[str, Any]]) -> tuple:
        """
        Convert detections to SAM2 prompt format

        Args:
            detections: List of detections

        Returns:
            Tuple of (boxes, labels) for SAM2
        """
        if not detections:
            return [], []

        boxes = [d['box'] for d in detections]
        # All detections are positive prompts (label=1)
        labels = [1] * len(detections)

        return boxes, labels

    def get_stats(self) -> Dict[str, Any]:
        """Get detection statistics"""
        return self.stats.copy()

    def reset_stats(self) -> None:
        """Reset statistics"""
        self.stats = {
            'frames_processed': 0,
            'total_detections': 0,
            'avg_detections_per_frame': 0.0
        }

    def warmup(self, input_shape: tuple = (1080, 1920, 3)) -> None:
        """
        Warmup model with dummy inference

        Args:
            input_shape: Shape of input frames
        """
        if self.model is None:
            return

        print("🔥 Warming up detector...")
        dummy_frame = np.zeros(input_shape, dtype=np.uint8)
        _ = self.detect_persons(dummy_frame)
        print("   Detector ready!")

    def set_confidence_threshold(self, threshold: float) -> None:
        """Update confidence threshold"""
        self.confidence_threshold = max(0.1, min(0.99, threshold))

    def __del__(self):
        """Cleanup"""
        self.model = None
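

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the streaming
# pipeline). It assumes the nested 'detection' config layout read in
# __init__ above and feeds a blank frame, so it would normally report zero
# detections; the real pipeline would pass decoded VR180 frames instead.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    example_config = {
        'detection': {
            'model': 'yolov8n',
            'device': 'cpu',  # assumption: CPU chosen here for portability
            'confidence_threshold': 0.5,
        }
    }

    detector = PersonDetector(example_config)
    detector.warmup()

    # Run a single-frame detection, filter, and convert to SAM2 prompts
    frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
    detections = detector.detect_persons(frame)
    detections = detector.filter_detections(detections, min_area=1000.0, max_detections=4)

    boxes, labels = detector.convert_to_sam_prompts(detections)
    print(f"Detections: {len(detections)}, SAM prompt boxes: {len(boxes)}")
    print(detector.get_stats())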