From b0dcecdebcd43a35d2e3293461fd9c2ba952ee55 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Wed, 18 Jan 2023 18:08:59 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84Track=E5=86=99=E5=85=A5?= =?UTF-8?q?=E6=9E=B6=E6=9E=84=EF=BC=8C=E5=A2=9E=E5=8A=A0sub=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E9=A1=B9=E5=8F=AF=E9=85=8D=E7=BD=AE=E6=8C=87=E5=AE=9A?= =?UTF-8?q?track=E8=AE=A2=E9=98=85=E7=9A=84=E5=8F=82=E6=95=B0=E5=90=8D=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=A6=96=E6=AC=A1=E5=86=99=E5=85=A5AbsTime?= =?UTF-8?q?=E7=9A=84=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- codec/h264.go | 4 +- codec/h265.go | 4 ++ common/frame.go | 22 ++----- common/index.go | 11 ++-- config/types.go | 32 ++++++++--- publisher-ts.go | 3 +- stream.go | 6 +- subscriber.go | 24 ++++---- track/aac.go | 94 ++++++++++++++++++------------ track/audio.go | 44 ++++++-------- track/base.go | 75 ++++++++---------------- track/g711.go | 26 ++++----- track/h264.go | 146 ++++++++++++++++++++--------------------------- track/h265.go | 148 ++++++++++++++++++++---------------------------- track/rtp.go | 50 +++++++++++++--- track/video.go | 138 ++++++++++++++++++++++---------------------- 16 files changed, 396 insertions(+), 431 deletions(-) diff --git a/codec/h264.go b/codec/h264.go index 6f15672..d78b781 100644 --- a/codec/h264.go +++ b/codec/h264.go @@ -40,7 +40,9 @@ func (b H264NALUType) Offset() int { func (b H264NALUType) Byte() byte { return byte(b) } - +func ParseH264NALUType(b byte) H264NALUType { + return H264NALUType(b & 0x1F) +} func (H264NALUType) Parse(b byte) H264NALUType { return H264NALUType(b & 0x1F) } diff --git a/codec/h265.go b/codec/h265.go index 22d8e54..0340d2a 100644 --- a/codec/h265.go +++ b/codec/h265.go @@ -14,6 +14,10 @@ func (H265NALUType) Parse(b byte) H265NALUType { return H265NALUType(b & 0x7E >> 1) } +func ParseH265NALUType(b byte) H265NALUType { + return H265NALUType(b & 0x7E >> 1) +} + const ( // HEVC_VPS = 0x40 // HEVC_SPS = 0x42 diff --git a/common/frame.go b/common/frame.go index bac25a7..987f0bc 100644 --- a/common/frame.go +++ b/common/frame.go @@ -10,18 +10,6 @@ import ( ) type NALUSlice net.Buffers - -// type H264Slice NALUSlice -// type H265Slice NALUSlice - -// type H264NALU []H264Slice -// type H265NALU []H265Slice - -type AudioSlice []byte - -// type AACSlice AudioSlice -// type G711Slice AudioSlice - // 裸数据片段 type RawSlice interface { ~[][]byte | ~[]byte @@ -43,10 +31,13 @@ func (nalu NALUSlice) Bytes() (b []byte) { return } -func (nalu *NALUSlice) Reset() *NALUSlice { +func (nalu *NALUSlice) Reset(b ...[]byte) *NALUSlice { if len(*nalu) > 0 { *nalu = (*nalu)[:0] } + if len(b) > 0 { + *nalu = append(*nalu, b...) + } return nalu } @@ -111,12 +102,11 @@ type DataFrame[T any] struct { type AVFrame[T RawSlice] struct { BaseFrame IFrame bool - SEI T PTS uint32 DTS uint32 - AVCC net.Buffers `json:"-"` // 打包好的AVCC格式 + AVCC net.Buffers `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format) RTP []*RTPFrame `json:"-"` - Raw []T `json:"-"` // 裸数据 + Raw []T `json:"-"` // 裸数据,通常代表Access Unit canRead bool } diff --git a/common/index.go b/common/index.go index 61b0b01..b84649a 100644 --- a/common/index.go +++ b/common/index.go @@ -80,17 +80,16 @@ type VideoTrack interface { GetDecoderConfiguration() DecoderConfiguration[NALUSlice] CurrentFrame() *AVFrame[NALUSlice] PreFrame() *AVFrame[NALUSlice] - WriteSlice(NALUSlice) + WriteSliceBytes(slice []byte) WriteAnnexB(uint32, uint32, AnnexBFrame) SetLostFlag() } type AudioTrack interface { AVTrack - GetDecoderConfiguration() DecoderConfiguration[AudioSlice] - CurrentFrame() *AVFrame[AudioSlice] - PreFrame() *AVFrame[AudioSlice] - WriteSlice(AudioSlice) + GetDecoderConfiguration() DecoderConfiguration[[]byte] + CurrentFrame() *AVFrame[[]byte] + PreFrame() *AVFrame[[]byte] WriteADTS([]byte) - WriteRaw(uint32, AudioSlice) + WriteRaw(uint32, []byte) } diff --git a/config/types.go b/config/types.go index ead482c..c1b2f17 100755 --- a/config/types.go +++ b/config/types.go @@ -40,13 +40,16 @@ func (c *Publish) GetPublishConfig() *Publish { } type Subscribe struct { - SubAudio bool - SubVideo bool - SubAudioTracks []string // 指定订阅的音频轨道 - SubVideoTracks []string // 指定订阅的视频轨道 - LiveMode bool // 实时模式:追赶发布者进度,在播放首屏后等待发布者的下一个关键帧,然后调到该帧。 - IFrameOnly bool // 只要关键帧 - WaitTimeout int // 等待流超时 + SubAudio bool + SubVideo bool + SubVideoArgName string // 指定订阅的视频轨道参数名 + SubAudioArgName string // 指定订阅的音频轨道参数名 + SubDataArgName string // 指定订阅的数据轨道参数名 + SubAudioTracks []string // 指定订阅的音频轨道 + SubVideoTracks []string // 指定订阅的视频轨道 + LiveMode bool // 实时模式:追赶发布者进度,在播放首屏后等待发布者的下一个关键帧,然后调到该帧。 + IFrameOnly bool // 只要关键帧 + WaitTimeout int // 等待流超时 } func (c *Subscribe) GetSubscribeConfig() *Subscribe { @@ -117,8 +120,19 @@ type Engine struct { } var Global = &Engine{ - Publish: Publish{true, true, false, 10, 0, 0}, - Subscribe: Subscribe{true, true, nil, nil, true, false, 10}, + Publish: Publish{true, true, false, 10, 0, 0}, + Subscribe: Subscribe{ + SubAudio: true, + SubVideo: true, + SubVideoArgName: "vts", + SubAudioArgName: "ats", + SubDataArgName: "dts", + SubAudioTracks: nil, + SubVideoTracks: nil, + LiveMode: true, + IFrameOnly: false, + WaitTimeout: 10, + }, HTTP: HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux}, RTPReorder: true, EnableAVCC: true, diff --git a/publisher-ts.go b/publisher-ts.go index 7942f9e..788acaa 100644 --- a/publisher-ts.go +++ b/publisher-ts.go @@ -84,8 +84,7 @@ func (t *TSPublisher) ReadPES() { if frameLen > remainLen { break } - - t.AudioTrack.WriteSlice(pes.Payload[7:frameLen]) + t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload[:frameLen]) pes.Payload = pes.Payload[frameLen:remainLen] remainLen -= frameLen } diff --git a/stream.go b/stream.go index 96a22ed..a8d346a 100644 --- a/stream.go +++ b/stream.go @@ -387,21 +387,21 @@ func (s *Stream) run() { waits := &waitTracks{ Promise: v, } - if ats := io.Args.Get("ats"); ats != "" { + if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" { waits.audio.Wait(strings.Split(ats, ",")...) } else if len(sbConfig.SubAudioTracks) > 0 { waits.audio.Wait(sbConfig.SubAudioTracks...) } else if sbConfig.SubAudio { waits.audio.Wait() } - if vts := io.Args.Get("vts"); vts != "" { + if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" { waits.video.Wait(strings.Split(vts, ",")...) } else if len(sbConfig.SubVideoTracks) > 0 { waits.video.Wait(sbConfig.SubVideoTracks...) } else if sbConfig.SubVideo { waits.video.Wait() } - if dts := io.Args.Get("dts"); dts != "" { + if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" { waits.data.Wait(strings.Split(dts, ",")...) } else { // waits.data.Wait() diff --git a/subscriber.go b/subscriber.go index 2d59cdf..622cb31 100644 --- a/subscriber.go +++ b/subscriber.go @@ -27,9 +27,9 @@ const ( SUBSTATE_NORMAL ) -type AudioFrame AVFrame[AudioSlice] +type AudioFrame AVFrame[[]byte] type VideoFrame AVFrame[NALUSlice] -type AudioDeConf DecoderConfiguration[AudioSlice] +type AudioDeConf DecoderConfiguration[[]byte] type VideoDeConf DecoderConfiguration[NALUSlice] type FLVFrame net.Buffers type AudioRTP RTPFrame @@ -47,10 +47,6 @@ func (f FLVFrame) WriteTo(w io.Writer) (int64, error) { // return append(r, b...) // } func (v *VideoFrame) GetAnnexB() (r net.Buffers) { - if v.SEI != nil { - r = append(r, codec.NALU_Delimiter2) - r = append(r, v.SEI...) - } r = append(r, codec.NALU_Delimiter2) for i, nalu := range v.Raw { if i > 0 { @@ -103,7 +99,7 @@ func (p *PlayContext[T, R]) decConfChanged() bool { type TrackPlayer struct { context.Context `json:"-"` context.CancelFunc `json:"-"` - Audio PlayContext[*track.Audio, AudioSlice] + Audio PlayContext[*track.Audio, []byte] Video PlayContext[*track.Video, NALUSlice] SkipTS uint32 //跳过的时间戳 FirstAbsTS uint32 //订阅起始时间戳 @@ -115,7 +111,7 @@ func (tp *TrackPlayer) ReadVideo() (vp *AVFrame[NALUSlice]) { return } -func (tp *TrackPlayer) ReadAudio() (ap *AVFrame[AudioSlice]) { +func (tp *TrackPlayer) ReadAudio() (ap *AVFrame[[]byte]) { ap = tp.Audio.ring.Read(tp.Context) tp.Audio.Frame = ap return @@ -197,19 +193,19 @@ func (s *Subscriber) PlayBlock(subType byte) { s.Video.confSeq = s.Video.Track.DecoderConfiguration.Seq spesic.OnEvent(VideoDeConf(s.Video.Track.DecoderConfiguration)) } - sendAudioDecConf := func(frame *AVFrame[AudioSlice]) { + sendAudioDecConf := func(frame *AVFrame[[]byte]) { s.Audio.confSeq = s.Audio.Track.DecoderConfiguration.Seq spesic.OnEvent(AudioDeConf(s.Audio.Track.DecoderConfiguration)) } var sendVideoFrame func(*AVFrame[NALUSlice]) - var sendAudioFrame func(*AVFrame[AudioSlice]) + var sendAudioFrame func(*AVFrame[[]byte]) switch subType { case SUBTYPE_RAW: sendVideoFrame = func(frame *AVFrame[NALUSlice]) { // println(frame.Sequence, frame.AbsTime, frame.PTS, frame.DTS, frame.IFrame) spesic.OnEvent((*VideoFrame)(frame)) } - sendAudioFrame = func(frame *AVFrame[AudioSlice]) { + sendAudioFrame = func(frame *AVFrame[[]byte]) { spesic.OnEvent((*AudioFrame)(frame)) } case SUBTYPE_RTP: @@ -223,7 +219,7 @@ func (s *Subscriber) PlayBlock(subType byte) { spesic.OnEvent((VideoRTP)(vp)) } } - sendAudioFrame = func(frame *AVFrame[AudioSlice]) { + sendAudioFrame = func(frame *AVFrame[[]byte]) { for _, p := range frame.RTP { audioSeq++ vp := *p @@ -249,7 +245,7 @@ func (s *Subscriber) PlayBlock(subType byte) { sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, frame.AbsTime, s.Video.Track.DecoderConfiguration.AVCC) // spesic.OnEvent(FLVFrame(copyBuffers(s.Video.Track.DecoderConfiguration.FLV))) } - sendAudioDecConf = func(frame *AVFrame[AudioSlice]) { + sendAudioDecConf = func(frame *AVFrame[[]byte]) { s.Audio.confSeq = s.Audio.Track.DecoderConfiguration.Seq ts := s.SkipTS if frame != nil { @@ -262,7 +258,7 @@ func (s *Subscriber) PlayBlock(subType byte) { // println(frame.Sequence, frame.AbsTime, frame.DeltaTime, frame.IFrame) sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, frame.AbsTime, frame.AVCC) } - sendAudioFrame = func(frame *AVFrame[AudioSlice]) { + sendAudioFrame = func(frame *AVFrame[[]byte]) { sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, frame.AbsTime, frame.AVCC) } } diff --git a/track/aac.go b/track/aac.go index d4cfa84..a0d59b5 100644 --- a/track/aac.go +++ b/track/aac.go @@ -10,9 +10,15 @@ import ( "m7s.live/engine/v4/util" ) +var _ SpesificTrack[[]byte] = (*AAC)(nil) + func NewAAC(stream IStream) (aac *AAC) { - aac = &AAC{} + aac = &AAC{ + sizeLength: 13, + Mode: 2, + } aac.CodecID = codec.CodecID_AAC + aac.Channels = 2 aac.SampleSize = 16 aac.SetStuff("aac", stream, int(32), byte(97), aac, time.Millisecond*10) aac.AVCCHead = []byte{0xAF, 1} @@ -21,29 +27,49 @@ func NewAAC(stream IStream) (aac *AAC) { type AAC struct { Audio - buffer []byte + sizeLength int // 通常为13 + Mode int // 1为lbr,2为hbr + lack int // 用于处理不完整的AU,缺少的字节数 } -func (aac *AAC) writeRTPFrame(frame *RTPFrame) { - aac.Audio.Media.AVRing.RingBuffer.Value.AppendRTP(frame) - auHeaderLen := util.ReadBE[int](frame.Payload[:2]) >> 3 - startOffset := 2 + auHeaderLen - if !frame.Marker { - aac.buffer = append(aac.buffer, frame.Payload[startOffset:]...) +// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 +func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { + auHeaderLen := util.ReadBE[int](frame.Payload[:aac.Mode]) >> 3 //通常为2,即一个AU Header的长度 + // auHeaderCount := auHeaderLen >> 1 // AU Header的个数, 通常为1 + if auHeaderLen == 0 { + aac.Value.AppendRaw(frame.Payload) } else { - if aac.buffer != nil { - aac.buffer = append(append([]byte{}, frame.Payload...), aac.buffer...) - } else { - aac.buffer = frame.Payload + startOffset := aac.Mode + auHeaderLen // 实际数据开始的位置 + if aac.lack > 0 { + rawLen := len(aac.Value.Raw) + if rawLen == 0 { + aac.Stream.Error("lack >0 but rawlen=0") + } + last := util.Buffer(aac.Value.Raw[rawLen-1]) + auLen := len(frame.Payload) - startOffset + if aac.lack > auLen { + last.Write(frame.Payload[startOffset:]) + aac.lack -= auLen + return + } else if aac.lack < auLen { + aac.Stream.Warn("lack < auLen", zap.Int("lack", aac.lack), zap.Int("auLen", auLen)) + } + last.Write(frame.Payload[startOffset : startOffset+aac.lack]) + aac.lack = 0 + return } - for iIndex := 2; iIndex <= auHeaderLen; iIndex += 2 { - auLen := util.ReadBE[int](aac.buffer[iIndex:iIndex+2]) >> 3 - aac.WriteSlice(aac.buffer[startOffset : startOffset+auLen]) - startOffset += auLen + for iIndex := aac.Mode; iIndex <= auHeaderLen; iIndex += aac.Mode { + auLen := util.ReadBE[int](frame.Payload[iIndex:iIndex+aac.Mode]) >> (8*aac.Mode - aac.sizeLength) //取高13bit代表AU的长度 + nextPos := startOffset + auLen + if len(frame.Payload) < nextPos { + aac.lack = nextPos - len(frame.Payload) + aac.Value.AppendRaw(frame.Payload[startOffset:]) + break + } else { + aac.Value.AppendRaw(frame.Payload[startOffset:nextPos]) + } + startOffset = nextPos } - aac.generateTimestamp() - aac.Flush() - aac.buffer = nil } } @@ -58,28 +84,24 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) { aac.Profile = (config1 & 0xF8) >> 3 aac.Channels = ((config2 >> 3) & 0x0F) //声道 aac.Audio.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]) - aac.Audio.DecoderConfiguration.Raw = AudioSlice(frame[2:]) + aac.Audio.DecoderConfiguration.Raw = frame[2:] aac.Attach() } else { - aac.WriteSlice(AudioSlice(frame[2:])) + aac.Value.AppendRaw(frame[2:]) aac.Audio.WriteAVCC(ts, frame) - aac.Flush() } } -func (aac *AAC) Flush() { - // RTP格式补完 - value := aac.Audio.Media.RingBuffer.Value - if aac.ComplementRTP() { - l := util.SizeOfBuffers(value.Raw) - var packet = make(net.Buffers, len(value.Raw)+1) - //AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度;又因为单个auheader字节长度2字节,所以再除以2就是auheader的个数。 - packet[0] = []byte{0x00, 0x10, (byte)((l & 0x1fe0) >> 5), (byte)((l & 0x1f) << 3)} - for i, raw := range value.Raw { - packet[i+1] = raw - } - packets := util.SplitBuffers(packet, 1200) - aac.PacketizeRTP(packets...) +func (aac *AAC) CompleteRTP(value *AVFrame[[]byte]) { + l := util.SizeOfBuffers(value.Raw) + //AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度;又因为单个auheader字节长度2字节,所以再除以2就是auheader的个数。 + auHeaderLen := []byte{0x00, 0x10, (byte)((l & 0x1fe0) >> 5), (byte)((l & 0x1f) << 3)} // 3 = 16-13, 5 = 8-3 + packets := util.SplitBuffers(value.Raw, 1200) + for i, packet := range packets { + expand := append(packet, nil) + copy(expand[1:], packet) + expand[0] = auHeaderLen + packets[i] = expand } - aac.Audio.Flush() + aac.PacketizeRTP(packets...) } diff --git a/track/audio.go b/track/audio.go index 481af1a..1eaee64 100644 --- a/track/audio.go +++ b/track/audio.go @@ -7,11 +7,8 @@ import ( . "m7s.live/engine/v4/common" ) -var adcflv1 = []byte{codec.FLV_TAG_TYPE_AUDIO, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0} -var adcflv2 = []byte{0, 0, 0, 15} - type Audio struct { - Media[AudioSlice] + Media[[]byte] CodecID codec.AudioCodecID Channels byte SampleSize byte @@ -75,7 +72,7 @@ func (a *Audio) WriteADTS(adts []byte) { a.SampleRate = uint32(codec.SamplingFrequencies[sampleRate]) a.Channels = channel avcc := []byte{0xAF, 0x00, config1, config2} - a.DecoderConfiguration = DecoderConfiguration[AudioSlice]{ + a.DecoderConfiguration = DecoderConfiguration[[]byte]{ 97, net.Buffers{avcc}, avcc[2:], @@ -84,41 +81,32 @@ func (a *Audio) WriteADTS(adts []byte) { a.Attach() } -func (av *Audio) WriteRaw(pts uint32, raw AudioSlice) { +func (av *Audio) WriteRaw(pts uint32, raw []byte) { curValue := &av.Value curValue.BytesIn += len(raw) if len(av.AVCCHead) == 2 { raw = raw[7:] //AAC 去掉7个字节的ADTS头 } - av.WriteSlice(raw) - curValue.DTS = pts - curValue.PTS = pts + curValue.AppendRaw(raw) + av.generateTimestamp(pts) av.Flush() } func (av *Audio) WriteAVCC(ts uint32, frame AVCCFrame) { - curValue := &av.AVRing.RingBuffer.Value + curValue := &av.Value curValue.BytesIn += len(frame) curValue.AppendAVCC(frame) - curValue.DTS = ts * 90 - curValue.PTS = curValue.DTS + av.generateTimestamp(ts * 90) + av.Flush() } -func (a *Audio) Flush() { - // AVCC 格式补完 - value := &a.Value - if a.ComplementAVCC() { - value.AppendAVCC(a.AVCCHead) - for _, raw := range value.Raw { - value.AppendAVCC(raw) - } +func (a *Audio) CompleteAVCC(value *AVFrame[[]byte]) { + value.AppendAVCC(a.AVCCHead) + for _, raw := range value.Raw { + value.AppendAVCC(raw) } - if a.ComplementRTP() { - var packet = make(net.Buffers, len(value.Raw)) - for i, raw := range value.Raw { - packet[i] = raw - } - a.PacketizeRTP(packet) - } - a.Media.Flush() +} + +func (a *Audio) CompleteRTP(value *AVFrame[[]byte]) { + a.PacketizeRTP(value.Raw) } diff --git a/track/base.go b/track/base.go index 09c37aa..d721710 100644 --- a/track/base.go +++ b/track/base.go @@ -44,6 +44,15 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) { } } +type SpesificTrack[T RawSlice] interface { + CompleteRTP(*AVFrame[T]) + CompleteAVCC(*AVFrame[T]) + WriteSliceBytes([]byte) + WriteRTPFrame(*RTPFrame) + generateTimestamp(uint32) + Flush() +} + // Media 基础媒体Track类 type Media[T RawSlice] struct { Base @@ -53,6 +62,7 @@ type Media[T RawSlice] struct { DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) RTPMuxer RTPDemuxer + SpesificTrack[T] `json:"-"` 流速控制 } @@ -77,8 +87,8 @@ func (av *Media[T]) SetStuff(stuff ...any) { av.DecoderConfiguration.PayloadType = v case IStream: av.Stream = v - case RTPWriter: - av.RTPWriter = v + case SpesificTrack[T]: + av.SpesificTrack = v } } } @@ -107,20 +117,15 @@ func (av *Media[T]) GetDecoderConfiguration() DecoderConfiguration[T] { } func (av *Media[T]) CurrentFrame() *AVFrame[T] { - return &av.AVRing.RingBuffer.Value + return &av.Value } func (av *Media[T]) PreFrame() *AVFrame[T] { - return av.AVRing.RingBuffer.LastValue + return av.LastValue } -func (av *Media[T]) generateTimestamp() { - ts := av.AVRing.RingBuffer.Value.RTP[0].Timestamp - av.AVRing.RingBuffer.Value.PTS = ts - av.AVRing.RingBuffer.Value.DTS = ts -} - -func (av *Media[T]) WriteSlice(slice T) { - av.Value.AppendRaw(slice) +func (av *Media[T]) generateTimestamp(ts uint32) { + av.Value.PTS = ts + av.Value.DTS = ts } func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) { @@ -135,7 +140,14 @@ func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) { func (av *Media[T]) Flush() { curValue, preValue := &av.Value, av.LastValue + if config.Global.EnableRTP && len(curValue.RTP) == 0 { + av.CompleteRTP(curValue) + } + if config.Global.EnableAVCC && len(curValue.AVCC) == 0 { + av.CompleteAVCC(curValue) + } if av.起始时间.IsZero() { + curValue.AbsTime = curValue.DTS / 90 av.重置(curValue.AbsTime) } else { curValue.DeltaTime = (curValue.DTS - preValue.DTS) / 90 @@ -148,42 +160,3 @@ func (av *Media[T]) Flush() { } av.Step() } - -func (av *Media[T]) ComplementAVCC() bool { - return config.Global.EnableAVCC && len(av.Value.AVCC) == 0 -} - -// 是否需要补完RTP格式 -func (av *Media[T]) ComplementRTP() bool { - return config.Global.EnableRTP && len(av.Value.RTP) == 0 -} - -// https://www.cnblogs.com/moonwalk/p/15903760.html -// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets -func (av *Media[T]) PacketizeRTP(payloads ...[][]byte) { - packetCount := len(payloads) - if cap(av.Value.RTP) < packetCount { - av.Value.RTP = make([]*RTPFrame, packetCount) - } else { - av.Value.RTP = av.Value.RTP[:packetCount] - } - for i, pp := range payloads { - av.rtpSequence++ - packet := av.Value.RTP[i] - if packet == nil { - packet = &RTPFrame{} - av.Value.RTP[i] = packet - packet.Version = 2 - packet.PayloadType = av.DecoderConfiguration.PayloadType - packet.Payload = make([]byte, 0, 1200) - packet.SSRC = av.SSRC - } - packet.Payload = packet.Payload[:0] - packet.SequenceNumber = av.rtpSequence - packet.Timestamp = av.Value.PTS - packet.Marker = i == packetCount-1 - for _, p := range pp { - packet.Payload = append(packet.Payload, p...) - } - } -} diff --git a/track/g711.go b/track/g711.go index 89e5181..f304283 100644 --- a/track/g711.go +++ b/track/g711.go @@ -11,18 +11,20 @@ import ( func NewG711(stream IStream, alaw bool) (g711 *G711) { g711 = &G711{} if alaw { - g711.Audio.Name = "pcma" + g711.Name = "pcma" } else { - g711.Audio.Name = "pcmu" + g711.Name = "pcmu" } if alaw { - g711.Audio.CodecID = codec.CodecID_PCMA + g711.CodecID = codec.CodecID_PCMA } else { - g711.Audio.CodecID = codec.CodecID_PCMU + g711.CodecID = codec.CodecID_PCMU } - g711.Audio.SampleSize = 8 + g711.SampleSize = 8 + g711.Channels = 1 + g711.AVCCHead = []byte{(byte(g711.CodecID) << 4) | (1 << 1)} g711.SetStuff(stream, int(32), byte(97), uint32(8000), g711, time.Millisecond*10) - g711.Audio.Attach() + g711.Attach() return } @@ -35,16 +37,10 @@ func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) { g711.Stream.Error("AVCC data too short", zap.ByteString("data", frame)) return } - g711.WriteSlice(AudioSlice(frame[1:])) + g711.Value.AppendRaw(frame[1:]) g711.Audio.WriteAVCC(ts, frame) - g711.Flush() } -func (g711 *G711) writeRTPFrame(frame *RTPFrame) { - g711.WriteSlice(frame.Payload) - g711.Audio.Media.AVRing.RingBuffer.Value.AppendRTP(frame) - if frame.Marker { - g711.generateTimestamp() - g711.Flush() - } +func (g711 *G711) WriteRTPFrame(frame *RTPFrame) { + g711.Value.AppendRaw(frame.Payload) } diff --git a/track/h264.go b/track/h264.go index 86ec3ca..8f57588 100644 --- a/track/h264.go +++ b/track/h264.go @@ -7,10 +7,11 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" - "m7s.live/engine/v4/config" "m7s.live/engine/v4/util" ) +var _ SpesificTrack[NALUSlice] = (*H264)(nil) + type H264 struct { Video } @@ -23,30 +24,17 @@ func NewH264(stream IStream) (vt *H264) { vt.dtsEst = NewDTSEstimator() return } -func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { - if dts == 0 { - vt.generateTimestamp(pts) - } else { - vt.Value.PTS = pts - vt.Value.DTS = dts - } - for _, slice := range vt.Video.WriteAnnexB(frame) { - vt.WriteSlice(slice) - } - if len(vt.Value.Raw) > 0 { - vt.Flush() - } - // println(vt.FPS) -} -func (vt *H264) WriteSlice(slice NALUSlice) { - // print(slice.H264Type()) - switch slice.H264Type() { + +func (vt *H264) WriteSliceBytes(slice []byte) { + naluType := codec.ParseH264NALUType(slice[0]) + // println("naluType", naluType) + switch naluType { case codec.NALU_SPS: - vt.SPSInfo, _ = codec.ParseSPS(slice[0]) - vt.Video.DecoderConfiguration.Raw[0] = slice[0] + vt.SPSInfo, _ = codec.ParseSPS(slice) + vt.Video.DecoderConfiguration.Raw[0] = slice case codec.NALU_PPS: vt.dcChanged = true - vt.Video.DecoderConfiguration.Raw[1] = slice[0] + vt.Video.DecoderConfiguration.Raw[1] = slice lenSPS := len(vt.Video.DecoderConfiguration.Raw[0]) lenPPS := len(vt.Video.DecoderConfiguration.Raw[1]) if lenSPS > 3 { @@ -61,15 +49,18 @@ func (vt *H264) WriteSlice(slice NALUSlice) { vt.Video.DecoderConfiguration.Seq++ case codec.NALU_IDR_Picture: vt.Value.IFrame = true - vt.Video.WriteSlice(slice) + vt.WriteRawBytes(slice) case codec.NALU_Non_IDR_Picture, codec.NALU_Data_Partition_A, codec.NALU_Data_Partition_B, codec.NALU_Data_Partition_C: vt.Value.IFrame = false - vt.Video.WriteSlice(slice) + vt.WriteRawBytes(slice) case codec.NALU_SEI: - vt.Value.SEI = slice + vt.WriteRawBytes(slice) + case codec.NALU_Access_Unit_Delimiter: + default: + vt.Stream.Error("H264 WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType))) } } @@ -94,21 +85,20 @@ func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) { } } else { vt.Video.WriteAVCC(ts, frame) - vt.Video.Media.RingBuffer.Value.IFrame = frame.IsIDR() - vt.Flush() } } -func (vt *H264) writeRTPFrame(frame *RTPFrame) { - rv := &vt.Video.Media.RingBuffer.Value + +func (vt *H264) WriteRTPFrame(frame *RTPFrame) { + rv := &vt.Value if naluType := frame.H264Type(); naluType < 24 { - vt.WriteSlice(NALUSlice{frame.Payload}) + vt.WriteSliceBytes(frame.Payload) } else { switch naluType { case codec.NALU_STAPA, codec.NALU_STAPB: for buffer := util.Buffer(frame.Payload[naluType.Offset():]); buffer.CanRead(); { nextSize := int(buffer.ReadUint16()) if buffer.Len() >= nextSize { - vt.WriteSlice(NALUSlice{buffer.ReadN(nextSize)}) + vt.WriteSliceBytes(buffer.ReadN(nextSize)) } else { vt.Stream.Error("invalid nalu size", zap.Int("naluType", int(naluType))) return @@ -116,7 +106,7 @@ func (vt *H264) writeRTPFrame(frame *RTPFrame) { } case codec.NALU_FUA, codec.NALU_FUB: if util.Bit1(frame.Payload[1], 0) { - rv.AppendRaw(NALUSlice{[]byte{naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60)}}) + vt.WriteSliceByte(naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60)) } // 最后一个是半包缓存,用于拼接 lastIndex := len(rv.Raw) - 1 @@ -124,65 +114,49 @@ func (vt *H264) writeRTPFrame(frame *RTPFrame) { return } rv.Raw[lastIndex].Append(frame.Payload[naluType.Offset():]) - if util.Bit1(frame.Payload[1], 1) { - complete := rv.Raw[lastIndex] //拼接完成 - rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去 - vt.WriteSlice(complete) - } + // if util.Bit1(frame.Payload[1], 1) { + // complete := rv.Raw[lastIndex] //拼接完成 + + // } } } frame.SequenceNumber += vt.rtpSequence //增加偏移,需要增加rtp包后需要顺延 - rv.AppendRTP(frame) - if frame.Marker { - vt.generateTimestamp(frame.Timestamp) - vt.Flush() - } } -func (vt *H264) Flush() { - if vt.Video.Media.RingBuffer.Value.IFrame { - vt.Video.ComputeGOP() - } - if vt.Attached == 0 && vt.IDRing != nil && vt.DecoderConfiguration.Seq > 0 { - defer vt.Attach() - } - // RTP格式补完 - if config.Global.EnableRTP { - if len(vt.Value.RTP) > 0 { - if !vt.dcChanged && vt.Value.IFrame { - vt.insertDCRtp() - } - } else { - var out [][][]byte - if vt.Value.IFrame { - out = append(out, [][]byte{vt.DecoderConfiguration.Raw[0]}, [][]byte{vt.DecoderConfiguration.Raw[1]}) - } - for _, nalu := range vt.Value.Raw { - buffers := util.SplitBuffers(nalu, 1200) - firstBuffer := NALUSlice(buffers[0]) - if l := len(buffers); l == 1 { - out = append(out, firstBuffer) - } else { - naluType := firstBuffer.H264Type() - firstByte := codec.NALU_FUA.Or(firstBuffer.RefIdc()) - buf := [][]byte{{firstByte, naluType.Or(1 << 7)}} - for i, sp := range firstBuffer { - if i == 0 { - sp = sp[1:] - } - buf = append(buf, sp) - } - out = append(out, buf) - for _, bufs := range buffers[1:] { - buf = append([][]byte{{firstByte, naluType.Byte()}}, bufs...) - out = append(out, buf) - } - buf[0][1] |= 1 << 6 // set end bit - } - } - - vt.PacketizeRTP(out...) +// RTP格式补完 +func (vt *H264) CompleteRTP(value *AVFrame[NALUSlice]) { + if len(value.RTP) > 0 { + if !vt.dcChanged && value.IFrame { + vt.insertDCRtp() } + } else { + var out [][][]byte + if value.IFrame { + out = append(out, [][]byte{vt.DecoderConfiguration.Raw[0]}, [][]byte{vt.DecoderConfiguration.Raw[1]}) + } + for _, nalu := range value.Raw { + buffers := util.SplitBuffers(nalu, 1200) + firstBuffer := NALUSlice(buffers[0]) + if l := len(buffers); l == 1 { + out = append(out, firstBuffer) + } else { + naluType := firstBuffer.H264Type() + firstByte := codec.NALU_FUA.Or(firstBuffer.RefIdc()) + buf := [][]byte{{firstByte, naluType.Or(1 << 7)}} + for i, sp := range firstBuffer { + if i == 0 { + sp = sp[1:] + } + buf = append(buf, sp) + } + out = append(out, buf) + for _, bufs := range buffers[1:] { + buf = append([][]byte{{firstByte, naluType.Byte()}}, bufs...) + out = append(out, buf) + } + buf[0][1] |= 1 << 6 // set end bit + } + } + vt.PacketizeRTP(out...) } - vt.Video.Flush() } diff --git a/track/h265.go b/track/h265.go index a48aa68..85e6e64 100644 --- a/track/h265.go +++ b/track/h265.go @@ -7,10 +7,11 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" - "m7s.live/engine/v4/config" "m7s.live/engine/v4/util" ) +var _ SpesificTrack[NALUSlice] = (*H265)(nil) + type H265 struct { Video } @@ -23,32 +24,17 @@ func NewH265(stream IStream) (vt *H265) { vt.dtsEst = NewDTSEstimator() return } -func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { - if dts == 0 { - vt.generateTimestamp(pts) - } else { - vt.Video.Media.RingBuffer.Value.PTS = pts - vt.Video.Media.RingBuffer.Value.DTS = dts - } - // println(pts,dts,len(frame)) - for _, slice := range vt.Video.WriteAnnexB(frame) { - vt.WriteSlice(slice) - } - if len(vt.Value.Raw) > 0 { - vt.Flush() - } -} -func (vt *H265) WriteSlice(slice NALUSlice) { - // println(slice.H265Type()) - switch slice.H265Type() { + +func (vt *H265) WriteSliceBytes(slice []byte) { + switch t := codec.ParseH265NALUType(slice[0]); t { case codec.NAL_UNIT_VPS: - vt.Video.DecoderConfiguration.Raw[0] = slice[0] + vt.Video.DecoderConfiguration.Raw[0] = slice case codec.NAL_UNIT_SPS: - vt.Video.DecoderConfiguration.Raw[1] = slice[0] - vt.Video.SPSInfo, _ = codec.ParseHevcSPS(slice[0]) + vt.Video.DecoderConfiguration.Raw[1] = slice + vt.Video.SPSInfo, _ = codec.ParseHevcSPS(slice) case codec.NAL_UNIT_PPS: vt.Video.dcChanged = true - vt.Video.DecoderConfiguration.Raw[2] = slice[0] + vt.Video.DecoderConfiguration.Raw[2] = slice extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.Video.DecoderConfiguration.Raw[0], vt.Video.DecoderConfiguration.Raw[1], vt.Video.DecoderConfiguration.Raw[2]) if err == nil { vt.Video.DecoderConfiguration.AVCC = net.Buffers{extraData} @@ -62,16 +48,17 @@ func (vt *H265) WriteSlice(slice NALUSlice) { codec.NAL_UNIT_CODED_SLICE_IDR_N_LP, codec.NAL_UNIT_CODED_SLICE_CRA: vt.Value.IFrame = true - vt.Video.WriteSlice(slice) + vt.WriteRawBytes(slice) case 0, 1, 2, 3, 4, 5, 6, 7, 8, 9: vt.Value.IFrame = false - vt.Video.WriteSlice(slice) + vt.WriteRawBytes(slice) case codec.NAL_UNIT_SEI: - vt.Value.SEI = slice + vt.WriteRawBytes(slice) default: - vt.Video.Stream.Warn("h265 slice type not supported", zap.Uint("type", uint(slice.H265Type()))) + vt.Video.Stream.Warn("h265 slice type not supported", zap.Uint("type", uint(t))) } } + func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) { if len(frame) < 6 { vt.Stream.Error("AVCC data too short", zap.ByteString("data", frame)) @@ -93,13 +80,11 @@ func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) { } } else { vt.Video.WriteAVCC(ts, frame) - vt.Video.Media.RingBuffer.Value.IFrame = frame.IsIDR() - vt.Flush() } } -func (vt *H265) writeRTPFrame(frame *RTPFrame) { - rv := &vt.Video.Media.RingBuffer.Value +func (vt *H265) WriteRTPFrame(frame *RTPFrame) { + rv := &vt.Value // TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream. var usingDonlField bool var buffer = util.Buffer(frame.Payload) @@ -110,7 +95,7 @@ func (vt *H265) writeRTPFrame(frame *RTPFrame) { buffer.ReadUint16() } for buffer.CanRead() { - vt.WriteSlice(NALUSlice{buffer.ReadN(int(buffer.ReadUint16()))}) + vt.WriteSliceBytes(buffer.ReadN(int(buffer.ReadUint16()))) if usingDonlField { buffer.ReadByte() } @@ -122,72 +107,59 @@ func (vt *H265) writeRTPFrame(frame *RTPFrame) { buffer.ReadUint16() } if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) { - rv.AppendRaw(NALUSlice{[]byte{first3[0]&0b10000001 | (naluType << 1), first3[1]}}) + vt.WriteSliceByte(first3[0]&0b10000001|(naluType<<1), first3[1]) } lastIndex := len(rv.Raw) - 1 if lastIndex == -1 { return } rv.Raw[lastIndex].Append(buffer) - if util.Bit1(fuHeader, 1) { - complete := rv.Raw[lastIndex] //拼接完成 - rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去 - vt.WriteSlice(complete) - } + // if util.Bit1(fuHeader, 1) { + // complete := rv.Raw[lastIndex] //拼接完成 + // rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去 + // vt.WriteSlice(complete) + // } default: - vt.WriteSlice(NALUSlice{frame.Payload}) + vt.WriteSliceBytes(frame.Payload) } frame.SequenceNumber += vt.rtpSequence //增加偏移,需要增加rtp包后需要顺延 - rv.AppendRTP(frame) - if frame.Marker { - vt.Video.generateTimestamp(frame.Timestamp) - vt.Flush() - } } -func (vt *H265) Flush() { - if vt.Video.Media.RingBuffer.Value.IFrame { - vt.Video.ComputeGOP() - } - if vt.Attached == 0 && vt.IDRing != nil && vt.DecoderConfiguration.Seq > 0 { - defer vt.Attach() - } - // RTP格式补完 - if config.Global.EnableRTP { - if len(vt.Value.RTP) > 0 { - if !vt.dcChanged && vt.Value.IFrame { - vt.insertDCRtp() - } - } else { - // H265打包: https://blog.csdn.net/fanyun_01/article/details/114234290 - var out [][][]byte - if vt.Value.IFrame { - out = append(out, [][]byte{vt.DecoderConfiguration.Raw[0]}, [][]byte{vt.DecoderConfiguration.Raw[1]}, [][]byte{vt.DecoderConfiguration.Raw[2]}) - } - for _, nalu := range vt.Video.Media.RingBuffer.Value.Raw { - buffers := util.SplitBuffers(nalu, 1200) - firstBuffer := NALUSlice(buffers[0]) - if l := len(buffers); l == 1 { - out = append(out, firstBuffer) - } else { - naluType := firstBuffer.H265Type() - firstByte := (byte(codec.NAL_UNIT_RTP_FU) << 1) | (firstBuffer[0][0] & 0b10000001) - buf := [][]byte{{firstByte, firstBuffer[0][1], (1 << 7) | byte(naluType)}} - for i, sp := range firstBuffer { - if i == 0 { - sp = sp[2:] - } - buf = append(buf, sp) - } - out = append(out, buf) - for _, bufs := range buffers[1:] { - buf = append([][]byte{{firstByte, firstBuffer[0][1], byte(naluType)}}, bufs...) - out = append(out, buf) - } - buf[0][2] |= 1 << 6 // set end bit - } - } - vt.PacketizeRTP(out...) + +// RTP格式补完 +func (vt *H265) CompleteRTP(value *AVFrame[NALUSlice]) { + if len(value.RTP) > 0 { + if !vt.dcChanged && value.IFrame { + vt.insertDCRtp() } + } else { + // H265打包: https://blog.csdn.net/fanyun_01/article/details/114234290 + var out [][][]byte + if value.IFrame { + out = append(out, [][]byte{vt.DecoderConfiguration.Raw[0]}, [][]byte{vt.DecoderConfiguration.Raw[1]}, [][]byte{vt.DecoderConfiguration.Raw[2]}) + } + for _, nalu := range vt.Video.Media.RingBuffer.Value.Raw { + buffers := util.SplitBuffers(nalu, 1200) + firstBuffer := NALUSlice(buffers[0]) + if l := len(buffers); l == 1 { + out = append(out, firstBuffer) + } else { + naluType := firstBuffer.H265Type() + firstByte := (byte(codec.NAL_UNIT_RTP_FU) << 1) | (firstBuffer[0][0] & 0b10000001) + buf := [][]byte{{firstByte, firstBuffer[0][1], (1 << 7) | byte(naluType)}} + for i, sp := range firstBuffer { + if i == 0 { + sp = sp[2:] + } + buf = append(buf, sp) + } + out = append(out, buf) + for _, bufs := range buffers[1:] { + buf = append([][]byte{{firstByte, firstBuffer[0][1], byte(naluType)}}, bufs...) + out = append(out, buf) + } + buf[0][2] |= 1 << 6 // set end bit + } + } + vt.PacketizeRTP(out...) } - vt.Video.Flush() } diff --git a/track/rtp.go b/track/rtp.go index 9ddccdc..394fc17 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -8,10 +8,6 @@ import ( "m7s.live/engine/v4/util" ) -type RTPWriter interface { - writeRTPFrame(frame *RTPFrame) -} - func (av *Media[T]) UnmarshalRTPPacket(p *rtp.Packet) (frame *RTPFrame) { if av.DecoderConfiguration.PayloadType != p.PayloadType { av.Stream.Warn("RTP PayloadType error", zap.Uint8("want", av.DecoderConfiguration.PayloadType), zap.Uint8("got", p.PayloadType)) @@ -34,6 +30,15 @@ func (av *Media[T]) UnmarshalRTP(raw []byte) (frame *RTPFrame) { return av.UnmarshalRTPPacket(&p) } +func (av *Media[T]) writeRTPFrame(frame *RTPFrame) { + av.Value.AppendRTP(frame) + av.WriteRTPFrame(frame) + if frame.Marker { + av.SpesificTrack.generateTimestamp(frame.Timestamp) + av.SpesificTrack.Flush() + } +} + // WriteRTPPack 写入已反序列化的RTP包 func (av *Media[T]) WriteRTPPack(p *rtp.Packet) { for frame := av.UnmarshalRTPPacket(p); frame != nil; frame = av.nextRTPFrame() { @@ -48,11 +53,40 @@ func (av *Media[T]) WriteRTP(raw []byte) { } } +// https://www.cnblogs.com/moonwalk/p/15903760.html +// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets +func (av *Media[T]) PacketizeRTP(payloads ...[][]byte) { + packetCount := len(payloads) + if cap(av.Value.RTP) < packetCount { + av.Value.RTP = make([]*RTPFrame, packetCount) + } else { + av.Value.RTP = av.Value.RTP[:packetCount] + } + for i, pp := range payloads { + av.rtpSequence++ + packet := av.Value.RTP[i] + if packet == nil { + packet = &RTPFrame{} + av.Value.RTP[i] = packet + packet.Version = 2 + packet.PayloadType = av.DecoderConfiguration.PayloadType + packet.Payload = make([]byte, 0, 1200) + packet.SSRC = av.SSRC + } + packet.Payload = packet.Payload[:0] + packet.SequenceNumber = av.rtpSequence + packet.Timestamp = av.Value.PTS + packet.Marker = i == packetCount-1 + for _, p := range pp { + packet.Payload = append(packet.Payload, p...) + } + } +} + type RTPDemuxer struct { - lastSeq uint16 //上一个收到的序号,用于乱序重排 - lastSeq2 uint16 //记录上上一个收到的序列号 - 乱序重排 util.RTPReorder[*RTPFrame] - RTPWriter `json:"-"` + lastSeq uint16 //上一个收到的序号,用于乱序重排 + lastSeq2 uint16 //记录上上一个收到的序列号 + 乱序重排 util.RTPReorder[*RTPFrame] } // 获取缓存中下一个rtpFrame diff --git a/track/video.go b/track/video.go index b9f7622..027de3f 100644 --- a/track/video.go +++ b/track/video.go @@ -72,10 +72,6 @@ func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) e data = append(data, codec.NALU_Delimiter2, nalu) } } - if vp.SEI != nil { - data = append(data, codec.NALU_Delimiter2) - data = append(data, vp.SEI...) - } data = append(data, codec.NALU_Delimiter2) for i, nalu := range vp.Raw { if i > 0 { @@ -90,7 +86,7 @@ func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) e } return ctx.Err() } -func (vt *Video) ComputeGOP() { +func (vt *Video) computeGOP() { vt.idrCount++ if vt.IDRing != nil { vt.GOP = int(vt.AVRing.RingBuffer.Value.Sequence - vt.IDRing.Value.Sequence) @@ -109,38 +105,32 @@ func (vt *Video) ComputeGOP() { vt.IDRing = vt.AVRing.RingBuffer.Ring } -func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame, s *[]NALUSlice) { - for len(annexb) > 0 { - before, after, found := bytes.Cut(annexb, codec.NALU_Delimiter1) - if !found { - *s = append(*s, NALUSlice{annexb}) - return - } - if len(before) > 0 { - *s = append(*s, NALUSlice{before}) - } - annexb = after +func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame) { + for found, after := true, annexb; len(annexb) > 0 && found; annexb = after { + annexb, after, found = bytes.Cut(annexb, codec.NALU_Delimiter1) + vt.WriteSliceBytes(annexb) } } -func (vt *Video) WriteAnnexB(frame AnnexBFrame) (s []NALUSlice) { +func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { + if dts == 0 { + vt.generateTimestamp(pts) + } else { + vt.Value.PTS = pts + vt.Value.DTS = dts + } vt.Value.BytesIn += len(frame) - for len(frame) > 0 { - before, after, found := bytes.Cut(frame, codec.NALU_Delimiter2) - if !found { - vt.writeAnnexBSlice(frame, &s) - return - } - if len(before) > 0 { - vt.writeAnnexBSlice(AnnexBFrame(before), &s) - } - frame = after + for found, after := true, frame; len(frame) > 0 && found; frame = after { + frame, after, found = bytes.Cut(frame, codec.NALU_Delimiter2) + vt.writeAnnexBSlice(frame) + } + if len(vt.Value.Raw) > 0 { + vt.Flush() } - return } - func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) { + vt.Value.IFrame = frame.IsIDR() vt.Media.WriteAVCC(ts, frame) for nalus := frame[5:]; len(nalus) > vt.nalulenSize; { nalulen := util.ReadBE[int](nalus[:vt.nalulenSize]) @@ -149,18 +139,26 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) { return } if end := nalulen + vt.nalulenSize; len(nalus) >= end { - slice := nalus[vt.nalulenSize:end] - if _rawSlice := util.MallocSlice(&vt.AVRing.Value.Raw); _rawSlice == nil { - vt.Value.AppendRaw(NALUSlice{slice}) - } else { - _rawSlice.Reset().Append(slice) - } + vt.WriteRawBytes(nalus[vt.nalulenSize:end]) nalus = nalus[end:] } else { vt.Stream.Error("WriteAVCC", zap.Int("len", len(nalus)), zap.Int("naluLenSize", vt.nalulenSize), zap.Int("end", end)) break } } + vt.Flush() +} + +func (vt *Video) WriteSliceByte(b ...byte) { + vt.WriteSliceBytes(b) +} + +func (vt *Video) WriteRawBytes(slice []byte) { + if naluSlice := util.MallocSlice(&vt.AVRing.Value.Raw); naluSlice == nil { + vt.Value.AppendRaw(NALUSlice{slice}) + } else { + naluSlice.Reset(slice) + } } // 在I帧前面插入sps pps webrtc需要 @@ -195,15 +193,46 @@ func (av *Video) insertDCRtp() { } func (av *Video) generateTimestamp(ts uint32) { - av.AVRing.RingBuffer.Value.PTS = ts - av.AVRing.RingBuffer.Value.DTS = av.dtsEst.Feed(ts) + av.Value.PTS = ts + av.Value.DTS = av.dtsEst.Feed(ts) } func (vt *Video) SetLostFlag() { vt.lostFlag = true } +func (vt *Video) CompleteAVCC(rv *AVFrame[NALUSlice]) { + var b util.Buffer + if cap(rv.AVCC) > 0 { + if avcc := rv.AVCC[:1]; len(avcc[0]) == 5 { + b = util.Buffer(avcc[0]) + } + } + if b == nil { + b = util.Buffer([]byte{0, 1, 0, 0, 0}) + } + if rv.IFrame { + b[0] = 0x10 | byte(vt.CodecID) + } else { + b[0] = 0x20 | byte(vt.CodecID) + } + // println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS) + // 写入CTS + util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90) + lengths := b.Malloc(len(rv.Raw) * 4) //每个slice的长度内存复用 + rv.AppendAVCC(b.SubBuf(0, 5)) + for i, nalu := range rv.Raw { + rv.AppendAVCC(util.PutBE(lengths.SubBuf(i*4, 4), util.SizeOfBuffers(nalu))) + rv.AppendAVCC(nalu...) + } +} func (vt *Video) Flush() { + if vt.Value.IFrame { + vt.computeGOP() + } + if vt.Attached == 0 && vt.IDRing != nil && vt.DecoderConfiguration.Seq > 0 { + defer vt.Attach() + } rv := &vt.Value // 没有实际媒体数据 if len(rv.Raw) == 0 { @@ -219,37 +248,10 @@ func (vt *Video) Flush() { return } } - - // AVCC格式补完 - if vt.ComplementAVCC() { - var b util.Buffer - if cap(rv.AVCC) > 0 { - if avcc := rv.AVCC[:1]; len(avcc[0]) == 5 { - b = util.Buffer(avcc[0]) - } - } - if b == nil { - b = util.Buffer([]byte{0, 1, 0, 0, 0}) - } - if rv.IFrame { - b[0] = 0x10 | byte(vt.CodecID) - } else { - b[0] = 0x20 | byte(vt.CodecID) - } - // println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS) - // 写入CTS - util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90) - lengths := b.Malloc(len(rv.Raw) * 4) //每个slice的长度内存复用 - rv.AppendAVCC(b.SubBuf(0, 5)) - for i, nalu := range rv.Raw { - rv.AppendAVCC(util.PutBE(lengths.SubBuf(i*4, 4), util.SizeOfBuffers(nalu))) - rv.AppendAVCC(nalu...) - } - } - // 下一帧为I帧,即将覆盖 - if vt.Next().Value.IFrame { - // 仅存一枚I帧,需要扩环 - if vt.idrCount == 1 { + // 仅存一枚I帧 + if vt.idrCount == 1 { + // 下一帧为I帧,即将覆盖,需要扩环 + if vt.Next().Value.IFrame { if vt.AVRing.RingBuffer.Size < 256 { vt.Link(util.NewRing[AVFrame[NALUSlice]](5)) // 扩大缓冲环 }