mirror of
https://github.com/Monibuca/plugin-hook.git
synced 2025-12-24 10:40:51 +08:00
171 lines
4.7 KiB
Go
171 lines
4.7 KiB
Go
package hook // import "m7s.live/plugin/hook/v4"
|
||
|
||
import (
|
||
"bytes"
|
||
"encoding/json"
|
||
"fmt"
|
||
"net/http"
|
||
"strings"
|
||
"time"
|
||
|
||
. "m7s.live/engine/v4"
|
||
"m7s.live/engine/v4/log"
|
||
"m7s.live/engine/v4/util"
|
||
)
|
||
|
||
type HookData struct {
|
||
*Stream `json:"stream"`
|
||
Extra map[string]any `json:"extra"`
|
||
Event string `json:"event"` //事件名称
|
||
Time int64 `json:"time"` //调用时间
|
||
}
|
||
|
||
// HookAddr describes one remote endpoint that hook events are reported to.
type HookAddr struct {
	// URL is the remote address the event is reported to.
	URL string `desc:"上报远程地址"`
	// Header holds additional HTTP headers for this endpoint.
	Header map[string]string `desc:"附加 HTTP 头"`
	// Method is the HTTP request method; POST is the documented default.
	Method string `desc:"HTTP请求方法,默认为POST"`
}
|
||
|
||
type HookConfig struct {
|
||
KeepAlive time.Duration `desc:"保活间隔,0则不发送保活事件"`
|
||
RetryTimes int `default:"3" desc:"重试次数"`
|
||
BaseURL string `desc:"URL前缀,如果URL不是http开头,则会自动加上前缀"`
|
||
Header map[string]string `desc:"附加 HTTP 头" key:"名称" value:"值"`
|
||
RequestList map[string]any `desc:"请求列表是需要单独配置 Method 和 header 时用到的,key为事件名称(*代表全部),value为请求信息,可以是字符串也可以是键值对(可以包含 url、header、method"`
|
||
Extra map[string]any `desc:"附加的额外参数"`
|
||
requestList map[string]*HookAddr
|
||
}
|
||
|
||
// HookPlugin holds the value returned by InstallPlugin for a zero-valued
// HookConfig. NOTE(review): presumably this is the engine's registration
// handle for the plugin — confirm against the engine package.
var HookPlugin = InstallPlugin(new(HookConfig))
|
||
|
||
func (h *HookConfig) OnEvent(event any) {
|
||
switch v := event.(type) {
|
||
case FirstConfig:
|
||
h.requestList = make(map[string]*HookAddr)
|
||
for k, v := range h.RequestList {
|
||
switch u := v.(type) {
|
||
case string:
|
||
if !strings.HasSuffix(u, "http") && h.BaseURL != "" {
|
||
u = h.BaseURL + u
|
||
}
|
||
h.requestList[k] = &HookAddr{URL: u, Method: "POST", Header: h.Header}
|
||
case map[string]string:
|
||
var addr = HookAddr{
|
||
Method: u["method"],
|
||
Header: h.Header,
|
||
URL: u["url"],
|
||
}
|
||
if !strings.HasSuffix(addr.URL, "http") && h.BaseURL != "" {
|
||
addr.URL = h.BaseURL + addr.URL
|
||
}
|
||
h.requestList[k] = &addr
|
||
case map[string]any:
|
||
var addr HookAddr
|
||
for k, v := range u {
|
||
switch k {
|
||
case "url":
|
||
addr.URL = v.(string)
|
||
if !strings.HasSuffix(addr.URL, "http") && h.BaseURL != "" {
|
||
addr.URL = h.BaseURL + addr.URL
|
||
}
|
||
case "method":
|
||
addr.Method = v.(string)
|
||
case "header":
|
||
addr.Header = v.(map[string]string)
|
||
}
|
||
}
|
||
if addr.Header == nil {
|
||
addr.Header = h.Header
|
||
} else {
|
||
for k, v := range h.Header {
|
||
addr.Header[k] = v
|
||
}
|
||
}
|
||
h.requestList[k] = &addr
|
||
}
|
||
}
|
||
|
||
h.request(&HookData{Event: "startup"})
|
||
var heartreq []*HookAddr
|
||
if v, ok := h.requestList["keepalive"]; ok {
|
||
heartreq = append(heartreq, v)
|
||
}
|
||
if v, ok := h.requestList["*"]; ok {
|
||
heartreq = append(heartreq, v)
|
||
}
|
||
if h.KeepAlive > 0 && len(heartreq) > 0 {
|
||
go func(addrs ...*HookAddr) {
|
||
for {
|
||
for _, addr := range addrs {
|
||
h.doRequest(addr, &HookData{
|
||
Event: "keepalive",
|
||
})
|
||
}
|
||
time.Sleep(h.KeepAlive)
|
||
}
|
||
}(heartreq...)
|
||
}
|
||
case SEpublish:
|
||
h.request(&HookData{Event: "publish", Stream: v.Target})
|
||
case SEclose:
|
||
h.request(&HookData{Event: "streamClose", Stream: v.Target})
|
||
case ISubscriber:
|
||
h.request(&HookData{Event: "subscribe", Stream: v.GetSubscriber().Stream, Extra: map[string]any{"subscriber": v}})
|
||
case UnsubscribeEvent:
|
||
h.request(&HookData{Event: "unsubscribe", Stream: v.Target.GetSubscriber().Stream, Extra: map[string]any{"subscriber": v.Target}})
|
||
}
|
||
}
|
||
func (h *HookConfig) request(hookData *HookData) {
|
||
if req, ok := h.requestList[hookData.Event]; ok {
|
||
go h.doRequest(req, hookData)
|
||
}
|
||
if req, ok := h.requestList["*"]; ok {
|
||
go h.doRequest(req, hookData)
|
||
}
|
||
}
|
||
func (h *HookConfig) doRequest(req *HookAddr, data *HookData) (err error) {
|
||
data.Time = time.Now().Unix()
|
||
if data.Extra == nil {
|
||
data.Extra = h.Extra
|
||
} else {
|
||
for k, v := range h.Extra {
|
||
data.Extra[k] = v
|
||
}
|
||
}
|
||
param, _ := json.Marshal(data)
|
||
var resp *http.Response
|
||
// Execute the request
|
||
return util.Retry(h.RetryTimes, time.Second, func() error {
|
||
if req.Method == "GET" {
|
||
r, _ := http.NewRequest(req.Method, req.URL, nil)
|
||
r.URL.Query().Set("event", data.Event)
|
||
if data.Stream != nil {
|
||
r.URL.Query().Set("stream", data.Stream.Path)
|
||
} else {
|
||
}
|
||
resp, err = http.DefaultClient.Do(r)
|
||
} else {
|
||
resp, err = http.DefaultClient.Post(req.URL, "application/json", bytes.NewBuffer(param))
|
||
}
|
||
if err != nil {
|
||
// Retry
|
||
log.Warnf("post %s error: %s", req.URL, err.Error())
|
||
return err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
s := resp.StatusCode
|
||
switch {
|
||
case s >= 500:
|
||
// Retry
|
||
return fmt.Errorf("server %s error: %v", req.URL, s)
|
||
case s >= 400:
|
||
// Don't retry, it was client's fault
|
||
return util.RetryStopErr(fmt.Errorf("client %s error: %v", req.URL, s))
|
||
default:
|
||
// Happy
|
||
return nil
|
||
}
|
||
})
|
||
}
|