feat: 支持1078转GB28181

This commit is contained in:
ydajiang
2025-05-31 21:10:04 +08:00
parent 42649cdd21
commit 19ab406edd
32 changed files with 1174 additions and 638 deletions

194
api.go
View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"gb-cms/hook"
"github.com/ghettovoice/gosip/sip"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
@@ -31,7 +32,7 @@ type InviteParams struct {
type StreamParams struct {
Stream StreamID `json:"stream"` // Source
Protocol string `json:"protocol"` // 推拉流协议
Protocol int `json:"protocol"` // 推拉流协议
RemoteAddr string `json:"remote_addr"` // peer地址
}
@@ -161,8 +162,8 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) // 云台控制
apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 级联设备列表
apiServer.router.HandleFunc("/api/v1/platform/add", withJsonResponse(apiServer.OnPlatformAdd, &SIPUAParams{})) // 添加级联设备
apiServer.router.HandleFunc("/api/v1/platform/remove", withJsonResponse(apiServer.OnPlatformRemove, &SIPUAParams{})) // 删除级联设备
apiServer.router.HandleFunc("/api/v1/platform/add", withJsonResponse(apiServer.OnPlatformAdd, &PlatformModel{})) // 添加级联设备
apiServer.router.HandleFunc("/api/v1/platform/remove", withJsonResponse(apiServer.OnPlatformRemove, &PlatformModel{})) // 删除级联设备
apiServer.router.HandleFunc("/api/v1/platform/channel/bind", withJsonResponse(apiServer.OnPlatformChannelBind, &PlatformChannel{})) // 级联绑定通道
apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", withJsonResponse(apiServer.OnPlatformChannelUnbind, &PlatformChannel{})) // 级联解绑通道
@@ -170,6 +171,14 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/broadcast/hangup", withJsonResponse(apiServer.OnHangup, &BroadcastParams{})) // 挂断广播会话
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲
apiServer.router.HandleFunc("/api/v1/jt/device/add", withJsonResponse(apiServer.OnVirtualDeviceAdd, &JTDeviceModel{}))
apiServer.router.HandleFunc("/api/v1/jt/device/edit", withJsonResponse(apiServer.OnVirtualDeviceEdit, &JTDeviceModel{}))
apiServer.router.HandleFunc("/api/v1/jt/device/remove", withJsonResponse(apiServer.OnVirtualDeviceRemove, &JTDeviceModel{}))
apiServer.router.HandleFunc("/api/v1/jt/channel/add", withJsonResponse(apiServer.OnVirtualChannelAdd, &Channel{}))
apiServer.router.HandleFunc("/api/v1/jt/channel/edit", withJsonResponse(apiServer.OnVirtualChannelEdit, &Channel{}))
apiServer.router.HandleFunc("/api/v1/jt/channel/remove", withJsonResponse(apiServer.OnVirtualChannelRemove, &Channel{}))
http.Handle("/", apiServer.router)
srv := &http.Server{
@@ -201,9 +210,13 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive"&"stream_type=playback"&"start_time=2024-06-18T15:20:56"&"end_time=2024-06-18T15:25:56
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive&stream_type=playback&start_time=2024-06-18T15:20:56&end_time=2024-06-18T15:25:56
// 拉流地址携带的参数
query := r.URL.Query()
jtSource := query.Get("forward_type") == "gateway_1078"
// 跳过非国标拉流
sourceStream := strings.Split(string(params.Stream), "/")
if len(sourceStream) != 2 || len(sourceStream[0]) != 20 || len(sourceStream[1]) < 20 {
if !jtSource && (len(sourceStream) != 2 || len(sourceStream[0]) != 20 || len(sourceStream[1]) < 20) {
Sugar.Infof("跳过非国标拉流 stream: %s", params.Stream)
return
}
@@ -211,6 +224,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
// 已经存在,累加计数
if stream, _ := StreamDao.QueryStream(params.Stream); stream != nil {
stream.IncreaseSinkCount()
return
}
deviceId := sourceStream[0]
@@ -219,35 +233,52 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
channelId = channelId[:20]
}
// 发起invite的参数
query := r.URL.Query()
inviteParams := &InviteParams{
DeviceID: deviceId,
ChannelID: channelId,
StartTime: query.Get("start_time"),
EndTime: query.Get("end_time"),
Setup: strings.ToLower(query.Get("setup")),
Speed: query.Get("speed"),
streamId: params.Stream,
}
var code int
var stream *Stream
var err error
streamType := strings.ToLower(query.Get("stream_type"))
if "playback" == streamType {
code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
} else if "download" == streamType {
code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r)
} else {
code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
}
// 通知1078信令服务器
if jtSource {
if len(sourceStream) != 2 {
code = http.StatusBadRequest
Sugar.Errorf("1078信令服务器转发请求参数错误")
return
}
if err != nil {
Sugar.Errorf("请求流失败 err: %s", err.Error())
utils.Assert(http.StatusOK != code)
} else if http.StatusOK == code {
stream.IncreaseSinkCount()
simNumber := sourceStream[0]
channelNumber := sourceStream[1]
response, err := hook.PostOnInviteEvent(simNumber, channelNumber)
if err != nil {
code = http.StatusInternalServerError
Sugar.Errorf("通知1078信令服务器失败 err: %s sim number: %s channel number: %s", err.Error(), simNumber, channelNumber)
} else if code = response.StatusCode; code != http.StatusOK {
Sugar.Errorf("通知1078信令服务器失败. 响应状态码: %d sim number: %s channel number: %s", response.StatusCode, simNumber, channelNumber)
}
} else {
inviteParams := &InviteParams{
DeviceID: deviceId,
ChannelID: channelId,
StartTime: query.Get("start_time"),
EndTime: query.Get("end_time"),
Setup: strings.ToLower(query.Get("setup")),
Speed: query.Get("speed"),
streamId: params.Stream,
}
var stream *Stream
var err error
streamType := strings.ToLower(query.Get("stream_type"))
if "playback" == streamType {
code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
} else if "download" == streamType {
code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r)
} else {
code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
}
if err != nil {
Sugar.Errorf("请求流失败 err: %s", err.Error())
utils.Assert(http.StatusOK != code)
} else if http.StatusOK == code {
stream.IncreaseSinkCount()
}
}
w.WriteHeader(code)
@@ -256,63 +287,50 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
//stream := StreamManager.Find(params.StreamID)
//if stream == nil {
// Sugar.Errorf("处理播放结束事件失败, stream不存在. id: %s", params.StreamID)
// return
//}
//if 0 == stream.DecreaseSinkCount() && Config.AutoCloseOnIdle {
// CloseStream(params.StreamID, true)
//}
if !strings.HasPrefix(params.Protocol, "gb") {
return
}
sink := RemoveForwardSink(params.Stream, params.Sink)
if sink == nil {
Sugar.Errorf("处理转发结束事件失败, 找不到sink. stream: %s sink: %s", params.Stream, params.Sink)
return
}
// 级联断开连接, 向上级发送Bye请求
if params.Protocol == "gb_cascaded_forward" {
if params.Protocol == TransStreamGBCascaded {
if platform := PlatformManager.Find(sink.ServerAddr); platform != nil {
callID, _ := sink.Dialog.CallID()
platform.CloseStream(callID.Value(), true, false)
platform.(*Platform).CloseStream(callID.Value(), true, false)
}
} else if params.Protocol == "gb_talk_forward" {
// 对讲设备断开连接
} else {
sink.Close(true, false)
}
sink.Close(true, false)
}
func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("推流事件. protocol: %s stream: %s", params.Protocol, params.Stream)
stream := Dialogs.Find(string(params.Stream))
if SourceTypeRtmp == params.Protocol {
return
}
stream := EarlyDialogs.Find(string(params.Stream))
if stream != nil {
stream.Put(200)
}
// 对讲websocket已连接
// 创建stream
if "gb_talk" == params.Protocol {
if params.Protocol == SourceTypeGBTalk {
Sugar.Infof("对讲websocket已连接, stream: %s", params.Stream)
}
s := &Stream{
StreamID: params.Stream,
Protocol: params.Protocol,
}
s := &Stream{
StreamID: params.Stream,
Protocol: params.Protocol,
}
_, ok := StreamDao.SaveStream(s)
if !ok {
Sugar.Errorf("处理推流事件失败, stream已存在. id: %s", params.Stream)
w.WriteHeader(http.StatusBadRequest)
return
}
_, ok := StreamDao.SaveStream(s)
if !ok {
Sugar.Errorf("处理推流事件失败, stream已存在. id: %s", params.Stream)
w.WriteHeader(http.StatusBadRequest)
return
}
}
@@ -321,7 +339,7 @@ func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter,
CloseStream(params.Stream, false)
// 对讲websocket断开连接
if "gb_talk" == params.Protocol {
if SourceTypeGBTalk == params.Protocol {
}
}
@@ -330,7 +348,7 @@ func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter,
Sugar.Infof("推流空闲超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
// 非rtmp空闲超时, 返回非200应答, 删除会话
if params.Protocol != "rtmp" {
if SourceTypeRtmp != params.Protocol {
w.WriteHeader(http.StatusForbidden)
CloseStream(params.Stream, false)
}
@@ -340,7 +358,7 @@ func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWrit
Sugar.Infof("收流超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
// 非rtmp推流超时, 返回非200应答, 删除会话
if params.Protocol != "rtmp" {
if SourceTypeRtmp != params.Protocol {
w.WriteHeader(http.StatusForbidden)
CloseStream(params.Stream, false)
}
@@ -576,7 +594,7 @@ func (api *ApiServer) OnSeekPlayback(v *SeekParams, w http.ResponseWriter, r *ht
seekRequest.RemoveHeader(RtspMessageType.Name())
seekRequest.AppendHeader(&RtspMessageType)
SipUA.SendRequest(seekRequest)
SipStack.SendRequest(seekRequest)
return nil, nil
}
@@ -605,7 +623,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
defer func() {
if !ok {
if InviteSourceId != "" {
Dialogs.Remove(InviteSourceId)
EarlyDialogs.Remove(InviteSourceId)
}
if sinkStreamId != "" {
@@ -642,7 +660,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
sink := &Sink{
StreamID: v.StreamId,
SinkStreamID: sinkStreamId,
Protocol: "gb_talk_forward",
Protocol: "gb_talk",
CreateTime: time.Now().Unix(),
SetupType: setupType,
}
@@ -651,7 +669,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
if err := SinkDao.SaveForwardSink(v.StreamId, sink); err != nil {
Sugar.Errorf("广播失败, 设备正在广播中. stream: %s", sinkStreamId)
return nil, fmt.Errorf("设备正在广播中")
} else if _, ok = Dialogs.Add(InviteSourceId, streamWaiting); !ok {
} else if _, ok = EarlyDialogs.Add(InviteSourceId, streamWaiting); !ok {
Sugar.Errorf("广播失败, id冲突. id: %s", InviteSourceId)
return nil, fmt.Errorf("id冲突")
}
@@ -682,7 +700,8 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
Sugar.Errorf("广播失败, 下级设备invite失败. stream: %s", sinkStreamId)
return nil, fmt.Errorf("错误应答 code: %d", code)
} else {
ok = AddForwardSink(v.StreamId, sink)
//ok = AddForwardSink(v.StreamId, sink)
ok = true
}
break
case <-cancel.Done():
@@ -712,7 +731,7 @@ func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) {
}
}
func (api *ApiServer) OnPlatformAdd(v *SIPUAParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
func (api *ApiServer) OnPlatformAdd(v *PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
Sugar.Infof("添加级联设备 %v", *v)
if v.Username == "" {
@@ -731,27 +750,32 @@ func (api *ApiServer) OnPlatformAdd(v *SIPUAParams, w http.ResponseWriter, r *ht
}
v.Status = "OFF"
platform, err := NewGBPlatform(v, SipUA)
if err == nil {
err = AddPlatform(platform)
platform, err := NewPlatform(&v.SIPUAOptions, SipStack)
if err != nil {
Sugar.Errorf("创建级联设备失败 err: %s", err.Error())
return nil, err
}
if err == nil {
platform.Start()
if !PlatformManager.Add(v.ServerAddr, platform) {
Sugar.Errorf("ua添加失败, id冲突. key: %s", v.ServerAddr)
return fmt.Errorf("ua添加失败, id冲突. key: %s", v.ServerAddr), nil
} else if err = PlatformDao.SavePlatform(v); err != nil {
PlatformManager.Remove(v.ServerAddr)
Sugar.Errorf("保存级联设备失败 err: %s", err.Error())
return nil, err
}
platform.Start()
return nil, err
}
func (api *ApiServer) OnPlatformRemove(v *SIPUAParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
func (api *ApiServer) OnPlatformRemove(v *PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
Sugar.Infof("删除级联设备 %v", *v)
platform, err := RemovePlatform(v.ServerAddr)
err := PlatformDao.DeleteUAByAddr(v.ServerAddr)
if err != nil {
Sugar.Errorf("删除级联设备失败 err: %s", err.Error())
return nil, err
} else if platform != nil {
} else if platform := PlatformManager.Remove(v.ServerAddr); platform != nil {
platform.Stop()
}
@@ -759,8 +783,8 @@ func (api *ApiServer) OnPlatformRemove(v *SIPUAParams, w http.ResponseWriter, r
}
func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) {
platforms := LoadPlatforms()
httpResponseOK(w, platforms)
//platforms := LoadPlatforms()
//httpResponseOK(w, platforms)
}
func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) {

116
api_jt.go Normal file
View File

@@ -0,0 +1,116 @@
package main
import (
"fmt"
"net/http"
)
func (api *ApiServer) OnVirtualDeviceAdd(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
Sugar.Infof("add virtual device: %v", *device)
if len(device.Username) != 20 {
Sugar.Errorf("invalid username: %s", device.Username)
return nil, fmt.Errorf("invalid username: %s", device.Username)
} else if len(device.SeverID) != 20 {
Sugar.Errorf("invalid server id: %s", device.SeverID)
return nil, fmt.Errorf("invalid server id: %s", device.SeverID)
} else if device.SimNumber == "" {
// sim卡号必选项
Sugar.Errorf("sim number is required")
return nil, fmt.Errorf("sim number is required")
}
if JTDeviceDao.ExistDevice(device.Username, device.SimNumber) {
// 用户名或sim卡号已存在
Sugar.Errorf("username or sim number already exists")
return nil, fmt.Errorf("username or sim number already exists")
} else if DeviceDao.ExistDevice(device.Username) {
// 用户名与下级设备冲突
Sugar.Errorf("username already exists")
return nil, fmt.Errorf("username already exists")
}
jtDevice, err := NewJTDevice(device, SipStack)
if err != nil {
Sugar.Errorf("create virtual device failed: %s", err.Error())
return nil, err
}
if !JTDeviceManager.Add(device.Username, jtDevice) {
return nil, fmt.Errorf("ua添加失败, id冲突. key: %s", device.Username)
} else if err = JTDeviceDao.SaveDevice(device); err != nil {
JTDeviceManager.Remove(device.Username)
Sugar.Errorf("save device failed: %s", err.Error())
return nil, err
}
jtDevice.Start()
if err != nil {
Sugar.Errorf("add jt device failed: %s", err.Error())
return nil, err
}
return nil, nil
}
func (api *ApiServer) OnVirtualDeviceEdit(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
return nil, nil
}
func (api *ApiServer) OnVirtualDeviceRemove(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
err := JTDeviceDao.DeleteDevice(device.Username)
if err != nil {
return nil, err
} else if client := JTDeviceManager.Remove(device.Username); client != nil {
client.Stop()
}
return nil, nil
}
func (api *ApiServer) OnVirtualChannelAdd(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
Sugar.Infof("add virtual channel: %v", *channel)
device, err := JTDeviceDao.QueryDevice(channel.RootID)
if err != nil {
Sugar.Errorf("query jt device failed: %s device: %s ", err.Error(), channel.RootID)
return nil, err
}
if len(channel.DeviceID) != 20 {
Sugar.Errorf("invalid channel id: %s", channel.DeviceID)
return nil, fmt.Errorf("invalid channel id: %s", channel.DeviceID)
}
channel.ParentID = device.Username
channel.RootID = device.Username
channel.GroupID = device.Username
err = ChannelDao.SaveJTChannel(channel)
if err != nil {
Sugar.Errorf("save channel failed: %s", err.Error())
}
return nil, err
}
func (api *ApiServer) OnVirtualChannelEdit(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
return nil, nil
}
func (api *ApiServer) OnVirtualChannelRemove(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
Sugar.Infof("remove virtual channel: %v", *channel)
device, err := JTDeviceDao.QueryDevice(channel.RootID)
if err != nil {
Sugar.Errorf("query jt device failed: %s device: %s ", err.Error(), channel.RootID)
return nil, err
}
err = ChannelDao.DeleteChannel(device.Username, channel.DeviceID)
if err != nil {
Sugar.Errorf("delete channel failed: %s", err.Error())
}
return nil, err
}

View File

@@ -2,12 +2,8 @@ package main
import (
"fmt"
"gb-cms/sdp"
"github.com/ghettovoice/gosip/sip"
"net"
"net/http"
"strconv"
"strings"
)
const (
@@ -18,107 +14,50 @@ const (
"<SourceID>%s</SourceID>\r\n" +
"<TargetID>%s</TargetID>\r\n" +
"</Notify>\r\n"
AnswerFormat = "v=0\r\n" +
"o=%s 0 0 IN IP4 %s\r\n" +
"s=Play\r\n" +
"c=IN IP4 %s\r\n" +
"t=0 0\r\n" +
"m=audio %d %s 8\r\n" +
"a=sendonly\r\n" +
"a=rtpmap:8 PCMA/8000\r\n"
)
func findSetup(descriptor *sdp.SDP) SetupType {
var tcp bool
if descriptor.Audio != nil {
tcp = strings.Contains(descriptor.Audio.Proto, "TCP")
}
if !tcp && descriptor.Video != nil {
tcp = strings.Contains(descriptor.Video.Proto, "TCP")
}
setup := SetupTypeUDP
if tcp {
for _, attr := range descriptor.Attrs {
if "setup" == attr[0] {
if SetupTypePassive.String() == attr[1] {
setup = SetupTypePassive
} else if SetupTypeActive.String() == attr[1] {
setup = SetupTypeActive
}
}
}
}
return setup
}
func (d *Device) DoBroadcast(sourceId, channelId string) error {
body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId)
request := d.BuildMessageRequest(channelId, body)
SipUA.SendRequest(request)
SipStack.SendRequest(request)
return nil
}
// OnInvite 语音广播
func (d *Device) OnInvite(request sip.Request, user string) sip.Response {
streamWaiting := Dialogs.Find(user)
// 会话是否存在
streamWaiting := EarlyDialogs.Find(user)
if streamWaiting == nil {
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
// 解析offer
sink := streamWaiting.data.(*Sink)
body := request.Body()
offer, err := sdp.Parse(body)
offer, err := ParseGBSDP(body)
if err != nil {
Sugar.Infof("广播失败, 解析sdp发生err: %s sink: %s sdp: %s", err.Error(), sink.SinkID, body)
streamWaiting.Put(http.StatusBadRequest)
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
} else if offer.Audio == nil {
} else if offer.media == nil {
Sugar.Infof("广播失败, offer中缺少audio字段. sink: %s sdp: %s", sink.SinkID, body)
streamWaiting.Put(http.StatusBadRequest)
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
// 通知流媒体服务器创建answer
offerSetup := findSetup(offer)
answerSetup := sink.SetupType
finalSetup := offerSetup
if answerSetup != offerSetup {
finalSetup = answerSetup
// http接口中设置的setup优先级高于sdp中的setup
if offer.answerSetup != sink.SetupType {
offer.answerSetup = sink.SetupType
}
addr := net.JoinHostPort(offer.Addr, strconv.Itoa(int(offer.Audio.Port)))
host, port, sinkId, err := CreateAnswer(string(sink.StreamID), addr, offerSetup.String(), answerSetup.String(), "", string(InviteTypeBroadcast))
response, err := AddForwardSink(TransStreamGBTalk, request, user, sink, sink.StreamID, offer, InviteTypeBroadcast, "8 PCMA/8000")
if err != nil {
Sugar.Errorf("广播失败, 流媒体创建answer发生err: %s sink: %s ", err.Error(), sink.SinkID)
streamWaiting.Put(http.StatusInternalServerError)
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
var answerSDP string
// UDP广播
if SetupTypeUDP == finalSetup {
answerSDP = fmt.Sprintf(AnswerFormat, Config.SipID, host, host, port, "RTP/AVP")
} else {
// TCP广播
answerSDP = fmt.Sprintf(AnswerFormat, Config.SipID, host, host, port, "TCP/RTP/AVP")
}
// 创建answer和dialog
response := CreateResponseWithStatusCode(request, http.StatusOK)
setToTag(response)
sink.SinkID = sinkId
sink.SetDialog(d.CreateDialogRequestFromAnswer(response, true))
response.SetBody(answerSDP, true)
response.AppendHeader(&SDPMessageType)
response.AppendHeader(GlobalContactAddress.AsContactHeader())
streamWaiting.Put(http.StatusOK)
return response
}

110
client.go
View File

@@ -2,14 +2,22 @@ package main
import (
"encoding/xml"
"fmt"
"gb-cms/sdp"
"github.com/ghettovoice/gosip/sip"
"strconv"
"strings"
)
const (
DefaultDomainName = "本域"
DefaultManufacturer = "github/lkmio"
DefaultModel = "gb-cms"
DefaultFirmware = "dev"
)
type GBClient interface {
SipClient
SIPUA
GBDevice
@@ -22,19 +30,21 @@ type GBClient interface {
OnQueryDeviceInfo(sn int)
OnSubscribeCatalog(sn int)
CloseStream(callId string, bye, ms bool)
}
type Client struct {
*sipClient
type gbClient struct {
*sipUA
Device
deviceInfo *DeviceInfoResponse
}
func (g *Client) OnQueryCatalog(sn int, channels []*Channel) {
func (g *gbClient) OnQueryCatalog(sn int, channels []*Channel) {
response := CatalogResponse{}
response.SN = sn
response.CmdType = CmdCatalog
response.DeviceID = g.sipClient.Username
response.DeviceID = g.sipUA.Username
response.SumNum = len(channels)
if response.SumNum < 1 {
@@ -48,60 +58,78 @@ func (g *Client) OnQueryCatalog(sn int, channels []*Channel) {
response.DeviceList.Devices = nil
response.DeviceList.Num = 1 // 一次发一个通道
response.DeviceList.Devices = append(response.DeviceList.Devices, &channel)
response.DeviceList.Devices[0].ParentID = g.sipClient.Username
response.DeviceList.Devices[0].ParentID = g.sipUA.Username
g.SendMessage(&response)
}
}
func (g *Client) SendMessage(msg interface{}) {
func (g *gbClient) SendMessage(msg interface{}) {
marshal, err := xml.MarshalIndent(msg, "", " ")
if err != nil {
panic(err)
}
request, err := BuildMessageRequest(g.sipClient.Username, g.sipClient.ListenAddr, g.sipClient.SeverID, g.sipClient.ServerAddr, g.sipClient.Transport, string(marshal))
request, err := BuildMessageRequest(g.sipUA.Username, g.sipUA.ListenAddr, g.sipUA.SeverID, g.sipUA.ServerAddr, g.sipUA.Transport, string(marshal))
if err != nil {
panic(err)
}
g.sipClient.ua.SendRequest(request)
g.sipUA.stack.SendRequest(request)
}
func (g *Client) OnQueryDeviceInfo(sn int) {
func (g *gbClient) OnQueryDeviceInfo(sn int) {
g.deviceInfo.SN = sn
g.SendMessage(&g.deviceInfo)
}
func (g *Client) OnInvite(request sip.Request, user string) sip.Response {
func (g *gbClient) OnInvite(request sip.Request, user string) sip.Response {
return nil
}
func (g *Client) SetDeviceInfo(name, manufacturer, model, firmware string) {
func (g *gbClient) SetDeviceInfo(name, manufacturer, model, firmware string) {
g.deviceInfo.DeviceName = name
g.deviceInfo.Manufacturer = manufacturer
g.deviceInfo.Model = model
g.deviceInfo.Firmware = firmware
}
func (g *Client) OnSubscribeCatalog(sn int) {
func (g *gbClient) OnSubscribeCatalog(sn int) {
}
func ParseGBSDP(body string) (offer *sdp.SDP, ssrc string, speed int, media *sdp.Media, offerSetup, answerSetup string, err error) {
offer, err = sdp.Parse(body)
func (g *gbClient) CloseStream(callId string, bye, ms bool) {
}
type GBSDP struct {
sdp *sdp.SDP
ssrc string
speed int
media *sdp.Media
mediaType string
offerSetup, answerSetup SetupType
startTime, stopTime string
connectionAddr string
isTcpTransport bool
}
func ParseGBSDP(body string) (*GBSDP, error) {
offer, err := sdp.Parse(body)
if err != nil {
return nil, "", 0, nil, "", "", err
return nil, err
}
gbSdp := &GBSDP{sdp: offer}
// 解析设置下载速度
var setup string
for _, attr := range offer.Attrs {
if "downloadspeed" == attr[0] {
speed, err = strconv.Atoi(attr[1])
speed, err := strconv.Atoi(attr[1])
if err != nil {
return nil, "", 0, nil, "", "", err
return nil, err
}
gbSdp.speed = speed
} else if "setup" == attr[0] {
setup = attr[1]
}
@@ -110,35 +138,51 @@ func ParseGBSDP(body string) (offer *sdp.SDP, ssrc string, speed int, media *sdp
// 解析ssrc
for _, attr := range offer.Other {
if "y" == attr[0] {
ssrc = attr[1]
gbSdp.ssrc = attr[1]
}
}
if offer.Video != nil {
media = offer.Video
gbSdp.media = offer.Video
gbSdp.mediaType = "video"
} else if offer.Audio != nil {
media = offer.Audio
gbSdp.media = offer.Audio
gbSdp.mediaType = "audio"
}
tcp := strings.HasPrefix(media.Proto, "TCP")
tcp := strings.HasPrefix(gbSdp.media.Proto, "TCP")
if "passive" == setup && tcp {
offerSetup = "passive"
answerSetup = "active"
gbSdp.offerSetup = SetupTypePassive
gbSdp.answerSetup = SetupTypeActive
} else if "active" == setup && tcp {
offerSetup = "active"
answerSetup = "passive"
gbSdp.offerSetup = SetupTypeActive
gbSdp.answerSetup = SetupTypePassive
}
return
time := strings.Split(gbSdp.sdp.Time, " ")
if len(time) < 2 {
return nil, fmt.Errorf("sdp的时间范围格式错误 time: %s sdp: %s", gbSdp.sdp.Time, body)
}
gbSdp.startTime = time[0]
gbSdp.stopTime = time[1]
gbSdp.isTcpTransport = tcp
gbSdp.connectionAddr = fmt.Sprintf("%s:%d", gbSdp.sdp.Addr, gbSdp.media.Port)
return gbSdp, nil
}
func NewGBClient(params *SIPUAParams, ua SipServer) GBClient {
sip := &sipClient{
SIPUAParams: *params,
ListenAddr: ua.ListenAddr(),
ua: ua,
func NewGBClient(params *SIPUAOptions, stack SipServer) GBClient {
ua := &sipUA{
SIPUAOptions: *params,
ListenAddr: stack.ListenAddr(),
stack: stack,
}
client := &Client{sip, Device{DeviceID: params.Username}, &DeviceInfoResponse{BaseResponse: BaseResponse{BaseMessage: BaseMessage{DeviceID: params.Username, CmdType: CmdDeviceInfo}, Result: "OK"}}}
// 心跳间隔最低10秒
if ua.SIPUAOptions.KeepaliveInterval < 10 {
ua.SIPUAOptions.KeepaliveInterval = 10
}
client := &gbClient{ua, Device{DeviceID: params.Username}, &DeviceInfoResponse{BaseResponse: BaseResponse{BaseMessage: BaseMessage{DeviceID: params.Username, CmdType: CmdDeviceInfo}, Result: "OK"}}}
return client
}

View File

@@ -100,8 +100,8 @@ package main
// m.Close(true)
//}
//
//type VirtualDevice struct {
// *Client
//type Platform struct {
// *gbClient
// streams map[string]*MediaStream
// lock sync.Locker
//}
@@ -126,7 +126,7 @@ package main
// }
//}
//
//func (v VirtualDevice) OnInvite(request sip.Request, user string) sip.Response {
//func (v Platform) OnInvite(request sip.Request, user string) sip.Response {
// if len(rtpPackets) < 1 {
// return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
// }
@@ -150,10 +150,10 @@ package main
// var ip string
// var port sip.Port
// var contactAddr string
// if v.sipClient.NatAddr != "" {
// contactAddr = v.sipClient.NatAddr
// if v.sipUA.NatAddr != "" {
// contactAddr = v.sipUA.NatAddr
// } else {
// contactAddr = v.sipClient.ListenAddr
// contactAddr = v.sipUA.ListenAddr
// }
//
// host, p, _ := net.SplitHostPort(contactAddr)
@@ -180,7 +180,7 @@ package main
// i, _ := strconv.Atoi(ssrc)
// stream.ssrc = uint32(i)
// stream.tcp = tcp
// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipClient.Domain)
// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipUA.Domain)
// callId, _ := response.CallID()
//
// {
@@ -203,7 +203,7 @@ package main
//
// if sendBye {
// bye := CreateRequestFromDialog(stream.dialog, sip.BYE)
// v.sipClient.ua.SendRequest(bye)
// v.sipUA.stack.SendRequest(bye)
// }
//
// stream.dialog = nil
@@ -219,7 +219,7 @@ package main
// stream.Start()
//
// // 绑定到StreamManager, bye请求才会找到设备回调
// streamId := GenerateStreamID(InviteTypePlay, v.sipClient.Username, user, "", "")
// streamId := GenerateStreamID(InviteTypePlay, v.sipUA.Username, user, "", "")
// s := StreamID{StreamID: streamId, Dialog: stream.dialog}
// StreamManager.Add(&s)
//
@@ -228,7 +228,7 @@ package main
// return response
//}
//
//func (v VirtualDevice) OnBye(request sip.Request) {
//func (v Platform) OnBye(request sip.Request) {
// id, _ := request.CallID()
// stream, ok := v.streams[id.Value()]
// if !ok {
@@ -245,7 +245,7 @@ package main
// stream.Close(false)
//}
//
//func (v VirtualDevice) Offline() {
//func (v Platform) Offline() {
// for _, stream := range v.streams {
// stream.Close(true)
// }
@@ -333,7 +333,7 @@ package main
// channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1)
// client := NewGBClient(deviceId, clientConfig.ServerAddr, clientConfig.Domain, "UDP", clientConfig.Password, 500, 40, server)
//
// device := VirtualDevice{client.(*Client), map[string]*MediaStream{}, &sync.Mutex{}}
// device := Platform{client.(*gbClient), map[string]*MediaStream{}, &sync.Mutex{}}
// device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1")
//
// channel := &Channel{

94
client_manager.go Normal file
View File

@@ -0,0 +1,94 @@
package main
import (
"sync"
)
var (
// PlatformManager 管理级联设备
PlatformManager = &ClientManager{
clients: make(map[string]GBClient, 8), // server addr->client
addrMap: make(map[string]int, 8),
}
// JTDeviceManager 管理1078设备
JTDeviceManager = &ClientManager{
clients: make(map[string]GBClient, 8), // username->client
addrMap: make(map[string]int, 8),
}
)
type ClientManager struct {
clients map[string]GBClient
addrMap map[string]int
lock sync.RWMutex
}
func (p *ClientManager) Add(key string, client GBClient) bool {
p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.clients[key]; ok {
return false
}
p.clients[key] = client
p.addrMap[client.GetDomain()]++
return true
}
func (p *ClientManager) Find(key string) GBClient {
p.lock.RLock()
defer p.lock.RUnlock()
if client, ok := p.clients[key]; ok {
return client
}
return nil
}
func (p *ClientManager) Remove(addr string) GBClient {
p.lock.Lock()
defer p.lock.Unlock()
client, ok := p.clients[addr]
if !ok {
return nil
}
p.addrMap[client.GetDomain()]++
if p.addrMap[client.GetDomain()] < 1 {
delete(p.addrMap, client.GetDomain())
}
delete(p.clients, addr)
return client
}
func (p *ClientManager) All() []GBClient {
p.lock.RLock()
defer p.lock.RUnlock()
clients := make([]GBClient, 0, len(p.clients))
for _, client := range p.clients {
clients = append(clients, client)
}
return clients
}
func (p *ClientManager) ExistClientByServerAddr(addr string) bool {
p.lock.RLock()
defer p.lock.RUnlock()
_, ok := p.addrMap[addr]
return ok
}
func RemovePlatform(key string) (GBClient, error) {
err := PlatformDao.DeleteUAByAddr(key)
if err != nil {
return nil, err
}
platform := PlatformManager.Remove(key)
return platform, nil
}

View File

@@ -25,6 +25,13 @@ type Config_ struct {
Addr string `json:"addr"`
Password string `json:"password"`
}
Hooks struct {
Online string `json:"online"`
Offline string `json:"offline"`
Position string `json:"position"`
OnInvite string `json:"on_invite"`
}
}
type LogConfig struct {

View File

@@ -22,6 +22,12 @@
"offline": "",
"?position" : "设备位置通知",
"position": ""
"position": "",
"?on_invite": "被邀请, 用于通知1078信令服务器, 向设备下发推流指令",
"on_invite": "http://localhost:8081/api/v1/jt1078/on_invite",
"?on_answer": "被查询录像,用于通知1078信令服务器",
"on_query_record": ""
}
}

View File

@@ -1,6 +1,7 @@
package main
import (
"fmt"
"gorm.io/gorm"
)
@@ -13,11 +14,25 @@ type DaoChannel interface {
QueryChannels(deviceId, groupId, string, page, size int) ([]*Channel, int, error)
QueryChannelsByRootID(rootId string) ([]*Channel, error)
QueryChannelsByChannelID(channelId string) ([]*Channel, error)
QueryChanelCount(deviceId string) (int, error)
QueryOnlineChanelCount(deviceId string) (int, error)
QueryChannelByTypeCode(codecs ...int) ([]*Channel, error)
ExistChannel(channelId string) bool
SaveJTChannel(channel *Channel) error
ExistJTChannel(simNumber string, channelNumber int) bool
QueryJTChannelBySimNumber(simNumber string) (*Channel, error)
DeleteChannel(deviceId string, channelId string) error
}
type daoChannel struct {
@@ -68,6 +83,15 @@ func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int) ([]
return channels, int(total), nil
}
func (d *daoChannel) QueryChannelsByRootID(rootId string) ([]*Channel, error) {
var channels []*Channel
tx := db.Where("root_id =?", rootId).Find(&channels)
if tx.Error != nil {
return nil, tx.Error
}
return channels, nil
}
func (d *daoChannel) QueryChanelCount(deviceId string) (int, error) {
var total int64
tx := db.Model(&Channel{}).Where("root_id =?", deviceId).Count(&total)
@@ -95,3 +119,37 @@ func (d *daoChannel) QueryChannelByTypeCode(codecs ...int) ([]*Channel, error) {
}
return channels, nil
}
func (d *daoChannel) ExistChannel(channelId string) bool {
var channel Channel
if db.Select("id").Where("device_id =?", channelId).Take(&channel).Error == nil {
return true
}
return false
}
func (d *daoChannel) SaveJTChannel(channel *Channel) error {
return DBTransaction(func(tx *gorm.DB) error {
var old Channel
if tx.Select("id").Where("root_id =? and channel_number =?", channel.RootID, channel.ChannelNumber).Take(&old).Error == nil {
return fmt.Errorf("channel number %d already exist", channel.ChannelNumber)
} else if tx.Select("id").Where("device_id =?", channel.DeviceID).Take(&old).Error == nil {
return fmt.Errorf("channel id %s already exist", channel.DeviceID)
}
return tx.Save(channel).Error
})
}
func (d *daoChannel) DeleteChannel(deviceId string, channelId string) error {
return db.Where("root_id =? and device_id =?", deviceId, channelId).Unscoped().Delete(&Channel{}).Error
}
func (d *daoChannel) QueryChannelsByChannelID(channelId string) ([]*Channel, error) {
var channels []*Channel
tx := db.Where("device_id =?", channelId).Find(&channels)
if tx.Error != nil {
return nil, tx.Error
}
return channels, nil
}

118
dao_jt.go Normal file
View File

@@ -0,0 +1,118 @@
package main
import (
"fmt"
"gorm.io/gorm"
)
// JTDeviceModel 数据库表结构
type JTDeviceModel struct {
GBModel
SIPUAOptions
Manufacturer string `json:"manufacturer"`
Model string `json:"model"`
Firmware string `json:"firmware"`
SimNumber string `json:"sim_number"`
}
func (g *JTDeviceModel) TableName() string {
return "lkm_jt_device"
}
// DaoJTDevice 保存级联和1078设备的sipua参数项
type DaoJTDevice interface {
LoadDevices() ([]*JTDeviceModel, error)
UpdateOnlineStatus(status OnlineStatus, username string) error
QueryDevice(user string) (*JTDeviceModel, error)
QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error)
ExistDevice(username, simNumber string) bool
DeleteDevice(username string) error
SaveDevice(model *JTDeviceModel) error
UpdateDevice(model *JTDeviceModel) error
}
type daoJTDevice struct {
}
func (d *daoJTDevice) LoadDevices() ([]*JTDeviceModel, error) {
var devices []*JTDeviceModel
tx := db.Find(&devices)
if tx.Error != nil {
return nil, tx.Error
}
return devices, nil
}
func (d *daoJTDevice) UpdateOnlineStatus(status OnlineStatus, username string) error {
return DBTransaction(func(tx *gorm.DB) error {
return tx.Model(&JTDeviceModel{}).Where("username =?", username).Update("status", status).Error
})
}
func (d *daoJTDevice) ExistDevice(id, simNumber string) bool {
var device JTDeviceModel
if db.Where("username =? or sim_number =?", id, simNumber).Select("id").Take(&device).Error == nil {
return true
}
return false
}
func (d *daoJTDevice) QueryDevice(id string) (*JTDeviceModel, error) {
var device JTDeviceModel
tx := db.Where("username =?", id).Take(&device)
if tx.Error != nil {
return nil, tx.Error
}
return &device, nil
}
func (d *daoJTDevice) DeleteDevice(id string) error {
return DBTransaction(func(tx *gorm.DB) error {
return tx.Where("username =?", id).Unscoped().Delete(&JTDeviceModel{}).Error
})
}
func (d *daoJTDevice) QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error) {
var device JTDeviceModel
tx := db.Where("sim_number =?", simNumber).Take(&device)
if tx.Error != nil {
return nil, tx.Error
}
return &device, nil
}
func (d *daoJTDevice) SaveDevice(model *JTDeviceModel) error {
return DBTransaction(func(tx *gorm.DB) error {
var old JTDeviceModel
tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old)
if tx.Error == nil {
return fmt.Errorf("username or sim number already exists")
}
return db.Save(model).Error
})
}
func (d *daoJTDevice) UpdateDevice(model *JTDeviceModel) error {
return DBTransaction(func(tx *gorm.DB) error {
var old JTDeviceModel
tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old)
if tx.Error != nil {
return tx.Error
} else {
model.ID = old.ID
}
return db.Save(model).Error
})
}

View File

@@ -1,17 +1,26 @@
package main
type DaoPlatform interface {
LoadPlatforms() ([]*SIPUAParams, error)
// PlatformModel 数据库表结构
type PlatformModel struct {
GBModel
SIPUAOptions
}
QueryPlatform(addr string) (*SIPUAParams, error)
func (g *PlatformModel) TableName() string {
return "lkm_platform"
}
SavePlatform(platform *SIPUAParams) error
// DaoVirtualDevice 保存级联和1078设备的sipua参数项
type DaoVirtualDevice interface {
LoadPlatforms() ([]*PlatformModel, error)
QueryPlatform(addr string) (*PlatformModel, error)
SavePlatform(platform *PlatformModel) error
DeletePlatform(addr string) error
UpdatePlatform(platform *SIPUAParams) error
UpdatePlatformStatus(addr string, status OnlineStatus) error
UpdatePlatform(platform *PlatformModel) error
BindChannels(addr string, channels [][2]string) ([][2]string, error)
@@ -26,8 +35,8 @@ type DaoPlatform interface {
type daoPlatform struct {
}
func (d *daoPlatform) LoadPlatforms() ([]*SIPUAParams, error) {
var platforms []*SIPUAParams
func (d *daoPlatform) LoadPlatforms() ([]*PlatformModel, error) {
var platforms []*PlatformModel
tx := db.Find(&platforms)
if tx.Error != nil {
return nil, tx.Error
@@ -36,8 +45,8 @@ func (d *daoPlatform) LoadPlatforms() ([]*SIPUAParams, error) {
return platforms, nil
}
func (d *daoPlatform) QueryPlatform(addr string) (*SIPUAParams, error) {
var platform SIPUAParams
func (d *daoPlatform) QueryUAByAddr(addr string) (*PlatformModel, error) {
var platform PlatformModel
tx := db.Where("server_addr =?", addr).First(&platform)
if tx.Error != nil {
return nil, tx.Error
@@ -46,8 +55,8 @@ func (d *daoPlatform) QueryPlatform(addr string) (*SIPUAParams, error) {
return &platform, nil
}
func (d *daoPlatform) SavePlatform(platform *SIPUAParams) error {
var old SIPUAParams
func (d *daoPlatform) SavePlatform(platform *PlatformModel) error {
var old PlatformModel
tx := db.Where("server_addr =?", platform.ServerAddr).First(&old)
if tx.Error == nil {
platform.ID = old.ID
@@ -55,27 +64,27 @@ func (d *daoPlatform) SavePlatform(platform *SIPUAParams) error {
return db.Save(platform).Error
}
func (d *daoPlatform) DeletePlatform(addr string) error {
return db.Where("server_addr =?", addr).Unscoped().Delete(&SIPUAParams{}).Error
func (d *daoPlatform) DeleteUAByAddr(addr string) error {
return db.Where("server_addr =?", addr).Unscoped().Delete(&PlatformModel{}).Error
}
func (d *daoPlatform) UpdatePlatform(platform *SIPUAParams) error {
func (d *daoPlatform) UpdatePlatform(platform *PlatformModel) error {
//TODO implement me
panic("implement me")
}
func (d *daoPlatform) UpdatePlatformStatus(addr string, status OnlineStatus) error {
return db.Model(&SIPUAParams{}).Where("server_addr =?", addr).Update("status", status).Error
func (d *daoPlatform) UpdateOnlineStatus(status OnlineStatus, addr string) error {
return db.Model(&PlatformModel{}).Where("server_addr =?", addr).Update("status", status).Error
}
type DBPlatformChannel struct {
type PlatformChannelModel struct {
GBModel
DeviceID string `json:"device_id"`
Channel string `json:"channel_id"`
ServerAddr string `json:"server_addr"`
}
func (d *DBPlatformChannel) TableName() string {
func (d *PlatformChannelModel) TableName() string {
return "lkm_platform_channel"
}
@@ -83,10 +92,10 @@ func (d *daoPlatform) BindChannels(addr string, channels [][2]string) ([][2]stri
var res [][2]string
for _, channel := range channels {
var old DBPlatformChannel
var old PlatformChannelModel
_ = db.Where("device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr).First(&old)
if old.ID == 0 {
_ = db.Create(&DBPlatformChannel{
_ = db.Create(&PlatformChannelModel{
DeviceID: channel[0],
Channel: channel[1],
})
@@ -100,7 +109,7 @@ func (d *daoPlatform) BindChannels(addr string, channels [][2]string) ([][2]stri
func (d *daoPlatform) UnbindChannels(addr string, channels [][2]string) ([][2]string, error) {
var res [][2]string
for _, channel := range channels {
tx := db.Unscoped().Delete(&DBPlatformChannel{}, "device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr)
tx := db.Unscoped().Delete(&PlatformChannelModel{}, "device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr)
if tx.Error == nil {
res = append(res, channel)
} else {
@@ -112,8 +121,8 @@ func (d *daoPlatform) UnbindChannels(addr string, channels [][2]string) ([][2]st
}
func (d *daoPlatform) QueryPlatformChannel(addr string, channelId string) (string, *Channel, error) {
var platformChannel DBPlatformChannel
tx := db.Model(&DBPlatformChannel{}).Where("channel_id =? and server_addr =?", channelId, addr).First(&platformChannel)
var platformChannel PlatformChannelModel
tx := db.Model(&PlatformChannelModel{}).Where("channel_id =? and server_addr =?", channelId, addr).First(&platformChannel)
if tx.Error != nil {
return "", nil, tx.Error
}
@@ -128,7 +137,7 @@ func (d *daoPlatform) QueryPlatformChannel(addr string, channelId string) (strin
}
func (d *daoPlatform) QueryPlatformChannels(addr string) ([]*Channel, error) {
var platformChannels []*DBPlatformChannel
var platformChannels []*PlatformChannelModel
tx := db.Where("server_addr =?", addr).Find(&platformChannels)
if tx.Error != nil {
return nil, tx.Error
@@ -143,7 +152,6 @@ func (d *daoPlatform) QueryPlatformChannels(addr string) ([]*Channel, error) {
} else {
Sugar.Errorf("查询级联设备通道失败. device_id: %s, channel_id: %s err: %s", platformChannel.DeviceID, platformChannel.Channel, tx.Error)
}
}
return channels, nil

View File

@@ -21,6 +21,7 @@ var (
PlatformDao = &daoPlatform{}
StreamDao = &daoStream{}
SinkDao = &daoSink{}
JTDeviceDao = &daoJTDevice{}
)
func init() {
@@ -61,13 +62,15 @@ func init() {
panic(err)
} else if err = db.AutoMigrate(&Channel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&SIPUAParams{}); err != nil {
} else if err = db.AutoMigrate(&PlatformModel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&Stream{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&Sink{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&DBPlatformChannel{}); err != nil {
} else if err = db.AutoMigrate(&PlatformChannelModel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&JTDeviceModel{}); err != nil {
panic(err)
}

View File

@@ -127,19 +127,19 @@ func (d *Device) BuildMessageRequest(to, body string) sip.Request {
func (d *Device) QueryDeviceInfo() {
body := fmt.Sprintf(DeviceInfoFormat, "1", d.DeviceID)
request := d.BuildMessageRequest(d.DeviceID, body)
SipUA.SendRequest(request)
SipStack.SendRequest(request)
}
func (d *Device) QueryCatalog() {
body := fmt.Sprintf(CatalogFormat, "1", d.DeviceID)
request := d.BuildMessageRequest(d.DeviceID, body)
SipUA.SendRequest(request)
SipStack.SendRequest(request)
}
func (d *Device) QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error {
body := fmt.Sprintf(QueryRecordFormat, sn, channelId, startTime, endTime, type_)
request := d.BuildMessageRequest(channelId, body)
SipUA.SendRequest(request)
SipStack.SendRequest(request)
return nil
}
@@ -169,7 +169,7 @@ func (d *Device) SubscribePosition(channelId string) error {
event := Event("Catalog;id=2")
request.AppendHeader(&event)
response, err := SipUA.SendRequestWithTimeout(5, request)
response, err := SipStack.SendRequestWithTimeout(5, request)
if err != nil {
return err
}
@@ -184,7 +184,7 @@ func (d *Device) SubscribePosition(channelId string) error {
func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction {
body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId)
request := d.BuildMessageRequest(channelId, body)
return SipUA.SendRequest(request)
return SipStack.SendRequest(request)
}
func (d *Device) UpdateChannel(id string, event string) {
@@ -241,7 +241,7 @@ func (d *Device) NewRequestBuilder(method sip.RequestMethod, fromUser, realm, to
func (d *Device) BuildInviteRequest(sessionName, channelId, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string) (sip.Request, error) {
builder := d.NewRequestBuilder(sip.INVITE, Config.SipID, Config.SipContactAddr, channelId)
sdp := BuildSDP(Config.SipID, sessionName, ip, port, startTime, stopTime, setup, speed, ssrc)
sdp := BuildSDP("video", Config.SipID, sessionName, ip, port, startTime, stopTime, setup, speed, ssrc, "96 PS/90000")
builder.SetContentType(&SDPMessageType)
builder.SetContact(GlobalContactAddress)
builder.SetBody(sdp)

View File

@@ -10,7 +10,7 @@ import (
)
var (
Dialogs = NewDialogManager[*StreamWaiting]()
EarlyDialogs = NewDialogManager[*StreamWaiting]()
)
type StreamWaiting struct {

50
hook/event.go Normal file
View File

@@ -0,0 +1,50 @@
package hook
import (
"bytes"
"encoding/json"
"net/http"
)
const (
EventTypeDeviceOnline = iota + 1
EventTypeDeviceOffline
EventTypeDevicePosition
EventTypeDeviceOnInvite
)
var (
EventUrls = make(map[int]string)
)
func RegisterEventUrl(event int, url string) {
EventUrls[event] = url
}
func PostEvent(url string, body []byte) (*http.Response, error) {
client := &http.Client{
//Timeout: time.Duration(AppConfig.Hooks.Timeout),
}
request, err := http.NewRequest("post", url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")
return client.Do(request)
}
func PostOnInviteEvent(simNumber, channelNumber string) (*http.Response, error) {
params := map[string]string{
"sim_number": simNumber,
"channel_number": channelNumber,
}
body, err := json.Marshal(params)
if err != nil {
return nil, err
}
return PostEvent(EventUrls[EventTypeDeviceOnInvite], body)
}

70
jt_device.go Normal file
View File

@@ -0,0 +1,70 @@
package main
import (
"github.com/ghettovoice/gosip/sip"
"net/http"
"strconv"
"strings"
)
type JTDevice struct {
*Platform
username string
simNumber string
}
func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response {
// 通知1078的信令服务器
channels, _ := ChannelDao.QueryChannelsByChannelID(user)
if len(channels) < 1 {
Sugar.Errorf("处理1078的invite失败. 通道不存在 channel: %s device: %s", user, g.Username)
return CreateResponseWithStatusCode(request, http.StatusNotFound)
} else if channels[0].RootID != g.username {
Sugar.Errorf("处理1078的invite失败. 设备和通道不匹配 channel: %s device: %s", user, g.Username)
return CreateResponseWithStatusCode(request, http.StatusNotFound)
}
channel := channels[0]
gbsdp, err := ParseGBSDP(request.Body())
if err != nil {
Sugar.Errorf("处理上级Invite失败, 解析上级SDP发生错误 err: %s sdp: %s", err.Error(), request.Body())
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
var inviteType InviteType
inviteType.SessionName2Type(strings.ToLower(gbsdp.sdp.Session))
if InviteTypePlay != inviteType {
Sugar.Warnf("处理上级Invite失败, 1078暂不支持非实时预览流 inviteType: %s channel: %s device: %s", inviteType, user, g.Username)
return CreateResponseWithStatusCode(request, http.StatusNotImplemented)
}
streamId := GenerateStreamID(inviteType, g.simNumber, strconv.Itoa(channel.ChannelNumber), gbsdp.startTime, gbsdp.stopTime)
sink := &Sink{
StreamID: streamId,
ServerAddr: g.ServerAddr,
Protocol: "gb_gateway"}
response, err := AddForwardSink(TransStreamGBGateway, request, user, sink, streamId, gbsdp, inviteType, "96 PS/90000")
if err != nil {
Sugar.Errorf("处理1078的invite失败. 发送hook失败 err: %s channel: %s device: %s", err.Error(), user, g.Username)
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
return response
}
func NewJTDevice(model *JTDeviceModel, ua SipServer) (*JTDevice, error) {
platform, err := NewPlatform(&model.SIPUAOptions, ua)
if err != nil {
return nil, err
}
platform.SetDeviceInfo(model.Name, model.Manufacturer, model.Model, model.Firmware)
return &JTDevice{
Platform: platform,
username: model.Username,
simNumber: model.SimNumber,
}, nil
}

16
live.go
View File

@@ -41,7 +41,7 @@ func (i *InviteType) SessionName2Type(name string) {
func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*Stream, error) {
stream := &Stream{
StreamID: streamId,
Protocol: "28181",
Protocol: SourceType28181,
}
// 先添加占位置, 防止重复请求
@@ -64,8 +64,8 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId
// 等待流媒体服务发送推流通知
wait := func() bool {
waiting := StreamWaiting{}
_, _ = Dialogs.Add(string(streamId), &waiting)
defer Dialogs.Remove(string(streamId))
_, _ = EarlyDialogs.Add(string(streamId), &waiting)
defer EarlyDialogs.Remove(string(streamId))
ok := http.StatusOK == waiting.Receive(10)
if !ok {
@@ -95,12 +95,12 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
defer func() {
// 如果失败, 告知流媒体服务释放国标源
if err != nil {
go CloseSource(string(streamId))
go MSCloseSource(string(streamId))
}
}()
// 告知流媒体服务创建国标源, 返回收流地址信息
ip, port, urls, ssrc, msErr := CreateGBSource(string(streamId), setup, "", string(inviteType))
ip, port, urls, ssrc, msErr := MSCreateGBSource(string(streamId), setup, "", string(inviteType))
if msErr != nil {
Sugar.Errorf("创建GBSource失败 err: %s", msErr.Error())
return nil, nil, msErr
@@ -126,7 +126,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
var body string
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// invite信令交互
SipUA.SendRequestWithContext(reqCtx, inviteRequest, gosip.WithResponseHandler(func(res sip.Response, request sip.Request) {
SipStack.SendRequestWithContext(reqCtx, inviteRequest, gosip.WithResponseHandler(func(res sip.Response, request sip.Request) {
if res.StatusCode() < 200 {
} else if res.StatusCode() == 200 {
@@ -144,7 +144,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
Sugar.Infof("send ack %s", ackRequest.String())
err = SipUA.Send(ackRequest)
err = SipStack.Send(ackRequest)
if err != nil {
cancel()
Sugar.Errorf("send ack error %s %s", err.Error(), ackRequest.String())
@@ -172,7 +172,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
}
addr := fmt.Sprintf("%s:%d", answer.Addr, answer.Video.Port)
if err = ConnectGBSource(string(streamId), addr); err != nil {
if err = MSConnectGBSource(string(streamId), addr); err != nil {
Sugar.Errorf("设置GB28181连接地址失败 err: %s addr: %s", err.Error(), addr)
return nil, nil, err
}

15
main.go
View File

@@ -2,6 +2,7 @@ package main
import (
"encoding/json"
"gb-cms/hook"
"go.uber.org/zap/zapcore"
"net"
"net/http"
@@ -11,14 +12,14 @@ import (
)
var (
Config *Config_
SipUA SipServer
Config *Config_
SipStack SipServer
)
func init() {
logConfig := LogConfig{
Level: int(zapcore.DebugLevel),
Name: "./logs/cms.log",
Name: "./logs/clog",
MaxSize: 10,
MaxBackup: 100,
MaxAge: 7,
@@ -38,6 +39,10 @@ func main() {
indent, _ := json.MarshalIndent(Config, "", "\t")
Sugar.Infof("server config:\r\n%s", indent)
if config.Hooks.OnInvite != "" {
hook.RegisterEventUrl(hook.EventTypeDeviceOnInvite, config.Hooks.OnInvite)
}
OnlineDeviceManager.Start(time.Duration(Config.AliveExpires)*time.Second/4, time.Duration(Config.AliveExpires)*time.Second, OnExpires)
// 从数据库中恢复会话
@@ -58,7 +63,7 @@ func main() {
Sugar.Infof("启动sip server成功. addr: %s:%d", config.ListenIP, config.SipPort)
Config.SipContactAddr = net.JoinHostPort(config.PublicIP, strconv.Itoa(config.SipPort))
SipUA = server
SipStack = server
// 在sip启动后, 关闭无效的流
for _, stream := range streams {
@@ -71,6 +76,8 @@ func main() {
// 启动级联设备
startPlatformDevices()
// 启动1078设备
startJTDevices()
httpAddr := net.JoinHostPort(config.ListenIP, strconv.Itoa(config.HttpPort))
Sugar.Infof("启动http server. addr: %s", httpAddr)

View File

@@ -6,10 +6,29 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"time"
)
const (
TransStreamRtmp = iota + 1
TransStreamFlv = 2
TransStreamRtsp = 3
TransStreamHls = 4
TransStreamRtc = 5
TransStreamGBCascaded = 6 // 国标级联转发
TransStreamGBTalk = 7 // 国标广播/对讲转发
TransStreamGBGateway = 8 // 国标网关
)
const (
SourceTypeRtmp = iota + 1
SourceType28181
SourceType1078
SourceTypeGBTalk
)
type SourceDetails struct {
ID string `json:"id"`
Protocol string `json:"protocol"` // 推流协议
@@ -43,10 +62,22 @@ type SourceSDP struct {
type GBOffer struct {
SourceSDP
AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式
AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式
TransStreamProtocol int `json:"trans_stream_protocol,omitempty"`
}
func Send(path string, body interface{}) (*http.Response, error) {
return SendWithUrlParams(path, body, nil)
}
func SendWithUrlParams(path string, body interface{}, values url.Values) (*http.Response, error) {
if values != nil {
params := values.Encode()
if len(params) > 0 {
path = fmt.Sprintf("%s?%s", path, params)
}
}
url := fmt.Sprintf("http://%s/%s", Config.MediaServer, path)
data, err := json.Marshal(body)
@@ -67,7 +98,7 @@ func Send(path string, body interface{}) (*http.Response, error) {
return client.Do(request)
}
func CreateGBSource(id, setup string, ssrc string, sessionName string) (string, uint16, []string, string, error) {
func MSCreateGBSource(id, setup string, ssrc string, sessionName string) (string, uint16, []string, string, error) {
v := &SourceSDP{
Source: id,
SDP: SDP{
@@ -102,7 +133,7 @@ func CreateGBSource(id, setup string, ssrc string, sessionName string) (string,
return host, uint16(port), data.Data.Urls, data.Data.SSRC, err
}
func ConnectGBSource(id, addr string) error {
func MSConnectGBSource(id, addr string) error {
v := &SourceSDP{
Source: id,
SDP: SDP{
@@ -114,7 +145,7 @@ func ConnectGBSource(id, addr string) error {
return err
}
func CloseSource(id string) error {
func MSCloseSource(id string) error {
v := &struct {
Source string `json:"source"`
}{
@@ -125,10 +156,53 @@ func CloseSource(id string) error {
return err
}
func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) (string, uint16, string, error) {
func MSCloseSink(sourceId string, sinkId string) {
v := struct {
SourceID string `json:"source"`
SinkID string `json:"sink"` // sink id
}{
sourceId, sinkId,
}
_, _ = Send("api/v1/sink/close", v)
}
func MSQuerySourceList() ([]*SourceDetails, error) {
response, err := Send("api/v1/source/list", nil)
if err != nil {
return nil, err
}
data := &Response[[]*SourceDetails]{}
if err = DecodeJSONBody(response.Body, data); err != nil {
return nil, err
}
return data.Data, err
}
func MSQuerySinkList(source string) ([]*SinkDetails, error) {
id := struct {
Source string `json:"source"`
}{source}
response, err := Send("api/v1/sink/list", id)
if err != nil {
return nil, err
}
data := &Response[[]*SinkDetails]{}
if err = DecodeJSONBody(response.Body, data); err != nil {
return nil, err
}
return data.Data, err
}
func MSAddForwardSink(protocol int, source, addr, offerSetup, answerSetup, ssrc, sessionName string, values url.Values) (string, uint16, string, error) {
offer := &GBOffer{
SourceSDP: SourceSDP{
Source: id,
Source: source,
SDP: SDP{
Addr: addr,
Setup: offerSetup,
@@ -136,10 +210,12 @@ func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) (
SessionName: sessionName,
},
},
AnswerSetup: answerSetup,
AnswerSetup: answerSetup,
TransStreamProtocol: protocol,
}
response, err := Send("api/v1/gb28181/answer/create", offer)
var err error
response, err := SendWithUrlParams("api/v1/sink/add", offer, values)
if err != nil {
return "", 0, "", err
}
@@ -163,46 +239,3 @@ func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) (
port, _ := strconv.Atoi(p)
return host, uint16(port), data.Data.Sink, nil
}
func CloseSink(sourceId string, sinkId string) {
v := struct {
SourceID string `json:"source"`
SinkID string `json:"sink"` // sink id
}{
sourceId, sinkId,
}
_, _ = Send("api/v1/sink/close", v)
}
func QuerySourceList() ([]*SourceDetails, error) {
response, err := Send("api/v1/source/list", nil)
if err != nil {
return nil, err
}
data := &Response[[]*SourceDetails]{}
if err = DecodeJSONBody(response.Body, data); err != nil {
return nil, err
}
return data.Data, err
}
func QuerySinkList(source string) ([]*SinkDetails, error) {
id := struct {
Source string `json:"source"`
}{source}
response, err := Send("api/v1/sink/list", id)
if err != nil {
return nil, err
}
data := &Response[[]*SinkDetails]{}
if err = DecodeJSONBody(response.Body, data); err != nil {
return nil, err
}
return data.Data, err
}

View File

@@ -12,15 +12,14 @@ const (
XmlHeaderGBK = `<?xml version="1.0" encoding="GB2312"?>` + "\r\n"
)
func BuildSDP(userName, sessionName, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string) string {
func BuildSDP(media, userName, sessionName, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string, attrs ...string) string {
format := "v=0\r\n" +
"o=%s 0 0 IN IP4 %s\r\n" +
"s=%s\r\n" +
"c=IN IP4 %s\r\n" +
"t=%s %s\r\n" +
"m=video %d %s 96\r\n" +
"a=%s\r\n" +
"a=rtpmap:96 PS/90000\r\n"
"m=%s %d %s %s\r\n" +
"a=%s\r\n"
tcpFormat := "a=setup:%s\r\n" +
"a=connection:new\r\n"
@@ -34,7 +33,16 @@ func BuildSDP(userName, sessionName, ip string, port uint16, startTime, stopTime
mediaProtocol = "RTP/AVP"
}
sdp := fmt.Sprintf(format, userName, ip, sessionName, ip, startTime, stopTime, port, mediaProtocol, "recvonly")
var mediaFormats []string
for _, attr := range attrs {
mediaFormats = append(mediaFormats, strings.Split(attr, " ")[0])
}
sdp := fmt.Sprintf(format, userName, ip, sessionName, ip, startTime, stopTime, media, port, mediaProtocol, strings.Join(mediaFormats, " "), "recvonly")
for _, attr := range attrs {
sdp += fmt.Sprintf("a=rtpmap:%s\r\n", attr)
}
if tcp {
sdp += fmt.Sprintf(tcpFormat, setup)
}
@@ -54,6 +62,7 @@ func NewSIPRequestBuilderWithTransport(transport string) *sip.RequestBuilder {
}
builder.AddVia(&hop)
builder.SetUserAgent(nil)
return builder
}

View File

@@ -10,19 +10,24 @@ import (
"sync"
)
type GBPlatform struct {
*Client
const (
UATypeGB = iota + 1
UATypeJT
)
type Platform struct {
*gbClient
lock sync.Mutex
sinks map[string]StreamID // 保存级联转发的sink, 方便离线的时候关闭sink
}
func (g *GBPlatform) addSink(callId string, stream StreamID) {
func (g *Platform) addSink(callId string, stream StreamID) {
g.lock.Lock()
defer g.lock.Unlock()
g.sinks[callId] = stream
}
func (g *GBPlatform) removeSink(callId string) StreamID {
func (g *Platform) removeSink(callId string) StreamID {
g.lock.Lock()
defer g.lock.Unlock()
stream := g.sinks[callId]
@@ -31,17 +36,17 @@ func (g *GBPlatform) removeSink(callId string) StreamID {
}
// OnBye 被上级挂断
func (g *GBPlatform) OnBye(request sip.Request) {
func (g *Platform) OnBye(request sip.Request) {
id, _ := request.CallID()
g.CloseStream(id.Value(), false, true)
}
// CloseStream 关闭级联会话
func (g *GBPlatform) CloseStream(callId string, bye, ms bool) {
func (g *Platform) CloseStream(callId string, bye, ms bool) {
_ = g.removeSink(callId)
sink := RemoveForwardSinkWithCallId(callId)
if sink == nil {
Sugar.Errorf("关闭级联转发sink失败, 找不到sink. callid: %s", callId)
Sugar.Errorf("关闭转发sink失败, 找不到sink. callid: %s", callId)
return
}
@@ -49,7 +54,7 @@ func (g *GBPlatform) CloseStream(callId string, bye, ms bool) {
}
// CloseStreams 关闭所有级联会话
func (g *GBPlatform) CloseStreams(bye, ms bool) {
func (g *Platform) CloseStreams(bye, ms bool) {
var callIds []string
g.lock.Lock()
@@ -66,8 +71,8 @@ func (g *GBPlatform) CloseStreams(bye, ms bool) {
}
// OnInvite 被上级呼叫
func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
Sugar.Infof("收到级Invite请求 platform: %s channel: %s sdp: %s", g.SeverID, user, request.Body())
func (g *Platform) OnInvite(request sip.Request, user string) sip.Response {
Sugar.Infof("收到级Invite请求 platform: %s channel: %s sdp: %s", g.SeverID, user, request.Body())
source := request.Source()
platform := PlatformManager.Find(source)
@@ -75,124 +80,84 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
deviceId, channel, err := PlatformDao.QueryPlatformChannel(g.ServerAddr, user)
if err != nil {
Sugar.Errorf("级联转发失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverID, user)
Sugar.Errorf("处理上级Invite失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverID, user)
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
// 查找通道对应的设备
device, _ := DeviceDao.QueryDevice(deviceId)
if device == nil {
Sugar.Errorf("级联转发失败, 设备不存在 device: %s channel: %s", device, user)
Sugar.Errorf("处理上级Invite失败, 设备不存在 device: %s channel: %s", device, user)
return CreateResponseWithStatusCode(request, http.StatusNotFound)
}
parse, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body())
gbSdp, err := ParseGBSDP(request.Body())
if err != nil {
Sugar.Errorf("级联转发失败, 解析上级SDP发生错误 err: %s sdp: %s", err.Error(), request.Body())
Sugar.Errorf("处理上级Invite失败,err: %s sdp: %s", err.Error(), request.Body())
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
// 解析时间范围
time := strings.Split(parse.Time, " ")
if len(time) < 2 {
Sugar.Errorf("级联转发失败 上级sdp的时间范围格式错误 time: %s sdp: %s", parse.Time, request.Body())
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
var streamId StreamID
var inviteType InviteType
inviteType.SessionName2Type(strings.ToLower(parse.Session))
switch inviteType {
case InviteTypePlay:
streamId = GenerateStreamID(InviteTypePlay, channel.ParentID, user, "", "")
break
case InviteTypePlayback:
// 级联下载和回放不限制路数,也不共享流
streamId = GenerateStreamID(InviteTypePlayback, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10))
break
case InviteTypeDownload:
streamId = GenerateStreamID(InviteTypeDownload, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10))
break
}
inviteType.SessionName2Type(strings.ToLower(gbSdp.sdp.Session))
streamId := GenerateStreamID(inviteType, channel.RootID, channel.DeviceID, gbSdp.startTime, gbSdp.stopTime)
// 如果流不存在, 向通道发送Invite请求
stream, _ := StreamDao.QueryStream(streamId)
addr := fmt.Sprintf("%s:%d", parse.Addr, media.Port)
if stream == nil {
s := channel.SetupType.String()
println(s)
stream, err = device.StartStream(inviteType, streamId, user, time[0], time[1], channel.SetupType.String(), 0, true)
stream, err = device.StartStream(inviteType, streamId, user, gbSdp.startTime, gbSdp.stopTime, channel.SetupType.String(), 0, true)
if err != nil {
Sugar.Errorf("级联转发失败 err: %s stream: %s", err.Error(), streamId)
Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId)
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
}
ip, port, sinkID, err := CreateAnswer(string(streamId), addr, offerSetup, answerSetup, ssrc, string(inviteType))
if err != nil {
Sugar.Errorf("级联转发失败,向流媒体服务添加转发Sink失败 err: %s", err.Error())
if "play" != parse.Session {
CloseStream(streamId, true)
}
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
// answer添加contact头域
answer := BuildSDP(user, parse.Session, ip, port, time[0], time[1], answerSetup, speed, ssrc)
response := CreateResponseWithStatusCode(request, http.StatusOK)
response.RemoveHeader("Contact")
response.AppendHeader(GlobalContactAddress.AsContactHeader())
response.AppendHeader(&SDPMessageType)
response.SetBody(answer, true)
setToTag(response)
sink := &Sink{
SinkID: sinkID,
StreamID: streamId,
ServerAddr: g.ServerAddr,
Protocol: "gb_cascaded_forward"}
sink.SetDialog(g.CreateDialogRequestFromAnswer(response, true))
Protocol: "gb_cascaded"}
response, err := AddForwardSink(TransStreamGBCascaded, request, user, sink, streamId, gbSdp, inviteType, "96 PS/90000")
if err != nil {
Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId)
}
AddForwardSink(streamId, sink)
return response
}
func (g *GBPlatform) Start() {
Sugar.Infof("启动级联设备, deivce: %s transport: %s addr: %s", g.Username, g.sipClient.Transport, g.sipClient.ServerAddr)
g.sipClient.Start()
g.sipClient.SetOnRegisterHandler(g.onlineCB, g.offlineCB)
func (g *Platform) Start() {
Sugar.Infof("启动级联设备, deivce: %s transport: %s addr: %s", g.Username, g.sipUA.Transport, g.sipUA.ServerAddr)
g.sipUA.Start()
g.sipUA.SetOnRegisterHandler(g.Online, g.Offline)
}
func (g *GBPlatform) Stop() {
g.sipClient.Stop()
g.sipClient.SetOnRegisterHandler(nil, nil)
func (g *Platform) Stop() {
g.sipUA.Stop()
g.sipUA.SetOnRegisterHandler(nil, nil)
// 释放所有推流
g.CloseStreams(true, true)
}
func (g *GBPlatform) Online() {
Sugar.Infof("级联设备上线 device: %s", g.SeverID)
func (g *Platform) Online() {
Sugar.Infof("ua上线 device: %s server addr: %s", g.Username, g.ServerAddr)
if err := PlatformDao.UpdatePlatformStatus(g.SeverID, ON); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), g.SeverID)
if err := PlatformDao.UpdateOnlineStatus(ON, g.ServerAddr); err != nil {
Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr)
}
}
func (g *GBPlatform) Offline() {
Sugar.Infof("级联设备离线 device: %s", g.SeverID)
func (g *Platform) Offline() {
Sugar.Infof("ua离线 device: %s server addr: %s", g.Username, g.ServerAddr)
if err := PlatformDao.UpdatePlatformStatus(g.SeverID, OFF); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), g.SeverID)
if err := PlatformDao.UpdateOnlineStatus(OFF, g.ServerAddr); err != nil {
Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr)
}
// 释放所有推流
g.CloseStreams(true, true)
}
func NewGBPlatform(record *SIPUAParams, ua SipServer) (*GBPlatform, error) {
func NewPlatform(record *SIPUAOptions, ua SipServer) (*Platform, error) {
if len(record.SeverID) != 20 {
return nil, fmt.Errorf("SeverID must be exactly 20 characters long")
}
@@ -201,6 +166,6 @@ func NewGBPlatform(record *SIPUAParams, ua SipServer) (*GBPlatform, error) {
return nil, err
}
gbClient := NewGBClient(record, ua)
return &GBPlatform{Client: gbClient.(*Client), sinks: make(map[string]StreamID, 8)}, nil
client := NewGBClient(record, ua)
return &Platform{gbClient: client.(*gbClient), sinks: make(map[string]StreamID, 8)}, nil
}

View File

@@ -1,119 +0,0 @@
package main
import (
"fmt"
"sync"
)
var (
PlatformManager = &platformManager{
addrMap: make(map[string]*GBPlatform, 8),
}
)
type platformManager struct {
addrMap map[string]*GBPlatform //上级地址->平台
lock sync.RWMutex
}
func (p *platformManager) Add(platform *GBPlatform) bool {
p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.addrMap[platform.sipClient.ServerAddr]; ok {
return false
}
p.addrMap[platform.sipClient.ServerAddr] = platform
return true
}
func (p *platformManager) Find(addr string) *GBPlatform {
p.lock.RLock()
defer p.lock.RUnlock()
if platform, ok := p.addrMap[addr]; ok {
return platform
}
return nil
}
func (p *platformManager) Remove(addr string) *GBPlatform {
p.lock.Lock()
defer p.lock.Unlock()
platform, ok := p.addrMap[addr]
if !ok {
return nil
}
delete(p.addrMap, addr)
return platform
}
func (p *platformManager) Platforms() []*GBPlatform {
p.lock.RLock()
defer p.lock.RUnlock()
platforms := make([]*GBPlatform, 0, len(p.addrMap))
for _, platform := range p.addrMap {
platforms = append(platforms, platform)
}
return platforms
}
func AddPlatform(platform *GBPlatform) error {
ok := PlatformManager.Add(platform)
if !ok {
return fmt.Errorf("平台添加失败, 地址冲突. addr: %s", platform.sipClient.ServerAddr)
}
err := PlatformDao.SavePlatform(&platform.SIPUAParams)
if err != nil {
PlatformManager.Remove(platform.sipClient.ServerAddr)
return fmt.Errorf("平台保存到数据库失败, err: %s", err.Error())
}
return nil
}
func RemovePlatform(addr string) (*GBPlatform, error) {
err := PlatformDao.DeletePlatform(addr)
if err != nil {
return nil, err
}
platform := PlatformManager.Remove(addr)
return platform, nil
}
func LoadPlatforms() []*SIPUAParams {
platforms := PlatformManager.Platforms()
params := make([]*SIPUAParams, 0, len(platforms))
for _, platform := range platforms {
params = append(params, &platform.SIPUAParams)
}
return params
}
func QueryPlatform(add string) *GBPlatform {
return PlatformManager.Find(add)
}
func UpdatePlatformStatus(addr string, status OnlineStatus) error {
platform := PlatformManager.Find(addr)
if platform == nil {
return fmt.Errorf("平台不存在. addr: %s", addr)
}
//old := platform.Device.Status
platform.Device.Status = status
err := PlatformDao.UpdatePlatformStatus(addr, status)
// platform.Device.Status = old
if err != nil {
return err
}
return nil
}

View File

@@ -47,7 +47,7 @@ func (d *Device) DoSubscribePosition(channelId string) error {
event := Event("Catalog;id=2")
request.AppendHeader(&event)
response, err := SipUA.SendRequestWithTimeout(5, request)
response, err := SipStack.SendRequestWithTimeout(5, request)
if err != nil {
return err
}

View File

@@ -14,34 +14,40 @@ func startPlatformDevices() {
}
for _, record := range platforms {
platform, err := NewGBPlatform(record, SipUA)
platform, err := NewPlatform(&record.SIPUAOptions, SipStack)
// 都入库了不允许失败, 程序有BUG, 及时修复
utils.Assert(err == nil)
utils.Assert(PlatformManager.Add(platform))
utils.Assert(PlatformManager.Add(platform.ServerAddr, platform))
if err := PlatformDao.UpdatePlatformStatus(record.ServerAddr, OFF); err != nil {
if err := PlatformDao.UpdateOnlineStatus(OFF, record.ServerAddr); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.SeverID)
}
// 恢复级联会话
// 不删会话能正常通信
//for _, stream := range streams {
// sinks := stream.GetForwardStreamSinks()
// for _, sink := range sinks {
// if sink.DeviceID != record.SeverID {
// continue
// }
//
// callId, _ := sink.Dialog.CallID()
// channelCallId, _ := stream.Dialog.CallID()
// platform.addSink(callId.Value(), channelCallId.Value())
// }
//}
platform.Start()
}
}
// 启动1078设备
func startJTDevices() {
devices, err := JTDeviceDao.LoadDevices()
if err != nil {
Sugar.Errorf("查询1078设备失败 err: %s", err.Error())
return
}
for _, record := range devices {
// 都入库了不允许失败, 程序有BUG, 及时修复
device, err := NewJTDevice(record, SipStack)
utils.Assert(err == nil)
utils.Assert(JTDeviceManager.Add(device.Username, device))
if err := JTDeviceDao.UpdateOnlineStatus(OFF, device.Username); err != nil {
Sugar.Infof("更新1078设备状态失败 err: %s device: %s", err.Error(), record.SeverID)
}
device.Start()
}
}
// 返回需要关闭的推流源和转流Sink
func recoverStreams() (map[string]*Stream, map[string]*Sink) {
// 比较数据库和流媒体服务器中的流会话, 以流媒体服务器中的为准, 释放过期的会话
@@ -55,10 +61,10 @@ func recoverStreams() (map[string]*Stream, map[string]*Sink) {
dbSinks, _ := SinkDao.LoadForwardSinks()
// 查询流媒体服务器中的推流源列表
msSources, err := QuerySourceList()
msSources, err := MSQuerySourceList()
if err != nil {
// 流媒体服务器崩了, 存在的所有记录都无效, 全部删除
Sugar.Warnf("恢复推流失败, 查询推流源列表发生错误, 删除数据库中的所有记录. err: %s", err.Error())
Sugar.Warnf("恢复推流失败, 查询推流源列表发生错误, 删除所有推流记录. err: %s", err.Error())
}
// 查询推流源下所有的转发sink列表
@@ -70,7 +76,7 @@ func recoverStreams() (map[string]*Stream, map[string]*Sink) {
}
// 查询转发sink
sinks, err := QuerySinkList(source.ID)
sinks, err := MSQuerySinkList(source.ID)
if err != nil {
Sugar.Warnf("查询拉流列表发生 err: %s", err.Error())
continue

10
sink.go
View File

@@ -6,18 +6,18 @@ import (
"github.com/ghettovoice/gosip/sip/parser"
)
// Sink 国标级联转发流
// Sink 级联/对讲/网关转发流Sink
type Sink struct {
GBModel
SinkID string `json:"sink_id"` // 流媒体服务器中的sink id
StreamID StreamID `json:"stream_id"` // 推流ID
SinkStreamID StreamID `json:"sink_stream_id"` // 广播使用, 每个广播设备的唯一ID
Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded_forward/gb_talk_forward
Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded/gb_talk/gb_gateway
Dialog *RequestWrapper `json:"dialog,omitempty"`
CallID string `json:"call_id,omitempty"`
ServerAddr string `json:"server_addr,omitempty"` // 级联上级地址
CreateTime int64 `json:"create_time"`
SetupType SetupType // 转发类型
SetupType SetupType // 转发类型
}
// Close 关闭级联会话. 是否向上级发送bye请求, 是否通知流媒体服务器发送删除sink
@@ -28,7 +28,7 @@ func (s *Sink) Close(bye, ms bool) {
}
if ms {
go CloseSink(string(s.StreamID), s.SinkID)
go MSCloseSink(string(s.StreamID), s.SinkID)
}
}
@@ -51,7 +51,7 @@ func (s *Sink) MarshalJSON() ([]byte, error) {
func (s *Sink) Bye() {
if s.Dialog != nil && s.Dialog.Request != nil {
byeRequest := CreateRequestFromDialog(s.Dialog.Request, sip.BYE)
go SipUA.SendRequest(byeRequest)
go SipStack.SendRequest(byeRequest)
}
}

View File

@@ -1,12 +1,50 @@
package main
func AddForwardSink(StreamID StreamID, sink *Sink) bool {
if err := SinkDao.SaveForwardSink(StreamID, sink); err != nil {
Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", StreamID, sink.SinkID, err.Error())
return false
import (
"github.com/ghettovoice/gosip/sip"
"net/http"
"net/url"
)
func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sink, streamId StreamID, gbSdp *GBSDP, inviteType InviteType, attrs ...string) (sip.Response, error) {
urlParams := make(url.Values)
if TransStreamGBTalk == forwardType {
urlParams.Add("forward_type", "broadcast")
} else if TransStreamGBCascaded == forwardType {
urlParams.Add("forward_type", "cascaded")
} else if TransStreamGBGateway == forwardType {
urlParams.Add("forward_type", "gateway_1078")
}
return true
ip, port, sinkID, err := MSAddForwardSink(forwardType, string(streamId), gbSdp.connectionAddr, gbSdp.offerSetup.String(), gbSdp.answerSetup.String(), gbSdp.ssrc, string(inviteType), urlParams)
if err != nil {
Sugar.Errorf("处理上级Invite失败,向流媒体服务添加转发Sink失败 err: %s", err.Error())
if InviteTypePlay != inviteType {
CloseStream(streamId, true)
}
return nil, err
}
sink.SinkID = sinkID
// 创建answer
answer := BuildSDP(gbSdp.mediaType, user, gbSdp.sdp.Session, ip, port, gbSdp.startTime, gbSdp.stopTime, gbSdp.answerSetup.String(), gbSdp.speed, gbSdp.ssrc, attrs...)
response := CreateResponseWithStatusCode(request, http.StatusOK)
// answer添加contact头域
response.RemoveHeader("Contact")
response.AppendHeader(GlobalContactAddress.AsContactHeader())
response.AppendHeader(&SDPMessageType)
response.SetBody(answer, true)
setToTag(response)
sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source()))
if err = SinkDao.SaveForwardSink(streamId, sink); err != nil {
Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error())
}
return response, nil
}
func RemoveForwardSink(StreamID StreamID, sinkID string) *Sink {

View File

@@ -104,6 +104,14 @@ func (e *EventHandler) OnCatalog(device string, response *CatalogResponse) {
}
}
func GetTypeCode(id string) string {
if len(id) != 20 {
return ""
}
return id[10:13]
}
func (e *EventHandler) OnRecord(device string, response *QueryRecordInfoResponse) {
event := SNManager.FindEvent(response.SN)
if event == nil {

View File

@@ -64,6 +64,13 @@ type sipServer struct {
handler EventHandler
}
type SipRequestSource struct {
req sip.Request
tx sip.ServerTransaction
fromCascade bool
fromJt bool
}
func (s *sipServer) Send(msg sip.Message) error {
return s.sip.Send(msg)
}
@@ -74,39 +81,39 @@ func setToTag(response sip.Message) {
to.Params = sip.NewParams().Add("tag", sip.String{Str: util.RandString(10)})
}
func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction, parent bool) {
func (s *sipServer) OnRegister(wrapper *SipRequestSource) {
var device GBDevice
var queryCatalog bool
fromHeaders := req.GetHeaders("From")
fromHeaders := wrapper.req.GetHeaders("From")
if len(fromHeaders) == 0 {
Sugar.Errorf("not find From header. message: %s", req.String())
Sugar.Errorf("not find From header. message: %s", wrapper.req.String())
return
}
_ = req.GetHeaders("Authorization")
_ = wrapper.req.GetHeaders("Authorization")
fromHeader := fromHeaders[0].(*sip.FromHeader)
expiresHeader := req.GetHeaders("Expires")
expiresHeader := wrapper.req.GetHeaders("Expires")
response := sip.NewResponseFromRequest("", req, 200, "OK", "")
response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "")
id := fromHeader.Address.User().String()
if len(expiresHeader) > 0 && "0" == expiresHeader[0].Value() {
Sugar.Infof("设备注销 Device: %s", id)
s.handler.OnUnregister(id)
} else /*if authorizationHeader == nil*/ {
var expires int
expires, device, queryCatalog = s.handler.OnRegister(id, req.Transport(), req.Source())
expires, device, queryCatalog = s.handler.OnRegister(id, wrapper.req.Transport(), wrapper.req.Source())
if device != nil {
Sugar.Infof("注册成功 Device: %s addr: %s", id, req.Source())
Sugar.Infof("注册成功 Device: %s addr: %s", id, wrapper.req.Source())
expiresHeader := sip.Expires(expires)
response.AppendHeader(&expiresHeader)
} else {
Sugar.Infof("注册失败 Device: %s", id)
response = sip.NewResponseFromRequest("", req, 401, "Unauthorized", "")
response = sip.NewResponseFromRequest("", wrapper.req, 401, "Unauthorized", "")
}
}
SendResponse(tx, response)
SendResponse(wrapper.tx, response)
if device != nil {
// 查询设备信息
@@ -119,9 +126,9 @@ func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction, parent
}
// OnInvite 收到上级预览/下级设备广播请求
func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction, parent bool) {
SendResponse(tx, sip.NewResponseFromRequest("", req, 100, "Trying", ""))
user := req.Recipient().User().String()
func (s *sipServer) OnInvite(wrapper *SipRequestSource) {
SendResponse(wrapper.tx, sip.NewResponseFromRequest("", wrapper.req, 100, "Trying", ""))
user := wrapper.req.Recipient().User().String()
//if len(user) != 20 {
// SendResponseWithStatusCode(req, tx, http.StatusNotFound)
@@ -130,43 +137,52 @@ func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction, parent b
// 查找对应的设备
var device GBDevice
if parent {
if wrapper.fromCascade {
// 级联设备
device = PlatformManager.Find(req.Source())
} else if session := Dialogs.Find(user); session != nil {
// 语音广播设备
device, _ = DeviceDao.QueryDevice(session.data.(*Sink).SinkStreamID.DeviceID())
device = PlatformManager.Find(wrapper.req.Source())
} else if wrapper.fromJt {
// 部标设备
// 1. 根据通道查找到对应的设备ID
// 2. 根据Subject头域查找对应的设备ID
if channels, _ := ChannelDao.QueryChannelsByChannelID(user); len(channels) > 0 {
device = JTDeviceManager.Find(channels[0].RootID)
}
} else {
// 根据Subject头域查找设备
headers := req.GetHeaders("Subject")
if len(headers) > 0 {
subject := headers[0].(*sip.GenericHeader)
split := strings.Split(strings.Split(subject.Value(), ",")[0], ":")
if len(split) > 1 {
device, _ = DeviceDao.QueryDevice(split[1])
if session := EarlyDialogs.Find(user); session != nil {
// 语音广播设备
device, _ = DeviceDao.QueryDevice(session.data.(*Sink).SinkStreamID.DeviceID())
} else {
// 根据Subject头域查找设备
headers := wrapper.req.GetHeaders("Subject")
if len(headers) > 0 {
subject := headers[0].(*sip.GenericHeader)
split := strings.Split(strings.Split(subject.Value(), ",")[0], ":")
if len(split) > 1 {
device, _ = DeviceDao.QueryDevice(split[1])
}
}
}
}
if device == nil {
logger.Error("处理Invite失败, 找不到设备. request: %s", req.String())
logger.Error("处理Invite失败, 找不到设备. request: %s", wrapper.req.String())
SendResponseWithStatusCode(req, tx, http.StatusNotFound)
SendResponseWithStatusCode(wrapper.req, wrapper.tx, http.StatusNotFound)
} else {
response := device.OnInvite(req, user)
SendResponse(tx, response)
response := device.OnInvite(wrapper.req, user)
SendResponse(wrapper.tx, response)
}
}
func (s *sipServer) OnAck(req sip.Request, tx sip.ServerTransaction, parent bool) {
func (s *sipServer) OnAck(wrapper *SipRequestSource) {
}
func (s *sipServer) OnBye(req sip.Request, tx sip.ServerTransaction, parent bool) {
response := sip.NewResponseFromRequest("", req, 200, "OK", "")
SendResponse(tx, response)
func (s *sipServer) OnBye(wrapper *SipRequestSource) {
response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "")
SendResponse(wrapper.tx, response)
id, _ := req.CallID()
id, _ := wrapper.req.CallID()
var deviceId string
if stream, _ := StreamDao.DeleteStreamByCallID(id.Value()); stream != nil {
@@ -177,48 +193,53 @@ func (s *sipServer) OnBye(req sip.Request, tx sip.ServerTransaction, parent bool
sink.Close(false, true)
}
if parent {
// 上级设备挂断
if platform := PlatformManager.Find(req.Source()); platform != nil {
platform.OnBye(req)
if wrapper.fromCascade {
// 级联上级挂断
if platform := PlatformManager.Find(wrapper.req.Source()); platform != nil {
platform.OnBye(wrapper.req)
}
} else if wrapper.fromJt {
// 部标设备挂断
if jtDevice := JTDeviceManager.Find(deviceId); jtDevice != nil {
jtDevice.OnBye(wrapper.req)
}
} else if device, _ := DeviceDao.QueryDevice(deviceId); device != nil {
device.OnBye(req)
device.OnBye(wrapper.req)
}
}
func (s *sipServer) OnNotify(req sip.Request, tx sip.ServerTransaction, parent bool) {
response := sip.NewResponseFromRequest("", req, 200, "OK", "")
SendResponse(tx, response)
func (s *sipServer) OnNotify(wrapper *SipRequestSource) {
response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "")
SendResponse(wrapper.tx, response)
mobilePosition := MobilePositionNotify{}
if err := DecodeXML([]byte(req.Body()), &mobilePosition); err != nil {
Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), req.String())
if err := DecodeXML([]byte(wrapper.req.Body()), &mobilePosition); err != nil {
Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), wrapper.req.String())
return
}
s.handler.OnNotifyPosition(&mobilePosition)
}
func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent bool) {
func (s *sipServer) OnMessage(wrapper *SipRequestSource) {
var ok bool
defer func() {
var response sip.Response
if ok {
response = CreateResponseWithStatusCode(req, http.StatusOK)
response = CreateResponseWithStatusCode(wrapper.req, http.StatusOK)
} else {
response = CreateResponseWithStatusCode(req, http.StatusForbidden)
response = CreateResponseWithStatusCode(wrapper.req, http.StatusForbidden)
}
SendResponse(tx, response)
SendResponse(wrapper.tx, response)
}()
body := req.Body()
body := wrapper.req.Body()
xmlName := GetRootElementName(body)
cmd := GetCmdType(body)
src, ok := s.xmlReflectTypes[xmlName+"."+cmd]
if !ok {
Sugar.Errorf("处理XML消息失败, 找不到结构体. request: %s", req.String())
Sugar.Errorf("处理XML消息失败, 找不到结构体. request: %s", wrapper.req.String())
return
}
@@ -232,7 +253,7 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
deviceId := message.(BaseMessageGetter).GetDeviceID()
if CmdBroadcast == cmd {
// 广播消息
from, _ := req.From()
from, _ := wrapper.req.From()
deviceId = from.Address.User().String()
}
@@ -241,9 +262,15 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
break
case XmlNameQuery:
// 被上级查询
device := PlatformManager.Find(req.Source())
var device GBClient
if wrapper.fromCascade {
device = PlatformManager.Find(wrapper.req.Source())
} else if wrapper.fromJt {
device = JTDeviceManager.Find(deviceId)
}
if ok = device != nil; !ok {
Sugar.Errorf("处理上级请求消息失败, 找不到级联设备 addr: %s request: %s", req.Source(), req.String())
Sugar.Errorf("处理上级请求消息失败, 找不到级联设备 addr: %s request: %s", wrapper.req.Source(), wrapper.req.String())
return
}
@@ -253,13 +280,15 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
var channels []*Channel
// 查询出所有通道
if PlatformDao != nil {
result, err := PlatformDao.QueryPlatformChannels(device.ServerAddr)
if wrapper.fromCascade {
result, err := PlatformDao.QueryPlatformChannels(device.GetDomain())
if err != nil {
Sugar.Errorf("查询设备通道列表失败 err: %s device: %s", err.Error(), device.GetID())
}
channels = result
} else if wrapper.fromJt {
channels, _ = ChannelDao.QueryChannelsByRootID(device.GetID())
} else {
// 从模拟多个国标客户端中查找
channels = DeviceChannelsManager.FindChannels(device.GetID())
@@ -272,7 +301,7 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
case XmlNameNotify:
if CmdKeepalive == cmd {
// 下级设备心跳通知
ok = s.handler.OnKeepAlive(deviceId, req.Source())
ok = s.handler.OnKeepAlive(deviceId, wrapper.req.Source())
}
break
@@ -332,22 +361,28 @@ func (s *sipServer) ListenAddr() string {
}
// 过滤SIP消息、超找消息来源
func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool)) gosip.RequestHandler {
func filterRequest(f func(wrapper *SipRequestSource)) gosip.RequestHandler {
return func(req sip.Request, tx sip.ServerTransaction) {
source := req.Source()
// 是否是级联上级下发的请求
platform := PlatformManager.Find(source)
// 是否是部标设备上级下发的请求
var fromJt bool
if platform == nil {
fromJt = JTDeviceManager.ExistClientByServerAddr(req.Source())
}
switch req.Method() {
case sip.SUBSCRIBE, sip.INFO:
if platform == nil {
// SUBSCRIBE/INFO只能级发起
if platform == nil || fromJt {
// SUBSCRIBE/INFO只能本级域向下级发起
SendResponseWithStatusCode(req, tx, http.StatusBadRequest)
Sugar.Errorf("处理%s请求失败, %s消息只能上级发起. request: %s", req.Method(), req.Method(), req.String())
return
}
break
case sip.NOTIFY, sip.REGISTER:
if platform != nil {
if platform != nil || fromJt {
// NOTIFY和REGISTER只能下级发起
SendResponseWithStatusCode(req, tx, http.StatusBadRequest)
Sugar.Errorf("处理%s请求失败, %s消息只能下级发起. request: %s", req.Method(), req.Method(), req.String())
@@ -356,13 +391,19 @@ func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool
break
}
f(req, tx, platform != nil)
f(&SipRequestSource{
req,
tx,
platform != nil,
fromJt,
})
}
}
func StartSipServer(id, listenIP, publicIP string, listenPort int) (SipServer, error) {
ua := gosip.NewServer(gosip.ServerConfig{
Host: publicIP,
Host: publicIP,
UserAgent: "github/lkmio",
}, nil, nil, logger)
addr := net.JoinHostPort(listenIP, strconv.Itoa(listenPort))
@@ -392,11 +433,11 @@ func StartSipServer(id, listenIP, publicIP string, listenPort int) (SipServer, e
utils.Assert(ua.OnRequest(sip.NOTIFY, filterRequest(server.OnNotify)) == nil)
utils.Assert(ua.OnRequest(sip.MESSAGE, filterRequest(server.OnMessage)) == nil)
utils.Assert(ua.OnRequest(sip.INFO, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) {
utils.Assert(ua.OnRequest(sip.INFO, filterRequest(func(wrapper *SipRequestSource) {
})) == nil)
utils.Assert(ua.OnRequest(sip.CANCEL, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) {
utils.Assert(ua.OnRequest(sip.CANCEL, filterRequest(func(wrapper *SipRequestSource) {
})) == nil)
utils.Assert(ua.OnRequest(sip.SUBSCRIBE, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) {
utils.Assert(ua.OnRequest(sip.SUBSCRIBE, filterRequest(func(wrapper *SipRequestSource) {
})) == nil)
server.listenAddr = addr

View File

@@ -25,7 +25,7 @@ var (
UnregisterExpiresHeader = sip.Expires(0)
)
type SipClient interface {
type SIPUA interface {
doRegister(request sip.Request) bool
doUnregister()
@@ -37,10 +37,12 @@ type SipClient interface {
Stop()
SetOnRegisterHandler(online, offline func())
GetDomain() string
}
type SIPUAParams struct {
GBModel
type SIPUAOptions struct {
Name string `json:"name"` // display name, 国标DeviceInfo消息中的Name
Username string `json:"username"` // 用户名
SeverID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复.
ServerAddr string `json:"server_addr"` // 上级地址, 必选
@@ -51,17 +53,13 @@ type SIPUAParams struct {
Status OnlineStatus `json:"status"` // 在线状态
}
func (g *SIPUAParams) TableName() string {
return "lkm_virtual_device"
}
type sipClient struct {
SIPUAParams
type sipUA struct {
SIPUAOptions
ListenAddr string //UA的监听地址
NatAddr string //Nat地址
ua SipServer
stack SipServer
exited bool
ctx context.Context
cancel context.CancelFunc
@@ -74,7 +72,7 @@ type sipClient struct {
offlineCB func()
}
func (g *sipClient) doRegister(request sip.Request) bool {
func (g *sipUA) doRegister(request sip.Request) bool {
hop, _ := request.ViaHop()
empty := sip.String{}
hop.Params.Add("rport", &empty)
@@ -82,7 +80,7 @@ func (g *sipClient) doRegister(request sip.Request) bool {
for i := 0; i < 2; i++ {
//发起注册, 第一次未携带授权头, 第二次携带授权头
clientTransaction := g.ua.SendRequest(request)
clientTransaction := g.stack.SendRequest(request)
//等待响应
responses := clientTransaction.Responses()
@@ -118,7 +116,7 @@ func (g *sipClient) doRegister(request sip.Request) bool {
return false
}
func (g *sipClient) startNewRegister() bool {
func (g *sipUA) startNewRegister() bool {
builder := NewRequestBuilder(sip.REGISTER, g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport)
expires := sip.Expires(g.RegisterExpires)
builder.SetExpires(&expires)
@@ -159,30 +157,30 @@ func CopySipRequest(old sip.Request) sip.Request {
return request
}
func (g *sipClient) refreshRegister() bool {
func (g *sipUA) refreshRegister() bool {
request := CopySipRequest(g.registerOKRequest)
return g.doRegister(request)
}
func (g *sipClient) doUnregister() {
func (g *sipUA) doUnregister() {
request := CopySipRequest(g.registerOKRequest)
request.RemoveHeader("Expires")
request.AppendHeader(&UnregisterExpiresHeader)
g.ua.SendRequest(request)
g.stack.SendRequest(request)
if g.offlineCB != nil {
go g.offlineCB()
}
}
func (g *sipClient) doKeepalive() bool {
func (g *sipUA) doKeepalive() bool {
body := fmt.Sprintf(KeepAliveBody, time.Now().UnixMilli()/1000, g.Username)
request, err := BuildMessageRequest(g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport, body)
if err != nil {
panic(err)
}
transaction := g.ua.SendRequest(request)
transaction := g.stack.SendRequest(request)
responses := transaction.Responses()
var response sip.Response
@@ -197,7 +195,7 @@ func (g *sipClient) doKeepalive() bool {
}
// IsExpires 是否临近注册有效期
func (g *sipClient) IsExpires() (bool, int) {
func (g *sipUA) IsExpires() (bool, int) {
if !g.registerOK {
return false, 0
}
@@ -207,7 +205,7 @@ func (g *sipClient) IsExpires() (bool, int) {
}
// Refresh 处理Client的生命周期任务, 发起注册, 发送心跳,断开重连等, 并返回下次刷新任务时间
func (g *sipClient) Refresh() time.Duration {
func (g *sipUA) Refresh() time.Duration {
expires, _ := g.IsExpires()
if !g.registerOK || expires {
@@ -256,7 +254,7 @@ func (g *sipClient) Refresh() time.Duration {
return time.Duration(g.KeepaliveInterval) * time.Second
}
func (g *sipClient) Start() {
func (g *sipUA) Start() {
utils.Assert(!g.exited)
g.ctx, g.cancel = context.WithCancel(context.Background())
@@ -284,21 +282,24 @@ func (g *sipClient) Start() {
}()
}
func (g *sipClient) Stop() {
func (g *sipUA) Stop() {
utils.Assert(!g.exited)
if g.registerOK {
g.doUnregister()
}
g.exited = true
g.cancel()
g.registerOK = false
g.onlineCB = nil
g.offlineCB = nil
if g.registerOK {
g.doUnregister()
}
}
func (g *sipClient) SetOnRegisterHandler(online, offline func()) {
func (g *sipUA) SetOnRegisterHandler(online, offline func()) {
g.onlineCB = online
g.offlineCB = offline
}
func (g *sipUA) GetDomain() string {
return g.ServerAddr
}

View File

@@ -34,6 +34,15 @@ func (s SetupType) String() string {
panic("invalid setup type")
}
func (s SetupType) MediaProtocol() string {
switch s {
case SetupTypePassive, SetupTypeActive:
return "TCP/RTP/AVP"
default:
return "RTP/AVP"
}
}
// RequestWrapper sql序列化
type RequestWrapper struct {
sip.Request
@@ -71,7 +80,7 @@ func (r *RequestWrapper) Scan(value interface{}) error {
type Stream struct {
GBModel
StreamID StreamID `json:"stream_id"` // 流ID
Protocol string `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
Dialog *RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话
SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发)
SetupType SetupType
@@ -158,7 +167,7 @@ func (s *Stream) Close(bye, ms bool) {
if ms {
// 告知媒体服务释放source
go CloseSource(string(s.StreamID))
go MSCloseSource(string(s.StreamID))
}
// 关闭所转发会话
@@ -170,7 +179,7 @@ func (s *Stream) Close(bye, ms bool) {
func (s *Stream) Bye() {
if s.Dialog != nil && s.Dialog.Request != nil {
go SipUA.SendRequest(s.CreateRequestFromDialog(sip.BYE))
go SipStack.SendRequest(s.CreateRequestFromDialog(sip.BYE))
s.Dialog = nil
}
}

5
xml.go
View File

@@ -17,8 +17,8 @@ type Channel struct {
GBModel
// RootID 是设备的根ID, 用于查询设备的所有通道.
RootID string `json:"-" xml:"-" gorm:"index"` // 根设备ID
TypeCode int `json:"-" xml:"-" gorm:"index"` // 设备类型编码
RootID string `json:"root_id" xml:"-" gorm:"index"` // 根设备ID
TypeCode int `json:"-" xml:"-" gorm:"index"` // 设备类型编码
// 所在组ID. 扩展的数据库字段, 方便查询某个目录下的设备列表.
// 如果ParentID不为空, ParentID作为组ID, 如果ParentID为空, BusinessGroupID作为组ID.
@@ -49,6 +49,7 @@ type Channel struct {
Longitude string `json:"longitude" xml:"Longitude,omitempty"`
Latitude string `json:"latitude" xml:"Latitude,omitempty"`
SetupType SetupType `json:"setup_type,omitempty"`
ChannelNumber int `json:"channel_number" xml:"-"` // 对应1078的通道号
}
func (d *Channel) Online() bool {

View File

@@ -63,6 +63,6 @@ type RecordInfo struct {
func (d *Device) DoQueryRecordList(channelId, startTime, endTime string, sn int, type_ string) error {
body := fmt.Sprintf(QueryRecordFormat, sn, channelId, startTime, endTime, type_)
request := d.BuildMessageRequest(channelId, body)
SipUA.SendRequest(request)
SipStack.SendRequest(request)
return nil
}