refactoring the hls code

This commit is contained in:
notch
2021-01-09 20:12:33 +08:00
parent 5723f539e5
commit 513e28907a
5 changed files with 390 additions and 352 deletions

View File

@@ -1,346 +0,0 @@
// Copyright calabashdad. https://github.com/calabashdad/seal.git
//
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package hls
import (
"bytes"
"errors"
"fmt"
"io"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/cnotch/ipchub/av/format/mpegts"
"github.com/cnotch/ipchub/utils/murmur"
"github.com/cnotch/xlog"
)
// drop the segment when duration of ts too small.
const hlsSegmentMinDurationMs = 100
// in ms, for HLS aac flush the audio
const hlsAacDelay = 100
const hlsRemainSegments = 3
// Muxer the HLS stream(m3u8 and ts files).
type Muxer struct {
path string // 流路径
hlsFragment int // 每个片段长度
// m3u8 segments
l sync.RWMutex
segments []*segment
memory bool // 使用内存存储缓存到硬盘
segmentPath string // 缓存路径
sequenceNo int // 片段序号
current *segment //current segment
// last http access time
lastAccessTime time.Time
logger *xlog.Logger
audioRate int
afCache *mpegts.Frame // audio frame cache
afCacheBuff bytes.Buffer
// time jitter for aac
aacJitter *hlsAacJitter
}
// NewMuxer .
func NewMuxer(path string, hlsFragment int, segmentPath string, audioRate int, logger *xlog.Logger) (*Muxer, error) {
muxer := &Muxer{
path: path,
hlsFragment: hlsFragment,
memory: segmentPath == "",
segmentPath: segmentPath,
logger: logger,
lastAccessTime: time.Now(),
sequenceNo: 0,
audioRate: audioRate,
aacJitter: newHlsAacJitter(),
}
if err := muxer.segmentOpen(0); err != nil {
return nil, err
}
// set the current segment to sequence header,
// when close the segement, it will write a discontinuity to m3u8 file.
muxer.current.isSequenceHeader = true
return muxer, nil
}
// open a new segment, a new ts file
// segmentStartDts use to calc the segment duration, use 0 for the first segment of hls
func (muxer *Muxer) segmentOpen(segmentStartDts int64) (err error) {
if nil != muxer.current {
// has already opened, ignore segment open
return
}
// new segment
muxer.sequenceNo++
curr := newSegment(muxer.memory)
curr.sequenceNo = muxer.sequenceNo
curr.segmentStartPts = segmentStartDts
curr.uri = "/streams" + muxer.path + "/" + strconv.Itoa(muxer.sequenceNo) + ".ts"
tsFileName := fmt.Sprintf("%d_%d.ts", murmur.OfString(muxer.path), curr.sequenceNo)
tsFilePath := filepath.Join(muxer.segmentPath, tsFileName)
if err = curr.file.open(tsFilePath); err != nil {
return
}
muxer.current = curr
return
}
// WriteMpegtsFrame implements mpegts.FrameWriter
func (muxer *Muxer) WriteMpegtsFrame(frame *mpegts.Frame) (err error) {
// if current is NULL, segment is not open, ignore the flush event.
if nil == muxer.current {
return
}
if len(frame.Payload) <= 0 {
return
}
if frame.IsAudio() {
if muxer.afCache == nil {
pts := muxer.aacJitter.onBufferStart(frame.Pts, muxer.audioRate)
headerFrame := *frame
headerFrame.Dts = pts
headerFrame.Pts = pts
muxer.afCache = &headerFrame
muxer.afCacheBuff.Write(frame.Payload)
} else {
muxer.afCacheBuff.Write(frame.Header)
muxer.afCacheBuff.Write(frame.Payload)
muxer.aacJitter.onBufferContinue()
}
if frame.Pts-muxer.afCache.Pts > hlsAacDelay*90 {
return muxer.flushAudioCache()
}
// reap when current source is pure audio.
// it maybe changed when stream info changed,
// for example, pure audio when start, audio/video when publishing,
// pure audio again for audio disabled.
// so we reap event when the audio incoming when segment overflow.
// we use absolutely overflow of segment to make jwplayer/ffplay happy
if muxer.isSegmentAbsolutelyOverflow() {
if err = muxer.reapSegment(frame.Pts); err != nil {
return
}
}
return
}
if frame.IsKeyFrame() && muxer.isSegmentOverflow() {
if err = muxer.reapSegment(frame.Pts); err != nil {
return
}
}
// flush video when got one
if err = muxer.flushFrame(frame); err != nil {
return
}
return
}
func (muxer *Muxer) flushAudioCache() (err error) {
if muxer.afCache == nil {
return
}
muxer.afCache.Payload = muxer.afCacheBuff.Bytes()
err = muxer.flushFrame(muxer.afCache)
muxer.afCache = nil
muxer.afCacheBuff.Reset()
return
}
func (muxer *Muxer) flushFrame(frame *mpegts.Frame) (err error) {
muxer.current.updateDuration(frame.Pts)
if err = muxer.current.file.writeFrame(frame); err != nil {
return
}
return
}
// close segment(ts)
func (muxer *Muxer) segmentClose(muxerClosed bool) (err error) {
if nil == muxer.current {
return
}
muxer.current.file.close()
muxer.l.Lock()
defer muxer.l.Unlock()
remain := hlsRemainSegments
if muxerClosed {
remain = 0
}
// valid, add to segments if segment duration is ok
if muxer.current.duration*1000 >= hlsSegmentMinDurationMs {
muxer.segments = append(muxer.segments, muxer.current)
muxer.current = nil
} else {
// reuse current segment index
muxer.sequenceNo--
muxer.current.file.delete()
}
// 仅保留3个
if len(muxer.segments) > remain {
for i := 0; i < len(muxer.segments)-remain; i++ {
// // 可以考虑异步删除
// if muxerClosed {
// muxer.segments[i].file.delete()
// } else {
// file := muxer.segments[i].file
// delay := time.Duration(2*muxer.hlsFragment) * time.Second
// scheduler.AfterFunc(delay, func() {
// file.delete()
// }, "hls segment file delay(1.5*hlsFragment).")
// }
muxer.segments[i].file.delete()
muxer.segments[i] = nil
}
copy(muxer.segments[:remain], muxer.segments[len(muxer.segments)-remain:])
muxer.segments = muxer.segments[:remain]
}
return
}
// reopen the muxer for a new hls segment,
// close current segment, open a new segment,
// then write the key frame to the new segment.
// so, user must reap_segment then flush_video to hls muxer.
func (muxer *Muxer) reapSegment(segmentStartDts int64) (err error) {
if err = muxer.segmentClose(false); err != nil {
return
}
if err = muxer.segmentOpen(segmentStartDts); err != nil {
return
}
// segment open, flush the audio.
// @see: ngx_rtmp_hls_open_fragment
/* start fragment with audio to make iPhone happy */
err = muxer.flushAudioCache()
return
}
// whether segment overflow,
// that is whether the current segment duration>=(the segment in config)
func (muxer *Muxer) isSegmentOverflow() bool {
return muxer.current.duration >= float64(muxer.hlsFragment)
}
// whether segment absolutely overflow, for pure audio to reap segment,
// that is whether the current segment duration>=2*(the segment in config)
func (muxer *Muxer) isSegmentAbsolutelyOverflow() bool {
if nil == muxer.current {
return true
}
res := muxer.current.duration >= float64(2*muxer.hlsFragment)
return res
}
var m3u8Pool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 512))
},
}
// M3u8 获取 m3u8 播放列表
func (muxer *Muxer) M3u8(token string) ([]byte, error) {
muxer.lastAccessTime = time.Now()
w := m3u8Pool.Get().(*bytes.Buffer)
w.Reset()
defer m3u8Pool.Put(w)
muxer.l.RLock()
defer muxer.l.RUnlock()
segments := muxer.segments
if len(segments) < hlsRemainSegments {
return nil, errors.New("playlist is not enough,maybe the HLS stream just started")
}
seq := segments[0].sequenceNo
var maxDuration float64
for _, seg := range segments {
if seg.duration > maxDuration {
maxDuration = seg.duration
}
}
duration := int32(maxDuration + 1)
// 描述部分
fmt.Fprintf(w,
"#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:%d\n#EXT-X-MEDIA-SEQUENCE:%d\n\n",
duration, seq)
// 列表部分
for _, seg := range segments {
if seg.isSequenceHeader {
// #EXT-X-DISCONTINUITY\n
fmt.Fprint(w, "#EXT-X-DISCONTINUITY\n")
}
if len(token) > 0 {
fmt.Fprintf(w, "#EXTINF:%.3f,\n%s?token=%s\n",
seg.duration,
seg.uri, token)
} else {
fmt.Fprintf(w, "#EXTINF:%.3f,\n%s\n",
seg.duration,
seg.uri)
}
}
return w.Bytes(), nil
}
// Segment 获取 segment
func (muxer *Muxer) Segment(seq int) (io.Reader, int, error) {
muxer.lastAccessTime = time.Now()
muxer.l.RLock()
defer muxer.l.RUnlock()
for _, seg := range muxer.segments {
if seg.sequenceNo == seq {
return seg.file.get()
}
}
return nil, 0, errors.New("Not found TSFile")
}
// LastAccessTime 最后hls访问时间
func (muxer *Muxer) LastAccessTime() time.Time {
return muxer.lastAccessTime
}
// Close .
func (muxer *Muxer) Close() error {
muxer.flushAudioCache()
return muxer.segmentClose(true)
}

