server: allow to call server.Close() twice

This commit is contained in:
aler9
2021-05-09 14:10:21 +02:00
parent 1b9b19dd84
commit 994cd442e3
4 changed files with 38 additions and 22 deletions

View File

@@ -225,7 +225,7 @@ func (s *Server) Start(address string) error {
return err return err
} }
s.terminate = make(chan struct{}) s.terminate = make(chan struct{}, 1)
s.done = make(chan struct{}) s.done = make(chan struct{})
go s.run() go s.run()
@@ -233,6 +233,22 @@ func (s *Server) Start(address string) error {
return nil return nil
} }
// Close closes all the server resources and waits for the server to exit.
func (s *Server) Close() error {
select {
case s.terminate <- struct{}{}:
default:
}
<-s.done
return nil
}
// Wait waits until a fatal error.
func (s *Server) Wait() error {
<-s.done
return s.exitError
}
func (s *Server) run() { func (s *Server) run() {
s.sessions = make(map[string]*ServerSession) s.sessions = make(map[string]*ServerSession)
s.conns = make(map[*ServerConn]struct{}) s.conns = make(map[*ServerConn]struct{})
@@ -386,19 +402,6 @@ outer:
close(s.done) close(s.done)
} }
// Close closes all the server resources and waits for the server to exit.
func (s *Server) Close() error {
close(s.terminate)
<-s.done
return nil
}
// Wait waits until a fatal error.
func (s *Server) Wait() error {
<-s.done
return s.exitError
}
// StartAndWait starts the server and waits until a fatal error. // StartAndWait starts the server and waits until a fatal error.
func (s *Server) StartAndWait(address string) error { func (s *Server) StartAndWait(address string) error {
err := s.Start(address) err := s.Start(address)

View File

@@ -422,6 +422,17 @@ func TestServerHighLevelPublishRead(t *testing.T) {
} }
} }
func TestServerClose(t *testing.T) {
s := &Server{
Handler: &testServerHandler{},
}
err := s.Start("127.0.0.1:8554")
require.NoError(t, err)
s.Close()
s.Close()
}
func TestServerErrorWrongUDPPorts(t *testing.T) { func TestServerErrorWrongUDPPorts(t *testing.T) {
t.Run("non consecutive", func(t *testing.T) { t.Run("non consecutive", func(t *testing.T) {
s := &Server{ s := &Server{
@@ -449,6 +460,7 @@ func TestServerConnClose(t *testing.T) {
Handler: &testServerHandler{ Handler: &testServerHandler{
onConnOpen: func(ctx *ServerHandlerOnConnOpenCtx) { onConnOpen: func(ctx *ServerHandlerOnConnOpenCtx) {
ctx.Conn.Close() ctx.Conn.Close()
ctx.Conn.Close()
}, },
onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) {
close(connClosed) close(connClosed)
@@ -887,6 +899,7 @@ func TestServerSessionClose(t *testing.T) {
Handler: &testServerHandler{ Handler: &testServerHandler{
onSessionOpen: func(ctx *ServerHandlerOnSessionOpenCtx) { onSessionOpen: func(ctx *ServerHandlerOnSessionOpenCtx) {
ctx.Session.Close() ctx.Session.Close()
ctx.Session.Close()
}, },
onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) {
close(sessionClosed) close(sessionClosed)

View File

@@ -63,7 +63,7 @@ type ServerConn struct {
// in // in
sessionRemove chan *ServerSession sessionRemove chan *ServerSession
innerTerminate chan struct{} terminate chan struct{}
parentTerminate chan struct{} parentTerminate chan struct{}
} }
@@ -77,7 +77,7 @@ func newServerConn(
wg: wg, wg: wg,
nconn: nconn, nconn: nconn,
sessionRemove: make(chan *ServerSession), sessionRemove: make(chan *ServerSession),
innerTerminate: make(chan struct{}, 1), terminate: make(chan struct{}, 1),
parentTerminate: make(chan struct{}), parentTerminate: make(chan struct{}),
} }
@@ -90,7 +90,7 @@ func newServerConn(
// Close closes the ServerConn. // Close closes the ServerConn.
func (sc *ServerConn) Close() error { func (sc *ServerConn) Close() error {
select { select {
case sc.innerTerminate <- struct{}{}: case sc.terminate <- struct{}{}:
default: default:
} }
return nil return nil
@@ -214,7 +214,7 @@ func (sc *ServerConn) run() {
sc.sessionsWG.Done() sc.sessionsWG.Done()
} }
case <-sc.innerTerminate: case <-sc.terminate:
return liberrors.ErrServerTerminated{} return liberrors.ErrServerTerminated{}
} }
} }

View File

@@ -136,7 +136,7 @@ type ServerSession struct {
// in // in
request chan request request chan request
connRemove chan *ServerConn connRemove chan *ServerConn
innerTerminate chan struct{} terminate chan struct{}
parentTerminate chan struct{} parentTerminate chan struct{}
} }
@@ -156,7 +156,7 @@ func newServerSession(
lastRequestTime: time.Now(), lastRequestTime: time.Now(),
request: make(chan request), request: make(chan request),
connRemove: make(chan *ServerConn), connRemove: make(chan *ServerConn),
innerTerminate: make(chan struct{}, 1), terminate: make(chan struct{}, 1),
parentTerminate: make(chan struct{}), parentTerminate: make(chan struct{}),
} }
@@ -169,7 +169,7 @@ func newServerSession(
// Close closes the ServerSession. // Close closes the ServerSession.
func (ss *ServerSession) Close() error { func (ss *ServerSession) Close() error {
select { select {
case ss.innerTerminate <- struct{}{}: case ss.terminate <- struct{}{}:
default: default:
} }
return nil return nil
@@ -311,7 +311,7 @@ func (ss *ServerSession) run() {
ss.WriteFrame(trackID, StreamTypeRTCP, r) ss.WriteFrame(trackID, StreamTypeRTCP, r)
} }
case <-ss.innerTerminate: case <-ss.terminate:
return liberrors.ErrServerTerminated{} return liberrors.ErrServerTerminated{}
} }
} }