diff --git a/main.go b/main.go index 12a56da..2f589af 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,7 @@ import ( "encoding/binary" "net" "net/http" - "regexp" + "strings" "time" . "github.com/Monibuca/engine/v4" @@ -24,10 +24,8 @@ type HDLConfig struct { config.Pull } -var streamPathReg = regexp.MustCompile(`/(hdl/)?((.+)(\.flv)|(.+))`) - func (c *HDLConfig) OnEvent(event any) { - switch event.(type) { + switch v := event.(type) { case FirstConfig: if c.ListenAddr != "" || c.ListenAddrTLS != "" { plugin.Info(Green("HDL Server Start").String(), zap.String("ListenAddr", c.ListenAddr), zap.String("ListenAddrTLS", c.ListenAddrTLS)) @@ -35,33 +33,37 @@ func (c *HDLConfig) OnEvent(event any) { } else { plugin.Info(Green("HDL start reuse engine port").String()) } + case PullerPromise: + puller := v.Value + client := &HDLPuller{ + Puller: puller, + } + err := client.connect() + if err == nil { + if err = plugin.Publish(puller.StreamPath, client); err == nil { + v.Resolve(util.Null) + break + } + } + client.Error(puller.RemoteURL, zap.Error(err)) + v.Reject(err) } } func (c *HDLConfig) API_Pull(rw http.ResponseWriter, r *http.Request) { - var puller Puller - puller.StreamPath = r.URL.Query().Get("streamPath") - puller.RemoteURL = r.URL.Query().Get("target") - puller.Config = &c.Pull - c.PullStream(puller) - - if r.URL.Query().Get("save") != "" { - c.AddPull(puller.StreamPath, puller.RemoteURL) - plugin.Modified["pull"] = c.Pull - if err := plugin.Save(); err != nil { - plugin.Error("save faild", zap.Error(err)) - } + err := plugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), r.URL.Query().Has("save")) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) } } + func (*HDLConfig) API_List(rw http.ResponseWriter, r *http.Request) { util.ReturnJson(FilterStreams[*HDLPuller], time.Second, rw, r) } -var Config = new(HDLConfig) - // 确保HDLConfig实现了PullPlugin接口 -var plugin = InstallPlugin(Config) +var plugin = InstallPlugin(new(HDLConfig)) type HDLSubscriber struct { Subscriber @@ -78,26 +80,14 @@ func (sub *HDLSubscriber) OnEvent(event any) { } func (*HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { - parts := streamPathReg.FindStringSubmatch(r.RequestURI) - if len(parts) == 0 { - w.WriteHeader(404) - return - } - stringPath := parts[3] - if stringPath == "" { - stringPath = parts[5] - } + streamPath := strings.TrimPrefix(r.URL.Path, "/hls") w.Header().Set("Transfer-Encoding", "chunked") w.Header().Set("Content-Type", "video/x-flv") sub := &HDLSubscriber{} sub.ID = r.RemoteAddr - sub.OnEvent(r.Context()) - if err := plugin.Subscribe(stringPath, sub); err == nil { - if sub.Stream.Publisher == nil { - w.WriteHeader(404) - return - } - sub.Writer = w + sub.OnEvent(r.Context()) //注入父级Context + sub.OnEvent(w) //注入Writer + if err := plugin.Subscribe(streamPath, sub); err == nil { at, vt := sub.AudioTrack, sub.VideoTrack hasVideo := at != nil hasAudio := vt != nil diff --git a/main_test.go b/main_test.go deleted file mode 100644 index 6f741b9..0000000 --- a/main_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package hdl - -import ( - "testing" -) - -func TestHDLHandler(t *testing.T) { - tests := map[string]string{ - "/hdl/abc.flv": "abc", "/hdl/abc": "abc", "/abc": "abc", "/abc.flv": "abc", - } - for name, result := range tests { - t.Run(name, func(t *testing.T) { - parts := streamPathReg.FindStringSubmatch(name) - stringPath := parts[3] - if stringPath == "" { - stringPath = parts[5] - } - if stringPath != result { - t.Fail() - } - }) - } -} diff --git a/pull.go b/pull.go index ad034d4..8ae2992 100644 --- a/pull.go +++ b/pull.go @@ -9,15 +9,45 @@ import ( . "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/codec" + "github.com/Monibuca/engine/v4/log" "github.com/Monibuca/engine/v4/util" "go.uber.org/zap" ) -func (puller *HDLPuller) pull() { +func (puller *HDLPuller) connect() (err error) { puller.ReConnectCount++ + log.Info("connect", zap.String("remoteURL", puller.RemoteURL)) + if strings.HasPrefix(puller.RemoteURL, "http") { + var res *http.Response + if res, err = http.Get(puller.RemoteURL); err == nil { + puller.OnEvent(res.Body) + } + } else { + var res *os.File + if res, err = os.Open(puller.RemoteURL); err == nil { + puller.OnEvent(res) + } + } + if err != nil { + log.Error("connect", zap.Error(err)) + } + return +} +func (puller *HDLPuller) pull() { + var err error + defer func() { + puller.Closer.Close() + if !puller.Stream.IsClosed() { + if err = puller.connect(); err == nil { + go puller.pull() + } + } else { + puller.Info("stop", zap.String("remoteURL", puller.RemoteURL)) + } + }() head := util.Buffer(make([]byte, len(codec.FLVHeader))) reader := bufio.NewReader(puller) - _, err := io.ReadFull(reader, head) + _, err = io.ReadFull(reader, head) if err != nil { return } @@ -58,49 +88,10 @@ type HDLPuller struct { } func (puller *HDLPuller) OnEvent(event any) { - switch v := event.(type) { - case PullEvent: - if v > 0 { - go func(count PullEvent) { - puller.pull() //阻塞拉流 - // 如果流没有被关闭,则重连,重拉 - if !puller.Stream.IsClosed() { - puller.OnEvent(count) - } - }(v + 1) - } else { - // TODO: 发布失败重新发布 - if plugin.Publish(puller.StreamPath, puller) == nil { - if strings.HasPrefix(puller.RemoteURL, "http") { - if res, err := http.Get(puller.RemoteURL); err == nil { - puller.Reader = res.Body - puller.Closer = res.Body - } else { - puller.Error(puller.RemoteURL, zap.Error(err)) - return - } - } else { - if res, err := os.Open(puller.RemoteURL); err == nil { - puller.Reader = res - puller.Closer = res - } else { - puller.Error(puller.RemoteURL, zap.Error(err)) - return - } - } - // 注入context - puller.OnEvent(Engine) - puller.OnEvent(PullEvent(1)) - } - } + switch event.(type) { + case SEpublish: + go puller.pull() //阻塞拉流 default: puller.Publisher.OnEvent(event) } } - -func (config *HDLConfig) PullStream(puller Puller) { - client := &HDLPuller{ - Puller: puller, - } - client.OnEvent(PullEvent(0)) -}