diff --git a/plugin/gb28181pro/api.go b/plugin/gb28181pro/api.go index d30e54a..9907802 100644 --- a/plugin/gb28181pro/api.go +++ b/plugin/gb28181pro/api.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os" + "sort" "strings" "sync" "time" @@ -463,7 +464,7 @@ func (gb *GB28181ProPlugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceRe // 初始化 SIP 客户端 d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.LocalIP)) if d.client != nil { - d.dialogClient = sipgo.NewDialogClient(d.client, d.contactHDR) + d.dialogClient = sipgo.NewDialogClientCache(d.client, d.contactHDR) } else { return resp, fmt.Errorf("failed to create sip client") } @@ -1129,6 +1130,11 @@ func (gb *GB28181ProPlugin) QueryRecord(ctx context.Context, req *pb.QueryRecord resp.Code = 0 resp.Message = fmt.Sprintf("success, received %d/%d records", recordReq.ReceivedNum, recordReq.SumNum) + // 排序录像列表,按StartTime升序排序 + sort.Slice(resp.Records, func(i, j int) bool { + return resp.Records[i].StartTime < resp.Records[j].StartTime + }) + // 清理请求 channel.RecordReqs.Remove(recordReq) @@ -1219,7 +1225,7 @@ func (gb *GB28181ProPlugin) TestSip(ctx context.Context, req *pb.TestSipRequest) resp.Message = "failed to create sip client" return resp, nil } - device.dialogClient = sipgo.NewDialogClient(device.client, device.contactHDR) + device.dialogClient = sipgo.NewDialogClientCache(device.client, device.contactHDR) // 构建目标URI recipient := sip.Uri{ diff --git a/plugin/gb28181pro/device.go b/plugin/gb28181pro/device.go index 7700cc2..12fc488 100644 --- a/plugin/gb28181pro/device.go +++ b/plugin/gb28181pro/device.go @@ -70,7 +70,7 @@ type Device struct { Longitude, Latitude string // 经度,纬度 eventChan chan any client *sipgo.Client - dialogClient *sipgo.DialogClient + dialogClient *sipgo.DialogClientCache contactHDR sip.ContactHeader fromHDR sip.FromHeader toHDR sip.ToHeader diff --git a/plugin/gb28181pro/index.go b/plugin/gb28181pro/index.go index 4790ae6..bd268df 100644 --- a/plugin/gb28181pro/index.go +++ b/plugin/gb28181pro/index.go @@ -44,7 +44,7 @@ type GB28181ProPlugin struct { m7s.Plugin AutoInvite bool `default:"true" desc:"自动邀请"` Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001 - Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 + Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 Username string Password string Sip SipConfig @@ -75,8 +75,8 @@ func (gb *GB28181ProPlugin) OnInit() (err error) { // Creating client handle for ua if len(gb.Sip.ListenAddr) > 0 { gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua - gb.server.OnRegister(gb.OnRegister) gb.server.OnMessage(gb.OnMessage) + gb.server.OnRegister(gb.OnRegister) gb.server.OnBye(gb.OnBye) gb.devices.L = new(sync.RWMutex) gb.server.OnInvite(gb.OnInvite) @@ -120,6 +120,11 @@ func (gb *GB28181ProPlugin) OnInit() (err error) { gb.Error("检查设备过期状态失败", "error", err) } + // 初始化数据库中的设备 + if err := gb.checkDevices(); err != nil { + gb.Error("检查设备有效性失败", "error", err) + } + // 检查并初始化平台 gb.checkPlatform() } @@ -133,6 +138,158 @@ func (gb *GB28181ProPlugin) OnInit() (err error) { return } +// InitDevicesFromDB 从数据库中加载并初始化在线设备 +func (gb *GB28181ProPlugin) checkDevices() error { + // 检查数据库是否已初始化 + if gb.DB == nil { + gb.Warn("InitDevicesFromDB", "warning", "数据库未初始化") + return nil + } + + // 查询所有有效设备:在线且注册未过期 + var devices []*Device + now := time.Now() + + // 先查询所有在线设备 + if err := gb.DB.Where("online = ?", true).Find(&devices).Error; err != nil { + gb.Error("InitDevicesFromDB", "error", err.Error()) + return err + } + + // 过滤出未过期的设备 + validDevices := make([]*Device, 0, len(devices)) + for _, d := range devices { + expireTime := d.RegisterTime.Add(time.Duration(d.Expires) * time.Second) + if !now.After(expireTime) { + validDevices = append(validDevices, d) + } else { + gb.Debug("InitDevicesFromDB", "跳过过期设备", d.DeviceID, "注册时间", d.RegisterTime, "过期时间", expireTime) + } + } + + gb.Info("InitDevicesFromDB", "找到有效设备数量", len(validDevices), "总在线设备数量", len(devices)) + + // 初始化每个设备 + for _, device := range validDevices { + d := device // 创建副本以避免循环变量问题 + + // 设置设备基本属性 + d.Status = DeviceRecoverStatus + + // 设置事件通道 + d.eventChan = make(chan any, 10) + + // 设置Logger + d.Logger = gb.With("deviceid", d.DeviceID) + + // 初始化通道集合 + d.channels.L = new(sync.RWMutex) + + // 设置plugin引用 + d.plugin = gb + + // 配置SIP相关参数 + host := d.LocalIP + if host == "" { + host = myip.InternalIPv4() + d.LocalIP = host + d.mediaIp = host + } + + // 获取公网或内网IP配置 + if !net.ParseIP(d.IP).IsPrivate() { + host = gb.GetPublicIP(host) + } + + // 设置联系人头信息 + d.contactHDR = sip.ContactHeader{ + Address: sip.Uri{ + User: gb.Serial, + Host: host, + Port: d.LocalPort, + }, + } + + // 设置来源头信息 + d.fromHDR = sip.FromHeader{ + Address: sip.Uri{ + User: gb.Serial, + Host: gb.Realm, + }, + Params: sip.NewParams(), + } + d.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) + + // 设置接收者 + d.Recipient = sip.Uri{ + Host: d.IP, + Port: d.Port, + User: d.DeviceID, + } + + // 创建SIP客户端 + d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(host), sipgo.WithClientPort(d.LocalPort)) + d.dialogClient = sipgo.NewDialogClientCache(d.client, d.contactHDR) + + // 设置设备ID的hash值作为任务ID + var hash uint32 + for i := 0; i < len(d.DeviceID); i++ { + ch := d.DeviceID[i] + hash = hash*31 + uint32(ch) + } + d.Task.ID = hash + + // 设置启动和销毁回调 + d.OnStart(func() { + gb.devices.Add(d) + d.channels.OnAdd(func(c *Channel) { + if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool { + return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID) + }); ok { + c.AbstractDevice = absDevice + absDevice.Handler = c + absDevice.ChangeStatus(m7s.PullProxyStatusOnline) + } + if gb.AutoInvite { + gb.Pull(fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID), config.Pull{ + MaxRetry: 0, + URL: fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID), + }, nil) + } + }) + }) + d.OnDispose(func() { + d.Status = DeviceOfflineStatus + if gb.devices.RemoveByKey(d.DeviceID) { + for c := range d.channels.Range { + if c.AbstractDevice != nil { + c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline) + } + } + } + }) + + // 添加设备任务 + gb.AddTask(d) + expireTime := d.RegisterTime.Add(time.Duration(d.Expires) * time.Second) + gb.Info("InitDevicesFromDB", "已初始化设备", d.DeviceID, "注册时间", d.RegisterTime, "过期时间", expireTime) + + // 加载设备的通道 + var channels []gb28181.DeviceChannel + if err := gb.DB.Where(&gb28181.DeviceChannel{DeviceDBID: d.ID}).Find(&channels).Error; err != nil { + gb.Error("InitDevicesFromDB", "加载通道失败", d.DeviceID, "error", err.Error()) + } else { + // 初始化设备通道 + for _, channel := range channels { + d.addOrUpdateChannel(channel) + } + gb.Info("InitDevicesFromDB", "已加载通道数量", len(channels), "设备ID", d.DeviceID) + } + } + + return nil +} + // checkPlatform 从数据库中查找启用状态的平台,初始化它们,并进行注册和定时任务设置 func (gb *GB28181ProPlugin) checkPlatform() { // 检查数据库是否初始化 @@ -362,6 +519,17 @@ func (gb *GB28181ProPlugin) OnRegister(req *sip.Request, tx sip.ServerTransactio } func (gb *GB28181ProPlugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { + // 解析消息内容 + temp := &gb28181.Message{} + err := gb28181.DecodeXML(temp, req.Body()) + if err != nil { + gb.Error("OnMessage", "error", err.Error()) + response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Bad Request", nil) + if err := tx.Respond(response); err != nil { + gb.Error("respond BadRequest", "error", err.Error()) + } + return + } from := req.From() if from == nil || from.Address.User == "" { gb.Error("OnMessage", "error", "no user") @@ -406,18 +574,6 @@ func (gb *GB28181ProPlugin) OnMessage(req *sip.Request, tx sip.ServerTransaction return } - // 解析消息内容 - temp := &gb28181.Message{} - err := gb28181.DecodeXML(temp, req.Body()) - if err != nil { - gb.Error("OnMessage", "error", err.Error()) - response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Bad Request", nil) - if err := tx.Respond(response); err != nil { - gb.Error("respond BadRequest", "error", err.Error()) - } - return - } - // 根据来源调用不同的处理方法 if d != nil { d.UpdateTime = time.Now() @@ -500,7 +656,6 @@ func (gb *GB28181ProPlugin) RecoverDevice(d *Device, req *sip.Request) { } func (gb *GB28181ProPlugin) StoreDevice(deviceid string, req *sip.Request) (d *Device) { - from := req.From() source := req.Source() desc := req.Destination() servIp, sPortStr, _ := net.SplitHostPort(desc) @@ -561,7 +716,7 @@ func (gb *GB28181ProPlugin) StoreDevice(deviceid string, req *sip.Request) (d *D Recipient: sip.Uri{ Host: hostname, Port: port, - User: from.Address.User, + User: deviceid, }, contactHDR: sip.ContactHeader{ Address: sip.Uri{ @@ -585,7 +740,7 @@ func (gb *GB28181ProPlugin) StoreDevice(deviceid string, req *sip.Request) (d *D d.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(host), sipgo.WithClientPort(serverPort)) gb.Info("get serverport is ", serverPort) - d.dialogClient = sipgo.NewDialogClient(d.client, d.contactHDR) + d.dialogClient = sipgo.NewDialogClientCache(d.client, d.contactHDR) d.channels.L = new(sync.RWMutex) d.Info("StoreDevice", "source", source, "desc", desc, "servIp", servIp, "publicIP", host, "recipient", req.Recipient) diff --git a/plugin/gb28181pro/platform.go b/plugin/gb28181pro/platform.go index 564a6ae..eeb43f8 100644 --- a/plugin/gb28181pro/platform.go +++ b/plugin/gb28181pro/platform.go @@ -23,12 +23,12 @@ type Platform struct { PlatformModel *gb28181.PlatformModel // SIP相关字段,不存储到数据库 - Client *sipgo.Client `gorm:"-" json:"-"` // SIP客户端 - DialogClient *sipgo.DialogClient `gorm:"-" json:"-"` // SIP对话客户端 - Recipient sip.Uri `gorm:"-" json:"-"` // 接收者地址 - ContactHDR *sip.ContactHeader `gorm:"-" json:"-"` // 联系人头部 - UserAgentHDR sip.Header `gorm:"-" json:"-"` - MaxForwardsHDR sip.MaxForwardsHeader `gorm:"-" json:"-"` + Client *sipgo.Client `gorm:"-" json:"-"` // SIP客户端 + DialogClient *sipgo.DialogClientCache `gorm:"-" json:"-"` // SIP对话客户端 + Recipient sip.Uri `gorm:"-" json:"-"` // 接收者地址 + ContactHDR *sip.ContactHeader `gorm:"-" json:"-"` // 联系人头部 + UserAgentHDR sip.Header `gorm:"-" json:"-"` + MaxForwardsHDR sip.MaxForwardsHeader `gorm:"-" json:"-"` // 运行时字段 KeepAliveReply int `gorm:"-" json:"keepAliveReply"` // KeepAliveReply表示心跳未回复次数 @@ -75,7 +75,7 @@ func NewPlatform(pm *gb28181.PlatformModel, plugin *GB28181ProPlugin) *Platform p.ContactHDR = &contactHdr // 创建对话客户端 - p.DialogClient = sipgo.NewDialogClient(p.Client, *p.ContactHDR) + p.DialogClient = sipgo.NewDialogClientCache(p.Client, *p.ContactHDR) p.MaxForwardsHDR = sip.MaxForwardsHeader(70) p.plugin.platforms.Set(p) @@ -563,7 +563,7 @@ func (p *Platform) buildChannelItem(channel gb28181.CommonGBChannel) string { channel.GbRegisterWay, // 直接使用整数值 channel.GbSecrecy, // 直接使用整数值 parentID, - channel.GbParental, // 直接使用整数值 + channel.GbParental, // 直接使用整数值 channel.GbSafetyWay) // 直接使用整数值 }