允许再次发送序列帧

This commit is contained in:
langhuihui
2021-07-18 22:40:16 +08:00
parent 8b3962b7db
commit 84656341b1
3 changed files with 100 additions and 53 deletions

View File

@@ -56,6 +56,9 @@ func (at *AudioTrack) pushByteStream(ts uint32, payload []byte) {
//extensionFlag = config2 & 0x01 //extensionFlag = config2 & 0x01
at.ExtraData = payload at.ExtraData = payload
at.PushByteStream = func(ts uint32, payload []byte) { at.PushByteStream = func(ts uint32, payload []byte) {
if len(payload) < 3 {
return
}
pack := at.current() pack := at.current()
pack.Raw = payload[2:] pack.Raw = payload[2:]
pack.Timestamp = ts pack.Timestamp = ts
@@ -69,11 +72,10 @@ func (at *AudioTrack) pushByteStream(ts uint32, payload []byte) {
at.Channels = payload[0]&0x01 + 1 at.Channels = payload[0]&0x01 + 1
at.ExtraData = payload[:1] at.ExtraData = payload[:1]
at.PushByteStream = func(ts uint32, payload []byte) { at.PushByteStream = func(ts uint32, payload []byte) {
pack := at.current() if len(payload) < 2 {
payloadLen := len(payload)
if payloadLen < 4 {
return return
} }
pack := at.current()
pack.Raw = payload[1:] pack.Raw = payload[1:]
pack.Timestamp = ts pack.Timestamp = ts
pack.Payload = payload pack.Payload = payload

View File

@@ -49,6 +49,18 @@ func FindStream(streamPath string) *Stream {
return Streams.GetStream(streamPath) return Streams.GetStream(streamPath)
} }
// Publish 直接发布
func Publish(streamPath, t string) *Stream {
var stream = &Stream{
StreamPath: streamPath,
Type: t,
}
if stream.Publish() {
return stream
}
return nil
}
// Stream 流定义 // Stream 流定义
type Stream struct { type Stream struct {
context.Context `json:"-"` context.Context `json:"-"`
@@ -62,12 +74,35 @@ type Stream struct {
Transcoding map[string]string //转码配置key目标编码value发布者提供的编码 Transcoding map[string]string //转码配置key目标编码value发布者提供的编码
subscribeMutex sync.Mutex subscribeMutex sync.Mutex
timeout *time.Timer //更新时间用来做超时处理 timeout *time.Timer //更新时间用来做超时处理
Close func() `json:"-"` OnClose func() `json:"-"`
ExtraProp interface{} //额外的属性用于实现子类化减少map的使用 ExtraProp interface{} //额外的属性用于实现子类化减少map的使用
} }
// 增加结束时的回调使用类似Js的猴子补丁
func (r *Stream) AddOnClose(onClose func()) {
if originOnClose := r.OnClose; originOnClose == nil {
r.OnClose = onClose
} else {
r.OnClose = func() {
originOnClose()
onClose()
}
}
}
func (r *Stream) Update() { func (r *Stream) Update() {
r.timeout.Reset(config.PublishTimeout) if r.timeout != nil {
r.timeout.Reset(config.PublishTimeout)
}
}
func (r *Stream) Close() {
Streams.Lock()
if r.OnClose != nil {
r.OnClose()
r.OnClose = nil
}
Streams.Unlock()
} }
// Publish 发布者进行发布操作 // Publish 发布者进行发布操作
@@ -80,27 +115,23 @@ func (r *Stream) Publish() bool {
r.VideoTracks.Init() r.VideoTracks.Init()
r.AudioTracks.Init() r.AudioTracks.Init()
var cancel context.CancelFunc var cancel context.CancelFunc
customClose := r.Close
r.Context, cancel = context.WithCancel(context.Background()) r.Context, cancel = context.WithCancel(context.Background())
var closeOnce sync.Once r.AddOnClose(func() {
r.Close = func() { cancel()
closeOnce.Do(func() { r.timeout.Stop()
r.timeout.Stop() r.VideoTracks.Dispose()
if customClose != nil { r.AudioTracks.Dispose()
customClose() utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath))
} delete(Streams.m, r.StreamPath)
cancel() TriggerHook(HOOK_STREAMCLOSE, r)
r.VideoTracks.Dispose() })
r.AudioTracks.Dispose()
utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath))
Streams.Delete(r.StreamPath)
TriggerHook(HOOK_STREAMCLOSE, r)
})
}
r.StartTime = time.Now() r.StartTime = time.Now()
Streams.m[r.StreamPath] = r Streams.m[r.StreamPath] = r
utils.Print(Green("Stream publish:"), BrightCyan(r.StreamPath)) utils.Print(Green("Stream publish:"), BrightCyan(r.StreamPath))
r.timeout = time.AfterFunc(config.PublishTimeout, r.Close) r.timeout = time.AfterFunc(config.PublishTimeout, func() {
utils.Print(Yellow("Stream timeout:"), BrightCyan(r.StreamPath))
r.Close()
})
//触发钩子 //触发钩子
TriggerHook(HOOK_PUBLISH, r) TriggerHook(HOOK_PUBLISH, r)
return true return true

View File

@@ -40,11 +40,11 @@ type VideoTrack struct {
ExtraData *VideoPack `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) ExtraData *VideoPack `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS)
WaitIDR context.Context `json:"-"` WaitIDR context.Context `json:"-"`
revIDR func() revIDR func()
PushByteStream func(ts uint32, payload []byte) `json:"-"`
PushNalu func(ts uint32, cts uint32, nalus ...[]byte) `json:"-"` PushNalu func(ts uint32, cts uint32, nalus ...[]byte) `json:"-"`
UsingDonlField bool UsingDonlField bool
writeByteStream func(pack *VideoPack) writeByteStream func(pack *VideoPack)
idrCount int //处于缓冲中的关键帧数量 idrCount int //处于缓冲中的关键帧数量
nalulenSize int
} }
func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) { func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) {
@@ -76,7 +76,6 @@ func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) {
} }
}, },
} }
vt.PushByteStream = vt.pushByteStream
vt.PushNalu = vt.pushNalu vt.PushNalu = vt.pushNalu
vt.Stream = s vt.Stream = s
vt.CodecID = codec vt.CodecID = codec
@@ -167,7 +166,11 @@ func (vt *VideoTrack) pushNalu(ts uint32, cts uint32, nalus ...[]byte) {
naluType := nalu[0] & naluTypeBitmask naluType := nalu[0] & naluTypeBitmask
switch naluType { switch naluType {
case codec.NALU_SPS: case codec.NALU_SPS:
vt.ExtraData.NALUs[0] = nalu
vt.SPSInfo, _ = codec.ParseSPS(nalu)
case codec.NALU_PPS: case codec.NALU_PPS:
vt.ExtraData.NALUs[1] = nalu
vt.ExtraData.Payload = codec.BuildH264SeqHeaderFromSpsPps(vt.ExtraData.NALUs[0], vt.ExtraData.NALUs[1])
case codec.NALU_STAPB: case codec.NALU_STAPB:
stapaHeaderSize = 3 stapaHeaderSize = 3
fallthrough fallthrough
@@ -335,6 +338,22 @@ func (vt *VideoTrack) pushNalu(ts uint32, cts uint32, nalus ...[]byte) {
continue continue
} }
switch naluType { switch naluType {
case codec.NAL_UNIT_VPS:
vps = nalu
vt.ExtraData.NALUs[0] = vps
case codec.NAL_UNIT_SPS:
sps = nalu
vt.ExtraData.NALUs[1] = sps
vt.SPSInfo, _ = codec.ParseHevcSPS(nalu)
case codec.NAL_UNIT_PPS:
pps = nalu
vt.ExtraData.NALUs[2] = pps
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vps, sps, pps)
if err != nil {
return
}
vt.ExtraData.Payload = extraData
// 4.4.2. Aggregation Packets (APs) (p25) // 4.4.2. Aggregation Packets (APs) (p25)
/* /*
0 1 2 3 0 1 2 3
@@ -457,18 +476,15 @@ func (vt *VideoTrack) pushNalu(ts uint32, cts uint32, nalus ...[]byte) {
func (vt *VideoTrack) current() *VideoPack { func (vt *VideoTrack) current() *VideoPack {
return vt.CurrentValue().(*VideoPack) return vt.CurrentValue().(*VideoPack)
} }
func (vt *VideoTrack) pushByteStream(ts uint32, payload []byte) { func (vt *VideoTrack) PushByteStream(ts uint32, payload []byte) {
if payload[1] != 0 { if payload[1] == 0 {
return
} else {
vt.CodecID = payload[0] & 0x0F vt.CodecID = payload[0] & 0x0F
var nalulenSize int
switch vt.CodecID { switch vt.CodecID {
case 7: case 7:
var info codec.AVCDecoderConfigurationRecord var info codec.AVCDecoderConfigurationRecord
if _, err := info.Unmarshal(payload[5:]); err == nil { if _, err := info.Unmarshal(payload[5:]); err == nil {
vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit) vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit)
nalulenSize = int(info.LengthSizeMinusOne&3 + 1) vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1)
vt.ExtraData = &VideoPack{ vt.ExtraData = &VideoPack{
NALUs: [][]byte{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit}, NALUs: [][]byte{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit},
} }
@@ -478,7 +494,7 @@ func (vt *VideoTrack) pushByteStream(ts uint32, payload []byte) {
case 12: case 12:
if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(payload); err == nil { if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(payload); err == nil {
vt.SPSInfo, _ = codec.ParseSPS(sps) vt.SPSInfo, _ = codec.ParseSPS(sps)
nalulenSize = int(payload[26]) & 0x03 vt.nalulenSize = int(payload[26]) & 0x03
vt.ExtraData = &VideoPack{ vt.ExtraData = &VideoPack{
NALUs: [][]byte{vps, sps, pps}, NALUs: [][]byte{vps, sps, pps},
} }
@@ -486,29 +502,27 @@ func (vt *VideoTrack) pushByteStream(ts uint32, payload []byte) {
vt.Stream.VideoTracks.AddTrack("h265", vt) vt.Stream.VideoTracks.AddTrack("h265", vt)
} }
} }
// 已完成序列帧组装重置Push函数从Payload中提取Nalu供非bytestream格式使用 } else {
vt.PushByteStream = func(ts uint32, payload []byte) { pack := vt.current()
pack := vt.current() if len(payload) < 4 {
if len(payload) < 4 { return
return
}
vt.bytes += len(payload)
pack.IDR = payload[0]>>4 == 1
pack.Timestamp = ts
pack.Sequence = vt.PacketCount
pack.Payload = payload
pack.CompositionTime = utils.BigEndian.Uint24(payload[2:])
pack.NALUs = nil
for nalus := payload[5:]; len(nalus) > nalulenSize; {
nalulen := 0
for i := 0; i < nalulenSize; i++ {
nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1))
}
pack.NALUs = append(pack.NALUs, nalus[nalulenSize:nalulen+nalulenSize])
nalus = nalus[nalulen+nalulenSize:]
}
vt.push(pack)
} }
vt.bytes += len(payload)
pack.IDR = payload[0]>>4 == 1
pack.Timestamp = ts
pack.Sequence = vt.PacketCount
pack.Payload = payload
pack.CompositionTime = utils.BigEndian.Uint24(payload[2:])
pack.NALUs = nil
for nalus := payload[5:]; len(nalus) > vt.nalulenSize; {
nalulen := 0
for i := 0; i < vt.nalulenSize; i++ {
nalulen += int(nalus[i]) << (8 * (vt.nalulenSize - i - 1))
}
pack.NALUs = append(pack.NALUs, nalus[vt.nalulenSize:nalulen+vt.nalulenSize])
nalus = nalus[nalulen+vt.nalulenSize:]
}
vt.push(pack)
} }
} }