初次提交

This commit is contained in:
liuzhihang1
2024-06-26 20:45:23 +08:00
parent 4b388a5be1
commit 831ea9889f
57 changed files with 3945 additions and 0 deletions

155
service/es/es.go Normal file
View File

@@ -0,0 +1,155 @@
package es
import (
"bytes"
"context"
"encoding/json"
"msm/config"
"msm/log"
"msm/model"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
var esClient *elasticsearch.Client
type esService struct{}
var EsService = new(esService)
func InitEs() bool {
if config.CF.EsEnable {
cfg := elasticsearch.Config{
Addresses: []string{
config.CF.EsUrl,
},
Username: config.CF.EsUsername,
Password: config.CF.EsPassword,
}
var err error
esClient, err = elasticsearch.NewClient(cfg)
if err != nil {
log.Logger.Fatalln("Failed to connect to es")
}
_, err = esClient.Info()
if err != nil {
log.Logger.Error("es启动失败", err)
config.CF.EsEnable = false
} else {
return true
}
} else {
log.Logger.Debug("不使用es")
}
return false
}
// idx 为空,默认随机唯一字符串
func (e *esService) Index(index, idx string, doc map[string]interface{}) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(doc); err != nil {
log.Logger.Error(err, "Error encoding doc")
return
}
res, err := esClient.Index(
index,
&buf,
esClient.Index.WithDocumentID(idx),
esClient.Index.WithRefresh("true"),
)
if err != nil {
log.Logger.Error(err, "Error create response")
}
defer res.Body.Close()
}
func (e *esService) Insert(log string, processName string, using string, ts int64) {
doc := map[string]interface{}{
"log": log,
"name": processName,
"using": using,
"time": ts,
}
e.Index(config.CF.EsIndex, "", doc)
}
func (e *esService) Search(req model.GetLogReq) model.LogResp {
query := []func(*esapi.SearchRequest){
esClient.Search.WithIndex(config.CF.EsIndex),
esClient.Search.WithContext(context.Background()),
esClient.Search.WithPretty(),
esClient.Search.WithTrackTotalHits(true),
esClient.Search.WithFrom(req.Page.From),
esClient.Search.WithSize(req.Page.Size),
}
if req.Sort == "asc" {
query = append(query, esClient.Search.WithSort("time:asc"))
}
if req.Sort == "desc" {
query = append(query, esClient.Search.WithSort("time:desc"))
}
body := e.buildQueryBody(req)
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(body); err != nil {
log.Logger.Error(err)
return model.LogResp{}
}
query = append(query, esClient.Search.WithBody(&buf))
res, err := esClient.Search(query...)
if err != nil {
log.Logger.Error(err)
return model.LogResp{}
}
resp := model.EsResp{}
json.NewDecoder(res.Body).Decode(&resp)
res.Body.Close()
result := model.LogResp{}
for _, v := range resp.Hits.Hits {
result.Data = append(result.Data, model.Eslog{
Log: v.Source.Log,
Name: v.Source.Name,
Using: v.Source.Using,
Time: v.Source.Time,
Id: v.ID,
})
}
result.Total = resp.Hits.Total.Value
return result
}
func (e *esService) buildQueryBody(req model.GetLogReq) model.QueryBody {
result := model.QueryBody{}
if req.TimeRange.EndTime != 0 || req.TimeRange.StartTime != 0 {
result.Query.Bool.Must = append(result.Query.Bool.Must, map[string]any{
"range": map[string]any{
"time": map[string]any{
"gte": req.TimeRange.StartTime,
"lte": req.TimeRange.EndTime,
},
},
})
}
if req.Match.Log != "" {
result.Query.Bool.Must = append(result.Query.Bool.Must, map[string]any{
"match": map[string]any{
"log": req.Match.Log,
},
})
}
if req.Match.Name != "" {
result.Query.Bool.Must = append(result.Query.Bool.Must, map[string]any{
"match": map[string]any{
"name": req.Match.Name,
},
})
}
if req.Match.Using != "" {
result.Query.Bool.Must = append(result.Query.Bool.Must, map[string]any{
"match": map[string]any{
"using": req.Match.Using,
},
})
}
return result
}

