From ad8d21480594ed0e7be7526fc855d37f8688b1a8 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 29 May 2024 15:51:51 +0200 Subject: [PATCH] Add WithLevel() to Logger interface --- log/log.go | 30 +++++++++++++++++------------- rtmp/rtmp.go | 40 +++++++++++++++++++++++----------------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/log/log.go b/log/log.go index d730b449..03d53842 100644 --- a/log/log.go +++ b/log/log.go @@ -95,6 +95,11 @@ type Logger interface { // be reset to nil. Error() Logger + // WithLevel writes a message with the given level to all registered outputs. + // The message will be written according to fmt.Printf(). The detail field will + // be reset to nil. + WithLevel(level Level) Logger + // Write implements the io.Writer interface such that it can be used in e.g. the // the log/Logger facility. Messages will be printed with debug level. Write(p []byte) (int, error) @@ -178,6 +183,10 @@ func (l *logger) Error() Logger { return newEvent(l).Error() } +func (l *logger) WithLevel(level Level) Logger { + return newEvent(l).WithLevel(level) +} + func (l *logger) Write(p []byte) (int, error) { return newEvent(l).Write(p) } @@ -318,29 +327,24 @@ func (e *Event) WithError(err error) Logger { } func (e *Event) Debug() Logger { - clone := e.clone() - clone.Level = Ldebug - - return clone + return e.WithLevel(Ldebug) } func (e *Event) Info() Logger { - clone := e.clone() - clone.Level = Linfo - - return clone + return e.WithLevel(Linfo) } func (e *Event) Warn() Logger { - clone := e.clone() - clone.Level = Lwarn - - return clone + return e.WithLevel(Lwarn) } func (e *Event) Error() Logger { + return e.WithLevel(Lerror) +} + +func (e *Event) WithLevel(level Level) Logger { clone := e.clone() - clone.Level = Lerror + clone.Level = level return clone } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 0eb83f10..9d40aa77 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -318,8 +318,8 @@ func (s *server) Channels() []string { return channels } -func (s *server) log(who, action, path, message string, client net.Addr) { - s.logger.Info().WithFields(log.Fields{ +func (s *server) log(level log.Level, who, action, path, message string, client net.Addr) { + s.logger.WithLevel(level).WithFields(log.Fields{ "who": who, "action": action, "path": path, @@ -364,12 +364,12 @@ func (s *server) handlePlay(conn *rtmp.Conn) { path, token := getToken(conn.URL) if len(token) == 0 { - s.log("PLAY", "FORBIDDEN", path, "no streamkey provided", client) + s.log(log.Lwarn, "PLAY", "FORBIDDEN", path, "no streamkey provided", client) return } if s.token != token { - s.log("PLAY", "FORBIDDEN", path, "invalid streamkey ("+token+")", client) + s.log(log.Lwarn, "PLAY", "FORBIDDEN", path, "invalid streamkey ("+token+")", client) return } @@ -404,7 +404,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) { // Set the metadata for the client conn.SetMetaData(ch.metadata) - s.log("PLAY", "START", playPath, "", client) + s.log(log.Linfo, "PLAY", "START", playPath, "", client) // Get a cursor and apply filters cursor := ch.queue.Oldest() @@ -427,13 +427,16 @@ func (s *server) handlePlay(conn *rtmp.Conn) { id := ch.AddSubscriber(conn) // Transfer the data - avutil.CopyFile(conn, demuxer) + err := avutil.CopyFile(conn, demuxer) + if err != nil { + s.log(log.Lerror, "PLAY", "ERROR", playPath, err.Error(), client) + } ch.RemoveSubscriber(id) - s.log("PLAY", "STOP", playPath, "", client) + s.log(log.Linfo, "PLAY", "STOP", playPath, "", client) } else { - s.log("PLAY", "NOTFOUND", playPath, "", client) + s.log(log.Lwarn, "PLAY", "NOTFOUND", playPath, "", client) } } @@ -449,12 +452,12 @@ func (s *server) handlePublish(conn *rtmp.Conn) { path, token := getToken(conn.URL) if len(token) == 0 { - s.log("PUBLISH", "FORBIDDEN", path, "no streamkey provided", client) + s.log(log.Lwarn, "PUBLISH", "FORBIDDEN", path, "no streamkey provided", client) return } if s.token != token { - s.log("PUBLISH", "FORBIDDEN", path, "invalid streamkey ("+token+")", client) + s.log(log.Lwarn, "PUBLISH", "FORBIDDEN", path, "invalid streamkey ("+token+")", client) return } @@ -463,7 +466,7 @@ func (s *server) handlePublish(conn *rtmp.Conn) { // Check the app patch if !strings.HasPrefix(playPath, s.app) { - s.log("PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", client) + s.log(log.Lwarn, "PUBLISH", "FORBIDDEN", conn.URL.Path, "invalid app", client) return } @@ -471,7 +474,7 @@ func (s *server) handlePublish(conn *rtmp.Conn) { streams, _ := conn.Streams() if len(streams) == 0 { - s.log("PUBLISH", "INVALID", playPath, "no streams available", client) + s.log(log.Lwarn, "PUBLISH", "INVALID", playPath, "no streams available", client) return } @@ -506,18 +509,21 @@ func (s *server) handlePublish(conn *rtmp.Conn) { s.lock.Unlock() if ch == nil { - s.log("PUBLISH", "CONFLICT", playPath, "already publishing", client) + s.log(log.Lwarn, "PUBLISH", "CONFLICT", playPath, "already publishing", client) return } - s.log("PUBLISH", "START", playPath, "", client) + s.log(log.Linfo, "PUBLISH", "START", playPath, "", client) for _, stream := range streams { - s.log("PUBLISH", "STREAM", playPath, stream.Type().String(), client) + s.log(log.Linfo, "PUBLISH", "STREAM", playPath, stream.Type().String(), client) } // Ingest the data - avutil.CopyPackets(ch.queue, conn) + err := avutil.CopyPackets(ch.queue, conn) + if err != nil { + s.log(log.Lerror, "PUBLISH", "ERROR", playPath, err.Error(), client) + } s.lock.Lock() delete(s.channels, playPath) @@ -525,5 +531,5 @@ func (s *server) handlePublish(conn *rtmp.Conn) { ch.Close() - s.log("PUBLISH", "STOP", playPath, "", client) + s.log(log.Linfo, "PUBLISH", "STOP", playPath, "", client) }