diff --git a/client.go b/client.go index 30a54430..215f00fd 100644 --- a/client.go +++ b/client.go @@ -10,7 +10,6 @@ import ( "context" "crypto/tls" "fmt" - "log" "net" "strconv" "strings" @@ -141,23 +140,10 @@ type clientRes struct { err error } -// LogLevel is a log level. -type LogLevel int - -// Log levels. -const ( - LogLevelDebug LogLevel = iota + 1 - LogLevelInfo - LogLevelWarn - LogLevelError -) - -// LogFunc is the prototype of the log function. -type LogFunc func(level LogLevel, format string, args ...interface{}) - -func defaultLog(level LogLevel, format string, args ...interface{}) { - log.Printf(format, args...) -} +// ClientLogFunc is the prototype of the log function. +// +// Deprecated: Log() is deprecated. +type ClientLogFunc func(level LogLevel, format string, args ...interface{}) // Client is a RTSP client. type Client struct { @@ -225,13 +211,14 @@ type Client struct { OnRequest func(*base.Request) // called after every response. OnResponse func(*base.Response) - - // - // logging (all optional) - // - // function that receives log messages. - // It defaults to log.Printf. - Log LogFunc + // called when the transport protocol changes. + OnTransportSwitch func(err error) + // called when the client detects lost packets. + OnPacketLost func(err error) + // called when there's a decode error. + OnDecodeError func(err error) + // Deprecated: replaced by OnTransportSwitch, OnPacketLost, OnDecodeError + Log ClientLogFunc // // private @@ -335,9 +322,32 @@ func (c *Client) Start(scheme string, host string) error { c.OnResponse = func(*base.Response) { } } - - if c.Log == nil { - c.Log = defaultLog + if c.Log != nil && c.OnTransportSwitch == nil { + c.OnTransportSwitch = func(err error) { + c.Log(LogLevelWarn, "%v", err) + } + } + if c.OnTransportSwitch == nil { + c.OnTransportSwitch = func(err error) { + } + } + if c.Log != nil && c.OnPacketLost == nil { + c.OnPacketLost = func(err error) { + c.Log(LogLevelWarn, "%v", err) + } + } + if c.OnPacketLost == nil { + c.OnPacketLost = func(err error) { + } + } + if c.Log != nil && c.OnDecodeError == nil { + c.OnDecodeError = func(err error) { + c.Log(LogLevelWarn, "%v", err) + } + } + if c.OnDecodeError == nil { + c.OnDecodeError = func(err error) { + } } // private @@ -610,7 +620,7 @@ func (c *Client) checkState(allowed map[clientState]struct{}) error { } func (c *Client) trySwitchingProtocol() error { - c.Log(LogLevelWarn, "no UDP packets received, switching to TCP") + c.OnTransportSwitch(fmt.Errorf("no UDP packets received, switching to TCP")) prevScheme := c.scheme prevHost := c.host @@ -651,7 +661,7 @@ func (c *Client) trySwitchingProtocol() error { } func (c *Client) trySwitchingProtocol2(medi *media.Media, baseURL *url.URL) (*base.Response, error) { - c.Log(LogLevelWarn, "switching to TCP because server requested it") + c.OnTransportSwitch(fmt.Errorf("switching to TCP because server requested it")) prevScheme := c.scheme prevHost := c.host @@ -1266,7 +1276,7 @@ func (c *Client) doSetup( if res.StatusCode == base.StatusUnsupportedTransport && c.effectiveTransport == nil && c.Transport == nil { - c.Log(LogLevelWarn, "switching to TCP because server requested it") + c.OnTransportSwitch(fmt.Errorf("switching to TCP because server requested it")) v := TransportTCP c.effectiveTransport = &v return c.doSetup(medi, baseURL, 0, 0) diff --git a/client_format.go b/client_format.go index 35e45fc9..15b0abee 100644 --- a/client_format.go +++ b/client_format.go @@ -1,6 +1,7 @@ package gortsplib import ( + "fmt" "time" "github.com/pion/rtcp" @@ -94,7 +95,7 @@ func (ct *clientFormat) writePacketRTPWithNTP(pkt *rtp.Packet, ntp time.Time) er func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) { packets, missing := ct.udpReorderer.Process(pkt) if missing != 0 { - ct.c.Log(LogLevelWarn, "%d RTP packet(s) lost", missing) + ct.c.OnPacketLost(fmt.Errorf("%d RTP packet(s) lost", missing)) // do not return } diff --git a/client_media.go b/client_media.go index 9f1a6a18..6bf8cbd6 100644 --- a/client_media.go +++ b/client_media.go @@ -1,6 +1,7 @@ package gortsplib import ( + "fmt" "sync/atomic" "time" @@ -221,14 +222,14 @@ func (cm *clientMedia) readRTCPTCPPlay(payload []byte) error { atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) if len(payload) > maxPacketSize { - cm.c.Log(LogLevelWarn, "RTCP packet size (%d) is greater than maximum allowed (%d)", - len(payload), maxPacketSize) + cm.c.OnDecodeError(fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", + len(payload), maxPacketSize)) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.Log(LogLevelWarn, "%v", err) + cm.c.OnDecodeError(err) return nil } @@ -245,14 +246,14 @@ func (cm *clientMedia) readRTPTCPRecord(payload []byte) error { func (cm *clientMedia) readRTCPTCPRecord(payload []byte) error { if len(payload) > maxPacketSize { - cm.c.Log(LogLevelWarn, "RTCP packet size (%d) is greater than maximum allowed (%d)", - len(payload), maxPacketSize) + cm.c.OnDecodeError(fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", + len(payload), maxPacketSize)) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.Log(LogLevelWarn, "%v", err) + cm.c.OnDecodeError(err) return nil } @@ -269,20 +270,20 @@ func (cm *clientMedia) readRTPUDPPlay(payload []byte) error { atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - cm.c.Log(LogLevelWarn, "RTP packet is too big to be read with UDP") + cm.c.OnDecodeError(fmt.Errorf("RTP packet is too big to be read with UDP")) return nil } pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { - cm.c.Log(LogLevelWarn, "%v", err) + cm.c.OnDecodeError(err) return nil } forma, ok := cm.formats[pkt.PayloadType] if !ok { - cm.c.Log(LogLevelWarn, "received RTP packet with unknown payload type (%d)", pkt.PayloadType) + cm.c.OnDecodeError(fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) return nil } @@ -297,13 +298,13 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) error { atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - cm.c.Log(LogLevelWarn, "RTCP packet is too big to be read with UDP") + cm.c.OnDecodeError(fmt.Errorf("RTCP packet is too big to be read with UDP")) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.Log(LogLevelWarn, "%v", err) + cm.c.OnDecodeError(err) return nil } @@ -331,13 +332,13 @@ func (cm *clientMedia) readRTCPUDPRecord(payload []byte) error { atomic.AddUint64(cm.c.BytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - cm.c.Log(LogLevelWarn, "RTCP packet is too big to be read with UDP") + cm.c.OnDecodeError(fmt.Errorf("RTCP packet is too big to be read with UDP")) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.Log(LogLevelWarn, "%v", err) + cm.c.OnDecodeError(err) return nil } diff --git a/client_play_test.go b/client_play_test.go index dc79ead9..6fe2efda 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -3,7 +3,6 @@ package gortsplib import ( "bytes" "crypto/tls" - "fmt" "net" "strconv" "strings" @@ -1047,13 +1046,16 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { require.NoError(t, err) }() + msgRecv := make(chan struct{}) packetRecv := make(chan struct{}) c := Client{ - Log: func(level LogLevel, format string, args ...interface{}) { - require.Equal(t, format, "switching to TCP because server requested it") + OnTransportSwitch: func(err error) { + require.EqualError(t, err, "switching to TCP because server requested it") + close(msgRecv) }, } + err = readAll(&c, "rtsp://localhost:8554/teststream", func(medi *media.Media, forma formats.Format, pkt *rtp.Packet) { close(packetRecv) @@ -1061,6 +1063,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { require.NoError(t, err) defer c.Close() + <-msgRecv <-packetRecv }) @@ -1223,13 +1226,16 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { }() }() + msgRecv := make(chan struct{}) packetRecv := make(chan struct{}) c := Client{ - Log: func(level LogLevel, format string, args ...interface{}) { - require.Equal(t, format, "switching to TCP because server requested it") + OnTransportSwitch: func(err error) { + require.EqualError(t, err, "switching to TCP because server requested it") + close(msgRecv) }, } + err = readAll(&c, "rtsp://localhost:8554/teststream", func(medi *media.Media, forma formats.Format, pkt *rtp.Packet) { close(packetRecv) @@ -1237,6 +1243,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { require.NoError(t, err) defer c.Close() + <-msgRecv <-packetRecv }) @@ -1457,11 +1464,13 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { }() }() + msgRecv := make(chan struct{}) packetRecv := make(chan struct{}) c := Client{ - Log: func(level LogLevel, format string, args ...interface{}) { - require.Equal(t, format, "no UDP packets received, switching to TCP") + OnTransportSwitch: func(err error) { + require.EqualError(t, err, "no UDP packets received, switching to TCP") + close(msgRecv) }, ReadTimeout: 1 * time.Second, } @@ -1473,6 +1482,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { require.NoError(t, err) defer c.Close() + <-msgRecv <-packetRecv }) } @@ -3073,8 +3083,13 @@ func TestClientPlayDecodeErrors(t *testing.T) { v := TransportTCP return &v }(), - Log: func(level LogLevel, format string, args ...interface{}) { - err := fmt.Errorf(format, args...) + OnPacketLost: func(err error) { + if ca.proto == "udp" && ca.name == "rtp packets lost" { + require.EqualError(t, err, "69 RTP packet(s) lost") + } + close(errorRecv) + }, + OnDecodeError: func(err error) { switch { case ca.proto == "udp" && ca.name == "rtp invalid": require.EqualError(t, err, "RTP header size insufficient: 2 < 4") @@ -3082,9 +3097,6 @@ func TestClientPlayDecodeErrors(t *testing.T) { case ca.proto == "udp" && ca.name == "rtcp invalid": require.EqualError(t, err, "rtcp: packet too short") - case ca.proto == "udp" && ca.name == "rtp packets lost": - require.EqualError(t, err, "69 RTP packet(s) lost") - case ca.proto == "udp" && ca.name == "rtp too big": require.EqualError(t, err, "RTP packet is too big to be read with UDP") diff --git a/client_record_test.go b/client_record_test.go index 56f7d444..ed2470dd 100644 --- a/client_record_test.go +++ b/client_record_test.go @@ -3,7 +3,6 @@ package gortsplib import ( "bytes" "crypto/tls" - "fmt" "net" "strings" "testing" @@ -1022,8 +1021,7 @@ func TestClientRecordDecodeErrors(t *testing.T) { v := TransportTCP return &v }(), - Log: func(level LogLevel, format string, args ...interface{}) { - err := fmt.Errorf(format, args...) + OnDecodeError: func(err error) { switch { case ca.proto == "udp" && ca.name == "rtcp invalid": require.EqualError(t, err, "rtcp: packet too short") diff --git a/loglevel.go b/loglevel.go new file mode 100644 index 00000000..04ac5cef --- /dev/null +++ b/loglevel.go @@ -0,0 +1,16 @@ +package gortsplib + +// LogLevel is a log level. +// +// Deprecated: Log() is deprecated. +type LogLevel int + +// Log levels. +// +// Deprecated: Log() is deprecated. +const ( + LogLevelDebug LogLevel = iota + 1 + LogLevelInfo + LogLevelWarn + LogLevelError +) diff --git a/server.go b/server.go index dd23f2f5..543b6166 100644 --- a/server.go +++ b/server.go @@ -99,6 +99,7 @@ type Server struct { // handler (optional) // // an handler to handle server events. + // It may implement one or more of the ServerHandler* interfaces. Handler ServerHandler // diff --git a/server_handler.go b/server_handler.go index 2050eb55..918adf9b 100644 --- a/server_handler.go +++ b/server_handler.go @@ -195,13 +195,38 @@ type ServerHandlerOnSetParameter interface { } // ServerHandlerOnWarningCtx is the context of OnWarning. +// +// Deprecated: ServerHandlerOnWarning is deprecated. type ServerHandlerOnWarningCtx struct { Session *ServerSession Error error } // ServerHandlerOnWarning can be implemented by a ServerHandler. +// +// Deprecated: replaced by OnPacketLost, OnDecodeError. type ServerHandlerOnWarning interface { - // called when there's a non-fatal decoding error of RTP or RTCP packets. OnWarning(*ServerHandlerOnWarningCtx) } + +// ServerHandlerOnPacketLostCtx is the context of OnPacketLost. +type ServerHandlerOnPacketLostCtx struct { + Session *ServerSession + Error error +} + +// ServerHandlerOnPacketLost can be implemented by a ServerHandler. +type ServerHandlerOnPacketLost interface { + OnPacketLost(*ServerHandlerOnPacketLostCtx) +} + +// ServerHandlerOnDecodeErrorCtx is the context of OnDecodeError. +type ServerHandlerOnDecodeErrorCtx struct { + Session *ServerSession + Error error +} + +// ServerHandlerOnDecodeError can be implemented by a ServerHandler. +type ServerHandlerOnDecodeError interface { + OnDecodeError(*ServerHandlerOnDecodeErrorCtx) +} diff --git a/server_play_test.go b/server_play_test.go index f9f600ba..b38ba774 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -836,7 +836,7 @@ func TestServerPlayDecodeErrors(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onWarning: func(ctx *ServerHandlerOnWarningCtx) { + onDecodeError: func(ctx *ServerHandlerOnDecodeErrorCtx) { switch { case ca.proto == "udp" && ca.name == "rtcp invalid": require.EqualError(t, ctx.Error, "rtcp: packet too short") diff --git a/server_record_test.go b/server_record_test.go index 07a90816..9fdd99aa 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -1454,7 +1454,13 @@ func TestServerRecordDecodeErrors(t *testing.T) { StatusCode: base.StatusOK, }, nil }, - onWarning: func(ctx *ServerHandlerOnWarningCtx) { + onPacketLost: func(ctx *ServerHandlerOnPacketLostCtx) { + if ca.proto == "udp" && ca.name == "rtp packets lost" { + require.EqualError(t, ctx.Error, "69 RTP packet(s) lost") + } + close(errorRecv) + }, + onDecodeError: func(ctx *ServerHandlerOnDecodeErrorCtx) { switch { case ca.proto == "udp" && ca.name == "rtp invalid": require.EqualError(t, ctx.Error, "RTP header size insufficient: 2 < 4") @@ -1462,9 +1468,6 @@ func TestServerRecordDecodeErrors(t *testing.T) { case ca.proto == "udp" && ca.name == "rtcp invalid": require.EqualError(t, ctx.Error, "rtcp: packet too short") - case ca.proto == "udp" && ca.name == "rtp packets lost": - require.EqualError(t, ctx.Error, "69 RTP packet(s) lost") - case ca.proto == "udp" && ca.name == "rtp too big": require.EqualError(t, ctx.Error, "RTP packet is too big to be read with UDP") diff --git a/server_session.go b/server_session.go index 284135a7..13db162e 100644 --- a/server_session.go +++ b/server_session.go @@ -263,6 +263,34 @@ func (ss *ServerSession) UserData() interface{} { return ss.userData } +func (ss *ServerSession) onPacketLost(err error) { + if h, ok := ss.s.Handler.(ServerHandlerOnPacketLost); ok { + h.OnPacketLost(&ServerHandlerOnPacketLostCtx{ + Session: ss, + Error: err, + }) + } else if h, ok := ss.s.Handler.(ServerHandlerOnWarning); ok { + h.OnWarning(&ServerHandlerOnWarningCtx{ + Session: ss, + Error: err, + }) + } +} + +func (ss *ServerSession) onDecodeError(err error) { + if h, ok := ss.s.Handler.(ServerHandlerOnDecodeError); ok { + h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ + Session: ss, + Error: err, + }) + } else if h, ok := ss.s.Handler.(ServerHandlerOnWarning); ok { + h.OnWarning(&ServerHandlerOnWarningCtx{ + Session: ss, + Error: err, + }) + } +} + func (ss *ServerSession) checkState(allowed map[ServerSessionState]struct{}) error { if _, ok := allowed[ss.state]; ok { return nil diff --git a/server_session_format.go b/server_session_format.go index f32f5c67..a96baa16 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -52,7 +52,7 @@ func (sf *serverSessionFormat) stop() { func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { packets, missing := sf.udpReorderer.Process(pkt) if missing != 0 { - onWarning(sf.sm.ss, fmt.Errorf("%d RTP packet(s) lost", missing)) + sf.sm.ss.onPacketLost(fmt.Errorf("%d RTP packet(s) lost", missing)) // do not return } diff --git a/server_session_media.go b/server_session_media.go index 3e6a5daf..c1b48302 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -155,13 +155,13 @@ func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) error { atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - onWarning(sm.ss, fmt.Errorf("RTCP packet is too big to be read with UDP")) + sm.ss.onDecodeError(fmt.Errorf("RTCP packet is too big to be read with UDP")) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - onWarning(sm.ss, err) + sm.ss.onDecodeError(err) return nil } @@ -181,20 +181,20 @@ func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) error { atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - onWarning(sm.ss, fmt.Errorf("RTP packet is too big to be read with UDP")) + sm.ss.onDecodeError(fmt.Errorf("RTP packet is too big to be read with UDP")) return nil } pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { - onWarning(sm.ss, err) + sm.ss.onDecodeError(err) return nil } forma, ok := sm.formats[pkt.PayloadType] if !ok { - onWarning(sm.ss, fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) + sm.ss.onDecodeError(fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) return nil } @@ -211,13 +211,13 @@ func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) error { atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) if plen == (maxPacketSize + 1) { - onWarning(sm.ss, fmt.Errorf("RTCP packet is too big to be read with UDP")) + sm.ss.onDecodeError(fmt.Errorf("RTCP packet is too big to be read with UDP")) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - onWarning(sm.ss, err) + sm.ss.onDecodeError(err) return nil } @@ -246,14 +246,14 @@ func (sm *serverSessionMedia) readRTPTCPPlay(payload []byte) error { func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) error { if len(payload) > maxPacketSize { - onWarning(sm.ss, fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", + sm.ss.onDecodeError(fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", len(payload), maxPacketSize)) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - onWarning(sm.ss, err) + sm.ss.onDecodeError(err) return nil } @@ -273,7 +273,7 @@ func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) error { forma, ok := sm.formats[pkt.PayloadType] if !ok { - onWarning(sm.ss, fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) + sm.ss.onDecodeError(fmt.Errorf("received RTP packet with unknown payload type (%d)", pkt.PayloadType)) return nil } @@ -283,14 +283,14 @@ func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) error { func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) error { if len(payload) > maxPacketSize { - onWarning(sm.ss, fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", + sm.ss.onDecodeError(fmt.Errorf("RTCP packet size (%d) is greater than maximum allowed (%d)", len(payload), maxPacketSize)) return nil } packets, err := rtcp.Unmarshal(payload) if err != nil { - onWarning(sm.ss, err) + sm.ss.onDecodeError(err) return nil } diff --git a/server_test.go b/server_test.go index fb67e4ba..60b3accb 100644 --- a/server_test.go +++ b/server_test.go @@ -93,7 +93,8 @@ type testServerHandler struct { onPause func(*ServerHandlerOnPauseCtx) (*base.Response, error) onSetParameter func(*ServerHandlerOnSetParameterCtx) (*base.Response, error) onGetParameter func(*ServerHandlerOnGetParameterCtx) (*base.Response, error) - onWarning func(*ServerHandlerOnWarningCtx) + onPacketLost func(*ServerHandlerOnPacketLostCtx) + onDecodeError func(*ServerHandlerOnDecodeErrorCtx) } func (sh *testServerHandler) OnConnOpen(ctx *ServerHandlerOnConnOpenCtx) { @@ -176,9 +177,15 @@ func (sh *testServerHandler) OnGetParameter(ctx *ServerHandlerOnGetParameterCtx) return nil, fmt.Errorf("unimplemented") } -func (sh *testServerHandler) OnWarning(ctx *ServerHandlerOnWarningCtx) { - if sh.onWarning != nil { - sh.onWarning(ctx) +func (sh *testServerHandler) OnPacketLost(ctx *ServerHandlerOnPacketLostCtx) { + if sh.onPacketLost != nil { + sh.onPacketLost(ctx) + } +} + +func (sh *testServerHandler) OnDecodeError(ctx *ServerHandlerOnDecodeErrorCtx) { + if sh.onDecodeError != nil { + sh.onDecodeError(ctx) } } diff --git a/server_udpl.go b/server_udpl.go index 58f80abd..c42b1a47 100644 --- a/server_udpl.go +++ b/server_udpl.go @@ -38,15 +38,6 @@ func (p *clientAddr) fill(ip net.IP, port int) { } } -func onWarning(ss *ServerSession, err error) { - if h, ok := ss.s.Handler.(ServerHandlerOnWarning); ok { - h.OnWarning(&ServerHandlerOnWarningCtx{ - Session: ss, - Error: err, - }) - } -} - type serverUDPListener struct { pc *net.UDPConn listenIP net.IP