Files
public/myelastic/myelastic.go
xiexiaojun 2ace0bade5 new
new
2019-03-07 21:30:01 +08:00

392 lines
8.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package myelastic
import (
"context"
"encoding/json"
"errors"
"log"
"reflect"
"strings"
"time"
"github.com/olivere/elastic"
)
//
type MyElastic struct {
Client *elastic.Client
Err error
Ctx context.Context
}
//
func OnInitES(url string) MyElastic {
var es MyElastic
es.Ctx = context.Background()
es.Client, es.Err = elastic.NewClient(elastic.SetURL(url))
if es.Err != nil {
log.Println(es.Err)
//mylog.Error(es.Err)
//panic(es.Err)
}
return es
}
//func (es *MyElastic) Model(refs interface{}) *MyElastic {
// if reflect.ValueOf(refs).Type().Kind() != reflect.Ptr {
// mylog.Println("Model: attempt to Model into a pointer")
// panic(0)
// }
// es.Element = refs
// return es
//}
/*
创建索引(相当于数据库)
mapping 如果为空("")则表示不创建模型
*/
func (es *MyElastic) CreateIndex(index_name, mapping string) (result bool) {
es.Err = nil
exists, err := es.Client.IndexExists(index_name).Do(es.Ctx)
if err != nil {
es.Err = err
log.Println(es.Err)
return false
}
if !exists {
var re *elastic.IndicesCreateResult
if len(mapping) == 0 {
re, es.Err = es.Client.CreateIndex(index_name).Do(es.Ctx)
} else {
re, es.Err = es.Client.CreateIndex(index_name).BodyString(mapping).Do(es.Ctx)
}
if es.Err != nil {
log.Println(es.Err)
return false
}
return re.Acknowledged
}
return false
}
/*
排序查询
返回json数据集合
*/
func (es *MyElastic) SortQuery(index_name string, builder []elastic.Sorter, query []elastic.Query) (bool, []string) {
searchResult := es.Client.Search().Index(index_name)
if len(builder) > 0 {
for _, v := range builder {
searchResult = searchResult.SortBy(v)
}
}
if len(query) > 0 {
for _, v := range query {
searchResult = searchResult.Query(v)
}
}
es_result, err := searchResult.Do(es.Ctx) // execute
if err != nil {
log.Println(es.Err)
return false, nil
}
//log.Println("Found a total of %d entity\n", es_result.TotalHits())
if es_result.Hits.TotalHits > 0 {
var result []string
//log.Println("Found a total of %d entity\n", searchResult.Hits.TotalHits)
for _, hit := range es_result.Hits.Hits {
result = append(result, string(*hit.Source))
}
return true, result
} else {
// No hits
return true, nil
}
}
/*
排序查询
返回原始Hit
builder排序
agg聚合 类似group_by sum
query查询
*/
func (es *MyElastic) SortQueryReturnHits(index_name string, from, size int, builder []elastic.Sorter, query []elastic.Query) (bool, []*elastic.SearchHit) {
searchResult := es.Client.Search().Index(index_name)
if len(builder) > 0 {
for _, v := range builder {
searchResult = searchResult.SortBy(v)
}
}
if len(query) > 0 {
for _, v := range query {
searchResult = searchResult.Query(v)
}
}
if size > 0 {
searchResult = searchResult.From(from)
searchResult = searchResult.Size(size)
}
es_result, err := searchResult.Do(es.Ctx) // execute
if err != nil {
log.Println(es.Err)
return false, nil
}
// log.Println("wwwwww", es_result.Aggregations)
if es_result.Hits.TotalHits > 0 {
return true, es_result.Hits.Hits
} else {
return true, nil
}
}
/*
添加记录,覆盖添加
*/
func (es *MyElastic) Add(index_name, type_name, id string, data interface{}) (result bool) {
result = false
// Index a tweet (using JSON serialization)
if len(id) > 0 {
_, es.Err = es.Client.Index().
Index(index_name).
Type(type_name).
Id(id).
BodyJson(data).
Do(es.Ctx)
} else {
_, es.Err = es.Client.Index().
Index(index_name).
Type(type_name).
BodyJson(data).
Do(es.Ctx)
}
if es.Err != nil {
log.Println(es.Err)
return false
}
_, es.Err = es.Client.Flush().Index(index_name).Do(es.Ctx)
if es.Err != nil {
log.Println(es.Err)
return false
}
return true
}
/*
添加记录,覆盖添加
index_name
type_name
query interface{} //查询条件
out *[]Param //查询结果
*/
func (es *MyElastic) SearchMap(index_name, type_name string, query interface{}, out *[]map[string]interface{}) (result bool) {
es_search := es.Client.Search()
if len(type_name) > 0 {
es_search = es_search.Type(type_name)
}
if len(index_name) > 0 {
es_search = es_search.Index(index_name)
}
var es_result *elastic.SearchResult
es_result, es.Err = es_search.Source(query).Do(es.Ctx)
if es.Err != nil {
log.Println(es.Err)
return false
}
if es_result.Hits == nil {
log.Println(errors.New("expected SearchResult.Hits != nil; got nil"))
return false
}
for _, hit := range es_result.Hits.Hits {
tmp := make(map[string]interface{})
err := json.Unmarshal(*hit.Source, &tmp)
if err != nil {
log.Println(es.Err)
} else {
*out = append(*out, tmp)
}
}
return true
}
/*
添加记录,覆盖添加
index_name
type_name
query interface{} //查询条件
out *[]Param //查询结果
*/
func (es *MyElastic) Search(index_name, type_name string, query interface{}, out interface{}) (result bool) {
sliceValue := reflect.Indirect(reflect.ValueOf(out))
if sliceValue.Kind() != reflect.Slice {
log.Println(errors.New("needs a pointer to a slice"))
return false
}
sliceElementType := sliceValue.Type().Elem()
es_search := es.Client.Search()
if len(type_name) > 0 {
es_search = es_search.Type(type_name)
}
if len(index_name) > 0 {
es_search = es_search.Index(index_name)
}
var es_result *elastic.SearchResult
es_result, es.Err = es_search.Source(query).Do(es.Ctx)
if es.Err != nil {
log.Println(es.Err)
return false
}
if es_result.Hits == nil {
log.Println(errors.New("expected SearchResult.Hits != nil; got nil"))
return false
}
for _, hit := range es_result.Hits.Hits {
newValue := reflect.New(sliceElementType)
item := make(map[string]interface{})
err := json.Unmarshal(*hit.Source, &item)
//fmt.Println(string(*hit.Source))
err = scanMapIntoStruct(newValue.Interface(), item)
if err != nil {
log.Println(err)
}
if err != nil {
log.Println(err)
} else {
sliceValue.Set(reflect.Append(sliceValue, reflect.Indirect(reflect.ValueOf(newValue.Interface()))))
//out = append(out, tmp)
}
}
return true
}
func scanMapIntoStruct(obj interface{}, objMap map[string]interface{}) error {
dataStruct := reflect.Indirect(reflect.ValueOf(obj))
if dataStruct.Kind() != reflect.Struct {
return errors.New("expected a pointer to a struct")
}
dataStructType := dataStruct.Type()
for i := 0; i < dataStructType.NumField(); i++ {
field := dataStructType.Field(i)
fieldv := dataStruct.Field(i)
err := scanMapElement(fieldv, field, objMap)
if err != nil {
return err
}
}
return nil
}
func scanMapElement(fieldv reflect.Value, field reflect.StructField, objMap map[string]interface{}) error {
objFieldName := field.Name
bb := field.Tag
sqlTag := bb.Get("json")
if bb.Get("json") == "-" || sqlTag == "-" || reflect.ValueOf(bb).String() == "-" {
return nil
}
sqlTags := strings.Split(sqlTag, ",")
sqlFieldName := objFieldName
if len(sqlTags[0]) > 0 {
sqlFieldName = sqlTags[0]
}
data, ok := objMap[sqlFieldName]
if !ok || data == nil {
return nil
}
// fmt.Println("================")
// fmt.Println(field.Type.Kind())
// fmt.Println(sqlFieldName)
var v interface{}
switch field.Type.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
x := int(data.(float64))
v = x
case reflect.Slice:
if fieldv.Type().String() == "[]uint8" {
x := []byte(data.(string))
v = x
} else if fieldv.Type().String() == "[]string" {
mp := data.([]interface{})
var ss []string
for _, v := range mp {
ss = append(ss, v.(string))
}
v = ss
} else if fieldv.Type().String() == "[]int" {
mp := data.([]interface{})
var ss []int
for _, v := range mp {
ss = append(ss, int(v.(float64)))
}
v = ss
} else {
v = data
}
case reflect.Struct:
if fieldv.Type().String() == "time.Time" {
x, err := time.Parse("2006-01-02 15:04:05", data.(string))
if err != nil {
x, err = time.Parse("2006-01-02 15:04:05.000 -0700", data.(string))
if err != nil {
if err != nil {
x, err = time.Parse("2006-01-02T15:04:05.999999999Z07:00", data.(string))
if err != nil {
return errors.New("unsupported time format: " + data.(string))
}
}
}
}
v = x
} else {
v = data
}
default:
v = data
}
fieldv.Set(reflect.ValueOf(v))
// fmt.Println("================")
return nil
}