fix: restart transcode

This commit is contained in:
langhuihui
2024-11-05 09:48:43 +08:00
parent 4c7cb6fed7
commit 1e61c9ccd7
4 changed files with 26 additions and 11 deletions

View File

@@ -59,7 +59,7 @@ type (
OnInit() error OnInit() error
OnStop() OnStop()
Pull(string, config.Pull) Pull(string, config.Pull)
Transform(string, config.Transform) Transform(*Publisher, config.Transform)
OnPublish(*Publisher) OnPublish(*Publisher)
} }
@@ -388,7 +388,7 @@ func (p *Plugin) OnPublish(pub *Publisher) {
} }
tranConf.Output[j] = to tranConf.Output[j] = to
} }
p.Transform(pub.StreamPath, tranConf) p.Transform(pub, tranConf)
} }
} }
} }
@@ -500,9 +500,10 @@ func (p *Plugin) Record(streamPath string, conf config.Record) {
recorder.GetRecordJob().Init(recorder, p, streamPath, conf) recorder.GetRecordJob().Init(recorder, p, streamPath, conf)
} }
func (p *Plugin) Transform(streamPath string, conf config.Transform) { func (p *Plugin) Transform(pub *Publisher, conf config.Transform) {
transformer := p.Meta.Transformer() transformer := p.Meta.Transformer()
transformer.GetTransformJob().Init(transformer, p, streamPath, conf) job := transformer.GetTransformJob().Init(transformer, p, pub.StreamPath, conf)
job.Depend(pub)
} }
func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) { func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) {

View File

@@ -5,6 +5,7 @@ import (
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
m7s "m7s.live/pro"
"net/url" "net/url"
"os" "os"
"path" "path"
@@ -135,7 +136,16 @@ func parseCrop(cropString string) (string, error) {
} }
func (t *TranscodePlugin) Launch(ctx context.Context, transReq *pb.TransRequest) (response *globalPB.SuccessResponse, err error) { func (t *TranscodePlugin) Launch(ctx context.Context, transReq *pb.TransRequest) (response *globalPB.SuccessResponse, err error) {
var publisher *m7s.Publisher
var ok bool
t.Server.Server.Call(func() error {
publisher, ok = t.Server.Streams.Get(transReq.SrcStream)
return nil
})
if !ok {
err = fmt.Errorf("src stream not found")
return
}
response = &globalPB.SuccessResponse{} response = &globalPB.SuccessResponse{}
defer func() { defer func() {
if err != nil { if err != nil {
@@ -289,7 +299,7 @@ func (t *TranscodePlugin) Launch(ctx context.Context, transReq *pb.TransRequest)
Codec: transReq.Decodec, Codec: transReq.Decodec,
} }
t.Transform(transReq.SrcStream, cfg) t.Transform(publisher, cfg)
return return
} }

View File

@@ -3,6 +3,7 @@ package transcode
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"m7s.live/pro/pkg"
"net" "net"
"net/url" "net/url"
"os" "os"
@@ -191,7 +192,10 @@ func (t *Transformer) Run() error {
defer close(rBuf) defer close(rBuf)
return live.Run() return live.Run()
} else { } else {
return t.ffmpeg.Wait() if err := t.ffmpeg.Wait(); err != nil {
return err
}
return pkg.ErrRestart
} }
} }

View File

@@ -80,7 +80,7 @@ func (p *TransformJob) Publish(streamPath string) (err error) {
p.Publisher.Type = PublishTypeTransform p.Publisher.Type = PublishTypeTransform
if err == nil { if err == nil {
p.Publisher.OnDispose(func() { p.Publisher.OnDispose(func() {
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout,pkg.ErrStopFromAPI) { if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, pkg.ErrStopFromAPI) {
p.Stop(p.Publisher.StopReason()) p.Stop(p.Publisher.StopReason())
} }
}) })
@@ -123,9 +123,9 @@ func (p *TransformJob) Start() (err error) {
return return
} }
func (p *TransformJob) TransformPublished(pub *Publisher) { //func (p *TransformJob) TransformPublished(pub *Publisher) {
//
} //}
func (p *TransformJob) Dispose() { func (p *TransformJob) Dispose() {
transList := &p.Plugin.Server.Transforms transList := &p.Plugin.Server.Transforms