/* Package gortsplib is a RTSP library for the Go programming language. Examples are available at https://github.com/bluenviron/gortsplib/tree/main/examples */ package gortsplib import ( "bufio" "context" "crypto/rand" "crypto/tls" "fmt" "log" "net" "slices" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/bluenviron/gortsplib/v5/internal/asyncprocessor" "github.com/bluenviron/gortsplib/v5/pkg/auth" "github.com/bluenviron/gortsplib/v5/pkg/base" "github.com/bluenviron/gortsplib/v5/pkg/bytecounter" "github.com/bluenviron/gortsplib/v5/pkg/conn" "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/bluenviron/gortsplib/v5/pkg/headers" "github.com/bluenviron/gortsplib/v5/pkg/liberrors" "github.com/bluenviron/gortsplib/v5/pkg/mikey" "github.com/bluenviron/gortsplib/v5/pkg/rtpreceiver" "github.com/bluenviron/gortsplib/v5/pkg/rtpsender" "github.com/bluenviron/gortsplib/v5/pkg/rtptime" "github.com/bluenviron/gortsplib/v5/pkg/sdp" ) const ( clientUserAgent = "gortsplib" ) func generateLocalSSRCs(existing []uint32, formats []format.Format) (map[uint8]uint32, error) { ret := make(map[uint8]uint32) for _, forma := range formats { for { ssrc, err := randUint32() if err != nil { return nil, err } if ssrc != 0 && !slices.Contains(existing, ssrc) { existing = append(existing, ssrc) ret[forma.PayloadType()] = ssrc break } } } return ret, nil } func ssrcsMapToList(m map[uint8]uint32) []uint32 { ret := make([]uint32, len(m)) n := 0 for _, el := range m { ret[n] = el n++ } return ret } func clientExtractExistingSSRCs(setuppedMedias map[*description.Media]*clientMedia) []uint32 { var ret []uint32 for _, media := range setuppedMedias { for _, forma := range media.formats { ret = append(ret, forma.localSSRC) } } return ret } // convert an URL into an address, in particular: // * add default port // * handle IPv6 with or without square brackets. 
// Adapted from net/http: // https://cs.opensource.google/go/go/+/refs/tags/go1.20.5:src/net/http/transport.go;l=2747 func canonicalAddr(u *base.URL) string { addr := u.Hostname() port := u.Port() if port == "" { if u.Scheme == "rtsp" { port = "554" } else { // rtsps port = "322" } } return net.JoinHostPort(addr, port) } func isAnyPort(p int) bool { return p == 0 || p == 1 } func findBaseURL(sd *sdp.SessionDescription, res *base.Response, u *base.URL) (*base.URL, error) { // use global control attribute if control, ok := sd.Attribute("control"); ok && control != "*" { ret, err := base.ParseURL(control) if err != nil { return nil, fmt.Errorf("invalid control attribute: '%v'", control) } // add credentials ret.User = u.User return ret, nil } // use Content-Base if cb, ok := res.Header["Content-Base"]; ok { if len(cb) != 1 { return nil, fmt.Errorf("invalid Content-Base: '%v'", cb) } if strings.HasPrefix(cb[0], "/") { // parse as a relative path ret, err := base.ParseURL(u.Scheme + "://" + u.Host + cb[0]) if err != nil { return nil, fmt.Errorf("invalid Content-Base: '%v'", cb) } // add credentials ret.User = u.User return ret, nil } ret, err := base.ParseURL(cb[0]) if err != nil { return nil, fmt.Errorf("invalid Content-Base: '%v'", cb) } // add credentials ret.User = u.User return ret, nil } // use URL of request return u, nil } type clientAnnounceDataFormat struct { localSSRC uint32 } type clientAnnounceDataMedia struct { srtpOutKey []byte formats map[uint8]*clientAnnounceDataFormat } func announceDataPickLocalSSRC( am *clientAnnounceDataMedia, data map[*description.Media]*clientAnnounceDataMedia, ) (uint32, error) { var existing []uint32 //nolint:prealloc for _, am := range data { for _, af := range am.formats { existing = append(existing, af.localSSRC) } } for _, af := range am.formats { existing = append(existing, af.localSSRC) } for { ssrc, err := randUint32() if err != nil { return 0, err } if ssrc != 0 && !slices.Contains(existing, ssrc) { return ssrc, nil } } 
} func generateAnnounceData( desc *description.Session, secure bool, ) (map[*description.Media]*clientAnnounceDataMedia, error) { data := make(map[*description.Media]*clientAnnounceDataMedia) for _, medi := range desc.Medias { am := &clientAnnounceDataMedia{ formats: make(map[uint8]*clientAnnounceDataFormat), } for _, format := range medi.Formats { dataFormat := &clientAnnounceDataFormat{} var err error dataFormat.localSSRC, err = announceDataPickLocalSSRC(am, data) if err != nil { return nil, err } am.formats[format.PayloadType()] = dataFormat } if secure { am.srtpOutKey = make([]byte, srtpKeyLength) _, err := rand.Read(am.srtpOutKey) if err != nil { return nil, err } } data[medi] = am } return data, nil } func prepareForAnnounce( desc *description.Session, announceData map[*description.Media]*clientAnnounceDataMedia, secure bool, ) error { for i, m := range desc.Medias { m.Control = "trackID=" + strconv.FormatInt(int64(i), 10) if secure { m.Profile = headers.TransportProfileSAVP announceDataMedia := announceData[m] ssrcs := make([]uint32, len(m.Formats)) n := 0 for _, af := range announceDataMedia.formats { ssrcs[n] = af.localSSRC n++ } // create a temporary Context. // Context is needed to extract ROC, but since client has not started streaming, // ROC is always zero, therefore a temporary Context can be used. 
srtpCtx := &wrappedSRTPContext{ key: announceDataMedia.srtpOutKey, ssrcs: ssrcs, } err := srtpCtx.initialize() if err != nil { return err } mikeyMsg, err := mikeyGenerate(srtpCtx) if err != nil { return err } m.KeyMgmtMikey = mikeyMsg } else { m.Profile = headers.TransportProfileAVP } } return nil } func supportsGetParameter(header base.Header) bool { pub, ok := header["Public"] if !ok || len(pub) != 1 { return false } for _, m := range strings.Split(pub[0], ",") { if base.Method(strings.Trim(m, " ")) == base.GetParameter { return true } } return false } func interfaceOfConn(c net.Conn) (*net.Interface, error) { var localIP net.IP switch addr := c.LocalAddr().(type) { case *net.TCPAddr: localIP = addr.IP case *net.UDPAddr: localIP = addr.IP default: return nil, fmt.Errorf("unknown connection type: %T", c.LocalAddr()) } interfaces, err := net.Interfaces() if err != nil { return nil, err } for _, iface := range interfaces { var addrs []net.Addr addrs, err = iface.Addrs() if err != nil { continue } for _, addr := range addrs { var ip net.IP switch v := addr.(type) { case *net.IPNet: ip = v.IP case *net.IPAddr: ip = v.IP } if ip != nil && ip.Equal(localIP) { return &iface, nil } } } return nil, fmt.Errorf("no interface found for IP %s", localIP) } type clientState int const ( clientStateInitial clientState = iota clientStatePrePlay clientStatePlay clientStatePreRecord clientStateRecord ) func (s clientState) String() string { switch s { case clientStateInitial: return "initial" case clientStatePrePlay: return "prePlay" case clientStatePlay: return "play" case clientStatePreRecord: return "preRecord" case clientStateRecord: return "record" } return "unknown" } type optionsReq struct { url *base.URL res chan clientRes } type describeReq struct { url *base.URL res chan clientRes } type announceReq struct { url *base.URL desc *description.Session res chan clientRes } type setupReq struct { baseURL *base.URL media *description.Media rtpPort int rtcpPort int res chan 
clientRes } type playReq struct { ra *headers.Range res chan clientRes } type recordReq struct { res chan clientRes } type pauseReq struct { res chan clientRes } type clientRes struct { sd *description.Session // describe only res *base.Response err error } // ClientOnRequestFunc is the prototype of Client.OnRequest. type ClientOnRequestFunc func(*base.Request) // ClientOnResponseFunc is the prototype of Client.OnResponse. type ClientOnResponseFunc func(*base.Response) // ClientOnTransportSwitchFunc is the prototype of Client.OnTransportSwitch. type ClientOnTransportSwitchFunc func(err error) // ClientOnPacketsLostFunc is the prototype of Client.OnPacketsLost. type ClientOnPacketsLostFunc func(lost uint64) // ClientOnDecodeErrorFunc is the prototype of Client.OnDecodeError. type ClientOnDecodeErrorFunc func(err error) // OnPacketRTPFunc is the prototype of the callback passed to OnPacketRTP(). type OnPacketRTPFunc func(*rtp.Packet) // OnPacketRTPAnyFunc is the prototype of the callback passed to OnPacketRTP(Any). type OnPacketRTPAnyFunc func(*description.Media, format.Format, *rtp.Packet) // OnPacketRTCPFunc is the prototype of the callback passed to OnPacketRTCP(). type OnPacketRTCPFunc func(rtcp.Packet) // OnPacketRTCPAnyFunc is the prototype of the callback passed to OnPacketRTCPAny(). type OnPacketRTCPAnyFunc func(*description.Media, rtcp.Packet) // Client is a RTSP client. type Client struct { // // Target // // Scheme. Either "rtsp" or "rtsps". Scheme string // Host and port. Host string // // RTSP parameters (all optional) // // timeout of read operations. // It defaults to 10 seconds. ReadTimeout time.Duration // timeout of write operations. // It defaults to 10 seconds. WriteTimeout time.Duration // a TLS configuration to connect to TLS/RTSPS servers. // It defaults to nil. TLSConfig *tls.Config // tunneling method. Tunnel Tunnel // transport protocol (UDP, Multicast or TCP). // If nil, it is chosen automatically (first UDP, then, if it fails, TCP). 
// It defaults to nil. Protocol *Protocol // enable communication with servers which don't provide UDP server ports // or use different server ports than the announced ones. // This can be a security issue. // It defaults to false. AnyPortEnable bool // If the client is reading with UDP, it must receive // at least a packet within this timeout, otherwise it switches to TCP. // It defaults to 3 seconds. InitialUDPReadTimeout time.Duration // Size of the UDP read buffer. // This can be increased to reduce packet losses. // It defaults to the operating system default value. UDPReadBufferSize int // Size of the queue of outgoing packets. // It defaults to 256. WriteQueueSize int // maximum size of outgoing RTP / RTCP packets. // This must be less than the UDP MTU (1472 bytes). // It defaults to 1472. MaxPacketSize int // user agent header. // It defaults to "gortsplib" UserAgent string // disable automatic RTCP sender reports. DisableRTCPSenderReports bool // explicitly request back channels to the server. RequestBackChannels bool // // system functions (all optional) // // function used to initialize the TCP client. // It defaults to (&net.Dialer{}).DialContext. DialContext func(ctx context.Context, network, address string) (net.Conn, error) // function used to initialize UDP listeners. // It defaults to net.ListenPacket. ListenPacket func(network, address string) (net.PacketConn, error) // // callbacks (all optional) // // called when sending a request to the server. OnRequest ClientOnRequestFunc // called when receiving a response from the server. OnResponse ClientOnResponseFunc // called when receiving a request from the server. OnServerRequest ClientOnRequestFunc // called when sending a response to the server. OnServerResponse ClientOnResponseFunc // called when the transport protocol changes. OnTransportSwitch ClientOnTransportSwitchFunc // called when the client detects lost packets. 
OnPacketsLost ClientOnPacketsLostFunc // called when a non-fatal decode error occurs. OnDecodeError ClientOnDecodeErrorFunc // // private // timeNow func() time.Time senderReportPeriod time.Duration receiverReportPeriod time.Duration checkTimeoutPeriod time.Duration ctx context.Context ctxCancel func() propsMutex sync.RWMutex state clientState nconn net.Conn conn *conn.Conn session string sender *auth.Sender cseq int optionsSent bool useGetParameter bool lastDescribeURL *base.URL lastDescribeDesc *description.Session baseURL *base.URL announceData map[*description.Media]*clientAnnounceDataMedia // record setuppedTransport *SessionTransport backChannelSetupped bool stdChannelSetupped bool setuppedMedias map[*description.Media]*clientMedia tcpCallbackByChannel map[int]readFunc lastRange *headers.Range checkTimeoutTimer *time.Timer checkTimeoutInitial bool tcpLastFrameTime *int64 keepAlivePeriod time.Duration keepAliveTimer *time.Timer closeError error writerMutex sync.RWMutex writer *asyncprocessor.Processor reader *clientReader timeDecoder *rtptime.GlobalDecoder mustClose bool tcpFrame *base.InterleavedFrame tcpBuffer []byte bytesReceived *uint64 bytesSent *uint64 // in chOptions chan optionsReq chDescribe chan describeReq chAnnounce chan announceReq chSetup chan setupReq chPlay chan playReq chRecord chan recordReq chPause chan pauseReq chResponse chan *base.Response chRequest chan *base.Request chReadError chan error chWriterError chan error // out done chan struct{} } // Start initializes the connection to a server. 
func (c *Client) Start() error { // RTSP parameters if c.ReadTimeout == 0 { c.ReadTimeout = 10 * time.Second } if c.WriteTimeout == 0 { c.WriteTimeout = 10 * time.Second } if c.InitialUDPReadTimeout == 0 { c.InitialUDPReadTimeout = 3 * time.Second } if c.WriteQueueSize == 0 { c.WriteQueueSize = 256 } else if (c.WriteQueueSize & (c.WriteQueueSize - 1)) != 0 { return fmt.Errorf("WriteQueueSize must be a power of two") } if c.MaxPacketSize == 0 { c.MaxPacketSize = udpMaxPayloadSize } else if c.MaxPacketSize > udpMaxPayloadSize { return fmt.Errorf("MaxPacketSize must be less than %d", udpMaxPayloadSize) } if c.UserAgent == "" { c.UserAgent = clientUserAgent } // system functions if c.DialContext == nil { c.DialContext = (&net.Dialer{}).DialContext } if c.ListenPacket == nil { c.ListenPacket = net.ListenPacket } // callbacks if c.OnRequest == nil { c.OnRequest = func(*base.Request) { } } if c.OnResponse == nil { c.OnResponse = func(*base.Response) { } } if c.OnServerRequest == nil { c.OnServerRequest = func(*base.Request) { } } if c.OnServerResponse == nil { c.OnServerResponse = func(*base.Response) { } } if c.OnTransportSwitch == nil { c.OnTransportSwitch = func(err error) { log.Println(err.Error()) } } if c.OnPacketsLost == nil { c.OnPacketsLost = func(lost uint64) { log.Printf("%d RTP %s lost", lost, func() string { if lost == 1 { return "packet" } return "packets" }()) } } if c.OnDecodeError == nil { c.OnDecodeError = func(err error) { log.Println(err.Error()) } } // private if c.timeNow == nil { c.timeNow = time.Now } if c.senderReportPeriod == 0 { c.senderReportPeriod = 10 * time.Second } if c.receiverReportPeriod == 0 { // some cameras require a maximum of 5secs between keepalives c.receiverReportPeriod = 5 * time.Second } if c.checkTimeoutPeriod == 0 { c.checkTimeoutPeriod = 1 * time.Second } ctx, ctxCancel := context.WithCancel(context.Background()) c.ctx = ctx c.ctxCancel = ctxCancel c.checkTimeoutTimer = emptyTimer() c.keepAlivePeriod = 30 * time.Second 
c.keepAliveTimer = emptyTimer() c.bytesReceived = new(uint64) c.bytesSent = new(uint64) c.chOptions = make(chan optionsReq) c.chDescribe = make(chan describeReq) c.chAnnounce = make(chan announceReq) c.chSetup = make(chan setupReq) c.chPlay = make(chan playReq) c.chRecord = make(chan recordReq) c.chPause = make(chan pauseReq) c.chResponse = make(chan *base.Response) c.chRequest = make(chan *base.Request) c.chReadError = make(chan error) c.chWriterError = make(chan error) c.done = make(chan struct{}) go c.run() return nil } // StartRecording connects to the address and starts publishing given media. func (c *Client) StartRecording(address string, desc *description.Session) error { u, err := base.ParseURL(address) if err != nil { return err } c.Scheme = u.Scheme c.Host = u.Host err = c.Start() if err != nil { return err } _, err = c.Announce(u, desc) if err != nil { c.Close() return err } err = c.SetupAll(u, desc.Medias) if err != nil { c.Close() return err } _, err = c.Record() if err != nil { c.Close() return err } return nil } // Close closes all client resources and waits for them to exit. func (c *Client) Close() { c.ctxCancel() <-c.done } // Wait waits until all client resources are closed. // This can happen when a fatal error occurs or when Close() is called. 
func (c *Client) Wait() error {
	// done is closed by run() when the client routine exits.
	<-c.done
	return c.closeError
}

// run is the client routine started by Start. It stores the result of runInner
// in closeError, cancels the context, releases resources through doClose
// and finally closes done.
func (c *Client) run() {
	defer close(c.done)

	c.closeError = c.runInner()

	c.ctxCancel()

	c.doClose()
}

// runInner is the event loop of the client. It serves the requests coming from
// the public methods, fires the timeout and keepalive timers, and handles
// responses, server requests and reader / writer errors.
// It returns when an event produces a fatal error or when the context is canceled.
func (c *Client) runInner() error {
	for {
		select {
		case req := <-c.chOptions:
			res, err := c.doOptions(req.url)
			req.res <- clientRes{res: res, err: err}

			// mustClose is set by do() when waiting for a response fails.
			if c.mustClose {
				return err
			}

		case req := <-c.chDescribe:
			sd, res, err := c.doDescribe(req.url)
			req.res <- clientRes{sd: sd, res: res, err: err}

			if c.mustClose {
				return err
			}

		case req := <-c.chAnnounce:
			res, err := c.doAnnounce(req.url, req.desc)
			req.res <- clientRes{res: res, err: err}

			if c.mustClose {
				return err
			}

		case req := <-c.chSetup:
			res, err := c.doSetup(req.baseURL, req.media, req.rtpPort, req.rtcpPort)
			req.res <- clientRes{res: res, err: err}

			if c.mustClose {
				return err
			}

		case req := <-c.chPlay:
			res, err := c.doPlay(req.ra)
			req.res <- clientRes{res: res, err: err}

			if c.mustClose {
				return err
			}

		case req := <-c.chRecord:
			res, err := c.doRecord()
			req.res <- clientRes{res: res, err: err}

			if c.mustClose {
				return err
			}

		case req := <-c.chPause:
			res, err := c.doPause()
			req.res <- clientRes{res: res, err: err}

			if c.mustClose {
				return err
			}

		case <-c.checkTimeoutTimer.C:
			err := c.doCheckTimeout()
			if err != nil {
				return err
			}
			// re-arm the timer for the next check.
			c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)

		case <-c.keepAliveTimer.C:
			err := c.doKeepAlive()
			if err != nil {
				return err
			}
			// re-arm the timer for the next keepalive.
			c.keepAliveTimer = time.NewTimer(c.keepAlivePeriod)

		case res := <-c.chResponse:
			c.OnResponse(res)
			// these are responses to keepalives, ignore them.

		case req := <-c.chRequest:
			err := c.handleServerRequest(req)
			if err != nil {
				return err
			}

		case err := <-c.chReadError:
			c.reader.close()
			c.reader = nil
			return err

		case err := <-c.chWriterError:
			return err

		case <-c.ctx.Done():
			return liberrors.ErrClientTerminated{}
		}
	}
}

