From 9823ca0ceffcf23ed852816aa43910418a1ec83e Mon Sep 17 00:00:00 2001 From: lzh <1625167628@qq.com> Date: Wed, 9 Jul 2025 14:51:22 +0800 Subject: [PATCH] optimal storge choice --- internal/app/api/config.go | 9 +-- internal/app/logic/log_logic.go | 71 +++---------------- internal/app/route/route.go | 2 +- internal/app/{logic => search/bleve}/bleve.go | 22 +++--- internal/app/{logic => search/es}/es.go | 31 ++++---- internal/app/search/search.go | 27 +++++++ internal/app/search/sqlite/sqlite.go | 42 +++++++++++ 7 files changed, 109 insertions(+), 95 deletions(-) rename internal/app/{logic => search/bleve}/bleve.go (89%) rename internal/app/{logic => search/es}/es.go (87%) create mode 100644 internal/app/search/search.go create mode 100644 internal/app/search/sqlite/sqlite.go diff --git a/internal/app/api/config.go b/internal/app/api/config.go index 55db311..c438eec 100644 --- a/internal/app/api/config.go +++ b/internal/app/api/config.go @@ -1,8 +1,6 @@ package api import ( - "errors" - "github.com/lzh-1625/go_process_manager/internal/app/logic" "github.com/gin-gonic/gin" @@ -27,9 +25,6 @@ func (c *configApi) SetSystemConfiguration(ctx *gin.Context, _ any) (err error) return } -func (c *configApi) EsConfigReload(ctx *gin.Context, _ any) (err error) { - if !logic.EsLogic.InitEs() { - err = errors.New("es init fail") - } - return +func (c *configApi) LogConfigReload(ctx *gin.Context, _ any) (err error) { + return logic.LogLogicImpl.Init() } diff --git a/internal/app/logic/log_logic.go b/internal/app/logic/log_logic.go index c28a6d4..e67cdf6 100644 --- a/internal/app/logic/log_logic.go +++ b/internal/app/logic/log_logic.go @@ -1,72 +1,17 @@ package logic import ( - "strings" - "github.com/lzh-1625/go_process_manager/config" - "github.com/lzh-1625/go_process_manager/internal/app/model" - "github.com/lzh-1625/go_process_manager/internal/app/repository" - "github.com/lzh-1625/go_process_manager/log" + "github.com/lzh-1625/go_process_manager/internal/app/search" + + _ "github.com/lzh-1625/go_process_manager/internal/app/search/bleve" + _ "github.com/lzh-1625/go_process_manager/internal/app/search/es" + _ "github.com/lzh-1625/go_process_manager/internal/app/search/sqlite" ) -type LogLogic interface { - Search(req model.GetLogReq, filterProcessName ...string) model.LogResp - Insert(log string, processName string, using string, ts int64) -} - -var LogLogicImpl LogLogic +var LogLogicImpl search.LogLogic func InitLog() { - switch config.CF.StorgeType { - case "es": - LogLogicImpl = LogEs - EsLogic.InitEs() - log.Logger.Infow("使用es作为日志存储") - case "bleve": - LogLogicImpl = BleveLogic - BleveLogic.InitBleve() - log.Logger.Infow("使用bleve作为日志存储") - default: - LogLogicImpl = LogSqlite - log.Logger.Infow("使用sqlite作为日志存储") - } -} - -type logSqlite struct{} - -var LogSqlite = new(logSqlite) - -func (l *logSqlite) Search(req model.GetLogReq, filterProcessName ...string) model.LogResp { - req.FilterName = filterProcessName - data, total := repository.LogRepository.SearchLog(req) - if req.Match.Log != "" { - for i := range data { - data[i].Log = strings.ReplaceAll(data[i].Log, req.Match.Log, "\033[43m"+req.Match.Log+"\033[0m") - } - } - return model.LogResp{ - Data: data, - Total: total, - } -} - -func (l *logSqlite) Insert(log string, processName string, using string, ts int64) { - repository.LogRepository.InsertLog(model.ProcessLog{ - Log: log, - Name: processName, - Using: using, - Time: ts, - }) -} - -type logEs struct{} - -var LogEs = new(logEs) - -func (l *logEs) Search(req model.GetLogReq, filterProcessName ...string) model.LogResp { - return EsLogic.Search(req, filterProcessName...) -} - -func (l *logEs) Insert(log string, processName string, using string, ts int64) { - EsLogic.Insert(log, processName, using, ts) + LogLogicImpl = search.GetSearchImpl(config.CF.StorgeType) + LogLogicImpl.Init() } diff --git a/internal/app/route/route.go b/internal/app/route/route.go index e336281..561a1e4 100644 --- a/internal/app/route/route.go +++ b/internal/app/route/route.go @@ -138,7 +138,7 @@ func routePathInit(r *gin.Engine) { { configGroup.GET("", bind(api.ConfigApi.GetSystemConfiguration, None)) configGroup.PUT("", bind(api.ConfigApi.SetSystemConfiguration, None)) - configGroup.PUT("/es", bind(api.ConfigApi.EsConfigReload, None)) + configGroup.PUT("/log", bind(api.ConfigApi.LogConfigReload, None)) } } } diff --git a/internal/app/logic/bleve.go b/internal/app/search/bleve/bleve.go similarity index 89% rename from internal/app/logic/bleve.go rename to internal/app/search/bleve/bleve.go index 9c390da..81bd8f4 100644 --- a/internal/app/logic/bleve.go +++ b/internal/app/search/bleve/bleve.go @@ -1,4 +1,4 @@ -package logic +package bleve import ( "time" @@ -9,17 +9,20 @@ import ( "github.com/google/uuid" "github.com/lzh-1625/go_process_manager/internal/app/model" + sr "github.com/lzh-1625/go_process_manager/internal/app/search" logger "github.com/lzh-1625/go_process_manager/log" gse "github.com/vcaesar/gse-bleve" ) -type bleveLogic struct { +func init() { + sr.Register("bleve", new(bleveSearch)) +} + +type bleveSearch struct { index bleve.Index } -var BleveLogic = new(bleveLogic) - -func (b *bleveLogic) InitBleve() { +func (b *bleveSearch) Init() error { opt := gse.Option{ Dicts: "embed, zh", Stop: "", @@ -29,7 +32,7 @@ func (b *bleveLogic) InitBleve() { indexMapping, err := gse.NewMapping(opt) if err != nil { logger.Logger.Errorw("bleve init fail", "err", err) - return + return err } mapping := bleve.NewDocumentMapping() log := bleve.NewTextFieldMapping() @@ -50,13 +53,14 @@ func (b *bleveLogic) InitBleve() { index, err = bleve.New("log.bleve", indexMapping) if err != nil { logger.Logger.Errorw("bleve init error", "err", err) - return + return err } } b.index = index + return nil } -func (b *bleveLogic) Insert(logContent string, processName string, using string, ts int64) { +func (b *bleveSearch) Insert(logContent string, processName string, using string, ts int64) { if err := b.index.Index(uuid.NewString(), model.ProcessLog{ Log: logContent, Name: processName, @@ -67,7 +71,7 @@ func (b *bleveLogic) Insert(logContent string, processName string, using string, } } -func (b *bleveLogic) Search(req model.GetLogReq, filterProcessName ...string) (result model.LogResp) { +func (b *bleveSearch) Search(req model.GetLogReq, filterProcessName ...string) (result model.LogResp) { buildQuery := bleve.NewBooleanQuery() if req.Match.Log != "" { logQuery := bleve.NewMatchQuery(req.Match.Log) diff --git a/internal/app/logic/es.go b/internal/app/search/es/es.go similarity index 87% rename from internal/app/logic/es.go rename to internal/app/search/es/es.go index 28b4a0a..84f7ca3 100644 --- a/internal/app/logic/es.go +++ b/internal/app/search/es/es.go @@ -1,4 +1,4 @@ -package logic +package es import ( "context" @@ -10,22 +10,23 @@ import ( "github.com/lzh-1625/go_process_manager/config" "github.com/lzh-1625/go_process_manager/internal/app/model" + "github.com/lzh-1625/go_process_manager/internal/app/search" "github.com/lzh-1625/go_process_manager/log" "github.com/olivere/elastic/v7" ) -type esLogic struct { +func init() { + search.Register("es", new(esSearch)) +} + +type esSearch struct { esClient *elastic.Client } -var ( - EsLogic = new(esLogic) -) - -func (e *esLogic) InitEs() bool { +func (e *esSearch) Init() error { var err error - EsLogic.esClient, err = elastic.NewClient( + e.esClient, err = elastic.NewClient( elastic.SetURL(config.CF.EsUrl), elastic.SetBasicAuth(config.CF.EsUsername, config.CF.EsPassword), elastic.SetSniff(false), @@ -38,13 +39,13 @@ func (e *esLogic) InitEs() bool { ) if err != nil { log.Logger.Warnw("Failed to connect to es", "err", err) - return false + return err } - EsLogic.CreateIndexIfNotExists(config.CF.EsIndex) - return true + e.CreateIndexIfNotExists(config.CF.EsIndex) + return nil } -func (e *esLogic) Insert(logContent string, processName string, using string, ts int64) { +func (e *esSearch) Insert(logContent string, processName string, using string, ts int64) { data := model.ProcessLog{ Log: logContent, Name: processName, @@ -57,7 +58,7 @@ func (e *esLogic) Insert(logContent string, processName string, using string, ts } } -func (e *esLogic) CreateIndexIfNotExists(index string) error { +func (e *esSearch) CreateIndexIfNotExists(index string) error { ctx := context.Background() exists, err := e.esClient.IndexExists(index).Do(ctx) @@ -78,7 +79,7 @@ func (e *esLogic) CreateIndexIfNotExists(index string) error { return nil } -func (e *esLogic) Search(req model.GetLogReq, filterProcessName ...string) model.LogResp { +func (e *esSearch) Search(req model.GetLogReq, filterProcessName ...string) model.LogResp { // 检查 req 是否为 nil if req.Page.From < 0 || req.Page.Size <= 0 { log.Logger.Error("无效的分页请求参数") @@ -148,7 +149,7 @@ func (e *esLogic) Search(req model.GetLogReq, filterProcessName ...string) model } // 通过反射得到mapping -func (e *esLogic) structToJSON() string { +func (e *esSearch) structToJSON() string { typ := reflect.TypeOf(model.ProcessLog{}) properties := make(map[string]map[string]string) for i := 0; i < typ.NumField(); i++ { diff --git a/internal/app/search/search.go b/internal/app/search/search.go new file mode 100644 index 0000000..b499d2a --- /dev/null +++ b/internal/app/search/search.go @@ -0,0 +1,27 @@ +package search + +import ( + "github.com/lzh-1625/go_process_manager/internal/app/model" + "github.com/lzh-1625/go_process_manager/log" +) + +type LogLogic interface { + Search(req model.GetLogReq, filterProcessName ...string) model.LogResp + Insert(log string, processName string, using string, ts int64) + Init() error +} + +var searchImplMap map[string]LogLogic = map[string]LogLogic{} + +func Register(name string, impl LogLogic) { + searchImplMap[name] = impl +} + +func GetSearchImpl(name string) LogLogic { + v, ok := searchImplMap[name] + if ok { + return v + } + log.Logger.Warnw("未找到对应的存储引擎,使用默认[sqlite]", "name", name) + return searchImplMap["sqlite"] +} diff --git a/internal/app/search/sqlite/sqlite.go b/internal/app/search/sqlite/sqlite.go new file mode 100644 index 0000000..ab26aa0 --- /dev/null +++ b/internal/app/search/sqlite/sqlite.go @@ -0,0 +1,42 @@ +package sqlite + +import ( + "strings" + + "github.com/lzh-1625/go_process_manager/internal/app/model" + "github.com/lzh-1625/go_process_manager/internal/app/repository" + "github.com/lzh-1625/go_process_manager/internal/app/search" +) + +func init() { + search.Register("sqlite", new(sqliteSearch)) +} + +type sqliteSearch struct{} + +func (l *sqliteSearch) Search(req model.GetLogReq, filterProcessName ...string) model.LogResp { + req.FilterName = filterProcessName + data, total := repository.LogRepository.SearchLog(req) + if req.Match.Log != "" { + for i := range data { + data[i].Log = strings.ReplaceAll(data[i].Log, req.Match.Log, "\033[43m"+req.Match.Log+"\033[0m") + } + } + return model.LogResp{ + Data: data, + Total: total, + } +} + +func (l *sqliteSearch) Insert(log string, processName string, using string, ts int64) { + repository.LogRepository.InsertLog(model.ProcessLog{ + Log: log, + Name: processName, + Using: using, + Time: ts, + }) +} + +func (l *sqliteSearch) Init() error { + return nil +}