From bedf402ab43edf9e75878f60a925aaae9314e29e Mon Sep 17 00:00:00 2001 From: ydajiang Date: Fri, 18 Apr 2025 10:58:09 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BD=BF=E7=94=A8=E5=BC=95=E7=94=A8?= =?UTF-8?q?=E8=AE=A1=E6=95=B0=E5=99=A8=E7=AE=A1=E7=90=86=E5=90=88=E5=B9=B6?= =?UTF-8?q?=E5=86=99=E5=88=87=E7=89=87=E7=9A=84=E7=94=9F=E5=91=BD=E5=91=A8?= =?UTF-8?q?=E6=9C=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flv/flv_sink.go | 3 +- flv/http_flv.go | 36 +++--- flv/ws_conn.go | 2 +- gb28181/forward_sink.go | 9 +- gb28181/source.go | 4 +- hls/hls_stream.go | 5 +- record/record_flv.go | 10 +- rtc/rtc_sink.go | 5 +- rtc/rtc_stream.go | 9 +- rtmp/rtmp_stream.go | 17 +-- rtsp/rtsp_sink.go | 9 +- rtsp/rtsp_stream.go | 17 ++- stream/mw_buffer.go | 267 +++++++++++----------------------------- stream/mwb_pool.go | 100 +++++++++++++++ stream/sink.go | 79 +++++++----- stream/source.go | 98 +++++---------- stream/trans_stream.go | 64 ++++------ 17 files changed, 349 insertions(+), 385 deletions(-) create mode 100644 stream/mwb_pool.go diff --git a/flv/flv_sink.go b/flv/flv_sink.go index 4525a00..fdf3487 100644 --- a/flv/flv_sink.go +++ b/flv/flv_sink.go @@ -1,6 +1,7 @@ package flv import ( + "github.com/lkmio/avformat/collections" "github.com/lkmio/lkm/stream" "github.com/lkmio/transport" "net" @@ -16,7 +17,7 @@ func (s *Sink) StopStreaming(stream stream.TransStream) { s.prevTagSize = stream.(*TransStream).Muxer.PrevTagSize() } -func (s *Sink) Write(index int, data [][]byte, ts int64) error { +func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { // 恢复推流时, 不发送9个字节的flv header if s.prevTagSize > 0 { data = data[1:] diff --git a/flv/http_flv.go b/flv/http_flv.go index 3de1240..95f60ad 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -3,6 +3,7 @@ package flv import ( "encoding/binary" "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/flv" "github.com/lkmio/flv/amf0" @@ -19,7 +20,7 @@ type TransStream struct { flvExtraDataPreTagSize uint32 } -func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) { +func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { t.ClearOutStreamBuffer() var flvTagSize int @@ -82,13 +83,14 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e copy(bytes[n:], data) // 合并写满再发 - if segment, key := t.MWBuffer.TryFlushSegment(); len(segment) > 0 { + if segment, key := t.MWBuffer.TryFlushSegment(); segment != nil { if !keyBuffer { keyBuffer = key } // 已经分配末尾换行符内存, 直接添加 - t.AppendOutStreamBuffer(FormatSegment(segment)) + segment.ResetData(FormatSegment(segment.Get())) + t.AppendOutStreamBuffer(segment) } return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil @@ -128,38 +130,33 @@ func (t *TransStream) WriteHeader() error { return nil } -func (t *TransStream) ReadExtraData(_ int64) ([][]byte, int64, error) { - return [][]byte{GetHttpFLVBlock(t.flvHeaderBlock), GetHttpFLVBlock(t.flvExtraDataBlock)}, 0, nil +func (t *TransStream) ReadExtraData(_ int64) ([]*collections.ReferenceCounter[[]byte], int64, error) { + return []*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(GetHttpFLVBlock(t.flvHeaderBlock)), collections.NewReferenceCounter(GetHttpFLVBlock(t.flvExtraDataBlock))}, 0, nil } -func (t *TransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) { +func (t *TransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) { t.ClearOutStreamBuffer() // 发送当前内存池已有的合并写切片 - t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) { + t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(segment *collections.ReferenceCounter[[]byte]) { // 修改第一个flv tag的pre tag size为sequence header tag size + bytes := segment.Get() if t.OutBufferSize < 1 { - binary.BigEndian.PutUint32(bytes[HttpFlvBlockHeaderSize:], t.flvExtraDataPreTagSize) + binary.BigEndian.PutUint32(GetFLVTag(bytes), t.flvExtraDataPreTagSize) } - // 遍历发送合并写切片 - var index int - for ; index < len(bytes); index += 4 { - size := binary.BigEndian.Uint32(bytes[index:]) - t.AppendOutStreamBuffer(GetHttpFLVBlock(bytes[index : index+4+int(size)])) - index += int(size) - } + t.AppendOutStreamBuffer(segment) }) return t.OutBuffer[:t.OutBufferSize], 0, nil } -func (t *TransStream) Close() ([][]byte, int64, error) { +func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { t.ClearOutStreamBuffer() // 发送剩余的流 if !t.MWBuffer.IsNewSegment() { - if segment, _ := t.flushSegment(); len(segment) > 0 { + if segment, _ := t.flushSegment(); segment != nil { t.AppendOutStreamBuffer(segment) } } @@ -168,11 +165,12 @@ func (t *TransStream) Close() ([][]byte, int64, error) { } // 保存为完整的http-flv切片 -func (t *TransStream) flushSegment() ([]byte, bool) { +func (t *TransStream) flushSegment() (*collections.ReferenceCounter[[]byte], bool) { // 预览末尾换行符 t.MWBuffer.Reserve(2) segment, key := t.MWBuffer.FlushSegment() - return FormatSegment(segment), key + segment.ResetData(FormatSegment(segment.Get())) + return segment, key } func NewHttpTransStream(metadata *amf0.Object, prevTagSize uint32) stream.TransStream { diff --git a/flv/ws_conn.go b/flv/ws_conn.go index 3262246..a141be6 100644 --- a/flv/ws_conn.go +++ b/flv/ws_conn.go @@ -16,7 +16,7 @@ func (w WSConn) Read(b []byte) (n int, err error) { } func (w WSConn) Write(block []byte) (n int, err error) { - // ws-flv负载的时flv tag + // ws-flv负载的是flv tag return 0, w.WriteMessage(websocket.BinaryMessage, GetFLVTag(block)) } diff --git a/gb28181/forward_sink.go b/gb28181/forward_sink.go index 9595b8d..2cb3ccc 100644 --- a/gb28181/forward_sink.go +++ b/gb28181/forward_sink.go @@ -1,6 +1,7 @@ package gb28181 import ( + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" @@ -39,7 +40,7 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) { f.Close() } -func (f *ForwardSink) Write(index int, data [][]byte, ts int64) error { +func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { if SetupUDP != f.setup && f.Conn == nil { return nil } @@ -50,12 +51,12 @@ func (f *ForwardSink) Write(index int, data [][]byte, ts int64) error { } // 修改为与上级协商的SSRC - rtp.ModifySSRC(data[0], f.ssrc) + rtp.ModifySSRC(data[0].Get(), f.ssrc) if SetupUDP == f.setup { - f.socket.(*transport.UDPClient).Write(data[0][2:]) + f.socket.(*transport.UDPClient).Write(data[0].Get()[2:]) } else { - if _, err := f.Conn.Write(data[0]); err != nil { + if _, err := f.Conn.Write(data[0].Get()); err != nil { return err } } diff --git a/gb28181/source.go b/gb28181/source.go index 40d59c5..fd85454 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -3,6 +3,7 @@ package gb28181 import ( "fmt" "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" @@ -82,7 +83,8 @@ func (source *BaseGBSource) Input(data []byte) error { } bytes := transStream.(*ForwardStream).WrapData(data) - rtpPacket := [1][]byte{bytes} + rtpPacket := [1]*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(bytes)} + source.DispatchBuffer(transStream, -1, rtpPacket[:], -1, true) } diff --git a/hls/hls_stream.go b/hls/hls_stream.go index 9074775..b7fd43b 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -3,6 +3,7 @@ package hls import ( "fmt" "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" @@ -37,7 +38,7 @@ type TransStream struct { PlaylistFormat *string // 位于内存中的播放列表,每个sink都引用指针地址. } -func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) { +func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { // 创建一下个切片 // 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片 if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) { @@ -190,7 +191,7 @@ func (t *TransStream) createSegment() error { return nil } -func (t *TransStream) Close() ([][]byte, int64, error) { +func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { var err error if t.ctx.file != nil { diff --git a/record/record_flv.go b/record/record_flv.go index 0b305b6..7088719 100644 --- a/record/record_flv.go +++ b/record/record_flv.go @@ -1,6 +1,7 @@ package record import ( + "github.com/lkmio/avformat/collections" "github.com/lkmio/lkm/stream" "os" "path/filepath" @@ -14,7 +15,7 @@ type FLVFileSink struct { } // Input 输入http-flv数据 -func (f *FLVFileSink) Write(index int, blocks [][]byte, ts int64) error { +func (f *FLVFileSink) Write(index int, blocks []*collections.ReferenceCounter[[]byte], ts int64) error { if f.fail { return nil } @@ -22,14 +23,15 @@ func (f *FLVFileSink) Write(index int, blocks [][]byte, ts int64) error { for _, data := range blocks { // 去掉不需要的换行符 var offset int - for i := 2; i < len(data); i++ { - if data[i-2] == 0x0D && data[i-1] == 0x0A { + bytes := data.Get() + for i := 2; i < len(bytes); i++ { + if bytes[i-2] == 0x0D && bytes[i-1] == 0x0A { offset = i break } } - _, err := f.file.Write(data[offset : len(data)-2]) + _, err := f.file.Write(bytes[offset : len(bytes)-2]) if err != nil { // 只要写入失败一次,后续不再允许写入, 不影响拉流 f.fail = true diff --git a/rtc/rtc_sink.go b/rtc/rtc_sink.go index 0216f2b..75de538 100644 --- a/rtc/rtc_sink.go +++ b/rtc/rtc_sink.go @@ -2,6 +2,7 @@ package rtc import ( "fmt" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" @@ -140,14 +141,14 @@ func (s *Sink) Close() { } } -func (s *Sink) Write(index int, data [][]byte, ts int64) error { +func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { if s.tracks[index] == nil { return nil } for _, bytes := range data { err := s.tracks[index].WriteSample(media.Sample{ - Data: bytes, + Data: bytes.Get(), Duration: time.Duration(ts) * time.Millisecond, }) diff --git a/rtc/rtc_stream.go b/rtc/rtc_stream.go index c60c1b0..92c0aff 100644 --- a/rtc/rtc_stream.go +++ b/rtc/rtc_stream.go @@ -2,6 +2,7 @@ package rtc import ( "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/stream" "github.com/pion/interceptor" @@ -17,20 +18,20 @@ type transStream struct { stream.BaseTransStream } -func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) { +func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { t.ClearOutStreamBuffer() if utils.AVMediaTypeAudio == packet.MediaType { - t.AppendOutStreamBuffer(packet.Data) + t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet.Data)) } else if utils.AVMediaTypeVideo == packet.MediaType { avStream := t.BaseTransStream.Tracks[packet.Index].Stream if packet.Key { extra := avStream.CodecParameters.AnnexBExtraData() - t.AppendOutStreamBuffer(extra) + t.AppendOutStreamBuffer(collections.NewReferenceCounter(extra)) } data := avformat.AVCCPacket2AnnexB(avStream, packet) - t.AppendOutStreamBuffer(data) + t.AppendOutStreamBuffer(collections.NewReferenceCounter(data)) } return t.OutBuffer[:t.OutBufferSize], int64(uint32(packet.GetDuration(1000))), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index 57b0a69..98d7e96 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -2,6 +2,7 @@ package rtmp import ( "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/flv" "github.com/lkmio/flv/amf0" @@ -20,7 +21,7 @@ type transStream struct { metaData *amf0.Object // 推流方携带的元数据 } -func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) { +func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { t.ClearOutStreamBuffer() var data []byte @@ -106,7 +107,7 @@ func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e utils.Assert(len(bytes) == n) // 合并写满了再发 - if segment, key := t.MWBuffer.TryFlushSegment(); len(segment) > 0 { + if segment, key := t.MWBuffer.TryFlushSegment(); segment != nil { keyBuffer = key t.AppendOutStreamBuffer(segment) } @@ -114,18 +115,18 @@ func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil } -func (t *transStream) ReadExtraData(_ int64) ([][]byte, int64, error) { +func (t *transStream) ReadExtraData(_ int64) ([]*collections.ReferenceCounter[[]byte], int64, error) { utils.Assert(len(t.sequenceHeader) > 0) // 发送sequence sequenceHeader - return [][]byte{t.sequenceHeader}, 0, nil + return []*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(t.sequenceHeader)}, 0, nil } -func (t *transStream) ReadKeyFrameBuffer() ([][]byte, int64, error) { +func (t *transStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) { t.ClearOutStreamBuffer() // 发送当前内存池已有的合并写切片 - t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) { + t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes *collections.ReferenceCounter[[]byte]) { t.AppendOutStreamBuffer(bytes) }) @@ -222,11 +223,11 @@ func (t *transStream) WriteHeader() error { return nil } -func (t *transStream) Close() ([][]byte, int64, error) { +func (t *transStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { t.ClearOutStreamBuffer() // 发送剩余的流 - if segment, _ := t.MWBuffer.FlushSegment(); len(segment) > 0 { + if segment, _ := t.MWBuffer.FlushSegment(); segment != nil { t.AppendOutStreamBuffer(segment) } diff --git a/rtsp/rtsp_sink.go b/rtsp/rtsp_sink.go index e353a4d..1699dbf 100644 --- a/rtsp/rtsp_sink.go +++ b/rtsp/rtsp_sink.go @@ -1,6 +1,7 @@ package rtsp import ( + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" @@ -79,7 +80,7 @@ func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro return rtpPort, rtcpPort, err } -func (s *Sink) Write(index int, data [][]byte, rtpTime int64) error { +func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rtpTime int64) error { // 拉流方还没有连接上来 if index >= cap(s.senders) || s.senders[index] == nil { return nil @@ -88,12 +89,12 @@ func (s *Sink) Write(index int, data [][]byte, rtpTime int64) error { for _, bytes := range data { sender := s.senders[index] sender.PktCount++ - sender.OctetCount += len(bytes) + sender.OctetCount += len(bytes.Get()) if s.TCPStreaming { - s.Conn.Write(bytes) + s.Conn.Write(bytes.Get()) } else { //发送rtcp sr包 - sender.RtpConn.Write(bytes[OverTcpHeaderSize:]) + sender.RtpConn.Write(bytes.Get()[OverTcpHeaderSize:]) if sender.RtcpConn == nil || sender.PktCount%100 != 0 { continue diff --git a/rtsp/rtsp_stream.go b/rtsp/rtsp_stream.go index aaf778b..affd86e 100644 --- a/rtsp/rtsp_stream.go +++ b/rtsp/rtsp_stream.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/lkmio/avformat" "github.com/lkmio/avformat/avc" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/stream" "github.com/lkmio/rtp" @@ -39,7 +40,7 @@ func (t *TransStream) OverTCP(data []byte, channel int) { binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4)) } -func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) { +func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { t.ClearOutStreamBuffer() var ts uint32 @@ -57,7 +58,9 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e return t.OutBuffer[:t.OutBufferSize], int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil } -func (t *TransStream) ReadExtraData(ts int64) ([][]byte, int64, error) { +func (t *TransStream) ReadExtraData(ts int64) ([]*collections.ReferenceCounter[[]byte], int64, error) { + t.ClearOutStreamBuffer() + // 返回视频编码数据的rtp包 for _, track := range t.RtspTracks { if utils.AVMediaTypeVideo != track.MediaType { @@ -71,7 +74,11 @@ func (t *TransStream) ReadExtraData(ts int64) ([][]byte, int64, error) { binary.BigEndian.PutUint32(bytes[OverTcpHeaderSize+4:], uint32(ts)) } - return track.ExtraDataBuffer, ts, nil + for _, data := range track.ExtraDataBuffer { + t.AppendOutStreamBuffer(collections.NewReferenceCounter(data)) + } + + return t.OutBuffer[:t.OutBufferSize], ts, nil } return nil, ts, nil @@ -91,7 +98,7 @@ func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, tim packet := t.buffer.Get(index)[:OverTcpHeaderSize+len(bytes)] t.OverTCP(packet, channel) - t.AppendOutStreamBuffer(packet) + t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet)) }) } @@ -158,7 +165,7 @@ func (t *TransStream) AddTrack(track *stream.Track) error { return nil } -func (t *TransStream) Close() ([][]byte, int64, error) { +func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { for _, track := range t.RtspTracks { if track != nil { track.Close() diff --git a/stream/mw_buffer.go b/stream/mw_buffer.go index 9ed16f1..2cf4d5b 100644 --- a/stream/mw_buffer.go +++ b/stream/mw_buffer.go @@ -1,27 +1,19 @@ package stream import ( - "github.com/lkmio/avformat/bufio" "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" ) -const ( - BlockBufferSize = 1024 * 1024 * 2 - BlockBufferCount = 4 -) - // MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存 type MergeWritingBuffer interface { - TryGrow() bool - TryAlloc(size int, ts int64, videoPkt, videoKey bool) ([]byte, bool) // TryFlushSegment 尝试生成切片, 如果时长不足, 返回nil - TryFlushSegment() ([]byte, bool) + TryFlushSegment() (*collections.ReferenceCounter[[]byte], bool) // FlushSegment 生成并返回当前切片, 以及是否是关键帧切片. - FlushSegment() ([]byte, bool) + FlushSegment() (*collections.ReferenceCounter[[]byte], bool) // ShouldFlush 当前切片是否已达到生成条件 ShouldFlush(ts int64) bool @@ -33,161 +25,46 @@ type MergeWritingBuffer interface { Reserve(length int) // ReadSegmentsFromKeyFrameIndex 返回最近的关键帧切片 - ReadSegmentsFromKeyFrameIndex(cb func([]byte)) - - Capacity() int + ReadSegmentsFromKeyFrameIndex(cb func(*collections.ReferenceCounter[[]byte])) HasVideoDataInCurrentSegment() bool + + Close() *collections.Queue[*mbBuffer] +} + +type mbBuffer struct { + buffer collections.BlockBuffer + segments *collections.Queue[*collections.ReferenceCounter[[]byte]] } type mergeWritingBuffer struct { - buffers []struct { - buffer collections.BlockBuffer - nextSegmentDataSize int - preSegmentsDataSize int - preSegmentCount int + buffers *collections.Queue[*mbBuffer] + lastKeyVideoDataSegments *collections.Queue[*collections.ReferenceCounter[[]byte]] // 最近的关键帧切片 - prevSegments *collections.Queue[struct { - data []byte - key bool - }] - segments *collections.Queue[struct { - data []byte - key bool - }] - } - - index int // 当前使用内存池的索引 startTS int64 // 当前切片的开始时间 duration int // 当前切片时长 - hasKeyVideoDataInCurrentSegment bool // 当前切片是否存在关键视频帧 - hasVideoDataInCurrentSegment bool // 当前切片是否存在视频帧 - completedKeyVideoSegmentPositions []int64 // 完整视频关键帧切片的数量 - existVideo bool // 是否存在视频 - segmentCount int // 切片数量 -} - -func (m *mergeWritingBuffer) createBuffer(minSize int) collections.BlockBuffer { - var size int - if !m.existVideo { - size = 1024 * 500 - } else { - size = BlockBufferSize - } - - return collections.NewDirectBlockBuffer(bufio.MaxInt(size, minSize)) -} - -func (m *mergeWritingBuffer) grow(minSize int) { - m.buffers = append(m.buffers, struct { - buffer collections.BlockBuffer - nextSegmentDataSize int - preSegmentsDataSize int - preSegmentCount int - prevSegments *collections.Queue[struct { - data []byte - key bool - }] - segments *collections.Queue[struct { - data []byte - key bool - }] - }{buffer: m.createBuffer(minSize), prevSegments: collections.NewQueue[struct { - data []byte - key bool - }](64), segments: collections.NewQueue[struct { - data []byte - key bool - }](64)}) -} - -func (m *mergeWritingBuffer) TryGrow() bool { - var ok bool - if !m.existVideo { - ok = len(m.buffers) < 1 - } else { - ok = len(m.buffers) < BlockBufferCount - } - - if ok { - m.grow(0) - } - - return ok -} - -func (m *mergeWritingBuffer) RemoveSegment() { - segment := m.buffers[m.index].prevSegments.Pop() - m.buffers[m.index].nextSegmentDataSize += len(segment.data) - m.segmentCount-- - - if segment.key { - m.completedKeyVideoSegmentPositions = m.completedKeyVideoSegmentPositions[1:] - } + hasKeyVideoDataInCurrentSegment bool // 当前切片是否存在关键视频帧 + hasVideoDataInCurrentSegment bool // 当前切片是否存在视频帧 + existVideo bool // 是否存在视频 } func (m *mergeWritingBuffer) TryAlloc(size int, ts int64, videoPkt, videoKey bool) ([]byte, bool) { - length := len(m.buffers) - if length < 1 { - m.grow(size) + if m.buffers.IsEmpty() { + m.buffers.Push(MWBufferPool.Get().(*mbBuffer)) } - bytes := m.buffers[m.index].buffer.AvailableBytes() + buffer := m.buffers.Peek(m.buffers.Size() - 1).buffer + bytes := buffer.AvailableBytes() if bytes < size { // 非完整切片,先保存切片再分配新的内存 - if m.buffers[m.index].buffer.PendingBlockSize() > 0 { + if buffer.PendingBlockSize() > 0 { return nil, false } - // 还未遇到2组GOP, 不能释放旧的内存池, 创建新的内存池 - // 其他情况, 调用tryAlloc, 手动申请内存 - if m.existVideo && AppConfig.GOPCache && len(m.completedKeyVideoSegmentPositions) < 2 { - m.grow(size) - } - - // 即将使用下一个内存池, 清空上次创建的切片 - for m.buffers[m.index].prevSegments.Size() > 0 { - m.RemoveSegment() - } - - // 使用下一块内存, 或者从头覆盖 - if m.index+1 < len(m.buffers) { - m.index++ - } else { - m.index = 0 - } - - // 复用内存池, 将未清空完的上上次创建的切片放在尾部 - //for m.buffers[m.index].prevSegments.Size() > 0 { - // m.buffers[m.index].segments.Push(m.buffers[m.index].prevSegments.Pop()) - //} - - // 复用内存池, 清空上上次创建的切片 - //for m.buffers[m.index].prevSegments.Size() > 0 { - // m.RemoveSegment() - //} - - // 复用内存池, 保留上次内存池创建的切片 - m.buffers[m.index].nextSegmentDataSize = 0 - m.buffers[m.index].preSegmentsDataSize = 0 - m.buffers[m.index].preSegmentCount = m.buffers[m.index].segments.Size() - m.buffers[m.index].buffer.Clear() - if m.buffers[m.index].preSegmentCount > 0 { - m.buffers[m.index].prevSegments.Clear() - tmp := m.buffers[m.index].prevSegments - m.buffers[m.index].prevSegments = m.buffers[m.index].segments - m.buffers[m.index].segments = tmp - m.RemoveSegment() - } - } - - // 复用旧的内存池, 减少计数 - if !m.buffers[m.index].prevSegments.IsEmpty() { - totalSize := len(m.buffers[m.index].buffer.(*collections.DirectBlockBuffer).Data()) + size - for !m.buffers[m.index].prevSegments.IsEmpty() && totalSize > m.buffers[m.index].nextSegmentDataSize { - m.RemoveSegment() - } + // -1, 当前内存池不释放 + release(m.buffers, m.buffers.Size()-1) + m.buffers.Push(MWBufferPool.Get().(*mbBuffer)) } return m.alloc(size, ts, videoPkt, videoKey), true @@ -195,7 +72,8 @@ func (m *mergeWritingBuffer) TryAlloc(size int, ts int64, videoPkt, videoKey boo func (m *mergeWritingBuffer) alloc(size int, ts int64, videoPkt, videoKey bool) []byte { utils.Assert(ts != -1) - bytes := m.buffers[m.index].buffer.AvailableBytes() + buffer := m.buffers.Peek(m.buffers.Size() - 1).buffer + bytes := buffer.AvailableBytes() // 当前切片必须有足够空间, 否则先调用TryAlloc utils.Assert(bytes >= size) @@ -217,35 +95,42 @@ func (m *mergeWritingBuffer) alloc(size int, ts int64, videoPkt, videoKey bool) } m.duration = int(ts - m.startTS) - return m.buffers[m.index].buffer.Alloc(size) + return buffer.Alloc(size) } -func (m *mergeWritingBuffer) FlushSegment() ([]byte, bool) { - data := m.buffers[m.index].buffer.Feat() +func (m *mergeWritingBuffer) FlushSegment() (*collections.ReferenceCounter[[]byte], bool) { + buffer := m.buffers.Peek(m.buffers.Size() - 1) + data := buffer.buffer.Feat() if len(data) == 0 { return nil, false } - m.segmentCount++ - key := m.hasKeyVideoDataInCurrentSegment - m.hasKeyVideoDataInCurrentSegment = false - if key { - m.completedKeyVideoSegmentPositions = append(m.completedKeyVideoSegmentPositions, int64(m.index<<32|m.buffers[m.index].segments.Size())) + counter := collections.NewReferenceCounter(data) + // 遇到完整关键帧切片, 替代前一组 + if m.hasKeyVideoDataInCurrentSegment { + for m.lastKeyVideoDataSegments.Size() > 0 { + segment := m.lastKeyVideoDataSegments.Pop() + segment.Release() + } } - m.buffers[m.index].segments.Push(struct { - data []byte - key bool - }{data: data, key: key}) + if AppConfig.GOPCache { + counter.Refer() + m.lastKeyVideoDataSegments.Push(counter) + } + + buffer.segments.Push(counter) // 清空下一个切片的标记 m.startTS = -1 m.duration = 0 m.hasVideoDataInCurrentSegment = false - return data, key + key := m.hasKeyVideoDataInCurrentSegment + m.hasKeyVideoDataInCurrentSegment = false + return counter, key } -func (m *mergeWritingBuffer) TryFlushSegment() ([]byte, bool) { +func (m *mergeWritingBuffer) TryFlushSegment() (*collections.ReferenceCounter[[]byte], bool) { if (!AppConfig.GOPCache || !m.existVideo) || m.duration >= AppConfig.MergeWriteLatency { return m.FlushSegment() } @@ -262,55 +147,51 @@ func (m *mergeWritingBuffer) ShouldFlush(ts int64) bool { } func (m *mergeWritingBuffer) IsNewSegment() bool { - return m.buffers == nil || m.buffers[m.index].buffer.PendingBlockSize() == 0 + size := m.buffers.Size() + return size == 0 || m.buffers.Peek(size-1).buffer.PendingBlockSize() == 0 } func (m *mergeWritingBuffer) Reserve(size int) { - _ = m.buffers[m.index].buffer.Alloc(size) + _ = m.buffers.Peek(m.buffers.Size() - 1).buffer.Alloc(size) } -func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) { - if !AppConfig.GOPCache || !m.existVideo || len(m.completedKeyVideoSegmentPositions) < 1 { +func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func(*collections.ReferenceCounter[[]byte])) { + if !AppConfig.GOPCache || !m.existVideo || m.lastKeyVideoDataSegments.Size() == 0 { return } - marker := m.completedKeyVideoSegmentPositions[len(m.completedKeyVideoSegmentPositions)-1] - bufferIndex := int(marker >> 32 & 0xFFFFFFFF) - position := int(marker & 0xFFFFFFFF) - - var ranges [][2]int - // 回环 - if m.index < bufferIndex { - ranges = append(ranges, [2]int{bufferIndex, len(m.buffers) - 1}) - ranges = append(ranges, [2]int{0, m.index}) - } else { - ranges = append(ranges, [2]int{bufferIndex, m.index}) + size := m.lastKeyVideoDataSegments.Size() + for i := 0; i < size; i++ { + cb(m.lastKeyVideoDataSegments.Peek(i)) } - - for _, ints := range ranges { - for i := ints[0]; i <= ints[1]; i++ { - - for j := position; j < m.buffers[i].segments.Size(); j++ { - cb(m.buffers[i].segments.Peek(j).data) - } - - // 后续的切片, 从0开始 - position = 0 - } - } -} - -func (m *mergeWritingBuffer) Capacity() int { - return m.segmentCount } func (m *mergeWritingBuffer) HasVideoDataInCurrentSegment() bool { return m.hasVideoDataInCurrentSegment } +func (m *mergeWritingBuffer) Close() *collections.Queue[*mbBuffer] { + for m.lastKeyVideoDataSegments.Size() > 0 { + m.lastKeyVideoDataSegments.Pop().Release() + } + + if m.buffers.Size() > 0 && !release(m.buffers, m.buffers.Size()) { + return m.buffers + } + + return nil +} + func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { - return &mergeWritingBuffer{ + buffer := &mergeWritingBuffer{ startTS: -1, existVideo: existVideo, + buffers: collections.NewQueue[*mbBuffer](24), } + + if existVideo && AppConfig.GOPCache { + buffer.lastKeyVideoDataSegments = collections.NewQueue[*collections.ReferenceCounter[[]byte]](36) + } + + return buffer } diff --git a/stream/mwb_pool.go b/stream/mwb_pool.go new file mode 100644 index 0000000..26e1010 --- /dev/null +++ b/stream/mwb_pool.go @@ -0,0 +1,100 @@ +package stream + +import ( + "fmt" + "github.com/lkmio/avformat/collections" + "github.com/lkmio/lkm/log" + "sync" +) + +const ( + BlockBufferSize = 1024 * 1024 * 2 +) + +var ( + MWBufferPool = sync.Pool{ + New: func() any { + log.Sugar.Debug("create new merge writing buffer") + + return &mbBuffer{ + buffer: collections.NewDirectBlockBuffer(BlockBufferSize), + segments: collections.NewQueue[*collections.ReferenceCounter[[]byte]](32), + } + }, + } + + pendingReleaseBuffers = make(map[string]*collections.Queue[*mbBuffer]) + lock sync.Mutex +) + +func AddMWBuffersToPending(sourceId string, transStreamId TransStreamID, buffers *collections.Queue[*mbBuffer]) { + key := fmt.Sprintf("%s-%d", sourceId, transStreamId) + + lock.Lock() + defer lock.Unlock() + + for buffers.Size() > 0 { + v, ok := pendingReleaseBuffers[key] + if ok { + // 第二次都推流结束了,第一次的内存还被占用 + // 强制释放上次推流的内存池 + log.Sugar.Warnf("force release last pending buffers of %s", key) + + for v.Size() > 0 { + pop := v.Pop() + pop.buffer.Clear() + pop.segments.Clear() + MWBufferPool.Put(pop) + } + + delete(pendingReleaseBuffers, key) + } + + pendingReleaseBuffers[key] = buffers + } +} + +func ReleasePendingBuffers(sourceId string, transStreamId TransStreamID) { + key := fmt.Sprintf("%s-%d", sourceId, transStreamId) + + lock.Lock() + defer lock.Unlock() + + v, ok := pendingReleaseBuffers[key] + if !ok || !release(v, v.Size()) { + return + } + + delete(pendingReleaseBuffers, key) +} + +func release(buffers *collections.Queue[*mbBuffer], length int) bool { + var count int + for i := 0; i < length; i++ { + buffer := buffers.Peek(i) + size := buffer.segments.Size() + + var j int + for ; j < size; j++ { + segment := buffer.segments.Peek(0) + if segment.UseCount() > 1 { + break + } + + buffer.segments.Pop() + } + + // 所有切片都已经没有使用, 释放内存池 + if j == size { + buffer.buffer.Clear() + MWBufferPool.Put(buffer) + count++ + } + } + + for ; count > 0; count-- { + buffers.Pop() + } + + return buffers.Size() == 0 +} diff --git a/stream/sink.go b/stream/sink.go index 26806d6..16bfbea 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -3,13 +3,12 @@ package stream import ( "context" "fmt" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" - "github.com/lkmio/transport" "net" "net/url" "sync" - "sync/atomic" "time" ) @@ -21,7 +20,7 @@ type Sink interface { GetSourceID() string - Write(index int, data [][]byte, ts int64) error + Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error GetTransStreamID() TransStreamID @@ -91,12 +90,6 @@ type Sink interface { // EnableAsyncWriteMode 开启异步发送 EnableAsyncWriteMode(queueSize int) - // Pause 暂停推流 - Pause() - - // IsExited 异步发送协程是否退出, 如果还没有退出(write阻塞)不恢复推流 - IsExited() bool - PendingSendQueueSize() int } @@ -121,10 +114,11 @@ type BaseSink struct { Ready bool // 是否准备好推流. Sink可以通过控制该变量, 达到触发Source推流, 但不立即拉流的目的. 比如rtsp拉流端在信令交互阶段,需要先获取媒体信息,再拉流. createTime time.Time - existed atomic.Bool - pendingSendQueue chan []byte // 等待发送的数据队列 - cancelFunc func() - cancelCtx context.Context + pendingSendQueue chan *collections.ReferenceCounter[[]byte] // 等待发送的数据队列 + blockedBufferList *collections.LinkedList[*collections.ReferenceCounter[[]byte]] // 异步队列阻塞后的切片数据 + + cancelFunc func() + cancelCtx context.Context } func (s *BaseSink) GetID() SinkID { @@ -136,14 +130,28 @@ func (s *BaseSink) SetID(id SinkID) { } func (s *BaseSink) doAsyncWrite() { - defer s.existed.Store(true) + defer func() { + // 释放未发送的数据 + for len(s.pendingSendQueue) > 0 { + buffer := <-s.pendingSendQueue + buffer.Release() + } + + for s.blockedBufferList.Size() > 0 { + buffer := s.blockedBufferList.Remove(0) + buffer.Release() + } + + ReleasePendingBuffers(s.SourceID, s.TransStreamID) + }() for { select { case <-s.cancelCtx.Done(): return case data := <-s.pendingSendQueue: - s.Conn.Write(data) + s.Conn.Write(data.Get()) + data.Release() break } } @@ -151,37 +159,48 @@ func (s *BaseSink) doAsyncWrite() { func (s *BaseSink) EnableAsyncWriteMode(queueSize int) { utils.Assert(s.Conn != nil) - s.existed.Store(false) - s.pendingSendQueue = make(chan []byte, queueSize) + s.pendingSendQueue = make(chan *collections.ReferenceCounter[[]byte], queueSize) + s.blockedBufferList = &collections.LinkedList[*collections.ReferenceCounter[[]byte]]{} s.cancelCtx, s.cancelFunc = context.WithCancel(context.Background()) go s.doAsyncWrite() } -func (s *BaseSink) Pause() { - if s.cancelCtx != nil { - s.cancelFunc() - } -} - -func (s *BaseSink) IsExited() bool { - return s.existed.Load() -} - -func (s *BaseSink) Write(index int, data [][]byte, ts int64) error { +func (s *BaseSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { if s.Conn == nil { return nil } + // 发送被阻塞的数据 + for s.blockedBufferList.Size() > 0 { + bytes := s.blockedBufferList.Get(0) + select { + case s.pendingSendQueue <- bytes: + s.blockedBufferList.Remove(0) + break + default: + // 发送被阻塞的数据失败, 将本次发送的数据加入阻塞队列 + for _, datum := range data { + s.blockedBufferList.Add(datum) + datum.Refer() + } + return nil + } + } + for _, bytes := range data { if s.cancelCtx != nil { + bytes.Refer() select { case s.pendingSendQueue <- bytes: break default: - return transport.ZeroWindowSizeError{} + // 将本次发送的数据加入阻塞队列 + s.blockedBufferList.Add(bytes) + //return transport.ZeroWindowSizeError{} + return nil } } else { - _, err := s.Conn.Write(bytes) + _, err := s.Conn.Write(bytes.Get()) if err != nil { return err } diff --git a/stream/source.go b/stream/source.go index d5dabca..4a5e272 100644 --- a/stream/source.go +++ b/stream/source.go @@ -3,6 +3,7 @@ package stream import ( "fmt" "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" "github.com/lkmio/lkm/log" "github.com/lkmio/transport" "net" @@ -135,7 +136,6 @@ type PublishSource struct { TransStreams map[TransStreamID]TransStream // 所有输出流 sinks map[SinkID]Sink // 保存所有Sink - slowSinks map[SinkID]Sink // 因推流慢被挂起的sink队列 TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink streamEndInfo *StreamEndInfo // 之前推流源信息 accumulateTimestamps bool // 是否累加时间戳 @@ -199,7 +199,6 @@ func (s *PublishSource) Init(receiveQueueSize int) { s.TransStreams = make(map[TransStreamID]TransStream, 10) s.sinks = make(map[SinkID]Sink, 128) - s.slowSinks = make(map[SinkID]Sink, 12) s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) s.statistics = NewBitrateStatistics() @@ -318,7 +317,7 @@ func (s *PublishSource) DispatchPacket(transStream TransStream, packet *avformat } // DispatchBuffer 分发传输流 -func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, videoKey bool) { +func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, videoKey bool) { sinks := s.TransStreamSinks[transStream.GetID()] exist := transStream.IsExistVideo() @@ -331,76 +330,42 @@ func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data } if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { - s.write(transStream, sink, index, extraData, timestamp) + if ok := s.write(sink, index, extraData, timestamp); !ok { + continue + } } } - s.write(transStream, sink, index, data, timestamp) + if ok := s.write(sink, index, data, timestamp); !ok { + continue + } } } func (s *PublishSource) pendingSink(sink Sink) { - if s.existVideo { - log.Sugar.Errorf("向sink推流超时,挂起%s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID) - // 等待下个关键帧恢复推流 - s.PauseStreaming(sink) - } else { - log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID) - go sink.Close() - } + log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID) + go sink.Close() } // 向sink推流 -func (s *PublishSource) write(transStream TransStream, sink Sink, index int, data [][]byte, timestamp int64) { +func (s *PublishSource) write(sink Sink, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64) bool { err := sink.Write(index, data, timestamp) - ok := err == nil - - defer func() { - if ok { - sink.IncreaseSentPacketCount() - } - }() - - // 跳过非TCP流和待发送包数量小于合并写缓冲区大小的sink - if !transStream.IsTCPStreaming() || sink.PendingSendQueueSize() <= transStream.Capacity() { - return - } - - // 尝试扩容合并写缓冲区, 不能扩容, 则挂起Sink - if !transStream.GrowMWBuffer() { - ok = false - s.pendingSink(sink) + if err == nil { + sink.IncreaseSentPacketCount() + return true } // 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞. // 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流. - //_, ok := err.(*transport.ZeroWindowSizeError) - //if ok { - // s.pendingSink(sink) - //} -} - -func (s *PublishSource) PauseStreaming(sink Sink) { - s.cleanupSinkStreaming(sink) - s.slowSinks[sink.GetID()] = sink -} - -func (s *PublishSource) ResumeStreaming() { - for id, sink := range s.sinks { - if !sink.IsExited() { - continue - } - - delete(s.slowSinks, id) - ok := s.doAddSink(sink) - if ok { - go sink.Close() - } + if _, ok := err.(transport.ZeroWindowSizeError); ok { + s.pendingSink(sink) } + + return false } // 创建sink需要的输出流 -func (s *PublishSource) doAddSink(sink Sink) bool { +func (s *PublishSource) doAddSink(sink Sink, resume bool) bool { // 暂时不考虑多路视频流,意味着只能1路视频流和多路音频流,同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId() audioTrack := s.originTracks.FindWithType(utils.AVMediaTypeAudio) @@ -478,7 +443,7 @@ func (s *PublishSource) doAddSink(sink Sink) bool { } // 累加拉流计数 - if s.recordSink != sink { + if !resume && s.recordSink != sink { s.sinkCount++ log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) } @@ -495,7 +460,7 @@ func (s *PublishSource) doAddSink(sink Sink) bool { // TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响. _, ok := sink.GetConn().(*transport.Conn) if ok && sink.IsTCPStreaming() { - sink.EnableAsyncWriteMode(64) + sink.EnableAsyncWriteMode(24) } // 发送已有的缓存数据 @@ -503,10 +468,10 @@ func (s *PublishSource) doAddSink(sink Sink) bool { data, timestamp, _ := transStream.ReadKeyFrameBuffer() if len(data) > 0 { if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { - s.write(transStream, sink, 0, extraData, timestamp) + s.write(sink, 0, extraData, timestamp) } - s.write(transStream, sink, 0, data, timestamp) + s.write(sink, 0, data, timestamp) } // 新建传输流,发送已经缓存的音视频帧 @@ -522,7 +487,7 @@ func (s *PublishSource) AddSink(sink Sink) { if !s.completed { AddSinkToWaitingQueue(sink.GetSourceID(), sink) } else { - if !s.doAddSink(sink) { + if !s.doAddSink(sink, false) { go sink.Close() } } @@ -585,7 +550,6 @@ func (s *PublishSource) cleanupSinkStreaming(sink Sink) { func (s *PublishSource) doRemoveSink(sink Sink) bool { s.cleanupSinkStreaming(sink) delete(s.sinks, sink.GetID()) - delete(s.slowSinks, sink.GetID()) s.sinkCount-- log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) @@ -655,6 +619,13 @@ func (s *PublishSource) DoClose() { if len(data) > 0 { s.DispatchBuffer(transStream, -1, data, ts, true) } + + // 如果是tcp传输流, 归还合并写缓冲区 + if !transStream.IsTCPStreaming() || transStream.GetMWBuffer() == nil { + continue + } else if buffers := transStream.GetMWBuffer().Close(); buffers != nil { + AddMWBuffersToPending(s.ID, transStream.GetID(), buffers) + } } // 将所有sink添加到等待队列 @@ -768,7 +739,7 @@ func (s *PublishSource) writeHeader() { } for _, sink := range sinks { - if !s.doAddSink(sink) { + if !s.doAddSink(sink, false) { go sink.Close() } } @@ -880,11 +851,6 @@ func (s *PublishSource) OnPacket(packet *avformat.AVPacket) { s.gopBuffer.AddPacket(packet) } - // 遇到关键帧, 恢复推流 - if utils.AVMediaTypeVideo == packet.MediaType && packet.Key && len(s.slowSinks) > 0 { - s.ResumeStreaming() - } - // track解析完毕后,才能生成传输流 if s.completed { s.CorrectTimestamp(packet) diff --git a/stream/trans_stream.go b/stream/trans_stream.go index f219801..b39d505 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -2,6 +2,7 @@ package stream import ( "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" ) @@ -12,7 +13,7 @@ type TransStream interface { SetID(id TransStreamID) // Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧 - Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) + Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) AddTrack(track *Track) error @@ -29,28 +30,19 @@ type TransStream interface { SetProtocol(protocol TransStreamProtocol) // ReadExtraData 读取传输流的编码器扩展数据 - ReadExtraData(timestamp int64) ([][]byte, int64, error) + ReadExtraData(timestamp int64) ([]*collections.ReferenceCounter[[]byte], int64, error) // ReadKeyFrameBuffer 读取最近的包含视频关键帧的合并写队列 - ReadKeyFrameBuffer() ([][]byte, int64, error) + ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) // Close 关闭传输流, 返回还未flush的合并写块 - Close() ([][]byte, int64, error) - - // ClearOutStreamBuffer 清空传输流的合并写块队列 - ClearOutStreamBuffer() - - // AppendOutStreamBuffer 添加合并写块到队列 - AppendOutStreamBuffer(buffer []byte) - - // Capacity 返回合并写块队列容量大小, 作为sink异步推流的队列大小; - Capacity() int + Close() ([]*collections.ReferenceCounter[[]byte], int64, error) IsExistVideo() bool - GrowMWBuffer() bool - IsTCPStreaming() bool + + GetMWBuffer() MergeWritingBuffer } type BaseTransStream struct { @@ -60,8 +52,8 @@ type BaseTransStream struct { ExistVideo bool Protocol TransStreamProtocol - OutBuffer [][]byte // 传输流的合并写块队列 - OutBufferSize int // 传输流返合并写块队列大小 + OutBuffer []*collections.ReferenceCounter[[]byte] // 传输流的合并写块队列 + OutBufferSize int // 传输流返合并写块队列大小 } func (t *BaseTransStream) GetID() TransStreamID { @@ -72,7 +64,7 @@ func (t *BaseTransStream) SetID(id TransStreamID) { t.ID = id } -func (t *BaseTransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, error) { +func (t *BaseTransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { return nil, -1, false, nil } @@ -84,7 +76,7 @@ func (t *BaseTransStream) AddTrack(track *Track) error { return nil } -func (t *BaseTransStream) Close() ([][]byte, int64, error) { +func (t *BaseTransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { return nil, 0, nil } @@ -100,11 +92,11 @@ func (t *BaseTransStream) ClearOutStreamBuffer() { t.OutBufferSize = 0 } -func (t *BaseTransStream) AppendOutStreamBuffer(buffer []byte) { +func (t *BaseTransStream) AppendOutStreamBuffer(buffer *collections.ReferenceCounter[[]byte]) { if t.OutBufferSize+1 > len(t.OutBuffer) { // 扩容 size := (t.OutBufferSize + 1) * 2 - newBuffer := make([][]byte, size) + newBuffer := make([]*collections.ReferenceCounter[[]byte], size) for i := 0; i < t.OutBufferSize; i++ { newBuffer[i] = t.OutBuffer[i] } @@ -116,10 +108,6 @@ func (t *BaseTransStream) AppendOutStreamBuffer(buffer []byte) { t.OutBufferSize++ } -func (t *BaseTransStream) Capacity() int { - return 0 -} - func (t *BaseTransStream) TrackSize() int { return len(t.Tracks) } @@ -132,22 +120,22 @@ func (t *BaseTransStream) IsExistVideo() bool { return t.ExistVideo } -func (t *BaseTransStream) ReadExtraData(timestamp int64) ([][]byte, int64, error) { +func (t *BaseTransStream) ReadExtraData(timestamp int64) ([]*collections.ReferenceCounter[[]byte], int64, error) { return nil, 0, nil } -func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) { +func (t *BaseTransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) { return nil, 0, nil } -func (t *BaseTransStream) GrowMWBuffer() bool { - return false -} - func (t *BaseTransStream) IsTCPStreaming() bool { return false } +func (t *BaseTransStream) GetMWBuffer() MergeWritingBuffer { + return nil +} + type TCPTransStream struct { BaseTransStream @@ -157,16 +145,10 @@ type TCPTransStream struct { MWBuffer MergeWritingBuffer //合并写缓冲区, 同时作为用户态的发送缓冲区 } -func (t *TCPTransStream) Capacity() int { - utils.Assert(t.MWBuffer != nil) - return t.MWBuffer.Capacity() -} - -func (t *TCPTransStream) GrowMWBuffer() bool { - utils.Assert(t.MWBuffer != nil) - return t.MWBuffer.TryGrow() -} - func (t *TCPTransStream) IsTCPStreaming() bool { return true } + +func (t *TCPTransStream) GetMWBuffer() MergeWritingBuffer { + return t.MWBuffer +}