diff --git a/api.go b/api.go index 79a88d2..401ad4b 100644 --- a/api.go +++ b/api.go @@ -12,8 +12,8 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" myip "github.com/husanpao/ip" "github.com/shirou/gopsutil/v4/cpu" diff --git a/go.mod b/go.mod index 4dc8afa..9ceacb8 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/juju/errors v1.0.0 // indirect github.com/klauspost/compress v1.18.0 // indirect + github.com/langhuihui/gotask v0.0.0-20250926063623-e8031a3bf4d2 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/marcboeker/go-duckdb v1.0.5 // indirect diff --git a/go.sum b/go.sum index 1bf8ca3..1e57999 100644 --- a/go.sum +++ b/go.sum @@ -158,6 +158,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/langhuihui/gotask v0.0.0-20250926063623-e8031a3bf4d2 h1:PmB8c9hTONwHGfsKd2JTwXttNWJvb1az5Xvv7yHGL+Y= +github.com/langhuihui/gotask v0.0.0-20250926063623-e8031a3bf4d2/go.mod h1:2zNqwV8M1pHoO0b5JC/A37oYpdtXrfL10Qof9AvR5IE= github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80 h1:6Yzfa6GP0rIo/kULo2bwGEkFvCePZ3qHDDTC3/J9Swo= github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= diff --git a/pkg/av_reader.go b/pkg/av_reader.go index 90c1e88..5029dd3 100644 --- a/pkg/av_reader.go +++ b/pkg/av_reader.go @@ -5,9 +5,9 @@ import ( "log/slog" "time" + "github.com/langhuihui/gotask" "m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" ) const ( diff --git a/pkg/config/quic.go b/pkg/config/quic.go index c0b9dd4..a966ee8 100644 --- a/pkg/config/quic.go +++ b/pkg/config/quic.go @@ -5,8 +5,8 @@ import ( "crypto/tls" "log/slog" + "github.com/langhuihui/gotask" "github.com/quic-go/quic-go" - "m7s.live/v5/pkg/task" ) type QuicConfig interface { diff --git a/pkg/config/tcp.go b/pkg/config/tcp.go index b26d620..9732ff1 100644 --- a/pkg/config/tcp.go +++ b/pkg/config/tcp.go @@ -8,7 +8,7 @@ import ( "runtime" "time" - "m7s.live/v5/pkg/task" + "github.com/langhuihui/gotask" ) //go:embed local.monibuca.com_bundle.pem diff --git a/pkg/config/udp.go b/pkg/config/udp.go index 1004218..4191e9b 100644 --- a/pkg/config/udp.go +++ b/pkg/config/udp.go @@ -6,7 +6,7 @@ import ( "net" "time" - "m7s.live/v5/pkg/task" + task "github.com/langhuihui/gotask" ) type UDP struct { diff --git a/pkg/http_server_fasthttp.go b/pkg/http_server_fasthttp.go index 10e996d..7940fa5 100644 --- a/pkg/http_server_fasthttp.go +++ b/pkg/http_server_fasthttp.go @@ -6,10 +6,10 @@ import ( "crypto/tls" "log/slog" + "github.com/langhuihui/gotask" "github.com/valyala/fasthttp" "github.com/valyala/fasthttp/fasthttpadaptor" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" ) func CreateHTTPWork(conf *config.HTTP, logger *slog.Logger) *ListenFastHTTPWork { diff --git a/pkg/http_server_std.go b/pkg/http_server_std.go index 58b8f4b..51b7d17 100644 --- a/pkg/http_server_std.go +++ b/pkg/http_server_std.go @@ -7,8 +7,8 @@ import ( "log/slog" "net/http" + "github.com/langhuihui/gotask" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" ) func CreateHTTPWork(conf *config.HTTP, logger *slog.Logger) *ListenHTTPWork { diff --git a/pkg/log.go b/pkg/log.go index bd8f148..b63ce4f 100644 --- a/pkg/log.go +++ b/pkg/log.go @@ -6,7 +6,7 @@ import ( "slices" "sync" - "m7s.live/v5/pkg/task" + "github.com/langhuihui/gotask" ) var _ slog.Handler = (*MultiLogHandler)(nil) diff --git a/pkg/ring-writer.go b/pkg/ring-writer.go index 66313e5..4a78fd4 100644 --- a/pkg/ring-writer.go +++ b/pkg/ring-writer.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "time" - "m7s.live/v5/pkg/task" + "github.com/langhuihui/gotask" "m7s.live/v5/pkg/util" ) diff --git a/pkg/task/README.md b/pkg/task/README.md deleted file mode 100644 index a2271d5..0000000 --- a/pkg/task/README.md +++ /dev/null @@ -1,59 +0,0 @@ -# 任务系统概要 - -# 任务的启动 - -任务通过调用父任务的 AddTask 来启动,此时会进入队列中等待启动,父任务的 EventLoop 会接受到子任务,然后调用子任务的 Start 方法进行启动操作 - -## EventLoop 的初始化 -为了节省资源,EventLoop 在没有子任务时不会创建协程,一直等到有子任务时才会创建,并且如果这个子任务也是一个空的 Job(即没有 Start、Run、Go)则仍然不会创建协程。 - -## EventLoop 停止 -为了节省资源,当 EventLoop 中没有待执行的子任务时,需要退出协程。EventLoop 会在以下情况退出: - -1. 没有待处理的任务且没有活跃的子任务,且父任务的 keepalive() 返回 false -2. EventLoop 的状态被设置为停止状态(-1) - -# 任务的停止 - -## 主动停止某个任务 - -调用任务的 Stop 方法即可停止某个任务,此时该任务会由其父任务的 eventLoop 检测到 context 取消信号然后开始执行任务的 dispose 来进行销毁 - -## 任务的意外退出 - -当任务的 Run 返回错误,或者 context 被取消时,任务会退出,最终流程会同主动停止一样 - -## 父任务停止 - -当父任务停止并销毁时,会按照以下步骤处理子任务: - -### 步骤 - -1. **设置 EventLoop 的状态为停止状态**:调用 `stop()` 方法设置 status = -1,防止继续添加子任务 -2. **激活 EventLoop 处理剩余任务**:调用 `active()` 方法,即使状态为 -1 也能处理剩余的子任务 -3. **停止所有子任务**:调用所有子任务的 Stop 方法 -4. **等待子任务销毁完成**:等待 EventLoop 处理完所有子任务的销毁工作 - -### 设计要点 - -- EventLoop 的 `active()` 方法允许在状态为 -1 时调用,以确保剩余的子任务能被正确处理 -- 使用互斥锁保护状态转换,避免竞态条件 -- 先停止再处理剩余任务,确保不会添加新的子任务 - -## 竞态条件处理 - -为了确保任务系统的线程安全,我们采取了以下措施: - -### 状态管理 -- 使用 `sync.RWMutex` 保护 EventLoop 的状态转换 -- `add()` 方法使用读锁检查状态,防止在停止后添加新任务 -- `stop()` 方法使用写锁设置状态,确保原子性 - -### EventLoop 生命周期 -- EventLoop 只有在状态从 0(ready)转换到 1(running)时才启动新的 goroutine -- 即使状态为 -1(stopped),`active()` 方法仍可被调用以处理剩余任务 -- 使用 `hasPending` 标志和互斥锁跟踪待处理任务,避免频繁检查 channel 长度 - -### 任务添加 -- 添加任务时会检查 EventLoop 状态,如果已停止则返回 `ErrDisposed` -- 使用 `pendingMux` 保护 `hasPending` 标志,避免竞态条件 \ No newline at end of file diff --git a/pkg/task/channel.go b/pkg/task/channel.go deleted file mode 100644 index 9c021f1..0000000 --- a/pkg/task/channel.go +++ /dev/null @@ -1,69 +0,0 @@ -package task - -import ( - "time" -) - -type ITickTask interface { - IChannelTask - GetTickInterval() time.Duration - GetTicker() *time.Ticker -} - -type ChannelTask struct { - Task - SignalChan any -} - -func (*ChannelTask) GetTaskType() TaskType { - return TASK_TYPE_CHANNEL -} - -func (t *ChannelTask) GetSignal() any { - return t.SignalChan -} - -func (t *ChannelTask) Tick(any) { -} - -type TickTask struct { - ChannelTask - Ticker *time.Ticker -} - -func (t *TickTask) GetTicker() *time.Ticker { - return t.Ticker -} - -func (t *TickTask) GetTickInterval() time.Duration { - return time.Second -} - -func (t *TickTask) Start() (err error) { - t.Ticker = time.NewTicker(t.handler.(ITickTask).GetTickInterval()) - t.SignalChan = t.Ticker.C - t.OnStop(func() { - t.Ticker.Reset(time.Millisecond) - }) - return -} - -type AsyncTickTask struct { - TickTask -} - -func (t *AsyncTickTask) GetSignal() any { - return t.Task.GetSignal() -} - -func (t *AsyncTickTask) Go() error { - t.handler.(ITickTask).Tick(nil) - for { - select { - case c := <-t.Ticker.C: - t.handler.(ITickTask).Tick(c) - case <-t.Done(): - return nil - } - } -} diff --git a/pkg/task/event_loop.go b/pkg/task/event_loop.go deleted file mode 100644 index c62b4b0..0000000 --- a/pkg/task/event_loop.go +++ /dev/null @@ -1,167 +0,0 @@ -package task - -import ( - "errors" - "reflect" - "runtime/debug" - "slices" - "sync" - "sync/atomic" -) - -type Singleton[T comparable] struct { - instance atomic.Value - mux sync.Mutex -} - -func (s *Singleton[T]) Load() T { - return s.instance.Load().(T) -} - -func (s *Singleton[T]) Get(newF func() T) T { - ch := s.instance.Load() //fast - if ch == nil { // slow - s.mux.Lock() - defer s.mux.Unlock() - if ch = s.instance.Load(); ch == nil { - ch = newF() - s.instance.Store(ch) - } - } - return ch.(T) -} - -type EventLoop struct { - cases []reflect.SelectCase - children []ITask - addSub Singleton[chan any] - running atomic.Bool -} - -func (e *EventLoop) getInput() chan any { - return e.addSub.Get(func() chan any { - return make(chan any, 20) - }) -} - -func (e *EventLoop) active(mt *Job) { - if mt.parent != nil { - mt.parent.eventLoop.active(mt.parent) - } - if e.running.CompareAndSwap(false, true) { - go e.run(mt) - } -} - -func (e *EventLoop) add(mt *Job, sub any) (err error) { - shouldActive := true - switch sub.(type) { - case TaskStarter, TaskBlock, TaskGo: - case IJob: - shouldActive = false - } - select { - case e.getInput() <- sub: - if shouldActive || mt.IsStopped() { - e.active(mt) - } - return nil - default: - return ErrTooManyChildren - } -} - -func (e *EventLoop) run(mt *Job) { - mt.Debug("event loop start", "jobId", mt.GetTaskID(), "type", mt.GetOwnerType()) - ch := e.getInput() - e.cases = []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}} - defer func() { - err := recover() - if err != nil { - mt.Error("job panic", "err", err, "stack", string(debug.Stack())) - if !ThrowPanic { - mt.Stop(errors.Join(err.(error), ErrPanic)) - } else { - panic(err) - } - } - mt.Debug("event loop exit", "jobId", mt.GetTaskID(), "type", mt.GetOwnerType()) - if !mt.handler.keepalive() { - if mt.blocked != nil { - mt.Stop(errors.Join(mt.blocked.StopReason(), ErrAutoStop)) - } else { - mt.Stop(ErrAutoStop) - } - } - mt.blocked = nil - }() - - // Main event loop - only exit when no more events AND no children - for { - if len(ch) == 0 && len(e.children) == 0 { - if e.running.CompareAndSwap(true, false) { - if len(ch) > 0 { // if add before running set to false - e.active(mt) - } - return - } - } - mt.blocked = nil - if chosen, rev, ok := reflect.Select(e.cases); chosen == 0 { - if !ok { - mt.Debug("job addSub channel closed, exiting", "taskId", mt.GetTaskID()) - mt.Stop(ErrAutoStop) - return - } - switch v := rev.Interface().(type) { - case func(): - v() - case ITask: - if len(e.cases) >= 65535 { - mt.Warn("task children too many, may cause performance issue", "count", len(e.cases), "taskId", mt.GetTaskID(), "taskType", mt.GetTaskType(), "ownerType", mt.GetOwnerType()) - v.Stop(ErrTooManyChildren) - continue - } - if mt.blocked = v; v.start() { - e.cases = append(e.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(v.GetSignal())}) - e.children = append(e.children, v) - mt.onChildStart(v) - } else { - mt.removeChild(v) - } - } - } else { - taskIndex := chosen - 1 - child := e.children[taskIndex] - mt.blocked = child - switch tt := mt.blocked.(type) { - case IChannelTask: - if tt.IsStopped() { - switch ttt := tt.(type) { - case ITickTask: - ttt.GetTicker().Stop() - } - mt.onChildDispose(child) - mt.removeChild(child) - e.children = slices.Delete(e.children, taskIndex, taskIndex+1) - e.cases = slices.Delete(e.cases, chosen, chosen+1) - } else { - tt.Tick(rev.Interface()) - } - default: - if !ok { - if mt.onChildDispose(child); child.checkRetry(child.StopReason()) { - if child.reset(); child.start() { - e.cases[chosen].Chan = reflect.ValueOf(child.GetSignal()) - mt.onChildStart(child) - continue - } - } - mt.removeChild(child) - e.children = slices.Delete(e.children, taskIndex, taskIndex+1) - e.cases = slices.Delete(e.cases, chosen, chosen+1) - } - } - } - } -} diff --git a/pkg/task/job.go b/pkg/task/job.go deleted file mode 100644 index 7c38504..0000000 --- a/pkg/task/job.go +++ /dev/null @@ -1,202 +0,0 @@ -package task - -import ( - "context" - "fmt" - "log/slog" - "runtime" - "strings" - "sync" - "sync/atomic" - - "m7s.live/v5/pkg/util" -) - -var idG atomic.Uint32 -var sourceFilePathPrefix string - -func init() { - if _, file, _, ok := runtime.Caller(0); ok { - sourceFilePathPrefix = strings.TrimSuffix(file, "pkg/task/job.go") - } -} - -func GetNextTaskID() uint32 { - return idG.Add(1) -} - -// Job include tasks -type Job struct { - Task - children sync.Map - descendantsDisposeListeners []func(ITask) - descendantsStartListeners []func(ITask) - blocked ITask - eventLoop EventLoop - Size atomic.Int32 -} - -func (*Job) GetTaskType() TaskType { - return TASK_TYPE_JOB -} - -func (mt *Job) getJob() *Job { - return mt -} - -func (mt *Job) Blocked() ITask { - return mt.blocked -} - -func (mt *Job) EventLoopRunning() bool { - return mt.eventLoop.running.Load() -} - -func (mt *Job) waitChildrenDispose(stopReason error) { - mt.eventLoop.active(mt) - mt.children.Range(func(key, value any) bool { - child := value.(ITask) - child.Stop(stopReason) - child.WaitStopped() - return true - }) -} - -func (mt *Job) OnDescendantsDispose(listener func(ITask)) { - mt.descendantsDisposeListeners = append(mt.descendantsDisposeListeners, listener) -} - -func (mt *Job) onDescendantsDispose(descendants ITask) { - for _, listener := range mt.descendantsDisposeListeners { - listener(descendants) - } - if mt.parent != nil { - mt.parent.onDescendantsDispose(descendants) - } -} - -func (mt *Job) onChildDispose(child ITask) { - mt.onDescendantsDispose(child) - child.dispose() -} - -func (mt *Job) removeChild(child ITask) { - value, loaded := mt.children.LoadAndDelete(child.getKey()) - if loaded { - if value != child { - panic("remove child") - } - remains := mt.Size.Add(-1) - mt.Debug("remove child", "id", child.GetTaskID(), "remains", remains) - } -} - -func (mt *Job) OnDescendantsStart(listener func(ITask)) { - mt.descendantsStartListeners = append(mt.descendantsStartListeners, listener) -} - -func (mt *Job) onDescendantsStart(descendants ITask) { - for _, listener := range mt.descendantsStartListeners { - listener(descendants) - } - if mt.parent != nil { - mt.parent.onDescendantsStart(descendants) - } -} - -func (mt *Job) onChildStart(child ITask) { - mt.onDescendantsStart(child) -} - -func (mt *Job) RangeSubTask(callback func(task ITask) bool) { - mt.children.Range(func(key, value any) bool { - callback(value.(ITask)) - return true - }) -} - -func (mt *Job) AddDependTask(t ITask, opt ...any) (task *Task) { - t.Using(mt) - opt = append(opt, 1) - return mt.AddTask(t, opt...) -} - -func (mt *Job) initContext(task *Task, opt ...any) { - callDepth := 2 - for _, o := range opt { - switch v := o.(type) { - case context.Context: - task.parentCtx = v - case Description: - task.SetDescriptions(v) - case RetryConfig: - task.retry = v - case *slog.Logger: - task.Logger = v - case int: - callDepth += v - } - } - _, file, line, ok := runtime.Caller(callDepth) - if ok { - task.StartReason = fmt.Sprintf("%s:%d", strings.TrimPrefix(file, sourceFilePathPrefix), line) - } - task.parent = mt - if task.parentCtx == nil { - task.parentCtx = mt.Context - } - task.level = mt.level + 1 - if task.ID == 0 { - task.ID = GetNextTaskID() - } - task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx) - task.startup = util.NewPromise(task.Context) - task.shutdown = util.NewPromise(context.Background()) - if task.Logger == nil { - task.Logger = mt.Logger - } -} - -func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) { - task = t.GetTask() - task.handler = t - mt.initContext(task, opt...) - if mt.IsStopped() { - task.startup.Reject(mt.StopReason()) - return - } - actual, loaded := mt.children.LoadOrStore(t.getKey(), t) - if loaded { - task.startup.Reject(ExistTaskError{ - Task: actual.(ITask), - }) - return - } - var err error - defer func() { - if err != nil { - mt.children.Delete(t.getKey()) - task.startup.Reject(err) - } - }() - if err = mt.eventLoop.add(mt, t); err != nil { - return - } - if mt.IsStopped() { - err = mt.StopReason() - return - } - remains := mt.Size.Add(1) - mt.Debug("child added", "id", task.ID, "remains", remains) - return -} - -func (mt *Job) Call(callback func()) { - if mt.Size.Load() <= 0 { - callback() - return - } - ctx, cancel := context.WithCancel(mt) - _ = mt.eventLoop.add(mt, func() { callback(); cancel() }) - <-ctx.Done() -} diff --git a/pkg/task/panic.go b/pkg/task/panic.go deleted file mode 100644 index 749d2be..0000000 --- a/pkg/task/panic.go +++ /dev/null @@ -1,6 +0,0 @@ -//go:build !taskpanic -// +build !taskpanic - -package task - -var ThrowPanic = false diff --git a/pkg/task/panic_true.go b/pkg/task/panic_true.go deleted file mode 100644 index 1070a3f..0000000 --- a/pkg/task/panic_true.go +++ /dev/null @@ -1,6 +0,0 @@ -//go:build taskpanic -// +build taskpanic - -package task - -var ThrowPanic = true diff --git a/pkg/task/root.go b/pkg/task/root.go deleted file mode 100644 index 7c7098b..0000000 --- a/pkg/task/root.go +++ /dev/null @@ -1,54 +0,0 @@ -package task - -import ( - "context" - "log/slog" - "os" - "os/signal" - "syscall" - "time" -) - -type shutdown interface { - Shutdown() -} - -type OSSignal struct { - ChannelTask - root shutdown -} - -func (o *OSSignal) Start() error { - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - o.SignalChan = signalChan - o.OnStop(func() { - signal.Stop(signalChan) - close(signalChan) - }) - return nil -} - -func (o *OSSignal) Tick(any) { - println("OSSignal Tick") - go o.root.Shutdown() -} - -type RootManager[K comparable, T ManagerItem[K]] struct { - WorkCollection[K, T] -} - -func (m *RootManager[K, T]) Init() { - m.parentCtx = context.Background() - m.reset() - m.handler = m - m.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) - m.StartTime = time.Now() - m.AddTask(&OSSignal{root: m}).WaitStarted() - m.state = TASK_STATE_STARTED -} - -func (m *RootManager[K, T]) Shutdown() { - m.Stop(ErrExit) - m.dispose() -} diff --git a/pkg/task/task.go b/pkg/task/task.go deleted file mode 100644 index 0871466..0000000 --- a/pkg/task/task.go +++ /dev/null @@ -1,543 +0,0 @@ -package task - -import ( - "context" - "errors" - "fmt" - "io" - "log/slog" - "maps" - "reflect" - "runtime" - "runtime/debug" - "strings" - "sync" - "time" - "unsafe" - - "m7s.live/v5/pkg/util" -) - -const TraceLevel = slog.Level(-8) -const OwnerTypeKey = "ownerType" - -var ( - ErrAutoStop = errors.New("auto stop") - ErrRetryRunOut = errors.New("retry out") - ErrStopByUser = errors.New("stop by user") - ErrRestart = errors.New("restart") - ErrTaskComplete = errors.New("complete") - ErrTimeout = errors.New("timeout") - ErrExit = errors.New("exit") - ErrPanic = errors.New("panic") - ErrTooManyChildren = errors.New("too many children in job") - ErrDisposed = errors.New("disposed") -) - -const ( - TASK_STATE_INIT TaskState = iota - TASK_STATE_STARTING - TASK_STATE_STARTED - TASK_STATE_RUNNING - TASK_STATE_GOING - TASK_STATE_DISPOSING - TASK_STATE_DISPOSED -) - -const ( - TASK_TYPE_TASK TaskType = iota - TASK_TYPE_JOB - TASK_TYPE_Work - TASK_TYPE_CHANNEL -) - -type ( - TaskState byte - TaskType byte - ITask interface { - context.Context - keepalive() bool - GetParent() ITask - GetTask() *Task - GetTaskID() uint32 - GetSignal() any - Stop(error) - StopReason() error - start() bool - dispose() - checkRetry(error) bool - reset() - IsStopped() bool - GetTaskType() TaskType - GetOwnerType() string - GetDescriptions() map[string]string - SetDescription(key string, value any) - SetDescriptions(value Description) - SetRetry(maxRetry int, retryInterval time.Duration) - Using(resource ...any) - OnStop(any) - OnStart(func()) - OnDispose(func()) - GetState() TaskState - GetLevel() byte - WaitStopped() error - WaitStarted() error - getKey() any - } - IJob interface { - ITask - getJob() *Job - AddTask(ITask, ...any) *Task - RangeSubTask(func(yield ITask) bool) - OnDescendantsDispose(func(ITask)) - OnDescendantsStart(func(ITask)) - Blocked() ITask - EventLoopRunning() bool - Call(func()) - } - IChannelTask interface { - ITask - Tick(any) - } - TaskStarter interface { - Start() error - } - TaskDisposal interface { - Dispose() - } - TaskBlock interface { - Run() error - } - TaskGo interface { - Go() error - } - RetryConfig struct { - MaxRetry int - RetryCount int - RetryInterval time.Duration - } - Description = map[string]any - TaskContextKey string - Task struct { - ID uint32 - StartTime time.Time - StartReason string - Logger *slog.Logger - context.Context - context.CancelCauseFunc - handler ITask - retry RetryConfig - afterStartListeners, afterDisposeListeners []func() - closeOnStop []any - resources []any - stopOnce sync.Once - description sync.Map - startup, shutdown *util.Promise - parent *Job - parentCtx context.Context - state TaskState - level byte - } -) - -func FromPointer(pointer uintptr) *Task { - return (*Task)(unsafe.Pointer(pointer)) -} - -func (*Task) keepalive() bool { - return false -} - -func (task *Task) GetState() TaskState { - return task.state -} -func (task *Task) GetLevel() byte { - return task.level -} -func (task *Task) GetParent() ITask { - if task.parent != nil { - return task.parent.handler - } - return nil -} -func (task *Task) SetRetry(maxRetry int, retryInterval time.Duration) { - task.retry.MaxRetry = maxRetry - task.retry.RetryInterval = retryInterval -} -func (task *Task) GetTaskID() uint32 { - return task.ID -} -func (task *Task) GetOwnerType() string { - if ownerType, ok := task.description.Load(OwnerTypeKey); ok { - return ownerType.(string) - } - return strings.TrimSuffix(reflect.TypeOf(task.handler).Elem().Name(), "Task") -} - -func (*Task) GetTaskType() TaskType { - return TASK_TYPE_TASK -} - -func (task *Task) GetTask() *Task { - return task -} - -func (task *Task) GetTaskPointer() uintptr { - return uintptr(unsafe.Pointer(task)) -} - -func (task *Task) GetKey() uint32 { - return task.ID -} - -func (task *Task) getKey() any { - return reflect.ValueOf(task.handler).MethodByName("GetKey").Call(nil)[0].Interface() -} - -func (task *Task) WaitStarted() error { - if task.startup == nil { - return nil - } - return task.startup.Await() -} - -func (task *Task) WaitStopped() (err error) { - err = task.WaitStarted() - if err != nil { - return err - } - //if task.shutdown == nil { - // return task.StopReason() - //} - return task.shutdown.Await() -} - -func (task *Task) Trace(msg string, fields ...any) { - if task.Logger == nil { - slog.Default().Log(task.Context, TraceLevel, msg, fields...) - return - } - task.Logger.Log(task.Context, TraceLevel, msg, fields...) -} - -func (task *Task) IsStopped() bool { - return task.Err() != nil -} - -func (task *Task) StopReason() error { - return context.Cause(task.Context) -} - -func (task *Task) StopReasonIs(errs ...error) bool { - stopReason := task.StopReason() - for _, err := range errs { - if errors.Is(err, stopReason) { - return true - } - } - return false -} - -func (task *Task) Stop(err error) { - if err == nil { - task.Error("task stop with nil error", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType(), "parent", task.GetParent().GetOwnerType()) - panic("task stop with nil error") - } - _, file, line, _ := runtime.Caller(1) - task.stopOnce.Do(func() { - if task.CancelCauseFunc != nil { - msg := "task stop" - if task.startup.IsRejected() { - msg = "task start failed" - } - task.Debug(msg, "caller", fmt.Sprintf("%s:%d", strings.TrimPrefix(file, sourceFilePathPrefix), line), "reason", err, "elapsed", time.Since(task.StartTime), "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) - task.CancelCauseFunc(err) - } - task.stop() - }) -} - -func (task *Task) stop() { - for _, resource := range task.closeOnStop { - switch v := resource.(type) { - case func(): - v() - case func() error: - v() - case ITask: - v.Stop(task.StopReason()) - } - } - task.closeOnStop = task.closeOnStop[:0] -} - -func (task *Task) OnStart(listener func()) { - task.afterStartListeners = append(task.afterStartListeners, listener) -} - -func (task *Task) OnDispose(listener func()) { - task.afterDisposeListeners = append(task.afterDisposeListeners, listener) -} - -func (task *Task) Using(resource ...any) { - task.resources = append(task.resources, resource...) -} - -func (task *Task) OnStop(resource any) { - if t, ok := resource.(ITask); ok && t.GetTask() == task { - panic("onStop resource is task itself") - } - task.closeOnStop = append(task.closeOnStop, resource) -} - -func (task *Task) GetSignal() any { - return task.Done() -} - -func (task *Task) checkRetry(err error) bool { - if errors.Is(err, ErrTaskComplete) || errors.Is(err, ErrExit) || errors.Is(err, ErrStopByUser) { - return false - } - if task.parent.IsStopped() { - return false - } - if task.retry.MaxRetry < 0 || task.retry.RetryCount < task.retry.MaxRetry { - task.retry.RetryCount++ - task.SetDescription("retryCount", task.retry.RetryCount) - if task.retry.MaxRetry < 0 { - task.Warn(fmt.Sprintf("retry %d/∞", task.retry.RetryCount), "taskId", task.ID) - } else { - task.Warn(fmt.Sprintf("retry %d/%d", task.retry.RetryCount, task.retry.MaxRetry), "taskId", task.ID) - } - if delta := time.Since(task.StartTime); delta < task.retry.RetryInterval { - time.Sleep(task.retry.RetryInterval - delta) - } - return true - } else { - if task.retry.MaxRetry > 0 { - task.Warn(fmt.Sprintf("max retry %d failed", task.retry.MaxRetry)) - return false - } - } - return errors.Is(err, ErrRestart) -} - -func (task *Task) start() bool { - var err error - if !ThrowPanic { - defer func() { - if r := recover(); r != nil { - err = errors.New(fmt.Sprint(r)) - task.Error("panic", "error", err, "stack", string(debug.Stack())) - } - }() - } - for { - task.StartTime = time.Now() - task.Debug("task start", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType(), "reason", task.StartReason) - task.state = TASK_STATE_STARTING - if v, ok := task.handler.(TaskStarter); ok { - err = v.Start() - } - if err == nil { - task.state = TASK_STATE_STARTED - task.startup.Fulfill(err) - for _, listener := range task.afterStartListeners { - if task.IsStopped() { - break - } - listener() - } - if task.IsStopped() { - err = task.StopReason() - } else { - task.ResetRetryCount() - if runHandler, ok := task.handler.(TaskBlock); ok { - task.state = TASK_STATE_RUNNING - task.Debug("task run", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) - err = runHandler.Run() - if err == nil { - err = ErrTaskComplete - } - } - } - } - if err == nil { - if goHandler, ok := task.handler.(TaskGo); ok { - task.state = TASK_STATE_GOING - task.Debug("task go", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) - go task.run(goHandler.Go) - } - return true - } else { - task.Stop(err) - task.parent.onChildDispose(task.handler) - if task.checkRetry(err) { - task.reset() - } else { - return false - } - } - } -} - -func (task *Task) reset() { - task.stopOnce = sync.Once{} - task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx) - task.shutdown = util.NewPromise(context.Background()) - task.startup = util.NewPromise(task.Context) -} - -func (task *Task) GetDescriptions() map[string]string { - return maps.Collect(func(yield func(key, value string) bool) { - task.description.Range(func(key, value any) bool { - return yield(key.(string), fmt.Sprintf("%+v", value)) - }) - }) -} - -func (task *Task) GetDescription(key string) (any, bool) { - return task.description.Load(key) -} - -func (task *Task) SetDescription(key string, value any) { - task.description.Store(key, value) -} - -func (task *Task) RemoveDescription(key string) { - task.description.Delete(key) -} - -func (task *Task) SetDescriptions(value Description) { - for k, v := range value { - task.description.Store(k, v) - } -} - -func (task *Task) dispose() { - taskType, ownerType := task.handler.GetTaskType(), task.GetOwnerType() - if task.state < TASK_STATE_STARTED { - task.Debug("task dispose canceled", "taskId", task.ID, "taskType", taskType, "ownerType", ownerType, "state", task.state) - return - } - reason := task.StopReason() - task.state = TASK_STATE_DISPOSING - yargs := []any{"reason", reason, "taskId", task.ID, "taskType", taskType, "ownerType", ownerType} - task.Debug("task dispose", yargs...) - defer task.Debug("task disposed", yargs...) - if job, ok := task.handler.(IJob); ok { - mt := job.getJob() - task.SetDescription("disposeProcess", "wait children") - mt.waitChildrenDispose(reason) - } - task.SetDescription("disposeProcess", "self") - if v, ok := task.handler.(TaskDisposal); ok { - v.Dispose() - } - task.shutdown.Fulfill(reason) - task.SetDescription("disposeProcess", "resources") - task.stopOnce.Do(task.stop) - for _, resource := range task.resources { - switch v := resource.(type) { - case func(): - v() - case ITask: - v.Stop(task.StopReason()) - case util.Recyclable: - v.Recycle() - case io.Closer: - v.Close() - } - } - task.resources = task.resources[:0] - for i, listener := range task.afterDisposeListeners { - task.SetDescription("disposeProcess", fmt.Sprintf("a:%d/%d", i, len(task.afterDisposeListeners))) - listener() - } - task.SetDescription("disposeProcess", "done") - task.state = TASK_STATE_DISPOSED -} - -func (task *Task) ResetRetryCount() { - task.retry.RetryCount = 0 -} - -func (task *Task) GetRetryCount() int { - return task.retry.RetryCount -} - -func (task *Task) run(handler func() error) { - var err error - defer func() { - if !ThrowPanic { - if r := recover(); r != nil { - err = errors.New(fmt.Sprint(r)) - task.Error("panic", "error", err, "stack", string(debug.Stack())) - } - } - if err == nil { - task.Stop(ErrTaskComplete) - } else { - task.Stop(err) - } - }() - err = handler() -} - -func (task *Task) Debug(msg string, args ...any) { - if task.Logger == nil { - slog.Default().Debug(msg, args...) - return - } - task.Logger.Debug(msg, args...) -} - -func (task *Task) Info(msg string, args ...any) { - if task.Logger == nil { - slog.Default().Info(msg, args...) - return - } - task.Logger.Info(msg, args...) -} - -func (task *Task) Warn(msg string, args ...any) { - if task.Logger == nil { - slog.Default().Warn(msg, args...) - return - } - task.Logger.Warn(msg, args...) -} - -func (task *Task) Error(msg string, args ...any) { - if task.Logger == nil { - slog.Default().Error(msg, args...) - return - } - task.Logger.Error(msg, args...) -} - -func (task *Task) TraceEnabled() bool { - return task.Logger.Enabled(task.Context, TraceLevel) -} - -func (task *Task) RunTask(t ITask, opt ...any) (err error) { - tt := t.GetTask() - tt.handler = t - mt := task.parent - if job, ok := task.handler.(IJob); ok { - mt = job.getJob() - } - mt.initContext(tt, opt...) - if mt.IsStopped() { - err = mt.StopReason() - task.startup.Reject(err) - return - } - task.OnStop(t) - started := tt.start() - <-tt.Done() - if started { - tt.dispose() - } - return tt.StopReason() -} diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go deleted file mode 100644 index 5f1d3a6..0000000 --- a/pkg/task/task_test.go +++ /dev/null @@ -1,223 +0,0 @@ -package task - -import ( - "context" - "errors" - "io" - "log/slog" - "os" - "sync/atomic" - "testing" - "time" -) - -var root Work - -func init() { - root.Context, root.CancelCauseFunc = context.WithCancelCause(context.Background()) - root.handler = &root - root.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) - slog.SetLogLoggerLevel(slog.LevelDebug) -} - -func Test_AddTask_AddsTaskSuccessfully(t *testing.T) { - var task Task - root.AddTask(&task) - _ = task.WaitStarted() - root.RangeSubTask(func(t ITask) bool { - if t.GetTaskID() == task.GetTaskID() { - return false - } - return true - }) -} - -type retryDemoTask struct { - Task -} - -func (task *retryDemoTask) Start() error { - return io.ErrClosedPipe -} - -func Test_RetryTask(t *testing.T) { - var demoTask retryDemoTask - var parent Job - root.AddTask(&parent) - demoTask.SetRetry(3, time.Second) - parent.AddTask(&demoTask) - _ = parent.WaitStopped() - if demoTask.retry.RetryCount != 3 { - t.Errorf("expected 3 retries, got %d", demoTask.retry.RetryCount) - } -} - -func Test_Call_ExecutesCallback(t *testing.T) { - called := false - root.Call(func() { - called = true - return - }) - if !called { - t.Errorf("expected callback to be called") - } -} - -func Test_StopByContext(t *testing.T) { - var task Task - ctx, cancel := context.WithCancel(context.Background()) - root.AddTask(&task, ctx) - time.AfterFunc(time.Millisecond*100, cancel) - if !errors.Is(task.WaitStopped(), context.Canceled) { - t.Errorf("expected task to be stopped by context") - } -} - -func Test_ParentStop(t *testing.T) { - var parent Job - root.AddTask(&parent) - var called atomic.Uint32 - var task Task - checkCalled := func(expected uint32) { - if count := called.Add(1); count != expected { - t.Errorf("expected %d, got %d", expected, count) - } - } - task.OnDispose(func() { - checkCalled(1) - }) - parent.OnDispose(func() { - checkCalled(2) - }) - parent.AddTask(&task) - parent.Stop(ErrAutoStop) - if !errors.Is(task.WaitStopped(), ErrAutoStop) { - t.Errorf("expected task auto stop") - } -} - -func Test_ParentAutoStop(t *testing.T) { - var parent Job - root.AddTask(&parent) - var called atomic.Uint32 - var task Task - checkCalled := func(expected uint32) { - if count := called.Add(1); count != expected { - t.Errorf("expected %d, got %d", expected, count) - } - } - task.OnDispose(func() { - checkCalled(1) - }) - parent.OnDispose(func() { - checkCalled(2) - }) - parent.AddTask(&task) - time.AfterFunc(time.Second, func() { - task.Stop(ErrTaskComplete) - }) - if !errors.Is(parent.WaitStopped(), ErrAutoStop) { - t.Errorf("expected task auto stop") - } -} - -func Test_Hooks(t *testing.T) { - var called atomic.Uint32 - var task Task - checkCalled := func(expected uint32) { - if count := called.Add(1); count != expected { - t.Errorf("expected %d, got %d", expected, count) - } - } - task.OnStart(func() { - checkCalled(1) - }) - task.OnDispose(func() { - checkCalled(3) - }) - task.OnStart(func() { - checkCalled(2) - }) - task.OnDispose(func() { - checkCalled(4) - }) - task.Stop(ErrTaskComplete) - root.AddTask(&task).WaitStopped() -} - -type startFailTask struct { - Task -} - -func (task *startFailTask) Start() error { - return errors.New("start failed") -} - -func (task *startFailTask) Dispose() { - task.Logger.Info("Dispose") -} - -func Test_StartFail(t *testing.T) { - var task startFailTask - root.AddTask(&task) - if err := task.WaitStarted(); err == nil { - t.Errorf("expected start to fail") - } -} - -func Test_Block(t *testing.T) { - var task Task - block := make(chan struct{}) - var job Job - task.OnStart(func() { - task.OnStop(func() { - close(block) - }) - <-block - }) - time.AfterFunc(time.Second*2, func() { - job.Stop(ErrTaskComplete) - }) - root.AddTask(&job) - job.AddTask(&task) - job.WaitStopped() -} - -// -//type DemoTask struct { -// Task -// file *os.File -// filePath string -//} -// -//func (d *DemoTask) Start() (err error) { -// d.file, err = os.Open(d.filePath) -// return -//} -// -//func (d *DemoTask) Run() (err error) { -// _, err = d.file.Write([]byte("hello")) -// return -//} -// -//func (d *DemoTask) Dispose() { -// d.file.Close() -//} -// -//type HelloWorld struct { -// DemoTask -//} -// -//func (h *HelloWorld) Run() (err error) { -// _, err = h.file.Write([]byte("world")) -// return nil -//} - -//type HelloWorld struct { -// Task -//} -// -//func (h *HelloWorld) Start() (err error) { -// fmt.Println("Hello World") -// return nil -//} diff --git a/pkg/task/work.go b/pkg/task/work.go deleted file mode 100644 index b4245da..0000000 --- a/pkg/task/work.go +++ /dev/null @@ -1,67 +0,0 @@ -package task - -type Work struct { - Job -} - -func (m *Work) keepalive() bool { - return true -} - -func (*Work) GetTaskType() TaskType { - return TASK_TYPE_Work -} - -type WorkCollection[K comparable, T interface { - ITask - GetKey() K -}] struct { - Work -} - -func (c *WorkCollection[K, T]) Find(f func(T) bool) (item T, ok bool) { - c.RangeSubTask(func(task ITask) bool { - if v, _ok := task.(T); _ok && f(v) { - item = v - ok = true - return false - } - return true - }) - return -} - -func (c *WorkCollection[K, T]) Get(key K) (item T, ok bool) { - var value any - value, ok = c.children.Load(key) - if ok { - item, ok = value.(T) - } - return -} - -func (c *WorkCollection[K, T]) Range(f func(T) bool) { - c.RangeSubTask(func(task ITask) bool { - if v, ok := task.(T); ok && !f(v) { - return false - } - return true - }) -} - -func (c *WorkCollection[K, T]) Has(key K) (ok bool) { - _, ok = c.children.Load(key) - return -} - -func (c *WorkCollection[K, T]) ToList() (list []T) { - c.Range(func(t T) bool { - list = append(list, t) - return true - }) - return -} - -func (c *WorkCollection[K, T]) Length() int { - return int(c.Size.Load()) -} diff --git a/pkg/track.go b/pkg/track.go index 39bb218..2bfb97b 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -7,9 +7,9 @@ import ( "reflect" "time" + "github.com/langhuihui/gotask" "m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/pkg/task/manager.go b/pkg/util/Manager.go similarity index 78% rename from pkg/task/manager.go rename to pkg/util/Manager.go index 722e93e..c43981e 100644 --- a/pkg/task/manager.go +++ b/pkg/util/Manager.go @@ -1,27 +1,9 @@ -package task +package util import ( - "errors" - "fmt" - - . "m7s.live/v5/pkg/util" + . "github.com/langhuihui/gotask" ) -var ErrExist = errors.New("exist") - -type ExistTaskError struct { - Task ITask -} - -func (e ExistTaskError) Error() string { - return fmt.Sprintf("%v exist", e.Task.getKey()) -} - -type ManagerItem[K comparable] interface { - ITask - GetKey() K -} - type Manager[K comparable, T ManagerItem[K]] struct { Work Collection[K, T] @@ -29,8 +11,12 @@ type Manager[K comparable, T ManagerItem[K]] struct { func (m *Manager[K, T]) Add(ctx T, opt ...any) *Task { ctx.OnStart(func() { - if !m.Collection.AddUnique(ctx) { - ctx.Stop(ErrExist) + if old, ok := m.Get(ctx.GetKey()); !ok { + m.Add(ctx) + } else { + ctx.Stop(ExistTaskError{ + Task: old, + }) return } m.Debug("add", "key", ctx.GetKey(), "count", m.Length) diff --git a/plugin.go b/plugin.go index 77fc987..a350d56 100644 --- a/plugin.go +++ b/plugin.go @@ -21,8 +21,6 @@ import ( "gopkg.in/yaml.v3" - "m7s.live/v5/pkg/task" - "github.com/quic-go/quic-go" gatewayRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -30,6 +28,7 @@ import ( "google.golang.org/grpc" "gorm.io/gorm" + task "github.com/langhuihui/gotask" . "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/db" diff --git a/plugin/cascade/client.go b/plugin/cascade/client.go index 9e3c3ce..f95cfac 100644 --- a/plugin/cascade/client.go +++ b/plugin/cascade/client.go @@ -7,9 +7,9 @@ import ( "m7s.live/v5" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" cascade "m7s.live/v5/plugin/cascade/pkg" + task "github.com/langhuihui/gotask" "github.com/quic-go/quic-go" ) diff --git a/plugin/cascade/pkg/quic-http.go b/plugin/cascade/pkg/quic-http.go index ff448a4..e28e06e 100644 --- a/plugin/cascade/pkg/quic-http.go +++ b/plugin/cascade/pkg/quic-http.go @@ -8,11 +8,11 @@ import ( "net/http" "strings" + task "github.com/langhuihui/gotask" flv "m7s.live/v5/plugin/flv/pkg" "github.com/quic-go/quic-go" "m7s.live/v5" - "m7s.live/v5/pkg/task" ) type RelayAPIConfig struct { diff --git a/plugin/cascade/server.go b/plugin/cascade/server.go index bf5d14b..e4db8d4 100644 --- a/plugin/cascade/server.go +++ b/plugin/cascade/server.go @@ -8,10 +8,10 @@ import ( "strings" "sync" + task "github.com/langhuihui/gotask" "google.golang.org/protobuf/types/known/timestamppb" "m7s.live/v5" "m7s.live/v5/pkg" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" "context" diff --git a/plugin/crontab/crontab.go b/plugin/crontab/crontab.go index 3faf983..9536ba6 100644 --- a/plugin/crontab/crontab.go +++ b/plugin/crontab/crontab.go @@ -8,7 +8,7 @@ import ( "strconv" "time" - "m7s.live/v5/pkg/task" + task "github.com/langhuihui/gotask" "m7s.live/v5/plugin/crontab/pkg" ) diff --git a/plugin/debug/chart.go b/plugin/debug/chart.go index 953d6f8..9c5062e 100644 --- a/plugin/debug/chart.go +++ b/plugin/debug/chart.go @@ -13,9 +13,9 @@ import ( "time" "github.com/gorilla/websocket" + task "github.com/langhuihui/gotask" "github.com/shirou/gopsutil/v4/cpu" "github.com/shirou/gopsutil/v4/process" - "m7s.live/v5/pkg/task" ) //go:embed static/* diff --git a/plugin/debug/index.go b/plugin/debug/index.go index 1231a2d..a6c8a37 100644 --- a/plugin/debug/index.go +++ b/plugin/debug/index.go @@ -21,10 +21,10 @@ import ( myproc "github.com/cloudwego/goref/pkg/proc" "github.com/go-delve/delve/pkg/config" "github.com/go-delve/delve/service/debugger" + task "github.com/langhuihui/gotask" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" "m7s.live/v5" - "m7s.live/v5/pkg/task" "m7s.live/v5/plugin/debug/pb" debug "m7s.live/v5/plugin/debug/pkg" "m7s.live/v5/plugin/debug/pkg/profile" diff --git a/plugin/flv/pkg/pull-recorder.go b/plugin/flv/pkg/pull-recorder.go index 4578949..2bf66f7 100644 --- a/plugin/flv/pkg/pull-recorder.go +++ b/plugin/flv/pkg/pull-recorder.go @@ -8,10 +8,10 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" m7s "m7s.live/v5" "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" rtmp "m7s.live/v5/plugin/rtmp/pkg" ) diff --git a/plugin/flv/pkg/record.go b/plugin/flv/pkg/record.go index 820806b..8fa4256 100644 --- a/plugin/flv/pkg/record.go +++ b/plugin/flv/pkg/record.go @@ -8,11 +8,11 @@ import ( "path/filepath" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/storage" - "m7s.live/v5/pkg/task" rtmp "m7s.live/v5/plugin/rtmp/pkg" ) diff --git a/plugin/gb28181/alarmsub.go b/plugin/gb28181/alarmsub.go index eddd27a..fe11889 100644 --- a/plugin/gb28181/alarmsub.go +++ b/plugin/gb28181/alarmsub.go @@ -3,7 +3,7 @@ package plugin_gb28181pro import ( "time" - "m7s.live/v5/pkg/task" + "github.com/langhuihui/gotask" ) // AlarmSubscribeTask 报警订阅任务 diff --git a/plugin/gb28181/catalogsub.go b/plugin/gb28181/catalogsub.go index 98226d7..08ee3a0 100644 --- a/plugin/gb28181/catalogsub.go +++ b/plugin/gb28181/catalogsub.go @@ -1,10 +1,10 @@ package plugin_gb28181pro import ( - "github.com/emiago/sipgo/sip" "time" - "m7s.live/v5/pkg/task" + "github.com/emiago/sipgo/sip" + "github.com/langhuihui/gotask" ) // CatalogSubscribeTask 目录订阅任务 diff --git a/plugin/gb28181/channel.go b/plugin/gb28181/channel.go index 36771e3..6fa7782 100644 --- a/plugin/gb28181/channel.go +++ b/plugin/gb28181/channel.go @@ -6,8 +6,8 @@ import ( "sync/atomic" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" gb28181 "m7s.live/v5/plugin/gb28181/pkg" ) diff --git a/plugin/gb28181/client.go b/plugin/gb28181/client.go index 7c43514..23a95f0 100644 --- a/plugin/gb28181/client.go +++ b/plugin/gb28181/client.go @@ -12,7 +12,7 @@ import ( "github.com/emiago/sipgo/sip" myip "github.com/husanpao/ip" "github.com/icholy/digest" - "m7s.live/v5/pkg/task" + "github.com/langhuihui/gotask" gb28181 "m7s.live/v5/plugin/gb28181/pkg" ) @@ -53,7 +53,7 @@ func (c *Client) Start() (err error) { // Check if host is private/internal network IP //if util.IsPrivateIP(c.recipient.Host) { opts := &slog.HandlerOptions{ - Level: slog.LevelDebug, + Level: slog.LevelDebug, AddSource: true, } logHandler := slog.NewJSONHandler(os.Stdout, opts) diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index afd2b83..353ad02 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -16,7 +16,7 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" - "m7s.live/v5/pkg/task" + "github.com/langhuihui/gotask" "m7s.live/v5/pkg/util" gb28181 "m7s.live/v5/plugin/gb28181/pkg" mrtp "m7s.live/v5/plugin/rtp/pkg" diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index 16bf91e..b41c008 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -11,9 +11,9 @@ import ( sipgo "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + "github.com/langhuihui/gotask" m7s "m7s.live/v5" pkg "m7s.live/v5/pkg" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" gb28181 "m7s.live/v5/plugin/gb28181/pkg" mrtp "m7s.live/v5/plugin/rtp/pkg" diff --git a/plugin/gb28181/forwarddialog.go b/plugin/gb28181/forwarddialog.go index afc6432..fb0d4ad 100644 --- a/plugin/gb28181/forwarddialog.go +++ b/plugin/gb28181/forwarddialog.go @@ -8,8 +8,8 @@ import ( sipgo "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + "github.com/langhuihui/gotask" m7s "m7s.live/v5" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" gb28181 "m7s.live/v5/plugin/gb28181/pkg" mrtp "m7s.live/v5/plugin/rtp/pkg" diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index b63b9df..ce2ab84 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -17,9 +17,9 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + task "github.com/langhuihui/gotask" m7s "m7s.live/v5" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" "m7s.live/v5/plugin/gb28181/pb" gb28181 "m7s.live/v5/plugin/gb28181/pkg" diff --git a/plugin/gb28181/pkg/forwarder.go b/plugin/gb28181/pkg/forwarder.go index 3714b2d..a65d223 100644 --- a/plugin/gb28181/pkg/forwarder.go +++ b/plugin/gb28181/pkg/forwarder.go @@ -27,8 +27,8 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" "github.com/pion/rtp" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/gb28181/pkg/single_port.go b/plugin/gb28181/pkg/single_port.go index 811873b..a460fb2 100644 --- a/plugin/gb28181/pkg/single_port.go +++ b/plugin/gb28181/pkg/single_port.go @@ -6,8 +6,8 @@ import ( "io" "net" + task "github.com/langhuihui/gotask" "github.com/pion/rtp" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/gb28181/platform.go b/plugin/gb28181/platform.go index 2ea6201..28c5c78 100644 --- a/plugin/gb28181/platform.go +++ b/plugin/gb28181/platform.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/langhuihui/gotask" "golang.org/x/text/encoding/simplifiedchinese" "golang.org/x/text/transform" "m7s.live/v5" @@ -17,7 +18,6 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "github.com/icholy/digest" - "m7s.live/v5/pkg/task" gb28181 "m7s.live/v5/plugin/gb28181/pkg" ) diff --git a/plugin/gb28181/positionsub.go b/plugin/gb28181/positionsub.go index 1e75ddd..2b008f6 100644 --- a/plugin/gb28181/positionsub.go +++ b/plugin/gb28181/positionsub.go @@ -3,7 +3,7 @@ package plugin_gb28181pro import ( "time" - "m7s.live/v5/pkg/task" + "github.com/langhuihui/gotask" ) // PositionSubscribeTask 位置订阅任务 diff --git a/plugin/gb28181/register.go b/plugin/gb28181/register.go index 1fe3259..86ec251 100644 --- a/plugin/gb28181/register.go +++ b/plugin/gb28181/register.go @@ -4,7 +4,7 @@ import ( "errors" "time" - "m7s.live/v5/pkg/task" + "github.com/langhuihui/gotask" ) type Register struct { diff --git a/plugin/gb28181/registerhandler.go b/plugin/gb28181/registerhandler.go index 35ec4d9..c129531 100644 --- a/plugin/gb28181/registerhandler.go +++ b/plugin/gb28181/registerhandler.go @@ -14,9 +14,9 @@ import ( "github.com/emiago/sipgo/sip" myip "github.com/husanpao/ip" "github.com/icholy/digest" + "github.com/langhuihui/gotask" "gorm.io/gorm" "m7s.live/v5" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" mrtp "m7s.live/v5/plugin/rtp/pkg" ) diff --git a/plugin/hiksdk/client.go b/plugin/hiksdk/client.go index d0fd08e..3e0f3ea 100644 --- a/plugin/hiksdk/client.go +++ b/plugin/hiksdk/client.go @@ -1,9 +1,9 @@ package plugin_hiksdk import ( + task "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" ) const ( @@ -13,7 +13,7 @@ const ( type ClientPlugin struct { task.Job - conf *HikPlugin + conf *HikPlugin pullCtx m7s.PullJob pushCtx m7s.PushJob @@ -42,4 +42,4 @@ func NewPusher() m7s.IPusher { } client.SetDescription(task.OwnerTypeKey, "HikPusher") return client -} \ No newline at end of file +} diff --git a/plugin/hiksdk/device.go b/plugin/hiksdk/device.go index ac53044..69de9e7 100644 --- a/plugin/hiksdk/device.go +++ b/plugin/hiksdk/device.go @@ -4,9 +4,9 @@ import ( "fmt" "strings" - "m7s.live/v5/pkg/task" "m7s.live/v5/plugin/hiksdk/pkg" + task "github.com/langhuihui/gotask" "github.com/prometheus/client_golang/prometheus" ) @@ -54,7 +54,6 @@ func (d *HikDevice) Run() (err error) { return } - func (d *HikDevice) AutoPullStream() { deviceInfo, _ := d.Device.GetDeiceInfo() // 获取设备参数 channelNames, _ := d.Device.GetChannelName() // 获取通道 diff --git a/plugin/hiksdk/pkg/transceiver.go b/plugin/hiksdk/pkg/transceiver.go index 91d59fd..a16fee6 100644 --- a/plugin/hiksdk/pkg/transceiver.go +++ b/plugin/hiksdk/pkg/transceiver.go @@ -4,8 +4,8 @@ import ( "fmt" "io" + task "github.com/langhuihui/gotask" mpegps "m7s.live/v5/pkg/format/ps" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/hls/index.go b/plugin/hls/index.go index 1d3de5c..d89e48c 100644 --- a/plugin/hls/index.go +++ b/plugin/hls/index.go @@ -14,9 +14,9 @@ import ( _ "embed" + task "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" hls "m7s.live/v5/plugin/hls/pkg" ) diff --git a/plugin/hls/pkg/pull.go b/plugin/hls/pkg/pull.go index f1622b0..2cffa04 100644 --- a/plugin/hls/pkg/pull.go +++ b/plugin/hls/pkg/pull.go @@ -11,12 +11,12 @@ import ( "sync" "time" + task "github.com/langhuihui/gotask" "github.com/quangngotan95/go-m3u8/m3u8" "m7s.live/v5" pkg "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" mpegts "m7s.live/v5/pkg/format/ts" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/mp4/api.go b/plugin/mp4/api.go index 174f9a5..2e0308e 100644 --- a/plugin/mp4/api.go +++ b/plugin/mp4/api.go @@ -13,13 +13,13 @@ import ( "time" "unsafe" + task "github.com/langhuihui/gotask" "github.com/mcuadros/go-defaults" "google.golang.org/protobuf/types/known/emptypb" m7s "m7s.live/v5" "m7s.live/v5/pb" "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" mp4pb "m7s.live/v5/plugin/mp4/pb" mp4 "m7s.live/v5/plugin/mp4/pkg" diff --git a/plugin/mp4/exception.go b/plugin/mp4/exception.go index ac8f17b..7ccc80b 100644 --- a/plugin/mp4/exception.go +++ b/plugin/mp4/exception.go @@ -6,10 +6,10 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" "github.com/shirou/gopsutil/v4/disk" "gorm.io/gorm" "m7s.live/v5" - "m7s.live/v5/pkg/task" ) // mysql数据库里Exception 定义异常结构体 diff --git a/plugin/mp4/pkg/pull-recorder.go b/plugin/mp4/pkg/pull-recorder.go index 1f99e84..a475484 100644 --- a/plugin/mp4/pkg/pull-recorder.go +++ b/plugin/mp4/pkg/pull-recorder.go @@ -4,11 +4,11 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" m7s "m7s.live/v5" "m7s.live/v5/pkg" "m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" "m7s.live/v5/plugin/mp4/pkg/box" ) diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index 3f20618..c2c9836 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -8,12 +8,12 @@ import ( "path/filepath" "time" + task "github.com/langhuihui/gotask" m7s "m7s.live/v5" "m7s.live/v5/pkg" "m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/storage" - "m7s.live/v5/pkg/task" "m7s.live/v5/plugin/mp4/pkg/box" ) diff --git a/plugin/mp4/recovery.go b/plugin/mp4/recovery.go index 615b7f2..0af6e77 100644 --- a/plugin/mp4/recovery.go +++ b/plugin/mp4/recovery.go @@ -7,9 +7,9 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" "gorm.io/gorm" "m7s.live/v5" - "m7s.live/v5/pkg/task" mp4 "m7s.live/v5/plugin/mp4/pkg" "m7s.live/v5/plugin/mp4/pkg/box" ) diff --git a/plugin/onvif/index.go b/plugin/onvif/index.go index 5b1b5ff..bd6382a 100755 --- a/plugin/onvif/index.go +++ b/plugin/onvif/index.go @@ -5,10 +5,10 @@ import ( "sync" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg/util" m7s "m7s.live/v5" - "m7s.live/v5/pkg/task" ) const VIRTUAL_IFACE = "virtual" diff --git a/plugin/room/index.go b/plugin/room/index.go index 23bdd67..89fff2f 100644 --- a/plugin/room/index.go +++ b/plugin/room/index.go @@ -13,10 +13,10 @@ import ( "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" "github.com/google/uuid" + task "github.com/langhuihui/gotask" "m7s.live/v5" . "m7s.live/v5" "m7s.live/v5/pkg" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go index e4e1384..11a61db 100644 --- a/plugin/rtmp/index.go +++ b/plugin/rtmp/index.go @@ -7,8 +7,8 @@ import ( "net" "strings" + task "github.com/langhuihui/gotask" "m7s.live/v5" - "m7s.live/v5/pkg/task" "m7s.live/v5/plugin/rtmp/pb" . "m7s.live/v5/plugin/rtmp/pkg" ) diff --git a/plugin/rtmp/pkg/client.go b/plugin/rtmp/pkg/client.go index 74e48c0..b0babbb 100644 --- a/plugin/rtmp/pkg/client.go +++ b/plugin/rtmp/pkg/client.go @@ -7,9 +7,9 @@ import ( "net/url" "strings" + task "github.com/langhuihui/gotask" pkg "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5" ) diff --git a/plugin/rtmp/pkg/net-connection.go b/plugin/rtmp/pkg/net-connection.go index a59658f..8cf6814 100644 --- a/plugin/rtmp/pkg/net-connection.go +++ b/plugin/rtmp/pkg/net-connection.go @@ -8,8 +8,8 @@ import ( "sync/atomic" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/rtp/pkg/transceiver.go b/plugin/rtp/pkg/transceiver.go index 9859973..9f22a98 100644 --- a/plugin/rtp/pkg/transceiver.go +++ b/plugin/rtp/pkg/transceiver.go @@ -7,9 +7,9 @@ import ( "net" "strings" + task "github.com/langhuihui/gotask" "github.com/pion/rtp" mpegps "m7s.live/v5/pkg/format/ps" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/rtsp/index.go b/plugin/rtsp/index.go index a447146..5f71f78 100644 --- a/plugin/rtsp/index.go +++ b/plugin/rtsp/index.go @@ -5,10 +5,9 @@ import ( "net" "strings" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg/util" - "m7s.live/v5/pkg/task" - "m7s.live/v5" . "m7s.live/v5/plugin/rtsp/pkg" ) diff --git a/plugin/rtsp/pkg/client.go b/plugin/rtsp/pkg/client.go index 363e6c3..176caa9 100644 --- a/plugin/rtsp/pkg/client.go +++ b/plugin/rtsp/pkg/client.go @@ -1,8 +1,8 @@ package rtsp import ( + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5" pkg "m7s.live/v5/pkg" diff --git a/plugin/rtsp/pkg/connection.go b/plugin/rtsp/pkg/connection.go index 1c1e12c..627c2a9 100644 --- a/plugin/rtsp/pkg/connection.go +++ b/plugin/rtsp/pkg/connection.go @@ -12,8 +12,8 @@ import ( "time" "m7s.live/v5/pkg" - "m7s.live/v5/pkg/task" + task "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg/util" ) diff --git a/plugin/snap/pkg/transform.go b/plugin/snap/pkg/transform.go index ed97085..8a77f64 100644 --- a/plugin/snap/pkg/transform.go +++ b/plugin/snap/pkg/transform.go @@ -10,12 +10,12 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/format" m7s "m7s.live/v5" - "m7s.live/v5/pkg/task" ) const ( diff --git a/plugin/srt/index.go b/plugin/srt/index.go index e1b82be..788a5a0 100644 --- a/plugin/srt/index.go +++ b/plugin/srt/index.go @@ -5,8 +5,8 @@ import ( "strings" srt "github.com/datarhei/gosrt" + task "github.com/langhuihui/gotask" "m7s.live/v5" - "m7s.live/v5/pkg/task" srt_pkg "m7s.live/v5/plugin/srt/pkg" ) diff --git a/plugin/srt/pkg/client.go b/plugin/srt/pkg/client.go index ee18ceb..5b974d1 100644 --- a/plugin/srt/pkg/client.go +++ b/plugin/srt/pkg/client.go @@ -4,10 +4,10 @@ import ( "net/url" srt "github.com/datarhei/gosrt" + "github.com/langhuihui/gotask" "m7s.live/v5" pkg "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" ) // Fixed steps for SRT pull workflow diff --git a/plugin/srt/pkg/receiver.go b/plugin/srt/pkg/receiver.go index 524058e..564be13 100644 --- a/plugin/srt/pkg/receiver.go +++ b/plugin/srt/pkg/receiver.go @@ -4,8 +4,8 @@ import ( "bytes" srt "github.com/datarhei/gosrt" + "github.com/langhuihui/gotask" mpegts "m7s.live/v5/pkg/format/ts" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/srt/pkg/sender.go b/plugin/srt/pkg/sender.go index 59d0a3c..349733e 100644 --- a/plugin/srt/pkg/sender.go +++ b/plugin/srt/pkg/sender.go @@ -2,11 +2,11 @@ package srt import ( srt "github.com/datarhei/gosrt" + "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/format" mpegts "m7s.live/v5/pkg/format/ts" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" hls "m7s.live/v5/plugin/hls/pkg" ) diff --git a/plugin/test/accept_push_task.go b/plugin/test/accept_push_task.go index daedb69..37c721c 100644 --- a/plugin/test/accept_push_task.go +++ b/plugin/test/accept_push_task.go @@ -7,7 +7,7 @@ import ( "os/exec" "strings" - "m7s.live/v5/pkg/task" + task "github.com/langhuihui/gotask" ) func init() { diff --git a/plugin/test/api.go b/plugin/test/api.go index eafb055..9e36ad9 100644 --- a/plugin/test/api.go +++ b/plugin/test/api.go @@ -14,10 +14,10 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" + task "github.com/langhuihui/gotask" "m7s.live/v5" pb "m7s.live/v5/pb" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" flv "m7s.live/v5/plugin/flv/pkg" hls "m7s.live/v5/plugin/hls/pkg" @@ -158,12 +158,15 @@ func (p *TestPlugin) pull(count int, url string, testMode int32, puller m7s.Pull if err = ctx.WaitStarted(); err != nil { return } - if p.pullers.AddUnique(ctx) { + if old, ok := p.pullers.Get(ctx.GetKey()); ok { + ctx.Stop(task.ExistTaskError{ + Task: old, + }) + } else { + p.pullers.Add(ctx) ctx.OnDispose(func() { p.pullers.Remove(ctx) }) - } else { - ctx.Stop(task.ErrExist) } } } else if count < i { @@ -185,12 +188,15 @@ func (p *TestPlugin) push(count int, streamPath, url string, pusher m7s.PusherFa if err = ctx.WaitStarted(); err != nil { return } - if p.pushers.AddUnique(ctx) { + if old, ok := p.pushers.Get(ctx.GetKey()); ok { + ctx.Stop(task.ExistTaskError{ + Task: old, + }) + } else { + p.pushers.Add(ctx) ctx.OnDispose(func() { p.pushers.Remove(ctx) }) - } else { - ctx.Stop(task.ErrExist) } } } else if count < i { diff --git a/plugin/test/index.go b/plugin/test/index.go index d462f84..d5d771b 100644 --- a/plugin/test/index.go +++ b/plugin/test/index.go @@ -10,8 +10,8 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" "m7s.live/v5/plugin/test/pb" ) diff --git a/plugin/test/read _task.go b/plugin/test/read _task.go index fc20875..6afde5b 100644 --- a/plugin/test/read _task.go +++ b/plugin/test/read _task.go @@ -3,9 +3,9 @@ package plugin_test import ( "fmt" + task "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" flv "m7s.live/v5/plugin/flv/pkg" hls "m7s.live/v5/plugin/hls/pkg" mp4 "m7s.live/v5/plugin/mp4/pkg" diff --git a/plugin/test/snapshot_task.go b/plugin/test/snapshot_task.go index a391bb0..646c27f 100644 --- a/plugin/test/snapshot_task.go +++ b/plugin/test/snapshot_task.go @@ -6,10 +6,10 @@ import ( "reflect" "strings" + task "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg" "m7s.live/v5/pkg/format" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/plugin/test/write_task.go b/plugin/test/write_task.go index 34c0efa..f934c10 100644 --- a/plugin/test/write_task.go +++ b/plugin/test/write_task.go @@ -10,9 +10,9 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" flv "m7s.live/v5/plugin/flv/pkg" hls "m7s.live/v5/plugin/hls/pkg" diff --git a/plugin/transcode/pkg/transform.go b/plugin/transcode/pkg/transform.go index 3e06146..9b6f879 100644 --- a/plugin/transcode/pkg/transform.go +++ b/plugin/transcode/pkg/transform.go @@ -11,12 +11,12 @@ import ( "strings" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg" "m7s.live/v5/pkg/filerotate" m7s "m7s.live/v5" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" flv "m7s.live/v5/plugin/flv/pkg" ) diff --git a/plugin/webrtc/batchv2.go b/plugin/webrtc/batchv2.go index 523e716..ad72dc3 100644 --- a/plugin/webrtc/batchv2.go +++ b/plugin/webrtc/batchv2.go @@ -9,9 +9,9 @@ import ( "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" + task "github.com/langhuihui/gotask" . "github.com/pion/webrtc/v4" "m7s.live/v5/pkg/codec" - "m7s.live/v5/pkg/task" . "m7s.live/v5/plugin/webrtc/pkg" ) diff --git a/plugin/webrtc/pkg/connection.go b/plugin/webrtc/pkg/connection.go index 10588ce..15c1ed0 100644 --- a/plugin/webrtc/pkg/connection.go +++ b/plugin/webrtc/pkg/connection.go @@ -7,11 +7,11 @@ import ( "strings" // Add this import "time" + task "github.com/langhuihui/gotask" "github.com/pion/rtcp" . "github.com/pion/webrtc/v4" "m7s.live/v5" "m7s.live/v5/pkg/codec" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" flv "m7s.live/v5/plugin/flv/pkg" mrtp "m7s.live/v5/plugin/rtp/pkg" diff --git a/pull_proxy.go b/pull_proxy.go index 1a7a5ad..7bac030 100644 --- a/pull_proxy.go +++ b/pull_proxy.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/langhuihui/gotask" "github.com/mcuadros/go-defaults" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/emptypb" @@ -18,7 +19,6 @@ import ( "m7s.live/v5/pb" "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/puller.go b/puller.go index c535810..58f4fc7 100644 --- a/puller.go +++ b/puller.go @@ -12,10 +12,10 @@ import ( "time" "github.com/gorilla/websocket" + task "github.com/langhuihui/gotask" pkg "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/format" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" ) diff --git a/push_proxy.go b/push_proxy.go index 4cf5812..7b0bede 100644 --- a/push_proxy.go +++ b/push_proxy.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/langhuihui/gotask" "github.com/mcuadros/go-defaults" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -16,7 +17,6 @@ import ( "m7s.live/v5/pb" "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" ) const ( diff --git a/pusher.go b/pusher.go index 895b4d5..aff716c 100644 --- a/pusher.go +++ b/pusher.go @@ -1,8 +1,7 @@ package m7s import ( - "m7s.live/v5/pkg/task" - + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg/config" ) diff --git a/recoder.go b/recoder.go index 63189fc..9782078 100644 --- a/recoder.go +++ b/recoder.go @@ -6,10 +6,9 @@ import ( "gorm.io/gorm" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/storage" - - "m7s.live/v5/pkg/task" ) type ( diff --git a/server.go b/server.go index 8bd7630..1829098 100644 --- a/server.go +++ b/server.go @@ -18,8 +18,8 @@ import ( "github.com/shirou/gopsutil/v4/cpu" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" sysruntime "runtime" @@ -101,7 +101,7 @@ type ( ServerConfig Plugins util.Collection[string, *Plugin] - Streams task.Manager[string, *Publisher] + Streams util.Manager[string, *Publisher] AliasStreams util.Collection[string, *AliasStream] Waiting WaitManager Pulls task.WorkCollection[string, *PullJob] diff --git a/subscriber.go b/subscriber.go index 13f351b..3aa5554 100644 --- a/subscriber.go +++ b/subscriber.go @@ -13,8 +13,8 @@ import ( "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" - "m7s.live/v5/pkg/task" + task "github.com/langhuihui/gotask" . "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/util" diff --git a/test/server_test.go b/test/server_test.go index 98aed83..cff292f 100644 --- a/test/server_test.go +++ b/test/server_test.go @@ -5,9 +5,9 @@ import ( "testing" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5" "m7s.live/v5/pkg" - "m7s.live/v5/pkg/task" ) func TestRestart(b *testing.T) { diff --git a/transformer.go b/transformer.go index 71e3138..61a46ab 100644 --- a/transformer.go +++ b/transformer.go @@ -5,9 +5,9 @@ import ( "slices" "time" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" )