diff --git a/internal/streams/add_consumer.go b/internal/streams/add_consumer.go new file mode 100644 index 00000000..d97a4266 --- /dev/null +++ b/internal/streams/add_consumer.go @@ -0,0 +1,149 @@ +package streams + +import ( + "errors" + "strings" + "sync/atomic" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func (s *Stream) AddConsumer(cons core.Consumer) (err error) { + // support for multiple simultaneous requests from different consumers + consN := atomic.AddInt32(&s.requests, 1) - 1 + + var prodErrors []error + var prodMedias []*core.Media + var prods []*Producer // matched producers for consumer + + // Step 1. Get consumer medias + consMedias := cons.GetMedias() + for _, consMedia := range consMedias { + log.Trace().Msgf("[streams] check cons=%d media=%s", consN, consMedia) + + producers: + for prodN, prod := range s.producers { + if err = prod.Dial(); err != nil { + log.Trace().Err(err).Msgf("[streams] skip prod=%s", prod.url) + prodErrors = append(prodErrors, err) + continue + } + + // Step 2. Get producer medias (not tracks yet) + for _, prodMedia := range prod.GetMedias() { + log.Trace().Msgf("[streams] check prod=%d media=%s", prodN, prodMedia) + prodMedias = append(prodMedias, prodMedia) + + // Step 3. Match consumer/producer codecs list + prodCodec, consCodec := prodMedia.MatchMedia(consMedia) + if prodCodec == nil { + continue + } + + var track *core.Receiver + + switch prodMedia.Direction { + case core.DirectionRecvonly: + log.Trace().Msgf("[streams] match prod=%d => cons=%d", prodN, consN) + + // Step 4. Get recvonly track from producer + if track, err = prod.GetTrack(prodMedia, prodCodec); err != nil { + log.Info().Err(err).Msg("[streams] can't get track") + continue + } + // Step 5. Add track to consumer + if err = cons.AddTrack(consMedia, consCodec, track); err != nil { + log.Info().Err(err).Msg("[streams] can't add track") + continue + } + + case core.DirectionSendonly: + log.Trace().Msgf("[streams] match cons=%d => prod=%d", consN, prodN) + + // Step 4. Get recvonly track from consumer (backchannel) + if track, err = cons.(core.Producer).GetTrack(consMedia, consCodec); err != nil { + log.Info().Err(err).Msg("[streams] can't get track") + continue + } + // Step 5. Add track to producer + if err = prod.AddTrack(prodMedia, prodCodec, track); err != nil { + log.Info().Err(err).Msg("[streams] can't add track") + continue + } + } + + prods = append(prods, prod) + + if !consMedia.MatchAll() { + break producers + } + } + } + } + + // stop producers if they don't have readers + if atomic.AddInt32(&s.requests, -1) == 0 { + s.stopProducers() + } + + if len(prods) == 0 { + return formatError(consMedias, prodMedias, prodErrors) + } + + s.mu.Lock() + s.consumers = append(s.consumers, cons) + s.mu.Unlock() + + // there may be duplicates, but that's not a problem + for _, prod := range prods { + prod.start() + } + + return nil +} + +func formatError(consMedias, prodMedias []*core.Media, prodErrors []error) error { + if prodMedias != nil { + var prod, cons string + + for _, media := range prodMedias { + if media.Direction == core.DirectionRecvonly { + for _, codec := range media.Codecs { + prod = appendString(prod, codec.PrintName()) + } + } + } + + for _, media := range consMedias { + if media.Direction == core.DirectionSendonly { + for _, codec := range media.Codecs { + cons = appendString(cons, codec.PrintName()) + } + } + } + + return errors.New("streams: codecs not matched: " + prod + " => " + cons) + } + + if prodErrors != nil { + var text string + + for _, err := range prodErrors { + text = appendString(text, err.Error()) + } + + return errors.New("streams: " + text) + } + + return errors.New("streams: unknown error") +} + +func appendString(s, elem string) string { + if strings.Contains(s, elem) { + return s + } + if len(s) == 0 { + return elem + } + return s + ", " + elem +} diff --git a/internal/streams/stream.go b/internal/streams/stream.go index c918aaba..75b855b8 100644 --- a/internal/streams/stream.go +++ b/internal/streams/stream.go @@ -2,10 +2,7 @@ package streams import ( "encoding/json" - "errors" - "strings" "sync" - "sync/atomic" "github.com/AlexxIT/go2rtc/pkg/core" ) @@ -51,99 +48,6 @@ func (s *Stream) SetSource(source string) { } } -func (s *Stream) AddConsumer(cons core.Consumer) (err error) { - // support for multiple simultaneous requests from different consumers - consN := atomic.AddInt32(&s.requests, 1) - 1 - - var statErrors []error - var statMedias []*core.Media - var statProds []*Producer // matched producers for consumer - - // Step 1. Get consumer medias - for _, consMedia := range cons.GetMedias() { - log.Trace().Msgf("[streams] check cons=%d media=%s", consN, consMedia) - - producers: - for prodN, prod := range s.producers { - if err = prod.Dial(); err != nil { - log.Trace().Err(err).Msgf("[streams] skip prod=%s", prod.url) - statErrors = append(statErrors, err) - continue - } - - // Step 2. Get producer medias (not tracks yet) - for _, prodMedia := range prod.GetMedias() { - log.Trace().Msgf("[streams] check prod=%d media=%s", prodN, prodMedia) - statMedias = append(statMedias, prodMedia) - - // Step 3. Match consumer/producer codecs list - prodCodec, consCodec := prodMedia.MatchMedia(consMedia) - if prodCodec == nil { - continue - } - - var track *core.Receiver - - switch prodMedia.Direction { - case core.DirectionRecvonly: - log.Trace().Msgf("[streams] match prod=%d => cons=%d", prodN, consN) - - // Step 4. Get recvonly track from producer - if track, err = prod.GetTrack(prodMedia, prodCodec); err != nil { - log.Info().Err(err).Msg("[streams] can't get track") - continue - } - // Step 5. Add track to consumer - if err = cons.AddTrack(consMedia, consCodec, track); err != nil { - log.Info().Err(err).Msg("[streams] can't add track") - continue - } - - case core.DirectionSendonly: - log.Trace().Msgf("[streams] match cons=%d => prod=%d", consN, prodN) - - // Step 4. Get recvonly track from consumer (backchannel) - if track, err = cons.(core.Producer).GetTrack(consMedia, consCodec); err != nil { - log.Info().Err(err).Msg("[streams] can't get track") - continue - } - // Step 5. Add track to producer - if err = prod.AddTrack(prodMedia, prodCodec, track); err != nil { - log.Info().Err(err).Msg("[streams] can't add track") - continue - } - } - - statProds = append(statProds, prod) - - if !consMedia.MatchAll() { - break producers - } - } - } - } - - // stop producers if they don't have readers - if atomic.AddInt32(&s.requests, -1) == 0 { - s.stopProducers() - } - - if len(statProds) == 0 { - return formatError(statMedias, statErrors) - } - - s.mu.Lock() - s.consumers = append(s.consumers, cons) - s.mu.Unlock() - - // there may be duplicates, but that's not a problem - for _, prod := range statProds { - prod.start() - } - - return nil -} - func (s *Stream) RemoveConsumer(cons core.Consumer) { _ = cons.Stop() @@ -213,52 +117,3 @@ func (s *Stream) MarshalJSON() ([]byte, error) { return json.Marshal(info) } - -func formatError(statMedias []*core.Media, statErrors []error) error { - var text string - - if statMedias == nil { - return errors.New("can't get medias for stream") - } - - for _, media := range statMedias { - if media.Direction == core.DirectionRecvonly { - continue - } - - for _, codec := range media.Codecs { - name := codec.Name - if name == core.CodecAAC { - name = "AAC" - } - if strings.Contains(text, name) { - continue - } - if len(text) > 0 { - text += "," - } - text += name - } - } - - if text != "" { - return errors.New(text) - } - - for _, err := range statErrors { - s := err.Error() - if strings.Contains(text, s) { - continue - } - if len(text) > 0 { - text += "," - } - text += s - } - - if text != "" { - return errors.New(text) - } - - return errors.New("unknown error") -} diff --git a/pkg/core/codec.go b/pkg/core/codec.go index 90af2c65..b48b629d 100644 --- a/pkg/core/codec.go +++ b/pkg/core/codec.go @@ -64,6 +64,20 @@ func (c *Codec) Kind() string { return GetKind(c.Name) } +func (c *Codec) PrintName() string { + switch c.Name { + case CodecAAC: + return "AAC" + case CodecELD: + return "AAC-ELD" + case CodecPCM: + return "PCM-S16BE" + case CodecPCML: + return "PCM-S16LE" + } + return c.Name +} + func (c *Codec) Clone() *Codec { clone := *c return &clone