mirror of
https://github.com/nats-io/nats.go.git
synced 2025-09-26 20:41:41 +08:00
[FIXED] Call ReconnectErrHandler for initial connection failures with RetryOnFailedConnect (#1915)
When RetryOnFailedConnect is enabled and initial connection fails, ReconnectErrHandler is now called to report the error, allowing users to debug initial connection issues. Resolves issue mentioned in https://github.com/nats-io/nats.go/issues/1870#issuecomment-2853851326 Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:
18
nats.go
18
nats.go
@@ -1660,14 +1660,17 @@ func (o Options) Connect() (*Conn, error) {
|
||||
// Create reader/writer
|
||||
nc.newReaderWriter()
|
||||
|
||||
// Spin up the async cb dispatcher before connect so it's ready
|
||||
// to handle callbacks, especially when RetryOnFailedConnect is used
|
||||
// and initial connection fails.
|
||||
go nc.ach.asyncCBDispatcher()
|
||||
|
||||
connectionEstablished, err := nc.connect()
|
||||
if err != nil {
|
||||
nc.ach.close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Spin up the async cb dispatcher on success
|
||||
go nc.ach.asyncCBDispatcher()
|
||||
|
||||
if connectionEstablished && nc.Opts.ConnectedCB != nil {
|
||||
nc.ach.push(func() { nc.Opts.ConnectedCB(nc) })
|
||||
}
|
||||
@@ -2554,7 +2557,7 @@ func (nc *Conn) connect() (bool, error) {
|
||||
nc.setup()
|
||||
nc.changeConnStatus(RECONNECTING)
|
||||
nc.bw.switchToPending()
|
||||
go nc.doReconnect(ErrNoServers, false)
|
||||
go nc.doReconnect(err, false)
|
||||
err = nil
|
||||
} else {
|
||||
nc.current = nil
|
||||
@@ -2877,6 +2880,7 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
|
||||
|
||||
// Clear any errors.
|
||||
nc.err = nil
|
||||
|
||||
// Perform appropriate callback if needed for a disconnect.
|
||||
// DisconnectedErrCB has priority over deprecated DisconnectedCB
|
||||
if !nc.initc {
|
||||
@@ -2885,6 +2889,12 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
|
||||
} else if nc.Opts.DisconnectedCB != nil {
|
||||
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
|
||||
}
|
||||
} else if nc.Opts.RetryOnFailedConnect && nc.initc && err != nil {
|
||||
// For initial connection failure with RetryOnFailedConnect,
|
||||
// report the error via ReconnectErrCB if available
|
||||
if nc.Opts.ReconnectErrCB != nil {
|
||||
nc.ach.push(func() { nc.Opts.ReconnectErrCB(nc, err) })
|
||||
}
|
||||
}
|
||||
|
||||
// This is used to wait on go routines exit if we start them in the loop
|
||||
|
@@ -2920,6 +2920,77 @@ func TestRetryOnFailedConnect(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryOnFailedConnectReconnectErrCB(t *testing.T) {
|
||||
errChan := make(chan error, 10)
|
||||
|
||||
nc, err := nats.Connect(nats.DefaultURL,
|
||||
nats.RetryOnFailedConnect(true),
|
||||
nats.MaxReconnects(0), // Limited retries for faster test
|
||||
nats.ReconnectWait(10*time.Millisecond),
|
||||
nats.ReconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
errChan <- err
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
// Verify the first error is the initial connection error
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if !errors.Is(err, nats.ErrNoServers) {
|
||||
t.Fatalf("Expected ErrNoServers for initial connection failure, got: %v", err)
|
||||
}
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatal("Should have received initial connection error in ReconnectErrCB")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryOnFailedConnectWithAuthError(t *testing.T) {
|
||||
o := test.DefaultTestOptions
|
||||
o.Username = "user"
|
||||
o.Password = "password"
|
||||
s := RunServerWithOptions(&o)
|
||||
defer s.Shutdown()
|
||||
|
||||
errChan := make(chan error, 10)
|
||||
closedCh := make(chan bool, 1)
|
||||
|
||||
// Try to connect without credentials
|
||||
nc, err := nats.Connect(nats.DefaultURL,
|
||||
nats.RetryOnFailedConnect(true),
|
||||
nats.MaxReconnects(2),
|
||||
nats.ReconnectWait(10*time.Millisecond),
|
||||
nats.ReconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
errChan <- err
|
||||
}),
|
||||
nats.ClosedHandler(func(_ *nats.Conn) {
|
||||
closedCh <- true
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
// Wait for closed due to auth failure
|
||||
select {
|
||||
case <-closedCh:
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Fatal("Connection should have closed due to auth failure")
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if !errors.Is(err, nats.ErrAuthorization) {
|
||||
t.Fatalf("Expected ErrAuthorization for auth failure, got: %v", err)
|
||||
}
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Fatal("Should have received authorization error in ReconnectErrCB")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryOnFailedConnectWithTLSError(t *testing.T) {
|
||||
opts := test.DefaultTestOptions
|
||||
opts.Port = 4222
|
||||
|
Reference in New Issue
Block a user