diff --git a/rtmp/rtmp_server.go b/rtmp/rtmp_server.go index 6d602db..c2e9bf4 100644 --- a/rtmp/rtmp_server.go +++ b/rtmp/rtmp_server.go @@ -65,11 +65,7 @@ func (s *server) OnPacket(conn net.Conn, data []byte) []byte { _ = conn.Close() } - if session.isPublisher { - return stream.TCPReceiveBufferPool.Get().([]byte) - } - - return nil + return session.receiveBuffer } func NewServer() Server { diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 42ecfd1..cd2885c 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -10,10 +10,11 @@ import ( // Session RTMP会话, 解析处理Message type Session struct { - conn net.Conn - stack *rtmp.ServerStack // rtmp协议栈, 解析message - handle interface{} // 持有具体会话句柄(推流端/拉流端), 在@see OnPublish @see OnPlay回调中赋值 - isPublisher bool // 是否是推流会话 + conn net.Conn + stack *rtmp.ServerStack // rtmp协议栈, 解析message + handle interface{} // 持有具体会话句柄(推流端/拉流端), 在@see OnPublish @see OnPlay回调中赋值 + isPublisher bool // 是否是推流会话 + receiveBuffer []byte } func (s *Session) generateSourceID(app, stream string) string { @@ -95,6 +96,11 @@ func (s *Session) Close() { s.stack.Close() s.stack = nil } + + if s.receiveBuffer != nil { + stream.TCPReceiveBufferPool.Put(s.receiveBuffer[:cap(s.receiveBuffer)]) + s.receiveBuffer = nil + } }() // 还未确定会话类型, 无需处理 @@ -118,7 +124,9 @@ func (s *Session) Close() { } func NewSession(conn net.Conn) *Session { - session := &Session{} + session := &Session{ + receiveBuffer: stream.TCPReceiveBufferPool.Get().([]byte), + } stackServer := rtmp.NewStackServer(false) stackServer.SetOnStreamHandler(session)