diff --git a/client.go b/client.go index fb263d0b..0cc8f50a 100644 --- a/client.go +++ b/client.go @@ -8,15 +8,125 @@ Examples are available at https://github.com/aler9/gortsplib/tree/master/example package gortsplib import ( + "bufio" "context" "crypto/tls" + "fmt" "net" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" "time" + psdp "github.com/pion/sdp/v3" + + "github.com/aler9/gortsplib/pkg/auth" "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/headers" + "github.com/aler9/gortsplib/pkg/liberrors" + "github.com/aler9/gortsplib/pkg/multibuffer" + "github.com/aler9/gortsplib/pkg/rtcpreceiver" + "github.com/aler9/gortsplib/pkg/rtcpsender" ) +const ( + clientReadBufferSize = 4096 + clientWriteBufferSize = 4096 + clientCheckStreamPeriod = 1 * time.Second + clientUDPKeepalivePeriod = 30 * time.Second +) + +func isErrNOUDPPacketsReceivedRecently(err error) bool { + _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently) + return ok +} + +func isAnyPort(p int) bool { + return p == 0 || p == 1 +} + +type clientState int + +const ( + clientStateInitial clientState = iota + clientStatePrePlay + clientStatePlay + clientStatePreRecord + clientStateRecord +) + +type clientTrack struct { + track *Track + udpRTPListener *clientUDPListener + udpRTCPListener *clientUDPListener + tcpChannel int + rtcpReceiver *rtcpreceiver.RTCPReceiver + rtcpSender *rtcpsender.RTCPSender +} + +func (s clientState) String() string { + switch s { + case clientStateInitial: + return "initial" + case clientStatePrePlay: + return "prePlay" + case clientStatePlay: + return "play" + case clientStatePreRecord: + return "preRecord" + case clientStateRecord: + return "record" + } + return "unknown" +} + +type optionsReq struct { + url *base.URL + res chan clientRes +} + +type describeReq struct { + url *base.URL + res chan clientRes +} + +type announceReq struct { + url *base.URL + tracks Tracks + res chan clientRes +} + +type setupReq struct { + mode headers.TransportMode + baseURL *base.URL + track *Track + rtpPort int + rtcpPort int + res chan clientRes +} + +type playReq struct { + ra *headers.Range + res chan clientRes +} + +type recordReq struct { + res chan clientRes +} + +type pauseReq struct { + res chan clientRes +} + +type clientRes struct { + tracks Tracks + baseURL *base.URL + res *base.Response + err error +} + // Client is a RTSP client. type Client struct { // @@ -80,28 +190,123 @@ type Client struct { senderReportPeriod time.Duration receiverReportPeriod time.Duration + + scheme string + host string + ctx context.Context + ctxCancel func() + state clientState + nconn net.Conn + br *bufio.Reader + bw *bufio.Writer + session string + sender *auth.Sender + cseq int + useGetParameter bool + streamBaseURL *base.URL + protocol *Transport + tracks map[int]clientTrack + tracksByChannel map[int]int + lastRange *headers.Range + backgroundRunning bool + backgroundErr error + tcpFrameBuffer *multibuffer.MultiBuffer // tcp + tcpWriteMutex sync.Mutex // tcp + readCBMutex sync.RWMutex // read + readCB func(int, StreamType, []byte) // read + writeMutex sync.RWMutex // write + writeFrameAllowed bool // write + + // in + options chan optionsReq + describe chan describeReq + announce chan announceReq + setup chan setupReq + play chan playReq + record chan recordReq + pause chan pauseReq + backgroundTerminate chan struct{} + + // out + backgroundInnerDone chan error + backgroundDone chan struct{} + readCBSet chan struct{} + done chan struct{} } // Dial connects to a server. -func (c *Client) Dial(scheme string, host string) (*ClientConn, error) { - return newClientConn(c, scheme, host) +func (c *Client) Dial(scheme string, host string) error { + // RTSP parameters + if c.ReadTimeout == 0 { + c.ReadTimeout = 10 * time.Second + } + if c.WriteTimeout == 0 { + c.WriteTimeout = 10 * time.Second + } + if c.TLSConfig == nil { + c.TLSConfig = &tls.Config{InsecureSkipVerify: true} + } + if c.InitialUDPReadTimeout == 0 { + c.InitialUDPReadTimeout = 3 * time.Second + } + if c.ReadBufferCount == 0 { + c.ReadBufferCount = 1 + } + if c.ReadBufferSize == 0 { + c.ReadBufferSize = 2048 + } + + // system functions + if c.DialContext == nil { + c.DialContext = (&net.Dialer{}).DialContext + } + if c.ListenPacket == nil { + c.ListenPacket = net.ListenPacket + } + + // private + if c.senderReportPeriod == 0 { + c.senderReportPeriod = 10 * time.Second + } + if c.receiverReportPeriod == 0 { + c.receiverReportPeriod = 10 * time.Second + } + + ctx, ctxCancel := context.WithCancel(context.Background()) + + c.scheme = scheme + c.host = host + c.ctx = ctx + c.ctxCancel = ctxCancel + c.options = make(chan optionsReq) + c.describe = make(chan describeReq) + c.announce = make(chan announceReq) + c.setup = make(chan setupReq) + c.play = make(chan playReq) + c.record = make(chan recordReq) + c.pause = make(chan pauseReq) + c.done = make(chan struct{}) + + go c.run() + + return nil } // DialRead connects to the address and starts reading all tracks. -func (c *Client) DialRead(address string) (*ClientConn, error) { +func (c *Client) DialRead(address string) error { return c.DialReadContext(context.Background(), address) } // DialReadContext connects to the address with the given context and starts reading all tracks. -func (c *Client) DialReadContext(ctx context.Context, address string) (*ClientConn, error) { +func (c *Client) DialReadContext(ctx context.Context, address string) error { u, err := base.ParseURL(address) if err != nil { - return nil, err + return err } - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) if err != nil { - return nil, err + return err } ctxHandlerDone := make(chan struct{}) @@ -114,55 +319,55 @@ func (c *Client) DialReadContext(ctx context.Context, address string) (*ClientCo defer close(ctxHandlerDone) select { case <-ctx.Done(): - conn.Close() + c.Close() case <-ctxHandlerTerminate: } }() - _, err = conn.Options(u) + _, err = c.Options(u) if err != nil { - conn.Close() - return nil, err + c.Close() + return err } - tracks, baseURL, _, err := conn.Describe(u) + tracks, baseURL, _, err := c.Describe(u) if err != nil { - conn.Close() - return nil, err + c.Close() + return err } for _, track := range tracks { - _, err := conn.Setup(headers.TransportModePlay, baseURL, track, 0, 0) + _, err := c.Setup(headers.TransportModePlay, baseURL, track, 0, 0) if err != nil { - conn.Close() - return nil, err + c.Close() + return err } } - _, err = conn.Play(nil) + _, err = c.Play(nil) if err != nil { - conn.Close() - return nil, err + c.Close() + return err } - return conn, nil + return nil } // DialPublish connects to the address and starts publishing the tracks. -func (c *Client) DialPublish(address string, tracks Tracks) (*ClientConn, error) { +func (c *Client) DialPublish(address string, tracks Tracks) error { return c.DialPublishContext(context.Background(), address, tracks) } // DialPublishContext connects to the address with the given context and starts publishing the tracks. -func (c *Client) DialPublishContext(ctx context.Context, address string, tracks Tracks) (*ClientConn, error) { +func (c *Client) DialPublishContext(ctx context.Context, address string, tracks Tracks) error { u, err := base.ParseURL(address) if err != nil { - return nil, err + return err } - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) if err != nil { - return nil, err + return err } ctxHandlerDone := make(chan struct{}) @@ -175,36 +380,1553 @@ func (c *Client) DialPublishContext(ctx context.Context, address string, tracks defer close(ctxHandlerDone) select { case <-ctx.Done(): - conn.Close() + c.Close() case <-ctxHandlerTerminate: } }() - _, err = conn.Options(u) + _, err = c.Options(u) if err != nil { - conn.Close() - return nil, err + c.Close() + return err } - _, err = conn.Announce(u, tracks) + _, err = c.Announce(u, tracks) if err != nil { - conn.Close() - return nil, err + c.Close() + return err } for _, track := range tracks { - _, err := conn.Setup(headers.TransportModeRecord, u, track, 0, 0) + _, err := c.Setup(headers.TransportModeRecord, u, track, 0, 0) + if err != nil { + c.Close() + return err + } + } + + _, err = c.Record() + if err != nil { + c.Close() + return err + } + + return nil +} + +// Close closes the connection and waits for all its resources to exit. +func (c *Client) Close() error { + c.ctxCancel() + <-c.done + return nil +} + +// Tracks returns all the tracks that the connection is reading or publishing. +func (c *Client) Tracks() Tracks { + ids := make([]int, len(c.tracks)) + pos := 0 + for id := range c.tracks { + ids[pos] = id + pos++ + } + sort.Slice(ids, func(a, b int) bool { + return ids[a] < ids[b] + }) + + var ret Tracks + for _, id := range ids { + ret = append(ret, c.tracks[id].track) + } + return ret +} + +func (c *Client) run() { + defer close(c.done) + +outer: + for { + select { + case req := <-c.options: + res, err := c.doOptions(req.url) + req.res <- clientRes{res: res, err: err} + + case req := <-c.describe: + tracks, baseURL, res, err := c.doDescribe(req.url) + req.res <- clientRes{tracks: tracks, baseURL: baseURL, res: res, err: err} + + case req := <-c.announce: + res, err := c.doAnnounce(req.url, req.tracks) + req.res <- clientRes{res: res, err: err} + + case req := <-c.setup: + res, err := c.doSetup(req.mode, req.baseURL, req.track, req.rtpPort, req.rtcpPort) + req.res <- clientRes{res: res, err: err} + + case req := <-c.play: + res, err := c.doPlay(req.ra, false) + req.res <- clientRes{res: res, err: err} + + case req := <-c.record: + res, err := c.doRecord() + req.res <- clientRes{res: res, err: err} + + case req := <-c.pause: + res, err := c.doPause() + req.res <- clientRes{res: res, err: err} + + case err := <-c.backgroundInnerDone: + c.backgroundRunning = false + err = c.switchProtocolIfTimeout(err) + if err != nil { + c.backgroundErr = err + close(c.backgroundDone) + + c.writeMutex.Lock() + c.writeFrameAllowed = false + c.writeMutex.Unlock() + } + + case <-c.ctx.Done(): + break outer + } + } + + c.ctxCancel() + + c.doClose(false) +} + +func (c *Client) doClose(isSwitchingProtocol bool) { + if c.backgroundRunning { + c.backgroundClose(isSwitchingProtocol) + } + + if c.state == clientStatePlay || c.state == clientStateRecord { + c.do(&base.Request{ + Method: base.Teardown, + URL: c.streamBaseURL, + }, true) + } + + for _, track := range c.tracks { + if track.udpRTPListener != nil { + track.udpRTPListener.close() + track.udpRTCPListener.close() + } + } + + if c.nconn != nil { + c.nconn.Close() + c.nconn = nil + } +} + +func (c *Client) reset(isSwitchingProtocol bool) { + c.doClose(isSwitchingProtocol) + + c.state = clientStateInitial + c.session = "" + c.sender = nil + c.cseq = 0 + c.useGetParameter = false + c.streamBaseURL = nil + c.protocol = nil + c.tracks = nil + c.tracksByChannel = nil + c.tcpFrameBuffer = nil + + if !isSwitchingProtocol { + c.readCB = nil + } +} + +func (c *Client) checkState(allowed map[clientState]struct{}) error { + if _, ok := allowed[c.state]; ok { + return nil + } + + allowedList := make([]fmt.Stringer, len(allowed)) + i := 0 + for a := range allowed { + allowedList[i] = a + i++ + } + + return liberrors.ErrClientInvalidState{AllowedList: allowedList, State: c.state} +} + +func (c *Client) switchProtocolIfTimeout(err error) error { + if *c.protocol != TransportUDP || + c.state != clientStatePlay || + !isErrNOUDPPacketsReceivedRecently(err) || + c.Transport != nil { + return err + } + + prevBaseURL := c.streamBaseURL + oldUseGetParameter := c.useGetParameter + prevTracks := c.tracks + + c.reset(true) + + v := TransportTCP + c.protocol = &v + c.useGetParameter = oldUseGetParameter + c.scheme = prevBaseURL.Scheme + c.host = prevBaseURL.Host + + for _, track := range prevTracks { + _, err := c.doSetup(headers.TransportModePlay, prevBaseURL, track.track, 0, 0) + if err != nil { + return err + } + } + + _, err = c.doPlay(c.lastRange, true) + if err != nil { + return err + } + + return nil +} + +func (c *Client) pullReadCB() func(int, StreamType, []byte) { + c.readCBMutex.RLock() + defer c.readCBMutex.RUnlock() + return c.readCB +} + +func (c *Client) backgroundStart(isSwitchingProtocol bool) { + c.writeMutex.Lock() + c.writeFrameAllowed = true + c.writeMutex.Unlock() + + c.backgroundRunning = true + c.backgroundTerminate = make(chan struct{}) + c.backgroundInnerDone = make(chan error) + + if !isSwitchingProtocol { + c.backgroundDone = make(chan struct{}) + } + + go c.runBackground() +} + +func (c *Client) backgroundClose(isSwitchingProtocol bool) { + close(c.backgroundTerminate) + err := <-c.backgroundInnerDone + c.backgroundRunning = false + + if !isSwitchingProtocol { + c.backgroundErr = err + close(c.backgroundDone) + } + + c.writeMutex.Lock() + c.writeFrameAllowed = false + c.writeMutex.Unlock() +} + +func (c *Client) runBackground() { + c.backgroundInnerDone <- func() error { + if c.state == clientStatePlay { + if *c.protocol == TransportUDP || *c.protocol == TransportUDPMulticast { + return c.runBackgroundPlayUDP() + } + return c.runBackgroundPlayTCP() + } + + if *c.protocol == TransportUDP { + return c.runBackgroundRecordUDP() + } + return c.runBackgroundRecordTCP() + }() +} + +func (c *Client) runBackgroundPlayUDP() error { + for _, cct := range c.tracks { + cct.udpRTPListener.start() + cct.udpRTCPListener.start() + } + + defer func() { + for _, cct := range c.tracks { + cct.udpRTPListener.stop() + cct.udpRTCPListener.stop() + } + }() + + // disable deadline + c.nconn.SetReadDeadline(time.Time{}) + + readerDone := make(chan error) + go func() { + for { + var res base.Response + err := res.Read(c.br) + if err != nil { + readerDone <- err + return + } + } + }() + + reportTicker := time.NewTicker(c.receiverReportPeriod) + defer reportTicker.Stop() + + keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod) + defer keepaliveTicker.Stop() + + checkStreamInitial := true + checkStreamTicker := time.NewTicker(c.InitialUDPReadTimeout) + defer func() { + checkStreamTicker.Stop() + }() + + for { + select { + case <-c.backgroundTerminate: + c.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range c.tracks { + rr := cct.rtcpReceiver.Report(now) + c.WritePacketRTCP(trackID, rr) + } + + case <-keepaliveTicker.C: + _, err := c.do(&base.Request{ + Method: func() base.Method { + // the vlc integrated rtsp server requires GET_PARAMETER + if c.useGetParameter { + return base.GetParameter + } + return base.Options + }(), + // use the stream base URL, otherwise some cameras do not reply + URL: c.streamBaseURL, + }, true) + if err != nil { + c.nconn.SetReadDeadline(time.Now()) + <-readerDone + return err + } + + case <-checkStreamTicker.C: + if checkStreamInitial { + // check that at least one packet has been received + inTimeout := func() bool { + for _, cct := range c.tracks { + lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime) + if lft != 0 { + return false + } + + lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime) + if lft != 0 { + return false + } + } + return true + }() + if inTimeout { + c.nconn.SetReadDeadline(time.Now()) + <-readerDone + return liberrors.ErrClientNoUDPPacketsRecently{} + } + + checkStreamInitial = false + checkStreamTicker.Stop() + checkStreamTicker = time.NewTicker(clientCheckStreamPeriod) + } else { + inTimeout := func() bool { + now := time.Now() + for _, cct := range c.tracks { + lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0) + if now.Sub(lft) < c.ReadTimeout { + return false + } + + lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0) + if now.Sub(lft) < c.ReadTimeout { + return false + } + } + return true + }() + if inTimeout { + c.nconn.SetReadDeadline(time.Now()) + <-readerDone + return liberrors.ErrClientUDPTimeout{} + } + } + + case err := <-readerDone: + return err + } + } +} + +func (c *Client) runBackgroundPlayTCP() error { + // for some reason, SetReadDeadline() must always be called in the same + // goroutine, otherwise Read() freezes. + // therefore, we disable the deadline and perform check with a ticker. + c.nconn.SetReadDeadline(time.Time{}) + + lastFrameTime := time.Now().Unix() + + readerDone := make(chan error) + go func() { + for { + frame := base.InterleavedFrame{ + Payload: c.tcpFrameBuffer.Next(), + } + err := frame.Read(c.br) + if err != nil { + readerDone <- err + return + } + + channel := frame.Channel + streamType := StreamTypeRTP + if (channel % 2) != 0 { + channel-- + streamType = StreamTypeRTCP + } + + trackID, ok := c.tracksByChannel[channel] + if !ok { + continue + } + + now := time.Now() + atomic.StoreInt64(&lastFrameTime, now.Unix()) + + if streamType == StreamTypeRTP { + c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, frame.Payload) + } else { + c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, frame.Payload) + } + + c.pullReadCB()(trackID, streamType, frame.Payload) + } + }() + + reportTicker := time.NewTicker(c.receiverReportPeriod) + defer reportTicker.Stop() + + checkStreamTicker := time.NewTicker(clientCheckStreamPeriod) + defer checkStreamTicker.Stop() + + for { + select { + case <-c.backgroundTerminate: + c.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range c.tracks { + rr := cct.rtcpReceiver.Report(now) + c.WritePacketRTCP(trackID, rr) + } + + case <-checkStreamTicker.C: + inTimeout := func() bool { + now := time.Now() + lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0) + return now.Sub(lft) >= c.ReadTimeout + }() + if inTimeout { + c.nconn.SetReadDeadline(time.Now()) + <-readerDone + return liberrors.ErrClientTCPTimeout{} + } + + case err := <-readerDone: + return err + } + } +} + +func (c *Client) runBackgroundRecordUDP() error { + for _, cct := range c.tracks { + cct.udpRTPListener.start() + cct.udpRTCPListener.start() + } + + defer func() { + for _, cct := range c.tracks { + cct.udpRTPListener.stop() + cct.udpRTCPListener.stop() + } + }() + + // disable deadline + c.nconn.SetReadDeadline(time.Time{}) + + readerDone := make(chan error) + go func() { + for { + var res base.Response + err := res.Read(c.br) + if err != nil { + readerDone <- err + return + } + } + }() + + reportTicker := time.NewTicker(c.senderReportPeriod) + defer reportTicker.Stop() + + for { + select { + case <-c.backgroundTerminate: + c.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range c.tracks { + sr := cct.rtcpSender.Report(now) + if sr != nil { + c.WritePacketRTCP(trackID, sr) + } + } + + case err := <-readerDone: + return err + } + } +} + +func (c *Client) runBackgroundRecordTCP() error { + // disable deadline + c.nconn.SetReadDeadline(time.Time{}) + + readerDone := make(chan error) + go func() { + for { + frame := base.InterleavedFrame{ + Payload: c.tcpFrameBuffer.Next(), + } + err := frame.Read(c.br) + if err != nil { + readerDone <- err + return + } + + channel := frame.Channel + streamType := StreamTypeRTP + if (channel % 2) != 0 { + channel-- + streamType = StreamTypeRTCP + } + + trackID, ok := c.tracksByChannel[channel] + if !ok { + continue + } + + c.pullReadCB()(trackID, streamType, frame.Payload) + } + }() + + reportTicker := time.NewTicker(c.senderReportPeriod) + defer reportTicker.Stop() + + for { + select { + case <-c.backgroundTerminate: + c.nconn.SetReadDeadline(time.Now()) + <-readerDone + return fmt.Errorf("terminated") + + case <-reportTicker.C: + now := time.Now() + for trackID, cct := range c.tracks { + sr := cct.rtcpSender.Report(now) + if sr != nil { + c.WritePacketRTCP(trackID, sr) + } + } + + case err := <-readerDone: + return err + } + } +} + +func (c *Client) connOpen() error { + if c.scheme != "rtsp" && c.scheme != "rtsps" { + return fmt.Errorf("unsupported scheme '%s'", c.scheme) + } + + if c.scheme == "rtsps" && c.Transport != nil && *c.Transport != TransportTCP { + return fmt.Errorf("RTSPS can be used only with TCP") + } + + if !strings.Contains(c.host, ":") { + c.host += ":554" + } + + ctx, cancel := context.WithTimeout(c.ctx, c.ReadTimeout) + defer cancel() + + nconn, err := c.DialContext(ctx, "tcp", c.host) + if err != nil { + return err + } + + conn := func() net.Conn { + if c.scheme == "rtsps" { + return tls.Client(nconn, c.TLSConfig) + } + return nconn + }() + + c.nconn = nconn + c.br = bufio.NewReaderSize(conn, clientReadBufferSize) + c.bw = bufio.NewWriterSize(conn, clientWriteBufferSize) + return nil +} + +func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error) { + if c.nconn == nil { + err := c.connOpen() if err != nil { - conn.Close() return nil, err } } - _, err = conn.Record() + if req.Header == nil { + req.Header = make(base.Header) + } + + if c.session != "" { + req.Header["Session"] = base.HeaderValue{c.session} + } + + if c.sender != nil { + c.sender.AddAuthorization(req) + } + + c.cseq++ + req.Header["CSeq"] = base.HeaderValue{strconv.FormatInt(int64(c.cseq), 10)} + + req.Header["User-Agent"] = base.HeaderValue{"gortsplib"} + + if c.OnRequest != nil { + c.OnRequest(req) + } + + var res base.Response + + err := func() error { + // the only two do() with skipResponses are + // - TEARDOWN -> ctx is already canceled, so this can't be used + // - keepalives -> if ctx is canceled during a keepalive, + // it's better not to stop the request, but wait until teardown + if !skipResponse { + ctxHandlerDone := make(chan struct{}) + defer func() { <-ctxHandlerDone }() + + ctxHandlerTerminate := make(chan struct{}) + defer close(ctxHandlerTerminate) + + go func() { + defer close(ctxHandlerDone) + select { + case <-c.ctx.Done(): + c.nconn.Close() + case <-ctxHandlerTerminate: + } + }() + } + + c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + err := req.Write(c.bw) + if err != nil { + return err + } + + if skipResponse { + return nil + } + + c.nconn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) + + if c.tcpFrameBuffer != nil { + // read the response and ignore interleaved frames in between; + // interleaved frames are sent in two scenarios: + // * when the server is v4lrtspserver, before the PLAY response + // * when the stream is already playing + err = res.ReadIgnoreFrames(c.br, c.tcpFrameBuffer.Next()) + if err != nil { + return err + } + } else { + err = res.Read(c.br) + if err != nil { + return err + } + } + + return nil + }() if err != nil { - conn.Close() return nil, err } - return conn, nil + if c.OnResponse != nil { + c.OnResponse(&res) + } + + // get session from response + if v, ok := res.Header["Session"]; ok { + var sx headers.Session + err := sx.Read(v) + if err != nil { + return nil, liberrors.ErrClientSessionHeaderInvalid{Err: err} + } + c.session = sx.Session + } + + // if required, send request again with authentication + if res.StatusCode == base.StatusUnauthorized && req.URL.User != nil && c.sender == nil { + pass, _ := req.URL.User.Password() + user := req.URL.User.Username() + + sender, err := auth.NewSender(res.Header["WWW-Authenticate"], user, pass) + if err != nil { + return nil, fmt.Errorf("unable to setup authentication: %s", err) + } + c.sender = sender + + return c.do(req, false) + } + + return &res, nil +} + +func (c *Client) doOptions(u *base.URL) (*base.Response, error) { + err := c.checkState(map[clientState]struct{}{ + clientStateInitial: {}, + clientStatePrePlay: {}, + clientStatePreRecord: {}, + }) + if err != nil { + return nil, err + } + + res, err := c.do(&base.Request{ + Method: base.Options, + URL: u, + }, false) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + // since this method is not implemented by every RTSP server, + // return only if status code is not 404 + if res.StatusCode == base.StatusNotFound { + return res, nil + } + return res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage} + } + + c.useGetParameter = func() bool { + pub, ok := res.Header["Public"] + if !ok || len(pub) != 1 { + return false + } + + for _, m := range strings.Split(pub[0], ",") { + if base.Method(strings.Trim(m, " ")) == base.GetParameter { + return true + } + } + return false + }() + + return res, nil +} + +// Options writes an OPTIONS request and reads a response. +func (c *Client) Options(u *base.URL) (*base.Response, error) { + cres := make(chan clientRes) + select { + case c.options <- optionsReq{url: u, res: cres}: + res := <-cres + return res.res, res.err + + case <-c.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (c *Client) doDescribe(u *base.URL) (Tracks, *base.URL, *base.Response, error) { + err := c.checkState(map[clientState]struct{}{ + clientStateInitial: {}, + clientStatePrePlay: {}, + clientStatePreRecord: {}, + }) + if err != nil { + return nil, nil, nil, err + } + + res, err := c.do(&base.Request{ + Method: base.Describe, + URL: u, + Header: base.Header{ + "Accept": base.HeaderValue{"application/sdp"}, + }, + }, false) + if err != nil { + return nil, nil, nil, err + } + + if res.StatusCode != base.StatusOK { + // redirect + if !c.RedirectDisable && + res.StatusCode >= base.StatusMovedPermanently && + res.StatusCode <= base.StatusUseProxy && + len(res.Header["Location"]) == 1 { + c.reset(false) + + u, err := base.ParseURL(res.Header["Location"][0]) + if err != nil { + return nil, nil, nil, err + } + + c.scheme = u.Scheme + c.host = u.Host + + _, err = c.doOptions(u) + if err != nil { + return nil, nil, nil, err + } + + return c.doDescribe(u) + } + + return nil, nil, res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage} + } + + ct, ok := res.Header["Content-Type"] + if !ok || len(ct) != 1 { + return nil, nil, nil, liberrors.ErrClientContentTypeMissing{} + } + + // strip encoding information from Content-Type header + ct = base.HeaderValue{strings.Split(ct[0], ";")[0]} + + if ct[0] != "application/sdp" { + return nil, nil, nil, liberrors.ErrClientContentTypeUnsupported{CT: ct} + } + + baseURL, err := func() (*base.URL, error) { + // use Content-Base + if cb, ok := res.Header["Content-Base"]; ok { + if len(cb) != 1 { + return nil, fmt.Errorf("invalid Content-Base: '%v'", cb) + } + + ret, err := base.ParseURL(cb[0]) + if err != nil { + return nil, fmt.Errorf("invalid Content-Base: '%v'", cb) + } + + // add credentials from URL of request + ret.User = u.User + + return ret, nil + } + + // if not provided, use URL of request + return u, nil + }() + if err != nil { + return nil, nil, nil, err + } + + tracks, err := ReadTracks(res.Body) + if err != nil { + return nil, nil, nil, err + } + + return tracks, baseURL, res, nil +} + +// Describe writes a DESCRIBE request and reads a Response. +func (c *Client) Describe(u *base.URL) (Tracks, *base.URL, *base.Response, error) { + cres := make(chan clientRes) + select { + case c.describe <- describeReq{url: u, res: cres}: + res := <-cres + return res.tracks, res.baseURL, res.res, res.err + + case <-c.ctx.Done(): + return nil, nil, nil, liberrors.ErrClientTerminated{} + } +} + +func (c *Client) doAnnounce(u *base.URL, tracks Tracks) (*base.Response, error) { + err := c.checkState(map[clientState]struct{}{ + clientStateInitial: {}, + }) + if err != nil { + return nil, err + } + + // in case of ANNOUNCE, the base URL doesn't have a trailing slash. + // (tested with ffmpeg and gstreamer) + baseURL := u.Clone() + + for i, t := range tracks { + if !t.hasControlAttribute() { + t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=" + strconv.FormatInt(int64(i), 10), + }) + } + } + + res, err := c.do(&base.Request{ + Method: base.Announce, + URL: u, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: tracks.Write(), + }, false) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, liberrors.ErrClientBadStatusCode{ + Code: res.StatusCode, Message: res.StatusMessage, + } + } + + c.streamBaseURL = baseURL + c.state = clientStatePreRecord + + return res, nil +} + +// Announce writes an ANNOUNCE request and reads a Response. +func (c *Client) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { + cres := make(chan clientRes) + select { + case c.announce <- announceReq{url: u, tracks: tracks, res: cres}: + res := <-cres + return res.res, res.err + + case <-c.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (c *Client) doSetup( + mode headers.TransportMode, + baseURL *base.URL, + track *Track, + rtpPort int, + rtcpPort int) (*base.Response, error) { + err := c.checkState(map[clientState]struct{}{ + clientStateInitial: {}, + clientStatePrePlay: {}, + clientStatePreRecord: {}, + }) + if err != nil { + return nil, err + } + + if (mode == headers.TransportModeRecord && c.state != clientStatePreRecord) || + (mode == headers.TransportModePlay && c.state != clientStatePrePlay && + c.state != clientStateInitial) { + return nil, liberrors.ErrClientCannotReadPublishAtSameTime{} + } + + if c.streamBaseURL != nil && *baseURL != *c.streamBaseURL { + return nil, liberrors.ErrClientCannotSetupTracksDifferentURLs{} + } + + var rtpListener *clientUDPListener + var rtcpListener *clientUDPListener + + // always use TCP if encrypted + if c.scheme == "rtsps" { + v := TransportTCP + c.protocol = &v + } + + proto := func() Transport { + // protocol set by previous Setup() or switchProtocolIfTimeout() + if c.protocol != nil { + return *c.protocol + } + + // protocol set by conf + if c.Transport != nil { + return *c.Transport + } + + // try UDP + return TransportUDP + }() + + th := headers.Transport{ + Mode: &mode, + } + + trackID := len(c.tracks) + + switch proto { + case TransportUDP: + if (rtpPort == 0 && rtcpPort != 0) || + (rtpPort != 0 && rtcpPort == 0) { + return nil, liberrors.ErrClientUDPPortsZero{} + } + + if rtpPort != 0 && rtcpPort != (rtpPort+1) { + return nil, liberrors.ErrClientUDPPortsNotConsecutive{} + } + + var err error + if rtpPort != 0 { + rtpListener, err = newClientUDPListener(c, false, ":"+strconv.FormatInt(int64(rtpPort), 10)) + if err != nil { + return nil, err + } + + rtcpListener, err = newClientUDPListener(c, false, ":"+strconv.FormatInt(int64(rtcpPort), 10)) + if err != nil { + rtpListener.close() + return nil, err + } + } else { + rtpListener, rtcpListener = newClientUDPListenerPair(c) + } + + v1 := headers.TransportDeliveryUnicast + th.Delivery = &v1 + th.Protocol = headers.TransportProtocolUDP + th.ClientPorts = &[2]int{ + rtpListener.port(), + rtcpListener.port(), + } + + case TransportUDPMulticast: + v1 := headers.TransportDeliveryMulticast + th.Delivery = &v1 + th.Protocol = headers.TransportProtocolUDP + + case TransportTCP: + v1 := headers.TransportDeliveryUnicast + th.Delivery = &v1 + th.Protocol = headers.TransportProtocolTCP + th.InterleavedIDs = &[2]int{(trackID * 2), (trackID * 2) + 1} + } + + trackURL, err := track.URL(baseURL) + if err != nil { + if proto == TransportUDP { + rtpListener.close() + rtcpListener.close() + } + return nil, err + } + + res, err := c.do(&base.Request{ + Method: base.Setup, + URL: trackURL, + Header: base.Header{ + "Transport": th.Write(), + }, + }, false) + if err != nil { + if proto == TransportUDP { + rtpListener.close() + rtcpListener.close() + } + return nil, err + } + + if res.StatusCode != base.StatusOK { + if proto == TransportUDP { + rtpListener.close() + rtcpListener.close() + } + + // switch protocol automatically + if res.StatusCode == base.StatusUnsupportedTransport && + c.protocol == nil && + c.Transport == nil { + v := TransportTCP + c.protocol = &v + + return c.doSetup(mode, baseURL, track, 0, 0) + } + + return res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage} + } + + var thRes headers.Transport + err = thRes.Read(res.Header["Transport"]) + if err != nil { + if proto == TransportUDP { + rtpListener.close() + rtcpListener.close() + } + return nil, liberrors.ErrClientTransportHeaderInvalid{Err: err} + } + + switch proto { + case TransportUDP: + if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast { + return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} + } + + if !c.AnyPortEnable { + if thRes.ServerPorts == nil || isAnyPort(thRes.ServerPorts[0]) || isAnyPort(thRes.ServerPorts[1]) { + rtpListener.close() + rtcpListener.close() + return nil, liberrors.ErrClientServerPortsNotProvided{} + } + } + + case TransportUDPMulticast: + if thRes.Delivery == nil || *thRes.Delivery != headers.TransportDeliveryMulticast { + return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} + } + + if thRes.Ports == nil { + return nil, liberrors.ErrClientTransportHeaderNoPorts{} + } + + if thRes.Destination == nil { + return nil, liberrors.ErrClientTransportHeaderNoDestination{} + } + + rtpListener, err = newClientUDPListener(c, true, + thRes.Destination.String()+":"+strconv.FormatInt(int64(thRes.Ports[0]), 10)) + if err != nil { + return nil, err + } + + rtcpListener, err = newClientUDPListener(c, true, + thRes.Destination.String()+":"+strconv.FormatInt(int64(thRes.Ports[1]), 10)) + if err != nil { + rtpListener.close() + return nil, err + } + + case TransportTCP: + if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast { + return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} + } + + if thRes.InterleavedIDs == nil { + return nil, liberrors.ErrClientTransportHeaderNoInterleavedIDs{} + } + + if (thRes.InterleavedIDs[0]%2) != 0 || + (thRes.InterleavedIDs[0]+1) != thRes.InterleavedIDs[1] { + return nil, liberrors.ErrClientTransportHeaderInvalidInterleavedIDs{} + } + + if _, ok := c.tracksByChannel[thRes.InterleavedIDs[0]]; ok { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, liberrors.ErrClientTransportHeaderInterleavedIDsAlreadyUsed{} + } + } + + clockRate, _ := track.ClockRate() + cct := clientTrack{ + track: track, + } + + if mode == headers.TransportModePlay { + c.state = clientStatePrePlay + cct.rtcpReceiver = rtcpreceiver.New(nil, clockRate) + } else { + c.state = clientStatePreRecord + cct.rtcpSender = rtcpsender.New(clockRate) + } + + c.streamBaseURL = baseURL + c.protocol = &proto + + switch proto { + case TransportUDP: + rtpListener.remoteReadIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtpListener.remoteWriteIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone + if thRes.ServerPorts != nil { + rtpListener.remotePort = thRes.ServerPorts[0] + } + rtpListener.trackID = trackID + rtpListener.streamType = StreamTypeRTP + cct.udpRTPListener = rtpListener + + rtcpListener.remoteReadIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtcpListener.remoteWriteIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtcpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone + if thRes.ServerPorts != nil { + rtcpListener.remotePort = thRes.ServerPorts[1] + } + rtcpListener.trackID = trackID + rtcpListener.streamType = StreamTypeRTCP + cct.udpRTCPListener = rtcpListener + + case TransportUDPMulticast: + rtpListener.remoteReadIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtpListener.remoteWriteIP = *thRes.Destination + rtpListener.remoteZone = "" + rtpListener.remotePort = thRes.Ports[0] + rtpListener.trackID = trackID + rtpListener.streamType = StreamTypeRTP + cct.udpRTPListener = rtpListener + + rtcpListener.remoteReadIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtcpListener.remoteWriteIP = *thRes.Destination + rtcpListener.remoteZone = "" + rtcpListener.remotePort = thRes.Ports[1] + rtcpListener.trackID = trackID + rtcpListener.streamType = StreamTypeRTCP + cct.udpRTCPListener = rtcpListener + + case TransportTCP: + if c.tcpFrameBuffer == nil { + c.tcpFrameBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)) + } + + if c.tracksByChannel == nil { + c.tracksByChannel = make(map[int]int) + } + + c.tracksByChannel[thRes.InterleavedIDs[0]] = trackID + + cct.tcpChannel = thRes.InterleavedIDs[0] + } + + if c.tracks == nil { + c.tracks = make(map[int]clientTrack) + } + + c.tracks[trackID] = cct + + return res, nil +} + +// Setup writes a SETUP request and reads a Response. +// rtpPort and rtcpPort are used only if protocol is UDP. +// if rtpPort and rtcpPort are zero, they are chosen automatically. +func (c *Client) Setup( + mode headers.TransportMode, + baseURL *base.URL, + track *Track, + rtpPort int, + rtcpPort int) (*base.Response, error) { + cres := make(chan clientRes) + select { + case c.setup <- setupReq{ + mode: mode, + baseURL: baseURL, + track: track, + rtpPort: rtpPort, + rtcpPort: rtcpPort, + res: cres, + }: + res := <-cres + return res.res, res.err + + case <-c.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (c *Client) doPlay(ra *headers.Range, isSwitchingProtocol bool) (*base.Response, error) { + err := c.checkState(map[clientState]struct{}{ + clientStatePrePlay: {}, + }) + if err != nil { + return nil, err + } + + // open the firewall by sending packets to the counterpart. + // do this before sending the PLAY request. + if *cc.protocol == TransportUDP { + for _, cct := range cc.tracks { + cct.udpRTPListener.write( + []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) + + cct.udpRTCPListener.write( + []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) + } + } + + header := make(base.Header) + + // Range is mandatory in Parrot Streaming Server + if ra == nil { + ra = &headers.Range{ + Value: &headers.RangeNPT{ + Start: headers.RangeNPTTime(0), + }, + } + } + header["Range"] = ra.Write() + + res, err := c.do(&base.Request{ + Method: base.Play, + URL: c.streamBaseURL, + Header: header, + }, false) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, liberrors.ErrClientBadStatusCode{ + Code: res.StatusCode, Message: res.StatusMessage, + } + } + + c.state = clientStatePlay + c.lastRange = ra + + if !isSwitchingProtocol { + // use a temporary callback that is replaces as soon as + // the user calls ReadFrames() + c.readCBSet = make(chan struct{}) + copy := c.readCBSet + c.readCB = func(trackID int, streamType StreamType, payload []byte) { + select { + case <-copy: + case <-c.ctx.Done(): + return + } + c.pullReadCB()(trackID, streamType, payload) + } + } + + c.backgroundStart(isSwitchingProtocol) + + return res, nil +} + +// Play writes a PLAY request and reads a Response. +// This can be called only after Setup(). +func (c *Client) Play(ra *headers.Range) (*base.Response, error) { + cres := make(chan clientRes) + select { + case c.play <- playReq{ra: ra, res: cres}: + res := <-cres + return res.res, res.err + + case <-c.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (c *Client) doRecord() (*base.Response, error) { + err := c.checkState(map[clientState]struct{}{ + clientStatePreRecord: {}, + }) + if err != nil { + return nil, err + } + + res, err := c.do(&base.Request{ + Method: base.Record, + URL: c.streamBaseURL, + }, false) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, liberrors.ErrClientBadStatusCode{ + Code: res.StatusCode, Message: res.StatusMessage, + } + } + + c.state = clientStateRecord + + // when publishing, calling ReadFrames() is not mandatory + // use an empty callback + c.readCB = func(trackID int, streamType StreamType, payload []byte) { + } + + c.backgroundStart(false) + + return nil, nil +} + +// Record writes a RECORD request and reads a Response. +// This can be called only after Announce() and Setup(). +func (c *Client) Record() (*base.Response, error) { + cres := make(chan clientRes) + select { + case c.record <- recordReq{res: cres}: + res := <-cres + return res.res, res.err + + case <-c.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +func (c *Client) doPause() (*base.Response, error) { + err := c.checkState(map[clientState]struct{}{ + clientStatePlay: {}, + clientStateRecord: {}, + }) + if err != nil { + return nil, err + } + + c.backgroundClose(false) + + res, err := c.do(&base.Request{ + Method: base.Pause, + URL: c.streamBaseURL, + }, false) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return res, liberrors.ErrClientBadStatusCode{ + Code: res.StatusCode, Message: res.StatusMessage, + } + } + + switch c.state { + case clientStatePlay: + c.state = clientStatePrePlay + case clientStateRecord: + c.state = clientStatePreRecord + } + + return res, nil +} + +// Pause writes a PAUSE request and reads a Response. +// This can be called only after Play() or Record(). +func (c *Client) Pause() (*base.Response, error) { + cres := make(chan clientRes) + select { + case c.pause <- pauseReq{res: cres}: + res := <-cres + return res.res, res.err + + case <-c.ctx.Done(): + return nil, liberrors.ErrClientTerminated{} + } +} + +// Seek asks the server to re-start the stream from a specific timestamp. +func (c *Client) Seek(ra *headers.Range) (*base.Response, error) { + _, err := c.Pause() + if err != nil { + return nil, err + } + + return c.Play(ra) +} + +// ReadFrames starts reading frames. +func (c *Client) ReadFrames(onFrame func(int, StreamType, []byte)) error { + c.readCBMutex.Lock() + c.readCB = onFrame + c.readCBMutex.Unlock() + + // replace temporary callback with final callback + if c.readCBSet != nil { + close(c.readCBSet) + c.readCBSet = nil + } + + <-c.backgroundDone + return c.backgroundErr +} + +// WritePacketRTP writes a RTP packet. +func (c *Client) WritePacketRTP(trackID int, payload []byte) error { + now := time.Now() + + c.writeMutex.RLock() + defer c.writeMutex.RUnlock() + + if !c.writeFrameAllowed { + return c.backgroundErr + } + + if c.tracks[trackID].rtcpSender != nil { + c.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload) + } + + switch *c.protocol { + case TransportUDP, TransportUDPMulticast: + return c.tracks[trackID].udpRTPListener.write(payload) + + default: // TCP + channel := c.tracks[trackID].tcpChannel + + c.tcpWriteMutex.Lock() + defer c.tcpWriteMutex.Unlock() + + c.nconn.SetWriteDeadline(now.Add(c.WriteTimeout)) + return base.InterleavedFrame{ + Channel: channel, + Payload: payload, + }.Write(c.bw) + } +} + +// WritePacketRTCP writes a RTCP packet. +func (c *Client) WritePacketRTCP(trackID int, payload []byte) error { + now := time.Now() + + c.writeMutex.RLock() + defer c.writeMutex.RUnlock() + + if !c.writeFrameAllowed { + return c.backgroundErr + } + + if c.tracks[trackID].rtcpSender != nil { + c.tracks[trackID].rtcpSender.ProcessPacketRTCP(now, payload) + } + + switch *c.protocol { + case TransportUDP, TransportUDPMulticast: + return c.tracks[trackID].udpRTCPListener.write(payload) + + default: // TCP + channel := c.tracks[trackID].tcpChannel + channel++ + + c.tcpWriteMutex.Lock() + defer c.tcpWriteMutex.Unlock() + + c.nconn.SetWriteDeadline(now.Add(c.WriteTimeout)) + return base.InterleavedFrame{ + Channel: channel, + Payload: payload, + }.Write(c.bw) + } } diff --git a/client_publish_test.go b/client_publish_test.go index 098de68a..7a48de77 100644 --- a/client_publish_test.go +++ b/client_publish_test.go @@ -172,7 +172,7 @@ func TestClientPublishSerial(t *testing.T) { track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}}) require.NoError(t, err) - conn, err := c.DialPublish("rtsp://localhost:8554/teststream", + err = c.DialPublish("rtsp://localhost:8554/teststream", Tracks{track}) require.NoError(t, err) @@ -180,7 +180,7 @@ func TestClientPublishSerial(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { + c.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { require.Equal(t, 0, trackID) require.Equal(t, StreamTypeRTCP, streamType) require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, payload) @@ -188,15 +188,15 @@ func TestClientPublishSerial(t *testing.T) { }) }() - err = conn.WritePacketRTP(0, + err = c.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04}) require.NoError(t, err) <-recvDone - conn.Close() + c.Close() <-done - err = conn.WritePacketRTP(0, + err = c.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04}) require.Error(t, err) }) @@ -316,10 +316,10 @@ func TestClientPublishParallel(t *testing.T) { writerDone := make(chan struct{}) defer func() { <-writerDone }() - conn, err := c.DialPublish("rtsp://localhost:8554/teststream", + err = c.DialPublish("rtsp://localhost:8554/teststream", Tracks{track}) require.NoError(t, err) - defer conn.Close() + defer c.Close() go func() { defer close(writerDone) @@ -328,7 +328,7 @@ func TestClientPublishParallel(t *testing.T) { defer t.Stop() for range t.C { - err := conn.WritePacketRTP(0, + err := c.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04}) if err != nil { return @@ -470,26 +470,26 @@ func TestClientPublishPauseSerial(t *testing.T) { track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}}) require.NoError(t, err) - conn, err := c.DialPublish("rtsp://localhost:8554/teststream", + err = c.DialPublish("rtsp://localhost:8554/teststream", Tracks{track}) require.NoError(t, err) - defer conn.Close() + defer c.Close() - err = conn.WritePacketRTP(0, + err = c.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04}) require.NoError(t, err) - _, err = conn.Pause() + _, err = c.Pause() require.NoError(t, err) - err = conn.WritePacketRTP(0, + err = c.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04}) require.Error(t, err) - _, err = conn.Record() + _, err = c.Record() require.NoError(t, err) - err = conn.WritePacketRTP(0, + err = c.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04}) require.NoError(t, err) }) @@ -607,7 +607,7 @@ func TestClientPublishPauseParallel(t *testing.T) { track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}}) require.NoError(t, err) - conn, err := c.DialPublish("rtsp://localhost:8554/teststream", + err = c.DialPublish("rtsp://localhost:8554/teststream", Tracks{track}) require.NoError(t, err) @@ -619,7 +619,7 @@ func TestClientPublishPauseParallel(t *testing.T) { defer t.Stop() for range t.C { - err := conn.WritePacketRTP(0, + err := c.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04}) if err != nil { return @@ -629,11 +629,11 @@ func TestClientPublishPauseParallel(t *testing.T) { time.Sleep(1 * time.Second) - _, err = conn.Pause() + _, err = c.Pause() require.NoError(t, err) <-writerDone - conn.Close() + c.Close() }) } } @@ -747,12 +747,12 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { c := Client{} - conn, err := c.DialPublish("rtsp://localhost:8554/teststream", + err = c.DialPublish("rtsp://localhost:8554/teststream", Tracks{track}) require.NoError(t, err) - defer conn.Close() + defer c.Close() - err = conn.WritePacketRTP(0, + err = c.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04}) require.NoError(t, err) } @@ -888,10 +888,10 @@ func TestClientPublishRTCPReport(t *testing.T) { track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}}) require.NoError(t, err) - conn, err := c.DialPublish("rtsp://localhost:8554/teststream", + err = c.DialPublish("rtsp://localhost:8554/teststream", Tracks{track}) require.NoError(t, err) - defer conn.Close() + defer c.Close() byts, _ := (&rtp.Packet{ Header: rtp.Header{ @@ -904,11 +904,11 @@ func TestClientPublishRTCPReport(t *testing.T) { }, Payload: []byte{0x01, 0x02, 0x03, 0x04}, }).Marshal() - err = conn.WritePacketRTP(0, byts) + err = c.WritePacketRTP(0, byts) require.NoError(t, err) time.Sleep(1300 * time.Millisecond) - err = conn.WritePacketRTP(0, byts) + err = c.WritePacketRTP(0, byts) require.NoError(t, err) } diff --git a/client_read_test.go b/client_read_test.go index da513137..a48a3e09 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -132,9 +132,9 @@ func TestClientReadTracks(t *testing.T) { c := Client{} - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) - defer conn.Close() + defer c.Close() track1.Media.Attributes = append(track1.Media.Attributes, psdp.Attribute{ Key: "control", @@ -161,7 +161,7 @@ func TestClientReadTracks(t *testing.T) { { Media: track3.Media, }, - }, conn.Tracks()) + }, c.Tracks()) } func TestClientRead(t *testing.T) { @@ -412,14 +412,14 @@ func TestClientRead(t *testing.T) { }(), } - conn, err := c.DialRead(scheme + "://" + listenIP + ":8554/test/stream?param=value") + err = c.DialRead(scheme + "://" + listenIP + ":8554/test/stream?param=value") require.NoError(t, err) done := make(chan struct{}) counter := uint64(0) go func() { defer close(done) - conn.ReadFrames(func(id int, streamType StreamType, payload []byte) { + c.ReadFrames(func(id int, streamType StreamType, payload []byte) { // skip multicast loopback if transport == "multicast" { add := atomic.AddUint64(&counter, 1) @@ -432,16 +432,16 @@ func TestClientRead(t *testing.T) { require.Equal(t, StreamTypeRTP, streamType) require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) - err = conn.WritePacketRTCP(0, []byte{0x05, 0x06, 0x07, 0x08}) + err = c.WritePacketRTCP(0, []byte{0x05, 0x06, 0x07, 0x08}) require.NoError(t, err) }) }() <-frameRecv - conn.Close() + c.Close() <-done - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { }) }) } @@ -568,7 +568,7 @@ func TestClientReadNonStandardFrameSize(t *testing.T) { }() <-frameRecv - conn.Close() + c.Close() <-done } @@ -675,24 +675,24 @@ func TestClientReadPartial(t *testing.T) { u, err := base.ParseURL("rtsp://" + listenIP + ":8554/teststream") require.NoError(t, err) - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) require.NoError(t, err) - defer conn.Close() + defer c.Close() - tracks, baseURL, _, err := conn.Describe(u) + tracks, baseURL, _, err := c.Describe(u) require.NoError(t, err) - _, err = conn.Setup(headers.TransportModePlay, baseURL, tracks[1], 0, 0) + _, err = c.Setup(headers.TransportModePlay, baseURL, tracks[1], 0, 0) require.NoError(t, err) - _, err = conn.Play(nil) + _, err = c.Play(nil) require.NoError(t, err) done := make(chan struct{}) frameRecv := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, streamType StreamType, payload []byte) { + c.ReadFrames(func(id int, streamType StreamType, payload []byte) { require.Equal(t, 0, id) require.Equal(t, StreamTypeRTP, streamType) require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) @@ -805,9 +805,9 @@ func TestClientReadNoContentBase(t *testing.T) { c := Client{} - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) - conn.Close() + c.Close() } func TestClientReadAnyPort(t *testing.T) { @@ -924,20 +924,20 @@ func TestClientReadAnyPort(t *testing.T) { AnyPortEnable: true, } - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) frameRecv := make(chan struct{}) done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { close(frameRecv) }) }() <-frameRecv - conn.Close() + c.Close() <-done }) } @@ -1045,20 +1045,20 @@ func TestClientReadAutomaticProtocol(t *testing.T) { c := Client{} - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) frameRecv := make(chan struct{}) done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { close(frameRecv) }) }() <-frameRecv - conn.Close() + c.Close() <-done }) @@ -1252,20 +1252,20 @@ func TestClientReadAutomaticProtocol(t *testing.T) { ReadTimeout: 1 * time.Second, } - conn, err := c.DialRead("rtsp://myuser:mypass@localhost:8554/teststream") + err = c.DialRead("rtsp://myuser:mypass@localhost:8554/teststream") require.NoError(t, err) frameRecv := make(chan struct{}) done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { close(frameRecv) }) }() <-frameRecv - conn.Close() + c.Close() <-done }) } @@ -1381,21 +1381,21 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) { }(), } - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) frameRecv := make(chan struct{}) done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { require.Equal(t, 0, id) close(frameRecv) }) }() <-frameRecv - conn.Close() + c.Close() <-done } @@ -1530,20 +1530,20 @@ func TestClientReadRedirect(t *testing.T) { c := Client{} - conn, err := c.DialRead("rtsp://localhost:8554/path1") + err = c.DialRead("rtsp://localhost:8554/path1") require.NoError(t, err) frameRecv := make(chan struct{}) done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { close(frameRecv) }) }() <-frameRecv - conn.Close() + c.Close() <-done } @@ -1734,7 +1734,7 @@ func TestClientReadPause(t *testing.T) { }(), } - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) firstFrame := int32(0) @@ -1742,7 +1742,7 @@ func TestClientReadPause(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { if atomic.SwapInt32(&firstFrame, 1) == 0 { close(frameRecv) } @@ -1750,14 +1750,14 @@ func TestClientReadPause(t *testing.T) { }() <-frameRecv - _, err = conn.Pause() + _, err = c.Pause() require.NoError(t, err) <-done - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { }) - _, err = conn.Play(nil) + _, err = c.Play(nil) require.NoError(t, err) firstFrame = int32(0) @@ -1765,7 +1765,7 @@ func TestClientReadPause(t *testing.T) { done = make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { if atomic.SwapInt32(&firstFrame, 1) == 0 { close(frameRecv) } @@ -1773,7 +1773,7 @@ func TestClientReadPause(t *testing.T) { }() <-frameRecv - conn.Close() + c.Close() <-done }) } @@ -1925,7 +1925,7 @@ func TestClientReadRTCPReport(t *testing.T) { receiverReportPeriod: 1 * time.Second, } - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) recv := 0 @@ -1933,7 +1933,7 @@ func TestClientReadRTCPReport(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(id int, typ StreamType, payload []byte) { + c.ReadFrames(func(id int, typ StreamType, payload []byte) { recv++ if recv >= 3 { close(recvDone) @@ -1944,7 +1944,7 @@ func TestClientReadRTCPReport(t *testing.T) { time.Sleep(1300 * time.Millisecond) <-recvDone - conn.Close() + c.Close() <-done } @@ -2092,11 +2092,11 @@ func TestClientReadErrorTimeout(t *testing.T) { ReadTimeout: 1 * time.Second, } - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) - defer conn.Close() + defer c.Close() - err = conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { + err = c.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { }) switch transport { @@ -2223,20 +2223,20 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { }(), } - conn, err := c.DialRead("rtsp://localhost:8554/teststream") + err = c.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) recv := make(chan struct{}) done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { + c.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { close(recv) }) }() <-recv - conn.Close() + c.Close() <-done } @@ -2379,29 +2379,29 @@ func TestClientReadSeek(t *testing.T) { u, err := base.ParseURL("rtsp://localhost:8554/teststream") require.NoError(t, err) - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) require.NoError(t, err) - defer conn.Close() + defer c.Close() - _, err = conn.Options(u) + _, err = c.Options(u) require.NoError(t, err) - tracks, baseURL, _, err := conn.Describe(u) + tracks, baseURL, _, err := c.Describe(u) require.NoError(t, err) for _, track := range tracks { - _, err := conn.Setup(headers.TransportModePlay, baseURL, track, 0, 0) + _, err := c.Setup(headers.TransportModePlay, baseURL, track, 0, 0) require.NoError(t, err) } - _, err = conn.Play(&headers.Range{ + _, err = c.Play(&headers.Range{ Value: &headers.RangeNPT{ Start: headers.RangeNPTTime(5500 * time.Millisecond), }, }) require.NoError(t, err) - _, err = conn.Seek(&headers.Range{ + _, err = c.Seek(&headers.Range{ Value: &headers.RangeNPT{ Start: headers.RangeNPTTime(6400 * time.Millisecond), }, diff --git a/client_test.go b/client_test.go index 34ff4eaa..921790ee 100644 --- a/client_test.go +++ b/client_test.go @@ -90,14 +90,14 @@ func TestClientSession(t *testing.T) { c := Client{} - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) require.NoError(t, err) - defer conn.Close() + defer c.Close() - _, err = conn.Options(u) + _, err = c.Options(u) require.NoError(t, err) - _, _, _, err = conn.Describe(u) + _, _, _, err = c.Describe(u) require.NoError(t, err) } @@ -171,14 +171,14 @@ func TestClientAuth(t *testing.T) { c := Client{} - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) require.NoError(t, err) - defer conn.Close() + defer c.Close() - _, err = conn.Options(u) + _, err = c.Options(u) require.NoError(t, err) - _, _, _, err = conn.Describe(u) + _, _, _, err = c.Describe(u) require.NoError(t, err) } @@ -235,13 +235,13 @@ func TestClientDescribeCharset(t *testing.T) { c := Client{} - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) require.NoError(t, err) - defer conn.Close() + defer c.Close() - _, err = conn.Options(u) + _, err = c.Options(u) require.NoError(t, err) - _, _, _, err = conn.Describe(u) + _, _, _, err = c.Describe(u) require.NoError(t, err) } diff --git a/clientconn.go b/clientconn.go deleted file mode 100644 index 43988438..00000000 --- a/clientconn.go +++ /dev/null @@ -1,1742 +0,0 @@ -package gortsplib - -import ( - "bufio" - "context" - "crypto/tls" - "fmt" - "net" - "sort" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - psdp "github.com/pion/sdp/v3" - - "github.com/aler9/gortsplib/pkg/auth" - "github.com/aler9/gortsplib/pkg/base" - "github.com/aler9/gortsplib/pkg/headers" - "github.com/aler9/gortsplib/pkg/liberrors" - "github.com/aler9/gortsplib/pkg/multibuffer" - "github.com/aler9/gortsplib/pkg/rtcpreceiver" - "github.com/aler9/gortsplib/pkg/rtcpsender" -) - -const ( - clientConnReadBufferSize = 4096 - clientConnWriteBufferSize = 4096 - clientConnCheckStreamPeriod = 1 * time.Second - clientConnUDPKeepalivePeriod = 30 * time.Second -) - -func isErrNOUDPPacketsReceivedRecently(err error) bool { - _, ok := err.(liberrors.ErrClientNoUDPPacketsRecently) - return ok -} - -func isAnyPort(p int) bool { - return p == 0 || p == 1 -} - -type clientConnState int - -const ( - clientConnStateInitial clientConnState = iota - clientConnStatePrePlay - clientConnStatePlay - clientConnStatePreRecord - clientConnStateRecord -) - -type clientConnTrack struct { - track *Track - udpRTPListener *clientUDPListener - udpRTCPListener *clientUDPListener - tcpChannel int - rtcpReceiver *rtcpreceiver.RTCPReceiver - rtcpSender *rtcpsender.RTCPSender -} - -func (s clientConnState) String() string { - switch s { - case clientConnStateInitial: - return "initial" - case clientConnStatePrePlay: - return "prePlay" - case clientConnStatePlay: - return "play" - case clientConnStatePreRecord: - return "preRecord" - case clientConnStateRecord: - return "record" - } - return "unknown" -} - -type optionsReq struct { - url *base.URL - res chan clientRes -} - -type describeReq struct { - url *base.URL - res chan clientRes -} - -type announceReq struct { - url *base.URL - tracks Tracks - res chan clientRes -} - -type setupReq struct { - mode headers.TransportMode - baseURL *base.URL - track *Track - rtpPort int - rtcpPort int - res chan clientRes -} - -type playReq struct { - ra *headers.Range - res chan clientRes -} - -type recordReq struct { - res chan clientRes -} - -type pauseReq struct { - res chan clientRes -} - -type clientRes struct { - tracks Tracks - baseURL *base.URL - res *base.Response - err error -} - -// ClientConn is a client-side RTSP connection. -type ClientConn struct { - c *Client - scheme string - host string - ctx context.Context - ctxCancel func() - state clientConnState - nconn net.Conn - br *bufio.Reader - bw *bufio.Writer - session string - sender *auth.Sender - cseq int - useGetParameter bool - streamBaseURL *base.URL - protocol *Transport - tracks map[int]clientConnTrack - tracksByChannel map[int]int - lastRange *headers.Range - backgroundRunning bool - backgroundErr error - tcpFrameBuffer *multibuffer.MultiBuffer // tcp - tcpWriteMutex sync.Mutex // tcp - readCBMutex sync.RWMutex // read - readCB func(int, StreamType, []byte) // read - writeMutex sync.RWMutex // write - writeFrameAllowed bool // write - - // in - options chan optionsReq - describe chan describeReq - announce chan announceReq - setup chan setupReq - play chan playReq - record chan recordReq - pause chan pauseReq - backgroundTerminate chan struct{} - - // out - backgroundInnerDone chan error - backgroundDone chan struct{} - readCBSet chan struct{} - done chan struct{} -} - -func newClientConn(c *Client, scheme string, host string) (*ClientConn, error) { - // RTSP parameters - if c.ReadTimeout == 0 { - c.ReadTimeout = 10 * time.Second - } - if c.WriteTimeout == 0 { - c.WriteTimeout = 10 * time.Second - } - if c.TLSConfig == nil { - c.TLSConfig = &tls.Config{InsecureSkipVerify: true} - } - if c.InitialUDPReadTimeout == 0 { - c.InitialUDPReadTimeout = 3 * time.Second - } - if c.ReadBufferCount == 0 { - c.ReadBufferCount = 1 - } - if c.ReadBufferSize == 0 { - c.ReadBufferSize = 2048 - } - - // system functions - if c.DialContext == nil { - c.DialContext = (&net.Dialer{}).DialContext - } - if c.ListenPacket == nil { - c.ListenPacket = net.ListenPacket - } - - // private - if c.senderReportPeriod == 0 { - c.senderReportPeriod = 10 * time.Second - } - if c.receiverReportPeriod == 0 { - c.receiverReportPeriod = 10 * time.Second - } - - ctx, ctxCancel := context.WithCancel(context.Background()) - - cc := &ClientConn{ - c: c, - scheme: scheme, - host: host, - ctx: ctx, - ctxCancel: ctxCancel, - options: make(chan optionsReq), - describe: make(chan describeReq), - announce: make(chan announceReq), - setup: make(chan setupReq), - play: make(chan playReq), - record: make(chan recordReq), - pause: make(chan pauseReq), - done: make(chan struct{}), - } - - go cc.run() - - return cc, nil -} - -// Close closes the connection and waits for all its resources to exit. -func (cc *ClientConn) Close() error { - cc.ctxCancel() - <-cc.done - return nil -} - -// Tracks returns all the tracks that the connection is reading or publishing. -func (cc *ClientConn) Tracks() Tracks { - ids := make([]int, len(cc.tracks)) - pos := 0 - for id := range cc.tracks { - ids[pos] = id - pos++ - } - sort.Slice(ids, func(a, b int) bool { - return ids[a] < ids[b] - }) - - var ret Tracks - for _, id := range ids { - ret = append(ret, cc.tracks[id].track) - } - return ret -} - -func (cc *ClientConn) run() { - defer close(cc.done) - -outer: - for { - select { - case req := <-cc.options: - res, err := cc.doOptions(req.url) - req.res <- clientRes{res: res, err: err} - - case req := <-cc.describe: - tracks, baseURL, res, err := cc.doDescribe(req.url) - req.res <- clientRes{tracks: tracks, baseURL: baseURL, res: res, err: err} - - case req := <-cc.announce: - res, err := cc.doAnnounce(req.url, req.tracks) - req.res <- clientRes{res: res, err: err} - - case req := <-cc.setup: - res, err := cc.doSetup(req.mode, req.baseURL, req.track, req.rtpPort, req.rtcpPort) - req.res <- clientRes{res: res, err: err} - - case req := <-cc.play: - res, err := cc.doPlay(req.ra, false) - req.res <- clientRes{res: res, err: err} - - case req := <-cc.record: - res, err := cc.doRecord() - req.res <- clientRes{res: res, err: err} - - case req := <-cc.pause: - res, err := cc.doPause() - req.res <- clientRes{res: res, err: err} - - case err := <-cc.backgroundInnerDone: - cc.backgroundRunning = false - err = cc.switchProtocolIfTimeout(err) - if err != nil { - cc.backgroundErr = err - close(cc.backgroundDone) - - cc.writeMutex.Lock() - cc.writeFrameAllowed = false - cc.writeMutex.Unlock() - } - - case <-cc.ctx.Done(): - break outer - } - } - - cc.ctxCancel() - - cc.doClose(false) -} - -func (cc *ClientConn) doClose(isSwitchingProtocol bool) { - if cc.backgroundRunning { - cc.backgroundClose(isSwitchingProtocol) - } - - if cc.state == clientConnStatePlay || cc.state == clientConnStateRecord { - cc.do(&base.Request{ - Method: base.Teardown, - URL: cc.streamBaseURL, - }, true) - } - - for _, track := range cc.tracks { - if track.udpRTPListener != nil { - track.udpRTPListener.close() - track.udpRTCPListener.close() - } - } - - if cc.nconn != nil { - cc.nconn.Close() - cc.nconn = nil - } -} - -func (cc *ClientConn) reset(isSwitchingProtocol bool) { - cc.doClose(isSwitchingProtocol) - - cc.state = clientConnStateInitial - cc.session = "" - cc.sender = nil - cc.cseq = 0 - cc.useGetParameter = false - cc.streamBaseURL = nil - cc.protocol = nil - cc.tracks = nil - cc.tracksByChannel = nil - cc.tcpFrameBuffer = nil - - if !isSwitchingProtocol { - cc.readCB = nil - } -} - -func (cc *ClientConn) checkState(allowed map[clientConnState]struct{}) error { - if _, ok := allowed[cc.state]; ok { - return nil - } - - allowedList := make([]fmt.Stringer, len(allowed)) - i := 0 - for a := range allowed { - allowedList[i] = a - i++ - } - - return liberrors.ErrClientInvalidState{AllowedList: allowedList, State: cc.state} -} - -func (cc *ClientConn) switchProtocolIfTimeout(err error) error { - if *cc.protocol != TransportUDP || - cc.state != clientConnStatePlay || - !isErrNOUDPPacketsReceivedRecently(err) || - cc.c.Transport != nil { - return err - } - - prevBaseURL := cc.streamBaseURL - oldUseGetParameter := cc.useGetParameter - prevTracks := cc.tracks - - cc.reset(true) - - v := TransportTCP - cc.protocol = &v - cc.useGetParameter = oldUseGetParameter - cc.scheme = prevBaseURL.Scheme - cc.host = prevBaseURL.Host - - for _, track := range prevTracks { - _, err := cc.doSetup(headers.TransportModePlay, prevBaseURL, track.track, 0, 0) - if err != nil { - return err - } - } - - _, err = cc.doPlay(cc.lastRange, true) - if err != nil { - return err - } - - return nil -} - -func (cc *ClientConn) pullReadCB() func(int, StreamType, []byte) { - cc.readCBMutex.RLock() - defer cc.readCBMutex.RUnlock() - return cc.readCB -} - -func (cc *ClientConn) backgroundStart(isSwitchingProtocol bool) { - cc.writeMutex.Lock() - cc.writeFrameAllowed = true - cc.writeMutex.Unlock() - - cc.backgroundRunning = true - cc.backgroundTerminate = make(chan struct{}) - cc.backgroundInnerDone = make(chan error) - - if !isSwitchingProtocol { - cc.backgroundDone = make(chan struct{}) - } - - go cc.runBackground() -} - -func (cc *ClientConn) backgroundClose(isSwitchingProtocol bool) { - close(cc.backgroundTerminate) - err := <-cc.backgroundInnerDone - cc.backgroundRunning = false - - if !isSwitchingProtocol { - cc.backgroundErr = err - close(cc.backgroundDone) - } - - cc.writeMutex.Lock() - cc.writeFrameAllowed = false - cc.writeMutex.Unlock() -} - -func (cc *ClientConn) runBackground() { - cc.backgroundInnerDone <- func() error { - if cc.state == clientConnStatePlay { - if *cc.protocol == TransportUDP || *cc.protocol == TransportUDPMulticast { - return cc.runBackgroundPlayUDP() - } - return cc.runBackgroundPlayTCP() - } - - if *cc.protocol == TransportUDP { - return cc.runBackgroundRecordUDP() - } - return cc.runBackgroundRecordTCP() - }() -} - -func (cc *ClientConn) runBackgroundPlayUDP() error { - for _, cct := range cc.tracks { - cct.udpRTPListener.start() - cct.udpRTCPListener.start() - } - - defer func() { - for _, cct := range cc.tracks { - cct.udpRTPListener.stop() - cct.udpRTCPListener.stop() - } - }() - - // disable deadline - cc.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - var res base.Response - err := res.Read(cc.br) - if err != nil { - readerDone <- err - return - } - } - }() - - reportTicker := time.NewTicker(cc.c.receiverReportPeriod) - defer reportTicker.Stop() - - keepaliveTicker := time.NewTicker(clientConnUDPKeepalivePeriod) - defer keepaliveTicker.Stop() - - checkStreamInitial := true - checkStreamTicker := time.NewTicker(cc.c.InitialUDPReadTimeout) - defer func() { - checkStreamTicker.Stop() - }() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - rr := cct.rtcpReceiver.Report(now) - cc.WritePacketRTCP(trackID, rr) - } - - case <-keepaliveTicker.C: - _, err := cc.do(&base.Request{ - Method: func() base.Method { - if cc.useGetParameter { - return base.GetParameter - } - return base.Options - }(), - URL: cc.streamBaseURL, - }, true) - if err != nil { - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return err - } - - case <-checkStreamTicker.C: - if checkStreamInitial { - // check that at least one packet has been received - inTimeout := func() bool { - for _, cct := range cc.tracks { - lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime) - if lft != 0 { - return false - } - - lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime) - if lft != 0 { - return false - } - } - return true - }() - if inTimeout { - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientNoUDPPacketsRecently{} - } - - checkStreamInitial = false - checkStreamTicker.Stop() - checkStreamTicker = time.NewTicker(clientConnCheckStreamPeriod) - } else { - inTimeout := func() bool { - now := time.Now() - for _, cct := range cc.tracks { - lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0) - if now.Sub(lft) < cc.c.ReadTimeout { - return false - } - - lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0) - if now.Sub(lft) < cc.c.ReadTimeout { - return false - } - } - return true - }() - if inTimeout { - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientUDPTimeout{} - } - } - - case err := <-readerDone: - return err - } - } -} - -func (cc *ClientConn) runBackgroundPlayTCP() error { - // for some reason, SetReadDeadline() must always be called in the same - // goroutine, otherwise Read() freezes. - // therefore, we disable the deadline and perform check with a ticker. - cc.nconn.SetReadDeadline(time.Time{}) - - lastFrameTime := time.Now().Unix() - - readerDone := make(chan error) - go func() { - for { - frame := base.InterleavedFrame{ - Payload: cc.tcpFrameBuffer.Next(), - } - err := frame.Read(cc.br) - if err != nil { - readerDone <- err - return - } - - channel := frame.Channel - streamType := StreamTypeRTP - if (channel % 2) != 0 { - channel-- - streamType = StreamTypeRTCP - } - - trackID, ok := cc.tracksByChannel[channel] - if !ok { - continue - } - - now := time.Now() - atomic.StoreInt64(&lastFrameTime, now.Unix()) - - if streamType == StreamTypeRTP { - cc.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, frame.Payload) - } else { - cc.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, frame.Payload) - } - - cc.pullReadCB()(trackID, streamType, frame.Payload) - } - }() - - reportTicker := time.NewTicker(cc.c.receiverReportPeriod) - defer reportTicker.Stop() - - checkStreamTicker := time.NewTicker(clientConnCheckStreamPeriod) - defer checkStreamTicker.Stop() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - rr := cct.rtcpReceiver.Report(now) - cc.WritePacketRTCP(trackID, rr) - } - - case <-checkStreamTicker.C: - inTimeout := func() bool { - now := time.Now() - lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0) - return now.Sub(lft) >= cc.c.ReadTimeout - }() - if inTimeout { - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return liberrors.ErrClientTCPTimeout{} - } - - case err := <-readerDone: - return err - } - } -} - -func (cc *ClientConn) runBackgroundRecordUDP() error { - for _, cct := range cc.tracks { - cct.udpRTPListener.start() - cct.udpRTCPListener.start() - } - - defer func() { - for _, cct := range cc.tracks { - cct.udpRTPListener.stop() - cct.udpRTCPListener.stop() - } - }() - - // disable deadline - cc.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - var res base.Response - err := res.Read(cc.br) - if err != nil { - readerDone <- err - return - } - } - }() - - reportTicker := time.NewTicker(cc.c.senderReportPeriod) - defer reportTicker.Stop() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - sr := cct.rtcpSender.Report(now) - if sr != nil { - cc.WritePacketRTCP(trackID, sr) - } - } - - case err := <-readerDone: - return err - } - } -} - -func (cc *ClientConn) runBackgroundRecordTCP() error { - // disable deadline - cc.nconn.SetReadDeadline(time.Time{}) - - readerDone := make(chan error) - go func() { - for { - frame := base.InterleavedFrame{ - Payload: cc.tcpFrameBuffer.Next(), - } - err := frame.Read(cc.br) - if err != nil { - readerDone <- err - return - } - - channel := frame.Channel - streamType := StreamTypeRTP - if (channel % 2) != 0 { - channel-- - streamType = StreamTypeRTCP - } - - trackID, ok := cc.tracksByChannel[channel] - if !ok { - continue - } - - cc.pullReadCB()(trackID, streamType, frame.Payload) - } - }() - - reportTicker := time.NewTicker(cc.c.senderReportPeriod) - defer reportTicker.Stop() - - for { - select { - case <-cc.backgroundTerminate: - cc.nconn.SetReadDeadline(time.Now()) - <-readerDone - return fmt.Errorf("terminated") - - case <-reportTicker.C: - now := time.Now() - for trackID, cct := range cc.tracks { - sr := cct.rtcpSender.Report(now) - if sr != nil { - cc.WritePacketRTCP(trackID, sr) - } - } - - case err := <-readerDone: - return err - } - } -} - -func (cc *ClientConn) connOpen() error { - if cc.scheme != "rtsp" && cc.scheme != "rtsps" { - return fmt.Errorf("unsupported scheme '%s'", cc.scheme) - } - - if cc.scheme == "rtsps" && cc.c.Transport != nil && *cc.c.Transport != TransportTCP { - return fmt.Errorf("RTSPS can be used only with TCP") - } - - if !strings.Contains(cc.host, ":") { - cc.host += ":554" - } - - ctx, cancel := context.WithTimeout(cc.ctx, cc.c.ReadTimeout) - defer cancel() - - nconn, err := cc.c.DialContext(ctx, "tcp", cc.host) - if err != nil { - return err - } - - conn := func() net.Conn { - if cc.scheme == "rtsps" { - return tls.Client(nconn, cc.c.TLSConfig) - } - return nconn - }() - - cc.nconn = nconn - cc.br = bufio.NewReaderSize(conn, clientConnReadBufferSize) - cc.bw = bufio.NewWriterSize(conn, clientConnWriteBufferSize) - return nil -} - -func (cc *ClientConn) do(req *base.Request, skipResponse bool) (*base.Response, error) { - if cc.nconn == nil { - err := cc.connOpen() - if err != nil { - return nil, err - } - } - - if req.Header == nil { - req.Header = make(base.Header) - } - - if cc.session != "" { - req.Header["Session"] = base.HeaderValue{cc.session} - } - - if cc.sender != nil { - cc.sender.AddAuthorization(req) - } - - cc.cseq++ - req.Header["CSeq"] = base.HeaderValue{strconv.FormatInt(int64(cc.cseq), 10)} - - req.Header["User-Agent"] = base.HeaderValue{"gortsplib"} - - if cc.c.OnRequest != nil { - cc.c.OnRequest(req) - } - - var res base.Response - - err := func() error { - // the only two do() with skipResponses are - // - TEARDOWN -> ctx is already canceled, so this can't be used - // - keepalives -> if ctx is canceled during a keepalive, - // it's better not to stop the request, but wait until teardown - if !skipResponse { - ctxHandlerDone := make(chan struct{}) - defer func() { <-ctxHandlerDone }() - - ctxHandlerTerminate := make(chan struct{}) - defer close(ctxHandlerTerminate) - - go func() { - defer close(ctxHandlerDone) - select { - case <-cc.ctx.Done(): - cc.nconn.Close() - case <-ctxHandlerTerminate: - } - }() - } - - cc.nconn.SetWriteDeadline(time.Now().Add(cc.c.WriteTimeout)) - err := req.Write(cc.bw) - if err != nil { - return err - } - - if skipResponse { - return nil - } - - cc.nconn.SetReadDeadline(time.Now().Add(cc.c.ReadTimeout)) - - if cc.tcpFrameBuffer != nil { - // read the response and ignore interleaved frames in between; - // interleaved frames are sent in two scenarios: - // * when the server is v4lrtspserver, before the PLAY response - // * when the stream is already playing - err = res.ReadIgnoreFrames(cc.br, cc.tcpFrameBuffer.Next()) - if err != nil { - return err - } - } else { - err = res.Read(cc.br) - if err != nil { - return err - } - } - - return nil - }() - if err != nil { - return nil, err - } - - if cc.c.OnResponse != nil { - cc.c.OnResponse(&res) - } - - // get session from response - if v, ok := res.Header["Session"]; ok { - var sx headers.Session - err := sx.Read(v) - if err != nil { - return nil, liberrors.ErrClientSessionHeaderInvalid{Err: err} - } - cc.session = sx.Session - } - - // if required, send request again with authentication - if res.StatusCode == base.StatusUnauthorized && req.URL.User != nil && cc.sender == nil { - pass, _ := req.URL.User.Password() - user := req.URL.User.Username() - - sender, err := auth.NewSender(res.Header["WWW-Authenticate"], user, pass) - if err != nil { - return nil, fmt.Errorf("unable to setup authentication: %s", err) - } - cc.sender = sender - - return cc.do(req, false) - } - - return &res, nil -} - -func (cc *ClientConn) doOptions(u *base.URL) (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStateInitial: {}, - clientConnStatePrePlay: {}, - clientConnStatePreRecord: {}, - }) - if err != nil { - return nil, err - } - - res, err := cc.do(&base.Request{ - Method: base.Options, - URL: u, - }, false) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - // since this method is not implemented by every RTSP server, - // return only if status code is not 404 - if res.StatusCode == base.StatusNotFound { - return res, nil - } - return res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage} - } - - cc.useGetParameter = func() bool { - pub, ok := res.Header["Public"] - if !ok || len(pub) != 1 { - return false - } - - for _, m := range strings.Split(pub[0], ",") { - if base.Method(strings.Trim(m, " ")) == base.GetParameter { - return true - } - } - return false - }() - - return res, nil -} - -// Options writes an OPTIONS request and reads a response. -func (cc *ClientConn) Options(u *base.URL) (*base.Response, error) { - cres := make(chan clientRes) - select { - case cc.options <- optionsReq{url: u, res: cres}: - res := <-cres - return res.res, res.err - - case <-cc.ctx.Done(): - return nil, liberrors.ErrClientTerminated{} - } -} - -func (cc *ClientConn) doDescribe(u *base.URL) (Tracks, *base.URL, *base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStateInitial: {}, - clientConnStatePrePlay: {}, - clientConnStatePreRecord: {}, - }) - if err != nil { - return nil, nil, nil, err - } - - res, err := cc.do(&base.Request{ - Method: base.Describe, - URL: u, - Header: base.Header{ - "Accept": base.HeaderValue{"application/sdp"}, - }, - }, false) - if err != nil { - return nil, nil, nil, err - } - - if res.StatusCode != base.StatusOK { - // redirect - if !cc.c.RedirectDisable && - res.StatusCode >= base.StatusMovedPermanently && - res.StatusCode <= base.StatusUseProxy && - len(res.Header["Location"]) == 1 { - cc.reset(false) - - u, err := base.ParseURL(res.Header["Location"][0]) - if err != nil { - return nil, nil, nil, err - } - - cc.scheme = u.Scheme - cc.host = u.Host - - _, err = cc.doOptions(u) - if err != nil { - return nil, nil, nil, err - } - - return cc.doDescribe(u) - } - - return nil, nil, res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage} - } - - ct, ok := res.Header["Content-Type"] - if !ok || len(ct) != 1 { - return nil, nil, nil, liberrors.ErrClientContentTypeMissing{} - } - - // strip encoding information from Content-Type header - ct = base.HeaderValue{strings.Split(ct[0], ";")[0]} - - if ct[0] != "application/sdp" { - return nil, nil, nil, liberrors.ErrClientContentTypeUnsupported{CT: ct} - } - - baseURL, err := func() (*base.URL, error) { - // use Content-Base - if cb, ok := res.Header["Content-Base"]; ok { - if len(cb) != 1 { - return nil, fmt.Errorf("invalid Content-Base: '%v'", cb) - } - - ret, err := base.ParseURL(cb[0]) - if err != nil { - return nil, fmt.Errorf("invalid Content-Base: '%v'", cb) - } - - // add credentials from URL of request - ret.User = u.User - - return ret, nil - } - - // if not provided, use URL of request - return u, nil - }() - if err != nil { - return nil, nil, nil, err - } - - tracks, err := ReadTracks(res.Body) - if err != nil { - return nil, nil, nil, err - } - - return tracks, baseURL, res, nil -} - -// Describe writes a DESCRIBE request and reads a Response. -func (cc *ClientConn) Describe(u *base.URL) (Tracks, *base.URL, *base.Response, error) { - cres := make(chan clientRes) - select { - case cc.describe <- describeReq{url: u, res: cres}: - res := <-cres - return res.tracks, res.baseURL, res.res, res.err - - case <-cc.ctx.Done(): - return nil, nil, nil, liberrors.ErrClientTerminated{} - } -} - -func (cc *ClientConn) doAnnounce(u *base.URL, tracks Tracks) (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStateInitial: {}, - }) - if err != nil { - return nil, err - } - - // in case of ANNOUNCE, the base URL doesn't have a trailing slash. - // (tested with ffmpeg and gstreamer) - baseURL := u.Clone() - - for i, t := range tracks { - if !t.hasControlAttribute() { - t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ - Key: "control", - Value: "trackID=" + strconv.FormatInt(int64(i), 10), - }) - } - } - - res, err := cc.do(&base.Request{ - Method: base.Announce, - URL: u, - Header: base.Header{ - "Content-Type": base.HeaderValue{"application/sdp"}, - }, - Body: tracks.Write(), - }, false) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, liberrors.ErrClientBadStatusCode{ - Code: res.StatusCode, Message: res.StatusMessage, - } - } - - cc.streamBaseURL = baseURL - cc.state = clientConnStatePreRecord - - return res, nil -} - -// Announce writes an ANNOUNCE request and reads a Response. -func (cc *ClientConn) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { - cres := make(chan clientRes) - select { - case cc.announce <- announceReq{url: u, tracks: tracks, res: cres}: - res := <-cres - return res.res, res.err - - case <-cc.ctx.Done(): - return nil, liberrors.ErrClientTerminated{} - } -} - -func (cc *ClientConn) doSetup( - mode headers.TransportMode, - baseURL *base.URL, - track *Track, - rtpPort int, - rtcpPort int) (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStateInitial: {}, - clientConnStatePrePlay: {}, - clientConnStatePreRecord: {}, - }) - if err != nil { - return nil, err - } - - if (mode == headers.TransportModeRecord && cc.state != clientConnStatePreRecord) || - (mode == headers.TransportModePlay && cc.state != clientConnStatePrePlay && - cc.state != clientConnStateInitial) { - return nil, liberrors.ErrClientCannotReadPublishAtSameTime{} - } - - if cc.streamBaseURL != nil && *baseURL != *cc.streamBaseURL { - return nil, liberrors.ErrClientCannotSetupTracksDifferentURLs{} - } - - var rtpListener *clientUDPListener - var rtcpListener *clientUDPListener - - // always use TCP if encrypted - if cc.scheme == "rtsps" { - v := TransportTCP - cc.protocol = &v - } - - proto := func() Transport { - // protocol set by previous Setup() or switchProtocolIfTimeout() - if cc.protocol != nil { - return *cc.protocol - } - - // protocol set by conf - if cc.c.Transport != nil { - return *cc.c.Transport - } - - // try UDP - return TransportUDP - }() - - th := headers.Transport{ - Mode: &mode, - } - - trackID := len(cc.tracks) - - switch proto { - case TransportUDP: - if (rtpPort == 0 && rtcpPort != 0) || - (rtpPort != 0 && rtcpPort == 0) { - return nil, liberrors.ErrClientUDPPortsZero{} - } - - if rtpPort != 0 && rtcpPort != (rtpPort+1) { - return nil, liberrors.ErrClientUDPPortsNotConsecutive{} - } - - var err error - if rtpPort != 0 { - rtpListener, err = newClientUDPListener(cc, false, ":"+strconv.FormatInt(int64(rtpPort), 10)) - if err != nil { - return nil, err - } - - rtcpListener, err = newClientUDPListener(cc, false, ":"+strconv.FormatInt(int64(rtcpPort), 10)) - if err != nil { - rtpListener.close() - return nil, err - } - } else { - rtpListener, rtcpListener = newClientUDPListenerPair(cc) - } - - v1 := headers.TransportDeliveryUnicast - th.Delivery = &v1 - th.Protocol = headers.TransportProtocolUDP - th.ClientPorts = &[2]int{ - rtpListener.port(), - rtcpListener.port(), - } - - case TransportUDPMulticast: - v1 := headers.TransportDeliveryMulticast - th.Delivery = &v1 - th.Protocol = headers.TransportProtocolUDP - - case TransportTCP: - v1 := headers.TransportDeliveryUnicast - th.Delivery = &v1 - th.Protocol = headers.TransportProtocolTCP - th.InterleavedIDs = &[2]int{(trackID * 2), (trackID * 2) + 1} - } - - trackURL, err := track.URL(baseURL) - if err != nil { - if proto == TransportUDP { - rtpListener.close() - rtcpListener.close() - } - return nil, err - } - - res, err := cc.do(&base.Request{ - Method: base.Setup, - URL: trackURL, - Header: base.Header{ - "Transport": th.Write(), - }, - }, false) - if err != nil { - if proto == TransportUDP { - rtpListener.close() - rtcpListener.close() - } - return nil, err - } - - if res.StatusCode != base.StatusOK { - if proto == TransportUDP { - rtpListener.close() - rtcpListener.close() - } - - // switch protocol automatically - if res.StatusCode == base.StatusUnsupportedTransport && - cc.protocol == nil && - cc.c.Transport == nil { - v := TransportTCP - cc.protocol = &v - - return cc.doSetup(mode, baseURL, track, 0, 0) - } - - return res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage} - } - - var thRes headers.Transport - err = thRes.Read(res.Header["Transport"]) - if err != nil { - if proto == TransportUDP { - rtpListener.close() - rtcpListener.close() - } - return nil, liberrors.ErrClientTransportHeaderInvalid{Err: err} - } - - switch proto { - case TransportUDP: - if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast { - return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} - } - - if !cc.c.AnyPortEnable { - if thRes.ServerPorts == nil || isAnyPort(thRes.ServerPorts[0]) || isAnyPort(thRes.ServerPorts[1]) { - rtpListener.close() - rtcpListener.close() - return nil, liberrors.ErrClientServerPortsNotProvided{} - } - } - - case TransportUDPMulticast: - if thRes.Delivery == nil || *thRes.Delivery != headers.TransportDeliveryMulticast { - return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} - } - - if thRes.Ports == nil { - return nil, liberrors.ErrClientTransportHeaderNoPorts{} - } - - if thRes.Destination == nil { - return nil, liberrors.ErrClientTransportHeaderNoDestination{} - } - - rtpListener, err = newClientUDPListener(cc, true, - thRes.Destination.String()+":"+strconv.FormatInt(int64(thRes.Ports[0]), 10)) - if err != nil { - return nil, err - } - - rtcpListener, err = newClientUDPListener(cc, true, - thRes.Destination.String()+":"+strconv.FormatInt(int64(thRes.Ports[1]), 10)) - if err != nil { - rtpListener.close() - return nil, err - } - - case TransportTCP: - if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast { - return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} - } - - if thRes.InterleavedIDs == nil { - return nil, liberrors.ErrClientTransportHeaderNoInterleavedIDs{} - } - - if (thRes.InterleavedIDs[0]%2) != 0 || - (thRes.InterleavedIDs[0]+1) != thRes.InterleavedIDs[1] { - return nil, liberrors.ErrClientTransportHeaderInvalidInterleavedIDs{} - } - - if _, ok := cc.tracksByChannel[thRes.InterleavedIDs[0]]; ok { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, liberrors.ErrClientTransportHeaderInterleavedIDsAlreadyUsed{} - } - } - - clockRate, _ := track.ClockRate() - cct := clientConnTrack{ - track: track, - } - - if mode == headers.TransportModePlay { - cc.state = clientConnStatePrePlay - cct.rtcpReceiver = rtcpreceiver.New(nil, clockRate) - } else { - cc.state = clientConnStatePreRecord - cct.rtcpSender = rtcpsender.New(clockRate) - } - - cc.streamBaseURL = baseURL - cc.protocol = &proto - - switch proto { - case TransportUDP: - rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP - rtpListener.remoteWriteIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP - rtpListener.remoteZone = cc.nconn.RemoteAddr().(*net.TCPAddr).Zone - if thRes.ServerPorts != nil { - rtpListener.remotePort = thRes.ServerPorts[0] - } - rtpListener.trackID = trackID - rtpListener.streamType = StreamTypeRTP - cct.udpRTPListener = rtpListener - - rtcpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP - rtcpListener.remoteWriteIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP - rtcpListener.remoteZone = cc.nconn.RemoteAddr().(*net.TCPAddr).Zone - if thRes.ServerPorts != nil { - rtcpListener.remotePort = thRes.ServerPorts[1] - } - rtcpListener.trackID = trackID - rtcpListener.streamType = StreamTypeRTCP - cct.udpRTCPListener = rtcpListener - - case TransportUDPMulticast: - rtpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP - rtpListener.remoteWriteIP = *thRes.Destination - rtpListener.remoteZone = "" - rtpListener.remotePort = thRes.Ports[0] - rtpListener.trackID = trackID - rtpListener.streamType = StreamTypeRTP - cct.udpRTPListener = rtpListener - - rtcpListener.remoteReadIP = cc.nconn.RemoteAddr().(*net.TCPAddr).IP - rtcpListener.remoteWriteIP = *thRes.Destination - rtcpListener.remoteZone = "" - rtcpListener.remotePort = thRes.Ports[1] - rtcpListener.trackID = trackID - rtcpListener.streamType = StreamTypeRTCP - cct.udpRTCPListener = rtcpListener - - case TransportTCP: - if cc.tcpFrameBuffer == nil { - cc.tcpFrameBuffer = multibuffer.New(uint64(cc.c.ReadBufferCount), uint64(cc.c.ReadBufferSize)) - } - - if cc.tracksByChannel == nil { - cc.tracksByChannel = make(map[int]int) - } - - cc.tracksByChannel[thRes.InterleavedIDs[0]] = trackID - - cct.tcpChannel = thRes.InterleavedIDs[0] - } - - if cc.tracks == nil { - cc.tracks = make(map[int]clientConnTrack) - } - - cc.tracks[trackID] = cct - - return res, nil -} - -// Setup writes a SETUP request and reads a Response. -// rtpPort and rtcpPort are used only if protocol is UDP. -// if rtpPort and rtcpPort are zero, they are chosen automatically. -func (cc *ClientConn) Setup( - mode headers.TransportMode, - baseURL *base.URL, - track *Track, - rtpPort int, - rtcpPort int) (*base.Response, error) { - cres := make(chan clientRes) - select { - case cc.setup <- setupReq{ - mode: mode, - baseURL: baseURL, - track: track, - rtpPort: rtpPort, - rtcpPort: rtcpPort, - res: cres, - }: - res := <-cres - return res.res, res.err - - case <-cc.ctx.Done(): - return nil, liberrors.ErrClientTerminated{} - } -} - -func (cc *ClientConn) doPlay(ra *headers.Range, isSwitchingProtocol bool) (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStatePrePlay: {}, - }) - if err != nil { - return nil, err - } - - // open the firewall by sending packets to the counterpart. - // do this before sending the PLAY request. - if *cc.protocol == TransportUDP { - for _, cct := range cc.tracks { - cct.udpRTPListener.write( - []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) - - cct.udpRTCPListener.write( - []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) - } - } - - header := make(base.Header) - - // Range is mandatory in Parrot Streaming Server - if ra == nil { - ra = &headers.Range{ - Value: &headers.RangeNPT{ - Start: headers.RangeNPTTime(0), - }, - } - } - header["Range"] = ra.Write() - - res, err := cc.do(&base.Request{ - Method: base.Play, - URL: cc.streamBaseURL, - Header: header, - }, false) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, liberrors.ErrClientBadStatusCode{ - Code: res.StatusCode, Message: res.StatusMessage, - } - } - - cc.state = clientConnStatePlay - cc.lastRange = ra - - if !isSwitchingProtocol { - // use a temporary callback that is replaces as soon as - // the user calls ReadFrames() - cc.readCBSet = make(chan struct{}) - copy := cc.readCBSet - cc.readCB = func(trackID int, streamType StreamType, payload []byte) { - select { - case <-copy: - case <-cc.ctx.Done(): - return - } - cc.pullReadCB()(trackID, streamType, payload) - } - } - - cc.backgroundStart(isSwitchingProtocol) - - return res, nil -} - -// Play writes a PLAY request and reads a Response. -// This can be called only after Setup(). -func (cc *ClientConn) Play(ra *headers.Range) (*base.Response, error) { - cres := make(chan clientRes) - select { - case cc.play <- playReq{ra: ra, res: cres}: - res := <-cres - return res.res, res.err - - case <-cc.ctx.Done(): - return nil, liberrors.ErrClientTerminated{} - } -} - -func (cc *ClientConn) doRecord() (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStatePreRecord: {}, - }) - if err != nil { - return nil, err - } - - res, err := cc.do(&base.Request{ - Method: base.Record, - URL: cc.streamBaseURL, - }, false) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, liberrors.ErrClientBadStatusCode{ - Code: res.StatusCode, Message: res.StatusMessage, - } - } - - cc.state = clientConnStateRecord - - // when publishing, calling ReadFrames() is not mandatory - // use an empty callback - cc.readCB = func(trackID int, streamType StreamType, payload []byte) { - } - - cc.backgroundStart(false) - - return nil, nil -} - -// Record writes a RECORD request and reads a Response. -// This can be called only after Announce() and Setup(). -func (cc *ClientConn) Record() (*base.Response, error) { - cres := make(chan clientRes) - select { - case cc.record <- recordReq{res: cres}: - res := <-cres - return res.res, res.err - - case <-cc.ctx.Done(): - return nil, liberrors.ErrClientTerminated{} - } -} - -func (cc *ClientConn) doPause() (*base.Response, error) { - err := cc.checkState(map[clientConnState]struct{}{ - clientConnStatePlay: {}, - clientConnStateRecord: {}, - }) - if err != nil { - return nil, err - } - - cc.backgroundClose(false) - - res, err := cc.do(&base.Request{ - Method: base.Pause, - URL: cc.streamBaseURL, - }, false) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return res, liberrors.ErrClientBadStatusCode{ - Code: res.StatusCode, Message: res.StatusMessage, - } - } - - switch cc.state { - case clientConnStatePlay: - cc.state = clientConnStatePrePlay - case clientConnStateRecord: - cc.state = clientConnStatePreRecord - } - - return res, nil -} - -// Pause writes a PAUSE request and reads a Response. -// This can be called only after Play() or Record(). -func (cc *ClientConn) Pause() (*base.Response, error) { - cres := make(chan clientRes) - select { - case cc.pause <- pauseReq{res: cres}: - res := <-cres - return res.res, res.err - - case <-cc.ctx.Done(): - return nil, liberrors.ErrClientTerminated{} - } -} - -// Seek asks the server to re-start the stream from a specific timestamp. -func (cc *ClientConn) Seek(ra *headers.Range) (*base.Response, error) { - _, err := cc.Pause() - if err != nil { - return nil, err - } - - return cc.Play(ra) -} - -// ReadFrames starts reading frames. -func (cc *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) error { - cc.readCBMutex.Lock() - cc.readCB = onFrame - cc.readCBMutex.Unlock() - - // replace temporary callback with final callback - if cc.readCBSet != nil { - close(cc.readCBSet) - cc.readCBSet = nil - } - - <-cc.backgroundDone - return cc.backgroundErr -} - -// WritePacketRTP writes a RTP packet. -func (cc *ClientConn) WritePacketRTP(trackID int, payload []byte) error { - now := time.Now() - - cc.writeMutex.RLock() - defer cc.writeMutex.RUnlock() - - if !cc.writeFrameAllowed { - return cc.backgroundErr - } - - if cc.tracks[trackID].rtcpSender != nil { - cc.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload) - } - - switch *cc.protocol { - case TransportUDP, TransportUDPMulticast: - return cc.tracks[trackID].udpRTPListener.write(payload) - - default: // TCP - channel := cc.tracks[trackID].tcpChannel - - cc.tcpWriteMutex.Lock() - defer cc.tcpWriteMutex.Unlock() - - cc.nconn.SetWriteDeadline(now.Add(cc.c.WriteTimeout)) - return base.InterleavedFrame{ - Channel: channel, - Payload: payload, - }.Write(cc.bw) - } -} - -// WritePacketRTCP writes a RTCP packet. -func (cc *ClientConn) WritePacketRTCP(trackID int, payload []byte) error { - now := time.Now() - - cc.writeMutex.RLock() - defer cc.writeMutex.RUnlock() - - if !cc.writeFrameAllowed { - return cc.backgroundErr - } - - if cc.tracks[trackID].rtcpSender != nil { - cc.tracks[trackID].rtcpSender.ProcessPacketRTCP(now, payload) - } - - switch *cc.protocol { - case TransportUDP, TransportUDPMulticast: - return cc.tracks[trackID].udpRTCPListener.write(payload) - - default: // TCP - channel := cc.tracks[trackID].tcpChannel - channel++ - - cc.tcpWriteMutex.Lock() - defer cc.tcpWriteMutex.Unlock() - - cc.nconn.SetWriteDeadline(now.Add(cc.c.WriteTimeout)) - return base.InterleavedFrame{ - Channel: channel, - Payload: payload, - }.Write(cc.bw) - } -} diff --git a/clientudpl.go b/clientudpl.go index 1b2aab29..81e49822 100644 --- a/clientudpl.go +++ b/clientudpl.go @@ -15,7 +15,7 @@ import ( const ( // use the same buffer size as gstreamer's rtspsrc - clientConnUDPKernelReadBufferSize = 0x80000 + clientUDPKernelReadBufferSize = 0x80000 ) func randUint32() uint32 { @@ -29,7 +29,7 @@ func randIntn(n int) int { } type clientUDPListener struct { - cc *ClientConn + c *Client pc *net.UDPConn remoteReadIP net.IP remoteWriteIP net.IP @@ -46,18 +46,18 @@ type clientUDPListener struct { done chan struct{} } -func newClientUDPListenerPair(cc *ClientConn) (*clientUDPListener, *clientUDPListener) { +func newClientUDPListenerPair(c *Client) (*clientUDPListener, *clientUDPListener) { // choose two consecutive ports in range 65535-10000 // rtp must be even and rtcp odd for { rtpPort := (randIntn((65535-10000)/2) * 2) + 10000 - rtpListener, err := newClientUDPListener(cc, false, ":"+strconv.FormatInt(int64(rtpPort), 10)) + rtpListener, err := newClientUDPListener(c, false, ":"+strconv.FormatInt(int64(rtpPort), 10)) if err != nil { continue } rtcpPort := rtpPort + 1 - rtcpListener, err := newClientUDPListener(cc, false, ":"+strconv.FormatInt(int64(rtcpPort), 10)) + rtcpListener, err := newClientUDPListener(c, false, ":"+strconv.FormatInt(int64(rtcpPort), 10)) if err != nil { rtpListener.close() continue @@ -67,7 +67,7 @@ func newClientUDPListenerPair(cc *ClientConn) (*clientUDPListener, *clientUDPLis } } -func newClientUDPListener(cc *ClientConn, multicast bool, address string) (*clientUDPListener, error) { +func newClientUDPListener(c *Client, multicast bool, address string) (*clientUDPListener, error) { var pc *net.UDPConn if multicast { host, port, err := net.SplitHostPort(address) @@ -75,7 +75,7 @@ func newClientUDPListener(cc *ClientConn, multicast bool, address string) (*clie return nil, err } - tmp, err := cc.c.ListenPacket("udp", "224.0.0.0:"+port) + tmp, err := c.ListenPacket("udp", "224.0.0.0:"+port) if err != nil { return nil, err } @@ -101,22 +101,22 @@ func newClientUDPListener(cc *ClientConn, multicast bool, address string) (*clie pc = tmp.(*net.UDPConn) } else { - tmp, err := cc.c.ListenPacket("udp", address) + tmp, err := c.ListenPacket("udp", address) if err != nil { return nil, err } pc = tmp.(*net.UDPConn) } - err := pc.SetReadBuffer(clientConnUDPKernelReadBufferSize) + err := pc.SetReadBuffer(clientUDPKernelReadBufferSize) if err != nil { return nil, err } return &clientUDPListener{ - cc: cc, + c: c, pc: pc, - frameBuffer: multibuffer.New(uint64(cc.c.ReadBufferCount), uint64(cc.c.ReadBufferSize)), + frameBuffer: multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)), lastFrameTime: func() *int64 { v := int64(0) return &v @@ -150,7 +150,7 @@ func (l *clientUDPListener) stop() { func (l *clientUDPListener) run() { defer close(l.done) - if l.cc.state == clientConnStatePlay { + if l.c.state == clientStatePlay { for { buf := l.frameBuffer.Next() n, addr, err := l.pc.ReadFrom(buf) @@ -168,12 +168,12 @@ func (l *clientUDPListener) run() { atomic.StoreInt64(l.lastFrameTime, now.Unix()) if l.streamType == StreamTypeRTP { - l.cc.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n]) + l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n]) } else { - l.cc.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n]) + l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n]) } - l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n]) + l.c.pullReadCB()(l.trackID, l.streamType, buf[:n]) } } else { // record for { @@ -191,7 +191,7 @@ func (l *clientUDPListener) run() { now := time.Now() atomic.StoreInt64(l.lastFrameTime, now.Unix()) - l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n]) + l.c.pullReadCB()(l.trackID, l.streamType, buf[:n]) } } } @@ -200,7 +200,7 @@ func (l *clientUDPListener) write(buf []byte) error { l.writeMutex.Lock() defer l.writeMutex.Unlock() - l.pc.SetWriteDeadline(time.Now().Add(l.cc.c.WriteTimeout)) + l.pc.SetWriteDeadline(time.Now().Add(l.c.WriteTimeout)) _, err := l.pc.WriteTo(buf, &net.UDPAddr{ IP: l.remoteWriteIP, Zone: l.remoteZone, diff --git a/examples/client-publish-aac/main.go b/examples/client-publish-aac/main.go index 1b95dfab..39f57a61 100644 --- a/examples/client-publish-aac/main.go +++ b/examples/client-publish-aac/main.go @@ -42,12 +42,12 @@ func main() { c := gortsplib.Client{} // connect to the server and start publishing the track - conn, err := c.DialPublish("rtsp://localhost:8554/mystream", + err = c.DialPublish("rtsp://localhost:8554/mystream", gortsplib.Tracks{track}) if err != nil { panic(err) } - defer conn.Close() + defer c.Close() buf = make([]byte, 2048) for { @@ -58,7 +58,7 @@ func main() { } // route RTP packets to the server - err = conn.WritePacketRTP(0, buf[:n]) + err = c.WritePacketRTP(0, buf[:n]) if err != nil { panic(err) } diff --git a/examples/client-publish-h264/main.go b/examples/client-publish-h264/main.go index f60fcb54..c8261ea8 100644 --- a/examples/client-publish-h264/main.go +++ b/examples/client-publish-h264/main.go @@ -43,12 +43,12 @@ func main() { c := gortsplib.Client{} // connect to the server and start publishing the track - conn, err := c.DialPublish("rtsp://localhost:8554/mystream", + err = c.DialPublish("rtsp://localhost:8554/mystream", gortsplib.Tracks{track}) if err != nil { panic(err) } - defer conn.Close() + defer c.Close() buf := make([]byte, 2048) for { @@ -59,7 +59,7 @@ func main() { } // route RTP packets to the server - err = conn.WritePacketRTP(0, buf[:n]) + err = c.WritePacketRTP(0, buf[:n]) if err != nil { panic(err) } diff --git a/examples/client-publish-options/main.go b/examples/client-publish-options/main.go index 4e5c3c2c..be6d566f 100644 --- a/examples/client-publish-options/main.go +++ b/examples/client-publish-options/main.go @@ -52,12 +52,12 @@ func main() { } // connect to the server and start publishing the track - conn, err := c.DialPublish("rtsp://localhost:8554/mystream", + err = c.DialPublish("rtsp://localhost:8554/mystream", gortsplib.Tracks{track}) if err != nil { panic(err) } - defer conn.Close() + defer c.Close() buf := make([]byte, 2048) for { @@ -68,7 +68,7 @@ func main() { } // route RTP packets to the server - err = conn.WritePacketRTP(0, buf[:n]) + err = c.WritePacketRTP(0, buf[:n]) if err != nil { panic(err) } diff --git a/examples/client-publish-opus/main.go b/examples/client-publish-opus/main.go index 68747626..020476fd 100644 --- a/examples/client-publish-opus/main.go +++ b/examples/client-publish-opus/main.go @@ -42,12 +42,12 @@ func main() { c := gortsplib.Client{} // connect to the server and start publishing the track - conn, err := c.DialPublish("rtsp://localhost:8554/mystream", + err = c.DialPublish("rtsp://localhost:8554/mystream", gortsplib.Tracks{track}) if err != nil { panic(err) } - defer conn.Close() + defer c.Close() buf = make([]byte, 2048) for { @@ -58,7 +58,7 @@ func main() { } // route RTP packets to the server - err = conn.WritePacketRTP(0, buf[:n]) + err = c.WritePacketRTP(0, buf[:n]) if err != nil { panic(err) } diff --git a/examples/client-publish-pause/main.go b/examples/client-publish-pause/main.go index aa92065c..0719fc22 100644 --- a/examples/client-publish-pause/main.go +++ b/examples/client-publish-pause/main.go @@ -45,12 +45,12 @@ func main() { c := gortsplib.Client{} // connect to the server and start publishing the track - conn, err := c.DialPublish("rtsp://localhost:8554/mystream", + err = c.DialPublish("rtsp://localhost:8554/mystream", gortsplib.Tracks{track}) if err != nil { panic(err) } - defer conn.Close() + defer c.Close() for { writerDone := make(chan struct{}) @@ -66,7 +66,7 @@ func main() { } // route RTP packets to the server - err = conn.WritePacketRTP(0, buf[:n]) + err = c.WritePacketRTP(0, buf[:n]) if err != nil { break } @@ -77,7 +77,7 @@ func main() { time.Sleep(5 * time.Second) // pause - _, err := conn.Pause() + _, err := c.Pause() if err != nil { panic(err) } @@ -89,7 +89,7 @@ func main() { time.Sleep(5 * time.Second) // record again - _, err = conn.Record() + _, err = c.Record() if err != nil { panic(err) } diff --git a/examples/client-query/main.go b/examples/client-query/main.go index 9c32ef1e..2cad2946 100644 --- a/examples/client-query/main.go +++ b/examples/client-query/main.go @@ -19,18 +19,18 @@ func main() { c := gortsplib.Client{} - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) if err != nil { panic(err) } - defer conn.Close() + defer c.Close() - _, err = conn.Options(u) + _, err = c.Options(u) if err != nil { panic(err) } - tracks, _, _, err := conn.Describe(u) + tracks, _, _, err := c.Describe(u) if err != nil { panic(err) } diff --git a/examples/client-read-h264-save-to-disk/main.go b/examples/client-read-h264-save-to-disk/main.go index 6912f9ac..8c281cf7 100644 --- a/examples/client-read-h264-save-to-disk/main.go +++ b/examples/client-read-h264-save-to-disk/main.go @@ -28,16 +28,16 @@ func main() { c := gortsplib.Client{} // connect to the server and start reading all tracks - conn, err := c.DialRead(inputStream) + err := c.DialRead(inputStream) if err != nil { panic(err) } - defer conn.Close() + defer c.Close() // find the H264 track var h264TrackID int = -1 var h264Conf *gortsplib.TrackConfigH264 - for i, track := range conn.Tracks() { + for i, track := range c.Tracks() { if track.IsH264() { h264TrackID = i h264Conf, err = track.ExtractConfigH264() @@ -74,7 +74,7 @@ func main() { mux.SetPCRPID(256) // read packets - err = conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { + err = c.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { if trackID != h264TrackID { return } diff --git a/examples/client-read-h264/main.go b/examples/client-read-h264/main.go index 071dd2dd..3a48b847 100644 --- a/examples/client-read-h264/main.go +++ b/examples/client-read-h264/main.go @@ -17,15 +17,15 @@ func main() { c := gortsplib.Client{} // connect to the server and start reading all tracks - conn, err := c.DialRead("rtsp://localhost:8554/mystream") + err := c.DialRead("rtsp://localhost:8554/mystream") if err != nil { panic(err) } - defer conn.Close() + defer c.Close() // find the H264 track h264Track := func() int { - for i, track := range conn.Tracks() { + for i, track := range c.Tracks() { if track.IsH264() { return i } @@ -41,7 +41,7 @@ func main() { dec := rtph264.NewDecoder() // read packets - err = conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { + err = c.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { if streamType != gortsplib.StreamTypeRTP { return } diff --git a/examples/client-read-options/main.go b/examples/client-read-options/main.go index 1035092b..a094d555 100644 --- a/examples/client-read-options/main.go +++ b/examples/client-read-options/main.go @@ -23,14 +23,14 @@ func main() { } // connect to the server and start reading all tracks - conn, err := c.DialRead("rtsp://localhost:8554/mystream") + err := c.DialRead("rtsp://localhost:8554/mystream") if err != nil { panic(err) } - defer conn.Close() + defer c.Close() // read packets - err = conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { + err = c.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { fmt.Printf("packet from track %d, type %v, size %d\n", trackID, streamType, len(payload)) }) panic(err) diff --git a/examples/client-read-partial/main.go b/examples/client-read-partial/main.go index ec41c140..a8cf7a95 100644 --- a/examples/client-read-partial/main.go +++ b/examples/client-read-partial/main.go @@ -21,18 +21,18 @@ func main() { c := gortsplib.Client{} - conn, err := c.Dial(u.Scheme, u.Host) + err = c.Dial(u.Scheme, u.Host) if err != nil { panic(err) } - defer conn.Close() + defer c.Close() - _, err = conn.Options(u) + _, err = c.Options(u) if err != nil { panic(err) } - tracks, baseURL, _, err := conn.Describe(u) + tracks, baseURL, _, err := c.Describe(u) if err != nil { panic(err) } @@ -40,7 +40,7 @@ func main() { // start reading only video tracks, skipping audio or application tracks for _, t := range tracks { if t.Media.MediaName.Media == "video" { - _, err := conn.Setup(headers.TransportModePlay, baseURL, t, 0, 0) + _, err := c.Setup(headers.TransportModePlay, baseURL, t, 0, 0) if err != nil { panic(err) } @@ -48,13 +48,13 @@ func main() { } // play setupped tracks - _, err = conn.Play(nil) + _, err = c.Play(nil) if err != nil { panic(err) } // read packets - err = conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { + err = c.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { fmt.Printf("packet from track %d, type %v, size %d\n", trackID, streamType, len(payload)) }) panic(err) diff --git a/examples/client-read-pause/main.go b/examples/client-read-pause/main.go index d1640ad4..c3e7f093 100644 --- a/examples/client-read-pause/main.go +++ b/examples/client-read-pause/main.go @@ -17,18 +17,18 @@ func main() { c := gortsplib.Client{} // connect to the server and start reading all tracks - conn, err := c.DialRead("rtsp://localhost:8554/mystream") + err := c.DialRead("rtsp://localhost:8554/mystream") if err != nil { panic(err) } - defer conn.Close() + defer c.Close() for { // read packets done := make(chan struct{}) go func() { defer close(done) - conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { + c.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { fmt.Printf("packet from track %d, type %v, size %d\n", trackID, streamType, len(payload)) }) }() @@ -37,7 +37,7 @@ func main() { time.Sleep(5 * time.Second) // pause - _, err := conn.Pause() + _, err := c.Pause() if err != nil { panic(err) } @@ -49,7 +49,7 @@ func main() { time.Sleep(5 * time.Second) // play again - _, err = conn.Play(nil) + _, err = c.Play(nil) if err != nil { panic(err) } diff --git a/examples/client-read/main.go b/examples/client-read/main.go index ee9574cb..58dd16a1 100644 --- a/examples/client-read/main.go +++ b/examples/client-read/main.go @@ -13,14 +13,14 @@ func main() { c := gortsplib.Client{} // connect to the server and start reading all tracks - conn, err := c.DialRead("rtsp://localhost:8554/mystream") + err := c.DialRead("rtsp://localhost:8554/mystream") if err != nil { panic(err) } - defer conn.Close() + defer c.Close() // read packets - err = conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { + err = c.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { fmt.Printf("packet from track %d, type %v, size %d\n", trackID, streamType, len(payload)) }) panic(err)