diff --git a/rtmp/rtmp_server_test.go b/rtmp/rtmp_server_test.go index f3b7ab4..535dd61 100644 --- a/rtmp/rtmp_server_test.go +++ b/rtmp/rtmp_server_test.go @@ -1,6 +1,7 @@ package rtmp import ( + "github.com/yangjiechina/avformat/librtmp" "github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/live-server/stream" "net" @@ -9,7 +10,7 @@ import ( func CreateTransStream(protocol stream.Protocol, streams []utils.AVStream) stream.ITransStream { if stream.ProtocolRtmp == protocol { - return &TransStream{} + return NewTransStream(librtmp.ChunkSize) } return nil diff --git a/rtmp/rtmp_transtream.go b/rtmp/rtmp_transtream.go index c5120f0..ef98d0c 100644 --- a/rtmp/rtmp_transtream.go +++ b/rtmp/rtmp_transtream.go @@ -26,24 +26,30 @@ func (t *TransStream) Input(packet utils.AVPacket) { var data []byte var chunk *librtmp.Chunk var videoPkt bool + var length int + //rtmp chunk消息体的数据大小 + var payloadSize int if utils.AVMediaTypeAudio == packet.MediaType() { data = packet.Data() + length = len(data) chunk = &t.audioChunk + payloadSize += 2 + length } else if utils.AVMediaTypeVideo == packet.MediaType() { + if packet.KeyFrame() { + println("") + } videoPkt = true data = packet.AVCCPacketData() + length = len(data) chunk = &t.videoChunk + payloadSize += 5 + length } - length := len(data) - //rtmp chunk消息体的数据大小 - payloadSize := 5 + length - payloadSize += payloadSize / t.chunkSize - + //payloadSize += payloadSize / t.chunkSize //分配内存 t.memoryPool.Mark() - allocate := t.memoryPool.Allocate(12 + payloadSize) + allocate := t.memoryPool.Allocate(12 + payloadSize + (payloadSize / t.chunkSize)) //写chunk头 chunk.Length = payloadSize @@ -54,13 +60,21 @@ func (t *TransStream) Input(packet utils.AVPacket) { //写flv ct := packet.Pts() - packet.Dts() if videoPkt { - n += t.muxer.WriteVideoData(allocate, uint32(ct), packet.KeyFrame(), false) + n += t.muxer.WriteVideoData(allocate[12:], uint32(ct), packet.KeyFrame(), false) } else { - n += t.muxer.WriteAudioData(allocate, false) + n += t.muxer.WriteAudioData(allocate[12:], false) } + first := true + var min int for length > 0 { - min := utils.MinInt(length, t.chunkSize) + if first { + min = utils.MinInt(length, t.chunkSize-5) + first = false + } else { + min = utils.MinInt(length, t.chunkSize) + } + copy(allocate[n:], data[:min]) n += min @@ -78,8 +92,9 @@ func (t *TransStream) Input(packet utils.AVPacket) { } } - rtmpData := t.memoryPool.Fetch() - ret := t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && utils.AVMediaTypeVideo == packet.MediaType(), packet.Dts()) + rtmpData := t.memoryPool.Fetch()[:n] + ret := t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) + if ret { //发送给sink @@ -88,18 +103,21 @@ func (t *TransStream) Input(packet utils.AVPacket) { } } + t.memoryPool.FreeTail() } func (t *TransStream) AddSink(sink stream.ISink) { t.TransStreamImpl.AddSink(sink) + utils.Assert(t.headerSize > 0) + sink.Input(t.header[:t.headerSize]) + t.transBuffer.Peek(func(packet interface{}) { sink.Input(packet.([]byte)) }) } func (t *TransStream) onDiscardPacket(pkt interface{}) { - //bytes := pkt.([]byte) t.memoryPool.FreeHead() } @@ -117,7 +135,7 @@ func (t *TransStream) WriteHeader() error { audioStream = track audioCodecId = audioStream.CodecId() t.audioChunk = librtmp.NewAudioChunk() - } else if utils.AVMediaTypeAudio == track.Type() { + } else if utils.AVMediaTypeVideo == track.Type() { videoStream = track videoCodecId = videoStream.CodecId() t.videoChunk = librtmp.NewVideoChunk() @@ -127,6 +145,7 @@ func (t *TransStream) WriteHeader() error { utils.Assert(audioStream != nil || videoStream != nil) //初始化 + t.TransStreamImpl.Completed = true t.header = make([]byte, 1024) t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) t.memoryPool = stream.NewMemoryPool(1024 * 1024 * 2) @@ -135,17 +154,26 @@ func (t *TransStream) WriteHeader() error { var n int if audioStream != nil { - n += t.muxer.WriteAudioData(t.header, true) + n += t.muxer.WriteAudioData(t.header[12:], true) extra := audioStream.Extra() - copy(t.header[n:], extra) + copy(t.header[n+12:], extra) n += len(extra) + + t.audioChunk.Length = n + t.audioChunk.ToBytes(t.header) + n += 12 } if videoStream != nil { - n += t.muxer.WriteAudioData(t.header[n:], true) + tmp := n + n += t.muxer.WriteVideoData(t.header[n+12:], 0, false, true) extra := videoStream.Extra() - copy(t.header[n:], extra) + copy(t.header[n+12:], extra) n += len(extra) + + t.videoChunk.Length = 5 + len(extra) + t.videoChunk.ToBytes(t.header[tmp:]) + n += 12 } t.headerSize = n @@ -153,6 +181,6 @@ func (t *TransStream) WriteHeader() error { } func NewTransStream(chunkSize int) stream.ITransStream { - transStream := &TransStream{chunkSize: chunkSize} + transStream := &TransStream{chunkSize: chunkSize, TransStreamImpl: stream.TransStreamImpl{Sinks: make(map[stream.SinkId]stream.ISink, 64)}} return transStream } diff --git a/stream/ring_buffer.go b/stream/ring_buffer.go index fe9c3ba..ed25146 100644 --- a/stream/ring_buffer.go +++ b/stream/ring_buffer.go @@ -82,6 +82,10 @@ func (r *ringBuffer) Size() int { } func (r *ringBuffer) All() ([]interface{}, []interface{}) { + if r.size == 0 { + return nil, nil + } + if r.head < r.tail { return r.data[r.head:], r.data[:r.tail] } else {