From fd088b11a257dd4f30bb6e826ccb3d2738471047 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Thu, 17 Feb 2022 21:49:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E9=99=A4logrus,=E4=BC=98=E5=8C=96pull?= =?UTF-8?q?er=E5=92=8Cpusher?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/frame.go | 18 +- config/config.go | 5 +- go.mod | 1 - go.sum | 3 - io.go | 41 ++-- log/log.go | 563 +++++++++++++++++++++++++---------------------- main.go | 32 +-- plugin.go | 41 ++-- publisher.go | 18 +- stream.go | 26 +-- subscriber.go | 64 ++++-- track/aac.go | 1 + track/audio.go | 8 + track/h264.go | 1 + track/h265.go | 1 + track/video.go | 8 + 16 files changed, 474 insertions(+), 357 deletions(-) diff --git a/common/frame.go b/common/frame.go index 3e4c810..9469d6c 100644 --- a/common/frame.go +++ b/common/frame.go @@ -5,8 +5,8 @@ import ( "time" "github.com/Monibuca/engine/v4/codec" + "github.com/Monibuca/engine/v4/log" "github.com/pion/rtp" - "github.com/sirupsen/logrus" ) type NALUSlice net.Buffers @@ -94,7 +94,7 @@ func (rtp *RTPFrame) Marshal() *RTPFrame { func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame { rtp.Raw = raw if err := rtp.Packet.Unmarshal(raw); err != nil { - logrus.Error(err) + log.Error(err) return nil } return rtp @@ -112,6 +112,11 @@ type DataFrame[T any] struct { BaseFrame Value T } +type MediaFrame interface { + GetFLV() net.Buffers + GetAVCC() net.Buffers + GetRTP() []*RTPFrame +} type AVFrame[T RawSlice] struct { BaseFrame IFrame bool @@ -124,6 +129,15 @@ type AVFrame[T RawSlice] struct { canRead bool } +func (av *AVFrame[T]) GetFLV() net.Buffers { + return av.FLV +} +func (av *AVFrame[T]) GetAVCC() net.Buffers { + return av.AVCC +} +func (av *AVFrame[T]) GetRTP() []*RTPFrame { + return av.RTP +} func (av *AVFrame[T]) AppendRaw(raw ...T) { av.Raw = append(av.Raw, raw...) } diff --git a/config/config.go b/config/config.go index 2231bdb..9492b16 100644 --- a/config/config.go +++ b/config/config.go @@ -1,13 +1,12 @@ package config import ( + "github.com/Monibuca/engine/v4/log" "net" "net/http" "reflect" "strings" "time" - - "github.com/sirupsen/logrus" ) type Config map[string]any @@ -61,7 +60,7 @@ func (config Config) Unmarshal(s any) { for k, v := range config { name, ok := nameMap[k] if !ok { - logrus.Error("no config named:", k) + log.Error("no config named:", k) continue } // 需要被写入的字段 diff --git a/go.mod b/go.mod index 026ad71..7e32b55 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/mattn/go-colorable v0.1.8 github.com/pion/rtp v1.7.4 github.com/q191201771/naza v0.19.1 - github.com/sirupsen/logrus v1.8.1 go.uber.org/zap v1.21.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b diff --git a/go.sum b/go.sum index 6e710fb..ec72e22 100644 --- a/go.sum +++ b/go.sum @@ -50,11 +50,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/io.go b/io.go index b266022..855d22f 100644 --- a/io.go +++ b/io.go @@ -8,9 +8,7 @@ import ( "strings" "time" - "github.com/Monibuca/engine/v4/common" "github.com/Monibuca/engine/v4/config" - "github.com/Monibuca/engine/v4/log" "go.uber.org/zap" ) @@ -18,14 +16,14 @@ type IOConfig interface { config.Publish | config.Subscribe } -type IO[C IOConfig, S IIO] struct { +type IO[C IOConfig] struct { ID string Type string context.Context context.CancelFunc *zap.Logger - StartTime time.Time //创建时间 - Stream common.IStream `json:"-"` + StartTime time.Time //创建时间 + Stream *Stream `json:"-"` io.Reader `json:"-"` io.Writer `json:"-"` io.Closer `json:"-"` @@ -33,8 +31,13 @@ type IO[C IOConfig, S IIO] struct { Config *C } -func (io *IO[C, S]) OnEvent(event any) any { +func (io *IO[C]) IsClosed() bool { + return io.Err() != nil +} +func (io *IO[C]) OnEvent(event any) any { switch v := event.(type) { + case context.Context: + io.Context, io.CancelFunc = context.WithCancel(v) case *Stream: io.StartTime = time.Now() io.Stream = v @@ -52,14 +55,21 @@ func (io *IO[C, S]) OnEvent(event any) any { } return event } - -type IIO interface { - context.Context - log.Zap - OnEvent(any) any +func (io *IO[C]) getID() string { + return io.ID +} +func (io *IO[C]) getType() string { + return io.Type } -func (io *IO[C, S]) bye(specific S) { +type IIO interface { + IsClosed() bool + OnEvent(any) any + getID() string + getType() string +} + +func (io *IO[C]) bye(specific any) { if io.CancelFunc != nil { io.CancelFunc() } @@ -68,7 +78,7 @@ func (io *IO[C, S]) bye(specific S) { } } -func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) bool { +func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool { Streams.Lock() defer Streams.Unlock() streamPath = strings.Trim(streamPath, "/") @@ -82,12 +92,15 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) bool { if v, ok := c.(*config.Subscribe); ok { wt = v.WaitTimeout.Duration() } + if io.Context == nil { + io.Context, io.CancelFunc = context.WithCancel(Engine) + } s, created := findOrCreateStream(u.Path, wt) if s.IsClosed() { return false } if v, ok := c.(*config.Publish); ok { - if s.Publisher != nil && s.Publisher.Err() == nil { + if s.Publisher != nil && !s.Publisher.IsClosed() { // 根据配置是否剔出原来的发布者 if v.KickExist { s.Warn("kick", zap.Any("publisher", s.Publisher)) diff --git a/log/log.go b/log/log.go index 03402f3..e6c6795 100644 --- a/log/log.go +++ b/log/log.go @@ -1,28 +1,25 @@ package log import ( - "context" - "io" - "strings" - "time" - - . "github.com/logrusorgru/aurora" + // . "github.com/logrusorgru/aurora" "github.com/mattn/go-colorable" - log "github.com/sirupsen/logrus" + + // log "github.com/sirupsen/logrus" "go.uber.org/zap" ) -var logger *zap.Logger -var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White} +var logger *zap.SugaredLogger +// var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White} -type LogWriter func(*log.Entry) string +// type LogWriter func(*log.Entry) string var colorableStdout = colorable.NewColorableStdout() func init() { - logger, _ = zap.NewDevelopment() - std.SetOutput(colorableStdout) - std.SetFormatter(LogWriter(defaultFormatter)) + l, _ := zap.NewDevelopment() + logger = l.Sugar() + // std.SetOutput(colorableStdout) + // std.SetFormatter(LogWriter(defaultFormatter)) } type Zap interface { @@ -34,291 +31,335 @@ type Zap interface { } func With(fields ...zap.Field) *zap.Logger { - return logger.With(fields...) + return logger.Desugar().With(fields...) } -func defaultFormatter(entry *log.Entry) string { - pl := entry.Data["plugin"] - if pl == nil { - pl = "Engine" - } - l := strings.ToUpper(entry.Level.String())[:1] - var props string - if stream := entry.Data["stream"]; stream != nil { - props = Sprintf("[s:%s] ", stream) - } - if puber := entry.Data["puber"]; puber != nil { - props += Sprintf("[pub:%s] ", puber) - } - if suber := entry.Data["suber"]; suber != nil { - props += Sprintf("[sub:%s] ", suber) - } - return Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message) +func Debug(args ...any) { + logger.Debug(args...) } -func (f LogWriter) Format(entry *log.Entry) (b []byte, err error) { - return []byte(f(entry)), nil +func Info(args ...any) { + logger.Info(args...) } -var ( - // std is the name of the standard logger in stdlib `log` - std = log.New() -) - -func StandardLogger() *log.Logger { - return std +func Warn(args ...any) { + logger.Warn(args...) } -// SetOutput sets the standard logger output. -func SetOutput(out io.Writer) { - std.SetOutput(out) +func Error(args ...any) { + logger.Error(args...) } -// SetFormatter sets the standard logger formatter. -func SetFormatter(formatter log.Formatter) { - std.SetFormatter(formatter) -} - -// SetReportCaller sets whether the standard logger will include the calling -// method as a field. -func SetReportCaller(include bool) { - std.SetReportCaller(include) -} - -// SetLevel sets the standard logger level. -func SetLevel(level log.Level) { - std.SetLevel(level) -} - -// GetLevel returns the standard logger level. -func GetLevel() log.Level { - return std.GetLevel() -} - -// IsLevelEnabled checks if the log level of the standard logger is greater than the level param -func IsLevelEnabled(level log.Level) bool { - return std.IsLevelEnabled(level) -} - -// AddHook adds a hook to the standard logger hooks. -func AddHook(hook log.Hook) { - std.AddHook(hook) -} - -// WithError creates an entry from the standard logger and adds an error to it, using the value defined in ErrorKey as key. -func WithError(err error) *log.Entry { - return std.WithField(log.ErrorKey, err) -} - -// WithContext creates an entry from the standard logger and adds a context to it. -func WithContext(ctx context.Context) *log.Entry { - return std.WithContext(ctx) -} - -// WithField creates an entry from the standard logger and adds a field to -// it. If you want multiple fields, use `WithFields`. -// -// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal -// or Panic on the Entry it returns. -func WithField(key string, value interface{}) *log.Entry { - return std.WithField(key, value) -} - -// WithFields creates an entry from the standard logger and adds multiple -// fields to it. This is simply a helper for `WithField`, invoking it -// once for each field. -// -// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal -// or Panic on the Entry it returns. -func WithFields(fields log.Fields) *log.Entry { - return std.WithFields(fields) -} - -// WithTime creates an entry from the standard logger and overrides the time of -// logs generated with it. -// -// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal -// or Panic on the Entry it returns. -func WithTime(t time.Time) *log.Entry { - return std.WithTime(t) -} - -// Trace logs a message at level Trace on the standard logger. -func Trace(args ...interface{}) { - std.Trace(args...) -} - -// Debug logs a message at level Debug on the standard logger. -func Debug(args ...interface{}) { - std.Debug(args...) -} - -// Print logs a message at level Info on the standard logger. -func Print(args ...interface{}) { - std.Print(args...) -} - -// Info logs a message at level Info on the standard logger. -func Info(args ...interface{}) { - std.Info(args...) -} - -// Warn logs a message at level Warn on the standard logger. -func Warn(args ...interface{}) { - std.Warn(args...) -} - -// Warning logs a message at level Warn on the standard logger. -func Warning(args ...interface{}) { - std.Warning(args...) -} - -// Error logs a message at level Error on the standard logger. -func Error(args ...interface{}) { - std.Error(args...) -} - -// Panic logs a message at level Panic on the standard logger. -func Panic(args ...interface{}) { - std.Panic(args...) -} - -// Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1. -func Fatal(args ...interface{}) { - std.Fatal(args...) -} - -// TraceFn logs a message from a func at level Trace on the standard logger. -func TraceFn(fn log.LogFunction) { - std.TraceFn(fn) -} - -// DebugFn logs a message from a func at level Debug on the standard logger. -func DebugFn(fn log.LogFunction) { - std.DebugFn(fn) -} - -// PrintFn logs a message from a func at level Info on the standard logger. -func PrintFn(fn log.LogFunction) { - std.PrintFn(fn) -} - -// InfoFn logs a message from a func at level Info on the standard logger. -func InfoFn(fn log.LogFunction) { - std.InfoFn(fn) -} - -// WarnFn logs a message from a func at level Warn on the standard logger. -func WarnFn(fn log.LogFunction) { - std.WarnFn(fn) -} - -// WarningFn logs a message from a func at level Warn on the standard logger. -func WarningFn(fn log.LogFunction) { - std.WarningFn(fn) -} - -// ErrorFn logs a message from a func at level Error on the standard logger. -func ErrorFn(fn log.LogFunction) { - std.ErrorFn(fn) -} - -// PanicFn logs a message from a func at level Panic on the standard logger. -func PanicFn(fn log.LogFunction) { - std.PanicFn(fn) -} - -// FatalFn logs a message from a func at level Fatal on the standard logger then the process will exit with status set to 1. -func FatalFn(fn log.LogFunction) { - std.FatalFn(fn) -} - -// Tracef logs a message at level Trace on the standard logger. -func Tracef(format string, args ...interface{}) { - std.Tracef(format, args...) -} - -// Debugf logs a message at level Debug on the standard logger. func Debugf(format string, args ...interface{}) { - std.Debugf(format, args...) + logger.Debugf(format, args...) } - -// Printf logs a message at level Info on the standard logger. -func Printf(format string, args ...interface{}) { - std.Printf(format, args...) -} - // Infof logs a message at level Info on the standard logger. func Infof(format string, args ...interface{}) { - std.Infof(format, args...) + logger.Infof(format, args...) } // Warnf logs a message at level Warn on the standard logger. func Warnf(format string, args ...interface{}) { - std.Warnf(format, args...) -} - -// Warningf logs a message at level Warn on the standard logger. -func Warningf(format string, args ...interface{}) { - std.Warningf(format, args...) + logger.Warnf(format, args...) } // Errorf logs a message at level Error on the standard logger. func Errorf(format string, args ...interface{}) { - std.Errorf(format, args...) + logger.Errorf(format, args...) } // Panicf logs a message at level Panic on the standard logger. func Panicf(format string, args ...interface{}) { - std.Panicf(format, args...) + logger.Panicf(format, args...) } // Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1. func Fatalf(format string, args ...interface{}) { - std.Fatalf(format, args...) + logger.Fatalf(format, args...) } -// Traceln logs a message at level Trace on the standard logger. -func Traceln(args ...interface{}) { - std.Traceln(args...) -} +// func defaultFormatter(entry *log.Entry) string { +// pl := entry.Data["plugin"] +// if pl == nil { +// pl = "Engine" +// } +// l := strings.ToUpper(entry.Level.String())[:1] +// var props string +// if stream := entry.Data["stream"]; stream != nil { +// props = Sprintf("[s:%s] ", stream) +// } +// if puber := entry.Data["puber"]; puber != nil { +// props += Sprintf("[pub:%s] ", puber) +// } +// if suber := entry.Data["suber"]; suber != nil { +// props += Sprintf("[sub:%s] ", suber) +// } +// return Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message) +// } -// Debugln logs a message at level Debug on the standard logger. -func Debugln(args ...interface{}) { - std.Debugln(args...) -} +// func (f LogWriter) Format(entry *log.Entry) (b []byte, err error) { +// return []byte(f(entry)), nil +// } -// Println logs a message at level Info on the standard logger. -func Println(args ...interface{}) { - std.Println(args...) -} +// var ( +// // std is the name of the standard logger in stdlib `log` +// std = log.New() +// ) -// Infoln logs a message at level Info on the standard logger. -func Infoln(args ...interface{}) { - std.Infoln(args...) -} +// func StandardLogger() *log.Logger { +// return std +// } -// Warnln logs a message at level Warn on the standard logger. -func Warnln(args ...interface{}) { - std.Warnln(args...) -} +// // SetOutput sets the standard logger output. +// func SetOutput(out io.Writer) { +// std.SetOutput(out) +// } -// Warningln logs a message at level Warn on the standard logger. -func Warningln(args ...interface{}) { - std.Warningln(args...) -} +// // SetFormatter sets the standard logger formatter. +// func SetFormatter(formatter log.Formatter) { +// std.SetFormatter(formatter) +// } -// Errorln logs a message at level Error on the standard logger. -func Errorln(args ...interface{}) { - std.Errorln(args...) -} +// // SetReportCaller sets whether the standard logger will include the calling +// // method as a field. +// func SetReportCaller(include bool) { +// std.SetReportCaller(include) +// } -// Panicln logs a message at level Panic on the standard logger. -func Panicln(args ...interface{}) { - std.Panicln(args...) -} +// // SetLevel sets the standard logger level. +// func SetLevel(level log.Level) { +// std.SetLevel(level) +// } -// Fatalln logs a message at level Fatal on the standard logger then the process will exit with status set to 1. -func Fatalln(args ...interface{}) { - std.Fatalln(args...) -} +// // GetLevel returns the standard logger level. +// func GetLevel() log.Level { +// return std.GetLevel() +// } + +// // IsLevelEnabled checks if the log level of the standard logger is greater than the level param +// func IsLevelEnabled(level log.Level) bool { +// return std.IsLevelEnabled(level) +// } + +// // AddHook adds a hook to the standard logger hooks. +// func AddHook(hook log.Hook) { +// std.AddHook(hook) +// } + +// // WithError creates an entry from the standard logger and adds an error to it, using the value defined in ErrorKey as key. +// func WithError(err error) *log.Entry { +// return std.WithField(log.ErrorKey, err) +// } + +// // WithContext creates an entry from the standard logger and adds a context to it. +// func WithContext(ctx context.Context) *log.Entry { +// return std.WithContext(ctx) +// } + +// // WithField creates an entry from the standard logger and adds a field to +// // it. If you want multiple fields, use `WithFields`. +// // +// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal +// // or Panic on the Entry it returns. +// func WithField(key string, value interface{}) *log.Entry { +// return std.WithField(key, value) +// } + +// // WithFields creates an entry from the standard logger and adds multiple +// // fields to it. This is simply a helper for `WithField`, invoking it +// // once for each field. +// // +// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal +// // or Panic on the Entry it returns. +// func WithFields(fields log.Fields) *log.Entry { +// return std.WithFields(fields) +// } + +// // WithTime creates an entry from the standard logger and overrides the time of +// // logs generated with it. +// // +// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal +// // or Panic on the Entry it returns. +// func WithTime(t time.Time) *log.Entry { +// return std.WithTime(t) +// } + +// // Trace logs a message at level Trace on the standard logger. +// func Trace(args ...interface{}) { +// std.Trace(args...) +// } + +// // Debug logs a message at level Debug on the standard logger. +// func Debug(args ...interface{}) { +// std.Debug(args...) +// } + +// // Print logs a message at level Info on the standard logger. +// func Print(args ...interface{}) { +// std.Print(args...) +// } + +// // Info logs a message at level Info on the standard logger. +// func Info(args ...interface{}) { +// std.Info(args...) +// } + +// // Warn logs a message at level Warn on the standard logger. +// func Warn(args ...interface{}) { +// std.Warn(args...) +// } + +// // Warning logs a message at level Warn on the standard logger. +// func Warning(args ...interface{}) { +// std.Warning(args...) +// } + +// // Error logs a message at level Error on the standard logger. +// func Error(args ...interface{}) { +// std.Error(args...) +// } + +// // Panic logs a message at level Panic on the standard logger. +// func Panic(args ...interface{}) { +// std.Panic(args...) +// } + +// // Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1. +// func Fatal(args ...interface{}) { +// std.Fatal(args...) +// } + +// // TraceFn logs a message from a func at level Trace on the standard logger. +// func TraceFn(fn log.LogFunction) { +// std.TraceFn(fn) +// } + +// // DebugFn logs a message from a func at level Debug on the standard logger. +// func DebugFn(fn log.LogFunction) { +// std.DebugFn(fn) +// } + +// // PrintFn logs a message from a func at level Info on the standard logger. +// func PrintFn(fn log.LogFunction) { +// std.PrintFn(fn) +// } + +// // InfoFn logs a message from a func at level Info on the standard logger. +// func InfoFn(fn log.LogFunction) { +// std.InfoFn(fn) +// } + +// // WarnFn logs a message from a func at level Warn on the standard logger. +// func WarnFn(fn log.LogFunction) { +// std.WarnFn(fn) +// } + +// // WarningFn logs a message from a func at level Warn on the standard logger. +// func WarningFn(fn log.LogFunction) { +// std.WarningFn(fn) +// } + +// // ErrorFn logs a message from a func at level Error on the standard logger. +// func ErrorFn(fn log.LogFunction) { +// std.ErrorFn(fn) +// } + +// // PanicFn logs a message from a func at level Panic on the standard logger. +// func PanicFn(fn log.LogFunction) { +// std.PanicFn(fn) +// } + +// // FatalFn logs a message from a func at level Fatal on the standard logger then the process will exit with status set to 1. +// func FatalFn(fn log.LogFunction) { +// std.FatalFn(fn) +// } + +// // Tracef logs a message at level Trace on the standard logger. +// func Tracef(format string, args ...interface{}) { +// std.Tracef(format, args...) +// } + +// // Debugf logs a message at level Debug on the standard logger. +// func Debugf(format string, args ...interface{}) { +// std.Debugf(format, args...) +// } + +// // Printf logs a message at level Info on the standard logger. +// func Printf(format string, args ...interface{}) { +// std.Printf(format, args...) +// } + +// // Infof logs a message at level Info on the standard logger. +// func Infof(format string, args ...interface{}) { +// std.Infof(format, args...) +// } + +// // Warnf logs a message at level Warn on the standard logger. +// func Warnf(format string, args ...interface{}) { +// std.Warnf(format, args...) +// } + +// // Warningf logs a message at level Warn on the standard logger. +// func Warningf(format string, args ...interface{}) { +// std.Warningf(format, args...) +// } + +// // Errorf logs a message at level Error on the standard logger. +// func Errorf(format string, args ...interface{}) { +// std.Errorf(format, args...) +// } + +// // Panicf logs a message at level Panic on the standard logger. +// func Panicf(format string, args ...interface{}) { +// std.Panicf(format, args...) +// } + +// // Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1. +// func Fatalf(format string, args ...interface{}) { +// std.Fatalf(format, args...) +// } + +// // Traceln logs a message at level Trace on the standard logger. +// func Traceln(args ...interface{}) { +// std.Traceln(args...) +// } + +// // Debugln logs a message at level Debug on the standard logger. +// func Debugln(args ...interface{}) { +// std.Debugln(args...) +// } + +// // Println logs a message at level Info on the standard logger. +// func Println(args ...interface{}) { +// std.Println(args...) +// } + +// // Infoln logs a message at level Info on the standard logger. +// func Infoln(args ...interface{}) { +// std.Infoln(args...) +// } + +// // Warnln logs a message at level Warn on the standard logger. +// func Warnln(args ...interface{}) { +// std.Warnln(args...) +// } + +// // Warningln logs a message at level Warn on the standard logger. +// func Warningln(args ...interface{}) { +// std.Warningln(args...) +// } + +// // Errorln logs a message at level Error on the standard logger. +// func Errorln(args ...interface{}) { +// std.Errorln(args...) +// } + +// // Panicln logs a message at level Panic on the standard logger. +// func Panicln(args ...interface{}) { +// std.Panicln(args...) +// } + +// // Fatalln logs a message at level Fatal on the standard logger then the process will exit with status set to 1. +// func Fatalln(args ...interface{}) { +// std.Fatalln(args...) +// } diff --git a/main.go b/main.go index 2fdbc0f..2416313 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package engine import ( + "bytes" "context" "fmt" "io/ioutil" @@ -16,6 +17,7 @@ import ( "github.com/Monibuca/engine/v4/util" "github.com/google/uuid" . "github.com/logrusorgru/aurora" + "go.uber.org/zap" "gopkg.in/yaml.v3" ) @@ -39,22 +41,20 @@ var ( type PushOnPublish struct { PushPlugin - RemoteURL string - Config *config.Push + Pusher } -func (p PushOnPublish) Push(stream *Stream) { - p.PushStream(stream, p.RemoteURL, p.Config) +func (p PushOnPublish) Push() { + p.PushStream(p.Pusher) } type PullOnSubscribe struct { PullPlugin - RemoteURL string - Config *config.Pull + Puller } -func (p PullOnSubscribe) Pull(streamPath string) { - p.PullStream(streamPath, p.RemoteURL, p.Config) +func (p PullOnSubscribe) Pull() { + p.PullStream(p.Puller) } // Run 启动Monibuca引擎,传入总的Context,可用于关闭所有 @@ -83,7 +83,7 @@ func Run(ctx context.Context, configFile string) (err error) { } else { log.Warn("no config file found , use default config values") } - Engine.Entry = log.WithContext(Engine) + Engine.Logger = log.With(zap.String("plugin", "engine")) Engine.registerHandler() go EngineConfig.Update(Engine.RawConfig) for name, plugin := range Plugins { @@ -92,13 +92,17 @@ func Run(ctx context.Context, configFile string) (err error) { } UUID := uuid.NewString() reportTimer := time.NewTimer(time.Minute) - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://monibuca.com:2022/report/engine", nil) - req.Header.Set("os", runtime.GOOS) - req.Header.Set("version", Engine.Version) - req.Header.Set("uuid", UUID) + contentBuf := bytes.NewBuffer(nil) + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "https://logs-01.loggly.com/inputs/758a662d-f630-40cb-95ed-2502a5e9c872/tag/monibuca/", nil) + req.Header.Set("Content-Type", "application/json") + + content := fmt.Sprintf(`{"uuid":"%s","version":"%s","os":"%s","arch":"%s"`, UUID, Engine.Version, runtime.GOOS, runtime.GOARCH) var c http.Client for { - req.Header.Set("streams", fmt.Sprintf("%d", Streams.Len())) + contentBuf.Reset() + postJson := fmt.Sprintf(`%s,"streams":%d}`, content, len(Streams.Map)) + contentBuf.WriteString(postJson) + req.Body = ioutil.NopCloser(contentBuf) c.Do(req) select { case <-ctx.Done(): diff --git a/plugin.go b/plugin.go index 4a86249..4821ffa 100644 --- a/plugin.go +++ b/plugin.go @@ -11,9 +11,9 @@ import ( "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/log" + "github.com/Monibuca/engine/v4/track" "github.com/Monibuca/engine/v4/util" - . "github.com/logrusorgru/aurora" - "github.com/sirupsen/logrus" + "go.uber.org/zap" "gopkg.in/yaml.v3" ) @@ -34,9 +34,9 @@ func InstallPlugin(config config.Plugin) *Plugin { return nil } if config != EngineConfig { - plugin.Entry = log.WithField("plugin", name) + plugin.Logger = log.With(zap.String("plugin", name)) Plugins[name] = plugin - plugin.Info(Green("install"), BrightBlue(plugin.Version)) + plugin.Info("install", zap.String("version", plugin.Version)) } return plugin } @@ -50,13 +50,13 @@ type Plugin struct { Version string //插件版本 RawConfig config.Config //配置的map形式方便查询 Modified config.Config //修改过的配置项 - *logrus.Entry + *zap.Logger } type PushPlugin interface { - PushStream(*Stream, string, *config.Push) + PushStream(Pusher) } type PullPlugin interface { - PullStream(string, string, *config.Pull) bool + PullStream(Puller) } func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { @@ -75,12 +75,12 @@ func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter, if opt != Engine { pattern = "/" + strings.ToLower(opt.Name) + pattern } - opt.Info("http handle added:", pattern) + opt.Info("http handle added:" + pattern) EngineConfig.HandleFunc(pattern, func(rw http.ResponseWriter, r *http.Request) { if cors { util.CORS(rw, r) } - opt.Debug(r.RemoteAddr, " -> ", pattern) + opt.Debug("visit", zap.String("path", pattern), zap.String("remote", r.RemoteAddr)) handler(rw, r) }) } @@ -134,16 +134,16 @@ func (opt *Plugin) autoPull() { reflect.ValueOf(&pullConfig).Elem().Set(v.Field(i)) for streamPath, url := range pullConfig.PullList { if pullConfig.PullOnStart { - opt.Config.(PullPlugin).PullStream(streamPath, url, &pullConfig) + opt.Config.(PullPlugin).PullStream(Puller{&pullConfig, streamPath, url, 0}) } else if pullConfig.PullOnSubscribe { - PullOnSubscribeList[streamPath] = PullOnSubscribe{opt.Config.(PullPlugin), url, &pullConfig} + PullOnSubscribeList[streamPath] = PullOnSubscribe{opt.Config.(PullPlugin), Puller{&pullConfig, streamPath, url, 0}} } } } else if name == "Push" { var pushConfig config.Push reflect.ValueOf(&pushConfig).Elem().Set(v.Field(i)) for streamPath, url := range pushConfig.PushList { - PushOnPublishList[streamPath] = append(PushOnPublishList[streamPath], PushOnPublish{opt.Config.(PushPlugin), url, &pushConfig}) + PushOnPublishList[streamPath] = append(PushOnPublishList[streamPath], PushOnPublish{opt.Config.(PushPlugin), Pusher{&pushConfig, streamPath, url, 0}}) } } } @@ -186,7 +186,16 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) bool { if !ok { conf = EngineConfig } - return pub.receive(streamPath, pub, conf.GetPublishConfig()) + if ok = pub.receive(streamPath, pub, conf.GetPublishConfig()); ok { + p := pub.GetPublisher() + unA := track.UnknowAudio{} + unA.Stream = p.Stream + p.AudioTrack = &unA + unV := track.UnknowVideo{} + unV.Stream = p.Stream + p.VideoTrack = &unV + } + return ok } func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) bool { @@ -194,5 +203,9 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) bool { if !ok { conf = EngineConfig } - return sub.receive(streamPath, sub, conf.GetSubscribeConfig()) + if ok = sub.receive(streamPath, sub, conf.GetSubscribeConfig()); ok { + p := sub.GetSubscriber() + p.TrackPlayer.Context, p.TrackPlayer.CancelFunc = context.WithCancel(p.IO) + } + return ok } diff --git a/publisher.go b/publisher.go index 95ed2ec..e5c809f 100644 --- a/publisher.go +++ b/publisher.go @@ -7,15 +7,21 @@ import ( type IPublisher interface { IIO - receive(string, IPublisher, *config.Publish) bool + GetPublisher() *Publisher + receive(string, any, *config.Publish) bool + Unpublish() } type Publisher struct { - IO[config.Publish, IPublisher] + IO[config.Publish] common.AudioTrack common.VideoTrack } +func (p *Publisher) GetPublisher() *Publisher { + return p +} + func (p *Publisher) Unpublish() { p.bye(p) } @@ -24,10 +30,10 @@ type PullEvent int // 用于远程拉流的发布者 type Puller struct { - Publisher - Config *config.Pull - RemoteURL string - PullCount int + Config *config.Pull + StreamPath string + RemoteURL string + PullCount int } // 是否需要重连 diff --git a/stream.go b/stream.go index 60af85b..3e5b18b 100644 --- a/stream.go +++ b/stream.go @@ -162,14 +162,14 @@ func (r *Stream) action(action StreamAction) bool { Bus.Publish(Event_REQUEST_PUBLISH, r) r.timeout.Reset(r.WaitTimeout) if _, ok = PullOnSubscribeList[r.Path]; ok { - PullOnSubscribeList[r.Path].Pull(r.Path) + PullOnSubscribeList[r.Path].Pull() } case STATE_PUBLISHING: r.timeout.Reset(time.Second) // 秒级心跳,检测track的存活度 Bus.Publish(Event_PUBLISH, r) if v, ok := PushOnPublishList[r.Path]; ok { for _, v := range v { - v.Push(r) + v.Push() } } case STATE_WAITCLOSE: @@ -243,7 +243,7 @@ func (s *Stream) run() { if ok { switch v := action.(type) { case IPublisher: - if v.Err() != nil { + if v.IsClosed() { s.action(ACTION_PUBLISHLOST) } else if s.action(ACTION_PUBLISH) { s.Publisher = v @@ -272,14 +272,14 @@ func (s *Stream) run() { case StreamAction: s.action(v) case ISubscriber: - if v.Err() == nil { + if !v.IsClosed() { s.Subscribers.Add(v) if wt := v.GetSubscribeConfig().WaitTimeout.Duration(); wt > s.WaitTimeout { s.WaitTimeout = wt } v.OnEvent(s) // 通知Subscriber已成功进入Stream Bus.Publish(Event_SUBSCRIBE, v) - v.Info(Sprintf(Yellow("added remains:%d"), len(s.Subscribers))) + s.Info("suber added", zap.String("id", v.getID()), zap.String("type", v.getType()), zap.Int("remains", len(s.Subscribers))) if s.Publisher != nil { s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量 } @@ -288,7 +288,7 @@ func (s *Stream) run() { } } else if s.Subscribers.Delete(v) { Bus.Publish(Event_UNSUBSCRIBE, v) - v.Info(Sprintf(Yellow("removed remains:%d"), len(s.Subscribers))) + s.Info("suber removed", zap.String("id", v.getID()), zap.String("type", v.getType()), zap.Int("remains", len(s.Subscribers))) if s.Publisher != nil { s.Publisher.OnEvent(v) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量 } @@ -329,20 +329,6 @@ func (r *Stream) NewAudioTrack() (at *track.UnknowAudio) { at.Stream = r return } -func (r *Stream) NewH264Track() *track.H264 { - r.Debug("create h264 track") - return track.NewH264(r) -} - -func (r *Stream) NewH265Track() *track.H265 { - r.Debug("create h265 track") - return track.NewH265(r) -} - -func (r *Stream) NewAACTrack() *track.AAC { - r.Debug("create aac track") - return track.NewAAC(r) -} // func (r *Stream) WaitDataTrack(names ...string) DataTrack { // t := <-r.WaitTrack(names...) diff --git a/subscriber.go b/subscriber.go index 91c76c5..18a10b2 100644 --- a/subscriber.go +++ b/subscriber.go @@ -1,6 +1,7 @@ package engine import ( + "context" "time" . "github.com/Monibuca/engine/v4/common" @@ -12,20 +13,31 @@ type AudioFrame AVFrame[AudioSlice] type VideoFrame AVFrame[NALUSlice] type ISubscriber interface { IIO - receive(string, ISubscriber, *config.Subscribe) bool + receive(string, any, *config.Subscribe) bool config.SubscribeConfig + GetSubscriber() *Subscriber + Unsubscribe() } - -// Subscriber 订阅者实体定义 -type Subscriber struct { - IO[config.Subscribe, ISubscriber] +type TrackPlayer struct { + context.Context + context.CancelFunc AudioTrack *track.Audio VideoTrack *track.Video vr *AVRing[NALUSlice] ar *AVRing[AudioSlice] } -func (p *Publisher) Unsubscribe() { +// Subscriber 订阅者实体定义 +type Subscriber struct { + IO[config.Subscribe] + TrackPlayer +} + +func (p *Subscriber) GetSubscriber() *Subscriber { + return p +} + +func (p *Subscriber) Unsubscribe() { p.bye(p) } @@ -46,28 +58,41 @@ func (s *Subscriber) OnEvent(event any) any { return event } -func (s *Subscriber) AcceptTrack(t Track) { +func (s *Subscriber) AddTrack(t Track) bool { if v, ok := t.(*track.Video); ok { - s.VideoTrack = v - s.vr = v.ReadRing() - go s.play() + if s.Config.SubVideo { + if s.VideoTrack != nil { + return false + } + s.VideoTrack = v + s.vr = v.ReadRing() + return true + } } else if a, ok := t.(*track.Audio); ok { - s.AudioTrack = a - s.ar = a.ReadRing() - if !s.Config.SubVideo { - go s.play() + if s.Config.SubAudio { + if s.AudioTrack != nil { + return false + } + s.AudioTrack = a + s.ar = a.ReadRing() + return true } } + return false // TODO: data track } +func (s *Subscriber) IsPlaying() bool { + return s.TrackPlayer.Err() == nil && (s.AudioTrack != nil || s.VideoTrack != nil) +} + //Play 开始播放 -func (s *Subscriber) play() { +func (s *Subscriber) Play() { var t time.Time - for s.Err() == nil { + for s.TrackPlayer.Err() == nil { if s.vr != nil { for { - vp := s.vr.Read(s) + vp := s.vr.Read(s.TrackPlayer) s.OnEvent((*VideoFrame)(vp)) s.vr.MoveNext() if vp.Timestamp.After(t) { @@ -78,7 +103,7 @@ func (s *Subscriber) play() { } if s.ar != nil { for { - ap := s.ar.Read(s) + ap := s.ar.Read(s.TrackPlayer) s.OnEvent((*AudioFrame)(ap)) s.ar.MoveNext() if ap.Timestamp.After(t) { @@ -91,9 +116,10 @@ func (s *Subscriber) play() { return } +type PushEvent int type Pusher struct { - Subscriber Config *config.Push + StreamPath string RemoteURL string PushCount int } diff --git a/track/aac.go b/track/aac.go index 281c681..5e8ade3 100644 --- a/track/aac.go +++ b/track/aac.go @@ -11,6 +11,7 @@ import ( ) func NewAAC(stream IStream) (aac *AAC) { + stream.Debug("create aac track") aac = &AAC{} aac.Name = "aac" aac.Stream = stream diff --git a/track/audio.go b/track/audio.go index b12b442..9c9f8e9 100644 --- a/track/audio.go +++ b/track/audio.go @@ -80,6 +80,14 @@ type UnknowAudio struct { AudioTrack } +func (ua *UnknowAudio) GetName() string { + return ua.Base.GetName() +} + +func (ua *UnknowAudio) Flush() { + ua.AudioTrack.Flush() +} + func (ua *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) { if ua.AudioTrack == nil { codecID := frame.AudioCodecID() diff --git a/track/h264.go b/track/h264.go index b713c26..7ecb345 100644 --- a/track/h264.go +++ b/track/h264.go @@ -16,6 +16,7 @@ type H264 struct { } func NewH264(stream IStream) (vt *H264) { + stream.Debug("create h264 track") vt = &H264{} vt.Name = "h264" vt.CodecID = codec.CodecID_H264 diff --git a/track/h265.go b/track/h265.go index db669c3..4a06a02 100644 --- a/track/h265.go +++ b/track/h265.go @@ -15,6 +15,7 @@ type H265 struct { } func NewH265(stream IStream) (vt *H265) { + stream.Debug("create h265 track") vt = &H265{} vt.Name = "h265" vt.CodecID = codec.CodecID_H265 diff --git a/track/video.go b/track/video.go index d608db4..0f4891d 100644 --- a/track/video.go +++ b/track/video.go @@ -151,6 +151,14 @@ type UnknowVideo struct { VideoTrack } +func (uv *UnknowVideo) GetName() string { + return uv.Base.GetName() +} + +func (uv *UnknowVideo) Flush() { + uv.VideoTrack.Flush() +} + /* Access Unit的首个nalu是4字节起始码。 这里举个例子说明,用JM可以生成这样一段码流(不要使用JM8.6,它在这部分与标准不符),这个码流可以见本楼附件: