first commit

This commit is contained in:
dexter
2022-05-10 22:00:48 +08:00
parent abe0904343
commit d4f76fd1c4
3 changed files with 279 additions and 1 deletions

125
README.md
View File

@@ -1,2 +1,125 @@
# plugin-hook
# Hook 插件
WebHook For Monibuca
## 插件地址
https://github.com/Monibuca/plugin-hook
## 插件引入
```go
import (
_ "m7s.live/plugin/hook/v4"
)
```
## 配置
```yaml
hook:
keepalive: 0 # 定时发送心跳请求单位秒默认0不开启
retrytimes: 3 # 重试次数
baseurl: "" # url前缀
header: {} # 自定义HTTP请求头
urllist: {} # url列表
requestlist: {} # 请求列表
extra: {} # 额外自定义传输数据
```
其中 urllist 是简单的地址列表,例如:
```yaml
hook:
urllist:
"*": "http://www.example.com" # 任意时间均会发送请求
startup: "http://www.example.com" # m7s启动时发送请求
publish: "http://www.example.com/publish" # 发布时发送请求
subscribe: "http://www.example.com/subscribe" # 订阅时发送请求
streamClose: "http://www.example.com/streamClose" # 流关闭时发送请求
keepalive: "http://www.example.com/keepalive" # 心跳时发送请求
```
requestlist 是需要单独配置 Method 和 header 时用到的
```yaml
hook:
requestlist:
"*":
method: GET
header:
referer: http://www.example.com
url: "http://www.example.com"
```
## 发送请求数据
默认使用 POST 发送一个 json 数据:
```json
{
"stream": {
"StartTime": "0001-01-01T00:00:00Z",
"WaitTimeout": 10000000000,
"PublishTimeout": 10000000000,
"WaitCloseTimeout": 0,
"Path": "live/test",
"Publisher": {
"ID": "",
"Type": "RTMPReceiver",
"StartTime": "2022-05-03T13:00:22.5353264+08:00",
"Args": {},
"StreamID": 1
},
"State": 1,
"Subscribers": [
{
"ID": "",
"Type": "RTSPSubscriber",
"StartTime": "2022-05-03T13:00:23.8753554+08:00",
"Args": {}
}
],
"Tracks": {
"aac": {
"Name": "aac",
"BPS": 72480,
"SampleRate": 44100,
"SampleSize": 16,
"CodecID": 10,
"Channels": 2,
"AVCCHead": "rwE=",
"Profile": 2
},
"h264": {
"Name": "h264",
"BPS": 2226142,
"SampleRate": 90000,
"SampleSize": 0,
"CodecID": 7,
"SPSInfo": {
"ProfileIdc": 66,
"LevelIdc": 31,
"MbWidth": 80,
"MbHeight": 45,
"CropLeft": 0,
"CropRight": 0,
"CropTop": 0,
"CropBottom": 0,
"Width": 1280,
"Height": 720
},
"GOP": 27
}
},
"AppName": "live",
"StreamName": "test"
},
"extra": {},
"event": "publish",
"time": 1257894000
}
```
如果指定为 GET 方法,则不会发送 json
而是将 event 和 streamPath 加在 url 后面

3
go.mod Normal file
View File

@@ -0,0 +1,3 @@
module m7s.live/plugin/hook/v4
go 1.18

152
main.go Normal file
View File

@@ -0,0 +1,152 @@
package hook // import "m7s.live/plugin/hook/v4"
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
)
type HookData struct {
*Stream `json:"stream"`
Extra map[string]interface{} `json:"extra"`
Event string `json:"event"` //事件名称
Time int64 `json:"time"` //调用时间
}
type HookAddr struct {
URL string
Header map[string]string
Method string
}
type HookConfig struct {
KeepAlive int
RetryTimes int
BaseURL string
Header map[string]string
URLList map[string]string
RequestList map[string]*HookAddr
Extra map[string]interface{}
}
var plugin = InstallPlugin(&HookConfig{
RetryTimes: 3,
})
func (h *HookConfig) OnEvent(event any) {
switch v := event.(type) {
case FirstConfig:
if h.BaseURL != "" {
for k, u := range h.URLList {
if !strings.HasSuffix(u, "http") {
h.URLList[k] = h.BaseURL + u
}
}
for _, u := range h.RequestList {
if !strings.HasSuffix(u.URL, "http") {
u.URL = h.BaseURL + u.URL
}
if u.Header != nil {
if h.Header != nil {
for k, v := range h.Header {
u.Header[k] = v
}
}
} else {
u.Header = h.Header
}
}
}
for k, u := range h.URLList {
if _, ok := h.RequestList[k]; !ok {
h.RequestList[k] = &HookAddr{URL: u, Method: "POST", Header: h.Header}
}
}
h.request("startup", nil)
var heartreq []*HookAddr
if v, ok := h.RequestList["keepalive"]; ok {
heartreq = append(heartreq, v)
}
if v, ok := h.RequestList["*"]; ok {
heartreq = append(heartreq, v)
}
if h.KeepAlive > 0 && len(heartreq) > 0 {
go func(addrs ...*HookAddr) {
for {
for _, addr := range addrs {
h.doRequest(addr, &HookData{
Event: "keepalive",
})
}
time.Sleep(time.Second * time.Duration(h.KeepAlive))
}
}(heartreq...)
}
case SEpublish:
h.request("publish", v.Stream)
case SEclose:
h.request("streamClose", v.Stream)
case ISubscriber:
h.request("subscribe", v.GetIO().Stream)
}
}
func (h *HookConfig) request(event string, stream *Stream) {
if req, ok := h.RequestList[event]; ok {
go h.doRequest(req, &HookData{
Stream: stream,
Event: event,
})
}
if req, ok := h.RequestList["*"]; ok {
go h.doRequest(req, &HookData{
Stream: stream,
Event: event,
})
}
}
func (h *HookConfig) doRequest(req *HookAddr, data *HookData) (err error) {
data.Time = time.Now().Unix()
data.Extra = h.Extra
param, _ := json.Marshal(data)
var resp *http.Response
// Execute the request
return util.Retry(h.RetryTimes, time.Second, func() error {
if req.Method == "GET" {
r, _ := http.NewRequest(req.Method, req.URL, nil)
r.URL.Query().Set("event", data.Event)
if data.Stream != nil {
r.URL.Query().Set("stream", data.Stream.Path)
} else {
}
resp, err = http.DefaultClient.Do(r)
} else {
resp, err = http.DefaultClient.Post(req.URL, "application/json", bytes.NewBuffer(param))
}
if err != nil {
// Retry
log.Warnf("post %s error: %s", req.URL, err.Error())
return err
}
defer resp.Body.Close()
s := resp.StatusCode
switch {
case s >= 500:
// Retry
return fmt.Errorf("server %s error: %v", req.URL, s)
case s >= 400:
// Don't retry, it was client's fault
return util.RetryStopErr(fmt.Errorf("client %s error: %v", req.URL, s))
default:
// Happy
return nil
}
})
}