mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-19 23:16:27 +08:00
1、引入ClockRate, 用于RTP时间戳处理(MPEG2中的时间戳是90KHZ)
2、修复控制流Sleep传入的时间单位错误 3、优化TS格式解析内存复用,减少内存分配
This commit is contained in:
@@ -104,7 +104,6 @@ const (
|
|||||||
type MpegTsStream struct {
|
type MpegTsStream struct {
|
||||||
PAT MpegTsPAT // PAT表信息
|
PAT MpegTsPAT // PAT表信息
|
||||||
PMT MpegTsPMT // PMT表信息
|
PMT MpegTsPMT // PMT表信息
|
||||||
tsPktBuffer [][]byte // TS包缓存
|
|
||||||
PESChan chan *MpegTsPESPacket
|
PESChan chan *MpegTsPESPacket
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -518,64 +517,60 @@ func (s *MpegTsStream) ReadPMT(packet *MpegTsPacket, pr io.Reader) (err error) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (s *MpegTsStream) Feed(ts io.Reader) error {
|
func (s *MpegTsStream) Feed(ts io.Reader) (err error) {
|
||||||
var frame int64
|
var reader bytes.Reader
|
||||||
var tsPktArr []MpegTsPacket
|
var lr io.LimitedReader
|
||||||
var reader, pr bytes.Reader
|
lr.R = &reader
|
||||||
defer func() {
|
var pesPkt *MpegTsPESPacket
|
||||||
s.tsPktBuffer = s.tsPktBuffer[:0]
|
var tsHeader MpegTsHeader
|
||||||
}()
|
tsData := make([]byte, TS_PACKET_SIZE)
|
||||||
for {
|
for {
|
||||||
var tsData []byte
|
_, err = io.ReadFull(ts, tsData)
|
||||||
if tsDataP := util.MallocSlice(&s.tsPktBuffer); tsDataP == nil {
|
|
||||||
tsData = make([]byte, TS_PACKET_SIZE)
|
|
||||||
s.tsPktBuffer = append(s.tsPktBuffer, tsData)
|
|
||||||
} else {
|
|
||||||
tsData = *tsDataP
|
|
||||||
}
|
|
||||||
_, err := io.ReadFull(ts, tsData)
|
|
||||||
reader.Reset(tsData)
|
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
// 文件结尾 把最后面的数据发出去
|
// 文件结尾 把最后面的数据发出去
|
||||||
pesPkt, err := TsToPES(tsPktArr)
|
if pesPkt != nil {
|
||||||
if err != nil {
|
s.PESChan <- pesPkt
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
s.PESChan <- &pesPkt
|
|
||||||
return nil
|
return nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
packet, err := ReadTsPacket(&reader)
|
reader.Reset(tsData)
|
||||||
if err != nil {
|
lr.N = TS_PACKET_SIZE
|
||||||
return err
|
if tsHeader, err = ReadTsHeader(&lr); err != nil {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
pr.Reset(packet.Payload)
|
if tsHeader.SyncByte != 0x47 {
|
||||||
err = s.ReadPAT(&packet, &pr)
|
return errors.New("sync byte error")
|
||||||
if err != nil {
|
}
|
||||||
return err
|
if tsHeader.Pid == PID_PAT {
|
||||||
|
if s.PAT, err = ReadPAT(&lr); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(s.PMT.Stream) == 0 {
|
||||||
|
for _, v := range s.PAT.Program {
|
||||||
|
if v.ProgramMapPID == tsHeader.Pid {
|
||||||
|
if s.PMT, err = ReadPMT(&lr); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
err = s.ReadPMT(&packet, &pr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
// 在读取PMT中已经将所有的音视频PES的索引信息全部保存了起来
|
|
||||||
// 接着读取所有TS包里面的PID,找出PID==elementaryPID的TS包,就是音视频数据
|
|
||||||
for _, v := range s.PMT.Stream {
|
for _, v := range s.PMT.Stream {
|
||||||
if v.ElementaryPID == packet.Header.Pid {
|
if v.ElementaryPID == tsHeader.Pid {
|
||||||
if packet.Header.PayloadUnitStartIndicator == 1 {
|
if tsHeader.PayloadUnitStartIndicator == 1 {
|
||||||
if frame != 0 {
|
if pesPkt != nil {
|
||||||
pesPkt, err := TsToPES(tsPktArr)
|
s.PESChan <- pesPkt
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
s.PESChan <- &pesPkt
|
pesPkt = &MpegTsPESPacket{}
|
||||||
s.tsPktBuffer = s.tsPktBuffer[:0]
|
if pesPkt.Header, err = ReadPESHeader(&lr); err != nil {
|
||||||
tsPktArr = tsPktArr[:0]
|
return
|
||||||
}
|
}
|
||||||
frame++
|
|
||||||
}
|
}
|
||||||
tsPktArr = append(tsPktArr, packet)
|
io.Copy(&pesPkt.Payload, &lr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -25,7 +25,7 @@ type MpegTsPesStream struct {
|
|||||||
// 110x xxxx 为音频流(0xC0)
|
// 110x xxxx 为音频流(0xC0)
|
||||||
type MpegTsPESPacket struct {
|
type MpegTsPESPacket struct {
|
||||||
Header MpegTsPESHeader
|
Header MpegTsPESHeader
|
||||||
Payload []byte
|
Payload util.Buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
type MpegTsPESHeader struct {
|
type MpegTsPESHeader struct {
|
||||||
|
@@ -72,7 +72,6 @@ func (av *AVFrame) WriteAVCC(ts uint32, frame util.BLL) {
|
|||||||
}
|
}
|
||||||
// frame.Transfer(&av.AVCC)
|
// frame.Transfer(&av.AVCC)
|
||||||
// frame.ByteLength = 0
|
// frame.ByteLength = 0
|
||||||
av.DTS = ts * 90
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (av *AVFrame) AppendMem(item *util.ListItem[util.Buffer]) {
|
func (av *AVFrame) AppendMem(item *util.ListItem[util.Buffer]) {
|
||||||
|
@@ -2,6 +2,7 @@ package engine
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/pion/webrtc/v3/pkg/media/rtpdump"
|
"github.com/pion/webrtc/v3/pkg/media/rtpdump"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@@ -47,8 +48,8 @@ func (t *RTPDumpPublisher) OnEvent(event any) {
|
|||||||
case codec.CodecID_PCMU:
|
case codec.CodecID_PCMU:
|
||||||
t.AudioTrack = track.NewG711(t.Publisher.Stream, false)
|
t.AudioTrack = track.NewG711(t.Publisher.Stream, false)
|
||||||
}
|
}
|
||||||
t.VideoTrack.SetSpeedLimit(500)
|
t.VideoTrack.SetSpeedLimit(500 * time.Millisecond)
|
||||||
t.AudioTrack.SetSpeedLimit(500)
|
t.AudioTrack.SetSpeedLimit(500 * time.Millisecond)
|
||||||
for {
|
for {
|
||||||
packet, err := r.Next()
|
packet, err := r.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -3,6 +3,7 @@ package engine
|
|||||||
import (
|
import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"m7s.live/engine/v4/codec/mpegts"
|
"m7s.live/engine/v4/codec/mpegts"
|
||||||
|
"m7s.live/engine/v4/common"
|
||||||
"m7s.live/engine/v4/track"
|
"m7s.live/engine/v4/track"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -38,15 +39,15 @@ func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream) {
|
|||||||
}
|
}
|
||||||
case mpegts.STREAM_TYPE_AAC:
|
case mpegts.STREAM_TYPE_AAC:
|
||||||
if t.AudioTrack == nil {
|
if t.AudioTrack == nil {
|
||||||
t.AudioTrack = track.NewAAC(t.Publisher.Stream)
|
t.AudioTrack = track.NewAAC(t.Publisher.Stream, uint32(90000))
|
||||||
}
|
}
|
||||||
case mpegts.STREAM_TYPE_G711A:
|
case mpegts.STREAM_TYPE_G711A:
|
||||||
if t.AudioTrack == nil {
|
if t.AudioTrack == nil {
|
||||||
t.AudioTrack = track.NewG711(t.Publisher.Stream, true)
|
t.AudioTrack = track.NewG711(t.Publisher.Stream, true, uint32(90000))
|
||||||
}
|
}
|
||||||
case mpegts.STREAM_TYPE_G711U:
|
case mpegts.STREAM_TYPE_G711U:
|
||||||
if t.AudioTrack == nil {
|
if t.AudioTrack == nil {
|
||||||
t.AudioTrack = track.NewG711(t.Publisher.Stream, false)
|
t.AudioTrack = track.NewG711(t.Publisher.Stream, false, uint32(90000))
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
t.Warn("unsupport stream type:", zap.Uint8("type", s.StreamType))
|
t.Warn("unsupport stream type:", zap.Uint8("type", s.StreamType))
|
||||||
@@ -72,11 +73,7 @@ func (t *TSPublisher) ReadPES() {
|
|||||||
t.adts = append(t.adts, pes.Payload[:7]...)
|
t.adts = append(t.adts, pes.Payload[:7]...)
|
||||||
t.AudioTrack.WriteADTS(t.adts)
|
t.AudioTrack.WriteADTS(t.adts)
|
||||||
}
|
}
|
||||||
current := t.AudioTrack.CurrentFrame()
|
|
||||||
current.PTS = uint32(pes.Header.Pts)
|
|
||||||
current.DTS = uint32(pes.Header.Dts)
|
|
||||||
remainLen := len(pes.Payload)
|
remainLen := len(pes.Payload)
|
||||||
current.BytesIn += remainLen
|
|
||||||
for remainLen > 0 {
|
for remainLen > 0 {
|
||||||
// AACFrameLength(13)
|
// AACFrameLength(13)
|
||||||
// xx xxxxxxxx xxx
|
// xx xxxxxxxx xxx
|
||||||
@@ -88,12 +85,9 @@ func (t *TSPublisher) ReadPES() {
|
|||||||
pes.Payload = pes.Payload[frameLen:remainLen]
|
pes.Payload = pes.Payload[frameLen:remainLen]
|
||||||
remainLen -= frameLen
|
remainLen -= frameLen
|
||||||
}
|
}
|
||||||
t.AudioTrack.Flush()
|
|
||||||
case *track.G711:
|
case *track.G711:
|
||||||
t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload)
|
t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload)
|
||||||
t.AudioTrack.Flush()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
case mpegts.STREAM_ID_VIDEO:
|
case mpegts.STREAM_ID_VIDEO:
|
||||||
if t.VideoTrack == nil {
|
if t.VideoTrack == nil {
|
||||||
@@ -102,7 +96,7 @@ func (t *TSPublisher) ReadPES() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if t.VideoTrack != nil {
|
if t.VideoTrack != nil {
|
||||||
t.VideoTrack.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), pes.Payload)
|
t.VideoTrack.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), common.AnnexBFrame(pes.Payload))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -183,28 +183,37 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
|||||||
case SUBTYPE_RAW:
|
case SUBTYPE_RAW:
|
||||||
sendVideoFrame = func(frame *AVFrame) {
|
sendVideoFrame = func(frame *AVFrame) {
|
||||||
// println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
|
// println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
|
||||||
spesic.OnEvent(VideoFrame{frame, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipTs*90, frame.DTS - s.VideoReader.SkipTs*90})
|
spesic.OnEvent(VideoFrame{frame, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipRTPTs, frame.DTS - s.VideoReader.SkipRTPTs})
|
||||||
}
|
}
|
||||||
sendAudioFrame = func(frame *AVFrame) {
|
sendAudioFrame = func(frame *AVFrame) {
|
||||||
// println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
|
// println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime)
|
||||||
spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, s.AudioReader.AbsTime * 90, s.AudioReader.AbsTime * 90})
|
spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, s.AudioReader.Track.Ms2RTPTs(s.AudioReader.AbsTime), s.AudioReader.Track.Ms2RTPTs(s.AudioReader.AbsTime)})
|
||||||
}
|
}
|
||||||
case SUBTYPE_RTP:
|
case SUBTYPE_RTP:
|
||||||
var videoSeq, audioSeq uint16
|
var videoSeq, audioSeq uint16
|
||||||
sendVideoFrame = func(frame *AVFrame) {
|
sendVideoFrame = func(frame *AVFrame) {
|
||||||
frame.RTP.Range(func(vp RTPFrame) bool {
|
frame.RTP.Range(func(vp RTPFrame) bool {
|
||||||
videoSeq++
|
videoSeq++
|
||||||
vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipTs*90
|
vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipRTPTs
|
||||||
vp.Header.SequenceNumber = videoSeq
|
vp.Header.SequenceNumber = videoSeq
|
||||||
spesic.OnEvent((VideoRTP)(vp))
|
spesic.OnEvent((VideoRTP)(vp))
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
createTimestamp := func(ts uint32) uint32 {
|
||||||
|
return ts - s.AudioReader.SkipRTPTs
|
||||||
|
}
|
||||||
|
// RTP需要转换时间戳调整为采样率
|
||||||
|
if s.Audio.SampleRate != s.Audio.ClockRate {
|
||||||
|
createTimestamp = func(ts uint32) uint32 {
|
||||||
|
return uint32(uint64(ts-s.AudioReader.SkipRTPTs) * uint64(s.Audio.SampleRate) / uint64(s.Audio.ClockRate))
|
||||||
|
}
|
||||||
|
}
|
||||||
sendAudioFrame = func(frame *AVFrame) {
|
sendAudioFrame = func(frame *AVFrame) {
|
||||||
frame.RTP.Range(func(ap RTPFrame) bool {
|
frame.RTP.Range(func(ap RTPFrame) bool {
|
||||||
audioSeq++
|
audioSeq++
|
||||||
ap.Header.SequenceNumber = audioSeq
|
ap.Header.SequenceNumber = audioSeq
|
||||||
ap.Header.Timestamp = ap.Header.Timestamp - s.AudioReader.SkipTs*90
|
ap.Header.Timestamp = createTimestamp(ap.Header.Timestamp)
|
||||||
spesic.OnEvent((AudioRTP)(ap))
|
spesic.OnEvent((AudioRTP)(ap))
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
@@ -221,11 +230,11 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
|||||||
spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11)))
|
spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11)))
|
||||||
}
|
}
|
||||||
sendVideoDecConf = func() {
|
sendVideoDecConf = func() {
|
||||||
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, 0, s.VideoReader.Track.SequenceHead)
|
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, s.VideoReader.Track.SequenceHead)
|
||||||
// spesic.OnEvent(FLVFrame(copyBuffers(s.Video.Track.DecoderConfiguration.FLV)))
|
// spesic.OnEvent(FLVFrame(copyBuffers(s.Video.Track.DecoderConfiguration.FLV)))
|
||||||
}
|
}
|
||||||
sendAudioDecConf = func() {
|
sendAudioDecConf = func() {
|
||||||
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, 0, s.AudioReader.Track.SequenceHead)
|
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, s.AudioReader.Track.SequenceHead)
|
||||||
// spesic.OnEvent(FLVFrame(copyBuffers(s.Audio.Track.DecoderConfiguration.FLV)))
|
// spesic.OnEvent(FLVFrame(copyBuffers(s.Audio.Track.DecoderConfiguration.FLV)))
|
||||||
}
|
}
|
||||||
sendVideoFrame = func(frame *AVFrame) {
|
sendVideoFrame = func(frame *AVFrame) {
|
||||||
@@ -299,10 +308,12 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
|||||||
case track.READSTATE_INIT:
|
case track.READSTATE_INIT:
|
||||||
if s.Video != nil {
|
if s.Video != nil {
|
||||||
s.AudioReader.FirstTs = s.VideoReader.FirstTs
|
s.AudioReader.FirstTs = s.VideoReader.FirstTs
|
||||||
|
|
||||||
}
|
}
|
||||||
case track.READSTATE_NORMAL:
|
case track.READSTATE_NORMAL:
|
||||||
if s.Video != nil {
|
if s.Video != nil {
|
||||||
s.AudioReader.SkipTs = s.VideoReader.SkipTs
|
s.AudioReader.SkipTs = s.VideoReader.SkipTs
|
||||||
|
s.AudioReader.SkipRTPTs = s.AudioReader.Track.Ms2RTPTs(s.AudioReader.SkipTs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.AudioReader.Read(ctx, subMode)
|
s.AudioReader.Read(ctx, subMode)
|
||||||
|
@@ -40,9 +40,9 @@ func (aac *AAC) WriteADTS(adts []byte) {
|
|||||||
channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6)
|
channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6)
|
||||||
config1 := (profile << 3) | ((sampleRate & 0xe) >> 1)
|
config1 := (profile << 3) | ((sampleRate & 0xe) >> 1)
|
||||||
config2 := ((sampleRate & 0x1) << 7) | (channel << 3)
|
config2 := ((sampleRate & 0x1) << 7) | (channel << 3)
|
||||||
|
aac.Media.WriteSequenceHead([]byte{0xAF, 0x00, config1, config2})
|
||||||
aac.SampleRate = uint32(codec.SamplingFrequencies[sampleRate])
|
aac.SampleRate = uint32(codec.SamplingFrequencies[sampleRate])
|
||||||
aac.Channels = channel
|
aac.Channels = channel
|
||||||
aac.WriteSequenceHead([]byte{0xAF, 0x00, config1, config2})
|
|
||||||
aac.Parse(aac.SequenceHead[2:])
|
aac.Parse(aac.SequenceHead[2:])
|
||||||
aac.Attach()
|
aac.Attach()
|
||||||
}
|
}
|
||||||
@@ -93,6 +93,7 @@ func (aac *AAC) WriteSequenceHead(sh []byte) {
|
|||||||
config1, config2 := aac.SequenceHead[2], aac.SequenceHead[3]
|
config1, config2 := aac.SequenceHead[2], aac.SequenceHead[3]
|
||||||
aac.Channels = ((config2 >> 3) & 0x0F) //声道
|
aac.Channels = ((config2 >> 3) & 0x0F) //声道
|
||||||
aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
|
aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
|
||||||
|
aac.ClockRate = aac.SampleRate
|
||||||
aac.Parse(aac.SequenceHead[2:])
|
aac.Parse(aac.SequenceHead[2:])
|
||||||
aac.Attach()
|
aac.Attach()
|
||||||
}
|
}
|
||||||
|
@@ -11,6 +11,7 @@ type Audio struct {
|
|||||||
CodecID codec.AudioCodecID
|
CodecID codec.AudioCodecID
|
||||||
Channels byte
|
Channels byte
|
||||||
SampleSize byte
|
SampleSize byte
|
||||||
|
SampleRate uint32
|
||||||
AVCCHead []byte // 音频包在AVCC格式中,AAC会有两个字节,其他的只有一个字节
|
AVCCHead []byte // 音频包在AVCC格式中,AAC会有两个字节,其他的只有一个字节
|
||||||
codec.AudioSpecificConfig
|
codec.AudioSpecificConfig
|
||||||
}
|
}
|
||||||
@@ -50,7 +51,7 @@ func (av *Audio) WriteRaw(pts uint32, raw []byte) {
|
|||||||
|
|
||||||
func (av *Audio) WriteAVCC(ts uint32, frame util.BLL) {
|
func (av *Audio) WriteAVCC(ts uint32, frame util.BLL) {
|
||||||
av.Value.WriteAVCC(ts, frame)
|
av.Value.WriteAVCC(ts, frame)
|
||||||
av.generateTimestamp(ts * 90)
|
av.generateTimestamp(ts * av.ClockRate / 1000)
|
||||||
av.Flush()
|
av.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -31,15 +31,15 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) {
|
|||||||
// return
|
// return
|
||||||
// }
|
// }
|
||||||
// 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep
|
// 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep
|
||||||
if 过快毫秒 := (数据时间差 - 实际时间差) / time.Millisecond; 过快毫秒 > 100 {
|
if 过快 := (数据时间差 - 实际时间差); 过快 > 100*time.Millisecond {
|
||||||
// println("过快毫秒", 过快毫秒)
|
// println("过快毫秒", 过快)
|
||||||
if 过快毫秒 > p.等待上限 {
|
if 过快 > p.等待上限 {
|
||||||
time.Sleep(time.Millisecond * p.等待上限)
|
time.Sleep(p.等待上限)
|
||||||
} else {
|
} else {
|
||||||
time.Sleep(过快毫秒 * time.Millisecond)
|
time.Sleep(过快)
|
||||||
}
|
}
|
||||||
} else if 过快毫秒 < -100 {
|
} else if 过快 < -100*time.Millisecond {
|
||||||
// println("过慢毫秒", 过快毫秒)
|
// println("过慢毫秒", 过快)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -74,7 +74,7 @@ type Media struct {
|
|||||||
Base
|
Base
|
||||||
RingBuffer[AVFrame]
|
RingBuffer[AVFrame]
|
||||||
IDRingList `json:"-"` //最近的关键帧位置,首屏渲染
|
IDRingList `json:"-"` //最近的关键帧位置,首屏渲染
|
||||||
SampleRate uint32
|
ClockRate uint32 //时钟频率,mpeg中均为90000,rtsp中音频根据sample_rate
|
||||||
SSRC uint32
|
SSRC uint32
|
||||||
PayloadType byte
|
PayloadType byte
|
||||||
BytesPool util.BytesPool `json:"-"`
|
BytesPool util.BytesPool `json:"-"`
|
||||||
@@ -88,6 +88,16 @@ type Media struct {
|
|||||||
流速控制
|
流速控制
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 毫秒转换为RTP时间戳
|
||||||
|
func (av *Media) Ms2RTPTs(ms uint32) uint32 {
|
||||||
|
return uint32(uint64(ms) * uint64(av.ClockRate) / 1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RTP时间戳转换为毫秒
|
||||||
|
func (av *Media) RTPTs2Ms(rtpts uint32) uint32 {
|
||||||
|
return uint32(uint64(rtpts) * 1000 / uint64(av.ClockRate))
|
||||||
|
}
|
||||||
|
|
||||||
// 为json序列化而计算的数据
|
// 为json序列化而计算的数据
|
||||||
func (av *Media) SnapForJson() {
|
func (av *Media) SnapForJson() {
|
||||||
v := av.LastValue
|
v := av.LastValue
|
||||||
@@ -103,7 +113,7 @@ func (av *Media) SnapForJson() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *Media) SetSpeedLimit(value time.Duration) {
|
func (av *Media) SetSpeedLimit(value time.Duration) {
|
||||||
av.等待上限 = value * time.Millisecond
|
av.等待上限 = value
|
||||||
}
|
}
|
||||||
|
|
||||||
func (av *Media) SetStuff(stuff ...any) {
|
func (av *Media) SetStuff(stuff ...any) {
|
||||||
@@ -115,7 +125,7 @@ func (av *Media) SetStuff(stuff ...any) {
|
|||||||
av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
|
av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
|
||||||
av.等待上限 = config.Global.SpeedLimit
|
av.等待上限 = config.Global.SpeedLimit
|
||||||
case uint32:
|
case uint32:
|
||||||
av.SampleRate = v
|
av.ClockRate = v
|
||||||
case byte:
|
case byte:
|
||||||
av.PayloadType = v
|
av.PayloadType = v
|
||||||
case util.BytesPool:
|
case util.BytesPool:
|
||||||
@@ -195,8 +205,9 @@ func (av *Media) Flush() {
|
|||||||
av.Info("track back online")
|
av.Info("track back online")
|
||||||
}
|
}
|
||||||
if av.deltaTs != 0 {
|
if av.deltaTs != 0 {
|
||||||
curValue.DTS = uint32(int64(curValue.DTS) + av.deltaTs*90)
|
rtpts := int64(av.deltaTs) * int64(av.ClockRate) / 1000
|
||||||
curValue.PTS = uint32(int64(curValue.PTS) + av.deltaTs*90)
|
curValue.DTS = uint32(int64(curValue.DTS) + rtpts)
|
||||||
|
curValue.PTS = uint32(int64(curValue.PTS) + rtpts)
|
||||||
curValue.AbsTime = 0
|
curValue.AbsTime = 0
|
||||||
}
|
}
|
||||||
bufferTime := av.Stream.GetPublisherConfig().BufferTime
|
bufferTime := av.Stream.GetPublisherConfig().BufferTime
|
||||||
@@ -213,6 +224,15 @@ func (av *Media) Flush() {
|
|||||||
// av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name))
|
// av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name))
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
if av.起始时间.IsZero() {
|
||||||
|
curValue.DeltaTime = 0
|
||||||
|
av.重置(curValue.AbsTime)
|
||||||
|
} else if curValue.AbsTime == 0 {
|
||||||
|
curValue.DeltaTime = (curValue.DTS - preValue.DTS) * 1000 / av.ClockRate
|
||||||
|
curValue.AbsTime = preValue.AbsTime + curValue.DeltaTime
|
||||||
|
} else {
|
||||||
|
curValue.DeltaTime = curValue.AbsTime - preValue.AbsTime
|
||||||
|
}
|
||||||
if curValue.AUList.Length > 0 {
|
if curValue.AUList.Length > 0 {
|
||||||
// 补完RTP
|
// 补完RTP
|
||||||
if config.Global.EnableRTP && curValue.RTP.Length == 0 {
|
if config.Global.EnableRTP && curValue.RTP.Length == 0 {
|
||||||
@@ -223,15 +243,6 @@ func (av *Media) Flush() {
|
|||||||
av.CompleteAVCC(curValue)
|
av.CompleteAVCC(curValue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if av.起始时间.IsZero() {
|
|
||||||
curValue.DeltaTime = 0
|
|
||||||
av.重置(curValue.AbsTime)
|
|
||||||
} else if curValue.AbsTime == 0 {
|
|
||||||
curValue.DeltaTime = (curValue.DTS - preValue.DTS) / 90
|
|
||||||
curValue.AbsTime = preValue.AbsTime + curValue.DeltaTime
|
|
||||||
} else {
|
|
||||||
curValue.DeltaTime = curValue.AbsTime - preValue.AbsTime
|
|
||||||
}
|
|
||||||
av.Base.Flush(&curValue.BaseFrame)
|
av.Base.Flush(&curValue.BaseFrame)
|
||||||
if av.等待上限 > 0 {
|
if av.等待上限 > 0 {
|
||||||
av.控制流速(curValue.AbsTime)
|
av.控制流速(curValue.AbsTime)
|
||||||
|
@@ -25,6 +25,7 @@ type AVRingReader struct {
|
|||||||
FirstSeq uint32
|
FirstSeq uint32
|
||||||
FirstTs uint32
|
FirstTs uint32
|
||||||
SkipTs uint32
|
SkipTs uint32
|
||||||
|
SkipRTPTs uint32
|
||||||
beforeJump uint32
|
beforeJump uint32
|
||||||
ConfSeq int
|
ConfSeq int
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
@@ -97,6 +98,7 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
|
|||||||
r.FirstTs = r.Frame.AbsTime
|
r.FirstTs = r.Frame.AbsTime
|
||||||
}
|
}
|
||||||
r.SkipTs = r.FirstTs
|
r.SkipTs = r.FirstTs
|
||||||
|
r.SkipRTPTs = r.Track.Ms2RTPTs(r.SkipTs)
|
||||||
r.FirstSeq = r.Frame.Sequence
|
r.FirstSeq = r.Frame.Sequence
|
||||||
r.Track.Info("first frame read", zap.Uint32("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
|
r.Track.Info("first frame read", zap.Uint32("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
|
||||||
case READSTATE_FIRST:
|
case READSTATE_FIRST:
|
||||||
@@ -107,6 +109,7 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
r.SkipTs = frame.AbsTime - r.beforeJump
|
r.SkipTs = frame.AbsTime - r.beforeJump
|
||||||
|
r.SkipRTPTs = r.Track.Ms2RTPTs(r.SkipTs)
|
||||||
r.Track.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Uint32("skipTs", r.SkipTs))
|
r.Track.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Uint32("skipTs", r.SkipTs))
|
||||||
r.State = READSTATE_NORMAL
|
r.State = READSTATE_NORMAL
|
||||||
} else {
|
} else {
|
||||||
|
@@ -114,7 +114,8 @@ func (vt *Video) WriteAVCC(ts uint32, frame util.BLL) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
vt.Value.PTS = (ts + cts) * 90
|
vt.Value.PTS = vt.Ms2RTPTs(ts + cts)
|
||||||
|
vt.Value.DTS = vt.Ms2RTPTs(ts)
|
||||||
// println(":", vt.Value.Sequence)
|
// println(":", vt.Value.Sequence)
|
||||||
for nalulen, err := r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) {
|
for nalulen, err := r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) {
|
||||||
// var au util.BLL
|
// var au util.BLL
|
||||||
@@ -148,30 +149,30 @@ func (vt *Video) WriteSliceByte(b ...byte) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 在I帧前面插入sps pps webrtc需要
|
// 在I帧前面插入sps pps webrtc需要
|
||||||
func (av *Video) insertDCRtp() {
|
func (vt *Video) insertDCRtp() {
|
||||||
head := av.Value.RTP.Next
|
head := vt.Value.RTP.Next
|
||||||
seq := head.Value.SequenceNumber
|
seq := head.Value.SequenceNumber
|
||||||
for _, nalu := range av.ParamaterSets {
|
for _, nalu := range vt.ParamaterSets {
|
||||||
var packet RTPFrame
|
var packet RTPFrame
|
||||||
packet.Version = 2
|
packet.Version = 2
|
||||||
packet.PayloadType = av.PayloadType
|
packet.PayloadType = vt.PayloadType
|
||||||
packet.Payload = nalu
|
packet.Payload = nalu
|
||||||
packet.SSRC = av.SSRC
|
packet.SSRC = vt.SSRC
|
||||||
packet.Timestamp = av.Value.PTS
|
packet.Timestamp = vt.Value.PTS
|
||||||
packet.Marker = false
|
packet.Marker = false
|
||||||
head.InsertBeforeValue(packet)
|
head.InsertBeforeValue(packet)
|
||||||
av.rtpSequence++
|
vt.rtpSequence++
|
||||||
}
|
}
|
||||||
av.Value.RTP.RangeItem(func(item *util.ListItem[RTPFrame]) bool {
|
vt.Value.RTP.RangeItem(func(item *util.ListItem[RTPFrame]) bool {
|
||||||
item.Value.SequenceNumber = seq
|
item.Value.SequenceNumber = seq
|
||||||
seq++
|
seq++
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (av *Video) generateTimestamp(ts uint32) {
|
func (vt *Video) generateTimestamp(ts uint32) {
|
||||||
av.Value.PTS = ts
|
vt.Value.PTS = ts
|
||||||
av.Value.DTS = av.dtsEst.Feed(ts)
|
vt.Value.DTS = vt.dtsEst.Feed(ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vt *Video) SetLostFlag() {
|
func (vt *Video) SetLostFlag() {
|
||||||
@@ -188,7 +189,7 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) {
|
|||||||
b[1] = 1
|
b[1] = 1
|
||||||
// println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS)
|
// println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS)
|
||||||
// 写入CTS
|
// 写入CTS
|
||||||
util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90)
|
util.PutBE(b[2:5], vt.RTPTs2Ms(rv.PTS-rv.DTS))
|
||||||
rv.AVCC.Push(mem)
|
rv.AVCC.Push(mem)
|
||||||
rv.AUList.Range(func(au *util.BLL) bool {
|
rv.AUList.Range(func(au *util.BLL) bool {
|
||||||
mem = vt.BytesPool.Get(4)
|
mem = vt.BytesPool.Get(4)
|
||||||
|
Reference in New Issue
Block a user