140
av/format/hls/playlist.go Executable file
View File

@@ -0,0 +1,140 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package hls
import (
"bytes"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
)
const hlsRemainSegments = 3
// Playlist the HLS playlist(m3u8 and ts files).
type Playlist struct {
// m3u8 segments
l sync.RWMutex
segments []*segment
// last http access time
lastAccessTime int64
}
// NewPlaylist .
func NewPlaylist() *Playlist {
return &Playlist{
lastAccessTime: time.Now().UnixNano(),
}
}
var m3u8Pool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 512))
},
}
// M3u8 获取 m3u8 播放列表
func (pl *Playlist) M3u8(token string) ([]byte, error) {
atomic.StoreInt64(&pl.lastAccessTime, time.Now().UnixNano())
w := m3u8Pool.Get().(*bytes.Buffer)
w.Reset()
defer m3u8Pool.Put(w)
pl.l.RLock()
defer pl.l.RUnlock()
segments := pl.segments
if len(segments) < hlsRemainSegments {
return nil, errors.New("playlist is not enough,maybe the HLS stream just started")
}
seq := segments[0].sequenceNo
var maxDuration float64
for _, seg := range segments {
if seg.duration > maxDuration {
maxDuration = seg.duration
}
}
duration := int32(maxDuration + 1)
// 描述部分
fmt.Fprintf(w,
"#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:%d\n#EXT-X-MEDIA-SEQUENCE:%d\n\n",
duration, seq)
// 列表部分
for _, seg := range segments {
if seg.isSequenceHeader {
// #EXT-X-DISCONTINUITY\n
fmt.Fprint(w, "#EXT-X-DISCONTINUITY\n")
}
if len(token) > 0 {
fmt.Fprintf(w, "#EXTINF:%.3f,\n%s?token=%s\n",
seg.duration,
seg.uri, token)
} else {
fmt.Fprintf(w, "#EXTINF:%.3f,\n%s\n",
seg.duration,
seg.uri)
}
}
return w.Bytes(), nil
}
// Segment 获取 segment
func (pl *Playlist) Segment(seq int) (io.Reader, int, error) {
atomic.StoreInt64(&pl.lastAccessTime, time.Now().UnixNano())
pl.l.RLock()
defer pl.l.RUnlock()
for _, seg := range pl.segments {
if seg.sequenceNo == seq {
return seg.file.get()
}
}
return nil, 0, errors.New("Not found TSFile")
}
// LastAccessTime 最后hls访问时间
func (pl *Playlist) LastAccessTime() time.Time {
lastAccessTime := atomic.LoadInt64(&pl.lastAccessTime)
return time.Unix(0, lastAccessTime)
}
// Close .
func (pl *Playlist) Close() error {
pl.l.Lock()
defer pl.l.Unlock()
pl.clearSegments(0)
return nil
}
func (pl *Playlist) addSegment(seg *segment) {
pl.l.Lock()
defer pl.l.Unlock()
pl.segments = append(pl.segments, seg)
pl.clearSegments(hlsRemainSegments)
}
func (pl *Playlist) clearSegments(remain int) {
// TODO: 延时异步删除?
if len(pl.segments) > remain {
for i := 0; i < len(pl.segments)-remain; i++ {
pl.segments[i].file.delete()
pl.segments[i] = nil
}
copy(pl.segments[:remain], pl.segments[len(pl.segments)-remain:])
pl.segments = pl.segments[:remain]
}
return
}

