修复关闭Sink死锁问题

This commit is contained in:
yangjiechina
2024-11-21 20:38:09 +08:00
parent 99b9c7871a
commit fff14dcfbf

View File

@@ -349,7 +349,7 @@ func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int
err := sink.Write(index, data, timestamp) err := sink.Write(index, data, timestamp)
if err == nil { if err == nil {
sink.IncreaseSentPacketCount() sink.IncreaseSentPacketCount()
//return return
} }
// 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞. // 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞.
@@ -357,7 +357,7 @@ func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int
_, ok := err.(*transport.ZeroWindowSizeError) _, ok := err.(*transport.ZeroWindowSizeError)
if ok { if ok {
log.Sugar.Errorf("向sink推流超时,关闭连接. sink: %s", sink.GetID()) log.Sugar.Errorf("向sink推流超时,关闭连接. sink: %s", sink.GetID())
sink.Close() go sink.Close()
} }
} }
@@ -486,7 +486,7 @@ func (s *PublishSource) AddSink(sink Sink) {
AddSinkToWaitingQueue(sink.GetSourceID(), sink) AddSinkToWaitingQueue(sink.GetSourceID(), sink)
} else { } else {
if !s.doAddSink(sink) { if !s.doAddSink(sink) {
sink.Close() go sink.Close()
} }
} }
}) })
@@ -735,7 +735,7 @@ func (s *PublishSource) writeHeader() {
for _, sink := range sinks { for _, sink := range sinks {
if !s.doAddSink(sink) { if !s.doAddSink(sink) {
sink.Close() go sink.Close()
} }
} }
} }