first commit

This commit is contained in:
dexter
2022-02-07 17:29:45 +08:00
parent cea276b5b1
commit d5a61e5c98
4 changed files with 172 additions and 233 deletions

14
go.mod
View File

@@ -1,11 +1,7 @@
module github.com/Monibuca/plugin-hdl/v3
module github.com/Monibuca/plugin-hdl/v4
go 1.13
go 1.18
require (
github.com/Monibuca/engine/v3 v3.4.1
github.com/Monibuca/utils/v3 v3.0.5
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
)
require github.com/logrusorgru/aurora v2.0.3+incompatible
require github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 // indirect

78
go.sum
View File

@@ -1,82 +1,4 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/Monibuca/engine/v3 v3.4.1 h1:Ap2VbwTkMUkv80NPeUX2sNdV5Vz5nPVoU/6RU51PSAc=
github.com/Monibuca/engine/v3 v3.4.1/go.mod h1:rgAUey5ziRhlh6WugWyA5fYKyGOvcwhtTMDk4sukE7E=
github.com/Monibuca/utils/v3 v3.0.5 h1:w14x0HkWTbF4MmHbINLlOwe4VJNoSOeaQChMk5E/4es=
github.com/Monibuca/utils/v3 v3.0.5/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY=
github.com/cnotch/ipchub v1.1.0 h1:hH0lh2mU3AZXPiqMwA0pdtqrwo7PFIMRGush9OobMUs=
github.com/cnotch/ipchub v1.1.0/go.mod h1:2PbeBs2q2VxxTVCn1eYCDwpAWuVXbq1+N0FU7GimOH4=
github.com/cnotch/loader v0.0.0-20200405015128-d9d964d09439/go.mod h1:oWpDagHB6p+Kqqq7RoRZKyC4XAXft50hR8pbTxdbYYs=
github.com/cnotch/queue v0.0.0-20200326024423-6e88bdbf2ad4/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg=
github.com/cnotch/queue v0.0.0-20201224060551-4191569ce8f6/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg=
github.com/cnotch/scheduler v0.0.0-20200522024700-1d2da93eefc5/go.mod h1:F4GE3SZkJZ8an1Y0ZCqvSM3jeozNuKzoC67erG1PhIo=
github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl3sR3yMGgxYupMn2btzdtJUwoxFPUE5E=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo=
github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg=
github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE=
github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es=
github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA=
github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY=
github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=
github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8 h1:r1JUI0wuHlgRb8jNd3zPBBkjUdrjpVKr8SdJWc8ntg8=
github.com/zhangpeihao/goamf v0.0.0-20140409082417-3ff2c19514a8/go.mod h1:RZd/IqzNpFANwOB9rVmsnAYpo/6KesK4PqrN1a5cRgg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

180
main.go
View File

