From bb574e00f77106ba2b2e02ade53c1cef1a391df8 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Sun, 26 Dec 2021 23:24:24 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=8B=89=E6=B5=81=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 14 +++++++++++-- go.mod | 4 ++-- go.sum | 22 +++++++------------- main.go | 33 ++++++++++++++++++++++------- pull.go | 62 +++++++++++++++++++++++++++++++++---------------------- 5 files changed, 85 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index 880186a..cbcdc80 100644 --- a/README.md +++ b/README.md @@ -21,15 +21,18 @@ ListenAddr = ":2020" #默认为空 ListenAddrTLS = ":2021" #默认为空 CertFile = "file.cert" #默认为空 KeyFile = "file.key" #默认为空 +Reconnect = false #默认为false +[HDL.AutoPullList] +"live/hdl" = "http://flv.bdplay.nodemedia.cn/live/bbb.flv" ``` - `ListenAddr`是监听的地址,如果配置为空字符串,则是复用Gateway插件监听的公共端口 - `ListenAddrTLS` 公共https监听端口,默认为空,则不监听 - `CertFile` https用的证书,默认为空 - `KeyFile` https用的证书的key,默认为空 - +- Reconnect 拉流断开后是否重连 +- `HDL.AutoPullList` 自动拉流,可以设置多个,key为流唯一标识,value为拉流地址 ## 插件功能 - ### 从m7s拉取http-flv协议流 如果m7s中已经存在live/test流的话就可以用http-flv协议进行播放 如果监听端口不配置则公用Gateway的端口(默认8080) @@ -40,3 +43,10 @@ ffplay -i http://localhost:8080/hdl/live/test.flv ```bash ffplay -i http://localhost:2020/live/test.flv ``` +### m7s从远程拉取http-flv协议流 + +可调用接口 +`/hdl/pull/pull?target=[HTTP-FLV地址]&streamPath=[流标识]&save=[是否保存配置(留空则不保存)]` + +或者编程方式拉流 +`PullStream(streamPath, targetURL)` \ No newline at end of file diff --git a/go.mod b/go.mod index de81e9e..ff94ae1 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/Monibuca/plugin-hdl/v3 go 1.13 require ( - github.com/Monibuca/engine/v3 v3.3.0 - github.com/Monibuca/utils/v3 v3.0.2 + github.com/Monibuca/engine/v3 v3.4.0 + github.com/Monibuca/utils/v3 v3.0.5 github.com/logrusorgru/aurora v2.0.3+incompatible github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect diff --git a/go.sum b/go.sum index f64ab50..677082b 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,10 @@ -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Monibuca/engine/v3 v3.1.1 h1:UZXvAsFO/5Tae6rN42Wppa+UOMxfPcy3erCpPWV+TQY= -github.com/Monibuca/engine/v3 v3.1.1/go.mod h1:yz6cssED2VlYu+g/LrxseBB9pcvsLM/o2QXa4gVY650= -github.com/Monibuca/engine/v3 v3.3.0 h1:7zwYsLEHdeVZy6+JjVlaDhl/asr0HG6jirBL4uynj0s= -github.com/Monibuca/engine/v3 v3.3.0/go.mod h1:odyqD/VTQDN4qgzajsgn7kW7MWDIzTHt+j+BcI8i+4g= -github.com/Monibuca/utils/v3 v3.0.0 h1:i8qCXQPQpRPgjuXKu5C2PYiL5LYzB6GW4xE162mB2ug= -github.com/Monibuca/utils/v3 v3.0.0/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= -github.com/Monibuca/utils/v3 v3.0.1/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= -github.com/Monibuca/utils/v3 v3.0.2 h1:n2vr67DHanav8wBC9IENk8xrKzeGJnBsxYUu69s8TrQ= -github.com/Monibuca/utils/v3 v3.0.2/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= +github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= +github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/Monibuca/engine/v3 v3.4.0 h1:dXh9ZRtnW6hrIGcGoG05MG6mNQsX2k27ftzgNczOyhE= +github.com/Monibuca/engine/v3 v3.4.0/go.mod h1:rgAUey5ziRhlh6WugWyA5fYKyGOvcwhtTMDk4sukE7E= +github.com/Monibuca/utils/v3 v3.0.5 h1:w14x0HkWTbF4MmHbINLlOwe4VJNoSOeaQChMk5E/4es= +github.com/Monibuca/utils/v3 v3.0.5/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY= github.com/cnotch/ipchub v1.1.0 h1:hH0lh2mU3AZXPiqMwA0pdtqrwo7PFIMRGush9OobMUs= github.com/cnotch/ipchub v1.1.0/go.mod h1:2PbeBs2q2VxxTVCn1eYCDwpAWuVXbq1+N0FU7GimOH4= @@ -43,8 +39,8 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= -github.com/pion/rtp v1.6.5 h1:o2cZf8OascA5HF/b0PAbTxRKvOWxTQxWYt7SlToxFGI= -github.com/pion/rtp v1.6.5/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA= +github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -71,8 +67,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/main.go b/main.go index 73dcb1a..d902429 100644 --- a/main.go +++ b/main.go @@ -18,19 +18,38 @@ var config struct { ListenAddrTLS string CertFile string KeyFile string + Reconnect bool AutoPullList map[string]string } var streamPathReg = regexp.MustCompile(`/(hdl/)?((.+)(\.flv)|(.+))`) - -func init() { - InstallPlugin(&PluginConfig{ - Name: "HDL", - Config: &config, - Run: run, - }) +var pconfig = PluginConfig{ + Name: "HDL", + Config: &config, } +func init() { + pconfig.Install(run) +} func run() { + http.HandleFunc("/api/hdl/pull", func(rw http.ResponseWriter, r *http.Request) { + targetURL := r.URL.Query().Get("target") + streamPath := r.URL.Query().Get("streamPath") + save := r.URL.Query().Get("save") + if err := PullStream(streamPath, targetURL); err == nil { + if save != "" { + if config.AutoPullList == nil { + config.AutoPullList = make(map[string]string) + } + config.AutoPullList[streamPath] = targetURL + if err = pconfig.Save(); err != nil { + utils.Println(err) + } + } + rw.WriteHeader(200) + } else { + rw.WriteHeader(500) + } + }) if config.ListenAddr != "" || config.ListenAddrTLS != "" { utils.Print(Green("HDL start at "), BrightBlue(config.ListenAddr), BrightBlue(config.ListenAddrTLS)) utils.ListenAddrs(config.ListenAddr, config.ListenAddrTLS, config.CertFile, config.KeyFile, http.HandlerFunc(HDLHandler)) diff --git a/pull.go b/pull.go index 2df227a..26a95ff 100644 --- a/pull.go +++ b/pull.go @@ -10,6 +10,41 @@ import ( "github.com/Monibuca/utils/v3/codec" ) +func pull(stream *Stream, reader io.Reader, lastDisconnect uint32) error { + var lastTime uint32 + if config.Reconnect { + time.Sleep(time.Second * 5) + go pull(stream, reader, lastTime) + } else { + defer stream.Close() + } + head := make([]byte, len(codec.FLVHeader)) + io.ReadFull(reader, head) + at := stream.NewAudioTrack(0) + vt := stream.NewVideoTrack(0) + for readT := time.Now(); ; readT = time.Now() { + if t, timestamp, payload, err := codec.ReadFLVTag(reader); err == nil { + if lastDisconnect != 0 && timestamp == 0 { + continue + } + readCost := time.Since(readT) + switch t { + case codec.FLV_TAG_TYPE_AUDIO: + at.PushByteStream(timestamp+lastDisconnect, payload) + case codec.FLV_TAG_TYPE_VIDEO: + vt.PushByteStream(timestamp+lastDisconnect, payload) + if timestamp != 0 { + if duration := time.Duration(timestamp-lastTime) * time.Millisecond; readCost < duration { + time.Sleep(duration - readCost) + } + } + lastTime = timestamp + } + } else { + return err + } + } +} func PullStream(streamPath, url string) error { if res, err := http.Get(url); err == nil { stream := Stream{ @@ -17,35 +52,12 @@ func PullStream(streamPath, url string) error { StreamPath: streamPath, } if stream.Publish() { - defer stream.Close() - head := make([]byte, len(codec.FLVHeader)) - io.ReadFull(res.Body, head) - var lastTime uint32 - at := stream.NewAudioTrack(0) - vt := stream.NewVideoTrack(0) - for { - if t, timestamp, payload, err := codec.ReadFLVTag(res.Body); err == nil { - switch t { - case codec.FLV_TAG_TYPE_AUDIO: - at.PushByteStream(timestamp, payload) - case codec.FLV_TAG_TYPE_VIDEO: - if timestamp != 0 { - if lastTime == 0 { - lastTime = timestamp - } - } - vt.PushByteStream(timestamp, payload) - time.Sleep(time.Duration(timestamp-lastTime) * time.Millisecond) - lastTime = timestamp - } - } else { - return err - } - } + go pull(&stream, res.Body, 0) } else { return errors.New("Bad Name") } } else { return err } + return nil }