mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64ac75905f | ||
|
|
d2cc62ff9e | ||
|
|
2e8aa47bc5 | ||
|
|
585d5949d3 | ||
|
|
0285236cce | ||
|
|
12895fa2cc | ||
|
|
c66303e7e8 | ||
|
|
5435c2ef1c | ||
|
|
d8c6ad30dd | ||
|
|
86fa7cc7e6 | ||
|
|
692ec21877 | ||
|
|
71f2b36d2d | ||
|
|
b068bd9e5b | ||
|
|
78ac89e7af | ||
|
|
1cec5301c3 | ||
|
|
319f7fc636 | ||
|
|
7a75810203 | ||
|
|
bbc7b09835 | ||
|
|
1dbdff1fe5 | ||
|
|
952c8f0ff8 | ||
|
|
730f3014f8 |
@@ -133,8 +133,8 @@ http 200 表示成功,404流不存在
|
||||
| --------- | ---- | -------------------------------------------- |
|
||||
| id | 是 | 设备ID |
|
||||
| channel | 是 | 通道编号 |
|
||||
| startTime | 否 | 开始时间(字符串,格式:2021-7-23T12:00:00) |
|
||||
| endTime | 否 | 结束时间(字符串格式同上) |
|
||||
| startTime | 否 | 开始时间(Unix时间戳) |
|
||||
| endTime | 否 | 结束时间(Unix时间戳) |
|
||||
|
||||
### 移动位置订阅
|
||||
|
||||
|
||||
180
channel.go
180
channel.go
@@ -25,19 +25,23 @@ type PullStream struct {
|
||||
inviteRes sip.Response
|
||||
}
|
||||
|
||||
func (p *PullStream) Bye() int {
|
||||
func (p *PullStream) CreateRequest(method sip.RequestMethod) (req sip.Request) {
|
||||
res := p.inviteRes
|
||||
bye := p.channel.CreateRequst(sip.BYE)
|
||||
req = p.channel.CreateRequst(method)
|
||||
from, _ := res.From()
|
||||
to, _ := res.To()
|
||||
callId, _ := res.CallID()
|
||||
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
|
||||
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
|
||||
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
|
||||
resp, err := p.channel.device.SipRequestForResponse(bye)
|
||||
req.ReplaceHeaders(from.Name(), []sip.Header{from})
|
||||
req.ReplaceHeaders(to.Name(), []sip.Header{to})
|
||||
req.ReplaceHeaders(callId.Name(), []sip.Header{callId})
|
||||
return
|
||||
}
|
||||
|
||||
func (p *PullStream) Bye() int {
|
||||
req := p.CreateRequest(sip.BYE)
|
||||
resp, err := p.channel.device.SipRequestForResponse(req)
|
||||
if p.opt.IsLive() {
|
||||
p.channel.status.Store(0)
|
||||
// defer p.channel.TryAutoInvite(p.opt)
|
||||
}
|
||||
if p.opt.recyclePort != nil {
|
||||
p.opt.recyclePort(p.opt.MediaPort)
|
||||
@@ -48,6 +52,62 @@ func (p *PullStream) Bye() int {
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
func (p *PullStream) info(body string) int {
|
||||
d := p.channel.device
|
||||
req := p.CreateRequest(sip.INFO)
|
||||
contentType := sip.ContentType("Application/MANSRTSP")
|
||||
req.AppendHeader(&contentType)
|
||||
req.SetBody(body, true)
|
||||
|
||||
resp, err := d.SipRequestForResponse(req)
|
||||
if err != nil {
|
||||
log.Warnf("Send info to stream error: %v, stream=%s, body=%s", err, p.opt.StreamPath, body)
|
||||
return getSipRespErrorCode(err)
|
||||
}
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
// 暂停播放
|
||||
func (p *PullStream) Pause() int {
|
||||
body := fmt.Sprintf(`PAUSE RTSP/1.0
|
||||
CSeq: %d
|
||||
PauseTime: now
|
||||
`, p.channel.device.sn)
|
||||
return p.info(body)
|
||||
}
|
||||
|
||||
// 恢复播放
|
||||
func (p *PullStream) Resume() int {
|
||||
d := p.channel.device
|
||||
body := fmt.Sprintf(`PLAY RTSP/1.0
|
||||
CSeq: %d
|
||||
Range: npt=now-
|
||||
`, d.sn)
|
||||
return p.info(body)
|
||||
}
|
||||
|
||||
// 跳转到播放时间
|
||||
// second: 相对于起始点调整到第 sec 秒播放
|
||||
func (p *PullStream) PlayAt(second uint) int {
|
||||
d := p.channel.device
|
||||
body := fmt.Sprintf(`PLAY RTSP/1.0
|
||||
CSeq: %d
|
||||
Range: npt=%d-
|
||||
`, d.sn, second)
|
||||
return p.info(body)
|
||||
}
|
||||
|
||||
// 快进/快退播放
|
||||
// speed 取值: 0.25 0.5 1 2 4 或者其对应的负数表示倒放
|
||||
func (p *PullStream) PlayForward(speed float32) int {
|
||||
d := p.channel.device
|
||||
body := fmt.Sprintf(`PLAY RTSP/1.0
|
||||
CSeq: %d
|
||||
Scale: %0.6f
|
||||
`, d.sn, speed)
|
||||
return p.info(body)
|
||||
}
|
||||
|
||||
type Channel struct {
|
||||
device *Device // 所属设备
|
||||
status atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放
|
||||
@@ -74,9 +134,16 @@ type ChannelInfo struct {
|
||||
SafetyWay int
|
||||
RegisterWay int
|
||||
Secrecy int
|
||||
Status string
|
||||
Status ChannelStatus
|
||||
}
|
||||
|
||||
type ChannelStatus string
|
||||
|
||||
const (
|
||||
ChannelOnStatus = "ON"
|
||||
ChannelOffStatus = "OFF"
|
||||
)
|
||||
|
||||
func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
|
||||
d := channel.device
|
||||
d.sn++
|
||||
@@ -142,16 +209,19 @@ func (channel *Channel) QueryRecord(startTime, endTime string) ([]*Record, error
|
||||
request := d.CreateRequest(sip.MESSAGE)
|
||||
contentType := sip.ContentType("Application/MANSCDP+xml")
|
||||
request.AppendHeader(&contentType)
|
||||
body := fmt.Sprintf(`<?xml version="1.0"?>
|
||||
<Query>
|
||||
<CmdType>RecordInfo</CmdType>
|
||||
<SN>%d</SN>
|
||||
<DeviceID>%s</DeviceID>
|
||||
<StartTime>%s</StartTime>
|
||||
<EndTime>%s</EndTime>
|
||||
<Secrecy>0</Secrecy>
|
||||
<Type>all</Type>
|
||||
</Query>`, d.sn, channel.DeviceID, startTime, endTime)
|
||||
// body := fmt.Sprintf(`<?xml version="1.0"?>
|
||||
// <Query>
|
||||
// <CmdType>RecordInfo</CmdType>
|
||||
// <SN>%d</SN>
|
||||
// <DeviceID>%s</DeviceID>
|
||||
// <StartTime>%s</StartTime>
|
||||
// <EndTime>%s</EndTime>
|
||||
// <Secrecy>0</Secrecy>
|
||||
// <Type>all</Type>
|
||||
// </Query>`, d.sn, channel.DeviceID, startTime, endTime)
|
||||
start, _ := strconv.ParseInt(startTime, 10, 0)
|
||||
end, _ := strconv.ParseInt(endTime, 10, 0)
|
||||
body := BuildRecordInfoXML(d.sn, channel.DeviceID, start, end)
|
||||
request.SetBody(body, true)
|
||||
|
||||
resultCh := RecordQueryLink.WaitResult(d.ID, channel.DeviceID, d.sn, QUERY_RECORD_TIMEOUT)
|
||||
@@ -265,26 +335,28 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
}
|
||||
if opt.StreamPath != "" {
|
||||
streamPath = opt.StreamPath
|
||||
} else {
|
||||
opt.StreamPath = streamPath
|
||||
}
|
||||
if opt.dump == "" {
|
||||
opt.dump = conf.DumpPath
|
||||
}
|
||||
protocol := ""
|
||||
networkType := "udp"
|
||||
resuePort := true
|
||||
reusePort := true
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
networkType = "tcp"
|
||||
protocol = "TCP/"
|
||||
if conf.tcpPorts.Valid {
|
||||
opt.MediaPort, err = conf.tcpPorts.GetPort()
|
||||
opt.recyclePort = conf.tcpPorts.Recycle
|
||||
resuePort = false
|
||||
reusePort = false
|
||||
}
|
||||
} else {
|
||||
if conf.udpPorts.Valid {
|
||||
opt.MediaPort, err = conf.udpPorts.GetPort()
|
||||
opt.recyclePort = conf.udpPorts.Recycle
|
||||
resuePort = false
|
||||
reusePort = false
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
@@ -337,11 +409,20 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
} else {
|
||||
channel.Error("read invite response y ", zap.Error(err))
|
||||
}
|
||||
break
|
||||
// break
|
||||
}
|
||||
if ls[0] == "m" && len(ls[1]) > 0 {
|
||||
netinfo := strings.Split(ls[1], " ")
|
||||
if strings.ToUpper(netinfo[2]) == "TCP/RTP/AVP" {
|
||||
channel.Debug("Device support tcp")
|
||||
} else {
|
||||
channel.Debug("Device not support tcp")
|
||||
networkType = "udp"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
err = ps.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, resuePort)
|
||||
err = ps.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, reusePort)
|
||||
if err == nil {
|
||||
PullStreams.Store(streamPath, &PullStream{
|
||||
opt: opt,
|
||||
@@ -369,6 +450,49 @@ func (channel *Channel) Bye(streamPath string) int {
|
||||
return http.StatusNotFound
|
||||
}
|
||||
|
||||
func (channel *Channel) Pause(streamPath string) int {
|
||||
if s, loaded := PullStreams.Load(streamPath); loaded {
|
||||
r := s.(*PullStream).Pause()
|
||||
if s := Streams.Get(streamPath); s != nil {
|
||||
s.Pause()
|
||||
}
|
||||
return r
|
||||
}
|
||||
return http.StatusNotFound
|
||||
}
|
||||
|
||||
func (channel *Channel) Resume(streamPath string) int {
|
||||
if s, loaded := PullStreams.Load(streamPath); loaded {
|
||||
r := s.(*PullStream).Resume()
|
||||
if s := Streams.Get(streamPath); s != nil {
|
||||
s.Resume()
|
||||
}
|
||||
return r
|
||||
}
|
||||
return http.StatusNotFound
|
||||
}
|
||||
|
||||
func (channel *Channel) PlayAt(streamPath string, second uint) int {
|
||||
if s, loaded := PullStreams.Load(streamPath); loaded {
|
||||
r := s.(*PullStream).PlayAt(second)
|
||||
if s := Streams.Get(streamPath); s != nil {
|
||||
s.Resume()
|
||||
}
|
||||
return r
|
||||
}
|
||||
return http.StatusNotFound
|
||||
}
|
||||
|
||||
func (channel *Channel) PlayForward(streamPath string, speed float32) int {
|
||||
if s, loaded := PullStreams.Load(streamPath); loaded {
|
||||
return s.(*PullStream).PlayForward(speed)
|
||||
}
|
||||
if s := Streams.Get(streamPath); s != nil {
|
||||
s.Resume()
|
||||
}
|
||||
return http.StatusNotFound
|
||||
}
|
||||
|
||||
func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
|
||||
if channel.CanInvite() {
|
||||
go channel.Invite(opt)
|
||||
@@ -376,7 +500,7 @@ func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
|
||||
}
|
||||
|
||||
func (channel *Channel) CanInvite() bool {
|
||||
if channel.status.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
|
||||
if channel.status.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == ChannelOffStatus {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -403,3 +527,11 @@ func (channel *Channel) CanInvite() bool {
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func getSipRespErrorCode(err error) int {
|
||||
if re, ok := err.(*sip.RequestError); ok {
|
||||
return int(re.Code)
|
||||
} else {
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
}
|
||||
|
||||
28
device.go
28
device.go
@@ -44,6 +44,16 @@ var (
|
||||
DeviceRegisterCount sync.Map //设备注册次数
|
||||
)
|
||||
|
||||
type DeviceStatus string
|
||||
|
||||
const (
|
||||
DeviceRegisterStatus = "REGISTER"
|
||||
DeviceRecoverStatus = "RECOVER"
|
||||
DeviceOnlineStatus = "ONLINE"
|
||||
DeviceOfflineStatus = "OFFLINE"
|
||||
DeviceAlarmedStatus = "ALARMED"
|
||||
)
|
||||
|
||||
type Device struct {
|
||||
//*transaction.Core `json:"-" yaml:"-"`
|
||||
ID string
|
||||
@@ -54,7 +64,7 @@ type Device struct {
|
||||
RegisterTime time.Time
|
||||
UpdateTime time.Time
|
||||
LastKeepaliveAt time.Time
|
||||
Status string
|
||||
Status DeviceStatus
|
||||
sn int
|
||||
addr sip.Address
|
||||
sipIP string //设备对应网卡的服务器ip
|
||||
@@ -114,7 +124,7 @@ func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
mediaIp = c.MediaIP
|
||||
}
|
||||
d.Info("RecoverDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
|
||||
d.Status = string(sip.REGISTER)
|
||||
d.Status = DeviceRegisterStatus
|
||||
d.sipIP = sipIP
|
||||
d.mediaIP = mediaIp
|
||||
d.NetAddr = deviceIp
|
||||
@@ -158,7 +168,7 @@ func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
|
||||
ID: id,
|
||||
RegisterTime: time.Now(),
|
||||
UpdateTime: time.Now(),
|
||||
Status: string(sip.REGISTER),
|
||||
Status: DeviceRegisterStatus,
|
||||
addr: deviceAddr,
|
||||
sipIP: sipIP,
|
||||
mediaIP: mediaIp,
|
||||
@@ -481,7 +491,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
SafetyWay: v.SafetyWay,
|
||||
RegisterWay: v.RegisterWay,
|
||||
Secrecy: v.Secrecy,
|
||||
Status: v.Status,
|
||||
Status: ChannelStatus(v.Status),
|
||||
}
|
||||
d.addOrUpdateChannel(channel)
|
||||
case "DEL":
|
||||
@@ -505,7 +515,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
SafetyWay: v.SafetyWay,
|
||||
RegisterWay: v.RegisterWay,
|
||||
Secrecy: v.Secrecy,
|
||||
Status: v.Status,
|
||||
Status: ChannelStatus(v.Status),
|
||||
}
|
||||
d.UpdateChannels(channel)
|
||||
}
|
||||
@@ -515,8 +525,8 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
func (d *Device) channelOnline(DeviceID string) {
|
||||
if v, ok := d.channelMap.Load(DeviceID); ok {
|
||||
c := v.(*Channel)
|
||||
c.Status = "ON"
|
||||
c.Debug("online")
|
||||
c.Status = ChannelOnStatus
|
||||
c.Debug("channel online", zap.String("channelId", DeviceID))
|
||||
} else {
|
||||
d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
|
||||
}
|
||||
@@ -525,8 +535,8 @@ func (d *Device) channelOnline(DeviceID string) {
|
||||
func (d *Device) channelOffline(DeviceID string) {
|
||||
if v, ok := d.channelMap.Load(DeviceID); ok {
|
||||
c := v.(*Channel)
|
||||
c.Status = "OFF"
|
||||
c.Debug("offline")
|
||||
c.Status = ChannelOffStatus
|
||||
c.Debug("channel offline", zap.String("channelId", DeviceID))
|
||||
} else {
|
||||
d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
|
||||
}
|
||||
|
||||
73
handle.go
73
handle.go
@@ -5,6 +5,7 @@ import (
|
||||
"crypto/md5"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -58,7 +59,39 @@ func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
|
||||
return
|
||||
}
|
||||
id := from.Address.User().String()
|
||||
GB28181Plugin.Info("OnRegister", zap.String("id", id), zap.String("source", req.Source()), zap.String("destination", req.Destination()))
|
||||
|
||||
GB28181Plugin.Debug("SIP<-OnMessage", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
|
||||
|
||||
isUnregister := false
|
||||
if exps := req.GetHeaders("Expires"); len(exps) > 0 {
|
||||
exp := exps[0]
|
||||
expSec, err := strconv.ParseInt(exp.Value(), 10, 32)
|
||||
if err != nil {
|
||||
GB28181Plugin.Info("OnRegister",
|
||||
zap.String("error", fmt.Sprintf("wrong expire header value %q", exp)),
|
||||
zap.String("id", id),
|
||||
zap.String("source", req.Source()),
|
||||
zap.String("destination", req.Destination()))
|
||||
return
|
||||
}
|
||||
if expSec == 0 {
|
||||
isUnregister = true
|
||||
}
|
||||
} else {
|
||||
GB28181Plugin.Info("OnRegister",
|
||||
zap.String("error", "has no expire header"),
|
||||
zap.String("id", id),
|
||||
zap.String("source", req.Source()),
|
||||
zap.String("destination", req.Destination()))
|
||||
return
|
||||
}
|
||||
|
||||
GB28181Plugin.Info("OnRegister",
|
||||
zap.Bool("isUnregister", isUnregister),
|
||||
zap.String("id", id),
|
||||
zap.String("source", req.Source()),
|
||||
zap.String("destination", req.Destination()))
|
||||
|
||||
if len(id) != 20 {
|
||||
GB28181Plugin.Info("Wrong GB-28181", zap.String("id", id))
|
||||
return
|
||||
@@ -98,11 +131,21 @@ func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
|
||||
}
|
||||
if passAuth {
|
||||
var d *Device
|
||||
if v, ok := Devices.Load(id); ok {
|
||||
d = v.(*Device)
|
||||
c.RecoverDevice(d, req)
|
||||
if isUnregister {
|
||||
tmpd, ok := Devices.LoadAndDelete(id)
|
||||
if ok {
|
||||
GB28181Plugin.Info("Unregister Device", zap.String("id", id))
|
||||
d = tmpd.(*Device)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
d = c.StoreDevice(id, req)
|
||||
if v, ok := Devices.Load(id); ok {
|
||||
d = v.(*Device)
|
||||
c.RecoverDevice(d, req)
|
||||
} else {
|
||||
d = c.StoreDevice(id, req)
|
||||
}
|
||||
}
|
||||
DeviceNonce.Delete(id)
|
||||
DeviceRegisterCount.Delete(id)
|
||||
@@ -117,9 +160,14 @@ func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
|
||||
Contents: time.Now().Format(TIME_LAYOUT),
|
||||
})
|
||||
_ = tx.Respond(resp)
|
||||
//订阅设备更新
|
||||
go d.syncChannels()
|
||||
|
||||
if !isUnregister {
|
||||
//订阅设备更新
|
||||
go d.syncChannels()
|
||||
}
|
||||
} else {
|
||||
GB28181Plugin.Info("OnRegister unauthorized", zap.String("id", id), zap.String("source", req.Source()),
|
||||
zap.String("destination", req.Destination()))
|
||||
response := sip.NewResponseFromRequest("", req, http.StatusUnauthorized, "Unauthorized", "")
|
||||
_nonce, _ := DeviceNonce.LoadOrStore(id, utils.RandNumString(32))
|
||||
auth := fmt.Sprintf(
|
||||
@@ -154,12 +202,11 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
|
||||
if v, ok := Devices.Load(id); ok {
|
||||
d := v.(*Device)
|
||||
switch d.Status {
|
||||
case "RECOVER":
|
||||
case DeviceOfflineStatus, DeviceRecoverStatus:
|
||||
c.RecoverDevice(d, req)
|
||||
go d.syncChannels()
|
||||
//return
|
||||
case string(sip.REGISTER):
|
||||
d.Status = "ONLINE"
|
||||
case DeviceRegisterStatus:
|
||||
d.Status = DeviceOnlineStatus
|
||||
}
|
||||
d.UpdateTime = time.Now()
|
||||
temp := &struct {
|
||||
@@ -214,7 +261,7 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
|
||||
d.Manufacturer = temp.Manufacturer
|
||||
d.Model = temp.Model
|
||||
case "Alarm":
|
||||
d.Status = "Alarmed"
|
||||
d.Status = DeviceAlarmedStatus
|
||||
body = BuildAlarmResponseXML(d.ID)
|
||||
default:
|
||||
d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
|
||||
@@ -224,6 +271,8 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
|
||||
}
|
||||
|
||||
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", body))
|
||||
} else {
|
||||
GB28181Plugin.Debug("Unauthorized message, device not found", zap.String("id", id))
|
||||
}
|
||||
}
|
||||
func (c *GB28181Config) OnBye(req sip.Request, tx sip.ServerTransaction) {
|
||||
|
||||
25
link.go
25
link.go
@@ -39,7 +39,6 @@ func NewRecordQueryLink(resultTimeout time.Duration) *recordQueryLink {
|
||||
pendingResult: make(map[string]recordQueryResult),
|
||||
pendingResp: make(map[string]recordQueryResp),
|
||||
}
|
||||
go c.cleanTimeout()
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -50,22 +49,18 @@ func recordQueryKey(deviceId, channelId string, sn int) string {
|
||||
|
||||
// 定期清理过期的查询结果和请求
|
||||
func (c *recordQueryLink) cleanTimeout() {
|
||||
tick := time.NewTicker(time.Millisecond * 100)
|
||||
for {
|
||||
<-tick.C
|
||||
for k, s := range c.pendingResp {
|
||||
if time.Since(s.startTime) > s.timeout {
|
||||
if r, ok := c.pendingResult[k]; ok {
|
||||
c.notify(k, r)
|
||||
} else {
|
||||
c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
|
||||
}
|
||||
for k, s := range c.pendingResp {
|
||||
if time.Since(s.startTime) > s.timeout {
|
||||
if r, ok := c.pendingResult[k]; ok {
|
||||
c.notify(k, r)
|
||||
} else {
|
||||
c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
|
||||
}
|
||||
}
|
||||
for k, r := range c.pendingResult {
|
||||
if time.Since(r.time) > c.timeout {
|
||||
delete(c.pendingResult, k)
|
||||
}
|
||||
}
|
||||
for k, r := range c.pendingResult {
|
||||
if time.Since(r.time) > c.timeout {
|
||||
delete(c.pendingResult, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
29
main.go
29
main.go
@@ -32,10 +32,10 @@ type GB28181Config struct {
|
||||
Password string //sip 服务器密码
|
||||
Port struct { // 新配置方式
|
||||
Sip string `default:"udp:5060"`
|
||||
Media string `default:"tcp:58200"`
|
||||
Media string `default:"tcp:58200-59200"`
|
||||
}
|
||||
// AckTimeout uint16 //sip 服务应答超时,单位秒
|
||||
RegisterValidity time.Duration `default:"60s"` //注册有效期,单位秒,默认 3600
|
||||
RegisterValidity time.Duration `default:"3600s"` //注册有效期,单位秒,默认 3600
|
||||
// RegisterInterval int //注册间隔,单位秒,默认 60
|
||||
HeartbeatInterval time.Duration `default:"60s"` //心跳间隔,单位秒,默认 60
|
||||
// HeartbeatRetry int //心跳超时次数,默认 3
|
||||
@@ -44,8 +44,8 @@ type GB28181Config struct {
|
||||
MediaIP string //媒体服务器地址
|
||||
MediaPort uint16 `default:"58200"` //媒体服务器端口
|
||||
MediaNetwork string `default:"tcp"` //媒体传输协议,默认UDP,可选TCP
|
||||
MediaPortMin uint16
|
||||
MediaPortMax uint16
|
||||
MediaPortMin uint16 `default:"58200"`
|
||||
MediaPortMax uint16 `default:"59200"`
|
||||
// MediaIdleTimeout uint16 //推流超时时间,超过则断开链接,让设备重连
|
||||
|
||||
// WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
|
||||
@@ -82,13 +82,15 @@ func (c *GB28181Config) OnEvent(event any) {
|
||||
c.SipNetwork = protocol
|
||||
c.SipPort = ports[0]
|
||||
}
|
||||
if c.Port.Media != "tcp:58200" {
|
||||
if c.Port.Media != "tcp:58200-59200" {
|
||||
protocol, ports := util.Conf2Listener(c.Port.Media)
|
||||
c.MediaNetwork = protocol
|
||||
if len(ports) > 1 {
|
||||
c.MediaPortMin = ports[0]
|
||||
c.MediaPortMax = ports[1]
|
||||
} else {
|
||||
c.MediaPortMin = 0
|
||||
c.MediaPortMax = 0
|
||||
c.MediaPort = ports[0]
|
||||
}
|
||||
}
|
||||
@@ -98,8 +100,21 @@ func (c *GB28181Config) OnEvent(event any) {
|
||||
c.startServer()
|
||||
case *Stream:
|
||||
if c.InviteMode == INVIDE_MODE_ONSUBSCRIBE {
|
||||
if channel := FindChannel(e.AppName, e.StreamName); channel != nil {
|
||||
channel.TryAutoInvite(&InviteOptions{})
|
||||
//流可能是回放流,stream path是device/channel/start-end形式
|
||||
streamNames := strings.Split(e.StreamName, "/")
|
||||
if channel := FindChannel(e.AppName, streamNames[0]); channel != nil {
|
||||
opt := InviteOptions{}
|
||||
if len(streamNames) > 1 {
|
||||
last := len(streamNames) - 1
|
||||
timestr := streamNames[last]
|
||||
trange := strings.Split(timestr, "-")
|
||||
if len(trange) == 2 {
|
||||
startTime := trange[0]
|
||||
endTime := trange[1]
|
||||
opt.Validate(startTime, endTime)
|
||||
}
|
||||
}
|
||||
channel.TryAutoInvite(&opt)
|
||||
}
|
||||
}
|
||||
case SEpublish:
|
||||
|
||||
14
manscdp.go
14
manscdp.go
@@ -2,6 +2,7 @@ package gb28181
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -43,6 +44,17 @@ var (
|
||||
</Query>`
|
||||
)
|
||||
|
||||
func intTotime(t int64) time.Time {
|
||||
tstr := strconv.FormatInt(t, 10)
|
||||
if len(tstr) == 10 {
|
||||
return time.Unix(t, 0)
|
||||
}
|
||||
if len(tstr) == 13 {
|
||||
return time.UnixMilli(t)
|
||||
}
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
// BuildDeviceInfoXML 获取设备详情指令
|
||||
func BuildDeviceInfoXML(sn int, id string) string {
|
||||
return fmt.Sprintf(DeviceInfoXML, sn, id)
|
||||
@@ -55,7 +67,7 @@ func BuildCatalogXML(sn int, id string) string {
|
||||
|
||||
// BuildRecordInfoXML 获取录像文件列表指令
|
||||
func BuildRecordInfoXML(sn int, id string, start, end int64) string {
|
||||
return fmt.Sprintf(RecordInfoXML, sn, id, time.Unix(start, 0).Format("2006-01-02T15:04:05"), time.Unix(end, 0).Format("2006-01-02T15:04:05"))
|
||||
return fmt.Sprintf(RecordInfoXML, sn, id, intTotime(start).Format("2006-01-02T15:04:05"), intTotime(end).Format("2006-01-02T15:04:05"))
|
||||
}
|
||||
|
||||
// BuildDevicePositionXML 订阅设备位置
|
||||
|
||||
72
restful.go
72
restful.go
@@ -11,16 +11,15 @@ import (
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
var (
|
||||
playScaleValues = map[float32]bool{0.25: true, 0.5: true, 1: true, 2: true, 4: true}
|
||||
)
|
||||
|
||||
func (c *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
|
||||
util.ReturnJson(func() (list []*Device) {
|
||||
list = make([]*Device, 0)
|
||||
Devices.Range(func(key, value interface{}) bool {
|
||||
device := value.(*Device)
|
||||
if time.Since(device.UpdateTime) > c.RegisterValidity {
|
||||
Devices.Delete(key)
|
||||
} else {
|
||||
list = append(list, device)
|
||||
}
|
||||
list = append(list, value.(*Device))
|
||||
return true
|
||||
})
|
||||
return
|
||||
@@ -73,19 +72,23 @@ func (c *GB28181Config) API_ptz(w http.ResponseWriter, r *http.Request) {
|
||||
hsN, err := strconv.ParseUint(hs, 10, 8)
|
||||
if err != nil {
|
||||
WriteJSON(w, "hSpeed parameter is invalid", 400)
|
||||
return
|
||||
}
|
||||
vsN, err := strconv.ParseUint(vs, 10, 8)
|
||||
if err != nil {
|
||||
WriteJSON(w, "vSpeed parameter is invalid", 400)
|
||||
return
|
||||
}
|
||||
zsN, err := strconv.ParseUint(zs, 10, 8)
|
||||
if err != nil {
|
||||
WriteJSON(w, "zSpeed parameter is invalid", 400)
|
||||
return
|
||||
}
|
||||
|
||||
ptzcmd, err := toPtzStrByCmdName(cmd, uint8(hsN), uint8(vsN), uint8(zsN))
|
||||
if err != nil {
|
||||
WriteJSON(w, err.Error(), 400)
|
||||
return
|
||||
}
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
code := c.Control(ptzcmd)
|
||||
@@ -136,6 +139,63 @@ func (c *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) API_play_pause(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
streamPath := r.URL.Query().Get("streamPath")
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.Pause(streamPath))
|
||||
} else {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) API_play_resume(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
streamPath := r.URL.Query().Get("streamPath")
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.Resume(streamPath))
|
||||
} else {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) API_play_seek(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
streamPath := r.URL.Query().Get("streamPath")
|
||||
secStr := r.URL.Query().Get("second")
|
||||
sec, err := strconv.ParseUint(secStr, 10, 32)
|
||||
if err != nil {
|
||||
WriteJSON(w, "second parameter is invalid: "+err.Error(), 400)
|
||||
return
|
||||
}
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.PlayAt(streamPath, uint(sec)))
|
||||
} else {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) API_play_forward(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
streamPath := r.URL.Query().Get("streamPath")
|
||||
speedStr := r.URL.Query().Get("speed")
|
||||
speed, err := strconv.ParseFloat(speedStr, 32)
|
||||
secondErrMsg := "speed parameter is invalid, should be one of 0.25,0.5,1,2,4"
|
||||
if err != nil || !playScaleValues[float32(speed)] {
|
||||
WriteJSON(w, secondErrMsg, 400)
|
||||
return
|
||||
}
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.PlayForward(streamPath, float32(speed)))
|
||||
} else {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) API_position(w http.ResponseWriter, r *http.Request) {
|
||||
//CORS(w, r)
|
||||
query := r.URL.Query()
|
||||
|
||||
65
server.go
65
server.go
@@ -142,10 +142,7 @@ func (c *GB28181Config) startServer() {
|
||||
} else {
|
||||
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
|
||||
}
|
||||
|
||||
if c.Username != "" || c.Password != "" {
|
||||
go c.removeBanDevice()
|
||||
}
|
||||
go c.startJob()
|
||||
}
|
||||
|
||||
// func queryCatalog(config *transaction.Config) {
|
||||
@@ -163,14 +160,58 @@ func (c *GB28181Config) startServer() {
|
||||
// }
|
||||
// }
|
||||
|
||||
func (c *GB28181Config) removeBanDevice() {
|
||||
t := time.NewTicker(c.RemoveBanInterval)
|
||||
for range t.C {
|
||||
DeviceRegisterCount.Range(func(key, value interface{}) bool {
|
||||
if value.(int) > MaxRegisterCount {
|
||||
DeviceRegisterCount.Delete(key)
|
||||
// 定时任务
|
||||
func (c *GB28181Config) startJob() {
|
||||
statusTick := time.NewTicker(c.HeartbeatInterval / 2)
|
||||
banTick := time.NewTicker(c.RemoveBanInterval)
|
||||
linkTick := time.NewTicker(time.Millisecond * 100)
|
||||
GB28181Plugin.Debug("start job")
|
||||
for {
|
||||
select {
|
||||
case <-banTick.C:
|
||||
if c.Username != "" || c.Password != "" {
|
||||
c.removeBanDevice()
|
||||
}
|
||||
return true
|
||||
})
|
||||
case <-statusTick.C:
|
||||
c.statusCheck()
|
||||
case <-linkTick.C:
|
||||
RecordQueryLink.cleanTimeout()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) removeBanDevice() {
|
||||
DeviceRegisterCount.Range(func(key, value interface{}) bool {
|
||||
if value.(int) > MaxRegisterCount {
|
||||
DeviceRegisterCount.Delete(key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// statusCheck
|
||||
// - 当设备超过 3 倍心跳时间未发送过心跳(通过 UpdateTime 判断), 视为离线
|
||||
// - 当设备超过注册有效期内为发送过消息,则从设备列表中删除
|
||||
// UpdateTime 在设备发送心跳之外的消息也会被更新,相对于 LastKeepaliveAt 更能体现出设备最会一次活跃的时间
|
||||
func (c *GB28181Config) statusCheck() {
|
||||
Devices.Range(func(key, value any) bool {
|
||||
d := value.(*Device)
|
||||
if time.Since(d.UpdateTime) > c.RegisterValidity {
|
||||
Devices.Delete(key)
|
||||
GB28181Plugin.Info("Device register timeout",
|
||||
zap.String("id", d.ID),
|
||||
zap.Time("registerTime", d.RegisterTime),
|
||||
zap.Time("updateTime", d.UpdateTime),
|
||||
)
|
||||
} else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 {
|
||||
d.Status = DeviceOfflineStatus
|
||||
d.channelMap.Range(func(key, value any) bool {
|
||||
ch := value.(*Channel)
|
||||
ch.Status = ChannelOffStatus
|
||||
return true
|
||||
})
|
||||
GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user