From fe10a7e55f0edac93aa9be3721e681d34ef913f4 Mon Sep 17 00:00:00 2001 From: hnws Date: Fri, 21 Mar 2025 23:10:16 -0400 Subject: [PATCH] feat(nest): add retry logic for 429 and 409 errors with exponential backoff --- pkg/nest/api.go | 130 +++++++++++++++++++++++++++++++++----------- pkg/nest/client.go | 133 ++++++++++++++++++++++++++++++--------------- 2 files changed, 187 insertions(+), 76 deletions(-) diff --git a/pkg/nest/api.go b/pkg/nest/api.go index c2f255c2..e9c5f10f 100644 --- a/pkg/nest/api.go +++ b/pkg/nest/api.go @@ -166,42 +166,106 @@ func (a *API) ExchangeSDP(projectID, deviceID, offer string) (string, error) { uri := "https://smartdevicemanagement.googleapis.com/v1/enterprises/" + projectID + "/devices/" + deviceID + ":executeCommand" - req, err := http.NewRequest("POST", uri, bytes.NewReader(b)) + + maxRetries := 3 + retryDelay := time.Second * 30 + + for attempt := 0; attempt < maxRetries; attempt++ { + req, err := http.NewRequest("POST", uri, bytes.NewReader(b)) + if err != nil { + return "", err + } + + req.Header.Set("Authorization", "Bearer "+a.Token) + + client := &http.Client{Timeout: time.Second * 5000} + res, err := client.Do(req) + if err != nil { + return "", err + } + + // Handle 409 (Conflict) and 429 (Too Many Requests) + if res.StatusCode == 409 || res.StatusCode == 429 { + res.Body.Close() + if attempt < maxRetries-1 { + log.Info(). + Int("status", res.StatusCode). + Float64("delay", retryDelay.Seconds()). + Int("attempt", attempt+1). + Int("max_retries", maxRetries-1). + Msg("API request failed, retrying") + // Get new token from Google + if err := a.refreshToken(); err != nil { + return "", err + } + time.Sleep(retryDelay) + retryDelay *= 2 // exponential backoff + continue + } + } + + defer res.Body.Close() + + if res.StatusCode != 200 { + return "", errors.New("nest: wrong status: " + res.Status) + } + + var resv struct { + Results struct { + Answer string `json:"answerSdp"` + ExpiresAt time.Time `json:"expiresAt"` + MediaSessionID string `json:"mediaSessionId"` + } `json:"results"` + } + + if err = json.NewDecoder(res.Body).Decode(&resv); err != nil { + return "", err + } + + a.StreamProjectID = projectID + a.StreamDeviceID = deviceID + a.StreamSessionID = resv.Results.MediaSessionID + a.StreamExpiresAt = resv.Results.ExpiresAt + + return resv.Results.Answer, nil + } + + return "", errors.New("nest: max retries exceeded") +} + +func (a *API) refreshToken() error { + // Get the cached API with matching token to get credentials + var refreshKey string + cacheMu.Lock() + for key, api := range cache { + if api.Token == a.Token { + refreshKey = key + break + } + } + cacheMu.Unlock() + + if refreshKey == "" { + return errors.New("nest: unable to find cached credentials") + } + + // Parse credentials from cache key + parts := strings.Split(refreshKey, ":") + if len(parts) != 3 { + return errors.New("nest: invalid cache key format") + } + clientID, clientSecret, refreshToken := parts[0], parts[1], parts[2] + + // Get new API instance which will refresh the token + newAPI, err := NewAPI(clientID, clientSecret, refreshToken) if err != nil { - return "", err + return err } - req.Header.Set("Authorization", "Bearer "+a.Token) - - client := &http.Client{Timeout: time.Second * 5000} - res, err := client.Do(req) - if err != nil { - return "", err - } - defer res.Body.Close() - - if res.StatusCode != 200 { - return "", errors.New("nest: wrong status: " + res.Status) - } - - var resv struct { - Results struct { - Answer string `json:"answerSdp"` - ExpiresAt time.Time `json:"expiresAt"` - MediaSessionID string `json:"mediaSessionId"` - } `json:"results"` - } - - if err = json.NewDecoder(res.Body).Decode(&resv); err != nil { - return "", err - } - - a.StreamProjectID = projectID - a.StreamDeviceID = deviceID - a.StreamSessionID = resv.Results.MediaSessionID - a.StreamExpiresAt = resv.Results.ExpiresAt - - return resv.Results.Answer, nil + // Update current API with new token + a.Token = newAPI.Token + a.ExpiresAt = newAPI.ExpiresAt + return nil } func (a *API) ExtendStream() error { diff --git a/pkg/nest/client.go b/pkg/nest/client.go index 93c4ce64..65153624 100644 --- a/pkg/nest/client.go +++ b/pkg/nest/client.go @@ -4,13 +4,17 @@ import ( "errors" "net/url" "strings" + "time" + "github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/rtsp" "github.com/AlexxIT/go2rtc/pkg/webrtc" pion "github.com/pion/webrtc/v3" ) +var log = app.GetLogger("nest") + type WebRTCClient struct { conn *webrtc.Conn api *API @@ -38,9 +42,32 @@ func Dial(rawURL string) (core.Producer, error) { return nil, errors.New("nest: wrong query") } - nestAPI, err := NewAPI(cliendID, cliendSecret, refreshToken) - if err != nil { - return nil, err + maxRetries := 3 + retryDelay := time.Second * 30 + + var nestAPI *API + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + nestAPI, err = NewAPI(cliendID, cliendSecret, refreshToken) + if err == nil { + break + } + lastErr = err + if attempt < maxRetries-1 { + log.Info(). + Float64("delay", retryDelay.Seconds()). + Int("attempt", attempt+1). + Int("max_retries", maxRetries-1). + Err(err). + Msg("API initialization failed, retrying") + time.Sleep(retryDelay) + retryDelay *= 2 // exponential backoff + } + } + + if nestAPI == nil { + return nil, lastErr } protocols := strings.Split(query.Get("protocols"), ",") @@ -79,48 +106,68 @@ func (c *WebRTCClient) MarshalJSON() ([]byte, error) { } func rtcConn(nestAPI *API, rawURL, projectID, deviceID string) (*WebRTCClient, error) { - rtcAPI, err := webrtc.NewAPI() - if err != nil { - return nil, err + maxRetries := 3 + retryDelay := time.Second * 30 + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + rtcAPI, err := webrtc.NewAPI() + if err != nil { + return nil, err + } + + conf := pion.Configuration{} + pc, err := rtcAPI.NewPeerConnection(conf) + if err != nil { + return nil, err + } + + conn := webrtc.NewConn(pc) + conn.FormatName = "nest/webrtc" + conn.Mode = core.ModeActiveProducer + conn.Protocol = "http" + conn.URL = rawURL + + // https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields + medias := []*core.Media{ + {Kind: core.KindAudio, Direction: core.DirectionRecvonly}, + {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, + {Kind: "app"}, // important for Nest + } + + // 3. Create offer with candidates + offer, err := conn.CreateCompleteOffer(medias) + if err != nil { + return nil, err + } + + // 4. Exchange SDP via Hass + answer, err := nestAPI.ExchangeSDP(projectID, deviceID, offer) + if err != nil { + lastErr = err + if attempt < maxRetries-1 { + log.Info(). + Float64("delay", retryDelay.Seconds()). + Int("attempt", attempt+1). + Int("max_retries", maxRetries-1). + Err(err). + Msg("WebRTC connection setup failed, retrying") + time.Sleep(retryDelay) + retryDelay *= 2 + continue + } + return nil, err + } + + // 5. Set answer with remote medias + if err = conn.SetAnswer(answer); err != nil { + return nil, err + } + + return &WebRTCClient{conn: conn, api: nestAPI}, nil } - conf := pion.Configuration{} - pc, err := rtcAPI.NewPeerConnection(conf) - if err != nil { - return nil, err - } - - conn := webrtc.NewConn(pc) - conn.FormatName = "nest/webrtc" - conn.Mode = core.ModeActiveProducer - conn.Protocol = "http" - conn.URL = rawURL - - // https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields - medias := []*core.Media{ - {Kind: core.KindAudio, Direction: core.DirectionRecvonly}, - {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, - {Kind: "app"}, // important for Nest - } - - // 3. Create offer with candidates - offer, err := conn.CreateCompleteOffer(medias) - if err != nil { - return nil, err - } - - // 4. Exchange SDP via Hass - answer, err := nestAPI.ExchangeSDP(projectID, deviceID, offer) - if err != nil { - return nil, err - } - - // 5. Set answer with remote medias - if err = conn.SetAnswer(answer); err != nil { - return nil, err - } - - return &WebRTCClient{conn: conn, api: nestAPI}, nil + return nil, lastErr } func rtspConn(nestAPI *API, rawURL, projectID, deviceID string) (*RTSPClient, error) {