package stack import ( "context" "errors" "fmt" "gb-cms/common" "gb-cms/dao" "gb-cms/log" "github.com/ghettovoice/gosip/sip" "net" "net/http" "strconv" "strings" "time" ) const ( CatalogFormat = "\r\n" + "\r\n" + "Catalog\r\n" + "" + "%d" + "\r\n" + "" + "%s" + "\r\n" + "\r\n" DeviceInfoFormat = "\r\n" + "\r\n" + "DeviceInfo\r\n" + "" + "%d" + "\r\n" + "" + "%s" + "\r\n" + "\r\n" ) var ( XmlMessageType sip.ContentType = "Application/MANSCDP+xml" SDPMessageType sip.ContentType = "application/sdp" RTSPMessageType sip.ContentType = "application/RTSP" ) type GBDevice interface { GetID() string // QueryDeviceInfo 发送查询设备信息命令 QueryDeviceInfo() // QueryCatalog 发送查询目录命令 QueryCatalog(timeout int) ([]*dao.ChannelModel, error) // QueryRecord 发送查询录像命令 QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error //Invite(channel string, setup string) // OnInvite 语音广播 OnInvite(request sip.Request, user string) sip.Response // OnBye 设备侧主动挂断 OnBye(request sip.Request) // //OnNotifyCatalog() // //OnNotifyAlarm() SubscribePosition(channelId string) error //SubscribeCatalog() // //SubscribeAlarm() Broadcast(sourceId, channelId string) sip.ClientTransaction // UpdateChannel 订阅目录,通道发生改变 // 附录P.4.2.2 // @Params event ON-上线/OFF-离线/VLOST-视频丢失/DEFECT-故障/ADD-增加/DEL-删除/UPDATE-更新 UpdateChannel(id string, event string) Close() } type CatalogProgress struct { TotalSize int RecvSize int } type Device struct { *dao.DeviceModel } func (d *Device) BuildMessageRequest(to, body string) sip.Request { request, err := BuildMessageRequest(common.Config.SipID, net.JoinHostPort(GlobalContactAddress.Uri.Host(), GlobalContactAddress.Uri.Port().String()), to, net.JoinHostPort(d.RemoteIP, strconv.Itoa(d.RemotePort)), d.Transport, body) if err != nil { panic(err) } return request } func (d *Device) QueryDeviceInfo() { body := fmt.Sprintf(DeviceInfoFormat, GetSN(), d.DeviceID) request := d.BuildMessageRequest(d.DeviceID, body) common.SipStack.SendRequest(request) } func (d *Device) QueryCatalog(timeoutSeconds int) ([]*dao.ChannelModel, error) { catalogProgress := &CatalogProgress{} var timeoutCtx context.Context var timeoutCancelFunc context.CancelFunc if timeoutSeconds > 0 { timeoutCtx, timeoutCancelFunc = context.WithTimeout(context.Background(), time.Duration(timeoutSeconds)*time.Second) } var err error var result []*dao.ChannelModel query := func() { defer func() { if timeoutCancelFunc != nil { timeoutCancelFunc() } }() // 下发查询指令 finish := make(chan byte, 1) sn := GetSN() body := fmt.Sprintf(CatalogFormat, sn, d.DeviceID) request := d.BuildMessageRequest(d.DeviceID, body) tx := common.SipStack.SendRequest(request) // 异步等待响应 go func() { response := <-tx.Responses() if response != nil && response.StatusCode() != http.StatusOK { err = fmt.Errorf("query catalog res[%d] %s", response.StatusCode(), StatusCode2Reason(int(response.StatusCode()))) finish <- 1 return } }() // 处理目录消息 lastTime := time.Now() var list []*CatalogResponse SNManager.AddEvent(sn, func(response interface{}) { lastTime = time.Now() catalog := response.(*CatalogResponse) catalogProgress.TotalSize = catalog.SumNum catalogProgress.RecvSize += catalog.DeviceList.Num list = append(list, catalog) if catalogProgress.RecvSize >= catalogProgress.TotalSize { finish <- 1 } }) // 定时检测是否超时或完成 timeout := 10 * time.Second ticker := time.NewTicker(timeout) for { var end bool select { case <-ticker.C: if time.Since(lastTime) > timeout { // 超时, 则直接返回 err = fmt.Errorf("query catalog timeout[%ds]", int(timeout.Seconds())) ticker.Stop() end = true break } case <-finish: ticker.Stop() end = true break } if end { break } } if err != nil { return } // 如果查询不完整, 并且数据库中通道列表不为空, 则丢弃本次查询的数据, 否则依旧入库 var oldChannelCount int oldChannelCount, err = dao.Channel.QueryChanelCount(d.DeviceID, true) if err != nil { log.Sugar.Errorf("query channel count failed, device: %s, err: %s", d.DeviceID, err.Error()) return } else if len(list) < 1 || (oldChannelCount > 0 && catalogProgress.RecvSize < catalogProgress.TotalSize) { log.Sugar.Errorf("query catalog failed, device: %s, count: %d, recvSize: %d, totalSize: %d", d.DeviceID, oldChannelCount, catalogProgress.RecvSize, catalogProgress.TotalSize) return } // 删除旧的通道列表 if oldChannelCount > 0 { err = dao.Channel.DeleteChannels(d.DeviceID) if err != nil { log.Sugar.Errorf("delete channels failed, device: %s, err: %s", d.DeviceID, err.Error()) return } } // 批量保存通道 result, err = d.SaveChannels(list) } if !UniqueTaskManager.Commit(GenerateCatalogTaskID(d.DeviceID), query, catalogProgress) { return nil, errors.New("device busy") } // web接口的查询超时 if timeoutCtx != nil { select { case <-timeoutCtx.Done(): if err == nil && catalogProgress.RecvSize < catalogProgress.TotalSize { err = fmt.Errorf("wait for catalog[%d/%d] timeout[%ds]", catalogProgress.RecvSize, catalogProgress.TotalSize, timeoutSeconds) } break } } return result, err } func IsDir(typeCode int) bool { return typeCode < 131 || typeCode > 199 } func (d *Device) SaveChannels(list []*CatalogResponse) ([]*dao.ChannelModel, error) { var channels []*dao.ChannelModel // 目录 dirs := make(map[string]*dao.ChannelModel) for _, response := range list { for _, channel := range response.DeviceList.Devices { // 状态转为大写 channel.Status = common.OnlineStatus(strings.ToUpper(channel.Status.String())) // 默认在线 if common.OFF != channel.Status { channel.Status = common.ON } // 下级设备的系统ID, 更新DeviceInfo if channel.DeviceID == d.DeviceID && dao.Device.ExistDevice(d.DeviceID) { _ = dao.Device.UpdateDeviceInfo(d.DeviceID, &dao.DeviceModel{ Manufacturer: channel.Manufacturer, Model: channel.Model, Name: channel.Name, }) } typeCode := GetTypeCode(channel.DeviceID) if typeCode == "" { log.Sugar.Errorf("保存通道时, 获取设备类型失败 device: %s", channel.DeviceID) } // 通道所属组, ParentID优先, 其次BusinessGroupID var groupId string if channel.ParentID != "" { layers := strings.Split(channel.ParentID, "/") groupId = layers[len(layers)-1] } else if channel.BusinessGroupID != "" { groupId = channel.BusinessGroupID } code, _ := strconv.Atoi(typeCode) channel.RootID = d.DeviceID channel.TypeCode = code channel.GroupID = groupId channels = append(channels, channel) dirs[channel.RootID+"/"+channel.DeviceID] = channel } } // 父通道不是目录, 归属到最近的目录或设备, 所有外围设备同级 for _, channel := range channels { for { parentChannel, ok := dirs[channel.RootID+"/"+channel.GroupID] if !ok { break } else if !IsDir(parentChannel.TypeCode) { channel.GroupID = parentChannel.GroupID } else { break } } } // 统计目录的子通道数量 for _, channel := range channels { if parentChannel, ok := dirs[channel.RootID+"/"+channel.GroupID]; ok && IsDir(parentChannel.TypeCode) { parentChannel.IsDir = true parentChannel.SubCount++ } } err := dao.Channel.SaveChannels(channels) if err != nil { log.Sugar.Errorf("save channels failed, device: %s, err: %s", d.DeviceID, err.Error()) return nil, err } return channels, nil } func (d *Device) QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error { body := fmt.Sprintf(QueryRecordFormat, sn, channelId, startTime, endTime, type_) request := d.BuildMessageRequest(channelId, body) common.SipStack.SendRequest(request) return nil } func (d *Device) OnBye(request sip.Request) { } func (d *Device) SubscribePosition(channelId string) error { if channelId == "" { channelId = d.DeviceID } //暂时不考虑级联 builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, channelId) body := fmt.Sprintf(MobilePositionMessageFormat, GetSN(), channelId, common.Config.MobilePositionInterval) expiresHeader := sip.Expires(common.Config.MobilePositionExpires) builder.SetExpires(&expiresHeader) builder.SetContentType(&XmlMessageType) builder.SetContact(GlobalContactAddress) builder.SetBody(body) request, err := builder.Build() if err != nil { return err } event := Event(EventPresence) request.AppendHeader(&event) response, err := common.SipStack.SendRequestWithTimeout(5, request) if err != nil { return err } if response.StatusCode() != 200 { return fmt.Errorf("err code %d", response.StatusCode()) } return nil } func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction { body := fmt.Sprintf(BroadcastFormat, GetSN(), sourceId, channelId) request := d.BuildMessageRequest(channelId, body) return common.SipStack.SendRequest(request) } func (d *Device) UpdateChannel(id string, event string) { } func (d *Device) BuildCatalogRequest() (sip.Request, error) { body := fmt.Sprintf(CatalogFormat, GetSN(), d.DeviceID) request := d.BuildMessageRequest(d.DeviceID, body) return request, nil } func (d *Device) NewSIPRequestBuilderWithTransport() *sip.RequestBuilder { builder := sip.NewRequestBuilder() hop := sip.ViaHop{ Transport: d.Transport, } builder.AddVia(&hop) return builder } func (d *Device) NewRequestBuilder(method sip.RequestMethod, fromUser, realm, toUser string) *sip.RequestBuilder { builder := d.NewSIPRequestBuilderWithTransport() builder.SetMethod(method) sipPort := sip.Port(d.RemotePort) requestUri := &sip.SipUri{ FUser: sip.String{Str: toUser}, FHost: d.RemoteIP, FPort: &sipPort, } builder.SetRecipient(requestUri) fromAddress := &sip.Address{ Uri: &sip.SipUri{ FUser: sip.String{Str: fromUser}, FHost: realm, }, } fromAddress.Params = sip.NewParams().Add("tag", sip.String{Str: GenerateTag()}) builder.SetFrom(fromAddress) builder.SetTo(&sip.Address{ Uri: requestUri, }) return builder } func (d *Device) BuildInviteRequest(sessionName, channelId, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string) (sip.Request, error) { builder := d.NewRequestBuilder(sip.INVITE, common.Config.SipID, common.Config.SipContactAddr, channelId) sdp := BuildSDP("video", common.Config.SipID, sessionName, ip, port, startTime, stopTime, setup, speed, ssrc, "96 PS/90000") builder.SetContentType(&SDPMessageType) builder.SetContact(GlobalContactAddress) builder.SetBody(sdp) request, err := builder.Build() if err != nil { return nil, err } var subjectHeader = Subject(channelId + ":" + d.DeviceID + "," + common.Config.SipID + ":" + ssrc) request.AppendHeader(subjectHeader) return request, err } func (d *Device) BuildLiveRequest(channelId, ip string, port uint16, setup string, ssrc string) (sip.Request, error) { return d.BuildInviteRequest("Play", channelId, ip, port, "0", "0", setup, 0, ssrc) } func (d *Device) BuildPlaybackRequest(channelId, ip string, port uint16, startTime, stopTime, setup string, ssrc string) (sip.Request, error) { return d.BuildInviteRequest("Playback", channelId, ip, port, startTime, stopTime, setup, 0, ssrc) } func (d *Device) BuildDownloadRequest(channelId, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string) (sip.Request, error) { return d.BuildInviteRequest("Download", channelId, ip, port, startTime, stopTime, setup, speed, ssrc) } func (d *Device) Close() { // 更新在数据库中的状态 d.Status = common.OFF _ = dao.Device.UpdateDeviceStatus(d.DeviceID, common.OFF) } // CreateDialogRequestFromAnswer 根据invite的应答创建Dialog请求 // 应答的to头域需携带tag func CreateDialogRequestFromAnswer(message sip.Response, uas bool, remoteAddr string) sip.Request { from, _ := message.From() to, _ := message.To() id, _ := message.CallID() requestLine := &sip.SipUri{} host, port, _ := net.SplitHostPort(remoteAddr) portInt, _ := strconv.Atoi(port) sipPort := sip.Port(portInt) requestLine.SetHost(host) requestLine.SetPort(&sipPort) seq, _ := message.CSeq() builder := NewSIPRequestBuilderWithTransport(message.Transport()) if uas { requestLine.SetUser(from.Address.User()) builder.SetFrom(sip.NewAddressFromToHeader(to)) builder.SetTo(sip.NewAddressFromFromHeader(from)) } else { requestLine.SetUser(to.Address.User()) builder.SetFrom(sip.NewAddressFromFromHeader(from)) builder.SetTo(sip.NewAddressFromToHeader(to)) } builder.SetCallID(id) builder.SetMethod(sip.BYE) builder.SetRecipient(requestLine) builder.SetSeqNo(uint(seq.SeqNo + 1)) request, err := builder.Build() if err != nil { panic(err) } return request } func (d *Device) CreateDialogRequestFromAnswer(message sip.Response, uas bool) sip.Request { return CreateDialogRequestFromAnswer(message, uas, net.JoinHostPort(d.RemoteIP, strconv.Itoa(d.RemotePort))) }