feat: complete task system and console

This commit is contained in:
langhuihui
2024-08-23 14:24:04 +08:00
parent 84b7240f7b
commit e883bb94cd
18 changed files with 674 additions and 239 deletions

24
api.go
View File

@@ -145,27 +145,23 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
} }
func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) { func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) {
res = &pb.TaskTreeResponse{Id: s.ID, Type: s.GetTaskType(), Owner: s.GetOwnerType(), StartTime: timestamppb.New(s.StartTime), Description: maps.Collect(func(yield func(key, value string) bool) { var fillData func(m util.IMarcoTask) *pb.TaskTreeResponse
for k, v := range s.Description { fillData = func(m util.IMarcoTask) (res *pb.TaskTreeResponse) {
res = &pb.TaskTreeResponse{Id: m.GetTaskID(), State: uint32(m.GetState()), Blocked: m.Blocked(), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: maps.Collect(func(yield func(key, value string) bool) {
for k, v := range m.GetTask().Description {
yield(k, fmt.Sprintf("%v", v)) yield(k, fmt.Sprintf("%v", v))
} }
})} })}
var fillData func(m util.IMarcoTask, res *pb.TaskTreeResponse)
fillData = func(m util.IMarcoTask, res *pb.TaskTreeResponse) {
for task := range m.RangeSubTask { for task := range m.RangeSubTask {
t := task.GetTask()
child := &pb.TaskTreeResponse{Id: t.ID, Type: task.GetTaskType(), Owner: task.GetOwnerType(), StartTime: timestamppb.New(t.StartTime), Description: maps.Collect(func(yield func(key, value string) bool) {
for k, v := range t.Description {
yield(k, fmt.Sprintf("%v", v))
}
})}
if marcoTask, ok := task.(util.IMarcoTask); ok { if marcoTask, ok := task.(util.IMarcoTask); ok {
fillData(marcoTask, child) res.Children = append(res.Children, fillData(marcoTask))
} } else {
res.Children = append(res.Children, child) res.Children = append(res.Children, &pb.TaskTreeResponse{Id: task.GetTaskID(), State: uint32(task.GetState()), Type: uint32(task.GetTaskType()), Owner: task.GetOwnerType(), StartTime: timestamppb.New(task.GetTask().StartTime)})
} }
} }
fillData(s, res) return
}
res = fillData(&util.RootTask)
return return
} }

View File

