mirror of
https://github.com/lzh-1625/go_process_manager.git
synced 2025-09-27 04:16:04 +08:00
Compare commits
10 Commits
39b199eac6
...
dev
Author | SHA1 | Date | |
---|---|---|---|
![]() |
3fb8cd4dad | ||
![]() |
274a4d93df | ||
![]() |
c383e1737f | ||
![]() |
2a40d560d4 | ||
![]() |
d968ce2a3e | ||
![]() |
51ff529a88 | ||
![]() |
ed237e1ddb | ||
![]() |
f980e79e02 | ||
![]() |
cb15544ea3 | ||
![]() |
c69056d1b7 |
@@ -123,6 +123,7 @@ func initListenKillSignal() {
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigs
|
||||
logger.Logger.Info("进程正在退出,等待全部进程停止")
|
||||
logic.ProcessCtlLogic.KillAllProcess()
|
||||
log.Print("已停止所有进程")
|
||||
os.Exit(0)
|
||||
|
@@ -7,7 +7,34 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var startTitle = `
|
||||
----------------------------------------------------------------------------
|
||||
_____ _____ _____
|
||||
/\ \ /\ \ /\ \
|
||||
/::\ \ /::\ \ /::\____\
|
||||
/::::\ \ /::::\ \ /::::| |
|
||||
/::::::\ \ /::::::\ \ /:::::| |
|
||||
/:::/\:::\ \ /:::/\:::\ \ /::::::| |
|
||||
/:::/ \:::\ \ /:::/__\:::\ \ /:::/|::| |
|
||||
/:::/ \:::\ \ /::::\ \:::\ \ /:::/ |::| |
|
||||
/:::/ / \:::\ \ /::::::\ \:::\ \ /:::/ |::|___|______
|
||||
/:::/ / \:::\ ___\ /:::/\:::\ \:::\____\ /:::/ |::::::::\ \
|
||||
/:::/____/ ___\:::| |/:::/ \:::\ \:::| |/:::/ |:::::::::\____\
|
||||
\:::\ \ /\ /:::|____|\::/ \:::\ /:::|____|\::/ / ~~~~~/:::/ /
|
||||
\:::\ /::\ \::/ / \/_____/\:::\/:::/ / \/____/ /:::/ /
|
||||
\:::\ \:::\ \/____/ \::::::/ / /:::/ /
|
||||
\:::\ \:::\____\ \::::/ / /:::/ /
|
||||
\:::\ /:::/ / \::/____/ /:::/ /
|
||||
\:::\/:::/ / ~~ /:::/ /
|
||||
\::::::/ / /:::/ /
|
||||
\::::/ / /:::/ /
|
||||
\::/____/ \::/ /
|
||||
\/____/
|
||||
----------------------------------------------------------------------------
|
||||
`
|
||||
|
||||
func main() {
|
||||
print(startTitle)
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
route.Route()
|
||||
}
|
||||
|
@@ -25,7 +25,7 @@ func NewTaskJob(data model.Task) (*TaskJob, error) {
|
||||
TaskConfig: &data,
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
if data.CronExpression != "" {
|
||||
if data.Enable && data.CronExpression != "" {
|
||||
err := tj.InitCronHandle()
|
||||
if err != nil {
|
||||
log.Logger.Warnw("定时任务启动失败", "err", err, "task", data.Id)
|
||||
@@ -114,6 +114,7 @@ func (t *TaskJob) InitCronHandle() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Start()
|
||||
t.Cron = c
|
||||
return nil
|
||||
}
|
||||
|
@@ -89,10 +89,10 @@ type LogResp struct {
|
||||
|
||||
type ProcessLog struct {
|
||||
Id int `json:"id,omitempty" gorm:"primaryKey;autoIncrement;column:id" `
|
||||
Log string `json:"log" gorm:"column:log" type:"text"`
|
||||
Time int64 `json:"time" gorm:"column:time" type:"long"`
|
||||
Name string `json:"name" gorm:"column:name" type:"text"`
|
||||
Using string `json:"using" gorm:"column:using" type:"keyword"`
|
||||
Log string `json:"log" gorm:"column:log"`
|
||||
Time int64 `json:"time" gorm:"column:time"`
|
||||
Name string `json:"name" gorm:"column:name"`
|
||||
Using string `json:"using" gorm:"column:using"`
|
||||
}
|
||||
|
||||
func (n *ProcessLog) TableName() string {
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/lzh-1625/go_process_manager/internal/app/model"
|
||||
"github.com/lzh-1625/go_process_manager/internal/app/repository/query"
|
||||
"github.com/lzh-1625/go_process_manager/internal/app/search"
|
||||
"github.com/lzh-1625/go_process_manager/log"
|
||||
)
|
||||
|
||||
@@ -18,7 +19,7 @@ func (l *logRepository) InsertLog(data model.ProcessLog) {
|
||||
}
|
||||
}
|
||||
|
||||
func (l *logRepository) SearchLog(req model.GetLogReq) (result []*model.ProcessLog, total int64) {
|
||||
func (l *logRepository) SearchLog(req model.GetLogReq, logQuery []search.Query) (result []*model.ProcessLog, total int64) {
|
||||
q := query.ProcessLog.WithContext(context.TODO())
|
||||
if req.Match.Name != "" {
|
||||
q = q.Where(query.ProcessLog.Name.Eq(req.Match.Name))
|
||||
@@ -26,9 +27,16 @@ func (l *logRepository) SearchLog(req model.GetLogReq) (result []*model.ProcessL
|
||||
if req.Match.Using != "" {
|
||||
q = q.Where(query.ProcessLog.Using.Eq(req.Match.Using))
|
||||
}
|
||||
if req.Match.Log != "" {
|
||||
q = q.Where(query.ProcessLog.Log.Like("%" + req.Match.Log + "%"))
|
||||
|
||||
for _, v := range logQuery {
|
||||
switch v.Cond {
|
||||
case search.Match, search.WildCard:
|
||||
q = q.Where(query.ProcessLog.Log.Like("%" + v.Content + "%"))
|
||||
case search.NotMatch, search.NotWildCard:
|
||||
q = q.Where(query.ProcessLog.Log.NotLike("%" + v.Content + "%"))
|
||||
}
|
||||
}
|
||||
|
||||
if req.Sort == "desc" {
|
||||
q = q.Order(query.ProcessLog.Time.Desc())
|
||||
}
|
||||
|
@@ -184,7 +184,7 @@ func bind[T any, R any](fn func(*gin.Context, T) R, bindOption int) func(*gin.Co
|
||||
})
|
||||
return
|
||||
}
|
||||
case api.Response:
|
||||
case *api.Response:
|
||||
ctx.JSON(v.StatusCode, gin.H{
|
||||
"data": v.Data,
|
||||
"msg": v.Msg,
|
||||
|
@@ -4,6 +4,7 @@
|
||||
package bleve
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/blevesearch/bleve/v2"
|
||||
@@ -73,14 +74,30 @@ func (b *bleveSearch) Insert(logContent string, processName string, using string
|
||||
}); err != nil {
|
||||
logger.Logger.Warnw("bleve log insert failed", "err", err)
|
||||
}
|
||||
fmt.Printf("using: %v\n", using)
|
||||
}
|
||||
|
||||
func (b *bleveSearch) Search(req model.GetLogReq, filterProcessName ...string) (result model.LogResp) {
|
||||
buildQuery := bleve.NewBooleanQuery()
|
||||
if req.Match.Log != "" {
|
||||
logQuery := bleve.NewMatchQuery(req.Match.Log)
|
||||
logQuery.SetField("log")
|
||||
buildQuery.AddMust(logQuery)
|
||||
for _, v := range sr.QueryStringAnalysis(req.Match.Log) {
|
||||
switch v.Cond {
|
||||
case sr.Match, sr.WildCard:
|
||||
logQuery := bleve.NewMatchQuery(v.Content)
|
||||
logQuery.SetField("log")
|
||||
buildQuery.AddMust(logQuery)
|
||||
case sr.NotMatch, sr.NotWildCard:
|
||||
logQuery := bleve.NewMatchQuery(v.Content)
|
||||
logQuery.SetField("log")
|
||||
buildQuery.AddMustNot(logQuery)
|
||||
// case sr.WildCard:
|
||||
// logQuery := bleve.NewWildcardQuery("*" + v.Content + "*")
|
||||
// logQuery.SetField("log")
|
||||
// buildQuery.AddMust(logQuery)
|
||||
// case sr.NotWildCard:
|
||||
// logQuery := bleve.NewWildcardQuery("*" + v.Content + "*")
|
||||
// logQuery.SetField("log")
|
||||
// buildQuery.AddMustNot(logQuery)
|
||||
}
|
||||
}
|
||||
if req.Match.Name != "" {
|
||||
nameQuery := bleve.NewTermQuery(req.Match.Name)
|
||||
@@ -112,10 +129,11 @@ func (b *bleveSearch) Search(req model.GetLogReq, filterProcessName ...string) (
|
||||
}
|
||||
sortArgs := ([]string{"-_score"})
|
||||
if req.Sort == "desc" {
|
||||
sortArgs = append(sortArgs, "-time")
|
||||
|
||||
sortArgs = ([]string{"-time"})
|
||||
}
|
||||
if req.Sort == "asc" {
|
||||
sortArgs = append(sortArgs, "time")
|
||||
sortArgs = ([]string{"time"})
|
||||
}
|
||||
hl := bleve.HighlightRequest{}
|
||||
hl.AddField("log")
|
||||
|
@@ -5,19 +5,18 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/lzh-1625/go_process_manager/config"
|
||||
"github.com/lzh-1625/go_process_manager/internal/app/model"
|
||||
"github.com/lzh-1625/go_process_manager/internal/app/search"
|
||||
sr "github.com/lzh-1625/go_process_manager/internal/app/search"
|
||||
"github.com/lzh-1625/go_process_manager/log"
|
||||
|
||||
"github.com/olivere/elastic/v7"
|
||||
)
|
||||
|
||||
func init() {
|
||||
search.Register("es", new(esSearch))
|
||||
sr.Register("es", new(esSearch))
|
||||
}
|
||||
|
||||
type esSearch struct {
|
||||
@@ -41,7 +40,6 @@ func (e *esSearch) Init() error {
|
||||
log.Logger.Warnw("Failed to connect to es", "err", err)
|
||||
return err
|
||||
}
|
||||
e.CreateIndexIfNotExists(config.CF.EsIndex)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -58,27 +56,6 @@ func (e *esSearch) Insert(logContent string, processName string, using string, t
|
||||
}
|
||||
}
|
||||
|
||||
func (e *esSearch) CreateIndexIfNotExists(index string) error {
|
||||
|
||||
ctx := context.Background()
|
||||
exists, err := e.esClient.IndexExists(index).Do(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
info, err := e.esClient.CreateIndex(index).BodyString(e.structToJSON()).Do(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.Acknowledged {
|
||||
return fmt.Errorf("ES 创建索引 [%s] 失败", index)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *esSearch) Search(req model.GetLogReq, filterProcessName ...string) model.LogResp {
|
||||
// 检查 req 是否为 nil
|
||||
if req.Page.From < 0 || req.Page.Size <= 0 {
|
||||
@@ -102,9 +79,23 @@ func (e *esSearch) Search(req model.GetLogReq, filterProcessName ...string) mode
|
||||
if req.TimeRange.EndTime != 0 {
|
||||
queryList = append(queryList, timeRangeQuery.Lte(req.TimeRange.EndTime))
|
||||
}
|
||||
if req.Match.Log != "" {
|
||||
queryList = append(queryList, elastic.NewMatchQuery("log", req.Match.Log))
|
||||
notQuery := []elastic.Query{}
|
||||
|
||||
for _, v := range sr.QueryStringAnalysis(req.Match.Log) {
|
||||
switch v.Cond {
|
||||
case sr.Match:
|
||||
queryList = append(queryList, elastic.NewMatchQuery("log", v.Content))
|
||||
case sr.NotMatch:
|
||||
notQuery = append(notQuery, elastic.NewMatchQuery("log", v.Content))
|
||||
case sr.WildCard:
|
||||
queryList = append(queryList, elastic.NewWildcardQuery("log.keyword", "*"+v.Content+"*"))
|
||||
case sr.NotWildCard:
|
||||
notQuery = append(notQuery, elastic.NewWildcardQuery("log.keyword", "*"+v.Content+"*"))
|
||||
}
|
||||
fmt.Printf("v.Cond: %v\n", v.Cond)
|
||||
fmt.Printf("v.Content: %v\n", v.Content)
|
||||
}
|
||||
|
||||
if req.Match.Name != "" {
|
||||
queryList = append(queryList, elastic.NewMatchQuery("name", req.Match.Name))
|
||||
}
|
||||
@@ -123,7 +114,7 @@ func (e *esSearch) Search(req model.GetLogReq, filterProcessName ...string) mode
|
||||
}
|
||||
|
||||
result := model.LogResp{}
|
||||
resp, err := search.Query(elastic.NewBoolQuery().Must(queryList...)).Highlight(elastic.NewHighlight().Field("log").PreTags("\033[43m").PostTags("\033[0m")).Do(context.TODO())
|
||||
resp, err := search.Query(elastic.NewBoolQuery().Must(queryList...).MustNot(notQuery...)).Highlight(elastic.NewHighlight().Field("log").PreTags("\033[43m").PostTags("\033[0m")).Do(context.TODO())
|
||||
if err != nil {
|
||||
log.Logger.Errorw("es查询失败", "err", err, "reason", resp.Error.Reason)
|
||||
return result
|
||||
@@ -147,28 +138,3 @@ func (e *esSearch) Search(req model.GetLogReq, filterProcessName ...string) mode
|
||||
result.Total = resp.TotalHits()
|
||||
return result
|
||||
}
|
||||
|
||||
// 通过反射得到mapping
|
||||
func (e *esSearch) structToJSON() string {
|
||||
typ := reflect.TypeOf(model.ProcessLog{})
|
||||
properties := make(map[string]map[string]string)
|
||||
for i := 0; i < typ.NumField(); i++ {
|
||||
field := typ.Field(i)
|
||||
fieldTag := field.Tag.Get("type")
|
||||
if fieldTag != "" {
|
||||
properties[field.Tag.Get("json")] = map[string]string{
|
||||
"type": fieldTag,
|
||||
}
|
||||
}
|
||||
}
|
||||
result := map[string]interface{}{
|
||||
"mappings": map[string]interface{}{
|
||||
"properties": properties,
|
||||
},
|
||||
}
|
||||
jsonData, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return string(jsonData)
|
||||
}
|
||||
|
@@ -1,6 +1,9 @@
|
||||
package search
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/lzh-1625/go_process_manager/internal/app/model"
|
||||
"github.com/lzh-1625/go_process_manager/log"
|
||||
)
|
||||
@@ -25,3 +28,42 @@ func GetSearchImpl(name string) LogLogic {
|
||||
log.Logger.Warnw("未找到对应的存储引擎,使用默认[sqlite]", "name", name)
|
||||
return searchImplMap["sqlite"]
|
||||
}
|
||||
|
||||
type Cond int
|
||||
|
||||
const (
|
||||
Match Cond = iota // ^
|
||||
WildCard // ~
|
||||
NotMatch // !^
|
||||
NotWildCard // !~
|
||||
)
|
||||
|
||||
type Query struct {
|
||||
Cond Cond
|
||||
Content string
|
||||
}
|
||||
|
||||
func QueryStringAnalysis(s string) (query []Query) {
|
||||
if strings.TrimSpace(s) == "" {
|
||||
return
|
||||
}
|
||||
strList := strings.Split(s, " ")
|
||||
for _, v := range strList {
|
||||
switch {
|
||||
case strings.HasPrefix(v, "!^"):
|
||||
query = append(query, Query{NotMatch, v[2:]})
|
||||
case strings.HasPrefix(v, "!~"):
|
||||
query = append(query, Query{NotWildCard, v[2:]})
|
||||
case strings.HasPrefix(v, "!"):
|
||||
query = append(query, Query{NotMatch, v[1:]})
|
||||
case strings.HasPrefix(v, "^"):
|
||||
query = append(query, Query{Match, v[1:]})
|
||||
case strings.HasPrefix(v, "~"):
|
||||
query = append(query, Query{WildCard, v[1:]})
|
||||
default:
|
||||
query = append(query, Query{Match, v})
|
||||
}
|
||||
}
|
||||
fmt.Printf("query: %v\n", query)
|
||||
return
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/lzh-1625/go_process_manager/internal/app/model"
|
||||
@@ -16,12 +17,16 @@ type sqliteSearch struct{}
|
||||
|
||||
func (l *sqliteSearch) Search(req model.GetLogReq, filterProcessName ...string) model.LogResp {
|
||||
req.FilterName = filterProcessName
|
||||
data, total := repository.LogRepository.SearchLog(req)
|
||||
if req.Match.Log != "" {
|
||||
query := search.QueryStringAnalysis(req.Match.Log)
|
||||
data, total := repository.LogRepository.SearchLog(req, query)
|
||||
for _, v := range slices.DeleteFunc(query, func(q search.Query) bool {
|
||||
return q.Cond == search.NotMatch || q.Cond == search.NotWildCard
|
||||
}) {
|
||||
for i := range data {
|
||||
data[i].Log = strings.ReplaceAll(data[i].Log, req.Match.Log, "\033[43m"+req.Match.Log+"\033[0m")
|
||||
data[i].Log = strings.ReplaceAll(data[i].Log, v.Content, "\033[43m"+v.Content+"\033[0m")
|
||||
}
|
||||
}
|
||||
|
||||
return model.LogResp{
|
||||
Data: data,
|
||||
Total: total,
|
||||
|
Reference in New Issue
Block a user