增加subscribe信令定时发送

This commit is contained in:
dexter
2021-10-06 08:52:42 +08:00
parent fd2e614d74
commit ad0402fcbf
3 changed files with 49 additions and 34 deletions

View File

@@ -26,7 +26,6 @@ type ChannelEx struct {
RecordEndTime string
recordStartTime time.Time
recordEndTime time.Time
alive bool
state int32
}

View File

@@ -47,6 +47,10 @@ type Device struct {
SipIP string //暴露的IP
channelMap map[string]*Channel
channelMutex sync.RWMutex
subscriber struct {
CallID string
Timeout time.Time
}
}
func (d *Device) addChannel(channel *Channel) {
@@ -84,7 +88,6 @@ func (d *Device) UpdateChannels(list []*Channel) {
}
if old, ok := d.channelMap[c.DeviceID]; ok {
c.ChannelEx = old.ChannelEx
c.alive = true
if len(old.Children) == 0 {
if config.PreFetchRecord {
n := time.Now()
@@ -100,7 +103,6 @@ func (d *Device) UpdateChannels(list []*Channel) {
} else {
c.ChannelEx = &ChannelEx{
device: d,
alive: true,
}
}
if s := engine.FindStream("sub/" + c.DeviceID); s != nil {
@@ -145,6 +147,29 @@ func (d *Device) CreateMessage(Method sip.Method) (requestMsg *sip.Message) {
}
return
}
func (d *Device) Subscribe() int {
requestMsg := d.CreateMessage(sip.SUBSCRIBE)
if d.subscriber.CallID != "" {
requestMsg.CallID = d.subscriber.CallID
}
requestMsg.Expires = 3600
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(requestMsg.Expires))
requestMsg.ContentType = "Application/MANSCDP+xml"
requestMsg.Body = fmt.Sprintf(`<?xml version="1.0"?>
<Query>
<CmdType>Catalog</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
</Query>`, d.sn, requestMsg.To.Uri.UserInfo())
requestMsg.ContentLength = len(requestMsg.Body)
response := d.SendMessage(requestMsg)
if response.Code == 200 {
d.subscriber.CallID = requestMsg.CallID
} else {
d.subscriber.CallID = ""
}
return response.Code
}
func (d *Device) Query() int {
requestMsg := d.CreateMessage(sip.MESSAGE)
requestMsg.ContentType = "Application/MANSCDP+xml"
@@ -159,19 +184,5 @@ func (d *Device) Query() int {
if response.Data != nil && response.Data.Via.Params["received"] != "" {
d.SipIP = response.Data.Via.Params["received"]
}
if response.Code == 200 {
d.channelMutex.Lock()
var stillAlive []*Channel
for _, c := range d.Channels {
if !c.alive {
delete(d.channelMap, c.DeviceID)
continue
}
c.alive = false
stillAlive = append(stillAlive, c)
}
d.Channels = stillAlive
d.channelMutex.Unlock()
}
return response.Code
}

39
main.go
View File

@@ -265,8 +265,13 @@ func run() {
}
switch temp.XMLName.Local {
case "Notify":
if d.Channels == nil {
go d.Query()
switch temp.CmdType {
case "Keeyalive":
if time.Now().After(d.subscriber.Timeout) {
go d.Subscribe()
}
case "Catalog":
d.UpdateChannels(temp.DeviceList)
}
case "Response":
switch temp.CmdType {
@@ -291,7 +296,7 @@ func run() {
// })
//})
go listenMedia()
go queryCatalog(config)
// go queryCatalog(config)
if config.Username != "" || config.Password != "" {
go removeBanDevice(config)
}
@@ -328,20 +333,20 @@ func listenMedia() {
}
}
func queryCatalog(config *transaction.Config) {
t := time.NewTicker(time.Duration(config.CatalogInterval) * time.Second)
for range t.C {
Devices.Range(func(key, value interface{}) bool {
device := value.(*Device)
if time.Since(device.UpdateTime) > time.Duration(config.RegisterValidity)*time.Second {
Devices.Delete(key)
} else {
go device.Query()
}
return true
})
}
}
// func queryCatalog(config *transaction.Config) {
// t := time.NewTicker(time.Duration(config.CatalogInterval) * time.Second)
// for range t.C {
// Devices.Range(func(key, value interface{}) bool {
// device := value.(*Device)
// if time.Since(device.UpdateTime) > time.Duration(config.RegisterValidity)*time.Second {
// Devices.Delete(key)
// } else if device.Channels != nil {
// go device.Subscribe()
// }
// return true
// })
// }
// }
func onRegister(s *transaction.Core, config *transaction.Config, d *Device) {
if old, ok := Devices.Load(d.ID); !ok {