重构Track写入架构,增加sub配置项可配置指定track订阅的参数名

修复首次写入AbsTime的值
This commit is contained in:
dexter
2023-01-18 18:08:59 +08:00
parent 881fe5abd9
commit b0dcecdebc
16 changed files with 396 additions and 431 deletions

View File

@@ -40,7 +40,9 @@ func (b H264NALUType) Offset() int {
func (b H264NALUType) Byte() byte {
return byte(b)
}
func ParseH264NALUType(b byte) H264NALUType {
return H264NALUType(b & 0x1F)
}
func (H264NALUType) Parse(b byte) H264NALUType {
return H264NALUType(b & 0x1F)
}

View File

@@ -14,6 +14,10 @@ func (H265NALUType) Parse(b byte) H265NALUType {
return H265NALUType(b & 0x7E >> 1)
}
func ParseH265NALUType(b byte) H265NALUType {
return H265NALUType(b & 0x7E >> 1)
}
const (
// HEVC_VPS = 0x40
// HEVC_SPS = 0x42

View File

@@ -10,18 +10,6 @@ import (
)
type NALUSlice net.Buffers
// type H264Slice NALUSlice
// type H265Slice NALUSlice
// type H264NALU []H264Slice
// type H265NALU []H265Slice
type AudioSlice []byte
// type AACSlice AudioSlice
// type G711Slice AudioSlice
// 裸数据片段
type RawSlice interface {
~[][]byte | ~[]byte
@@ -43,10 +31,13 @@ func (nalu NALUSlice) Bytes() (b []byte) {
return
}
func (nalu *NALUSlice) Reset() *NALUSlice {
func (nalu *NALUSlice) Reset(b ...[]byte) *NALUSlice {
if len(*nalu) > 0 {
*nalu = (*nalu)[:0]
}
if len(b) > 0 {
*nalu = append(*nalu, b...)
}
return nalu
}
@@ -111,12 +102,11 @@ type DataFrame[T any] struct {
type AVFrame[T RawSlice] struct {
BaseFrame
IFrame bool
SEI T
PTS uint32
DTS uint32
AVCC net.Buffers `json:"-"` // 打包好的AVCC格式
AVCC net.Buffers `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
RTP []*RTPFrame `json:"-"`
Raw []T `json:"-"` // 裸数据
Raw []T `json:"-"` // 裸数据,通常代表Access Unit
canRead bool
}

View File

@@ -80,17 +80,16 @@ type VideoTrack interface {
GetDecoderConfiguration() DecoderConfiguration[NALUSlice]
CurrentFrame() *AVFrame[NALUSlice]
PreFrame() *AVFrame[NALUSlice]
WriteSlice(NALUSlice)
WriteSliceBytes(slice []byte)
WriteAnnexB(uint32, uint32, AnnexBFrame)
SetLostFlag()
}
type AudioTrack interface {
AVTrack
GetDecoderConfiguration() DecoderConfiguration[AudioSlice]
CurrentFrame() *AVFrame[AudioSlice]
PreFrame() *AVFrame[AudioSlice]
WriteSlice(AudioSlice)
GetDecoderConfiguration() DecoderConfiguration[[]byte]
CurrentFrame() *AVFrame[[]byte]
PreFrame() *AVFrame[[]byte]
WriteADTS([]byte)
WriteRaw(uint32, AudioSlice)
WriteRaw(uint32, []byte)
}

View File

@@ -42,6 +42,9 @@ func (c *Publish) GetPublishConfig() *Publish {
type Subscribe struct {
SubAudio bool
SubVideo bool
SubVideoArgName string // 指定订阅的视频轨道参数名
SubAudioArgName string // 指定订阅的音频轨道参数名
SubDataArgName string // 指定订阅的数据轨道参数名
SubAudioTracks []string // 指定订阅的音频轨道
SubVideoTracks []string // 指定订阅的视频轨道
LiveMode bool // 实时模式:追赶发布者进度,在播放首屏后等待发布者的下一个关键帧,然后调到该帧。
@@ -118,7 +121,18 @@ type Engine struct {
var Global = &Engine{
Publish: Publish{true, true, false, 10, 0, 0},
Subscribe: Subscribe{true, true, nil, nil, true, false, 10},
Subscribe: Subscribe{
SubAudio: true,
SubVideo: true,
SubVideoArgName: "vts",
SubAudioArgName: "ats",
SubDataArgName: "dts",
SubAudioTracks: nil,
SubVideoTracks: nil,
LiveMode: true,
IFrameOnly: false,
WaitTimeout: 10,
},
HTTP: HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux},
RTPReorder: true,
EnableAVCC: true,

View File

@@ -84,8 +84,7 @@ func (t *TSPublisher) ReadPES() {
if frameLen > remainLen {
break
}
t.AudioTrack.WriteSlice(pes.Payload[7:frameLen])
t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload[:frameLen])
pes.Payload = pes.Payload[frameLen:remainLen]
remainLen -= frameLen
}

View File

@@ -387,21 +387,21 @@ func (s *Stream) run() {
waits := &waitTracks{
Promise: v,
}
if ats := io.Args.Get("ats"); ats != "" {
if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" {
waits.audio.Wait(strings.Split(ats, ",")...)
} else if len(sbConfig.SubAudioTracks) > 0 {
waits.audio.Wait(sbConfig.SubAudioTracks...)
} else if sbConfig.SubAudio {
waits.audio.Wait()
}
if vts := io.Args.Get("vts"); vts != "" {
if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" {
waits.video.Wait(strings.Split(vts, ",")...)
} else if len(sbConfig.SubVideoTracks) > 0 {
waits.video.Wait(sbConfig.SubVideoTracks...)
} else if sbConfig.SubVideo {
waits.video.Wait()
}
if dts := io.Args.Get("dts"); dts != "" {
if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" {
waits.data.Wait(strings.Split(dts, ",")...)
} else {
// waits.data.Wait()

View File

@@ -27,9 +27,9 @@ const (
SUBSTATE_NORMAL
)
type AudioFrame AVFrame[AudioSlice]
type AudioFrame AVFrame[[]byte]
type VideoFrame AVFrame[NALUSlice]
type AudioDeConf DecoderConfiguration[AudioSlice]
type AudioDeConf DecoderConfiguration[[]byte]
type VideoDeConf DecoderConfiguration[NALUSlice]
type FLVFrame net.Buffers
type AudioRTP RTPFrame
@@ -47,10 +47,6 @@ func (f FLVFrame) WriteTo(w io.Writer) (int64, error) {
// return append(r, b...)
// }
func (v *VideoFrame) GetAnnexB() (r net.Buffers) {
if v.SEI != nil {
r = append(r, codec.NALU_Delimiter2)
r = append(r, v.SEI...)
}
r = append(r, codec.NALU_Delimiter2)
for i, nalu := range v.Raw {
if i > 0 {
@@ -103,7 +99,7 @@ func (p *PlayContext[T, R]) decConfChanged() bool {
type TrackPlayer struct {
context.Context `json:"-"`
context.CancelFunc `json:"-"`
Audio PlayContext[*track.Audio, AudioSlice]
Audio PlayContext[*track.Audio, []byte]
Video PlayContext[*track.Video, NALUSlice]
SkipTS uint32 //跳过的时间戳
FirstAbsTS uint32 //订阅起始时间戳
@@ -115,7 +111,7 @@ func (tp *TrackPlayer) ReadVideo() (vp *AVFrame[NALUSlice]) {
return
}
func (tp *TrackPlayer) ReadAudio() (ap *AVFrame[AudioSlice]) {
func (tp *TrackPlayer) ReadAudio() (ap *AVFrame[[]byte]) {
ap = tp.Audio.ring.Read(tp.Context)
tp.Audio.Frame = ap
return
@@ -197,19 +193,19 @@ func (s *Subscriber) PlayBlock(subType byte) {
s.Video.confSeq = s.Video.Track.DecoderConfiguration.Seq
spesic.OnEvent(VideoDeConf(s.Video.Track.DecoderConfiguration))
}
sendAudioDecConf := func(frame *AVFrame[AudioSlice]) {
sendAudioDecConf := func(frame *AVFrame[[]byte]) {
s.Audio.confSeq = s.Audio.Track.DecoderConfiguration.Seq
spesic.OnEvent(AudioDeConf(s.Audio.Track.DecoderConfiguration))
}
var sendVideoFrame func(*AVFrame[NALUSlice])
var sendAudioFrame func(*AVFrame[AudioSlice])
var sendAudioFrame func(*AVFrame[[]byte])
switch subType {
case SUBTYPE_RAW:
sendVideoFrame = func(frame *AVFrame[NALUSlice]) {
// println(frame.Sequence, frame.AbsTime, frame.PTS, frame.DTS, frame.IFrame)
spesic.OnEvent((*VideoFrame)(frame))
}
sendAudioFrame = func(frame *AVFrame[AudioSlice]) {
sendAudioFrame = func(frame *AVFrame[[]byte]) {
spesic.OnEvent((*AudioFrame)(frame))
}
case SUBTYPE_RTP:
@@ -223,7 +219,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
spesic.OnEvent((VideoRTP)(vp))
}
}
sendAudioFrame = func(frame *AVFrame[AudioSlice]) {
sendAudioFrame = func(frame *AVFrame[[]byte]) {
for _, p := range frame.RTP {
audioSeq++
vp := *p
@@ -249,7 +245,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, frame.AbsTime, s.Video.Track.DecoderConfiguration.AVCC)
// spesic.OnEvent(FLVFrame(copyBuffers(s.Video.Track.DecoderConfiguration.FLV)))
}
sendAudioDecConf = func(frame *AVFrame[AudioSlice]) {
sendAudioDecConf = func(frame *AVFrame[[]byte]) {
s.Audio.confSeq = s.Audio.Track.DecoderConfiguration.Seq
ts := s.SkipTS
if frame != nil {
@@ -262,7 +258,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
// println(frame.Sequence, frame.AbsTime, frame.DeltaTime, frame.IFrame)
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, frame.AbsTime, frame.AVCC)
}
sendAudioFrame = func(frame *AVFrame[AudioSlice]) {
sendAudioFrame = func(frame *AVFrame[[]byte]) {
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, frame.AbsTime, frame.AVCC)
}
}

View File

@@ -10,9 +10,15 @@ import (
"m7s.live/engine/v4/util"
)
var _ SpesificTrack[[]byte] = (*AAC)(nil)
func NewAAC(stream IStream) (aac *AAC) {
aac = &AAC{}
aac = &AAC{
sizeLength: 13,
Mode: 2,
}
aac.CodecID = codec.CodecID_AAC
aac.Channels = 2
aac.SampleSize = 16
aac.SetStuff("aac", stream, int(32), byte(97), aac, time.Millisecond*10)
aac.AVCCHead = []byte{0xAF, 1}
@@ -21,29 +27,49 @@ func NewAAC(stream IStream) (aac *AAC) {
type AAC struct {
Audio
buffer []byte
sizeLength int // 通常为13
Mode int // 1为lbr2为hbr
lack int // 用于处理不完整的AU,缺少的字节数
}
func (aac *AAC) writeRTPFrame(frame *RTPFrame) {
aac.Audio.Media.AVRing.RingBuffer.Value.AppendRTP(frame)
auHeaderLen := util.ReadBE[int](frame.Payload[:2]) >> 3
startOffset := 2 + auHeaderLen
if !frame.Marker {
aac.buffer = append(aac.buffer, frame.Payload[startOffset:]...)
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
func (aac *AAC) WriteRTPFrame(frame *RTPFrame) {
auHeaderLen := util.ReadBE[int](frame.Payload[:aac.Mode]) >> 3 //通常为2即一个AU Header的长度
// auHeaderCount := auHeaderLen >> 1 // AU Header的个数, 通常为1
if auHeaderLen == 0 {
aac.Value.AppendRaw(frame.Payload)
} else {
if aac.buffer != nil {
aac.buffer = append(append([]byte{}, frame.Payload...), aac.buffer...)
startOffset := aac.Mode + auHeaderLen // 实际数据开始的位置
if aac.lack > 0 {
rawLen := len(aac.Value.Raw)
if rawLen == 0 {
aac.Stream.Error("lack >0 but rawlen=0")
}
last := util.Buffer(aac.Value.Raw[rawLen-1])
auLen := len(frame.Payload) - startOffset
if aac.lack > auLen {
last.Write(frame.Payload[startOffset:])
aac.lack -= auLen
return
} else if aac.lack < auLen {
aac.Stream.Warn("lack < auLen", zap.Int("lack", aac.lack), zap.Int("auLen", auLen))
}
last.Write(frame.Payload[startOffset : startOffset+aac.lack])
aac.lack = 0
return
}
for iIndex := aac.Mode; iIndex <= auHeaderLen; iIndex += aac.Mode {
auLen := util.ReadBE[int](frame.Payload[iIndex:iIndex+aac.Mode]) >> (8*aac.Mode - aac.sizeLength) //取高13bit代表AU的长度
nextPos := startOffset + auLen
if len(frame.Payload) < nextPos {
aac.lack = nextPos - len(frame.Payload)
aac.Value.AppendRaw(frame.Payload[startOffset:])
break
} else {
aac.buffer = frame.Payload
aac.Value.AppendRaw(frame.Payload[startOffset:nextPos])
}
for iIndex := 2; iIndex <= auHeaderLen; iIndex += 2 {
auLen := util.ReadBE[int](aac.buffer[iIndex:iIndex+2]) >> 3
aac.WriteSlice(aac.buffer[startOffset : startOffset+auLen])
startOffset += auLen
startOffset = nextPos
}
aac.generateTimestamp()
aac.Flush()
aac.buffer = nil
}
}
@@ -58,28 +84,24 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
aac.Profile = (config1 & 0xF8) >> 3
aac.Channels = ((config2 >> 3) & 0x0F) //声道
aac.Audio.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
aac.Audio.DecoderConfiguration.Raw = AudioSlice(frame[2:])
aac.Audio.DecoderConfiguration.Raw = frame[2:]
aac.Attach()
} else {
aac.WriteSlice(AudioSlice(frame[2:]))
aac.Value.AppendRaw(frame[2:])
aac.Audio.WriteAVCC(ts, frame)
aac.Flush()
}
}
func (aac *AAC) Flush() {
// RTP格式补完
value := aac.Audio.Media.RingBuffer.Value
if aac.ComplementRTP() {
func (aac *AAC) CompleteRTP(value *AVFrame[[]byte]) {
l := util.SizeOfBuffers(value.Raw)
var packet = make(net.Buffers, len(value.Raw)+1)
//AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度又因为单个auheader字节长度2字节所以再除以2就是auheader的个数。
packet[0] = []byte{0x00, 0x10, (byte)((l & 0x1fe0) >> 5), (byte)((l & 0x1f) << 3)}
for i, raw := range value.Raw {
packet[i+1] = raw
auHeaderLen := []byte{0x00, 0x10, (byte)((l & 0x1fe0) >> 5), (byte)((l & 0x1f) << 3)} // 3 = 16-13, 5 = 8-3
packets := util.SplitBuffers(value.Raw, 1200)
for i, packet := range packets {
expand := append(packet, nil)
copy(expand[1:], packet)
expand[0] = auHeaderLen
packets[i] = expand
}
packets := util.SplitBuffers(packet, 1200)
aac.PacketizeRTP(packets...)
}
aac.Audio.Flush()
}

View File

@@ -7,11 +7,8 @@ import (
. "m7s.live/engine/v4/common"
)
var adcflv1 = []byte{codec.FLV_TAG_TYPE_AUDIO, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0}
var adcflv2 = []byte{0, 0, 0, 15}
type Audio struct {
Media[AudioSlice]
Media[[]byte]
CodecID codec.AudioCodecID
Channels byte
SampleSize byte
@@ -75,7 +72,7 @@ func (a *Audio) WriteADTS(adts []byte) {
a.SampleRate = uint32(codec.SamplingFrequencies[sampleRate])
a.Channels = channel
avcc := []byte{0xAF, 0x00, config1, config2}
a.DecoderConfiguration = DecoderConfiguration[AudioSlice]{
a.DecoderConfiguration = DecoderConfiguration[[]byte]{
97,
net.Buffers{avcc},
avcc[2:],
@@ -84,41 +81,32 @@ func (a *Audio) WriteADTS(adts []byte) {
a.Attach()
}
func (av *Audio) WriteRaw(pts uint32, raw AudioSlice) {
func (av *Audio) WriteRaw(pts uint32, raw []byte) {
curValue := &av.Value
curValue.BytesIn += len(raw)
if len(av.AVCCHead) == 2 {
raw = raw[7:] //AAC 去掉7个字节的ADTS头
}
av.WriteSlice(raw)
curValue.DTS = pts
curValue.PTS = pts
curValue.AppendRaw(raw)
av.generateTimestamp(pts)
av.Flush()
}
func (av *Audio) WriteAVCC(ts uint32, frame AVCCFrame) {
curValue := &av.AVRing.RingBuffer.Value
curValue := &av.Value
curValue.BytesIn += len(frame)
curValue.AppendAVCC(frame)
curValue.DTS = ts * 90
curValue.PTS = curValue.DTS
av.generateTimestamp(ts * 90)
av.Flush()
}
func (a *Audio) Flush() {
// AVCC 格式补完
value := &a.Value
if a.ComplementAVCC() {
func (a *Audio) CompleteAVCC(value *AVFrame[[]byte]) {
value.AppendAVCC(a.AVCCHead)
for _, raw := range value.Raw {
value.AppendAVCC(raw)
}
}
if a.ComplementRTP() {
var packet = make(net.Buffers, len(value.Raw))
for i, raw := range value.Raw {
packet[i] = raw
}
a.PacketizeRTP(packet)
}
a.Media.Flush()
func (a *Audio) CompleteRTP(value *AVFrame[[]byte]) {
a.PacketizeRTP(value.Raw)
}

View File

@@ -44,6 +44,15 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) {
}
}
type SpesificTrack[T RawSlice] interface {
CompleteRTP(*AVFrame[T])
CompleteAVCC(*AVFrame[T])
WriteSliceBytes([]byte)
WriteRTPFrame(*RTPFrame)
generateTimestamp(uint32)
Flush()
}
// Media 基础媒体Track类
type Media[T RawSlice] struct {
Base
@@ -53,6 +62,7 @@ type Media[T RawSlice] struct {
DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
RTPMuxer
RTPDemuxer
SpesificTrack[T] `json:"-"`
流速控制
}
@@ -77,8 +87,8 @@ func (av *Media[T]) SetStuff(stuff ...any) {
av.DecoderConfiguration.PayloadType = v
case IStream:
av.Stream = v
case RTPWriter:
av.RTPWriter = v
case SpesificTrack[T]:
av.SpesificTrack = v
}
}
}
@@ -107,20 +117,15 @@ func (av *Media[T]) GetDecoderConfiguration() DecoderConfiguration[T] {
}
func (av *Media[T]) CurrentFrame() *AVFrame[T] {
return &av.AVRing.RingBuffer.Value
return &av.Value
}
func (av *Media[T]) PreFrame() *AVFrame[T] {
return av.AVRing.RingBuffer.LastValue
return av.LastValue
}
func (av *Media[T]) generateTimestamp() {
ts := av.AVRing.RingBuffer.Value.RTP[0].Timestamp
av.AVRing.RingBuffer.Value.PTS = ts
av.AVRing.RingBuffer.Value.DTS = ts
}
func (av *Media[T]) WriteSlice(slice T) {
av.Value.AppendRaw(slice)
func (av *Media[T]) generateTimestamp(ts uint32) {
av.Value.PTS = ts
av.Value.DTS = ts
}
func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) {
@@ -135,7 +140,14 @@ func (av *Media[T]) WriteAVCC(ts uint32, frame AVCCFrame) {
func (av *Media[T]) Flush() {
curValue, preValue := &av.Value, av.LastValue
if config.Global.EnableRTP && len(curValue.RTP) == 0 {
av.CompleteRTP(curValue)
}
if config.Global.EnableAVCC && len(curValue.AVCC) == 0 {
av.CompleteAVCC(curValue)
}
if av.起始时间.IsZero() {
curValue.AbsTime = curValue.DTS / 90
av.重置(curValue.AbsTime)
} else {
curValue.DeltaTime = (curValue.DTS - preValue.DTS) / 90
@@ -148,42 +160,3 @@ func (av *Media[T]) Flush() {
}
av.Step()
}
func (av *Media[T]) ComplementAVCC() bool {
return config.Global.EnableAVCC && len(av.Value.AVCC) == 0
}
// 是否需要补完RTP格式
func (av *Media[T]) ComplementRTP() bool {
return config.Global.EnableRTP && len(av.Value.RTP) == 0
}
// https://www.cnblogs.com/moonwalk/p/15903760.html
// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets
func (av *Media[T]) PacketizeRTP(payloads ...[][]byte) {
packetCount := len(payloads)
if cap(av.Value.RTP) < packetCount {
av.Value.RTP = make([]*RTPFrame, packetCount)
} else {
av.Value.RTP = av.Value.RTP[:packetCount]
}
for i, pp := range payloads {
av.rtpSequence++
packet := av.Value.RTP[i]
if packet == nil {
packet = &RTPFrame{}
av.Value.RTP[i] = packet
packet.Version = 2
packet.PayloadType = av.DecoderConfiguration.PayloadType
packet.Payload = make([]byte, 0, 1200)
packet.SSRC = av.SSRC
}
packet.Payload = packet.Payload[:0]
packet.SequenceNumber = av.rtpSequence
packet.Timestamp = av.Value.PTS
packet.Marker = i == packetCount-1
for _, p := range pp {
packet.Payload = append(packet.Payload, p...)
}
}
}

View File

@@ -11,18 +11,20 @@ import (
func NewG711(stream IStream, alaw bool) (g711 *G711) {
g711 = &G711{}
if alaw {
g711.Audio.Name = "pcma"
g711.Name = "pcma"
} else {
g711.Audio.Name = "pcmu"
g711.Name = "pcmu"
}
if alaw {
g711.Audio.CodecID = codec.CodecID_PCMA
g711.CodecID = codec.CodecID_PCMA
} else {
g711.Audio.CodecID = codec.CodecID_PCMU
g711.CodecID = codec.CodecID_PCMU
}
g711.Audio.SampleSize = 8
g711.SampleSize = 8
g711.Channels = 1
g711.AVCCHead = []byte{(byte(g711.CodecID) << 4) | (1 << 1)}
g711.SetStuff(stream, int(32), byte(97), uint32(8000), g711, time.Millisecond*10)
g711.Audio.Attach()
g711.Attach()
return
}
@@ -35,16 +37,10 @@ func (g711 *G711) WriteAVCC(ts uint32, frame AVCCFrame) {
g711.Stream.Error("AVCC data too short", zap.ByteString("data", frame))
return
}
g711.WriteSlice(AudioSlice(frame[1:]))
g711.Value.AppendRaw(frame[1:])
g711.Audio.WriteAVCC(ts, frame)
g711.Flush()
}
func (g711 *G711) writeRTPFrame(frame *RTPFrame) {
g711.WriteSlice(frame.Payload)
g711.Audio.Media.AVRing.RingBuffer.Value.AppendRTP(frame)
if frame.Marker {
g711.generateTimestamp()
g711.Flush()
}
func (g711 *G711) WriteRTPFrame(frame *RTPFrame) {
g711.Value.AppendRaw(frame.Payload)
}

View File

@@ -7,10 +7,11 @@ import (
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
)
var _ SpesificTrack[NALUSlice] = (*H264)(nil)
type H264 struct {
Video
}
@@ -23,30 +24,17 @@ func NewH264(stream IStream) (vt *H264) {
vt.dtsEst = NewDTSEstimator()
return
}
func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
if dts == 0 {
vt.generateTimestamp(pts)
} else {
vt.Value.PTS = pts
vt.Value.DTS = dts
}
for _, slice := range vt.Video.WriteAnnexB(frame) {
vt.WriteSlice(slice)
}
if len(vt.Value.Raw) > 0 {
vt.Flush()
}
// println(vt.FPS)
}
func (vt *H264) WriteSlice(slice NALUSlice) {
// print(slice.H264Type())
switch slice.H264Type() {
func (vt *H264) WriteSliceBytes(slice []byte) {
naluType := codec.ParseH264NALUType(slice[0])
// println("naluType", naluType)
switch naluType {
case codec.NALU_SPS:
vt.SPSInfo, _ = codec.ParseSPS(slice[0])
vt.Video.DecoderConfiguration.Raw[0] = slice[0]
vt.SPSInfo, _ = codec.ParseSPS(slice)
vt.Video.DecoderConfiguration.Raw[0] = slice
case codec.NALU_PPS:
vt.dcChanged = true
vt.Video.DecoderConfiguration.Raw[1] = slice[0]
vt.Video.DecoderConfiguration.Raw[1] = slice
lenSPS := len(vt.Video.DecoderConfiguration.Raw[0])
lenPPS := len(vt.Video.DecoderConfiguration.Raw[1])
if lenSPS > 3 {
@@ -61,15 +49,18 @@ func (vt *H264) WriteSlice(slice NALUSlice) {
vt.Video.DecoderConfiguration.Seq++
case codec.NALU_IDR_Picture:
vt.Value.IFrame = true
vt.Video.WriteSlice(slice)
vt.WriteRawBytes(slice)
case codec.NALU_Non_IDR_Picture,
codec.NALU_Data_Partition_A,
codec.NALU_Data_Partition_B,
codec.NALU_Data_Partition_C:
vt.Value.IFrame = false
vt.Video.WriteSlice(slice)
vt.WriteRawBytes(slice)
case codec.NALU_SEI:
vt.Value.SEI = slice
vt.WriteRawBytes(slice)
case codec.NALU_Access_Unit_Delimiter:
default:
vt.Stream.Error("H264 WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType)))
}
}
@@ -94,21 +85,20 @@ func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) {
}
} else {
vt.Video.WriteAVCC(ts, frame)
vt.Video.Media.RingBuffer.Value.IFrame = frame.IsIDR()
vt.Flush()
}
}
func (vt *H264) writeRTPFrame(frame *RTPFrame) {
rv := &vt.Video.Media.RingBuffer.Value
func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
rv := &vt.Value
if naluType := frame.H264Type(); naluType < 24 {
vt.WriteSlice(NALUSlice{frame.Payload})
vt.WriteSliceBytes(frame.Payload)
} else {
switch naluType {
case codec.NALU_STAPA, codec.NALU_STAPB:
for buffer := util.Buffer(frame.Payload[naluType.Offset():]); buffer.CanRead(); {
nextSize := int(buffer.ReadUint16())
if buffer.Len() >= nextSize {
vt.WriteSlice(NALUSlice{buffer.ReadN(nextSize)})
vt.WriteSliceBytes(buffer.ReadN(nextSize))
} else {
vt.Stream.Error("invalid nalu size", zap.Int("naluType", int(naluType)))
return
@@ -116,7 +106,7 @@ func (vt *H264) writeRTPFrame(frame *RTPFrame) {
}
case codec.NALU_FUA, codec.NALU_FUB:
if util.Bit1(frame.Payload[1], 0) {
rv.AppendRaw(NALUSlice{[]byte{naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60)}})
vt.WriteSliceByte(naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60))
}
// 最后一个是半包缓存,用于拼接
lastIndex := len(rv.Raw) - 1
@@ -124,40 +114,27 @@ func (vt *H264) writeRTPFrame(frame *RTPFrame) {
return
}
rv.Raw[lastIndex].Append(frame.Payload[naluType.Offset():])
if util.Bit1(frame.Payload[1], 1) {
complete := rv.Raw[lastIndex] //拼接完成
rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去
vt.WriteSlice(complete)
}
// if util.Bit1(frame.Payload[1], 1) {
// complete := rv.Raw[lastIndex] //拼接完成
// }
}
}
frame.SequenceNumber += vt.rtpSequence //增加偏移需要增加rtp包后需要顺延
rv.AppendRTP(frame)
if frame.Marker {
vt.generateTimestamp(frame.Timestamp)
vt.Flush()
}
}
func (vt *H264) Flush() {
if vt.Video.Media.RingBuffer.Value.IFrame {
vt.Video.ComputeGOP()
}
if vt.Attached == 0 && vt.IDRing != nil && vt.DecoderConfiguration.Seq > 0 {
defer vt.Attach()
}
// RTP格式补完
if config.Global.EnableRTP {
if len(vt.Value.RTP) > 0 {
if !vt.dcChanged && vt.Value.IFrame {
func (vt *H264) CompleteRTP(value *AVFrame[NALUSlice]) {
if len(value.RTP) > 0 {
if !vt.dcChanged && value.IFrame {
vt.insertDCRtp()
}
} else {
var out [][][]byte
if vt.Value.IFrame {
if value.IFrame {
out = append(out, [][]byte{vt.DecoderConfiguration.Raw[0]}, [][]byte{vt.DecoderConfiguration.Raw[1]})
}
for _, nalu := range vt.Value.Raw {
for _, nalu := range value.Raw {
buffers := util.SplitBuffers(nalu, 1200)
firstBuffer := NALUSlice(buffers[0])
if l := len(buffers); l == 1 {
@@ -180,9 +157,6 @@ func (vt *H264) Flush() {
buf[0][1] |= 1 << 6 // set end bit
}
}
vt.PacketizeRTP(out...)
}
}
vt.Video.Flush()
}

View File

@@ -7,10 +7,11 @@ import (
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
)
var _ SpesificTrack[NALUSlice] = (*H265)(nil)
type H265 struct {
Video
}
@@ -23,32 +24,17 @@ func NewH265(stream IStream) (vt *H265) {
vt.dtsEst = NewDTSEstimator()
return
}
func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
if dts == 0 {
vt.generateTimestamp(pts)
} else {
vt.Video.Media.RingBuffer.Value.PTS = pts
vt.Video.Media.RingBuffer.Value.DTS = dts
}
// println(pts,dts,len(frame))
for _, slice := range vt.Video.WriteAnnexB(frame) {
vt.WriteSlice(slice)
}
if len(vt.Value.Raw) > 0 {
vt.Flush()
}
}
func (vt *H265) WriteSlice(slice NALUSlice) {
// println(slice.H265Type())
switch slice.H265Type() {
func (vt *H265) WriteSliceBytes(slice []byte) {
switch t := codec.ParseH265NALUType(slice[0]); t {
case codec.NAL_UNIT_VPS:
vt.Video.DecoderConfiguration.Raw[0] = slice[0]
vt.Video.DecoderConfiguration.Raw[0] = slice
case codec.NAL_UNIT_SPS:
vt.Video.DecoderConfiguration.Raw[1] = slice[0]
vt.Video.SPSInfo, _ = codec.ParseHevcSPS(slice[0])
vt.Video.DecoderConfiguration.Raw[1] = slice
vt.Video.SPSInfo, _ = codec.ParseHevcSPS(slice)
case codec.NAL_UNIT_PPS:
vt.Video.dcChanged = true
vt.Video.DecoderConfiguration.Raw[2] = slice[0]
vt.Video.DecoderConfiguration.Raw[2] = slice
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.Video.DecoderConfiguration.Raw[0], vt.Video.DecoderConfiguration.Raw[1], vt.Video.DecoderConfiguration.Raw[2])
if err == nil {
vt.Video.DecoderConfiguration.AVCC = net.Buffers{extraData}
@@ -62,16 +48,17 @@ func (vt *H265) WriteSlice(slice NALUSlice) {
codec.NAL_UNIT_CODED_SLICE_IDR_N_LP,
codec.NAL_UNIT_CODED_SLICE_CRA:
vt.Value.IFrame = true
vt.Video.WriteSlice(slice)
vt.WriteRawBytes(slice)
case 0, 1, 2, 3, 4, 5, 6, 7, 8, 9:
vt.Value.IFrame = false
vt.Video.WriteSlice(slice)
vt.WriteRawBytes(slice)
case codec.NAL_UNIT_SEI:
vt.Value.SEI = slice
vt.WriteRawBytes(slice)
default:
vt.Video.Stream.Warn("h265 slice type not supported", zap.Uint("type", uint(slice.H265Type())))
vt.Video.Stream.Warn("h265 slice type not supported", zap.Uint("type", uint(t)))
}
}
func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) {
if len(frame) < 6 {
vt.Stream.Error("AVCC data too short", zap.ByteString("data", frame))
@@ -93,13 +80,11 @@ func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) {
}
} else {
vt.Video.WriteAVCC(ts, frame)
vt.Video.Media.RingBuffer.Value.IFrame = frame.IsIDR()
vt.Flush()
}
}
func (vt *H265) writeRTPFrame(frame *RTPFrame) {
rv := &vt.Video.Media.RingBuffer.Value
func (vt *H265) WriteRTPFrame(frame *RTPFrame) {
rv := &vt.Value
// TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream.
var usingDonlField bool
var buffer = util.Buffer(frame.Payload)
@@ -110,7 +95,7 @@ func (vt *H265) writeRTPFrame(frame *RTPFrame) {
buffer.ReadUint16()
}
for buffer.CanRead() {
vt.WriteSlice(NALUSlice{buffer.ReadN(int(buffer.ReadUint16()))})
vt.WriteSliceBytes(buffer.ReadN(int(buffer.ReadUint16())))
if usingDonlField {
buffer.ReadByte()
}
@@ -122,45 +107,34 @@ func (vt *H265) writeRTPFrame(frame *RTPFrame) {
buffer.ReadUint16()
}
if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) {
rv.AppendRaw(NALUSlice{[]byte{first3[0]&0b10000001 | (naluType << 1), first3[1]}})
vt.WriteSliceByte(first3[0]&0b10000001|(naluType<<1), first3[1])
}
lastIndex := len(rv.Raw) - 1
if lastIndex == -1 {
return
}
rv.Raw[lastIndex].Append(buffer)
if util.Bit1(fuHeader, 1) {
complete := rv.Raw[lastIndex] //拼接完成
rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去
vt.WriteSlice(complete)
}
// if util.Bit1(fuHeader, 1) {
// complete := rv.Raw[lastIndex] //拼接完成
// rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去
// vt.WriteSlice(complete)
// }
default:
vt.WriteSlice(NALUSlice{frame.Payload})
vt.WriteSliceBytes(frame.Payload)
}
frame.SequenceNumber += vt.rtpSequence //增加偏移需要增加rtp包后需要顺延
rv.AppendRTP(frame)
if frame.Marker {
vt.Video.generateTimestamp(frame.Timestamp)
vt.Flush()
}
}
func (vt *H265) Flush() {
if vt.Video.Media.RingBuffer.Value.IFrame {
vt.Video.ComputeGOP()
}
if vt.Attached == 0 && vt.IDRing != nil && vt.DecoderConfiguration.Seq > 0 {
defer vt.Attach()
}
// RTP格式补完
if config.Global.EnableRTP {
if len(vt.Value.RTP) > 0 {
if !vt.dcChanged && vt.Value.IFrame {
func (vt *H265) CompleteRTP(value *AVFrame[NALUSlice]) {
if len(value.RTP) > 0 {
if !vt.dcChanged && value.IFrame {
vt.insertDCRtp()
}
} else {
// H265打包 https://blog.csdn.net/fanyun_01/article/details/114234290
var out [][][]byte
if vt.Value.IFrame {
if value.IFrame {
out = append(out, [][]byte{vt.DecoderConfiguration.Raw[0]}, [][]byte{vt.DecoderConfiguration.Raw[1]}, [][]byte{vt.DecoderConfiguration.Raw[2]})
}
for _, nalu := range vt.Video.Media.RingBuffer.Value.Raw {
@@ -189,5 +163,3 @@ func (vt *H265) Flush() {
vt.PacketizeRTP(out...)
}
}
vt.Video.Flush()
}

View File

@@ -8,10 +8,6 @@ import (
"m7s.live/engine/v4/util"
)
type RTPWriter interface {
writeRTPFrame(frame *RTPFrame)
}
func (av *Media[T]) UnmarshalRTPPacket(p *rtp.Packet) (frame *RTPFrame) {
if av.DecoderConfiguration.PayloadType != p.PayloadType {
av.Stream.Warn("RTP PayloadType error", zap.Uint8("want", av.DecoderConfiguration.PayloadType), zap.Uint8("got", p.PayloadType))
@@ -34,6 +30,15 @@ func (av *Media[T]) UnmarshalRTP(raw []byte) (frame *RTPFrame) {
return av.UnmarshalRTPPacket(&p)
}
func (av *Media[T]) writeRTPFrame(frame *RTPFrame) {
av.Value.AppendRTP(frame)
av.WriteRTPFrame(frame)
if frame.Marker {
av.SpesificTrack.generateTimestamp(frame.Timestamp)
av.SpesificTrack.Flush()
}
}
// WriteRTPPack 写入已反序列化的RTP包
func (av *Media[T]) WriteRTPPack(p *rtp.Packet) {
for frame := av.UnmarshalRTPPacket(p); frame != nil; frame = av.nextRTPFrame() {
@@ -48,11 +53,40 @@ func (av *Media[T]) WriteRTP(raw []byte) {
}
}
// https://www.cnblogs.com/moonwalk/p/15903760.html
// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets
func (av *Media[T]) PacketizeRTP(payloads ...[][]byte) {
packetCount := len(payloads)
if cap(av.Value.RTP) < packetCount {
av.Value.RTP = make([]*RTPFrame, packetCount)
} else {
av.Value.RTP = av.Value.RTP[:packetCount]
}
for i, pp := range payloads {
av.rtpSequence++
packet := av.Value.RTP[i]
if packet == nil {
packet = &RTPFrame{}
av.Value.RTP[i] = packet
packet.Version = 2
packet.PayloadType = av.DecoderConfiguration.PayloadType
packet.Payload = make([]byte, 0, 1200)
packet.SSRC = av.SSRC
}
packet.Payload = packet.Payload[:0]
packet.SequenceNumber = av.rtpSequence
packet.Timestamp = av.Value.PTS
packet.Marker = i == packetCount-1
for _, p := range pp {
packet.Payload = append(packet.Payload, p...)
}
}
}
type RTPDemuxer struct {
lastSeq uint16 //上一个收到的序号,用于乱序重排
lastSeq2 uint16 //记录上上一个收到的序列号
乱序重排 util.RTPReorder[*RTPFrame]
RTPWriter `json:"-"`
}
// 获取缓存中下一个rtpFrame

View File

@@ -72,10 +72,6 @@ func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) e
data = append(data, codec.NALU_Delimiter2, nalu)
}
}
if vp.SEI != nil {
data = append(data, codec.NALU_Delimiter2)
data = append(data, vp.SEI...)
}
data = append(data, codec.NALU_Delimiter2)
for i, nalu := range vp.Raw {
if i > 0 {
@@ -90,7 +86,7 @@ func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) e
}
return ctx.Err()
}
func (vt *Video) ComputeGOP() {
func (vt *Video) computeGOP() {
vt.idrCount++
if vt.IDRing != nil {
vt.GOP = int(vt.AVRing.RingBuffer.Value.Sequence - vt.IDRing.Value.Sequence)
@@ -109,38 +105,32 @@ func (vt *Video) ComputeGOP() {
vt.IDRing = vt.AVRing.RingBuffer.Ring
}
func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame, s *[]NALUSlice) {
for len(annexb) > 0 {
before, after, found := bytes.Cut(annexb, codec.NALU_Delimiter1)
if !found {
*s = append(*s, NALUSlice{annexb})
return
}
if len(before) > 0 {
*s = append(*s, NALUSlice{before})
}
annexb = after
func (vt *Video) writeAnnexBSlice(annexb AnnexBFrame) {
for found, after := true, annexb; len(annexb) > 0 && found; annexb = after {
annexb, after, found = bytes.Cut(annexb, codec.NALU_Delimiter1)
vt.WriteSliceBytes(annexb)
}
}
func (vt *Video) WriteAnnexB(frame AnnexBFrame) (s []NALUSlice) {
func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
if dts == 0 {
vt.generateTimestamp(pts)
} else {
vt.Value.PTS = pts
vt.Value.DTS = dts
}
vt.Value.BytesIn += len(frame)
for len(frame) > 0 {
before, after, found := bytes.Cut(frame, codec.NALU_Delimiter2)
if !found {
vt.writeAnnexBSlice(frame, &s)
return
for found, after := true, frame; len(frame) > 0 && found; frame = after {
frame, after, found = bytes.Cut(frame, codec.NALU_Delimiter2)
vt.writeAnnexBSlice(frame)
}
if len(before) > 0 {
vt.writeAnnexBSlice(AnnexBFrame(before), &s)
if len(vt.Value.Raw) > 0 {
vt.Flush()
}
frame = after
}
return
}
func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) {
vt.Value.IFrame = frame.IsIDR()
vt.Media.WriteAVCC(ts, frame)
for nalus := frame[5:]; len(nalus) > vt.nalulenSize; {
nalulen := util.ReadBE[int](nalus[:vt.nalulenSize])
@@ -149,18 +139,26 @@ func (vt *Video) WriteAVCC(ts uint32, frame AVCCFrame) {
return
}
if end := nalulen + vt.nalulenSize; len(nalus) >= end {
slice := nalus[vt.nalulenSize:end]
if _rawSlice := util.MallocSlice(&vt.AVRing.Value.Raw); _rawSlice == nil {
vt.Value.AppendRaw(NALUSlice{slice})
} else {
_rawSlice.Reset().Append(slice)
}
vt.WriteRawBytes(nalus[vt.nalulenSize:end])
nalus = nalus[end:]
} else {
vt.Stream.Error("WriteAVCC", zap.Int("len", len(nalus)), zap.Int("naluLenSize", vt.nalulenSize), zap.Int("end", end))
break
}
}
vt.Flush()
}
func (vt *Video) WriteSliceByte(b ...byte) {
vt.WriteSliceBytes(b)
}
func (vt *Video) WriteRawBytes(slice []byte) {
if naluSlice := util.MallocSlice(&vt.AVRing.Value.Raw); naluSlice == nil {
vt.Value.AppendRaw(NALUSlice{slice})
} else {
naluSlice.Reset(slice)
}
}
// 在I帧前面插入sps pps webrtc需要
@@ -195,33 +193,14 @@ func (av *Video) insertDCRtp() {
}
func (av *Video) generateTimestamp(ts uint32) {
av.AVRing.RingBuffer.Value.PTS = ts
av.AVRing.RingBuffer.Value.DTS = av.dtsEst.Feed(ts)
av.Value.PTS = ts
av.Value.DTS = av.dtsEst.Feed(ts)
}
func (vt *Video) SetLostFlag() {
vt.lostFlag = true
}
func (vt *Video) Flush() {
rv := &vt.Value
// 没有实际媒体数据
if len(rv.Raw) == 0 {
rv.Reset()
return
}
if vt.lostFlag {
if rv.IFrame {
vt.lostFlag = false
} else {
rv.Reset()
return
}
}
// AVCC格式补完
if vt.ComplementAVCC() {
func (vt *Video) CompleteAVCC(rv *AVFrame[NALUSlice]) {
var b util.Buffer
if cap(rv.AVCC) > 0 {
if avcc := rv.AVCC[:1]; len(avcc[0]) == 5 {
@@ -246,10 +225,33 @@ func (vt *Video) Flush() {
rv.AppendAVCC(nalu...)
}
}
// 下一帧为I帧即将覆盖
if vt.Next().Value.IFrame {
// 仅存一枚I帧需要扩环
func (vt *Video) Flush() {
if vt.Value.IFrame {
vt.computeGOP()
}
if vt.Attached == 0 && vt.IDRing != nil && vt.DecoderConfiguration.Seq > 0 {
defer vt.Attach()
}
rv := &vt.Value
// 没有实际媒体数据
if len(rv.Raw) == 0 {
rv.Reset()
return
}
if vt.lostFlag {
if rv.IFrame {
vt.lostFlag = false
} else {
rv.Reset()
return
}
}
// 仅存一枚I帧
if vt.idrCount == 1 {
// 下一帧为I帧即将覆盖需要扩环
if vt.Next().Value.IFrame {
if vt.AVRing.RingBuffer.Size < 256 {
vt.Link(util.NewRing[AVFrame[NALUSlice]](5)) // 扩大缓冲环
}