server: sync states with client ones

This commit is contained in:
aler9
2022-02-18 10:02:20 +01:00
parent d44f1eb03a
commit fe99404495
2 changed files with 41 additions and 41 deletions

View File

@@ -225,7 +225,7 @@ func (sc *ServerConn) readFuncTCP(readRequest chan readReq) error {
var tcpReadBuffer *multibuffer.MultiBuffer var tcpReadBuffer *multibuffer.MultiBuffer
var processFunc func(int, bool, []byte) var processFunc func(int, bool, []byte)
if sc.tcpSession.state == ServerSessionStateRead { if sc.tcpSession.state == ServerSessionStatePlay {
// when playing, tcpReadBuffer is only used to receive RTCP receiver reports, // when playing, tcpReadBuffer is only used to receive RTCP receiver reports,
// that are much smaller than RTP packets and are sent at a fixed interval. // that are much smaller than RTP packets and are sent at a fixed interval.
// decrease RAM consumption by allocating less buffers. // decrease RAM consumption by allocating less buffers.
@@ -291,7 +291,7 @@ func (sc *ServerConn) readFuncTCP(readRequest chan readReq) error {
var frame base.InterleavedFrame var frame base.InterleavedFrame
for { for {
if sc.tcpSession.state == ServerSessionStatePublish { if sc.tcpSession.state == ServerSessionStateRecord {
sc.conn.SetReadDeadline(time.Now().Add(sc.s.ReadTimeout)) sc.conn.SetReadDeadline(time.Now().Add(sc.s.ReadTimeout))
} }

View File

@@ -116,10 +116,10 @@ type ServerSessionState int
// standard states. // standard states.
const ( const (
ServerSessionStateInitial ServerSessionState = iota ServerSessionStateInitial ServerSessionState = iota
ServerSessionStatePreRead ServerSessionStatePrePlay
ServerSessionStateRead ServerSessionStatePlay
ServerSessionStatePrePublish ServerSessionStatePreRecord
ServerSessionStatePublish ServerSessionStateRecord
) )
// String implements fmt.Stringer. // String implements fmt.Stringer.
@@ -127,13 +127,13 @@ func (s ServerSessionState) String() string {
switch s { switch s {
case ServerSessionStateInitial: case ServerSessionStateInitial:
return "initial" return "initial"
case ServerSessionStatePreRead: case ServerSessionStatePrePlay:
return "prePlay" return "prePlay"
case ServerSessionStateRead: case ServerSessionStatePlay:
return "play" return "play"
case ServerSessionStatePrePublish: case ServerSessionStatePreRecord:
return "preRecord" return "preRecord"
case ServerSessionStatePublish: case ServerSessionStateRecord:
return "record" return "record"
} }
return "unknown" return "unknown"
@@ -316,8 +316,8 @@ func (ss *ServerSession) run() {
} }
// if session is not in state RECORD or PLAY, or transport is TCP // if session is not in state RECORD or PLAY, or transport is TCP
if (ss.state != ServerSessionStatePublish && if (ss.state != ServerSessionStateRecord &&
ss.state != ServerSessionStateRead) || ss.state != ServerSessionStatePlay) ||
*ss.setuppedTransport == TransportTCP { *ss.setuppedTransport == TransportTCP {
// close if there are no associated connections // close if there are no associated connections
if len(ss.conns) == 0 { if len(ss.conns) == 0 {
@@ -326,8 +326,8 @@ func (ss *ServerSession) run() {
} }
case <-ss.startWriter: case <-ss.startWriter:
if !ss.writerRunning && (ss.state == ServerSessionStatePublish || if !ss.writerRunning && (ss.state == ServerSessionStateRecord ||
ss.state == ServerSessionStateRead) && ss.state == ServerSessionStatePlay) &&
*ss.setuppedTransport == TransportTCP { *ss.setuppedTransport == TransportTCP {
ss.writerRunning = true ss.writerRunning = true
ss.writerDone = make(chan struct{}) ss.writerDone = make(chan struct{})
@@ -338,7 +338,7 @@ func (ss *ServerSession) run() {
now := time.Now() now := time.Now()
// in case of RECORD and UDP, timeout happens when no RTP or RTCP packets are being received // in case of RECORD and UDP, timeout happens when no RTP or RTCP packets are being received
if ss.state == ServerSessionStatePublish { if ss.state == ServerSessionStateRecord {
lft := atomic.LoadInt64(ss.udpLastFrameTime) lft := atomic.LoadInt64(ss.udpLastFrameTime)
if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout { if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout {
return liberrors.ErrServerNoUDPPacketsInAWhile{} return liberrors.ErrServerNoUDPPacketsInAWhile{}
@@ -372,14 +372,14 @@ func (ss *ServerSession) run() {
ss.ctxCancel() ss.ctxCancel()
switch ss.state { switch ss.state {
case ServerSessionStateRead: case ServerSessionStatePlay:
ss.setuppedStream.readerSetInactive(ss) ss.setuppedStream.readerSetInactive(ss)
if *ss.setuppedTransport == TransportUDP { if *ss.setuppedTransport == TransportUDP {
ss.s.udpRTCPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss)
} }
case ServerSessionStatePublish: case ServerSessionStateRecord:
if *ss.setuppedTransport == TransportUDP { if *ss.setuppedTransport == TransportUDP {
ss.s.udpRTPListener.removeClient(ss) ss.s.udpRTPListener.removeClient(ss)
ss.s.udpRTCPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss)
@@ -546,7 +546,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
return res, err return res, err
} }
ss.state = ServerSessionStatePrePublish ss.state = ServerSessionStatePreRecord
ss.setuppedPath = &path ss.setuppedPath = &path
ss.setuppedQuery = &query ss.setuppedQuery = &query
ss.setuppedBaseURL = req.URL ss.setuppedBaseURL = req.URL
@@ -564,9 +564,9 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case base.Setup: case base.Setup:
err := ss.checkState(map[ServerSessionState]struct{}{ err := ss.checkState(map[ServerSessionState]struct{}{
ServerSessionStateInitial: {}, ServerSessionStateInitial: {},
ServerSessionStatePreRead: {}, ServerSessionStatePrePlay: {},
ServerSessionStatePrePublish: {}, ServerSessionStatePreRecord: {},
}) })
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
@@ -652,7 +652,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
switch ss.state { switch ss.state {
case ServerSessionStateInitial, ServerSessionStatePreRead: // play case ServerSessionStateInitial, ServerSessionStatePrePlay: // play
if inTH.Mode != nil && *inTH.Mode != headers.TransportModePlay { if inTH.Mode != nil && *inTH.Mode != headers.TransportModePlay {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
@@ -711,7 +711,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}, err }, err
} }
ss.state = ServerSessionStatePreRead ss.state = ServerSessionStatePrePlay
ss.setuppedPath = &path ss.setuppedPath = &path
ss.setuppedQuery = &query ss.setuppedQuery = &query
ss.setuppedStream = stream ss.setuppedStream = stream
@@ -719,7 +719,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
th := headers.Transport{} th := headers.Transport{}
if ss.state == ServerSessionStatePreRead { if ss.state == ServerSessionStatePrePlay {
ssrc := stream.ssrc(trackID) ssrc := stream.ssrc(trackID)
if ssrc != 0 { if ssrc != 0 {
th.SSRC = &ssrc th.SSRC = &ssrc
@@ -796,7 +796,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.setuppedTracks[trackID] = sst ss.setuppedTracks[trackID] = sst
if ss.state == ServerSessionStatePrePublish && *ss.setuppedTransport != TransportTCP { if ss.state == ServerSessionStatePreRecord && *ss.setuppedTransport != TransportTCP {
ss.announcedTracks[trackID].rtcpReceiver = rtcpreceiver.New(nil, ss.announcedTracks[trackID].rtcpReceiver = rtcpreceiver.New(nil,
ss.announcedTracks[trackID].track.ClockRate()) ss.announcedTracks[trackID].track.ClockRate())
} }
@@ -808,8 +808,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case base.Play: case base.Play:
// play can be sent twice, allow calling it even if we're already playing // play can be sent twice, allow calling it even if we're already playing
err := ss.checkState(map[ServerSessionState]struct{}{ err := ss.checkState(map[ServerSessionState]struct{}{
ServerSessionStatePreRead: {}, ServerSessionStatePrePlay: {},
ServerSessionStateRead: {}, ServerSessionStatePlay: {},
}) })
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
@@ -829,7 +829,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
path, query := base.PathSplitQuery(pathAndQuery) path, query := base.PathSplitQuery(pathAndQuery)
if ss.State() == ServerSessionStatePreRead && if ss.State() == ServerSessionStatePrePlay &&
path != *ss.setuppedPath { path != *ss.setuppedPath {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
@@ -845,17 +845,17 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}) })
if res.StatusCode != base.StatusOK { if res.StatusCode != base.StatusOK {
if ss.State() == ServerSessionStatePreRead { if ss.State() == ServerSessionStatePrePlay {
ss.writeBuffer = nil ss.writeBuffer = nil
} }
return res, err return res, err
} }
if ss.state == ServerSessionStateRead { if ss.state == ServerSessionStatePlay {
return res, err return res, err
} }
ss.state = ServerSessionStateRead ss.state = ServerSessionStatePlay
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case TransportUDP: case TransportUDP:
@@ -933,7 +933,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case base.Record: case base.Record:
err := ss.checkState(map[ServerSessionState]struct{}{ err := ss.checkState(map[ServerSessionState]struct{}{
ServerSessionStatePrePublish: {}, ServerSessionStatePreRecord: {},
}) })
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
@@ -977,7 +977,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
return res, err return res, err
} }
ss.state = ServerSessionStatePublish ss.state = ServerSessionStateRecord
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case TransportUDP: case TransportUDP:
@@ -1025,10 +1025,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case base.Pause: case base.Pause:
err := ss.checkState(map[ServerSessionState]struct{}{ err := ss.checkState(map[ServerSessionState]struct{}{
ServerSessionStatePreRead: {}, ServerSessionStatePrePlay: {},
ServerSessionStateRead: {}, ServerSessionStatePlay: {},
ServerSessionStatePrePublish: {}, ServerSessionStatePreRecord: {},
ServerSessionStatePublish: {}, ServerSessionStateRecord: {},
}) })
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
@@ -1067,10 +1067,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
switch ss.state { switch ss.state {
case ServerSessionStateRead: case ServerSessionStatePlay:
ss.setuppedStream.readerSetInactive(ss) ss.setuppedStream.readerSetInactive(ss)
ss.state = ServerSessionStatePreRead ss.state = ServerSessionStatePrePlay
ss.udpCheckStreamTimer = emptyTimer() ss.udpCheckStreamTimer = emptyTimer()
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
@@ -1087,8 +1087,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.tcpConn = nil ss.tcpConn = nil
} }
case ServerSessionStatePublish: case ServerSessionStateRecord:
ss.state = ServerSessionStatePrePublish ss.state = ServerSessionStatePreRecord
ss.udpCheckStreamTimer = emptyTimer() ss.udpCheckStreamTimer = emptyTimer()
ss.udpReceiverReportTimer = emptyTimer() ss.udpReceiverReportTimer = emptyTimer()