server: fix crash when calling RECORD and PAUSE (#392)

This commit is contained in:
Alessandro Ros
2023-08-31 22:42:17 +02:00
committed by GitHub
parent fc0c242789
commit efc7783650
3 changed files with 88 additions and 15 deletions

View File

@@ -1541,3 +1541,75 @@ func TestServerRecordPacketNTP(t *testing.T) {
<-recv <-recv
} }
func TestServerRecordPausePause(t *testing.T) {
s := &Server{
Handler: &testServerHandler{
onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil, nil
},
onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onPause: func(ctx *ServerHandlerOnPauseCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
medias := []*description.Media{{
Type: description.MediaTypeApplication,
Formats: []format.Format{&format.Generic{
PayloadTyp: 97,
RTPMa: "private/90000",
}},
}}
doAnnounce(t, conn, "rtsp://localhost:8554/teststream", medias)
inTH := &headers.Transport{
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModeRecord
return &v
}(),
Protocol: headers.TransportProtocolUDP,
ClientPorts: &[2]int{35466, 35467},
}
res, _ := doSetup(t, conn, "rtsp://localhost:8554/teststream/"+medias[0].Control, inTH, "")
session := readSession(t, res)
doRecord(t, conn, "rtsp://localhost:8554/teststream", session)
doPause(t, conn, "rtsp://localhost:8554/teststream", session)
doPause(t, conn, "rtsp://localhost:8554/teststream", session)
}

View File

@@ -336,17 +336,6 @@ func (ss *ServerSession) run() {
ss.ctxCancel() ss.ctxCancel()
if ss.setuppedStream != nil {
ss.setuppedStream.readerSetInactive(ss)
ss.setuppedStream.readerRemove(ss)
}
for _, sm := range ss.setuppedMedias {
sm.stop()
}
ss.writer.stop()
// close all associated connections, both UDP and TCP // close all associated connections, both UDP and TCP
// except for the ones that called TEARDOWN // except for the ones that called TEARDOWN
// (that are detached from the session just after the request) // (that are detached from the session just after the request)
@@ -359,6 +348,17 @@ func (ss *ServerSession) run() {
sc.removeSession(ss) sc.removeSession(ss)
} }
if ss.setuppedStream != nil {
ss.setuppedStream.readerSetInactive(ss)
ss.setuppedStream.readerRemove(ss)
}
ss.writer.stop()
for _, sm := range ss.setuppedMedias {
sm.stop()
}
ss.s.closeSession(ss) ss.s.closeSession(ss)
if h, ok := ss.s.Handler.(ServerHandlerOnSessionClose); ok { if h, ok := ss.s.Handler.(ServerHandlerOnSessionClose); ok {
@@ -929,8 +929,6 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
sm.start() sm.start()
} }
ss.setuppedStream.readerSetActive(ss)
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case TransportUDP: case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
@@ -945,6 +943,8 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
// writer.start() is called by ServerConn after the response has been sent // writer.start() is called by ServerConn after the response has been sent
} }
ss.setuppedStream.readerSetActive(ss)
rtpInfo, ok := generateRTPInfo( rtpInfo, ok := generateRTPInfo(
ss.s.timeNow(), ss.s.timeNow(),
ss.setuppedMediasOrdered, ss.setuppedMediasOrdered,
@@ -1053,12 +1053,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
return res, err return res, err
} }
ss.writer.stop()
if ss.setuppedStream != nil { if ss.setuppedStream != nil {
ss.setuppedStream.readerSetInactive(ss) ss.setuppedStream.readerSetInactive(ss)
} }
ss.writer.stop()
for _, sm := range ss.setuppedMedias { for _, sm := range ss.setuppedMedias {
sm.stop() sm.stop()
} }

View File

@@ -58,6 +58,7 @@ func (sf *serverSessionFormat) start() {
func (sf *serverSessionFormat) stop() { func (sf *serverSessionFormat) stop() {
if sf.rtcpReceiver != nil { if sf.rtcpReceiver != nil {
sf.rtcpReceiver.Close() sf.rtcpReceiver.Close()
sf.rtcpReceiver = nil
} }
} }