diff --git a/main.go b/main.go index 90a3d44..507a859 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,11 @@ type RTMPConfig struct { KeepAlive bool //保持rtmp连接,默认随着stream的close而主动断开 } +func pull(streamPath, url string) { + if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), 0); err != nil { + RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) + } +} func (c *RTMPConfig) OnEvent(event any) { switch v := event.(type) { case FirstConfig: @@ -30,9 +35,7 @@ func (c *RTMPConfig) OnEvent(event any) { go c.Listen(RTMPPlugin, c) } for streamPath, url := range c.PullOnStart { - if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), 0); err != nil { - RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) - } + pull(streamPath, url) } case config.Config: RTMPPlugin.CancelFunc() @@ -42,21 +45,14 @@ func (c *RTMPConfig) OnEvent(event any) { go c.Listen(RTMPPlugin, c) } case SEpublish: - for streamPath, url := range c.PushList { - if streamPath == v.Target.Path { - if err := RTMPPlugin.Push(streamPath, url, new(RTMPPusher), false); err != nil { - RTMPPlugin.Error("push", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) - } + if url, ok := c.PushList[v.Target.Path]; ok { + if err := RTMPPlugin.Push(v.Target.Path, url, new(RTMPPusher), false); err != nil { + RTMPPlugin.Error("push", zap.String("streamPath", v.Target.Path), zap.String("url", url), zap.Error(err)) } } case *Stream: //按需拉流 - for streamPath, url := range c.PullOnSub { - if streamPath == v.Path { - if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), 0); err != nil { - RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) - } - break - } + if url, ok := c.PullOnSub[v.Path]; ok { + pull(v.Path, url) } } }