diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 18b564b..80f2b7f 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -534,23 +534,22 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { return true } + t.sinks[sink.GetID()] = sink + t.transStreamSinks[transStream.GetID()][sink.GetID()] = sink + // 累加拉流计数 if !resume && t.recordSink != sink { t.sinkCount++ log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) } - t.sinks[sink.GetID()] = sink - t.transStreamSinks[transStream.GetID()][sink.GetID()] = sink - // TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响. _, ok := sink.GetConn().(*transport.Conn) if ok && sink.IsTCPStreaming() { sink.EnableAsyncWriteMode(24) } - // 发送已有的缓存数据 - // 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。 + // 发送已缓存的合并写切片 keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer() if len(keyBuffer) > 0 { if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { @@ -615,12 +614,14 @@ func (t *transStreamPublisher) clearSinkStreaming(sink Sink) { } func (t *transStreamPublisher) doRemoveSink(sink Sink) bool { - t.clearSinkStreaming(sink) - delete(t.sinks, sink.GetID()) + if _, ok := t.sinks[sink.GetID()]; ok { + t.clearSinkStreaming(sink) + delete(t.sinks, sink.GetID()) - t.sinkCount-- - log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) - utils.Assert(t.sinkCount > -1) + t.sinkCount-- + log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) + utils.Assert(t.sinkCount > -1) + } HookPlayDoneEvent(sink) return true