""" Video splitter module for the YOLO + SAM2 processing pipeline. Handles splitting long videos into manageable segments. """ import os import subprocess import logging from typing import List, Tuple from utils.file_utils import ensure_directory, get_video_file_name logger = logging.getLogger(__name__) class VideoSplitter: """Handles splitting videos into segments for processing.""" def __init__(self, segment_duration: int = 5, force_keyframes: bool = True): """ Initialize video splitter. Args: segment_duration: Duration of each segment in seconds force_keyframes: Whether to force keyframes for clean cuts """ self.segment_duration = segment_duration self.force_keyframes = force_keyframes def split_video(self, input_video: str, output_dir: str) -> Tuple[str, List[str]]: """ Split video into segments and organize into directory structure. Args: input_video: Path to input video file output_dir: Base output directory Returns: Tuple of (segments_directory, list_of_segment_directories) """ if not os.path.exists(input_video): raise FileNotFoundError(f"Input video not found: {input_video}") # Create output directory structure video_name = os.path.splitext(os.path.basename(input_video))[0] segments_dir = os.path.join(output_dir, f"{video_name}_segments") ensure_directory(segments_dir) logger.info(f"Splitting video {input_video} into {self.segment_duration}s segments") # Split video using ffmpeg segment_pattern = os.path.join(segments_dir, "segment_%03d.mp4") # Build ffmpeg command cmd = [ 'ffmpeg', '-i', input_video, '-f', 'segment', '-segment_time', str(self.segment_duration), '-reset_timestamps', '1', '-c', 'copy' ] # Add keyframe forcing if enabled if self.force_keyframes: cmd.extend(['-force_key_frames', f'expr:gte(t,n_forced*{self.segment_duration})']) # Add copyts for timestamp preservation cmd.extend(['-copyts', segment_pattern]) try: result = subprocess.run( cmd, check=True, capture_output=True, text=True ) logger.debug(f"FFmpeg output: {result.stderr}") except subprocess.CalledProcessError as e: logger.error(f"FFmpeg failed: {e.stderr}") raise RuntimeError(f"Video splitting failed: {e}") # Organize segments into individual directories segment_dirs = self._organize_segments(segments_dir) # Create file list for later concatenation self._create_file_list(segments_dir, segment_dirs) logger.info(f"Successfully split video into {len(segment_dirs)} segments") return segments_dir, segment_dirs def _organize_segments(self, segments_dir: str) -> List[str]: """ Move each segment into its own subdirectory. Args: segments_dir: Directory containing split segments Returns: List of created segment directory names """ segment_files = [] segment_dirs = [] # Find all segment files for file in os.listdir(segments_dir): if file.startswith("segment_") and file.endswith(".mp4"): segment_files.append(file) # Sort segment files numerically segment_files.sort(key=lambda x: int(x.split("_")[1].split(".")[0])) # Move each segment to its own directory for i, segment_file in enumerate(segment_files): segment_dir_name = f"segment_{i}" segment_dir_path = os.path.join(segments_dir, segment_dir_name) ensure_directory(segment_dir_path) # Move segment file to subdirectory with standardized name old_path = os.path.join(segments_dir, segment_file) new_path = os.path.join(segment_dir_path, get_video_file_name(i)) os.rename(old_path, new_path) segment_dirs.append(segment_dir_name) logger.debug(f"Organized segment {i}: {new_path}") return segment_dirs def _create_file_list(self, segments_dir: str, segment_dirs: List[str]): """ Create a file list for future concatenation. Args: segments_dir: Base segments directory segment_dirs: List of segment directory names """ file_list_path = os.path.join(segments_dir, "file_list.txt") with open(file_list_path, 'w') as f: for i, segment_dir in enumerate(segment_dirs): segment_path = os.path.join(segment_dir, get_video_file_name(i)) f.write(f"file '{segment_path}'\\n") logger.debug(f"Created file list: {file_list_path}") def get_segment_info(self, segments_dir: str) -> List[dict]: """ Get information about all segments in a directory. Args: segments_dir: Directory containing segments Returns: List of segment information dictionaries """ segment_info = [] for item in os.listdir(segments_dir): item_path = os.path.join(segments_dir, item) if os.path.isdir(item_path) and item.startswith("segment_"): segment_index = int(item.split("_")[1]) video_file = os.path.join(item_path, get_video_file_name(segment_index)) info = { 'index': segment_index, 'directory': item_path, 'video_file': video_file, 'exists': os.path.exists(video_file) } segment_info.append(info) # Sort by index segment_info.sort(key=lambda x: x['index']) return segment_info