From 6212dda56c55938cf560264eefe3d620bf9879c6 Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Thu, 11 Jul 2024 22:26:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dhttp-flv=E6=8B=89=E6=B5=81?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flv/flv_sink.go | 3 ++- flv/http_flv.go | 24 +++++++++++++----------- stream/mw_buffer.go | 2 +- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/flv/flv_sink.go b/flv/flv_sink.go index 134814b..5f700cd 100644 --- a/flv/flv_sink.go +++ b/flv/flv_sink.go @@ -1,10 +1,11 @@ package flv import ( + "github.com/lkmio/avformat/transport" "github.com/lkmio/lkm/stream" "net" ) func NewFLVSink(id stream.SinkId, sourceId string, conn net.Conn) stream.Sink { - return &stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: conn} + return &stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: transport.NewConn(conn)} } diff --git a/flv/http_flv.go b/flv/http_flv.go index 3e4f39e..626de32 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -43,11 +43,9 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { videoKey = packet.KeyFrame() } - //发送剩余数据 + //关键帧都放在切片头部,所以需要创建新切片, 发送当前切片剩余流 if videoKey && !t.mwBuffer.IsNewSegment() { - t.mwBuffer.Reserve(2) - segment := t.mwBuffer.FlushSegment() - t.sendUnpackedSegment(segment) + t.forceFlushSegment() } var n int @@ -60,7 +58,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { n = HttpFlvBlockLengthSize } - //结束时, 预留换行符 + //切片末尾, 预留换行符 if t.mwBuffer.IsFull(dts) { separatorSize += 2 } @@ -70,10 +68,8 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false) copy(bytes[n:], data) - //添加长度和换行符 - //每一个合并写切片开始和预留长度所需的字节数 - //合并写切片末尾加上换行符 - //长度是16进制字符串 + //添加长度和换行符, 长度是16进制字符串 + //每一个合并写切片, 头部预留长度所需的字节数, 末尾加上换行符 if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 { t.sendUnpackedSegment(segment) } @@ -96,6 +92,12 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error { return nil } +func (t *httpTransStream) forceFlushSegment() { + t.mwBuffer.Reserve(2) + segment := t.mwBuffer.FlushSegment() + t.sendUnpackedSegment(segment) +} + // 发送还未添加包长和换行符的切片 func (t *httpTransStream) sendUnpackedSegment(segment []byte) { t.writeSeparator(segment) @@ -197,8 +199,8 @@ func (t *httpTransStream) WriteHeader() error { func (t *httpTransStream) Close() error { //发送剩余的流 - if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 { - t.sendUnpackedSegment(segment) + if !t.mwBuffer.IsNewSegment() { + t.forceFlushSegment() } return nil } diff --git a/stream/mw_buffer.go b/stream/mw_buffer.go index 50e2515..1898c18 100644 --- a/stream/mw_buffer.go +++ b/stream/mw_buffer.go @@ -174,7 +174,7 @@ func (m *mergeWritingBuffer) IsNewSegment() bool { func (m *mergeWritingBuffer) Reserve(number int) { utils.Assert(m.mwBlocks[m.index].buffer != nil) - m.mwBlocks[m.index].buffer.Reserve(number) + _ = m.mwBlocks[m.index].buffer.Allocate(number) } func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) {