// waitResponse waits for the response to the request whose CSeq is requestCseqStr.
// While waiting it keeps answering server requests. It gives up after ReadTimeout,
// on a read error or when the context is canceled.
func (c *Client) waitResponse(requestCseqStr string) (*base.Response, error) {
	t := time.NewTimer(c.ReadTimeout)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			return nil, liberrors.ErrClientRequestTimedOut{}

		case req := <-c.chRequest:
			err := c.handleServerRequest(req)
			if err != nil {
				return nil, err
			}

		case res := <-c.chResponse:
			c.OnResponse(res)

			// accept response if CSeq equals request CSeq, or if CSeq is not present
			if cseq, ok := res.Header["CSeq"]; !ok || len(cseq) != 1 || strings.TrimSpace(cseq[0]) == requestCseqStr {
				return res, nil
			}

		case err := <-c.chReadError:
			c.reader.close()
			c.reader = nil
			return nil, err

		case <-c.ctx.Done():
			return nil, liberrors.ErrClientTerminated{}
		}
	}
}

// handleServerRequest answers a request sent by the server.
// Only OPTIONS is handled, with a 200 response that echoes the CSeq of the request;
// any other method results in ErrClientUnhandledMethod.
func (c *Client) handleServerRequest(req *base.Request) error {
	c.OnServerRequest(req)

	if req.Method != base.Options {
		return liberrors.ErrClientUnhandledMethod{Method: req.Method}
	}

	h := base.Header{
		"User-Agent": base.HeaderValue{c.UserAgent},
	}

	if cseq, ok := req.Header["CSeq"]; ok {
		h["CSeq"] = cseq
	}

	res := &base.Response{
		StatusCode: base.StatusOK,
		Header:     h,
	}

	c.OnServerResponse(res)

	c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
	return c.conn.WriteResponse(res)
}

