client: switch to TCP if no UDP packets have been received within a timeout (https://github.com/aler9/rtsp-simple-server/issues/173)

This commit is contained in:
aler9
2021-03-26 23:02:36 +01:00
parent 3a38a93b66
commit 2df9029f01
8 changed files with 473 additions and 154 deletions

View File

@@ -13,19 +13,26 @@ Go ≥ 1.14 is required.
Features:
* Client
* Read streams from servers with UDP or TCP
* Publish streams to servers with UDP or TCP
* Encrypt streams with TLS (RTSPS)
* Query servers about published streams
* Read only selected tracks of a stream
* Pause reading or publishing without disconnecting from the server
* General
* Query servers about published streams
* Encrypt connection with TLS (RTSPS)
* Reading
* Read streams from servers with UDP or TCP
* Select protocol automatically
* Read only selected tracks of a stream
* Pause reading without disconnecting from the server
* Generate RTCP receiver reports automatically
* Publishing
* Publish streams to servers with UDP or TCP
* Pause publishing without disconnecting from the server
* Generate RTCP sender reports automatically
* Server
* Handle requests from clients
* Read streams from clients with UDP or TCP
* Send streams to clients with UDP or TCP
* Write streams to clients with UDP or TCP
* Encrypt streams with TLS (RTSPS)
* Generate RTCP sender and receiver reports automatically
* General
* RTCP reports are generated automatically
* Encode and decode RTSP primitives, RTP/H264, RTP/AAC, SDP
## Table of contents

View File

@@ -52,6 +52,11 @@ type ClientConf struct {
// It defaults to 10 seconds.
ReadTimeout time.Duration
// If the client is reading with UDP, it must receive
// at least a packet within this timeout.
// It defaults to 3 seconds.
InitialUDPReadTimeout time.Duration
// timeout of write operations.
// It defaults to 10 seconds.
WriteTimeout time.Duration

View File

@@ -32,8 +32,9 @@ const (
clientConnWriteBufferSize = 4096
clientConnReceiverReportPeriod = 10 * time.Second
clientConnSenderReportPeriod = 10 * time.Second
clientConnUDPCheckStreamPeriod = 5 * time.Second
clientConnUDPCheckStreamPeriod = 1 * time.Second
clientConnUDPKeepalivePeriod = 30 * time.Second
clientConnTCPSetDeadlinePeriod = 1 * time.Second
)
type clientConnState int
@@ -106,12 +107,16 @@ func newClientConn(conf ClientConf, scheme string, host string) (*ClientConn, er
if conf.ReadTimeout == 0 {
conf.ReadTimeout = 10 * time.Second
}
if conf.InitialUDPReadTimeout == 0 {
conf.InitialUDPReadTimeout = 3 * time.Second
}
if conf.WriteTimeout == 0 {
conf.WriteTimeout = 10 * time.Second
}
if conf.ReadBufferCount == 0 {
conf.ReadBufferCount = 1
}
if conf.ReadBufferSize == 0 {
conf.ReadBufferSize = 2048
}
@@ -126,9 +131,6 @@ func newClientConn(conf ClientConf, scheme string, host string) (*ClientConn, er
conf: conf,
udpRTPListeners: make(map[int]*clientConnUDPListener),
udpRTCPListeners: make(map[int]*clientConnUDPListener),
rtcpReceivers: make(map[int]*rtcpreceiver.RTCPReceiver),
tcpFrameBuffer: multibuffer.New(uint64(conf.ReadBufferCount), uint64(conf.ReadBufferSize)),
rtcpSenders: make(map[int]*rtcpsender.RTCPSender),
publishError: fmt.Errorf("not running"),
}
@@ -161,7 +163,30 @@ func (c *ClientConn) Close() error {
l.close()
}
return c.connClose()
if c.nconn != nil {
c.nconn.Close()
}
return nil
}
func (c *ClientConn) reset() {
c.Close()
c.state = clientConnStateInitial
c.nconn = nil
c.streamURL = nil
c.streamProtocol = nil
c.tracks = nil
c.udpRTPListeners = make(map[int]*clientConnUDPListener)
c.udpRTCPListeners = make(map[int]*clientConnUDPListener)
c.getParameterSupported = false
// read only
c.rtpInfo = nil
c.rtcpReceivers = nil
c.tcpFrameBuffer = nil
c.readCB = nil
}
func (c *ClientConn) connOpen(scheme string, host string) error {
@@ -197,16 +222,6 @@ func (c *ClientConn) connOpen(scheme string, host string) error {
return nil
}
func (c *ClientConn) connClose() error {
if c.nconn == nil {
return nil
}
err := c.nconn.Close()
c.nconn = nil
return err
}
func (c *ClientConn) checkState(allowed map[clientConnState]struct{}) error {
if _, ok := allowed[c.state]; ok {
return nil
@@ -269,15 +284,23 @@ func (c *ClientConn) Do(req *base.Request) (*base.Response, error) {
return nil, nil
}
// read the response and ignore interleaved frames in between;
// interleaved frames are sent in two situations:
// * when the server is v4lrtspserver, before the PLAY response
// * when the stream is already playing
var res base.Response
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
err = res.ReadIgnoreFrames(c.br, c.tcpFrameBuffer.Next())
if err != nil {
return nil, err
if c.tcpFrameBuffer != nil {
// read the response and ignore interleaved frames in between;
// interleaved frames are sent in two scenarios:
// * when the server is v4lrtspserver, before the PLAY response
// * when the stream is already playing
err = res.ReadIgnoreFrames(c.br, c.tcpFrameBuffer.Next())
if err != nil {
return nil, err
}
} else {
err = res.Read(c.br)
if err != nil {
return nil, err
}
}
if c.conf.OnResponse != nil {
@@ -386,7 +409,7 @@ func (c *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) {
res.StatusCode <= base.StatusUseProxy &&
len(res.Header["Location"]) == 1 {
c.connClose()
c.reset()
u, err := base.ParseURL(res.Header["Location"][0])
if err != nil {
@@ -460,7 +483,7 @@ func (c *ClientConn) Setup(mode headers.TransportMode, track *Track,
}
proto := func() StreamProtocol {
// protocol set by previous Setup()
// protocol set by previous Setup() or ReadFrames()
if c.streamProtocol != nil {
return *c.streamProtocol
}
@@ -628,8 +651,14 @@ func (c *ClientConn) Setup(mode headers.TransportMode, track *Track,
clockRate, _ := track.ClockRate()
if mode == headers.TransportModePlay {
if c.rtcpReceivers == nil {
c.rtcpReceivers = make(map[int]*rtcpreceiver.RTCPReceiver)
}
c.rtcpReceivers[track.ID] = rtcpreceiver.New(nil, clockRate)
} else {
if c.rtcpSenders == nil {
c.rtcpSenders = make(map[int]*rtcpsender.RTCPSender)
}
c.rtcpSenders[track.ID] = rtcpsender.New(clockRate)
}
@@ -659,6 +688,11 @@ func (c *ClientConn) Setup(mode headers.TransportMode, track *Track,
if mode == headers.TransportModePlay {
c.state = clientConnStatePrePlay
if *c.streamProtocol == StreamProtocolTCP && c.tcpFrameBuffer == nil {
c.tcpFrameBuffer = multibuffer.New(uint64(c.conf.ReadBufferCount), uint64(c.conf.ReadBufferSize))
}
} else {
c.state = clientConnStatePreRecord
}

View File

@@ -93,8 +93,11 @@ func (c *ClientConn) backgroundPlayUDP() error {
keepaliveTicker := time.NewTicker(clientConnUDPKeepalivePeriod)
defer keepaliveTicker.Stop()
checkStreamTicker := time.NewTicker(clientConnUDPCheckStreamPeriod)
defer checkStreamTicker.Stop()
checkStreamInitial := true
checkStreamTicker := time.NewTicker(c.conf.InitialUDPReadTimeout)
defer func() {
checkStreamTicker.Stop()
}()
for {
select {
@@ -130,25 +133,55 @@ func (c *ClientConn) backgroundPlayUDP() error {
}
case <-checkStreamTicker.C:
inTimeout := func() bool {
now := time.Now()
for trackID := range c.udpRTPListeners {
last := time.Unix(atomic.LoadInt64(c.udpRTPListeners[trackID].lastFrameTime), 0)
if now.Sub(last) < c.conf.ReadTimeout {
return false
}
if checkStreamInitial {
// check that at least one packet has been received
inTimeout := func() bool {
for trackID := range c.udpRTPListeners {
lft := atomic.LoadInt64(c.udpRTPListeners[trackID].lastFrameTime)
if lft != 0 {
fmt.Println("LFT", lft)
return false
}
last = time.Unix(atomic.LoadInt64(c.udpRTCPListeners[trackID].lastFrameTime), 0)
if now.Sub(last) < c.conf.ReadTimeout {
return false
lft = atomic.LoadInt64(c.udpRTCPListeners[trackID].lastFrameTime)
if lft != 0 {
fmt.Println("LFT", lft)
return false
}
}
return true
}()
if inTimeout {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientNoUDPPacketsRecently{}
}
checkStreamInitial = false
checkStreamTicker.Stop()
checkStreamTicker = time.NewTicker(clientConnUDPCheckStreamPeriod)
} else {
inTimeout := func() bool {
now := time.Now()
for trackID := range c.udpRTPListeners {
lft := atomic.LoadInt64(c.udpRTPListeners[trackID].lastFrameTime)
if now.Sub(time.Unix(lft, 0)) < c.conf.ReadTimeout {
return false
}
lft = atomic.LoadInt64(c.udpRTCPListeners[trackID].lastFrameTime)
if now.Sub(time.Unix(lft, 0)) < c.conf.ReadTimeout {
return false
}
}
return true
}()
if inTimeout {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientUDPTimeout{}
}
return true
}()
if inTimeout {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientNoUDPPacketsRecently{}
}
case err := <-readerDone:
@@ -181,7 +214,7 @@ func (c *ClientConn) backgroundPlayTCP() error {
// for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes.
// therefore, we call it with a ticker.
deadlineTicker := time.NewTicker(1 * time.Second)
deadlineTicker := time.NewTicker(clientConnTCPSetDeadlinePeriod)
defer deadlineTicker.Stop()
for {
@@ -234,11 +267,55 @@ func (c *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) chan erro
c.backgroundDone = make(chan struct{})
go func() {
defer close(c.backgroundDone)
if *c.streamProtocol == StreamProtocolUDP {
done <- c.backgroundPlayUDP()
err := c.backgroundPlayUDP()
close(c.backgroundDone)
// automatically change protocol in case of timeout
if _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently); ok {
if c.conf.StreamProtocol == nil {
err := func() error {
u := c.streamURL
tracks := c.tracks
c.reset()
v := StreamProtocolTCP
c.streamProtocol = &v
err := c.connOpen(u.Scheme, u.Host)
if err != nil {
return err
}
_, err = c.Options(u)
if err != nil {
c.Close()
return err
}
for _, track := range tracks {
_, err := c.Setup(headers.TransportModePlay, track, 0, 0)
if err != nil {
c.Close()
return err
}
}
_, err = c.Play()
if err != nil {
c.Close()
return err
}
return <-c.ReadFrames(onFrame)
}()
done <- err
}
}
done <- err
} else {
defer close(c.backgroundDone)
done <- c.backgroundPlayTCP()
}
}()

View File

@@ -326,111 +326,293 @@ func TestClientReadAnyPort(t *testing.T) {
}
func TestClientReadAutomaticProtocol(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer l.Close()
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
conn, err := l.Accept()
t.Run("switch after status code", func(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
defer l.Close()
var req base.Request
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
conn, err := l.Accept()
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
var req base.Request
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: Tracks{track}.Write(),
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
err = base.Response{
StatusCode: base.StatusUnsupportedTransport,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
InterleavedIDs: &[2]int{0, 1},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Payload: []byte("\x00\x00\x00\x00"),
}.Write(bconn.Writer)
require.NoError(t, err)
}()
conn, err := DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
frameRecv := make(chan struct{})
done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
close(frameRecv)
})
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: Tracks{track}.Write(),
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
err = base.Response{
StatusCode: base.StatusUnsupportedTransport,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
InterleavedIDs: &[2]int{0, 1},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Payload: []byte("\x00\x00\x00\x00"),
}.Write(bconn.Writer)
require.NoError(t, err)
}()
conf := ClientConf{StreamProtocol: nil}
conn, err := conf.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
frameRecv := make(chan struct{})
done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
close(frameRecv)
<-frameRecv
conn.Close()
<-done
})
<-frameRecv
conn.Close()
<-done
t.Run("switch after timeout", func(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer l.Close()
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
conn, err := l.Accept()
require.NoError(t, err)
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
var req base.Request
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: Tracks{track}.Write(),
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
var inTH headers.Transport
err = inTH.Read(req.Header["Transport"])
require.NoError(t, err)
th := headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Protocol: StreamProtocolUDP,
ServerPorts: &[2]int{34556, 34557},
ClientPorts: inTH.ClientPorts,
}
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": th.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
conn.Close()
conn, err = l.Accept()
require.NoError(t, err)
bconn = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
inTH = headers.Transport{}
err = inTH.Read(req.Header["Transport"])
require.NoError(t, err)
th = headers.Transport{
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Protocol: StreamProtocolTCP,
InterleavedIDs: inTH.InterleavedIDs,
}
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": th.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Payload: []byte("\x00\x00\x00\x00"),
}.Write(bconn.Writer)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
conn.Close()
}()
conf := ClientConf{
ReadTimeout: 1 * time.Second,
}
conn, err := conf.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
frameRecv := make(chan struct{})
done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
close(frameRecv)
})
<-frameRecv
conn.Close()
<-done
})
}
func TestClientReadRedirect(t *testing.T) {
@@ -742,6 +924,11 @@ func TestClientReadPause(t *testing.T) {
close(writerTerminate)
<-writerDone
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
conn.Close()
}()

View File

@@ -45,7 +45,7 @@ func newClientConnUDPListener(c *ClientConn, port int) (*clientConnUDPListener,
pc: pc,
frameBuffer: multibuffer.New(uint64(c.conf.ReadBufferCount), uint64(c.conf.ReadBufferSize)),
lastFrameTime: func() *int64 {
v := time.Now().Unix()
v := int64(0)
return &v
}(),
}, nil

View File

@@ -139,7 +139,16 @@ type ErrClientNoUDPPacketsRecently struct{}
// Error implements the error interface.
func (e ErrClientNoUDPPacketsRecently) Error() string {
return "no UDP packets received recently (maybe there's a firewall/NAT in between)"
return "no UDP packets received (maybe there's a firewall/NAT in between)"
}
// ErrClientUDPTimeout is returned when UDP packets have been received previously
// but now nothing is being received.
type ErrClientUDPTimeout struct{}
// Error implements the error interface.
func (e ErrClientUDPTimeout) Error() string {
return "UDP timeout"
}
// ErrClientRTPInfoInvalid is returned in case of an invalid RTP-Info.

View File

@@ -165,5 +165,5 @@ type ErrServerNoUDPPacketsRecently struct{}
// Error implements the error interface.
func (e ErrServerNoUDPPacketsRecently) Error() string {
return "no UDP packets received recently (maybe there's a firewall/NAT in between)"
return "no UDP packets received (maybe there's a firewall/NAT in between)"
}