消除logrus,优化puller和pusher

This commit is contained in:
dexter
2022-02-17 21:49:55 +08:00
parent d32dfd84ac
commit fd088b11a2
16 changed files with 474 additions and 357 deletions

View File

@@ -5,8 +5,8 @@ import (
"time"
"github.com/Monibuca/engine/v4/codec"
"github.com/Monibuca/engine/v4/log"
"github.com/pion/rtp"
"github.com/sirupsen/logrus"
)
type NALUSlice net.Buffers
@@ -94,7 +94,7 @@ func (rtp *RTPFrame) Marshal() *RTPFrame {
func (rtp *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
rtp.Raw = raw
if err := rtp.Packet.Unmarshal(raw); err != nil {
logrus.Error(err)
log.Error(err)
return nil
}
return rtp
@@ -112,6 +112,11 @@ type DataFrame[T any] struct {
BaseFrame
Value T
}
type MediaFrame interface {
GetFLV() net.Buffers
GetAVCC() net.Buffers
GetRTP() []*RTPFrame
}
type AVFrame[T RawSlice] struct {
BaseFrame
IFrame bool
@@ -124,6 +129,15 @@ type AVFrame[T RawSlice] struct {
canRead bool
}
func (av *AVFrame[T]) GetFLV() net.Buffers {
return av.FLV
}
func (av *AVFrame[T]) GetAVCC() net.Buffers {
return av.AVCC
}
func (av *AVFrame[T]) GetRTP() []*RTPFrame {
return av.RTP
}
func (av *AVFrame[T]) AppendRaw(raw ...T) {
av.Raw = append(av.Raw, raw...)
}

View File

@@ -1,13 +1,12 @@
package config
import (
"github.com/Monibuca/engine/v4/log"
"net"
"net/http"
"reflect"
"strings"
"time"
"github.com/sirupsen/logrus"
)
type Config map[string]any
@@ -61,7 +60,7 @@ func (config Config) Unmarshal(s any) {
for k, v := range config {
name, ok := nameMap[k]
if !ok {
logrus.Error("no config named:", k)
log.Error("no config named:", k)
continue
}
// 需要被写入的字段

1
go.mod
View File

@@ -9,7 +9,6 @@ require (
github.com/mattn/go-colorable v0.1.8
github.com/pion/rtp v1.7.4
github.com/q191201771/naza v0.19.1
github.com/sirupsen/logrus v1.8.1
go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b

3
go.sum
View File

@@ -50,11 +50,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY=
github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=

41
io.go
View File

@@ -8,9 +8,7 @@ import (
"strings"
"time"
"github.com/Monibuca/engine/v4/common"
"github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/log"
"go.uber.org/zap"
)
@@ -18,14 +16,14 @@ type IOConfig interface {
config.Publish | config.Subscribe
}
type IO[C IOConfig, S IIO] struct {
type IO[C IOConfig] struct {
ID string
Type string
context.Context
context.CancelFunc
*zap.Logger
StartTime time.Time //创建时间
Stream common.IStream `json:"-"`
StartTime time.Time //创建时间
Stream *Stream `json:"-"`
io.Reader `json:"-"`
io.Writer `json:"-"`
io.Closer `json:"-"`
@@ -33,8 +31,13 @@ type IO[C IOConfig, S IIO] struct {
Config *C
}
func (io *IO[C, S]) OnEvent(event any) any {
func (io *IO[C]) IsClosed() bool {
return io.Err() != nil
}
func (io *IO[C]) OnEvent(event any) any {
switch v := event.(type) {
case context.Context:
io.Context, io.CancelFunc = context.WithCancel(v)
case *Stream:
io.StartTime = time.Now()
io.Stream = v
@@ -52,14 +55,21 @@ func (io *IO[C, S]) OnEvent(event any) any {
}
return event
}
type IIO interface {
context.Context
log.Zap
OnEvent(any) any
func (io *IO[C]) getID() string {
return io.ID
}
func (io *IO[C]) getType() string {
return io.Type
}
func (io *IO[C, S]) bye(specific S) {
type IIO interface {
IsClosed() bool
OnEvent(any) any
getID() string
getType() string
}
func (io *IO[C]) bye(specific any) {
if io.CancelFunc != nil {
io.CancelFunc()
}
@@ -68,7 +78,7 @@ func (io *IO[C, S]) bye(specific S) {
}
}
func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) bool {
func (io *IO[C]) receive(streamPath string, specific any, conf *C) bool {
Streams.Lock()
defer Streams.Unlock()
streamPath = strings.Trim(streamPath, "/")
@@ -82,12 +92,15 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) bool {
if v, ok := c.(*config.Subscribe); ok {
wt = v.WaitTimeout.Duration()
}
if io.Context == nil {
io.Context, io.CancelFunc = context.WithCancel(Engine)
}
s, created := findOrCreateStream(u.Path, wt)
if s.IsClosed() {
return false
}
if v, ok := c.(*config.Publish); ok {
if s.Publisher != nil && s.Publisher.Err() == nil {
if s.Publisher != nil && !s.Publisher.IsClosed() {
// 根据配置是否剔出原来的发布者
if v.KickExist {
s.Warn("kick", zap.Any("publisher", s.Publisher))

View File

@@ -1,28 +1,25 @@
package log
import (
"context"
"io"
"strings"
"time"
. "github.com/logrusorgru/aurora"
// . "github.com/logrusorgru/aurora"
"github.com/mattn/go-colorable"
log "github.com/sirupsen/logrus"
// log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)
var logger *zap.Logger
var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White}
var logger *zap.SugaredLogger
// var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White}
type LogWriter func(*log.Entry) string
// type LogWriter func(*log.Entry) string
var colorableStdout = colorable.NewColorableStdout()
func init() {
logger, _ = zap.NewDevelopment()
std.SetOutput(colorableStdout)
std.SetFormatter(LogWriter(defaultFormatter))
l, _ := zap.NewDevelopment()
logger = l.Sugar()
// std.SetOutput(colorableStdout)
// std.SetFormatter(LogWriter(defaultFormatter))
}
type Zap interface {
@@ -34,291 +31,335 @@ type Zap interface {
}
func With(fields ...zap.Field) *zap.Logger {
return logger.With(fields...)
return logger.Desugar().With(fields...)
}
func defaultFormatter(entry *log.Entry) string {
pl := entry.Data["plugin"]
if pl == nil {
pl = "Engine"
}
l := strings.ToUpper(entry.Level.String())[:1]
var props string
if stream := entry.Data["stream"]; stream != nil {
props = Sprintf("[s:%s] ", stream)
}
if puber := entry.Data["puber"]; puber != nil {
props += Sprintf("[pub:%s] ", puber)
}
if suber := entry.Data["suber"]; suber != nil {
props += Sprintf("[sub:%s] ", suber)
}
return Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message)
func Debug(args ...any) {
logger.Debug(args...)
}
func (f LogWriter) Format(entry *log.Entry) (b []byte, err error) {
return []byte(f(entry)), nil
func Info(args ...any) {
logger.Info(args...)
}
var (
// std is the name of the standard logger in stdlib `log`
std = log.New()
)
func StandardLogger() *log.Logger {
return std
func Warn(args ...any) {
logger.Warn(args...)
}
// SetOutput sets the standard logger output.
func SetOutput(out io.Writer) {
std.SetOutput(out)
func Error(args ...any) {
logger.Error(args...)
}
// SetFormatter sets the standard logger formatter.
func SetFormatter(formatter log.Formatter) {
std.SetFormatter(formatter)
}
// SetReportCaller sets whether the standard logger will include the calling
// method as a field.
func SetReportCaller(include bool) {
std.SetReportCaller(include)
}
// SetLevel sets the standard logger level.
func SetLevel(level log.Level) {
std.SetLevel(level)
}
// GetLevel returns the standard logger level.
func GetLevel() log.Level {
return std.GetLevel()
}
// IsLevelEnabled checks if the log level of the standard logger is greater than the level param
func IsLevelEnabled(level log.Level) bool {
return std.IsLevelEnabled(level)
}
// AddHook adds a hook to the standard logger hooks.
func AddHook(hook log.Hook) {
std.AddHook(hook)
}
// WithError creates an entry from the standard logger and adds an error to it, using the value defined in ErrorKey as key.
func WithError(err error) *log.Entry {
return std.WithField(log.ErrorKey, err)
}
// WithContext creates an entry from the standard logger and adds a context to it.
func WithContext(ctx context.Context) *log.Entry {
return std.WithContext(ctx)
}
// WithField creates an entry from the standard logger and adds a field to
// it. If you want multiple fields, use `WithFields`.
//
// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
// or Panic on the Entry it returns.
func WithField(key string, value interface{}) *log.Entry {
return std.WithField(key, value)
}
// WithFields creates an entry from the standard logger and adds multiple
// fields to it. This is simply a helper for `WithField`, invoking it
// once for each field.
//
// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
// or Panic on the Entry it returns.
func WithFields(fields log.Fields) *log.Entry {
return std.WithFields(fields)
}
// WithTime creates an entry from the standard logger and overrides the time of
// logs generated with it.
//
// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
// or Panic on the Entry it returns.
func WithTime(t time.Time) *log.Entry {
return std.WithTime(t)
}
// Trace logs a message at level Trace on the standard logger.
func Trace(args ...interface{}) {
std.Trace(args...)
}
// Debug logs a message at level Debug on the standard logger.
func Debug(args ...interface{}) {
std.Debug(args...)
}
// Print logs a message at level Info on the standard logger.
func Print(args ...interface{}) {
std.Print(args...)
}
// Info logs a message at level Info on the standard logger.
func Info(args ...interface{}) {
std.Info(args...)
}
// Warn logs a message at level Warn on the standard logger.
func Warn(args ...interface{}) {
std.Warn(args...)
}
// Warning logs a message at level Warn on the standard logger.
func Warning(args ...interface{}) {
std.Warning(args...)
}
// Error logs a message at level Error on the standard logger.
func Error(args ...interface{}) {
std.Error(args...)
}
// Panic logs a message at level Panic on the standard logger.
func Panic(args ...interface{}) {
std.Panic(args...)
}
// Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
func Fatal(args ...interface{}) {
std.Fatal(args...)
}
// TraceFn logs a message from a func at level Trace on the standard logger.
func TraceFn(fn log.LogFunction) {
std.TraceFn(fn)
}
// DebugFn logs a message from a func at level Debug on the standard logger.
func DebugFn(fn log.LogFunction) {
std.DebugFn(fn)
}
// PrintFn logs a message from a func at level Info on the standard logger.
func PrintFn(fn log.LogFunction) {
std.PrintFn(fn)
}
// InfoFn logs a message from a func at level Info on the standard logger.
func InfoFn(fn log.LogFunction) {
std.InfoFn(fn)
}
// WarnFn logs a message from a func at level Warn on the standard logger.
func WarnFn(fn log.LogFunction) {
std.WarnFn(fn)
}
// WarningFn logs a message from a func at level Warn on the standard logger.
func WarningFn(fn log.LogFunction) {
std.WarningFn(fn)
}
// ErrorFn logs a message from a func at level Error on the standard logger.
func ErrorFn(fn log.LogFunction) {
std.ErrorFn(fn)
}
// PanicFn logs a message from a func at level Panic on the standard logger.
func PanicFn(fn log.LogFunction) {
std.PanicFn(fn)
}
// FatalFn logs a message from a func at level Fatal on the standard logger then the process will exit with status set to 1.
func FatalFn(fn log.LogFunction) {
std.FatalFn(fn)
}
// Tracef logs a message at level Trace on the standard logger.
func Tracef(format string, args ...interface{}) {
std.Tracef(format, args...)
}
// Debugf logs a message at level Debug on the standard logger.
func Debugf(format string, args ...interface{}) {
std.Debugf(format, args...)
logger.Debugf(format, args...)
}
// Printf logs a message at level Info on the standard logger.
func Printf(format string, args ...interface{}) {
std.Printf(format, args...)
}
// Infof logs a message at level Info on the standard logger.
func Infof(format string, args ...interface{}) {
std.Infof(format, args...)
logger.Infof(format, args...)
}
// Warnf logs a message at level Warn on the standard logger.
func Warnf(format string, args ...interface{}) {
std.Warnf(format, args...)
}
// Warningf logs a message at level Warn on the standard logger.
func Warningf(format string, args ...interface{}) {
std.Warningf(format, args...)
logger.Warnf(format, args...)
}
// Errorf logs a message at level Error on the standard logger.
func Errorf(format string, args ...interface{}) {
std.Errorf(format, args...)
logger.Errorf(format, args...)
}
// Panicf logs a message at level Panic on the standard logger.
func Panicf(format string, args ...interface{}) {
std.Panicf(format, args...)
logger.Panicf(format, args...)
}
// Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
func Fatalf(format string, args ...interface{}) {
std.Fatalf(format, args...)
logger.Fatalf(format, args...)
}
// Traceln logs a message at level Trace on the standard logger.
func Traceln(args ...interface{}) {
std.Traceln(args...)
}
// func defaultFormatter(entry *log.Entry) string {
// pl := entry.Data["plugin"]
// if pl == nil {
// pl = "Engine"
// }
// l := strings.ToUpper(entry.Level.String())[:1]
// var props string
// if stream := entry.Data["stream"]; stream != nil {
// props = Sprintf("[s:%s] ", stream)
// }
// if puber := entry.Data["puber"]; puber != nil {
// props += Sprintf("[pub:%s] ", puber)
// }
// if suber := entry.Data["suber"]; suber != nil {
// props += Sprintf("[sub:%s] ", suber)
// }
// return Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message)
// }
// Debugln logs a message at level Debug on the standard logger.
func Debugln(args ...interface{}) {
std.Debugln(args...)
}
// func (f LogWriter) Format(entry *log.Entry) (b []byte, err error) {
// return []byte(f(entry)), nil
// }
// Println logs a message at level Info on the standard logger.
func Println(args ...interface{}) {
std.Println(args...)
}
// var (
// // std is the name of the standard logger in stdlib `log`
// std = log.New()
// )
// Infoln logs a message at level Info on the standard logger.
func Infoln(args ...interface{}) {
std.Infoln(args...)
}
// func StandardLogger() *log.Logger {
// return std
// }
// Warnln logs a message at level Warn on the standard logger.
func Warnln(args ...interface{}) {
std.Warnln(args...)
}
// // SetOutput sets the standard logger output.
// func SetOutput(out io.Writer) {
// std.SetOutput(out)
// }
// Warningln logs a message at level Warn on the standard logger.
func Warningln(args ...interface{}) {
std.Warningln(args...)
}
// // SetFormatter sets the standard logger formatter.
// func SetFormatter(formatter log.Formatter) {
// std.SetFormatter(formatter)
// }
// Errorln logs a message at level Error on the standard logger.
func Errorln(args ...interface{}) {
std.Errorln(args...)
}
// // SetReportCaller sets whether the standard logger will include the calling
// // method as a field.
// func SetReportCaller(include bool) {
// std.SetReportCaller(include)
// }
// Panicln logs a message at level Panic on the standard logger.
func Panicln(args ...interface{}) {
std.Panicln(args...)
}
// // SetLevel sets the standard logger level.
// func SetLevel(level log.Level) {
// std.SetLevel(level)
// }
// Fatalln logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
func Fatalln(args ...interface{}) {
std.Fatalln(args...)
}
// // GetLevel returns the standard logger level.
// func GetLevel() log.Level {
// return std.GetLevel()
// }
// // IsLevelEnabled checks if the log level of the standard logger is greater than the level param
// func IsLevelEnabled(level log.Level) bool {
// return std.IsLevelEnabled(level)
// }
// // AddHook adds a hook to the standard logger hooks.
// func AddHook(hook log.Hook) {
// std.AddHook(hook)
// }
// // WithError creates an entry from the standard logger and adds an error to it, using the value defined in ErrorKey as key.
// func WithError(err error) *log.Entry {
// return std.WithField(log.ErrorKey, err)
// }
// // WithContext creates an entry from the standard logger and adds a context to it.
// func WithContext(ctx context.Context) *log.Entry {
// return std.WithContext(ctx)
// }
// // WithField creates an entry from the standard logger and adds a field to
// // it. If you want multiple fields, use `WithFields`.
// //
// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
// // or Panic on the Entry it returns.
// func WithField(key string, value interface{}) *log.Entry {
// return std.WithField(key, value)
// }
// // WithFields creates an entry from the standard logger and adds multiple
// // fields to it. This is simply a helper for `WithField`, invoking it
// // once for each field.
// //
// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
// // or Panic on the Entry it returns.
// func WithFields(fields log.Fields) *log.Entry {
// return std.WithFields(fields)
// }
// // WithTime creates an entry from the standard logger and overrides the time of
// // logs generated with it.
// //
// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
// // or Panic on the Entry it returns.
// func WithTime(t time.Time) *log.Entry {
// return std.WithTime(t)
// }
// // Trace logs a message at level Trace on the standard logger.
// func Trace(args ...interface{}) {
// std.Trace(args...)
// }
// // Debug logs a message at level Debug on the standard logger.
// func Debug(args ...interface{}) {
// std.Debug(args...)
// }
// // Print logs a message at level Info on the standard logger.
// func Print(args ...interface{}) {
// std.Print(args...)
// }
// // Info logs a message at level Info on the standard logger.
// func Info(args ...interface{}) {
// std.Info(args...)
// }
// // Warn logs a message at level Warn on the standard logger.
// func Warn(args ...interface{}) {
// std.Warn(args...)
// }
// // Warning logs a message at level Warn on the standard logger.
// func Warning(args ...interface{}) {
// std.Warning(args...)
// }
// // Error logs a message at level Error on the standard logger.
// func Error(args ...interface{}) {
// std.Error(args...)
// }
// // Panic logs a message at level Panic on the standard logger.
// func Panic(args ...interface{}) {
// std.Panic(args...)
// }
// // Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
// func Fatal(args ...interface{}) {
// std.Fatal(args...)
// }
// // TraceFn logs a message from a func at level Trace on the standard logger.
// func TraceFn(fn log.LogFunction) {
// std.TraceFn(fn)
// }
// // DebugFn logs a message from a func at level Debug on the standard logger.
// func DebugFn(fn log.LogFunction) {
// std.DebugFn(fn)
// }
// // PrintFn logs a message from a func at level Info on the standard logger.
// func PrintFn(fn log.LogFunction) {
// std.PrintFn(fn)
// }
// // InfoFn logs a message from a func at level Info on the standard logger.
// func InfoFn(fn log.LogFunction) {
// std.InfoFn(fn)
// }
// // WarnFn logs a message from a func at level Warn on the standard logger.
// func WarnFn(fn log.LogFunction) {
// std.WarnFn(fn)
// }
// // WarningFn logs a message from a func at level Warn on the standard logger.
// func WarningFn(fn log.LogFunction) {
// std.WarningFn(fn)
// }
// // ErrorFn logs a message from a func at level Error on the standard logger.
// func ErrorFn(fn log.LogFunction) {
// std.ErrorFn(fn)
// }
// // PanicFn logs a message from a func at level Panic on the standard logger.
// func PanicFn(fn log.LogFunction) {
// std.PanicFn(fn)
// }
// // FatalFn logs a message from a func at level Fatal on the standard logger then the process will exit with status set to 1.
// func FatalFn(fn log.LogFunction) {
// std.FatalFn(fn)
// }
// // Tracef logs a message at level Trace on the standard logger.
// func Tracef(format string, args ...interface{}) {
// std.Tracef(format, args...)
// }
// // Debugf logs a message at level Debug on the standard logger.
// func Debugf(format string, args ...interface{}) {
// std.Debugf(format, args...)
// }
// // Printf logs a message at level Info on the standard logger.
// func Printf(format string, args ...interface{}) {
// std.Printf(format, args...)
// }
// // Infof logs a message at level Info on the standard logger.
// func Infof(format string, args ...interface{}) {
// std.Infof(format, args...)
// }
// // Warnf logs a message at level Warn on the standard logger.
// func Warnf(format string, args ...interface{}) {
// std.Warnf(format, args...)
// }
// // Warningf logs a message at level Warn on the standard logger.
// func Warningf(format string, args ...interface{}) {
// std.Warningf(format, args...)
// }
// // Errorf logs a message at level Error on the standard logger.
// func Errorf(format string, args ...interface{}) {
// std.Errorf(format, args...)
// }
// // Panicf logs a message at level Panic on the standard logger.
// func Panicf(format string, args ...interface{}) {
// std.Panicf(format, args...)
// }
// // Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
// func Fatalf(format string, args ...interface{}) {
// std.Fatalf(format, args...)
// }
// // Traceln logs a message at level Trace on the standard logger.
// func Traceln(args ...interface{}) {
// std.Traceln(args...)
// }
// // Debugln logs a message at level Debug on the standard logger.
// func Debugln(args ...interface{}) {
// std.Debugln(args...)
// }
// // Println logs a message at level Info on the standard logger.
// func Println(args ...interface{}) {
// std.Println(args...)
// }
// // Infoln logs a message at level Info on the standard logger.
// func Infoln(args ...interface{}) {
// std.Infoln(args...)
// }
// // Warnln logs a message at level Warn on the standard logger.
// func Warnln(args ...interface{}) {
// std.Warnln(args...)
// }
// // Warningln logs a message at level Warn on the standard logger.
// func Warningln(args ...interface{}) {
// std.Warningln(args...)
// }
// // Errorln logs a message at level Error on the standard logger.
// func Errorln(args ...interface{}) {
// std.Errorln(args...)
// }
// // Panicln logs a message at level Panic on the standard logger.
// func Panicln(args ...interface{}) {
// std.Panicln(args...)
// }
// // Fatalln logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
// func Fatalln(args ...interface{}) {
// std.Fatalln(args...)
// }

32
main.go
View File

@@ -1,6 +1,7 @@
package engine
import (
"bytes"
"context"
"fmt"
"io/ioutil"
@@ -16,6 +17,7 @@ import (
"github.com/Monibuca/engine/v4/util"
"github.com/google/uuid"
. "github.com/logrusorgru/aurora"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
@@ -39,22 +41,20 @@ var (
type PushOnPublish struct {
PushPlugin
RemoteURL string
Config *config.Push
Pusher
}
func (p PushOnPublish) Push(stream *Stream) {
p.PushStream(stream, p.RemoteURL, p.Config)
func (p PushOnPublish) Push() {
p.PushStream(p.Pusher)
}
type PullOnSubscribe struct {
PullPlugin
RemoteURL string
Config *config.Pull
Puller
}
func (p PullOnSubscribe) Pull(streamPath string) {
p.PullStream(streamPath, p.RemoteURL, p.Config)
func (p PullOnSubscribe) Pull() {
p.PullStream(p.Puller)
}
// Run 启动Monibuca引擎传入总的Context可用于关闭所有
@@ -83,7 +83,7 @@ func Run(ctx context.Context, configFile string) (err error) {
} else {
log.Warn("no config file found , use default config values")
}
Engine.Entry = log.WithContext(Engine)
Engine.Logger = log.With(zap.String("plugin", "engine"))
Engine.registerHandler()
go EngineConfig.Update(Engine.RawConfig)
for name, plugin := range Plugins {
@@ -92,13 +92,17 @@ func Run(ctx context.Context, configFile string) (err error) {
}
UUID := uuid.NewString()
reportTimer := time.NewTimer(time.Minute)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://monibuca.com:2022/report/engine", nil)
req.Header.Set("os", runtime.GOOS)
req.Header.Set("version", Engine.Version)
req.Header.Set("uuid", UUID)
contentBuf := bytes.NewBuffer(nil)
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "https://logs-01.loggly.com/inputs/758a662d-f630-40cb-95ed-2502a5e9c872/tag/monibuca/", nil)
req.Header.Set("Content-Type", "application/json")
content := fmt.Sprintf(`{"uuid":"%s","version":"%s","os":"%s","arch":"%s"`, UUID, Engine.Version, runtime.GOOS, runtime.GOARCH)
var c http.Client
for {
req.Header.Set("streams", fmt.Sprintf("%d", Streams.Len()))
contentBuf.Reset()
postJson := fmt.Sprintf(`%s,"streams":%d}`, content, len(Streams.Map))
contentBuf.WriteString(postJson)
req.Body = ioutil.NopCloser(contentBuf)
c.Do(req)
select {
case <-ctx.Done():

View File

@@ -11,9 +11,9 @@ import (
"github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/log"
"github.com/Monibuca/engine/v4/track"
"github.com/Monibuca/engine/v4/util"
. "github.com/logrusorgru/aurora"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
@@ -34,9 +34,9 @@ func InstallPlugin(config config.Plugin) *Plugin {
return nil
}
if config != EngineConfig {
plugin.Entry = log.WithField("plugin", name)
plugin.Logger = log.With(zap.String("plugin", name))
Plugins[name] = plugin
plugin.Info(Green("install"), BrightBlue(plugin.Version))
plugin.Info("install", zap.String("version", plugin.Version))
}
return plugin
}
@@ -50,13 +50,13 @@ type Plugin struct {
Version string //插件版本
RawConfig config.Config //配置的map形式方便查询
Modified config.Config //修改过的配置项
*logrus.Entry
*zap.Logger
}
type PushPlugin interface {
PushStream(*Stream, string, *config.Push)
PushStream(Pusher)
}
type PullPlugin interface {
PullStream(string, string, *config.Pull) bool
PullStream(Puller)
}
func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
@@ -75,12 +75,12 @@ func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter,
if opt != Engine {
pattern = "/" + strings.ToLower(opt.Name) + pattern
}
opt.Info("http handle added:", pattern)
opt.Info("http handle added:" + pattern)
EngineConfig.HandleFunc(pattern, func(rw http.ResponseWriter, r *http.Request) {
if cors {
util.CORS(rw, r)
}
opt.Debug(r.RemoteAddr, " -> ", pattern)
opt.Debug("visit", zap.String("path", pattern), zap.String("remote", r.RemoteAddr))
handler(rw, r)
})
}
@@ -134,16 +134,16 @@ func (opt *Plugin) autoPull() {
reflect.ValueOf(&pullConfig).Elem().Set(v.Field(i))
for streamPath, url := range pullConfig.PullList {
if pullConfig.PullOnStart {
opt.Config.(PullPlugin).PullStream(streamPath, url, &pullConfig)
opt.Config.(PullPlugin).PullStream(Puller{&pullConfig, streamPath, url, 0})
} else if pullConfig.PullOnSubscribe {
PullOnSubscribeList[streamPath] = PullOnSubscribe{opt.Config.(PullPlugin), url, &pullConfig}
PullOnSubscribeList[streamPath] = PullOnSubscribe{opt.Config.(PullPlugin), Puller{&pullConfig, streamPath, url, 0}}
}
}
} else if name == "Push" {
var pushConfig config.Push
reflect.ValueOf(&pushConfig).Elem().Set(v.Field(i))
for streamPath, url := range pushConfig.PushList {
PushOnPublishList[streamPath] = append(PushOnPublishList[streamPath], PushOnPublish{opt.Config.(PushPlugin), url, &pushConfig})
PushOnPublishList[streamPath] = append(PushOnPublishList[streamPath], PushOnPublish{opt.Config.(PushPlugin), Pusher{&pushConfig, streamPath, url, 0}})
}
}
}
@@ -186,7 +186,16 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) bool {
if !ok {
conf = EngineConfig
}
return pub.receive(streamPath, pub, conf.GetPublishConfig())
if ok = pub.receive(streamPath, pub, conf.GetPublishConfig()); ok {
p := pub.GetPublisher()
unA := track.UnknowAudio{}
unA.Stream = p.Stream
p.AudioTrack = &unA
unV := track.UnknowVideo{}
unV.Stream = p.Stream
p.VideoTrack = &unV
}
return ok
}
func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) bool {
@@ -194,5 +203,9 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) bool {
if !ok {
conf = EngineConfig
}
return sub.receive(streamPath, sub, conf.GetSubscribeConfig())
if ok = sub.receive(streamPath, sub, conf.GetSubscribeConfig()); ok {
p := sub.GetSubscriber()
p.TrackPlayer.Context, p.TrackPlayer.CancelFunc = context.WithCancel(p.IO)
}
return ok
}

View File

@@ -7,15 +7,21 @@ import (
type IPublisher interface {
IIO
receive(string, IPublisher, *config.Publish) bool
GetPublisher() *Publisher
receive(string, any, *config.Publish) bool
Unpublish()
}
type Publisher struct {
IO[config.Publish, IPublisher]
IO[config.Publish]
common.AudioTrack
common.VideoTrack
}
func (p *Publisher) GetPublisher() *Publisher {
return p
}
func (p *Publisher) Unpublish() {
p.bye(p)
}
@@ -24,10 +30,10 @@ type PullEvent int
// 用于远程拉流的发布者
type Puller struct {
Publisher
Config *config.Pull
RemoteURL string
PullCount int
Config *config.Pull
StreamPath string
RemoteURL string
PullCount int
}
// 是否需要重连

View File

@@ -162,14 +162,14 @@ func (r *Stream) action(action StreamAction) bool {
Bus.Publish(Event_REQUEST_PUBLISH, r)
r.timeout.Reset(r.WaitTimeout)
if _, ok = PullOnSubscribeList[r.Path]; ok {
PullOnSubscribeList[r.Path].Pull(r.Path)
PullOnSubscribeList[r.Path].Pull()
}
case STATE_PUBLISHING:
r.timeout.Reset(time.Second) // 秒级心跳检测track的存活度
Bus.Publish(Event_PUBLISH, r)
if v, ok := PushOnPublishList[r.Path]; ok {
for _, v := range v {
v.Push(r)
v.Push()
}
}
case STATE_WAITCLOSE:
@@ -243,7 +243,7 @@ func (s *Stream) run() {
if ok {
switch v := action.(type) {
case IPublisher:
if v.Err() != nil {
if v.IsClosed() {
s.action(ACTION_PUBLISHLOST)
} else if s.action(ACTION_PUBLISH) {
s.Publisher = v
@@ -272,14 +272,14 @@ func (s *Stream) run() {
case StreamAction:
s.action(v)
case ISubscriber:
if v.Err() == nil {
if !v.IsClosed() {
s.Subscribers.Add(v)
if wt := v.GetSubscribeConfig().WaitTimeout.Duration(); wt > s.WaitTimeout {
s.WaitTimeout = wt
}
v.OnEvent(s) // 通知Subscriber已成功进入Stream
Bus.Publish(Event_SUBSCRIBE, v)
v.Info(Sprintf(Yellow("added remains:%d"), len(s.Subscribers)))
s.Info("suber added", zap.String("id", v.getID()), zap.String("type", v.getType()), zap.Int("remains", len(s.Subscribers)))
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入在回调中可以去获取订阅者数量
}
@@ -288,7 +288,7 @@ func (s *Stream) run() {
}
} else if s.Subscribers.Delete(v) {
Bus.Publish(Event_UNSUBSCRIBE, v)
v.Info(Sprintf(Yellow("removed remains:%d"), len(s.Subscribers)))
s.Info("suber removed", zap.String("id", v.getID()), zap.String("type", v.getType()), zap.Int("remains", len(s.Subscribers)))
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有订阅者离开在回调中可以去获取订阅者数量
}
@@ -329,20 +329,6 @@ func (r *Stream) NewAudioTrack() (at *track.UnknowAudio) {
at.Stream = r
return
}
func (r *Stream) NewH264Track() *track.H264 {
r.Debug("create h264 track")
return track.NewH264(r)
}
func (r *Stream) NewH265Track() *track.H265 {
r.Debug("create h265 track")
return track.NewH265(r)
}
func (r *Stream) NewAACTrack() *track.AAC {
r.Debug("create aac track")
return track.NewAAC(r)
}
// func (r *Stream) WaitDataTrack(names ...string) DataTrack {
// t := <-r.WaitTrack(names...)

View File

@@ -1,6 +1,7 @@
package engine
import (
"context"
"time"
. "github.com/Monibuca/engine/v4/common"
@@ -12,20 +13,31 @@ type AudioFrame AVFrame[AudioSlice]
type VideoFrame AVFrame[NALUSlice]
type ISubscriber interface {
IIO
receive(string, ISubscriber, *config.Subscribe) bool
receive(string, any, *config.Subscribe) bool
config.SubscribeConfig
GetSubscriber() *Subscriber
Unsubscribe()
}
// Subscriber 订阅者实体定义
type Subscriber struct {
IO[config.Subscribe, ISubscriber]
type TrackPlayer struct {
context.Context
context.CancelFunc
AudioTrack *track.Audio
VideoTrack *track.Video
vr *AVRing[NALUSlice]
ar *AVRing[AudioSlice]
}
func (p *Publisher) Unsubscribe() {
// Subscriber 订阅者实体定义
type Subscriber struct {
IO[config.Subscribe]
TrackPlayer
}
func (p *Subscriber) GetSubscriber() *Subscriber {
return p
}
func (p *Subscriber) Unsubscribe() {
p.bye(p)
}
@@ -46,28 +58,41 @@ func (s *Subscriber) OnEvent(event any) any {
return event
}
func (s *Subscriber) AcceptTrack(t Track) {
func (s *Subscriber) AddTrack(t Track) bool {
if v, ok := t.(*track.Video); ok {
s.VideoTrack = v
s.vr = v.ReadRing()
go s.play()
if s.Config.SubVideo {
if s.VideoTrack != nil {
return false
}
s.VideoTrack = v
s.vr = v.ReadRing()
return true
}
} else if a, ok := t.(*track.Audio); ok {
s.AudioTrack = a
s.ar = a.ReadRing()
if !s.Config.SubVideo {
go s.play()
if s.Config.SubAudio {
if s.AudioTrack != nil {
return false
}
s.AudioTrack = a
s.ar = a.ReadRing()
return true
}
}
return false
// TODO: data track
}
func (s *Subscriber) IsPlaying() bool {
return s.TrackPlayer.Err() == nil && (s.AudioTrack != nil || s.VideoTrack != nil)
}
//Play 开始播放
func (s *Subscriber) play() {
func (s *Subscriber) Play() {
var t time.Time
for s.Err() == nil {
for s.TrackPlayer.Err() == nil {
if s.vr != nil {
for {
vp := s.vr.Read(s)
vp := s.vr.Read(s.TrackPlayer)
s.OnEvent((*VideoFrame)(vp))
s.vr.MoveNext()
if vp.Timestamp.After(t) {
@@ -78,7 +103,7 @@ func (s *Subscriber) play() {
}
if s.ar != nil {
for {
ap := s.ar.Read(s)
ap := s.ar.Read(s.TrackPlayer)
s.OnEvent((*AudioFrame)(ap))
s.ar.MoveNext()
if ap.Timestamp.After(t) {
@@ -91,9 +116,10 @@ func (s *Subscriber) play() {
return
}
type PushEvent int
type Pusher struct {
Subscriber
Config *config.Push
StreamPath string
RemoteURL string
PushCount int
}

View File

@@ -11,6 +11,7 @@ import (
)
func NewAAC(stream IStream) (aac *AAC) {
stream.Debug("create aac track")
aac = &AAC{}
aac.Name = "aac"
aac.Stream = stream

View File

@@ -80,6 +80,14 @@ type UnknowAudio struct {
AudioTrack
}
func (ua *UnknowAudio) GetName() string {
return ua.Base.GetName()
}
func (ua *UnknowAudio) Flush() {
ua.AudioTrack.Flush()
}
func (ua *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) {
if ua.AudioTrack == nil {
codecID := frame.AudioCodecID()

View File

@@ -16,6 +16,7 @@ type H264 struct {
}
func NewH264(stream IStream) (vt *H264) {
stream.Debug("create h264 track")
vt = &H264{}
vt.Name = "h264"
vt.CodecID = codec.CodecID_H264

View File

@@ -15,6 +15,7 @@ type H265 struct {
}
func NewH265(stream IStream) (vt *H265) {
stream.Debug("create h265 track")
vt = &H265{}
vt.Name = "h265"
vt.CodecID = codec.CodecID_H265

View File

@@ -151,6 +151,14 @@ type UnknowVideo struct {
VideoTrack
}
func (uv *UnknowVideo) GetName() string {
return uv.Base.GetName()
}
func (uv *UnknowVideo) Flush() {
uv.VideoTrack.Flush()
}
/*
Access Unit的首个nalu是4字节起始码。
这里举个例子说明用JM可以生成这样一段码流不要使用JM8.6,它在这部分与标准不符),这个码流可以见本楼附件: