fix: reduce

This commit is contained in:
langhuihui
2024-06-06 08:51:01 +08:00
parent 0c7bd27176
commit 9b574f3ddc
5 changed files with 13 additions and 14 deletions

View File

@@ -22,6 +22,8 @@ func SplitAnnexB[T ~[]byte](frame T, process func(T), delimiter []byte) {
}
}
const rwmutexMaxReaders = 1 << 30
type LIRTP = util.ListItem[RTPFrame]
type RTPFrame struct {
*rtp.Packet
@@ -61,7 +63,7 @@ func NewDataFrame[T any]() *DataFrame[T] {
}
func (df *DataFrame[T]) IsWriting() bool {
return df.readerCount.Load() == -1
return df.readerCount.Load() < 0
}
func (df *DataFrame[T]) IsDiscarded() bool {
@@ -85,7 +87,7 @@ func (df *DataFrame[T]) ReaderLeave() int32 {
}
func (df *DataFrame[T]) StartWrite() bool {
if df.readerCount.CompareAndSwap(0, -1) {
if df.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
return true
}
df.L = nil //标记为废弃
@@ -94,9 +96,7 @@ func (df *DataFrame[T]) StartWrite() bool {
func (df *DataFrame[T]) Ready() {
df.WriteTime = time.Now()
if !df.readerCount.CompareAndSwap(-1, 0) {
panic("Ready")
}
df.readerCount.Add(rwmutexMaxReaders)
df.Broadcast()
}

2
io.go
View File

@@ -100,7 +100,7 @@ func (io *IO) IsShutdown() bool {
func (i *IO) close(err StopError) bool {
if i.IsClosed() {
i.Warn("already closed", err...)
i.Debug("already closed", err...)
return false
}
i.Info("close", err...)

View File

@@ -420,8 +420,8 @@ func (s *Subscriber) PlayBlock(subType byte) {
func (s *Subscriber) onStop(reason *zapcore.Field) {
if !s.Stream.IsClosed() {
s.Info("play stop", *reason)
if !s.Config.Internal {
s.Stop(*reason)
if s.Config.Internal {
s.Stream.Receive(s.Spesific)
}
}

View File

@@ -15,13 +15,12 @@ func (r *RingReader[T, F]) StartRead(ring *util.Ring[F]) (err error) {
if r.Value.IsDiscarded() {
return ErrDiscard
}
if r.Value.IsWriting() {
if r.Value.ReaderEnter() < 0 {
// t := time.Now()
r.Value.Wait()
// log.Info("wait", time.Since(t))
}
r.Count++
r.Value.ReaderEnter()
return
}

View File

@@ -72,7 +72,7 @@ func (rb *RingWriter[T, F]) Glow(size int) (newItem *Ring[F]) {
return
}
func (rb *RingWriter[T, F]) Recycle(r *Ring[F]) {
func (rb *RingWriter[T, F]) recycle(r *Ring[F]) {
rb.poolSize++
r.Value.Init()
r.Value.Reset()
@@ -87,10 +87,10 @@ func (rb *RingWriter[T, F]) Reduce(size int) {
r := rb.Unlink(size)
for p := r.Next(); p != r; {
next := p.Next() //先保存下一个节点
if !rb.Value.IsDiscarded() {
rb.Recycle(p.Prev().Unlink(1))
if rb.Value.IsDiscarded() {
p.Prev().Unlink(1).Value.Reset()
} else {
// fmt.Println("Reduce", p.Value.ReaderCount())
rb.recycle(p)
}
p = next
}