From a2d6ce8af6c49641a539eb8ab4a17e35e6a2929b Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Mon, 23 Jan 2023 12:54:57 +0100 Subject: [PATCH] replace OnDecodeError with OnWarning (#177) --- client.go | 16 ++++++++++------ client_format.go | 2 +- client_media.go | 22 +++++++++++----------- client_play_test.go | 8 ++++---- client_record_test.go | 2 +- server_handler.go | 16 ++++++++++++++++ server_play_test.go | 2 +- server_record_test.go | 2 +- server_session_format.go | 2 +- server_session_media.go | 24 ++++++++++++------------ server_test.go | 8 ++++---- server_udpl.go | 9 +++++++-- 12 files changed, 69 insertions(+), 44 deletions(-) diff --git a/client.go b/client.go index 3acd92f0..397c3a84 100644 --- a/client.go +++ b/client.go @@ -207,7 +207,8 @@ type Client struct { // called after every response. OnResponse func(*base.Response) // called when there's a non-fatal warning. - // TODO: rename. + OnWarning func(error) + // Deprecated: replaced by OnWarning. OnDecodeError func(error) // @@ -312,10 +313,13 @@ func (c *Client) Start(scheme string, host string) error { c.OnResponse = func(*base.Response) { } } - if c.OnDecodeError == nil { - c.OnDecodeError = func(error) { + if c.OnWarning == nil { + c.OnWarning = func(error) { } } + if c.OnDecodeError != nil { + c.OnWarning = c.OnDecodeError + } // private if c.senderReportPeriod == 0 { @@ -587,7 +591,7 @@ func (c *Client) checkState(allowed map[clientState]struct{}) error { } func (c *Client) trySwitchingProtocol() error { - c.OnDecodeError(fmt.Errorf("no UDP packets received, switching to TCP")) + c.OnWarning(fmt.Errorf("no UDP packets received, switching to TCP")) prevScheme := c.scheme prevHost := c.host @@ -628,7 +632,7 @@ func (c *Client) trySwitchingProtocol() error { } func (c *Client) trySwitchingProtocol2(medi *media.Media, baseURL *url.URL) (*base.Response, error) { - c.OnDecodeError(fmt.Errorf("switching to TCP due to server request")) + c.OnWarning(fmt.Errorf("switching to TCP due to server request")) prevScheme := c.scheme prevHost := c.host @@ -1243,7 +1247,7 @@ func (c *Client) doSetup( if res.StatusCode == base.StatusUnsupportedTransport && c.effectiveTransport == nil && c.Transport == nil { - c.OnDecodeError(fmt.Errorf("switching to TCP due to server request")) + c.OnWarning(fmt.Errorf("switching to TCP due to server request")) v := TransportTCP c.effectiveTransport = &v return c.doSetup(medi, baseURL, 0, 0) diff --git a/client_format.go b/client_format.go index 717f4b83..99d87d08 100644 --- a/client_format.go +++ b/client_format.go @@ -95,7 +95,7 @@ func (ct *clientFormat) writePacketRTPWithNTP(pkt *rtp.Packet, ntp time.Time) er func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) { packets, missing := ct.udpReorderer.Process(pkt) if missing != 0 { - ct.c.OnDecodeError(fmt.Errorf("%d RTP packet(s) lost", missing)) + ct.c.OnWarning(fmt.Errorf("%d RTP packet(s) lost", missing)) // do not return } diff --git a/client_media.go b/client_media.go index c99275b8..d754979a 100644 --- a/client_media.go +++ b/client_media.go @@ -222,14 +222,14 @@ func (cm *clientMedia) readRTCPTCPPlay(payload []byte) error { atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) if len(payload) > maxPacketSize { - cm.c.OnDecodeError(fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", + cm.c.OnWarning(fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", len(payload), maxPacketSize)) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.c.OnWarning(err) return nil } @@ -246,14 +246,14 @@ func (cm *clientMedia) readRTPTCPRecord(payload []byte) error { func (cm *clientMedia) readRTCPTCPRecord(payload []byte) error { if len(payload) > maxPacketSize { - cm.c.OnDecodeError(fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", + cm.c.OnWarning(fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", len(payload), maxPacketSize)) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.c.OnWarning(err) return nil } @@ -270,20 +270,20 @@ func (cm *clientMedia) readRTPUDPPlay(payload []byte) error { atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - cm.c.OnDecodeError(fmt.Errorf("RTP packet is too big to be read with UDP")) + cm.c.OnWarning(fmt.Errorf("RTP packet is too big to be read with UDP")) return nil } pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.c.OnWarning(err) return nil } forma, ok := cm.formats[pkt.PayloadType] if !ok { - cm.c.OnDecodeError(fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) + cm.c.OnWarning(fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) return nil } @@ -298,13 +298,13 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) error { atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - cm.c.OnDecodeError(fmt.Errorf("RTCP packet is too big to be read with UDP")) + cm.c.OnWarning(fmt.Errorf("RTCP packet is too big to be read with UDP")) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.c.OnWarning(err) return nil } @@ -332,13 +332,13 @@ func (cm *clientMedia) readRTCPUDPRecord(payload []byte) error { atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - cm.c.OnDecodeError(fmt.Errorf("RTCP packet is too big to be read with UDP")) + cm.c.OnWarning(fmt.Errorf("RTCP packet is too big to be read with UDP")) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.c.OnWarning(err) return nil } diff --git a/client_play_test.go b/client_play_test.go index 721bdd8b..763d786c 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -1049,7 +1049,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { packetRecv := make(chan struct{}) c := Client{ - OnDecodeError: func(err error) { + OnWarning: func(err error) { require.EqualError(t, err, "switching to TCP due to server request") }, } @@ -1225,7 +1225,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { packetRecv := make(chan struct{}) c := Client{ - OnDecodeError: func(err error) { + OnWarning: func(err error) { require.EqualError(t, err, "switching to TCP due to server request") }, } @@ -1459,7 +1459,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { packetRecv := make(chan struct{}) c := Client{ - OnDecodeError: func(err error) { + OnWarning: func(err error) { require.EqualError(t, err, "no UDP packets received, switching to TCP") }, ReadTimeout: 1 * time.Second, @@ -3072,7 +3072,7 @@ func TestClientPlayDecodeErrors(t *testing.T) { v := TransportTCP return &v }(), - OnDecodeError: func(err error) { + OnWarning: func(err error) { switch { case ca.proto == "udp" && ca.name == "rtp invalid": require.EqualError(t, err, "RTP header size insufficient: 2 < 4") diff --git a/client_record_test.go b/client_record_test.go index 794c9ed5..10fa98e7 100644 --- a/client_record_test.go +++ b/client_record_test.go @@ -1021,7 +1021,7 @@ func TestClientRecordDecodeErrors(t *testing.T) { v := TransportTCP return &v }(), - OnDecodeError: func(err error) { + OnWarning: func(err error) { switch { case ca.proto == "udp" && ca.name == "rtcp invalid": require.EqualError(t, err, "rtcp: packet too short") diff --git a/server_handler.go b/server_handler.go index 9cfe1afb..7572ce9d 100644 --- a/server_handler.go +++ b/server_handler.go @@ -194,13 +194,29 @@ type ServerHandlerOnSetParameter interface { OnSetParameter(*ServerHandlerOnSetParameterCtx) (*base.Response, error) } +// ServerHandlerOnWarningCtx is the context of OnWarning. +type ServerHandlerOnWarningCtx struct { + Session *ServerSession + Error error +} + +// ServerHandlerOnWarning can be implemented by a ServerHandler. +type ServerHandlerOnWarning interface { + // called when there's a non-fatal decoding error of RTP or RTCP packets. + OnWarning(*ServerHandlerOnWarningCtx) +} + // ServerHandlerOnDecodeErrorCtx is the context of OnDecodeError. +// +// Deprecated. Replaced by ServerHandlerOnWarningCtx. type ServerHandlerOnDecodeErrorCtx struct { Session *ServerSession Error error } // ServerHandlerOnDecodeError can be implemented by a ServerHandler. +// +// Deprecated. Replaced by ServerHandlerOnWarning. type ServerHandlerOnDecodeError interface { // called when there's a non-fatal decoding error of RTP or RTCP packets. OnDecodeError(*ServerHandlerOnDecodeErrorCtx) diff --git a/server_play_test.go b/server_play_test.go index b742dcc1..16450705 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -831,7 +831,7 @@ func TestServerPlayDecodeErrors(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onDecodeError: func(ctx *ServerHandlerOnDecodeErrorCtx) { + onWarning: func(ctx *ServerHandlerOnWarningCtx) { switch { case ca.proto == "udp" && ca.name == "rtcp invalid": require.EqualError(t, ctx.Error, "rtcp: packet too short") diff --git a/server_record_test.go b/server_record_test.go index d5a45495..548bae93 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -1455,7 +1455,7 @@ func TestServerRecordDecodeErrors(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onDecodeError: func(ctx *ServerHandlerOnDecodeErrorCtx) { + onWarning: func(ctx *ServerHandlerOnWarningCtx) { switch { case ca.proto == "udp" && ca.name == "rtp invalid": require.EqualError(t, ctx.Error, "RTP header size insufficient: 2 < 4") diff --git a/server_session_format.go b/server_session_format.go index eb6790ae..079df6e7 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -52,7 +52,7 @@ func (sf *serverSessionFormat) stop() { func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { packets, missing := sf.udpReorderer.Process(pkt) if missing != 0 { - onDecodeError(sf.sm.ss, fmt.Errorf("%d RTP packet(s) lost", missing)) + onWarning(sf.sm.ss, fmt.Errorf("%d RTP packet(s) lost", missing)) // do not return } diff --git a/server_session_media.go b/server_session_media.go index 86ada2fc..af8301b4 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -153,13 +153,13 @@ func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) error { atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - onDecodeError(sm.ss, fmt.Errorf("RTCP packet is too big to be read with UDP")) + onWarning(sm.ss, fmt.Errorf("RTCP packet is too big to be read with UDP")) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - onDecodeError(sm.ss, err) + onWarning(sm.ss, err) return nil } @@ -176,20 +176,20 @@ func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) error { atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - onDecodeError(sm.ss, fmt.Errorf("RTP packet is too big to be read with UDP")) + onWarning(sm.ss, fmt.Errorf("RTP packet is too big to be read with UDP")) return nil } pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { - onDecodeError(sm.ss, err) + onWarning(sm.ss, err) return nil } forma, ok := sm.formats[pkt.PayloadType] if !ok { - onDecodeError(sm.ss, fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) + onWarning(sm.ss, fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) return nil } @@ -206,13 +206,13 @@ func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) error { atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - onDecodeError(sm.ss, fmt.Errorf("RTCP packet is too big to be read with UDP")) + onWarning(sm.ss, fmt.Errorf("RTCP packet is too big to be read with UDP")) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - onDecodeError(sm.ss, err) + onWarning(sm.ss, err) return nil } @@ -241,14 +241,14 @@ func (sm *serverSessionMedia) readRTPTCPPlay(payload []byte) error { func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) error { if len(payload) > maxPacketSize { - onDecodeError(sm.ss, fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", + onWarning(sm.ss, fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", len(payload), maxPacketSize)) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - onDecodeError(sm.ss, err) + onWarning(sm.ss, err) return nil } @@ -268,7 +268,7 @@ func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) error { forma, ok := sm.formats[pkt.PayloadType] if !ok { - onDecodeError(sm.ss, fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) + onWarning(sm.ss, fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) return nil } @@ -278,14 +278,14 @@ func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) error { func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) error { if len(payload) > maxPacketSize { - onDecodeError(sm.ss, fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", + onWarning(sm.ss, fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", len(payload), maxPacketSize)) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - onDecodeError(sm.ss, err) + onWarning(sm.ss, err) return nil } diff --git a/server_test.go b/server_test.go index 45f5fe5e..7ba93246 100644 --- a/server_test.go +++ b/server_test.go @@ -93,7 +93,7 @@ type testServerHandler struct { onPause func(*ServerHandlerOnPauseCtx) (*base.Response, error) onSetParameter func(*ServerHandlerOnSetParameterCtx) (*base.Response, error) onGetParameter func(*ServerHandlerOnGetParameterCtx) (*base.Response, error) - onDecodeError func(*ServerHandlerOnDecodeErrorCtx) + onWarning func(*ServerHandlerOnWarningCtx) } func (sh *testServerHandler) OnConnOpen(ctx *ServerHandlerOnConnOpenCtx) { @@ -176,9 +176,9 @@ func (sh *testServerHandler) OnGetParameter(ctx *ServerHandlerOnGetParameterCtx) return nil, fmt.Errorf("unimplemented") } -func (sh *testServerHandler) OnDecodeError(ctx *ServerHandlerOnDecodeErrorCtx) { - if sh.onDecodeError != nil { - sh.onDecodeError(ctx) +func (sh *testServerHandler) OnWarning(ctx *ServerHandlerOnWarningCtx) { + if sh.onWarning != nil { + sh.onWarning(ctx) } } diff --git a/server_udpl.go b/server_udpl.go index 6377f4dc..13e5dfde 100644 --- a/server_udpl.go +++ b/server_udpl.go @@ -38,8 +38,13 @@ func (p *clientAddr) fill(ip net.IP, port int) { } } -func onDecodeError(ss *ServerSession, err error) { - if h, ok := ss.s.Handler.(ServerHandlerOnDecodeError); ok { +func onWarning(ss *ServerSession, err error) { + if h, ok := ss.s.Handler.(ServerHandlerOnWarning); ok { + h.OnWarning(&ServerHandlerOnWarningCtx{ + Session: ss, + Error: err, + }) + } else if h, ok := ss.s.Handler.(ServerHandlerOnDecodeError); ok { h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ Session: ss, Error: err,