240
av/format/hls/segmentgenerator.go Executable file
View File

@@ -0,0 +1,240 @@
// Copyright calabashdad. https://github.com/calabashdad/seal.git
//
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package hls
import (
"bytes"
"fmt"
"path/filepath"
"strconv"
"github.com/cnotch/ipchub/av/format/mpegts"
"github.com/cnotch/ipchub/utils/murmur"
"github.com/cnotch/xlog"
)
// drop the segment when duration of ts too small.
const hlsSegmentMinDurationMs = 100
// in ms, for HLS aac flush the audio
const hlsAacDelay = 100
// SegmentGenerator generate the HLS ts segment.
type SegmentGenerator struct {
playlist *Playlist // 播放列表
path string // 流路径
hlsFragment int // 每个片段长度
memory bool // 使用内存存储缓存到硬盘
segmentPath string // 缓存文件路径
sequenceNo int // 片段序号
current *segment //current segment
logger *xlog.Logger
audioRate int
afCache *mpegts.Frame // audio frame cache
afCacheBuff bytes.Buffer
// time jitter for aac
aacJitter *hlsAacJitter
}
// NewSegmentGenerator .
func NewSegmentGenerator(playlist *Playlist, path string, hlsFragment int, segmentPath string, audioRate int, logger *xlog.Logger) (*SegmentGenerator, error) {
sg := &SegmentGenerator{
playlist: playlist,
path: path,
hlsFragment: hlsFragment,
memory: segmentPath == "",
segmentPath: segmentPath,
logger: logger,
sequenceNo: 0,
audioRate: audioRate,
aacJitter: newHlsAacJitter(),
}
if err := sg.segmentOpen(0); err != nil {
return nil, err
}
// set the current segment to sequence header,
// when close the segement, it will write a discontinuity to m3u8 file.
sg.current.isSequenceHeader = true
return sg, nil
}
// open a new segment, a new ts file
// segmentStartDts use to calc the segment duration, use 0 for the first segment of hls
func (sg *SegmentGenerator) segmentOpen(segmentStartDts int64) (err error) {
if nil != sg.current {
// has already opened, ignore segment open
return
}
// new segment
sg.sequenceNo++
curr := newSegment(sg.memory)
curr.sequenceNo = sg.sequenceNo
curr.segmentStartPts = segmentStartDts
curr.uri = "/streams" + sg.path + "/" + strconv.Itoa(sg.sequenceNo) + ".ts"
tsFileName := fmt.Sprintf("%d_%d.ts", murmur.OfString(sg.path), curr.sequenceNo)
tsFilePath := filepath.Join(sg.segmentPath, tsFileName)
if err = curr.file.open(tsFilePath); err != nil {
return
}
sg.current = curr
return
}
// WriteMpegtsFrame implements mpegts.FrameWriter
func (sg *SegmentGenerator) WriteMpegtsFrame(frame *mpegts.Frame) (err error) {
// if current is NULL, segment is not open, ignore the flush event.
if nil == sg.current {
return
}
if len(frame.Payload) <= 0 {
return
}
if frame.IsAudio() {
if sg.afCache == nil {
pts := sg.aacJitter.onBufferStart(frame.Pts, sg.audioRate)
headerFrame := *frame
headerFrame.Dts = pts
headerFrame.Pts = pts
sg.afCache = &headerFrame
sg.afCacheBuff.Write(frame.Payload)
} else {
sg.afCacheBuff.Write(frame.Header)
sg.afCacheBuff.Write(frame.Payload)
sg.aacJitter.onBufferContinue()
}
if frame.Pts-sg.afCache.Pts > hlsAacDelay*90 {
return sg.flushAudioCache()
}
// reap when current source is pure audio.
// it maybe changed when stream info changed,
// for example, pure audio when start, audio/video when publishing,
// pure audio again for audio disabled.
// so we reap event when the audio incoming when segment overflow.
// we use absolutely overflow of segment to make jwplayer/ffplay happy
if sg.isSegmentAbsolutelyOverflow() {
if err = sg.reapSegment(frame.Pts); err != nil {
return
}
}
return
}
if frame.IsKeyFrame() && sg.isSegmentOverflow() {
if err = sg.reapSegment(frame.Pts); err != nil {
return
}
}
// flush video when got one
if err = sg.flushFrame(frame); err != nil {
return
}
return
}
func (sg *SegmentGenerator) flushAudioCache() (err error) {
if sg.afCache == nil {
return
}
sg.afCache.Payload = sg.afCacheBuff.Bytes()
err = sg.flushFrame(sg.afCache)
sg.afCache = nil
sg.afCacheBuff.Reset()
return
}
func (sg *SegmentGenerator) flushFrame(frame *mpegts.Frame) (err error) {
sg.current.updateDuration(frame.Pts)
if err = sg.current.file.writeFrame(frame); err != nil {
return
}
return
}
// close segment(ts)
func (sg *SegmentGenerator) segmentClose() (err error) {
if nil == sg.current {
return
}
curr := sg.current
sg.current = nil
curr.file.close()
if curr.duration*1000 < hlsSegmentMinDurationMs {
// reuse current segment index
sg.sequenceNo--
curr.file.delete()
} else {
sg.playlist.addSegment(curr)
}
return
}
// reopen the sg for a new hls segment,
// close current segment, open a new segment,
// then write the key frame to the new segment.
// so, user must reap_segment then flush_video to hls sg.
func (sg *SegmentGenerator) reapSegment(segmentStartDts int64) (err error) {
if err = sg.segmentClose(); err != nil {
return
}
if err = sg.segmentOpen(segmentStartDts); err != nil {
return
}
// segment open, flush the audio.
// @see: ngx_rtmp_hls_open_fragment
/* start fragment with audio to make iPhone happy */
err = sg.flushAudioCache()
return
}
// whether segment overflow,
// that is whether the current segment duration>=(the segment in config)
func (sg *SegmentGenerator) isSegmentOverflow() bool {
return sg.current.duration >= float64(sg.hlsFragment)
}
// whether segment absolutely overflow, for pure audio to reap segment,
// that is whether the current segment duration>=2*(the segment in config)
func (sg *SegmentGenerator) isSegmentAbsolutelyOverflow() bool {
if nil == sg.current {
return true
}
res := sg.current.duration >= float64(2*sg.hlsFragment)
return res
}
// Close .
func (sg *SegmentGenerator) Close() error {
if nil == sg.current {
return nil
}
curr := sg.current
sg.current = nil
curr.file.close()
curr.file.delete()
return nil
}

