rtp改成链表形式

This commit is contained in:
dexter
2023-02-01 10:42:50 +08:00
parent b27e032435
commit f2847be29f
9 changed files with 128 additions and 71 deletions

View File

@@ -53,9 +53,9 @@ type AVFrame struct {
IFrame bool IFrame bool
PTS uint32 PTS uint32
DTS uint32 DTS uint32
AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format) AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
RTP []*RTPFrame `json:"-"` RTP util.List[RTPFrame] `json:"-"`
AUList util.BLLs `json:"-"` // 裸数据 AUList util.BLLs `json:"-"` // 裸数据
mem util.BLL mem util.BLL
CanRead bool `json:"-"` CanRead bool `json:"-"`
} }
@@ -79,15 +79,9 @@ func (av *AVFrame) AppendMem(item *util.ListItem[util.Buffer]) {
av.mem.Push(item) av.mem.Push(item)
} }
func (av *AVFrame) AppendRTP(rtp *RTPFrame) {
av.RTP = append(av.RTP, rtp)
}
// Reset 重置数据,复用内存 // Reset 重置数据,复用内存
func (av *AVFrame) Reset() { func (av *AVFrame) Reset() {
if av.RTP != nil { av.RTP.Recycle()
av.RTP = av.RTP[:0]
}
av.mem.Recycle() av.mem.Recycle()
av.AVCC.Recycle() av.AVCC.Recycle()
av.AUList.Recycle() av.AUList.Recycle()

View File

@@ -192,22 +192,22 @@ func (s *Subscriber) PlayBlock(subType byte) {
case SUBTYPE_RTP: case SUBTYPE_RTP:
var videoSeq, audioSeq uint16 var videoSeq, audioSeq uint16
sendVideoFrame = func(frame *AVFrame) { sendVideoFrame = func(frame *AVFrame) {
for _, p := range frame.RTP { frame.RTP.Range(func(vp RTPFrame) bool {
videoSeq++ videoSeq++
vp := *p
vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipTs*90 vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipTs*90
vp.Header.SequenceNumber = videoSeq vp.Header.SequenceNumber = videoSeq
spesic.OnEvent((VideoRTP)(vp)) spesic.OnEvent((VideoRTP)(vp))
} return true
})
} }
sendAudioFrame = func(frame *AVFrame) { sendAudioFrame = func(frame *AVFrame) {
for _, p := range frame.RTP { frame.RTP.Range(func(ap RTPFrame) bool {
audioSeq++ audioSeq++
vp := *p ap.Header.SequenceNumber = audioSeq
vp.Header.SequenceNumber = audioSeq ap.Header.Timestamp = ap.Header.Timestamp - s.AudioReader.SkipTs*90
vp.Header.Timestamp = vp.Header.Timestamp - s.AudioReader.SkipTs*90 spesic.OnEvent((AudioRTP)(ap))
spesic.OnEvent((AudioRTP)(vp)) return true
} })
} }
case SUBTYPE_FLV: case SUBTYPE_FLV:
flvHeadCache := make([]byte, 15) //内存复用 flvHeadCache := make([]byte, 15) //内存复用
@@ -258,7 +258,9 @@ func (s *Subscriber) PlayBlock(subType byte) {
hasVideo, hasAudio := s.VideoReader.Track != nil && s.Config.SubVideo, s.AudioReader.Track != nil && s.Config.SubAudio hasVideo, hasAudio := s.VideoReader.Track != nil && s.Config.SubVideo, s.AudioReader.Track != nil && s.Config.SubAudio
if hasVideo { if hasVideo {
if videoFrame != nil { if videoFrame != nil {
sendVideoFrame(videoFrame) if videoFrame.CanRead {
sendVideoFrame(videoFrame)
}
videoFrame = nil videoFrame = nil
} }
for ctx.Err() == nil { for ctx.Err() == nil {
@@ -270,7 +272,9 @@ func (s *Subscriber) PlayBlock(subType byte) {
} }
if audioFrame != nil { if audioFrame != nil {
if frame.AbsTime > audioFrame.AbsTime { if frame.AbsTime > audioFrame.AbsTime {
sendAudioFrame(audioFrame) if audioFrame.CanRead {
sendAudioFrame(audioFrame)
}
audioFrame = nil audioFrame = nil
} }
} }
@@ -293,7 +297,9 @@ func (s *Subscriber) PlayBlock(subType byte) {
// 正常模式下或者纯音频模式下,音频开始播放 // 正常模式下或者纯音频模式下,音频开始播放
if hasAudio { if hasAudio {
if audioFrame != nil { if audioFrame != nil {
sendAudioFrame(audioFrame) if audioFrame.CanRead {
sendAudioFrame(audioFrame)
}
audioFrame = nil audioFrame = nil
} }
for ctx.Err() == nil { for ctx.Err() == nil {
@@ -315,7 +321,9 @@ func (s *Subscriber) PlayBlock(subType byte) {
} }
if videoFrame != nil { if videoFrame != nil {
if frame.AbsTime > videoFrame.AbsTime { if frame.AbsTime > videoFrame.AbsTime {
sendVideoFrame(videoFrame) if videoFrame.CanRead {
sendVideoFrame(videoFrame)
}
videoFrame = nil videoFrame = nil
} }
} }

View File

@@ -4,7 +4,6 @@ import (
"time" "time"
"unsafe" "unsafe"
"go.uber.org/zap"
. "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config" "m7s.live/engine/v4/config"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
@@ -65,8 +64,8 @@ func (p *IDRingList) AddIDR(IDRing *util.Ring[AVFrame]) {
} }
func (p *IDRingList) ShiftIDR() { func (p *IDRingList) ShiftIDR() {
p.HistoryRing = p.Next.Next.Value
p.Shift() p.Shift()
p.HistoryRing = p.Next.Value
} }
// Media 基础媒体Track类 // Media 基础媒体Track类
@@ -78,7 +77,8 @@ type Media struct {
SSRC uint32 SSRC uint32
PayloadType byte PayloadType byte
BytesPool util.BytesPool `json:"-"` BytesPool util.BytesPool `json:"-"`
SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) rtpPool util.Pool[RTPFrame]
SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
SequenceHeadSeq int SequenceHeadSeq int
RTPMuxer RTPMuxer
RTPDemuxer RTPDemuxer
@@ -163,7 +163,7 @@ func (av *Media) AppendAuBytes(b ...[]byte) {
func (av *Media) narrow(gop int) { func (av *Media) narrow(gop int) {
if l := av.Size - gop - 5; l > 5 { if l := av.Size - gop - 5; l > 5 {
av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-l), zap.String("name", av.Name)) // av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-l), zap.String("name", av.Name))
//缩小缓冲环节省内存 //缩小缓冲环节省内存
av.Reduce(l).Do(func(v AVFrame) { av.Reduce(l).Do(func(v AVFrame) {
v.Reset() v.Reset()
@@ -174,6 +174,9 @@ func (av *Media) narrow(gop int) {
func (av *Media) AddIDR() { func (av *Media) AddIDR() {
if av.Stream.GetPublisherConfig().BufferTime > 0 { if av.Stream.GetPublisherConfig().BufferTime > 0 {
av.IDRingList.AddIDR(av.Ring) av.IDRingList.AddIDR(av.Ring)
if av.HistoryRing == nil {
av.HistoryRing = av.IDRing
}
} else { } else {
av.IDRing = av.Ring av.IDRing = av.Ring
} }
@@ -189,14 +192,14 @@ func (av *Media) Flush() {
// 下一帧为订阅起始帧,即将覆盖,需要扩环 // 下一帧为订阅起始帧,即将覆盖,需要扩环
if nextValue == av.IDRing || nextValue == av.HistoryRing { if nextValue == av.IDRing || nextValue == av.HistoryRing {
// if av.AVRing.Size < 512 { // if av.AVRing.Size < 512 {
av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name)) // av.Stream.Debug("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name))
av.Glow(5) av.Glow(5)
// } else { // } else {
// av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name)) // av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name))
// } // }
} }
// 补完RTP // 补完RTP
if config.Global.EnableRTP && len(curValue.RTP) == 0 { if config.Global.EnableRTP && curValue.RTP.Length == 0 {
av.CompleteRTP(curValue) av.CompleteRTP(curValue)
} }
// 补完AVCC // 补完AVCC

View File

@@ -128,7 +128,7 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
// RTP格式补完 // RTP格式补完
func (vt *H264) CompleteRTP(value *AVFrame) { func (vt *H264) CompleteRTP(value *AVFrame) {
if len(value.RTP) > 0 { if value.RTP.Length > 0 {
if !vt.dcChanged && value.IFrame { if !vt.dcChanged && value.IFrame {
vt.insertDCRtp() vt.insertDCRtp()
} }

View File

@@ -120,7 +120,7 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) {
// RTP格式补完 // RTP格式补完
func (vt *H265) CompleteRTP(value *AVFrame) { func (vt *H265) CompleteRTP(value *AVFrame) {
if len(value.RTP) > 0 { if value.RTP.Length > 0 {
if !vt.dcChanged && value.IFrame { if !vt.dcChanged && value.IFrame {
vt.insertDCRtp() vt.insertDCRtp()
} }

View File

@@ -33,7 +33,7 @@ func (av *Media) UnmarshalRTP(raw []byte) (frame *RTPFrame) {
} }
func (av *Media) writeRTPFrame(frame *RTPFrame) { func (av *Media) writeRTPFrame(frame *RTPFrame) {
av.Value.AppendRTP(frame) av.Value.RTP.PushValue(*frame)
av.WriteRTPFrame(frame) av.WriteRTPFrame(frame)
if frame.Marker { if frame.Marker {
av.SpesificTrack.generateTimestamp(frame.Timestamp) av.SpesificTrack.generateTimestamp(frame.Timestamp)
@@ -59,31 +59,24 @@ func (av *Media) WriteRTP(raw []byte) {
// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets // Packetize packetizes the payload of an RTP packet and returns one or more RTP packets
func (av *Media) PacketizeRTP(payloads ...[][]byte) { func (av *Media) PacketizeRTP(payloads ...[][]byte) {
packetCount := len(payloads) packetCount := len(payloads)
if cap(av.Value.RTP) < packetCount {
av.Value.RTP = make([]*RTPFrame, packetCount)
} else {
av.Value.RTP = av.Value.RTP[:packetCount]
}
for i, pp := range payloads { for i, pp := range payloads {
av.rtpSequence++ av.rtpSequence++
packet := av.Value.RTP[i] rtpItem := av.rtpPool.Get()
if packet == nil { packet := &rtpItem.Value
packet = &RTPFrame{} if packet.Payload == nil {
av.Value.RTP[i] = packet packet.Payload = make([]byte, 0, RTPMTU)
packet.Version = 2 packet.Version = 2
packet.PayloadType = av.PayloadType packet.PayloadType = av.PayloadType
packet.SSRC = av.SSRC packet.SSRC = av.SSRC
} }
item := av.BytesPool.Get(RTPMTU) packet.Payload = packet.Payload[:0]
av.Value.AppendMem(item)
packet.Payload = item.Value[:0]
// packet.Payload = packet.Payload[:0]
packet.SequenceNumber = av.rtpSequence packet.SequenceNumber = av.rtpSequence
packet.Timestamp = av.Value.PTS packet.Timestamp = av.Value.PTS
packet.Marker = i == packetCount-1 packet.Marker = i == packetCount-1
for _, p := range pp { for _, p := range pp {
packet.Payload = append(packet.Payload, p...) packet.Payload = append(packet.Payload, p...)
} }
av.Value.RTP.Push(rtpItem)
} }
} }

View File

@@ -71,7 +71,7 @@ func (vt *Video) computeGOP() {
vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence) vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence)
vt.narrow(vt.GOP) vt.narrow(vt.GOP)
} }
vt.IDRing = vt.Ring vt.AddIDR()
// var n int // var n int
// for i := 0; i < len(vt.BytesPool); i++ { // for i := 0; i < len(vt.BytesPool); i++ {
// n += vt.BytesPool[i].Length // n += vt.BytesPool[i].Length
@@ -149,33 +149,24 @@ func (vt *Video) WriteSliceByte(b ...byte) {
// 在I帧前面插入sps pps webrtc需要 // 在I帧前面插入sps pps webrtc需要
func (av *Video) insertDCRtp() { func (av *Video) insertDCRtp() {
seq := av.Value.RTP[0].SequenceNumber head := av.Value.RTP.Next
l1, l2 := len(av.ParamaterSets), len(av.Value.RTP) seq := head.Value.SequenceNumber
afterLen := l1 + l2 for _, nalu := range av.ParamaterSets {
if cap(av.Value.RTP) < afterLen { var packet RTPFrame
rtps := make([]*RTPFrame, l1, afterLen)
av.Value.RTP = append(rtps, av.Value.RTP...)
} else {
av.Value.RTP = av.Value.RTP[:afterLen]
copy(av.Value.RTP[l1:], av.Value.RTP[:l2])
}
for i, nalu := range av.ParamaterSets {
packet := &RTPFrame{}
packet.Version = 2 packet.Version = 2
packet.PayloadType = av.PayloadType packet.PayloadType = av.PayloadType
packet.Payload = nalu packet.Payload = nalu
packet.SSRC = av.SSRC packet.SSRC = av.SSRC
packet.SequenceNumber = seq
packet.Timestamp = av.Value.PTS packet.Timestamp = av.Value.PTS
packet.Marker = false packet.Marker = false
seq++ head.InsertBeforeValue(packet)
av.rtpSequence++ av.rtpSequence++
av.Value.RTP[i] = packet
} }
for i := l1; i < afterLen; i++ { av.Value.RTP.RangeItem(func(item *util.ListItem[RTPFrame]) bool {
av.Value.RTP[i].SequenceNumber = seq item.Value.SequenceNumber = seq
seq++ seq++
} return true
})
} }
func (av *Video) generateTimestamp(ts uint32) { func (av *Video) generateTimestamp(ts uint32) {

View File

@@ -15,6 +15,40 @@ type ListItem[T any] struct {
list *List[T] list *List[T]
} }
func (item *ListItem[T]) InsertBefore(insert *ListItem[T]) {
if insert.list != nil {
panic("item already in list")
}
insert.list = item.list
insert.Pre = item.Pre
insert.Next = item
item.Pre.Next = insert
item.Pre = insert
item.list.Length++
}
func (item *ListItem[T]) InsertBeforeValue(value T) (result *ListItem[T]) {
result = &ListItem[T]{Value: value}
item.InsertBefore(result)
return
}
func (item *ListItem[T]) InsertAfter(insert *ListItem[T]) {
if insert.list != nil {
panic("item already in list")
}
insert.list = item.list
insert.Next = item.Next
insert.Pre = item
item.Next.Pre = insert
item.Next = insert
item.list.Length++
}
func (item *ListItem[T]) InsertAfterValue(value T) (result *ListItem[T]) {
result = &ListItem[T]{Value: value}
item.InsertAfter(result)
return
}
func (item *ListItem[T]) IsRoot() bool { func (item *ListItem[T]) IsRoot() bool {
return &item.list.ListItem == item return &item.list.ListItem == item
} }
@@ -53,13 +87,37 @@ func (p *List[T]) Push(item *ListItem[T]) {
p.Pre = &p.ListItem p.Pre = &p.ListItem
p.ListItem.list = p p.ListItem.list = p
} }
item.list = p p.Pre.InsertAfter(item)
item.Next = &p.ListItem // item.list = p
item.Pre = p.Pre // item.Next = &p.ListItem
// p.Value = item.Value // item.Pre = p.Pre
p.Pre.Next = item // // p.Value = item.Value
p.Pre = p.Pre.Next // p.Pre.Next = item
p.Length++ // p.Pre = p.Pre.Next
// p.Length++
}
func (p *List[T]) UnshiftValue(value T) {
p.Unshift(&ListItem[T]{Value: value})
}
func (p *List[T]) Unshift(item *ListItem[T]) {
if item.list != nil {
panic("item already in list")
}
if p.Length == 0 {
p.Next = &p.ListItem
p.Pre = &p.ListItem
p.ListItem.list = p
}
p.Next.InsertBefore(item)
// item.list = p
// item.Next = p.Next
// item.Pre = &p.ListItem
// // p.Value = item.Value
// p.Next.Pre = item
// p.Next = p.Next.Pre
// p.Length++
} }
func (p *List[T]) ShiftValue() T { func (p *List[T]) ShiftValue() T {

View File

@@ -278,3 +278,13 @@ func (p BytesPool) Get(size int) (item *ListItem[Buffer]) {
} }
return return
} }
type Pool[T any] List[T]
func (p *Pool[T]) Get() (item *ListItem[T]) {
item = (*List[T])(p).PoolShift()
if item == nil {
item = &ListItem[T]{}
}
return
}