"""
Status utilities for tracking processing progress and resume capability.
"""

import logging
import os
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# On-disk layout of a segment directory, as checked by the functions below.
_SEGMENT_PREFIX = "segment_"
_YOLO_OUTPUT = "yolo_detections"
_DONE_MARKER = "output_frames_done"
_MASK_FILE = "mask.png"

# Files removed by cleanup_incomplete_segment before a segment is re-run.
_TEMP_FILES = ("low_res_video.mp4", _DONE_MARKER)

# Pipeline stages in execution order. Each entry is
# (reason reported by get_incomplete_segments,
#  next_step reported by get_processing_status).
_PIPELINE = (
    ("video_splitting", "split_video"),
    ("yolo_detection", "yolo_detection"),
    ("sam2_processing", "sam2_processing"),
)


def _list_segments(segments_dir: str) -> List[Tuple[int, str]]:
    """Return ``(segment_index, segment_path)`` pairs sorted by index.

    Only sub-directories named ``segment_<int>...`` are returned. Entries whose
    index is not an integer are skipped instead of raising, and a missing or
    non-directory ``segments_dir`` yields an empty list.
    """
    if not os.path.isdir(segments_dir):
        return []

    found = []
    for name in os.listdir(segments_dir):
        if not name.startswith(_SEGMENT_PREFIX):
            continue
        path = os.path.join(segments_dir, name)
        if not os.path.isdir(path):
            continue
        try:
            index = int(name.split("_")[1])
        except ValueError:
            logger.debug("Ignoring non-numeric segment directory: %s", path)
            continue
        found.append((index, path))

    found.sort()
    return found


def _probe_segment(segment_path: str, segment_idx: int) -> Tuple[bool, bool, bool]:
    """Return ``(has_video, has_yolo, is_done)`` for one segment directory."""
    video_file = os.path.join(
        segment_path, f"segment_{str(segment_idx).zfill(3)}.mp4"
    )
    return (
        os.path.exists(video_file),
        os.path.exists(os.path.join(segment_path, _YOLO_OUTPUT)),
        os.path.exists(os.path.join(segment_path, _DONE_MARKER)),
    )


def _pending_stage(has_video: bool, has_yolo: bool, is_done: bool) -> Optional[int]:
    """Return the index into ``_PIPELINE`` of the stage a segment still needs.

    Returns ``None`` when the done marker is present. The marker is checked
    first, so a finished segment is never reported as pending even if its
    intermediate files are no longer on disk.
    """
    if is_done:
        return None
    if not has_video:
        return 0
    if not has_yolo:
        return 1
    return 2


def get_processing_status(segments_dir: str) -> Dict[str, Any]:
    """
    Get detailed processing status for all segments.

    Args:
        segments_dir: Directory containing video segments

    Returns:
        Dictionary with the keys ``total_segments``, ``segments_split``,
        ``yolo_completed``, ``sam2_completed``, ``can_resume``, ``next_step``
        and ``completion_percentage``. All keys are always present, including
        when ``segments_dir`` does not exist. ``next_step`` is the earliest
        pipeline stage still pending for any segment (``"split_video"``,
        ``"yolo_detection"``, ``"sam2_processing"``) or ``"complete"``.
    """
    segments = _list_segments(segments_dir)
    total = len(segments)

    segments_split = 0
    yolo_completed = 0
    sam2_completed = 0
    earliest_pending: Optional[int] = None

    for segment_idx, segment_path in segments:
        has_video, has_yolo, is_done = _probe_segment(segment_path, segment_idx)
        if has_video:
            segments_split += 1
        if has_yolo:
            yolo_completed += 1
        if is_done:
            sam2_completed += 1

        stage = _pending_stage(has_video, has_yolo, is_done)
        if stage is not None and (earliest_pending is None or stage < earliest_pending):
            earliest_pending = stage

    if total == 0:
        # Nothing has been split yet (missing or empty directory).
        next_step = _PIPELINE[0][1]
    elif earliest_pending is None:
        next_step = "complete"
    else:
        next_step = _PIPELINE[earliest_pending][1]

    return {
        'total_segments': total,
        'segments_split': segments_split,
        'yolo_completed': yolo_completed,
        'sam2_completed': sam2_completed,
        'can_resume': segments_split > 0,
        'next_step': next_step,
        'completion_percentage': (sam2_completed / total * 100) if total else 0.0,
    }


def print_processing_status(segments_dir: str):
    """
    Print a human-readable processing status report.

    Args:
        segments_dir: Directory containing video segments
    """
    status = get_processing_status(segments_dir)

    print("\n" + "="*50)
    print("PROCESSING STATUS REPORT")
    print("="*50)
    print(f"Total Segments: {status['total_segments']}")
    print(f"Video Splitting: {status['segments_split']}/{status['total_segments']} completed")
    print(f"YOLO Detection: {status['yolo_completed']}/{status['total_segments']} completed")
    print(f"SAM2 Processing: {status['sam2_completed']}/{status['total_segments']} completed")
    print(f"Overall Progress: {status['completion_percentage']:.1f}%")
    print(f"Next Step: {status['next_step']}")
    print(f"Can Resume: {'Yes' if status['can_resume'] else 'No'}")
    print("="*50 + "\n")


def get_incomplete_segments(segments_dir: str) -> List[Tuple[int, str]]:
    """
    Get list of segments that still need processing.

    Args:
        segments_dir: Directory containing video segments

    Returns:
        List of tuples (segment_index, reason), ordered by segment index.
        ``reason`` is the first missing stage for that segment:
        ``"video_splitting"``, ``"yolo_detection"`` or ``"sam2_processing"``.
        Segments with the done marker are omitted.
    """
    incomplete = []
    for segment_idx, segment_path in _list_segments(segments_dir):
        stage = _pending_stage(*_probe_segment(segment_path, segment_idx))
        if stage is not None:
            incomplete.append((segment_idx, _PIPELINE[stage][0]))
    return incomplete


def cleanup_incomplete_segment(segment_dir: str) -> bool:
    """
    Clean up a partially processed segment for restart.

    Removes the temporary low-resolution video and the done marker if present.

    Args:
        segment_dir: Path to segment directory

    Returns:
        True if cleanup was successful (including when there was nothing to
        remove), False if a file could not be removed.
    """
    removed_count = 0
    try:
        for temp_file in _TEMP_FILES:
            temp_path = os.path.join(segment_dir, temp_file)
            try:
                os.remove(temp_path)
            except FileNotFoundError:
                # Already absent: nothing to clean up for this file.
                continue
            removed_count += 1
    except OSError as e:
        logger.error("Failed to cleanup segment %s: %s", segment_dir, e)
        return False

    if removed_count > 0:
        logger.info(
            "Cleaned up %d temporary files from %s", removed_count, segment_dir
        )
    return True


def find_last_valid_mask(segments_dir: str, before_segment: int) -> str:
    """
    Find the most recent segment that has a mask file.

    Searches ``segment_<i>`` directories for ``i`` from ``before_segment - 1``
    down to 0 and stops at the first one containing ``mask.png``.

    Args:
        segments_dir: Directory containing segments
        before_segment: Look for masks before this segment index

    Returns:
        Path to the segment directory that contains the mask (not the mask
        file itself), or empty string if none found.
    """
    # NOTE(review): directories are looked up by the unpadded name
    # "segment_<i>"; zero-padded directory names would not be found here.
    for i in range(before_segment - 1, -1, -1):
        segment_path = os.path.join(segments_dir, f"segment_{i}")
        if os.path.exists(os.path.join(segment_path, _MASK_FILE)):
            return segment_path
    return ""