server: use relative control attributes (#620)

This commit is contained in:
Alessandro Ros
2024-09-15 22:51:55 +02:00
committed by GitHub
parent deb78c299e
commit 72e74f349e
12 changed files with 126 additions and 170 deletions

View File

@@ -71,7 +71,7 @@ func (cm *clientMedia) allocateUDPListeners(
} }
var err error var err error
cm.udpRTPListener, cm.udpRTCPListener, err = clientAllocateUDPListenerPair(cm.c) cm.udpRTPListener, cm.udpRTCPListener, err = allocateUDPListenerPair(cm.c)
return err return err
} }

View File

@@ -185,6 +185,10 @@ func TestClientRecordSerial(t *testing.T) {
err = desc.Unmarshal(req.Body) err = desc.Unmarshal(req.Body)
require.NoError(t, err2) require.NoError(t, err2)
var desc2 description.Session
err = desc2.Unmarshal(&desc)
require.NoError(t, err2)
err2 = conn.WriteResponse(&base.Response{ err2 = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
}) })
@@ -194,7 +198,7 @@ func TestClientRecordSerial(t *testing.T) {
require.NoError(t, err2) require.NoError(t, err2)
require.Equal(t, base.Setup, req.Method) require.Equal(t, base.Setup, req.Method)
require.Equal(t, mustParseURL( require.Equal(t, mustParseURL(
scheme+"://localhost:8554/teststream/"+relativeControlAttribute(desc.MediaDescriptions[0])), req.URL) scheme+"://localhost:8554/teststream/"+desc2.Medias[0].Control), req.URL)
var inTH headers.Transport var inTH headers.Transport
err2 = inTH.Unmarshal(req.Header["Transport"]) err2 = inTH.Unmarshal(req.Header["Transport"])

View File

@@ -24,7 +24,7 @@ func randInRange(max int) (int, error) {
return int(n.Int64()), nil return int(n.Int64()), nil
} }
func clientAllocateUDPListenerPair(c *Client) (*clientUDPListener, *clientUDPListener, error) { func allocateUDPListenerPair(c *Client) (*clientUDPListener, *clientUDPListener, error) {
// choose two consecutive ports in range 65535-10000 // choose two consecutive ports in range 65535-10000
// RTP port must be even and RTCP port odd // RTP port must be even and RTCP port odd
for { for {

View File

@@ -5,6 +5,8 @@ import (
) )
// PathSplitQuery splits a path from a query. // PathSplitQuery splits a path from a query.
//
// Deprecated: not useful anymore.
func PathSplitQuery(pathAndQuery string) (string, string) { func PathSplitQuery(pathAndQuery string) (string, string) {
i := strings.Index(pathAndQuery, "?") i := strings.Index(pathAndQuery, "?")
if i >= 0 { if i >= 0 {

View File

@@ -75,6 +75,8 @@ func (u *URL) CloneWithoutCredentials() *URL {
} }
// RTSPPathAndQuery returns the path and query of a RTSP URL. // RTSPPathAndQuery returns the path and query of a RTSP URL.
//
// Deprecated: not useful anymore.
func (u *URL) RTSPPathAndQuery() (string, bool) { func (u *URL) RTSPPathAndQuery() (string, bool) {
var pathAndQuery string var pathAndQuery string
if u.RawPath != "" { if u.RawPath != "" {

View File

@@ -24,7 +24,7 @@ func getSessionID(header base.Header) string {
return "" return ""
} }
func serverSideDescription(d *description.Session, contentBase *base.URL) *description.Session { func serverSideDescription(d *description.Session) *description.Session {
out := &description.Session{ out := &description.Session{
Title: d.Title, Title: d.Title,
FECGroups: d.FECGroups, FECGroups: d.FECGroups,
@@ -32,7 +32,7 @@ func serverSideDescription(d *description.Session, contentBase *base.URL) *descr
} }
for i, medi := range d.Medias { for i, medi := range d.Medias {
mc := &description.Media{ out.Medias[i] = &description.Media{
Type: medi.Type, Type: medi.Type,
ID: medi.ID, ID: medi.ID,
IsBackChannel: medi.IsBackChannel, IsBackChannel: medi.IsBackChannel,
@@ -41,15 +41,6 @@ func serverSideDescription(d *description.Session, contentBase *base.URL) *descr
Control: "trackID=" + strconv.FormatInt(int64(i), 10), Control: "trackID=" + strconv.FormatInt(int64(i), 10),
Formats: medi.Formats, Formats: medi.Formats,
} }
// always use the absolute URL of the track as control attribute, in order
// to fix compatibility between GStreamer and URLs with queries.
// (when a relative control is used, GStreamer puts it between path and query,
// instead of appending it to the URL).
u, _ := mc.URL(contentBase)
mc.Control = u.String()
out.Medias[i] = mc
} }
return out return out
@@ -215,16 +206,10 @@ func (sc *ServerConn) handleRequestInner(req *base.Request) (*base.Response, err
var path string var path string
var query string var query string
switch req.Method { switch req.Method {
case base.Describe, base.GetParameter, base.SetParameter: case base.Describe, base.GetParameter, base.SetParameter:
pathAndQuery, ok := req.URL.RTSPPathAndQuery() path, query = getPathAndQuery(req.URL, false)
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerInvalidPath{}
}
path, query = base.PathSplitQuery(pathAndQuery)
} }
switch req.Method { switch req.Method {
@@ -295,7 +280,7 @@ func (sc *ServerConn) handleRequestInner(req *base.Request) (*base.Response, err
} }
if stream != nil { if stream != nil {
byts, _ := serverSideDescription(stream.desc, req.URL).Marshal(multicast) byts, _ := serverSideDescription(stream.desc).Marshal(multicast)
res.Body = byts res.Body = byts
} }
} }

View File

@@ -22,7 +22,7 @@ func (h *serverMulticastWriter) initialize() error {
return err return err
} }
rtpl, rtcpl, err := serverAllocateUDPListenerMulticastPair( rtpl, rtcpl, err := allocateUDPListenerMulticastPair(
h.s.ListenPacket, h.s.ListenPacket,
h.s.WriteTimeout, h.s.WriteTimeout,
h.s.MulticastRTPPort, h.s.MulticastRTPPort,

View File

@@ -13,7 +13,6 @@ import (
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/rtp" "github.com/pion/rtp"
psdp "github.com/pion/sdp/v3"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
@@ -63,18 +62,13 @@ func multicastCapableIP(t *testing.T) string {
return "" return ""
} }
func relativeControlAttribute(md *psdp.MediaDescription) string { func mediaURL(t *testing.T, baseURL *base.URL, media *description.Media) *base.URL {
v, _ := md.Attribute("control") u, err := media.URL(baseURL)
i := strings.Index(v, "trackID=") require.NoError(t, err)
return v[i:] return u
} }
func absoluteControlAttribute(md *psdp.MediaDescription) string { func doDescribe(t *testing.T, conn *conn.Conn) *description.Session {
v, _ := md.Attribute("control")
return v
}
func doDescribe(t *testing.T, conn *conn.Conn) *sdp.SessionDescription {
res, err := writeReqReadRes(conn, base.Request{ res, err := writeReqReadRes(conn, base.Request{
Method: base.Describe, Method: base.Describe,
URL: mustParseURL("rtsp://localhost:8554/teststream?param=value"), URL: mustParseURL("rtsp://localhost:8554/teststream?param=value"),
@@ -88,7 +82,14 @@ func doDescribe(t *testing.T, conn *conn.Conn) *sdp.SessionDescription {
var desc sdp.SessionDescription var desc sdp.SessionDescription
err = desc.Unmarshal(res.Body) err = desc.Unmarshal(res.Body)
require.NoError(t, err) require.NoError(t, err)
return &desc
var desc2 description.Session
err = desc2.Unmarshal(&desc)
require.NoError(t, err)
desc2.BaseURL = mustParseURL(res.Header["Content-Base"][0])
return &desc2
} }
func doSetup(t *testing.T, conn *conn.Conn, u string, func doSetup(t *testing.T, conn *conn.Conn, u string,
@@ -179,11 +180,17 @@ func TestServerPlayPath(t *testing.T) {
"/teststream", "/teststream",
}, },
{ {
"with query", "with query, ffmpeg format",
"rtsp://localhost:8554/teststream?testing=123[control]", "rtsp://localhost:8554/teststream?testing=123[control]",
"rtsp://localhost:8554/teststream/", "rtsp://localhost:8554/teststream/",
"/teststream", "/teststream",
}, },
{
"with query, gstreamer format",
"rtsp://localhost:8554/teststream[control]?testing=123",
"rtsp://localhost:8554/teststream/",
"/teststream",
},
{ {
// this is needed to support reading mpegts with ffmpeg // this is needed to support reading mpegts with ffmpeg
"without media id", "without media id",
@@ -204,15 +211,21 @@ func TestServerPlayPath(t *testing.T) {
"/test/stream", "/test/stream",
}, },
{ {
"subpath with query", "subpath with query, ffmpeg format",
"rtsp://localhost:8554/test/stream?testing=123[control]", "rtsp://localhost:8554/test/stream?testing=123[control]",
"rtsp://localhost:8554/test/stream", "rtsp://localhost:8554/test/stream",
"/test/stream", "/test/stream",
}, },
{
"subpath with query, gstreamer format",
"rtsp://localhost:8554/test/stream[control]?testing=123",
"rtsp://localhost:8554/test/stream",
"/test/stream",
},
{ {
"no slash", "no slash",
"rtsp://localhost:8554[control]", "rtsp://localhost:8554[control]",
"rtsp://localhost:8554/", "rtsp://localhost:8554",
"", "",
}, },
{ {
@@ -279,8 +292,7 @@ func TestServerPlayPath(t *testing.T) {
} }
res, _ := doSetup(t, conn, res, _ := doSetup(t, conn,
strings.ReplaceAll(ca.setupURL, "[control]", "/"+ strings.ReplaceAll(ca.setupURL, "[control]", "/"+desc.Medias[1].Control),
relativeControlAttribute(desc.MediaDescriptions[1])),
th, "") th, "")
session := readSession(t, res) session := readSession(t, res)
@@ -356,7 +368,7 @@ func TestServerPlaySetupErrors(t *testing.T) {
res, err := writeReqReadRes(conn, base.Request{ res, err := writeReqReadRes(conn, base.Request{
Method: base.Setup, Method: base.Setup,
URL: mustParseURL(absoluteControlAttribute(desc.MediaDescriptions[0])), URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"2"}, "CSeq": base.HeaderValue{"2"},
"Transport": th.Marshal(), "Transport": th.Marshal(),
@@ -374,7 +386,7 @@ func TestServerPlaySetupErrors(t *testing.T) {
res, err = writeReqReadRes(conn, base.Request{ res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup, Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/test12stream/" + relativeControlAttribute(desc.MediaDescriptions[0])), URL: mustParseURL("rtsp://localhost:8554/test12stream/" + desc.Medias[0].Control),
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"3"}, "CSeq": base.HeaderValue{"3"},
"Transport": th.Marshal(), "Transport": th.Marshal(),
@@ -394,7 +406,7 @@ func TestServerPlaySetupErrors(t *testing.T) {
res, err = writeReqReadRes(conn, base.Request{ res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup, Method: base.Setup,
URL: mustParseURL(absoluteControlAttribute(desc.MediaDescriptions[0])), URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"4"}, "CSeq": base.HeaderValue{"4"},
"Transport": th.Marshal(), "Transport": th.Marshal(),
@@ -473,7 +485,7 @@ func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) {
res, err := writeReqReadRes(conn, base.Request{ res, err := writeReqReadRes(conn, base.Request{
Method: base.Setup, Method: base.Setup,
URL: mustParseURL(absoluteControlAttribute(desc.MediaDescriptions[0])), URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"2"}, "CSeq": base.HeaderValue{"2"},
"Transport": inTH.Marshal(), "Transport": inTH.Marshal(),
@@ -654,7 +666,7 @@ func TestServerPlay(t *testing.T) {
inTH.InterleavedIDs = &[2]int{5, 6} // odd value inTH.InterleavedIDs = &[2]int{5, 6} // odd value
} }
res, th := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
var l1 net.PacketConn var l1 net.PacketConn
var l2 net.PacketConn var l2 net.PacketConn
@@ -916,7 +928,7 @@ func TestServerPlayDecodeErrors(t *testing.T) {
inTH.InterleavedIDs = &[2]int{0, 1} inTH.InterleavedIDs = &[2]int{0, 1}
} }
res, resTH := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, resTH := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
var l1 net.PacketConn var l1 net.PacketConn
var l2 net.PacketConn var l2 net.PacketConn
@@ -1034,7 +1046,7 @@ func TestServerPlayRTCPReport(t *testing.T) {
inTH.InterleavedIDs = &[2]int{0, 1} inTH.InterleavedIDs = &[2]int{0, 1}
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
var l1 net.PacketConn var l1 net.PacketConn
var l2 net.PacketConn var l2 net.PacketConn
@@ -1228,7 +1240,7 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -1285,7 +1297,7 @@ func TestServerPlayPlayPlay(t *testing.T) {
ClientPorts: &[2]int{30450, 30451}, ClientPorts: &[2]int{30450, 30451},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -1370,7 +1382,7 @@ func TestServerPlayPlayPausePlay(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -1452,7 +1464,7 @@ func TestServerPlayPlayPausePause(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -1542,7 +1554,7 @@ func TestServerPlayTimeout(t *testing.T) {
inTH.Protocol = headers.TransportProtocolUDP inTH.Protocol = headers.TransportProtocolUDP
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -1624,7 +1636,7 @@ func TestServerPlayWithoutTeardown(t *testing.T) {
inTH.InterleavedIDs = &[2]int{0, 1} inTH.InterleavedIDs = &[2]int{0, 1}
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -1693,7 +1705,7 @@ func TestServerPlayUDPChangeConn(t *testing.T) {
ClientPorts: &[2]int{35466, 35467}, ClientPorts: &[2]int{35466, 35467},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -1774,7 +1786,7 @@ func TestServerPlayPartialMedias(t *testing.T) {
InterleavedIDs: &[2]int{4, 5}, InterleavedIDs: &[2]int{4, 5},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -1802,7 +1814,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, th := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
ssrcs := make([]*uint32, 2) ssrcs := make([]*uint32, 2)
ssrcs[0] = th.SSRC ssrcs[0] = th.SSRC
@@ -1816,7 +1828,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
session := readSession(t, res) session := readSession(t, res)
_, th = doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[1]), inTH, session) _, th = doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[1]).String(), inTH, session)
ssrcs[1] = th.SSRC ssrcs[1] = th.SSRC
@@ -2016,13 +2028,13 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) {
Protocol: headers.TransportProtocolTCP, Protocol: headers.TransportProtocolTCP,
} }
res, th := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
require.Equal(t, &[2]int{0, 1}, th.InterleavedIDs) require.Equal(t, &[2]int{0, 1}, th.InterleavedIDs)
session := readSession(t, res) session := readSession(t, res)
_, th = doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[1]), inTH, session) _, th = doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[1]).String(), inTH, session)
require.Equal(t, &[2]int{2, 3}, th.InterleavedIDs) require.Equal(t, &[2]int{2, 3}, th.InterleavedIDs)
@@ -2097,7 +2109,7 @@ func TestServerPlayBytesSent(t *testing.T) {
inTH.InterleavedIDs = &[2]int{0, 1} inTH.InterleavedIDs = &[2]int{0, 1}
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)

View File

@@ -48,38 +48,6 @@ func doRecord(t *testing.T, conn *conn.Conn, u string, session string) {
require.Equal(t, base.StatusOK, res.StatusCode) require.Equal(t, base.StatusOK, res.StatusCode)
} }
func invalidURLAnnounceReq(t *testing.T, control string) base.Request {
medi := testH264Media
medi.Control = control
sout := &sdp.SessionDescription{
SessionName: psdp.SessionName("Stream"),
Origin: psdp.Origin{
Username: "-",
NetworkType: "IN",
AddressType: "IP4",
UnicastAddress: "127.0.0.1",
},
TimeDescriptions: []psdp.TimeDescription{
{Timing: psdp.Timing{}},
},
MediaDescriptions: []*psdp.MediaDescription{medi.Marshal()},
}
byts, err := sout.Marshal()
require.NoError(t, err)
return base.Request{
Method: base.Announce,
URL: mustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: byts,
}
}
func TestServerRecordErrorAnnounce(t *testing.T) { func TestServerRecordErrorAnnounce(t *testing.T) {
for _, ca := range []struct { for _, ca := range []struct {
name string name string
@@ -145,16 +113,6 @@ func TestServerRecordErrorAnnounce(t *testing.T) {
}, },
"invalid SDP: media 1 is invalid: clock rate not found", "invalid SDP: media 1 is invalid: clock rate not found",
}, },
{
"invalid URL 1",
invalidURLAnnounceReq(t, "rtsp:// aaaaa"),
"unable to generate media URL",
},
{
"invalid URL 3",
invalidURLAnnounceReq(t, "rtsp://host/otherpath"),
"invalid media path: must begin with '/teststream', but is '/otherpath'",
},
} { } {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
nconnClosed := make(chan struct{}) nconnClosed := make(chan struct{})

View File

@@ -34,28 +34,53 @@ func stringsReverseIndex(s, substr string) int {
return -1 return -1
} }
func serverParseURLForPlay(u *base.URL) (string, string, string, error) { // used for all methods except SETUP
pathAndQuery, ok := u.RTSPPathAndQuery() func getPathAndQuery(u *base.URL, isAnnounce bool) (string, string) {
if !ok { if !isAnnounce {
return "", "", "", liberrors.ErrServerInvalidPath{} // FFmpeg format
} if strings.HasSuffix(u.RawQuery, "/") {
return u.Path, u.RawQuery[:len(u.RawQuery)-1]
i := stringsReverseIndex(pathAndQuery, "/trackID=")
if i < 0 {
if !strings.HasSuffix(pathAndQuery, "/") {
return "", "", "", liberrors.ErrServerPathNoSlash{}
} }
path, query := base.PathSplitQuery(pathAndQuery[:len(pathAndQuery)-1]) // GStreamer format
return path, query, "", nil if len(u.Path) > 1 && strings.HasSuffix(u.Path, "/") {
return u.Path[:len(u.Path)-1], u.RawQuery
}
} }
var trackID string return u.Path, u.RawQuery
pathAndQuery, trackID = pathAndQuery[:i], pathAndQuery[i+len("/trackID="):]
path, query := base.PathSplitQuery(pathAndQuery)
return path, query, trackID, nil
} }
// used for SETUP when playing
func getPathAndQueryAndTrackID(u *base.URL) (string, string, string, error) {
// FFmpeg format
i := stringsReverseIndex(u.RawQuery, "/trackID=")
if i >= 0 {
path := u.Path
query := u.RawQuery[:i]
trackID := u.RawQuery[i+len("/trackID="):]
return path, query, trackID, nil
}
// GStreamer format
i = stringsReverseIndex(u.Path, "/trackID=")
if i >= 0 {
path := u.Path[:i]
query := u.RawQuery
trackID := u.Path[i+len("/trackID="):]
return path, query, trackID, nil
}
// no track ID and a trailing slash.
// this happens when trying to read a MPEG-TS stream with FFmpeg.
if strings.HasSuffix(u.Path[1:], "/") {
return u.Path[:len(u.Path)-1], u.RawQuery, "0", nil
}
return "", "", "", liberrors.ErrServerPathNoSlash{}
}
// used for SETUP when recording
func findMediaByURL( func findMediaByURL(
medias []*description.Media, medias []*description.Media,
path string, path string,
@@ -520,21 +545,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
var path string var path string
var query string var query string
switch req.Method { switch req.Method {
case base.Announce, base.Play, base.Record, base.Pause, base.GetParameter, base.SetParameter: case base.Announce:
pathAndQuery, ok := req.URL.RTSPPathAndQuery() path, query = getPathAndQuery(req.URL, true)
if !ok { case base.Pause, base.GetParameter, base.SetParameter, base.Play, base.Record:
return &base.Response{ path, query = getPathAndQuery(req.URL, false)
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerInvalidPath{}
}
// pathAndQuery can end with a slash due to Content-Base, remove it
if ss.state == ServerSessionStatePrePlay || ss.state == ServerSessionStatePlay {
pathAndQuery = strings.TrimSuffix(pathAndQuery, "/")
}
path, query = base.PathSplitQuery(pathAndQuery)
} }
switch req.Method { switch req.Method {
@@ -610,30 +626,6 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}, liberrors.ErrServerSDPInvalid{Err: err} }, liberrors.ErrServerSDPInvalid{Err: err}
} }
for _, medi := range desc.Medias {
var mediURL *base.URL
mediURL, err = medi.URL(req.URL)
if err != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to generate media URL")
}
mediPath, ok := mediURL.RTSPPathAndQuery()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid media URL (%v)", mediURL)
}
if !strings.HasPrefix(mediPath, path) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid media path: must begin with '%s', but is '%s'",
path, mediPath)
}
}
res, err := ss.s.Handler.(ServerHandlerOnAnnounce).OnAnnounce(&ServerHandlerOnAnnounceCtx{ res, err := ss.s.Handler.(ServerHandlerOnAnnounce).OnAnnounce(&ServerHandlerOnAnnounceCtx{
Session: ss, Session: ss,
Conn: sc, Conn: sc,
@@ -682,9 +674,10 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
} }
var trackID string var trackID string
switch ss.state { switch ss.state {
case ServerSessionStateInitial, ServerSessionStatePrePlay: // play case ServerSessionStateInitial, ServerSessionStatePrePlay: // play
path, query, trackID, err = serverParseURLForPlay(req.URL) path, query, trackID, err = getPathAndQueryAndTrackID(req.URL)
if err != nil { if err != nil {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,

View File

@@ -408,7 +408,7 @@ func TestServerErrorMethodNotImplemented(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session = readSession(t, res) session = readSession(t, res)
} }
@@ -422,7 +422,7 @@ func TestServerErrorMethodNotImplemented(t *testing.T) {
res, err := writeReqReadRes(conn, base.Request{ res, err := writeReqReadRes(conn, base.Request{
Method: base.SetParameter, Method: base.SetParameter,
URL: mustParseURL(absoluteControlAttribute(desc.MediaDescriptions[0])), URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: headers, Header: headers,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -496,7 +496,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, _ := doSetup(t, conn1, absoluteControlAttribute(desc1.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn1, mediaURL(t, desc1.BaseURL, desc1.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -511,7 +511,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) {
res, err = writeReqReadRes(conn2, base.Request{ res, err = writeReqReadRes(conn2, base.Request{
Method: base.Setup, Method: base.Setup,
URL: mustParseURL(absoluteControlAttribute(desc2.MediaDescriptions[0])), URL: mediaURL(t, desc2.BaseURL, desc2.Medias[0]),
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"1"}, "CSeq": base.HeaderValue{"1"},
"Transport": headers.Transport{ "Transport": headers.Transport{
@@ -577,7 +577,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)
@@ -585,7 +585,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) {
res, err = writeReqReadRes(conn, base.Request{ res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup, Method: base.Setup,
URL: mustParseURL(absoluteControlAttribute(desc.MediaDescriptions[0])), URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"3"}, "CSeq": base.HeaderValue{"3"},
"Transport": headers.Transport{ "Transport": headers.Transport{
@@ -650,7 +650,7 @@ func TestServerSetupMultipleTransports(t *testing.T) {
res, err := writeReqReadRes(conn, base.Request{ res, err := writeReqReadRes(conn, base.Request{
Method: base.Setup, Method: base.Setup,
URL: mustParseURL(absoluteControlAttribute(desc.MediaDescriptions[0])), URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"1"}, "CSeq": base.HeaderValue{"1"},
"Transport": inTHS.Marshal(), "Transport": inTHS.Marshal(),
@@ -739,7 +739,7 @@ func TestServerGetSetParameter(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session = readSession(t, res) session = readSession(t, res)
} }
@@ -880,7 +880,7 @@ func TestServerSessionClose(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session.Close() session.Close()
session.Close() session.Close()
@@ -956,7 +956,7 @@ func TestServerSessionAutoClose(t *testing.T) {
res, err := writeReqReadRes(conn, base.Request{ res, err := writeReqReadRes(conn, base.Request{
Method: base.Setup, Method: base.Setup,
URL: mustParseURL(absoluteControlAttribute(desc.MediaDescriptions[0])), URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"1"}, "CSeq": base.HeaderValue{"1"},
"Transport": inTH.Marshal(), "Transport": inTH.Marshal(),
@@ -1017,7 +1017,7 @@ func TestServerSessionTeardown(t *testing.T) {
InterleavedIDs: &[2]int{0, 1}, InterleavedIDs: &[2]int{0, 1},
} }
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res) session := readSession(t, res)

View File

@@ -25,7 +25,7 @@ func (p *clientAddr) fill(ip net.IP, port int) {
} }
} }
func serverAllocateUDPListenerMulticastPair( func allocateUDPListenerMulticastPair(
listenPacket func(network, address string) (net.PacketConn, error), listenPacket func(network, address string) (net.PacketConn, error),
writeTimeout time.Duration, writeTimeout time.Duration,
multicastRTPPort int, multicastRTPPort int,