From f8ef945dae74c9d35d1f5acedba4a7517e1e79d0 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 10 May 2021 17:18:41 +0200 Subject: [PATCH] client: allow to call client.Close() always --- client.go | 7 + client_read_test.go | 6 + clientconn.go | 1276 ++++++++++++++++++++++++++++++--------- clientconnpublish.go | 187 ------ clientconnread.go | 267 -------- clientconnudpl.go | 9 +- pkg/base/request.go | 4 - pkg/liberrors/client.go | 8 + 8 files changed, 1019 insertions(+), 745 deletions(-) delete mode 100644 clientconnpublish.go delete mode 100644 clientconnread.go diff --git a/client.go b/client.go index af00c8ed..c3eed03f 100644 --- a/client.go +++ b/client.go @@ -1,3 +1,10 @@ +/* +Package gortsplib is a RTSP 1.0 library for the Go programming language, +written for rtsp-simple-server. + +Examples are available at https://github.com/aler9/gortsplib/tree/master/examples + +*/ package gortsplib import ( diff --git a/client_read_test.go b/client_read_test.go index ad657370..390865fb 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -381,6 +381,9 @@ func TestClientRead(t *testing.T) { <-frameRecv conn.Close() <-done + + <-conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + }) }) } } @@ -1232,6 +1235,9 @@ func TestClientReadPause(t *testing.T) { require.NoError(t, err) <-done + <-conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + }) + _, err = conn.Play() require.NoError(t, err) diff --git a/clientconn.go b/clientconn.go index 21f6af2e..6edd3cde 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1,14 +1,8 @@ -/* -Package gortsplib is a RTSP 1.0 library for the Go programming language, -written for rtsp-simple-server. - -Examples are available at https://github.com/aler9/gortsplib/tree/master/examples - -*/ package gortsplib import ( "bufio" + "context" "crypto/tls" "fmt" "math/rand" @@ -17,8 +11,11 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" + psdp "github.com/pion/sdp/v3" + "github.com/aler9/gortsplib/pkg/auth" "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/headers" @@ -35,6 +32,11 @@ const ( clientConnUDPKeepalivePeriod = 30 * time.Second ) +func isErrNOUDPPacketsReceivedRecently(err error) bool { + _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently) + return ok +} + type clientConnState int const ( @@ -69,38 +71,91 @@ func (s clientConnState) String() string { return "unknown" } +type optionsReq struct { + url *base.URL + res chan clientRes +} + +type describeReq struct { + url *base.URL + res chan clientRes +} + +type announceReq struct { + url *base.URL + tracks Tracks + res chan clientRes +} + +type setupReq struct { + mode headers.TransportMode + track *Track + rtpPort int + rtcpPort int + res chan clientRes +} + +type playReq struct { + res chan clientRes +} + +type recordReq struct { + res chan clientRes +} + +type pauseReq struct { + res chan clientRes +} + +type clientRes struct { + tracks Tracks + res *base.Response + err error +} + // ClientConn is a client-side RTSP connection. type ClientConn struct { c *Client + scheme string + host string + ctx context.Context + ctxCancel func() + state clientConnState nconn net.Conn - isTLS bool br *bufio.Reader bw *bufio.Writer session string - cseq int sender *auth.Sender - state clientConnState + cseq int + useGetParameter bool streamBaseURL *base.URL streamProtocol *StreamProtocol tracks map[int]clientConnTrack - useGetParameter bool - writeMutex sync.Mutex - writeFrameAllowed bool - writeError error backgroundRunning bool - readCB func(int, StreamType, []byte) - - // TCP stream protocol - tcpFrameBuffer *multibuffer.MultiBuffer - - // read - rtpInfo *headers.RTPInfo + backgroundErr error + rtpInfo *headers.RTPInfo // read + tcpFrameBuffer *multibuffer.MultiBuffer // tcp + tcpWriteMutex sync.Mutex // tcp + readCBMutex sync.RWMutex // read + readCB func(int, StreamType, []byte) // read + writeMutex sync.RWMutex // write + writeFrameAllowed bool // write // in + options chan optionsReq + describe chan describeReq + announce chan announceReq + setup chan setupReq + play chan playReq + record chan recordReq + pause chan pauseReq backgroundTerminate chan struct{} // out - backgroundDone chan struct{} + backgroundInnerDone chan error + backgroundDone chan struct{} + readCBSet chan struct{} + done chan struct{} } func newClientConn(c *Client, scheme string, host string) (*ClientConn, error) { @@ -143,118 +198,37 @@ func newClientConn(c *Client, scheme string, host string) (*ClientConn, error) { c.receiverReportPeriod = 10 * time.Second } + ctx, ctxCancel := context.WithCancel(context.Background()) + cc := &ClientConn{ - c: c, - tracks: make(map[int]clientConnTrack), - writeError: fmt.Errorf("not running"), + c: c, + scheme: scheme, + host: host, + ctx: ctx, + ctxCancel: ctxCancel, + tracks: make(map[int]clientConnTrack), + options: make(chan optionsReq), + describe: make(chan describeReq), + announce: make(chan announceReq), + setup: make(chan setupReq), + play: make(chan playReq), + record: make(chan recordReq), + pause: make(chan pauseReq), + done: make(chan struct{}), } - err := cc.connOpen(scheme, host) - if err != nil { - return nil, err - } + go cc.run() return cc, nil } -// Close closes all the ClientConn resources. +// Close closes the connection. func (cc *ClientConn) Close() error { - if cc.backgroundRunning { - close(cc.backgroundTerminate) - <-cc.backgroundDone - } - - if cc.state == clientConnStatePlay || cc.state == clientConnStateRecord { - cc.Do(&base.Request{ - Method: base.Teardown, - URL: cc.streamBaseURL, - SkipResponse: true, - }) - } - - for _, track := range cc.tracks { - if track.udpRTPListener != nil { - track.udpRTPListener.close() - track.udpRTCPListener.close() - } - } - - if cc.nconn != nil { - cc.nconn.Close() - } - + cc.ctxCancel() + <-cc.done return nil } -func (cc *ClientConn) reset() { - cc.Close() - - cc.state = clientConnStateInitial - cc.nconn = nil - cc.streamBaseURL = nil - cc.streamProtocol = nil - cc.tracks = make(map[int]clientConnTrack) - cc.useGetParameter = false - cc.backgroundRunning = false - - // read - cc.rtpInfo = nil - cc.tcpFrameBuffer = nil - cc.readCB = nil -} - -func (cc *ClientConn) connOpen(scheme string, host string) error { - if scheme != "rtsp" && scheme != "rtsps" { - return fmt.Errorf("unsupported scheme '%s'", scheme) - } - - v := StreamProtocolUDP - if scheme == "rtsps" && cc.c.StreamProtocol == &v { - return fmt.Errorf("RTSPS can't be used with UDP") - } - - if !strings.Contains(host, ":") { - host += ":554" - } - - nconn, err := cc.c.DialTimeout("tcp", host, cc.c.ReadTimeout) - if err != nil { - return err - } - - conn := func() net.Conn { - if scheme == "rtsps" { - return tls.Client(nconn, cc.c.TLSConfig) - } - return nconn - }() - - cc.nconn = nconn - cc.isTLS = (scheme == "rtsps") - cc.br = bufio.NewReaderSize(conn, clientConnReadBufferSize) - cc.bw = bufio.NewWriterSize(conn, clientConnWriteBufferSize) - return nil -} - -func (cc *ClientConn) checkState(allowed map[clientConnState]struct{}) error { - if _, ok := allowed[cc.state]; ok { - return nil - } - - allowedList := make([]fmt.Stringer, len(allowed)) - i := 0 - for a := range allowed { - allowedList[i] = a - i++ - } - return liberrors.ErrClientWrongState{AllowedList: allowedList, State: cc.state} -} - -// NetConn returns the underlying net.Conn. -func (cc *ClientConn) NetConn() net.Conn { - return cc.nconn -} - // Tracks returns all the tracks that the connection is reading or publishing. func (cc *ClientConn) Tracks() Tracks { var ret Tracks @@ -271,9 +245,556 @@ func (cc *ClientConn) Tracks() Tracks { return ret } -// Do writes a Request and reads a Response. -// Interleaved frames received before the response are ignored. -func (cc *ClientConn) Do(req *base.Request) (*base.Response, error) { +func (cc *ClientConn) run() { + defer close(cc.done) + +outer: + for { + select { + case req := <-cc.options: + res, err := cc.doOptions(req.url) + req.res <- clientRes{res: res, err: err} + + case req := <-cc.describe: + tracks, res, err := cc.doDescribe(req.url) + req.res <- clientRes{tracks: tracks, res: res, err: err} + + case req := <-cc.announce: + res, err := cc.doAnnounce(req.url, req.tracks) + req.res <- clientRes{res: res, err: err} + + case req := <-cc.setup: + res, err := cc.doSetup(req.mode, req.track, req.rtpPort, req.rtcpPort) + req.res <- clientRes{res: res, err: err} + + case req := <-cc.play: + res, err := cc.doPlay(false) + req.res <- clientRes{res: res, err: err} + + case req := <-cc.record: + res, err := cc.doRecord() + req.res <- clientRes{res: res, err: err} + + case req := <-cc.pause: + res, err := cc.doPause() + req.res <- clientRes{res: res, err: err} + + case err := <-cc.backgroundInnerDone: + cc.backgroundRunning = false + err = cc.switchProtocolIfTimeout(err) + if err != nil { + cc.backgroundErr = err + close(cc.backgroundDone) + + cc.writeMutex.Lock() + cc.writeFrameAllowed = false + cc.writeMutex.Unlock() + } + + case <-cc.ctx.Done(): + break outer + } + } + + cc.doClose(false) +} + +func (cc *ClientConn) doClose(isSwitchingProtocol bool) { + if cc.backgroundRunning { + cc.backgroundClose(isSwitchingProtocol) + } + + if cc.state == clientConnStatePlay || cc.state == clientConnStateRecord { + cc.do(&base.Request{ + Method: base.Teardown, + URL: cc.streamBaseURL, + }, true) + } + + for _, track := range cc.tracks { + if track.udpRTPListener != nil { + track.udpRTPListener.close() + track.udpRTCPListener.close() + } + } + + if cc.nconn != nil { + cc.nconn.Close() + cc.nconn = nil + } +} + +func (cc *ClientConn) reset(isSwitchingProtocol bool) { + cc.doClose(isSwitchingProtocol) + + cc.state = clientConnStateInitial + cc.session = "" + cc.sender = nil + cc.cseq = 0 + cc.useGetParameter = false + cc.streamBaseURL = nil + cc.streamProtocol = nil + cc.tracks = make(map[int]clientConnTrack) + cc.tcpFrameBuffer = nil + cc.rtpInfo = nil + + if !isSwitchingProtocol { + cc.readCB = nil + } +} + +func (cc *ClientConn) checkState(allowed map[clientConnState]struct{}) error { + if _, ok := allowed[cc.state]; ok { + return nil + } + + allowedList := make([]fmt.Stringer, len(allowed)) + i := 0 + for a := range allowed { + allowedList[i] = a + i++ + } + return liberrors.ErrClientWrongState{AllowedList: allowedList, State: cc.state} +} + +func (cc *ClientConn) switchProtocolIfTimeout(err error) error { + if *cc.streamProtocol != StreamProtocolUDP || + cc.state != clientConnStatePlay || + !isErrNOUDPPacketsReceivedRecently(err) || + cc.c.StreamProtocol != nil { + return err + } + + prevBaseURL := cc.streamBaseURL + oldUseGetParameter := cc.useGetParameter + prevTracks := cc.tracks + + cc.reset(true) + + v := StreamProtocolTCP + cc.streamProtocol = &v + cc.useGetParameter = oldUseGetParameter + cc.scheme = prevBaseURL.Scheme + cc.host = prevBaseURL.Host + + err = cc.connOpen() + if err != nil { + return err + } + + for _, track := range prevTracks { + _, err := cc.doSetup(headers.TransportModePlay, track.track, 0, 0) + if err != nil { + return err + } + } + + _, err = cc.doPlay(true) + if err != nil { + return err + } + + return nil +} + +func (cc *ClientConn) pullReadCB() func(int, StreamType, []byte) { + cc.readCBMutex.RLock() + defer cc.readCBMutex.RUnlock() + return cc.readCB +} + +func (cc *ClientConn) backgroundStart(isSwitchingProtocol bool) { + cc.writeMutex.Lock() + cc.writeFrameAllowed = true + cc.writeMutex.Unlock() + + cc.backgroundRunning = true + cc.backgroundTerminate = make(chan struct{}) + cc.backgroundInnerDone = make(chan error) + + if !isSwitchingProtocol { + cc.backgroundDone = make(chan struct{}) + } + + go cc.runBackground() +} + +func (cc *ClientConn) backgroundClose(isSwitchingProtocol bool) { + close(cc.backgroundTerminate) + err := <-cc.backgroundInnerDone + cc.backgroundRunning = false + + if !isSwitchingProtocol { + cc.backgroundErr = err + close(cc.backgroundDone) + } + + cc.writeMutex.Lock() + cc.writeFrameAllowed = false + cc.writeMutex.Unlock() +} + +func (cc *ClientConn) runBackground() { + cc.backgroundInnerDone <- func() error { + if cc.state == clientConnStatePlay { + if *cc.streamProtocol == StreamProtocolUDP { + return cc.runBackgroundPlayUDP() + } + return cc.runBackgroundPlayTCP() + } + + if *cc.streamProtocol == StreamProtocolUDP { + return cc.runBackgroundRecordUDP() + } + return cc.runBackgroundRecordTCP() + }() +} + +func (cc *ClientConn) runBackgroundPlayUDP() error { + // open the firewall by sending packets to the counterpart + for _, cct := range cc.tracks { + cct.udpRTPListener.write( + []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) + + cct.udpRTCPListener.write( + []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) + } + + for _, cct := range cc.tracks { + cct.udpRTPListener.start() + cct.udpRTCPListener.start() + } + + defer func() { + for _, cct := range cc.tracks { + cct.udpRTPListener.stop() + cct.udpRTCPListener.stop() + } + }() + + // disable deadline + cc.nconn.SetReadDeadline(time.Time{}) + + readerDone := make(chan error) + go func() { + for { + var res base.Response + err := res.Read(cc.br) + if err != nil { + readerDone <- err + return + } + } + }() + + reportTicker := time.NewTicker(cc.c.receiverReportPeriod) + defer reportTicker.Stop() + + keepaliveTicker := time.NewTicker(clientConnUDPKeepalivePeriod) + defer keepaliveTicker.Stop() + + checkStreamInitial := true + checkStreamTicker := time.NewTicker(cc.c.InitialUDPReadTimeout) + defer func() { + checkStreamTicker.Stop() + }() + + for { + select { + case <-cc.backgroundTerminate: + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range cc.tracks { + rr := cct.rtcpReceiver.Report(now) + cc.WriteFrame(trackID, StreamTypeRTCP, rr) + } + + case <-keepaliveTicker.C: + _, err := cc.do(&base.Request{ + Method: func() base.Method { + // the vlc integrated rtsp server requires GET_PARAMETER + if cc.useGetParameter { + return base.GetParameter + } + return base.Options + }(), + // use the stream base URL, otherwise some cameras do not reply + URL: cc.streamBaseURL, + }, true) + if err != nil { + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return err + } + + case <-checkStreamTicker.C: + if checkStreamInitial { + // check that at least one packet has been received + inTimeout := func() bool { + for _, cct := range cc.tracks { + lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime) + if lft != 0 { + return false + } + + lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime) + if lft != 0 { + return false + } + } + return true + }() + if inTimeout { + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return liberrors.ErrClientNoUDPPacketsRecently{} + } + + checkStreamInitial = false + checkStreamTicker.Stop() + checkStreamTicker = time.NewTicker(clientConnCheckStreamPeriod) + + } else { + inTimeout := func() bool { + now := time.Now() + for _, cct := range cc.tracks { + lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0) + if now.Sub(lft) < cc.c.ReadTimeout { + return false + } + + lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0) + if now.Sub(lft) < cc.c.ReadTimeout { + return false + } + } + return true + }() + if inTimeout { + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return liberrors.ErrClientUDPTimeout{} + } + } + + case err := <-readerDone: + return err + } + } +} + +func (cc *ClientConn) runBackgroundPlayTCP() error { + // for some reason, SetReadDeadline() must always be called in the same + // goroutine, otherwise Read() freezes. + // therefore, we disable the deadline and perform check with a ticker. + cc.nconn.SetReadDeadline(time.Time{}) + + var lastFrameTime int64 + + readerDone := make(chan error) + go func() { + for { + frame := base.InterleavedFrame{ + Payload: cc.tcpFrameBuffer.Next(), + } + err := frame.Read(cc.br) + if err != nil { + readerDone <- err + return + } + + track, ok := cc.tracks[frame.TrackID] + if !ok { + continue + } + + now := time.Now() + atomic.StoreInt64(&lastFrameTime, now.Unix()) + track.rtcpReceiver.ProcessFrame(now, frame.StreamType, frame.Payload) + cc.pullReadCB()(frame.TrackID, frame.StreamType, frame.Payload) + } + }() + + reportTicker := time.NewTicker(cc.c.receiverReportPeriod) + defer reportTicker.Stop() + + checkStreamTicker := time.NewTicker(clientConnCheckStreamPeriod) + defer checkStreamTicker.Stop() + + for { + select { + case <-cc.backgroundTerminate: + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range cc.tracks { + rr := cct.rtcpReceiver.Report(now) + cc.WriteFrame(trackID, StreamTypeRTCP, rr) + } + + case <-checkStreamTicker.C: + inTimeout := func() bool { + now := time.Now() + lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0) + return now.Sub(lft) >= cc.c.ReadTimeout + }() + if inTimeout { + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return liberrors.ErrClientTCPTimeout{} + } + + case err := <-readerDone: + return err + } + } +} + +func (cc *ClientConn) runBackgroundRecordUDP() error { + for _, cct := range cc.tracks { + cct.udpRTPListener.start() + cct.udpRTCPListener.start() + } + + defer func() { + for _, cct := range cc.tracks { + cct.udpRTPListener.stop() + cct.udpRTCPListener.stop() + } + }() + + // disable deadline + cc.nconn.SetReadDeadline(time.Time{}) + + readerDone := make(chan error) + go func() { + for { + var res base.Response + err := res.Read(cc.br) + if err != nil { + readerDone <- err + return + } + } + }() + + reportTicker := time.NewTicker(cc.c.senderReportPeriod) + defer reportTicker.Stop() + + for { + select { + case <-cc.backgroundTerminate: + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range cc.tracks { + sr := cct.rtcpSender.Report(now) + if sr != nil { + cc.WriteFrame(trackID, StreamTypeRTCP, sr) + } + } + + case err := <-readerDone: + return err + } + } +} + +func (cc *ClientConn) runBackgroundRecordTCP() error { + // disable deadline + cc.nconn.SetReadDeadline(time.Time{}) + + readerDone := make(chan error) + go func() { + for { + frame := base.InterleavedFrame{ + Payload: cc.tcpFrameBuffer.Next(), + } + err := frame.Read(cc.br) + if err != nil { + readerDone <- err + return + } + + cc.pullReadCB()(frame.TrackID, frame.StreamType, frame.Payload) + } + }() + + reportTicker := time.NewTicker(cc.c.senderReportPeriod) + defer reportTicker.Stop() + + for { + select { + case <-cc.backgroundTerminate: + cc.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range cc.tracks { + sr := cct.rtcpSender.Report(now) + if sr != nil { + cc.WriteFrame(trackID, StreamTypeRTCP, sr) + } + } + + case err := <-readerDone: + return err + } + } +} + +func (cc *ClientConn) connOpen() error { + if cc.scheme != "rtsp" && cc.scheme != "rtsps" { + return fmt.Errorf("unsupported scheme '%s'", cc.scheme) + } + + v := StreamProtocolUDP + if cc.scheme == "rtsps" && cc.c.StreamProtocol == &v { + return fmt.Errorf("RTSPS can't be used with UDP") + } + + if !strings.Contains(cc.host, ":") { + cc.host += ":554" + } + + nconn, err := cc.c.DialTimeout("tcp", cc.host, cc.c.ReadTimeout) + if err != nil { + return err + } + + conn := func() net.Conn { + if cc.scheme == "rtsps" { + return tls.Client(nconn, cc.c.TLSConfig) + } + return nconn + }() + + cc.nconn = nconn + cc.br = bufio.NewReaderSize(conn, clientConnReadBufferSize) + cc.bw = bufio.NewWriterSize(conn, clientConnWriteBufferSize) + return nil +} + +func (cc *ClientConn) do(req *base.Request, skipResponse bool) (*base.Response, error) { + if cc.nconn == nil { + err := cc.connOpen() + if err != nil { + return nil, err + } + } + if req.Header == nil { req.Header = make(base.Header) } @@ -299,35 +820,60 @@ func (cc *ClientConn) Do(req *base.Request) (*base.Response, error) { cc.c.OnRequest(req) } - cc.nconn.SetWriteDeadline(time.Now().Add(cc.c.WriteTimeout)) - err := req.Write(cc.bw) + var res base.Response + + err := func() error { + // the only two do() with skipResponses are + // - TEARDOWN -> ctx is already canceled, so this can't be used + // - keepalives -> if ctx is canceled during a keepalive, + // it's better not to close the request, but wait until teardown + if !skipResponse { + readWriteDone := make(chan struct{}) + defer close(readWriteDone) + + go func() { + select { + case <-cc.ctx.Done(): + cc.nconn.Close() + case <-readWriteDone: + } + }() + } + + cc.nconn.SetWriteDeadline(time.Now().Add(cc.c.WriteTimeout)) + err := req.Write(cc.bw) + if err != nil { + return err + } + + if skipResponse { + return nil + } + + cc.nconn.SetReadDeadline(time.Now().Add(cc.c.ReadTimeout)) + + if cc.tcpFrameBuffer != nil { + // read the response and ignore interleaved frames in between; + // interleaved frames are sent in two scenarios: + // * when the server is v4lrtspserver, before the PLAY response + // * when the stream is already playing + err = res.ReadIgnoreFrames(cc.br, cc.tcpFrameBuffer.Next()) + if err != nil { + return err + } + } else { + err = res.Read(cc.br) + if err != nil { + return err + } + } + + return nil + }() if err != nil { return nil, err } - if req.SkipResponse { - return nil, nil - } - - var res base.Response - cc.nconn.SetReadDeadline(time.Now().Add(cc.c.ReadTimeout)) - - if cc.tcpFrameBuffer != nil { - // read the response and ignore interleaved frames in between; - // interleaved frames are sent in two scenarios: - // * when the server is v4lrtspserver, before the PLAY response - // * when the stream is already playing - err = res.ReadIgnoreFrames(cc.br, cc.tcpFrameBuffer.Next()) - if err != nil { - return nil, err - } - } else { - err = res.Read(cc.br) - if err != nil { - return nil, err - } - } - if cc.c.OnResponse != nil { cc.c.OnResponse(&res) } @@ -354,14 +900,13 @@ func (cc *ClientConn) Do(req *base.Request) (*base.Response, error) { cc.sender = sender // send request again - return cc.Do(req) + return cc.do(req, false) } return &res, nil } -// Options writes an OPTIONS request and reads a response. -func (cc *ClientConn) Options(u *base.URL) (*base.Response, error) { +func (cc *ClientConn) doOptions(u *base.URL) (*base.Response, error) { err := cc.checkState(map[clientConnState]struct{}{ clientConnStateInitial: {}, clientConnStatePrePlay: {}, @@ -371,10 +916,10 @@ func (cc *ClientConn) Options(u *base.URL) (*base.Response, error) { return nil, err } - res, err := cc.Do(&base.Request{ + res, err := cc.do(&base.Request{ Method: base.Options, URL: u, - }) + }, false) if err != nil { return nil, err } @@ -405,8 +950,20 @@ func (cc *ClientConn) Options(u *base.URL) (*base.Response, error) { return res, nil } -// Describe writes a DESCRIBE request and reads a Response. -func (cc *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) { +// Options writes an OPTIONS request and reads a response. +func (cc *ClientConn) Options(u *base.URL) (*base.Response, error) { + cres := make(chan clientRes) + select { + case cc.options <- optionsReq{url: u, res: cres}: + res := <-cres + return res.res, res.err + + case <-cc.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (cc *ClientConn) doDescribe(u *base.URL) (Tracks, *base.Response, error) { err := cc.checkState(map[clientConnState]struct{}{ clientConnStateInitial: {}, clientConnStatePrePlay: {}, @@ -416,13 +973,13 @@ func (cc *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) { return nil, nil, err } - res, err := cc.Do(&base.Request{ + res, err := cc.do(&base.Request{ Method: base.Describe, URL: u, Header: base.Header{ "Accept": base.HeaderValue{"application/sdp"}, }, - }) + }, false) if err != nil { return nil, nil, err } @@ -434,24 +991,27 @@ func (cc *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) { res.StatusCode <= base.StatusUseProxy && len(res.Header["Location"]) == 1 { - cc.reset() + cc.reset(false) u, err := base.ParseURL(res.Header["Location"][0]) if err != nil { return nil, nil, err } - err = cc.connOpen(u.Scheme, u.Host) + cc.scheme = u.Scheme + cc.host = u.Host + + err = cc.connOpen() if err != nil { return nil, nil, err } - _, err = cc.Options(u) + _, err = cc.doOptions(u) if err != nil { return nil, nil, err } - return cc.Describe(u) + return cc.doDescribe(u) } return nil, res, liberrors.ErrClientWrongStatusCode{Code: res.StatusCode, Message: res.StatusMessage} @@ -496,11 +1056,83 @@ func (cc *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) { return tracks, res, nil } -// Setup writes a SETUP request and reads a Response. -// rtpPort and rtcpPort are used only if protocol is UDP. -// if rtpPort and rtcpPort are zero, they are chosen automatically. -func (cc *ClientConn) Setup(mode headers.TransportMode, track *Track, - rtpPort int, rtcpPort int) (*base.Response, error) { +// Describe writes a DESCRIBE request and reads a Response. +func (cc *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) { + cres := make(chan clientRes) + select { + case cc.describe <- describeReq{url: u, res: cres}: + res := <-cres + return res.tracks, res.res, res.err + + case <-cc.ctx.Done(): + return nil, nil, liberrors.ErrClientTerminated{} + } +} + +func (cc *ClientConn) doAnnounce(u *base.URL, tracks Tracks) (*base.Response, error) { + err := cc.checkState(map[clientConnState]struct{}{ + clientConnStateInitial: {}, + }) + if err != nil { + return nil, err + } + + // in case of ANNOUNCE, the base URL doesn't have a trailing slash. + // (tested with ffmpeg and gstreamer) + baseURL := u.Clone() + + // set id, base url and control attribute on tracks + for i, t := range tracks { + t.ID = i + t.BaseURL = baseURL + t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=" + strconv.FormatInt(int64(i), 10), + }) + } + + res, err := cc.do(&base.Request{ + Method: base.Announce, + URL: u, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: tracks.Write(), + }, false) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, liberrors.ErrClientWrongStatusCode{ + Code: res.StatusCode, Message: res.StatusMessage} + } + + cc.streamBaseURL = baseURL + cc.state = clientConnStatePreRecord + + return res, nil +} + +// Announce writes an ANNOUNCE request and reads a Response. +func (cc *ClientConn) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { + cres := make(chan clientRes) + select { + case cc.announce <- announceReq{url: u, tracks: tracks, res: cres}: + res := <-cres + return res.res, res.err + + case <-cc.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (cc *ClientConn) doSetup( + mode headers.TransportMode, + track *Track, + rtpPort int, + rtcpPort int) (*base.Response, error) { + err := cc.checkState(map[clientConnState]struct{}{ clientConnStateInitial: {}, clientConnStatePrePlay: {}, @@ -524,7 +1156,7 @@ func (cc *ClientConn) Setup(mode headers.TransportMode, track *Track, var rtcpListener *clientConnUDPListener // always use TCP if encrypted - if cc.isTLS { + if cc.scheme == "rtsps" { v := StreamProtocolTCP cc.streamProtocol = &v } @@ -619,13 +1251,13 @@ func (cc *ClientConn) Setup(mode headers.TransportMode, track *Track, return nil, err } - res, err := cc.Do(&base.Request{ + res, err := cc.do(&base.Request{ Method: base.Setup, URL: trackURL, Header: base.Header{ "Transport": th.Write(), }, - }) + }, false) if err != nil { if proto == StreamProtocolUDP { rtpListener.close() @@ -648,7 +1280,7 @@ func (cc *ClientConn) Setup(mode headers.TransportMode, track *Track, v := StreamProtocolTCP cc.streamProtocol = &v - return cc.Setup(mode, track, 0, 0) + return cc.doSetup(mode, track, 0, 0) } return res, liberrors.ErrClientWrongStatusCode{Code: res.StatusCode, Message: res.StatusMessage} @@ -744,9 +1376,146 @@ func (cc *ClientConn) Setup(mode headers.TransportMode, track *Track, return res, nil } -// Pause writes a PAUSE request and reads a Response. -// This can be called only after Play() or Record(). -func (cc *ClientConn) Pause() (*base.Response, error) { +// Setup writes a SETUP request and reads a Response. +// rtpPort and rtcpPort are used only if protocol is UDP. +// if rtpPort and rtcpPort are zero, they are chosen automatically. +func (cc *ClientConn) Setup( + mode headers.TransportMode, + track *Track, + rtpPort int, + rtcpPort int) (*base.Response, error) { + + cres := make(chan clientRes) + select { + case cc.setup <- setupReq{ + mode: mode, + track: track, + rtpPort: rtpPort, + rtcpPort: rtcpPort, + res: cres, + }: + res := <-cres + return res.res, res.err + + case <-cc.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (cc *ClientConn) doPlay(isSwitchingProtocol bool) (*base.Response, error) { + err := cc.checkState(map[clientConnState]struct{}{ + clientConnStatePrePlay: {}, + }) + if err != nil { + return nil, err + } + + res, err := cc.do(&base.Request{ + Method: base.Play, + URL: cc.streamBaseURL, + }, false) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, liberrors.ErrClientWrongStatusCode{ + Code: res.StatusCode, Message: res.StatusMessage} + } + + if v, ok := res.Header["RTP-Info"]; ok { + var ri headers.RTPInfo + err := ri.Read(v) + if err != nil { + return nil, liberrors.ErrClientRTPInfoInvalid{Err: err} + } + cc.rtpInfo = &ri + } + + cc.state = clientConnStatePlay + + if !isSwitchingProtocol { + // use a temporary callback that is replaces as soon as + // the user calls ReadFrames() + cc.readCBSet = make(chan struct{}) + copy := cc.readCBSet + cc.readCB = func(trackID int, streamType base.StreamType, payload []byte) { + select { + case <-copy: + case <-cc.ctx.Done(): + return + } + cc.pullReadCB()(trackID, streamType, payload) + } + } + + cc.backgroundStart(isSwitchingProtocol) + + return res, nil +} + +// Play writes a PLAY request and reads a Response. +// This can be called only after Setup(). +func (cc *ClientConn) Play() (*base.Response, error) { + cres := make(chan clientRes) + select { + case cc.play <- playReq{res: cres}: + res := <-cres + return res.res, res.err + + case <-cc.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (cc *ClientConn) doRecord() (*base.Response, error) { + err := cc.checkState(map[clientConnState]struct{}{ + clientConnStatePreRecord: {}, + }) + if err != nil { + return nil, err + } + + res, err := cc.do(&base.Request{ + Method: base.Record, + URL: cc.streamBaseURL, + }, false) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, liberrors.ErrClientWrongStatusCode{ + Code: res.StatusCode, Message: res.StatusMessage} + } + + cc.state = clientConnStateRecord + + // when publishing, calling ReadFrames() is not mandatory + // use an empty callback + cc.readCB = func(trackID int, streamType base.StreamType, payload []byte) { + } + + cc.backgroundStart(false) + + return nil, nil +} + +// Record writes a RECORD request and reads a Response. +// This can be called only after Announce() and Setup(). +func (cc *ClientConn) Record() (*base.Response, error) { + cres := make(chan clientRes) + select { + case cc.record <- recordReq{res: cres}: + res := <-cres + return res.res, res.err + + case <-cc.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (cc *ClientConn) doPause() (*base.Response, error) { err := cc.checkState(map[clientConnState]struct{}{ clientConnStatePlay: {}, clientConnStateRecord: {}, @@ -755,14 +1524,12 @@ func (cc *ClientConn) Pause() (*base.Response, error) { return nil, err } - close(cc.backgroundTerminate) - <-cc.backgroundDone - cc.backgroundRunning = false + cc.backgroundClose(false) - res, err := cc.Do(&base.Request{ + res, err := cc.do(&base.Request{ Method: base.Pause, URL: cc.streamBaseURL, - }) + }, false) if err != nil { return nil, err } @@ -782,15 +1549,50 @@ func (cc *ClientConn) Pause() (*base.Response, error) { return res, nil } +// Pause writes a PAUSE request and reads a Response. +// This can be called only after Play() or Record(). +func (cc *ClientConn) Pause() (*base.Response, error) { + cres := make(chan clientRes) + select { + case cc.pause <- pauseReq{res: cres}: + res := <-cres + return res.res, res.err + + case <-cc.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +// ReadFrames starts reading frames. +// it returns a channel that is written when the reading stops. +func (cc *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) chan error { + cc.readCBMutex.Lock() + cc.readCB = onFrame + cc.readCBMutex.Unlock() + + // replace temporary callback with final callback + if cc.readCBSet != nil { + close(cc.readCBSet) + cc.readCBSet = nil + } + + ch := make(chan error, 1) + go func() { + <-cc.backgroundDone + ch <- cc.backgroundErr + }() + return ch +} + // WriteFrame writes a frame. func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []byte) error { now := time.Now() - cc.writeMutex.Lock() - defer cc.writeMutex.Unlock() + cc.writeMutex.RLock() + defer cc.writeMutex.RUnlock() if !cc.writeFrameAllowed { - return cc.writeError + return cc.backgroundErr } if cc.tracks[trackID].rtcpSender != nil { @@ -804,109 +1606,13 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b return cc.tracks[trackID].udpRTCPListener.write(payload) } + cc.tcpWriteMutex.Lock() + defer cc.tcpWriteMutex.Unlock() + cc.nconn.SetWriteDeadline(now.Add(cc.c.WriteTimeout)) - frame := base.InterleavedFrame{ + return base.InterleavedFrame{ TrackID: trackID, StreamType: streamType, Payload: payload, - } - return frame.Write(cc.bw) -} - -// ReadFrames starts reading frames. -// it returns a channel that is written when the reading stops. -func (cc *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) chan error { - // channel is buffered, since listening to it is not mandatory - done := make(chan error, 1) - - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStatePlay: {}, - clientConnStateRecord: {}, - }) - if err != nil { - done <- err - return done - } - - // close previous ReadFrames() - if cc.backgroundRunning { - close(cc.backgroundTerminate) - <-cc.backgroundDone - } - - cc.backgroundRunning = true - cc.backgroundTerminate = make(chan struct{}) - cc.backgroundDone = make(chan struct{}) - cc.readCB = onFrame - cc.writeFrameAllowed = true - - go func() { - done <- func() error { - safeState := cc.state - err := func() error { - if *cc.streamProtocol == StreamProtocolUDP { - if cc.state == clientConnStatePlay { - return cc.backgroundPlayUDP() - } - return cc.backgroundRecordUDP() - } - - if cc.state == clientConnStatePlay { - return cc.backgroundPlayTCP() - } - return cc.backgroundRecordTCP() - }() - - cc.writeError = err - - func() { - cc.writeMutex.Lock() - defer cc.writeMutex.Unlock() - cc.writeFrameAllowed = false - }() - - close(cc.backgroundDone) - - // automatically change protocol in case of timeout - if *cc.streamProtocol == StreamProtocolUDP && - safeState == clientConnStatePlay { - if _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently); ok { - if cc.c.StreamProtocol == nil { - prevBaseURL := cc.streamBaseURL - oldUseGetParameter := cc.useGetParameter - prevTracks := cc.tracks - cc.reset() - v := StreamProtocolTCP - cc.streamProtocol = &v - cc.useGetParameter = oldUseGetParameter - - err := cc.connOpen(prevBaseURL.Scheme, prevBaseURL.Host) - if err != nil { - return err - } - - for _, track := range prevTracks { - _, err := cc.Setup(headers.TransportModePlay, track.track, 0, 0) - if err != nil { - cc.Close() - return err - } - } - - _, err = cc.Play() - if err != nil { - cc.Close() - return err - } - - return <-cc.ReadFrames(onFrame) - } - } - } - - return err - }() - }() - - return done + }.Write(cc.bw) } diff --git a/clientconnpublish.go b/clientconnpublish.go deleted file mode 100644 index efa4e72b..00000000 --- a/clientconnpublish.go +++ /dev/null @@ -1,187 +0,0 @@ -package gortsplib - -import ( - "fmt" - "strconv" - "time" - - psdp "github.com/pion/sdp/v3" - - "github.com/aler9/gortsplib/pkg/base" - "github.com/aler9/gortsplib/pkg/liberrors" -) - -// Announce writes an ANNOUNCE request and reads a Response. -func (cc *ClientConn) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStateInitial: {}, - }) - if err != nil { - return nil, err - } - - // in case of ANNOUNCE, the base URL doesn't have a trailing slash. - // (tested with ffmpeg and gstreamer) - baseURL := u.Clone() - - // set id, base url and control attribute on tracks - for i, t := range tracks { - t.ID = i - t.BaseURL = baseURL - t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ - Key: "control", - Value: "trackID=" + strconv.FormatInt(int64(i), 10), - }) - } - - res, err := cc.Do(&base.Request{ - Method: base.Announce, - URL: u, - Header: base.Header{ - "Content-Type": base.HeaderValue{"application/sdp"}, - }, - Body: tracks.Write(), - }) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, liberrors.ErrClientWrongStatusCode{ - Code: res.StatusCode, Message: res.StatusMessage} - } - - cc.streamBaseURL = baseURL - cc.state = clientConnStatePreRecord - - return res, nil -} - -// Record writes a RECORD request and reads a Response. -// This can be called only after Announce() and Setup(). -func (cc *ClientConn) Record() (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStatePreRecord: {}, - }) - if err != nil { - return nil, err - } - - res, err := cc.Do(&base.Request{ - Method: base.Record, - URL: cc.streamBaseURL, - }) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, liberrors.ErrClientWrongStatusCode{ - Code: res.StatusCode, Message: res.StatusMessage} - } - - cc.state = clientConnStateRecord - - cc.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { - }) - - return nil, nil -} - -func (cc *ClientConn) backgroundRecordUDP() error { - for _, cct := range cc.tracks { - cct.udpRTPListener.start() - cct.udpRTCPListener.start() - } - - defer func() { - for _, cct := range cc.tracks { - cct.udpRTPListener.stop() - cct.udpRTCPListener.stop() - } - }() - - // disable deadline - cc.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - var res base.Response - err := res.Read(cc.br) - if err != nil { - readerDone <- err - return - } - } - }() - - reportTicker := time.NewTicker(cc.c.senderReportPeriod) - defer reportTicker.Stop() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - sr := cct.rtcpSender.Report(now) - if sr != nil { - cc.WriteFrame(trackID, StreamTypeRTCP, sr) - } - } - - case err := <-readerDone: - return err - } - } -} - -func (cc *ClientConn) backgroundRecordTCP() error { - // disable deadline - cc.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - frame := base.InterleavedFrame{ - Payload: cc.tcpFrameBuffer.Next(), - } - err := frame.Read(cc.br) - if err != nil { - readerDone <- err - return - } - - cc.readCB(frame.TrackID, frame.StreamType, frame.Payload) - } - }() - - reportTicker := time.NewTicker(cc.c.senderReportPeriod) - defer reportTicker.Stop() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - sr := cct.rtcpSender.Report(now) - if sr != nil { - cc.WriteFrame(trackID, StreamTypeRTCP, sr) - } - } - - case err := <-readerDone: - return err - } - } -} diff --git a/clientconnread.go b/clientconnread.go deleted file mode 100644 index 6000e590..00000000 --- a/clientconnread.go +++ /dev/null @@ -1,267 +0,0 @@ -package gortsplib - -import ( - "fmt" - "sync/atomic" - "time" - - "github.com/aler9/gortsplib/pkg/base" - "github.com/aler9/gortsplib/pkg/headers" - "github.com/aler9/gortsplib/pkg/liberrors" -) - -// Play writes a PLAY request and reads a Response. -// This can be called only after Setup(). -func (cc *ClientConn) Play() (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStatePrePlay: {}, - }) - if err != nil { - return nil, err - } - - res, err := cc.Do(&base.Request{ - Method: base.Play, - URL: cc.streamBaseURL, - }) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, liberrors.ErrClientWrongStatusCode{ - Code: res.StatusCode, Message: res.StatusMessage} - } - - if v, ok := res.Header["RTP-Info"]; ok { - var ri headers.RTPInfo - err := ri.Read(v) - if err != nil { - return nil, liberrors.ErrClientRTPInfoInvalid{Err: err} - } - cc.rtpInfo = &ri - } - - cc.state = clientConnStatePlay - - return res, nil -} - -// RTPInfo returns the RTP-Info header sent by the server in the PLAY response. -func (cc *ClientConn) RTPInfo() *headers.RTPInfo { - return cc.rtpInfo -} - -func (cc *ClientConn) backgroundPlayUDP() error { - // open the firewall by sending packets to the counterpart - for _, cct := range cc.tracks { - cct.udpRTPListener.write( - []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) - - cct.udpRTCPListener.write( - []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) - } - - for _, cct := range cc.tracks { - cct.udpRTPListener.start() - cct.udpRTCPListener.start() - } - - defer func() { - for _, cct := range cc.tracks { - cct.udpRTPListener.stop() - cct.udpRTCPListener.stop() - } - }() - - // disable deadline - cc.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - var res base.Response - err := res.Read(cc.br) - if err != nil { - readerDone <- err - return - } - } - }() - - reportTicker := time.NewTicker(cc.c.receiverReportPeriod) - defer reportTicker.Stop() - - keepaliveTicker := time.NewTicker(clientConnUDPKeepalivePeriod) - defer keepaliveTicker.Stop() - - checkStreamInitial := true - checkStreamTicker := time.NewTicker(cc.c.InitialUDPReadTimeout) - defer func() { - checkStreamTicker.Stop() - }() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for _, cct := range cc.tracks { - rr := cct.rtcpReceiver.Report(now) - cct.udpRTCPListener.write(rr) - } - - case <-keepaliveTicker.C: - _, err := cc.Do(&base.Request{ - Method: func() base.Method { - // the vlc integrated rtsp server requires GET_PARAMETER - if cc.useGetParameter { - return base.GetParameter - } - return base.Options - }(), - // use the stream base URL, otherwise some cameras do not reply - URL: cc.streamBaseURL, - SkipResponse: true, - }) - if err != nil { - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return err - } - - case <-checkStreamTicker.C: - if checkStreamInitial { - // check that at least one packet has been received - inTimeout := func() bool { - for _, cct := range cc.tracks { - lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime) - if lft != 0 { - return false - } - - lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime) - if lft != 0 { - return false - } - } - return true - }() - if inTimeout { - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientNoUDPPacketsRecently{} - } - - checkStreamInitial = false - checkStreamTicker.Stop() - checkStreamTicker = time.NewTicker(clientConnCheckStreamPeriod) - - } else { - inTimeout := func() bool { - now := time.Now() - for _, cct := range cc.tracks { - lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0) - if now.Sub(lft) < cc.c.ReadTimeout { - return false - } - - lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0) - if now.Sub(lft) < cc.c.ReadTimeout { - return false - } - } - return true - }() - if inTimeout { - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientUDPTimeout{} - } - } - - case err := <-readerDone: - return err - } - } -} - -func (cc *ClientConn) backgroundPlayTCP() error { - // for some reason, SetReadDeadline() must always be called in the same - // goroutine, otherwise Read() freezes. - // therefore, we disable the deadline and perform check with a ticker. - cc.nconn.SetReadDeadline(time.Time{}) - - var lastFrameTime int64 - - readerDone := make(chan error) - go func() { - for { - frame := base.InterleavedFrame{ - Payload: cc.tcpFrameBuffer.Next(), - } - err := frame.Read(cc.br) - if err != nil { - readerDone <- err - return - } - - track, ok := cc.tracks[frame.TrackID] - if !ok { - continue - } - - now := time.Now() - atomic.StoreInt64(&lastFrameTime, now.Unix()) - track.rtcpReceiver.ProcessFrame(now, frame.StreamType, frame.Payload) - cc.readCB(frame.TrackID, frame.StreamType, frame.Payload) - } - }() - - reportTicker := time.NewTicker(cc.c.receiverReportPeriod) - defer reportTicker.Stop() - - checkStreamTicker := time.NewTicker(clientConnCheckStreamPeriod) - defer checkStreamTicker.Stop() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - r := cct.rtcpReceiver.Report(now) - cc.nconn.SetWriteDeadline(time.Now().Add(cc.c.WriteTimeout)) - frame := base.InterleavedFrame{ - TrackID: trackID, - StreamType: StreamTypeRTCP, - Payload: r, - } - frame.Write(cc.bw) - } - - case <-checkStreamTicker.C: - inTimeout := func() bool { - now := time.Now() - lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0) - return now.Sub(lft) >= cc.c.ReadTimeout - }() - if inTimeout { - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientTCPTimeout{} - } - - case err := <-readerDone: - return err - } - } -} diff --git a/clientconnudpl.go b/clientconnudpl.go index 5267ed61..63fcf90b 100644 --- a/clientconnudpl.go +++ b/clientconnudpl.go @@ -3,6 +3,7 @@ package gortsplib import ( "net" "strconv" + "sync" "sync/atomic" "time" @@ -25,6 +26,7 @@ type clientConnUDPListener struct { running bool frameBuffer *multibuffer.MultiBuffer lastFrameTime *int64 + writeMutex sync.Mutex done chan struct{} } @@ -90,7 +92,7 @@ func (l *clientConnUDPListener) run() { now := time.Now() atomic.StoreInt64(l.lastFrameTime, now.Unix()) l.cc.tracks[l.trackID].rtcpReceiver.ProcessFrame(now, l.streamType, buf[:n]) - l.cc.readCB(l.trackID, l.streamType, buf[:n]) + l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n]) } } else { for { @@ -108,12 +110,15 @@ func (l *clientConnUDPListener) run() { now := time.Now() atomic.StoreInt64(l.lastFrameTime, now.Unix()) - l.cc.readCB(l.trackID, l.streamType, buf[:n]) + l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n]) } } } func (l *clientConnUDPListener) write(buf []byte) error { + l.writeMutex.Lock() + defer l.writeMutex.Unlock() + l.pc.SetWriteDeadline(time.Now().Add(l.cc.c.WriteTimeout)) _, err := l.pc.WriteTo(buf, &net.UDPAddr{ IP: l.remoteIP, diff --git a/pkg/base/request.go b/pkg/base/request.go index 561ae3e7..2cac79e8 100644 --- a/pkg/base/request.go +++ b/pkg/base/request.go @@ -45,10 +45,6 @@ type Request struct { // optional body Body []byte - - // whether to wait for a response or not - // used only by ClientConn.Do() - SkipResponse bool } // Read reads a request. diff --git a/pkg/liberrors/client.go b/pkg/liberrors/client.go index 1946df5a..6445e3e5 100644 --- a/pkg/liberrors/client.go +++ b/pkg/liberrors/client.go @@ -6,6 +6,14 @@ import ( "github.com/aler9/gortsplib/pkg/base" ) +// ErrClientTerminated is an error that can be returned by a client. +type ErrClientTerminated struct{} + +// Error implements the error interface. +func (e ErrClientTerminated) Error() string { + return "terminated" +} + // ErrClientWrongState is an error that can be returned by a client. type ErrClientWrongState struct { AllowedList []fmt.Stringer