// doClose releases the resources of the current session: it stops the writer and
// the transport routines when playing or recording, sends a TEARDOWN without
// waiting for the response when a connection and a base URL exist, closes the
// connection together with its reader, and closes every setupped media.
func (c *Client) doClose() {
	if c.state == clientStatePlay || c.state == clientStateRecord {
		c.destroyWriter()
		c.stopTransportRoutines()
	}

	if c.nconn != nil && c.baseURL != nil {
		header := base.Header{}

		if c.backChannelSetupped {
			header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"}
		}

		// best-effort TEARDOWN: the error is ignored on purpose.
		c.do(&base.Request{ //nolint:errcheck
			Method: base.Teardown,
			URL:    c.baseURL,
			Header: header,
		}, true)
	}

	if c.reader != nil {
		c.nconn.Close()
		c.reader.close()
		c.reader = nil
		c.nconn = nil
		c.conn = nil
	} else if c.nconn != nil {
		// the reader has already been closed after a read error.
		c.nconn.Close()
		c.nconn = nil
		c.conn = nil
	}

	for _, cm := range c.setuppedMedias {
		cm.close()
	}
}

// reset closes the current session and brings the session-related fields
// back to their initial values, so that a new session can be opened.
func (c *Client) reset() {
	c.doClose()

	c.state = clientStateInitial
	c.session = ""
	c.sender = nil
	c.cseq = 0
	c.optionsSent = false
	c.useGetParameter = false
	c.baseURL = nil
	c.setuppedTransport = nil
	c.backChannelSetupped = false
	c.stdChannelSetupped = false
	c.setuppedMedias = nil
	c.tcpCallbackByChannel = nil
}

// checkState returns nil when the current state is among the allowed ones,
// otherwise an ErrClientInvalidState that lists them.
func (c *Client) checkState(allowed map[clientState]struct{}) error {
	if _, ok := allowed[c.state]; ok {
		return nil
	}

	allowedList := make([]fmt.Stringer, len(allowed))
	i := 0
	for a := range allowed {
		allowedList[i] = a
		i++
	}

	return liberrors.ErrClientInvalidState{AllowedList: allowedList, State: c.state}
}

// trySwitchingProtocol restarts the session with the TCP protocol:
// it resets the client, then repeats DESCRIBE, SETUP of the previous medias
// (restoring their packet callbacks) and PLAY with the last range.
func (c *Client) trySwitchingProtocol() error {
	c.OnTransportSwitch(liberrors.ErrClientSwitchToTCP{})

	// save what is needed to rebuild the session before reset() wipes it.
	prevBaseURL := c.baseURL
	prevMedias := c.setuppedMedias

	c.reset()

	c.setuppedTransport = &SessionTransport{
		Protocol: ProtocolTCP,
	}

	// some Hikvision cameras require a describe before a setup
	_, _, err := c.doDescribe(c.lastDescribeURL)
	if err != nil {
		return err
	}

	for i, cm := range prevMedias {
		_, err = c.doSetup(prevBaseURL, cm.media, 0, 0)
		if err != nil {
			return err
		}

		// restore the callbacks registered on the previous session.
		c.setuppedMedias[i].onPacketRTCP = cm.onPacketRTCP
		for j, tr := range cm.formats {
			c.setuppedMedias[i].formats[j].onPacketRTP = tr.onPacketRTP
		}
	}

	_, err = c.doPlay(c.lastRange)
	if err != nil {
		return err
	}

	return nil
}

