commit 06816dba9d34255d3854b54b57070d5e178331b2 Author: erroot Date: Sun Feb 25 11:00:47 2024 +0800 v20240225110000 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c080bc5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 Monibuca + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..cced523 --- /dev/null +++ b/README.md @@ -0,0 +1,60 @@ +# 截图插件 + + + +## 插件地址 + +https://github.com/Monibuca/plugin-transform + + +http://127.0.0.1:8088/transform?streampath=njtv/njy&transtype=0&newstreampath=njtv/njy-tsh264&videocodec=libx264&osdtext=API动态添加h264转码&resolution=388*256 + +参数 +streampath: 订阅流地址(m7s 内部流地址) +newstreampath: 转码发布的新流地址 +videocodec: 转码流编码 libx264 、 libx265 +osdtext: 自定义叠加文字 默认“M7S转码” +osdfontcolor: 叠加文字颜色 默认green +osdfontsize: 叠加文字颜色 默认100 +osdy: 与osdX配合使用叠加文字位置 +osdx: 与osdY配合使用叠加文字位置 +osdbox: 叠加背景框 默认0, 1可选 +osdboxcolor: 叠加背颜色 默认yellow +resolution: 转码分辨率 格式w*h eg:720*576 + + +## 插件引入 +```go +import ( + _ "m7s.live/plugin/transform/v4" +) +``` +## 默认配置 + +```yaml + +transform: + ffmpeg: ffmpeg.exe + fontfile: "SIMHEI.TTF" #不支持绝对路线需要把系统字符复制到程序执行的相对目录下原因未知 + publishtimeout: 20s + onstart: # 服务启动时自动订阅m7s 系统流进行转码 + - + streampath: "njtv/glgc" + resolution: "320*240" + videocodec: "libx264" # 当前仅支持libx264 libxh265 转码后发布流无法播放, + osdfontcolor: "green" + osdText: "默认配置M7S转码" + osdbox: 1 + osdboxcolor: "yellow" + - + streampath: "live/305" + resolution: "320*240" + videocodec: "libx264" + osdfontcolor: "red" +``` +如果ffmpeg无法全局访问,则可修改ffmpeg路径为本地的绝对路径 +## API + +### `/transform/[streamPath]` + + diff --git a/default.yaml b/default.yaml new file mode 100644 index 0000000..48c5da2 --- /dev/null +++ b/default.yaml @@ -0,0 +1,19 @@ + +transform: + ffmpeg: ffmpeg.exe + fontfile: "SIMHEI.TTF" #不支持绝对路线需要把系统字符复制到程序执行的相对目录下原因未知 + publishtimeout: 20s + onstart: # 服务启动时自动拉流 + - + streampath: "njtv/glgc" + resolution: "320*240" + videocodec: "libx264" + osdfontcolor: "green" + osdText: "默认配置M7S转码" + osdbox: 1 + osdboxcolor: "yellow" + - + streampath: "live/305" + resolution: "320*240" + videocodec: "libx264" + osdfontcolor: "red" \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1172c58 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module m7s.live/plugin/transform/v4 + +go 1.18 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f19520f --- /dev/null +++ b/go.sum @@ -0,0 +1,83 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= +github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/Monibuca/engine/v3 v3.5.1 h1:3AX+FwxerMw3JuyGXIOd/1dYCjA3IzWLKH/zq/GWe20= +github.com/Monibuca/engine/v3 v3.5.1/go.mod h1:yNiVKeHxgv+Ez+f2RHXMkXoa5Oxv+G7Ch+MJdHi7ing= +github.com/Monibuca/utils/v3 v3.0.5/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= +github.com/Monibuca/utils/v3 v3.0.6 h1:Ya1KjxmirzHnaLneOgbopzMm9NhJtZxD4XqePefOYZw= +github.com/Monibuca/utils/v3 v3.0.6/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= +github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY= +github.com/cnotch/ipchub v1.1.0 h1:hH0lh2mU3AZXPiqMwA0pdtqrwo7PFIMRGush9OobMUs= +github.com/cnotch/ipchub v1.1.0/go.mod h1:2PbeBs2q2VxxTVCn1eYCDwpAWuVXbq1+N0FU7GimOH4= +github.com/cnotch/loader v0.0.0-20200405015128-d9d964d09439/go.mod h1:oWpDagHB6p+Kqqq7RoRZKyC4XAXft50hR8pbTxdbYYs= +github.com/cnotch/queue v0.0.0-20200326024423-6e88bdbf2ad4/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg= +github.com/cnotch/queue v0.0.0-20201224060551-4191569ce8f6/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg= +github.com/cnotch/scheduler v0.0.0-20200522024700-1d2da93eefc5/go.mod h1:F4GE3SZkJZ8an1Y0ZCqvSM3jeozNuKzoC67erG1PhIo= +github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl3sR3yMGgxYupMn2btzdtJUwoxFPUE5E= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= +github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= +github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= +github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE= +github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es= +github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k= +github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= +github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= +github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA= +github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= +github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= +github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..d2ea914 --- /dev/null +++ b/main.go @@ -0,0 +1,759 @@ +package transform + +import ( + _ "embed" + "encoding/hex" + "errors" + "fmt" + "io" + "os" + "sync" + "time" + + "log" + "net/http" + "os/exec" + "strconv" + + "go.uber.org/zap" + . "m7s.live/engine/v4" + "m7s.live/engine/v4/config" +) + +//go:embed default.yaml +var defaultYaml DefaultYaml + +var tanfsTaskArray = make(map[string]*TransformTask) + +type TransformConfig struct { + DefaultYaml + config.Publish + config.Subscribe + Ffmpeg string `default:"ffmpeg" desc:"ffmpeg的路径 "` + Path string //存储路径 + Filter string //过滤器 + Fontfile string `default:"shoujin.ttf" desc:"叠加字体路径 "` //osd 叠加字帖路径 shoujin.ttf + //OnStart []string `desc:"启动时转码的列表"` // 启动时转码的列表 + + OnStart []StreamConfig `yaml:"onstart"` +} + +// 用此包解析yaml文件到结构体存在三个问题。 +// yaml文件中的配置项首字母不能大写,配置项名字要求比较严格。 +// 没有默认值。 +// 没有数值校验。 + +type StreamConfig struct { + TransType int `default:"2" yaml:"transtype"` //转码类型 0: rtsp pull rtmp push; 1: sub raw frame rtmp push; 2: sub raw frame ts publiser; + StreamPath string `default:"" yaml:"streampath"` + NewStreamPath string `default:"" yaml:"newstreampath"` + Resolution string `default:"720*576" yaml:"resolution"` + + VideoCodec string `default:"libx264" yaml:"videocodec"` //libx264, libx265 + Fps string `default:"25" yaml:"fps"` + + HasOsd bool `default:"false" yaml:"hasosd"` + OsdText string `default:"M7S 转码" yaml:"osdtext"` + OsdFontsize int `default:"100" yaml:"osdfontsize"` + OsdFontColor string `default:"white" yaml:"osdfontcolor"` + OsdX int `default:"50" yaml:"osdx"` + OsdY int `default:"50" yaml:"osdy"` + OsdBox int `default:"1" yaml:"osdbox"` + OsdBoxcolor string `default:"yellow" yaml:"osdboxcolor"` +} + +type TransformTask struct { + plugin *TransformConfig + + status int //0 :idel ; 1 input ing; 2 output ing + + //统计信息 + restartFFCount int + rePullCount int + + streamConfig StreamConfig + + atTime time.Time //开始时间 + cmd *exec.Cmd + + p *TransformPublisher + s *TransformSubscriber + + //两个管道一个输入 + in_wp io.WriteCloser + in_bytes int + + out_rp io.ReadCloser + out_bytes int + + mt sync.Mutex + + f *os.File +} + +type TransformPublisher struct { + TSPublisher + tsReader *TSReader + task *TransformTask +} + +func (p *TransformPublisher) Delete() { + p.Stop() + if p.tsReader != nil { + p.tsReader.Close() + } +} + +func (p *TransformPublisher) OnEvent(event any) { + switch event.(type) { + case IPublisher: + p.Stream.NeverTimeout = true + log.Println("TransformPublisher OnEvent IPublisher...") + p.TSPublisher.OnEvent(event) + case SEKick, SEclose: + log.Println("TransformPublisher SEclose IPublisher...") + p.Publisher.OnEvent(event) + default: + p.Publisher.OnEvent(event) + } +} + +func (t *TransformConfig) OnEvent(event any) { + switch v := event.(type) { + case IPublisher: + log.Println("TransformConfig OnEvent IPublisher...") + case FirstConfig: + log.Println("transform FirstConfig") + for _, stream := range t.OnStart { + t.SetUpTransformTask(stream) + } + break + case config.Config: + log.Println("transform config.Config") + break + case SEclose: + log.Println("transform SEclose:%s", v.Target.Path) + break + // case SEpublish: + // log.Println("transform SEpublish:%s", v.Stream.Path) + // break + } +} + +var conf = &TransformConfig{ + DefaultYaml: defaultYaml, +} + +var TransformPlugin = InstallPlugin(conf) + +func (t *TransformConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { + //streamPath := strings.TrimPrefix(r.RequestURI, "/transform/") + + streamConfig := StreamConfig{ + TransType: 0, + StreamPath: "", + NewStreamPath: "", + Resolution: "352*288", //CIF 352*288 qcif 176×144 320*240 1280*720 + Fps: "25", + VideoCodec: "libx264", //libx264,libx265 + + HasOsd: false, + OsdText: "M7S 转码", + OsdFontColor: "green", + OsdFontsize: 100, + OsdX: 50, + OsdY: 50, + OsdBox: 0, + OsdBoxcolor: "yellow", + } + + streamPath := r.URL.Query().Get("streampath") + streamConfig.StreamPath = streamPath + + transType, err := strconv.Atoi(r.URL.Query().Get("transtype")) + if err == nil { + streamConfig.TransType = transType + } + //输出分辨率 + if r.URL.Query().Has("resolution") { + streamConfig.Resolution = r.URL.Query().Get("resolution") + } + //输出编码格式 + if r.URL.Query().Has("videocodec") { + streamConfig.VideoCodec = r.URL.Query().Get("videocodec") + } + //输出分辨率 + if r.URL.Query().Has("fps") { + streamConfig.Fps = r.URL.Query().Get("fps") + } + + if r.URL.Query().Has("osdtext") { + streamConfig.OsdText = r.URL.Query().Get("osdtext") + streamConfig.HasOsd = true + } + + if r.URL.Query().Has("osdx") { + streamConfig.OsdX, err = strconv.Atoi(r.URL.Query().Get("osdx")) + } + if r.URL.Query().Has("osdy") { + streamConfig.OsdY, err = strconv.Atoi(r.URL.Query().Get("osdy")) + } + if r.URL.Query().Has("osdboxcolor") { + streamConfig.OsdBoxcolor = r.URL.Query().Get("osdboxcolor") + } + if r.URL.Query().Has("osdbox") { + streamConfig.OsdBox, err = strconv.Atoi(r.URL.Query().Get("osdbox")) + } + + if r.URL.Query().Has("newstreampath") { + streamConfig.NewStreamPath = r.URL.Query().Get("newstreampath") + } + + t.SetUpTransformTask(streamConfig) + + w.Write([]byte("ok")) +} + +func (t *TransformConfig) SetDefaultStreamConfig(config *StreamConfig) { + // streamConfig := StreamConfig{ + // TransType: 0, + // StreamPath: "", + // NewStreamPath: "", + // Resolution: "352*288", //CIF 352*288 qcif 176×144 320*240 1280*720 + // Fps: "25", + // VideoCodec: "libx264", //libx264,libx265 + + // HasOsd: false, + // OsdText: "M7S 转码", + // OsdFontColor: "green", + // OsdFontsize: 100, + // OsdX: 50, + // OsdY: 50, + // OsdBox: 0, + // OsdBoxcolor: "yellow", + // } + + if config.Fps == "" { + config.Fps = "25" + } + + if config.Resolution == "" { + config.Resolution = "352*288" + } + + if config.VideoCodec == "" { + config.VideoCodec = "libx264" + } + + if config.OsdText == "" { + config.OsdText = "M7S 转码" + } + + if config.OsdFontColor == "" { + config.OsdFontColor = "green" + } + + if config.OsdFontsize == 0 { + config.OsdFontsize = 100 + } + + if config.OsdX == 0 { + config.OsdX = 100 + } + + if config.OsdY == 0 { + config.OsdY = 100 + } + + if config.OsdBoxcolor == "" { + config.OsdBoxcolor = "yellow" + } + +} + +func (t *TransformConfig) SetUpTransformTask(config StreamConfig) { + //更新默认配置 + t.SetDefaultStreamConfig(&config) + + task := &TransformTask{ + plugin: t, + streamConfig: config, + } + + if task.streamConfig.StreamPath == "" { + TransformPlugin.Info("stream transform invalid\n", zap.String("streamPath", task.streamConfig.StreamPath)) + return + } + + if task.streamConfig.NewStreamPath == "" { + typeStr := strconv.FormatInt(int64(task.streamConfig.TransType), 10) + task.streamConfig.NewStreamPath = task.streamConfig.StreamPath + "-ts" + typeStr + } + + if tanfsTaskArray[task.streamConfig.NewStreamPath] != nil { + TransformPlugin.Info("stream transform\n", zap.String("streamPath", task.streamConfig.NewStreamPath)) + return + } + + task.atTime = time.Now() + tanfsTaskArray[task.streamConfig.NewStreamPath] = task + + switch task.streamConfig.TransType { + case 0: + go task.setupFfmpegTransformThrd0() + case 1: + go task.setupFfmpegTransformThrd1() + case 2: + go task.setupFfmpegTransformThrd2() + } + + return + +} + +// 重点方案,增加ffmpeg 进程异常退出重启功能 默认方法 +// 学习stream 码流订阅用法 +// 学习stream 码流发布用法 +func (t *TransformTask) setupFfmpegTransformThrd0() { + TransformPlugin.Info("setupFfmpegTransformThrd0 pipe in and out...") + + //添加一个循环 避免ffmpeg 进程异常退出,退出后自动重新启动 + for { + t.status = 0 + + //ffmpeg 启动次数+1 + t.restartFFCount++ + + osdText := "" + //"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow", + + if t.streamConfig.OsdText != "" { + osdText += fmt.Sprintf("drawtext=fontsize=%d:fontfile=%s:text='%s':x=%d:y=%d:fontcolor=%s", + t.streamConfig.OsdFontsize, + t.plugin.Fontfile, + t.streamConfig.OsdText, + t.streamConfig.OsdX, + t.streamConfig.OsdY, + t.streamConfig.OsdFontColor) + } + + if t.streamConfig.OsdBox != 0 && t.streamConfig.OsdBoxcolor != "" { + osdText += fmt.Sprintf(":box=1:boxcolor=%s", + t.streamConfig.OsdBoxcolor) + } + + TransformPlugin.Info(osdText) + + cmd := exec.Command(conf.Ffmpeg, "-re", + "-i", "pipe:0", + "-tune", "zerolatency", //编码延迟参数 + //"-vcodec", t.videoCodec, + //"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔 + //"-b:v", "400k", + //"-preset", "superfast", //编码延迟参数,superfast ultrafast 影响图像质量 + "-s", t.streamConfig.Resolution, + "-r", t.streamConfig.Fps, + "-vf", + osdText, + "-c:v", t.streamConfig.VideoCodec, + //"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow", + //"-acodec", "libfaac", + //"-b:a", "64k", + "-acodec", "copy", + "-f", + "mpegts", //TS + "pipe:1", + ) + + TransformPlugin.Info(cmd.String()) + + // cmd := exec.Command(conf.Ffmpeg, "-re", + // "-i", "pipe:0", + // "-tune", "zerolatency", //编码延迟参数 + // "-vcodec", "libx264", + // "-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔 + // "-b:v", "400k", + // "-preset", "superfast", //编码延迟参数,superfast ultrafast 影响图像质量 + // "-s", "320*240", + // "-r", "25", + // "-vf", + // "drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow", + // "-acodec", "libfaac", + // "-b:a", "64k", + // "-f", + // //"h264", + // //"flv", + // "mpegts", //TS + // //"mpeg", //ps + // "pipe:1", + // //"-y", + // //"transform.flv", + // ) + + //获取输入流 + stdin, err := cmd.StdinPipe() + if err != nil { + TransformPlugin.Error("Error getting stdin pipe:", zap.Error(err)) + continue + } + + t.cmd = cmd + + t.in_wp = stdin + + //获取输出流 句柄 + stdout, err := cmd.StdoutPipe() + if err != nil { + TransformPlugin.Error("Error getting stdout pipe:", zap.Error(err)) + continue + } + t.out_rp = stdout + + // Start the command + err = cmd.Start() + if err != nil { + TransformPlugin.Error("Error starting command:", zap.Error(err)) + continue + } + + //优先启动读管道数据进程 + go t.readFFPipe1AndToPublisher() + + //定义一个订阅者 + s := &TransformSubscriber{} + //s.IsInternal = true + s.task = t + t.s = s + + if err := TransformPlugin.Subscribe(t.streamConfig.StreamPath, s); err != nil { + TransformPlugin.Error("TransformPlugin 2 Subscribe faild") + continue + } else { + //重点需要goroutin 启动订阅流,且只订阅了video track 裸流 + //避免重复请求播放 + if !s.IsPlaying() { + TransformPlugin.Info("TransformPlugin Subscribe sucess 2 play") + go s.PlayRaw() + } + } + + TransformPlugin.Info("cmd Start wait end....\n") + err = cmd.Wait() + if err != nil { + TransformPlugin.Error("Error Wait command:", zap.Error(err)) + } + //复位读写指针 + t.out_rp = nil + t.in_wp = nil + + //关闭订阅流 + if t.s != nil { + TransformPlugin.Info("try to close TransformSubscriber") + t.s.Delete() + t.s = nil + } + //关闭发布流 + if t.p != nil { + t.p.Delete() + t.p = nil + TransformPlugin.Info("try to close TransformPublisher") + } + TransformPlugin.Info("ffmpegTransformThrd end to restart", zap.Int("restartFFCount", t.restartFFCount)) + + //延迟后重启 + time.Sleep(time.Duration(1000) * time.Millisecond) + + } + + //TransformPlugin.Info("ffmpeg task end...") + + //t.taskEnd("ffmpeg cmd end") +} + +// 订阅的Track数据写入ffmpeg 输入管道 +func (t *TransformTask) writeToFFPipe0(buf []byte) { + + //左闭右开 + // n := len(buf) + // if n > 5 && binary.BigEndian.Uint32(buf[0:5]) == 1 && buf[4]&0x1f == 7 { + // log.Printf("pipe in SPS:%d, [%v]\n", + // len(buf), hex.EncodeToString(buf)) + // } else if n > 5 && binary.BigEndian.Uint32(buf[0:5]) == 1 && buf[4]&0x1f == 8 { + // log.Printf("pipe in PPS:%d, [%v]\n", + // len(buf), hex.EncodeToString(buf)) + // } else { + // if n > 5 { + // n = 5 + // } + // log.Printf("pipe in raw:%d, [%v]\n", + // len(buf), hex.EncodeToString(buf[0:n])) + // } + + t.status = 1 + if t.in_wp == nil { + TransformPlugin.Warn("invalid in pipe wp") + return + } + t.in_bytes += len(buf) + _, err := t.in_wp.Write(buf) + if err != nil { + TransformPlugin.Error("write to pipe0 failed:", zap.Error(err)) + } +} + +// ffmpeg 转码后的ts 流 发布 stream +func (t *TransformTask) readFFPipe1AndToPublisher() { + //s.readTsDataFromPipeOut() + + if t.out_rp == nil { + //time.Sleep(time.Duration(50) * time.Microsecond) + //continue + return + } + + if t.p == nil { + //发布一个新的转码流 + //定义一个发布者 + p := &TransformPublisher{} + p.task = t + t.p = p + + //判断流是否存在,存在则删除重新发布 + s := Streams.Get(t.streamConfig.NewStreamPath) + if s != nil { + Streams.Delete(t.streamConfig.NewStreamPath) + } + TransformPlugin.Info("TransformTask TSPublisher", zap.String("newStreamPath", t.streamConfig.NewStreamPath)) + if err := TransformPlugin.Publish(t.streamConfig.NewStreamPath, p); err != nil { + TransformPlugin.Error("TransformTask publish:", zap.Error(err)) + return + } + p.AudioTrack = nil + p.VideoTrack = nil + + //buf := make([]byte, 64*1024) + p.tsReader = NewTSReader(&p.TSPublisher) + //defer tsReader.Close() + + //很重要这一步 + //ffmpeg restart t.out_rp 会发生变化 + if p.TSPublisher.IO.Reader != t.out_rp { + TransformPlugin.Info("TSPublisher SetIO...") + p.TSPublisher.SetIO(t.out_rp) + } + } + + p := t.p + + for { + if t.out_rp == nil { + TransformPlugin.Info("TransformTask TSPublisher no out rp valid exit thrd") + break + } + //err := tsReader.Feed(t.out_rp) + //很重要这一步 + if p != nil && p.tsReader != nil { + err := p.tsReader.Feed(p) + if err == errors.New("file already closed") { + TransformPlugin.Error("tsReader.Feed:", zap.Error(err)) + break + } + if err != nil { + TransformPlugin.Error("tsReader.Feed:", zap.Error(err)) + //避免循环快速打印 + time.Sleep(time.Duration(500) * time.Millisecond) + // + } + + } + + //log.Println("p end Feed...") + } + +} + +// 为了验证测试,使用者自己加固,异常处理 +func (t *TransformTask) setupFfmpegTransformThrd1() { + TransformPlugin.Info("setupFfmpegTransformThrd1 url in and rtmp out...") + //转码并缩放 + //成功 + //ffmpeg -ss 0:01 -i "rtsp://127.0.0.1:554/njtv/glgc" -vcodec copy -vcodec libx264 -s 720*576 -f flv "rtmp://127.0.0.1:1935/njtv/glgc-d1" + + pullpath := "rtsp://127.0.0.1:554/" + t.streamConfig.StreamPath + tspath := "rtmp://127.0.0.1:1935/" + t.streamConfig.NewStreamPath + + //转码并保存 + //cmd := exec.Command("ffmpeg", "-re", "-r", "30", "-i", "pipe:0", "-vcodec", "libx264", "-f", "flv", "pipe:1", "-y", "another.flv") + cmd := exec.Command(conf.Ffmpeg, "-re", + "-i", pullpath, + //"-vcodec", "copy", + "-tune", "zerolatency", //编码延迟参数 + "-vcodec", "libx264", + "-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔 + "-b:v", "400k", + "-preset", "superfast", //编码延迟参数,superfast ultrafast 影响图像质量 + "-s", "320*240", + "-r", "25", + "-vf", + "drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts0':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow", + "-acodec", "libfaac", + "-b:a", "64k", + "-f", + "flv", + tspath, + ) + + t.cmd = cmd + // Start the command + err := cmd.Start() + if err != nil { + fmt.Println("Error starting command:", err) + return + } + log.Printf("cmd Start wait end....\n") + err = cmd.Wait() + if err != nil { + fmt.Println("Error Wait command:", err) + } + log.Printf("ffmpegTransformThrd end\n") + + t.taskEnd("cmd end") +} + +// 为了验证测试,使用者自己加固,异常处理 +func (t *TransformTask) setupFfmpegTransformThrd2() { + TransformPlugin.Info("setupFfmpegTransformThrd2 pipe in and rtmp out...") + s := &TransformSubscriber{} + //s.IsInternal = true + s.task = t + + t.s = s + + // sub.onSubscriberSucess() + if err := TransformPlugin.Subscribe(t.streamConfig.StreamPath, s); err != nil { + TransformPlugin.Error("TransformPlugin 1 Subscribe faild") + t.taskEnd("Subscribe faild") + return + } else { + TransformPlugin.Info("TransformPlugin Subscribe sucess") + //重点需要goroutin + go s.PlayRaw() + } + + cmd := exec.Command(conf.Ffmpeg, "-re", + "-i", "pipe:0", + "-tune", "zerolatency", //编码延迟参数 + "-vcodec", "libx264", + "-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔 + "-b:v", "400k", + "-preset", "superfast", //编码延迟参数,superfast ultrafast 影响图像质量 + "-s", "320*240", + "-r", "25", + "-vf", + "drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts1':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow", + "-acodec", "libfaac", + "-b:a", "64k", + "-f", + "flv", + "rtmp://127.0.0.1:1935/"+t.streamConfig.NewStreamPath, + ) + + t.cmd = cmd + //获取输入流 + stdin, err := cmd.StdinPipe() + if err != nil { + fmt.Println("Error getting stdin pipe:", err) + return + } + t.in_wp = stdin + + // Start the command + err = cmd.Start() + if err != nil { + fmt.Println("Error starting command:", err) + return + } + log.Printf("cmd Start wait end....\n") + err = cmd.Wait() + if err != nil { + fmt.Println("Error Wait command:", err) + } + log.Printf("ffmpegTransformThrd end\n") + + t.taskEnd("cmd end") +} + +func (t *TransformTask) taskEnd(reason string) { + + tanfsTaskArray[t.streamConfig.NewStreamPath] = nil + + if t.p != nil { + t.p.Stop() + t.p = nil + } + if t.s != nil { + t.s.Stop() + t.s = nil + } + + log.Printf(fmt.Sprintf("task:%s end for:%s\n", t.streamConfig.NewStreamPath, reason)) +} + +func (t *TransformTask) debugPrintfNal(buf []byte, name string) { + n := len(buf) + if n > 5 { + n = 5 + } + log.Printf("nal %s out len:%d, [%v]\n", + name, + len(buf), hex.EncodeToString(buf[0:n])) + + // s.Info("nal :", + // zap.String("int_bytes", name), + // zap.Int("int_bytes", len(buf)), + // zap.String("int_bytes", hex.EncodeToString(buf[0:n]))) +} + +func (t *TransformTask) writeTmpFile(data []byte) { + + if t.f == nil { + f, err := os.Create("tranform-tmp.ps") + if f == nil { + log.Println("create file faild:%v", err) + return + } + t.f = f + } + + _, err2 := t.f.Write(data) + if err2 != nil { + log.Println("Write file faild:%v", err2) + return + } + + if t.out_bytes > 1024*1024*5 { + t.f.Close() + os.Exit(0) + } + +} + +/* +func (s *TransformSubscriber) OnEvent(event any) { + switch v := event.(type) { + case VideoFrame: + //s.Stop() + // var errOut util.Buffer + // firstFrame := v.GetAnnexB() + // cmd := exec.Command(conf.FFmpeg, "-hide_banner", "-i", "pipe:0", "-vframes", "1", "-f", "mjpeg", "pipe:1") + // cmd.Stdin = &firstFrame + // cmd.Stderr = &errOut + // cmd.Stdout = s + // cmd.Run() + // if errOut.CanRead() { + // s.Info(string(errOut)) + // } + default: + s.Subscriber.OnEvent(event) + } +}*/ diff --git a/subscriber.go b/subscriber.go new file mode 100644 index 0000000..31d391e --- /dev/null +++ b/subscriber.go @@ -0,0 +1,131 @@ +package transform + +import ( + "encoding/hex" + "fmt" + "log" + + "go.uber.org/zap" + . "m7s.live/engine/v4" + "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/track" +) + +type TransformSubscriber struct { + Subscriber + task *TransformTask +} + +func (s *TransformSubscriber) Delete() { + s.Stop(zap.String("reason", "for restart")) +} + +func sliceAppend(s1 []byte, s2 []byte) []byte { + l1 := len(s1) + l2 := len(s2) + + if cap(s1) < l2+l1 { + return s1 + } + + for i := 0; i < len(s2); i++ { + s1[l1+i] = s2[i] + } + + return s1 +} + +func (s *TransformSubscriber) OnEvent(event any) { + + //获取转码流发布者 + t := s.task + + //s.Stream.Path + switch v := event.(type) { + case *track.Video: + fmt.Println("=====> write track.Video to publisher") + if s.Video != nil { + return + } + switch v.CodecID { + case codec.CodecID_H264: + fmt.Println("=====> CodecID_H264 on sub:", v.PayloadType) + // vt := p.VideoTrack + // vt = track.NewH264(p.Stream, v.PayloadType) + // p.VideoTrack = vt + log.Printf("pipe in SPS:%d, [%v]\n", + len(v.ParamaterSets[0]), hex.EncodeToString(v.ParamaterSets[0])) + + log.Printf("pipe in PPS:%d, [%v]\n", + len(v.ParamaterSets[1]), hex.EncodeToString(v.ParamaterSets[1])) + + //2023/04/02 17:07:51 pipe in SPS:35, [6764001fac2ca4014016ec04400000fa000030d43800001e848000186a02ef2e0fa489] + //2023/04/02 17:07:51 pipe in PPS:4, [68eb8f2c] + nal := []byte{0, 0, 0, 1} + //SPS + if len(v.ParamaterSets[0]) > 0 { + //vt.WriteSliceBytes(v.ParamaterSets[0]) + + t.writeToFFPipe0(append(nal, v.ParamaterSets[0]...)) + } + //PPS: + if len(v.ParamaterSets[1]) > 0 { + //vt.WriteSliceBytes(v.ParamaterSets[1]) + t.writeToFFPipe0(append(nal, v.ParamaterSets[1]...)) + } + case codec.CodecID_H265: + fmt.Println("=====> CodecID_H265 on sub") + // vt := p.VideoTrack + // vt = track.NewH265(p.Stream, v.PayloadType) + // p.VideoTrack = vt + // //VPS + // if len(v.ParamaterSets[0]) > 0 { + // vt.WriteSliceBytes(v.ParamaterSets[0]) + // } + // //SPS + // if len(v.ParamaterSets[1]) > 0 { + // vt.WriteSliceBytes(v.ParamaterSets[1]) + // } + // //PPS: + // if len(v.ParamaterSets[2]) > 0 { + // vt.WriteSliceBytes(v.ParamaterSets[2]) + // } + } + s.AddTrack(v) + case *track.Audio: + if s.Audio != nil { + return + } + fmt.Println("=====> write *track.Audio to publisher") + //p.VideoTrack.WriteAnnexB(v.PTS, v.DTS, v.GetAnnexB()[0]) + s.AddTrack(v) + case VideoFrame: + //fmt.Println("=====> write VideoFrame to publisher") + firstFrame := v.GetAnnexB() + // log.Printf("pipe in PTS:%d,DTS:%d buf num:%d\n", + // v.PTS, v.DTS, len(firstFrame)) + + for _, buf := range firstFrame { + // log.Printf("pipe in PTS:%d,DTS:%d buf len:%d,\n", + // v.PTS, v.DTS, len(buf)) + //s.debugPrintfNal(buf, "on sub frame") + //p.VideoTrack.WriteAnnexB(v.PTS, v.DTS, buf) + t.writeToFFPipe0(buf) + } + case VideoRTP: + fmt.Println("=====> on subscribe VideoRTP") + //p.WritePacketRTP(s.videoTrack, v.Packet) + //p.VideoTrack.WriteRTPPack(v.Packet) + case AudioRTP: + fmt.Println("=====> on subscribe AudioRTP to") + //s.stream.WritePacketRTP(s.audioTrack, v.Packet) + //p.AudioTrack.WriteRTPPack(v.Packet) + case ISubscriber: + //代表订阅成功事件,v就是p + fmt.Println("=====> begain Subscriber sucess") + default: + s.Subscriber.OnEvent(event) + + //fmt.Println("TransformSubscriber OnEvent:%T", v) + } +}