diff --git a/examples/proxy/client.go b/examples/proxy/client.go index de65f3f7..e8dc24f9 100644 --- a/examples/proxy/client.go +++ b/examples/proxy/client.go @@ -2,7 +2,6 @@ package main import ( "log" - "sync" "time" "github.com/bluenviron/gortsplib/v4" @@ -18,12 +17,13 @@ const ( ) type client struct { - mutex sync.RWMutex - stream *gortsplib.ServerStream + s *server } -func newClient() *client { - c := &client{} +func newClient(s *server) *client { + c := &client{ + s: s, + } // start a separated routine go c.run() @@ -40,12 +40,6 @@ func (c *client) run() { } } -func (c *client) getStream() *gortsplib.ServerStream { - c.mutex.RLock() - defer c.mutex.RUnlock() - return c.stream -} - func (c *client) read() error { rc := gortsplib.Client{} @@ -74,24 +68,11 @@ func (c *client) read() error { return err } - // create a server stream - stream := gortsplib.NewServerStream(medias) - defer stream.Close() + stream := c.s.setStreamReady(medias) + defer c.s.setStreamUnready() log.Printf("stream is ready and can be read from the server at rtsp://localhost:8554/stream\n") - // make stream available by using getStream() - c.mutex.Lock() - c.stream = stream - c.mutex.Unlock() - - defer func() { - // remove stream from getStream() - c.mutex.Lock() - c.stream = nil - c.mutex.Unlock() - }() - // called when a RTP packet arrives rc.OnPacketRTPAny(func(medi *media.Media, forma formats.Format, pkt *rtp.Packet) { // route incoming packets to the server stream diff --git a/examples/proxy/main.go b/examples/proxy/main.go index 2efda710..ab829706 100644 --- a/examples/proxy/main.go +++ b/examples/proxy/main.go @@ -1,14 +1,21 @@ package main +import "log" + // This example shows how to -// 1. read an existing stream from an external server or camera, with a client -// 2. create a server that allow to proxy that stream +// 1. create a server that allow to serve a stream. +// 2. create a client, read an existing stream from an external server or camera, +// pass the stream to the server in order to serve it. func main() { - // allocate the client - c := newClient() - // allocate the server. - // give server access to the method client.getStream(). - newServer(c.getStream) + s := newServer() + + // allocate the client. + // give client access to the server. + newClient(s) + + // start server and wait until a fatal error + log.Printf("server is ready") + s.s.StartAndWait() } diff --git a/examples/proxy/server.go b/examples/proxy/server.go index c439f8c8..5cf0234e 100644 --- a/examples/proxy/server.go +++ b/examples/proxy/server.go @@ -2,24 +2,24 @@ package main import ( "log" + "sync" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/media" ) type server struct { - getStream func() *gortsplib.ServerStream + s *gortsplib.Server + mutex sync.Mutex + stream *gortsplib.ServerStream } -func newServer( - getStream func() *gortsplib.ServerStream, -) *server { - s := &server{ - getStream: getStream, - } +func newServer() *server { + s := &server{} // configure the server - rs := &gortsplib.Server{ + s.s = &gortsplib.Server{ Handler: s, RTSPAddress: ":8554", UDPRTPAddress: ":8000", @@ -29,9 +29,7 @@ func newServer( MulticastRTCPPort: 8003, } - // start server and wait until a fatal error - log.Printf("server is ready") - panic(rs.StartAndWait()) + return s } // called when a connection is opened. @@ -58,10 +56,11 @@ func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { log.Printf("describe request") - stream := s.getStream() + s.mutex.Lock() + defer s.mutex.Unlock() // stream is not available yet - if stream == nil { + if s.stream == nil { return &base.Response{ StatusCode: base.StatusNotFound, }, nil, nil @@ -69,17 +68,18 @@ func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Re return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil + }, s.stream, nil } // called when receiving a SETUP request. func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { log.Printf("setup request") - stream := s.getStream() + s.mutex.Lock() + defer s.mutex.Unlock() // stream is not available yet - if stream == nil { + if s.stream == nil { return &base.Response{ StatusCode: base.StatusNotFound, }, nil, nil @@ -87,7 +87,7 @@ func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil + }, s.stream, nil } // called when receiving a PLAY request. @@ -98,3 +98,17 @@ func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, StatusCode: base.StatusOK, }, nil } + +func (s *server) setStreamReady(medias media.Medias) *gortsplib.ServerStream { + s.mutex.Lock() + defer s.mutex.Unlock() + s.stream = gortsplib.NewServerStream(s.s, medias) + return s.stream +} + +func (s *server) setStreamUnready() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.stream.Close() + s.stream = nil +} diff --git a/examples/server-tls/main.go b/examples/server-tls/main.go index e3e01813..92d4a07e 100644 --- a/examples/server-tls/main.go +++ b/examples/server-tls/main.go @@ -88,7 +88,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( } // create the stream and save the publisher - sh.stream = gortsplib.NewServerStream(ctx.Medias) + sh.stream = gortsplib.NewServerStream(ctx.Server, ctx.Medias) sh.publisher = ctx.Session return &base.Response{ diff --git a/examples/server/main.go b/examples/server/main.go index 57319ce4..8513ca53 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -87,7 +87,7 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( } // create the stream and save the publisher - sh.stream = gortsplib.NewServerStream(ctx.Medias) + sh.stream = gortsplib.NewServerStream(ctx.Server, ctx.Medias) sh.publisher = ctx.Session return &base.Response{ diff --git a/internal/highleveltests/server_test.go b/internal/highleveltests/server_test.go index 33fdbff7..da912b5f 100644 --- a/internal/highleveltests/server_test.go +++ b/internal/highleveltests/server_test.go @@ -328,7 +328,7 @@ func TestServerRecordRead(t *testing.T) { }, fmt.Errorf("someone is already publishing") } - stream = gortsplib.NewServerStream(ctx.Medias) + stream = gortsplib.NewServerStream(ctx.Server, ctx.Medias) publisher = ctx.Session return &base.Response{ diff --git a/server_play_test.go b/server_play_test.go index a792e3a2..be00f2b1 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -222,13 +222,7 @@ func TestServerPlayPath(t *testing.T) { }, } { t.Run(ca.name, func(t *testing.T) { - stream := NewServerStream(media.Medias{ - testH264Media, - testH264Media, - testH264Media, - testH264Media, - }) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -259,6 +253,14 @@ func TestServerPlayPath(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{ + testH264Media, + testH264Media, + testH264Media, + testH264Media, + }) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -298,15 +300,9 @@ func TestServerPlaySetupErrors(t *testing.T) { "closed stream", } { t.Run(ca, func(t *testing.T) { + var stream *ServerStream nconnClosed := make(chan struct{}) - stream := NewServerStream(media.Medias{testH264Media}) - if ca == "closed stream" { - stream.Close() - } else { - defer stream.Close() - } - s := &Server{ Handler: &testServerHandler{ onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { @@ -340,6 +336,13 @@ func TestServerPlaySetupErrors(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + if ca == "closed stream" { + stream.Close() + } else { + defer stream.Close() + } + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -421,8 +424,7 @@ func TestServerPlaySetupErrors(t *testing.T) { } func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + var stream *ServerStream first := int32(1) errorRecv := make(chan struct{}) @@ -460,6 +462,9 @@ func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + for i := 0; i < 2; i++ { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) @@ -509,15 +514,12 @@ func TestServerPlay(t *testing.T) { "multicast", } { t.Run(transport, func(t *testing.T) { + var stream *ServerStream nconnOpened := make(chan struct{}) nconnClosed := make(chan struct{}) sessionOpened := make(chan struct{}) sessionClosed := make(chan struct{}) framesReceived := make(chan struct{}) - - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - counter := uint64(0) listenIP := multicastCapableIP(t) @@ -609,6 +611,9 @@ func TestServerPlay(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", listenIP+":8554") require.NoError(t, err) @@ -838,11 +843,9 @@ func TestServerPlayDecodeErrors(t *testing.T) { {"tcp", "rtcp too big"}, } { t.Run(ca.proto+" "+ca.name, func(t *testing.T) { + var stream *ServerStream errorRecv := make(chan struct{}) - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - s := &Server{ Handler: &testServerHandler{ onDescribe: func(ctx *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) { @@ -886,6 +889,9 @@ func TestServerPlayDecodeErrors(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -969,8 +975,7 @@ func TestServerPlayDecodeErrors(t *testing.T) { func TestServerPlayRTCPReport(t *testing.T) { for _, ca := range []string{"udp", "tcp"} { t.Run(ca, func(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -1000,6 +1005,9 @@ func TestServerPlayRTCPReport(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -1092,9 +1100,7 @@ func TestServerPlayRTCPReport(t *testing.T) { } func TestServerPlayVLCMulticast(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - + var stream *ServerStream listenIP := multicastCapableIP(t) s := &Server{ @@ -1115,6 +1121,9 @@ func TestServerPlayVLCMulticast(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", listenIP+":8554") require.NoError(t, err) conn := conn.NewConn(nconn) @@ -1138,12 +1147,10 @@ func TestServerPlayVLCMulticast(t *testing.T) { } func TestServerPlayTCPResponseBeforeFrames(t *testing.T) { + var stream *ServerStream writerDone := make(chan struct{}) writerTerminate := make(chan struct{}) - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - s := &Server{ RTSPAddress: "localhost:8554", Handler: &testServerHandler{ @@ -1195,6 +1202,9 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -1226,8 +1236,7 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) { } func TestServerPlayPlayPlay(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -1256,6 +1265,9 @@ func TestServerPlayPlayPlay(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -1285,13 +1297,11 @@ func TestServerPlayPlayPlay(t *testing.T) { } func TestServerPlayPlayPausePlay(t *testing.T) { + var stream *ServerStream writerStarted := false writerDone := make(chan struct{}) writerTerminate := make(chan struct{}) - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - s := &Server{ Handler: &testServerHandler{ onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { @@ -1346,6 +1356,9 @@ func TestServerPlayPlayPausePlay(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -1376,12 +1389,10 @@ func TestServerPlayPlayPausePlay(t *testing.T) { } func TestServerPlayPlayPausePause(t *testing.T) { + var stream *ServerStream writerDone := make(chan struct{}) writerTerminate := make(chan struct{}) - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - s := &Server{ Handler: &testServerHandler{ onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { @@ -1433,6 +1444,9 @@ func TestServerPlayPlayPausePause(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -1471,11 +1485,9 @@ func TestServerPlayTimeout(t *testing.T) { // there's no timeout when reading with TCP } { t.Run(transport, func(t *testing.T) { + var stream *ServerStream sessionClosed := make(chan struct{}) - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - s := &Server{ Handler: &testServerHandler{ onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { @@ -1518,6 +1530,9 @@ func TestServerPlayTimeout(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -1562,12 +1577,10 @@ func TestServerPlayWithoutTeardown(t *testing.T) { "tcp", } { t.Run(transport, func(t *testing.T) { + var stream *ServerStream nconnClosed := make(chan struct{}) sessionClosed := make(chan struct{}) - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - s := &Server{ Handler: &testServerHandler{ onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { @@ -1606,6 +1619,9 @@ func TestServerPlayWithoutTeardown(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -1647,8 +1663,7 @@ func TestServerPlayWithoutTeardown(t *testing.T) { } func TestServerPlayUDPChangeConn(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -1682,6 +1697,9 @@ func TestServerPlayUDPChangeConn(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + sxID := "" func() { @@ -1734,8 +1752,7 @@ func TestServerPlayUDPChangeConn(t *testing.T) { } func TestServerPlayPartialMedias(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media, testH264Media}) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -1770,6 +1787,9 @@ func TestServerPlayPartialMedias(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media, testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -1864,17 +1884,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) { err := forma.Init() require.NoError(t, err) - stream := NewServerStream(media.Medias{ - &media.Media{ - Type: "application", - Formats: []formats.Format{forma}, - }, - &media.Media{ - Type: "application", - Formats: []formats.Format{forma}, - }, - }) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -1901,6 +1911,18 @@ func TestServerPlayAdditionalInfos(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{ + &media.Media{ + Type: "application", + Formats: []formats.Format{forma}, + }, + &media.Media{ + Type: "application", + Formats: []formats.Format{forma}, + }, + }) + defer stream.Close() + err = stream.WritePacketRTP(stream.Medias()[0], &rtp.Packet{ Header: rtp.Header{ Version: 2, @@ -1979,17 +2001,7 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) { err := forma.Init() require.NoError(t, err) - stream := NewServerStream(media.Medias{ - &media.Media{ - Type: "application", - Formats: []formats.Format{forma}, - }, - &media.Media{ - Type: "application", - Formats: []formats.Format{forma}, - }, - }) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -2016,6 +2028,18 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{ + &media.Media{ + Type: "application", + Formats: []formats.Format{forma}, + }, + &media.Media{ + Type: "application", + Formats: []formats.Format{forma}, + }, + }) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() diff --git a/server_record_test.go b/server_record_test.go index fce92880..184be30a 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -223,7 +223,7 @@ func TestServerRecordPath(t *testing.T) { Handler: &testServerHandler{ onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { // make sure that media URLs are not overridden by NewServerStream() - stream := NewServerStream(ctx.Medias) + stream := NewServerStream(ctx.Server, ctx.Medias) defer stream.Close() return &base.Response{ diff --git a/server_stream.go b/server_stream.go index 916c0028..8438afbc 100644 --- a/server_stream.go +++ b/server_stream.go @@ -19,10 +19,10 @@ import ( // - allocating multicast listeners // - gathering infos about the stream in order to generate SSRC and RTP-Info type ServerStream struct { + s *Server medias media.Medias mutex sync.RWMutex - s *Server activeUnicastReaders map[*ServerSession]struct{} readers map[*ServerSession]struct{} streamMedias map[*media.Media]*serverStreamMedia @@ -30,8 +30,9 @@ type ServerStream struct { } // NewServerStream allocates a ServerStream. -func NewServerStream(medias media.Medias) *ServerStream { +func NewServerStream(s *Server, medias media.Medias) *ServerStream { st := &ServerStream{ + s: s, medias: medias, activeUnicastReaders: make(map[*ServerSession]struct{}), readers: make(map[*ServerSession]struct{}), @@ -42,10 +43,6 @@ func NewServerStream(medias media.Medias) *ServerStream { st.streamMedias[medi] = newServerStreamMedia(st, medi, i) } - return st -} - -func (st *ServerStream) initializeServerDependentPart() { if !st.s.DisableRTCPSenderReports { for _, ssm := range st.streamMedias { for _, tr := range ssm.formats { @@ -53,6 +50,8 @@ func (st *ServerStream) initializeServerDependentPart() { } } } + + return st } // Close closes a ServerStream. @@ -156,11 +155,6 @@ func (st *ServerStream) readerAdd( return fmt.Errorf("stream is closed") } - if st.s == nil { - st.s = ss.s - st.initializeServerDependentPart() - } - switch transport { case TransportUDP: // check whether UDP ports and IP are already assigned to another reader diff --git a/server_test.go b/server_test.go index 464c7dc8..fce0faf1 100644 --- a/server_test.go +++ b/server_test.go @@ -342,11 +342,10 @@ func (s *testServerErrMethodNotImplemented) OnSetup( func TestServerErrorMethodNotImplemented(t *testing.T) { for _, ca := range []string{"outside session", "inside session"} { t.Run(ca, func(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + h := &testServerErrMethodNotImplemented{} s := &Server{ - Handler: &testServerErrMethodNotImplemented{stream}, + Handler: h, RTSPAddress: "localhost:8554", } @@ -354,6 +353,11 @@ func TestServerErrorMethodNotImplemented(t *testing.T) { require.NoError(t, err) defer s.Close() + stream := NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + + h.stream = stream + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -416,8 +420,7 @@ func TestServerErrorMethodNotImplemented(t *testing.T) { } func TestServerErrorTCPTwoConnOneSession(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -449,6 +452,9 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn1, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn1.Close() @@ -507,8 +513,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { } func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -540,6 +545,9 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -590,8 +598,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { } func TestServerSetupMultipleTransports(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -613,6 +620,9 @@ func TestServerSetupMultipleTransports(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -674,9 +684,7 @@ func TestServerSetupMultipleTransports(t *testing.T) { func TestServerGetSetParameter(t *testing.T) { for _, ca := range []string{"inside session", "outside session"} { t.Run(ca, func(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - + var stream *ServerStream var params []byte s := &Server{ @@ -723,6 +731,9 @@ func TestServerGetSetParameter(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -840,9 +851,7 @@ func TestServerErrorInvalidSession(t *testing.T) { } func TestServerSessionClose(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - + var stream *ServerStream var session *ServerSession connClosed := make(chan struct{}) @@ -872,6 +881,9 @@ func TestServerSessionClose(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() @@ -918,11 +930,9 @@ func TestServerSessionAutoClose(t *testing.T) { "200", "400", } { t.Run(ca, func(t *testing.T) { + var stream *ServerStream sessionClosed := make(chan struct{}) - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() - s := &Server{ Handler: &testServerHandler{ onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { @@ -952,6 +962,9 @@ func TestServerSessionAutoClose(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) conn := conn.NewConn(nconn) @@ -995,8 +1008,7 @@ func TestServerSessionAutoClose(t *testing.T) { } func TestServerSessionTeardown(t *testing.T) { - stream := NewServerStream(media.Medias{testH264Media}) - defer stream.Close() + var stream *ServerStream s := &Server{ Handler: &testServerHandler{ @@ -1018,6 +1030,9 @@ func TestServerSessionTeardown(t *testing.T) { require.NoError(t, err) defer s.Close() + stream = NewServerStream(s, media.Medias{testH264Media}) + defer stream.Close() + nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close()