server: make Close() wait for all resources to close

This commit is contained in:
aler9
2021-11-12 17:38:35 +01:00
committed by Alessandro Ros
parent 08ab7f87ac
commit f304ec52fb
5 changed files with 12 additions and 51 deletions

View File

@@ -222,7 +222,7 @@ type Client struct {
checkStreamInitial bool checkStreamInitial bool
tcpLastFrameTime int64 tcpLastFrameTime int64
keepaliveTimer *time.Timer keepaliveTimer *time.Timer
finalErr error closeError error
// connCloser channels // connCloser channels
connCloserTerminate chan struct{} connCloserTerminate chan struct{}
@@ -412,14 +412,14 @@ func (c *Client) StartPublishing(address string, tracks Tracks) error {
func (c *Client) Close() error { func (c *Client) Close() error {
c.ctxCancel() c.ctxCancel()
<-c.done <-c.done
return c.finalErr return c.closeError
} }
// Wait waits until all client resources are closed. // Wait waits until all client resources are closed.
// This can happen when a read error occurs or when Close() is called. // This can happen when a read error occurs or when Close() is called.
func (c *Client) Wait() error { func (c *Client) Wait() error {
<-c.done <-c.done
return c.finalErr return c.closeError
} }
// Tracks returns all the tracks that the client is reading or publishing. // Tracks returns all the tracks that the client is reading or publishing.
@@ -444,7 +444,7 @@ func (c *Client) Tracks() Tracks {
func (c *Client) run() { func (c *Client) run() {
defer close(c.done) defer close(c.done)
c.finalErr = func() error { c.closeError = func() error {
for { for {
select { select {
case req := <-c.options: case req := <-c.options:
@@ -1746,7 +1746,7 @@ func (c *Client) WritePacketRTP(trackID int, payload []byte) error {
if !c.writeFrameAllowed { if !c.writeFrameAllowed {
select { select {
case <-c.done: case <-c.done:
return c.finalErr return c.closeError
default: default:
return nil return nil
} }
@@ -1784,7 +1784,7 @@ func (c *Client) WritePacketRTCP(trackID int, payload []byte) error {
if !c.writeFrameAllowed { if !c.writeFrameAllowed {
select { select {
case <-c.done: case <-c.done:
return c.finalErr return c.closeError
default: default:
return nil return nil
} }

View File

@@ -142,7 +142,7 @@ type Server struct {
udpRTCPListener *serverUDPListener udpRTCPListener *serverUDPListener
sessions map[string]*ServerSession sessions map[string]*ServerSession
conns map[*ServerConn]struct{} conns map[*ServerConn]struct{}
exitError error closeError error
streams map[*ServerStream]struct{} streams map[*ServerStream]struct{}
// in // in
@@ -302,18 +302,18 @@ func (s *Server) Start() error {
return nil return nil
} }
// Close closes all the server resources. // Close closes all the server resources and waits for the to close.
// It doesn't wait for the server resources to close (use Wait for that).
func (s *Server) Close() error { func (s *Server) Close() error {
s.ctxCancel() s.ctxCancel()
return nil s.wg.Wait()
return s.closeError
} }
// Wait waits until all server resources are closed. // Wait waits until all server resources are closed.
// This can happen when a fatal error occurs or when Close() is called. // This can happen when a fatal error occurs or when Close() is called.
func (s *Server) Wait() error { func (s *Server) Wait() error {
s.wg.Wait() s.wg.Wait()
return s.exitError return s.closeError
} }
func (s *Server) run() { func (s *Server) run() {
@@ -355,7 +355,7 @@ func (s *Server) run() {
} }
}() }()
s.exitError = func() error { s.closeError = func() error {
for { for {
select { select {
case err := <-acceptErr: case err := <-acceptErr:

View File

@@ -164,7 +164,6 @@ func TestServerPublishErrorAnnounce(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -252,7 +251,6 @@ func TestServerPublishSetupPath(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -349,7 +347,6 @@ func TestServerPublishErrorSetupDifferentPaths(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -433,7 +430,6 @@ func TestServerPublishErrorSetupTrackTwice(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -534,7 +530,6 @@ func TestServerPublishErrorRecordPartialTracks(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -675,7 +670,6 @@ func TestServerPublish(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -876,7 +870,6 @@ func TestServerPublishNonStandardFrameSize(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -978,7 +971,6 @@ func TestServerPublishErrorInvalidProtocol(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -1081,7 +1073,6 @@ func TestServerPublishRTCPReport(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -1242,7 +1233,6 @@ func TestServerPublishTimeout(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1372,7 +1362,6 @@ func TestServerPublishWithoutTeardown(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1489,7 +1478,6 @@ func TestServerPublishUDPChangeConn(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
sxID := "" sxID := ""

View File

@@ -110,7 +110,6 @@ func TestServerReadSetupPath(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -176,7 +175,6 @@ func TestServerReadSetupErrors(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -344,7 +342,6 @@ func TestServerRead(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
nconn, err := net.Dial("tcp", listenIP+":8554") nconn, err := net.Dial("tcp", listenIP+":8554")
@@ -604,7 +601,6 @@ func TestServerReadNonStandardFrameSize(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -705,7 +701,6 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -778,7 +773,6 @@ func TestServerReadPlayPlay(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -887,7 +881,6 @@ func TestServerReadPlayPausePlay(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -1003,7 +996,6 @@ func TestServerReadPlayPausePause(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -1116,7 +1108,6 @@ func TestServerReadTimeout(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1215,7 +1206,6 @@ func TestServerReadWithoutTeardown(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554") nconn, err := net.Dial("tcp", "localhost:8554")
@@ -1303,7 +1293,6 @@ func TestServerReadUDPChangeConn(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
sxID := "" sxID := ""
@@ -1396,7 +1385,6 @@ func TestServerReadErrorUDPSamePorts(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
func() { func() {
@@ -1503,7 +1491,6 @@ func TestServerReadNonSetuppedPath(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -1669,7 +1656,6 @@ func TestServerReadAdditionalInfos(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
buf, err := (&rtp.Packet{ buf, err := (&rtp.Packet{

View File

@@ -436,7 +436,6 @@ func TestServerHighLevelPublishRead(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
switch ca.publisherSoft { switch ca.publisherSoft {
@@ -538,7 +537,6 @@ func TestServerClose(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
s.Close() s.Close()
s.Close() s.Close()
} }
@@ -583,7 +581,6 @@ func TestServerConnClose(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -599,7 +596,6 @@ func TestServerCSeq(t *testing.T) {
} }
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -634,7 +630,6 @@ func TestServerErrorCSeqMissing(t *testing.T) {
} }
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -667,7 +662,6 @@ func TestServerErrorInvalidMethod(t *testing.T) {
} }
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -717,7 +711,6 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn1, err := net.Dial("tcp", "localhost:8554") conn1, err := net.Dial("tcp", "localhost:8554")
@@ -816,7 +809,6 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -903,7 +895,6 @@ func TestServerGetSetParameter(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -976,7 +967,6 @@ func TestServerErrorInvalidSession(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -1020,7 +1010,6 @@ func TestServerSessionClose(t *testing.T) {
err := s.Start() err := s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -1076,7 +1065,6 @@ func TestServerSessionAutoClose(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")
@@ -1155,7 +1143,6 @@ func TestServerErrorInvalidPath(t *testing.T) {
err = s.Start() err = s.Start()
require.NoError(t, err) require.NoError(t, err)
defer s.Wait()
defer s.Close() defer s.Close()
conn, err := net.Dial("tcp", "localhost:8554") conn, err := net.Dial("tcp", "localhost:8554")