修复http-flv拉流失败问题

This commit is contained in:
yangjiechina
2024-07-11 22:26:58 +08:00
parent b5523e6e6f
commit 6212dda56c
3 changed files with 16 additions and 13 deletions

View File

@@ -1,10 +1,11 @@
package flv package flv
import ( import (
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/stream" "github.com/lkmio/lkm/stream"
"net" "net"
) )
func NewFLVSink(id stream.SinkId, sourceId string, conn net.Conn) stream.Sink { func NewFLVSink(id stream.SinkId, sourceId string, conn net.Conn) stream.Sink {
return &stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: conn} return &stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: transport.NewConn(conn)}
} }

View File

@@ -43,11 +43,9 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
videoKey = packet.KeyFrame() videoKey = packet.KeyFrame()
} }
//发送剩余数据 //关键帧都放在切片头部,所以需要创建新切片, 发送当前切片剩余流
if videoKey && !t.mwBuffer.IsNewSegment() { if videoKey && !t.mwBuffer.IsNewSegment() {
t.mwBuffer.Reserve(2) t.forceFlushSegment()
segment := t.mwBuffer.FlushSegment()
t.sendUnpackedSegment(segment)
} }
var n int var n int
@@ -60,7 +58,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
n = HttpFlvBlockLengthSize n = HttpFlvBlockLengthSize
} }
//结束时, 预留换行符 //切片末尾, 预留换行符
if t.mwBuffer.IsFull(dts) { if t.mwBuffer.IsFull(dts) {
separatorSize += 2 separatorSize += 2
} }
@@ -70,10 +68,8 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false) n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false)
copy(bytes[n:], data) copy(bytes[n:], data)
//添加长度和换行符 //添加长度和换行符, 长度是16进制字符串
//每一个合并写切片开始和预留长度所需的字节数 //每一个合并写切片, 头部预留长度所需的字节数, 末尾加上换行符
//合并写切片末尾加上换行符
//长度是16进制字符串
if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 { if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 {
t.sendUnpackedSegment(segment) t.sendUnpackedSegment(segment)
} }
@@ -96,6 +92,12 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
return nil return nil
} }
func (t *httpTransStream) forceFlushSegment() {
t.mwBuffer.Reserve(2)
segment := t.mwBuffer.FlushSegment()
t.sendUnpackedSegment(segment)
}
// 发送还未添加包长和换行符的切片 // 发送还未添加包长和换行符的切片
func (t *httpTransStream) sendUnpackedSegment(segment []byte) { func (t *httpTransStream) sendUnpackedSegment(segment []byte) {
t.writeSeparator(segment) t.writeSeparator(segment)
@@ -197,8 +199,8 @@ func (t *httpTransStream) WriteHeader() error {
func (t *httpTransStream) Close() error { func (t *httpTransStream) Close() error {
//发送剩余的流 //发送剩余的流
if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 { if !t.mwBuffer.IsNewSegment() {
t.sendUnpackedSegment(segment) t.forceFlushSegment()
} }
return nil return nil
} }

View File

@@ -174,7 +174,7 @@ func (m *mergeWritingBuffer) IsNewSegment() bool {
func (m *mergeWritingBuffer) Reserve(number int) { func (m *mergeWritingBuffer) Reserve(number int) {
utils.Assert(m.mwBlocks[m.index].buffer != nil) utils.Assert(m.mwBlocks[m.index].buffer != nil)
m.mwBlocks[m.index].buffer.Reserve(number) _ = m.mwBlocks[m.index].buffer.Allocate(number)
} }
func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) { func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) {