From fd2c7b8cb6b84e4ba9dac25f415f5c4b10f72706 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Sun, 26 Feb 2023 10:36:37 +0800 Subject: [PATCH] =?UTF-8?q?VideoFrame=E5=92=8CAudioFrame=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?track=E7=9A=84=E6=8C=87=E9=92=88=E6=96=B9=E4=BE=BF=E8=AE=BF?= =?UTF-8?q?=E9=97=AEtrack=EF=BC=8Crtp=E5=86=99=E5=85=A5=E9=87=87=E7=94=A8?= =?UTF-8?q?=E5=86=85=E5=AD=98=E5=A4=8D=E7=94=A8=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 - common/frame.go | 27 ++++++------- common/index.go | 3 +- config/types.go | 1 - memory-ts.go | 12 ++---- publisher-rtpdump.go | 6 ++- subscriber.go | 9 ++++- track/base.go | 20 ++++++++-- track/rtp.go | 91 ++++++++++---------------------------------- util/pool.go | 3 ++ util/reorder.go | 15 +++----- 11 files changed, 78 insertions(+), 110 deletions(-) diff --git a/README.md b/README.md index 71aa61b..6399436 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,6 @@ global: subvideo: true # 是否订阅视频流 iframeonly: false # 只订阅关键帧 waittimeout: 10 # 等待发布者的秒数,用于订阅尚未发布的流 - rtpreorder : true # 启用RTP包乱序重排 enableavcc : true # 启用AVCC格式缓存,用于rtmp协议 enablertp : true # 启用rtp格式缓存,用于rtsp、websocket、gb28181协议 enableauth: true # 启用鉴权,详细查看鉴权机制 diff --git a/common/frame.go b/common/frame.go index 83d6253..9a65dbc 100644 --- a/common/frame.go +++ b/common/frame.go @@ -21,26 +21,27 @@ func SplitAnnexB[T ~[]byte](frame T, process func(T), delimiter []byte) { } type RTPFrame struct { - rtp.Packet + *rtp.Packet + Raw []byte } -func (rtp *RTPFrame) Clone() *RTPFrame { - return &RTPFrame{*rtp.Packet.Clone()} +func (r *RTPFrame) H264Type() (naluType codec.H264NALUType) { + return naluType.Parse(r.Payload[0]) +} +func (r *RTPFrame) H265Type() (naluType codec.H265NALUType) { + return naluType.Parse(r.Payload[0]) } -func (rtp *RTPFrame) H264Type() (naluType codec.H264NALUType) { - return naluType.Parse(rtp.Payload[0]) -} -func (rtp *RTPFrame) H265Type() (naluType codec.H265NALUType) { - return naluType.Parse(rtp.Payload[0]) -} - -func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame { - if err := rtp.Packet.Unmarshal(raw); err != nil { +func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame { + if r.Packet == nil { + r.Packet = &rtp.Packet{} + } + r.Raw = raw + if err := r.Packet.Unmarshal(raw); err != nil { log.Error(err) return nil } - return rtp + return r } type BaseFrame struct { diff --git a/common/index.go b/common/index.go index 60b1d3e..b0eb924 100644 --- a/common/index.go +++ b/common/index.go @@ -98,10 +98,11 @@ type AVTrack interface { Attach() Detach() WriteAVCC(ts uint32, frame *util.BLL) error //写入AVCC格式的数据 - WriteRTP([]byte) + WriteRTP(*util.ListItem[RTPFrame]) WriteRTPPack(*rtp.Packet) Flush() SetSpeedLimit(time.Duration) + GetRTPFromPool() *util.ListItem[RTPFrame] } type VideoTrack interface { AVTrack diff --git a/config/types.go b/config/types.go index d1b5090..c7da92e 100755 --- a/config/types.go +++ b/config/types.go @@ -112,7 +112,6 @@ type Engine struct { Publish Subscribe HTTP - RTPReorder bool `default:"true"` EnableAVCC bool `default:"true"` //启用AVCC格式,rtmp协议使用 EnableRTP bool `default:"true"` //启用RTP格式,rtsp、gb18181等协议使用 EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能 diff --git a/memory-ts.go b/memory-ts.go index bbe345e..a0b0a74 100644 --- a/memory-ts.go +++ b/memory-ts.go @@ -8,7 +8,6 @@ import ( "m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec/mpegts" - "m7s.live/engine/v4/common" "m7s.live/engine/v4/util" ) @@ -141,11 +140,11 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M return nil } -func (ts *MemoryTs) WriteAudioFrame(frame *AudioFrame, aac_asc *codec.AudioSpecificConfig, pes *mpegts.MpegtsPESFrame) (err error) { +func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error) { adtsItem := ts.Get(7) defer adtsItem.Recycle() adts := adtsItem.Value - aac_asc.ToADTS(frame.AUList.ByteLength, adts) + frame.AudioSpecificConfig.ToADTS(frame.AUList.ByteLength, adts) // packetLength = 原始音频流长度 + adts(7) + MpegTsOptionalPESHeader长度(8 bytes, 因为只含有pts) pktLength := len(adts) + frame.AUList.ByteLength + 8 var packet mpegts.MpegTsPESPacket @@ -161,17 +160,14 @@ func (ts *MemoryTs) WriteAudioFrame(frame *AudioFrame, aac_asc *codec.AudioSpeci return ts.WritePESPacket(pes, packet) } -func (ts *MemoryTs) WriteVideoFrame(frame *VideoFrame, paramaterSets common.ParamaterSets, pes *mpegts.MpegtsPESFrame) (err error) { +func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error) { var buffer net.Buffers //需要对原始数据(ES),进行一些预处理,视频需要分割nalu(H264编码),并且打上sps,pps,nalu_aud信息. - if len(paramaterSets) == 2 { + if len(frame.ParamaterSets) == 2 { buffer = append(buffer, codec.NALU_AUD_BYTE) } else { buffer = append(buffer, codec.AudNalu) } - if frame.IFrame { - buffer = append(buffer, paramaterSets.GetAnnexB()...) - } buffer = append(buffer, frame.GetAnnexB()...) pktLength := util.SizeOfBuffers(buffer) + 10 + 3 if pktLength > 0xffff { diff --git a/publisher-rtpdump.go b/publisher-rtpdump.go index c93b183..fa9555b 100644 --- a/publisher-rtpdump.go +++ b/publisher-rtpdump.go @@ -7,7 +7,9 @@ import ( "github.com/pion/webrtc/v3/pkg/media/rtpdump" "go.uber.org/zap" "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/common" "m7s.live/engine/v4/track" + "m7s.live/engine/v4/util" ) type RTPDumpPublisher struct { @@ -57,7 +59,9 @@ func (t *RTPDumpPublisher) OnEvent(event any) { return } if !packet.IsRTCP { - t.VideoTrack.WriteRTP(packet.Payload) + var frame common.RTPFrame + frame.Unmarshal(packet.Payload) + t.VideoTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame}) } // t.AudioTrack.WriteRTP(packet) } diff --git a/subscriber.go b/subscriber.go index 0426a7f..c5efb0d 100644 --- a/subscriber.go +++ b/subscriber.go @@ -33,12 +33,14 @@ type VideoDeConf []byte type AudioDeConf []byte type AudioFrame struct { *AVFrame + *track.Audio AbsTime uint32 PTS uint32 DTS uint32 } type VideoFrame struct { *AVFrame + *track.Video AbsTime uint32 PTS uint32 DTS uint32 @@ -69,6 +71,9 @@ func (f FLVFrame) WriteTo(w io.Writer) (int64, error) { } func (v VideoFrame) GetAnnexB() (r net.Buffers) { + if v.IFrame { + r = v.ParamaterSets.GetAnnexB() + } v.AUList.Range(func(au *util.BLL) bool { r = append(append(r, codec.NALU_Delimiter2), au.ToBuffers()...) return true @@ -196,11 +201,11 @@ func (s *Subscriber) PlayBlock(subType byte) { case SUBTYPE_RAW: sendVideoFrame = func(frame *AVFrame) { // println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame) - spesic.OnEvent(VideoFrame{frame, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipRTPTs, frame.DTS - s.VideoReader.SkipRTPTs}) + spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipRTPTs, frame.DTS - s.VideoReader.SkipRTPTs}) } sendAudioFrame = func(frame *AVFrame) { // println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime) - spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, frame.PTS - s.AudioReader.SkipRTPTs, frame.PTS - s.AudioReader.SkipRTPTs}) + spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, frame.PTS - s.AudioReader.SkipRTPTs, frame.PTS - s.AudioReader.SkipRTPTs}) } case SUBTYPE_RTP: var videoSeq, audioSeq uint16 diff --git a/track/base.go b/track/base.go index 967a0be..2bd13e7 100644 --- a/track/base.go +++ b/track/base.go @@ -4,6 +4,7 @@ import ( "time" "unsafe" + "github.com/pion/rtp" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" "m7s.live/engine/v4/util" @@ -78,9 +79,9 @@ type Media struct { IDRingList `json:"-"` //最近的关键帧位置,首屏渲染 SSRC uint32 SampleRate uint32 - BytesPool util.BytesPool `json:"-"` - rtpPool util.Pool[RTPFrame] - SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) + BytesPool util.BytesPool `json:"-"` + RtpPool util.Pool[RTPFrame] `json:"-"` + SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) SequenceHeadSeq int RTPMuxer RTPDemuxer @@ -89,6 +90,19 @@ type Media struct { 流速控制 } +func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) { + result = av.RtpPool.Get() + if result.Value.Packet == nil { + result.Value.Packet = &rtp.Packet{} + result.Value.PayloadType = av.PayloadType + result.Value.SSRC = av.SSRC + result.Value.Version = 2 + result.Value.Raw = make([]byte, 1460) + result.Value.Payload = result.Value.Raw[:0] + } + return +} + // 毫秒转换为Mpeg时间戳 func (av *Media) Ms2MpegTs(ms uint32) uint32 { return uint32(uint64(ms) * 90) diff --git a/track/rtp.go b/track/rtp.go index 92979a3..211c4f1 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -2,55 +2,31 @@ package track import ( "github.com/pion/rtp" - "go.uber.org/zap" . "m7s.live/engine/v4/common" - "m7s.live/engine/v4/config" "m7s.live/engine/v4/util" ) const RTPMTU = 1400 -func (av *Media) UnmarshalRTPPacket(p *rtp.Packet) (frame *RTPFrame) { - if av.PayloadType != p.PayloadType { - av.Warn("RTP PayloadType error", zap.Uint8("want", av.PayloadType), zap.Uint8("got", p.PayloadType)) - return - } - frame = &RTPFrame{ - Packet: *p, - } - av.Value.BytesIn += len(p.Payload) + 12 - return av.recorderRTP(frame) -} - -func (av *Media) UnmarshalRTP(raw []byte) (frame *RTPFrame) { - var p rtp.Packet - err := p.Unmarshal(raw) - if err != nil { - av.Warn("RTP Unmarshal error", zap.Error(err)) - return - } - return av.UnmarshalRTPPacket(&p) -} - -func (av *Media) writeRTPFrame(frame *RTPFrame) { - if len(frame.Payload) == 0 { - return - } - av.Value.RTP.PushValue(*frame) - av.WriteRTPFrame(frame) -} - -// WriteRTPPack 写入已反序列化的RTP包 +// WriteRTPPack 写入已反序列化的RTP包,已经排序过了的 func (av *Media) WriteRTPPack(p *rtp.Packet) { - for frame := av.UnmarshalRTPPacket(p); frame != nil; frame = av.nextRTPFrame() { - av.writeRTPFrame(frame) + var frame RTPFrame + frame.Packet = p + av.Value.BytesIn += len(frame.Payload) + 12 + av.Value.RTP.PushValue(frame) + if len(p.Payload) > 0 { + av.WriteRTPFrame(&frame) } } -// WriteRTP 写入未反序列化的RTP包 -func (av *Media) WriteRTP(raw []byte) { - for frame := av.UnmarshalRTP(raw); frame != nil; frame = av.nextRTPFrame() { - av.writeRTPFrame(frame) +// WriteRTPFrame 写入未反序列化的RTP包, 未排序的 +func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) { + for frame := av.recorderRTP(raw); frame != nil; frame = av.nextRTPFrame() { + av.Value.BytesIn += len(frame.Value.Payload) + 12 + av.Value.RTP.Push(frame) + if len(frame.Value.Payload) > 0 { + av.WriteRTPFrame(&frame.Value) + } } } @@ -60,14 +36,8 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) { packetCount := len(payloads) for i, pp := range payloads { av.rtpSequence++ - rtpItem := av.rtpPool.Get() + rtpItem := av.GetRTPFromPool() packet := &rtpItem.Value - if packet.Payload == nil { - packet.Payload = make([]byte, 0, RTPMTU) - packet.Version = 2 - packet.PayloadType = av.PayloadType - packet.SSRC = av.SSRC - } packet.Payload = packet.Payload[:0] packet.SequenceNumber = av.rtpSequence if av.SampleRate != 90000 { @@ -86,36 +56,17 @@ func (av *Media) PacketizeRTP(payloads ...[][]byte) { type RTPDemuxer struct { lastSeq uint16 //上一个收到的序号,用于乱序重排 lastSeq2 uint16 //记录上上一个收到的序列号 - 乱序重排 util.RTPReorder[*RTPFrame] + 乱序重排 util.RTPReorder[*util.ListItem[RTPFrame]] } // 获取缓存中下一个rtpFrame -func (av *RTPDemuxer) nextRTPFrame() (frame *RTPFrame) { - if config.Global.RTPReorder { - return av.乱序重排.Pop() - } - return +func (av *RTPDemuxer) nextRTPFrame() (frame *util.ListItem[RTPFrame]) { + return av.乱序重排.Pop() } // 对RTP包乱序重排 -func (av *RTPDemuxer) recorderRTP(frame *RTPFrame) *RTPFrame { - if config.Global.RTPReorder { - return av.乱序重排.Push(frame.SequenceNumber, frame) - } else { - if av.lastSeq == 0 { - av.lastSeq = frame.SequenceNumber - } else if frame.SequenceNumber == av.lastSeq2+1 { // 本次序号是上上次的序号+1 说明中间隔了一个错误序号(某些rtsp流中的rtcp包写成了rtp包导致的) - av.lastSeq = frame.SequenceNumber - } else { - av.lastSeq2 = av.lastSeq - av.lastSeq = frame.SequenceNumber - if av.lastSeq != av.lastSeq2+1 { //序号不连续 - // av.Stream.Warn("RTP SequenceNumber error", av.lastSeq2, av.lastSeq) - return frame - } - } - return frame - } +func (av *RTPDemuxer) recorderRTP(item *util.ListItem[RTPFrame]) *util.ListItem[RTPFrame] { + return av.乱序重排.Push(item.Value.SequenceNumber, item) } type RTPMuxer struct { diff --git a/util/pool.go b/util/pool.go index 6996c3d..b50a4b3 100644 --- a/util/pool.go +++ b/util/pool.go @@ -5,6 +5,9 @@ import ( "net" ) +type Recyclable interface { + Recycle() +} type BLLReader struct { *ListItem[Buffer] pos int diff --git a/util/reorder.go b/util/reorder.go index 89f19b4..0bafd80 100644 --- a/util/reorder.go +++ b/util/reorder.go @@ -1,18 +1,13 @@ package util -type CloneType[T any] interface { - Clone() T - comparable -} - var RTPReorderBufferLen uint16 = 50 // RTPReorder RTP包乱序重排 -type RTPReorder[T CloneType[T]] struct { +type RTPReorder[T comparable] struct { lastSeq uint16 //最新收到的rtp包序号 queue []T // 缓存队列,0号元素位置代表lastReq+1,永远保持为空 - Total uint32 // 总共收到的包数量 - Drop uint32 // 丢弃的包数量 + Total uint32 // 总共收到的包数量 + Drop uint32 // 丢弃的包数量 } func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) { @@ -48,7 +43,7 @@ func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) { head := p.pop() // 可以放得进去了 if delta == RTPReorderBufferLen { - p.queue[RTPReorderBufferLen-1] = v.Clone() + p.queue[RTPReorderBufferLen-1] = v p.queue[0] = result return head } else if head != result { @@ -57,7 +52,7 @@ func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) { } } else { // 出现后面的包先到达,缓存起来 - p.queue[delta-1] = v.Clone() + p.queue[delta-1] = v return } }