@@ -2,101 +2,96 @@ package hdl
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"net"
"net/http"
"regexp"
"time"
. "github.com/Monibuca/engine/v3"
"github.com/Monibuca/utils/v3"
"github.com/Monibuca/utils/v3/codec"
. "github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/codec"
"github.com/Monibuca/engine/v4/util"
. "github.com/logrusorgru/aurora"
amf "github.com/zhangpeihao/goamf"
)
var config struct {
ListenAddr string
ListenAddrTLS string
CertFile string
KeyFile string
Reconnect bool
AutoPullList map[string]string
type HDLConfig struct {
HTTPConfig
PublishConfig
SubscribeConfig
PullConfig
context.Context
context.CancelFunc
}
var streamPathReg = regexp.MustCompile(`/(hdl/)?((.+)(\.flv)|(.+))`)
var pconfig = PluginConfig{
Name: "HDL",
Config: &config,
var config = &HDLConfig{
PublishConfig: DefaultPublishConfig,
SubscribeConfig: DefaultSubscribeConfig,
}
func (config *HDLConfig) Update(override Config) {
override.Unmarshal(config)
needListen := false
if config.CancelFunc == nil {
needListen = config.ListenAddr != "" || config.ListenAddrTLS != ""
if config.PullOnStart {
for streamPath, url := range config.AutoPullList {
if err := PullStream(streamPath, url); err != nil {
util.Println(err)
}
}
}
} else {
if override.Has("ListenAddr") || override.Has("ListenAddrTLS") {
config.CancelFunc()
needListen = config.ListenAddr != "" || config.ListenAddrTLS != ""
}
}
config.Context, config.CancelFunc = context.WithCancel(Ctx)
if needListen {
util.Print(Green("HDL Listen at "), BrightBlue(config.ListenAddr), BrightBlue(config.ListenAddrTLS))
config.Listen(config)
}
}
func init() {
pconfig.Install(run)
if plugin := InstallPlugin(config); plugin != nil {
plugin.HandleApi("/list", util.GetJsonHandler(getHDList, time.Second))
plugin.HandleFunc("/pull", func(rw http.ResponseWriter, r *http.Request) {
util.CORS(rw, r)
targetURL := r.URL.Query().Get("target")
streamPath := r.URL.Query().Get("streamPath")
save := r.URL.Query().Get("save")
if err := PullStream(streamPath, targetURL); err == nil {
if save == "1" {
if config.AutoPullList == nil {
config.AutoPullList = make(map[string]string)
}
config.AutoPullList[streamPath] = targetURL
if err = plugin.Save(); err != nil {
util.Println(err)
}
}
rw.WriteHeader(200)
} else {
rw.WriteHeader(500)
}
})
plugin.HandleFunc("/", config.ServeHTTP)
}
}
func getHDList() (info []*Stream) {
for _, s := range Streams.ToList() {
if _, ok := s.ExtraProp.(*HDLPuller); ok {
if _, ok := s.Publisher.(*HDLPuller); ok {
info = append(info, s)
}
}
return
}
func run() {
http.HandleFunc("/api/hdl/list", func(rw http.ResponseWriter, r *http.Request) {
utils.CORS(rw, r)
if r.URL.Query().Get("json") != "" {
if jsonData, err := json.Marshal(getHDList()); err == nil {
rw.Write(jsonData)
} else {
rw.WriteHeader(500)
}
return
}
sse := utils.NewSSE(rw, r.Context())
var err error
for tick := time.NewTicker(time.Second); err == nil; <-tick.C {
err = sse.WriteJSON(getHDList())
}
})
http.HandleFunc("/api/hdl/pull", func(rw http.ResponseWriter, r *http.Request) {
utils.CORS(rw, r)
targetURL := r.URL.Query().Get("target")
streamPath := r.URL.Query().Get("streamPath")
save := r.URL.Query().Get("save")
if err := PullStream(streamPath, targetURL); err == nil {
if save == "1" {
if config.AutoPullList == nil {
config.AutoPullList = make(map[string]string)
}
config.AutoPullList[streamPath] = targetURL
if err = pconfig.Save(); err != nil {
utils.Println(err)
}
}
rw.WriteHeader(200)
} else {
rw.WriteHeader(500)
}
})
if config.ListenAddr != "" || config.ListenAddrTLS != "" {
utils.Print(Green("HDL start at "), BrightBlue(config.ListenAddr), BrightBlue(config.ListenAddrTLS))
utils.ListenAddrs(config.ListenAddr, config.ListenAddrTLS, config.CertFile, config.KeyFile, http.HandlerFunc(HDLHandler))
} else {
utils.Print(Green("HDL start reuse gateway port"))
http.HandleFunc("/hdl/", HDLHandler)
}
for streamPath, url := range config.AutoPullList {
if err := PullStream(streamPath, url); err != nil {
utils.Println(err)
}
}
}
func HDLHandler(w http.ResponseWriter, r *http.Request) {
// if err := AuthHooks.Trigger(sign); err != nil {
// w.WriteHeader(403)
// return
// }
utils.CORS(w, r)
func (config *HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
parts := streamPathReg.FindStringSubmatch(r.RequestURI)
if len(parts) == 0 {
w.WriteHeader(404)
@@ -108,9 +103,8 @@ func HDLHandler(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "video/x-flv")
w.Write(codec.FLVHeader)
sub := Subscriber{ID: r.RemoteAddr, Type: "FLV", Ctx2: r.Context()}
if err := sub.Subscribe(stringPath); err == nil {
sub := Subscriber{ID: r.RemoteAddr, Type: "FLV"}
if sub.Subscribe(stringPath, config.SubscribeConfig) {
vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack()
var buffer bytes.Buffer
if _, err := amf.WriteString(&buffer, "onMetaData"); err != nil {
@@ -128,32 +122,44 @@ func HDLHandler(w http.ResponseWriter, r *http.Request) {
"videodatarate": 0,
"filesize": 0,
}
if _, err := WriteEcmaArray(&buffer, metaData); err != nil {
return
}
var flags byte
if at != nil {
flags |= (1 << 2)
}
if vt != nil {
flags |= 1
}
w.Write([]byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9, 0, 0, 0, 0})
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_SCRIPT, 0, net.Buffers{buffer.Bytes()})
if vt != nil {
metaData["videocodecid"] = int(vt.CodecID)
metaData["width"] = vt.SPSInfo.Width
metaData["height"] = vt.SPSInfo.Height
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, 0, vt.ExtraData.Payload)
sub.OnVideo = func(ts uint32, pack *VideoPack) {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, ts, pack.Payload)
vt.DecoderConfiguration.FLV.WriteTo(w)
sub.OnVideo = func(frame *VideoFrame) error {
frame.FLV.WriteTo(w)
return r.Context().Err()
}
}
if at != nil {
metaData["audiocodecid"] = int(at.CodecID)
metaData["audiosamplerate"] = at.SoundRate
metaData["audiosamplesize"] = int(at.SoundSize)
metaData["audiosamplerate"] = at.SampleRate
metaData["audiosamplesize"] = at.SampleSize
metaData["stereo"] = at.Channels == 2
if at.CodecID == 10 {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, 0, at.ExtraData)
at.DecoderConfiguration.FLV.WriteTo(w)
}
sub.OnAudio = func(ts uint32, pack *AudioPack) {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, ts, pack.Payload)
sub.OnAudio = func(frame *AudioFrame) error {
frame.FLV.WriteTo(w)
return r.Context().Err()
}
}
if _, err := WriteEcmaArray(&buffer, metaData); err != nil {
return
}
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_SCRIPT, 0, buffer.Bytes())
sub.Play(at, vt)
} else {
w.WriteHeader(500)
}
}
func WriteEcmaArray(w amf.Writer, o amf.Object) (n int, err error) {

133
pull.go
View File

@@ -1,27 +1,29 @@
package hdl
import (
"errors"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
. "github.com/Monibuca/engine/v3"
"github.com/Monibuca/utils/v3/codec"
. "github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/codec"
"github.com/Monibuca/engine/v4/track"
)
func pull(at *AudioTrack, vt *VideoTrack, reader io.Reader, lastDisconnect uint32) (lastTime uint32) {
func (puller *HDLPuller) pull() {
head := make([]byte, len(codec.FLVHeader))
io.ReadFull(reader, head)
for startTime := time.Now(); ; {
if t, timestamp, payload, err := codec.ReadFLVTag(reader); err == nil {
io.ReadFull(puller, head)
startTime := time.Now()
for {
if t, timestamp, payload, err := codec.ReadFLVTag(puller); err == nil {
switch t {
case codec.FLV_TAG_TYPE_AUDIO:
at.PushByteStream(timestamp+lastDisconnect, payload)
puller.at.WriteAVCC(timestamp+puller.lastTs, payload)
case codec.FLV_TAG_TYPE_VIDEO:
vt.PushByteStream(timestamp+lastDisconnect, payload)
puller.vt.WriteAVCC(timestamp+puller.lastTs, payload)
}
if timestamp != 0 {
elapse := time.Since(startTime)
@@ -30,70 +32,83 @@ func pull(at *AudioTrack, vt *VideoTrack, reader io.Reader, lastDisconnect uint3
time.Sleep(time.Millisecond*time.Duration(timestamp) - elapse)
}
}
lastTime = timestamp
puller.lastTs = timestamp
} else {
puller.UnPublish()
return
}
}
}
type HDLPuller struct{}
type HDLPuller struct {
Publisher
lastTs uint32 //断线前的时间戳
at *track.UnknowAudio
vt *track.UnknowVideo
io.ReadCloser
}
func PullStream(streamPath, url string) error {
stream := Stream{
URL: url,
Type: "HDL Pull",
StreamPath: streamPath,
ExtraProp: &HDLPuller{},
}
if strings.HasPrefix(url, "http") {
if res, err := http.Get(url); err == nil {
if stream.Publish() {
at := stream.NewAudioTrack(0)
vt := stream.NewVideoTrack(0)
go func() {
lastTs := pull(at, vt, res.Body, 0)
if config.Reconnect {
for stream.Err() == nil {
time.Sleep(time.Second * 5)
lastTs = pull(at, vt, res.Body, lastTs)
}
} else {
stream.Close()
}
}()
func (puller *HDLPuller) Close() {
}
func (puller *HDLPuller) OnStateChange(old StreamState, n StreamState) bool {
switch n {
case STATE_PUBLISHING:
puller.at = puller.NewAudioTrack()
puller.vt = puller.NewVideoTrack()
if puller.Type == "HDL Pull" {
if res, err := http.Get(puller.String()); err == nil {
puller.ReadCloser = res.Body
} else {
return errors.New("Bad Name")
return false
}
} else {
return err
}
} else {
stream.Type = "FLV File"
if file, err := os.Open(url); err == nil {
if stream.Publish() {
at := stream.NewAudioTrack(0)
vt := stream.NewVideoTrack(0)
go func() {
file.Seek(int64(len(codec.FLVHeader)), io.SeekStart)
lastTs := pull(at, vt, file, 0)
if config.Reconnect {
for stream.Err() == nil {
file.Seek(int64(len(codec.FLVHeader)), io.SeekStart)
lastTs = pull(at, vt, file, lastTs)
}
} else {
file.Close()
stream.Close()
}
}()
if file, err := os.Open(puller.String()); err == nil {
file.Seek(int64(len(codec.FLVHeader)), io.SeekStart)
puller.ReadCloser = file
} else {
file.Close()
return errors.New("Bad Name")
return false
}
} else {
return err
}
go puller.pull()
case STATE_WAITPUBLISH:
if config.AutoReconnect {
if puller.Type == "HDL Pull" {
if res, err := http.Get(puller.String()); err == nil {
puller.ReadCloser = res.Body
} else {
return true
}
} else {
if file, err := os.Open(puller.String()); err == nil {
file.Seek(int64(len(codec.FLVHeader)), io.SeekStart)
puller.ReadCloser = file
} else {
file.Close()
return true
}
go puller.pull()
}
}
}
return true
}
func PullStream(streamPath, address string) (err error) {
puller := &HDLPuller{}
puller.PullURL, err = url.Parse(address)
if err != nil {
return
}
puller.Config = config.PublishConfig
if strings.HasPrefix(puller.Scheme, "http") {
puller.Type = "HDL Pull"
puller.Publish(streamPath, puller)
} else {
puller.Type = "FLV File"
puller.Publish(streamPath, puller)
}
return nil
}