refactor: pull and push proxy

This commit is contained in:
langhuihui
2025-04-13 23:05:36 +08:00
parent 122a91a9b8
commit 45479b41b5
24 changed files with 330 additions and 273 deletions

View File

@@ -172,6 +172,7 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) {
} }
} }
r.Delay = r.Track.LastValue.Sequence - r.Value.Sequence r.Delay = r.Track.LastValue.Sequence - r.Value.Sequence
// fmt.Println(r.Delay)
if r.Track.ICodecCtx != nil { if r.Track.ICodecCtx != nil {
r.Log(context.TODO(), task.TraceLevel, r.Track.FourCC().String(), "ts", r.Value.Timestamp, "delay", r.Delay, "bps", r.BPS) r.Log(context.TODO(), task.TraceLevel, r.Track.FourCC().String(), "ts", r.Value.Timestamp, "delay", r.Delay, "bps", r.BPS)
} else { } else {

View File

@@ -43,13 +43,15 @@ type (
Name string Name string
Version string //插件版本 Version string //插件版本
Type reflect.Type Type reflect.Type
defaultYaml DefaultYaml //默认配置 DefaultYaml DefaultYaml //默认配置
ServiceDesc *grpc.ServiceDesc ServiceDesc *grpc.ServiceDesc
RegisterGRPCHandler func(context.Context, *gatewayRuntime.ServeMux, *grpc.ClientConn) error RegisterGRPCHandler func(context.Context, *gatewayRuntime.ServeMux, *grpc.ClientConn) error
Puller Puller NewPuller PullerFactory
Pusher Pusher NewPusher PusherFactory
Recorder Recorder NewRecorder RecorderFactory
Transformer Transformer NewTransformer TransformerFactory
NewPullProxy PullProxyFactory
NewPushProxy PushProxyFactory
OnExit OnExitHandler OnExit OnExitHandler
OnAuthPub AuthPublisher OnAuthPub AuthPublisher
OnAuthSub AuthSubscriber OnAuthSub AuthSubscriber
@@ -88,12 +90,6 @@ type (
IQUICPlugin interface { IQUICPlugin interface {
OnQUICConnect(quic.Connection) task.ITask OnQUICConnect(quic.Connection) task.ITask
} }
IPullProxyPlugin interface {
OnPullProxyAdd(pullProxy *PullProxyConfig) IPullProxy
}
IPushProxyPlugin interface {
OnPushProxyAdd(pushProxy *PushProxy) any
}
) )
var plugins []PluginMeta var plugins []PluginMeta
@@ -121,9 +117,9 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
p.Config.Get(name).ParseGlobal(s.Config.Get(name)) p.Config.Get(name).ParseGlobal(s.Config.Get(name))
} }
} }
if plugin.defaultYaml != "" { if plugin.DefaultYaml != "" {
var defaultConf map[string]any var defaultConf map[string]any
if err := yaml.Unmarshal([]byte(plugin.defaultYaml), &defaultConf); err != nil { if err := yaml.Unmarshal([]byte(plugin.DefaultYaml), &defaultConf); err != nil {
p.Error("parsing default config", "error", err) p.Error("parsing default config", "error", err)
} else { } else {
p.Config.ParseDefaultYaml(defaultConf) p.Config.ParseDefaultYaml(defaultConf)
@@ -170,7 +166,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
} }
} }
} }
if p.DB != nil && p.Meta.Recorder != nil { if p.DB != nil && p.Meta.NewRecorder != nil {
if err = p.DB.AutoMigrate(&RecordStream{}); err != nil { if err = p.DB.AutoMigrate(&RecordStream{}); err != nil {
p.disable(fmt.Sprintf("auto migrate record stream failed %v", err)) p.disable(fmt.Sprintf("auto migrate record stream failed %v", err))
return return
@@ -182,35 +178,40 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
// InstallPlugin 安装插件 // InstallPlugin 安装插件
func InstallPlugin[C iPlugin](options ...any) error { func InstallPlugin[C iPlugin](options ...any) error {
var c *C var meta PluginMeta
t := reflect.TypeOf(c).Elem() for _, option := range options {
meta := PluginMeta{ if m, ok := option.(PluginMeta); ok {
Name: strings.TrimSuffix(t.Name(), "Plugin"), meta = m
Type: t, }
}
var c *C
meta.Type = reflect.TypeOf(c).Elem()
if meta.Name == "" {
meta.Name = strings.TrimSuffix(meta.Type.Name(), "Plugin")
} }
_, pluginFilePath, _, _ := runtime.Caller(1) _, pluginFilePath, _, _ := runtime.Caller(1)
configDir := filepath.Dir(pluginFilePath) configDir := filepath.Dir(pluginFilePath)
if meta.Version == "" {
if _, after, found := strings.Cut(configDir, "@"); found { if _, after, found := strings.Cut(configDir, "@"); found {
meta.Version = after meta.Version = after
} else { } else {
meta.Version = "dev" meta.Version = "dev"
} }
}
for _, option := range options { for _, option := range options {
switch v := option.(type) { switch v := option.(type) {
case OnExitHandler: case OnExitHandler:
meta.OnExit = v meta.OnExit = v
case DefaultYaml: case DefaultYaml:
meta.defaultYaml = v meta.DefaultYaml = v
case Puller: case PullerFactory:
meta.Puller = v meta.NewPuller = v
case Pusher: case PusherFactory:
meta.Pusher = v meta.NewPusher = v
case Recorder: case RecorderFactory:
meta.Recorder = v meta.NewRecorder = v
case Transformer: case TransformerFactory:
meta.Transformer = v meta.NewTransformer = v
case AuthPublisher: case AuthPublisher:
meta.OnAuthPub = v meta.OnAuthPub = v
case AuthSubscriber: case AuthSubscriber:
@@ -449,14 +450,14 @@ func (p *Plugin) SendWebhook(hookType config.HookType, conf config.Webhook, data
// TODO: use alias stream // TODO: use alias stream
func (p *Plugin) OnPublish(pub *Publisher) { func (p *Plugin) OnPublish(pub *Publisher) {
onPublish := p.config.OnPub onPublish := p.config.OnPub
if p.Meta.Pusher != nil { if p.Meta.NewPusher != nil {
for r, pushConf := range onPublish.Push { for r, pushConf := range onPublish.Push {
if pushConf.URL = r.Replace(pub.StreamPath, pushConf.URL); pushConf.URL != "" { if pushConf.URL = r.Replace(pub.StreamPath, pushConf.URL); pushConf.URL != "" {
p.Push(pub.StreamPath, pushConf, nil) p.Push(pub.StreamPath, pushConf, nil)
} }
} }
} }
if p.Meta.Recorder != nil { if p.Meta.NewRecorder != nil {
for r, recConf := range onPublish.Record { for r, recConf := range onPublish.Record {
if recConf.FilePath = r.Replace(pub.StreamPath, recConf.FilePath); recConf.FilePath != "" { if recConf.FilePath = r.Replace(pub.StreamPath, recConf.FilePath); recConf.FilePath != "" {
p.Record(pub, recConf, nil) p.Record(pub, recConf, nil)
@@ -468,7 +469,7 @@ func (p *Plugin) OnPublish(pub *Publisher) {
if owner != nil { if owner != nil {
_, isTransformer = owner.(ITransformer) _, isTransformer = owner.(ITransformer)
} }
if p.Meta.Transformer != nil && !isTransformer { if p.Meta.NewTransformer != nil && !isTransformer {
for r, tranConf := range onPublish.Transform { for r, tranConf := range onPublish.Transform {
if group := r.FindStringSubmatch(pub.StreamPath); group != nil { if group := r.FindStringSubmatch(pub.StreamPath); group != nil {
for j, to := range tranConf.Output { for j, to := range tranConf.Output {
@@ -513,7 +514,7 @@ func (p *Plugin) OnSubscribe(streamPath string, args url.Values) {
// } // }
// } // }
for reg, conf := range p.config.OnSub.Pull { for reg, conf := range p.config.OnSub.Pull {
if p.Meta.Puller != nil && reg.MatchString(streamPath) { if p.Meta.NewPuller != nil && reg.MatchString(streamPath) {
conf.Args = config.HTTPValues(args) conf.Args = config.HTTPValues(args)
conf.URL = reg.Replace(streamPath, conf.URL) conf.URL = reg.Replace(streamPath, conf.URL)
p.handler.Pull(streamPath, conf, nil) p.handler.Pull(streamPath, conf, nil)
@@ -613,7 +614,7 @@ func (p *Plugin) Subscribe(ctx context.Context, streamPath string) (subscriber *
} }
func (p *Plugin) Pull(streamPath string, conf config.Pull, pubConf *config.Publish) { func (p *Plugin) Pull(streamPath string, conf config.Pull, pubConf *config.Publish) {
puller := p.Meta.Puller(conf) puller := p.Meta.NewPuller(conf)
if puller == nil { if puller == nil {
return return
} }
@@ -621,19 +622,19 @@ func (p *Plugin) Pull(streamPath string, conf config.Pull, pubConf *config.Publi
} }
func (p *Plugin) Push(streamPath string, conf config.Push, subConf *config.Subscribe) { func (p *Plugin) Push(streamPath string, conf config.Push, subConf *config.Subscribe) {
pusher := p.Meta.Pusher() pusher := p.Meta.NewPusher()
pusher.GetPushJob().Init(pusher, p, streamPath, conf, subConf) pusher.GetPushJob().Init(pusher, p, streamPath, conf, subConf)
} }
func (p *Plugin) Record(pub *Publisher, conf config.Record, subConf *config.Subscribe) *RecordJob { func (p *Plugin) Record(pub *Publisher, conf config.Record, subConf *config.Subscribe) *RecordJob {
recorder := p.Meta.Recorder(conf) recorder := p.Meta.NewRecorder(conf)
job := recorder.GetRecordJob().Init(recorder, p, pub.StreamPath, conf, subConf) job := recorder.GetRecordJob().Init(recorder, p, pub.StreamPath, conf, subConf)
job.Depend(pub) job.Depend(pub)
return job return job
} }
func (p *Plugin) Transform(pub *Publisher, conf config.Transform) { func (p *Plugin) Transform(pub *Publisher, conf config.Transform) {
transformer := p.Meta.Transformer() transformer := p.Meta.NewTransformer()
job := transformer.GetTransformJob().Init(transformer, p, pub, conf) job := transformer.GetTransformJob().Init(transformer, p, pub, conf)
job.Depend(pub) job.Depend(pub)
} }

View File

@@ -24,7 +24,14 @@ type FLVPlugin struct {
const defaultConfig m7s.DefaultYaml = `publish: const defaultConfig m7s.DefaultYaml = `publish:
speed: 1` speed: 1`
var _ = m7s.InstallPlugin[FLVPlugin](defaultConfig, NewPuller, NewRecorder, pb.RegisterApiServer, &pb.Api_ServiceDesc) var _ = m7s.InstallPlugin[FLVPlugin](m7s.PluginMeta{
DefaultYaml: defaultConfig,
NewPuller: NewPuller,
NewRecorder: NewRecorder,
RegisterGRPCHandler: pb.RegisterApiHandler,
ServiceDesc: &pb.Api_ServiceDesc,
NewPullProxy: m7s.NewHTTPPullPorxy,
})
func (plugin *FLVPlugin) OnInit() (err error) { func (plugin *FLVPlugin) OnInit() (err error) {
_, port, _ := strings.Cut(plugin.GetCommonConf().HTTP.ListenAddr, ":") _, port, _ := strings.Cut(plugin.GetCommonConf().HTTP.ListenAddr, ":")
@@ -96,10 +103,3 @@ func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
err = live.Run() err = live.Run()
} }
func (plugin *FLVPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) m7s.IPullProxy {
d := &m7s.HTTPPullProxy{}
d.PullProxyConfig = pullProxy
d.Plugin = &plugin.Plugin
return d
}

View File

@@ -2,10 +2,12 @@ package plugin_gb28181pro
import ( import (
"log/slog" "log/slog"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
"m7s.live/v5" "m7s.live/v5"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util" "m7s.live/v5/pkg/util"
gb28181 "m7s.live/v5/plugin/gb28181/pkg" gb28181 "m7s.live/v5/plugin/gb28181/pkg"
) )
@@ -41,7 +43,7 @@ func (r *PresetRequest) GetKey() int {
} }
type Channel struct { type Channel struct {
PullProxyTask *m7s.PullProxyTask PullProxyTask *PullProxy // 拉流任务
Device *Device // 所属设备 Device *Device // 所属设备
State atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放/对讲 State atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放/对讲
GpsTime time.Time // gps时间 GpsTime time.Time // gps时间
@@ -59,3 +61,27 @@ func (c *Channel) GetKey() string {
func (c *Channel) GetDeviceID() string { func (c *Channel) GetDeviceID() string {
return c.DeviceID return c.DeviceID
} }
type PullProxy struct {
task.Task
m7s.BasePullProxy
}
func NewPullProxy() m7s.IPullProxy {
return &PullProxy{}
}
func (p *PullProxy) GetKey() uint {
return p.PullProxyConfig.ID
}
func (p *PullProxy) Start() error {
streamPaths := strings.Split(p.GetStreamPath(), "/")
deviceId, channelId := streamPaths[0], streamPaths[1]
if device, ok := p.Plugin.GetHandler().(*GB28181Plugin).devices.Get(deviceId); ok {
if _, ok := device.channels.Get(channelId); ok {
p.ChangeStatus(m7s.PullProxyStatusOnline)
}
}
return nil
}

View File

@@ -59,11 +59,16 @@ type GB28181Plugin struct {
sipPorts []int sipPorts []int
} }
var _ = m7s.InstallPlugin[GB28181Plugin](pb.RegisterApiHandler, &pb.Api_ServiceDesc, func(conf config.Pull) m7s.IPuller { var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
RegisterGRPCHandler: pb.RegisterApiHandler,
ServiceDesc: &pb.Api_ServiceDesc,
NewPuller: func(conf config.Pull) m7s.IPuller {
if util.Exist(conf.URL) { if util.Exist(conf.URL) {
return &gb28181.DumpPuller{} return &gb28181.DumpPuller{}
} }
return new(Dialog) return new(Dialog)
},
NewPullProxy: NewPullProxy,
}) })
func init() { func init() {
@@ -267,7 +272,7 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
conf := absDevice.GetConfig() conf := absDevice.GetConfig()
return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", device.DeviceID, c.DeviceID) return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", device.DeviceID, c.DeviceID)
}); ok { }); ok {
c.PullProxyTask = absDevice.(*m7s.PullProxyTask) c.PullProxyTask = absDevice.(*PullProxy)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline) absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
} }
}) })
@@ -365,13 +370,6 @@ func (gb *GB28181Plugin) checkPlatform() {
} }
} }
func (p *GB28181Plugin) OnPullProxyAdd(conf *m7s.PullProxyConfig) m7s.IPullProxy {
return &m7s.PullProxyTask{
PullProxyConfig: conf,
Plugin: &p.Plugin,
}
}
func (gb *GB28181Plugin) RegisterHandler() map[string]http.HandlerFunc { func (gb *GB28181Plugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{ return map[string]http.HandlerFunc{
"/api/ps/replay/{streamPath...}": gb.api_ps_replay, "/api/ps/replay/{streamPath...}": gb.api_ps_replay,
@@ -831,7 +829,7 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi
conf := absDevice.GetConfig() conf := absDevice.GetConfig()
return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID) return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID)
}); ok { }); ok {
c.PullProxyTask = absDevice.(*m7s.PullProxyTask) c.PullProxyTask = absDevice.(*PullProxy)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline) absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
} }
}) })

View File

@@ -14,7 +14,12 @@ import (
hls "m7s.live/v5/plugin/hls/pkg" hls "m7s.live/v5/plugin/hls/pkg"
) )
var _ = m7s.InstallPlugin[HLSPlugin](hls.NewTransform, hls.NewRecorder, hls.NewPuller) var _ = m7s.InstallPlugin[HLSPlugin](m7s.PluginMeta{
NewTransformer: hls.NewTransform,
NewRecorder: hls.NewRecorder,
NewPuller: hls.NewPuller,
NewPullProxy: m7s.NewHTTPPullPorxy,
})
//go:embed hls.js //go:embed hls.js
var hls_js embed.FS var hls_js embed.FS
@@ -45,13 +50,6 @@ func (p *HLSPlugin) RegisterHandler() map[string]http.HandlerFunc {
} }
} }
func (p *HLSPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) m7s.IPullProxy {
d := &m7s.HTTPPullProxy{}
d.PullProxyConfig = pullProxy
d.Plugin = &p.Plugin
return d
}
func (config *HLSPlugin) vod(w http.ResponseWriter, r *http.Request) { func (config *HLSPlugin) vod(w http.ResponseWriter, r *http.Request) {
recordType := "ts" recordType := "ts"
if r.PathValue("streamPath") == "mp4.m3u8" { if r.PathValue("streamPath") == "mp4.m3u8" {

View File

@@ -64,7 +64,14 @@ const defaultConfig m7s.DefaultYaml = `publish:
speed: 1` speed: 1`
// var exceptionChannel = make(chan *Exception) // var exceptionChannel = make(chan *Exception)
var _ = m7s.InstallPlugin[MP4Plugin](defaultConfig, &pb.Api_ServiceDesc, pb.RegisterApiHandler, pkg.NewPuller, pkg.NewRecorder) var _ = m7s.InstallPlugin[MP4Plugin](m7s.PluginMeta{
DefaultYaml: defaultConfig,
ServiceDesc: &pb.Api_ServiceDesc,
RegisterGRPCHandler: pb.RegisterApiHandler,
NewPuller: pkg.NewPuller,
NewRecorder: pkg.NewRecorder,
NewPullProxy: m7s.NewHTTPPullPorxy,
})
func (p *MP4Plugin) RegisterHandler() map[string]http.HandlerFunc { func (p *MP4Plugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{ return map[string]http.HandlerFunc{

View File

@@ -39,7 +39,7 @@ func (p *PreviewPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for _, streamPath := range pullPlugin.GetPullableList() { for _, streamPath := range pullPlugin.GetPullableList() {
s += fmt.Sprintf("<a href='%s'>%s</a><br>", streamPath, streamPath) s += fmt.Sprintf("<a href='%s'>%s</a><br>", streamPath, streamPath)
} }
} else if plugin.Meta.Puller != nil { } else if plugin.Meta.NewPuller != nil {
s += fmt.Sprintf("<h3>%s</h3>", plugin.Meta.Name) s += fmt.Sprintf("<h3>%s</h3>", plugin.Meta.Name)
for _, streamPath := range slices.Collect(maps.Keys(plugin.GetCommonConf().OnSub.Pull)) { for _, streamPath := range slices.Collect(maps.Keys(plugin.GetCommonConf().OnSub.Pull)) {
s += fmt.Sprintf("<a href='%s'>%s</a><br>", streamPath, streamPath) s += fmt.Sprintf("<a href='%s'>%s</a><br>", streamPath, streamPath)

View File

@@ -21,8 +21,16 @@ type RTMPPlugin struct {
C2 bool C2 bool
} }
var _ = m7s.InstallPlugin[RTMPPlugin](m7s.DefaultYaml(`tcp: var _ = m7s.InstallPlugin[RTMPPlugin](m7s.PluginMeta{
listenaddr: :1935`), &pb.Api_ServiceDesc, pb.RegisterApiHandler, NewPusher, NewPuller) DefaultYaml: `tcp:
listenaddr: :1935`,
ServiceDesc: &pb.Api_ServiceDesc,
RegisterGRPCHandler: pb.RegisterApiHandler,
NewPusher: NewPusher,
NewPuller: NewPuller,
NewPullProxy: NewPullProxy,
NewPushProxy: NewPushProxy,
})
type RTMPServer struct { type RTMPServer struct {
NetConnection NetConnection
@@ -182,21 +190,6 @@ func (task *RTMPServer) Go() (err error) {
return return
} }
func (p *RTMPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) m7s.IPullProxy {
ret := &RTMPPullProxy{}
ret.PullProxyConfig = pullProxy
ret.Plugin = &p.Plugin
return ret
}
func (p *RTMPPlugin) OnPushProxyAdd(pushProxy *m7s.PushProxy) any {
ret := &RTMPPushProxy{}
ret.PushProxy = pushProxy
ret.Plugin = &p.Plugin
ret.Logger = p.With("pushProxy", pushProxy.Name)
return ret
}
func (p *RTMPPlugin) OnInit() (err error) { func (p *RTMPPlugin) OnInit() (err error) {
if tcpAddr := p.GetCommonConf().TCP.ListenAddr; tcpAddr != "" { if tcpAddr := p.GetCommonConf().TCP.ListenAddr; tcpAddr != "" {
_, port, _ := strings.Cut(tcpAddr, ":") _, port, _ := strings.Cut(tcpAddr, ":")

View File

@@ -1,4 +1,4 @@
package plugin_rtmp package rtmp
import ( import (
"fmt" "fmt"
@@ -12,6 +12,10 @@ type RTMPPullProxy struct {
m7s.TCPPullProxy m7s.TCPPullProxy
} }
func NewPullProxy() m7s.IPullProxy {
return &RTMPPullProxy{}
}
func (d *RTMPPullProxy) Start() (err error) { func (d *RTMPPullProxy) Start() (err error) {
d.URL, err = url.Parse(d.PullProxyConfig.URL) d.URL, err = url.Parse(d.PullProxyConfig.URL)
if err != nil { if err != nil {

View File

@@ -1,4 +1,4 @@
package plugin_rtmp package rtmp
import ( import (
"fmt" "fmt"
@@ -8,12 +8,16 @@ import (
"m7s.live/v5" "m7s.live/v5"
) )
func NewPushProxy() m7s.IPushProxy {
return &RTMPPushProxy{}
}
type RTMPPushProxy struct { type RTMPPushProxy struct {
m7s.TCPPushProxy m7s.TCPPushProxy
} }
func (d *RTMPPushProxy) Start() (err error) { func (d *RTMPPushProxy) Start() (err error) {
d.URL, err = url.Parse(d.PushProxy.URL) d.URL, err = url.Parse(d.PushProxyConfig.URL)
if err != nil { if err != nil {
return return
} }

View File

@@ -11,10 +11,14 @@ import (
. "m7s.live/v5/plugin/rtsp/pkg" . "m7s.live/v5/plugin/rtsp/pkg"
) )
const defaultConfig = m7s.DefaultYaml(`tcp: var _ = m7s.InstallPlugin[RTSPPlugin](m7s.PluginMeta{
listenaddr: :554`) DefaultYaml: `tcp:
listenaddr: :554`,
var _ = m7s.InstallPlugin[RTSPPlugin](defaultConfig, NewPuller, NewPusher) NewPuller: NewPuller,
NewPusher: NewPusher,
NewPullProxy: NewPullProxy,
NewPushProxy: NewPushProxy,
})
type RTSPPlugin struct { type RTSPPlugin struct {
m7s.Plugin m7s.Plugin
@@ -26,21 +30,6 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
return ret return ret
} }
func (p *RTSPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) m7s.IPullProxy {
ret := &RTSPPullProxy{}
ret.PullProxyConfig = pullProxy
ret.Plugin = &p.Plugin
return ret
}
func (p *RTSPPlugin) OnPushProxyAdd(pushProxy *m7s.PushProxy) any {
ret := &RTSPPushProxy{}
ret.PushProxy = pushProxy
ret.Plugin = &p.Plugin
ret.Logger = p.With("pushProxy", pushProxy.Name)
return ret
}
func (p *RTSPPlugin) OnInit() (err error) { func (p *RTSPPlugin) OnInit() (err error) {
if tcpAddr := p.GetCommonConf().TCP.ListenAddr; tcpAddr != "" { if tcpAddr := p.GetCommonConf().TCP.ListenAddr; tcpAddr != "" {
_, port, _ := strings.Cut(tcpAddr, ":") _, port, _ := strings.Cut(tcpAddr, ":")

View File

@@ -1,4 +1,4 @@
package plugin_rtsp package rtsp
import ( import (
"fmt" "fmt"
@@ -8,9 +8,12 @@ import (
"m7s.live/v5" "m7s.live/v5"
"m7s.live/v5/pkg/util" "m7s.live/v5/pkg/util"
. "m7s.live/v5/plugin/rtsp/pkg"
) )
func NewPullProxy() m7s.IPullProxy {
return &RTSPPullProxy{}
}
type RTSPPullProxy struct { type RTSPPullProxy struct {
m7s.TCPPullProxy m7s.TCPPullProxy
conn Stream conn Stream
@@ -39,7 +42,7 @@ func (d *RTSPPullProxy) Start() (err error) {
MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12), MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12),
UserAgent: "monibuca" + m7s.Version, UserAgent: "monibuca" + m7s.Version,
} }
d.conn.Logger = d.Plugin.Logger d.conn.Logger = d.Logger
return d.TCPPullProxy.Start() return d.TCPPullProxy.Start()
} }

View File

@@ -1,4 +1,4 @@
package plugin_rtsp package rtsp
import ( import (
"fmt" "fmt"
@@ -8,16 +8,20 @@ import (
"m7s.live/v5" "m7s.live/v5"
"m7s.live/v5/pkg/util" "m7s.live/v5/pkg/util"
. "m7s.live/v5/plugin/rtsp/pkg"
) )
func NewPushProxy() m7s.IPushProxy {
return &RTSPPushProxy{}
}
type RTSPPushProxy struct { type RTSPPushProxy struct {
m7s.TCPPushProxy m7s.TCPPushProxy
conn Stream conn Stream
} }
func (d *RTSPPushProxy) Start() (err error) { func (d *RTSPPushProxy) Start() (err error) {
d.URL, err = url.Parse(d.PushProxy.URL) urlStr := d.PushProxyConfig.URL
d.URL, err = url.Parse(urlStr)
if err != nil { if err != nil {
return return
} }
@@ -39,24 +43,24 @@ func (d *RTSPPushProxy) Start() (err error) {
MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12), MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12),
UserAgent: "monibuca" + m7s.Version, UserAgent: "monibuca" + m7s.Version,
} }
d.conn.Logger = d.Plugin.Logger d.conn.Logger = d.Logger
return d.TCPPushProxy.Start() return d.TCPPushProxy.Start()
} }
func (d *RTSPPushProxy) Tick(any) { func (d *RTSPPushProxy) Tick(any) {
switch d.PushProxy.Status { switch d.Status {
case m7s.PushProxyStatusOffline: case m7s.PushProxyStatusOffline:
err := d.conn.Connect(d.PushProxy.URL) err := d.conn.Connect(d.URL.String())
if err != nil { if err != nil {
return return
} }
d.PushProxy.ChangeStatus(m7s.PushProxyStatusOnline) d.ChangeStatus(m7s.PushProxyStatusOnline)
case m7s.PushProxyStatusOnline, m7s.PushProxyStatusPushing: case m7s.PushProxyStatusOnline, m7s.PushProxyStatusPushing:
t := time.Now() t := time.Now()
err := d.conn.Options() err := d.conn.Options()
d.PushProxy.RTT = time.Since(t) d.RTT = time.Since(t)
if err != nil { if err != nil {
d.PushProxy.ChangeStatus(m7s.PushProxyStatusOffline) d.ChangeStatus(m7s.PushProxyStatusOffline)
} }
} }
} }

View File

@@ -6,7 +6,11 @@ import (
sei "m7s.live/v5/plugin/sei/pkg" sei "m7s.live/v5/plugin/sei/pkg"
) )
var _ = m7s.InstallPlugin[SEIPlugin](sei.NewTransform, pb.RegisterApiServer, &pb.Api_ServiceDesc) var _ = m7s.InstallPlugin[SEIPlugin](m7s.PluginMeta{
NewTransformer: sei.NewTransform,
RegisterGRPCHandler: pb.RegisterApiHandler,
ServiceDesc: &pb.Api_ServiceDesc,
})
type SEIPlugin struct { type SEIPlugin struct {
pb.UnimplementedApiServer pb.UnimplementedApiServer

View File

@@ -16,7 +16,7 @@ import (
"m7s.live/v5/plugin/stress/pb" "m7s.live/v5/plugin/stress/pb"
) )
func (r *StressPlugin) pull(count int, format, url string, puller m7s.Puller) (err error) { func (r *StressPlugin) pull(count int, format, url string, puller m7s.PullerFactory) (err error) {
if i := r.pullers.Length; count > i { if i := r.pullers.Length; count > i {
for j := i; j < count; j++ { for j := i; j < count; j++ {
conf := config.Pull{URL: fmt.Sprintf(format, url, j)} conf := config.Pull{URL: fmt.Sprintf(format, url, j)}
@@ -39,7 +39,7 @@ func (r *StressPlugin) pull(count int, format, url string, puller m7s.Puller) (e
return return
} }
func (r *StressPlugin) push(count int, streamPath, format, remoteHost string, pusher m7s.Pusher) (err error) { func (r *StressPlugin) push(count int, streamPath, format, remoteHost string, pusher m7s.PusherFactory) (err error) {
if i := r.pushers.Length; count > i { if i := r.pushers.Length; count > i {
for j := i; j < count; j++ { for j := i; j < count; j++ {
p := pusher() p := pusher()

View File

@@ -6,7 +6,11 @@ import (
transcode "m7s.live/v5/plugin/transcode/pkg" transcode "m7s.live/v5/plugin/transcode/pkg"
) )
var _ = m7s.InstallPlugin[TranscodePlugin](transcode.NewTransform, pb.RegisterApiHandler, &pb.Api_ServiceDesc) var _ = m7s.InstallPlugin[TranscodePlugin](m7s.PluginMeta{
NewTransformer: transcode.NewTransform,
RegisterGRPCHandler: pb.RegisterApiHandler,
ServiceDesc: &pb.Api_ServiceDesc,
})
type TranscodePlugin struct { type TranscodePlugin struct {
pb.UnimplementedApiServer pb.UnimplementedApiServer

View File

@@ -31,6 +31,7 @@ const (
type ( type (
IPullProxy interface { IPullProxy interface {
task.ITask task.ITask
GetBase() *BasePullProxy
GetStreamPath() string GetStreamPath() string
GetConfig() *PullProxyConfig GetConfig() *PullProxyConfig
ChangeStatus(status byte) ChangeStatus(status byte)
@@ -38,7 +39,6 @@ type (
GetKey() uint GetKey() uint
} }
PullProxyConfig struct { PullProxyConfig struct {
server *Server `gorm:"-:all"`
ID uint `gorm:"primarykey"` ID uint `gorm:"primarykey"`
CreatedAt, UpdatedAt time.Time `yaml:"-"` CreatedAt, UpdatedAt time.Time `yaml:"-"`
DeletedAt gorm.DeletedAt `yaml:"-"` DeletedAt gorm.DeletedAt `yaml:"-"`
@@ -53,24 +53,33 @@ type (
Description string Description string
RTT time.Duration RTT time.Duration
} }
PullProxyFactory = func() IPullProxy
PullProxyManager struct { PullProxyManager struct {
task.Manager[uint, IPullProxy] task.Manager[uint, IPullProxy]
} }
PullProxyTask struct { BasePullProxy struct {
*PullProxyConfig *PullProxyConfig
task.AsyncTickTask
Plugin *Plugin Plugin *Plugin
} }
HTTPPullProxy struct { HTTPPullProxy struct {
TCPPullProxy TCPPullProxy
} }
TCPPullProxy struct { TCPPullProxy struct {
PullProxyTask task.AsyncTickTask
BasePullProxy
TCPAddr *net.TCPAddr TCPAddr *net.TCPAddr
URL *url.URL URL *url.URL
} }
) )
func (b *BasePullProxy) GetBase() *BasePullProxy {
return b
}
func NewHTTPPullPorxy() IPullProxy {
return &HTTPPullProxy{}
}
func (d *PullProxyConfig) GetKey() uint { func (d *PullProxyConfig) GetKey() uint {
return d.ID return d.ID
} }
@@ -86,14 +95,16 @@ func (d *PullProxyConfig) GetStreamPath() string {
return d.StreamPath return d.StreamPath
} }
func (d *PullProxyTask) ChangeStatus(status byte) { func (d *BasePullProxy) ChangeStatus(status byte) {
if d.Status == status { if d.Status == status {
return return
} }
from := d.Status from := d.Status
d.Info("device status changed", "from", from, "to", status) d.Plugin.Info("device status changed", "from", from, "to", status)
d.Status = status d.Status = status
d.Update() if d.Plugin.Server.DB != nil {
d.Plugin.Server.DB.Omit("deleted_at").Save(d)
}
switch status { switch status {
case PullProxyStatusOnline: case PullProxyStatusOnline:
if d.PullOnStart && from == PullProxyStatusOffline { if d.PullOnStart && from == PullProxyStatusOffline {
@@ -102,13 +113,7 @@ func (d *PullProxyTask) ChangeStatus(status byte) {
} }
} }
func (d *PullProxyConfig) Update() { func (d *BasePullProxy) Dispose() {
if d.server.DB != nil {
d.server.DB.Omit("deleted_at").Save(d)
}
}
func (d *PullProxyTask) Dispose() {
d.ChangeStatus(PullProxyStatusOffline) d.ChangeStatus(PullProxyStatusOffline)
if stream, ok := d.Plugin.Server.Streams.SafeGet(d.GetStreamPath()); ok { if stream, ok := d.Plugin.Server.Streams.SafeGet(d.GetStreamPath()); ok {
stream.Stop(task.ErrStopByUser) stream.Stop(task.ErrStopByUser)
@@ -116,7 +121,6 @@ func (d *PullProxyTask) Dispose() {
} }
func (d *PullProxyConfig) InitializeWithServer(s *Server) { func (d *PullProxyConfig) InitializeWithServer(s *Server) {
d.server = s
if d.Type == "" { if d.Type == "" {
u, err := url.Parse(d.URL) u, err := url.Parse(d.URL)
if err != nil { if err != nil {
@@ -140,7 +144,7 @@ func (d *PullProxyConfig) InitializeWithServer(s *Server) {
} }
} }
func (d *PullProxyTask) Pull() { func (d *BasePullProxy) Pull() {
var pubConf = d.Plugin.config.Publish var pubConf = d.Plugin.config.Publish
pubConf.PubAudio = d.Audio pubConf.PubAudio = d.Audio
pubConf.DelayCloseTimeout = util.Conditional(d.StopOnIdle, time.Second*5, 0) pubConf.DelayCloseTimeout = util.Conditional(d.StopOnIdle, time.Second*5, 0)
@@ -169,7 +173,7 @@ func (d *HTTPPullProxy) Start() (err error) {
} }
} }
} }
return d.PullProxyTask.Start() return d.TCPPullProxy.Start()
} }
func (d *TCPPullProxy) GetTickInterval() time.Duration { func (d *TCPPullProxy) GetTickInterval() time.Duration {
@@ -217,11 +221,11 @@ func (p *Publisher) processPullProxyOnDispose() {
func (s *Server) createPullProxy(conf *PullProxyConfig) (pullProxy IPullProxy, err error) { func (s *Server) createPullProxy(conf *PullProxyConfig) (pullProxy IPullProxy, err error) {
for plugin := range s.Plugins.Range { for plugin := range s.Plugins.Range {
if pullPlugin, ok := plugin.handler.(IPullProxyPlugin); ok && strings.EqualFold(conf.Type, plugin.Meta.Name) { if plugin.Meta.NewPullProxy != nil && strings.EqualFold(conf.Type, plugin.Meta.Name) {
pullProxy = pullPlugin.OnPullProxyAdd(conf) pullProxy = plugin.Meta.NewPullProxy()
if pullProxy == nil { base := pullProxy.GetBase()
continue base.PullProxyConfig = conf
} base.Plugin = plugin
s.PullProxies.Add(pullProxy, plugin.Logger.With("pullProxyId", conf.ID, "pullProxyType", conf.Type, "pullProxyName", conf.Name)) s.PullProxies.Add(pullProxy, plugin.Logger.With("pullProxyId", conf.ID, "pullProxyType", conf.Type, "pullProxyName", conf.Name))
return return
} }
@@ -257,7 +261,6 @@ func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res
func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *pb.SuccessResponse, err error) { func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *pb.SuccessResponse, err error) {
device := &PullProxyConfig{ device := &PullProxyConfig{
server: s,
Name: req.Name, Name: req.Name,
Type: req.Type, Type: req.Type,
ParentID: uint(req.ParentID), ParentID: uint(req.ParentID),
@@ -313,9 +316,7 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (re
err = pkg.ErrNoDB err = pkg.ErrNoDB
return return
} }
target := &PullProxyConfig{ target := &PullProxyConfig{}
server: s,
}
err = s.DB.First(target, req.ID).Error err = s.DB.First(target, req.ID).Error
if err != nil { if err != nil {
return return

View File

@@ -33,7 +33,7 @@ type (
GetPullJob() *PullJob GetPullJob() *PullJob
} }
Puller = func(config.Pull) IPuller PullerFactory = func(config.Pull) IPuller
PullJob struct { PullJob struct {
Connection Connection

View File

@@ -28,11 +28,15 @@ const (
type ( type (
IPushProxy interface { IPushProxy interface {
task.ITask
GetBase() *BasePushProxy
GetStreamPath() string
GetConfig() *PushProxyConfig
ChangeStatus(status byte)
Push() Push()
GetKey() uint
} }
PushProxy struct { PushProxyConfig struct {
server *Server `gorm:"-:all"`
task.Work `gorm:"-:all" yaml:"-"`
ID uint `gorm:"primarykey"` ID uint `gorm:"primarykey"`
CreatedAt, UpdatedAt time.Time `yaml:"-"` CreatedAt, UpdatedAt time.Time `yaml:"-"`
DeletedAt gorm.DeletedAt `yaml:"-"` DeletedAt gorm.DeletedAt `yaml:"-"`
@@ -45,76 +49,82 @@ type (
Status byte Status byte
Description string Description string
RTT time.Duration RTT time.Duration
Handler IPushProxy `gorm:"-:all" yaml:"-"`
} }
PushProxyFactory = func() IPushProxy
PushProxyManager struct { PushProxyManager struct {
task.Manager[uint, *PushProxy] task.Manager[uint, IPushProxy]
} }
PushProxyTask struct { BasePushProxy struct {
task.AsyncTickTask *PushProxyConfig
PushProxy *PushProxy
Plugin *Plugin Plugin *Plugin
} }
HTTPPushProxy struct {
TCPPushProxy
}
TCPPushProxy struct { TCPPushProxy struct {
PushProxyTask task.AsyncTickTask
BasePushProxy
TCPAddr *net.TCPAddr TCPAddr *net.TCPAddr
URL *url.URL URL *url.URL
} }
) )
func (d *PushProxy) GetKey() uint { func (d *PushProxyConfig) GetKey() uint {
return d.ID return d.ID
} }
func (d *PushProxy) GetStreamPath() string { func (d *PushProxyConfig) GetConfig() *PushProxyConfig {
return d
}
func (d *PushProxyConfig) GetStreamPath() string {
if d.StreamPath == "" { if d.StreamPath == "" {
return fmt.Sprintf("push/%s/%d", d.Type, d.ID) return fmt.Sprintf("push/%s/%d", d.Type, d.ID)
} }
return d.StreamPath return d.StreamPath
} }
func (d *PushProxy) Start() (err error) { func (s *Server) createPushProxy(conf *PushProxyConfig) (pushProxy IPushProxy, err error) {
for plugin := range d.server.Plugins.Range { for plugin := range s.Plugins.Range {
if pushPlugin, ok := plugin.handler.(IPushProxyPlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) { if plugin.Meta.NewPushProxy != nil && strings.EqualFold(conf.Type, plugin.Meta.Name) {
pushTask := pushPlugin.OnPushProxyAdd(d) pushProxy = plugin.Meta.NewPushProxy()
if pushTask == nil { base := pushProxy.GetBase()
continue base.PushProxyConfig = conf
} base.Plugin = plugin
if pushTask, ok := pushTask.(IPushProxy); ok { s.PushProxies.Add(pushProxy, plugin.Logger.With("pushProxyId", conf.ID, "pushProxyType", conf.Type, "pushProxyName", conf.Name))
d.Handler = pushTask return
}
if t, ok := pushTask.(task.ITask); ok {
if ticker, ok := t.(task.IChannelTask); ok {
t.OnStart(func() {
ticker.Tick(nil)
})
}
d.AddTask(t)
} else {
d.ChangeStatus(PushProxyStatusOnline)
}
} }
} }
return return
} }
func (d *PushProxy) ChangeStatus(status byte) { func (b *BasePushProxy) GetBase() *BasePushProxy {
return b
}
func NewHTTPPushProxy() IPushProxy {
return &HTTPPushProxy{}
}
func (d *BasePushProxy) ChangeStatus(status byte) {
if d.Status == status { if d.Status == status {
return return
} }
from := d.Status from := d.Status
d.Info("device status changed", "from", from, "to", status) d.Plugin.Info("device status changed", "from", from, "to", status)
d.Status = status d.Status = status
d.Update() if d.Plugin.Server.DB != nil {
d.Plugin.Server.DB.Omit("deleted_at").Save(d)
}
switch status { switch status {
case PushProxyStatusOnline: case PushProxyStatusOnline:
if from == PushProxyStatusOffline { if from == PushProxyStatusOffline {
if d.PushOnStart { if d.PushOnStart {
d.Handler.Push() d.Push()
} else { } else {
d.server.Streams.Call(func() error { d.Plugin.Server.Streams.Call(func() error {
if d.server.Streams.Has(d.GetStreamPath()) { if d.Plugin.Server.Streams.Has(d.GetStreamPath()) {
d.Handler.Push() d.Push()
} }
return nil return nil
}) })
@@ -123,20 +133,17 @@ func (d *PushProxy) ChangeStatus(status byte) {
} }
} }
func (d *PushProxy) Update() { func (d *BasePushProxy) Dispose() {
if d.server.DB != nil { d.ChangeStatus(PushProxyStatusOffline)
d.server.DB.Omit("deleted_at").Save(d) if stream, ok := d.Plugin.Server.Streams.SafeGet(d.GetStreamPath()); ok {
stream.Stop(task.ErrStopByUser)
} }
} }
func (d *PushProxyTask) Dispose() { func (d *BasePushProxy) Push() {
d.PushProxy.ChangeStatus(PushProxyStatusOffline)
}
func (d *PushProxyTask) Push() {
var subConf = d.Plugin.config.Subscribe var subConf = d.Plugin.config.Subscribe
subConf.SubAudio = d.PushProxy.Audio subConf.SubAudio = d.Audio
d.Plugin.handler.Push(d.PushProxy.GetStreamPath(), d.PushProxy.Push, &subConf) d.Plugin.handler.Push(d.GetStreamPath(), d.PushProxyConfig.Push, &subConf)
} }
func (d *TCPPushProxy) GetTickInterval() time.Duration { func (d *TCPPushProxy) GetTickInterval() time.Duration {
@@ -147,23 +154,46 @@ func (d *TCPPushProxy) Tick(any) {
startTime := time.Now() startTime := time.Now()
conn, err := net.DialTCP("tcp", nil, d.TCPAddr) conn, err := net.DialTCP("tcp", nil, d.TCPAddr)
if err != nil { if err != nil {
d.PushProxy.ChangeStatus(PushProxyStatusOffline) d.ChangeStatus(PushProxyStatusOffline)
return return
} }
conn.Close() conn.Close()
d.PushProxy.RTT = time.Since(startTime) d.RTT = time.Since(startTime)
if d.PushProxy.Status == PushProxyStatusOffline { if d.Status == PushProxyStatusOffline {
d.PushProxy.ChangeStatus(PushProxyStatusOnline) d.ChangeStatus(PushProxyStatusOnline)
} }
} }
func (d *PushProxy) InitializeWithServer(s *Server) { func (d *HTTPPushProxy) Start() (err error) {
d.server = s d.URL, err = url.Parse(d.PushProxyConfig.URL)
d.Logger = s.Logger.With("pushProxy", d.ID, "type", d.Type, "name", d.Name) if err != nil {
return
}
if ips, err := net.LookupIP(d.URL.Hostname()); err != nil {
return err
} else if len(ips) == 0 {
return fmt.Errorf("no IP found for host: %s", d.URL.Hostname())
} else {
d.TCPAddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(ips[0].String(), d.URL.Port()))
if err != nil {
return err
}
if d.TCPAddr.Port == 0 {
if d.URL.Scheme == "https" || d.URL.Scheme == "wss" {
d.TCPAddr.Port = 443
} else {
d.TCPAddr.Port = 80
}
}
}
return d.TCPPushProxy.Start()
}
func (d *PushProxyConfig) InitializeWithServer(s *Server) {
if d.Type == "" { if d.Type == "" {
u, err := url.Parse(d.URL) u, err := url.Parse(d.URL)
if err != nil { if err != nil {
d.Logger.Error("parse push url failed", "error", err) s.Error("parse push url failed", "error", err)
return return
} }
switch u.Scheme { switch u.Scheme {
@@ -187,19 +217,20 @@ func (s *Server) GetPushProxyList(ctx context.Context, req *emptypb.Empty) (res
res = &pb.PushProxyListResponse{} res = &pb.PushProxyListResponse{}
s.PushProxies.Call(func() error { s.PushProxies.Call(func() error {
for device := range s.PushProxies.Range { for device := range s.PushProxies.Range {
conf := device.GetConfig()
res.Data = append(res.Data, &pb.PushProxyInfo{ res.Data = append(res.Data, &pb.PushProxyInfo{
Name: device.Name, Name: conf.Name,
CreateTime: timestamppb.New(device.CreatedAt), CreateTime: timestamppb.New(conf.CreatedAt),
UpdateTime: timestamppb.New(device.UpdatedAt), UpdateTime: timestamppb.New(conf.UpdatedAt),
Type: device.Type, Type: conf.Type,
PushURL: device.URL, PushURL: conf.URL,
ParentID: uint32(device.ParentID), ParentID: uint32(conf.ParentID),
Status: uint32(device.Status), Status: uint32(conf.Status),
ID: uint32(device.ID), ID: uint32(conf.ID),
PushOnStart: device.PushOnStart, PushOnStart: conf.PushOnStart,
Audio: device.Audio, Audio: conf.Audio,
Description: device.Description, Description: conf.Description,
Rtt: uint32(device.RTT.Milliseconds()), Rtt: uint32(conf.RTT.Milliseconds()),
StreamPath: device.GetStreamPath(), StreamPath: device.GetStreamPath(),
}) })
} }
@@ -209,8 +240,7 @@ func (s *Server) GetPushProxyList(ctx context.Context, req *emptypb.Empty) (res
} }
func (s *Server) AddPushProxy(ctx context.Context, req *pb.PushProxyInfo) (res *pb.SuccessResponse, err error) { func (s *Server) AddPushProxy(ctx context.Context, req *pb.PushProxyInfo) (res *pb.SuccessResponse, err error) {
device := &PushProxy{ device := &PushProxyConfig{
server: s,
Name: req.Name, Name: req.Name,
Type: req.Type, Type: req.Type,
ParentID: uint(req.ParentID), ParentID: uint(req.ParentID),
@@ -250,7 +280,7 @@ func (s *Server) AddPushProxy(ctx context.Context, req *pb.PushProxyInfo) (res *
return return
} }
s.DB.Create(device) s.DB.Create(device)
s.PushProxies.Add(device) _, err = s.createPushProxy(device)
res = &pb.SuccessResponse{} res = &pb.SuccessResponse{}
return return
} }
@@ -260,9 +290,7 @@ func (s *Server) UpdatePushProxy(ctx context.Context, req *pb.PushProxyInfo) (re
err = pkg.ErrNoDB err = pkg.ErrNoDB
return return
} }
target := &PushProxy{ target := &PushProxyConfig{}
server: s,
}
err = s.DB.First(target, req.ID).Error err = s.DB.First(target, req.ID).Error
if err != nil { if err != nil {
return return
@@ -299,27 +327,18 @@ func (s *Server) UpdatePushProxy(ctx context.Context, req *pb.PushProxyInfo) (re
target.RTT = time.Duration(int(req.Rtt)) * time.Millisecond target.RTT = time.Duration(int(req.Rtt)) * time.Millisecond
target.StreamPath = req.StreamPath target.StreamPath = req.StreamPath
s.DB.Save(target) s.DB.Save(target)
var needStopOld *PushProxy
// Stop the old proxy if needed
s.PushProxies.Call(func() error { s.PushProxies.Call(func() error {
if device, ok := s.PushProxies.Get(uint(req.ID)); ok { if device, ok := s.PushProxies.Get(uint(req.ID)); ok {
if target.URL != device.URL || device.Audio != target.Audio || device.StreamPath != target.StreamPath {
device.Stop(task.ErrStopByUser) device.Stop(task.ErrStopByUser)
needStopOld = device
return nil
}
if device.PushOnStart != target.PushOnStart && target.PushOnStart && device.Handler != nil && device.Status == PushProxyStatusOnline {
device.Handler.Push()
}
device.Name = target.Name
device.PushOnStart = target.PushOnStart
device.Description = target.Description
} }
return nil return nil
}) })
if needStopOld != nil {
needStopOld.WaitStopped() // Create a new proxy with the updated config
s.PushProxies.Add(target) _, err = s.createPushProxy(target)
} res = &pb.SuccessResponse{}
res = &pb.SuccessResponse{} res = &pb.SuccessResponse{}
return return
} }
@@ -331,7 +350,7 @@ func (s *Server) RemovePushProxy(ctx context.Context, req *pb.RequestWithId) (re
} }
res = &pb.SuccessResponse{} res = &pb.SuccessResponse{}
if req.Id > 0 { if req.Id > 0 {
tx := s.DB.Delete(&PushProxy{ tx := s.DB.Delete(&PushProxyConfig{
ID: uint(req.Id), ID: uint(req.Id),
}) })
err = tx.Error err = tx.Error
@@ -343,7 +362,7 @@ func (s *Server) RemovePushProxy(ctx context.Context, req *pb.RequestWithId) (re
}) })
return return
} else if req.StreamPath != "" { } else if req.StreamPath != "" {
var deviceList []*PushProxy var deviceList []*PushProxyConfig
s.DB.Find(&deviceList, "stream_path=?", req.StreamPath) s.DB.Find(&deviceList, "stream_path=?", req.StreamPath)
if len(deviceList) > 0 { if len(deviceList) > 0 {
for _, device := range deviceList { for _, device := range deviceList {

View File

@@ -14,7 +14,7 @@ type IPusher interface {
GetPushJob() *PushJob GetPushJob() *PushJob
} }
type Pusher = func() IPusher type PusherFactory = func() IPusher
type PushJob struct { type PushJob struct {
Connection Connection

View File

@@ -26,7 +26,7 @@ type (
task.ITask task.ITask
GetRecordJob() *RecordJob GetRecordJob() *RecordJob
} }
Recorder = func(config.Record) IRecorder RecorderFactory = func(config.Record) IRecorder
RecordJob struct { RecordJob struct {
task.Job task.Job
StreamPath string // 对应本地流 StreamPath string // 对应本地流

View File

@@ -64,7 +64,7 @@ type (
StreamAlias map[config.Regexp]string `desc:"流别名"` StreamAlias map[config.Regexp]string `desc:"流别名"`
Location map[config.Regexp]string `desc:"HTTP路由转发规则,key为正则表达式,value为目标地址"` Location map[config.Regexp]string `desc:"HTTP路由转发规则,key为正则表达式,value为目标地址"`
PullProxy []*PullProxyConfig PullProxy []*PullProxyConfig
PushProxy []*PushProxy PushProxy []*PushProxyConfig
Admin struct { Admin struct {
EnableLogin bool `default:"false" desc:"启用登录机制"` //启用登录机制 EnableLogin bool `default:"false" desc:"启用登录机制"` //启用登录机制
FilePath string `default:"admin.zip" desc:"管理员界面文件路径"` FilePath string `default:"admin.zip" desc:"管理员界面文件路径"`
@@ -281,7 +281,7 @@ func (s *Server) Start() (err error) {
return return
} }
// Auto-migrate models // Auto-migrate models
if err = s.DB.AutoMigrate(&db.User{}, &PullProxyConfig{}, &PushProxy{}, &StreamAliasDB{}); err != nil { if err = s.DB.AutoMigrate(&db.User{}, &PullProxyConfig{}, &PushProxyConfig{}, &StreamAliasDB{}); err != nil {
s.Error("failed to auto-migrate models", "error", err) s.Error("failed to auto-migrate models", "error", err)
return return
} }
@@ -403,12 +403,12 @@ func (s *Server) Start() (err error) {
s.Info("server started") s.Info("server started")
s.Post(func() error { s.Post(func() error {
for plugin := range s.Plugins.Range { for plugin := range s.Plugins.Range {
if plugin.Meta.Puller != nil { if plugin.Meta.NewPuller != nil {
for streamPath, conf := range plugin.config.Pull { for streamPath, conf := range plugin.config.Pull {
plugin.handler.Pull(streamPath, conf, nil) plugin.handler.Pull(streamPath, conf, nil)
} }
} }
if plugin.Meta.Transformer != nil { if plugin.Meta.NewTransformer != nil {
for streamPath, _ := range plugin.config.Transform { for streamPath, _ := range plugin.config.Transform {
plugin.OnSubscribe(streamPath, url.Values{}) //按需转换 plugin.OnSubscribe(streamPath, url.Values{}) //按需转换
// transformer := plugin.Meta.Transformer() // transformer := plugin.Meta.Transformer()
@@ -463,15 +463,15 @@ func (s *Server) initPullProxies() {
func (s *Server) initPushProxies() { func (s *Server) initPushProxies() {
// 1. Read all push proxies from database // 1. Read all push proxies from database
var pushProxies []*PushProxy var pushProxies []*PushProxyConfig
s.DB.Find(&pushProxies) s.DB.Find(&pushProxies)
// Create a map for quick lookup of existing proxies // Create a map for quick lookup of existing proxies
existingPushProxies := make(map[uint]*PushProxy) existingPushProxies := make(map[uint]*PushProxyConfig)
for _, proxy := range pushProxies { for _, proxy := range pushProxies {
existingPushProxies[proxy.ID] = proxy existingPushProxies[proxy.ID] = proxy
proxy.Status = PushProxyStatusOffline
proxy.InitializeWithServer(s) proxy.InitializeWithServer(s)
proxy.ChangeStatus(PushProxyStatusOffline)
} }
// 2. Process and override with config data // 2. Process and override with config data
@@ -497,7 +497,7 @@ func (s *Server) initPushProxies() {
// 3. Finally add all proxies to collections // 3. Finally add all proxies to collections
for _, proxy := range pushProxies { for _, proxy := range pushProxies {
s.PushProxies.Add(proxy) s.createPushProxy(proxy)
} }
} }
@@ -516,7 +516,7 @@ func (s *Server) initPushProxiesWithoutDB() {
for _, proxy := range s.PushProxy { for _, proxy := range s.PushProxy {
if proxy.ID != 0 { if proxy.ID != 0 {
proxy.InitializeWithServer(s) proxy.InitializeWithServer(s)
s.PushProxies.Add(proxy, proxy.Logger) s.createPushProxy(proxy)
} }
} }
} }
@@ -587,8 +587,9 @@ func (s *Server) OnPublish(p *Publisher) {
plugin.OnPublish(p) plugin.OnPublish(p)
} }
for pushProxy := range s.PushProxies.Range { for pushProxy := range s.PushProxies.Range {
if pushProxy.Status == PushProxyStatusOnline && pushProxy.GetStreamPath() == p.StreamPath && !pushProxy.PushOnStart { conf := pushProxy.GetConfig()
pushProxy.Handler.Push() if conf.Status == PushProxyStatusOnline && pushProxy.GetStreamPath() == p.StreamPath && !conf.PushOnStart {
pushProxy.Push()
} }
} }
} }

View File

@@ -16,7 +16,7 @@ type (
task.ITask task.ITask
GetTransformJob() *TransformJob GetTransformJob() *TransformJob
} }
Transformer = func() ITransformer TransformerFactory = func() ITransformer
TransformJob struct { TransformJob struct {
task.Job task.Job
StreamPath string // 对应本地流 StreamPath string // 对应本地流