diff --git a/config/types.go b/config/types.go index e4de83f..b761524 100644 --- a/config/types.go +++ b/config/types.go @@ -16,7 +16,7 @@ type Subscribe struct { } type Pull struct { - Reconnect int // 自动重连,0 表示不自动重连,-1 表示无限重连,高于0 的数代表最大重连次数 + RePull int // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉,高于0 的数代表最大重拉次数 PullOnStart bool // 启动时拉流 PullOnSubscribe bool // 订阅时自动拉流 PullList map[string]string // 自动拉流列表,以streamPath为key,url为value @@ -30,6 +30,7 @@ func (p *Pull) AddPull(streamPath string, url string) { } type Push struct { + RePush int // 断开后自动重推,0 表示不自动重推,-1 表示无限重推,高于0 的数代表最大重推次数 PushList map[string]string // 自动推流列表 } diff --git a/http.go b/http.go index ebc5790..372913e 100644 --- a/http.go +++ b/http.go @@ -3,10 +3,9 @@ package engine import ( "encoding/json" "net/http" - + "github.com/Monibuca/engine/v4/log" "github.com/Monibuca/engine/v4/config" . "github.com/logrusorgru/aurora" - log "github.com/sirupsen/logrus" ) type GlobalConfig struct { diff --git a/log.go b/log.go deleted file mode 100644 index a8241fb..0000000 --- a/log.go +++ /dev/null @@ -1,59 +0,0 @@ -package engine - -import ( - "io" - "strings" - - "github.com/Monibuca/engine/v4/util" - . "github.com/logrusorgru/aurora" - "github.com/mattn/go-colorable" - log "github.com/sirupsen/logrus" -) - -var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White} - -// MultiLogWriter 可动态增减输出的多端写日志类 -type MultiLogWriter struct { - writers util.Slice[io.Writer] - io.Writer -} - -var colorableStdout = colorable.NewColorableStdout() -var LogWriter = &MultiLogWriter{ - writers: util.Slice[io.Writer]{colorableStdout}, - Writer: colorableStdout, -} - -func init() { - log.SetOutput(LogWriter) - log.SetFormatter(LogWriter) -} - -func (ml *MultiLogWriter) Add(w io.Writer) { - ml.writers.Add(w) - ml.Writer = io.MultiWriter(ml.writers...) -} - -func (ml *MultiLogWriter) Delete(w io.Writer) { - ml.writers.Delete(w) - ml.Writer = io.MultiWriter(ml.writers...) -} - -func (ml *MultiLogWriter) Format(entry *log.Entry) (b []byte, err error) { - pl := entry.Data["plugin"] - if pl == nil { - pl = "Engine" - } - l := strings.ToUpper(entry.Level.String())[:1] - var props string - if stream := entry.Data["stream"]; stream != nil { - props = Sprintf("[s:%s] ", stream) - } - if puber := entry.Data["puber"]; puber != nil { - props += Sprintf("[pub:%s] ", puber) - } - if suber := entry.Data["suber"]; suber != nil { - props += Sprintf("[sub:%s] ", suber) - } - return []byte(Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message)), nil -} diff --git a/log/log.go b/log/log.go new file mode 100644 index 0000000..0438bc2 --- /dev/null +++ b/log/log.go @@ -0,0 +1,309 @@ +package log + +import ( + "context" + "io" + "strings" + "time" + + . "github.com/logrusorgru/aurora" + "github.com/mattn/go-colorable" + log "github.com/sirupsen/logrus" +) + +var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White} + +type LogWriter func(*log.Entry) string + +var colorableStdout = colorable.NewColorableStdout() + +func init() { + std.SetOutput(colorableStdout) + std.SetFormatter(LogWriter(defaultFormatter)) +} + +func defaultFormatter(entry *log.Entry) string { + pl := entry.Data["plugin"] + if pl == nil { + pl = "Engine" + } + l := strings.ToUpper(entry.Level.String())[:1] + var props string + if stream := entry.Data["stream"]; stream != nil { + props = Sprintf("[s:%s] ", stream) + } + if puber := entry.Data["puber"]; puber != nil { + props += Sprintf("[pub:%s] ", puber) + } + if suber := entry.Data["suber"]; suber != nil { + props += Sprintf("[sub:%s] ", suber) + } + return Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message) +} + +func (f LogWriter) Format(entry *log.Entry) (b []byte, err error) { + return []byte(f(entry)), nil +} + +var ( + // std is the name of the standard logger in stdlib `log` + std = log.New() +) + +func StandardLogger() *log.Logger { + return std +} + +// SetOutput sets the standard logger output. +func SetOutput(out io.Writer) { + std.SetOutput(out) +} + +// SetFormatter sets the standard logger formatter. +func SetFormatter(formatter log.Formatter) { + std.SetFormatter(formatter) +} + +// SetReportCaller sets whether the standard logger will include the calling +// method as a field. +func SetReportCaller(include bool) { + std.SetReportCaller(include) +} + +// SetLevel sets the standard logger level. +func SetLevel(level log.Level) { + std.SetLevel(level) +} + +// GetLevel returns the standard logger level. +func GetLevel() log.Level { + return std.GetLevel() +} + +// IsLevelEnabled checks if the log level of the standard logger is greater than the level param +func IsLevelEnabled(level log.Level) bool { + return std.IsLevelEnabled(level) +} + +// AddHook adds a hook to the standard logger hooks. +func AddHook(hook log.Hook) { + std.AddHook(hook) +} + +// WithError creates an entry from the standard logger and adds an error to it, using the value defined in ErrorKey as key. +func WithError(err error) *log.Entry { + return std.WithField(log.ErrorKey, err) +} + +// WithContext creates an entry from the standard logger and adds a context to it. +func WithContext(ctx context.Context) *log.Entry { + return std.WithContext(ctx) +} + +// WithField creates an entry from the standard logger and adds a field to +// it. If you want multiple fields, use `WithFields`. +// +// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal +// or Panic on the Entry it returns. +func WithField(key string, value interface{}) *log.Entry { + return std.WithField(key, value) +} + +// WithFields creates an entry from the standard logger and adds multiple +// fields to it. This is simply a helper for `WithField`, invoking it +// once for each field. +// +// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal +// or Panic on the Entry it returns. +func WithFields(fields log.Fields) *log.Entry { + return std.WithFields(fields) +} + +// WithTime creates an entry from the standard logger and overrides the time of +// logs generated with it. +// +// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal +// or Panic on the Entry it returns. +func WithTime(t time.Time) *log.Entry { + return std.WithTime(t) +} + +// Trace logs a message at level Trace on the standard logger. +func Trace(args ...interface{}) { + std.Trace(args...) +} + +// Debug logs a message at level Debug on the standard logger. +func Debug(args ...interface{}) { + std.Debug(args...) +} + +// Print logs a message at level Info on the standard logger. +func Print(args ...interface{}) { + std.Print(args...) +} + +// Info logs a message at level Info on the standard logger. +func Info(args ...interface{}) { + std.Info(args...) +} + +// Warn logs a message at level Warn on the standard logger. +func Warn(args ...interface{}) { + std.Warn(args...) +} + +// Warning logs a message at level Warn on the standard logger. +func Warning(args ...interface{}) { + std.Warning(args...) +} + +// Error logs a message at level Error on the standard logger. +func Error(args ...interface{}) { + std.Error(args...) +} + +// Panic logs a message at level Panic on the standard logger. +func Panic(args ...interface{}) { + std.Panic(args...) +} + +// Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1. +func Fatal(args ...interface{}) { + std.Fatal(args...) +} + +// TraceFn logs a message from a func at level Trace on the standard logger. +func TraceFn(fn log.LogFunction) { + std.TraceFn(fn) +} + +// DebugFn logs a message from a func at level Debug on the standard logger. +func DebugFn(fn log.LogFunction) { + std.DebugFn(fn) +} + +// PrintFn logs a message from a func at level Info on the standard logger. +func PrintFn(fn log.LogFunction) { + std.PrintFn(fn) +} + +// InfoFn logs a message from a func at level Info on the standard logger. +func InfoFn(fn log.LogFunction) { + std.InfoFn(fn) +} + +// WarnFn logs a message from a func at level Warn on the standard logger. +func WarnFn(fn log.LogFunction) { + std.WarnFn(fn) +} + +// WarningFn logs a message from a func at level Warn on the standard logger. +func WarningFn(fn log.LogFunction) { + std.WarningFn(fn) +} + +// ErrorFn logs a message from a func at level Error on the standard logger. +func ErrorFn(fn log.LogFunction) { + std.ErrorFn(fn) +} + +// PanicFn logs a message from a func at level Panic on the standard logger. +func PanicFn(fn log.LogFunction) { + std.PanicFn(fn) +} + +// FatalFn logs a message from a func at level Fatal on the standard logger then the process will exit with status set to 1. +func FatalFn(fn log.LogFunction) { + std.FatalFn(fn) +} + +// Tracef logs a message at level Trace on the standard logger. +func Tracef(format string, args ...interface{}) { + std.Tracef(format, args...) +} + +// Debugf logs a message at level Debug on the standard logger. +func Debugf(format string, args ...interface{}) { + std.Debugf(format, args...) +} + +// Printf logs a message at level Info on the standard logger. +func Printf(format string, args ...interface{}) { + std.Printf(format, args...) +} + +// Infof logs a message at level Info on the standard logger. +func Infof(format string, args ...interface{}) { + std.Infof(format, args...) +} + +// Warnf logs a message at level Warn on the standard logger. +func Warnf(format string, args ...interface{}) { + std.Warnf(format, args...) +} + +// Warningf logs a message at level Warn on the standard logger. +func Warningf(format string, args ...interface{}) { + std.Warningf(format, args...) +} + +// Errorf logs a message at level Error on the standard logger. +func Errorf(format string, args ...interface{}) { + std.Errorf(format, args...) +} + +// Panicf logs a message at level Panic on the standard logger. +func Panicf(format string, args ...interface{}) { + std.Panicf(format, args...) +} + +// Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1. +func Fatalf(format string, args ...interface{}) { + std.Fatalf(format, args...) +} + +// Traceln logs a message at level Trace on the standard logger. +func Traceln(args ...interface{}) { + std.Traceln(args...) +} + +// Debugln logs a message at level Debug on the standard logger. +func Debugln(args ...interface{}) { + std.Debugln(args...) +} + +// Println logs a message at level Info on the standard logger. +func Println(args ...interface{}) { + std.Println(args...) +} + +// Infoln logs a message at level Info on the standard logger. +func Infoln(args ...interface{}) { + std.Infoln(args...) +} + +// Warnln logs a message at level Warn on the standard logger. +func Warnln(args ...interface{}) { + std.Warnln(args...) +} + +// Warningln logs a message at level Warn on the standard logger. +func Warningln(args ...interface{}) { + std.Warningln(args...) +} + +// Errorln logs a message at level Error on the standard logger. +func Errorln(args ...interface{}) { + std.Errorln(args...) +} + +// Panicln logs a message at level Panic on the standard logger. +func Panicln(args ...interface{}) { + std.Panicln(args...) +} + +// Fatalln logs a message at level Fatal on the standard logger then the process will exit with status set to 1. +func Fatalln(args ...interface{}) { + std.Fatalln(args...) +} diff --git a/main.go b/main.go index 235d93a..69f482d 100644 --- a/main.go +++ b/main.go @@ -12,10 +12,10 @@ import ( "time" "github.com/Monibuca/engine/v4/config" + "github.com/Monibuca/engine/v4/log" "github.com/Monibuca/engine/v4/util" "github.com/google/uuid" . "github.com/logrusorgru/aurora" - log "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" ) @@ -34,15 +34,25 @@ var ( handlerFuncType = reflect.TypeOf(toolManForGetHandlerFuncType) //供反射使用的Handler类型的类型 MergeConfigs = []string{"Publish", "Subscribe"} //需要合并配置的属性项,插件若没有配置则使用全局配置 PullOnSubscribeList = make(map[string]PullOnSubscribe) //按需拉流的配置信息 + PushOnPublishList = make(map[string][]PushOnPublish) //发布时自动推流配置 ) +type PushOnPublish struct { + PushPlugin + Pusher +} + +func (p PushOnPublish) Push(stream *Stream) { + p.PushStream(stream, p.Pusher) +} + type PullOnSubscribe struct { - Plugin PullPlugin + PullPlugin Puller } func (p PullOnSubscribe) Pull(streamPath string) { - p.Plugin.PullStream(streamPath, p.Puller) + p.PullStream(streamPath, p.Puller) } // Run 启动Monibuca引擎,传入总的Context,可用于关闭所有 diff --git a/plugin.go b/plugin.go index c8f4e51..786cf91 100644 --- a/plugin.go +++ b/plugin.go @@ -10,9 +10,10 @@ import ( "strings" "github.com/Monibuca/engine/v4/config" + "github.com/Monibuca/engine/v4/log" "github.com/Monibuca/engine/v4/util" . "github.com/logrusorgru/aurora" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" ) @@ -49,9 +50,11 @@ type Plugin struct { Version string //插件版本 RawConfig config.Config //配置的map形式方便查询 Modified config.Config //修改过的配置项 - *log.Entry + *logrus.Entry +} +type PushPlugin interface { + PushStream(*Stream, Pusher) } - type PullPlugin interface { PullStream(string, Puller) bool } @@ -126,7 +129,7 @@ func (opt *Plugin) autoPull() { t := reflect.TypeOf(opt.Config).Elem() v := reflect.ValueOf(opt.Config).Elem() for i, j := 0, t.NumField(); i < j; i++ { - if t.Field(i).Name == "Pull" { + if name := t.Field(i).Name; name == "Pull" { var pullConfig config.Pull reflect.ValueOf(&pullConfig).Elem().Set(v.Field(i)) for streamPath, url := range pullConfig.PullList { @@ -137,6 +140,13 @@ func (opt *Plugin) autoPull() { PullOnSubscribeList[streamPath] = PullOnSubscribe{opt.Config.(PullPlugin), puller} } } + } else if name == "Push" { + var pushConfig config.Push + reflect.ValueOf(&pushConfig).Elem().Set(v.Field(i)) + for streamPath, url := range pushConfig.PushList { + pusher := Pusher{RemoteURL: url, Config: &pushConfig} + PushOnPublishList[streamPath] = append(PushOnPublishList[streamPath], PushOnPublish{opt.Config.(PushPlugin), pusher}) + } } } } diff --git a/publisher.go b/publisher.go index b240c4f..bdbefc5 100644 --- a/publisher.go +++ b/publisher.go @@ -78,7 +78,7 @@ type Puller struct { // 是否需要重连 func (pub *Puller) reconnect() bool { - return pub.Config.Reconnect == -1 || pub.pullCount <= pub.Config.Reconnect + return pub.Config.RePull == -1 || pub.pullCount <= pub.Config.RePull } func (pub *Puller) pull() { diff --git a/stream.go b/stream.go index 6573998..086a849 100644 --- a/stream.go +++ b/stream.go @@ -8,10 +8,11 @@ import ( "time" "unsafe" + "github.com/Monibuca/engine/v4/log" "github.com/Monibuca/engine/v4/track" "github.com/Monibuca/engine/v4/util" . "github.com/logrusorgru/aurora" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) type StreamState byte @@ -62,9 +63,7 @@ var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{ { ACTION_TIMEOUT: STATE_DESTROYED, }, - { - - }, + {}, } // Streams 所有的流集合 @@ -103,10 +102,10 @@ type Stream struct { StartTime time.Time //流的创建时间 Subscribers util.Slice[*Subscriber] // 订阅者 Tracks - FrameCount uint32 //帧总数 - AppName string - StreamName string - *log.Entry `json:"-"` + FrameCount uint32 //帧总数 + AppName string + StreamName string + *logrus.Entry `json:"-"` } func (s *Stream) SSRC() uint32 { @@ -180,6 +179,11 @@ func (r *Stream) action(action StreamAction) bool { r.WaitDone() r.timeout.Reset(r.PublishTimeout) Bus.Publish(Event_PUBLISH, r) + if v, ok := PushOnPublishList[r.Path]; ok { + for _, v := range v { + v.Push(r) + } + } case STATE_WAITCLOSE: r.timeout.Reset(r.WaitCloseTimeout) case STATE_CLOSED: @@ -251,8 +255,9 @@ func (r *Stream) run() { r.action(v) case *Subscriber: r.Subscribers.Add(v) + v.SubscribeTime = time.Now() Bus.Publish(Event_SUBSCRIBE, v) - v.Info(Sprintf(Yellow("added remains:%d") ,len(r.Subscribers))) + v.Info(Sprintf(Yellow("added remains:%d"), len(r.Subscribers))) if r.Subscribers.Len() == 1 { r.action(ACTION_FIRSTENTER) } diff --git a/subscriber.go b/subscriber.go index 8734906..bb8bf5b 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,6 +2,7 @@ package engine import ( "context" + "io" "net/url" "time" @@ -144,3 +145,45 @@ func (r *Subscriber) WaitAudioTrack(names ...string) *track.Audio { return t.(*track.Audio) } } + +type IPusher interface { + Push(int) + Close() +} +type Pusher struct { + Subscriber + specific IPusher + Config *config.Push + RemoteURL string + io.Writer + io.Closer + pushCount int +} + +// 是否需要重连 +func (pub *Pusher) reconnect() bool { + return pub.Config.RePush == -1 || pub.pushCount <= pub.Config.RePush +} + +func (pub *Pusher) push() { + pub.specific.Push(pub.pushCount) + pub.pushCount++ + pub.specific.Close() + pub.Subscriber.Stream.Subscribe(&pub.Subscriber) + if !pub.Subscriber.Stream.IsClosed() { + go pub.push() + } +} + +func (pub *Pusher) Push(specific IPusher, config config.Push) { + pub.specific = specific + pub.Config = &config + go pub.push() +} + +func (p *Pusher) Close() { + if p.Closer != nil { + p.Closer.Close() + } + p.Subscriber.Close() +} diff --git a/util/socket.go b/util/socket.go index 396a971..5cd169c 100644 --- a/util/socket.go +++ b/util/socket.go @@ -3,10 +3,11 @@ package util import ( "context" "encoding/json" - log "github.com/sirupsen/logrus" "net" "net/http" "time" + + "github.com/Monibuca/engine/v4/log" ) type TCPListener interface {