mirror of
https://github.com/zhufuyi/sponge.git
synced 2025-09-26 20:51:14 +08:00
380 lines
11 KiB
Plaintext
380 lines
11 KiB
Plaintext
package dao
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
"golang.org/x/sync/singleflight"
|
|
|
|
"github.com/go-dev-frame/sponge/pkg/logger"
|
|
"github.com/go-dev-frame/sponge/pkg/mgo"
|
|
"github.com/go-dev-frame/sponge/pkg/mgo/query"
|
|
|
|
"github.com/go-dev-frame/sponge/internal/cache"
|
|
"github.com/go-dev-frame/sponge/internal/database"
|
|
"github.com/go-dev-frame/sponge/internal/model"
|
|
)
|
|
|
|
// Compile-time assertion that *userExampleDao satisfies the UserExampleDao interface.
var _ UserExampleDao = (*userExampleDao)(nil)
|
|
|
|
// UserExampleDao defining the dao interface
|
|
type UserExampleDao interface {
|
|
Create(ctx context.Context, record *model.UserExample) error
|
|
DeleteByID(ctx context.Context, id string) error
|
|
UpdateByID(ctx context.Context, record *model.UserExample) error
|
|
GetByID(ctx context.Context, id string) (*model.UserExample, error)
|
|
GetByColumns(ctx context.Context, params *query.Params) ([]*model.UserExample, int64, error)
|
|
|
|
DeleteByIDs(ctx context.Context, ids []string) error
|
|
GetByCondition(ctx context.Context, condition *query.Conditions) (*model.UserExample, error)
|
|
GetByIDs(ctx context.Context, ids []string) (map[string]*model.UserExample, error)
|
|
GetByLastID(ctx context.Context, lastID string, limit int, sort string) ([]*model.UserExample, error)
|
|
}
|
|
|
|
type userExampleDao struct {
|
|
collection *mongo.Collection
|
|
cache cache.UserExampleCache // if nil, the cache is not used.
|
|
sfg *singleflight.Group // if cache is nil, the sfg is not used.
|
|
}
|
|
|
|
// NewUserExampleDao creating the dao interface
|
|
func NewUserExampleDao(collection *mongo.Collection, xCache cache.UserExampleCache) UserExampleDao {
|
|
if xCache == nil {
|
|
return &userExampleDao{collection: collection}
|
|
}
|
|
return &userExampleDao{
|
|
collection: collection,
|
|
cache: xCache,
|
|
sfg: new(singleflight.Group),
|
|
}
|
|
}
|
|
|
|
func (d *userExampleDao) deleteCache(ctx context.Context, id string) error {
|
|
if d.cache != nil {
|
|
return d.cache.Del(ctx, id)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Create a new userExample, insert the record and the id value is written back to the table
|
|
func (d *userExampleDao) Create(ctx context.Context, record *model.UserExample) error {
|
|
if record.ID.IsZero() {
|
|
record.ID = primitive.NewObjectID()
|
|
}
|
|
if record.CreatedAt.IsZero() {
|
|
record.CreatedAt = time.Now()
|
|
record.UpdatedAt = time.Now()
|
|
}
|
|
_, err := d.collection.InsertOne(ctx, record)
|
|
|
|
_ = d.deleteCache(ctx, record.ID.Hex())
|
|
return err
|
|
}
|
|
|
|
// DeleteByID delete a userExample by id
|
|
func (d *userExampleDao) DeleteByID(ctx context.Context, id string) error {
|
|
filter := bson.M{"_id": database.ToObjectID(id)}
|
|
_, err := d.collection.UpdateOne(ctx, mgo.ExcludeDeleted(filter), mgo.EmbedDeletedAt(bson.M{}))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// delete cache
|
|
_ = d.deleteCache(ctx, id)
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateByID update a userExample by id
|
|
func (d *userExampleDao) UpdateByID(ctx context.Context, record *model.UserExample) error {
|
|
err := d.updateDataByID(ctx, d.collection, record)
|
|
|
|
// delete cache
|
|
_ = d.deleteCache(ctx, record.ID.Hex())
|
|
|
|
return err
|
|
}
|
|
|
|
func (d *userExampleDao) updateDataByID(ctx context.Context, collection *mongo.Collection, table *model.UserExample) error {
|
|
if table.ID.IsZero() {
|
|
return errors.New("id is empty or invalid")
|
|
}
|
|
|
|
update := bson.M{}
|
|
// todo generate the update fields code to here
|
|
// delete the templates code start
|
|
if table.Name != "" {
|
|
update["name"] = table.Name
|
|
}
|
|
if table.Password != "" {
|
|
update["password"] = table.Password
|
|
}
|
|
if table.Email != "" {
|
|
update["email"] = table.Email
|
|
}
|
|
if table.Phone != "" {
|
|
update["phone"] = table.Phone
|
|
}
|
|
if table.Avatar != "" {
|
|
update["avatar"] = table.Avatar
|
|
}
|
|
if table.Age > 0 {
|
|
update["age"] = table.Age
|
|
}
|
|
if table.Gender > 0 {
|
|
update["gender"] = table.Gender
|
|
}
|
|
if table.LoginAt > 0 {
|
|
update["login_at"] = table.LoginAt
|
|
}
|
|
// delete the templates code end
|
|
|
|
filter := bson.M{"_id": table.ID}
|
|
_, err := collection.UpdateOne(ctx, mgo.ExcludeDeleted(filter), mgo.EmbedUpdatedAt(update))
|
|
return err
|
|
}
|
|
|
|
// GetByID get a userExample by id
|
|
func (d *userExampleDao) GetByID(ctx context.Context, id string) (*model.UserExample, error) {
|
|
oid := database.ToObjectID(id)
|
|
if oid.IsZero() {
|
|
return nil, database.ErrRecordNotFound
|
|
}
|
|
filter := bson.M{"_id": oid}
|
|
// no cache
|
|
if d.cache == nil {
|
|
record := &model.UserExample{}
|
|
err := d.collection.FindOne(ctx, mgo.ExcludeDeleted(filter)).Decode(record)
|
|
return record, err
|
|
}
|
|
|
|
// get from cache
|
|
cacheRecord, err := d.cache.Get(ctx, id)
|
|
if err == nil {
|
|
return cacheRecord, nil
|
|
}
|
|
|
|
// get from mongodb
|
|
if errors.Is(err, database.ErrCacheNotFound) {
|
|
// for the same id, prevent high concurrent simultaneous access to mongodb
|
|
val, err, _ := d.sfg.Do(id, func() (interface{}, error) {
|
|
record := &model.UserExample{}
|
|
err = d.collection.FindOne(ctx, mgo.ExcludeDeleted(filter)).Decode(record)
|
|
if err != nil {
|
|
// set placeholder cache to prevent cache penetration, default expiration time 10 minutes
|
|
if errors.Is(err, database.ErrRecordNotFound) {
|
|
if err = d.cache.SetPlaceholder(ctx, id); err != nil {
|
|
logger.Warn("cache.SetPlaceholder error", logger.Err(err), logger.Any("id", id))
|
|
}
|
|
return nil, database.ErrRecordNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
// set cache
|
|
if err = d.cache.Set(ctx, id, record, cache.UserExampleExpireTime); err != nil {
|
|
logger.Warn("cache.Set error", logger.Err(err), logger.Any("id", id))
|
|
}
|
|
return record, nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
record, ok := val.(*model.UserExample)
|
|
if !ok {
|
|
return nil, database.ErrRecordNotFound
|
|
}
|
|
return record, nil
|
|
}
|
|
|
|
if d.cache.IsPlaceholderErr(err) {
|
|
return nil, database.ErrRecordNotFound
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// GetByColumns get a paginated list of userExamples by custom conditions.
|
|
// For more details, please refer to https://go-sponge.com/component/custom-page-query.html
|
|
func (d *userExampleDao) GetByColumns(ctx context.Context, params *query.Params) ([]*model.UserExample, int64, error) {
|
|
filter, err := params.ConvertToMongoFilter(query.WithWhitelistNames(model.UserExampleColumnNames))
|
|
if err != nil {
|
|
return nil, 0, errors.New("query params error: " + err.Error())
|
|
}
|
|
filter = mgo.ExcludeDeleted(filter)
|
|
logger.Info("query filter", logger.Any("filter", filter))
|
|
|
|
total, err := d.collection.CountDocuments(ctx, filter)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
if total == 0 {
|
|
return nil, total, nil
|
|
}
|
|
|
|
records := []*model.UserExample{}
|
|
sort, limit, skip := params.ConvertToPage()
|
|
findOpts := new(options.FindOptions)
|
|
findOpts.SetLimit(int64(limit)).SetSkip(int64(skip))
|
|
findOpts.Sort = sort
|
|
|
|
cursor, err := d.collection.Find(ctx, filter, findOpts)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
err = cursor.All(ctx, &records)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
return records, total, err
|
|
}
|
|
|
|
// DeleteByIDs batch delete userExample by ids
|
|
func (d *userExampleDao) DeleteByIDs(ctx context.Context, ids []string) error {
|
|
oids := mgo.ConvertToObjectIDs(ids)
|
|
filter := bson.M{"_id": bson.M{"$in": oids}}
|
|
_, err := d.collection.UpdateMany(ctx, mgo.ExcludeDeleted(filter), mgo.EmbedDeletedAt(bson.M{}))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// delete cache
|
|
for _, id := range ids {
|
|
_ = d.deleteCache(ctx, id)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetByCondition get a userExample by custom condition.
|
|
// For more details, please refer to https://go-sponge.com/component/custom-page-query.html#_2-condition-parameters-optional
|
|
func (d *userExampleDao) GetByCondition(ctx context.Context, c *query.Conditions) (*model.UserExample, error) {
|
|
filter, err := c.ConvertToMongo(query.WithWhitelistNames(model.UserExampleColumnNames))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
filter = mgo.ExcludeDeleted(filter)
|
|
logger.Info("query filter", logger.Any("filter", filter))
|
|
|
|
record := &model.UserExample{}
|
|
err = d.collection.FindOne(ctx, filter).Decode(record)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return record, nil
|
|
}
|
|
|
|
// GetByIDs Batch get userExample by ids
|
|
func (d *userExampleDao) GetByIDs(ctx context.Context, ids []string) (map[string]*model.UserExample, error) {
|
|
// no cache
|
|
if d.cache == nil {
|
|
records := []*model.UserExample{}
|
|
oids := mgo.ConvertToObjectIDs(ids)
|
|
filter := bson.M{"_id": bson.M{"$in": oids}}
|
|
cursor, err := d.collection.Find(ctx, mgo.ExcludeDeleted(filter))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = cursor.All(ctx, &records)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
itemMap := make(map[string]*model.UserExample)
|
|
for _, record := range records {
|
|
itemMap[record.ID.Hex()] = record
|
|
}
|
|
return itemMap, nil
|
|
}
|
|
|
|
// get form cache
|
|
itemMap, err := d.cache.MultiGet(ctx, ids)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var missedIDs []string
|
|
for _, id := range ids {
|
|
if _, ok := itemMap[id]; !ok {
|
|
missedIDs = append(missedIDs, id)
|
|
}
|
|
}
|
|
|
|
// get missed data
|
|
if len(missedIDs) > 0 {
|
|
// find the id of an active placeholder, i.e. an id that does not exist in mongodb
|
|
var realMissedIDs []string
|
|
for _, id := range missedIDs {
|
|
_, err = d.cache.Get(ctx, id)
|
|
if d.cache.IsPlaceholderErr(err) {
|
|
continue
|
|
}
|
|
realMissedIDs = append(realMissedIDs, id)
|
|
}
|
|
|
|
// get missed id from database
|
|
if len(realMissedIDs) > 0 {
|
|
records := []*model.UserExample{}
|
|
recordIDMap := make(map[string]struct{})
|
|
oids := mgo.ConvertToObjectIDs(realMissedIDs)
|
|
filter := bson.M{"_id": bson.M{"$in": oids}}
|
|
cursor, err := d.collection.Find(ctx, mgo.ExcludeDeleted(filter))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = cursor.All(ctx, &records)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(records) > 0 {
|
|
for _, data := range records {
|
|
itemMap[data.ID.Hex()] = data
|
|
recordIDMap[data.ID.Hex()] = struct{}{}
|
|
}
|
|
if err = d.cache.MultiSet(ctx, records, cache.UserExampleExpireTime); err != nil {
|
|
logger.Warn("cache.MultiSet error", logger.Err(err), logger.Any("ids", records))
|
|
}
|
|
if len(records) == len(realMissedIDs) {
|
|
return itemMap, nil
|
|
}
|
|
}
|
|
for _, id := range realMissedIDs {
|
|
if _, ok := recordIDMap[id]; !ok {
|
|
if err = d.cache.SetPlaceholder(ctx, id); err != nil {
|
|
logger.Warn("cache.SetPlaceholder error", logger.Err(err), logger.Any("id", id))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return itemMap, nil
|
|
}
|
|
|
|
// GetByLastID Get a paginated list of userExamples by last id
|
|
func (d *userExampleDao) GetByLastID(ctx context.Context, lastID string, limit int, sort string) ([]*model.UserExample, error) {
|
|
page := query.NewPage(0, limit, sort)
|
|
|
|
findOpts := new(options.FindOptions)
|
|
findOpts.SetLimit(int64(page.Limit())).SetSkip(int64(page.Skip()))
|
|
findOpts.Sort = page.Sort()
|
|
|
|
records := []*model.UserExample{}
|
|
filter := bson.M{"_id": bson.M{"$lt": database.ToObjectID(lastID)}}
|
|
|
|
cursor, err := d.collection.Find(ctx, mgo.ExcludeDeleted(filter), findOpts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = cursor.All(ctx, &records)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return records, nil
|
|
}
|