rtp B帧逻辑修改

This commit is contained in:
langhuihui
2021-07-21 23:22:46 +08:00
parent 44c342679e
commit a459993df4
5 changed files with 120 additions and 76 deletions

View File

@@ -1,8 +1,6 @@
package engine package engine
import ( import (
"bytes"
"context"
"github.com/Monibuca/utils/v3/codec" "github.com/Monibuca/utils/v3/codec"
) )
@@ -105,9 +103,6 @@ func (at *AudioTrack) pushRaw(ts uint32, payload []byte) {
pack.Payload = pack.Bytes() pack.Payload = pack.Bytes()
} }
} }
at.Do(func(v interface{}) {
v.(*RingItem).Value.(*AudioPack).Buffer = bytes.NewBuffer([]byte{})
})
at.PushRaw = func(ts uint32, payload []byte) { at.PushRaw = func(ts uint32, payload []byte) {
pack := at.CurrentValue().(*AudioPack) pack := at.CurrentValue().(*AudioPack)
pack.Timestamp = ts pack.Timestamp = ts
@@ -142,7 +137,7 @@ func (s *Stream) NewAudioTrack(codec byte) (at *AudioTrack) {
at.PushByteStream = at.pushByteStream at.PushByteStream = at.pushByteStream
at.PushRaw = at.pushRaw at.PushRaw = at.pushRaw
at.Stream = s at.Stream = s
at.Init(8) at.Init(256)
at.Do(func(v interface{}) { at.Do(func(v interface{}) {
v.(*RingItem).Value = new(AudioPack) v.(*RingItem).Value = new(AudioPack)
}) })
@@ -174,19 +169,14 @@ func (at *AudioTrack) SetASC(asc []byte) {
at.Stream.AudioTracks.AddTrack("aac", at) at.Stream.AudioTracks.AddTrack("aac", at)
} }
func (at *AudioTrack) Play(ctx context.Context, onAudio func(AudioPack)) { func (at *AudioTrack) Play(onAudio func(AudioPack), exit1, exit2 <-chan struct{}) {
streamExit := at.Stream.Context.Done()
ar := at.Clone() ar := at.Clone()
ap := ar.Read().(*AudioPack) ap := ar.Read().(*AudioPack)
var extraExit <-chan struct{}
if ctx != nil {
extraExit = ctx.Done()
}
for startTimestamp := ap.Timestamp; at.Goon(); ap = ar.Read().(*AudioPack) { for startTimestamp := ap.Timestamp; at.Goon(); ap = ar.Read().(*AudioPack) {
select { select {
case <-extraExit: case <-exit1:
return return
case <-streamExit: case <-exit2:
return return
default: default:
onAudio(ap.Copy(startTimestamp)) onAudio(ap.Copy(startTimestamp))

View File

@@ -21,7 +21,7 @@ type AVPack interface {
type BasePack struct { type BasePack struct {
Timestamp uint32 Timestamp uint32
Sequence int Sequence int
*bytes.Buffer bytes.Buffer
Payload []byte Payload []byte
} }

133
rtp.go
View File

@@ -1,8 +1,7 @@
package engine package engine
import ( import (
"sort" "github.com/Monibuca/utils/v3"
"github.com/Monibuca/utils/v3/codec" "github.com/Monibuca/utils/v3/codec"
"github.com/pion/rtp" "github.com/pion/rtp"
) )
@@ -49,68 +48,124 @@ func (v *RTPVideo) push(payload []byte) {
if err := v.Unmarshal(payload); err != nil { if err := v.Unmarshal(payload); err != nil {
return return
} }
var p *VideoPack
t0 := v.Timestamp t0 := v.Timestamp
tmpVT := v.Stream.NewVideoTrack(0)
tmpVT.ExtraData = v.ExtraData
tmpVT.CodecID = v.CodecID
start := tmpVT.Ring
tmpVT.PushNalu(0, 0, v.Payload)
v.Push = func(payload []byte) { v.Push = func(payload []byte) {
if err := v.Unmarshal(payload); err != nil { if err := v.Unmarshal(payload); err != nil {
return return
} }
t1 := (v.Timestamp - t0) / 90 t1 := (v.Timestamp - t0) / 90
if t1 < vt.Prev().Value.(*RingItem).Value.(*VideoPack).Timestamp { utils.Println("video:", t1)
if vt.WaitIDR.Err() == nil { tmpVT.PushNalu(t1, 0, v.Payload)
return end := tmpVT.Prev()
if start != end {
for next := start; next != end; next = next.Next() {
vp := next.Value.(*RingItem).Value.(*VideoPack)
vpNext := next.Next().Value.(*RingItem).Value.(*VideoPack)
lastB := false
if p != nil && p.Timestamp < vpNext.Timestamp {
lastB = true
}
if vp.Timestamp > vpNext.Timestamp {
p = vp
} }
var ts TSSlice
//有B帧
tmpVT := v.Stream.NewVideoTrack(0)
tmpVT.ExtraData = v.ExtraData
tmpVT.CodecID = v.CodecID
tmpVT.revIDR = func() {
l := ts.Len()
sort.Sort(ts)
start := tmpVT.Move(-l)
for i := 0; i < l; i++ {
vp := start.Value.(*RingItem).Value.(*VideoPack)
pack := vt.current() pack := vt.current()
pack.IDR = vp.IDR if p != nil {
pack.Timestamp = ts[i] if lastB {
pack.CompositionTime = vp.Timestamp - ts[i] pack.Timestamp = p.Timestamp
p = nil
} else {
pack.Timestamp = vpNext.Timestamp
}
pack.CompositionTime = vp.Timestamp - pack.Timestamp
} else {
pack.Timestamp = vp.Timestamp
}
pack.NALUs = vp.NALUs pack.NALUs = vp.NALUs
pack.IDR = vp.IDR
vt.push(pack) vt.push(pack)
start = start.Next()
} }
ts = nil start = end
} }
v.Push = func(payload []byte) { // if t1 < vt.Prev().Value.(*RingItem).Value.(*VideoPack).Timestamp {
if err := v.Unmarshal(payload); err != nil { // if vt.WaitIDR.Err() == nil {
return // return
// }
// var buffer, pool = list.New(), list.New()
// var ts TSSlice
// //有B帧
// tmpVT := v.Stream.NewVideoTrack(0)
// tmpVT.ExtraData = v.ExtraData
// tmpVT.CodecID = v.CodecID
// tmpVT.revIDR = func() {
// l := ts.Len()
// sort.Sort(ts)
// start := tmpVT.Move(-l)
// for i := 0; i < l; i++ {
// vp := start.Value.(*RingItem).Value.(*VideoPack)
// var pack *VideoPack
// if pool.Len() > 0 {
// pack = pool.Remove(pool.Front()).(*VideoPack)
// } else {
// pack = &VideoPack{}
// }
// pack.IDR = vp.IDR
// pack.Timestamp = ts[i]
// pack.CompositionTime = vp.Timestamp - ts[i]
// pack.NALUs = vp.NALUs
// buffer.PushBack(pack)
// start = start.Next()
// }
// ts = ts[:0]
// }
// v.Push = func(payload []byte) {
// if err := v.Unmarshal(payload); err != nil {
// return
// }
// r := tmpVT.Ring
// t := (v.Timestamp - t0) / 90
// if tmpVT.PushNalu(t, 0, v.Payload); r != tmpVT.Ring {
// ts = append(ts, t)
// if buffer.Len() > 0 {
// vp := buffer.Remove(buffer.Front()).(*VideoPack)
// pack := vt.current()
// pack.IDR = vp.IDR
// pack.Timestamp = vp.Timestamp
// pack.CompositionTime = vp.CompositionTime
// pack.NALUs = vp.NALUs
// vt.push(pack)
// pool.PushBack(vp)
// }
// }
// }
// v.Push(payload)
// return
// }
//vt.PushNalu(t1, 0, v.Payload)
} }
r := tmpVT.Ring //v.Push(payload)
t := (v.Timestamp - t0) / 90
if tmpVT.PushNalu(t, 0, v.Payload); r != tmpVT.Ring {
ts = append(ts, t)
}
}
v.Push(payload)
return
}
vt.PushNalu(t1, 0, v.Payload)
}
v.Push(payload)
} }
func (v *RTPAudio) push(payload []byte) { func (v *RTPAudio) push(payload []byte) {
at := v.AudioTrack at := v.AudioTrack
if err := v.Unmarshal(payload); err != nil { if err := v.Unmarshal(payload); err != nil {
return return
} }
startTime := v.Timestamp t0 := v.Timestamp
switch at.CodecID { switch at.CodecID {
case 10: case 10:
v.Push = func(payload []byte) { v.Push = func(payload []byte) {
if err := v.Unmarshal(payload); err != nil { if err := v.Unmarshal(payload); err != nil {
return return
} }
t1 := (v.Timestamp - t0) / 90
utils.Println("audio:", t1)
for _, payload = range codec.ParseRTPAAC(v.Payload) { for _, payload = range codec.ParseRTPAAC(v.Payload) {
at.PushRaw((v.Timestamp-startTime)/90, payload) at.PushRaw(t1, payload)
} }
} }
case 7, 8: case 7, 8:
@@ -118,7 +173,7 @@ func (v *RTPAudio) push(payload []byte) {
if err := v.Unmarshal(payload); err != nil { if err := v.Unmarshal(payload); err != nil {
return return
} }
at.PushRaw((v.Timestamp-startTime)/8, v.Payload) at.PushRaw((v.Timestamp-t0)/8, v.Payload)
} }
} }
v.Push(payload) v.Push(payload)

View File

@@ -6,6 +6,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/Monibuca/utils/v3"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@@ -94,6 +95,7 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) {
case <-streamExit: case <-streamExit:
return return
default: default:
utils.Println(vp.Timestamp, ap.Timestamp, ap.Timestamp > vp.Timestamp)
if ap.Timestamp > vp.Timestamp || ap.Timestamp == 0 { if ap.Timestamp > vp.Timestamp || ap.Timestamp == 0 {
s.OnVideo(vp.Copy(vst)) s.OnVideo(vp.Copy(vst))
vr.MoveNext() vr.MoveNext()
@@ -107,9 +109,17 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) {
} }
} }
func (s *Subscriber) PlayAudio(at *AudioTrack) { func (s *Subscriber) PlayAudio(at *AudioTrack) {
at.Play(s.Ctx2, s.OnAudio) if s.Ctx2 != nil {
at.Play(s.OnAudio, s.Done(), s.Ctx2.Done())
} else {
at.Play(s.OnAudio, s.Done(), nil)
}
} }
func (s *Subscriber) PlayVideo(vt *VideoTrack) { func (s *Subscriber) PlayVideo(vt *VideoTrack) {
vt.Play(s.Ctx2, s.OnVideo) if s.Ctx2 != nil {
vt.Play(s.OnVideo, s.Done(), s.Ctx2.Done())
} else {
vt.Play(s.OnVideo, s.Done(), nil)
}
} }

View File

@@ -120,9 +120,6 @@ func (vt *VideoTrack) pushNalu(ts uint32, cts uint32, nalus ...[]byte) {
} }
pack.Payload = pack.Bytes() pack.Payload = pack.Bytes()
} }
vt.Do(func(v interface{}) {
v.(*RingItem).Value.(*VideoPack).Buffer = bytes.NewBuffer([]byte{})
})
switch vt.CodecID { switch vt.CodecID {
case 7: case 7:
{ {
@@ -544,11 +541,7 @@ func (vt *VideoTrack) push(pack *VideoPack) {
if vt.idrCount == 1 { if vt.idrCount == 1 {
exRing := ring.New(5) exRing := ring.New(5)
for x := exRing; x.Value == nil; x = x.Next() { for x := exRing; x.Value == nil; x = x.Next() {
pack := new(VideoPack) x.Value = &RingItem{Value: new(VideoPack)}
x.Value = &RingItem{Value: pack}
if vt.writeByteStream != nil {
pack.Buffer = bytes.NewBuffer([]byte{})
}
} }
vt.Link(exRing) // 扩大缓冲环 vt.Link(exRing) // 扩大缓冲环
} else { } else {
@@ -558,26 +551,22 @@ func (vt *VideoTrack) push(pack *VideoPack) {
vt.Step() vt.Step()
} }
func (vt *VideoTrack) Play(ctx context.Context, onVideo func(VideoPack)) { func (vt *VideoTrack) Play(onVideo func(VideoPack), exit1, exit2 <-chan struct{}) {
var extraExit <-chan struct{}
if ctx != nil {
extraExit = ctx.Done()
}
streamExit := vt.Stream.Context.Done()
select { select {
case <-vt.WaitIDR.Done(): case <-vt.WaitIDR.Done():
case <-streamExit: case <-exit1:
return return
case <-extraExit: //可能等不到关键帧就退出了 case <-exit2: //可能等不到关键帧就退出了
return return
} }
vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开
vp := vr.Read().(*VideoPack) vp := vr.Read().(*VideoPack)
for startTimestamp := vp.Timestamp; vt.Goon(); vp = vr.Read().(*VideoPack) { for startTimestamp := vp.Timestamp; vt.Goon(); vp = vr.Read().(*VideoPack) {
utils.Println(vp.Timestamp)
select { select {
case <-extraExit: case <-exit1:
return return
case <-streamExit: case <-exit2:
return return
default: default:
onVideo(vp.Copy(startTimestamp)) onVideo(vp.Copy(startTimestamp))