diff --git a/flv/http_flv.go b/flv/http_flv.go index 25c9bd0..c090095 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -59,9 +59,8 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { if videoKey { head, _ := t.StreamBuffers[0].Data() if len(head) > t.SegmentOffset { - t.StreamBuffers[0].Mark() + //分配末尾换行符 t.StreamBuffers[0].Allocate(2) - t.StreamBuffers[0].Fetch() head, _ = t.StreamBuffers[0].Data() t.writeSeparator(head[t.SegmentOffset:]) @@ -84,12 +83,9 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { separatorSize = 2 } - t.StreamBuffers[0].Mark() allocate := t.StreamBuffers[0].Allocate(separatorSize + flvSize) n += t.muxer.Input(allocate[n:], packet.MediaType(), len(data), packet.Dts(), packet.Pts(), packet.KeyFrame(), false) copy(allocate[n:], data) - _ = t.StreamBuffers[0].Fetch() - if !full { return nil } @@ -115,8 +111,9 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error { t.muxer.AddAudioTrack(stream.CodecId(), 0, 0, 0) } else if utils.AVMediaTypeVideo == stream.Type() { t.muxer.AddVideoTrack(stream.CodecId()) - t.muxer.AddProperty("width", stream.(utils.VideoStream).Width()) - t.muxer.AddProperty("height", stream.(utils.VideoStream).Height()) + + t.muxer.AddProperty("width", stream.CodecParameters().SPSInfo().Width()) + t.muxer.AddProperty("height", stream.CodecParameters().SPSInfo().Height()) } return nil } diff --git a/hls/hls_stream.go b/hls/hls_stream.go index e34695f..8c8e914 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -99,7 +99,7 @@ func (t *transStream) Input(packet utils.AVPacket) error { pts := packet.ConvertPts(90000) dts := packet.ConvertDts(90000) if utils.AVMediaTypeVideo == packet.MediaType() { - return t.muxer.Input(packet.Index(), packet.AnnexBPacketData(), pts, dts, packet.KeyFrame()) + return t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()]), pts, dts, packet.KeyFrame()) } else { return t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame()) } diff --git a/rtc/rtc_stream.go b/rtc/rtc_stream.go index 0e4d228..841ba32 100644 --- a/rtc/rtc_stream.go +++ b/rtc/rtc_stream.go @@ -34,7 +34,8 @@ func (t *transStream) Input(packet utils.AVPacket) error { } sink_.input(packet.Index(), extra, 0) } - sink_.input(packet.Index(), packet.AnnexBPacketData(), 40) + + sink_.input(packet.Index(), packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()]), uint32(packet.Duration(1000))) } } diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index 58731f0..9f4c23f 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -49,7 +49,7 @@ func NewPublisher(sourceId string, stack *librtmp.Stack, conn net.Conn) Publishe func (p *publisher) Init() { //创建内存池 - p.audioMemoryPool = stream.NewMemoryPool(48000 * 1) + p.audioMemoryPool = stream.NewMemoryPool(48000 * 64) if stream.AppConfig.GOPCache { //以每秒钟4M码率大小创建内存池 p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000) diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index 3c5531c..5e5ed32 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -24,7 +24,10 @@ func NewTransStream(chunkSize int) stream.ITransStream { return transStream } +var count int + func (t *TransStream) Input(packet utils.AVPacket) error { + count++ utils.Assert(t.TransStreamImpl.Completed) var data []byte @@ -34,12 +37,16 @@ func (t *TransStream) Input(packet utils.AVPacket) error { var length int //rtmp chunk消息体的数据大小 var payloadSize int + //先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小 + var chunkPayloadOffset int + ct := packet.Pts() - packet.Dts() if utils.AVMediaTypeAudio == packet.MediaType() { data = packet.Data() length = len(data) chunk = &t.audioChunk - payloadSize += 2 + length + chunkPayloadOffset = 2 + payloadSize += chunkPayloadOffset + length } else if utils.AVMediaTypeVideo == packet.MediaType() { videoPkt = true @@ -47,26 +54,30 @@ func (t *TransStream) Input(packet utils.AVPacket) error { data = packet.AVCCPacketData() length = len(data) chunk = &t.videoChunk - payloadSize += 5 + length + chunkPayloadOffset = t.muxer.ComputeVideoDataSize(uint32(ct)) + payloadSize += chunkPayloadOffset + length } + //遇到视频关键帧,不考虑合并写大小,发送之前剩余的数据. if videoKey { tmp := t.StreamBuffers[0] head, _ := tmp.Data() - t.SendPacket(head[t.SegmentOffset:]) - t.SwapStreamBuffer() + if len(head[t.SegmentOffset:]) > 0 { + bytes := head[t.SegmentOffset:] + t.SendPacket(bytes) + //交替使用两块内存 + t.SwapStreamBuffer() + } } //分配内存 - t.StreamBuffers[0].Mark() allocate := t.StreamBuffers[0].Allocate(12 + payloadSize + (payloadSize / t.chunkSize)) - //写chunk头 var ts uint32 if packet.CodecId() == utils.AVCodecIdAAC { - ts = uint32(packet.ConvertPts(libflv.AACFrameSize)) + ts = uint32(packet.ConvertDts(libflv.AACFrameSize)) } else { - ts = uint32(packet.ConvertPts(1000)) + ts = uint32(packet.ConvertDts(1000)) } chunk.Length = payloadSize @@ -75,22 +86,22 @@ func (t *TransStream) Input(packet utils.AVPacket) error { utils.Assert(n == 12) //写flv - ct := packet.Pts() - packet.Dts() if videoPkt { n += t.muxer.WriteVideoData(allocate[12:], uint32(ct), packet.KeyFrame(), false) - n += chunk.WriteData(allocate[n:], data, t.chunkSize) + n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset) } else { n += t.muxer.WriteAudioData(allocate[12:], false) - n += chunk.WriteData(allocate[n:], data, t.chunkSize) + n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset) } - _ = t.StreamBuffers[0].Fetch()[:n] - if t.Full(packet.Pts()) { + //未满合并写大小, 不发送 + if !t.Full(packet.Dts()) { return nil } head, _ := t.StreamBuffers[0].Data() - t.SendPacketWithOffset(head[:], t.SegmentOffset) + //发送合并写数据 + t.SendPacketWithOffset(head, t.SegmentOffset) return nil } diff --git a/rtsp/rtsp_stream.go b/rtsp/rtsp_stream.go index 77a04bc..442a561 100644 --- a/rtsp/rtsp_stream.go +++ b/rtsp/rtsp_stream.go @@ -3,6 +3,8 @@ package rtsp import ( "encoding/binary" "fmt" + "github.com/yangjiechina/avformat/libavc" + "github.com/yangjiechina/avformat/libhevc" "github.com/yangjiechina/avformat/librtp" "github.com/yangjiechina/avformat/librtsp/sdp" "github.com/yangjiechina/avformat/utils" @@ -109,23 +111,23 @@ func (t *tranStream) Input(packet utils.AVPacket) error { return nil } - extra, err := t.TransStreamImpl.Tracks[packet.Index()].AnnexBExtraData() - if err != nil { - return err + stream_.cache = true + parameters := t.TransStreamImpl.Tracks[packet.Index()].CodecParameters() + + if utils.AVCodecIdH265 == packet.CodecId() { + bytes := parameters.DecoderConfRecord().(*libhevc.HEVCDecoderConfRecord).VPS + stream_.muxer.Input(bytes[0], uint32(packet.ConvertPts(stream_.rate))) } - var count int - stream_.cache = true - utils.SplitNalU(extra, func(nalu []byte) { - data := utils.RemoveStartCode(nalu) - stream_.muxer.Input(data, uint32(packet.ConvertPts(stream_.rate))) - count++ - }) + spsBytes := parameters.DecoderConfRecord().SPSBytes() + ppsBytes := parameters.DecoderConfRecord().PPSBytes() + stream_.muxer.Input(spsBytes[0], uint32(packet.ConvertPts(stream_.rate))) + stream_.muxer.Input(ppsBytes[0], uint32(packet.ConvertPts(stream_.rate))) stream_.header = stream_.tmp } - data := utils.RemoveStartCode(packet.AnnexBPacketData()) + data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()])) stream_.muxer.Input(data, uint32(packet.ConvertPts(stream_.rate))) } @@ -153,7 +155,7 @@ func (t *tranStream) AddTrack(stream utils.AVStream) error { //创建RTP封装 var muxer librtp.Muxer - if utils.AVCodecIdH264 == stream.CodecId() { + if utils.AVCodecIdH264 == stream.CodecId() || utils.AVCodecIdH265 == stream.CodecId() { muxer = librtp.NewH264Muxer(payloadType.Pt, 0, 0xFFFFFFFF) } else if utils.AVCodecIdAAC == stream.CodecId() { muxer = librtp.NewAACMuxer(payloadType.Pt, 0, 0xFFFFFFFF) diff --git a/rtsp/transport_manager.go b/rtsp/transport_manager.go index 13e1c25..7fd313f 100644 --- a/rtsp/transport_manager.go +++ b/rtsp/transport_manager.go @@ -2,6 +2,7 @@ package rtsp import ( "fmt" + "github.com/yangjiechina/avformat/libbufio" "github.com/yangjiechina/avformat/utils" ) @@ -54,7 +55,7 @@ func (t *transportManager) AllocTransport(tcp bool, cb func(port int)) error { } t.nextPort = t.nextPort + 1%t.endPort - t.nextPort = utils.MaxInt(t.nextPort, t.startPort) + t.nextPort = libbufio.MaxInt(t.nextPort, t.startPort) return nil } diff --git a/stream/memory_pool.go b/stream/memory_pool.go index 80a93ad..3cb99c3 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -6,24 +6,31 @@ import ( // MemoryPool 从解复用阶段,拼凑成完整的AVPacket开始(写),到GOP缓存结束(释放),整个过程都使用池中内存 // 类似环形缓冲区, 区别在于,写入的内存块是连续的、整块内存. +// 两种使用方式: +// 1. 已知需要分配内存大小, 直接使用Allocate()函数分配, 并且外部自行操作内存块 +// 2. 未知分配内存大小, 先使用Mark()函数,标记内存起始偏移量, 再通过Write()函数将数据拷贝进内存块,最后调用Fetch/Reset函数完成或释放内存块 +// +// 两种使用方式互斥,不能同时使用. type MemoryPool interface { - // Mark 标记一块写的内存地址 - //使用流程 Mark->Write/Allocate....->Fetch/Reset + // Allocate 分配指定大小的内存块 + Allocate(size int) []byte + + // Mark 标记内存块起始位置 Mark() + // Write 向内存块中写入数据, 必须先调用Mark函数 Write(data []byte) + // Fetch 获取当前内存块,必须先调用Mark函数 + Fetch() []byte + + // Reset 清空写入的数据,本次缓存的数据无效 + Reset() + // Reserve 保留指定大小的内存空间 //主要是为了和实现和Write相似功能,但是不拷贝, 所以使用流程和Write一样. Reserve(size int) - Allocate(size int) []byte - - Fetch() []byte - - // Reset 清空此次Write的标记,本次缓存的数据无效 - Reset() - // FreeHead 从头部释放指定大小内存 FreeHead() @@ -127,6 +134,8 @@ func (m *memoryPool) allocate(size int) []byte { } func (m *memoryPool) Mark() { + utils.Assert(!m.mark) + m.markIndex = m.tail m.mark = true } @@ -144,8 +153,9 @@ func (m *memoryPool) Reserve(size int) { } func (m *memoryPool) Allocate(size int) []byte { - utils.Assert(m.mark) - return m.allocate(size) + m.Mark() + _ = m.allocate(size) + return m.Fetch() } func (m *memoryPool) Fetch() []byte { @@ -163,6 +173,7 @@ func (m *memoryPool) Reset() { } func (m *memoryPool) FreeHead() { + utils.Assert(!m.mark) utils.Assert(!m.blockQueue.IsEmpty()) size := m.blockQueue.Pop().(int) @@ -177,6 +188,7 @@ func (m *memoryPool) FreeHead() { } func (m *memoryPool) FreeTail() { + utils.Assert(!m.mark) utils.Assert(!m.blockQueue.IsEmpty()) size := m.blockQueue.PopBack().(int) diff --git a/stream/queue.go b/stream/queue.go index 1e214fa..fb86ccb 100644 --- a/stream/queue.go +++ b/stream/queue.go @@ -1,6 +1,9 @@ package stream -import "github.com/yangjiechina/avformat/utils" +import ( + "github.com/yangjiechina/avformat/libbufio" + "github.com/yangjiechina/avformat/utils" +) type Queue struct { *ringBuffer @@ -42,7 +45,7 @@ func (q *Queue) PopBack() interface{} { value := q.ringBuffer.Tail() q.size-- - q.tail = utils.MaxInt(0, q.tail-1) + q.tail = libbufio.MaxInt(0, q.tail-1) return value } diff --git a/stream/sink.go b/stream/sink.go index b99d4be..a3be9e2 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -1,6 +1,7 @@ package stream import ( + "encoding/binary" "fmt" "github.com/yangjiechina/avformat/utils" "net" @@ -50,13 +51,13 @@ type ISink interface { func GenerateSinkId(addr net.Addr) SinkId { network := addr.Network() if "tcp" == network { - id := uint64(utils.BytesToInt(addr.(*net.TCPAddr).IP.To4())) + id := uint64(binary.BigEndian.Uint32(addr.(*net.TCPAddr).IP.To4())) id <<= 32 id |= uint64(addr.(*net.TCPAddr).Port << 16) return id } else if "udp" == network { - id := uint64(utils.BytesToInt(addr.(*net.UDPAddr).IP.To4())) + id := uint64(binary.BigEndian.Uint32(addr.(*net.UDPAddr).IP.To4())) id <<= 32 id |= uint64(addr.(*net.UDPAddr).Port << 16) diff --git a/stream/trans_stream.go b/stream/trans_stream.go index 6f8d453..c8ce2b8 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -198,13 +198,13 @@ func (c *CacheTransStream) Full(ts int64) bool { } func (c *CacheTransStream) SwapStreamBuffer() { - utils.Assert(c.ExistVideo) + if c.ExistVideo && AppConfig.MergeWriteLatency > 0 { + tmp := c.StreamBuffers[0] + c.StreamBuffers[0] = c.StreamBuffers[1] + c.StreamBuffers[1] = tmp + } - tmp := c.StreamBuffers[0] - c.StreamBuffers[0] = c.StreamBuffers[1] - c.StreamBuffers[1] = tmp c.StreamBuffers[0].Clear() - c.PrePacketTS = -1 c.SegmentOffset = 0 }