Files
go2rtc/cmd/streams/producer.go
2022-11-04 20:54:35 +03:00

177 lines
2.9 KiB
Go

package streams
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"strings"
"sync"
"time"
)
type state byte
const (
stateNone state = iota
stateMedias
stateTracks
stateStart
)
type Producer struct {
streamer.Element
url string
template string
element streamer.Producer
tracks []*streamer.Track
state state
mu sync.Mutex
restart *time.Timer
}
func (p *Producer) SetSource(s string) {
if p.template == "" {
p.template = p.url
}
p.url = strings.Replace(p.template, "{input}", s, 1)
}
func (p *Producer) GetMedias() []*streamer.Media {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
log.Debug().Msgf("[streams] probe producer url=%s", p.url)
var err error
p.element, err = GetProducer(p.url)
if err != nil || p.element == nil {
log.Error().Err(err).Caller().Send()
return nil
}
p.state = stateMedias
}
return p.element.GetMedias()
}
func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
return nil
}
track := p.element.GetTrack(media, codec)
if track == nil {
return nil
}
for _, t := range p.tracks {
if track == t {
return track
}
}
if p.state == stateMedias {
p.state = stateTracks
}
p.tracks = append(p.tracks, track)
return track
}
// internals
func (p *Producer) start() {
p.mu.Lock()
defer p.mu.Unlock()
if p.state != stateTracks {
return
}
log.Debug().Msgf("[streams] start producer url=%s", p.url)
p.state = stateStart
go func() {
// safe read element while mu locked
if err := p.element.Start(); err != nil {
log.Warn().Err(err).Caller().Send()
}
p.reconnect()
}()
}
func (p *Producer) reconnect() {
p.mu.Lock()
defer p.mu.Unlock()
if p.state != stateStart {
log.Debug().Msgf("[streams] closed ...")
return
}
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
var err error
p.element, err = GetProducer(p.url)
if err != nil || p.element == nil {
log.Debug().Err(err).Caller().Send()
// TODO: dynamic timeout
p.restart = time.AfterFunc(30*time.Second, p.reconnect)
return
}
medias := p.element.GetMedias()
// convert all old producer tracks to new tracks
for i, oldTrack := range p.tracks {
// match new element medias with old track codec
for _, media := range medias {
codec := media.MatchCodec(oldTrack.Codec)
if codec == nil {
continue
}
// move sink from old track to new track
newTrack := p.element.GetTrack(media, codec)
newTrack.GetSink(oldTrack)
p.tracks[i] = newTrack
break
}
}
go func() {
if err = p.element.Start(); err != nil {
log.Debug().Err(err).Caller().Send()
}
p.reconnect()
}()
}
func (p *Producer) stop() {
p.mu.Lock()
log.Debug().Msgf("[streams] stop producer url=%s", p.url)
if p.element != nil {
_ = p.element.Stop()
p.element = nil
}
if p.restart != nil {
p.restart.Stop()
p.restart = nil
}
p.state = stateNone
p.tracks = nil
p.mu.Unlock()
}