diff --git a/go.mod b/go.mod index 122505e..b58fa2e 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,7 @@ -module github.com/Monibuca/plugin-hdl/v3 +module github.com/Monibuca/plugin-hdl/v4 -go 1.13 +go 1.18 -require ( - github.com/Monibuca/engine/v3 v3.4.1 - github.com/Monibuca/utils/v3 v3.0.5 - github.com/logrusorgru/aurora v2.0.3+incompatible - github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 - golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect -) +require github.com/logrusorgru/aurora v2.0.3+incompatible + +require github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 // indirect diff --git a/go.sum b/go.sum index 782e311..c1bdab1 100644 --- a/go.sum +++ b/go.sum @@ -1,82 +1,4 @@ -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= -github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/Monibuca/engine/v3 v3.4.1 h1:Ap2VbwTkMUkv80NPeUX2sNdV5Vz5nPVoU/6RU51PSAc= -github.com/Monibuca/engine/v3 v3.4.1/go.mod h1:rgAUey5ziRhlh6WugWyA5fYKyGOvcwhtTMDk4sukE7E= -github.com/Monibuca/utils/v3 v3.0.5 h1:w14x0HkWTbF4MmHbINLlOwe4VJNoSOeaQChMk5E/4es= -github.com/Monibuca/utils/v3 v3.0.5/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= -github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY= -github.com/cnotch/ipchub v1.1.0 h1:hH0lh2mU3AZXPiqMwA0pdtqrwo7PFIMRGush9OobMUs= -github.com/cnotch/ipchub v1.1.0/go.mod h1:2PbeBs2q2VxxTVCn1eYCDwpAWuVXbq1+N0FU7GimOH4= -github.com/cnotch/loader v0.0.0-20200405015128-d9d964d09439/go.mod h1:oWpDagHB6p+Kqqq7RoRZKyC4XAXft50hR8pbTxdbYYs= -github.com/cnotch/queue v0.0.0-20200326024423-6e88bdbf2ad4/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg= -github.com/cnotch/queue v0.0.0-20201224060551-4191569ce8f6/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg= -github.com/cnotch/scheduler v0.0.0-20200522024700-1d2da93eefc5/go.mod h1:F4GE3SZkJZ8an1Y0ZCqvSM3jeozNuKzoC67erG1PhIo= -github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl3sR3yMGgxYupMn2btzdtJUwoxFPUE5E= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug= -github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= -github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= -github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= -github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE= -github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es= -github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k= github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= -github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= -github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= -github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= -github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= -github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA= -github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= -github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= -github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= -github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 h1:r1JUI0wuHlgRb8jNd3zPBBkjUdrjpVKr8SdJWc8ntg8= github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8/go.mod h1:RZd/IqzNpFANwOB9rVmsnAYpo/6KesK4PqrN1a5cRgg= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= -golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 326d390..a0662b4 100644 --- a/main.go +++ b/main.go @@ -2,101 +2,96 @@ package hdl import ( "bytes" + "context" "encoding/binary" - "encoding/json" + "net" "net/http" "regexp" "time" - . "github.com/Monibuca/engine/v3" - "github.com/Monibuca/utils/v3" - "github.com/Monibuca/utils/v3/codec" + . "github.com/Monibuca/engine/v4" + "github.com/Monibuca/engine/v4/codec" + "github.com/Monibuca/engine/v4/util" . "github.com/logrusorgru/aurora" amf "github.com/zhangpeihao/goamf" ) -var config struct { - ListenAddr string - ListenAddrTLS string - CertFile string - KeyFile string - Reconnect bool - AutoPullList map[string]string +type HDLConfig struct { + HTTPConfig + PublishConfig + SubscribeConfig + PullConfig + context.Context + context.CancelFunc } + var streamPathReg = regexp.MustCompile(`/(hdl/)?((.+)(\.flv)|(.+))`) -var pconfig = PluginConfig{ - Name: "HDL", - Config: &config, +var config = &HDLConfig{ + PublishConfig: DefaultPublishConfig, + SubscribeConfig: DefaultSubscribeConfig, +} + +func (config *HDLConfig) Update(override Config) { + override.Unmarshal(config) + needListen := false + if config.CancelFunc == nil { + needListen = config.ListenAddr != "" || config.ListenAddrTLS != "" + if config.PullOnStart { + for streamPath, url := range config.AutoPullList { + if err := PullStream(streamPath, url); err != nil { + util.Println(err) + } + } + } + } else { + if override.Has("ListenAddr") || override.Has("ListenAddrTLS") { + config.CancelFunc() + needListen = config.ListenAddr != "" || config.ListenAddrTLS != "" + } + } + config.Context, config.CancelFunc = context.WithCancel(Ctx) + if needListen { + util.Print(Green("HDL Listen at "), BrightBlue(config.ListenAddr), BrightBlue(config.ListenAddrTLS)) + config.Listen(config) + } } func init() { - pconfig.Install(run) + if plugin := InstallPlugin(config); plugin != nil { + plugin.HandleApi("/list", util.GetJsonHandler(getHDList, time.Second)) + plugin.HandleFunc("/pull", func(rw http.ResponseWriter, r *http.Request) { + util.CORS(rw, r) + targetURL := r.URL.Query().Get("target") + streamPath := r.URL.Query().Get("streamPath") + save := r.URL.Query().Get("save") + if err := PullStream(streamPath, targetURL); err == nil { + if save == "1" { + if config.AutoPullList == nil { + config.AutoPullList = make(map[string]string) + } + config.AutoPullList[streamPath] = targetURL + if err = plugin.Save(); err != nil { + util.Println(err) + } + } + rw.WriteHeader(200) + } else { + rw.WriteHeader(500) + } + }) + plugin.HandleFunc("/", config.ServeHTTP) + } } func getHDList() (info []*Stream) { for _, s := range Streams.ToList() { - if _, ok := s.ExtraProp.(*HDLPuller); ok { + if _, ok := s.Publisher.(*HDLPuller); ok { info = append(info, s) } } return } -func run() { - http.HandleFunc("/api/hdl/list", func(rw http.ResponseWriter, r *http.Request) { - utils.CORS(rw, r) - if r.URL.Query().Get("json") != "" { - if jsonData, err := json.Marshal(getHDList()); err == nil { - rw.Write(jsonData) - } else { - rw.WriteHeader(500) - } - return - } - sse := utils.NewSSE(rw, r.Context()) - var err error - for tick := time.NewTicker(time.Second); err == nil; <-tick.C { - err = sse.WriteJSON(getHDList()) - } - }) - http.HandleFunc("/api/hdl/pull", func(rw http.ResponseWriter, r *http.Request) { - utils.CORS(rw, r) - targetURL := r.URL.Query().Get("target") - streamPath := r.URL.Query().Get("streamPath") - save := r.URL.Query().Get("save") - if err := PullStream(streamPath, targetURL); err == nil { - if save == "1" { - if config.AutoPullList == nil { - config.AutoPullList = make(map[string]string) - } - config.AutoPullList[streamPath] = targetURL - if err = pconfig.Save(); err != nil { - utils.Println(err) - } - } - rw.WriteHeader(200) - } else { - rw.WriteHeader(500) - } - }) - if config.ListenAddr != "" || config.ListenAddrTLS != "" { - utils.Print(Green("HDL start at "), BrightBlue(config.ListenAddr), BrightBlue(config.ListenAddrTLS)) - utils.ListenAddrs(config.ListenAddr, config.ListenAddrTLS, config.CertFile, config.KeyFile, http.HandlerFunc(HDLHandler)) - } else { - utils.Print(Green("HDL start reuse gateway port")) - http.HandleFunc("/hdl/", HDLHandler) - } - for streamPath, url := range config.AutoPullList { - if err := PullStream(streamPath, url); err != nil { - utils.Println(err) - } - } -} -func HDLHandler(w http.ResponseWriter, r *http.Request) { - // if err := AuthHooks.Trigger(sign); err != nil { - // w.WriteHeader(403) - // return - // } - utils.CORS(w, r) +func (config *HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { parts := streamPathReg.FindStringSubmatch(r.RequestURI) if len(parts) == 0 { w.WriteHeader(404) @@ -108,9 +103,8 @@ func HDLHandler(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Transfer-Encoding", "chunked") w.Header().Set("Content-Type", "video/x-flv") - w.Write(codec.FLVHeader) - sub := Subscriber{ID: r.RemoteAddr, Type: "FLV", Ctx2: r.Context()} - if err := sub.Subscribe(stringPath); err == nil { + sub := Subscriber{ID: r.RemoteAddr, Type: "FLV"} + if sub.Subscribe(stringPath, config.SubscribeConfig) { vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack() var buffer bytes.Buffer if _, err := amf.WriteString(&buffer, "onMetaData"); err != nil { @@ -128,32 +122,44 @@ func HDLHandler(w http.ResponseWriter, r *http.Request) { "videodatarate": 0, "filesize": 0, } + if _, err := WriteEcmaArray(&buffer, metaData); err != nil { + return + } + var flags byte + if at != nil { + flags |= (1 << 2) + } + if vt != nil { + flags |= 1 + } + w.Write([]byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9, 0, 0, 0, 0}) + codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_SCRIPT, 0, net.Buffers{buffer.Bytes()}) if vt != nil { metaData["videocodecid"] = int(vt.CodecID) metaData["width"] = vt.SPSInfo.Width metaData["height"] = vt.SPSInfo.Height - codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, 0, vt.ExtraData.Payload) - sub.OnVideo = func(ts uint32, pack *VideoPack) { - codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, ts, pack.Payload) + vt.DecoderConfiguration.FLV.WriteTo(w) + sub.OnVideo = func(frame *VideoFrame) error { + frame.FLV.WriteTo(w) + return r.Context().Err() } } if at != nil { metaData["audiocodecid"] = int(at.CodecID) - metaData["audiosamplerate"] = at.SoundRate - metaData["audiosamplesize"] = int(at.SoundSize) + metaData["audiosamplerate"] = at.SampleRate + metaData["audiosamplesize"] = at.SampleSize metaData["stereo"] = at.Channels == 2 if at.CodecID == 10 { - codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, 0, at.ExtraData) + at.DecoderConfiguration.FLV.WriteTo(w) } - sub.OnAudio = func(ts uint32, pack *AudioPack) { - codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, ts, pack.Payload) + sub.OnAudio = func(frame *AudioFrame) error { + frame.FLV.WriteTo(w) + return r.Context().Err() } } - if _, err := WriteEcmaArray(&buffer, metaData); err != nil { - return - } - codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_SCRIPT, 0, buffer.Bytes()) sub.Play(at, vt) + } else { + w.WriteHeader(500) } } func WriteEcmaArray(w amf.Writer, o amf.Object) (n int, err error) { diff --git a/pull.go b/pull.go index e7e707a..5594a0a 100644 --- a/pull.go +++ b/pull.go @@ -1,27 +1,29 @@ package hdl import ( - "errors" "io" "net/http" + "net/url" "os" "strings" "time" - . "github.com/Monibuca/engine/v3" - "github.com/Monibuca/utils/v3/codec" + . "github.com/Monibuca/engine/v4" + "github.com/Monibuca/engine/v4/codec" + "github.com/Monibuca/engine/v4/track" ) -func pull(at *AudioTrack, vt *VideoTrack, reader io.Reader, lastDisconnect uint32) (lastTime uint32) { +func (puller *HDLPuller) pull() { head := make([]byte, len(codec.FLVHeader)) - io.ReadFull(reader, head) - for startTime := time.Now(); ; { - if t, timestamp, payload, err := codec.ReadFLVTag(reader); err == nil { + io.ReadFull(puller, head) + startTime := time.Now() + for { + if t, timestamp, payload, err := codec.ReadFLVTag(puller); err == nil { switch t { case codec.FLV_TAG_TYPE_AUDIO: - at.PushByteStream(timestamp+lastDisconnect, payload) + puller.at.WriteAVCC(timestamp+puller.lastTs, payload) case codec.FLV_TAG_TYPE_VIDEO: - vt.PushByteStream(timestamp+lastDisconnect, payload) + puller.vt.WriteAVCC(timestamp+puller.lastTs, payload) } if timestamp != 0 { elapse := time.Since(startTime) @@ -30,70 +32,83 @@ func pull(at *AudioTrack, vt *VideoTrack, reader io.Reader, lastDisconnect uint3 time.Sleep(time.Millisecond*time.Duration(timestamp) - elapse) } } - lastTime = timestamp + puller.lastTs = timestamp } else { + puller.UnPublish() return } } } -type HDLPuller struct{} +type HDLPuller struct { + Publisher + lastTs uint32 //断线前的时间戳 + at *track.UnknowAudio + vt *track.UnknowVideo + io.ReadCloser +} -func PullStream(streamPath, url string) error { - stream := Stream{ - URL: url, - Type: "HDL Pull", - StreamPath: streamPath, - ExtraProp: &HDLPuller{}, - } - if strings.HasPrefix(url, "http") { - if res, err := http.Get(url); err == nil { - if stream.Publish() { - at := stream.NewAudioTrack(0) - vt := stream.NewVideoTrack(0) - go func() { - lastTs := pull(at, vt, res.Body, 0) - if config.Reconnect { - for stream.Err() == nil { - time.Sleep(time.Second * 5) - lastTs = pull(at, vt, res.Body, lastTs) - } - } else { - stream.Close() - } - }() +func (puller *HDLPuller) Close() { + +} + +func (puller *HDLPuller) OnStateChange(old StreamState, n StreamState) bool { + switch n { + case STATE_PUBLISHING: + puller.at = puller.NewAudioTrack() + puller.vt = puller.NewVideoTrack() + if puller.Type == "HDL Pull" { + if res, err := http.Get(puller.String()); err == nil { + puller.ReadCloser = res.Body } else { - return errors.New("Bad Name") + return false } } else { - return err - } - } else { - stream.Type = "FLV File" - if file, err := os.Open(url); err == nil { - if stream.Publish() { - at := stream.NewAudioTrack(0) - vt := stream.NewVideoTrack(0) - go func() { - file.Seek(int64(len(codec.FLVHeader)), io.SeekStart) - lastTs := pull(at, vt, file, 0) - if config.Reconnect { - for stream.Err() == nil { - file.Seek(int64(len(codec.FLVHeader)), io.SeekStart) - lastTs = pull(at, vt, file, lastTs) - } - } else { - file.Close() - stream.Close() - } - }() + if file, err := os.Open(puller.String()); err == nil { + file.Seek(int64(len(codec.FLVHeader)), io.SeekStart) + puller.ReadCloser = file } else { file.Close() - return errors.New("Bad Name") + return false } - } else { - return err } + go puller.pull() + case STATE_WAITPUBLISH: + if config.AutoReconnect { + if puller.Type == "HDL Pull" { + if res, err := http.Get(puller.String()); err == nil { + puller.ReadCloser = res.Body + } else { + return true + } + } else { + if file, err := os.Open(puller.String()); err == nil { + file.Seek(int64(len(codec.FLVHeader)), io.SeekStart) + puller.ReadCloser = file + } else { + file.Close() + return true + } + go puller.pull() + } + } + } + return true +} + +func PullStream(streamPath, address string) (err error) { + puller := &HDLPuller{} + puller.PullURL, err = url.Parse(address) + if err != nil { + return + } + puller.Config = config.PublishConfig + if strings.HasPrefix(puller.Scheme, "http") { + puller.Type = "HDL Pull" + puller.Publish(streamPath, puller) + } else { + puller.Type = "FLV File" + puller.Publish(streamPath, puller) } return nil }