Files
gb28181/pkg/gbs/server.go
2025-03-09 17:18:10 +08:00

211 lines
5.2 KiB
Go

package gbs
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs/m"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goweb/pkg/conc"
"github.com/ixugo/goweb/pkg/system"
)
type MemoryStorer interface {
LoadDeviceToMemory(conn sip.Connection) // 加载设备到内存
RangeDevices(fn func(key string, value *Device) bool) // 遍历设备
Change(deviceID string, changeFn func(*gb28181.Device), changeFn2 func(*Device)) error // 登出设备
Load(deviceID string) (*Device, bool)
Store(deviceID string, value *Device)
GetChannel(deviceID, channelID string) (*Channel, bool)
// Change(deviceID string, changeFn func(*gb28181.Device)) // 修改设备
}
type Server struct {
*sip.Server
gb *GB28181API
mediaService sms.Core
fromAddress *sip.Address
memoryStorer MemoryStorer
}
func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181, sc sms.Core) (*Server, func()) {
api := NewGB28181API(cfg, store, sc.NodeManager)
ip := system.LocalIP()
uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s:%d", cfg.Sip.ID, ip, cfg.Sip.Port))
from := sip.Address{
DisplayName: sip.String{Str: "gowvp"},
URI: &uri,
Params: sip.NewParams(),
}
svr = sip.NewServer(&from)
svr.Register(api.handlerRegister)
msg := svr.Message()
msg.Handle("Keepalive", api.sipMessageKeepalive)
msg.Handle("Catalog", api.sipMessageCatalog)
msg.Handle("DeviceInfo", api.sipMessageDeviceInfo)
// msg.Handle("RecordInfo", api.handlerMessage)
c := Server{
Server: svr,
mediaService: sc,
fromAddress: &from,
gb: api,
memoryStorer: store.Store().(MemoryStorer),
}
api.svr = &c
go svr.ListenUDPServer(fmt.Sprintf(":%d", cfg.Sip.Port))
go svr.ListenTCPServer(fmt.Sprintf(":%d", cfg.Sip.Port))
go c.startTickerCheck()
// 等待 UDP 连接
for {
time.Sleep(50 * time.Millisecond)
if svr.UDPConn() != nil {
c.memoryStorer.LoadDeviceToMemory(svr.UDPConn())
break
}
}
return &c, c.Close
}
// startTickerCheck 定时检查离线
func (s *Server) startTickerCheck() {
conc.Timer(context.Background(), 60*time.Second, time.Second, func() {
now := time.Now()
s.memoryStorer.RangeDevices(func(key string, value *Device) bool {
if !value.IsOnline {
return true
}
if sub := now.Sub(value.LastKeepaliveAt); sub >= 3*60*time.Second || value.conn == nil {
s.gb.logout(key, func(d *gb28181.Device) {
d.IsOnline = false
})
}
return true
})
})
}
// MODDEBUG MODDEBUG
var MODDEBUG = "DEBUG"
// ActiveDevices 记录当前活跃设备,请求播放时设备必须处于活跃状态
type ActiveDevices struct {
sync.Map
}
// Get Get
func (a *ActiveDevices) Get(key string) (Devices, bool) {
if v, ok := a.Load(key); ok {
return v.(Devices), ok
}
return Devices{}, false
}
var _activeDevices ActiveDevices
// 系统运行信息
var (
_sysinfo *m.SysInfo
config *m.Config
)
func LoadSYSInfo() {
config = m.MConfig
_activeDevices = ActiveDevices{sync.Map{}}
StreamList = streamsList{&sync.Map{}, &sync.Map{}, 0}
ssrcLock = &sync.Mutex{}
_recordList = &sync.Map{}
RecordList = apiRecordList{items: map[string]*apiRecordItem{}, l: sync.RWMutex{}}
// init sysinfo
// _sysinfo = &m.SysInfo{}
// if err := db.Get(db.DBClient, _sysinfo); err != nil {
// if db.RecordNotFound(err) {
// // 初始不存在
// _sysinfo = m.DefaultInfo()
// if err = db.Create(db.DBClient, _sysinfo); err != nil {
// // logrus.Fatalf("1 init sysinfo err:%v", err)
// }
// } else {
// // logrus.Fatalf("2 init sysinfo err:%v", err)
// }
// }
m.MConfig.GB28181 = _sysinfo
// uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s", _sysinfo.LID, _sysinfo.Region))
_serverDevices = Devices{
DeviceID: _sysinfo.LID,
// Region: _sysinfo.Region,
addr: &sip.Address{
DisplayName: sip.String{Str: "sipserver"},
// URI: &uri,
Params: sip.NewParams(),
},
}
// init media
url, err := url.Parse(config.Media.RTP)
if err != nil {
// logrus.Fatalf("media rtp url error,url:%s,err:%v", config.Media.RTP, err)
}
ipaddr, err := net.ResolveIPAddr("ip", url.Hostname())
if err != nil {
// logrus.Fatalf("media rtp url error,url:%s,err:%v", config.Media.RTP, err)
}
_sysinfo.MediaServerRtpIP = ipaddr.IP
_sysinfo.MediaServerRtpPort, _ = strconv.Atoi(url.Port())
}
// zlm接收到的ssrc为16进制。发起请求的ssrc为10进制
func ssrc2stream(ssrc string) string {
if ssrc[0:1] == "0" {
ssrc = ssrc[1:]
}
num, _ := strconv.Atoi(ssrc)
return fmt.Sprintf("%08X", num)
}
func sipResponse(tx *sip.Transaction) (*sip.Response, error) {
response := tx.GetResponse()
if response == nil {
return nil, sip.NewError(nil, "response timeout", "tx key:", tx.Key())
}
if response.StatusCode() != http.StatusOK {
return response, sip.NewError(nil, "device: ", response.StatusCode(), " ", response.Reason())
}
return response, nil
}
// QueryCatalog 查询 catalog
func (s *Server) QueryCatalog(deviceID string) error {
return s.gb.QueryCatalog(deviceID)
}
func (s *Server) Play(in *PlayInput) error {
return s.gb.Play(in)
}
func (s *Server) StopPlay(in *StopPlayInput) error {
return s.gb.StopPlay(in)
}