server: add WithoutTeardown tests

This commit is contained in:
aler9
2021-05-04 16:42:56 +02:00
committed by Alessandro Ros
parent e271c4e6ef
commit f74be9a72f
2 changed files with 240 additions and 2 deletions

View File

@@ -1000,7 +1000,7 @@ func TestServerPublishRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestServerPublishErrorTimeout(t *testing.T) { func TestServerPublishTimeout(t *testing.T) {
for _, proto := range []string{ for _, proto := range []string{
"udp", "udp",
"tcp", "tcp",
@@ -1137,3 +1137,139 @@ func TestServerPublishErrorTimeout(t *testing.T) {
}) })
} }
} }
func TestServerPublishWithoutTeardown(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
} {
t.Run(proto, func(t *testing.T) {
connClosed := make(chan struct{})
sessionClosed := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onConnClose: func(sc *ServerConn, err error) {
close(connClosed)
},
onSessionClose: func(ss *ServerSession, err error) {
close(sessionClosed)
},
onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
ReadTimeout: 1 * time.Second,
}
if proto == "udp" {
s.UDPRTPAddress = "127.0.0.1:8000"
s.UDPRTCPAddress = "127.0.0.1:8001"
}
err := s.Start("127.0.0.1:8554")
require.NoError(t, err)
defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
bconn := bufio.NewReadWriter(bufio.NewReader(nconn), bufio.NewWriter(nconn))
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
tracks := Tracks{track}
for i, t := range tracks {
t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{
Key: "control",
Value: "trackID=" + strconv.FormatInt(int64(i), 10),
})
}
err = base.Request{
Method: base.Announce,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: tracks.Write(),
}.Write(bconn.Writer)
require.NoError(t, err)
var res base.Response
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
inTH := &headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModeRecord
return &v
}(),
}
if proto == "udp" {
inTH.Protocol = StreamProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
} else {
inTH.Protocol = StreamProtocolTCP
inTH.InterleavedIDs = &[2]int{0, 1}
}
err = base.Request{
Method: base.Setup,
URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": inTH.Write(),
"Session": res.Header["Session"],
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var th headers.Transport
err = th.Read(res.Header["Transport"])
require.NoError(t, err)
err = base.Request{
Method: base.Record,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"3"},
"Session": res.Header["Session"],
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
nconn.Close()
<-sessionClosed
<-connClosed
})
}
}

View File

@@ -905,7 +905,7 @@ func TestServerReadPlayPausePause(t *testing.T) {
require.Equal(t, base.StatusOK, res.StatusCode) require.Equal(t, base.StatusOK, res.StatusCode)
} }
func TestServerReadErrorTimeout(t *testing.T) { func TestServerReadTimeout(t *testing.T) {
for _, proto := range []string{ for _, proto := range []string{
"udp", "udp",
// checking TCP is useless, since there's no timeout when reading with TCP // checking TCP is useless, since there's no timeout when reading with TCP
@@ -961,6 +961,105 @@ func TestServerReadErrorTimeout(t *testing.T) {
}(), }(),
} }
inTH.Protocol = StreamProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
err = base.Request{
Method: base.Setup,
URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Transport": inTH.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
var res base.Response
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
err = base.Request{
Method: base.Play,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Session": res.Header["Session"],
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
<-sessionClosed
})
}
}
func TestServerReadWithoutTeardown(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
} {
t.Run(proto, func(t *testing.T) {
connClosed := make(chan struct{})
sessionClosed := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onConnClose: func(sc *ServerConn, err error) {
close(connClosed)
},
onSessionClose: func(ss *ServerSession, err error) {
close(sessionClosed)
},
onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
ReadTimeout: 1 * time.Second,
closeSessionAfterNoRequestsFor: 1 * time.Second,
}
if proto == "udp" {
s.UDPRTPAddress = "127.0.0.1:8000"
s.UDPRTCPAddress = "127.0.0.1:8001"
}
err := s.Start("127.0.0.1:8554")
require.NoError(t, err)
defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(nconn), bufio.NewWriter(nconn))
inTH := &headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
}
if proto == "udp" { if proto == "udp" {
inTH.Protocol = StreamProtocolUDP inTH.Protocol = StreamProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467} inTH.ClientPorts = &[2]int{35466, 35467}
@@ -998,7 +1097,10 @@ func TestServerReadErrorTimeout(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode) require.Equal(t, base.StatusOK, res.StatusCode)
nconn.Close()
<-sessionClosed <-sessionClosed
<-connClosed
}) })
} }
} }