diff --git a/config.json b/config.json index 1671ba9..3814c57 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,6 @@ { "gop_cache": true, + "gop_buffer_size": 8192000, "probe_timeout": 2000, "mw_latency": 350, "public_ip": "192.168.2.148", diff --git a/flv/http_flv.go b/flv/http_flv.go index 3f0aeb4..ef5cc75 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -28,22 +28,13 @@ func init() { } type httpTransStream struct { - stream.CacheTransStream - muxer libflv.Muxer - header []byte - headerSize int -} + stream.TransStreamImpl -func NewHttpTransStream() stream.ITransStream { - return &httpTransStream{ - muxer: libflv.NewMuxer(), - header: make([]byte, 1024), - headerSize: HttpFlvBlockLengthSize, - } -} - -func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) { - return NewHttpTransStream(), nil + muxer libflv.Muxer + mwBuffer stream.MergeWritingBuffer + header []byte + headerSize int + headerTagSize int } func (t *httpTransStream) Input(packet utils.AVPacket) error { @@ -71,49 +62,41 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { videoKey = packet.KeyFrame() } - if videoKey { - head, _ := t.StreamBuffers[0].Data() - if len(head) > t.SegmentOffset { - //分配末尾换行符 - t.StreamBuffers[0].Allocate(2) - - head, _ = t.StreamBuffers[0].Data() - t.writeSeparator(head[t.SegmentOffset:]) - skip := t.computeSikCount(head[t.SegmentOffset:]) - t.SendPacketWithOffset(head, t.SegmentOffset+skip) - } - - t.SwapStreamBuffer() + //发送剩余数据 + if videoKey && !t.mwBuffer.IsEmpty() { + t.mwBuffer.Reserve(2) + segment := t.mwBuffer.PopSegment() + t.sendUnpackedSegment(segment) } var n int var separatorSize int - full := t.Full(dts) - if head, _ := t.StreamBuffers[0].Data(); t.SegmentOffset == len(head) { + + //新的合并写切片, 预留包长字节 + if t.mwBuffer.IsCompeted() { separatorSize = HttpFlvBlockLengthSize //10字节描述flv包长, 前2个字节描述无效字节长度 n = HttpFlvBlockLengthSize } - if full { - separatorSize = 2 + + //结束时, 预留换行符 + if t.mwBuffer.IsFull(dts) { + separatorSize += 2 } - allocate := t.StreamBuffers[0].Allocate(separatorSize + flvSize) - n += t.muxer.Input(allocate[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false) - copy(allocate[n:], data) - if !full { - return nil - } + //分配flv block + bytes := t.mwBuffer.Allocate(separatorSize + flvSize) + n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false) + copy(bytes[n:], data) - head, _ := t.StreamBuffers[0].Data() //添加长度和换行符 //每一个合并写切片开始和预留长度所需的字节数 //合并写切片末尾加上换行符 //长度是16进制字符串 - t.writeSeparator(head[t.SegmentOffset:]) - - skip := t.computeSikCount(head[t.SegmentOffset:]) - t.SendPacketWithOffset(head, t.SegmentOffset+skip) + segment := t.mwBuffer.PeekCompletedSegment(dts) + if len(segment) > 0 { + t.sendUnpackedSegment(segment) + } return nil } @@ -133,11 +116,19 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error { return nil } -func (t *httpTransStream) sendBuffer(sink stream.ISink, data []byte) error { - return sink.Input(data[t.computeSikCount(data):]) +// 发送还未添加包长和换行符的切片 +func (t *httpTransStream) sendUnpackedSegment(segment []byte) { + t.writeSeparator(segment) + skip := t.computeSkipCount(segment) + t.SendPacket(segment[skip:]) } -func (t *httpTransStream) computeSikCount(data []byte) int { +// 为单个sink发送flv切片, 切片已经添加分隔符 +func (t *httpTransStream) sendSegment(sink stream.ISink, data []byte) error { + return sink.Input(data[t.computeSkipCount(data):]) +} + +func (t *httpTransStream) computeSkipCount(data []byte) int { return int(6 + binary.BigEndian.Uint16(data[4:])) } @@ -146,31 +137,21 @@ func (t *httpTransStream) AddSink(sink stream.ISink) error { t.TransStreamImpl.AddSink(sink) //发送sequence header - t.sendBuffer(sink, t.header[:t.headerSize]) - - send := func(sink stream.ISink, data []byte) { - var index int - for ; index < len(data); index += 4 { - size := binary.BigEndian.Uint32(data[index:]) - t.sendBuffer(sink, data[index:index+4+int(size)]) - index += int(size) - } - } + t.sendSegment(sink, t.header[:t.headerSize]) //发送当前内存池已有的合并写切片 - if t.SegmentOffset > 0 { - data, _ := t.StreamBuffers[0].Data() - utils.Assert(len(data) > 0) - send(sink, data[:t.SegmentOffset]) - return nil - } + segmentList := t.mwBuffer.SegmentList() + if len(segmentList) > 0 { + //修改第一个flv tag的pre tag size + binary.BigEndian.PutUint32(segmentList[20:], uint32(t.headerTagSize)) - //发送上一组GOP - if t.StreamBuffers[1] != nil && !t.StreamBuffers[1].Empty() { - data, _ := t.StreamBuffers[1].Data() - utils.Assert(len(data) > 0) - send(sink, data) - return nil + //遍历发送合并写切片 + var index int + for ; index < len(segmentList); index += 4 { + size := binary.BigEndian.Uint32(segmentList[index:]) + t.sendSegment(sink, segmentList[index:index+4+int(size)]) + index += int(size) + } } return nil @@ -217,13 +198,39 @@ func (t *httpTransStream) WriteHeader() error { data = track.CodecParameters().DecoderConfRecord().ToMP4VC() } - t.headerSize += t.muxer.Input(t.header[t.headerSize:], track.Type(), len(data), 0, 0, false, true) + n := t.muxer.Input(t.header[t.headerSize:], track.Type(), len(data), 0, 0, false, true) + t.headerSize += n copy(t.header[t.headerSize:], data) t.headerSize += len(data) + + t.headerTagSize = n - 15 + len(data) + 11 } //将结尾换行符计算在内 t.headerSize += 2 t.writeSeparator(t.header[:t.headerSize]) + + t.mwBuffer = stream.NewMergeWritingBuffer(t.ExistVideo) return nil } + +func (t *httpTransStream) Close() error { + //发送剩余的流 + segment := t.mwBuffer.PopSegment() + if len(segment) > 0 { + t.sendUnpackedSegment(segment) + } + return nil +} + +func NewHttpTransStream() stream.ITransStream { + return &httpTransStream{ + muxer: libflv.NewMuxer(), + header: make([]byte, 1024), + headerSize: HttpFlvBlockLengthSize, + } +} + +func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) { + return NewHttpTransStream(), nil +} diff --git a/main.go b/main.go index c71ae48..4176550 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( func NewDefaultAppConfig() stream.AppConfig_ { return stream.AppConfig_{ GOPCache: true, + GOPBufferSize: 8196000, MergeWriteLatency: 350, PublicIP: "192.168.2.148", diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index c2e947f..63801b8 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -8,25 +8,17 @@ import ( ) type TransStream struct { - stream.CacheTransStream + stream.TransStreamImpl + chunkSize int - //sequence header - header []byte + header []byte //sequence header headerSize int muxer libflv.Muxer audioChunk librtmp.Chunk videoChunk librtmp.Chunk -} - -func NewTransStream(chunkSize int) stream.ITransStream { - transStream := &TransStream{chunkSize: chunkSize} - return transStream -} - -func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) { - return NewTransStream(librtmp.ChunkSize), nil + mwBuffer stream.MergeWritingBuffer } func (t *TransStream) Input(packet utils.AVPacket) error { @@ -72,20 +64,16 @@ func (t *TransStream) Input(packet utils.AVPacket) error { payloadSize += chunkPayloadOffset + len(data) } - //遇到视频关键帧,不考虑合并写大小,发送之前剩余的数据. + //遇到视频关键帧, 发送剩余的流 if videoKey { - tmp := t.StreamBuffers[0] - head, _ := tmp.Data() - if len(head[t.SegmentOffset:]) > 0 { - bytes := head[t.SegmentOffset:] - t.SendPacket(bytes) - //交替使用两块内存 - t.SwapStreamBuffer() + segment := t.mwBuffer.PopSegment() + if len(segment) > 0 { + t.SendPacket(segment) } } //分配内存 - allocate := t.StreamBuffers[0].Allocate(chunkHeaderSize + payloadSize + ((payloadSize - 1) / t.chunkSize)) + allocate := t.mwBuffer.Allocate(chunkHeaderSize + payloadSize + ((payloadSize - 1) / t.chunkSize)) //写rtmp chunk header chunk.Length = payloadSize @@ -101,14 +89,10 @@ func (t *TransStream) Input(packet utils.AVPacket) error { n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset) - //未满合并写大小, 不发送 - if !t.Full(dts) { - return nil + segment := t.mwBuffer.PeekCompletedSegment(dts) + if len(segment) > 0 { + t.SendPacket(segment) } - - head, _ := t.StreamBuffers[0].Data() - //发送合并写数据 - t.SendPacketWithOffset(head, t.SegmentOffset) return nil } @@ -120,18 +104,9 @@ func (t *TransStream) AddSink(sink stream.ISink) error { sink.Input(t.header[:t.headerSize]) //发送当前内存池已有的合并写切片 - if t.SegmentOffset > 0 { - data, _ := t.StreamBuffers[0].Data() - utils.Assert(len(data) > 0) - sink.Input(data[:t.SegmentOffset]) - return nil - } - - //发送上一组GOP - if t.StreamBuffers[1] != nil && !t.StreamBuffers[1].Empty() { - data, _ := t.StreamBuffers[0].Data() - utils.Assert(len(data) > 0) - sink.Input(data) + segmentList := t.mwBuffer.SegmentList() + if len(segmentList) > 0 { + sink.Input(segmentList) return nil } @@ -201,5 +176,24 @@ func (t *TransStream) WriteHeader() error { } t.headerSize = n + + t.mwBuffer = stream.NewMergeWritingBuffer(t.ExistVideo) return nil } + +func (t *TransStream) Close() error { + //发送剩余的流 + segment := t.mwBuffer.PopSegment() + if len(segment) > 0 { + t.SendPacket(segment) + } + return nil +} + +func NewTransStream(chunkSize int) stream.ITransStream { + return &TransStream{chunkSize: chunkSize} +} + +func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) { + return NewTransStream(librtmp.ChunkSize), nil +} diff --git a/stream/config.go b/stream/config.go index 846c8e9..748c37c 100644 --- a/stream/config.go +++ b/stream/config.go @@ -138,9 +138,10 @@ func init() { // AppConfig_ GOP缓存和合并写必须保持一致,同时开启或关闭. 关闭GOP缓存,是为了降低延迟,很难理解又另外开启合并写. type AppConfig_ struct { - GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 - ProbeTimeout int `json:"probe_timeout"` - PublicIP string `json:"public_ip"` + GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 + GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小 + ProbeTimeout int `json:"probe_timeout"` + PublicIP string `json:"public_ip"` //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. //合并写的大小范围,应当大于一帧的时长,不超过一组GOP的时长,在实际发送流的时候也会遵循此条例. diff --git a/stream/gop_buffer.go b/stream/gop_buffer.go new file mode 100644 index 0000000..514beee --- /dev/null +++ b/stream/gop_buffer.go @@ -0,0 +1,109 @@ +package stream + +import "github.com/yangjiechina/avformat/utils" + +// GOPBuffer GOP缓存 +type GOPBuffer interface { + + // AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败 + AddPacket(packet utils.AVPacket) bool + + // SetDiscardHandler 设置丢弃帧时的回调 + SetDiscardHandler(handler func(packet utils.AVPacket)) + + PeekAll(handler func(packet utils.AVPacket)) + + Peek(index int) utils.AVPacket + + Size() int + + Clear() +} + +type streamBuffer struct { + buffer RingBuffer + existVideoKeyFrame bool + discardHandler func(packet utils.AVPacket) +} + +func NewStreamBuffer() GOPBuffer { + return &streamBuffer{buffer: NewRingBuffer(1000), existVideoKeyFrame: false} +} + +func (s *streamBuffer) AddPacket(packet utils.AVPacket) bool { + //缓存满,清空 + if s.Size()+1 == s.buffer.Capacity() { + s.Clear() + } + + //丢弃首帧视频非关键帧 + if utils.AVMediaTypeVideo == packet.MediaType() && !s.existVideoKeyFrame && !packet.KeyFrame() { + return false + } + + //丢弃前一组GOP + videoKeyFrame := utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame() + if videoKeyFrame { + if s.existVideoKeyFrame { + s.discard() + } + + s.existVideoKeyFrame = true + } + + s.buffer.Push(packet) + return true +} + +func (s *streamBuffer) SetDiscardHandler(handler func(packet utils.AVPacket)) { + s.discardHandler = handler +} + +func (s *streamBuffer) discard() { + for s.buffer.Size() > 0 { + pkt := s.buffer.Pop() + + if s.discardHandler != nil { + s.discardHandler(pkt.(utils.AVPacket)) + } + } + + s.existVideoKeyFrame = false +} + +func (s *streamBuffer) Peek(index int) utils.AVPacket { + utils.Assert(index < s.buffer.Size()) + head, tail := s.buffer.Data() + + if index < len(head) { + return head[index].(utils.AVPacket) + } else { + return tail[index-len(head)].(utils.AVPacket) + } +} + +func (s *streamBuffer) PeekAll(handler func(packet utils.AVPacket)) { + head, tail := s.buffer.Data() + + if head == nil { + return + } + for _, value := range head { + handler(value.(utils.AVPacket)) + } + + if tail == nil { + return + } + for _, value := range tail { + handler(value.(utils.AVPacket)) + } +} + +func (s *streamBuffer) Size() int { + return s.buffer.Size() +} + +func (s *streamBuffer) Clear() { + s.discard() +} diff --git a/stream/mw_buffer.go b/stream/mw_buffer.go new file mode 100644 index 0000000..c87d059 --- /dev/null +++ b/stream/mw_buffer.go @@ -0,0 +1,125 @@ +package stream + +// MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存 +// 和GOP缓存一样, 也以视频关键帧为界. 遇到视频关键帧, 发送剩余输出流, 清空buffer + +type MergeWritingBuffer interface { + Allocate(size int) []byte + + // PeekCompletedSegment 返回当前完整合并写切片 + PeekCompletedSegment(ts int64) []byte + + // PopSegment 返回当前合并写切片, 并清空内存池 + PopSegment() []byte + + // SegmentList 返回所有完整切片 + SegmentList() []byte + + IsFull(ts int64) bool + + IsCompeted() bool + + IsEmpty() bool + + Reserve(count int) +} + +type mergeWritingBuffer struct { + transStreamBuffer MemoryPool + + segmentOffset int //当前合并写包位于memoryPool的开始偏移量 + + prePacketTS int64 //前一个包的时间戳 +} + +func (m *mergeWritingBuffer) Allocate(size int) []byte { + return m.transStreamBuffer.Allocate(size) +} + +func (m *mergeWritingBuffer) PeekCompletedSegment(ts int64) []byte { + if !AppConfig.GOPCache { + data, _ := m.transStreamBuffer.Data() + m.transStreamBuffer.Clear() + return data + } + + if m.prePacketTS == -1 { + m.prePacketTS = ts + } + + if ts < m.prePacketTS { + m.prePacketTS = ts + } + + if int(ts-m.prePacketTS) < AppConfig.MergeWriteLatency { + return nil + } + + head, _ := m.transStreamBuffer.Data() + data := head[m.segmentOffset:] + + m.segmentOffset = len(head) + m.prePacketTS = -1 + + return data +} + +func (m *mergeWritingBuffer) IsFull(ts int64) bool { + if m.prePacketTS == -1 { + return false + } + + return int(ts-m.prePacketTS) >= AppConfig.MergeWriteLatency +} + +func (m *mergeWritingBuffer) IsCompeted() bool { + data, _ := m.transStreamBuffer.Data() + return m.segmentOffset == len(data) +} + +func (m *mergeWritingBuffer) IsEmpty() bool { + data, _ := m.transStreamBuffer.Data() + return len(data) <= m.segmentOffset +} + +func (m *mergeWritingBuffer) Reserve(count int) { + _ = m.transStreamBuffer.Allocate(count) +} + +func (m *mergeWritingBuffer) PopSegment() []byte { + if !AppConfig.GOPCache { + return nil + } + + head, _ := m.transStreamBuffer.Data() + data := head[m.segmentOffset:] + m.transStreamBuffer.Clear() + m.segmentOffset = 0 + m.prePacketTS = -1 + return data +} + +func (m *mergeWritingBuffer) SegmentList() []byte { + if !AppConfig.GOPCache { + return nil + } + + head, _ := m.transStreamBuffer.Data() + return head[:m.segmentOffset] +} + +func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { + //开启GOP缓存, 输出流也缓存整个GOP + bufferSize := AppConfig.GOPBufferSize + if existVideo && !AppConfig.GOPCache { + bufferSize = 1024 * 1000 + } else if !existVideo { + bufferSize = 48000 * 10 + } + + return &mergeWritingBuffer{ + transStreamBuffer: NewDirectMemoryPool(bufferSize), + segmentOffset: 0, + prePacketTS: -1, + } +} diff --git a/stream/ring_buffer.go b/stream/ring_buffer.go index b4f06c2..a36e3a3 100644 --- a/stream/ring_buffer.go +++ b/stream/ring_buffer.go @@ -19,6 +19,8 @@ type RingBuffer interface { Size() int + Capacity() int + Data() ([]interface{}, []interface{}) Clear() @@ -27,20 +29,22 @@ type RingBuffer interface { func NewRingBuffer(capacity int) RingBuffer { utils.Assert(capacity > 0) r := &ringBuffer{ - data: make([]interface{}, capacity), - head: 0, - tail: 0, - size: 0, + data: make([]interface{}, capacity), + head: 0, + tail: 0, + size: 0, + capacity: capacity, } return r } type ringBuffer struct { - data []interface{} - head int - tail int - size int + data []interface{} + head int + tail int + size int + capacity int } func (r *ringBuffer) IsEmpty() bool { @@ -92,6 +96,10 @@ func (r *ringBuffer) Size() int { return r.size } +func (r *ringBuffer) Capacity() int { + return r.capacity +} + func (r *ringBuffer) Data() ([]interface{}, []interface{}) { if r.size == 0 { return nil, nil diff --git a/stream/source.go b/stream/source.go index 4670cf1..6a5292f 100644 --- a/stream/source.go +++ b/stream/source.go @@ -61,6 +61,9 @@ type ISource interface { //@Return bool fatal error.释放Source Input(data []byte) error + // Type 推流类型 + Type() SourceType + // OriginStreams 返回推流的原始Streams OriginStreams() []utils.AVStream @@ -83,14 +86,11 @@ type ISource interface { // 将Sink添加到等待队列 Close() - // Type 推流类型 - Type() SourceType - // FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池 FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool // OnDiscardPacket GOP缓存溢出回调, 释放AVPacket - OnDiscardPacket(pkt interface{}) + OnDiscardPacket(pkt utils.AVPacket) // OnDeMuxStream 解析出AVStream回调 OnDeMuxStream(stream utils.AVStream) @@ -127,14 +127,15 @@ type SourceImpl struct { videoTranscoders []transcode.ITranscoder //视频解码器 originStreams StreamManager //推流的音视频Streams allStreams StreamManager //推流Streams+转码器获得的Stream - gopBuffers []StreamBuffer //推流每路的GOP缓存 pktBuffers [8]MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存. + gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频 - Input_ func(data []byte) error //解决多态无法传递给子类的问题 - + existVideo bool //是否存在视频 completed bool probeTimer *time.Timer + Input_ func(data []byte) error //解决多态无法传递给子类的问题 + //所有的输出协议, 持有Sink transStreams map[TransStreamId]ITransStream @@ -199,11 +200,11 @@ func (s *SourceImpl) FindOrCreatePacketBuffer(index int, mediaType utils.AVMedia } else if AppConfig.GOPCache { //开启GOP缓存 //以每秒钟4M码率大小创建视频内存池 - s.pktBuffers[index] = NewRbMemoryPool(4096 * 1024) + s.pktBuffers[index] = NewRbMemoryPool(AppConfig.GOPBufferSize) } else { //未开启GOP缓存 //以每秒钟4M的1/8码率大小创建视频内存池 - s.pktBuffers[index] = NewRbMemoryPool(4096 * 1024 / 8) + s.pktBuffers[index] = NewRbMemoryPool(1024 * 1000) } } @@ -263,53 +264,6 @@ func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID) return true } -// 将GOP缓存发送给TransStream -// 按照时间戳升序发送 -func (s *SourceImpl) dispatchStreamBuffer(transStream ITransStream, streams []utils.AVStream) { - size := len(streams) - indexs := make([]int, size) - - for { - min := int64(0xFFFFFFFF) - - //找出最小的时间戳 - for index, stream_ := range streams[:size] { - if s.gopBuffers[stream_.Index()].Size() == indexs[index] { - continue - } - - pkt := s.gopBuffers[stream_.Index()].Peek(indexs[index]).(utils.AVPacket) - v := pkt.Dts() - if min == 0xFFFFFFFF { - min = v - } else if v < min { - min = v - } - } - - if min == 0xFFFFFFFF { - break - } - - for index, stream_ := range streams[:size] { - buffer := s.gopBuffers[stream_.Index()] - if buffer.Size() == indexs[index] { - continue - } - - for i := indexs[index]; i < buffer.Size(); i++ { - packet := buffer.Peek(i).(utils.AVPacket) - if packet.Dts() > min { - break - } - - transStream.Input(packet) - indexs[index]++ - } - } - } -} - func (s *SourceImpl) AddSink(sink ISink) bool { // 暂时不考虑多路视频流,意味着只能1路视频流和多路音频流,同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId() @@ -397,8 +351,10 @@ func (s *SourceImpl) AddSink(sink ISink) bool { } //新的传输流,发送缓存的音视频帧 - if !ok && AppConfig.GOPCache { - s.dispatchStreamBuffer(transStream, streams[:size]) + if !ok && AppConfig.GOPCache && s.existVideo { + s.gopBuffer.PeekAll(func(packet utils.AVPacket) { + transStream.Input(packet) + }) } return true @@ -438,11 +394,18 @@ func (s *SourceImpl) SetState(state SessionState) { } func (s *SourceImpl) Close() { + //释放GOP缓存 + if s.gopBuffer != nil { + s.gopBuffer.Clear() + } + //释放解复用器 //释放转码器 //释放每路转协议流, 将所有sink添加到等待队列 _, _ = SourceManager.Remove(s.Id_) for _, transStream := range s.transStreams { + transStream.Close() + transStream.PopAllSink(func(sink ISink) { sink.SetTransStreamId(0) { @@ -461,8 +424,7 @@ func (s *SourceImpl) Close() { s.transStreams = nil } -func (s *SourceImpl) OnDiscardPacket(pkt interface{}) { - packet := pkt.(utils.AVPacket) +func (s *SourceImpl) OnDiscardPacket(packet utils.AVPacket) { s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeHead() } @@ -486,13 +448,15 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) { }) } + if utils.AVMediaTypeVideo == stream.Type() { + s.existVideo = true + } + //为每个Stream创建对应的Buffer - if AppConfig.GOPCache { - buffer := NewStreamBuffer(200) - //OnDeMuxStream的调用顺序,就是AVStream和AVPacket的Index的递增顺序 - s.gopBuffers = append(s.gopBuffers, buffer) + if AppConfig.GOPCache && s.existVideo { + s.gopBuffer = NewStreamBuffer() //设置GOP缓存溢出回调 - buffer.SetDiscardHandler(s.OnDiscardPacket) + s.gopBuffer.SetDiscardHandler(s.OnDiscardPacket) } } @@ -534,9 +498,8 @@ func (s *SourceImpl) OnDeMuxStreamDone() { } func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) { - if AppConfig.GOPCache { - buffer := s.gopBuffers[packet.Index()] - buffer.AddPacket(packet, packet.KeyFrame(), packet.Dts()) + if AppConfig.GOPCache && s.existVideo { + s.gopBuffer.AddPacket(packet) } //分发给各个传输流 @@ -544,12 +507,10 @@ func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) { stream_.Input(packet) } - if AppConfig.GOPCache { - return + //未开启GOP缓存或只存在音频流, 释放掉内存 + if !AppConfig.GOPCache || !s.existVideo { + s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail() } - - //未开启GOP缓存,释放掉内存 - s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail() } func (s *SourceImpl) OnDeMuxDone() { diff --git a/stream/stream_buffer.go b/stream/stream_buffer.go deleted file mode 100644 index 96885a9..0000000 --- a/stream/stream_buffer.go +++ /dev/null @@ -1,129 +0,0 @@ -package stream - -import "github.com/yangjiechina/avformat/utils" - -// StreamBuffer GOP缓存 -type StreamBuffer interface { - - // AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败 - AddPacket(packet interface{}, key bool, ts int64) bool - - // SetDiscardHandler 设置丢弃帧时的回调 - SetDiscardHandler(handler func(packet interface{})) - - PeekAll(handler func(packet interface{})) - - Peek(index int) interface{} - - Duration() int64 - - Size() int -} - -type streamBuffer struct { - buffer RingBuffer - duration int64 - - keyFrameDts int64 //最近一个关键帧的Dts - FarthestKeyFrameDts int64 //最远一个关键帧的Dts - - discardHandler func(packet interface{}) -} - -type element struct { - ts int64 - key bool - pkt interface{} -} - -func NewStreamBuffer(duration int64) StreamBuffer { - return &streamBuffer{duration: duration, buffer: NewRingBuffer(1000)} -} - -func (s *streamBuffer) AddPacket(packet interface{}, key bool, ts int64) bool { - if s.buffer.IsEmpty() { - if !key { - return false - } - - s.FarthestKeyFrameDts = ts - } - - s.buffer.Push(element{ts, key, packet}) - if key { - s.keyFrameDts = ts - } - - //丢弃处理 - //以最近的关键帧时间戳开始,丢弃缓存超过duration长度的帧 - //至少需要保障当前GOP完整 - //暂时不考虑以下情况: - // 1. 音频收流正常,视频长时间没收流,待视频恢复后。 会造成在此期间,多余的音频帧被丢弃,播放时有画面,没声音. - // 2. 视频反之亦然 - if !key { - return true - } - - for farthest := s.keyFrameDts - s.duration; s.buffer.Size() > 1 && s.buffer.Head().(element).ts < farthest; { - ele := s.buffer.Pop().(element) - - //重新设置最早的关键帧时间戳 - if ele.key && ele.ts != s.FarthestKeyFrameDts { - s.FarthestKeyFrameDts = ele.ts - } - - if s.discardHandler != nil { - s.discardHandler(ele.pkt) - } - } - - return true -} - -func (s *streamBuffer) SetDiscardHandler(handler func(packet interface{})) { - s.discardHandler = handler -} - -func (s *streamBuffer) Peek(index int) interface{} { - utils.Assert(index < s.buffer.Size()) - head, tail := s.buffer.Data() - - if index < len(head) { - return head[index].(element).pkt - } else { - return tail[index-len(head)].(element).pkt - } -} - -func (s *streamBuffer) PeekAll(handler func(packet interface{})) { - head, tail := s.buffer.Data() - - if head == nil { - return - } - for _, value := range head { - handler(value.(element).pkt) - } - - if tail == nil { - return - } - for _, value := range tail { - handler(value.(element).pkt) - } -} - -func (s *streamBuffer) Duration() int64 { - head := s.buffer.Head() - tail := s.buffer.Tail() - - if head == nil || tail == nil { - return 0 - } - - return tail.(element).ts - head.(element).ts -} - -func (s *streamBuffer) Size() int { - return s.buffer.Size() -} diff --git a/stream/trans_stream.go b/stream/trans_stream.go index 86fc59c..0fed462 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -192,61 +192,3 @@ func (t *TransStreamImpl) SendPacket(data []byte) error { return nil } - -// CacheTransStream 针对RTMP/FLV/HLS等基于TCP传输的带缓存传输流. -type CacheTransStream struct { - TransStreamImpl - - //作为封装流的内存缓存区, 即使没有开启GOP缓存也创建一个, 开启GOP缓存的情况下, 创建2个, 反复交替使用. - StreamBuffers []MemoryPool - - //当前合并写切片位于memoryPool的开始偏移量 - SegmentOffset int - //前一个包的时间戳 - PrePacketTS int64 -} - -func (c *CacheTransStream) Init() { - c.TransStreamImpl.Init() - - c.StreamBuffers = make([]MemoryPool, 2) - c.StreamBuffers[0] = NewDirectMemoryPool(1024 * 4000) - - if c.ExistVideo && AppConfig.MergeWriteLatency > 0 { - c.StreamBuffers[1] = NewDirectMemoryPool(1024 * 4000) - } - - c.SegmentOffset = 0 - c.PrePacketTS = -1 -} - -func (c *CacheTransStream) Full(ts int64) bool { - if c.PrePacketTS == -1 { - c.PrePacketTS = ts - } - - if ts < c.PrePacketTS { - c.PrePacketTS = ts - } - - return int(ts-c.PrePacketTS) >= AppConfig.MergeWriteLatency -} - -func (c *CacheTransStream) SwapStreamBuffer() { - if c.ExistVideo && AppConfig.MergeWriteLatency > 0 { - tmp := c.StreamBuffers[0] - c.StreamBuffers[0] = c.StreamBuffers[1] - c.StreamBuffers[1] = tmp - } - - c.StreamBuffers[0].Clear() - c.PrePacketTS = -1 - c.SegmentOffset = 0 -} - -func (c *CacheTransStream) SendPacketWithOffset(data []byte, offset int) error { - c.TransStreamImpl.SendPacket(data[offset:]) - c.SegmentOffset = len(data) - c.PrePacketTS = -1 - return nil -}