diff --git a/api.go b/api.go index 01264d7..6fc8df5 100644 --- a/api.go +++ b/api.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "m7s.live/m7s/v5/pkg/task" "maps" "net" "net/http" @@ -13,6 +12,8 @@ import ( "strings" "time" + "m7s.live/m7s/v5/pkg/task" + "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/mem" @@ -146,15 +147,15 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res } func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) { - var fillData func(m task.IMarcoTask) *pb.TaskTreeResponse - fillData = func(m task.IMarcoTask) (res *pb.TaskTreeResponse) { + var fillData func(m task.IJob) *pb.TaskTreeResponse + fillData = func(m task.IJob) (res *pb.TaskTreeResponse) { res = &pb.TaskTreeResponse{Id: m.GetTaskID(), State: uint32(m.GetState()), Blocked: m.Blocked(), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: maps.Collect(func(yield func(key, value string) bool) { for k, v := range m.GetTask().Description { yield(k, fmt.Sprintf("%v", v)) } })} for t := range m.RangeSubTask { - if marcoTask, ok := t.(task.IMarcoTask); ok { + if marcoTask, ok := t.(task.IJob); ok { res.Children = append(res.Children, fillData(marcoTask)) } else { res.Children = append(res.Children, &pb.TaskTreeResponse{Id: t.GetTaskID(), State: uint32(t.GetState()), Type: uint32(t.GetTaskType()), Owner: t.GetOwnerType(), StartTime: timestamppb.New(t.GetTask().StartTime)}) diff --git a/example/multiple/main.go b/example/multiple/main.go index ac97acb..5a19dd3 100644 --- a/example/multiple/main.go +++ b/example/multiple/main.go @@ -18,6 +18,6 @@ import ( func main() { ctx := context.Background() // ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100)) - m7s.AddRootTaskWithContext(ctx, m7s.NewServer("config2.yaml")) + go m7s.Run(ctx, "config2.yaml") m7s.Run(ctx, "config1.yaml") } diff --git a/example/rtsp-pull/main.go b/example/rtsp-pull/main.go index da0a2a5..3ffc534 100644 --- a/example/rtsp-pull/main.go +++ b/example/rtsp-pull/main.go @@ -17,7 +17,7 @@ func main() { flag.BoolVar(&multi, "multi", false, "debug") flag.Parse() if multi { - m7s.AddRootTaskWithContext(ctx, m7s.NewServer("config2.yaml")) + go m7s.Run(ctx, "config2.yaml") } time.Sleep(time.Second) m7s.Run(ctx, "config1.yaml") diff --git a/example/rtsp-push/main.go b/example/rtsp-push/main.go index da0a2a5..3ffc534 100644 --- a/example/rtsp-push/main.go +++ b/example/rtsp-push/main.go @@ -17,7 +17,7 @@ func main() { flag.BoolVar(&multi, "multi", false, "debug") flag.Parse() if multi { - m7s.AddRootTaskWithContext(ctx, m7s.NewServer("config2.yaml")) + go m7s.Run(ctx, "config2.yaml") } time.Sleep(time.Second) m7s.Run(ctx, "config1.yaml") diff --git a/pkg/config/http.go b/pkg/config/http.go index a14e0ee..5e11ffd 100644 --- a/pkg/config/http.go +++ b/pkg/config/http.go @@ -90,14 +90,14 @@ func (config *HTTP) GetHTTPConfig() *HTTP { // return config.mux.Handler(r) // } -func (config *HTTP) CreateHTTPTask(logger *slog.Logger) *ListenHTTPTask { - ret := &ListenHTTPTask{HTTP: config} +func (config *HTTP) CreateHTTPWork(logger *slog.Logger) *ListenHTTPWork { + ret := &ListenHTTPWork{HTTP: config} ret.Logger = logger.With("addr", config.ListenAddr) return ret } -func (config *HTTP) CreateHTTPSTask(logger *slog.Logger) *ListenHTTPSTask { - ret := &ListenHTTPSTask{ListenHTTPTask{HTTP: config}} +func (config *HTTP) CreateHTTPSWork(logger *slog.Logger) *ListenHTTPSWork { + ret := &ListenHTTPSWork{ListenHTTPWork{HTTP: config}} ret.Logger = logger.With("addr", config.ListenAddrTLS) return ret } @@ -166,13 +166,13 @@ func BasicAuth(u, p string, next http.Handler) http.Handler { }) } -type ListenHTTPTask struct { +type ListenHTTPWork struct { task.Task *HTTP *http.Server } -func (task *ListenHTTPTask) Start() (err error) { +func (task *ListenHTTPWork) Start() (err error) { task.Server = &http.Server{ Addr: task.ListenAddr, ReadTimeout: task.HTTP.ReadTimeout, @@ -183,21 +183,21 @@ func (task *ListenHTTPTask) Start() (err error) { return } -func (task *ListenHTTPTask) Go() error { +func (task *ListenHTTPWork) Go() error { task.Info("listen http") return task.Server.ListenAndServe() } -func (task *ListenHTTPTask) Dispose() { +func (task *ListenHTTPWork) Dispose() { task.Info("http server stop") task.Server.Close() } -type ListenHTTPSTask struct { - ListenHTTPTask +type ListenHTTPSWork struct { + ListenHTTPWork } -func (task *ListenHTTPSTask) Start() (err error) { +func (task *ListenHTTPSWork) Start() (err error) { cer, _ := tls.X509KeyPair(LocalCert, LocalKey) task.Server = &http.Server{ Addr: task.HTTP.ListenAddrTLS, @@ -233,7 +233,7 @@ func (task *ListenHTTPSTask) Start() (err error) { return } -func (task *ListenHTTPSTask) Go() error { +func (task *ListenHTTPSWork) Go() error { task.Info("listen https") return task.Server.ListenAndServeTLS(task.HTTP.CertFile, task.HTTP.KeyFile) } diff --git a/pkg/config/quic.go b/pkg/config/quic.go index 0bce1e6..494ed6d 100644 --- a/pkg/config/quic.go +++ b/pkg/config/quic.go @@ -3,8 +3,9 @@ package config import ( "context" "crypto/tls" - "github.com/quic-go/quic-go" "log/slog" + + "github.com/quic-go/quic-go" "m7s.live/m7s/v5/pkg/task" ) @@ -19,8 +20,8 @@ type Quic struct { AutoListen bool `default:"true" desc:"是否自动监听"` } -func (q *Quic) CreateQUICTask(logger *slog.Logger, handler func(connection quic.Connection) task.ITask) *ListenQuicTask { - ret := &ListenQuicTask{ +func (q *Quic) CreateQUICWork(logger *slog.Logger, handler func(connection quic.Connection) task.ITask) *ListenQuicWork { + ret := &ListenQuicWork{ Quic: q, handler: handler, } @@ -28,14 +29,14 @@ func (q *Quic) CreateQUICTask(logger *slog.Logger, handler func(connection quic. return ret } -type ListenQuicTask struct { - task.MarcoLongTask +type ListenQuicWork struct { + task.Work *Quic *quic.Listener handler func(connection quic.Connection) task.ITask } -func (task *ListenQuicTask) Start() (err error) { +func (task *ListenQuicWork) Start() (err error) { var ltsc *tls.Config ltsc, err = GetTLSConfig(task.CertFile, task.KeyFile) if err != nil { @@ -52,7 +53,7 @@ func (task *ListenQuicTask) Start() (err error) { return } -func (task *ListenQuicTask) Go() error { +func (task *ListenQuicWork) Go() error { for { conn, err := task.Accept(task.Context) if err != nil { @@ -63,6 +64,6 @@ func (task *ListenQuicTask) Go() error { } } -func (task *ListenQuicTask) Dispose() { +func (task *ListenQuicWork) Dispose() { _ = task.Listener.Close() } diff --git a/pkg/config/tcp.go b/pkg/config/tcp.go index 748b254..68ef95f 100644 --- a/pkg/config/tcp.go +++ b/pkg/config/tcp.go @@ -4,10 +4,11 @@ import ( "crypto/tls" _ "embed" "log/slog" - "m7s.live/m7s/v5/pkg/task" "net" "runtime" "time" + + "m7s.live/m7s/v5/pkg/task" ) //go:embed local.monibuca.com_bundle.pem @@ -43,28 +44,28 @@ type TCP struct { AutoListen bool `default:"true" desc:"是否自动监听"` } -func (config *TCP) CreateTCPTask(logger *slog.Logger, handler TCPHandler) *ListenTCPTask { - ret := &ListenTCPTask{TCP: config, handler: handler} +func (config *TCP) CreateTCPWork(logger *slog.Logger, handler TCPHandler) *ListenTCPWork { + ret := &ListenTCPWork{TCP: config, handler: handler} ret.Logger = logger.With("addr", config.ListenAddr) return ret } -func (config *TCP) CreateTCPTLSTask(logger *slog.Logger, handler TCPHandler) *ListenTCPTLSTask { - ret := &ListenTCPTLSTask{ListenTCPTask{TCP: config, handler: handler}} +func (config *TCP) CreateTCPTLSWork(logger *slog.Logger, handler TCPHandler) *ListenTCPTLSWork { + ret := &ListenTCPTLSWork{ListenTCPWork{TCP: config, handler: handler}} ret.Logger = logger.With("addr", config.ListenAddrTLS) return ret } type TCPHandler = func(conn *net.TCPConn) task.ITask -type ListenTCPTask struct { - task.MarcoLongTask +type ListenTCPWork struct { + task.Work *TCP net.Listener handler TCPHandler } -func (task *ListenTCPTask) Start() (err error) { +func (task *ListenTCPWork) Start() (err error) { task.Listener, err = net.Listen("tcp", task.ListenAddr) if err == nil { task.Info("listen tcp") @@ -84,16 +85,16 @@ func (task *ListenTCPTask) Start() (err error) { return } -func (task *ListenTCPTask) Dispose() { +func (task *ListenTCPWork) Dispose() { task.Info("tcp server stop") task.Listener.Close() } -type ListenTCPTLSTask struct { - ListenTCPTask +type ListenTCPTLSWork struct { + ListenTCPWork } -func (task *ListenTCPTLSTask) Start() (err error) { +func (task *ListenTCPTLSWork) Start() (err error) { var tlsConfig *tls.Config if tlsConfig, err = GetTLSConfig(task.CertFile, task.KeyFile); err != nil { return @@ -107,7 +108,7 @@ func (task *ListenTCPTLSTask) Start() (err error) { return } -func (task *ListenTCPTask) listen(handler TCPHandler) { +func (task *ListenTCPWork) listen(handler TCPHandler) { var tempDelay time.Duration for { conn, err := task.Accept() diff --git a/pkg/config/udp.go b/pkg/config/udp.go index c4619be..35062eb 100644 --- a/pkg/config/udp.go +++ b/pkg/config/udp.go @@ -3,9 +3,10 @@ package config import ( "crypto/tls" "log/slog" - "m7s.live/m7s/v5/pkg/task" "net" "time" + + "m7s.live/m7s/v5/pkg/task" ) type UDP struct { @@ -15,24 +16,24 @@ type UDP struct { AutoListen bool `default:"true" desc:"是否自动监听"` } -func (config *UDP) CreateUDPTask(logger *slog.Logger, handler func(conn *net.UDPConn) task.ITask) *ListenUDPTask { - ret := &ListenUDPTask{UDP: config, handler: handler} +func (config *UDP) CreateUDPWork(logger *slog.Logger, handler func(conn *net.UDPConn) task.ITask) *ListenUDPWork { + ret := &ListenUDPWork{UDP: config, handler: handler} ret.Logger = logger.With("addr", config.ListenAddr) return ret } -type ListenUDPTask struct { - task.MarcoLongTask +type ListenUDPWork struct { + task.Work *UDP net.Listener handler func(conn *net.UDPConn) task.ITask } -func (task *ListenUDPTask) Dispose() { +func (task *ListenUDPWork) Dispose() { task.Close() } -func (task *ListenUDPTask) Start() (err error) { +func (task *ListenUDPWork) Start() (err error) { task.Listener, err = net.Listen("udp", task.ListenAddr) if err == nil { task.Info("listen udp") @@ -42,7 +43,7 @@ func (task *ListenUDPTask) Start() (err error) { return } -func (task *ListenUDPTask) Go() error { +func (task *ListenUDPWork) Go() error { var tempDelay time.Duration for { conn, err := task.Accept() diff --git a/pkg/task/long.go b/pkg/task/long.go deleted file mode 100644 index a5a723a..0000000 --- a/pkg/task/long.go +++ /dev/null @@ -1,13 +0,0 @@ -package task - -type MarcoLongTask struct { - MarcoTask -} - -func (m *MarcoLongTask) keepalive() bool { - return true -} - -func (*MarcoLongTask) GetTaskType() TaskType { - return TASK_TYPE_LONG_MACRO -} diff --git a/pkg/task/macro.go b/pkg/task/macro.go deleted file mode 100644 index c6ec428..0000000 --- a/pkg/task/macro.go +++ /dev/null @@ -1,199 +0,0 @@ -package task - -import ( - "context" - "log/slog" - "m7s.live/m7s/v5/pkg/util" - "reflect" - "slices" - "sync" - "sync/atomic" -) - -var idG atomic.Uint32 - -func GetNextTaskID() uint32 { - return idG.Add(1) -} - -// MarcoTask include sub tasks -type MarcoTask struct { - Task - addSub chan ITask - children []ITask - lazyRun sync.Once - childrenDisposed chan struct{} - childDisposeListeners []func(ITask) - blocked bool -} - -func (*MarcoTask) GetTaskType() TaskType { - return TASK_TYPE_MACRO -} - -func (mt *MarcoTask) getMarcoTask() *MarcoTask { - return mt -} - -func (mt *MarcoTask) Blocked() bool { - return mt.blocked -} - -func (mt *MarcoTask) waitChildrenDispose() { - close(mt.addSub) - <-mt.childrenDisposed -} - -func (mt *MarcoTask) OnChildDispose(listener func(ITask)) { - mt.childDisposeListeners = append(mt.childDisposeListeners, listener) -} - -func (mt *MarcoTask) onChildDispose(child ITask) { - for _, listener := range mt.childDisposeListeners { - listener(child) - } - if mt.parent != nil { - mt.parent.onChildDispose(child) - } - if child.getParent() == mt { - child.dispose() - } -} - -func (mt *MarcoTask) dispose() { - if mt.childrenDisposed != nil { - mt.OnBeforeDispose(mt.waitChildrenDispose) - } - mt.Task.dispose() -} - -func (mt *MarcoTask) RangeSubTask(callback func(task ITask) bool) { - for _, task := range mt.children { - callback(task) - } -} - -func (mt *MarcoTask) AddTaskLazy(t IMarcoTask) { - task := t.GetTask() - task.parent = mt - task.handler = t -} - -func (mt *MarcoTask) AddTask(t ITask, opt ...any) (task *Task) { - mt.lazyRun.Do(func() { - if mt.parent != nil && mt.Context == nil { - mt.parent.AddTask(mt.handler) - } - mt.childrenDisposed = make(chan struct{}) - mt.addSub = make(chan ITask, 10) - go mt.run() - }) - if task = t.GetTask(); task.Context == nil { - task.parentCtx = mt.Context - for _, o := range opt { - switch v := o.(type) { - case context.Context: - task.parentCtx = v - case Description: - task.Description = v - case RetryConfig: - task.retry = v - case *slog.Logger: - task.Logger = v - } - } - if task.parentCtx == nil { - panic("context is nil") - } - task.parent = mt - task.level = mt.level + 1 - if task.ID == 0 { - task.ID = GetNextTaskID() - } - task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx) - task.startup = util.NewPromise(task.Context) - task.shutdown = util.NewPromise(context.Background()) - task.handler = t - if task.Logger == nil { - task.Logger = mt.Logger - } - } - if mt.IsStopped() { - task.startup.Reject(mt.StopReason()) - return - } - - mt.addSub <- t - return -} - -func (mt *MarcoTask) Call(callback func() error) { - mt.Post(callback).WaitStarted() -} - -func (mt *MarcoTask) Post(callback func() error) *Task { - task := CreateTaskByCallBack(callback, nil) - return mt.AddTask(task) -} - -func (mt *MarcoTask) addChild(task ITask) int { - mt.children = append(mt.children, task) - return len(mt.children) - 1 -} - -func (mt *MarcoTask) removeChild(index int) { - mt.onChildDispose(mt.children[index]) - mt.children = slices.Delete(mt.children, index, index+1) -} - -func (mt *MarcoTask) run() { - cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}} - defer func() { - err := recover() - if err != nil { - mt.Stop(err.(error)) - } - stopReason := mt.StopReason() - for _, task := range mt.children { - task.Stop(stopReason) - mt.onChildDispose(task) - } - mt.children = nil - close(mt.childrenDisposed) - }() - for { - mt.blocked = false - if chosen, rev, ok := reflect.Select(cases); chosen == 0 { - mt.blocked = true - if !ok { - return - } - if task := rev.Interface().(ITask); task.getParent() == mt { - index := mt.addChild(task) - if err := task.start(); err == nil { - cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())}) - } else { - task.Stop(err) - mt.removeChild(index) - } - } else { - mt.addChild(task) - cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())}) - } - } else { - taskIndex := chosen - 1 - task := mt.children[taskIndex] - switch tt := task.(type) { - case IChannelTask: - tt.Tick(rev.Interface()) - } - if !ok { - mt.removeChild(taskIndex) - cases = slices.Delete(cases, chosen, chosen+1) - } - } - if !mt.handler.keepalive() && len(mt.children) == 0 { - mt.Stop(ErrAutoStop) - } - } -} diff --git a/pkg/task/manager.go b/pkg/task/manager.go index 5b3af42..15116b4 100644 --- a/pkg/task/manager.go +++ b/pkg/task/manager.go @@ -10,16 +10,16 @@ type ManagerItem[K comparable] interface { } type Manager[K comparable, T ManagerItem[K]] struct { - MarcoLongTask + Work Collection[K, T] } -func (m *Manager[K, T]) Add(ctx T) { +func (m *Manager[K, T]) Add(ctx T, opt ...any) *Task { ctx.OnStart(func() { m.Collection.Add(ctx) }) ctx.OnDispose(func() { m.Remove(ctx) }) - m.AddTask(ctx) + return m.AddTask(ctx, opt...) } diff --git a/pkg/task/task.go b/pkg/task/task.go index 953e963..9f49f37 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -5,11 +5,12 @@ import ( "errors" "fmt" "log/slog" - "m7s.live/m7s/v5/pkg/util" "reflect" "runtime/debug" "strings" "time" + + "m7s.live/m7s/v5/pkg/util" ) const TraceLevel = slog.Level(-8) @@ -34,9 +35,9 @@ const ( ) const ( - TASK_TYPE_BASE TaskType = iota - TASK_TYPE_MACRO - TASK_TYPE_LONG_MACRO + TASK_TYPE_TASK TaskType = iota + TASK_TYPE_JOB + TASK_TYPE_Work TASK_TYPE_CHANNEL TASK_TYPE_CALL ) @@ -46,7 +47,7 @@ type ( TaskType byte ITask interface { keepalive() bool - getParent() *MarcoTask + getParent() *Job GetParent() ITask GetTask() *Task GetTaskID() uint32 @@ -65,9 +66,9 @@ type ( GetState() TaskState GetLevel() byte } - IMarcoTask interface { + IJob interface { ITask - getMarcoTask() *MarcoTask + getJob() *Job AddTask(ITask, ...any) *Task RangeSubTask(func(yield ITask) bool) OnChildDispose(func(ITask)) @@ -107,7 +108,7 @@ type ( afterStartListeners, beforeDisposeListeners, afterDisposeListeners []func() Description startup, shutdown *util.Promise - parent *MarcoTask + parent *Job parentCtx context.Context needRetry bool state TaskState @@ -148,14 +149,14 @@ func (task *Task) GetOwnerType() string { } func (*Task) GetTaskType() TaskType { - return TASK_TYPE_BASE + return TASK_TYPE_TASK } func (task *Task) GetTask() *Task { return task } -func (task *Task) getParent() *MarcoTask { +func (task *Task) getParent() *Job { return task.parent } diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go index ced9e3a..e92d5b6 100644 --- a/pkg/task/task_test.go +++ b/pkg/task/task_test.go @@ -10,8 +10,8 @@ import ( "time" ) -func createMarcoTask() *MarcoTask { - var mt MarcoTask +func createMarcoTask() *Job { + var mt Job mt.Context, mt.CancelCauseFunc = context.WithCancelCause(context.Background()) mt.handler = &mt mt.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) @@ -78,7 +78,7 @@ func Test_StopByContext(t *testing.T) { func Test_ParentStop(t *testing.T) { mt := createMarcoTask() - parent := &MarcoTask{} + parent := &Job{} mt.AddTask(parent) var task Task parent.AddTask(&task) diff --git a/plugin.go b/plugin.go index 1bb627b..35644fc 100644 --- a/plugin.go +++ b/plugin.go @@ -3,7 +3,6 @@ package m7s import ( "context" "log/slog" - "m7s.live/m7s/v5/pkg/task" "net" "net/http" "os" @@ -12,6 +11,8 @@ import ( "runtime" "strings" + "m7s.live/m7s/v5/pkg/task" + "github.com/quic-go/quic-go" gatewayRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -52,7 +53,7 @@ type ( } IPlugin interface { - task.IMarcoTask + task.IJob OnInit() error OnStop() Pull(path string, url string) @@ -196,7 +197,7 @@ func InstallPlugin[C iPlugin](options ...any) error { } type Plugin struct { - task.MarcoLongTask + task.Work Disabled bool Meta *PluginMeta config config.Common @@ -301,24 +302,24 @@ func (p *Plugin) listen() (err error) { httpConf := &p.config.HTTP if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.Server.config.HTTP.ListenAddrTLS) { - p.stopOnError(httpConf.CreateHTTPSTask(p.Logger)) + p.stopOnError(httpConf.CreateHTTPSWork(p.Logger)) } if httpConf.ListenAddr != "" && (httpConf.ListenAddr != p.Server.config.HTTP.ListenAddr) { - p.stopOnError(httpConf.CreateHTTPTask(p.Logger)) + p.stopOnError(httpConf.CreateHTTPWork(p.Logger)) } if tcphandler, ok := p.handler.(ITCPPlugin); ok { tcpConf := &p.config.TCP if tcpConf.ListenAddr != "" && tcpConf.AutoListen { - task := tcpConf.CreateTCPTask(p.Logger, tcphandler.OnTCPConnect) + task := tcpConf.CreateTCPWork(p.Logger, tcphandler.OnTCPConnect) err = p.AddTask(task).WaitStarted() if err != nil { return } } if tcpConf.ListenAddrTLS != "" && tcpConf.AutoListen { - task := tcpConf.CreateTCPTLSTask(p.Logger, tcphandler.OnTCPConnect) + task := tcpConf.CreateTCPTLSWork(p.Logger, tcphandler.OnTCPConnect) err = p.AddTask(task).WaitStarted() if err != nil { return @@ -329,7 +330,7 @@ func (p *Plugin) listen() (err error) { if udpHandler, ok := p.handler.(IUDPPlugin); ok { udpConf := &p.config.UDP if udpConf.ListenAddr != "" && udpConf.AutoListen { - task := udpConf.CreateUDPTask(p.Logger, udpHandler.OnUDPConnect) + task := udpConf.CreateUDPWork(p.Logger, udpHandler.OnUDPConnect) err = p.AddTask(task).WaitStarted() if err != nil { return @@ -340,7 +341,7 @@ func (p *Plugin) listen() (err error) { if quicHandler, ok := p.handler.(IQUICPlugin); ok { quicConf := &p.config.Quic if quicConf.ListenAddr != "" && quicConf.AutoListen { - task := quicConf.CreateQUICTask(p.Logger, quicHandler.OnQUICConnect) + task := quicConf.CreateQUICWork(p.Logger, quicHandler.OnQUICConnect) err = p.AddTask(task).WaitStarted() } } @@ -401,22 +402,22 @@ func (p *Plugin) Subscribe(ctx context.Context, streamPath string) (subscriber * func (p *Plugin) Pull(streamPath string, url string) { puller := p.Meta.Puller() - puller.GetPullContext().Init(puller, p, streamPath, url) + puller.GetPullJob().Init(puller, p, streamPath, url) } func (p *Plugin) Push(streamPath string, url string) { pusher := p.Meta.Pusher() - pusher.GetPushContext().Init(pusher, p, streamPath, url) + pusher.GetPushJob().Init(pusher, p, streamPath, url) } func (p *Plugin) Record(streamPath string, filePath string) { recorder := p.Meta.Recorder() - recorder.GetRecordContext().Init(recorder, p, streamPath, filePath) + recorder.GetRecordJob().Init(recorder, p, streamPath, filePath) } func (p *Plugin) Transform(fromStreamPath, toStreamPath string) { transformer := p.Meta.Transformer() - transformer.GetTransformContext().Init(transformer, p, fromStreamPath, toStreamPath) + transformer.GetTransformJob().Init(transformer, p, fromStreamPath, toStreamPath) } func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) { diff --git a/plugin/cascade/client.go b/plugin/cascade/client.go index f9043ee..2f664c0 100644 --- a/plugin/cascade/client.go +++ b/plugin/cascade/client.go @@ -93,7 +93,7 @@ func (c *CascadeClientPlugin) Pull(streamPath, url string) { puller := &cascade.Puller{ Connection: c.conn, } - puller.GetPullContext().Init(puller, &c.Plugin, streamPath, url) + puller.GetPullJob().Init(puller, &c.Plugin, streamPath, url) } //func (c *CascadeClientPlugin) Start() { diff --git a/plugin/cascade/pkg/http-quic.go b/plugin/cascade/pkg/http-quic.go index a1f56e5..639b1dd 100644 --- a/plugin/cascade/pkg/http-quic.go +++ b/plugin/cascade/pkg/http-quic.go @@ -2,16 +2,17 @@ package cascade import ( "fmt" + "io" + "net/http" + "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" "github.com/quic-go/quic-go" - "io" - "m7s.live/m7s/v5/pkg/util" - "net/http" + "m7s.live/m7s/v5/pkg/task" ) type Http2Quic struct { - util.Task + task.Task quic.Connection quic.Stream } diff --git a/plugin/cascade/pkg/pull.go b/plugin/cascade/pkg/pull.go index 52dd04b..e1aacea 100644 --- a/plugin/cascade/pkg/pull.go +++ b/plugin/cascade/pkg/pull.go @@ -2,6 +2,7 @@ package cascade import ( "fmt" + "github.com/quic-go/quic-go" "m7s.live/m7s/v5" flv "m7s.live/m7s/v5/plugin/flv/pkg" @@ -12,8 +13,8 @@ type Puller struct { quic.Connection } -func (p *Puller) GetPullContext() *m7s.PullContext { - return &p.Ctx +func (p *Puller) GetPullJob() *m7s.PullJob { + return &p.PullJob } func NewCascadePuller() m7s.IPuller { @@ -21,7 +22,7 @@ func NewCascadePuller() m7s.IPuller { } func (p *Puller) Start() (err error) { - if err = p.Ctx.Publish(); err != nil { + if err = p.PullJob.Publish(); err != nil { return } var stream quic.Stream @@ -30,6 +31,6 @@ func (p *Puller) Start() (err error) { return } p.ReadCloser = stream - _, err = fmt.Fprintf(stream, "%s %s\r\n", "PULLFLV", p.Ctx.Publisher.StreamPath) + _, err = fmt.Fprintf(stream, "%s %s\r\n", "PULLFLV", p.PullJob.Publisher.StreamPath) return } diff --git a/plugin/cascade/server.go b/plugin/cascade/server.go index ed29138..9f13214 100644 --- a/plugin/cascade/server.go +++ b/plugin/cascade/server.go @@ -2,14 +2,15 @@ package plugin_cascade import ( "bufio" - "m7s.live/m7s/v5" - "m7s.live/m7s/v5/pkg/util" "net/http" "strconv" "strings" + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg/task" + "github.com/quic-go/quic-go" - "m7s.live/m7s/v5/plugin/cascade/pkg" + cascade "m7s.live/m7s/v5/plugin/cascade/pkg" ) type CascadeServerPlugin struct { @@ -21,12 +22,12 @@ type CascadeServerPlugin struct { var _ = m7s.InstallPlugin[CascadeServerPlugin]() type CascadeServer struct { - util.MarcoLongTask + task.Work quic.Connection conf *CascadeServerPlugin } -func (c *CascadeServerPlugin) OnQUICConnect(conn quic.Connection) util.ITask { +func (c *CascadeServerPlugin) OnQUICConnect(conn quic.Connection) task.ITask { ret := &CascadeServer{ Connection: conn, conf: c, diff --git a/plugin/flv/pkg/pull.go b/plugin/flv/pkg/pull.go index 85664b2..5342159 100644 --- a/plugin/flv/pkg/pull.go +++ b/plugin/flv/pkg/pull.go @@ -2,13 +2,14 @@ package flv import ( "errors" + "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg/util" rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" ) type Puller struct { - m7s.HttpFilePuller + m7s.HTTPFilePuller } func NewPuller() m7s.IPuller { @@ -17,7 +18,7 @@ func NewPuller() m7s.IPuller { func (p *Puller) Run() (err error) { reader := util.NewBufReader(p.ReadCloser) - publisher := p.Ctx.Publisher + publisher := p.PullJob.Publisher var hasAudio, hasVideo bool var absTS uint32 var head util.Memory diff --git a/plugin/flv/pkg/record.go b/plugin/flv/pkg/record.go index 66869e0..3be7f23 100644 --- a/plugin/flv/pkg/record.go +++ b/plugin/flv/pkg/record.go @@ -3,19 +3,20 @@ package flv import ( "fmt" "io" + "os" + "path/filepath" + "slices" + "time" + "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/task" "m7s.live/m7s/v5/pkg/util" rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" - "os" - "path/filepath" - "slices" - "time" ) type WriteFlvMetaTagQueueTask struct { - task.MarcoLongTask + task.Work } var writeMetaTagQueueTask WriteFlvMetaTagQueueTask @@ -149,7 +150,7 @@ func (r *Recorder) Run() (err error) { var times []float64 var offset int64 var duration int64 - ctx := &r.Ctx + ctx := &r.RecordJob suber := ctx.Subscriber noFragment := ctx.Fragment == 0 || ctx.Append if noFragment { diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index 479914b..4916a87 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -2,6 +2,10 @@ package plugin_gb28181 import ( "fmt" + "net" + "strconv" + "strings" + "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "m7s.live/m7s/v5" @@ -10,26 +14,23 @@ import ( "m7s.live/m7s/v5/pkg/util" gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg" rtp2 "m7s.live/m7s/v5/plugin/rtp/pkg" - "net" - "strconv" - "strings" ) type Dialog struct { - task.MarcoTask + task.Job *Channel *gb28181.Receiver gb28181.InviteOptions gb *GB28181Plugin session *sipgo.DialogClientSession - pullCtx m7s.PullContext + pullCtx m7s.PullJob } func (d *Dialog) GetCallID() string { return d.session.InviteRequest.CallID().Value() } -func (d *Dialog) GetPullContext() *m7s.PullContext { +func (d *Dialog) GetPullJob() *m7s.PullJob { return &d.pullCtx } @@ -123,7 +124,7 @@ func (d *Dialog) Run() (err error) { var tcpConf config.TCP tcpConf.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) tcpConf.ListenNum = 1 - d.AddTask(tcpConf.CreateTCPTask(d.Logger, func(conn *net.TCPConn) task.ITask { + d.AddTask(tcpConf.CreateTCPWork(d.Logger, func(conn *net.TCPConn) task.ITask { d.Receiver.RTPReader = (*rtp2.TCP)(conn) return d.Receiver })) diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index f27542c..4caec81 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -296,7 +296,7 @@ func (gb *GB28181Plugin) Pull(streamPath, url string) { dialog := Dialog{ gb: gb, } - dialog.GetPullContext().Init(&dialog, &gb.Plugin, streamPath, url) + dialog.GetPullJob().Init(&dialog, &gb.Plugin, streamPath, url) } func (gb *GB28181Plugin) GetPullableList() []string { diff --git a/plugin/mp4/pkg/pull.go b/plugin/mp4/pkg/pull.go index cc6a8d0..93a4ac1 100644 --- a/plugin/mp4/pkg/pull.go +++ b/plugin/mp4/pkg/pull.go @@ -1,18 +1,19 @@ package mp4 import ( - "github.com/deepch/vdk/codec/h265parser" "io" + "strings" + + "github.com/deepch/vdk/codec/h265parser" "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg/codec" "m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/plugin/mp4/pkg/box" rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" - "strings" ) type Puller struct { - m7s.HttpFilePuller + m7s.HTTPFilePuller } func NewPuller() m7s.IPuller { @@ -20,7 +21,7 @@ func NewPuller() m7s.IPuller { } func (p *Puller) Run() (err error) { - ctx := &p.Ctx + ctx := &p.PullJob var demuxer *box.MovDemuxer switch v := p.ReadCloser.(type) { case io.ReadSeeker: diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index 2d6e4b9..6a06449 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -1,17 +1,18 @@ package mp4 import ( + "os" + "time" + "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/codec" "m7s.live/m7s/v5/pkg/task" "m7s.live/m7s/v5/plugin/mp4/pkg/box" - "os" - "time" ) type WriteTrailerQueueTask struct { - task.MarcoLongTask + task.Work } var writeTrailerQueueTask WriteTrailerQueueTask @@ -45,7 +46,7 @@ func (task *writeTrailerTask) Start() (err error) { } func (r *Recorder) Run() (err error) { - ctx := &r.Ctx + ctx := &r.RecordJob var file *os.File var muxer *box.Movmuxer var audioId, videoId uint32 diff --git a/plugin/rtmp/api.go b/plugin/rtmp/api.go index 8f86b7f..e4693e9 100644 --- a/plugin/rtmp/api.go +++ b/plugin/rtmp/api.go @@ -9,6 +9,6 @@ import ( func (r *RTMPPlugin) PushOut(ctx context.Context, req *pb.PushRequest) (res *gpb.SuccessResponse, err error) { pusher := rtmp.NewPusher() - err = pusher.GetPushContext().Init(pusher, &r.Plugin, req.StreamPath, req.RemoteURL).WaitStarted() + err = pusher.GetPushJob().Init(pusher, &r.Plugin, req.StreamPath, req.RemoteURL).WaitStarted() return &gpb.SuccessResponse{}, err } diff --git a/plugin/rtmp/pkg/client.go b/plugin/rtmp/pkg/client.go index 5600107..ceb861d 100644 --- a/plugin/rtmp/pkg/client.go +++ b/plugin/rtmp/pkg/client.go @@ -110,8 +110,8 @@ const ( type Client struct { *NetStream - pullCtx m7s.PullContext - pushCtx m7s.PushContext + pullCtx m7s.PullJob + pushCtx m7s.PushJob direction string } @@ -125,11 +125,11 @@ func (c *Client) Start() (err error) { return } -func (c *Client) GetPullContext() *m7s.PullContext { +func (c *Client) GetPullJob() *m7s.PullJob { return &c.pullCtx } -func (c *Client) GetPushContext() *m7s.PushContext { +func (c *Client) GetPushJob() *m7s.PushJob { return &c.pushCtx } diff --git a/plugin/rtmp/pkg/net-connection.go b/plugin/rtmp/pkg/net-connection.go index 372c41d..2e393ff 100644 --- a/plugin/rtmp/pkg/net-connection.go +++ b/plugin/rtmp/pkg/net-connection.go @@ -2,12 +2,13 @@ package rtmp import ( "errors" - "m7s.live/m7s/v5" - "m7s.live/m7s/v5/pkg/task" "net" "runtime" "sync/atomic" + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg/task" + "m7s.live/m7s/v5/pkg/util" ) @@ -44,7 +45,7 @@ const ( ) type NetConnection struct { - task.MarcoTask + task.Job *util.BufReader net.Conn bandwidth uint32 diff --git a/plugin/rtsp/pkg/client.go b/plugin/rtsp/pkg/client.go index b1f1ba0..8f57d27 100644 --- a/plugin/rtsp/pkg/client.go +++ b/plugin/rtsp/pkg/client.go @@ -2,11 +2,12 @@ package rtsp import ( "crypto/tls" - "m7s.live/m7s/v5" - "m7s.live/m7s/v5/pkg/util" "net" "net/url" "strings" + + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg/util" ) const ( @@ -59,8 +60,8 @@ func createClient(p *m7s.Connection) (s *Stream, err error) { type Client struct { *Stream - pullCtx m7s.PullContext - pushCtx m7s.PushContext + pullCtx m7s.PullJob + pushCtx m7s.PushJob direction string } @@ -74,11 +75,11 @@ func (c *Client) Start() (err error) { return } -func (c *Client) GetPullContext() *m7s.PullContext { +func (c *Client) GetPullJob() *m7s.PullJob { return &c.pullCtx } -func (c *Client) GetPushContext() *m7s.PushContext { +func (c *Client) GetPushJob() *m7s.PushJob { return &c.pushCtx } diff --git a/plugin/rtsp/pkg/connection.go b/plugin/rtsp/pkg/connection.go index 5f29366..9b96ac5 100644 --- a/plugin/rtsp/pkg/connection.go +++ b/plugin/rtsp/pkg/connection.go @@ -2,7 +2,6 @@ package rtsp import ( "encoding/binary" - "m7s.live/m7s/v5/pkg/task" "net" "net/url" "runtime" @@ -11,6 +10,8 @@ import ( "sync/atomic" "time" + "m7s.live/m7s/v5/pkg/task" + "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg/util" ) @@ -27,7 +28,7 @@ func NewNetConnection(conn net.Conn) *NetConnection { } type NetConnection struct { - task.MarcoTask + task.Job *util.BufReader Backchannel bool Media string diff --git a/plugin/stress/api.go b/plugin/stress/api.go index cf6b4d8..76be7d9 100644 --- a/plugin/stress/api.go +++ b/plugin/stress/api.go @@ -18,7 +18,7 @@ func (r *StressPlugin) pull(count int, format, url string, puller m7s.Puller) (e if i := r.pullers.Length; count > i { for j := i; j < count; j++ { p := puller() - ctx := p.GetPullContext().Init(p, &r.Plugin, fmt.Sprintf("stress/%d", j), fmt.Sprintf(format, url)) + ctx := p.GetPullJob().Init(p, &r.Plugin, fmt.Sprintf("stress/%d", j), fmt.Sprintf(format, url)) if err = ctx.WaitStarted(); err != nil { return } @@ -40,7 +40,7 @@ func (r *StressPlugin) push(count int, streamPath, format, remoteHost string, pu if i := r.pushers.Length; count > i { for j := i; j < count; j++ { p := pusher() - ctx := p.GetPushContext().Init(p, &r.Plugin, streamPath, fmt.Sprintf(format, remoteHost, j)) + ctx := p.GetPushJob().Init(p, &r.Plugin, streamPath, fmt.Sprintf(format, remoteHost, j)) if err = ctx.WaitStarted(); err != nil { return } diff --git a/plugin/stress/index.go b/plugin/stress/index.go index 5e0a4d2..8c9493c 100644 --- a/plugin/stress/index.go +++ b/plugin/stress/index.go @@ -1,17 +1,18 @@ package plugin_stress import ( + "sync" + "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/plugin/stress/pb" - "sync" ) type StressPlugin struct { pb.UnimplementedApiServer m7s.Plugin - pushers util.Collection[string, *m7s.PushContext] - pullers util.Collection[string, *m7s.PullContext] + pushers util.Collection[string, *m7s.PushJob] + pullers util.Collection[string, *m7s.PullJob] } var _ = m7s.InstallPlugin[StressPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler) diff --git a/plugin/webrtc/index.go b/plugin/webrtc/index.go index a16198b..dc1efa1 100644 --- a/plugin/webrtc/index.go +++ b/plugin/webrtc/index.go @@ -159,6 +159,6 @@ func (p *WebRTCPlugin) Pull(streamPath, url string) { p.Error("pull", "error", err) return } - cfClient.GetPullContext().Init(cfClient, &p.Plugin, streamPath, url) + cfClient.GetPullJob().Init(cfClient, &p.Plugin, streamPath, url) } } diff --git a/plugin/webrtc/pkg/client.go b/plugin/webrtc/pkg/client.go index e56188b..bae3a6d 100644 --- a/plugin/webrtc/pkg/client.go +++ b/plugin/webrtc/pkg/client.go @@ -15,19 +15,19 @@ type PullRequest struct { type Client struct { Connection - pullCtx m7s.PullContext - pushCtx m7s.PushContext + pullCtx m7s.PullJob + pushCtx m7s.PushJob direction string appId string token string apiBase string } -func (c *Client) GetPullContext() *m7s.PullContext { +func (c *Client) GetPullJob() *m7s.PullJob { return &c.pullCtx } -func (c *Client) GetPushContext() *m7s.PushContext { +func (c *Client) GetPushJob() *m7s.PushJob { return &c.pushCtx } diff --git a/plugin/webrtc/pkg/cloudflare.go b/plugin/webrtc/pkg/cloudflare.go index 90d877e..025626e 100644 --- a/plugin/webrtc/pkg/cloudflare.go +++ b/plugin/webrtc/pkg/cloudflare.go @@ -4,18 +4,19 @@ import ( "bytes" "encoding/json" "errors" - "github.com/pion/webrtc/v3" - "m7s.live/m7s/v5" "net/http" "net/url" "strings" + + "github.com/pion/webrtc/v3" + "m7s.live/m7s/v5" ) type ( CFClient struct { Connection - pullCtx m7s.PullContext - pushCtx m7s.PushContext + pullCtx m7s.PullJob + pushCtx m7s.PushJob direction string ApiBase string sessionId string @@ -153,10 +154,10 @@ func (c *CFClient) request(href string, body any, result any) (err error) { return } -func (c *CFClient) GetPullContext() *m7s.PullContext { +func (c *CFClient) GetPullJob() *m7s.PullJob { return &c.pullCtx } -func (c *CFClient) GetPushContext() *m7s.PushContext { +func (c *CFClient) GetPushJob() *m7s.PushJob { return &c.pushCtx } diff --git a/plugin/webrtc/pkg/connection.go b/plugin/webrtc/pkg/connection.go index 923aaeb..aefc363 100644 --- a/plugin/webrtc/pkg/connection.go +++ b/plugin/webrtc/pkg/connection.go @@ -2,6 +2,8 @@ package webrtc import ( "errors" + "time" + "github.com/pion/rtcp" "github.com/pion/rtp" . "github.com/pion/webrtc/v3" @@ -9,11 +11,10 @@ import ( "m7s.live/m7s/v5/pkg/task" "m7s.live/m7s/v5/pkg/util" mrtp "m7s.live/m7s/v5/plugin/rtp/pkg" - "time" ) type Connection struct { - task.MarcoTask + task.Job *PeerConnection SDP string // LocalSDP *sdp.SessionDescription diff --git a/puller.go b/puller.go index d27cb05..616ed33 100644 --- a/puller.go +++ b/puller.go @@ -2,19 +2,20 @@ package m7s import ( "io" - "m7s.live/m7s/v5/pkg" - "m7s.live/m7s/v5/pkg/config" - "m7s.live/m7s/v5/pkg/task" "net/http" "net/url" "os" "strings" "time" + + "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/config" + "m7s.live/m7s/v5/pkg/task" ) type ( Connection struct { - task.MarcoTask + task.Job Plugin *Plugin StreamPath string // 对应本地流 RemoteURL string // 远程服务器地址(用于推拉) @@ -23,12 +24,12 @@ type ( IPuller interface { task.ITask - GetPullContext() *PullContext + GetPullJob() *PullJob } Puller = func() IPuller - PullContext struct { + PullJob struct { Connection Publisher *Publisher publishConfig *config.Publish @@ -36,9 +37,9 @@ type ( puller IPuller } - HttpFilePuller struct { + HTTPFilePuller struct { task.Task - Ctx PullContext + PullJob PullJob io.ReadCloser } ) @@ -58,11 +59,11 @@ func (conn *Connection) Init(plugin *Plugin, streamPath string, href string, pro } } -func (p *PullContext) GetPullContext() *PullContext { +func (p *PullJob) GetPullJob() *PullJob { return p } -func (p *PullContext) Init(puller IPuller, plugin *Plugin, streamPath string, url string) *PullContext { +func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, url string) *PullJob { publishConfig := plugin.config.Publish publishConfig.PublishTimeout = 0 p.Pull = plugin.config.Pull @@ -78,16 +79,16 @@ func (p *PullContext) Init(puller IPuller, plugin *Plugin, streamPath string, ur return p } -func (p *PullContext) GetKey() string { +func (p *PullJob) GetKey() string { return p.StreamPath } -func (p *PullContext) Publish() (err error) { +func (p *PullJob) Publish() (err error) { p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, p.StreamPath, *p.publishConfig) return } -func (p *PullContext) Start() (err error) { +func (p *PullJob) Start() (err error) { s := p.Plugin.Server if _, ok := s.Pulls.Get(p.GetKey()); ok { return pkg.ErrStreamExist @@ -97,18 +98,18 @@ func (p *PullContext) Start() (err error) { return } -func (p *PullContext) Dispose() { +func (p *PullJob) Dispose() { p.Plugin.Server.Pulls.Remove(p) } -func (p *HttpFilePuller) Start() (err error) { - if err = p.Ctx.Publish(); err != nil { +func (p *HTTPFilePuller) Start() (err error) { + if err = p.PullJob.Publish(); err != nil { return } - remoteURL := p.Ctx.RemoteURL + remoteURL := p.PullJob.RemoteURL if strings.HasPrefix(remoteURL, "http") { var res *http.Response - if res, err = p.Ctx.HTTPClient.Get(remoteURL); err == nil { + if res, err = p.PullJob.HTTPClient.Get(remoteURL); err == nil { if res.StatusCode != http.StatusOK { return io.EOF } @@ -123,10 +124,10 @@ func (p *HttpFilePuller) Start() (err error) { return } -func (p *HttpFilePuller) GetPullContext() *PullContext { - return &p.Ctx +func (p *HTTPFilePuller) GetPullJob() *PullJob { + return &p.PullJob } -func (p *HttpFilePuller) Dispose() { +func (p *HTTPFilePuller) Dispose() { p.ReadCloser.Close() } diff --git a/pusher.go b/pusher.go index de0c9c9..c1d1592 100644 --- a/pusher.go +++ b/pusher.go @@ -1,32 +1,33 @@ package m7s import ( + "time" + "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/task" - "time" "m7s.live/m7s/v5/pkg/config" ) type IPusher interface { task.ITask - GetPushContext() *PushContext + GetPushJob() *PushJob } type Pusher = func() IPusher -type PushContext struct { +type PushJob struct { Connection Subscriber *Subscriber config.Push pusher IPusher } -func (p *PushContext) GetKey() string { +func (p *PushJob) GetKey() string { return p.RemoteURL } -func (p *PushContext) Init(pusher IPusher, plugin *Plugin, streamPath string, url string) *PushContext { +func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, url string) *PushJob { p.Push = plugin.config.Push p.Connection.Init(plugin, streamPath, url, plugin.config.Push.Proxy) p.Logger = plugin.Logger.With("pushURL", url, "streamPath", streamPath) @@ -39,12 +40,12 @@ func (p *PushContext) Init(pusher IPusher, plugin *Plugin, streamPath string, ur return p } -func (p *PushContext) Subscribe() (err error) { +func (p *PushJob) Subscribe() (err error) { p.Subscriber, err = p.Plugin.Subscribe(p.pusher.GetTask().Context, p.StreamPath) return } -func (p *PushContext) Start() (err error) { +func (p *PushJob) Start() (err error) { s := p.Plugin.Server if _, ok := s.Pushs.Get(p.GetKey()); ok { return pkg.ErrPushRemoteURLExist @@ -54,6 +55,6 @@ func (p *PushContext) Start() (err error) { return } -func (p *PushContext) Dispose() { +func (p *PushJob) Dispose() { p.Plugin.Server.Pushs.Remove(p) } diff --git a/recoder.go b/recoder.go index 369e455..1fd1649 100644 --- a/recoder.go +++ b/recoder.go @@ -1,22 +1,23 @@ package m7s import ( - "m7s.live/m7s/v5/pkg/task" "os" "path/filepath" "time" + "m7s.live/m7s/v5/pkg/task" + "m7s.live/m7s/v5/pkg" ) type ( IRecorder interface { task.ITask - GetRecordContext() *RecordContext + GetRecordJob() *RecordJob } - Recorder = func() IRecorder - RecordContext struct { - task.MarcoTask + Recorder = func() IRecorder + RecordJob struct { + task.Job StreamPath string // 对应本地流 Plugin *Plugin Subscriber *Subscriber @@ -27,28 +28,28 @@ type ( } DefaultRecorder struct { task.Task - Ctx RecordContext + RecordJob RecordJob } ) -func (r *DefaultRecorder) GetRecordContext() *RecordContext { - return &r.Ctx +func (r *DefaultRecorder) GetRecordJob() *RecordJob { + return &r.RecordJob } func (r *DefaultRecorder) Start() (err error) { - return r.Ctx.Subscribe() + return r.RecordJob.Subscribe() } -func (p *RecordContext) GetKey() string { +func (p *RecordJob) GetKey() string { return p.FilePath } -func (p *RecordContext) Subscribe() (err error) { +func (p *RecordJob) Subscribe() (err error) { p.Subscriber, err = p.Plugin.Subscribe(p.recorder.GetTask().Context, p.StreamPath) return } -func (p *RecordContext) Init(recorder IRecorder, plugin *Plugin, streamPath string, filePath string) *RecordContext { +func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, filePath string) *RecordJob { p.Plugin = plugin p.Fragment = plugin.config.Record.Fragment p.Append = plugin.config.Record.Append @@ -63,7 +64,7 @@ func (p *RecordContext) Init(recorder IRecorder, plugin *Plugin, streamPath stri return p } -func (p *RecordContext) Start() (err error) { +func (p *RecordJob) Start() (err error) { s := p.Plugin.Server if _, ok := s.Records.Get(p.GetKey()); ok { return pkg.ErrRecordSamePath diff --git a/server.go b/server.go index 6a1b62b..e3a5a02 100644 --- a/server.go +++ b/server.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "log/slog" - "m7s.live/m7s/v5/pkg/task" "net/http" "os" "path/filepath" @@ -13,6 +12,8 @@ import ( "strings" "time" + "m7s.live/m7s/v5/pkg/task" + "m7s.live/m7s/v5/pkg/config" sysruntime "runtime" @@ -71,10 +72,10 @@ type Server struct { Plugins util.Collection[string, *Plugin] Streams task.Manager[string, *Publisher] Waiting util.Collection[string, *WaitStream] - Pulls task.Manager[string, *PullContext] - Pushs task.Manager[string, *PushContext] - Records task.Manager[string, *RecordContext] - Transforms task.Manager[string, *TransformContext] + Pulls task.Manager[string, *PullJob] + Pushs task.Manager[string, *PushJob] + Records task.Manager[string, *RecordJob] + Transforms task.Manager[string, *TransformJob] Subscribers SubscriberCollection LogHandler MultiLogHandler apiList []string @@ -102,7 +103,7 @@ func NewServer(conf any) (s *Server) { } func Run(ctx context.Context, conf any) (err error) { - for err = ErrRestart; errors.Is(err, ErrRestart); err = Servers.AddTask(NewServer(conf), ctx).WaitStopped() { + for err = ErrRestart; errors.Is(err, ErrRestart); err = Servers.Add(NewServer(conf), ctx).WaitStopped() { } return } @@ -197,12 +198,12 @@ func (s *Server) Start() (err error) { } } if httpConf.ListenAddrTLS != "" { - s.stopOnError(httpConf.CreateHTTPSTask(s.Logger)) + s.stopOnError(httpConf.CreateHTTPSWork(s.Logger)) } if httpConf.ListenAddr != "" { - s.stopOnError(httpConf.CreateHTTPTask(s.Logger)) + s.stopOnError(httpConf.CreateHTTPWork(s.Logger)) } - var tcpTask *config.ListenTCPTask + var tcpTask *config.ListenTCPWork if tcpConf.ListenAddr != "" { var opts []grpc.ServerOption s.grpcServer = grpc.NewServer(opts...) @@ -217,7 +218,7 @@ func (s *Server) Start() (err error) { s.Error("register handler faild", "error", err) return } - tcpTask = tcpConf.CreateTCPTask(s.Logger, nil) + tcpTask = tcpConf.CreateTCPWork(s.Logger, nil) if err = s.AddTask(tcpTask).WaitStarted(); err != nil { s.Error("failed to listen", "error", err) return @@ -275,7 +276,7 @@ func (c *CheckSubWaitTimeout) Tick(any) { type GRPCServer struct { task.Task s *Server - tcpTask *config.ListenTCPTask + tcpTask *config.ListenTCPWork } func (gRPC *GRPCServer) Dispose() { diff --git a/subscriber.go b/subscriber.go index 3dd9855..f56f0e6 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,13 +2,14 @@ package m7s import ( "errors" - "m7s.live/m7s/v5/pkg/task" "net/url" "reflect" "runtime" "strings" "time" + "m7s.live/m7s/v5/pkg/task" + . "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/util" @@ -17,7 +18,7 @@ import ( var AVFrameType = reflect.TypeOf((*AVFrame)(nil)) type PubSubBase struct { - task.MarcoTask + task.Job Plugin *Plugin StreamPath string Args url.Values diff --git a/test/server_test.go b/test/server_test.go index 7cda79e..b54300b 100644 --- a/test/server_test.go +++ b/test/server_test.go @@ -1,10 +1,12 @@ package test import ( - "m7s.live/m7s/v5" - "m7s.live/m7s/v5/pkg" + "errors" "testing" "time" + + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg" ) func TestRestart(b *testing.T) { @@ -21,11 +23,9 @@ func TestRestart(b *testing.T) { server.Stop(pkg.ErrStopFromAPI) b.Log("server stop3") }() - for { + for err := pkg.ErrRestart; errors.Is(err, pkg.ErrRestart); { server = m7s.NewServer(conf) - if err := m7s.AddRootTask(server).WaitStopped(); err != pkg.ErrRestart { - return - } + err = m7s.Servers.Add(server).WaitStopped() } //if err := util.RootTask.AddTask(server).WaitStopped(); err != pkg.ErrStopFromAPI { // b.Error("server.Run should return ErrStopFromAPI", err) diff --git a/transformer.go b/transformer.go index 2bee9b0..01799db 100644 --- a/transformer.go +++ b/transformer.go @@ -8,11 +8,11 @@ import ( type ( ITransformer interface { task.ITask - GetTransformContext() *TransformContext + GetTransformJob() *TransformJob } - Transformer = func() ITransformer - TransformContext struct { - task.MarcoTask + Transformer = func() ITransformer + TransformJob struct { + task.Job FromStreamPath string // 待转换的本地流 ToStreamPath string // 转换后的本地流 Plugin *Plugin @@ -22,33 +22,37 @@ type ( } DefaultTransformer struct { task.Task - Ctx TransformContext + TransformJob TransformJob } ) -func (r *DefaultTransformer) GetTransformContext() *TransformContext { - return &r.Ctx +func (r *DefaultTransformer) GetTransformJob() *TransformJob { + return &r.TransformJob } func (r *DefaultTransformer) Start() (err error) { - return r.Ctx.Subscribe() + err = r.TransformJob.Subscribe() + if err == nil { + err = r.TransformJob.Publish() + } + return } -func (p *TransformContext) GetKey() string { +func (p *TransformJob) GetKey() string { return p.ToStreamPath } -func (p *TransformContext) Subscribe() (err error) { +func (p *TransformJob) Subscribe() (err error) { p.Subscriber, err = p.Plugin.Subscribe(p.transformer.GetTask().Context, p.FromStreamPath) return } -func (p *TransformContext) Publish() (err error) { +func (p *TransformJob) Publish() (err error) { p.Publisher, err = p.Plugin.Publish(p.transformer.GetTask().Context, p.ToStreamPath) return } -func (p *TransformContext) Init(transformer ITransformer, plugin *Plugin, fromStreamPath string, toStreamPath string) *TransformContext { +func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, fromStreamPath string, toStreamPath string) *TransformJob { p.Plugin = plugin p.FromStreamPath = fromStreamPath p.ToStreamPath = toStreamPath @@ -61,7 +65,7 @@ func (p *TransformContext) Init(transformer ITransformer, plugin *Plugin, fromSt return p } -func (p *TransformContext) Start() (err error) { +func (p *TransformJob) Start() (err error) { s := p.Plugin.Server if _, ok := s.Transforms.Get(p.GetKey()); ok { return pkg.ErrRecordSamePath @@ -71,6 +75,6 @@ func (p *TransformContext) Start() (err error) { return } -func (p *TransformContext) Dispose() { +func (p *TransformJob) Dispose() { p.Plugin.Server.Transforms.Remove(p) }