删除设备的同时清理通道

This commit is contained in:
xugo
2025-03-09 18:46:24 +08:00
parent 5dfe109b28
commit 6d2f457fde
7 changed files with 153 additions and 52 deletions

View File

@@ -19,18 +19,18 @@ version = 1
SlowThreshold = '200ms'
[Sip]
Port = 15062
Port = 15060
ID = "3402000000200000001"
Domain = "3402000000"
Password = "12345678"
Password = ""
[Media]
IP = "127.0.0.1"
HTTPPort = 8080
Secret = "s1kPE7bzqKeHUaVcp8dCA0jeB8yxyFq4"
WebHookIP = "host.docker.internal"
WebHookIP = "192.168.1.10"
RTPPortRange = "20000-20500"
SDPIP = "192.168.10.14"
SDPIP = "192.168.1.10"
[Log]
# 日志存储目录,不能使用特殊符号
@@ -40,6 +40,6 @@ version = 1
# 保留日志多久,超过时间自动删除
MaxAge = '744h0m0s'
# 多久时间,分割一个新的日志文件
RotationTime = '8h0m0s'
RotationTime = '12h0m0s'
# 多大文件,分割一个新的日志文件(MB)
RotationSize = 50

View File

@@ -9,6 +9,7 @@ import (
"github.com/ixugo/goweb/pkg/orm"
"github.com/ixugo/goweb/pkg/web"
"github.com/jinzhu/copier"
"gorm.io/gorm"
)
// DeviceStorer Instantiation interface
@@ -18,6 +19,8 @@ type DeviceStorer interface {
Add(context.Context, *Device) error
Edit(context.Context, *Device, func(*Device), ...orm.QueryOption) error
Del(context.Context, *Device, ...orm.QueryOption) error
Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error
}
// FindDevice Paginated search
@@ -96,9 +99,9 @@ func (c Core) EditDevice(ctx context.Context, in *EditDeviceInput, id string) (*
// DelDevice Delete object
func (c Core) DelDevice(ctx context.Context, id string) (*Device, error) {
var out Device
if err := c.store.Device().Del(ctx, &out, orm.Where("id=?", id)); err != nil {
var dev Device
if err := c.store.Device().Del(ctx, &dev, orm.Where("id=?", id)); err != nil {
return nil, web.ErrDB.Withf(`Del err[%s]`, err.Error())
}
return &out, nil
return &dev, nil
}

View File

@@ -15,9 +15,8 @@ import (
)
var (
_ gbs.MemoryStorer = &Cache{}
_ gb28181.Storer = &Cache{}
_ gb28181.DeviceStorer = &Cache{}
_ gbs.MemoryStorer = &Cache{}
_ gb28181.Storer = &Cache{}
)
type Cache struct {
@@ -26,33 +25,12 @@ type Cache struct {
devices *conc.Map[string, *gbs.Device]
}
// Add implements gb28181.DeviceStorer.
func (c *Cache) Add(ctx context.Context, d *gb28181.Device) error {
return c.Storer.Device().Add(ctx, d)
}
// Del implements gb28181.DeviceStorer.
func (c *Cache) Del(ctx context.Context, d *gb28181.Device, opts ...orm.QueryOption) error {
return c.Storer.Device().Del(ctx, d, opts...)
}
// Edit implements gb28181.DeviceStorer.
func (c *Cache) Edit(ctx context.Context, d *gb28181.Device, changeFn func(*gb28181.Device), opts ...orm.QueryOption) error {
return c.Storer.Device().Edit(ctx, d, changeFn, opts...)
}
// Find implements gb28181.DeviceStorer.
func (c *Cache) Find(ctx context.Context, d *[]*gb28181.Device, pager orm.Pager, opts ...orm.QueryOption) (int64, error) {
return c.Storer.Device().Find(ctx, d, pager, opts...)
}
// Get implements gb28181.DeviceStorer.
func (c *Cache) Get(ctx context.Context, d *gb28181.Device, opts ...orm.QueryOption) error {
return c.Storer.Device().Get(ctx, d, opts...)
}
func (c *Cache) Device() gb28181.DeviceStorer {
return c
return (*Device)(c)
}
func (c *Cache) Channel() gb28181.ChannelStorer {
return (*Channel)(c)
}
func NewCache(store gb28181.Storer) *Cache {
@@ -76,15 +54,15 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) {
continue
}
channels := make([]*gb28181.Channel, 0, 8)
_, err := c.Storer.Channel().Find(context.TODO(), &channels, web.NewPagerFilterMaxSize(), orm.Where("device_id=?", d.DeviceID))
if err != nil {
panic(err)
}
dev := gbs.NewDevice(conn, d, channels)
dev := gbs.NewDevice(conn, d)
if dev != nil {
slog.Debug("load device to memory", "device_id", d.DeviceID, "to", dev.To())
channels := make([]*gb28181.Channel, 0, 8)
_, err := c.Storer.Channel().Find(context.TODO(), &channels, web.NewPagerFilterMaxSize(), orm.Where("device_id=?", d.DeviceID))
if err != nil {
panic(err)
}
dev.LoadChannels(channels...)
c.devices.Store(d.DeviceID, dev)
}
}

View File

@@ -0,0 +1,49 @@
package gb28181cache
import (
"context"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/ixugo/goweb/pkg/orm"
)
var _ gb28181.ChannelStorer = &Channel{}
type Channel Cache
// Add implements gb28181.ChannelStorer.
func (c *Channel) Add(ctx context.Context, ch *gb28181.Channel) error {
if err := c.Storer.Channel().Add(ctx, ch); err != nil {
return err
}
dev, ok := c.devices.Load(ch.DeviceID)
if ok {
dev.LoadChannels(ch)
}
return nil
}
// BatchEdit implements gb28181.ChannelStorer.
func (c *Channel) BatchEdit(ctx context.Context, field string, value any, opts ...orm.QueryOption) error {
return c.Storer.Channel().BatchEdit(ctx, field, value, opts...)
}
// Del implements gb28181.ChannelStorer.
func (c *Channel) Del(ctx context.Context, ch *gb28181.Channel, opts ...orm.QueryOption) error {
return c.Storer.Channel().Del(ctx, ch, opts...)
}
// Edit implements gb28181.ChannelStorer.
func (c *Channel) Edit(ctx context.Context, ch *gb28181.Channel, changeFn func(*gb28181.Channel), opts ...orm.QueryOption) error {
return c.Storer.Channel().Edit(ctx, ch, changeFn, opts...)
}
// Find implements gb28181.ChannelStorer.
func (c *Channel) Find(ctx context.Context, chs *[]*gb28181.Channel, pager orm.Pager, opts ...orm.QueryOption) (int64, error) {
return c.Storer.Channel().Find(ctx, chs, pager, opts...)
}
// Get implements gb28181.ChannelStorer.
func (c *Channel) Get(ctx context.Context, ch *gb28181.Channel, opts ...orm.QueryOption) error {
return c.Storer.Channel().Get(ctx, ch, opts...)
}

View File

@@ -0,0 +1,66 @@
package gb28181cache
import (
"context"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/ixugo/goweb/pkg/orm"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
var _ gb28181.DeviceStorer = &Device{}
type Device Cache
// Add implements gb28181.DeviceStorer.
func (d *Device) Add(ctx context.Context, dev *gb28181.Device) error {
if err := d.Storer.Device().Add(ctx, dev); err != nil {
return err
}
d.devices.LoadOrStore(dev.DeviceID, gbs.NewDevice(nil, dev))
return nil
}
// Del implements gb28181.DeviceStorer.
func (d *Device) Del(ctx context.Context, dev *gb28181.Device, opts ...orm.QueryOption) error {
if err := d.Storer.Device().Session(
ctx,
func(tx *gorm.DB) error {
db := tx.Clauses(clause.Returning{})
for _, fn := range opts {
db = fn(db)
}
return db.Delete(dev).Error
},
func(tx *gorm.DB) error {
return tx.Model(&gb28181.Channel{}).Where("did=?", dev.ID).Delete(nil).Error
},
); err != nil {
return err
}
d.devices.Delete(dev.DeviceID)
return nil
}
// Edit implements gb28181.DeviceStorer.
func (d *Device) Edit(ctx context.Context, dev *gb28181.Device, changeFn func(*gb28181.Device), opts ...orm.QueryOption) error {
return d.Storer.Device().Edit(ctx, dev, changeFn, opts...)
}
// Find implements gb28181.DeviceStorer.
func (d *Device) Find(ctx context.Context, devs *[]*gb28181.Device, pager orm.Pager, opts ...orm.QueryOption) (int64, error) {
return d.Storer.Device().Find(ctx, devs, pager, opts...)
}
// Get implements gb28181.DeviceStorer.
func (d *Device) Get(ctx context.Context, dev *gb28181.Device, opts ...orm.QueryOption) error {
return d.Storer.Device().Get(ctx, dev, opts...)
}
// Session implements gb28181.DeviceStorer.
func (d *Device) Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error {
return d.Storer.Device().Session(ctx, changeFns...)
}

View File

@@ -2,7 +2,6 @@ package gbs
import (
"encoding/xml"
"fmt"
"log/slog"
"net"
@@ -70,9 +69,10 @@ func (g GB28181API) sipMessageCatalog(ctx *sip.Context) {
// QueryCatalog 设备目录查询或订阅请求
// GB/T28181 81 页 A.2.4.3
func (g *GB28181API) QueryCatalog(deviceID string) error {
slog.Debug("QueryCatalog", "deviceID", deviceID)
ipc, ok := g.svr.memoryStorer.Load(deviceID)
if !ok {
return fmt.Errorf("device not found")
return ErrDeviceOffline
}
_, err := g.svr.wrapRequest(ipc, sip.MethodMessage, &sip.ContentTypeXML, sip.GetCatalogXML(deviceID))

View File

@@ -21,13 +21,14 @@ var (
)
type Device struct {
channels conc.Map[string, *Channel]
Channels conc.Map[string, *Channel]
registerWithKeepaliveMutex sync.Mutex
// 播放互斥锁也可以移动到 channel 属性
playMutex sync.Mutex
IsOnline bool
Address string
conn sip.Connection
source net.Addr
@@ -38,7 +39,7 @@ type Device struct {
Expires int
}
func NewDevice(conn sip.Connection, d *gb28181.Device, channels []*gb28181.Channel) *Device {
func NewDevice(conn sip.Connection, d *gb28181.Device) *Device {
uri, err := sip.ParseURI(fmt.Sprintf("sip:%s@%s", d.DeviceID, d.Address))
if err != nil {
slog.Error("parse uri", "err", err, "did", d.ID)
@@ -58,20 +59,24 @@ func NewDevice(conn sip.Connection, d *gb28181.Device, channels []*gb28181.Chann
URI: uri,
Params: sip.NewParams(),
},
Address: d.Address,
LastKeepaliveAt: d.KeepaliveAt.Time,
LastRegisterAt: d.RegisteredAt.Time,
IsOnline: d.IsOnline,
}
return &c
}
func (d *Device) LoadChannels(channels ...*gb28181.Channel) {
for _, channel := range channels {
ch := Channel{
ChannelID: channel.ChannelID,
device: &c,
device: d,
}
ch.init(d.Address)
c.channels.Store(channel.ChannelID, &ch)
d.Channels.Store(channel.ChannelID, &ch)
}
return &c
}
// Conn implements Targeter.
@@ -164,7 +169,7 @@ func newDevice(network, address string, conn sip.Connection) *Device {
// }
func (c *Device) GetChannel(channelID string) (*Channel, bool) {
return c.channels.Load(channelID)
return c.Channels.Load(channelID)
}
// func (c *Client) Delete(deviceID string) {