View File

@@ -56,7 +56,7 @@ type Stream struct {
flvConsumptions consumptions
flvCache packCache
tsMuxer *mpegts.MuxerAvcAac
hlsMuxer *hls.Muxer
hlsPlaylist *hls.Playlist
attrs map[string]string // 流属性
multicast Multicastable
hls Hlsable
@@ -128,20 +128,21 @@ func (s *Stream) prepareOtherStream() {
// prepare av.Frame -> mpegts.Frame
if s.Video.Codec == "H264" {
hlsMuxer, err := hls.NewMuxer(s.path,
hlsPlaylist := hls.NewPlaylist()
sg, err := hls.NewSegmentGenerator(hlsPlaylist, s.path,
config.HlsFragment(),
config.HlsPath(), s.Audio.SampleRate,
s.logger.With(xlog.Fields(xlog.F("extra", "hls.Muxer"))))
if err != nil {
return
}
tsMuxer, err2 := mpegts.NewMuxerAvcAac(s.Video, s.Audio, hlsMuxer,
tsMuxer, err2 := mpegts.NewMuxerAvcAac(s.Video, s.Audio, sg,
s.logger.With(xlog.Fields(xlog.F("extra", "ts.Muxer"))))
if err2 != nil {
return
}
s.tsMuxer = tsMuxer
s.hlsMuxer = hlsMuxer
s.hlsPlaylist = hlsPlaylist
}
}
@@ -183,6 +184,7 @@ func (s *Stream) close(status int32) error {
// 关闭 hls
if s.tsMuxer != nil {
s.tsMuxer.Close()
s.hlsPlaylist.Close()
}
// 关闭 flv 消费者和 Muxer
@@ -246,7 +248,7 @@ func (s *Stream) Multicastable() Multicastable {
// Hlsable 返回支持hls能力不支持返回nil
func (s *Stream) Hlsable() Hlsable {
return s.hlsMuxer
return s.hlsPlaylist
}
func (s *Stream) startConsume(consumer Consumer, packetType PacketType, extra string, useGopCache bool) CID {

View File

@@ -11,6 +11,7 @@ import (
"strings"
"time"
"github.com/cnotch/ipchub/config"
"github.com/cnotch/ipchub/media"
"github.com/cnotch/xlog"
)
@@ -34,7 +35,8 @@ func GetM3u8(logger *xlog.Logger, path string, token string, addr string, w http
var cont []byte
// 最多等待完成 30 秒
for i := 0; i < 30; i++ {
waitSeconds := int(1.5 * float64(3*config.HlsFragment()))
for i := 0; i < waitSeconds; i++ {
cont, err = c.M3u8(token)
if err == nil {
break