client: fix OnResponse() not getting called when playing / recording (#290)

This commit is contained in:
Alessandro Ros
2023-05-28 12:12:25 +02:00
committed by GitHub
parent f254503cf5
commit 1e1e10b031
5 changed files with 277 additions and 214 deletions

View File

@@ -6,24 +6,24 @@ import (
// this struct contains a queue that allows to detach the routine that is reading a stream // this struct contains a queue that allows to detach the routine that is reading a stream
// from the routine that is writing a stream. // from the routine that is writing a stream.
type writer struct { type asyncProcessor struct {
running bool running bool
buffer *ringbuffer.RingBuffer buffer *ringbuffer.RingBuffer
done chan struct{} done chan struct{}
} }
func (w *writer) allocateBuffer(size int) { func (w *asyncProcessor) allocateBuffer(size int) {
w.buffer, _ = ringbuffer.New(uint64(size)) w.buffer, _ = ringbuffer.New(uint64(size))
} }
func (w *writer) start() { func (w *asyncProcessor) start() {
w.running = true w.running = true
w.done = make(chan struct{}) w.done = make(chan struct{})
go w.run() go w.run()
} }
func (w *writer) stop() { func (w *asyncProcessor) stop() {
if w.running { if w.running {
w.buffer.Close() w.buffer.Close()
<-w.done <-w.done
@@ -31,7 +31,7 @@ func (w *writer) stop() {
} }
} }
func (w *writer) run() { func (w *asyncProcessor) run() {
defer close(w.done) defer close(w.done)
for { for {
@@ -44,6 +44,6 @@ func (w *writer) run() {
} }
} }
func (w *writer) queue(cb func()) { func (w *asyncProcessor) queue(cb func()) {
w.buffer.Push(cb) w.buffer.Push(cb)
} }

351
client.go
View File

@@ -232,49 +232,45 @@ type Client struct {
senderReportPeriod time.Duration senderReportPeriod time.Duration
udpReceiverReportPeriod time.Duration udpReceiverReportPeriod time.Duration
checkStreamPeriod time.Duration checkTimeoutPeriod time.Duration
keepalivePeriod time.Duration keepalivePeriod time.Duration
scheme string scheme string
host string host string
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
state clientState state clientState
nconn net.Conn nconn net.Conn
conn *conn.Conn conn *conn.Conn
session string session string
sender *auth.Sender sender *auth.Sender
cseq int cseq int
optionsSent bool optionsSent bool
useGetParameter bool useGetParameter bool
lastDescribeURL *url.URL lastDescribeURL *url.URL
baseURL *url.URL baseURL *url.URL
effectiveTransport *Transport effectiveTransport *Transport
medias map[*media.Media]*clientMedia medias map[*media.Media]*clientMedia
tcpMediasByChannel map[int]*clientMedia tcpMediasByChannel map[int]*clientMedia
lastRange *headers.Range lastRange *headers.Range
checkStreamTimer *time.Timer checkTimeoutTimer *time.Timer
checkStreamInitial bool checkTimeoutInitial bool
tcpLastFrameTime *int64 tcpLastFrameTime *int64
keepaliveTimer *time.Timer keepaliveTimer *time.Timer
closeError error closeError error
writer writer writer asyncProcessor
reader *clientReader
// connCloser channels connCloser *clientConnCloser
connCloserTerminate chan struct{}
connCloserDone chan struct{}
// reader channels
readerErr chan error
// in // in
options chan optionsReq options chan optionsReq
describe chan describeReq describe chan describeReq
announce chan announceReq announce chan announceReq
setup chan setupReq setup chan setupReq
play chan playReq play chan playReq
record chan recordReq record chan recordReq
pause chan pauseReq pause chan pauseReq
readError chan error
// out // out
done chan struct{} done chan struct{}
@@ -364,8 +360,8 @@ func (c *Client) Start(scheme string, host string) error {
// some cameras require a maximum of 5secs between keepalives // some cameras require a maximum of 5secs between keepalives
c.udpReceiverReportPeriod = 5 * time.Second c.udpReceiverReportPeriod = 5 * time.Second
} }
if c.checkStreamPeriod == 0 { if c.checkTimeoutPeriod == 0 {
c.checkStreamPeriod = 1 * time.Second c.checkTimeoutPeriod = 1 * time.Second
} }
if c.keepalivePeriod == 0 { if c.keepalivePeriod == 0 {
c.keepalivePeriod = 30 * time.Second c.keepalivePeriod = 30 * time.Second
@@ -377,7 +373,7 @@ func (c *Client) Start(scheme string, host string) error {
c.host = host c.host = host
c.ctx = ctx c.ctx = ctx
c.ctxCancel = ctxCancel c.ctxCancel = ctxCancel
c.checkStreamTimer = emptyTimer() c.checkTimeoutTimer = emptyTimer()
c.keepaliveTimer = emptyTimer() c.keepaliveTimer = emptyTimer()
c.options = make(chan optionsReq) c.options = make(chan optionsReq)
c.describe = make(chan describeReq) c.describe = make(chan describeReq)
@@ -386,6 +382,7 @@ func (c *Client) Start(scheme string, host string) error {
c.play = make(chan playReq) c.play = make(chan playReq)
c.record = make(chan recordReq) c.record = make(chan recordReq)
c.pause = make(chan pauseReq) c.pause = make(chan pauseReq)
c.readError = make(chan error)
c.done = make(chan struct{}) c.done = make(chan struct{})
go c.run() go c.run()
@@ -481,86 +478,22 @@ func (c *Client) runInner() error {
res, err := c.doPause() res, err := c.doPause()
req.res <- clientRes{res: res, err: err} req.res <- clientRes{res: res, err: err}
case <-c.checkStreamTimer.C: case <-c.checkTimeoutTimer.C:
if *c.effectiveTransport == TransportUDP || err := c.checkTimeout()
*c.effectiveTransport == TransportUDPMulticast {
if c.checkStreamInitial {
c.checkStreamInitial = false
// check that at least one packet has been received
inTimeout := func() bool {
for _, ct := range c.medias {
lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime)
if lft != 0 {
return false
}
lft = atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime)
if lft != 0 {
return false
}
}
return true
}()
if inTimeout {
err := c.trySwitchingProtocol()
if err != nil {
return err
}
}
} else {
inTimeout := func() bool {
now := time.Now()
for _, ct := range c.medias {
lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
lft = time.Unix(atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
}
return true
}()
if inTimeout {
return liberrors.ErrClientUDPTimeout{}
}
}
} else { // TCP
inTimeout := func() bool {
now := time.Now()
lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0)
return now.Sub(lft) >= c.ReadTimeout
}()
if inTimeout {
return liberrors.ErrClientTCPTimeout{}
}
}
c.checkStreamTimer = time.NewTimer(c.checkStreamPeriod)
case <-c.keepaliveTimer.C:
_, err := c.do(&base.Request{
Method: func() base.Method {
// the VLC integrated rtsp server requires GET_PARAMETER
if c.useGetParameter {
return base.GetParameter
}
return base.Options
}(),
// use the stream base URL, otherwise some cameras do not reply
URL: c.baseURL,
}, true, false)
if err != nil { if err != nil {
return err return err
} }
c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
case <-c.keepaliveTimer.C:
err := c.doKeepalive()
if err != nil {
return err
}
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod) c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)
case err := <-c.readerErr: case err := <-c.readError:
c.readerErr = nil c.reader = nil
return err return err
case <-c.ctx.Done(): case <-c.ctx.Done():
@@ -571,7 +504,8 @@ func (c *Client) runInner() error {
func (c *Client) doClose() { func (c *Client) doClose() {
if c.state != clientStatePlay && c.state != clientStateRecord && c.conn != nil { if c.state != clientStatePlay && c.state != clientStateRecord && c.conn != nil {
c.connCloserStop() c.connCloser.close()
c.connCloser = nil
} }
if c.state == clientStatePlay || c.state == clientStateRecord { if c.state == clientStatePlay || c.state == clientStateRecord {
@@ -690,22 +624,22 @@ func (c *Client) trySwitchingProtocol2(medi *media.Media, baseURL *url.URL) (*ba
} }
func (c *Client) playRecordStart() { func (c *Client) playRecordStart() {
// stop connCloser c.connCloser.close()
c.connCloserStop() c.connCloser = nil
if c.state == clientStatePlay { if c.state == clientStatePlay {
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod) c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)
switch *c.effectiveTransport { switch *c.effectiveTransport {
case TransportUDP: case TransportUDP:
c.checkStreamTimer = time.NewTimer(c.InitialUDPReadTimeout) c.checkTimeoutTimer = time.NewTimer(c.InitialUDPReadTimeout)
c.checkStreamInitial = true c.checkTimeoutInitial = true
case TransportUDPMulticast: case TransportUDPMulticast:
c.checkStreamTimer = time.NewTimer(c.checkStreamPeriod) c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
default: // TCP default: // TCP
c.checkStreamTimer = time.NewTimer(c.checkStreamPeriod) c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
v := time.Now().Unix() v := time.Now().Unix()
c.tcpLastFrameTime = &v c.tcpLastFrameTime = &v
} }
@@ -726,68 +660,18 @@ func (c *Client) playRecordStart() {
cm.start() cm.start()
} }
// for some reason, SetReadDeadline() must always be called in the same c.reader = newClientReader(c)
// goroutine, otherwise Read() freezes.
// therefore, we disable the deadline and perform a check with a ticker.
c.nconn.SetReadDeadline(time.Time{})
// start reader
c.readerErr = make(chan error)
go c.runReader()
}
func (c *Client) runReader() {
c.readerErr <- func() error {
if *c.effectiveTransport == TransportUDP || *c.effectiveTransport == TransportUDPMulticast {
for {
_, err := c.conn.ReadResponse()
if err != nil {
return err
}
}
} else {
for {
what, err := c.conn.ReadInterleavedFrameOrResponse()
if err != nil {
return err
}
if fr, ok := what.(*base.InterleavedFrame); ok {
channel := fr.Channel
isRTP := true
if (channel % 2) != 0 {
channel--
isRTP = false
}
media, ok := c.tcpMediasByChannel[channel]
if !ok {
continue
}
if isRTP {
err = media.readRTP(fr.Payload)
} else {
err = media.readRTCP(fr.Payload)
}
if err != nil {
return err
}
}
}
}
}()
} }
func (c *Client) playRecordStop(isClosing bool) { func (c *Client) playRecordStop(isClosing bool) {
// stop reader if c.reader != nil {
if c.readerErr != nil { c.reader.close()
c.nconn.SetReadDeadline(time.Now()) <-c.readError
<-c.readerErr c.reader = nil
} }
// stop timers // stop timers
c.checkStreamTimer = emptyTimer() c.checkTimeoutTimer = emptyTimer()
c.keepaliveTimer = emptyTimer() c.keepaliveTimer = emptyTimer()
c.writer.stop() c.writer.stop()
@@ -796,9 +680,8 @@ func (c *Client) playRecordStop(isClosing bool) {
cm.stop() cm.stop()
} }
// start connCloser
if !isClosing { if !isClosing {
c.connCloserStart() c.connCloser = newClientConnCloser(c.ctx, c.nconn)
} }
} }
@@ -821,10 +704,10 @@ func (c *Client) connOpen() error {
} }
} }
ctx, cancel := context.WithTimeout(c.ctx, c.ReadTimeout) dialCtx, dialCtxCancel := context.WithTimeout(c.ctx, c.ReadTimeout)
defer cancel() defer dialCtxCancel()
nconn, err := c.DialContext(ctx, "tcp", c.host) nconn, err := c.DialContext(dialCtx, "tcp", c.host)
if err != nil { if err != nil {
return err return err
} }
@@ -845,33 +728,11 @@ func (c *Client) connOpen() error {
c.nconn = nconn c.nconn = nconn
bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent) bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent)
c.conn = conn.NewConn(bc) c.conn = conn.NewConn(bc)
c.connCloser = newClientConnCloser(c.ctx, c.nconn)
c.connCloserStart()
return nil return nil
} }
func (c *Client) connCloserStart() {
c.connCloserTerminate = make(chan struct{})
c.connCloserDone = make(chan struct{})
go func() {
defer close(c.connCloserDone)
select {
case <-c.ctx.Done():
c.nconn.Close()
case <-c.connCloserTerminate:
}
}()
}
func (c *Client) connCloserStop() {
close(c.connCloserTerminate)
<-c.connCloserDone
c.connCloserDone = nil
}
func (c *Client) do(req *base.Request, skipResponse bool, allowFrames bool) (*base.Response, error) { func (c *Client) do(req *base.Request, skipResponse bool, allowFrames bool) (*base.Response, error) {
if c.nconn == nil { if c.nconn == nil {
err := c.connOpen() err := c.connOpen()
@@ -964,6 +825,82 @@ func (c *Client) do(req *base.Request, skipResponse bool, allowFrames bool) (*ba
return res, nil return res, nil
} }
func (c *Client) checkTimeout() error {
if *c.effectiveTransport == TransportUDP ||
*c.effectiveTransport == TransportUDPMulticast {
if c.checkTimeoutInitial {
c.checkTimeoutInitial = false
// check that at least one packet has been received
inTimeout := func() bool {
for _, ct := range c.medias {
lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime)
if lft != 0 {
return false
}
lft = atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime)
if lft != 0 {
return false
}
}
return true
}()
if inTimeout {
err := c.trySwitchingProtocol()
if err != nil {
return err
}
}
} else {
inTimeout := func() bool {
now := time.Now()
for _, ct := range c.medias {
lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
lft = time.Unix(atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
}
return true
}()
if inTimeout {
return liberrors.ErrClientUDPTimeout{}
}
}
} else { // TCP
inTimeout := func() bool {
now := time.Now()
lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0)
return now.Sub(lft) >= c.ReadTimeout
}()
if inTimeout {
return liberrors.ErrClientTCPTimeout{}
}
}
return nil
}
func (c *Client) doKeepalive() error {
_, err := c.do(&base.Request{
Method: func() base.Method {
// the VLC integrated rtsp server requires GET_PARAMETER
if c.useGetParameter {
return base.GetParameter
}
return base.Options
}(),
// use the stream base URL, otherwise some cameras do not reply
URL: c.baseURL,
}, true, false)
return err
}
func (c *Client) doOptions(u *url.URL) (*base.Response, error) { func (c *Client) doOptions(u *url.URL) (*base.Response, error) {
err := c.checkState(map[clientState]struct{}{ err := c.checkState(map[clientState]struct{}{
clientStateInitial: {}, clientStateInitial: {},

43
client_conn_closer.go Normal file
View File

@@ -0,0 +1,43 @@
package gortsplib
import (
"context"
"net"
)
type clientConnCloser struct {
ctx context.Context
nconn net.Conn
terminate chan struct{}
done chan struct{}
}
func newClientConnCloser(ctx context.Context, nconn net.Conn) *clientConnCloser {
cc := &clientConnCloser{
ctx: ctx,
nconn: nconn,
terminate: make(chan struct{}),
done: make(chan struct{}),
}
go cc.run()
return cc
}
func (cc *clientConnCloser) close() {
close(cc.terminate)
<-cc.done
}
func (cc *clientConnCloser) run() {
defer close(cc.done)
select {
case <-cc.ctx.Done():
cc.nconn.Close()
case <-cc.terminate:
}
}

83
client_reader.go Normal file
View File

@@ -0,0 +1,83 @@
package gortsplib
import (
"time"
"github.com/bluenviron/gortsplib/v3/pkg/base"
)
type clientReader struct {
c *Client
closeErr chan error
}
func newClientReader(c *Client) *clientReader {
r := &clientReader{
c: c,
closeErr: make(chan error),
}
// for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes.
// therefore, we disable the deadline and perform a check with a ticker.
r.c.nconn.SetReadDeadline(time.Time{})
go r.run()
return r
}
func (r *clientReader) close() {
r.c.nconn.SetReadDeadline(time.Now())
}
func (r *clientReader) run() {
r.c.readError <- r.runInner()
}
func (r *clientReader) runInner() error {
if *r.c.effectiveTransport == TransportUDP || *r.c.effectiveTransport == TransportUDPMulticast {
for {
res, err := r.c.conn.ReadResponse()
if err != nil {
return err
}
r.c.OnResponse(res)
}
} else {
for {
what, err := r.c.conn.ReadInterleavedFrameOrResponse()
if err != nil {
return err
}
switch what := what.(type) {
case *base.Response:
r.c.OnResponse(what)
case *base.InterleavedFrame:
channel := what.Channel
isRTP := true
if (channel % 2) != 0 {
channel--
isRTP = false
}
media, ok := r.c.tcpMediasByChannel[channel]
if !ok {
continue
}
if isRTP {
err = media.readRTP(what.Payload)
} else {
err = media.readRTCP(what.Payload)
}
if err != nil {
return err
}
}
}
}
}

View File

@@ -189,7 +189,7 @@ type ServerSession struct {
announcedMedias media.Medias // publish announcedMedias media.Medias // publish
udpLastPacketTime *int64 // publish udpLastPacketTime *int64 // publish
udpCheckStreamTimer *time.Timer udpCheckStreamTimer *time.Timer
writer writer writer asyncProcessor
// in // in
chHandleRequest chan sessionRequestReq chHandleRequest chan sessionRequestReq