refactor: rename marco task to job

This commit is contained in:
langhuihui
2024-08-25 11:33:10 +08:00
parent c38d3fdc50
commit a214f51378
42 changed files with 253 additions and 436 deletions

9
api.go
View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"m7s.live/m7s/v5/pkg/task"
"maps"
"net"
"net/http"
@@ -13,6 +12,8 @@ import (
"strings"
"time"
"m7s.live/m7s/v5/pkg/task"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/mem"
@@ -146,15 +147,15 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
}
func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) {
var fillData func(m task.IMarcoTask) *pb.TaskTreeResponse
fillData = func(m task.IMarcoTask) (res *pb.TaskTreeResponse) {
var fillData func(m task.IJob) *pb.TaskTreeResponse
fillData = func(m task.IJob) (res *pb.TaskTreeResponse) {
res = &pb.TaskTreeResponse{Id: m.GetTaskID(), State: uint32(m.GetState()), Blocked: m.Blocked(), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: maps.Collect(func(yield func(key, value string) bool) {
for k, v := range m.GetTask().Description {
yield(k, fmt.Sprintf("%v", v))
}
})}
for t := range m.RangeSubTask {
if marcoTask, ok := t.(task.IMarcoTask); ok {
if marcoTask, ok := t.(task.IJob); ok {
res.Children = append(res.Children, fillData(marcoTask))
} else {
res.Children = append(res.Children, &pb.TaskTreeResponse{Id: t.GetTaskID(), State: uint32(t.GetState()), Type: uint32(t.GetTaskType()), Owner: t.GetOwnerType(), StartTime: timestamppb.New(t.GetTask().StartTime)})

View File

@@ -18,6 +18,6 @@ import (
func main() {
ctx := context.Background()
// ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100))
m7s.AddRootTaskWithContext(ctx, m7s.NewServer("config2.yaml"))
go m7s.Run(ctx, "config2.yaml")
m7s.Run(ctx, "config1.yaml")
}

View File

@@ -17,7 +17,7 @@ func main() {
flag.BoolVar(&multi, "multi", false, "debug")
flag.Parse()
if multi {
m7s.AddRootTaskWithContext(ctx, m7s.NewServer("config2.yaml"))
go m7s.Run(ctx, "config2.yaml")
}
time.Sleep(time.Second)
m7s.Run(ctx, "config1.yaml")

View File

@@ -17,7 +17,7 @@ func main() {
flag.BoolVar(&multi, "multi", false, "debug")
flag.Parse()
if multi {
m7s.AddRootTaskWithContext(ctx, m7s.NewServer("config2.yaml"))
go m7s.Run(ctx, "config2.yaml")
}
time.Sleep(time.Second)
m7s.Run(ctx, "config1.yaml")

View File

@@ -90,14 +90,14 @@ func (config *HTTP) GetHTTPConfig() *HTTP {
// return config.mux.Handler(r)
// }
func (config *HTTP) CreateHTTPTask(logger *slog.Logger) *ListenHTTPTask {
ret := &ListenHTTPTask{HTTP: config}
func (config *HTTP) CreateHTTPWork(logger *slog.Logger) *ListenHTTPWork {
ret := &ListenHTTPWork{HTTP: config}
ret.Logger = logger.With("addr", config.ListenAddr)
return ret
}
func (config *HTTP) CreateHTTPSTask(logger *slog.Logger) *ListenHTTPSTask {
ret := &ListenHTTPSTask{ListenHTTPTask{HTTP: config}}
func (config *HTTP) CreateHTTPSWork(logger *slog.Logger) *ListenHTTPSWork {
ret := &ListenHTTPSWork{ListenHTTPWork{HTTP: config}}
ret.Logger = logger.With("addr", config.ListenAddrTLS)
return ret
}
@@ -166,13 +166,13 @@ func BasicAuth(u, p string, next http.Handler) http.Handler {
})
}
type ListenHTTPTask struct {
type ListenHTTPWork struct {
task.Task
*HTTP
*http.Server
}
func (task *ListenHTTPTask) Start() (err error) {
func (task *ListenHTTPWork) Start() (err error) {
task.Server = &http.Server{
Addr: task.ListenAddr,
ReadTimeout: task.HTTP.ReadTimeout,
@@ -183,21 +183,21 @@ func (task *ListenHTTPTask) Start() (err error) {
return
}
func (task *ListenHTTPTask) Go() error {
func (task *ListenHTTPWork) Go() error {
task.Info("listen http")
return task.Server.ListenAndServe()
}
func (task *ListenHTTPTask) Dispose() {
func (task *ListenHTTPWork) Dispose() {
task.Info("http server stop")
task.Server.Close()
}
type ListenHTTPSTask struct {
ListenHTTPTask
type ListenHTTPSWork struct {
ListenHTTPWork
}
func (task *ListenHTTPSTask) Start() (err error) {
func (task *ListenHTTPSWork) Start() (err error) {
cer, _ := tls.X509KeyPair(LocalCert, LocalKey)
task.Server = &http.Server{
Addr: task.HTTP.ListenAddrTLS,
@@ -233,7 +233,7 @@ func (task *ListenHTTPSTask) Start() (err error) {
return
}
func (task *ListenHTTPSTask) Go() error {
func (task *ListenHTTPSWork) Go() error {
task.Info("listen https")
return task.Server.ListenAndServeTLS(task.HTTP.CertFile, task.HTTP.KeyFile)
}

View File

@@ -3,8 +3,9 @@ package config
import (
"context"
"crypto/tls"
"github.com/quic-go/quic-go"
"log/slog"
"github.com/quic-go/quic-go"
"m7s.live/m7s/v5/pkg/task"
)
@@ -19,8 +20,8 @@ type Quic struct {
AutoListen bool `default:"true" desc:"是否自动监听"`
}
func (q *Quic) CreateQUICTask(logger *slog.Logger, handler func(connection quic.Connection) task.ITask) *ListenQuicTask {
ret := &ListenQuicTask{
func (q *Quic) CreateQUICWork(logger *slog.Logger, handler func(connection quic.Connection) task.ITask) *ListenQuicWork {
ret := &ListenQuicWork{
Quic: q,
handler: handler,
}
@@ -28,14 +29,14 @@ func (q *Quic) CreateQUICTask(logger *slog.Logger, handler func(connection quic.
return ret
}
type ListenQuicTask struct {
task.MarcoLongTask
type ListenQuicWork struct {
task.Work
*Quic
*quic.Listener
handler func(connection quic.Connection) task.ITask
}
func (task *ListenQuicTask) Start() (err error) {
func (task *ListenQuicWork) Start() (err error) {
var ltsc *tls.Config
ltsc, err = GetTLSConfig(task.CertFile, task.KeyFile)
if err != nil {
@@ -52,7 +53,7 @@ func (task *ListenQuicTask) Start() (err error) {
return
}
func (task *ListenQuicTask) Go() error {
func (task *ListenQuicWork) Go() error {
for {
conn, err := task.Accept(task.Context)
if err != nil {
@@ -63,6 +64,6 @@ func (task *ListenQuicTask) Go() error {
}
}
func (task *ListenQuicTask) Dispose() {
func (task *ListenQuicWork) Dispose() {
_ = task.Listener.Close()
}

View File

@@ -4,10 +4,11 @@ import (
"crypto/tls"
_ "embed"
"log/slog"
"m7s.live/m7s/v5/pkg/task"
"net"
"runtime"
"time"
"m7s.live/m7s/v5/pkg/task"
)
//go:embed local.monibuca.com_bundle.pem
@@ -43,28 +44,28 @@ type TCP struct {
AutoListen bool `default:"true" desc:"是否自动监听"`
}
func (config *TCP) CreateTCPTask(logger *slog.Logger, handler TCPHandler) *ListenTCPTask {
ret := &ListenTCPTask{TCP: config, handler: handler}
func (config *TCP) CreateTCPWork(logger *slog.Logger, handler TCPHandler) *ListenTCPWork {
ret := &ListenTCPWork{TCP: config, handler: handler}
ret.Logger = logger.With("addr", config.ListenAddr)
return ret
}
func (config *TCP) CreateTCPTLSTask(logger *slog.Logger, handler TCPHandler) *ListenTCPTLSTask {
ret := &ListenTCPTLSTask{ListenTCPTask{TCP: config, handler: handler}}
func (config *TCP) CreateTCPTLSWork(logger *slog.Logger, handler TCPHandler) *ListenTCPTLSWork {
ret := &ListenTCPTLSWork{ListenTCPWork{TCP: config, handler: handler}}
ret.Logger = logger.With("addr", config.ListenAddrTLS)
return ret
}
type TCPHandler = func(conn *net.TCPConn) task.ITask
type ListenTCPTask struct {
task.MarcoLongTask
type ListenTCPWork struct {
task.Work
*TCP
net.Listener
handler TCPHandler
}
func (task *ListenTCPTask) Start() (err error) {
func (task *ListenTCPWork) Start() (err error) {
task.Listener, err = net.Listen("tcp", task.ListenAddr)
if err == nil {
task.Info("listen tcp")
@@ -84,16 +85,16 @@ func (task *ListenTCPTask) Start() (err error) {
return
}
func (task *ListenTCPTask) Dispose() {
func (task *ListenTCPWork) Dispose() {
task.Info("tcp server stop")
task.Listener.Close()
}
type ListenTCPTLSTask struct {
ListenTCPTask
type ListenTCPTLSWork struct {
ListenTCPWork
}
func (task *ListenTCPTLSTask) Start() (err error) {
func (task *ListenTCPTLSWork) Start() (err error) {
var tlsConfig *tls.Config
if tlsConfig, err = GetTLSConfig(task.CertFile, task.KeyFile); err != nil {
return
@@ -107,7 +108,7 @@ func (task *ListenTCPTLSTask) Start() (err error) {
return
}
func (task *ListenTCPTask) listen(handler TCPHandler) {
func (task *ListenTCPWork) listen(handler TCPHandler) {
var tempDelay time.Duration
for {
conn, err := task.Accept()

View File

@@ -3,9 +3,10 @@ package config
import (
"crypto/tls"
"log/slog"
"m7s.live/m7s/v5/pkg/task"
"net"
"time"
"m7s.live/m7s/v5/pkg/task"
)
type UDP struct {
@@ -15,24 +16,24 @@ type UDP struct {
AutoListen bool `default:"true" desc:"是否自动监听"`
}
func (config *UDP) CreateUDPTask(logger *slog.Logger, handler func(conn *net.UDPConn) task.ITask) *ListenUDPTask {
ret := &ListenUDPTask{UDP: config, handler: handler}
func (config *UDP) CreateUDPWork(logger *slog.Logger, handler func(conn *net.UDPConn) task.ITask) *ListenUDPWork {
ret := &ListenUDPWork{UDP: config, handler: handler}
ret.Logger = logger.With("addr", config.ListenAddr)
return ret
}
type ListenUDPTask struct {
task.MarcoLongTask
type ListenUDPWork struct {
task.Work
*UDP
net.Listener
handler func(conn *net.UDPConn) task.ITask
}
func (task *ListenUDPTask) Dispose() {
func (task *ListenUDPWork) Dispose() {
task.Close()
}
func (task *ListenUDPTask) Start() (err error) {
func (task *ListenUDPWork) Start() (err error) {
task.Listener, err = net.Listen("udp", task.ListenAddr)
if err == nil {
task.Info("listen udp")
@@ -42,7 +43,7 @@ func (task *ListenUDPTask) Start() (err error) {
return
}
func (task *ListenUDPTask) Go() error {
func (task *ListenUDPWork) Go() error {
var tempDelay time.Duration
for {
conn, err := task.Accept()

View File

@@ -1,13 +0,0 @@
package task
type MarcoLongTask struct {
MarcoTask
}
func (m *MarcoLongTask) keepalive() bool {
return true
}
func (*MarcoLongTask) GetTaskType() TaskType {
return TASK_TYPE_LONG_MACRO
}

View File

@@ -1,199 +0,0 @@
package task
import (
"context"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"reflect"
"slices"
"sync"
"sync/atomic"
)
var idG atomic.Uint32
func GetNextTaskID() uint32 {
return idG.Add(1)
}
// MarcoTask include sub tasks
type MarcoTask struct {
Task
addSub chan ITask
children []ITask
lazyRun sync.Once
childrenDisposed chan struct{}
childDisposeListeners []func(ITask)
blocked bool
}
func (*MarcoTask) GetTaskType() TaskType {
return TASK_TYPE_MACRO
}
func (mt *MarcoTask) getMarcoTask() *MarcoTask {
return mt
}
func (mt *MarcoTask) Blocked() bool {
return mt.blocked
}
func (mt *MarcoTask) waitChildrenDispose() {
close(mt.addSub)
<-mt.childrenDisposed
}
func (mt *MarcoTask) OnChildDispose(listener func(ITask)) {
mt.childDisposeListeners = append(mt.childDisposeListeners, listener)
}
func (mt *MarcoTask) onChildDispose(child ITask) {
for _, listener := range mt.childDisposeListeners {
listener(child)
}
if mt.parent != nil {
mt.parent.onChildDispose(child)
}
if child.getParent() == mt {
child.dispose()
}
}
func (mt *MarcoTask) dispose() {
if mt.childrenDisposed != nil {
mt.OnBeforeDispose(mt.waitChildrenDispose)
}
mt.Task.dispose()
}
func (mt *MarcoTask) RangeSubTask(callback func(task ITask) bool) {
for _, task := range mt.children {
callback(task)
}
}
func (mt *MarcoTask) AddTaskLazy(t IMarcoTask) {
task := t.GetTask()
task.parent = mt
task.handler = t
}
func (mt *MarcoTask) AddTask(t ITask, opt ...any) (task *Task) {
mt.lazyRun.Do(func() {
if mt.parent != nil && mt.Context == nil {
mt.parent.AddTask(mt.handler)
}
mt.childrenDisposed = make(chan struct{})
mt.addSub = make(chan ITask, 10)
go mt.run()
})
if task = t.GetTask(); task.Context == nil {
task.parentCtx = mt.Context
for _, o := range opt {
switch v := o.(type) {
case context.Context:
task.parentCtx = v
case Description:
task.Description = v
case RetryConfig:
task.retry = v
case *slog.Logger:
task.Logger = v
}
}
if task.parentCtx == nil {
panic("context is nil")
}
task.parent = mt
task.level = mt.level + 1
if task.ID == 0 {
task.ID = GetNextTaskID()
}
task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx)
task.startup = util.NewPromise(task.Context)
task.shutdown = util.NewPromise(context.Background())
task.handler = t
if task.Logger == nil {
task.Logger = mt.Logger
}
}
if mt.IsStopped() {
task.startup.Reject(mt.StopReason())
return
}
mt.addSub <- t
return
}
func (mt *MarcoTask) Call(callback func() error) {
mt.Post(callback).WaitStarted()
}
func (mt *MarcoTask) Post(callback func() error) *Task {
task := CreateTaskByCallBack(callback, nil)
return mt.AddTask(task)
}
func (mt *MarcoTask) addChild(task ITask) int {
mt.children = append(mt.children, task)
return len(mt.children) - 1
}
func (mt *MarcoTask) removeChild(index int) {
mt.onChildDispose(mt.children[index])
mt.children = slices.Delete(mt.children, index, index+1)
}
func (mt *MarcoTask) run() {
cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}}
defer func() {
err := recover()
if err != nil {
mt.Stop(err.(error))
}
stopReason := mt.StopReason()
for _, task := range mt.children {
task.Stop(stopReason)
mt.onChildDispose(task)
}
mt.children = nil
close(mt.childrenDisposed)
}()
for {
mt.blocked = false
if chosen, rev, ok := reflect.Select(cases); chosen == 0 {
mt.blocked = true
if !ok {
return
}
if task := rev.Interface().(ITask); task.getParent() == mt {
index := mt.addChild(task)
if err := task.start(); err == nil {
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())})
} else {
task.Stop(err)
mt.removeChild(index)
}
} else {
mt.addChild(task)
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())})
}
} else {
taskIndex := chosen - 1
task := mt.children[taskIndex]
switch tt := task.(type) {
case IChannelTask:
tt.Tick(rev.Interface())
}
if !ok {
mt.removeChild(taskIndex)
cases = slices.Delete(cases, chosen, chosen+1)
}
}
if !mt.handler.keepalive() && len(mt.children) == 0 {
mt.Stop(ErrAutoStop)
}
}
}

View File

@@ -10,16 +10,16 @@ type ManagerItem[K comparable] interface {
}
type Manager[K comparable, T ManagerItem[K]] struct {
MarcoLongTask
Work
Collection[K, T]
}
func (m *Manager[K, T]) Add(ctx T) {
func (m *Manager[K, T]) Add(ctx T, opt ...any) *Task {
ctx.OnStart(func() {
m.Collection.Add(ctx)
})
ctx.OnDispose(func() {
m.Remove(ctx)
})
m.AddTask(ctx)
return m.AddTask(ctx, opt...)
}

View File

@@ -5,11 +5,12 @@ import (
"errors"
"fmt"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"reflect"
"runtime/debug"
"strings"
"time"
"m7s.live/m7s/v5/pkg/util"
)
const TraceLevel = slog.Level(-8)
@@ -34,9 +35,9 @@ const (
)
const (
TASK_TYPE_BASE TaskType = iota
TASK_TYPE_MACRO
TASK_TYPE_LONG_MACRO
TASK_TYPE_TASK TaskType = iota
TASK_TYPE_JOB
TASK_TYPE_Work
TASK_TYPE_CHANNEL
TASK_TYPE_CALL
)
@@ -46,7 +47,7 @@ type (
TaskType byte
ITask interface {
keepalive() bool
getParent() *MarcoTask
getParent() *Job
GetParent() ITask
GetTask() *Task
GetTaskID() uint32
@@ -65,9 +66,9 @@ type (
GetState() TaskState
GetLevel() byte
}
IMarcoTask interface {
IJob interface {
ITask
getMarcoTask() *MarcoTask
getJob() *Job
AddTask(ITask, ...any) *Task
RangeSubTask(func(yield ITask) bool)
OnChildDispose(func(ITask))
@@ -107,7 +108,7 @@ type (
afterStartListeners, beforeDisposeListeners, afterDisposeListeners []func()
Description
startup, shutdown *util.Promise
parent *MarcoTask
parent *Job
parentCtx context.Context
needRetry bool
state TaskState
@@ -148,14 +149,14 @@ func (task *Task) GetOwnerType() string {
}
func (*Task) GetTaskType() TaskType {
return TASK_TYPE_BASE
return TASK_TYPE_TASK
}
func (task *Task) GetTask() *Task {
return task
}
func (task *Task) getParent() *MarcoTask {
func (task *Task) getParent() *Job {
return task.parent
}

View File

@@ -10,8 +10,8 @@ import (
"time"
)
func createMarcoTask() *MarcoTask {
var mt MarcoTask
func createMarcoTask() *Job {
var mt Job
mt.Context, mt.CancelCauseFunc = context.WithCancelCause(context.Background())
mt.handler = &mt
mt.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
@@ -78,7 +78,7 @@ func Test_StopByContext(t *testing.T) {
func Test_ParentStop(t *testing.T) {
mt := createMarcoTask()
parent := &MarcoTask{}
parent := &Job{}
mt.AddTask(parent)
var task Task
parent.AddTask(&task)

View File

@@ -3,7 +3,6 @@ package m7s
import (
"context"
"log/slog"
"m7s.live/m7s/v5/pkg/task"
"net"
"net/http"
"os"
@@ -12,6 +11,8 @@ import (
"runtime"
"strings"
"m7s.live/m7s/v5/pkg/task"
"github.com/quic-go/quic-go"
gatewayRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@@ -52,7 +53,7 @@ type (
}
IPlugin interface {
task.IMarcoTask
task.IJob
OnInit() error
OnStop()
Pull(path string, url string)
@@ -196,7 +197,7 @@ func InstallPlugin[C iPlugin](options ...any) error {
}
type Plugin struct {
task.MarcoLongTask
task.Work
Disabled bool
Meta *PluginMeta
config config.Common
@@ -301,24 +302,24 @@ func (p *Plugin) listen() (err error) {
httpConf := &p.config.HTTP
if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.Server.config.HTTP.ListenAddrTLS) {
p.stopOnError(httpConf.CreateHTTPSTask(p.Logger))
p.stopOnError(httpConf.CreateHTTPSWork(p.Logger))
}
if httpConf.ListenAddr != "" && (httpConf.ListenAddr != p.Server.config.HTTP.ListenAddr) {
p.stopOnError(httpConf.CreateHTTPTask(p.Logger))
p.stopOnError(httpConf.CreateHTTPWork(p.Logger))
}
if tcphandler, ok := p.handler.(ITCPPlugin); ok {
tcpConf := &p.config.TCP
if tcpConf.ListenAddr != "" && tcpConf.AutoListen {
task := tcpConf.CreateTCPTask(p.Logger, tcphandler.OnTCPConnect)
task := tcpConf.CreateTCPWork(p.Logger, tcphandler.OnTCPConnect)
err = p.AddTask(task).WaitStarted()
if err != nil {
return
}
}
if tcpConf.ListenAddrTLS != "" && tcpConf.AutoListen {
task := tcpConf.CreateTCPTLSTask(p.Logger, tcphandler.OnTCPConnect)
task := tcpConf.CreateTCPTLSWork(p.Logger, tcphandler.OnTCPConnect)
err = p.AddTask(task).WaitStarted()
if err != nil {
return
@@ -329,7 +330,7 @@ func (p *Plugin) listen() (err error) {
if udpHandler, ok := p.handler.(IUDPPlugin); ok {
udpConf := &p.config.UDP
if udpConf.ListenAddr != "" && udpConf.AutoListen {
task := udpConf.CreateUDPTask(p.Logger, udpHandler.OnUDPConnect)
task := udpConf.CreateUDPWork(p.Logger, udpHandler.OnUDPConnect)
err = p.AddTask(task).WaitStarted()
if err != nil {
return
@@ -340,7 +341,7 @@ func (p *Plugin) listen() (err error) {
if quicHandler, ok := p.handler.(IQUICPlugin); ok {
quicConf := &p.config.Quic
if quicConf.ListenAddr != "" && quicConf.AutoListen {
task := quicConf.CreateQUICTask(p.Logger, quicHandler.OnQUICConnect)
task := quicConf.CreateQUICWork(p.Logger, quicHandler.OnQUICConnect)
err = p.AddTask(task).WaitStarted()
}
}
@@ -401,22 +402,22 @@ func (p *Plugin) Subscribe(ctx context.Context, streamPath string) (subscriber *
func (p *Plugin) Pull(streamPath string, url string) {
puller := p.Meta.Puller()
puller.GetPullContext().Init(puller, p, streamPath, url)
puller.GetPullJob().Init(puller, p, streamPath, url)
}
func (p *Plugin) Push(streamPath string, url string) {
pusher := p.Meta.Pusher()
pusher.GetPushContext().Init(pusher, p, streamPath, url)
pusher.GetPushJob().Init(pusher, p, streamPath, url)
}
func (p *Plugin) Record(streamPath string, filePath string) {
recorder := p.Meta.Recorder()
recorder.GetRecordContext().Init(recorder, p, streamPath, filePath)
recorder.GetRecordJob().Init(recorder, p, streamPath, filePath)
}
func (p *Plugin) Transform(fromStreamPath, toStreamPath string) {
transformer := p.Meta.Transformer()
transformer.GetTransformContext().Init(transformer, p, fromStreamPath, toStreamPath)
transformer.GetTransformJob().Init(transformer, p, fromStreamPath, toStreamPath)
}
func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) {

View File

@@ -93,7 +93,7 @@ func (c *CascadeClientPlugin) Pull(streamPath, url string) {
puller := &cascade.Puller{
Connection: c.conn,
}
puller.GetPullContext().Init(puller, &c.Plugin, streamPath, url)
puller.GetPullJob().Init(puller, &c.Plugin, streamPath, url)
}
//func (c *CascadeClientPlugin) Start() {

View File

@@ -2,16 +2,17 @@ package cascade
import (
"fmt"
"io"
"net/http"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/quic-go/quic-go"
"io"
"m7s.live/m7s/v5/pkg/util"
"net/http"
"m7s.live/m7s/v5/pkg/task"
)
type Http2Quic struct {
util.Task
task.Task
quic.Connection
quic.Stream
}

View File

@@ -2,6 +2,7 @@ package cascade
import (
"fmt"
"github.com/quic-go/quic-go"
"m7s.live/m7s/v5"
flv "m7s.live/m7s/v5/plugin/flv/pkg"
@@ -12,8 +13,8 @@ type Puller struct {
quic.Connection
}
func (p *Puller) GetPullContext() *m7s.PullContext {
return &p.Ctx
func (p *Puller) GetPullJob() *m7s.PullJob {
return &p.PullJob
}
func NewCascadePuller() m7s.IPuller {
@@ -21,7 +22,7 @@ func NewCascadePuller() m7s.IPuller {
}
func (p *Puller) Start() (err error) {
if err = p.Ctx.Publish(); err != nil {
if err = p.PullJob.Publish(); err != nil {
return
}
var stream quic.Stream
@@ -30,6 +31,6 @@ func (p *Puller) Start() (err error) {
return
}
p.ReadCloser = stream
_, err = fmt.Fprintf(stream, "%s %s\r\n", "PULLFLV", p.Ctx.Publisher.StreamPath)
_, err = fmt.Fprintf(stream, "%s %s\r\n", "PULLFLV", p.PullJob.Publisher.StreamPath)
return
}

View File

@@ -2,14 +2,15 @@ package plugin_cascade
import (
"bufio"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
"net/http"
"strconv"
"strings"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/task"
"github.com/quic-go/quic-go"
"m7s.live/m7s/v5/plugin/cascade/pkg"
cascade "m7s.live/m7s/v5/plugin/cascade/pkg"
)
type CascadeServerPlugin struct {
@@ -21,12 +22,12 @@ type CascadeServerPlugin struct {
var _ = m7s.InstallPlugin[CascadeServerPlugin]()
type CascadeServer struct {
util.MarcoLongTask
task.Work
quic.Connection
conf *CascadeServerPlugin
}
func (c *CascadeServerPlugin) OnQUICConnect(conn quic.Connection) util.ITask {
func (c *CascadeServerPlugin) OnQUICConnect(conn quic.Connection) task.ITask {
ret := &CascadeServer{
Connection: conn,
conf: c,

View File

@@ -2,13 +2,14 @@ package flv
import (
"errors"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
type Puller struct {
m7s.HttpFilePuller
m7s.HTTPFilePuller
}
func NewPuller() m7s.IPuller {
@@ -17,7 +18,7 @@ func NewPuller() m7s.IPuller {
func (p *Puller) Run() (err error) {
reader := util.NewBufReader(p.ReadCloser)
publisher := p.Ctx.Publisher
publisher := p.PullJob.Publisher
var hasAudio, hasVideo bool
var absTS uint32
var head util.Memory

View File

@@ -3,19 +3,20 @@ package flv
import (
"fmt"
"io"
"os"
"path/filepath"
"slices"
"time"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
"os"
"path/filepath"
"slices"
"time"
)
type WriteFlvMetaTagQueueTask struct {
task.MarcoLongTask
task.Work
}
var writeMetaTagQueueTask WriteFlvMetaTagQueueTask
@@ -149,7 +150,7 @@ func (r *Recorder) Run() (err error) {
var times []float64
var offset int64
var duration int64
ctx := &r.Ctx
ctx := &r.RecordJob
suber := ctx.Subscriber
noFragment := ctx.Fragment == 0 || ctx.Append
if noFragment {

View File

@@ -2,6 +2,10 @@ package plugin_gb28181
import (
"fmt"
"net"
"strconv"
"strings"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"m7s.live/m7s/v5"
@@ -10,26 +14,23 @@ import (
"m7s.live/m7s/v5/pkg/util"
gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg"
rtp2 "m7s.live/m7s/v5/plugin/rtp/pkg"
"net"
"strconv"
"strings"
)
type Dialog struct {
task.MarcoTask
task.Job
*Channel
*gb28181.Receiver
gb28181.InviteOptions
gb *GB28181Plugin
session *sipgo.DialogClientSession
pullCtx m7s.PullContext
pullCtx m7s.PullJob
}
func (d *Dialog) GetCallID() string {
return d.session.InviteRequest.CallID().Value()
}
func (d *Dialog) GetPullContext() *m7s.PullContext {
func (d *Dialog) GetPullJob() *m7s.PullJob {
return &d.pullCtx
}
@@ -123,7 +124,7 @@ func (d *Dialog) Run() (err error) {
var tcpConf config.TCP
tcpConf.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
tcpConf.ListenNum = 1
d.AddTask(tcpConf.CreateTCPTask(d.Logger, func(conn *net.TCPConn) task.ITask {
d.AddTask(tcpConf.CreateTCPWork(d.Logger, func(conn *net.TCPConn) task.ITask {
d.Receiver.RTPReader = (*rtp2.TCP)(conn)
return d.Receiver
}))

View File

@@ -296,7 +296,7 @@ func (gb *GB28181Plugin) Pull(streamPath, url string) {
dialog := Dialog{
gb: gb,
}
dialog.GetPullContext().Init(&dialog, &gb.Plugin, streamPath, url)
dialog.GetPullJob().Init(&dialog, &gb.Plugin, streamPath, url)
}
func (gb *GB28181Plugin) GetPullableList() []string {

View File

@@ -1,18 +1,19 @@
package mp4
import (
"github.com/deepch/vdk/codec/h265parser"
"io"
"strings"
"github.com/deepch/vdk/codec/h265parser"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/mp4/pkg/box"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
"strings"
)
type Puller struct {
m7s.HttpFilePuller
m7s.HTTPFilePuller
}
func NewPuller() m7s.IPuller {
@@ -20,7 +21,7 @@ func NewPuller() m7s.IPuller {
}
func (p *Puller) Run() (err error) {
ctx := &p.Ctx
ctx := &p.PullJob
var demuxer *box.MovDemuxer
switch v := p.ReadCloser.(type) {
case io.ReadSeeker:

View File

@@ -1,17 +1,18 @@
package mp4
import (
"os"
"time"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/plugin/mp4/pkg/box"
"os"
"time"
)
type WriteTrailerQueueTask struct {
task.MarcoLongTask
task.Work
}
var writeTrailerQueueTask WriteTrailerQueueTask
@@ -45,7 +46,7 @@ func (task *writeTrailerTask) Start() (err error) {
}
func (r *Recorder) Run() (err error) {
ctx := &r.Ctx
ctx := &r.RecordJob
var file *os.File
var muxer *box.Movmuxer
var audioId, videoId uint32

View File

@@ -9,6 +9,6 @@ import (
func (r *RTMPPlugin) PushOut(ctx context.Context, req *pb.PushRequest) (res *gpb.SuccessResponse, err error) {
pusher := rtmp.NewPusher()
err = pusher.GetPushContext().Init(pusher, &r.Plugin, req.StreamPath, req.RemoteURL).WaitStarted()
err = pusher.GetPushJob().Init(pusher, &r.Plugin, req.StreamPath, req.RemoteURL).WaitStarted()
return &gpb.SuccessResponse{}, err
}

View File

@@ -110,8 +110,8 @@ const (
type Client struct {
*NetStream
pullCtx m7s.PullContext
pushCtx m7s.PushContext
pullCtx m7s.PullJob
pushCtx m7s.PushJob
direction string
}
@@ -125,11 +125,11 @@ func (c *Client) Start() (err error) {
return
}
func (c *Client) GetPullContext() *m7s.PullContext {
func (c *Client) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func (c *Client) GetPushContext() *m7s.PushContext {
func (c *Client) GetPushJob() *m7s.PushJob {
return &c.pushCtx
}

View File

@@ -2,12 +2,13 @@ package rtmp
import (
"errors"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/task"
"net"
"runtime"
"sync/atomic"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
)
@@ -44,7 +45,7 @@ const (
)
type NetConnection struct {
task.MarcoTask
task.Job
*util.BufReader
net.Conn
bandwidth uint32

View File

@@ -2,11 +2,12 @@ package rtsp
import (
"crypto/tls"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
"net"
"net/url"
"strings"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
)
const (
@@ -59,8 +60,8 @@ func createClient(p *m7s.Connection) (s *Stream, err error) {
type Client struct {
*Stream
pullCtx m7s.PullContext
pushCtx m7s.PushContext
pullCtx m7s.PullJob
pushCtx m7s.PushJob
direction string
}
@@ -74,11 +75,11 @@ func (c *Client) Start() (err error) {
return
}
func (c *Client) GetPullContext() *m7s.PullContext {
func (c *Client) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func (c *Client) GetPushContext() *m7s.PushContext {
func (c *Client) GetPushJob() *m7s.PushJob {
return &c.pushCtx
}

View File

@@ -2,7 +2,6 @@ package rtsp
import (
"encoding/binary"
"m7s.live/m7s/v5/pkg/task"
"net"
"net/url"
"runtime"
@@ -11,6 +10,8 @@ import (
"sync/atomic"
"time"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
)
@@ -27,7 +28,7 @@ func NewNetConnection(conn net.Conn) *NetConnection {
}
type NetConnection struct {
task.MarcoTask
task.Job
*util.BufReader
Backchannel bool
Media string

View File

@@ -18,7 +18,7 @@ func (r *StressPlugin) pull(count int, format, url string, puller m7s.Puller) (e
if i := r.pullers.Length; count > i {
for j := i; j < count; j++ {
p := puller()
ctx := p.GetPullContext().Init(p, &r.Plugin, fmt.Sprintf("stress/%d", j), fmt.Sprintf(format, url))
ctx := p.GetPullJob().Init(p, &r.Plugin, fmt.Sprintf("stress/%d", j), fmt.Sprintf(format, url))
if err = ctx.WaitStarted(); err != nil {
return
}
@@ -40,7 +40,7 @@ func (r *StressPlugin) push(count int, streamPath, format, remoteHost string, pu
if i := r.pushers.Length; count > i {
for j := i; j < count; j++ {
p := pusher()
ctx := p.GetPushContext().Init(p, &r.Plugin, streamPath, fmt.Sprintf(format, remoteHost, j))
ctx := p.GetPushJob().Init(p, &r.Plugin, streamPath, fmt.Sprintf(format, remoteHost, j))
if err = ctx.WaitStarted(); err != nil {
return
}

View File

@@ -1,17 +1,18 @@
package plugin_stress
import (
"sync"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/stress/pb"
"sync"
)
type StressPlugin struct {
pb.UnimplementedApiServer
m7s.Plugin
pushers util.Collection[string, *m7s.PushContext]
pullers util.Collection[string, *m7s.PullContext]
pushers util.Collection[string, *m7s.PushJob]
pullers util.Collection[string, *m7s.PullJob]
}
var _ = m7s.InstallPlugin[StressPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler)

View File

@@ -159,6 +159,6 @@ func (p *WebRTCPlugin) Pull(streamPath, url string) {
p.Error("pull", "error", err)
return
}
cfClient.GetPullContext().Init(cfClient, &p.Plugin, streamPath, url)
cfClient.GetPullJob().Init(cfClient, &p.Plugin, streamPath, url)
}
}

View File

@@ -15,19 +15,19 @@ type PullRequest struct {
type Client struct {
Connection
pullCtx m7s.PullContext
pushCtx m7s.PushContext
pullCtx m7s.PullJob
pushCtx m7s.PushJob
direction string
appId string
token string
apiBase string
}
func (c *Client) GetPullContext() *m7s.PullContext {
func (c *Client) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func (c *Client) GetPushContext() *m7s.PushContext {
func (c *Client) GetPushJob() *m7s.PushJob {
return &c.pushCtx
}

View File

@@ -4,18 +4,19 @@ import (
"bytes"
"encoding/json"
"errors"
"github.com/pion/webrtc/v3"
"m7s.live/m7s/v5"
"net/http"
"net/url"
"strings"
"github.com/pion/webrtc/v3"
"m7s.live/m7s/v5"
)
type (
CFClient struct {
Connection
pullCtx m7s.PullContext
pushCtx m7s.PushContext
pullCtx m7s.PullJob
pushCtx m7s.PushJob
direction string
ApiBase string
sessionId string
@@ -153,10 +154,10 @@ func (c *CFClient) request(href string, body any, result any) (err error) {
return
}
func (c *CFClient) GetPullContext() *m7s.PullContext {
func (c *CFClient) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func (c *CFClient) GetPushContext() *m7s.PushContext {
func (c *CFClient) GetPushJob() *m7s.PushJob {
return &c.pushCtx
}

View File

@@ -2,6 +2,8 @@ package webrtc
import (
"errors"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
. "github.com/pion/webrtc/v3"
@@ -9,11 +11,10 @@ import (
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
mrtp "m7s.live/m7s/v5/plugin/rtp/pkg"
"time"
)
type Connection struct {
task.MarcoTask
task.Job
*PeerConnection
SDP string
// LocalSDP *sdp.SessionDescription

View File

@@ -2,19 +2,20 @@ package m7s
import (
"io"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
"net/http"
"net/url"
"os"
"strings"
"time"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
)
type (
Connection struct {
task.MarcoTask
task.Job
Plugin *Plugin
StreamPath string // 对应本地流
RemoteURL string // 远程服务器地址(用于推拉)
@@ -23,12 +24,12 @@ type (
IPuller interface {
task.ITask
GetPullContext() *PullContext
GetPullJob() *PullJob
}
Puller = func() IPuller
PullContext struct {
PullJob struct {
Connection
Publisher *Publisher
publishConfig *config.Publish
@@ -36,9 +37,9 @@ type (
puller IPuller
}
HttpFilePuller struct {
HTTPFilePuller struct {
task.Task
Ctx PullContext
PullJob PullJob
io.ReadCloser
}
)
@@ -58,11 +59,11 @@ func (conn *Connection) Init(plugin *Plugin, streamPath string, href string, pro
}
}
func (p *PullContext) GetPullContext() *PullContext {
func (p *PullJob) GetPullJob() *PullJob {
return p
}
func (p *PullContext) Init(puller IPuller, plugin *Plugin, streamPath string, url string) *PullContext {
func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, url string) *PullJob {
publishConfig := plugin.config.Publish
publishConfig.PublishTimeout = 0
p.Pull = plugin.config.Pull
@@ -78,16 +79,16 @@ func (p *PullContext) Init(puller IPuller, plugin *Plugin, streamPath string, ur
return p
}
func (p *PullContext) GetKey() string {
func (p *PullJob) GetKey() string {
return p.StreamPath
}
func (p *PullContext) Publish() (err error) {
func (p *PullJob) Publish() (err error) {
p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, p.StreamPath, *p.publishConfig)
return
}
func (p *PullContext) Start() (err error) {
func (p *PullJob) Start() (err error) {
s := p.Plugin.Server
if _, ok := s.Pulls.Get(p.GetKey()); ok {
return pkg.ErrStreamExist
@@ -97,18 +98,18 @@ func (p *PullContext) Start() (err error) {
return
}
func (p *PullContext) Dispose() {
func (p *PullJob) Dispose() {
p.Plugin.Server.Pulls.Remove(p)
}
func (p *HttpFilePuller) Start() (err error) {
if err = p.Ctx.Publish(); err != nil {
func (p *HTTPFilePuller) Start() (err error) {
if err = p.PullJob.Publish(); err != nil {
return
}
remoteURL := p.Ctx.RemoteURL
remoteURL := p.PullJob.RemoteURL
if strings.HasPrefix(remoteURL, "http") {
var res *http.Response
if res, err = p.Ctx.HTTPClient.Get(remoteURL); err == nil {
if res, err = p.PullJob.HTTPClient.Get(remoteURL); err == nil {
if res.StatusCode != http.StatusOK {
return io.EOF
}
@@ -123,10 +124,10 @@ func (p *HttpFilePuller) Start() (err error) {
return
}
func (p *HttpFilePuller) GetPullContext() *PullContext {
return &p.Ctx
func (p *HTTPFilePuller) GetPullJob() *PullJob {
return &p.PullJob
}
func (p *HttpFilePuller) Dispose() {
func (p *HTTPFilePuller) Dispose() {
p.ReadCloser.Close()
}

View File

@@ -1,32 +1,33 @@
package m7s
import (
"time"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/task"
"time"
"m7s.live/m7s/v5/pkg/config"
)
type IPusher interface {
task.ITask
GetPushContext() *PushContext
GetPushJob() *PushJob
}
type Pusher = func() IPusher
type PushContext struct {
type PushJob struct {
Connection
Subscriber *Subscriber
config.Push
pusher IPusher
}
func (p *PushContext) GetKey() string {
func (p *PushJob) GetKey() string {
return p.RemoteURL
}
func (p *PushContext) Init(pusher IPusher, plugin *Plugin, streamPath string, url string) *PushContext {
func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, url string) *PushJob {
p.Push = plugin.config.Push
p.Connection.Init(plugin, streamPath, url, plugin.config.Push.Proxy)
p.Logger = plugin.Logger.With("pushURL", url, "streamPath", streamPath)
@@ -39,12 +40,12 @@ func (p *PushContext) Init(pusher IPusher, plugin *Plugin, streamPath string, ur
return p
}
func (p *PushContext) Subscribe() (err error) {
func (p *PushJob) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.pusher.GetTask().Context, p.StreamPath)
return
}
func (p *PushContext) Start() (err error) {
func (p *PushJob) Start() (err error) {
s := p.Plugin.Server
if _, ok := s.Pushs.Get(p.GetKey()); ok {
return pkg.ErrPushRemoteURLExist
@@ -54,6 +55,6 @@ func (p *PushContext) Start() (err error) {
return
}
func (p *PushContext) Dispose() {
func (p *PushJob) Dispose() {
p.Plugin.Server.Pushs.Remove(p)
}

View File

@@ -1,22 +1,23 @@
package m7s
import (
"m7s.live/m7s/v5/pkg/task"
"os"
"path/filepath"
"time"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg"
)
type (
IRecorder interface {
task.ITask
GetRecordContext() *RecordContext
GetRecordJob() *RecordJob
}
Recorder = func() IRecorder
RecordContext struct {
task.MarcoTask
Recorder = func() IRecorder
RecordJob struct {
task.Job
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
@@ -27,28 +28,28 @@ type (
}
DefaultRecorder struct {
task.Task
Ctx RecordContext
RecordJob RecordJob
}
)
func (r *DefaultRecorder) GetRecordContext() *RecordContext {
return &r.Ctx
func (r *DefaultRecorder) GetRecordJob() *RecordJob {
return &r.RecordJob
}
func (r *DefaultRecorder) Start() (err error) {
return r.Ctx.Subscribe()
return r.RecordJob.Subscribe()
}
func (p *RecordContext) GetKey() string {
func (p *RecordJob) GetKey() string {
return p.FilePath
}
func (p *RecordContext) Subscribe() (err error) {
func (p *RecordJob) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.recorder.GetTask().Context, p.StreamPath)
return
}
func (p *RecordContext) Init(recorder IRecorder, plugin *Plugin, streamPath string, filePath string) *RecordContext {
func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, filePath string) *RecordJob {
p.Plugin = plugin
p.Fragment = plugin.config.Record.Fragment
p.Append = plugin.config.Record.Append
@@ -63,7 +64,7 @@ func (p *RecordContext) Init(recorder IRecorder, plugin *Plugin, streamPath stri
return p
}
func (p *RecordContext) Start() (err error) {
func (p *RecordJob) Start() (err error) {
s := p.Plugin.Server
if _, ok := s.Records.Get(p.GetKey()); ok {
return pkg.ErrRecordSamePath

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"log/slog"
"m7s.live/m7s/v5/pkg/task"
"net/http"
"os"
"path/filepath"
@@ -13,6 +12,8 @@ import (
"strings"
"time"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/config"
sysruntime "runtime"
@@ -71,10 +72,10 @@ type Server struct {
Plugins util.Collection[string, *Plugin]
Streams task.Manager[string, *Publisher]
Waiting util.Collection[string, *WaitStream]
Pulls task.Manager[string, *PullContext]
Pushs task.Manager[string, *PushContext]
Records task.Manager[string, *RecordContext]
Transforms task.Manager[string, *TransformContext]
Pulls task.Manager[string, *PullJob]
Pushs task.Manager[string, *PushJob]
Records task.Manager[string, *RecordJob]
Transforms task.Manager[string, *TransformJob]
Subscribers SubscriberCollection
LogHandler MultiLogHandler
apiList []string
@@ -102,7 +103,7 @@ func NewServer(conf any) (s *Server) {
}
func Run(ctx context.Context, conf any) (err error) {
for err = ErrRestart; errors.Is(err, ErrRestart); err = Servers.AddTask(NewServer(conf), ctx).WaitStopped() {
for err = ErrRestart; errors.Is(err, ErrRestart); err = Servers.Add(NewServer(conf), ctx).WaitStopped() {
}
return
}
@@ -197,12 +198,12 @@ func (s *Server) Start() (err error) {
}
}
if httpConf.ListenAddrTLS != "" {
s.stopOnError(httpConf.CreateHTTPSTask(s.Logger))
s.stopOnError(httpConf.CreateHTTPSWork(s.Logger))
}
if httpConf.ListenAddr != "" {
s.stopOnError(httpConf.CreateHTTPTask(s.Logger))
s.stopOnError(httpConf.CreateHTTPWork(s.Logger))
}
var tcpTask *config.ListenTCPTask
var tcpTask *config.ListenTCPWork
if tcpConf.ListenAddr != "" {
var opts []grpc.ServerOption
s.grpcServer = grpc.NewServer(opts...)
@@ -217,7 +218,7 @@ func (s *Server) Start() (err error) {
s.Error("register handler faild", "error", err)
return
}
tcpTask = tcpConf.CreateTCPTask(s.Logger, nil)
tcpTask = tcpConf.CreateTCPWork(s.Logger, nil)
if err = s.AddTask(tcpTask).WaitStarted(); err != nil {
s.Error("failed to listen", "error", err)
return
@@ -275,7 +276,7 @@ func (c *CheckSubWaitTimeout) Tick(any) {
type GRPCServer struct {
task.Task
s *Server
tcpTask *config.ListenTCPTask
tcpTask *config.ListenTCPWork
}
func (gRPC *GRPCServer) Dispose() {

View File

@@ -2,13 +2,14 @@ package m7s
import (
"errors"
"m7s.live/m7s/v5/pkg/task"
"net/url"
"reflect"
"runtime"
"strings"
"time"
"m7s.live/m7s/v5/pkg/task"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
@@ -17,7 +18,7 @@ import (
var AVFrameType = reflect.TypeOf((*AVFrame)(nil))
type PubSubBase struct {
task.MarcoTask
task.Job
Plugin *Plugin
StreamPath string
Args url.Values

View File

@@ -1,10 +1,12 @@
package test
import (
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"errors"
"testing"
"time"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
)
func TestRestart(b *testing.T) {
@@ -21,11 +23,9 @@ func TestRestart(b *testing.T) {
server.Stop(pkg.ErrStopFromAPI)
b.Log("server stop3")
}()
for {
for err := pkg.ErrRestart; errors.Is(err, pkg.ErrRestart); {
server = m7s.NewServer(conf)
if err := m7s.AddRootTask(server).WaitStopped(); err != pkg.ErrRestart {
return
}
err = m7s.Servers.Add(server).WaitStopped()
}
//if err := util.RootTask.AddTask(server).WaitStopped(); err != pkg.ErrStopFromAPI {
// b.Error("server.Run should return ErrStopFromAPI", err)

View File

@@ -8,11 +8,11 @@ import (
type (
ITransformer interface {
task.ITask
GetTransformContext() *TransformContext
GetTransformJob() *TransformJob
}
Transformer = func() ITransformer
TransformContext struct {
task.MarcoTask
Transformer = func() ITransformer
TransformJob struct {
task.Job
FromStreamPath string // 待转换的本地流
ToStreamPath string // 转换后的本地流
Plugin *Plugin
@@ -22,33 +22,37 @@ type (
}
DefaultTransformer struct {
task.Task
Ctx TransformContext
TransformJob TransformJob
}
)
func (r *DefaultTransformer) GetTransformContext() *TransformContext {
return &r.Ctx
func (r *DefaultTransformer) GetTransformJob() *TransformJob {
return &r.TransformJob
}
func (r *DefaultTransformer) Start() (err error) {
return r.Ctx.Subscribe()
err = r.TransformJob.Subscribe()
if err == nil {
err = r.TransformJob.Publish()
}
return
}
func (p *TransformContext) GetKey() string {
func (p *TransformJob) GetKey() string {
return p.ToStreamPath
}
func (p *TransformContext) Subscribe() (err error) {
func (p *TransformJob) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.transformer.GetTask().Context, p.FromStreamPath)
return
}
func (p *TransformContext) Publish() (err error) {
func (p *TransformJob) Publish() (err error) {
p.Publisher, err = p.Plugin.Publish(p.transformer.GetTask().Context, p.ToStreamPath)
return
}
func (p *TransformContext) Init(transformer ITransformer, plugin *Plugin, fromStreamPath string, toStreamPath string) *TransformContext {
func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, fromStreamPath string, toStreamPath string) *TransformJob {
p.Plugin = plugin
p.FromStreamPath = fromStreamPath
p.ToStreamPath = toStreamPath
@@ -61,7 +65,7 @@ func (p *TransformContext) Init(transformer ITransformer, plugin *Plugin, fromSt
return p
}
func (p *TransformContext) Start() (err error) {
func (p *TransformJob) Start() (err error) {
s := p.Plugin.Server
if _, ok := s.Transforms.Get(p.GetKey()); ok {
return pkg.ErrRecordSamePath
@@ -71,6 +75,6 @@ func (p *TransformContext) Start() (err error) {
return
}
func (p *TransformContext) Dispose() {
func (p *TransformJob) Dispose() {
p.Plugin.Server.Transforms.Remove(p)
}