From 5e41a2f6bb0ec3dc8809e183bede8c782b19ffd7 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Tue, 20 Feb 2024 18:08:21 +0800 Subject: [PATCH] feat: sei track use channel to replace data track --- stream.go | 15 +++++---------- subscribers.go | 5 +++++ track/channel.go | 24 ++++++++++++++++++++++++ track/reader-av.go | 2 +- track/video.go | 19 ++++++++++--------- 5 files changed, 45 insertions(+), 20 deletions(-) create mode 100644 track/channel.go diff --git a/stream.go b/stream.go index 7f85f6b..9b3ed58 100644 --- a/stream.go +++ b/stream.go @@ -128,7 +128,7 @@ type Tracks struct { Data []common.Track MainVideo *track.Video MainAudio *track.Audio - SEI *track.Data[[]byte] + SEI *track.Channel[[]byte] marshalLock sync.Mutex } @@ -147,8 +147,7 @@ func (tracks *Tracks) Add(name string, t Track) bool { tracks.SetIDR(v) } if tracks.SEI != nil { - v.SEIReader = &track.DataReader[[]byte]{} - v.SEIReader.Ring = tracks.SEI.Ring + v.SEIReader = tracks.SEI.CreateReader(100) } case *track.Audio: if tracks.MainAudio == nil { @@ -194,7 +193,7 @@ func (tracks *Tracks) AddSEI(t byte, data []byte) bool { buffer.WriteByte(byte(l)) buffer.Write(data) buffer.WriteByte(0x80) - tracks.SEI.Push(buffer) + tracks.SEI.Write(buffer) return true } return false @@ -577,12 +576,8 @@ func (s *Stream) run() { } if conf.InsertSEI { if s.Tracks.SEI == nil { - s.Tracks.SEI = track.NewDataTrack[[]byte]("sei") - s.Tracks.SEI.Locker = &sync.Mutex{} - s.Tracks.SEI.SetStuff(s) - if s.Tracks.Add("sei", s.Tracks.SEI) { - s.Info("sei track added") - } + s.Tracks.SEI = &track.Channel[[]byte]{} + s.Info("sei track added") } } v.Resolve() diff --git a/subscribers.go b/subscribers.go index 4c80d4d..1624185 100644 --- a/subscribers.go +++ b/subscribers.go @@ -125,9 +125,14 @@ func (s *Subscribers) Find(id string) ISubscriber { func (s *Subscribers) Delete(suber ISubscriber) { io := suber.GetSubscriber() + io.TrackPlayer.Audio = nil + io.TrackPlayer.AudioReader = nil + io.TrackPlayer.Video = nil + io.TrackPlayer.VideoReader = nil for _, reader := range io.readers { reader.Track.Debug("reader -1", zap.Int32("count", reader.Track.ReaderCount.Add(-1))) } + io.readers = nil if _, ok := s.public[suber]; ok { delete(s.public, suber) io.Info("suber -1", zap.Int("remains", s.Len())) diff --git a/track/channel.go b/track/channel.go new file mode 100644 index 0000000..86a7bef --- /dev/null +++ b/track/channel.go @@ -0,0 +1,24 @@ +package track + +type Channel[T any] struct { + listeners []chan T +} + +func (r *Channel[T]) CreateReader(l int) chan T { + c := make(chan T, l) + r.listeners = append(r.listeners, c) + return c +} + +func (r *Channel[T]) AddListener(c chan T) { + r.listeners = append(r.listeners, c) +} + +func (r *Channel[T]) Write(data T) { + for _, listener := range r.listeners { + if len(listener) == cap(listener) { + <-listener + } + listener <- data + } +} \ No newline at end of file diff --git a/track/reader-av.go b/track/reader-av.go index 59a28ea..4e835df 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -129,7 +129,7 @@ func (r *AVRingReader) ReadFrame(mode int) (err error) { // r.Delay = uint32((r.Track.LastValue.Timestamp - r.Value.Timestamp).Milliseconds()) r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence) // fmt.Println(r.Track.Name, r.Delay) - // fmt.Println(r.Track.Name, r.State, r.Value.Timestamp, r.SkipTs, r.AbsTime) + // fmt.Println(r.Track.Name, r.Value.Sequence, r.Delay, r.AbsTime) return } func (r *AVRingReader) GetPTS32() uint32 { diff --git a/track/video.go b/track/video.go index 7579633..0486ae9 100644 --- a/track/video.go +++ b/track/video.go @@ -22,9 +22,9 @@ type Video struct { lostFlag bool // 是否丢帧 codec.SPSInfo ParamaterSets `json:"-" yaml:"-"` - SPS []byte `json:"-" yaml:"-"` - PPS []byte `json:"-" yaml:"-"` - SEIReader *DataReader[[]byte] `json:"-" yaml:"-"` + SPS []byte `json:"-" yaml:"-"` + PPS []byte `json:"-" yaml:"-"` + SEIReader chan []byte `json:"-" yaml:"-"` } func (v *Video) Attach() { @@ -249,15 +249,16 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) { func (vt *Video) Flush() { rv := vt.Value - if vt.SEIReader != nil { - if seiFrame, err := vt.SEIReader.TryRead(); seiFrame != nil { + if vt.SEIReader != nil && len(vt.SEIReader) > 0 { + for seiFrame := range vt.SEIReader { var au util.BLL au.Push(vt.SpesificTrack.GetNALU_SEI()) - au.Push(vt.BytesPool.GetShell(seiFrame.Data)) - vt.Info("sei", zap.Int("len", len(seiFrame.Data))) + au.Push(vt.BytesPool.GetShell(seiFrame)) + vt.Info("sei", zap.Int("len", len(seiFrame))) vt.Value.AUList.UnshiftValue(&au) - } else if err != nil { - vt.SEIReader = nil + if len(vt.SEIReader) == 0 { + break + } } } if rv.IFrame {