diff --git a/audio_track.go b/audio_track.go index ae47230..f6e788a 100644 --- a/audio_track.go +++ b/audio_track.go @@ -1,8 +1,6 @@ package engine import ( - "bytes" - "context" "github.com/Monibuca/utils/v3/codec" ) @@ -105,9 +103,6 @@ func (at *AudioTrack) pushRaw(ts uint32, payload []byte) { pack.Payload = pack.Bytes() } } - at.Do(func(v interface{}) { - v.(*RingItem).Value.(*AudioPack).Buffer = bytes.NewBuffer([]byte{}) - }) at.PushRaw = func(ts uint32, payload []byte) { pack := at.CurrentValue().(*AudioPack) pack.Timestamp = ts @@ -142,7 +137,7 @@ func (s *Stream) NewAudioTrack(codec byte) (at *AudioTrack) { at.PushByteStream = at.pushByteStream at.PushRaw = at.pushRaw at.Stream = s - at.Init(8) + at.Init(256) at.Do(func(v interface{}) { v.(*RingItem).Value = new(AudioPack) }) @@ -174,19 +169,14 @@ func (at *AudioTrack) SetASC(asc []byte) { at.Stream.AudioTracks.AddTrack("aac", at) } -func (at *AudioTrack) Play(ctx context.Context, onAudio func(AudioPack)) { - streamExit := at.Stream.Context.Done() +func (at *AudioTrack) Play(onAudio func(AudioPack), exit1, exit2 <-chan struct{}) { ar := at.Clone() ap := ar.Read().(*AudioPack) - var extraExit <-chan struct{} - if ctx != nil { - extraExit = ctx.Done() - } for startTimestamp := ap.Timestamp; at.Goon(); ap = ar.Read().(*AudioPack) { select { - case <-extraExit: + case <-exit1: return - case <-streamExit: + case <-exit2: return default: onAudio(ap.Copy(startTimestamp)) diff --git a/base_track.go b/base_track.go index b04d991..a0912b1 100644 --- a/base_track.go +++ b/base_track.go @@ -21,7 +21,7 @@ type AVPack interface { type BasePack struct { Timestamp uint32 Sequence int - *bytes.Buffer + bytes.Buffer Payload []byte } diff --git a/rtp.go b/rtp.go index 6034895..54ba1c3 100644 --- a/rtp.go +++ b/rtp.go @@ -1,8 +1,7 @@ package engine import ( - "sort" - + "github.com/Monibuca/utils/v3" "github.com/Monibuca/utils/v3/codec" "github.com/pion/rtp" ) @@ -49,68 +48,124 @@ func (v *RTPVideo) push(payload []byte) { if err := v.Unmarshal(payload); err != nil { return } + var p *VideoPack t0 := v.Timestamp + tmpVT := v.Stream.NewVideoTrack(0) + tmpVT.ExtraData = v.ExtraData + tmpVT.CodecID = v.CodecID + start := tmpVT.Ring + tmpVT.PushNalu(0, 0, v.Payload) v.Push = func(payload []byte) { if err := v.Unmarshal(payload); err != nil { return } - t1 := (v.Timestamp - t0)/90 - if t1 < vt.Prev().Value.(*RingItem).Value.(*VideoPack).Timestamp { - if vt.WaitIDR.Err() == nil { - return - } - var ts TSSlice - //有B帧 - tmpVT := v.Stream.NewVideoTrack(0) - tmpVT.ExtraData = v.ExtraData - tmpVT.CodecID = v.CodecID - tmpVT.revIDR = func() { - l := ts.Len() - sort.Sort(ts) - start := tmpVT.Move(-l) - for i := 0; i < l; i++ { - vp := start.Value.(*RingItem).Value.(*VideoPack) - pack := vt.current() - pack.IDR = vp.IDR - pack.Timestamp = ts[i] - pack.CompositionTime = vp.Timestamp - ts[i] - pack.NALUs = vp.NALUs - vt.push(pack) - start = start.Next() + t1 := (v.Timestamp - t0) / 90 + utils.Println("video:", t1) + tmpVT.PushNalu(t1, 0, v.Payload) + end := tmpVT.Prev() + if start != end { + for next := start; next != end; next = next.Next() { + vp := next.Value.(*RingItem).Value.(*VideoPack) + vpNext := next.Next().Value.(*RingItem).Value.(*VideoPack) + lastB := false + if p != nil && p.Timestamp < vpNext.Timestamp { + lastB = true } - ts = nil - } - v.Push = func(payload []byte) { - if err := v.Unmarshal(payload); err != nil { - return + if vp.Timestamp > vpNext.Timestamp { + p = vp } - r := tmpVT.Ring - t := (v.Timestamp - t0) / 90 - if tmpVT.PushNalu(t, 0, v.Payload); r != tmpVT.Ring { - ts = append(ts, t) + pack := vt.current() + if p != nil { + if lastB { + pack.Timestamp = p.Timestamp + p = nil + } else { + pack.Timestamp = vpNext.Timestamp + } + pack.CompositionTime = vp.Timestamp - pack.Timestamp + } else { + pack.Timestamp = vp.Timestamp } + pack.NALUs = vp.NALUs + pack.IDR = vp.IDR + vt.push(pack) } - v.Push(payload) - return + start = end } - vt.PushNalu(t1, 0, v.Payload) + // if t1 < vt.Prev().Value.(*RingItem).Value.(*VideoPack).Timestamp { + // if vt.WaitIDR.Err() == nil { + // return + // } + // var buffer, pool = list.New(), list.New() + // var ts TSSlice + // //有B帧 + // tmpVT := v.Stream.NewVideoTrack(0) + // tmpVT.ExtraData = v.ExtraData + // tmpVT.CodecID = v.CodecID + // tmpVT.revIDR = func() { + // l := ts.Len() + // sort.Sort(ts) + // start := tmpVT.Move(-l) + // for i := 0; i < l; i++ { + // vp := start.Value.(*RingItem).Value.(*VideoPack) + // var pack *VideoPack + // if pool.Len() > 0 { + // pack = pool.Remove(pool.Front()).(*VideoPack) + // } else { + // pack = &VideoPack{} + // } + // pack.IDR = vp.IDR + // pack.Timestamp = ts[i] + // pack.CompositionTime = vp.Timestamp - ts[i] + // pack.NALUs = vp.NALUs + // buffer.PushBack(pack) + // start = start.Next() + // } + // ts = ts[:0] + // } + // v.Push = func(payload []byte) { + // if err := v.Unmarshal(payload); err != nil { + // return + // } + // r := tmpVT.Ring + // t := (v.Timestamp - t0) / 90 + // if tmpVT.PushNalu(t, 0, v.Payload); r != tmpVT.Ring { + // ts = append(ts, t) + // if buffer.Len() > 0 { + // vp := buffer.Remove(buffer.Front()).(*VideoPack) + // pack := vt.current() + // pack.IDR = vp.IDR + // pack.Timestamp = vp.Timestamp + // pack.CompositionTime = vp.CompositionTime + // pack.NALUs = vp.NALUs + // vt.push(pack) + // pool.PushBack(vp) + // } + // } + // } + // v.Push(payload) + // return + // } + //vt.PushNalu(t1, 0, v.Payload) } - v.Push(payload) + //v.Push(payload) } func (v *RTPAudio) push(payload []byte) { at := v.AudioTrack if err := v.Unmarshal(payload); err != nil { return } - startTime := v.Timestamp + t0 := v.Timestamp switch at.CodecID { case 10: v.Push = func(payload []byte) { if err := v.Unmarshal(payload); err != nil { return } + t1 := (v.Timestamp - t0) / 90 + utils.Println("audio:", t1) for _, payload = range codec.ParseRTPAAC(v.Payload) { - at.PushRaw((v.Timestamp-startTime)/90, payload) + at.PushRaw(t1, payload) } } case 7, 8: @@ -118,7 +173,7 @@ func (v *RTPAudio) push(payload []byte) { if err := v.Unmarshal(payload); err != nil { return } - at.PushRaw((v.Timestamp-startTime)/8, v.Payload) + at.PushRaw((v.Timestamp-t0)/8, v.Payload) } } v.Push(payload) diff --git a/subscriber.go b/subscriber.go index 6ace459..6c7b41f 100644 --- a/subscriber.go +++ b/subscriber.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/Monibuca/utils/v3" "github.com/pkg/errors" ) @@ -94,6 +95,7 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { case <-streamExit: return default: + utils.Println(vp.Timestamp, ap.Timestamp, ap.Timestamp > vp.Timestamp) if ap.Timestamp > vp.Timestamp || ap.Timestamp == 0 { s.OnVideo(vp.Copy(vst)) vr.MoveNext() @@ -107,9 +109,17 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { } } func (s *Subscriber) PlayAudio(at *AudioTrack) { - at.Play(s.Ctx2, s.OnAudio) + if s.Ctx2 != nil { + at.Play(s.OnAudio, s.Done(), s.Ctx2.Done()) + } else { + at.Play(s.OnAudio, s.Done(), nil) + } } func (s *Subscriber) PlayVideo(vt *VideoTrack) { - vt.Play(s.Ctx2, s.OnVideo) + if s.Ctx2 != nil { + vt.Play(s.OnVideo, s.Done(), s.Ctx2.Done()) + } else { + vt.Play(s.OnVideo, s.Done(), nil) + } } diff --git a/video_track.go b/video_track.go index b8ac4d7..0de32b0 100644 --- a/video_track.go +++ b/video_track.go @@ -120,9 +120,6 @@ func (vt *VideoTrack) pushNalu(ts uint32, cts uint32, nalus ...[]byte) { } pack.Payload = pack.Bytes() } - vt.Do(func(v interface{}) { - v.(*RingItem).Value.(*VideoPack).Buffer = bytes.NewBuffer([]byte{}) - }) switch vt.CodecID { case 7: { @@ -544,11 +541,7 @@ func (vt *VideoTrack) push(pack *VideoPack) { if vt.idrCount == 1 { exRing := ring.New(5) for x := exRing; x.Value == nil; x = x.Next() { - pack := new(VideoPack) - x.Value = &RingItem{Value: pack} - if vt.writeByteStream != nil { - pack.Buffer = bytes.NewBuffer([]byte{}) - } + x.Value = &RingItem{Value: new(VideoPack)} } vt.Link(exRing) // 扩大缓冲环 } else { @@ -558,26 +551,22 @@ func (vt *VideoTrack) push(pack *VideoPack) { vt.Step() } -func (vt *VideoTrack) Play(ctx context.Context, onVideo func(VideoPack)) { - var extraExit <-chan struct{} - if ctx != nil { - extraExit = ctx.Done() - } - streamExit := vt.Stream.Context.Done() +func (vt *VideoTrack) Play(onVideo func(VideoPack), exit1, exit2 <-chan struct{}) { select { case <-vt.WaitIDR.Done(): - case <-streamExit: + case <-exit1: return - case <-extraExit: //可能等不到关键帧就退出了 + case <-exit2: //可能等不到关键帧就退出了 return } vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 vp := vr.Read().(*VideoPack) for startTimestamp := vp.Timestamp; vt.Goon(); vp = vr.Read().(*VideoPack) { + utils.Println(vp.Timestamp) select { - case <-extraExit: + case <-exit1: return - case <-streamExit: + case <-exit2: return default: onVideo(vp.Copy(startTimestamp))