From 28b07f5f412a90855faa26d41ba821cff12e7a46 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Mon, 21 Jul 2025 10:00:14 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=85=B3=E9=97=AD=E6=9C=AA=E6=8E=A8?= =?UTF-8?q?=E6=B5=81=E7=9A=84sink,=20=E9=80=A0=E6=88=90sinkcount=E4=B8=BA?= =?UTF-8?q?=E8=B4=9F=E6=95=B0,=20=E5=8F=91=E7=94=9Fpanic=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stream/stream_publisher.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 18b564b..80f2b7f 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -534,23 +534,22 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { return true } + t.sinks[sink.GetID()] = sink + t.transStreamSinks[transStream.GetID()][sink.GetID()] = sink + // 累加拉流计数 if !resume && t.recordSink != sink { t.sinkCount++ log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) } - t.sinks[sink.GetID()] = sink - t.transStreamSinks[transStream.GetID()][sink.GetID()] = sink - // TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响. _, ok := sink.GetConn().(*transport.Conn) if ok && sink.IsTCPStreaming() { sink.EnableAsyncWriteMode(24) } - // 发送已有的缓存数据 - // 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。 + // 发送已缓存的合并写切片 keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer() if len(keyBuffer) > 0 { if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { @@ -615,12 +614,14 @@ func (t *transStreamPublisher) clearSinkStreaming(sink Sink) { } func (t *transStreamPublisher) doRemoveSink(sink Sink) bool { - t.clearSinkStreaming(sink) - delete(t.sinks, sink.GetID()) + if _, ok := t.sinks[sink.GetID()]; ok { + t.clearSinkStreaming(sink) + delete(t.sinks, sink.GetID()) - t.sinkCount-- - log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) - utils.Assert(t.sinkCount > -1) + t.sinkCount-- + log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) + utils.Assert(t.sinkCount > -1) + } HookPlayDoneEvent(sink) return true