diff --git a/audio_track.go b/audio_track.go index e5e13d8..ae47230 100644 --- a/audio_track.go +++ b/audio_track.go @@ -56,6 +56,9 @@ func (at *AudioTrack) pushByteStream(ts uint32, payload []byte) { //extensionFlag = config2 & 0x01 at.ExtraData = payload at.PushByteStream = func(ts uint32, payload []byte) { + if len(payload) < 3 { + return + } pack := at.current() pack.Raw = payload[2:] pack.Timestamp = ts @@ -69,11 +72,10 @@ func (at *AudioTrack) pushByteStream(ts uint32, payload []byte) { at.Channels = payload[0]&0x01 + 1 at.ExtraData = payload[:1] at.PushByteStream = func(ts uint32, payload []byte) { - pack := at.current() - payloadLen := len(payload) - if payloadLen < 4 { + if len(payload) < 2 { return } + pack := at.current() pack.Raw = payload[1:] pack.Timestamp = ts pack.Payload = payload diff --git a/stream.go b/stream.go index 23580df..b451acf 100644 --- a/stream.go +++ b/stream.go @@ -49,6 +49,18 @@ func FindStream(streamPath string) *Stream { return Streams.GetStream(streamPath) } +// Publish 直接发布 +func Publish(streamPath, t string) *Stream { + var stream = &Stream{ + StreamPath: streamPath, + Type: t, + } + if stream.Publish() { + return stream + } + return nil +} + // Stream 流定义 type Stream struct { context.Context `json:"-"` @@ -62,12 +74,35 @@ type Stream struct { Transcoding map[string]string //转码配置,key:目标编码,value:发布者提供的编码 subscribeMutex sync.Mutex timeout *time.Timer //更新时间用来做超时处理 - Close func() `json:"-"` + OnClose func() `json:"-"` ExtraProp interface{} //额外的属性,用于实现子类化,减少map的使用 } +// 增加结束时的回调,使用类似Js的猴子补丁 +func (r *Stream) AddOnClose(onClose func()) { + if originOnClose := r.OnClose; originOnClose == nil { + r.OnClose = onClose + } else { + r.OnClose = func() { + originOnClose() + onClose() + } + } +} + func (r *Stream) Update() { - r.timeout.Reset(config.PublishTimeout) + if r.timeout != nil { + r.timeout.Reset(config.PublishTimeout) + } +} + +func (r *Stream) Close() { + Streams.Lock() + if r.OnClose != nil { + r.OnClose() + r.OnClose = nil + } + Streams.Unlock() } // Publish 发布者进行发布操作 @@ -80,27 +115,23 @@ func (r *Stream) Publish() bool { r.VideoTracks.Init() r.AudioTracks.Init() var cancel context.CancelFunc - customClose := r.Close r.Context, cancel = context.WithCancel(context.Background()) - var closeOnce sync.Once - r.Close = func() { - closeOnce.Do(func() { - r.timeout.Stop() - if customClose != nil { - customClose() - } - cancel() - r.VideoTracks.Dispose() - r.AudioTracks.Dispose() - utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) - Streams.Delete(r.StreamPath) - TriggerHook(HOOK_STREAMCLOSE, r) - }) - } + r.AddOnClose(func() { + cancel() + r.timeout.Stop() + r.VideoTracks.Dispose() + r.AudioTracks.Dispose() + utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) + delete(Streams.m, r.StreamPath) + TriggerHook(HOOK_STREAMCLOSE, r) + }) r.StartTime = time.Now() Streams.m[r.StreamPath] = r utils.Print(Green("Stream publish:"), BrightCyan(r.StreamPath)) - r.timeout = time.AfterFunc(config.PublishTimeout, r.Close) + r.timeout = time.AfterFunc(config.PublishTimeout, func() { + utils.Print(Yellow("Stream timeout:"), BrightCyan(r.StreamPath)) + r.Close() + }) //触发钩子 TriggerHook(HOOK_PUBLISH, r) return true diff --git a/video_track.go b/video_track.go index c629a39..d80bdfc 100644 --- a/video_track.go +++ b/video_track.go @@ -40,11 +40,11 @@ type VideoTrack struct { ExtraData *VideoPack `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) WaitIDR context.Context `json:"-"` revIDR func() - PushByteStream func(ts uint32, payload []byte) `json:"-"` PushNalu func(ts uint32, cts uint32, nalus ...[]byte) `json:"-"` UsingDonlField bool writeByteStream func(pack *VideoPack) idrCount int //处于缓冲中的关键帧数量 + nalulenSize int } func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) { @@ -76,7 +76,6 @@ func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) { } }, } - vt.PushByteStream = vt.pushByteStream vt.PushNalu = vt.pushNalu vt.Stream = s vt.CodecID = codec @@ -167,7 +166,11 @@ func (vt *VideoTrack) pushNalu(ts uint32, cts uint32, nalus ...[]byte) { naluType := nalu[0] & naluTypeBitmask switch naluType { case codec.NALU_SPS: + vt.ExtraData.NALUs[0] = nalu + vt.SPSInfo, _ = codec.ParseSPS(nalu) case codec.NALU_PPS: + vt.ExtraData.NALUs[1] = nalu + vt.ExtraData.Payload = codec.BuildH264SeqHeaderFromSpsPps(vt.ExtraData.NALUs[0], vt.ExtraData.NALUs[1]) case codec.NALU_STAPB: stapaHeaderSize = 3 fallthrough @@ -335,6 +338,22 @@ func (vt *VideoTrack) pushNalu(ts uint32, cts uint32, nalus ...[]byte) { continue } switch naluType { + case codec.NAL_UNIT_VPS: + vps = nalu + vt.ExtraData.NALUs[0] = vps + case codec.NAL_UNIT_SPS: + sps = nalu + vt.ExtraData.NALUs[1] = sps + vt.SPSInfo, _ = codec.ParseHevcSPS(nalu) + case codec.NAL_UNIT_PPS: + pps = nalu + vt.ExtraData.NALUs[2] = pps + extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vps, sps, pps) + if err != nil { + return + } + vt.ExtraData.Payload = extraData + // 4.4.2. Aggregation Packets (APs) (p25) /* 0 1 2 3 @@ -457,18 +476,15 @@ func (vt *VideoTrack) pushNalu(ts uint32, cts uint32, nalus ...[]byte) { func (vt *VideoTrack) current() *VideoPack { return vt.CurrentValue().(*VideoPack) } -func (vt *VideoTrack) pushByteStream(ts uint32, payload []byte) { - if payload[1] != 0 { - return - } else { +func (vt *VideoTrack) PushByteStream(ts uint32, payload []byte) { + if payload[1] == 0 { vt.CodecID = payload[0] & 0x0F - var nalulenSize int switch vt.CodecID { case 7: var info codec.AVCDecoderConfigurationRecord if _, err := info.Unmarshal(payload[5:]); err == nil { vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit) - nalulenSize = int(info.LengthSizeMinusOne&3 + 1) + vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1) vt.ExtraData = &VideoPack{ NALUs: [][]byte{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit}, } @@ -478,7 +494,7 @@ func (vt *VideoTrack) pushByteStream(ts uint32, payload []byte) { case 12: if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(payload); err == nil { vt.SPSInfo, _ = codec.ParseSPS(sps) - nalulenSize = int(payload[26]) & 0x03 + vt.nalulenSize = int(payload[26]) & 0x03 vt.ExtraData = &VideoPack{ NALUs: [][]byte{vps, sps, pps}, } @@ -486,29 +502,27 @@ func (vt *VideoTrack) pushByteStream(ts uint32, payload []byte) { vt.Stream.VideoTracks.AddTrack("h265", vt) } } - // 已完成序列帧组装,重置Push函数,从Payload中提取Nalu供非bytestream格式使用 - vt.PushByteStream = func(ts uint32, payload []byte) { - pack := vt.current() - if len(payload) < 4 { - return - } - vt.bytes += len(payload) - pack.IDR = payload[0]>>4 == 1 - pack.Timestamp = ts - pack.Sequence = vt.PacketCount - pack.Payload = payload - pack.CompositionTime = utils.BigEndian.Uint24(payload[2:]) - pack.NALUs = nil - for nalus := payload[5:]; len(nalus) > nalulenSize; { - nalulen := 0 - for i := 0; i < nalulenSize; i++ { - nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1)) - } - pack.NALUs = append(pack.NALUs, nalus[nalulenSize:nalulen+nalulenSize]) - nalus = nalus[nalulen+nalulenSize:] - } - vt.push(pack) + } else { + pack := vt.current() + if len(payload) < 4 { + return } + vt.bytes += len(payload) + pack.IDR = payload[0]>>4 == 1 + pack.Timestamp = ts + pack.Sequence = vt.PacketCount + pack.Payload = payload + pack.CompositionTime = utils.BigEndian.Uint24(payload[2:]) + pack.NALUs = nil + for nalus := payload[5:]; len(nalus) > vt.nalulenSize; { + nalulen := 0 + for i := 0; i < vt.nalulenSize; i++ { + nalulen += int(nalus[i]) << (8 * (vt.nalulenSize - i - 1)) + } + pack.NALUs = append(pack.NALUs, nalus[vt.nalulenSize:nalulen+vt.nalulenSize]) + nalus = nalus[nalulen+vt.nalulenSize:] + } + vt.push(pack) } }