mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-10-28 07:32:09 +08:00
refactor: task system
This commit is contained in:
@@ -74,19 +74,21 @@ func (mt *MarcoTask) RangeSubTask(callback func(task ITask) bool) {
|
||||
}
|
||||
|
||||
func (mt *MarcoTask) AddTaskLazy(t IMarcoTask) {
|
||||
t.GetTask().parent = mt
|
||||
task := t.GetTask()
|
||||
task.parent = mt
|
||||
task.handler = t
|
||||
}
|
||||
|
||||
func (mt *MarcoTask) AddTask(t ITask, opt ...any) (task *Task) {
|
||||
mt.lazyRun.Do(func() {
|
||||
if mt.parent != nil && mt.handler == nil {
|
||||
mt.parent.AddTask(mt)
|
||||
if mt.parent != nil && mt.Context == nil {
|
||||
mt.parent.AddTask(mt.handler)
|
||||
}
|
||||
mt.childrenDisposed = make(chan struct{})
|
||||
mt.addSub = make(chan ITask, 10)
|
||||
go mt.run()
|
||||
})
|
||||
if task = t.GetTask(); task.handler == nil {
|
||||
if task = t.GetTask(); task.Context == nil {
|
||||
task.parentCtx = mt.Context
|
||||
for _, o := range opt {
|
||||
switch v := o.(type) {
|
||||
|
||||
@@ -31,10 +31,10 @@ type RootManager[K comparable, T ManagerItem[K]] struct {
|
||||
}
|
||||
|
||||
func (m *RootManager[K, T]) Init() {
|
||||
m.Context = context.Background()
|
||||
m.Context, m.CancelCauseFunc = context.WithCancelCause(context.Background())
|
||||
m.handler = m
|
||||
m.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
|
||||
m.AddTask(&OSSignal{})
|
||||
m.AddTask(&OSSignal{root: m})
|
||||
}
|
||||
|
||||
func (m *RootManager[K, T]) Shutdown() {
|
||||
|
||||
@@ -196,6 +196,7 @@ func (task *Task) StopReasonIs(err error) bool {
|
||||
|
||||
func (task *Task) Stop(err error) {
|
||||
if err == nil {
|
||||
task.Error("task stop with nil error", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType(), "parent", task.GetParent().GetOwnerType())
|
||||
panic("task stop with nil error")
|
||||
}
|
||||
if task.CancelCauseFunc != nil {
|
||||
|
||||
@@ -12,7 +12,8 @@ import (
|
||||
|
||||
func createMarcoTask() *MarcoTask {
|
||||
var mt MarcoTask
|
||||
mt.initTask(context.Background(), &mt)
|
||||
mt.Context, mt.CancelCauseFunc = context.WithCancelCause(context.Background())
|
||||
mt.handler = &mt
|
||||
mt.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
|
||||
return &mt
|
||||
}
|
||||
@@ -67,7 +68,7 @@ func Test_StopByContext(t *testing.T) {
|
||||
mt := createMarcoTask()
|
||||
var task Task
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
mt.AddTaskWithContext(ctx, &task)
|
||||
mt.AddTask(&task, ctx)
|
||||
time.AfterFunc(time.Millisecond*100, cancel)
|
||||
mt.WaitStopped()
|
||||
if !task.StopReasonIs(context.Canceled) {
|
||||
|
||||
@@ -52,7 +52,7 @@ type (
|
||||
}
|
||||
|
||||
IPlugin interface {
|
||||
task.ITask
|
||||
task.IMarcoTask
|
||||
OnInit() error
|
||||
OnStop()
|
||||
Pull(path string, url string)
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"m7s.live/m7s/v5"
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
"m7s.live/m7s/v5/pkg/task"
|
||||
"m7s.live/m7s/v5/plugin/cascade/pkg"
|
||||
"time"
|
||||
|
||||
@@ -23,7 +23,7 @@ type CascadeClientPlugin struct {
|
||||
var _ = m7s.InstallPlugin[CascadeClientPlugin](cascade.NewCascadePuller)
|
||||
|
||||
type CascadeClient struct {
|
||||
util.Task
|
||||
task.Task
|
||||
cfg *CascadeClientPlugin
|
||||
quic.Connection
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"m7s.live/m7s/v5"
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
"m7s.live/m7s/v5/pkg/task"
|
||||
flv "m7s.live/m7s/v5/plugin/flv/pkg"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -40,7 +40,7 @@ func (c *RelayAPIConfig) Check(path string) bool {
|
||||
}
|
||||
|
||||
type ReceiveRequestTask struct {
|
||||
util.Task
|
||||
task.Task
|
||||
Plugin *m7s.Plugin
|
||||
quic.Connection
|
||||
quic.Stream
|
||||
|
||||
@@ -21,7 +21,7 @@ type WriteFlvMetaTagQueueTask struct {
|
||||
var writeMetaTagQueueTask WriteFlvMetaTagQueueTask
|
||||
|
||||
func init() {
|
||||
m7s.AddRootTask(&writeMetaTagQueueTask)
|
||||
m7s.Servers.AddTaskLazy(&writeMetaTagQueueTask)
|
||||
}
|
||||
|
||||
type writeMetaTagTask struct {
|
||||
|
||||
@@ -123,10 +123,10 @@ func (d *Dialog) Run() (err error) {
|
||||
var tcpConf config.TCP
|
||||
tcpConf.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
|
||||
tcpConf.ListenNum = 1
|
||||
tcpConf.CreateTCPTask(d.Logger, func(conn *net.TCPConn) task.ITask {
|
||||
d.AddTask(tcpConf.CreateTCPTask(d.Logger, func(conn *net.TCPConn) task.ITask {
|
||||
d.Receiver.RTPReader = (*rtp2.TCP)(conn)
|
||||
return d.Receiver
|
||||
})
|
||||
}))
|
||||
d.Receiver.Demux()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ type WriteTrailerQueueTask struct {
|
||||
var writeTrailerQueueTask WriteTrailerQueueTask
|
||||
|
||||
func init() {
|
||||
m7s.AddRootTask(&writeTrailerQueueTask)
|
||||
m7s.Servers.AddTaskLazy(&writeTrailerQueueTask)
|
||||
}
|
||||
|
||||
func NewRecorder() *Recorder {
|
||||
|
||||
Reference in New Issue
Block a user