diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index 157f8ba..e169df3 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -18,7 +18,7 @@ type Publisher struct { func NewPublisher(sourceId string) *Publisher { publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}, audioUnmark: false, videoUnmark: false} - publisher.deMuxer = libflv.DeMuxer{} + publisher.deMuxer = libflv.NewDeMuxer() //设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl publisher.deMuxer.SetHandler(publisher) diff --git a/rtmp/rtmp_server_test.go b/rtmp/rtmp_server_test.go index 535dd61..65c08cb 100644 --- a/rtmp/rtmp_server_test.go +++ b/rtmp/rtmp_server_test.go @@ -21,6 +21,7 @@ func init() { } func TestServer(t *testing.T) { + stream.AppConfig.GOPCache = 2 impl := serverImpl{} addr := "0.0.0.0:1935" tcpAddr, err := net.ResolveTCPAddr("tcp", addr) diff --git a/rtmp/rtmp_transtream.go b/rtmp/rtmp_transtream.go index ef98d0c..567bf71 100644 --- a/rtmp/rtmp_transtream.go +++ b/rtmp/rtmp_transtream.go @@ -36,9 +36,6 @@ func (t *TransStream) Input(packet utils.AVPacket) { chunk = &t.audioChunk payloadSize += 2 + length } else if utils.AVMediaTypeVideo == packet.MediaType() { - if packet.KeyFrame() { - println("") - } videoPkt = true data = packet.AVCCPacketData() length = len(data) @@ -93,7 +90,10 @@ func (t *TransStream) Input(packet utils.AVPacket) { } rtmpData := t.memoryPool.Fetch()[:n] - ret := t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) + ret := true + if stream.AppConfig.GOPCache > 0 { + ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) + } if ret { //发送给sink @@ -103,7 +103,9 @@ func (t *TransStream) Input(packet utils.AVPacket) { } } - t.memoryPool.FreeTail() + if stream.AppConfig.GOPCache < 1 { + t.memoryPool.FreeTail() + } } func (t *TransStream) AddSink(sink stream.ISink) { @@ -112,9 +114,11 @@ func (t *TransStream) AddSink(sink stream.ISink) { utils.Assert(t.headerSize > 0) sink.Input(t.header[:t.headerSize]) - t.transBuffer.Peek(func(packet interface{}) { - sink.Input(packet.([]byte)) - }) + if stream.AppConfig.GOPCache > 0 { + t.transBuffer.PeekAll(func(packet interface{}) { + sink.Input(packet.([]byte)) + }) + } } func (t *TransStream) onDiscardPacket(pkt interface{}) { @@ -148,9 +152,11 @@ func (t *TransStream) WriteHeader() error { t.TransStreamImpl.Completed = true t.header = make([]byte, 1024) t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) - t.memoryPool = stream.NewMemoryPool(1024 * 1024 * 2) - t.transBuffer = stream.NewStreamBuffer(2000) - t.transBuffer.SetDiscardHandler(t.onDiscardPacket) + t.memoryPool = stream.NewMemoryPool(1024 * 1000 * (stream.AppConfig.GOPCache + 1)) + if stream.AppConfig.GOPCache > 0 { + t.transBuffer = stream.NewStreamBuffer(int64(stream.AppConfig.GOPCache * 200)) + t.transBuffer.SetDiscardHandler(t.onDiscardPacket) + } var n int if audioStream != nil { diff --git a/stream/source.go b/stream/source.go index aa5d522..2ca7a82 100644 --- a/stream/source.go +++ b/stream/source.go @@ -166,6 +166,8 @@ func (s *SourceImpl) AddSink(sink ISink) bool { var streams [5]utils.AVStream var index int + bufferCount := -1 + for _, stream := range s.originStreams.All() { if disableVideo && stream.Type() == utils.AVMediaTypeVideo { continue @@ -173,11 +175,20 @@ func (s *SourceImpl) AddSink(sink ISink) bool { streams[index] = stream index++ + + //从缓存的Stream中,挑选出最小的缓存数量,交叉发送. + count := s.buffers[stream.Index()].Size() + if bufferCount == -1 { + bufferCount = count + } else { + bufferCount = utils.MinInt(bufferCount, count) + } } transStreamId := GenerateTransStreamId(sink.Protocol(), streams[:index]...) transStream, ok := s.transStreams[transStreamId] if !ok { + //创建一个新的传输流 transStream = TransStreamFactory(sink.Protocol(), streams[:index]) if s.transStreams == nil { s.transStreams = make(map[TransStreamId]ITransStream, 10) @@ -192,6 +203,28 @@ func (s *SourceImpl) AddSink(sink ISink) bool { } transStream.AddSink(sink) + + if AppConfig.GOPCache > 0 && !ok { + //先交叉发送 + for i := 0; i < bufferCount; i++ { + for _, stream := range streams { + buffer := s.buffers[stream.Index()] + packet := buffer.Peek(i).(utils.AVPacket) + transStream.Input(packet) + } + } + + //发送超过最低缓存数的缓存包 + for _, stream := range streams { + buffer := s.buffers[stream.Index()] + + for i := bufferCount; i > buffer.Size(); i++ { + packet := buffer.Peek(i).(utils.AVPacket) + transStream.Input(packet) + } + } + } + return false } @@ -217,7 +250,7 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) { //为每个Stream创建对于的Buffer if AppConfig.GOPCache > 0 { - buffer := NewStreamBuffer(int64(AppConfig.GOPCache)) + buffer := NewStreamBuffer(int64(AppConfig.GOPCache * 1000)) s.buffers = append(s.buffers, buffer) } } diff --git a/stream/stream_buffer.go b/stream/stream_buffer.go index c5c1157..ef061fd 100644 --- a/stream/stream_buffer.go +++ b/stream/stream_buffer.go @@ -1,5 +1,7 @@ package stream +import "github.com/yangjiechina/avformat/utils" + // StreamBuffer GOP缓存 type StreamBuffer interface { @@ -9,9 +11,13 @@ type StreamBuffer interface { // SetDiscardHandler 设置丢弃帧时的回调 SetDiscardHandler(handler func(packet interface{})) - Peek(handler func(packet interface{})) + PeekAll(handler func(packet interface{})) + + Peek(index int) interface{} Duration() int64 + + Size() int } type streamBuffer struct { @@ -78,7 +84,18 @@ func (s *streamBuffer) SetDiscardHandler(handler func(packet interface{})) { s.discardHandler = handler } -func (s *streamBuffer) Peek(handler func(packet interface{})) { +func (s *streamBuffer) Peek(index int) interface{} { + utils.Assert(index < s.buffer.Size()) + head, tail := s.buffer.All() + + if index < len(head) { + return head[index] + } else { + return tail[index-len(head)] + } +} + +func (s *streamBuffer) PeekAll(handler func(packet interface{})) { head, tail := s.buffer.All() if head == nil { @@ -106,3 +123,7 @@ func (s *streamBuffer) Duration() int64 { return tail.(element).ts - head.(element).ts } + +func (s *streamBuffer) Size() int { + return s.buffer.Size() +}