Files
gb-cms/db_sqlite.go
2025-05-31 21:10:04 +08:00

144 lines
2.5 KiB
Go

package main
import (
"context"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/schema"
"time"
)
const (
DBNAME = "lkm_gb.db"
//DBNAME = ":memory:"
)
var (
db *gorm.DB
TaskQueue = make(chan *SaveTask, 1024)
DeviceDao = &daoDevice{}
ChannelDao = &daoChannel{}
PlatformDao = &daoPlatform{}
StreamDao = &daoStream{}
SinkDao = &daoSink{}
JTDeviceDao = &daoJTDevice{}
)
func init() {
db_, err := gorm.Open(sqlite.Open(DBNAME), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
SingularTable: true,
TablePrefix: "lkm_",
},
})
if err != nil {
panic(err)
}
db = db_
tx := db.Exec("PRAGMA journal_mode=WAL;")
if tx.Error != nil {
panic(tx.Error)
}
// 每次启动释放空间
tx = db.Exec("VACUUM;")
if tx.Error != nil {
panic(tx.Error)
}
s, err := db.DB()
s.SetMaxOpenConns(40)
s.SetMaxIdleConns(10)
// devices
// channels
// platforms
// streams
// sinks
if err = db.AutoMigrate(&Device{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&Channel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&PlatformModel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&Stream{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&Sink{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&PlatformChannelModel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&JTDeviceModel{}); err != nil {
panic(err)
}
StartSaveTask()
}
type SaveTask struct {
cb func(tx *gorm.DB) error
err error
cancel context.CancelFunc
}
func StartSaveTask() {
go func() {
for {
var tasks []*SaveTask
for len(TaskQueue) > 0 {
select {
case task := <-TaskQueue:
tasks = append(tasks, task)
}
}
if len(tasks) == 0 {
time.Sleep(50 * time.Millisecond)
continue
}
err := db.Transaction(func(tx *gorm.DB) error {
for _, task := range tasks {
task.err = task.cb(tx)
}
return nil
})
if err != nil {
Sugar.Errorf("DBTransaction error: %s", err)
}
for _, task := range tasks {
task.cancel()
}
}
}()
}
func DBTransaction(cb func(tx *gorm.DB) error) error {
ctx, cancel := context.WithCancel(context.Background())
task := &SaveTask{
cb: cb,
cancel: cancel,
}
TaskQueue <- task
<-ctx.Done()
return task.err
}
// OnExpires Redis设备ID到期回调
func OnExpires(db int, id string) {
Sugar.Infof("设备心跳过期 device: %s", id)
device, _ := DeviceDao.QueryDevice(id)
if device == nil {
Sugar.Errorf("设备不存在 device: %s", id)
return
}
device.Close()
}