rename ClientTransport into Transport

This commit is contained in:
aler9
2021-10-22 17:53:51 +02:00
parent 1865ba58eb
commit 320c1b6f2c
11 changed files with 150 additions and 116 deletions

View File

@@ -16,21 +16,25 @@ Features:
* Client * Client
* Query servers about available streams * Query servers about available streams
* Read * Read
* Read streams from servers with UDP, UDP-multicast, TCP or TLS * Read streams from servers with the UDP, UDP-multicast or TCP transports
* Read streams encrypted with TLS
* Switch protocol automatically (switch to TCP in case of server error or UDP timeout) * Switch protocol automatically (switch to TCP in case of server error or UDP timeout)
* Read only selected tracks of a stream * Read only selected tracks of a stream
* Pause or seek without disconnecting from the server * Pause or seek without disconnecting from the server
* Generate RTCP receiver reports automatically * Generate RTCP receiver reports automatically
* Publish * Publish
* Publish streams to servers with UDP, TCP or TLS * Publish streams to servers with the UDP or TCP transports
* Publish streams encrypted with TLS
* Switch protocol automatically (switch to TCP in case of server error) * Switch protocol automatically (switch to TCP in case of server error)
* Pause without disconnecting from the server * Pause without disconnecting from the server
* Generate RTCP sender reports automatically * Generate RTCP sender reports automatically
* Server * Server
* Handle requests from clients * Handle requests from clients
* Sessions and connections are independent * Sessions and connections are independent
* Read streams from clients with UDP, TCP or TLS * Write streams to clients with the UDP, UDP-multicast or TCP transports
* Write streams to clients with UDP, UDP-multicast, TCP or TLS * Write streams to clients encrypted with TLS
* Read streams from clients with the UDP or TCP transports
* Write streams to clients encrypted with TLS
* Provide SSRC, RTP-Info to clients automatically * Provide SSRC, RTP-Info to clients automatically
* Generate RTCP receiver reports automatically * Generate RTCP receiver reports automatically
* Utilities * Utilities

View File

