diff --git a/client.go b/client.go index bdfc3442..46adb14c 100644 --- a/client.go +++ b/client.go @@ -35,14 +35,14 @@ func DialPublish(address string, tracks Tracks) (*ClientConn, error) { return DefaultClient.DialPublish(address, tracks) } -// ClientProtocol is a RTSP stream protocol used by the client. -type ClientProtocol int +// ClientTransport is a stream transport used by the client. +type ClientTransport int -// standard RTSP stream protocols. +// standard client transports. const ( - ClientProtocolUDP ClientProtocol = iota - ClientProtocolMulticast - ClientProtocolTCP + ClientTransportUDP ClientTransport = iota + ClientTransportMulticast + ClientTransportTCP ) // Client is a RTSP client. @@ -74,10 +74,10 @@ type Client struct { // // reading / writing // - // the stream protocol (UDP, Multicast or TCP). + // the stream transport (UDP, Multicast or TCP). // If nil, it is chosen automatically (first UDP, then, if it fails, TCP). // It defaults to nil. - Protocol *ClientProtocol + Transport *ClientTransport // If the client is reading with UDP, it must receive // at least a packet within this timeout. // It defaults to 3 seconds. diff --git a/client_publish_test.go b/client_publish_test.go index 68c50f76..c7f845ba 100644 --- a/client_publish_test.go +++ b/client_publish_test.go @@ -17,11 +17,11 @@ import ( ) func TestClientPublishSerial(t *testing.T) { - for _, proto := range []string{ + for _, transport := range []string{ "udp", "tcp", } { - t.Run(proto, func(t *testing.T) { + t.Run(transport, func(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -79,7 +79,7 @@ func TestClientPublishSerial(t *testing.T) { }(), } - if proto == "udp" { + if transport == "udp" { th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts @@ -99,7 +99,7 @@ func TestClientPublishSerial(t *testing.T) { var l1 net.PacketConn var l2 net.PacketConn - if proto == "udp" { + if transport == "udp" { l1, err = net.ListenPacket("udp", "localhost:34556") require.NoError(t, err) defer l1.Close() @@ -120,7 +120,7 @@ func TestClientPublishSerial(t *testing.T) { require.NoError(t, err) // client -> server - if proto == "udp" { + if transport == "udp" { buf := make([]byte, 2048) n, _, err := l1.ReadFrom(buf) require.NoError(t, err) @@ -136,7 +136,7 @@ func TestClientPublishSerial(t *testing.T) { } // server -> client (RTCP) - if proto == "udp" { + if transport == "udp" { l2.WriteTo([]byte{0x05, 0x06, 0x07, 0x08}, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: th.ClientPorts[1], @@ -161,12 +161,12 @@ func TestClientPublishSerial(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - if proto == "udp" { - v := ClientProtocolUDP + Transport: func() *ClientTransport { + if transport == "udp" { + v := ClientTransportUDP return &v } - v := ClientProtocolTCP + v := ClientTransportTCP return &v }(), } @@ -206,11 +206,11 @@ func TestClientPublishSerial(t *testing.T) { } func TestClientPublishParallel(t *testing.T) { - for _, proto := range []string{ + for _, transport := range []string{ "udp", "tcp", } { - t.Run(proto, func(t *testing.T) { + t.Run(transport, func(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -265,7 +265,7 @@ func TestClientPublishParallel(t *testing.T) { }(), } - if proto == "udp" { + if transport == "udp" { th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts @@ -303,12 +303,12 @@ func TestClientPublishParallel(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - if proto == "udp" { - v := ClientProtocolUDP + Transport: func() *ClientTransport { + if transport == "udp" { + v := ClientTransportUDP return &v } - v := ClientProtocolTCP + v := ClientTransportTCP return &v }(), } @@ -345,11 +345,11 @@ func TestClientPublishParallel(t *testing.T) { } func TestClientPublishPauseSerial(t *testing.T) { - for _, proto := range []string{ + for _, transport := range []string{ "udp", "tcp", } { - t.Run(proto, func(t *testing.T) { + t.Run(transport, func(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -405,7 +405,7 @@ func TestClientPublishPauseSerial(t *testing.T) { }(), } - if proto == "udp" { + if transport == "udp" { th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts @@ -461,12 +461,12 @@ func TestClientPublishPauseSerial(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - if proto == "udp" { - v := ClientProtocolUDP + Transport: func() *ClientTransport { + if transport == "udp" { + v := ClientTransportUDP return &v } - v := ClientProtocolTCP + v := ClientTransportTCP return &v }(), } @@ -501,11 +501,11 @@ func TestClientPublishPauseSerial(t *testing.T) { } func TestClientPublishPauseParallel(t *testing.T) { - for _, proto := range []string{ + for _, transport := range []string{ "udp", "tcp", } { - t.Run(proto, func(t *testing.T) { + t.Run(transport, func(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -561,7 +561,7 @@ func TestClientPublishPauseParallel(t *testing.T) { }(), } - if proto == "udp" { + if transport == "udp" { th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts @@ -599,12 +599,12 @@ func TestClientPublishPauseParallel(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - if proto == "udp" { - v := ClientProtocolUDP + Transport: func() *ClientTransport { + if transport == "udp" { + v := ClientTransportUDP return &v } - v := ClientProtocolTCP + v := ClientTransportTCP return &v }(), } @@ -881,8 +881,8 @@ func TestClientPublishRTCPReport(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - v := ClientProtocolTCP + Transport: func() *ClientTransport { + v := ClientTransportTCP return &v }(), senderReportPeriod: 1 * time.Second, diff --git a/client_read_test.go b/client_read_test.go index f4e8f18c..0f6f07fe 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -162,13 +162,13 @@ func TestClientReadTracks(t *testing.T) { } func TestClientRead(t *testing.T) { - for _, proto := range []string{ + for _, transport := range []string{ "udp", "multicast", "tcp", "tls", } { - t.Run(proto, func(t *testing.T) { + t.Run(transport, func(t *testing.T) { frameRecv := make(chan struct{}) listenIP := multicastCapableIP(t) @@ -177,7 +177,7 @@ func TestClientRead(t *testing.T) { defer l.Close() var scheme string - if proto == "tls" { + if transport == "tls" { scheme = "rtsps" cert, err := tls.X509KeyPair(serverCert, serverKey) @@ -250,7 +250,7 @@ func TestClientRead(t *testing.T) { var l1 net.PacketConn var l2 net.PacketConn - switch proto { + switch transport { case "udp": v := base.StreamDeliveryUnicast th.Delivery = &v @@ -329,7 +329,7 @@ func TestClientRead(t *testing.T) { require.NoError(t, err) // server -> client - switch proto { + switch transport { case "udp": time.Sleep(1 * time.Second) l1.WriteTo([]byte{0x01, 0x02, 0x03, 0x04}, &net.UDPAddr{ @@ -353,9 +353,9 @@ func TestClientRead(t *testing.T) { } // client -> server (RTCP) - switch proto { + switch transport { case "udp", "multicast": - if proto == "udp" { + if transport == "udp" { // skip firewall opening buf := make([]byte, 2048) _, _, err := l2.ReadFrom(buf) @@ -390,18 +390,18 @@ func TestClientRead(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - switch proto { + Transport: func() *ClientTransport { + switch transport { case "udp": - v := ClientProtocolUDP + v := ClientTransportUDP return &v case "multicast": - v := ClientProtocolMulticast + v := ClientTransportMulticast return &v default: // tcp, tls - v := ClientProtocolTCP + v := ClientTransportTCP return &v } }(), @@ -416,7 +416,7 @@ func TestClientRead(t *testing.T) { defer close(done) conn.ReadFrames(func(id int, streamType StreamType, payload []byte) { // skip multicast loopback - if proto == "multicast" { + if transport == "multicast" { add := atomic.AddUint64(&counter, 1) if add >= 2 { return @@ -536,8 +536,8 @@ func TestClientReadPartial(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - v := ClientProtocolTCP + Transport: func() *ClientTransport { + v := ClientTransportTCP return &v }(), } @@ -1241,8 +1241,8 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - v := ClientProtocolTCP + Transport: func() *ClientTransport { + v := ClientTransportTCP return &v }(), } @@ -1454,11 +1454,11 @@ func TestClientReadPause(t *testing.T) { return writerTerminate, writerDone } - for _, proto := range []string{ + for _, transport := range []string{ "udp", "tcp", } { - t.Run(proto, func(t *testing.T) { + t.Run(transport, func(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -1523,7 +1523,7 @@ func TestClientReadPause(t *testing.T) { }(), } - if proto == "udp" { + if transport == "udp" { th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts @@ -1589,12 +1589,12 @@ func TestClientReadPause(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - if proto == "udp" { - v := ClientProtocolUDP + Transport: func() *ClientTransport { + if transport == "udp" { + v := ClientTransportUDP return &v } - v := ClientProtocolTCP + v := ClientTransportTCP return &v }(), } @@ -1783,8 +1783,8 @@ func TestClientReadRTCPReport(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - v := ClientProtocolTCP + Transport: func() *ClientTransport { + v := ClientTransportTCP return &v }(), receiverReportPeriod: 1 * time.Second, @@ -1814,12 +1814,12 @@ func TestClientReadRTCPReport(t *testing.T) { } func TestClientReadErrorTimeout(t *testing.T) { - for _, proto := range []string{ + for _, transport := range []string{ "udp", "tcp", "auto", } { - t.Run(proto, func(t *testing.T) { + t.Run(transport, func(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -1885,7 +1885,7 @@ func TestClientReadErrorTimeout(t *testing.T) { } var l1 net.PacketConn - if proto == "udp" || proto == "auto" { + if transport == "udp" || transport == "auto" { var err error l1, err = net.ListenPacket("udp", "localhost:34557") require.NoError(t, err) @@ -1917,7 +1917,7 @@ func TestClientReadErrorTimeout(t *testing.T) { }.Write(bconn.Writer) require.NoError(t, err) - if proto == "udp" || proto == "auto" { + if transport == "udp" || transport == "auto" { time.Sleep(500 * time.Millisecond) l1, err := net.ListenPacket("udp", "localhost:34556") @@ -1942,14 +1942,14 @@ func TestClientReadErrorTimeout(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - switch proto { + Transport: func() *ClientTransport { + switch transport { case "udp": - v := ClientProtocolUDP + v := ClientTransportUDP return &v case "tcp": - v := ClientProtocolTCP + v := ClientTransportTCP return &v } return nil @@ -1965,7 +1965,7 @@ func TestClientReadErrorTimeout(t *testing.T) { err = conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { }) - switch proto { + switch transport { case "udp", "auto": require.Equal(t, "UDP timeout", err.Error()) @@ -2083,8 +2083,8 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - v := ClientProtocolTCP + Transport: func() *ClientTransport { + v := ClientTransportTCP return &v }(), } @@ -2236,8 +2236,8 @@ func TestClientReadSeek(t *testing.T) { }() c := &Client{ - Protocol: func() *ClientProtocol { - v := ClientProtocolTCP + Transport: func() *ClientTransport { + v := ClientTransportTCP return &v }(), } diff --git a/clientconn.go b/clientconn.go index 8d4a1f45..cafdc8ae 100644 --- a/clientconn.go +++ b/clientconn.go @@ -136,7 +136,7 @@ type ClientConn struct { cseq int useGetParameter bool streamBaseURL *base.URL - protocol *ClientProtocol + protocol *ClientTransport tracks map[int]clientConnTrack tracksByChannel map[int]int lastRange *headers.Range @@ -370,10 +370,10 @@ func (cc *ClientConn) checkState(allowed map[clientConnState]struct{}) error { } func (cc *ClientConn) switchProtocolIfTimeout(err error) error { - if *cc.protocol != ClientProtocolUDP || + if *cc.protocol != ClientTransportUDP || cc.state != clientConnStatePlay || !isErrNOUDPPacketsReceivedRecently(err) || - cc.c.Protocol != nil { + cc.c.Transport != nil { return err } @@ -383,7 +383,7 @@ func (cc *ClientConn) switchProtocolIfTimeout(err error) error { cc.reset(true) - v := ClientProtocolTCP + v := ClientTransportTCP cc.protocol = &v cc.useGetParameter = oldUseGetParameter cc.scheme = prevBaseURL.Scheme @@ -449,13 +449,13 @@ func (cc *ClientConn) backgroundClose(isSwitchingProtocol bool) { func (cc *ClientConn) runBackground() { cc.backgroundInnerDone <- func() error { if cc.state == clientConnStatePlay { - if *cc.protocol == ClientProtocolUDP || *cc.protocol == ClientProtocolMulticast { + if *cc.protocol == ClientTransportUDP || *cc.protocol == ClientTransportMulticast { return cc.runBackgroundPlayUDP() } return cc.runBackgroundPlayTCP() } - if *cc.protocol == ClientProtocolUDP { + if *cc.protocol == ClientTransportUDP { return cc.runBackgroundRecordUDP() } return cc.runBackgroundRecordTCP() @@ -463,7 +463,7 @@ func (cc *ClientConn) runBackground() { } func (cc *ClientConn) runBackgroundPlayUDP() error { - if *cc.protocol == ClientProtocolUDP { + if *cc.protocol == ClientTransportUDP { // open the firewall by sending packets to the counterpart for _, cct := range cc.tracks { cct.udpRTPListener.write( @@ -793,7 +793,7 @@ func (cc *ClientConn) connOpen() error { return fmt.Errorf("unsupported scheme '%s'", cc.scheme) } - if cc.scheme == "rtsps" && cc.c.Protocol != nil && *cc.c.Protocol != ClientProtocolTCP { + if cc.scheme == "rtsps" && cc.c.Transport != nil && *cc.c.Transport != ClientTransportTCP { return fmt.Errorf("RTSPS can be used only with TCP") } @@ -1197,23 +1197,23 @@ func (cc *ClientConn) doSetup( // always use TCP if encrypted if cc.scheme == "rtsps" { - v := ClientProtocolTCP + v := ClientTransportTCP cc.protocol = &v } - proto := func() ClientProtocol { + proto := func() ClientTransport { // protocol set by previous Setup() or switchProtocolIfTimeout() if cc.protocol != nil { return *cc.protocol } // protocol set by conf - if cc.c.Protocol != nil { - return *cc.c.Protocol + if cc.c.Transport != nil { + return *cc.c.Transport } // try UDP - return ClientProtocolUDP + return ClientTransportUDP }() th := headers.Transport{ @@ -1223,7 +1223,7 @@ func (cc *ClientConn) doSetup( trackID := len(cc.tracks) switch proto { - case ClientProtocolUDP: + case ClientTransportUDP: if (rtpPort == 0 && rtcpPort != 0) || (rtpPort != 0 && rtcpPort == 0) { return nil, liberrors.ErrClientUDPPortsZero{} @@ -1257,12 +1257,12 @@ func (cc *ClientConn) doSetup( rtcpListener.port(), } - case ClientProtocolMulticast: + case ClientTransportMulticast: v1 := base.StreamDeliveryMulticast th.Delivery = &v1 th.Protocol = base.StreamProtocolUDP - case ClientProtocolTCP: + case ClientTransportTCP: v1 := base.StreamDeliveryUnicast th.Delivery = &v1 th.Protocol = base.StreamProtocolTCP @@ -1271,7 +1271,7 @@ func (cc *ClientConn) doSetup( trackURL, err := track.URL(baseURL) if err != nil { - if proto == ClientProtocolUDP { + if proto == ClientTransportUDP { rtpListener.close() rtcpListener.close() } @@ -1286,7 +1286,7 @@ func (cc *ClientConn) doSetup( }, }, false) if err != nil { - if proto == ClientProtocolUDP { + if proto == ClientTransportUDP { rtpListener.close() rtcpListener.close() } @@ -1294,7 +1294,7 @@ func (cc *ClientConn) doSetup( } if res.StatusCode != base.StatusOK { - if proto == ClientProtocolUDP { + if proto == ClientTransportUDP { rtpListener.close() rtcpListener.close() } @@ -1302,9 +1302,9 @@ func (cc *ClientConn) doSetup( // switch protocol automatically if res.StatusCode == base.StatusUnsupportedTransport && cc.protocol == nil && - cc.c.Protocol == nil { + cc.c.Transport == nil { - v := ClientProtocolTCP + v := ClientTransportTCP cc.protocol = &v return cc.doSetup(mode, baseURL, track, 0, 0) @@ -1316,7 +1316,7 @@ func (cc *ClientConn) doSetup( var thRes headers.Transport err = thRes.Read(res.Header["Transport"]) if err != nil { - if proto == ClientProtocolUDP { + if proto == ClientTransportUDP { rtpListener.close() rtcpListener.close() } @@ -1324,7 +1324,7 @@ func (cc *ClientConn) doSetup( } switch proto { - case ClientProtocolUDP: + case ClientTransportUDP: if thRes.Delivery != nil && *thRes.Delivery != base.StreamDeliveryUnicast { return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} } @@ -1337,7 +1337,7 @@ func (cc *ClientConn) doSetup( } } - case ClientProtocolMulticast: + case ClientTransportMulticast: if thRes.Delivery == nil || *thRes.Delivery != base.StreamDeliveryMulticast { return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} } @@ -1363,7 +1363,7 @@ func (cc *ClientConn) doSetup( return nil, err } - case ClientProtocolTCP: + case ClientTransportTCP: if thRes.Delivery != nil && *thRes.Delivery != base.StreamDeliveryUnicast { return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} } @@ -1401,7 +1401,7 @@ func (cc *ClientConn) doSetup( cc.protocol = &proto switch proto { - case ClientProtocolUDP: + case ClientTransportUDP: rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteWriteIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteZone = cc.nconn.RemoteAddr().(*net.TCPAddr).Zone @@ -1422,7 +1422,7 @@ func (cc *ClientConn) doSetup( rtcpListener.streamType = StreamTypeRTCP cct.udpRTCPListener = rtcpListener - case ClientProtocolMulticast: + case ClientTransportMulticast: rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteWriteIP = *thRes.Destination rtpListener.remoteZone = "" @@ -1439,7 +1439,7 @@ func (cc *ClientConn) doSetup( rtcpListener.streamType = StreamTypeRTCP cct.udpRTCPListener = rtcpListener - case ClientProtocolTCP: + case ClientTransportTCP: if cc.tcpFrameBuffer == nil { cc.tcpFrameBuffer = multibuffer.New(uint64(cc.c.ReadBufferCount), uint64(cc.c.ReadBufferSize)) } @@ -1700,7 +1700,7 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b } switch *cc.protocol { - case ClientProtocolUDP, ClientProtocolMulticast: + case ClientTransportUDP, ClientTransportMulticast: if streamType == StreamTypeRTP { return cc.tracks[trackID].udpRTPListener.write(payload) } diff --git a/examples/client-publish-options/main.go b/examples/client-publish-options/main.go index ac01fed3..0ac771db 100644 --- a/examples/client-publish-options/main.go +++ b/examples/client-publish-options/main.go @@ -43,8 +43,8 @@ func main() { // Client allows to set additional client options c := &gortsplib.Client{ - // the stream protocol (UDP or TCP). If nil, it is chosen automatically - Protocol: nil, + // the stream transport (UDP, Multicast or TCP). If nil, it is chosen automatically + Transport: nil, // timeout of read operations ReadTimeout: 10 * time.Second, // timeout of write operations diff --git a/examples/client-read-options/main.go b/examples/client-read-options/main.go index 62335aa4..c714b784 100644 --- a/examples/client-read-options/main.go +++ b/examples/client-read-options/main.go @@ -14,8 +14,8 @@ import ( func main() { // Client allows to set additional client options c := &gortsplib.Client{ - // the stream protocol (UDP or TCP). If nil, it is chosen automatically - Protocol: nil, + // the stream transport (UDP, Multicast or TCP). If nil, it is chosen automatically + Transport: nil, // timeout of read operations ReadTimeout: 10 * time.Second, // timeout of write operations diff --git a/examples/client-read-save-to-disk/main.go b/examples/client-read-save-to-disk/main.go index 04e03ab0..cb357eb7 100644 --- a/examples/client-read-save-to-disk/main.go +++ b/examples/client-read-save-to-disk/main.go @@ -25,9 +25,9 @@ const ( ) func main() { - p := gortsplib.ClientProtocolUDP + p := gortsplib.ClientTransportUDP c := gortsplib.Client{ - Protocol: &p, + Transport: &p, } // connect to the server and start reading all tracks