// startTransportRoutines starts the setupped medias and arms the keepalive and
// timeout timers according to the state and to the transport protocol.
func (c *Client) startTransportRoutines() {
	c.timeDecoder = &rtptime.GlobalDecoder{}
	c.timeDecoder.Initialize()

	for _, cm := range c.setuppedMedias {
		cm.start()
	}

	if c.setuppedTransport.Protocol == ProtocolTCP {
		c.tcpFrame = &base.InterleavedFrame{}
		c.tcpBuffer = make([]byte, c.MaxPacketSize+4)
	}

	// always enable keepalives unless we are recording with TCP
	if c.state == clientStatePlay || c.setuppedTransport.Protocol != ProtocolTCP {
		c.keepAliveTimer = time.NewTimer(c.keepAlivePeriod)
	}

	if c.state == clientStatePlay && c.stdChannelSetupped {
		switch c.setuppedTransport.Protocol {
		case ProtocolUDP:
			// the first check uses InitialUDPReadTimeout and may trigger
			// a switch to TCP (see doCheckTimeout).
			c.checkTimeoutTimer = time.NewTimer(c.InitialUDPReadTimeout)
			c.checkTimeoutInitial = true

		case ProtocolUDPMulticast:
			c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)

		default: // TCP
			c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
			v := c.timeNow().Unix()
			c.tcpLastFrameTime = &v
		}
	}

	if c.setuppedTransport.Protocol == ProtocolTCP {
		c.reader.setAllowInterleavedFrames(true)
	}
}

// stopTransportRoutines undoes startTransportRoutines: it disables interleaved
// frames on the reader, disarms both timers and stops the setupped medias.
func (c *Client) stopTransportRoutines() {
	if c.reader != nil {
		c.reader.setAllowInterleavedFrames(false)
	}

	c.checkTimeoutTimer = emptyTimer()
	c.keepAliveTimer = emptyTimer()

	for _, cm := range c.setuppedMedias {
		cm.stop()
	}

	c.timeDecoder = nil
}

// createWriter allocates and initializes the asynchronous writer.
// The assignment of c.writer is protected by writerMutex.
func (c *Client) createWriter() {
	c.writerMutex.Lock()

	c.writer = &asyncprocessor.Processor{
		BufferSize: func() int {
			if c.state == clientStateRecord || c.backChannelSetupped {
				return c.WriteQueueSize
			}

			// when reading, buffer is only used to send RTCP receiver reports,
			// that are much smaller than RTP packets and are sent at a fixed interval.
			// decrease RAM consumption by allocating less buffers.
			return 8
		}(),
		OnError: func(ctx context.Context, err error) {
			// forward the error to the client routine,
			// unless either context is done in the meanwhile.
			select {
			case <-ctx.Done():
			case <-c.ctx.Done():
			case c.chWriterError <- err:
			}
		},
	}

	c.writer.Initialize()

	c.writerMutex.Unlock()
}

// startWriter starts the writer created by createWriter.
func (c *Client) startWriter() {
	c.writer.Start()
}

// destroyWriter closes the writer and then clears c.writer under writerMutex.
func (c *Client) destroyWriter() {
	c.writer.Close()

	c.writerMutex.Lock()
	c.writer = nil
	c.writerMutex.Unlock()
}

// connOpen opens the connection to the server, unless it is already open.
// The dial is bounded by ReadTimeout. With the "rtsps" scheme and no TLSConfig,
// a TLS configuration with the host as server name is used.
// On success the reader routine is started.
func (c *Client) connOpen() error {
	if c.nconn != nil {
		return nil
	}

	if c.Scheme != "rtsp" && c.Scheme != "rtsps" {
		return liberrors.ErrClientUnsupportedScheme{Scheme: c.Scheme}
	}

	dialCtx, dialCtxCancel := context.WithTimeout(c.ctx, c.ReadTimeout)
	defer dialCtxCancel()

	addr := canonicalAddr(&base.URL{
		Scheme: c.Scheme,
		Host:   c.Host,
	})

	var tlsConfig *tls.Config

	if c.Scheme == "rtsps" {
		tlsConfig = c.TLSConfig
		if tlsConfig == nil {
			host, _, _ := net.SplitHostPort(addr)
			tlsConfig = &tls.Config{
				ServerName: host,
			}
		}
	}

	var nconn net.Conn

	switch c.Tunnel {
	case TunnelHTTP:
		var err error
		nconn, err = newClientTunnelHTTP(dialCtx, c.DialContext, addr, tlsConfig)
		if err != nil {
			return err
		}

	case TunnelWebSocket:
		var err error
		nconn, err = newClientTunnelWebSocket(dialCtx, c.DialContext, addr, tlsConfig)
		if err != nil {
			return err
		}

	default:
		var err error
		nconn, err = c.DialContext(dialCtx, "tcp", addr)
		if err != nil {
			return err
		}

		if tlsConfig != nil {
			nconn = tls.Client(nconn, tlsConfig)
		}
	}

	c.nconn = nconn
	// bytes read and written are counted into bytesReceived and bytesSent.
	bc := bytecounter.New(c.nconn, c.bytesReceived, c.bytesSent)
	c.conn = conn.NewConn(bufio.NewReader(bc), bc)
	c.reader = &clientReader{
		c: c,
	}
	c.reader.start()

	return nil
}

// do sends a request and, unless skipResponse is true, waits for its response.
// It sends an OPTIONS first when none has succeeded yet, fills the Session, CSeq,
// User-Agent and authorization headers, stores the session returned by the server
// and derives the keepalive period from the session timeout. On a 401 response,
// if the URL carries credentials and no authentication sender exists yet,
// the request is sent again with authentication.
func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error) {
	if !c.optionsSent && req.Method != base.Options {
		_, err := c.doOptions(req.URL)
		if err != nil {
			return nil, err
		}
	}

	if req.Header == nil {
		req.Header = make(base.Header)
	}

	if c.session != "" {
		req.Header["Session"] = base.HeaderValue{c.session}
	}

	c.cseq++
	cseqStr := strconv.FormatInt(int64(c.cseq), 10)
	req.Header["CSeq"] = base.HeaderValue{cseqStr}

	req.Header["User-Agent"] = base.HeaderValue{c.UserAgent}

	if c.sender != nil {
		c.sender.AddAuthorization(req)
	}

	c.OnRequest(req)

	c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
	err := c.conn.WriteRequest(req)
	if err != nil {
		return nil, err
	}

	if skipResponse {
		return nil, nil
	}

	res, err := c.waitResponse(cseqStr)
	if err != nil {
		// tell the client routine to exit after replying to the caller.
		c.mustClose = true
		return nil, err
	}

	// get session from response
	if v, ok := res.Header["Session"]; ok {
		var sx headers.Session
		err = sx.Unmarshal(v)
		if err != nil {
			return nil, liberrors.ErrClientSessionHeaderInvalid{Err: err}
		}
		c.session = sx.Session

		if sx.Timeout != nil && *sx.Timeout > 0 {
			// send keepalives at 80% of the session timeout.
			c.keepAlivePeriod = time.Duration(*sx.Timeout) * time.Second * 8 / 10
		}
	}

	// send request again with authentication
	if res.StatusCode == base.StatusUnauthorized && req.URL.User != nil && c.sender == nil {
		pass, _ := req.URL.User.Password()
		user := req.URL.User.Username()

		sender := &auth.Sender{
			WWWAuth: res.Header["WWW-Authenticate"],
			User:    user,
			Pass:    pass,
		}
		err = sender.Initialize()
		if err != nil {
			return nil, liberrors.ErrClientAuthSetup{Err: err}
		}
		c.sender = sender

		return c.do(req, skipResponse)
	}

	return res, nil
}

