From e883bb94cdf85c95c54532228d26e4cd92a24d5f Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Fri, 23 Aug 2024 14:24:04 +0800 Subject: [PATCH] feat: complete task system and console --- api.go | 32 ++- pb/global.pb.go | 31 ++- pb/global.proto | 4 +- pkg/util/task-call.go | 22 ++ pkg/util/task-channel.go | 8 +- pkg/util/task-macro.go | 119 +++++----- pkg/util/task.go | 113 +++++++--- plugin.go | 5 +- plugin/console/web/index.html | 2 +- plugin/monitor/api.go | 51 ++++- plugin/monitor/index.go | 59 ++--- plugin/monitor/pb/monitor.pb.go | 312 ++++++++++++++++++++++----- plugin/monitor/pb/monitor.pb.gw.go | 70 ++++++ plugin/monitor/pb/monitor.proto | 23 +- plugin/monitor/pb/monitor_grpc.pb.go | 37 ++++ plugin/monitor/pkg/schema-session.go | 2 + plugin/monitor/pkg/schema-task.go | 1 + server.go | 22 +- 18 files changed, 674 insertions(+), 239 deletions(-) create mode 100644 pkg/util/task-call.go diff --git a/api.go b/api.go index 2e76493..bbc9172 100644 --- a/api.go +++ b/api.go @@ -145,27 +145,23 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res } func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) { - res = &pb.TaskTreeResponse{Id: s.ID, Type: s.GetTaskType(), Owner: s.GetOwnerType(), StartTime: timestamppb.New(s.StartTime), Description: maps.Collect(func(yield func(key, value string) bool) { - for k, v := range s.Description { - yield(k, fmt.Sprintf("%v", v)) - } - })} - var fillData func(m util.IMarcoTask, res *pb.TaskTreeResponse) - fillData = func(m util.IMarcoTask, res *pb.TaskTreeResponse) { - for task := range m.RangeSubTask { - t := task.GetTask() - child := &pb.TaskTreeResponse{Id: t.ID, Type: task.GetTaskType(), Owner: task.GetOwnerType(), StartTime: timestamppb.New(t.StartTime), Description: maps.Collect(func(yield func(key, value string) bool) { - for k, v := range t.Description { - yield(k, fmt.Sprintf("%v", v)) - } - })} - if marcoTask, ok := task.(util.IMarcoTask); ok { - fillData(marcoTask, child) + var fillData func(m util.IMarcoTask) *pb.TaskTreeResponse + fillData = func(m util.IMarcoTask) (res *pb.TaskTreeResponse) { + res = &pb.TaskTreeResponse{Id: m.GetTaskID(), State: uint32(m.GetState()), Blocked: m.Blocked(), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: maps.Collect(func(yield func(key, value string) bool) { + for k, v := range m.GetTask().Description { + yield(k, fmt.Sprintf("%v", v)) + } + })} + for task := range m.RangeSubTask { + if marcoTask, ok := task.(util.IMarcoTask); ok { + res.Children = append(res.Children, fillData(marcoTask)) + } else { + res.Children = append(res.Children, &pb.TaskTreeResponse{Id: task.GetTaskID(), State: uint32(task.GetState()), Type: uint32(task.GetTaskType()), Owner: task.GetOwnerType(), StartTime: timestamppb.New(task.GetTask().StartTime)}) } - res.Children = append(res.Children, child) } + return } - fillData(s, res) + res = fillData(&util.RootTask) return } diff --git a/pb/global.pb.go b/pb/global.pb.go index 6e5d654..9137d34 100644 --- a/pb/global.pb.go +++ b/pb/global.pb.go @@ -748,11 +748,13 @@ type TaskTreeResponse struct { unknownFields protoimpl.UnknownFields Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Type uint32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"` Owner string `protobuf:"bytes,3,opt,name=owner,proto3" json:"owner,omitempty"` StartTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=startTime,proto3" json:"startTime,omitempty"` Description map[string]string `protobuf:"bytes,5,rep,name=description,proto3" json:"description,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Children []*TaskTreeResponse `protobuf:"bytes,6,rep,name=children,proto3" json:"children,omitempty"` + State uint32 `protobuf:"varint,7,opt,name=state,proto3" json:"state,omitempty"` + Blocked bool `protobuf:"varint,8,opt,name=blocked,proto3" json:"blocked,omitempty"` } func (x *TaskTreeResponse) Reset() { @@ -794,11 +796,11 @@ func (x *TaskTreeResponse) GetId() uint32 { return 0 } -func (x *TaskTreeResponse) GetType() string { +func (x *TaskTreeResponse) GetType() uint32 { if x != nil { return x.Type } - return "" + return 0 } func (x *TaskTreeResponse) GetOwner() string { @@ -829,6 +831,20 @@ func (x *TaskTreeResponse) GetChildren() []*TaskTreeResponse { return nil } +func (x *TaskTreeResponse) GetState() uint32 { + if x != nil { + return x.State + } + return 0 +} + +func (x *TaskTreeResponse) GetBlocked() bool { + if x != nil { + return x.Blocked + } + return false +} + type StreamListRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2229,10 +2245,10 @@ var file_global_proto_rawDesc = []byte{ 0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x70, 0x75, 0x73, 0x12, 0x29, 0x0a, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, - 0x52, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x22, 0xc3, 0x02, 0x0a, 0x10, 0x54, 0x61, + 0x52, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x22, 0xf3, 0x02, 0x0a, 0x10, 0x54, 0x61, 0x73, 0x6b, 0x54, 0x72, 0x65, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, - 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, + 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x38, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, @@ -2245,7 +2261,10 @@ var file_global_proto_rawDesc = []byte{ 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x31, 0x0a, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x54, 0x72, 0x65, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x1a, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x12, + 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, + 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x1a, 0x3e, 0x0a, 0x10, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, diff --git a/pb/global.proto b/pb/global.proto index 76b8c1a..e9bcea9 100644 --- a/pb/global.proto +++ b/pb/global.proto @@ -166,11 +166,13 @@ message SysInfoResponse { message TaskTreeResponse { uint32 id = 1; - string type = 2; + uint32 type = 2; string owner = 3; google.protobuf.Timestamp startTime = 4; map description = 5; repeated TaskTreeResponse children = 6; + uint32 state = 7; + bool blocked = 8; } message StreamListRequest { diff --git a/pkg/util/task-call.go b/pkg/util/task-call.go new file mode 100644 index 0000000..b12d0f9 --- /dev/null +++ b/pkg/util/task-call.go @@ -0,0 +1,22 @@ +package util + +type CallBackTask struct { + Task +} + +func (t *CallBackTask) GetTaskType() TaskType { + return TASK_TYPE_CALL +} + +func CreateTaskByCallBack(start func() error, dispose func()) ITask { + var task CallBackTask + task.startHandler = func() error { + err := start() + if err == nil && dispose == nil { + err = ErrTaskComplete + } + return err + } + task.disposeHandler = dispose + return &task +} diff --git a/pkg/util/task-channel.go b/pkg/util/task-channel.go index dfb98f3..8aae8eb 100644 --- a/pkg/util/task-channel.go +++ b/pkg/util/task-channel.go @@ -9,12 +9,8 @@ type ChannelTask struct { SignalChan any } -func (*ChannelTask) GetTaskType() string { - return "channel" -} - -func (*ChannelTask) GetTaskTypeID() byte { - return 3 +func (*ChannelTask) GetTaskType() TaskType { + return TASK_TYPE_CHANNEL } func (t *ChannelTask) GetSignal() any { diff --git a/pkg/util/task-macro.go b/pkg/util/task-macro.go index 30ed127..cb13296 100644 --- a/pkg/util/task-macro.go +++ b/pkg/util/task-macro.go @@ -21,7 +21,7 @@ var RootTask MarcoLongTask func init() { RootTask.initTask(context.Background(), &RootTask) RootTask.Description = map[string]any{ - "title": "RootTask", + "ownerType": "root", } RootTask.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) } @@ -40,55 +40,56 @@ func (m *MarcoLongTask) initTask(ctx context.Context, task ITask) { m.keepAlive = true } -func (*MarcoLongTask) GetTaskType() string { - return "long" -} -func (*MarcoLongTask) GetTaskTypeID() byte { - return 2 +func (*MarcoLongTask) GetTaskType() TaskType { + return TASK_TYPE_LONG_MACRO } // MarcoTask include sub tasks type MarcoTask struct { Task - addSub chan ITask - children []ITask - lazyRun sync.Once - keepAlive bool - addListeners []func(task ITask) + addSub chan ITask + children []ITask + lazyRun sync.Once + keepAlive bool + childrenDisposed chan struct{} + childDisposeListeners []func(ITask) + blocked bool } -func (*MarcoTask) GetTaskType() string { - return "marco" +func (*MarcoTask) GetTaskType() TaskType { + return TASK_TYPE_MACRO } -func (*MarcoTask) GetTaskTypeID() byte { - return 1 +func (mt *MarcoTask) Blocked() bool { + return mt.blocked } -func (mt *MarcoTask) getMaroTask() *MarcoTask { - return mt +func (mt *MarcoTask) waitChildrenDispose() { + close(mt.addSub) + <-mt.childrenDisposed } -func (mt *MarcoTask) initTask(ctx context.Context, task ITask) { - mt.Task.initTask(ctx, task) - mt.shutdown = nil - mt.addSub = make(chan ITask, 10) +func (mt *MarcoTask) OnChildDispose(listener func(ITask)) { + mt.childDisposeListeners = append(mt.childDisposeListeners, listener) +} + +func (mt *MarcoTask) onChildDispose(child ITask) { + for _, listener := range mt.childDisposeListeners { + listener(child) + } + if mt.parent != nil { + mt.parent.onChildDispose(child) + } + if child.getParent() == mt { + child.dispose() + } } func (mt *MarcoTask) dispose() { - reason := mt.StopReason() - if mt.Logger != nil { - mt.Debug("task dispose", "reason", reason, "taskId", mt.ID, "taskType", mt.GetTaskType(), "ownerType", mt.GetOwnerType()) - } - mt.disposeHandler() - close(mt.addSub) - _ = mt.WaitStopped() - if mt.Logger != nil { - mt.Debug("task disposed", "reason", reason, "taskId", mt.ID, "taskType", mt.GetTaskType(), "ownerType", mt.GetOwnerType()) - } - for _, listener := range mt.afterDisposeListeners { - listener() + if mt.childrenDisposed != nil { + mt.OnBeforeDispose(mt.waitChildrenDispose) } + mt.Task.dispose() } func (mt *MarcoTask) lazyStart(t ITask) { @@ -102,6 +103,7 @@ func (mt *MarcoTask) lazyStart(t ITask) { } if task.parent == nil { task.parent = mt + task.level = mt.level + 1 } if task.Logger == nil { task.Logger = mt.Logger @@ -113,7 +115,8 @@ func (mt *MarcoTask) lazyStart(t ITask) { task.disposeHandler = EmptyDispose } mt.lazyRun.Do(func() { - mt.shutdown = NewPromise(context.Background()) + mt.childrenDisposed = make(chan struct{}) + mt.addSub = make(chan ITask, 10) go mt.run() }) mt.addSub <- t @@ -129,10 +132,6 @@ func (mt *MarcoTask) AddTask(task ITask) *Task { return mt.AddTaskWithContext(mt.Context, task) } -func (mt *MarcoTask) OnTaskAdded(f func(ITask)) { - mt.addListeners = append(mt.addListeners, f) -} - func (mt *MarcoTask) AddTaskWithContext(ctx context.Context, t ITask) (task *Task) { if ctx == nil && mt.Context == nil { panic("context is nil") @@ -153,32 +152,13 @@ func (mt *MarcoTask) Post(callback func() error) *Task { return mt.AddTask(task) } -type CallBackTask struct { - Task -} - -func CreateTaskByCallBack(start func() error, dispose func()) ITask { - var task CallBackTask - task.startHandler = func() error { - err := start() - if err == nil && dispose == nil { - err = ErrTaskComplete - } - return err - } - task.disposeHandler = dispose - return &task -} - func (mt *MarcoTask) addChild(task ITask) int { mt.children = append(mt.children, task) - for _, listener := range mt.addListeners { - listener(task) - } return len(mt.children) - 1 } func (mt *MarcoTask) removeChild(index int) { + mt.onChildDispose(mt.children[index]) mt.children = slices.Delete(mt.children, index, index+1) } @@ -192,16 +172,15 @@ func (mt *MarcoTask) run() { stopReason := mt.StopReason() for _, task := range mt.children { task.Stop(stopReason) - if task.getParent() == mt { - task.dispose() - } + mt.onChildDispose(task) } mt.children = nil - mt.addSub = nil - mt.shutdown.Fulfill(stopReason) + close(mt.childrenDisposed) }() for { + mt.blocked = false if chosen, rev, ok := reflect.Select(cases); chosen == 0 { + mt.blocked = true if !ok { return } @@ -210,25 +189,23 @@ func (mt *MarcoTask) run() { if err := task.start(); err == nil { cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())}) } else { - mt.removeChild(index) task.Stop(err) + mt.removeChild(index) } } else { - mt.children = append(mt.children, task) + mt.addChild(task) cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())}) } } else { taskIndex := chosen - 1 task := mt.children[taskIndex] + switch tt := task.(type) { + case IChannelTask: + tt.Tick(rev.Interface()) + } if !ok { - if task.getParent() == mt { - task.dispose() - } mt.removeChild(taskIndex) cases = slices.Delete(cases, chosen, chosen+1) - - } else if c, ok := task.(IChannelTask); ok { - c.Tick(rev.Interface()) } } if !mt.keepAlive && len(mt.children) == 0 { diff --git a/pkg/util/task.go b/pkg/util/task.go index a94849e..aace860 100644 --- a/pkg/util/task.go +++ b/pkg/util/task.go @@ -7,6 +7,7 @@ import ( "log/slog" "reflect" "runtime/debug" + "strings" "time" ) @@ -21,28 +22,55 @@ var ( EmptyDispose = func() {} ) +const ( + TASK_STATE_INIT TaskState = iota + TASK_STATE_STARTING + TASK_STATE_STARTED + TASK_STATE_RUNNING + TASK_STATE_GOING + TASK_STATE_DISPOSING + TASK_STATE_DISPOSED +) + +const ( + TASK_TYPE_BASE TaskType = iota + TASK_TYPE_MACRO + TASK_TYPE_LONG_MACRO + TASK_TYPE_CHANNEL + TASK_TYPE_CALL +) + type ( - ITask interface { + TaskState byte + TaskType byte + ITask interface { initTask(context.Context, ITask) getParent() *MarcoTask + GetParent() ITask GetTask() *Task + GetTaskID() uint32 GetSignal() any Stop(error) StopReason() error start() error dispose() IsStopped() bool - GetTaskType() string - GetTaskTypeID() byte + GetTaskType() TaskType GetOwnerType() string SetRetry(maxRetry int, retryInterval time.Duration) OnStart(func()) + OnBeforeDispose(func()) OnDispose(func()) + GetState() TaskState + GetLevel() byte } IMarcoTask interface { ITask RangeSubTask(func(yield ITask) bool) - OnTaskAdded(func(ITask)) + OnChildDispose(func(ITask)) + Blocked() bool + Call(func() error) + Post(func() error) *Task } IChannelTask interface { Tick(any) @@ -70,34 +98,51 @@ type ( *slog.Logger context.Context context.CancelCauseFunc - handler ITask - retry RetryConfig - startHandler func() error - afterStartListeners, afterDisposeListeners []func() - disposeHandler func() - Description map[string]any - startup, shutdown *Promise - parent *MarcoTask - parentCtx context.Context - needRetry bool + handler ITask + retry RetryConfig + startHandler func() error + afterStartListeners, beforeDisposeListeners, afterDisposeListeners []func() + disposeHandler func() + Description map[string]any + startup, shutdown *Promise + parent *MarcoTask + parentCtx context.Context + needRetry bool + state TaskState + level byte } ) +func (task *Task) GetState() TaskState { + return task.state +} +func (task *Task) GetLevel() byte { + return task.level +} +func (task *Task) GetParent() ITask { + if task.parent != nil { + return task.parent.handler + } + return nil +} func (task *Task) SetRetry(maxRetry int, retryInterval time.Duration) { task.retry.MaxRetry = maxRetry task.retry.RetryInterval = retryInterval } - +func (task *Task) GetTaskID() uint32 { + return task.ID +} func (task *Task) GetOwnerType() string { - return reflect.TypeOf(task.handler).Elem().Name() + if task.Description != nil { + if ownerType, ok := task.Description["ownerType"]; ok { + return ownerType.(string) + } + } + return strings.TrimSuffix(reflect.TypeOf(task.handler).Elem().Name(), "Task") } -func (*Task) GetTaskType() string { - return "base" -} - -func (*Task) GetTaskTypeID() byte { - return 0 +func (*Task) GetTaskType() TaskType { + return TASK_TYPE_BASE } func (task *Task) GetTask() *Task { @@ -117,13 +162,13 @@ func (task *Task) WaitStarted() error { } func (task *Task) WaitStopped() (err error) { - err = task.WaitStarted() + err = task.startup.Await() if err != nil { return err } - if task.shutdown == nil { - return task.StopReason() - } + //if task.shutdown == nil { + // return task.StopReason() + //} return task.shutdown.Await() } @@ -159,6 +204,10 @@ func (task *Task) OnStart(listener func()) { task.afterStartListeners = append(task.afterStartListeners, listener) } +func (task *Task) OnBeforeDispose(listener func()) { + task.beforeDisposeListeners = append(task.beforeDisposeListeners, listener) +} + func (task *Task) OnDispose(listener func()) { task.afterDisposeListeners = append(task.afterDisposeListeners, listener) } @@ -203,11 +252,14 @@ func (task *Task) start() (err error) { } hasRun := false for { + task.state = TASK_STATE_STARTING err = task.startHandler() + task.state = TASK_STATE_STARTED if err == nil { task.ResetRetryCount() if runHandler, ok := task.handler.(TaskBlock); ok { hasRun = true + task.state = TASK_STATE_RUNNING err = runHandler.Run() if err == nil { err = ErrTaskComplete @@ -239,21 +291,30 @@ func (task *Task) start() (err error) { listener() } if goHandler, ok := task.handler.(TaskGo); ok { + task.state = TASK_STATE_GOING go task.run(goHandler.Go) } return } func (task *Task) dispose() { + task.state = TASK_STATE_DISPOSING reason := task.StopReason() if task.Logger != nil { task.Debug("task dispose", "reason", reason, "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) } + for _, listener := range task.beforeDisposeListeners { + listener() + } task.disposeHandler() task.shutdown.Fulfill(reason) for _, listener := range task.afterDisposeListeners { listener() } + task.state = TASK_STATE_DISPOSED + if task.Logger != nil { + task.Debug("task disposed", "reason", reason, "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) + } if !errors.Is(reason, ErrTaskComplete) && task.needRetry { task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx) task.startup = NewPromise(task.Context) diff --git a/plugin.go b/plugin.go index e3655c2..3807560 100644 --- a/plugin.go +++ b/plugin.go @@ -110,7 +110,10 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin) } } p.Config.ParseUserFile(userConfig) - p.Description = map[string]any{"version": plugin.Version, "userConfig": userConfig} + p.Description = map[string]any{"version": plugin.Version} + if userConfig != nil { + p.Description["userConfig"] = userConfig + } finalConfig, _ := yaml.Marshal(p.Config.GetMap()) p.Logger.Handler().(*MultiLogHandler).SetLevel(ParseLevel(p.config.LogLevel)) p.Debug("config", "detail", string(finalConfig)) diff --git a/plugin/console/web/index.html b/plugin/console/web/index.html index 203b951..de4595b 100644 --- a/plugin/console/web/index.html +++ b/plugin/console/web/index.html @@ -1 +1 @@ -Monibuca 实例管理平台
\ No newline at end of file +Monibuca 实例管理平台
\ No newline at end of file diff --git a/plugin/monitor/api.go b/plugin/monitor/api.go index b17927e..9440be6 100644 --- a/plugin/monitor/api.go +++ b/plugin/monitor/api.go @@ -2,6 +2,8 @@ package plugin_monitor import ( "context" + "errors" + "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" "m7s.live/m7s/v5/plugin/monitor/pb" monitor "m7s.live/m7s/v5/plugin/monitor/pkg" @@ -9,22 +11,51 @@ import ( ) func (cfg *MonitorPlugin) SearchTask(ctx context.Context, req *pb.SearchTaskRequest) (res *pb.SearchTaskResponse, err error) { + if cfg.DB == nil { + return nil, errors.New("database is not initialized") + } res = &pb.SearchTaskResponse{} var tasks []*monitor.Task tx := cfg.DB.Find(&tasks) if err = tx.Error; err == nil { res.Data = slices.Collect(func(yield func(*pb.Task) bool) { for _, t := range tasks { - yield(&pb.Task{ - Id: t.TaskID, - StartTime: timestamppb.New(t.StartTime), - EndTime: timestamppb.New(t.EndTime), - Owner: t.OwnerType, - Type: uint32(t.TaskType), - Description: t.Description, - Reason: t.Reason, - SessionId: t.SessionID, - ParentId: t.ParentID, + if t.SessionID == req.SessionId { + yield(&pb.Task{ + Id: t.TaskID, + StartTime: timestamppb.New(t.StartTime), + EndTime: timestamppb.New(t.EndTime), + Owner: t.OwnerType, + Type: uint32(t.TaskType), + Description: t.Description, + Reason: t.Reason, + SessionId: t.SessionID, + ParentId: t.ParentID, + }) + } + } + }) + } + return +} + +func (cfg *MonitorPlugin) SessionList(context.Context, *emptypb.Empty) (res *pb.SessionListResponse, err error) { + if cfg.DB == nil { + return nil, errors.New("database is not initialized") + } + res = &pb.SessionListResponse{} + var sessions []*monitor.Session + tx := cfg.DB.Find(&sessions) + err = tx.Error + if err == nil { + res.Data = slices.Collect(func(yield func(*pb.Session) bool) { + for _, s := range sessions { + yield(&pb.Session{ + Id: s.ID, + Pid: uint32(s.PID), + Args: s.Args, + StartTime: timestamppb.New(s.StartTime), + EndTime: timestamppb.New(s.EndTime), }) } }) diff --git a/plugin/monitor/index.go b/plugin/monitor/index.go index 17681ef..8f4e868 100644 --- a/plugin/monitor/index.go +++ b/plugin/monitor/index.go @@ -6,6 +6,8 @@ import ( "m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/plugin/monitor/pb" monitor "m7s.live/m7s/v5/plugin/monitor/pkg" + "os" + "strings" "time" ) @@ -25,49 +27,27 @@ func (cfg *MonitorPlugin) OnStop() { } } -func (cfg *MonitorPlugin) taskDisposeListener(task util.ITask, mt util.IMarcoTask) func() { - return func() { - var th monitor.Task - th.SessionID = cfg.session.ID - th.TaskID = task.GetTask().ID - th.ParentID = mt.GetTask().ID - th.StartTime = task.GetTask().StartTime - th.EndTime = time.Now() - th.OwnerType = task.GetOwnerType() - th.TaskType = task.GetTaskTypeID() - th.Reason = task.StopReason().Error() - b, _ := json.Marshal(task.GetTask().Description) - th.Description = string(b) - cfg.DB.Create(&th) - } +func (cfg *MonitorPlugin) saveTask(task util.ITask) { + var th monitor.Task + th.SessionID = cfg.session.ID + th.TaskID = task.GetTaskID() + th.ParentID = task.GetParent().GetTaskID() + th.StartTime = task.GetTask().StartTime + th.EndTime = time.Now() + th.OwnerType = task.GetOwnerType() + th.TaskType = byte(task.GetTaskType()) + th.Reason = task.StopReason().Error() + th.Level = task.GetLevel() + b, _ := json.Marshal(task.GetTask().Description) + th.Description = string(b) + cfg.DB.Create(&th) } -func (cfg *MonitorPlugin) monitorTask(mt util.IMarcoTask) { - mt.OnTaskAdded(func(task util.ITask) { - task.GetTask().OnDispose(cfg.taskDisposeListener(task, mt)) - }) - for t := range mt.RangeSubTask { - t.OnDispose(cfg.taskDisposeListener(t, mt)) - if mt, ok := t.(util.IMarcoTask); ok { - cfg.monitorTask(mt) - } - } -} - -//func (cfg *MonitorPlugin) saveUnDisposeTask(mt util.IMarcoTask) { -// for t := range mt.RangeSubTask { -// cfg.taskDisposeListener(t, mt)() -// if mt, ok := t.(util.IMarcoTask); ok { -// cfg.saveUnDisposeTask(mt) -// } -// } -//} - func (cfg *MonitorPlugin) OnInit() (err error) { //cfg.columnstore, err = frostdb.New() //database, _ := cfg.columnstore.DB(cfg, "monitor") if cfg.DB != nil { - session := &monitor.Session{StartTime: time.Now()} + session := &monitor.Session{StartTime: time.Now(), PID: os.Getpid(), Args: strings.Join(os.Args, " ")} err = cfg.DB.AutoMigrate(session) if err != nil { return err @@ -82,7 +62,10 @@ func (cfg *MonitorPlugin) OnInit() (err error) { if err != nil { return err } - cfg.monitorTask(cfg.Plugin.Server) + cfg.Plugin.Server.OnBeforeDispose(func() { + cfg.saveTask(cfg.Plugin.Server) + }) + cfg.Plugin.Server.OnChildDispose(cfg.saveTask) } return } diff --git a/plugin/monitor/pb/monitor.pb.go b/plugin/monitor/pb/monitor.pb.go index 3b98913..86067c4 100644 --- a/plugin/monitor/pb/monitor.pb.go +++ b/plugin/monitor/pb/monitor.pb.go @@ -10,6 +10,7 @@ import ( _ "google.golang.org/genproto/googleapis/api/annotations" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" @@ -26,6 +27,8 @@ type SearchTaskRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + SessionId uint32 `protobuf:"varint,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"` } func (x *SearchTaskRequest) Reset() { @@ -60,6 +63,13 @@ func (*SearchTaskRequest) Descriptor() ([]byte, []int) { return file_monitor_proto_rawDescGZIP(), []int{0} } +func (x *SearchTaskRequest) GetSessionId() uint32 { + if x != nil { + return x.SessionId + } + return 0 +} + type Task struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -234,51 +244,221 @@ func (x *SearchTaskResponse) GetData() []*Task { return nil } +type Session struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Pid uint32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"` + Args string `protobuf:"bytes,3,opt,name=args,proto3" json:"args,omitempty"` + StartTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=startTime,proto3" json:"startTime,omitempty"` + EndTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=endTime,proto3" json:"endTime,omitempty"` +} + +func (x *Session) Reset() { + *x = Session{} + if protoimpl.UnsafeEnabled { + mi := &file_monitor_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Session) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Session) ProtoMessage() {} + +func (x *Session) ProtoReflect() protoreflect.Message { + mi := &file_monitor_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Session.ProtoReflect.Descriptor instead. +func (*Session) Descriptor() ([]byte, []int) { + return file_monitor_proto_rawDescGZIP(), []int{3} +} + +func (x *Session) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *Session) GetPid() uint32 { + if x != nil { + return x.Pid + } + return 0 +} + +func (x *Session) GetArgs() string { + if x != nil { + return x.Args + } + return "" +} + +func (x *Session) GetStartTime() *timestamppb.Timestamp { + if x != nil { + return x.StartTime + } + return nil +} + +func (x *Session) GetEndTime() *timestamppb.Timestamp { + if x != nil { + return x.EndTime + } + return nil +} + +type SessionListResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + Data []*Session `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty"` +} + +func (x *SessionListResponse) Reset() { + *x = SessionListResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_monitor_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SessionListResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SessionListResponse) ProtoMessage() {} + +func (x *SessionListResponse) ProtoReflect() protoreflect.Message { + mi := &file_monitor_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SessionListResponse.ProtoReflect.Descriptor instead. +func (*SessionListResponse) Descriptor() ([]byte, []int) { + return file_monitor_proto_rawDescGZIP(), []int{4} +} + +func (x *SessionListResponse) GetCode() uint32 { + if x != nil { + return x.Code + } + return 0 +} + +func (x *SessionListResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *SessionListResponse) GetData() []*Session { + if x != nil { + return x.Data + } + return nil +} + var File_monitor_proto protoreflect.FileDescriptor var file_monitor_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x13, 0x0a, 0x11, 0x53, 0x65, 0x61, 0x72, 0x63, - 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xa4, 0x02, 0x0a, - 0x04, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, - 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, - 0x38, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, - 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x65, 0x6e, 0x64, - 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x31, 0x0a, 0x11, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, + 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xa4, 0x02, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x54, 0x69, 0x6d, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, + 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, + 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, + 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, + 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x09, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x65, + 0x0a, 0x12, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x12, 0x21, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x0d, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0xaf, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, + 0x70, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, 0x38, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, - 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x12, - 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x06, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x73, 0x65, - 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, - 0x74, 0x49, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, - 0x74, 0x49, 0x64, 0x22, 0x65, 0x0a, 0x12, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, - 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, - 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, - 0x54, 0x61, 0x73, 0x6b, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x71, 0x0a, 0x03, 0x61, 0x70, - 0x69, 0x12, 0x6a, 0x0a, 0x0a, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x12, - 0x1a, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, - 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x6d, 0x6f, - 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, - 0x22, 0x18, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, - 0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x3a, 0x01, 0x2a, 0x42, 0x23, 0x5a, - 0x21, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, - 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, - 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, + 0x65, 0x12, 0x34, 0x0a, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x07, + 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x69, 0x0a, 0x13, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, + 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x63, 0x6f, + 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x24, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6d, 0x6f, 0x6e, + 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x32, 0xd9, 0x01, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x6a, 0x0a, 0x0a, 0x53, 0x65, + 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1a, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, + 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, + 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x22, 0x18, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, + 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x74, + 0x61, 0x73, 0x6b, 0x3a, 0x01, 0x2a, 0x12, 0x66, 0x0a, 0x0b, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e, + 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4c, + 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x21, 0x82, 0xd3, 0xe4, + 0x93, 0x02, 0x1b, 0x12, 0x19, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, + 0x69, 0x2f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x2f, 0x6c, 0x69, 0x73, 0x74, 0x42, 0x23, + 0x5a, 0x21, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, + 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, + 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -293,24 +473,32 @@ func file_monitor_proto_rawDescGZIP() []byte { return file_monitor_proto_rawDescData } -var file_monitor_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_monitor_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_monitor_proto_goTypes = []interface{}{ (*SearchTaskRequest)(nil), // 0: monitor.SearchTaskRequest (*Task)(nil), // 1: monitor.Task (*SearchTaskResponse)(nil), // 2: monitor.SearchTaskResponse - (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp + (*Session)(nil), // 3: monitor.Session + (*SessionListResponse)(nil), // 4: monitor.SessionListResponse + (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty } var file_monitor_proto_depIdxs = []int32{ - 3, // 0: monitor.Task.startTime:type_name -> google.protobuf.Timestamp - 3, // 1: monitor.Task.endTime:type_name -> google.protobuf.Timestamp + 5, // 0: monitor.Task.startTime:type_name -> google.protobuf.Timestamp + 5, // 1: monitor.Task.endTime:type_name -> google.protobuf.Timestamp 1, // 2: monitor.SearchTaskResponse.data:type_name -> monitor.Task - 0, // 3: monitor.api.SearchTask:input_type -> monitor.SearchTaskRequest - 2, // 4: monitor.api.SearchTask:output_type -> monitor.SearchTaskResponse - 4, // [4:5] is the sub-list for method output_type - 3, // [3:4] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 5, // 3: monitor.Session.startTime:type_name -> google.protobuf.Timestamp + 5, // 4: monitor.Session.endTime:type_name -> google.protobuf.Timestamp + 3, // 5: monitor.SessionListResponse.data:type_name -> monitor.Session + 0, // 6: monitor.api.SearchTask:input_type -> monitor.SearchTaskRequest + 6, // 7: monitor.api.SessionList:input_type -> google.protobuf.Empty + 2, // 8: monitor.api.SearchTask:output_type -> monitor.SearchTaskResponse + 4, // 9: monitor.api.SessionList:output_type -> monitor.SessionListResponse + 8, // [8:10] is the sub-list for method output_type + 6, // [6:8] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_monitor_proto_init() } @@ -355,6 +543,30 @@ func file_monitor_proto_init() { return nil } } + file_monitor_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Session); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_monitor_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SessionListResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -362,7 +574,7 @@ func file_monitor_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_monitor_proto_rawDesc, NumEnums: 0, - NumMessages: 3, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, diff --git a/plugin/monitor/pb/monitor.pb.gw.go b/plugin/monitor/pb/monitor.pb.gw.go index 4bf83a1..ccfed70 100644 --- a/plugin/monitor/pb/monitor.pb.gw.go +++ b/plugin/monitor/pb/monitor.pb.gw.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" ) // Suppress "imported and not used" errors @@ -57,6 +58,24 @@ func local_request_Api_SearchTask_0(ctx context.Context, marshaler runtime.Marsh } +func request_Api_SessionList_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq emptypb.Empty + var metadata runtime.ServerMetadata + + msg, err := client.SessionList(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Api_SessionList_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq emptypb.Empty + var metadata runtime.ServerMetadata + + msg, err := server.SessionList(ctx, &protoReq) + return msg, metadata, err + +} + // RegisterApiHandlerServer registers the http handlers for service Api to "mux". // UnaryRPC :call ApiServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. @@ -88,6 +107,31 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server }) + mux.Handle("GET", pattern_Api_SessionList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/monitor.Api/SessionList", runtime.WithHTTPPathPattern("/monitor/api/session/list")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Api_SessionList_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Api_SessionList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } @@ -151,13 +195,39 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client }) + mux.Handle("GET", pattern_Api_SessionList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/monitor.Api/SessionList", runtime.WithHTTPPathPattern("/monitor/api/session/list")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Api_SessionList_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Api_SessionList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } var ( pattern_Api_SearchTask_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"monitor", "api", "search", "task"}, "")) + + pattern_Api_SessionList_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"monitor", "api", "session", "list"}, "")) ) var ( forward_Api_SearchTask_0 = runtime.ForwardResponseMessage + + forward_Api_SessionList_0 = runtime.ForwardResponseMessage ) diff --git a/plugin/monitor/pb/monitor.proto b/plugin/monitor/pb/monitor.proto index f25d583..ca2378e 100644 --- a/plugin/monitor/pb/monitor.proto +++ b/plugin/monitor/pb/monitor.proto @@ -1,6 +1,6 @@ syntax = "proto3"; import "google/api/annotations.proto"; -//import "google/protobuf/empty.proto"; +import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; package monitor; option go_package="m7s.live/m7s/v5/plugin/monitor/pb"; @@ -12,9 +12,15 @@ service api { body: "*" }; } + rpc SessionList (google.protobuf.Empty) returns (SessionListResponse) { + option (google.api.http) = { + get: "/monitor/api/session/list" + }; + } } message SearchTaskRequest { + uint32 sessionId = 1; } message Task { @@ -33,4 +39,19 @@ message SearchTaskResponse { uint32 code = 1; string message = 2; repeated Task data = 3; +} + + +message Session { + uint32 id = 1; + uint32 pid = 2; + string args = 3; + google.protobuf.Timestamp startTime = 4; + google.protobuf.Timestamp endTime = 5; +} + +message SessionListResponse { + uint32 code = 1; + string message = 2; + repeated Session data = 3; } \ No newline at end of file diff --git a/plugin/monitor/pb/monitor_grpc.pb.go b/plugin/monitor/pb/monitor_grpc.pb.go index 6aadcc4..6e23106 100644 --- a/plugin/monitor/pb/monitor_grpc.pb.go +++ b/plugin/monitor/pb/monitor_grpc.pb.go @@ -11,6 +11,7 @@ import ( grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" ) // This is a compile-time assertion to ensure that this generated file @@ -23,6 +24,7 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ApiClient interface { SearchTask(ctx context.Context, in *SearchTaskRequest, opts ...grpc.CallOption) (*SearchTaskResponse, error) + SessionList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SessionListResponse, error) } type apiClient struct { @@ -42,11 +44,21 @@ func (c *apiClient) SearchTask(ctx context.Context, in *SearchTaskRequest, opts return out, nil } +func (c *apiClient) SessionList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SessionListResponse, error) { + out := new(SessionListResponse) + err := c.cc.Invoke(ctx, "/monitor.api/SessionList", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ApiServer is the server API for Api service. // All implementations must embed UnimplementedApiServer // for forward compatibility type ApiServer interface { SearchTask(context.Context, *SearchTaskRequest) (*SearchTaskResponse, error) + SessionList(context.Context, *emptypb.Empty) (*SessionListResponse, error) mustEmbedUnimplementedApiServer() } @@ -57,6 +69,9 @@ type UnimplementedApiServer struct { func (UnimplementedApiServer) SearchTask(context.Context, *SearchTaskRequest) (*SearchTaskResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SearchTask not implemented") } +func (UnimplementedApiServer) SessionList(context.Context, *emptypb.Empty) (*SessionListResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SessionList not implemented") +} func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {} // UnsafeApiServer may be embedded to opt out of forward compatibility for this service. @@ -88,6 +103,24 @@ func _Api_SearchTask_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Api_SessionList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ApiServer).SessionList(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/monitor.api/SessionList", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ApiServer).SessionList(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + // Api_ServiceDesc is the grpc.ServiceDesc for Api service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -99,6 +132,10 @@ var Api_ServiceDesc = grpc.ServiceDesc{ MethodName: "SearchTask", Handler: _Api_SearchTask_Handler, }, + { + MethodName: "SessionList", + Handler: _Api_SessionList_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "monitor.proto", diff --git a/plugin/monitor/pkg/schema-session.go b/plugin/monitor/pkg/schema-session.go index 9d1fe01..881b04d 100644 --- a/plugin/monitor/pkg/schema-session.go +++ b/plugin/monitor/pkg/schema-session.go @@ -4,5 +4,7 @@ import "time" type Session struct { ID uint32 `gorm:"primarykey"` + PID int + Args string StartTime, EndTime time.Time } diff --git a/plugin/monitor/pkg/schema-task.go b/plugin/monitor/pkg/schema-task.go index 3c4f1eb..7ab25d0 100644 --- a/plugin/monitor/pkg/schema-task.go +++ b/plugin/monitor/pkg/schema-task.go @@ -12,4 +12,5 @@ type Task struct { TaskType byte Description string Reason string + Level byte } diff --git a/server.go b/server.go index c9948ea..0ce6487 100644 --- a/server.go +++ b/server.go @@ -130,7 +130,10 @@ func (o *OSSignal) Start() error { } func (o *OSSignal) Tick(any) { - util.ShutdownRootTask() + go util.ShutdownRootTask() +} + +func exit() { for _, meta := range plugins { if meta.OnExit != nil { meta.OnExit() @@ -144,6 +147,7 @@ func (o *OSSignal) Tick(any) { func init() { util.RootTask.AddTask(&OSSignal{}) + util.RootTask.OnDispose(exit) for k, v := range myip.LocalAndInternalIPs() { Routes[k] = v fmt.Println(k, v) @@ -243,16 +247,15 @@ func (s *Server) Start() (err error) { return } } - s.AddTask(&s.streamTask) - s.AddTask(&s.pullTask) - s.AddTask(&s.pushTask) - s.AddTask(&s.recordTask) + s.AddTask(&s.streamTask).Description = map[string]any{"ownerType": "Stream"} + s.AddTask(&s.pullTask).Description = map[string]any{"ownerType": "Pull"} + s.AddTask(&s.pushTask).Description = map[string]any{"ownerType": "Push"} + s.AddTask(&s.recordTask).Description = map[string]any{"ownerType": "Record"} for _, plugin := range plugins { if p := plugin.Init(s, cg[strings.ToLower(plugin.Name)]); !p.Disabled { s.AddTask(p.handler) } } - if tcpTask != nil { s.AddTask(&GRPCServer{Task: util.Task{Logger: s.Logger}, s: s, tcpTask: tcpTask}) } @@ -328,7 +331,6 @@ func (s *Server) Dispose() { _ = s.grpcClientConn.Close() if s.DB != nil { db, err := s.DB.DB() - s.DB.Commit() if err == nil { err = db.Close() } @@ -350,11 +352,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, "favicon.ico") return } - fmt.Fprintf(w, "visit:%s\nMonibuca Engine %s StartTime:%s\n", r.URL.Path, Version, s.StartTime) + _, _ = fmt.Fprintf(w, "visit:%s\nMonibuca Engine %s StartTime:%s\n", r.URL.Path, Version, s.StartTime) for plugin := range s.Plugins.Range { - fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version) + _, _ = fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version) } for _, api := range s.apiList { - fmt.Fprintf(w, "%s\n", api) + _, _ = fmt.Fprintf(w, "%s\n", api) } }