76
service/file/file.go Normal file
View File

@@ -0,0 +1,76 @@
package file
import (
"fmt"
"io"
"msm/config"
"msm/log"
"msm/model"
"os"
)
type fileService struct{}
var FileService = new(fileService)
func (f *fileService) ReadFileFromPath(path string) (result []byte, err error) {
fi, err := os.Open(path)
if err != nil {
return
}
defer fi.Close()
fileInfo, err := fi.Stat()
if err != nil {
return
}
if size := float64(fileInfo.Size()) / 1e6; size > config.CF.FileSizeLimit {
err = fmt.Errorf("写入数据大小%vMB,超过%vMB限制", size, config.CF.FileSizeLimit)
return
}
result, err = io.ReadAll(fi)
if err != nil {
return
}
log.Logger.Debugw("文件写入成功", "path", path)
return
}
func (f *fileService) UpdateFileData(filePath string, file io.Reader, size int64) error {
if size := float64(size) / 1e6; size > config.CF.FileSizeLimit {
return fmt.Errorf("写入数据大小%vMB,超过%vMB限制", size, config.CF.FileSizeLimit)
}
fi, err := os.OpenFile(filePath, os.O_RDWR|os.O_TRUNC, 0777)
if err != nil {
return err
}
defer fi.Close()
if _, err = io.Copy(fi, file); err != nil {
return err
}
log.Logger.Debugw("文件写入成功", "path", filePath)
return nil
}
func (f *fileService) GetFileAndDirByPath(srcPath string) ([]model.FileStruct, error) {
result := []model.FileStruct{}
files, err := os.ReadDir(srcPath)
if err != nil {
return result, err
}
for _, file := range files {
result = append(result, model.FileStruct{
Name: file.Name(),
IsDir: file.IsDir(),
})
}
return result, nil
}
func (f *fileService) CreateNewDir(path string, name string) error {
_, err := os.Create(path + name)
return err
}
func (f *fileService) CreateNewFile(path string, name string) error {
return os.MkdirAll(path+name, os.ModeDir)
}

40
service/log/loghandler.go Normal file
View File

@@ -0,0 +1,40 @@
package loghandler
import (
"msm/log"
"msm/model"
"msm/service/es"
"time"
"github.com/panjf2000/ants"
)
type loghandler struct{}
var (
antsPool *ants.PoolWithFunc
Loghandler = new(loghandler)
logHanleFunc = func(i interface{}) {
esLog, ok := i.(model.Eslog)
if !ok {
log.Logger.Panicw("传入错误参数", "data", esLog)
return
}
es.EsService.Insert(esLog.Log, esLog.Name, esLog.Using, esLog.Time)
}
panicHanlderFunc = func(i interface{}) {
log.Logger.Error("es消息储存失败")
}
)
func init() {
antsPool, _ = ants.NewPoolWithFunc(1000, logHanleFunc, ants.WithPanicHandler(panicHanlderFunc), ants.WithExpiryDuration(time.Second*10))
}
func (l *loghandler) AddLog(data model.Eslog) {
if err := antsPool.Invoke(data); err != nil {
log.Logger.Errorw("协程池添加任务失败", "err", err, "当前运行数量", antsPool.Running())
}
}

324
service/process/proccess.go Normal file
View File

