From d475f2c9ee342e7261e4dc34d86ed8dc412cddaf Mon Sep 17 00:00:00 2001 From: Joe Turki Date: Thu, 18 Dec 2025 02:43:17 +0200 Subject: [PATCH] Implement write deadline --- internal/mux/endpoint.go | 19 +++++---- internal/mux/mux_test.go | 88 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 95 insertions(+), 12 deletions(-) diff --git a/internal/mux/endpoint.go b/internal/mux/endpoint.go index 06b71f40..33088baf 100644 --- a/internal/mux/endpoint.go +++ b/internal/mux/endpoint.go @@ -80,21 +80,24 @@ func (e *Endpoint) RemoteAddr() net.Addr { return e.mux.nextConn.RemoteAddr() } -// SetDeadline sets the read deadline for this Endpoint. -// Write deadlines are not supported because writes go directly to the shared -// underlying connection and are non-blocking for this endpoint. +// SetDeadline sets the read and write deadlines on the shared underlying +// connection. Because the connection is shared, this applies to all endpoints +// on the mux. Per-endpoint read deadlines can be set with SetReadDeadline. func (e *Endpoint) SetDeadline(t time.Time) error { - return e.buffer.SetReadDeadline(t) + return e.mux.nextConn.SetDeadline(t) } -// SetReadDeadline sets the read deadline for this Endpoint. +// SetReadDeadline sets the read deadline for this Endpoint's internal +// packet buffer. This timeout applies only to reads from this Endpoint, +// not to the shared underlying connection. func (e *Endpoint) SetReadDeadline(t time.Time) error { return e.buffer.SetReadDeadline(t) } -// SetWriteDeadline is a stub. -func (e *Endpoint) SetWriteDeadline(time.Time) error { - return nil +// SetWriteDeadline sets the write deadline on the shared underlying connection. +// Because the connection is shared, this applies to all endpoints on the mux. +func (e *Endpoint) SetWriteDeadline(t time.Time) error { + return e.mux.nextConn.SetWriteDeadline(t) } // SetOnClose is a user set callback that diff --git a/internal/mux/mux_test.go b/internal/mux/mux_test.go index a219f85b..1899b37a 100644 --- a/internal/mux/mux_test.go +++ b/internal/mux/mux_test.go @@ -4,6 +4,7 @@ package mux import ( + "errors" "io" "net" "testing" @@ -41,10 +42,6 @@ func TestEndpointDeadline(t *testing.T) { name: "SetReadDeadline", setDeadline: (*Endpoint).SetReadDeadline, }, - { - name: "SetDeadline", - setDeadline: (*Endpoint).SetDeadline, - }, } for _, tt := range tests { @@ -78,6 +75,89 @@ func TestEndpointDeadline(t *testing.T) { } } +type writeDeadlineConn struct { + net.Conn + writeDeadline time.Time +} + +func (w *writeDeadlineConn) SetWriteDeadline(t time.Time) error { + w.writeDeadline = t + + if w.Conn == nil { + return nil + } + + return w.Conn.SetWriteDeadline(t) +} + +func TestEndpointSetWriteDeadline(t *testing.T) { + lim := test.TimeOut(2 * time.Second) + defer lim.Stop() + + ca, cb := net.Pipe() + defer func() { + _ = cb.Close() + }() + + rdConn := &writeDeadlineConn{Conn: ca} + + mux := NewMux(Config{ + Conn: rdConn, + BufferSize: testPipeBufferSize, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + + endpoint := mux.NewEndpoint(MatchAll) + deadline := time.Now().Add(10 * time.Millisecond) + require.NoError(t, endpoint.SetWriteDeadline(deadline)) + require.WithinDuration(t, deadline, rdConn.writeDeadline, time.Millisecond) + + require.NoError(t, mux.Close()) +} + +type writeDeadlineErrorConn struct { + net.Conn + deadlineErr error +} + +func (w *writeDeadlineErrorConn) SetDeadline(t time.Time) error { + if w.deadlineErr != nil { + return w.deadlineErr + } + + return nil +} + +var errDeadlineTest = errors.New("write deadline failed") + +func TestEndpointSetDeadlineWriteDeadlineError(t *testing.T) { + lim := test.TimeOut(2 * time.Second) + defer lim.Stop() + + ca, cb := net.Pipe() + defer func() { + _ = ca.Close() + _ = cb.Close() + }() + + rdConn := &writeDeadlineErrorConn{Conn: ca, deadlineErr: errDeadlineTest} + + mux := NewMux(Config{ + Conn: rdConn, + BufferSize: testPipeBufferSize, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + + endpoint := mux.NewEndpoint(MatchAll) + err := endpoint.SetDeadline(time.Now().Add(10 * time.Millisecond)) + require.Error(t, err) + require.ErrorIs(t, err, errDeadlineTest) + + require.NoError(t, mux.Close()) + require.NoError(t, ca.Close()) + require.NoError(t, rdConn.Close()) +} + type muxErrorConnReadResult struct { err error data []byte