// atLeastOneUDPPacketHasBeenReceived reports whether the RTP or RTCP listener
// of any setupped media has a non-zero last packet time.
func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool {
	for _, ct := range c.setuppedMedias {
		lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime)
		if lft != 0 {
			return true
		}

		lft = atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime)
		if lft != 0 {
			return true
		}
	}
	return false
}

// isInUDPTimeout reports whether no RTP or RTCP listener of any setupped media
// has received a packet within ReadTimeout.
func (c *Client) isInUDPTimeout() bool {
	now := c.timeNow()
	for _, ct := range c.setuppedMedias {
		lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0)
		if now.Sub(lft) < c.ReadTimeout {
			return false
		}

		lft = time.Unix(atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime), 0)
		if now.Sub(lft) < c.ReadTimeout {
			return false
		}
	}
	return true
}

// isInTCPTimeout reports whether the time stored in tcpLastFrameTime
// is at least ReadTimeout in the past.
func (c *Client) isInTCPTimeout() bool {
	now := c.timeNow()
	lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0)
	return now.Sub(lft) >= c.ReadTimeout
}

// doCheckTimeout is run when checkTimeoutTimer fires.
// With UDP, the first check switches to TCP if no packet has been received yet,
// provided that no back channel is setupped and the protocol was not forced
// by the user; the other checks return a timeout error when no packet has been
// received within ReadTimeout. With TCP, only the latter applies.
func (c *Client) doCheckTimeout() error {
	if c.setuppedTransport.Protocol == ProtocolUDP ||
		c.setuppedTransport.Protocol == ProtocolUDPMulticast {
		if c.checkTimeoutInitial && !c.backChannelSetupped && c.Protocol == nil {
			c.checkTimeoutInitial = false

			if !c.atLeastOneUDPPacketHasBeenReceived() {
				err := c.trySwitchingProtocol()
				if err != nil {
					return err
				}
			}
		} else if c.isInUDPTimeout() {
			return liberrors.ErrClientUDPTimeout{}
		}
	} else if c.isInTCPTimeout() {
		return liberrors.ErrClientTCPTimeout{}
	}

	return nil
}

// doKeepAlive sends a keepalive request (GET_PARAMETER when useGetParameter
// is set, OPTIONS otherwise) to the base URL, without waiting for the response.
func (c *Client) doKeepAlive() error {
	// some cameras do not reply to keepalives, do not wait for responses.
	_, err := c.do(&base.Request{
		Method: func() base.Method {
			// the VLC integrated rtsp server requires GET_PARAMETER
			if c.useGetParameter {
				return base.GetParameter
			}
			return base.Options
		}(),
		// use the stream base URL, otherwise some cameras do not reply
		URL: c.baseURL,
	}, true)
	return err
}

// doOptions sends an OPTIONS request, opening the connection if needed.
// It is allowed in the initial, prePlay and preRecord states only.
// On a 200 response it marks OPTIONS as sent and records whether the server
// advertises GET_PARAMETER; a 404 response is returned without error.
func (c *Client) doOptions(u *base.URL) (*base.Response, error) {
	err := c.checkState(map[clientState]struct{}{
		clientStateInitial:   {},
		clientStatePrePlay:   {},
		clientStatePreRecord: {},
	})
	if err != nil {
		return nil, err
	}

	err = c.connOpen()
	if err != nil {
		return nil, err
	}

	res, err := c.do(&base.Request{
		Method: base.Options,
		URL:    u,
	}, false)
	if err != nil {
		return nil, err
	}

	if res.StatusCode != base.StatusOK {
		// since this method is not implemented by every RTSP server,
		// return an error only if status code is not 404
		if res.StatusCode == base.StatusNotFound {
			return res, nil
		}
		return nil, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage}
	}

	c.optionsSent = true
	c.useGetParameter = supportsGetParameter(res.Header)

	return res, nil
}

