mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2025-09-27 04:36:12 +08:00
164 lines
2.9 KiB
Go
164 lines
2.9 KiB
Go
package streams
|
|
|
|
import (
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/AlexxIT/go2rtc/pkg/core"
|
|
)
|
|
|
|
func (s *Stream) Play(urlOrProd any) error {
|
|
s.mu.Lock()
|
|
for _, producer := range s.producers {
|
|
if producer.state == stateInternal && producer.conn != nil {
|
|
_ = producer.conn.Stop()
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
var source string
|
|
var src core.Producer
|
|
|
|
switch urlOrProd.(type) {
|
|
case string:
|
|
if source = urlOrProd.(string); source == "" {
|
|
return nil
|
|
}
|
|
case core.Producer:
|
|
src = urlOrProd.(core.Producer)
|
|
}
|
|
|
|
for _, producer := range s.producers {
|
|
if producer.conn == nil {
|
|
continue
|
|
}
|
|
|
|
cons, ok := producer.conn.(core.Consumer)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if src == nil {
|
|
var err error
|
|
if src, err = GetProducer(source); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !matchMedia(src, cons) {
|
|
continue
|
|
}
|
|
|
|
s.AddInternalProducer(src)
|
|
|
|
go func() {
|
|
_ = src.Start()
|
|
s.RemoveProducer(src)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
for _, producer := range s.producers {
|
|
// start new client
|
|
dst, err := GetProducer(producer.url)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// check if client support consumer interface
|
|
cons, ok := dst.(core.Consumer)
|
|
if !ok {
|
|
_ = dst.Stop()
|
|
continue
|
|
}
|
|
|
|
// start new producer
|
|
if src == nil {
|
|
if src, err = GetProducer(source); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !matchMedia(src, cons) {
|
|
_ = dst.Stop()
|
|
continue
|
|
}
|
|
|
|
s.AddInternalProducer(src)
|
|
s.AddInternalConsumer(cons)
|
|
|
|
go func() {
|
|
_ = dst.Start()
|
|
_ = src.Stop()
|
|
s.RemoveInternalConsumer(cons)
|
|
}()
|
|
|
|
go func() {
|
|
_ = src.Start()
|
|
// little timeout before stop dst, so the buffer can be transferred
|
|
time.Sleep(time.Second)
|
|
_ = dst.Stop()
|
|
s.RemoveProducer(src)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
return errors.New("can't find consumer")
|
|
}
|
|
|
|
func (s *Stream) AddInternalProducer(conn core.Producer) {
|
|
producer := &Producer{conn: conn, state: stateInternal, url: "internal"}
|
|
s.mu.Lock()
|
|
s.producers = append(s.producers, producer)
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *Stream) AddInternalConsumer(conn core.Consumer) {
|
|
s.mu.Lock()
|
|
s.consumers = append(s.consumers, conn)
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *Stream) RemoveInternalConsumer(conn core.Consumer) {
|
|
s.mu.Lock()
|
|
for i, consumer := range s.consumers {
|
|
if consumer == conn {
|
|
s.consumers = append(s.consumers[:i], s.consumers[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func matchMedia(prod core.Producer, cons core.Consumer) bool {
|
|
for _, consMedia := range cons.GetMedias() {
|
|
for _, prodMedia := range prod.GetMedias() {
|
|
if prodMedia.Direction != core.DirectionRecvonly {
|
|
continue
|
|
}
|
|
|
|
prodCodec, consCodec := prodMedia.MatchMedia(consMedia)
|
|
if prodCodec == nil {
|
|
continue
|
|
}
|
|
|
|
track, err := prod.GetTrack(prodMedia, prodCodec)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("[streams] can't get track")
|
|
continue
|
|
}
|
|
|
|
if err = cons.AddTrack(consMedia, consCodec, track); err != nil {
|
|
log.Warn().Err(err).Msg("[streams] can't add track")
|
|
continue
|
|
}
|
|
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|