实现远程控制台的通讯机制

This commit is contained in:
dexter
2022-03-12 22:35:17 +08:00
parent 204c37291c
commit c40665125f
7 changed files with 110 additions and 30 deletions

View File

@@ -1,6 +1,15 @@
package config
import "net/http"
import (
"context"
"io"
"net/http"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"m7s.live/engine/v4/log"
)
type PublishConfig interface {
GetPublishConfig() *Publish
@@ -62,6 +71,7 @@ type Push struct {
RePush int // 断开后自动重推,0 表示不自动重推,-1 表示无限重推高于0 的数代表最大重推次数
PushList map[string]string // 自动推流列表
}
func (p *Push) GetPushConfig() *Push {
return p
}
@@ -78,16 +88,65 @@ type Engine struct {
Subscribe
HTTP
RTPReorder bool
EnableAVCC bool //启用AVCC格式rtmp协议使用
EnableRTP bool //启用RTP格式rtsp、gb18181等协议使用
EnableFLV bool //开启FLV格式hdl协议使用
EnableAVCC bool //启用AVCC格式rtmp协议使用
EnableRTP bool //启用RTP格式rtsp、gb18181等协议使用
EnableFLV bool //开启FLV格式hdl协议使用
ConsoleURL string //远程控制台地址
Secret string //远程控制台密钥
}
type myResponseWriter struct {
io.Writer
}
func (w *myResponseWriter) Write(b []byte) (int, error) {
return len(b), wsutil.WriteClientMessage(w, ws.OpBinary, b)
}
func (w *myResponseWriter) Header() http.Header {
return make(http.Header)
}
func (w *myResponseWriter) WriteHeader(statusCode int) {
}
func (cfg *Engine) OnEvent(event any) {
switch v := event.(type) {
case context.Context:
go func() {
for {
conn, _, _, err := ws.Dial(v, cfg.ConsoleURL)
wr := &myResponseWriter{conn}
if err != nil {
log.Error("connect to console server error:", err)
time.Sleep(time.Second * 5)
continue
}
err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(cfg.Secret))
if err != nil {
time.Sleep(time.Second * 5)
continue
}
for {
msg, _, err := wsutil.ReadServerData(conn)
if err != nil {
log.Error("read console server error:", err)
break
} else {
req, err := http.NewRequest("GET", string(msg), nil)
if err != nil {
log.Error("receive console request :", msg, err)
break
}
h, _ := cfg.mux.Handler(req)
h.ServeHTTP(wr, req)
}
}
}
}()
}
}
var Global = &Engine{
Publish{true, true, false, 10, 0},
Subscribe{true, true, false, 10},
HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux},
false, true, true, true,
false, true, true, true, "wss://console.monibuca.com", "",
}

19
io.go
View File

