server: rename session states

new states:
* PreRead
* Read
* PrePublish
* Publish
This commit is contained in:
aler9
2021-08-11 12:04:19 +02:00
parent c45a1b3995
commit d05a92be5f

View File

@@ -81,10 +81,10 @@ type ServerSessionState int
// standard states. // standard states.
const ( const (
ServerSessionStateInitial ServerSessionState = iota ServerSessionStateInitial ServerSessionState = iota
ServerSessionStatePrePlay ServerSessionStatePreRead
ServerSessionStatePlay ServerSessionStateRead
ServerSessionStatePreRecord ServerSessionStatePrePublish
ServerSessionStateRecord ServerSessionStatePublish
) )
// String implements fmt.Stringer. // String implements fmt.Stringer.
@@ -92,13 +92,13 @@ func (s ServerSessionState) String() string {
switch s { switch s {
case ServerSessionStateInitial: case ServerSessionStateInitial:
return "initial" return "initial"
case ServerSessionStatePrePlay: case ServerSessionStatePreRead:
return "prePlay" return "prePlay"
case ServerSessionStatePlay: case ServerSessionStateRead:
return "play" return "play"
case ServerSessionStatePreRecord: case ServerSessionStatePrePublish:
return "preRecord" return "preRecord"
case ServerSessionStateRecord: case ServerSessionStatePublish:
return "record" return "record"
} }
return "unknown" return "unknown"
@@ -275,8 +275,8 @@ func (ss *ServerSession) run() {
} }
// if session is not in state RECORD or PLAY, or protocol is TCP // if session is not in state RECORD or PLAY, or protocol is TCP
if (ss.state != ServerSessionStateRecord && if (ss.state != ServerSessionStatePublish &&
ss.state != ServerSessionStatePlay) || ss.state != ServerSessionStateRead) ||
*ss.setuppedProtocol == base.StreamProtocolTCP { *ss.setuppedProtocol == base.StreamProtocolTCP {
// close if there are no active connections // close if there are no active connections
@@ -288,7 +288,7 @@ func (ss *ServerSession) run() {
case <-checkTimeoutTicker.C: case <-checkTimeoutTicker.C:
switch { switch {
// in case of RECORD and UDP, timeout happens when no frames are being received // in case of RECORD and UDP, timeout happens when no frames are being received
case ss.state == ServerSessionStateRecord && *ss.setuppedProtocol == base.StreamProtocolUDP: case ss.state == ServerSessionStatePublish && *ss.setuppedProtocol == base.StreamProtocolUDP:
now := time.Now() now := time.Now()
lft := atomic.LoadInt64(ss.udpLastFrameTime) lft := atomic.LoadInt64(ss.udpLastFrameTime)
if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout { if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout {
@@ -296,7 +296,7 @@ func (ss *ServerSession) run() {
} }
// in case of PLAY and UDP, timeout happens when no request arrives // in case of PLAY and UDP, timeout happens when no request arrives
case ss.state == ServerSessionStatePlay && *ss.setuppedProtocol == base.StreamProtocolUDP: case ss.state == ServerSessionStateRead && *ss.setuppedProtocol == base.StreamProtocolUDP:
now := time.Now() now := time.Now()
if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor { if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor {
return liberrors.ErrServerSessionTimedOut{} return liberrors.ErrServerSessionTimedOut{}
@@ -306,7 +306,7 @@ func (ss *ServerSession) run() {
} }
case <-receiverReportTicker.C: case <-receiverReportTicker.C:
if ss.state != ServerSessionStateRecord { if ss.state != ServerSessionStatePublish {
continue continue
} }
@@ -325,7 +325,7 @@ func (ss *ServerSession) run() {
ss.ctxCancel() ss.ctxCancel()
switch ss.state { switch ss.state {
case ServerSessionStatePlay: case ServerSessionStateRead:
ss.setuppedStream.readerSetInactive(ss) ss.setuppedStream.readerSetInactive(ss)
if *ss.setuppedProtocol == base.StreamProtocolUDP && if *ss.setuppedProtocol == base.StreamProtocolUDP &&
@@ -333,7 +333,7 @@ func (ss *ServerSession) run() {
ss.s.udpRTCPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss)
} }
case ServerSessionStateRecord: case ServerSessionStatePublish:
if *ss.setuppedProtocol == base.StreamProtocolUDP { if *ss.setuppedProtocol == base.StreamProtocolUDP {
ss.s.udpRTPListener.removeClient(ss) ss.s.udpRTPListener.removeClient(ss)
ss.s.udpRTCPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss)
@@ -491,7 +491,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}) })
if res.StatusCode == base.StatusOK { if res.StatusCode == base.StatusOK {
ss.state = ServerSessionStatePreRecord ss.state = ServerSessionStatePrePublish
ss.setuppedPath = &path ss.setuppedPath = &path
ss.setuppedQuery = &query ss.setuppedQuery = &query
ss.setuppedBaseURL = req.URL ss.setuppedBaseURL = req.URL
@@ -514,8 +514,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case base.Setup: case base.Setup:
err := ss.checkState(map[ServerSessionState]struct{}{ err := ss.checkState(map[ServerSessionState]struct{}{
ServerSessionStateInitial: {}, ServerSessionStateInitial: {},
ServerSessionStatePrePlay: {}, ServerSessionStatePreRead: {},
ServerSessionStatePreRecord: {}, ServerSessionStatePrePublish: {},
}) })
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
@@ -553,7 +553,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}() }()
switch ss.state { switch ss.state {
case ServerSessionStateInitial, ServerSessionStatePrePlay: // play case ServerSessionStateInitial, ServerSessionStatePreRead: // play
if inTH.Mode != nil && *inTH.Mode != headers.TransportModePlay { if inTH.Mode != nil && *inTH.Mode != headers.TransportModePlay {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
@@ -646,7 +646,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}, err }, err
} }
ss.state = ServerSessionStatePrePlay ss.state = ServerSessionStatePreRead
ss.setuppedPath = &path ss.setuppedPath = &path
ss.setuppedQuery = &query ss.setuppedQuery = &query
ss.setuppedStream = stream ss.setuppedStream = stream
@@ -654,7 +654,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
th := headers.Transport{} th := headers.Transport{}
if ss.state == ServerSessionStatePrePlay { if ss.state == ServerSessionStatePreRead {
ssrc := stream.ssrc(trackID) ssrc := stream.ssrc(trackID)
if ssrc != 0 { if ssrc != 0 {
th.SSRC = &ssrc th.SSRC = &ssrc
@@ -737,8 +737,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case base.Play: case base.Play:
// play can be sent twice, allow calling it even if we're already playing // play can be sent twice, allow calling it even if we're already playing
err := ss.checkState(map[ServerSessionState]struct{}{ err := ss.checkState(map[ServerSessionState]struct{}{
ServerSessionStatePrePlay: {}, ServerSessionStatePreRead: {},
ServerSessionStatePlay: {}, ServerSessionStateRead: {},
}) })
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
@@ -764,7 +764,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
path, query := base.PathSplitQuery(pathAndQuery) path, query := base.PathSplitQuery(pathAndQuery)
if ss.State() == ServerSessionStatePrePlay && if ss.State() == ServerSessionStatePreRead &&
path != *ss.setuppedPath { path != *ss.setuppedPath {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
@@ -779,9 +779,9 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
Query: query, Query: query,
}) })
if ss.state != ServerSessionStatePlay { if ss.state != ServerSessionStateRead {
if res.StatusCode == base.StatusOK { if res.StatusCode == base.StatusOK {
ss.state = ServerSessionStatePlay ss.state = ServerSessionStateRead
if *ss.setuppedProtocol == base.StreamProtocolUDP { if *ss.setuppedProtocol == base.StreamProtocolUDP {
ss.udpIP = sc.ip() ss.udpIP = sc.ip()
@@ -852,7 +852,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case base.Record: case base.Record:
err := ss.checkState(map[ServerSessionState]struct{}{ err := ss.checkState(map[ServerSessionState]struct{}{
ServerSessionStatePreRecord: {}, ServerSessionStatePrePublish: {},
}) })
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
@@ -901,7 +901,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}) })
if res.StatusCode == base.StatusOK { if res.StatusCode == base.StatusOK {
ss.state = ServerSessionStateRecord ss.state = ServerSessionStatePublish
if *ss.setuppedProtocol == base.StreamProtocolUDP { if *ss.setuppedProtocol == base.StreamProtocolUDP {
for trackID, track := range ss.setuppedTracks { for trackID, track := range ss.setuppedTracks {
@@ -929,10 +929,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
case base.Pause: case base.Pause:
err := ss.checkState(map[ServerSessionState]struct{}{ err := ss.checkState(map[ServerSessionState]struct{}{
ServerSessionStatePrePlay: {}, ServerSessionStatePreRead: {},
ServerSessionStatePlay: {}, ServerSessionStateRead: {},
ServerSessionStatePreRecord: {}, ServerSessionStatePrePublish: {},
ServerSessionStateRecord: {}, ServerSessionStatePublish: {},
}) })
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
@@ -962,10 +962,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
if res.StatusCode == base.StatusOK { if res.StatusCode == base.StatusOK {
switch ss.state { switch ss.state {
case ServerSessionStatePlay: case ServerSessionStateRead:
ss.setuppedStream.readerSetInactive(ss) ss.setuppedStream.readerSetInactive(ss)
ss.state = ServerSessionStatePrePlay ss.state = ServerSessionStatePreRead
ss.udpIP = nil ss.udpIP = nil
ss.udpZone = "" ss.udpZone = ""
ss.tcpConn = nil ss.tcpConn = nil
@@ -976,8 +976,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
return res, liberrors.ErrServerTCPFramesDisable{} return res, liberrors.ErrServerTCPFramesDisable{}
} }
case ServerSessionStateRecord: case ServerSessionStatePublish:
ss.state = ServerSessionStatePreRecord ss.state = ServerSessionStatePrePublish
ss.udpIP = nil ss.udpIP = nil
ss.udpZone = "" ss.udpZone = ""
ss.tcpConn = nil ss.tcpConn = nil