mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-27 03:26:01 +08:00
107 lines
1.7 KiB
Go
107 lines
1.7 KiB
Go
package stream
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
// 等待队列所有的Sink
|
|
var waitingSinks map[string]map[SinkID]Sink
|
|
|
|
var mutex sync.RWMutex
|
|
|
|
func init() {
|
|
waitingSinks = make(map[string]map[SinkID]Sink, 1024)
|
|
}
|
|
|
|
func AddSinkToWaitingQueue(streamId string, sink Sink) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
|
|
m, ok := waitingSinks[streamId]
|
|
if !ok {
|
|
if m, ok = waitingSinks[streamId]; !ok {
|
|
m = make(map[SinkID]Sink, 64)
|
|
waitingSinks[streamId] = m
|
|
}
|
|
}
|
|
|
|
m[sink.GetID()] = sink
|
|
}
|
|
|
|
func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkID) (Sink, bool) {
|
|
var sink Sink
|
|
mutex.Lock()
|
|
defer func() {
|
|
mutex.Unlock()
|
|
if sink != nil {
|
|
sink.StopWaitTimer()
|
|
}
|
|
}()
|
|
|
|
m, ok := waitingSinks[sourceId]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
sink, ok = m[sinkId]
|
|
if ok {
|
|
delete(m, sinkId)
|
|
}
|
|
|
|
return sink, ok
|
|
}
|
|
|
|
func PopWaitingSinks(sourceId string) []Sink {
|
|
var sinks []Sink
|
|
mutex.Lock()
|
|
defer func() {
|
|
mutex.Unlock()
|
|
for _, sink := range sinks {
|
|
sink.StopWaitTimer()
|
|
}
|
|
}()
|
|
|
|
source, ok := waitingSinks[sourceId]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
sinks = make([]Sink, len(source))
|
|
var index = 0
|
|
for _, sink := range source {
|
|
sinks[index] = sink
|
|
index++
|
|
}
|
|
|
|
delete(waitingSinks, sourceId)
|
|
return sinks
|
|
}
|
|
|
|
func CloseWaitingSinks(sourceId string) {
|
|
sinks := PopWaitingSinks(sourceId)
|
|
for _, sink := range sinks {
|
|
sink.Close()
|
|
}
|
|
}
|
|
|
|
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkID) bool {
|
|
mutex.RLock()
|
|
defer mutex.RUnlock()
|
|
|
|
source, ok := waitingSinks[sourceId]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
_, ok = source[sinkId]
|
|
return ok
|
|
}
|
|
|
|
func ExistSourceInWaitingQueue(id string) bool {
|
|
mutex.RLock()
|
|
defer mutex.RUnlock()
|
|
|
|
_, ok := waitingSinks[id]
|
|
return ok
|
|
}
|