"""
Streaming frame writer that pipes raw frames into an ffmpeg subprocess.

Frames are handed to ffmpeg one at a time over stdin, so the full video never
has to be held in memory.  NVENC (GPU) encoding is used when available, with
an automatic fallback to libx264 (CPU).
"""

import atexit
import signal
import subprocess
import tempfile
import time
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np

# Codecs that require an NVIDIA GPU / driver.
_NVENC_CODECS = ('h264_nvenc', 'hevc_nvenc')

# Preset names libx264 understands; NVENC-style names (p1-p7) are not valid.
_X264_PRESETS = frozenset({
    'ultrafast', 'superfast', 'veryfast', 'faster', 'fast',
    'medium', 'slow', 'slower', 'veryslow', 'placebo',
})

# How long to wait after spawning ffmpeg to see whether it dies immediately.
_STARTUP_GRACE_SECONDS = 0.2

# Only the tail of ffmpeg's log is included in error messages.
_STDERR_TAIL_BYTES = 64 * 1024


def test_nvenc_support(codec: str = 'h264_nvenc') -> bool:
    """
    Check whether ffmpeg can encode with an NVENC codec on this machine.

    Runs a very short synthetic encode (lavfi ``testsrc``) to the null muxer.

    Args:
        codec: Encoder to probe, e.g. 'h264_nvenc' or 'hevc_nvenc'.

    Returns:
        True if ffmpeg exited with status 0; False if it failed, timed out,
        or could not be launched at all.
    """
    cmd = [
        'ffmpeg',
        '-f', 'lavfi',
        '-i', 'testsrc=duration=0.1:size=320x240:rate=1',
        '-c:v', codec,
        '-t', '0.1',
        '-f', 'null', '-',
    ]
    try:
        # The output is never inspected, so discard it instead of buffering.
        result = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers a missing or non-executable ffmpeg binary.
        return False
    return result.returncode == 0


# The name starts with "test_", so pytest would otherwise collect this helper
# as a test case whenever it is imported into a test module.
test_nvenc_support.__test__ = False


