Compare commits

...

38 Commits

Author SHA1 Message Date
langhuihui
538e96a5c2 规避注册包里面from.Address为空导致的panic 2023-08-06 14:26:41 +08:00
langhuihui
c2003d53e8 更新readme 2023-07-14 15:55:48 +08:00
langhuihui
64ac75905f 默认打开范围端口 2023-07-11 19:38:30 +08:00
langhuihui
d2cc62ff9e 修改一个变量名拼写错误 2023-07-07 14:50:09 +08:00
dexter
2e8aa47bc5 Merge pull request #99 from kingecg/v4
修复:按需拉流支持回放流
2023-07-04 18:42:09 +08:00
程广
585d5949d3 fix query record 2023-07-04 18:17:11 +08:00
kingecg
0285236cce 修复:按需拉流支持回放流 2023-06-30 22:31:51 +08:00
dexter
12895fa2cc Merge pull request #98 from kingecg/patch-1
Update channel.go to support device not use tcp
2023-06-30 21:24:07 +08:00
kingecg
c66303e7e8 Update channel.go to support device not use tcp
when media set to tcp and device not support, fallback to udp
2023-06-30 19:13:47 +08:00
ogofly
5435c2ef1c Merge pull request #95 from rufftio/v4
设备状态变更处理
2023-06-20 14:38:59 +08:00
ogofly
d8c6ad30dd 合并三处定时任务到一个协程 2023-06-19 15:50:31 +08:00
ogofly
86fa7cc7e6 Merge branch 'Monibuca:v4' into v4 2023-06-19 14:33:50 +08:00
liuyancong
692ec21877 定时删除注册超时设备,定时设置心跳超时设备为离线, 规范设备和通道状态为枚举量 2023-06-19 14:33:18 +08:00
liuyancong
71f2b36d2d update: 默认注册有效期配置为 3600s 2023-06-19 13:20:02 +08:00
dexter
b068bd9e5b Merge pull request #94 from rufftio/v4
add:处理 Register 消息的注销情况,将设备从列表中清除
2023-06-19 12:53:10 +08:00
liuyancong
78ac89e7af add:处理 Register 消息的注销情况,将设备从列表中清除 2023-06-19 11:56:17 +08:00
dexter
1cec5301c3 Merge pull request #92 from rufftio/v4
使用 engine 的 stream pasue 和 resume 替代 neverTimeout
2023-06-07 11:09:54 +08:00
ogofly
319f7fc636 Merge branch 'Monibuca:v4' into v4 2023-06-07 11:08:22 +08:00
liuyancong
7a75810203 使用 engine 的 stream pasue 和 resume 替代 neverTimeout 2023-06-07 10:46:42 +08:00
dexter
bbc7b09835 Merge pull request #91 from rufftio/v4
fix: ptz api 参数违规返回问题
2023-06-06 19:03:00 +08:00
liuyancong
1dbdff1fe5 add: 录像播放的暂停、恢复、快进、跳转到制定时间接口 2023-06-06 18:54:27 +08:00
ogofly
952c8f0ff8 Merge branch 'Monibuca:v4' into v4 2023-06-05 22:55:29 +08:00
liuyancong
730f3014f8 fix: ptz api 参数违规返回问题 2023-06-05 19:02:14 +08:00
dexter
682aec656b Merge pull request #90 from rufftio/v4
ptz 控制接口,采用更易理解和使用的参数
2023-06-05 18:40:32 +08:00
ogofly
fbd8683f5b Merge branch 'Monibuca:v4' into v4 2023-06-05 18:18:50 +08:00
liuyancong
b15e4ee89c add: ptz 控制接口,采用更易理解和使用的参数 2023-06-05 18:17:46 +08:00
langhuihui
3c7b3a042d fix: list接口为空时返回[] 而不是null 2023-05-25 14:12:57 +08:00
dexter
858df1377e Merge pull request #88 from ogofly/v4
录像查询重构为在当前查询的http响应中返回
2023-05-24 11:36:14 +08:00
liuyancong
60021d3cd9 录像查询重构为在当前查询的http响应中返回 2023-05-24 11:30:50 +08:00
langhuihui
d8061cd7c3 channel结构体反转 2023-05-23 20:56:24 +08:00
langhuihui
ed397063c4 chroe: update log format 2023-05-21 22:12:09 +08:00
langhuihui
5853120d30 update readme 2023-05-17 09:07:12 +08:00
langhuihui
4c47df0695 fix: update dep ps version to 4.0.1 2023-05-16 23:11:26 +08:00
langhuihui
37fd121d11 feat: change to use ps plugin 2023-05-14 11:12:25 +08:00
langhuihui
05fd8c38f7 feat: add a new way to config port 2023-05-04 09:57:20 +08:00
dexter
2d85e46a8b ChannelEx 2023-04-09 11:08:23 +08:00
charlestamz
4a90d7bf91 修复Recover导致断流的问题 2023-04-05 22:25:53 +08:00
dexter
a020f3ea81 适配引擎修改 2023-04-04 20:20:27 +08:00
16 changed files with 1009 additions and 824 deletions

View File

