From aeb1958bc1eb6dca44b1a70e4e2f1cb863564a87 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 5 Jun 2021 23:15:18 +0200 Subject: [PATCH] remove StreamProtocol from root --- client.go | 2 +- client_publish_test.go | 50 +++++++++++++++---------------- client_read_test.go | 68 +++++++++++++++++++++--------------------- clientconn.go | 36 +++++++++++----------- defs.go | 11 ------- server_publish_test.go | 26 ++++++++-------- server_read_test.go | 26 ++++++++-------- server_test.go | 14 ++++----- serversession.go | 36 +++++++++++----------- 9 files changed, 129 insertions(+), 140 deletions(-) diff --git a/client.go b/client.go index bde71405..571124bc 100644 --- a/client.go +++ b/client.go @@ -67,7 +67,7 @@ type Client struct { // the stream protocol (UDP or TCP). // If nil, it is chosen automatically (first UDP, then, if it fails, TCP). // It defaults to nil. - StreamProtocol *StreamProtocol + StreamProtocol *base.StreamProtocol // If the client is reading with UDP, it must receive // at least a packet within this timeout. // It defaults to 3 seconds. diff --git a/client_publish_test.go b/client_publish_test.go index aa453262..b02849b2 100644 --- a/client_publish_test.go +++ b/client_publish_test.go @@ -80,12 +80,12 @@ func TestClientPublishSerial(t *testing.T) { } if proto == "udp" { - th.Protocol = StreamProtocolUDP + th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts } else { - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs } @@ -162,12 +162,12 @@ func TestClientPublishSerial(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { + StreamProtocol: func() *base.StreamProtocol { if proto == "udp" { - v := StreamProtocolUDP + v := base.StreamProtocolUDP return &v } - v := StreamProtocolTCP + v := base.StreamProtocolTCP return &v }(), } @@ -267,12 +267,12 @@ func TestClientPublishParallel(t *testing.T) { } if proto == "udp" { - th.Protocol = StreamProtocolUDP + th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts } else { - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs } @@ -304,12 +304,12 @@ func TestClientPublishParallel(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { + StreamProtocol: func() *base.StreamProtocol { if proto == "udp" { - v := StreamProtocolUDP + v := base.StreamProtocolUDP return &v } - v := StreamProtocolTCP + v := base.StreamProtocolTCP return &v }(), } @@ -407,12 +407,12 @@ func TestClientPublishPauseSerial(t *testing.T) { } if proto == "udp" { - th.Protocol = StreamProtocolUDP + th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts } else { - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs } @@ -462,12 +462,12 @@ func TestClientPublishPauseSerial(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { + StreamProtocol: func() *base.StreamProtocol { if proto == "udp" { - v := StreamProtocolUDP + v := base.StreamProtocolUDP return &v } - v := StreamProtocolTCP + v := base.StreamProtocolTCP return &v }(), } @@ -563,12 +563,12 @@ func TestClientPublishPauseParallel(t *testing.T) { } if proto == "udp" { - th.Protocol = StreamProtocolUDP + th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts } else { - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs } @@ -600,12 +600,12 @@ func TestClientPublishPauseParallel(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { + StreamProtocol: func() *base.StreamProtocol { if proto == "udp" { - v := StreamProtocolUDP + v := base.StreamProtocolUDP return &v } - v := StreamProtocolTCP + v := base.StreamProtocolTCP return &v }(), } @@ -702,14 +702,14 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { var inTH headers.Transport err = inTH.Read(req.Header["Transport"]) require.NoError(t, err) - require.Equal(t, StreamProtocolTCP, inTH.Protocol) + require.Equal(t, base.StreamProtocolTCP, inTH.Protocol) th := headers.Transport{ Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v }(), - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, InterleavedIDs: &[2]int{0, 1}, } @@ -814,7 +814,7 @@ func TestClientPublishRTCPReport(t *testing.T) { v := base.StreamDeliveryUnicast return &v }(), - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, InterleavedIDs: inTH.InterleavedIDs, } @@ -883,8 +883,8 @@ func TestClientPublishRTCPReport(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { - v := StreamProtocolTCP + StreamProtocol: func() *base.StreamProtocol { + v := base.StreamProtocolTCP return &v }(), senderReportPeriod: 1 * time.Second, diff --git a/client_read_test.go b/client_read_test.go index 3a917d90..38e6f87e 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -91,7 +91,7 @@ func TestClientReadTracks(t *testing.T) { v := base.StreamDeliveryUnicast return &v }(), - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, ClientPorts: inTH.ClientPorts, ServerPorts: &[2]int{34556 + i*2, 34557 + i*2}, } @@ -258,11 +258,11 @@ func TestClientRead(t *testing.T) { } if proto == "udp" { - th.Protocol = StreamProtocolUDP + th.Protocol = base.StreamProtocolUDP th.ClientPorts = inTH.ClientPorts th.ServerPorts = &[2]int{34556, 34557} } else { - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = &[2]int{0, 1} } @@ -347,12 +347,12 @@ func TestClientRead(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { + StreamProtocol: func() *base.StreamProtocol { if proto == "udp" { - v := StreamProtocolUDP + v := base.StreamProtocolUDP return &v } - v := StreamProtocolTCP + v := base.StreamProtocolTCP return &v }(), } @@ -445,7 +445,7 @@ func TestClientReadNoContentBase(t *testing.T) { v := base.StreamDeliveryUnicast return &v }(), - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, ClientPorts: inTH.ClientPorts, ServerPorts: &[2]int{34556, 34557}, } @@ -550,7 +550,7 @@ func TestClientReadAnyPort(t *testing.T) { StatusCode: base.StatusOK, Header: base.Header{ "Transport": headers.Transport{ - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -680,13 +680,13 @@ func TestClientReadAutomaticProtocol(t *testing.T) { var inTH headers.Transport err = inTH.Read(req.Header["Transport"]) require.NoError(t, err) - require.Equal(t, StreamProtocolTCP, inTH.Protocol) + require.Equal(t, base.StreamProtocolTCP, inTH.Protocol) err = base.Response{ StatusCode: base.StatusOK, Header: base.Header{ "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -809,7 +809,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { v := base.StreamDeliveryUnicast return &v }(), - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, ServerPorts: &[2]int{34556, 34557}, ClientPorts: inTH.ClientPorts, } @@ -877,7 +877,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { v := base.StreamDeliveryUnicast return &v }(), - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, InterleavedIDs: inTH.InterleavedIDs, } @@ -1032,7 +1032,7 @@ func TestClientReadRedirect(t *testing.T) { StatusCode: base.StatusOK, Header: base.Header{ "Transport": headers.Transport{ - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -1091,7 +1091,7 @@ func TestClientReadPause(t *testing.T) { defer close(writerDone) var l1 net.PacketConn - if inTH.Protocol == StreamProtocolUDP { + if inTH.Protocol == base.StreamProtocolUDP { var err error l1, err = net.ListenPacket("udp", "localhost:34556") require.NoError(t, err) @@ -1104,7 +1104,7 @@ func TestClientReadPause(t *testing.T) { for { select { case <-t.C: - if inTH.Protocol == StreamProtocolUDP { + if inTH.Protocol == base.StreamProtocolUDP { l1.WriteTo([]byte("\x00\x00\x00\x00"), &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: inTH.ClientPorts[0], @@ -1194,12 +1194,12 @@ func TestClientReadPause(t *testing.T) { } if proto == "udp" { - th.Protocol = StreamProtocolUDP + th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts } else { - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs } @@ -1259,12 +1259,12 @@ func TestClientReadPause(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { + StreamProtocol: func() *base.StreamProtocol { if proto == "udp" { - v := StreamProtocolUDP + v := base.StreamProtocolUDP return &v } - v := StreamProtocolTCP + v := base.StreamProtocolTCP return &v }(), } @@ -1374,7 +1374,7 @@ func TestClientReadRTCPReport(t *testing.T) { StatusCode: base.StatusOK, Header: base.Header{ "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -1454,8 +1454,8 @@ func TestClientReadRTCPReport(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { - v := StreamProtocolTCP + StreamProtocol: func() *base.StreamProtocol { + v := base.StreamProtocolTCP return &v }(), receiverReportPeriod: 1 * time.Second, @@ -1560,12 +1560,12 @@ func TestClientReadErrorTimeout(t *testing.T) { require.NoError(t, err) defer l1.Close() - th.Protocol = StreamProtocolUDP + th.Protocol = base.StreamProtocolUDP th.ServerPorts = &[2]int{34556, 34557} th.ClientPorts = inTH.ClientPorts } else { - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs } @@ -1611,14 +1611,14 @@ func TestClientReadErrorTimeout(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { + StreamProtocol: func() *base.StreamProtocol { switch proto { case "udp": - v := StreamProtocolUDP + v := base.StreamProtocolUDP return &v case "tcp": - v := StreamProtocolTCP + v := base.StreamProtocolTCP return &v } return nil @@ -1707,7 +1707,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { return &v }(), } - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs err = base.Response{ @@ -1752,8 +1752,8 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { - v := StreamProtocolTCP + StreamProtocol: func() *base.StreamProtocol { + v := base.StreamProtocolTCP return &v }(), } @@ -1836,7 +1836,7 @@ func TestClientReadSeek(t *testing.T) { v := base.StreamDeliveryUnicast return &v }(), - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, InterleavedIDs: inTH.InterleavedIDs, } @@ -1903,8 +1903,8 @@ func TestClientReadSeek(t *testing.T) { }() c := &Client{ - StreamProtocol: func() *StreamProtocol { - v := StreamProtocolTCP + StreamProtocol: func() *base.StreamProtocol { + v := base.StreamProtocolTCP return &v }(), } diff --git a/clientconn.go b/clientconn.go index 0aab449c..bb57039b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -134,7 +134,7 @@ type ClientConn struct { cseq int useGetParameter bool streamBaseURL *base.URL - streamProtocol *StreamProtocol + streamProtocol *base.StreamProtocol tracks map[int]clientConnTrack lastRange *headers.Range backgroundRunning bool @@ -364,7 +364,7 @@ func (cc *ClientConn) checkState(allowed map[clientConnState]struct{}) error { } func (cc *ClientConn) switchProtocolIfTimeout(err error) error { - if *cc.streamProtocol != StreamProtocolUDP || + if *cc.streamProtocol != base.StreamProtocolUDP || cc.state != clientConnStatePlay || !isErrNOUDPPacketsReceivedRecently(err) || cc.c.StreamProtocol != nil { @@ -377,7 +377,7 @@ func (cc *ClientConn) switchProtocolIfTimeout(err error) error { cc.reset(true) - v := StreamProtocolTCP + v := base.StreamProtocolTCP cc.streamProtocol = &v cc.useGetParameter = oldUseGetParameter cc.scheme = prevBaseURL.Scheme @@ -443,13 +443,13 @@ func (cc *ClientConn) backgroundClose(isSwitchingProtocol bool) { func (cc *ClientConn) runBackground() { cc.backgroundInnerDone <- func() error { if cc.state == clientConnStatePlay { - if *cc.streamProtocol == StreamProtocolUDP { + if *cc.streamProtocol == base.StreamProtocolUDP { return cc.runBackgroundPlayUDP() } return cc.runBackgroundPlayTCP() } - if *cc.streamProtocol == StreamProtocolUDP { + if *cc.streamProtocol == base.StreamProtocolUDP { return cc.runBackgroundRecordUDP() } return cc.runBackgroundRecordTCP() @@ -766,7 +766,7 @@ func (cc *ClientConn) connOpen() error { return fmt.Errorf("unsupported scheme '%s'", cc.scheme) } - v := StreamProtocolUDP + v := base.StreamProtocolUDP if cc.scheme == "rtsps" && cc.c.StreamProtocol == &v { return fmt.Errorf("RTSPS can't be used with UDP") } @@ -1168,11 +1168,11 @@ func (cc *ClientConn) doSetup( // always use TCP if encrypted if cc.scheme == "rtsps" { - v := StreamProtocolTCP + v := base.StreamProtocolTCP cc.streamProtocol = &v } - proto := func() StreamProtocol { + proto := func() base.StreamProtocol { // protocol set by previous Setup() or switchProtocolIfTimeout() if cc.streamProtocol != nil { return *cc.streamProtocol @@ -1184,7 +1184,7 @@ func (cc *ClientConn) doSetup( } // try UDP - return StreamProtocolUDP + return base.StreamProtocolUDP }() th := headers.Transport{ @@ -1255,7 +1255,7 @@ func (cc *ClientConn) doSetup( trackURL, err := track.URL() if err != nil { - if proto == StreamProtocolUDP { + if proto == base.StreamProtocolUDP { rtpListener.close() rtcpListener.close() } @@ -1270,7 +1270,7 @@ func (cc *ClientConn) doSetup( }, }, false) if err != nil { - if proto == StreamProtocolUDP { + if proto == base.StreamProtocolUDP { rtpListener.close() rtcpListener.close() } @@ -1278,7 +1278,7 @@ func (cc *ClientConn) doSetup( } if res.StatusCode != base.StatusOK { - if proto == StreamProtocolUDP { + if proto == base.StreamProtocolUDP { rtpListener.close() rtcpListener.close() } @@ -1288,7 +1288,7 @@ func (cc *ClientConn) doSetup( cc.streamProtocol == nil && cc.c.StreamProtocol == nil { - v := StreamProtocolTCP + v := base.StreamProtocolTCP cc.streamProtocol = &v return cc.doSetup(mode, track, 0, 0) @@ -1300,14 +1300,14 @@ func (cc *ClientConn) doSetup( var thRes headers.Transport err = thRes.Read(res.Header["Transport"]) if err != nil { - if proto == StreamProtocolUDP { + if proto == base.StreamProtocolUDP { rtpListener.close() rtcpListener.close() } return nil, liberrors.ErrClientTransportHeaderInvalid{Err: err} } - if proto == StreamProtocolUDP { + if proto == base.StreamProtocolUDP { if !cc.c.AnyPortEnable { if thRes.ServerPorts == nil || isAnyPort(thRes.ServerPorts[0]) || isAnyPort(thRes.ServerPorts[1]) { rtpListener.close() @@ -1342,7 +1342,7 @@ func (cc *ClientConn) doSetup( cc.streamBaseURL = track.BaseURL cc.streamProtocol = &proto - if proto == StreamProtocolUDP { + if proto == base.StreamProtocolUDP { rtpListener.remoteIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteZone = cc.nconn.RemoteAddr().(*net.TCPAddr).Zone if thRes.ServerPorts != nil { @@ -1370,7 +1370,7 @@ func (cc *ClientConn) doSetup( cc.state = clientConnStatePreRecord } - if *cc.streamProtocol == StreamProtocolTCP && + if *cc.streamProtocol == base.StreamProtocolTCP && cc.tcpFrameBuffer == nil { cc.tcpFrameBuffer = multibuffer.New(uint64(cc.c.ReadBufferCount), uint64(cc.c.ReadBufferSize)) } @@ -1607,7 +1607,7 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b cc.tracks[trackID].rtcpSender.ProcessFrame(now, streamType, payload) } - if *cc.streamProtocol == StreamProtocolUDP { + if *cc.streamProtocol == base.StreamProtocolUDP { if streamType == StreamTypeRTP { return cc.tracks[trackID].udpRTPListener.write(payload) } diff --git a/defs.go b/defs.go index bd767b64..727ecd09 100644 --- a/defs.go +++ b/defs.go @@ -4,17 +4,6 @@ import ( "github.com/aler9/gortsplib/pkg/base" ) -// StreamProtocol is the protocol of a stream. -type StreamProtocol = base.StreamProtocol - -const ( - // StreamProtocolUDP means that the stream uses the UDP protocol - StreamProtocolUDP StreamProtocol = base.StreamProtocolUDP - - // StreamProtocolTCP means that the stream uses the TCP protocol - StreamProtocolTCP StreamProtocol = base.StreamProtocolTCP -) - // StreamType is the stream type. type StreamType = base.StreamType diff --git a/server_publish_test.go b/server_publish_test.go index 082f6b8e..98ff2493 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -358,7 +358,7 @@ func TestServerPublishSetupPath(t *testing.T) { require.Equal(t, base.StatusOK, res.StatusCode) th := &headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -439,7 +439,7 @@ func TestServerPublishErrorSetupDifferentPaths(t *testing.T) { require.Equal(t, base.StatusOK, res.StatusCode) th := &headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -521,7 +521,7 @@ func TestServerPublishErrorSetupTrackTwice(t *testing.T) { require.Equal(t, base.StatusOK, res.StatusCode) th := &headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -623,7 +623,7 @@ func TestServerPublishErrorRecordPartialTracks(t *testing.T) { require.Equal(t, base.StatusOK, res.StatusCode) th := &headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -786,10 +786,10 @@ func TestServerPublish(t *testing.T) { } if proto == "udp" { - inTH.Protocol = StreamProtocolUDP + inTH.Protocol = base.StreamProtocolUDP inTH.ClientPorts = &[2]int{35466, 35467} } else { - inTH.Protocol = StreamProtocolTCP + inTH.Protocol = base.StreamProtocolTCP inTH.InterleavedIDs = &[2]int{0, 1} } @@ -971,7 +971,7 @@ func TestServerPublishErrorWrongProtocol(t *testing.T) { v := headers.TransportModeRecord return &v }(), - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, ClientPorts: &[2]int{35466, 35467}, } @@ -1073,7 +1073,7 @@ func TestServerPublishRTCPReport(t *testing.T) { v := headers.TransportModeRecord return &v }(), - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, InterleavedIDs: &[2]int{0, 1}, } @@ -1237,10 +1237,10 @@ func TestServerPublishTimeout(t *testing.T) { } if proto == "udp" { - inTH.Protocol = StreamProtocolUDP + inTH.Protocol = base.StreamProtocolUDP inTH.ClientPorts = &[2]int{35466, 35467} } else { - inTH.Protocol = StreamProtocolTCP + inTH.Protocol = base.StreamProtocolTCP inTH.InterleavedIDs = &[2]int{0, 1} } @@ -1364,10 +1364,10 @@ func TestServerPublishWithoutTeardown(t *testing.T) { } if proto == "udp" { - inTH.Protocol = StreamProtocolUDP + inTH.Protocol = base.StreamProtocolUDP inTH.ClientPorts = &[2]int{35466, 35467} } else { - inTH.Protocol = StreamProtocolTCP + inTH.Protocol = base.StreamProtocolTCP inTH.InterleavedIDs = &[2]int{0, 1} } @@ -1480,7 +1480,7 @@ func TestServerPublishUDPChangeConn(t *testing.T) { v := headers.TransportModeRecord return &v }(), - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, ClientPorts: &[2]int{35466, 35467}, } diff --git a/server_read_test.go b/server_read_test.go index 7498dd4e..8b0319bb 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -81,7 +81,7 @@ func TestServerReadSetupPath(t *testing.T) { bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) th := &headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -134,7 +134,7 @@ func TestServerReadErrorSetupDifferentPaths(t *testing.T) { bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) th := &headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -201,7 +201,7 @@ func TestServerReadErrorSetupTrackTwice(t *testing.T) { bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) th := &headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -336,10 +336,10 @@ func TestServerRead(t *testing.T) { } if proto == "udp" { - inTH.Protocol = StreamProtocolUDP + inTH.Protocol = base.StreamProtocolUDP inTH.ClientPorts = &[2]int{35466, 35467} } else { - inTH.Protocol = StreamProtocolTCP + inTH.Protocol = base.StreamProtocolTCP inTH.InterleavedIDs = &[2]int{0, 1} } @@ -533,7 +533,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -599,7 +599,7 @@ func TestServerReadPlayPlay(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -701,7 +701,7 @@ func TestServerReadPlayPausePlay(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -810,7 +810,7 @@ func TestServerReadPlayPausePause(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -922,7 +922,7 @@ func TestServerReadTimeout(t *testing.T) { }(), } - inTH.Protocol = StreamProtocolUDP + inTH.Protocol = base.StreamProtocolUDP inTH.ClientPorts = &[2]int{35466, 35467} res, err := writeReqReadRes(bconn, base.Request{ @@ -1015,10 +1015,10 @@ func TestServerReadWithoutTeardown(t *testing.T) { } if proto == "udp" { - inTH.Protocol = StreamProtocolUDP + inTH.Protocol = base.StreamProtocolUDP inTH.ClientPorts = &[2]int{35466, 35467} } else { - inTH.Protocol = StreamProtocolTCP + inTH.Protocol = base.StreamProtocolTCP inTH.InterleavedIDs = &[2]int{0, 1} } @@ -1098,7 +1098,7 @@ func TestServerReadUDPChangeConn(t *testing.T) { v := headers.TransportModePlay return &v }(), - Protocol: StreamProtocolUDP, + Protocol: base.StreamProtocolUDP, ClientPorts: &[2]int{35466, 35467}, } diff --git a/server_test.go b/server_test.go index 17adca80..8eff0afe 100644 --- a/server_test.go +++ b/server_test.go @@ -655,7 +655,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -693,7 +693,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -747,7 +747,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -780,7 +780,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"3"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -945,7 +945,7 @@ func TestServerSessionClose(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -993,7 +993,7 @@ func TestServerSessionAutoClose(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -1096,7 +1096,7 @@ func TestServerErrorInvalidPath(t *testing.T) { "CSeq": base.HeaderValue{"2"}, "Session": base.HeaderValue{sxID}, "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v diff --git a/serversession.go b/serversession.go index b1a9c39c..2c67b67b 100644 --- a/serversession.go +++ b/serversession.go @@ -123,7 +123,7 @@ type ServerSession struct { conns map[*ServerConn]struct{} state ServerSessionState setuppedTracks map[int]ServerSessionSetuppedTrack - setupProtocol *StreamProtocol + setupProtocol *base.StreamProtocol setupPath *string setupQuery *string lastRequestTime time.Time @@ -180,7 +180,7 @@ func (ss *ServerSession) State() ServerSessionState { } // StreamProtocol returns the stream protocol of the setupped tracks. -func (ss *ServerSession) StreamProtocol() *StreamProtocol { +func (ss *ServerSession) StreamProtocol() *base.StreamProtocol { return ss.setupProtocol } @@ -268,7 +268,7 @@ func (ss *ServerSession) run() { // if session is not in state RECORD or PLAY, or protocol is TCP if (ss.state != ServerSessionStateRecord && ss.state != ServerSessionStatePlay) || - *ss.setupProtocol == StreamProtocolTCP { + *ss.setupProtocol == base.StreamProtocolTCP { // close if there are no active connections if len(ss.conns) == 0 { @@ -279,7 +279,7 @@ func (ss *ServerSession) run() { case <-checkTimeoutTicker.C: switch { // in case of RECORD and UDP, timeout happens when no frames are being received - case ss.state == ServerSessionStateRecord && *ss.setupProtocol == StreamProtocolUDP: + case ss.state == ServerSessionStateRecord && *ss.setupProtocol == base.StreamProtocolUDP: now := time.Now() lft := atomic.LoadInt64(ss.udpLastFrameTime) if now.Sub(time.Unix(lft, 0)) >= ss.s.ReadTimeout { @@ -287,7 +287,7 @@ func (ss *ServerSession) run() { } // in case of PLAY and UDP, timeout happens when no request arrives - case ss.state == ServerSessionStatePlay && *ss.setupProtocol == StreamProtocolUDP: + case ss.state == ServerSessionStatePlay && *ss.setupProtocol == base.StreamProtocolUDP: now := time.Now() if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor { return liberrors.ErrServerSessionTimedOut{} @@ -317,12 +317,12 @@ func (ss *ServerSession) run() { switch ss.state { case ServerSessionStatePlay: - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { ss.s.udpRTCPListener.removeClient(ss) } case ServerSessionStateRecord: - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { ss.s.udpRTPListener.removeClient(ss) ss.s.udpRTCPListener.removeClient(ss) } @@ -549,7 +549,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base } } - if inTH.Protocol == StreamProtocolUDP { + if inTH.Protocol == base.StreamProtocolUDP { if ss.s.udpRTPListener == nil { return &base.Response{ StatusCode: base.StatusUnsupportedTransport, @@ -613,20 +613,20 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base }(), } - if inTH.Protocol == StreamProtocolUDP { + if inTH.Protocol == base.StreamProtocolUDP { ss.setuppedTracks[trackID] = ServerSessionSetuppedTrack{ udpRTPPort: inTH.ClientPorts[0], udpRTCPPort: inTH.ClientPorts[1], } - th.Protocol = StreamProtocolUDP + th.Protocol = base.StreamProtocolUDP th.ClientPorts = inTH.ClientPorts th.ServerPorts = &[2]int{sc.s.udpRTPListener.port(), sc.s.udpRTCPListener.port()} } else { ss.setuppedTracks[trackID] = ServerSessionSetuppedTrack{} - th.Protocol = StreamProtocolTCP + th.Protocol = base.StreamProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs } @@ -686,7 +686,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base // allow to use WriteFrame() before response if ss.state != ServerSessionStatePlay { - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { ss.udpIP = sc.ip() ss.udpZone = sc.zone() } else { @@ -706,7 +706,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base if res.StatusCode == base.StatusOK { ss.state = ServerSessionStatePlay - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { // readers can send RTCP frames, they cannot sent RTP frames for trackID, track := range ss.setuppedTracks { sc.s.udpRTCPListener.addClient(ss.udpIP, track.udpRTCPPort, ss, trackID, false) @@ -754,7 +754,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base path, query := base.PathSplitQuery(pathAndQuery) // allow to use WriteFrame() before response - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { ss.udpIP = sc.ip() ss.udpZone = sc.zone() } else { @@ -772,7 +772,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base if res.StatusCode == base.StatusOK { ss.state = ServerSessionStateRecord - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { for trackID, track := range ss.setuppedTracks { ss.s.udpRTPListener.addClient(ss.udpIP, track.udpRTPPort, ss, trackID, true) ss.s.udpRTCPListener.addClient(ss.udpIP, track.udpRTCPPort, ss, trackID, true) @@ -837,7 +837,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.udpZone = "" ss.tcpConn = nil - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { ss.s.udpRTCPListener.removeClient(ss) } else { return res, liberrors.ErrServerTCPFramesDisable{} @@ -849,7 +849,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.udpZone = "" ss.tcpConn = nil - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { ss.s.udpRTPListener.removeClient(ss) } else { return res, liberrors.ErrServerTCPFramesDisable{} @@ -902,7 +902,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base // WriteFrame writes a frame. func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload []byte) { - if *ss.setupProtocol == StreamProtocolUDP { + if *ss.setupProtocol == base.StreamProtocolUDP { track := ss.setuppedTracks[trackID] if streamType == StreamTypeRTP {