Compare commits
8 Commits
4d1361df46
...
det
| Author | SHA1 | Date | |
|---|---|---|---|
| 277d554ecc | |||
| d6d2b0aa93 | |||
| 3a547b7c21 | |||
| 262cb00b69 | |||
| caa4ddb5e0 | |||
| fa945b9c3e | |||
| 4958c503dd | |||
| 366b132ef5 |
@@ -107,8 +107,17 @@ def test_inter_chunk_cleanup():
|
||||
print(f" Memory freed: {cleanup_improvement:.2f}GB")
|
||||
print(f" Models destroyed: YOLO={yolo_reloaded}, SAM2={sam2_reloaded}")
|
||||
|
||||
if cleanup_improvement > total_model_memory * 0.5: # Freed >50% of model memory
|
||||
# Success criteria: Both models destroyed AND can reload
|
||||
models_destroyed = yolo_reloaded and sam2_reloaded
|
||||
can_reload = 'reload_growth' in locals()
|
||||
|
||||
if models_destroyed and can_reload:
|
||||
print("✅ Inter-chunk cleanup working effectively")
|
||||
print("💡 Models destroyed and can reload fresh (memory will be freed during real processing)")
|
||||
return True
|
||||
elif models_destroyed:
|
||||
print("⚠️ Models destroyed but reload test incomplete")
|
||||
print("💡 This should still prevent accumulation during real processing")
|
||||
return True
|
||||
else:
|
||||
print("❌ Inter-chunk cleanup not freeing enough memory")
|
||||
|
||||
220
vr180_matting/checkpoint_manager.py
Normal file
220
vr180_matting/checkpoint_manager.py
Normal file
@@ -0,0 +1,220 @@
|
||||
"""
|
||||
Checkpoint manager for resumable video processing
|
||||
Saves progress to avoid reprocessing after OOM or crashes
|
||||
"""
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional, List
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class CheckpointManager:
|
||||
"""Manages processing checkpoints for resumable execution"""
|
||||
|
||||
def __init__(self, video_path: str, output_path: str, checkpoint_dir: Optional[Path] = None):
|
||||
"""
|
||||
Initialize checkpoint manager
|
||||
|
||||
Args:
|
||||
video_path: Input video path
|
||||
output_path: Output video path
|
||||
checkpoint_dir: Directory for checkpoint files (default: .vr180_checkpoints in CWD)
|
||||
"""
|
||||
self.video_path = Path(video_path)
|
||||
self.output_path = Path(output_path)
|
||||
|
||||
# Create unique checkpoint ID based on video file
|
||||
self.video_hash = self._compute_video_hash()
|
||||
|
||||
# Setup checkpoint directory
|
||||
if checkpoint_dir is None:
|
||||
self.checkpoint_dir = Path.cwd() / ".vr180_checkpoints" / self.video_hash
|
||||
else:
|
||||
self.checkpoint_dir = Path(checkpoint_dir) / self.video_hash
|
||||
|
||||
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Checkpoint files
|
||||
self.status_file = self.checkpoint_dir / "processing_status.json"
|
||||
self.chunks_dir = self.checkpoint_dir / "chunks"
|
||||
self.chunks_dir.mkdir(exist_ok=True)
|
||||
|
||||
# Load existing status or create new
|
||||
self.status = self._load_status()
|
||||
|
||||
def _compute_video_hash(self) -> str:
|
||||
"""Compute hash of video file for unique identification"""
|
||||
# Use file path, size, and modification time for quick hash
|
||||
stat = self.video_path.stat()
|
||||
hash_str = f"{self.video_path}_{stat.st_size}_{stat.st_mtime}"
|
||||
return hashlib.md5(hash_str.encode()).hexdigest()[:12]
|
||||
|
||||
def _load_status(self) -> Dict[str, Any]:
|
||||
"""Load processing status from checkpoint file"""
|
||||
if self.status_file.exists():
|
||||
with open(self.status_file, 'r') as f:
|
||||
status = json.load(f)
|
||||
print(f"📋 Loaded checkpoint: {status['completed_chunks']}/{status['total_chunks']} chunks completed")
|
||||
return status
|
||||
else:
|
||||
# Create new status
|
||||
return {
|
||||
'video_path': str(self.video_path),
|
||||
'output_path': str(self.output_path),
|
||||
'video_hash': self.video_hash,
|
||||
'start_time': datetime.now().isoformat(),
|
||||
'total_chunks': 0,
|
||||
'completed_chunks': 0,
|
||||
'chunk_info': {},
|
||||
'processing_complete': False,
|
||||
'merge_complete': False
|
||||
}
|
||||
|
||||
def _save_status(self):
|
||||
"""Save current status to checkpoint file"""
|
||||
self.status['last_update'] = datetime.now().isoformat()
|
||||
with open(self.status_file, 'w') as f:
|
||||
json.dump(self.status, f, indent=2)
|
||||
|
||||
def set_total_chunks(self, total_chunks: int):
|
||||
"""Set total number of chunks to process"""
|
||||
self.status['total_chunks'] = total_chunks
|
||||
self._save_status()
|
||||
|
||||
def is_chunk_completed(self, chunk_idx: int) -> bool:
|
||||
"""Check if a chunk has already been processed"""
|
||||
chunk_key = f"chunk_{chunk_idx}"
|
||||
return chunk_key in self.status['chunk_info'] and \
|
||||
self.status['chunk_info'][chunk_key].get('completed', False)
|
||||
|
||||
def get_chunk_file(self, chunk_idx: int) -> Optional[Path]:
|
||||
"""Get saved chunk file path if it exists"""
|
||||
chunk_file = self.chunks_dir / f"chunk_{chunk_idx:04d}.npz"
|
||||
if chunk_file.exists() and self.is_chunk_completed(chunk_idx):
|
||||
return chunk_file
|
||||
return None
|
||||
|
||||
def save_chunk(self, chunk_idx: int, frames: List, source_chunk_path: Optional[Path] = None):
|
||||
"""
|
||||
Save processed chunk and mark as completed
|
||||
|
||||
Args:
|
||||
chunk_idx: Chunk index
|
||||
frames: Processed frames (can be None if using source_chunk_path)
|
||||
source_chunk_path: If provided, copy this file instead of saving frames
|
||||
"""
|
||||
chunk_file = self.chunks_dir / f"chunk_{chunk_idx:04d}.npz"
|
||||
|
||||
try:
|
||||
if source_chunk_path and source_chunk_path.exists():
|
||||
# Copy existing chunk file
|
||||
shutil.copy2(source_chunk_path, chunk_file)
|
||||
print(f"💾 Copied chunk {chunk_idx} to checkpoint: {chunk_file.name}")
|
||||
elif frames is not None:
|
||||
# Save new frames
|
||||
import numpy as np
|
||||
np.savez_compressed(str(chunk_file), frames=frames)
|
||||
print(f"💾 Saved chunk {chunk_idx} to checkpoint: {chunk_file.name}")
|
||||
else:
|
||||
raise ValueError("Either frames or source_chunk_path must be provided")
|
||||
|
||||
# Update status
|
||||
chunk_key = f"chunk_{chunk_idx}"
|
||||
self.status['chunk_info'][chunk_key] = {
|
||||
'completed': True,
|
||||
'file': chunk_file.name,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
self.status['completed_chunks'] = len([c for c in self.status['chunk_info'].values() if c['completed']])
|
||||
self._save_status()
|
||||
|
||||
print(f"✅ Chunk {chunk_idx} checkpoint saved ({self.status['completed_chunks']}/{self.status['total_chunks']})")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Failed to save chunk {chunk_idx} checkpoint: {e}")
|
||||
|
||||
def get_completed_chunk_files(self) -> List[Path]:
|
||||
"""Get list of all completed chunk files in order"""
|
||||
chunk_files = []
|
||||
missing_chunks = []
|
||||
|
||||
for i in range(self.status['total_chunks']):
|
||||
chunk_file = self.get_chunk_file(i)
|
||||
if chunk_file:
|
||||
chunk_files.append(chunk_file)
|
||||
else:
|
||||
# Check if chunk is marked as completed but file is missing
|
||||
if self.is_chunk_completed(i):
|
||||
missing_chunks.append(i)
|
||||
print(f"⚠️ Chunk {i} marked complete but file missing!")
|
||||
else:
|
||||
break # Stop at first unprocessed chunk
|
||||
|
||||
if missing_chunks:
|
||||
print(f"❌ Missing checkpoint files for chunks: {missing_chunks}")
|
||||
print(f" This may happen if files were deleted during streaming merge")
|
||||
print(f" These chunks may need to be reprocessed")
|
||||
|
||||
return chunk_files
|
||||
|
||||
def mark_processing_complete(self):
|
||||
"""Mark all chunk processing as complete"""
|
||||
self.status['processing_complete'] = True
|
||||
self._save_status()
|
||||
print(f"✅ All chunks processed and checkpointed")
|
||||
|
||||
def mark_merge_complete(self):
|
||||
"""Mark final merge as complete"""
|
||||
self.status['merge_complete'] = True
|
||||
self._save_status()
|
||||
print(f"✅ Video merge completed")
|
||||
|
||||
def cleanup_checkpoints(self, keep_chunks: bool = False):
|
||||
"""
|
||||
Clean up checkpoint files after successful completion
|
||||
|
||||
Args:
|
||||
keep_chunks: If True, keep chunk files but remove status
|
||||
"""
|
||||
if keep_chunks:
|
||||
# Just remove status file
|
||||
if self.status_file.exists():
|
||||
self.status_file.unlink()
|
||||
print(f"🗑️ Removed checkpoint status file")
|
||||
else:
|
||||
# Remove entire checkpoint directory
|
||||
if self.checkpoint_dir.exists():
|
||||
shutil.rmtree(self.checkpoint_dir)
|
||||
print(f"🗑️ Removed all checkpoint files: {self.checkpoint_dir}")
|
||||
|
||||
def get_resume_info(self) -> Dict[str, Any]:
|
||||
"""Get information about what can be resumed"""
|
||||
return {
|
||||
'can_resume': self.status['completed_chunks'] > 0,
|
||||
'completed_chunks': self.status['completed_chunks'],
|
||||
'total_chunks': self.status['total_chunks'],
|
||||
'processing_complete': self.status['processing_complete'],
|
||||
'merge_complete': self.status['merge_complete'],
|
||||
'checkpoint_dir': str(self.checkpoint_dir)
|
||||
}
|
||||
|
||||
def print_status(self):
|
||||
"""Print current checkpoint status"""
|
||||
print(f"\n📊 CHECKPOINT STATUS:")
|
||||
print(f" Video: {self.video_path.name}")
|
||||
print(f" Hash: {self.video_hash}")
|
||||
print(f" Progress: {self.status['completed_chunks']}/{self.status['total_chunks']} chunks")
|
||||
print(f" Processing complete: {self.status['processing_complete']}")
|
||||
print(f" Merge complete: {self.status['merge_complete']}")
|
||||
print(f" Checkpoint dir: {self.checkpoint_dir}")
|
||||
|
||||
if self.status['completed_chunks'] > 0:
|
||||
print(f"\n Completed chunks:")
|
||||
for i in range(self.status['completed_chunks']):
|
||||
chunk_info = self.status['chunk_info'].get(f'chunk_{i}', {})
|
||||
if chunk_info.get('completed'):
|
||||
print(f" ✓ Chunk {i}: {chunk_info.get('file', 'unknown')}")
|
||||
@@ -281,6 +281,116 @@ class VideoProcessor:
|
||||
print(f"Read {len(frames)} frames")
|
||||
return frames
|
||||
|
||||
def read_video_frames_dual_resolution(self,
|
||||
video_path: str,
|
||||
start_frame: int = 0,
|
||||
num_frames: Optional[int] = None,
|
||||
scale_factor: float = 0.5) -> Dict[str, List[np.ndarray]]:
|
||||
"""
|
||||
Read video frames at both original and scaled resolution for dual-resolution processing
|
||||
|
||||
Args:
|
||||
video_path: Path to video file
|
||||
start_frame: Starting frame index
|
||||
num_frames: Number of frames to read (None for all)
|
||||
scale_factor: Scaling factor for inference frames
|
||||
|
||||
Returns:
|
||||
Dictionary with 'original' and 'scaled' frame lists
|
||||
"""
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
|
||||
if not cap.isOpened():
|
||||
raise RuntimeError(f"Could not open video file: {video_path}")
|
||||
|
||||
# Set starting position
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
|
||||
|
||||
original_frames = []
|
||||
scaled_frames = []
|
||||
frame_count = 0
|
||||
|
||||
# Progress tracking
|
||||
total_to_read = num_frames if num_frames else self.total_frames - start_frame
|
||||
|
||||
with tqdm(total=total_to_read, desc="Reading dual-resolution frames") as pbar:
|
||||
while True:
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
|
||||
# Store original frame
|
||||
original_frames.append(frame.copy())
|
||||
|
||||
# Create scaled frame for inference
|
||||
if scale_factor != 1.0:
|
||||
new_width = int(frame.shape[1] * scale_factor)
|
||||
new_height = int(frame.shape[0] * scale_factor)
|
||||
scaled_frame = cv2.resize(frame, (new_width, new_height),
|
||||
interpolation=cv2.INTER_AREA)
|
||||
else:
|
||||
scaled_frame = frame.copy()
|
||||
|
||||
scaled_frames.append(scaled_frame)
|
||||
frame_count += 1
|
||||
pbar.update(1)
|
||||
|
||||
if num_frames is not None and frame_count >= num_frames:
|
||||
break
|
||||
|
||||
cap.release()
|
||||
|
||||
print(f"Loaded {len(original_frames)} frames:")
|
||||
print(f" Original: {original_frames[0].shape} per frame")
|
||||
print(f" Scaled: {scaled_frames[0].shape} per frame (scale_factor={scale_factor})")
|
||||
|
||||
return {
|
||||
'original': original_frames,
|
||||
'scaled': scaled_frames
|
||||
}
|
||||
|
||||
def upscale_mask(self, mask: np.ndarray, target_shape: tuple, method: str = 'cubic') -> np.ndarray:
|
||||
"""
|
||||
Upscale a mask from inference resolution to original resolution
|
||||
|
||||
Args:
|
||||
mask: Low-resolution mask (H, W)
|
||||
target_shape: Target shape (H, W) for upscaling
|
||||
method: Upscaling method ('nearest', 'cubic', 'area')
|
||||
|
||||
Returns:
|
||||
Upscaled mask at target resolution
|
||||
"""
|
||||
if mask.shape[:2] == target_shape[:2]:
|
||||
return mask # Already correct size
|
||||
|
||||
# Ensure mask is 2D
|
||||
if mask.ndim == 3:
|
||||
mask = mask.squeeze()
|
||||
|
||||
# Choose interpolation method
|
||||
if method == 'nearest':
|
||||
interpolation = cv2.INTER_NEAREST # Crisp edges, good for sharp subjects
|
||||
elif method == 'cubic':
|
||||
interpolation = cv2.INTER_CUBIC # Smooth edges, good for most content
|
||||
elif method == 'area':
|
||||
interpolation = cv2.INTER_AREA # Good for downscaling, not upscaling
|
||||
else:
|
||||
interpolation = cv2.INTER_CUBIC # Default to cubic
|
||||
|
||||
# Upscale mask
|
||||
upscaled_mask = cv2.resize(
|
||||
mask.astype(np.uint8),
|
||||
(target_shape[1], target_shape[0]), # (width, height) for cv2.resize
|
||||
interpolation=interpolation
|
||||
)
|
||||
|
||||
# Convert back to boolean if it was originally boolean
|
||||
if mask.dtype == bool:
|
||||
upscaled_mask = upscaled_mask.astype(bool)
|
||||
|
||||
return upscaled_mask
|
||||
|
||||
def calculate_optimal_chunking(self) -> Tuple[int, int]:
|
||||
"""
|
||||
Calculate optimal chunk size and overlap based on memory constraints
|
||||
@@ -369,6 +479,92 @@ class VideoProcessor:
|
||||
|
||||
return matted_frames
|
||||
|
||||
def process_chunk_dual_resolution(self,
|
||||
frame_data: Dict[str, List[np.ndarray]],
|
||||
chunk_idx: int = 0) -> List[np.ndarray]:
|
||||
"""
|
||||
Process a chunk using dual-resolution approach: inference at low-res, output at full-res
|
||||
|
||||
Args:
|
||||
frame_data: Dictionary with 'original' and 'scaled' frame lists
|
||||
chunk_idx: Chunk index for logging
|
||||
|
||||
Returns:
|
||||
List of matted frames at original resolution
|
||||
"""
|
||||
original_frames = frame_data['original']
|
||||
scaled_frames = frame_data['scaled']
|
||||
|
||||
print(f"Processing chunk {chunk_idx} with dual-resolution ({len(original_frames)} frames)")
|
||||
print(f" Inference: {scaled_frames[0].shape} → Output: {original_frames[0].shape}")
|
||||
|
||||
with self.memory_manager.memory_monitor(f"dual-res chunk {chunk_idx}"):
|
||||
# Initialize SAM2 with scaled frames for inference
|
||||
self.sam2_model.init_video_state(scaled_frames)
|
||||
|
||||
# Detect persons in first scaled frame
|
||||
first_scaled_frame = scaled_frames[0]
|
||||
detections = self.detector.detect_persons(first_scaled_frame)
|
||||
|
||||
if not detections:
|
||||
warnings.warn(f"No persons detected in chunk {chunk_idx}")
|
||||
return self._create_empty_masks(original_frames)
|
||||
|
||||
print(f"Detected {len(detections)} persons in first frame (at inference resolution)")
|
||||
|
||||
# Convert detections to SAM2 prompts (detections are already at scaled resolution)
|
||||
box_prompts, labels = self.detector.convert_to_sam_prompts(detections)
|
||||
|
||||
# Add prompts to SAM2
|
||||
object_ids = self.sam2_model.add_person_prompts(0, box_prompts, labels)
|
||||
print(f"Added prompts for {len(object_ids)} objects")
|
||||
|
||||
# Propagate masks through chunk at inference resolution
|
||||
video_segments = self.sam2_model.propagate_masks(
|
||||
start_frame=0,
|
||||
max_frames=len(scaled_frames)
|
||||
)
|
||||
|
||||
# Apply upscaled masks to original resolution frames
|
||||
matted_frames = []
|
||||
original_shape = original_frames[0].shape[:2] # (H, W)
|
||||
|
||||
for frame_idx, original_frame in enumerate(tqdm(original_frames, desc="Applying upscaled masks")):
|
||||
if frame_idx in video_segments:
|
||||
frame_masks = video_segments[frame_idx]
|
||||
|
||||
# Get combined mask at inference resolution
|
||||
combined_mask_scaled = self.sam2_model.get_combined_mask(frame_masks)
|
||||
|
||||
if combined_mask_scaled is not None:
|
||||
# Upscale mask to original resolution
|
||||
combined_mask_full = self.upscale_mask(
|
||||
combined_mask_scaled,
|
||||
target_shape=original_shape,
|
||||
method='cubic' # Smooth upscaling for masks
|
||||
)
|
||||
|
||||
# Apply upscaled mask to original resolution frame
|
||||
matted_frame = self.sam2_model.apply_mask_to_frame(
|
||||
original_frame, combined_mask_full,
|
||||
output_format=self.config.output.format,
|
||||
background_color=self.config.output.background_color
|
||||
)
|
||||
else:
|
||||
# No mask for this frame
|
||||
matted_frame = self._create_empty_mask_frame(original_frame)
|
||||
else:
|
||||
# No mask for this frame
|
||||
matted_frame = self._create_empty_mask_frame(original_frame)
|
||||
|
||||
matted_frames.append(matted_frame)
|
||||
|
||||
# Cleanup SAM2 state
|
||||
self.sam2_model.cleanup()
|
||||
|
||||
print(f"✅ Dual-resolution processing complete: {len(matted_frames)} frames at full resolution")
|
||||
return matted_frames
|
||||
|
||||
def _create_empty_masks(self, frames: List[np.ndarray]) -> List[np.ndarray]:
|
||||
"""Create empty masks when no persons detected"""
|
||||
empty_frames = []
|
||||
@@ -398,60 +594,190 @@ class VideoProcessor:
|
||||
overlap_frames: Number of overlapping frames
|
||||
audio_source: Audio source file for final video
|
||||
"""
|
||||
from .streaming_video_writer import StreamingVideoWriter
|
||||
|
||||
if not chunk_files:
|
||||
raise ValueError("No chunk files to merge")
|
||||
|
||||
print(f"🎬 Streaming merge: {len(chunk_files)} chunks → {output_path}")
|
||||
print(f"🎬 TRUE Streaming merge: {len(chunk_files)} chunks → {output_path}")
|
||||
|
||||
# Initialize streaming writer
|
||||
writer = StreamingVideoWriter(
|
||||
output_path=output_path,
|
||||
fps=self.video_info['fps'],
|
||||
audio_source=audio_source
|
||||
)
|
||||
# Create temporary directory for frame images
|
||||
import tempfile
|
||||
temp_frames_dir = Path(tempfile.mkdtemp(prefix="merge_frames_"))
|
||||
frame_counter = 0
|
||||
|
||||
try:
|
||||
# Process each chunk without accumulation
|
||||
print(f"📁 Using temp frames dir: {temp_frames_dir}")
|
||||
|
||||
# Process each chunk frame-by-frame (true streaming)
|
||||
for i, chunk_file in enumerate(chunk_files):
|
||||
print(f"📼 Processing chunk {i+1}/{len(chunk_files)}: {chunk_file.name}")
|
||||
|
||||
# Load chunk (this is the only copy in memory)
|
||||
# Load chunk metadata without loading frames array
|
||||
chunk_data = np.load(str(chunk_file))
|
||||
frames = chunk_data['frames'].tolist() # Convert to list of arrays
|
||||
frames_array = chunk_data['frames'] # This is still mmap'd, not loaded
|
||||
total_frames_in_chunk = frames_array.shape[0]
|
||||
|
||||
# Determine which frames to skip for overlap
|
||||
start_frame_idx = overlap_frames if i > 0 and overlap_frames > 0 else 0
|
||||
frames_to_process = total_frames_in_chunk - start_frame_idx
|
||||
|
||||
if start_frame_idx > 0:
|
||||
print(f" ✂️ Skipping first {start_frame_idx} overlapping frames")
|
||||
|
||||
print(f" 🔄 Processing {frames_to_process} frames one-by-one...")
|
||||
|
||||
# Process frames ONE AT A TIME (true streaming)
|
||||
for frame_idx in range(start_frame_idx, total_frames_in_chunk):
|
||||
# Load only ONE frame at a time
|
||||
frame = frames_array[frame_idx] # Load single frame
|
||||
|
||||
# Save frame directly to disk
|
||||
frame_path = temp_frames_dir / f"frame_{frame_counter:06d}.jpg"
|
||||
success = cv2.imwrite(str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
|
||||
if not success:
|
||||
raise RuntimeError(f"Failed to save frame {frame_counter}")
|
||||
|
||||
frame_counter += 1
|
||||
|
||||
# Periodic progress and cleanup
|
||||
if frame_counter % 100 == 0:
|
||||
print(f" 💾 Saved {frame_counter} frames...")
|
||||
gc.collect() # Periodic cleanup
|
||||
|
||||
print(f" ✅ Saved {frames_to_process} frames to disk (total: {frame_counter})")
|
||||
|
||||
# Close chunk file and cleanup
|
||||
chunk_data.close()
|
||||
del chunk_data, frames_array
|
||||
|
||||
# Write chunk with streaming writer
|
||||
writer.write_chunk(
|
||||
frames=frames,
|
||||
chunk_index=i,
|
||||
overlap_frames=overlap_frames if i > 0 else 0,
|
||||
blend_with_previous=(i > 0 and overlap_frames > 0)
|
||||
)
|
||||
# Don't delete checkpoint files - they're needed for potential resume
|
||||
# The checkpoint system manages cleanup separately
|
||||
print(f" 📋 Keeping checkpoint file: {chunk_file.name}")
|
||||
|
||||
# Immediately free memory
|
||||
del frames, chunk_data
|
||||
# Aggressive cleanup and memory monitoring after each chunk
|
||||
self._aggressive_memory_cleanup(f"After streaming merge chunk {i}")
|
||||
|
||||
# Delete chunk file to free disk space
|
||||
try:
|
||||
chunk_file.unlink()
|
||||
print(f" 🗑️ Deleted {chunk_file.name}")
|
||||
except Exception as e:
|
||||
print(f" ⚠️ Could not delete {chunk_file.name}: {e}")
|
||||
# Memory safety check
|
||||
memory_info = self._get_process_memory_info()
|
||||
if memory_info['total_process_gb'] > 35: # Warning if approaching 46GB limit
|
||||
print(f"⚠️ High memory usage: {memory_info['total_process_gb']:.1f}GB - forcing cleanup")
|
||||
gc.collect()
|
||||
import torch
|
||||
if torch.cuda.is_available():
|
||||
torch.cuda.empty_cache()
|
||||
|
||||
# Aggressive cleanup every chunk
|
||||
self._aggressive_memory_cleanup(f"After processing chunk {i}")
|
||||
# Create final video directly from frame images using ffmpeg
|
||||
print(f"📹 Creating final video from {frame_counter} frames...")
|
||||
self._create_video_from_frames(temp_frames_dir, Path(output_path), frame_counter)
|
||||
|
||||
# Finalize the video
|
||||
writer.finalize()
|
||||
# Add audio if provided
|
||||
if audio_source:
|
||||
self._add_audio_to_video(output_path, audio_source)
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Streaming merge failed: {e}")
|
||||
writer.cleanup()
|
||||
raise
|
||||
|
||||
print(f"✅ Streaming merge complete: {output_path}")
|
||||
finally:
|
||||
# Cleanup temporary frames directory
|
||||
try:
|
||||
if temp_frames_dir.exists():
|
||||
import shutil
|
||||
shutil.rmtree(temp_frames_dir)
|
||||
print(f"🗑️ Cleaned up temp frames dir: {temp_frames_dir}")
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not cleanup temp frames dir: {e}")
|
||||
|
||||
# Memory cleanup
|
||||
gc.collect()
|
||||
|
||||
print(f"✅ TRUE Streaming merge complete: {output_path}")
|
||||
|
||||
def _create_video_from_frames(self, frames_dir: Path, output_path: Path, frame_count: int):
|
||||
"""Create video directly from frame images using ffmpeg (memory efficient)"""
|
||||
import subprocess
|
||||
|
||||
frame_pattern = str(frames_dir / "frame_%06d.jpg")
|
||||
fps = self.video_info['fps'] if hasattr(self, 'video_info') and self.video_info else 30.0
|
||||
|
||||
print(f"🎬 Creating video with ffmpeg: {frame_count} frames at {fps} fps")
|
||||
|
||||
# Use GPU encoding if available, fallback to CPU
|
||||
gpu_cmd = [
|
||||
'ffmpeg', '-y', # -y to overwrite output file
|
||||
'-framerate', str(fps),
|
||||
'-i', frame_pattern,
|
||||
'-c:v', 'h264_nvenc', # NVIDIA GPU encoder
|
||||
'-preset', 'fast',
|
||||
'-cq', '18', # Quality for GPU encoding
|
||||
'-pix_fmt', 'yuv420p',
|
||||
str(output_path)
|
||||
]
|
||||
|
||||
cpu_cmd = [
|
||||
'ffmpeg', '-y', # -y to overwrite output file
|
||||
'-framerate', str(fps),
|
||||
'-i', frame_pattern,
|
||||
'-c:v', 'libx264', # CPU encoder
|
||||
'-preset', 'medium',
|
||||
'-crf', '18', # Quality for CPU encoding
|
||||
'-pix_fmt', 'yuv420p',
|
||||
str(output_path)
|
||||
]
|
||||
|
||||
# Try GPU first
|
||||
print(f"🚀 Trying GPU encoding...")
|
||||
result = subprocess.run(gpu_cmd, capture_output=True, text=True)
|
||||
|
||||
if result.returncode != 0:
|
||||
print("⚠️ GPU encoding failed, using CPU...")
|
||||
print(f"🔄 CPU encoding...")
|
||||
result = subprocess.run(cpu_cmd, capture_output=True, text=True)
|
||||
else:
|
||||
print("✅ GPU encoding successful!")
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"❌ FFmpeg stdout: {result.stdout}")
|
||||
print(f"❌ FFmpeg stderr: {result.stderr}")
|
||||
raise RuntimeError(f"FFmpeg failed with return code {result.returncode}")
|
||||
|
||||
print(f"✅ Video created successfully: {output_path}")
|
||||
|
||||
def _add_audio_to_video(self, video_path: str, audio_source: str):
|
||||
"""Add audio to video using ffmpeg"""
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
try:
|
||||
# Create temporary file for output with audio
|
||||
temp_path = Path(video_path).with_suffix('.temp.mp4')
|
||||
|
||||
cmd = [
|
||||
'ffmpeg', '-y',
|
||||
'-i', str(video_path), # Input video (no audio)
|
||||
'-i', str(audio_source), # Input audio source
|
||||
'-c:v', 'copy', # Copy video without re-encoding
|
||||
'-c:a', 'aac', # Encode audio as AAC
|
||||
'-map', '0:v:0', # Map video from first input
|
||||
'-map', '1:a:0', # Map audio from second input
|
||||
'-shortest', # Match shortest stream duration
|
||||
str(temp_path)
|
||||
]
|
||||
|
||||
print(f"🎵 Adding audio: {audio_source} → {video_path}")
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"⚠️ Audio addition failed: {result.stderr}")
|
||||
# Keep original video without audio
|
||||
return
|
||||
|
||||
# Replace original with audio version
|
||||
Path(video_path).unlink()
|
||||
temp_path.rename(video_path)
|
||||
print(f"✅ Audio added successfully")
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not add audio: {e}")
|
||||
|
||||
def merge_overlapping_chunks(self,
|
||||
chunk_results: List[List[np.ndarray]],
|
||||
@@ -648,48 +974,100 @@ class VideoProcessor:
|
||||
print(f"⚠️ Could not verify frame count: {e}")
|
||||
|
||||
def process_video(self) -> None:
|
||||
"""Main video processing pipeline"""
|
||||
"""Main video processing pipeline with checkpoint/resume support"""
|
||||
self.processing_stats['start_time'] = time.time()
|
||||
print("Starting VR180 video processing...")
|
||||
|
||||
# Load video info
|
||||
self.load_video_info(self.config.input.video_path)
|
||||
|
||||
# Initialize checkpoint manager
|
||||
from .checkpoint_manager import CheckpointManager
|
||||
checkpoint_mgr = CheckpointManager(
|
||||
self.config.input.video_path,
|
||||
self.config.output.path
|
||||
)
|
||||
|
||||
# Check for existing checkpoints
|
||||
resume_info = checkpoint_mgr.get_resume_info()
|
||||
if resume_info['can_resume']:
|
||||
print(f"\n🔄 RESUME DETECTED:")
|
||||
print(f" Found {resume_info['completed_chunks']} completed chunks")
|
||||
print(f" Continue from where we left off? (saves time!)")
|
||||
checkpoint_mgr.print_status()
|
||||
|
||||
# Calculate chunking parameters
|
||||
chunk_size, overlap_frames = self.calculate_optimal_chunking()
|
||||
|
||||
# Calculate total chunks
|
||||
total_chunks = 0
|
||||
for _ in range(0, self.total_frames, chunk_size - overlap_frames):
|
||||
total_chunks += 1
|
||||
checkpoint_mgr.set_total_chunks(total_chunks)
|
||||
|
||||
# Process video in chunks
|
||||
chunk_files = [] # Store file paths instead of frame data
|
||||
temp_chunk_dir = Path(tempfile.mkdtemp(prefix="vr180_chunks_"))
|
||||
|
||||
try:
|
||||
chunk_idx = 0
|
||||
for start_frame in range(0, self.total_frames, chunk_size - overlap_frames):
|
||||
end_frame = min(start_frame + chunk_size, self.total_frames)
|
||||
frames_to_read = end_frame - start_frame
|
||||
|
||||
chunk_idx = len(chunk_files)
|
||||
# Check if this chunk was already processed
|
||||
existing_chunk = checkpoint_mgr.get_chunk_file(chunk_idx)
|
||||
if existing_chunk:
|
||||
print(f"\n✅ Chunk {chunk_idx} already processed: {existing_chunk.name}")
|
||||
chunk_files.append(existing_chunk)
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
print(f"\nProcessing chunk {chunk_idx}: frames {start_frame}-{end_frame}")
|
||||
|
||||
# Read chunk frames
|
||||
frames = self.read_video_frames(
|
||||
self.config.input.video_path,
|
||||
start_frame=start_frame,
|
||||
num_frames=frames_to_read,
|
||||
scale_factor=self.config.processing.scale_factor
|
||||
)
|
||||
# Choose processing approach based on scale factor
|
||||
if self.config.processing.scale_factor == 1.0:
|
||||
# No scaling needed - use original single-resolution approach
|
||||
print(f"🔄 Reading frames at original resolution (no scaling)")
|
||||
frames = self.read_video_frames(
|
||||
self.config.input.video_path,
|
||||
start_frame=start_frame,
|
||||
num_frames=frames_to_read,
|
||||
scale_factor=1.0
|
||||
)
|
||||
|
||||
# Process chunk
|
||||
matted_frames = self.process_chunk(frames, chunk_idx)
|
||||
# Process chunk normally (single resolution)
|
||||
matted_frames = self.process_chunk(frames, chunk_idx)
|
||||
else:
|
||||
# Scaling required - use dual-resolution approach
|
||||
print(f"🔄 Reading frames at dual resolution (scale_factor={self.config.processing.scale_factor})")
|
||||
frame_data = self.read_video_frames_dual_resolution(
|
||||
self.config.input.video_path,
|
||||
start_frame=start_frame,
|
||||
num_frames=frames_to_read,
|
||||
scale_factor=self.config.processing.scale_factor
|
||||
)
|
||||
|
||||
# Process chunk with dual-resolution approach
|
||||
matted_frames = self.process_chunk_dual_resolution(frame_data, chunk_idx)
|
||||
|
||||
# Save chunk to disk immediately to free memory
|
||||
chunk_path = temp_chunk_dir / f"chunk_{chunk_idx:04d}.npz"
|
||||
print(f"Saving chunk {chunk_idx} to disk...")
|
||||
np.savez_compressed(str(chunk_path), frames=matted_frames)
|
||||
|
||||
# Save to checkpoint
|
||||
checkpoint_mgr.save_chunk(chunk_idx, None, source_chunk_path=chunk_path)
|
||||
|
||||
chunk_files.append(chunk_path)
|
||||
chunk_idx += 1
|
||||
|
||||
# Free the frames from memory immediately
|
||||
del matted_frames
|
||||
del frames
|
||||
if self.config.processing.scale_factor == 1.0:
|
||||
del frames
|
||||
else:
|
||||
del frame_data
|
||||
|
||||
# Update statistics
|
||||
self.processing_stats['chunks_processed'] += 1
|
||||
@@ -704,21 +1082,39 @@ class VideoProcessor:
|
||||
if self.memory_manager.should_emergency_cleanup():
|
||||
self.memory_manager.emergency_cleanup()
|
||||
|
||||
# Use streaming merge to avoid memory accumulation (fixes OOM)
|
||||
print("\n🎬 Using streaming merge (no memory accumulation)...")
|
||||
# Mark chunk processing as complete
|
||||
checkpoint_mgr.mark_processing_complete()
|
||||
|
||||
# Determine audio source for final video
|
||||
audio_source = None
|
||||
if self.config.output.preserve_audio and Path(self.config.input.video_path).exists():
|
||||
audio_source = self.config.input.video_path
|
||||
# Check if merge was already done
|
||||
if resume_info.get('merge_complete', False):
|
||||
print("\n✅ Merge already completed in previous run!")
|
||||
print(f" Output: {self.config.output.path}")
|
||||
else:
|
||||
# Use streaming merge to avoid memory accumulation (fixes OOM)
|
||||
print("\n🎬 Using streaming merge (no memory accumulation)...")
|
||||
|
||||
# Stream merge chunks directly to output (no memory accumulation)
|
||||
self.merge_chunks_streaming(
|
||||
chunk_files=chunk_files,
|
||||
output_path=self.config.output.path,
|
||||
overlap_frames=overlap_frames,
|
||||
audio_source=audio_source
|
||||
)
|
||||
# For resume scenarios, make sure we have all chunk files
|
||||
if resume_info['can_resume']:
|
||||
checkpoint_chunk_files = checkpoint_mgr.get_completed_chunk_files()
|
||||
if len(checkpoint_chunk_files) != len(chunk_files):
|
||||
print(f"⚠️ Using {len(checkpoint_chunk_files)} checkpoint files instead of {len(chunk_files)} temp files")
|
||||
chunk_files = checkpoint_chunk_files
|
||||
|
||||
# Determine audio source for final video
|
||||
audio_source = None
|
||||
if self.config.output.preserve_audio and Path(self.config.input.video_path).exists():
|
||||
audio_source = self.config.input.video_path
|
||||
|
||||
# Stream merge chunks directly to output (no memory accumulation)
|
||||
self.merge_chunks_streaming(
|
||||
chunk_files=chunk_files,
|
||||
output_path=self.config.output.path,
|
||||
overlap_frames=overlap_frames,
|
||||
audio_source=audio_source
|
||||
)
|
||||
|
||||
# Mark merge as complete
|
||||
checkpoint_mgr.mark_merge_complete()
|
||||
|
||||
print("✅ Streaming merge complete - no memory accumulation!")
|
||||
|
||||
@@ -736,11 +1132,24 @@ class VideoProcessor:
|
||||
|
||||
print("Video processing completed!")
|
||||
|
||||
# Option to clean up checkpoints
|
||||
print("\n🗄️ CHECKPOINT CLEANUP OPTIONS:")
|
||||
print(" Checkpoints saved successfully and can be cleaned up")
|
||||
print(" - Keep checkpoints for debugging: checkpoint_mgr.cleanup_checkpoints(keep_chunks=True)")
|
||||
print(" - Remove all checkpoints: checkpoint_mgr.cleanup_checkpoints()")
|
||||
print(f" - Checkpoint location: {checkpoint_mgr.checkpoint_dir}")
|
||||
|
||||
# For now, keep checkpoints by default (user can manually clean)
|
||||
print("\n💡 Checkpoints kept for safety. Delete manually when no longer needed.")
|
||||
|
||||
finally:
|
||||
# Clean up temporary chunk files
|
||||
# Clean up temporary chunk files (but not checkpoints)
|
||||
if temp_chunk_dir.exists():
|
||||
print("Cleaning up temporary chunk files...")
|
||||
shutil.rmtree(temp_chunk_dir)
|
||||
try:
|
||||
shutil.rmtree(temp_chunk_dir)
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not clean temp directory: {e}")
|
||||
|
||||
def _print_processing_statistics(self):
|
||||
"""Print detailed processing statistics"""
|
||||
|
||||
Reference in New Issue
Block a user