Files
gortsplib/clientconnread.go
2020-12-09 22:07:39 +01:00

239 lines
5.3 KiB
Go

package gortsplib
import (
"fmt"
"sync/atomic"
"time"
"github.com/aler9/gortsplib/pkg/base"
)
// Play writes a PLAY request and reads a Response.
// This can be called only after Setup().
func (c *ClientConn) Play() (*base.Response, error) {
err := c.checkState(map[clientConnState]struct{}{
clientConnStatePrePlay: {},
})
if err != nil {
return nil, err
}
res, err := c.Do(&base.Request{
Method: base.Play,
URL: c.streamURL,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
return res, nil
}
func (c *ClientConn) backgroundPlayUDP(done chan error) {
defer close(c.backgroundDone)
var returnError error
defer func() {
for trackID := range c.udpRtpListeners {
c.udpRtpListeners[trackID].stop()
c.udpRtcpListeners[trackID].stop()
}
done <- returnError
}()
// open the firewall by sending packets to the counterpart
for trackID := range c.udpRtpListeners {
c.udpRtpListeners[trackID].write(
[]byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
c.udpRtcpListeners[trackID].write(
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
}
for trackID := range c.udpRtpListeners {
c.udpRtpListeners[trackID].start()
c.udpRtcpListeners[trackID].start()
}
// disable deadline
c.nconn.SetReadDeadline(time.Time{})
readerDone := make(chan error)
go func() {
for {
var res base.Response
err := res.Read(c.br)
if err != nil {
readerDone <- err
return
}
}
}()
reportTicker := time.NewTicker(clientReceiverReportPeriod)
defer reportTicker.Stop()
keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod)
defer keepaliveTicker.Stop()
checkStreamTicker := time.NewTicker(clientUDPCheckStreamPeriod)
defer checkStreamTicker.Stop()
for {
select {
case <-c.backgroundTerminate:
c.nconn.SetReadDeadline(time.Now())
<-readerDone
returnError = fmt.Errorf("terminated")
return
case <-reportTicker.C:
now := time.Now()
for trackID := range c.rtcpReceivers {
r := c.rtcpReceivers[trackID].Report(now)
c.udpRtcpListeners[trackID].write(r)
}
case <-keepaliveTicker.C:
_, err := c.Do(&base.Request{
Method: func() base.Method {
// the vlc integrated rtsp server requires GET_PARAMETER
if c.getParameterSupported {
return base.GetParameter
}
return base.Options
}(),
// use the stream path, otherwise some cameras do not reply
URL: c.streamURL,
SkipResponse: true,
})
if err != nil {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
returnError = err
return
}
case <-checkStreamTicker.C:
now := time.Now()
for _, lastUnix := range c.udpLastFrameTimes {
last := time.Unix(atomic.LoadInt64(lastUnix), 0)
if now.Sub(last) >= c.conf.ReadTimeout {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
returnError = fmt.Errorf("no UDP packets received recently (maybe there's a firewall/NAT in between)")
return
}
}
case err := <-readerDone:
returnError = err
return
}
}
}
func (c *ClientConn) backgroundPlayTCP(done chan error) {
defer close(c.backgroundDone)
var returnError error
defer func() {
done <- returnError
}()
readerDone := make(chan error)
go func() {
for {
frame := base.InterleavedFrame{
Content: c.tcpFrameBuffer.Next(),
}
err := frame.Read(c.br)
if err != nil {
readerDone <- err
return
}
c.rtcpReceivers[frame.TrackID].ProcessFrame(time.Now(), frame.StreamType, frame.Content)
c.readCB(frame.TrackID, frame.StreamType, frame.Content)
}
}()
reportTicker := time.NewTicker(clientReceiverReportPeriod)
defer reportTicker.Stop()
// for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes.
// therefore, we call it with a ticker.
deadlineTicker := time.NewTicker(1 * time.Second)
defer deadlineTicker.Stop()
for {
select {
case <-deadlineTicker.C:
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
case <-c.backgroundTerminate:
c.nconn.SetReadDeadline(time.Now())
<-readerDone
returnError = fmt.Errorf("terminated")
return
case <-reportTicker.C:
now := time.Now()
for trackID := range c.rtcpReceivers {
r := c.rtcpReceivers[trackID].Report(now)
c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
frame := base.InterleavedFrame{
TrackID: trackID,
StreamType: StreamTypeRtcp,
Content: r,
}
frame.Write(c.bw)
}
case err := <-readerDone:
returnError = err
return
}
}
}
// ReadFrames starts reading frames.
// it returns a channel that is written when the reading stops.
// This can be called only after Play().
func (c *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) chan error {
// channel is buffered, since listening to it is not mandatory
done := make(chan error, 1)
err := c.checkState(map[clientConnState]struct{}{
clientConnStatePrePlay: {},
})
if err != nil {
done <- err
return done
}
c.state = clientConnStatePlay
c.readCB = onFrame
c.backgroundTerminate = make(chan struct{})
c.backgroundDone = make(chan struct{})
if *c.streamProtocol == StreamProtocolUDP {
go c.backgroundPlayUDP(done)
} else {
go c.backgroundPlayTCP(done)
}
return done
}