diff --git a/flv/flv_block.go b/flv/flv_block.go index 51a4290..2a41fe0 100644 --- a/flv/flv_block.go +++ b/flv/flv_block.go @@ -3,6 +3,7 @@ package flv import ( "encoding/binary" "fmt" + "github.com/lkmio/flv" ) const ( @@ -32,6 +33,38 @@ func GetFLVTag(block []byte) []byte { return block[offset : length-2] } +type TagPacket struct { + flv.Tag + Raw []byte + Offset int +} + +// SplitHttpFlvBlock http-flv块分割为多个flv tag +func SplitHttpFlvBlock(httpFlv []byte) []TagPacket { + data := GetFLVTag(httpFlv) + length := len(data) + start := len(httpFlv) - length - 2 + + var packets []TagPacket + for i := 0; i < length; { + tag := flv.UnmarshalTag(data[i:]) + + offset := i + i += flv.TagHeaderSize + tag.DataSize + + // 目前只需要保留第一个和最后一个tag + if offset == 0 || i >= length { + packets = append(packets, TagPacket{ + Tag: tag, + Raw: data[offset:i], + Offset: start + offset, + }) + } + } + + return packets +} + // 计算头部的无效数据, 返回http-flv的其实位置 func computeSkipBytesSize(data []byte) int { return int(6 + binary.BigEndian.Uint16(data[4:])) diff --git a/flv/flv_sink.go b/flv/flv_sink.go index 76dcce8..9bfa38c 100644 --- a/flv/flv_sink.go +++ b/flv/flv_sink.go @@ -1,6 +1,7 @@ package flv import ( + "encoding/binary" "github.com/lkmio/avformat/collections" "github.com/lkmio/lkm/stream" "github.com/lkmio/transport" @@ -18,13 +19,42 @@ func (s *Sink) StopStreaming(stream stream.TransStream) { } func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error { - // 恢复推流时, 不发送9个字节的flv header - if s.prevTagSize > 0 { - data = data[1:] - s.prevTagSize = 0 + var offset int + if s.SentPacketCount == 0 { + // 恢复推流时, 不发送9个字节的flv header + if s.prevTagSize > 0 { + if s.prevTagSize > 0 { + data = data[1:] + } + } else { + offset++ + } + } - return s.BaseSink.Write(index, data, ts, keyVideo) + var modifiedTags []*collections.ReferenceCounter[[]byte] + // 修改第一个tag的prevTagSize, 如果第一个tag的prevTagSize与sink的prevTagSize不一致 + if s.SentPacketCount < 2 { + tags := SplitHttpFlvBlock(data[offset].Get()) + if s.prevTagSize != tags[0].PrevTagSize { + for _, datum := range data { + modifiedTags = append(modifiedTags, datum) + } + + bytes := make([]byte, len(data[offset].Get())) + copy(bytes, data[offset].Get()) + binary.BigEndian.PutUint32(bytes[tags[0].Offset:], s.prevTagSize) + modifiedTags[offset] = collections.NewReferenceCounter(bytes) + } + + s.prevTagSize = uint32(11 + tags[len(tags)-1].DataSize) + } + + if modifiedTags == nil { + modifiedTags = data + } + + return s.BaseSink.Write(index, modifiedTags, ts, keyVideo) } func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink { diff --git a/flv/http_flv.go b/flv/http_flv.go index 49a728b..42f8342 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -1,7 +1,6 @@ package flv import ( - "encoding/binary" "github.com/lkmio/avformat" "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" @@ -14,10 +13,9 @@ import ( type TransStream struct { stream.TCPTransStream - Muxer *flv.Muxer - flvHeaderBlock []byte // 单独保存9个字节长的flv头, 只发一次, 后续恢复推流不再发送 - flvExtraDataBlock []byte // metadata和sequence header - flvExtraDataPreTagSize uint32 + Muxer *flv.Muxer + flvHeaderBlock []byte // 单独保存9个字节长的flv头, 只发一次, 后续恢复推流不再发送 + flvExtraDataBlock []byte // metadata和sequence header } func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { @@ -119,8 +117,6 @@ func (t *TransStream) WriteHeader() error { copy(t.flvHeaderBlock[HttpFlvBlockHeaderSize:], header[:9]) copy(t.flvExtraDataBlock[HttpFlvBlockHeaderSize:], tags) - t.flvExtraDataPreTagSize = t.Muxer.PrevTagSize() - // +2 加上末尾换行符 t.flvExtraDataBlock = t.flvExtraDataBlock[:HttpFlvBlockHeaderSize+size-9+2] writeSeparator(t.flvHeaderBlock) @@ -139,12 +135,6 @@ func (t *TransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]by // 发送当前内存池已有的合并写切片 t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(segment *collections.ReferenceCounter[[]byte]) { - // 修改第一个flv tag的pre tag size为sequence header tag size - bytes := segment.Get() - if t.OutBufferSize < 1 { - binary.BigEndian.PutUint32(GetFLVTag(bytes), t.flvExtraDataPreTagSize) - } - t.AppendOutStreamBuffer(segment) })