Files
test2/vr180_streaming/frame_reader.py
2025-07-27 08:01:08 -07:00

191 lines
5.6 KiB
Python

"""
Streaming frame reader for memory-efficient video processing
"""
import cv2
import numpy as np
from pathlib import Path
from typing import Optional, Dict, Any, Tuple
class StreamingFrameReader:
    """Read frames one at a time from a video file, with seeking support.

    The reader wraps a ``cv2.VideoCapture`` and tracks the index of the next
    frame to be read in ``current_frame_idx``. It can be used as a context
    manager (the capture is released on exit) and as an iterator (iteration
    always restarts from the first frame).

    Once :meth:`close` has been called, read and seek operations report
    "no frame" / "seek failed" instead of raising.
    """

    def __init__(self, video_path: str, start_frame: int = 0):
        """
        Open the video and optionally seek to a starting frame

        Args:
            video_path: Path of the video file to open
            start_frame: Frame index to start reading from; values that are
                not a valid frame index leave the reader at frame 0

        Raises:
            FileNotFoundError: If video_path does not exist
            RuntimeError: If the file exists but cannot be opened
        """
        # Assign first so close()/__del__ stay safe if construction fails
        # before the capture object exists.
        self.cap = None

        self.video_path = Path(video_path)
        if not self.video_path.exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        self.cap = cv2.VideoCapture(str(self.video_path))
        if not self.cap.isOpened():
            # Release the half-initialised capture instead of leaking it.
            self.close()
            raise RuntimeError(f"Failed to open video: {video_path}")

        # Get video properties
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Set start position
        self.current_frame_idx = 0
        if start_frame > 0:
            self.seek(start_frame)

        print("📹 Streaming reader initialized:")
        print(f" Video: {self.video_path.name}")
        print(f" Resolution: {self.width}x{self.height}")
        print(f" FPS: {self.fps}")
        print(f" Total frames: {self.total_frames}")
        # Report the position actually in effect: a rejected seek leaves the
        # reader at frame 0 regardless of the requested start_frame.
        print(f" Starting at frame: {self.current_frame_idx}")

    def _set_position(self, frame_idx: int) -> None:
        """
        Move the capture to frame_idx without validating the index

        Used to restore a previously recorded position, which may
        legitimately equal total_frames once the whole video has been read.
        Does nothing when the reader is closed.
        """
        if self.cap is None:
            return
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        self.current_frame_idx = frame_idx

    def read_frame(self) -> Optional[np.ndarray]:
        """
        Read next frame from video

        Returns:
            Frame as numpy array, or None at end of video or when the reader
            has been closed
        """
        if self.cap is None:
            return None
        ret, frame = self.cap.read()
        if not ret:
            return None
        self.current_frame_idx += 1
        return frame

    def seek(self, frame_idx: int) -> bool:
        """
        Seek to specific frame

        Args:
            frame_idx: Target frame index

        Returns:
            True if seek successful, False if frame_idx is outside
            [0, total_frames) or the reader has been closed
        """
        if self.cap is None or not 0 <= frame_idx < self.total_frames:
            return False
        self._set_position(frame_idx)
        return True

    def get_video_info(self) -> Dict[str, Any]:
        """Get video metadata (width, height, fps, total_frames, path)"""
        return {
            'width': self.width,
            'height': self.height,
            'fps': self.fps,
            'total_frames': self.total_frames,
            'path': str(self.video_path)
        }

    def get_progress(self) -> float:
        """Get current progress as percentage (0.0 if frame count is unknown)"""
        if self.total_frames > 0:
            return (self.current_frame_idx / self.total_frames) * 100
        return 0.0

    def reset(self) -> None:
        """Reset to beginning of video"""
        # Bypass seek()'s range check so the rewind also works when the
        # container reports a frame count of 0.
        self._set_position(0)

    def peek_frame(self) -> Optional[np.ndarray]:
        """
        Peek at next frame without advancing position

        Returns:
            Frame as numpy array or None if end of video
        """
        saved_pos = self.current_frame_idx
        frame = self.read_frame()
        if frame is not None:
            # Undo the advance caused by the read
            self._set_position(saved_pos)
        return frame

    def read_frame_at(self, frame_idx: int) -> Optional[np.ndarray]:
        """
        Read frame at specific index without changing current position

        Args:
            frame_idx: Frame index to read

        Returns:
            Frame as numpy array or None if invalid index
        """
        saved_pos = self.current_frame_idx
        if not self.seek(frame_idx):
            return None
        try:
            return self.read_frame()
        finally:
            # Restore unconditionally: saved_pos can equal total_frames after
            # the last frame was consumed, which seek() would reject.
            self._set_position(saved_pos)

    def get_frame_batch(self, start_idx: int, count: int) -> list[np.ndarray]:
        """
        Read a batch of frames (for initial detection or correction)

        The current read position is restored afterwards.

        Args:
            start_idx: Starting frame index
            count: Number of frames to read

        Returns:
            List of frames; shorter than count if the video ends first, and
            empty if start_idx is not a valid frame index
        """
        saved_pos = self.current_frame_idx
        frames: list[np.ndarray] = []
        if not self.seek(start_idx):
            return frames
        try:
            for _ in range(count):
                frame = self.read_frame()
                if frame is None:
                    break
                frames.append(frame)
        finally:
            # Restore without range validation (see read_frame_at)
            self._set_position(saved_pos)
        return frames

    def estimate_memory_per_frame(self) -> float:
        """
        Estimate memory usage per frame in MB

        Returns:
            Estimated memory in MB
        """
        # BGR format = 3 channels, uint8 = 1 byte per channel
        bytes_per_frame = self.width * self.height * 3
        return bytes_per_frame / (1024 * 1024)

    def close(self) -> None:
        """Release video capture resources (safe to call more than once)"""
        # getattr guards against instances whose __init__ never got as far as
        # assigning `cap`; __del__ still runs on those.
        cap = getattr(self, "cap", None)
        if cap is not None:
            cap.release()
        self.cap = None

    def __enter__(self) -> "StreamingFrameReader":
        """Context manager support"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager cleanup"""
        self.close()

    def __del__(self):
        """Ensure cleanup on deletion"""
        self.close()

    def __len__(self) -> int:
        """Total number of frames"""
        return self.total_frames

    def __iter__(self) -> "StreamingFrameReader":
        """Iterator support (restarts from the first frame)"""
        self.reset()
        return self

    def __next__(self) -> np.ndarray:
        """Iterator next frame"""
        frame = self.read_frame()
        if frame is None:
            raise StopIteration
        return frame