@@ -0,0 +1,324 @@
package process
import (
"errors"
"msm/config"
"msm/log"
"msm/model"
loghandler "msm/service/log"
"msm/service/push"
"os/exec"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
pu "github.com/shirou/gopsutil/process"
)
type Process interface {
ReadCache(*websocket.Conn)
GetName() string
SetName(string)
GetTermType() string
SetTermType(string)
SetIsUsing(bool)
GetWhoUsing() string
SetWhoUsing(string)
SetStartCommand([]string)
GetControlController() string
SetControlController(string)
ChangControlChan() chan int
StopChan() chan struct{}
SetConfigLogReport(bool)
SetConfigStatuPush(bool)
SetConfigAutoRestart(bool)
GetStateInfo() string
GetStateState() uint8
Kill() error
SetWsConn(*websocket.Conn)
Write(string) error
WriteBytes([]byte) error
GetStartTimeFormat() string
VerifyControl() bool
ResetRestartTimes()
InitPerformanceStatus()
ProcessControl(string)
AddCpuUsage(float64)
AddMemUsage(float64)
AddRecordTime()
GetTimeRecord() []string
GetMemUsage() []float64
GetCpuUsage() []float64
monitorHanler()
initPsutil()
SetAutoRestart(bool)
TryLock() bool
Unlock()
ReStart()
}
type ProcessBase struct {
Name string
termType string
Pid int
cmd *exec.Cmd
IsUsing atomic.Bool
StartCommand []string
Lock sync.Mutex
WhoUsing string
stopChan chan struct{}
Control struct {
Controller string
changControlChan chan int
changControlTime time.Time
}
ws struct {
wsConnect *websocket.Conn
wsMux sync.RWMutex
}
Config struct {
AutoRestart bool
statuPush bool
logReport bool
}
State struct {
startTime time.Time
Info string
State uint8 //0 为未运行1为运作中2为异常状态
restartTimes int
}
performanceStatus struct {
cpu []float64
mem []float64
time []string
}
monitor struct {
enable bool
ProcessBase *pu.Process
}
}
func (p *ProcessBase) GetTermType() string {
return p.termType
}
func (p *ProcessBase) SetTermType(s string) {
p.termType = s
}
func (p *ProcessBase) GetStateInfo() string {
return p.State.Info
}
func (p *ProcessBase) GetStateState() uint8 {
return p.State.State
}
func (p *ProcessBase) SetAutoRestart(data bool) {
p.Config.AutoRestart = data
}
func (p *ProcessBase) GetWhoUsing() string {
return p.WhoUsing
}
func (p *ProcessBase) GetControlController() string {
return p.Control.Controller
}
func (p *ProcessBase) SetControlController(c string) {
p.Control.Controller = c
}
func (p *ProcessBase) SetWsConn(ws *websocket.Conn) {
p.ws.wsConnect = ws
}
func (p *ProcessBase) logReportHandler(log string) {
if config.CF.EsEnable && p.Config.logReport && len([]rune(log)) > config.CF.LogMinLenth {
loghandler.Loghandler.AddLog(model.Eslog{
Log: log,
Using: p.WhoUsing,
Name: p.Name,
Time: time.Now().UnixMilli(),
})
}
}
func (p *ProcessBase) GetStartTimeFormat() string {
return p.State.startTime.Format(time.DateTime)
}
func (p *ProcessBase) ProcessControl(name string) {
p.Control.changControlTime = time.Now()
p.Control.Controller = name
if p.State.State == 1 && p.IsUsing.Load() {
p.Control.changControlChan <- 0
}
}
// 没人在使用或控制时间过期
func (p *ProcessBase) VerifyControl() bool {
return p.Control.Controller == "" || p.Control.changControlTime.Unix() < time.Now().Unix()-config.CF.ProcessExpireTime
}
func (p *ProcessBase) setProcessConfig(pconfig model.Process) {
p.Config.AutoRestart = pconfig.AutoRestart
p.Config.logReport = pconfig.LogReport
p.Config.statuPush = pconfig.Push
}
func (p *ProcessBase) ResetRestartTimes() {
p.State.restartTimes = 0
}
func (p *ProcessBase) push(message string) {
if p.Config.statuPush {
messagePlaceholders := map[string]string{
"{$name}": p.Name,
"{$user}": p.WhoUsing,
"{$message}": message,
"{$status}": strconv.Itoa(int(p.State.State)),
}
push.PushService.Push(messagePlaceholders)
}
}
func (p *ProcessBase) InitPerformanceStatus() {
p.performanceStatus.cpu = make([]float64, config.CF.PerformanceInfoListLength)
p.performanceStatus.mem = make([]float64, config.CF.PerformanceInfoListLength)
p.performanceStatus.time = make([]string, config.CF.PerformanceInfoListLength)
}
func (p *ProcessBase) AddCpuUsage(usage float64) {
p.performanceStatus.cpu = append(p.performanceStatus.cpu[1:], usage)
}
func (p *ProcessBase) AddMemUsage(usage float64) {
p.performanceStatus.mem = append(p.performanceStatus.mem[1:], usage)
}
func (p *ProcessBase) AddRecordTime() {
p.performanceStatus.time = append(p.performanceStatus.time[1:], time.Now().Format(time.DateTime))
}
func (p *ProcessBase) GetCpuUsage() []float64 {
return p.performanceStatus.cpu
}
func (p *ProcessBase) GetMemUsage() []float64 {
return p.performanceStatus.mem
}
func (p *ProcessBase) GetTimeRecord() []string {
return p.performanceStatus.time
}
func (p *ProcessBase) monitorHanler() {
defer log.Logger.Infow("性能监控结束", "name", p.Name, "pid", p.Pid)
for {
if !p.monitor.enable {
return
}
select {
case <-time.After(time.Minute * time.Duration(config.CF.PerformanceInfoInterval)):
if p.State.State != 1 {
log.Logger.Debugw("进程状态异常,跳过监控数据获取", "name", p.Name)
p.AddCpuUsage(0)
p.AddMemUsage(0)
p.AddRecordTime()
continue
}
ProcessBase := p.monitor.ProcessBase
cpuPercent, err := ProcessBase.CPUPercent()
if err != nil {
log.Logger.Errorw("CPU使用率获取失败", "err", err)
return
}
memInfo, err := ProcessBase.MemoryInfo()
if err != nil {
log.Logger.Errorw("内存使用率获取失败", "err", err)
return
}
p.AddRecordTime()
p.AddCpuUsage(cpuPercent)
p.AddMemUsage(float64(memInfo.RSS / 1000))
log.Logger.Debugw("进程资源使用率获取成功", "pid", p.Pid, "name", p.Name, "cpu", cpuPercent, "mem", memInfo.RSS)
case <-p.stopChan:
return
}
}
}
func (p *ProcessBase) initPsutil() {
pup, err := pu.NewProcess(int32(p.Pid))
if err != nil {
p.monitor.enable = false
log.Logger.Debug("pu进程获取失败")
} else {
p.monitor.enable = true
log.Logger.Debug("pu进程获取成功")
p.monitor.ProcessBase = pup
}
}
func (p *ProcessBase) SetConfigLogReport(b bool) {
p.Config.logReport = b
}
func (p *ProcessBase) SetConfigAutoRestart(b bool) {
p.Config.AutoRestart = b
}
func (p *ProcessBase) SetConfigStatuPush(b bool) {
p.Config.statuPush = b
}
func (p *ProcessBase) SetName(s string) {
p.Name = s
}
func (p *ProcessBase) SetStartCommand(cmd []string) {
p.StartCommand = cmd
}
func (p *ProcessBase) ChangControlChan() chan int {
return p.Control.changControlChan
}
func (p *ProcessBase) SetIsUsing(b bool) {
p.IsUsing.Store(b)
}
func (p *ProcessBase) GetName() string {
return p.Name
}
func (p *ProcessBase) SetWhoUsing(s string) {
p.WhoUsing = s
}
func (p *ProcessBase) StopChan() chan struct{} {
return p.stopChan
}
func (p *ProcessBase) TryLock() bool {
return p.Lock.TryLock()
}
func (p *ProcessBase) Unlock() {
p.Lock.Unlock()
}
func RunNewProcess(config model.Process) (proc Process, err error) {
switch config.TermType {
case "std":
proc, err = RunNewProcessStd(config)
case "pty":
proc, err = RunNewProcessPty(config)
default:
err = errors.New("终端类型错误")
}
return
}

