add flv service

This commit is contained in:
notch
2020-12-24 09:58:45 +08:00
parent 90086728ca
commit d228a35d53
6 changed files with 352 additions and 72 deletions

View File

@@ -4,7 +4,7 @@
"cache_gop": true,
"profile": true,
"log": {
"level": "INFO",
"level": "DEBUG",
"tofile": false,
"filename": "./logs/ipchub.log",
"maxsize": 20,

View File

@@ -18,6 +18,10 @@ import (
"github.com/cnotch/xlog"
)
// 网络播放时 PTSPresentation Time Stamp的延时
// 影响视频 Tag 的 CTS 和音频的 DTSDecoding Time Stamp
const ptsDelay = 1000
type flvMuxer struct {
videoMeta av.VideoMeta
audioMeta av.AudioMeta
@@ -41,6 +45,7 @@ func newFlvMuxer(videoMeta av.VideoMeta, audioMeta av.AudioMeta, tagWriter flv.T
typeFlags: byte(flv.TypeFlagsVideo),
tagWriter: tagWriter,
closed: false,
baseTs: time.Now().UnixNano() / int64(time.Millisecond),
logger: logger,
}
@@ -98,6 +103,10 @@ func (muxer *flvMuxer) prepareTemplate() {
}
func (muxer *flvMuxer) WriteFrame(frame *rtp.Frame) error {
if muxer.baseNtp == 0 {
muxer.baseNtp = frame.NTPTime
}
if frame.FrameType == rtp.FrameVideo {
return muxer.muxVideoTag(frame)
} else {
@@ -120,17 +129,17 @@ func (muxer *flvMuxer) muxVideoTag(frame *rtp.Frame) error {
return muxer.muxSequenceHeaderTag()
}
if muxer.baseNtp == 0 {
muxer.baseNtp = frame.NTPTime
muxer.baseTs = time.Now().UnixNano() / int64(time.Millisecond)
dts := time.Now().UnixNano()/int64(time.Millisecond) - muxer.baseTs
pts := frame.NTPTime - muxer.baseNtp + ptsDelay
if dts > pts {
pts = dts
}
dts := time.Now().UnixNano()/int64(time.Millisecond) - muxer.baseTs
videoData := &flv.VideoData{
FrameType: flv.FrameTypeInterFrame,
CodecID: flv.CodecIDAVC,
AVCPacketType: flv.AVCPacketTypeNALU,
CompositionTime: uint32(frame.NTPTime - muxer.baseNtp),
CompositionTime: uint32(pts - dts),
Body: frame.Payload,
}
@@ -151,11 +160,6 @@ func (muxer *flvMuxer) muxVideoTag(frame *rtp.Frame) error {
}
func (muxer *flvMuxer) muxAudioTag(frame *rtp.Frame) error {
if muxer.baseNtp == 0 {
muxer.baseNtp = frame.NTPTime
muxer.baseTs = time.Now().UnixNano() / int64(time.Millisecond)
}
audioData := *muxer.audioDataTemplate
audioData.Body = frame.Payload
data, _ := audioData.Marshal()
@@ -163,7 +167,7 @@ func (muxer *flvMuxer) muxAudioTag(frame *rtp.Frame) error {
tag := &flv.Tag{
TagType: flv.TagTypeAudio,
DataSize: uint32(len(data)),
Timestamp: uint32(frame.NTPTime - muxer.baseNtp),
Timestamp: uint32(frame.NTPTime-muxer.baseNtp) + ptsDelay,
StreamID: 0,
Data: data,
}
@@ -173,6 +177,15 @@ func (muxer *flvMuxer) muxAudioTag(frame *rtp.Frame) error {
func (muxer *flvMuxer) muxMetadataTag() error {
properties := make(amf.EcmaArray, 0, 12)
properties = append(properties,
amf.ObjectProperty{
Name: "creator",
Value: "ipchub stream media server"})
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataCreationDate,
Value: time.Now().Format(time.RFC3339)})
if muxer.typeFlags&flv.TypeFlagsAudio > 0 {
properties = append(properties,
amf.ObjectProperty{
@@ -196,30 +209,26 @@ func (muxer *flvMuxer) muxMetadataTag() error {
Value: muxer.audioMeta.Channels > 1})
}
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataFrameRate,
Value: muxer.videoMeta.FrameRate})
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataHeight,
Value: muxer.videoMeta.Height})
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataVideoCodecID,
Value: flv.CodecIDAVC})
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataHeight,
Value: muxer.videoMeta.Height})
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataVideoDataRate,
Value: muxer.videoMeta.DataRate})
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataFrameRate,
Value: muxer.videoMeta.FrameRate})
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataWidth,
Value: muxer.videoMeta.Width})
properties = append(properties,
amf.ObjectProperty{
Name: flv.MetaDataHeight,
Value: muxer.videoMeta.Height})
scriptData := flv.ScriptData{
Name: flv.ScriptOnMetaData,
@@ -250,20 +259,13 @@ func (muxer *flvMuxer) muxSequenceHeaderTag() error {
muxer.spsMuxed = true
record := &flv.AVCDecoderConfigurationRecord{
ConfigurationVersion: 1,
AVCProfileIndication: muxer.videoMeta.Sps[1],
ProfileCompatibility: muxer.videoMeta.Sps[2],
AVCLevelIndication: muxer.videoMeta.Sps[3],
SPS: muxer.videoMeta.Sps,
PPS: muxer.videoMeta.Pps,
}
record := flv.NewAVCDecoderConfigurationRecord(muxer.videoMeta.Sps, muxer.videoMeta.Pps)
body, _ := record.Marshal()
videoData := &flv.VideoData{
FrameType: flv.FrameTypeKeyFrame,
CodecID: flv.CodecIDAVC,
AVCPacketType: flv.AACPacketTypeSequenceHeader,
AVCPacketType: flv.AVCPacketTypeSequenceHeader,
CompositionTime: 0,
Body: body,
}
@@ -285,7 +287,7 @@ func (muxer *flvMuxer) muxSequenceHeaderTag() error {
}
func (muxer *flvMuxer) muxAudioSequenceHeaderTag() error {
if muxer.typeFlags&flv.TypeFlagsAudio > 0 {
if muxer.typeFlags&flv.TypeFlagsAudio == 0 {
return nil
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/cnotch/ipchub/av"
"github.com/cnotch/ipchub/config"
"github.com/cnotch/ipchub/media/cache"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/protos/rtp"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/ipchub/utils"
@@ -44,8 +45,11 @@ type Stream struct {
size uint64 // 流已经接收到的输入(字节)
status int32 // 流状态
consumerSequenceSeed uint32
consumptions consumptions // 消费者列表
cache cache.PackCache // 媒体包缓存
consumptions consumptions // 消费者列表
cache cache.PackCache // 媒体包缓存
flvConsumptions consumptions
flvCache cache.PackCache
flvMuxer FlvMuxer
attrs map[string]string // 流属性
multicast Multicastable
hls Hlsable
@@ -83,6 +87,15 @@ func NewStream(path string, rawsdp string, options ...Option) *Stream {
option.apply(s)
}
if s.Video.Codec == "H264" {
s.flvCache = cache.NewFlvCache(config.CacheGop())
s.flvMuxer = newFlvMuxer(s.Video, s.Audio,
s, s.logger.With(xlog.Fields(xlog.F("extra", "rtp2flv"))))
} else {
s.flvCache = cache.NewEmptyCache()
s.flvMuxer = emptyFlvMuxer{}
}
return s
}
@@ -96,6 +109,11 @@ func (s *Stream) Sdp() string {
return s.rawsdp
}
// FlvTypeFlags 支持的 flv TypeFlags
func (s *Stream) FlvTypeFlags() byte {
return s.flvMuxer.TypeFlags()
}
// Attr 流属性
func (s *Stream) Attr(key string) string {
return s.attrs[strings.ToLower(strings.TrimSpace(key))]
@@ -116,9 +134,11 @@ func (s *Stream) close(status int32) error {
}
atomic.StoreInt32(&s.status, status)
s.consumptions.RemoveAndCloseAll()
s.flvMuxer.Close()
s.flvConsumptions.RemoveAndCloseAll()
s.flvCache.Reset()
// 尽早通知GC回收内存
s.consumptions.RemoveAndCloseAll()
s.cache.Reset()
return nil
}
@@ -133,9 +153,21 @@ func (s *Stream) WritePacket(packet *rtp.Packet) error {
atomic.AddUint64(&s.size, uint64(packet.Size()))
s.cache.CachePack(packet)
s.consumptions.SendToAll(packet)
s.flvMuxer.WritePacket(packet)
return nil
}
// WriteTag .
func (s *Stream) WriteTag(tag *flv.Tag) error {
status := atomic.LoadInt32(&s.status)
if status != StreamOK {
return statusErrors[status]
}
s.flvCache.CachePack(tag)
s.flvConsumptions.SendToAll(tag)
return nil
}
@@ -150,6 +182,10 @@ func (s *Stream) Hlsable() Hlsable {
}
func (s *Stream) startConsume(consumer Consumer, packetType PacketType, extra string, useGopCache bool) CID {
if packetType == FLVPacket && s.flvMuxer == nil {
return CID(0) // 不支持
}
c := &consumption{
startOn: time.Now(),
stream: s,
@@ -166,13 +202,19 @@ func (s *Stream) startConsume(consumer Consumer, packetType PacketType, extra st
xlog.F("packettype", c.packetType.String()),
xlog.F("extra", c.extra)))
if useGopCache {
c.sendGop(s.cache) // 新消费者先发送gop缓存
cs := &s.consumptions
cache := s.cache
if packetType == FLVPacket {
cs = &s.flvConsumptions
cache = s.flvCache
}
s.consumptions.Add(c)
go c.consume()
if useGopCache {
c.sendGop(cache) // 新消费者先发送gop缓存
}
cs.Add(c)
go c.consume()
return c.cid
}
@@ -188,7 +230,12 @@ func (s *Stream) StartConsumeNoGopCache(consumer Consumer, packetType PacketType
// StopConsume 开始消费
func (s *Stream) StopConsume(cid CID) {
c := s.consumptions.Remove(cid)
cs := &s.consumptions
if cid.Type() == FLVPacket {
cs = &s.flvConsumptions
}
c := cs.Remove(cid)
if c != nil {
c.Close()
}
@@ -196,7 +243,7 @@ func (s *Stream) StopConsume(cid CID) {
// ConsumerCount 流消费者计数
func (s *Stream) ConsumerCount() int {
return s.consumptions.Count()
return s.consumptions.Count() + s.flvConsumptions.Count()
}
// StreamInfo 流信息
@@ -218,7 +265,7 @@ func (s *Stream) Info(includeCS bool) *StreamInfo {
Path: s.path,
Addr: s.Attr("addr"),
Size: int(atomic.LoadUint64(&s.size) / 1024),
ConsumptionCount: s.consumptions.Count(),
ConsumptionCount: s.ConsumerCount(),
}
if len(s.Video.Codec) != 0 {
@@ -229,13 +276,19 @@ func (s *Stream) Info(includeCS bool) *StreamInfo {
}
if includeCS {
si.Consumptions = s.consumptions.Infos()
si.Consumptions = append(si.Consumptions, s.flvConsumptions.Infos()...)
}
return si
}
// GetConsumption 获取指定消费信息
func (s *Stream) GetConsumption(cid CID) (ConsumptionInfo, bool) {
c, ok := s.consumptions.Load(cid)
cs := &s.consumptions
if cid.Type() == FLVPacket {
cs = &s.flvConsumptions
}
c, ok := cs.Load(cid)
if ok {
return c.(*consumption).Info(), ok
}

105
service/flv/httpflv.go Executable file
View File

@@ -0,0 +1,105 @@
// Copyright calabashdad. https://github.com/calabashdad/seal.git
//
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package flv
import (
"net/http"
"runtime/debug"
"github.com/cnotch/ipchub/media"
"github.com/cnotch/ipchub/media/cache"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/xlog"
)
type httpFlvConsumer struct {
logger *xlog.Logger
addr string
w *flv.Writer
closeCh chan bool
closed bool
}
func (c *httpFlvConsumer) Consume(pack cache.Pack) {
if c.closed {
return
}
err := c.w.WriteTag(pack.(*flv.Tag))
if err != nil {
c.logger.Errorf("http-flv: send tag failed; %v", err)
c.Close()
return
}
}
func (c *httpFlvConsumer) Close() (err error) {
if c.closed {
return
}
c.closed = true
close(c.closeCh)
return nil
}
// ConsumeByHTTP 处理 http 方式访问流媒体
func ConsumeByHTTP(logger *xlog.Logger, path string, addr string, w http.ResponseWriter) {
logger = logger.With(xlog.Fields(
xlog.F("path", path),
xlog.F("addr", addr)))
stream := media.GetOrCreate(path)
if stream == nil {
http.Error(w, "404 page not found", http.StatusNotFound)
logger.Errorf("http-flv: no stream found")
return
}
typeFlags := stream.FlvTypeFlags()
if typeFlags == 0 {
http.Error(w, "404 page not found", http.StatusNotFound)
logger.Errorf("http-flv: stream not support flv")
return
}
var cid media.CID
defer func() {
if r := recover(); r != nil {
xlog.Errorf("http-flv: panic; %v \n %s", r, debug.Stack())
}
stream.StopConsume(cid)
stats.FlvConns.Release()
logger.Info("http-flv: stop http-flv consume")
}()
logger.Info("http-flv: start http-flv consume")
stats.FlvConns.Add()
// 启动 pack 消费,必须 StartConsume 前写入 Header
flvWriter, err := flv.NewWriter(w, typeFlags)
if err != nil {
http.Error(w, "send flv header failed", http.StatusInternalServerError)
logger.Error("http-flv: send flv header failed.")
return
}
c := &httpFlvConsumer{
logger: logger,
w: flvWriter,
closeCh: make(chan bool),
}
cid = stream.StartConsume(c, media.FLVPacket, "net=http-flv,"+addr)
// 等待关闭
select {
case <-c.closeCh:
}
}

118
service/flv/wsflv.go Executable file
View File

@@ -0,0 +1,118 @@
// Copyright (c) 2019,CAOHONGJU All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package flv
import (
"runtime/debug"
"time"
"github.com/cnotch/ipchub/media"
"github.com/cnotch/ipchub/media/cache"
"github.com/cnotch/ipchub/network/websocket"
"github.com/cnotch/ipchub/protos/flv"
"github.com/cnotch/ipchub/stats"
"github.com/cnotch/xlog"
)
type wsFlvConsumer struct {
logger *xlog.Logger
w *flv.Writer
conn websocket.Conn
closed bool
}
func (c *wsFlvConsumer) Consume(pack cache.Pack) {
if c.closed {
return
}
err := c.w.WriteTag(pack.(*flv.Tag))
if err != nil {
c.logger.Errorf("ws-flv: send tag failed; %v", err)
c.Close()
return
}
}
func (c *wsFlvConsumer) Close() (err error) {
if c.closed {
return
}
c.closed = true
c.conn.Close()
return nil
}
func (c *wsFlvConsumer) Type() string {
return "websocket-flv"
}
// ConsumeByWebsocket 处理 websocket 方式访问流媒体
func ConsumeByWebsocket(logger *xlog.Logger, path string, addr string, conn websocket.Conn) {
logger = logger.With(xlog.Fields(
xlog.F("path", path),
xlog.F("addr", addr)))
stream := media.GetOrCreate(path)
if stream == nil {
conn.Close()
logger.Errorf("ws-flv: no stream found")
return
}
typeFlags := stream.FlvTypeFlags()
if typeFlags == 0 {
conn.Close()
logger.Errorf("ws-flv: stream not support flv")
return
}
var cid media.CID
defer func() {
if r := recover(); r != nil {
xlog.Errorf("ws-flv: panic; %v \n %s", r, debug.Stack())
}
stream.StopConsume(cid)
conn.Close()
stats.FlvConns.Release()
logger.Info("stop websocket-flv consume")
}()
logger.Info("start websocket-flv consume")
stats.FlvConns.Add()
// 启动 pack 消费,必须 StartConsume 前写入 Header
flvWriter, err := flv.NewWriter(conn, typeFlags)
if err != nil {
logger.Error("ws-flv: send flv header failed.")
return
}
c := &wsFlvConsumer{
logger: logger,
conn: conn,
w: flvWriter,
}
cid = stream.StartConsume(c, media.FLVPacket, "net=websocket-flv,"+addr)
for !c.closed {
deadLine := time.Time{}
if err := conn.SetReadDeadline(deadLine); err != nil {
break
}
var temp [1]byte
if _, err := conn.Read(temp[:]); err != nil {
if !c.closed {
logger.Errorf("websocket error; %v.", err)
}
break
}
}
}

View File

@@ -11,6 +11,8 @@ import (
"github.com/cnotch/ipchub/config"
"github.com/cnotch/ipchub/network/websocket"
"github.com/cnotch/ipchub/provider/auth"
"github.com/cnotch/ipchub/service/flv"
"github.com/cnotch/xlog"
"github.com/cnotch/apirouter"
"github.com/cnotch/ipchub/utils/scan"
@@ -19,7 +21,7 @@ import (
// 初始化流式访问
func (s *Service) initHTTPStreams(mux *http.ServeMux) {
mux.Handle("/ws/", apirouter.WrapHandler(http.HandlerFunc(s.onWebSocketRequest), apirouter.PreInterceptor(s.streamInterceptor)))
// mux.Handle("/streams/", apirouter.WrapHandler(http.HandlerFunc(s.onStreamsRequest), apirouter.PreInterceptor(s.streamInterceptor)))
mux.Handle("/streams/", apirouter.WrapHandler(http.HandlerFunc(s.onStreamsRequest), apirouter.PreInterceptor(s.streamInterceptor)))
}
// websocket 请求处理
@@ -42,37 +44,37 @@ func (s *Service) onWebSocketRequest(w http.ResponseWriter, r *http.Request) {
return
}
// if ext == ".flv" {
// go flv.ConsumeByWebsocket(s.logger, streamPath, r.RemoteAddr, ws)
// return
// }
if ext == ".flv" {
go flv.ConsumeByWebsocket(s.logger, streamPath, r.RemoteAddr, ws)
return
}
s.logger.Warnf("websocket sub-protocol is not supported: %s.", ws.Subprotocol())
ws.Close()
}
}
// // streams 请求处理(flv,mu38,ts)
// func (s *Service) onStreamsRequest(w http.ResponseWriter, r *http.Request) {
// // 获取文件后缀和流路径
// streamPath, ext := extractStreamPathAndExt(r.URL.Path)
// s.logger.Debug("http access stream media.",
// xlog.F("path", streamPath),
// xlog.F("ext", ext))
// streams 请求处理(flv,mu38,ts)
func (s *Service) onStreamsRequest(w http.ResponseWriter, r *http.Request) {
// 获取文件后缀和流路径
streamPath, ext := extractStreamPathAndExt(r.URL.Path)
s.logger.Debug("http access stream media.",
xlog.F("path", streamPath),
xlog.F("ext", ext))
// w.Header().Set("Access-Control-Allow-Origin", "*")
// switch ext {
// case ".flv":
// flv.ConsumeByHTTP(s.logger, streamPath, r.RemoteAddr, w)
// case ".m3u8":
// hls.GetM3u8(s.logger, streamPath, r.RemoteAddr, w)
// case ".ts":
// hls.GetTS(s.logger, streamPath, r.RemoteAddr, w)
// default:
// s.logger.Warnf("request file ext is not supported: %s.", ext)
// http.NotFound(w, r)
// }
// }
w.Header().Set("Access-Control-Allow-Origin", "*")
switch ext {
case ".flv":
flv.ConsumeByHTTP(s.logger, streamPath, r.RemoteAddr, w)
// case ".m3u8":
// hls.GetM3u8(s.logger, streamPath, r.RemoteAddr, w)
// case ".ts":
// hls.GetTS(s.logger, streamPath, r.RemoteAddr, w)
default:
s.logger.Warnf("request file ext is not supported: %s.", ext)
http.NotFound(w, r)
}
}
func (s *Service) streamInterceptor(w http.ResponseWriter, r *http.Request) bool {
if path.Base(r.URL.Path) == "crossdomain.xml" {