From 37fd121d11b461c2ded243f04898642aea20b3c9 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Thu, 11 May 2023 21:12:41 +0800 Subject: [PATCH] feat: change to use ps plugin --- channel.go | 152 ++++++++++++++++++------------- device.go | 65 ++++++-------- handle.go | 10 ++- inviteoption.go | 14 +-- main.go | 46 ++++++---- publisher.go | 235 ------------------------------------------------ restful.go | 50 ++--------- server.go | 121 ++----------------------- 8 files changed, 179 insertions(+), 514 deletions(-) delete mode 100644 publisher.go diff --git a/channel.go b/channel.go index 63c843a..71cc52f 100644 --- a/channel.go +++ b/channel.go @@ -5,27 +5,55 @@ import ( "net/http" "strconv" "strings" - "sync" "time" + "sync/atomic" + "github.com/ghettovoice/gosip/sip" "go.uber.org/zap" . "m7s.live/engine/v4" "m7s.live/plugin/gb28181/v4/utils" + "m7s.live/plugin/ps/v4" ) +type PullStream struct { + opt *InviteOptions + channel *Channel + inviteRes sip.Response +} + +func (p *PullStream) Bye() int { + res := p.inviteRes + bye := p.channel.CreateRequst(sip.BYE) + from, _ := res.From() + to, _ := res.To() + callId, _ := res.CallID() + bye.ReplaceHeaders(from.Name(), []sip.Header{from}) + bye.ReplaceHeaders(to.Name(), []sip.Header{to}) + bye.ReplaceHeaders(callId.Name(), []sip.Header{callId}) + resp, err := p.channel.device.SipRequestForResponse(bye) + if p.opt.IsLive() { + p.channel.status.Store(0) + // defer p.channel.TryAutoInvite(p.opt) + } + if p.opt.recyclePort != nil { + p.opt.recyclePort(p.opt.MediaPort) + } + if err != nil { + return ServerInternalError + } + return int(resp.StatusCode()) +} + type ChannelEx struct { - device *Device - RecordPublisher *GBPublisher `json:"-" yaml:"-"` - LivePublisher *GBPublisher - LiveSubSP string //实时子码流 + device *Device // 所属设备 + status atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放 + LiveSubSP string // 实时子码流,通过rtsp Records []*Record RecordStartTime string RecordEndTime string recordStartTime time.Time recordEndTime time.Time - liveInviteLock *sync.Mutex - tcpPortIndex uint16 GpsTime time.Time //gps时间 Longitude string //经度 Latitude string //纬度 @@ -203,18 +231,26 @@ f = v/a/编码格式/码率大小/采样率 f字段中视、音频参数段之间不需空格分割。 可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。 */ + func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) { if opt.IsLive() { - if !channel.liveInviteLock.TryLock() { + if !channel.status.CompareAndSwap(0, 1) { return 304, nil } defer func() { - if code != OK { - channel.liveInviteLock.Unlock() + if err != nil { + channel.status.Store(0) + if conf.InviteMode == 1 { + // 5秒后重试 + time.AfterFunc(time.Second*5, func() { + channel.Invite(opt) + }) + } + } else { + channel.status.Store(2) } }() } - channel.Bye(opt.IsLive()) d := channel.device streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID) s := "Play" @@ -223,37 +259,36 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) { s = "Playback" streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End) } + if opt.StreamPath != "" { + streamPath = opt.StreamPath + } if opt.dump == "" { opt.dump = conf.DumpPath } - publisher := &GBPublisher{ - InviteOptions: opt, - channel: channel, - } - publisher.DisableReorder = !conf.RtpReorder protocol := "" + networkType := "udp" + resuePort := true if conf.IsMediaNetworkTCP() { + networkType = "tcp" protocol = "TCP/" if conf.tcpPorts.Valid { - opt.MediaPort, err = publisher.ListenTCP() - if err != nil { - return ServerInternalError, err - } - } else if opt.MediaPort == 0 { - opt.MediaPort = conf.MediaPort + opt.MediaPort, err = conf.tcpPorts.GetPort() + opt.recyclePort = conf.tcpPorts.Recycle + resuePort = false } - publisher.DisableReorder = true } else { if conf.udpPorts.Valid { - opt.MediaPort, err = publisher.ListenUDP() - if err != nil { - code = ServerInternalError - return - } - } else if opt.MediaPort == 0 { - opt.MediaPort = conf.MediaPort + opt.MediaPort, err = conf.udpPorts.GetPort() + opt.recyclePort = conf.udpPorts.Recycle + resuePort = false } } + if err != nil { + return http.StatusInternalServerError, err + } + if opt.MediaPort == 0 { + opt.MediaPort = conf.MediaPort + } sdpInfo := []string{ "v=0", @@ -266,7 +301,6 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) { "a=recvonly", "a=rtpmap:96 PS/90000", "y=" + opt.ssrc, - "", } if conf.IsMediaNetworkTCP() { sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new") @@ -275,22 +309,22 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) { contentType := sip.ContentType("application/sdp") invite.AppendHeader(&contentType) - invite.SetBody(strings.Join(sdpInfo, "\r\n"), true) + invite.SetBody(strings.Join(sdpInfo, "\r\n")+"\r\n", true) subject := sip.GenericHeader{ HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial), } invite.AppendHeader(&subject) - publisher.inviteRes, err = d.SipRequestForResponse(invite) + inviteRes, err := d.SipRequestForResponse(invite) if err != nil { plugin.Error(fmt.Sprintf("SIP->Invite %s :%s invite error: %s", channel.DeviceID, invite.String(), err.Error())) return http.StatusInternalServerError, err } - code = int(publisher.inviteRes.StatusCode()) + code = int(inviteRes.StatusCode()) plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code)) if code == OK { - ds := strings.Split(publisher.inviteRes.Body(), "\r\n") + ds := strings.Split(inviteRes.Body(), "\r\n") for _, l := range ds { if ls := strings.Split(l, "="); len(ls) > 1 { if ls[0] == "y" && len(ls[1]) > 0 { @@ -303,46 +337,42 @@ func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) { } } } - // if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() { - // publisher.udpCache = utils.NewPqRtp() - // } - if err = plugin.Publish(streamPath, publisher); err != nil { - code = ServerInternalError - return + err = ps.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, resuePort) + if err == nil { + PullStreams.Store(streamPath, &PullStream{ + opt: opt, + channel: channel, + inviteRes: inviteRes, + }) + err = srv.Send(sip.NewAckRequest("", invite, inviteRes, "", nil)) } - ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil) - srv.Send(ack) - } else if channel.CanInvite() { - time.AfterFunc(time.Second*5, func() { - channel.TryAutoInvite() - }) } return } -func (channel *Channel) Bye(live bool) int { +func (channel *Channel) Bye(streamPath string) int { d := channel.device - streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID) - if s := Streams.Get(streamPath); s != nil { - s.Close() + if streamPath == "" { + streamPath = fmt.Sprintf("%s/%s", d.ID, channel.DeviceID) } - if live && channel.LivePublisher != nil { - return channel.LivePublisher.Bye() + if s, loaded := PullStreams.LoadAndDelete(streamPath); loaded { + s.(*PullStream).Bye() + if s := Streams.Get(streamPath); s != nil { + s.Close() + } + return http.StatusOK } - if !live && channel.RecordPublisher != nil { - return channel.RecordPublisher.Bye() - } - return 404 + return http.StatusNotFound } -func (channel *Channel) TryAutoInvite() { - if conf.AutoInvite && channel.CanInvite() { - go channel.Invite(&InviteOptions{}) +func (channel *Channel) TryAutoInvite(opt *InviteOptions) { + if conf.InviteMode == 1 && channel.CanInvite() { + go channel.Invite(opt) } } func (channel *Channel) CanInvite() bool { - if channel.LivePublisher != nil || len(channel.DeviceID) != 20 || channel.Status == "OFF" { + if channel.status.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == "OFF" { return false } diff --git a/device.go b/device.go index f7df038..ec93ded 100644 --- a/device.go +++ b/device.go @@ -10,8 +10,6 @@ import ( "sync" "time" - "golang.org/x/exp/maps" - "go.uber.org/zap" "m7s.live/engine/v4" "m7s.live/plugin/gb28181/v4/utils" @@ -62,8 +60,7 @@ type Device struct { sipIP string //设备对应网卡的服务器ip mediaIP string //设备对应网卡的服务器ip NetAddr string - ChannelMap map[string]*Channel - channelMutex sync.RWMutex + channelMap sync.Map subscriber struct { CallID string Timeout time.Time @@ -76,13 +73,18 @@ type Device struct { func (d *Device) MarshalJSON() ([]byte, error) { type Alias Device - return json.Marshal(&struct { + data := &struct { Channels []*Channel *Alias }{ - Channels: maps.Values(d.ChannelMap), - Alias: (*Alias)(d), + Alias: (*Alias)(d), + } + d.channelMap.Range(func(key, value interface{}) bool { + c := value.(*Channel) + data.Channels = append(data.Channels, c) + return true }) + return json.Marshal(data) } func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) { from, _ := req.From() @@ -116,9 +118,6 @@ func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) { d.mediaIP = mediaIp d.NetAddr = deviceIp d.UpdateTime = time.Now() - if d.ChannelMap == nil { - d.ChannelMap = make(map[string]*Channel) - } } func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) { @@ -164,7 +163,6 @@ func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) { sipIP: sipIP, mediaIP: mediaIp, NetAddr: deviceIp, - ChannelMap: make(map[string]*Channel), } Devices.Store(id, d) c.SaveDevices() @@ -200,35 +198,27 @@ func (c *GB28181Config) SaveDevices() { } func (d *Device) addOrUpdateChannel(channel *Channel) { - d.channelMutex.Lock() - defer d.channelMutex.Unlock() + if old, ok := d.channelMap.Load(channel.DeviceID); ok { + channel.ChannelEx = old.(*Channel).ChannelEx + } channel.device = d - if old, ok := d.ChannelMap[channel.DeviceID]; ok { - //复制锁指针 - channel.ChannelEx = old.ChannelEx - } - if channel.liveInviteLock == nil { - channel.liveInviteLock = &sync.Mutex{} - } - d.ChannelMap[channel.DeviceID] = channel + d.channelMap.Store(channel.DeviceID, channel) } func (d *Device) deleteChannel(DeviceID string) { - d.channelMutex.Lock() - defer d.channelMutex.Unlock() - delete(d.ChannelMap, DeviceID) + d.channelMap.Delete(DeviceID) } func (d *Device) CheckSubStream() { - d.channelMutex.Lock() - defer d.channelMutex.Unlock() - for _, c := range d.ChannelMap { + d.channelMap.Range(func(key, value any) bool { + c := value.(*Channel) if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil { c.LiveSubSP = s.Path } else { c.LiveSubSP = "" } - } + return true + }) } func (d *Device) UpdateChannels(list []*Channel) { @@ -265,7 +255,7 @@ func (d *Device) UpdateChannels(list []*Channel) { go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT)) } } - c.TryAutoInvite() + c.TryAutoInvite(&InviteOptions{}) if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil { c.LiveSubSP = s.Path } else { @@ -274,11 +264,11 @@ func (d *Device) UpdateChannels(list []*Channel) { } } func (d *Device) UpdateRecord(channelId string, list []*Record) { - d.channelMutex.RLock() - if c, ok := d.ChannelMap[channelId]; ok { + d.channelMap.Range(func(key, value any) bool { + c := value.(*Channel) c.Records = append(c.Records, list...) - } - d.channelMutex.RUnlock() + return true + }) } func (d *Device) CreateRequest(Method sip.RequestMethod) (req sip.Request) { @@ -455,7 +445,8 @@ func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, inter // UpdateChannelPosition 更新通道GPS坐标 func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) { - if c, ok := d.ChannelMap[channelId]; ok { + if v, ok := d.channelMap.Load(channelId); ok { + c := v.(*Channel) c.ChannelEx.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题 c.ChannelEx.Longitude = lng c.ChannelEx.Latitude = lat @@ -534,7 +525,8 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) { } func (d *Device) channelOnline(DeviceID string) { - if c, ok := d.ChannelMap[DeviceID]; ok { + if v, ok := d.channelMap.Load(DeviceID); ok { + c := v.(*Channel) c.Status = "ON" plugin.Sugar().Debugf("通道[%s]在线\n", c.Name) } else { @@ -543,7 +535,8 @@ func (d *Device) channelOnline(DeviceID string) { } func (d *Device) channelOffline(DeviceID string) { - if c, ok := d.ChannelMap[DeviceID]; ok { + if v, ok := d.channelMap.Load(DeviceID); ok { + c := v.(*Channel) c.Status = "OFF" plugin.Sugar().Debugf("通道[%s]离线\n", c.Name) } else { diff --git a/handle.go b/handle.go index c7275d3..37fa78d 100644 --- a/handle.go +++ b/handle.go @@ -185,12 +185,14 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) { case "Keepalive": d.LastKeepaliveAt = time.Now() //callID !="" 说明是订阅的事件类型信息 - if d.ChannelMap == nil || len(d.ChannelMap) == 0 { + if d.lastSyncTime.IsZero() { go d.syncChannels() } else { - for _, ch := range d.ChannelMap { - ch.TryAutoInvite() - } + d.channelMap.Range(func(key, value interface{}) bool { + channel := value.(*Channel) + channel.TryAutoInvite(&InviteOptions{}) + return true + }) } //为什么要查找子码流? //d.CheckSubStream() diff --git a/inviteoption.go b/inviteoption.go index db4fc2c..369eaf6 100644 --- a/inviteoption.go +++ b/inviteoption.go @@ -8,12 +8,14 @@ import ( ) type InviteOptions struct { - Start int - End int - dump string - ssrc string - SSRC uint32 - MediaPort uint16 + Start int + End int + dump string + ssrc string + SSRC uint32 + MediaPort uint16 + StreamPath string + recyclePort func(p uint16) (err error) } func (o InviteOptions) IsLive() bool { diff --git a/main.go b/main.go index ce7d2e8..8572ce3 100644 --- a/main.go +++ b/main.go @@ -4,11 +4,11 @@ import ( "fmt" "os" "strings" + "sync" "time" myip "github.com/husanpao/ip" . "m7s.live/engine/v4" - "m7s.live/engine/v4/config" "m7s.live/engine/v4/util" ) @@ -19,18 +19,19 @@ type GB28181PositionConfig struct { } type GB28181Config struct { - AutoInvite bool `default:"true"` + // AutoInvite bool `default:"true"` + InviteMode int `default:"1"` //邀请模式,0:手动拉流,1:预拉流,2:按需拉流 PreFetchRecord bool InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR ListenAddr string `default:"0.0.0.0"` //sip服务器的配置 - SipNetwork string `default:"udp"` //传输协议,默认UDP,可选TCP - SipIP string //sip 服务器公网IP - SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060 - Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001 - Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000 - Username string //sip 服务器账号 - Password string //sip 服务器密码 + SipNetwork string `default:"udp"` //传输协议,默认UDP,可选TCP + SipIP string //sip 服务器公网IP + SipPort uint16 `default:"5060"` //sip 服务器端口,默认 5060 + Serial string `default:"34020000002000000001"` //sip 服务器 id, 默认 34020000002000000001 + Realm string `default:"3402000000"` //sip 服务器域,默认 3402000000 + Username string //sip 服务器账号 + Password string //sip 服务器密码 Port struct { // 新配置方式 Sip string `default:"udp:5060"` Media string `default:"tcp:58200"` @@ -52,14 +53,15 @@ type GB28181Config struct { // WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流 RemoveBanInterval time.Duration `default:"600s"` //移除禁止设备间隔 // UdpCacheSize int //udp缓存大小 - LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic - routes map[string]string - DumpPath string //dump PS流本地文件路径 - RtpReorder bool `default:"true"` - config.Publish - Server + LogLevel string `default:"info"` //trace, debug, info, warn, error, fatal, panic + routes map[string]string + DumpPath string //dump PS流本地文件路径 + Ignores map[string]struct{} + tcpPorts PortManager + udpPorts PortManager Position GB28181PositionConfig //关于定位的配置参数 + } func (c *GB28181Config) initRoutes() { @@ -73,8 +75,9 @@ func (c *GB28181Config) initRoutes() { } plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes)) } + func (c *GB28181Config) OnEvent(event any) { - switch event.(type) { + switch e := event.(type) { case FirstConfig: if c.Port.Sip != "udp:5060" { protocol, ports := util.Conf2Listener(c.Port.Sip) @@ -95,6 +98,16 @@ func (c *GB28181Config) OnEvent(event any) { c.ReadDevices() go c.initRoutes() c.startServer() + case *Stream: + if c.InviteMode == 2 { + if channel := FindChannel(e.AppName, e.StreamName); channel != nil { + channel.TryAutoInvite(&InviteOptions{}) + } + } + case SEclose: + if v, ok := PullStreams.LoadAndDelete(e.Target.Path); ok { + go v.(*PullStream).Bye() + } } } @@ -105,3 +118,4 @@ func (c *GB28181Config) IsMediaNetworkTCP() bool { var conf GB28181Config var plugin = InstallPlugin(&conf) +var PullStreams sync.Map //拉流 diff --git a/publisher.go b/publisher.go deleted file mode 100644 index b9eae8b..0000000 --- a/publisher.go +++ /dev/null @@ -1,235 +0,0 @@ -package gb28181 - -import ( - "encoding/binary" - "fmt" - "io" - "net" - "os" - "path/filepath" - "time" - - "github.com/ghettovoice/gosip/sip" - "github.com/pion/rtp" - "go.uber.org/zap" - . "m7s.live/engine/v4" - "m7s.live/engine/v4/util" -) - -type GBPublisher struct { - PSPublisher - *InviteOptions - channel *Channel - inviteRes sip.Response - // udpCache *utils.PriorityQueueRtp - dumpFile *os.File - dumpPrint io.Writer - lastReceive time.Time -} - -func (p *GBPublisher) PrintDump(s string) { - if p.dumpPrint != nil { - p.dumpPrint.Write([]byte(s)) - } -} - -func (p *GBPublisher) OnEvent(event any) { - if p.channel == nil { - // p.parser.EsHandler = p - p.IO.OnEvent(event) - return - } - switch event.(type) { - case IPublisher: - if p.IsLive() { - p.Type = "GB28181 Live" - p.channel.LivePublisher = p - } else { - p.Type = "GB28181 Playback" - p.channel.RecordPublisher = p - } - // p.parser.EsHandler = p - conf.publishers.Add(p.SSRC, p) - if err := error(nil); p.dump != "" { - fp := filepath.Join(p.dump, p.Stream.Path) - os.MkdirAll(filepath.Dir(fp), 0766) - if p.dumpFile, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE, 0644); err != nil { - p.Error("open dump file failed", zap.Error(err)) - } - } - case SEwaitPublish: - //掉线自动重新拉流 - if p.IsLive() { - if p.channel.LivePublisher != nil { - p.channel.LivePublisher = nil - p.channel.liveInviteLock.Unlock() - } - go p.channel.Invite(&InviteOptions{}) - } - case SEclose, SEKick: - if p.IsLive() { - if p.channel.LivePublisher != nil { - p.channel.LivePublisher = nil - p.channel.liveInviteLock.Unlock() - } - } else { - p.channel.RecordPublisher = nil - } - conf.publishers.Delete(p.SSRC) - if p.dumpFile != nil { - p.dumpFile.Close() - } - go p.Bye() - } - p.Publisher.OnEvent(event) -} - -func (p *GBPublisher) Bye() int { - res := p.inviteRes - if res == nil { - return 404 - } - defer p.Stop() - p.inviteRes = nil - bye := p.channel.CreateRequst(sip.BYE) - from, _ := res.From() - to, _ := res.To() - callId, _ := res.CallID() - bye.ReplaceHeaders(from.Name(), []sip.Header{from}) - bye.ReplaceHeaders(to.Name(), []sip.Header{to}) - bye.ReplaceHeaders(callId.Name(), []sip.Header{callId}) - resp, err := p.channel.device.SipRequestForResponse(bye) - if err != nil { - p.Error("Bye", zap.Error(err)) - return ServerInternalError - } - return int(resp.StatusCode()) -} - -func (p *GBPublisher) Replay(f *os.File) (err error) { - var rtpPacket rtp.Packet - defer f.Close() - if p.dumpPrint != nil { - p.PrintDump(` - `) - p.PrintDump("