client: use a single goroutine during play / record

This commit is contained in:
aler9
2021-11-12 13:03:38 +01:00
committed by Alessandro Ros
parent f6601580db
commit 0440a926b8
3 changed files with 349 additions and 472 deletions

554
client.go
View File

@@ -38,9 +38,10 @@ const (
clientUDPKeepalivePeriod = 30 * time.Second
)
func isErrNOUDPPacketsReceivedRecently(err error) bool {
_, ok := err.(liberrors.ErrClientNoUDPPacketsRecently)
return ok
func emptyTimer() *time.Timer {
t := time.NewTimer(0)
<-t.C
return t
}
func isAnyPort(p int) bool {
@@ -214,12 +215,17 @@ type Client struct {
tracks map[int]clientTrack
tracksByChannel map[int]int
lastRange *headers.Range
backgroundRunning bool
backgroundErr error
tcpFrameBuffer *multibuffer.MultiBuffer // tcp
tcpWriteMutex sync.Mutex // tcp
writeMutex sync.RWMutex // write
writeFrameAllowed bool // write
reportTimer *time.Timer
checkStreamTimer *time.Timer
checkStreamInitial bool
tcpLastFrameTime int64
keepaliveTimer *time.Timer
readerRunning bool
finalErr error
// in
options chan optionsReq
@@ -229,11 +235,9 @@ type Client struct {
play chan playReq
record chan recordReq
pause chan pauseReq
backgroundTerminate chan struct{}
// out
backgroundInnerDone chan error
backgroundDone chan struct{}
readerErr chan error
done chan struct{}
}
@@ -291,6 +295,9 @@ func (c *Client) Dial(scheme string, host string) error {
c.host = host
c.ctx = ctx
c.ctxCancel = ctxCancel
c.reportTimer = emptyTimer()
c.checkStreamTimer = emptyTimer()
c.keepaliveTimer = emptyTimer()
c.options = make(chan optionsReq)
c.describe = make(chan describeReq)
c.announce = make(chan announceReq)
@@ -416,7 +423,7 @@ func (c *Client) Tracks() Tracks {
func (c *Client) run() {
defer close(c.done)
outer:
c.finalErr = func() error {
for {
select {
case req := <-c.options:
@@ -447,34 +454,128 @@ outer:
res, err := c.doPause()
req.res <- clientRes{res: res, err: err}
case err := <-c.backgroundInnerDone:
c.backgroundRunning = false
err = c.switchProtocolIfTimeout(err)
if err != nil {
c.backgroundErr = err
close(c.backgroundDone)
c.writeMutex.Lock()
c.writeFrameAllowed = false
c.writeMutex.Unlock()
case <-c.reportTimer.C:
if c.state == clientStatePlay {
now := time.Now()
for trackID, cct := range c.tracks {
rr := cct.rtcpReceiver.Report(now)
c.WritePacketRTCP(trackID, rr)
}
c.reportTimer = time.NewTimer(c.receiverReportPeriod)
} else { // Record
now := time.Now()
for trackID, cct := range c.tracks {
sr := cct.rtcpSender.Report(now)
if sr != nil {
c.WritePacketRTCP(trackID, sr)
}
}
c.reportTimer = time.NewTimer(c.senderReportPeriod)
}
case <-c.checkStreamTimer.C:
if *c.protocol == TransportUDP ||
*c.protocol == TransportUDPMulticast {
if c.checkStreamInitial {
c.checkStreamInitial = false
// check that at least one packet has been received
inTimeout := func() bool {
for _, cct := range c.tracks {
lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime)
if lft != 0 {
return false
}
lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime)
if lft != 0 {
return false
}
}
return true
}()
if inTimeout {
err := c.trySwitchingProtocol()
if err != nil {
return err
}
}
c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod)
} else {
inTimeout := func() bool {
now := time.Now()
for _, cct := range c.tracks {
lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
}
return true
}()
if inTimeout {
return liberrors.ErrClientUDPTimeout{}
}
c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod)
}
} else { // TCP
inTimeout := func() bool {
now := time.Now()
lft := time.Unix(atomic.LoadInt64(&c.tcpLastFrameTime), 0)
return now.Sub(lft) >= c.ReadTimeout
}()
if inTimeout {
return liberrors.ErrClientTCPTimeout{}
}
c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod)
}
case <-c.keepaliveTimer.C:
_, err := c.do(&base.Request{
Method: func() base.Method {
// the vlc integrated rtsp server requires GET_PARAMETER
if c.useGetParameter {
return base.GetParameter
}
return base.Options
}(),
// use the stream base URL, otherwise some cameras do not reply
URL: c.streamBaseURL,
}, true)
if err != nil {
return err
}
c.keepaliveTimer = time.NewTimer(clientUDPKeepalivePeriod)
case err := <-c.readerErr:
c.readerRunning = false
return err
case <-c.ctx.Done():
break outer
return liberrors.ErrClientTerminated{}
}
}
}()
c.ctxCancel()
c.doClose(false)
c.doClose()
}
func (c *Client) doClose(isSwitchingProtocol bool) {
if c.backgroundRunning {
c.backgroundClose(isSwitchingProtocol)
}
func (c *Client) doClose() {
if c.state == clientStatePlay || c.state == clientStateRecord {
c.playRecordClose()
c.do(&base.Request{
Method: base.Teardown,
URL: c.streamBaseURL,
@@ -494,8 +595,8 @@ func (c *Client) doClose(isSwitchingProtocol bool) {
}
}
func (c *Client) reset(isSwitchingProtocol bool) {
c.doClose(isSwitchingProtocol)
func (c *Client) reset() {
c.doClose()
c.state = clientStateInitial
c.session = ""
@@ -524,19 +625,12 @@ func (c *Client) checkState(allowed map[clientState]struct{}) error {
return liberrors.ErrClientInvalidState{AllowedList: allowedList, State: c.state}
}
func (c *Client) switchProtocolIfTimeout(err error) error {
if *c.protocol != TransportUDP ||
c.state != clientStatePlay ||
!isErrNOUDPPacketsReceivedRecently(err) ||
c.Transport != nil {
return err
}
func (c *Client) trySwitchingProtocol() error {
prevBaseURL := c.streamBaseURL
oldUseGetParameter := c.useGetParameter
prevTracks := c.tracks
c.reset(true)
c.reset()
v := TransportTCP
c.protocol = &v
@@ -551,7 +645,7 @@ func (c *Client) switchProtocolIfTimeout(err error) error {
}
}
_, err = c.doPlay(c.lastRange, true)
_, err := c.doPlay(c.lastRange, true)
if err != nil {
return err
}
@@ -559,198 +653,74 @@ func (c *Client) switchProtocolIfTimeout(err error) error {
return nil
}
func (c *Client) backgroundStart(isSwitchingProtocol bool) {
func (c *Client) playRecordStart() {
// start timers
if c.state == clientStatePlay {
c.reportTimer = time.NewTimer(c.receiverReportPeriod)
switch *c.protocol {
case TransportUDP:
c.checkStreamTimer = time.NewTimer(c.InitialUDPReadTimeout)
c.checkStreamInitial = true
c.keepaliveTimer = time.NewTimer(clientUDPKeepalivePeriod)
case TransportUDPMulticast:
c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod)
c.keepaliveTimer = time.NewTimer(clientUDPKeepalivePeriod)
default: // TCP
c.checkStreamTimer = time.NewTimer(clientCheckStreamPeriod)
c.tcpLastFrameTime = time.Now().Unix()
}
} else {
c.reportTimer = time.NewTimer(c.senderReportPeriod)
}
// allow writing
c.writeMutex.Lock()
c.writeFrameAllowed = true
c.writeMutex.Unlock()
c.backgroundRunning = true
c.backgroundTerminate = make(chan struct{})
c.backgroundInnerDone = make(chan error)
if !isSwitchingProtocol {
c.backgroundDone = make(chan struct{})
}
go c.runBackground()
}
func (c *Client) backgroundClose(isSwitchingProtocol bool) {
close(c.backgroundTerminate)
err := <-c.backgroundInnerDone
c.backgroundRunning = false
if !isSwitchingProtocol {
c.backgroundErr = err
close(c.backgroundDone)
}
c.writeMutex.Lock()
c.writeFrameAllowed = false
c.writeMutex.Unlock()
}
func (c *Client) runBackground() {
c.backgroundInnerDone <- func() error {
if c.state == clientStatePlay {
// start UDP listeners
if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast {
return c.runBackgroundPlayUDP()
}
return c.runBackgroundPlayTCP()
}
if *c.protocol == TransportUDP {
return c.runBackgroundRecordUDP()
}
return c.runBackgroundRecordTCP()
}()
}
func (c *Client) runBackgroundPlayUDP() error {
for _, cct := range c.tracks {
cct.udpRTPListener.start()
cct.udpRTCPListener.start()
}
defer func() {
for _, cct := range c.tracks {
cct.udpRTPListener.stop()
cct.udpRTCPListener.stop()
}
}()
// disable deadline
// for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes.
// therefore, we disable the deadline and perform a check with a ticker.
c.nconn.SetReadDeadline(time.Time{})
readerDone := make(chan error)
// start reader
c.readerRunning = true
c.readerErr = make(chan error)
go func() {
c.readerErr <- c.runReader()
}()
}
func (c *Client) runReader() error {
if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast {
for {
var res base.Response
err := res.Read(c.br)
if err != nil {
readerDone <- err
return
}
}
}()
reportTicker := time.NewTicker(c.receiverReportPeriod)
defer reportTicker.Stop()
keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod)
defer keepaliveTicker.Stop()
checkStreamInitial := true
checkStreamTicker := time.NewTicker(c.InitialUDPReadTimeout)
defer func() {
checkStreamTicker.Stop()
}()
for {
select {
case <-c.backgroundTerminate:
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
for trackID, cct := range c.tracks {
rr := cct.rtcpReceiver.Report(now)
c.WritePacketRTCP(trackID, rr)
}
case <-keepaliveTicker.C:
_, err := c.do(&base.Request{
Method: func() base.Method {
// the vlc integrated rtsp server requires GET_PARAMETER
if c.useGetParameter {
return base.GetParameter
}
return base.Options
}(),
// use the stream base URL, otherwise some cameras do not reply
URL: c.streamBaseURL,
}, true)
if err != nil {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return err
}
case <-checkStreamTicker.C:
if checkStreamInitial {
// check that at least one packet has been received
inTimeout := func() bool {
for _, cct := range c.tracks {
lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime)
if lft != 0 {
return false
}
lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime)
if lft != 0 {
return false
}
}
return true
}()
if inTimeout {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientNoUDPPacketsRecently{}
}
checkStreamInitial = false
checkStreamTicker.Stop()
checkStreamTicker = time.NewTicker(clientCheckStreamPeriod)
} else {
inTimeout := func() bool {
now := time.Now()
for _, cct := range c.tracks {
lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
}
return true
}()
if inTimeout {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientUDPTimeout{}
}
}
case err := <-readerDone:
return err
}
}
}
func (c *Client) runBackgroundPlayTCP() error {
// for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes.
// therefore, we disable the deadline and perform check with a ticker.
c.nconn.SetReadDeadline(time.Time{})
lastFrameTime := time.Now().Unix()
readerDone := make(chan error)
go func() {
if c.state == clientStatePlay {
for {
frame := base.InterleavedFrame{
Payload: c.tcpFrameBuffer.Next(),
}
err := frame.Read(c.br)
if err != nil {
readerDone <- err
return
return err
}
channel := frame.Channel
@@ -766,7 +736,7 @@ func (c *Client) runBackgroundPlayTCP() error {
}
now := time.Now()
atomic.StoreInt64(&lastFrameTime, now.Unix())
atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix())
if isRTP {
c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, frame.Payload)
@@ -776,113 +746,14 @@ func (c *Client) runBackgroundPlayTCP() error {
c.OnPacketRTCP(c, trackID, frame.Payload)
}
}
}()
reportTicker := time.NewTicker(c.receiverReportPeriod)
defer reportTicker.Stop()
checkStreamTicker := time.NewTicker(clientCheckStreamPeriod)
defer checkStreamTicker.Stop()
for {
select {
case <-c.backgroundTerminate:
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
for trackID, cct := range c.tracks {
rr := cct.rtcpReceiver.Report(now)
c.WritePacketRTCP(trackID, rr)
}
case <-checkStreamTicker.C:
inTimeout := func() bool {
now := time.Now()
lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0)
return now.Sub(lft) >= c.ReadTimeout
}()
if inTimeout {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientTCPTimeout{}
}
case err := <-readerDone:
return err
}
}
}
func (c *Client) runBackgroundRecordUDP() error {
for _, cct := range c.tracks {
cct.udpRTPListener.start()
cct.udpRTCPListener.start()
}
defer func() {
for _, cct := range c.tracks {
cct.udpRTPListener.stop()
cct.udpRTCPListener.stop()
}
}()
// disable deadline
c.nconn.SetReadDeadline(time.Time{})
readerDone := make(chan error)
go func() {
for {
var res base.Response
err := res.Read(c.br)
if err != nil {
readerDone <- err
return
}
}
}()
reportTicker := time.NewTicker(c.senderReportPeriod)
defer reportTicker.Stop()
for {
select {
case <-c.backgroundTerminate:
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
for trackID, cct := range c.tracks {
sr := cct.rtcpSender.Report(now)
if sr != nil {
c.WritePacketRTCP(trackID, sr)
}
}
case err := <-readerDone:
return err
}
}
}
func (c *Client) runBackgroundRecordTCP() error {
// disable deadline
c.nconn.SetReadDeadline(time.Time{})
readerDone := make(chan error)
go func() {
} else { // Record
for {
frame := base.InterleavedFrame{
Payload: c.tcpFrameBuffer.Next(),
}
err := frame.Read(c.br)
if err != nil {
readerDone <- err
return
return err
}
channel := frame.Channel
@@ -901,31 +772,34 @@ func (c *Client) runBackgroundRecordTCP() error {
c.OnPacketRTCP(c, trackID, frame.Payload)
}
}
}()
}
}
}
reportTicker := time.NewTicker(c.senderReportPeriod)
defer reportTicker.Stop()
for {
select {
case <-c.backgroundTerminate:
func (c *Client) playRecordClose() {
// stop reader
if c.readerRunning {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
return fmt.Errorf("terminated")
<-c.readerErr
}
case <-reportTicker.C:
now := time.Now()
for trackID, cct := range c.tracks {
sr := cct.rtcpSender.Report(now)
if sr != nil {
c.WritePacketRTCP(trackID, sr)
// stop UDP listeners
if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast {
for _, cct := range c.tracks {
cct.udpRTPListener.stop()
cct.udpRTCPListener.stop()
}
}
case err := <-readerDone:
return err
}
}
// forbid writing
c.writeMutex.Lock()
c.writeFrameAllowed = false
c.writeMutex.Unlock()
// stop timers
c.reportTimer = emptyTimer()
c.checkStreamTimer = emptyTimer()
c.keepaliveTimer = emptyTimer()
}
func (c *Client) connOpen() error {
@@ -1164,7 +1038,7 @@ func (c *Client) doDescribe(u *base.URL) (Tracks, *base.URL, *base.Response, err
res.StatusCode >= base.StatusMovedPermanently &&
res.StatusCode <= base.StatusUseProxy &&
len(res.Header["Location"]) == 1 {
c.reset(false)
c.reset()
u, err := base.ParseURL(res.Header["Location"][0])
if err != nil {
@@ -1336,7 +1210,7 @@ func (c *Client) doSetup(
}
proto := func() Transport {
// protocol set by previous Setup() or switchProtocolIfTimeout()
// protocol set by previous Setup() or trySwitchingProtocol()
if c.protocol != nil {
return *c.protocol
}
@@ -1676,7 +1550,7 @@ func (c *Client) doPlay(ra *headers.Range, isSwitchingProtocol bool) (*base.Resp
c.state = clientStatePlay
c.lastRange = ra
c.backgroundStart(isSwitchingProtocol)
c.playRecordStart()
return res, nil
}
@@ -1719,7 +1593,7 @@ func (c *Client) doRecord() (*base.Response, error) {
c.state = clientStateRecord
c.backgroundStart(false)
c.playRecordStart()
return nil, nil
}
@@ -1747,7 +1621,7 @@ func (c *Client) doPause() (*base.Response, error) {
return nil, err
}
c.backgroundClose(false)
c.playRecordClose()
res, err := c.do(&base.Request{
Method: base.Pause,
@@ -1799,21 +1673,27 @@ func (c *Client) Seek(ra *headers.Range) (*base.Response, error) {
// ReadFrames starts reading frames.
func (c *Client) ReadFrames() error {
<-c.backgroundDone
return c.backgroundErr
<-c.done
return c.finalErr
}
// WritePacketRTP writes a RTP packet.
func (c *Client) WritePacketRTP(trackID int, payload []byte) error {
now := time.Now()
select {
case <-c.done:
return c.finalErr
default:
}
c.writeMutex.RLock()
defer c.writeMutex.RUnlock()
if !c.writeFrameAllowed {
return c.backgroundErr
return liberrors.ErrClientWriteNotAllowed{}
}
now := time.Now()
if c.tracks[trackID].rtcpSender != nil {
c.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload)
}
@@ -1838,15 +1718,21 @@ func (c *Client) WritePacketRTP(trackID int, payload []byte) error {
// WritePacketRTCP writes a RTCP packet.
func (c *Client) WritePacketRTCP(trackID int, payload []byte) error {
now := time.Now()
select {
case <-c.done:
return c.finalErr
default:
}
c.writeMutex.RLock()
defer c.writeMutex.RUnlock()
if !c.writeFrameAllowed {
return c.backgroundErr
return liberrors.ErrClientWriteNotAllowed{}
}
now := time.Now()
if c.tracks[trackID].rtcpSender != nil {
c.tracks[trackID].rtcpSender.ProcessPacketRTCP(now, payload)
}

View File

@@ -1768,9 +1768,6 @@ func TestClientReadPause(t *testing.T) {
<-frameRecv
_, err = c.Pause()
require.NoError(t, err)
<-done
c.ReadFrames()
firstFrame = int32(0)
frameRecv = make(chan struct{})
@@ -1778,12 +1775,6 @@ func TestClientReadPause(t *testing.T) {
_, err = c.Play(nil)
require.NoError(t, err)
done = make(chan struct{})
go func() {
defer close(done)
c.ReadFrames()
}()
<-frameRecv
c.Close()
<-done

View File

@@ -163,14 +163,6 @@ func (e ErrClientTransportHeaderInterleavedIDsAlreadyUsed) Error() string {
return "interleaved IDs already used"
}
// ErrClientNoUDPPacketsRecently is an error that can be returned by a client.
type ErrClientNoUDPPacketsRecently struct{}
// Error implements the error interface.
func (e ErrClientNoUDPPacketsRecently) Error() string {
return "no UDP packets received (maybe there's a firewall/NAT in between)"
}
// ErrClientUDPTimeout is an error that can be returned by a client.
type ErrClientUDPTimeout struct{}
@@ -196,3 +188,11 @@ type ErrClientRTPInfoInvalid struct {
func (e ErrClientRTPInfoInvalid) Error() string {
return fmt.Sprintf("invalid RTP-Info: %v", e.Err)
}
// ErrClientWriteNotAllowed is an error that can be returned by a client.
type ErrClientWriteNotAllowed struct{}
// Error implements the error interface.
func (e ErrClientWriteNotAllowed) Error() string {
return "writing is not allowed at the moment"
}