Files
plugin-transform/main.go
2024-02-25 11:00:47 +08:00

760 lines
19 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package transform
import (
_ "embed"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"sync"
"time"
"log"
"net/http"
"os/exec"
"strconv"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/config"
)
//go:embed default.yaml
var defaultYaml DefaultYaml
var tanfsTaskArray = make(map[string]*TransformTask)
type TransformConfig struct {
DefaultYaml
config.Publish
config.Subscribe
Ffmpeg string `default:"ffmpeg" desc:"ffmpeg的路径 "`
Path string //存储路径
Filter string //过滤器
Fontfile string `default:"shoujin.ttf" desc:"叠加字体路径 "` //osd 叠加字帖路径 shoujin.ttf
//OnStart []string `desc:"启动时转码的列表"` // 启动时转码的列表
OnStart []StreamConfig `yaml:"onstart"`
}
// 用此包解析yaml文件到结构体存在三个问题。
// yaml文件中的配置项首字母不能大写配置项名字要求比较严格。
// 没有默认值。
// 没有数值校验。
type StreamConfig struct {
TransType int `default:"2" yaml:"transtype"` //转码类型 0: rtsp pull rtmp push; 1: sub raw frame rtmp push; 2: sub raw frame ts publiser;
StreamPath string `default:"" yaml:"streampath"`
NewStreamPath string `default:"" yaml:"newstreampath"`
Resolution string `default:"720*576" yaml:"resolution"`
VideoCodec string `default:"libx264" yaml:"videocodec"` //libx264, libx265
Fps string `default:"25" yaml:"fps"`
HasOsd bool `default:"false" yaml:"hasosd"`
OsdText string `default:"M7S 转码" yaml:"osdtext"`
OsdFontsize int `default:"100" yaml:"osdfontsize"`
OsdFontColor string `default:"white" yaml:"osdfontcolor"`
OsdX int `default:"50" yaml:"osdx"`
OsdY int `default:"50" yaml:"osdy"`
OsdBox int `default:"1" yaml:"osdbox"`
OsdBoxcolor string `default:"yellow" yaml:"osdboxcolor"`
}
type TransformTask struct {
plugin *TransformConfig
status int //0 :idel ; 1 input ing; 2 output ing
//统计信息
restartFFCount int
rePullCount int
streamConfig StreamConfig
atTime time.Time //开始时间
cmd *exec.Cmd
p *TransformPublisher
s *TransformSubscriber
//两个管道一个输入
in_wp io.WriteCloser
in_bytes int
out_rp io.ReadCloser
out_bytes int
mt sync.Mutex
f *os.File
}
type TransformPublisher struct {
TSPublisher
tsReader *TSReader
task *TransformTask
}
func (p *TransformPublisher) Delete() {
p.Stop()
if p.tsReader != nil {
p.tsReader.Close()
}
}
func (p *TransformPublisher) OnEvent(event any) {
switch event.(type) {
case IPublisher:
p.Stream.NeverTimeout = true
log.Println("TransformPublisher OnEvent IPublisher...")
p.TSPublisher.OnEvent(event)
case SEKick, SEclose:
log.Println("TransformPublisher SEclose IPublisher...")
p.Publisher.OnEvent(event)
default:
p.Publisher.OnEvent(event)
}
}
func (t *TransformConfig) OnEvent(event any) {
switch v := event.(type) {
case IPublisher:
log.Println("TransformConfig OnEvent IPublisher...")
case FirstConfig:
log.Println("transform FirstConfig")
for _, stream := range t.OnStart {
t.SetUpTransformTask(stream)
}
break
case config.Config:
log.Println("transform config.Config")
break
case SEclose:
log.Println("transform SEclose:%s", v.Target.Path)
break
// case SEpublish:
// log.Println("transform SEpublish:%s", v.Stream.Path)
// break
}
}
var conf = &TransformConfig{
DefaultYaml: defaultYaml,
}
var TransformPlugin = InstallPlugin(conf)
func (t *TransformConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//streamPath := strings.TrimPrefix(r.RequestURI, "/transform/")
streamConfig := StreamConfig{
TransType: 0,
StreamPath: "",
NewStreamPath: "",
Resolution: "352*288", //CIF 352*288 qcif 176×144 320*240 1280*720
Fps: "25",
VideoCodec: "libx264", //libx264,libx265
HasOsd: false,
OsdText: "M7S 转码",
OsdFontColor: "green",
OsdFontsize: 100,
OsdX: 50,
OsdY: 50,
OsdBox: 0,
OsdBoxcolor: "yellow",
}
streamPath := r.URL.Query().Get("streampath")
streamConfig.StreamPath = streamPath
transType, err := strconv.Atoi(r.URL.Query().Get("transtype"))
if err == nil {
streamConfig.TransType = transType
}
//输出分辨率
if r.URL.Query().Has("resolution") {
streamConfig.Resolution = r.URL.Query().Get("resolution")
}
//输出编码格式
if r.URL.Query().Has("videocodec") {
streamConfig.VideoCodec = r.URL.Query().Get("videocodec")
}
//输出分辨率
if r.URL.Query().Has("fps") {
streamConfig.Fps = r.URL.Query().Get("fps")
}
if r.URL.Query().Has("osdtext") {
streamConfig.OsdText = r.URL.Query().Get("osdtext")
streamConfig.HasOsd = true
}
if r.URL.Query().Has("osdx") {
streamConfig.OsdX, err = strconv.Atoi(r.URL.Query().Get("osdx"))
}
if r.URL.Query().Has("osdy") {
streamConfig.OsdY, err = strconv.Atoi(r.URL.Query().Get("osdy"))
}
if r.URL.Query().Has("osdboxcolor") {
streamConfig.OsdBoxcolor = r.URL.Query().Get("osdboxcolor")
}
if r.URL.Query().Has("osdbox") {
streamConfig.OsdBox, err = strconv.Atoi(r.URL.Query().Get("osdbox"))
}
if r.URL.Query().Has("newstreampath") {
streamConfig.NewStreamPath = r.URL.Query().Get("newstreampath")
}
t.SetUpTransformTask(streamConfig)
w.Write([]byte("ok"))
}
func (t *TransformConfig) SetDefaultStreamConfig(config *StreamConfig) {
// streamConfig := StreamConfig{
// TransType: 0,
// StreamPath: "",
// NewStreamPath: "",
// Resolution: "352*288", //CIF 352*288 qcif 176×144 320*240 1280*720
// Fps: "25",
// VideoCodec: "libx264", //libx264,libx265
// HasOsd: false,
// OsdText: "M7S 转码",
// OsdFontColor: "green",
// OsdFontsize: 100,
// OsdX: 50,
// OsdY: 50,
// OsdBox: 0,
// OsdBoxcolor: "yellow",
// }
if config.Fps == "" {
config.Fps = "25"
}
if config.Resolution == "" {
config.Resolution = "352*288"
}
if config.VideoCodec == "" {
config.VideoCodec = "libx264"
}
if config.OsdText == "" {
config.OsdText = "M7S 转码"
}
if config.OsdFontColor == "" {
config.OsdFontColor = "green"
}
if config.OsdFontsize == 0 {
config.OsdFontsize = 100
}
if config.OsdX == 0 {
config.OsdX = 100
}
if config.OsdY == 0 {
config.OsdY = 100
}
if config.OsdBoxcolor == "" {
config.OsdBoxcolor = "yellow"
}
}
func (t *TransformConfig) SetUpTransformTask(config StreamConfig) {
//更新默认配置
t.SetDefaultStreamConfig(&config)
task := &TransformTask{
plugin: t,
streamConfig: config,
}
if task.streamConfig.StreamPath == "" {
TransformPlugin.Info("stream transform invalid\n", zap.String("streamPath", task.streamConfig.StreamPath))
return
}
if task.streamConfig.NewStreamPath == "" {
typeStr := strconv.FormatInt(int64(task.streamConfig.TransType), 10)
task.streamConfig.NewStreamPath = task.streamConfig.StreamPath + "-ts" + typeStr
}
if tanfsTaskArray[task.streamConfig.NewStreamPath] != nil {
TransformPlugin.Info("stream transform\n", zap.String("streamPath", task.streamConfig.NewStreamPath))
return
}
task.atTime = time.Now()
tanfsTaskArray[task.streamConfig.NewStreamPath] = task
switch task.streamConfig.TransType {
case 0:
go task.setupFfmpegTransformThrd0()
case 1:
go task.setupFfmpegTransformThrd1()
case 2:
go task.setupFfmpegTransformThrd2()
}
return
}
// 重点方案增加ffmpeg 进程异常退出重启功能 默认方法
// 学习stream 码流订阅用法
// 学习stream 码流发布用法
func (t *TransformTask) setupFfmpegTransformThrd0() {
TransformPlugin.Info("setupFfmpegTransformThrd0 pipe in and out...")
//添加一个循环 避免ffmpeg 进程异常退出,退出后自动重新启动
for {
t.status = 0
//ffmpeg 启动次数+1
t.restartFFCount++
osdText := ""
//"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
if t.streamConfig.OsdText != "" {
osdText += fmt.Sprintf("drawtext=fontsize=%d:fontfile=%s:text='%s':x=%d:y=%d:fontcolor=%s",
t.streamConfig.OsdFontsize,
t.plugin.Fontfile,
t.streamConfig.OsdText,
t.streamConfig.OsdX,
t.streamConfig.OsdY,
t.streamConfig.OsdFontColor)
}
if t.streamConfig.OsdBox != 0 && t.streamConfig.OsdBoxcolor != "" {
osdText += fmt.Sprintf(":box=1:boxcolor=%s",
t.streamConfig.OsdBoxcolor)
}
TransformPlugin.Info(osdText)
cmd := exec.Command(conf.Ffmpeg, "-re",
"-i", "pipe:0",
"-tune", "zerolatency", //编码延迟参数
//"-vcodec", t.videoCodec,
//"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
//"-b:v", "400k",
//"-preset", "superfast", //编码延迟参数superfast ultrafast 影响图像质量
"-s", t.streamConfig.Resolution,
"-r", t.streamConfig.Fps,
"-vf",
osdText,
"-c:v", t.streamConfig.VideoCodec,
//"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
//"-acodec", "libfaac",
//"-b:a", "64k",
"-acodec", "copy",
"-f",
"mpegts", //TS
"pipe:1",
)
TransformPlugin.Info(cmd.String())
// cmd := exec.Command(conf.Ffmpeg, "-re",
// "-i", "pipe:0",
// "-tune", "zerolatency", //编码延迟参数
// "-vcodec", "libx264",
// "-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
// "-b:v", "400k",
// "-preset", "superfast", //编码延迟参数superfast ultrafast 影响图像质量
// "-s", "320*240",
// "-r", "25",
// "-vf",
// "drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
// "-acodec", "libfaac",
// "-b:a", "64k",
// "-f",
// //"h264",
// //"flv",
// "mpegts", //TS
// //"mpeg", //ps
// "pipe:1",
// //"-y",
// //"transform.flv",
// )
//获取输入流
stdin, err := cmd.StdinPipe()
if err != nil {
TransformPlugin.Error("Error getting stdin pipe:", zap.Error(err))
continue
}
t.cmd = cmd
t.in_wp = stdin
//获取输出流 句柄
stdout, err := cmd.StdoutPipe()
if err != nil {
TransformPlugin.Error("Error getting stdout pipe:", zap.Error(err))
continue
}
t.out_rp = stdout
// Start the command
err = cmd.Start()
if err != nil {
TransformPlugin.Error("Error starting command:", zap.Error(err))
continue
}
//优先启动读管道数据进程
go t.readFFPipe1AndToPublisher()
//定义一个订阅者
s := &TransformSubscriber{}
//s.IsInternal = true
s.task = t
t.s = s
if err := TransformPlugin.Subscribe(t.streamConfig.StreamPath, s); err != nil {
TransformPlugin.Error("TransformPlugin 2 Subscribe faild")
continue
} else {
//重点需要goroutin 启动订阅流且只订阅了video track 裸流
//避免重复请求播放
if !s.IsPlaying() {
TransformPlugin.Info("TransformPlugin Subscribe sucess 2 play")
go s.PlayRaw()
}
}
TransformPlugin.Info("cmd Start wait end....\n")
err = cmd.Wait()
if err != nil {
TransformPlugin.Error("Error Wait command:", zap.Error(err))
}
//复位读写指针
t.out_rp = nil
t.in_wp = nil
//关闭订阅流
if t.s != nil {
TransformPlugin.Info("try to close TransformSubscriber")
t.s.Delete()
t.s = nil
}
//关闭发布流
if t.p != nil {
t.p.Delete()
t.p = nil
TransformPlugin.Info("try to close TransformPublisher")
}
TransformPlugin.Info("ffmpegTransformThrd end to restart", zap.Int("restartFFCount", t.restartFFCount))
//延迟后重启
time.Sleep(time.Duration(1000) * time.Millisecond)
}
//TransformPlugin.Info("ffmpeg task end...")
//t.taskEnd("ffmpeg cmd end")
}
// 订阅的Track数据写入ffmpeg 输入管道
func (t *TransformTask) writeToFFPipe0(buf []byte) {
//左闭右开
// n := len(buf)
// if n > 5 && binary.BigEndian.Uint32(buf[0:5]) == 1 && buf[4]&0x1f == 7 {
// log.Printf("pipe in SPS:%d, [%v]\n",
// len(buf), hex.EncodeToString(buf))
// } else if n > 5 && binary.BigEndian.Uint32(buf[0:5]) == 1 && buf[4]&0x1f == 8 {
// log.Printf("pipe in PPS:%d, [%v]\n",
// len(buf), hex.EncodeToString(buf))
// } else {
// if n > 5 {
// n = 5
// }
// log.Printf("pipe in raw:%d, [%v]\n",
// len(buf), hex.EncodeToString(buf[0:n]))
// }
t.status = 1
if t.in_wp == nil {
TransformPlugin.Warn("invalid in pipe wp")
return
}
t.in_bytes += len(buf)
_, err := t.in_wp.Write(buf)
if err != nil {
TransformPlugin.Error("write to pipe0 failed:", zap.Error(err))
}
}
// ffmpeg 转码后的ts 流 发布 stream
func (t *TransformTask) readFFPipe1AndToPublisher() {
//s.readTsDataFromPipeOut()
if t.out_rp == nil {
//time.Sleep(time.Duration(50) * time.Microsecond)
//continue
return
}
if t.p == nil {
//发布一个新的转码流
//定义一个发布者
p := &TransformPublisher{}
p.task = t
t.p = p
//判断流是否存在,存在则删除重新发布
s := Streams.Get(t.streamConfig.NewStreamPath)
if s != nil {
Streams.Delete(t.streamConfig.NewStreamPath)
}
TransformPlugin.Info("TransformTask TSPublisher", zap.String("newStreamPath", t.streamConfig.NewStreamPath))
if err := TransformPlugin.Publish(t.streamConfig.NewStreamPath, p); err != nil {
TransformPlugin.Error("TransformTask publish:", zap.Error(err))
return
}
p.AudioTrack = nil
p.VideoTrack = nil
//buf := make([]byte, 64*1024)
p.tsReader = NewTSReader(&p.TSPublisher)
//defer tsReader.Close()
//很重要这一步
//ffmpeg restart t.out_rp 会发生变化
if p.TSPublisher.IO.Reader != t.out_rp {
TransformPlugin.Info("TSPublisher SetIO...")
p.TSPublisher.SetIO(t.out_rp)
}
}
p := t.p
for {
if t.out_rp == nil {
TransformPlugin.Info("TransformTask TSPublisher no out rp valid exit thrd")
break
}
//err := tsReader.Feed(t.out_rp)
//很重要这一步
if p != nil && p.tsReader != nil {
err := p.tsReader.Feed(p)
if err == errors.New("file already closed") {
TransformPlugin.Error("tsReader.Feed:", zap.Error(err))
break
}
if err != nil {
TransformPlugin.Error("tsReader.Feed:", zap.Error(err))
//避免循环快速打印
time.Sleep(time.Duration(500) * time.Millisecond)
//
}
}
//log.Println("p end Feed...")
}
}
// 为了验证测试,使用者自己加固,异常处理
func (t *TransformTask) setupFfmpegTransformThrd1() {
TransformPlugin.Info("setupFfmpegTransformThrd1 url in and rtmp out...")
//转码并缩放
//成功
//ffmpeg -ss 0:01 -i "rtsp://127.0.0.1:554/njtv/glgc" -vcodec copy -vcodec libx264 -s 720*576 -f flv "rtmp://127.0.0.1:1935/njtv/glgc-d1"
pullpath := "rtsp://127.0.0.1:554/" + t.streamConfig.StreamPath
tspath := "rtmp://127.0.0.1:1935/" + t.streamConfig.NewStreamPath
//转码并保存
//cmd := exec.Command("ffmpeg", "-re", "-r", "30", "-i", "pipe:0", "-vcodec", "libx264", "-f", "flv", "pipe:1", "-y", "another.flv")
cmd := exec.Command(conf.Ffmpeg, "-re",
"-i", pullpath,
//"-vcodec", "copy",
"-tune", "zerolatency", //编码延迟参数
"-vcodec", "libx264",
"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
"-b:v", "400k",
"-preset", "superfast", //编码延迟参数superfast ultrafast 影响图像质量
"-s", "320*240",
"-r", "25",
"-vf",
"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts0':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
"-acodec", "libfaac",
"-b:a", "64k",
"-f",
"flv",
tspath,
)
t.cmd = cmd
// Start the command
err := cmd.Start()
if err != nil {
fmt.Println("Error starting command:", err)
return
}
log.Printf("cmd Start wait end....\n")
err = cmd.Wait()
if err != nil {
fmt.Println("Error Wait command:", err)
}
log.Printf("ffmpegTransformThrd end\n")
t.taskEnd("cmd end")
}
// 为了验证测试,使用者自己加固,异常处理
func (t *TransformTask) setupFfmpegTransformThrd2() {
TransformPlugin.Info("setupFfmpegTransformThrd2 pipe in and rtmp out...")
s := &TransformSubscriber{}
//s.IsInternal = true
s.task = t
t.s = s
// sub.onSubscriberSucess()
if err := TransformPlugin.Subscribe(t.streamConfig.StreamPath, s); err != nil {
TransformPlugin.Error("TransformPlugin 1 Subscribe faild")
t.taskEnd("Subscribe faild")
return
} else {
TransformPlugin.Info("TransformPlugin Subscribe sucess")
//重点需要goroutin
go s.PlayRaw()
}
cmd := exec.Command(conf.Ffmpeg, "-re",
"-i", "pipe:0",
"-tune", "zerolatency", //编码延迟参数
"-vcodec", "libx264",
"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
"-b:v", "400k",
"-preset", "superfast", //编码延迟参数superfast ultrafast 影响图像质量
"-s", "320*240",
"-r", "25",
"-vf",
"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts1':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
"-acodec", "libfaac",
"-b:a", "64k",
"-f",
"flv",
"rtmp://127.0.0.1:1935/"+t.streamConfig.NewStreamPath,
)
t.cmd = cmd
//获取输入流
stdin, err := cmd.StdinPipe()
if err != nil {
fmt.Println("Error getting stdin pipe:", err)
return
}
t.in_wp = stdin
// Start the command
err = cmd.Start()
if err != nil {
fmt.Println("Error starting command:", err)
return
}
log.Printf("cmd Start wait end....\n")
err = cmd.Wait()
if err != nil {
fmt.Println("Error Wait command:", err)
}
log.Printf("ffmpegTransformThrd end\n")
t.taskEnd("cmd end")
}
func (t *TransformTask) taskEnd(reason string) {
tanfsTaskArray[t.streamConfig.NewStreamPath] = nil
if t.p != nil {
t.p.Stop()
t.p = nil
}
if t.s != nil {
t.s.Stop()
t.s = nil
}
log.Printf(fmt.Sprintf("task:%s end for:%s\n", t.streamConfig.NewStreamPath, reason))
}
func (t *TransformTask) debugPrintfNal(buf []byte, name string) {
n := len(buf)
if n > 5 {
n = 5
}
log.Printf("nal %s out len:%d, [%v]\n",
name,
len(buf), hex.EncodeToString(buf[0:n]))
// s.Info("nal :",
// zap.String("int_bytes", name),
// zap.Int("int_bytes", len(buf)),
// zap.String("int_bytes", hex.EncodeToString(buf[0:n])))
}
func (t *TransformTask) writeTmpFile(data []byte) {
if t.f == nil {
f, err := os.Create("tranform-tmp.ps")
if f == nil {
log.Println("create file faild:%v", err)
return
}
t.f = f
}
_, err2 := t.f.Write(data)
if err2 != nil {
log.Println("Write file faild:%v", err2)
return
}
if t.out_bytes > 1024*1024*5 {
t.f.Close()
os.Exit(0)
}
}
/*
func (s *TransformSubscriber) OnEvent(event any) {
switch v := event.(type) {
case VideoFrame:
//s.Stop()
// var errOut util.Buffer
// firstFrame := v.GetAnnexB()
// cmd := exec.Command(conf.FFmpeg, "-hide_banner", "-i", "pipe:0", "-vframes", "1", "-f", "mjpeg", "pipe:1")
// cmd.Stdin = &firstFrame
// cmd.Stderr = &errOut
// cmd.Stdout = s
// cmd.Run()
// if errOut.CanRead() {
// s.Info(string(errOut))
// }
default:
s.Subscriber.OnEvent(event)
}
}*/