Files
lkm/stream/sink_waitting.go
2025-06-01 15:25:45 +08:00

107 lines
1.7 KiB
Go

package stream
import (
"sync"
)
// 等待队列所有的Sink
var waitingSinks map[string]map[SinkID]Sink
var mutex sync.RWMutex
func init() {
waitingSinks = make(map[string]map[SinkID]Sink, 1024)
}
func AddSinkToWaitingQueue(streamId string, sink Sink) {
mutex.Lock()
defer mutex.Unlock()
m, ok := waitingSinks[streamId]
if !ok {
if m, ok = waitingSinks[streamId]; !ok {
m = make(map[SinkID]Sink, 64)
waitingSinks[streamId] = m
}
}
m[sink.GetID()] = sink
}
func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkID) (Sink, bool) {
var sink Sink
mutex.Lock()
defer func() {
mutex.Unlock()
if sink != nil {
sink.StopWaitTimer()
}
}()
m, ok := waitingSinks[sourceId]
if !ok {
return nil, false
}
sink, ok = m[sinkId]
if ok {
delete(m, sinkId)
}
return sink, ok
}
func PopWaitingSinks(sourceId string) []Sink {
var sinks []Sink
mutex.Lock()
defer func() {
mutex.Unlock()
for _, sink := range sinks {
sink.StopWaitTimer()
}
}()
source, ok := waitingSinks[sourceId]
if !ok {
return nil
}
sinks = make([]Sink, len(source))
var index = 0
for _, sink := range source {
sinks[index] = sink
index++
}
delete(waitingSinks, sourceId)
return sinks
}
func CloseWaitingSinks(sourceId string) {
sinks := PopWaitingSinks(sourceId)
for _, sink := range sinks {
sink.Close()
}
}
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkID) bool {
mutex.RLock()
defer mutex.RUnlock()
source, ok := waitingSinks[sourceId]
if !ok {
return false
}
_, ok = source[sinkId]
return ok
}
func ExistSourceInWaitingQueue(id string) bool {
mutex.RLock()
defer mutex.RUnlock()
_, ok := waitingSinks[id]
return ok
}