server: fill SSRC in SETUP responses and RTP-Info in PLAY responses automatically

This commit is contained in:
aler9
2021-06-17 15:39:18 +02:00
committed by Alessandro Ros
parent 3f3226b53d
commit db28e87ecb
9 changed files with 383 additions and 89 deletions

View File

@@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"golang.org/x/net/ipv4"
@@ -64,16 +65,16 @@ func TestServerReadSetupPath(t *testing.T) {
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
stream := NewServerStream(Tracks{track})
stream := NewServerStream(Tracks{track, track, track, track, track})
s := &Server{
Handler: &testServerHandler{
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
require.Equal(t, ca.path, ctx.Path)
require.Equal(t, ca.trackID, ctx.TrackID)
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
},
}
@@ -128,10 +129,10 @@ func TestServerReadErrorSetupDifferentPaths(t *testing.T) {
require.Equal(t, "can't setup tracks with different paths", ctx.Error.Error())
close(connClosed)
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
},
}
@@ -200,10 +201,10 @@ func TestServerReadErrorSetupTrackTwice(t *testing.T) {
require.Equal(t, "track 0 has already been setup", ctx.Error.Error())
close(connClosed)
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
},
}
@@ -291,12 +292,10 @@ func TestServerRead(t *testing.T) {
onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) {
close(sessionClosed)
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
v := uint32(123456)
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, &v, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
@@ -391,7 +390,6 @@ func TestServerRead(t *testing.T) {
var th headers.Transport
err = th.Read(res.Header["Transport"])
require.NoError(t, err)
require.Equal(t, uint32(123456), *th.SSRC)
<-sessionOpened
@@ -566,10 +564,10 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) {
close(writerTerminate)
<-writerDone
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
@@ -655,10 +653,10 @@ func TestServerReadPlayPlay(t *testing.T) {
s := &Server{
Handler: &testServerHandler{
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
@@ -740,10 +738,10 @@ func TestServerReadPlayPausePlay(t *testing.T) {
close(writerTerminate)
<-writerDone
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
if !writerStarted {
@@ -857,10 +855,10 @@ func TestServerReadPlayPausePause(t *testing.T) {
close(writerTerminate)
<-writerDone
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
@@ -985,10 +983,10 @@ func TestServerReadTimeout(t *testing.T) {
StatusCode: base.StatusOK,
}, nil
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
@@ -1080,10 +1078,10 @@ func TestServerReadWithoutTeardown(t *testing.T) {
StatusCode: base.StatusOK,
}, nil
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
@@ -1166,10 +1164,10 @@ func TestServerReadUDPChangeConn(t *testing.T) {
s := &Server{
Handler: &testServerHandler{
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
@@ -1264,10 +1262,10 @@ func TestServerReadNonSetuppedPath(t *testing.T) {
s := &Server{
Handler: &testServerHandler{
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) {
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil, nil
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
@@ -1335,3 +1333,207 @@ func TestServerReadNonSetuppedPath(t *testing.T) {
require.Equal(t, StreamTypeRTP, f.StreamType)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload)
}
func TestServerReadAdditionalInfos(t *testing.T) {
getInfos := func() (*headers.RTPInfo, []*uint32) {
conn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
ssrcs := make([]*uint32, 2)
inTH := &headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
Protocol: base.StreamProtocolTCP,
InterleavedIDs: &[2]int{0, 1},
}
res, err := writeReqReadRes(bconn, base.Request{
Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Transport": inTH.Write(),
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var th headers.Transport
err = th.Read(res.Header["Transport"])
require.NoError(t, err)
ssrcs[0] = th.SSRC
inTH = &headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
Protocol: base.StreamProtocolTCP,
InterleavedIDs: &[2]int{2, 3},
}
res, err = writeReqReadRes(bconn, base.Request{
Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=1"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": inTH.Write(),
"Session": res.Header["Session"],
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
th = headers.Transport{}
err = th.Read(res.Header["Transport"])
require.NoError(t, err)
ssrcs[1] = th.SSRC
res, err = writeReqReadRes(bconn, base.Request{
Method: base.Play,
URL: mustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"3"},
"Session": res.Header["Session"],
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var ri headers.RTPInfo
err = ri.Read(res.Header["RTP-Info"])
require.NoError(t, err)
return &ri, ssrcs
}
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
stream := NewServerStream(Tracks{track, track})
s := &Server{
Handler: &testServerHandler{
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
time.Sleep(1 * time.Second)
stream.WriteFrame(1, base.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04})
stream.WriteFrame(0, base.StreamTypeRTP, []byte{0x05, 0x06, 0x07, 0x08})
}()
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
}
err = s.Start("localhost:8554")
require.NoError(t, err)
defer s.Close()
buf, err := (&rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 556,
Timestamp: 984512368,
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}).Marshal()
require.NoError(t, err)
stream.WriteFrame(0, StreamTypeRTP, buf)
rtpInfo, ssrcs := getInfos()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "localhost:8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*rtpInfo)[0].Timestamp,
},
}, rtpInfo)
require.Equal(t, []*uint32{
func() *uint32 {
v := uint32(96342362)
return &v
}(),
nil,
}, ssrcs)
buf, err = (&rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 87,
Timestamp: 756436454,
SSRC: 536474323,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}).Marshal()
require.NoError(t, err)
stream.WriteFrame(1, StreamTypeRTP, buf)
rtpInfo, ssrcs = getInfos()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "localhost:8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*rtpInfo)[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "localhost:8554",
Path: "/teststream/trackID=1",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(87)
return &v
}(),
Timestamp: (*rtpInfo)[1].Timestamp,
},
}, rtpInfo)
require.Equal(t, []*uint32{
func() *uint32 {
v := uint32(96342362)
return &v
}(),
func() *uint32 {
v := uint32(536474323)
return &v
}(),
}, ssrcs)
}