From 03383d9cfd4e9ed52279aa2729244fbad285fb61 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Wed, 9 Oct 2024 11:48:28 +0800 Subject: [PATCH] fix: track set bug --- api.go | 2 +- pkg/av-reader.go | 11 ++++++++--- pkg/track.go | 1 + publisher.go | 7 +++---- subscriber.go | 14 ++++---------- 5 files changed, 17 insertions(+), 18 deletions(-) diff --git a/api.go b/api.go index 3be5454..b9f248c 100644 --- a/api.go +++ b/api.go @@ -68,7 +68,7 @@ func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) { return } rw.Header().Set("Content-Type", "application/octet-stream") - reader := pkg.NewAVRingReader(publisher.VideoTrack.AVTrack) + reader := pkg.NewAVRingReader(publisher.VideoTrack.AVTrack, "Origin") err = reader.StartRead(publisher.VideoTrack.GetIDR()) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) diff --git a/pkg/av-reader.go b/pkg/av-reader.go index 5333bd7..566dc11 100644 --- a/pkg/av-reader.go +++ b/pkg/av-reader.go @@ -42,10 +42,11 @@ func (r *AVRingReader) DecConfChanged() bool { return r.LastCodecCtx != r.Track.ICodecCtx } -func NewAVRingReader(t *AVTrack) *AVRingReader { +func NewAVRingReader(t *AVTrack, dataType string) *AVRingReader { t.Debug("create reader") return &AVRingReader{ - Track: t, + Track: t, + Logger: t.With("reader", dataType), } } @@ -167,7 +168,11 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) { } } r.Delay = r.Track.LastValue.Sequence - r.Value.Sequence - r.Log(context.TODO(), task.TraceLevel, r.Track.FourCC().String(), "ts", r.Value.Timestamp, "delay", r.Delay) + if r.Track.ICodecCtx != nil { + r.Log(context.TODO(), task.TraceLevel, r.Track.FourCC().String(), "ts", r.Value.Timestamp, "delay", r.Delay) + } else { + r.Warn("no codec") + } return } diff --git a/pkg/track.go b/pkg/track.go index 9c3da0e..9bc7325 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -54,6 +54,7 @@ func NewAVTrack(args ...any) (t *AVTrack) { case *AVTrack: t.Logger = v.Logger.With("subtrack", t.FrameType.String()) t.RingWriter = v.RingWriter + t.ready = util.NewPromiseWithTimeout(v.ready.Context, time.Second*5) case *config.Publish: t.RingWriter = NewRingWriter(v.RingSize) t.BufferRange[0] = v.BufferTime diff --git a/publisher.go b/publisher.go index 33febee..31eb68a 100644 --- a/publisher.go +++ b/publisher.go @@ -1,7 +1,6 @@ package m7s import ( - "context" "fmt" "math" "os" @@ -67,11 +66,11 @@ type AVTracks struct { } func (t *AVTracks) Set(track *AVTrack) { + t.Lock() + defer t.Unlock() t.AVTrack = track track.BaseTs = t.baseTs - t.Lock() t.Add(track) - t.Unlock() } func (t *AVTracks) SetMinBuffer(start time.Duration) { @@ -101,7 +100,7 @@ func (t *AVTracks) CheckTimeout(timeout time.Duration) bool { } func (t *AVTracks) CreateSubTrack(dataType reflect.Type) (track *AVTrack) { - track = NewAVTrack(dataType, t.AVTrack, util.NewPromise(context.TODO())) + track = NewAVTrack(dataType, t.AVTrack) track.WrapIndex = t.Length t.Add(track) return diff --git a/subscriber.go b/subscriber.go index f689150..2d681a3 100644 --- a/subscriber.go +++ b/subscriber.go @@ -186,11 +186,8 @@ func (s *Subscriber) createAudioReader(dataType reflect.Type, startAudioTs time. if err := at.WaitReady(); err != nil { return } - ar := NewAVRingReader(at) - s.AudioReader = ar - ar.StartTs = startAudioTs - ar.Logger = s.Logger.With("reader", dataType.String()) - ar.Info("start read") + s.AudioReader = NewAVRingReader(at, dataType.String()) + s.AudioReader.StartTs = startAudioTs } return } @@ -213,11 +210,8 @@ func (s *Subscriber) createVideoReader(dataType reflect.Type, startVideoTs time. if err := vt.WaitReady(); err != nil { return } - vr := NewAVRingReader(vt) - vr.StartTs = startVideoTs - s.VideoReader = vr - vr.Logger = s.Logger.With("reader", dataType.String()) - vr.Info("start read") + s.VideoReader = NewAVRingReader(vt, dataType.String()) + s.VideoReader.StartTs = startVideoTs } return }