mirror of
				https://github.com/lzh-1625/go_process_manager.git
				synced 2025-10-26 09:20:24 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			178 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			178 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package logic
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"net/http"
 | |
| 	"reflect"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/lzh-1625/go_process_manager/config"
 | |
| 	"github.com/lzh-1625/go_process_manager/internal/app/model"
 | |
| 	"github.com/lzh-1625/go_process_manager/log"
 | |
| 
 | |
| 	"github.com/olivere/elastic/v7"
 | |
| )
 | |
| 
 | |
| type esLogic struct {
 | |
| 	esClient *elastic.Client
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	EsLogic = new(esLogic)
 | |
| )
 | |
| 
 | |
| func (e *esLogic) InitEs() bool {
 | |
| 	if !config.CF.EsEnable {
 | |
| 		log.Logger.Debug("不使用es")
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	var err error
 | |
| 	EsLogic.esClient, err = elastic.NewClient(
 | |
| 		elastic.SetURL(config.CF.EsUrl),
 | |
| 		elastic.SetBasicAuth(config.CF.EsUsername, config.CF.EsPassword),
 | |
| 		elastic.SetSniff(false),
 | |
| 		elastic.SetHttpClient(&http.Client{
 | |
| 			Transport: &http.Transport{
 | |
| 				MaxIdleConnsPerHost: config.CF.LogHandlerPoolSize,
 | |
| 				IdleConnTimeout:     90 * time.Second,
 | |
| 			},
 | |
| 		}),
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		config.CF.EsEnable = false
 | |
| 		log.Logger.Warnw("Failed to connect to es", "err", err)
 | |
| 		return false
 | |
| 	}
 | |
| 	EsLogic.CreateIndexIfNotExists(config.CF.EsIndex)
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (e *esLogic) Insert(logContent string, processName string, using string, ts int64) {
 | |
| 	data := model.ProcessLog{
 | |
| 		Log:   logContent,
 | |
| 		Name:  processName,
 | |
| 		Using: using,
 | |
| 		Time:  ts,
 | |
| 	}
 | |
| 	_, err := e.esClient.Index().Index(config.CF.EsIndex).BodyJson(data).Do(context.TODO())
 | |
| 	if err != nil {
 | |
| 		log.Logger.Errorw("es数据插入失败", "err", err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (e *esLogic) CreateIndexIfNotExists(index string) error {
 | |
| 
 | |
| 	ctx := context.Background()
 | |
| 	exists, err := e.esClient.IndexExists(index).Do(ctx)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if exists {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	info, err := e.esClient.CreateIndex(index).BodyString(e.structToJSON()).Do(ctx)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if !info.Acknowledged {
 | |
| 		return fmt.Errorf("ES 创建索引 [%s] 失败", index)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (e *esLogic) Search(req model.GetLogReq, filterProcessName ...string) model.LogResp {
 | |
| 	// 检查 req 是否为 nil
 | |
| 	if req.Page.From < 0 || req.Page.Size <= 0 {
 | |
| 		log.Logger.Error("无效的分页请求参数")
 | |
| 		return model.LogResp{Total: 0, Data: []*model.ProcessLog{}}
 | |
| 	}
 | |
| 
 | |
| 	search := e.esClient.Search(config.CF.EsIndex).From(req.Page.From).Size(req.Page.Size).TrackScores(true)
 | |
| 	if req.Sort == "asc" {
 | |
| 		search.Sort("time", true)
 | |
| 	}
 | |
| 	if req.Sort == "desc" {
 | |
| 		search.Sort("time", false)
 | |
| 	}
 | |
| 
 | |
| 	queryList := []elastic.Query{}
 | |
| 	timeRangeQuery := elastic.NewRangeQuery("time")
 | |
| 	if req.TimeRange.StartTime != 0 {
 | |
| 		queryList = append(queryList, timeRangeQuery.Gte(req.TimeRange.StartTime))
 | |
| 	}
 | |
| 	if req.TimeRange.EndTime != 0 {
 | |
| 		queryList = append(queryList, timeRangeQuery.Lte(req.TimeRange.EndTime))
 | |
| 	}
 | |
| 	if req.Match.Log != "" {
 | |
| 		queryList = append(queryList, elastic.NewMatchQuery("log", req.Match.Log))
 | |
| 	}
 | |
| 	if req.Match.Name != "" {
 | |
| 		queryList = append(queryList, elastic.NewMatchQuery("name", req.Match.Name))
 | |
| 	}
 | |
| 	if req.Match.Using != "" {
 | |
| 		queryList = append(queryList, elastic.NewMatchQuery("using", req.Match.Using))
 | |
| 	}
 | |
| 
 | |
| 	if len(filterProcessName) != 0 { // 过滤进程名
 | |
| 		shouldQueryList := []elastic.Query{}
 | |
| 		for _, fpn := range filterProcessName {
 | |
| 			shouldQueryList = append(shouldQueryList, elastic.NewMatchQuery("name", fpn))
 | |
| 		}
 | |
| 		if len(shouldQueryList) > 0 {
 | |
| 			shouldQuery := elastic.NewBoolQuery().Should(shouldQueryList...)
 | |
| 			queryList = append(queryList, shouldQuery)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	result := model.LogResp{}
 | |
| 	resp, err := search.Query(elastic.NewBoolQuery().Must(queryList...)).Do(context.TODO())
 | |
| 	if err != nil {
 | |
| 		log.Logger.Errorw("es查询失败", "err", err, "reason", resp.Error.Reason)
 | |
| 		return result
 | |
| 	}
 | |
| 
 | |
| 	// 遍历响应结果
 | |
| 	for _, v := range resp.Hits.Hits {
 | |
| 		if v.Source != nil {
 | |
| 			var data model.ProcessLog
 | |
| 			if err := json.Unmarshal(v.Source, &data); err == nil {
 | |
| 				result.Data = append(result.Data, &data)
 | |
| 			} else {
 | |
| 				log.Logger.Errorw("JSON 解码失败", "err", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	result.Total = resp.TotalHits()
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| // 通过反射得到mapping
 | |
| func (e *esLogic) structToJSON() string {
 | |
| 	typ := reflect.TypeOf(model.ProcessLog{})
 | |
| 	properties := make(map[string]map[string]string)
 | |
| 	for i := 0; i < typ.NumField(); i++ {
 | |
| 		field := typ.Field(i)
 | |
| 		fieldTag := field.Tag.Get("type")
 | |
| 		if fieldTag != "" {
 | |
| 			properties[field.Tag.Get("json")] = map[string]string{
 | |
| 				"type": fieldTag,
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	result := map[string]interface{}{
 | |
| 		"mappings": map[string]interface{}{
 | |
| 			"properties": properties,
 | |
| 		},
 | |
| 	}
 | |
| 	jsonData, err := json.Marshal(result)
 | |
| 	if err != nil {
 | |
| 		return ""
 | |
| 	}
 | |
| 	return string(jsonData)
 | |
| }
 | 
