174 lines
6.2 KiB
Python
174 lines
6.2 KiB
Python
"""
|
|
Video splitter module for the YOLO + SAM2 processing pipeline.
|
|
Handles splitting long videos into manageable segments.
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import logging
|
|
from typing import List, Tuple
|
|
from utils.file_utils import ensure_directory, get_video_file_name
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class VideoSplitter:
    """Split a video into fixed-duration segments and arrange them on disk.

    ``split_video`` runs ffmpeg's segment muxer, moves every produced segment
    file into its own ``segment_<i>`` subdirectory and writes a
    ``file_list.txt`` that names all segments in order.
    """

    # Naming scheme of the raw files ffmpeg writes ("segment_000.mp4", ...)
    # and of the per-segment subdirectories ("segment_0", ...).
    _SEGMENT_PREFIX = "segment_"
    _SEGMENT_SUFFIX = ".mp4"

    def __init__(self, segment_duration: int = 5, force_keyframes: bool = True):
        """
        Initialize video splitter.

        Args:
            segment_duration: Duration of each segment in seconds
            force_keyframes: Whether to pass ``-force_key_frames`` to ffmpeg
        """
        self.segment_duration = segment_duration
        self.force_keyframes = force_keyframes

    def _build_ffmpeg_command(self, input_video: str, segment_pattern: str) -> List[str]:
        """Return the ffmpeg argument list that splits ``input_video``.

        Args:
            input_video: Path to the input video file
            segment_pattern: Output pattern for the segment files
                (contains a ``%03d`` style counter)

        Returns:
            Argument list suitable for ``subprocess.run``
        """
        cmd = [
            'ffmpeg', '-i', input_video,
            '-f', 'segment',
            '-segment_time', str(self.segment_duration),
            '-reset_timestamps', '1',
            '-c', 'copy',
        ]

        if self.force_keyframes:
            # NOTE(review): streams are copied ('-c copy'), i.e. not re-encoded,
            # so this encoder option probably has no effect -- confirm against
            # the ffmpeg documentation before relying on it for clean cuts.
            cmd.extend(['-force_key_frames', f'expr:gte(t,n_forced*{self.segment_duration})'])

        # The output pattern must be the last argument.
        cmd.extend(['-copyts', segment_pattern])
        return cmd

    @staticmethod
    def _file_list_entry(segment_path: str) -> str:
        """Return one newline-terminated ``file '<path>'`` line of the file list."""
        return f"file '{segment_path}'\n"

    def split_video(self, input_video: str, output_dir: str) -> Tuple[str, List[str]]:
        """
        Split video into segments and organize into directory structure.

        Args:
            input_video: Path to input video file
            output_dir: Base output directory

        Returns:
            Tuple of (segments_directory, list_of_segment_directory_names);
            the names are relative to segments_directory.

        Raises:
            FileNotFoundError: If ``input_video`` does not exist
            RuntimeError: If ffmpeg exits with a non-zero status
        """
        if not os.path.exists(input_video):
            raise FileNotFoundError(f"Input video not found: {input_video}")

        # All output for this video lives in "<output_dir>/<video name>_segments".
        video_name = os.path.splitext(os.path.basename(input_video))[0]
        segments_dir = os.path.join(output_dir, f"{video_name}_segments")
        ensure_directory(segments_dir)

        logger.info(f"Splitting video {input_video} into {self.segment_duration}s segments")

        segment_pattern = os.path.join(segments_dir, "segment_%03d.mp4")
        cmd = self._build_ffmpeg_command(input_video, segment_pattern)

        try:
            result = subprocess.run(
                cmd,
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"FFmpeg failed: {e.stderr}")
            # Chain the original error so the failing command stays visible.
            raise RuntimeError(f"Video splitting failed: {e}") from e

        # The captured stderr of the successful run is logged for debugging.
        logger.debug(f"FFmpeg output: {result.stderr}")

        # Organize segments into individual directories
        segment_dirs = self._organize_segments(segments_dir)

        # Create file list for later concatenation
        self._create_file_list(segments_dir, segment_dirs)

        logger.info(f"Successfully split video into {len(segment_dirs)} segments")
        return segments_dir, segment_dirs

    def _organize_segments(self, segments_dir: str) -> List[str]:
        """
        Move each segment into its own subdirectory.

        Only entries named ``segment_<digits>.mp4`` are treated as segments;
        anything else in ``segments_dir`` is left untouched.

        Args:
            segments_dir: Directory containing split segments

        Returns:
            List of created segment directory names (relative to
            ``segments_dir``), in segment order
        """
        prefix = self._SEGMENT_PREFIX
        suffix = self._SEGMENT_SUFFIX

        # Collect (number, file name) pairs, skipping names whose middle part
        # is not a plain number instead of failing on them.
        numbered_files = []
        for name in os.listdir(segments_dir):
            if name.startswith(prefix) and name.endswith(suffix):
                number = name[len(prefix):-len(suffix)]
                if number.isdecimal():
                    numbered_files.append((int(number), name))

        # Numeric order, so "segment_010.mp4" follows "segment_009.mp4".
        numbered_files.sort()

        segment_dirs = []
        for i, (_, segment_file) in enumerate(numbered_files):
            segment_dir_name = f"segment_{i}"
            segment_dir_path = os.path.join(segments_dir, segment_dir_name)
            ensure_directory(segment_dir_path)

            # Move segment file to subdirectory with standardized name
            old_path = os.path.join(segments_dir, segment_file)
            new_path = os.path.join(segment_dir_path, get_video_file_name(i))

            # os.replace overwrites an existing destination on every platform,
            # so running the split again over the same directory does not fail.
            os.replace(old_path, new_path)
            segment_dirs.append(segment_dir_name)

            logger.debug(f"Organized segment {i}: {new_path}")

        return segment_dirs

    def _create_file_list(self, segments_dir: str, segment_dirs: List[str]) -> None:
        """
        Create a file list for future concatenation.

        Writes ``file_list.txt`` into ``segments_dir`` with one
        ``file '<segment dir>/<video file>'`` line per segment; the paths are
        relative to ``segments_dir``.
        (Presumably consumed by ffmpeg's concat demuxer -- confirm with the
        code that reads the list.)

        Args:
            segments_dir: Base segments directory
            segment_dirs: List of segment directory names
        """
        file_list_path = os.path.join(segments_dir, "file_list.txt")

        # Explicit encoding keeps non-ASCII paths independent of the locale.
        with open(file_list_path, 'w', encoding='utf-8') as f:
            f.writelines(
                self._file_list_entry(os.path.join(segment_dir, get_video_file_name(i)))
                for i, segment_dir in enumerate(segment_dirs)
            )

        logger.debug(f"Created file list: {file_list_path}")

    def get_segment_info(self, segments_dir: str) -> List[dict]:
        """
        Get information about all segments in a directory.

        Only subdirectories named exactly ``segment_<digits>`` are reported.

        Args:
            segments_dir: Directory containing segments

        Returns:
            List of dictionaries sorted by index, each with the keys
            'index', 'directory', 'video_file' and 'exists'
        """
        prefix = self._SEGMENT_PREFIX
        segment_info = []

        for item in os.listdir(segments_dir):
            if not item.startswith(prefix):
                continue

            # Require the whole remainder to be a number so that names such
            # as "segment_0_backup" or "segment_tmp" are ignored.
            number = item[len(prefix):]
            if not number.isdecimal():
                continue

            item_path = os.path.join(segments_dir, item)
            if not os.path.isdir(item_path):
                continue

            segment_index = int(number)
            video_file = os.path.join(item_path, get_video_file_name(segment_index))

            segment_info.append({
                'index': segment_index,
                'directory': item_path,
                'video_file': video_file,
                'exists': os.path.exists(video_file),
            })

        # Sort by index
        segment_info.sort(key=lambda x: x['index'])

        return segment_info