From 289c2724692d42b699623f1dad71d0d22087b504 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 7 Dec 2021 22:08:48 +0100 Subject: [PATCH] server: simplify tcp handling --- serverconn.go | 51 ++++++++++++------------------------------------ serversession.go | 23 ++++++++++++++++------ 2 files changed, 29 insertions(+), 45 deletions(-) diff --git a/serverconn.go b/serverconn.go index 23ac12c8..046e94dc 100644 --- a/serverconn.go +++ b/serverconn.go @@ -32,20 +32,18 @@ type ServerConn struct { s *Server nconn net.Conn - ctx context.Context - ctxCancel func() - remoteAddr *net.TCPAddr // to improve speed - br *bufio.Reader - bw *bufio.Writer - sessions map[string]*ServerSession - tcpFrameSetEnabled bool - tcpFrameEnabled bool - tcpSession *ServerSession - tcpFrameIsRecording bool - tcpFrameTimeout bool - tcpReadBuffer *multibuffer.MultiBuffer - tcpProcessFunc func(int, bool, []byte) - tcpWriteMutex sync.Mutex + ctx context.Context + ctxCancel func() + remoteAddr *net.TCPAddr // to improve speed + br *bufio.Reader + bw *bufio.Writer + sessions map[string]*ServerSession + tcpFrameEnabled bool + tcpSession *ServerSession + tcpFrameTimeout bool + tcpReadBuffer *multibuffer.MultiBuffer + tcpProcessFunc func(int, bool, []byte) + tcpWriteMutex sync.Mutex // in sessionRemove chan *ServerSession @@ -485,31 +483,6 @@ func (sc *ServerConn) handleRequestOuter(req *base.Request) error { sc.tcpWriteMutex.Unlock() - if sc.tcpFrameSetEnabled != sc.tcpFrameEnabled { - sc.tcpFrameEnabled = sc.tcpFrameSetEnabled - - if sc.tcpFrameEnabled { - if sc.tcpFrameIsRecording { - sc.tcpFrameTimeout = true - sc.tcpReadBuffer = multibuffer.New(uint64(sc.s.ReadBufferCount), uint64(sc.s.ReadBufferSize)) - sc.tcpProcessFunc = sc.tcpProcessRecord - } else { - // when playing, tcpReadBuffer is only used to receive RTCP receiver reports, - // that are much smaller than RTP packets and are sent at a fixed interval. - // decrease RAM consumption by allocating less buffers. - sc.tcpReadBuffer = multibuffer.New(8, uint64(sc.s.ReadBufferSize)) - sc.tcpProcessFunc = sc.tcpProcessPlay - } - } else { - if sc.tcpFrameIsRecording { - sc.tcpFrameTimeout = false - sc.nconn.SetReadDeadline(time.Time{}) - } - - sc.tcpReadBuffer = nil - } - } - return err } diff --git a/serversession.go b/serversession.go index e53ec9a4..1557f127 100644 --- a/serversession.go +++ b/serversession.go @@ -13,6 +13,7 @@ import ( "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/liberrors" + "github.com/aler9/gortsplib/pkg/multibuffer" "github.com/aler9/gortsplib/pkg/ringbuffer" "github.com/aler9/gortsplib/pkg/rtcpreceiver" ) @@ -843,8 +844,13 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base default: // TCP ss.tcpConn = sc ss.tcpConn.tcpSession = ss - ss.tcpConn.tcpFrameIsRecording = false - ss.tcpConn.tcpFrameSetEnabled = true + ss.tcpConn.tcpFrameEnabled = true + ss.tcpConn.tcpFrameTimeout = false + // when playing, tcpReadBuffer is only used to receive RTCP receiver reports, + // that are much smaller than RTP packets and are sent at a fixed interval. + // decrease RAM consumption by allocating less buffers. + ss.tcpConn.tcpReadBuffer = multibuffer.New(8, uint64(sc.s.ReadBufferSize)) + ss.tcpConn.tcpProcessFunc = sc.tcpProcessPlay ss.writerRunning = true ss.writerDone = make(chan struct{}) @@ -972,8 +978,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base default: // TCP ss.tcpConn = sc ss.tcpConn.tcpSession = ss - ss.tcpConn.tcpFrameIsRecording = true - ss.tcpConn.tcpFrameSetEnabled = true + ss.tcpConn.tcpFrameEnabled = true + ss.tcpConn.tcpFrameTimeout = true + ss.tcpConn.tcpReadBuffer = multibuffer.New(uint64(sc.s.ReadBufferCount), uint64(sc.s.ReadBufferSize)) + ss.tcpConn.tcpProcessFunc = sc.tcpProcessRecord ss.writerRunning = true ss.writerDone = make(chan struct{}) @@ -1040,7 +1048,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base default: // TCP ss.tcpConn.tcpSession = nil - ss.tcpConn.tcpFrameSetEnabled = false + ss.tcpConn.tcpFrameEnabled = false + ss.tcpConn.tcpReadBuffer = nil ss.tcpConn = nil } @@ -1058,7 +1067,9 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base default: // TCP ss.tcpConn.tcpSession = nil - ss.tcpConn.tcpFrameSetEnabled = false + ss.tcpConn.tcpFrameEnabled = false + ss.tcpConn.tcpReadBuffer = nil + ss.tcpConn.nconn.SetReadDeadline(time.Time{}) ss.tcpConn = nil } }