diff --git a/pkg/nest/api.go b/pkg/nest/api.go index c2f255c2..4e9e4dbd 100644 --- a/pkg/nest/api.go +++ b/pkg/nest/api.go @@ -166,42 +166,100 @@ func (a *API) ExchangeSDP(projectID, deviceID, offer string) (string, error) { uri := "https://smartdevicemanagement.googleapis.com/v1/enterprises/" + projectID + "/devices/" + deviceID + ":executeCommand" - req, err := http.NewRequest("POST", uri, bytes.NewReader(b)) + + maxRetries := 3 + retryDelay := time.Second * 30 + + for attempt := 0; attempt < maxRetries; attempt++ { + req, err := http.NewRequest("POST", uri, bytes.NewReader(b)) + if err != nil { + return "", err + } + + req.Header.Set("Authorization", "Bearer "+a.Token) + + client := &http.Client{Timeout: time.Second * 5000} + res, err := client.Do(req) + if err != nil { + return "", err + } + + // Handle 409 (Conflict), 429 (Too Many Requests), and 401 (Unauthorized) + if res.StatusCode == 409 || res.StatusCode == 429 || res.StatusCode == 401 { + res.Body.Close() + if attempt < maxRetries-1 { + // Get new token from Google + if err := a.refreshToken(); err != nil { + return "", err + } + time.Sleep(retryDelay) + retryDelay *= 2 // exponential backoff + continue + } + } + + defer res.Body.Close() + + if res.StatusCode != 200 { + return "", errors.New("nest: wrong status: " + res.Status) + } + + var resv struct { + Results struct { + Answer string `json:"answerSdp"` + ExpiresAt time.Time `json:"expiresAt"` + MediaSessionID string `json:"mediaSessionId"` + } `json:"results"` + } + + if err = json.NewDecoder(res.Body).Decode(&resv); err != nil { + return "", err + } + + a.StreamProjectID = projectID + a.StreamDeviceID = deviceID + a.StreamSessionID = resv.Results.MediaSessionID + a.StreamExpiresAt = resv.Results.ExpiresAt + + return resv.Results.Answer, nil + } + + return "", errors.New("nest: max retries exceeded") +} + +func (a *API) refreshToken() error { + // Get the cached API with matching token to get credentials + var refreshKey string + cacheMu.Lock() + for key, api := range cache { + if api.Token == a.Token { + refreshKey = key + break + } + } + cacheMu.Unlock() + + if refreshKey == "" { + return errors.New("nest: unable to find cached credentials") + } + + // Parse credentials from cache key + parts := strings.Split(refreshKey, ":") + if len(parts) != 3 { + return errors.New("nest: invalid cache key format") + } + clientID, clientSecret, refreshToken := parts[0], parts[1], parts[2] + + // Get new API instance which will refresh the token + newAPI, err := NewAPI(clientID, clientSecret, refreshToken) if err != nil { - return "", err + return err } - req.Header.Set("Authorization", "Bearer "+a.Token) - - client := &http.Client{Timeout: time.Second * 5000} - res, err := client.Do(req) - if err != nil { - return "", err - } - defer res.Body.Close() - - if res.StatusCode != 200 { - return "", errors.New("nest: wrong status: " + res.Status) - } - - var resv struct { - Results struct { - Answer string `json:"answerSdp"` - ExpiresAt time.Time `json:"expiresAt"` - MediaSessionID string `json:"mediaSessionId"` - } `json:"results"` - } - - if err = json.NewDecoder(res.Body).Decode(&resv); err != nil { - return "", err - } - - a.StreamProjectID = projectID - a.StreamDeviceID = deviceID - a.StreamSessionID = resv.Results.MediaSessionID - a.StreamExpiresAt = resv.Results.ExpiresAt - - return resv.Results.Answer, nil + // Update current API with new token + a.Token = newAPI.Token + a.ExpiresAt = newAPI.ExpiresAt + return nil } func (a *API) ExtendStream() error { @@ -407,20 +465,22 @@ type Device struct { } func (a *API) StartExtendStreamTimer() { - // Calculate the duration until 30 seconds before the stream expires - duration := time.Until(a.StreamExpiresAt.Add(-30 * time.Second)) - a.extendTimer = time.AfterFunc(duration, func() { + if a.extendTimer != nil { + return + } + + a.extendTimer = time.NewTimer(time.Until(a.StreamExpiresAt) - time.Minute) + go func() { + <-a.extendTimer.C if err := a.ExtendStream(); err != nil { return } - duration = time.Until(a.StreamExpiresAt.Add(-30 * time.Second)) - a.extendTimer.Reset(duration) - }) + }() } func (a *API) StopExtendStreamTimer() { - if a.extendTimer == nil { - return + if a.extendTimer != nil { + a.extendTimer.Stop() + a.extendTimer = nil } - a.extendTimer.Stop() } diff --git a/pkg/nest/client.go b/pkg/nest/client.go index 2c812834..6a570913 100644 --- a/pkg/nest/client.go +++ b/pkg/nest/client.go @@ -4,6 +4,7 @@ import ( "errors" "net/url" "strings" + "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/rtsp" @@ -38,9 +39,26 @@ func Dial(rawURL string) (core.Producer, error) { return nil, errors.New("nest: wrong query") } - nestAPI, err := NewAPI(cliendID, cliendSecret, refreshToken) - if err != nil { - return nil, err + maxRetries := 3 + retryDelay := time.Second * 30 + + var nestAPI *API + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + nestAPI, err = NewAPI(cliendID, cliendSecret, refreshToken) + if err == nil { + break + } + lastErr = err + if attempt < maxRetries-1 { + time.Sleep(retryDelay) + retryDelay *= 2 // exponential backoff + } + } + + if nestAPI == nil { + return nil, lastErr } protocols := strings.Split(query.Get("protocols"), ",") @@ -79,48 +97,62 @@ func (c *WebRTCClient) MarshalJSON() ([]byte, error) { } func rtcConn(nestAPI *API, rawURL, projectID, deviceID string) (*WebRTCClient, error) { - rtcAPI, err := webrtc.NewAPI() - if err != nil { - return nil, err + maxRetries := 3 + retryDelay := time.Second * 30 + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + rtcAPI, err := webrtc.NewAPI() + if err != nil { + return nil, err + } + + conf := pion.Configuration{} + pc, err := rtcAPI.NewPeerConnection(conf) + if err != nil { + return nil, err + } + + conn := webrtc.NewConn(pc) + conn.FormatName = "nest/webrtc" + conn.Mode = core.ModeActiveProducer + conn.Protocol = "http" + conn.URL = rawURL + + // https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields + medias := []*core.Media{ + {Kind: core.KindAudio, Direction: core.DirectionRecvonly}, + {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, + {Kind: "app"}, // important for Nest + } + + // 3. Create offer with candidates + offer, err := conn.CreateCompleteOffer(medias) + if err != nil { + return nil, err + } + + // 4. Exchange SDP via Hass + answer, err := nestAPI.ExchangeSDP(projectID, deviceID, offer) + if err != nil { + lastErr = err + if attempt < maxRetries-1 { + time.Sleep(retryDelay) + retryDelay *= 2 + continue + } + return nil, err + } + + // 5. Set answer with remote medias + if err = conn.SetAnswer(answer); err != nil { + return nil, err + } + + return &WebRTCClient{conn: conn, api: nestAPI}, nil } - conf := pion.Configuration{} - pc, err := rtcAPI.NewPeerConnection(conf) - if err != nil { - return nil, err - } - - conn := webrtc.NewConn(pc) - conn.FormatName = "nest/webrtc" - conn.Mode = core.ModeActiveProducer - conn.Protocol = "http" - conn.URL = rawURL - - // https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields - medias := []*core.Media{ - {Kind: core.KindAudio, Direction: core.DirectionRecvonly}, - {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, - {Kind: "app"}, // important for Nest - } - - // 3. Create offer with candidates - offer, err := conn.CreateCompleteOffer(medias) - if err != nil { - return nil, err - } - - // 4. Exchange SDP via Hass - answer, err := nestAPI.ExchangeSDP(projectID, deviceID, offer) - if err != nil { - return nil, err - } - - // 5. Set answer with remote medias - if err = conn.SetAnswer(answer); err != nil { - return nil, err - } - - return &WebRTCClient{conn: conn, api: nestAPI}, nil + return nil, lastErr } func rtspConn(nestAPI *API, rawURL, projectID, deviceID string) (*RTSPClient, error) {