diff --git a/codec/mpegps/mpegps.go b/codec/mpegps/mpegps.go index 1e1d4f8..cb6217c 100644 --- a/codec/mpegps/mpegps.go +++ b/codec/mpegps/mpegps.go @@ -37,7 +37,7 @@ type MpegPsStream struct { video MpegPsEsStream } -func (ps *MpegPsStream) Drop() { +func (ps *MpegPsStream) Reset() { ps.buffer.Reset() ps.audio.Reset() if ps.video.Buffer.CanRead() { diff --git a/codec/mpegps/ps-demuxer.go b/codec/mpegps/ps-demuxer.go index 11ab7dd..79028a7 100644 --- a/codec/mpegps/ps-demuxer.go +++ b/codec/mpegps/ps-demuxer.go @@ -166,7 +166,7 @@ func (psdemuxer *PSDemuxer) Feed(data []byte) error { return ret } -func (psdemuxer *PSDemuxer) Drop() { +func (psdemuxer *PSDemuxer) Reset() { psdemuxer.cache = psdemuxer.cache[:0] for _, stream := range psdemuxer.streamMap { if len(stream.streamBuf) == 0 { diff --git a/common/dtsestimator.go b/common/dtsestimator.go index a7ee56c..e4d39f8 100644 --- a/common/dtsestimator.go +++ b/common/dtsestimator.go @@ -58,10 +58,12 @@ func (d *DTSEstimator) Feed(pts uint32) uint32 { dts = d.cache[0] } - if d.prevDTS > dts { - dts = d.prevDTS + // if d.prevDTS > dts { + // dts = d.prevDTS + // } + if d.prevDTS >= dts { + dts = d.prevDTS + 90 } - d.prevPTS = pts d.prevDTS = dts return dts diff --git a/common/frame.go b/common/frame.go index ed21966..238678f 100644 --- a/common/frame.go +++ b/common/frame.go @@ -46,8 +46,7 @@ func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame { type BaseFrame struct { DeltaTime uint32 // 相对上一帧时间戳,毫秒 - AbsTime uint32 // 绝对时间戳,毫秒 - Timestamp time.Time // 写入时间,可用于比较两个帧的先后 + WriteTime time.Time // 写入时间,可用于比较两个帧的先后 Sequence uint32 // 在一个Track中的序号 BytesIn int // 输入字节数用于计算BPS } @@ -59,21 +58,22 @@ type DataFrame[T any] struct { type AVFrame struct { BaseFrame - IFrame bool - CanRead bool `json:"-"` - PTS uint32 - DTS uint32 - ADTS *util.ListItem[util.Buffer] `json:"-"` // ADTS头 - AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format) - RTP util.List[RTPFrame] `json:"-"` - AUList util.BLLs `json:"-"` // 裸数据 + IFrame bool + CanRead bool `json:"-"` + PTS time.Duration + DTS time.Duration + Timestamp time.Duration // 绝对时间戳 + ADTS *util.ListItem[util.Buffer] `json:"-"` // ADTS头 + AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format) + RTP util.List[RTPFrame] `json:"-"` + AUList util.BLLs `json:"-"` // 裸数据 } func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) { if ts == 0 { ts = 1 } - av.AbsTime = ts + av.Timestamp = time.Duration(ts) * time.Millisecond av.BytesIn += frame.ByteLength for { item := frame.Shift() @@ -96,7 +96,7 @@ func (av *AVFrame) Reset() { av.ADTS = nil } av.BytesIn = 0 - av.AbsTime = 0 + av.Timestamp = 0 av.DeltaTime = 0 } diff --git a/common/index.go b/common/index.go index d4af0b8..acebfeb 100644 --- a/common/index.go +++ b/common/index.go @@ -68,7 +68,7 @@ func (bt *Base) SnapForJson() { } func (bt *Base) Flush(bf *BaseFrame) { bt.ComputeBPS(bf.BytesIn) - bf.Timestamp = time.Now() + bf.WriteTime = time.Now() } func (bt *Base) SetStuff(stuff ...any) { for _, s := range stuff { diff --git a/http.go b/http.go index becedc6..1ca78b8 100644 --- a/http.go +++ b/http.go @@ -4,6 +4,7 @@ import ( "encoding/json" "net/http" "os" + "strings" "time" "gopkg.in/yaml.v3" @@ -227,11 +228,34 @@ func (conf *GlobalConfig) API_replay_rtpdump(w http.ResponseWriter, r *http.Requ default: pub.ACodec = codec.CodecID_AAC } - pub.DumpFile = dumpFile - if err := Engine.Publish(streamPath, &pub); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + ss := strings.Split(dumpFile, ",") + if len(ss) > 1 { + if err := Engine.Publish(streamPath, &pub); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + for _, s := range ss { + f, err := os.Open(s) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + go pub.Feed(f) + } + w.Write([]byte("ok")) + } } else { - w.Write([]byte("ok")) + f, err := os.Open(dumpFile) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := Engine.Publish(streamPath, &pub); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + pub.SetIO(f) + w.Write([]byte("ok")) + go pub.Feed(f) + } } } @@ -245,12 +269,42 @@ func (conf *GlobalConfig) API_replay_ts(w http.ResponseWriter, r *http.Request) if dumpFile == "" { dumpFile = streamPath + ".ts" } + f, err := os.Open(dumpFile) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } var pub TSPublisher if err := Engine.Publish(streamPath, &pub); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } else { - f, _ := os.Open(dumpFile) + pub.SetIO(f) go pub.Feed(f) w.Write([]byte("ok")) } } + +func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + streamPath := q.Get("streamPath") + if streamPath == "" { + streamPath = "dump/mp4" + } + dumpFile := q.Get("dump") + if dumpFile == "" { + dumpFile = streamPath + ".mp4" + } + var pub MP4Publisher + f, err := os.Open(dumpFile) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := Engine.Publish(streamPath, &pub); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + pub.SetIO(f) + w.Write([]byte("ok")) + go pub.ReadMP4Data(f) + } +} diff --git a/publisher-mp4.go b/publisher-mp4.go new file mode 100644 index 0000000..253f541 --- /dev/null +++ b/publisher-mp4.go @@ -0,0 +1,61 @@ +package engine + +import ( + "io" + + "github.com/yapingcat/gomedia/go-mp4" + "go.uber.org/zap" + "m7s.live/engine/v4/track" +) + +type MP4Publisher struct { + Publisher + *mp4.MovDemuxer `json:"-"` +} + +// Start reading the MP4 file +func (p *MP4Publisher) ReadMP4Data(source io.ReadSeeker) error { + defer p.Stop() + p.MovDemuxer = mp4.CreateMp4Demuxer(source) + if tracks, err := p.ReadHead(); err != nil { + if err == io.EOF { + p.Info("Reached end of MP4 file") + return nil + } + p.Error("Error reading MP4 header", zap.Error(err)) + return err + } else { + info := p.GetMp4Info() + p.Info("MP4 info", zap.Any("info", info)) + for _, t := range tracks { + p.Info("MP4 track", zap.Any("track", t)) + switch t.Cid { + case mp4.MP4_CODEC_H264: + p.VideoTrack = track.NewH264(p.Stream) + case mp4.MP4_CODEC_H265: + p.VideoTrack = track.NewH265(p.Stream) + case mp4.MP4_CODEC_AAC: + p.AudioTrack = track.NewAAC(p.Stream) + case mp4.MP4_CODEC_G711A: + p.AudioTrack = track.NewG711(p.Stream, true) + case mp4.MP4_CODEC_G711U: + p.AudioTrack = track.NewG711(p.Stream, false) + } + } + for { + pkg, err := p.ReadPacket() + if err != nil { + p.Error("Error reading MP4 packet", zap.Error(err)) + return err + } + switch pkg.Cid { + case mp4.MP4_CODEC_H264, mp4.MP4_CODEC_H265: + p.VideoTrack.WriteAnnexB(uint32(pkg.Pts), uint32(pkg.Dts), pkg.Data) + case mp4.MP4_CODEC_AAC: + p.AudioTrack.WriteADTS(uint32(pkg.Pts), pkg.Data) + case mp4.MP4_CODEC_G711A, mp4.MP4_CODEC_G711U: + p.AudioTrack.WriteRaw(uint32(pkg.Pts), pkg.Data) + } + } + } +} diff --git a/publisher-ps.go b/publisher-ps.go index 60904da..ae75f5c 100644 --- a/publisher-ps.go +++ b/publisher-ps.go @@ -22,9 +22,9 @@ type PSPublisher struct { // mpegps.MpegPsStream `json:"-"` // *mpegps.PSDemuxer `json:"-"` mpegps.DecPSPackage `json:"-"` - reorder util.RTPReorder[*cacheItem] - pool util.BytesPool - lastSeq uint16 + reorder util.RTPReorder[*cacheItem] + pool util.BytesPool + lastSeq uint16 } // 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt @@ -32,15 +32,13 @@ func (p *PSPublisher) PushPS(rtp *rtp.Packet) { if p.Stream == nil { return } - if p.EsHandler == nil { + if p.pool == nil { // p.PSDemuxer = mpegps.NewPSDemuxer() // p.PSDemuxer.OnPacket = p.OnPacket // p.PSDemuxer.OnFrame = p.OnFrame p.EsHandler = p p.lastSeq = rtp.SequenceNumber - 1 - if p.pool == nil { - p.pool = make(util.BytesPool, 17) - } + p.pool = make(util.BytesPool, 17) } if p.DisableReorder { p.Feed(rtp.Payload) diff --git a/publisher-rtpdump.go b/publisher-rtpdump.go index fa9555b..94e88d0 100644 --- a/publisher-rtpdump.go +++ b/publisher-rtpdump.go @@ -2,8 +2,10 @@ package engine import ( "os" + "sync" "time" + "github.com/aler9/gortsplib/pkg/mpeg4audio" "github.com/pion/webrtc/v3/pkg/media/rtpdump" "go.uber.org/zap" "m7s.live/engine/v4/codec" @@ -14,56 +16,84 @@ import ( type RTPDumpPublisher struct { Publisher - DumpFile string - VCodec codec.VideoCodecID - ACodec codec.AudioCodecID - file *os.File + VCodec codec.VideoCodecID + ACodec codec.AudioCodecID + other *rtpdump.Packet + sync.Mutex } -func (t *RTPDumpPublisher) OnEvent(event any) { - var err error - t.Publisher.OnEvent(event) - switch event.(type) { - case IPublisher: - t.file, err = os.Open(t.DumpFile) - if err != nil { - t.Stream.Error("RTPDumpPublisher open file error", zap.Error(err)) - return - } - r, h, err := rtpdump.NewReader(t.file) - if err != nil { - t.Stream.Error("RTPDumpPublisher open file error", zap.Error(err)) - return - } - t.Stream.Info("RTPDumpPublisher open file success", zap.String("file", t.DumpFile), zap.String("start", h.Start.String()), zap.String("source", h.Source.String()), zap.Uint16("port", h.Port)) +func (t *RTPDumpPublisher) Feed(file *os.File) { + r, h, err := rtpdump.NewReader(file) + if err != nil { + t.Stream.Error("RTPDumpPublisher open file error", zap.Error(err)) + return + } + t.Stream.Info("RTPDumpPublisher open file success", zap.String("file", file.Name()), zap.String("start", h.Start.String()), zap.String("source", h.Source.String()), zap.Uint16("port", h.Port)) + if t.VideoTrack == nil { switch t.VCodec { case codec.CodecID_H264: t.VideoTrack = track.NewH264(t.Publisher.Stream) case codec.CodecID_H265: t.VideoTrack = track.NewH265(t.Publisher.Stream) } + t.VideoTrack.SetSpeedLimit(500 * time.Millisecond) + } + if t.AudioTrack == nil { switch t.ACodec { case codec.CodecID_AAC: - t.AudioTrack = track.NewAAC(t.Publisher.Stream) + at := track.NewAAC(t.Publisher.Stream) + t.AudioTrack = at + var c mpeg4audio.Config + c.ChannelCount = 2 + c.SampleRate = 48000 + asc, _ := c.Marshal() + at.WriteSequenceHead(append([]byte{0xAF, 0x00}, asc...)) case codec.CodecID_PCMA: t.AudioTrack = track.NewG711(t.Publisher.Stream, true) case codec.CodecID_PCMU: t.AudioTrack = track.NewG711(t.Publisher.Stream, false) } - t.VideoTrack.SetSpeedLimit(500 * time.Millisecond) t.AudioTrack.SetSpeedLimit(500 * time.Millisecond) - for { - packet, err := r.Next() - if err != nil { - t.Stream.Error("RTPDumpPublisher read file error", zap.Error(err)) - return - } - if !packet.IsRTCP { - var frame common.RTPFrame - frame.Unmarshal(packet.Payload) - t.VideoTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame}) - } - // t.AudioTrack.WriteRTP(packet) + } + needLock := true + for { + packet, err := r.Next() + if err != nil { + t.Stream.Error("RTPDumpPublisher read file error", zap.Error(err)) + return } + if packet.IsRTCP { + continue + } + if needLock { + t.Lock() + } + if t.other == nil { + t.other = &packet + t.Unlock() + needLock = true + continue + } + if packet.Offset > t.other.Offset { + t.WriteRTP(t.other.Payload) + t.other = &packet + t.Unlock() + needLock = true + continue + } + needLock = false + t.WriteRTP(packet.Payload) + } +} +func (t *RTPDumpPublisher) WriteRTP(raw []byte) { + var frame common.RTPFrame + frame.Unmarshal(raw) + switch frame.PayloadType { + case 96: + t.VideoTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame}) + case 97: + t.AudioTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame}) + default: + t.Stream.Warn("RTPDumpPublisher unknown payload type", zap.Uint8("payloadType", frame.PayloadType)) } } diff --git a/subscriber.go b/subscriber.go index ff181a3..a6e378d 100644 --- a/subscriber.go +++ b/subscriber.go @@ -5,6 +5,7 @@ import ( "io" "net" "strconv" + "time" "go.uber.org/zap" "m7s.live/engine/v4/codec" @@ -206,11 +207,11 @@ func (s *Subscriber) PlayBlock(subType byte) { case SUBTYPE_RAW: sendVideoFrame = func(frame *AVFrame) { // println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame) - spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipRTPTs, frame.DTS - s.VideoReader.SkipRTPTs}) + spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()}) } sendAudioFrame = func(frame *AVFrame) { // println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime) - spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, frame.PTS - s.AudioReader.SkipRTPTs, frame.PTS - s.AudioReader.SkipRTPTs}) + spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()}) } case SUBTYPE_RTP: var videoSeq, audioSeq uint16 @@ -218,7 +219,9 @@ func (s *Subscriber) PlayBlock(subType byte) { // fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame) frame.RTP.Range(func(vp RTPFrame) bool { videoSeq++ - vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipRTPTs + copy := *vp.Packet + vp.Packet = © + vp.Header.Timestamp = vp.Header.Timestamp - uint32(s.VideoReader.SkipTs*90/time.Millisecond) vp.Header.SequenceNumber = videoSeq spesic.OnEvent((VideoRTP)(vp)) return true @@ -229,8 +232,10 @@ func (s *Subscriber) PlayBlock(subType byte) { // fmt.Println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime) frame.RTP.Range(func(ap RTPFrame) bool { audioSeq++ + copy := *ap.Packet + ap.Packet = © ap.Header.SequenceNumber = audioSeq - ap.Header.Timestamp = ap.Header.Timestamp - s.AudioReader.Track.MpegTs2RTPTs(s.AudioReader.SkipRTPTs) + ap.Header.Timestamp = ap.Header.Timestamp - uint32(s.AudioReader.SkipTs/time.Millisecond*time.Duration(s.AudioReader.Track.SampleRate)/1000) spesic.OnEvent((AudioRTP)(ap)) return true }) @@ -254,7 +259,7 @@ func (s *Subscriber) PlayBlock(subType byte) { sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, s.AudioReader.Track.SequenceHead) } sendVideoFrame = func(frame *AVFrame) { - // println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame) + // fmt.Println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame) // b := util.Buffer(frame.AVCC.ToBytes()[5:]) // for b.CanRead() { // nalulen := int(b.ReadUint32()) @@ -278,7 +283,7 @@ func (s *Subscriber) PlayBlock(subType byte) { subMode, _ = strconv.Atoi(s.Args.Get(conf.SubModeArgName)) } var videoFrame, audioFrame *AVFrame - var lastAbsTime uint32 + var lastAbsTime time.Duration for ctx.Err() == nil { if hasVideo { @@ -295,17 +300,18 @@ func (s *Subscriber) PlayBlock(subType byte) { } if hasAudio { if audioFrame != nil { - if frame.AbsTime > lastAbsTime { + if frame.Timestamp > lastAbsTime { // fmt.Println("switch audio", audioFrame.CanRead) if audioFrame.CanRead { sendAudioFrame(audioFrame) } + audioFrame = nil videoFrame = frame - lastAbsTime = frame.AbsTime + lastAbsTime = frame.Timestamp break } } else if lastAbsTime == 0 { - if lastAbsTime = frame.AbsTime; lastAbsTime != 0 { + if lastAbsTime = frame.Timestamp; lastAbsTime != 0 { videoFrame = frame break } @@ -330,7 +336,6 @@ func (s *Subscriber) PlayBlock(subType byte) { case track.READSTATE_NORMAL: if s.Video != nil { s.AudioReader.SkipTs = s.VideoReader.SkipTs - s.AudioReader.SkipRTPTs = s.AudioReader.Track.Ms2MpegTs(s.AudioReader.SkipTs) } } s.AudioReader.Read(ctx, subMode) @@ -344,17 +349,18 @@ func (s *Subscriber) PlayBlock(subType byte) { sendAudioDecConf() } if hasVideo && videoFrame != nil { - if frame.AbsTime > lastAbsTime { + if frame.Timestamp > lastAbsTime { // fmt.Println("switch video", videoFrame.CanRead) if videoFrame.CanRead { sendVideoFrame(videoFrame) } + videoFrame = nil audioFrame = frame - lastAbsTime = frame.AbsTime + lastAbsTime = frame.Timestamp break } } - if frame.AbsTime >= s.AudioReader.SkipTs { + if frame.Timestamp >= s.AudioReader.SkipTs { sendAudioFrame(frame) } else { // fmt.Println("skip audio", frame.AbsTime, s.AudioReader.SkipTs) diff --git a/track/base.go b/track/base.go index 3f83ef8..7f7701a 100644 --- a/track/base.go +++ b/track/base.go @@ -11,20 +11,20 @@ import ( ) type 流速控制 struct { - 起始时间戳 uint32 - 起始时间 time.Time + 起始时间戳 time.Duration 等待上限 time.Duration + 起始时间 time.Time } -func (p *流速控制) 重置(绝对时间戳 uint32) { +func (p *流速控制) 重置(绝对时间戳 time.Duration) { p.起始时间 = time.Now() p.起始时间戳 = 绝对时间戳 // println("重置", p.起始时间.Format("2006-01-02 15:04:05"), p.起始时间戳) } -func (p *流速控制) 时间戳差(绝对时间戳 uint32) time.Duration { - return time.Duration(绝对时间戳-p.起始时间戳) * time.Millisecond +func (p *流速控制) 时间戳差(绝对时间戳 time.Duration) time.Duration { + return 绝对时间戳 - p.起始时间戳 } -func (p *流速控制) 控制流速(绝对时间戳 uint32) { +func (p *流速控制) 控制流速(绝对时间戳 time.Duration) { 数据时间差, 实际时间差 := p.时间戳差(绝对时间戳), time.Since(p.起始时间) // println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05")) // if 实际时间差 > 数据时间差 { @@ -83,10 +83,9 @@ type Media struct { RtpPool util.Pool[RTPFrame] `json:"-"` SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) SequenceHeadSeq int - RTPMuxer RTPDemuxer SpesificTrack `json:"-"` - deltaTs int64 //用于接续发布后时间戳连续 + deltaTs time.Duration //用于接续发布后时间戳连续 流速控制 } @@ -103,20 +102,6 @@ func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) { return } -// 毫秒转换为Mpeg时间戳 -func (av *Media) Ms2MpegTs(ms uint32) uint32 { - return uint32(uint64(ms) * 90) -} - -// Mpeg时间戳转换为毫秒 -func (av *Media) MpegTs2Ms(mpegTs uint32) uint32 { - return uint32(uint64(mpegTs) / 90) -} - -func (av *Media) MpegTs2RTPTs(mpegTs uint32) uint32 { - return uint32(uint64(mpegTs) * uint64(av.SampleRate) / 90000) -} - // 为json序列化而计算的数据 func (av *Media) SnapForJson() { v := av.LastValue @@ -158,7 +143,7 @@ func (av *Media) SetStuff(stuff ...any) { } func (av *Media) LastWriteTime() time.Time { - return av.LastValue.Timestamp + return av.LastValue.WriteTime } // func (av *Media) Play(ctx context.Context, onMedia func(*AVFrame) error) error { @@ -180,8 +165,8 @@ func (av *Media) PreFrame() *AVFrame { } func (av *Media) generateTimestamp(ts uint32) { - av.Value.PTS = ts - av.Value.DTS = ts + av.Value.PTS = time.Duration(ts) + av.Value.DTS = time.Duration(ts) } func (av *Media) WriteSequenceHead(sh []byte) { @@ -221,17 +206,17 @@ func (av *Media) Flush() { curValue, preValue, nextValue := &av.Value, av.LastValue, av.Next() if av.State == TrackStateOffline { av.State = TrackStateOnline - av.deltaTs = int64(preValue.AbsTime) - int64(curValue.AbsTime) + int64(preValue.DeltaTime) + av.deltaTs = preValue.Timestamp - curValue.Timestamp + time.Duration(preValue.DeltaTime)*time.Millisecond av.Info("track back online") } if av.deltaTs != 0 { - rtpts := int64(av.deltaTs) * 90 - curValue.DTS = uint32(int64(curValue.DTS) + rtpts) - curValue.PTS = uint32(int64(curValue.PTS) + rtpts) - curValue.AbsTime = 0 + rtpts := av.deltaTs * 90 / 1000 + curValue.DTS = curValue.DTS + rtpts + curValue.PTS = curValue.PTS + rtpts + curValue.Timestamp = 0 } bufferTime := av.Stream.GetPublisherConfig().BufferTime - if bufferTime > 0 && av.IDRingList.Length > 1 && time.Duration(curValue.AbsTime-av.IDRingList.Next.Next.Value.Value.AbsTime)*time.Millisecond > bufferTime { + if bufferTime > 0 && av.IDRingList.Length > 1 && curValue.Timestamp-av.IDRingList.Next.Next.Value.Value.Timestamp > bufferTime { av.ShiftIDR() av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence)) } @@ -247,17 +232,17 @@ func (av *Media) Flush() { if av.起始时间.IsZero() { curValue.DeltaTime = 0 - if curValue.AbsTime == 0 { - curValue.AbsTime = uint32(time.Since(av.Stream.GetStartTime()).Milliseconds()) + if curValue.Timestamp == 0 { + curValue.Timestamp = time.Since(av.Stream.GetStartTime()) } - av.重置(curValue.AbsTime) - } else if curValue.AbsTime == 0 { - curValue.DeltaTime = (curValue.DTS - preValue.DTS) / 90 - curValue.AbsTime = preValue.AbsTime + curValue.DeltaTime + av.重置(curValue.Timestamp) } else { - curValue.DeltaTime = curValue.AbsTime - preValue.AbsTime + if curValue.Timestamp == 0 { + curValue.Timestamp = (preValue.Timestamp*90 + (curValue.DTS-preValue.DTS)*time.Millisecond) / 90 + } + curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond) } - // fmt.Println(av.Name, curValue.DTS, curValue.AbsTime, curValue.DeltaTime) + // fmt.Println(av.Name, curValue.DTS, curValue.Timestamp, curValue.DeltaTime) if curValue.AUList.Length > 0 { // 补完RTP if config.Global.EnableRTP && curValue.RTP.Length == 0 { @@ -270,7 +255,7 @@ func (av *Media) Flush() { } av.Base.Flush(&curValue.BaseFrame) if av.等待上限 > 0 { - av.控制流速(curValue.AbsTime) + av.控制流速(curValue.Timestamp) } preValue = curValue curValue = av.MoveNext() diff --git a/track/data.go b/track/data.go index 59de88e..3bf7e50 100644 --- a/track/data.go +++ b/track/data.go @@ -22,7 +22,7 @@ func (d *Data) ReadRing() *LockRing[any] { } func (d *Data) LastWriteTime() time.Time { - return d.LockRing.RingBuffer.LastValue.Timestamp + return d.LockRing.RingBuffer.LastValue.WriteTime } func (dt *Data) Push(data any) { diff --git a/track/h264.go b/track/h264.go index 39dfdfe..88bbc34 100644 --- a/track/h264.go +++ b/track/h264.go @@ -32,8 +32,11 @@ func (vt *H264) WriteSliceBytes(slice []byte) { // vt.Info("naluType", zap.Uint8("naluType", naluType.Byte())) switch naluType { case codec.NALU_SPS: - vt.SPSInfo, _ = codec.ParseSPS(slice) - vt.Debug("SPS", zap.Any("SPSInfo", vt.SPSInfo)) + spsInfo, _ := codec.ParseSPS(slice) + if spsInfo.Width != vt.SPSInfo.Width || spsInfo.Height != vt.SPSInfo.Height { + vt.Debug("SPS", zap.Any("SPSInfo", spsInfo)) + } + vt.SPSInfo = spsInfo vt.Video.SPS = slice vt.ParamaterSets[0] = slice case codec.NALU_PPS: @@ -132,44 +135,40 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) { } } } - frame.SequenceNumber += vt.rtpSequence //增加偏移,需要增加rtp包后需要顺延 - if frame.Marker { + if frame.Marker && rv.AUList.ByteLength > 0 { vt.generateTimestamp(frame.Timestamp) + if !vt.dcChanged && rv.IFrame { + vt.insertDCRtp() + } vt.Flush() } } // RTP格式补完 func (vt *H264) CompleteRTP(value *AVFrame) { - if value.RTP.Length > 0 { - if !vt.dcChanged && value.IFrame { - vt.insertDCRtp() - } - } else { - var out [][][]byte - if value.IFrame { - out = append(out, [][]byte{vt.SPS}, [][]byte{vt.PPS}) - } - vt.Value.AUList.Range(func(au *util.BLL) bool { - if au.ByteLength < RTPMTU { - out = append(out, au.ToBuffers()) - } else { - var naluType codec.H264NALUType - r := au.NewReader() - b0, _ := r.ReadByte() - naluType = naluType.Parse(b0) - b0 = codec.NALU_FUA.Or(b0 & 0x60) - buf := [][]byte{{b0, naluType.Or(1 << 7)}} - buf = append(buf, r.ReadN(RTPMTU-2)...) - out = append(out, buf) - for bufs := r.ReadN(RTPMTU); len(bufs) > 0; bufs = r.ReadN(RTPMTU) { - buf = append([][]byte{{b0, naluType.Byte()}}, bufs...) - out = append(out, buf) - } - buf[0][1] |= 1 << 6 // set end bit - } - return true - }) - vt.PacketizeRTP(out...) + var out [][][]byte + if value.IFrame { + out = append(out, [][]byte{vt.SPS}, [][]byte{vt.PPS}) } + vt.Value.AUList.Range(func(au *util.BLL) bool { + if au.ByteLength < RTPMTU { + out = append(out, au.ToBuffers()) + } else { + var naluType codec.H264NALUType + r := au.NewReader() + b0, _ := r.ReadByte() + naluType = naluType.Parse(b0) + b0 = codec.NALU_FUA.Or(b0 & 0x60) + buf := [][]byte{{b0, naluType.Or(1 << 7)}} + buf = append(buf, r.ReadN(RTPMTU-2)...) + out = append(out, buf) + for bufs := r.ReadN(RTPMTU); len(bufs) > 0; bufs = r.ReadN(RTPMTU) { + buf = append([][]byte{{b0, naluType.Byte()}}, bufs...) + out = append(out, buf) + } + buf[0][1] |= 1 << 6 // set end bit + } + return true + }) + vt.PacketizeRTP(out...) } diff --git a/track/h265.go b/track/h265.go index 29e5b70..fa6c4e2 100644 --- a/track/h265.go +++ b/track/h265.go @@ -38,7 +38,11 @@ func (vt *H265) WriteSliceBytes(slice []byte) { case codec.NAL_UNIT_SPS: vt.SPS = slice vt.ParamaterSets[1] = slice - vt.SPSInfo, _ = codec.ParseHevcSPS(slice) + spsInfo, _ := codec.ParseHevcSPS(slice) + if spsInfo.Width != vt.SPSInfo.Width || spsInfo.Height != vt.SPSInfo.Height { + vt.Debug("SPS", zap.Any("SPSInfo", spsInfo)) + } + vt.SPSInfo = spsInfo case codec.NAL_UNIT_PPS: vt.PPS = slice vt.ParamaterSets[2] = slice @@ -112,6 +116,9 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) { } } case codec.NAL_UNIT_RTP_FU: + if !buffer.CanReadN(3) { + return + } first3 := buffer.ReadN(3) fuHeader := first3[2] if usingDonlField { @@ -120,49 +127,47 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) { if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) { vt.WriteSliceByte(first3[0]&0b10000001|(naluType<<1), first3[1]) } - rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(buffer)) + if rv.AUList.Pre != nil { + rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(buffer)) + } default: vt.WriteSliceBytes(frame.Payload) } - frame.SequenceNumber += vt.rtpSequence //增加偏移,需要增加rtp包后需要顺延 if frame.Marker { vt.generateTimestamp(frame.Timestamp) + if !vt.dcChanged && rv.IFrame { + vt.insertDCRtp() + } vt.Flush() } } // RTP格式补完 func (vt *H265) CompleteRTP(value *AVFrame) { - if value.RTP.Length > 0 { - if !vt.dcChanged && value.IFrame { - vt.insertDCRtp() - } - } else { - // H265打包: https://blog.csdn.net/fanyun_01/article/details/114234290 - var out [][][]byte - if value.IFrame { - out = append(out, [][]byte{vt.VPS}, [][]byte{vt.SPS}, [][]byte{vt.PPS}) - } - for au := vt.Value.AUList.Next; au != nil && au != &vt.Value.AUList.ListItem; au = au.Next { - if au.Value.ByteLength < RTPMTU { - out = append(out, au.Value.ToBuffers()) - } else { - var naluType codec.H265NALUType - r := au.Value.NewReader() - b0, _ := r.ReadByte() - b1, _ := r.ReadByte() - naluType = naluType.Parse(b0) - b0 = (byte(codec.NAL_UNIT_RTP_FU) << 1) | (b0 & 0b10000001) - buf := [][]byte{{b0, b1, (1 << 7) | byte(naluType)}} - buf = append(buf, r.ReadN(RTPMTU-3)...) - out = append(out, buf) - for bufs := r.ReadN(RTPMTU); len(bufs) > 0; bufs = r.ReadN(RTPMTU) { - buf = append([][]byte{{b0, b1, byte(naluType)}}, bufs...) - out = append(out, buf) - } - buf[0][2] |= 1 << 6 // set end bit - } - } - vt.PacketizeRTP(out...) + // H265打包: https://blog.csdn.net/fanyun_01/article/details/114234290 + var out [][][]byte + if value.IFrame { + out = append(out, [][]byte{vt.VPS}, [][]byte{vt.SPS}, [][]byte{vt.PPS}) } + for au := vt.Value.AUList.Next; au != nil && au != &vt.Value.AUList.ListItem; au = au.Next { + if au.Value.ByteLength < RTPMTU { + out = append(out, au.Value.ToBuffers()) + } else { + var naluType codec.H265NALUType + r := au.Value.NewReader() + b0, _ := r.ReadByte() + b1, _ := r.ReadByte() + naluType = naluType.Parse(b0) + b0 = (byte(codec.NAL_UNIT_RTP_FU) << 1) | (b0 & 0b10000001) + buf := [][]byte{{b0, b1, (1 << 7) | byte(naluType)}} + buf = append(buf, r.ReadN(RTPMTU-3)...) + out = append(out, buf) + for bufs := r.ReadN(RTPMTU); len(bufs) > 0; bufs = r.ReadN(RTPMTU) { + buf = append([][]byte{{b0, b1, byte(naluType)}}, bufs...) + out = append(out, buf) + } + buf[0][2] |= 1 << 6 // set end bit + } + } + vt.PacketizeRTP(out...) } diff --git a/track/reader-av.go b/track/reader-av.go index bb7ac26..ba98bcb 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -23,14 +23,14 @@ type AVRingReader struct { Poll time.Duration State byte FirstSeq uint32 - FirstTs uint32 - SkipTs uint32 - SkipRTPTs uint32 - beforeJump uint32 + FirstTs time.Duration + SkipTs time.Duration //ms + beforeJump time.Duration ConfSeq int startTime time.Time Frame *common.AVFrame AbsTime uint32 + Delay uint32 *zap.Logger } @@ -96,12 +96,11 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { } r.startTime = time.Now() if r.FirstTs == 0 { - r.FirstTs = r.Frame.AbsTime + r.FirstTs = r.Frame.Timestamp } r.SkipTs = r.FirstTs - r.SkipRTPTs = r.Track.Ms2MpegTs(r.SkipTs) r.FirstSeq = r.Frame.Sequence - r.Info("first frame read", zap.Uint32("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq)) + r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq)) case READSTATE_FIRST: if r.Track.IDRing.Value.Sequence != r.FirstSeq { r.Ring = r.Track.IDRing @@ -109,14 +108,13 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { if err = r.ctx.Err(); err != nil { return } - r.SkipTs = frame.AbsTime - r.beforeJump - r.SkipRTPTs = r.Track.Ms2MpegTs(r.SkipTs) - r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Uint32("skipTs", r.SkipTs)) + r.SkipTs = frame.Timestamp - r.beforeJump + r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs)) r.State = READSTATE_NORMAL } else { r.MoveNext() frame := r.ReadFrame() - r.beforeJump = frame.AbsTime - r.FirstTs + r.beforeJump = frame.Timestamp - r.FirstTs // 防止过快消费 if fast := time.Duration(r.beforeJump)*time.Millisecond - time.Since(r.startTime); fast > 0 && fast < time.Second { time.Sleep(fast) @@ -126,12 +124,18 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { r.MoveNext() r.ReadFrame() } - r.AbsTime = r.Frame.AbsTime - r.SkipTs + r.AbsTime = uint32((r.Frame.Timestamp - r.SkipTs).Milliseconds()) + r.Delay = uint32((r.Track.LastValue.Timestamp - r.Frame.Timestamp).Milliseconds()) // println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime) return } - +func (r *AVRingReader) GetPTS32() uint32 { + return uint32((r.Frame.PTS - r.SkipTs * 90 / time.Millisecond)) +} +func (r *AVRingReader) GetDTS32() uint32 { + return uint32((r.Frame.DTS - r.SkipTs * 90 / time.Millisecond)) +} func (r *AVRingReader) ResetAbsTime() { - r.SkipTs = r.Frame.AbsTime + r.SkipTs = r.Frame.Timestamp r.AbsTime = 0 } diff --git a/track/rtp.go b/track/rtp.go index 93543b9..3778044 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -1,6 +1,8 @@ package track import ( + "time" + "github.com/pion/rtp" "go.uber.org/zap" . "m7s.live/engine/v4/common" @@ -40,15 +42,13 @@ func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) { func (av *Media) PacketizeRTP(payloads ...[][]byte) { packetCount := len(payloads) for i, pp := range payloads { - av.rtpSequence++ rtpItem := av.GetRTPFromPool() packet := &rtpItem.Value packet.Payload = packet.Payload[:0] - packet.SequenceNumber = av.rtpSequence if av.SampleRate != 90000 { - packet.Timestamp = uint32(uint64(av.SampleRate) * uint64(av.Value.PTS) / 90000) + packet.Timestamp = uint32(time.Duration(av.SampleRate) * av.Value.PTS / 90000) } else { - packet.Timestamp = av.Value.PTS + packet.Timestamp = uint32(av.Value.PTS) } packet.Marker = i == packetCount-1 for _, p := range pp { @@ -85,7 +85,3 @@ func (av *RTPDemuxer) recorderRTP(item *util.ListItem[RTPFrame]) (frame *util.Li av.lastSeq = frame.Value.SequenceNumber return } - -type RTPMuxer struct { - rtpSequence uint16 //用于生成下一个rtp包的序号 -} diff --git a/track/video.go b/track/video.go index 9e712ef..08a91fa 100644 --- a/track/video.go +++ b/track/video.go @@ -4,6 +4,9 @@ import ( // . "github.com/logrusorgru/aurora" + "time" + + "github.com/pion/rtp" "go.uber.org/zap" "m7s.live/engine/v4/codec" "m7s.live/engine/v4/common" @@ -95,8 +98,8 @@ func (vt *Video) WriteNalu(pts uint32, dts uint32, nalu []byte) { if dts == 0 { vt.generateTimestamp(pts) } else { - vt.Value.PTS = pts - vt.Value.DTS = dts + vt.Value.PTS = time.Duration(pts) + vt.Value.DTS = time.Duration(dts) } vt.Value.BytesIn += len(nalu) vt.WriteSliceBytes(nalu) @@ -106,8 +109,8 @@ func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) { if dts == 0 { vt.generateTimestamp(pts) } else { - vt.Value.PTS = pts - vt.Value.DTS = dts + vt.Value.PTS = time.Duration(pts) + vt.Value.DTS = time.Duration(dts) } vt.Value.BytesIn += len(frame) common.SplitAnnexB(frame, vt.writeAnnexBSlice, codec.NALU_Delimiter2) @@ -130,8 +133,8 @@ func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) (e error) { if err != nil { return err } - vt.Value.PTS = vt.Ms2MpegTs(ts + cts) - vt.Value.DTS = vt.Ms2MpegTs(ts) + vt.Value.PTS = time.Duration(ts+cts) * 90 + vt.Value.DTS = time.Duration(ts) * 90 // println(":", vt.Value.Sequence) var nalulen uint32 for nalulen, e = r.ReadBE(vt.nalulenSize); e == nil; nalulen, e = r.ReadBE(vt.nalulenSize) { @@ -184,28 +187,21 @@ func (vt *Video) WriteSliceByte(b ...byte) { // 在I帧前面插入sps pps webrtc需要 func (vt *Video) insertDCRtp() { head := vt.Value.RTP.Next - seq := head.Value.SequenceNumber for _, nalu := range vt.ParamaterSets { - var packet RTPFrame + var packet rtp.Packet packet.Version = 2 packet.PayloadType = vt.PayloadType packet.Payload = nalu packet.SSRC = vt.SSRC - packet.Timestamp = vt.Value.PTS + packet.Timestamp = uint32(vt.Value.PTS) packet.Marker = false - head.InsertBeforeValue(packet) - vt.rtpSequence++ + head.InsertBeforeValue(RTPFrame{&packet, nil}) } - vt.Value.RTP.RangeItem(func(item *util.ListItem[RTPFrame]) bool { - item.Value.SequenceNumber = seq - seq++ - return true - }) } func (vt *Video) generateTimestamp(ts uint32) { - vt.Value.PTS = ts - vt.Value.DTS = vt.dtsEst.Feed(ts) + vt.Value.PTS = time.Duration(ts) + vt.Value.DTS = time.Duration(vt.dtsEst.Feed(ts)) } func (vt *Video) SetLostFlag() { @@ -222,7 +218,7 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) { b[1] = 1 // println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS) // 写入CTS - util.PutBE(b[2:5], vt.MpegTs2Ms(rv.PTS-rv.DTS)) + util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90) rv.AVCC.Push(mem) // if rv.AVCC.ByteLength != 5 { // panic("error")