diff --git a/server_publish_test.go b/server_publish_test.go index 301d6523..6483a6ba 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -1491,3 +1491,137 @@ func TestServerPublishWithoutTeardown(t *testing.T) { }) } } + +func TestServerPublishUDPChangeConn(t *testing.T) { + s := &Server{ + Handler: &testServerHandler{ + onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onGetParameter: func(ctx *ServerHandlerOnGetParameterCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onFrame: func(ctx *ServerHandlerOnFrameCtx) { + }, + }, + UDPRTPAddress: "127.0.0.1:8000", + UDPRTCPAddress: "127.0.0.1:8001", + } + + err := s.Start("127.0.0.1:8554") + require.NoError(t, err) + defer s.Close() + + sxID := "" + + func() { + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + tracks := Tracks{track} + for i, t := range tracks { + t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=" + strconv.FormatInt(int64(i), 10), + }) + } + + err = base.Request{ + Method: base.Announce, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: tracks.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) + + res, err := readResponse(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + inTH := &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModeRecord + return &v + }(), + Protocol: StreamProtocolUDP, + ClientPorts: &[2]int{35466, 35467}, + } + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Transport": inTH.Write(), + "Session": res.Header["Session"], + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + res, err = readResponse(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Record, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"3"}, + "Session": res.Header["Session"], + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + res, err = readResponse(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + sxID = res.Header["Session"][0] + }() + + func() { + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.GetParameter, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Session": base.HeaderValue{sxID}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + res, err := readResponse(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + }() +} diff --git a/server_read_test.go b/server_read_test.go index f66f58c2..771227ea 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -1129,3 +1129,105 @@ func TestServerReadWithoutTeardown(t *testing.T) { }) } } + +func TestServerReadUDPChangeConn(t *testing.T) { + s := &Server{ + Handler: &testServerHandler{ + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onFrame: func(ctx *ServerHandlerOnFrameCtx) { + }, + onGetParameter: func(ctx *ServerHandlerOnGetParameterCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + UDPRTPAddress: "127.0.0.1:8000", + UDPRTCPAddress: "127.0.0.1:8001", + } + + err := s.Start("127.0.0.1:8554") + require.NoError(t, err) + defer s.Close() + + sxID := "" + + func() { + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + inTH := &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + Protocol: StreamProtocolUDP, + ClientPorts: &[2]int{35466, 35467}, + } + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": inTH.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + res, err := readResponse(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Session": res.Header["Session"], + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + res, err = readResponse(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + sxID = res.Header["Session"][0] + }() + + func() { + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.GetParameter, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Session": base.HeaderValue{sxID}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + res, err := readResponse(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + }() +}