提高时间戳精度,修复rtsp写入和rtp补完问题

This commit is contained in:
dexter
2023-03-23 09:36:49 +08:00
parent 7e42a85ebf
commit 336b0ae759
17 changed files with 363 additions and 227 deletions

View File

@@ -37,7 +37,7 @@ type MpegPsStream struct {
video MpegPsEsStream
}
func (ps *MpegPsStream) Drop() {
func (ps *MpegPsStream) Reset() {
ps.buffer.Reset()
ps.audio.Reset()
if ps.video.Buffer.CanRead() {

View File

@@ -166,7 +166,7 @@ func (psdemuxer *PSDemuxer) Feed(data []byte) error {
return ret
}
func (psdemuxer *PSDemuxer) Drop() {
func (psdemuxer *PSDemuxer) Reset() {
psdemuxer.cache = psdemuxer.cache[:0]
for _, stream := range psdemuxer.streamMap {
if len(stream.streamBuf) == 0 {

View File

@@ -58,10 +58,12 @@ func (d *DTSEstimator) Feed(pts uint32) uint32 {
dts = d.cache[0]
}
if d.prevDTS > dts {
dts = d.prevDTS
// if d.prevDTS > dts {
// dts = d.prevDTS
// }
if d.prevDTS >= dts {
dts = d.prevDTS + 90
}
d.prevPTS = pts
d.prevDTS = dts
return dts

View File

@@ -46,8 +46,7 @@ func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
type BaseFrame struct {
DeltaTime uint32 // 相对上一帧时间戳,毫秒
AbsTime uint32 // 绝对时间戳,毫秒
Timestamp time.Time // 写入时间,可用于比较两个帧的先后
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS
}
@@ -61,8 +60,9 @@ type AVFrame struct {
BaseFrame
IFrame bool
CanRead bool `json:"-"`
PTS uint32
DTS uint32
PTS time.Duration
DTS time.Duration
Timestamp time.Duration // 绝对时间戳
ADTS *util.ListItem[util.Buffer] `json:"-"` // ADTS头
AVCC util.BLL `json:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
RTP util.List[RTPFrame] `json:"-"`
@@ -73,7 +73,7 @@ func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) {
if ts == 0 {
ts = 1
}
av.AbsTime = ts
av.Timestamp = time.Duration(ts) * time.Millisecond
av.BytesIn += frame.ByteLength
for {
item := frame.Shift()
@@ -96,7 +96,7 @@ func (av *AVFrame) Reset() {
av.ADTS = nil
}
av.BytesIn = 0
av.AbsTime = 0
av.Timestamp = 0
av.DeltaTime = 0
}

View File

@@ -68,7 +68,7 @@ func (bt *Base) SnapForJson() {
}
func (bt *Base) Flush(bf *BaseFrame) {
bt.ComputeBPS(bf.BytesIn)
bf.Timestamp = time.Now()
bf.WriteTime = time.Now()
}
func (bt *Base) SetStuff(stuff ...any) {
for _, s := range stuff {

58
http.go
View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"net/http"
"os"
"strings"
"time"
"gopkg.in/yaml.v3"
@@ -227,12 +228,35 @@ func (conf *GlobalConfig) API_replay_rtpdump(w http.ResponseWriter, r *http.Requ
default:
pub.ACodec = codec.CodecID_AAC
}
pub.DumpFile = dumpFile
ss := strings.Split(dumpFile, ",")
if len(ss) > 1 {
if err := Engine.Publish(streamPath, &pub); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
for _, s := range ss {
f, err := os.Open(s)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go pub.Feed(f)
}
w.Write([]byte("ok"))
}
} else {
f, err := os.Open(dumpFile)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := Engine.Publish(streamPath, &pub); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
pub.SetIO(f)
w.Write([]byte("ok"))
go pub.Feed(f)
}
}
}
func (conf *GlobalConfig) API_replay_ts(w http.ResponseWriter, r *http.Request) {
@@ -245,12 +269,42 @@ func (conf *GlobalConfig) API_replay_ts(w http.ResponseWriter, r *http.Request)
if dumpFile == "" {
dumpFile = streamPath + ".ts"
}
f, err := os.Open(dumpFile)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var pub TSPublisher
if err := Engine.Publish(streamPath, &pub); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
f, _ := os.Open(dumpFile)
pub.SetIO(f)
go pub.Feed(f)
w.Write([]byte("ok"))
}
}
func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
streamPath := q.Get("streamPath")
if streamPath == "" {
streamPath = "dump/mp4"
}
dumpFile := q.Get("dump")
if dumpFile == "" {
dumpFile = streamPath + ".mp4"
}
var pub MP4Publisher
f, err := os.Open(dumpFile)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := Engine.Publish(streamPath, &pub); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
pub.SetIO(f)
w.Write([]byte("ok"))
go pub.ReadMP4Data(f)
}
}

61
publisher-mp4.go Normal file
View File

@@ -0,0 +1,61 @@
package engine
import (
"io"
"github.com/yapingcat/gomedia/go-mp4"
"go.uber.org/zap"
"m7s.live/engine/v4/track"
)
type MP4Publisher struct {
Publisher
*mp4.MovDemuxer `json:"-"`
}
// Start reading the MP4 file
func (p *MP4Publisher) ReadMP4Data(source io.ReadSeeker) error {
defer p.Stop()
p.MovDemuxer = mp4.CreateMp4Demuxer(source)
if tracks, err := p.ReadHead(); err != nil {
if err == io.EOF {
p.Info("Reached end of MP4 file")
return nil
}
p.Error("Error reading MP4 header", zap.Error(err))
return err
} else {
info := p.GetMp4Info()
p.Info("MP4 info", zap.Any("info", info))
for _, t := range tracks {
p.Info("MP4 track", zap.Any("track", t))
switch t.Cid {
case mp4.MP4_CODEC_H264:
p.VideoTrack = track.NewH264(p.Stream)
case mp4.MP4_CODEC_H265:
p.VideoTrack = track.NewH265(p.Stream)
case mp4.MP4_CODEC_AAC:
p.AudioTrack = track.NewAAC(p.Stream)
case mp4.MP4_CODEC_G711A:
p.AudioTrack = track.NewG711(p.Stream, true)
case mp4.MP4_CODEC_G711U:
p.AudioTrack = track.NewG711(p.Stream, false)
}
}
for {
pkg, err := p.ReadPacket()
if err != nil {
p.Error("Error reading MP4 packet", zap.Error(err))
return err
}
switch pkg.Cid {
case mp4.MP4_CODEC_H264, mp4.MP4_CODEC_H265:
p.VideoTrack.WriteAnnexB(uint32(pkg.Pts), uint32(pkg.Dts), pkg.Data)
case mp4.MP4_CODEC_AAC:
p.AudioTrack.WriteADTS(uint32(pkg.Pts), pkg.Data)
case mp4.MP4_CODEC_G711A, mp4.MP4_CODEC_G711U:
p.AudioTrack.WriteRaw(uint32(pkg.Pts), pkg.Data)
}
}
}
}

View File

@@ -32,16 +32,14 @@ func (p *PSPublisher) PushPS(rtp *rtp.Packet) {
if p.Stream == nil {
return
}
if p.EsHandler == nil {
if p.pool == nil {
// p.PSDemuxer = mpegps.NewPSDemuxer()
// p.PSDemuxer.OnPacket = p.OnPacket
// p.PSDemuxer.OnFrame = p.OnFrame
p.EsHandler = p
p.lastSeq = rtp.SequenceNumber - 1
if p.pool == nil {
p.pool = make(util.BytesPool, 17)
}
}
if p.DisableReorder {
p.Feed(rtp.Payload)
p.lastSeq = rtp.SequenceNumber

View File

@@ -2,8 +2,10 @@ package engine
import (
"os"
"sync"
"time"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/pion/webrtc/v3/pkg/media/rtpdump"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
@@ -14,56 +16,84 @@ import (
type RTPDumpPublisher struct {
Publisher
DumpFile string
VCodec codec.VideoCodecID
ACodec codec.AudioCodecID
file *os.File
other *rtpdump.Packet
sync.Mutex
}
func (t *RTPDumpPublisher) OnEvent(event any) {
var err error
t.Publisher.OnEvent(event)
switch event.(type) {
case IPublisher:
t.file, err = os.Open(t.DumpFile)
func (t *RTPDumpPublisher) Feed(file *os.File) {
r, h, err := rtpdump.NewReader(file)
if err != nil {
t.Stream.Error("RTPDumpPublisher open file error", zap.Error(err))
return
}
r, h, err := rtpdump.NewReader(t.file)
if err != nil {
t.Stream.Error("RTPDumpPublisher open file error", zap.Error(err))
return
}
t.Stream.Info("RTPDumpPublisher open file success", zap.String("file", t.DumpFile), zap.String("start", h.Start.String()), zap.String("source", h.Source.String()), zap.Uint16("port", h.Port))
t.Stream.Info("RTPDumpPublisher open file success", zap.String("file", file.Name()), zap.String("start", h.Start.String()), zap.String("source", h.Source.String()), zap.Uint16("port", h.Port))
if t.VideoTrack == nil {
switch t.VCodec {
case codec.CodecID_H264:
t.VideoTrack = track.NewH264(t.Publisher.Stream)
case codec.CodecID_H265:
t.VideoTrack = track.NewH265(t.Publisher.Stream)
}
t.VideoTrack.SetSpeedLimit(500 * time.Millisecond)
}
if t.AudioTrack == nil {
switch t.ACodec {
case codec.CodecID_AAC:
t.AudioTrack = track.NewAAC(t.Publisher.Stream)
at := track.NewAAC(t.Publisher.Stream)
t.AudioTrack = at
var c mpeg4audio.Config
c.ChannelCount = 2
c.SampleRate = 48000
asc, _ := c.Marshal()
at.WriteSequenceHead(append([]byte{0xAF, 0x00}, asc...))
case codec.CodecID_PCMA:
t.AudioTrack = track.NewG711(t.Publisher.Stream, true)
case codec.CodecID_PCMU:
t.AudioTrack = track.NewG711(t.Publisher.Stream, false)
}
t.VideoTrack.SetSpeedLimit(500 * time.Millisecond)
t.AudioTrack.SetSpeedLimit(500 * time.Millisecond)
}
needLock := true
for {
packet, err := r.Next()
if err != nil {
t.Stream.Error("RTPDumpPublisher read file error", zap.Error(err))
return
}
if !packet.IsRTCP {
if packet.IsRTCP {
continue
}
if needLock {
t.Lock()
}
if t.other == nil {
t.other = &packet
t.Unlock()
needLock = true
continue
}
if packet.Offset > t.other.Offset {
t.WriteRTP(t.other.Payload)
t.other = &packet
t.Unlock()
needLock = true
continue
}
needLock = false
t.WriteRTP(packet.Payload)
}
}
func (t *RTPDumpPublisher) WriteRTP(raw []byte) {
var frame common.RTPFrame
frame.Unmarshal(packet.Payload)
frame.Unmarshal(raw)
switch frame.PayloadType {
case 96:
t.VideoTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame})
}
// t.AudioTrack.WriteRTP(packet)
}
case 97:
t.AudioTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame})
default:
t.Stream.Warn("RTPDumpPublisher unknown payload type", zap.Uint8("payloadType", frame.PayloadType))
}
}

View File

@@ -5,6 +5,7 @@ import (
"io"
"net"
"strconv"
"time"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
@@ -206,11 +207,11 @@ func (s *Subscriber) PlayBlock(subType byte) {
case SUBTYPE_RAW:
sendVideoFrame = func(frame *AVFrame) {
// println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipRTPTs, frame.DTS - s.VideoReader.SkipRTPTs})
spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()})
}
sendAudioFrame = func(frame *AVFrame) {
// println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, frame.PTS - s.AudioReader.SkipRTPTs, frame.PTS - s.AudioReader.SkipRTPTs})
spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()})
}
case SUBTYPE_RTP:
var videoSeq, audioSeq uint16
@@ -218,7 +219,9 @@ func (s *Subscriber) PlayBlock(subType byte) {
// fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
frame.RTP.Range(func(vp RTPFrame) bool {
videoSeq++
vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipRTPTs
copy := *vp.Packet
vp.Packet = &copy
vp.Header.Timestamp = vp.Header.Timestamp - uint32(s.VideoReader.SkipTs*90/time.Millisecond)
vp.Header.SequenceNumber = videoSeq
spesic.OnEvent((VideoRTP)(vp))
return true
@@ -229,8 +232,10 @@ func (s *Subscriber) PlayBlock(subType byte) {
// fmt.Println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
frame.RTP.Range(func(ap RTPFrame) bool {
audioSeq++
copy := *ap.Packet
ap.Packet = &copy
ap.Header.SequenceNumber = audioSeq
ap.Header.Timestamp = ap.Header.Timestamp - s.AudioReader.Track.MpegTs2RTPTs(s.AudioReader.SkipRTPTs)
ap.Header.Timestamp = ap.Header.Timestamp - uint32(s.AudioReader.SkipTs/time.Millisecond*time.Duration(s.AudioReader.Track.SampleRate)/1000)
spesic.OnEvent((AudioRTP)(ap))
return true
})
@@ -254,7 +259,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, s.AudioReader.Track.SequenceHead)
}
sendVideoFrame = func(frame *AVFrame) {
// println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame)
// fmt.Println(frame.Sequence, s.VideoReader.AbsTime, frame.DeltaTime, frame.IFrame)
// b := util.Buffer(frame.AVCC.ToBytes()[5:])
// for b.CanRead() {
// nalulen := int(b.ReadUint32())
@@ -278,7 +283,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
subMode, _ = strconv.Atoi(s.Args.Get(conf.SubModeArgName))
}
var videoFrame, audioFrame *AVFrame
var lastAbsTime uint32
var lastAbsTime time.Duration
for ctx.Err() == nil {
if hasVideo {
@@ -295,17 +300,18 @@ func (s *Subscriber) PlayBlock(subType byte) {
}
if hasAudio {
if audioFrame != nil {
if frame.AbsTime > lastAbsTime {
if frame.Timestamp > lastAbsTime {
// fmt.Println("switch audio", audioFrame.CanRead)
if audioFrame.CanRead {
sendAudioFrame(audioFrame)
}
audioFrame = nil
videoFrame = frame
lastAbsTime = frame.AbsTime
lastAbsTime = frame.Timestamp
break
}
} else if lastAbsTime == 0 {
if lastAbsTime = frame.AbsTime; lastAbsTime != 0 {
if lastAbsTime = frame.Timestamp; lastAbsTime != 0 {
videoFrame = frame
break
}
@@ -330,7 +336,6 @@ func (s *Subscriber) PlayBlock(subType byte) {
case track.READSTATE_NORMAL:
if s.Video != nil {
s.AudioReader.SkipTs = s.VideoReader.SkipTs
s.AudioReader.SkipRTPTs = s.AudioReader.Track.Ms2MpegTs(s.AudioReader.SkipTs)
}
}
s.AudioReader.Read(ctx, subMode)
@@ -344,17 +349,18 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendAudioDecConf()
}
if hasVideo && videoFrame != nil {
if frame.AbsTime > lastAbsTime {
if frame.Timestamp > lastAbsTime {
// fmt.Println("switch video", videoFrame.CanRead)
if videoFrame.CanRead {
sendVideoFrame(videoFrame)
}
videoFrame = nil
audioFrame = frame
lastAbsTime = frame.AbsTime
lastAbsTime = frame.Timestamp
break
}
}
if frame.AbsTime >= s.AudioReader.SkipTs {
if frame.Timestamp >= s.AudioReader.SkipTs {
sendAudioFrame(frame)
} else {
// fmt.Println("skip audio", frame.AbsTime, s.AudioReader.SkipTs)

View File

@@ -11,20 +11,20 @@ import (
)
type 流速控制 struct {
起始时间戳 uint32
起始时间 time.Time
起始时间戳 time.Duration
等待上限 time.Duration
起始时间 time.Time
}
func (p *流速控制) 重置(绝对时间戳 uint32) {
func (p *流速控制) 重置(绝对时间戳 time.Duration) {
p.起始时间 = time.Now()
p.起始时间戳 = 绝对时间戳
// println("重置", p.起始时间.Format("2006-01-02 15:04:05"), p.起始时间戳)
}
func (p *流速控制) 时间戳差(绝对时间戳 uint32) time.Duration {
return time.Duration(绝对时间戳-p.起始时间戳) * time.Millisecond
func (p *流速控制) 时间戳差(绝对时间戳 time.Duration) time.Duration {
return 绝对时间戳 - p.起始时间戳
}
func (p *流速控制) 控制流速(绝对时间戳 uint32) {
func (p *流速控制) 控制流速(绝对时间戳 time.Duration) {
数据时间差, 实际时间差 := p.时间戳差(绝对时间戳), time.Since(p.起始时间)
// println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05"))
// if 实际时间差 > 数据时间差 {
@@ -83,10 +83,9 @@ type Media struct {
RtpPool util.Pool[RTPFrame] `json:"-"`
SequenceHead []byte `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
SequenceHeadSeq int
RTPMuxer
RTPDemuxer
SpesificTrack `json:"-"`
deltaTs int64 //用于接续发布后时间戳连续
deltaTs time.Duration //用于接续发布后时间戳连续
流速控制
}
@@ -103,20 +102,6 @@ func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
return
}
// 毫秒转换为Mpeg时间戳
func (av *Media) Ms2MpegTs(ms uint32) uint32 {
return uint32(uint64(ms) * 90)
}
// Mpeg时间戳转换为毫秒
func (av *Media) MpegTs2Ms(mpegTs uint32) uint32 {
return uint32(uint64(mpegTs) / 90)
}
func (av *Media) MpegTs2RTPTs(mpegTs uint32) uint32 {
return uint32(uint64(mpegTs) * uint64(av.SampleRate) / 90000)
}
// 为json序列化而计算的数据
func (av *Media) SnapForJson() {
v := av.LastValue
@@ -158,7 +143,7 @@ func (av *Media) SetStuff(stuff ...any) {
}
func (av *Media) LastWriteTime() time.Time {
return av.LastValue.Timestamp
return av.LastValue.WriteTime
}
// func (av *Media) Play(ctx context.Context, onMedia func(*AVFrame) error) error {
@@ -180,8 +165,8 @@ func (av *Media) PreFrame() *AVFrame {
}
func (av *Media) generateTimestamp(ts uint32) {
av.Value.PTS = ts
av.Value.DTS = ts
av.Value.PTS = time.Duration(ts)
av.Value.DTS = time.Duration(ts)
}
func (av *Media) WriteSequenceHead(sh []byte) {
@@ -221,17 +206,17 @@ func (av *Media) Flush() {
curValue, preValue, nextValue := &av.Value, av.LastValue, av.Next()
if av.State == TrackStateOffline {
av.State = TrackStateOnline
av.deltaTs = int64(preValue.AbsTime) - int64(curValue.AbsTime) + int64(preValue.DeltaTime)
av.deltaTs = preValue.Timestamp - curValue.Timestamp + time.Duration(preValue.DeltaTime)*time.Millisecond
av.Info("track back online")
}
if av.deltaTs != 0 {
rtpts := int64(av.deltaTs) * 90
curValue.DTS = uint32(int64(curValue.DTS) + rtpts)
curValue.PTS = uint32(int64(curValue.PTS) + rtpts)
curValue.AbsTime = 0
rtpts := av.deltaTs * 90 / 1000
curValue.DTS = curValue.DTS + rtpts
curValue.PTS = curValue.PTS + rtpts
curValue.Timestamp = 0
}
bufferTime := av.Stream.GetPublisherConfig().BufferTime
if bufferTime > 0 && av.IDRingList.Length > 1 && time.Duration(curValue.AbsTime-av.IDRingList.Next.Next.Value.Value.AbsTime)*time.Millisecond > bufferTime {
if bufferTime > 0 && av.IDRingList.Length > 1 && curValue.Timestamp-av.IDRingList.Next.Next.Value.Value.Timestamp > bufferTime {
av.ShiftIDR()
av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence))
}
@@ -247,17 +232,17 @@ func (av *Media) Flush() {
if av.起始时间.IsZero() {
curValue.DeltaTime = 0
if curValue.AbsTime == 0 {
curValue.AbsTime = uint32(time.Since(av.Stream.GetStartTime()).Milliseconds())
if curValue.Timestamp == 0 {
curValue.Timestamp = time.Since(av.Stream.GetStartTime())
}
av.重置(curValue.AbsTime)
} else if curValue.AbsTime == 0 {
curValue.DeltaTime = (curValue.DTS - preValue.DTS) / 90
curValue.AbsTime = preValue.AbsTime + curValue.DeltaTime
av.重置(curValue.Timestamp)
} else {
curValue.DeltaTime = curValue.AbsTime - preValue.AbsTime
if curValue.Timestamp == 0 {
curValue.Timestamp = (preValue.Timestamp*90 + (curValue.DTS-preValue.DTS)*time.Millisecond) / 90
}
// fmt.Println(av.Name, curValue.DTS, curValue.AbsTime, curValue.DeltaTime)
curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond)
}
// fmt.Println(av.Name, curValue.DTS, curValue.Timestamp, curValue.DeltaTime)
if curValue.AUList.Length > 0 {
// 补完RTP
if config.Global.EnableRTP && curValue.RTP.Length == 0 {
@@ -270,7 +255,7 @@ func (av *Media) Flush() {
}
av.Base.Flush(&curValue.BaseFrame)
if av.等待上限 > 0 {
av.控制流速(curValue.AbsTime)
av.控制流速(curValue.Timestamp)
}
preValue = curValue
curValue = av.MoveNext()

View File

@@ -22,7 +22,7 @@ func (d *Data) ReadRing() *LockRing[any] {
}
func (d *Data) LastWriteTime() time.Time {
return d.LockRing.RingBuffer.LastValue.Timestamp
return d.LockRing.RingBuffer.LastValue.WriteTime
}
func (dt *Data) Push(data any) {

View File

@@ -32,8 +32,11 @@ func (vt *H264) WriteSliceBytes(slice []byte) {
// vt.Info("naluType", zap.Uint8("naluType", naluType.Byte()))
switch naluType {
case codec.NALU_SPS:
vt.SPSInfo, _ = codec.ParseSPS(slice)
vt.Debug("SPS", zap.Any("SPSInfo", vt.SPSInfo))
spsInfo, _ := codec.ParseSPS(slice)
if spsInfo.Width != vt.SPSInfo.Width || spsInfo.Height != vt.SPSInfo.Height {
vt.Debug("SPS", zap.Any("SPSInfo", spsInfo))
}
vt.SPSInfo = spsInfo
vt.Video.SPS = slice
vt.ParamaterSets[0] = slice
case codec.NALU_PPS:
@@ -132,20 +135,17 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
}
}
}
frame.SequenceNumber += vt.rtpSequence //增加偏移需要增加rtp包后需要顺延
if frame.Marker {
if frame.Marker && rv.AUList.ByteLength > 0 {
vt.generateTimestamp(frame.Timestamp)
if !vt.dcChanged && rv.IFrame {
vt.insertDCRtp()
}
vt.Flush()
}
}
// RTP格式补完
func (vt *H264) CompleteRTP(value *AVFrame) {
if value.RTP.Length > 0 {
if !vt.dcChanged && value.IFrame {
vt.insertDCRtp()
}
} else {
var out [][][]byte
if value.IFrame {
out = append(out, [][]byte{vt.SPS}, [][]byte{vt.PPS})
@@ -172,4 +172,3 @@ func (vt *H264) CompleteRTP(value *AVFrame) {
})
vt.PacketizeRTP(out...)
}
}

View File

@@ -38,7 +38,11 @@ func (vt *H265) WriteSliceBytes(slice []byte) {
case codec.NAL_UNIT_SPS:
vt.SPS = slice
vt.ParamaterSets[1] = slice
vt.SPSInfo, _ = codec.ParseHevcSPS(slice)
spsInfo, _ := codec.ParseHevcSPS(slice)
if spsInfo.Width != vt.SPSInfo.Width || spsInfo.Height != vt.SPSInfo.Height {
vt.Debug("SPS", zap.Any("SPSInfo", spsInfo))
}
vt.SPSInfo = spsInfo
case codec.NAL_UNIT_PPS:
vt.PPS = slice
vt.ParamaterSets[2] = slice
@@ -112,6 +116,9 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) {
}
}
case codec.NAL_UNIT_RTP_FU:
if !buffer.CanReadN(3) {
return
}
first3 := buffer.ReadN(3)
fuHeader := first3[2]
if usingDonlField {
@@ -120,24 +127,23 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) {
if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) {
vt.WriteSliceByte(first3[0]&0b10000001|(naluType<<1), first3[1])
}
if rv.AUList.Pre != nil {
rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(buffer))
}
default:
vt.WriteSliceBytes(frame.Payload)
}
frame.SequenceNumber += vt.rtpSequence //增加偏移需要增加rtp包后需要顺延
if frame.Marker {
vt.generateTimestamp(frame.Timestamp)
if !vt.dcChanged && rv.IFrame {
vt.insertDCRtp()
}
vt.Flush()
}
}
// RTP格式补完
func (vt *H265) CompleteRTP(value *AVFrame) {
if value.RTP.Length > 0 {
if !vt.dcChanged && value.IFrame {
vt.insertDCRtp()
}
} else {
// H265打包 https://blog.csdn.net/fanyun_01/article/details/114234290
var out [][][]byte
if value.IFrame {
@@ -165,4 +171,3 @@ func (vt *H265) CompleteRTP(value *AVFrame) {
}
vt.PacketizeRTP(out...)
}
}

View File

@@ -23,14 +23,14 @@ type AVRingReader struct {
Poll time.Duration
State byte
FirstSeq uint32
FirstTs uint32
SkipTs uint32
SkipRTPTs uint32
beforeJump uint32
FirstTs time.Duration
SkipTs time.Duration //ms
beforeJump time.Duration
ConfSeq int
startTime time.Time
Frame *common.AVFrame
AbsTime uint32
Delay uint32
*zap.Logger
}
@@ -96,12 +96,11 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
}
r.startTime = time.Now()
if r.FirstTs == 0 {
r.FirstTs = r.Frame.AbsTime
r.FirstTs = r.Frame.Timestamp
}
r.SkipTs = r.FirstTs
r.SkipRTPTs = r.Track.Ms2MpegTs(r.SkipTs)
r.FirstSeq = r.Frame.Sequence
r.Info("first frame read", zap.Uint32("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
case READSTATE_FIRST:
if r.Track.IDRing.Value.Sequence != r.FirstSeq {
r.Ring = r.Track.IDRing
@@ -109,14 +108,13 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
if err = r.ctx.Err(); err != nil {
return
}
r.SkipTs = frame.AbsTime - r.beforeJump
r.SkipRTPTs = r.Track.Ms2MpegTs(r.SkipTs)
r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Uint32("skipTs", r.SkipTs))
r.SkipTs = frame.Timestamp - r.beforeJump
r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs))
r.State = READSTATE_NORMAL
} else {
r.MoveNext()
frame := r.ReadFrame()
r.beforeJump = frame.AbsTime - r.FirstTs
r.beforeJump = frame.Timestamp - r.FirstTs
// 防止过快消费
if fast := time.Duration(r.beforeJump)*time.Millisecond - time.Since(r.startTime); fast > 0 && fast < time.Second {
time.Sleep(fast)
@@ -126,12 +124,18 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
r.MoveNext()
r.ReadFrame()
}
r.AbsTime = r.Frame.AbsTime - r.SkipTs
r.AbsTime = uint32((r.Frame.Timestamp - r.SkipTs).Milliseconds())
r.Delay = uint32((r.Track.LastValue.Timestamp - r.Frame.Timestamp).Milliseconds())
// println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime)
return
}
func (r *AVRingReader) GetPTS32() uint32 {
return uint32((r.Frame.PTS - r.SkipTs * 90 / time.Millisecond))
}
func (r *AVRingReader) GetDTS32() uint32 {
return uint32((r.Frame.DTS - r.SkipTs * 90 / time.Millisecond))
}
func (r *AVRingReader) ResetAbsTime() {
r.SkipTs = r.Frame.AbsTime
r.SkipTs = r.Frame.Timestamp
r.AbsTime = 0
}

View File

@@ -1,6 +1,8 @@
package track
import (
"time"
"github.com/pion/rtp"
"go.uber.org/zap"
. "m7s.live/engine/v4/common"
@@ -40,15 +42,13 @@ func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
func (av *Media) PacketizeRTP(payloads ...[][]byte) {
packetCount := len(payloads)
for i, pp := range payloads {
av.rtpSequence++
rtpItem := av.GetRTPFromPool()
packet := &rtpItem.Value
packet.Payload = packet.Payload[:0]
packet.SequenceNumber = av.rtpSequence
if av.SampleRate != 90000 {
packet.Timestamp = uint32(uint64(av.SampleRate) * uint64(av.Value.PTS) / 90000)
packet.Timestamp = uint32(time.Duration(av.SampleRate) * av.Value.PTS / 90000)
} else {
packet.Timestamp = av.Value.PTS
packet.Timestamp = uint32(av.Value.PTS)
}
packet.Marker = i == packetCount-1
for _, p := range pp {
@@ -85,7 +85,3 @@ func (av *RTPDemuxer) recorderRTP(item *util.ListItem[RTPFrame]) (frame *util.Li
av.lastSeq = frame.Value.SequenceNumber
return
}
type RTPMuxer struct {
rtpSequence uint16 //用于生成下一个rtp包的序号
}

View File

@@ -4,6 +4,9 @@ import (
// . "github.com/logrusorgru/aurora"
"time"
"github.com/pion/rtp"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/common"
@@ -95,8 +98,8 @@ func (vt *Video) WriteNalu(pts uint32, dts uint32, nalu []byte) {
if dts == 0 {
vt.generateTimestamp(pts)
} else {
vt.Value.PTS = pts
vt.Value.DTS = dts
vt.Value.PTS = time.Duration(pts)
vt.Value.DTS = time.Duration(dts)
}
vt.Value.BytesIn += len(nalu)
vt.WriteSliceBytes(nalu)
@@ -106,8 +109,8 @@ func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) {
if dts == 0 {
vt.generateTimestamp(pts)
} else {
vt.Value.PTS = pts
vt.Value.DTS = dts
vt.Value.PTS = time.Duration(pts)
vt.Value.DTS = time.Duration(dts)
}
vt.Value.BytesIn += len(frame)
common.SplitAnnexB(frame, vt.writeAnnexBSlice, codec.NALU_Delimiter2)
@@ -130,8 +133,8 @@ func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) (e error) {
if err != nil {
return err
}
vt.Value.PTS = vt.Ms2MpegTs(ts + cts)
vt.Value.DTS = vt.Ms2MpegTs(ts)
vt.Value.PTS = time.Duration(ts+cts) * 90
vt.Value.DTS = time.Duration(ts) * 90
// println(":", vt.Value.Sequence)
var nalulen uint32
for nalulen, e = r.ReadBE(vt.nalulenSize); e == nil; nalulen, e = r.ReadBE(vt.nalulenSize) {
@@ -184,28 +187,21 @@ func (vt *Video) WriteSliceByte(b ...byte) {
// 在I帧前面插入sps pps webrtc需要
func (vt *Video) insertDCRtp() {
head := vt.Value.RTP.Next
seq := head.Value.SequenceNumber
for _, nalu := range vt.ParamaterSets {
var packet RTPFrame
var packet rtp.Packet
packet.Version = 2
packet.PayloadType = vt.PayloadType
packet.Payload = nalu
packet.SSRC = vt.SSRC
packet.Timestamp = vt.Value.PTS
packet.Timestamp = uint32(vt.Value.PTS)
packet.Marker = false
head.InsertBeforeValue(packet)
vt.rtpSequence++
head.InsertBeforeValue(RTPFrame{&packet, nil})
}
vt.Value.RTP.RangeItem(func(item *util.ListItem[RTPFrame]) bool {
item.Value.SequenceNumber = seq
seq++
return true
})
}
func (vt *Video) generateTimestamp(ts uint32) {
vt.Value.PTS = ts
vt.Value.DTS = vt.dtsEst.Feed(ts)
vt.Value.PTS = time.Duration(ts)
vt.Value.DTS = time.Duration(vt.dtsEst.Feed(ts))
}
func (vt *Video) SetLostFlag() {
@@ -222,7 +218,7 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) {
b[1] = 1
// println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS)
// 写入CTS
util.PutBE(b[2:5], vt.MpegTs2Ms(rv.PTS-rv.DTS))
util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90)
rv.AVCC.Push(mem)
// if rv.AVCC.ByteLength != 5 {
// panic("error")