mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
feat: change to use ps plugin
This commit is contained in:
152
channel.go
152
channel.go
@@ -5,27 +5,55 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"go.uber.org/zap"
|
||||
. "m7s.live/engine/v4"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
"m7s.live/plugin/ps/v4"
|
||||
)
|
||||
|
||||
type PullStream struct {
|
||||
opt *InviteOptions
|
||||
channel *Channel
|
||||
inviteRes sip.Response
|
||||
}
|
||||
|
||||
func (p *PullStream) Bye() int {
|
||||
res := p.inviteRes
|
||||
bye := p.channel.CreateRequst(sip.BYE)
|
||||
from, _ := res.From()
|
||||
to, _ := res.To()
|
||||
callId, _ := res.CallID()
|
||||
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
|
||||
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
|
||||
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
|
||||
resp, err := p.channel.device.SipRequestForResponse(bye)
|
||||
if p.opt.IsLive() {
|
||||
p.channel.status.Store(0)
|
||||
// defer p.channel.TryAutoInvite(p.opt)
|
||||
}
|
||||
if p.opt.recyclePort != nil {
|
||||
p.opt.recyclePort(p.opt.MediaPort)
|
||||
}
|
||||
if err != nil {
|
||||
return ServerInternalError
|
||||
}
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
type ChannelEx struct {
|
||||
device *Device
|
||||
RecordPublisher *GBPublisher `json:"-" yaml:"-"`
|
||||
LivePublisher *GBPublisher
|
||||
LiveSubSP string //实时子码流
|
||||
device *Device // 所属设备
|
||||
status atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放
|
||||
LiveSubSP string // 实时子码流,通过rtsp
|
||||
Records []*Record
|
||||
RecordStartTime string
|
||||
RecordEndTime string
|
||||
recordStartTime time.Time
|
||||
recordEndTime time.Time
|
||||
liveInviteLock *sync.Mutex
|
||||
tcpPortIndex uint16
|
||||
GpsTime time.Time //gps时间
|
||||
Longitude string //经度
|
||||
Latitude string //纬度
|
||||
@@ -203,18 +231,26 @@ f = v/a/编码格式/码率大小/采样率
|
||||
f字段中视、音频参数段之间不需空格分割。
|
||||
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
|
||||
*/
|
||||
|
||||
func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
if opt.IsLive() {
|
||||
if !channel.liveInviteLock.TryLock() {
|
||||
if !channel.status.CompareAndSwap(0, 1) {
|
||||
return 304, nil
|
||||
}
|
||||
defer func() {
|
||||
if code != OK {
|
||||
channel.liveInviteLock.Unlock()
|
||||
if err != nil {
|
||||
channel.status.Store(0)
|
||||
if conf.InviteMode == 1 {
|
||||
// 5秒后重试
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
channel.Invite(opt)
|
||||
})
|
||||
}
|
||||
} else {
|
||||
channel.status.Store(2)
|
||||
}
|
||||
}()
|
||||
}
|
||||
channel.Bye(opt.IsLive())
|
||||
d := channel.device
|
||||
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
|
||||
s := "Play"
|
||||
@@ -223,37 +259,36 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
s = "Playback"
|
||||
streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
|
||||
}
|
||||
if opt.StreamPath != "" {
|
||||
streamPath = opt.StreamPath
|
||||
}
|
||||
if opt.dump == "" {
|
||||
opt.dump = conf.DumpPath
|
||||
}
|
||||
publisher := &GBPublisher{
|
||||
InviteOptions: opt,
|
||||
channel: channel,
|
||||
}
|
||||
publisher.DisableReorder = !conf.RtpReorder
|
||||
protocol := ""
|
||||
networkType := "udp"
|
||||
resuePort := true
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
networkType = "tcp"
|
||||
protocol = "TCP/"
|
||||
if conf.tcpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenTCP()
|
||||
if err != nil {
|
||||
return ServerInternalError, err
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
opt.MediaPort, err = conf.tcpPorts.GetPort()
|
||||
opt.recyclePort = conf.tcpPorts.Recycle
|
||||
resuePort = false
|
||||
}
|
||||
publisher.DisableReorder = true
|
||||
} else {
|
||||
if conf.udpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenUDP()
|
||||
if err != nil {
|
||||
code = ServerInternalError
|
||||
return
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
opt.MediaPort, err = conf.udpPorts.GetPort()
|
||||
opt.recyclePort = conf.udpPorts.Recycle
|
||||
resuePort = false
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return http.StatusInternalServerError, err
|
||||
}
|
||||
if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
|
||||
sdpInfo := []string{
|
||||
"v=0",
|
||||
@@ -266,7 +301,6 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
"a=recvonly",
|
||||
"a=rtpmap:96 PS/90000",
|
||||
"y=" + opt.ssrc,
|
||||
"",
|
||||
}
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
|
||||
@@ -275,22 +309,22 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
contentType := sip.ContentType("application/sdp")
|
||||
invite.AppendHeader(&contentType)
|
||||
|
||||
invite.SetBody(strings.Join(sdpInfo, "\r\n"), true)
|
||||
invite.SetBody(strings.Join(sdpInfo, "\r\n")+"\r\n", true)
|
||||
|
||||
subject := sip.GenericHeader{
|
||||
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
|
||||
}
|
||||
invite.AppendHeader(&subject)
|
||||
publisher.inviteRes, err = d.SipRequestForResponse(invite)
|
||||
inviteRes, err := d.SipRequestForResponse(invite)
|
||||
if err != nil {
|
||||
plugin.Error(fmt.Sprintf("SIP->Invite %s :%s invite error: %s", channel.DeviceID, invite.String(), err.Error()))
|
||||
return http.StatusInternalServerError, err
|
||||
}
|
||||
code = int(publisher.inviteRes.StatusCode())
|
||||
code = int(inviteRes.StatusCode())
|
||||
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
|
||||
|
||||
if code == OK {
|
||||
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
|
||||
ds := strings.Split(inviteRes.Body(), "\r\n")
|
||||
for _, l := range ds {
|
||||
if ls := strings.Split(l, "="); len(ls) > 1 {
|
||||
if ls[0] == "y" && len(ls[1]) > 0 {
|
||||
@@ -303,46 +337,42 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
// if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
|
||||
// publisher.udpCache = utils.NewPqRtp()
|
||||
// }
|
||||
if err = plugin.Publish(streamPath, publisher); err != nil {
|
||||
code = ServerInternalError
|
||||
return
|
||||
err = ps.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, resuePort)
|
||||
if err == nil {
|
||||
PullStreams.Store(streamPath, &PullStream{
|
||||
opt: opt,
|
||||
channel: channel,
|
||||
inviteRes: inviteRes,
|
||||
})
|
||||
err = srv.Send(sip.NewAckRequest("", invite, inviteRes, "", nil))
|
||||
}
|
||||
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
|
||||
srv.Send(ack)
|
||||
} else if channel.CanInvite() {
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
channel.TryAutoInvite()
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (channel *Channel) Bye(live bool) int {
|
||||
func (channel *Channel) Bye(streamPath string) int {
|
||||
d := channel.device
|
||||
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
|
||||
if s := Streams.Get(streamPath); s != nil {
|
||||
s.Close()
|
||||
if streamPath == "" {
|
||||
streamPath = fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
|
||||
}
|
||||
if live && channel.LivePublisher != nil {
|
||||
return channel.LivePublisher.Bye()
|
||||
if s, loaded := PullStreams.LoadAndDelete(streamPath); loaded {
|
||||
s.(*PullStream).Bye()
|
||||
if s := Streams.Get(streamPath); s != nil {
|
||||
s.Close()
|
||||
}
|
||||
return http.StatusOK
|
||||
}
|
||||
if !live && channel.RecordPublisher != nil {
|
||||
return channel.RecordPublisher.Bye()
|
||||
}
|
||||
return 404
|
||||
return http.StatusNotFound
|
||||
}
|
||||
|
||||
func (channel *Channel) TryAutoInvite() {
|
||||
if conf.AutoInvite && channel.CanInvite() {
|
||||
go channel.Invite(&InviteOptions{})
|
||||
func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
|
||||
if conf.InviteMode == 1 && channel.CanInvite() {
|
||||
go channel.Invite(opt)
|
||||
}
|
||||
}
|
||||
|
||||
func (channel *Channel) CanInvite() bool {
|
||||
if channel.LivePublisher != nil || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
|
||||
if channel.status.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
65
device.go
65
device.go
@@ -10,8 +10,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -62,8 +60,7 @@ type Device struct {
|
||||
sipIP string //设备对应网卡的服务器ip
|
||||
mediaIP string //设备对应网卡的服务器ip
|
||||
NetAddr string
|
||||
ChannelMap map[string]*Channel
|
||||
channelMutex sync.RWMutex
|
||||
channelMap sync.Map
|
||||
subscriber struct {
|
||||
CallID string
|
||||
Timeout time.Time
|
||||
@@ -76,13 +73,18 @@ type Device struct {
|
||||
|
||||
func (d *Device) MarshalJSON() ([]byte, error) {
|
||||
type Alias Device
|
||||
return json.Marshal(&struct {
|
||||
data := &struct {
|
||||
Channels []*Channel
|
||||
*Alias
|
||||
}{
|
||||
Channels: maps.Values(d.ChannelMap),
|
||||
Alias: (*Alias)(d),
|
||||
Alias: (*Alias)(d),
|
||||
}
|
||||
d.channelMap.Range(func(key, value interface{}) bool {
|
||||
c := value.(*Channel)
|
||||
data.Channels = append(data.Channels, c)
|
||||
return true
|
||||
})
|
||||
return json.Marshal(data)
|
||||
}
|
||||
func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
from, _ := req.From()
|
||||
@@ -116,9 +118,6 @@ func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
d.mediaIP = mediaIp
|
||||
d.NetAddr = deviceIp
|
||||
d.UpdateTime = time.Now()
|
||||
if d.ChannelMap == nil {
|
||||
d.ChannelMap = make(map[string]*Channel)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
|
||||
@@ -164,7 +163,6 @@ func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
|
||||
sipIP: sipIP,
|
||||
mediaIP: mediaIp,
|
||||
NetAddr: deviceIp,
|
||||
ChannelMap: make(map[string]*Channel),
|
||||
}
|
||||
Devices.Store(id, d)
|
||||
c.SaveDevices()
|
||||
@@ -200,35 +198,27 @@ func (c *GB28181Config) SaveDevices() {
|
||||
}
|
||||
|
||||
func (d *Device) addOrUpdateChannel(channel *Channel) {
|
||||
d.channelMutex.Lock()
|
||||
defer d.channelMutex.Unlock()
|
||||
if old, ok := d.channelMap.Load(channel.DeviceID); ok {
|
||||
channel.ChannelEx = old.(*Channel).ChannelEx
|
||||
}
|
||||
channel.device = d
|
||||
if old, ok := d.ChannelMap[channel.DeviceID]; ok {
|
||||
//复制锁指针
|
||||
channel.ChannelEx = old.ChannelEx
|
||||
}
|
||||
if channel.liveInviteLock == nil {
|
||||
channel.liveInviteLock = &sync.Mutex{}
|
||||
}
|
||||
d.ChannelMap[channel.DeviceID] = channel
|
||||
d.channelMap.Store(channel.DeviceID, channel)
|
||||
}
|
||||
|
||||
func (d *Device) deleteChannel(DeviceID string) {
|
||||
d.channelMutex.Lock()
|
||||
defer d.channelMutex.Unlock()
|
||||
delete(d.ChannelMap, DeviceID)
|
||||
d.channelMap.Delete(DeviceID)
|
||||
}
|
||||
|
||||
func (d *Device) CheckSubStream() {
|
||||
d.channelMutex.Lock()
|
||||
defer d.channelMutex.Unlock()
|
||||
for _, c := range d.ChannelMap {
|
||||
d.channelMap.Range(func(key, value any) bool {
|
||||
c := value.(*Channel)
|
||||
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
|
||||
c.LiveSubSP = s.Path
|
||||
} else {
|
||||
c.LiveSubSP = ""
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
func (d *Device) UpdateChannels(list []*Channel) {
|
||||
|
||||
@@ -265,7 +255,7 @@ func (d *Device) UpdateChannels(list []*Channel) {
|
||||
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
|
||||
}
|
||||
}
|
||||
c.TryAutoInvite()
|
||||
c.TryAutoInvite(&InviteOptions{})
|
||||
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
|
||||
c.LiveSubSP = s.Path
|
||||
} else {
|
||||
@@ -274,11 +264,11 @@ func (d *Device) UpdateChannels(list []*Channel) {
|
||||
}
|
||||
}
|
||||
func (d *Device) UpdateRecord(channelId string, list []*Record) {
|
||||
d.channelMutex.RLock()
|
||||
if c, ok := d.ChannelMap[channelId]; ok {
|
||||
d.channelMap.Range(func(key, value any) bool {
|
||||
c := value.(*Channel)
|
||||
c.Records = append(c.Records, list...)
|
||||
}
|
||||
d.channelMutex.RUnlock()
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Device) CreateRequest(Method sip.RequestMethod) (req sip.Request) {
|
||||
@@ -455,7 +445,8 @@ func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, inter
|
||||
|
||||
// UpdateChannelPosition 更新通道GPS坐标
|
||||
func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) {
|
||||
if c, ok := d.ChannelMap[channelId]; ok {
|
||||
if v, ok := d.channelMap.Load(channelId); ok {
|
||||
c := v.(*Channel)
|
||||
c.ChannelEx.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
|
||||
c.ChannelEx.Longitude = lng
|
||||
c.ChannelEx.Latitude = lat
|
||||
@@ -534,7 +525,8 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
}
|
||||
|
||||
func (d *Device) channelOnline(DeviceID string) {
|
||||
if c, ok := d.ChannelMap[DeviceID]; ok {
|
||||
if v, ok := d.channelMap.Load(DeviceID); ok {
|
||||
c := v.(*Channel)
|
||||
c.Status = "ON"
|
||||
plugin.Sugar().Debugf("通道[%s]在线\n", c.Name)
|
||||
} else {
|
||||
@@ -543,7 +535,8 @@ func (d *Device) channelOnline(DeviceID string) {
|
||||
}
|
||||
|
||||
func (d *Device) channelOffline(DeviceID string) {
|
||||
if c, ok := d.ChannelMap[DeviceID]; ok {
|
||||
if v, ok := d.channelMap.Load(DeviceID); ok {
|
||||
c := v.(*Channel)
|
||||
c.Status = "OFF"
|
||||
plugin.Sugar().Debugf("通道[%s]离线\n", c.Name)
|
||||
} else {
|
||||
|
||||
10
handle.go
10
handle.go
@@ -185,12 +185,14 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
|
||||
case "Keepalive":
|
||||
d.LastKeepaliveAt = time.Now()
|
||||
//callID !="" 说明是订阅的事件类型信息
|
||||
if d.ChannelMap == nil || len(d.ChannelMap) == 0 {
|
||||
if d.lastSyncTime.IsZero() {
|
||||
go d.syncChannels()
|
||||
} else {
|
||||
for _, ch := range d.ChannelMap {
|
||||
ch.TryAutoInvite()
|
||||
}
|
||||
d.channelMap.Range(func(key, value interface{}) bool {
|
||||
channel := value.(*Channel)
|
||||
channel.TryAutoInvite(&InviteOptions{})
|
||||
return true
|
||||
})
|
||||
}
|
||||
//为什么要查找子码流?
|
||||
//d.CheckSubStream()
|
||||
|
||||
@@ -8,12 +8,14 @@ import (
|
||||
)
|
||||
|
||||
type InviteOptions struct {
|
||||
Start int
|
||||
End int
|
||||
dump string
|
||||
ssrc string
|
||||
SSRC uint32
|
||||
MediaPort uint16
|
||||
Start int
|
||||
End int
|
||||
dump string
|
||||
ssrc string
|
||||
SSRC uint32
|
||||
MediaPort uint16
|
||||
StreamPath string
|
||||
recyclePort func(p uint16) (err error)
|
||||
}
|
||||
|
||||
func (o InviteOptions) IsLive() bool {
|
||||
|
||||
46
main.go
46
main.go
@@ -4,11 +4,11 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
myip "github.com/husanpao/ip"
|
||||
. "m7s.live/engine/v4"
|
||||
"m7s.live/engine/v4/config"
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
@@ -19,18 +19,19 @@ type GB28181PositionConfig struct {
|
||||
}
|
||||
|
||||
type GB28181Config struct {
|
||||
AutoInvite bool `default:"true"`
|
||||
// AutoInvite bool `default:"true"`
|
||||
InviteMode int `default:"1"` //邀请模式,0:手动拉流,1:预拉流,2:按需拉流
|
||||
PreFetchRecord bool
|
||||
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
|
||||
ListenAddr string `default:"0.0.0.0"`
|
||||
//sip服务器的配置
|
||||
SipNetwork string `default:"udp"` //传输协议,默认UDP,可选TCP
|
||||
SipIP string //sip 服务器公网IP
|
||||
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
|
||||
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
|
||||
Username string //sip 服务器账号
|
||||
Password string //sip 服务器密码
|
||||
SipNetwork string `default:"udp"` //传输协议,默认UDP,可选TCP
|
||||
SipIP string //sip 服务器公网IP
|
||||
SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060
|
||||
Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001
|
||||
Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000
|
||||
Username string //sip 服务器账号
|
||||
Password string //sip 服务器密码
|
||||
Port struct { // 新配置方式
|
||||
Sip string `default:"udp:5060"`
|
||||
Media string `default:"tcp:58200"`
|
||||
@@ -52,14 +53,15 @@ type GB28181Config struct {
|
||||
// WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
|
||||
RemoveBanInterval time.Duration `default:"600s"` //移除禁止设备间隔
|
||||
// UdpCacheSize int //udp缓存大小
|
||||
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
|
||||
routes map[string]string
|
||||
DumpPath string //dump PS流本地文件路径
|
||||
RtpReorder bool `default:"true"`
|
||||
config.Publish
|
||||
Server
|
||||
LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic
|
||||
routes map[string]string
|
||||
DumpPath string //dump PS流本地文件路径
|
||||
Ignores map[string]struct{}
|
||||
tcpPorts PortManager
|
||||
udpPorts PortManager
|
||||
|
||||
Position GB28181PositionConfig //关于定位的配置参数
|
||||
|
||||
}
|
||||
|
||||
func (c *GB28181Config) initRoutes() {
|
||||
@@ -73,8 +75,9 @@ func (c *GB28181Config) initRoutes() {
|
||||
}
|
||||
plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes))
|
||||
}
|
||||
|
||||
func (c *GB28181Config) OnEvent(event any) {
|
||||
switch event.(type) {
|
||||
switch e := event.(type) {
|
||||
case FirstConfig:
|
||||
if c.Port.Sip != "udp:5060" {
|
||||
protocol, ports := util.Conf2Listener(c.Port.Sip)
|
||||
@@ -95,6 +98,16 @@ func (c *GB28181Config) OnEvent(event any) {
|
||||
c.ReadDevices()
|
||||
go c.initRoutes()
|
||||
c.startServer()
|
||||
case *Stream:
|
||||
if c.InviteMode == 2 {
|
||||
if channel := FindChannel(e.AppName, e.StreamName); channel != nil {
|
||||
channel.TryAutoInvite(&InviteOptions{})
|
||||
}
|
||||
}
|
||||
case SEclose:
|
||||
if v, ok := PullStreams.LoadAndDelete(e.Target.Path); ok {
|
||||
go v.(*PullStream).Bye()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,3 +118,4 @@ func (c *GB28181Config) IsMediaNetworkTCP() bool {
|
||||
var conf GB28181Config
|
||||
|
||||
var plugin = InstallPlugin(&conf)
|
||||
var PullStreams sync.Map //拉流
|
||||
|
||||
235
publisher.go
235
publisher.go
@@ -1,235 +0,0 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"github.com/pion/rtp"
|
||||
"go.uber.org/zap"
|
||||
. "m7s.live/engine/v4"
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
type GBPublisher struct {
|
||||
PSPublisher
|
||||
*InviteOptions
|
||||
channel *Channel
|
||||
inviteRes sip.Response
|
||||
// udpCache *utils.PriorityQueueRtp
|
||||
dumpFile *os.File
|
||||
dumpPrint io.Writer
|
||||
lastReceive time.Time
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PrintDump(s string) {
|
||||
if p.dumpPrint != nil {
|
||||
p.dumpPrint.Write([]byte(s))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *GBPublisher) OnEvent(event any) {
|
||||
if p.channel == nil {
|
||||
// p.parser.EsHandler = p
|
||||
p.IO.OnEvent(event)
|
||||
return
|
||||
}
|
||||
switch event.(type) {
|
||||
case IPublisher:
|
||||
if p.IsLive() {
|
||||
p.Type = "GB28181 Live"
|
||||
p.channel.LivePublisher = p
|
||||
} else {
|
||||
p.Type = "GB28181 Playback"
|
||||
p.channel.RecordPublisher = p
|
||||
}
|
||||
// p.parser.EsHandler = p
|
||||
conf.publishers.Add(p.SSRC, p)
|
||||
if err := error(nil); p.dump != "" {
|
||||
fp := filepath.Join(p.dump, p.Stream.Path)
|
||||
os.MkdirAll(filepath.Dir(fp), 0766)
|
||||
if p.dumpFile, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
|
||||
p.Error("open dump file failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
case SEwaitPublish:
|
||||
//掉线自动重新拉流
|
||||
if p.IsLive() {
|
||||
if p.channel.LivePublisher != nil {
|
||||
p.channel.LivePublisher = nil
|
||||
p.channel.liveInviteLock.Unlock()
|
||||
}
|
||||
go p.channel.Invite(&InviteOptions{})
|
||||
}
|
||||
case SEclose, SEKick:
|
||||
if p.IsLive() {
|
||||
if p.channel.LivePublisher != nil {
|
||||
p.channel.LivePublisher = nil
|
||||
p.channel.liveInviteLock.Unlock()
|
||||
}
|
||||
} else {
|
||||
p.channel.RecordPublisher = nil
|
||||
}
|
||||
conf.publishers.Delete(p.SSRC)
|
||||
if p.dumpFile != nil {
|
||||
p.dumpFile.Close()
|
||||
}
|
||||
go p.Bye()
|
||||
}
|
||||
p.Publisher.OnEvent(event)
|
||||
}
|
||||
|
||||
func (p *GBPublisher) Bye() int {
|
||||
res := p.inviteRes
|
||||
if res == nil {
|
||||
return 404
|
||||
}
|
||||
defer p.Stop()
|
||||
p.inviteRes = nil
|
||||
bye := p.channel.CreateRequst(sip.BYE)
|
||||
from, _ := res.From()
|
||||
to, _ := res.To()
|
||||
callId, _ := res.CallID()
|
||||
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
|
||||
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
|
||||
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
|
||||
resp, err := p.channel.device.SipRequestForResponse(bye)
|
||||
if err != nil {
|
||||
p.Error("Bye", zap.Error(err))
|
||||
return ServerInternalError
|
||||
}
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
defer f.Close()
|
||||
if p.dumpPrint != nil {
|
||||
p.PrintDump(`<style type="text/css">
|
||||
.gray {
|
||||
color: gray;
|
||||
}
|
||||
</style>
|
||||
`)
|
||||
p.PrintDump("<table>")
|
||||
defer p.PrintDump("</table>")
|
||||
}
|
||||
var t uint16
|
||||
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
|
||||
_, err = f.Read(l)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
payload := make([]byte, util.ReadBE[int](l[:4]))
|
||||
t = util.ReadBE[uint16](l[4:])
|
||||
p.PrintDump(fmt.Sprintf("[<b>%d</b> %d]", t, len(payload)))
|
||||
_, err = f.Read(payload)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
rtpPacket.Unmarshal(payload)
|
||||
p.PushPS(&rtpPacket)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (p *GBPublisher) ListenUDP() (port uint16, err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
networkBuffer := 1048576
|
||||
port, err = conf.udpPorts.GetPort()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
addr := fmt.Sprintf(":%d", port)
|
||||
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
|
||||
conn, err := net.ListenUDP("udp", mediaAddr)
|
||||
if err != nil {
|
||||
conf.udpPorts.Recycle(port)
|
||||
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
p.SetIO(conn)
|
||||
go func() {
|
||||
defer conn.Close()
|
||||
bufUDP := make([]byte, networkBuffer)
|
||||
plugin.Info("Media udp server start.", zap.Uint16("port", port))
|
||||
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
|
||||
defer conf.udpPorts.Recycle(port)
|
||||
dumpLen := make([]byte, 6)
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
|
||||
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
|
||||
ps := bufUDP[:n]
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("Decode rtp error:", zap.Error(err))
|
||||
}
|
||||
p.writeDump(ps, dumpLen)
|
||||
p.PushPS(&rtpPacket)
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
func (p *GBPublisher) writeDump(ps util.Buffer, dumpLen []byte) {
|
||||
if p.dumpFile != nil {
|
||||
util.PutBE(dumpLen[:4], ps.Len())
|
||||
if p.lastReceive.IsZero() {
|
||||
util.PutBE(dumpLen[4:], 0)
|
||||
} else {
|
||||
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
|
||||
}
|
||||
p.lastReceive = time.Now()
|
||||
p.dumpFile.Write(dumpLen)
|
||||
p.dumpFile.Write(ps)
|
||||
}
|
||||
}
|
||||
func (p *GBPublisher) ListenTCP() (port uint16, err error) {
|
||||
port, err = conf.tcpPorts.GetPort()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
addr := fmt.Sprintf(":%d", port)
|
||||
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
|
||||
listen, err := net.ListenTCP("tcp", mediaAddr)
|
||||
if err != nil {
|
||||
defer conf.tcpPorts.Recycle(port)
|
||||
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
go func() {
|
||||
plugin.Info("Media tcp server start.", zap.Uint16("port", port))
|
||||
defer conf.tcpPorts.Recycle(port)
|
||||
defer plugin.Info("Media tcp server stop", zap.Uint16("port", port))
|
||||
conn, err := listen.Accept()
|
||||
listen.Close()
|
||||
p.SetIO(conn)
|
||||
if err != nil {
|
||||
plugin.Error("Accept err=", zap.Error(err))
|
||||
return
|
||||
}
|
||||
var rtpPacket rtp.Packet
|
||||
ps := make(util.Buffer, 1024)
|
||||
dumpLen := make([]byte, 6)
|
||||
defer conn.Close()
|
||||
for err == nil {
|
||||
if _, err = io.ReadFull(conn, dumpLen[:2]); err != nil {
|
||||
return
|
||||
}
|
||||
ps.Relloc(int(binary.BigEndian.Uint16(dumpLen[:2])))
|
||||
if _, err = io.ReadFull(conn, ps); err != nil {
|
||||
return
|
||||
}
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
|
||||
} else if !p.IsClosed() {
|
||||
p.writeDump(ps, dumpLen)
|
||||
p.PushPS(&rtpPacket)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
50
restful.go
50
restful.go
@@ -2,9 +2,7 @@ package gb28181
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"m7s.live/engine/v4/util"
|
||||
@@ -52,15 +50,17 @@ func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
id := query.Get("id")
|
||||
channel := query.Get("channel")
|
||||
streamPath := query.Get("streamPath")
|
||||
port, _ := strconv.Atoi(query.Get("mediaPort"))
|
||||
opt := InviteOptions{
|
||||
dump: query.Get("dump"),
|
||||
MediaPort: uint16(port),
|
||||
dump: query.Get("dump"),
|
||||
MediaPort: uint16(port),
|
||||
StreamPath: streamPath,
|
||||
}
|
||||
opt.Validate(query.Get("startTime"), query.Get("endTime"))
|
||||
if c := FindChannel(id, channel); c == nil {
|
||||
http.NotFound(w, r)
|
||||
} else if opt.IsLive() && c.LivePublisher != nil {
|
||||
} else if opt.IsLive() && c.status.Load() > 0 {
|
||||
w.WriteHeader(304) //直播流已存在
|
||||
} else if code, err := c.Invite(&opt); err == nil {
|
||||
w.WriteHeader(code)
|
||||
@@ -69,48 +69,12 @@ func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
|
||||
dump := r.URL.Query().Get("dump")
|
||||
printOut := r.URL.Query().Get("print")
|
||||
streamPath := r.URL.Query().Get("streamPath")
|
||||
if dump == "" {
|
||||
dump = c.DumpPath
|
||||
}
|
||||
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
if streamPath == "" {
|
||||
if strings.HasPrefix(dump, "/") {
|
||||
streamPath = "replay" + dump
|
||||
} else {
|
||||
streamPath = "replay/" + dump
|
||||
}
|
||||
}
|
||||
var pub GBPublisher
|
||||
pub.SetIO(f)
|
||||
if err = plugin.Publish(streamPath, &pub); err == nil {
|
||||
if printOut != "" {
|
||||
pub.dumpPrint = w
|
||||
pub.SetParentCtx(r.Context())
|
||||
err = pub.Replay(f)
|
||||
} else {
|
||||
go pub.Replay(f)
|
||||
w.Write([]byte("ok"))
|
||||
}
|
||||
} else {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
|
||||
// CORS(w, r)
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
live := r.URL.Query().Get("live")
|
||||
streamPath := r.URL.Query().Get("streamPath")
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.Bye(live != "false"))
|
||||
w.WriteHeader(c.Bye(streamPath))
|
||||
} else {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
|
||||
121
server.go
121
server.go
@@ -1,20 +1,14 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/logrusorgru/aurora"
|
||||
"github.com/pion/rtp"
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4/util"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
|
||||
"github.com/ghettovoice/gosip"
|
||||
@@ -24,21 +18,14 @@ import (
|
||||
|
||||
var srv gosip.Server
|
||||
|
||||
type Server struct {
|
||||
Ignores map[string]struct{}
|
||||
publishers util.Map[uint32, *GBPublisher]
|
||||
tcpPorts PortManager
|
||||
udpPorts PortManager
|
||||
}
|
||||
|
||||
const MaxRegisterCount = 3
|
||||
|
||||
func FindChannel(deviceId string, channelId string) (c *Channel) {
|
||||
if v, ok := Devices.Load(deviceId); ok {
|
||||
d := v.(*Device)
|
||||
d.channelMutex.RLock()
|
||||
c = d.ChannelMap[channelId]
|
||||
d.channelMutex.RUnlock()
|
||||
if v, ok := d.channelMap.Load(channelId); ok {
|
||||
return v.(*Channel)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -129,7 +116,6 @@ func RequestForResponse(transport string, request sip.Request,
|
||||
}
|
||||
|
||||
func (c *GB28181Config) startServer() {
|
||||
c.publishers.Init()
|
||||
addr := c.ListenAddr + ":" + strconv.Itoa(int(c.SipPort))
|
||||
|
||||
logger := utils.NewZapLogger(plugin.Logger, "GB SIP Server", nil)
|
||||
@@ -151,108 +137,17 @@ func (c *GB28181Config) startServer() {
|
||||
plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
|
||||
}
|
||||
|
||||
go c.startMediaServer()
|
||||
if c.MediaNetwork == "tcp" {
|
||||
c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
|
||||
} else {
|
||||
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
|
||||
}
|
||||
|
||||
if c.Username != "" || c.Password != "" {
|
||||
go c.removeBanDevice()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) startMediaServer() {
|
||||
if c.MediaNetwork == "tcp" {
|
||||
c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
|
||||
if !c.tcpPorts.Valid {
|
||||
c.listenMediaTCP()
|
||||
}
|
||||
} else {
|
||||
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
|
||||
if !c.udpPorts.Valid {
|
||||
c.listenMediaUDP()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) processTcpMediaConn(conn net.Conn) {
|
||||
var rtpPacket rtp.Packet
|
||||
reader := bufio.NewReader(conn)
|
||||
defer conn.Close()
|
||||
var err error
|
||||
dumpLen := make([]byte, 6)
|
||||
ps := make(util.Buffer, 1024)
|
||||
for err == nil {
|
||||
if _, err = io.ReadFull(reader, dumpLen[:2]); err != nil {
|
||||
return
|
||||
}
|
||||
ps.Relloc(int(binary.BigEndian.Uint16(dumpLen[:2])))
|
||||
if _, err = io.ReadFull(reader, ps); err != nil {
|
||||
return
|
||||
}
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
|
||||
} else if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
|
||||
publisher.writeDump(ps, dumpLen)
|
||||
publisher.PushPS(&rtpPacket)
|
||||
} else {
|
||||
plugin.Info("gb28181 publisher not found", zap.Uint32("ssrc", rtpPacket.SSRC))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) listenMediaTCP() {
|
||||
addr := ":" + strconv.Itoa(int(c.MediaPort))
|
||||
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
|
||||
listen, err := net.ListenTCP("tcp", mediaAddr)
|
||||
|
||||
if err != nil {
|
||||
plugin.Error("MediaServer listened tcp err", zap.String("addr", addr), zap.Error(err))
|
||||
return
|
||||
}
|
||||
plugin.Sugar().Infof("MediaServer started tcp at %s", addr)
|
||||
defer listen.Close()
|
||||
defer plugin.Info("MediaServer stopped tcp at", zap.Uint16("port", c.MediaPort))
|
||||
|
||||
for {
|
||||
conn, err := listen.Accept()
|
||||
if err != nil {
|
||||
plugin.Error("Accept err=", zap.Error(err))
|
||||
}
|
||||
go c.processTcpMediaConn(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GB28181Config) listenMediaUDP() {
|
||||
var rtpPacket rtp.Packet
|
||||
networkBuffer := 1048576
|
||||
|
||||
addr := ":" + strconv.Itoa(int(c.MediaPort))
|
||||
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
|
||||
conn, err := net.ListenUDP("udp", mediaAddr)
|
||||
|
||||
if err != nil {
|
||||
plugin.Error(" MediaServer started listening udp err", zap.String("addr", addr), zap.Error(err))
|
||||
return
|
||||
}
|
||||
bufUDP := make([]byte, networkBuffer)
|
||||
plugin.Sugar().Infof("MediaServer started at udp %s", addr)
|
||||
defer plugin.Sugar().Infof("MediaServer stopped at udp %s", addr)
|
||||
dumpLen := make([]byte, 6)
|
||||
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
|
||||
ps := bufUDP[:n]
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("Decode rtp error:", zap.Error(err))
|
||||
}
|
||||
t := time.Now()
|
||||
if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
|
||||
publisher.writeDump(ps, dumpLen)
|
||||
publisher.PushPS(&rtpPacket)
|
||||
}
|
||||
x := time.Since(t)
|
||||
if x > time.Millisecond {
|
||||
fmt.Println(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// func queryCatalog(config *transaction.Config) {
|
||||
// t := time.NewTicker(time.Duration(config.CatalogInterval) * time.Second)
|
||||
// for range t.C {
|
||||
|
||||
Reference in New Issue
Block a user