@@ -748,11 +748,13 @@ type TaskTreeResponse struct {
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` Type uint32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"`
Owner string `protobuf:"bytes,3,opt,name=owner,proto3" json:"owner,omitempty"` Owner string `protobuf:"bytes,3,opt,name=owner,proto3" json:"owner,omitempty"`
StartTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=startTime,proto3" json:"startTime,omitempty"` StartTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=startTime,proto3" json:"startTime,omitempty"`
Description map[string]string `protobuf:"bytes,5,rep,name=description,proto3" json:"description,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Description map[string]string `protobuf:"bytes,5,rep,name=description,proto3" json:"description,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Children []*TaskTreeResponse `protobuf:"bytes,6,rep,name=children,proto3" json:"children,omitempty"` Children []*TaskTreeResponse `protobuf:"bytes,6,rep,name=children,proto3" json:"children,omitempty"`
State uint32 `protobuf:"varint,7,opt,name=state,proto3" json:"state,omitempty"`
Blocked bool `protobuf:"varint,8,opt,name=blocked,proto3" json:"blocked,omitempty"`
} }
func (x *TaskTreeResponse) Reset() { func (x *TaskTreeResponse) Reset() {
@@ -794,11 +796,11 @@ func (x *TaskTreeResponse) GetId() uint32 {
return 0 return 0
} }
func (x *TaskTreeResponse) GetType() string { func (x *TaskTreeResponse) GetType() uint32 {
if x != nil { if x != nil {
return x.Type return x.Type
} }
return "" return 0
} }
func (x *TaskTreeResponse) GetOwner() string { func (x *TaskTreeResponse) GetOwner() string {
@@ -829,6 +831,20 @@ func (x *TaskTreeResponse) GetChildren() []*TaskTreeResponse {
return nil return nil
} }
func (x *TaskTreeResponse) GetState() uint32 {
if x != nil {
return x.State
}
return 0
}
func (x *TaskTreeResponse) GetBlocked() bool {
if x != nil {
return x.Blocked
}
return false
}
type StreamListRequest struct { type StreamListRequest struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@@ -2229,10 +2245,10 @@ var file_global_proto_rawDesc = []byte{
0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x70, 0x75, 0x73, 0x12, 0x29, 0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x70, 0x75, 0x73, 0x12, 0x29,
0x0a, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0a, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x0f, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x0f, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f,
0x52, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x22, 0xc3, 0x02, 0x0a, 0x10, 0x54, 0x61, 0x52, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x22, 0xf3, 0x02, 0x0a, 0x10, 0x54, 0x61,
0x73, 0x6b, 0x54, 0x72, 0x65, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x73, 0x6b, 0x54, 0x72, 0x65, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e,
0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12,
0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x74, 0x79,
0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x38, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72, 0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x38, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72,
0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
@@ -2245,7 +2261,10 @@ var file_global_proto_rawDesc = []byte{
0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x31, 0x0a, 0x08, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x31, 0x0a, 0x08,
0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15,
0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x54, 0x72, 0x65, 0x65, 0x52, 0x65, 0x73, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x54, 0x72, 0x65, 0x65, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x1a, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x12,
0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05,
0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64,
0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x1a,
0x3e, 0x0a, 0x10, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x6e, 0x3e, 0x0a, 0x10, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x6e,
0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02,

View File

@@ -166,11 +166,13 @@ message SysInfoResponse {
message TaskTreeResponse { message TaskTreeResponse {
uint32 id = 1; uint32 id = 1;
string type = 2; uint32 type = 2;
string owner = 3; string owner = 3;
google.protobuf.Timestamp startTime = 4; google.protobuf.Timestamp startTime = 4;
map<string, string> description = 5; map<string, string> description = 5;
repeated TaskTreeResponse children = 6; repeated TaskTreeResponse children = 6;
uint32 state = 7;
bool blocked = 8;
} }
message StreamListRequest { message StreamListRequest {

22
pkg/util/task-call.go Normal file
View File

@@ -0,0 +1,22 @@
package util
type CallBackTask struct {
Task
}
func (t *CallBackTask) GetTaskType() TaskType {
return TASK_TYPE_CALL
}
func CreateTaskByCallBack(start func() error, dispose func()) ITask {
var task CallBackTask
task.startHandler = func() error {
err := start()
if err == nil && dispose == nil {
err = ErrTaskComplete
}
return err
}
task.disposeHandler = dispose
return &task
}

View File

@@ -9,12 +9,8 @@ type ChannelTask struct {
SignalChan any SignalChan any
} }
func (*ChannelTask) GetTaskType() string { func (*ChannelTask) GetTaskType() TaskType {
return "channel" return TASK_TYPE_CHANNEL
}
func (*ChannelTask) GetTaskTypeID() byte {
return 3
} }
func (t *ChannelTask) GetSignal() any { func (t *ChannelTask) GetSignal() any {

View File

@@ -21,7 +21,7 @@ var RootTask MarcoLongTask
func init() { func init() {
RootTask.initTask(context.Background(), &RootTask) RootTask.initTask(context.Background(), &RootTask)
RootTask.Description = map[string]any{ RootTask.Description = map[string]any{
"title": "RootTask", "ownerType": "root",
} }
RootTask.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) RootTask.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
} }
@@ -40,11 +40,8 @@ func (m *MarcoLongTask) initTask(ctx context.Context, task ITask) {
m.keepAlive = true m.keepAlive = true
} }
func (*MarcoLongTask) GetTaskType() string { func (*MarcoLongTask) GetTaskType() TaskType {
return "long" return TASK_TYPE_LONG_MACRO
}
func (*MarcoLongTask) GetTaskTypeID() byte {
return 2
} }
// MarcoTask include sub tasks // MarcoTask include sub tasks
@@ -54,41 +51,45 @@ type MarcoTask struct {
children []ITask children []ITask
lazyRun sync.Once lazyRun sync.Once
keepAlive bool keepAlive bool
addListeners []func(task ITask) childrenDisposed chan struct{}
childDisposeListeners []func(ITask)
blocked bool
} }
func (*MarcoTask) GetTaskType() string { func (*MarcoTask) GetTaskType() TaskType {
return "marco" return TASK_TYPE_MACRO
} }
func (*MarcoTask) GetTaskTypeID() byte { func (mt *MarcoTask) Blocked() bool {
return 1 return mt.blocked
} }
func (mt *MarcoTask) getMaroTask() *MarcoTask { func (mt *MarcoTask) waitChildrenDispose() {
return mt close(mt.addSub)
<-mt.childrenDisposed
} }
func (mt *MarcoTask) initTask(ctx context.Context, task ITask) { func (mt *MarcoTask) OnChildDispose(listener func(ITask)) {
mt.Task.initTask(ctx, task) mt.childDisposeListeners = append(mt.childDisposeListeners, listener)
mt.shutdown = nil }
mt.addSub = make(chan ITask, 10)
func (mt *MarcoTask) onChildDispose(child ITask) {
for _, listener := range mt.childDisposeListeners {
listener(child)
}
if mt.parent != nil {
mt.parent.onChildDispose(child)
}
if child.getParent() == mt {
child.dispose()
}
} }
func (mt *MarcoTask) dispose() { func (mt *MarcoTask) dispose() {
reason := mt.StopReason() if mt.childrenDisposed != nil {
if mt.Logger != nil { mt.OnBeforeDispose(mt.waitChildrenDispose)
mt.Debug("task dispose", "reason", reason, "taskId", mt.ID, "taskType", mt.GetTaskType(), "ownerType", mt.GetOwnerType())
}
mt.disposeHandler()
close(mt.addSub)
_ = mt.WaitStopped()
if mt.Logger != nil {
mt.Debug("task disposed", "reason", reason, "taskId", mt.ID, "taskType", mt.GetTaskType(), "ownerType", mt.GetOwnerType())
}
for _, listener := range mt.afterDisposeListeners {
listener()
} }
mt.Task.dispose()
} }
func (mt *MarcoTask) lazyStart(t ITask) { func (mt *MarcoTask) lazyStart(t ITask) {
@@ -102,6 +103,7 @@ func (mt *MarcoTask) lazyStart(t ITask) {
} }
if task.parent == nil { if task.parent == nil {
task.parent = mt task.parent = mt
task.level = mt.level + 1
} }
if task.Logger == nil { if task.Logger == nil {
task.Logger = mt.Logger task.Logger = mt.Logger
@@ -113,7 +115,8 @@ func (mt *MarcoTask) lazyStart(t ITask) {
task.disposeHandler = EmptyDispose task.disposeHandler = EmptyDispose
} }
mt.lazyRun.Do(func() { mt.lazyRun.Do(func() {
mt.shutdown = NewPromise(context.Background()) mt.childrenDisposed = make(chan struct{})
mt.addSub = make(chan ITask, 10)
go mt.run() go mt.run()
}) })
mt.addSub <- t mt.addSub <- t
@@ -129,10 +132,6 @@ func (mt *MarcoTask) AddTask(task ITask) *Task {
return mt.AddTaskWithContext(mt.Context, task) return mt.AddTaskWithContext(mt.Context, task)
} }
func (mt *MarcoTask) OnTaskAdded(f func(ITask)) {
mt.addListeners = append(mt.addListeners, f)
}
func (mt *MarcoTask) AddTaskWithContext(ctx context.Context, t ITask) (task *Task) { func (mt *MarcoTask) AddTaskWithContext(ctx context.Context, t ITask) (task *Task) {
if ctx == nil && mt.Context == nil { if ctx == nil && mt.Context == nil {
panic("context is nil") panic("context is nil")
@@ -153,32 +152,13 @@ func (mt *MarcoTask) Post(callback func() error) *Task {
return mt.AddTask(task) return mt.AddTask(task)
} }
type CallBackTask struct {
Task
}
func CreateTaskByCallBack(start func() error, dispose func()) ITask {
var task CallBackTask
task.startHandler = func() error {
err := start()
if err == nil && dispose == nil {
err = ErrTaskComplete
}
return err
}
task.disposeHandler = dispose
return &task
}
func (mt *MarcoTask) addChild(task ITask) int { func (mt *MarcoTask) addChild(task ITask) int {
mt.children = append(mt.children, task) mt.children = append(mt.children, task)
for _, listener := range mt.addListeners {
listener(task)
}
return len(mt.children) - 1 return len(mt.children) - 1
} }
func (mt *MarcoTask) removeChild(index int) { func (mt *MarcoTask) removeChild(index int) {
mt.onChildDispose(mt.children[index])
mt.children = slices.Delete(mt.children, index, index+1) mt.children = slices.Delete(mt.children, index, index+1)
} }
@@ -192,16 +172,15 @@ func (mt *MarcoTask) run() {
stopReason := mt.StopReason() stopReason := mt.StopReason()
for _, task := range mt.children { for _, task := range mt.children {
task.Stop(stopReason) task.Stop(stopReason)
if task.getParent() == mt { mt.onChildDispose(task)
task.dispose()
}
} }
mt.children = nil mt.children = nil
mt.addSub = nil close(mt.childrenDisposed)
mt.shutdown.Fulfill(stopReason)
}() }()
for { for {
mt.blocked = false
if chosen, rev, ok := reflect.Select(cases); chosen == 0 { if chosen, rev, ok := reflect.Select(cases); chosen == 0 {
mt.blocked = true
if !ok { if !ok {
return return
} }
@@ -210,25 +189,23 @@ func (mt *MarcoTask) run() {
if err := task.start(); err == nil { if err := task.start(); err == nil {
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())}) cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())})
} else { } else {
mt.removeChild(index)
task.Stop(err) task.Stop(err)
mt.removeChild(index)
} }
} else { } else {
mt.children = append(mt.children, task) mt.addChild(task)
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())}) cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(task.GetSignal())})
} }
} else { } else {
taskIndex := chosen - 1 taskIndex := chosen - 1
task := mt.children[taskIndex] task := mt.children[taskIndex]
if !ok { switch tt := task.(type) {
if task.getParent() == mt { case IChannelTask:
task.dispose() tt.Tick(rev.Interface())
} }
if !ok {
mt.removeChild(taskIndex) mt.removeChild(taskIndex)
cases = slices.Delete(cases, chosen, chosen+1) cases = slices.Delete(cases, chosen, chosen+1)
} else if c, ok := task.(IChannelTask); ok {
c.Tick(rev.Interface())
} }
} }
if !mt.keepAlive && len(mt.children) == 0 { if !mt.keepAlive && len(mt.children) == 0 {

View File

@@ -7,6 +7,7 @@ import (
"log/slog" "log/slog"
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"strings"
"time" "time"
) )
@@ -21,28 +22,55 @@ var (
EmptyDispose = func() {} EmptyDispose = func() {}
) )
const (
TASK_STATE_INIT TaskState = iota
TASK_STATE_STARTING
TASK_STATE_STARTED
TASK_STATE_RUNNING
TASK_STATE_GOING
TASK_STATE_DISPOSING
TASK_STATE_DISPOSED
)
const (
TASK_TYPE_BASE TaskType = iota
TASK_TYPE_MACRO
TASK_TYPE_LONG_MACRO
TASK_TYPE_CHANNEL
TASK_TYPE_CALL
)
type ( type (
TaskState byte
TaskType byte
ITask interface { ITask interface {
initTask(context.Context, ITask) initTask(context.Context, ITask)
getParent() *MarcoTask getParent() *MarcoTask
GetParent() ITask
GetTask() *Task GetTask() *Task
GetTaskID() uint32
GetSignal() any GetSignal() any
Stop(error) Stop(error)
StopReason() error StopReason() error
start() error start() error
dispose() dispose()
IsStopped() bool IsStopped() bool
GetTaskType() string GetTaskType() TaskType
GetTaskTypeID() byte
GetOwnerType() string GetOwnerType() string
SetRetry(maxRetry int, retryInterval time.Duration) SetRetry(maxRetry int, retryInterval time.Duration)
OnStart(func()) OnStart(func())
OnBeforeDispose(func())
OnDispose(func()) OnDispose(func())
GetState() TaskState
GetLevel() byte
} }
IMarcoTask interface { IMarcoTask interface {
ITask ITask
RangeSubTask(func(yield ITask) bool) RangeSubTask(func(yield ITask) bool)
OnTaskAdded(func(ITask)) OnChildDispose(func(ITask))
Blocked() bool
Call(func() error)
Post(func() error) *Task
} }
IChannelTask interface { IChannelTask interface {
Tick(any) Tick(any)
@@ -73,31 +101,48 @@ type (
handler ITask handler ITask
retry RetryConfig retry RetryConfig
startHandler func() error startHandler func() error
afterStartListeners, afterDisposeListeners []func() afterStartListeners, beforeDisposeListeners, afterDisposeListeners []func()
disposeHandler func() disposeHandler func()
Description map[string]any Description map[string]any
startup, shutdown *Promise startup, shutdown *Promise
parent *MarcoTask parent *MarcoTask
parentCtx context.Context parentCtx context.Context
needRetry bool needRetry bool
state TaskState
level byte
} }
) )
func (task *Task) GetState() TaskState {
return task.state
}
func (task *Task) GetLevel() byte {
return task.level
}
func (task *Task) GetParent() ITask {
if task.parent != nil {
return task.parent.handler
}
return nil
}
func (task *Task) SetRetry(maxRetry int, retryInterval time.Duration) { func (task *Task) SetRetry(maxRetry int, retryInterval time.Duration) {
task.retry.MaxRetry = maxRetry task.retry.MaxRetry = maxRetry
task.retry.RetryInterval = retryInterval task.retry.RetryInterval = retryInterval
} }
func (task *Task) GetTaskID() uint32 {
return task.ID
}
func (task *Task) GetOwnerType() string { func (task *Task) GetOwnerType() string {
return reflect.TypeOf(task.handler).Elem().Name() if task.Description != nil {
if ownerType, ok := task.Description["ownerType"]; ok {
return ownerType.(string)
}
}
return strings.TrimSuffix(reflect.TypeOf(task.handler).Elem().Name(), "Task")
} }
func (*Task) GetTaskType() string { func (*Task) GetTaskType() TaskType {
return "base" return TASK_TYPE_BASE
}
func (*Task) GetTaskTypeID() byte {
return 0
} }
func (task *Task) GetTask() *Task { func (task *Task) GetTask() *Task {
@@ -117,13 +162,13 @@ func (task *Task) WaitStarted() error {
} }
func (task *Task) WaitStopped() (err error) { func (task *Task) WaitStopped() (err error) {
err = task.WaitStarted() err = task.startup.Await()
if err != nil { if err != nil {
return err return err
} }
if task.shutdown == nil { //if task.shutdown == nil {
return task.StopReason() // return task.StopReason()
} //}
return task.shutdown.Await() return task.shutdown.Await()
} }
@@ -159,6 +204,10 @@ func (task *Task) OnStart(listener func()) {
task.afterStartListeners = append(task.afterStartListeners, listener) task.afterStartListeners = append(task.afterStartListeners, listener)
} }
func (task *Task) OnBeforeDispose(listener func()) {
task.beforeDisposeListeners = append(task.beforeDisposeListeners, listener)
}
func (task *Task) OnDispose(listener func()) { func (task *Task) OnDispose(listener func()) {
task.afterDisposeListeners = append(task.afterDisposeListeners, listener) task.afterDisposeListeners = append(task.afterDisposeListeners, listener)
} }
@@ -203,11 +252,14 @@ func (task *Task) start() (err error) {
} }
hasRun := false hasRun := false
for { for {
task.state = TASK_STATE_STARTING
err = task.startHandler() err = task.startHandler()
task.state = TASK_STATE_STARTED
if err == nil { if err == nil {
task.ResetRetryCount() task.ResetRetryCount()
if runHandler, ok := task.handler.(TaskBlock); ok { if runHandler, ok := task.handler.(TaskBlock); ok {
hasRun = true hasRun = true
task.state = TASK_STATE_RUNNING
err = runHandler.Run() err = runHandler.Run()
if err == nil { if err == nil {
err = ErrTaskComplete err = ErrTaskComplete
@@ -239,21 +291,30 @@ func (task *Task) start() (err error) {
listener() listener()
} }
if goHandler, ok := task.handler.(TaskGo); ok { if goHandler, ok := task.handler.(TaskGo); ok {
task.state = TASK_STATE_GOING
go task.run(goHandler.Go) go task.run(goHandler.Go)
} }
return return
} }
func (task *Task) dispose() { func (task *Task) dispose() {
task.state = TASK_STATE_DISPOSING
reason := task.StopReason() reason := task.StopReason()
if task.Logger != nil { if task.Logger != nil {
task.Debug("task dispose", "reason", reason, "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) task.Debug("task dispose", "reason", reason, "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
} }
for _, listener := range task.beforeDisposeListeners {
listener()
}
task.disposeHandler() task.disposeHandler()
task.shutdown.Fulfill(reason) task.shutdown.Fulfill(reason)
for _, listener := range task.afterDisposeListeners { for _, listener := range task.afterDisposeListeners {
listener() listener()
} }
task.state = TASK_STATE_DISPOSED
if task.Logger != nil {
task.Debug("task disposed", "reason", reason, "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
}
if !errors.Is(reason, ErrTaskComplete) && task.needRetry { if !errors.Is(reason, ErrTaskComplete) && task.needRetry {
task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx) task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx)
task.startup = NewPromise(task.Context) task.startup = NewPromise(task.Context)

View File

@@ -110,7 +110,10 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
} }
} }
p.Config.ParseUserFile(userConfig) p.Config.ParseUserFile(userConfig)
p.Description = map[string]any{"version": plugin.Version, "userConfig": userConfig} p.Description = map[string]any{"version": plugin.Version}
if userConfig != nil {
p.Description["userConfig"] = userConfig
}
finalConfig, _ := yaml.Marshal(p.Config.GetMap()) finalConfig, _ := yaml.Marshal(p.Config.GetMap())
p.Logger.Handler().(*MultiLogHandler).SetLevel(ParseLevel(p.config.LogLevel)) p.Logger.Handler().(*MultiLogHandler).SetLevel(ParseLevel(p.config.LogLevel))
p.Debug("config", "detail", string(finalConfig)) p.Debug("config", "detail", string(finalConfig))

View File

@@ -1 +1 @@
<!doctype html><html id=htmlRoot data-theme=light><head><meta charset=UTF-8 /><meta content="IE=edge,chrome=1" http-equiv=X-UA-Compatible /><meta content=webkit name=renderer /><meta content="width=device-width,initial-scale=1,minimum-scale=1,maximum-scale=1,user-scalable=0" name=viewport /><link href=./favicon.ico rel=icon /><title>Monibuca 实例管理平台</title><script src=https://res.wx.qq.com/open/js/jweixin-1.6.0.js></script><script src=https://hm.baidu.com/hm.js?c602c039e7a2b165463bb1db0744a757></script><script src=./static/js/vue.js></script><script src=./static/js/vue-router.js></script><script src=./static/js/vue-demi.js></script><script src=./static/js/naive-ui.js></script><script type=module crossorigin src=./assets/index-2f71c4eb.js></script><link rel=modulepreload crossorigin href=./assets/vendor-ec30964e.js><link rel=stylesheet href=./assets/index-3de348ff.css></head><body><div id=appProvider style=display:none></div><div id=app><style>.first-loading-wrap{display:flex;width:100%;height:100vh;justify-content:center;align-items:center;flex-direction:column}.first-loading-wrap>h1{font-size:128px}.first-loading-wrap .loading-wrap{padding:98px;display:flex;justify-content:center;align-items:center}.dot{-webkit-animation:antRotate 1.2s infinite linear;animation:antRotate 1.2s infinite linear;transform:rotate(45deg);position:relative;display:inline-block;font-size:32px;width:32px;height:32px;box-sizing:border-box}.dot i{width:14px;height:14px;position:absolute;display:block;background-color:#1890ff;border-radius:100%;transform:scale(.75);transform-origin:50% 50%;opacity:.3;-webkit-animation:antSpinMove 1s infinite linear alternate;animation:antSpinMove 1s infinite linear alternate}.dot i:first-child{top:0;left:0}.dot i:nth-child(2){top:0;right:0;-webkit-animation-delay:.4s;animation-delay:.4s}.dot i:nth-child(3){right:0;bottom:0;-webkit-animation-delay:.8s;animation-delay:.8s}.dot i:nth-child(4){bottom:0;left:0;-webkit-animation-delay:1.2s;animation-delay:1.2s}@keyframes antRotate{to{transform:rotate(405deg)}}@-webkit-keyframes antRotate{to{transform:rotate(405deg)}}@keyframes antSpinMove{to{opacity:1}}@-webkit-keyframes antSpinMove{to{opacity:1}}</style><div class=first-loading-wrap><div class=loading-wrap><span class="dot dot-spin"><i></i><i></i><i></i><i></i></span></div></div></div><script>var globalThis = window</script></body></html> <!doctype html><html id=htmlRoot data-theme=light><head><meta charset=UTF-8 /><meta content="IE=edge,chrome=1" http-equiv=X-UA-Compatible /><meta content=webkit name=renderer /><meta content="width=device-width,initial-scale=1,minimum-scale=1,maximum-scale=1,user-scalable=0" name=viewport /><link href=./favicon.ico rel=icon /><title>Monibuca 实例管理平台</title><script src=https://res.wx.qq.com/open/js/jweixin-1.6.0.js></script><script src=https://hm.baidu.com/hm.js?c602c039e7a2b165463bb1db0744a757></script><script src=./static/js/vue.js></script><script src=./static/js/vue-router.js></script><script src=./static/js/vue-demi.js></script><script src=./static/js/naive-ui.js></script><script type=module crossorigin src=./assets/index-fa77b114.js></script><link rel=modulepreload crossorigin href=./assets/vendor-ec30964e.js><link rel=stylesheet href=./assets/index-3de348ff.css></head><body><div id=appProvider style=display:none></div><div id=app><style>.first-loading-wrap{display:flex;width:100%;height:100vh;justify-content:center;align-items:center;flex-direction:column}.first-loading-wrap>h1{font-size:128px}.first-loading-wrap .loading-wrap{padding:98px;display:flex;justify-content:center;align-items:center}.dot{-webkit-animation:antRotate 1.2s infinite linear;animation:antRotate 1.2s infinite linear;transform:rotate(45deg);position:relative;display:inline-block;font-size:32px;width:32px;height:32px;box-sizing:border-box}.dot i{width:14px;height:14px;position:absolute;display:block;background-color:#1890ff;border-radius:100%;transform:scale(.75);transform-origin:50% 50%;opacity:.3;-webkit-animation:antSpinMove 1s infinite linear alternate;animation:antSpinMove 1s infinite linear alternate}.dot i:first-child{top:0;left:0}.dot i:nth-child(2){top:0;right:0;-webkit-animation-delay:.4s;animation-delay:.4s}.dot i:nth-child(3){right:0;bottom:0;-webkit-animation-delay:.8s;animation-delay:.8s}.dot i:nth-child(4){bottom:0;left:0;-webkit-animation-delay:1.2s;animation-delay:1.2s}@keyframes antRotate{to{transform:rotate(405deg)}}@-webkit-keyframes antRotate{to{transform:rotate(405deg)}}@keyframes antSpinMove{to{opacity:1}}@-webkit-keyframes antSpinMove{to{opacity:1}}</style><div class=first-loading-wrap><div class=loading-wrap><span class="dot dot-spin"><i></i><i></i><i></i><i></i></span></div></div></div><script>var globalThis = window</script></body></html>

View File

@@ -2,6 +2,8 @@ package plugin_monitor
import ( import (
"context" "context"
"errors"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"m7s.live/m7s/v5/plugin/monitor/pb" "m7s.live/m7s/v5/plugin/monitor/pb"
monitor "m7s.live/m7s/v5/plugin/monitor/pkg" monitor "m7s.live/m7s/v5/plugin/monitor/pkg"
@@ -9,12 +11,16 @@ import (
) )
func (cfg *MonitorPlugin) SearchTask(ctx context.Context, req *pb.SearchTaskRequest) (res *pb.SearchTaskResponse, err error) { func (cfg *MonitorPlugin) SearchTask(ctx context.Context, req *pb.SearchTaskRequest) (res *pb.SearchTaskResponse, err error) {
if cfg.DB == nil {
return nil, errors.New("database is not initialized")
}
res = &pb.SearchTaskResponse{} res = &pb.SearchTaskResponse{}
var tasks []*monitor.Task var tasks []*monitor.Task
tx := cfg.DB.Find(&tasks) tx := cfg.DB.Find(&tasks)
if err = tx.Error; err == nil { if err = tx.Error; err == nil {
res.Data = slices.Collect(func(yield func(*pb.Task) bool) { res.Data = slices.Collect(func(yield func(*pb.Task) bool) {
for _, t := range tasks { for _, t := range tasks {
if t.SessionID == req.SessionId {
yield(&pb.Task{ yield(&pb.Task{
Id: t.TaskID, Id: t.TaskID,
StartTime: timestamppb.New(t.StartTime), StartTime: timestamppb.New(t.StartTime),
@@ -27,6 +33,31 @@ func (cfg *MonitorPlugin) SearchTask(ctx context.Context, req *pb.SearchTaskRequ
ParentId: t.ParentID, ParentId: t.ParentID,
}) })
} }
}
})
}
return
}
func (cfg *MonitorPlugin) SessionList(context.Context, *emptypb.Empty) (res *pb.SessionListResponse, err error) {
if cfg.DB == nil {
return nil, errors.New("database is not initialized")
}
res = &pb.SessionListResponse{}
var sessions []*monitor.Session
tx := cfg.DB.Find(&sessions)
err = tx.Error
if err == nil {
res.Data = slices.Collect(func(yield func(*pb.Session) bool) {
for _, s := range sessions {
yield(&pb.Session{
Id: s.ID,
Pid: uint32(s.PID),
Args: s.Args,
StartTime: timestamppb.New(s.StartTime),
EndTime: timestamppb.New(s.EndTime),
})
}
}) })
} }
return return

View File

@@ -6,6 +6,8 @@ import (
"m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/monitor/pb" "m7s.live/m7s/v5/plugin/monitor/pb"
monitor "m7s.live/m7s/v5/plugin/monitor/pkg" monitor "m7s.live/m7s/v5/plugin/monitor/pkg"
"os"
"strings"
"time" "time"
) )
@@ -25,49 +27,27 @@ func (cfg *MonitorPlugin) OnStop() {
} }
} }
func (cfg *MonitorPlugin) taskDisposeListener(task util.ITask, mt util.IMarcoTask) func() { func (cfg *MonitorPlugin) saveTask(task util.ITask) {
return func() {
var th monitor.Task var th monitor.Task
th.SessionID = cfg.session.ID th.SessionID = cfg.session.ID
th.TaskID = task.GetTask().ID th.TaskID = task.GetTaskID()
th.ParentID = mt.GetTask().ID th.ParentID = task.GetParent().GetTaskID()
th.StartTime = task.GetTask().StartTime th.StartTime = task.GetTask().StartTime
th.EndTime = time.Now() th.EndTime = time.Now()
th.OwnerType = task.GetOwnerType() th.OwnerType = task.GetOwnerType()
th.TaskType = task.GetTaskTypeID() th.TaskType = byte(task.GetTaskType())
th.Reason = task.StopReason().Error() th.Reason = task.StopReason().Error()
th.Level = task.GetLevel()
b, _ := json.Marshal(task.GetTask().Description) b, _ := json.Marshal(task.GetTask().Description)
th.Description = string(b) th.Description = string(b)
cfg.DB.Create(&th) cfg.DB.Create(&th)
} }
}
func (cfg *MonitorPlugin) monitorTask(mt util.IMarcoTask) {
mt.OnTaskAdded(func(task util.ITask) {
task.GetTask().OnDispose(cfg.taskDisposeListener(task, mt))
})
for t := range mt.RangeSubTask {
t.OnDispose(cfg.taskDisposeListener(t, mt))
if mt, ok := t.(util.IMarcoTask); ok {
cfg.monitorTask(mt)
}
}
}
//func (cfg *MonitorPlugin) saveUnDisposeTask(mt util.IMarcoTask) {
// for t := range mt.RangeSubTask {
// cfg.taskDisposeListener(t, mt)()
// if mt, ok := t.(util.IMarcoTask); ok {
// cfg.saveUnDisposeTask(mt)
// }
// }
//}
func (cfg *MonitorPlugin) OnInit() (err error) { func (cfg *MonitorPlugin) OnInit() (err error) {
//cfg.columnstore, err = frostdb.New() //cfg.columnstore, err = frostdb.New()
//database, _ := cfg.columnstore.DB(cfg, "monitor") //database, _ := cfg.columnstore.DB(cfg, "monitor")
if cfg.DB != nil { if cfg.DB != nil {
session := &monitor.Session{StartTime: time.Now()} session := &monitor.Session{StartTime: time.Now(), PID: os.Getpid(), Args: strings.Join(os.Args, " ")}
err = cfg.DB.AutoMigrate(session) err = cfg.DB.AutoMigrate(session)
if err != nil { if err != nil {
return err return err
@@ -82,7 +62,10 @@ func (cfg *MonitorPlugin) OnInit() (err error) {
if err != nil { if err != nil {
return err return err
} }
cfg.monitorTask(cfg.Plugin.Server) cfg.Plugin.Server.OnBeforeDispose(func() {
cfg.saveTask(cfg.Plugin.Server)
})
cfg.Plugin.Server.OnChildDispose(cfg.saveTask)
} }
return return
} }

View File

@@ -10,6 +10,7 @@ import (
_ "google.golang.org/genproto/googleapis/api/annotations" _ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb" timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect" reflect "reflect"
sync "sync" sync "sync"
@@ -26,6 +27,8 @@ type SearchTaskRequest struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
SessionId uint32 `protobuf:"varint,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"`
} }
func (x *SearchTaskRequest) Reset() { func (x *SearchTaskRequest) Reset() {
@@ -60,6 +63,13 @@ func (*SearchTaskRequest) Descriptor() ([]byte, []int) {
return file_monitor_proto_rawDescGZIP(), []int{0} return file_monitor_proto_rawDescGZIP(), []int{0}
} }
func (x *SearchTaskRequest) GetSessionId() uint32 {
if x != nil {
return x.SessionId
}
return 0
}
type Task struct { type Task struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@@ -234,51 +244,221 @@ func (x *SearchTaskResponse) GetData() []*Task {
return nil return nil
} }
type Session struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Pid uint32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"`
Args string `protobuf:"bytes,3,opt,name=args,proto3" json:"args,omitempty"`
StartTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=startTime,proto3" json:"startTime,omitempty"`
EndTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=endTime,proto3" json:"endTime,omitempty"`
}
func (x *Session) Reset() {
*x = Session{}
if protoimpl.UnsafeEnabled {
mi := &file_monitor_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Session) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Session) ProtoMessage() {}
func (x *Session) ProtoReflect() protoreflect.Message {
mi := &file_monitor_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Session.ProtoReflect.Descriptor instead.
func (*Session) Descriptor() ([]byte, []int) {
return file_monitor_proto_rawDescGZIP(), []int{3}
}
func (x *Session) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
func (x *Session) GetPid() uint32 {
if x != nil {
return x.Pid
}
return 0
}
func (x *Session) GetArgs() string {
if x != nil {
return x.Args
}
return ""
}
func (x *Session) GetStartTime() *timestamppb.Timestamp {
if x != nil {
return x.StartTime
}
return nil
}
func (x *Session) GetEndTime() *timestamppb.Timestamp {
if x != nil {
return x.EndTime
}
return nil
}
type SessionListResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Data []*Session `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty"`
}
func (x *SessionListResponse) Reset() {
*x = SessionListResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_monitor_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SessionListResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SessionListResponse) ProtoMessage() {}
func (x *SessionListResponse) ProtoReflect() protoreflect.Message {
mi := &file_monitor_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SessionListResponse.ProtoReflect.Descriptor instead.
func (*SessionListResponse) Descriptor() ([]byte, []int) {
return file_monitor_proto_rawDescGZIP(), []int{4}
}
func (x *SessionListResponse) GetCode() uint32 {
if x != nil {
return x.Code
}
return 0
}
func (x *SessionListResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *SessionListResponse) GetData() []*Session {
if x != nil {
return x.Data
}
return nil
}
var File_monitor_proto protoreflect.FileDescriptor var File_monitor_proto protoreflect.FileDescriptor
var file_monitor_proto_rawDesc = []byte{ var file_monitor_proto_rawDesc = []byte{
0x0a, 0x0d, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x0d, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x07, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x07, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72,
0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x13, 0x0a, 0x11, 0x53, 0x65, 0x61, 0x72, 0x63, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xa4, 0x02, 0x0a, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70,
0x04, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x31, 0x0a, 0x11, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61,
0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x02, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x73, 0x65,
0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xa4, 0x02, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b,
0x38, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64,
0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03,
0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x65, 0x6e, 0x64, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x73, 0x74,
0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74,
0x54, 0x69, 0x6d, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18,
0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d,
0x70, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65,
0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52,
0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06,
0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65,
0x61, 0x73, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49,
0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e,
0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x09,
0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x65,
0x0a, 0x12, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0d, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x12, 0x21, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x0d, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52,
0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0xaf, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69,
0x64, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03,
0x70, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, 0x38, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74,
0x54, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d,
0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6e, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x07,
0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x69, 0x0a, 0x13, 0x53, 0x65, 0x73, 0x73, 0x69,
0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x73, 0x65, 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12,
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x63, 0x6f,
0x74, 0x49, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20,
0x74, 0x49, 0x64, 0x22, 0x65, 0x0a, 0x12, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x24, 0x0a, 0x04,
0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6d, 0x6f, 0x6e,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x04, 0x64, 0x61,
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x61, 0x32, 0xd9, 0x01, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x6a, 0x0a, 0x0a, 0x53, 0x65,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1a, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74,
0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71,
0x54, 0x61, 0x73, 0x6b, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x71, 0x0a, 0x03, 0x61, 0x70, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53,
0x69, 0x12, 0x6a, 0x0a, 0x0a, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x1a, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x22, 0x18, 0x2f, 0x6d, 0x6f, 0x6e, 0x69,
0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x6d, 0x6f, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x74,
0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x61, 0x73, 0x6b, 0x3a, 0x01, 0x2a, 0x12, 0x66, 0x0a, 0x0b, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x22, 0x18, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e,
0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x3a, 0x01, 0x2a, 0x42, 0x23, 0x5a, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4c,
0x21, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x21, 0x82, 0xd3, 0xe4,
0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x93, 0x02, 0x1b, 0x12, 0x19, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70,
0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x69, 0x2f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x2f, 0x6c, 0x69, 0x73, 0x74, 0x42, 0x23,
0x5a, 0x21, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76,
0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72,
0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@@ -293,24 +473,32 @@ func file_monitor_proto_rawDescGZIP() []byte {
return file_monitor_proto_rawDescData return file_monitor_proto_rawDescData
} }
var file_monitor_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_monitor_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_monitor_proto_goTypes = []interface{}{ var file_monitor_proto_goTypes = []interface{}{
(*SearchTaskRequest)(nil), // 0: monitor.SearchTaskRequest (*SearchTaskRequest)(nil), // 0: monitor.SearchTaskRequest
(*Task)(nil), // 1: monitor.Task (*Task)(nil), // 1: monitor.Task
(*SearchTaskResponse)(nil), // 2: monitor.SearchTaskResponse (*SearchTaskResponse)(nil), // 2: monitor.SearchTaskResponse
(*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp (*Session)(nil), // 3: monitor.Session
(*SessionListResponse)(nil), // 4: monitor.SessionListResponse
(*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp
(*emptypb.Empty)(nil), // 6: google.protobuf.Empty
} }
var file_monitor_proto_depIdxs = []int32{ var file_monitor_proto_depIdxs = []int32{
3, // 0: monitor.Task.startTime:type_name -> google.protobuf.Timestamp 5, // 0: monitor.Task.startTime:type_name -> google.protobuf.Timestamp
3, // 1: monitor.Task.endTime:type_name -> google.protobuf.Timestamp 5, // 1: monitor.Task.endTime:type_name -> google.protobuf.Timestamp
1, // 2: monitor.SearchTaskResponse.data:type_name -> monitor.Task 1, // 2: monitor.SearchTaskResponse.data:type_name -> monitor.Task
0, // 3: monitor.api.SearchTask:input_type -> monitor.SearchTaskRequest 5, // 3: monitor.Session.startTime:type_name -> google.protobuf.Timestamp
2, // 4: monitor.api.SearchTask:output_type -> monitor.SearchTaskResponse 5, // 4: monitor.Session.endTime:type_name -> google.protobuf.Timestamp
4, // [4:5] is the sub-list for method output_type 3, // 5: monitor.SessionListResponse.data:type_name -> monitor.Session
3, // [3:4] is the sub-list for method input_type 0, // 6: monitor.api.SearchTask:input_type -> monitor.SearchTaskRequest
3, // [3:3] is the sub-list for extension type_name 6, // 7: monitor.api.SessionList:input_type -> google.protobuf.Empty
3, // [3:3] is the sub-list for extension extendee 2, // 8: monitor.api.SearchTask:output_type -> monitor.SearchTaskResponse
0, // [0:3] is the sub-list for field type_name 4, // 9: monitor.api.SessionList:output_type -> monitor.SessionListResponse
8, // [8:10] is the sub-list for method output_type
6, // [6:8] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension type_name
6, // [6:6] is the sub-list for extension extendee
0, // [0:6] is the sub-list for field type_name
} }
func init() { file_monitor_proto_init() } func init() { file_monitor_proto_init() }
@@ -355,6 +543,30 @@ func file_monitor_proto_init() {
return nil return nil
} }
} }
file_monitor_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Session); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_monitor_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SessionListResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
} }
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
@@ -362,7 +574,7 @@ func file_monitor_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_monitor_proto_rawDesc, RawDescriptor: file_monitor_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 3, NumMessages: 5,
NumExtensions: 0, NumExtensions: 0,
NumServices: 1, NumServices: 1,
}, },

View File

@@ -21,6 +21,7 @@ import (
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
) )
// Suppress "imported and not used" errors // Suppress "imported and not used" errors
@@ -57,6 +58,24 @@ func local_request_Api_SearchTask_0(ctx context.Context, marshaler runtime.Marsh
} }
func request_Api_SessionList_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.SessionList(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_SessionList_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.SessionList(ctx, &protoReq)
return msg, metadata, err
}
// RegisterApiHandlerServer registers the http handlers for service Api to "mux". // RegisterApiHandlerServer registers the http handlers for service Api to "mux".
// UnaryRPC :call ApiServer directly. // UnaryRPC :call ApiServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
@@ -88,6 +107,31 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
}) })
mux.Handle("GET", pattern_Api_SessionList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/monitor.Api/SessionList", runtime.WithHTTPPathPattern("/monitor/api/session/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_SessionList_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_SessionList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil return nil
} }
@@ -151,13 +195,39 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
}) })
mux.Handle("GET", pattern_Api_SessionList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/monitor.Api/SessionList", runtime.WithHTTPPathPattern("/monitor/api/session/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_SessionList_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_SessionList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil return nil
} }
var ( var (
pattern_Api_SearchTask_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"monitor", "api", "search", "task"}, "")) pattern_Api_SearchTask_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"monitor", "api", "search", "task"}, ""))
pattern_Api_SessionList_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"monitor", "api", "session", "list"}, ""))
) )
var ( var (
forward_Api_SearchTask_0 = runtime.ForwardResponseMessage forward_Api_SearchTask_0 = runtime.ForwardResponseMessage
forward_Api_SessionList_0 = runtime.ForwardResponseMessage
) )

