From 400e8d17e1ef3afcf80256d7a7a3e94d4c2d207b Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Tue, 18 Mar 2025 19:42:11 +0800 Subject: [PATCH] fix: wrap index error --- subscriber.go | 127 +++++++++++++++++++++++++------------------------- 1 file changed, 63 insertions(+), 64 deletions(-) diff --git a/subscriber.go b/subscriber.go index b836751..0186f0e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -177,49 +177,51 @@ func (s *Subscriber) CheckWebSocket(w http.ResponseWriter, r *http.Request) (con func (s *Subscriber) createAudioReader(dataType reflect.Type, startAudioTs time.Duration) (awi int) { if s.waitingPublish() || dataType == nil { - return + return -1 } var at *AVTrack if dataType == AVFrameType { at = s.Publisher.AudioTrack.AVTrack - awi = -1 + awi = 0 } else { at = s.Publisher.GetAudioTrack(dataType) if at != nil { - awi = at.WrapIndex + awi = at.WrapIndex + 1 } } - if at != nil { - if err := at.WaitReady(); err != nil { - return - } - s.AudioReader = NewAVRingReader(at, dataType.String()) - s.AudioReader.StartTs = startAudioTs + if at == nil { + return -1 } + if err := at.WaitReady(); err != nil { + return -1 + } + s.AudioReader = NewAVRingReader(at, dataType.String()) + s.AudioReader.StartTs = startAudioTs return } func (s *Subscriber) createVideoReader(dataType reflect.Type, startVideoTs time.Duration) (vwi int) { if s.waitingPublish() || dataType == nil { - return + return -1 } var vt *AVTrack if dataType == AVFrameType { vt = s.Publisher.VideoTrack.AVTrack - vwi = -1 + vwi = 0 } else { vt = s.Publisher.GetVideoTrack(dataType) if vt != nil { - vwi = vt.WrapIndex + vwi = vt.WrapIndex + 1 } } - if vt != nil { - if err := vt.WaitReady(); err != nil { - return - } - s.VideoReader = NewAVRingReader(vt, dataType.String()) - s.VideoReader.StartTs = startVideoTs + if vt == nil { + return -1 } + if err := vt.WaitReady(); err != nil { + return -1 + } + s.VideoReader = NewAVRingReader(vt, dataType.String()) + s.VideoReader.StartTs = startVideoTs return } @@ -285,29 +287,27 @@ func (handler *SubscribeHandler[A, V]) checkPublishChanged() { } func (handler *SubscribeHandler[A, V]) sendAudioFrame() (err error) { - if handler.awi >= 0 { - if len(handler.audioFrame.Wraps) > handler.awi { - frame := handler.audioFrame.Wraps[handler.awi] - frameSize := frame.GetSize() - if handler.s.Enabled(handler.s, task.TraceLevel) { - handler.s.Trace("send audio frame", "seq", handler.audioFrame.Sequence, "data", frame.String(), "size", frameSize) - } - err = handler.OnAudio(frame.(A)) - // Calculate BPS - if handler.s.AudioReader != nil { - handler.bytesRead += uint32(frameSize) - now := time.Now() - if elapsed := now.Sub(handler.lastBPSTime); elapsed >= time.Second { - handler.s.AudioReader.BPS = uint32(float64(handler.bytesRead) / elapsed.Seconds()) - handler.bytesRead = 0 - handler.lastBPSTime = now - } - } - } else if handler.s.AudioReader != nil { - handler.s.AudioReader.StopRead() - } - } else { + if handler.awi == 0 { err = handler.OnAudio(any(handler.audioFrame).(A)) + } else if handler.awi > 0 && len(handler.audioFrame.Wraps) > handler.awi-1 { + frame := handler.audioFrame.Wraps[handler.awi-1] + frameSize := frame.GetSize() + if handler.s.Enabled(handler.s, task.TraceLevel) { + handler.s.Trace("send audio frame", "seq", handler.audioFrame.Sequence, "data", frame.String(), "size", frameSize) + } + err = handler.OnAudio(frame.(A)) + // Calculate BPS + if handler.s.AudioReader != nil { + handler.bytesRead += uint32(frameSize) + now := time.Now() + if elapsed := now.Sub(handler.lastBPSTime); elapsed >= time.Second { + handler.s.AudioReader.BPS = uint32(float64(handler.bytesRead) / elapsed.Seconds()) + handler.bytesRead = 0 + handler.lastBPSTime = now + } + } + } else if handler.s.AudioReader != nil { + handler.s.AudioReader.StopRead() } if err != nil && !errors.Is(err, ErrInterrupt) { handler.s.Stop(err) @@ -322,30 +322,29 @@ func (handler *SubscribeHandler[A, V]) sendAudioFrame() (err error) { } func (handler *SubscribeHandler[A, V]) sendVideoFrame() (err error) { - if handler.vwi >= 0 { - if len(handler.videoFrame.Wraps) > handler.vwi { - frame := handler.videoFrame.Wraps[handler.vwi] - frameSize := frame.GetSize() - if handler.s.Enabled(handler.s, task.TraceLevel) { - handler.s.Trace("send video frame", "seq", handler.videoFrame.Sequence, "data", frame.String(), "size", frameSize) - } - err = handler.OnVideo(frame.(V)) - // Calculate BPS - if handler.s.VideoReader != nil { - handler.bytesRead += uint32(frameSize) - now := time.Now() - if elapsed := now.Sub(handler.lastBPSTime); elapsed >= time.Second { - handler.s.VideoReader.BPS = uint32(float64(handler.bytesRead) / elapsed.Seconds()) - handler.bytesRead = 0 - handler.lastBPSTime = now - } - } - } else if handler.s.VideoReader != nil { - handler.s.VideoReader.StopRead() - } - } else { + if handler.vwi == 0 { err = handler.OnVideo(any(handler.videoFrame).(V)) } + if handler.vwi > 0 && len(handler.videoFrame.Wraps) > handler.vwi-1 { + frame := handler.videoFrame.Wraps[handler.vwi-1] + frameSize := frame.GetSize() + if handler.s.Enabled(handler.s, task.TraceLevel) { + handler.s.Trace("send video frame", "seq", handler.videoFrame.Sequence, "data", frame.String(), "size", frameSize) + } + err = handler.OnVideo(frame.(V)) + // Calculate BPS + if handler.s.VideoReader != nil { + handler.bytesRead += uint32(frameSize) + now := time.Now() + if elapsed := now.Sub(handler.lastBPSTime); elapsed >= time.Second { + handler.s.VideoReader.BPS = uint32(float64(handler.bytesRead) / elapsed.Seconds()) + handler.bytesRead = 0 + handler.lastBPSTime = now + } + } + } else if handler.s.VideoReader != nil { + handler.s.VideoReader.StopRead() + } if err != nil && !errors.Is(err, ErrInterrupt) { handler.s.Stop(err) } @@ -415,7 +414,7 @@ func (handler *SubscribeHandler[A, V]) Run() (err error) { if handler.videoFrame.IDR && vr.DecConfChanged() { vr.LastCodecCtx = vr.Track.ICodecCtx if seqFrame := vr.Track.SequenceFrame; seqFrame != nil { - if handler.vwi >= 0 { + if handler.vwi > 0 { err = handler.OnVideo(seqFrame.(V)) } } @@ -475,7 +474,7 @@ func (handler *SubscribeHandler[A, V]) Run() (err error) { if ar.DecConfChanged() { ar.LastCodecCtx = ar.Track.ICodecCtx if seqFrame := ar.Track.SequenceFrame; seqFrame != nil { - if handler.awi >= 0 { + if handler.awi > 0 { err = handler.OnAudio(seqFrame.(A)) } }