mirror of
https://github.com/aler9/gortsplib
synced 2025-10-05 15:16:51 +08:00
server: simplify tcp handling
This commit is contained in:
@@ -32,20 +32,18 @@ type ServerConn struct {
|
|||||||
s *Server
|
s *Server
|
||||||
nconn net.Conn
|
nconn net.Conn
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxCancel func()
|
ctxCancel func()
|
||||||
remoteAddr *net.TCPAddr // to improve speed
|
remoteAddr *net.TCPAddr // to improve speed
|
||||||
br *bufio.Reader
|
br *bufio.Reader
|
||||||
bw *bufio.Writer
|
bw *bufio.Writer
|
||||||
sessions map[string]*ServerSession
|
sessions map[string]*ServerSession
|
||||||
tcpFrameSetEnabled bool
|
tcpFrameEnabled bool
|
||||||
tcpFrameEnabled bool
|
tcpSession *ServerSession
|
||||||
tcpSession *ServerSession
|
tcpFrameTimeout bool
|
||||||
tcpFrameIsRecording bool
|
tcpReadBuffer *multibuffer.MultiBuffer
|
||||||
tcpFrameTimeout bool
|
tcpProcessFunc func(int, bool, []byte)
|
||||||
tcpReadBuffer *multibuffer.MultiBuffer
|
tcpWriteMutex sync.Mutex
|
||||||
tcpProcessFunc func(int, bool, []byte)
|
|
||||||
tcpWriteMutex sync.Mutex
|
|
||||||
|
|
||||||
// in
|
// in
|
||||||
sessionRemove chan *ServerSession
|
sessionRemove chan *ServerSession
|
||||||
@@ -485,31 +483,6 @@ func (sc *ServerConn) handleRequestOuter(req *base.Request) error {
|
|||||||
|
|
||||||
sc.tcpWriteMutex.Unlock()
|
sc.tcpWriteMutex.Unlock()
|
||||||
|
|
||||||
if sc.tcpFrameSetEnabled != sc.tcpFrameEnabled {
|
|
||||||
sc.tcpFrameEnabled = sc.tcpFrameSetEnabled
|
|
||||||
|
|
||||||
if sc.tcpFrameEnabled {
|
|
||||||
if sc.tcpFrameIsRecording {
|
|
||||||
sc.tcpFrameTimeout = true
|
|
||||||
sc.tcpReadBuffer = multibuffer.New(uint64(sc.s.ReadBufferCount), uint64(sc.s.ReadBufferSize))
|
|
||||||
sc.tcpProcessFunc = sc.tcpProcessRecord
|
|
||||||
} else {
|
|
||||||
// when playing, tcpReadBuffer is only used to receive RTCP receiver reports,
|
|
||||||
// that are much smaller than RTP packets and are sent at a fixed interval.
|
|
||||||
// decrease RAM consumption by allocating less buffers.
|
|
||||||
sc.tcpReadBuffer = multibuffer.New(8, uint64(sc.s.ReadBufferSize))
|
|
||||||
sc.tcpProcessFunc = sc.tcpProcessPlay
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if sc.tcpFrameIsRecording {
|
|
||||||
sc.tcpFrameTimeout = false
|
|
||||||
sc.nconn.SetReadDeadline(time.Time{})
|
|
||||||
}
|
|
||||||
|
|
||||||
sc.tcpReadBuffer = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/aler9/gortsplib/pkg/base"
|
"github.com/aler9/gortsplib/pkg/base"
|
||||||
"github.com/aler9/gortsplib/pkg/headers"
|
"github.com/aler9/gortsplib/pkg/headers"
|
||||||
"github.com/aler9/gortsplib/pkg/liberrors"
|
"github.com/aler9/gortsplib/pkg/liberrors"
|
||||||
|
"github.com/aler9/gortsplib/pkg/multibuffer"
|
||||||
"github.com/aler9/gortsplib/pkg/ringbuffer"
|
"github.com/aler9/gortsplib/pkg/ringbuffer"
|
||||||
"github.com/aler9/gortsplib/pkg/rtcpreceiver"
|
"github.com/aler9/gortsplib/pkg/rtcpreceiver"
|
||||||
)
|
)
|
||||||
@@ -843,8 +844,13 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
|
|||||||
default: // TCP
|
default: // TCP
|
||||||
ss.tcpConn = sc
|
ss.tcpConn = sc
|
||||||
ss.tcpConn.tcpSession = ss
|
ss.tcpConn.tcpSession = ss
|
||||||
ss.tcpConn.tcpFrameIsRecording = false
|
ss.tcpConn.tcpFrameEnabled = true
|
||||||
ss.tcpConn.tcpFrameSetEnabled = true
|
ss.tcpConn.tcpFrameTimeout = false
|
||||||
|
// when playing, tcpReadBuffer is only used to receive RTCP receiver reports,
|
||||||
|
// that are much smaller than RTP packets and are sent at a fixed interval.
|
||||||
|
// decrease RAM consumption by allocating less buffers.
|
||||||
|
ss.tcpConn.tcpReadBuffer = multibuffer.New(8, uint64(sc.s.ReadBufferSize))
|
||||||
|
ss.tcpConn.tcpProcessFunc = sc.tcpProcessPlay
|
||||||
|
|
||||||
ss.writerRunning = true
|
ss.writerRunning = true
|
||||||
ss.writerDone = make(chan struct{})
|
ss.writerDone = make(chan struct{})
|
||||||
@@ -972,8 +978,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
|
|||||||
default: // TCP
|
default: // TCP
|
||||||
ss.tcpConn = sc
|
ss.tcpConn = sc
|
||||||
ss.tcpConn.tcpSession = ss
|
ss.tcpConn.tcpSession = ss
|
||||||
ss.tcpConn.tcpFrameIsRecording = true
|
ss.tcpConn.tcpFrameEnabled = true
|
||||||
ss.tcpConn.tcpFrameSetEnabled = true
|
ss.tcpConn.tcpFrameTimeout = true
|
||||||
|
ss.tcpConn.tcpReadBuffer = multibuffer.New(uint64(sc.s.ReadBufferCount), uint64(sc.s.ReadBufferSize))
|
||||||
|
ss.tcpConn.tcpProcessFunc = sc.tcpProcessRecord
|
||||||
|
|
||||||
ss.writerRunning = true
|
ss.writerRunning = true
|
||||||
ss.writerDone = make(chan struct{})
|
ss.writerDone = make(chan struct{})
|
||||||
@@ -1040,7 +1048,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
|
|||||||
|
|
||||||
default: // TCP
|
default: // TCP
|
||||||
ss.tcpConn.tcpSession = nil
|
ss.tcpConn.tcpSession = nil
|
||||||
ss.tcpConn.tcpFrameSetEnabled = false
|
ss.tcpConn.tcpFrameEnabled = false
|
||||||
|
ss.tcpConn.tcpReadBuffer = nil
|
||||||
ss.tcpConn = nil
|
ss.tcpConn = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1058,7 +1067,9 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
|
|||||||
|
|
||||||
default: // TCP
|
default: // TCP
|
||||||
ss.tcpConn.tcpSession = nil
|
ss.tcpConn.tcpSession = nil
|
||||||
ss.tcpConn.tcpFrameSetEnabled = false
|
ss.tcpConn.tcpFrameEnabled = false
|
||||||
|
ss.tcpConn.tcpReadBuffer = nil
|
||||||
|
ss.tcpConn.nconn.SetReadDeadline(time.Time{})
|
||||||
ss.tcpConn = nil
|
ss.tcpConn = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user