Files
m7s-v5/pusher.go
2025-05-27 10:43:34 +08:00

87 lines
2.1 KiB
Go

package m7s
import (
"time"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/config"
)
// IPusher is the interface implemented by pushers: a task.ITask that can also
// hand back a *PushJob.
type IPusher interface {
task.ITask
// GetPushJob returns a *PushJob; presumably the job this pusher was
// initialised with (confirm against the implementations).
GetPushJob() *PushJob
}
// PusherFactory is an alias for a function that takes no arguments and
// returns an IPusher.
type PusherFactory = func() IPusher
// PushJob bundles the connection, subscription settings and pusher used to
// push one stream to one remote URL.
type PushJob struct {
// Connection is embedded; Init initialises it with the plugin, stream path,
// target URL and proxy, and GetKey reads its RemoteURL.
Connection
// Subscriber is assigned by Subscribe.
Subscriber *Subscriber
// SubConf is the subscribe configuration chosen in Init and passed to
// SubscribeWithConfig by Subscribe.
SubConf config.Subscribe
// pusher is the IPusher stored by Init and added as a task by Start.
pusher IPusher
}
// GetKey returns the RemoteURL of the embedded Connection, which serves as
// the key of this job (Start looks the job up in the server's Pushs by it).
func (p *PushJob) GetKey() string {
	remoteURL := p.Connection.RemoteURL
	return remoteURL
}
// Init prepares the job for pushing streamPath to conf.URL and registers it
// with the plugin's server.
//
// It initialises the embedded Connection from plugin, streamPath, conf.URL and
// conf.Proxy, stores pusher, and picks the subscribe configuration: subConf
// when non-nil, otherwise the plugin's own subscribe configuration. In both
// cases SubType is overwritten with SubscribeTypePush. The pusher's retry
// settings are taken from conf.MaxRetry and conf.RetryInterval.
//
// When the plugin provides a sender for config.HookOnPushStart or
// config.HookOnPushEnd, a callback is registered through pusher.OnStart or
// pusher.OnDispose respectively that sends the matching push event.
//
// Finally the job is added to plugin.Server.Pushs with a logger carrying the
// push URL and stream path. Init returns p so calls can be chained.
func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf config.Push, subConf *config.Subscribe) *PushJob {
	p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy)
	p.pusher = pusher
	if subConf == nil {
		p.SubConf = plugin.config.Subscribe
	} else {
		p.SubConf = *subConf
	}
	// Whatever the configuration source, this subscription is a push one.
	p.SubConf.SubType = SubscribeTypePush
	p.SetDescriptions(task.Description{
		"plugin":     plugin.Meta.Name,
		"streamPath": streamPath,
		"url":        conf.URL,
		"maxRetry":   conf.MaxRetry,
	})
	pusher.SetRetry(conf.MaxRetry, conf.RetryInterval)
	if sender := plugin.getHookSender(config.HookOnPushStart); sender != nil {
		pusher.OnStart(func() {
			// The event reported must match the hook the sender was looked up
			// for: push start, not pull start.
			webhookData := map[string]interface{}{
				"event":      config.HookOnPushStart,
				"streamPath": streamPath,
				"url":        conf.URL,
				"pluginName": plugin.Meta.Name,
				"timestamp":  time.Now().Unix(),
			}
			sender(config.HookOnPushStart, webhookData)
		})
	}
	if sender := plugin.getHookSender(config.HookOnPushEnd); sender != nil {
		pusher.OnDispose(func() {
			// Likewise report push end rather than pull end.
			webhookData := map[string]interface{}{
				"event":      config.HookOnPushEnd,
				"streamPath": streamPath,
				"reason":     pusher.StopReason().Error(),
				"timestamp":  time.Now().Unix(),
			}
			sender(config.HookOnPushEnd, webhookData)
		})
	}
	plugin.Server.Pushs.Add(p, plugin.Logger.With("pushURL", conf.URL, "streamPath", streamPath))
	return p
}
// Subscribe calls the plugin's SubscribeWithConfig with the pusher task's
// context, the job's stream path and SubConf. The returned subscriber is
// stored in p.Subscriber (even when an error is returned) and the error is
// passed back to the caller.
func (p *PushJob) Subscribe() (err error) {
	ctx := p.pusher.GetTask().Context
	sub, err := p.Plugin.SubscribeWithConfig(ctx, p.StreamPath, p.SubConf)
	p.Subscriber = sub
	return err
}
// Start returns pkg.ErrPushRemoteURLExist when the server's Pushs already
// holds an entry under this job's key (its remote URL). Otherwise it adds the
// pusher as a task of this job, using the job's logger, and returns nil.
func (p *PushJob) Start() (err error) {
	_, duplicate := p.Plugin.Server.Pushs.Get(p.GetKey())
	if duplicate {
		return pkg.ErrPushRemoteURLExist
	}
	p.AddTask(p.pusher, p.Logger)
	return nil
}