From c6aba06199433089e522439ddcb998f110ae9df1 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Sun, 27 Jul 2025 15:05:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20rtsp=E6=B5=81=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=85=B3=E9=94=AE=E5=B8=A7=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flv/http_flv.go | 32 +++++++-- gb28181/gateway.go | 4 +- hls/hls_stream.go | 4 +- rtmp/rtmp_stream.go | 32 +++++++-- rtsp/rtsp_handler.go | 16 ++++- rtsp/rtsp_stream.go | 139 ++++++++++++++++++++----------------- rtsp/rtsp_track.go | 8 +-- stream/config.go | 2 +- stream/rtp_buffer.go | 17 +++-- stream/stream_publisher.go | 36 ++++++---- stream/trans_stream.go | 19 +++-- 11 files changed, 199 insertions(+), 110 deletions(-) diff --git a/flv/http_flv.go b/flv/http_flv.go index c8d8136..3f7d657 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -136,7 +136,7 @@ func (t *TransStream) ReadExtraData(_ int64) ([]*collections.ReferenceCounter[[] return []*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(GetHttpFLVBlock(t.flvHeaderBlock)), collections.NewReferenceCounter(GetHttpFLVBlock(t.flvExtraDataBlock))}, 0, nil } -func (t *TransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) { +func (t *TransStream) ReadKeyFrameBuffer() ([]stream.TransStreamSegment, error) { t.ClearOutStreamBuffer() // 发送当前内存池已有的合并写切片 @@ -144,20 +144,42 @@ func (t *TransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]by t.AppendOutStreamBuffer(segment) }) - return t.OutBuffer[:t.OutBufferSize], 0, nil + if t.OutBufferSize < 1 { + return nil, nil + } + + return []stream.TransStreamSegment{ + { + Data: t.OutBuffer[:t.OutBufferSize], + TS: 0, + Key: true, + }, + }, nil } -func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { +func (t *TransStream) Close() ([]stream.TransStreamSegment, error) { t.ClearOutStreamBuffer() // 发送剩余的流 + var key bool + var segment *collections.ReferenceCounter[[]byte] if !t.MWBuffer.IsNewSegment() { - if segment, _ := t.flushSegment(); segment != nil { + if segment, key = t.flushSegment(); segment != nil { t.AppendOutStreamBuffer(segment) } } - return t.OutBuffer[:t.OutBufferSize], 0, nil + if t.OutBufferSize < 1 { + return nil, nil + } + + return []stream.TransStreamSegment{ + { + Data: t.OutBuffer[:t.OutBufferSize], + TS: 0, + Key: key, + }, + }, nil } // 保存为完整的http-flv切片 diff --git a/gb28181/gateway.go b/gb28181/gateway.go index e3fa848..08b9574 100644 --- a/gb28181/gateway.go +++ b/gb28181/gateway.go @@ -65,9 +65,9 @@ func (s *GBGateway) Input(packet *avformat.AVPacket, index int) ([]*collections. return result, 0, true, nil } -func (s *GBGateway) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { +func (s *GBGateway) Close() ([]stream.TransStreamSegment, error) { s.rtpBuffer.Clear() - return nil, 0, nil + return nil, nil } func NewGBGateway(ssrc uint32) *GBGateway { diff --git a/hls/hls_stream.go b/hls/hls_stream.go index 5ecf0a3..b23529d 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -191,7 +191,7 @@ func (t *TransStream) createSegment() error { return nil } -func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { +func (t *TransStream) Close() ([]stream.TransStreamSegment, error) { var err error if t.ctx.file != nil { @@ -210,7 +210,7 @@ func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, e t.m3u8File = nil } - return nil, 0, err + return nil, err } func stringPtrToBytes(ptr *string) []byte { diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index ae403e8..74b4158 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -125,7 +125,7 @@ func (t *transStream) ReadExtraData(_ int64) ([]*collections.ReferenceCounter[[] return []*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(t.sequenceHeader)}, 0, nil } -func (t *transStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) { +func (t *transStream) ReadKeyFrameBuffer() ([]stream.TransStreamSegment, error) { t.ClearOutStreamBuffer() // 发送当前内存池已有的合并写切片 @@ -133,7 +133,17 @@ func (t *transStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]by t.AppendOutStreamBuffer(bytes) }) - return t.OutBuffer[:t.OutBufferSize], 0, nil + if t.OutBufferSize < 1 { + return nil, nil + } + + return []stream.TransStreamSegment{ + { + Data: t.OutBuffer[:t.OutBufferSize], + TS: 0, + Key: true, + }, + }, nil } func (t *transStream) WriteHeader() error { @@ -226,15 +236,27 @@ func (t *transStream) WriteHeader() error { return nil } -func (t *transStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { +func (t *transStream) Close() ([]stream.TransStreamSegment, error) { t.ClearOutStreamBuffer() // 发送剩余的流 - if segment, _ := t.MWBuffer.FlushSegment(); segment != nil { + var key bool + var segment *collections.ReferenceCounter[[]byte] + if segment, key = t.MWBuffer.FlushSegment(); segment != nil { t.AppendOutStreamBuffer(segment) } - return t.OutBuffer[:t.OutBufferSize], 0, nil + if t.OutBufferSize < 1 { + return nil, nil + } + + return []stream.TransStreamSegment{ + { + Data: t.OutBuffer[:t.OutBufferSize], + TS: 0, + Key: key, + }, + }, nil } func NewTransStream(chunkSize int, metaData *amf0.Object) stream.TransStream { diff --git a/rtsp/rtsp_handler.go b/rtsp/rtsp_handler.go index 8f6675d..5c2ff02 100644 --- a/rtsp/rtsp_handler.go +++ b/rtsp/rtsp_handler.go @@ -11,6 +11,7 @@ import ( "reflect" "strconv" "strings" + "time" ) type Request struct { @@ -72,7 +73,7 @@ func (h handler) Process(session *session, method string, url_ *url.URL, headers source, _ := stream.Path2SourceID(url_.Path, "") - //反射调用各个处理函数 + // 反射调用各个处理函数 results := m.Call([]reflect.Value{ reflect.ValueOf(&h), reflect.ValueOf(Request{session, source, method, url_, headers}), @@ -220,11 +221,16 @@ func (h handler) OnSetup(request Request) (*http.Response, []byte, error) { func (h handler) OnPlay(request Request) (*http.Response, []byte, error) { response := NewOKResponse(request.headers.Get("Cseq")) - sessionHeader := request.headers.Get("Session") - if sessionHeader != "" { + + response.Header.Set("Date", time.Now().Format("Mon, 02 Jan 2006 15:04:05 GMT")) + if sessionHeader := request.headers.Get("Session"); sessionHeader != "" { response.Header.Set("Session", sessionHeader) } + if rangeV := request.headers.Get("Range"); rangeV != "" { + response.Header.Set("Range", rangeV) + } + sink := request.session.sink sink.SetReady(true) source := stream.SourceManager.Find(sink.GetSourceID()) @@ -233,6 +239,10 @@ func (h handler) OnPlay(request Request) (*http.Response, []byte, error) { } source.GetTransStreamPublisher().AddSink(sink) + + // RTP-Info: url=rtsp://192.168.2.110:8554/hls/mystream/trackID=0;seq=21592;rtptime=4586400,url=rtsp://192.168.2.110:8554/hls/mystream/trackID=1;seq=403;rtptime=412672\r\n + //info := <-sink.onPlayResponse + //response.Header.Set("RTP-Info", fmt.Sprintf("url=%s;seq=%d;rtptime=%d", "rtsp://192.168.2.119:554/hls/mystream/?track=0", info[0], info[1])) return response, nil, nil } diff --git a/rtsp/rtsp_stream.go b/rtsp/rtsp_stream.go index 7c5a9bc..49298ef 100644 --- a/rtsp/rtsp_stream.go +++ b/rtsp/rtsp_stream.go @@ -21,18 +21,17 @@ const ( ) // TransStream rtsp传输流封装 -// 低延迟是rtsp特性, 所以不考虑实现GOP缓存 type TransStream struct { stream.BaseTransStream addr net.IPAddr addrType string urlFormat string - RtspTracks []*Track - oldTracks map[utils.AVCodecID]uint16 // 上次推流的rtp seq - sdp string - - rtpBuffer *stream.RtpBuffer + sdp string + RtspTracks []*Track + lastEndSeq map[utils.AVCodecID]uint16 // 上次结束推流的rtp seq + segments []stream.TransStreamSegment // 缓存的切片 + packetAllocator *stream.RtpBuffer // 分配rtp包 } func (t *TransStream) OverTCP(data []byte, channel int) { @@ -53,76 +52,98 @@ func (t *TransStream) Input(packet *avformat.AVPacket, trackIndex int) ([]*colle t.Tracks[trackIndex].Pts = t.Tracks[trackIndex].Dts + packet.GetPtsDtsDelta(track.payload.ClockRate) if utils.AVMediaTypeAudio == packet.MediaType { - result = t.PackRtpPayload(track, trackIndex, packet.Data, ts) + result = t.PackRtpPayload(track, trackIndex, packet.Data, ts, false) } else if utils.AVMediaTypeVideo == packet.MediaType { annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[trackIndex].Stream, packet) - data := avc.RemoveStartCode(annexBData) - result = t.PackRtpPayload(track, trackIndex, data, ts) + result = t.PackRtpPayload(track, trackIndex, annexBData, ts, packet.Key) } return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil } -func (t *TransStream) ReadExtraData(ts int64) ([]*collections.ReferenceCounter[[]byte], int64, error) { - // 返回视频编码数据的rtp包 - for _, track := range t.RtspTracks { - if utils.AVMediaTypeVideo != track.MediaType { - continue - } - - // 回滚序号和时间戳 - index := int(track.StartSeq) - len(track.ExtraDataBuffer) - for i, packet := range track.ExtraDataBuffer { - rtp.RollbackSeq(packet.Get()[OverTcpHeaderSize:], index+i+1) - binary.BigEndian.PutUint32(packet.Get()[OverTcpHeaderSize+4:], uint32(ts)) - } - - // 目前只有视频需要发送扩展数据的rtp包, 所以直接返回 - return track.ExtraDataBuffer, ts, nil - } - - return nil, ts, nil +func (t *TransStream) ReadKeyFrameBuffer() ([]stream.TransStreamSegment, error) { + // 默认不开启rtsp的关键帧缓存, 一次发送rtp包过多, 播放器的jitter buffer可能会溢出丢弃, 造成播放花屏 + //return t.segments, nil + return nil, nil } // PackRtpPayload 打包返回rtp over tcp的数据包 -func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, timestamp uint32) []*collections.ReferenceCounter[[]byte] { +func (t *TransStream) PackRtpPayload(track *Track, trackIndex int, data []byte, timestamp uint32, videoKey bool) []*collections.ReferenceCounter[[]byte] { + // 分割nalu + var payloads [][]byte + if utils.AVCodecIdH264 == track.CodecID || utils.AVCodecIdH265 == track.CodecID { + avc.SplitNalU(data, func(nalu []byte) { + payloads = append(payloads, avc.RemoveStartCode(nalu)) + }) + } else { + payloads = append(payloads, data) + } + var result []*collections.ReferenceCounter[[]byte] var packet []byte var counter *collections.ReferenceCounter[[]byte] - // 保存开始序号 - track.StartSeq = track.Muxer.GetHeader().Seq - track.Muxer.Input(data, timestamp, func() []byte { - counter = t.rtpBuffer.Get() - counter.Refer() + for _, payload := range payloads { + // 保存开始序号 + track.StartSeq = track.Muxer.GetHeader().Seq + track.Muxer.Input(payload, timestamp, func() []byte { + counter = t.packetAllocator.Get() + counter.Refer() - packet = counter.Get() - // 预留rtp over tcp 4字节头部 - return packet[OverTcpHeaderSize:] - }, func(bytes []byte) { - track.EndSeq = track.Muxer.GetHeader().Seq - // 每个包都存在rtp over tcp 4字节头部 - overTCPPacket := packet[:OverTcpHeaderSize+len(bytes)] - t.OverTCP(overTCPPacket, channel) + packet = counter.Get() + // 预留rtp over tcp 4字节头部 + return packet[OverTcpHeaderSize:] + }, func(bytes []byte) { + track.EndSeq = track.Muxer.GetHeader().Seq + // 每个包都存在rtp over tcp 4字节头部 + overTCPPacket := packet[:OverTcpHeaderSize+len(bytes)] + t.OverTCP(overTCPPacket, trackIndex) - counter.ResetData(overTCPPacket) - result = append(result, counter) - }) + counter.ResetData(overTCPPacket) + result = append(result, counter) + }) + } // 引用计数保持为1 for _, pkt := range result { pkt.Release() } + if t.HasVideo() && stream.AppConfig.GOPCache { + // 遇到视频关键帧, 丢弃前一帧缓存 + if videoKey { + for _, segment := range t.segments { + for _, pkt := range segment.Data { + pkt.Release() + } + } + + t.segments = t.segments[:0] + } + + // 计数+1 + for _, pkt := range result { + pkt.Refer() + } + + // 放在缓存末尾 + t.segments = append(t.segments, stream.TransStreamSegment{ + Data: result, + TS: int64(timestamp), + Key: videoKey, + Index: trackIndex, + }) + } + return result } func (t *TransStream) AddTrack(track *stream.Track) (int, error) { // 恢复上次拉流的序号 var startSeq uint16 - if t.oldTracks != nil { + if t.lastEndSeq != nil { var ok bool - startSeq, ok = t.oldTracks[track.Stream.CodecID] + startSeq, ok = t.lastEndSeq[track.Stream.CodecID] utils.Assert(ok) } @@ -139,15 +160,9 @@ func (t *TransStream) AddTrack(track *stream.Track) (int, error) { trackIndex := len(t.RtspTracks) - 1 // 将sps和pps按照单一模式打包 - var extraDataPackets []*collections.ReferenceCounter[[]byte] packAndAdd := func(data []byte) { - packets := t.PackRtpPayload(rtspTrack, trackIndex, data, 0) - for _, packet := range packets { - extra := packet.Get() - bytes := make([]byte, len(extra)) - copy(bytes, extra) - extraDataPackets = append(extraDataPackets, collections.NewReferenceCounter(bytes)) - } + packets := t.PackRtpPayload(rtspTrack, trackIndex, data, 0, true) + utils.Assert(len(packets) == 1) } if utils.AVMediaTypeVideo == track.Stream.MediaType { @@ -161,21 +176,19 @@ func (t *TransStream) AddTrack(track *stream.Track) (int, error) { ppsBytes := parameters.PPS() packAndAdd(avc.RemoveStartCode(spsBytes[0])) packAndAdd(avc.RemoveStartCode(ppsBytes[0])) - - t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets } return trackIndex, nil } -func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { +func (t *TransStream) Close() ([]stream.TransStreamSegment, error) { for _, track := range t.RtspTracks { if track != nil { track.Close() } } - return nil, 0, nil + return nil, nil } func (t *TransStream) WriteHeader() error { @@ -263,10 +276,10 @@ func (t *TransStream) WriteHeader() error { func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[utils.AVCodecID]uint16) stream.TransStream { t := &TransStream{ - addr: addr, - urlFormat: urlFormat, - oldTracks: oldTracks, - rtpBuffer: stream.NewRtpBuffer(512), + addr: addr, + urlFormat: urlFormat, + lastEndSeq: oldTracks, + packetAllocator: stream.NewRtpBuffer(512), } if addr.IP.To4() != nil { diff --git a/rtsp/rtsp_track.go b/rtsp/rtsp_track.go index 5b1c1ed..e8165cd 100644 --- a/rtsp/rtsp_track.go +++ b/rtsp/rtsp_track.go @@ -1,7 +1,6 @@ package rtsp import ( - "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/rtp" ) @@ -13,20 +12,19 @@ type Track struct { StartSeq uint16 EndSeq uint16 CodecID utils.AVCodecID - - Muxer rtp.Muxer - ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用 + Muxer rtp.Muxer } func (r *Track) Close() { + } func NewRTSPTrack(muxer rtp.Muxer, payload rtp.PayloadType, mediaType utils.AVMediaType, id utils.AVCodecID) *Track { stream := &Track{ payload: payload, - Muxer: muxer, MediaType: mediaType, CodecID: id, + Muxer: muxer, } return stream diff --git a/stream/config.go b/stream/config.go index ab0c2a4..2f6eb89 100644 --- a/stream/config.go +++ b/stream/config.go @@ -308,7 +308,7 @@ func SetDefaultConfig(config *AppConfig_) { if !config.GOPCache { config.GOPCache = true config.MergeWriteLatency = 350 - log.Sugar.Warnf("强制开启GOP缓存") + println("强制开启GOP缓存") } config.MergeWriteLatency = limitInt(350, 2000, config.MergeWriteLatency) // 最低缓存350毫秒数据才发送 最高缓存2秒数据才发送 diff --git a/stream/rtp_buffer.go b/stream/rtp_buffer.go index ee8284a..ebb2b1c 100644 --- a/stream/rtp_buffer.go +++ b/stream/rtp_buffer.go @@ -10,11 +10,11 @@ type RtpBuffer struct { func (r *RtpBuffer) Get() *collections.ReferenceCounter[[]byte] { if r.queue.Size() > 0 { - rtp := r.queue.Peek(0) - if rtp.UseCount() < 2 { - bytes := rtp.Get() - rtp.ResetData(bytes[:cap(bytes)]) - return rtp + pkt := r.queue.Peek(0) + if pkt.UseCount() < 2 { + r.queue.Pop() + r.Put(pkt) + return pkt } } @@ -34,6 +34,13 @@ func (r *RtpBuffer) Clear() { } } +// Put 归还rtp包 +func (r *RtpBuffer) Put(pkt *collections.ReferenceCounter[[]byte]) { + bytes := pkt.Get() + pkt.ResetData(bytes[:cap(bytes)]) + r.queue.Push(pkt) +} + func NewRtpBuffer(capacity int) *RtpBuffer { return &RtpBuffer{queue: collections.NewQueue[*collections.ReferenceCounter[[]byte]](capacity)} } diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 2c8353e..154ba93 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -349,6 +349,9 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t // 设置转发流 if TransStreamGBCascaded == transStream.GetProtocol() { t.forwardTransStream = transStream + } else if AppConfig.GOPCache && t.hasVideo { + // 新建传输流,发送GOP缓存 + t.DispatchGOPBuffer(transStream) } return transStream, nil @@ -423,6 +426,12 @@ func (t *transStreamPublisher) DispatchBuffer(transStream TransStream, index int } } +func (t *transStreamPublisher) DispatchSegments(transStream TransStream, segments []TransStreamSegment) { + for _, segment := range segments { + t.DispatchBuffer(transStream, segment.Index, segment.Data, segment.TS, segment.Key) + } +} + func (t *transStreamPublisher) pendingSink(sink Sink) { log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), t.source) go sink.Close() @@ -445,6 +454,12 @@ func (t *transStreamPublisher) write(sink Sink, index int, data []*collections.R return false } +func (t *transStreamPublisher) writeSegments(sink Sink, segments []TransStreamSegment) { + for _, segment := range segments { + t.write(sink, segment.Index, segment.Data, segment.TS, segment.Key) + } +} + // 创建sink需要的输出流 func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { // 暂时不考虑多路视频流,意味着只能1路视频流和多路音频流,同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 @@ -554,18 +569,13 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { } // 发送已缓存的合并写切片 - keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer() - if len(keyBuffer) > 0 { - if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { - t.write(sink, 0, extraData, timestamp, false) + segments, _ := transStream.ReadKeyFrameBuffer() + if len(segments) > 0 { + if extraData, _, _ := transStream.ReadExtraData(0); len(extraData) > 0 { + t.write(sink, 0, extraData, 0, false) } - t.write(sink, 0, keyBuffer, timestamp, true) - } - - // 新建传输流,发送已经缓存的音视频帧 - if !exist && AppConfig.GOPCache && t.hasVideo && TransStreamGBCascaded != transStream.GetProtocol() { - t.DispatchGOPBuffer(transStream) + t.writeSegments(sink, segments) } return true @@ -674,9 +684,9 @@ func (t *transStreamPublisher) doClose() { // 关闭所有输出流 for _, transStream := range t.transStreams { // 发送剩余包 - data, ts, _ := transStream.Close() - if len(data) > 0 { - t.DispatchBuffer(transStream, -1, data, ts, true) + segments, _ := transStream.Close() + if len(segments) > 0 { + t.DispatchSegments(transStream, segments) } // 如果是tcp传输流, 归还合并写缓冲区 diff --git a/stream/trans_stream.go b/stream/trans_stream.go index 348b326..589a30c 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -43,10 +43,10 @@ type TransStream interface { ReadExtraData(timestamp int64) ([]*collections.ReferenceCounter[[]byte], int64, error) // ReadKeyFrameBuffer 读取最近的包含视频关键帧的合并写队列 - ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) + ReadKeyFrameBuffer() ([]TransStreamSegment, error) // Close 关闭传输流, 返回还未flush的合并写块 - Close() ([]*collections.ReferenceCounter[[]byte], int64, error) + Close() ([]TransStreamSegment, error) HasVideo() bool @@ -55,6 +55,13 @@ type TransStream interface { GetMWBuffer() MergeWritingBuffer } +type TransStreamSegment struct { + Data []*collections.ReferenceCounter[[]byte] + TS int64 + Key bool + Index int +} + type BaseTransStream struct { ID TransStreamID Tracks []*Track @@ -119,8 +126,8 @@ func (t *BaseTransStream) FindTrackWithStreamIndex(streamIndex int) *Track { return nil } -func (t *BaseTransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { - return nil, 0, nil +func (t *BaseTransStream) Close() ([]TransStreamSegment, error) { + return nil, nil } func (t *BaseTransStream) GetProtocol() TransStreamProtocol { @@ -167,8 +174,8 @@ func (t *BaseTransStream) ReadExtraData(timestamp int64) ([]*collections.Referen return nil, 0, nil } -func (t *BaseTransStream) ReadKeyFrameBuffer() ([]*collections.ReferenceCounter[[]byte], int64, error) { - return nil, 0, nil +func (t *BaseTransStream) ReadKeyFrameBuffer() ([]TransStreamSegment, error) { + return nil, nil } func (t *BaseTransStream) IsTCPStreaming() bool {