mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-27 03:26:01 +08:00
fix: 关闭未推流的sink, 造成sinkcount为负数, 发生panic问题
This commit is contained in:
@@ -534,23 +534,22 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.sinks[sink.GetID()] = sink
|
||||||
|
t.transStreamSinks[transStream.GetID()][sink.GetID()] = sink
|
||||||
|
|
||||||
// 累加拉流计数
|
// 累加拉流计数
|
||||||
if !resume && t.recordSink != sink {
|
if !resume && t.recordSink != sink {
|
||||||
t.sinkCount++
|
t.sinkCount++
|
||||||
log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source)
|
log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.sinks[sink.GetID()] = sink
|
|
||||||
t.transStreamSinks[transStream.GetID()][sink.GetID()] = sink
|
|
||||||
|
|
||||||
// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
|
// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
|
||||||
_, ok := sink.GetConn().(*transport.Conn)
|
_, ok := sink.GetConn().(*transport.Conn)
|
||||||
if ok && sink.IsTCPStreaming() {
|
if ok && sink.IsTCPStreaming() {
|
||||||
sink.EnableAsyncWriteMode(24)
|
sink.EnableAsyncWriteMode(24)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 发送已有的缓存数据
|
// 发送已缓存的合并写切片
|
||||||
// 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。
|
|
||||||
keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer()
|
keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer()
|
||||||
if len(keyBuffer) > 0 {
|
if len(keyBuffer) > 0 {
|
||||||
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
|
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
|
||||||
@@ -615,12 +614,14 @@ func (t *transStreamPublisher) clearSinkStreaming(sink Sink) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *transStreamPublisher) doRemoveSink(sink Sink) bool {
|
func (t *transStreamPublisher) doRemoveSink(sink Sink) bool {
|
||||||
t.clearSinkStreaming(sink)
|
if _, ok := t.sinks[sink.GetID()]; ok {
|
||||||
delete(t.sinks, sink.GetID())
|
t.clearSinkStreaming(sink)
|
||||||
|
delete(t.sinks, sink.GetID())
|
||||||
|
|
||||||
t.sinkCount--
|
t.sinkCount--
|
||||||
log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source)
|
log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source)
|
||||||
utils.Assert(t.sinkCount > -1)
|
utils.Assert(t.sinkCount > -1)
|
||||||
|
}
|
||||||
|
|
||||||
HookPlayDoneEvent(sink)
|
HookPlayDoneEvent(sink)
|
||||||
return true
|
return true
|
||||||
|
Reference in New Issue
Block a user