diff --git a/config/types.go b/config/types.go index 027a348..a804b27 100644 --- a/config/types.go +++ b/config/types.go @@ -1,6 +1,15 @@ package config -import "net/http" +import ( + "context" + "io" + "net/http" + "time" + + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" + "m7s.live/engine/v4/log" +) type PublishConfig interface { GetPublishConfig() *Publish @@ -62,6 +71,7 @@ type Push struct { RePush int // 断开后自动重推,0 表示不自动重推,-1 表示无限重推,高于0 的数代表最大重推次数 PushList map[string]string // 自动推流列表 } + func (p *Push) GetPushConfig() *Push { return p } @@ -78,16 +88,65 @@ type Engine struct { Subscribe HTTP RTPReorder bool - EnableAVCC bool //启用AVCC格式,rtmp协议使用 - EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用 - EnableFLV bool //开启FLV格式,hdl协议使用 + EnableAVCC bool //启用AVCC格式,rtmp协议使用 + EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用 + EnableFLV bool //开启FLV格式,hdl协议使用 + ConsoleURL string //远程控制台地址 + Secret string //远程控制台密钥 +} +type myResponseWriter struct { + io.Writer +} + +func (w *myResponseWriter) Write(b []byte) (int, error) { + return len(b), wsutil.WriteClientMessage(w, ws.OpBinary, b) +} + +func (w *myResponseWriter) Header() http.Header { + return make(http.Header) +} +func (w *myResponseWriter) WriteHeader(statusCode int) { } func (cfg *Engine) OnEvent(event any) { - + switch v := event.(type) { + case context.Context: + go func() { + for { + conn, _, _, err := ws.Dial(v, cfg.ConsoleURL) + wr := &myResponseWriter{conn} + if err != nil { + log.Error("connect to console server error:", err) + time.Sleep(time.Second * 5) + continue + } + err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(cfg.Secret)) + if err != nil { + time.Sleep(time.Second * 5) + continue + } + for { + msg, _, err := wsutil.ReadServerData(conn) + if err != nil { + log.Error("read console server error:", err) + break + } else { + req, err := http.NewRequest("GET", string(msg), nil) + if err != nil { + log.Error("receive console request :", msg, err) + break + } + h, _ := cfg.mux.Handler(req) + h.ServeHTTP(wr, req) + } + } + } + }() + } } + var Global = &Engine{ Publish{true, true, false, 10, 0}, Subscribe{true, true, false, 10}, HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux}, - false, true, true, true, + false, true, true, true, "wss://console.monibuca.com", "", } diff --git a/io.go b/io.go index b16ac1b..375a1e5 100644 --- a/io.go +++ b/io.go @@ -59,14 +59,7 @@ func (i *IO[C, S]) SetParentCtx(parent context.Context) { } func (i *IO[C, S]) OnEvent(event any) { - switch v := event.(type) { - case *Stream: - i.Stream = v - i.StartTime = time.Now() - i.Logger = v.With(zap.String("type", i.Type)) - if i.ID != "" { - i.Logger = i.Logger.With(zap.String("ID", i.ID)) - } + switch event.(type) { case SEclose, SEKick: if i.Closer != nil { i.Closer.Close() @@ -76,11 +69,9 @@ func (i *IO[C, S]) OnEvent(event any) { } } } -func (io *IO[C, S]) getID() string { - return io.ID -} -func (io *IO[C, S]) getType() string { - return io.Type + +func (io *IO[C, S]) getIO() *IO[C, S] { + return io } func (io *IO[C, S]) GetConfig() *C { @@ -91,8 +82,6 @@ type IIO interface { IsClosed() bool OnEvent(any) Stop() - getID() string - getType() string } //Stop 停止订阅或者发布,由订阅者或者发布者调用 diff --git a/main.go b/main.go index d795d2f..0844b7f 100644 --- a/main.go +++ b/main.go @@ -77,6 +77,9 @@ func Run(ctx context.Context, configFile string) (err error) { req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "https://logs-01.loggly.com/inputs/758a662d-f630-40cb-95ed-2502a5e9c872/tag/monibuca/", nil) req.Header.Set("Content-Type", "application/json") content := fmt.Sprintf(`{"uuid":"%s","version":"%s","os":"%s","arch":"%s"`, UUID, Engine.Version, runtime.GOOS, runtime.GOARCH) + if EngineConfig.Secret != "" { + EngineConfig.OnEvent(ctx) + } var c http.Client for { select { diff --git a/plugin.go b/plugin.go index b23e49e..a8579c8 100644 --- a/plugin.go +++ b/plugin.go @@ -170,6 +170,7 @@ func (opt *Plugin) Save() error { } func (opt *Plugin) Publish(streamPath string, pub IPublisher) error { + opt.Info("publish", zap.String("path", streamPath)) conf, ok := opt.Config.(config.PublishConfig) if !ok { conf = EngineConfig @@ -178,6 +179,7 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) error { } func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { + opt.Info("subscribe", zap.String("path", streamPath)) conf, ok := opt.Config.(config.SubscribeConfig) if !ok { conf = EngineConfig @@ -185,9 +187,17 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { return sub.receive(streamPath, sub, conf.GetSubscribeConfig()) } +func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber) (err error) { + if err = opt.Subscribe(streamPath, sub); err == nil { + sub.PlayBlock(sub) + } + return +} + var NoPullConfigErr = errors.New("no pull config") func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool) (err error) { + opt.Info("pull", zap.String("path", streamPath), zap.String("url", url)) conf, ok := opt.Config.(config.PullConfig) if !ok { return NoPullConfigErr @@ -231,8 +241,11 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool } return } + var NoPushConfigErr = errors.New("no push config") + func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error) { + opt.Info("push", zap.String("path", streamPath), zap.String("url", url)) conf, ok := opt.Config.(config.PushConfig) if !ok { return NoPushConfigErr diff --git a/publisher.go b/publisher.go index f4404e4..6c4937e 100644 --- a/publisher.go +++ b/publisher.go @@ -9,6 +9,7 @@ type IPublisher interface { IIO GetConfig() *config.Publish receive(string, IPublisher, *config.Publish) error + getIO() *IO[config.Publish, IPublisher] } type Publisher struct { @@ -18,10 +19,10 @@ type Publisher struct { } func (p *Publisher) OnEvent(event any) { - switch v := event.(type) { - case *Stream: - p.AudioTrack = v.NewAudioTrack() - p.VideoTrack = v.NewVideoTrack() + switch event.(type) { + case IPublisher: + p.AudioTrack = p.Stream.NewAudioTrack() + p.VideoTrack = p.Stream.NewVideoTrack() } p.IO.OnEvent(event) } diff --git a/stream.go b/stream.go index a0058c6..75cfe8d 100644 --- a/stream.go +++ b/stream.go @@ -247,7 +247,7 @@ func (s *Stream) run() { for i, sub := range s.Subscribers { if sub.IsClosed() { s.Subscribers = append(s.Subscribers[:(i-deletes)], s.Subscribers[i-deletes+1:]...) - s.Info("suber -1", zap.String("id", sub.getID()), zap.String("type", sub.getType()), zap.Int("remains", len(s.Subscribers))) + s.Info("suber -1", zap.String("id", sub.getIO().ID), zap.String("type", sub.getIO().Type), zap.Int("remains", len(s.Subscribers))) if s.Publisher != nil { s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量 } @@ -271,7 +271,14 @@ func (s *Stream) run() { case *util.Promise[IPublisher, struct{}]: s.Publisher = v.Value if s.action(ACTION_PUBLISH) { - s.Publisher.OnEvent(s) // 通知Publisher已成功进入Stream + io := v.Value.getIO() + io.Stream = s + io.StartTime = time.Now() + io.Logger = s.With(zap.String("type", io.Type)) + if io.ID != "" { + io.Logger = io.Logger.With(zap.String("ID", io.ID)) + } + v.Value.OnEvent(v.Value) // 发出成功发布事件 v.Resolve(util.Null) for _, p := range waitP { p.Resolve(util.Null) @@ -286,13 +293,19 @@ func (s *Stream) run() { v.Reject(StreamIsClosedErr) } suber := v.Value + io := suber.getIO() s.Subscribers = append(s.Subscribers, suber) - sbConfig := suber.GetConfig() + sbConfig := io.Config if wt := sbConfig.WaitTimeout.Duration(); wt > s.WaitTimeout { s.WaitTimeout = wt } - suber.OnEvent(s) // 通知Subscriber已成功进入Stream - s.Info("suber +1", zap.String("id", suber.getID()), zap.String("type", suber.getType()), zap.Int("remains", len(s.Subscribers))) + io.Stream = s + io.StartTime = time.Now() + io.Logger = s.With(zap.String("type", io.Type)) + if io.ID != "" { + io.Logger = io.Logger.With(zap.String("ID", io.ID)) + } + s.Info("suber +1", zap.String("id", io.ID), zap.String("type", io.Type), zap.Int("remains", len(s.Subscribers))) if s.Publisher != nil { s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量 for _, t := range s.Tracks { @@ -311,6 +324,7 @@ func (s *Stream) run() { } else { waitP = append(waitP, v) } + suber.OnEvent(suber) // 订阅成功事件 v.Resolve(util.Null) if len(s.Subscribers) == 1 { s.action(ACTION_FIRSTENTER) diff --git a/subscriber.go b/subscriber.go index 383c9ae..43fb653 100644 --- a/subscriber.go +++ b/subscriber.go @@ -57,6 +57,7 @@ func (a VideoDeConf) GetAVCC() net.Buffers { type ISubscriber interface { IIO receive(string, ISubscriber, *config.Subscribe) error + getIO() *IO[config.Subscribe, ISubscriber] GetConfig() *config.Subscribe IsPlaying() bool Play(ISubscriber) func() error