v20240225110000

This commit is contained in:
erroot
2024-02-25 11:00:47 +08:00
commit 06816dba9d
7 changed files with 1076 additions and 0 deletions

759
main.go Normal file
View File

@@ -0,0 +1,759 @@
package transform
import (
_ "embed"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"sync"
"time"
"log"
"net/http"
"os/exec"
"strconv"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/config"
)
//go:embed default.yaml
var defaultYaml DefaultYaml
var tanfsTaskArray = make(map[string]*TransformTask)
type TransformConfig struct {
DefaultYaml
config.Publish
config.Subscribe
Ffmpeg string `default:"ffmpeg" desc:"ffmpeg的路径 "`
Path string //存储路径
Filter string //过滤器
Fontfile string `default:"shoujin.ttf" desc:"叠加字体路径 "` //osd 叠加字帖路径 shoujin.ttf
//OnStart []string `desc:"启动时转码的列表"` // 启动时转码的列表
OnStart []StreamConfig `yaml:"onstart"`
}
// 用此包解析yaml文件到结构体存在三个问题。
// yaml文件中的配置项首字母不能大写配置项名字要求比较严格。
// 没有默认值。
// 没有数值校验。
type StreamConfig struct {
TransType int `default:"2" yaml:"transtype"` //转码类型 0: rtsp pull rtmp push; 1: sub raw frame rtmp push; 2: sub raw frame ts publiser;
StreamPath string `default:"" yaml:"streampath"`
NewStreamPath string `default:"" yaml:"newstreampath"`
Resolution string `default:"720*576" yaml:"resolution"`
VideoCodec string `default:"libx264" yaml:"videocodec"` //libx264, libx265
Fps string `default:"25" yaml:"fps"`
HasOsd bool `default:"false" yaml:"hasosd"`
OsdText string `default:"M7S 转码" yaml:"osdtext"`
OsdFontsize int `default:"100" yaml:"osdfontsize"`
OsdFontColor string `default:"white" yaml:"osdfontcolor"`
OsdX int `default:"50" yaml:"osdx"`
OsdY int `default:"50" yaml:"osdy"`
OsdBox int `default:"1" yaml:"osdbox"`
OsdBoxcolor string `default:"yellow" yaml:"osdboxcolor"`
}
type TransformTask struct {
plugin *TransformConfig
status int //0 :idel ; 1 input ing; 2 output ing
//统计信息
restartFFCount int
rePullCount int
streamConfig StreamConfig
atTime time.Time //开始时间
cmd *exec.Cmd
p *TransformPublisher
s *TransformSubscriber
//两个管道一个输入
in_wp io.WriteCloser
in_bytes int
out_rp io.ReadCloser
out_bytes int
mt sync.Mutex
f *os.File
}
type TransformPublisher struct {
TSPublisher
tsReader *TSReader
task *TransformTask
}
func (p *TransformPublisher) Delete() {
p.Stop()
if p.tsReader != nil {
p.tsReader.Close()
}
}
func (p *TransformPublisher) OnEvent(event any) {
switch event.(type) {
case IPublisher:
p.Stream.NeverTimeout = true
log.Println("TransformPublisher OnEvent IPublisher...")
p.TSPublisher.OnEvent(event)
case SEKick, SEclose:
log.Println("TransformPublisher SEclose IPublisher...")
p.Publisher.OnEvent(event)
default:
p.Publisher.OnEvent(event)
}
}
func (t *TransformConfig) OnEvent(event any) {
switch v := event.(type) {
case IPublisher:
log.Println("TransformConfig OnEvent IPublisher...")
case FirstConfig:
log.Println("transform FirstConfig")
for _, stream := range t.OnStart {
t.SetUpTransformTask(stream)
}
break
case config.Config:
log.Println("transform config.Config")
break
case SEclose:
log.Println("transform SEclose:%s", v.Target.Path)
break
// case SEpublish:
// log.Println("transform SEpublish:%s", v.Stream.Path)
// break
}
}
var conf = &TransformConfig{
DefaultYaml: defaultYaml,
}
var TransformPlugin = InstallPlugin(conf)
func (t *TransformConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//streamPath := strings.TrimPrefix(r.RequestURI, "/transform/")
streamConfig := StreamConfig{
TransType: 0,
StreamPath: "",
NewStreamPath: "",
Resolution: "352*288", //CIF 352*288 qcif 176×144 320*240 1280*720
Fps: "25",
VideoCodec: "libx264", //libx264,libx265
HasOsd: false,
OsdText: "M7S 转码",
OsdFontColor: "green",
OsdFontsize: 100,
OsdX: 50,
OsdY: 50,
OsdBox: 0,
OsdBoxcolor: "yellow",
}
streamPath := r.URL.Query().Get("streampath")
streamConfig.StreamPath = streamPath
transType, err := strconv.Atoi(r.URL.Query().Get("transtype"))
if err == nil {
streamConfig.TransType = transType
}
//输出分辨率
if r.URL.Query().Has("resolution") {
streamConfig.Resolution = r.URL.Query().Get("resolution")
}
//输出编码格式
if r.URL.Query().Has("videocodec") {
streamConfig.VideoCodec = r.URL.Query().Get("videocodec")
}
//输出分辨率
if r.URL.Query().Has("fps") {
streamConfig.Fps = r.URL.Query().Get("fps")
}
if r.URL.Query().Has("osdtext") {
streamConfig.OsdText = r.URL.Query().Get("osdtext")
streamConfig.HasOsd = true
}
if r.URL.Query().Has("osdx") {
streamConfig.OsdX, err = strconv.Atoi(r.URL.Query().Get("osdx"))
}
if r.URL.Query().Has("osdy") {
streamConfig.OsdY, err = strconv.Atoi(r.URL.Query().Get("osdy"))
}
if r.URL.Query().Has("osdboxcolor") {
streamConfig.OsdBoxcolor = r.URL.Query().Get("osdboxcolor")
}
if r.URL.Query().Has("osdbox") {
streamConfig.OsdBox, err = strconv.Atoi(r.URL.Query().Get("osdbox"))
}
if r.URL.Query().Has("newstreampath") {
streamConfig.NewStreamPath = r.URL.Query().Get("newstreampath")
}
t.SetUpTransformTask(streamConfig)
w.Write([]byte("ok"))
}
func (t *TransformConfig) SetDefaultStreamConfig(config *StreamConfig) {
// streamConfig := StreamConfig{
// TransType: 0,
// StreamPath: "",
// NewStreamPath: "",
// Resolution: "352*288", //CIF 352*288 qcif 176×144 320*240 1280*720
// Fps: "25",
// VideoCodec: "libx264", //libx264,libx265
// HasOsd: false,
// OsdText: "M7S 转码",
// OsdFontColor: "green",
// OsdFontsize: 100,
// OsdX: 50,
// OsdY: 50,
// OsdBox: 0,
// OsdBoxcolor: "yellow",
// }
if config.Fps == "" {
config.Fps = "25"
}
if config.Resolution == "" {
config.Resolution = "352*288"
}
if config.VideoCodec == "" {
config.VideoCodec = "libx264"
}
if config.OsdText == "" {
config.OsdText = "M7S 转码"
}
if config.OsdFontColor == "" {
config.OsdFontColor = "green"
}
if config.OsdFontsize == 0 {
config.OsdFontsize = 100
}
if config.OsdX == 0 {
config.OsdX = 100
}
if config.OsdY == 0 {
config.OsdY = 100
}
if config.OsdBoxcolor == "" {
config.OsdBoxcolor = "yellow"
}
}
func (t *TransformConfig) SetUpTransformTask(config StreamConfig) {
//更新默认配置
t.SetDefaultStreamConfig(&config)
task := &TransformTask{
plugin: t,
streamConfig: config,
}
if task.streamConfig.StreamPath == "" {
TransformPlugin.Info("stream transform invalid\n", zap.String("streamPath", task.streamConfig.StreamPath))
return
}
if task.streamConfig.NewStreamPath == "" {
typeStr := strconv.FormatInt(int64(task.streamConfig.TransType), 10)
task.streamConfig.NewStreamPath = task.streamConfig.StreamPath + "-ts" + typeStr
}
if tanfsTaskArray[task.streamConfig.NewStreamPath] != nil {
TransformPlugin.Info("stream transform\n", zap.String("streamPath", task.streamConfig.NewStreamPath))
return
}
task.atTime = time.Now()
tanfsTaskArray[task.streamConfig.NewStreamPath] = task
switch task.streamConfig.TransType {
case 0:
go task.setupFfmpegTransformThrd0()
case 1:
go task.setupFfmpegTransformThrd1()
case 2:
go task.setupFfmpegTransformThrd2()
}
return
}
// 重点方案增加ffmpeg 进程异常退出重启功能 默认方法
// 学习stream 码流订阅用法
// 学习stream 码流发布用法
func (t *TransformTask) setupFfmpegTransformThrd0() {
TransformPlugin.Info("setupFfmpegTransformThrd0 pipe in and out...")
//添加一个循环 避免ffmpeg 进程异常退出,退出后自动重新启动
for {
t.status = 0
//ffmpeg 启动次数+1
t.restartFFCount++
osdText := ""
//"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
if t.streamConfig.OsdText != "" {
osdText += fmt.Sprintf("drawtext=fontsize=%d:fontfile=%s:text='%s':x=%d:y=%d:fontcolor=%s",
t.streamConfig.OsdFontsize,
t.plugin.Fontfile,
t.streamConfig.OsdText,
t.streamConfig.OsdX,
t.streamConfig.OsdY,
t.streamConfig.OsdFontColor)
}
if t.streamConfig.OsdBox != 0 && t.streamConfig.OsdBoxcolor != "" {
osdText += fmt.Sprintf(":box=1:boxcolor=%s",
t.streamConfig.OsdBoxcolor)
}
TransformPlugin.Info(osdText)
cmd := exec.Command(conf.Ffmpeg, "-re",
"-i", "pipe:0",
"-tune", "zerolatency", //编码延迟参数
//"-vcodec", t.videoCodec,
//"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
//"-b:v", "400k",
//"-preset", "superfast", //编码延迟参数superfast ultrafast 影响图像质量
"-s", t.streamConfig.Resolution,
"-r", t.streamConfig.Fps,
"-vf",
osdText,
"-c:v", t.streamConfig.VideoCodec,
//"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
//"-acodec", "libfaac",
//"-b:a", "64k",
"-acodec", "copy",
"-f",
"mpegts", //TS
"pipe:1",
)
TransformPlugin.Info(cmd.String())
// cmd := exec.Command(conf.Ffmpeg, "-re",
// "-i", "pipe:0",
// "-tune", "zerolatency", //编码延迟参数
// "-vcodec", "libx264",
// "-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
// "-b:v", "400k",
// "-preset", "superfast", //编码延迟参数superfast ultrafast 影响图像质量
// "-s", "320*240",
// "-r", "25",
// "-vf",
// "drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts2':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
// "-acodec", "libfaac",
// "-b:a", "64k",
// "-f",
// //"h264",
// //"flv",
// "mpegts", //TS
// //"mpeg", //ps
// "pipe:1",
// //"-y",
// //"transform.flv",
// )
//获取输入流
stdin, err := cmd.StdinPipe()
if err != nil {
TransformPlugin.Error("Error getting stdin pipe:", zap.Error(err))
continue
}
t.cmd = cmd
t.in_wp = stdin
//获取输出流 句柄
stdout, err := cmd.StdoutPipe()
if err != nil {
TransformPlugin.Error("Error getting stdout pipe:", zap.Error(err))
continue
}
t.out_rp = stdout
// Start the command
err = cmd.Start()
if err != nil {
TransformPlugin.Error("Error starting command:", zap.Error(err))
continue
}
//优先启动读管道数据进程
go t.readFFPipe1AndToPublisher()
//定义一个订阅者
s := &TransformSubscriber{}
//s.IsInternal = true
s.task = t
t.s = s
if err := TransformPlugin.Subscribe(t.streamConfig.StreamPath, s); err != nil {
TransformPlugin.Error("TransformPlugin 2 Subscribe faild")
continue
} else {
//重点需要goroutin 启动订阅流且只订阅了video track 裸流
//避免重复请求播放
if !s.IsPlaying() {
TransformPlugin.Info("TransformPlugin Subscribe sucess 2 play")
go s.PlayRaw()
}
}
TransformPlugin.Info("cmd Start wait end....\n")
err = cmd.Wait()
if err != nil {
TransformPlugin.Error("Error Wait command:", zap.Error(err))
}
//复位读写指针
t.out_rp = nil
t.in_wp = nil
//关闭订阅流
if t.s != nil {
TransformPlugin.Info("try to close TransformSubscriber")
t.s.Delete()
t.s = nil
}
//关闭发布流
if t.p != nil {
t.p.Delete()
t.p = nil
TransformPlugin.Info("try to close TransformPublisher")
}
TransformPlugin.Info("ffmpegTransformThrd end to restart", zap.Int("restartFFCount", t.restartFFCount))
//延迟后重启
time.Sleep(time.Duration(1000) * time.Millisecond)
}
//TransformPlugin.Info("ffmpeg task end...")
//t.taskEnd("ffmpeg cmd end")
}
// 订阅的Track数据写入ffmpeg 输入管道
func (t *TransformTask) writeToFFPipe0(buf []byte) {
//左闭右开
// n := len(buf)
// if n > 5 && binary.BigEndian.Uint32(buf[0:5]) == 1 && buf[4]&0x1f == 7 {
// log.Printf("pipe in SPS:%d, [%v]\n",
// len(buf), hex.EncodeToString(buf))
// } else if n > 5 && binary.BigEndian.Uint32(buf[0:5]) == 1 && buf[4]&0x1f == 8 {
// log.Printf("pipe in PPS:%d, [%v]\n",
// len(buf), hex.EncodeToString(buf))
// } else {
// if n > 5 {
// n = 5
// }
// log.Printf("pipe in raw:%d, [%v]\n",
// len(buf), hex.EncodeToString(buf[0:n]))
// }
t.status = 1
if t.in_wp == nil {
TransformPlugin.Warn("invalid in pipe wp")
return
}
t.in_bytes += len(buf)
_, err := t.in_wp.Write(buf)
if err != nil {
TransformPlugin.Error("write to pipe0 failed:", zap.Error(err))
}
}
// ffmpeg 转码后的ts 流 发布 stream
func (t *TransformTask) readFFPipe1AndToPublisher() {
//s.readTsDataFromPipeOut()
if t.out_rp == nil {
//time.Sleep(time.Duration(50) * time.Microsecond)
//continue
return
}
if t.p == nil {
//发布一个新的转码流
//定义一个发布者
p := &TransformPublisher{}
p.task = t
t.p = p
//判断流是否存在,存在则删除重新发布
s := Streams.Get(t.streamConfig.NewStreamPath)
if s != nil {
Streams.Delete(t.streamConfig.NewStreamPath)
}
TransformPlugin.Info("TransformTask TSPublisher", zap.String("newStreamPath", t.streamConfig.NewStreamPath))
if err := TransformPlugin.Publish(t.streamConfig.NewStreamPath, p); err != nil {
TransformPlugin.Error("TransformTask publish:", zap.Error(err))
return
}
p.AudioTrack = nil
p.VideoTrack = nil
//buf := make([]byte, 64*1024)
p.tsReader = NewTSReader(&p.TSPublisher)
//defer tsReader.Close()
//很重要这一步
//ffmpeg restart t.out_rp 会发生变化
if p.TSPublisher.IO.Reader != t.out_rp {
TransformPlugin.Info("TSPublisher SetIO...")
p.TSPublisher.SetIO(t.out_rp)
}
}
p := t.p
for {
if t.out_rp == nil {
TransformPlugin.Info("TransformTask TSPublisher no out rp valid exit thrd")
break
}
//err := tsReader.Feed(t.out_rp)
//很重要这一步
if p != nil && p.tsReader != nil {
err := p.tsReader.Feed(p)
if err == errors.New("file already closed") {
TransformPlugin.Error("tsReader.Feed:", zap.Error(err))
break
}
if err != nil {
TransformPlugin.Error("tsReader.Feed:", zap.Error(err))
//避免循环快速打印
time.Sleep(time.Duration(500) * time.Millisecond)
//
}
}
//log.Println("p end Feed...")
}
}
// 为了验证测试,使用者自己加固,异常处理
func (t *TransformTask) setupFfmpegTransformThrd1() {
TransformPlugin.Info("setupFfmpegTransformThrd1 url in and rtmp out...")
//转码并缩放
//成功
//ffmpeg -ss 0:01 -i "rtsp://127.0.0.1:554/njtv/glgc" -vcodec copy -vcodec libx264 -s 720*576 -f flv "rtmp://127.0.0.1:1935/njtv/glgc-d1"
pullpath := "rtsp://127.0.0.1:554/" + t.streamConfig.StreamPath
tspath := "rtmp://127.0.0.1:1935/" + t.streamConfig.NewStreamPath
//转码并保存
//cmd := exec.Command("ffmpeg", "-re", "-r", "30", "-i", "pipe:0", "-vcodec", "libx264", "-f", "flv", "pipe:1", "-y", "another.flv")
cmd := exec.Command(conf.Ffmpeg, "-re",
"-i", pullpath,
//"-vcodec", "copy",
"-tune", "zerolatency", //编码延迟参数
"-vcodec", "libx264",
"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
"-b:v", "400k",
"-preset", "superfast", //编码延迟参数superfast ultrafast 影响图像质量
"-s", "320*240",
"-r", "25",
"-vf",
"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts0':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
"-acodec", "libfaac",
"-b:a", "64k",
"-f",
"flv",
tspath,
)
t.cmd = cmd
// Start the command
err := cmd.Start()
if err != nil {
fmt.Println("Error starting command:", err)
return
}
log.Printf("cmd Start wait end....\n")
err = cmd.Wait()
if err != nil {
fmt.Println("Error Wait command:", err)
}
log.Printf("ffmpegTransformThrd end\n")
t.taskEnd("cmd end")
}
// 为了验证测试,使用者自己加固,异常处理
func (t *TransformTask) setupFfmpegTransformThrd2() {
TransformPlugin.Info("setupFfmpegTransformThrd2 pipe in and rtmp out...")
s := &TransformSubscriber{}
//s.IsInternal = true
s.task = t
t.s = s
// sub.onSubscriberSucess()
if err := TransformPlugin.Subscribe(t.streamConfig.StreamPath, s); err != nil {
TransformPlugin.Error("TransformPlugin 1 Subscribe faild")
t.taskEnd("Subscribe faild")
return
} else {
TransformPlugin.Info("TransformPlugin Subscribe sucess")
//重点需要goroutin
go s.PlayRaw()
}
cmd := exec.Command(conf.Ffmpeg, "-re",
"-i", "pipe:0",
"-tune", "zerolatency", //编码延迟参数
"-vcodec", "libx264",
"-g", "12", "-keyint_min", "12", //设置GOP 大小和关键帧间隔
"-b:v", "400k",
"-preset", "superfast", //编码延迟参数superfast ultrafast 影响图像质量
"-s", "320*240",
"-r", "25",
"-vf",
"drawtext=fontsize=100:fontfile=shoujin.ttf:text='m7s转码 ts1':x=500:y=500:fontcolor=green:box=1:boxcolor=yellow",
"-acodec", "libfaac",
"-b:a", "64k",
"-f",
"flv",
"rtmp://127.0.0.1:1935/"+t.streamConfig.NewStreamPath,
)
t.cmd = cmd
//获取输入流
stdin, err := cmd.StdinPipe()
if err != nil {
fmt.Println("Error getting stdin pipe:", err)
return
}
t.in_wp = stdin
// Start the command
err = cmd.Start()
if err != nil {
fmt.Println("Error starting command:", err)
return
}
log.Printf("cmd Start wait end....\n")
err = cmd.Wait()
if err != nil {
fmt.Println("Error Wait command:", err)
}
log.Printf("ffmpegTransformThrd end\n")
t.taskEnd("cmd end")
}
func (t *TransformTask) taskEnd(reason string) {
tanfsTaskArray[t.streamConfig.NewStreamPath] = nil
if t.p != nil {
t.p.Stop()
t.p = nil
}
if t.s != nil {
t.s.Stop()
t.s = nil
}
log.Printf(fmt.Sprintf("task:%s end for:%s\n", t.streamConfig.NewStreamPath, reason))
}
func (t *TransformTask) debugPrintfNal(buf []byte, name string) {
n := len(buf)
if n > 5 {
n = 5
}
log.Printf("nal %s out len:%d, [%v]\n",
name,
len(buf), hex.EncodeToString(buf[0:n]))
// s.Info("nal :",
// zap.String("int_bytes", name),
// zap.Int("int_bytes", len(buf)),
// zap.String("int_bytes", hex.EncodeToString(buf[0:n])))
}
func (t *TransformTask) writeTmpFile(data []byte) {
if t.f == nil {
f, err := os.Create("tranform-tmp.ps")
if f == nil {
log.Println("create file faild:%v", err)
return
}
t.f = f
}
_, err2 := t.f.Write(data)
if err2 != nil {
log.Println("Write file faild:%v", err2)
return
}
if t.out_bytes > 1024*1024*5 {
t.f.Close()
os.Exit(0)
}
}
/*
func (s *TransformSubscriber) OnEvent(event any) {
switch v := event.(type) {
case VideoFrame:
//s.Stop()
// var errOut util.Buffer
// firstFrame := v.GetAnnexB()
// cmd := exec.Command(conf.FFmpeg, "-hide_banner", "-i", "pipe:0", "-vframes", "1", "-f", "mjpeg", "pipe:1")
// cmd.Stdin = &firstFrame
// cmd.Stderr = &errOut
// cmd.Stdout = s
// cmd.Run()
// if errOut.CanRead() {
// s.Info(string(errOut))
// }
default:
s.Subscriber.OnEvent(event)
}
}*/