@@ -59,14 +59,7 @@ func (i *IO[C, S]) SetParentCtx(parent context.Context) {
}
func (i *IO[C, S]) OnEvent(event any) {
switch v := event.(type) {
case *Stream:
i.Stream = v
i.StartTime = time.Now()
i.Logger = v.With(zap.String("type", i.Type))
if i.ID != "" {
i.Logger = i.Logger.With(zap.String("ID", i.ID))
}
switch event.(type) {
case SEclose, SEKick:
if i.Closer != nil {
i.Closer.Close()
@@ -76,11 +69,9 @@ func (i *IO[C, S]) OnEvent(event any) {
}
}
}
func (io *IO[C, S]) getID() string {
return io.ID
}
func (io *IO[C, S]) getType() string {
return io.Type
func (io *IO[C, S]) getIO() *IO[C, S] {
return io
}
func (io *IO[C, S]) GetConfig() *C {
@@ -91,8 +82,6 @@ type IIO interface {
IsClosed() bool
OnEvent(any)
Stop()
getID() string
getType() string
}
//Stop 停止订阅或者发布,由订阅者或者发布者调用

View File

@@ -77,6 +77,9 @@ func Run(ctx context.Context, configFile string) (err error) {
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "https://logs-01.loggly.com/inputs/758a662d-f630-40cb-95ed-2502a5e9c872/tag/monibuca/", nil)
req.Header.Set("Content-Type", "application/json")
content := fmt.Sprintf(`{"uuid":"%s","version":"%s","os":"%s","arch":"%s"`, UUID, Engine.Version, runtime.GOOS, runtime.GOARCH)
if EngineConfig.Secret != "" {
EngineConfig.OnEvent(ctx)
}
var c http.Client
for {
select {

View File

@@ -170,6 +170,7 @@ func (opt *Plugin) Save() error {
}
func (opt *Plugin) Publish(streamPath string, pub IPublisher) error {
opt.Info("publish", zap.String("path", streamPath))
conf, ok := opt.Config.(config.PublishConfig)
if !ok {
conf = EngineConfig
@@ -178,6 +179,7 @@ func (opt *Plugin) Publish(streamPath string, pub IPublisher) error {
}
func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error {
opt.Info("subscribe", zap.String("path", streamPath))
conf, ok := opt.Config.(config.SubscribeConfig)
if !ok {
conf = EngineConfig
@@ -185,9 +187,17 @@ func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error {
return sub.receive(streamPath, sub, conf.GetSubscribeConfig())
}
func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber) (err error) {
if err = opt.Subscribe(streamPath, sub); err == nil {
sub.PlayBlock(sub)
}
return
}
var NoPullConfigErr = errors.New("no pull config")
func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool) (err error) {
opt.Info("pull", zap.String("path", streamPath), zap.String("url", url))
conf, ok := opt.Config.(config.PullConfig)
if !ok {
return NoPullConfigErr
@@ -231,8 +241,11 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool
}
return
}
var NoPushConfigErr = errors.New("no push config")
func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error) {
opt.Info("push", zap.String("path", streamPath), zap.String("url", url))
conf, ok := opt.Config.(config.PushConfig)
if !ok {
return NoPushConfigErr

View File

@@ -9,6 +9,7 @@ type IPublisher interface {
IIO
GetConfig() *config.Publish
receive(string, IPublisher, *config.Publish) error
getIO() *IO[config.Publish, IPublisher]
}
type Publisher struct {
@@ -18,10 +19,10 @@ type Publisher struct {
}
func (p *Publisher) OnEvent(event any) {
switch v := event.(type) {
case *Stream:
p.AudioTrack = v.NewAudioTrack()
p.VideoTrack = v.NewVideoTrack()
switch event.(type) {
case IPublisher:
p.AudioTrack = p.Stream.NewAudioTrack()
p.VideoTrack = p.Stream.NewVideoTrack()
}
p.IO.OnEvent(event)
}

View File

@@ -247,7 +247,7 @@ func (s *Stream) run() {
for i, sub := range s.Subscribers {
if sub.IsClosed() {
s.Subscribers = append(s.Subscribers[:(i-deletes)], s.Subscribers[i-deletes+1:]...)
s.Info("suber -1", zap.String("id", sub.getID()), zap.String("type", sub.getType()), zap.Int("remains", len(s.Subscribers)))
s.Info("suber -1", zap.String("id", sub.getIO().ID), zap.String("type", sub.getIO().Type), zap.Int("remains", len(s.Subscribers)))
if s.Publisher != nil {
s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开在回调中可以去获取订阅者数量
}
@@ -271,7 +271,14 @@ func (s *Stream) run() {
case *util.Promise[IPublisher, struct{}]:
s.Publisher = v.Value
if s.action(ACTION_PUBLISH) {
s.Publisher.OnEvent(s) // 通知Publisher已成功进入Stream
io := v.Value.getIO()
io.Stream = s
io.StartTime = time.Now()
io.Logger = s.With(zap.String("type", io.Type))
if io.ID != "" {
io.Logger = io.Logger.With(zap.String("ID", io.ID))
}
v.Value.OnEvent(v.Value) // 发出成功发布事件
v.Resolve(util.Null)
for _, p := range waitP {
p.Resolve(util.Null)
@@ -286,13 +293,19 @@ func (s *Stream) run() {
v.Reject(StreamIsClosedErr)
}
suber := v.Value
io := suber.getIO()
s.Subscribers = append(s.Subscribers, suber)
sbConfig := suber.GetConfig()
sbConfig := io.Config
if wt := sbConfig.WaitTimeout.Duration(); wt > s.WaitTimeout {
s.WaitTimeout = wt
}
suber.OnEvent(s) // 通知Subscriber已成功进入Stream
s.Info("suber +1", zap.String("id", suber.getID()), zap.String("type", suber.getType()), zap.Int("remains", len(s.Subscribers)))
io.Stream = s
io.StartTime = time.Now()
io.Logger = s.With(zap.String("type", io.Type))
if io.ID != "" {
io.Logger = io.Logger.With(zap.String("ID", io.ID))
}
s.Info("suber +1", zap.String("id", io.ID), zap.String("type", io.Type), zap.Int("remains", len(s.Subscribers)))
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入在回调中可以去获取订阅者数量
for _, t := range s.Tracks {
@@ -311,6 +324,7 @@ func (s *Stream) run() {
} else {
waitP = append(waitP, v)
}
suber.OnEvent(suber) // 订阅成功事件
v.Resolve(util.Null)
if len(s.Subscribers) == 1 {
s.action(ACTION_FIRSTENTER)

View File

@@ -57,6 +57,7 @@ func (a VideoDeConf) GetAVCC() net.Buffers {
type ISubscriber interface {
IIO
receive(string, ISubscriber, *config.Subscribe) error
getIO() *IO[config.Subscribe, ISubscriber]
GetConfig() *config.Subscribe
IsPlaying() bool
Play(ISubscriber) func() error