# Files
# samyolo_on_segments/utils/file_utils.py
# 2025-07-27 11:43:07 -07:00
#
# 168 lines
# 4.5 KiB
# Python

"""
File system utilities for the YOLO + SAM2 video processing pipeline.
"""
import os
import shutil
import glob
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
def ensure_directory(path: str) -> str:
    """
    Make sure a directory is present, creating it (and parents) if needed.

    Args:
        path: Directory path that must exist after the call.

    Returns:
        The same ``path`` that was passed in.
    """
    # exist_ok=True turns the call into a no-op when the directory is
    # already there, so repeated calls are harmless.
    os.makedirs(path, exist_ok=True)
    logger.debug(f"Ensured directory exists: {path}")
    return path
def cleanup_directory(path: str, pattern: str = "*") -> int:
    """
    Remove the entries of a directory that match a glob pattern.

    Regular files and symbolic links are unlinked; real directories are
    removed recursively. An entry that cannot be removed is logged and
    skipped, so one failure does not abort the rest of the clean-up.

    Args:
        path: Directory whose contents should be cleaned. It is treated as
            a literal path even if it contains glob metacharacters.
        pattern: Glob pattern, relative to ``path``, selecting the entries
            to remove. The default ``"*"`` follows ``glob`` semantics and
            therefore does not match names that start with a dot.

    Returns:
        Number of entries (files, links or directories) actually removed;
        0 when ``path`` does not exist.
    """
    if not os.path.exists(path):
        return 0

    # Only ``pattern`` is meant to be a glob. Escaping the directory part
    # keeps characters such as "[" or "*" in its name from being
    # interpreted as wildcards (which would make the search match nothing).
    matches = glob.glob(os.path.join(glob.escape(path), pattern))

    removed_count = 0
    for entry in matches:
        try:
            if os.path.islink(entry) or os.path.isfile(entry):
                # Symlinks are unlinked, never followed: this covers
                # dangling links and links to directories, which
                # shutil.rmtree refuses to operate on.
                os.remove(entry)
            elif os.path.isdir(entry):
                shutil.rmtree(entry)
            else:
                # Neither file, link nor directory: leave it alone.
                continue
        except OSError as e:
            logger.warning(f"Failed to remove {entry}: {e}")
        else:
            removed_count += 1

    if removed_count > 0:
        logger.info(f"Cleaned up {removed_count} files/directories from {path}")
    return removed_count
def get_segments_directories(base_dir: str) -> List[str]:
    """
    List the ``segment_<N>`` sub-directories of a directory in numeric order.

    A sub-directory qualifies when its name starts with ``segment_`` and the
    text between the first and second underscore parses as an integer.
    Qualifying names are ordered by that integer, with ties broken by name
    so the result does not depend on directory listing order. Names whose
    index is not numeric (e.g. ``segment_tmp``) are skipped with a warning
    instead of aborting the listing.

    Args:
        base_dir: Directory that contains the segment sub-directories.

    Returns:
        Directory names (not full paths) sorted by segment number; an empty
        list when ``base_dir`` is missing or is not a directory.
    """
    # isdir rather than exists: a regular file at base_dir would otherwise
    # make os.listdir raise NotADirectoryError.
    if not os.path.isdir(base_dir):
        return []

    numbered = []
    for name in os.listdir(base_dir):
        if not name.startswith("segment_"):
            continue
        if not os.path.isdir(os.path.join(base_dir, name)):
            continue
        try:
            # The prefix check guarantees at least one underscore, so
            # index 1 always exists; only the int conversion can fail.
            number = int(name.split("_")[1])
        except ValueError:
            logger.warning(f"Ignoring segment directory with non-numeric index: {name}")
            continue
        numbered.append((number, name))

    # Tuples sort by number first, then by name for equal numbers.
    numbered.sort()
    segments = [name for _, name in numbered]

    logger.debug(f"Found {len(segments)} segment directories in {base_dir}")
    return segments