// Options sends an OPTIONS request.
func (c *Client) Options(u *base.URL) (*base.Response, error) { cres := make(chan clientRes) select { case c.chOptions <- optionsReq{url: u, res: cres}: res := <-cres return res.res, res.err case <-c.done: return nil, c.closeError } } func (c *Client) doDescribe(u *base.URL) (*description.Session, *base.Response, error) { err := c.checkState(map[clientState]struct{}{ clientStateInitial: {}, clientStatePrePlay: {}, clientStatePreRecord: {}, }) if err != nil { return nil, nil, err } err = c.connOpen() if err != nil { return nil, nil, err } header := base.Header{ "Accept": base.HeaderValue{"application/sdp"}, } if c.RequestBackChannels { header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"} } res, err := c.do(&base.Request{ Method: base.Describe, URL: u, Header: header, }, false) if err != nil { return nil, nil, err } if res.StatusCode != base.StatusOK { // redirect if res.StatusCode >= base.StatusMovedPermanently && res.StatusCode <= base.StatusUseProxy && len(res.Header["Location"]) == 1 { c.reset() var ru *base.URL ru, err = base.ParseURL(res.Header["Location"][0]) if err != nil { return nil, nil, err } if c.Scheme == "rtsps" && ru.Scheme != "rtsps" { return nil, nil, fmt.Errorf("connection cannot be downgraded from RTSPS to RTSP") } if u.User != nil { ru.User = u.User } c.Scheme = ru.Scheme c.Host = ru.Host return c.doDescribe(ru) } return nil, res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage} } ct, ok := res.Header["Content-Type"] if !ok || len(ct) != 1 { return nil, nil, liberrors.ErrClientContentTypeMissing{} } // strip encoding information from Content-Type header ct = base.HeaderValue{strings.Split(ct[0], ";")[0]} if ct[0] != "application/sdp" { return nil, nil, liberrors.ErrClientContentTypeUnsupported{CT: ct} } var ssd sdp.SessionDescription err = ssd.Unmarshal(res.Body) if err != nil { return nil, nil, liberrors.ErrClientSDPInvalid{Err: err} } var desc description.Session err = desc.Unmarshal(&ssd) if 
err != nil { return nil, nil, liberrors.ErrClientSDPInvalid{Err: err} } baseURL, err := findBaseURL(&ssd, res, u) if err != nil { return nil, nil, err } desc.BaseURL = baseURL c.lastDescribeURL = u c.lastDescribeDesc = &desc return &desc, res, nil } // Describe sends a DESCRIBE request. func (c *Client) Describe(u *base.URL) (*description.Session, *base.Response, error) { cres := make(chan clientRes) select { case c.chDescribe <- describeReq{url: u, res: cres}: res := <-cres return res.sd, res.res, res.err case <-c.done: return nil, nil, c.closeError } } func (c *Client) doAnnounce(u *base.URL, desc *description.Session) (*base.Response, error) { err := c.checkState(map[clientState]struct{}{ clientStateInitial: {}, }) if err != nil { return nil, err } if c.Protocol != nil && *c.Protocol == ProtocolUDPMulticast { return nil, fmt.Errorf("recording with UDP multicast is not supported") } err = c.connOpen() if err != nil { return nil, err } announceData, err := generateAnnounceData(desc, c.Scheme == "rtsps") if err != nil { return nil, err } err = prepareForAnnounce(desc, announceData, c.Scheme == "rtsps") if err != nil { return nil, err } byts, err := desc.Marshal() if err != nil { return nil, err } res, err := c.do(&base.Request{ Method: base.Announce, URL: u, Header: base.Header{ "Content-Type": base.HeaderValue{"application/sdp"}, }, Body: byts, }, false) if err != nil { return nil, err } if res.StatusCode != base.StatusOK { return nil, liberrors.ErrClientBadStatusCode{ Code: res.StatusCode, Message: res.StatusMessage, } } c.baseURL = u.Clone() c.state = clientStatePreRecord c.announceData = announceData return res, nil } // Announce sends an ANNOUNCE request. 
func (c *Client) Announce(u *base.URL, desc *description.Session) (*base.Response, error) { cres := make(chan clientRes) select { case c.chAnnounce <- announceReq{url: u, desc: desc, res: cres}: res := <-cres return res.res, res.err case <-c.done: return nil, c.closeError } } func (c *Client) doSetup( baseURL *base.URL, medi *description.Media, rtpPort int, rtcpPort int, ) (*base.Response, error) { err := c.checkState(map[clientState]struct{}{ clientStateInitial: {}, clientStatePrePlay: {}, clientStatePreRecord: {}, }) if err != nil { return nil, err } err = c.connOpen() if err != nil { return nil, err } if c.baseURL != nil && *baseURL != *c.baseURL { return nil, liberrors.ErrClientCannotSetupMediasDifferentURLs{} } th := headers.Transport{} // when playing, omit mode, since it causes errors with some servers. if c.state == clientStatePreRecord { v := headers.TransportModeRecord th.Mode = &v } var protocol Protocol switch { // use transport from previous SETUP calls case c.setuppedTransport != nil: protocol = c.setuppedTransport.Protocol th.Profile = c.setuppedTransport.Profile // use transport from config, secure flag from server case c.Protocol != nil: protocol = *c.Protocol if isSecure(medi.Profile) && c.Scheme == "rtsps" { th.Profile = headers.TransportProfileSAVP } else { th.Profile = headers.TransportProfileAVP } // try // - UDP if unencrypted or secure is supported by server // - otherwise, TCP default: if isSecure(medi.Profile) && c.Scheme == "rtsps" { th.Profile = headers.TransportProfileSAVP } else { th.Profile = headers.TransportProfileAVP } if c.Tunnel == TunnelNone && (th.Profile == headers.TransportProfileSAVP || c.Scheme == "rtsp") { protocol = ProtocolUDP } else { protocol = ProtocolTCP } } var localSSRCs map[uint8]uint32 if c.state == clientStatePreRecord { localSSRCs = make(map[uint8]uint32) for forma, data := range c.announceData[medi].formats { localSSRCs[forma] = data.localSSRC } } else { localSSRCs, err = generateLocalSSRCs( 
clientExtractExistingSSRCs(c.setuppedMedias), medi.Formats, ) if err != nil { return nil, err } } var udpRTPListener *clientUDPListener var udpRTCPListener *clientUDPListener var tcpChannel int var srtpInCtx *wrappedSRTPContext var srtpOutCtx *wrappedSRTPContext defer func() { if udpRTPListener != nil { udpRTPListener.close() } if udpRTCPListener != nil { udpRTCPListener.close() } }() switch protocol { case ProtocolUDP, ProtocolUDPMulticast: if c.Scheme == "rtsps" && !isSecure(th.Profile) { return nil, fmt.Errorf("unable to setup secure UDP") } th.Protocol = headers.TransportProtocolUDP if protocol == ProtocolUDP { if (rtpPort == 0 && rtcpPort != 0) || (rtpPort != 0 && rtcpPort == 0) { return nil, liberrors.ErrClientUDPPortsZero{} } if rtpPort != 0 && rtcpPort != (rtpPort+1) { return nil, liberrors.ErrClientUDPPortsNotConsecutive{} } udpRTPListener, udpRTCPListener, err = createUDPListenerPair( c, false, nil, net.JoinHostPort("", strconv.FormatInt(int64(rtpPort), 10)), net.JoinHostPort("", strconv.FormatInt(int64(rtcpPort), 10)), ) if err != nil { return nil, err } v1 := headers.TransportDeliveryUnicast th.Delivery = &v1 th.ClientPorts = &[2]int{udpRTPListener.port(), udpRTCPListener.port()} } else { v1 := headers.TransportDeliveryMulticast th.Delivery = &v1 } case ProtocolTCP: v1 := headers.TransportDeliveryUnicast th.Delivery = &v1 th.Protocol = headers.TransportProtocolTCP ch := c.findFreeChannelPair() th.InterleavedIDs = &[2]int{ch, ch + 1} } mediaURL, err := medi.URL(baseURL) if err != nil { return nil, err } header := base.Header{ "Transport": th.Marshal(), } if medi.IsBackChannel { if !c.RequestBackChannels { return nil, fmt.Errorf("we are setupping a back channel but we did not request back channels") } header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"} } if isSecure(th.Profile) { var srtpOutKey []byte if c.state == clientStatePreRecord { srtpOutKey = c.announceData[medi].srtpOutKey } else { srtpOutKey = make([]byte, srtpKeyLength) _, 
err = rand.Read(srtpOutKey) if err != nil { return nil, err } } srtpOutCtx = &wrappedSRTPContext{ key: srtpOutKey, ssrcs: ssrcsMapToList(localSSRCs), } err = srtpOutCtx.initialize() if err != nil { return nil, err } var mikeyMsg *mikey.Message mikeyMsg, err = mikeyGenerate(srtpOutCtx) if err != nil { return nil, err } var enc base.HeaderValue enc, err = headers.KeyMgmt{ URL: mediaURL.String(), MikeyMessage: mikeyMsg, }.Marshal() if err != nil { return nil, err } header["KeyMgmt"] = enc } res, err := c.do(&base.Request{ Method: base.Setup, URL: mediaURL, Header: header, }, false) if err != nil { return nil, err } if res.StatusCode != base.StatusOK { // switch transport automatically if res.StatusCode == base.StatusUnsupportedTransport && c.setuppedTransport == nil && c.Protocol == nil { c.OnTransportSwitch(liberrors.ErrClientSwitchToTCP2{}) c.setuppedTransport = &SessionTransport{ Protocol: ProtocolTCP, Profile: th.Profile, } return c.doSetup(baseURL, medi, 0, 0) } return nil, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage} } var thRes headers.Transport err = thRes.Unmarshal(res.Header["Transport"]) if err != nil { return nil, liberrors.ErrClientTransportHeaderInvalid{Err: err} } switch protocol { case ProtocolUDP, ProtocolUDPMulticast: if thRes.Protocol == headers.TransportProtocolTCP { // switch transport automatically if c.setuppedTransport == nil && c.Protocol == nil { c.OnTransportSwitch(liberrors.ErrClientSwitchToTCP2{}) c.baseURL = baseURL c.reset() c.setuppedTransport = &SessionTransport{ Protocol: ProtocolTCP, Profile: th.Profile, } // some Hikvision cameras require a describe before a setup _, _, err = c.doDescribe(c.lastDescribeURL) if err != nil { return nil, err } return c.doSetup(baseURL, medi, 0, 0) } return nil, liberrors.ErrClientServerRequestedTCP{} } } switch protocol { case ProtocolUDP: if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast { return nil, 
liberrors.ErrClientTransportHeaderInvalidDelivery{} } serverPortsValid := thRes.ServerPorts != nil && !isAnyPort(thRes.ServerPorts[0]) && !isAnyPort(thRes.ServerPorts[1]) if (c.state == clientStatePreRecord || !c.AnyPortEnable) && !serverPortsValid { return nil, liberrors.ErrClientServerPortsNotProvided{} } var remoteIP net.IP if thRes.Source2 != nil { if ip := net.ParseIP(*thRes.Source2); ip != nil { remoteIP = ip } else { var addr *net.UDPAddr addr, err = net.ResolveUDPAddr("udp", *thRes.Source2) if err != nil { return nil, fmt.Errorf("unable to solve source host: %w", err) } remoteIP = addr.IP } } else { remoteIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP } if serverPortsValid { if !c.AnyPortEnable { udpRTPListener.readPort = thRes.ServerPorts[0] } udpRTPListener.writeAddr = &net.UDPAddr{ IP: remoteIP, Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone, Port: thRes.ServerPorts[0], } } udpRTPListener.readIP = remoteIP if serverPortsValid { if !c.AnyPortEnable { udpRTCPListener.readPort = thRes.ServerPorts[1] } udpRTCPListener.writeAddr = &net.UDPAddr{ IP: remoteIP, Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone, Port: thRes.ServerPorts[1], } } udpRTCPListener.readIP = remoteIP case ProtocolUDPMulticast: if thRes.Delivery == nil || *thRes.Delivery != headers.TransportDeliveryMulticast { return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} } var remoteIP net.IP if thRes.Source2 != nil { if ip := net.ParseIP(*thRes.Source2); ip != nil { remoteIP = ip } else { var addr *net.UDPAddr addr, err = net.ResolveUDPAddr("udp", *thRes.Source2) if err != nil { return nil, fmt.Errorf("unable to solve source host: %w", err) } remoteIP = addr.IP } } else { remoteIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP } var destIP net.IP if thRes.Destination2 == nil { return nil, liberrors.ErrClientTransportHeaderNoDestination{} } if ip := net.ParseIP(*thRes.Destination2); ip != nil { destIP = ip } else { var addr *net.UDPAddr addr, err = net.ResolveUDPAddr("udp", *thRes.Destination2) if 
err != nil { return nil, fmt.Errorf("unable to solve destination host: %w", err) } destIP = addr.IP } if thRes.Ports == nil { return nil, liberrors.ErrClientTransportHeaderNoPorts{} } var intf *net.Interface intf, err = interfaceOfConn(c.nconn) if err != nil { return nil, err } udpRTPListener, udpRTCPListener, err = createUDPListenerPair( c, true, intf, net.JoinHostPort(destIP.String(), strconv.FormatInt(int64(thRes.Ports[0]), 10)), net.JoinHostPort(destIP.String(), strconv.FormatInt(int64(thRes.Ports[1]), 10)), ) if err != nil { return nil, err } udpRTPListener.readIP = remoteIP udpRTPListener.readPort = thRes.Ports[0] udpRTPListener.writeAddr = &net.UDPAddr{ IP: remoteIP, Port: thRes.Ports[0], } udpRTCPListener.readIP = remoteIP udpRTCPListener.readPort = thRes.Ports[1] udpRTCPListener.writeAddr = &net.UDPAddr{ IP: remoteIP, Port: thRes.Ports[1], } case ProtocolTCP: if thRes.Protocol != headers.TransportProtocolTCP { return nil, liberrors.ErrClientServerRequestedUDP{} } if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast { return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{} } if thRes.InterleavedIDs == nil { return nil, liberrors.ErrClientTransportHeaderNoInterleavedIDs{} } if (thRes.InterleavedIDs[0] + 1) != thRes.InterleavedIDs[1] { return nil, liberrors.ErrClientTransportHeaderInvalidInterleavedIDs{} } if c.isChannelPairInUse(thRes.InterleavedIDs[0]) { return &base.Response{ StatusCode: base.StatusBadRequest, }, liberrors.ErrClientTransportHeaderInterleavedIDsInUse{} } tcpChannel = thRes.InterleavedIDs[0] } if thRes.Profile != th.Profile { return nil, fmt.Errorf("returned profile does not match requested profile") } if isSecure(th.Profile) { var mikeyMsg *mikey.Message // extract key-mgmt from (in order of priority): // - response // - media SDP attributes // - session SDP attributes switch { case res.Header["KeyMgmt"] != nil: var keyMgmt headers.KeyMgmt err = keyMgmt.Unmarshal(res.Header["KeyMgmt"]) if err != nil { 
return nil, err } mikeyMsg = keyMgmt.MikeyMessage case medi.KeyMgmtMikey != nil: mikeyMsg = medi.KeyMgmtMikey case c.lastDescribeDesc.KeyMgmtMikey != nil: mikeyMsg = c.lastDescribeDesc.KeyMgmtMikey default: return nil, fmt.Errorf("server did not provide key-mgmt data in any supported way") } srtpInCtx, err = mikeyToContext(mikeyMsg) if err != nil { return nil, err } } cm := &clientMedia{ c: c, media: medi, secure: isSecure(th.Profile), udpRTPListener: udpRTPListener, udpRTCPListener: udpRTCPListener, tcpChannel: tcpChannel, localSSRCs: localSSRCs, srtpInCtx: srtpInCtx, srtpOutCtx: srtpOutCtx, } cm.initialize() udpRTPListener = nil udpRTCPListener = nil c.propsMutex.Lock() if c.setuppedMedias == nil { c.setuppedMedias = make(map[*description.Media]*clientMedia) } c.setuppedMedias[medi] = cm c.baseURL = baseURL c.setuppedTransport = &SessionTransport{ Protocol: protocol, Profile: th.Profile, } c.propsMutex.Unlock() if medi.IsBackChannel { c.backChannelSetupped = true } else { c.stdChannelSetupped = true } if c.state == clientStateInitial { c.state = clientStatePrePlay } return res, nil } func (c *Client) isChannelPairInUse(channel int) bool { for _, cm := range c.setuppedMedias { if (cm.tcpChannel+1) == channel || cm.tcpChannel == channel || cm.tcpChannel == (channel+1) { return true } } return false } func (c *Client) findFreeChannelPair() int { for i := 0; ; i += 2 { // prefer even channels if !c.isChannelPairInUse(i) { return i } } } // Setup sends a SETUP request. // rtpPort and rtcpPort are used only if transport is UDP. // if rtpPort and rtcpPort are zero, they are chosen automatically. 
func (c *Client) Setup( baseURL *base.URL, media *description.Media, rtpPort int, rtcpPort int, ) (*base.Response, error) { cres := make(chan clientRes) select { case c.chSetup <- setupReq{ baseURL: baseURL, media: media, rtpPort: rtpPort, rtcpPort: rtcpPort, res: cres, }: res := <-cres return res.res, res.err case <-c.done: return nil, c.closeError } } // SetupAll setups all the given medias. func (c *Client) SetupAll(baseURL *base.URL, medias []*description.Media) error { for _, m := range medias { _, err := c.Setup(baseURL, m, 0, 0) if err != nil { return err } } return nil } func (c *Client) doPlay(ra *headers.Range) (*base.Response, error) { err := c.checkState(map[clientState]struct{}{ clientStatePrePlay: {}, }) if err != nil { return nil, err } c.state = clientStatePlay c.startTransportRoutines() c.createWriter() // Range is mandatory in Parrot Streaming Server if ra == nil { ra = &headers.Range{ Value: &headers.RangeNPT{ Start: 0, }, } } header := base.Header{ "Range": ra.Marshal(), } if c.backChannelSetupped { header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"} } // when protocol is UDP, // open the firewall by sending empty packets to the remote part. // do this before sending the PLAY request. 
if c.setuppedTransport.Protocol == ProtocolUDP { for _, cm := range c.setuppedMedias { if !cm.media.IsBackChannel && cm.udpRTPListener.writeAddr != nil { buf, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal() if cm.srtpOutCtx != nil { encr := make([]byte, cm.c.MaxPacketSize) encr, err = cm.srtpOutCtx.encryptRTP(encr, buf, nil) if err != nil { return nil, err } buf = encr } err = cm.udpRTPListener.write(buf) if err != nil { return nil, err } buf, _ = (&rtcp.ReceiverReport{}).Marshal() if cm.srtpOutCtx != nil { encr := make([]byte, cm.c.MaxPacketSize) encr, err = cm.srtpOutCtx.encryptRTCP(encr, buf, nil) if err != nil { return nil, err } buf = encr } err = cm.udpRTCPListener.write(buf) if err != nil { return nil, err } } } } res, err := c.do(&base.Request{ Method: base.Play, URL: c.baseURL, Header: header, }, false) if err != nil { c.destroyWriter() c.stopTransportRoutines() c.state = clientStatePrePlay return nil, err } if res.StatusCode != base.StatusOK { c.destroyWriter() c.stopTransportRoutines() c.state = clientStatePrePlay return nil, liberrors.ErrClientBadStatusCode{ Code: res.StatusCode, Message: res.StatusMessage, } } c.startWriter() c.lastRange = ra return res, nil } // Play sends a PLAY request. // This can be called only after Setup(). 
func (c *Client) Play(ra *headers.Range) (*base.Response, error) { cres := make(chan clientRes) select { case c.chPlay <- playReq{ra: ra, res: cres}: res := <-cres return res.res, res.err case <-c.done: return nil, c.closeError } } func (c *Client) doRecord() (*base.Response, error) { err := c.checkState(map[clientState]struct{}{ clientStatePreRecord: {}, }) if err != nil { return nil, err } c.state = clientStateRecord c.startTransportRoutines() c.createWriter() res, err := c.do(&base.Request{ Method: base.Record, URL: c.baseURL, }, false) if err != nil { c.destroyWriter() c.stopTransportRoutines() c.state = clientStatePreRecord return nil, err } if res.StatusCode != base.StatusOK { c.destroyWriter() c.stopTransportRoutines() c.state = clientStatePreRecord return nil, liberrors.ErrClientBadStatusCode{ Code: res.StatusCode, Message: res.StatusMessage, } } c.startWriter() return nil, nil } // Record sends a RECORD request. // This can be called only after Announce() and Setup(). func (c *Client) Record() (*base.Response, error) { cres := make(chan clientRes) select { case c.chRecord <- recordReq{res: cres}: res := <-cres return res.res, res.err case <-c.done: return nil, c.closeError } } func (c *Client) doPause() (*base.Response, error) { err := c.checkState(map[clientState]struct{}{ clientStatePlay: {}, clientStateRecord: {}, }) if err != nil { return nil, err } c.destroyWriter() res, err := c.do(&base.Request{ Method: base.Pause, URL: c.baseURL, }, false) if err != nil { c.createWriter() c.startWriter() return nil, err } if res.StatusCode != base.StatusOK { c.createWriter() c.startWriter() return nil, liberrors.ErrClientBadStatusCode{ Code: res.StatusCode, Message: res.StatusMessage, } } c.stopTransportRoutines() switch c.state { case clientStatePlay: c.state = clientStatePrePlay case clientStateRecord: c.state = clientStatePreRecord } return res, nil } // Pause sends a PAUSE request. // This can be called only after Play() or Record(). 
func (c *Client) Pause() (*base.Response, error) { cres := make(chan clientRes) select { case c.chPause <- pauseReq{res: cres}: res := <-cres return res.res, res.err case <-c.done: return nil, c.closeError } } // OnPacketRTPAny sets a callback that is called when a RTP packet is read from any setupped media. func (c *Client) OnPacketRTPAny(cb OnPacketRTPAnyFunc) { for _, cm := range c.setuppedMedias { cmedia := cm.media for _, forma := range cm.media.Formats { c.OnPacketRTP(cm.media, forma, func(pkt *rtp.Packet) { cb(cmedia, forma, pkt) }) } } } // OnPacketRTCPAny sets a callback that is called when a RTCP packet is read from any setupped media. func (c *Client) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) { for _, cm := range c.setuppedMedias { cmedia := cm.media c.OnPacketRTCP(cm.media, func(pkt rtcp.Packet) { cb(cmedia, pkt) }) } } // OnPacketRTP sets a callback that is called when a RTP packet is read. func (c *Client) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) { cm := c.setuppedMedias[medi] ct := cm.formats[forma.PayloadType()] ct.onPacketRTP = cb } // OnPacketRTCP sets a callback that is called when a RTCP packet is read. func (c *Client) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFunc) { cm := c.setuppedMedias[medi] cm.onPacketRTCP = cb } // WritePacketRTP writes a RTP packet to the server. func (c *Client) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error { return c.WritePacketRTPWithNTP(medi, pkt, c.timeNow()) } // WritePacketRTPWithNTP writes a RTP packet to the server. // ntp is the absolute timestamp of the packet, and is sent with periodic RTCP sender reports. func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error { select { case <-c.done: return c.closeError default: } cm := c.setuppedMedias[medi] cf := cm.formats[pkt.PayloadType] return cf.writePacketRTP(pkt, ntp) } // WritePacketRTCP writes a RTCP packet to the server. 
func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error { select { case <-c.done: return c.closeError default: } cm := c.setuppedMedias[medi] return cm.writePacketRTCP(pkt) } // PacketPTS returns the PTS (presentation timestamp) of an incoming RTP packet. // It is computed by decoding the packet timestamp and sychronizing it with other tracks. func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (int64, bool) { cm := c.setuppedMedias[medi] ct := cm.formats[pkt.PayloadType] return c.timeDecoder.Decode(ct.format, pkt) } // PacketNTP returns the NTP (absolute timestamp) of an incoming RTP packet. // The NTP is computed from RTCP sender reports. func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) { cm := c.setuppedMedias[medi] ct := cm.formats[pkt.PayloadType] return ct.rtpReceiver.PacketNTP(pkt.Timestamp) } // Transport returns transport details. func (c *Client) Transport() *ClientTransport { c.propsMutex.RLock() defer c.propsMutex.RUnlock() return &ClientTransport{ Conn: ConnTransport{ Tunnel: c.Tunnel, }, Session: c.setuppedTransport, } } // Stats returns client statistics. 
func (c *Client) Stats() *ClientStats { c.propsMutex.RLock() defer c.propsMutex.RUnlock() mediaStats := func() map[*description.Media]SessionStatsMedia { //nolint:dupl ret := make(map[*description.Media]SessionStatsMedia, len(c.setuppedMedias)) for med, sm := range c.setuppedMedias { ret[med] = SessionStatsMedia{ BytesReceived: atomic.LoadUint64(sm.bytesReceived), BytesSent: atomic.LoadUint64(sm.bytesSent), RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), Formats: func() map[format.Format]SessionStatsFormat { ret := make(map[format.Format]SessionStatsFormat, len(sm.formats)) for _, fo := range sm.formats { recvStats := func() *rtpreceiver.Stats { if fo.rtpReceiver != nil { return fo.rtpReceiver.Stats() } return nil }() sentStats := func() *rtpsender.Stats { if fo.rtpSender != nil { return fo.rtpSender.Stats() } return nil }() ret[fo.format] = SessionStatsFormat{ //nolint:dupl RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), LocalSSRC: fo.localSSRC, RemoteSSRC: func() uint32 { if v, ok := fo.remoteSSRC(); ok { return v } return 0 }(), RTPPacketsLastSequenceNumber: func() uint16 { if recvStats != nil { return recvStats.LastSequenceNumber } if sentStats != nil { return sentStats.LastSequenceNumber } return 0 }(), RTPPacketsLastRTP: func() uint32 { if recvStats != nil { return recvStats.LastRTP } if sentStats != nil { return sentStats.LastRTP } return 0 }(), RTPPacketsLastNTP: func() time.Time { if recvStats != nil { return recvStats.LastNTP } if sentStats != nil { return sentStats.LastNTP } return time.Time{} }(), RTPPacketsJitter: func() float64 { if recvStats != nil { return recvStats.Jitter } return 0 }(), } } return ret }(), } } return ret }() return 
&ClientStats{ Conn: ConnStats{ BytesReceived: atomic.LoadUint64(c.bytesReceived), BytesSent: atomic.LoadUint64(c.bytesSent), }, Session: SessionStats{ //nolint:dupl BytesReceived: func() uint64 { v := uint64(0) for _, ms := range mediaStats { v += ms.BytesReceived } return v }(), BytesSent: func() uint64 { v := uint64(0) for _, ms := range mediaStats { v += ms.BytesSent } return v }(), RTPPacketsReceived: func() uint64 { v := uint64(0) for _, ms := range mediaStats { for _, f := range ms.Formats { v += f.RTPPacketsReceived } } return v }(), RTPPacketsSent: func() uint64 { v := uint64(0) for _, ms := range mediaStats { for _, f := range ms.Formats { v += f.RTPPacketsSent } } return v }(), RTPPacketsLost: func() uint64 { v := uint64(0) for _, ms := range mediaStats { for _, f := range ms.Formats { v += f.RTPPacketsLost } } return v }(), RTPPacketsInError: func() uint64 { v := uint64(0) for _, ms := range mediaStats { v += ms.RTPPacketsInError } return v }(), RTPPacketsJitter: func() float64 { v := float64(0) n := float64(0) for _, ms := range mediaStats { for _, f := range ms.Formats { v += f.RTPPacketsJitter n++ } } if n != 0 { return v / n } return 0 }(), RTCPPacketsReceived: func() uint64 { v := uint64(0) for _, ms := range mediaStats { v += ms.RTCPPacketsReceived } return v }(), RTCPPacketsSent: func() uint64 { v := uint64(0) for _, ms := range mediaStats { v += ms.RTCPPacketsSent } return v }(), RTCPPacketsInError: func() uint64 { v := uint64(0) for _, ms := range mediaStats { v += ms.RTCPPacketsInError } return v }(), Medias: mediaStats, }, } }