diff --git a/common/frame.go b/common/frame.go index bace995..00dd261 100644 --- a/common/frame.go +++ b/common/frame.go @@ -22,6 +22,8 @@ func SplitAnnexB[T ~[]byte](frame T, process func(T), delimiter []byte) { } } +const rwmutexMaxReaders = 1 << 30 + type LIRTP = util.ListItem[RTPFrame] type RTPFrame struct { *rtp.Packet @@ -61,7 +63,7 @@ func NewDataFrame[T any]() *DataFrame[T] { } func (df *DataFrame[T]) IsWriting() bool { - return df.readerCount.Load() == -1 + return df.readerCount.Load() < 0 } func (df *DataFrame[T]) IsDiscarded() bool { @@ -85,7 +87,7 @@ func (df *DataFrame[T]) ReaderLeave() int32 { } func (df *DataFrame[T]) StartWrite() bool { - if df.readerCount.CompareAndSwap(0, -1) { + if df.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) { return true } df.L = nil //标记为废弃 @@ -94,9 +96,7 @@ func (df *DataFrame[T]) StartWrite() bool { func (df *DataFrame[T]) Ready() { df.WriteTime = time.Now() - if !df.readerCount.CompareAndSwap(-1, 0) { - panic("Ready") - } + df.readerCount.Add(rwmutexMaxReaders) df.Broadcast() } diff --git a/io.go b/io.go index 2472a5a..4259318 100644 --- a/io.go +++ b/io.go @@ -100,7 +100,7 @@ func (io *IO) IsShutdown() bool { func (i *IO) close(err StopError) bool { if i.IsClosed() { - i.Warn("already closed", err...) + i.Debug("already closed", err...) return false } i.Info("close", err...) diff --git a/subscriber.go b/subscriber.go index 0df59e5..eb5c794 100644 --- a/subscriber.go +++ b/subscriber.go @@ -420,8 +420,8 @@ func (s *Subscriber) PlayBlock(subType byte) { func (s *Subscriber) onStop(reason *zapcore.Field) { if !s.Stream.IsClosed() { - s.Info("play stop", *reason) - if !s.Config.Internal { + s.Stop(*reason) + if s.Config.Internal { s.Stream.Receive(s.Spesific) } } diff --git a/track/reader-data.go b/track/reader-data.go index a24471d..de774bc 100644 --- a/track/reader-data.go +++ b/track/reader-data.go @@ -15,13 +15,12 @@ func (r *RingReader[T, F]) StartRead(ring *util.Ring[F]) (err error) { if r.Value.IsDiscarded() { return ErrDiscard } - if r.Value.IsWriting() { + if r.Value.ReaderEnter() < 0 { // t := time.Now() r.Value.Wait() // log.Info("wait", time.Since(t)) } r.Count++ - r.Value.ReaderEnter() return } diff --git a/util/ring-writer.go b/util/ring-writer.go index a985e1d..1b2c633 100644 --- a/util/ring-writer.go +++ b/util/ring-writer.go @@ -72,7 +72,7 @@ func (rb *RingWriter[T, F]) Glow(size int) (newItem *Ring[F]) { return } -func (rb *RingWriter[T, F]) Recycle(r *Ring[F]) { +func (rb *RingWriter[T, F]) recycle(r *Ring[F]) { rb.poolSize++ r.Value.Init() r.Value.Reset() @@ -87,10 +87,10 @@ func (rb *RingWriter[T, F]) Reduce(size int) { r := rb.Unlink(size) for p := r.Next(); p != r; { next := p.Next() //先保存下一个节点 - if !rb.Value.IsDiscarded() { - rb.Recycle(p.Prev().Unlink(1)) + if rb.Value.IsDiscarded() { + p.Prev().Unlink(1).Value.Reset() } else { - // fmt.Println("Reduce", p.Value.ReaderCount()) + rb.recycle(p) } p = next }