def get_video_file_name(segment_index: int) -> str:
    """
    Build the standard ``.mp4`` file name for a segment.

    The index is converted to text and left-padded with zeros to a width of
    at least three characters, e.g. ``7`` -> ``segment_007.mp4``. Wider
    indices are kept as they are (``1000`` -> ``segment_1000.mp4``).

    Args:
        segment_index: Index of the segment.

    Returns:
        File name of the form ``segment_<padded index>.mp4``.
    """
    padded_index = str(segment_index).zfill(3)
    return "segment_" + padded_index + ".mp4"
def file_exists(file_path: str) -> bool:
    """
    Tell whether a path refers to a regular file the process may read.

    Args:
        file_path: Path to check.

    Returns:
        True only when ``file_path`` is an existing regular file and
        ``os.access`` grants read permission; False otherwise (including
        for directories and missing paths).
    """
    if not os.path.isfile(file_path):
        return False
    return os.access(file_path, os.R_OK)
def create_file_list(segments_dir: str, output_path: str) -> str:
    """
    Write an ffmpeg concat file list for the processed segments.

    For every segment directory returned by ``get_segments_directories``
    (in that order) the file ``output_<i>.mp4`` inside it is listed when
    ``file_exists`` reports it as readable, where ``<i>`` is the segment's
    position in the sorted list. Segments without such a file are skipped.
    Each entry is written on its own line as ``file '<path>'``, with the
    path relative to the directory that contains ``output_path``.

    Args:
        segments_dir: Directory containing segment subdirectories.
        output_path: Path of the list file to write (overwritten if present).

    Returns:
        ``output_path``, unchanged.
    """
    segments = get_segments_directories(segments_dir)
    list_dir = os.path.dirname(output_path)

    # Explicit UTF-8 so the bytes written for non-ASCII paths do not depend
    # on the platform's locale encoding.
    with open(output_path, 'w', encoding='utf-8') as f:
        for i, segment in enumerate(segments):
            # NOTE(review): the file name uses the enumerate position, not
            # the number parsed from the directory name. Presumably the two
            # coincide (contiguous, zero-based segments) - confirm against
            # the code that writes these files.
            output_video = os.path.join(segments_dir, segment, f"output_{i}.mp4")
            if not file_exists(output_video):
                continue

            # Use relative path for ffmpeg
            relative_path = os.path.relpath(output_video, list_dir)

            # Inside single quotes ffmpeg takes every character literally,
            # so a quote in the path has to close the quoting, add an
            # escaped quote, and reopen it:  '  ->  '\''
            escaped_path = relative_path.replace("'", "'\\''")

            # A real newline terminates each entry; one directive per line.
            f.write(f"file '{escaped_path}'\n")

    logger.info(f"Created file list at {output_path}")
    return output_path
def safe_remove_file(file_path: str) -> bool:
    """
    Remove a file without raising on failure.

    Args:
        file_path: Path to the file to remove.

    Returns:
        True if this call removed the file. False if there was nothing at
        ``file_path`` or the removal failed (for example because the path
        is a directory or permission is denied); failures are logged as
        warnings.
    """
    try:
        # lexists, unlike exists, is also true for a dangling symlink, so
        # such links get removed instead of being reported as absent.
        if not os.path.lexists(file_path):
            return False
        os.remove(file_path)
    except FileNotFoundError:
        # The file vanished between the check and the removal. It is gone,
        # but not because of this call, and that is not worth a warning.
        return False
    except OSError as e:
        logger.warning(f"Failed to remove {file_path}: {e}")
        return False

    logger.debug(f"Removed file: {file_path}")
    return True
def get_file_size_mb(file_path: str) -> float:
    """
    Report the size of a file in megabytes (1 MB = 1024 * 1024 bytes).

    Args:
        file_path: Path to the file to measure.

    Returns:
        Size in MB, or 0.0 when the path does not exist or its size cannot
        be read.
    """
    bytes_per_mb = 1024 * 1024
    try:
        if not os.path.exists(file_path):
            return 0.0
        return os.path.getsize(file_path) / bytes_per_mb
    except OSError:
        return 0.0