View File

@@ -0,0 +1,169 @@
package process
import (
"bytes"
"fmt"
"msm/config"
"msm/log"
"msm/model"
"msm/utils"
"os"
"os/exec"
"strings"
"time"
"github.com/creack/pty"
"github.com/gorilla/websocket"
)
type ProcessPty struct {
ProcessBase
cacheBytesBuf *bytes.Buffer
pty *os.File
}
func (p *ProcessPty) Kill() error {
if err := p.cmd.Process.Kill(); err != nil {
log.Logger.Errorw("进程杀死失败", "err", err, "state", p.State.State)
return err
}
return p.pty.Close()
}
func (p *ProcessPty) watchDog() {
state, _ := p.cmd.Process.Wait()
close(p.stopChan)
p.State.State = 0
p.pty.Close()
if state.ExitCode() != 0 {
log.Logger.Infow("进程停止", "进程名称", p.Name, "exitCode", state.ExitCode(), "进程类型", "pty")
p.push(fmt.Sprintf("进程停止,退出码 %d", state.ExitCode()))
if p.Config.AutoRestart {
p.ReStart()
}
} else {
log.Logger.Infow("进程正常退出", "进程名称", p.Name)
p.push("进程正常退出")
}
}
func (p *ProcessPty) ReStart() {
if p.State.restartTimes > config.CF.ProcessRestartsLimit {
log.Logger.Warnw("重启次数达到上限", "name", p.Name, "limit", config.CF.ProcessRestartsLimit)
p.State.State = 2
p.State.Info = "重启次数异常"
p.push("进程重启次数达到上限")
return
}
cmd := exec.Command(p.StartCommand[0], p.StartCommand[1:]...)
cmd.Dir = p.cmd.Dir
pf, err := pty.Start(cmd)
if err != nil || p.cmd.Process == nil {
log.Logger.Error("进程启动出错:", err)
return
}
pty.Setsize(pf, &pty.Winsize{
Rows: 100,
Cols: 100,
})
p.pty = pf
p.State.restartTimes++
log.Logger.Infow("进程启动成功", "进程名称", p.Name, "重启次数", p.State.restartTimes)
p.cmd = cmd
p.pInit()
p.push("进程启动成功")
}
func (p *ProcessPty) WriteBytes(input []byte) (err error) {
p.logReportHandler(config.CF.ProcessInputPrefix + string(input))
_, err = p.pty.Write(input)
return
}
func (p *ProcessPty) Write(input string) (err error) {
p.logReportHandler(config.CF.ProcessInputPrefix + input)
_, err = p.pty.Write([]byte(input))
return
}
func (p *ProcessPty) readInit() {
log.Logger.Debugw("stdout读取线程已启动", "进程名", p.Name, "使用者", p.WhoUsing)
buf := make([]byte, 1024)
for {
select {
case <-p.stopChan:
{
p.IsUsing.Store(false)
p.WhoUsing = ""
log.Logger.Debugw("stdout读取线程已退出", "进程名", p.Name, "使用者", p.WhoUsing)
return
}
default:
{
n, _ := p.pty.Read(buf)
p.bufHanle(buf[:n])
if p.IsUsing.Load() {
p.ws.wsMux.Lock()
p.ws.wsConnect.WriteMessage(websocket.TextMessage, buf[:n])
p.ws.wsMux.Unlock()
}
}
}
}
}
func (p *ProcessPty) ReadCache(ws *websocket.Conn) {
ws.WriteMessage(websocket.TextMessage, p.cacheBytesBuf.Bytes())
}
func (p *ProcessPty) bufHanle(b []byte) {
log := strings.TrimSpace(string(b))
if utils.RemoveANSI(log) != "" {
p.logReportHandler(log)
}
p.cacheBytesBuf.Write(b)
p.cacheBytesBuf.Next(len(b))
}
func (p *ProcessPty) pInit() {
p.SetTermType("pty")
p.Control.changControlChan = make(chan int)
p.stopChan = make(chan struct{})
p.State.State = 1
p.Pid = p.cmd.Process.Pid
p.State.startTime = time.Now()
p.cacheBytesBuf = bytes.NewBuffer(make([]byte, config.CF.ProcessMsgCacheBufLimit))
p.InitPerformanceStatus()
p.initPsutil()
go p.readInit()
go p.monitorHanler()
go p.watchDog()
}
func RunNewProcessPty(pconfig model.Process) (*ProcessPty, error) {
args := strings.Split(pconfig.Cmd, " ")
cmd := exec.Command(args[0], args[1:]...) // 替换为你要执行的命令及参数
processPty := ProcessPty{
ProcessBase: ProcessBase{
Name: pconfig.Name,
StartCommand: args,
},
}
cmd.Dir = pconfig.Cwd
pf, err := pty.Start(cmd)
if err != nil || cmd.Process == nil {
log.Logger.Error("进程启动出错:", err)
return nil, err
}
pty.Setsize(pf, &pty.Winsize{
Rows: 100,
Cols: 100,
})
processPty.pty = pf
processPty.cmd = cmd
log.Logger.Infow("创建进程成功")
processPty.setProcessConfig(pconfig)
processPty.pInit()
return &processPty, nil
}