@@ -35,16 +35,6 @@ func DialPublish(address string, tracks Tracks) (*ClientConn, error) {
return DefaultClient.DialPublish(address, tracks) return DefaultClient.DialPublish(address, tracks)
} }
// ClientTransport is a stream transport used by the client.
type ClientTransport int
// standard client transports.
const (
ClientTransportUDP ClientTransport = iota
ClientTransportUDPMulticast
ClientTransportTCP
)
// Client is a RTSP client. // Client is a RTSP client.
type Client struct { type Client struct {
// //
@@ -77,7 +67,7 @@ type Client struct {
// the stream transport (UDP, Multicast or TCP). // the stream transport (UDP, Multicast or TCP).
// If nil, it is chosen automatically (first UDP, then, if it fails, TCP). // If nil, it is chosen automatically (first UDP, then, if it fails, TCP).
// It defaults to nil. // It defaults to nil.
Transport *ClientTransport Transport *Transport
// If the client is reading with UDP, it must receive // If the client is reading with UDP, it must receive
// at least a packet within this timeout. // at least a packet within this timeout.
// It defaults to 3 seconds. // It defaults to 3 seconds.

View File

@@ -161,12 +161,12 @@ func TestClientPublishSerial(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
if transport == "udp" { if transport == "udp" {
v := ClientTransportUDP v := TransportUDP
return &v return &v
} }
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }
@@ -303,12 +303,12 @@ func TestClientPublishParallel(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
if transport == "udp" { if transport == "udp" {
v := ClientTransportUDP v := TransportUDP
return &v return &v
} }
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }
@@ -461,12 +461,12 @@ func TestClientPublishPauseSerial(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
if transport == "udp" { if transport == "udp" {
v := ClientTransportUDP v := TransportUDP
return &v return &v
} }
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }
@@ -599,12 +599,12 @@ func TestClientPublishPauseParallel(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
if transport == "udp" { if transport == "udp" {
v := ClientTransportUDP v := TransportUDP
return &v return &v
} }
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }
@@ -881,8 +881,8 @@ func TestClientPublishRTCPReport(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
senderReportPeriod: 1 * time.Second, senderReportPeriod: 1 * time.Second,

View File

@@ -390,18 +390,18 @@ func TestClientRead(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
switch transport { switch transport {
case "udp": case "udp":
v := ClientTransportUDP v := TransportUDP
return &v return &v
case "multicast": case "multicast":
v := ClientTransportUDPMulticast v := TransportUDPMulticast
return &v return &v
default: // tcp, tls default: // tcp, tls
v := ClientTransportTCP v := TransportTCP
return &v return &v
} }
}(), }(),
@@ -536,8 +536,8 @@ func TestClientReadPartial(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }
@@ -1241,8 +1241,8 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }
@@ -1589,12 +1589,12 @@ func TestClientReadPause(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
if transport == "udp" { if transport == "udp" {
v := ClientTransportUDP v := TransportUDP
return &v return &v
} }
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }
@@ -1783,8 +1783,8 @@ func TestClientReadRTCPReport(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
receiverReportPeriod: 1 * time.Second, receiverReportPeriod: 1 * time.Second,
@@ -1942,14 +1942,14 @@ func TestClientReadErrorTimeout(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
switch transport { switch transport {
case "udp": case "udp":
v := ClientTransportUDP v := TransportUDP
return &v return &v
case "tcp": case "tcp":
v := ClientTransportTCP v := TransportTCP
return &v return &v
} }
return nil return nil
@@ -2083,8 +2083,8 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }
@@ -2236,8 +2236,8 @@ func TestClientReadSeek(t *testing.T) {
}() }()
c := &Client{ c := &Client{
Transport: func() *ClientTransport { Transport: func() *Transport {
v := ClientTransportTCP v := TransportTCP
return &v return &v
}(), }(),
} }

View File

@@ -136,7 +136,7 @@ type ClientConn struct {
cseq int cseq int
useGetParameter bool useGetParameter bool
streamBaseURL *base.URL streamBaseURL *base.URL
protocol *ClientTransport protocol *Transport
tracks map[int]clientConnTrack tracks map[int]clientConnTrack
tracksByChannel map[int]int tracksByChannel map[int]int
lastRange *headers.Range lastRange *headers.Range
@@ -370,7 +370,7 @@ func (cc *ClientConn) checkState(allowed map[clientConnState]struct{}) error {
} }
func (cc *ClientConn) switchProtocolIfTimeout(err error) error { func (cc *ClientConn) switchProtocolIfTimeout(err error) error {
if *cc.protocol != ClientTransportUDP || if *cc.protocol != TransportUDP ||
cc.state != clientConnStatePlay || cc.state != clientConnStatePlay ||
!isErrNOUDPPacketsReceivedRecently(err) || !isErrNOUDPPacketsReceivedRecently(err) ||
cc.c.Transport != nil { cc.c.Transport != nil {
@@ -383,7 +383,7 @@ func (cc *ClientConn) switchProtocolIfTimeout(err error) error {
cc.reset(true) cc.reset(true)
v := ClientTransportTCP v := TransportTCP
cc.protocol = &v cc.protocol = &v
cc.useGetParameter = oldUseGetParameter cc.useGetParameter = oldUseGetParameter
cc.scheme = prevBaseURL.Scheme cc.scheme = prevBaseURL.Scheme
@@ -449,13 +449,13 @@ func (cc *ClientConn) backgroundClose(isSwitchingProtocol bool) {
func (cc *ClientConn) runBackground() { func (cc *ClientConn) runBackground() {
cc.backgroundInnerDone <- func() error { cc.backgroundInnerDone <- func() error {
if cc.state == clientConnStatePlay { if cc.state == clientConnStatePlay {
if *cc.protocol == ClientTransportUDP || *cc.protocol == ClientTransportUDPMulticast { if *cc.protocol == TransportUDP || *cc.protocol == TransportUDPMulticast {
return cc.runBackgroundPlayUDP() return cc.runBackgroundPlayUDP()
} }
return cc.runBackgroundPlayTCP() return cc.runBackgroundPlayTCP()
} }
if *cc.protocol == ClientTransportUDP { if *cc.protocol == TransportUDP {
return cc.runBackgroundRecordUDP() return cc.runBackgroundRecordUDP()
} }
return cc.runBackgroundRecordTCP() return cc.runBackgroundRecordTCP()
@@ -463,7 +463,7 @@ func (cc *ClientConn) runBackground() {
} }
func (cc *ClientConn) runBackgroundPlayUDP() error { func (cc *ClientConn) runBackgroundPlayUDP() error {
if *cc.protocol == ClientTransportUDP { if *cc.protocol == TransportUDP {
// open the firewall by sending packets to the counterpart // open the firewall by sending packets to the counterpart
for _, cct := range cc.tracks { for _, cct := range cc.tracks {
cct.udpRTPListener.write( cct.udpRTPListener.write(
@@ -793,7 +793,7 @@ func (cc *ClientConn) connOpen() error {
return fmt.Errorf("unsupported scheme '%s'", cc.scheme) return fmt.Errorf("unsupported scheme '%s'", cc.scheme)
} }
if cc.scheme == "rtsps" && cc.c.Transport != nil && *cc.c.Transport != ClientTransportTCP { if cc.scheme == "rtsps" && cc.c.Transport != nil && *cc.c.Transport != TransportTCP {
return fmt.Errorf("RTSPS can be used only with TCP") return fmt.Errorf("RTSPS can be used only with TCP")
} }
@@ -1197,11 +1197,11 @@ func (cc *ClientConn) doSetup(
// always use TCP if encrypted // always use TCP if encrypted
if cc.scheme == "rtsps" { if cc.scheme == "rtsps" {
v := ClientTransportTCP v := TransportTCP
cc.protocol = &v cc.protocol = &v
} }
proto := func() ClientTransport { proto := func() Transport {
// protocol set by previous Setup() or switchProtocolIfTimeout() // protocol set by previous Setup() or switchProtocolIfTimeout()
if cc.protocol != nil { if cc.protocol != nil {
return *cc.protocol return *cc.protocol
@@ -1213,7 +1213,7 @@ func (cc *ClientConn) doSetup(
} }
// try UDP // try UDP
return ClientTransportUDP return TransportUDP
}() }()
th := headers.Transport{ th := headers.Transport{
@@ -1223,7 +1223,7 @@ func (cc *ClientConn) doSetup(
trackID := len(cc.tracks) trackID := len(cc.tracks)
switch proto { switch proto {
case ClientTransportUDP: case TransportUDP:
if (rtpPort == 0 && rtcpPort != 0) || if (rtpPort == 0 && rtcpPort != 0) ||
(rtpPort != 0 && rtcpPort == 0) { (rtpPort != 0 && rtcpPort == 0) {
return nil, liberrors.ErrClientUDPPortsZero{} return nil, liberrors.ErrClientUDPPortsZero{}
@@ -1257,12 +1257,12 @@ func (cc *ClientConn) doSetup(
rtcpListener.port(), rtcpListener.port(),
} }
case ClientTransportUDPMulticast: case TransportUDPMulticast:
v1 := base.StreamDeliveryMulticast v1 := base.StreamDeliveryMulticast
th.Delivery = &v1 th.Delivery = &v1
th.Protocol = base.StreamProtocolUDP th.Protocol = base.StreamProtocolUDP
case ClientTransportTCP: case TransportTCP:
v1 := base.StreamDeliveryUnicast v1 := base.StreamDeliveryUnicast
th.Delivery = &v1 th.Delivery = &v1
th.Protocol = base.StreamProtocolTCP th.Protocol = base.StreamProtocolTCP
@@ -1271,7 +1271,7 @@ func (cc *ClientConn) doSetup(
trackURL, err := track.URL(baseURL) trackURL, err := track.URL(baseURL)
if err != nil { if err != nil {
if proto == ClientTransportUDP { if proto == TransportUDP {
rtpListener.close() rtpListener.close()
rtcpListener.close() rtcpListener.close()
} }
@@ -1286,7 +1286,7 @@ func (cc *ClientConn) doSetup(
}, },
}, false) }, false)
if err != nil { if err != nil {
if proto == ClientTransportUDP { if proto == TransportUDP {
rtpListener.close() rtpListener.close()
rtcpListener.close() rtcpListener.close()
} }
@@ -1294,7 +1294,7 @@ func (cc *ClientConn) doSetup(
} }
if res.StatusCode != base.StatusOK { if res.StatusCode != base.StatusOK {
if proto == ClientTransportUDP { if proto == TransportUDP {
rtpListener.close() rtpListener.close()
rtcpListener.close() rtcpListener.close()
} }
@@ -1304,7 +1304,7 @@ func (cc *ClientConn) doSetup(
cc.protocol == nil && cc.protocol == nil &&
cc.c.Transport == nil { cc.c.Transport == nil {
v := ClientTransportTCP v := TransportTCP
cc.protocol = &v cc.protocol = &v
return cc.doSetup(mode, baseURL, track, 0, 0) return cc.doSetup(mode, baseURL, track, 0, 0)
@@ -1316,7 +1316,7 @@ func (cc *ClientConn) doSetup(
var thRes headers.Transport var thRes headers.Transport
err = thRes.Read(res.Header["Transport"]) err = thRes.Read(res.Header["Transport"])
if err != nil { if err != nil {
if proto == ClientTransportUDP { if proto == TransportUDP {
rtpListener.close() rtpListener.close()
rtcpListener.close() rtcpListener.close()
} }
@@ -1324,7 +1324,7 @@ func (cc *ClientConn) doSetup(
} }
switch proto { switch proto {
case ClientTransportUDP: case TransportUDP:
if thRes.Delivery != nil && *thRes.Delivery != base.StreamDeliveryUnicast { if thRes.Delivery != nil && *thRes.Delivery != base.StreamDeliveryUnicast {
return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{}
} }
@@ -1337,7 +1337,7 @@ func (cc *ClientConn) doSetup(
} }
} }
case ClientTransportUDPMulticast: case TransportUDPMulticast:
if thRes.Delivery == nil || *thRes.Delivery != base.StreamDeliveryMulticast { if thRes.Delivery == nil || *thRes.Delivery != base.StreamDeliveryMulticast {
return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{}
} }
@@ -1363,7 +1363,7 @@ func (cc *ClientConn) doSetup(
return nil, err return nil, err
} }
case ClientTransportTCP: case TransportTCP:
if thRes.Delivery != nil && *thRes.Delivery != base.StreamDeliveryUnicast { if thRes.Delivery != nil && *thRes.Delivery != base.StreamDeliveryUnicast {
return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{}
} }
@@ -1401,7 +1401,7 @@ func (cc *ClientConn) doSetup(
cc.protocol = &proto cc.protocol = &proto
switch proto { switch proto {
case ClientTransportUDP: case TransportUDP:
rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP
rtpListener.remoteWriteIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteWriteIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP
rtpListener.remoteZone = cc.nconn.RemoteAddr().(*net.TCPAddr).Zone rtpListener.remoteZone = cc.nconn.RemoteAddr().(*net.TCPAddr).Zone
@@ -1422,7 +1422,7 @@ func (cc *ClientConn) doSetup(
rtcpListener.streamType = StreamTypeRTCP rtcpListener.streamType = StreamTypeRTCP
cct.udpRTCPListener = rtcpListener cct.udpRTCPListener = rtcpListener
case ClientTransportUDPMulticast: case TransportUDPMulticast:
rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP
rtpListener.remoteWriteIP = *thRes.Destination rtpListener.remoteWriteIP = *thRes.Destination
rtpListener.remoteZone = "" rtpListener.remoteZone = ""
@@ -1439,7 +1439,7 @@ func (cc *ClientConn) doSetup(
rtcpListener.streamType = StreamTypeRTCP rtcpListener.streamType = StreamTypeRTCP
cct.udpRTCPListener = rtcpListener cct.udpRTCPListener = rtcpListener
case ClientTransportTCP: case TransportTCP:
if cc.tcpFrameBuffer == nil { if cc.tcpFrameBuffer == nil {
cc.tcpFrameBuffer = multibuffer.New(uint64(cc.c.ReadBufferCount), uint64(cc.c.ReadBufferSize)) cc.tcpFrameBuffer = multibuffer.New(uint64(cc.c.ReadBufferCount), uint64(cc.c.ReadBufferSize))
} }
@@ -1700,7 +1700,7 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b
} }
switch *cc.protocol { switch *cc.protocol {
case ClientTransportUDP, ClientTransportUDPMulticast: case TransportUDP, TransportUDPMulticast:
if streamType == StreamTypeRTP { if streamType == StreamTypeRTP {
return cc.tracks[trackID].udpRTPListener.write(payload) return cc.tracks[trackID].udpRTPListener.write(payload)
} }

View File

@@ -82,21 +82,21 @@ type Server struct {
WriteTimeout time.Duration WriteTimeout time.Duration
// a TLS configuration to accept TLS (RTSPS) connections. // a TLS configuration to accept TLS (RTSPS) connections.
TLSConfig *tls.Config TLSConfig *tls.Config
// a port to send and receive RTP packets with UDP. // a port to send and receive RTP packets with the UDP transport.
// If UDPRTPAddress and UDPRTCPAddress are filled, the server can read and write UDP streams. // If UDPRTPAddress and UDPRTCPAddress are filled, the server can read and write UDP streams.
UDPRTPAddress string UDPRTPAddress string
// a port to send and receive RTCP packets with UDP. // a port to send and receive RTCP packets with the UDP transport.
// If UDPRTPAddress and UDPRTCPAddress are filled, the server can read and write UDP streams. // If UDPRTPAddress and UDPRTCPAddress are filled, the server can read and write UDP streams.
UDPRTCPAddress string UDPRTCPAddress string
// a range of multicast IPs to use. // a range of multicast IPs to use with the UDP-multicast transport.
// If MulticastIPRange, MulticastRTPPort, MulticastRTCPPort are filled, the server // If MulticastIPRange, MulticastRTPPort, MulticastRTCPPort are filled, the server
// can read and write UDP-multicast streams. // can read and write UDP-multicast streams.
MulticastIPRange string MulticastIPRange string
// a port to send RTP packets with UDP-multicast. // a port to send RTP packets with the UDP-multicast transport.
// If MulticastIPRange, MulticastRTPPort, MulticastRTCPPort are filled, the server // If MulticastIPRange, MulticastRTPPort, MulticastRTCPPort are filled, the server
// can read and write UDP-multicast streams. // can read and write UDP-multicast streams.
MulticastRTPPort int MulticastRTPPort int
// a port to send RTCP packets with UDP-multicast. // a port to send RTCP packets with the UDP-multicast transport.
// If MulticastIPRange, MulticastRTPPort, MulticastRTCPPort are filled, the server // If MulticastIPRange, MulticastRTPPort, MulticastRTCPPort are filled, the server
// can read and write UDP-multicast streams. // can read and write UDP-multicast streams.
MulticastRTCPPort int MulticastRTCPPort int

View File

@@ -98,7 +98,7 @@ type ServerHandlerOnSetupCtx struct {
Path string Path string
Query string Query string
TrackID int TrackID int
Transport ClientTransport Transport Transport
} }
// ServerHandlerOnSetup can be implemented by a ServerHandler. // ServerHandlerOnSetup can be implemented by a ServerHandler.

View File

@@ -75,7 +75,7 @@ func setupGetTrackIDPathQuery(
return 0, "", "", fmt.Errorf("invalid track path (%s)", pathAndQuery) return 0, "", "", fmt.Errorf("invalid track path (%s)", pathAndQuery)
} }
func setupGetTransport(th headers.Transport) (ClientTransport, bool) { func setupGetTransport(th headers.Transport) (Transport, bool) {
delivery := func() base.StreamDelivery { delivery := func() base.StreamDelivery {
if th.Delivery != nil { if th.Delivery != nil {
return *th.Delivery return *th.Delivery
@@ -86,15 +86,15 @@ func setupGetTransport(th headers.Transport) (ClientTransport, bool) {
switch th.Protocol { switch th.Protocol {
case base.StreamProtocolUDP: case base.StreamProtocolUDP:
if delivery == base.StreamDeliveryUnicast { if delivery == base.StreamDeliveryUnicast {
return ClientTransportUDP, true return TransportUDP, true
} }
return ClientTransportUDPMulticast, true return TransportUDPMulticast, true
default: // TCP default: // TCP
if delivery != base.StreamDeliveryUnicast { if delivery != base.StreamDeliveryUnicast {
return 0, false return 0, false
} }
return ClientTransportTCP, true return TransportTCP, true
} }
} }
@@ -152,7 +152,7 @@ type ServerSession struct {
state ServerSessionState state ServerSessionState
setuppedTracks map[int]ServerSessionSetuppedTrack setuppedTracks map[int]ServerSessionSetuppedTrack
setuppedTracksByChannel map[int]int // tcp setuppedTracksByChannel map[int]int // tcp
setuppedTransport *ClientTransport setuppedTransport *Transport
setuppedBaseURL *base.URL // publish setuppedBaseURL *base.URL // publish
setuppedStream *ServerStream // read setuppedStream *ServerStream // read
setuppedPath *string setuppedPath *string
@@ -209,7 +209,7 @@ func (ss *ServerSession) SetuppedTracks() map[int]ServerSessionSetuppedTrack {
} }
// SetuppedTransport returns the transport of the setupped tracks. // SetuppedTransport returns the transport of the setupped tracks.
func (ss *ServerSession) SetuppedTransport() *ClientTransport { func (ss *ServerSession) SetuppedTransport() *Transport {
return ss.setuppedTransport return ss.setuppedTransport
} }
@@ -299,7 +299,7 @@ func (ss *ServerSession) run() {
// if session is not in state RECORD or PLAY, or transport is TCP // if session is not in state RECORD or PLAY, or transport is TCP
if (ss.state != ServerSessionStatePublish && if (ss.state != ServerSessionStatePublish &&
ss.state != ServerSessionStateRead) || ss.state != ServerSessionStateRead) ||
*ss.setuppedTransport == ClientTransportTCP { *ss.setuppedTransport == TransportTCP {
// close if there are no active connections // close if there are no active connections
if len(ss.conns) == 0 { if len(ss.conns) == 0 {
@@ -310,8 +310,8 @@ func (ss *ServerSession) run() {
case <-checkTimeoutTicker.C: case <-checkTimeoutTicker.C:
switch { switch {
// in case of RECORD and UDP, timeout happens when no frames are being received // in case of RECORD and UDP, timeout happens when no frames are being received
case ss.state == ServerSessionStatePublish && (*ss.setuppedTransport == ClientTransportUDP || case ss.state == ServerSessionStatePublish && (*ss.setuppedTransport == TransportUDP ||
*ss.setuppedTransport == ClientTransportUDPMulticast): *ss.setuppedTransport == TransportUDPMulticast):
now := time.Now() now := time.Now()
lft := atomic.LoadInt64(ss.udpLastFrameTime) lft := atomic.LoadInt64(ss.udpLastFrameTime)
if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout { if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout {
@@ -319,8 +319,8 @@ func (ss *ServerSession) run() {
} }
// in case of PLAY and UDP, timeout happens when no request arrives // in case of PLAY and UDP, timeout happens when no request arrives
case ss.state == ServerSessionStateRead && (*ss.setuppedTransport == ClientTransportUDP || case ss.state == ServerSessionStateRead && (*ss.setuppedTransport == TransportUDP ||
*ss.setuppedTransport == ClientTransportUDPMulticast): *ss.setuppedTransport == TransportUDPMulticast):
now := time.Now() now := time.Now()
if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor { if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor {
return liberrors.ErrServerSessionTimedOut{} return liberrors.ErrServerSessionTimedOut{}
@@ -352,12 +352,12 @@ func (ss *ServerSession) run() {
case ServerSessionStateRead: case ServerSessionStateRead:
ss.setuppedStream.readerSetInactive(ss) ss.setuppedStream.readerSetInactive(ss)
if *ss.setuppedTransport == ClientTransportUDP { if *ss.setuppedTransport == TransportUDP {
ss.s.udpRTCPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss)
} }
case ServerSessionStatePublish: case ServerSessionStatePublish:
if *ss.setuppedTransport == ClientTransportUDP { if *ss.setuppedTransport == TransportUDP {
ss.s.udpRTPListener.removeClient(ss) ss.s.udpRTPListener.removeClient(ss)
ss.s.udpRTCPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss)
} }
@@ -576,7 +576,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
switch transport { switch transport {
case ClientTransportUDP: case TransportUDP:
if inTH.ClientPorts == nil { if inTH.ClientPorts == nil {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
@@ -589,7 +589,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}, nil }, nil
} }
case ClientTransportUDPMulticast: case TransportUDPMulticast:
if ss.s.MulticastIPRange == "" { if ss.s.MulticastIPRange == "" {
return &base.Response{ return &base.Response{
StatusCode: base.StatusUnsupportedTransport, StatusCode: base.StatusUnsupportedTransport,
@@ -632,7 +632,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
} }
default: // record default: // record
if transport == ClientTransportUDPMulticast { if transport == TransportUDPMulticast {
return &base.Response{ return &base.Response{
StatusCode: base.StatusUnsupportedTransport, StatusCode: base.StatusUnsupportedTransport,
}, nil }, nil
@@ -692,7 +692,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
sst := ServerSessionSetuppedTrack{} sst := ServerSessionSetuppedTrack{}
switch transport { switch transport {
case ClientTransportUDP: case TransportUDP:
sst.udpRTPPort = inTH.ClientPorts[0] sst.udpRTPPort = inTH.ClientPorts[0]
sst.udpRTCPPort = inTH.ClientPorts[1] sst.udpRTCPPort = inTH.ClientPorts[1]
@@ -702,7 +702,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
th.ClientPorts = inTH.ClientPorts th.ClientPorts = inTH.ClientPorts
th.ServerPorts = &[2]int{sc.s.udpRTPListener.port(), sc.s.udpRTCPListener.port()} th.ServerPorts = &[2]int{sc.s.udpRTPListener.port(), sc.s.udpRTCPListener.port()}
case ClientTransportUDPMulticast: case TransportUDPMulticast:
th.Protocol = base.StreamProtocolUDP th.Protocol = base.StreamProtocolUDP
de := base.StreamDeliveryMulticast de := base.StreamDeliveryMulticast
th.Delivery = &de th.Delivery = &de
@@ -802,7 +802,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
if res.StatusCode == base.StatusOK { if res.StatusCode == base.StatusOK {
ss.state = ServerSessionStateRead ss.state = ServerSessionStateRead
if *ss.setuppedTransport == ClientTransportTCP { if *ss.setuppedTransport == TransportTCP {
ss.tcpConn = sc ss.tcpConn = sc
} }
@@ -846,7 +846,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.setuppedStream.readerSetActive(ss) ss.setuppedStream.readerSetActive(ss)
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case ClientTransportUDP: case TransportUDP:
for trackID, track := range ss.setuppedTracks { for trackID, track := range ss.setuppedTracks {
// readers can send RTCP packets // readers can send RTCP packets
sc.s.udpRTCPListener.addClient(ss.ip(), track.udpRTCPPort, ss, trackID, false) sc.s.udpRTCPListener.addClient(ss.ip(), track.udpRTCPPort, ss, trackID, false)
@@ -858,7 +858,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
return res, err return res, err
case ClientTransportUDPMulticast: case TransportUDPMulticast:
default: // TCP default: // TCP
err = liberrors.ErrServerTCPFramesEnable{} err = liberrors.ErrServerTCPFramesEnable{}
@@ -899,7 +899,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
path, query := base.PathSplitQuery(pathAndQuery) path, query := base.PathSplitQuery(pathAndQuery)
// allow to use WriteFrame() before response // allow to use WriteFrame() before response
if *ss.setuppedTransport == ClientTransportTCP { if *ss.setuppedTransport == TransportTCP {
ss.tcpConn = sc ss.tcpConn = sc
} }
@@ -921,7 +921,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.state = ServerSessionStatePublish ss.state = ServerSessionStatePublish
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case ClientTransportUDP: case TransportUDP:
for trackID, track := range ss.setuppedTracks { for trackID, track := range ss.setuppedTracks {
ss.s.udpRTPListener.addClient(ss.ip(), track.udpRTPPort, ss, trackID, true) ss.s.udpRTPListener.addClient(ss.ip(), track.udpRTPPort, ss, trackID, true)
ss.s.udpRTCPListener.addClient(ss.ip(), track.udpRTCPPort, ss, trackID, true) ss.s.udpRTCPListener.addClient(ss.ip(), track.udpRTCPPort, ss, trackID, true)
@@ -933,7 +933,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
} }
case ClientTransportUDPMulticast: case TransportUDPMulticast:
default: // TCP default: // TCP
err = liberrors.ErrServerTCPFramesEnable{} err = liberrors.ErrServerTCPFramesEnable{}
@@ -988,10 +988,10 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.tcpConn = nil ss.tcpConn = nil
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case ClientTransportUDP: case TransportUDP:
ss.s.udpRTCPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss)
case ClientTransportUDPMulticast: case TransportUDPMulticast:
default: // TCP default: // TCP
err = liberrors.ErrServerTCPFramesDisable{} err = liberrors.ErrServerTCPFramesDisable{}
@@ -1002,11 +1002,11 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.tcpConn = nil ss.tcpConn = nil
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case ClientTransportUDP: case TransportUDP:
ss.s.udpRTPListener.removeClient(ss) ss.s.udpRTPListener.removeClient(ss)
ss.s.udpRTCPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss)
case ClientTransportUDPMulticast: case TransportUDPMulticast:
default: // TCP default: // TCP
err = liberrors.ErrServerTCPFramesDisable{} err = liberrors.ErrServerTCPFramesDisable{}
@@ -1064,7 +1064,7 @@ func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload
} }
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case ClientTransportUDP: case TransportUDP:
track := ss.setuppedTracks[trackID] track := ss.setuppedTracks[trackID]
if streamType == StreamTypeRTP { if streamType == StreamTypeRTP {
@@ -1081,7 +1081,7 @@ func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload
}) })
} }
case ClientTransportTCP: case TransportTCP:
channel := ss.setuppedTracks[trackID].tcpChannel channel := ss.setuppedTracks[trackID].tcpChannel
if streamType == base.StreamTypeRTCP { if streamType == base.StreamTypeRTCP {
channel++ channel++

View File

@@ -113,7 +113,7 @@ func (st *ServerStream) lastSequenceNumber(trackID int) uint16 {
func (st *ServerStream) readerAdd( func (st *ServerStream) readerAdd(
ss *ServerSession, ss *ServerSession,
transport ClientTransport, transport Transport,
clientPorts *[2]int, clientPorts *[2]int,
) error { ) error {
st.mutex.Lock() st.mutex.Lock()
@@ -128,10 +128,10 @@ func (st *ServerStream) readerAdd(
} }
switch transport { switch transport {
case ClientTransportUDP: case TransportUDP:
// check whether client ports are already in use by another reader. // check whether client ports are already in use by another reader.
for r := range st.readersUnicast { for r := range st.readersUnicast {
if *r.setuppedTransport == ClientTransportUDP && if *r.setuppedTransport == TransportUDP &&
r.ip().Equal(ss.ip()) && r.ip().Equal(ss.ip()) &&
r.zone() == ss.zone() { r.zone() == ss.zone() {
for _, rt := range r.setuppedTracks { for _, rt := range r.setuppedTracks {
@@ -142,7 +142,7 @@ func (st *ServerStream) readerAdd(
} }
} }
case ClientTransportUDPMulticast: case TransportUDPMulticast:
// allocate multicast listeners // allocate multicast listeners
if st.multicastListeners == nil { if st.multicastListeners == nil {
st.multicastListeners = make([]*listenerPair, len(st.tracks)) st.multicastListeners = make([]*listenerPair, len(st.tracks))
@@ -193,7 +193,7 @@ func (st *ServerStream) readerSetActive(ss *ServerSession) {
defer st.mutex.Unlock() defer st.mutex.Unlock()
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case ClientTransportUDP, ClientTransportTCP: case TransportUDP, TransportTCP:
st.readersUnicast[ss] = struct{}{} st.readersUnicast[ss] = struct{}{}
default: // UDPMulticast default: // UDPMulticast
@@ -209,7 +209,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
defer st.mutex.Unlock() defer st.mutex.Unlock()
switch *ss.setuppedTransport { switch *ss.setuppedTransport {
case ClientTransportUDP, ClientTransportTCP: case TransportUDP, TransportTCP:
delete(st.readersUnicast, ss) delete(st.readersUnicast, ss)
default: // UDPMulticast default: // UDPMulticast

25
transport.go Normal file
View File

@@ -0,0 +1,25 @@
package gortsplib
// Transport is a RTSP stream transport.
type Transport int
// standard transports.
const (
TransportUDP Transport = iota
TransportUDPMulticast
TransportTCP
)
var transportLabels = map[Transport]string{
TransportUDP: "UDP",
TransportUDPMulticast: "UDP-multicast",
TransportTCP: "TCP",
}
// String implements fmt.Stringer.
func (t Transport) String() string {
if l, ok := transportLabels[t]; ok {
return l
}
return "unknown"
}

15
transport_test.go Normal file
View File

@@ -0,0 +1,15 @@
package gortsplib
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestTransportString(t *testing.T) {
tr := TransportUDPMulticast
require.NotEqual(t, "unknown", tr.String())
tr = Transport(15)
require.Equal(t, "unknown", tr.String())
}