Files
monibuca/transformer.go
2024-12-31 09:58:41 +08:00

142 lines
3.4 KiB
Go

package m7s
import (
"context"
"slices"
"time"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
)
type (
ITransformer interface {
task.ITask
GetTransformJob() *TransformJob
}
Transformer = func() ITransformer
TransformJob struct {
task.Job
StreamPath string // 对应本地流
Config config.Transform // 对应目标流
Plugin *Plugin
Publisher *Publisher
Subscriber *Subscriber
Transformer ITransformer
}
DefaultTransformer struct {
task.Task
TransformJob TransformJob
}
TransformedMap struct {
StreamPath string
Target string
TransformJob *TransformJob
}
Transforms struct {
task.Work
util.Collection[string, *TransformedMap]
//PublishEvent chan *Publisher
}
// TransformsPublishEvent struct {
// task.ChannelTask
// Transforms *Transforms
// }
)
//func (t *TransformsPublishEvent) GetSignal() any {
// return t.Transforms.PublishEvent
//}
//
//func (t *TransformsPublishEvent) Tick(pub any) {
// incomingPublisher := pub.(*Publisher)
// for job := range t.Transforms.Search(func(m *TransformedMap) bool {
// return m.StreamPath == incomingPublisher.StreamPath
// }) {
// job.TransformJob.TransformPublished(incomingPublisher)
// }
//}
func (t *TransformedMap) GetKey() string {
return t.Target
}
func (r *DefaultTransformer) GetTransformJob() *TransformJob {
return &r.TransformJob
}
func (p *TransformJob) Subscribe() (err error) {
subConfig := p.Plugin.config.Subscribe
subConfig.SubType = SubscribeTypeTransform
p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.Transformer, p.StreamPath, subConfig)
if err == nil {
p.Transformer.Depend(p.Subscriber)
}
return
}
func (p *TransformJob) Publish(streamPath string) (err error) {
var conf = p.Plugin.GetCommonConf().Publish
conf.PubType = PublishTypeTransform
p.Publisher, err = p.Plugin.PublishWithConfig(context.WithValue(p.Transformer, Owner, p.Transformer), streamPath, conf)
if err == nil {
p.Publisher.OnDispose(func() {
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) {
p.Stop(p.Publisher.StopReason())
} else {
p.Transformer.Stop(p.Publisher.StopReason())
}
})
}
return
}
func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, streamPath string, conf config.Transform) *TransformJob {
p.Plugin = plugin
p.Config = conf
p.StreamPath = streamPath
p.Transformer = transformer
p.SetDescriptions(task.Description{
"streamPath": streamPath,
"conf": conf,
})
transformer.SetRetry(-1, time.Second*2)
plugin.Server.Transforms.AddTask(p, plugin.Logger.With("streamPath", streamPath))
return p
}
func (p *TransformJob) Start() (err error) {
s := p.Plugin.Server
if slices.ContainsFunc(p.Config.Output, func(to config.TransfromOutput) bool {
return s.Transforms.Has(to.Target)
}) {
return pkg.ErrTransformSame
}
for _, to := range p.Config.Output {
if to.Target != "" {
s.Transforms.Set(&TransformedMap{
StreamPath: to.StreamPath,
Target: to.Target,
TransformJob: p,
})
}
}
p.Info("transform +1", "count", s.Transforms.Length)
p.AddTask(p.Transformer, p.Logger)
return
}
//func (p *TransformJob) TransformPublished(pub *Publisher) {
//
//}
func (p *TransformJob) Dispose() {
transList := &p.Plugin.Server.Transforms
p.Info("transform -1", "count", transList.Length)
for _, to := range p.Config.Output {
transList.RemoveByKey(to.Target)
}
}