feat: transcode success

This commit is contained in:
langhuihui
2024-08-30 09:26:17 +08:00
parent be337f7535
commit 60022b850c
22 changed files with 435 additions and 249 deletions

View File

@@ -1,42 +1,15 @@
global:
loglevel: debug
http:
listenaddr: :8082
onpublish:
record:
.* :
filepath: $0
# enableauth: true
# tcp:
# listenaddr: :50051
# ringsize: 20-250
# buffertime: 10s
# speed: 1
#console:
# secret: de2c0bb9fd47684adc07a426e139239b
logrotate:
level: debug
rtsp:
rtmp:
# tcp:
# listenaddr: :11935
publish:
# idletimeout: 10s
# closedelaytimeout: 4s
subscribe:
# submode: 1
# subaudio: false
onsub:
pull:
live/.*: rtmp://localhost/$0
#flv:
# pull:
# pullonstart:
# live/test: /Users/dexter/project/v5/monibuca/example/default/record/live/test.flv
gb28181:
sip:
listenaddr:
- udp::5060
onsubscribe:
onsub:
pull:
.* : $0

View File

@@ -2,5 +2,4 @@ global:
loglevel: trace
flv:
pull:
pullonstart:
live/test: /Users/dexter/Movies/jb-demo.flv
live/test: dump.flv

View File

@@ -6,4 +6,4 @@ transcode:
.+:
output:
- target: rtmp://localhost/$0/h265
conf: -loglevel trace -c:v h265
conf: -loglevel debug -c:a aac -c:v hevc_videotoolbox

View File

@@ -1,4 +1,5 @@
global:
loglevel: debug
tcp: :50050
http: :8081
rtsp:
@@ -7,4 +8,4 @@ rtsp:
live/test:
url: rtsp://localhost/live/test
maxretry: -1
retryinterval: 5s
retryinterval: 10s

View File

