mirror of
https://github.com/nats-io/nats.go.git
synced 2025-09-26 20:41:41 +08:00
[FIXED] timeoutWriter not recovering after first error (#1896)
Signed-off-by: Waldemar Quevedo <wally@nats.io>
This commit is contained in:
10
nats.go
10
nats.go
@@ -6149,18 +6149,12 @@ func sigHandler(nonce []byte, seedFile string) ([]byte, error) {
|
||||
type timeoutWriter struct {
|
||||
timeout time.Duration
|
||||
conn net.Conn
|
||||
err error
|
||||
}
|
||||
|
||||
// Write implements the io.Writer interface.
|
||||
func (tw *timeoutWriter) Write(p []byte) (int, error) {
|
||||
if tw.err != nil {
|
||||
return 0, tw.err
|
||||
}
|
||||
|
||||
var n int
|
||||
tw.conn.SetWriteDeadline(time.Now().Add(tw.timeout))
|
||||
n, tw.err = tw.conn.Write(p)
|
||||
n, err := tw.conn.Write(p)
|
||||
tw.conn.SetWriteDeadline(time.Time{})
|
||||
return n, tw.err
|
||||
return n, err
|
||||
}
|
||||
|
72
nats_test.go
72
nats_test.go
@@ -1780,3 +1780,75 @@ func BenchmarkHeaderDecode(b *testing.B) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// mockConn simulates a network connection that can fail and recover
|
||||
// after a number of attempts.
|
||||
type mockConn struct {
|
||||
failures int
|
||||
temporaryFailures int
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (mc *mockConn) Write(p []byte) (int, error) {
|
||||
if mc.failures < mc.temporaryFailures {
|
||||
mc.failures++
|
||||
return 0, &net.OpError{Op: "write", Net: "tcp", Err: errors.New("i/o timeout")}
|
||||
}
|
||||
mc.data = append(mc.data, p...)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (mc *mockConn) SetWriteDeadline(t time.Time) error { return nil }
|
||||
func (mc *mockConn) Read(b []byte) (int, error) { return 0, nil }
|
||||
func (mc *mockConn) Close() error { return nil }
|
||||
func (mc *mockConn) LocalAddr() net.Addr { return nil }
|
||||
func (mc *mockConn) RemoteAddr() net.Addr { return nil }
|
||||
func (mc *mockConn) SetDeadline(t time.Time) error { return nil }
|
||||
func (mc *mockConn) SetReadDeadline(t time.Time) error { return nil }
|
||||
|
||||
func TestTimeoutWriterRecovery(t *testing.T) {
|
||||
mc := &mockConn{temporaryFailures: 2}
|
||||
tw := &timeoutWriter{
|
||||
timeout: time.Second,
|
||||
conn: mc,
|
||||
}
|
||||
n, err := tw.Write([]byte("foo"))
|
||||
if err == nil {
|
||||
t.Fatal("Unexpected success")
|
||||
}
|
||||
if n != 0 {
|
||||
t.Fatalf("Expected 0 bytes, got %d", n)
|
||||
}
|
||||
n, err = tw.Write([]byte("bar"))
|
||||
if err == nil {
|
||||
t.Fatal("Unexpected success")
|
||||
}
|
||||
if n != 0 {
|
||||
t.Fatalf("Expected 0 bytes, got: %d", n)
|
||||
}
|
||||
|
||||
// Should succeed since it was a temporary error.
|
||||
testData := []byte("quux")
|
||||
n, err = tw.Write(testData)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected success, got: %v", err)
|
||||
}
|
||||
if n != len(testData) {
|
||||
t.Fatalf("Expected %d, got: %d", len(testData), n)
|
||||
}
|
||||
if !bytes.Equal(mc.data, testData) {
|
||||
t.Fatalf("Expected %q, got: %q", testData, mc.data)
|
||||
}
|
||||
testData2 := []byte("quuz")
|
||||
n, err = tw.Write(testData2)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected success, got: %v", err)
|
||||
}
|
||||
if n != len(testData2) {
|
||||
t.Fatalf("Expected %d bytes written, got %d", len(testData2), n)
|
||||
}
|
||||
expectedData := append(testData, testData2...)
|
||||
if !bytes.Equal(mc.data, expectedData) {
|
||||
t.Fatalf("Expected data %q, got %q", expectedData, mc.data)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user