View File

@@ -0,0 +1,182 @@
package process
import (
"bufio"
"fmt"
"io"
"msm/config"
"msm/log"
"msm/model"
"os/exec"
"strings"
"time"
"github.com/gorilla/websocket"
)
type ProcessStd struct {
ProcessBase
cacheLine []string
stdin io.WriteCloser
stdout *bufio.Scanner
}
func (p *ProcessStd) Kill() error {
return p.cmd.Process.Kill()
}
func (p *ProcessStd) watchDog() {
state, _ := p.cmd.Process.Wait()
close(p.stopChan)
p.State.State = 0
if state.ExitCode() != 0 {
log.Logger.Infow("进程停止", "进程名称", p.Name, "exitCode", state.ExitCode(), "进程类型", "std")
p.push(fmt.Sprintf("进程停止,退出码 %d", state.ExitCode()))
if p.Config.AutoRestart {
p.ReStart()
}
} else {
log.Logger.Infow("进程正常退出", "进程名称", p.Name)
p.push("进程正常退出")
}
}
func (p *ProcessStd) WriteBytes(input []byte) (err error) {
p.logReportHandler(config.CF.ProcessInputPrefix + string(input))
_, err = p.stdin.Write(append(input, '\n'))
return
}
func (p *ProcessStd) Write(input string) (err error) {
p.logReportHandler(config.CF.ProcessInputPrefix + input)
_, err = p.stdin.Write([]byte(input + "\n"))
return
}
func (p *ProcessStd) ReStart() {
if p.State.restartTimes > config.CF.ProcessRestartsLimit {
log.Logger.Warnw("重启次数达到上限", "name", p.Name, "limit", config.CF.ProcessRestartsLimit)
p.State.State = 2
p.State.Info = "重启次数异常"
p.push("进程重启次数达到上限")
return
}
cmd := exec.Command(p.StartCommand[0], p.StartCommand[1:]...) // 替换为你要执行的命令及参数
cmd.Dir = p.cmd.Dir
out, err := cmd.StdoutPipe()
if err != nil {
log.Logger.Errorw("重启失败,输出管道获取失败", "err", err)
p.Config.AutoRestart = false
return
}
p.stdout = bufio.NewScanner(out)
p.stdin, err = cmd.StdinPipe()
if err != nil {
log.Logger.Errorw("重启失败,输入管道获取失败", "err", err)
p.Config.AutoRestart = false
return
}
err = cmd.Start()
if err != nil {
log.Logger.Errorw("重启失败,进程启动出错:", "err", err)
p.Config.AutoRestart = false
return
}
p.State.restartTimes++
log.Logger.Infow("进程启动成功", "进程名称", p.Name, "重启次数", p.State.restartTimes)
p.cmd = cmd
p.pInit()
p.push("进程启动成功")
}
func (p *ProcessStd) pInit() {
log.Logger.Infow("创建进程成功")
p.Control.changControlChan = make(chan int)
p.stopChan = make(chan struct{})
p.State.State = 1
p.Pid = p.cmd.Process.Pid
p.State.startTime = time.Now()
p.cacheLine = make([]string, config.CF.ProcessMsgCacheLinesLimit)
p.InitPerformanceStatus()
p.initPsutil()
go p.watchDog()
go p.readInit()
go p.monitorHanler()
}
func (p *ProcessStd) ReadCache(ws *websocket.Conn) {
for _, line := range p.cacheLine {
ws.WriteMessage(websocket.TextMessage, []byte(line))
}
}
func (p *ProcessStd) readInit() {
var output string
log.Logger.Debugw("stdout读取线程已启动", "进程名", p.Name, "使用者", p.WhoUsing)
for {
select {
case <-p.stopChan:
{
p.IsUsing.Store(false)
p.WhoUsing = ""
log.Logger.Debugw("stdout读取线程已退出", "进程名", p.Name, "使用者", p.WhoUsing)
return
}
default:
{
output = p.Read()
if p.IsUsing.Load() && output != "" {
p.ws.wsMux.Lock()
p.ws.wsConnect.WriteMessage(websocket.TextMessage, []byte(output))
p.ws.wsMux.Unlock()
}
}
}
}
}
func (p *ProcessStd) Read() string {
if p.stdout.Scan() {
output := p.stdout.Text()
p.logReportHandler(output)
p.cacheLine = p.cacheLine[1:]
p.cacheLine = append(p.cacheLine, output)
return output
}
return ""
}
func RunNewProcessStd(pconfig model.Process) (*ProcessStd, error) {
args := strings.Split(pconfig.Cmd, " ")
cmd := exec.Command(args[0], args[1:]...) // 替换为你要执行的命令及参数
processStd := ProcessStd{
ProcessBase: ProcessBase{
Name: pconfig.Name,
StartCommand: args,
},
}
cmd.Dir = pconfig.Cwd
out, err := cmd.StdoutPipe()
if err != nil {
log.Logger.Errorw("输出管道获取失败", "err", err)
return nil, err
}
processStd.stdout = bufio.NewScanner(out)
processStd.stdin, err = cmd.StdinPipe()
if err != nil {
log.Logger.Errorw("输入管道获取失败", "err", err)
return nil, err
}
err = cmd.Start()
if err != nil || cmd.Process == nil {
log.Logger.Error("进程启动出错:", err)
return nil, err
}
log.Logger.Infow("创建进程成功", "config", pconfig)
processStd.cmd = cmd
processStd.SetTermType("std")
processStd.pInit()
processStd.setProcessConfig(pconfig)
return &processStd, nil
}

