server: do not allow a reader to use the same UDP ports of another reader

This commit is contained in:
aler9
2021-09-23 19:16:01 +02:00
parent 7ac0d79ae9
commit 5ef9076357
4 changed files with 153 additions and 6 deletions

View File

@@ -265,3 +265,14 @@ type ErrServerCannotSetupFromDifferentIPs struct{}
func (e ErrServerCannotSetupFromDifferentIPs) Error() string {
return "cannot setup tracks from different IPs"
}
// ErrServerUDPPortsAlreadyInUse is an error that can be returned by a server.
type ErrServerUDPPortsAlreadyInUse struct {
Port int
}
// Error implements the error interface.
func (e ErrServerUDPPortsAlreadyInUse) Error() string {
return fmt.Sprintf("UDP ports %d and %d are already in use by another reader",
e.Port, e.Port+1)
}

View File

@@ -1294,6 +1294,109 @@ func TestServerReadUDPChangeConn(t *testing.T) {
}()
}
func TestServerReadErrorUDPSamePorts(t *testing.T) {
track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}})
require.NoError(t, err)
stream := NewServerStream(Tracks{track})
s := &Server{
Handler: &testServerHandler{
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onFrame: func(ctx *ServerHandlerOnFrameCtx) {
},
},
UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001",
}
err = s.Start("localhost:8554")
require.NoError(t, err)
defer s.Close()
func() {
conn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
inTH := &headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
Protocol: base.StreamProtocolUDP,
ClientPorts: &[2]int{35466, 35467},
}
res, err := writeReqReadRes(bconn, base.Request{
Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Transport": inTH.Write(),
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
res, err = writeReqReadRes(bconn, base.Request{
Method: base.Play,
URL: mustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Session": res.Header["Session"],
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}()
func() {
conn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
inTH := &headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
Protocol: base.StreamProtocolUDP,
ClientPorts: &[2]int{35466, 35467},
}
res, err := writeReqReadRes(bconn, base.Request{
Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Transport": inTH.Write(),
},
})
require.NoError(t, err)
require.Equal(t, base.StatusBadRequest, res.StatusCode)
}()
}
func TestServerReadNonSetuppedPath(t *testing.T) {
track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}})
require.NoError(t, err)

View File

@@ -594,7 +594,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerCannotSetupFromDifferentIPs{}
}
} else if ss.s.MulticastIPRange == "" {
return &base.Response{
StatusCode: base.StatusUnsupportedTransport,
@@ -647,7 +646,13 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
if res.StatusCode == base.StatusOK {
if ss.state == ServerSessionStateInitial {
err := stream.readerAdd(ss, delivery == base.StreamDeliveryMulticast)
err := stream.readerAdd(ss,
inTH.Protocol,
delivery,
sc.ip(),
sc.zone(),
inTH.ClientPorts,
)
if err != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/liberrors"
)
type listenerPair struct {
@@ -111,12 +112,17 @@ func (st *ServerStream) lastSequenceNumber(trackID int) uint16 {
return uint16(atomic.LoadUint32(&st.trackInfos[trackID].lastSequenceNumber))
}
func (st *ServerStream) readerAdd(ss *ServerSession, isMulticast bool) error {
func (st *ServerStream) readerAdd(
ss *ServerSession,
protocol base.StreamProtocol,
delivery base.StreamDelivery,
ip net.IP,
zone string,
clientPorts *[2]int,
) error {
st.mutex.Lock()
defer st.mutex.Unlock()
st.readers[ss] = struct{}{}
if st.s == nil {
st.s = ss.s
select {
@@ -125,7 +131,27 @@ func (st *ServerStream) readerAdd(ss *ServerSession, isMulticast bool) error {
}
}
if isMulticast && st.multicastListeners == nil {
// if new reader is a UDP-unicast reader, check that its port are not already
// in use by another reader.
if protocol == base.StreamProtocolUDP && delivery == base.StreamDeliveryUnicast {
for r := range st.readersUnicast {
if *r.setuppedProtocol == base.StreamProtocolUDP &&
*r.setuppedDelivery == base.StreamDeliveryUnicast &&
r.udpIP.Equal(ip) &&
r.udpZone == zone {
for _, rt := range r.setuppedTracks {
if rt.udpRTPPort == clientPorts[0] {
return liberrors.ErrServerUDPPortsAlreadyInUse{Port: rt.udpRTPPort}
}
}
}
}
}
// allocate multicast listeners
if protocol == base.StreamProtocolUDP &&
delivery == base.StreamDeliveryMulticast &&
st.multicastListeners == nil {
st.multicastListeners = make([]*listenerPair, len(st.tracks))
for i := range st.tracks {
@@ -148,6 +174,8 @@ func (st *ServerStream) readerAdd(ss *ServerSession, isMulticast bool) error {
}
}
st.readers[ss] = struct{}{}
return nil
}