324 lines
12 KiB
Python
324 lines
12 KiB
Python
"""
|
|
Stereo consistency manager for VR180 side-by-side video processing
|
|
"""
|
|
|
|
import numpy as np
|
|
from typing import Tuple, List, Dict, Any, Optional
|
|
import cv2
|
|
import warnings
|
|
|
|
|
|
class StereoConsistencyManager:
    """Manage stereo consistency between left and right eye views.

    The manager splits and recombines side-by-side (SBS) frames and masks,
    transfers detections from one eye to the other using a size-based
    disparity estimate, and validates right-eye masks against left-eye masks.

    Settings are read from ``config['stereo']`` (every key is optional):
        master_eye: 'left' or 'right' (default 'left').
        disparity_correction: shift transferred data by the estimated
            disparity (default True).
        consistency_threshold: allowed deviation of the right/left mask
            area ratio from 1.0 before a correction is applied (default 0.3).
        baseline: stereo baseline in millimeters (default 65.0).
        focal_length: focal length in pixels (default 1000.0).
    """

    # Real-world width assumed for a detected person, in meters.
    _HUMAN_WIDTH_M = 0.45
    # Depth estimates are clamped to this range, in meters.
    _MIN_DEPTH_M = 0.5
    _MAX_DEPTH_M = 10.0
    # Confidence of a transferred detection is scaled down by this factor.
    _TRANSFER_CONFIDENCE_FACTOR = 0.95

    def __init__(self, config: Dict[str, Any]):
        """Read stereo settings from ``config`` and initialise statistics.

        Args:
            config: Configuration dict; settings are looked up under the
                optional 'stereo' key.
        """
        self.config = config

        # ``or {}`` also covers an explicit ``'stereo': None`` entry, which
        # would otherwise fail on the chained ``.get`` calls.
        stereo_cfg = config.get('stereo') or {}

        self.master_eye = stereo_cfg.get('master_eye', 'left')
        self.disparity_correction = stereo_cfg.get('disparity_correction', True)
        self.consistency_threshold = stereo_cfg.get('consistency_threshold', 0.3)

        # Stereo calibration parameters (can be loaded from config)
        self.baseline = stereo_cfg.get('baseline', 65.0)  # mm, typical IPD
        self.focal_length = stereo_cfg.get('focal_length', 1000.0)  # pixels

        # Statistics tracking
        self.stats = self._fresh_stats()

        print("👀 Stereo consistency manager initialized:")
        print(f" Master eye: {self.master_eye}")
        print(f" Disparity correction: {self.disparity_correction}")
        print(f" Consistency threshold: {self.consistency_threshold}")

    @staticmethod
    def _fresh_stats() -> Dict[str, Any]:
        """Return a new statistics dict with every counter set to zero."""
        return {
            'frames_processed': 0,
            'corrections_applied': 0,
            'detection_transfers': 0,
            'mask_validations': 0,
        }

    def split_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Split a side-by-side frame into left and right eye views.

        The frame is cut at ``width // 2`` along axis 1, so for an odd width
        the right view is one column wider. The returned arrays are slices of
        ``frame``, not copies.

        Args:
            frame: SBS frame with width on axis 1.

        Returns:
            Tuple of (left_eye, right_eye) frames.
        """
        split_point = frame.shape[1] // 2
        return frame[:, :split_point], frame[:, split_point:]

    def combine_frames(self, left_eye: np.ndarray, right_eye: np.ndarray) -> np.ndarray:
        """Combine left and right eye frames back to SBS format.

        When the heights differ, both frames are resized to the smaller
        height (widths are kept) before being stacked horizontally.

        Args:
            left_eye: Left eye frame.
            right_eye: Right eye frame.

        Returns:
            Combined SBS frame.
        """
        # Ensure same height
        if left_eye.shape[0] != right_eye.shape[0]:
            target_height = min(left_eye.shape[0], right_eye.shape[0])
            # cv2.resize takes the target size as (width, height).
            left_eye = cv2.resize(left_eye, (left_eye.shape[1], target_height))
            right_eye = cv2.resize(right_eye, (right_eye.shape[1], target_height))

        return np.hstack([left_eye, right_eye])

    def transfer_detections(self,
                            detections: List[Dict[str, Any]],
                            direction: str = 'left_to_right') -> List[Dict[str, Any]]:
        """Transfer detections from one eye to the other with disparity adjustment.

        Each detection is shallow-copied; the copy gets a shifted 'box', a
        'confidence' scaled by 0.95 (a missing confidence counts as 1.0) and
        'transferred' set to True. The input detections are not modified.

        Args:
            detections: List of detection dicts with a 'box' key holding
                [x1, y1, x2, y2] (any indexable sequence).
            direction: 'left_to_right' shifts boxes left by the disparity;
                any other value is treated as 'right_to_left' and shifts
                boxes right.

        Returns:
            Transferred detections adjusted for stereo disparity. Boxes are
            always returned as new lists.
        """
        transferred = []

        for det in detections:
            box = det['box']  # [x1, y1, x2, y2]

            if self.disparity_correction:
                # Closer objects appear wider and have larger disparity.
                depth = self._estimate_depth_from_size(box[2] - box[0])
                disparity = self._calculate_disparity(depth)

                # Right eye sees objects shifted left; left eye sees them
                # shifted right.
                shift = -disparity if direction == 'left_to_right' else disparity
                adjusted_box = [box[0] + shift, box[1], box[2] + shift, box[3]]
            else:
                # No disparity correction. list() copies any sequence type,
                # including tuples, which have no .copy() method.
                adjusted_box = list(box)

            transferred_det = det.copy()
            transferred_det['box'] = adjusted_box
            # Slight reduction: a transferred box is less certain than a
            # directly detected one.
            transferred_det['confidence'] = (
                det.get('confidence', 1.0) * self._TRANSFER_CONFIDENCE_FACTOR
            )
            transferred_det['transferred'] = True

            transferred.append(transferred_det)

        self.stats['detection_transfers'] += len(transferred)
        return transferred

    def validate_masks(self,
                       left_masks: np.ndarray,
                       right_masks: np.ndarray,
                       frame_idx: int = 0) -> np.ndarray:
        """Validate and correct right eye masks based on the left eye.

        Mask area is the number of elements greater than zero. If the left
        masks are empty, the right masks are cleared. Otherwise, when the
        right/left area ratio deviates from 1.0 by more than
        ``consistency_threshold``, the right masks are rebuilt from the left
        masks (ratio below 0.5 or above 2.0) or blended with them.

        Args:
            left_masks: Master eye masks.
            right_masks: Slave eye masks to validate.
            frame_idx: Current frame index, used only in log messages.

        Returns:
            Validated/corrected right eye masks.
        """
        self.stats['mask_validations'] += 1

        # Quick validation - compare mask areas
        left_area = np.sum(left_masks > 0)
        right_area = np.sum(right_masks > 0)

        if left_area == 0:
            # No person in left eye, clear right eye too
            if right_area > 0:
                warnings.warn(f"Frame {frame_idx}: No person in left eye but found in right - clearing")
                self.stats['corrections_applied'] += 1
                return np.zeros_like(right_masks)
            return right_masks

        area_ratio = right_area / (left_area + 1e-6)

        # Within tolerance: keep the right masks untouched.
        if abs(area_ratio - 1.0) <= self.consistency_threshold:
            return right_masks

        print(f" Frame {frame_idx}: Area mismatch (ratio={area_ratio:.2f}) - applying correction")
        self.stats['corrections_applied'] += 1

        # Apply correction based on severity
        if area_ratio < 0.5 or area_ratio > 2.0:
            # Significant difference - rebuild from the left masks
            return self._correct_mask_from_template(left_masks, right_masks)

        # Minor difference - blend masks
        return self._blend_masks(left_masks, right_masks, area_ratio)

    def combine_masks(self, left_masks: np.ndarray, right_masks: np.ndarray) -> np.ndarray:
        """Combine left and right eye masks back to SBS format.

        Both inputs must have the same number of dimensions (2 or 3); they
        are joined along axis 1.

        Args:
            left_masks: Left eye masks.
            right_masks: Right eye masks.

        Returns:
            Combined SBS masks.

        Raises:
            ValueError: If the masks are not both 2-D or both 3-D.
        """
        if left_masks.ndim == 2 and right_masks.ndim == 2:
            # Single channel masks
            return np.hstack([left_masks, right_masks])
        if left_masks.ndim == 3 and right_masks.ndim == 3:
            # Multi-channel masks (e.g., per-object)
            return np.concatenate([left_masks, right_masks], axis=1)

        raise ValueError(f"Incompatible mask dimensions: {left_masks.shape} vs {right_masks.shape}")

    def _estimate_depth_from_size(self, object_width_pixels: float) -> float:
        """Estimate object depth from its width in pixels.

        Uses similar triangles with an assumed human width of 0.45 m:
        ``depth = focal_length * real_width / pixel_width``. Widths below one
        pixel are treated as one pixel to avoid dividing by zero.

        Args:
            object_width_pixels: Width of detected person in pixels.

        Returns:
            Estimated depth in meters, clamped to [0.5, 10.0], as a plain
            Python float.
        """
        pixel_width = max(object_width_pixels, 1)
        depth = (self.focal_length * self._HUMAN_WIDTH_M) / pixel_width

        # float() keeps NumPy scalars from leaking into detection boxes.
        return float(np.clip(depth, self._MIN_DEPTH_M, self._MAX_DEPTH_M))

    def _calculate_disparity(self, depth_m: float) -> float:
        """Calculate stereo disparity in pixels for a given depth.

        ``disparity = baseline * focal_length / depth`` with the baseline
        converted from millimeters to meters. ``depth_m`` is not validated;
        callers in this class pass a value already clamped to at least 0.5.

        Args:
            depth_m: Depth in meters.

        Returns:
            Disparity in pixels.
        """
        baseline_m = self.baseline / 1000.0
        return (baseline_m * self.focal_length) / depth_m

    def _correct_mask_from_template(self,
                                    template_mask: np.ndarray,
                                    target_mask: np.ndarray) -> np.ndarray:
        """Build a corrected mask by shifting the template by its disparity.

        The disparity is estimated from the number of columns the template
        occupies. With ``master_eye == 'left'`` the template is shifted left,
        otherwise right. Without disparity correction, or for an empty
        template, an unshifted copy of the template is returned.

        Args:
            template_mask: Master eye mask to use as template.
            target_mask: Mask to correct. Currently unused; the result is
                derived from the template only.

        Returns:
            Corrected mask with the template's shape and dtype.
        """
        if not self.disparity_correction:
            # Simple copy without disparity
            return template_mask.copy()

        # m00 is the sum of pixel values, so zero means an empty template.
        moments = cv2.moments(template_mask.astype(np.uint8))
        if moments['m00'] <= 0:
            # No valid mask to correct from
            return template_mask.copy()

        # Estimate depth from the number of occupied columns.
        mask_width = np.sum(np.any(template_mask > 0, axis=0))
        depth = self._estimate_depth_from_size(mask_width)
        disparity = int(self._calculate_disparity(depth))

        # Right eye sees content shifted left; left eye sees it shifted right.
        shift = -disparity if self.master_eye == 'left' else disparity
        translation = np.float32([[1, 0, shift], [0, 1, 0]])

        corrected = cv2.warpAffine(
            template_mask.astype(np.float32),
            translation,
            (template_mask.shape[1], template_mask.shape[0])
        )

        # warpAffine ran on a float32 copy; restore the caller's dtype so the
        # result matches what the other correction paths return.
        return corrected.astype(template_mask.dtype)

    def _blend_masks(self,
                     mask1: np.ndarray,
                     mask2: np.ndarray,
                     area_ratio: float) -> np.ndarray:
        """Blend two masks based on area ratio and binarize the result.

        The weight of ``mask1`` grows with the deviation of ``area_ratio``
        from 1.0, reaching 1.0 once the deviation equals
        ``consistency_threshold``. A non-positive threshold means any
        deviation is out of tolerance, so the weight is 1.0.

        Args:
            mask1: First (master) mask.
            mask2: Second mask.
            area_ratio: Ratio of mask2/mask1 areas.

        Returns:
            Blended mask thresholded at 0.5, cast to ``mask1``'s dtype.
        """
        deviation = abs(area_ratio - 1.0)
        if self.consistency_threshold > 0:
            blend_weight = min(deviation / self.consistency_threshold, 1.0)
        else:
            # Avoid dividing by a zero threshold.
            blend_weight = 1.0

        # NOTE(review): validate_masks only calls this when the deviation
        # already exceeds the threshold, so the weight is 1.0 on that path
        # and the result is the binarized master mask.
        # NOTE(review): the 0.5 threshold assumes 0/1-valued masks - confirm
        # against callers that may pass 0/255 masks.
        blended = mask1 * blend_weight + mask2 * (1 - blend_weight)

        # Threshold to binary
        return (blended > 0.5).astype(mask1.dtype)

    def get_stats(self) -> Dict[str, Any]:
        """Get processing statistics.

        Updates ``self.stats`` in place: 'frames_processed' is set to the
        number of mask validations and 'correction_rate' to
        corrections_applied / frames_processed (0.0 when nothing has been
        processed).

        Returns:
            A shallow copy of the statistics dict.
        """
        frames = self.stats.get('mask_validations', 0)
        self.stats['frames_processed'] = frames

        if frames > 0:
            self.stats['correction_rate'] = self.stats['corrections_applied'] / frames
        else:
            self.stats['correction_rate'] = 0.0

        return self.stats.copy()

    def reset_stats(self) -> None:
        """Reset all statistics counters to zero."""
        self.stats = self._fresh_stats()