mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3046bcde3 | ||
|
|
9c970ad282 | ||
|
|
c4de92e9f6 | ||
|
|
cf5a803971 | ||
|
|
f487be5fdb | ||
|
|
bd70d24a16 | ||
|
|
708cd042df | ||
|
|
a69b739e5e | ||
|
|
4e96efa9ff | ||
|
|
3a704b68cc | ||
|
|
c8f51a7ec5 | ||
|
|
b7bad99292 | ||
|
|
7b6b827899 | ||
|
|
d121927c96 | ||
|
|
9a3ad6a51c | ||
|
|
e0c6fbefcd | ||
|
|
b924977085 | ||
|
|
521ee36769 | ||
|
|
583754ea82 | ||
|
|
58b6a818bd | ||
|
|
8b1b176f51 | ||
|
|
68d6cbaab9 |
75
README.md
75
README.md
@@ -19,6 +19,10 @@ _ "m7s.live/plugin/gb28181/v4"
|
||||
```yaml
|
||||
gb28181:
|
||||
autoinvite: true
|
||||
position:
|
||||
autosubposition: false #是否自动订阅定位
|
||||
expires: 3600s #订阅周期(单位:秒),默认3600
|
||||
interval: 6s #订阅间隔(单位:秒),默认6
|
||||
prefetchrecord: false
|
||||
udpcachesize: 0
|
||||
sipnetwork: udp
|
||||
@@ -29,11 +33,7 @@ gb28181:
|
||||
username: ""
|
||||
password: ""
|
||||
|
||||
acktimeout: 10
|
||||
registervalidity: 60
|
||||
registerinterval: 60
|
||||
heartbeatinterval: 60
|
||||
heartbeatretry: 3
|
||||
registervalidity: 60s
|
||||
|
||||
mediaip: ""
|
||||
mediaport: 58200
|
||||
@@ -42,7 +42,7 @@ gb28181:
|
||||
mediaportmin: 0
|
||||
meidaportmax: 0
|
||||
|
||||
removebaninterval: 600
|
||||
removebaninterval: 10m
|
||||
loglevel: info
|
||||
```
|
||||
|
||||
@@ -58,11 +58,7 @@ gb28181:
|
||||
- `Username` string sip 服务器账号
|
||||
- `Password` string sip 服务器密码
|
||||
|
||||
- `AckTimeout` uint16 sip 服务应答超时,单位秒
|
||||
- `RegisterValidity` int 注册有效期,单位秒,默认 3600
|
||||
- `RegisterInterval` int 注册间隔,单位秒,默认 60
|
||||
- `HeartbeatInterval` int 心跳间隔,单位秒,默认 60
|
||||
- `HeartbeatRetry` int 心跳超时次数,默认 3
|
||||
- `RegisterValidity` time.Duration 注册有效期,单位秒,默认 60
|
||||
|
||||
* 媒体服务器配置
|
||||
- `MediaIP` string 媒体服务器地址 默认 自动适配设备网段
|
||||
@@ -71,9 +67,8 @@ gb28181:
|
||||
- `MediaIdleTimeout` uint16 推流超时时间,超过则断开链接,让设备重连
|
||||
- `MediaPortMin` uint16 媒体服务器端口范围最小值
|
||||
- `MediaPortMax` uint16 媒体服务器端口范围最大值
|
||||
- `AudioEnable` bool 是否开启音频
|
||||
- `LogLevel` string 日志级别,默认 info(trace,debug,info,warn,error,fatal, panic)
|
||||
- `RemoveBanInterval` int 定时移除注册失败的设备黑名单,单位秒,默认10分钟(600秒)
|
||||
- `LogLevel` string 日志级别,默认 info(trace,debug,info,warn,error,fatal, panic)
|
||||
- `RemoveBanInterval` time.Duration 定时移除注册失败的设备黑名单,单位秒,默认10分钟(600秒)
|
||||
- `UdpCacheSize` int 表示UDP缓存大小,默认为0,不开启。仅当TCP关闭,切缓存大于0时才开启,会最多缓存最多N个包,并排序,修复乱序造成的无法播放问题,注意开启后,会有一定的性能损耗,并丢失部分包。
|
||||
|
||||
**如果配置了端口范围,将采用范围端口机制,每一个流对应一个端口
|
||||
@@ -130,12 +125,12 @@ type Device struct {
|
||||
|
||||
`/gb28181/api/invite`
|
||||
|
||||
参数名 | 必传 | 含义
|
||||
|----|---|---
|
||||
id|是 | 设备ID
|
||||
channel|是|通道编号
|
||||
startTime|否|开始时间(纯数字Unix时间戳)
|
||||
endTime|否|结束时间(纯数字Unix时间戳)
|
||||
| 参数名 | 必传 | 含义 |
|
||||
| --------- | ---- | ---------------------------- |
|
||||
| id | 是 | 设备ID |
|
||||
| channel | 是 | 通道编号 |
|
||||
| startTime | 否 | 开始时间(纯数字Unix时间戳) |
|
||||
| endTime | 否 | 结束时间(纯数字Unix时间戳) |
|
||||
|
||||
返回200代表成功
|
||||
|
||||
@@ -143,38 +138,38 @@ endTime|否|结束时间(纯数字Unix时间戳)
|
||||
|
||||
`/gb28181/api/bye`
|
||||
|
||||
参数名 | 必传 | 含义
|
||||
|----|---|---
|
||||
id|是 | 设备ID
|
||||
channel|是|通道编号
|
||||
| 参数名 | 必传 | 含义 |
|
||||
| ------- | ---- | -------- |
|
||||
| id | 是 | 设备ID |
|
||||
| channel | 是 | 通道编号 |
|
||||
|
||||
### 发送控制命令
|
||||
|
||||
`/gb28181/api/control`
|
||||
|
||||
参数名 | 必传 | 含义
|
||||
|----|---|---
|
||||
id|是 | 设备ID
|
||||
channel|是|通道编号
|
||||
ptzcmd|是|PTZ控制指令
|
||||
| 参数名 | 必传 | 含义 |
|
||||
| ------- | ---- | ----------- |
|
||||
| id | 是 | 设备ID |
|
||||
| channel | 是 | 通道编号 |
|
||||
| ptzcmd | 是 | PTZ控制指令 |
|
||||
|
||||
### 查询录像
|
||||
|
||||
`/gb28181/api/records`
|
||||
|
||||
参数名 | 必传 | 含义
|
||||
|----|---|---
|
||||
id|是 | 设备ID
|
||||
channel|是|通道编号
|
||||
startTime|否|开始时间(字符串,格式:2021-7-23T12:00:00)
|
||||
endTime|否|结束时间(字符串格式同上)
|
||||
| 参数名 | 必传 | 含义 |
|
||||
| --------- | ---- | -------------------------------------------- |
|
||||
| id | 是 | 设备ID |
|
||||
| channel | 是 | 通道编号 |
|
||||
| startTime | 否 | 开始时间(字符串,格式:2021-7-23T12:00:00) |
|
||||
| endTime | 否 | 结束时间(字符串格式同上) |
|
||||
|
||||
### 移动位置订阅
|
||||
|
||||
`/gb28181/api/position`
|
||||
|
||||
参数名 | 必传 | 含义
|
||||
|----|---|---
|
||||
id|是 | 设备ID
|
||||
expires|是|订阅周期(秒)
|
||||
interval|是|订阅间隔(秒)
|
||||
| 参数名 | 必传 | 含义 |
|
||||
| -------- | ---- | -------------- |
|
||||
| id | 是 | 设备ID |
|
||||
| expires | 是 | 订阅周期(秒) |
|
||||
| interval | 是 | 订阅间隔(秒) |
|
||||
|
||||
172
channel.go
172
channel.go
@@ -1,9 +1,7 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -26,7 +24,7 @@ type ChannelEx struct {
|
||||
RecordEndTime string
|
||||
recordStartTime time.Time
|
||||
recordEndTime time.Time
|
||||
liveInviteLock sync.Mutex
|
||||
liveInviteLock *sync.Mutex
|
||||
tcpPortIndex uint16
|
||||
GpsTime time.Time //gps时间
|
||||
Longitude string //经度
|
||||
@@ -50,37 +48,16 @@ type Channel struct {
|
||||
Secrecy int
|
||||
Status string
|
||||
Children []*Channel `json:"-"`
|
||||
*ChannelEx //自定义属性
|
||||
ChannelEx //自定义属性
|
||||
}
|
||||
|
||||
func (c *Channel) Copy(v *Channel) {
|
||||
if v == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.DeviceID = v.DeviceID
|
||||
c.ParentID = v.ParentID
|
||||
c.Name = v.Name
|
||||
c.Manufacturer = v.Manufacturer
|
||||
c.Model = v.Model
|
||||
c.Owner = v.Owner
|
||||
c.CivilCode = v.CivilCode
|
||||
c.Address = v.Address
|
||||
c.Parental = v.Parental
|
||||
c.SafetyWay = v.SafetyWay
|
||||
c.RegisterWay = v.RegisterWay
|
||||
c.Secrecy = v.Secrecy
|
||||
c.Status = v.Status
|
||||
c.Status = v.Status
|
||||
c.ChannelEx = v.ChannelEx
|
||||
}
|
||||
|
||||
func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
|
||||
d := c.device
|
||||
func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
|
||||
d := channel.device
|
||||
d.sn++
|
||||
|
||||
callId := sip.CallID(utils.RandNumString(10))
|
||||
userAgent := sip.UserAgentHeader("Monibuca")
|
||||
maxForwards := sip.MaxForwards(70) //增加max-forwards为默认值 70
|
||||
cseq := sip.CSeq{
|
||||
SeqNo: uint32(d.sn),
|
||||
MethodName: Method,
|
||||
@@ -97,15 +74,15 @@ func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
|
||||
}
|
||||
//非同一域的目标地址需要使用@host
|
||||
host := conf.Realm
|
||||
if c.DeviceID[0:9] != host {
|
||||
if channel.DeviceID[0:9] != host {
|
||||
deviceIp := d.NetAddr
|
||||
deviceIp = deviceIp[0:strings.LastIndex(deviceIp, ":")]
|
||||
host = fmt.Sprintf("%s:%d", deviceIp, c.Port)
|
||||
host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
|
||||
}
|
||||
|
||||
channelAddr := sip.Address{
|
||||
//DisplayName: sip.String{Str: d.serverConfig.Serial},
|
||||
Uri: &sip.SipUri{FUser: sip.String{Str: c.DeviceID}, FHost: host},
|
||||
Uri: &sip.SipUri{FUser: sip.String{Str: channel.DeviceID}, FHost: host},
|
||||
}
|
||||
req = sip.NewRequest(
|
||||
"",
|
||||
@@ -118,6 +95,7 @@ func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
|
||||
&callId,
|
||||
&userAgent,
|
||||
&cseq,
|
||||
&maxForwards,
|
||||
serverAddr.AsContactHeader(),
|
||||
},
|
||||
"",
|
||||
@@ -175,64 +153,9 @@ func (channel *Channel) Control(PTZCmd string) int {
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
type InviteOptions struct {
|
||||
Start int
|
||||
End int
|
||||
dump string
|
||||
ssrc string
|
||||
SSRC uint32
|
||||
MediaPort uint16
|
||||
}
|
||||
|
||||
func (o InviteOptions) IsLive() bool {
|
||||
return o.Start == 0 || o.End == 0
|
||||
}
|
||||
|
||||
func (o InviteOptions) Record() bool {
|
||||
return !o.IsLive()
|
||||
}
|
||||
|
||||
func (o *InviteOptions) Validate(start, end string) error {
|
||||
if start != "" {
|
||||
sint, err1 := strconv.ParseInt(start, 10, 0)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
o.Start = int(sint)
|
||||
}
|
||||
if end != "" {
|
||||
eint, err2 := strconv.ParseInt(end, 10, 0)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
o.End = int(eint)
|
||||
}
|
||||
if o.Start >= o.End {
|
||||
return errors.New("start < end")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o InviteOptions) String() string {
|
||||
return fmt.Sprintf("t=%d %d", o.Start, o.End)
|
||||
}
|
||||
|
||||
func (o *InviteOptions) CreateSSRC() {
|
||||
ssrc := make([]byte, 10)
|
||||
if o.IsLive() {
|
||||
ssrc[0] = '0'
|
||||
} else {
|
||||
ssrc[0] = '1'
|
||||
}
|
||||
copy(ssrc[1:6], conf.Serial[3:8])
|
||||
randNum := 1000 + rand.Intn(8999)
|
||||
copy(ssrc[6:], strconv.Itoa(randNum))
|
||||
o.ssrc = string(ssrc)
|
||||
_ssrc, _ := strconv.ParseInt(o.ssrc, 10, 0)
|
||||
o.SSRC = uint32(_ssrc)
|
||||
}
|
||||
|
||||
/*
|
||||
// Invite 发送Invite报文 invites a channel to play
|
||||
// 注意里面的锁保证不同时发送invite报文,该锁由channel持有
|
||||
/***
|
||||
f字段: f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
|
||||
各项具体含义:
|
||||
v:后续参数为视频的参数;各参数间以 “/”分割;
|
||||
@@ -276,13 +199,13 @@ f = v/a/编码格式/码率大小/采样率
|
||||
f字段中视、音频参数段之间不需空格分割。
|
||||
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
|
||||
*/
|
||||
func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
if opt.IsLive() {
|
||||
if !channel.liveInviteLock.TryLock() {
|
||||
return 304, nil
|
||||
}
|
||||
defer func() {
|
||||
if code != 200 {
|
||||
if code != OK {
|
||||
channel.liveInviteLock.Unlock()
|
||||
}
|
||||
}()
|
||||
@@ -299,10 +222,6 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
if opt.dump == "" {
|
||||
opt.dump = conf.DumpPath
|
||||
}
|
||||
// size := 1
|
||||
// fps := 15
|
||||
// bitrate := 200
|
||||
// fmt.Sprintf("f=v/2/%d/%d/1/%da///", size, fps, bitrate)
|
||||
publisher := &GBPublisher{
|
||||
InviteOptions: opt,
|
||||
channel: channel,
|
||||
@@ -313,32 +232,24 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
if conf.tcpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenTCP()
|
||||
if err != nil {
|
||||
return 500, err
|
||||
return ServerInternalError, err
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
publisher.DisableReorder = true
|
||||
} else {
|
||||
if conf.udpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenUDP()
|
||||
if err != nil {
|
||||
code = 500
|
||||
code = ServerInternalError
|
||||
return
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
}
|
||||
// if opt.MediaPort == 0 {
|
||||
// opt.MediaPort = conf.MediaPort
|
||||
// if conf.IsMediaNetworkTCP() {
|
||||
// protocol = "TCP/"
|
||||
// opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
|
||||
// if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
|
||||
// channel.tcpPortIndex = 0
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
sdpInfo := []string{
|
||||
"v=0",
|
||||
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.mediaIP),
|
||||
@@ -367,11 +278,13 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
invite.AppendHeader(&subject)
|
||||
publisher.inviteRes, err = d.SipRequestForResponse(invite)
|
||||
if err != nil {
|
||||
return http.StatusRequestTimeout, err
|
||||
plugin.Error(fmt.Sprintf("SIP->Invite %s :%s invite error: %s", channel.DeviceID, invite.String(), err.Error()))
|
||||
return http.StatusInternalServerError, err
|
||||
}
|
||||
code = int(publisher.inviteRes.StatusCode())
|
||||
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
|
||||
if code == 200 {
|
||||
|
||||
if code == OK {
|
||||
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
|
||||
for _, l := range ds {
|
||||
if ls := strings.Split(l, "="); len(ls) > 1 {
|
||||
@@ -389,14 +302,14 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
publisher.udpCache = utils.NewPqRtp()
|
||||
}
|
||||
if err = plugin.Publish(streamPath, publisher); err != nil {
|
||||
code = 403
|
||||
code = ServerInternalError
|
||||
return
|
||||
}
|
||||
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
|
||||
srv.Send(ack)
|
||||
} else if opt.IsLive() && conf.AutoInvite {
|
||||
} else if channel.CanInvite() {
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
channel.Invite(InviteOptions{})
|
||||
channel.TryAutoInvite()
|
||||
})
|
||||
}
|
||||
return
|
||||
@@ -416,3 +329,38 @@ func (channel *Channel) Bye(live bool) int {
|
||||
}
|
||||
return 404
|
||||
}
|
||||
|
||||
func (channel *Channel) TryAutoInvite() {
|
||||
if conf.AutoInvite && channel.CanInvite() {
|
||||
go channel.Invite(&InviteOptions{})
|
||||
}
|
||||
}
|
||||
|
||||
func (channel *Channel) CanInvite() bool {
|
||||
if channel.LivePublisher != nil || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
|
||||
return false
|
||||
}
|
||||
|
||||
if conf.InviteIDs == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
// 11~13位是设备类型编码
|
||||
typeID := channel.DeviceID[10:13]
|
||||
|
||||
// format: start-end,type1,type2
|
||||
tokens := strings.Split(conf.InviteIDs, ",")
|
||||
for _, tok := range tokens {
|
||||
if first, second, ok := strings.Cut(tok, "-"); ok {
|
||||
if typeID >= first && typeID <= second {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if typeID == first {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
115
const.go
Normal file
115
const.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package gb28181
|
||||
|
||||
const (
|
||||
Trying = 100
|
||||
Ringing = 180
|
||||
CallIsBeingForwarded = 181
|
||||
Queued = 182
|
||||
SessionProgress = 183
|
||||
OK = 200
|
||||
Accepted = 202
|
||||
MultipleChoices = 300
|
||||
MovedPermanently = 301
|
||||
MovedTemporarily = 302
|
||||
UseProxy = 305
|
||||
AlternativeService = 380
|
||||
BadRequest = 400
|
||||
Unauthorized = 401
|
||||
PaymentRequired = 402
|
||||
Forbidden = 403
|
||||
NotFound = 404
|
||||
MethodNotAllowed = 405
|
||||
NotAcceptable = 406
|
||||
ProxyAuthenticationRequired = 407
|
||||
RequestTimeout = 408
|
||||
Gone = 410
|
||||
RequestEntityTooLarge = 413
|
||||
RequestURITooLong = 414
|
||||
UnsupportedMediaType = 415
|
||||
UnsupportedURIScheme = 416
|
||||
BadExtension = 420
|
||||
ExtensionRequired = 421
|
||||
IntervalTooBrief = 423
|
||||
TemporarilyUnavailable = 480
|
||||
CallTransactionDoesNotExist = 481
|
||||
LoopDetected = 482
|
||||
TooManyHops = 483
|
||||
AddressIncomplete = 484
|
||||
Ambiguous = 485
|
||||
BusyHere = 486
|
||||
RequestTerminated = 487
|
||||
NotAcceptableHere = 488
|
||||
BadEvent = 489
|
||||
RequestPending = 491
|
||||
Undecipherable = 493
|
||||
ServerInternalError = 500
|
||||
NotImplemented = 501
|
||||
BadGateway = 502
|
||||
ServiceUnavailable = 503
|
||||
ServerTim = 504
|
||||
VersionNotSupported = 505
|
||||
MessageTooLarge = 513
|
||||
BusyEverywhere = 600
|
||||
Decline = 603
|
||||
DoesNotExistAnywhere = 604
|
||||
SessionNotAcceptable = 606
|
||||
)
|
||||
|
||||
var reasons = map[int]string{
|
||||
100: "Trying",
|
||||
180: "Ringing",
|
||||
181: "Call Is Being Forwarded",
|
||||
182: "Queued",
|
||||
183: "Session Progress",
|
||||
200: "OK",
|
||||
202: "Accepted",
|
||||
300: "Multiple Choices",
|
||||
301: "Moved Permanently",
|
||||
302: "Moved Temporarily",
|
||||
305: "Use Proxy",
|
||||
380: "Alternative Service",
|
||||
400: "Bad Request",
|
||||
401: "Unauthorized",
|
||||
402: "Payment Required",
|
||||
403: "Forbidden",
|
||||
404: "Not Found",
|
||||
405: "Method Not Allowed",
|
||||
406: "Not Acceptable",
|
||||
407: "Proxy Authentication Required",
|
||||
408: "Request Timeout",
|
||||
410: "Gone",
|
||||
413: "Request Entity Too Large",
|
||||
414: "Request-URI Too Long",
|
||||
415: "Unsupported Media Type",
|
||||
416: "Unsupported URI Scheme",
|
||||
420: "Bad Extension",
|
||||
421: "Extension Required",
|
||||
423: "Interval Too Brief",
|
||||
480: "Temporarily Unavailable",
|
||||
481: "Call transaction Does Not Exist",
|
||||
482: "Loop Detected",
|
||||
483: "Too Many Hops",
|
||||
484: "Address Incomplete",
|
||||
485: "Ambiguous",
|
||||
486: "Busy Here",
|
||||
487: "Request Terminated",
|
||||
488: "Not Acceptable Here",
|
||||
489: "Bad Event",
|
||||
491: "Request Pending",
|
||||
493: "Undecipherable",
|
||||
500: "Server Internal Error",
|
||||
501: "Not Implemented",
|
||||
502: "Bad Gateway",
|
||||
503: "Service Unavailable",
|
||||
504: "Server Tim",
|
||||
505: "Version Not Supported",
|
||||
513: "message Too Large",
|
||||
600: "Busy Everywhere",
|
||||
603: "Decline",
|
||||
604: "Does Not Exist Anywhere",
|
||||
606: "SESSION NOT ACCEPTABLE",
|
||||
}
|
||||
|
||||
func Explain(statusCode int) string {
|
||||
return reasons[statusCode]
|
||||
}
|
||||
171
device.go
171
device.go
@@ -10,6 +10,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -55,7 +57,6 @@ type Device struct {
|
||||
UpdateTime time.Time
|
||||
LastKeepaliveAt time.Time
|
||||
Status string
|
||||
Channels []*Channel
|
||||
sn int
|
||||
addr sip.Address
|
||||
sipIP string //设备对应网卡的服务器ip
|
||||
@@ -67,9 +68,23 @@ type Device struct {
|
||||
CallID string
|
||||
Timeout time.Time
|
||||
}
|
||||
lastSyncTime time.Time
|
||||
GpsTime time.Time //gps时间
|
||||
Longitude string //经度
|
||||
Latitude string //纬度
|
||||
}
|
||||
|
||||
func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
func (d *Device) MarshalJSON() ([]byte, error) {
|
||||
type Alias Device
|
||||
return json.Marshal(&struct {
|
||||
Channels []*Channel
|
||||
*Alias
|
||||
}{
|
||||
Channels: maps.Values(d.channelMap),
|
||||
Alias: (*Alias)(d),
|
||||
})
|
||||
}
|
||||
func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
from, _ := req.From()
|
||||
d.addr = sip.Address{
|
||||
DisplayName: from.DisplayName,
|
||||
@@ -78,7 +93,7 @@ func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
deviceIp := req.Source()
|
||||
servIp := req.Recipient().Host()
|
||||
//根据网卡ip获取对应的公网ip
|
||||
sipIP := config.routes[servIp]
|
||||
sipIP := c.routes[servIp]
|
||||
//如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
|
||||
if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
|
||||
if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
|
||||
@@ -86,14 +101,14 @@ func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
}
|
||||
}
|
||||
//如果用户配置过则使用配置的
|
||||
if config.SipIP != "" {
|
||||
sipIP = config.SipIP
|
||||
if c.SipIP != "" {
|
||||
sipIP = c.SipIP
|
||||
} else if sipIP == "" {
|
||||
sipIP = myip.InternalIPv4()
|
||||
}
|
||||
mediaIp := sipIP
|
||||
if config.MediaIP != "" {
|
||||
mediaIp = config.MediaIP
|
||||
if c.MediaIP != "" {
|
||||
mediaIp = c.MediaIP
|
||||
}
|
||||
plugin.Info("RecoverDevice", zap.String("id", d.ID), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
|
||||
d.Status = string(sip.REGISTER)
|
||||
@@ -102,10 +117,9 @@ func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
d.NetAddr = deviceIp
|
||||
d.UpdateTime = time.Now()
|
||||
d.channelMap = make(map[string]*Channel)
|
||||
go d.Catalog()
|
||||
}
|
||||
func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
|
||||
var d *Device
|
||||
|
||||
func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
|
||||
from, _ := req.From()
|
||||
deviceAddr := sip.Address{
|
||||
DisplayName: from.DisplayName,
|
||||
@@ -121,7 +135,7 @@ func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
|
||||
} else {
|
||||
servIp := req.Recipient().Host()
|
||||
//根据网卡ip获取对应的公网ip
|
||||
sipIP := config.routes[servIp]
|
||||
sipIP := c.routes[servIp]
|
||||
//如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
|
||||
if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
|
||||
if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
|
||||
@@ -129,14 +143,14 @@ func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
|
||||
}
|
||||
}
|
||||
//如果用户配置过则使用配置的
|
||||
if config.SipIP != "" {
|
||||
sipIP = config.SipIP
|
||||
if c.SipIP != "" {
|
||||
sipIP = c.SipIP
|
||||
} else if sipIP == "" {
|
||||
sipIP = myip.InternalIPv4()
|
||||
}
|
||||
mediaIp := sipIP
|
||||
if config.MediaIP != "" {
|
||||
mediaIp = config.MediaIP
|
||||
if c.MediaIP != "" {
|
||||
mediaIp = c.MediaIP
|
||||
}
|
||||
plugin.Info("StoreDevice", zap.String("id", id), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
|
||||
d = &Device{
|
||||
@@ -151,17 +165,17 @@ func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
|
||||
channelMap: make(map[string]*Channel),
|
||||
}
|
||||
Devices.Store(id, d)
|
||||
SaveDevices()
|
||||
go d.Catalog()
|
||||
c.SaveDevices()
|
||||
}
|
||||
return
|
||||
}
|
||||
func ReadDevices() {
|
||||
func (c *GB28181Config) ReadDevices() {
|
||||
if f, err := os.OpenFile("devices.json", os.O_RDONLY, 0644); err == nil {
|
||||
defer f.Close()
|
||||
var items []*Device
|
||||
if err = json.NewDecoder(f).Decode(&items); err == nil {
|
||||
for _, item := range items {
|
||||
if time.Since(item.UpdateTime) < time.Duration(conf.RegisterValidity)*time.Second {
|
||||
if time.Since(item.UpdateTime) < conf.RegisterValidity {
|
||||
item.Status = "RECOVER"
|
||||
Devices.Store(item.ID, item)
|
||||
}
|
||||
@@ -169,7 +183,7 @@ func ReadDevices() {
|
||||
}
|
||||
}
|
||||
}
|
||||
func SaveDevices() {
|
||||
func (c *GB28181Config) SaveDevices() {
|
||||
var item []any
|
||||
Devices.Range(func(key, value any) bool {
|
||||
item = append(item, value)
|
||||
@@ -183,19 +197,33 @@ func SaveDevices() {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Device) addChannel(channel *Channel) {
|
||||
for _, c := range d.Channels {
|
||||
if c.DeviceID == channel.DeviceID {
|
||||
return
|
||||
}
|
||||
func (d *Device) addOrUpdateChannel(channel *Channel) {
|
||||
d.channelMutex.Lock()
|
||||
defer d.channelMutex.Unlock()
|
||||
channel.device = d
|
||||
var oldLock *sync.Mutex
|
||||
if old, ok := d.channelMap[channel.DeviceID]; ok {
|
||||
//复制锁指针
|
||||
oldLock = old.liveInviteLock
|
||||
}
|
||||
d.Channels = append(d.Channels, channel)
|
||||
if oldLock == nil {
|
||||
channel.liveInviteLock = &sync.Mutex{}
|
||||
} else {
|
||||
channel.liveInviteLock = oldLock
|
||||
}
|
||||
d.channelMap[channel.DeviceID] = channel
|
||||
}
|
||||
|
||||
func (d *Device) deleteChannel(DeviceID string) {
|
||||
d.channelMutex.Lock()
|
||||
defer d.channelMutex.Unlock()
|
||||
delete(d.channelMap, DeviceID)
|
||||
}
|
||||
|
||||
func (d *Device) CheckSubStream() {
|
||||
d.channelMutex.Lock()
|
||||
defer d.channelMutex.Unlock()
|
||||
for _, c := range d.Channels {
|
||||
for _, c := range d.channelMap {
|
||||
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
|
||||
c.LiveSubSP = s.Path
|
||||
} else {
|
||||
@@ -204,46 +232,38 @@ func (d *Device) CheckSubStream() {
|
||||
}
|
||||
}
|
||||
func (d *Device) UpdateChannels(list []*Channel) {
|
||||
d.channelMutex.Lock()
|
||||
defer d.channelMutex.Unlock()
|
||||
|
||||
for _, c := range list {
|
||||
if _, ok := conf.Ignores[c.DeviceID]; ok {
|
||||
continue
|
||||
}
|
||||
//当父设备非空且存在时、父设备节点增加通道
|
||||
if c.ParentID != "" {
|
||||
path := strings.Split(c.ParentID, "/")
|
||||
parentId := path[len(path)-1]
|
||||
if parent, ok := d.channelMap[parentId]; ok {
|
||||
if c.DeviceID != parentId {
|
||||
parent.Children = append(parent.Children, c)
|
||||
}
|
||||
} else {
|
||||
d.addChannel(c)
|
||||
//如果父ID并非本身所属设备,一般情况下这是因为下级设备上传了目录信息,该信息通常不需要处理。
|
||||
// 暂时不考虑级联目录的实现
|
||||
if d.ID != parentId {
|
||||
//if v, ok := Devices.Load(parentId); ok {
|
||||
// parent := v.(*Device)
|
||||
// parent.addOrUpdateChannel(c)
|
||||
continue
|
||||
//}
|
||||
}
|
||||
} else {
|
||||
d.addChannel(c)
|
||||
}
|
||||
if old, ok := d.channelMap[c.DeviceID]; ok {
|
||||
c.ChannelEx = old.ChannelEx
|
||||
if conf.PreFetchRecord {
|
||||
n := time.Now()
|
||||
n = time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.Local)
|
||||
if len(c.Records) == 0 || (n.Format(TIME_LAYOUT) == c.RecordStartTime &&
|
||||
n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT) == c.RecordEndTime) {
|
||||
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
|
||||
}
|
||||
//本设备增加通道
|
||||
d.addOrUpdateChannel(c)
|
||||
|
||||
//预取和邀请
|
||||
if conf.PreFetchRecord {
|
||||
n := time.Now()
|
||||
n = time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.Local)
|
||||
if len(c.Records) == 0 || (n.Format(TIME_LAYOUT) == c.RecordStartTime &&
|
||||
n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT) == c.RecordEndTime) {
|
||||
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
|
||||
}
|
||||
old.Copy(c)
|
||||
c = old
|
||||
} else {
|
||||
c.ChannelEx = &ChannelEx{
|
||||
device: d,
|
||||
}
|
||||
d.channelMap[c.DeviceID] = c
|
||||
}
|
||||
if conf.AutoInvite && (c.LivePublisher == nil) {
|
||||
go c.Invite(InviteOptions{})
|
||||
}
|
||||
c.TryAutoInvite()
|
||||
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
|
||||
c.LiveSubSP = s.Path
|
||||
} else {
|
||||
@@ -264,6 +284,7 @@ func (d *Device) CreateRequest(Method sip.RequestMethod) (req sip.Request) {
|
||||
|
||||
callId := sip.CallID(utils.RandNumString(10))
|
||||
userAgent := sip.UserAgentHeader("Monibuca")
|
||||
maxForwards := sip.MaxForwards(70) //增加max-forwards为默认值 70
|
||||
cseq := sip.CSeq{
|
||||
SeqNo: uint32(d.sn),
|
||||
MethodName: Method,
|
||||
@@ -289,6 +310,7 @@ func (d *Device) CreateRequest(Method sip.RequestMethod) (req sip.Request) {
|
||||
&callId,
|
||||
&userAgent,
|
||||
&cseq,
|
||||
&maxForwards,
|
||||
serverAddr.AsContactHeader(),
|
||||
},
|
||||
"",
|
||||
@@ -339,7 +361,7 @@ func (d *Device) Subscribe() int {
|
||||
|
||||
response, err := d.SipRequestForResponse(request)
|
||||
if err == nil && response != nil {
|
||||
if response.StatusCode() == 200 {
|
||||
if response.StatusCode() == OK {
|
||||
callId, _ := request.CallID()
|
||||
d.subscriber.CallID = string(*callId)
|
||||
} else {
|
||||
@@ -351,6 +373,7 @@ func (d *Device) Subscribe() int {
|
||||
}
|
||||
|
||||
func (d *Device) Catalog() int {
|
||||
//os.Stdout.Write(debug.Stack())
|
||||
request := d.CreateRequest(sip.MESSAGE)
|
||||
expires := sip.Expires(3600)
|
||||
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires))
|
||||
@@ -368,10 +391,9 @@ func (d *Device) Catalog() int {
|
||||
return http.StatusRequestTimeout
|
||||
}
|
||||
|
||||
func (d *Device) QueryDeviceInfo(req *sip.Request) {
|
||||
func (d *Device) QueryDeviceInfo() {
|
||||
for i := time.Duration(5); i < 100; i++ {
|
||||
|
||||
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s", d.ID, d.NetAddr))
|
||||
time.Sleep(time.Second * i)
|
||||
request := d.CreateRequest(sip.MESSAGE)
|
||||
contentType := sip.ContentType("Application/MANSCDP+xml")
|
||||
@@ -386,10 +408,8 @@ func (d *Device) QueryDeviceInfo(req *sip.Request) {
|
||||
// received, _ := via.Params.Get("received")
|
||||
// d.SipIP = received.String()
|
||||
// }
|
||||
if response.StatusCode() != 200 {
|
||||
plugin.Sugar().Errorf("device %s send Catalog : %d\n", d.ID, response.StatusCode())
|
||||
} else {
|
||||
d.Subscribe()
|
||||
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s response code:%d", d.ID, d.NetAddr, response.StatusCode()))
|
||||
if response.StatusCode() == OK {
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -401,23 +421,23 @@ func (d *Device) SipRequestForResponse(request sip.Request) (sip.Response, error
|
||||
}
|
||||
|
||||
// MobilePositionSubscribe 移动位置订阅
|
||||
func (d *Device) MobilePositionSubscribe(id string, expires int, interval int) (code int) {
|
||||
func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, interval time.Duration) (code int) {
|
||||
mobilePosition := d.CreateRequest(sip.SUBSCRIBE)
|
||||
if d.subscriber.CallID != "" {
|
||||
callId := sip.CallID(utils.RandNumString(10))
|
||||
mobilePosition.ReplaceHeaders(callId.Name(), []sip.Header{&callId})
|
||||
}
|
||||
expiresHeader := sip.Expires(expires)
|
||||
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires))
|
||||
expiresHeader := sip.Expires(expires / time.Second)
|
||||
d.subscriber.Timeout = time.Now().Add(expires)
|
||||
contentType := sip.ContentType("Application/MANSCDP+xml")
|
||||
mobilePosition.AppendHeader(&contentType)
|
||||
mobilePosition.AppendHeader(&expiresHeader)
|
||||
|
||||
mobilePosition.SetBody(BuildDevicePositionXML(d.sn, id, interval), true)
|
||||
mobilePosition.SetBody(BuildDevicePositionXML(d.sn, id, int(interval/time.Second)), true)
|
||||
|
||||
response, err := d.SipRequestForResponse(mobilePosition)
|
||||
if err == nil && response != nil {
|
||||
if response.StatusCode() == 200 {
|
||||
if response.StatusCode() == OK {
|
||||
callId, _ := mobilePosition.CallID()
|
||||
d.subscriber.CallID = callId.String()
|
||||
} else {
|
||||
@@ -431,12 +451,16 @@ func (d *Device) MobilePositionSubscribe(id string, expires int, interval int) (
|
||||
// UpdateChannelPosition 更新通道GPS坐标
|
||||
func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) {
|
||||
if c, ok := d.channelMap[channelId]; ok {
|
||||
c.ChannelEx.GpsTime, _ = time.ParseInLocation("2006-01-02 15:04:05", gpsTime, time.Local)
|
||||
c.ChannelEx.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
|
||||
c.ChannelEx.Longitude = lng
|
||||
c.ChannelEx.Latitude = lat
|
||||
plugin.Sugar().Debugf("更新通道[%s]坐标成功\n", c.Name)
|
||||
} else {
|
||||
plugin.Sugar().Debugf("更新失败,未找到通道[%s]\n", channelId)
|
||||
//如果未找到通道,则更新到设备上
|
||||
d.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
|
||||
d.Longitude = lng
|
||||
d.Latitude = lat
|
||||
plugin.Sugar().Debugf("未找到通道[%s],更新设备[%s]坐标成功\n", channelId, d.ID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -458,7 +482,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
d.channelOffline(v.DeviceID)
|
||||
case "ADD":
|
||||
plugin.Debug("收到通道新增通知")
|
||||
channel := &Channel{
|
||||
channel := Channel{
|
||||
DeviceID: v.DeviceID,
|
||||
ParentID: v.ParentID,
|
||||
Name: v.Name,
|
||||
@@ -474,12 +498,11 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
Secrecy: v.Secrecy,
|
||||
Status: v.Status,
|
||||
}
|
||||
channels := []*Channel{channel}
|
||||
d.UpdateChannels(channels)
|
||||
d.addOrUpdateChannel(&channel)
|
||||
case "DEL":
|
||||
//删除
|
||||
plugin.Debug("收到通道删除通知")
|
||||
d.channelOffline(v.DeviceID)
|
||||
d.deleteChannel(v.DeviceID)
|
||||
case "UPDATE":
|
||||
plugin.Debug("收到通道更新通知")
|
||||
// 更新通道
|
||||
|
||||
72
handle.go
72
handle.go
@@ -52,15 +52,14 @@ func (a *Authorization) getDigest(raw string) string {
|
||||
}
|
||||
}
|
||||
|
||||
func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
|
||||
func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
|
||||
from, _ := req.From()
|
||||
|
||||
id := from.Address.User().String()
|
||||
plugin.Debug(id)
|
||||
|
||||
plugin.Sugar().Debugf("OnRegister: %s, %s from %s ", req.Destination(), id, req.Source())
|
||||
passAuth := false
|
||||
// 不需要密码情况
|
||||
if config.Username == "" && config.Password == "" {
|
||||
if c.Username == "" && c.Password == "" {
|
||||
passAuth = true
|
||||
} else {
|
||||
// 需要密码情况 设备第一次上报,返回401和加密算法
|
||||
@@ -73,7 +72,7 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
|
||||
if auth.Username() == id {
|
||||
username = id
|
||||
} else {
|
||||
username = config.Username
|
||||
username = c.Username
|
||||
}
|
||||
|
||||
if dc, ok := DeviceRegisterCount.LoadOrStore(id, 1); ok && dc.(int) > MaxRegisterCount {
|
||||
@@ -83,7 +82,7 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
|
||||
} else {
|
||||
// 设备第二次上报,校验
|
||||
_nonce, loaded := DeviceNonce.Load(id)
|
||||
if loaded && auth.Verify(username, config.Password, config.Realm, _nonce.(string)) {
|
||||
if loaded && auth.Verify(username, c.Password, c.Realm, _nonce.(string)) {
|
||||
passAuth = true
|
||||
} else {
|
||||
DeviceRegisterCount.Store(id, dc.(int)+1)
|
||||
@@ -92,7 +91,13 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
|
||||
}
|
||||
}
|
||||
if passAuth {
|
||||
config.StoreDevice(id, req)
|
||||
var d *Device
|
||||
if v, ok := Devices.Load(id); ok {
|
||||
d = v.(*Device)
|
||||
c.RecoverDevice(d, req)
|
||||
} else {
|
||||
d = c.StoreDevice(id, req)
|
||||
}
|
||||
DeviceNonce.Delete(id)
|
||||
DeviceRegisterCount.Delete(id)
|
||||
resp := sip.NewResponseFromRequest("", req, http.StatusOK, "OK", "")
|
||||
@@ -106,12 +111,14 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
|
||||
Contents: time.Now().Format(TIME_LAYOUT),
|
||||
})
|
||||
_ = tx.Respond(resp)
|
||||
//订阅设备更新
|
||||
go d.syncChannels()
|
||||
} else {
|
||||
response := sip.NewResponseFromRequest("", req, http.StatusUnauthorized, "Unauthorized", "")
|
||||
_nonce, _ := DeviceNonce.LoadOrStore(id, utils.RandNumString(32))
|
||||
auth := fmt.Sprintf(
|
||||
`Digest realm="%s",algorithm=%s,nonce="%s"`,
|
||||
config.Realm,
|
||||
c.Realm,
|
||||
"MD5",
|
||||
_nonce.(string),
|
||||
)
|
||||
@@ -122,18 +129,31 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
|
||||
_ = tx.Respond(response)
|
||||
}
|
||||
}
|
||||
func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
|
||||
|
||||
// syncChannels
|
||||
// 同步设备信息、下属通道信息,包括主动查询通道信息,订阅通道变化情况
|
||||
func (d *Device) syncChannels() {
|
||||
if time.Since(d.lastSyncTime) > 2*conf.HeartbeatInterval {
|
||||
d.lastSyncTime = time.Now()
|
||||
d.QueryDeviceInfo()
|
||||
d.Catalog()
|
||||
d.Subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
|
||||
from, _ := req.From()
|
||||
id := from.Address.User().String()
|
||||
plugin.Sugar().Debugf("SIP<-OnMessage from %s : %s", req.Source(), req.String())
|
||||
if v, ok := Devices.Load(id); ok {
|
||||
d := v.(*Device)
|
||||
switch d.Status {
|
||||
case "RECOVER":
|
||||
config.RecoverDevice(d, req)
|
||||
return
|
||||
c.RecoverDevice(d, req)
|
||||
go d.syncChannels()
|
||||
//return
|
||||
case string(sip.REGISTER):
|
||||
d.Status = "ONLINE"
|
||||
//go d.QueryDeviceInfo(req)
|
||||
}
|
||||
d.UpdateTime = time.Now()
|
||||
temp := &struct {
|
||||
@@ -161,22 +181,20 @@ func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction
|
||||
case "Keepalive":
|
||||
d.LastKeepaliveAt = time.Now()
|
||||
//callID !="" 说明是订阅的事件类型信息
|
||||
if d.Channels == nil {
|
||||
go d.Catalog()
|
||||
if d.channelMap == nil || len(d.channelMap) == 0 {
|
||||
go d.syncChannels()
|
||||
} else {
|
||||
if d.subscriber.CallID != "" && d.LastKeepaliveAt.After(d.subscriber.Timeout) {
|
||||
go d.Catalog()
|
||||
} else {
|
||||
for _, c := range d.Channels {
|
||||
if config.AutoInvite &&
|
||||
(c.LivePublisher == nil) {
|
||||
c.Invite(InviteOptions{})
|
||||
}
|
||||
}
|
||||
for _, ch := range d.channelMap {
|
||||
ch.TryAutoInvite()
|
||||
}
|
||||
|
||||
}
|
||||
d.CheckSubStream()
|
||||
//为什么要查找子码流?
|
||||
//d.CheckSubStream()
|
||||
//在KeepLive 进行位置订阅的处理,如果开启了自动订阅位置,则去订阅位置
|
||||
if c.Position.AutosubPosition && time.Since(d.GpsTime) > c.Position.Interval*2 {
|
||||
d.MobilePositionSubscribe(d.ID, c.Position.Expires, c.Position.Interval)
|
||||
plugin.Sugar().Debugf("位置自动订阅,设备[%s]成功\n", d.ID)
|
||||
}
|
||||
case "Catalog":
|
||||
d.UpdateChannels(temp.DeviceList)
|
||||
case "RecordInfo":
|
||||
@@ -199,12 +217,12 @@ func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction
|
||||
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", body))
|
||||
}
|
||||
}
|
||||
func (config *GB28181Config) onBye(req sip.Request, tx sip.ServerTransaction) {
|
||||
func (c *GB28181Config) OnBye(req sip.Request, tx sip.ServerTransaction) {
|
||||
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", ""))
|
||||
}
|
||||
|
||||
// OnNotify 订阅通知处理
|
||||
func (config *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
|
||||
func (c *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
|
||||
from, _ := req.From()
|
||||
id := from.Address.User().String()
|
||||
if v, ok := Devices.Load(id); ok {
|
||||
|
||||
65
inviteoption.go
Normal file
65
inviteoption.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type InviteOptions struct {
|
||||
Start int
|
||||
End int
|
||||
dump string
|
||||
ssrc string
|
||||
SSRC uint32
|
||||
MediaPort uint16
|
||||
}
|
||||
|
||||
func (o InviteOptions) IsLive() bool {
|
||||
return o.Start == 0 || o.End == 0
|
||||
}
|
||||
|
||||
func (o InviteOptions) Record() bool {
|
||||
return !o.IsLive()
|
||||
}
|
||||
|
||||
func (o *InviteOptions) Validate(start, end string) error {
|
||||
if start != "" {
|
||||
sint, err1 := strconv.ParseInt(start, 10, 0)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
o.Start = int(sint)
|
||||
}
|
||||
if end != "" {
|
||||
eint, err2 := strconv.ParseInt(end, 10, 0)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
o.End = int(eint)
|
||||
}
|
||||
if o.Start >= o.End {
|
||||
return errors.New("start < end")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o InviteOptions) String() string {
|
||||
return fmt.Sprintf("t=%d %d", o.Start, o.End)
|
||||
}
|
||||
|
||||
func (o *InviteOptions) CreateSSRC() {
|
||||
ssrc := make([]byte, 10)
|
||||
if o.IsLive() {
|
||||
ssrc[0] = '0'
|
||||
} else {
|
||||
ssrc[0] = '1'
|
||||
}
|
||||
copy(ssrc[1:6], conf.Serial[3:8])
|
||||
randNum := 1000 + rand.Intn(8999)
|
||||
copy(ssrc[6:], strconv.Itoa(randNum))
|
||||
o.ssrc = string(ssrc)
|
||||
_ssrc, _ := strconv.ParseInt(o.ssrc, 10, 0)
|
||||
o.SSRC = uint32(_ssrc)
|
||||
}
|
||||
80
main.go
80
main.go
@@ -3,48 +3,58 @@ package gb28181
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
myip "github.com/husanpao/ip"
|
||||
. "m7s.live/engine/v4"
|
||||
"m7s.live/engine/v4/config"
|
||||
)
|
||||
|
||||
type GB28181PositionConfig struct {
|
||||
AutosubPosition bool //是否自动订阅定位
|
||||
Expires time.Duration `default:"3600s"` //订阅周期(单位:秒)
|
||||
Interval time.Duration `default:"6s"` //订阅间隔(单位:秒)
|
||||
}
|
||||
|
||||
type GB28181Config struct {
|
||||
AutoInvite bool
|
||||
AutoInvite bool `default:"true"`
|
||||
PreFetchRecord bool
|
||||
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
|
||||
|
||||
//sip服务器的配置
|
||||
SipNetwork string //传输协议,默认UDP,可选TCP
|
||||
SipNetwork string `default:"udp"` //传输协议,默认UDP,可选TCP
|
||||
SipIP string //sip 服务器公网IP
|
||||
SipPort uint16 //sip 服务器端口,默认 5060
|
||||
Serial string //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string //sip 服务器域,默认 3402000000
|
||||
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
|
||||
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
|
||||
Username string //sip 服务器账号
|
||||
Password string //sip 服务器密码
|
||||
|
||||
AckTimeout uint16 //sip 服务应答超时,单位秒
|
||||
RegisterValidity int //注册有效期,单位秒,默认 3600
|
||||
RegisterInterval int //注册间隔,单位秒,默认 60
|
||||
HeartbeatInterval int //心跳间隔,单位秒,默认 60
|
||||
HeartbeatRetry int //心跳超时次数,默认 3
|
||||
// AckTimeout uint16 //sip 服务应答超时,单位秒
|
||||
RegisterValidity time.Duration `default:"60s"` //注册有效期,单位秒,默认 3600
|
||||
// RegisterInterval int //注册间隔,单位秒,默认 60
|
||||
HeartbeatInterval time.Duration `default:"60s"` //心跳间隔,单位秒,默认 60
|
||||
// HeartbeatRetry int //心跳超时次数,默认 3
|
||||
|
||||
//媒体服务器配置
|
||||
MediaIP string //媒体服务器地址
|
||||
MediaPort uint16 //媒体服务器端口
|
||||
MediaNetwork string //媒体传输协议,默认UDP,可选TCP
|
||||
MediaPortMin uint16
|
||||
MediaPortMax uint16
|
||||
MediaIdleTimeout uint16 //推流超时时间,超过则断开链接,让设备重连
|
||||
MediaIP string //媒体服务器地址
|
||||
MediaPort uint16 `default:"58200"` //媒体服务器端口
|
||||
MediaNetwork string `default:"udp"` //媒体传输协议,默认UDP,可选TCP
|
||||
MediaPortMin uint16
|
||||
MediaPortMax uint16
|
||||
// MediaIdleTimeout uint16 //推流超时时间,超过则断开链接,让设备重连
|
||||
|
||||
// WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
|
||||
RemoveBanInterval int //移除禁止设备间隔
|
||||
UdpCacheSize int //udp缓存大小
|
||||
RemoveBanInterval time.Duration `default:"600s"` //移除禁止设备间隔
|
||||
UdpCacheSize int //udp缓存大小
|
||||
|
||||
config.Publish
|
||||
Server
|
||||
LogLevel string //trace, debug, info, warn, error, fatal, panic
|
||||
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
|
||||
routes map[string]string
|
||||
DumpPath string //dump PS流本地文件路径
|
||||
|
||||
Position GB28181PositionConfig //关于定位的配置参数
|
||||
}
|
||||
|
||||
func (c *GB28181Config) initRoutes() {
|
||||
@@ -61,7 +71,7 @@ func (c *GB28181Config) initRoutes() {
|
||||
func (c *GB28181Config) OnEvent(event any) {
|
||||
switch event.(type) {
|
||||
case FirstConfig:
|
||||
ReadDevices()
|
||||
c.ReadDevices()
|
||||
go c.initRoutes()
|
||||
c.startServer()
|
||||
}
|
||||
@@ -71,32 +81,6 @@ func (c *GB28181Config) IsMediaNetworkTCP() bool {
|
||||
return strings.ToLower(c.MediaNetwork) == "tcp"
|
||||
}
|
||||
|
||||
var conf = &GB28181Config{
|
||||
AutoInvite: true,
|
||||
PreFetchRecord: false,
|
||||
UdpCacheSize: 0,
|
||||
SipNetwork: "udp",
|
||||
SipIP: "",
|
||||
SipPort: 5060,
|
||||
Serial: "34020000002000000001",
|
||||
Realm: "3402000000",
|
||||
Username: "",
|
||||
Password: "",
|
||||
var conf GB28181Config
|
||||
|
||||
AckTimeout: 10,
|
||||
RegisterValidity: 60,
|
||||
RegisterInterval: 60,
|
||||
HeartbeatInterval: 60,
|
||||
HeartbeatRetry: 3,
|
||||
|
||||
MediaIP: "",
|
||||
MediaPort: 58200,
|
||||
MediaIdleTimeout: 30,
|
||||
MediaNetwork: "udp",
|
||||
|
||||
RemoveBanInterval: 600,
|
||||
LogLevel: "info",
|
||||
// WaitKeyFrame: true,
|
||||
}
|
||||
|
||||
var plugin = InstallPlugin(conf)
|
||||
var plugin = InstallPlugin(&conf)
|
||||
|
||||
107
publisher.go
107
publisher.go
@@ -12,25 +12,19 @@ import (
|
||||
"github.com/pion/rtp/v2"
|
||||
"go.uber.org/zap"
|
||||
. "m7s.live/engine/v4"
|
||||
. "m7s.live/engine/v4/codec"
|
||||
"m7s.live/engine/v4/codec/mpegts"
|
||||
. "m7s.live/engine/v4/track"
|
||||
"m7s.live/engine/v4/util"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
)
|
||||
|
||||
type GBPublisher struct {
|
||||
Publisher
|
||||
InviteOptions
|
||||
PSPublisher
|
||||
*InviteOptions
|
||||
channel *Channel
|
||||
inviteRes sip.Response
|
||||
parser *utils.DecPSPackage
|
||||
lastSeq uint16
|
||||
udpCache *utils.PriorityQueueRtp
|
||||
dumpFile *os.File
|
||||
dumpPrint io.Writer
|
||||
lastReceive time.Time
|
||||
reorder util.RTPReorder[*rtp.Packet]
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PrintDump(s string) {
|
||||
@@ -41,6 +35,7 @@ func (p *GBPublisher) PrintDump(s string) {
|
||||
|
||||
func (p *GBPublisher) OnEvent(event any) {
|
||||
if p.channel == nil {
|
||||
// p.parser.EsHandler = p
|
||||
p.IO.OnEvent(event)
|
||||
return
|
||||
}
|
||||
@@ -53,6 +48,7 @@ func (p *GBPublisher) OnEvent(event any) {
|
||||
p.Type = "GB28181 Playback"
|
||||
p.channel.RecordPublisher = p
|
||||
}
|
||||
// p.parser.EsHandler = p
|
||||
conf.publishers.Add(p.SSRC, p)
|
||||
if err := error(nil); p.dump != "" {
|
||||
if p.dumpFile, err = os.OpenFile(p.dump, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
|
||||
@@ -66,7 +62,7 @@ func (p *GBPublisher) OnEvent(event any) {
|
||||
p.channel.LivePublisher = nil
|
||||
p.channel.liveInviteLock.Unlock()
|
||||
}
|
||||
go p.channel.Invite(InviteOptions{})
|
||||
go p.channel.Invite(&InviteOptions{})
|
||||
}
|
||||
case SEclose, SEKick:
|
||||
if p.IsLive() {
|
||||
@@ -103,102 +99,11 @@ func (p *GBPublisher) Bye() int {
|
||||
resp, err := p.channel.device.SipRequestForResponse(bye)
|
||||
if err != nil {
|
||||
p.Error("Bye", zap.Error(err))
|
||||
return 500
|
||||
return ServerInternalError
|
||||
}
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
if p.VideoTrack == nil {
|
||||
switch p.parser.VideoStreamType {
|
||||
case mpegts.STREAM_TYPE_H264:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
case mpegts.STREAM_TYPE_H265:
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream)
|
||||
default:
|
||||
//推测编码类型
|
||||
var maybe264 H264NALUType
|
||||
maybe264 = maybe264.Parse(payload[4])
|
||||
switch maybe264 {
|
||||
case NALU_Non_IDR_Picture,
|
||||
NALU_IDR_Picture,
|
||||
NALU_SEI,
|
||||
NALU_SPS,
|
||||
NALU_PPS,
|
||||
NALU_Access_Unit_Delimiter:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
default:
|
||||
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(payload) > 10 {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
|
||||
} else {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload))
|
||||
}
|
||||
if dts == 0 {
|
||||
dts = pts
|
||||
}
|
||||
p.VideoTrack.WriteAnnexB(pts, dts, payload)
|
||||
}
|
||||
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
|
||||
if p.AudioTrack == nil {
|
||||
switch p.parser.AudioStreamType {
|
||||
case mpegts.STREAM_TYPE_G711A:
|
||||
at := NewG711(p.Publisher.Stream, true)
|
||||
at.Audio.SampleRate = 8000
|
||||
at.Audio.SampleSize = 16
|
||||
at.Channels = 1
|
||||
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
|
||||
p.AudioTrack = at
|
||||
case mpegts.STREAM_TYPE_G711U:
|
||||
at := NewG711(p.Publisher.Stream, false)
|
||||
at.Audio.SampleRate = 8000
|
||||
at.Audio.SampleSize = 16
|
||||
at.Channels = 1
|
||||
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
|
||||
p.AudioTrack = at
|
||||
case mpegts.STREAM_TYPE_AAC:
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(payload[:7])
|
||||
default:
|
||||
if payload[0] == 0xff && payload[1]>>4 == 0xf {
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(payload[:7])
|
||||
} else {
|
||||
p.Error("audio type not supported yet", zap.Uint32("type", p.parser.AudioStreamType))
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
p.AudioTrack.WriteRaw(ts, payload)
|
||||
}
|
||||
}
|
||||
|
||||
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
|
||||
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
|
||||
if p.parser == nil {
|
||||
p.parser = utils.NewDecPSPackage(p)
|
||||
}
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
p.parser.Feed(rtp)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
} else {
|
||||
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
|
||||
if rtp.SequenceNumber != p.lastSeq+1 {
|
||||
p.parser.Drop()
|
||||
if p.VideoTrack != nil {
|
||||
p.VideoTrack.SetLostFlag()
|
||||
}
|
||||
}
|
||||
p.parser.Feed(rtp)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
defer f.Close()
|
||||
|
||||
59
restful.go
59
restful.go
@@ -10,11 +10,11 @@ import (
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
func (conf *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
|
||||
func (c *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
|
||||
util.ReturnJson(func() (list []*Device) {
|
||||
Devices.Range(func(key, value interface{}) bool {
|
||||
device := value.(*Device)
|
||||
if time.Since(device.UpdateTime) > time.Duration(conf.RegisterValidity)*time.Second {
|
||||
if time.Since(device.UpdateTime) > c.RegisterValidity {
|
||||
Devices.Delete(key)
|
||||
} else {
|
||||
list = append(list, device)
|
||||
@@ -25,7 +25,7 @@ func (conf *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
|
||||
}, time.Second*5, w, r)
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
|
||||
func (c *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
startTime := r.URL.Query().Get("startTime")
|
||||
@@ -37,7 +37,7 @@ func (conf *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
|
||||
func (c *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
ptzcmd := r.URL.Query().Get("ptzcmd")
|
||||
@@ -48,7 +48,7 @@ func (conf *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
|
||||
func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
id := query.Get("id")
|
||||
channel := query.Get("channel")
|
||||
@@ -62,18 +62,18 @@ func (conf *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
|
||||
http.NotFound(w, r)
|
||||
} else if opt.IsLive() && c.LivePublisher != nil {
|
||||
w.WriteHeader(304) //直播流已存在
|
||||
} else if code, err := c.Invite(opt); err == nil {
|
||||
} else if code, err := c.Invite(&opt); err == nil {
|
||||
w.WriteHeader(code)
|
||||
} else {
|
||||
http.Error(w, err.Error(), code)
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
|
||||
func (c *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
|
||||
dump := r.URL.Query().Get("dump")
|
||||
printOut := r.URL.Query().Get("print")
|
||||
if dump == "" {
|
||||
dump = conf.DumpPath
|
||||
dump = c.DumpPath
|
||||
}
|
||||
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
@@ -102,7 +102,7 @@ func (conf *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
|
||||
func (c *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
|
||||
// CORS(w, r)
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
@@ -114,7 +114,7 @@ func (conf *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request) {
|
||||
func (c *GB28181Config) API_position(w http.ResponseWriter, r *http.Request) {
|
||||
//CORS(w, r)
|
||||
query := r.URL.Query()
|
||||
//设备id
|
||||
@@ -124,8 +124,14 @@ func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request)
|
||||
//订阅间隔(单位:秒)
|
||||
interval := query.Get("interval")
|
||||
|
||||
expiresInt, _ := strconv.Atoi(expires)
|
||||
intervalInt, _ := strconv.Atoi(interval)
|
||||
expiresInt, err := time.ParseDuration(expires)
|
||||
if expires == "" || err != nil {
|
||||
expiresInt = c.Position.Expires
|
||||
}
|
||||
intervalInt, err := time.ParseDuration(interval)
|
||||
if interval == "" || err != nil {
|
||||
intervalInt = c.Position.Interval
|
||||
}
|
||||
|
||||
if v, ok := Devices.Load(id); ok {
|
||||
d := v.(*Device)
|
||||
@@ -134,3 +140,32 @@ func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
type DevicePosition struct {
|
||||
ID string
|
||||
GpsTime time.Time //gps时间
|
||||
Longitude string //经度
|
||||
Latitude string //纬度
|
||||
}
|
||||
|
||||
func (c *GB28181Config) API_get_position(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
//设备id
|
||||
id := query.Get("id")
|
||||
|
||||
util.ReturnJson(func() (list []*DevicePosition) {
|
||||
if id == "" {
|
||||
Devices.Range(func(key, value interface{}) bool {
|
||||
d := value.(*Device)
|
||||
if time.Since(d.GpsTime) <= c.Position.Interval {
|
||||
list = append(list, &DevicePosition{ID: d.ID, GpsTime: d.GpsTime, Longitude: d.Longitude, Latitude: d.Latitude})
|
||||
}
|
||||
return true
|
||||
})
|
||||
} else if v, ok := Devices.Load(id); ok {
|
||||
d := v.(*Device)
|
||||
list = append(list, &DevicePosition{ID: d.ID, GpsTime: d.GpsTime, Longitude: d.Longitude, Latitude: d.Latitude})
|
||||
}
|
||||
return
|
||||
}, c.Position.Interval, w, r)
|
||||
}
|
||||
|
||||
78
server.go
78
server.go
@@ -31,7 +31,7 @@ type PortManager struct {
|
||||
}
|
||||
|
||||
func (pm *PortManager) Init(start, end uint16) {
|
||||
pm.pos = start
|
||||
pm.pos = start - 1
|
||||
pm.max = end
|
||||
if pm.pos > 0 && pm.max > pm.pos {
|
||||
pm.Valid = true
|
||||
@@ -96,74 +96,76 @@ var levelMap = map[string]log.Level{
|
||||
"panic": log.PanicLevel,
|
||||
}
|
||||
|
||||
func (config *GB28181Config) startServer() {
|
||||
config.publishers.Init()
|
||||
addr := "0.0.0.0:" + strconv.Itoa(int(config.SipPort))
|
||||
func (c *GB28181Config) startServer() {
|
||||
c.publishers.Init()
|
||||
addr := "0.0.0.0:" + strconv.Itoa(int(c.SipPort))
|
||||
|
||||
logger := utils.NewZapLogger(plugin.Logger, "GB SIP Server", nil)
|
||||
logger.SetLevel(levelMap[config.LogLevel])
|
||||
logger.SetLevel(levelMap[c.LogLevel])
|
||||
// logger := log.NewDefaultLogrusLogger().WithPrefix("GB SIP Server")
|
||||
srvConf := gosip.ServerConfig{}
|
||||
if config.SipIP != "" {
|
||||
srvConf.Host = config.SipIP
|
||||
if c.SipIP != "" {
|
||||
srvConf.Host = c.SipIP
|
||||
}
|
||||
srv = gosip.NewServer(srvConf, nil, nil, logger)
|
||||
srv.OnRequest(sip.REGISTER, config.OnRegister)
|
||||
srv.OnRequest(sip.MESSAGE, config.OnMessage)
|
||||
srv.OnRequest(sip.NOTIFY, config.OnNotify)
|
||||
srv.OnRequest(sip.BYE, config.onBye)
|
||||
err := srv.Listen(strings.ToLower(config.SipNetwork), addr)
|
||||
srv.OnRequest(sip.REGISTER, c.OnRegister)
|
||||
srv.OnRequest(sip.MESSAGE, c.OnMessage)
|
||||
srv.OnRequest(sip.NOTIFY, c.OnNotify)
|
||||
srv.OnRequest(sip.BYE, c.OnBye)
|
||||
err := srv.Listen(strings.ToLower(c.SipNetwork), addr)
|
||||
if err != nil {
|
||||
plugin.Logger.Error("gb28181 server listen", zap.Error(err))
|
||||
} else {
|
||||
plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
|
||||
}
|
||||
|
||||
go config.startMediaServer()
|
||||
go c.startMediaServer()
|
||||
|
||||
if config.Username != "" || config.Password != "" {
|
||||
go removeBanDevice(config)
|
||||
if c.Username != "" || c.Password != "" {
|
||||
go c.removeBanDevice()
|
||||
}
|
||||
}
|
||||
|
||||
func (config *GB28181Config) startMediaServer() {
|
||||
if config.MediaNetwork == "tcp" {
|
||||
config.tcpPorts.Init(config.MediaPortMin, config.MediaPortMax)
|
||||
if !config.tcpPorts.Valid {
|
||||
config.listenMediaTCP()
|
||||
func (c *GB28181Config) startMediaServer() {
|
||||
if c.MediaNetwork == "tcp" {
|
||||
c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
|
||||
if !c.tcpPorts.Valid {
|
||||
c.listenMediaTCP()
|
||||
}
|
||||
} else {
|
||||
config.udpPorts.Init(config.MediaPortMin, config.MediaPortMax)
|
||||
if !config.udpPorts.Valid {
|
||||
config.listenMediaUDP()
|
||||
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
|
||||
if !c.udpPorts.Valid {
|
||||
c.listenMediaUDP()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processTcpMediaConn(config *GB28181Config, conn net.Conn) {
|
||||
func (c *GB28181Config) processTcpMediaConn(conn net.Conn) {
|
||||
var rtpPacket rtp.Packet
|
||||
reader := bufio.NewReader(conn)
|
||||
lenBuf := make([]byte, 2)
|
||||
defer conn.Close()
|
||||
var err error
|
||||
ps := make(util.Buffer, 1024)
|
||||
for err == nil {
|
||||
if _, err = io.ReadFull(reader, lenBuf); err != nil {
|
||||
return
|
||||
}
|
||||
ps := make([]byte, binary.BigEndian.Uint16(lenBuf))
|
||||
ps.Reset()
|
||||
ps.Glow(int(binary.BigEndian.Uint16(lenBuf)))
|
||||
if _, err = io.ReadFull(reader, ps); err != nil {
|
||||
return
|
||||
}
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
|
||||
} else if publisher := config.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
|
||||
} else if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
|
||||
publisher.PushPS(&rtpPacket)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (config *GB28181Config) listenMediaTCP() {
|
||||
addr := ":" + strconv.Itoa(int(config.MediaPort))
|
||||
func (c *GB28181Config) listenMediaTCP() {
|
||||
addr := ":" + strconv.Itoa(int(c.MediaPort))
|
||||
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
|
||||
listen, err := net.ListenTCP("tcp", mediaAddr)
|
||||
|
||||
@@ -171,24 +173,24 @@ func (config *GB28181Config) listenMediaTCP() {
|
||||
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
|
||||
return
|
||||
}
|
||||
plugin.Info("Media tcp server start.", zap.Uint16("port", config.MediaPort))
|
||||
plugin.Info("Media tcp server start.", zap.Uint16("port", c.MediaPort))
|
||||
defer listen.Close()
|
||||
defer plugin.Info("Media tcp server stop", zap.Uint16("port", config.MediaPort))
|
||||
defer plugin.Info("Media tcp server stop", zap.Uint16("port", c.MediaPort))
|
||||
|
||||
for {
|
||||
conn, err := listen.Accept()
|
||||
if err != nil {
|
||||
plugin.Error("Accept err=", zap.Error(err))
|
||||
}
|
||||
go processTcpMediaConn(config, conn)
|
||||
go c.processTcpMediaConn(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (config *GB28181Config) listenMediaUDP() {
|
||||
func (c *GB28181Config) listenMediaUDP() {
|
||||
var rtpPacket rtp.Packet
|
||||
networkBuffer := 1048576
|
||||
|
||||
addr := ":" + strconv.Itoa(int(config.MediaPort))
|
||||
addr := ":" + strconv.Itoa(int(c.MediaPort))
|
||||
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
|
||||
conn, err := net.ListenUDP("udp", mediaAddr)
|
||||
|
||||
@@ -197,15 +199,15 @@ func (config *GB28181Config) listenMediaUDP() {
|
||||
return
|
||||
}
|
||||
bufUDP := make([]byte, networkBuffer)
|
||||
plugin.Info("Media udp server start.", zap.Uint16("port", config.MediaPort))
|
||||
defer plugin.Info("Media udp server stop", zap.Uint16("port", config.MediaPort))
|
||||
plugin.Info("Media udp server start.", zap.Uint16("port", c.MediaPort))
|
||||
defer plugin.Info("Media udp server stop", zap.Uint16("port", c.MediaPort))
|
||||
dumpLen := make([]byte, 6)
|
||||
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
|
||||
ps := bufUDP[:n]
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("Decode rtp error:", zap.Error(err))
|
||||
}
|
||||
if publisher := config.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
|
||||
if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
|
||||
if publisher.dumpFile != nil {
|
||||
util.PutBE(dumpLen[:4], n)
|
||||
if publisher.lastReceive.IsZero() {
|
||||
@@ -237,8 +239,8 @@ func (config *GB28181Config) listenMediaUDP() {
|
||||
// }
|
||||
// }
|
||||
|
||||
func removeBanDevice(config *GB28181Config) {
|
||||
t := time.NewTicker(time.Duration(config.RemoveBanInterval) * time.Second)
|
||||
func (c *GB28181Config) removeBanDevice() {
|
||||
t := time.NewTicker(c.RemoveBanInterval)
|
||||
for range t.C {
|
||||
DeviceRegisterCount.Range(func(key, value interface{}) bool {
|
||||
if value.(int) > MaxRegisterCount {
|
||||
|
||||
@@ -47,6 +47,13 @@ func (b *IOBuffer) ReadN(length int) ([]byte, error) {
|
||||
return nil, io.EOF
|
||||
}
|
||||
|
||||
//func (b *IOBuffer) Read(buf []byte) (n int, err error) {
|
||||
// var ret []byte
|
||||
// ret, err = b.ReadN(len(buf))
|
||||
// copy(buf, ret)
|
||||
// return len(ret), err
|
||||
//}
|
||||
|
||||
// empty reports whether the unread portion of the buffer is empty.
|
||||
func (b *IOBuffer) empty() bool { return b.Len() <= b.off }
|
||||
|
||||
|
||||
11
utils/ps.go
11
utils/ps.go
@@ -192,12 +192,9 @@ func (dec *DecPSPackage) Feed(rtp *rtp.Packet) (err error) {
|
||||
return err
|
||||
}
|
||||
psl, err := dec.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
psl &= 0x07
|
||||
if err = dec.Skip(int(psl)); err != nil {
|
||||
return err
|
||||
if err == nil {
|
||||
psl &= 0x07
|
||||
dec.Skip(int(psl))
|
||||
}
|
||||
if len(dec.videoBuffer) > 0 {
|
||||
dec.PushVideo(dec.vPTS, dec.vDTS, dec.videoBuffer)
|
||||
@@ -311,7 +308,7 @@ func (dec *DecPSPackage) decProgramStreamMap() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dec *DecPSPackage) decPESPacket(pts *uint32,dts *uint32) error {
|
||||
func (dec *DecPSPackage) decPESPacket(pts *uint32, dts *uint32) error {
|
||||
payload, err := dec.ReadPayload()
|
||||
|
||||
if len(payload) < 4 {
|
||||
|
||||
Reference in New Issue
Block a user