""" Stereo consistency manager for VR180 side-by-side video processing """ import numpy as np from typing import Tuple, List, Dict, Any, Optional import cv2 import warnings class StereoConsistencyManager: """Manage stereo consistency between left and right eye views""" def __init__(self, config: Dict[str, Any]): self.config = config self.master_eye = config.get('stereo', {}).get('master_eye', 'left') self.disparity_correction = config.get('stereo', {}).get('disparity_correction', True) self.consistency_threshold = config.get('stereo', {}).get('consistency_threshold', 0.3) # Stereo calibration parameters (can be loaded from config) self.baseline = config.get('stereo', {}).get('baseline', 65.0) # mm, typical IPD self.focal_length = config.get('stereo', {}).get('focal_length', 1000.0) # pixels # Statistics tracking self.stats = { 'frames_processed': 0, 'corrections_applied': 0, 'detection_transfers': 0, 'mask_validations': 0 } print(f"👀 Stereo consistency manager initialized:") print(f" Master eye: {self.master_eye}") print(f" Disparity correction: {self.disparity_correction}") print(f" Consistency threshold: {self.consistency_threshold}") def split_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """ Split side-by-side frame into left and right eye views Args: frame: SBS frame Returns: Tuple of (left_eye, right_eye) frames """ height, width = frame.shape[:2] split_point = width // 2 left_eye = frame[:, :split_point] right_eye = frame[:, split_point:] return left_eye, right_eye def combine_frames(self, left_eye: np.ndarray, right_eye: np.ndarray) -> np.ndarray: """ Combine left and right eye frames back to SBS format Args: left_eye: Left eye frame right_eye: Right eye frame Returns: Combined SBS frame """ # Ensure same height if left_eye.shape[0] != right_eye.shape[0]: target_height = min(left_eye.shape[0], right_eye.shape[0]) left_eye = cv2.resize(left_eye, (left_eye.shape[1], target_height)) right_eye = cv2.resize(right_eye, (right_eye.shape[1], target_height)) return np.hstack([left_eye, right_eye]) def transfer_detections(self, detections: List[Dict[str, Any]], direction: str = 'left_to_right') -> List[Dict[str, Any]]: """ Transfer detections from master to slave eye with disparity adjustment Args: detections: List of detection dicts with 'box' key direction: Transfer direction ('left_to_right' or 'right_to_left') Returns: Transferred detections adjusted for stereo disparity """ transferred = [] for det in detections: box = det['box'] # [x1, y1, x2, y2] if self.disparity_correction: # Calculate disparity based on estimated depth # Closer objects have larger disparity box_width = box[2] - box[0] estimated_depth = self._estimate_depth_from_size(box_width) disparity = self._calculate_disparity(estimated_depth) # Apply disparity shift if direction == 'left_to_right': # Right eye sees objects shifted left adjusted_box = [ box[0] - disparity, box[1], box[2] - disparity, box[3] ] else: # right_to_left # Left eye sees objects shifted right adjusted_box = [ box[0] + disparity, box[1], box[2] + disparity, box[3] ] else: # No disparity correction adjusted_box = box.copy() # Create transferred detection transferred_det = det.copy() transferred_det['box'] = adjusted_box transferred_det['confidence'] = det.get('confidence', 1.0) * 0.95 # Slight reduction transferred_det['transferred'] = True transferred.append(transferred_det) self.stats['detection_transfers'] += len(detections) return transferred def validate_masks(self, left_masks: np.ndarray, right_masks: np.ndarray, frame_idx: int = 0) -> np.ndarray: """ Validate and correct right eye masks based on left eye Args: left_masks: Master eye masks right_masks: Slave eye masks to validate frame_idx: Current frame index for logging Returns: Validated/corrected right eye masks """ self.stats['mask_validations'] += 1 # Quick validation - compare mask areas left_area = np.sum(left_masks > 0) right_area = np.sum(right_masks > 0) if left_area == 0: # No person in left eye, clear right eye too if right_area > 0: warnings.warn(f"Frame {frame_idx}: No person in left eye but found in right - clearing") self.stats['corrections_applied'] += 1 return np.zeros_like(right_masks) return right_masks # Calculate area ratio area_ratio = right_area / (left_area + 1e-6) # Check if correction needed if abs(area_ratio - 1.0) > self.consistency_threshold: print(f" Frame {frame_idx}: Area mismatch (ratio={area_ratio:.2f}) - applying correction") self.stats['corrections_applied'] += 1 # Apply correction based on severity if area_ratio < 0.5 or area_ratio > 2.0: # Significant difference - use template matching right_masks = self._correct_mask_from_template(left_masks, right_masks) else: # Minor difference - blend masks right_masks = self._blend_masks(left_masks, right_masks, area_ratio) return right_masks def combine_masks(self, left_masks: np.ndarray, right_masks: np.ndarray) -> np.ndarray: """ Combine left and right eye masks back to SBS format Args: left_masks: Left eye masks right_masks: Right eye masks Returns: Combined SBS masks """ # Handle different mask formats if left_masks.ndim == 2 and right_masks.ndim == 2: # Single channel masks return np.hstack([left_masks, right_masks]) elif left_masks.ndim == 3 and right_masks.ndim == 3: # Multi-channel masks (e.g., per-object) return np.concatenate([left_masks, right_masks], axis=1) else: raise ValueError(f"Incompatible mask dimensions: {left_masks.shape} vs {right_masks.shape}") def _estimate_depth_from_size(self, object_width_pixels: float) -> float: """ Estimate object depth from its width in pixels Assumes average human width of 45cm Args: object_width_pixels: Width of detected person in pixels Returns: Estimated depth in meters """ HUMAN_WIDTH_M = 0.45 # Average human shoulder width # Using similar triangles: depth = (focal_length * real_width) / pixel_width depth = (self.focal_length * HUMAN_WIDTH_M) / max(object_width_pixels, 1) # Clamp to reasonable range (0.5m to 10m) return np.clip(depth, 0.5, 10.0) def _calculate_disparity(self, depth_m: float) -> float: """ Calculate stereo disparity in pixels for given depth Args: depth_m: Depth in meters Returns: Disparity in pixels """ # Disparity = (baseline * focal_length) / depth # Convert baseline from mm to m disparity_pixels = (self.baseline / 1000.0 * self.focal_length) / depth_m return disparity_pixels def _correct_mask_from_template(self, template_mask: np.ndarray, target_mask: np.ndarray) -> np.ndarray: """ Correct target mask using template mask with disparity adjustment Args: template_mask: Master eye mask to use as template target_mask: Mask to correct Returns: Corrected mask """ if not self.disparity_correction: # Simple copy without disparity return template_mask.copy() # Calculate average disparity from mask centroid template_moments = cv2.moments(template_mask.astype(np.uint8)) if template_moments['m00'] > 0: cx_template = int(template_moments['m10'] / template_moments['m00']) # Estimate depth from mask size mask_width = np.sum(np.any(template_mask > 0, axis=0)) depth = self._estimate_depth_from_size(mask_width) disparity = int(self._calculate_disparity(depth)) # Shift template mask by disparity if self.master_eye == 'left': # Right eye sees shifted left translation = np.float32([[1, 0, -disparity], [0, 1, 0]]) else: # Left eye sees shifted right translation = np.float32([[1, 0, disparity], [0, 1, 0]]) corrected = cv2.warpAffine( template_mask.astype(np.float32), translation, (template_mask.shape[1], template_mask.shape[0]) ) return corrected else: # No valid mask to correct from return template_mask.copy() def _blend_masks(self, mask1: np.ndarray, mask2: np.ndarray, area_ratio: float) -> np.ndarray: """ Blend two masks based on area ratio Args: mask1: First mask mask2: Second mask area_ratio: Ratio of mask2/mask1 areas Returns: Blended mask """ # Calculate blend weight based on how far off the ratio is blend_weight = min(abs(area_ratio - 1.0) / self.consistency_threshold, 1.0) # Blend towards mask1 (master) based on weight blended = mask1 * blend_weight + mask2 * (1 - blend_weight) # Threshold to binary return (blended > 0.5).astype(mask1.dtype) def get_stats(self) -> Dict[str, Any]: """Get processing statistics""" self.stats['frames_processed'] = self.stats.get('mask_validations', 0) if self.stats['frames_processed'] > 0: self.stats['correction_rate'] = ( self.stats['corrections_applied'] / self.stats['frames_processed'] ) else: self.stats['correction_rate'] = 0.0 return self.stats.copy() def reset_stats(self) -> None: """Reset statistics""" self.stats = { 'frames_processed': 0, 'corrections_applied': 0, 'detection_transfers': 0, 'mask_validations': 0 }