From a2c372a367d33699eb84e1fee77586bdb674b6f2 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Thu, 24 Jul 2025 14:32:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=A0=E8=BE=93=E6=B5=81=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E6=88=B3=E6=A0=B9=E6=8D=AEduration=E7=B4=AF=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bridge.go | 33 +++++++------- flv/http_flv.go | 13 ++++-- hls/hls_stream.go | 24 +++++----- rtmp/rtmp_stream.go | 9 ++-- rtsp/rtsp_stream.go | 30 +++++++++---- stream/source.go | 2 +- stream/stream_endinfo.go | 20 ++++----- stream/stream_publisher.go | 92 +++++++++++--------------------------- 8 files changed, 105 insertions(+), 118 deletions(-) diff --git a/bridge.go b/bridge.go index c7d8de6..e9d0df0 100644 --- a/bridge.go +++ b/bridge.go @@ -10,22 +10,14 @@ import ( // 处理不同包不能相互引用的需求 -func NewStreamEndInfo(source string, tracks []*stream.Track, streams map[stream.TransStreamID]stream.TransStream) *stream.StreamEndInfo { - if len(tracks) < 1 || len(streams) < 1 { +func NewStreamEndInfo(source string, streams map[stream.TransStreamID]stream.TransStream) *stream.StreamEndInfo { + if len(streams) < 1 { return nil } info := stream.StreamEndInfo{ ID: source, - Timestamps: make(map[utils.AVCodecID][2]int64, len(tracks)), - } - - for _, track := range tracks { - var timestamp [2]int64 - timestamp[0] = track.Dts + int64(track.FrameDuration) - timestamp[1] = track.Pts + int64(track.FrameDuration) - - info.Timestamps[track.Stream.CodecID] = timestamp + Timestamps: make(map[stream.TransStreamID]map[utils.AVCodecID][2]int64, len(streams)), } for _, transStream := range streams { @@ -33,19 +25,28 @@ func NewStreamEndInfo(source string, tracks []*stream.Track, streams map[stream. if stream.TransStreamHls == transStream.GetProtocol() { if hls := transStream.(*hls.TransStream); hls.M3U8Writer.Size() > 0 { info.M3U8Writer = hls.M3U8Writer - info.PlaylistFormat = hls.PlaylistFormatPtr + info.PlaylistFormat = hls.PlaylistFormat } } else if stream.TransStreamRtsp == transStream.GetProtocol() { if rtsp := transStream.(*rtsp.TransStream); len(rtsp.Tracks) > 0 { - info.RtspTracks = make(map[int]uint16, len(tracks)) + info.RtspTracks = make(map[utils.AVCodecID]uint16, 8) for _, track := range rtsp.RtspTracks { - info.RtspTracks[int(track.CodecID)] = track.EndSeq + info.RtspTracks[track.CodecID] = track.EndSeq } } } else if stream.TransStreamFlv == transStream.GetProtocol() { - stream := transStream.(*flv.TransStream) - info.FLVPrevTagSize = stream.Muxer.PrevTagSize() + flv := transStream.(*flv.TransStream) + info.FLVPrevTagSize = flv.Muxer.PrevTagSize() } + + // 保存传输流最后的时间戳 + tracks := transStream.GetTracks() + ts := make(map[utils.AVCodecID][2]int64, len(tracks)) + for _, track := range tracks { + ts[track.Stream.CodecID] = [2]int64{track.Dts, track.Pts} + } + + info.Timestamps[transStream.GetID()] = ts } return &info diff --git a/flv/http_flv.go b/flv/http_flv.go index d84933a..c8d8136 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -18,7 +18,7 @@ type TransStream struct { flvExtraDataBlock []byte // metadata和sequence header } -func (t *TransStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { +func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { t.ClearOutStreamBuffer() var flvTagSize int @@ -29,12 +29,19 @@ func (t *TransStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.Re var keyBuffer bool var frameType int - dts = packet.ConvertDts(1000) - pts = packet.ConvertPts(1000) + duration := packet.GetDuration(1000) + track := t.Tracks[index] + dts = track.Dts + pts = track.Pts + track.Dts += duration + track.Pts = track.Dts + packet.GetPtsDtsDelta(1000) + if utils.AVMediaTypeAudio == packet.MediaType { + //log.Sugar.Infof("audio packet dts: %d, pts: %d data size: %d", dts, pts, len(packet.Data)) data = packet.Data flvTagSize = flv.TagHeaderSize + t.Muxer.ComputeAudioDataHeaderSize() + len(packet.Data) } else if utils.AVMediaTypeVideo == packet.MediaType { + //log.Sugar.Infof("video packet dts: %d, pts: %d", dts, pts) data = avformat.AnnexBPacket2AVCC(packet) flvTagSize = flv.TagHeaderSize + t.Muxer.ComputeVideoDataHeaderSize(uint32(pts-dts)) + len(data) if videoKey = packet.Key; videoKey { diff --git a/hls/hls_stream.go b/hls/hls_stream.go index 3321690..5ecf0a3 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -35,8 +35,8 @@ type TransStream struct { duration int // 切片时长, 单位秒 playlistLength int // 最大切片文件个数 - PlaylistFormatPtr *string // 位于内存中的m3u8播放列表,每个sink都引用指针地址. - PlaylistFormatPtrCounter []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink + PlaylistFormat *string // 位于内存中的m3u8播放列表,每个sink都引用指针地址. + PlaylistFormatPtr []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink } func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { @@ -60,8 +60,12 @@ func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collection newSegment = true } - pts := packet.ConvertPts(90000) - dts := packet.ConvertDts(90000) + duration := packet.GetDuration(90000) + dts := t.Tracks[index].Dts + pts := t.Tracks[index].Pts + t.Tracks[index].Dts += duration + t.Tracks[index].Pts = t.Tracks[index].Dts + packet.GetPtsDtsDelta(90000) + data := packet.Data if utils.AVMediaTypeVideo == packet.MediaType { data = avformat.AVCCPacket2AnnexB(t.FindTrackWithStreamIndex(packet.Index).Stream, packet) @@ -83,7 +87,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collection // 缓存完第二个切片, 才响应发送m3u8文件. 如果一个切片就发, 播放器缓存少会卡顿. if newSegment && t.M3U8Writer.Size() > 1 { - return t.PlaylistFormatPtrCounter, -1, true, nil + return t.PlaylistFormatPtr, -1, true, nil } return nil, -1, true, nil @@ -132,7 +136,7 @@ func (t *TransStream) flushSegment(end bool) error { // m3u8Txt += "#EXT-X-ENDLIST" //} - *t.PlaylistFormatPtr = m3u8Txt + *t.PlaylistFormat = m3u8Txt // 写入最新的m3u8到文件 if t.m3u8File != nil { @@ -273,13 +277,13 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play } if playlistFormat != nil { - transStream.PlaylistFormatPtr = playlistFormat + transStream.PlaylistFormat = playlistFormat } else { - transStream.PlaylistFormatPtr = new(string) + transStream.PlaylistFormat = new(string) } - playlistFormatPtrCounter := collections.NewReferenceCounter[[]byte](stringPtrToBytes(transStream.PlaylistFormatPtr)) - transStream.PlaylistFormatPtrCounter = append(transStream.PlaylistFormatPtrCounter, playlistFormatPtrCounter) + playlistFormatPtrCounter := collections.NewReferenceCounter[[]byte](stringPtrToBytes(transStream.PlaylistFormat)) + transStream.PlaylistFormatPtr = append(transStream.PlaylistFormatPtr, playlistFormatPtrCounter) // 创建TS封装器 muxer := mpeg.NewTSMuxer() diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index ed9bef3..ae403e8 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -21,7 +21,7 @@ type transStream struct { metaData *amf0.Object // 推流方携带的元数据 } -func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { +func (t *transStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { t.ClearOutStreamBuffer() var data []byte @@ -37,9 +37,12 @@ func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.Re var keyBuffer bool var frameType int - dts = packet.ConvertDts(1000) - pts = packet.ConvertPts(1000) + duration := packet.GetDuration(1000) + dts = t.Tracks[index].Dts + pts = t.Tracks[index].Pts ct := pts - dts + t.Tracks[index].Dts += duration + t.Tracks[index].Pts = t.Tracks[index].Dts + packet.GetPtsDtsDelta(1000) // chunk = header+payload(audio data / video data) if utils.AVMediaTypeAudio == packet.MediaType { diff --git a/rtsp/rtsp_stream.go b/rtsp/rtsp_stream.go index cf0b2e7..7c5a9bc 100644 --- a/rtsp/rtsp_stream.go +++ b/rtsp/rtsp_stream.go @@ -7,6 +7,7 @@ import ( "github.com/lkmio/avformat/avc" "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" + "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" "github.com/lkmio/rtp" "github.com/pion/sdp/v3" @@ -28,9 +29,8 @@ type TransStream struct { urlFormat string RtspTracks []*Track - //oldTracks []*Track - oldTracks map[int]uint16 - sdp string + oldTracks map[utils.AVCodecID]uint16 // 上次推流的rtp seq + sdp string rtpBuffer *stream.RtpBuffer } @@ -45,11 +45,16 @@ func (t *TransStream) Input(packet *avformat.AVPacket, trackIndex int) ([]*colle var ts uint32 var result []*collections.ReferenceCounter[[]byte] track := t.RtspTracks[trackIndex] + + duration := packet.GetDuration(track.payload.ClockRate) + //dts := t.Tracks[trackIndex].Dts + ts = uint32(t.Tracks[trackIndex].Pts) + t.Tracks[trackIndex].Dts += duration + t.Tracks[trackIndex].Pts = t.Tracks[trackIndex].Dts + packet.GetPtsDtsDelta(track.payload.ClockRate) + if utils.AVMediaTypeAudio == packet.MediaType { - ts = uint32(packet.ConvertPts(track.payload.ClockRate)) result = t.PackRtpPayload(track, trackIndex, packet.Data, ts) } else if utils.AVMediaTypeVideo == packet.MediaType { - ts = uint32(packet.ConvertPts(track.payload.ClockRate)) annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[trackIndex].Stream, packet) data := avc.RemoveStartCode(annexBData) result = t.PackRtpPayload(track, trackIndex, data, ts) @@ -92,9 +97,11 @@ func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, tim counter.Refer() packet = counter.Get() + // 预留rtp over tcp 4字节头部 return packet[OverTcpHeaderSize:] }, func(bytes []byte) { track.EndSeq = track.Muxer.GetHeader().Seq + // 每个包都存在rtp over tcp 4字节头部 overTCPPacket := packet[:OverTcpHeaderSize+len(bytes)] t.OverTCP(overTCPPacket, channel) @@ -115,7 +122,7 @@ func (t *TransStream) AddTrack(track *stream.Track) (int, error) { var startSeq uint16 if t.oldTracks != nil { var ok bool - startSeq, ok = t.oldTracks[int(track.Stream.CodecID)] + startSeq, ok = t.oldTracks[track.Stream.CodecID] utils.Assert(ok) } @@ -238,7 +245,7 @@ func (t *TransStream) WriteHeader() error { mediaDescription.Attributes = append(mediaDescription.Attributes, fmtp) } - } else { + } else if utils.AVMediaTypeVideo == track.MediaType { mediaDescription.MediaName.Media = "video" } @@ -254,7 +261,7 @@ func (t *TransStream) WriteHeader() error { return nil } -func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[int]uint16) stream.TransStream { +func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[utils.AVCodecID]uint16) stream.TransStream { t := &TransStream{ addr: addr, urlFormat: urlFormat, @@ -273,9 +280,14 @@ func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[int]uint16) func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) { trackFormat := "?track=%d" - var oldTracks map[int]uint16 + var oldTracks map[utils.AVCodecID]uint16 if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil { oldTracks = endInfo.RtspTracks + if oldTracks != nil { + for codecID, seq := range oldTracks { + log.Sugar.Infof("track codecID: %s, seq: %d", codecID, seq) + } + } } return NewTransStream(net.IPAddr{ diff --git a/stream/source.go b/stream/source.go index e1f4f8e..2a84bec 100644 --- a/stream/source.go +++ b/stream/source.go @@ -16,7 +16,7 @@ import ( ) var ( - StreamEndInfoBride func(source string, tracks []*Track, streams map[TransStreamID]TransStream) *StreamEndInfo + StreamEndInfoBride func(source string, streams map[TransStreamID]TransStream) *StreamEndInfo ) // Source 对推流源的封装 diff --git a/stream/stream_endinfo.go b/stream/stream_endinfo.go index 1a73134..73698b7 100644 --- a/stream/stream_endinfo.go +++ b/stream/stream_endinfo.go @@ -18,11 +18,11 @@ func init() { // 如果重新推流之前,陆续有拉流端断开,直至sink计数为0,删除保存的推流信息。 type StreamEndInfo struct { ID string - Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳 - M3U8Writer M3U8Writer // 保存M3U8生成器 - PlaylistFormat *string // M3U8播放列表 - RtspTracks map[int]uint16 // rtsp每路track的结束序号 - FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size + Timestamps map[TransStreamID]map[utils.AVCodecID][2]int64 // 每路track结束时间戳 + M3U8Writer M3U8Writer // 保存M3U8生成器 + PlaylistFormat *string // M3U8播放列表 + RtspTracks map[utils.AVCodecID]uint16 // rtsp每路track的结束序号 + FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size } func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool { @@ -30,11 +30,11 @@ func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool { // return false //} - for _, track := range tracks { - if _, ok := info.Timestamps[track.Stream.CodecID]; !ok { - return false - } - } + //for _, track := range tracks { + // if _, ok := info.Timestamps[track.Stream.CodecID]; !ok { + // return false + // } + //} return true } diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 701adbc..21ab926 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -90,9 +90,7 @@ type transStreamPublisher struct { hasVideo bool // 是否存在视频 completed atomic.Bool // 推流track是否解析完毕 closed atomic.Bool - streamEndInfo *StreamEndInfo // 之前推流源信息 - accumulateTimestamps bool // 是否累加时间戳 - timestampModeDecided bool // 是否已经决定使用推流的时间戳,或者累加时间戳 + streamEndInfo *StreamEndInfo // 上次结束推流的信息 lastStreamEndTime time.Time // 最近结束拉流的时间 bitstreamFilterBuffer *collections.RBBlockBuffer // annexb和avcc转换的缓冲区 } @@ -215,7 +213,7 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t // 匹配和创建适合TransStream流协议的track var finalTracks []*Track for _, track := range tracks { - // 对应传输流支持的编码器列表 + // 传输流支持的编码器列表 supportedCodecs, ok := SupportedCodes[protocol] if !ok { panic(fmt.Sprintf("unknown protocol %s", protocol.String())) @@ -240,7 +238,7 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t if !ok { log.Sugar.Warnf("不支持的编码器 source: %s stream: %s codec: %s", t.source, protocol.String(), track.Stream.CodecID) - // 尝试音频转码 + // 如果没有开启音频转码或非音频流,跳过 if utils.AVMediaTypeAudio != track.Stream.MediaType || transcode.CreateAudioTranscoder == nil { continue } @@ -272,19 +270,10 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t stream.Index = len(t.originTracks.tracks) + len(t.transcodeTracks) newTrack := &Track{Stream: stream} - // 如果之前有转码过, 则使用之前的时间戳 - if t.streamEndInfo != nil { - oldTimestamps, ok := t.streamEndInfo.Timestamps[transcoder.GetEncoderID()] - if ok { - newTrack.Dts = oldTimestamps[0] - newTrack.Pts = oldTimestamps[1] - } - } - transcodeTrack = NewTranscodeTrack(newTrack, transcoder) t.transcodeTracks[transcoder.GetEncoderID()] = transcodeTrack - // 转码GOPBuffer中的音频 + // 转码GOP中的推流音频 t.transcodeGOPBuffer(transcodeTrack) } else { log.Sugar.Infof("使用已经存在的音频转码track source: %s stream: %s src: %s dst: %s", t.source, protocol.String(), track.Stream.CodecID, transcodeTrack.transcoder.GetEncoderID()) @@ -293,9 +282,11 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t track = transcodeTrack.track } - // 重新拷贝一个track,传输流内部使用track的时间戳, - newTrack := *track - finalTracks = append(finalTracks, &newTrack) + // 创建新的track + newTrack := &Track{ + Stream: track.Stream, + } + finalTracks = append(finalTracks, newTrack) } if len(finalTracks) < 1 { @@ -337,6 +328,19 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t transStream.SetID(id) transStream.SetProtocol(protocol) + // 使用上次推流结束的时间戳 + if t.streamEndInfo != nil { + oldTimestamps, ok := t.streamEndInfo.Timestamps[id] + if ok { + for _, track := range transStream.GetTracks() { + track.Dts = oldTimestamps[track.Stream.CodecID][0] + track.Pts = oldTimestamps[track.Stream.CodecID][1] + + log.Sugar.Debugf("使用上次结束推流的时间戳 source: %s stream: %s track: %s dts: %d pts: %d", t.source, protocol, track.Stream.CodecID, track.Dts, track.Pts) + } + } + } + t.transStreams[id] = transStream // 创建输出流对应的拉流队列 t.transStreamSinks[id] = make(map[SinkID]Sink, 128) @@ -396,13 +400,13 @@ func (t *transStreamPublisher) DispatchPacketToStream(transStream TransStream, p // DispatchBuffer 分发传输流 func (t *transStreamPublisher) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) { sinks := t.transStreamSinks[transStream.GetID()] - exist := transStream.HasVideo() + hasVideo := transStream.HasVideo() for _, sink := range sinks { if sink.GetSentPacketCount() < 1 { // 如果存在视频, 确保向sink发送的第一帧是关键帧 - if exist && !keyVideo { + if hasVideo && !keyVideo { continue } @@ -663,7 +667,7 @@ func (t *transStreamPublisher) doClose() { tracks = append(tracks, track.track) } - sourceHistory := StreamEndInfoBride(t.source, tracks, t.transStreams) + sourceHistory := StreamEndInfoBride(t.source, t.transStreams) streamEndInfoManager.Add(sourceHistory) } @@ -720,20 +724,6 @@ func (t *transStreamPublisher) WriteHeader() { // 尝试使用上次结束推流的时间戳 if streamInfo := streamEndInfoManager.Remove(t.source); streamInfo != nil && EqualsTracks(streamInfo, t.originTracks.All()) { t.streamEndInfo = streamInfo - - // 恢复每路track的时间戳 - for _, track := range t.originTracks.All() { - timestamps := streamInfo.Timestamps[track.Stream.CodecID] - track.Dts = timestamps[0] - track.Pts = timestamps[1] - } - } - - // 纠正GOP中的时间戳 - if t.gopBuffer != nil && t.gopBuffer.Size() != 0 { - t.gopBuffer.PeekAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { - t.CorrectTimestamp(packet.Get()) - }) } // 创建录制流和HLS @@ -810,8 +800,6 @@ func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*av // track解析完毕后,才能生成传输流 if t.completed.Load() { - t.CorrectTimestamp(packet.Get()) - // 分发给各个传输流 t.DispatchPacket(packet.Get()) @@ -819,6 +807,7 @@ func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*av for _, track := range t.transcodeTracks { transcodePackets := track.Input(packet.Get()) for _, transcodePkt := range transcodePackets { + //log.Sugar.Infof("packet dts: %d, pts: %d, t dts: %d, pts: %d", packet.Get().Dts, packet.Get().Pts, transcodePkt.Dts, transcodePkt.Pts) t.DispatchPacket(transcodePkt) } } @@ -844,35 +833,6 @@ func (t *transStreamPublisher) OnNewTrack(track *Track) { } } -// CorrectTimestamp 纠正时间戳 -func (t *transStreamPublisher) CorrectTimestamp(packet *avformat.AVPacket) { - // 对比第一包的时间戳和上次推流的最后时间戳。如果小于上次的推流时间戳,则在原来的基础上累加。 - if t.streamEndInfo != nil && !t.timestampModeDecided { - t.timestampModeDecided = true - - timestamps := t.streamEndInfo.Timestamps[packet.CodecID] - t.accumulateTimestamps = true - log.Sugar.Infof("使用上次推流的时间戳 dts: %d, pts: %d", timestamps[0], timestamps[1]) - } - - track := t.originTracks.Find(packet.CodecID) - if track == nil { - return - } - duration := packet.GetDuration(packet.Timebase) - - // 根据duration来累加时间戳 - if t.accumulateTimestamps { - offset := packet.Pts - packet.Dts - packet.Dts = track.Dts + duration - packet.Pts = packet.Dts + offset - } - - track.Dts = packet.Dts - track.Pts = packet.Pts - track.FrameDuration = int(duration) -} - func (t *transStreamPublisher) GetTransStreams() map[TransStreamID]TransStream { return t.transStreams }