mirror of
https://github.com/erroot/plugin-transform.git
synced 2025-09-27 03:06:31 +08:00
760 lines
19 KiB
Go
760 lines
19 KiB
Go
package transform
|
||
|
||
import (
|
||
_ "embed"
|
||
"encoding/hex"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"os"
|
||
"sync"
|
||
"time"
|
||
|
||
"log"
|
||
"net/http"
|
||
"os/exec"
|
||
"strconv"
|
||
|
||
"go.uber.org/zap"
|
||
. "m7s.live/engine/v4"
|
||
"m7s.live/engine/v4/config"
|
||
)
|
||
|
||
//go:embed default.yaml
|
||
var defaultYaml DefaultYaml
|
||
|
||
var tanfsTaskArray = make(map[string]*TransformTask)
|
||
|
||
type TransformConfig struct {
|
||
DefaultYaml
|
||
config.Publish
|
||
config.Subscribe
|
||
Ffmpeg string `default:"ffmpeg" desc:"ffmpeg的路径 "`
|
||
Path string //存储路径
|
||
Filter string //过滤器
|
||
Fontfile string `default:"shoujin.ttf" desc:"叠加字体路径 "` //osd 叠加字帖路径 shoujin.ttf
|
||
//OnStart []string `desc:"启动时转码的列表"` // 启动时转码的列表
|
||
|
||
OnStart []StreamConfig `yaml:"onstart"`
|
||
}
|
||
|
||
// 用此包解析yaml文件到结构体存在三个问题。
|
||
// yaml文件中的配置项首字母不能大写,配置项名字要求比较严格。
|
||
// 没有默认值。
|
||
// 没有数值校验。
|
||
|
||
type StreamConfig struct {
|
||
TransType int `default:"2" yaml:"transtype"` //转码类型 0: rtsp pull rtmp push; 1: sub raw frame rtmp push; 2: sub raw frame ts publiser;
|
||
StreamPath string `default:"" yaml:"streampath"`
|
||
NewStreamPath string `default:"" yaml:"newstreampath"`
|
||
Resolution string `default:"720*576" yaml:"resolution"`
|
||
|
||
VideoCodec string `default:"libx264" yaml:"videocodec"` //libx264, libx265
|
||
Fps string `default:"25" yaml:"fps"`
|
||
|
||
HasOsd bool `default:"false" yaml:"hasosd"`
|
||
OsdText string `default:"M7S 转码" yaml:"osdtext"`
|
||
OsdFontsize int `default:"100" yaml:"osdfontsize"`
|
||
OsdFontColor string `default:"white" yaml:"osdfontcolor"`
|
||
OsdX int `default:"50" yaml:"osdx"`
|
||
OsdY int `default:"50" yaml:"osdy"`
|
||
OsdBox int `default:"1" yaml:"osdbox"`
|
||
OsdBoxcolor string `default:"yellow" yaml:"osdboxcolor"`
|
||
}
|
||
|
||
type TransformTask struct {
|
||
plugin *TransformConfig
|
||
|
||
status int //0 :idel ; 1 input ing; 2 output ing
|
||
|
||
//统计信息
|
||
restartFFCount int
|
||
rePullCount int
|
||
|
||
streamConfig StreamConfig
|
||
|
||
atTime time.Time //开始时间
|
||
cmd *exec.Cmd
|
||
|
||
p *TransformPublisher
|
||
s *TransformSubscriber
|
||
|
||
//两个管道一个输入
|
||
in_wp io.WriteCloser
|
||
in_bytes int
|
||
|
||
out_rp io.ReadCloser
|
||
out_bytes int
|
||
|
||
mt sync.Mutex
|
||
|
||
f *os.File
|
||
}
|
||
|
||
type TransformPublisher struct {
|
||
TSPublisher
|
||
tsReader *TSReader
|
||
task *TransformTask
|
||
}
|
||
|
||
func (p *TransformPublisher) Delete() {
|
||
p.Stop()
|
||
if p.tsReader != nil {
|
||
p.tsReader.Close()
|
||
}
|
||
}
|
||
|
||
func (p *TransformPublisher) OnEvent(event any) {
|
||
switch event.(type) {
|
||
case IPublisher:
|
||
p.Stream.NeverTimeout = true
|
||
log.Println("TransformPublisher OnEvent IPublisher...")
|
||
p.TSPublisher.OnEvent(event)
|
||
case SEKick, SEclose:
|
||
log.Println("TransformPublisher SEclose IPublisher...")
|
||
p.Publisher.OnEvent(event)
|
||
default:
|
||
p.Publisher.OnEvent(event)
|
||
}
|
||
}
|
||
|
||
func (t *TransformConfig) OnEvent(event any) {
|
||
switch v := event.(type) {
|
||
case IPublisher:
|
||
log.Println("TransformConfig OnEvent IPublisher...")
|
||
case FirstConfig:
|
||
log.Println("transform FirstConfig")
|
||
for _, stream := range t.OnStart {
|
||
t.SetUpTransformTask(stream)
|
||
}
|
||
break
|
||
case config.Config:
|
||
log.Println("transform config.Config")
|
||
break
|
||
case SEclose:
|
||
log.Println("transform SEclose:%s", v.Target.Path)
|
||
break
|
||
// case SEpublish:
|
||
// log.Println("transform SEpublish:%s", v.Stream.Path)
|
||
// break
|
||
}
|
||
}
|
||
|
||
var conf = &TransformConfig{
|
||
DefaultYaml: defaultYaml,
|
||
}
|
||
|
||
var TransformPlugin = InstallPlugin(conf)
|
||
|
||
func (t *TransformConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||
//streamPath := strings.TrimPrefix(r.RequestURI, "/transform/")
|
||
|
||
streamConfig := StreamConfig{
|
||
TransType: 0,
|
||
StreamPath: "",
|
||
NewStreamPath: "",
|
||
Resolution: "352*288", //CIF 352*288 qcif 176×144 320*240 1280*720
|
||
Fps: "25",
|
||
VideoCodec: "libx264", //libx264,libx265
|
||
|
||
HasOsd: false,
|
||
OsdText: "M7S 转码",
|
||
OsdFontColor: "green",
|
||
OsdFontsize: 100,
|
||
OsdX: 50,
|
||
OsdY: 50,
|
||
OsdBox: 0,
|
||
OsdBoxcolor: "yellow",
|
||
}
|
||
|
||
streamPath := r.URL.Query().Get("streampath")
|
||
streamConfig.StreamPath = streamPath
|
||
|
||
transType, err := strconv.Atoi(r.URL.Query().Get("transtype"))
|
||
if err == nil {
|
||
streamConfig.TransType = transType
|
||
}
|
||
//输出分辨率
|
||
if r.URL.Query().Has("resolution") {
|
||
streamConfig.Resolution = r.URL.Query().Get("resolution")
|
||
}
|
||
//输出编码格式
|
||
if r.URL.Query().Has("videocodec") {
|
||
streamConfig.VideoCodec = r.URL.Query().Get("videocodec")
|
||
}
|
||
//输出分辨率
|
||
if r.URL.Query().Has("fps") {
|
||
streamConfig.Fps = r.URL.Query().Get("fps")
|
||
}
|
||
|
||
if r.URL.Query().Has("osdtext") {
|
||
streamConfig.OsdText = r.URL.Query().Get("osdtext")
|
||
streamConfig.HasOsd = true
|
||
}
|
||
|
||
if r.URL.Query().Has("osdx") {
|
||
streamConfig.OsdX, err = strconv.Atoi(r.URL.Query().Get("osdx"))
|
||
}
|
||
if r.URL.Query().Has("osdy") {
|
||
streamConfig.OsdY, err = strconv.Atoi(r.URL.Query().Get("osdy"))
|
||
}
|
||
if r.URL.Query().Has("osdboxcolor") {
|
||
streamConfig.OsdBoxcolor = r.URL.Query().Get("osdboxcolor")
|
||
}
|
||
if r.URL.Query().Has("osdbox") {
|
||
streamConfig.OsdBox, err = strconv.Atoi(r.URL.Query().Get("osdbox"))
|
||
}
|
||
|
||
if r.URL.Query().Has("newstreampath") {
|
||
streamConfig.NewStreamPath = r.URL.Query().Get("newstreampath")
|
||
}
|
||
|
||
t.SetUpTransformTask(streamConfig)
|
||
|
||
w.Write([]byte("ok"))
|
||
}
|
||
|
||
func (t *TransformConfig) SetDefaultStreamConfig(config *StreamConfig) {
|
||
// streamConfig := StreamConfig{
|
||
// TransType: 0,
|
||
// StreamPath: "",
|
||
// NewStreamPath: "",
|
||
// Resolution: "352*288", //CIF 352*288 qcif 176×144 320*240 1280*720
|
||
// Fps: "25",
|
||
// VideoCodec: "libx264", //libx264,libx265
|
||
|
||
// HasOsd: false,
|
||
// OsdText: "M7S 转码",
|
||
// OsdFontColor: "green",
|
||
// OsdFontsize: 100,
|
||
// OsdX: 50,
|
||
// OsdY: 50,
|
||
// OsdBox: 0,
|
||
// OsdBoxcolor: "yellow",
|
||
// }
|
||
|
||
if config.Fps == "" {
|
||
config.Fps = "25"
|
||
}
|
||
|
||
if config.Resolution == "" {
|
||
config.Resolution = "352*288"
|
||
}
|
||
|
||
if config.VideoCodec == "" {
|
||
config.VideoCodec = "libx264"
|
||
}
|
||
|
||
if config.OsdText == "" {
|
||
config.OsdText = "M7S 转码"
|
||
}
|
||
|
||
if config.OsdFontColor == "" {
|
||
config.OsdFontColor = "green"
|
||
}
|
||
|
||
if config.OsdFontsize == 0 {
|
||
config.OsdFontsize = 100
|
||
}
|
||
|
||
if config.OsdX == 0 {
|
||
config.OsdX = 100
|
||
}
|
||
|
||
if config.OsdY == 0 {
|
||
config.OsdY = 100
|
||
}
|
||
|
||
if config.OsdBoxcolor == "" {
|
||
config.OsdBoxcolor = "yellow"
|
||
}
|
||
|
||
}
|
||
|
||
func (t *TransformConfig) SetUpTransformTask(config StreamConfig) {
|
||
//更新默认配置
|
||
t.SetDefaultStreamConfig(&config)
|
||
|
||
task := &TransformTask{
|
||
plugin: t,
|
||
streamConfig: config,
|
||
}
|
||
|
||
if task.streamConfig.StreamPath == "" {
|
||
TransformPlugin.Info("stream transform invalid\n", zap.String("streamPath", task.streamConfig.StreamPath))
|
||
return
|
||
}
|
||
|
||
if task.streamConfig.NewStreamPath == "" {
|
||
typeStr := strconv.FormatInt(int64(task.streamConfig.TransType), 10)
|
||
task.streamConfig.NewStreamPath = task.streamConfig.StreamPath + "-ts" + typeStr
|
||
}
|
||
|
||
if tanfsTaskArray[task.streamConfig.NewStreamPath] != nil {
|
||
TransformPlugin.Info("stream transform\n", zap.String("streamPath", task.streamConfig.NewStreamPath))
|
||
return
|
||
}
|
||
|
||
task.atTime = time.Now()
|
||
tanfsTaskArray[task.streamConfig.NewStreamPath] = task
|
||
|
||
switch task.streamConfig.TransType {
|
||
case 0:
|
||
go task.setupFfmpegTransformThrd0()
|
||
case 1:
|
||
go task.setupFfmpegTransformThrd1()
|
||
case 2:
|
||
go task.setupFfmpegTransformThrd2()
|
||
}
|
||
|
||
return
|
||
|
||
}
|
||
|
||
// 重点方案,增加ffmpeg 进程异常退出重启功能 默认方法
|
||
// 学习stream 码流订阅用法
|
||
// 学习stream 码流发布用法
|
||
func (t *TransformTask) setupFfmpegTransformThrd0() {
|
||
TransformPlugin.Info("setupFfmpegTransformThrd0 pipe in and out...")
|
||
|
||
//添加一个循环 避免ffmpeg 进程异常退出,退出后自动重新启动
|
||
for {
|
||
t.status = 0
|
||
|
||
//ffmpeg 启动次数+1
|
||
t.restartFFCount++
|
||
|
||
osdText := ""
|
||
//"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
|
||
|
||
if t.streamConfig.OsdText != "" {
|
||
osdText += fmt.Sprintf("drawtext=fontsize=%d:fontfile=%s:text='%s':x=%d:y=%d:fontcolor=%s",
|
||
t.streamConfig.OsdFontsize,
|
||
t.plugin.Fontfile,
|
||
t.streamConfig.OsdText,
|
||
t.streamConfig.OsdX,
|
||
t.streamConfig.OsdY,
|
||
t.streamConfig.OsdFontColor)
|
||
}
|
||
|
||
if t.streamConfig.OsdBox != 0 && t.streamConfig.OsdBoxcolor != "" {
|
||
osdText += fmt.Sprintf(":box=1:boxcolor=%s",
|
||
t.streamConfig.OsdBoxcolor)
|
||
}
|
||
|
||
TransformPlugin.Info(osdText)
|
||
|
||
cmd := exec.Command(conf.Ffmpeg, "-re",
|
||
"-i", "pipe:0",
|
||
"-tune", "zerolatency", //编码延迟参数
|
||
//"-vcodec", t.videoCodec,
|
||
//"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
|
||
//"-b:v", "400k",
|
||
//"-preset", "superfast", //编码延迟参数,superfast ultrafast 影响图像质量
|
||
"-s", t.streamConfig.Resolution,
|
||
"-r", t.streamConfig.Fps,
|
||
"-vf",
|
||
osdText,
|
||
"-c:v", t.streamConfig.VideoCodec,
|
||
//"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
|
||
//"-acodec", "libfaac",
|
||
//"-b:a", "64k",
|
||
"-acodec", "copy",
|
||
"-f",
|
||
"mpegts", //TS
|
||
"pipe:1",
|
||
)
|
||
|
||
TransformPlugin.Info(cmd.String())
|
||
|
||
// cmd := exec.Command(conf.Ffmpeg, "-re",
|
||
// "-i", "pipe:0",
|
||
// "-tune", "zerolatency", //编码延迟参数
|
||
// "-vcodec", "libx264",
|
||
// "-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
|
||
// "-b:v", "400k",
|
||
// "-preset", "superfast", //编码延迟参数,superfast ultrafast 影响图像质量
|
||
// "-s", "320*240",
|
||
// "-r", "25",
|
||
// "-vf",
|
||
// "drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
|
||
// "-acodec", "libfaac",
|
||
// "-b:a", "64k",
|
||
// "-f",
|
||
// //"h264",
|
||
// //"flv",
|
||
// "mpegts", //TS
|
||
// //"mpeg", //ps
|
||
// "pipe:1",
|
||
// //"-y",
|
||
// //"transform.flv",
|
||
// )
|
||
|
||
//获取输入流
|
||
stdin, err := cmd.StdinPipe()
|
||
if err != nil {
|
||
TransformPlugin.Error("Error getting stdin pipe:", zap.Error(err))
|
||
continue
|
||
}
|
||
|
||
t.cmd = cmd
|
||
|
||
t.in_wp = stdin
|
||
|
||
//获取输出流 句柄
|
||
stdout, err := cmd.StdoutPipe()
|
||
if err != nil {
|
||
TransformPlugin.Error("Error getting stdout pipe:", zap.Error(err))
|
||
continue
|
||
}
|
||
t.out_rp = stdout
|
||
|
||
// Start the command
|
||
err = cmd.Start()
|
||
if err != nil {
|
||
TransformPlugin.Error("Error starting command:", zap.Error(err))
|
||
continue
|
||
}
|
||
|
||
//优先启动读管道数据进程
|
||
go t.readFFPipe1AndToPublisher()
|
||
|
||
//定义一个订阅者
|
||
s := &TransformSubscriber{}
|
||
//s.IsInternal = true
|
||
s.task = t
|
||
t.s = s
|
||
|
||
if err := TransformPlugin.Subscribe(t.streamConfig.StreamPath, s); err != nil {
|
||
TransformPlugin.Error("TransformPlugin 2 Subscribe faild")
|
||
continue
|
||
} else {
|
||
//重点需要goroutin 启动订阅流,且只订阅了video track 裸流
|
||
//避免重复请求播放
|
||
if !s.IsPlaying() {
|
||
TransformPlugin.Info("TransformPlugin Subscribe sucess 2 play")
|
||
go s.PlayRaw()
|
||
}
|
||
}
|
||
|
||
TransformPlugin.Info("cmd Start wait end....\n")
|
||
err = cmd.Wait()
|
||
if err != nil {
|
||
TransformPlugin.Error("Error Wait command:", zap.Error(err))
|
||
}
|
||
//复位读写指针
|
||
t.out_rp = nil
|
||
t.in_wp = nil
|
||
|
||
//关闭订阅流
|
||
if t.s != nil {
|
||
TransformPlugin.Info("try to close TransformSubscriber")
|
||
t.s.Delete()
|
||
t.s = nil
|
||
}
|
||
//关闭发布流
|
||
if t.p != nil {
|
||
t.p.Delete()
|
||
t.p = nil
|
||
TransformPlugin.Info("try to close TransformPublisher")
|
||
}
|
||
TransformPlugin.Info("ffmpegTransformThrd end to restart", zap.Int("restartFFCount", t.restartFFCount))
|
||
|
||
//延迟后重启
|
||
time.Sleep(time.Duration(1000) * time.Millisecond)
|
||
|
||
}
|
||
|
||
//TransformPlugin.Info("ffmpeg task end...")
|
||
|
||
//t.taskEnd("ffmpeg cmd end")
|
||
}
|
||
|
||
// 订阅的Track数据写入ffmpeg 输入管道
|
||
func (t *TransformTask) writeToFFPipe0(buf []byte) {
|
||
|
||
//左闭右开
|
||
// n := len(buf)
|
||
// if n > 5 && binary.BigEndian.Uint32(buf[0:5]) == 1 && buf[4]&0x1f == 7 {
|
||
// log.Printf("pipe in SPS:%d, [%v]\n",
|
||
// len(buf), hex.EncodeToString(buf))
|
||
// } else if n > 5 && binary.BigEndian.Uint32(buf[0:5]) == 1 && buf[4]&0x1f == 8 {
|
||
// log.Printf("pipe in PPS:%d, [%v]\n",
|
||
// len(buf), hex.EncodeToString(buf))
|
||
// } else {
|
||
// if n > 5 {
|
||
// n = 5
|
||
// }
|
||
// log.Printf("pipe in raw:%d, [%v]\n",
|
||
// len(buf), hex.EncodeToString(buf[0:n]))
|
||
// }
|
||
|
||
t.status = 1
|
||
if t.in_wp == nil {
|
||
TransformPlugin.Warn("invalid in pipe wp")
|
||
return
|
||
}
|
||
t.in_bytes += len(buf)
|
||
_, err := t.in_wp.Write(buf)
|
||
if err != nil {
|
||
TransformPlugin.Error("write to pipe0 failed:", zap.Error(err))
|
||
}
|
||
}
|
||
|
||
// ffmpeg 转码后的ts 流 发布 stream
|
||
func (t *TransformTask) readFFPipe1AndToPublisher() {
|
||
//s.readTsDataFromPipeOut()
|
||
|
||
if t.out_rp == nil {
|
||
//time.Sleep(time.Duration(50) * time.Microsecond)
|
||
//continue
|
||
return
|
||
}
|
||
|
||
if t.p == nil {
|
||
//发布一个新的转码流
|
||
//定义一个发布者
|
||
p := &TransformPublisher{}
|
||
p.task = t
|
||
t.p = p
|
||
|
||
//判断流是否存在,存在则删除重新发布
|
||
s := Streams.Get(t.streamConfig.NewStreamPath)
|
||
if s != nil {
|
||
Streams.Delete(t.streamConfig.NewStreamPath)
|
||
}
|
||
TransformPlugin.Info("TransformTask TSPublisher", zap.String("newStreamPath", t.streamConfig.NewStreamPath))
|
||
if err := TransformPlugin.Publish(t.streamConfig.NewStreamPath, p); err != nil {
|
||
TransformPlugin.Error("TransformTask publish:", zap.Error(err))
|
||
return
|
||
}
|
||
p.AudioTrack = nil
|
||
p.VideoTrack = nil
|
||
|
||
//buf := make([]byte, 64*1024)
|
||
p.tsReader = NewTSReader(&p.TSPublisher)
|
||
//defer tsReader.Close()
|
||
|
||
//很重要这一步
|
||
//ffmpeg restart t.out_rp 会发生变化
|
||
if p.TSPublisher.IO.Reader != t.out_rp {
|
||
TransformPlugin.Info("TSPublisher SetIO...")
|
||
p.TSPublisher.SetIO(t.out_rp)
|
||
}
|
||
}
|
||
|
||
p := t.p
|
||
|
||
for {
|
||
if t.out_rp == nil {
|
||
TransformPlugin.Info("TransformTask TSPublisher no out rp valid exit thrd")
|
||
break
|
||
}
|
||
//err := tsReader.Feed(t.out_rp)
|
||
//很重要这一步
|
||
if p != nil && p.tsReader != nil {
|
||
err := p.tsReader.Feed(p)
|
||
if err == errors.New("file already closed") {
|
||
TransformPlugin.Error("tsReader.Feed:", zap.Error(err))
|
||
break
|
||
}
|
||
if err != nil {
|
||
TransformPlugin.Error("tsReader.Feed:", zap.Error(err))
|
||
//避免循环快速打印
|
||
time.Sleep(time.Duration(500) * time.Millisecond)
|
||
//
|
||
}
|
||
|
||
}
|
||
|
||
//log.Println("p end Feed...")
|
||
}
|
||
|
||
}
|
||
|
||
// 为了验证测试,使用者自己加固,异常处理
|
||
func (t *TransformTask) setupFfmpegTransformThrd1() {
|
||
TransformPlugin.Info("setupFfmpegTransformThrd1 url in and rtmp out...")
|
||
//转码并缩放
|
||
//成功
|
||
//ffmpeg -ss 0:01 -i "rtsp://127.0.0.1:554/njtv/glgc" -vcodec copy -vcodec libx264 -s 720*576 -f flv "rtmp://127.0.0.1:1935/njtv/glgc-d1"
|
||
|
||
pullpath := "rtsp://127.0.0.1:554/" + t.streamConfig.StreamPath
|
||
tspath := "rtmp://127.0.0.1:1935/" + t.streamConfig.NewStreamPath
|
||
|
||
//转码并保存
|
||
//cmd := exec.Command("ffmpeg", "-re", "-r", "30", "-i", "pipe:0", "-vcodec", "libx264", "-f", "flv", "pipe:1", "-y", "another.flv")
|
||
cmd := exec.Command(conf.Ffmpeg, "-re",
|
||
"-i", pullpath,
|
||
//"-vcodec", "copy",
|
||
"-tune", "zerolatency", //编码延迟参数
|
||
"-vcodec", "libx264",
|
||
"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
|
||
"-b:v", "400k",
|
||
"-preset", "superfast", //编码延迟参数,superfast ultrafast 影响图像质量
|
||
"-s", "320*240",
|
||
"-r", "25",
|
||
"-vf",
|
||
"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts0':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
|
||
"-acodec", "libfaac",
|
||
"-b:a", "64k",
|
||
"-f",
|
||
"flv",
|
||
tspath,
|
||
)
|
||
|
||
t.cmd = cmd
|
||
// Start the command
|
||
err := cmd.Start()
|
||
if err != nil {
|
||
fmt.Println("Error starting command:", err)
|
||
return
|
||
}
|
||
log.Printf("cmd Start wait end....\n")
|
||
err = cmd.Wait()
|
||
if err != nil {
|
||
fmt.Println("Error Wait command:", err)
|
||
}
|
||
log.Printf("ffmpegTransformThrd end\n")
|
||
|
||
t.taskEnd("cmd end")
|
||
}
|
||
|
||
// 为了验证测试,使用者自己加固,异常处理
|
||
func (t *TransformTask) setupFfmpegTransformThrd2() {
|
||
TransformPlugin.Info("setupFfmpegTransformThrd2 pipe in and rtmp out...")
|
||
s := &TransformSubscriber{}
|
||
//s.IsInternal = true
|
||
s.task = t
|
||
|
||
t.s = s
|
||
|
||
// sub.onSubscriberSucess()
|
||
if err := TransformPlugin.Subscribe(t.streamConfig.StreamPath, s); err != nil {
|
||
TransformPlugin.Error("TransformPlugin 1 Subscribe faild")
|
||
t.taskEnd("Subscribe faild")
|
||
return
|
||
} else {
|
||
TransformPlugin.Info("TransformPlugin Subscribe sucess")
|
||
//重点需要goroutin
|
||
go s.PlayRaw()
|
||
}
|
||
|
||
cmd := exec.Command(conf.Ffmpeg, "-re",
|
||
"-i", "pipe:0",
|
||
"-tune", "zerolatency", //编码延迟参数
|
||
"-vcodec", "libx264",
|
||
"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
|
||
"-b:v", "400k",
|
||
"-preset", "superfast", //编码延迟参数,superfast ultrafast 影响图像质量
|
||
"-s", "320*240",
|
||
"-r", "25",
|
||
"-vf",
|
||
"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts1':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
|
||
"-acodec", "libfaac",
|
||
"-b:a", "64k",
|
||
"-f",
|
||
"flv",
|
||
"rtmp://127.0.0.1:1935/"+t.streamConfig.NewStreamPath,
|
||
)
|
||
|
||
t.cmd = cmd
|
||
//获取输入流
|
||
stdin, err := cmd.StdinPipe()
|
||
if err != nil {
|
||
fmt.Println("Error getting stdin pipe:", err)
|
||
return
|
||
}
|
||
t.in_wp = stdin
|
||
|
||
// Start the command
|
||
err = cmd.Start()
|
||
if err != nil {
|
||
fmt.Println("Error starting command:", err)
|
||
return
|
||
}
|
||
log.Printf("cmd Start wait end....\n")
|
||
err = cmd.Wait()
|
||
if err != nil {
|
||
fmt.Println("Error Wait command:", err)
|
||
}
|
||
log.Printf("ffmpegTransformThrd end\n")
|
||
|
||
t.taskEnd("cmd end")
|
||
}
|
||
|
||
func (t *TransformTask) taskEnd(reason string) {
|
||
|
||
tanfsTaskArray[t.streamConfig.NewStreamPath] = nil
|
||
|
||
if t.p != nil {
|
||
t.p.Stop()
|
||
t.p = nil
|
||
}
|
||
if t.s != nil {
|
||
t.s.Stop()
|
||
t.s = nil
|
||
}
|
||
|
||
log.Printf(fmt.Sprintf("task:%s end for:%s\n", t.streamConfig.NewStreamPath, reason))
|
||
}
|
||
|
||
func (t *TransformTask) debugPrintfNal(buf []byte, name string) {
|
||
n := len(buf)
|
||
if n > 5 {
|
||
n = 5
|
||
}
|
||
log.Printf("nal %s out len:%d, [%v]\n",
|
||
name,
|
||
len(buf), hex.EncodeToString(buf[0:n]))
|
||
|
||
// s.Info("nal :",
|
||
// zap.String("int_bytes", name),
|
||
// zap.Int("int_bytes", len(buf)),
|
||
// zap.String("int_bytes", hex.EncodeToString(buf[0:n])))
|
||
}
|
||
|
||
func (t *TransformTask) writeTmpFile(data []byte) {
|
||
|
||
if t.f == nil {
|
||
f, err := os.Create("tranform-tmp.ps")
|
||
if f == nil {
|
||
log.Println("create file faild:%v", err)
|
||
return
|
||
}
|
||
t.f = f
|
||
}
|
||
|
||
_, err2 := t.f.Write(data)
|
||
if err2 != nil {
|
||
log.Println("Write file faild:%v", err2)
|
||
return
|
||
}
|
||
|
||
if t.out_bytes > 1024*1024*5 {
|
||
t.f.Close()
|
||
os.Exit(0)
|
||
}
|
||
|
||
}
|
||
|
||
/*
|
||
func (s *TransformSubscriber) OnEvent(event any) {
|
||
switch v := event.(type) {
|
||
case VideoFrame:
|
||
//s.Stop()
|
||
// var errOut util.Buffer
|
||
// firstFrame := v.GetAnnexB()
|
||
// cmd := exec.Command(conf.FFmpeg, "-hide_banner", "-i", "pipe:0", "-vframes", "1", "-f", "mjpeg", "pipe:1")
|
||
// cmd.Stdin = &firstFrame
|
||
// cmd.Stderr = &errOut
|
||
// cmd.Stdout = s
|
||
// cmd.Run()
|
||
// if errOut.CanRead() {
|
||
// s.Info(string(errOut))
|
||
// }
|
||
default:
|
||
s.Subscriber.OnEvent(event)
|
||
}
|
||
}*/
|