修复rtmp推流结束时, stack先释放, 后续解析剩余的流, 造成rtmp stack 回调流时空指针问题

This commit is contained in:
yangjiechina
2024-07-05 22:03:13 +08:00
parent 217605072e
commit a8f26f74df
2 changed files with 16 additions and 11 deletions

View File

@@ -89,11 +89,13 @@ func (s *Session) Input(conn net.Conn, data []byte) error {
func (s *Session) Close() { func (s *Session) Close() {
//session/conn/stack相关引用, go释放不了...手动赋值为nil //session/conn/stack相关引用, go释放不了...手动赋值为nil
s.conn = nil s.conn = nil
//释放协议栈
if s.stack != nil { defer func() {
s.stack.Close() if s.stack != nil {
s.stack = nil s.stack.Close()
} s.stack = nil
}
}()
//还没到publish/play //还没到publish/play
if s.handle == nil { if s.handle == nil {

View File

@@ -157,13 +157,13 @@ type PublishSource struct {
inputCB func(data []byte) error //子类Input回调 inputCB func(data []byte) error //子类Input回调
closeCB func() //子类Close回调 closeCB func() //子类Close回调
//所有的输出协议, 持有Sink transStreams map[TransStreamId]TransStream //所有的输出, 持有Sink
transStreams map[TransStreamId]TransStream
//sink的拉流和断开拉流事件都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作 //sink的拉流和断开拉流事件都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作
//golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件 //golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件
inputDataEvent chan []byte inputDataEvent chan []byte
closedEvent chan byte closedEvent chan byte //发送关闭事件
closedConsumedEvent chan byte //关闭事件已经被消费
playingEventQueue chan Sink playingEventQueue chan Sink
playingDoneEventQueue chan Sink playingDoneEventQueue chan Sink
probeTimoutEvent chan bool probeTimoutEvent chan bool
@@ -172,9 +172,9 @@ type PublishSource struct {
removeSinkTime time.Time removeSinkTime time.Time
receiveDataTimer *time.Timer receiveDataTimer *time.Timer
idleTimer *time.Timer idleTimer *time.Timer
sinkCount int sinkCount int //拉流计数
closed bool closed bool //是否已经被关闭
firstPacket bool firstPacket bool //是否第一包
urlValues url.Values urlValues url.Values
} }
@@ -196,6 +196,7 @@ func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), re
//-2是为了保证从管道取到流, 到处理完流.整个过程安全的, 不会被覆盖 //-2是为了保证从管道取到流, 到处理完流.整个过程安全的, 不会被覆盖
s.inputDataEvent = make(chan []byte, receiveQueueSize-2) s.inputDataEvent = make(chan []byte, receiveQueueSize-2)
s.closedEvent = make(chan byte) s.closedEvent = make(chan byte)
s.closedConsumedEvent = make(chan byte)
s.playingEventQueue = make(chan Sink, 128) s.playingEventQueue = make(chan Sink, 128)
s.playingDoneEventQueue = make(chan Sink, 128) s.playingDoneEventQueue = make(chan Sink, 128)
s.probeTimoutEvent = make(chan bool) s.probeTimoutEvent = make(chan bool)
@@ -286,6 +287,7 @@ func (s *PublishSource) LoopEvent() {
break break
case _ = <-s.closedEvent: case _ = <-s.closedEvent:
s.doClose() s.doClose()
s.closedConsumedEvent <- 1
return return
case _ = <-s.probeTimoutEvent: case _ = <-s.probeTimoutEvent:
s.writeHeader() s.writeHeader()
@@ -537,6 +539,7 @@ func (s *PublishSource) doClose() {
func (s *PublishSource) Close() { func (s *PublishSource) Close() {
s.AddEvent(SourceEventClose, nil) s.AddEvent(SourceEventClose, nil)
<-s.closedConsumedEvent
} }
func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) { func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {