fix: http-flv恢复推流时, pre tag size错误和重复发送flv header问题

This commit is contained in:
ydajiang
2024-12-24 20:10:33 +08:00
parent be4a13996b
commit da7f084a2c
6 changed files with 149 additions and 61 deletions

View File

@@ -2,6 +2,7 @@ package main
import (
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/flv"
"github.com/lkmio/lkm/hls"
"github.com/lkmio/lkm/rtsp"
"github.com/lkmio/lkm/stream"
@@ -44,6 +45,9 @@ func NewStreamEndInfo(source stream.Source) *stream.StreamEndInfo {
info.RtspTracks[track.PT] = track.EndSeq
}
}
} else if stream.TransStreamFlv == transStream.GetProtocol() {
stream := transStream.(*flv.TransStream)
info.FLVPrevTagSize = stream.Muxer.PrevTagSize()
}
}

75
flv/flv_block.go Normal file
View File

@@ -0,0 +1,75 @@
package flv
import (
"encoding/binary"
"fmt"
)
const (
// HttpFlvBlockHeaderSize 在每块http-flv流的头部预留指定大小的数据, 用于描述flv数据块的长度信息
// http-flv是以文件流的形式传输http流, 格式如下: length\r\n|flv data\r\n
// 我们对http-flv-block的封装: |block size[4]|skip count[2]|length\r\n|flv data\r\n
// skip count是因为length长度不固定, 需要一个字段说明, 跳过多少字节才是http-flv数据
HttpFlvBlockHeaderSize = 20
)
// GetHttpFLVBlock 跳过头部的无效数据返回http-flv块
func GetHttpFLVBlock(data []byte) []byte {
return data[computeSkipBytesSize(data):]
}
// GetFLVTag 从http flv块中提取返回flv tag
func GetFLVTag(block []byte) []byte {
length := len(block)
var offset int
for i := 2; i < length; i++ {
if block[i-2] == 0x0D && block[i-1] == 0x0A {
offset = i
break
}
}
return block[offset : length-2]
}
// 计算头部的无效数据, 返回http-flv的其实位置
func computeSkipBytesSize(data []byte) int {
return int(6 + binary.BigEndian.Uint16(data[4:]))
}
// FormatSegment 为切片添加包长和换行符
func FormatSegment(segment []byte) []byte {
writeSeparator(segment)
return GetHttpFLVBlock(segment)
}
// 为http-flv数据块添加长度和换行符
// @dst http-flv数据块, 头部需要空出HttpFlvBlockLengthSize字节长度, 末尾空出2字节换行符
func writeSeparator(dst []byte) {
// http-flv: length\r\n|flv data\r\n
// http-flv-block: |block size[4]|skip count[2]|length\r\n|flv data\r\n
// 写block size
binary.BigEndian.PutUint32(dst, uint32(len(dst)-4))
// 写flv实际长度字符串, 16进制表达
flvSize := len(dst) - HttpFlvBlockHeaderSize - 2
hexStr := fmt.Sprintf("%X", flvSize)
// +2是跳过length后的换行符
n := len(hexStr) + 2
copy(dst[HttpFlvBlockHeaderSize-n:], hexStr)
// 写跳过字节数量
// -6是block size和skip count字段合计长度
skipCount := HttpFlvBlockHeaderSize - n - 6
binary.BigEndian.PutUint16(dst[4:], uint16(skipCount))
// flv length字段和flv数据之间的换行符
dst[HttpFlvBlockHeaderSize-2] = 0x0D
dst[HttpFlvBlockHeaderSize-1] = 0x0A
// 末尾换行符
dst[len(dst)-2] = 0x0D
dst[len(dst)-1] = 0x0A
}

View File

@@ -6,6 +6,26 @@ import (
"net"
)
func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink {
return &stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn), TCPStreaming: true}
type Sink struct {
stream.BaseSink
prevTagSize uint32
}
func (s *Sink) StopStreaming(stream stream.TransStream) {
s.BaseSink.StopStreaming(stream)
s.prevTagSize = stream.(*TransStream).Muxer.PrevTagSize()
}
func (s *Sink) Write(index int, data [][]byte, ts int64) error {
// 恢复推流时, 不发送9个字节的flv header
if s.prevTagSize > 0 {
data = data[1:]
s.prevTagSize = 0
}
return s.BaseSink.Write(index, data, ts)
}
func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink {
return &Sink{BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn), TCPStreaming: true}}
}

View File

