223 lines
7.5 KiB
Python
223 lines
7.5 KiB
Python
"""
|
|
Person detector using YOLOv8 for streaming pipeline
|
|
"""
|
|
|
|
import numpy as np
|
|
from typing import List, Dict, Any, Optional
|
|
import warnings
|
|
|
|
try:
|
|
from ultralytics import YOLO
|
|
except ImportError:
|
|
warnings.warn("Ultralytics YOLO not installed. Please install with: pip install ultralytics")
|
|
YOLO = None
|
|
|
|
|
|
class PersonDetector:
|
|
"""YOLO-based person detector for VR180 streaming"""
|
|
|
|
def __init__(self, config: Dict[str, Any]):
|
|
self.config = config
|
|
self.confidence_threshold = config.get('detection', {}).get('confidence_threshold', 0.7)
|
|
self.model_name = config.get('detection', {}).get('model', 'yolov8n')
|
|
self.device = config.get('detection', {}).get('device', 'cuda')
|
|
|
|
self.model = None
|
|
self._load_model()
|
|
|
|
# Statistics
|
|
self.stats = {
|
|
'frames_processed': 0,
|
|
'total_detections': 0,
|
|
'avg_detections_per_frame': 0.0
|
|
}
|
|
|
|
def _load_model(self) -> None:
|
|
"""Load YOLO model"""
|
|
if YOLO is None:
|
|
raise RuntimeError("YOLO not available. Please install ultralytics.")
|
|
|
|
try:
|
|
# Load pretrained model
|
|
model_file = f"{self.model_name}.pt"
|
|
self.model = YOLO(model_file)
|
|
self.model.to(self.device)
|
|
|
|
print(f"🎯 Person detector initialized:")
|
|
print(f" Model: {self.model_name}")
|
|
print(f" Device: {self.device}")
|
|
print(f" Confidence threshold: {self.confidence_threshold}")
|
|
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to load YOLO model: {e}")
|
|
|
|
def detect_persons(self, frame: np.ndarray) -> List[Dict[str, Any]]:
|
|
"""
|
|
Detect persons in frame
|
|
|
|
Args:
|
|
frame: Input frame (BGR)
|
|
|
|
Returns:
|
|
List of detection dictionaries with 'box', 'confidence' keys
|
|
"""
|
|
if self.model is None:
|
|
return []
|
|
|
|
# Run detection
|
|
results = self.model(frame, verbose=False, conf=self.confidence_threshold)
|
|
|
|
detections = []
|
|
for r in results:
|
|
if r.boxes is not None:
|
|
for box in r.boxes:
|
|
# Check if detection is person (class 0 in COCO)
|
|
if int(box.cls) == 0:
|
|
# Get box coordinates
|
|
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
|
|
confidence = float(box.conf)
|
|
|
|
detection = {
|
|
'box': [int(x1), int(y1), int(x2), int(y2)],
|
|
'confidence': confidence,
|
|
'area': (x2 - x1) * (y2 - y1),
|
|
'center': [(x1 + x2) / 2, (y1 + y2) / 2]
|
|
}
|
|
detections.append(detection)
|
|
|
|
# Update statistics
|
|
self.stats['frames_processed'] += 1
|
|
self.stats['total_detections'] += len(detections)
|
|
self.stats['avg_detections_per_frame'] = (
|
|
self.stats['total_detections'] / self.stats['frames_processed']
|
|
)
|
|
|
|
return detections
|
|
|
|
def detect_persons_batch(self, frames: List[np.ndarray]) -> List[List[Dict[str, Any]]]:
|
|
"""
|
|
Detect persons in batch of frames
|
|
|
|
Args:
|
|
frames: List of frames
|
|
|
|
Returns:
|
|
List of detection lists
|
|
"""
|
|
if not frames or self.model is None:
|
|
return []
|
|
|
|
# Process batch
|
|
results_batch = self.model(frames, verbose=False, conf=self.confidence_threshold)
|
|
|
|
all_detections = []
|
|
for results in results_batch:
|
|
frame_detections = []
|
|
|
|
if results.boxes is not None:
|
|
for box in results.boxes:
|
|
if int(box.cls) == 0: # Person class
|
|
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
|
|
confidence = float(box.conf)
|
|
|
|
detection = {
|
|
'box': [int(x1), int(y1), int(x2), int(y2)],
|
|
'confidence': confidence,
|
|
'area': (x2 - x1) * (y2 - y1),
|
|
'center': [(x1 + x2) / 2, (y1 + y2) / 2]
|
|
}
|
|
frame_detections.append(detection)
|
|
|
|
all_detections.append(frame_detections)
|
|
|
|
# Update statistics
|
|
self.stats['frames_processed'] += len(frames)
|
|
self.stats['total_detections'] += sum(len(d) for d in all_detections)
|
|
self.stats['avg_detections_per_frame'] = (
|
|
self.stats['total_detections'] / self.stats['frames_processed']
|
|
)
|
|
|
|
return all_detections
|
|
|
|
def filter_detections(self,
|
|
detections: List[Dict[str, Any]],
|
|
min_area: Optional[float] = None,
|
|
max_detections: Optional[int] = None) -> List[Dict[str, Any]]:
|
|
"""
|
|
Filter detections based on criteria
|
|
|
|
Args:
|
|
detections: List of detections
|
|
min_area: Minimum bounding box area
|
|
max_detections: Maximum number of detections to keep
|
|
|
|
Returns:
|
|
Filtered detections
|
|
"""
|
|
filtered = detections.copy()
|
|
|
|
# Filter by minimum area
|
|
if min_area is not None:
|
|
filtered = [d for d in filtered if d['area'] >= min_area]
|
|
|
|
# Sort by confidence and keep top N
|
|
if max_detections is not None and len(filtered) > max_detections:
|
|
filtered = sorted(filtered, key=lambda x: x['confidence'], reverse=True)
|
|
filtered = filtered[:max_detections]
|
|
|
|
return filtered
|
|
|
|
def convert_to_sam_prompts(self,
|
|
detections: List[Dict[str, Any]]) -> tuple:
|
|
"""
|
|
Convert detections to SAM2 prompt format
|
|
|
|
Args:
|
|
detections: List of detections
|
|
|
|
Returns:
|
|
Tuple of (boxes, labels) for SAM2
|
|
"""
|
|
if not detections:
|
|
return [], []
|
|
|
|
boxes = [d['box'] for d in detections]
|
|
# All detections are positive prompts (label=1)
|
|
labels = [1] * len(detections)
|
|
|
|
return boxes, labels
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""Get detection statistics"""
|
|
return self.stats.copy()
|
|
|
|
def reset_stats(self) -> None:
|
|
"""Reset statistics"""
|
|
self.stats = {
|
|
'frames_processed': 0,
|
|
'total_detections': 0,
|
|
'avg_detections_per_frame': 0.0
|
|
}
|
|
|
|
def warmup(self, input_shape: tuple = (1080, 1920, 3)) -> None:
|
|
"""
|
|
Warmup model with dummy inference
|
|
|
|
Args:
|
|
input_shape: Shape of input frames
|
|
"""
|
|
if self.model is None:
|
|
return
|
|
|
|
print("🔥 Warming up detector...")
|
|
dummy_frame = np.zeros(input_shape, dtype=np.uint8)
|
|
_ = self.detect_persons(dummy_frame)
|
|
print(" Detector ready!")
|
|
|
|
def set_confidence_threshold(self, threshold: float) -> None:
|
|
"""Update confidence threshold"""
|
|
self.confidence_threshold = max(0.1, min(0.99, threshold))
|
|
|
|
def __del__(self):
|
|
"""Cleanup"""
|
|
self.model = None |