From 19ab406edd0200c3002b7179918092a3919dcaad Mon Sep 17 00:00:00 2001 From: ydajiang Date: Sat, 31 May 2025 21:10:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=811078=E8=BD=ACGB28181?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 194 +++++++++++++++++++++---------------- api_jt.go | 116 ++++++++++++++++++++++ broadcast.go | 81 ++-------------- client.go | 110 ++++++++++++++------- client_benchmark_test.go | 24 ++--- client_manager.go | 94 ++++++++++++++++++ config.go | 7 ++ config.json | 8 +- dao_channel.go | 58 +++++++++++ dao_jt.go | 118 ++++++++++++++++++++++ dao_platform.go | 62 ++++++------ db_sqlite.go | 7 +- device.go | 12 +-- dialogs.go | 2 +- hook/event.go | 50 ++++++++++ jt_device.go | 70 +++++++++++++ live.go | 16 +-- main.go | 15 ++- media_server.go | 135 ++++++++++++++++---------- message_factory.go | 19 +++- platform.go | 131 +++++++++---------------- platform_manager.go | 119 ----------------------- position.go | 2 +- recover.go | 48 +++++---- sink.go | 10 +- sink_manager.go | 48 ++++++++- sip_handler.go | 8 ++ sip_server.go | 171 +++++++++++++++++++------------- sip_client.go => sip_ua.go | 55 +++++------ stream.go | 15 ++- xml.go | 5 +- xml_record.go | 2 +- 32 files changed, 1174 insertions(+), 638 deletions(-) create mode 100644 api_jt.go create mode 100644 client_manager.go create mode 100644 dao_jt.go create mode 100644 hook/event.go create mode 100644 jt_device.go delete mode 100644 platform_manager.go rename sip_client.go => sip_ua.go (87%) diff --git a/api.go b/api.go index 30378b3..5983116 100644 --- a/api.go +++ b/api.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "gb-cms/hook" "github.com/ghettovoice/gosip/sip" "github.com/gorilla/mux" "github.com/gorilla/websocket" @@ -31,7 +32,7 @@ type InviteParams struct { type StreamParams struct { Stream StreamID `json:"stream"` // Source - Protocol string `json:"protocol"` // 推拉流协议 + Protocol int `json:"protocol"` // 推拉流协议 RemoteAddr string `json:"remote_addr"` // peer地址 } @@ -161,8 +162,8 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) // 云台控制 apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 级联设备列表 - apiServer.router.HandleFunc("/api/v1/platform/add", withJsonResponse(apiServer.OnPlatformAdd, &SIPUAParams{})) // 添加级联设备 - apiServer.router.HandleFunc("/api/v1/platform/remove", withJsonResponse(apiServer.OnPlatformRemove, &SIPUAParams{})) // 删除级联设备 + apiServer.router.HandleFunc("/api/v1/platform/add", withJsonResponse(apiServer.OnPlatformAdd, &PlatformModel{})) // 添加级联设备 + apiServer.router.HandleFunc("/api/v1/platform/remove", withJsonResponse(apiServer.OnPlatformRemove, &PlatformModel{})) // 删除级联设备 apiServer.router.HandleFunc("/api/v1/platform/channel/bind", withJsonResponse(apiServer.OnPlatformChannelBind, &PlatformChannel{})) // 级联绑定通道 apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", withJsonResponse(apiServer.OnPlatformChannelUnbind, &PlatformChannel{})) // 级联解绑通道 @@ -170,6 +171,14 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/broadcast/hangup", withJsonResponse(apiServer.OnHangup, &BroadcastParams{})) // 挂断广播会话 apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲 + apiServer.router.HandleFunc("/api/v1/jt/device/add", withJsonResponse(apiServer.OnVirtualDeviceAdd, &JTDeviceModel{})) + apiServer.router.HandleFunc("/api/v1/jt/device/edit", withJsonResponse(apiServer.OnVirtualDeviceEdit, &JTDeviceModel{})) + apiServer.router.HandleFunc("/api/v1/jt/device/remove", withJsonResponse(apiServer.OnVirtualDeviceRemove, &JTDeviceModel{})) + + apiServer.router.HandleFunc("/api/v1/jt/channel/add", withJsonResponse(apiServer.OnVirtualChannelAdd, &Channel{})) + apiServer.router.HandleFunc("/api/v1/jt/channel/edit", withJsonResponse(apiServer.OnVirtualChannelEdit, &Channel{})) + apiServer.router.HandleFunc("/api/v1/jt/channel/remove", withJsonResponse(apiServer.OnVirtualChannelRemove, &Channel{})) + http.Handle("/", apiServer.router) srv := &http.Server{ @@ -201,9 +210,13 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt //ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive"&"stream_type=playback"&"start_time=2024-06-18T15:20:56"&"end_time=2024-06-18T15:25:56 //ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive&stream_type=playback&start_time=2024-06-18T15:20:56&end_time=2024-06-18T15:25:56 + // 拉流地址携带的参数 + query := r.URL.Query() + jtSource := query.Get("forward_type") == "gateway_1078" + // 跳过非国标拉流 sourceStream := strings.Split(string(params.Stream), "/") - if len(sourceStream) != 2 || len(sourceStream[0]) != 20 || len(sourceStream[1]) < 20 { + if !jtSource && (len(sourceStream) != 2 || len(sourceStream[0]) != 20 || len(sourceStream[1]) < 20) { Sugar.Infof("跳过非国标拉流 stream: %s", params.Stream) return } @@ -211,6 +224,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt // 已经存在,累加计数 if stream, _ := StreamDao.QueryStream(params.Stream); stream != nil { stream.IncreaseSinkCount() + return } deviceId := sourceStream[0] @@ -219,35 +233,52 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt channelId = channelId[:20] } - // 发起invite的参数 - query := r.URL.Query() - inviteParams := &InviteParams{ - DeviceID: deviceId, - ChannelID: channelId, - StartTime: query.Get("start_time"), - EndTime: query.Get("end_time"), - Setup: strings.ToLower(query.Get("setup")), - Speed: query.Get("speed"), - streamId: params.Stream, - } - var code int - var stream *Stream - var err error - streamType := strings.ToLower(query.Get("stream_type")) - if "playback" == streamType { - code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r) - } else if "download" == streamType { - code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r) - } else { - code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r) - } + // 通知1078信令服务器 + if jtSource { + if len(sourceStream) != 2 { + code = http.StatusBadRequest + Sugar.Errorf("1078信令服务器转发请求参数错误") + return + } - if err != nil { - Sugar.Errorf("请求流失败 err: %s", err.Error()) - utils.Assert(http.StatusOK != code) - } else if http.StatusOK == code { - stream.IncreaseSinkCount() + simNumber := sourceStream[0] + channelNumber := sourceStream[1] + response, err := hook.PostOnInviteEvent(simNumber, channelNumber) + if err != nil { + code = http.StatusInternalServerError + Sugar.Errorf("通知1078信令服务器失败 err: %s sim number: %s channel number: %s", err.Error(), simNumber, channelNumber) + } else if code = response.StatusCode; code != http.StatusOK { + Sugar.Errorf("通知1078信令服务器失败. 响应状态码: %d sim number: %s channel number: %s", response.StatusCode, simNumber, channelNumber) + } + } else { + inviteParams := &InviteParams{ + DeviceID: deviceId, + ChannelID: channelId, + StartTime: query.Get("start_time"), + EndTime: query.Get("end_time"), + Setup: strings.ToLower(query.Get("setup")), + Speed: query.Get("speed"), + streamId: params.Stream, + } + + var stream *Stream + var err error + streamType := strings.ToLower(query.Get("stream_type")) + if "playback" == streamType { + code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r) + } else if "download" == streamType { + code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r) + } else { + code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r) + } + + if err != nil { + Sugar.Errorf("请求流失败 err: %s", err.Error()) + utils.Assert(http.StatusOK != code) + } else if http.StatusOK == code { + stream.IncreaseSinkCount() + } } w.WriteHeader(code) @@ -256,63 +287,50 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) { Sugar.Infof("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream) - //stream := StreamManager.Find(params.StreamID) - //if stream == nil { - // Sugar.Errorf("处理播放结束事件失败, stream不存在. id: %s", params.StreamID) - // return - //} - - //if 0 == stream.DecreaseSinkCount() && Config.AutoCloseOnIdle { - // CloseStream(params.StreamID, true) - //} - - if !strings.HasPrefix(params.Protocol, "gb") { - return - } - sink := RemoveForwardSink(params.Stream, params.Sink) if sink == nil { - Sugar.Errorf("处理转发结束事件失败, 找不到sink. stream: %s sink: %s", params.Stream, params.Sink) return } // 级联断开连接, 向上级发送Bye请求 - if params.Protocol == "gb_cascaded_forward" { + if params.Protocol == TransStreamGBCascaded { if platform := PlatformManager.Find(sink.ServerAddr); platform != nil { callID, _ := sink.Dialog.CallID() - platform.CloseStream(callID.Value(), true, false) + platform.(*Platform).CloseStream(callID.Value(), true, false) } - } else if params.Protocol == "gb_talk_forward" { - // 对讲设备断开连接 + } else { + sink.Close(true, false) } - - sink.Close(true, false) } func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *http.Request) { Sugar.Infof("推流事件. protocol: %s stream: %s", params.Protocol, params.Stream) - stream := Dialogs.Find(string(params.Stream)) + if SourceTypeRtmp == params.Protocol { + return + } + + stream := EarlyDialogs.Find(string(params.Stream)) if stream != nil { stream.Put(200) } // 对讲websocket已连接 // 创建stream - if "gb_talk" == params.Protocol { + if params.Protocol == SourceTypeGBTalk { Sugar.Infof("对讲websocket已连接, stream: %s", params.Stream) + } - s := &Stream{ - StreamID: params.Stream, - Protocol: params.Protocol, - } + s := &Stream{ + StreamID: params.Stream, + Protocol: params.Protocol, + } - _, ok := StreamDao.SaveStream(s) - if !ok { - Sugar.Errorf("处理推流事件失败, stream已存在. id: %s", params.Stream) - w.WriteHeader(http.StatusBadRequest) - return - } + _, ok := StreamDao.SaveStream(s) + if !ok { + Sugar.Errorf("处理推流事件失败, stream已存在. id: %s", params.Stream) + w.WriteHeader(http.StatusBadRequest) + return } } @@ -321,7 +339,7 @@ func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter, CloseStream(params.Stream, false) // 对讲websocket断开连接 - if "gb_talk" == params.Protocol { + if SourceTypeGBTalk == params.Protocol { } } @@ -330,7 +348,7 @@ func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, Sugar.Infof("推流空闲超时事件. protocol: %s stream: %s", params.Protocol, params.Stream) // 非rtmp空闲超时, 返回非200应答, 删除会话 - if params.Protocol != "rtmp" { + if SourceTypeRtmp != params.Protocol { w.WriteHeader(http.StatusForbidden) CloseStream(params.Stream, false) } @@ -340,7 +358,7 @@ func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWrit Sugar.Infof("收流超时事件. protocol: %s stream: %s", params.Protocol, params.Stream) // 非rtmp推流超时, 返回非200应答, 删除会话 - if params.Protocol != "rtmp" { + if SourceTypeRtmp != params.Protocol { w.WriteHeader(http.StatusForbidden) CloseStream(params.Stream, false) } @@ -576,7 +594,7 @@ func (api *ApiServer) OnSeekPlayback(v *SeekParams, w http.ResponseWriter, r *ht seekRequest.RemoveHeader(RtspMessageType.Name()) seekRequest.AppendHeader(&RtspMessageType) - SipUA.SendRequest(seekRequest) + SipStack.SendRequest(seekRequest) return nil, nil } @@ -605,7 +623,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r * defer func() { if !ok { if InviteSourceId != "" { - Dialogs.Remove(InviteSourceId) + EarlyDialogs.Remove(InviteSourceId) } if sinkStreamId != "" { @@ -642,7 +660,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r * sink := &Sink{ StreamID: v.StreamId, SinkStreamID: sinkStreamId, - Protocol: "gb_talk_forward", + Protocol: "gb_talk", CreateTime: time.Now().Unix(), SetupType: setupType, } @@ -651,7 +669,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r * if err := SinkDao.SaveForwardSink(v.StreamId, sink); err != nil { Sugar.Errorf("广播失败, 设备正在广播中. stream: %s", sinkStreamId) return nil, fmt.Errorf("设备正在广播中") - } else if _, ok = Dialogs.Add(InviteSourceId, streamWaiting); !ok { + } else if _, ok = EarlyDialogs.Add(InviteSourceId, streamWaiting); !ok { Sugar.Errorf("广播失败, id冲突. id: %s", InviteSourceId) return nil, fmt.Errorf("id冲突") } @@ -682,7 +700,8 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r * Sugar.Errorf("广播失败, 下级设备invite失败. stream: %s", sinkStreamId) return nil, fmt.Errorf("错误应答 code: %d", code) } else { - ok = AddForwardSink(v.StreamId, sink) + //ok = AddForwardSink(v.StreamId, sink) + ok = true } break case <-cancel.Done(): @@ -712,7 +731,7 @@ func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) { } } -func (api *ApiServer) OnPlatformAdd(v *SIPUAParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnPlatformAdd(v *PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { Sugar.Infof("添加级联设备 %v", *v) if v.Username == "" { @@ -731,27 +750,32 @@ func (api *ApiServer) OnPlatformAdd(v *SIPUAParams, w http.ResponseWriter, r *ht } v.Status = "OFF" - - platform, err := NewGBPlatform(v, SipUA) - if err == nil { - err = AddPlatform(platform) + platform, err := NewPlatform(&v.SIPUAOptions, SipStack) + if err != nil { + Sugar.Errorf("创建级联设备失败 err: %s", err.Error()) + return nil, err } - if err == nil { - platform.Start() + if !PlatformManager.Add(v.ServerAddr, platform) { + Sugar.Errorf("ua添加失败, id冲突. key: %s", v.ServerAddr) + return fmt.Errorf("ua添加失败, id冲突. key: %s", v.ServerAddr), nil + } else if err = PlatformDao.SavePlatform(v); err != nil { + PlatformManager.Remove(v.ServerAddr) + Sugar.Errorf("保存级联设备失败 err: %s", err.Error()) + return nil, err } + platform.Start() return nil, err } -func (api *ApiServer) OnPlatformRemove(v *SIPUAParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnPlatformRemove(v *PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { Sugar.Infof("删除级联设备 %v", *v) - platform, err := RemovePlatform(v.ServerAddr) + err := PlatformDao.DeleteUAByAddr(v.ServerAddr) if err != nil { - Sugar.Errorf("删除级联设备失败 err: %s", err.Error()) return nil, err - } else if platform != nil { + } else if platform := PlatformManager.Remove(v.ServerAddr); platform != nil { platform.Stop() } @@ -759,8 +783,8 @@ func (api *ApiServer) OnPlatformRemove(v *SIPUAParams, w http.ResponseWriter, r } func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) { - platforms := LoadPlatforms() - httpResponseOK(w, platforms) + //platforms := LoadPlatforms() + //httpResponseOK(w, platforms) } func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) { diff --git a/api_jt.go b/api_jt.go new file mode 100644 index 0000000..203b53e --- /dev/null +++ b/api_jt.go @@ -0,0 +1,116 @@ +package main + +import ( + "fmt" + "net/http" +) + +func (api *ApiServer) OnVirtualDeviceAdd(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + Sugar.Infof("add virtual device: %v", *device) + + if len(device.Username) != 20 { + Sugar.Errorf("invalid username: %s", device.Username) + return nil, fmt.Errorf("invalid username: %s", device.Username) + } else if len(device.SeverID) != 20 { + Sugar.Errorf("invalid server id: %s", device.SeverID) + return nil, fmt.Errorf("invalid server id: %s", device.SeverID) + } else if device.SimNumber == "" { + // sim卡号必选项 + Sugar.Errorf("sim number is required") + return nil, fmt.Errorf("sim number is required") + } + + if JTDeviceDao.ExistDevice(device.Username, device.SimNumber) { + // 用户名或sim卡号已存在 + Sugar.Errorf("username or sim number already exists") + return nil, fmt.Errorf("username or sim number already exists") + } else if DeviceDao.ExistDevice(device.Username) { + // 用户名与下级设备冲突 + Sugar.Errorf("username already exists") + return nil, fmt.Errorf("username already exists") + } + + jtDevice, err := NewJTDevice(device, SipStack) + if err != nil { + Sugar.Errorf("create virtual device failed: %s", err.Error()) + return nil, err + } + + if !JTDeviceManager.Add(device.Username, jtDevice) { + return nil, fmt.Errorf("ua添加失败, id冲突. key: %s", device.Username) + } else if err = JTDeviceDao.SaveDevice(device); err != nil { + JTDeviceManager.Remove(device.Username) + Sugar.Errorf("save device failed: %s", err.Error()) + return nil, err + } + + jtDevice.Start() + + if err != nil { + Sugar.Errorf("add jt device failed: %s", err.Error()) + return nil, err + } + + return nil, nil +} + +func (api *ApiServer) OnVirtualDeviceEdit(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + + return nil, nil +} + +func (api *ApiServer) OnVirtualDeviceRemove(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + err := JTDeviceDao.DeleteDevice(device.Username) + if err != nil { + return nil, err + } else if client := JTDeviceManager.Remove(device.Username); client != nil { + client.Stop() + } + + return nil, nil +} + +func (api *ApiServer) OnVirtualChannelAdd(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + Sugar.Infof("add virtual channel: %v", *channel) + + device, err := JTDeviceDao.QueryDevice(channel.RootID) + if err != nil { + Sugar.Errorf("query jt device failed: %s device: %s ", err.Error(), channel.RootID) + return nil, err + } + + if len(channel.DeviceID) != 20 { + Sugar.Errorf("invalid channel id: %s", channel.DeviceID) + return nil, fmt.Errorf("invalid channel id: %s", channel.DeviceID) + } + + channel.ParentID = device.Username + channel.RootID = device.Username + channel.GroupID = device.Username + err = ChannelDao.SaveJTChannel(channel) + if err != nil { + Sugar.Errorf("save channel failed: %s", err.Error()) + } + return nil, err +} + +func (api *ApiServer) OnVirtualChannelEdit(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + + return nil, nil +} + +func (api *ApiServer) OnVirtualChannelRemove(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + Sugar.Infof("remove virtual channel: %v", *channel) + + device, err := JTDeviceDao.QueryDevice(channel.RootID) + if err != nil { + Sugar.Errorf("query jt device failed: %s device: %s ", err.Error(), channel.RootID) + return nil, err + } + + err = ChannelDao.DeleteChannel(device.Username, channel.DeviceID) + if err != nil { + Sugar.Errorf("delete channel failed: %s", err.Error()) + } + return nil, err +} diff --git a/broadcast.go b/broadcast.go index b9c0114..cbb7811 100644 --- a/broadcast.go +++ b/broadcast.go @@ -2,12 +2,8 @@ package main import ( "fmt" - "gb-cms/sdp" "github.com/ghettovoice/gosip/sip" - "net" "net/http" - "strconv" - "strings" ) const ( @@ -18,107 +14,50 @@ const ( "%s\r\n" + "%s\r\n" + "\r\n" - - AnswerFormat = "v=0\r\n" + - "o=%s 0 0 IN IP4 %s\r\n" + - "s=Play\r\n" + - "c=IN IP4 %s\r\n" + - "t=0 0\r\n" + - "m=audio %d %s 8\r\n" + - "a=sendonly\r\n" + - "a=rtpmap:8 PCMA/8000\r\n" ) -func findSetup(descriptor *sdp.SDP) SetupType { - var tcp bool - if descriptor.Audio != nil { - tcp = strings.Contains(descriptor.Audio.Proto, "TCP") - } - - if !tcp && descriptor.Video != nil { - tcp = strings.Contains(descriptor.Video.Proto, "TCP") - } - - setup := SetupTypeUDP - if tcp { - for _, attr := range descriptor.Attrs { - if "setup" == attr[0] { - if SetupTypePassive.String() == attr[1] { - setup = SetupTypePassive - } else if SetupTypeActive.String() == attr[1] { - setup = SetupTypeActive - } - } - } - } - - return setup -} - func (d *Device) DoBroadcast(sourceId, channelId string) error { body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId) request := d.BuildMessageRequest(channelId, body) - SipUA.SendRequest(request) + SipStack.SendRequest(request) return nil } // OnInvite 语音广播 func (d *Device) OnInvite(request sip.Request, user string) sip.Response { - streamWaiting := Dialogs.Find(user) + // 会话是否存在 + streamWaiting := EarlyDialogs.Find(user) if streamWaiting == nil { return CreateResponseWithStatusCode(request, http.StatusBadRequest) } + // 解析offer sink := streamWaiting.data.(*Sink) body := request.Body() - offer, err := sdp.Parse(body) + offer, err := ParseGBSDP(body) if err != nil { Sugar.Infof("广播失败, 解析sdp发生err: %s sink: %s sdp: %s", err.Error(), sink.SinkID, body) streamWaiting.Put(http.StatusBadRequest) return CreateResponseWithStatusCode(request, http.StatusBadRequest) - } else if offer.Audio == nil { + } else if offer.media == nil { Sugar.Infof("广播失败, offer中缺少audio字段. sink: %s sdp: %s", sink.SinkID, body) streamWaiting.Put(http.StatusBadRequest) return CreateResponseWithStatusCode(request, http.StatusBadRequest) } - // 通知流媒体服务器创建answer - offerSetup := findSetup(offer) - answerSetup := sink.SetupType - finalSetup := offerSetup - if answerSetup != offerSetup { - finalSetup = answerSetup + // http接口中设置的setup优先级高于sdp中的setup + if offer.answerSetup != sink.SetupType { + offer.answerSetup = sink.SetupType } - addr := net.JoinHostPort(offer.Addr, strconv.Itoa(int(offer.Audio.Port))) - host, port, sinkId, err := CreateAnswer(string(sink.StreamID), addr, offerSetup.String(), answerSetup.String(), "", string(InviteTypeBroadcast)) + response, err := AddForwardSink(TransStreamGBTalk, request, user, sink, sink.StreamID, offer, InviteTypeBroadcast, "8 PCMA/8000") if err != nil { Sugar.Errorf("广播失败, 流媒体创建answer发生err: %s sink: %s ", err.Error(), sink.SinkID) streamWaiting.Put(http.StatusInternalServerError) return CreateResponseWithStatusCode(request, http.StatusInternalServerError) } - var answerSDP string - // UDP广播 - if SetupTypeUDP == finalSetup { - answerSDP = fmt.Sprintf(AnswerFormat, Config.SipID, host, host, port, "RTP/AVP") - } else { - // TCP广播 - answerSDP = fmt.Sprintf(AnswerFormat, Config.SipID, host, host, port, "TCP/RTP/AVP") - } - - // 创建answer和dialog - response := CreateResponseWithStatusCode(request, http.StatusOK) - setToTag(response) - - sink.SinkID = sinkId - sink.SetDialog(d.CreateDialogRequestFromAnswer(response, true)) - - response.SetBody(answerSDP, true) - response.AppendHeader(&SDPMessageType) - response.AppendHeader(GlobalContactAddress.AsContactHeader()) - streamWaiting.Put(http.StatusOK) return response } diff --git a/client.go b/client.go index 5646fb5..aefab1e 100644 --- a/client.go +++ b/client.go @@ -2,14 +2,22 @@ package main import ( "encoding/xml" + "fmt" "gb-cms/sdp" "github.com/ghettovoice/gosip/sip" "strconv" "strings" ) +const ( + DefaultDomainName = "本域" + DefaultManufacturer = "github/lkmio" + DefaultModel = "gb-cms" + DefaultFirmware = "dev" +) + type GBClient interface { - SipClient + SIPUA GBDevice @@ -22,19 +30,21 @@ type GBClient interface { OnQueryDeviceInfo(sn int) OnSubscribeCatalog(sn int) + + CloseStream(callId string, bye, ms bool) } -type Client struct { - *sipClient +type gbClient struct { + *sipUA Device deviceInfo *DeviceInfoResponse } -func (g *Client) OnQueryCatalog(sn int, channels []*Channel) { +func (g *gbClient) OnQueryCatalog(sn int, channels []*Channel) { response := CatalogResponse{} response.SN = sn response.CmdType = CmdCatalog - response.DeviceID = g.sipClient.Username + response.DeviceID = g.sipUA.Username response.SumNum = len(channels) if response.SumNum < 1 { @@ -48,60 +58,78 @@ func (g *Client) OnQueryCatalog(sn int, channels []*Channel) { response.DeviceList.Devices = nil response.DeviceList.Num = 1 // 一次发一个通道 response.DeviceList.Devices = append(response.DeviceList.Devices, &channel) - response.DeviceList.Devices[0].ParentID = g.sipClient.Username + response.DeviceList.Devices[0].ParentID = g.sipUA.Username g.SendMessage(&response) } } -func (g *Client) SendMessage(msg interface{}) { +func (g *gbClient) SendMessage(msg interface{}) { marshal, err := xml.MarshalIndent(msg, "", " ") if err != nil { panic(err) } - request, err := BuildMessageRequest(g.sipClient.Username, g.sipClient.ListenAddr, g.sipClient.SeverID, g.sipClient.ServerAddr, g.sipClient.Transport, string(marshal)) + request, err := BuildMessageRequest(g.sipUA.Username, g.sipUA.ListenAddr, g.sipUA.SeverID, g.sipUA.ServerAddr, g.sipUA.Transport, string(marshal)) if err != nil { panic(err) } - g.sipClient.ua.SendRequest(request) + g.sipUA.stack.SendRequest(request) } -func (g *Client) OnQueryDeviceInfo(sn int) { +func (g *gbClient) OnQueryDeviceInfo(sn int) { g.deviceInfo.SN = sn g.SendMessage(&g.deviceInfo) } -func (g *Client) OnInvite(request sip.Request, user string) sip.Response { +func (g *gbClient) OnInvite(request sip.Request, user string) sip.Response { return nil } -func (g *Client) SetDeviceInfo(name, manufacturer, model, firmware string) { +func (g *gbClient) SetDeviceInfo(name, manufacturer, model, firmware string) { g.deviceInfo.DeviceName = name g.deviceInfo.Manufacturer = manufacturer g.deviceInfo.Model = model g.deviceInfo.Firmware = firmware } -func (g *Client) OnSubscribeCatalog(sn int) { +func (g *gbClient) OnSubscribeCatalog(sn int) { } -func ParseGBSDP(body string) (offer *sdp.SDP, ssrc string, speed int, media *sdp.Media, offerSetup, answerSetup string, err error) { - offer, err = sdp.Parse(body) +func (g *gbClient) CloseStream(callId string, bye, ms bool) { + +} + +type GBSDP struct { + sdp *sdp.SDP + ssrc string + speed int + media *sdp.Media + mediaType string + offerSetup, answerSetup SetupType + startTime, stopTime string + connectionAddr string + isTcpTransport bool +} + +func ParseGBSDP(body string) (*GBSDP, error) { + offer, err := sdp.Parse(body) if err != nil { - return nil, "", 0, nil, "", "", err + return nil, err } + gbSdp := &GBSDP{sdp: offer} // 解析设置下载速度 var setup string for _, attr := range offer.Attrs { if "downloadspeed" == attr[0] { - speed, err = strconv.Atoi(attr[1]) + speed, err := strconv.Atoi(attr[1]) if err != nil { - return nil, "", 0, nil, "", "", err + return nil, err } + gbSdp.speed = speed } else if "setup" == attr[0] { setup = attr[1] } @@ -110,35 +138,51 @@ func ParseGBSDP(body string) (offer *sdp.SDP, ssrc string, speed int, media *sdp // 解析ssrc for _, attr := range offer.Other { if "y" == attr[0] { - ssrc = attr[1] + gbSdp.ssrc = attr[1] } } if offer.Video != nil { - media = offer.Video + gbSdp.media = offer.Video + gbSdp.mediaType = "video" } else if offer.Audio != nil { - media = offer.Audio + gbSdp.media = offer.Audio + gbSdp.mediaType = "audio" } - tcp := strings.HasPrefix(media.Proto, "TCP") + tcp := strings.HasPrefix(gbSdp.media.Proto, "TCP") if "passive" == setup && tcp { - offerSetup = "passive" - answerSetup = "active" + gbSdp.offerSetup = SetupTypePassive + gbSdp.answerSetup = SetupTypeActive } else if "active" == setup && tcp { - offerSetup = "active" - answerSetup = "passive" + gbSdp.offerSetup = SetupTypeActive + gbSdp.answerSetup = SetupTypePassive } - return + time := strings.Split(gbSdp.sdp.Time, " ") + if len(time) < 2 { + return nil, fmt.Errorf("sdp的时间范围格式错误 time: %s sdp: %s", gbSdp.sdp.Time, body) + } + + gbSdp.startTime = time[0] + gbSdp.stopTime = time[1] + gbSdp.isTcpTransport = tcp + gbSdp.connectionAddr = fmt.Sprintf("%s:%d", gbSdp.sdp.Addr, gbSdp.media.Port) + return gbSdp, nil } -func NewGBClient(params *SIPUAParams, ua SipServer) GBClient { - sip := &sipClient{ - SIPUAParams: *params, - ListenAddr: ua.ListenAddr(), - ua: ua, +func NewGBClient(params *SIPUAOptions, stack SipServer) GBClient { + ua := &sipUA{ + SIPUAOptions: *params, + ListenAddr: stack.ListenAddr(), + stack: stack, } - client := &Client{sip, Device{DeviceID: params.Username}, &DeviceInfoResponse{BaseResponse: BaseResponse{BaseMessage: BaseMessage{DeviceID: params.Username, CmdType: CmdDeviceInfo}, Result: "OK"}}} + // 心跳间隔最低10秒 + if ua.SIPUAOptions.KeepaliveInterval < 10 { + ua.SIPUAOptions.KeepaliveInterval = 10 + } + + client := &gbClient{ua, Device{DeviceID: params.Username}, &DeviceInfoResponse{BaseResponse: BaseResponse{BaseMessage: BaseMessage{DeviceID: params.Username, CmdType: CmdDeviceInfo}, Result: "OK"}}} return client } diff --git a/client_benchmark_test.go b/client_benchmark_test.go index d0abe5f..d932d22 100644 --- a/client_benchmark_test.go +++ b/client_benchmark_test.go @@ -100,8 +100,8 @@ package main // m.Close(true) //} // -//type VirtualDevice struct { -// *Client +//type Platform struct { +// *gbClient // streams map[string]*MediaStream // lock sync.Locker //} @@ -126,7 +126,7 @@ package main // } //} // -//func (v VirtualDevice) OnInvite(request sip.Request, user string) sip.Response { +//func (v Platform) OnInvite(request sip.Request, user string) sip.Response { // if len(rtpPackets) < 1 { // return CreateResponseWithStatusCode(request, http.StatusInternalServerError) // } @@ -150,10 +150,10 @@ package main // var ip string // var port sip.Port // var contactAddr string -// if v.sipClient.NatAddr != "" { -// contactAddr = v.sipClient.NatAddr +// if v.sipUA.NatAddr != "" { +// contactAddr = v.sipUA.NatAddr // } else { -// contactAddr = v.sipClient.ListenAddr +// contactAddr = v.sipUA.ListenAddr // } // // host, p, _ := net.SplitHostPort(contactAddr) @@ -180,7 +180,7 @@ package main // i, _ := strconv.Atoi(ssrc) // stream.ssrc = uint32(i) // stream.tcp = tcp -// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipClient.Domain) +// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipUA.Domain) // callId, _ := response.CallID() // // { @@ -203,7 +203,7 @@ package main // // if sendBye { // bye := CreateRequestFromDialog(stream.dialog, sip.BYE) -// v.sipClient.ua.SendRequest(bye) +// v.sipUA.stack.SendRequest(bye) // } // // stream.dialog = nil @@ -219,7 +219,7 @@ package main // stream.Start() // // // 绑定到StreamManager, bye请求才会找到设备回调 -// streamId := GenerateStreamID(InviteTypePlay, v.sipClient.Username, user, "", "") +// streamId := GenerateStreamID(InviteTypePlay, v.sipUA.Username, user, "", "") // s := StreamID{StreamID: streamId, Dialog: stream.dialog} // StreamManager.Add(&s) // @@ -228,7 +228,7 @@ package main // return response //} // -//func (v VirtualDevice) OnBye(request sip.Request) { +//func (v Platform) OnBye(request sip.Request) { // id, _ := request.CallID() // stream, ok := v.streams[id.Value()] // if !ok { @@ -245,7 +245,7 @@ package main // stream.Close(false) //} // -//func (v VirtualDevice) Offline() { +//func (v Platform) Offline() { // for _, stream := range v.streams { // stream.Close(true) // } @@ -333,7 +333,7 @@ package main // channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1) // client := NewGBClient(deviceId, clientConfig.ServerAddr, clientConfig.Domain, "UDP", clientConfig.Password, 500, 40, server) // -// device := VirtualDevice{client.(*Client), map[string]*MediaStream{}, &sync.Mutex{}} +// device := Platform{client.(*gbClient), map[string]*MediaStream{}, &sync.Mutex{}} // device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1") // // channel := &Channel{ diff --git a/client_manager.go b/client_manager.go new file mode 100644 index 0000000..d9dd10f --- /dev/null +++ b/client_manager.go @@ -0,0 +1,94 @@ +package main + +import ( + "sync" +) + +var ( + // PlatformManager 管理级联设备 + PlatformManager = &ClientManager{ + clients: make(map[string]GBClient, 8), // server addr->client + addrMap: make(map[string]int, 8), + } + + // JTDeviceManager 管理1078设备 + JTDeviceManager = &ClientManager{ + clients: make(map[string]GBClient, 8), // username->client + addrMap: make(map[string]int, 8), + } +) + +type ClientManager struct { + clients map[string]GBClient + addrMap map[string]int + lock sync.RWMutex +} + +func (p *ClientManager) Add(key string, client GBClient) bool { + p.lock.Lock() + defer p.lock.Unlock() + + if _, ok := p.clients[key]; ok { + return false + } + + p.clients[key] = client + p.addrMap[client.GetDomain()]++ + return true +} + +func (p *ClientManager) Find(key string) GBClient { + p.lock.RLock() + defer p.lock.RUnlock() + if client, ok := p.clients[key]; ok { + return client + } + return nil +} + +func (p *ClientManager) Remove(addr string) GBClient { + p.lock.Lock() + defer p.lock.Unlock() + + client, ok := p.clients[addr] + if !ok { + return nil + } + + p.addrMap[client.GetDomain()]++ + if p.addrMap[client.GetDomain()] < 1 { + delete(p.addrMap, client.GetDomain()) + } + + delete(p.clients, addr) + return client +} + +func (p *ClientManager) All() []GBClient { + p.lock.RLock() + defer p.lock.RUnlock() + + clients := make([]GBClient, 0, len(p.clients)) + for _, client := range p.clients { + clients = append(clients, client) + } + + return clients +} + +func (p *ClientManager) ExistClientByServerAddr(addr string) bool { + p.lock.RLock() + defer p.lock.RUnlock() + _, ok := p.addrMap[addr] + return ok +} + +func RemovePlatform(key string) (GBClient, error) { + err := PlatformDao.DeleteUAByAddr(key) + if err != nil { + return nil, err + } + + platform := PlatformManager.Remove(key) + return platform, nil +} diff --git a/config.go b/config.go index 8836b5a..b7eec6a 100644 --- a/config.go +++ b/config.go @@ -25,6 +25,13 @@ type Config_ struct { Addr string `json:"addr"` Password string `json:"password"` } + + Hooks struct { + Online string `json:"online"` + Offline string `json:"offline"` + Position string `json:"position"` + OnInvite string `json:"on_invite"` + } } type LogConfig struct { diff --git a/config.json b/config.json index fa4c2f0..978bc64 100644 --- a/config.json +++ b/config.json @@ -22,6 +22,12 @@ "offline": "", "?position" : "设备位置通知", - "position": "" + "position": "", + + "?on_invite": "被邀请, 用于通知1078信令服务器, 向设备下发推流指令", + "on_invite": "http://localhost:8081/api/v1/jt1078/on_invite", + + "?on_answer": "被查询录像,用于通知1078信令服务器", + "on_query_record": "" } } \ No newline at end of file diff --git a/dao_channel.go b/dao_channel.go index f96a397..49890ea 100644 --- a/dao_channel.go +++ b/dao_channel.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "gorm.io/gorm" ) @@ -13,11 +14,25 @@ type DaoChannel interface { QueryChannels(deviceId, groupId, string, page, size int) ([]*Channel, int, error) + QueryChannelsByRootID(rootId string) ([]*Channel, error) + + QueryChannelsByChannelID(channelId string) ([]*Channel, error) + QueryChanelCount(deviceId string) (int, error) QueryOnlineChanelCount(deviceId string) (int, error) QueryChannelByTypeCode(codecs ...int) ([]*Channel, error) + + ExistChannel(channelId string) bool + + SaveJTChannel(channel *Channel) error + + ExistJTChannel(simNumber string, channelNumber int) bool + + QueryJTChannelBySimNumber(simNumber string) (*Channel, error) + + DeleteChannel(deviceId string, channelId string) error } type daoChannel struct { @@ -68,6 +83,15 @@ func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int) ([] return channels, int(total), nil } +func (d *daoChannel) QueryChannelsByRootID(rootId string) ([]*Channel, error) { + var channels []*Channel + tx := db.Where("root_id =?", rootId).Find(&channels) + if tx.Error != nil { + return nil, tx.Error + } + return channels, nil +} + func (d *daoChannel) QueryChanelCount(deviceId string) (int, error) { var total int64 tx := db.Model(&Channel{}).Where("root_id =?", deviceId).Count(&total) @@ -95,3 +119,37 @@ func (d *daoChannel) QueryChannelByTypeCode(codecs ...int) ([]*Channel, error) { } return channels, nil } + +func (d *daoChannel) ExistChannel(channelId string) bool { + var channel Channel + if db.Select("id").Where("device_id =?", channelId).Take(&channel).Error == nil { + return true + } + + return false +} + +func (d *daoChannel) SaveJTChannel(channel *Channel) error { + return DBTransaction(func(tx *gorm.DB) error { + var old Channel + if tx.Select("id").Where("root_id =? and channel_number =?", channel.RootID, channel.ChannelNumber).Take(&old).Error == nil { + return fmt.Errorf("channel number %d already exist", channel.ChannelNumber) + } else if tx.Select("id").Where("device_id =?", channel.DeviceID).Take(&old).Error == nil { + return fmt.Errorf("channel id %s already exist", channel.DeviceID) + } + return tx.Save(channel).Error + }) +} + +func (d *daoChannel) DeleteChannel(deviceId string, channelId string) error { + return db.Where("root_id =? and device_id =?", deviceId, channelId).Unscoped().Delete(&Channel{}).Error +} + +func (d *daoChannel) QueryChannelsByChannelID(channelId string) ([]*Channel, error) { + var channels []*Channel + tx := db.Where("device_id =?", channelId).Find(&channels) + if tx.Error != nil { + return nil, tx.Error + } + return channels, nil +} diff --git a/dao_jt.go b/dao_jt.go new file mode 100644 index 0000000..a3c29c0 --- /dev/null +++ b/dao_jt.go @@ -0,0 +1,118 @@ +package main + +import ( + "fmt" + "gorm.io/gorm" +) + +// JTDeviceModel 数据库表结构 +type JTDeviceModel struct { + GBModel + SIPUAOptions + Manufacturer string `json:"manufacturer"` + Model string `json:"model"` + Firmware string `json:"firmware"` + SimNumber string `json:"sim_number"` +} + +func (g *JTDeviceModel) TableName() string { + return "lkm_jt_device" +} + +// DaoJTDevice 保存级联和1078设备的sipua参数项 +type DaoJTDevice interface { + LoadDevices() ([]*JTDeviceModel, error) + + UpdateOnlineStatus(status OnlineStatus, username string) error + + QueryDevice(user string) (*JTDeviceModel, error) + + QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error) + + ExistDevice(username, simNumber string) bool + + DeleteDevice(username string) error + + SaveDevice(model *JTDeviceModel) error + + UpdateDevice(model *JTDeviceModel) error +} + +type daoJTDevice struct { +} + +func (d *daoJTDevice) LoadDevices() ([]*JTDeviceModel, error) { + var devices []*JTDeviceModel + tx := db.Find(&devices) + if tx.Error != nil { + return nil, tx.Error + } + + return devices, nil +} + +func (d *daoJTDevice) UpdateOnlineStatus(status OnlineStatus, username string) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Model(&JTDeviceModel{}).Where("username =?", username).Update("status", status).Error + }) +} + +func (d *daoJTDevice) ExistDevice(id, simNumber string) bool { + var device JTDeviceModel + if db.Where("username =? or sim_number =?", id, simNumber).Select("id").Take(&device).Error == nil { + return true + } + + return false +} + +func (d *daoJTDevice) QueryDevice(id string) (*JTDeviceModel, error) { + var device JTDeviceModel + tx := db.Where("username =?", id).Take(&device) + if tx.Error != nil { + return nil, tx.Error + } + return &device, nil +} + +func (d *daoJTDevice) DeleteDevice(id string) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Where("username =?", id).Unscoped().Delete(&JTDeviceModel{}).Error + }) +} + +func (d *daoJTDevice) QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error) { + var device JTDeviceModel + tx := db.Where("sim_number =?", simNumber).Take(&device) + if tx.Error != nil { + return nil, tx.Error + } + + return &device, nil +} + +func (d *daoJTDevice) SaveDevice(model *JTDeviceModel) error { + return DBTransaction(func(tx *gorm.DB) error { + var old JTDeviceModel + tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old) + if tx.Error == nil { + return fmt.Errorf("username or sim number already exists") + } + + return db.Save(model).Error + }) +} + +func (d *daoJTDevice) UpdateDevice(model *JTDeviceModel) error { + return DBTransaction(func(tx *gorm.DB) error { + var old JTDeviceModel + tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old) + if tx.Error != nil { + return tx.Error + } else { + model.ID = old.ID + } + + return db.Save(model).Error + }) +} diff --git a/dao_platform.go b/dao_platform.go index adef939..c9e1531 100644 --- a/dao_platform.go +++ b/dao_platform.go @@ -1,17 +1,26 @@ package main -type DaoPlatform interface { - LoadPlatforms() ([]*SIPUAParams, error) +// PlatformModel 数据库表结构 +type PlatformModel struct { + GBModel + SIPUAOptions +} - QueryPlatform(addr string) (*SIPUAParams, error) +func (g *PlatformModel) TableName() string { + return "lkm_platform" +} - SavePlatform(platform *SIPUAParams) error +// DaoVirtualDevice 保存级联和1078设备的sipua参数项 +type DaoVirtualDevice interface { + LoadPlatforms() ([]*PlatformModel, error) + + QueryPlatform(addr string) (*PlatformModel, error) + + SavePlatform(platform *PlatformModel) error DeletePlatform(addr string) error - UpdatePlatform(platform *SIPUAParams) error - - UpdatePlatformStatus(addr string, status OnlineStatus) error + UpdatePlatform(platform *PlatformModel) error BindChannels(addr string, channels [][2]string) ([][2]string, error) @@ -26,8 +35,8 @@ type DaoPlatform interface { type daoPlatform struct { } -func (d *daoPlatform) LoadPlatforms() ([]*SIPUAParams, error) { - var platforms []*SIPUAParams +func (d *daoPlatform) LoadPlatforms() ([]*PlatformModel, error) { + var platforms []*PlatformModel tx := db.Find(&platforms) if tx.Error != nil { return nil, tx.Error @@ -36,8 +45,8 @@ func (d *daoPlatform) LoadPlatforms() ([]*SIPUAParams, error) { return platforms, nil } -func (d *daoPlatform) QueryPlatform(addr string) (*SIPUAParams, error) { - var platform SIPUAParams +func (d *daoPlatform) QueryUAByAddr(addr string) (*PlatformModel, error) { + var platform PlatformModel tx := db.Where("server_addr =?", addr).First(&platform) if tx.Error != nil { return nil, tx.Error @@ -46,8 +55,8 @@ func (d *daoPlatform) QueryPlatform(addr string) (*SIPUAParams, error) { return &platform, nil } -func (d *daoPlatform) SavePlatform(platform *SIPUAParams) error { - var old SIPUAParams +func (d *daoPlatform) SavePlatform(platform *PlatformModel) error { + var old PlatformModel tx := db.Where("server_addr =?", platform.ServerAddr).First(&old) if tx.Error == nil { platform.ID = old.ID @@ -55,27 +64,27 @@ func (d *daoPlatform) SavePlatform(platform *SIPUAParams) error { return db.Save(platform).Error } -func (d *daoPlatform) DeletePlatform(addr string) error { - return db.Where("server_addr =?", addr).Unscoped().Delete(&SIPUAParams{}).Error +func (d *daoPlatform) DeleteUAByAddr(addr string) error { + return db.Where("server_addr =?", addr).Unscoped().Delete(&PlatformModel{}).Error } -func (d *daoPlatform) UpdatePlatform(platform *SIPUAParams) error { +func (d *daoPlatform) UpdatePlatform(platform *PlatformModel) error { //TODO implement me panic("implement me") } -func (d *daoPlatform) UpdatePlatformStatus(addr string, status OnlineStatus) error { - return db.Model(&SIPUAParams{}).Where("server_addr =?", addr).Update("status", status).Error +func (d *daoPlatform) UpdateOnlineStatus(status OnlineStatus, addr string) error { + return db.Model(&PlatformModel{}).Where("server_addr =?", addr).Update("status", status).Error } -type DBPlatformChannel struct { +type PlatformChannelModel struct { GBModel DeviceID string `json:"device_id"` Channel string `json:"channel_id"` ServerAddr string `json:"server_addr"` } -func (d *DBPlatformChannel) TableName() string { +func (d *PlatformChannelModel) TableName() string { return "lkm_platform_channel" } @@ -83,10 +92,10 @@ func (d *daoPlatform) BindChannels(addr string, channels [][2]string) ([][2]stri var res [][2]string for _, channel := range channels { - var old DBPlatformChannel + var old PlatformChannelModel _ = db.Where("device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr).First(&old) if old.ID == 0 { - _ = db.Create(&DBPlatformChannel{ + _ = db.Create(&PlatformChannelModel{ DeviceID: channel[0], Channel: channel[1], }) @@ -100,7 +109,7 @@ func (d *daoPlatform) BindChannels(addr string, channels [][2]string) ([][2]stri func (d *daoPlatform) UnbindChannels(addr string, channels [][2]string) ([][2]string, error) { var res [][2]string for _, channel := range channels { - tx := db.Unscoped().Delete(&DBPlatformChannel{}, "device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr) + tx := db.Unscoped().Delete(&PlatformChannelModel{}, "device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr) if tx.Error == nil { res = append(res, channel) } else { @@ -112,8 +121,8 @@ func (d *daoPlatform) UnbindChannels(addr string, channels [][2]string) ([][2]st } func (d *daoPlatform) QueryPlatformChannel(addr string, channelId string) (string, *Channel, error) { - var platformChannel DBPlatformChannel - tx := db.Model(&DBPlatformChannel{}).Where("channel_id =? and server_addr =?", channelId, addr).First(&platformChannel) + var platformChannel PlatformChannelModel + tx := db.Model(&PlatformChannelModel{}).Where("channel_id =? and server_addr =?", channelId, addr).First(&platformChannel) if tx.Error != nil { return "", nil, tx.Error } @@ -128,7 +137,7 @@ func (d *daoPlatform) QueryPlatformChannel(addr string, channelId string) (strin } func (d *daoPlatform) QueryPlatformChannels(addr string) ([]*Channel, error) { - var platformChannels []*DBPlatformChannel + var platformChannels []*PlatformChannelModel tx := db.Where("server_addr =?", addr).Find(&platformChannels) if tx.Error != nil { return nil, tx.Error @@ -143,7 +152,6 @@ func (d *daoPlatform) QueryPlatformChannels(addr string) ([]*Channel, error) { } else { Sugar.Errorf("查询级联设备通道失败. device_id: %s, channel_id: %s err: %s", platformChannel.DeviceID, platformChannel.Channel, tx.Error) } - } return channels, nil diff --git a/db_sqlite.go b/db_sqlite.go index 61a676c..f03236d 100644 --- a/db_sqlite.go +++ b/db_sqlite.go @@ -21,6 +21,7 @@ var ( PlatformDao = &daoPlatform{} StreamDao = &daoStream{} SinkDao = &daoSink{} + JTDeviceDao = &daoJTDevice{} ) func init() { @@ -61,13 +62,15 @@ func init() { panic(err) } else if err = db.AutoMigrate(&Channel{}); err != nil { panic(err) - } else if err = db.AutoMigrate(&SIPUAParams{}); err != nil { + } else if err = db.AutoMigrate(&PlatformModel{}); err != nil { panic(err) } else if err = db.AutoMigrate(&Stream{}); err != nil { panic(err) } else if err = db.AutoMigrate(&Sink{}); err != nil { panic(err) - } else if err = db.AutoMigrate(&DBPlatformChannel{}); err != nil { + } else if err = db.AutoMigrate(&PlatformChannelModel{}); err != nil { + panic(err) + } else if err = db.AutoMigrate(&JTDeviceModel{}); err != nil { panic(err) } diff --git a/device.go b/device.go index 5cca9bd..6636daf 100644 --- a/device.go +++ b/device.go @@ -127,19 +127,19 @@ func (d *Device) BuildMessageRequest(to, body string) sip.Request { func (d *Device) QueryDeviceInfo() { body := fmt.Sprintf(DeviceInfoFormat, "1", d.DeviceID) request := d.BuildMessageRequest(d.DeviceID, body) - SipUA.SendRequest(request) + SipStack.SendRequest(request) } func (d *Device) QueryCatalog() { body := fmt.Sprintf(CatalogFormat, "1", d.DeviceID) request := d.BuildMessageRequest(d.DeviceID, body) - SipUA.SendRequest(request) + SipStack.SendRequest(request) } func (d *Device) QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error { body := fmt.Sprintf(QueryRecordFormat, sn, channelId, startTime, endTime, type_) request := d.BuildMessageRequest(channelId, body) - SipUA.SendRequest(request) + SipStack.SendRequest(request) return nil } @@ -169,7 +169,7 @@ func (d *Device) SubscribePosition(channelId string) error { event := Event("Catalog;id=2") request.AppendHeader(&event) - response, err := SipUA.SendRequestWithTimeout(5, request) + response, err := SipStack.SendRequestWithTimeout(5, request) if err != nil { return err } @@ -184,7 +184,7 @@ func (d *Device) SubscribePosition(channelId string) error { func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction { body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId) request := d.BuildMessageRequest(channelId, body) - return SipUA.SendRequest(request) + return SipStack.SendRequest(request) } func (d *Device) UpdateChannel(id string, event string) { @@ -241,7 +241,7 @@ func (d *Device) NewRequestBuilder(method sip.RequestMethod, fromUser, realm, to func (d *Device) BuildInviteRequest(sessionName, channelId, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string) (sip.Request, error) { builder := d.NewRequestBuilder(sip.INVITE, Config.SipID, Config.SipContactAddr, channelId) - sdp := BuildSDP(Config.SipID, sessionName, ip, port, startTime, stopTime, setup, speed, ssrc) + sdp := BuildSDP("video", Config.SipID, sessionName, ip, port, startTime, stopTime, setup, speed, ssrc, "96 PS/90000") builder.SetContentType(&SDPMessageType) builder.SetContact(GlobalContactAddress) builder.SetBody(sdp) diff --git a/dialogs.go b/dialogs.go index 44e299c..9041f08 100644 --- a/dialogs.go +++ b/dialogs.go @@ -10,7 +10,7 @@ import ( ) var ( - Dialogs = NewDialogManager[*StreamWaiting]() + EarlyDialogs = NewDialogManager[*StreamWaiting]() ) type StreamWaiting struct { diff --git a/hook/event.go b/hook/event.go new file mode 100644 index 0000000..437245c --- /dev/null +++ b/hook/event.go @@ -0,0 +1,50 @@ +package hook + +import ( + "bytes" + "encoding/json" + "net/http" +) + +const ( + EventTypeDeviceOnline = iota + 1 + EventTypeDeviceOffline + EventTypeDevicePosition + EventTypeDeviceOnInvite +) + +var ( + EventUrls = make(map[int]string) +) + +func RegisterEventUrl(event int, url string) { + EventUrls[event] = url +} + +func PostEvent(url string, body []byte) (*http.Response, error) { + client := &http.Client{ + //Timeout: time.Duration(AppConfig.Hooks.Timeout), + } + + request, err := http.NewRequest("post", url, bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + + request.Header.Set("Content-Type", "application/json") + return client.Do(request) +} + +func PostOnInviteEvent(simNumber, channelNumber string) (*http.Response, error) { + params := map[string]string{ + "sim_number": simNumber, + "channel_number": channelNumber, + } + + body, err := json.Marshal(params) + if err != nil { + return nil, err + } + + return PostEvent(EventUrls[EventTypeDeviceOnInvite], body) +} diff --git a/jt_device.go b/jt_device.go new file mode 100644 index 0000000..a86cb00 --- /dev/null +++ b/jt_device.go @@ -0,0 +1,70 @@ +package main + +import ( + "github.com/ghettovoice/gosip/sip" + "net/http" + "strconv" + "strings" +) + +type JTDevice struct { + *Platform + username string + simNumber string +} + +func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response { + // 通知1078的信令服务器 + channels, _ := ChannelDao.QueryChannelsByChannelID(user) + if len(channels) < 1 { + Sugar.Errorf("处理1078的invite失败. 通道不存在 channel: %s device: %s", user, g.Username) + return CreateResponseWithStatusCode(request, http.StatusNotFound) + } else if channels[0].RootID != g.username { + Sugar.Errorf("处理1078的invite失败. 设备和通道不匹配 channel: %s device: %s", user, g.Username) + return CreateResponseWithStatusCode(request, http.StatusNotFound) + } + + channel := channels[0] + gbsdp, err := ParseGBSDP(request.Body()) + if err != nil { + Sugar.Errorf("处理上级Invite失败, 解析上级SDP发生错误 err: %s sdp: %s", err.Error(), request.Body()) + return CreateResponseWithStatusCode(request, http.StatusBadRequest) + } + + var inviteType InviteType + inviteType.SessionName2Type(strings.ToLower(gbsdp.sdp.Session)) + if InviteTypePlay != inviteType { + Sugar.Warnf("处理上级Invite失败, 1078暂不支持非实时预览流 inviteType: %s channel: %s device: %s", inviteType, user, g.Username) + return CreateResponseWithStatusCode(request, http.StatusNotImplemented) + } + + streamId := GenerateStreamID(inviteType, g.simNumber, strconv.Itoa(channel.ChannelNumber), gbsdp.startTime, gbsdp.stopTime) + + sink := &Sink{ + StreamID: streamId, + ServerAddr: g.ServerAddr, + Protocol: "gb_gateway"} + + response, err := AddForwardSink(TransStreamGBGateway, request, user, sink, streamId, gbsdp, inviteType, "96 PS/90000") + if err != nil { + Sugar.Errorf("处理1078的invite失败. 发送hook失败 err: %s channel: %s device: %s", err.Error(), user, g.Username) + return CreateResponseWithStatusCode(request, http.StatusInternalServerError) + } + + return response +} + +func NewJTDevice(model *JTDeviceModel, ua SipServer) (*JTDevice, error) { + platform, err := NewPlatform(&model.SIPUAOptions, ua) + if err != nil { + return nil, err + } + + platform.SetDeviceInfo(model.Name, model.Manufacturer, model.Model, model.Firmware) + + return &JTDevice{ + Platform: platform, + username: model.Username, + simNumber: model.SimNumber, + }, nil +} diff --git a/live.go b/live.go index 100ca1e..e3fe76a 100644 --- a/live.go +++ b/live.go @@ -41,7 +41,7 @@ func (i *InviteType) SessionName2Type(name string) { func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*Stream, error) { stream := &Stream{ StreamID: streamId, - Protocol: "28181", + Protocol: SourceType28181, } // 先添加占位置, 防止重复请求 @@ -64,8 +64,8 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId // 等待流媒体服务发送推流通知 wait := func() bool { waiting := StreamWaiting{} - _, _ = Dialogs.Add(string(streamId), &waiting) - defer Dialogs.Remove(string(streamId)) + _, _ = EarlyDialogs.Add(string(streamId), &waiting) + defer EarlyDialogs.Remove(string(streamId)) ok := http.StatusOK == waiting.Receive(10) if !ok { @@ -95,12 +95,12 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta defer func() { // 如果失败, 告知流媒体服务释放国标源 if err != nil { - go CloseSource(string(streamId)) + go MSCloseSource(string(streamId)) } }() // 告知流媒体服务创建国标源, 返回收流地址信息 - ip, port, urls, ssrc, msErr := CreateGBSource(string(streamId), setup, "", string(inviteType)) + ip, port, urls, ssrc, msErr := MSCreateGBSource(string(streamId), setup, "", string(inviteType)) if msErr != nil { Sugar.Errorf("创建GBSource失败 err: %s", msErr.Error()) return nil, nil, msErr @@ -126,7 +126,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta var body string reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // invite信令交互 - SipUA.SendRequestWithContext(reqCtx, inviteRequest, gosip.WithResponseHandler(func(res sip.Response, request sip.Request) { + SipStack.SendRequestWithContext(reqCtx, inviteRequest, gosip.WithResponseHandler(func(res sip.Response, request sip.Request) { if res.StatusCode() < 200 { } else if res.StatusCode() == 200 { @@ -144,7 +144,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta Sugar.Infof("send ack %s", ackRequest.String()) - err = SipUA.Send(ackRequest) + err = SipStack.Send(ackRequest) if err != nil { cancel() Sugar.Errorf("send ack error %s %s", err.Error(), ackRequest.String()) @@ -172,7 +172,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta } addr := fmt.Sprintf("%s:%d", answer.Addr, answer.Video.Port) - if err = ConnectGBSource(string(streamId), addr); err != nil { + if err = MSConnectGBSource(string(streamId), addr); err != nil { Sugar.Errorf("设置GB28181连接地址失败 err: %s addr: %s", err.Error(), addr) return nil, nil, err } diff --git a/main.go b/main.go index 9390b0c..bd78b24 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "gb-cms/hook" "go.uber.org/zap/zapcore" "net" "net/http" @@ -11,14 +12,14 @@ import ( ) var ( - Config *Config_ - SipUA SipServer + Config *Config_ + SipStack SipServer ) func init() { logConfig := LogConfig{ Level: int(zapcore.DebugLevel), - Name: "./logs/cms.log", + Name: "./logs/clog", MaxSize: 10, MaxBackup: 100, MaxAge: 7, @@ -38,6 +39,10 @@ func main() { indent, _ := json.MarshalIndent(Config, "", "\t") Sugar.Infof("server config:\r\n%s", indent) + if config.Hooks.OnInvite != "" { + hook.RegisterEventUrl(hook.EventTypeDeviceOnInvite, config.Hooks.OnInvite) + } + OnlineDeviceManager.Start(time.Duration(Config.AliveExpires)*time.Second/4, time.Duration(Config.AliveExpires)*time.Second, OnExpires) // 从数据库中恢复会话 @@ -58,7 +63,7 @@ func main() { Sugar.Infof("启动sip server成功. addr: %s:%d", config.ListenIP, config.SipPort) Config.SipContactAddr = net.JoinHostPort(config.PublicIP, strconv.Itoa(config.SipPort)) - SipUA = server + SipStack = server // 在sip启动后, 关闭无效的流 for _, stream := range streams { @@ -71,6 +76,8 @@ func main() { // 启动级联设备 startPlatformDevices() + // 启动1078设备 + startJTDevices() httpAddr := net.JoinHostPort(config.ListenIP, strconv.Itoa(config.HttpPort)) Sugar.Infof("启动http server. addr: %s", httpAddr) diff --git a/media_server.go b/media_server.go index e49cae5..5c11e12 100644 --- a/media_server.go +++ b/media_server.go @@ -6,10 +6,29 @@ import ( "fmt" "net" "net/http" + "net/url" "strconv" "time" ) +const ( + TransStreamRtmp = iota + 1 + TransStreamFlv = 2 + TransStreamRtsp = 3 + TransStreamHls = 4 + TransStreamRtc = 5 + TransStreamGBCascaded = 6 // 国标级联转发 + TransStreamGBTalk = 7 // 国标广播/对讲转发 + TransStreamGBGateway = 8 // 国标网关 +) + +const ( + SourceTypeRtmp = iota + 1 + SourceType28181 + SourceType1078 + SourceTypeGBTalk +) + type SourceDetails struct { ID string `json:"id"` Protocol string `json:"protocol"` // 推流协议 @@ -43,10 +62,22 @@ type SourceSDP struct { type GBOffer struct { SourceSDP - AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式 + AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式 + TransStreamProtocol int `json:"trans_stream_protocol,omitempty"` } func Send(path string, body interface{}) (*http.Response, error) { + return SendWithUrlParams(path, body, nil) +} + +func SendWithUrlParams(path string, body interface{}, values url.Values) (*http.Response, error) { + if values != nil { + params := values.Encode() + if len(params) > 0 { + path = fmt.Sprintf("%s?%s", path, params) + } + } + url := fmt.Sprintf("http://%s/%s", Config.MediaServer, path) data, err := json.Marshal(body) @@ -67,7 +98,7 @@ func Send(path string, body interface{}) (*http.Response, error) { return client.Do(request) } -func CreateGBSource(id, setup string, ssrc string, sessionName string) (string, uint16, []string, string, error) { +func MSCreateGBSource(id, setup string, ssrc string, sessionName string) (string, uint16, []string, string, error) { v := &SourceSDP{ Source: id, SDP: SDP{ @@ -102,7 +133,7 @@ func CreateGBSource(id, setup string, ssrc string, sessionName string) (string, return host, uint16(port), data.Data.Urls, data.Data.SSRC, err } -func ConnectGBSource(id, addr string) error { +func MSConnectGBSource(id, addr string) error { v := &SourceSDP{ Source: id, SDP: SDP{ @@ -114,7 +145,7 @@ func ConnectGBSource(id, addr string) error { return err } -func CloseSource(id string) error { +func MSCloseSource(id string) error { v := &struct { Source string `json:"source"` }{ @@ -125,10 +156,53 @@ func CloseSource(id string) error { return err } -func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) (string, uint16, string, error) { +func MSCloseSink(sourceId string, sinkId string) { + v := struct { + SourceID string `json:"source"` + SinkID string `json:"sink"` // sink id + }{ + sourceId, sinkId, + } + + _, _ = Send("api/v1/sink/close", v) +} + +func MSQuerySourceList() ([]*SourceDetails, error) { + response, err := Send("api/v1/source/list", nil) + if err != nil { + return nil, err + } + + data := &Response[[]*SourceDetails]{} + if err = DecodeJSONBody(response.Body, data); err != nil { + return nil, err + } + + return data.Data, err +} + +func MSQuerySinkList(source string) ([]*SinkDetails, error) { + id := struct { + Source string `json:"source"` + }{source} + + response, err := Send("api/v1/sink/list", id) + if err != nil { + return nil, err + } + + data := &Response[[]*SinkDetails]{} + if err = DecodeJSONBody(response.Body, data); err != nil { + return nil, err + } + + return data.Data, err +} + +func MSAddForwardSink(protocol int, source, addr, offerSetup, answerSetup, ssrc, sessionName string, values url.Values) (string, uint16, string, error) { offer := &GBOffer{ SourceSDP: SourceSDP{ - Source: id, + Source: source, SDP: SDP{ Addr: addr, Setup: offerSetup, @@ -136,10 +210,12 @@ func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) ( SessionName: sessionName, }, }, - AnswerSetup: answerSetup, + AnswerSetup: answerSetup, + TransStreamProtocol: protocol, } - response, err := Send("api/v1/gb28181/answer/create", offer) + var err error + response, err := SendWithUrlParams("api/v1/sink/add", offer, values) if err != nil { return "", 0, "", err } @@ -163,46 +239,3 @@ func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) ( port, _ := strconv.Atoi(p) return host, uint16(port), data.Data.Sink, nil } - -func CloseSink(sourceId string, sinkId string) { - v := struct { - SourceID string `json:"source"` - SinkID string `json:"sink"` // sink id - }{ - sourceId, sinkId, - } - - _, _ = Send("api/v1/sink/close", v) -} - -func QuerySourceList() ([]*SourceDetails, error) { - response, err := Send("api/v1/source/list", nil) - if err != nil { - return nil, err - } - - data := &Response[[]*SourceDetails]{} - if err = DecodeJSONBody(response.Body, data); err != nil { - return nil, err - } - - return data.Data, err -} - -func QuerySinkList(source string) ([]*SinkDetails, error) { - id := struct { - Source string `json:"source"` - }{source} - - response, err := Send("api/v1/sink/list", id) - if err != nil { - return nil, err - } - - data := &Response[[]*SinkDetails]{} - if err = DecodeJSONBody(response.Body, data); err != nil { - return nil, err - } - - return data.Data, err -} diff --git a/message_factory.go b/message_factory.go index ceafa1d..2a0fd0a 100644 --- a/message_factory.go +++ b/message_factory.go @@ -12,15 +12,14 @@ const ( XmlHeaderGBK = `` + "\r\n" ) -func BuildSDP(userName, sessionName, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string) string { +func BuildSDP(media, userName, sessionName, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string, attrs ...string) string { format := "v=0\r\n" + "o=%s 0 0 IN IP4 %s\r\n" + "s=%s\r\n" + "c=IN IP4 %s\r\n" + "t=%s %s\r\n" + - "m=video %d %s 96\r\n" + - "a=%s\r\n" + - "a=rtpmap:96 PS/90000\r\n" + "m=%s %d %s %s\r\n" + + "a=%s\r\n" tcpFormat := "a=setup:%s\r\n" + "a=connection:new\r\n" @@ -34,7 +33,16 @@ func BuildSDP(userName, sessionName, ip string, port uint16, startTime, stopTime mediaProtocol = "RTP/AVP" } - sdp := fmt.Sprintf(format, userName, ip, sessionName, ip, startTime, stopTime, port, mediaProtocol, "recvonly") + var mediaFormats []string + for _, attr := range attrs { + mediaFormats = append(mediaFormats, strings.Split(attr, " ")[0]) + } + + sdp := fmt.Sprintf(format, userName, ip, sessionName, ip, startTime, stopTime, media, port, mediaProtocol, strings.Join(mediaFormats, " "), "recvonly") + for _, attr := range attrs { + sdp += fmt.Sprintf("a=rtpmap:%s\r\n", attr) + } + if tcp { sdp += fmt.Sprintf(tcpFormat, setup) } @@ -54,6 +62,7 @@ func NewSIPRequestBuilderWithTransport(transport string) *sip.RequestBuilder { } builder.AddVia(&hop) + builder.SetUserAgent(nil) return builder } diff --git a/platform.go b/platform.go index a995a03..448aa12 100644 --- a/platform.go +++ b/platform.go @@ -10,19 +10,24 @@ import ( "sync" ) -type GBPlatform struct { - *Client +const ( + UATypeGB = iota + 1 + UATypeJT +) + +type Platform struct { + *gbClient lock sync.Mutex sinks map[string]StreamID // 保存级联转发的sink, 方便离线的时候关闭sink } -func (g *GBPlatform) addSink(callId string, stream StreamID) { +func (g *Platform) addSink(callId string, stream StreamID) { g.lock.Lock() defer g.lock.Unlock() g.sinks[callId] = stream } -func (g *GBPlatform) removeSink(callId string) StreamID { +func (g *Platform) removeSink(callId string) StreamID { g.lock.Lock() defer g.lock.Unlock() stream := g.sinks[callId] @@ -31,17 +36,17 @@ func (g *GBPlatform) removeSink(callId string) StreamID { } // OnBye 被上级挂断 -func (g *GBPlatform) OnBye(request sip.Request) { +func (g *Platform) OnBye(request sip.Request) { id, _ := request.CallID() g.CloseStream(id.Value(), false, true) } // CloseStream 关闭级联会话 -func (g *GBPlatform) CloseStream(callId string, bye, ms bool) { +func (g *Platform) CloseStream(callId string, bye, ms bool) { _ = g.removeSink(callId) sink := RemoveForwardSinkWithCallId(callId) if sink == nil { - Sugar.Errorf("关闭级联转发sink失败, 找不到sink. callid: %s", callId) + Sugar.Errorf("关闭转发sink失败, 找不到sink. callid: %s", callId) return } @@ -49,7 +54,7 @@ func (g *GBPlatform) CloseStream(callId string, bye, ms bool) { } // CloseStreams 关闭所有级联会话 -func (g *GBPlatform) CloseStreams(bye, ms bool) { +func (g *Platform) CloseStreams(bye, ms bool) { var callIds []string g.lock.Lock() @@ -66,8 +71,8 @@ func (g *GBPlatform) CloseStreams(bye, ms bool) { } // OnInvite 被上级呼叫 -func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response { - Sugar.Infof("收到级联Invite请求 platform: %s channel: %s sdp: %s", g.SeverID, user, request.Body()) +func (g *Platform) OnInvite(request sip.Request, user string) sip.Response { + Sugar.Infof("收到上级Invite请求 platform: %s channel: %s sdp: %s", g.SeverID, user, request.Body()) source := request.Source() platform := PlatformManager.Find(source) @@ -75,124 +80,84 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response { deviceId, channel, err := PlatformDao.QueryPlatformChannel(g.ServerAddr, user) if err != nil { - Sugar.Errorf("级联转发失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverID, user) + Sugar.Errorf("处理上级Invite失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverID, user) return CreateResponseWithStatusCode(request, http.StatusInternalServerError) } // 查找通道对应的设备 device, _ := DeviceDao.QueryDevice(deviceId) if device == nil { - Sugar.Errorf("级联转发失败, 设备不存在 device: %s channel: %s", device, user) + Sugar.Errorf("处理上级Invite失败, 设备不存在 device: %s channel: %s", device, user) return CreateResponseWithStatusCode(request, http.StatusNotFound) } - parse, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body()) + gbSdp, err := ParseGBSDP(request.Body()) if err != nil { - Sugar.Errorf("级联转发失败, 解析上级SDP发生错误 err: %s sdp: %s", err.Error(), request.Body()) + Sugar.Errorf("处理上级Invite失败,err: %s sdp: %s", err.Error(), request.Body()) return CreateResponseWithStatusCode(request, http.StatusBadRequest) } - // 解析时间范围 - time := strings.Split(parse.Time, " ") - if len(time) < 2 { - Sugar.Errorf("级联转发失败 上级sdp的时间范围格式错误 time: %s sdp: %s", parse.Time, request.Body()) - return CreateResponseWithStatusCode(request, http.StatusBadRequest) - } - - var streamId StreamID var inviteType InviteType - inviteType.SessionName2Type(strings.ToLower(parse.Session)) - switch inviteType { - case InviteTypePlay: - streamId = GenerateStreamID(InviteTypePlay, channel.ParentID, user, "", "") - break - case InviteTypePlayback: - // 级联下载和回放不限制路数,也不共享流 - streamId = GenerateStreamID(InviteTypePlayback, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10)) - break - case InviteTypeDownload: - streamId = GenerateStreamID(InviteTypeDownload, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10)) - break - } + inviteType.SessionName2Type(strings.ToLower(gbSdp.sdp.Session)) + streamId := GenerateStreamID(inviteType, channel.RootID, channel.DeviceID, gbSdp.startTime, gbSdp.stopTime) + // 如果流不存在, 向通道发送Invite请求 stream, _ := StreamDao.QueryStream(streamId) - addr := fmt.Sprintf("%s:%d", parse.Addr, media.Port) if stream == nil { - s := channel.SetupType.String() - println(s) - stream, err = device.StartStream(inviteType, streamId, user, time[0], time[1], channel.SetupType.String(), 0, true) + stream, err = device.StartStream(inviteType, streamId, user, gbSdp.startTime, gbSdp.stopTime, channel.SetupType.String(), 0, true) if err != nil { - Sugar.Errorf("级联转发失败 err: %s stream: %s", err.Error(), streamId) + Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId) return CreateResponseWithStatusCode(request, http.StatusBadRequest) } } - ip, port, sinkID, err := CreateAnswer(string(streamId), addr, offerSetup, answerSetup, ssrc, string(inviteType)) - if err != nil { - Sugar.Errorf("级联转发失败,向流媒体服务添加转发Sink失败 err: %s", err.Error()) - - if "play" != parse.Session { - CloseStream(streamId, true) - } - - return CreateResponseWithStatusCode(request, http.StatusInternalServerError) - } - - // answer添加contact头域 - answer := BuildSDP(user, parse.Session, ip, port, time[0], time[1], answerSetup, speed, ssrc) - response := CreateResponseWithStatusCode(request, http.StatusOK) - response.RemoveHeader("Contact") - response.AppendHeader(GlobalContactAddress.AsContactHeader()) - response.AppendHeader(&SDPMessageType) - response.SetBody(answer, true) - - setToTag(response) - sink := &Sink{ - SinkID: sinkID, StreamID: streamId, ServerAddr: g.ServerAddr, - Protocol: "gb_cascaded_forward"} - sink.SetDialog(g.CreateDialogRequestFromAnswer(response, true)) + Protocol: "gb_cascaded"} + + response, err := AddForwardSink(TransStreamGBCascaded, request, user, sink, streamId, gbSdp, inviteType, "96 PS/90000") + if err != nil { + Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId) + } - AddForwardSink(streamId, sink) return response } -func (g *GBPlatform) Start() { - Sugar.Infof("启动级联设备, deivce: %s transport: %s addr: %s", g.Username, g.sipClient.Transport, g.sipClient.ServerAddr) - g.sipClient.Start() - g.sipClient.SetOnRegisterHandler(g.onlineCB, g.offlineCB) +func (g *Platform) Start() { + Sugar.Infof("启动级联设备, deivce: %s transport: %s addr: %s", g.Username, g.sipUA.Transport, g.sipUA.ServerAddr) + g.sipUA.Start() + g.sipUA.SetOnRegisterHandler(g.Online, g.Offline) } -func (g *GBPlatform) Stop() { - g.sipClient.Stop() - g.sipClient.SetOnRegisterHandler(nil, nil) +func (g *Platform) Stop() { + g.sipUA.Stop() + g.sipUA.SetOnRegisterHandler(nil, nil) // 释放所有推流 g.CloseStreams(true, true) } -func (g *GBPlatform) Online() { - Sugar.Infof("级联设备上线 device: %s", g.SeverID) +func (g *Platform) Online() { + Sugar.Infof("ua上线 device: %s server addr: %s", g.Username, g.ServerAddr) - if err := PlatformDao.UpdatePlatformStatus(g.SeverID, ON); err != nil { - Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), g.SeverID) + if err := PlatformDao.UpdateOnlineStatus(ON, g.ServerAddr); err != nil { + Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr) } } -func (g *GBPlatform) Offline() { - Sugar.Infof("级联设备离线 device: %s", g.SeverID) +func (g *Platform) Offline() { + Sugar.Infof("ua离线 device: %s server addr: %s", g.Username, g.ServerAddr) - if err := PlatformDao.UpdatePlatformStatus(g.SeverID, OFF); err != nil { - Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), g.SeverID) + if err := PlatformDao.UpdateOnlineStatus(OFF, g.ServerAddr); err != nil { + Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr) } // 释放所有推流 g.CloseStreams(true, true) } -func NewGBPlatform(record *SIPUAParams, ua SipServer) (*GBPlatform, error) { +func NewPlatform(record *SIPUAOptions, ua SipServer) (*Platform, error) { if len(record.SeverID) != 20 { return nil, fmt.Errorf("SeverID must be exactly 20 characters long") } @@ -201,6 +166,6 @@ func NewGBPlatform(record *SIPUAParams, ua SipServer) (*GBPlatform, error) { return nil, err } - gbClient := NewGBClient(record, ua) - return &GBPlatform{Client: gbClient.(*Client), sinks: make(map[string]StreamID, 8)}, nil + client := NewGBClient(record, ua) + return &Platform{gbClient: client.(*gbClient), sinks: make(map[string]StreamID, 8)}, nil } diff --git a/platform_manager.go b/platform_manager.go deleted file mode 100644 index d8b4a21..0000000 --- a/platform_manager.go +++ /dev/null @@ -1,119 +0,0 @@ -package main - -import ( - "fmt" - "sync" -) - -var ( - PlatformManager = &platformManager{ - addrMap: make(map[string]*GBPlatform, 8), - } -) - -type platformManager struct { - addrMap map[string]*GBPlatform //上级地址->平台 - lock sync.RWMutex -} - -func (p *platformManager) Add(platform *GBPlatform) bool { - p.lock.Lock() - defer p.lock.Unlock() - - if _, ok := p.addrMap[platform.sipClient.ServerAddr]; ok { - return false - } - - p.addrMap[platform.sipClient.ServerAddr] = platform - return true -} - -func (p *platformManager) Find(addr string) *GBPlatform { - p.lock.RLock() - defer p.lock.RUnlock() - if platform, ok := p.addrMap[addr]; ok { - return platform - } - return nil -} - -func (p *platformManager) Remove(addr string) *GBPlatform { - p.lock.Lock() - defer p.lock.Unlock() - - platform, ok := p.addrMap[addr] - if !ok { - return nil - } - - delete(p.addrMap, addr) - return platform -} -func (p *platformManager) Platforms() []*GBPlatform { - p.lock.RLock() - defer p.lock.RUnlock() - - platforms := make([]*GBPlatform, 0, len(p.addrMap)) - for _, platform := range p.addrMap { - platforms = append(platforms, platform) - } - - return platforms -} - -func AddPlatform(platform *GBPlatform) error { - ok := PlatformManager.Add(platform) - if !ok { - return fmt.Errorf("平台添加失败, 地址冲突. addr: %s", platform.sipClient.ServerAddr) - } - - err := PlatformDao.SavePlatform(&platform.SIPUAParams) - if err != nil { - PlatformManager.Remove(platform.sipClient.ServerAddr) - return fmt.Errorf("平台保存到数据库失败, err: %s", err.Error()) - } - - return nil -} - -func RemovePlatform(addr string) (*GBPlatform, error) { - err := PlatformDao.DeletePlatform(addr) - if err != nil { - return nil, err - } - - platform := PlatformManager.Remove(addr) - return platform, nil -} - -func LoadPlatforms() []*SIPUAParams { - platforms := PlatformManager.Platforms() - params := make([]*SIPUAParams, 0, len(platforms)) - for _, platform := range platforms { - params = append(params, &platform.SIPUAParams) - } - - return params -} - -func QueryPlatform(add string) *GBPlatform { - return PlatformManager.Find(add) -} - -func UpdatePlatformStatus(addr string, status OnlineStatus) error { - platform := PlatformManager.Find(addr) - if platform == nil { - return fmt.Errorf("平台不存在. addr: %s", addr) - } - - //old := platform.Device.Status - platform.Device.Status = status - - err := PlatformDao.UpdatePlatformStatus(addr, status) - // platform.Device.Status = old - if err != nil { - return err - } - - return nil -} diff --git a/position.go b/position.go index 73c46c5..dd9afa5 100644 --- a/position.go +++ b/position.go @@ -47,7 +47,7 @@ func (d *Device) DoSubscribePosition(channelId string) error { event := Event("Catalog;id=2") request.AppendHeader(&event) - response, err := SipUA.SendRequestWithTimeout(5, request) + response, err := SipStack.SendRequestWithTimeout(5, request) if err != nil { return err } diff --git a/recover.go b/recover.go index ee5167c..44baf46 100644 --- a/recover.go +++ b/recover.go @@ -14,34 +14,40 @@ func startPlatformDevices() { } for _, record := range platforms { - platform, err := NewGBPlatform(record, SipUA) + platform, err := NewPlatform(&record.SIPUAOptions, SipStack) // 都入库了不允许失败, 程序有BUG, 及时修复 utils.Assert(err == nil) - utils.Assert(PlatformManager.Add(platform)) + utils.Assert(PlatformManager.Add(platform.ServerAddr, platform)) - if err := PlatformDao.UpdatePlatformStatus(record.ServerAddr, OFF); err != nil { + if err := PlatformDao.UpdateOnlineStatus(OFF, record.ServerAddr); err != nil { Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.SeverID) } - // 恢复级联会话 - // 不删会话能正常通信 - //for _, stream := range streams { - // sinks := stream.GetForwardStreamSinks() - // for _, sink := range sinks { - // if sink.DeviceID != record.SeverID { - // continue - // } - // - // callId, _ := sink.Dialog.CallID() - // channelCallId, _ := stream.Dialog.CallID() - // platform.addSink(callId.Value(), channelCallId.Value()) - // } - //} - platform.Start() } } +// 启动1078设备 +func startJTDevices() { + devices, err := JTDeviceDao.LoadDevices() + if err != nil { + Sugar.Errorf("查询1078设备失败 err: %s", err.Error()) + return + } + + for _, record := range devices { + // 都入库了不允许失败, 程序有BUG, 及时修复 + device, err := NewJTDevice(record, SipStack) + utils.Assert(err == nil) + utils.Assert(JTDeviceManager.Add(device.Username, device)) + + if err := JTDeviceDao.UpdateOnlineStatus(OFF, device.Username); err != nil { + Sugar.Infof("更新1078设备状态失败 err: %s device: %s", err.Error(), record.SeverID) + } + device.Start() + } +} + // 返回需要关闭的推流源和转流Sink func recoverStreams() (map[string]*Stream, map[string]*Sink) { // 比较数据库和流媒体服务器中的流会话, 以流媒体服务器中的为准, 释放过期的会话 @@ -55,10 +61,10 @@ func recoverStreams() (map[string]*Stream, map[string]*Sink) { dbSinks, _ := SinkDao.LoadForwardSinks() // 查询流媒体服务器中的推流源列表 - msSources, err := QuerySourceList() + msSources, err := MSQuerySourceList() if err != nil { // 流媒体服务器崩了, 存在的所有记录都无效, 全部删除 - Sugar.Warnf("恢复推流失败, 查询推流源列表发生错误, 删除数据库中的所有记录. err: %s", err.Error()) + Sugar.Warnf("恢复推流失败, 查询推流源列表发生错误, 删除所有推流记录. err: %s", err.Error()) } // 查询推流源下所有的转发sink列表 @@ -70,7 +76,7 @@ func recoverStreams() (map[string]*Stream, map[string]*Sink) { } // 查询转发sink - sinks, err := QuerySinkList(source.ID) + sinks, err := MSQuerySinkList(source.ID) if err != nil { Sugar.Warnf("查询拉流列表发生 err: %s", err.Error()) continue diff --git a/sink.go b/sink.go index bc02c73..7fa3566 100644 --- a/sink.go +++ b/sink.go @@ -6,18 +6,18 @@ import ( "github.com/ghettovoice/gosip/sip/parser" ) -// Sink 国标级联转发流 +// Sink 级联/对讲/网关转发流Sink type Sink struct { GBModel SinkID string `json:"sink_id"` // 流媒体服务器中的sink id StreamID StreamID `json:"stream_id"` // 推流ID SinkStreamID StreamID `json:"sink_stream_id"` // 广播使用, 每个广播设备的唯一ID - Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded_forward/gb_talk_forward + Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded/gb_talk/gb_gateway Dialog *RequestWrapper `json:"dialog,omitempty"` CallID string `json:"call_id,omitempty"` ServerAddr string `json:"server_addr,omitempty"` // 级联上级地址 CreateTime int64 `json:"create_time"` - SetupType SetupType // 转发类型 + SetupType SetupType // 流转发类型 } // Close 关闭级联会话. 是否向上级发送bye请求, 是否通知流媒体服务器发送删除sink @@ -28,7 +28,7 @@ func (s *Sink) Close(bye, ms bool) { } if ms { - go CloseSink(string(s.StreamID), s.SinkID) + go MSCloseSink(string(s.StreamID), s.SinkID) } } @@ -51,7 +51,7 @@ func (s *Sink) MarshalJSON() ([]byte, error) { func (s *Sink) Bye() { if s.Dialog != nil && s.Dialog.Request != nil { byeRequest := CreateRequestFromDialog(s.Dialog.Request, sip.BYE) - go SipUA.SendRequest(byeRequest) + go SipStack.SendRequest(byeRequest) } } diff --git a/sink_manager.go b/sink_manager.go index 56c5f00..af77906 100644 --- a/sink_manager.go +++ b/sink_manager.go @@ -1,12 +1,50 @@ package main -func AddForwardSink(StreamID StreamID, sink *Sink) bool { - if err := SinkDao.SaveForwardSink(StreamID, sink); err != nil { - Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", StreamID, sink.SinkID, err.Error()) - return false +import ( + "github.com/ghettovoice/gosip/sip" + "net/http" + "net/url" +) + +func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sink, streamId StreamID, gbSdp *GBSDP, inviteType InviteType, attrs ...string) (sip.Response, error) { + urlParams := make(url.Values) + if TransStreamGBTalk == forwardType { + urlParams.Add("forward_type", "broadcast") + } else if TransStreamGBCascaded == forwardType { + urlParams.Add("forward_type", "cascaded") + } else if TransStreamGBGateway == forwardType { + urlParams.Add("forward_type", "gateway_1078") } - return true + ip, port, sinkID, err := MSAddForwardSink(forwardType, string(streamId), gbSdp.connectionAddr, gbSdp.offerSetup.String(), gbSdp.answerSetup.String(), gbSdp.ssrc, string(inviteType), urlParams) + if err != nil { + Sugar.Errorf("处理上级Invite失败,向流媒体服务添加转发Sink失败 err: %s", err.Error()) + if InviteTypePlay != inviteType { + CloseStream(streamId, true) + } + + return nil, err + } + + sink.SinkID = sinkID + // 创建answer + answer := BuildSDP(gbSdp.mediaType, user, gbSdp.sdp.Session, ip, port, gbSdp.startTime, gbSdp.stopTime, gbSdp.answerSetup.String(), gbSdp.speed, gbSdp.ssrc, attrs...) + response := CreateResponseWithStatusCode(request, http.StatusOK) + + // answer添加contact头域 + response.RemoveHeader("Contact") + response.AppendHeader(GlobalContactAddress.AsContactHeader()) + response.AppendHeader(&SDPMessageType) + response.SetBody(answer, true) + setToTag(response) + + sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source())) + + if err = SinkDao.SaveForwardSink(streamId, sink); err != nil { + Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error()) + } + + return response, nil } func RemoveForwardSink(StreamID StreamID, sinkID string) *Sink { diff --git a/sip_handler.go b/sip_handler.go index 00f9bab..645735b 100644 --- a/sip_handler.go +++ b/sip_handler.go @@ -104,6 +104,14 @@ func (e *EventHandler) OnCatalog(device string, response *CatalogResponse) { } } +func GetTypeCode(id string) string { + if len(id) != 20 { + return "" + } + + return id[10:13] +} + func (e *EventHandler) OnRecord(device string, response *QueryRecordInfoResponse) { event := SNManager.FindEvent(response.SN) if event == nil { diff --git a/sip_server.go b/sip_server.go index 6fefdc4..bec4e66 100644 --- a/sip_server.go +++ b/sip_server.go @@ -64,6 +64,13 @@ type sipServer struct { handler EventHandler } +type SipRequestSource struct { + req sip.Request + tx sip.ServerTransaction + fromCascade bool + fromJt bool +} + func (s *sipServer) Send(msg sip.Message) error { return s.sip.Send(msg) } @@ -74,39 +81,39 @@ func setToTag(response sip.Message) { to.Params = sip.NewParams().Add("tag", sip.String{Str: util.RandString(10)}) } -func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction, parent bool) { +func (s *sipServer) OnRegister(wrapper *SipRequestSource) { var device GBDevice var queryCatalog bool - fromHeaders := req.GetHeaders("From") + fromHeaders := wrapper.req.GetHeaders("From") if len(fromHeaders) == 0 { - Sugar.Errorf("not find From header. message: %s", req.String()) + Sugar.Errorf("not find From header. message: %s", wrapper.req.String()) return } - _ = req.GetHeaders("Authorization") + _ = wrapper.req.GetHeaders("Authorization") fromHeader := fromHeaders[0].(*sip.FromHeader) - expiresHeader := req.GetHeaders("Expires") + expiresHeader := wrapper.req.GetHeaders("Expires") - response := sip.NewResponseFromRequest("", req, 200, "OK", "") + response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "") id := fromHeader.Address.User().String() if len(expiresHeader) > 0 && "0" == expiresHeader[0].Value() { Sugar.Infof("设备注销 Device: %s", id) s.handler.OnUnregister(id) } else /*if authorizationHeader == nil*/ { var expires int - expires, device, queryCatalog = s.handler.OnRegister(id, req.Transport(), req.Source()) + expires, device, queryCatalog = s.handler.OnRegister(id, wrapper.req.Transport(), wrapper.req.Source()) if device != nil { - Sugar.Infof("注册成功 Device: %s addr: %s", id, req.Source()) + Sugar.Infof("注册成功 Device: %s addr: %s", id, wrapper.req.Source()) expiresHeader := sip.Expires(expires) response.AppendHeader(&expiresHeader) } else { Sugar.Infof("注册失败 Device: %s", id) - response = sip.NewResponseFromRequest("", req, 401, "Unauthorized", "") + response = sip.NewResponseFromRequest("", wrapper.req, 401, "Unauthorized", "") } } - SendResponse(tx, response) + SendResponse(wrapper.tx, response) if device != nil { // 查询设备信息 @@ -119,9 +126,9 @@ func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction, parent } // OnInvite 收到上级预览/下级设备广播请求 -func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction, parent bool) { - SendResponse(tx, sip.NewResponseFromRequest("", req, 100, "Trying", "")) - user := req.Recipient().User().String() +func (s *sipServer) OnInvite(wrapper *SipRequestSource) { + SendResponse(wrapper.tx, sip.NewResponseFromRequest("", wrapper.req, 100, "Trying", "")) + user := wrapper.req.Recipient().User().String() //if len(user) != 20 { // SendResponseWithStatusCode(req, tx, http.StatusNotFound) @@ -130,43 +137,52 @@ func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction, parent b // 查找对应的设备 var device GBDevice - if parent { + if wrapper.fromCascade { // 级联设备 - device = PlatformManager.Find(req.Source()) - } else if session := Dialogs.Find(user); session != nil { - // 语音广播设备 - device, _ = DeviceDao.QueryDevice(session.data.(*Sink).SinkStreamID.DeviceID()) + device = PlatformManager.Find(wrapper.req.Source()) + } else if wrapper.fromJt { + // 部标设备 + // 1. 根据通道查找到对应的设备ID + // 2. 根据Subject头域查找对应的设备ID + if channels, _ := ChannelDao.QueryChannelsByChannelID(user); len(channels) > 0 { + device = JTDeviceManager.Find(channels[0].RootID) + } } else { - // 根据Subject头域查找设备 - headers := req.GetHeaders("Subject") - if len(headers) > 0 { - subject := headers[0].(*sip.GenericHeader) - split := strings.Split(strings.Split(subject.Value(), ",")[0], ":") - if len(split) > 1 { - device, _ = DeviceDao.QueryDevice(split[1]) + if session := EarlyDialogs.Find(user); session != nil { + // 语音广播设备 + device, _ = DeviceDao.QueryDevice(session.data.(*Sink).SinkStreamID.DeviceID()) + } else { + // 根据Subject头域查找设备 + headers := wrapper.req.GetHeaders("Subject") + if len(headers) > 0 { + subject := headers[0].(*sip.GenericHeader) + split := strings.Split(strings.Split(subject.Value(), ",")[0], ":") + if len(split) > 1 { + device, _ = DeviceDao.QueryDevice(split[1]) + } } } } if device == nil { - logger.Error("处理Invite失败, 找不到设备. request: %s", req.String()) + logger.Error("处理Invite失败, 找不到设备. request: %s", wrapper.req.String()) - SendResponseWithStatusCode(req, tx, http.StatusNotFound) + SendResponseWithStatusCode(wrapper.req, wrapper.tx, http.StatusNotFound) } else { - response := device.OnInvite(req, user) - SendResponse(tx, response) + response := device.OnInvite(wrapper.req, user) + SendResponse(wrapper.tx, response) } } -func (s *sipServer) OnAck(req sip.Request, tx sip.ServerTransaction, parent bool) { +func (s *sipServer) OnAck(wrapper *SipRequestSource) { } -func (s *sipServer) OnBye(req sip.Request, tx sip.ServerTransaction, parent bool) { - response := sip.NewResponseFromRequest("", req, 200, "OK", "") - SendResponse(tx, response) +func (s *sipServer) OnBye(wrapper *SipRequestSource) { + response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "") + SendResponse(wrapper.tx, response) - id, _ := req.CallID() + id, _ := wrapper.req.CallID() var deviceId string if stream, _ := StreamDao.DeleteStreamByCallID(id.Value()); stream != nil { @@ -177,48 +193,53 @@ func (s *sipServer) OnBye(req sip.Request, tx sip.ServerTransaction, parent bool sink.Close(false, true) } - if parent { - // 上级设备挂断 - if platform := PlatformManager.Find(req.Source()); platform != nil { - platform.OnBye(req) + if wrapper.fromCascade { + // 级联上级挂断 + if platform := PlatformManager.Find(wrapper.req.Source()); platform != nil { + platform.OnBye(wrapper.req) + } + } else if wrapper.fromJt { + // 部标设备挂断 + if jtDevice := JTDeviceManager.Find(deviceId); jtDevice != nil { + jtDevice.OnBye(wrapper.req) } } else if device, _ := DeviceDao.QueryDevice(deviceId); device != nil { - device.OnBye(req) + device.OnBye(wrapper.req) } } -func (s *sipServer) OnNotify(req sip.Request, tx sip.ServerTransaction, parent bool) { - response := sip.NewResponseFromRequest("", req, 200, "OK", "") - SendResponse(tx, response) +func (s *sipServer) OnNotify(wrapper *SipRequestSource) { + response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "") + SendResponse(wrapper.tx, response) mobilePosition := MobilePositionNotify{} - if err := DecodeXML([]byte(req.Body()), &mobilePosition); err != nil { - Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), req.String()) + if err := DecodeXML([]byte(wrapper.req.Body()), &mobilePosition); err != nil { + Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), wrapper.req.String()) return } s.handler.OnNotifyPosition(&mobilePosition) } -func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent bool) { +func (s *sipServer) OnMessage(wrapper *SipRequestSource) { var ok bool defer func() { var response sip.Response if ok { - response = CreateResponseWithStatusCode(req, http.StatusOK) + response = CreateResponseWithStatusCode(wrapper.req, http.StatusOK) } else { - response = CreateResponseWithStatusCode(req, http.StatusForbidden) + response = CreateResponseWithStatusCode(wrapper.req, http.StatusForbidden) } - SendResponse(tx, response) + SendResponse(wrapper.tx, response) }() - body := req.Body() + body := wrapper.req.Body() xmlName := GetRootElementName(body) cmd := GetCmdType(body) src, ok := s.xmlReflectTypes[xmlName+"."+cmd] if !ok { - Sugar.Errorf("处理XML消息失败, 找不到结构体. request: %s", req.String()) + Sugar.Errorf("处理XML消息失败, 找不到结构体. request: %s", wrapper.req.String()) return } @@ -232,7 +253,7 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent deviceId := message.(BaseMessageGetter).GetDeviceID() if CmdBroadcast == cmd { // 广播消息 - from, _ := req.From() + from, _ := wrapper.req.From() deviceId = from.Address.User().String() } @@ -241,9 +262,15 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent break case XmlNameQuery: // 被上级查询 - device := PlatformManager.Find(req.Source()) + var device GBClient + if wrapper.fromCascade { + device = PlatformManager.Find(wrapper.req.Source()) + } else if wrapper.fromJt { + device = JTDeviceManager.Find(deviceId) + } + if ok = device != nil; !ok { - Sugar.Errorf("处理上级请求消息失败, 找不到级联设备 addr: %s request: %s", req.Source(), req.String()) + Sugar.Errorf("处理上级请求消息失败, 找不到级联设备 addr: %s request: %s", wrapper.req.Source(), wrapper.req.String()) return } @@ -253,13 +280,15 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent var channels []*Channel // 查询出所有通道 - if PlatformDao != nil { - result, err := PlatformDao.QueryPlatformChannels(device.ServerAddr) + if wrapper.fromCascade { + result, err := PlatformDao.QueryPlatformChannels(device.GetDomain()) if err != nil { Sugar.Errorf("查询设备通道列表失败 err: %s device: %s", err.Error(), device.GetID()) } channels = result + } else if wrapper.fromJt { + channels, _ = ChannelDao.QueryChannelsByRootID(device.GetID()) } else { // 从模拟多个国标客户端中查找 channels = DeviceChannelsManager.FindChannels(device.GetID()) @@ -272,7 +301,7 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent case XmlNameNotify: if CmdKeepalive == cmd { // 下级设备心跳通知 - ok = s.handler.OnKeepAlive(deviceId, req.Source()) + ok = s.handler.OnKeepAlive(deviceId, wrapper.req.Source()) } break @@ -332,22 +361,28 @@ func (s *sipServer) ListenAddr() string { } // 过滤SIP消息、超找消息来源 -func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool)) gosip.RequestHandler { +func filterRequest(f func(wrapper *SipRequestSource)) gosip.RequestHandler { return func(req sip.Request, tx sip.ServerTransaction) { source := req.Source() + // 是否是级联上级下发的请求 platform := PlatformManager.Find(source) + // 是否是部标设备上级下发的请求 + var fromJt bool + if platform == nil { + fromJt = JTDeviceManager.ExistClientByServerAddr(req.Source()) + } switch req.Method() { case sip.SUBSCRIBE, sip.INFO: - if platform == nil { - // SUBSCRIBE/INFO只能上级发起 + if platform == nil || fromJt { + // SUBSCRIBE/INFO只能本级域向下级发起 SendResponseWithStatusCode(req, tx, http.StatusBadRequest) Sugar.Errorf("处理%s请求失败, %s消息只能上级发起. request: %s", req.Method(), req.Method(), req.String()) return } break case sip.NOTIFY, sip.REGISTER: - if platform != nil { + if platform != nil || fromJt { // NOTIFY和REGISTER只能下级发起 SendResponseWithStatusCode(req, tx, http.StatusBadRequest) Sugar.Errorf("处理%s请求失败, %s消息只能下级发起. request: %s", req.Method(), req.Method(), req.String()) @@ -356,13 +391,19 @@ func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool break } - f(req, tx, platform != nil) + f(&SipRequestSource{ + req, + tx, + platform != nil, + fromJt, + }) } } func StartSipServer(id, listenIP, publicIP string, listenPort int) (SipServer, error) { ua := gosip.NewServer(gosip.ServerConfig{ - Host: publicIP, + Host: publicIP, + UserAgent: "github/lkmio", }, nil, nil, logger) addr := net.JoinHostPort(listenIP, strconv.Itoa(listenPort)) @@ -392,11 +433,11 @@ func StartSipServer(id, listenIP, publicIP string, listenPort int) (SipServer, e utils.Assert(ua.OnRequest(sip.NOTIFY, filterRequest(server.OnNotify)) == nil) utils.Assert(ua.OnRequest(sip.MESSAGE, filterRequest(server.OnMessage)) == nil) - utils.Assert(ua.OnRequest(sip.INFO, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) { + utils.Assert(ua.OnRequest(sip.INFO, filterRequest(func(wrapper *SipRequestSource) { })) == nil) - utils.Assert(ua.OnRequest(sip.CANCEL, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) { + utils.Assert(ua.OnRequest(sip.CANCEL, filterRequest(func(wrapper *SipRequestSource) { })) == nil) - utils.Assert(ua.OnRequest(sip.SUBSCRIBE, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) { + utils.Assert(ua.OnRequest(sip.SUBSCRIBE, filterRequest(func(wrapper *SipRequestSource) { })) == nil) server.listenAddr = addr diff --git a/sip_client.go b/sip_ua.go similarity index 87% rename from sip_client.go rename to sip_ua.go index cf8de60..579f01e 100644 --- a/sip_client.go +++ b/sip_ua.go @@ -25,7 +25,7 @@ var ( UnregisterExpiresHeader = sip.Expires(0) ) -type SipClient interface { +type SIPUA interface { doRegister(request sip.Request) bool doUnregister() @@ -37,10 +37,12 @@ type SipClient interface { Stop() SetOnRegisterHandler(online, offline func()) + + GetDomain() string } -type SIPUAParams struct { - GBModel +type SIPUAOptions struct { + Name string `json:"name"` // display name, 国标DeviceInfo消息中的Name Username string `json:"username"` // 用户名 SeverID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复. ServerAddr string `json:"server_addr"` // 上级地址, 必选 @@ -51,17 +53,13 @@ type SIPUAParams struct { Status OnlineStatus `json:"status"` // 在线状态 } -func (g *SIPUAParams) TableName() string { - return "lkm_virtual_device" -} - -type sipClient struct { - SIPUAParams +type sipUA struct { + SIPUAOptions ListenAddr string //UA的监听地址 NatAddr string //Nat地址 - ua SipServer + stack SipServer exited bool ctx context.Context cancel context.CancelFunc @@ -74,7 +72,7 @@ type sipClient struct { offlineCB func() } -func (g *sipClient) doRegister(request sip.Request) bool { +func (g *sipUA) doRegister(request sip.Request) bool { hop, _ := request.ViaHop() empty := sip.String{} hop.Params.Add("rport", &empty) @@ -82,7 +80,7 @@ func (g *sipClient) doRegister(request sip.Request) bool { for i := 0; i < 2; i++ { //发起注册, 第一次未携带授权头, 第二次携带授权头 - clientTransaction := g.ua.SendRequest(request) + clientTransaction := g.stack.SendRequest(request) //等待响应 responses := clientTransaction.Responses() @@ -118,7 +116,7 @@ func (g *sipClient) doRegister(request sip.Request) bool { return false } -func (g *sipClient) startNewRegister() bool { +func (g *sipUA) startNewRegister() bool { builder := NewRequestBuilder(sip.REGISTER, g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport) expires := sip.Expires(g.RegisterExpires) builder.SetExpires(&expires) @@ -159,30 +157,30 @@ func CopySipRequest(old sip.Request) sip.Request { return request } -func (g *sipClient) refreshRegister() bool { +func (g *sipUA) refreshRegister() bool { request := CopySipRequest(g.registerOKRequest) return g.doRegister(request) } -func (g *sipClient) doUnregister() { +func (g *sipUA) doUnregister() { request := CopySipRequest(g.registerOKRequest) request.RemoveHeader("Expires") request.AppendHeader(&UnregisterExpiresHeader) - g.ua.SendRequest(request) + g.stack.SendRequest(request) if g.offlineCB != nil { go g.offlineCB() } } -func (g *sipClient) doKeepalive() bool { +func (g *sipUA) doKeepalive() bool { body := fmt.Sprintf(KeepAliveBody, time.Now().UnixMilli()/1000, g.Username) request, err := BuildMessageRequest(g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport, body) if err != nil { panic(err) } - transaction := g.ua.SendRequest(request) + transaction := g.stack.SendRequest(request) responses := transaction.Responses() var response sip.Response @@ -197,7 +195,7 @@ func (g *sipClient) doKeepalive() bool { } // IsExpires 是否临近注册有效期 -func (g *sipClient) IsExpires() (bool, int) { +func (g *sipUA) IsExpires() (bool, int) { if !g.registerOK { return false, 0 } @@ -207,7 +205,7 @@ func (g *sipClient) IsExpires() (bool, int) { } // Refresh 处理Client的生命周期任务, 发起注册, 发送心跳,断开重连等, 并返回下次刷新任务时间 -func (g *sipClient) Refresh() time.Duration { +func (g *sipUA) Refresh() time.Duration { expires, _ := g.IsExpires() if !g.registerOK || expires { @@ -256,7 +254,7 @@ func (g *sipClient) Refresh() time.Duration { return time.Duration(g.KeepaliveInterval) * time.Second } -func (g *sipClient) Start() { +func (g *sipUA) Start() { utils.Assert(!g.exited) g.ctx, g.cancel = context.WithCancel(context.Background()) @@ -284,21 +282,24 @@ func (g *sipClient) Start() { }() } -func (g *sipClient) Stop() { +func (g *sipUA) Stop() { utils.Assert(!g.exited) + if g.registerOK { + g.doUnregister() + } g.exited = true g.cancel() g.registerOK = false g.onlineCB = nil g.offlineCB = nil - - if g.registerOK { - g.doUnregister() - } } -func (g *sipClient) SetOnRegisterHandler(online, offline func()) { +func (g *sipUA) SetOnRegisterHandler(online, offline func()) { g.onlineCB = online g.offlineCB = offline } + +func (g *sipUA) GetDomain() string { + return g.ServerAddr +} diff --git a/stream.go b/stream.go index 7244418..ef047f5 100644 --- a/stream.go +++ b/stream.go @@ -34,6 +34,15 @@ func (s SetupType) String() string { panic("invalid setup type") } +func (s SetupType) MediaProtocol() string { + switch s { + case SetupTypePassive, SetupTypeActive: + return "TCP/RTP/AVP" + default: + return "RTP/AVP" + } +} + // RequestWrapper sql序列化 type RequestWrapper struct { sip.Request @@ -71,7 +80,7 @@ func (r *RequestWrapper) Scan(value interface{}) error { type Stream struct { GBModel StreamID StreamID `json:"stream_id"` // 流ID - Protocol string `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk + Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk Dialog *RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话 SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发) SetupType SetupType @@ -158,7 +167,7 @@ func (s *Stream) Close(bye, ms bool) { if ms { // 告知媒体服务释放source - go CloseSource(string(s.StreamID)) + go MSCloseSource(string(s.StreamID)) } // 关闭所转发会话 @@ -170,7 +179,7 @@ func (s *Stream) Close(bye, ms bool) { func (s *Stream) Bye() { if s.Dialog != nil && s.Dialog.Request != nil { - go SipUA.SendRequest(s.CreateRequestFromDialog(sip.BYE)) + go SipStack.SendRequest(s.CreateRequestFromDialog(sip.BYE)) s.Dialog = nil } } diff --git a/xml.go b/xml.go index 2cb6496..5c31bce 100644 --- a/xml.go +++ b/xml.go @@ -17,8 +17,8 @@ type Channel struct { GBModel // RootID 是设备的根ID, 用于查询设备的所有通道. - RootID string `json:"-" xml:"-" gorm:"index"` // 根设备ID - TypeCode int `json:"-" xml:"-" gorm:"index"` // 设备类型编码 + RootID string `json:"root_id" xml:"-" gorm:"index"` // 根设备ID + TypeCode int `json:"-" xml:"-" gorm:"index"` // 设备类型编码 // 所在组ID. 扩展的数据库字段, 方便查询某个目录下的设备列表. // 如果ParentID不为空, ParentID作为组ID, 如果ParentID为空, BusinessGroupID作为组ID. @@ -49,6 +49,7 @@ type Channel struct { Longitude string `json:"longitude" xml:"Longitude,omitempty"` Latitude string `json:"latitude" xml:"Latitude,omitempty"` SetupType SetupType `json:"setup_type,omitempty"` + ChannelNumber int `json:"channel_number" xml:"-"` // 对应1078的通道号 } func (d *Channel) Online() bool { diff --git a/xml_record.go b/xml_record.go index 15ef228..ab1f84e 100644 --- a/xml_record.go +++ b/xml_record.go @@ -63,6 +63,6 @@ type RecordInfo struct { func (d *Device) DoQueryRecordList(channelId, startTime, endTime string, sn int, type_ string) error { body := fmt.Sprintf(QueryRecordFormat, sn, channelId, startTime, endTime, type_) request := d.BuildMessageRequest(channelId, body) - SipUA.SendRequest(request) + SipStack.SendRequest(request) return nil }