From 513e28907ac1ada87fbd77f873dcd027a5400d2b Mon Sep 17 00:00:00 2001 From: notch Date: Sat, 9 Jan 2021 20:12:33 +0800 Subject: [PATCH] refactoring the hls code --- av/format/hls/muxer.go | 346 ------------------------------ av/format/hls/playlist.go | 140 ++++++++++++ av/format/hls/segmentgenerator.go | 240 +++++++++++++++++++++ media/stream.go | 12 +- service/hls/hls.go | 4 +- 5 files changed, 390 insertions(+), 352 deletions(-) delete mode 100755 av/format/hls/muxer.go create mode 100755 av/format/hls/playlist.go create mode 100755 av/format/hls/segmentgenerator.go diff --git a/av/format/hls/muxer.go b/av/format/hls/muxer.go deleted file mode 100755 index b72de0d..0000000 --- a/av/format/hls/muxer.go +++ /dev/null @@ -1,346 +0,0 @@ -// Copyright calabashdad. https://github.com/calabashdad/seal.git -// -// Copyright (c) 2019,CAOHONGJU All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package hls - -import ( - "bytes" - "errors" - "fmt" - "io" - "path/filepath" - "strconv" - "sync" - "time" - - "github.com/cnotch/ipchub/av/format/mpegts" - "github.com/cnotch/ipchub/utils/murmur" - "github.com/cnotch/xlog" -) - -// drop the segment when duration of ts too small. -const hlsSegmentMinDurationMs = 100 - -// in ms, for HLS aac flush the audio -const hlsAacDelay = 100 -const hlsRemainSegments = 3 - -// Muxer the HLS stream(m3u8 and ts files). -type Muxer struct { - path string // 流路径 - hlsFragment int // 每个片段长度 - - // m3u8 segments - l sync.RWMutex - segments []*segment - memory bool // 使用内存存储缓存到硬盘 - segmentPath string // 缓存路径 - - sequenceNo int // 片段序号 - current *segment //current segment - - // last http access time - lastAccessTime time.Time - logger *xlog.Logger - - audioRate int - afCache *mpegts.Frame // audio frame cache - afCacheBuff bytes.Buffer - // time jitter for aac - aacJitter *hlsAacJitter -} - -// NewMuxer . -func NewMuxer(path string, hlsFragment int, segmentPath string, audioRate int, logger *xlog.Logger) (*Muxer, error) { - muxer := &Muxer{ - path: path, - hlsFragment: hlsFragment, - memory: segmentPath == "", - segmentPath: segmentPath, - logger: logger, - lastAccessTime: time.Now(), - sequenceNo: 0, - audioRate: audioRate, - aacJitter: newHlsAacJitter(), - } - - if err := muxer.segmentOpen(0); err != nil { - return nil, err - } - - // set the current segment to sequence header, - // when close the segement, it will write a discontinuity to m3u8 file. - muxer.current.isSequenceHeader = true - return muxer, nil -} - -// open a new segment, a new ts file -// segmentStartDts use to calc the segment duration, use 0 for the first segment of hls -func (muxer *Muxer) segmentOpen(segmentStartDts int64) (err error) { - if nil != muxer.current { - // has already opened, ignore segment open - return - } - - // new segment - muxer.sequenceNo++ - curr := newSegment(muxer.memory) - curr.sequenceNo = muxer.sequenceNo - curr.segmentStartPts = segmentStartDts - curr.uri = "/streams" + muxer.path + "/" + strconv.Itoa(muxer.sequenceNo) + ".ts" - - tsFileName := fmt.Sprintf("%d_%d.ts", murmur.OfString(muxer.path), curr.sequenceNo) - tsFilePath := filepath.Join(muxer.segmentPath, tsFileName) - if err = curr.file.open(tsFilePath); err != nil { - return - } - - muxer.current = curr - return -} - -// WriteMpegtsFrame implements mpegts.FrameWriter -func (muxer *Muxer) WriteMpegtsFrame(frame *mpegts.Frame) (err error) { - // if current is NULL, segment is not open, ignore the flush event. - if nil == muxer.current { - return - } - if len(frame.Payload) <= 0 { - return - } - - if frame.IsAudio() { - if muxer.afCache == nil { - pts := muxer.aacJitter.onBufferStart(frame.Pts, muxer.audioRate) - headerFrame := *frame - headerFrame.Dts = pts - headerFrame.Pts = pts - muxer.afCache = &headerFrame - muxer.afCacheBuff.Write(frame.Payload) - } else { - muxer.afCacheBuff.Write(frame.Header) - muxer.afCacheBuff.Write(frame.Payload) - muxer.aacJitter.onBufferContinue() - } - - if frame.Pts-muxer.afCache.Pts > hlsAacDelay*90 { - return muxer.flushAudioCache() - } - - // reap when current source is pure audio. - // it maybe changed when stream info changed, - // for example, pure audio when start, audio/video when publishing, - // pure audio again for audio disabled. - // so we reap event when the audio incoming when segment overflow. - // we use absolutely overflow of segment to make jwplayer/ffplay happy - if muxer.isSegmentAbsolutelyOverflow() { - if err = muxer.reapSegment(frame.Pts); err != nil { - return - } - } - return - } - - if frame.IsKeyFrame() && muxer.isSegmentOverflow() { - if err = muxer.reapSegment(frame.Pts); err != nil { - return - } - } - - // flush video when got one - if err = muxer.flushFrame(frame); err != nil { - return - } - return -} - -func (muxer *Muxer) flushAudioCache() (err error) { - if muxer.afCache == nil { - return - } - - muxer.afCache.Payload = muxer.afCacheBuff.Bytes() - err = muxer.flushFrame(muxer.afCache) - muxer.afCache = nil - muxer.afCacheBuff.Reset() - return -} - -func (muxer *Muxer) flushFrame(frame *mpegts.Frame) (err error) { - muxer.current.updateDuration(frame.Pts) - if err = muxer.current.file.writeFrame(frame); err != nil { - return - } - return -} - -// close segment(ts) -func (muxer *Muxer) segmentClose(muxerClosed bool) (err error) { - if nil == muxer.current { - return - } - muxer.current.file.close() - - muxer.l.Lock() - defer muxer.l.Unlock() - - remain := hlsRemainSegments - if muxerClosed { - remain = 0 - } - - // valid, add to segments if segment duration is ok - if muxer.current.duration*1000 >= hlsSegmentMinDurationMs { - muxer.segments = append(muxer.segments, muxer.current) - muxer.current = nil - } else { - // reuse current segment index - muxer.sequenceNo-- - muxer.current.file.delete() - } - - // 仅保留3个 - if len(muxer.segments) > remain { - for i := 0; i < len(muxer.segments)-remain; i++ { - // // 可以考虑异步删除 - // if muxerClosed { - // muxer.segments[i].file.delete() - // } else { - // file := muxer.segments[i].file - // delay := time.Duration(2*muxer.hlsFragment) * time.Second - // scheduler.AfterFunc(delay, func() { - // file.delete() - // }, "hls segment file delay(1.5*hlsFragment).") - // } - muxer.segments[i].file.delete() - muxer.segments[i] = nil - } - copy(muxer.segments[:remain], muxer.segments[len(muxer.segments)-remain:]) - muxer.segments = muxer.segments[:remain] - } - - return -} - -// reopen the muxer for a new hls segment, -// close current segment, open a new segment, -// then write the key frame to the new segment. -// so, user must reap_segment then flush_video to hls muxer. -func (muxer *Muxer) reapSegment(segmentStartDts int64) (err error) { - if err = muxer.segmentClose(false); err != nil { - return - } - - if err = muxer.segmentOpen(segmentStartDts); err != nil { - return - } - - // segment open, flush the audio. - // @see: ngx_rtmp_hls_open_fragment - /* start fragment with audio to make iPhone happy */ - err = muxer.flushAudioCache() - - return -} - -// whether segment overflow, -// that is whether the current segment duration>=(the segment in config) -func (muxer *Muxer) isSegmentOverflow() bool { - return muxer.current.duration >= float64(muxer.hlsFragment) -} - -// whether segment absolutely overflow, for pure audio to reap segment, -// that is whether the current segment duration>=2*(the segment in config) -func (muxer *Muxer) isSegmentAbsolutelyOverflow() bool { - if nil == muxer.current { - return true - } - - res := muxer.current.duration >= float64(2*muxer.hlsFragment) - - return res -} - -var m3u8Pool = sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, 512)) - }, -} - -// M3u8 获取 m3u8 播放列表 -func (muxer *Muxer) M3u8(token string) ([]byte, error) { - muxer.lastAccessTime = time.Now() - w := m3u8Pool.Get().(*bytes.Buffer) - w.Reset() - defer m3u8Pool.Put(w) - - muxer.l.RLock() - defer muxer.l.RUnlock() - segments := muxer.segments - - if len(segments) < hlsRemainSegments { - return nil, errors.New("playlist is not enough,maybe the HLS stream just started") - } - - seq := segments[0].sequenceNo - var maxDuration float64 - for _, seg := range segments { - if seg.duration > maxDuration { - maxDuration = seg.duration - } - } - duration := int32(maxDuration + 1) - // 描述部分 - fmt.Fprintf(w, - "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:%d\n#EXT-X-MEDIA-SEQUENCE:%d\n\n", - duration, seq) - - // 列表部分 - for _, seg := range segments { - if seg.isSequenceHeader { - // #EXT-X-DISCONTINUITY\n - fmt.Fprint(w, "#EXT-X-DISCONTINUITY\n") - } - - if len(token) > 0 { - fmt.Fprintf(w, "#EXTINF:%.3f,\n%s?token=%s\n", - seg.duration, - seg.uri, token) - } else { - fmt.Fprintf(w, "#EXTINF:%.3f,\n%s\n", - seg.duration, - seg.uri) - } - } - - return w.Bytes(), nil -} - -// Segment 获取 segment -func (muxer *Muxer) Segment(seq int) (io.Reader, int, error) { - muxer.lastAccessTime = time.Now() - muxer.l.RLock() - defer muxer.l.RUnlock() - - for _, seg := range muxer.segments { - if seg.sequenceNo == seq { - return seg.file.get() - } - } - return nil, 0, errors.New("Not found TSFile") -} - -// LastAccessTime 最后hls访问时间 -func (muxer *Muxer) LastAccessTime() time.Time { - return muxer.lastAccessTime -} - -// Close . -func (muxer *Muxer) Close() error { - muxer.flushAudioCache() - - return muxer.segmentClose(true) -} diff --git a/av/format/hls/playlist.go b/av/format/hls/playlist.go new file mode 100755 index 0000000..2f6c15b --- /dev/null +++ b/av/format/hls/playlist.go @@ -0,0 +1,140 @@ +// Copyright (c) 2019,CAOHONGJU All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package hls + +import ( + "bytes" + "errors" + "fmt" + "io" + "sync" + "sync/atomic" + "time" +) + +const hlsRemainSegments = 3 + +// Playlist the HLS playlist(m3u8 and ts files). +type Playlist struct { + // m3u8 segments + l sync.RWMutex + segments []*segment + + // last http access time + lastAccessTime int64 +} + +// NewPlaylist . +func NewPlaylist() *Playlist { + return &Playlist{ + lastAccessTime: time.Now().UnixNano(), + } + +} + +var m3u8Pool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 512)) + }, +} + +// M3u8 获取 m3u8 播放列表 +func (pl *Playlist) M3u8(token string) ([]byte, error) { + atomic.StoreInt64(&pl.lastAccessTime, time.Now().UnixNano()) + w := m3u8Pool.Get().(*bytes.Buffer) + w.Reset() + defer m3u8Pool.Put(w) + + pl.l.RLock() + defer pl.l.RUnlock() + segments := pl.segments + + if len(segments) < hlsRemainSegments { + return nil, errors.New("playlist is not enough,maybe the HLS stream just started") + } + + seq := segments[0].sequenceNo + var maxDuration float64 + for _, seg := range segments { + if seg.duration > maxDuration { + maxDuration = seg.duration + } + } + duration := int32(maxDuration + 1) + // 描述部分 + fmt.Fprintf(w, + "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:%d\n#EXT-X-MEDIA-SEQUENCE:%d\n\n", + duration, seq) + + // 列表部分 + for _, seg := range segments { + if seg.isSequenceHeader { + // #EXT-X-DISCONTINUITY\n + fmt.Fprint(w, "#EXT-X-DISCONTINUITY\n") + } + + if len(token) > 0 { + fmt.Fprintf(w, "#EXTINF:%.3f,\n%s?token=%s\n", + seg.duration, + seg.uri, token) + } else { + fmt.Fprintf(w, "#EXTINF:%.3f,\n%s\n", + seg.duration, + seg.uri) + } + } + + return w.Bytes(), nil +} + +// Segment 获取 segment +func (pl *Playlist) Segment(seq int) (io.Reader, int, error) { + atomic.StoreInt64(&pl.lastAccessTime, time.Now().UnixNano()) + pl.l.RLock() + defer pl.l.RUnlock() + + for _, seg := range pl.segments { + if seg.sequenceNo == seq { + return seg.file.get() + } + } + return nil, 0, errors.New("Not found TSFile") +} + +// LastAccessTime 最后hls访问时间 +func (pl *Playlist) LastAccessTime() time.Time { + lastAccessTime := atomic.LoadInt64(&pl.lastAccessTime) + return time.Unix(0, lastAccessTime) +} + +// Close . +func (pl *Playlist) Close() error { + pl.l.Lock() + defer pl.l.Unlock() + pl.clearSegments(0) + + return nil +} + +func (pl *Playlist) addSegment(seg *segment) { + pl.l.Lock() + defer pl.l.Unlock() + pl.segments = append(pl.segments, seg) + + pl.clearSegments(hlsRemainSegments) +} + +func (pl *Playlist) clearSegments(remain int) { + // TODO: 延时异步删除? + if len(pl.segments) > remain { + for i := 0; i < len(pl.segments)-remain; i++ { + pl.segments[i].file.delete() + pl.segments[i] = nil + } + copy(pl.segments[:remain], pl.segments[len(pl.segments)-remain:]) + pl.segments = pl.segments[:remain] + } + return +} diff --git a/av/format/hls/segmentgenerator.go b/av/format/hls/segmentgenerator.go new file mode 100755 index 0000000..b515997 --- /dev/null +++ b/av/format/hls/segmentgenerator.go @@ -0,0 +1,240 @@ +// Copyright calabashdad. https://github.com/calabashdad/seal.git +// +// Copyright (c) 2019,CAOHONGJU All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package hls + +import ( + "bytes" + "fmt" + "path/filepath" + "strconv" + + "github.com/cnotch/ipchub/av/format/mpegts" + "github.com/cnotch/ipchub/utils/murmur" + "github.com/cnotch/xlog" +) + +// drop the segment when duration of ts too small. +const hlsSegmentMinDurationMs = 100 + +// in ms, for HLS aac flush the audio +const hlsAacDelay = 100 + +// SegmentGenerator generate the HLS ts segment. +type SegmentGenerator struct { + playlist *Playlist // 播放列表 + path string // 流路径 + hlsFragment int // 每个片段长度 + + memory bool // 使用内存存储缓存到硬盘 + segmentPath string // 缓存文件路径 + + sequenceNo int // 片段序号 + current *segment //current segment + + logger *xlog.Logger + + audioRate int + afCache *mpegts.Frame // audio frame cache + afCacheBuff bytes.Buffer + // time jitter for aac + aacJitter *hlsAacJitter +} + +// NewSegmentGenerator . +func NewSegmentGenerator(playlist *Playlist, path string, hlsFragment int, segmentPath string, audioRate int, logger *xlog.Logger) (*SegmentGenerator, error) { + sg := &SegmentGenerator{ + playlist: playlist, + path: path, + hlsFragment: hlsFragment, + memory: segmentPath == "", + segmentPath: segmentPath, + logger: logger, + sequenceNo: 0, + audioRate: audioRate, + aacJitter: newHlsAacJitter(), + } + + if err := sg.segmentOpen(0); err != nil { + return nil, err + } + + // set the current segment to sequence header, + // when close the segement, it will write a discontinuity to m3u8 file. + sg.current.isSequenceHeader = true + return sg, nil +} + +// open a new segment, a new ts file +// segmentStartDts use to calc the segment duration, use 0 for the first segment of hls +func (sg *SegmentGenerator) segmentOpen(segmentStartDts int64) (err error) { + if nil != sg.current { + // has already opened, ignore segment open + return + } + + // new segment + sg.sequenceNo++ + curr := newSegment(sg.memory) + curr.sequenceNo = sg.sequenceNo + curr.segmentStartPts = segmentStartDts + curr.uri = "/streams" + sg.path + "/" + strconv.Itoa(sg.sequenceNo) + ".ts" + + tsFileName := fmt.Sprintf("%d_%d.ts", murmur.OfString(sg.path), curr.sequenceNo) + tsFilePath := filepath.Join(sg.segmentPath, tsFileName) + if err = curr.file.open(tsFilePath); err != nil { + return + } + + sg.current = curr + return +} + +// WriteMpegtsFrame implements mpegts.FrameWriter +func (sg *SegmentGenerator) WriteMpegtsFrame(frame *mpegts.Frame) (err error) { + // if current is NULL, segment is not open, ignore the flush event. + if nil == sg.current { + return + } + if len(frame.Payload) <= 0 { + return + } + + if frame.IsAudio() { + if sg.afCache == nil { + pts := sg.aacJitter.onBufferStart(frame.Pts, sg.audioRate) + headerFrame := *frame + headerFrame.Dts = pts + headerFrame.Pts = pts + sg.afCache = &headerFrame + sg.afCacheBuff.Write(frame.Payload) + } else { + sg.afCacheBuff.Write(frame.Header) + sg.afCacheBuff.Write(frame.Payload) + sg.aacJitter.onBufferContinue() + } + + if frame.Pts-sg.afCache.Pts > hlsAacDelay*90 { + return sg.flushAudioCache() + } + + // reap when current source is pure audio. + // it maybe changed when stream info changed, + // for example, pure audio when start, audio/video when publishing, + // pure audio again for audio disabled. + // so we reap event when the audio incoming when segment overflow. + // we use absolutely overflow of segment to make jwplayer/ffplay happy + if sg.isSegmentAbsolutelyOverflow() { + if err = sg.reapSegment(frame.Pts); err != nil { + return + } + } + return + } + + if frame.IsKeyFrame() && sg.isSegmentOverflow() { + if err = sg.reapSegment(frame.Pts); err != nil { + return + } + } + + // flush video when got one + if err = sg.flushFrame(frame); err != nil { + return + } + return +} + +func (sg *SegmentGenerator) flushAudioCache() (err error) { + if sg.afCache == nil { + return + } + + sg.afCache.Payload = sg.afCacheBuff.Bytes() + err = sg.flushFrame(sg.afCache) + sg.afCache = nil + sg.afCacheBuff.Reset() + return +} + +func (sg *SegmentGenerator) flushFrame(frame *mpegts.Frame) (err error) { + sg.current.updateDuration(frame.Pts) + if err = sg.current.file.writeFrame(frame); err != nil { + return + } + return +} + +// close segment(ts) +func (sg *SegmentGenerator) segmentClose() (err error) { + if nil == sg.current { + return + } + + curr := sg.current + sg.current = nil + curr.file.close() + if curr.duration*1000 < hlsSegmentMinDurationMs { + // reuse current segment index + sg.sequenceNo-- + curr.file.delete() + } else { + sg.playlist.addSegment(curr) + } + return +} + +// reopen the sg for a new hls segment, +// close current segment, open a new segment, +// then write the key frame to the new segment. +// so, user must reap_segment then flush_video to hls sg. +func (sg *SegmentGenerator) reapSegment(segmentStartDts int64) (err error) { + if err = sg.segmentClose(); err != nil { + return + } + + if err = sg.segmentOpen(segmentStartDts); err != nil { + return + } + + // segment open, flush the audio. + // @see: ngx_rtmp_hls_open_fragment + /* start fragment with audio to make iPhone happy */ + err = sg.flushAudioCache() + + return +} + +// whether segment overflow, +// that is whether the current segment duration>=(the segment in config) +func (sg *SegmentGenerator) isSegmentOverflow() bool { + return sg.current.duration >= float64(sg.hlsFragment) +} + +// whether segment absolutely overflow, for pure audio to reap segment, +// that is whether the current segment duration>=2*(the segment in config) +func (sg *SegmentGenerator) isSegmentAbsolutelyOverflow() bool { + if nil == sg.current { + return true + } + + res := sg.current.duration >= float64(2*sg.hlsFragment) + + return res +} + +// Close . +func (sg *SegmentGenerator) Close() error { + if nil == sg.current { + return nil + } + + curr := sg.current + sg.current = nil + curr.file.close() + curr.file.delete() + return nil +} diff --git a/media/stream.go b/media/stream.go index 99d3394..a4de352 100755 --- a/media/stream.go +++ b/media/stream.go @@ -56,7 +56,7 @@ type Stream struct { flvConsumptions consumptions flvCache packCache tsMuxer *mpegts.MuxerAvcAac - hlsMuxer *hls.Muxer + hlsPlaylist *hls.Playlist attrs map[string]string // 流属性 multicast Multicastable hls Hlsable @@ -128,20 +128,21 @@ func (s *Stream) prepareOtherStream() { // prepare av.Frame -> mpegts.Frame if s.Video.Codec == "H264" { - hlsMuxer, err := hls.NewMuxer(s.path, + hlsPlaylist := hls.NewPlaylist() + sg, err := hls.NewSegmentGenerator(hlsPlaylist, s.path, config.HlsFragment(), config.HlsPath(), s.Audio.SampleRate, s.logger.With(xlog.Fields(xlog.F("extra", "hls.Muxer")))) if err != nil { return } - tsMuxer, err2 := mpegts.NewMuxerAvcAac(s.Video, s.Audio, hlsMuxer, + tsMuxer, err2 := mpegts.NewMuxerAvcAac(s.Video, s.Audio, sg, s.logger.With(xlog.Fields(xlog.F("extra", "ts.Muxer")))) if err2 != nil { return } s.tsMuxer = tsMuxer - s.hlsMuxer = hlsMuxer + s.hlsPlaylist = hlsPlaylist } } @@ -183,6 +184,7 @@ func (s *Stream) close(status int32) error { // 关闭 hls if s.tsMuxer != nil { s.tsMuxer.Close() + s.hlsPlaylist.Close() } // 关闭 flv 消费者和 Muxer @@ -246,7 +248,7 @@ func (s *Stream) Multicastable() Multicastable { // Hlsable 返回支持hls能力,不支持返回nil func (s *Stream) Hlsable() Hlsable { - return s.hlsMuxer + return s.hlsPlaylist } func (s *Stream) startConsume(consumer Consumer, packetType PacketType, extra string, useGopCache bool) CID { diff --git a/service/hls/hls.go b/service/hls/hls.go index 602fc95..1adec05 100755 --- a/service/hls/hls.go +++ b/service/hls/hls.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/cnotch/ipchub/config" "github.com/cnotch/ipchub/media" "github.com/cnotch/xlog" ) @@ -34,7 +35,8 @@ func GetM3u8(logger *xlog.Logger, path string, token string, addr string, w http var cont []byte // 最多等待完成 30 秒 - for i := 0; i < 30; i++ { + waitSeconds := int(1.5 * float64(3*config.HlsFragment())) + for i := 0; i < waitSeconds; i++ { cont, err = c.M3u8(token) if err == nil { break