fix: uses existing table if exists

closes #1571
This commit is contained in:
Bryan Vaz
2025-03-10 09:12:26 -04:00
parent ab88d43900
commit 7f995dafb2
3 changed files with 228 additions and 16 deletions

View File

@@ -26,19 +26,31 @@ type Storage struct {
}
var (
checkSchemaMsg = "The `v` row has an incorrect data type. " +
"It should be BYTEA but is instead %s. This will cause encoding-related panics if the DB is not migrated (see https://github.com/gofiber/storage/blob/main/MIGRATE.md)."
dropQuery = `DROP TABLE IF EXISTS %s;`
checkSchemaMsg = "The `%s` row has an incorrect data type. " +
"It should be %s but is instead %s. This will cause encoding-related panics if the DB is not migrated (see https://github.com/gofiber/storage/blob/main/MIGRATE.md)."
dropQuery = `DROP TABLE IF EXISTS %s;`
checkTableExistsQuery = `SELECT COUNT(table_name)
FROM information_schema.tables
WHERE table_schema = '%s'
AND table_name = '%s';`
initQuery = []string{
`CREATE TABLE IF NOT EXISTS %s (
`CREATE TABLE %s (
k VARCHAR(64) PRIMARY KEY NOT NULL DEFAULT '',
v BYTEA NOT NULL,
e BIGINT NOT NULL DEFAULT '0'
);`,
`CREATE INDEX IF NOT EXISTS e ON %s (e);`,
}
checkSchemaQuery = `SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name = '%s' AND COLUMN_NAME = 'v';`
checkSchemaQuery = `SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = '%s'
AND table_name = '%s'
AND column_name IN ('k','v','e');`
checkSchemaTargetDataType = map[string]string{
"k": "character varying",
"v": "bytea",
"e": "bigint",
}
)
// New creates a new storage
@@ -61,6 +73,14 @@ func New(config ...Config) *Storage {
panic(err)
}
// Parse out schema in config, if provided
schema := "public"
tableName := cfg.Table
if strings.Contains(cfg.Table, ".") {
schema = strings.Split(cfg.Table, ".")[0]
tableName = strings.Split(cfg.Table, ".")[1]
}
// Drop table if set to true
if cfg.Reset {
if _, err := db.Exec(context.Background(), fmt.Sprintf(dropQuery, cfg.Table)); err != nil {
@@ -69,11 +89,23 @@ func New(config ...Config) *Storage {
}
}
// Determine if table exists
tableExists := false
row := db.QueryRow(context.Background(), fmt.Sprintf(checkTableExistsQuery, schema, tableName))
var count int
if err := row.Scan(&count); err != nil {
db.Close()
panic(err)
}
tableExists = count > 0
// Init database queries
for _, query := range initQuery {
if _, err := db.Exec(context.Background(), fmt.Sprintf(query, cfg.Table)); err != nil {
db.Close()
panic(err)
if !tableExists {
for _, query := range initQuery {
if _, err := db.Exec(context.Background(), fmt.Sprintf(query, cfg.Table)); err != nil {
db.Close()
panic(err)
}
}
}
@@ -185,15 +217,41 @@ func (s *Storage) gc(t time.Time) {
_, _ = s.db.Exec(context.Background(), s.sqlGC, t.Unix())
}
func (s *Storage) checkSchema(tableName string) {
var data []byte
func (s *Storage) checkSchema(fullTableName string) {
schema := "public"
tableName := fullTableName
if strings.Contains(fullTableName, ".") {
schema = strings.Split(fullTableName, ".")[0]
tableName = strings.Split(fullTableName, ".")[1]
}
row := s.db.QueryRow(context.Background(), fmt.Sprintf(checkSchemaQuery, tableName))
if err := row.Scan(&data); err != nil {
rows, err := s.db.Query(context.Background(), fmt.Sprintf(checkSchemaQuery, schema, tableName))
if err != nil {
panic(err)
}
defer rows.Close()
if strings.ToLower(string(data)) != "bytea" {
fmt.Printf(checkSchemaMsg, string(data))
data := make(map[string]string)
rowCount := 0
for rows.Next() {
var columnName, dataType string
if err := rows.Scan(&columnName, &dataType); err != nil {
panic(err)
}
data[columnName] = dataType
rowCount++
}
if rowCount == 0 {
panic(fmt.Errorf("table %s does not exist", tableName))
}
for columnName, dataType := range checkSchemaTargetDataType {
dt, ok := data[columnName]
if !ok {
panic(fmt.Errorf("required column %s does not exist in table %s", columnName, tableName))
}
if dt != dataType {
panic(fmt.Errorf(checkSchemaMsg, columnName, dataType, dt))
}
}
}