mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f487be5fdb | ||
|
|
bd70d24a16 | ||
|
|
708cd042df | ||
|
|
a69b739e5e | ||
|
|
4e96efa9ff | ||
|
|
3a704b68cc |
@@ -4,13 +4,14 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"golang.org/x/exp/maps"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -260,9 +261,7 @@ func (d *Device) UpdateChannels(list []*Channel) {
|
||||
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
|
||||
}
|
||||
}
|
||||
if conf.AutoInvite && (c.LivePublisher == nil) {
|
||||
go c.Invite(InviteOptions{})
|
||||
}
|
||||
conf.TryAutoInvite(c)
|
||||
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
|
||||
c.LiveSubSP = s.Path
|
||||
} else {
|
||||
|
||||
@@ -185,9 +185,7 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
|
||||
go d.syncChannels()
|
||||
} else {
|
||||
for _, ch := range d.channelMap {
|
||||
if c.AutoInvite && (ch.LivePublisher == nil) {
|
||||
ch.Invite(InviteOptions{})
|
||||
}
|
||||
c.TryAutoInvite(ch)
|
||||
}
|
||||
}
|
||||
//为什么要查找子码流?
|
||||
|
||||
91
main.go
91
main.go
@@ -12,44 +12,45 @@ import (
|
||||
|
||||
type GB28181PositionConfig struct {
|
||||
AutosubPosition bool //是否自动订阅定位
|
||||
Expires time.Duration //订阅周期(单位:秒)
|
||||
Interval time.Duration //订阅间隔(单位:秒)
|
||||
Expires time.Duration `default:"3600s"` //订阅周期(单位:秒)
|
||||
Interval time.Duration `default:"6s"` //订阅间隔(单位:秒)
|
||||
}
|
||||
|
||||
type GB28181Config struct {
|
||||
AutoInvite bool
|
||||
AutoInvite bool `default:"true"`
|
||||
PreFetchRecord bool
|
||||
InviteIDs string
|
||||
|
||||
//sip服务器的配置
|
||||
SipNetwork string //传输协议,默认UDP,可选TCP
|
||||
SipNetwork string `default:"udp"` //传输协议,默认UDP,可选TCP
|
||||
SipIP string //sip 服务器公网IP
|
||||
SipPort uint16 //sip 服务器端口,默认 5060
|
||||
Serial string //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string //sip 服务器域,默认 3402000000
|
||||
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
|
||||
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
|
||||
Username string //sip 服务器账号
|
||||
Password string //sip 服务器密码
|
||||
|
||||
// AckTimeout uint16 //sip 服务应答超时,单位秒
|
||||
RegisterValidity time.Duration //注册有效期,单位秒,默认 3600
|
||||
RegisterValidity time.Duration `default:"60s"` //注册有效期,单位秒,默认 3600
|
||||
// RegisterInterval int //注册间隔,单位秒,默认 60
|
||||
HeartbeatInterval time.Duration //心跳间隔,单位秒,默认 60
|
||||
HeartbeatInterval time.Duration `default:"60s"` //心跳间隔,单位秒,默认 60
|
||||
// HeartbeatRetry int //心跳超时次数,默认 3
|
||||
|
||||
//媒体服务器配置
|
||||
MediaIP string //媒体服务器地址
|
||||
MediaPort uint16 //媒体服务器端口
|
||||
MediaNetwork string //媒体传输协议,默认UDP,可选TCP
|
||||
MediaPort uint16 `default:"58200"` //媒体服务器端口
|
||||
MediaNetwork string `default:"udp"` //媒体传输协议,默认UDP,可选TCP
|
||||
MediaPortMin uint16
|
||||
MediaPortMax uint16
|
||||
// MediaIdleTimeout uint16 //推流超时时间,超过则断开链接,让设备重连
|
||||
|
||||
// WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
|
||||
RemoveBanInterval time.Duration //移除禁止设备间隔
|
||||
RemoveBanInterval time.Duration `default:"600s"` //移除禁止设备间隔
|
||||
UdpCacheSize int //udp缓存大小
|
||||
|
||||
config.Publish
|
||||
Server
|
||||
LogLevel string //trace, debug, info, warn, error, fatal, panic
|
||||
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
|
||||
routes map[string]string
|
||||
DumpPath string //dump PS流本地文件路径
|
||||
|
||||
@@ -80,33 +81,41 @@ func (c *GB28181Config) IsMediaNetworkTCP() bool {
|
||||
return strings.ToLower(c.MediaNetwork) == "tcp"
|
||||
}
|
||||
|
||||
var conf = &GB28181Config{
|
||||
AutoInvite: true,
|
||||
PreFetchRecord: false,
|
||||
UdpCacheSize: 0,
|
||||
SipNetwork: "udp",
|
||||
SipIP: "",
|
||||
SipPort: 5060,
|
||||
Serial: "34020000002000000001",
|
||||
Realm: "3402000000",
|
||||
Username: "",
|
||||
Password: "",
|
||||
|
||||
// AckTimeout: 10,
|
||||
RegisterValidity: 60 * time.Second,
|
||||
// RegisterInterval: 60,
|
||||
HeartbeatInterval: 60 * time.Second,
|
||||
// HeartbeatRetry: 3,
|
||||
|
||||
MediaIP: "",
|
||||
MediaPort: 58200,
|
||||
// MediaIdleTimeout: 30,
|
||||
MediaNetwork: "udp",
|
||||
|
||||
RemoveBanInterval: 600 * time.Second,
|
||||
LogLevel: "info",
|
||||
// WaitKeyFrame: true,
|
||||
Position: GB28181PositionConfig{AutosubPosition: false, Expires: 3600 * time.Second, Interval: 6 * time.Second},
|
||||
func (c *GB28181Config) TryAutoInvite(ch *Channel) {
|
||||
if c.AutoInvite && (ch.LivePublisher == nil) && c.CanInvite(ch.DeviceID) {
|
||||
go ch.Invite(InviteOptions{})
|
||||
}
|
||||
}
|
||||
|
||||
var plugin = InstallPlugin(conf)
|
||||
func (c *GB28181Config) CanInvite(deviceID string) bool {
|
||||
if len(deviceID) != 20 {
|
||||
return false
|
||||
}
|
||||
|
||||
if c.InviteIDs == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
// 11~13位是设备类型编码
|
||||
typeID := deviceID[10:13]
|
||||
|
||||
// format: start-end,type1,type2
|
||||
tokens := strings.Split(c.InviteIDs, ",")
|
||||
for _, tok := range tokens {
|
||||
if first, second, ok := strings.Cut(tok, "-"); ok {
|
||||
if typeID >= first && typeID <= second {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if typeID == first {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
var conf GB28181Config
|
||||
|
||||
var plugin = InstallPlugin(&conf)
|
||||
|
||||
38
publisher.go
38
publisher.go
@@ -13,6 +13,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
. "m7s.live/engine/v4"
|
||||
. "m7s.live/engine/v4/codec"
|
||||
"m7s.live/engine/v4/codec/mpegps"
|
||||
"m7s.live/engine/v4/codec/mpegts"
|
||||
. "m7s.live/engine/v4/track"
|
||||
"m7s.live/engine/v4/util"
|
||||
@@ -24,7 +25,7 @@ type GBPublisher struct {
|
||||
InviteOptions
|
||||
channel *Channel
|
||||
inviteRes sip.Response
|
||||
parser *utils.DecPSPackage
|
||||
parser mpegps.MpegPsStream
|
||||
lastSeq uint16
|
||||
udpCache *utils.PriorityQueueRtp
|
||||
dumpFile *os.File
|
||||
@@ -41,6 +42,7 @@ func (p *GBPublisher) PrintDump(s string) {
|
||||
|
||||
func (p *GBPublisher) OnEvent(event any) {
|
||||
if p.channel == nil {
|
||||
// p.parser.EsHandler = p
|
||||
p.IO.OnEvent(event)
|
||||
return
|
||||
}
|
||||
@@ -53,6 +55,7 @@ func (p *GBPublisher) OnEvent(event any) {
|
||||
p.Type = "GB28181 Playback"
|
||||
p.channel.RecordPublisher = p
|
||||
}
|
||||
// p.parser.EsHandler = p
|
||||
conf.publishers.Add(p.SSRC, p)
|
||||
if err := error(nil); p.dump != "" {
|
||||
if p.dumpFile, err = os.OpenFile(p.dump, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
|
||||
@@ -108,9 +111,9 @@ func (p *GBPublisher) Bye() int {
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
func (p *GBPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
|
||||
if p.VideoTrack == nil {
|
||||
switch p.parser.VideoStreamType {
|
||||
switch es.Type {
|
||||
case mpegts.STREAM_TYPE_H264:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
case mpegts.STREAM_TYPE_H265:
|
||||
@@ -118,7 +121,7 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
default:
|
||||
//推测编码类型
|
||||
var maybe264 H264NALUType
|
||||
maybe264 = maybe264.Parse(payload[4])
|
||||
maybe264 = maybe264.Parse(es.Buffer[4])
|
||||
switch maybe264 {
|
||||
case NALU_Non_IDR_Picture,
|
||||
NALU_IDR_Picture,
|
||||
@@ -133,6 +136,7 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
}
|
||||
}
|
||||
}
|
||||
payload, pts, dts := es.Buffer, es.PTS, es.DTS
|
||||
if len(payload) > 10 {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
|
||||
} else {
|
||||
@@ -146,25 +150,26 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
// }
|
||||
p.WriteAnnexB(pts, dts, payload)
|
||||
}
|
||||
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
|
||||
func (p *GBPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
|
||||
ts, payload := es.PTS, es.Buffer
|
||||
if p.AudioTrack == nil {
|
||||
switch p.parser.AudioStreamType {
|
||||
switch es.Type {
|
||||
case mpegts.STREAM_TYPE_G711A:
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, true, uint32(90000))
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, true)
|
||||
case mpegts.STREAM_TYPE_G711U:
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, false, uint32(90000))
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, false)
|
||||
case mpegts.STREAM_TYPE_AAC:
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream, uint32(90000))
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(ts, payload)
|
||||
case 0: //推测编码类型
|
||||
if payload[0] == 0xff && payload[1]>>4 == 0xf {
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream, uint32(90000))
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(ts, payload)
|
||||
}
|
||||
default:
|
||||
p.Error("audio type not supported yet", zap.Uint32("type", p.parser.AudioStreamType))
|
||||
p.Error("audio type not supported yet", zap.Uint8("type", es.Type))
|
||||
}
|
||||
} else if p.parser.AudioStreamType == mpegts.STREAM_TYPE_AAC {
|
||||
} else if es.Type == mpegts.STREAM_TYPE_AAC {
|
||||
p.WriteADTS(ts, payload)
|
||||
} else {
|
||||
p.WriteRaw(ts, payload)
|
||||
@@ -173,11 +178,12 @@ func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
|
||||
|
||||
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
|
||||
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
|
||||
if p.parser == nil {
|
||||
p.parser = utils.NewDecPSPackage(p)
|
||||
if p.parser.EsHandler == nil {
|
||||
p.parser.EsHandler = p
|
||||
p.lastSeq = rtp.SequenceNumber - 1
|
||||
}
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
p.parser.Feed(rtp)
|
||||
p.parser.Feed(rtp.Payload)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
} else {
|
||||
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
|
||||
@@ -188,7 +194,7 @@ func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
|
||||
p.SetLostFlag()
|
||||
}
|
||||
}
|
||||
p.parser.Feed(rtp)
|
||||
p.parser.Feed(rtp.Payload)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user