124
service/process/service.go Normal file
View File

@@ -0,0 +1,124 @@
package process
import (
"errors"
"msm/dao"
"msm/log"
"msm/model"
"strings"
"sync"
)
type processCtlService struct{}
var processMap sync.Map = sync.Map{}
var ProcessCtlService = new(processCtlService)
func (p *processCtlService) AddProcess(uuid int, prcess Process) {
processMap.Store(uuid, prcess)
// processMap.Store("111", prcess)
// return "111"
}
func (p *processCtlService) KillProcess(uuid int) error {
value, ok := processMap.Load(uuid)
if !ok {
return errors.New("进程不存在")
}
result, ok := value.(Process)
if !ok {
return errors.New("进程类型错误")
}
result.SetAutoRestart(false)
return result.Kill()
}
func (p *processCtlService) GetProcess(uuid int) (Process, error) {
process, ok := processMap.Load(uuid)
if !ok {
return nil, errors.New("进程获取失败")
}
result, ok := process.(Process)
if !ok {
return nil, errors.New("进程类型错误")
}
return result, nil
}
func (p *processCtlService) KillAllProcess() {
processMap.Range(func(key, value any) bool {
value.(Process).Kill()
return true
})
}
func (p *processCtlService) DeleteProcess(uuid int) {
processMap.Delete(uuid)
}
func (p *processCtlService) GetProcessList() []model.ProcessInfo {
processConfiglist := dao.ProcessDao.GetAllProcessConfig()
return p.getProcessInfoList(processConfiglist)
}
func (p *processCtlService) GetProcessListByUser(username string) []model.ProcessInfo {
processConfiglist := dao.ProcessDao.GetProcessConfigByUser(username)
return p.getProcessInfoList(processConfiglist)
}
func (p *processCtlService) getProcessInfoList(processConfiglist []model.Process) []model.ProcessInfo {
processInfoList := []model.ProcessInfo{}
for _, v := range processConfiglist {
pi := model.ProcessInfo{
Name: v.Name,
Uuid: v.Uuid,
}
if value, ok := processMap.Load(v.Uuid); ok {
process := value.(Process)
pi.State.Info = process.GetStateInfo()
pi.State.State = process.GetStateState()
pi.StartTime = process.GetStartTimeFormat()
pi.User = process.GetWhoUsing()
pi.Usage.Cpu = process.GetCpuUsage()
pi.Usage.Mem = process.GetMemUsage()
pi.Usage.Time = process.GetTimeRecord()
pi.TermType = process.GetTermType()
}
processInfoList = append(processInfoList, pi)
}
return processInfoList
}
func (p *processCtlService) ProcessInit() {
config := dao.ProcessDao.GetAllProcessConfig()
for _, v := range config {
if !v.AutoRestart {
continue
}
proc, err := RunNewProcess(v)
if err != nil {
log.Logger.Warnw("初始化启动进程失败", v.Name, "name", "err", err)
continue
}
p.AddProcess(v.Uuid, proc)
}
}
func (p *processCtlService) UpdateProcessConfig(config model.Process) error {
process, ok := processMap.Load(config.Uuid)
if !ok {
return errors.New("进程获取失败")
}
result, ok := process.(Process)
if !ok {
return errors.New("进程类型错误")
}
result.SetConfigLogReport(config.LogReport)
result.SetConfigStatuPush(config.Push)
result.SetConfigAutoRestart(config.AutoRestart)
result.SetStartCommand(strings.Split(config.Cmd, " "))
result.SetName(config.Name)
return nil
}

35
service/push/push.go Normal file
View File

@@ -0,0 +1,35 @@
package push
import (
"msm/dao"
"strings"
"github.com/levigross/grequests"
)
type pushService struct{}
var PushService = new(pushService)
func (p *pushService) Push(placeholders map[string]string) {
pl := dao.PushDao.GetPushList()
for _, v := range pl {
if v.Enable {
if v.Method == "GET" {
grequests.Get(p.getReplaceMessage(placeholders, v.Url), nil)
}
if v.Method == "POST" {
grequests.Post(v.Url, &grequests.RequestOptions{
JSON: p.getReplaceMessage(placeholders, v.Body),
})
}
}
}
}
func (p *pushService) getReplaceMessage(placeholders map[string]string, message string) string {
for k, v := range placeholders {
message = strings.ReplaceAll(message, k, v)
}
return message
}