diff --git a/config/config.go b/config/config.go index 7d90c29..9a040a3 100644 --- a/config/config.go +++ b/config/config.go @@ -67,11 +67,16 @@ func (config Config) Unmarshal(s any) { l := value.Len() s := reflect.MakeSlice(fv.Type(), l, value.Cap()) for i := 0; i < l; i++ { - fv := value.Field(i) + fv := value.Index(i) if fv.Type() == reflect.TypeOf(config) { - fv.FieldByName("Unmarshal").Call([]reflect.Value{s.Field(i)}) + fv.FieldByName("Unmarshal").Call([]reflect.Value{fv}) } else { - s.Field(i).Set(fv) + item := s.Index(i) + if fv.Kind() == reflect.Interface { + item.Set(reflect.ValueOf(fv.Interface()).Convert(item.Type())) + } else { + item.Set(fv) + } } } fv.Set(s) diff --git a/config/types.go b/config/types.go old mode 100644 new mode 100755 index 28b5eb0..46b2166 --- a/config/types.go +++ b/config/types.go @@ -31,9 +31,9 @@ type PushConfig interface { type Publish struct { PubAudio bool PubVideo bool - KickExist bool // 是否踢掉已经存在的发布者 - PublishTimeout int // 发布无数据超时 - WaitCloseTimeout int // 延迟自动关闭(无订阅时) + KickExist bool // 是否踢掉已经存在的发布者 + PublishTimeout int // 发布无数据超时 + WaitCloseTimeout int // 延迟自动关闭(无订阅时) } func (c *Publish) GetPublishConfig() *Publish { @@ -43,8 +43,8 @@ func (c *Publish) GetPublishConfig() *Publish { type Subscribe struct { SubAudio bool SubVideo bool - IFrameOnly bool // 只要关键帧 - WaitTimeout int // 等待流超时 + IFrameOnly bool // 只要关键帧 + WaitTimeout int // 等待流超时 } func (c *Subscribe) GetSubscribeConfig() *Subscribe { @@ -89,12 +89,13 @@ type Engine struct { Publish Subscribe HTTP - RTPReorder bool - EnableAVCC bool //启用AVCC格式,rtmp协议使用 - EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用 - EnableFLV bool //开启FLV格式,hdl协议使用 - ConsoleURL string //远程控制台地址 - Secret string //远程控制台密钥 + RTPReorder bool + EnableAVCC bool //启用AVCC格式,rtmp协议使用 + EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用 + EnableFLV bool //开启FLV格式,hdl协议使用 + ConsoleURL string //远程控制台地址 + Secret string //远程控制台密钥 + HTTPCallback []string } type myResponseWriter struct { io.Writer @@ -153,4 +154,5 @@ var Global = &Engine{ Subscribe{true, true, false, 10}, HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux}, false, true, true, true, "wss://console.monibuca.com:8080", "", + []string{}, } diff --git a/http_callback.go b/http_callback.go new file mode 100755 index 0000000..dfe1bb8 --- /dev/null +++ b/http_callback.go @@ -0,0 +1,88 @@ +package engine + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "time" + + "m7s.live/engine/v4/config" + "m7s.live/engine/v4/log" + "m7s.live/engine/v4/util" +) + +const retryTimes = 3 + +type HttpCallbackData struct { + StreamName string `json:"stream_name"` //媒体流名称 + AppName string `json:"app_name"` + Event string `json:"event"` //事件名称 + Schema string `json:"schema"` //媒体流类型 + Time int64 `json:"time"` //调用时间 +} + +func doRequest(host string, data any) error { + param, _ := json.Marshal(data) + + // Execute the request + return util.Retry(retryTimes, time.Second, func() error { + resp, err := http.DefaultClient.Post(host, "application/json", bytes.NewBuffer(param)) + if err != nil { + // Retry + log.Warnf("post %s error: %s", host, err.Error()) + return err + } + defer resp.Body.Close() + + s := resp.StatusCode + switch { + case s >= 500: + // Retry + return fmt.Errorf("server %s error: %v", host, s) + case s >= 400: + // Don't retry, it was client's fault + return util.RetryStopErr(fmt.Errorf("client %s error: %v", host, s)) + default: + // Happy + return nil + } + }) +} + +func HttpCallbackEvent(event any) { + data := HttpCallbackData{} + var streamIo any + for _, endpoint := range EngineConfig.HTTPCallback { + switch e := event.(type) { + case SEclose: + data.Event = "close" + streamIo = e.Stream.Publisher.GetIO() + + case SEpublish: + data.Event = "publish" + streamIo = e.Stream.Publisher.GetIO() + + case ISubscriber: + data.Event = "subscribe" + streamIo = e.GetIO() + default: + } + if streamIo == nil { + return + } + switch s := streamIo.(type) { + case *IO[config.Publish, IPublisher]: + data.StreamName = s.Stream.StreamName + data.AppName = s.Stream.AppName + data.Schema = s.Type + case *IO[config.Subscribe, ISubscriber]: + data.StreamName = s.Stream.StreamName + data.AppName = s.Stream.AppName + data.Schema = s.Type + } + + data.Time = time.Now().Unix() + go doRequest(endpoint, data) + } +} diff --git a/main.go b/main.go old mode 100644 new mode 100755 index 43d5a2b..dd8e492 --- a/main.go +++ b/main.go @@ -88,6 +88,7 @@ func Run(ctx context.Context, configFile string) (err error) { for _, plugin := range Plugins { plugin.Config.OnEvent(event) } + HttpCallbackEvent(event) case <-ctx.Done(): return case <-reportTimer.C: diff --git a/util/retry.go b/util/retry.go new file mode 100755 index 0000000..ff81308 --- /dev/null +++ b/util/retry.go @@ -0,0 +1,39 @@ +package util + +import ( + "math/rand" + "time" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +func Retry(attempts int, sleep time.Duration, f func() error) error { + if err := f(); err != nil { + if s, ok := err.(retryStop); ok { + // Return the original error for later checking + return s.error + } + + if attempts--; attempts > 0 { + // Add some randomness to prevent creating a Thundering Herd + jitter := time.Duration(rand.Int63n(int64(sleep))) + sleep = sleep + jitter/2 + + time.Sleep(sleep) + return Retry(attempts, 2*sleep, f) + } + return err + } + + return nil +} + +type retryStop struct { + error +} + +func RetryStopErr(err error) retryStop { + return retryStop{err} +}