diff --git a/config.json b/config.json index a8f6599..2d911fb 100644 --- a/config.json +++ b/config.json @@ -1,6 +1,7 @@ { - "gop_cache": 0, + "gop_cache": true, "probe_timeout": 2000, + "mw_latency": 350, "rtmp": { "enable": true, diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index 2424a5c..7485fc7 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -30,10 +30,10 @@ func NewPublisher(sourceId string, stack *librtmp.Stack) *Publisher { func (p *Publisher) Init() { //创建内存池 - p.audioMemoryPool = stream.NewMemoryPool(48000 * (stream.AppConfig.GOPCache + 1)) - if stream.AppConfig.GOPCache > 0 { + p.audioMemoryPool = stream.NewMemoryPool(48000 * 1) + if stream.AppConfig.GOPCache { //以每秒钟4M码率大小创建内存池 - p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8 * stream.AppConfig.GOPCache) + p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000) } else { p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8) } @@ -77,7 +77,7 @@ func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) { func (p *Publisher) OnDeMuxPacket(packet utils.AVPacket) { p.SourceImpl.OnDeMuxPacket(packet) - if stream.AppConfig.GOPCache > 0 { + if stream.AppConfig.GOPCache { return } diff --git a/rtmp/rtmp_transtream.go b/rtmp/rtmp_transtream.go index 273e853..797cc98 100644 --- a/rtmp/rtmp_transtream.go +++ b/rtmp/rtmp_transtream.go @@ -27,6 +27,7 @@ var nextOffset int func (t *TransStream) Input(packet utils.AVPacket) { utils.Assert(t.TransStreamImpl.Completed) + var data []byte var chunk *librtmp.Chunk var videoPkt bool @@ -47,6 +48,13 @@ func (t *TransStream) Input(packet utils.AVPacket) { payloadSize += 5 + length } + //即不开启GOP缓存又不合并发送. 直接使用AVPacket的预留头封装发送 + if !stream.AppConfig.GOPCache && stream.AppConfig.MergeWriteLatency < 1 { + //首帧视频帧必须要 + } else { + + } + //payloadSize += payloadSize / t.chunkSize //分配内存 t.memoryPool.Mark() @@ -95,12 +103,12 @@ func (t *TransStream) Input(packet utils.AVPacket) { rtmpData := t.memoryPool.Fetch()[:n] ret := true - if stream.AppConfig.GOPCache > 0 { + if stream.AppConfig.GOPCache { //ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) ret = t.transBuffer.AddPacket(packet, packet.KeyFrame() && videoPkt, packet.Dts()) } - if !ret || stream.AppConfig.GOPCache < 1 { + if !ret || stream.AppConfig.GOPCache { t.memoryPool.FreeTail() } @@ -197,7 +205,13 @@ func (t *TransStream) AddSink(sink stream.ISink) { utils.Assert(t.headerSize > 0) sink.Input(t.header[:t.headerSize]) + if !stream.AppConfig.GOPCache { + return + } + //开启GOP缓存的情况下 + //开启合并写的情况下: + // 如果合并写大小每满一次 // if stream.AppConfig.GOPCache > 0 { // t.transBuffer.PeekAll(func(packet interface{}) { // sink.Input(packet.([]byte)) @@ -238,9 +252,9 @@ func (t *TransStream) WriteHeader() error { t.TransStreamImpl.Completed = true t.header = make([]byte, 1024) t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) - t.memoryPool = stream.NewMemoryPoolWithRecopy(1024 * 1000 * (stream.AppConfig.GOPCache + 1)) - if stream.AppConfig.GOPCache > 0 { - t.transBuffer = stream.NewStreamBuffer(int64(stream.AppConfig.GOPCache * 200)) + t.memoryPool = stream.NewMemoryPoolWithRecopy(1024 * 4000) + if stream.AppConfig.GOPCache { + t.transBuffer = stream.NewStreamBuffer(200) t.transBuffer.SetDiscardHandler(t.onDiscardPacket) } diff --git a/stream/config.go b/stream/config.go index 055a006..287b107 100644 --- a/stream/config.go +++ b/stream/config.go @@ -1,5 +1,9 @@ package stream +const ( + DefaultMergeWriteLatency = 350 +) + type RtmpConfig struct { Enable bool `json:"enable"` Addr string `json:"addr"` @@ -50,9 +54,11 @@ func init() { AppConfig = AppConfig_{} } +// AppConfig_ GOP缓存和合并写必须保持一致,同时开启或关闭. 关闭GOP缓存,是为了降低延迟,很难理解又另外开启合并写. type AppConfig_ struct { - GOPCache int `json:"gop_cache"` //缓存GOP个数,不是时长 - ProbeTimeout int `json:"probe_timeout"` - Rtmp RtmpConfig - Hook HookConfig + GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 + ProbeTimeout int `json:"probe_timeout"` + MergeWriteLatency int `json:"mw_latency"` //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互,大幅提升性能. + Rtmp RtmpConfig + Hook HookConfig } diff --git a/stream/memory_pool.go b/stream/memory_pool.go index 45c9446..201b638 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -13,6 +13,10 @@ type MemoryPool interface { Write(data []byte) + // Reserve 保留指定大小的内存空间 + //主要是为了和实现和Write相似功能,但是不拷贝, 所以使用流程和Write一样. + Reserve(size int) + Allocate(size int) []byte Fetch() []byte @@ -117,6 +121,11 @@ func (m *memoryPool) Write(data []byte) { copy(allocate, data) } +func (m *memoryPool) Reserve(size int) { + utils.Assert(m.mark) + _ = m.allocate(size) +} + func (m *memoryPool) Allocate(size int) []byte { utils.Assert(m.mark) return m.allocate(size) diff --git a/stream/sink.go b/stream/sink.go index 7e69643..2b35cbb 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -66,11 +66,16 @@ type SinkImpl struct { State_ SessionState TransStreamId_ TransStreamId disableVideo bool + //Sink在请求拉流->Source推流->Sink断开整个阶段 是无锁线程安全 //如果Sink在等待队列-Sink断开,这个过程是非线程安全的 //SetState的时候,如果closed为true,返回false, 调用者自行删除sink closed atomic.Bool + //HasSentKeyVideo 是否已经发送视频关键帧 + //未开启GOP缓存的情况下,为避免播放花屏,发送的首个视频帧必须为关键帧 + HasSentKeyVideo bool + DesiredAudioCodecId_ utils.AVCodecID DesiredVideoCodecId_ utils.AVCodecID diff --git a/stream/source.go b/stream/source.go index a3cba46..be100ba 100644 --- a/stream/source.go +++ b/stream/source.go @@ -10,14 +10,18 @@ import ( "github.com/yangjiechina/live-server/transcode" ) -// SourceType Source 推流类型 +// SourceType 推流类型 type SourceType byte -// Protocol 输出协议 +// Protocol 输出的流协议 type Protocol uint32 type SourceEvent byte +// SessionState 推拉流Session的状态 +// 包含握手和Hook授权阶段 +type SessionState uint32 + const ( SourceTypeRtmp = SourceType(1) SourceType28181 = SourceType(2) @@ -35,11 +39,12 @@ const ( SourceEventPlayDone = SourceEvent(2) SourceEventInput = SourceEvent(3) SourceEventClose = SourceEvent(4) -) -// SessionState 推拉流Session状态 -// 包含, 握手阶段、Hook授权. -type SessionState uint32 + // TransMuxerHeaderMaxSize 传输流协议头的最大长度 + // 在解析流分配AVPacket的Data时, 如果没有开启合并写, 提前预留指定长度的字节数量. + // 在封装传输流时, 直接在预留头中添加对应传输流的协议头,减少或免内存拷贝. 在传输flv以及转换AVCC和AnnexB格式时有显著提升. + TransMuxerHeaderMaxSize = 30 +) const ( SessionStateCreate = SessionState(1) @@ -167,6 +172,53 @@ func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID) return true } +// 分发每路Stream的Buffer给传输流 +// 按照时间戳升序发送 +func (s *SourceImpl) dispatchStreamBuffer(transStream ITransStream, streams []utils.AVStream) { + size := len(streams) + indexs := make([]int, size) + + for { + min := int64(0xFFFFFFFF) + + //找出最小的时间戳 + for index, stream := range streams[:size] { + if s.buffers[stream.Index()].Size() == indexs[index] { + continue + } + + pkt := s.buffers[stream.Index()].Peek(indexs[index]).(utils.AVPacket) + v := pkt.Dts() + if min == 0xFFFFFFFF { + min = v + } else if v < min { + min = v + } + } + + if min == 0xFFFFFFFF { + break + } + + for index, stream := range streams[:size] { + buffer := s.buffers[stream.Index()] + if buffer.Size() == indexs[index] { + continue + } + + for i := indexs[index]; i < buffer.Size(); i++ { + packet := buffer.Peek(i).(utils.AVPacket) + if packet.Dts() > min { + break + } + + transStream.Input(packet) + indexs[index]++ + } + } + } +} + func (s *SourceImpl) AddSink(sink ISink) bool { // 暂时不考虑多路视频流,意味着只能1路视频流和多路音频流,同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId() @@ -240,49 +292,8 @@ func (s *SourceImpl) AddSink(sink ISink) bool { return false } - if AppConfig.GOPCache > 0 && !ok { - indexs := make([]int, size) - - for { - min := int64(0xFFFFFFFF) - - for index, stream := range streams[:size] { - size := s.buffers[stream.Index()].Size() - if size == indexs[index] { - continue - } - - pkt := s.buffers[stream.Index()].Peek(indexs[index]).(utils.AVPacket) - v := pkt.Dts() - if min == 0xFFFFFFFF { - min = v - } else if v < min { - min = v - } - } - - if min == 0xFFFFFFFF { - break - } - - for index, stream := range streams[:size] { - buffer := s.buffers[stream.Index()] - size := buffer.Size() - if size == indexs[index] { - continue - } - - for i := indexs[index]; i < buffer.Size(); i++ { - packet := buffer.Peek(i).(utils.AVPacket) - if packet.Dts() > min { - break - } - - transStream.Input(packet) - indexs[index]++ - } - } - } + if AppConfig.GOPCache && !ok { + s.dispatchStreamBuffer(transStream, streams[:size]) } return false @@ -357,8 +368,8 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) (bool, StreamBuffer) { } //为每个Stream创建对应的Buffer - if AppConfig.GOPCache > 0 { - buffer := NewStreamBuffer(int64(AppConfig.GOPCache * 1000)) + if AppConfig.GOPCache { + buffer := NewStreamBuffer(200) //OnDeMuxStream的调用顺序,就是AVStream和AVPacket的Index的递增顺序 s.buffers = append(s.buffers, buffer) return true, buffer @@ -393,7 +404,7 @@ func (s *SourceImpl) OnDeMuxStreamDone() { } func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) { - if AppConfig.GOPCache > 0 { + if AppConfig.GOPCache { buffer := s.buffers[packet.Index()] buffer.AddPacket(packet, packet.KeyFrame(), packet.Dts()) }