diff --git a/core/async_lowres_preprocessor.py b/core/async_lowres_preprocessor.py
new file mode 100644
index 0000000..d6f6660
--- /dev/null
+++ b/core/async_lowres_preprocessor.py
@@ -0,0 +1,337 @@
+"""
+Async low-resolution video preprocessor for parallel processing optimization.
+Creates low-resolution videos in background while main pipeline processes other segments.
+"""
+
+import os
+import asyncio
+import subprocess
+import logging
+import threading
+from pathlib import Path
+from typing import List, Dict, Any, Optional
+from concurrent.futures import ThreadPoolExecutor
+
+logger = logging.getLogger(__name__)
+
+class AsyncLowResPreprocessor:
+    """
+    Handles async pre-generation of low-resolution videos for SAM2 inference.
+    Uses FFmpeg subprocesses to bypass Python GIL limitations.
+    """
+
+    def __init__(self, max_concurrent: int = 3, segments_ahead: int = 3, use_ffmpeg: bool = True):
+        """
+        Initialize async preprocessor.
+
+        Args:
+            max_concurrent: Maximum number of concurrent FFmpeg processes
+            segments_ahead: How many segments to prepare in advance
+            use_ffmpeg: Use FFmpeg instead of OpenCV for better performance
+        """
+        self.max_concurrent = max_concurrent
+        self.segments_ahead = segments_ahead
+        self.use_ffmpeg = use_ffmpeg
+        self.preparation_tasks = {}  # segment_idx -> threading.Thread
+        self.completed_segments = set()  # Track completed preparations
+        self.active_threads = []  # Track active background threads
+
+        logger.info(f"AsyncLowResPreprocessor initialized: max_concurrent={max_concurrent}, "
+                    f"segments_ahead={segments_ahead}, use_ffmpeg={use_ffmpeg}")
+
+    async def create_lowres_ffmpeg(self, input_path: str, output_path: str, scale: float, semaphore: asyncio.Semaphore) -> bool:
+        """
+        Create low-resolution video using FFmpeg (bypasses Python GIL).
+
+        Args:
+            input_path: Path to input video
+            output_path: Path to output low-res video
+            scale: Scale factor for resolution reduction
+            semaphore: Asyncio semaphore for limiting concurrent processes
+
+        Returns:
+            True if successful
+        """
+        async with semaphore:  # Limit concurrent FFmpeg processes
+            try:
+                # Ensure output directory exists
+                os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+                # FFmpeg command for fast low-res video creation
+                cmd = [
+                    'ffmpeg', '-y',  # Overwrite output
+                    '-i', input_path,
+                    # trunc(...)*2 keeps the scaled dimensions even, as libx264 requires
+                    '-vf', f'scale=trunc(iw*{scale}/2)*2:trunc(ih*{scale}/2)*2',
+                    '-c:v', 'libx264',
+                    '-preset', 'ultrafast',  # Fastest encoding
+                    '-crf', '28',  # Lower quality OK for inference
+                    '-an',  # No audio needed for inference
+                    output_path
+                ]
+
+                logger.debug(f"Starting FFmpeg low-res creation: {os.path.basename(input_path)} -> {os.path.basename(output_path)}")
+
+                # Run FFmpeg asynchronously
+                proc = await asyncio.create_subprocess_exec(
+                    *cmd,
+                    stdout=asyncio.subprocess.DEVNULL,
+                    stderr=asyncio.subprocess.PIPE
+                )
+
+                _, stderr = await proc.communicate()
+
+                if proc.returncode != 0:
+                    stderr_text = stderr.decode() if stderr else "Unknown error"
+                    logger.error(f"FFmpeg failed for {input_path}: {stderr_text}")
+                    return False
+
+                # Verify output file was created
+                if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
+                    logger.error(f"FFmpeg output file missing or empty: {output_path}")
+                    return False
+
+                logger.debug(f"FFmpeg low-res creation completed: {os.path.basename(output_path)}")
+                return True
+
+            except Exception as e:
+                logger.error(f"Error in FFmpeg low-res creation for {input_path}: {e}")
+                return False
+
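+    # communicate() both drains stderr and waits for FFmpeg to exit, so no separate
+    # wait() call is needed. A minimal standalone sketch of the same pattern
+    # (input/output paths are hypothetical):
+    #
+    #     proc = await asyncio.create_subprocess_exec(
+    #         'ffmpeg', '-y', '-i', 'in.mp4', '-vf', 'scale=640:-2', 'out.mp4',
+    #         stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE)
+    #     _, stderr = await proc.communicate()
+    #     ok = proc.returncode == 0
+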
+    def create_lowres_opencv(self, input_path: str, output_path: str, scale: float) -> bool:
+        """
+        Fallback: Create low-resolution video using OpenCV (blocking operation).
+        Used when FFmpeg is not available or fails.
+
+        Args:
+            input_path: Path to input video
+            output_path: Path to output low-res video
+            scale: Scale factor for resolution reduction
+
+        Returns:
+            True if successful
+        """
+        try:
+            import cv2
+
+            logger.debug(f"Creating low-res video with OpenCV: {os.path.basename(input_path)}")
+
+            cap = cv2.VideoCapture(input_path)
+            if not cap.isOpened():
+                logger.error(f"Could not open video with OpenCV: {input_path}")
+                return False
+
+            # Get video properties
+            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) * scale)
+            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) * scale)
+            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
+
+            # Ensure output directory exists
+            os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+            # Create video writer
+            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
+            out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
+
+            if not out.isOpened():
+                logger.error(f"Could not create video writer for: {output_path}")
+                cap.release()
+                return False
+
+            # Process frames
+            frame_count = 0
+            while True:
+                ret, frame = cap.read()
+                if not ret:
+                    break
+
+                # Resize frame
+                low_res_frame = cv2.resize(frame, (frame_width, frame_height),
+                                           interpolation=cv2.INTER_LINEAR)
+                out.write(low_res_frame)
+                frame_count += 1
+
+            # Cleanup
+            cap.release()
+            out.release()
+
+            logger.debug(f"OpenCV low-res creation completed: {frame_count} frames -> {os.path.basename(output_path)}")
+            return True
+
+        except Exception as e:
+            logger.error(f"Error in OpenCV low-res creation for {input_path}: {e}")
+            return False
+
+    async def create_lowres_video_async(self, input_path: str, output_path: str, scale: float, semaphore: asyncio.Semaphore) -> bool:
+        """
+        Create low-resolution video using the configured method (FFmpeg or OpenCV).
+
+        Args:
+            input_path: Path to input video
+            output_path: Path to output low-res video
+            scale: Scale factor for resolution reduction
+            semaphore: Asyncio semaphore for limiting concurrent processes
+
+        Returns:
+            True if successful
+        """
+        # Skip if already exists
+        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
+            logger.debug(f"Low-res video already exists: {os.path.basename(output_path)}")
+            return True
+
+        if self.use_ffmpeg:
+            # Try FFmpeg first
+            success = await self.create_lowres_ffmpeg(input_path, output_path, scale, semaphore)
+            if success:
+                return True
+
+            logger.warning(f"FFmpeg failed for {input_path}, falling back to OpenCV")
+
+        # Fallback to OpenCV (run in a worker thread to avoid blocking the event loop)
+        loop = asyncio.get_running_loop()
+        with ThreadPoolExecutor(max_workers=1) as executor:
+            success = await loop.run_in_executor(
+                executor, self.create_lowres_opencv, input_path, output_path, scale
+            )
+
+        return success
+
+    async def prepare_segment_lowres(self, segment_info: Dict[str, Any], scale: float,
+                                     separate_eye_processing: bool = False,
+                                     semaphore: Optional[asyncio.Semaphore] = None) -> bool:
+        """
+        Prepare low-resolution videos for a segment (regular or eye-specific).
+
+        Args:
+            segment_info: Segment information dictionary
+            scale: Scale factor for resolution reduction
+            separate_eye_processing: Whether to prepare eye-specific videos
+            semaphore: Asyncio semaphore for limiting concurrent processes
+
+        Returns:
+            True if all videos were prepared successfully
+        """
+        segment_idx = segment_info['index']
+        segment_dir = segment_info['directory']
+
+        # Create a default semaphore so direct calls without one still work
+        if semaphore is None:
+            semaphore = asyncio.Semaphore(self.max_concurrent)
+
+        try:
+            if separate_eye_processing:
+                # Prepare low-res videos for left and right eyes
+                success_left = success_right = True
+
+                left_eye_path = os.path.join(segment_dir, "left_eye.mp4")
+                right_eye_path = os.path.join(segment_dir, "right_eye.mp4")
+
+                if os.path.exists(left_eye_path):
+                    lowres_left_path = os.path.join(segment_dir, "low_res_left_eye_video.mp4")
+                    success_left = await self.create_lowres_video_async(left_eye_path, lowres_left_path, scale, semaphore)
+
+                if os.path.exists(right_eye_path):
+                    lowres_right_path = os.path.join(segment_dir, "low_res_right_eye_video.mp4")
+                    success_right = await self.create_lowres_video_async(right_eye_path, lowres_right_path, scale, semaphore)
+
+                success = success_left and success_right
+                if success:
+                    logger.info(f"Pre-generated low-res eye videos for segment {segment_idx}")
+                else:
+                    logger.warning(f"Failed to pre-generate some eye videos for segment {segment_idx}")
+            else:
+                # Prepare regular low-res video
+                input_path = segment_info['video_file']
+                lowres_path = os.path.join(segment_dir, "low_res_video.mp4")
+
+                success = await self.create_lowres_video_async(input_path, lowres_path, scale, semaphore)
+                if success:
+                    logger.info(f"Pre-generated low-res video for segment {segment_idx}")
+                else:
+                    logger.warning(f"Failed to pre-generate low-res video for segment {segment_idx}")
+
+            if success:
+                self.completed_segments.add(segment_idx)
+
+            return success
+
+        except Exception as e:
+            logger.error(f"Error preparing low-res videos for segment {segment_idx}: {e}")
+            return False
+
+    def start_background_preparation(self, segments_info: List[Dict[str, Any]], scale: float,
+                                     separate_eye_processing: bool = False, current_segment: int = 0):
+        """
+        Start preparing upcoming segments in background using threads.
+
+        Args:
+            segments_info: List of all segment information
+            scale: Scale factor for resolution reduction
+            separate_eye_processing: Whether to prepare eye-specific videos
+            current_segment: Index of currently processing segment
+        """
+        def background_worker():
+            """Background thread worker that prepares upcoming segments."""
+            try:
+                # Prepare segments ahead of current processing
+                start_idx = current_segment + 1
+                end_idx = min(len(segments_info), start_idx + self.segments_ahead)
+
+                segments_to_prepare = []
+                for i in range(start_idx, end_idx):
+                    if i not in self.completed_segments and i not in self.preparation_tasks:
+                        segments_to_prepare.append((i, segments_info[i]))
+
+                if segments_to_prepare:
+                    logger.info(f"Starting background preparation for {len(segments_to_prepare)} segments "
+                                f"(indices {start_idx}-{end_idx - 1})")
+
+                    # Run async work in new event loop
+                    loop = asyncio.new_event_loop()
+                    asyncio.set_event_loop(loop)
+
+                    try:
+                        # Create semaphore in this event loop
+                        semaphore = asyncio.Semaphore(self.max_concurrent)
+
+                        tasks = []
+                        for segment_idx, segment_info in segments_to_prepare:
+                            task = self.prepare_segment_lowres(segment_info, scale, separate_eye_processing, semaphore)
+                            tasks.append(task)
+
+                        # Run all preparation tasks
+                        results = loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
+
+                        # Mark completed segments
+                        for i, (segment_idx, _) in enumerate(segments_to_prepare):
+                            if i < len(results) and results[i] is True:
+                                self.completed_segments.add(segment_idx)
+                                logger.debug(f"Background preparation completed for segment {segment_idx}")
+
+                    finally:
+                        loop.close()
+                else:
+                    logger.debug(f"No segments need preparation (current: {current_segment})")
+
+            except Exception as e:
+                logger.error(f"Error in background preparation worker: {e}")
+
+        # Start background thread
+        thread = threading.Thread(target=background_worker, daemon=True)
+        thread.start()
+        self.active_threads.append(thread)
+
+    def is_segment_ready(self, segment_idx: int) -> bool:
+        """
+        Check if low-res videos for a segment are ready.
+
+        Args:
+            segment_idx: Index of segment to check
+
+        Returns:
+            True if segment is ready
+        """
+        return segment_idx in self.completed_segments
+
+    def cleanup(self):
+        """Clean up any running threads."""
+        # Note: daemon threads will be cleaned up automatically when main process exits
+        # We just clear our tracking structures
+        self.active_threads.clear()
+        self.preparation_tasks.clear()
+
+        logger.debug("AsyncLowResPreprocessor cleanup completed")
\ No newline at end of file
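A minimal usage sketch for the new preprocessor, assuming a hypothetical segments_info list shaped like the one main.py passes in (the code above reads 'index', 'directory', and 'video_file' from each entry; the paths here are made up):

    from core.async_lowres_preprocessor import AsyncLowResPreprocessor

    # Hypothetical segment layout; the real list comes from the video splitter
    segments_info = [
        {'index': i,
         'directory': f'/tmp/out/segment_{i}',
         'video_file': f'/tmp/out/segment_{i}/segment.mp4'}
        for i in range(5)
    ]

    pre = AsyncLowResPreprocessor(max_concurrent=3, segments_ahead=3, use_ffmpeg=True)
    pre.start_background_preparation(segments_info, scale=0.25, current_segment=0)
    # ...foreground work on segment 0 happens here...
    if pre.is_segment_ready(1):
        print("low-res video for segment 1 is already on disk")
    pre.cleanup()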
diff --git a/core/mask_processor.py b/core/mask_processor.py
index 0770383..e380bec 100644
--- a/core/mask_processor.py
+++ b/core/mask_processor.py
@@ -459,41 +459,12 @@ class MaskProcessor:
 
         return colored_mask
 
-    def _precompute_upscaled_masks(self, video_segments: Dict[int, Dict[int, np.ndarray]],
-                                   target_width: int, target_height: int) -> Dict[int, Dict[int, np.ndarray]]:
-        """
-        Pre-compute all upscaled masks to avoid per-frame upscaling.
-
-        Args:
-            video_segments: Dictionary of frame masks from SAM2
-            target_width: Target frame width
-            target_height: Target frame height
-
-        Returns:
-            Dictionary with pre-upscaled masks
-        """
-        logger.info(f"Pre-computing upscaled masks for {len(video_segments)} frames")
-        upscaled_segments = {}
-
-        for frame_idx, frame_masks in video_segments.items():
-            upscaled_frame_masks = {}
-            for obj_id, mask in frame_masks.items():
-                mask = mask.squeeze()
-                if mask.shape != (target_height, target_width):
-                    upscaled_mask = cv2.resize(mask.astype(np.uint8),
-                                               (target_width, target_height),
-                                               interpolation=cv2.INTER_NEAREST)
-                    upscaled_frame_masks[obj_id] = upscaled_mask
-                else:
-                    upscaled_frame_masks[obj_id] = mask.astype(np.uint8)
-            upscaled_segments[frame_idx] = upscaled_frame_masks
-
-        logger.info(f"Pre-computed upscaled masks for {len(upscaled_segments)} frames")
-        return upscaled_segments
+
     def process_and_save_output_video(self, video_path: str, output_video_path: str,
                                       video_segments: Dict[int, Dict[int, np.ndarray]],
-                                      use_nvenc: bool = False, bitrate: str = "50M") -> bool:
+                                      use_nvenc: bool = False, bitrate: str = "50M",
+                                      batch_size: int = 16) -> bool:
         """
         Process high-resolution frames, apply upscaled masks, and save the output video.
 
@@ -503,6 +474,7 @@ class MaskProcessor:
             video_segments: Dictionary of frame masks
             use_nvenc: Whether to use NVIDIA hardware encoding
             bitrate: Output video bitrate
+            batch_size: Number of frames to process in a single batch
 
         Returns:
             True if successful
@@ -520,18 +492,15 @@ class MaskProcessor:
 
         logger.info(f"Processing video: {frame_width}x{frame_height} @ {fps}fps, {total_frames} frames")
 
-        # Pre-compute all upscaled masks (Phase 1 optimization)
-        upscaled_segments = self._precompute_upscaled_masks(video_segments, frame_width, frame_height)
-
         # Setup VideoWriter
+        out_writer = None
         if self.output_mode == "alpha_channel":
-            # For alpha channel, we need a codec that supports transparency
             success = self._setup_alpha_encoder(output_video_path, frame_width, frame_height, fps, bitrate)
             if not success:
                 logger.error("Failed to setup alpha channel encoder")
                 cap.release()
                 return False
-            use_nvenc = False  # Override NVENC for alpha channel
+            use_nvenc = False
         elif use_nvenc:
             success = self._setup_nvenc_encoder(output_video_path, frame_width, frame_height, fps, bitrate)
             if not success:
@@ -539,106 +508,101 @@ class MaskProcessor:
                 use_nvenc = False
 
         if not use_nvenc and self.output_mode != "alpha_channel":
-            # Use OpenCV VideoWriter
-            fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Use mp4v for better compatibility
-            out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
-            if not out.isOpened():
+            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
+            out_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
+            if not out_writer.isOpened():
                 logger.error("Failed to create output video writer")
                 cap.release()
                 return False
 
-        # Process frames with batch reading (Phase 1 optimization)
+        # Process frames in batches
         frame_idx = 0
         processed_frames = 0
-        batch_size = 10  # Process frames in batches for better I/O performance
-        frame_buffer = []
 
-        # Pre-fill frame buffer
-        for _ in range(min(batch_size, len(upscaled_segments))):
-            ret, frame = cap.read()
-            if ret:
-                frame_buffer.append(frame)
-            else:
+        while frame_idx < total_frames:
+            batch_frames = []
+            batch_masks = []
+
+            # Read a batch of frames
+            for _ in range(batch_size):
+                ret, frame = cap.read()
+                if not ret:
+                    break
+                batch_frames.append(frame)
+
+            if not batch_frames:
                 break
-        buffer_idx = 0
-        while frame_idx < len(upscaled_segments) and buffer_idx < len(frame_buffer):
-            frame = frame_buffer[buffer_idx]
 
-            if frame_idx in upscaled_segments:
-                # Get pre-computed upscaled masks for this frame (Phase 1 optimization)
-                upscaled_masks = [upscaled_segments[frame_idx][obj_id]
-                                  for obj_id in upscaled_segments[frame_idx]]
-
-                # Apply mask based on output mode (no upscaling needed - already done)
-                if self.output_mode == "alpha_channel":
-                    result_frame = self.apply_mask_with_alpha(frame, upscaled_masks)
+            # Get masks for the current batch and perform just-in-time upscaling
+            for i in range(len(batch_frames)):
+                current_frame_idx = frame_idx + i
+                if current_frame_idx in video_segments:
+                    frame_masks = video_segments[current_frame_idx]
+                    upscaled_masks = []
+                    for obj_id, mask in frame_masks.items():
+                        mask = mask.squeeze()
+                        if mask.shape != (frame_height, frame_width):
+                            upscaled_mask = cv2.resize(mask.astype(np.uint8),
+                                                       (frame_width, frame_height),
+                                                       interpolation=cv2.INTER_NEAREST)
+                            upscaled_masks.append(upscaled_mask)
+                        else:
+                            upscaled_masks.append(mask.astype(np.uint8))
+                    batch_masks.append(upscaled_masks)
                 else:
-                    result_frame = self.apply_green_mask(frame, upscaled_masks)
-            else:
-                # No mask for this frame
-                if self.output_mode == "alpha_channel":
-                    # Create fully transparent frame for alpha channel mode
-                    bgra_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2BGRA)
-                    bgra_frame[:, :, 3] = 0  # Fully transparent
-                    result_frame = bgra_frame
-                    logger.warning(f"No mask for frame {frame_idx}, using transparent frame")
-                else:
-                    # Use original frame for green screen mode
-                    logger.warning(f"No mask for frame {frame_idx}, using original")
-                    result_frame = frame
+                    batch_masks.append([])  # No masks for this frame
 
-            # Write frame
-            if self.output_mode == "alpha_channel" and hasattr(self, 'alpha_process'):
-                self.alpha_process.stdin.write(result_frame.tobytes())
-            elif use_nvenc and hasattr(self, 'nvenc_process'):
-                self.nvenc_process.stdin.write(result_frame.tobytes())
-            else:
-                out.write(result_frame)
-
-            processed_frames += 1
-            frame_idx += 1
-            buffer_idx += 1
-
-            # Refill buffer when needed
-            if buffer_idx >= len(frame_buffer) and frame_idx < len(upscaled_segments):
-                frame_buffer.clear()
-                buffer_idx = 0
-                # Read next batch
-                for _ in range(min(batch_size, len(upscaled_segments) - frame_idx)):
-                    ret, frame = cap.read()
-                    if ret:
-                        frame_buffer.append(frame)
+            # Process the batch
+            result_batch = []
+            for i, frame in enumerate(batch_frames):
+                masks = batch_masks[i]
+                if masks:
+                    if self.output_mode == "alpha_channel":
+                        result_frame = self.apply_mask_with_alpha(frame, masks)
                     else:
-                        break
+                        result_frame = self.apply_green_mask(frame, masks)
+                else:
+                    # No mask for this frame
+                    if self.output_mode == "alpha_channel":
+                        bgra_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2BGRA)
+                        bgra_frame[:, :, 3] = 0
+                        result_frame = bgra_frame
+                    else:
+                        result_frame = frame
+                result_batch.append(result_frame)
 
-            # Progress logging
-            if processed_frames % 100 == 0:
+            # Write the processed batch
+            for result_frame in result_batch:
+                if self.output_mode == "alpha_channel" and hasattr(self, 'alpha_process'):
+                    self.alpha_process.stdin.write(result_frame.tobytes())
+                elif use_nvenc and hasattr(self, 'nvenc_process'):
+                    self.nvenc_process.stdin.write(result_frame.tobytes())
+                else:
+                    out_writer.write(result_frame)
+
+            processed_frames += len(batch_frames)
+            frame_idx += len(batch_frames)
+
+            if processed_frames % 100 < batch_size:
                 logger.info(f"Processed {processed_frames}/{total_frames} frames")
 
             # Cleanup
             cap.release()
-
             if self.output_mode == "alpha_channel" and hasattr(self, 'alpha_process'):
                 self.alpha_process.stdin.close()
                 self.alpha_process.wait()
-                if self.alpha_process.returncode != 0:
-                    logger.error("Alpha channel encoding failed")
-                    return False
             elif use_nvenc and hasattr(self, 'nvenc_process'):
                 self.nvenc_process.stdin.close()
                 self.nvenc_process.wait()
-                if self.nvenc_process.returncode != 0:
-                    logger.error("NVENC encoding failed")
-                    return False
             else:
-                out.release()
+                if out_writer:
+                    out_writer.release()
 
             logger.info(f"Successfully processed {processed_frames} frames to {output_video_path}")
             return True
 
         except Exception as e:
-            logger.error(f"Error processing video: {e}")
+            logger.error(f"Error processing video: {e}", exc_info=True)
             return False
 
     def _setup_nvenc_encoder(self, output_path: str, width: int, height: int,
@@ -751,6 +715,14 @@ class MaskProcessor:
 
         if success:
             logger.info(f"Successfully created {self.output_mode} video: {output_video}")
+            # Mark segment as completed only after video is successfully written
+            try:
+                output_done_file = os.path.join(segment_info['directory'], "output_frames_done")
+                with open(output_done_file, 'w') as f:
+                    f.write(f"Segment {segment_info['index']} processed and saved successfully.")
+                logger.debug(f"Created completion marker for segment {segment_info['index']}")
+            except Exception as e:
+                logger.error(f"Failed to create completion marker for segment {segment_info['index']}: {e}")
         else:
             logger.error(f"Failed to process segment {segment_info['index']}")
 
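The rewritten loop above replaces the precompute-everything pass with just-in-time upscaling inside each batch, so peak memory scales with batch_size rather than with the whole segment. A self-contained sketch of the per-mask step it performs (the mask and frame sizes here are hypothetical):

    import cv2
    import numpy as np

    mask = (np.random.rand(270, 480) > 0.5)  # low-res SAM2-style mask, hypothetical size
    frame_width, frame_height = 1920, 1080   # full-resolution output frame

    # INTER_NEAREST keeps the mask binary; bilinear interpolation would
    # introduce intermediate values along the mask edges
    upscaled = cv2.resize(mask.astype(np.uint8),
                          (frame_width, frame_height),
                          interpolation=cv2.INTER_NEAREST)
    assert upscaled.shape == (frame_height, frame_width)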
diff --git a/core/sam2_processor.py b/core/sam2_processor.py
index 923e126..9ac6c8b 100644
--- a/core/sam2_processor.py
+++ b/core/sam2_processor.py
@@ -8,6 +8,7 @@ import cv2
 import numpy as np
 import torch
 import logging
+import subprocess
 import gc
 from typing import Dict, List, Any, Optional, Tuple
 from sam2.build_sam import build_sam2_video_predictor
@@ -19,7 +20,8 @@ class SAM2Processor:
     """Handles SAM2-based video segmentation for human tracking."""
 
     def __init__(self, checkpoint_path: str, config_path: str, vos_optimized: bool = False,
-                 separate_eye_processing: bool = False, eye_overlap_pixels: int = 0):
+                 separate_eye_processing: bool = False, eye_overlap_pixels: int = 0,
+                 async_preprocessor=None):
         """
         Initialize SAM2 processor.
 
@@ -29,11 +31,13 @@ class SAM2Processor:
             vos_optimized: Enable VOS optimization for speedup (requires PyTorch 2.5.1+)
             separate_eye_processing: Enable VR180 separate eye processing mode
             eye_overlap_pixels: Pixel overlap between eyes for blending
+            async_preprocessor: Optional async preprocessor for background low-res video generation
         """
         self.checkpoint_path = checkpoint_path
         self.config_path = config_path
         self.vos_optimized = vos_optimized
         self.separate_eye_processing = separate_eye_processing
+        self.async_preprocessor = async_preprocessor
         self.predictor = None
 
         # Initialize eye processor if separate eye processing is enabled
@@ -120,13 +124,64 @@ class SAM2Processor:
 
     def create_low_res_video(self, input_video_path: str, output_video_path: str, scale: float):
         """
-        Create a low-resolution version of the input video for inference.
+        Create a low-resolution version of the input video for inference using FFmpeg
+        with hardware acceleration for improved performance.
 
         Args:
             input_video_path: Path to input video
             output_video_path: Path to output low-res video
             scale: Scale factor for resolution reduction
         """
+        try:
+            # Get video properties using OpenCV
+            cap = cv2.VideoCapture(input_video_path)
+            if not cap.isOpened():
+                raise ValueError(f"Could not open video: {input_video_path}")
+
+            original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+            original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
+            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+            cap.release()
+
+            target_width = int(original_width * scale)
+            target_height = int(original_height * scale)
+
+            # Ensure dimensions are even, as required by many codecs
+            target_width = target_width if target_width % 2 == 0 else target_width + 1
+            target_height = target_height if target_height % 2 == 0 else target_height + 1
+
+            # Construct FFmpeg command with hardware acceleration
+            command = [
+                'ffmpeg',
+                '-y',
+                '-hwaccel', 'auto',  # Auto-detect hardware acceleration
+                '-i', input_video_path,
+                '-vf', f'scale={target_width}:{target_height}',
+                '-c:v', 'h264_nvenc',  # Use NVIDIA's hardware encoder
+                '-preset', 'fast',
+                '-cq', '23',  # NVENC constant-quality target (h264_nvenc does not honor libx264's -crf)
+                output_video_path
+            ]
+
+            logger.info(f"Executing FFmpeg command: {' '.join(command)}")
+
+            # Execute FFmpeg command; check=True raises CalledProcessError on a non-zero exit
+            subprocess.run(command, check=True, capture_output=True, text=True)
+
+            logger.info(f"Created low-res video with {frame_count} frames: {output_video_path}")
+
+        except (subprocess.CalledProcessError, FileNotFoundError) as e:
+            logger.warning(f"Hardware-accelerated FFmpeg failed: {e}. Falling back to OpenCV.")
+            # Fallback to original OpenCV implementation if FFmpeg fails
+            self._create_low_res_video_opencv(input_video_path, output_video_path, scale)
+
+    def _create_low_res_video_opencv(self, input_video_path: str, output_video_path: str, scale: float):
+        """Original OpenCV-based implementation for creating low-resolution video."""
         cap = cv2.VideoCapture(input_video_path)
         if not cap.isOpened():
             raise ValueError(f"Could not open video: {input_video_path}")
@@ -151,7 +206,42 @@ class SAM2Processor:
         cap.release()
         out.release()
 
-        logger.info(f"Created low-res video with {frame_count} frames: {output_video_path}")
+        logger.info(f"Created low-res video with {frame_count} frames using OpenCV: {output_video_path}")
+
+    def ensure_low_res_video(self, input_video_path: str, output_video_path: str,
+                             scale: float, segment_idx: Optional[int] = None) -> bool:
+        """
+        Ensure low-resolution video exists, using async preprocessor if available.
+
+        Args:
+            input_video_path: Path to input video
+            output_video_path: Path to output low-res video
+            scale: Scale factor for resolution reduction
+            segment_idx: Optional segment index for async coordination
+
+        Returns:
+            True if low-res video is ready
+        """
+        # Check if already exists
+        if os.path.exists(output_video_path) and os.path.getsize(output_video_path) > 0:
+            return True
+
+        # Use async preprocessor if available and segment index provided
+        if self.async_preprocessor and segment_idx is not None:
+            if self.async_preprocessor.is_segment_ready(segment_idx):
+                if os.path.exists(output_video_path) and os.path.getsize(output_video_path) > 0:
+                    logger.debug(f"Async preprocessor provided segment {segment_idx}")
+                    return True
+            else:
+                logger.debug(f"Async preprocessor hasn't completed segment {segment_idx} yet")
+
+        # Fallback to synchronous creation
+        try:
+            self.create_low_res_video(input_video_path, output_video_path, scale)
+            return os.path.exists(output_video_path) and os.path.getsize(output_video_path) > 0
+        except Exception as e:
+            logger.error(f"Failed to create low-res video {output_video_path}: {e}")
+            return False
 
     def add_yolo_prompts_to_predictor(self, inference_state, prompts: List[Dict[str, Any]]) -> bool:
         """
@@ -341,14 +431,11 @@ class SAM2Processor:
 
         logger.info(f"Processing segment {segment_idx} with SAM2")
 
-        # Create low-resolution video for inference
+        # Create low-resolution video for inference (async-aware)
         low_res_video_path = os.path.join(segment_dir, "low_res_video.mp4")
-        if not os.path.exists(low_res_video_path):
-            try:
-                self.create_low_res_video(video_path, low_res_video_path, inference_scale)
-            except Exception as e:
-                logger.error(f"Failed to create low-res video for segment {segment_idx}: {e}")
-                return None
+        if not self.ensure_low_res_video(video_path, low_res_video_path, inference_scale, segment_idx):
+            logger.error(f"Failed to create low-res video for segment {segment_idx}")
+            return None
 
         try:
             # Initialize inference state
@@ -387,13 +474,7 @@ class SAM2Processor:
             except Exception as e:
                 logger.warning(f"Could not remove low-res video: {e}")
 
-            # Mark segment as completed (for resume capability)
-            try:
-                with open(output_done_file, 'w') as f:
-                    f.write(f"Segment {segment_idx} completed successfully\n")
-                logger.debug(f"Marked segment {segment_idx} as completed")
-            except Exception as e:
-                logger.warning(f"Could not create completion marker: {e}")
+
             return video_segments
 
@@ -698,14 +779,11 @@ class SAM2Processor:
             logger.error(f"Eye video not found: {eye_video_path}")
             return None
 
-        # Create low-resolution eye video for inference
+        # Create low-resolution eye video for inference (async-aware)
         low_res_eye_video_path = os.path.join(segment_dir, f"low_res_{eye_side}_eye_video.mp4")
-        if not os.path.exists(low_res_eye_video_path):
-            try:
-                self.create_low_res_video(eye_video_path, low_res_eye_video_path, inference_scale)
-            except Exception as e:
-                logger.error(f"Failed to create low-res {eye_side} eye video for segment {segment_idx}: {e}")
-                return None
+        if not self.ensure_low_res_video(eye_video_path, low_res_eye_video_path, inference_scale, segment_idx):
+            logger.error(f"Failed to create low-res {eye_side} eye video for segment {segment_idx}")
+            return None
 
         try:
             # Initialize inference state with eye-specific video
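create_low_res_video above hard-codes h264_nvenc, so machines without an NVIDIA encoder will always pay for one failed subprocess call before taking the OpenCV fallback. If one wanted to skip that round trip, a one-time capability probe is possible; a minimal sketch (the helper name is hypothetical, the ffmpeg flags are standard):

    import subprocess

    def nvenc_available() -> bool:
        # Lists compiled-in encoders and checks for NVENC; cheap one-time probe
        try:
            result = subprocess.run(['ffmpeg', '-hide_banner', '-encoders'],
                                    capture_output=True, text=True, check=True)
            return 'h264_nvenc' in result.stdout
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False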
f"{video_name}_segments") ensure_directory(segments_dir) + # Check for completion marker to avoid re-splitting + completion_marker = os.path.join(segments_dir, ".splitting_done") + if os.path.exists(completion_marker): + logger.info(f"Video already split, skipping splitting process. Found completion marker: {completion_marker}") + segment_dirs = [d for d in os.listdir(segments_dir) if os.path.isdir(os.path.join(segments_dir, d)) and d.startswith("segment_")] + segment_dirs.sort(key=lambda x: int(x.split("_")[1])) + return segments_dir, segment_dirs + logger.info(f"Splitting video {input_video} into {self.segment_duration}s segments") # Split video using ffmpeg @@ -83,6 +91,11 @@ class VideoSplitter: # Create file list for later concatenation self._create_file_list(segments_dir, segment_dirs) + # Create completion marker + completion_marker = os.path.join(segments_dir, ".splitting_done") + with open(completion_marker, 'w') as f: + f.write("Video splitting completed successfully.") + logger.info(f"Successfully split video into {len(segment_dirs)} segments") return segments_dir, segment_dirs diff --git a/main.py b/main.py index b870ac2..af99ec2 100644 --- a/main.py +++ b/main.py @@ -475,8 +475,8 @@ def process_segment_with_separate_eyes(segment_info, detector, sam2_processor, m return success, left_masks, right_masks -def main(): - """Main processing pipeline.""" +async def main_async(): + """Main processing pipeline with async optimizations.""" args = parse_arguments() try: @@ -568,6 +568,24 @@ def main(): eye_overlap_pixels = config.get('processing.eye_overlap_pixels', 0) enable_greenscreen_fallback = config.get('processing.enable_greenscreen_fallback', True) + # Initialize async preprocessor if enabled + async_preprocessor = None + if config.get('advanced.enable_background_lowres_generation', False): + from core.async_lowres_preprocessor import AsyncLowResPreprocessor + + max_concurrent = config.get('advanced.max_concurrent_lowres', 3) + segments_ahead = config.get('advanced.lowres_segments_ahead', 3) + use_ffmpeg = config.get('advanced.use_ffmpeg_lowres', True) + + async_preprocessor = AsyncLowResPreprocessor( + max_concurrent=max_concurrent, + segments_ahead=segments_ahead, + use_ffmpeg=use_ffmpeg + ) + logger.info(f"Async low-res preprocessing: ENABLED (max_concurrent={max_concurrent}, segments_ahead={segments_ahead})") + else: + logger.info("Async low-res preprocessing: DISABLED") + if separate_eye_processing: logger.info("VR180 Separate Eye Processing: ENABLED") logger.info(f"Eye overlap pixels: {eye_overlap_pixels}") @@ -578,7 +596,8 @@ def main(): config_path=config.get_sam2_config(), vos_optimized=config.get('models.sam2_vos_optimized', False), separate_eye_processing=separate_eye_processing, - eye_overlap_pixels=eye_overlap_pixels + eye_overlap_pixels=eye_overlap_pixels, + async_preprocessor=async_preprocessor ) # Initialize mask processor with quality enhancements @@ -593,6 +612,16 @@ def main(): logger.info("Step 4: Processing segments sequentially") total_humans_detected = 0 + # Start background low-res video preprocessing if enabled + if async_preprocessor: + logger.info("Starting background low-res video preprocessing") + async_preprocessor.start_background_preparation( + segments_info, + config.get_inference_scale(), + separate_eye_processing, + current_segment=0 + ) + # Initialize previous masks for separate eye processing previous_left_masks = None previous_right_masks = None @@ -602,6 +631,15 @@ def main(): logger.info(f"Processing segment 
diff --git a/main.py b/main.py
index b870ac2..af99ec2 100644
--- a/main.py
+++ b/main.py
@@ -475,8 +475,8 @@ def process_segment_with_separate_eyes(segment_info, detector, sam2_processor, m
 
     return success, left_masks, right_masks
 
-def main():
-    """Main processing pipeline."""
+async def main_async():
+    """Main processing pipeline with async optimizations."""
     args = parse_arguments()
 
     try:
@@ -568,6 +568,24 @@ def main():
         eye_overlap_pixels = config.get('processing.eye_overlap_pixels', 0)
         enable_greenscreen_fallback = config.get('processing.enable_greenscreen_fallback', True)
 
+        # Initialize async preprocessor if enabled
+        async_preprocessor = None
+        if config.get('advanced.enable_background_lowres_generation', False):
+            from core.async_lowres_preprocessor import AsyncLowResPreprocessor
+
+            max_concurrent = config.get('advanced.max_concurrent_lowres', 3)
+            segments_ahead = config.get('advanced.lowres_segments_ahead', 3)
+            use_ffmpeg = config.get('advanced.use_ffmpeg_lowres', True)
+
+            async_preprocessor = AsyncLowResPreprocessor(
+                max_concurrent=max_concurrent,
+                segments_ahead=segments_ahead,
+                use_ffmpeg=use_ffmpeg
+            )
+            logger.info(f"Async low-res preprocessing: ENABLED (max_concurrent={max_concurrent}, segments_ahead={segments_ahead})")
+        else:
+            logger.info("Async low-res preprocessing: DISABLED")
+
         if separate_eye_processing:
             logger.info("VR180 Separate Eye Processing: ENABLED")
             logger.info(f"Eye overlap pixels: {eye_overlap_pixels}")
@@ -578,7 +596,8 @@ def main():
             config_path=config.get_sam2_config(),
             vos_optimized=config.get('models.sam2_vos_optimized', False),
             separate_eye_processing=separate_eye_processing,
-            eye_overlap_pixels=eye_overlap_pixels
+            eye_overlap_pixels=eye_overlap_pixels,
+            async_preprocessor=async_preprocessor
         )
 
         # Initialize mask processor with quality enhancements
@@ -593,6 +612,16 @@ def main():
         logger.info("Step 4: Processing segments sequentially")
         total_humans_detected = 0
 
+        # Start background low-res video preprocessing if enabled
+        if async_preprocessor:
+            logger.info("Starting background low-res video preprocessing")
+            async_preprocessor.start_background_preparation(
+                segments_info,
+                config.get_inference_scale(),
+                separate_eye_processing,
+                current_segment=0
+            )
+
         # Initialize previous masks for separate eye processing
         previous_left_masks = None
         previous_right_masks = None
@@ -602,6 +631,15 @@ def main():
 
             logger.info(f"Processing segment {segment_idx}/{len(segments_info)-1}")
 
+            # Start background preparation for upcoming segments
+            if async_preprocessor and i < len(segments_info) - 1:
+                async_preprocessor.start_background_preparation(
+                    segments_info,
+                    config.get_inference_scale(),
+                    separate_eye_processing,
+                    current_segment=i
+                )
+
             # Reset temporal history for new segment
             mask_processor.reset_temporal_history()
 
@@ -1012,6 +1050,16 @@ def main():
     except Exception as e:
         logger.error(f"Pipeline failed: {e}", exc_info=True)
         return 1
+    finally:
+        # Cleanup async preprocessor if it was used; the locals() guard avoids a
+        # NameError when a failure early in the try block leaves the name undefined
+        if 'async_preprocessor' in locals() and async_preprocessor:
+            async_preprocessor.cleanup()
+            logger.debug("Async preprocessor cleanup completed")
+
+def main():
+    """Main entry point - wrapper for async main."""
+    import asyncio
+    return asyncio.run(main_async())
 
 if __name__ == "__main__":
     exit_code = main()
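For reference, the thread-plus-private-event-loop idiom that start_background_preparation relies on, reduced to a runnable sketch (fake_job stands in for prepare_segment_lowres):

    import asyncio
    import threading

    async def fake_job(i: int) -> bool:
        await asyncio.sleep(0.1)  # stand-in for an FFmpeg subprocess
        return True

    def worker(indices):
        # Each thread owns its own loop; semaphores and tasks must be created inside it
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            results = loop.run_until_complete(
                asyncio.gather(*(fake_job(i) for i in indices), return_exceptions=True))
            print(results)
        finally:
            loop.close()

    t = threading.Thread(target=worker, args=([1, 2, 3],), daemon=True)
    t.start()
    t.join()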