View File

@@ -1,6 +1,6 @@
syntax = "proto3"; syntax = "proto3";
import "google/api/annotations.proto"; import "google/api/annotations.proto";
//import "google/protobuf/empty.proto"; import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
package monitor; package monitor;
option go_package="m7s.live/m7s/v5/plugin/monitor/pb"; option go_package="m7s.live/m7s/v5/plugin/monitor/pb";
@@ -12,9 +12,15 @@ service api {
body: "*" body: "*"
}; };
} }
rpc SessionList (google.protobuf.Empty) returns (SessionListResponse) {
option (google.api.http) = {
get: "/monitor/api/session/list"
};
}
} }
message SearchTaskRequest { message SearchTaskRequest {
uint32 sessionId = 1;
} }
message Task { message Task {
@@ -34,3 +40,18 @@ message SearchTaskResponse {
string message = 2; string message = 2;
repeated Task data = 3; repeated Task data = 3;
} }
message Session {
uint32 id = 1;
uint32 pid = 2;
string args = 3;
google.protobuf.Timestamp startTime = 4;
google.protobuf.Timestamp endTime = 5;
}
message SessionListResponse {
uint32 code = 1;
string message = 2;
repeated Session data = 3;
}

View File

@@ -11,6 +11,7 @@ import (
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
) )
// This is a compile-time assertion to ensure that this generated file // This is a compile-time assertion to ensure that this generated file
@@ -23,6 +24,7 @@ const _ = grpc.SupportPackageIsVersion7
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ApiClient interface { type ApiClient interface {
SearchTask(ctx context.Context, in *SearchTaskRequest, opts ...grpc.CallOption) (*SearchTaskResponse, error) SearchTask(ctx context.Context, in *SearchTaskRequest, opts ...grpc.CallOption) (*SearchTaskResponse, error)
SessionList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SessionListResponse, error)
} }
type apiClient struct { type apiClient struct {
@@ -42,11 +44,21 @@ func (c *apiClient) SearchTask(ctx context.Context, in *SearchTaskRequest, opts
return out, nil return out, nil
} }
func (c *apiClient) SessionList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SessionListResponse, error) {
out := new(SessionListResponse)
err := c.cc.Invoke(ctx, "/monitor.api/SessionList", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ApiServer is the server API for Api service. // ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer // All implementations must embed UnimplementedApiServer
// for forward compatibility // for forward compatibility
type ApiServer interface { type ApiServer interface {
SearchTask(context.Context, *SearchTaskRequest) (*SearchTaskResponse, error) SearchTask(context.Context, *SearchTaskRequest) (*SearchTaskResponse, error)
SessionList(context.Context, *emptypb.Empty) (*SessionListResponse, error)
mustEmbedUnimplementedApiServer() mustEmbedUnimplementedApiServer()
} }
@@ -57,6 +69,9 @@ type UnimplementedApiServer struct {
func (UnimplementedApiServer) SearchTask(context.Context, *SearchTaskRequest) (*SearchTaskResponse, error) { func (UnimplementedApiServer) SearchTask(context.Context, *SearchTaskRequest) (*SearchTaskResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SearchTask not implemented") return nil, status.Errorf(codes.Unimplemented, "method SearchTask not implemented")
} }
func (UnimplementedApiServer) SessionList(context.Context, *emptypb.Empty) (*SessionListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SessionList not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {} func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
// UnsafeApiServer may be embedded to opt out of forward compatibility for this service. // UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
@@ -88,6 +103,24 @@ func _Api_SearchTask_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Api_SessionList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).SessionList(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/monitor.api/SessionList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).SessionList(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
// Api_ServiceDesc is the grpc.ServiceDesc for Api service. // Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@@ -99,6 +132,10 @@ var Api_ServiceDesc = grpc.ServiceDesc{
MethodName: "SearchTask", MethodName: "SearchTask",
Handler: _Api_SearchTask_Handler, Handler: _Api_SearchTask_Handler,
}, },
{
MethodName: "SessionList",
Handler: _Api_SessionList_Handler,
},
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{},
Metadata: "monitor.proto", Metadata: "monitor.proto",

View File

@@ -4,5 +4,7 @@ import "time"
type Session struct { type Session struct {
ID uint32 `gorm:"primarykey"` ID uint32 `gorm:"primarykey"`
PID int
Args string
StartTime, EndTime time.Time StartTime, EndTime time.Time
} }

View File

@@ -12,4 +12,5 @@ type Task struct {
TaskType byte TaskType byte
Description string Description string
Reason string Reason string
Level byte
} }

View File

@@ -130,7 +130,10 @@ func (o *OSSignal) Start() error {
} }
func (o *OSSignal) Tick(any) { func (o *OSSignal) Tick(any) {
util.ShutdownRootTask() go util.ShutdownRootTask()
}
func exit() {
for _, meta := range plugins { for _, meta := range plugins {
if meta.OnExit != nil { if meta.OnExit != nil {
meta.OnExit() meta.OnExit()
@@ -144,6 +147,7 @@ func (o *OSSignal) Tick(any) {
func init() { func init() {
util.RootTask.AddTask(&OSSignal{}) util.RootTask.AddTask(&OSSignal{})
util.RootTask.OnDispose(exit)
for k, v := range myip.LocalAndInternalIPs() { for k, v := range myip.LocalAndInternalIPs() {
Routes[k] = v Routes[k] = v
fmt.Println(k, v) fmt.Println(k, v)
@@ -243,16 +247,15 @@ func (s *Server) Start() (err error) {
return return
} }
} }
s.AddTask(&s.streamTask) s.AddTask(&s.streamTask).Description = map[string]any{"ownerType": "Stream"}
s.AddTask(&s.pullTask) s.AddTask(&s.pullTask).Description = map[string]any{"ownerType": "Pull"}
s.AddTask(&s.pushTask) s.AddTask(&s.pushTask).Description = map[string]any{"ownerType": "Push"}
s.AddTask(&s.recordTask) s.AddTask(&s.recordTask).Description = map[string]any{"ownerType": "Record"}
for _, plugin := range plugins { for _, plugin := range plugins {
if p := plugin.Init(s, cg[strings.ToLower(plugin.Name)]); !p.Disabled { if p := plugin.Init(s, cg[strings.ToLower(plugin.Name)]); !p.Disabled {
s.AddTask(p.handler) s.AddTask(p.handler)
} }
} }
if tcpTask != nil { if tcpTask != nil {
s.AddTask(&GRPCServer{Task: util.Task{Logger: s.Logger}, s: s, tcpTask: tcpTask}) s.AddTask(&GRPCServer{Task: util.Task{Logger: s.Logger}, s: s, tcpTask: tcpTask})
} }
@@ -328,7 +331,6 @@ func (s *Server) Dispose() {
_ = s.grpcClientConn.Close() _ = s.grpcClientConn.Close()
if s.DB != nil { if s.DB != nil {
db, err := s.DB.DB() db, err := s.DB.DB()
s.DB.Commit()
if err == nil { if err == nil {
err = db.Close() err = db.Close()
} }
@@ -350,11 +352,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "favicon.ico") http.ServeFile(w, r, "favicon.ico")
return return
} }
fmt.Fprintf(w, "visit:%s\nMonibuca Engine %s StartTime:%s\n", r.URL.Path, Version, s.StartTime) _, _ = fmt.Fprintf(w, "visit:%s\nMonibuca Engine %s StartTime:%s\n", r.URL.Path, Version, s.StartTime)
for plugin := range s.Plugins.Range { for plugin := range s.Plugins.Range {
fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version) _, _ = fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version)
} }
for _, api := range s.apiList { for _, api := range s.apiList {
fmt.Fprintf(w, "%s\n", api) _, _ = fmt.Fprintf(w, "%s\n", api)
} }
} }