From 4a9dd2c921f710a943e9072a412ff3ddafb64a3b Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Mon, 30 Oct 2023 10:11:48 +0800 Subject: [PATCH] debug: add some log --- io.go | 68 ++++++++++++++++++++++++++++++++----------------------- plugin.go | 2 +- puller.go | 9 +------- stream.go | 3 ++- 4 files changed, 44 insertions(+), 38 deletions(-) diff --git a/io.go b/io.go index b5898cb..ab65759 100644 --- a/io.go +++ b/io.go @@ -35,19 +35,19 @@ type AuthPub interface { // 发布者或者订阅者的共用结构体 type IO struct { - ID string - Type string - RemoteAddr string - context.Context `json:"-" yaml:"-"` //不要直接设置,应当通过OnEvent传入父级Context - context.CancelFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者 - *log.Logger `json:"-" yaml:"-"` - StartTime time.Time //创建时间 - Stream *Stream `json:"-" yaml:"-"` - io.Reader `json:"-" yaml:"-"` - io.Writer `json:"-" yaml:"-"` - io.Closer `json:"-" yaml:"-"` - Args url.Values - Spesific IIO `json:"-" yaml:"-"` + ID string + Type string + RemoteAddr string + context.Context `json:"-" yaml:"-"` //不要直接设置,应当通过SetParentCtx传入父级Context + context.CancelCauseFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者 + *log.Logger `json:"-" yaml:"-"` + StartTime time.Time //创建时间 + Stream *Stream `json:"-" yaml:"-"` + io.Reader `json:"-" yaml:"-"` + io.Writer `json:"-" yaml:"-"` + io.Closer `json:"-" yaml:"-"` + Args url.Values + Spesific IIO `json:"-" yaml:"-"` } func (io *IO) IsClosed() bool { @@ -69,7 +69,7 @@ func (i *IO) SetIO(conn any) { // SetParentCtx(可选) func (i *IO) SetParentCtx(parent context.Context) { - i.Context, i.CancelFunc = context.WithCancel(parent) + i.Context, i.CancelCauseFunc = context.WithCancelCause(parent) } func (i *IO) SetLogger(logger *log.Logger) { @@ -78,8 +78,10 @@ func (i *IO) SetLogger(logger *log.Logger) { func (i *IO) OnEvent(event any) { switch event.(type) { - case SEclose, SEKick: - i.close() + case SEclose: + i.close(StopError{zap.String("event", "close")}) + case SEKick: + i.close(StopError{zap.String("event", "kick")}) } } @@ -102,26 +104,30 @@ type IIO interface { log.Zap } -func (i *IO) close() bool { +func (i *IO) close(err StopError) bool { if i.IsClosed() { + i.Warn("already closed", err...) return false } + i.Info("close", err...) if i.Closer != nil { i.Closer.Close() } - if i.CancelFunc != nil { - i.CancelFunc() + if i.CancelCauseFunc != nil { + i.CancelCauseFunc(err) } return true } // Stop 停止订阅或者发布,由订阅者或者发布者调用 func (io *IO) Stop(reason ...zapcore.Field) { - if io.close() { - io.Info("stop", reason...) - } else { - io.Warn("already stopped", reason...) - } + io.close(StopError(reason)) +} + +type StopError []zapcore.Field + +func (s StopError) Error() string { + return "stop" } var ( @@ -171,9 +177,11 @@ func (io *IO) receive(streamPath string, specific IIO) error { if s == nil { return ErrBadStreamName } - + if Engine.Err() != nil { + return Engine.Err() + } + io.SetParentCtx(util.Conditoinal(io.Context == nil || io.Err() != nil, Engine.Context, io.Context)) if io.Stream == nil { //初次 - io.Context, io.CancelFunc = context.WithCancel(util.Conditoinal[context.Context](io.Context == nil, Engine, io.Context)) if io.Type == "" { io.Type = reflect.TypeOf(specific).Elem().Name() } @@ -182,9 +190,9 @@ func (io *IO) receive(streamPath string, specific IIO) error { logFeilds = append(logFeilds, zap.String("ID", io.ID)) } if io.Logger == nil { + logFeilds = append(logFeilds, zap.String("stream", s.Path)) io.Logger = s.With(logFeilds...) } else { - logFeilds = append(logFeilds, zap.String("streamPath", s.Path)) io.Logger = io.Logger.With(logFeilds...) } } @@ -192,7 +200,8 @@ func (io *IO) receive(streamPath string, specific IIO) error { io.Spesific = specific io.StartTime = time.Now() if v, ok := specific.(IPublisher); ok { - conf := v.GetPublisher().Config + puber := v.GetPublisher() + conf := puber.Config io.Info("publish") s.pubLocker.Lock() defer s.pubLocker.Unlock() @@ -208,6 +217,9 @@ func (io *IO) receive(streamPath string, specific IIO) error { s.Warn("duplicate publish", zot) return ErrDuplicatePublish } + } else { + puber.AudioTrack = nil + puber.VideoTrack = nil } s.PublishTimeout = conf.PublishTimeout s.DelayCloseTimeout = conf.DelayCloseTimeout diff --git a/plugin.go b/plugin.go index ef69385..d1d4f5a 100644 --- a/plugin.go +++ b/plugin.go @@ -325,7 +325,7 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) pullConf := conf.GetPullConfig() if save < 2 { zurl := zap.String("url", url) - zpath := zap.String("path", streamPath) + zpath := zap.String("stream", streamPath) opt.Info("pull", zpath, zurl) puller.init(streamPath, url, pullConf) opt.AssignPubConfig(puller.GetPublisher()) diff --git a/puller.go b/puller.go index 9636527..c16987e 100644 --- a/puller.go +++ b/puller.go @@ -1,7 +1,6 @@ package engine import ( - "context" "io" "strings" "time" @@ -86,13 +85,7 @@ func (pub *Puller) startPull(puller IPuller) { puller.Error("pull publish", zap.Error(err)) return } - s := puber.Stream - if stream != s && stream != nil { // 这段代码说明老流已经中断,创建了新流,需要把track置空,从而避免复用 - puber.AudioTrack = nil - puber.VideoTrack = nil - puber.Context, puber.CancelFunc = context.WithCancel(Engine) // 老流的上下文已经取消,需要重新创建 - } - stream = s + stream = puber.Stream badPuller = false if err = puller.Pull(); err != nil && !puller.IsShutdown() { puller.Error("pull interrupt", zap.Error(err)) diff --git a/stream.go b/stream.go index 8f2a77b..bc66d20 100644 --- a/stream.go +++ b/stream.go @@ -1,6 +1,7 @@ package engine import ( + "context" "encoding/json" "strings" "sync" @@ -479,7 +480,7 @@ func (s *Stream) run() { s.Warn("no tracks") lost = true } else if s.Publisher != nil && s.Publisher.IsClosed() { - s.Warn("publish is closed") + s.Warn("publish is closed", zap.Error(context.Cause(s.Publisher.GetPublisher())), zap.Error(Engine.Err())) lost = true } }