Compare commits

...

12 Commits

Author SHA1 Message Date
dexter
682aec656b Merge pull request #90 from rufftio/v4
ptz 控制接口,采用更易理解和使用的参数
2023-06-05 18:40:32 +08:00
ogofly
fbd8683f5b Merge branch 'Monibuca:v4' into v4 2023-06-05 18:18:50 +08:00
liuyancong
b15e4ee89c add: ptz 控制接口,采用更易理解和使用的参数 2023-06-05 18:17:46 +08:00
langhuihui
3c7b3a042d fix: list接口为空时返回[] 而不是null 2023-05-25 14:12:57 +08:00
dexter
858df1377e Merge pull request #88 from ogofly/v4
录像查询重构为在当前查询的http响应中返回
2023-05-24 11:36:14 +08:00
liuyancong
60021d3cd9 录像查询重构为在当前查询的http响应中返回 2023-05-24 11:30:50 +08:00
langhuihui
d8061cd7c3 channel结构体反转 2023-05-23 20:56:24 +08:00
langhuihui
ed397063c4 chroe: update log format 2023-05-21 22:12:09 +08:00
langhuihui
5853120d30 update readme 2023-05-17 09:07:12 +08:00
langhuihui
4c47df0695 fix: update dep ps version to 4.0.1 2023-05-16 23:11:26 +08:00
langhuihui
37fd121d11 feat: change to use ps plugin 2023-05-14 11:12:25 +08:00
langhuihui
05fd8c38f7 feat: add a new way to config port 2023-05-04 09:57:20 +08:00
14 changed files with 582 additions and 706 deletions

View File

@@ -18,16 +18,13 @@ _ "m7s.live/plugin/gb28181/v4"
```yaml
gb28181:
autoinvite: true #表示自动发起invite当ServerSIP接收到设备信息时立即向设备发送invite命令获取流
invitemode: 1 #0、手动invite 1、表示自动发起invite当ServerSIP接收到设备信息时立即向设备发送invite命令获取流,2、按需拉流既等待订阅者触发
position:
autosubposition: false #是否自动订阅定位
expires: 3600s #订阅周期(单位:秒)默认3600
interval: 6s #订阅间隔单位默认6
prefetchrecord: false
udpcachesize: 0 #表示UDP缓存大小默认为0不开启。仅当TCP关闭切缓存大于0时才开启
sipnetwork: udp
sipip: "" #sip服务器地址 默认 自动适配设备网段
sipport: 5060
serial: "34020000002000000001"
realm: "3402000000"
username: ""
@@ -36,10 +33,9 @@ gb28181:
registervalidity: 60s #注册有效期
mediaip: "" #媒体服务器地址 默认 自动适配设备网段
mediaport: 58200 #媒体服务器端口,用于接收设备的流
medianetwork: tcp
mediaportmin: 0 #媒体服务器端口范围最小值,设置后将开启端口范围模式
mediaportmax: 0 #媒体服务器端口范围最大值,设置后将开启端口范围模式
port:
sip: udp:5060 #sip服务器端口
media: tcp:58200 #媒体服务器端口,用于接收设备的流,范围端口表示法udp:50000-60000
removebaninterval: 10m #定时移除注册失败的设备黑名单单位秒默认10分钟600秒
loglevel: info
@@ -106,7 +102,7 @@ type Device struct {
| startTime | 否 | 开始时间纯数字Unix时间戳 |
| endTime | 否 | 结束时间纯数字Unix时间戳 |
返回200代表成功
返回200代表成功, 304代表已经在拉取中不能重复拉仅仅针对直播流
### 停止从设备拉流
@@ -117,6 +113,8 @@ type Device struct {
| id | 是 | 设备ID |
| channel | 是 | 通道编号 |
http 200 表示成功404流不存在
### 发送控制命令
`/gb28181/api/control`

View File

@@ -5,35 +5,63 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"
"sync/atomic"
"github.com/ghettovoice/gosip/sip"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/log"
"m7s.live/plugin/gb28181/v4/utils"
"m7s.live/plugin/ps/v4"
)
type ChannelEx struct {
device *Device
RecordPublisher *GBPublisher `json:"-"`
LivePublisher *GBPublisher
LiveSubSP string //实时子码流
Records []*Record
RecordStartTime string
RecordEndTime string
recordStartTime time.Time
recordEndTime time.Time
liveInviteLock *sync.Mutex
tcpPortIndex uint16
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
var QUERY_RECORD_TIMEOUT = time.Second * 5
type PullStream struct {
opt *InviteOptions
channel *Channel
inviteRes sip.Response
}
func (p *PullStream) Bye() int {
res := p.inviteRes
bye := p.channel.CreateRequst(sip.BYE)
from, _ := res.From()
to, _ := res.To()
callId, _ := res.CallID()
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
resp, err := p.channel.device.SipRequestForResponse(bye)
if p.opt.IsLive() {
p.channel.status.Store(0)
// defer p.channel.TryAutoInvite(p.opt)
}
if p.opt.recyclePort != nil {
p.opt.recyclePort(p.opt.MediaPort)
}
if err != nil {
return http.StatusInternalServerError
}
return int(resp.StatusCode())
}
type Channel struct {
device *Device // 所属设备
status atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放
LiveSubSP string // 实时子码流通过rtsp
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
*log.Logger `json:"-" yaml:"-"`
ChannelInfo
}
// Channel 通道
type Channel struct {
DeviceID string
type ChannelInfo struct {
DeviceID string // 通道ID
ParentID string
Name string
Manufacturer string
@@ -47,8 +75,6 @@ type Channel struct {
RegisterWay int
Secrecy int
Status string
Children []*Channel `json:"-"`
ChannelEx //自定义属性
}
func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
@@ -75,9 +101,13 @@ func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request)
//非同一域的目标地址需要使用@host
host := conf.Realm
if channel.DeviceID[0:9] != host {
deviceIp := d.NetAddr
deviceIp = deviceIp[0:strings.LastIndex(deviceIp, ":")]
host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
if channel.Port != 0 {
deviceIp := d.NetAddr
deviceIp = deviceIp[0:strings.LastIndex(deviceIp, ":")]
host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
} else {
host = d.NetAddr
}
}
channelAddr := sip.Address{
@@ -106,13 +136,9 @@ func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request)
req.SetDestination(d.NetAddr)
return req
}
func (channel *Channel) QueryRecord(startTime, endTime string) int {
func (channel *Channel) QueryRecord(startTime, endTime string) ([]*Record, error) {
d := channel.device
channel.RecordStartTime = startTime
channel.RecordEndTime = endTime
channel.recordStartTime, _ = time.Parse(TIME_LAYOUT, startTime)
channel.recordEndTime, _ = time.Parse(TIME_LAYOUT, endTime)
channel.Records = nil
request := d.CreateRequest(sip.MESSAGE)
contentType := sip.ContentType("Application/MANSCDP+xml")
request.AppendHeader(&contentType)
@@ -127,12 +153,21 @@ func (channel *Channel) QueryRecord(startTime, endTime string) int {
<Type>all</Type>
</Query>`, d.sn, channel.DeviceID, startTime, endTime)
request.SetBody(body, true)
resultCh := RecordQueryLink.WaitResult(d.ID, channel.DeviceID, d.sn, QUERY_RECORD_TIMEOUT)
resp, err := d.SipRequestForResponse(request)
if err != nil {
return http.StatusRequestTimeout
return nil, fmt.Errorf("query error: %s", err)
}
return int(resp.StatusCode())
if resp.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("query error, status=%d", resp.StatusCode())
}
// RecordQueryLink 中加了超时机制,该结果一定会返回
// 所以此处不用再增加超时等保护机制
r := <-resultCh
return r.list, r.err
}
func (channel *Channel) Control(PTZCmd string) int {
d := channel.device
request := d.CreateRequest(sip.MESSAGE)
@@ -199,18 +234,27 @@ f = v/a/编码格式/码率大小/采样率
f字段中视、音频参数段之间不需空格分割。
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
*/
func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
if opt.IsLive() {
if !channel.liveInviteLock.TryLock() {
if !channel.status.CompareAndSwap(0, 1) {
return 304, nil
}
defer func() {
if code != OK {
channel.liveInviteLock.Unlock()
if err != nil {
GB28181Plugin.Error("Invite", zap.Error(err))
channel.status.Store(0)
if conf.InviteMode == 1 {
// 5秒后重试
time.AfterFunc(time.Second*5, func() {
channel.Invite(opt)
})
}
} else {
channel.status.Store(2)
}
}()
}
channel.Bye(opt.IsLive())
d := channel.device
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
s := "Play"
@@ -219,37 +263,36 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
s = "Playback"
streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
}
if opt.StreamPath != "" {
streamPath = opt.StreamPath
}
if opt.dump == "" {
opt.dump = conf.DumpPath
}
publisher := &GBPublisher{
InviteOptions: opt,
channel: channel,
}
publisher.DisableReorder = !conf.RtpReorder
protocol := ""
networkType := "udp"
resuePort := true
if conf.IsMediaNetworkTCP() {
networkType = "tcp"
protocol = "TCP/"
if conf.tcpPorts.Valid {
opt.MediaPort, err = publisher.ListenTCP()
if err != nil {
return ServerInternalError, err
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
opt.MediaPort, err = conf.tcpPorts.GetPort()
opt.recyclePort = conf.tcpPorts.Recycle
resuePort = false
}
publisher.DisableReorder = true
} else {
if conf.udpPorts.Valid {
opt.MediaPort, err = publisher.ListenUDP()
if err != nil {
code = ServerInternalError
return
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
opt.MediaPort, err = conf.udpPorts.GetPort()
opt.recyclePort = conf.udpPorts.Recycle
resuePort = false
}
}
if err != nil {
return http.StatusInternalServerError, err
}
if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
sdpInfo := []string{
"v=0",
@@ -262,7 +305,6 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
"a=recvonly",
"a=rtpmap:96 PS/90000",
"y=" + opt.ssrc,
"",
}
if conf.IsMediaNetworkTCP() {
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
@@ -271,74 +313,70 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
contentType := sip.ContentType("application/sdp")
invite.AppendHeader(&contentType)
invite.SetBody(strings.Join(sdpInfo, "\r\n"), true)
invite.SetBody(strings.Join(sdpInfo, "\r\n")+"\r\n", true)
subject := sip.GenericHeader{
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
}
invite.AppendHeader(&subject)
publisher.inviteRes, err = d.SipRequestForResponse(invite)
inviteRes, err := d.SipRequestForResponse(invite)
if err != nil {
plugin.Error(fmt.Sprintf("SIP->Invite %s :%s invite error: %s", channel.DeviceID, invite.String(), err.Error()))
channel.Error("invite", zap.Error(err), zap.String("msg", invite.String()))
return http.StatusInternalServerError, err
}
code = int(publisher.inviteRes.StatusCode())
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
code = int(inviteRes.StatusCode())
channel.Info("invite response", zap.Int("status code", code))
if code == OK {
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
if code == http.StatusOK {
ds := strings.Split(inviteRes.Body(), "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
if ls[0] == "y" && len(ls[1]) > 0 {
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
opt.SSRC = uint32(_ssrc)
} else {
plugin.Error("read invite response y ", zap.Error(err))
channel.Error("read invite response y ", zap.Error(err))
}
break
}
}
}
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
publisher.udpCache = utils.NewPqRtp()
err = ps.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, resuePort)
if err == nil {
PullStreams.Store(streamPath, &PullStream{
opt: opt,
channel: channel,
inviteRes: inviteRes,
})
err = srv.Send(sip.NewAckRequest("", invite, inviteRes, "", nil))
}
if err = plugin.Publish(streamPath, publisher); err != nil {
code = ServerInternalError
return
}
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
srv.Send(ack)
} else if channel.CanInvite() {
time.AfterFunc(time.Second*5, func() {
channel.TryAutoInvite()
})
}
return
}
func (channel *Channel) Bye(live bool) int {
func (channel *Channel) Bye(streamPath string) int {
d := channel.device
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
if s := Streams.Get(streamPath); s != nil {
s.Close()
if streamPath == "" {
streamPath = fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
}
if live && channel.LivePublisher != nil {
return channel.LivePublisher.Bye()
if s, loaded := PullStreams.LoadAndDelete(streamPath); loaded {
s.(*PullStream).Bye()
if s := Streams.Get(streamPath); s != nil {
s.Close()
}
return http.StatusOK
}
if !live && channel.RecordPublisher != nil {
return channel.RecordPublisher.Bye()
}
return 404
return http.StatusNotFound
}
func (channel *Channel) TryAutoInvite() {
if conf.AutoInvite && channel.CanInvite() {
go channel.Invite(&InviteOptions{})
func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
if channel.CanInvite() {
go channel.Invite(opt)
}
}
func (channel *Channel) CanInvite() bool {
if channel.LivePublisher != nil || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
if channel.status.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
return false
}

View File

@@ -1,60 +1,5 @@
package gb28181
const (
Trying = 100
Ringing = 180
CallIsBeingForwarded = 181
Queued = 182
SessionProgress = 183
OK = 200
Accepted = 202
MultipleChoices = 300
MovedPermanently = 301
MovedTemporarily = 302
UseProxy = 305
AlternativeService = 380
BadRequest = 400
Unauthorized = 401
PaymentRequired = 402
Forbidden = 403
NotFound = 404
MethodNotAllowed = 405
NotAcceptable = 406
ProxyAuthenticationRequired = 407
RequestTimeout = 408
Gone = 410
RequestEntityTooLarge = 413
RequestURITooLong = 414
UnsupportedMediaType = 415
UnsupportedURIScheme = 416
BadExtension = 420
ExtensionRequired = 421
IntervalTooBrief = 423
TemporarilyUnavailable = 480
CallTransactionDoesNotExist = 481
LoopDetected = 482
TooManyHops = 483
AddressIncomplete = 484
Ambiguous = 485
BusyHere = 486
RequestTerminated = 487
NotAcceptableHere = 488
BadEvent = 489
RequestPending = 491
Undecipherable = 493
ServerInternalError = 500
NotImplemented = 501
BadGateway = 502
ServiceUnavailable = 503
ServerTim = 504
VersionNotSupported = 505
MessageTooLarge = 513
BusyEverywhere = 600
Decline = 603
DoesNotExistAnywhere = 604
SessionNotAcceptable = 606
)
var reasons = map[int]string{
100: "Trying",
180: "Ringing",
@@ -113,3 +58,9 @@ var reasons = map[int]string{
func Explain(statusCode int) string {
return reasons[statusCode]
}
const (
INVIDE_MODE_MANUAL = iota
INVIDE_MODE_AUTO
INVIDE_MODE_ONSUBSCRIBE
)

167
device.go
View File

@@ -10,10 +10,9 @@ import (
"sync"
"time"
"golang.org/x/exp/maps"
"go.uber.org/zap"
"m7s.live/engine/v4"
"m7s.live/engine/v4/log"
"m7s.live/plugin/gb28181/v4/utils"
// . "github.com/logrusorgru/aurora"
@@ -25,7 +24,6 @@ const TIME_LAYOUT = "2006-01-02T15:04:05"
// Record 录像
type Record struct {
//channel *Channel
DeviceID string
Name string
FilePath string
@@ -47,7 +45,7 @@ var (
)
type Device struct {
//*transaction.Core `json:"-"`
//*transaction.Core `json:"-" yaml:"-"`
ID string
Name string
Manufacturer string
@@ -62,8 +60,7 @@ type Device struct {
sipIP string //设备对应网卡的服务器ip
mediaIP string //设备对应网卡的服务器ip
NetAddr string
ChannelMap map[string]*Channel
channelMutex sync.RWMutex
channelMap sync.Map
subscriber struct {
CallID string
Timeout time.Time
@@ -72,17 +69,23 @@ type Device struct {
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
*log.Logger `json:"-" yaml:"-"`
}
func (d *Device) MarshalJSON() ([]byte, error) {
type Alias Device
return json.Marshal(&struct {
Channels []*Channel
data := &struct {
Channels []*ChannelInfo
*Alias
}{
Channels: maps.Values(d.ChannelMap),
Alias: (*Alias)(d),
Alias: (*Alias)(d),
}
d.channelMap.Range(func(key, value interface{}) bool {
c := value.(*Channel)
data.Channels = append(data.Channels, &c.ChannelInfo)
return true
})
return json.Marshal(data)
}
func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
from, _ := req.From()
@@ -110,15 +113,12 @@ func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
if c.MediaIP != "" {
mediaIp = c.MediaIP
}
plugin.Info("RecoverDevice", zap.String("id", d.ID), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d.Info("RecoverDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d.Status = string(sip.REGISTER)
d.sipIP = sipIP
d.mediaIP = mediaIp
d.NetAddr = deviceIp
d.UpdateTime = time.Now()
if d.ChannelMap == nil {
d.ChannelMap = make(map[string]*Channel)
}
}
func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
@@ -133,7 +133,7 @@ func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
d.UpdateTime = time.Now()
d.NetAddr = deviceIp
d.addr = deviceAddr
plugin.Debug("UpdateDevice", zap.String("id", id), zap.String("netaddr", d.NetAddr))
d.Debug("UpdateDevice", zap.String("netaddr", d.NetAddr))
} else {
servIp := req.Recipient().Host()
//根据网卡ip获取对应的公网ip
@@ -154,7 +154,6 @@ func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
if c.MediaIP != "" {
mediaIp = c.MediaIP
}
plugin.Info("StoreDevice", zap.String("id", id), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d = &Device{
ID: id,
RegisterTime: time.Now(),
@@ -164,8 +163,9 @@ func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
sipIP: sipIP,
mediaIP: mediaIp,
NetAddr: deviceIp,
ChannelMap: make(map[string]*Channel),
Logger: GB28181Plugin.With(zap.String("id", id)),
}
d.Info("StoreDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
Devices.Store(id, d)
c.SaveDevices()
}
@@ -179,6 +179,7 @@ func (c *GB28181Config) ReadDevices() {
for _, item := range items {
if time.Since(item.UpdateTime) < conf.RegisterValidity {
item.Status = "RECOVER"
item.Logger = GB28181Plugin.With(zap.String("id", item.ID))
Devices.Store(item.ID, item)
}
}
@@ -199,39 +200,31 @@ func (c *GB28181Config) SaveDevices() {
}
}
func (d *Device) addOrUpdateChannel(channel *Channel) {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
channel.device = d
if old, ok := d.ChannelMap[channel.DeviceID]; ok {
//复制锁指针
channel.ChannelEx = old.ChannelEx
}
if channel.liveInviteLock == nil {
channel.liveInviteLock = &sync.Mutex{}
}
d.ChannelMap[channel.DeviceID] = channel
}
func (d *Device) deleteChannel(DeviceID string) {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
delete(d.ChannelMap, DeviceID)
}
func (d *Device) CheckSubStream() {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
for _, c := range d.ChannelMap {
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
func (d *Device) addOrUpdateChannel(info ChannelInfo) (c *Channel) {
if old, ok := d.channelMap.Load(info.DeviceID); ok {
c = old.(*Channel)
c.ChannelInfo = info
} else {
c = &Channel{
device: d,
ChannelInfo: info,
Logger: d.Logger.With(zap.String("channel", info.DeviceID)),
}
if s := engine.Streams.Get(fmt.Sprintf("%s/%s/rtsp", c.device.ID, c.DeviceID)); s != nil {
c.LiveSubSP = s.Path
} else {
c.LiveSubSP = ""
}
d.channelMap.Store(info.DeviceID, c)
}
return
}
func (d *Device) UpdateChannels(list []*Channel) {
func (d *Device) deleteChannel(DeviceID string) {
d.channelMap.Delete(DeviceID)
}
func (d *Device) UpdateChannels(list ...ChannelInfo) {
for _, c := range list {
if _, ok := conf.Ignores[c.DeviceID]; ok {
continue
@@ -254,32 +247,18 @@ func (d *Device) UpdateChannels(list []*Channel) {
}
}
//本设备增加通道
d.addOrUpdateChannel(c)
channel := d.addOrUpdateChannel(c)
//预取和邀请
if conf.PreFetchRecord {
n := time.Now()
n = time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.Local)
if len(c.Records) == 0 || (n.Format(TIME_LAYOUT) == c.RecordStartTime &&
n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT) == c.RecordEndTime) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
if conf.InviteMode == INVIDE_MODE_AUTO {
channel.TryAutoInvite(&InviteOptions{})
}
c.TryAutoInvite()
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
c.LiveSubSP = s.Path
channel.LiveSubSP = s.Path
} else {
c.LiveSubSP = ""
channel.LiveSubSP = ""
}
}
}
func (d *Device) UpdateRecord(channelId string, list []*Record) {
d.channelMutex.RLock()
if c, ok := d.ChannelMap[channelId]; ok {
c.Records = append(c.Records, list...)
}
d.channelMutex.RUnlock()
}
func (d *Device) CreateRequest(Method sip.RequestMethod) (req sip.Request) {
d.sn++
@@ -363,7 +342,7 @@ func (d *Device) Subscribe() int {
response, err := d.SipRequestForResponse(request)
if err == nil && response != nil {
if response.StatusCode() == OK {
if response.StatusCode() == http.StatusOK {
callId, _ := request.CallID()
d.subscriber.CallID = string(*callId)
} else {
@@ -385,13 +364,13 @@ func (d *Device) Catalog() int {
request.AppendHeader(&expires)
request.SetBody(BuildCatalogXML(d.sn, d.ID), true)
// 输出Sip请求设备通道信息信令
plugin.Sugar().Debugf("SIP->Catalog:%s", request)
GB28181Plugin.Sugar().Debugf("SIP->Catalog:%s", request)
resp, err := d.SipRequestForResponse(request)
if err == nil && resp != nil {
plugin.Sugar().Debugf("SIP<-Catalog Response: %s", resp.String())
GB28181Plugin.Sugar().Debugf("SIP<-Catalog Response: %s", resp.String())
return int(resp.StatusCode())
} else if err != nil {
plugin.Error("SIP<-Catalog error:", zap.Error(err))
GB28181Plugin.Error("SIP<-Catalog error:", zap.Error(err))
}
return http.StatusRequestTimeout
}
@@ -413,8 +392,8 @@ func (d *Device) QueryDeviceInfo() {
// received, _ := via.Params.Get("received")
// d.SipIP = received.String()
// }
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s response code:%d", d.ID, d.NetAddr, response.StatusCode()))
if response.StatusCode() == OK {
d.Info("QueryDeviceInfo", zap.Uint16("status code", uint16(response.StatusCode())))
if response.StatusCode() == http.StatusOK {
break
}
}
@@ -442,7 +421,7 @@ func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, inter
response, err := d.SipRequestForResponse(mobilePosition)
if err == nil && response != nil {
if response.StatusCode() == OK {
if response.StatusCode() == http.StatusOK {
callId, _ := mobilePosition.CallID()
d.subscriber.CallID = callId.String()
} else {
@@ -455,17 +434,18 @@ func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, inter
// UpdateChannelPosition 更新通道GPS坐标
func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) {
if c, ok := d.ChannelMap[channelId]; ok {
c.ChannelEx.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
c.ChannelEx.Longitude = lng
c.ChannelEx.Latitude = lat
plugin.Sugar().Debugf("更新通道[%s]坐标成功\n", c.Name)
if v, ok := d.channelMap.Load(channelId); ok {
c := v.(*Channel)
c.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
c.Longitude = lng
c.Latitude = lat
c.Debug("update channel position success")
} else {
//如果未找到通道,则更新到设备上
d.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
d.Longitude = lng
d.Latitude = lat
plugin.Sugar().Debugf("未找到通道[%s],更新设备[%s]坐标成功\n", channelId, d.ID)
d.Debug("update device position success", zap.String("channelId", channelId))
}
}
@@ -474,20 +454,20 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
for _, v := range deviceList {
switch v.Event {
case "ON":
plugin.Debug("收到通道上线通知")
d.Debug("receive channel online notify")
d.channelOnline(v.DeviceID)
case "OFF":
plugin.Debug("收到通道离线通知")
d.Debug("receive channel offline notify")
d.channelOffline(v.DeviceID)
case "VLOST":
plugin.Debug("收到通道视频丢失通知")
d.Debug("receive channel video lost notify")
d.channelOffline(v.DeviceID)
case "DEFECT":
plugin.Debug("收到通道故障通知")
d.Debug("receive channel video defect notify")
d.channelOffline(v.DeviceID)
case "ADD":
plugin.Debug("收到通道新增通知")
channel := Channel{
d.Debug("receive channel add notify")
channel := ChannelInfo{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
@@ -503,15 +483,15 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
Secrecy: v.Secrecy,
Status: v.Status,
}
d.addOrUpdateChannel(&channel)
d.addOrUpdateChannel(channel)
case "DEL":
//删除
plugin.Debug("收到通道删除通知")
d.Debug("receive channel delete notify")
d.deleteChannel(v.DeviceID)
case "UPDATE":
plugin.Debug("收到通道更新通知")
d.Debug("receive channel update notify")
// 更新通道
channel := &Channel{
channel := ChannelInfo{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
@@ -527,26 +507,27 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
Secrecy: v.Secrecy,
Status: v.Status,
}
channels := []*Channel{channel}
d.UpdateChannels(channels)
d.UpdateChannels(channel)
}
}
}
func (d *Device) channelOnline(DeviceID string) {
if c, ok := d.ChannelMap[DeviceID]; ok {
if v, ok := d.channelMap.Load(DeviceID); ok {
c := v.(*Channel)
c.Status = "ON"
plugin.Sugar().Debugf("通道[%s]在线\n", c.Name)
c.Debug("online")
} else {
plugin.Sugar().Debugf("更新通道[%s]状态失败,未找到\n", DeviceID)
d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
}
}
func (d *Device) channelOffline(DeviceID string) {
if c, ok := d.ChannelMap[DeviceID]; ok {
if v, ok := d.channelMap.Load(DeviceID); ok {
c := v.(*Channel)
c.Status = "OFF"
plugin.Sugar().Debugf("通道[%s]离线\n", c.Name)
c.Debug("offline")
} else {
plugin.Sugar().Debugf("更新通道[%s]状态失败,未找到\n", DeviceID)
d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
}
}

7
go.mod
View File

@@ -8,10 +8,10 @@ require (
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/pion/rtp v1.7.13
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db
golang.org/x/net v0.8.0
golang.org/x/text v0.8.0
m7s.live/engine/v4 v4.12.0
m7s.live/engine/v4 v4.12.8
m7s.live/plugin/ps/v4 v4.0.1
)
require (
@@ -49,11 +49,12 @@ require (
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33 // indirect
github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect

10
go.sum
View File

@@ -188,8 +188,8 @@ github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYm
github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33 h1:uyZY++dluUg7iTSsNzuOVln/mC2U2KXwgKLfKLCJ74Y=
github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc=
github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 h1:cj4I+bvWX9I+Hg6tnZ7DAiOVxzhyLhdvYVKp+WpM/2c=
github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
@@ -332,5 +332,7 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
m7s.live/engine/v4 v4.12.0 h1:CRPbJ0jhHVZArc5mvV7e6Seb4Ye816kGzs3FOVKnfHw=
m7s.live/engine/v4 v4.12.0/go.mod h1:AiJPBwdA77DM3fymlcH2qYPR8ivL6ib9UVLm1Rft/to=
m7s.live/engine/v4 v4.12.8 h1:cNGyajzEkbUzIcPtcedGbxvMlIuScWxDb/raYFgAHKE=
m7s.live/engine/v4 v4.12.8/go.mod h1:LoALBfV5rmsz5TJQr6cmLxM33mfUE5BKBq/sMtXOVlc=
m7s.live/plugin/ps/v4 v4.0.1 h1:iKgo9D4g6vo3I97Je1hG8v/6+IDRei7sHnTCYBEyasY=
m7s.live/plugin/ps/v4 v4.0.1/go.mod h1:lAPr3gGIFoU4ctMRnPeyjbcREueyT6TfiKhWBgDrOGM=

View File

@@ -6,7 +6,6 @@ import (
"encoding/xml"
"fmt"
"github.com/logrusorgru/aurora"
"go.uber.org/zap"
"m7s.live/plugin/gb28181/v4/utils"
@@ -32,7 +31,7 @@ func (a *Authorization) Verify(username, passwd, realm, nonce string) bool {
r2 := a.getDigest(s2)
if r1 == "" || r2 == "" {
plugin.Error("Authorization algorithm wrong")
GB28181Plugin.Error("Authorization algorithm wrong")
return false
}
//3、将密文 1nonce 和密文 2 依次组合获取 1 个字符串,并对这个字符串使用算法加密,获得密文 r3即Response
@@ -53,12 +52,15 @@ func (a *Authorization) getDigest(raw string) string {
}
func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
from, _ := req.From()
from, ok := req.From()
if !ok {
GB28181Plugin.Error("OnRegister", zap.String("error", "no from"))
return
}
id := from.Address.User().String()
plugin.Sugar().Infof("OnRegister: %s, %s, from: %s", req.Destination(), id, req.Source())
GB28181Plugin.Info("OnRegister", zap.String("id", id), zap.String("source", req.Source()), zap.String("destination", req.Destination()))
if len(id) != 20 {
plugin.Sugar().Infof("Wrong GB-28181 id: %s", id)
GB28181Plugin.Info("Wrong GB-28181", zap.String("id", id))
return
}
passAuth := false
@@ -148,7 +150,7 @@ func (d *Device) syncChannels() {
func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
from, _ := req.From()
id := from.Address.User().String()
plugin.Sugar().Debugf("SIP<-OnMessage from %s : %s", req.Source(), req.String())
GB28181Plugin.Debug("SIP<-OnMessage", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
if v, ok := Devices.Load(id); ok {
d := v.(*Device)
switch d.Status {
@@ -163,13 +165,15 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
temp := &struct {
XMLName xml.Name
CmdType string
SN int // 请求序列号,一般用于对应 request 和 response
DeviceID string
DeviceName string
Manufacturer string
Model string
Channel string
DeviceList []*Channel `xml:"DeviceList>Item"`
RecordList []*Record `xml:"RecordList>Item"`
DeviceList []ChannelInfo `xml:"DeviceList>Item"`
RecordList []*Record `xml:"RecordList>Item"`
SumNum int // 录像结果的总数 SumNum录像结果会按照多条消息返回可用于判断是否全部返回
}{}
decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body())))
decoder.CharsetReader = charset.NewReaderLabel
@@ -177,7 +181,7 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
if err != nil {
err = utils.DecodeGbk(temp, []byte(req.Body()))
if err != nil {
plugin.Error("decode catelog err", zap.Error(err))
GB28181Plugin.Error("decode catelog err", zap.Error(err))
}
}
var body string
@@ -185,24 +189,25 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
case "Keepalive":
d.LastKeepaliveAt = time.Now()
//callID !="" 说明是订阅的事件类型信息
if d.ChannelMap == nil || len(d.ChannelMap) == 0 {
if d.lastSyncTime.IsZero() {
go d.syncChannels()
} else {
for _, ch := range d.ChannelMap {
ch.TryAutoInvite()
}
d.channelMap.Range(func(key, value interface{}) bool {
if conf.InviteMode == INVIDE_MODE_AUTO {
value.(*Channel).TryAutoInvite(&InviteOptions{})
}
return true
})
}
//为什么要查找子码流?
//d.CheckSubStream()
//在KeepLive 进行位置订阅的处理,如果开启了自动订阅位置,则去订阅位置
if c.Position.AutosubPosition && time.Since(d.GpsTime) > c.Position.Interval*2 {
d.MobilePositionSubscribe(d.ID, c.Position.Expires, c.Position.Interval)
plugin.Sugar().Debugf("位置自动订阅,设备[%s]成功\n", d.ID)
GB28181Plugin.Debug("Mobile Position Subscribe", zap.String("deviceID", d.ID))
}
case "Catalog":
d.UpdateChannels(temp.DeviceList)
d.UpdateChannels(temp.DeviceList...)
case "RecordInfo":
d.UpdateRecord(temp.DeviceID, temp.RecordList)
RecordQueryLink.Put(d.ID, temp.DeviceID, temp.SN, temp.SumNum, temp.RecordList)
case "DeviceInfo":
// 主设备信息
d.Name = temp.DeviceName
@@ -212,7 +217,7 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
d.Status = "Alarmed"
body = BuildAlarmResponseXML(d.ID)
default:
plugin.Sugar().Warnf("DeviceID:", aurora.Red(d.ID), " Not supported CmdType : "+temp.CmdType+" body:\n", req.Body)
d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
response := sip.NewResponseFromRequest("", req, http.StatusBadRequest, "", "")
tx.Respond(response)
return
@@ -250,7 +255,7 @@ func (c *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
if err != nil {
err = utils.DecodeGbk(temp, []byte(req.Body()))
if err != nil {
plugin.Error("decode catelog err", zap.Error(err))
GB28181Plugin.Error("decode catelog err", zap.Error(err))
}
}
var body string
@@ -264,7 +269,7 @@ func (c *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
// case "Alarm":
// //报警事件通知 TODO
default:
plugin.Sugar().Warnf("DeviceID:", aurora.Red(d.ID), " Not supported CmdType : "+temp.CmdType+" body:", req.Body)
d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
response := sip.NewResponseFromRequest("", req, http.StatusBadRequest, "", "")
tx.Respond(response)
return

View File

@@ -8,12 +8,14 @@ import (
)
type InviteOptions struct {
Start int
End int
dump string
ssrc string
SSRC uint32
MediaPort uint16
Start int
End int
dump string
ssrc string
SSRC uint32
MediaPort uint16
StreamPath string
recyclePort func(p uint16) (err error)
}
func (o InviteOptions) IsLive() bool {

124
link.go Normal file
View File

@@ -0,0 +1,124 @@
package gb28181
import (
"fmt"
"sync"
"time"
"go.uber.org/zap"
)
// 对于录像查询,通过 queryKey (即 deviceId + channelId + sn) 唯一区分一次请求和响应
// 并将其关联起来,以实现异步响应的目的
// 提供单例实例供调用
var RecordQueryLink = NewRecordQueryLink(time.Second * 60)
type recordQueryLink struct {
pendingResult map[string]recordQueryResult // queryKey 查询结果缓存
pendingResp map[string]recordQueryResp // queryKey 待回复的查询请求
timeout time.Duration // 查询结果的过期时间
sync.RWMutex
}
type recordQueryResult struct {
time time.Time
err error
sum int
finished bool
list []*Record
}
type recordQueryResp struct {
respChan chan<- recordQueryResult
timeout time.Duration
startTime time.Time
}
func NewRecordQueryLink(resultTimeout time.Duration) *recordQueryLink {
c := &recordQueryLink{
timeout: resultTimeout,
pendingResult: make(map[string]recordQueryResult),
pendingResp: make(map[string]recordQueryResp),
}
go c.cleanTimeout()
return c
}
// 唯一区分一次录像查询
func recordQueryKey(deviceId, channelId string, sn int) string {
return fmt.Sprintf("%s-%s-%d", deviceId, channelId, sn)
}
// 定期清理过期的查询结果和请求
func (c *recordQueryLink) cleanTimeout() {
tick := time.NewTicker(time.Millisecond * 100)
for {
<-tick.C
for k, s := range c.pendingResp {
if time.Since(s.startTime) > s.timeout {
if r, ok := c.pendingResult[k]; ok {
c.notify(k, r)
} else {
c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
}
}
}
for k, r := range c.pendingResult {
if time.Since(r.time) > c.timeout {
delete(c.pendingResult, k)
}
}
}
}
func (c *recordQueryLink) Put(deviceId, channelId string, sn int, sum int, record []*Record) {
key, r := c.doPut(deviceId, channelId, sn, sum, record)
if r.finished {
c.notify(key, r)
}
}
func (c *recordQueryLink) doPut(deviceId, channelId string, sn, sum int, record []*Record) (key string, r recordQueryResult) {
c.Lock()
defer c.Unlock()
key = recordQueryKey(deviceId, channelId, sn)
if v, ok := c.pendingResult[key]; ok {
r = v
} else {
r = recordQueryResult{time: time.Now(), sum: sum, list: make([]*Record, 0)}
}
r.list = append(r.list, record...)
if len(r.list) == sum {
r.finished = true
}
c.pendingResult[key] = r
GB28181Plugin.Logger.Debug("put record",
zap.String("key", key),
zap.Int("sum", sum),
zap.Int("count", len(r.list)))
return
}
func (c *recordQueryLink) WaitResult(
deviceId, channelId string, sn int,
timeout time.Duration) (resultCh <-chan recordQueryResult) {
key := recordQueryKey(deviceId, channelId, sn)
c.Lock()
defer c.Unlock()
respCh := make(chan recordQueryResult, 1)
resultCh = respCh
c.pendingResp[key] = recordQueryResp{startTime: time.Now(), timeout: timeout, respChan: respCh}
return
}
func (c *recordQueryLink) notify(key string, r recordQueryResult) {
if s, ok := c.pendingResp[key]; ok {
s.respChan <- r
}
c.Lock()
defer c.Unlock()
delete(c.pendingResp, key)
delete(c.pendingResult, key)
GB28181Plugin.Logger.Debug("record notify", zap.String("key", key))
}

86
main.go
View File

@@ -1,14 +1,15 @@
package gb28181
import (
"fmt"
"os"
"strings"
"sync"
"time"
myip "github.com/husanpao/ip"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
)
type GB28181PositionConfig struct {
@@ -18,19 +19,21 @@ type GB28181PositionConfig struct {
}
type GB28181Config struct {
AutoInvite bool `default:"true"`
PreFetchRecord bool
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
ListenAddr string `default:"0.0.0.0"`
InviteMode int `default:"1"` //邀请模式0:手动拉流1:预拉流2:按需拉流
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
ListenAddr string `default:"0.0.0.0"`
//sip服务器的配置
SipNetwork string `default:"udp"` //传输协议默认UDP可选TCP
SipIP string //sip 服务器公网IP
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
Username string //sip 服务器账号
Password string //sip 服务器密码
SipNetwork string `default:"udp"` //传输协议默认UDP可选TCP
SipIP string //sip 服务器公网IP
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
Username string //sip 服务器账号
Password string //sip 服务器密码
Port struct { // 新配置方式
Sip string `default:"udp:5060"`
Media string `default:"tcp:58200"`
}
// AckTimeout uint16 //sip 服务应答超时,单位秒
RegisterValidity time.Duration `default:"60s"` //注册有效期,单位秒,默认 3600
// RegisterInterval int //注册间隔,单位秒,默认 60
@@ -47,15 +50,16 @@ type GB28181Config struct {
// WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
RemoveBanInterval time.Duration `default:"600s"` //移除禁止设备间隔
UdpCacheSize int //udp缓存大小
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
routes map[string]string
DumpPath string //dump PS流本地文件路径
RtpReorder bool `default:"true"`
config.Publish
Server
// UdpCacheSize int //udp缓存大小
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
routes map[string]string
DumpPath string //dump PS流本地文件路径
Ignores map[string]struct{}
tcpPorts PortManager
udpPorts PortManager
Position GB28181PositionConfig //关于定位的配置参数
}
func (c *GB28181Config) initRoutes() {
@@ -67,15 +71,48 @@ func (c *GB28181Config) initRoutes() {
c.routes[k[0:lastdot]] = k
}
}
plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes))
GB28181Plugin.Info("LocalAndInternalIPs", zap.Any("routes", c.routes))
}
func (c *GB28181Config) OnEvent(event any) {
switch event.(type) {
switch e := event.(type) {
case FirstConfig:
if c.Port.Sip != "udp:5060" {
protocol, ports := util.Conf2Listener(c.Port.Sip)
c.SipNetwork = protocol
c.SipPort = ports[0]
}
if c.Port.Media != "tcp:58200" {
protocol, ports := util.Conf2Listener(c.Port.Media)
c.MediaNetwork = protocol
if len(ports) > 1 {
c.MediaPortMin = ports[0]
c.MediaPortMax = ports[1]
} else {
c.MediaPort = ports[0]
}
}
os.MkdirAll(c.DumpPath, 0766)
c.ReadDevices()
go c.initRoutes()
c.startServer()
case *Stream:
if c.InviteMode == INVIDE_MODE_ONSUBSCRIBE {
if channel := FindChannel(e.AppName, e.StreamName); channel != nil {
channel.TryAutoInvite(&InviteOptions{})
}
}
case SEpublish:
if channel := FindChannel(e.Target.AppName, strings.TrimSuffix(e.Target.StreamName, "/rtsp")); channel != nil {
channel.LiveSubSP = e.Target.Path
}
case SEclose:
if channel := FindChannel(e.Target.AppName, strings.TrimSuffix(e.Target.StreamName, "/rtsp")); channel != nil {
channel.LiveSubSP = ""
}
if v, ok := PullStreams.LoadAndDelete(e.Target.Path); ok {
go v.(*PullStream).Bye()
}
}
}
@@ -85,4 +122,5 @@ func (c *GB28181Config) IsMediaNetworkTCP() bool {
var conf GB28181Config
var plugin = InstallPlugin(&conf)
var GB28181Plugin = InstallPlugin(&conf)
var PullStreams sync.Map //拉流

47
ptz.go Normal file
View File

@@ -0,0 +1,47 @@
package gb28181
import "fmt"
var (
name2code = map[string]uint8{
"stop": 0,
"right": 1,
"left": 2,
"down": 4,
"downright": 5,
"downleft": 6,
"up": 8,
"upright": 9,
"upleft": 10,
"zoomin": 16,
"zoomout": 32,
}
)
func toPtzStrByCmdName(cmdName string, horizontalSpeed, verticalSpeed, zoomSpeed uint8) (string, error) {
c, err := toPtzCode(cmdName)
if err != nil {
return "", err
}
return toPtzStr(c, horizontalSpeed, verticalSpeed, zoomSpeed), nil
}
func toPtzStr(cmdCode, horizontalSpeed, verticalSpeed, zoomSpeed uint8) string {
checkCode := uint16(0xA5+0x0F+0x01+cmdCode+horizontalSpeed+verticalSpeed+(zoomSpeed&0xF0)) % 0x100
return fmt.Sprintf("A50F01%02X%02X%02X%01X0%02X",
cmdCode,
horizontalSpeed,
verticalSpeed,
zoomSpeed>>4, // 根据 GB28181 协议zoom 只取 4 bit
checkCode,
)
}
func toPtzCode(cmd string) (uint8, error) {
if code, ok := name2code[cmd]; ok {
return code, nil
} else {
return 0, fmt.Errorf("invalid ptz cmd %q", cmd)
}
}

View File

@@ -1,236 +0,0 @@
package gb28181
import (
"encoding/binary"
"fmt"
"io"
"net"
"os"
"path/filepath"
"time"
"github.com/ghettovoice/gosip/sip"
"github.com/pion/rtp"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
)
type GBPublisher struct {
PSPublisher
*InviteOptions
channel *Channel
inviteRes sip.Response
udpCache *utils.PriorityQueueRtp
dumpFile *os.File
dumpPrint io.Writer
lastReceive time.Time
}
func (p *GBPublisher) PrintDump(s string) {
if p.dumpPrint != nil {
p.dumpPrint.Write([]byte(s))
}
}
func (p *GBPublisher) OnEvent(event any) {
if p.channel == nil {
// p.parser.EsHandler = p
p.IO.OnEvent(event)
return
}
switch event.(type) {
case IPublisher:
if p.IsLive() {
p.Type = "GB28181 Live"
p.channel.LivePublisher = p
} else {
p.Type = "GB28181 Playback"
p.channel.RecordPublisher = p
}
// p.parser.EsHandler = p
conf.publishers.Add(p.SSRC, p)
if err := error(nil); p.dump != "" {
fp := filepath.Join(p.dump, p.Stream.Path)
os.MkdirAll(filepath.Dir(fp), 0766)
if p.dumpFile, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
p.Error("open dump file failed", zap.Error(err))
}
}
case SEwaitPublish:
//掉线自动重新拉流
if p.IsLive() {
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
go p.channel.Invite(&InviteOptions{})
}
case SEclose, SEKick:
if p.IsLive() {
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
} else {
p.channel.RecordPublisher = nil
}
conf.publishers.Delete(p.SSRC)
if p.dumpFile != nil {
p.dumpFile.Close()
}
p.Bye()
}
p.Publisher.OnEvent(event)
}
func (p *GBPublisher) Bye() int {
res := p.inviteRes
if res == nil {
return 404
}
defer p.Stop()
p.inviteRes = nil
bye := p.channel.CreateRequst(sip.BYE)
from, _ := res.From()
to, _ := res.To()
callId, _ := res.CallID()
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
resp, err := p.channel.device.SipRequestForResponse(bye)
if err != nil {
p.Error("Bye", zap.Error(err))
return ServerInternalError
}
return int(resp.StatusCode())
}
func (p *GBPublisher) Replay(f *os.File) (err error) {
var rtpPacket rtp.Packet
defer f.Close()
if p.dumpPrint != nil {
p.PrintDump(`<style type="text/css">
.gray {
color: gray;
}
</style>
`)
p.PrintDump("<table>")
defer p.PrintDump("</table>")
}
var t uint16
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
_, err = f.Read(l)
if err != nil {
return
}
payload := make([]byte, util.ReadBE[int](l[:4]))
t = util.ReadBE[uint16](l[4:])
p.PrintDump(fmt.Sprintf("[<b>%d</b> %d]", t, len(payload)))
_, err = f.Read(payload)
if err != nil {
return
}
rtpPacket.Unmarshal(payload)
p.PushPS(&rtpPacket)
}
return
}
func (p *GBPublisher) ListenUDP() (port uint16, err error) {
var rtpPacket rtp.Packet
networkBuffer := 1048576
port, err = conf.udpPorts.GetPort()
if err != nil {
return
}
addr := fmt.Sprintf(":%d", port)
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.ListenUDP("udp", mediaAddr)
if err != nil {
conf.udpPorts.Recycle(port)
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
p.SetIO(conn)
go func() {
defer conn.Close()
bufUDP := make([]byte, networkBuffer)
plugin.Info("Media udp server start.", zap.Uint16("port", port))
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
defer conf.udpPorts.Recycle(port)
dumpLen := make([]byte, 6)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
p.writeDump(ps, dumpLen)
p.PushPS(&rtpPacket)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
}
}()
return
}
func (p *GBPublisher) writeDump(ps util.Buffer, dumpLen []byte) {
if p.dumpFile != nil {
util.PutBE(dumpLen[:4], ps.Len())
if p.lastReceive.IsZero() {
util.PutBE(dumpLen[4:], 0)
} else {
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
}
p.lastReceive = time.Now()
p.dumpFile.Write(dumpLen)
p.dumpFile.Write(ps)
}
}
func (p *GBPublisher) ListenTCP() (port uint16, err error) {
port, err = conf.tcpPorts.GetPort()
if err != nil {
return
}
addr := fmt.Sprintf(":%d", port)
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
if err != nil {
defer conf.tcpPorts.Recycle(port)
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
go func() {
plugin.Info("Media tcp server start.", zap.Uint16("port", port))
defer conf.tcpPorts.Recycle(port)
defer plugin.Info("Media tcp server stop", zap.Uint16("port", port))
conn, err := listen.Accept()
listen.Close()
p.SetIO(conn)
if err != nil {
plugin.Error("Accept err=", zap.Error(err))
return
}
var rtpPacket rtp.Packet
ps := make(util.Buffer, 1024)
dumpLen := make([]byte, 6)
defer conn.Close()
for err == nil {
if _, err = io.ReadFull(conn, dumpLen[:2]); err != nil {
return
}
ps.Relloc(int(binary.BigEndian.Uint16(dumpLen[:2])))
if _, err = io.ReadFull(conn, ps); err != nil {
return
}
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
} else if !p.IsClosed() {
p.writeDump(ps, dumpLen)
p.PushPS(&rtpPacket)
}
}
}()
return
}

View File

@@ -1,8 +1,9 @@
package gb28181
import (
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"
@@ -12,6 +13,7 @@ import (
func (c *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
util.ReturnJson(func() (list []*Device) {
list = make([]*Device, 0)
Devices.Range(func(key, value interface{}) bool {
device := value.(*Device)
if time.Since(device.UpdateTime) > c.RegisterValidity {
@@ -26,12 +28,23 @@ func (c *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
}
func (c *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
startTime := r.URL.Query().Get("startTime")
endTime := r.URL.Query().Get("endTime")
query := r.URL.Query()
id := query.Get("id")
channel := query.Get("channel")
startTime := query.Get("startTime")
endTime := query.Get("endTime")
trange := strings.Split(query.Get("range"), "-")
if len(trange) == 2 {
startTime = trange[0]
endTime = trange[1]
}
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.QueryRecord(startTime, endTime))
res, err := c.QueryRecord(startTime, endTime)
if err == nil {
WriteJSONOk(w, res)
} else {
WriteJSON(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.NotFound(w, r)
}
@@ -48,20 +61,63 @@ func (c *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
}
}
func (c *GB28181Config) API_ptz(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
id := q.Get("id")
channel := q.Get("channel")
cmd := q.Get("cmd") // 命令名称,见 ptz.go name2code 定义
hs := q.Get("hSpeed") // 水平速度
vs := q.Get("vSpeed") // 垂直速度
zs := q.Get("zSpeed") // 缩放速度
hsN, err := strconv.ParseUint(hs, 10, 8)
if err != nil {
WriteJSON(w, "hSpeed parameter is invalid", 400)
}
vsN, err := strconv.ParseUint(vs, 10, 8)
if err != nil {
WriteJSON(w, "vSpeed parameter is invalid", 400)
}
zsN, err := strconv.ParseUint(zs, 10, 8)
if err != nil {
WriteJSON(w, "zSpeed parameter is invalid", 400)
}
ptzcmd, err := toPtzStrByCmdName(cmd, uint8(hsN), uint8(vsN), uint8(zsN))
if err != nil {
WriteJSON(w, err.Error(), 400)
}
if c := FindChannel(id, channel); c != nil {
code := c.Control(ptzcmd)
WriteJSON(w, "device received", code)
} else {
WriteJSON(w, fmt.Sprintf("device %q channel %q not found", id, channel), 404)
}
}
func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
id := query.Get("id")
channel := query.Get("channel")
streamPath := query.Get("streamPath")
port, _ := strconv.Atoi(query.Get("mediaPort"))
opt := InviteOptions{
dump: query.Get("dump"),
MediaPort: uint16(port),
dump: query.Get("dump"),
MediaPort: uint16(port),
StreamPath: streamPath,
}
opt.Validate(query.Get("startTime"), query.Get("endTime"))
startTime := query.Get("startTime")
endTime := query.Get("endTime")
trange := strings.Split(query.Get("range"), "-")
if len(trange) == 2 {
startTime = trange[0]
endTime = trange[1]
}
opt.Validate(startTime, endTime)
if c := FindChannel(id, channel); c == nil {
http.NotFound(w, r)
} else if opt.IsLive() && c.LivePublisher != nil {
w.WriteHeader(304) //直播流已存在
} else if opt.IsLive() && c.status.Load() > 0 {
http.Error(w, "live stream already exists", http.StatusNotModified)
} else if code, err := c.Invite(&opt); err == nil {
w.WriteHeader(code)
} else {
@@ -69,48 +125,12 @@ func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
}
}
func (c *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
dump := r.URL.Query().Get("dump")
printOut := r.URL.Query().Get("print")
streamPath := r.URL.Query().Get("streamPath")
if dump == "" {
dump = c.DumpPath
}
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
if streamPath == "" {
if strings.HasPrefix(dump, "/") {
streamPath = "replay" + dump
} else {
streamPath = "replay/" + dump
}
}
var pub GBPublisher
pub.SetIO(f)
if err = plugin.Publish(streamPath, &pub); err == nil {
if printOut != "" {
pub.dumpPrint = w
pub.SetParentCtx(r.Context())
err = pub.Replay(f)
} else {
go pub.Replay(f)
w.Write([]byte("ok"))
}
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
func (c *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
// CORS(w, r)
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
live := r.URL.Query().Get("live")
streamPath := r.URL.Query().Get("streamPath")
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.Bye(live != "false"))
w.WriteHeader(c.Bye(streamPath))
} else {
http.NotFound(w, r)
}
@@ -171,3 +191,13 @@ func (c *GB28181Config) API_get_position(w http.ResponseWriter, r *http.Request)
return
}, c.Position.Interval, w, r)
}
func WriteJSONOk(w http.ResponseWriter, data interface{}) {
WriteJSON(w, data, 200)
}
func WriteJSON(w http.ResponseWriter, data interface{}, status int) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(data)
}

127
server.go
View File

@@ -1,20 +1,14 @@
package gb28181
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
"github.com/logrusorgru/aurora"
"github.com/pion/rtp"
"go.uber.org/zap"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
"github.com/ghettovoice/gosip"
@@ -24,21 +18,14 @@ import (
var srv gosip.Server
type Server struct {
Ignores map[string]struct{}
publishers util.Map[uint32, *GBPublisher]
tcpPorts PortManager
udpPorts PortManager
}
const MaxRegisterCount = 3
func FindChannel(deviceId string, channelId string) (c *Channel) {
if v, ok := Devices.Load(deviceId); ok {
d := v.(*Device)
d.channelMutex.RLock()
c = d.ChannelMap[channelId]
d.channelMutex.RUnlock()
if v, ok := d.channelMap.Load(channelId); ok {
return v.(*Channel)
}
}
return
}
@@ -129,10 +116,9 @@ func RequestForResponse(transport string, request sip.Request,
}
func (c *GB28181Config) startServer() {
c.publishers.Init()
addr := c.ListenAddr + ":" + strconv.Itoa(int(c.SipPort))
logger := utils.NewZapLogger(plugin.Logger, "GB SIP Server", nil)
logger := utils.NewZapLogger(GB28181Plugin.Logger, "GB SIP Server", nil)
logger.SetLevel(levelMap[c.LogLevel])
// logger := log.NewDefaultLogrusLogger().WithPrefix("GB SIP Server")
srvConf := gosip.ServerConfig{}
@@ -146,113 +132,22 @@ func (c *GB28181Config) startServer() {
srv.OnRequest(sip.BYE, c.OnBye)
err := srv.Listen(strings.ToLower(c.SipNetwork), addr)
if err != nil {
plugin.Logger.Error("gb28181 server listen", zap.Error(err))
GB28181Plugin.Logger.Error("gb28181 server listen", zap.Error(err))
} else {
plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
GB28181Plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
}
go c.startMediaServer()
if c.MediaNetwork == "tcp" {
c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
} else {
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
}
if c.Username != "" || c.Password != "" {
go c.removeBanDevice()
}
}
func (c *GB28181Config) startMediaServer() {
if c.MediaNetwork == "tcp" {
c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
if !c.tcpPorts.Valid {
c.listenMediaTCP()
}
} else {
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
if !c.udpPorts.Valid {
c.listenMediaUDP()
}
}
}
func (c *GB28181Config) processTcpMediaConn(conn net.Conn) {
var rtpPacket rtp.Packet
reader := bufio.NewReader(conn)
defer conn.Close()
var err error
dumpLen := make([]byte, 6)
ps := make(util.Buffer, 1024)
for err == nil {
if _, err = io.ReadFull(reader, dumpLen[:2]); err != nil {
return
}
ps.Relloc(int(binary.BigEndian.Uint16(dumpLen[:2])))
if _, err = io.ReadFull(reader, ps); err != nil {
return
}
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
} else if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
publisher.writeDump(ps, dumpLen)
publisher.PushPS(&rtpPacket)
} else {
plugin.Info("gb28181 publisher not found", zap.Uint32("ssrc", rtpPacket.SSRC))
}
}
}
func (c *GB28181Config) listenMediaTCP() {
addr := ":" + strconv.Itoa(int(c.MediaPort))
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
if err != nil {
plugin.Error("MediaServer listened tcp err", zap.String("addr", addr), zap.Error(err))
return
}
plugin.Sugar().Infof("MediaServer started tcp at %s", addr)
defer listen.Close()
defer plugin.Info("MediaServer stopped tcp at", zap.Uint16("port", c.MediaPort))
for {
conn, err := listen.Accept()
if err != nil {
plugin.Error("Accept err=", zap.Error(err))
}
go c.processTcpMediaConn(conn)
}
}
func (c *GB28181Config) listenMediaUDP() {
var rtpPacket rtp.Packet
networkBuffer := 1048576
addr := ":" + strconv.Itoa(int(c.MediaPort))
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.ListenUDP("udp", mediaAddr)
if err != nil {
plugin.Error(" MediaServer started listening udp err", zap.String("addr", addr), zap.Error(err))
return
}
bufUDP := make([]byte, networkBuffer)
plugin.Sugar().Infof("MediaServer started at udp %s", addr)
defer plugin.Sugar().Infof("MediaServer stopped at udp %s", addr)
dumpLen := make([]byte, 6)
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
t := time.Now()
if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
publisher.writeDump(ps, dumpLen)
publisher.PushPS(&rtpPacket)
}
x := time.Since(t)
if x > time.Millisecond {
fmt.Println(x)
}
}
}
// func queryCatalog(config *transaction.Config) {
// t := time.NewTicker(time.Duration(config.CatalogInterval) * time.Second)
// for range t.C {