diff --git a/server.go b/server.go index 7d311e6c..32bdf71e 100644 --- a/server.go +++ b/server.go @@ -153,14 +153,11 @@ type Server struct { sessions map[string]*ServerSession conns map[*ServerConn]struct{} closeError error - streams map[*ServerStream]struct{} // in connClose chan *ServerConn sessionRequest chan sessionRequestReq sessionClose chan *ServerSession - streamAdd chan *ServerStream - streamRemove chan *ServerStream streamMulticastIP chan streamMulticastIPReq } @@ -337,12 +334,9 @@ func (s *Server) run() { s.sessions = make(map[string]*ServerSession) s.conns = make(map[*ServerConn]struct{}) - s.streams = make(map[*ServerStream]struct{}) s.connClose = make(chan *ServerConn) s.sessionRequest = make(chan sessionRequestReq) s.sessionClose = make(chan *ServerSession) - s.streamAdd = make(chan *ServerStream) - s.streamRemove = make(chan *ServerStream) s.streamMulticastIP = make(chan streamMulticastIPReq) s.wg.Add(1) @@ -446,12 +440,6 @@ func (s *Server) run() { delete(s.sessions, ss.secretID) ss.Close() - case st := <-s.streamAdd: - s.streams[st] = struct{}{} - - case st := <-s.streamRemove: - delete(s.streams, st) - case req := <-s.streamMulticastIP: ip32 := binary.BigEndian.Uint32(s.multicastNextIP) mask := binary.BigEndian.Uint32(s.multicastNet.Mask) @@ -478,10 +466,6 @@ func (s *Server) run() { } s.tcpListener.Close() - - for st := range s.streams { - st.Close() - } } // StartAndWait starts the server and waits until a fatal error. diff --git a/server_publish_test.go b/server_publish_test.go index b71d636f..c0843757 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -201,7 +201,8 @@ func TestServerPublishSetupPath(t *testing.T) { Handler: &testServerHandler{ onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { // make sure that track URLs are not overridden by NewServerStream() - NewServerStream(ctx.Tracks) + stream := NewServerStream(ctx.Tracks) + defer stream.Close() return &base.Response{ StatusCode: base.StatusOK, diff --git a/server_read_test.go b/server_read_test.go index d70fc035..01ad1069 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -95,6 +95,7 @@ func TestServerReadSetupPath(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track, track, track, track, track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -154,6 +155,7 @@ func TestServerReadSetupErrors(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -262,6 +264,7 @@ func TestServerRead(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() counter := uint64(0) @@ -612,6 +615,7 @@ func TestServerReadVLCMulticast(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() listenIP := multicastCapableIP(t) @@ -670,6 +674,7 @@ func TestServerReadNonStandardFrameSize(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -755,6 +760,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ RTSPAddress: "localhost:8554", @@ -853,6 +859,7 @@ func TestServerReadPlayPlay(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -939,6 +946,7 @@ func TestServerReadPlayPausePlay(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1061,6 +1069,7 @@ func TestServerReadPlayPausePause(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1194,6 +1203,7 @@ func TestServerReadTimeout(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1305,6 +1315,7 @@ func TestServerReadWithoutTeardown(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1407,6 +1418,7 @@ func TestServerReadUDPChangeConn(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1512,6 +1524,7 @@ func TestServerReadPartialTracks(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track1, track2}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1685,6 +1698,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track, track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1795,6 +1809,7 @@ func TestServerReadErrorUDPSamePorts(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ diff --git a/server_test.go b/server_test.go index 20a075f4..9e83f1ba 100644 --- a/server_test.go +++ b/server_test.go @@ -694,6 +694,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -796,6 +797,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1065,6 +1067,7 @@ func TestServerSessionAutoClose(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ @@ -1132,6 +1135,7 @@ func TestServerErrorInvalidPath(t *testing.T) { require.NoError(t, err) stream := NewServerStream(Tracks{track}) + defer stream.Close() s := &Server{ Handler: &testServerHandler{ diff --git a/serverstream.go b/serverstream.go index 0893e77c..99bc5a73 100644 --- a/serverstream.go +++ b/serverstream.go @@ -58,13 +58,6 @@ func (st *ServerStream) Close() error { st.mutex.Lock() defer st.mutex.Unlock() - if st.s != nil { - select { - case st.s.streamRemove <- st: - case <-st.s.ctx.Done(): - } - } - for ss := range st.readers { ss.Close() } @@ -117,10 +110,6 @@ func (st *ServerStream) readerAdd( if st.s == nil { st.s = ss.s - select { - case st.s.streamAdd <- st: - case <-st.s.ctx.Done(): - } } switch transport {