feat: rtsp流支持关键帧缓存

This commit is contained in:
ydajiang
2025-07-27 15:05:37 +08:00
parent 77d18481c0
commit c6aba06199
11 changed files with 199 additions and 110 deletions

View File

@@ -136,7 +136,7 @@ func (t *TransStream) ReadExtraData(_ int64) ([]*collections.ReferenceCounter[[]
return []*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(GetHttpFLVBlock(t.flvHeaderBlock)), collections.NewReferenceCounter(GetHttpFLVBlock(t.flvExtraDataBlock))}, 0, nil
}
func (t *TransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) {
func (t *TransStream) ReadKeyFrameBuffer() ([]stream.TransStreamSegment, error) {
t.ClearOutStreamBuffer()
// 发送当前内存池已有的合并写切片
@@ -144,20 +144,42 @@ func (t *TransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]by
t.AppendOutStreamBuffer(segment)
})
return t.OutBuffer[:t.OutBufferSize], 0, nil
if t.OutBufferSize < 1 {
return nil, nil
}
return []stream.TransStreamSegment{
{
Data: t.OutBuffer[:t.OutBufferSize],
TS: 0,
Key: true,
},
}, nil
}
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
func (t *TransStream) Close() ([]stream.TransStreamSegment, error) {
t.ClearOutStreamBuffer()
// 发送剩余的流
var key bool
var segment *collections.ReferenceCounter[[]byte]
if !t.MWBuffer.IsNewSegment() {
if segment, _ := t.flushSegment(); segment != nil {
if segment, key = t.flushSegment(); segment != nil {
t.AppendOutStreamBuffer(segment)
}
}
return t.OutBuffer[:t.OutBufferSize], 0, nil
if t.OutBufferSize < 1 {
return nil, nil
}
return []stream.TransStreamSegment{
{
Data: t.OutBuffer[:t.OutBufferSize],
TS: 0,
Key: key,
},
}, nil
}
// 保存为完整的http-flv切片

View File

@@ -65,9 +65,9 @@ func (s *GBGateway) Input(packet *avformat.AVPacket, index int) ([]*collections.
return result, 0, true, nil
}
func (s *GBGateway) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
func (s *GBGateway) Close() ([]stream.TransStreamSegment, error) {
s.rtpBuffer.Clear()
return nil, 0, nil
return nil, nil
}
func NewGBGateway(ssrc uint32) *GBGateway {

View File

@@ -191,7 +191,7 @@ func (t *TransStream) createSegment() error {
return nil
}
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
func (t *TransStream) Close() ([]stream.TransStreamSegment, error) {
var err error
if t.ctx.file != nil {
@@ -210,7 +210,7 @@ func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, e
t.m3u8File = nil
}
return nil, 0, err
return nil, err
}
func stringPtrToBytes(ptr *string) []byte {

View File

@@ -125,7 +125,7 @@ func (t *transStream) ReadExtraData(_ int64) ([]*collections.ReferenceCounter[[]
return []*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(t.sequenceHeader)}, 0, nil
}
func (t *transStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) {
func (t *transStream) ReadKeyFrameBuffer() ([]stream.TransStreamSegment, error) {
t.ClearOutStreamBuffer()
// 发送当前内存池已有的合并写切片
@@ -133,7 +133,17 @@ func (t *transStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]by
t.AppendOutStreamBuffer(bytes)
})
return t.OutBuffer[:t.OutBufferSize], 0, nil
if t.OutBufferSize < 1 {
return nil, nil
}
return []stream.TransStreamSegment{
{
Data: t.OutBuffer[:t.OutBufferSize],
TS: 0,
Key: true,
},
}, nil
}
func (t *transStream) WriteHeader() error {
@@ -226,15 +236,27 @@ func (t *transStream) WriteHeader() error {
return nil
}
func (t *transStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
func (t *transStream) Close() ([]stream.TransStreamSegment, error) {
t.ClearOutStreamBuffer()
// 发送剩余的流
if segment, _ := t.MWBuffer.FlushSegment(); segment != nil {
var key bool
var segment *collections.ReferenceCounter[[]byte]
if segment, key = t.MWBuffer.FlushSegment(); segment != nil {
t.AppendOutStreamBuffer(segment)
}
return t.OutBuffer[:t.OutBufferSize], 0, nil
if t.OutBufferSize < 1 {
return nil, nil
}
return []stream.TransStreamSegment{
{
Data: t.OutBuffer[:t.OutBufferSize],
TS: 0,
Key: key,
},
}, nil
}
func NewTransStream(chunkSize int, metaData *amf0.Object) stream.TransStream {

View File

@@ -11,6 +11,7 @@ import (
"reflect"
"strconv"
"strings"
"time"
)
type Request struct {
@@ -72,7 +73,7 @@ func (h handler) Process(session *session, method string, url_ *url.URL, headers
source, _ := stream.Path2SourceID(url_.Path, "")
//反射调用各个处理函数
// 反射调用各个处理函数
results := m.Call([]reflect.Value{
reflect.ValueOf(&h),
reflect.ValueOf(Request{session, source, method, url_, headers}),
@@ -220,11 +221,16 @@ func (h handler) OnSetup(request Request) (*http.Response, []byte, error) {
func (h handler) OnPlay(request Request) (*http.Response, []byte, error) {
response := NewOKResponse(request.headers.Get("Cseq"))
sessionHeader := request.headers.Get("Session")
if sessionHeader != "" {
response.Header.Set("Date", time.Now().Format("Mon, 02 Jan 2006 15:04:05 GMT"))
if sessionHeader := request.headers.Get("Session"); sessionHeader != "" {
response.Header.Set("Session", sessionHeader)
}
if rangeV := request.headers.Get("Range"); rangeV != "" {
response.Header.Set("Range", rangeV)
}
sink := request.session.sink
sink.SetReady(true)
source := stream.SourceManager.Find(sink.GetSourceID())
@@ -233,6 +239,10 @@ func (h handler) OnPlay(request Request) (*http.Response, []byte, error) {
}
source.GetTransStreamPublisher().AddSink(sink)
// RTP-Info: url=rtsp://192.168.2.110:8554/hls/mystream/trackID=0;seq=21592;rtptime=4586400,url=rtsp://192.168.2.110:8554/hls/mystream/trackID=1;seq=403;rtptime=412672\r\n
//info := <-sink.onPlayResponse
//response.Header.Set("RTP-Info", fmt.Sprintf("url=%s;seq=%d;rtptime=%d", "rtsp://192.168.2.119:554/hls/mystream/?track=0", info[0], info[1]))
return response, nil, nil
}

View File

@@ -21,18 +21,17 @@ const (
)
// TransStream rtsp传输流封装
// 低延迟是rtsp特性, 所以不考虑实现GOP缓存
type TransStream struct {
stream.BaseTransStream
addr net.IPAddr
addrType string
urlFormat string
RtspTracks []*Track
oldTracks map[utils.AVCodecID]uint16 // 上次推流的rtp seq
sdp string
rtpBuffer *stream.RtpBuffer
sdp string
RtspTracks []*Track
lastEndSeq map[utils.AVCodecID]uint16 // 上次结束推流的rtp seq
segments []stream.TransStreamSegment // 缓存的切片
packetAllocator *stream.RtpBuffer // 分配rtp包
}
func (t *TransStream) OverTCP(data []byte, channel int) {
@@ -53,76 +52,98 @@ func (t *TransStream) Input(packet *avformat.AVPacket, trackIndex int) ([]*colle
t.Tracks[trackIndex].Pts = t.Tracks[trackIndex].Dts + packet.GetPtsDtsDelta(track.payload.ClockRate)
if utils.AVMediaTypeAudio == packet.MediaType {
result = t.PackRtpPayload(track, trackIndex, packet.Data, ts)
result = t.PackRtpPayload(track, trackIndex, packet.Data, ts, false)
} else if utils.AVMediaTypeVideo == packet.MediaType {
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[trackIndex].Stream, packet)
data := avc.RemoveStartCode(annexBData)
result = t.PackRtpPayload(track, trackIndex, data, ts)
result = t.PackRtpPayload(track, trackIndex, annexBData, ts, packet.Key)
}
return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
}
func (t *TransStream) ReadExtraData(ts int64) ([]*collections.ReferenceCounter[[]byte], int64, error) {
// 返回视频编码数据的rtp包
for _, track := range t.RtspTracks {
if utils.AVMediaTypeVideo != track.MediaType {
continue
}
// 回滚序号和时间戳
index := int(track.StartSeq) - len(track.ExtraDataBuffer)
for i, packet := range track.ExtraDataBuffer {
rtp.RollbackSeq(packet.Get()[OverTcpHeaderSize:], index+i+1)
binary.BigEndian.PutUint32(packet.Get()[OverTcpHeaderSize+4:], uint32(ts))
}
// 目前只有视频需要发送扩展数据的rtp包, 所以直接返回
return track.ExtraDataBuffer, ts, nil
}
return nil, ts, nil
// ReadKeyFrameBuffer always returns (nil, nil): no cached segments are handed
// out for RTSP.
func (t *TransStream) ReadKeyFrameBuffer() ([]stream.TransStreamSegment, error) {
// Key-frame caching is disabled for RTSP by default: sending too many RTP packets at once may overflow the player's jitter buffer, which then drops packets and causes corrupted video.
// Returning the cached segments instead would enable it: return t.segments, nil
return nil, nil
}
// PackRtpPayload 打包返回rtp over tcp的数据包
func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, timestamp uint32) []*collections.ReferenceCounter[[]byte] {
func (t *TransStream) PackRtpPayload(track *Track, trackIndex int, data []byte, timestamp uint32, videoKey bool) []*collections.ReferenceCounter[[]byte] {
// 分割nalu
var payloads [][]byte
if utils.AVCodecIdH264 == track.CodecID || utils.AVCodecIdH265 == track.CodecID {
avc.SplitNalU(data, func(nalu []byte) {
payloads = append(payloads, avc.RemoveStartCode(nalu))
})
} else {
payloads = append(payloads, data)
}
var result []*collections.ReferenceCounter[[]byte]
var packet []byte
var counter *collections.ReferenceCounter[[]byte]
// 保存开始序号
track.StartSeq = track.Muxer.GetHeader().Seq
track.Muxer.Input(data, timestamp, func() []byte {
counter = t.rtpBuffer.Get()
counter.Refer()
for _, payload := range payloads {
// 保存开始序号
track.StartSeq = track.Muxer.GetHeader().Seq
track.Muxer.Input(payload, timestamp, func() []byte {
counter = t.packetAllocator.Get()
counter.Refer()
packet = counter.Get()
// 预留rtp over tcp 4字节头部
return packet[OverTcpHeaderSize:]
}, func(bytes []byte) {
track.EndSeq = track.Muxer.GetHeader().Seq
// 每个包都存在rtp over tcp 4字节头部
overTCPPacket := packet[:OverTcpHeaderSize+len(bytes)]
t.OverTCP(overTCPPacket, channel)
packet = counter.Get()
// 预留rtp over tcp 4字节头部
return packet[OverTcpHeaderSize:]
}, func(bytes []byte) {
track.EndSeq = track.Muxer.GetHeader().Seq
// 每个包都存在rtp over tcp 4字节头部
overTCPPacket := packet[:OverTcpHeaderSize+len(bytes)]
t.OverTCP(overTCPPacket, trackIndex)
counter.ResetData(overTCPPacket)
result = append(result, counter)
})
counter.ResetData(overTCPPacket)
result = append(result, counter)
})
}
// 引用计数保持为1
for _, pkt := range result {
pkt.Release()
}
if t.HasVideo() && stream.AppConfig.GOPCache {
// 遇到视频关键帧, 丢弃前一帧缓存
if videoKey {
for _, segment := range t.segments {
for _, pkt := range segment.Data {
pkt.Release()
}
}
t.segments = t.segments[:0]
}
// 计数+1
for _, pkt := range result {
pkt.Refer()
}
// 放在缓存末尾
t.segments = append(t.segments, stream.TransStreamSegment{
Data: result,
TS: int64(timestamp),
Key: videoKey,
Index: trackIndex,
})
}
return result
}
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
// 恢复上次拉流的序号
var startSeq uint16
if t.oldTracks != nil {
if t.lastEndSeq != nil {
var ok bool
startSeq, ok = t.oldTracks[track.Stream.CodecID]
startSeq, ok = t.lastEndSeq[track.Stream.CodecID]
utils.Assert(ok)
}
@@ -139,15 +160,9 @@ func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
trackIndex := len(t.RtspTracks) - 1
// 将sps和pps按照单一模式打包
var extraDataPackets []*collections.ReferenceCounter[[]byte]
packAndAdd := func(data []byte) {
packets := t.PackRtpPayload(rtspTrack, trackIndex, data, 0)
for _, packet := range packets {
extra := packet.Get()
bytes := make([]byte, len(extra))
copy(bytes, extra)
extraDataPackets = append(extraDataPackets, collections.NewReferenceCounter(bytes))
}
packets := t.PackRtpPayload(rtspTrack, trackIndex, data, 0, true)
utils.Assert(len(packets) == 1)
}
if utils.AVMediaTypeVideo == track.Stream.MediaType {
@@ -161,21 +176,19 @@ func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
ppsBytes := parameters.PPS()
packAndAdd(avc.RemoveStartCode(spsBytes[0]))
packAndAdd(avc.RemoveStartCode(ppsBytes[0]))
t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets
}
return trackIndex, nil
}
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
func (t *TransStream) Close() ([]stream.TransStreamSegment, error) {
for _, track := range t.RtspTracks {
if track != nil {
track.Close()
}
}
return nil, 0, nil
return nil, nil
}
func (t *TransStream) WriteHeader() error {
@@ -263,10 +276,10 @@ func (t *TransStream) WriteHeader() error {
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[utils.AVCodecID]uint16) stream.TransStream {
t := &TransStream{
addr: addr,
urlFormat: urlFormat,
oldTracks: oldTracks,
rtpBuffer: stream.NewRtpBuffer(512),
addr: addr,
urlFormat: urlFormat,
lastEndSeq: oldTracks,
packetAllocator: stream.NewRtpBuffer(512),
}
if addr.IP.To4() != nil {

View File

@@ -1,7 +1,6 @@
package rtsp
import (
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/rtp"
)
@@ -13,20 +12,19 @@ type Track struct {
StartSeq uint16
EndSeq uint16
CodecID utils.AVCodecID
Muxer rtp.Muxer
ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用
Muxer rtp.Muxer
}
func (r *Track) Close() {
}
func NewRTSPTrack(muxer rtp.Muxer, payload rtp.PayloadType, mediaType utils.AVMediaType, id utils.AVCodecID) *Track {
stream := &Track{
payload: payload,
Muxer: muxer,
MediaType: mediaType,
CodecID: id,
Muxer: muxer,
}
return stream

View File

@@ -308,7 +308,7 @@ func SetDefaultConfig(config *AppConfig_) {
if !config.GOPCache {
config.GOPCache = true
config.MergeWriteLatency = 350
log.Sugar.Warnf("强制开启GOP缓存")
println("强制开启GOP缓存")
}
config.MergeWriteLatency = limitInt(350, 2000, config.MergeWriteLatency) // 最低缓存350毫秒数据才发送 最高缓存2秒数据才发送

View File

@@ -10,11 +10,11 @@ type RtpBuffer struct {
func (r *RtpBuffer) Get() *collections.ReferenceCounter[[]byte] {
if r.queue.Size() > 0 {
rtp := r.queue.Peek(0)
if rtp.UseCount() < 2 {
bytes := rtp.Get()
rtp.ResetData(bytes[:cap(bytes)])
return rtp
pkt := r.queue.Peek(0)
if pkt.UseCount() < 2 {
r.queue.Pop()
r.Put(pkt)
return pkt
}
}
@@ -34,6 +34,13 @@ func (r *RtpBuffer) Clear() {
}
}
// Put returns an RTP packet to the buffer. The packet's data slice is
// re-extended to its full capacity before the packet is pushed onto the queue.
func (r *RtpBuffer) Put(pkt *collections.ReferenceCounter[[]byte]) {
	full := pkt.Get()
	// Restore the whole backing capacity before storing the packet again.
	full = full[:cap(full)]
	pkt.ResetData(full)
	r.queue.Push(pkt)
}
func NewRtpBuffer(capacity int) *RtpBuffer {
return &RtpBuffer{queue: collections.NewQueue[*collections.ReferenceCounter[[]byte]](capacity)}
}

View File

@@ -349,6 +349,9 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t
// 设置转发流
if TransStreamGBCascaded == transStream.GetProtocol() {
t.forwardTransStream = transStream
} else if AppConfig.GOPCache && t.hasVideo {
// 新建传输流,发送GOP缓存
t.DispatchGOPBuffer(transStream)
}
return transStream, nil
@@ -423,6 +426,12 @@ func (t *transStreamPublisher) DispatchBuffer(transStream TransStream, index int
}
}
// DispatchSegments passes every segment, in slice order, to DispatchBuffer for
// the given trans stream, using the segment's own index, data, timestamp and
// key flag.
func (t *transStreamPublisher) DispatchSegments(transStream TransStream, segments []TransStreamSegment) {
	for i := range segments {
		seg := &segments[i]
		t.DispatchBuffer(transStream, seg.Index, seg.Data, seg.TS, seg.Key)
	}
}
func (t *transStreamPublisher) pendingSink(sink Sink) {
log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), t.source)
go sink.Close()
@@ -445,6 +454,12 @@ func (t *transStreamPublisher) write(sink Sink, index int, data []*collections.R
return false
}
// writeSegments writes every segment, in slice order, to a single sink via
// t.write. The result of each write call is not inspected.
func (t *transStreamPublisher) writeSegments(sink Sink, segments []TransStreamSegment) {
	for i := range segments {
		seg := &segments[i]
		t.write(sink, seg.Index, seg.Data, seg.TS, seg.Key)
	}
}
// 创建sink需要的输出流
func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
@@ -554,18 +569,13 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
}
// 发送已缓存的合并写切片
keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer()
if len(keyBuffer) > 0 {
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
t.write(sink, 0, extraData, timestamp, false)
segments, _ := transStream.ReadKeyFrameBuffer()
if len(segments) > 0 {
if extraData, _, _ := transStream.ReadExtraData(0); len(extraData) > 0 {
t.write(sink, 0, extraData, 0, false)
}
t.write(sink, 0, keyBuffer, timestamp, true)
}
// 新建传输流,发送已经缓存的音视频帧
if !exist && AppConfig.GOPCache && t.hasVideo && TransStreamGBCascaded != transStream.GetProtocol() {
t.DispatchGOPBuffer(transStream)
t.writeSegments(sink, segments)
}
return true
@@ -674,9 +684,9 @@ func (t *transStreamPublisher) doClose() {
// 关闭所有输出流
for _, transStream := range t.transStreams {
// 发送剩余包
data, ts, _ := transStream.Close()
if len(data) > 0 {
t.DispatchBuffer(transStream, -1, data, ts, true)
segments, _ := transStream.Close()
if len(segments) > 0 {
t.DispatchSegments(transStream, segments)
}
// 如果是tcp传输流, 归还合并写缓冲区

View File

@@ -43,10 +43,10 @@ type TransStream interface {
ReadExtraData(timestamp int64) ([]*collections.ReferenceCounter[[]byte], int64, error)
// ReadKeyFrameBuffer 读取最近的包含视频关键帧的合并写队列
ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error)
ReadKeyFrameBuffer() ([]TransStreamSegment, error)
// Close 关闭传输流, 返回还未flush的合并写块
Close() ([]*collections.ReferenceCounter[[]byte], int64, error)
Close() ([]TransStreamSegment, error)
HasVideo() bool
@@ -55,6 +55,13 @@ type TransStream interface {
GetMWBuffer() MergeWritingBuffer
}
// TransStreamSegment groups a batch of output packets with the metadata that
// is passed along when the batch is dispatched or written to a sink.
type TransStreamSegment struct {
	// Data holds the reference-counted byte buffers that make up the segment.
	Data []*collections.ReferenceCounter[[]byte]
	// TS is the timestamp handed to the dispatch/write call for this segment.
	TS int64
	// Key is the key flag handed to the dispatch/write call; the RTSP packer
	// sets it from the video key-frame flag.
	Key bool
	// Index is handed to the dispatch/write call as the index argument; the
	// RTSP packer stores the track index here. Producers that omit it leave 0.
	Index int
}
type BaseTransStream struct {
ID TransStreamID
Tracks []*Track
@@ -119,8 +126,8 @@ func (t *BaseTransStream) FindTrackWithStreamIndex(streamIndex int) *Track {
return nil
}
func (t *BaseTransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
return nil, 0, nil
func (t *BaseTransStream) Close() ([]TransStreamSegment, error) {
return nil, nil
}
func (t *BaseTransStream) GetProtocol() TransStreamProtocol {
@@ -167,8 +174,8 @@ func (t *BaseTransStream) ReadExtraData(timestamp int64) ([]*collections.Referen
return nil, 0, nil
}
func (t *BaseTransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) {
return nil, 0, nil
func (t *BaseTransStream) ReadKeyFrameBuffer() ([]TransStreamSegment, error) {
return nil, nil
}
func (t *BaseTransStream) IsTCPStreaming() bool {