diff --git a/common/index.go b/common/index.go index 150dff9..761e89a 100644 --- a/common/index.go +++ b/common/index.go @@ -5,6 +5,8 @@ import ( "time" "github.com/pion/rtp" + "go.uber.org/zap" + "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) @@ -12,19 +14,27 @@ type TimelineData[T any] struct { Timestamp time.Time Value T } +type TrackState byte + +const ( + TrackStateOnline TrackState = iota // 上线 + TrackStateOffline // 下线 +) // Base 基础Track类 type Base struct { Name string + log.Zap `json:"-"` Stream IStream `json:"-"` Attached atomic.Bool `json:"-"` + State TrackState ts time.Time bytes int frames int BPS int FPS int - RawPart []int // 裸数据片段用于UI上显示 RawSize int // 裸数据长度 + RawPart []int // 裸数据片段用于UI上显示 BPSs []TimelineData[int] // 10s码率统计 FPSs []TimelineData[int] // 10s帧率统计 } @@ -39,7 +49,7 @@ func (bt *Base) ComputeBPS(bytes int) { bt.frames = 0 bt.ts = time.Now() bt.BPSs = append(bt.BPSs, TimelineData[int]{Timestamp: bt.ts, Value: bt.BPS}) - if len(bt.BPSs) > 10 { + if len(bt.BPSs) > 9 { copy(bt.BPSs, bt.BPSs[1:]) bt.BPSs = bt.BPSs[:10] } @@ -61,6 +71,17 @@ func (bt *Base) Flush(bf *BaseFrame) { bf.Timestamp = time.Now() } func (bt *Base) SetStuff(stuff ...any) { + for _, s := range stuff { + switch v := s.(type) { + case IStream: + bt.Stream = v + bt.Zap = v.With(zap.String("track", bt.Name)) + case TrackState: + bt.State = v + case string: + bt.Name = v + } + } } type Track interface { diff --git a/stream.go b/stream.go index 16fbba0..864b9e0 100644 --- a/stream.go +++ b/stream.go @@ -161,8 +161,16 @@ type Tracks struct { } func (tracks *Tracks) Add(name string, t Track) bool { - if v, ok := t.(*track.Video); ok && tracks.MainVideo == nil { - tracks.MainVideo = v + switch v := t.(type) { + case *track.Video: + if tracks.MainVideo == nil { + tracks.MainVideo = v + tracks.SetIDR(v) + } + case *track.Audio: + if tracks.MainVideo != nil { + v.Narrow() + } } return tracks.Map.Add(name, t) } @@ -281,16 +289,18 @@ func (r *Stream) action(action StreamAction) (ok bool) { waitTime := time.Duration(0) if r.Publisher != nil { waitTime = r.Publisher.GetPublisher().Config.WaitCloseTimeout + r.Tracks.Range(func(name string, t Track) { + t.SetStuff(TrackStateOffline) + }) } r.Subscribers.OnPublisherLost(event) - suber := r.Subscribers.Pick() - if suber != nil { + if suber := r.Subscribers.Pick(); suber != nil { r.Subscribers.Broadcast(stateEvent) if waitTime == 0 { waitTime = suber.GetSubscriber().Config.WaitTimeout } } else if waitTime == 0 { - waitTime = time.Second //没有订阅者也没有配置发布者等待重连时间,默认1秒后关闭流 + waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流 } r.timeout.Reset(waitTime) case STATE_PUBLISHING: diff --git a/subscriber.go b/subscriber.go index f18fdfc..81c1982 100644 --- a/subscriber.go +++ b/subscriber.go @@ -5,7 +5,6 @@ import ( "io" "net" "strconv" - "time" "go.uber.org/zap" "m7s.live/engine/v4/codec" @@ -168,25 +167,26 @@ func (s *Subscriber) PlayBlock(subType byte) { return } s.Info("playblock") - var lastPlayAbsTime uint32 //最新的音频或者视频的时间戳,用于音视频同步 s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO) ctx := s.TrackPlayer.Context sendVideoDecConf := func() { + s.Debug("sendVideoDecConf") spesic.OnEvent(s.Video.ParamaterSets) spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead)) } sendAudioDecConf := func() { + s.Debug("sendAudioDecConf") spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead)) } var sendAudioFrame, sendVideoFrame func(*AVFrame) switch subType { case SUBTYPE_RAW: sendVideoFrame = func(frame *AVFrame) { - // println("v", frame.Sequence, s.VideoReader.AbsTime, frame.IFrame) + // println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame) spesic.OnEvent(VideoFrame{frame, s.VideoReader.AbsTime, frame.PTS - s.VideoReader.SkipTs*90, frame.DTS - s.VideoReader.SkipTs*90}) } sendAudioFrame = func(frame *AVFrame) { - // println("a", frame.Sequence, s.AudioReader.AbsTime) + // println("a", frame.Sequence, frame.AbsTime, s.AudioReader.AbsTime) spesic.OnEvent(AudioFrame{frame, s.AudioReader.AbsTime, s.AudioReader.AbsTime * 90, s.AudioReader.AbsTime * 90}) } case SUBTYPE_RTP: @@ -254,54 +254,46 @@ func (s *Subscriber) PlayBlock(subType byte) { subMode, _ = strconv.Atoi(s.Args.Get(s.Config.SubModeArgName)) } var videoFrame, audioFrame *AVFrame + var lastAbsTime uint32 + hasVideo, hasAudio := s.VideoReader.Track != nil && s.Config.SubVideo, s.AudioReader.Track != nil && s.Config.SubAudio + if !hasAudio && !hasVideo { + s.Error("play neither video nor audio") + return + } for ctx.Err() == nil { - hasVideo, hasAudio := s.VideoReader.Track != nil && s.Config.SubVideo, s.AudioReader.Track != nil && s.Config.SubAudio if hasVideo { - if videoFrame != nil { - if videoFrame.CanRead { - sendVideoFrame(videoFrame) - } - videoFrame = nil - } for ctx.Err() == nil { s.VideoReader.Read(ctx, subMode) frame := s.VideoReader.Frame - // println("video", frame.Sequence, frame.AbsTime) if frame == nil || ctx.Err() != nil { return } + if frame.IFrame && s.VideoReader.DecConfChanged() { + s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq + sendVideoDecConf() + } if audioFrame != nil { - if frame.AbsTime > audioFrame.AbsTime { + if frame.AbsTime > lastAbsTime { if audioFrame.CanRead { sendAudioFrame(audioFrame) } - audioFrame = nil + videoFrame = frame + lastAbsTime = frame.AbsTime + break } - } - if frame.IFrame && s.VideoReader.DecConfChanged() { - s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq - // println(s.Video.confSeq, s.Video.Track.SPSInfo.Width, s.Video.Track.SPSInfo.Height) - sendVideoDecConf() - } - if !s.Config.IFrameOnly || frame.IFrame { - if frame.AbsTime > lastPlayAbsTime { - lastPlayAbsTime = frame.AbsTime + } else if lastAbsTime == 0 { + if lastAbsTime = frame.AbsTime; lastAbsTime != 0 { videoFrame = frame break - } else { - sendVideoFrame(frame) } } + if !s.Config.IFrameOnly || frame.IFrame { + sendVideoFrame(frame) + } } } // 正常模式下或者纯音频模式下,音频开始播放 if hasAudio { - if audioFrame != nil { - if audioFrame.CanRead { - sendAudioFrame(audioFrame) - } - audioFrame = nil - } for ctx.Err() == nil { switch s.AudioReader.State { case track.READSTATE_INIT: @@ -319,30 +311,25 @@ func (s *Subscriber) PlayBlock(subType byte) { if frame == nil || ctx.Err() != nil { return } - if videoFrame != nil { - if frame.AbsTime > videoFrame.AbsTime { - if videoFrame.CanRead { - sendVideoFrame(videoFrame) - } - videoFrame = nil - } - } if s.AudioReader.DecConfChanged() { s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq sendAudioDecConf() } - if frame.AbsTime > lastPlayAbsTime { - lastPlayAbsTime = frame.AbsTime - audioFrame = frame - break - } else { + if videoFrame != nil { + if frame.AbsTime > lastAbsTime { + if videoFrame.CanRead { + sendVideoFrame(videoFrame) + } + audioFrame = frame + lastAbsTime = frame.AbsTime + break + } + } + if frame.AbsTime >= s.AudioReader.SkipTs { sendAudioFrame(frame) } } } - if !hasVideo && !hasAudio { - time.Sleep(time.Second) - } } } diff --git a/track/aac.go b/track/aac.go index 514ded0..31a5ff0 100644 --- a/track/aac.go +++ b/track/aac.go @@ -58,7 +58,7 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { if aac.lack > 0 { rawLen := aac.Value.AUList.ByteLength if rawLen == 0 { - aac.Stream.Error("lack >0 but rawlen=0") + aac.Error("lack >0 but rawlen=0") } last := aac.Value.AUList.Pre auLen := len(frame.Payload) - startOffset @@ -67,7 +67,7 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { aac.lack -= auLen return } else if aac.lack < auLen { - aac.Stream.Warn("lack < auLen", zap.Int("lack", aac.lack), zap.Int("auLen", auLen)) + aac.Warn("lack < auLen", zap.Int("lack", aac.lack), zap.Int("auLen", auLen)) } last.Value.Push(aac.BytesPool.GetShell(frame.Payload[startOffset : startOffset+aac.lack])) aac.lack = 0 @@ -99,7 +99,7 @@ func (aac *AAC) WriteSequenceHead(sh []byte) { func (aac *AAC) WriteAVCC(ts uint32, frame util.BLL) error { if l := frame.ByteLength; l < 4 { - aac.Stream.Error("AVCC data too short", zap.Int("len", l)) + aac.Error("AVCC data too short", zap.Int("len", l)) return io.ErrShortWrite } if frame.GetByte(1) == 0 { diff --git a/track/base.go b/track/base.go index 12e9edd..c8967c2 100644 --- a/track/base.go +++ b/track/base.go @@ -84,6 +84,7 @@ type Media struct { RTPMuxer RTPDemuxer SpesificTrack `json:"-"` + deltaTs int64 //用于接续发布后时间戳连续 流速控制 } @@ -106,10 +107,9 @@ func (av *Media) SetSpeedLimit(value time.Duration) { } func (av *Media) SetStuff(stuff ...any) { + // 代表发布者已经离线,该Track成为遗留Track,等待下一任发布者接续发布 for _, s := range stuff { switch v := s.(type) { - case string: - av.Name = v case int: av.Init(v) av.SSRC = uint32(uintptr(unsafe.Pointer(av))) @@ -118,12 +118,12 @@ func (av *Media) SetStuff(stuff ...any) { av.SampleRate = v case byte: av.PayloadType = v - case IStream: - av.Stream = v case util.BytesPool: av.BytesPool = v case SpesificTrack: av.SpesificTrack = v + default: + av.Base.SetStuff(v) } } } @@ -189,6 +189,16 @@ func (av *Media) AddIDR() { func (av *Media) Flush() { curValue, preValue, nextValue := &av.Value, av.LastValue, av.Next() + if av.State == TrackStateOffline { + av.State = TrackStateOnline + av.deltaTs = int64(preValue.AbsTime) - int64(curValue.AbsTime) + int64(preValue.DeltaTime) + av.Info("track back online") + } + if av.deltaTs != 0 { + curValue.DTS = uint32(int64(curValue.DTS) + av.deltaTs*90) + curValue.PTS = uint32(int64(curValue.PTS) + av.deltaTs*90) + curValue.AbsTime = 0 + } bufferTime := av.Stream.GetPublisherConfig().BufferTime if bufferTime > 0 && av.IDRingList.Length > 1 && time.Duration(curValue.AbsTime-av.IDRingList.Next.Next.Value.Value.AbsTime)*time.Millisecond > bufferTime { av.ShiftIDR() diff --git a/track/g711.go b/track/g711.go index 9fd8d51..7932950 100644 --- a/track/g711.go +++ b/track/g711.go @@ -39,7 +39,7 @@ type G711 struct { func (g711 *G711) WriteAVCC(ts uint32, frame util.BLL) error { if l := frame.ByteLength; l < 2 { - g711.Stream.Error("AVCC data too short", zap.Int("len", l)) + g711.Error("AVCC data too short", zap.Int("len", l)) return io.ErrShortWrite } g711.Value.AUList.Push(g711.BytesPool.GetShell(frame.Next.Value[1:])) diff --git a/track/h264.go b/track/h264.go index dd4c31a..659d69e 100644 --- a/track/h264.go +++ b/track/h264.go @@ -68,13 +68,13 @@ func (vt *H264) WriteSliceBytes(slice []byte) { case codec.NALU_Access_Unit_Delimiter: case codec.NALU_Filler_Data: default: - vt.Stream.Error("H264 WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType))) + vt.Error("WriteSliceBytes naluType not support", zap.Int("naluType", int(naluType))) } } func (vt *H264) WriteAVCC(ts uint32, frame util.BLL) (err error) { if l := frame.ByteLength; l < 6 { - vt.Stream.Error("AVCC data too short", zap.Int("len", l)) + vt.Error("AVCC data too short", zap.Int("len", l)) return io.ErrShortWrite } if frame.GetByte(1) == 0 { @@ -110,7 +110,7 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) { if buffer.Len() >= nextSize { vt.WriteSliceBytes(buffer.ReadN(nextSize)) } else { - vt.Stream.Error("invalid nalu size", zap.Int("naluType", int(naluType))) + vt.Error("invalid nalu size", zap.Int("naluType", int(naluType))) return } } diff --git a/track/h265.go b/track/h265.go index 48d0c0a..308d80e 100644 --- a/track/h265.go +++ b/track/h265.go @@ -58,13 +58,13 @@ func (vt *H265) WriteSliceBytes(slice []byte) { case codec.NAL_UNIT_SEI: vt.AppendAuBytes(slice) default: - vt.Video.Stream.Warn("h265 slice type not supported", zap.Uint("type", uint(t))) + vt.Warn("h265 slice type not supported", zap.Uint("type", uint(t))) } } func (vt *H265) WriteAVCC(ts uint32, frame util.BLL) (err error) { if l := frame.ByteLength; l < 6 { - vt.Stream.Error("AVCC data too short", zap.Int("len", l)) + vt.Error("AVCC data too short", zap.Int("len", l)) return io.ErrShortWrite } if frame.GetByte(1) == 0 { @@ -74,7 +74,7 @@ func (vt *H265) WriteAVCC(ts uint32, frame util.BLL) (err error) { vt.SPSInfo, _ = codec.ParseHevcSPS(vt.SequenceHead) vt.nalulenSize = (int(vt.SequenceHead[26]) & 0x03) + 1 } else { - vt.Stream.Error("H265 ParseVpsSpsPps Error") + vt.Error("H265 ParseVpsSpsPps Error") vt.Stream.Close() } return diff --git a/track/reader-av.go b/track/reader-av.go index b80ee9a..4890879 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -5,6 +5,7 @@ import ( "runtime" "time" + "go.uber.org/zap" "m7s.live/engine/v4/common" "m7s.live/engine/v4/util" ) @@ -64,9 +65,12 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { r.ctx = ctx switch r.State { case READSTATE_INIT: + r.Track.Info("start read", zap.Int("mode", mode)) startRing := r.Track.Ring if r.Track.IDRing != nil { startRing = r.Track.IDRing + } else { + r.Track.Warn("no IDRring") } switch mode { case 0: @@ -94,6 +98,7 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { } r.SkipTs = r.FirstTs r.FirstSeq = r.Frame.Sequence + r.Track.Info("first frame read", zap.Uint32("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq)) case READSTATE_FIRST: if r.Track.IDRing.Value.Sequence != r.FirstSeq { r.Ring = r.Track.IDRing @@ -102,6 +107,7 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { return } r.SkipTs = frame.AbsTime - r.beforeJump + r.Track.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Uint32("skipTs", r.SkipTs)) r.State = READSTATE_NORMAL } else { r.MoveNext() @@ -117,5 +123,6 @@ func (r *AVRingReader) Read(ctx context.Context, mode int) (err error) { r.ReadFrame() } r.AbsTime = r.Frame.AbsTime - r.SkipTs + // println(r.Track.Name, r.State, r.Frame.AbsTime, r.SkipTs, r.AbsTime) return } diff --git a/track/rtp.go b/track/rtp.go index 76ae010..aca5f19 100644 --- a/track/rtp.go +++ b/track/rtp.go @@ -12,7 +12,7 @@ const RTPMTU = 1400 func (av *Media) UnmarshalRTPPacket(p *rtp.Packet) (frame *RTPFrame) { if av.PayloadType != p.PayloadType { - av.Stream.Warn("RTP PayloadType error", zap.Uint8("want", av.PayloadType), zap.Uint8("got", p.PayloadType)) + av.Warn("RTP PayloadType error", zap.Uint8("want", av.PayloadType), zap.Uint8("got", p.PayloadType)) return } frame = &RTPFrame{ @@ -26,7 +26,7 @@ func (av *Media) UnmarshalRTP(raw []byte) (frame *RTPFrame) { var p rtp.Packet err := p.Unmarshal(raw) if err != nil { - av.Stream.Warn("RTP Unmarshal error", zap.Error(err)) + av.Warn("RTP Unmarshal error", zap.Error(err)) return } return av.UnmarshalRTPPacket(&p) diff --git a/util/pool.go b/util/pool.go index 6989288..94d6ec9 100644 --- a/util/pool.go +++ b/util/pool.go @@ -282,9 +282,10 @@ func (p BytesPool) Get(size int) (item *ListItem[Buffer]) { type Pool[T any] List[T] func (p *Pool[T]) Get() (item *ListItem[T]) { - item = (*List[T])(p).PoolShift() - if item == nil { - item = &ListItem[T]{} + if item = (*List[T])(p).PoolShift(); item == nil { + item = &ListItem[T]{ + Pool: (*List[T])(p), + } } return }