Compare commits

10 Commits

Author SHA1 Message Date
277d554ecc fix scaling 1 2025-07-26 18:31:16 -07:00
d6d2b0aa93 full size babyyy 2025-07-26 18:09:48 -07:00
3a547b7c21 please god work 2025-07-26 17:44:23 -07:00
262cb00b69 checkpoints yay 2025-07-26 17:11:07 -07:00
caa4ddb5e0 actually fix streaming save 2025-07-26 17:05:50 -07:00
fa945b9c3e fix concat 2025-07-26 16:29:59 -07:00
4958c503dd please merge 2025-07-26 16:02:07 -07:00
366b132ef5 growth 2025-07-26 15:31:07 -07:00
4d1361df46 bigtime 2025-07-26 15:29:37 -07:00
884cb8dce2 lol 2025-07-26 15:29:28 -07:00
5 changed files with 911 additions and 65 deletions

148
test_inter_chunk_cleanup.py Normal file
View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
Test script to verify inter-chunk cleanup properly destroys models
"""
import psutil
import gc
import sys
from pathlib import Path
def get_memory_usage():
"""Get current memory usage in GB"""
process = psutil.Process()
return process.memory_info().rss / (1024**3)
def test_inter_chunk_cleanup():
"""Test that models are properly destroyed between chunks"""
print("🧪 TESTING INTER-CHUNK CLEANUP")
print("=" * 50)
baseline_memory = get_memory_usage()
print(f"📊 Baseline memory: {baseline_memory:.2f} GB")
# Import and create processor
print("\n1⃣ Creating processor...")
from vr180_matting.config import VR180Config
from vr180_matting.vr180_processor import VR180Processor
config = VR180Config.from_yaml('config.yaml')
processor = VR180Processor(config)
init_memory = get_memory_usage()
print(f"📊 After processor init: {init_memory:.2f} GB (+{init_memory - baseline_memory:.2f} GB)")
# Simulate chunk processing (just trigger model loading)
print("\n2⃣ Simulating chunk 0 processing...")
# Test 1: Force YOLO model loading
try:
detector = processor.detector
detector._load_model() # Force load
yolo_memory = get_memory_usage()
print(f"📊 After YOLO load: {yolo_memory:.2f} GB (+{yolo_memory - init_memory:.2f} GB)")
except Exception as e:
print(f"❌ YOLO loading failed: {e}")
yolo_memory = init_memory
# Test 2: Force SAM2 model loading
try:
sam2_model = processor.sam2_model
sam2_model._load_model(sam2_model.model_cfg, sam2_model.checkpoint_path)
sam2_memory = get_memory_usage()
print(f"📊 After SAM2 load: {sam2_memory:.2f} GB (+{sam2_memory - yolo_memory:.2f} GB)")
except Exception as e:
print(f"❌ SAM2 loading failed: {e}")
sam2_memory = yolo_memory
total_model_memory = sam2_memory - init_memory
print(f"📊 Total model memory: {total_model_memory:.2f} GB")
# Test 3: Inter-chunk cleanup
print("\n3⃣ Testing inter-chunk cleanup...")
processor._complete_inter_chunk_cleanup(chunk_idx=0)
cleanup_memory = get_memory_usage()
cleanup_improvement = sam2_memory - cleanup_memory
print(f"📊 After cleanup: {cleanup_memory:.2f} GB (-{cleanup_improvement:.2f} GB freed)")
# Test 4: Verify models reload fresh
print("\n4⃣ Testing fresh model reload...")
# Check YOLO state
yolo_reloaded = processor.detector.model is None
print(f"🔍 YOLO model destroyed: {'✅ YES' if yolo_reloaded else '❌ NO'}")
# Check SAM2 state
sam2_reloaded = not processor.sam2_model._model_loaded or processor.sam2_model.predictor is None
print(f"🔍 SAM2 model destroyed: {'✅ YES' if sam2_reloaded else '❌ NO'}")
# Test 5: Force reload to verify they work
print("\n5⃣ Testing model reload...")
try:
# Force YOLO reload
processor.detector._load_model()
yolo_reload_memory = get_memory_usage()
# Force SAM2 reload
processor.sam2_model._load_model(processor.sam2_model.model_cfg, processor.sam2_model.checkpoint_path)
sam2_reload_memory = get_memory_usage()
reload_growth = sam2_reload_memory - cleanup_memory
print(f"📊 After reload: {sam2_reload_memory:.2f} GB (+{reload_growth:.2f} GB)")
if abs(reload_growth - total_model_memory) < 1.0: # Within 1GB
print("✅ Models reloaded with similar memory usage (good)")
else:
print("⚠️ Model reload memory differs significantly")
except Exception as e:
print(f"❌ Model reload failed: {e}")
# Final summary
print(f"\n📊 SUMMARY:")
print(f" Baseline → Peak: {baseline_memory:.2f}GB → {sam2_memory:.2f}GB")
print(f" Peak → Cleanup: {sam2_memory:.2f}GB → {cleanup_memory:.2f}GB")
print(f" Memory freed: {cleanup_improvement:.2f}GB")
print(f" Models destroyed: YOLO={yolo_reloaded}, SAM2={sam2_reloaded}")
# Success criteria: Both models destroyed AND can reload
models_destroyed = yolo_reloaded and sam2_reloaded
can_reload = 'reload_growth' in locals()
if models_destroyed and can_reload:
print("✅ Inter-chunk cleanup working effectively")
print("💡 Models destroyed and can reload fresh (memory will be freed during real processing)")
return True
elif models_destroyed:
print("⚠️ Models destroyed but reload test incomplete")
print("💡 This should still prevent accumulation during real processing")
return True
else:
print("❌ Inter-chunk cleanup not freeing enough memory")
return False
def main():
if len(sys.argv) != 2:
print("Usage: python test_inter_chunk_cleanup.py <config.yaml>")
sys.exit(1)
config_path = sys.argv[1]
if not Path(config_path).exists():
print(f"Config file not found: {config_path}")
sys.exit(1)
success = test_inter_chunk_cleanup()
if success:
print(f"\n🎉 SUCCESS: Inter-chunk cleanup is working!")
print(f"💡 This should prevent 15-20GB model accumulation between chunks")
else:
print(f"\n❌ FAILURE: Inter-chunk cleanup needs improvement")
print(f"💡 Check model destruction logic in _complete_inter_chunk_cleanup")
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,220 @@
"""
Checkpoint manager for resumable video processing
Saves progress to avoid reprocessing after OOM or crashes
"""
import json
import hashlib
from pathlib import Path
from typing import Dict, Any, Optional, List
import os
import shutil
from datetime import datetime
class CheckpointManager:
"""Manages processing checkpoints for resumable execution"""
def __init__(self, video_path: str, output_path: str, checkpoint_dir: Optional[Path] = None):
"""
Initialize checkpoint manager
Args:
video_path: Input video path
output_path: Output video path
checkpoint_dir: Directory for checkpoint files (default: .vr180_checkpoints in CWD)
"""
self.video_path = Path(video_path)
self.output_path = Path(output_path)
# Create unique checkpoint ID based on video file
self.video_hash = self._compute_video_hash()
# Setup checkpoint directory
if checkpoint_dir is None:
self.checkpoint_dir = Path.cwd() / ".vr180_checkpoints" / self.video_hash
else:
self.checkpoint_dir = Path(checkpoint_dir) / self.video_hash
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
# Checkpoint files
self.status_file = self.checkpoint_dir / "processing_status.json"
self.chunks_dir = self.checkpoint_dir / "chunks"
self.chunks_dir.mkdir(exist_ok=True)
# Load existing status or create new
self.status = self._load_status()
def _compute_video_hash(self) -> str:
"""Compute hash of video file for unique identification"""
# Use file path, size, and modification time for quick hash
stat = self.video_path.stat()
hash_str = f"{self.video_path}_{stat.st_size}_{stat.st_mtime}"
return hashlib.md5(hash_str.encode()).hexdigest()[:12]
def _load_status(self) -> Dict[str, Any]:
"""Load processing status from checkpoint file"""
if self.status_file.exists():
with open(self.status_file, 'r') as f:
status = json.load(f)
print(f"📋 Loaded checkpoint: {status['completed_chunks']}/{status['total_chunks']} chunks completed")
return status
else:
# Create new status
return {
'video_path': str(self.video_path),
'output_path': str(self.output_path),
'video_hash': self.video_hash,
'start_time': datetime.now().isoformat(),
'total_chunks': 0,
'completed_chunks': 0,
'chunk_info': {},
'processing_complete': False,
'merge_complete': False
}
def _save_status(self):
"""Save current status to checkpoint file"""
self.status['last_update'] = datetime.now().isoformat()
with open(self.status_file, 'w') as f:
json.dump(self.status, f, indent=2)
def set_total_chunks(self, total_chunks: int):
"""Set total number of chunks to process"""
self.status['total_chunks'] = total_chunks
self._save_status()
def is_chunk_completed(self, chunk_idx: int) -> bool:
"""Check if a chunk has already been processed"""
chunk_key = f"chunk_{chunk_idx}"
return chunk_key in self.status['chunk_info'] and \
self.status['chunk_info'][chunk_key].get('completed', False)
def get_chunk_file(self, chunk_idx: int) -> Optional[Path]:
"""Get saved chunk file path if it exists"""
chunk_file = self.chunks_dir / f"chunk_{chunk_idx:04d}.npz"
if chunk_file.exists() and self.is_chunk_completed(chunk_idx):
return chunk_file
return None
def save_chunk(self, chunk_idx: int, frames: List, source_chunk_path: Optional[Path] = None):
"""
Save processed chunk and mark as completed
Args:
chunk_idx: Chunk index
frames: Processed frames (can be None if using source_chunk_path)
source_chunk_path: If provided, copy this file instead of saving frames
"""
chunk_file = self.chunks_dir / f"chunk_{chunk_idx:04d}.npz"
try:
if source_chunk_path and source_chunk_path.exists():
# Copy existing chunk file
shutil.copy2(source_chunk_path, chunk_file)
print(f"💾 Copied chunk {chunk_idx} to checkpoint: {chunk_file.name}")
elif frames is not None:
# Save new frames
import numpy as np
np.savez_compressed(str(chunk_file), frames=frames)
print(f"💾 Saved chunk {chunk_idx} to checkpoint: {chunk_file.name}")
else:
raise ValueError("Either frames or source_chunk_path must be provided")
# Update status
chunk_key = f"chunk_{chunk_idx}"
self.status['chunk_info'][chunk_key] = {
'completed': True,
'file': chunk_file.name,
'timestamp': datetime.now().isoformat()
}
self.status['completed_chunks'] = len([c for c in self.status['chunk_info'].values() if c['completed']])
self._save_status()
print(f"✅ Chunk {chunk_idx} checkpoint saved ({self.status['completed_chunks']}/{self.status['total_chunks']})")
except Exception as e:
print(f"❌ Failed to save chunk {chunk_idx} checkpoint: {e}")
def get_completed_chunk_files(self) -> List[Path]:
"""Get list of all completed chunk files in order"""
chunk_files = []
missing_chunks = []
for i in range(self.status['total_chunks']):
chunk_file = self.get_chunk_file(i)
if chunk_file:
chunk_files.append(chunk_file)
else:
# Check if chunk is marked as completed but file is missing
if self.is_chunk_completed(i):
missing_chunks.append(i)
print(f"⚠️ Chunk {i} marked complete but file missing!")
else:
break # Stop at first unprocessed chunk
if missing_chunks:
print(f"❌ Missing checkpoint files for chunks: {missing_chunks}")
print(f" This may happen if files were deleted during streaming merge")
print(f" These chunks may need to be reprocessed")
return chunk_files
def mark_processing_complete(self):
"""Mark all chunk processing as complete"""
self.status['processing_complete'] = True
self._save_status()
print(f"✅ All chunks processed and checkpointed")
def mark_merge_complete(self):
"""Mark final merge as complete"""
self.status['merge_complete'] = True
self._save_status()
print(f"✅ Video merge completed")
def cleanup_checkpoints(self, keep_chunks: bool = False):
"""
Clean up checkpoint files after successful completion
Args:
keep_chunks: If True, keep chunk files but remove status
"""
if keep_chunks:
# Just remove status file
if self.status_file.exists():
self.status_file.unlink()
print(f"🗑️ Removed checkpoint status file")
else:
# Remove entire checkpoint directory
if self.checkpoint_dir.exists():
shutil.rmtree(self.checkpoint_dir)
print(f"🗑️ Removed all checkpoint files: {self.checkpoint_dir}")
def get_resume_info(self) -> Dict[str, Any]:
"""Get information about what can be resumed"""
return {
'can_resume': self.status['completed_chunks'] > 0,
'completed_chunks': self.status['completed_chunks'],
'total_chunks': self.status['total_chunks'],
'processing_complete': self.status['processing_complete'],
'merge_complete': self.status['merge_complete'],
'checkpoint_dir': str(self.checkpoint_dir)
}
def print_status(self):
"""Print current checkpoint status"""
print(f"\n📊 CHECKPOINT STATUS:")
print(f" Video: {self.video_path.name}")
print(f" Hash: {self.video_hash}")
print(f" Progress: {self.status['completed_chunks']}/{self.status['total_chunks']} chunks")
print(f" Processing complete: {self.status['processing_complete']}")
print(f" Merge complete: {self.status['merge_complete']}")
print(f" Checkpoint dir: {self.checkpoint_dir}")
if self.status['completed_chunks'] > 0:
print(f"\n Completed chunks:")
for i in range(self.status['completed_chunks']):
chunk_info = self.status['chunk_info'].get(f'chunk_{i}', {})
if chunk_info.get('completed'):
print(f" ✓ Chunk {i}: {chunk_info.get('file', 'unknown')}")

View File

@@ -49,8 +49,8 @@ class SAM2VideoMatting:
def _load_model(self, model_cfg: str, checkpoint_path: str):
"""Load SAM2 video predictor lazily"""
if self._model_loaded:
return # Already loaded
if self._model_loaded and self.predictor is not None:
return # Already loaded and predictor exists
try:
# Import heavy SAM2 modules only when needed
@@ -419,6 +419,9 @@ class SAM2VideoMatting:
finally:
self.predictor = None
# Reset model loaded state for fresh reload
self._model_loaded = False
# Force garbage collection (critical for memory leak prevention)
gc.collect()

View File

@@ -281,6 +281,116 @@ class VideoProcessor:
print(f"Read {len(frames)} frames")
return frames
def read_video_frames_dual_resolution(self,
video_path: str,
start_frame: int = 0,
num_frames: Optional[int] = None,
scale_factor: float = 0.5) -> Dict[str, List[np.ndarray]]:
"""
Read video frames at both original and scaled resolution for dual-resolution processing
Args:
video_path: Path to video file
start_frame: Starting frame index
num_frames: Number of frames to read (None for all)
scale_factor: Scaling factor for inference frames
Returns:
Dictionary with 'original' and 'scaled' frame lists
"""
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"Could not open video file: {video_path}")
# Set starting position
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
original_frames = []
scaled_frames = []
frame_count = 0
# Progress tracking
total_to_read = num_frames if num_frames else self.total_frames - start_frame
with tqdm(total=total_to_read, desc="Reading dual-resolution frames") as pbar:
while True:
ret, frame = cap.read()
if not ret:
break
# Store original frame
original_frames.append(frame.copy())
# Create scaled frame for inference
if scale_factor != 1.0:
new_width = int(frame.shape[1] * scale_factor)
new_height = int(frame.shape[0] * scale_factor)
scaled_frame = cv2.resize(frame, (new_width, new_height),
interpolation=cv2.INTER_AREA)
else:
scaled_frame = frame.copy()
scaled_frames.append(scaled_frame)
frame_count += 1
pbar.update(1)
if num_frames is not None and frame_count >= num_frames:
break
cap.release()
print(f"Loaded {len(original_frames)} frames:")
print(f" Original: {original_frames[0].shape} per frame")
print(f" Scaled: {scaled_frames[0].shape} per frame (scale_factor={scale_factor})")
return {
'original': original_frames,
'scaled': scaled_frames
}
def upscale_mask(self, mask: np.ndarray, target_shape: tuple, method: str = 'cubic') -> np.ndarray:
"""
Upscale a mask from inference resolution to original resolution
Args:
mask: Low-resolution mask (H, W)
target_shape: Target shape (H, W) for upscaling
method: Upscaling method ('nearest', 'cubic', 'area')
Returns:
Upscaled mask at target resolution
"""
if mask.shape[:2] == target_shape[:2]:
return mask # Already correct size
# Ensure mask is 2D
if mask.ndim == 3:
mask = mask.squeeze()
# Choose interpolation method
if method == 'nearest':
interpolation = cv2.INTER_NEAREST # Crisp edges, good for sharp subjects
elif method == 'cubic':
interpolation = cv2.INTER_CUBIC # Smooth edges, good for most content
elif method == 'area':
interpolation = cv2.INTER_AREA # Good for downscaling, not upscaling
else:
interpolation = cv2.INTER_CUBIC # Default to cubic
# Upscale mask
upscaled_mask = cv2.resize(
mask.astype(np.uint8),
(target_shape[1], target_shape[0]), # (width, height) for cv2.resize
interpolation=interpolation
)
# Convert back to boolean if it was originally boolean
if mask.dtype == bool:
upscaled_mask = upscaled_mask.astype(bool)
return upscaled_mask
def calculate_optimal_chunking(self) -> Tuple[int, int]:
"""
Calculate optimal chunk size and overlap based on memory constraints
@@ -369,6 +479,92 @@ class VideoProcessor:
return matted_frames
def process_chunk_dual_resolution(self,
frame_data: Dict[str, List[np.ndarray]],
chunk_idx: int = 0) -> List[np.ndarray]:
"""
Process a chunk using dual-resolution approach: inference at low-res, output at full-res
Args:
frame_data: Dictionary with 'original' and 'scaled' frame lists
chunk_idx: Chunk index for logging
Returns:
List of matted frames at original resolution
"""
original_frames = frame_data['original']
scaled_frames = frame_data['scaled']
print(f"Processing chunk {chunk_idx} with dual-resolution ({len(original_frames)} frames)")
print(f" Inference: {scaled_frames[0].shape} → Output: {original_frames[0].shape}")
with self.memory_manager.memory_monitor(f"dual-res chunk {chunk_idx}"):
# Initialize SAM2 with scaled frames for inference
self.sam2_model.init_video_state(scaled_frames)
# Detect persons in first scaled frame
first_scaled_frame = scaled_frames[0]
detections = self.detector.detect_persons(first_scaled_frame)
if not detections:
warnings.warn(f"No persons detected in chunk {chunk_idx}")
return self._create_empty_masks(original_frames)
print(f"Detected {len(detections)} persons in first frame (at inference resolution)")
# Convert detections to SAM2 prompts (detections are already at scaled resolution)
box_prompts, labels = self.detector.convert_to_sam_prompts(detections)
# Add prompts to SAM2
object_ids = self.sam2_model.add_person_prompts(0, box_prompts, labels)
print(f"Added prompts for {len(object_ids)} objects")
# Propagate masks through chunk at inference resolution
video_segments = self.sam2_model.propagate_masks(
start_frame=0,
max_frames=len(scaled_frames)
)
# Apply upscaled masks to original resolution frames
matted_frames = []
original_shape = original_frames[0].shape[:2] # (H, W)
for frame_idx, original_frame in enumerate(tqdm(original_frames, desc="Applying upscaled masks")):
if frame_idx in video_segments:
frame_masks = video_segments[frame_idx]
# Get combined mask at inference resolution
combined_mask_scaled = self.sam2_model.get_combined_mask(frame_masks)
if combined_mask_scaled is not None:
# Upscale mask to original resolution
combined_mask_full = self.upscale_mask(
combined_mask_scaled,
target_shape=original_shape,
method='cubic' # Smooth upscaling for masks
)
# Apply upscaled mask to original resolution frame
matted_frame = self.sam2_model.apply_mask_to_frame(
original_frame, combined_mask_full,
output_format=self.config.output.format,
background_color=self.config.output.background_color
)
else:
# No mask for this frame
matted_frame = self._create_empty_mask_frame(original_frame)
else:
# No mask for this frame
matted_frame = self._create_empty_mask_frame(original_frame)
matted_frames.append(matted_frame)
# Cleanup SAM2 state
self.sam2_model.cleanup()
print(f"✅ Dual-resolution processing complete: {len(matted_frames)} frames at full resolution")
return matted_frames
def _create_empty_masks(self, frames: List[np.ndarray]) -> List[np.ndarray]:
"""Create empty masks when no persons detected"""
empty_frames = []
@@ -398,60 +594,190 @@ class VideoProcessor:
overlap_frames: Number of overlapping frames
audio_source: Audio source file for final video
"""
from .streaming_video_writer import StreamingVideoWriter
if not chunk_files:
raise ValueError("No chunk files to merge")
print(f"🎬 Streaming merge: {len(chunk_files)} chunks → {output_path}")
print(f"🎬 TRUE Streaming merge: {len(chunk_files)} chunks → {output_path}")
# Initialize streaming writer
writer = StreamingVideoWriter(
output_path=output_path,
fps=self.video_info['fps'],
audio_source=audio_source
)
# Create temporary directory for frame images
import tempfile
temp_frames_dir = Path(tempfile.mkdtemp(prefix="merge_frames_"))
frame_counter = 0
try:
# Process each chunk without accumulation
print(f"📁 Using temp frames dir: {temp_frames_dir}")
# Process each chunk frame-by-frame (true streaming)
for i, chunk_file in enumerate(chunk_files):
print(f"📼 Processing chunk {i+1}/{len(chunk_files)}: {chunk_file.name}")
# Load chunk (this is the only copy in memory)
# Load chunk metadata without loading frames array
chunk_data = np.load(str(chunk_file))
frames = chunk_data['frames'].tolist() # Convert to list of arrays
frames_array = chunk_data['frames'] # This is still mmap'd, not loaded
total_frames_in_chunk = frames_array.shape[0]
# Determine which frames to skip for overlap
start_frame_idx = overlap_frames if i > 0 and overlap_frames > 0 else 0
frames_to_process = total_frames_in_chunk - start_frame_idx
if start_frame_idx > 0:
print(f" ✂️ Skipping first {start_frame_idx} overlapping frames")
print(f" 🔄 Processing {frames_to_process} frames one-by-one...")
# Process frames ONE AT A TIME (true streaming)
for frame_idx in range(start_frame_idx, total_frames_in_chunk):
# Load only ONE frame at a time
frame = frames_array[frame_idx] # Load single frame
# Save frame directly to disk
frame_path = temp_frames_dir / f"frame_{frame_counter:06d}.jpg"
success = cv2.imwrite(str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
if not success:
raise RuntimeError(f"Failed to save frame {frame_counter}")
frame_counter += 1
# Periodic progress and cleanup
if frame_counter % 100 == 0:
print(f" 💾 Saved {frame_counter} frames...")
gc.collect() # Periodic cleanup
print(f" ✅ Saved {frames_to_process} frames to disk (total: {frame_counter})")
# Close chunk file and cleanup
chunk_data.close()
del chunk_data, frames_array
# Write chunk with streaming writer
writer.write_chunk(
frames=frames,
chunk_index=i,
overlap_frames=overlap_frames if i > 0 else 0,
blend_with_previous=(i > 0 and overlap_frames > 0)
)
# Don't delete checkpoint files - they're needed for potential resume
# The checkpoint system manages cleanup separately
print(f" 📋 Keeping checkpoint file: {chunk_file.name}")
# Immediately free memory
del frames, chunk_data
# Aggressive cleanup and memory monitoring after each chunk
self._aggressive_memory_cleanup(f"After streaming merge chunk {i}")
# Delete chunk file to free disk space
try:
chunk_file.unlink()
print(f" 🗑️ Deleted {chunk_file.name}")
except Exception as e:
print(f" ⚠️ Could not delete {chunk_file.name}: {e}")
# Memory safety check
memory_info = self._get_process_memory_info()
if memory_info['total_process_gb'] > 35: # Warning if approaching 46GB limit
print(f"⚠️ High memory usage: {memory_info['total_process_gb']:.1f}GB - forcing cleanup")
gc.collect()
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Aggressive cleanup every chunk
self._aggressive_memory_cleanup(f"After processing chunk {i}")
# Finalize the video
writer.finalize()
# Create final video directly from frame images using ffmpeg
print(f"📹 Creating final video from {frame_counter} frames...")
self._create_video_from_frames(temp_frames_dir, Path(output_path), frame_counter)
# Add audio if provided
if audio_source:
self._add_audio_to_video(output_path, audio_source)
except Exception as e:
print(f"❌ Streaming merge failed: {e}")
writer.cleanup()
raise
print(f"✅ Streaming merge complete: {output_path}")
finally:
# Cleanup temporary frames directory
try:
if temp_frames_dir.exists():
import shutil
shutil.rmtree(temp_frames_dir)
print(f"🗑️ Cleaned up temp frames dir: {temp_frames_dir}")
except Exception as e:
print(f"⚠️ Could not cleanup temp frames dir: {e}")
# Memory cleanup
gc.collect()
print(f"✅ TRUE Streaming merge complete: {output_path}")
def _create_video_from_frames(self, frames_dir: Path, output_path: Path, frame_count: int):
"""Create video directly from frame images using ffmpeg (memory efficient)"""
import subprocess
frame_pattern = str(frames_dir / "frame_%06d.jpg")
fps = self.video_info['fps'] if hasattr(self, 'video_info') and self.video_info else 30.0
print(f"🎬 Creating video with ffmpeg: {frame_count} frames at {fps} fps")
# Use GPU encoding if available, fallback to CPU
gpu_cmd = [
'ffmpeg', '-y', # -y to overwrite output file
'-framerate', str(fps),
'-i', frame_pattern,
'-c:v', 'h264_nvenc', # NVIDIA GPU encoder
'-preset', 'fast',
'-cq', '18', # Quality for GPU encoding
'-pix_fmt', 'yuv420p',
str(output_path)
]
cpu_cmd = [
'ffmpeg', '-y', # -y to overwrite output file
'-framerate', str(fps),
'-i', frame_pattern,
'-c:v', 'libx264', # CPU encoder
'-preset', 'medium',
'-crf', '18', # Quality for CPU encoding
'-pix_fmt', 'yuv420p',
str(output_path)
]
# Try GPU first
print(f"🚀 Trying GPU encoding...")
result = subprocess.run(gpu_cmd, capture_output=True, text=True)
if result.returncode != 0:
print("⚠️ GPU encoding failed, using CPU...")
print(f"🔄 CPU encoding...")
result = subprocess.run(cpu_cmd, capture_output=True, text=True)
else:
print("✅ GPU encoding successful!")
if result.returncode != 0:
print(f"❌ FFmpeg stdout: {result.stdout}")
print(f"❌ FFmpeg stderr: {result.stderr}")
raise RuntimeError(f"FFmpeg failed with return code {result.returncode}")
print(f"✅ Video created successfully: {output_path}")
def _add_audio_to_video(self, video_path: str, audio_source: str):
"""Add audio to video using ffmpeg"""
import subprocess
import tempfile
try:
# Create temporary file for output with audio
temp_path = Path(video_path).with_suffix('.temp.mp4')
cmd = [
'ffmpeg', '-y',
'-i', str(video_path), # Input video (no audio)
'-i', str(audio_source), # Input audio source
'-c:v', 'copy', # Copy video without re-encoding
'-c:a', 'aac', # Encode audio as AAC
'-map', '0:v:0', # Map video from first input
'-map', '1:a:0', # Map audio from second input
'-shortest', # Match shortest stream duration
str(temp_path)
]
print(f"🎵 Adding audio: {audio_source}{video_path}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"⚠️ Audio addition failed: {result.stderr}")
# Keep original video without audio
return
# Replace original with audio version
Path(video_path).unlink()
temp_path.rename(video_path)
print(f"✅ Audio added successfully")
except Exception as e:
print(f"⚠️ Could not add audio: {e}")
def merge_overlapping_chunks(self,
chunk_results: List[List[np.ndarray]],
@@ -648,48 +974,100 @@ class VideoProcessor:
print(f"⚠️ Could not verify frame count: {e}")
def process_video(self) -> None:
"""Main video processing pipeline"""
"""Main video processing pipeline with checkpoint/resume support"""
self.processing_stats['start_time'] = time.time()
print("Starting VR180 video processing...")
# Load video info
self.load_video_info(self.config.input.video_path)
# Initialize checkpoint manager
from .checkpoint_manager import CheckpointManager
checkpoint_mgr = CheckpointManager(
self.config.input.video_path,
self.config.output.path
)
# Check for existing checkpoints
resume_info = checkpoint_mgr.get_resume_info()
if resume_info['can_resume']:
print(f"\n🔄 RESUME DETECTED:")
print(f" Found {resume_info['completed_chunks']} completed chunks")
print(f" Continue from where we left off? (saves time!)")
checkpoint_mgr.print_status()
# Calculate chunking parameters
chunk_size, overlap_frames = self.calculate_optimal_chunking()
# Calculate total chunks
total_chunks = 0
for _ in range(0, self.total_frames, chunk_size - overlap_frames):
total_chunks += 1
checkpoint_mgr.set_total_chunks(total_chunks)
# Process video in chunks
chunk_files = [] # Store file paths instead of frame data
temp_chunk_dir = Path(tempfile.mkdtemp(prefix="vr180_chunks_"))
try:
chunk_idx = 0
for start_frame in range(0, self.total_frames, chunk_size - overlap_frames):
end_frame = min(start_frame + chunk_size, self.total_frames)
frames_to_read = end_frame - start_frame
chunk_idx = len(chunk_files)
# Check if this chunk was already processed
existing_chunk = checkpoint_mgr.get_chunk_file(chunk_idx)
if existing_chunk:
print(f"\n✅ Chunk {chunk_idx} already processed: {existing_chunk.name}")
chunk_files.append(existing_chunk)
chunk_idx += 1
continue
print(f"\nProcessing chunk {chunk_idx}: frames {start_frame}-{end_frame}")
# Read chunk frames
frames = self.read_video_frames(
self.config.input.video_path,
start_frame=start_frame,
num_frames=frames_to_read,
scale_factor=self.config.processing.scale_factor
)
# Process chunk
matted_frames = self.process_chunk(frames, chunk_idx)
# Choose processing approach based on scale factor
if self.config.processing.scale_factor == 1.0:
# No scaling needed - use original single-resolution approach
print(f"🔄 Reading frames at original resolution (no scaling)")
frames = self.read_video_frames(
self.config.input.video_path,
start_frame=start_frame,
num_frames=frames_to_read,
scale_factor=1.0
)
# Process chunk normally (single resolution)
matted_frames = self.process_chunk(frames, chunk_idx)
else:
# Scaling required - use dual-resolution approach
print(f"🔄 Reading frames at dual resolution (scale_factor={self.config.processing.scale_factor})")
frame_data = self.read_video_frames_dual_resolution(
self.config.input.video_path,
start_frame=start_frame,
num_frames=frames_to_read,
scale_factor=self.config.processing.scale_factor
)
# Process chunk with dual-resolution approach
matted_frames = self.process_chunk_dual_resolution(frame_data, chunk_idx)
# Save chunk to disk immediately to free memory
chunk_path = temp_chunk_dir / f"chunk_{chunk_idx:04d}.npz"
print(f"Saving chunk {chunk_idx} to disk...")
np.savez_compressed(str(chunk_path), frames=matted_frames)
# Save to checkpoint
checkpoint_mgr.save_chunk(chunk_idx, None, source_chunk_path=chunk_path)
chunk_files.append(chunk_path)
chunk_idx += 1
# Free the frames from memory immediately
del matted_frames
del frames
if self.config.processing.scale_factor == 1.0:
del frames
else:
del frame_data
# Update statistics
self.processing_stats['chunks_processed'] += 1
@@ -704,21 +1082,39 @@ class VideoProcessor:
if self.memory_manager.should_emergency_cleanup():
self.memory_manager.emergency_cleanup()
# Use streaming merge to avoid memory accumulation (fixes OOM)
print("\n🎬 Using streaming merge (no memory accumulation)...")
# Mark chunk processing as complete
checkpoint_mgr.mark_processing_complete()
# Determine audio source for final video
audio_source = None
if self.config.output.preserve_audio and Path(self.config.input.video_path).exists():
audio_source = self.config.input.video_path
# Stream merge chunks directly to output (no memory accumulation)
self.merge_chunks_streaming(
chunk_files=chunk_files,
output_path=self.config.output.path,
overlap_frames=overlap_frames,
audio_source=audio_source
)
# Check if merge was already done
if resume_info.get('merge_complete', False):
print("\n✅ Merge already completed in previous run!")
print(f" Output: {self.config.output.path}")
else:
# Use streaming merge to avoid memory accumulation (fixes OOM)
print("\n🎬 Using streaming merge (no memory accumulation)...")
# For resume scenarios, make sure we have all chunk files
if resume_info['can_resume']:
checkpoint_chunk_files = checkpoint_mgr.get_completed_chunk_files()
if len(checkpoint_chunk_files) != len(chunk_files):
print(f"⚠️ Using {len(checkpoint_chunk_files)} checkpoint files instead of {len(chunk_files)} temp files")
chunk_files = checkpoint_chunk_files
# Determine audio source for final video
audio_source = None
if self.config.output.preserve_audio and Path(self.config.input.video_path).exists():
audio_source = self.config.input.video_path
# Stream merge chunks directly to output (no memory accumulation)
self.merge_chunks_streaming(
chunk_files=chunk_files,
output_path=self.config.output.path,
overlap_frames=overlap_frames,
audio_source=audio_source
)
# Mark merge as complete
checkpoint_mgr.mark_merge_complete()
print("✅ Streaming merge complete - no memory accumulation!")
@@ -736,11 +1132,24 @@ class VideoProcessor:
print("Video processing completed!")
# Option to clean up checkpoints
print("\n🗄️ CHECKPOINT CLEANUP OPTIONS:")
print(" Checkpoints saved successfully and can be cleaned up")
print(" - Keep checkpoints for debugging: checkpoint_mgr.cleanup_checkpoints(keep_chunks=True)")
print(" - Remove all checkpoints: checkpoint_mgr.cleanup_checkpoints()")
print(f" - Checkpoint location: {checkpoint_mgr.checkpoint_dir}")
# For now, keep checkpoints by default (user can manually clean)
print("\n💡 Checkpoints kept for safety. Delete manually when no longer needed.")
finally:
# Clean up temporary chunk files
# Clean up temporary chunk files (but not checkpoints)
if temp_chunk_dir.exists():
print("Cleaning up temporary chunk files...")
shutil.rmtree(temp_chunk_dir)
try:
shutil.rmtree(temp_chunk_dir)
except Exception as e:
print(f"⚠️ Could not clean temp directory: {e}")
def _print_processing_statistics(self):
"""Print detailed processing statistics"""

View File

@@ -3,6 +3,7 @@ import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
import warnings
import torch
from .video_processor import VideoProcessor
from .config import VR180Config
@@ -212,6 +213,10 @@ class VR180Processor(VideoProcessor):
del right_matted
self._aggressive_memory_cleanup(f"After combining frames chunk {chunk_idx}")
# CRITICAL: Complete inter-chunk cleanup to prevent model persistence
# This ensures models don't accumulate between chunks
self._complete_inter_chunk_cleanup(chunk_idx)
return combined_frames
def _process_eye_sequence(self,
@@ -691,6 +696,64 @@ class VR180Processor(VideoProcessor):
# TODO: Implement proper stereo correction algorithm
return right_frame
def _complete_inter_chunk_cleanup(self, chunk_idx: int):
"""
Complete inter-chunk cleanup: Destroy all models to prevent memory accumulation
This addresses the core issue where SAM2 and YOLO models (~15-20GB)
persist between chunks, causing OOM when processing subsequent chunks.
"""
print(f"🧹 INTER-CHUNK CLEANUP: Destroying all models after chunk {chunk_idx}")
# 1. Completely destroy SAM2 model (15-20GB)
if hasattr(self, 'sam2_model') and self.sam2_model is not None:
self.sam2_model.cleanup() # Call existing cleanup
# Force complete destruction of the model
try:
# Reset the model's loaded state so it will reload fresh
if hasattr(self.sam2_model, '_model_loaded'):
self.sam2_model._model_loaded = False
# Clear any cached state
if hasattr(self.sam2_model, 'predictor'):
self.sam2_model.predictor = None
if hasattr(self.sam2_model, 'inference_state'):
self.sam2_model.inference_state = None
print(f" ✅ SAM2 model destroyed and marked for fresh reload")
except Exception as e:
print(f" ⚠️ SAM2 destruction warning: {e}")
# 2. Completely destroy YOLO detector (400MB+)
if hasattr(self, 'detector') and self.detector is not None:
try:
# Force YOLO model to be reloaded fresh
if hasattr(self.detector, 'model') and self.detector.model is not None:
del self.detector.model
self.detector.model = None
print(f" ✅ YOLO model destroyed and marked for fresh reload")
except Exception as e:
print(f" ⚠️ YOLO destruction warning: {e}")
# 3. Clear CUDA cache aggressively
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize() # Wait for all operations to complete
print(f" ✅ CUDA cache cleared")
# 4. Force garbage collection
import gc
collected = gc.collect()
print(f" ✅ Garbage collection: {collected} objects freed")
# 5. Memory verification
self._print_memory_step(f"After complete inter-chunk cleanup (chunk {chunk_idx})")
print(f"🎯 RESULT: Models will reload fresh for next chunk (prevents 15-20GB accumulation)")
def process_chunk(self,
frames: List[np.ndarray],
chunk_idx: int = 0) -> List[np.ndarray]:
@@ -750,6 +813,9 @@ class VR180Processor(VideoProcessor):
combined = {'left': left_frame, 'right': right_frame}
combined_frames.append(combined)
# CRITICAL: Complete inter-chunk cleanup for independent processing too
self._complete_inter_chunk_cleanup(chunk_idx)
return combined_frames
def save_video(self, frames: List[np.ndarray], output_path: str):