🔥 Make DynamoDB Production-Ready [🎌 breaking change] (#323)

* Update CI/CD tests.

* Update aws-sdk-go to v2, add unit tests, remove warning test, make config better.

* add action

Co-authored-by: wernerr <rene.werner@verivox.com>
This commit is contained in:
M. Efe Çetin
2022-02-10 17:44:59 +03:00
committed by GitHub
parent f9eaa6ae4d
commit 315f14ce58
7 changed files with 450 additions and 174 deletions

View File

@@ -3,90 +3,23 @@ package dynamodb
import (
"context"
"errors"
"fmt"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
awsdynamodb "github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
awsdynamodb "github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// Storage interface that is implemented by storage providers
type Storage struct {
db *awsdynamodb.DynamoDB
table string
}
// New creates a new storage
func New(config Config) *Storage {
// Set default config
cfg := configDefault(config)
// Create db
var creds *credentials.Credentials
if (cfg.AWSaccessKeyID != "" && cfg.AWSsecretAccessKey == "") || (cfg.AWSaccessKeyID == "" && cfg.AWSsecretAccessKey != "") {
panic("[DynamoDB] You need to set BOTH AWSaccessKeyID AND AWSsecretAccessKey")
} else if cfg.AWSaccessKeyID != "" {
// Due to the previous check we can be sure that in this case AWSsecretAccessKey is not empty as well.
creds = credentials.NewStaticCredentials(cfg.AWSaccessKeyID, cfg.AWSsecretAccessKey, "")
}
// Set database options
opt := aws.NewConfig()
if cfg.Region != "" {
opt = opt.WithRegion(cfg.Region)
}
if creds != nil {
opt = opt.WithCredentials(creds)
}
if cfg.CustomEndpoint != "" {
opt = opt.WithEndpoint(cfg.CustomEndpoint)
}
sessionOpt := session.Options{
SharedConfigState: session.SharedConfigEnable,
}
// ...but allow overwrite of region and credentials if they are set in the options.
sessionOpt.Config.MergeIn(opt)
session, err := session.NewSessionWithOptions(sessionOpt)
if err != nil {
panic(err)
}
svc := awsdynamodb.New(session)
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
describeTableInput := awsdynamodb.DescribeTableInput{
TableName: &cfg.Table,
}
_, err = svc.DescribeTableWithContext(timeoutCtx, &describeTableInput)
if err != nil {
awsErr, ok := err.(awserr.Error)
if !ok {
panic(err)
} else if awsErr.Code() == awsdynamodb.ErrCodeResourceNotFoundException {
err = createTable(cfg.Table, cfg.readCapacityUnits, cfg.writeCapacityUnits, *cfg.waitForTableCreation, describeTableInput, svc)
if err != nil {
panic(err)
}
} else {
panic(err)
}
}
// Create storage
store := &Storage{
db: svc,
table: cfg.Table,
}
// Start garbage collector
//go store.gc()
return store
db *awsdynamodb.Client
table string
requestTimeout time.Duration
}
// "k" is used as table column name for the key.
@@ -95,126 +28,182 @@ var keyAttrName = "k"
// "v" is used as table column name for the value.
var valAttrName = "v"
type table struct {
K string
V []byte
}
// New creates a new storage
func New(config Config) *Storage {
// Set default config
cfg := configDefault(config)
awscfg, err := returnAWSConfig(cfg)
if err != nil {
panic(fmt.Sprintf("unable to load SDK config, %v", err))
}
// Create db
sess := awsdynamodb.NewFromConfig(awscfg)
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
describeTableInput := awsdynamodb.DescribeTableInput{
TableName: &cfg.Table,
}
// Create storage
store := &Storage{
db: sess,
table: cfg.Table,
}
// Create table
_, err = sess.DescribeTable(timeoutCtx, &describeTableInput)
if err != nil {
var rnfe *types.ResourceNotFoundException
if errors.As(err, &rnfe) {
err := store.createTable(cfg, describeTableInput)
if err != nil {
panic(err)
}
} else {
panic(err)
}
}
return store
}
// Get value by key
func (s *Storage) Get(key string) ([]byte, error) {
k := make(map[string]*awsdynamodb.AttributeValue)
k[keyAttrName] = &awsdynamodb.AttributeValue{
S: &key,
ctx, cancel := s.requestContext()
defer cancel()
k := make(map[string]types.AttributeValue)
k[keyAttrName] = &types.AttributeValueMemberS{
Value: key,
}
getItemInput := awsdynamodb.GetItemInput{
TableName: &s.table,
Key: k,
}
getItemOutput, err := s.db.GetItem(&getItemInput)
getItemOutput, err := s.db.GetItem(ctx, &getItemInput)
if err != nil {
var rnfe *types.ResourceNotFoundException
if errors.As(err, &rnfe) {
return nil, nil
}
return nil, err
} else if getItemOutput.Item == nil {
return nil, nil
}
attributeVal := getItemOutput.Item[valAttrName]
if attributeVal == nil {
return nil, nil
}
return attributeVal.B, nil
item := &table{}
err = attributevalue.UnmarshalMap(getItemOutput.Item, &item)
return item.V, err
}
// Set key with value
// Set key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
ctx, cancel := s.requestContext()
defer cancel()
// Ain't Nobody Got Time For That
if len(key) <= 0 || len(val) <= 0 {
return nil
}
item := make(map[string]*awsdynamodb.AttributeValue)
item[keyAttrName] = &awsdynamodb.AttributeValue{
S: &key,
item := make(map[string]types.AttributeValue)
item[keyAttrName] = &types.AttributeValueMemberS{
Value: key,
}
item[valAttrName] = &awsdynamodb.AttributeValue{
B: val,
item[valAttrName] = &types.AttributeValueMemberB{
Value: val,
}
putItemInput := awsdynamodb.PutItemInput{
TableName: &s.table,
Item: item,
}
_, err := s.db.PutItem(&putItemInput)
_, err := s.db.PutItem(ctx, &putItemInput)
return err
}
// Delete entry by key
func (s *Storage) Delete(key string) error {
ctx, cancel := s.requestContext()
defer cancel()
// Ain't Nobody Got Time For That
if len(key) <= 0 {
return nil
}
k := make(map[string]*awsdynamodb.AttributeValue)
k[keyAttrName] = &awsdynamodb.AttributeValue{
S: &key,
k := make(map[string]types.AttributeValue)
k[keyAttrName] = &types.AttributeValueMemberS{
Value: key,
}
deleteItemInput := awsdynamodb.DeleteItemInput{
TableName: &s.table,
Key: k,
}
_, err := s.db.DeleteItem(&deleteItemInput)
_, err := s.db.DeleteItem(ctx, &deleteItemInput)
return err
}
// Reset all entries, including unexpired
func (s *Storage) Reset() error {
ctx, cancel := s.requestContext()
defer cancel()
deleteTableInput := awsdynamodb.DeleteTableInput{
TableName: &s.table,
}
_, err := s.db.DeleteTable(&deleteTableInput)
_, err := s.db.DeleteTable(ctx, &deleteTableInput)
return err
}
// Close the database
func (s *Storage) Close() error {
// In the DynamoDB implementation this doesn't have any effect.
return nil
}
// GC deletes all expired entries
// func (s *Storage) gc() {
// ticker := time.NewTicker(s.gcInterval)
// defer ticker.Stop()
// for {
// select {
// case <-s.done:
// return
// case t := <-ticker.C:
// _, _ = s.db.Exec(s.sqlGC, t.Unix())
// }
// }
// }
func (s *Storage) createTable(cfg Config, describeTableInput awsdynamodb.DescribeTableInput) error {
ctx, cancel := s.requestContext()
defer cancel()
func createTable(tableName string, readCapacityUnits, writeCapacityUnits int64, waitForTableCreation bool, describeTableInput awsdynamodb.DescribeTableInput, svc *awsdynamodb.DynamoDB) error {
keyAttrType := "S" // For "string"
keyType := "HASH" // As opposed to "RANGE"
createTableInput := awsdynamodb.CreateTableInput{
TableName: &tableName,
AttributeDefinitions: []*awsdynamodb.AttributeDefinition{{
TableName: &s.table,
AttributeDefinitions: []types.AttributeDefinition{{
AttributeName: &keyAttrName,
AttributeType: &keyAttrType,
AttributeType: types.ScalarAttributeType(keyAttrType),
}},
KeySchema: []*awsdynamodb.KeySchemaElement{{
KeySchema: []types.KeySchemaElement{{
AttributeName: &keyAttrName,
KeyType: &keyType,
KeyType: types.KeyType(keyType),
}},
ProvisionedThroughput: &awsdynamodb.ProvisionedThroughput{
ReadCapacityUnits: &readCapacityUnits,
WriteCapacityUnits: &writeCapacityUnits,
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: &cfg.ReadCapacityUnits,
WriteCapacityUnits: &cfg.WriteCapacityUnits,
},
}
_, err := svc.CreateTable(&createTableInput)
_, err := s.db.CreateTable(ctx, &createTableInput)
if err != nil {
return err
}
// If configured (true by default), block until the table is created.
// Typical table creation duration is 10 seconds.
if waitForTableCreation {
if *cfg.WaitForTableCreation {
for try := 1; try < 16; try++ {
describeTableOutput, err := svc.DescribeTable(&describeTableInput)
if err != nil || *describeTableOutput.Table.TableStatus == "CREATING" {
describeTableOutput, err := s.db.DescribeTable(ctx, &describeTableInput)
if err != nil || describeTableOutput.Table.TableStatus == "CREATING" {
time.Sleep(1 * time.Second)
} else {
break
@@ -222,14 +211,56 @@ func createTable(tableName string, readCapacityUnits, writeCapacityUnits int64,
}
// Last try (16th) after 15 seconds of waiting.
// Now handle error as such.
describeTableOutput, err := svc.DescribeTable(&describeTableInput)
describeTableOutput, err := s.db.DescribeTable(ctx, &describeTableInput)
if err != nil {
return errors.New("The DynamoDB table couldn't be created")
return errors.New("dynamodb: the table couldn't be created")
}
if *describeTableOutput.Table.TableStatus == "CREATING" {
return errors.New("The DynamoDB table took too long to be created")
if describeTableOutput.Table.TableStatus == "CREATING" {
return errors.New("dynamodb: the table took too long to be created")
}
}
return nil
}
// Context for making requests will timeout if a non-zero timeout is configured
func (s *Storage) requestContext() (context.Context, context.CancelFunc) {
if s.requestTimeout > 0 {
return context.WithTimeout(context.Background(), s.requestTimeout)
}
return context.Background(), func() {}
}
func returnAWSConfig(cfg Config) (aws.Config, error) {
endpoint := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
if cfg.Endpoint != "" {
return aws.Endpoint{
PartitionID: "aws",
URL: cfg.Endpoint,
SigningRegion: cfg.Region,
HostnameImmutable: true,
}, nil
}
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
})
if cfg.Credentials != (Credentials{}) {
credentials := credentials.NewStaticCredentialsProvider(cfg.Credentials.AccessKey, cfg.Credentials.SecretAccessKey, "")
return awsconfig.LoadDefaultConfig(context.TODO(),
awsconfig.WithRegion(cfg.Region),
awsconfig.WithEndpointResolverWithOptions(endpoint),
awsconfig.WithCredentialsProvider(credentials),
awsconfig.WithRetryer(func() aws.Retryer {
return retry.AddWithMaxAttempts(retry.NewStandard(), cfg.MaxAttempts)
}),
)
}
return awsconfig.LoadDefaultConfig(context.TODO(),
awsconfig.WithRegion(cfg.Region),
awsconfig.WithEndpointResolverWithOptions(endpoint),
awsconfig.WithRetryer(func() aws.Retryer {
return retry.AddWithMaxAttempts(retry.NewStandard(), cfg.MaxAttempts)
}),
)
}