""" File system utilities for the YOLO + SAM2 video processing pipeline. """ import os import shutil import glob from typing import List, Optional import logging logger = logging.getLogger(__name__) def ensure_directory(path: str) -> str: """ Ensure directory exists, create if it doesn't. Args: path: Directory path to create Returns: The created directory path """ os.makedirs(path, exist_ok=True) logger.debug(f"Ensured directory exists: {path}") return path def cleanup_directory(path: str, pattern: str = "*") -> int: """ Clean up files matching pattern in directory. Args: path: Directory path to clean pattern: File pattern to match (default: all files) Returns: Number of files removed """ if not os.path.exists(path): return 0 files_to_remove = glob.glob(os.path.join(path, pattern)) removed_count = 0 for file_path in files_to_remove: try: if os.path.isfile(file_path): os.remove(file_path) removed_count += 1 elif os.path.isdir(file_path): shutil.rmtree(file_path) removed_count += 1 except OSError as e: logger.warning(f"Failed to remove {file_path}: {e}") if removed_count > 0: logger.info(f"Cleaned up {removed_count} files/directories from {path}") return removed_count def get_segments_directories(base_dir: str) -> List[str]: """ Get list of segment directories sorted by segment number. Args: base_dir: Base directory containing segments Returns: Sorted list of segment directory names """ if not os.path.exists(base_dir): return [] segments = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d)) and d.startswith("segment_")] # Sort by segment number segments.sort(key=lambda x: int(x.split("_")[1])) logger.debug(f"Found {len(segments)} segment directories in {base_dir}") return segments def get_video_file_name(segment_index: int) -> str: """ Get standardized video filename for a segment. Args: segment_index: Index of the segment Returns: Formatted filename """ return f"segment_{str(segment_index).zfill(3)}.mp4" def file_exists(file_path: str) -> bool: """ Check if file exists and is readable. Args: file_path: Path to file Returns: True if file exists and is readable """ return os.path.isfile(file_path) and os.access(file_path, os.R_OK) def create_file_list(segments_dir: str, output_path: str) -> str: """ Create ffmpeg-compatible file list for concatenation. Args: segments_dir: Directory containing segment subdirectories output_path: Path to write the file list Returns: Path to the created file list """ segments = get_segments_directories(segments_dir) with open(output_path, 'w') as f: for i, segment in enumerate(segments): segment_dir = os.path.join(segments_dir, segment) output_video = os.path.join(segment_dir, f"output_{i}.mp4") if file_exists(output_video): # Use relative path for ffmpeg relative_path = os.path.relpath(output_video, os.path.dirname(output_path)) f.write(f"file '{relative_path}'\\n") logger.info(f"Created file list at {output_path}") return output_path def safe_remove_file(file_path: str) -> bool: """ Safely remove a file with error handling. Args: file_path: Path to file to remove Returns: True if file was removed successfully """ try: if os.path.exists(file_path): os.remove(file_path) logger.debug(f"Removed file: {file_path}") return True return False except OSError as e: logger.warning(f"Failed to remove {file_path}: {e}") return False def get_file_size_mb(file_path: str) -> float: """ Get file size in megabytes. Args: file_path: Path to file Returns: File size in MB, or 0 if file doesn't exist """ try: if os.path.exists(file_path): size_bytes = os.path.getsize(file_path) return size_bytes / (1024 * 1024) return 0.0 except OSError: return 0.0