diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 00b8d4c..4865f2c 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -89,11 +89,13 @@ func (s *Session) Input(conn net.Conn, data []byte) error { func (s *Session) Close() { //session/conn/stack相关引用, go释放不了...手动赋值为nil s.conn = nil - //释放协议栈 - if s.stack != nil { - s.stack.Close() - s.stack = nil - } + + defer func() { + if s.stack != nil { + s.stack.Close() + s.stack = nil + } + }() //还没到publish/play if s.handle == nil { diff --git a/stream/source.go b/stream/source.go index 6c07587..0f3f06f 100644 --- a/stream/source.go +++ b/stream/source.go @@ -157,13 +157,13 @@ type PublishSource struct { inputCB func(data []byte) error //子类Input回调 closeCB func() //子类Close回调 - //所有的输出协议, 持有Sink - transStreams map[TransStreamId]TransStream + transStreams map[TransStreamId]TransStream //所有的输出流, 持有Sink //sink的拉流和断开拉流事件,都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作 //golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件 inputDataEvent chan []byte - closedEvent chan byte + closedEvent chan byte //发送关闭事件 + closedConsumedEvent chan byte //关闭事件已经被消费 playingEventQueue chan Sink playingDoneEventQueue chan Sink probeTimoutEvent chan bool @@ -172,9 +172,9 @@ type PublishSource struct { removeSinkTime time.Time receiveDataTimer *time.Timer idleTimer *time.Timer - sinkCount int - closed bool - firstPacket bool + sinkCount int //拉流计数 + closed bool //是否已经被关闭 + firstPacket bool //是否第一包 urlValues url.Values } @@ -196,6 +196,7 @@ func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), re //-2是为了保证从管道取到流, 到处理完流.整个过程安全的, 不会被覆盖 s.inputDataEvent = make(chan []byte, receiveQueueSize-2) s.closedEvent = make(chan byte) + s.closedConsumedEvent = make(chan byte) s.playingEventQueue = make(chan Sink, 128) s.playingDoneEventQueue = make(chan Sink, 128) s.probeTimoutEvent = make(chan bool) @@ -286,6 +287,7 @@ func (s *PublishSource) LoopEvent() { break case _ = <-s.closedEvent: s.doClose() + s.closedConsumedEvent <- 1 return case _ = <-s.probeTimoutEvent: s.writeHeader() @@ -537,6 +539,7 @@ func (s *PublishSource) doClose() { func (s *PublishSource) Close() { s.AddEvent(SourceEventClose, nil) + <-s.closedConsumedEvent } func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {