Compare commits

...

2 Commits

Author SHA1 Message Date
charlestamz
dcabca1ee8 Merge pull request #45 from INeverCry/v3
1.目录订阅head添加Contact,部分厂商必须要该字段 2.添加订阅设备位置 3.添加Notify消息处理
2022-04-11 10:26:56 +08:00
lqh
7414285731 1.目录订阅head添加Contact,部分厂商必须要该字段
2.添加订阅设备位置
3.添加Notify消息处理
2022-04-09 23:00:49 +08:00
5 changed files with 233 additions and 2 deletions

View File

@@ -29,6 +29,9 @@ type ChannelEx struct {
recordEndTime time.Time
state int32
tcpPortIndex uint16
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
}
// Channel 通道

124
device.go
View File

@@ -2,6 +2,7 @@ package gb28181
import (
"fmt"
"log"
"net"
"net/http"
"strings"
@@ -204,6 +205,9 @@ func (d *Device) Subscribe() int {
requestMsg.Event = "Catalog"
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(requestMsg.Expires))
requestMsg.ContentType = "Application/MANSCDP+xml"
requestMsg.Contact = &sip.Contact{
Uri: sip.NewURI(fmt.Sprintf("%s@%s:%d", d.Serial, d.SipIP, d.SipPort)),
}
requestMsg.Body = sip.BuildCatalogXML(d.sn, requestMsg.To.Uri.UserInfo())
requestMsg.ContentLength = len(requestMsg.Body)
@@ -262,3 +266,123 @@ func (d *Device) QueryDeviceInfo(req *sip.Request) {
}
}
}
// MobilePositionSubscribe 移动位置订阅
func (d *Device) MobilePositionSubscribe(id string, expires int, interval int) (code int) {
mobilePosition := d.CreateMessage(sip.SUBSCRIBE)
if d.subscriber.CallID != "" {
mobilePosition.CallID = d.subscriber.CallID
}
mobilePosition.Expires = expires
mobilePosition.Event = "presence"
mobilePosition.Contact = &sip.Contact{
Uri: sip.NewURI(fmt.Sprintf("%s@%s:%d", d.Serial, d.SipIP, d.SipPort)),
}
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(mobilePosition.Expires))
mobilePosition.ContentType = "Application/MANSCDP+xml"
mobilePosition.Body = sip.BuildDevicePositionXML(d.sn, id, interval)
mobilePosition.ContentLength = len(mobilePosition.Body)
msg := &sip.Request{Message: mobilePosition}
response, err := d.Core.SipRequestForResponse(msg)
if err == nil && response != nil {
if response.GetStatusCode() == 200 {
d.subscriber.CallID = mobilePosition.CallID
} else {
d.subscriber.CallID = ""
}
return response.GetStatusCode()
}
return http.StatusRequestTimeout
}
// UpdateChannelPosition 更新通道GPS坐标
func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) {
if c, ok := d.channelMap[channelId]; ok {
c.ChannelEx.GpsTime, _ = time.ParseInLocation("2006-01-02 15:04:05", gpsTime, time.Local)
c.ChannelEx.Longitude = lng
c.ChannelEx.Latitude = lat
log.Printf("更新通道[%s]坐标成功\n", c.Name)
} else {
log.Printf("更新失败,未找到通道[%s]\n", channelId)
}
}
// UpdateChannelStatus 目录订阅消息处理:新增/移除/更新通道或者更改通道状态
func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
for _, v := range deviceList {
switch v.Event {
case "ON":
log.Println("收到通道上线通知")
d.channelOnline(v.DeviceID)
case "OFF":
log.Println("收到通道离线通知")
d.channelOffline(v.DeviceID)
case "VLOST":
log.Println("收到通道视频丢失通知")
d.channelOffline(v.DeviceID)
case "DEFECT":
log.Println("收到通道故障通知")
d.channelOffline(v.DeviceID)
case "ADD":
log.Println("收到通道新增通知")
channel := Channel{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
Manufacturer: v.Manufacturer,
Model: v.Model,
Owner: v.Owner,
CivilCode: v.CivilCode,
Address: v.Address,
Parental: v.Parental,
SafetyWay: v.SafetyWay,
RegisterWay: v.RegisterWay,
Secrecy: v.Secrecy,
Status: v.Status,
}
d.addChannel(&channel)
case "DEL":
//删除
log.Println("收到通道删除通知")
delete(d.channelMap, v.DeviceID)
case "UPDATE":
fmt.Println("收到通道更新通知")
// 更新通道
channel := &Channel{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
Manufacturer: v.Manufacturer,
Model: v.Model,
Owner: v.Owner,
CivilCode: v.CivilCode,
Address: v.Address,
Parental: v.Parental,
SafetyWay: v.SafetyWay,
RegisterWay: v.RegisterWay,
Secrecy: v.Secrecy,
Status: v.Status,
}
channels := []*Channel{channel}
d.UpdateChannels(channels)
}
}
}
func (d *Device) channelOnline(DeviceID string) {
if c, ok := d.channelMap[DeviceID]; ok {
c.Status = "ON"
log.Printf("通道[%s]在线\n", c.Name)
} else {
log.Printf("更新通道[%s]状态失败,未找到\n", DeviceID)
}
}
func (d *Device) channelOffline(DeviceID string) {
if c, ok := d.channelMap[DeviceID]; ok {
c.Status = "OFF"
log.Printf("通道[%s]离线\n", c.Name)
} else {
log.Printf("更新通道[%s]状态失败,未找到\n", DeviceID)
}
}

View File

@@ -3,15 +3,17 @@ package gb28181
import (
"bytes"
"encoding/xml"
"github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/transaction"
"github.com/Monibuca/plugin-gb28181/v3/utils"
"github.com/logrusorgru/aurora"
. "github.com/Monibuca/utils/v3"
"golang.org/x/net/html/charset"
"net/http"
"time"
. "github.com/Monibuca/utils/v3"
"golang.org/x/net/html/charset"
)
func OnRegister(req *sip.Request, tx *transaction.GBTx) {
@@ -147,3 +149,71 @@ func onBye(req *sip.Request, tx *transaction.GBTx) {
response := &sip.Response{req.BuildOK()}
_ = tx.Respond(response)
}
// OnNotify 订阅通知处理
func OnNotify(req *sip.Request, tx *transaction.GBTx) {
if v, ok := Devices.Load(req.From.Uri.UserInfo()); ok {
d := v.(*Device)
d.UpdateTime = time.Now()
temp := &struct {
XMLName xml.Name
CmdType string
DeviceID string
Time string //位置订阅-GPS时间
Longitude string //位置订阅-经度
Latitude string //位置订阅-维度
// Speed string //位置订阅-速度(km/h)(可选)
// Direction string //位置订阅-方向(取值为当前摄像头方向与正北方的顺时针夹角,取值范围0°~360°,单位:°)(可选)
// Altitude string //位置订阅-海拔高度,单位:m(可选)
DeviceList []*notifyMessage `xml:"DeviceList>Item"` //目录订阅
}{}
decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body)))
decoder.CharsetReader = charset.NewReaderLabel
err := decoder.Decode(temp)
if err != nil {
err = utils.DecodeGbk(temp, []byte(req.Body))
if err != nil {
Printf("decode catelog err: %s", err)
}
}
var body string
switch temp.CmdType {
case "Catalog":
//目录状态
d.UpdateChannelStatus(temp.DeviceList)
case "MobilePosition":
//更新channel的坐标
d.UpdateChannelPosition(temp.DeviceID, temp.Time, temp.Longitude, temp.Latitude)
// case "Alarm":
// //报警事件通知 TODO
default:
Println("DeviceID:", aurora.Red(d.ID), " Not supported CmdType : "+temp.CmdType+" body:\n", req.Body)
response := &sip.Response{req.BuildResponse(http.StatusBadRequest)}
tx.Respond(response)
return
}
buildOK := req.BuildOK()
buildOK.Body = body
response := &sip.Response{buildOK}
tx.Respond(response)
}
}
type notifyMessage struct {
DeviceID string
ParentID string
Name string
Manufacturer string
Model string
Owner string
CivilCode string
Address string
Parental int
SafetyWay int
RegisterWay int
Secrecy int
Status string
//状态改变事件 ON:上线,OFF:离线,VLOST:视频丢失,DEFECT:故障,ADD:增加,DEL:删除,UPDATE:更新(必选)
Event string
}

21
main.go
View File

@@ -158,6 +158,7 @@ func run() {
s := transaction.NewCore(serverConfig)
s.RegistHandler(sip.REGISTER, OnRegister)
s.RegistHandler(sip.MESSAGE, OnMessage)
s.RegistHandler(sip.NOTIFY, OnNotify)
s.RegistHandler(sip.BYE, onBye)
//OnStreamClosedHooks.AddHook(func(stream *Stream) {
@@ -253,6 +254,26 @@ func run() {
w.WriteHeader(404)
}
})
http.HandleFunc("/api/gb28181/position", func(w http.ResponseWriter, r *http.Request) {
CORS(w, r)
query := r.URL.Query()
//设备id
id := query.Get("id")
//订阅周期(单位:秒)
expires := query.Get("expires")
//订阅间隔(单位:秒)
interval := query.Get("interval")
expiresInt, _ := strconv.Atoi(expires)
intervalInt, _ := strconv.Atoi(interval)
if v, ok := Devices.Load(id); ok {
d := v.(*Device)
w.WriteHeader(d.MobilePositionSubscribe(id, expiresInt, intervalInt))
} else {
w.WriteHeader(404)
}
})
s.StartAndWait()
}

View File

@@ -38,6 +38,14 @@ var (
<DeviceID>%s</DeviceID>
</Query>
`
// DevicePositionXML 订阅设备位置
DevicePositionXML = `<?xml version="1.0"?>
<Query>
<CmdType>MobilePosition</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
<Interval>%d</Interval>
</Query>`
)
// BuildDeviceInfoXML 获取设备详情指令
@@ -54,3 +62,8 @@ func BuildCatalogXML(sn int, id string) string {
func BuildRecordInfoXML(sn int, id string, start, end int64) string {
return fmt.Sprintf(RecordInfoXML, sn, id, time.Unix(start, 0).Format("2006-01-02T15:04:05"), time.Unix(end, 0).Format("2006-01-02T15:04:05"))
}
// BuildDevicePositionXML 订阅设备位置
func BuildDevicePositionXML(sn int, id string, interval int) string {
return fmt.Sprintf(DevicePositionXML, sn, id, interval)
}