mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-12-24 13:48:04 +08:00
fix: wrap index error
This commit is contained in:
127
subscriber.go
127
subscriber.go
@@ -177,49 +177,51 @@ func (s *Subscriber) CheckWebSocket(w http.ResponseWriter, r *http.Request) (con
|
||||
|
||||
func (s *Subscriber) createAudioReader(dataType reflect.Type, startAudioTs time.Duration) (awi int) {
|
||||
if s.waitingPublish() || dataType == nil {
|
||||
return
|
||||
return -1
|
||||
}
|
||||
var at *AVTrack
|
||||
if dataType == AVFrameType {
|
||||
at = s.Publisher.AudioTrack.AVTrack
|
||||
awi = -1
|
||||
awi = 0
|
||||
} else {
|
||||
at = s.Publisher.GetAudioTrack(dataType)
|
||||
if at != nil {
|
||||
awi = at.WrapIndex
|
||||
awi = at.WrapIndex + 1
|
||||
}
|
||||
}
|
||||
if at != nil {
|
||||
if err := at.WaitReady(); err != nil {
|
||||
return
|
||||
}
|
||||
s.AudioReader = NewAVRingReader(at, dataType.String())
|
||||
s.AudioReader.StartTs = startAudioTs
|
||||
if at == nil {
|
||||
return -1
|
||||
}
|
||||
if err := at.WaitReady(); err != nil {
|
||||
return -1
|
||||
}
|
||||
s.AudioReader = NewAVRingReader(at, dataType.String())
|
||||
s.AudioReader.StartTs = startAudioTs
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Subscriber) createVideoReader(dataType reflect.Type, startVideoTs time.Duration) (vwi int) {
|
||||
if s.waitingPublish() || dataType == nil {
|
||||
return
|
||||
return -1
|
||||
}
|
||||
var vt *AVTrack
|
||||
if dataType == AVFrameType {
|
||||
vt = s.Publisher.VideoTrack.AVTrack
|
||||
vwi = -1
|
||||
vwi = 0
|
||||
} else {
|
||||
vt = s.Publisher.GetVideoTrack(dataType)
|
||||
if vt != nil {
|
||||
vwi = vt.WrapIndex
|
||||
vwi = vt.WrapIndex + 1
|
||||
}
|
||||
}
|
||||
if vt != nil {
|
||||
if err := vt.WaitReady(); err != nil {
|
||||
return
|
||||
}
|
||||
s.VideoReader = NewAVRingReader(vt, dataType.String())
|
||||
s.VideoReader.StartTs = startVideoTs
|
||||
if vt == nil {
|
||||
return -1
|
||||
}
|
||||
if err := vt.WaitReady(); err != nil {
|
||||
return -1
|
||||
}
|
||||
s.VideoReader = NewAVRingReader(vt, dataType.String())
|
||||
s.VideoReader.StartTs = startVideoTs
|
||||
return
|
||||
}
|
||||
|
||||
@@ -285,29 +287,27 @@ func (handler *SubscribeHandler[A, V]) checkPublishChanged() {
|
||||
}
|
||||
|
||||
func (handler *SubscribeHandler[A, V]) sendAudioFrame() (err error) {
|
||||
if handler.awi >= 0 {
|
||||
if len(handler.audioFrame.Wraps) > handler.awi {
|
||||
frame := handler.audioFrame.Wraps[handler.awi]
|
||||
frameSize := frame.GetSize()
|
||||
if handler.s.Enabled(handler.s, task.TraceLevel) {
|
||||
handler.s.Trace("send audio frame", "seq", handler.audioFrame.Sequence, "data", frame.String(), "size", frameSize)
|
||||
}
|
||||
err = handler.OnAudio(frame.(A))
|
||||
// Calculate BPS
|
||||
if handler.s.AudioReader != nil {
|
||||
handler.bytesRead += uint32(frameSize)
|
||||
now := time.Now()
|
||||
if elapsed := now.Sub(handler.lastBPSTime); elapsed >= time.Second {
|
||||
handler.s.AudioReader.BPS = uint32(float64(handler.bytesRead) / elapsed.Seconds())
|
||||
handler.bytesRead = 0
|
||||
handler.lastBPSTime = now
|
||||
}
|
||||
}
|
||||
} else if handler.s.AudioReader != nil {
|
||||
handler.s.AudioReader.StopRead()
|
||||
}
|
||||
} else {
|
||||
if handler.awi == 0 {
|
||||
err = handler.OnAudio(any(handler.audioFrame).(A))
|
||||
} else if handler.awi > 0 && len(handler.audioFrame.Wraps) > handler.awi-1 {
|
||||
frame := handler.audioFrame.Wraps[handler.awi-1]
|
||||
frameSize := frame.GetSize()
|
||||
if handler.s.Enabled(handler.s, task.TraceLevel) {
|
||||
handler.s.Trace("send audio frame", "seq", handler.audioFrame.Sequence, "data", frame.String(), "size", frameSize)
|
||||
}
|
||||
err = handler.OnAudio(frame.(A))
|
||||
// Calculate BPS
|
||||
if handler.s.AudioReader != nil {
|
||||
handler.bytesRead += uint32(frameSize)
|
||||
now := time.Now()
|
||||
if elapsed := now.Sub(handler.lastBPSTime); elapsed >= time.Second {
|
||||
handler.s.AudioReader.BPS = uint32(float64(handler.bytesRead) / elapsed.Seconds())
|
||||
handler.bytesRead = 0
|
||||
handler.lastBPSTime = now
|
||||
}
|
||||
}
|
||||
} else if handler.s.AudioReader != nil {
|
||||
handler.s.AudioReader.StopRead()
|
||||
}
|
||||
if err != nil && !errors.Is(err, ErrInterrupt) {
|
||||
handler.s.Stop(err)
|
||||
@@ -322,30 +322,29 @@ func (handler *SubscribeHandler[A, V]) sendAudioFrame() (err error) {
|
||||
}
|
||||
|
||||
func (handler *SubscribeHandler[A, V]) sendVideoFrame() (err error) {
|
||||
if handler.vwi >= 0 {
|
||||
if len(handler.videoFrame.Wraps) > handler.vwi {
|
||||
frame := handler.videoFrame.Wraps[handler.vwi]
|
||||
frameSize := frame.GetSize()
|
||||
if handler.s.Enabled(handler.s, task.TraceLevel) {
|
||||
handler.s.Trace("send video frame", "seq", handler.videoFrame.Sequence, "data", frame.String(), "size", frameSize)
|
||||
}
|
||||
err = handler.OnVideo(frame.(V))
|
||||
// Calculate BPS
|
||||
if handler.s.VideoReader != nil {
|
||||
handler.bytesRead += uint32(frameSize)
|
||||
now := time.Now()
|
||||
if elapsed := now.Sub(handler.lastBPSTime); elapsed >= time.Second {
|
||||
handler.s.VideoReader.BPS = uint32(float64(handler.bytesRead) / elapsed.Seconds())
|
||||
handler.bytesRead = 0
|
||||
handler.lastBPSTime = now
|
||||
}
|
||||
}
|
||||
} else if handler.s.VideoReader != nil {
|
||||
handler.s.VideoReader.StopRead()
|
||||
}
|
||||
} else {
|
||||
if handler.vwi == 0 {
|
||||
err = handler.OnVideo(any(handler.videoFrame).(V))
|
||||
}
|
||||
if handler.vwi > 0 && len(handler.videoFrame.Wraps) > handler.vwi-1 {
|
||||
frame := handler.videoFrame.Wraps[handler.vwi-1]
|
||||
frameSize := frame.GetSize()
|
||||
if handler.s.Enabled(handler.s, task.TraceLevel) {
|
||||
handler.s.Trace("send video frame", "seq", handler.videoFrame.Sequence, "data", frame.String(), "size", frameSize)
|
||||
}
|
||||
err = handler.OnVideo(frame.(V))
|
||||
// Calculate BPS
|
||||
if handler.s.VideoReader != nil {
|
||||
handler.bytesRead += uint32(frameSize)
|
||||
now := time.Now()
|
||||
if elapsed := now.Sub(handler.lastBPSTime); elapsed >= time.Second {
|
||||
handler.s.VideoReader.BPS = uint32(float64(handler.bytesRead) / elapsed.Seconds())
|
||||
handler.bytesRead = 0
|
||||
handler.lastBPSTime = now
|
||||
}
|
||||
}
|
||||
} else if handler.s.VideoReader != nil {
|
||||
handler.s.VideoReader.StopRead()
|
||||
}
|
||||
if err != nil && !errors.Is(err, ErrInterrupt) {
|
||||
handler.s.Stop(err)
|
||||
}
|
||||
@@ -415,7 +414,7 @@ func (handler *SubscribeHandler[A, V]) Run() (err error) {
|
||||
if handler.videoFrame.IDR && vr.DecConfChanged() {
|
||||
vr.LastCodecCtx = vr.Track.ICodecCtx
|
||||
if seqFrame := vr.Track.SequenceFrame; seqFrame != nil {
|
||||
if handler.vwi >= 0 {
|
||||
if handler.vwi > 0 {
|
||||
err = handler.OnVideo(seqFrame.(V))
|
||||
}
|
||||
}
|
||||
@@ -475,7 +474,7 @@ func (handler *SubscribeHandler[A, V]) Run() (err error) {
|
||||
if ar.DecConfChanged() {
|
||||
ar.LastCodecCtx = ar.Track.ICodecCtx
|
||||
if seqFrame := ar.Track.SequenceFrame; seqFrame != nil {
|
||||
if handler.awi >= 0 {
|
||||
if handler.awi > 0 {
|
||||
err = handler.OnAudio(seqFrame.(A))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user