streaming part1
This commit is contained in:
223
vr180_streaming/detector.py
Normal file
223
vr180_streaming/detector.py
Normal file
@@ -0,0 +1,223 @@
|
||||
"""
|
||||
Person detector using YOLOv8 for streaming pipeline
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
from typing import List, Dict, Any, Optional
|
||||
import warnings
|
||||
|
||||
try:
|
||||
from ultralytics import YOLO
|
||||
except ImportError:
|
||||
warnings.warn("Ultralytics YOLO not installed. Please install with: pip install ultralytics")
|
||||
YOLO = None
|
||||
|
||||
|
||||
class PersonDetector:
    """YOLO-based person detector for VR180 streaming.

    Wraps a YOLO model, keeps only detections of the COCO "person" class and
    maintains running statistics about the frames it has processed.
    """

    # Class index of "person" in COCO, which the pretrained models use.
    _PERSON_CLASS_ID = 0

    def __init__(self, config: Dict[str, Any]):
        """Read the detection settings from *config* and load the model.

        Args:
            config: Configuration mapping. Its optional ``'detection'``
                section may provide ``'confidence_threshold'`` (default 0.7),
                ``'model'`` (default ``'yolov8n'``) and ``'device'``
                (default ``'cuda'``).

        Raises:
            RuntimeError: If ultralytics is not installed or the model
                cannot be loaded.
        """
        self.config = config
        detection_cfg = config.get('detection', {})
        self.confidence_threshold = detection_cfg.get('confidence_threshold', 0.7)
        self.model_name = detection_cfg.get('model', 'yolov8n')
        self.device = detection_cfg.get('device', 'cuda')

        self.model = None
        self._load_model()

        # Statistics
        self.stats = self._empty_stats()

    @staticmethod
    def _empty_stats() -> Dict[str, Any]:
        """Return a fresh statistics dictionary with all counters at zero."""
        return {
            'frames_processed': 0,
            'total_detections': 0,
            'avg_detections_per_frame': 0.0,
        }

    def _load_model(self) -> None:
        """Load the YOLO weights named by ``self.model_name`` onto ``self.device``.

        Raises:
            RuntimeError: If ultralytics is not installed, or if loading the
                weights or moving the model to the device fails (the original
                exception is chained as the cause).
        """
        if YOLO is None:
            raise RuntimeError("YOLO not available. Please install ultralytics.")

        try:
            # Load pretrained model
            model_file = f"{self.model_name}.pt"
            self.model = YOLO(model_file)
            self.model.to(self.device)

            print("🎯 Person detector initialized:")
            print(f" Model: {self.model_name}")
            print(f" Device: {self.device}")
            print(f" Confidence threshold: {self.confidence_threshold}")
        except Exception as e:
            raise RuntimeError(f"Failed to load YOLO model: {e}") from e

    def _extract_person_detections(self, result: Any) -> List[Dict[str, Any]]:
        """Convert one YOLO result object into person detection dictionaries.

        Boxes whose class is not "person" are skipped. All numeric values are
        converted to built-in ``int``/``float`` so the dictionaries do not
        carry NumPy scalars (which, for example, cannot be JSON-serialized).
        """
        if result.boxes is None:
            return []

        detections = []
        for box in result.boxes:
            if int(box.cls) != self._PERSON_CLASS_ID:
                continue

            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            detections.append({
                'box': [int(x1), int(y1), int(x2), int(y2)],
                'confidence': float(box.conf),
                'area': float((x2 - x1) * (y2 - y1)),
                'center': [float((x1 + x2) / 2), float((y1 + y2) / 2)],
            })
        return detections

    def _record_stats(self, frame_count: int, detection_count: int) -> None:
        """Add one inference call's counts to the running statistics."""
        stats = self.stats
        stats['frames_processed'] += frame_count
        stats['total_detections'] += detection_count
        if stats['frames_processed']:
            stats['avg_detections_per_frame'] = (
                stats['total_detections'] / stats['frames_processed']
            )

    def detect_persons(self, frame: np.ndarray) -> List[Dict[str, Any]]:
        """
        Detect persons in frame

        Args:
            frame: Input frame (BGR)

        Returns:
            List of detection dictionaries with 'box', 'confidence', 'area'
            and 'center' keys. Empty when no model is loaded.
        """
        if self.model is None:
            return []

        # Run detection
        results = self.model(frame, verbose=False, conf=self.confidence_threshold)

        detections = []
        for result in results:
            detections.extend(self._extract_person_detections(result))

        self._record_stats(1, len(detections))
        return detections

    def detect_persons_batch(self, frames: List[np.ndarray]) -> List[List[Dict[str, Any]]]:
        """
        Detect persons in batch of frames

        Args:
            frames: List of frames

        Returns:
            List of detection lists, one per result the model returns.
            Empty when *frames* is empty or no model is loaded.
        """
        if not frames or self.model is None:
            return []

        # Process batch
        results_batch = self.model(frames, verbose=False, conf=self.confidence_threshold)
        all_detections = [
            self._extract_person_detections(result) for result in results_batch
        ]

        self._record_stats(len(frames), sum(len(d) for d in all_detections))
        return all_detections

    def filter_detections(self,
                          detections: List[Dict[str, Any]],
                          min_area: Optional[float] = None,
                          max_detections: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Filter detections based on criteria

        Args:
            detections: List of detections
            min_area: Minimum bounding box area
            max_detections: Maximum number of detections to keep

        Returns:
            Filtered detections (a new list; the input is not modified).
            The list is sorted by descending confidence only when it had to
            be truncated to *max_detections*.

        Raises:
            ValueError: If *max_detections* is negative.
        """
        # A negative limit would otherwise be applied as a negative slice and
        # silently drop only the lowest-confidence entries.
        if max_detections is not None and max_detections < 0:
            raise ValueError("max_detections must be non-negative")

        filtered = list(detections)

        # Filter by minimum area
        if min_area is not None:
            filtered = [d for d in filtered if d['area'] >= min_area]

        # Sort by confidence and keep top N
        if max_detections is not None and len(filtered) > max_detections:
            ranked = sorted(filtered, key=lambda d: d['confidence'], reverse=True)
            filtered = ranked[:max_detections]

        return filtered

    def convert_to_sam_prompts(self,
                               detections: List[Dict[str, Any]]) -> tuple:
        """
        Convert detections to SAM2 prompt format

        Args:
            detections: List of detections

        Returns:
            Tuple of (boxes, labels) for SAM2
        """
        if not detections:
            return [], []

        boxes = [d['box'] for d in detections]
        # All detections are positive prompts (label=1)
        labels = [1] * len(detections)

        return boxes, labels

    def get_stats(self) -> Dict[str, Any]:
        """Get a copy of the detection statistics"""
        return self.stats.copy()

    def reset_stats(self) -> None:
        """Reset statistics"""
        self.stats = self._empty_stats()

    def warmup(self, input_shape: tuple = (1080, 1920, 3)) -> None:
        """
        Warmup model with dummy inference

        The dummy frame is not counted in the detection statistics.

        Args:
            input_shape: Shape of input frames
        """
        if self.model is None:
            return

        print("🔥 Warming up detector...")
        dummy_frame = np.zeros(input_shape, dtype=np.uint8)

        # detect_persons() updates the statistics; snapshot and restore them
        # so the synthetic warmup frame does not skew the running averages.
        saved_stats = self.stats.copy()
        try:
            self.detect_persons(dummy_frame)
        finally:
            self.stats.update(saved_stats)

        print(" Detector ready!")

    def set_confidence_threshold(self, threshold: float) -> None:
        """Update confidence threshold, clamped to the range [0.1, 0.99]"""
        self.confidence_threshold = max(0.1, min(0.99, threshold))

    def __del__(self):
        """Cleanup: drop the reference to the model"""
        self.model = None
|
||||
Reference in New Issue
Block a user