@@ -3,6 +3,7 @@ package config
import (
"encoding/json"
"fmt"
"github.com/mcuadros/go-defaults"
"log/slog"
"os"
"reflect"
@@ -324,6 +325,34 @@ func (config *Config) assign(k string, v any) (target reflect.Value) {
regexpStr := source.String()
target.Set(reflect.ValueOf(Regexp{regexp.MustCompile(regexpStr)}))
default:
if ft.Kind() == reflect.Map {
target = reflect.MakeMap(ft)
tmpStruct := reflect.StructOf([]reflect.StructField{
{
Name: "Key",
Type: ft.Key(),
},
})
tmpValue := reflect.New(tmpStruct)
for k, v := range v.(map[string]any) {
_ = yaml.Unmarshal([]byte(fmt.Sprintf("key: %s", k)), tmpValue.Interface())
var value reflect.Value
if ft.Elem().Kind() == reflect.Struct {
value = reflect.New(ft.Elem())
defaults.SetDefaults(value.Interface())
if reflect.TypeOf(v).Kind() != reflect.Map {
value.Elem().Field(0).Set(reflect.ValueOf(v))
} else {
out, _ := yaml.Marshal(v)
_ = yaml.Unmarshal(out, value.Interface())
}
value = value.Elem()
} else {
value = reflect.ValueOf(v)
}
target.SetMapIndex(tmpValue.Elem().Field(0), value)
}
} else {
tmpStruct := reflect.StructOf([]reflect.StructField{
{
Name: strings.ToUpper(k),
@@ -342,6 +371,7 @@ func (config *Config) assign(k string, v any) (target reflect.Value) {
}
target = tmpValue.Elem().Field(0)
}
}
return
}

View File

@@ -38,14 +38,14 @@ type (
Pull struct {
URL string `desc:"拉流地址"`
MaxRetry int `desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉高于0 的数代表最大重拉次数
RetryInterval time.Duration `desc:"重试间隔" default:"5s"` // 重试间隔
RetryInterval time.Duration `default:"5s" desc:"重试间隔"` // 重试间隔
Proxy string `desc:"代理地址"` // 代理地址
Header map[string][]string
}
Push struct {
URL string `desc:"推送地址"` // 推送地址
MaxRetry int `desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重推,0 表示不自动重推,-1 表示无限重推高于0 的数代表最大重推次数
RetryInterval time.Duration `desc:"重试间隔" default:"5s"` // 重试间隔
RetryInterval time.Duration `default:"5s" desc:"重试间隔"` // 重试间隔
Proxy string `desc:"代理地址"` // 代理地址
Header map[string][]string
}
@@ -58,6 +58,7 @@ type (
Input any
Output []struct {
Target string `desc:"转码目标"` // 转码目标
StreamPath string
Conf any
}
}

View File

@@ -20,4 +20,5 @@ var (
ErrLost = errors.New("lost")
ErrRecordSamePath = errors.New("record same path")
ErrTransformSame = errors.New("transform same")
)

View File

@@ -147,11 +147,6 @@ func (mt *Job) addChild(task ITask) int {
return len(mt.children) - 1
}
func (mt *Job) removeChild(index int) {
defer mt.onChildDispose(mt.children[index])
mt.children = slices.Delete(mt.children, index, index+1)
}
func (mt *Job) run() {
cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}}
defer func() {
@@ -177,27 +172,25 @@ func (mt *Job) run() {
if !ok {
return
}
if task := rev.Interface().(ITask); task.getParent() == mt {
index := mt.addChild(task)
if err := task.start(); err == nil {
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())})
} else {
task.Stop(err)
mt.removeChild(index)
}
} else {
mt.addChild(task)
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())})
if child := rev.Interface().(ITask); child.getParent() != mt || child.start() {
mt.children = append(mt.children, child)
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(child.GetSignal())})
}
} else {
taskIndex := chosen - 1
task := mt.children[taskIndex]
switch tt := task.(type) {
child := mt.children[taskIndex]
switch tt := child.(type) {
case IChannelTask:
tt.Tick(rev.Interface())
}
if !ok {
mt.removeChild(taskIndex)
if mt.onChildDispose(child); child.checkRetry(child.StopReason()) {
if child.reset(); child.start() {
cases[chosen].Chan = reflect.ValueOf(child.GetSignal())
continue
}
}
mt.children = slices.Delete(mt.children, taskIndex, taskIndex+1)
cases = slices.Delete(cases, chosen, chosen+1)
}
}

View File

@@ -54,8 +54,10 @@ type (
GetSignal() any
Stop(error)
StopReason() error
start() error
start() bool
dispose()
checkRetry(error) bool
reset()
IsStopped() bool
GetTaskType() TaskType
GetOwnerType() string
@@ -110,7 +112,6 @@ type (
startup, shutdown *util.Promise
parent *Job
parentCtx context.Context
needRetry bool
state TaskState
level byte
}
@@ -224,31 +225,36 @@ func (task *Task) GetSignal() any {
return task.Done()
}
func (task *Task) checkRetry(err error) (bool, error) {
func (task *Task) checkRetry(err error) bool {
if errors.Is(err, ErrTaskComplete) {
return false, err
return false
}
if task.retry.MaxRetry < 0 || task.retry.RetryCount < task.retry.MaxRetry {
task.retry.RetryCount++
if task.Logger != nil {
if task.retry.MaxRetry < 0 {
task.Warn(fmt.Sprintf("retry %d/∞", task.retry.RetryCount))
} else {
task.Warn(fmt.Sprintf("retry %d/%d", task.retry.RetryCount, task.retry.MaxRetry))
}
}
if delta := time.Since(task.StartTime); delta < task.retry.RetryInterval {
time.Sleep(task.retry.RetryInterval - delta)
}
return true, err
return true
} else {
if task.retry.MaxRetry > 0 {
if task.Logger != nil {
task.Warn(fmt.Sprintf("max retry %d failed", task.retry.MaxRetry))
}
return false, errors.Join(err, ErrRetryRunOut)
return false
}
}
return false, err
return false
}
func (task *Task) start() (err error) {
func (task *Task) start() bool {
var err error
defer func() {
if r := recover(); r != nil {
err = errors.New(fmt.Sprint(r))
@@ -257,6 +263,7 @@ func (task *Task) start() (err error) {
}
}
}()
for {
task.StartTime = time.Now()
if task.Logger != nil {
task.Debug("task start", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
@@ -267,45 +274,41 @@ func (task *Task) start() (err error) {
}
if err == nil {
task.state = TASK_STATE_STARTED
task.startup.Fulfill(err)
for _, listener := range task.afterStartListeners {
listener()
}
task.ResetRetryCount()
if runHandler, ok := task.handler.(TaskBlock); ok {
task.state = TASK_STATE_RUNNING
err = runHandler.Run()
if err == nil {
return ErrTaskComplete
} else {
task.needRetry, err = task.checkRetry(err)
err = ErrTaskComplete
}
}
} else {
task.needRetry, err = task.checkRetry(err)
if task.needRetry {
defer task.reStart()
}
}
task.startup.Fulfill(err)
if err != nil {
return
}
for _, listener := range task.afterStartListeners {
listener()
}
if err == nil {
if goHandler, ok := task.handler.(TaskGo); ok {
task.state = TASK_STATE_GOING
go task.run(goHandler.Go)
}
return
return true
} else {
task.Stop(err)
task.parent.onChildDispose(task.handler)
if task.checkRetry(err) {
task.reset()
} else {
return false
}
}
}
}
func (task *Task) reStart() {
if task.IsStopped() {
func (task *Task) reset() {
task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx)
task.shutdown = util.NewPromise(context.Background())
}
task.startup = util.NewPromise(task.Context)
parent := task.parent
task.parent = nil
parent.AddTask(task.handler)
}
func (task *Task) dispose() {
@@ -331,9 +334,6 @@ func (task *Task) dispose() {
listener()
}
task.state = TASK_STATE_DISPOSED
if !errors.Is(reason, ErrTaskComplete) && task.needRetry {
task.reStart()
}
}
func (task *Task) ResetRetryCount() {
@@ -341,16 +341,9 @@ func (task *Task) ResetRetryCount() {
}
func (task *Task) run(handler func() error) {
var err error
err = handler()
if err == nil {
task.needRetry = false
if err := handler(); err == nil {
task.Stop(ErrTaskComplete)
} else {
if task.needRetry, err = task.checkRetry(err); !task.needRetry {
task.Stop(errors.Join(err, ErrRetryRunOut))
} else {
task.Stop(err)
}
}
}

View File

@@ -4,6 +4,7 @@ import (
"io"
"net"
"net/textproto"
"os"
"strings"
)
@@ -14,6 +15,7 @@ type BufReader struct {
buf MemoryReader
BufLen int
feedData func() error
Dump *os.File
}
func NewBufReaderWithBufLen(reader io.Reader, bufLen int) (r *BufReader) {
@@ -172,6 +174,9 @@ func (r *BufReader) ReadRange(n int, yield func([]byte)) (err error) {
func (r *BufReader) Read(to []byte) (n int, err error) {
n = len(to)
err = r.ReadNto(n, to)
if r.Dump != nil {
r.Dump.Write(to)
}
return
}

View File

@@ -6,6 +6,7 @@ import (
"encoding/binary"
"io"
"math/rand"
"net"
"testing"
)
@@ -22,6 +23,29 @@ func (l *limitReader) Read(p []byte) (n int, err error) {
return
}
// TestBufReader_Buffered verifies that a BufReader backed by a
// net.Buffers channel concatenates queued buffer groups across
// successive Read calls, preserving byte order.
func TestBufReader_Buffered(t *testing.T) {
	var feeder = make(chan net.Buffers, 100)
	testReader := NewBufReaderBuffersChan(feeder)
	testReader.BufLen = 4
	t.Run("feed", func(t *testing.T) {
		feeder <- net.Buffers{[]byte{1, 2, 3, 4, 5}, []byte{6, 7, 8, 9, 10}}
		feeder <- net.Buffers{[]byte{11, 12, 13, 14, 15}, []byte{16, 17, 18, 19, 20}}
	})
	t.Run("read", func(t *testing.T) {
		var b = make([]byte, 10)
		// Check the error the original discarded: a failed or short read
		// would otherwise compare stale buffer contents and mislead.
		if _, err := testReader.Read(b); err != nil {
			t.Fatal(err)
		}
		if !bytes.Equal(b, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) {
			t.Error("read error")
			return
		}
		if _, err := testReader.Read(b); err != nil {
			t.Fatal(err)
		}
		if !bytes.Equal(b, []byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}) {
			t.Error("read error")
			return
		}
	})
}
func TestBufRead(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
var testData = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}

View File

@@ -2,9 +2,11 @@ package m7s
import (
"context"
"fmt"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
@@ -58,6 +60,7 @@ type (
OnStop()
Pull(string, config.Pull)
Transform(string, config.Transform)
OnPublish(*Publisher)
}
IRegisterHandler interface {
@@ -79,10 +82,6 @@ type (
IQUICPlugin interface {
OnQUICConnect(quic.Connection) task.ITask
}
IListenPublishPlugin interface {
OnPublish(*Publisher) task.ITask
}
)
var plugins []PluginMeta
@@ -364,7 +363,46 @@ func (p *Plugin) OnInit() error {
func (p *Plugin) OnStop() {
}
func (p *Plugin) OnPublish(pub *Publisher) {
onPublish := p.config.OnPub
if p.Meta.Pusher != nil {
for r, pushConf := range onPublish.Push {
if group := r.FindStringSubmatch(pub.StreamPath); group != nil {
for i, g := range group {
pushConf.URL = strings.Replace(pushConf.URL, fmt.Sprintf("$%d", i), g, -1)
}
p.Push(pub.StreamPath, pushConf)
}
}
}
if p.Meta.Recorder != nil {
for r, recConf := range onPublish.Record {
if group := r.FindStringSubmatch(pub.StreamPath); group != nil {
for i, g := range group {
recConf.FilePath = strings.Replace(recConf.FilePath, fmt.Sprintf("$%d", i), g, -1)
}
p.Record(pub.StreamPath, recConf)
}
}
}
if p.Meta.Transformer != nil {
for r, tranConf := range onPublish.Transform {
if group := r.FindStringSubmatch(pub.StreamPath); group != nil {
for j, to := range tranConf.Output {
for i, g := range group {
to.Target = strings.Replace(to.Target, fmt.Sprintf("$%d", i), g, -1)
}
targetUrl, err := url.Parse(to.Target)
if err == nil {
to.StreamPath = strings.TrimPrefix(targetUrl.Path, "/")
}
tranConf.Output[j] = to
}
p.Transform(pub.StreamPath, tranConf)
}
}
}
}
func (p *Plugin) PublishWithConfig(ctx context.Context, streamPath string, conf config.Publish) (publisher *Publisher, err error) {
publisher = createPublisher(p, streamPath, conf)
if p.config.EnableAuth {
@@ -402,6 +440,14 @@ func (p *Plugin) SubscribeWithConfig(ctx context.Context, streamPath string, con
}
}
err = p.Server.Streams.AddTask(subscriber, ctx).WaitStarted()
if err == nil {
select {
case <-subscriber.waitPublishDone:
err = subscriber.Publisher.WaitTrack()
case <-subscriber.Done():
err = subscriber.Err()
}
}
return
}

90
plugin/flv/pkg/echo.go Normal file
View File

@@ -0,0 +1,90 @@
package flv
import (
"errors"
"fmt"
"io"
"m7s.live/m7s/v5/pkg/util"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
// Echo reads an FLV stream from r, parses each tag, and prints
// diagnostic information (header flags, tag sizes, timestamps, script
// metadata). Audio and video payloads are recycled without further
// processing. It returns the first parse error encountered, or the
// reader's terminal error (typically io.EOF) when the stream ends.
func Echo(r io.Reader) (err error) {
	reader := util.NewBufReader(r)
	var hasAudio, hasVideo bool
	var absTS uint32
	var head util.Memory
	// FLV header (9 bytes) + PreviousTagSize0 (4 bytes) = 13 bytes.
	head, err = reader.ReadBytes(13)
	if err == nil {
		var flvHead [3]byte
		var version, flag byte
		// Check the read error before validating the magic bytes; the
		// original discarded it, so a short header read was misreported
		// as "not flv file".
		if err = head.NewReader().ReadByteTo(&flvHead[0], &flvHead[1], &flvHead[2], &version, &flag); err == nil {
			if flvHead != [...]byte{'F', 'L', 'V'} {
				err = errors.New("not flv file")
			} else {
				hasAudio = flag&0x04 != 0
				hasVideo = flag&0x01 != 0
			}
		}
	}
	var startTs uint32
	fmt.Println(hasAudio, hasVideo)
	allocator := util.NewScalableMemoryAllocator(1 << 10)
	var tagSize int
	// One iteration per FLV tag; the post statement consumes the
	// trailing 4-byte PreviousTagSize field of the tag just read.
	for offsetTs := absTS; err == nil; tagSize, err = reader.ReadBE(4) {
		fmt.Println(tagSize)
		t, err := reader.ReadByte()
		if err != nil {
			return err
		}
		dataSize, err := reader.ReadBE32(3)
		if err != nil {
			return err
		}
		timestamp, err := reader.ReadBE32(3)
		if err != nil {
			return err
		}
		// Extended-timestamp byte becomes bits 24-31.
		h, err := reader.ReadByte()
		if err != nil {
			return err
		}
		timestamp = timestamp | uint32(h)<<24
		// NOTE(review): a stream whose first tags carry timestamp 0 only
		// latches startTs at the first non-zero value — confirm intended.
		if startTs == 0 {
			startTs = timestamp
		}
		if _, err = reader.ReadBE(3); err != nil { // stream id always 0
			return err
		}
		var frame rtmp.RTMPData
		ds := int(dataSize)
		frame.SetAllocator(allocator)
		err = reader.ReadNto(ds, frame.NextN(ds))
		if err != nil {
			return err
		}
		absTS = offsetTs + (timestamp - startTs)
		frame.Timestamp = absTS
		fmt.Println(t, offsetTs, timestamp, startTs, absTS)
		switch t {
		case FLV_TAG_TYPE_AUDIO:
			frame.Recycle()
		case FLV_TAG_TYPE_VIDEO:
			frame.Recycle()
		case FLV_TAG_TYPE_SCRIPT:
			r := frame.NewReader()
			amf := &rtmp.AMF{
				Buffer: util.Buffer(r.ToBytes()),
			}
			// Check each unmarshal result; the original overwrote the
			// first Unmarshal's error with the second before checking,
			// masking corrupt script tags.
			var name, metaData any
			if name, err = amf.Unmarshal(); err != nil {
				frame.Recycle()
				return err
			}
			metaData, err = amf.Unmarshal()
			frame.Recycle()
			if err != nil {
				return err
			}
			fmt.Println("script", name, metaData)
		}
	}
	return
}

View File

@@ -0,0 +1,37 @@
package flv
import (
"errors"
"io"
"m7s.live/m7s/v5/pkg/util"
"net"
"os"
"testing"
)
// TestRead streams a local FLV fixture through a buffers-channel reader
// in one parallel subtest and decodes it with Echo in another.
func TestRead(t *testing.T) {
	var feeder = make(chan net.Buffers, 100)
	reader := util.NewBufReaderBuffersChan(feeder)
	t.Run("feed", func(t *testing.T) {
		t.Parallel()
		// The fixture path is machine-specific; the original discarded
		// the Open error and panicked on a nil *os.File. Skip instead,
		// closing the feeder so the read subtest can finish.
		file, err := os.Open("/Users/dexter/Downloads/ps.flv")
		if err != nil {
			close(feeder)
			t.Skip(err)
		}
		defer file.Close() // original leaked the handle
		for {
			var buf = make([]byte, 1024)
			n, err := file.Read(buf)
			if err != nil {
				close(feeder)
				break
			}
			feeder <- net.Buffers{buf[:n]}
		}
	})
	t.Run("read", func(t *testing.T) {
		t.Parallel()
		err := Echo(reader)
		if err != nil && !errors.Is(err, io.EOF) {
			t.Error(err)
			t.FailNow()
		}
	})
}

View File

@@ -5,10 +5,7 @@ import (
transcode "m7s.live/m7s/v5/plugin/transcode/pkg"
)
var (
//_ m7s.IListenPublishPlugin = (*TranscodePlugin)(nil)
_ = m7s.InstallPlugin[TranscodePlugin](transcode.NewTransform)
)
var _ = m7s.InstallPlugin[TranscodePlugin](transcode.NewTransform)
type TranscodePlugin struct {
m7s.Plugin

View File

@@ -1,35 +0,0 @@
package transcode
import (
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
flv "m7s.live/m7s/v5/plugin/flv/pkg"
"net"
)
// PipeInput bridges FLV tag output into an in-memory byte stream:
// tags written via the flv.Live hook are queued on rBuf and read back
// through the embedded BufReader.
type PipeInput struct {
	task.Task
	rBuf chan net.Buffers // pipe of FLV tag fragments feeding BufReader
	*util.BufReader       // read side, backed by rBuf
	flv.Live              // provides the WriteFlvTag write-side hook
}

// Start allocates the pipe, seeds it with the FLV file header so the
// reader sees a well-formed stream, and installs a non-blocking
// WriteFlvTag: when rBuf is full the tag is dropped with a warning
// rather than blocking the writer.
func (p *PipeInput) Start() (err error) {
	p.rBuf = make(chan net.Buffers, 100)
	p.BufReader = util.NewBufReaderBuffersChan(p.rBuf)
	p.rBuf <- net.Buffers{flv.FLVHead}
	p.WriteFlvTag = func(flv net.Buffers) (err error) {
		select {
		case p.rBuf <- flv:
		default:
			// Drop on backpressure; returning nil keeps the writer going.
			p.Warn("pipe input buffer full")
		}
		return
	}
	return
}

// Dispose closes the feed channel (signalling EOF to the reader) and
// returns the BufReader's resources to its allocator.
func (p *PipeInput) Dispose() {
	close(p.rBuf)
	p.BufReader.Recycle()
}

View File

@@ -8,6 +8,7 @@ import (
"m7s.live/m7s/v5/pkg/util"
flv "m7s.live/m7s/v5/plugin/flv/pkg"
"net"
"net/url"
"os/exec"
"strings"
"time"
@@ -46,8 +47,12 @@ type (
func NewTransform() m7s.ITransformer {
ret := &Transformer{}
ret.WriteFlvTag = func(flv net.Buffers) (err error) {
var buffer []byte
for _, b := range flv {
buffer = append(buffer, b...)
}
select {
case ret.rBuf <- flv:
case ret.rBuf <- buffer:
default:
ret.Warn("pipe input buffer full")
}
@@ -59,7 +64,7 @@ func NewTransform() m7s.ITransformer {
type Transformer struct {
m7s.DefaultTransformer
TransRule
rBuf chan net.Buffers
rBuf chan []byte
*util.BufReader
flv.Live
}
@@ -95,11 +100,17 @@ func (t *Transformer) Start() (err error) {
}
t.To[i] = enc
args = append(args, strings.Fields(enc.Args)...)
if strings.HasPrefix(to.Target, "rtmp://") {
var targetUrl *url.URL
targetUrl, err = url.Parse(to.Target)
if err != nil {
return
}
switch targetUrl.Scheme {
case "rtmp":
args = append(args, "-f", "flv", to.Target)
} else if strings.HasPrefix(to.Target, "rtsp://") {
case "rtsp":
args = append(args, "-f", "rtsp", to.Target)
} else {
default:
args = append(args, to.Target)
}
}
@@ -107,10 +118,10 @@ func (t *Transformer) Start() (err error) {
"cmd": args,
"config": t.TransRule,
}
t.rBuf = make(chan net.Buffers, 100)
t.BufReader = util.NewBufReaderBuffersChan(t.rBuf)
t.rBuf = make(chan []byte, 100)
t.BufReader = util.NewBufReaderChan(t.rBuf)
t.Subscriber = t.TransformJob.Subscriber
//t.BufReader.Dump, err = os.OpenFile("dump.flv", os.O_CREATE|os.O_WRONLY, 0644)
var cmdTask CommandTask
cmdTask.logFileName = fmt.Sprintf("logs/transcode_%s_%s.log", strings.ReplaceAll(t.TransformJob.StreamPath, "/", "_"), time.Now().Format("20060102150405"))
cmdTask.Cmd = exec.CommandContext(t, "ffmpeg", args...)

View File

@@ -2,14 +2,12 @@ package m7s
import (
"context"
"fmt"
"m7s.live/m7s/v5/pkg/task"
"math"
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"sync"
"time"
@@ -171,48 +169,14 @@ func (p *Publisher) Start() (err error) {
s.Waiting.Remove(waiting)
}
for plugin := range s.Plugins.Range {
if plugin.Disabled {
continue
}
onPublish := plugin.GetCommonConf().OnPub
if plugin.Meta.Pusher != nil {
for r, pushConf := range onPublish.Push {
if group := r.FindStringSubmatch(p.StreamPath); group != nil {
for i, g := range group {
pushConf.URL = strings.Replace(pushConf.URL, fmt.Sprintf("$%d", i), g, -1)
}
plugin.Push(p.StreamPath, pushConf)
}
}
}
if plugin.Meta.Recorder != nil {
for r, recConf := range onPublish.Record {
if group := r.FindStringSubmatch(p.StreamPath); group != nil {
for i, g := range group {
recConf.FilePath = strings.Replace(recConf.FilePath, fmt.Sprintf("$%d", i), g, -1)
}
plugin.Record(p.StreamPath, recConf)
}
}
}
if plugin.Meta.Transformer != nil {
for r, tranConf := range onPublish.Transform {
if group := r.FindStringSubmatch(p.StreamPath); group != nil {
for j, to := range tranConf.Output {
for i, g := range group {
to.Target = strings.Replace(to.Target, fmt.Sprintf("$%d", i), g, -1)
}
tranConf.Output[j] = to
}
plugin.Transform(p.StreamPath, tranConf)
}
}
}
if v, ok := plugin.handler.(IListenPublishPlugin); ok {
v.OnPublish(p)
plugin.OnPublish(p)
}
s.Transforms.Post(func() error {
if m, ok := s.Transforms.Transformed.Get(p.StreamPath); ok {
m.TransformJob.TransformPublished(p)
}
return nil
})
p.AddTask(&PublishTimeout{Publisher: p})
if p.PublishTimeout > 0 {
p.AddTask(&PublishNoDataTimeout{Publisher: p})
@@ -297,6 +261,7 @@ func (p *Publisher) RemoveSubscriber(subscriber *Subscriber) {
func (p *Publisher) AddSubscriber(subscriber *Subscriber) {
subscriber.Publisher = p
close(subscriber.waitPublishDone)
if p.Subscribers.AddUnique(subscriber) {
p.Info("subscriber +1", "count", p.Subscribers.Length)
if subscriber.BufferTime > p.BufferTime {

View File

@@ -115,6 +115,7 @@ func (p *HTTPFilePuller) Start() (err error) {
if res, err = os.Open(remoteURL); err == nil {
p.ReadCloser = res
}
//p.PullJob.Publisher.Publish.Speed = 1
}
return
}

View File

@@ -75,7 +75,7 @@ type Server struct {
Pulls task.Manager[string, *PullJob]
Pushs task.Manager[string, *PushJob]
Records task.Manager[string, *RecordJob]
Transforms task.Manager[string, *TransformJob]
Transforms Transforms
Subscribers SubscriberCollection
LogHandler MultiLogHandler
apiList []string

View File

@@ -60,12 +60,13 @@ type Subscriber struct {
PubSubBase
config.Subscribe
Publisher *Publisher
waitPublishDone chan struct{}
AudioReader, VideoReader *AVRingReader
StartAudioTS, StartVideoTS time.Duration
}
func createSubscriber(p *Plugin, streamPath string, conf config.Subscribe) *Subscriber {
subscriber := &Subscriber{Subscribe: conf}
subscriber := &Subscriber{Subscribe: conf, waitPublishDone: make(chan struct{})}
subscriber.ID = task.GetNextTaskID()
subscriber.Plugin = p
subscriber.TimeoutTimer = time.NewTimer(subscriber.WaitTimeout)
@@ -83,11 +84,21 @@ func (s *Subscriber) Start() (err error) {
s.Info("subscribe")
if publisher, ok := server.Streams.Get(s.StreamPath); ok {
publisher.AddSubscriber(s)
err = publisher.WaitTrack()
return publisher.WaitTrack()
} else if waitStream, ok := server.Waiting.Get(s.StreamPath); ok {
waitStream.Add(s)
} else {
server.createWait(s.StreamPath).Add(s)
// var avoidTrans bool
//AVOID:
// for trans := range server.Transforms.Range {
// for _, output := range trans.Config.Output {
// if output.StreamPath == s.StreamPath {
// avoidTrans = true
// break AVOID
// }
// }
// }
for plugin := range server.Plugins.Range {
for reg, conf := range plugin.GetCommonConf().OnSub.Pull {
if plugin.Meta.Puller != nil {
@@ -99,21 +110,23 @@ func (s *Subscriber) Start() (err error) {
plugin.handler.Pull(s.StreamPath, conf)
}
}
for reg, conf := range plugin.GetCommonConf().OnSub.Transform {
if plugin.Meta.Transformer != nil {
if reg.MatchString(s.StreamPath) {
if group := reg.FindStringSubmatch(s.StreamPath); group != nil {
for j, c := range conf.Output {
for i, value := range group {
c.Target = strings.Replace(c.Target, fmt.Sprintf("$%d", i), value, -1)
}
conf.Output[j] = c
}
}
plugin.handler.Transform(s.StreamPath, conf)
}
}
}
//if !avoidTrans {
// for reg, conf := range plugin.GetCommonConf().OnSub.Transform {
// if plugin.Meta.Transformer != nil {
// if reg.MatchString(s.StreamPath) {
// if group := reg.FindStringSubmatch(s.StreamPath); group != nil {
// for j, c := range conf.Output {
// for i, value := range group {
// c.Target = strings.Replace(c.Target, fmt.Sprintf("$%d", i), value, -1)
// }
// conf.Output[j] = c
// }
// }
// plugin.handler.Transform(s.StreamPath, conf)
// }
// }
// }
//}
}
}
return

View File

@@ -4,6 +4,7 @@ import (
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
)
type (
@@ -25,8 +26,20 @@ type (
task.Job
TransformJob TransformJob
}
TransformedMap struct {
StreamPath string
TransformJob *TransformJob
}
Transforms struct {
Transformed util.Collection[string, *TransformedMap]
task.Manager[string, *TransformJob]
}
)
func (t *TransformedMap) GetKey() string {
return t.StreamPath
}
func (r *DefaultTransformer) GetTransformJob() *TransformJob {
return &r.TransformJob
}
@@ -61,8 +74,36 @@ func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, streamPath
func (p *TransformJob) Start() (err error) {
s := p.Plugin.Server
if _, ok := s.Transforms.Get(p.GetKey()); ok {
return pkg.ErrRecordSamePath
return pkg.ErrTransformSame
}
if _, ok := s.Transforms.Transformed.Get(p.GetKey()); ok {
return pkg.ErrStreamExist
}
for _, to := range p.Config.Output {
if to.StreamPath != "" {
s.Transforms.Transformed.Set(&TransformedMap{
StreamPath: to.StreamPath,
TransformJob: p,
})
}
}
p.AddTask(p.transformer, p.Logger)
return
}
func (p *TransformJob) TransformPublished(pub *Publisher) {
p.Publisher = pub
pub.OnDispose(func() {
p.Stop(pub.StopReason())
})
}
func (p *TransformJob) Dispose() {
for _, to := range p.Config.Output {
if to.StreamPath != "" {
p.Plugin.Server.Transforms.Transformed.RemoveByKey(to.StreamPath)
}
}
}