add hls service

This commit is contained in:
notch
2020-12-27 18:12:22 +08:00
parent 688b3f4572
commit d2f6c23c3d
7 changed files with 176 additions and 27 deletions

View File

@@ -177,7 +177,7 @@ func HlsEnable() bool {
// HlsFragment TS片段时长s
func HlsFragment() int {
return 3
return 6
}
// HlsPath hls 存储目录

View File

@@ -22,9 +22,10 @@ type Multicastable interface {
// Hlsable 支持Hls访问
type Hlsable interface {
WriteM3U8PlayListTo(w io.Writer) error
GetTS(seq int) ([]byte, error)
M3u8() ([]byte, error)
Segment(seq int) (io.Reader,int, error)
LastAccessTime() time.Time
Close() error
}
// Option 配置 Stream 的选项接口

View File

@@ -14,6 +14,8 @@ import (
"github.com/cnotch/ipchub/config"
"github.com/cnotch/ipchub/media/cache"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/protos/hls"
"github.com/cnotch/ipchub/protos/mpegts"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/ipchub/utils"
@@ -52,6 +54,8 @@ type Stream struct {
flvMuxer flvMuxer
flvConsumptions consumptions
flvCache packCache
tsMuxer *mpegts.MuxerAvcAac
hlsMuxer *hls.Muxer
attrs map[string]string // 流属性
multicast Multicastable
hls Hlsable
@@ -120,6 +124,24 @@ func (s *Stream) prepareOtherStream() {
s.flvCache = emptyCache{}
s.flvMuxer = emptyFlvMuxer{}
}
// prepare av.Frame -> mpegts.Frame
if s.Video.Codec == "H264" {
hlsMuxer, err := hls.NewMuxer(s.path,
config.HlsFragment(),
config.HlsPath(),
s.logger.With(xlog.Fields(xlog.F("extra", "hls.Muxer"))))
if err != nil {
return
}
tsMuxer, err2 := mpegts.NewMuxerAvcAac(s.Video, s.Audio, hlsMuxer,
s.logger.With(xlog.Fields(xlog.F("extra", "ts.Muxer"))))
if err2 != nil {
return
}
s.tsMuxer = tsMuxer
s.hlsMuxer = hlsMuxer
}
}
// Path 流路径
@@ -157,6 +179,12 @@ func (s *Stream) close(status int32) error {
}
atomic.StoreInt32(&s.status, status)
// 关闭 hls
if s.hlsMuxer != nil {
s.tsMuxer.Close()
s.hlsMuxer.Close()
}
// 关闭 flv 消费者和 Muxer
s.flvConsumptions.RemoveAndCloseAll()
s.flvCache.Reset()
@@ -188,7 +216,15 @@ func (s *Stream) WritePacket(packet *rtp.Packet) error {
// WriteFrame .
func (s *Stream) WriteFrame(frame *av.Frame) error {
return s.flvMuxer.WriteFrame(frame)
if err := s.flvMuxer.WriteFrame(frame); err != nil {
s.logger.Error(err.Error())
}
if s.tsMuxer != nil {
if err := s.tsMuxer.WriteFrame(frame); err != nil {
s.logger.Error(err.Error())
}
}
return nil
}
// WriteTag .
@@ -210,7 +246,7 @@ func (s *Stream) Multicastable() Multicastable {
// Hlsable 返回支持hls能力不支持返回nil
func (s *Stream) Hlsable() Hlsable {
return s.hls
return s.hlsMuxer
}
func (s *Stream) startConsume(consumer Consumer, packetType PacketType, extra string, useGopCache bool) CID {

View File

@@ -16,7 +16,6 @@ import (
"sync"
"time"
"github.com/cnotch/ipchub/config"
"github.com/cnotch/ipchub/protos/mpegts"
"github.com/cnotch/ipchub/utils/murmur"
"github.com/cnotch/xlog"
@@ -27,6 +26,7 @@ const hlsSegmentMinDurationMs = 100
// in ms, for HLS aac flush the audio
const hlsAacDelay = 100
const remainSegmets = 3
// Muxer the HLS stream(m3u8 and ts files).
type Muxer struct {
@@ -87,8 +87,8 @@ func (muxer *Muxer) segmentOpen(segmentStartDts int64) (err error) {
curr.segmentStartPts = segmentStartDts
curr.uri = "/streams" + muxer.path + "/" + strconv.Itoa(muxer.sequenceNo) + ".ts"
tsFileName := fmt.Sprintf("%d_%d.ts", murmur.OfString(muxer.path), muxer.current.sequenceNo)
tsFilePath := filepath.Join(config.HlsPath(), tsFileName)
tsFileName := fmt.Sprintf("%d_%d.ts", murmur.OfString(muxer.path), curr.sequenceNo)
tsFilePath := filepath.Join(muxer.segmentPath, tsFileName)
if err = curr.file.open(tsFilePath); err != nil {
return
}
@@ -152,7 +152,6 @@ func (muxer *Muxer) WriteMpegtsFrame(frame *mpegts.Frame) (err error) {
func (muxer *Muxer) flushFrame(frame *mpegts.Frame) (err error) {
muxer.current.updateDuration(frame.Pts)
if err = muxer.current.file.writeFrame(frame); err != nil {
return
}
@@ -169,7 +168,7 @@ func (muxer *Muxer) segmentClose(muxerClosed bool) (err error) {
defer muxer.l.Unlock()
muxer.current.file.close()
remain := 3
remain := remainSegmets
if muxerClosed {
remain = 0
}
@@ -243,16 +242,16 @@ func (muxer *Muxer) isSegmentAbsolutelyOverflow() bool {
}
// M3u8 获取 m3u8 播放列表
func (muxer *Muxer) M3u8() (string, error) {
func (muxer *Muxer) M3u8() ([]byte, error) {
muxer.lastAccessTime = time.Now()
w := bytes.NewBuffer(make([]byte, 1024))
w := &bytes.Buffer{}
muxer.l.RLock()
defer muxer.l.RUnlock()
segments := muxer.segments
if len(segments) == 0 {
return "", errors.New("Playlist is empty,Maybe the HLS stream just started")
if len(segments) < remainSegmets {
return nil, errors.New("Playlist is not enough,Maybe the HLS stream just started")
}
seq := segments[0].sequenceNo
@@ -280,11 +279,11 @@ func (muxer *Muxer) M3u8() (string, error) {
seg.uri)
}
return string(w.Bytes()), nil
return w.Bytes(), nil
}
// Segment 获取 segment
func (muxer *Muxer) Segment(seq int) (io.Reader, error) {
func (muxer *Muxer) Segment(seq int) (io.Reader, int, error) {
muxer.lastAccessTime = time.Now()
muxer.l.RLock()
defer muxer.l.RUnlock()
@@ -294,7 +293,7 @@ func (muxer *Muxer) Segment(seq int) (io.Reader, error) {
return seg.file.get()
}
}
return nil, errors.New("Not found TSFile")
return nil, 0, errors.New("Not found TSFile")
}
// LastAccessTime 最后hls访问时间

View File

@@ -20,7 +20,7 @@ type segmentFile interface {
open(path string) error
close() error
writeFrame(frame *mpegts.Frame) error
get() (io.Reader, error)
get() (io.Reader, int, error)
delete() error
}
@@ -50,8 +50,9 @@ func (mf *memorySegmentFile) close() (err error) {
return
}
func (mf *memorySegmentFile) get() (io.Reader, error) {
return bytes.NewReader(mf.buff.Bytes()), nil
func (mf *memorySegmentFile) get() (io.Reader, int, error) {
data := mf.buff.Bytes()
return bytes.NewReader(data), len(data), nil
}
func (mf *memorySegmentFile) delete() error {
@@ -100,13 +101,18 @@ func (pf *persistentSegmentFile) close() (err error) {
return nil
}
func (pf *persistentSegmentFile) get() (reader io.Reader, err error) {
func (pf *persistentSegmentFile) get() (reader io.Reader, size int, err error) {
var finfo os.FileInfo
finfo, err = os.Stat(pf.path)
if err != nil {
return
}
var f *os.File
if f, err = os.Open(pf.path); err != nil {
return nil, err
return nil, 0, err
}
return f, nil
return f, int(finfo.Size()), nil
}
func (pf *persistentSegmentFile) delete() error {

106
service/hls/hls.go Executable file
View File

@@ -0,0 +1,106 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package hls
import (
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/cnotch/ipchub/media"
"github.com/cnotch/xlog"
)
// GetM3u8 .
func GetM3u8(logger *xlog.Logger, path string, addr string, w http.ResponseWriter) {
// 需要手动启动,如果需要转换或拉流,很耗时
var c media.Hlsable
s := media.GetOrCreate(path)
if s != nil {
c = s.Hlsable()
}
if c == nil {
logger.Errorf("http-hls: not found stream '%s'", path)
http.Error(w, "404 page not found", http.StatusNotFound)
return
}
var err error
var cont []byte
// 最多等待完成 30 秒
for i := 0; i < 30; i++ {
cont, err = c.M3u8()
if err == nil {
break
}
<-time.After(time.Second)
}
if err != nil {
logger.Errorf("http-hls: request playlist error, %v.", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Content-Type", "application/x-mpegURL")
w.Header().Set("Content-Length", strconv.Itoa(len(cont)))
w.Write(cont)
}
// GetTS .
func GetTS(logger *xlog.Logger, path string, addr string, w http.ResponseWriter) {
i := strings.LastIndex(path, "/")
if i < 0 {
logger.Errorf("http-hls: path illegal `%s`", path)
http.Error(w, "Path illegal", http.StatusBadRequest)
return
}
streamPath := path[:i]
seqStr := path[i+1:]
seq, err := strconv.Atoi(seqStr)
if err != nil {
logger.Errorf("http-hls: path illegal `%s`", path)
http.Error(w, "Path illegal", http.StatusBadRequest)
return
}
// 查找的消费者但不创建
var c media.Hlsable
s := media.GetOrCreate(streamPath)
if s != nil {
c = s.Hlsable()
}
if c == nil {
logger.Errorf("http-hls: not found `%s`", path)
http.Error(w, "404 page not found", http.StatusNotFound)
return
}
reader, size, err := c.Segment(seq)
if err != nil {
logger.Errorf("http-hls: not found `%s`", path)
http.Error(w, "404 page not found", http.StatusNotFound)
return
}
defer func() {
if closer, ok := reader.(io.Closer); ok {
closer.Close()
}
}()
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "video/mp2ts")
w.Header().Set("Content-Length", strconv.Itoa(size))
io.Copy(w, reader)
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/cnotch/ipchub/network/websocket"
"github.com/cnotch/ipchub/provider/auth"
"github.com/cnotch/ipchub/service/flv"
"github.com/cnotch/ipchub/service/hls"
"github.com/cnotch/xlog"
"github.com/cnotch/apirouter"
@@ -66,10 +67,10 @@ func (s *Service) onStreamsRequest(w http.ResponseWriter, r *http.Request) {
switch ext {
case ".flv":
flv.ConsumeByHTTP(s.logger, streamPath, r.RemoteAddr, w)
// case ".m3u8":
// hls.GetM3u8(s.logger, streamPath, r.RemoteAddr, w)
// case ".ts":
// hls.GetTS(s.logger, streamPath, r.RemoteAddr, w)
case ".m3u8":
hls.GetM3u8(s.logger, streamPath, r.RemoteAddr, w)
case ".ts":
hls.GetTS(s.logger, streamPath, r.RemoteAddr, w)
default:
s.logger.Warnf("request file ext is not supported: %s.", ext)
http.NotFound(w, r)