mirror of
				https://github.com/aler9/gortsplib
				synced 2025-10-31 02:26:57 +08:00 
			
		
		
		
	add RTCP tests
This commit is contained in:
		| @@ -17,13 +17,13 @@ Features: | |||||||
|   * Encrypt connection with TLS (RTSPS) |   * Encrypt connection with TLS (RTSPS) | ||||||
|   * Reading |   * Reading | ||||||
|     * Read streams from servers with UDP or TCP |     * Read streams from servers with UDP or TCP | ||||||
|     * Select protocol automatically |     * Switch protocol automatically (switch to TCP in case of code 451 or UDP timeout) | ||||||
|     * Read only selected tracks of a stream |     * Read only selected tracks of a stream | ||||||
|     * Pause reading without disconnecting from the server |     * Pause reading without disconnecting from the server | ||||||
|     * Generate RTCP receiver reports automatically |     * Generate RTCP receiver reports automatically | ||||||
|   * Publishing |   * Publishing | ||||||
|     * Publish streams to servers with UDP or TCP |     * Publish streams to servers with UDP or TCP | ||||||
|     * Select protocol automatically |     * Switch protocol automatically (switch to TCP in case of code 451) | ||||||
|     * Pause publishing without disconnecting from the server |     * Pause publishing without disconnecting from the server | ||||||
|     * Generate RTCP sender reports automatically |     * Generate RTCP sender reports automatically | ||||||
| * Server | * Server | ||||||
|   | |||||||
| @@ -85,6 +85,9 @@ type ClientConf struct { | |||||||
| 	// function used to initialize UDP listeners. | 	// function used to initialize UDP listeners. | ||||||
| 	// It defaults to net.ListenPacket. | 	// It defaults to net.ListenPacket. | ||||||
| 	ListenPacket func(network, address string) (net.PacketConn, error) | 	ListenPacket func(network, address string) (net.PacketConn, error) | ||||||
|  |  | ||||||
|  | 	senderReportPeriod   time.Duration | ||||||
|  | 	receiverReportPeriod time.Duration | ||||||
| } | } | ||||||
|  |  | ||||||
| // Dial connects to a server. | // Dial connects to a server. | ||||||
|   | |||||||
| @@ -30,8 +30,6 @@ import ( | |||||||
| const ( | const ( | ||||||
| 	clientConnReadBufferSize       = 4096 | 	clientConnReadBufferSize       = 4096 | ||||||
| 	clientConnWriteBufferSize      = 4096 | 	clientConnWriteBufferSize      = 4096 | ||||||
| 	clientConnReceiverReportPeriod = 10 * time.Second |  | ||||||
| 	clientConnSenderReportPeriod   = 10 * time.Second |  | ||||||
| 	clientConnUDPCheckStreamPeriod = 1 * time.Second | 	clientConnUDPCheckStreamPeriod = 1 * time.Second | ||||||
| 	clientConnUDPKeepalivePeriod   = 30 * time.Second | 	clientConnUDPKeepalivePeriod   = 30 * time.Second | ||||||
| 	clientConnTCPSetDeadlinePeriod = 1 * time.Second | 	clientConnTCPSetDeadlinePeriod = 1 * time.Second | ||||||
| @@ -127,6 +125,13 @@ func newClientConn(conf ClientConf, scheme string, host string) (*ClientConn, er | |||||||
| 		conf.ListenPacket = net.ListenPacket | 		conf.ListenPacket = net.ListenPacket | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if conf.senderReportPeriod == 0 { | ||||||
|  | 		conf.senderReportPeriod = 10 * time.Second | ||||||
|  | 	} | ||||||
|  | 	if conf.receiverReportPeriod == 0 { | ||||||
|  | 		conf.receiverReportPeriod = 10 * time.Second | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	cc := &ClientConn{ | 	cc := &ClientConn{ | ||||||
| 		conf:             conf, | 		conf:             conf, | ||||||
| 		udpRTPListeners:  make(map[int]*clientConnUDPListener), | 		udpRTPListeners:  make(map[int]*clientConnUDPListener), | ||||||
|   | |||||||
| @@ -76,7 +76,7 @@ func (cc *ClientConn) backgroundRecordUDP() { | |||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	reportTicker := time.NewTicker(clientConnSenderReportPeriod) | 	reportTicker := time.NewTicker(cc.conf.senderReportPeriod) | ||||||
| 	defer reportTicker.Stop() | 	defer reportTicker.Stop() | ||||||
|  |  | ||||||
| 	for { | 	for { | ||||||
| @@ -112,7 +112,7 @@ func (cc *ClientConn) backgroundRecordTCP() { | |||||||
| 		cc.publishOpen = false | 		cc.publishOpen = false | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	reportTicker := time.NewTicker(clientConnSenderReportPeriod) | 	reportTicker := time.NewTicker(cc.conf.senderReportPeriod) | ||||||
| 	defer reportTicker.Stop() | 	defer reportTicker.Stop() | ||||||
|  |  | ||||||
| 	for { | 	for { | ||||||
|   | |||||||
| @@ -7,10 +7,13 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/pion/rtcp" | ||||||
|  | 	"github.com/pion/rtp" | ||||||
| 	"github.com/stretchr/testify/require" | 	"github.com/stretchr/testify/require" | ||||||
|  |  | ||||||
| 	"github.com/aler9/gortsplib/pkg/base" | 	"github.com/aler9/gortsplib/pkg/base" | ||||||
| 	"github.com/aler9/gortsplib/pkg/headers" | 	"github.com/aler9/gortsplib/pkg/headers" | ||||||
|  | 	"github.com/aler9/gortsplib/pkg/rtcpreceiver" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func TestClientPublishSerial(t *testing.T) { | func TestClientPublishSerial(t *testing.T) { | ||||||
| @@ -113,9 +116,6 @@ func TestClientPublishSerial(t *testing.T) { | |||||||
| 				conn.Close() | 				conn.Close() | ||||||
| 			}() | 			}() | ||||||
|  |  | ||||||
| 			track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) |  | ||||||
| 			require.NoError(t, err) |  | ||||||
|  |  | ||||||
| 			conf := ClientConf{ | 			conf := ClientConf{ | ||||||
| 				StreamProtocol: func() *StreamProtocol { | 				StreamProtocol: func() *StreamProtocol { | ||||||
| 					if proto == "udp" { | 					if proto == "udp" { | ||||||
| @@ -127,6 +127,9 @@ func TestClientPublishSerial(t *testing.T) { | |||||||
| 				}(), | 				}(), | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|  | 			track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) | ||||||
|  | 			require.NoError(t, err) | ||||||
|  |  | ||||||
| 			conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | 			conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | ||||||
| 				Tracks{track}) | 				Tracks{track}) | ||||||
| 			require.NoError(t, err) | 			require.NoError(t, err) | ||||||
| @@ -244,12 +247,6 @@ func TestClientPublishParallel(t *testing.T) { | |||||||
| 				conn.Close() | 				conn.Close() | ||||||
| 			}() | 			}() | ||||||
|  |  | ||||||
| 			track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) |  | ||||||
| 			require.NoError(t, err) |  | ||||||
|  |  | ||||||
| 			writerDone := make(chan struct{}) |  | ||||||
| 			defer func() { <-writerDone }() |  | ||||||
|  |  | ||||||
| 			conf := ClientConf{ | 			conf := ClientConf{ | ||||||
| 				StreamProtocol: func() *StreamProtocol { | 				StreamProtocol: func() *StreamProtocol { | ||||||
| 					if proto == "udp" { | 					if proto == "udp" { | ||||||
| @@ -261,6 +258,12 @@ func TestClientPublishParallel(t *testing.T) { | |||||||
| 				}(), | 				}(), | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|  | 			track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) | ||||||
|  | 			require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 			writerDone := make(chan struct{}) | ||||||
|  | 			defer func() { <-writerDone }() | ||||||
|  |  | ||||||
| 			conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | 			conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | ||||||
| 				Tracks{track}) | 				Tracks{track}) | ||||||
| 			require.NoError(t, err) | 			require.NoError(t, err) | ||||||
| @@ -406,9 +409,6 @@ func TestClientPublishPauseSerial(t *testing.T) { | |||||||
| 				conn.Close() | 				conn.Close() | ||||||
| 			}() | 			}() | ||||||
|  |  | ||||||
| 			track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) |  | ||||||
| 			require.NoError(t, err) |  | ||||||
|  |  | ||||||
| 			conf := ClientConf{ | 			conf := ClientConf{ | ||||||
| 				StreamProtocol: func() *StreamProtocol { | 				StreamProtocol: func() *StreamProtocol { | ||||||
| 					if proto == "udp" { | 					if proto == "udp" { | ||||||
| @@ -420,6 +420,9 @@ func TestClientPublishPauseSerial(t *testing.T) { | |||||||
| 				}(), | 				}(), | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|  | 			track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) | ||||||
|  | 			require.NoError(t, err) | ||||||
|  |  | ||||||
| 			conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | 			conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | ||||||
| 				Tracks{track}) | 				Tracks{track}) | ||||||
| 			require.NoError(t, err) | 			require.NoError(t, err) | ||||||
| @@ -547,9 +550,6 @@ func TestClientPublishPauseParallel(t *testing.T) { | |||||||
| 				conn.Close() | 				conn.Close() | ||||||
| 			}() | 			}() | ||||||
|  |  | ||||||
| 			track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) |  | ||||||
| 			require.NoError(t, err) |  | ||||||
|  |  | ||||||
| 			conf := ClientConf{ | 			conf := ClientConf{ | ||||||
| 				StreamProtocol: func() *StreamProtocol { | 				StreamProtocol: func() *StreamProtocol { | ||||||
| 					if proto == "udp" { | 					if proto == "udp" { | ||||||
| @@ -561,6 +561,9 @@ func TestClientPublishPauseParallel(t *testing.T) { | |||||||
| 				}(), | 				}(), | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|  | 			track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) | ||||||
|  | 			require.NoError(t, err) | ||||||
|  |  | ||||||
| 			conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | 			conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | ||||||
| 				Tracks{track}) | 				Tracks{track}) | ||||||
| 			require.NoError(t, err) | 			require.NoError(t, err) | ||||||
| @@ -591,3 +594,162 @@ func TestClientPublishPauseParallel(t *testing.T) { | |||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestClientPublishRTCP(t *testing.T) { | ||||||
|  | 	l, err := net.Listen("tcp", "localhost:8554") | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	defer l.Close() | ||||||
|  |  | ||||||
|  | 	serverDone := make(chan struct{}) | ||||||
|  | 	defer func() { <-serverDone }() | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(serverDone) | ||||||
|  |  | ||||||
|  | 		conn, err := l.Accept() | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) | ||||||
|  |  | ||||||
|  | 		var req base.Request | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Options, req.Method) | ||||||
|  |  | ||||||
|  | 		err = base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 			Header: base.Header{ | ||||||
|  | 				"Public": base.HeaderValue{strings.Join([]string{ | ||||||
|  | 					string(base.Announce), | ||||||
|  | 					string(base.Setup), | ||||||
|  | 					string(base.Record), | ||||||
|  | 				}, ", ")}, | ||||||
|  | 			}, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Announce, req.Method) | ||||||
|  |  | ||||||
|  | 		err = base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Setup, req.Method) | ||||||
|  |  | ||||||
|  | 		var inTH headers.Transport | ||||||
|  | 		err = inTH.Read(req.Header["Transport"]) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		th := headers.Transport{ | ||||||
|  | 			Delivery: func() *base.StreamDelivery { | ||||||
|  | 				v := base.StreamDeliveryUnicast | ||||||
|  | 				return &v | ||||||
|  | 			}(), | ||||||
|  | 			Protocol:       StreamProtocolTCP, | ||||||
|  | 			InterleavedIDs: inTH.InterleavedIDs, | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		err = base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 			Header: base.Header{ | ||||||
|  | 				"Transport": th.Write(), | ||||||
|  | 			}, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Record, req.Method) | ||||||
|  |  | ||||||
|  | 		err = base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		rr := rtcpreceiver.New(nil, 90000) | ||||||
|  |  | ||||||
|  | 		var f base.InterleavedFrame | ||||||
|  | 		f.Payload = make([]byte, 2048) | ||||||
|  | 		err = f.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, StreamTypeRTP, f.StreamType) | ||||||
|  | 		rr.ProcessFrame(time.Now(), StreamTypeRTP, f.Payload) | ||||||
|  |  | ||||||
|  | 		f.Payload = make([]byte, 2048) | ||||||
|  | 		err = f.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, StreamTypeRTCP, f.StreamType) | ||||||
|  | 		pkt, err := rtcp.Unmarshal(f.Payload) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		sr, ok := pkt[0].(*rtcp.SenderReport) | ||||||
|  | 		require.True(t, ok) | ||||||
|  | 		require.Equal(t, &rtcp.SenderReport{ | ||||||
|  | 			SSRC:        753621, | ||||||
|  | 			NTPTime:     sr.NTPTime, | ||||||
|  | 			RTPTime:     sr.RTPTime, | ||||||
|  | 			PacketCount: 1, | ||||||
|  | 			OctetCount:  4, | ||||||
|  | 		}, sr) | ||||||
|  | 		rr.ProcessFrame(time.Now(), StreamTypeRTCP, f.Payload) | ||||||
|  |  | ||||||
|  | 		err = base.InterleavedFrame{ | ||||||
|  | 			TrackID:    0, | ||||||
|  | 			StreamType: StreamTypeRTCP, | ||||||
|  | 			Payload:    rr.Report(time.Now()), | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		f.Payload = make([]byte, 2048) | ||||||
|  | 		err = f.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, StreamTypeRTP, f.StreamType) | ||||||
|  |  | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Teardown, req.Method) | ||||||
|  |  | ||||||
|  | 		base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  |  | ||||||
|  | 		conn.Close() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	conf := ClientConf{ | ||||||
|  | 		StreamProtocol: func() *StreamProtocol { | ||||||
|  | 			v := StreamProtocolTCP | ||||||
|  | 			return &v | ||||||
|  | 		}(), | ||||||
|  | 		senderReportPeriod: 1 * time.Second, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", | ||||||
|  | 		Tracks{track}) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	defer conn.Close() | ||||||
|  |  | ||||||
|  | 	byts, _ := (&rtp.Packet{ | ||||||
|  | 		Header: rtp.Header{ | ||||||
|  | 			Version:        2, | ||||||
|  | 			Marker:         true, | ||||||
|  | 			PayloadType:    96, | ||||||
|  | 			SequenceNumber: 946, | ||||||
|  | 			Timestamp:      54352, | ||||||
|  | 			SSRC:           753621, | ||||||
|  | 		}, | ||||||
|  | 		Payload: []byte("\x01\x02\x03\x04"), | ||||||
|  | 	}).Marshal() | ||||||
|  | 	err = conn.WriteFrame(track.ID, StreamTypeRTP, byts) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	time.Sleep(1300 * time.Millisecond) | ||||||
|  |  | ||||||
|  | 	err = conn.WriteFrame(track.ID, StreamTypeRTP, byts) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | } | ||||||
|   | |||||||
| @@ -87,7 +87,7 @@ func (cc *ClientConn) backgroundPlayUDP() error { | |||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	reportTicker := time.NewTicker(clientConnReceiverReportPeriod) | 	reportTicker := time.NewTicker(cc.conf.receiverReportPeriod) | ||||||
| 	defer reportTicker.Stop() | 	defer reportTicker.Stop() | ||||||
|  |  | ||||||
| 	keepaliveTicker := time.NewTicker(clientConnUDPKeepalivePeriod) | 	keepaliveTicker := time.NewTicker(clientConnUDPKeepalivePeriod) | ||||||
| @@ -206,7 +206,7 @@ func (cc *ClientConn) backgroundPlayTCP() error { | |||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	reportTicker := time.NewTicker(clientConnReceiverReportPeriod) | 	reportTicker := time.NewTicker(cc.conf.receiverReportPeriod) | ||||||
| 	defer reportTicker.Stop() | 	defer reportTicker.Stop() | ||||||
|  |  | ||||||
| 	// for some reason, SetReadDeadline() must always be called in the same | 	// for some reason, SetReadDeadline() must always be called in the same | ||||||
|   | |||||||
| @@ -9,10 +9,13 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/pion/rtcp" | ||||||
|  | 	"github.com/pion/rtp" | ||||||
| 	"github.com/stretchr/testify/require" | 	"github.com/stretchr/testify/require" | ||||||
|  |  | ||||||
| 	"github.com/aler9/gortsplib/pkg/base" | 	"github.com/aler9/gortsplib/pkg/base" | ||||||
| 	"github.com/aler9/gortsplib/pkg/headers" | 	"github.com/aler9/gortsplib/pkg/headers" | ||||||
|  | 	"github.com/aler9/gortsplib/pkg/rtcpsender" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func TestClientRead(t *testing.T) { | func TestClientRead(t *testing.T) { | ||||||
| @@ -976,3 +979,169 @@ func TestClientReadPause(t *testing.T) { | |||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestClientReadRTCP(t *testing.T) { | ||||||
|  | 	l, err := net.Listen("tcp", "localhost:8554") | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	defer l.Close() | ||||||
|  |  | ||||||
|  | 	serverDone := make(chan struct{}) | ||||||
|  | 	defer func() { <-serverDone }() | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(serverDone) | ||||||
|  |  | ||||||
|  | 		conn, err := l.Accept() | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		defer conn.Close() | ||||||
|  | 		bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) | ||||||
|  |  | ||||||
|  | 		var req base.Request | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Options, req.Method) | ||||||
|  |  | ||||||
|  | 		err = base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 			Header: base.Header{ | ||||||
|  | 				"Public": base.HeaderValue{strings.Join([]string{ | ||||||
|  | 					string(base.Describe), | ||||||
|  | 					string(base.Setup), | ||||||
|  | 					string(base.Play), | ||||||
|  | 				}, ", ")}, | ||||||
|  | 			}, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Describe, req.Method) | ||||||
|  |  | ||||||
|  | 		track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		err = base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 			Header: base.Header{ | ||||||
|  | 				"Content-Type": base.HeaderValue{"application/sdp"}, | ||||||
|  | 			}, | ||||||
|  | 			Body: Tracks{track}.Write(), | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Setup, req.Method) | ||||||
|  |  | ||||||
|  | 		var th headers.Transport | ||||||
|  | 		err = th.Read(req.Header["Transport"]) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		err = base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 			Header: base.Header{ | ||||||
|  | 				"Transport": headers.Transport{ | ||||||
|  | 					Protocol: StreamProtocolTCP, | ||||||
|  | 					Delivery: func() *base.StreamDelivery { | ||||||
|  | 						v := base.StreamDeliveryUnicast | ||||||
|  | 						return &v | ||||||
|  | 					}(), | ||||||
|  | 					ClientPorts:    th.ClientPorts, | ||||||
|  | 					InterleavedIDs: &[2]int{0, 1}, | ||||||
|  | 				}.Write(), | ||||||
|  | 			}, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		err = req.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, base.Play, req.Method) | ||||||
|  |  | ||||||
|  | 		err = base.Response{ | ||||||
|  | 			StatusCode: base.StatusOK, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		rs := rtcpsender.New(90000) | ||||||
|  |  | ||||||
|  | 		byts, _ := (&rtp.Packet{ | ||||||
|  | 			Header: rtp.Header{ | ||||||
|  | 				Version:        2, | ||||||
|  | 				Marker:         true, | ||||||
|  | 				PayloadType:    96, | ||||||
|  | 				SequenceNumber: 946, | ||||||
|  | 				Timestamp:      54352, | ||||||
|  | 				SSRC:           753621, | ||||||
|  | 			}, | ||||||
|  | 			Payload: []byte("\x01\x02\x03\x04"), | ||||||
|  | 		}).Marshal() | ||||||
|  | 		err = base.InterleavedFrame{ | ||||||
|  | 			TrackID:    0, | ||||||
|  | 			StreamType: StreamTypeRTP, | ||||||
|  | 			Payload:    byts, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		rs.ProcessFrame(time.Now(), StreamTypeRTP, byts) | ||||||
|  |  | ||||||
|  | 		err = base.InterleavedFrame{ | ||||||
|  | 			TrackID:    0, | ||||||
|  | 			StreamType: StreamTypeRTCP, | ||||||
|  | 			Payload:    rs.Report(time.Now()), | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 		var f base.InterleavedFrame | ||||||
|  | 		f.Payload = make([]byte, 2048) | ||||||
|  | 		err = f.Read(bconn.Reader) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		require.Equal(t, StreamTypeRTCP, f.StreamType) | ||||||
|  | 		pkt, err := rtcp.Unmarshal(f.Payload) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		rr, ok := pkt[0].(*rtcp.ReceiverReport) | ||||||
|  | 		require.True(t, ok) | ||||||
|  | 		require.Equal(t, &rtcp.ReceiverReport{ | ||||||
|  | 			SSRC: rr.SSRC, | ||||||
|  | 			Reports: []rtcp.ReceptionReport{ | ||||||
|  | 				{ | ||||||
|  | 					SSRC:               rr.Reports[0].SSRC, | ||||||
|  | 					LastSequenceNumber: 946, | ||||||
|  | 					LastSenderReport:   rr.Reports[0].LastSenderReport, | ||||||
|  | 					Delay:              rr.Reports[0].Delay, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			ProfileExtensions: []uint8{}, | ||||||
|  | 		}, rr) | ||||||
|  |  | ||||||
|  | 		err = base.InterleavedFrame{ | ||||||
|  | 			TrackID:    0, | ||||||
|  | 			StreamType: StreamTypeRTP, | ||||||
|  | 			Payload:    byts, | ||||||
|  | 		}.Write(bconn.Writer) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	conf := ClientConf{ | ||||||
|  | 		StreamProtocol: func() *StreamProtocol { | ||||||
|  | 			v := StreamProtocolTCP | ||||||
|  | 			return &v | ||||||
|  | 		}(), | ||||||
|  | 		receiverReportPeriod: 1 * time.Second, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	conn, err := conf.DialRead("rtsp://localhost:8554/teststream") | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	recv := 0 | ||||||
|  | 	recvDone := make(chan struct{}) | ||||||
|  | 	done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) { | ||||||
|  | 		recv++ | ||||||
|  | 		if recv >= 3 { | ||||||
|  | 			close(recvDone) | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  |  | ||||||
|  | 	time.Sleep(1300 * time.Millisecond) | ||||||
|  |  | ||||||
|  | 	<-recvDone | ||||||
|  | 	conn.Close() | ||||||
|  | 	<-done | ||||||
|  | } | ||||||
|   | |||||||
| @@ -27,10 +27,15 @@ func newServer(conf ServerConf, address string) (*Server, error) { | |||||||
| 	if conf.ReadBufferSize == 0 { | 	if conf.ReadBufferSize == 0 { | ||||||
| 		conf.ReadBufferSize = 2048 | 		conf.ReadBufferSize = 2048 | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if conf.Listen == nil { | 	if conf.Listen == nil { | ||||||
| 		conf.Listen = net.Listen | 		conf.Listen = net.Listen | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if conf.receiverReportPeriod == 0 { | ||||||
|  | 		conf.receiverReportPeriod = 10 * time.Second | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if conf.TLSConfig != nil && conf.UDPRTPAddress != "" { | 	if conf.TLSConfig != nil && conf.UDPRTPAddress != "" { | ||||||
| 		return nil, fmt.Errorf("TLS can't be used together with UDP") | 		return nil, fmt.Errorf("TLS can't be used together with UDP") | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -52,6 +52,8 @@ type ServerConf struct { | |||||||
| 	// function used to initialize the TCP listener. | 	// function used to initialize the TCP listener. | ||||||
| 	// It defaults to net.Listen | 	// It defaults to net.Listen | ||||||
| 	Listen func(network string, address string) (net.Listener, error) | 	Listen func(network string, address string) (net.Listener, error) | ||||||
|  |  | ||||||
|  | 	receiverReportPeriod time.Duration | ||||||
| } | } | ||||||
|  |  | ||||||
| // Serve starts a server on the given address. | // Serve starts a server on the given address. | ||||||
|   | |||||||
| @@ -21,8 +21,7 @@ import ( | |||||||
| const ( | const ( | ||||||
| 	serverConnReadBufferSize    = 4096 | 	serverConnReadBufferSize    = 4096 | ||||||
| 	serverConnWriteBufferSize   = 4096 | 	serverConnWriteBufferSize   = 4096 | ||||||
| 	serverConnCheckStreamInterval    = 5 * time.Second | 	serverConnCheckStreamPeriod = 5 * time.Second | ||||||
| 	serverConnReceiverReportInterval = 10 * time.Second |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func stringsReverseIndex(s, substr string) int { | func stringsReverseIndex(s, substr string) int { | ||||||
| @@ -1180,10 +1179,10 @@ func (sc *ServerConn) WriteFrame(trackID int, streamType StreamType, payload []b | |||||||
| func (sc *ServerConn) backgroundRecord() { | func (sc *ServerConn) backgroundRecord() { | ||||||
| 	defer close(sc.backgroundRecordDone) | 	defer close(sc.backgroundRecordDone) | ||||||
|  |  | ||||||
| 	checkStreamTicker := time.NewTicker(serverConnCheckStreamInterval) | 	checkStreamTicker := time.NewTicker(serverConnCheckStreamPeriod) | ||||||
| 	defer checkStreamTicker.Stop() | 	defer checkStreamTicker.Stop() | ||||||
|  |  | ||||||
| 	receiverReportTicker := time.NewTicker(serverConnReceiverReportInterval) | 	receiverReportTicker := time.NewTicker(sc.conf.receiverReportPeriod) | ||||||
| 	defer receiverReportTicker.Stop() | 	defer receiverReportTicker.Stop() | ||||||
|  |  | ||||||
| 	for { | 	for { | ||||||
|   | |||||||
| @@ -8,6 +8,8 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/pion/rtcp" | ||||||
|  | 	"github.com/pion/rtp" | ||||||
| 	psdp "github.com/pion/sdp/v3" | 	psdp "github.com/pion/sdp/v3" | ||||||
| 	"github.com/stretchr/testify/require" | 	"github.com/stretchr/testify/require" | ||||||
|  |  | ||||||
| @@ -841,3 +843,174 @@ func TestServerPublishFramesWrongProtocol(t *testing.T) { | |||||||
| 	}.Write(bconn.Writer) | 	}.Write(bconn.Writer) | ||||||
| 	require.NoError(t, err) | 	require.NoError(t, err) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestServerPublishRTCP(t *testing.T) { | ||||||
|  | 	conf := ServerConf{ | ||||||
|  | 		receiverReportPeriod: 1 * time.Second, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	s, err := conf.Serve("127.0.0.1:8554") | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	defer s.Close() | ||||||
|  |  | ||||||
|  | 	serverDone := make(chan struct{}) | ||||||
|  | 	defer func() { <-serverDone }() | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(serverDone) | ||||||
|  |  | ||||||
|  | 		conn, err := s.Accept() | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		defer conn.Close() | ||||||
|  |  | ||||||
|  | 		onAnnounce := func(ctx *ServerConnAnnounceCtx) (*base.Response, error) { | ||||||
|  | 			return &base.Response{ | ||||||
|  | 				StatusCode: base.StatusOK, | ||||||
|  | 			}, nil | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		onSetup := func(ctx *ServerConnSetupCtx) (*base.Response, error) { | ||||||
|  | 			return &base.Response{ | ||||||
|  | 				StatusCode: base.StatusOK, | ||||||
|  | 			}, nil | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		onRecord := func(ctx *ServerConnRecordCtx) (*base.Response, error) { | ||||||
|  | 			return &base.Response{ | ||||||
|  | 				StatusCode: base.StatusOK, | ||||||
|  | 			}, nil | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		onFrame := func(trackID int, typ StreamType, buf []byte) { | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		<-conn.Read(ServerConnReadHandlers{ | ||||||
|  | 			OnAnnounce: onAnnounce, | ||||||
|  | 			OnSetup:    onSetup, | ||||||
|  | 			OnRecord:   onRecord, | ||||||
|  | 			OnFrame:    onFrame, | ||||||
|  | 		}) | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	conn, err := net.Dial("tcp", "localhost:8554") | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	defer conn.Close() | ||||||
|  | 	bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) | ||||||
|  |  | ||||||
|  | 	track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	tracks := Tracks{track} | ||||||
|  | 	for i, t := range tracks { | ||||||
|  | 		t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ | ||||||
|  | 			Key:   "control", | ||||||
|  | 			Value: "trackID=" + strconv.FormatInt(int64(i), 10), | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	err = base.Request{ | ||||||
|  | 		Method: base.Announce, | ||||||
|  | 		URL:    base.MustParseURL("rtsp://localhost:8554/teststream"), | ||||||
|  | 		Header: base.Header{ | ||||||
|  | 			"CSeq":         base.HeaderValue{"1"}, | ||||||
|  | 			"Content-Type": base.HeaderValue{"application/sdp"}, | ||||||
|  | 		}, | ||||||
|  | 		Body: tracks.Write(), | ||||||
|  | 	}.Write(bconn.Writer) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	var res base.Response | ||||||
|  | 	err = res.Read(bconn.Reader) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	require.Equal(t, base.StatusOK, res.StatusCode) | ||||||
|  |  | ||||||
|  | 	inTH := &headers.Transport{ | ||||||
|  | 		Delivery: func() *base.StreamDelivery { | ||||||
|  | 			v := base.StreamDeliveryUnicast | ||||||
|  | 			return &v | ||||||
|  | 		}(), | ||||||
|  | 		Mode: func() *headers.TransportMode { | ||||||
|  | 			v := headers.TransportModeRecord | ||||||
|  | 			return &v | ||||||
|  | 		}(), | ||||||
|  | 		Protocol:       StreamProtocolTCP, | ||||||
|  | 		InterleavedIDs: &[2]int{0, 1}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	err = base.Request{ | ||||||
|  | 		Method: base.Setup, | ||||||
|  | 		URL:    base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), | ||||||
|  | 		Header: base.Header{ | ||||||
|  | 			"CSeq":      base.HeaderValue{"2"}, | ||||||
|  | 			"Transport": inTH.Write(), | ||||||
|  | 		}, | ||||||
|  | 	}.Write(bconn.Writer) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	err = res.Read(bconn.Reader) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	require.Equal(t, base.StatusOK, res.StatusCode) | ||||||
|  |  | ||||||
|  | 	var th headers.Transport | ||||||
|  | 	err = th.Read(res.Header["Transport"]) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	err = base.Request{ | ||||||
|  | 		Method: base.Record, | ||||||
|  | 		URL:    base.MustParseURL("rtsp://localhost:8554/teststream"), | ||||||
|  | 		Header: base.Header{ | ||||||
|  | 			"CSeq": base.HeaderValue{"3"}, | ||||||
|  | 		}, | ||||||
|  | 	}.Write(bconn.Writer) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	err = res.Read(bconn.Reader) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	require.Equal(t, base.StatusOK, res.StatusCode) | ||||||
|  |  | ||||||
|  | 	byts, _ := (&rtp.Packet{ | ||||||
|  | 		Header: rtp.Header{ | ||||||
|  | 			Version:        2, | ||||||
|  | 			Marker:         true, | ||||||
|  | 			PayloadType:    96, | ||||||
|  | 			SequenceNumber: 534, | ||||||
|  | 			Timestamp:      54352, | ||||||
|  | 			SSRC:           753621, | ||||||
|  | 		}, | ||||||
|  | 		Payload: []byte("\x01\x02\x03\x04"), | ||||||
|  | 	}).Marshal() | ||||||
|  | 	err = base.InterleavedFrame{ | ||||||
|  | 		TrackID:    0, | ||||||
|  | 		StreamType: StreamTypeRTP, | ||||||
|  | 		Payload:    byts, | ||||||
|  | 	}.Write(bconn.Writer) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  |  | ||||||
|  | 	var f base.InterleavedFrame | ||||||
|  | 	f.Payload = make([]byte, 2048) | ||||||
|  | 	f.Read(bconn.Reader) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	require.Equal(t, StreamTypeRTCP, f.StreamType) | ||||||
|  | 	pkt, err := rtcp.Unmarshal(f.Payload) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	rr, ok := pkt[0].(*rtcp.ReceiverReport) | ||||||
|  | 	require.True(t, ok) | ||||||
|  | 	require.Equal(t, &rtcp.ReceiverReport{ | ||||||
|  | 		SSRC: rr.SSRC, | ||||||
|  | 		Reports: []rtcp.ReceptionReport{ | ||||||
|  | 			{ | ||||||
|  | 				SSRC:               rr.Reports[0].SSRC, | ||||||
|  | 				LastSequenceNumber: 534, | ||||||
|  | 				LastSenderReport:   rr.Reports[0].LastSenderReport, | ||||||
|  | 				Delay:              rr.Reports[0].Delay, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		ProfileExtensions: []uint8{}, | ||||||
|  | 	}, rr) | ||||||
|  |  | ||||||
|  | 	err = base.InterleavedFrame{ | ||||||
|  | 		TrackID:    0, | ||||||
|  | 		StreamType: StreamTypeRTP, | ||||||
|  | 		Payload:    byts, | ||||||
|  | 	}.Write(bconn.Writer) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 aler9
					aler9