diff --git a/codec/mpegts/mpegts.go b/codec/mpegts/mpegts.go index d1f7555..ca13c9c 100644 --- a/codec/mpegts/mpegts.go +++ b/codec/mpegts/mpegts.go @@ -102,10 +102,9 @@ const ( // type MpegTsStream struct { - PAT MpegTsPAT // PAT表信息 - PMT MpegTsPMT // PMT表信息 - tsPktBuffer [][]byte // TS包缓存 - PESChan chan *MpegTsPESPacket + PAT MpegTsPAT // PAT表信息 + PMT MpegTsPMT // PMT表信息 + PESChan chan *MpegTsPESPacket } // ios13818-1-CN.pdf 33/165 @@ -518,64 +517,60 @@ func (s *MpegTsStream) ReadPMT(packet *MpegTsPacket, pr io.Reader) (err error) { } return } -func (s *MpegTsStream) Feed(ts io.Reader) error { - var frame int64 - var tsPktArr []MpegTsPacket - var reader, pr bytes.Reader - defer func() { - s.tsPktBuffer = s.tsPktBuffer[:0] - }() +func (s *MpegTsStream) Feed(ts io.Reader) (err error) { + var reader bytes.Reader + var lr io.LimitedReader + lr.R = &reader + var pesPkt *MpegTsPESPacket + var tsHeader MpegTsHeader + tsData := make([]byte, TS_PACKET_SIZE) for { - var tsData []byte - if tsDataP := util.MallocSlice(&s.tsPktBuffer); tsDataP == nil { - tsData = make([]byte, TS_PACKET_SIZE) - s.tsPktBuffer = append(s.tsPktBuffer, tsData) - } else { - tsData = *tsDataP - } - _, err := io.ReadFull(ts, tsData) - reader.Reset(tsData) + _, err = io.ReadFull(ts, tsData) if err == io.EOF { // 文件结尾 把最后面的数据发出去 - pesPkt, err := TsToPES(tsPktArr) - if err != nil { - return err + if pesPkt != nil { + s.PESChan <- pesPkt } - s.PESChan <- &pesPkt return nil } else if err != nil { - return err + return } - packet, err := ReadTsPacket(&reader) - if err != nil { - return err + reader.Reset(tsData) + lr.N = TS_PACKET_SIZE + if tsHeader, err = ReadTsHeader(&lr); err != nil { + return } - pr.Reset(packet.Payload) - err = s.ReadPAT(&packet, &pr) - if err != nil { - return err + if tsHeader.SyncByte != 0x47 { + return errors.New("sync byte error") } - err = s.ReadPMT(&packet, &pr) - if err != nil { - return err + if tsHeader.Pid == PID_PAT { + if s.PAT, err = ReadPAT(&lr); err != nil { + return + } + continue } - // 在读取PMT中已经将所有的音视频PES的索引信息全部保存了起来 - // 接着读取所有TS包里面的PID,找出PID==elementaryPID的TS包,就是音视频数据 - for _, v := range s.PMT.Stream { - if v.ElementaryPID == packet.Header.Pid { - if packet.Header.PayloadUnitStartIndicator == 1 { - if frame != 0 { - pesPkt, err := TsToPES(tsPktArr) - if err != nil { - return err - } - s.PESChan <- &pesPkt - s.tsPktBuffer = s.tsPktBuffer[:0] - tsPktArr = tsPktArr[:0] + if len(s.PMT.Stream) == 0 { + for _, v := range s.PAT.Program { + if v.ProgramMapPID == tsHeader.Pid { + if s.PMT, err = ReadPMT(&lr); err != nil { + return } - frame++ } - tsPktArr = append(tsPktArr, packet) + continue + } + } + for _, v := range s.PMT.Stream { + if v.ElementaryPID == tsHeader.Pid { + if tsHeader.PayloadUnitStartIndicator == 1 { + if pesPkt != nil { + s.PESChan <- pesPkt + } + pesPkt = &MpegTsPESPacket{} + if pesPkt.Header, err = ReadPESHeader(&lr); err != nil { + return + } + } + io.Copy(&pesPkt.Payload, &lr) } } } diff --git a/codec/mpegts/mpegts_pes.go b/codec/mpegts/mpegts_pes.go index e986fe2..c0e8b62 100644 --- a/codec/mpegts/mpegts_pes.go +++ b/codec/mpegts/mpegts_pes.go @@ -25,7 +25,7 @@ type MpegTsPesStream struct { // 110x xxxx 为音频流(0xC0) type MpegTsPESPacket struct { Header MpegTsPESHeader - Payload []byte + Payload util.Buffer } type MpegTsPESHeader struct { diff --git a/common/frame.go b/common/frame.go index f0526c4..0c0ebda 100644 --- a/common/frame.go +++ b/common/frame.go @@ -72,7 +72,6 @@ func (av *AVFrame) WriteAVCC(ts uint32, frame util.BLL) { } // frame.Transfer(&av.AVCC) // frame.ByteLength = 0 - av.DTS = ts * 90 } func (av *AVFrame) AppendMem(item *util.ListItem[util.Buffer]) { diff --git a/publisher-rtpdump.go b/publisher-rtpdump.go index 774ebeb..c93b183 100644 --- a/publisher-rtpdump.go +++ b/publisher-rtpdump.go @@ -2,6 +2,7 @@ package engine import ( "os" + "time" "github.com/pion/webrtc/v3/pkg/media/rtpdump" "go.uber.org/zap" @@ -47,8 +48,8 @@ func (t *RTPDumpPublisher) OnEvent(event any) { case codec.CodecID_PCMU: t.AudioTrack = track.NewG711(t.Publisher.Stream, false) } - t.VideoTrack.SetSpeedLimit(500) - t.AudioTrack.SetSpeedLimit(500) + t.VideoTrack.SetSpeedLimit(500 * time.Millisecond) + t.AudioTrack.SetSpeedLimit(500 * time.Millisecond) for { packet, err := r.Next() if err != nil { diff --git a/publisher-ts.go b/publisher-ts.go index 788acaa..e73bd77 100644 --- a/publisher-ts.go +++ b/publisher-ts.go @@ -3,6 +3,7 @@ package engine import ( "go.uber.org/zap" "m7s.live/engine/v4/codec/mpegts" + "m7s.live/engine/v4/common" "m7s.live/engine/v4/track" ) @@ -38,15 +39,15 @@ func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream) { } case mpegts.STREAM_TYPE_AAC: if t.AudioTrack == nil { - t.AudioTrack = track.NewAAC(t.Publisher.Stream) + t.AudioTrack = track.NewAAC(t.Publisher.Stream, uint32(90000)) } case mpegts.STREAM_TYPE_G711A: if t.AudioTrack == nil { - t.AudioTrack = track.NewG711(t.Publisher.Stream, true) + t.AudioTrack = track.NewG711(t.Publisher.Stream, true, uint32(90000)) } case mpegts.STREAM_TYPE_G711U: if t.AudioTrack == nil { - t.AudioTrack = track.NewG711(t.Publisher.Stream, false) + t.AudioTrack = track.NewG711(t.Publisher.Stream, false, uint32(90000)) } default: t.Warn("unsupport stream type:", zap.Uint8("type", s.StreamType)) @@ -72,11 +73,7 @@ func (t *TSPublisher) ReadPES() { t.adts = append(t.adts, pes.Payload[:7]...) t.AudioTrack.WriteADTS(t.adts) } - current := t.AudioTrack.CurrentFrame() - current.PTS = uint32(pes.Header.Pts) - current.DTS = uint32(pes.Header.Dts) remainLen := len(pes.Payload) - current.BytesIn += remainLen for remainLen > 0 { // AACFrameLength(13) // xx xxxxxxxx xxx @@ -88,12 +85,9 @@ func (t *TSPublisher) ReadPES() { pes.Payload = pes.Payload[frameLen:remainLen] remainLen -= frameLen } - t.AudioTrack.Flush() case *track.G711: t.AudioTrack.WriteRaw(uint32(pes.Header.Pts), pes.Payload) - t.AudioTrack.Flush() } - } case mpegts.STREAM_ID_VIDEO: if t.VideoTrack == nil { @@ -102,7 +96,7 @@ func (t *TSPublisher) ReadPES() { } } if t.VideoTrack != nil { - t.VideoTrack.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), pes.Payload) + t.VideoTrack.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), common.AnnexBFrame(pes.Payload)) } } } diff --git a/subscriber.go b/subscriber.go index 81c1982..5ac30ed 100644 --- a/subscriber.go +++ b/subscriber.go @@ -183,28 +183,37 @@ func (s *Subscriber) PlayBlock(subType byte) { case SUBTYPE_RAW: sendVideoFrame = func(frame *AVFrame) { // println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame) - spesic.OnEvent(VideoFrame{frame, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipTs*90, frame.DTS - s.VideoReader.SkipTs*90}) + spesic.OnEvent(VideoFrame{frame, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipRTPTs, frame.DTS - s.VideoReader.SkipRTPTs}) } sendAudioFrame = func(frame *AVFrame) { // println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime) - spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, s.AudioReader.AbsTime * 90, s.AudioReader.AbsTime * 90}) + spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, s.AudioReader.Track.Ms2RTPTs(s.AudioReader.AbsTime), s.AudioReader.Track.Ms2RTPTs(s.AudioReader.AbsTime)}) } case SUBTYPE_RTP: var videoSeq, audioSeq uint16 sendVideoFrame = func(frame *AVFrame) { frame.RTP.Range(func(vp RTPFrame) bool { videoSeq++ - vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipTs*90 + vp.Header.Timestamp = vp.Header.Timestamp - s.VideoReader.SkipRTPTs vp.Header.SequenceNumber = videoSeq spesic.OnEvent((VideoRTP)(vp)) return true }) } + createTimestamp := func(ts uint32) uint32 { + return ts - s.AudioReader.SkipRTPTs + } + // RTP需要转换时间戳调整为采样率 + if s.Audio.SampleRate != s.Audio.ClockRate { + createTimestamp = func(ts uint32) uint32 { + return uint32(uint64(ts-s.AudioReader.SkipRTPTs) * uint64(s.Audio.SampleRate) / uint64(s.Audio.ClockRate)) + } + } sendAudioFrame = func(frame *AVFrame) { frame.RTP.Range(func(ap RTPFrame) bool { audioSeq++ ap.Header.SequenceNumber = audioSeq - ap.Header.Timestamp = ap.Header.Timestamp - s.AudioReader.SkipTs*90 + ap.Header.Timestamp = createTimestamp(ap.Header.Timestamp) spesic.OnEvent((AudioRTP)(ap)) return true }) @@ -221,11 +230,11 @@ func (s *Subscriber) PlayBlock(subType byte) { spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11))) } sendVideoDecConf = func() { - sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, 0, s.VideoReader.Track.SequenceHead) + sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, s.VideoReader.Track.SequenceHead) // spesic.OnEvent(FLVFrame(copyBuffers(s.Video.Track.DecoderConfiguration.FLV))) } sendAudioDecConf = func() { - sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, 0, s.AudioReader.Track.SequenceHead) + sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, s.AudioReader.Track.SequenceHead) // spesic.OnEvent(FLVFrame(copyBuffers(s.Audio.Track.DecoderConfiguration.FLV))) } sendVideoFrame = func(frame *AVFrame) { @@ -299,10 +308,12 @@ func (s *Subscriber) PlayBlock(subType byte) { case track.READSTATE_INIT: if s.Video != nil { s.AudioReader.FirstTs = s.VideoReader.FirstTs + } case track.READSTATE_NORMAL: if s.Video != nil { s.AudioReader.SkipTs = s.VideoReader.SkipTs + s.AudioReader.SkipRTPTs = s.AudioReader.Track.Ms2RTPTs(s.AudioReader.SkipTs) } } s.AudioReader.Read(ctx, subMode) diff --git a/track/aac.go b/track/aac.go index 31a5ff0..d5049e2 100644 --- a/track/aac.go +++ b/track/aac.go @@ -40,9 +40,9 @@ func (aac *AAC) WriteADTS(adts []byte) { channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6) config1 := (profile << 3) | ((sampleRate & 0xe) >> 1) config2 := ((sampleRate & 0x1) << 7) | (channel << 3) + aac.Media.WriteSequenceHead([]byte{0xAF, 0x00, config1, config2}) aac.SampleRate = uint32(codec.SamplingFrequencies[sampleRate]) aac.Channels = channel - aac.WriteSequenceHead([]byte{0xAF, 0x00, config1, config2}) aac.Parse(aac.SequenceHead[2:]) aac.Attach() } @@ -93,6 +93,7 @@ func (aac *AAC) WriteSequenceHead(sh []byte) { config1, config2 := aac.SequenceHead[2], aac.SequenceHead[3] aac.Channels = ((config2 >> 3) & 0x0F) //声道 aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]) + aac.ClockRate = aac.SampleRate aac.Parse(aac.SequenceHead[2:]) aac.Attach() } diff --git a/track/audio.go b/track/audio.go index 03d387b..6ec4822 100644 --- a/track/audio.go +++ b/track/audio.go @@ -11,6 +11,7 @@ type Audio struct { CodecID codec.AudioCodecID Channels byte SampleSize byte + SampleRate uint32 AVCCHead []byte // 音频包在AVCC格式中,AAC会有两个字节,其他的只有一个字节 codec.AudioSpecificConfig } @@ -50,7 +51,7 @@ func (av *Audio) WriteRaw(pts uint32, raw []byte) { func (av *Audio) WriteAVCC(ts uint32, frame util.BLL) { av.Value.WriteAVCC(ts, frame) - av.generateTimestamp(ts * 90) + av.generateTimestamp(ts * av.ClockRate / 1000) av.Flush() } diff --git a/track/base.go b/track/base.go index c8967c2..746edf5 100644 --- a/track/base.go +++ b/track/base.go @@ -31,15 +31,15 @@ func (p *流速控制) 控制流速(绝对时间戳 uint32) { // return // } // 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep - if 过快毫秒 := (数据时间差 - 实际时间差) / time.Millisecond; 过快毫秒 > 100 { - // println("过快毫秒", 过快毫秒) - if 过快毫秒 > p.等待上限 { - time.Sleep(time.Millisecond * p.等待上限) + if 过快 := (数据时间差 - 实际时间差); 过快 > 100*time.Millisecond { + // println("过快毫秒", 过快) + if 过快 > p.等待上限 { + time.Sleep(p.等待上限) } else { - time.Sleep(过快毫秒 * time.Millisecond) + time.Sleep(过快) } - } else if 过快毫秒 < -100 { - // println("过慢毫秒", 过快毫秒) + } else if 过快 < -100*time.Millisecond { + // println("过慢毫秒", 过快) } } @@ -74,7 +74,7 @@ type Media struct { Base RingBuffer[AVFrame] IDRingList `json:"-"` //最近的关键帧位置,首屏渲染 - SampleRate uint32 + ClockRate uint32 //时钟频率,mpeg中均为90000,rtsp中音频根据sample_rate SSRC uint32 PayloadType byte BytesPool util.BytesPool `json:"-"` @@ -88,6 +88,16 @@ type Media struct { 流速控制 } +// 毫秒转换为RTP时间戳 +func (av *Media) Ms2RTPTs(ms uint32) uint32 { + return uint32(uint64(ms) * uint64(av.ClockRate) / 1000) +} + +// RTP时间戳转换为毫秒 +func (av *Media) RTPTs2Ms(rtpts uint32) uint32 { + return uint32(uint64(rtpts) * 1000 / uint64(av.ClockRate)) +} + // 为json序列化而计算的数据 func (av *Media) SnapForJson() { v := av.LastValue @@ -103,7 +113,7 @@ func (av *Media) SnapForJson() { } func (av *Media) SetSpeedLimit(value time.Duration) { - av.等待上限 = value * time.Millisecond + av.等待上限 = value } func (av *Media) SetStuff(stuff ...any) { @@ -115,7 +125,7 @@ func (av *Media) SetStuff(stuff ...any) { av.SSRC = uint32(uintptr(unsafe.Pointer(av))) av.等待上限 = config.Global.SpeedLimit case uint32: - av.SampleRate = v + av.ClockRate = v case byte: av.PayloadType = v case util.BytesPool: @@ -195,8 +205,9 @@ func (av *Media) Flush() { av.Info("track back online") } if av.deltaTs != 0 { - curValue.DTS = uint32(int64(curValue.DTS) + av.deltaTs*90) - curValue.PTS = uint32(int64(curValue.PTS) + av.deltaTs*90) + rtpts := int64(av.deltaTs) * int64(av.ClockRate) / 1000 + curValue.DTS = uint32(int64(curValue.DTS) + rtpts) + curValue.PTS = uint32(int64(curValue.PTS) + rtpts) curValue.AbsTime = 0 } bufferTime := av.Stream.GetPublisherConfig().BufferTime @@ -213,6 +224,15 @@ func (av *Media) Flush() { // av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name)) // } } + if av.起始时间.IsZero() { + curValue.DeltaTime = 0 + av.重置(curValue.AbsTime) + } else if curValue.AbsTime == 0 { + curValue.DeltaTime = (curValue.DTS - preValue.DTS) * 1000 / av.ClockRate + curValue.AbsTime = preValue.AbsTime + curValue.DeltaTime + } else { + curValue.DeltaTime = curValue.AbsTime - preValue.AbsTime + } if curValue.AUList.Length > 0 { // 补完RTP if config.Global.EnableRTP && curValue.RTP.Length == 0 { @@ -223,15 +243,6 @@ func (av *Media) Flush() { av.CompleteAVCC(curValue) } } - if av.起始时间.IsZero() { - curValue.DeltaTime = 0 - av.重置(curValue.AbsTime) - } else if curValue.AbsTime == 0 { - curValue.DeltaTime = (curValue.DTS - preValue.DTS) / 90 - curValue.AbsTime = preValue.AbsTime + curValue.DeltaTime - } else { - curValue.DeltaTime = curValue.AbsTime - preValue.AbsTime - } av.Base.Flush(&curValue.BaseFrame) if av.等待上限 > 0 { av.控制流速(curValue.AbsTime) diff --git a/track/reader-av.go b/track/reader-av.go index 4890879..5a18abd 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -25,6 +25,7 @@ type AVRingReader struct { FirstSeq uint32 FirstTs uint32 SkipTs uint32 + SkipRTPTs uint32 beforeJump uint32 ConfSeq int startTime time.Time @@ -97,6 +98,7 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { r.FirstTs = r.Frame.AbsTime } r.SkipTs = r.FirstTs + r.SkipRTPTs = r.Track.Ms2RTPTs(r.SkipTs) r.FirstSeq = r.Frame.Sequence r.Track.Info("first frame read", zap.Uint32("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq)) case READSTATE_FIRST: @@ -107,6 +109,7 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { return } r.SkipTs = frame.AbsTime - r.beforeJump + r.SkipRTPTs = r.Track.Ms2RTPTs(r.SkipTs) r.Track.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Uint32("skipTs", r.SkipTs)) r.State = READSTATE_NORMAL } else { diff --git a/track/video.go b/track/video.go index ea6de8b..6cd0eb1 100644 --- a/track/video.go +++ b/track/video.go @@ -114,7 +114,8 @@ func (vt *Video) WriteAVCC(ts uint32, frame util.BLL) error { if err != nil { return err } - vt.Value.PTS = (ts + cts) * 90 + vt.Value.PTS = vt.Ms2RTPTs(ts + cts) + vt.Value.DTS = vt.Ms2RTPTs(ts) // println(":", vt.Value.Sequence) for nalulen, err := r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) { // var au util.BLL @@ -148,30 +149,30 @@ func (vt *Video) WriteSliceByte(b ...byte) { } // 在I帧前面插入sps pps webrtc需要 -func (av *Video) insertDCRtp() { - head := av.Value.RTP.Next +func (vt *Video) insertDCRtp() { + head := vt.Value.RTP.Next seq := head.Value.SequenceNumber - for _, nalu := range av.ParamaterSets { + for _, nalu := range vt.ParamaterSets { var packet RTPFrame packet.Version = 2 - packet.PayloadType = av.PayloadType + packet.PayloadType = vt.PayloadType packet.Payload = nalu - packet.SSRC = av.SSRC - packet.Timestamp = av.Value.PTS + packet.SSRC = vt.SSRC + packet.Timestamp = vt.Value.PTS packet.Marker = false head.InsertBeforeValue(packet) - av.rtpSequence++ + vt.rtpSequence++ } - av.Value.RTP.RangeItem(func(item *util.ListItem[RTPFrame]) bool { + vt.Value.RTP.RangeItem(func(item *util.ListItem[RTPFrame]) bool { item.Value.SequenceNumber = seq seq++ return true }) } -func (av *Video) generateTimestamp(ts uint32) { - av.Value.PTS = ts - av.Value.DTS = av.dtsEst.Feed(ts) +func (vt *Video) generateTimestamp(ts uint32) { + vt.Value.PTS = ts + vt.Value.DTS = vt.dtsEst.Feed(ts) } func (vt *Video) SetLostFlag() { @@ -188,7 +189,7 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) { b[1] = 1 // println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS) // 写入CTS - util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90) + util.PutBE(b[2:5], vt.RTPTs2Ms(rv.PTS-rv.DTS)) rv.AVCC.Push(mem) rv.AUList.Range(func(au *util.BLL) bool { mem = vt.BytesPool.Get(4)