diff --git a/README.md b/README.md index 11c24f6c..3bbe2fab 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ Features: * Sessions and connections are independent * Read streams from clients with UDP, TCP or TLS/RTSPS * Write streams to clients with UDP, UDP-multicast, TCP or TLS/RTSPS + * Provide SSRC, RTP-Info automatically * Generate RTCP receiver reports automatically * Utilities * Encode and decode RTSP primitives, RTP/H264, RTP/AAC, SDP diff --git a/examples/server-tls/main.go b/examples/server-tls/main.go index ab741fd6..6e8b36b0 100644 --- a/examples/server-tls/main.go +++ b/examples/server-tls/main.go @@ -91,19 +91,19 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( } // called after receiving a SETUP request. -func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, *uint32, error) { +func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { log.Printf("setup request") // no one is publishing yet if sh.stream == nil { return &base.Response{ StatusCode: base.StatusNotFound, - }, nil, nil, nil + }, nil, nil } return &base.Response{ StatusCode: base.StatusOK, - }, sh.stream, nil, nil + }, sh.stream, nil } // called after receiving a PLAY request. diff --git a/examples/server/main.go b/examples/server/main.go index bbbad3cb..21920f2b 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -90,19 +90,19 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( } // called after receiving a SETUP request. -func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, *uint32, error) { +func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { log.Printf("setup request") // no one is publishing yet if sh.stream == nil { return &base.Response{ StatusCode: base.StatusNotFound, - }, nil, nil, nil + }, nil, nil } return &base.Response{ StatusCode: base.StatusOK, - }, sh.stream, nil, nil + }, sh.stream, nil } // called after receiving a PLAY request. diff --git a/server_publish_test.go b/server_publish_test.go index b77ea810..7a0dc610 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -301,12 +301,12 @@ func TestServerPublishSetupPath(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { require.Equal(t, ca.path, ctx.Path) require.Equal(t, ca.trackID, ctx.TrackID) return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, }, } @@ -398,10 +398,10 @@ func TestServerPublishErrorSetupDifferentPaths(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, }, } @@ -480,10 +480,10 @@ func TestServerPublishErrorSetupTrackTwice(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, }, } @@ -574,10 +574,10 @@ func TestServerPublishErrorRecordPartialTracks(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { return &base.Response{ @@ -694,10 +694,10 @@ func TestServerPublish(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { return &base.Response{ @@ -912,10 +912,10 @@ func TestServerPublishErrorWrongProtocol(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { return &base.Response{ @@ -1018,10 +1018,10 @@ func TestServerPublishRTCPReport(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { return &base.Response{ @@ -1174,10 +1174,10 @@ func TestServerPublishTimeout(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { return &base.Response{ @@ -1302,10 +1302,10 @@ func TestServerPublishWithoutTeardown(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { return &base.Response{ @@ -1414,10 +1414,10 @@ func TestServerPublishUDPChangeConn(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { return &base.Response{ diff --git a/server_read_test.go b/server_read_test.go index 53216dd6..a41a3942 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/pion/rtp" "github.com/stretchr/testify/require" "golang.org/x/net/ipv4" @@ -64,16 +65,16 @@ func TestServerReadSetupPath(t *testing.T) { track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) require.NoError(t, err) - stream := NewServerStream(Tracks{track}) + stream := NewServerStream(Tracks{track, track, track, track, track}) s := &Server{ Handler: &testServerHandler{ - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { require.Equal(t, ca.path, ctx.Path) require.Equal(t, ca.trackID, ctx.TrackID) return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, }, } @@ -128,10 +129,10 @@ func TestServerReadErrorSetupDifferentPaths(t *testing.T) { require.Equal(t, "can't setup tracks with different paths", ctx.Error.Error()) close(connClosed) }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, }, } @@ -200,10 +201,10 @@ func TestServerReadErrorSetupTrackTwice(t *testing.T) { require.Equal(t, "track 0 has already been setup", ctx.Error.Error()) close(connClosed) }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, }, } @@ -291,12 +292,10 @@ func TestServerRead(t *testing.T) { onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { close(sessionClosed) }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { - v := uint32(123456) - + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, &v, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { @@ -391,7 +390,6 @@ func TestServerRead(t *testing.T) { var th headers.Transport err = th.Read(res.Header["Transport"]) require.NoError(t, err) - require.Equal(t, uint32(123456), *th.SSRC) <-sessionOpened @@ -566,10 +564,10 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) { close(writerTerminate) <-writerDone }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { @@ -655,10 +653,10 @@ func TestServerReadPlayPlay(t *testing.T) { s := &Server{ Handler: &testServerHandler{ - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{ @@ -740,10 +738,10 @@ func TestServerReadPlayPausePlay(t *testing.T) { close(writerTerminate) <-writerDone }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { if !writerStarted { @@ -857,10 +855,10 @@ func TestServerReadPlayPausePause(t *testing.T) { close(writerTerminate) <-writerDone }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { @@ -985,10 +983,10 @@ func TestServerReadTimeout(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{ @@ -1080,10 +1078,10 @@ func TestServerReadWithoutTeardown(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{ @@ -1166,10 +1164,10 @@ func TestServerReadUDPChangeConn(t *testing.T) { s := &Server{ Handler: &testServerHandler{ - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{ @@ -1264,10 +1262,10 @@ func TestServerReadNonSetuppedPath(t *testing.T) { s := &Server{ Handler: &testServerHandler{ - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { @@ -1335,3 +1333,207 @@ func TestServerReadNonSetuppedPath(t *testing.T) { require.Equal(t, StreamTypeRTP, f.StreamType) require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) } + +func TestServerReadAdditionalInfos(t *testing.T) { + getInfos := func() (*headers.RTPInfo, []*uint32) { + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + ssrcs := make([]*uint32, 2) + + inTH := &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + Protocol: base.StreamProtocolTCP, + InterleavedIDs: &[2]int{0, 1}, + } + + res, err := writeReqReadRes(bconn, base.Request{ + Method: base.Setup, + URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": inTH.Write(), + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var th headers.Transport + err = th.Read(res.Header["Transport"]) + require.NoError(t, err) + ssrcs[0] = th.SSRC + + inTH = &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + Protocol: base.StreamProtocolTCP, + InterleavedIDs: &[2]int{2, 3}, + } + + res, err = writeReqReadRes(bconn, base.Request{ + Method: base.Setup, + URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=1"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Transport": inTH.Write(), + "Session": res.Header["Session"], + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + th = headers.Transport{} + err = th.Read(res.Header["Transport"]) + require.NoError(t, err) + ssrcs[1] = th.SSRC + + res, err = writeReqReadRes(bconn, base.Request{ + Method: base.Play, + URL: mustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"3"}, + "Session": res.Header["Session"], + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var ri headers.RTPInfo + err = ri.Read(res.Header["RTP-Info"]) + require.NoError(t, err) + + return &ri, ssrcs + } + + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + stream := NewServerStream(Tracks{track, track}) + + s := &Server{ + Handler: &testServerHandler{ + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil + }, + onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { + go func() { + time.Sleep(1 * time.Second) + stream.WriteFrame(1, base.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) + stream.WriteFrame(0, base.StreamTypeRTP, []byte{0x05, 0x06, 0x07, 0x08}) + }() + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + } + + err = s.Start("localhost:8554") + require.NoError(t, err) + defer s.Close() + + buf, err := (&rtp.Packet{ + Header: rtp.Header{ + Version: 0x80, + PayloadType: 96, + SequenceNumber: 556, + Timestamp: 984512368, + SSRC: 96342362, + }, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + }).Marshal() + require.NoError(t, err) + stream.WriteFrame(0, StreamTypeRTP, buf) + + rtpInfo, ssrcs := getInfos() + require.Equal(t, &headers.RTPInfo{ + &headers.RTPInfoEntry{ + URL: (&base.URL{ + Scheme: "rtsp", + Host: "localhost:8554", + Path: "/teststream/trackID=0", + }).String(), + SequenceNumber: func() *uint16 { + v := uint16(556) + return &v + }(), + Timestamp: (*rtpInfo)[0].Timestamp, + }, + }, rtpInfo) + require.Equal(t, []*uint32{ + func() *uint32 { + v := uint32(96342362) + return &v + }(), + nil, + }, ssrcs) + + buf, err = (&rtp.Packet{ + Header: rtp.Header{ + Version: 0x80, + PayloadType: 96, + SequenceNumber: 87, + Timestamp: 756436454, + SSRC: 536474323, + }, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + }).Marshal() + require.NoError(t, err) + stream.WriteFrame(1, StreamTypeRTP, buf) + + rtpInfo, ssrcs = getInfos() + require.Equal(t, &headers.RTPInfo{ + &headers.RTPInfoEntry{ + URL: (&base.URL{ + Scheme: "rtsp", + Host: "localhost:8554", + Path: "/teststream/trackID=0", + }).String(), + SequenceNumber: func() *uint16 { + v := uint16(556) + return &v + }(), + Timestamp: (*rtpInfo)[0].Timestamp, + }, + &headers.RTPInfoEntry{ + URL: (&base.URL{ + Scheme: "rtsp", + Host: "localhost:8554", + Path: "/teststream/trackID=1", + }).String(), + SequenceNumber: func() *uint16 { + v := uint16(87) + return &v + }(), + Timestamp: (*rtpInfo)[1].Timestamp, + }, + }, rtpInfo) + require.Equal(t, []*uint32{ + func() *uint32 { + v := uint32(96342362) + return &v + }(), + func() *uint32 { + v := uint32(536474323) + return &v + }(), + }, ssrcs) +} diff --git a/server_test.go b/server_test.go index 7db4a130..405d52f3 100644 --- a/server_test.go +++ b/server_test.go @@ -44,7 +44,7 @@ type testServerHandler struct { onSessionClose func(*ServerHandlerOnSessionCloseCtx) onDescribe func(*ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) onAnnounce func(*ServerHandlerOnAnnounceCtx) (*base.Response, error) - onSetup func(*ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) + onSetup func(*ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) onPlay func(*ServerHandlerOnPlayCtx) (*base.Response, error) onRecord func(*ServerHandlerOnRecordCtx) (*base.Response, error) onPause func(*ServerHandlerOnPauseCtx) (*base.Response, error) @@ -91,11 +91,11 @@ func (sh *testServerHandler) OnAnnounce(ctx *ServerHandlerOnAnnounceCtx) (*base. return nil, fmt.Errorf("unimplemented") } -func (sh *testServerHandler) OnSetup(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { +func (sh *testServerHandler) OnSetup(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { if sh.onSetup != nil { return sh.onSetup(ctx) } - return nil, nil, nil, fmt.Errorf("unimplemented") + return nil, nil, fmt.Errorf("unimplemented") } func (sh *testServerHandler) OnPlay(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { @@ -328,22 +328,22 @@ func TestServerHighLevelPublishRead(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { if ctx.Path != "teststream" { return &base.Response{ StatusCode: base.StatusBadRequest, - }, nil, nil, fmt.Errorf("invalid path (%s)", ctx.Req.URL) + }, nil, fmt.Errorf("invalid path (%s)", ctx.Req.URL) } if stream == nil { return &base.Response{ StatusCode: base.StatusNotFound, - }, nil, nil, nil + }, nil, nil } return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { if ctx.Path != "teststream" { @@ -637,10 +637,10 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { s := &Server{ Handler: &testServerHandler{ - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{ @@ -734,10 +734,10 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { s := &Server{ Handler: &testServerHandler{ - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{ @@ -942,10 +942,10 @@ func TestServerSessionClose(t *testing.T) { onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { close(sessionClosed) }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil, nil + }, nil, nil }, }, } @@ -996,10 +996,10 @@ func TestServerSessionAutoClose(t *testing.T) { onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { close(sessionClosed) }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, }, } @@ -1068,10 +1068,10 @@ func TestServerErrorInvalidPath(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) { + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil, nil + }, stream, nil }, onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{ diff --git a/serverhandler.go b/serverhandler.go index b8c2dc69..50c9739a 100644 --- a/serverhandler.go +++ b/serverhandler.go @@ -104,9 +104,11 @@ type ServerHandlerOnSetupCtx struct { // ServerHandlerOnSetup can be implemented by a ServerHandler. type ServerHandlerOnSetup interface { - // returns a Response and an optional SSRC that is - // inserted into the Transport header. - OnSetup(*ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, *uint32, error) + // must return a Response and a stream. + // the stream is needed to + // - add the session the the stream's readers + // - send the stream SSRC to the session + OnSetup(*ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) } // ServerHandlerOnPlayCtx is the context of a PLAY request. diff --git a/serversession.go b/serversession.go index adce0adb..59e6c386 100644 --- a/serversession.go +++ b/serversession.go @@ -613,7 +613,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base }, liberrors.ErrServerTracksDifferentProtocols{} } - res, stream, ssrc, err := ss.s.Handler.(ServerHandlerOnSetup).OnSetup(&ServerHandlerOnSetupCtx{ + res, stream, err := ss.s.Handler.(ServerHandlerOnSetup).OnSetup(&ServerHandlerOnSetupCtx{ Server: ss.s, Session: ss, Conn: sc, @@ -625,6 +625,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base }) if res.StatusCode == base.StatusOK { + th := headers.Transport{} + if ss.state == ServerSessionStateInitial { ss.state = ServerSessionStatePrePlay ss.setuppedPath = &path @@ -633,6 +635,13 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base stream.readerAdd(ss, delivery == base.StreamDeliveryMulticast) } + if ss.state == ServerSessionStatePrePlay { + ssrc := stream.ssrc(trackID) + if ssrc != 0 { + th.SSRC = &ssrc + } + } + ss.setuppedProtocol = &inTH.Protocol ss.setuppedDelivery = &delivery @@ -644,8 +653,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base res.Header = make(base.Header) } - th := headers.Transport{} - switch { case delivery == base.StreamDeliveryMulticast: ss.setuppedTracks[trackID] = ServerSessionSetuppedTrack{} @@ -683,10 +690,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base th.InterleavedIDs = inTH.InterleavedIDs } - if ssrc != nil { - th.SSRC = ssrc - } - res.Header["Transport"] = th.Write() } @@ -696,7 +699,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base // this was causing problems during unit tests. if ua, ok := req.Header["User-Agent"]; ok && len(ua) == 1 && strings.HasPrefix(ua[0], "GStreamer") { - <-time.After(1 * time.Second) + select { + case <-time.After(1 * time.Second): + case <-ss.ctx.Done(): + } } return res, err @@ -750,6 +756,36 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.tcpConn = sc } + // add RTP-Info + var ri headers.RTPInfo + for trackID := range ss.setuppedTracks { + ts := ss.setuppedStream.timestamp(trackID) + if ts == 0 { + continue + } + + u := &base.URL{ + Scheme: req.URL.Scheme, + User: req.URL.User, + Host: req.URL.Host, + Path: "/" + *ss.setuppedPath + "/trackID=" + strconv.FormatInt(int64(trackID), 10), + } + + lsn := ss.setuppedStream.lastSequenceNumber(trackID) + + ri = append(ri, &headers.RTPInfoEntry{ + URL: u.String(), + SequenceNumber: &lsn, + Timestamp: &ts, + }) + } + if len(ri) > 0 { + if res.Header == nil { + res.Header = make(base.Header) + } + res.Header["RTP-Info"] = ri.Write() + } + ss.setuppedStream.readerSetActive(ss) if *ss.setuppedProtocol == base.StreamProtocolUDP && diff --git a/serverstream.go b/serverstream.go index c4def2ce..68ee80de 100644 --- a/serverstream.go +++ b/serverstream.go @@ -1,8 +1,11 @@ package gortsplib import ( + "encoding/binary" "net" "sync" + "sync/atomic" + "time" "github.com/aler9/gortsplib/pkg/base" ) @@ -12,7 +15,18 @@ type listenerPair struct { rtcpListener *serverUDPListener } -// ServerStream is an entity that allows to send frames to multiple readers. +type trackInfo struct { + lastSequenceNumber uint32 + lastTimeRTP uint32 + lastTimeNTP int64 + lastSSRC uint32 +} + +// ServerStream represents a single stream. +// This is in charge of +// - distributing the stream to each reader +// - allocating multicast listeners +// - gathering infos about the stream to generate SSRC and RTP-Info type ServerStream struct { s *Server tracks Tracks @@ -21,15 +35,23 @@ type ServerStream struct { readersUnicast map[*ServerSession]struct{} readers map[*ServerSession]struct{} multicastListeners []*listenerPair + trackInfos []*trackInfo } // NewServerStream allocates a ServerStream. func NewServerStream(tracks Tracks) *ServerStream { - return &ServerStream{ + st := &ServerStream{ tracks: tracks, readersUnicast: make(map[*ServerSession]struct{}), readers: make(map[*ServerSession]struct{}), } + + st.trackInfos = make([]*trackInfo, len(tracks)) + for i := range st.trackInfos { + st.trackInfos[i] = &trackInfo{} + } + + return st } // Close closes a ServerStream. @@ -65,16 +87,33 @@ func (st *ServerStream) Tracks() Tracks { return st.tracks } +func (st *ServerStream) ssrc(trackID int) uint32 { + return atomic.LoadUint32(&st.trackInfos[trackID].lastSSRC) +} + +func (st *ServerStream) timestamp(trackID int) uint32 { + lastTimeRTP := atomic.LoadUint32(&st.trackInfos[trackID].lastTimeRTP) + lastTimeNTP := atomic.LoadInt64(&st.trackInfos[trackID].lastTimeNTP) + clockRate, _ := st.tracks[trackID].ClockRate() + + if lastTimeRTP == 0 || lastTimeNTP == 0 { + return 0 + } + + return uint32(uint64(lastTimeRTP) + + uint64(time.Since(time.Unix(lastTimeNTP, 0)).Seconds()*float64(clockRate))) +} + +func (st *ServerStream) lastSequenceNumber(trackID int) uint16 { + return uint16(atomic.LoadUint32(&st.trackInfos[trackID].lastSequenceNumber)) +} + func (st *ServerStream) readerAdd(ss *ServerSession, isMulticast bool) { st.mutex.Lock() defer st.mutex.Unlock() st.readers[ss] = struct{}{} - if !isMulticast { - return - } - if st.s == nil { st.s = ss.s select { @@ -83,7 +122,7 @@ func (st *ServerStream) readerAdd(ss *ServerSession, isMulticast bool) { } } - if st.multicastListeners != nil { + if !isMulticast || st.multicastListeners != nil { return } @@ -132,6 +171,20 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) { // WriteFrame writes a frame to all the readers of the stream. func (st *ServerStream) WriteFrame(trackID int, streamType StreamType, payload []byte) { + if streamType == StreamTypeRTP && len(payload) >= 8 { + track := st.trackInfos[trackID] + + sequenceNumber := binary.BigEndian.Uint16(payload[2:4]) + atomic.StoreUint32(&track.lastSequenceNumber, uint32(sequenceNumber)) + + timestamp := binary.BigEndian.Uint32(payload[4:8]) + atomic.StoreUint32(&track.lastTimeRTP, timestamp) + atomic.StoreInt64(&track.lastTimeNTP, time.Now().Unix()) + + ssrc := binary.BigEndian.Uint32(payload[8:12]) + atomic.StoreUint32(&track.lastSSRC, ssrc) + } + st.mutex.RLock() defer st.mutex.RUnlock()