Implement Tuner v1 — Go backend, React frontend, Docker setup
- Go backend: REST API, M3U parser, FFmpeg/MediaMTX process manager - React/Vite frontend: HLS player, admin panel, channel browser (dark theme) - MediaMTX config for RTMP ingest + HLS output - Multi-stage Dockerfile (Go + Bun + Alpine runtime) - docker-compose.yml for single-container deployment - Sample M3U playlist with test streams Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
189
backend/api/admin.go
Normal file
189
backend/api/admin.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"tuner/m3u"
|
||||
"tuner/models"
|
||||
"tuner/process"
|
||||
)
|
||||
|
||||
// App holds shared application state for all handlers.
|
||||
type App struct {
|
||||
mu sync.RWMutex
|
||||
Channels []models.Channel
|
||||
Status models.StreamStatus
|
||||
PlaylistPath string
|
||||
MediaMTX *process.MediaMTXManager
|
||||
FFmpeg *process.FFmpegManager
|
||||
}
|
||||
|
||||
// HandleChannels lists, searches, and filters channels.
|
||||
// GET /api/admin/channels?search=<q>&group=<g>
|
||||
func (a *App) HandleChannels(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
search := r.URL.Query().Get("search")
|
||||
group := r.URL.Query().Get("group")
|
||||
|
||||
a.mu.RLock()
|
||||
channels := m3u.FilterChannels(a.Channels, search, group)
|
||||
a.mu.RUnlock()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(channels)
|
||||
}
|
||||
|
||||
// HandleGroups returns the list of channel groups.
|
||||
// GET /api/admin/groups
|
||||
func (a *App) HandleGroups(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
a.mu.RLock()
|
||||
groups := m3u.GetGroups(a.Channels)
|
||||
a.mu.RUnlock()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(groups)
|
||||
}
|
||||
|
||||
// HandleSetSource sets the active source (obs or iptv).
|
||||
// POST /api/admin/source {"source": "obs"|"iptv"}
|
||||
func (a *App) HandleSetSource(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
Source string `json:"source"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, "invalid json", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if req.Source != "obs" && req.Source != "iptv" {
|
||||
http.Error(w, `source must be "obs" or "iptv"`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
a.mu.Lock()
|
||||
a.Status.Source = req.Source
|
||||
a.mu.Unlock()
|
||||
|
||||
// If switching to OBS, stop FFmpeg relay
|
||||
if req.Source == "obs" {
|
||||
if err := a.FFmpeg.Stop(); err != nil {
|
||||
log.Printf("[admin] error stopping ffmpeg: %v", err)
|
||||
}
|
||||
a.mu.Lock()
|
||||
a.Status.ChannelName = ""
|
||||
a.mu.Unlock()
|
||||
}
|
||||
|
||||
log.Printf("[admin] source set to %s", req.Source)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "ok", "source": req.Source})
|
||||
}
|
||||
|
||||
// HandleSetChannel selects an IPTV channel and starts FFmpeg relay.
|
||||
// POST /api/admin/channel {"channel_id": "..."}
|
||||
func (a *App) HandleSetChannel(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
ChannelID string `json:"channel_id"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, "invalid json", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Find the channel
|
||||
a.mu.RLock()
|
||||
var found *models.Channel
|
||||
for i := range a.Channels {
|
||||
if a.Channels[i].ID == req.ChannelID {
|
||||
found = &a.Channels[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
a.mu.RUnlock()
|
||||
|
||||
if found == nil {
|
||||
http.Error(w, "channel not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// Start FFmpeg with the channel's stream URL
|
||||
if err := a.FFmpeg.Start(found.StreamURL); err != nil {
|
||||
http.Error(w, "failed to start stream: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
a.mu.Lock()
|
||||
a.Status.Source = "iptv"
|
||||
a.Status.ChannelName = found.Name
|
||||
a.Status.Live = true
|
||||
a.mu.Unlock()
|
||||
|
||||
log.Printf("[admin] channel set to %s (%s)", found.Name, found.ID)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "ok", "channel": found.Name})
|
||||
}
|
||||
|
||||
// HandleReloadPlaylist re-parses the M3U playlist from disk.
|
||||
// POST /api/admin/playlist/reload
|
||||
func (a *App) HandleReloadPlaylist(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
channels, err := m3u.ParseFile(a.PlaylistPath)
|
||||
if err != nil {
|
||||
http.Error(w, "failed to reload playlist: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
a.mu.Lock()
|
||||
a.Channels = channels
|
||||
a.mu.Unlock()
|
||||
|
||||
log.Printf("[admin] playlist reloaded: %d channels", len(channels))
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{"status": "ok", "channels": len(channels)})
|
||||
}
|
||||
|
||||
// HandleProcessStatus returns the status of managed processes.
|
||||
// GET /api/admin/process/status
|
||||
func (a *App) HandleProcessStatus(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
status := models.SystemStatus{
|
||||
MediaMTX: a.MediaMTX.Status(),
|
||||
FFmpeg: a.FFmpeg.Status(),
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(status)
|
||||
}
|
||||
32
backend/api/middleware.go
Normal file
32
backend/api/middleware.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CORS wraps a handler with permissive CORS headers (allow all origins for MVP).
|
||||
func CORS(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
|
||||
|
||||
if r.Method == http.MethodOptions {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// Logging wraps a handler with request logging (method, path, duration).
|
||||
func Logging(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
log.Printf("%s %s %s", r.Method, r.URL.Path, time.Since(start).Truncate(time.Microsecond))
|
||||
})
|
||||
}
|
||||
22
backend/api/status.go
Normal file
22
backend/api/status.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// HandleStatus returns the current stream status as JSON.
|
||||
// GET /api/status
|
||||
func (a *App) HandleStatus(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
a.mu.RLock()
|
||||
status := a.Status
|
||||
a.mu.RUnlock()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(status)
|
||||
}
|
||||
3
backend/go.mod
Normal file
3
backend/go.mod
Normal file
@@ -0,0 +1,3 @@
|
||||
module tuner
|
||||
|
||||
go 1.22
|
||||
142
backend/m3u/parser.go
Normal file
142
backend/m3u/parser.go
Normal file
@@ -0,0 +1,142 @@
|
||||
package m3u
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"tuner/models"
|
||||
)
|
||||
|
||||
// ParseFile reads and parses an M3U/M3U8 playlist file into a slice of Channels.
|
||||
func ParseFile(path string) ([]models.Channel, error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open playlist: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var channels []models.Channel
|
||||
scanner := bufio.NewScanner(f)
|
||||
|
||||
var extinf string
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
|
||||
if strings.HasPrefix(line, "#EXTINF:") {
|
||||
extinf = line
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip comments and blank lines
|
||||
if line == "" || strings.HasPrefix(line, "#") {
|
||||
continue
|
||||
}
|
||||
|
||||
// If we have a pending EXTINF, this line is the URL
|
||||
if extinf != "" {
|
||||
ch := parseExtinf(extinf, line)
|
||||
channels = append(channels, ch)
|
||||
extinf = ""
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, fmt.Errorf("read playlist: %w", err)
|
||||
}
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
// parseExtinf extracts channel metadata from an #EXTINF line and pairs it with a stream URL.
|
||||
func parseExtinf(extinf string, streamURL string) models.Channel {
|
||||
ch := models.Channel{
|
||||
StreamURL: streamURL,
|
||||
}
|
||||
|
||||
// Extract attributes from the EXTINF line
|
||||
ch.Name = extractDisplayName(extinf)
|
||||
ch.Group = extractAttribute(extinf, "group-title")
|
||||
ch.LogoURL = extractAttribute(extinf, "tvg-logo")
|
||||
|
||||
tvgName := extractAttribute(extinf, "tvg-name")
|
||||
if tvgName != "" && ch.Name == "" {
|
||||
ch.Name = tvgName
|
||||
}
|
||||
|
||||
// Generate stable ID from name + URL
|
||||
ch.ID = generateID(ch.Name, ch.StreamURL)
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// extractAttribute extracts a named attribute value from an EXTINF line.
|
||||
// Example: `#EXTINF:-1 tvg-name="CNN" group-title="News",CNN HD`
|
||||
func extractAttribute(line string, attr string) string {
|
||||
key := attr + `="`
|
||||
idx := strings.Index(line, key)
|
||||
if idx == -1 {
|
||||
return ""
|
||||
}
|
||||
start := idx + len(key)
|
||||
end := strings.Index(line[start:], `"`)
|
||||
if end == -1 {
|
||||
return ""
|
||||
}
|
||||
return line[start : start+end]
|
||||
}
|
||||
|
||||
// extractDisplayName extracts the display name (the part after the last comma in the EXTINF line).
|
||||
func extractDisplayName(line string) string {
|
||||
// The display name is after the last comma
|
||||
idx := strings.LastIndex(line, ",")
|
||||
if idx == -1 {
|
||||
return ""
|
||||
}
|
||||
return strings.TrimSpace(line[idx+1:])
|
||||
}
|
||||
|
||||
// generateID creates a stable ID for a channel based on its name and stream URL.
|
||||
func generateID(name, url string) string {
|
||||
h := sha256.Sum256([]byte(name + "|" + url))
|
||||
return fmt.Sprintf("%x", h[:8])
|
||||
}
|
||||
|
||||
// FilterChannels returns channels matching the given search term and/or group.
|
||||
func FilterChannels(channels []models.Channel, search string, group string) []models.Channel {
|
||||
if search == "" && group == "" {
|
||||
return channels
|
||||
}
|
||||
|
||||
searchLower := strings.ToLower(search)
|
||||
var result []models.Channel
|
||||
|
||||
for _, ch := range channels {
|
||||
if group != "" && !strings.EqualFold(ch.Group, group) {
|
||||
continue
|
||||
}
|
||||
if search != "" && !strings.Contains(strings.ToLower(ch.Name), searchLower) {
|
||||
continue
|
||||
}
|
||||
result = append(result, ch)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// GetGroups returns a deduplicated, sorted list of group names from the channel list.
|
||||
func GetGroups(channels []models.Channel) []string {
|
||||
seen := make(map[string]bool)
|
||||
var groups []string
|
||||
|
||||
for _, ch := range channels {
|
||||
if ch.Group != "" && !seen[ch.Group] {
|
||||
seen[ch.Group] = true
|
||||
groups = append(groups, ch.Group)
|
||||
}
|
||||
}
|
||||
|
||||
return groups
|
||||
}
|
||||
124
backend/main.go
Normal file
124
backend/main.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"tuner/api"
|
||||
"tuner/m3u"
|
||||
"tuner/process"
|
||||
)
|
||||
|
||||
func getEnv(key, fallback string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
func main() {
|
||||
playlistPath := getEnv("PLAYLIST_PATH", "/data/playlist.m3u")
|
||||
mediamtxBin := getEnv("MEDIAMTX_PATH", "mediamtx")
|
||||
mediamtxCfg := getEnv("MEDIAMTX_CONFIG", "mediamtx.yml")
|
||||
port := getEnv("RESTREAMER_PORT", "8080")
|
||||
|
||||
// Initialize process managers
|
||||
mtxManager := process.NewMediaMTXManager(mediamtxBin, mediamtxCfg)
|
||||
ffmpegManager := process.NewFFmpegManager()
|
||||
|
||||
// Build shared app state
|
||||
app := &api.App{
|
||||
PlaylistPath: playlistPath,
|
||||
MediaMTX: mtxManager,
|
||||
FFmpeg: ffmpegManager,
|
||||
}
|
||||
|
||||
// Load playlist (warn but don't crash if missing)
|
||||
channels, err := m3u.ParseFile(playlistPath)
|
||||
if err != nil {
|
||||
log.Printf("[startup] warning: could not load playlist %s: %v", playlistPath, err)
|
||||
} else {
|
||||
app.Channels = channels
|
||||
log.Printf("[startup] loaded %d channels from %s", len(channels), playlistPath)
|
||||
}
|
||||
|
||||
// Start MediaMTX (warn but don't crash if binary not found)
|
||||
if err := mtxManager.Start(); err != nil {
|
||||
log.Printf("[startup] warning: could not start mediamtx: %v", err)
|
||||
}
|
||||
|
||||
// Register routes
|
||||
mux := http.NewServeMux()
|
||||
|
||||
// API routes
|
||||
mux.HandleFunc("/api/status", app.HandleStatus)
|
||||
mux.HandleFunc("/api/admin/channels", app.HandleChannels)
|
||||
mux.HandleFunc("/api/admin/groups", app.HandleGroups)
|
||||
mux.HandleFunc("/api/admin/source", app.HandleSetSource)
|
||||
mux.HandleFunc("/api/admin/channel", app.HandleSetChannel)
|
||||
mux.HandleFunc("/api/admin/playlist/reload", app.HandleReloadPlaylist)
|
||||
mux.HandleFunc("/api/admin/process/status", app.HandleProcessStatus)
|
||||
|
||||
// Serve frontend static files (falls back to index.html for SPA routing)
|
||||
frontendDir := getEnv("FRONTEND_DIR", "frontend/dist")
|
||||
if info, err := os.Stat(frontendDir); err == nil && info.IsDir() {
|
||||
fs := http.FileServer(http.Dir(frontendDir))
|
||||
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Try to serve the file; if it doesn't exist, serve index.html (SPA fallback)
|
||||
path := frontendDir + r.URL.Path
|
||||
if _, err := os.Stat(path); os.IsNotExist(err) && r.URL.Path != "/" {
|
||||
http.ServeFile(w, r, frontendDir+"/index.html")
|
||||
return
|
||||
}
|
||||
fs.ServeHTTP(w, r)
|
||||
}))
|
||||
log.Printf("[startup] serving frontend from %s", frontendDir)
|
||||
} else {
|
||||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
w.Write([]byte("Tuner backend running. Frontend not found."))
|
||||
})
|
||||
log.Printf("[startup] frontend dir %s not found, serving placeholder", frontendDir)
|
||||
}
|
||||
|
||||
// Wrap with middleware
|
||||
handler := api.Logging(api.CORS(mux))
|
||||
|
||||
server := &http.Server{
|
||||
Addr: ":" + port,
|
||||
Handler: handler,
|
||||
}
|
||||
|
||||
// Graceful shutdown
|
||||
go func() {
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
sig := <-sigCh
|
||||
log.Printf("[shutdown] received %v, shutting down...", sig)
|
||||
|
||||
// Stop child processes
|
||||
if err := ffmpegManager.Stop(); err != nil {
|
||||
log.Printf("[shutdown] error stopping ffmpeg: %v", err)
|
||||
}
|
||||
if err := mtxManager.Stop(); err != nil {
|
||||
log.Printf("[shutdown] error stopping mediamtx: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := server.Shutdown(ctx); err != nil {
|
||||
log.Printf("[shutdown] http server shutdown error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf("[startup] listening on :%s", port)
|
||||
if err := server.ListenAndServe(); err != http.ErrServerClosed {
|
||||
log.Fatalf("[fatal] server error: %v", err)
|
||||
}
|
||||
log.Println("[shutdown] complete")
|
||||
}
|
||||
31
backend/models/types.go
Normal file
31
backend/models/types.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package models
|
||||
|
||||
// Channel represents a single IPTV channel parsed from an M3U playlist.
|
||||
type Channel struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Group string `json:"group"`
|
||||
LogoURL string `json:"logo_url"`
|
||||
StreamURL string `json:"stream_url"`
|
||||
}
|
||||
|
||||
// StreamStatus represents the current streaming state.
|
||||
type StreamStatus struct {
|
||||
Source string `json:"source"` // "obs" or "iptv"
|
||||
ChannelName string `json:"channel_name"` // current IPTV channel name (empty if OBS)
|
||||
Live bool `json:"live"`
|
||||
}
|
||||
|
||||
// ProcessInfo represents the status of a managed child process.
|
||||
type ProcessInfo struct {
|
||||
Running bool `json:"running"`
|
||||
PID int `json:"pid"`
|
||||
Uptime string `json:"uptime"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// SystemStatus holds status for all managed processes.
|
||||
type SystemStatus struct {
|
||||
MediaMTX ProcessInfo `json:"mediamtx"`
|
||||
FFmpeg ProcessInfo `json:"ffmpeg"`
|
||||
}
|
||||
115
backend/process/ffmpeg.go
Normal file
115
backend/process/ffmpeg.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package process
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tuner/models"
|
||||
)
|
||||
|
||||
// FFmpegManager manages an FFmpeg child process for IPTV relay.
|
||||
type FFmpegManager struct {
|
||||
mu sync.Mutex
|
||||
cmd *exec.Cmd
|
||||
startTime time.Time
|
||||
lastError string
|
||||
streamURL string
|
||||
}
|
||||
|
||||
// NewFFmpegManager creates a new FFmpeg process manager.
|
||||
func NewFFmpegManager() *FFmpegManager {
|
||||
return &FFmpegManager{}
|
||||
}
|
||||
|
||||
// Start spawns an FFmpeg process to pull from streamURL and push RTMP to MediaMTX.
|
||||
// It kills any existing FFmpeg process first.
|
||||
func (f *FFmpegManager) Start(streamURL string) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
// Kill existing process if running
|
||||
f.stopLocked()
|
||||
|
||||
f.cmd = exec.Command("ffmpeg",
|
||||
"-re",
|
||||
"-i", streamURL,
|
||||
"-c", "copy",
|
||||
"-f", "flv",
|
||||
"rtmp://localhost:1935/live/stream",
|
||||
)
|
||||
f.streamURL = streamURL
|
||||
f.lastError = ""
|
||||
|
||||
if err := f.cmd.Start(); err != nil {
|
||||
f.lastError = err.Error()
|
||||
return fmt.Errorf("start ffmpeg: %w", err)
|
||||
}
|
||||
|
||||
f.startTime = time.Now()
|
||||
log.Printf("[ffmpeg] started with pid %d, source: %s", f.cmd.Process.Pid, streamURL)
|
||||
|
||||
// Monitor in background
|
||||
go f.monitor()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FFmpegManager) monitor() {
|
||||
err := f.cmd.Wait()
|
||||
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
f.lastError = err.Error()
|
||||
log.Printf("[ffmpeg] exited: %v", err)
|
||||
} else {
|
||||
log.Println("[ffmpeg] exited (status 0)")
|
||||
}
|
||||
|
||||
f.cmd = nil
|
||||
}
|
||||
|
||||
// Stop kills the FFmpeg process.
|
||||
func (f *FFmpegManager) Stop() error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
return f.stopLocked()
|
||||
}
|
||||
|
||||
func (f *FFmpegManager) stopLocked() error {
|
||||
if f.cmd == nil || f.cmd.Process == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := f.cmd.Process.Kill(); err != nil {
|
||||
return fmt.Errorf("kill ffmpeg: %w", err)
|
||||
}
|
||||
|
||||
// Wait for the process to fully exit to avoid zombies
|
||||
_ = f.cmd.Wait()
|
||||
f.cmd = nil
|
||||
log.Println("[ffmpeg] killed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Status returns the current FFmpeg process status.
|
||||
func (f *FFmpegManager) Status() models.ProcessInfo {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
info := models.ProcessInfo{
|
||||
Error: f.lastError,
|
||||
}
|
||||
|
||||
if f.cmd != nil && f.cmd.Process != nil {
|
||||
info.Running = true
|
||||
info.PID = f.cmd.Process.Pid
|
||||
info.Uptime = time.Since(f.startTime).Truncate(time.Second).String()
|
||||
}
|
||||
|
||||
return info
|
||||
}
|
||||
160
backend/process/mediamtx.go
Normal file
160
backend/process/mediamtx.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package process
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tuner/models"
|
||||
)
|
||||
|
||||
// MediaMTXManager manages the MediaMTX child process.
|
||||
type MediaMTXManager struct {
|
||||
mu sync.Mutex
|
||||
cmd *exec.Cmd
|
||||
binaryPath string
|
||||
configPath string
|
||||
startTime time.Time
|
||||
lastError string
|
||||
stopCh chan struct{}
|
||||
stopped bool
|
||||
}
|
||||
|
||||
// NewMediaMTXManager creates a new manager for the MediaMTX process.
|
||||
func NewMediaMTXManager(binaryPath, configPath string) *MediaMTXManager {
|
||||
return &MediaMTXManager{
|
||||
binaryPath: binaryPath,
|
||||
configPath: configPath,
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the MediaMTX process. It auto-restarts on unexpected exit.
|
||||
func (m *MediaMTXManager) Start() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if m.cmd != nil && m.cmd.Process != nil {
|
||||
return fmt.Errorf("mediamtx already running (pid %d)", m.cmd.Process.Pid)
|
||||
}
|
||||
|
||||
return m.startLocked()
|
||||
}
|
||||
|
||||
func (m *MediaMTXManager) startLocked() error {
|
||||
_, err := exec.LookPath(m.binaryPath)
|
||||
if err != nil {
|
||||
m.lastError = fmt.Sprintf("binary not found: %s", m.binaryPath)
|
||||
return fmt.Errorf("mediamtx binary not found: %s", m.binaryPath)
|
||||
}
|
||||
|
||||
m.cmd = exec.Command(m.binaryPath, m.configPath)
|
||||
m.lastError = ""
|
||||
m.stopped = false
|
||||
m.stopCh = make(chan struct{})
|
||||
|
||||
if err := m.cmd.Start(); err != nil {
|
||||
m.lastError = err.Error()
|
||||
return fmt.Errorf("start mediamtx: %w", err)
|
||||
}
|
||||
|
||||
m.startTime = time.Now()
|
||||
log.Printf("[mediamtx] started with pid %d", m.cmd.Process.Pid)
|
||||
|
||||
// Monitor in background and auto-restart on unexpected exit
|
||||
go m.monitor()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MediaMTXManager) monitor() {
|
||||
err := m.cmd.Wait()
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
// Check if this was a deliberate stop
|
||||
select {
|
||||
case <-m.stopCh:
|
||||
log.Println("[mediamtx] stopped")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
m.lastError = err.Error()
|
||||
log.Printf("[mediamtx] exited unexpectedly: %v — restarting in 2s", err)
|
||||
} else {
|
||||
log.Println("[mediamtx] exited unexpectedly (status 0) — restarting in 2s")
|
||||
}
|
||||
|
||||
m.cmd = nil
|
||||
|
||||
// Auto-restart after a brief delay (without lock held)
|
||||
go func() {
|
||||
time.Sleep(2 * time.Second)
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if m.stopped {
|
||||
return
|
||||
}
|
||||
if m.cmd != nil {
|
||||
return // already restarted
|
||||
}
|
||||
if err := m.startLocked(); err != nil {
|
||||
log.Printf("[mediamtx] auto-restart failed: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop kills the MediaMTX process.
|
||||
func (m *MediaMTXManager) Stop() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.stopped = true
|
||||
|
||||
if m.cmd == nil || m.cmd.Process == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if m.stopCh != nil {
|
||||
close(m.stopCh)
|
||||
}
|
||||
|
||||
if err := m.cmd.Process.Kill(); err != nil {
|
||||
return fmt.Errorf("kill mediamtx: %w", err)
|
||||
}
|
||||
|
||||
m.cmd = nil
|
||||
log.Println("[mediamtx] killed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Restart stops and restarts the MediaMTX process.
|
||||
func (m *MediaMTXManager) Restart() error {
|
||||
if err := m.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
return m.Start()
|
||||
}
|
||||
|
||||
// Status returns the current process status.
|
||||
func (m *MediaMTXManager) Status() models.ProcessInfo {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
info := models.ProcessInfo{
|
||||
Error: m.lastError,
|
||||
}
|
||||
|
||||
if m.cmd != nil && m.cmd.Process != nil {
|
||||
info.Running = true
|
||||
info.PID = m.cmd.Process.Pid
|
||||
info.Uptime = time.Since(m.startTime).Truncate(time.Second).String()
|
||||
}
|
||||
|
||||
return info
|
||||
}
|
||||
Reference in New Issue
Block a user