Files
samyolo_on_segments/main.py
2025-07-27 14:14:21 -07:00

682 lines
33 KiB
Python

#!/usr/bin/env python3
"""
Main entry point for YOLO + SAM2 video processing pipeline.
Processes long videos by splitting into segments, detecting humans with YOLO,
and creating green screen masks with SAM2.
"""
import os
import sys
import argparse
import cv2
import numpy as np
from typing import List
# Add project root to path
sys.path.append(os.path.dirname(__file__))
from core.config_loader import ConfigLoader
from core.video_splitter import VideoSplitter
from core.yolo_detector import YOLODetector
from core.sam2_processor import SAM2Processor
from core.mask_processor import MaskProcessor
from core.video_assembler import VideoAssembler
from utils.logging_utils import setup_logging, get_logger
from utils.file_utils import ensure_directory
from utils.status_utils import print_processing_status, cleanup_incomplete_segment
logger = get_logger(__name__)
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="YOLO + SAM2 Video Processing Pipeline"
)
parser.add_argument(
"--config",
type=str,
required=True,
help="Path to YAML configuration file"
)
parser.add_argument(
"--log-file",
type=str,
help="Optional log file path"
)
parser.add_argument(
"--status",
action="store_true",
help="Show processing status and exit"
)
parser.add_argument(
"--cleanup-segment",
type=int,
help="Clean up a specific segment for restart (segment index)"
)
return parser.parse_args()
def validate_dependencies():
"""Validate that required dependencies are available."""
try:
import torch
import cv2
import numpy as np
import cupy as cp
from ultralytics import YOLO
from sam2.build_sam import build_sam2_video_predictor
logger.info("All dependencies validated successfully")
return True
except ImportError as e:
logger.error(f"Missing dependency: {e}")
logger.error("Please install requirements: pip install -r requirements.txt")
return False
def create_yolo_mask_debug_frame(detections: List[dict], video_path: str, output_path: str, scale: float = 1.0) -> bool:
"""
Create debug visualization for YOLO direct masks.
Args:
detections: List of YOLO detections with masks
video_path: Path to video file
output_path: Path to save debug image
scale: Scale factor for frame processing
Returns:
True if debug frame was created successfully
"""
try:
# Load first frame
cap = cv2.VideoCapture(video_path)
ret, original_frame = cap.read()
cap.release()
if not ret:
logger.error("Could not read first frame for YOLO mask debug")
return False
# Scale frame if needed
if scale != 1.0:
original_frame = cv2.resize(original_frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)
debug_frame = original_frame.copy()
# Define colors for each object
colors = {
1: (0, 255, 0), # Green for Object 1 (Left eye)
2: (255, 0, 0), # Blue for Object 2 (Right eye)
}
# Get detections with masks
detections_with_masks = [d for d in detections if d.get('has_mask', False)]
# Overlay masks with transparency
obj_id = 1
for detection in detections_with_masks[:2]: # Up to 2 objects
mask = detection['mask']
# Resize mask to match frame if needed
if mask.shape != original_frame.shape[:2]:
mask = cv2.resize(mask.astype(np.float32), (original_frame.shape[1], original_frame.shape[0]), interpolation=cv2.INTER_NEAREST)
mask = mask > 0.5
mask = mask.astype(bool)
# Apply colored overlay
color = colors.get(obj_id, (128, 128, 128))
overlay = debug_frame.copy()
overlay[mask] = color
# Blend with original (30% overlay, 70% original)
cv2.addWeighted(overlay, 0.3, debug_frame, 0.7, 0, debug_frame)
# Draw outline
contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(debug_frame, contours, -1, color, 2)
logger.info(f"YOLO Mask Debug: Object {obj_id} mask - shape: {mask.shape}, pixels: {np.sum(mask)}")
obj_id += 1
# Add title and source info
title = f"YOLO Direct Masks: {len(detections_with_masks)} objects detected"
cv2.putText(debug_frame, title, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
source_info = "Mask Source: YOLO Segmentation (DIRECT - No SAM2)"
cv2.putText(debug_frame, source_info, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) # Green for YOLO
# Add object legend
y_offset = 90
for i, detection in enumerate(detections_with_masks[:2]):
obj_id = i + 1
color = colors.get(obj_id, (128, 128, 128))
text = f"Object {obj_id}: {'Left Eye' if obj_id == 1 else 'Right Eye'} (YOLO Mask)"
cv2.putText(debug_frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
y_offset += 30
# Save debug image
success = cv2.imwrite(output_path, debug_frame)
if success:
logger.info(f"YOLO Mask Debug: Saved debug frame to {output_path}")
else:
logger.error(f"Failed to save YOLO mask debug frame to {output_path}")
return success
except Exception as e:
logger.error(f"Error creating YOLO mask debug frame: {e}")
return False
def resolve_detect_segments(detect_segments, total_segments: int) -> List[int]:
"""
Resolve detect_segments configuration to list of segment indices.
Args:
detect_segments: Configuration value ("all", list, or None)
total_segments: Total number of segments
Returns:
List of segment indices to process
"""
if detect_segments == "all" or detect_segments is None:
return list(range(total_segments))
elif isinstance(detect_segments, list):
# Filter out invalid segment indices
valid_segments = [s for s in detect_segments if 0 <= s < total_segments]
if len(valid_segments) != len(detect_segments):
logger.warning(f"Some segment indices are invalid. Using: {valid_segments}")
return valid_segments
else:
logger.warning(f"Invalid detect_segments format: {detect_segments}. Using all segments.")
return list(range(total_segments))
def main():
"""Main processing pipeline."""
args = parse_arguments()
try:
# Load configuration
config = ConfigLoader(args.config)
# Setup logging
setup_logging(config.get_log_level(), args.log_file)
# Handle status check
if args.status:
output_dir = config.get_output_directory()
input_video = config.get_input_video_path()
video_name = os.path.splitext(os.path.basename(input_video))[0]
segments_dir = os.path.join(output_dir, f"{video_name}_segments")
print_processing_status(segments_dir)
return 0
# Handle segment cleanup
if args.cleanup_segment is not None:
output_dir = config.get_output_directory()
input_video = config.get_input_video_path()
video_name = os.path.splitext(os.path.basename(input_video))[0]
segments_dir = os.path.join(output_dir, f"{video_name}_segments")
segment_dir = os.path.join(segments_dir, f"segment_{args.cleanup_segment}")
if cleanup_incomplete_segment(segment_dir):
logger.info(f"Successfully cleaned up segment {args.cleanup_segment}")
return 0
else:
logger.error(f"Failed to clean up segment {args.cleanup_segment}")
return 1
logger.info("Starting YOLO + SAM2 video processing pipeline")
# Validate dependencies
if not validate_dependencies():
return 1
# Validate input video exists
input_video = config.get_input_video_path()
if not os.path.exists(input_video):
logger.error(f"Input video not found: {input_video}")
return 1
# Setup output directory
output_dir = config.get_output_directory()
ensure_directory(output_dir)
# Step 1: Split video into segments
logger.info("Step 1: Splitting video into segments")
splitter = VideoSplitter(
segment_duration=config.get_segment_duration(),
force_keyframes=config.get('video.force_keyframes', True)
)
segments_dir, segment_dirs = splitter.split_video(input_video, output_dir)
logger.info(f"Created {len(segment_dirs)} segments in {segments_dir}")
# Get detailed segment information
segments_info = splitter.get_segment_info(segments_dir)
# Resolve which segments to process with YOLO
detect_segments_config = config.get_detect_segments()
detect_segments = resolve_detect_segments(detect_segments_config, len(segments_info))
# Initialize processors once
logger.info("Step 2: Initializing YOLO detector")
# Get YOLO mode and model paths
yolo_mode = config.get('models.yolo_mode', 'detection')
detection_model = config.get('models.yolo_detection_model', config.get_yolo_model_path())
segmentation_model = config.get('models.yolo_segmentation_model', None)
logger.info(f"YOLO Mode: {yolo_mode}")
detector = YOLODetector(
detection_model_path=detection_model,
segmentation_model_path=segmentation_model,
mode=yolo_mode,
confidence_threshold=config.get_yolo_confidence(),
human_class_id=config.get_human_class_id()
)
logger.info("Step 3: Initializing SAM2 processor")
sam2_processor = SAM2Processor(
checkpoint_path=config.get_sam2_checkpoint(),
config_path=config.get_sam2_config()
)
# Initialize mask processor
mask_processor = MaskProcessor(
green_color=config.get_green_color(),
blue_color=config.get_blue_color()
)
# Process each segment sequentially (YOLO -> SAM2 -> Render)
logger.info("Step 4: Processing segments sequentially")
total_humans_detected = 0
for i, segment_info in enumerate(segments_info):
segment_idx = segment_info['index']
logger.info(f"Processing segment {segment_idx}/{len(segments_info)-1}")
# Skip if segment output already exists
output_video = os.path.join(segment_info['directory'], f"output_{segment_idx}.mp4")
if os.path.exists(output_video):
logger.info(f"Segment {segment_idx} already processed, skipping")
continue
# Determine if we should use YOLO detections or previous masks
use_detections = segment_idx in detect_segments
# First segment must use detections
if segment_idx == 0 and not use_detections:
logger.warning(f"First segment must use YOLO detection")
use_detections = True
# Get YOLO prompts or previous masks
yolo_prompts = None
previous_masks = None
if use_detections:
# Run YOLO detection on current segment
logger.info(f"Running YOLO detection on segment {segment_idx}")
detection_file = os.path.join(segment_info['directory'], "yolo_detections")
# Check if detection already exists
if os.path.exists(detection_file):
logger.info(f"Loading existing YOLO detections for segment {segment_idx}")
detections = detector.load_detections_from_file(detection_file)
else:
# Run YOLO detection on first frame
detections = detector.detect_humans_in_video_first_frame(
segment_info['video_file'],
scale=config.get_inference_scale()
)
# Save detections for future runs
detector.save_detections_to_file(detections, detection_file)
if detections:
total_humans_detected += len(detections)
logger.info(f"Found {len(detections)} humans in segment {segment_idx}")
# Get frame width from video
cap = cv2.VideoCapture(segment_info['video_file'])
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
cap.release()
yolo_prompts = detector.convert_detections_to_sam2_prompts(
detections, frame_width
)
# If no right eye detections found, run debug analysis with lower confidence
half_frame_width = frame_width // 2
right_eye_detections = [d for d in detections if (d['bbox'][0] + d['bbox'][2]) / 2 >= half_frame_width]
if len(right_eye_detections) == 0 and config.get('advanced.save_yolo_debug_frames', False):
logger.info(f"VR180 Debug: No right eye detections found, running lower confidence analysis...")
# Load first frame for debug analysis
cap = cv2.VideoCapture(segment_info['video_file'])
ret, debug_frame = cap.read()
cap.release()
if ret:
# Scale frame to match detection scale
if config.get_inference_scale() != 1.0:
scale = config.get_inference_scale()
debug_frame = cv2.resize(debug_frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)
# Run debug detection with lower confidence
debug_detections = detector.debug_detect_with_lower_confidence(debug_frame, debug_confidence=0.3)
# Analyze where these lower confidence detections are
debug_right_eye = [d for d in debug_detections if (d['bbox'][0] + d['bbox'][2]) / 2 >= half_frame_width]
if len(debug_right_eye) > 0:
logger.warning(f"VR180 Debug: Found {len(debug_right_eye)} right eye detections with lower confidence!")
for i, det in enumerate(debug_right_eye):
logger.warning(f"VR180 Debug: Right eye detection {i+1}: conf={det['confidence']:.3f}, bbox={det['bbox']}")
logger.warning(f"VR180 Debug: Consider lowering yolo_confidence from {config.get_yolo_confidence()} to 0.3-0.4")
else:
logger.info(f"VR180 Debug: No right eye detections found even with confidence 0.3")
logger.info(f"VR180 Debug: This confirms person is not visible in right eye view")
logger.info(f"Pipeline Debug: Segment {segment_idx} - Generated {len(yolo_prompts)} SAM2 prompts from {len(detections)} YOLO detections")
# Save debug frame with detections visualized (if enabled)
if config.get('advanced.save_yolo_debug_frames', False):
debug_frame_path = os.path.join(segment_info['directory'], "yolo_debug.jpg")
# Load first frame for debug visualization
cap = cv2.VideoCapture(segment_info['video_file'])
ret, debug_frame = cap.read()
cap.release()
if ret:
# Scale frame to match detection scale
if config.get_inference_scale() != 1.0:
scale = config.get_inference_scale()
debug_frame = cv2.resize(debug_frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)
detector.save_debug_frame_with_detections(debug_frame, detections, debug_frame_path, yolo_prompts)
else:
logger.warning(f"Could not load frame for debug visualization in segment {segment_idx}")
# Check if we have YOLO masks for debug visualization
has_yolo_masks = False
if detections and detector.supports_segmentation:
has_yolo_masks = any(d.get('has_mask', False) for d in detections)
# Generate first frame masks debug (SAM2 or YOLO)
first_frame_debug_path = os.path.join(segment_info['directory'], "first_frame_detection.jpg")
if has_yolo_masks:
logger.info(f"Pipeline Debug: Generating YOLO first frame masks for segment {segment_idx}")
# Create YOLO mask debug visualization
create_yolo_mask_debug_frame(detections, segment_info['video_file'], first_frame_debug_path, config.get_inference_scale())
else:
logger.info(f"Pipeline Debug: Generating SAM2 first frame masks for segment {segment_idx}")
sam2_processor.generate_first_frame_debug_masks(
segment_info['video_file'],
yolo_prompts,
first_frame_debug_path,
config.get_inference_scale()
)
else:
logger.warning(f"No humans detected in segment {segment_idx}")
# Save debug frame even when no detections (if enabled)
if config.get('advanced.save_yolo_debug_frames', False):
debug_frame_path = os.path.join(segment_info['directory'], "yolo_debug_no_detections.jpg")
# Load first frame for debug visualization
cap = cv2.VideoCapture(segment_info['video_file'])
ret, debug_frame = cap.read()
cap.release()
if ret:
# Scale frame to match detection scale
if config.get_inference_scale() != 1.0:
scale = config.get_inference_scale()
debug_frame = cv2.resize(debug_frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)
# Add "No detections" text overlay
cv2.putText(debug_frame, "YOLO: No humans detected",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1.0,
(0, 0, 255), 2) # Red text
cv2.imwrite(debug_frame_path, debug_frame)
logger.info(f"Saved no-detection debug frame to {debug_frame_path}")
else:
logger.warning(f"Could not load frame for no-detection debug visualization in segment {segment_idx}")
elif segment_idx > 0:
# Try to load previous segment mask
for j in range(segment_idx - 1, -1, -1):
prev_segment_dir = segments_info[j]['directory']
previous_masks = sam2_processor.load_previous_segment_mask(prev_segment_dir)
if previous_masks:
logger.info(f"Using masks from segment {j} for segment {segment_idx}")
break
if not yolo_prompts and not previous_masks:
logger.error(f"No prompts or previous masks available for segment {segment_idx}")
continue
# Check if we have YOLO masks and can skip SAM2 (recheck in case detections were loaded from file)
if not 'has_yolo_masks' in locals():
has_yolo_masks = False
if detections and detector.supports_segmentation:
has_yolo_masks = any(d.get('has_mask', False) for d in detections)
if has_yolo_masks:
logger.info(f"Pipeline Debug: YOLO segmentation provided masks - using as SAM2 initial masks for segment {segment_idx}")
# Convert YOLO masks to initial masks for SAM2
cap = cv2.VideoCapture(segment_info['video_file'])
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# Convert YOLO masks to the format expected by SAM2 add_previous_masks_to_predictor
yolo_masks_dict = {}
for i, detection in enumerate(detections[:2]): # Up to 2 objects
if detection.get('has_mask', False):
mask = detection['mask']
# Resize mask to match inference scale
if config.get_inference_scale() != 1.0:
scale = config.get_inference_scale()
scaled_height = int(frame_height * scale)
scaled_width = int(frame_width * scale)
mask = cv2.resize(mask.astype(np.float32), (scaled_width, scaled_height), interpolation=cv2.INTER_NEAREST)
mask = mask > 0.5
obj_id = i + 1 # Sequential object IDs
yolo_masks_dict[obj_id] = mask.astype(bool)
logger.info(f"Pipeline Debug: YOLO mask for Object {obj_id} - shape: {mask.shape}, pixels: {np.sum(mask)}")
logger.info(f"Pipeline Debug: Using YOLO masks as SAM2 initial masks - {len(yolo_masks_dict)} objects")
# Use traditional SAM2 pipeline with YOLO masks as initial masks
previous_masks = yolo_masks_dict
yolo_prompts = None # Don't use bounding box prompts when we have masks
# Debug what we're passing to SAM2
if yolo_prompts:
logger.info(f"Pipeline Debug: Passing {len(yolo_prompts)} YOLO prompts to SAM2 for segment {segment_idx}")
for i, prompt in enumerate(yolo_prompts):
logger.info(f"Pipeline Debug: Prompt {i+1}: Object {prompt['obj_id']}, bbox={prompt['bbox']}")
if previous_masks:
logger.info(f"Pipeline Debug: Using {len(previous_masks)} previous masks for segment {segment_idx}")
logger.info(f"Pipeline Debug: Previous mask object IDs: {list(previous_masks.keys())}")
# Handle mid-segment detection if enabled (works for both detection and segmentation modes)
multi_frame_prompts = None
if config.get('advanced.enable_mid_segment_detection', False) and (yolo_prompts or has_yolo_masks):
logger.info(f"Mid-segment Detection: Enabled for segment {segment_idx}")
# Calculate frame indices for re-detection
cap = cv2.VideoCapture(segment_info['video_file'])
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
cap.release()
redetection_interval = config.get('advanced.redetection_interval', 30)
max_redetections = config.get('advanced.max_redetections_per_segment', 10)
# Generate frame indices: [30, 60, 90, ...] (skip frame 0 since we already have first frame prompts)
frame_indices = []
frame_idx = redetection_interval
while frame_idx < total_frames and len(frame_indices) < max_redetections:
frame_indices.append(frame_idx)
frame_idx += redetection_interval
if frame_indices:
logger.info(f"Mid-segment Detection: Running YOLO on frames {frame_indices} (interval={redetection_interval})")
# Run multi-frame detection
multi_frame_detections = detector.detect_humans_multi_frame(
segment_info['video_file'],
frame_indices,
scale=config.get_inference_scale()
)
# Convert detections to SAM2 prompts (different handling for segmentation vs detection mode)
multi_frame_prompts = {}
cap = cv2.VideoCapture(segment_info['video_file'])
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
for frame_idx, detections in multi_frame_detections.items():
if detections:
if has_yolo_masks:
# Segmentation mode: convert YOLO masks to SAM2 mask prompts
frame_masks = {}
for i, detection in enumerate(detections[:2]): # Up to 2 objects
if detection.get('has_mask', False):
mask = detection['mask']
# Resize mask to match inference scale
if config.get_inference_scale() != 1.0:
scale = config.get_inference_scale()
scaled_height = int(frame_height * scale)
scaled_width = int(frame_width * scale)
mask = cv2.resize(mask.astype(np.float32), (scaled_width, scaled_height), interpolation=cv2.INTER_NEAREST)
mask = mask > 0.5
obj_id = i + 1 # Sequential object IDs
frame_masks[obj_id] = mask.astype(bool)
logger.debug(f"Mid-segment Detection: Frame {frame_idx}, Object {obj_id} mask - shape: {mask.shape}, pixels: {np.sum(mask)}")
if frame_masks:
# Store as mask prompts (different format than bbox prompts)
multi_frame_prompts[frame_idx] = {'masks': frame_masks}
logger.info(f"Mid-segment Detection: Frame {frame_idx} -> {len(frame_masks)} YOLO masks")
else:
# Detection mode: convert to bounding box prompts (existing logic)
prompts = detector.convert_detections_to_sam2_prompts(detections, frame_width)
multi_frame_prompts[frame_idx] = prompts
logger.info(f"Mid-segment Detection: Frame {frame_idx} -> {len(prompts)} SAM2 prompts")
logger.info(f"Mid-segment Detection: Generated prompts for {len(multi_frame_prompts)} frames")
else:
logger.info(f"Mid-segment Detection: No additional frames to process (segment has {total_frames} frames)")
elif config.get('advanced.enable_mid_segment_detection', False):
logger.info(f"Mid-segment Detection: Skipped for segment {segment_idx} (no initial YOLO data)")
# Process segment with SAM2
logger.info(f"Pipeline Debug: Starting SAM2 processing for segment {segment_idx}")
video_segments = sam2_processor.process_single_segment(
segment_info,
yolo_prompts=yolo_prompts,
previous_masks=previous_masks,
inference_scale=config.get_inference_scale(),
multi_frame_prompts=multi_frame_prompts
)
if video_segments is None:
logger.error(f"SAM2 processing failed for segment {segment_idx}")
continue
# Debug what SAM2 produced
logger.info(f"Pipeline Debug: SAM2 completed for segment {segment_idx}")
logger.info(f"Pipeline Debug: Generated masks for {len(video_segments)} frames")
if video_segments:
# Check first frame to see what objects were tracked
first_frame_idx = min(video_segments.keys())
first_frame_objects = video_segments[first_frame_idx]
logger.info(f"Pipeline Debug: First frame contains {len(first_frame_objects)} tracked objects")
logger.info(f"Pipeline Debug: Tracked object IDs: {list(first_frame_objects.keys())}")
for obj_id, mask in first_frame_objects.items():
mask_pixels = np.sum(mask)
logger.info(f"Pipeline Debug: Object {obj_id} mask has {mask_pixels} pixels")
# Check last frame as well
last_frame_idx = max(video_segments.keys())
last_frame_objects = video_segments[last_frame_idx]
logger.info(f"Pipeline Debug: Last frame contains {len(last_frame_objects)} tracked objects")
logger.info(f"Pipeline Debug: Final object IDs: {list(last_frame_objects.keys())}")
# Save final masks for next segment
mask_path = os.path.join(segment_info['directory'], "mask.png")
sam2_processor.save_final_masks(
video_segments,
mask_path,
green_color=config.get_green_color(),
blue_color=config.get_blue_color()
)
# Apply green screen and save output video
success = mask_processor.process_segment(
segment_info,
video_segments,
use_nvenc=config.get_use_nvenc(),
bitrate=config.get_output_bitrate()
)
if success:
logger.info(f"Successfully processed segment {segment_idx}")
else:
logger.error(f"Failed to create green screen video for segment {segment_idx}")
# Log processing summary
logger.info(f"Sequential processing complete. Total humans detected: {total_humans_detected}")
# Step 3: Assemble final video
logger.info("Step 3: Assembling final video with audio")
# Initialize video assembler
assembler = VideoAssembler(
preserve_audio=config.get_preserve_audio(),
use_nvenc=config.get_use_nvenc()
)
# Verify all segments are complete
all_complete, missing = assembler.verify_segment_completeness(segments_dir)
if not all_complete:
logger.error(f"Cannot assemble video - missing segments: {missing}")
return 1
# Assemble final video
final_output = os.path.join(output_dir, config.get_output_filename())
success = assembler.assemble_final_video(
segments_dir,
input_video,
final_output,
bitrate=config.get_output_bitrate()
)
if success:
logger.info(f"Final video saved to: {final_output}")
logger.info("Pipeline completed successfully")
return 0
except Exception as e:
logger.error(f"Pipeline failed: {e}", exc_info=True)
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)