取消每次注册查询通道列表

This commit is contained in:
yangjiechina
2024-08-08 20:16:25 +08:00
parent 5df2274d7c
commit 823014161e
8 changed files with 90 additions and 43 deletions

12
catalog.go Normal file
View File

@@ -0,0 +1,12 @@
package main
func (d *DBDevice) OnCatalog(response *QueryCatalogResponse) {
if d.Channels == nil {
d.Channels = make(map[string]Channel, 5)
}
for index := range response.DeviceList.Devices {
device := response.DeviceList.Devices[index]
d.Channels[device.DeviceID] = device
}
}

39
db_local.go Normal file
View File

@@ -0,0 +1,39 @@
package main
// LocalDB 使用json文件保存设备和通道信息
type LocalDB struct {
}
func (m LocalDB) LoadDevices() []*DBDevice {
return nil
}
func (m LocalDB) RegisterDevice(device *DBDevice) (error, bool) {
//持久化...
device.Status = "ON"
d := DeviceManager.Find(device.Id)
if d != nil {
d.Status = "ON"
d.RemoteAddr = device.RemoteAddr
d.Name = device.Name
d.Protocol = device.Protocol
} else if err := DeviceManager.Add(device); err != nil {
return err, false
}
return nil, d == nil || len(d.Channels) == 0
}
func (m LocalDB) UnRegisterDevice(id string) {
device := DeviceManager.Find(id)
if device == nil {
return
}
device.Status = "OFF"
}
func (m LocalDB) KeepAliveDevice(device *DBDevice) {
}

View File

@@ -1,19 +0,0 @@
package main
type MemoryDB struct {
}
func (m MemoryDB) LoadDevices() []*DBDevice {
return nil
}
func (m MemoryDB) AddDevice(device *DBDevice) error {
//持久化...
return DeviceManager.Add(device)
}
func (m MemoryDB) RemoveDevice(id string) {
//
DeviceManager.Remove(id)
}

1
db_redis.go Normal file
View File

@@ -0,0 +1 @@
package main

View File

@@ -32,6 +32,7 @@ type DBDevice struct {
Name string `json:"name"`
RemoteAddr string `json:"remote_addr"`
Protocol string `json:"protocol"`
Status string `xml:"Status,omitempty"` //在线状态 ON-在线/OFF-离线
Channels map[string]Channel `json:"channels"`
}
@@ -184,17 +185,6 @@ func (d *DBDevice) BuildDownloadRequest(channelId, ip string, port uint16, start
return d.BuildInviteRequest("Download", channelId, ip, port, startTime, stopTime, setup, speed, ssrc)
}
func (d *DBDevice) OnCatalog(response *QueryCatalogResponse) {
if d.Channels == nil {
d.Channels = make(map[string]Channel, 5)
}
for index := range response.DeviceList.Devices {
device := response.DeviceList.Devices[index]
d.Channels[device.DeviceID] = device
}
}
// CreateByeRequestFromAnswer 根据invite的应答创建Bye请求
// 应答的to头域需携带tag
func (d *DBDevice) CreateByeRequestFromAnswer(message sip.Response, uas bool) sip.Request {

View File

@@ -3,7 +3,9 @@ package main
type DeviceDB interface {
LoadDevices() []*DBDevice
AddDevice(device *DBDevice) error
RegisterDevice(device *DBDevice) (error, bool)
RemoveDevice(id string)
UnRegisterDevice(id string)
KeepAliveDevice(device *DBDevice)
}

View File

@@ -9,6 +9,7 @@ var (
Config *Config_
SipUA SipServer
TransportManager transport.Manager
DB DeviceDB
)
func init() {
@@ -33,13 +34,13 @@ func main() {
Config = config
TransportManager = transport.NewTransportManager(uint16(Config.Port[0]), uint16(Config.Port[1]))
db := &MemoryDB{}
devices := db.LoadDevices()
DB = &LocalDB{}
devices := DB.LoadDevices()
for _, device := range devices {
DeviceManager.Add(device)
}
server, err := StartSipServer(config, db)
server, err := StartSipServer(config)
if err != nil {
panic(err)
}

View File

@@ -51,7 +51,6 @@ type SipServer interface {
type sipServer struct {
sip gosip.Server
db DeviceDB
config *Config_
}
@@ -67,6 +66,7 @@ func setToTag(response sip.Message, toTag string) {
func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction) {
var device *DBDevice
var query bool
_ = req.GetHeaders("Authorization")
fromHeader := req.GetHeaders("From")[0].(*sip.FromHeader)
expiresHeader := req.GetHeaders("Expires")
@@ -75,7 +75,7 @@ func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction) {
if expiresHeader != nil && "0" == expiresHeader[0].Value() {
Sugar.Infof("注销信令 from:%s", fromHeader.Address.User())
s.db.RemoveDevice(fromHeader.Name())
DB.UnRegisterDevice(fromHeader.Name())
} else /*if authorizationHeader == nil*/ {
expires := sip.Expires(3600)
response.AppendHeader(&expires)
@@ -86,16 +86,18 @@ func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction) {
RemoteAddr: req.Source(),
}
s.db.AddDevice(device)
err, b := DB.RegisterDevice(device)
query = err != nil || b
}
sendResponse(tx, response)
if device != nil {
if device != nil && query {
catalog, err := device.BuildCatalogRequest()
if err != nil {
panic(err)
}
s.SendRequest(catalog)
}
}
@@ -212,7 +214,7 @@ func (s *sipServer) SendRequest(request sip.Request) {
}()
}
func StartSipServer(config *Config_, db DeviceDB) (SipServer, error) {
func StartSipServer(config *Config_) (SipServer, error) {
server := gosip.NewServer(gosip.ServerConfig{
Host: config.PublicIP,
}, nil, nil, logger)
@@ -229,8 +231,17 @@ func StartSipServer(config *Config_, db DeviceDB) (SipServer, error) {
server.OnRequest(sip.ACK, s.OnAck)
server.OnRequest(sip.NOTIFY, s.OnNotify)
server.OnRequest(sip.MESSAGE, func(req sip.Request, tx sip.ServerTransaction) {
response := sip.NewResponseFromRequest("", req, 200, "OK", "")
sendResponse(tx, response)
online := true
defer func() {
var response sip.Response
if online {
response = sip.NewResponseFromRequest("", req, 200, "OK", "")
} else {
response = sip.NewResponseFromRequest("", req, 403, "OK", "")
}
sendResponse(tx, response)
}()
body := req.Body()
startIndex := strings.Index(body, CmdTagStart)
@@ -273,10 +284,20 @@ func StartSipServer(config *Config_, db DeviceDB) (SipServer, error) {
}
}
break
case "keepalive":
{
device := DeviceManager.Find(message.(*QueryCatalogResponse).DeviceID)
if device != nil {
DB.KeepAliveDevice(device)
}
online = device != nil
}
break
}
})
s.db = db
s.config = config
_, p, _ := net.SplitHostPort(config.SipAddr)