@@ -2,33 +2,25 @@ package flv
import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat/libflv"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/rtmp"
"github.com/lkmio/lkm/stream"
)
const (
// HttpFlvBlockHeaderSize 在每块http-flv流的头部预留指定大小的数据, 用于描述flv数据块的长度信息
// http-flv是以文件流的形式传输http流, 格式如下: length\r\n|flv data\r\n
// 我们对http-flv-block的封装: |block size[4]|skip count[2]|length\r\n|flv data\r\n
// skip count是因为length长度不固定, 需要一个字段说明, 跳过多少字节才是http-flv数据
HttpFlvBlockHeaderSize = 20
)
type TransStream struct {
stream.TCPTransStream
muxer libflv.Muxer
header []byte
headerSize int
headerTagSize int
Muxer libflv.Muxer
flvHeaderBlock []byte // 单独保存9个字节长的flv头, 只发一次, 后续恢复推流不再发送
flvExtraDataBlock []byte // metadata和sequence header
flvExtraDataTagSize int // 整个flv tag大小
}
func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
var flvSize int
var flvTagSize int
var data []byte
var videoKey bool
var dts int64
@@ -38,10 +30,10 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
dts = packet.ConvertDts(1000)
pts = packet.ConvertPts(1000)
if utils.AVMediaTypeAudio == packet.MediaType() {
flvSize = 17 + len(packet.Data())
flvTagSize = 17 + len(packet.Data())
data = packet.Data()
} else if utils.AVMediaTypeVideo == packet.MediaType() {
flvSize = t.muxer.ComputeVideoDataSize(uint32(pts-dts)) + libflv.TagHeaderSize + len(packet.AVCCPacketData())
flvTagSize = t.Muxer.ComputeVideoDataSize(uint32(pts-dts)) + libflv.TagHeaderSize + len(packet.AVCCPacketData())
data = packet.AVCCPacketData()
videoKey = packet.KeyFrame()
@@ -49,7 +41,7 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
// 关键帧都放在切片头部,所以遇到关键帧创建新切片, 发送当前切片剩余流
if videoKey && !t.MWBuffer.IsNewSegment() {
segment, key := t.forceFlushSegment()
segment, key := t.flushSegment()
t.AppendOutStreamBuffer(segment)
keyBuffer = key
}
@@ -69,16 +61,17 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
separatorSize += 2
}
// 分配flv block
bytes := t.MWBuffer.Allocate(separatorSize+flvSize, dts, videoKey)
n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false)
// 分配block
bytes := t.MWBuffer.Allocate(separatorSize+flvTagSize, dts, videoKey)
// 写flv tag
n += t.Muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false)
copy(bytes[n:], data)
// 合并写满再发
if segment, key := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
// 已经分配末尾换行符内存
keyBuffer = key
t.AppendOutStreamBuffer(t.FormatSegment(segment))
// 已经分配末尾换行符内存, 直接添加
t.AppendOutStreamBuffer(FormatSegment(segment))
}
return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil
@@ -90,19 +83,24 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
}
if utils.AVMediaTypeAudio == track.Stream.Type() {
t.muxer.AddAudioTrack(track.Stream.CodecId(), 0, 0, 0)
t.Muxer.AddAudioTrack(track.Stream.CodecId(), 0, 0, 0)
} else if utils.AVMediaTypeVideo == track.Stream.Type() {
t.muxer.AddVideoTrack(track.Stream.CodecId())
t.Muxer.AddVideoTrack(track.Stream.CodecId())
t.muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters().Width()))
t.muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters().Height()))
t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters().Width()))
t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters().Height()))
}
return nil
}
func (t *TransStream) WriteHeader() error {
t.headerSize += t.muxer.WriteHeader(t.header[HttpFlvBlockHeaderSize:])
var header [4096]byte
var extraDataSize int
size := t.Muxer.WriteHeader(header[:])
copy(t.flvHeaderBlock[HttpFlvBlockHeaderSize:], header[:9])
copy(t.flvExtraDataBlock[HttpFlvBlockHeaderSize:], header[9:size])
extraDataSize = HttpFlvBlockHeaderSize + (size - 9)
for _, track := range t.BaseTransStream.Tracks {
var data []byte
if utils.AVMediaTypeAudio == track.Stream.Type() {
@@ -111,26 +109,25 @@ func (t *TransStream) WriteHeader() error {
data = track.Stream.CodecParameters().MP4ExtraData()
}
n := t.muxer.Input(t.header[t.headerSize:], track.Stream.Type(), len(data), 0, 0, false, true)
t.headerSize += n
copy(t.header[t.headerSize:], data)
t.headerSize += len(data)
t.headerTagSize = n - 15 + len(data) + 11
n := t.Muxer.Input(t.flvExtraDataBlock[extraDataSize:], track.Stream.Type(), len(data), 0, 0, false, true)
extraDataSize += n
copy(t.flvExtraDataBlock[extraDataSize:], data)
extraDataSize += len(data)
t.flvExtraDataTagSize = n - 15 + len(data) + 11
}
// 加上末尾换行符
t.headerSize += 2
t.writeSeparator(t.header[:t.headerSize])
extraDataSize += 2
t.flvExtraDataBlock = t.flvExtraDataBlock[:extraDataSize]
writeSeparator(t.flvHeaderBlock)
writeSeparator(t.flvExtraDataBlock)
t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
return nil
}
func (t *TransStream) ReadExtraData(_ int64) ([][]byte, int64, error) {
utils.Assert(t.headerSize > 0)
// 发送sequence header
return [][]byte{t.GetHttpFLVBlock(t.header[:t.headerSize])}, 0, nil
return [][]byte{GetHttpFLVBlock(t.flvHeaderBlock), GetHttpFLVBlock(t.flvExtraDataBlock)}, 0, nil
}
func (t *TransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
@@ -139,15 +136,15 @@ func (t *TransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
// 发送当前内存池已有的合并写切片
t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
if t.OutBufferSize < 1 {
// 修改第一个flv tag的pre tag size
binary.BigEndian.PutUint32(bytes[HttpFlvBlockHeaderSize:], uint32(t.headerTagSize))
// 修改第一个flv tag的pre tag size为sequence header tag size
binary.BigEndian.PutUint32(bytes[HttpFlvBlockHeaderSize:], uint32(t.flvExtraDataTagSize))
}
// 遍历发送合并写切片
var index int
for ; index < len(bytes); index += 4 {
size := binary.BigEndian.Uint32(bytes[index:])
t.AppendOutStreamBuffer(t.GetHttpFLVBlock(bytes[index : index+4+int(size)]))
t.AppendOutStreamBuffer(GetHttpFLVBlock(bytes[index : index+4+int(size)]))
index += int(size)
}
})
@@ -160,7 +157,7 @@ func (t *TransStream) Close() ([][]byte, int64, error) {
// 发送剩余的流
if !t.MWBuffer.IsNewSegment() {
if segment, _ := t.forceFlushSegment(); len(segment) > 0 {
if segment, _ := t.flushSegment(); len(segment) > 0 {
t.AppendOutStreamBuffer(segment)
}
}
@@ -169,11 +166,11 @@ func (t *TransStream) Close() ([][]byte, int64, error) {
}
// 保存为完整的http-flv切片
func (t *TransStream) forceFlushSegment() ([]byte, bool) {
func (t *TransStream) flushSegment() ([]byte, bool) {
// 预览末尾换行符
t.MWBuffer.Reserve(2)
segment, key := t.MWBuffer.FlushSegment()
return t.FormatSegment(segment), key
return FormatSegment(segment), key
}
// GetHttpFLVBlock 跳过头部的无效数据返回http-flv块

View File

@@ -15,18 +15,9 @@ func (w WSConn) Read(b []byte) (n int, err error) {
panic("implement me")
}
func (w WSConn) Write(b []byte) (n int, err error) {
//输入http-flv数据
//去掉不需要的换行符
var offset int
for i := 2; i < len(b); i++ {
if b[i-2] == 0x0D && b[i-1] == 0x0A {
offset = i
break
}
}
return 0, w.WriteMessage(websocket.BinaryMessage, b[offset:len(b)-2])
func (w WSConn) Write(block []byte) (n int, err error) {
// ws-flv负载的时flv tag
return 0, w.WriteMessage(websocket.BinaryMessage, GetFLVTag(block))
}
func (w WSConn) SetDeadline(t time.Time) error {

View File

@@ -20,9 +20,10 @@ func init() {
type StreamEndInfo struct {
ID string
Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳
M3U8Writer libhls.M3U8Writer
PlaylistFormat *string
RtspTracks map[byte]uint16
M3U8Writer libhls.M3U8Writer // 保存M3U8生成器
PlaylistFormat *string // M3U8播放列表
RtspTracks map[byte]uint16 // rtsp每路track的结束序号
FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size
}
func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool {