Compare commits

...

1 Commits

6 changed files with 60 additions and 74 deletions

View File

@@ -18,59 +18,33 @@ _ "m7s.live/plugin/gb28181/v4"
```yaml
gb28181:
autoinvite: true
autoinvite: true #表示自动发起invite当ServerSIP接收到设备信息时立即向设备发送invite命令获取流
position:
autosubposition: false #是否自动订阅定位
expires: 3600s #订阅周期(单位:秒)默认3600
interval: 6s #订阅间隔单位默认6
prefetchrecord: false
udpcachesize: 0
prefetchrecord: false
udpcachesize: 0 #表示UDP缓存大小默认为0不开启。仅当TCP关闭切缓存大于0时才开启
sipnetwork: udp
sipip: ""
sipip: "" #sip服务器地址 默认 自动适配设备网段
sipport: 5060
serial: "34020000002000000001"
realm: "3402000000"
username: ""
password: ""
registervalidity: 60s
registervalidity: 60s #注册有效期
mediaip: ""
mediaport: 58200
mediaidletimeout: 30
medianetwork: udp
mediaportmin: 0
meidaportmax: 0
mediaip: "" #媒体服务器地址 默认 自动适配设备网段
mediaport: 58200 #媒体服务器端口,用于接收设备的流
medianetwork: tcp
mediaportmin: 0 #媒体服务器端口范围最小值,设置后将开启端口范围模式
mediaportmax: 0 #媒体服务器端口范围最大值,设置后将开启端口范围模式
removebaninterval: 10m
removebaninterval: 10m #定时移除注册失败的设备黑名单单位秒默认10分钟600秒
loglevel: info
```
- `AutoInvite` bool 表示自动发起invite当ServerSIP接收到设备信息时立即向设备发送invite命令获取流
- `PreFetchRecord` bool
* sip服务器的配置
- `SipNetwork` string 传输协议默认UDP可选TCP
- `SipIP` string sip 服务器公网IP 默认 自动适配设备网段
- `SipPort` uint16 sip 服务器端口,默认 5060
- `Serial` string sip 服务器 id, 默认 34020000002000000001
- `Realm` string sip 服务器域,默认 3402000000
- `Username` string sip 服务器账号
- `Password` string sip 服务器密码
- `RegisterValidity` time.Duration 注册有效期,单位秒,默认 60
* 媒体服务器配置
- `MediaIP` string 媒体服务器地址 默认 自动适配设备网段
- `MediaPort` uint16 媒体服务器端口
- `MediaNetwork` string 媒体传输协议默认UDP可选TCP
- `MediaIdleTimeout` uint16 推流超时时间,超过则断开链接,让设备重连
- `MediaPortMin` uint16 媒体服务器端口范围最小值
- `MediaPortMax` uint16 媒体服务器端口范围最大值
- `LogLevel` string 日志级别,默认 infotracedebuginfowarnerrorfatal, panic
- `RemoveBanInterval` time.Duration 定时移除注册失败的设备黑名单单位秒默认10分钟600秒
- `UdpCacheSize` int 表示UDP缓存大小默认为0不开启。仅当TCP关闭切缓存大于0时才开启会最多缓存最多N个包并排序修复乱序造成的无法播放问题注意开启后会有一定的性能损耗并丢失部分包。
**如果配置了端口范围,将采用范围端口机制,每一个流对应一个端口
**注意某些摄像机没有设置用户名的地方摄像机会以自身的国标id作为用户名这个时候m7s会忽略使用摄像机的用户名忽略配置的用户名**

4
go.mod
View File

@@ -11,12 +11,14 @@ require (
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db
golang.org/x/net v0.7.0
golang.org/x/text v0.7.0
m7s.live/engine/v4 v4.11.15
m7s.live/engine/v4 v4.11.18
)
require (
github.com/aler9/gortsplib v1.0.1 // indirect
github.com/aler9/gortsplib/v2 v2.1.4 // indirect
github.com/cnotch/ipchub v1.1.0 // indirect
github.com/denisbrodbeck/machineid v1.0.1 // indirect
github.com/discoviking/fsm v0.0.0-20150126104936-f4a273feecca // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect

8
go.sum
View File

@@ -1,4 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aler9/gortsplib v1.0.1 h1:R13+hxlvg2Hvu98+0hzg0o5fPjyUA9ZPJneMIBxKGXk=
github.com/aler9/gortsplib v1.0.1/go.mod h1:BOWNZ/QBkY/eVcRqUzJbPFEsRJshwxaxBT01K260Jeo=
github.com/aler9/gortsplib/v2 v2.1.4 h1:A4C4Qxz3aQibphXoKsifwKmKZRY7leaO3jHkA+SQ2kw=
github.com/aler9/gortsplib/v2 v2.1.4/go.mod h1:Eegw8PWa8hNYXiYMlbK3RX1gr7+r25MxniAPGA+kKUE=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
@@ -16,6 +18,8 @@ github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisbrodbeck/machineid v1.0.1 h1:geKr9qtkB876mXguW2X6TU4ZynleN6ezuMSRhl4D7AQ=
github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbjJCrnectwCyxcUSI=
github.com/discoviking/fsm v0.0.0-20150126104936-f4a273feecca h1:cTTdXpkQ1aVbOOmHwdwtYuwUZcQtcMrleD1UXLWhAq8=
github.com/discoviking/fsm v0.0.0-20150126104936-f4a273feecca/go.mod h1:W+3LQaEkN8qAwwcw0KC546sUEnX86GIT8CcMLZC4mG0=
github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug=
@@ -330,5 +334,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
m7s.live/engine/v4 v4.11.15 h1:Hwcfsw1XK63tSJlt7oI+2NY+DXds1urSZJtjQqeB0T8=
m7s.live/engine/v4 v4.11.15/go.mod h1:0gK75fj3GjUcVX5Tu/zC7MSob5nFnA1BYTeMt3w7uMU=
m7s.live/engine/v4 v4.11.18 h1:tgINiHqrrLdWEnoFZ0n2gSwoQwPwBgz+V/NXgo5K4Ks=
m7s.live/engine/v4 v4.11.18/go.mod h1:YoOThdhOpkf7MUDWciy449vfBF7i1p+dtf5o32hOvXY=

View File

@@ -2,6 +2,7 @@ package gb28181
import (
"fmt"
"os"
"strings"
"time"
@@ -20,7 +21,7 @@ type GB28181Config struct {
AutoInvite bool `default:"true"`
PreFetchRecord bool
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
ListenAddr string `default:"0.0.0.0"`
//sip服务器的配置
SipNetwork string `default:"udp"` //传输协议默认UDP可选TCP
SipIP string //sip 服务器公网IP
@@ -39,7 +40,7 @@ type GB28181Config struct {
//媒体服务器配置
MediaIP string //媒体服务器地址
MediaPort uint16 `default:"58200"` //媒体服务器端口
MediaNetwork string `default:"udp"` //媒体传输协议默认UDP可选TCP
MediaNetwork string `default:"tcp"` //媒体传输协议默认UDP可选TCP
MediaPortMin uint16
MediaPortMax uint16
// MediaIdleTimeout uint16 //推流超时时间,超过则断开链接,让设备重连
@@ -71,6 +72,7 @@ func (c *GB28181Config) initRoutes() {
func (c *GB28181Config) OnEvent(event any) {
switch event.(type) {
case FirstConfig:
os.MkdirAll(c.DumpPath, 0766)
c.ReadDevices()
go c.initRoutes()
c.startServer()

View File

@@ -6,6 +6,7 @@ import (
"io"
"net"
"os"
"path/filepath"
"time"
"github.com/ghettovoice/gosip/sip"
@@ -51,7 +52,9 @@ func (p *GBPublisher) OnEvent(event any) {
// p.parser.EsHandler = p
conf.publishers.Add(p.SSRC, p)
if err := error(nil); p.dump != "" {
if p.dumpFile, err = os.OpenFile(p.dump, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
fp := filepath.Join(p.dump, p.Stream.Path)
os.MkdirAll(filepath.Dir(fp), 0766)
if p.dumpFile, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
p.Error("open dump file failed", zap.Error(err))
}
}
@@ -165,24 +168,26 @@ func (p *GBPublisher) ListenUDP() (port uint16, err error) {
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
if p.dumpFile != nil {
util.PutBE(dumpLen[:4], n)
if p.lastReceive.IsZero() {
util.PutBE(dumpLen[4:], 0)
} else {
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
}
p.lastReceive = time.Now()
p.dumpFile.Write(dumpLen)
p.dumpFile.Write(ps)
}
p.writeDump(ps, dumpLen)
p.PushPS(&rtpPacket)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
}
}()
return
}
func (p *GBPublisher) writeDump(ps util.Buffer, dumpLen []byte) {
if p.dumpFile != nil {
util.PutBE(dumpLen[:4], ps.Len())
if p.lastReceive.IsZero() {
util.PutBE(dumpLen[4:], 0)
} else {
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
}
p.lastReceive = time.Now()
p.dumpFile.Write(dumpLen)
p.dumpFile.Write(ps)
}
}
func (p *GBPublisher) ListenTCP() (port uint16, err error) {
port, err = conf.tcpPorts.GetPort()
if err != nil {
@@ -208,19 +213,21 @@ func (p *GBPublisher) ListenTCP() (port uint16, err error) {
return
}
var rtpPacket rtp.Packet
lenBuf := make([]byte, 2)
ps := make(util.Buffer, 1024)
dumpLen := make([]byte, 6)
defer conn.Close()
for err == nil {
if _, err = io.ReadFull(conn, lenBuf); err != nil {
if _, err = io.ReadFull(conn, dumpLen[:2]); err != nil {
return
}
ps := make([]byte, binary.BigEndian.Uint16(lenBuf))
ps.Relloc(int(binary.BigEndian.Uint16(dumpLen[:2])))
if _, err = io.ReadFull(conn, ps); err != nil {
return
}
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
} else if !p.IsClosed() {
p.writeDump(ps, dumpLen)
p.PushPS(&rtpPacket)
}
}

View File

@@ -130,7 +130,7 @@ func RequestForResponse(transport string, request sip.Request,
func (c *GB28181Config) startServer() {
c.publishers.Init()
addr := "0.0.0.0:" + strconv.Itoa(int(c.SipPort))
addr := c.ListenAddr + ":" + strconv.Itoa(int(c.SipPort))
logger := utils.NewZapLogger(plugin.Logger, "GB SIP Server", nil)
logger.SetLevel(levelMap[c.LogLevel])
@@ -175,23 +175,25 @@ func (c *GB28181Config) startMediaServer() {
func (c *GB28181Config) processTcpMediaConn(conn net.Conn) {
var rtpPacket rtp.Packet
reader := bufio.NewReader(conn)
lenBuf := make([]byte, 2)
defer conn.Close()
var err error
dumpLen := make([]byte, 6)
ps := make(util.Buffer, 1024)
for err == nil {
if _, err = io.ReadFull(reader, lenBuf); err != nil {
if _, err = io.ReadFull(reader, dumpLen[:2]); err != nil {
return
}
ps.Reset()
ps.Glow(int(binary.BigEndian.Uint16(lenBuf)))
ps.Relloc(int(binary.BigEndian.Uint16(dumpLen[:2])))
if _, err = io.ReadFull(reader, ps); err != nil {
return
}
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
} else if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
publisher.writeDump(ps, dumpLen)
publisher.PushPS(&rtpPacket)
} else {
plugin.Info("gb28181 publisher not found", zap.Uint32("ssrc", rtpPacket.SSRC))
}
}
}
@@ -239,20 +241,15 @@ func (c *GB28181Config) listenMediaUDP() {
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
t := time.Now()
if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
if publisher.dumpFile != nil {
util.PutBE(dumpLen[:4], n)
if publisher.lastReceive.IsZero() {
util.PutBE(dumpLen[4:], 0)
} else {
util.PutBE(dumpLen[4:], uint16(time.Since(publisher.lastReceive).Milliseconds()))
}
publisher.lastReceive = time.Now()
publisher.dumpFile.Write(dumpLen)
publisher.dumpFile.Write(ps)
}
publisher.writeDump(ps, dumpLen)
publisher.PushPS(&rtpPacket)
}
x := time.Since(t)
if x > time.Millisecond {
fmt.Println(x)
}
}
}