class StreamingFrameWriter:
    """
    Write frames directly to ffmpeg via a pipe for memory-efficient output.

    The writer can be used as a context manager; ``close()`` finalizes the
    video and is also registered with ``atexit`` as a safety net.
    """

    def __init__(self,
                 output_path: str,
                 width: int,
                 height: int,
                 fps: float,
                 audio_source: Optional[str] = None,
                 video_codec: str = 'h264_nvenc',
                 quality_preset: str = 'p4',  # NVENC preset
                 crf: int = 18,
                 pixel_format: str = 'bgr24'):
        """
        Start an ffmpeg process that encodes frames written to this object.

        Args:
            output_path: Destination video file; parent folders are created.
            width: Frame width in pixels.
            height: Frame height in pixels.
            fps: Frame rate passed to ffmpeg for the raw input.
            audio_source: Optional media file whose first audio stream is
                copied into the output.  Ignored (with a warning) if the
                file does not exist.
            video_codec: 'h264_nvenc', 'hevc_nvenc', or anything else for
                libx264.
            quality_preset: Encoder preset (p1-p7 for NVENC, x264 preset
                names for libx264).
            crf: Quality level passed as ``-cq`` (NVENC) or ``-crf`` (x264).
            pixel_format: ffmpeg pixel format of the raw frames.

        Raises:
            RuntimeError: If ffmpeg cannot be started, even after the CPU
                fallback.
        """
        # Set first so close()/__del__ are safe even if setup fails below.
        self.ffmpeg_process = None
        self._stderr_log = None
        self.frames_written = 0

        self.output_path = Path(output_path)
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self.width = width
        self.height = height
        self.fps = fps
        self.audio_source = audio_source
        self.pixel_format = pixel_format
        self.crf = crf

        if audio_source and not self._has_audio():
            warnings.warn(
                f"Audio source not found, writing video only: {audio_source}"
            )

        # Probe the exact GPU codec that was requested before committing.
        if video_codec in _NVENC_CODECS:
            print("🔍 Testing NVENC support...")
            if not test_nvenc_support(video_codec):
                print("❌ NVENC not available, switching to CPU encoding")
                video_codec = 'libx264'
                quality_preset = 'medium'
            else:
                print("✅ NVENC available")

        self.video_codec = video_codec
        self.quality_preset = quality_preset

        self.ffmpeg_cmd = self._build_ffmpeg_command(
            video_codec, quality_preset, crf
        )
        self._start_ffmpeg()

        # Safety net; close() unregisters this again.
        atexit.register(self.close)

        print("📼 Streaming writer initialized:")
        print(f" Output: {self.output_path}")
        print(f" Resolution: {width}x{height} @ {fps}fps")
        # self.video_codec reflects a fallback made while starting ffmpeg.
        print(f" Codec: {self.video_codec}")
        print(f" Audio: {'Yes' if self._has_audio() else 'No'}")

    def _has_audio(self) -> bool:
        """Return True if an audio source was given and the file exists."""
        return bool(self.audio_source) and Path(self.audio_source).exists()

    def _build_ffmpeg_command(self,
                              video_codec: str,
                              preset: str,
                              crf: int) -> List[str]:
        """
        Build the ffmpeg argument list.

        Args:
            video_codec: 'h264_nvenc', 'hevc_nvenc'; any other value selects
                libx264.
            preset: Encoder preset.  For libx264 an unknown name (such as an
                NVENC 'p4') falls back to 'medium'.
            crf: Quality level (``-cq`` for NVENC, ``-crf`` for libx264).

        Returns:
            The complete command, ending with the output path.
        """
        # Decide once, so the "-i" input and the "-map 1:a:0" option can
        # never disagree about whether a second input exists.
        has_audio = self._has_audio()

        cmd = ['ffmpeg', '-y']  # Overwrite output

        # Raw video arriving on stdin
        cmd += [
            '-f', 'rawvideo',
            '-pix_fmt', self.pixel_format,
            '-s', f'{self.width}x{self.height}',
            '-r', str(self.fps),
            '-i', 'pipe:0',
        ]

        if has_audio:
            cmd += ['-i', str(self.audio_source)]

        if video_codec == 'h264_nvenc':
            cmd += [
                '-c:v', 'h264_nvenc',
                '-preset', preset,      # p1-p7, higher = better quality
                '-rc', 'vbr',           # Variable bitrate
                '-cq', str(crf),        # Quality level (0-51, lower = better)
                '-b:v', '0',            # Let VBR decide bitrate
                '-maxrate', '50M',
                '-bufsize', '100M',
            ]
        elif video_codec == 'hevc_nvenc':
            cmd += [
                '-c:v', 'hevc_nvenc',
                '-preset', preset,
                '-rc', 'vbr',
                '-cq', str(crf),
                '-b:v', '0',
                '-maxrate', '40M',
                '-bufsize', '80M',
            ]
        else:
            # CPU encoding (libx264)
            x264_preset = preset if preset in _X264_PRESETS else 'medium'
            cmd += [
                '-c:v', 'libx264',
                '-preset', x264_preset,
                '-crf', str(crf),
                '-pix_fmt', 'yuv420p',
            ]

        if has_audio:
            cmd += [
                '-c:a', 'copy',     # Copy audio without re-encoding
                '-map', '0:v:0',    # Video from the pipe
                '-map', '1:a:0',    # Audio from the file
                '-shortest',        # Stop at the shortest stream
            ]
        else:
            cmd += ['-map', '0:v:0']  # Video only

        cmd.append(str(self.output_path))
        return cmd

    def _command_codec(self) -> str:
        """Return the value following '-c:v' in the current command."""
        try:
            return self.ffmpeg_cmd[self.ffmpeg_cmd.index('-c:v') + 1]
        except (ValueError, IndexError):
            return ''

    @staticmethod
    def _describe_nvenc_failure(stderr_text: str) -> str:
        """Turn ffmpeg's NVENC error output into a short status message."""
        lowered = stderr_text.lower()
        if 'unsupported device' in lowered:
            return "❌ NVENC not available on this GPU - switching to CPU encoding"
        if 'cannot load' in lowered or 'not found' in lowered:
            return "❌ NVENC drivers not available - switching to CPU encoding"
        return f"❌ NVENC encoding failed: {stderr_text}"

    def _read_stderr(self) -> str:
        """
        Return the tail of ffmpeg's captured stderr.

        Only call this after ffmpeg has exited: the log file's offset is
        shared with the child process.
        """
        log = getattr(self, '_stderr_log', None)
        if log is None:
            return ''
        try:
            log.seek(0, 2)
            log.seek(max(0, log.tell() - _STDERR_TAIL_BYTES))
            return log.read().decode(errors='replace')
        except (OSError, ValueError):
            return ''

    def _discard_stderr_log(self) -> None:
        """Close and forget the temporary stderr log, if any."""
        log = getattr(self, '_stderr_log', None)
        self._stderr_log = None
        if log is not None:
            try:
                log.close()
            except OSError:
                pass

    def _spawn_ffmpeg(self) -> Optional[str]:
        """
        Launch ``self.ffmpeg_cmd`` once.

        Returns:
            None if ffmpeg is still running after the startup grace period
            (``self.ffmpeg_process`` is then set), otherwise the stderr text
            of the process that exited early.

        Raises:
            OSError: If the ffmpeg executable cannot be launched.
        """
        self._discard_stderr_log()
        # stderr goes to a file rather than a pipe: nobody drains a pipe
        # while frames are streaming, so ffmpeg would eventually block on a
        # full pipe buffer and deadlock against our stdin writes.
        self._stderr_log = tempfile.TemporaryFile()

        print(f"🎬 Starting ffmpeg: {' '.join(self.ffmpeg_cmd[:10])}...")
        try:
            process = subprocess.Popen(
                self.ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=self._stderr_log,
                bufsize=10**8,  # Large buffer for performance
                # Keep the terminal's Ctrl+C away from ffmpeg so this
                # process can finalize the file itself (ignored on Windows).
                start_new_session=True,
            )
        except OSError:
            self._discard_stderr_log()
            raise

        time.sleep(_STARTUP_GRACE_SECONDS)  # Give ffmpeg time to fail
        if process.poll() is None:
            self.ffmpeg_process = process
            return None

        # Died during startup: release its pipe and collect the reason.
        try:
            if process.stdin:
                process.stdin.close()
        except OSError:
            pass
        message = self._read_stderr()
        self._discard_stderr_log()
        return message

    def _start_ffmpeg(self) -> None:
        """
        Start the ffmpeg subprocess, falling back from NVENC to libx264.

        Raises:
            RuntimeError: If ffmpeg cannot be started with a CPU codec.
        """
        while True:
            # Look at the codec argument itself; a substring search over the
            # whole command would also match paths containing "nvenc".
            uses_nvenc = self._command_codec() in _NVENC_CODECS

            try:
                failure = self._spawn_ffmpeg()
            except OSError as exc:
                if not uses_nvenc:
                    raise RuntimeError(f"Failed to start ffmpeg: {exc}") from exc
                print(f"⚠️ GPU encoding failed with error: {exc}")
                print("🔄 Falling back to CPU encoding...")
            else:
                if failure is None:
                    return
                if not uses_nvenc:
                    raise RuntimeError(f"Failed to start ffmpeg: {failure}")
                print(self._describe_nvenc_failure(failure))
                print("🔄 Falling back to CPU encoding (libx264)...")

            # Retry once on the CPU, keeping the caller's quality setting.
            # The next iteration is no longer NVENC, so the loop terminates.
            self.video_codec = 'libx264'
            self.quality_preset = 'medium'
            self.ffmpeg_cmd = self._build_ffmpeg_command(
                'libx264', 'medium', getattr(self, 'crf', 18)
            )

    def write_frame(self, frame: np.ndarray) -> bool:
        """
        Write a single frame to the video

        Args:
            frame: Frame of shape (height, width, 3); non-uint8 data is
                cast to uint8.

        Returns:
            True if successful

        Raises:
            RuntimeError: If ffmpeg is not running, has died, or the frame
                could not be written (including a shape mismatch).
            BrokenPipeError: If the pipe broke but ffmpeg has not exited yet.
        """
        process = self.ffmpeg_process
        if process is None or process.poll() is not None:
            raise RuntimeError("FFmpeg process is not running")

        try:
            if frame.shape != (self.height, self.width, 3):
                raise ValueError(
                    f"Frame shape {frame.shape} doesn't match expected "
                    f"({self.height}, {self.width}, 3)"
                )

            if frame.dtype != np.uint8:
                frame = frame.astype(np.uint8)

            process.stdin.write(frame.tobytes())
            process.stdin.flush()

            self.frames_written += 1

            # Periodic progress update
            if self.frames_written % 100 == 0:
                print(f" Written {self.frames_written} frames...", end='\r')

            return True

        except BrokenPipeError as exc:
            if process.poll() is not None:
                raise RuntimeError(
                    f"FFmpeg process died: {self._read_stderr()}"
                ) from exc
            raise
        except Exception as e:
            raise RuntimeError(f"Failed to write frame: {e}") from e

    def write_frame_alpha(self, frame: np.ndarray, alpha: np.ndarray) -> bool:
        """
        Write frame with alpha channel (converts to green screen)

        Pure green has the same value in RGB and BGR order, so the channel
        order of ``frame`` does not matter for the background.

        Args:
            frame: Colour frame of shape (height, width, 3)
            alpha: Alpha mask; uint8 (0-255) is normalized, any other dtype
                is used as-is and expected to be in 0-1

        Returns:
            True if successful
        """
        if alpha.dtype == np.uint8:
            alpha_float = alpha.astype(np.float32) / 255.0
        else:
            alpha_float = alpha

        # A 2-D mask gets a trailing axis so it broadcasts over the channels.
        if alpha_float.ndim == 2:
            alpha_float = alpha_float[:, :, np.newaxis]

        green = np.array([0, 255, 0], dtype=np.float32)
        composite = frame * alpha_float + green * (1 - alpha_float)

        # Clip so out-of-range alpha cannot wrap around in the uint8 cast.
        composite = np.clip(composite, 0, 255).astype(np.uint8)

        return self.write_frame(composite)

    def get_progress(self) -> Dict[str, Any]:
        """Return frame count, duration, output path and process liveness."""
        process = self.ffmpeg_process
        return {
            'frames_written': self.frames_written,
            'duration_seconds': self.frames_written / self.fps if self.fps > 0 else 0,
            'output_path': str(self.output_path),
            'process_alive': process is not None and process.poll() is None,
        }

    def close(self) -> None:
        """
        Close ffmpeg's stdin and wait for the video to be finalized.

        Safe to call more than once.  A non-zero ffmpeg exit status is
        reported via ``warnings.warn`` rather than raised.
        """
        # Drop the atexit hook; it would otherwise keep this object alive
        # until interpreter exit.
        atexit.unregister(self.close)

        process = getattr(self, 'ffmpeg_process', None)
        if process is None:
            self._discard_stderr_log()
            return
        self.ffmpeg_process = None

        try:
            # Closing stdin signals end of input.
            if process.stdin and not process.stdin.closed:
                try:
                    process.stdin.close()
                except OSError:
                    # ffmpeg already exited; the exit code is reported below.
                    pass

            print(f"\n🎬 Finalizing video with {self.frames_written} frames...")
            try:
                process.wait(timeout=30)
            except subprocess.TimeoutExpired:
                print("⚠️ FFmpeg taking too long, terminating...")
                process.terminate()
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait()
                return

            if process.returncode != 0:
                warnings.warn(
                    f"FFmpeg exited with code {process.returncode}: "
                    f"{self._read_stderr()}"
                )
            else:
                print(f"✅ Video saved: {self.output_path}")

        except Exception as e:
            warnings.warn(f"Error closing ffmpeg: {e}")
            if process.poll() is None:
                process.kill()
        finally:
            self._discard_stderr_log()

    def __enter__(self):
        """Context manager support"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager cleanup"""
        self.close()

    def __del__(self):
        """Ensure cleanup on deletion"""
        try:
            self.close()
        except Exception:
            # Best effort only: modules may already be torn down at shutdown.
            pass