@@ -18,16 +18,13 @@ _ "m7s.live/plugin/gb28181/v4"
```yaml
gb28181:
autoinvite: true #表示自动发起invite当ServerSIP接收到设备信息时立即向设备发送invite命令获取流
invitemode: 1 #0、手动invite 1、表示自动发起invite当ServerSIP接收到设备信息时立即向设备发送invite命令获取流,2、按需拉流既等待订阅者触发
position:
autosubposition: false #是否自动订阅定位
expires: 3600s #订阅周期(单位:秒)默认3600
interval: 6s #订阅间隔单位默认6
prefetchrecord: false
udpcachesize: 0 #表示UDP缓存大小默认为0不开启。仅当TCP关闭切缓存大于0时才开启
sipnetwork: udp
sipip: "" #sip服务器地址 默认 自动适配设备网段
sipport: 5060
serial: "34020000002000000001"
realm: "3402000000"
username: ""
@@ -36,10 +33,9 @@ gb28181:
registervalidity: 60s #注册有效期
mediaip: "" #媒体服务器地址 默认 自动适配设备网段
mediaport: 58200 #媒体服务器端口,用于接收设备的流
medianetwork: tcp
mediaportmin: 0 #媒体服务器端口范围最小值,设置后将开启端口范围模式
mediaportmax: 0 #媒体服务器端口范围最大值,设置后将开启端口范围模式
port:
sip: udp:5060 #sip服务器端口
media: tcp:58200-59200 #媒体服务器端口,用于接收设备的流
removebaninterval: 10m #定时移除注册失败的设备黑名单单位秒默认10分钟600秒
loglevel: info
@@ -106,7 +102,7 @@ type Device struct {
| startTime | 否 | 开始时间纯数字Unix时间戳 |
| endTime | 否 | 结束时间纯数字Unix时间戳 |
返回200代表成功
返回200代表成功, 304代表已经在拉取中不能重复拉仅仅针对直播流
### 停止从设备拉流
@@ -117,6 +113,8 @@ type Device struct {
| id | 是 | 设备ID |
| channel | 是 | 通道编号 |
http 200 表示成功404流不存在
### 发送控制命令
`/gb28181/api/control`
@@ -135,8 +133,8 @@ type Device struct {
| --------- | ---- | -------------------------------------------- |
| id | 是 | 设备ID |
| channel | 是 | 通道编号 |
| startTime | 否 | 开始时间(字符串格式2021-7-23T12:00:00 |
| endTime | 否 | 结束时间(字符串格式同上 |
| startTime | 否 | 开始时间(Unix时间戳 |
| endTime | 否 | 结束时间(Unix时间戳 |
### 移动位置订阅

View File

@@ -5,35 +5,123 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"
"sync/atomic"
"github.com/ghettovoice/gosip/sip"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/log"
"m7s.live/plugin/gb28181/v4/utils"
"m7s.live/plugin/ps/v4"
)
type ChannelEx struct {
device *Device
RecordPublisher *GBPublisher `json:"-"`
LivePublisher *GBPublisher
LiveSubSP string //实时子码流
Records []*Record
RecordStartTime string
RecordEndTime string
recordStartTime time.Time
recordEndTime time.Time
liveInviteLock *sync.Mutex
tcpPortIndex uint16
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
var QUERY_RECORD_TIMEOUT = time.Second * 5
type PullStream struct {
opt *InviteOptions
channel *Channel
inviteRes sip.Response
}
func (p *PullStream) CreateRequest(method sip.RequestMethod) (req sip.Request) {
res := p.inviteRes
req = p.channel.CreateRequst(method)
from, _ := res.From()
to, _ := res.To()
callId, _ := res.CallID()
req.ReplaceHeaders(from.Name(), []sip.Header{from})
req.ReplaceHeaders(to.Name(), []sip.Header{to})
req.ReplaceHeaders(callId.Name(), []sip.Header{callId})
return
}
func (p *PullStream) Bye() int {
req := p.CreateRequest(sip.BYE)
resp, err := p.channel.device.SipRequestForResponse(req)
if p.opt.IsLive() {
p.channel.status.Store(0)
}
if p.opt.recyclePort != nil {
p.opt.recyclePort(p.opt.MediaPort)
}
if err != nil {
return http.StatusInternalServerError
}
return int(resp.StatusCode())
}
func (p *PullStream) info(body string) int {
d := p.channel.device
req := p.CreateRequest(sip.INFO)
contentType := sip.ContentType("Application/MANSRTSP")
req.AppendHeader(&contentType)
req.SetBody(body, true)
resp, err := d.SipRequestForResponse(req)
if err != nil {
log.Warnf("Send info to stream error: %v, stream=%s, body=%s", err, p.opt.StreamPath, body)
return getSipRespErrorCode(err)
}
return int(resp.StatusCode())
}
// 暂停播放
func (p *PullStream) Pause() int {
body := fmt.Sprintf(`PAUSE RTSP/1.0
CSeq: %d
PauseTime: now
`, p.channel.device.sn)
return p.info(body)
}
// 恢复播放
func (p *PullStream) Resume() int {
d := p.channel.device
body := fmt.Sprintf(`PLAY RTSP/1.0
CSeq: %d
Range: npt=now-
`, d.sn)
return p.info(body)
}
// 跳转到播放时间
// second: 相对于起始点调整到第 sec 秒播放
func (p *PullStream) PlayAt(second uint) int {
d := p.channel.device
body := fmt.Sprintf(`PLAY RTSP/1.0
CSeq: %d
Range: npt=%d-
`, d.sn, second)
return p.info(body)
}
// 快进/快退播放
// speed 取值: 0.25 0.5 1 2 4 或者其对应的负数表示倒放
func (p *PullStream) PlayForward(speed float32) int {
d := p.channel.device
body := fmt.Sprintf(`PLAY RTSP/1.0
CSeq: %d
Scale: %0.6f
`, d.sn, speed)
return p.info(body)
}
type Channel struct {
device *Device // 所属设备
status atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放
LiveSubSP string // 实时子码流通过rtsp
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
*log.Logger `json:"-" yaml:"-"`
ChannelInfo
}
// Channel 通道
type Channel struct {
DeviceID string
type ChannelInfo struct {
DeviceID string // 通道ID
ParentID string
Name string
Manufacturer string
@@ -46,11 +134,16 @@ type Channel struct {
SafetyWay int
RegisterWay int
Secrecy int
Status string
Children []*Channel `json:"-"`
ChannelEx //自定义属性
Status ChannelStatus
}
type ChannelStatus string
const (
ChannelOnStatus = "ON"
ChannelOffStatus = "OFF"
)
func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
d := channel.device
d.sn++
@@ -75,9 +168,13 @@ func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request)
//非同一域的目标地址需要使用@host
host := conf.Realm
if channel.DeviceID[0:9] != host {
deviceIp := d.NetAddr
deviceIp = deviceIp[0:strings.LastIndex(deviceIp, ":")]
host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
if channel.Port != 0 {
deviceIp := d.NetAddr
deviceIp = deviceIp[0:strings.LastIndex(deviceIp, ":")]
host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
} else {
host = d.NetAddr
}
}
channelAddr := sip.Address{
@@ -106,33 +203,41 @@ func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request)
req.SetDestination(d.NetAddr)
return req
}
func (channel *Channel) QueryRecord(startTime, endTime string) int {
func (channel *Channel) QueryRecord(startTime, endTime string) ([]*Record, error) {
d := channel.device
channel.RecordStartTime = startTime
channel.RecordEndTime = endTime
channel.recordStartTime, _ = time.Parse(TIME_LAYOUT, startTime)
channel.recordEndTime, _ = time.Parse(TIME_LAYOUT, endTime)
channel.Records = nil
request := d.CreateRequest(sip.MESSAGE)
contentType := sip.ContentType("Application/MANSCDP+xml")
request.AppendHeader(&contentType)
body := fmt.Sprintf(`<?xml version="1.0"?>
<Query>
<CmdType>RecordInfo</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
<StartTime>%s</StartTime>
<EndTime>%s</EndTime>
<Secrecy>0</Secrecy>
<Type>all</Type>
</Query>`, d.sn, channel.DeviceID, startTime, endTime)
// body := fmt.Sprintf(`<?xml version="1.0"?>
// <Query>
// <CmdType>RecordInfo</CmdType>
// <SN>%d</SN>
// <DeviceID>%s</DeviceID>
// <StartTime>%s</StartTime>
// <EndTime>%s</EndTime>
// <Secrecy>0</Secrecy>
// <Type>all</Type>
// </Query>`, d.sn, channel.DeviceID, startTime, endTime)
start, _ := strconv.ParseInt(startTime, 10, 0)
end, _ := strconv.ParseInt(endTime, 10, 0)
body := BuildRecordInfoXML(d.sn, channel.DeviceID, start, end)
request.SetBody(body, true)
resultCh := RecordQueryLink.WaitResult(d.ID, channel.DeviceID, d.sn, QUERY_RECORD_TIMEOUT)
resp, err := d.SipRequestForResponse(request)
if err != nil {
return http.StatusRequestTimeout
return nil, fmt.Errorf("query error: %s", err)
}
return int(resp.StatusCode())
if resp.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("query error, status=%d", resp.StatusCode())
}
// RecordQueryLink 中加了超时机制,该结果一定会返回
// 所以此处不用再增加超时等保护机制
r := <-resultCh
return r.list, r.err
}
func (channel *Channel) Control(PTZCmd string) int {
d := channel.device
request := d.CreateRequest(sip.MESSAGE)
@@ -199,18 +304,27 @@ f = v/a/编码格式/码率大小/采样率
f字段中视、音频参数段之间不需空格分割。
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
*/
func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
if opt.IsLive() {
if !channel.liveInviteLock.TryLock() {
if !channel.status.CompareAndSwap(0, 1) {
return 304, nil
}
defer func() {
if code != OK {
channel.liveInviteLock.Unlock()
if err != nil {
GB28181Plugin.Error("Invite", zap.Error(err))
channel.status.Store(0)
if conf.InviteMode == 1 {
// 5秒后重试
time.AfterFunc(time.Second*5, func() {
channel.Invite(opt)
})
}
} else {
channel.status.Store(2)
}
}()
}
channel.Bye(opt.IsLive())
d := channel.device
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
s := "Play"
@@ -219,37 +333,38 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
s = "Playback"
streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
}
if opt.StreamPath != "" {
streamPath = opt.StreamPath
} else {
opt.StreamPath = streamPath
}
if opt.dump == "" {
opt.dump = conf.DumpPath
}
publisher := &GBPublisher{
InviteOptions: opt,
channel: channel,
}
publisher.DisableReorder = !conf.RtpReorder
protocol := ""
networkType := "udp"
reusePort := true
if conf.IsMediaNetworkTCP() {
networkType = "tcp"
protocol = "TCP/"
if conf.tcpPorts.Valid {
opt.MediaPort, err = publisher.ListenTCP()
if err != nil {
return ServerInternalError, err
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
opt.MediaPort, err = conf.tcpPorts.GetPort()
opt.recyclePort = conf.tcpPorts.Recycle
reusePort = false
}
publisher.DisableReorder = true
} else {
if conf.udpPorts.Valid {
opt.MediaPort, err = publisher.ListenUDP()
if err != nil {
code = ServerInternalError
return
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
opt.MediaPort, err = conf.udpPorts.GetPort()
opt.recyclePort = conf.udpPorts.Recycle
reusePort = false
}
}
if err != nil {
return http.StatusInternalServerError, err
}
if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
sdpInfo := []string{
"v=0",
@@ -262,7 +377,6 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
"a=recvonly",
"a=rtpmap:96 PS/90000",
"y=" + opt.ssrc,
"",
}
if conf.IsMediaNetworkTCP() {
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
@@ -271,74 +385,122 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
contentType := sip.ContentType("application/sdp")
invite.AppendHeader(&contentType)
invite.SetBody(strings.Join(sdpInfo, "\r\n"), true)
invite.SetBody(strings.Join(sdpInfo, "\r\n")+"\r\n", true)
subject := sip.GenericHeader{
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
}
invite.AppendHeader(&subject)
publisher.inviteRes, err = d.SipRequestForResponse(invite)
inviteRes, err := d.SipRequestForResponse(invite)
if err != nil {
plugin.Error(fmt.Sprintf("SIP->Invite %s :%s invite error: %s", channel.DeviceID, invite.String(), err.Error()))
channel.Error("invite", zap.Error(err), zap.String("msg", invite.String()))
return http.StatusInternalServerError, err
}
code = int(publisher.inviteRes.StatusCode())
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
code = int(inviteRes.StatusCode())
channel.Info("invite response", zap.Int("status code", code))
if code == OK {
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
if code == http.StatusOK {
ds := strings.Split(inviteRes.Body(), "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
if ls[0] == "y" && len(ls[1]) > 0 {
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
opt.SSRC = uint32(_ssrc)
} else {
plugin.Error("read invite response y ", zap.Error(err))
channel.Error("read invite response y ", zap.Error(err))
}
// break
}
if ls[0] == "m" && len(ls[1]) > 0 {
netinfo := strings.Split(ls[1], " ")
if strings.ToUpper(netinfo[2]) == "TCP/RTP/AVP" {
channel.Debug("Device support tcp")
} else {
channel.Debug("Device not support tcp")
networkType = "udp"
}
break
}
}
}
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
publisher.udpCache = utils.NewPqRtp()
err = ps.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, reusePort)
if err == nil {
PullStreams.Store(streamPath, &PullStream{
opt: opt,
channel: channel,
inviteRes: inviteRes,
})
err = srv.Send(sip.NewAckRequest("", invite, inviteRes, "", nil))
}
if err = plugin.Publish(streamPath, publisher); err != nil {
code = ServerInternalError
return
}
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
srv.Send(ack)
} else if channel.CanInvite() {
time.AfterFunc(time.Second*5, func() {
channel.TryAutoInvite()
})
}
return
}
func (channel *Channel) Bye(live bool) int {
func (channel *Channel) Bye(streamPath string) int {
d := channel.device
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
if s := Streams.Get(streamPath); s != nil {
s.Close()
if streamPath == "" {
streamPath = fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
}
if live && channel.LivePublisher != nil {
return channel.LivePublisher.Bye()
if s, loaded := PullStreams.LoadAndDelete(streamPath); loaded {
s.(*PullStream).Bye()
if s := Streams.Get(streamPath); s != nil {
s.Close()
}
return http.StatusOK
}
if !live && channel.RecordPublisher != nil {
return channel.RecordPublisher.Bye()
}
return 404
return http.StatusNotFound
}
func (channel *Channel) TryAutoInvite() {
if conf.AutoInvite && channel.CanInvite() {
go channel.Invite(&InviteOptions{})
func (channel *Channel) Pause(streamPath string) int {
if s, loaded := PullStreams.Load(streamPath); loaded {
r := s.(*PullStream).Pause()
if s := Streams.Get(streamPath); s != nil {
s.Pause()
}
return r
}
return http.StatusNotFound
}
func (channel *Channel) Resume(streamPath string) int {
if s, loaded := PullStreams.Load(streamPath); loaded {
r := s.(*PullStream).Resume()
if s := Streams.Get(streamPath); s != nil {
s.Resume()
}
return r
}
return http.StatusNotFound
}
func (channel *Channel) PlayAt(streamPath string, second uint) int {
if s, loaded := PullStreams.Load(streamPath); loaded {
r := s.(*PullStream).PlayAt(second)
if s := Streams.Get(streamPath); s != nil {
s.Resume()
}
return r
}
return http.StatusNotFound
}
func (channel *Channel) PlayForward(streamPath string, speed float32) int {
if s, loaded := PullStreams.Load(streamPath); loaded {
return s.(*PullStream).PlayForward(speed)
}
if s := Streams.Get(streamPath); s != nil {
s.Resume()
}
return http.StatusNotFound
}
func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
if channel.CanInvite() {
go channel.Invite(opt)
}
}
func (channel *Channel) CanInvite() bool {
if channel.LivePublisher != nil || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
if channel.status.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == ChannelOffStatus {
return false
}
@@ -365,3 +527,11 @@ func (channel *Channel) CanInvite() bool {
return false
}
func getSipRespErrorCode(err error) int {
if re, ok := err.(*sip.RequestError); ok {
return int(re.Code)
} else {
return http.StatusInternalServerError
}
}

View File

@@ -1,60 +1,5 @@
package gb28181
const (
Trying = 100
Ringing = 180
CallIsBeingForwarded = 181
Queued = 182
SessionProgress = 183
OK = 200
Accepted = 202
MultipleChoices = 300
MovedPermanently = 301
MovedTemporarily = 302
UseProxy = 305
AlternativeService = 380
BadRequest = 400
Unauthorized = 401
PaymentRequired = 402
Forbidden = 403
NotFound = 404
MethodNotAllowed = 405
NotAcceptable = 406
ProxyAuthenticationRequired = 407
RequestTimeout = 408
Gone = 410
RequestEntityTooLarge = 413
RequestURITooLong = 414
UnsupportedMediaType = 415
UnsupportedURIScheme = 416
BadExtension = 420
ExtensionRequired = 421
IntervalTooBrief = 423
TemporarilyUnavailable = 480
CallTransactionDoesNotExist = 481
LoopDetected = 482
TooManyHops = 483
AddressIncomplete = 484
Ambiguous = 485
BusyHere = 486
RequestTerminated = 487
NotAcceptableHere = 488
BadEvent = 489
RequestPending = 491
Undecipherable = 493
ServerInternalError = 500
NotImplemented = 501
BadGateway = 502
ServiceUnavailable = 503
ServerTim = 504
VersionNotSupported = 505
MessageTooLarge = 513
BusyEverywhere = 600
Decline = 603
DoesNotExistAnywhere = 604
SessionNotAcceptable = 606
)
var reasons = map[int]string{
100: "Trying",
180: "Ringing",
@@ -113,3 +58,9 @@ var reasons = map[int]string{
func Explain(statusCode int) string {
return reasons[statusCode]
}
const (
INVIDE_MODE_MANUAL = iota
INVIDE_MODE_AUTO
INVIDE_MODE_ONSUBSCRIBE
)

190
device.go
View File

@@ -10,10 +10,9 @@ import (
"sync"
"time"
"golang.org/x/exp/maps"
"go.uber.org/zap"
"m7s.live/engine/v4"
"m7s.live/engine/v4/log"
"m7s.live/plugin/gb28181/v4/utils"
// . "github.com/logrusorgru/aurora"
@@ -25,7 +24,6 @@ const TIME_LAYOUT = "2006-01-02T15:04:05"
// Record 录像
type Record struct {
//channel *Channel
DeviceID string
Name string
FilePath string
@@ -46,8 +44,18 @@ var (
DeviceRegisterCount sync.Map //设备注册次数
)
type DeviceStatus string
const (
DeviceRegisterStatus = "REGISTER"
DeviceRecoverStatus = "RECOVER"
DeviceOnlineStatus = "ONLINE"
DeviceOfflineStatus = "OFFLINE"
DeviceAlarmedStatus = "ALARMED"
)
type Device struct {
//*transaction.Core `json:"-"`
//*transaction.Core `json:"-" yaml:"-"`
ID string
Name string
Manufacturer string
@@ -56,14 +64,13 @@ type Device struct {
RegisterTime time.Time
UpdateTime time.Time
LastKeepaliveAt time.Time
Status string
Status DeviceStatus
sn int
addr sip.Address
sipIP string //设备对应网卡的服务器ip
mediaIP string //设备对应网卡的服务器ip
NetAddr string
ChannelMap map[string]*Channel
channelMutex sync.RWMutex
channelMap sync.Map
subscriber struct {
CallID string
Timeout time.Time
@@ -72,17 +79,23 @@ type Device struct {
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
*log.Logger `json:"-" yaml:"-"`
}
func (d *Device) MarshalJSON() ([]byte, error) {
type Alias Device
return json.Marshal(&struct {
Channels []*Channel
data := &struct {
Channels []*ChannelInfo
*Alias
}{
Channels: maps.Values(d.ChannelMap),
Alias: (*Alias)(d),
Alias: (*Alias)(d),
}
d.channelMap.Range(func(key, value interface{}) bool {
c := value.(*Channel)
data.Channels = append(data.Channels, &c.ChannelInfo)
return true
})
return json.Marshal(data)
}
func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
from, _ := req.From()
@@ -110,13 +123,12 @@ func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
if c.MediaIP != "" {
mediaIp = c.MediaIP
}
plugin.Info("RecoverDevice", zap.String("id", d.ID), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d.Status = string(sip.REGISTER)
d.Info("RecoverDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d.Status = DeviceRegisterStatus
d.sipIP = sipIP
d.mediaIP = mediaIp
d.NetAddr = deviceIp
d.UpdateTime = time.Now()
d.ChannelMap = make(map[string]*Channel)
}
func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
@@ -131,7 +143,7 @@ func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
d.UpdateTime = time.Now()
d.NetAddr = deviceIp
d.addr = deviceAddr
plugin.Debug("UpdateDevice", zap.String("id", id), zap.String("netaddr", d.NetAddr))
d.Debug("UpdateDevice", zap.String("netaddr", d.NetAddr))
} else {
servIp := req.Recipient().Host()
//根据网卡ip获取对应的公网ip
@@ -152,18 +164,18 @@ func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
if c.MediaIP != "" {
mediaIp = c.MediaIP
}
plugin.Info("StoreDevice", zap.String("id", id), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d = &Device{
ID: id,
RegisterTime: time.Now(),
UpdateTime: time.Now(),
Status: string(sip.REGISTER),
Status: DeviceRegisterStatus,
addr: deviceAddr,
sipIP: sipIP,
mediaIP: mediaIp,
NetAddr: deviceIp,
ChannelMap: make(map[string]*Channel),
Logger: GB28181Plugin.With(zap.String("id", id)),
}
d.Info("StoreDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
Devices.Store(id, d)
c.SaveDevices()
}
@@ -177,6 +189,7 @@ func (c *GB28181Config) ReadDevices() {
for _, item := range items {
if time.Since(item.UpdateTime) < conf.RegisterValidity {
item.Status = "RECOVER"
item.Logger = GB28181Plugin.With(zap.String("id", item.ID))
Devices.Store(item.ID, item)
}
}
@@ -197,42 +210,31 @@ func (c *GB28181Config) SaveDevices() {
}
}
func (d *Device) addOrUpdateChannel(channel *Channel) {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
channel.device = d
var oldLock *sync.Mutex
if old, ok := d.ChannelMap[channel.DeviceID]; ok {
//复制锁指针
oldLock = old.liveInviteLock
}
if oldLock == nil {
channel.liveInviteLock = &sync.Mutex{}
func (d *Device) addOrUpdateChannel(info ChannelInfo) (c *Channel) {
if old, ok := d.channelMap.Load(info.DeviceID); ok {
c = old.(*Channel)
c.ChannelInfo = info
} else {
channel.liveInviteLock = oldLock
}
d.ChannelMap[channel.DeviceID] = channel
}
func (d *Device) deleteChannel(DeviceID string) {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
delete(d.ChannelMap, DeviceID)
}
func (d *Device) CheckSubStream() {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
for _, c := range d.ChannelMap {
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
c = &Channel{
device: d,
ChannelInfo: info,
Logger: d.Logger.With(zap.String("channel", info.DeviceID)),
}
if s := engine.Streams.Get(fmt.Sprintf("%s/%s/rtsp", c.device.ID, c.DeviceID)); s != nil {
c.LiveSubSP = s.Path
} else {
c.LiveSubSP = ""
}
d.channelMap.Store(info.DeviceID, c)
}
return
}
func (d *Device) UpdateChannels(list []*Channel) {
func (d *Device) deleteChannel(DeviceID string) {
d.channelMap.Delete(DeviceID)
}
func (d *Device) UpdateChannels(list ...ChannelInfo) {
for _, c := range list {
if _, ok := conf.Ignores[c.DeviceID]; ok {
continue
@@ -255,32 +257,18 @@ func (d *Device) UpdateChannels(list []*Channel) {
}
}
//本设备增加通道
d.addOrUpdateChannel(c)
channel := d.addOrUpdateChannel(c)
//预取和邀请
if conf.PreFetchRecord {
n := time.Now()
n = time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.Local)
if len(c.Records) == 0 || (n.Format(TIME_LAYOUT) == c.RecordStartTime &&
n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT) == c.RecordEndTime) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
if conf.InviteMode == INVIDE_MODE_AUTO {
channel.TryAutoInvite(&InviteOptions{})
}
c.TryAutoInvite()
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
c.LiveSubSP = s.Path
channel.LiveSubSP = s.Path
} else {
c.LiveSubSP = ""
channel.LiveSubSP = ""
}
}
}
func (d *Device) UpdateRecord(channelId string, list []*Record) {
d.channelMutex.RLock()
if c, ok := d.ChannelMap[channelId]; ok {
c.Records = append(c.Records, list...)
}
d.channelMutex.RUnlock()
}
func (d *Device) CreateRequest(Method sip.RequestMethod) (req sip.Request) {
d.sn++
@@ -364,7 +352,7 @@ func (d *Device) Subscribe() int {
response, err := d.SipRequestForResponse(request)
if err == nil && response != nil {
if response.StatusCode() == OK {
if response.StatusCode() == http.StatusOK {
callId, _ := request.CallID()
d.subscriber.CallID = string(*callId)
} else {
@@ -386,13 +374,13 @@ func (d *Device) Catalog() int {
request.AppendHeader(&expires)
request.SetBody(BuildCatalogXML(d.sn, d.ID), true)
// 输出Sip请求设备通道信息信令
plugin.Sugar().Debugf("SIP->Catalog:%s", request)
GB28181Plugin.Sugar().Debugf("SIP->Catalog:%s", request)
resp, err := d.SipRequestForResponse(request)
if err == nil && resp != nil {
plugin.Sugar().Debugf("SIP<-Catalog Response: %s", resp.String())
GB28181Plugin.Sugar().Debugf("SIP<-Catalog Response: %s", resp.String())
return int(resp.StatusCode())
} else if err != nil {
plugin.Error("SIP<-Catalog error:", zap.Error(err))
GB28181Plugin.Error("SIP<-Catalog error:", zap.Error(err))
}
return http.StatusRequestTimeout
}
@@ -414,8 +402,8 @@ func (d *Device) QueryDeviceInfo() {
// received, _ := via.Params.Get("received")
// d.SipIP = received.String()
// }
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s response code:%d", d.ID, d.NetAddr, response.StatusCode()))
if response.StatusCode() == OK {
d.Info("QueryDeviceInfo", zap.Uint16("status code", uint16(response.StatusCode())))
if response.StatusCode() == http.StatusOK {
break
}
}
@@ -443,7 +431,7 @@ func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, inter
response, err := d.SipRequestForResponse(mobilePosition)
if err == nil && response != nil {
if response.StatusCode() == OK {
if response.StatusCode() == http.StatusOK {
callId, _ := mobilePosition.CallID()
d.subscriber.CallID = callId.String()
} else {
@@ -456,17 +444,18 @@ func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, inter
// UpdateChannelPosition 更新通道GPS坐标
func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) {
if c, ok := d.ChannelMap[channelId]; ok {
c.ChannelEx.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
c.ChannelEx.Longitude = lng
c.ChannelEx.Latitude = lat
plugin.Sugar().Debugf("更新通道[%s]坐标成功\n", c.Name)
if v, ok := d.channelMap.Load(channelId); ok {
c := v.(*Channel)
c.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
c.Longitude = lng
c.Latitude = lat
c.Debug("update channel position success")
} else {
//如果未找到通道,则更新到设备上
d.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
d.Longitude = lng
d.Latitude = lat
plugin.Sugar().Debugf("未找到通道[%s],更新设备[%s]坐标成功\n", channelId, d.ID)
d.Debug("update device position success", zap.String("channelId", channelId))
}
}
@@ -475,20 +464,20 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
for _, v := range deviceList {
switch v.Event {
case "ON":
plugin.Debug("收到通道上线通知")
d.Debug("receive channel online notify")
d.channelOnline(v.DeviceID)
case "OFF":
plugin.Debug("收到通道离线通知")
d.Debug("receive channel offline notify")
d.channelOffline(v.DeviceID)
case "VLOST":
plugin.Debug("收到通道视频丢失通知")
d.Debug("receive channel video lost notify")
d.channelOffline(v.DeviceID)
case "DEFECT":
plugin.Debug("收到通道故障通知")
d.Debug("receive channel video defect notify")
d.channelOffline(v.DeviceID)
case "ADD":
plugin.Debug("收到通道新增通知")
channel := Channel{
d.Debug("receive channel add notify")
channel := ChannelInfo{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
@@ -502,17 +491,17 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
SafetyWay: v.SafetyWay,
RegisterWay: v.RegisterWay,
Secrecy: v.Secrecy,
Status: v.Status,
Status: ChannelStatus(v.Status),
}
d.addOrUpdateChannel(&channel)
d.addOrUpdateChannel(channel)
case "DEL":
//删除
plugin.Debug("收到通道删除通知")
d.Debug("receive channel delete notify")
d.deleteChannel(v.DeviceID)
case "UPDATE":
plugin.Debug("收到通道更新通知")
d.Debug("receive channel update notify")
// 更新通道
channel := &Channel{
channel := ChannelInfo{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
@@ -526,28 +515,29 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
SafetyWay: v.SafetyWay,
RegisterWay: v.RegisterWay,
Secrecy: v.Secrecy,
Status: v.Status,
Status: ChannelStatus(v.Status),
}
channels := []*Channel{channel}
d.UpdateChannels(channels)
d.UpdateChannels(channel)
}
}
}
func (d *Device) channelOnline(DeviceID string) {
if c, ok := d.ChannelMap[DeviceID]; ok {
c.Status = "ON"
plugin.Sugar().Debugf("通道[%s]在线\n", c.Name)
if v, ok := d.channelMap.Load(DeviceID); ok {
c := v.(*Channel)
c.Status = ChannelOnStatus
c.Debug("channel online", zap.String("channelId", DeviceID))
} else {
plugin.Sugar().Debugf("更新通道[%s]状态失败,未找到\n", DeviceID)
d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
}
}
func (d *Device) channelOffline(DeviceID string) {
if c, ok := d.ChannelMap[DeviceID]; ok {
c.Status = "OFF"
plugin.Sugar().Debugf("通道[%s]离线\n", c.Name)
if v, ok := d.channelMap.Load(DeviceID); ok {
c := v.(*Channel)
c.Status = ChannelOffStatus
c.Debug("channel offline", zap.String("channelId", DeviceID))
} else {
plugin.Sugar().Debugf("更新通道[%s]状态失败,未找到\n", DeviceID)
d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
}
}

36
go.mod
View File

@@ -3,20 +3,19 @@ module m7s.live/plugin/gb28181/v4
go 1.19
require (
github.com/ghettovoice/gosip v0.0.0-20221121090201-9a2ed2233b6d
github.com/ghettovoice/gosip v0.0.0-20230802091127-d58873a3fe44
github.com/husanpao/ip v0.0.0-20220711082147-73160bb611a8
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/pion/rtp v1.7.13
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db
golang.org/x/net v0.7.0
golang.org/x/text v0.7.0
m7s.live/engine/v4 v4.11.18
github.com/pion/rtp v1.8.0
go.uber.org/zap v1.24.0
golang.org/x/net v0.12.0
golang.org/x/text v0.11.0
m7s.live/engine/v4 v4.13.8
m7s.live/plugin/ps/v4 v4.0.9
)
require (
github.com/aler9/gortsplib v1.0.1 // indirect
github.com/aler9/gortsplib/v2 v2.1.4 // indirect
github.com/bluenviron/mediacommon v0.7.0 // indirect
github.com/cnotch/ipchub v1.1.0 // indirect
github.com/denisbrodbeck/machineid v1.0.1 // indirect
github.com/discoviking/fsm v0.0.0-20150126104936-f4a273feecca // indirect
@@ -25,7 +24,7 @@ require (
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.1.0 // indirect
github.com/gobwas/ws v1.2.1 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/google/uuid v1.3.0 // indirect
@@ -36,7 +35,7 @@ require (
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/webrtc/v3 v3.1.49 // indirect
github.com/pion/webrtc/v3 v3.1.56 // indirect
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect
github.com/q191201771/naza v0.30.8 // indirect
github.com/quic-go/qtls-go1-18 v0.2.0 // indirect
@@ -44,21 +43,22 @@ require (
github.com/quic-go/qtls-go1-20 v0.1.0 // indirect
github.com/quic-go/quic-go v0.32.0 // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect
github.com/shirou/gopsutil/v3 v3.22.10 // indirect
github.com/shirou/gopsutil/v3 v3.22.11 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/tevino/abool v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33 // indirect
github.com/yapingcat/gomedia v0.0.0-20230727105416-c491e66c9d2a // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/tools v0.6.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

118
go.sum
View File

@@ -1,9 +1,7 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aler9/gortsplib v1.0.1 h1:R13+hxlvg2Hvu98+0hzg0o5fPjyUA9ZPJneMIBxKGXk=
github.com/aler9/gortsplib v1.0.1/go.mod h1:BOWNZ/QBkY/eVcRqUzJbPFEsRJshwxaxBT01K260Jeo=
github.com/aler9/gortsplib/v2 v2.1.4 h1:A4C4Qxz3aQibphXoKsifwKmKZRY7leaO3jHkA+SQ2kw=
github.com/aler9/gortsplib/v2 v2.1.4/go.mod h1:Eegw8PWa8hNYXiYMlbK3RX1gr7+r25MxniAPGA+kKUE=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/bluenviron/mediacommon v0.7.0 h1:dJWLLL9oDbAqfK8KuNfnDUQwNbeMAtGeRjZc9Vo95js=
github.com/bluenviron/mediacommon v0.7.0/go.mod h1:wuLJdxcITiSPgY1MvQqrX+qPlKmNfeV9wNvXth5M98I=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -27,8 +25,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghettovoice/gosip v0.0.0-20221121090201-9a2ed2233b6d h1:f1JRfm0MwkluwtUsbYxuVReDMajlc9Wn6zc2orX4sRE=
github.com/ghettovoice/gosip v0.0.0-20221121090201-9a2ed2233b6d/go.mod h1:yTr3BEYSFe9As6XM7ldyrVgqsPwlnw8Ahc4N28VFM2g=
github.com/ghettovoice/gosip v0.0.0-20230802091127-d58873a3fe44 h1:m4/46V6uAJ95CLimMRHJjiH5psW1JuL+iLeUBzF2r70=
github.com/ghettovoice/gosip v0.0.0-20230802091127-d58873a3fe44/go.mod h1:rlD1yLOErWYohWTryG/2bTTpmzB79p52ntLA/uIFXeI=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
@@ -38,8 +36,8 @@ github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u1
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.1.0-rc.1/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/gobwas/ws v1.1.0 h1:7RFti/xnNkMJnrK7D1yQ/iCIB5OrrY/54/H930kIbHA=
github.com/gobwas/ws v1.1.0/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -114,32 +112,31 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.20.1 h1:PA/3qinGoukvymdIDV8pii6tiZgC8kbmJO6Z5+b002Q=
github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ=
github.com/pion/dtls/v2 v2.1.5/go.mod h1:BqCE7xPZbPSubGasRoDFJeTsyJtdD1FanJYL0JGheqY=
github.com/pion/ice/v2 v2.2.12/go.mod h1:z2KXVFyRkmjetRlaVRgjO9U3ShKwzhlUylvxKfHfd5A=
github.com/pion/interceptor v0.1.11/go.mod h1:tbtKjZY14awXd7Bq0mmWvgtHB5MDaRN7HV3OZ/uy7s8=
github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0=
github.com/pion/dtls/v2 v2.2.6/go.mod h1:t8fWJCIquY5rlQZwA2yWxUS1+OCrAdXrhVKXB5oD/wY=
github.com/pion/ice/v2 v2.3.1/go.mod h1:aq2kc6MtYNcn4XmMhobAv6hTNJiHzvD0yXRz80+bnP8=
github.com/pion/interceptor v0.1.12/go.mod h1:bDtgAD9dRkBZpWHGKaoKb42FhDHTG2rX8Ii9LRALLVA=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.5/go.mod h1:UgssrvdD3mxpi8tMxAXbsppL3vJ4Jipw1mTCW+al01g=
github.com/pion/mdns v0.0.7/go.mod h1:4iP2UbeFhLI/vWju/bw6ZfwjJzk0z8DNValjGxR/dD8=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.9/go.mod h1:qVPhiCzAm4D/rxb6XzKeyZiQK69yJpbUDJSF7TgrqNo=
github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.7.13 h1:qcHwlmtiI50t1XivvoawdCGTP4Uiypzfrsap+bijcoA=
github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/sctp v1.8.0/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s=
github.com/pion/sctp v1.8.3/go.mod h1:OHbDjdk7kg+L+7TJim9q/qGVefdEJohuA2SZyihccgI=
github.com/pion/rtp v1.8.0 h1:SYD7040IR+NqrGBOc2GDU5iDjAR+0m5rnX/EWCUMNhw=
github.com/pion/rtp v1.8.0/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0=
github.com/pion/sctp v1.8.6/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0=
github.com/pion/sdp/v3 v3.0.6/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw=
github.com/pion/srtp/v2 v2.0.10/go.mod h1:XEeSWaK9PfuMs7zxXyiN252AHPbH12NX5q/CFDWtUuA=
github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA=
github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
github.com/pion/transport v0.12.3/go.mod h1:OViWW9SP2peE/HbwBvARicmAVnesphkNkCVZIWJ6q9A=
github.com/pion/transport v0.13.0/go.mod h1:yxm9uXpK9bpBBWkITk13cLo1y5/ur5VQpG22ny6EP7g=
github.com/pion/transport v0.13.1/go.mod h1:EBxbqzyv+ZrmDb82XswEE0BjfQFtuw1Nu6sjnjWCsGg=
github.com/pion/turn/v2 v2.0.8/go.mod h1:+y7xl719J8bAEVpSXBXvTxStjJv3hbz9YFflvkpcGPw=
github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M=
github.com/pion/webrtc/v3 v3.1.49 h1:rbsNGxK9jMYts+xE6zYAJMUQHnGwmk/JYze8yttW+to=
github.com/pion/webrtc/v3 v3.1.49/go.mod h1:kHf/o47QW4No1rgpsFux/h7lUhtUnwFnSFDZOXeLapw=
github.com/pion/srtp/v2 v2.0.12/go.mod h1:C3Ep44hlOo2qEYaq4ddsmK5dL63eLehXFbHaZ9F5V9Y=
github.com/pion/stun v0.4.0/go.mod h1:QPsh1/SbXASntw3zkkrIk3ZJVKz4saBY2G7S10P3wCw=
github.com/pion/transport v0.14.1/go.mod h1:4tGmbk00NeYA3rUa9+n+dzCCoKkcy3YlYb99Jn2fNnI=
github.com/pion/transport/v2 v2.0.0/go.mod h1:HS2MEBJTwD+1ZI2eSXSvHJx/HnzQqRy2/LXxt6eVMHc=
github.com/pion/transport/v2 v2.0.2/go.mod h1:vrz6bUbFr/cjdwbnxq8OdDDzHf7JJfGsIRkxfpZoTA0=
github.com/pion/turn/v2 v2.1.0/go.mod h1:yrT5XbXSGX1VFSF31A3c1kCNB5bBZgk/uu5LET162qs=
github.com/pion/udp/v2 v2.0.1/go.mod h1:B7uvTMP00lzWdyMr/1PVZXtV3wpPIxBRd4Wl6AksXn8=
github.com/pion/webrtc/v3 v3.1.56 h1:ScaiqKQN3liQwT+kJwOBaYP6TwSfixzdUnZmzHAo0a0=
github.com/pion/webrtc/v3 v3.1.56/go.mod h1:7VhbA6ihqJlz6R/INHjyh1b8HpiV9Ct4UQvE1OB/xoM=
github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -160,8 +157,8 @@ github.com/quic-go/quic-go v0.32.0/go.mod h1:/fCsKANhQIeD5l76c2JFU+07gVE3KaA0FP+
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/shirou/gopsutil/v3 v3.22.10 h1:4KMHdfBRYXGF9skjDWiL4RA2N+E8dRdodU/bOZpPoVg=
github.com/shirou/gopsutil/v3 v3.22.10/go.mod h1:QNza6r4YQoydyCfo6rH0blGfKahgibh4dQmV5xdFkQk=
github.com/shirou/gopsutil/v3 v3.22.11 h1:kxsPKS+Eeo+VnEQ2XCaGJepeP6KY53QoRTETx3+1ndM=
github.com/shirou/gopsutil/v3 v3.22.11/go.mod h1:xl0EeL4vXJ+hQMAGN8B9VFpxukEMA0XdevQOe5MZ1oY=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
@@ -177,21 +174,19 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5/go.mod h1:f1SCnEOt6sc3fOJfPQDRDzHOtSXuTtnz0ImG9kPRDV0=
github.com/tevino/abool v1.2.0 h1:heAkClL8H6w+mK5md9dzsuohKeXHUpY7Vw0ZCKW+huA=
github.com/tevino/abool v1.2.0/go.mod h1:qc66Pna1RiIsPa7O4Egxxs9OqkuxDX55zznh9K07Tzg=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33 h1:uyZY++dluUg7iTSsNzuOVln/mC2U2KXwgKLfKLCJ74Y=
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc=
github.com/yapingcat/gomedia v0.0.0-20230727105416-c491e66c9d2a h1:x60q0A7QmoUTzixNz7zVTdEA9JC0oYqm8S51PdbTWgs=
github.com/yapingcat/gomedia v0.0.0-20230727105416-c491e66c9d2a/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
@@ -203,43 +198,40 @@ go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.4.0 h1:UVQgzMY87xqpKNgb+kDsll2Igd33HszWHFLmpaRMq/8=
golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211201190559-0a0e4e1bb54c/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -264,46 +256,48 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201214095126-aec9a390925b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM=
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -334,5 +328,7 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
m7s.live/engine/v4 v4.11.18 h1:tgINiHqrrLdWEnoFZ0n2gSwoQwPwBgz+V/NXgo5K4Ks=
m7s.live/engine/v4 v4.11.18/go.mod h1:YoOThdhOpkf7MUDWciy449vfBF7i1p+dtf5o32hOvXY=
m7s.live/engine/v4 v4.13.8 h1:pDl8YWxip5aTidw2Q4NuU+8A6irBraLRfoeBi42S6iQ=
m7s.live/engine/v4 v4.13.8/go.mod h1:k/6iFSuJxmhJL8VO45NAga8BbgZHLLfRXOwCcCzk2s8=
m7s.live/plugin/ps/v4 v4.0.9 h1:79iHz546EdjhH2Op5IMXsjO0SK9tUk9AFSWKo0VON3w=
m7s.live/plugin/ps/v4 v4.0.9/go.mod h1:v59bPt1T+IxuRLRchQ+PwKkLxTRuEY4tbo13lNX6JPc=

120
handle.go
View File

@@ -5,8 +5,8 @@ import (
"crypto/md5"
"encoding/xml"
"fmt"
"strconv"
"github.com/logrusorgru/aurora"
"go.uber.org/zap"
"m7s.live/plugin/gb28181/v4/utils"
@@ -32,7 +32,7 @@ func (a *Authorization) Verify(username, passwd, realm, nonce string) bool {
r2 := a.getDigest(s2)
if r1 == "" || r2 == "" {
plugin.Error("Authorization algorithm wrong")
GB28181Plugin.Error("Authorization algorithm wrong")
return false
}
//3、将密文 1nonce 和密文 2 依次组合获取 1 个字符串,并对这个字符串使用算法加密,获得密文 r3即Response
@@ -53,12 +53,47 @@ func (a *Authorization) getDigest(raw string) string {
}
func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
from, _ := req.From()
from, ok := req.From()
if !ok || from.Address == nil {
GB28181Plugin.Error("OnRegister", zap.String("error", "no from"))
return
}
id := from.Address.User().String()
plugin.Sugar().Infof("OnRegister: %s, %s, from: %s", req.Destination(), id, req.Source())
GB28181Plugin.Debug("SIP<-OnMessage", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
isUnregister := false
if exps := req.GetHeaders("Expires"); len(exps) > 0 {
exp := exps[0]
expSec, err := strconv.ParseInt(exp.Value(), 10, 32)
if err != nil {
GB28181Plugin.Info("OnRegister",
zap.String("error", fmt.Sprintf("wrong expire header value %q", exp)),
zap.String("id", id),
zap.String("source", req.Source()),
zap.String("destination", req.Destination()))
return
}
if expSec == 0 {
isUnregister = true
}
} else {
GB28181Plugin.Info("OnRegister",
zap.String("error", "has no expire header"),
zap.String("id", id),
zap.String("source", req.Source()),
zap.String("destination", req.Destination()))
return
}
GB28181Plugin.Info("OnRegister",
zap.Bool("isUnregister", isUnregister),
zap.String("id", id),
zap.String("source", req.Source()),
zap.String("destination", req.Destination()))
if len(id) != 20 {
plugin.Sugar().Infof("Wrong GB-28181 id: %s", id)
GB28181Plugin.Info("Wrong GB-28181", zap.String("id", id))
return
}
passAuth := false
@@ -96,11 +131,21 @@ func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
}
if passAuth {
var d *Device
if v, ok := Devices.Load(id); ok {
d = v.(*Device)
c.RecoverDevice(d, req)
if isUnregister {
tmpd, ok := Devices.LoadAndDelete(id)
if ok {
GB28181Plugin.Info("Unregister Device", zap.String("id", id))
d = tmpd.(*Device)
} else {
return
}
} else {
d = c.StoreDevice(id, req)
if v, ok := Devices.Load(id); ok {
d = v.(*Device)
c.RecoverDevice(d, req)
} else {
d = c.StoreDevice(id, req)
}
}
DeviceNonce.Delete(id)
DeviceRegisterCount.Delete(id)
@@ -115,9 +160,14 @@ func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
Contents: time.Now().Format(TIME_LAYOUT),
})
_ = tx.Respond(resp)
//订阅设备更新
go d.syncChannels()
if !isUnregister {
//订阅设备更新
go d.syncChannels()
}
} else {
GB28181Plugin.Info("OnRegister unauthorized", zap.String("id", id), zap.String("source", req.Source()),
zap.String("destination", req.Destination()))
response := sip.NewResponseFromRequest("", req, http.StatusUnauthorized, "Unauthorized", "")
_nonce, _ := DeviceNonce.LoadOrStore(id, utils.RandNumString(32))
auth := fmt.Sprintf(
@@ -148,28 +198,29 @@ func (d *Device) syncChannels() {
func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
from, _ := req.From()
id := from.Address.User().String()
plugin.Sugar().Debugf("SIP<-OnMessage from %s : %s", req.Source(), req.String())
GB28181Plugin.Debug("SIP<-OnMessage", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
if v, ok := Devices.Load(id); ok {
d := v.(*Device)
switch d.Status {
case "RECOVER":
case DeviceOfflineStatus, DeviceRecoverStatus:
c.RecoverDevice(d, req)
go d.syncChannels()
//return
case string(sip.REGISTER):
d.Status = "ONLINE"
case DeviceRegisterStatus:
d.Status = DeviceOnlineStatus
}
d.UpdateTime = time.Now()
temp := &struct {
XMLName xml.Name
CmdType string
SN int // 请求序列号,一般用于对应 request 和 response
DeviceID string
DeviceName string
Manufacturer string
Model string
Channel string
DeviceList []*Channel `xml:"DeviceList>Item"`
RecordList []*Record `xml:"RecordList>Item"`
DeviceList []ChannelInfo `xml:"DeviceList>Item"`
RecordList []*Record `xml:"RecordList>Item"`
SumNum int // 录像结果的总数 SumNum录像结果会按照多条消息返回可用于判断是否全部返回
}{}
decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body())))
decoder.CharsetReader = charset.NewReaderLabel
@@ -177,7 +228,7 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
if err != nil {
err = utils.DecodeGbk(temp, []byte(req.Body()))
if err != nil {
plugin.Error("decode catelog err", zap.Error(err))
GB28181Plugin.Error("decode catelog err", zap.Error(err))
}
}
var body string
@@ -185,40 +236,43 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
case "Keepalive":
d.LastKeepaliveAt = time.Now()
//callID !="" 说明是订阅的事件类型信息
if d.ChannelMap == nil || len(d.ChannelMap) == 0 {
if d.lastSyncTime.IsZero() {
go d.syncChannels()
} else {
for _, ch := range d.ChannelMap {
ch.TryAutoInvite()
}
d.channelMap.Range(func(key, value interface{}) bool {
if conf.InviteMode == INVIDE_MODE_AUTO {
value.(*Channel).TryAutoInvite(&InviteOptions{})
}
return true
})
}
//为什么要查找子码流?
//d.CheckSubStream()
//在KeepLive 进行位置订阅的处理,如果开启了自动订阅位置,则去订阅位置
if c.Position.AutosubPosition && time.Since(d.GpsTime) > c.Position.Interval*2 {
d.MobilePositionSubscribe(d.ID, c.Position.Expires, c.Position.Interval)
plugin.Sugar().Debugf("位置自动订阅,设备[%s]成功\n", d.ID)
GB28181Plugin.Debug("Mobile Position Subscribe", zap.String("deviceID", d.ID))
}
case "Catalog":
d.UpdateChannels(temp.DeviceList)
d.UpdateChannels(temp.DeviceList...)
case "RecordInfo":
d.UpdateRecord(temp.DeviceID, temp.RecordList)
RecordQueryLink.Put(d.ID, temp.DeviceID, temp.SN, temp.SumNum, temp.RecordList)
case "DeviceInfo":
// 主设备信息
d.Name = temp.DeviceName
d.Manufacturer = temp.Manufacturer
d.Model = temp.Model
case "Alarm":
d.Status = "Alarmed"
d.Status = DeviceAlarmedStatus
body = BuildAlarmResponseXML(d.ID)
default:
plugin.Sugar().Warnf("DeviceID:", aurora.Red(d.ID), " Not supported CmdType : "+temp.CmdType+" body:\n", req.Body)
d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
response := sip.NewResponseFromRequest("", req, http.StatusBadRequest, "", "")
tx.Respond(response)
return
}
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", body))
} else {
GB28181Plugin.Debug("Unauthorized message, device not found", zap.String("id", id))
}
}
func (c *GB28181Config) OnBye(req sip.Request, tx sip.ServerTransaction) {
@@ -250,7 +304,7 @@ func (c *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
if err != nil {
err = utils.DecodeGbk(temp, []byte(req.Body()))
if err != nil {
plugin.Error("decode catelog err", zap.Error(err))
GB28181Plugin.Error("decode catelog err", zap.Error(err))
}
}
var body string
@@ -264,7 +318,7 @@ func (c *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
// case "Alarm":
// //报警事件通知 TODO
default:
plugin.Sugar().Warnf("DeviceID:", aurora.Red(d.ID), " Not supported CmdType : "+temp.CmdType+" body:", req.Body)
d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
response := sip.NewResponseFromRequest("", req, http.StatusBadRequest, "", "")
tx.Respond(response)
return

View File

@@ -8,12 +8,14 @@ import (
)
type InviteOptions struct {
Start int
End int
dump string
ssrc string
SSRC uint32
MediaPort uint16
Start int
End int
dump string
ssrc string
SSRC uint32
MediaPort uint16
StreamPath string
recyclePort func(p uint16) (err error)
}
func (o InviteOptions) IsLive() bool {

119
link.go Normal file
View File

@@ -0,0 +1,119 @@
package gb28181
import (
"fmt"
"sync"
"time"
"go.uber.org/zap"
)
// 对于录像查询,通过 queryKey (即 deviceId + channelId + sn) 唯一区分一次请求和响应
// 并将其关联起来,以实现异步响应的目的
// 提供单例实例供调用
var RecordQueryLink = NewRecordQueryLink(time.Second * 60)
type recordQueryLink struct {
pendingResult map[string]recordQueryResult // queryKey 查询结果缓存
pendingResp map[string]recordQueryResp // queryKey 待回复的查询请求
timeout time.Duration // 查询结果的过期时间
sync.RWMutex
}
type recordQueryResult struct {
time time.Time
err error
sum int
finished bool
list []*Record
}
type recordQueryResp struct {
respChan chan<- recordQueryResult
timeout time.Duration
startTime time.Time
}
func NewRecordQueryLink(resultTimeout time.Duration) *recordQueryLink {
c := &recordQueryLink{
timeout: resultTimeout,
pendingResult: make(map[string]recordQueryResult),
pendingResp: make(map[string]recordQueryResp),
}
return c
}
// 唯一区分一次录像查询
func recordQueryKey(deviceId, channelId string, sn int) string {
return fmt.Sprintf("%s-%s-%d", deviceId, channelId, sn)
}
// 定期清理过期的查询结果和请求
func (c *recordQueryLink) cleanTimeout() {
for k, s := range c.pendingResp {
if time.Since(s.startTime) > s.timeout {
if r, ok := c.pendingResult[k]; ok {
c.notify(k, r)
} else {
c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
}
}
}
for k, r := range c.pendingResult {
if time.Since(r.time) > c.timeout {
delete(c.pendingResult, k)
}
}
}
func (c *recordQueryLink) Put(deviceId, channelId string, sn int, sum int, record []*Record) {
key, r := c.doPut(deviceId, channelId, sn, sum, record)
if r.finished {
c.notify(key, r)
}
}
func (c *recordQueryLink) doPut(deviceId, channelId string, sn, sum int, record []*Record) (key string, r recordQueryResult) {
c.Lock()
defer c.Unlock()
key = recordQueryKey(deviceId, channelId, sn)
if v, ok := c.pendingResult[key]; ok {
r = v
} else {
r = recordQueryResult{time: time.Now(), sum: sum, list: make([]*Record, 0)}
}
r.list = append(r.list, record...)
if len(r.list) == sum {
r.finished = true
}
c.pendingResult[key] = r
GB28181Plugin.Logger.Debug("put record",
zap.String("key", key),
zap.Int("sum", sum),
zap.Int("count", len(r.list)))
return
}
func (c *recordQueryLink) WaitResult(
deviceId, channelId string, sn int,
timeout time.Duration) (resultCh <-chan recordQueryResult) {
key := recordQueryKey(deviceId, channelId, sn)
c.Lock()
defer c.Unlock()
respCh := make(chan recordQueryResult, 1)
resultCh = respCh
c.pendingResp[key] = recordQueryResp{startTime: time.Now(), timeout: timeout, respChan: respCh}
return
}
func (c *recordQueryLink) notify(key string, r recordQueryResult) {
if s, ok := c.pendingResp[key]; ok {
s.respChan <- r
}
c.Lock()
defer c.Unlock()
delete(c.pendingResp, key)
delete(c.pendingResult, key)
GB28181Plugin.Logger.Debug("record notify", zap.String("key", key))
}

107
main.go
View File

@@ -1,14 +1,15 @@
package gb28181
import (
"fmt"
"os"
"strings"
"sync"
"time"
myip "github.com/husanpao/ip"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
)
type GB28181PositionConfig struct {
@@ -18,21 +19,23 @@ type GB28181PositionConfig struct {
}
type GB28181Config struct {
AutoInvite bool `default:"true"`
PreFetchRecord bool
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
ListenAddr string `default:"0.0.0.0"`
InviteMode int `default:"1"` //邀请模式0:手动拉流1:预拉流2:按需拉流
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
ListenAddr string `default:"0.0.0.0"`
//sip服务器的配置
SipNetwork string `default:"udp"` //传输协议默认UDP可选TCP
SipIP string //sip 服务器公网IP
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
Username string //sip 服务器账号
Password string //sip 服务器密码
SipNetwork string `default:"udp"` //传输协议默认UDP可选TCP
SipIP string //sip 服务器公网IP
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
Username string //sip 服务器账号
Password string //sip 服务器密码
Port struct { // 新配置方式
Sip string `default:"udp:5060"`
Media string `default:"tcp:58200-59200"`
}
// AckTimeout uint16 //sip 服务应答超时,单位秒
RegisterValidity time.Duration `default:"60s"` //注册有效期,单位秒,默认 3600
RegisterValidity time.Duration `default:"3600s"` //注册有效期,单位秒,默认 3600
// RegisterInterval int //注册间隔,单位秒,默认 60
HeartbeatInterval time.Duration `default:"60s"` //心跳间隔,单位秒,默认 60
// HeartbeatRetry int //心跳超时次数,默认 3
@@ -41,21 +44,22 @@ type GB28181Config struct {
MediaIP string //媒体服务器地址
MediaPort uint16 `default:"58200"` //媒体服务器端口
MediaNetwork string `default:"tcp"` //媒体传输协议默认UDP可选TCP
MediaPortMin uint16
MediaPortMax uint16
MediaPortMin uint16 `default:"58200"`
MediaPortMax uint16 `default:"59200"`
// MediaIdleTimeout uint16 //推流超时时间,超过则断开链接,让设备重连
// WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
RemoveBanInterval time.Duration `default:"600s"` //移除禁止设备间隔
UdpCacheSize int //udp缓存大小
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
routes map[string]string
DumpPath string //dump PS流本地文件路径
RtpReorder bool `default:"true"`
config.Publish
Server
// UdpCacheSize int //udp缓存大小
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
routes map[string]string
DumpPath string //dump PS流本地文件路径
Ignores map[string]struct{}
tcpPorts PortManager
udpPorts PortManager
Position GB28181PositionConfig //关于定位的配置参数
}
func (c *GB28181Config) initRoutes() {
@@ -67,15 +71,63 @@ func (c *GB28181Config) initRoutes() {
c.routes[k[0:lastdot]] = k
}
}
plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes))
GB28181Plugin.Info("LocalAndInternalIPs", zap.Any("routes", c.routes))
}
func (c *GB28181Config) OnEvent(event any) {
switch event.(type) {
switch e := event.(type) {
case FirstConfig:
if c.Port.Sip != "udp:5060" {
protocol, ports := util.Conf2Listener(c.Port.Sip)
c.SipNetwork = protocol
c.SipPort = ports[0]
}
if c.Port.Media != "tcp:58200-59200" {
protocol, ports := util.Conf2Listener(c.Port.Media)
c.MediaNetwork = protocol
if len(ports) > 1 {
c.MediaPortMin = ports[0]
c.MediaPortMax = ports[1]
} else {
c.MediaPortMin = 0
c.MediaPortMax = 0
c.MediaPort = ports[0]
}
}
os.MkdirAll(c.DumpPath, 0766)
c.ReadDevices()
go c.initRoutes()
c.startServer()
case *Stream:
if c.InviteMode == INVIDE_MODE_ONSUBSCRIBE {
//流可能是回放流stream path是device/channel/start-end形式
streamNames := strings.Split(e.StreamName, "/")
if channel := FindChannel(e.AppName, streamNames[0]); channel != nil {
opt := InviteOptions{}
if len(streamNames) > 1 {
last := len(streamNames) - 1
timestr := streamNames[last]
trange := strings.Split(timestr, "-")
if len(trange) == 2 {
startTime := trange[0]
endTime := trange[1]
opt.Validate(startTime, endTime)
}
}
channel.TryAutoInvite(&opt)
}
}
case SEpublish:
if channel := FindChannel(e.Target.AppName, strings.TrimSuffix(e.Target.StreamName, "/rtsp")); channel != nil {
channel.LiveSubSP = e.Target.Path
}
case SEclose:
if channel := FindChannel(e.Target.AppName, strings.TrimSuffix(e.Target.StreamName, "/rtsp")); channel != nil {
channel.LiveSubSP = ""
}
if v, ok := PullStreams.LoadAndDelete(e.Target.Path); ok {
go v.(*PullStream).Bye()
}
}
}
@@ -85,4 +137,5 @@ func (c *GB28181Config) IsMediaNetworkTCP() bool {
var conf GB28181Config
var plugin = InstallPlugin(&conf)
var GB28181Plugin = InstallPlugin(&conf)
var PullStreams sync.Map //拉流

View File

@@ -2,6 +2,7 @@ package gb28181
import (
"fmt"
"strconv"
"time"
)
@@ -43,6 +44,17 @@ var (
</Query>`
)
func intTotime(t int64) time.Time {
tstr := strconv.FormatInt(t, 10)
if len(tstr) == 10 {
return time.Unix(t, 0)
}
if len(tstr) == 13 {
return time.UnixMilli(t)
}
return time.Now()
}
// BuildDeviceInfoXML 获取设备详情指令
func BuildDeviceInfoXML(sn int, id string) string {
return fmt.Sprintf(DeviceInfoXML, sn, id)
@@ -55,7 +67,7 @@ func BuildCatalogXML(sn int, id string) string {
// BuildRecordInfoXML 获取录像文件列表指令
func BuildRecordInfoXML(sn int, id string, start, end int64) string {
return fmt.Sprintf(RecordInfoXML, sn, id, time.Unix(start, 0).Format("2006-01-02T15:04:05"), time.Unix(end, 0).Format("2006-01-02T15:04:05"))
return fmt.Sprintf(RecordInfoXML, sn, id, intTotime(start).Format("2006-01-02T15:04:05"), intTotime(end).Format("2006-01-02T15:04:05"))
}
// BuildDevicePositionXML 订阅设备位置

47
ptz.go Normal file
View File

@@ -0,0 +1,47 @@
package gb28181
import "fmt"
var (
name2code = map[string]uint8{
"stop": 0,
"right": 1,
"left": 2,
"down": 4,
"downright": 5,
"downleft": 6,
"up": 8,
"upright": 9,
"upleft": 10,
"zoomin": 16,
"zoomout": 32,
}
)
func toPtzStrByCmdName(cmdName string, horizontalSpeed, verticalSpeed, zoomSpeed uint8) (string, error) {
c, err := toPtzCode(cmdName)
if err != nil {
return "", err
}
return toPtzStr(c, horizontalSpeed, verticalSpeed, zoomSpeed), nil
}
func toPtzStr(cmdCode, horizontalSpeed, verticalSpeed, zoomSpeed uint8) string {
checkCode := uint16(0xA5+0x0F+0x01+cmdCode+horizontalSpeed+verticalSpeed+(zoomSpeed&0xF0)) % 0x100
return fmt.Sprintf("A50F01%02X%02X%02X%01X0%02X",
cmdCode,
horizontalSpeed,
verticalSpeed,
zoomSpeed>>4, // 根据 GB28181 协议zoom 只取 4 bit
checkCode,
)
}
func toPtzCode(cmd string) (uint8, error) {
if code, ok := name2code[cmd]; ok {
return code, nil
} else {
return 0, fmt.Errorf("invalid ptz cmd %q", cmd)
}
}

View File

@@ -1,236 +0,0 @@
package gb28181
import (
"encoding/binary"
"fmt"
"io"
"net"
"os"
"path/filepath"
"time"
"github.com/ghettovoice/gosip/sip"
"github.com/pion/rtp"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
)
type GBPublisher struct {
PSPublisher
*InviteOptions
channel *Channel
inviteRes sip.Response
udpCache *utils.PriorityQueueRtp
dumpFile *os.File
dumpPrint io.Writer
lastReceive time.Time
}
func (p *GBPublisher) PrintDump(s string) {
if p.dumpPrint != nil {
p.dumpPrint.Write([]byte(s))
}
}
func (p *GBPublisher) OnEvent(event any) {
if p.channel == nil {
// p.parser.EsHandler = p
p.IO.OnEvent(event)
return
}
switch event.(type) {
case IPublisher:
if p.IsLive() {
p.Type = "GB28181 Live"
p.channel.LivePublisher = p
} else {
p.Type = "GB28181 Playback"
p.channel.RecordPublisher = p
}
// p.parser.EsHandler = p
conf.publishers.Add(p.SSRC, p)
if err := error(nil); p.dump != "" {
fp := filepath.Join(p.dump, p.Stream.Path)
os.MkdirAll(filepath.Dir(fp), 0766)
if p.dumpFile, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
p.Error("open dump file failed", zap.Error(err))
}
}
case SEwaitPublish:
//掉线自动重新拉流
if p.IsLive() {
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
go p.channel.Invite(&InviteOptions{})
}
case SEclose, SEKick:
if p.IsLive() {
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
} else {
p.channel.RecordPublisher = nil
}
conf.publishers.Delete(p.SSRC)
if p.dumpFile != nil {
p.dumpFile.Close()
}
p.Bye()
}
p.Publisher.OnEvent(event)
}
func (p *GBPublisher) Bye() int {
res := p.inviteRes
if res == nil {
return 404
}
defer p.Stop()
p.inviteRes = nil
bye := p.channel.CreateRequst(sip.BYE)
from, _ := res.From()
to, _ := res.To()
callId, _ := res.CallID()
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
resp, err := p.channel.device.SipRequestForResponse(bye)
if err != nil {
p.Error("Bye", zap.Error(err))
return ServerInternalError
}
return int(resp.StatusCode())
}
func (p *GBPublisher) Replay(f *os.File) (err error) {
var rtpPacket rtp.Packet
defer f.Close()
if p.dumpPrint != nil {
p.PrintDump(`<style type="text/css">
.gray {
color: gray;
}
</style>
`)
p.PrintDump("<table>")
defer p.PrintDump("</table>")
}
var t uint16
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
_, err = f.Read(l)
if err != nil {
return
}
payload := make([]byte, util.ReadBE[int](l[:4]))
t = util.ReadBE[uint16](l[4:])
p.PrintDump(fmt.Sprintf("[<b>%d</b> %d]", t, len(payload)))
_, err = f.Read(payload)
if err != nil {
return
}
rtpPacket.Unmarshal(payload)
p.PushPS(&rtpPacket)
}
return
}
func (p *GBPublisher) ListenUDP() (port uint16, err error) {
var rtpPacket rtp.Packet
networkBuffer := 1048576
port, err = conf.udpPorts.GetPort()
if err != nil {
return
}
addr := fmt.Sprintf(":%d", port)
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.ListenUDP("udp", mediaAddr)
if err != nil {
conf.udpPorts.Recycle(port)
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
p.SetIO(conn)
go func() {
defer conn.Close()
bufUDP := make([]byte, networkBuffer)
plugin.Info("Media udp server start.", zap.Uint16("port", port))
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
defer conf.udpPorts.Recycle(port)
dumpLen := make([]byte, 6)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
p.writeDump(ps, dumpLen)
p.PushPS(&rtpPacket)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
}
}()
return
}
func (p *GBPublisher) writeDump(ps util.Buffer, dumpLen []byte) {
if p.dumpFile != nil {
util.PutBE(dumpLen[:4], ps.Len())
if p.lastReceive.IsZero() {
util.PutBE(dumpLen[4:], 0)
} else {
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
}
p.lastReceive = time.Now()
p.dumpFile.Write(dumpLen)
p.dumpFile.Write(ps)
}
}
func (p *GBPublisher) ListenTCP() (port uint16, err error) {
port, err = conf.tcpPorts.GetPort()
if err != nil {
return
}
addr := fmt.Sprintf(":%d", port)
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
if err != nil {
defer conf.tcpPorts.Recycle(port)
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
go func() {
plugin.Info("Media tcp server start.", zap.Uint16("port", port))
defer conf.tcpPorts.Recycle(port)
defer plugin.Info("Media tcp server stop", zap.Uint16("port", port))
conn, err := listen.Accept()
listen.Close()
p.SetIO(conn)
if err != nil {
plugin.Error("Accept err=", zap.Error(err))
return
}
var rtpPacket rtp.Packet
ps := make(util.Buffer, 1024)
dumpLen := make([]byte, 6)
defer conn.Close()
for err == nil {
if _, err = io.ReadFull(conn, dumpLen[:2]); err != nil {
return
}
ps.Relloc(int(binary.BigEndian.Uint16(dumpLen[:2])))
if _, err = io.ReadFull(conn, ps); err != nil {
return
}
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
} else if !p.IsClosed() {
p.writeDump(ps, dumpLen)
p.PushPS(&rtpPacket)
}
}
}()
return
}

View File

@@ -1,8 +1,9 @@
package gb28181
import (
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"
@@ -10,15 +11,15 @@ import (
"m7s.live/engine/v4/util"
)
var (
playScaleValues = map[float32]bool{0.25: true, 0.5: true, 1: true, 2: true, 4: true}
)
func (c *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
util.ReturnJson(func() (list []*Device) {
list = make([]*Device, 0)
Devices.Range(func(key, value interface{}) bool {
device := value.(*Device)
if time.Since(device.UpdateTime) > c.RegisterValidity {
Devices.Delete(key)
} else {
list = append(list, device)
}
list = append(list, value.(*Device))
return true
})
return
@@ -26,12 +27,23 @@ func (c *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
}
func (c *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
startTime := r.URL.Query().Get("startTime")
endTime := r.URL.Query().Get("endTime")
query := r.URL.Query()
id := query.Get("id")
channel := query.Get("channel")
startTime := query.Get("startTime")
endTime := query.Get("endTime")
trange := strings.Split(query.Get("range"), "-")
if len(trange) == 2 {
startTime = trange[0]
endTime = trange[1]
}
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.QueryRecord(startTime, endTime))
res, err := c.QueryRecord(startTime, endTime)
if err == nil {
WriteJSONOk(w, res)
} else {
WriteJSON(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.NotFound(w, r)
}
@@ -48,20 +60,67 @@ func (c *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
}
}
func (c *GB28181Config) API_ptz(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
id := q.Get("id")
channel := q.Get("channel")
cmd := q.Get("cmd") // 命令名称,见 ptz.go name2code 定义
hs := q.Get("hSpeed") // 水平速度
vs := q.Get("vSpeed") // 垂直速度
zs := q.Get("zSpeed") // 缩放速度
hsN, err := strconv.ParseUint(hs, 10, 8)
if err != nil {
WriteJSON(w, "hSpeed parameter is invalid", 400)
return
}
vsN, err := strconv.ParseUint(vs, 10, 8)
if err != nil {
WriteJSON(w, "vSpeed parameter is invalid", 400)
return
}
zsN, err := strconv.ParseUint(zs, 10, 8)
if err != nil {
WriteJSON(w, "zSpeed parameter is invalid", 400)
return
}
ptzcmd, err := toPtzStrByCmdName(cmd, uint8(hsN), uint8(vsN), uint8(zsN))
if err != nil {
WriteJSON(w, err.Error(), 400)
return
}
if c := FindChannel(id, channel); c != nil {
code := c.Control(ptzcmd)
WriteJSON(w, "device received", code)
} else {
WriteJSON(w, fmt.Sprintf("device %q channel %q not found", id, channel), 404)
}
}
func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
id := query.Get("id")
channel := query.Get("channel")
streamPath := query.Get("streamPath")
port, _ := strconv.Atoi(query.Get("mediaPort"))
opt := InviteOptions{
dump: query.Get("dump"),
MediaPort: uint16(port),
dump: query.Get("dump"),
MediaPort: uint16(port),
StreamPath: streamPath,
}
opt.Validate(query.Get("startTime"), query.Get("endTime"))
startTime := query.Get("startTime")
endTime := query.Get("endTime")
trange := strings.Split(query.Get("range"), "-")
if len(trange) == 2 {
startTime = trange[0]
endTime = trange[1]
}
opt.Validate(startTime, endTime)
if c := FindChannel(id, channel); c == nil {
http.NotFound(w, r)
} else if opt.IsLive() && c.LivePublisher != nil {
w.WriteHeader(304) //直播流已存在
} else if opt.IsLive() && c.status.Load() > 0 {
http.Error(w, "live stream already exists", http.StatusNotModified)
} else if code, err := c.Invite(&opt); err == nil {
w.WriteHeader(code)
} else {
@@ -69,46 +128,69 @@ func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
}
}
func (c *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
dump := r.URL.Query().Get("dump")
printOut := r.URL.Query().Get("print")
if dump == "" {
dump = c.DumpPath
}
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
func (c *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
streamPath := r.URL.Query().Get("streamPath")
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.Bye(streamPath))
} else {
streamPath := dump
if strings.HasPrefix(dump, "/") {
streamPath = "replay" + dump
} else {
streamPath = "replay/" + dump
}
var pub GBPublisher
pub.SetIO(f)
if err = plugin.Publish(streamPath, &pub); err == nil {
if printOut != "" {
pub.dumpPrint = w
pub.SetParentCtx(r.Context())
err = pub.Replay(f)
} else {
go pub.Replay(f)
w.Write([]byte("ok"))
}
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
http.NotFound(w, r)
}
}
func (c *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
// CORS(w, r)
func (c *GB28181Config) API_play_pause(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
live := r.URL.Query().Get("live")
streamPath := r.URL.Query().Get("streamPath")
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.Bye(live != "false"))
w.WriteHeader(c.Pause(streamPath))
} else {
http.NotFound(w, r)
}
}
func (c *GB28181Config) API_play_resume(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
streamPath := r.URL.Query().Get("streamPath")
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.Resume(streamPath))
} else {
http.NotFound(w, r)
}
}
func (c *GB28181Config) API_play_seek(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
streamPath := r.URL.Query().Get("streamPath")
secStr := r.URL.Query().Get("second")
sec, err := strconv.ParseUint(secStr, 10, 32)
if err != nil {
WriteJSON(w, "second parameter is invalid: "+err.Error(), 400)
return
}
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.PlayAt(streamPath, uint(sec)))
} else {
http.NotFound(w, r)
}
}
func (c *GB28181Config) API_play_forward(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
streamPath := r.URL.Query().Get("streamPath")
speedStr := r.URL.Query().Get("speed")
speed, err := strconv.ParseFloat(speedStr, 32)
secondErrMsg := "speed parameter is invalid, should be one of 0.25,0.5,1,2,4"
if err != nil || !playScaleValues[float32(speed)] {
WriteJSON(w, secondErrMsg, 400)
return
}
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.PlayForward(streamPath, float32(speed)))
} else {
http.NotFound(w, r)
}
@@ -169,3 +251,13 @@ func (c *GB28181Config) API_get_position(w http.ResponseWriter, r *http.Request)
return
}, c.Position.Interval, w, r)
}
func WriteJSONOk(w http.ResponseWriter, data interface{}) {
WriteJSON(w, data, 200)
}
func WriteJSON(w http.ResponseWriter, data interface{}, status int) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(data)
}

182
server.go
View File

@@ -1,20 +1,14 @@
package gb28181
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
"github.com/logrusorgru/aurora"
"github.com/pion/rtp"
"go.uber.org/zap"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
"github.com/ghettovoice/gosip"
@@ -24,21 +18,14 @@ import (
var srv gosip.Server
type Server struct {
Ignores map[string]struct{}
publishers util.Map[uint32, *GBPublisher]
tcpPorts PortManager
udpPorts PortManager
}
const MaxRegisterCount = 3
func FindChannel(deviceId string, channelId string) (c *Channel) {
if v, ok := Devices.Load(deviceId); ok {
d := v.(*Device)
d.channelMutex.RLock()
c = d.ChannelMap[channelId]
d.channelMutex.RUnlock()
if v, ok := d.channelMap.Load(channelId); ok {
return v.(*Channel)
}
}
return
}
@@ -129,10 +116,9 @@ func RequestForResponse(transport string, request sip.Request,
}
func (c *GB28181Config) startServer() {
c.publishers.Init()
addr := c.ListenAddr + ":" + strconv.Itoa(int(c.SipPort))
logger := utils.NewZapLogger(plugin.Logger, "GB SIP Server", nil)
logger := utils.NewZapLogger(GB28181Plugin.Logger, "GB SIP Server", nil)
logger.SetLevel(levelMap[c.LogLevel])
// logger := log.NewDefaultLogrusLogger().WithPrefix("GB SIP Server")
srvConf := gosip.ServerConfig{}
@@ -146,111 +132,17 @@ func (c *GB28181Config) startServer() {
srv.OnRequest(sip.BYE, c.OnBye)
err := srv.Listen(strings.ToLower(c.SipNetwork), addr)
if err != nil {
plugin.Logger.Error("gb28181 server listen", zap.Error(err))
GB28181Plugin.Logger.Error("gb28181 server listen", zap.Error(err))
} else {
plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
GB28181Plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
}
go c.startMediaServer()
if c.Username != "" || c.Password != "" {
go c.removeBanDevice()
}
}
func (c *GB28181Config) startMediaServer() {
if c.MediaNetwork == "tcp" {
c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
if !c.tcpPorts.Valid {
c.listenMediaTCP()
}
} else {
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
if !c.udpPorts.Valid {
c.listenMediaUDP()
}
}
}
func (c *GB28181Config) processTcpMediaConn(conn net.Conn) {
var rtpPacket rtp.Packet
reader := bufio.NewReader(conn)
defer conn.Close()
var err error
dumpLen := make([]byte, 6)
ps := make(util.Buffer, 1024)
for err == nil {
if _, err = io.ReadFull(reader, dumpLen[:2]); err != nil {
return
}
ps.Relloc(int(binary.BigEndian.Uint16(dumpLen[:2])))
if _, err = io.ReadFull(reader, ps); err != nil {
return
}
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
} else if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
publisher.writeDump(ps, dumpLen)
publisher.PushPS(&rtpPacket)
} else {
plugin.Info("gb28181 publisher not found", zap.Uint32("ssrc", rtpPacket.SSRC))
}
}
}
func (c *GB28181Config) listenMediaTCP() {
addr := ":" + strconv.Itoa(int(c.MediaPort))
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
if err != nil {
plugin.Error("MediaServer listened tcp err", zap.String("addr", addr), zap.Error(err))
return
}
plugin.Sugar().Infof("MediaServer started tcp at %s", addr)
defer listen.Close()
defer plugin.Info("MediaServer stopped tcp at", zap.Uint16("port", c.MediaPort))
for {
conn, err := listen.Accept()
if err != nil {
plugin.Error("Accept err=", zap.Error(err))
}
go c.processTcpMediaConn(conn)
}
}
func (c *GB28181Config) listenMediaUDP() {
var rtpPacket rtp.Packet
networkBuffer := 1048576
addr := ":" + strconv.Itoa(int(c.MediaPort))
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.ListenUDP("udp", mediaAddr)
if err != nil {
plugin.Error(" MediaServer started listening udp err", zap.String("addr", addr), zap.Error(err))
return
}
bufUDP := make([]byte, networkBuffer)
plugin.Sugar().Infof("MediaServer started at udp %s", addr)
defer plugin.Sugar().Infof("MediaServer stopped at udp %s", addr)
dumpLen := make([]byte, 6)
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
t := time.Now()
if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
publisher.writeDump(ps, dumpLen)
publisher.PushPS(&rtpPacket)
}
x := time.Since(t)
if x > time.Millisecond {
fmt.Println(x)
}
}
go c.startJob()
}
// func queryCatalog(config *transaction.Config) {
@@ -268,14 +160,58 @@ func (c *GB28181Config) listenMediaUDP() {
// }
// }
func (c *GB28181Config) removeBanDevice() {
t := time.NewTicker(c.RemoveBanInterval)
for range t.C {
DeviceRegisterCount.Range(func(key, value interface{}) bool {
if value.(int) > MaxRegisterCount {
DeviceRegisterCount.Delete(key)
// 定时任务
func (c *GB28181Config) startJob() {
statusTick := time.NewTicker(c.HeartbeatInterval / 2)
banTick := time.NewTicker(c.RemoveBanInterval)
linkTick := time.NewTicker(time.Millisecond * 100)
GB28181Plugin.Debug("start job")
for {
select {
case <-banTick.C:
if c.Username != "" || c.Password != "" {
c.removeBanDevice()
}
return true
})
case <-statusTick.C:
c.statusCheck()
case <-linkTick.C:
RecordQueryLink.cleanTimeout()
}
}
}
func (c *GB28181Config) removeBanDevice() {
DeviceRegisterCount.Range(func(key, value interface{}) bool {
if value.(int) > MaxRegisterCount {
DeviceRegisterCount.Delete(key)
}
return true
})
}
// statusCheck
// - 当设备超过 3 倍心跳时间未发送过心跳(通过 UpdateTime 判断), 视为离线
// - 当设备超过注册有效期内为发送过消息,则从设备列表中删除
// UpdateTime 在设备发送心跳之外的消息也会被更新,相对于 LastKeepaliveAt 更能体现出设备最会一次活跃的时间
func (c *GB28181Config) statusCheck() {
Devices.Range(func(key, value any) bool {
d := value.(*Device)
if time.Since(d.UpdateTime) > c.RegisterValidity {
Devices.Delete(key)
GB28181Plugin.Info("Device register timeout",
zap.String("id", d.ID),
zap.Time("registerTime", d.RegisterTime),
zap.Time("updateTime", d.UpdateTime),
)
} else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 {
d.Status = DeviceOfflineStatus
d.channelMap.Range(func(key, value any) bool {
ch := value.(*Channel)
ch.Status = ChannelOffStatus
return true
})
GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime))
}
return true
})
}

View File

@@ -6,17 +6,18 @@ import (
"github.com/ghettovoice/gosip/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
m7slog "m7s.live/engine/v4/log"
)
type ZapLogger struct {
log *zap.Logger
log *m7slog.Logger
prefix string
fields log.Fields
sugared *zap.SugaredLogger
level log.Level
}
func NewZapLogger(log *zap.Logger, prefix string, fields log.Fields) (z *ZapLogger) {
func NewZapLogger(log *m7slog.Logger, prefix string, fields log.Fields) (z *ZapLogger) {
z = &ZapLogger{
log: log,
prefix: prefix,