diff --git a/api.go b/api.go index dccd81f..9e93cf0 100644 --- a/api.go +++ b/api.go @@ -290,7 +290,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) { Sugar.Infof("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream) - sink := RemoveForwardSink(params.Stream, params.Sink) + sink, _ := SinkDao.DeleteForwardSink(params.Stream, params.Sink) if sink == nil { return } @@ -606,7 +606,7 @@ func (api *ApiServer) OnHangup(v *BroadcastParams, w http.ResponseWriter, r *htt Sugar.Infof("广播挂断 %v", *v) id := GenerateStreamID(InviteTypeBroadcast, v.DeviceID, v.ChannelID, "", "") - if sink := RemoveForwardSinkWithSinkStreamID(id); sink != nil { + if sink, _ := SinkDao.DeleteForwardSinkBySinkStreamID(id); sink != nil { sink.Close(true, true) } diff --git a/client.go b/client.go index 0565722..cc22aee 100644 --- a/client.go +++ b/client.go @@ -30,8 +30,6 @@ type GBClient interface { OnQueryDeviceInfo(sn int) OnSubscribeCatalog(sn int) - - CloseStream(callId string, bye, ms bool) } type gbClient struct { @@ -98,10 +96,6 @@ func (g *gbClient) OnSubscribeCatalog(sn int) { } -func (g *gbClient) CloseStream(callId string, bye, ms bool) { - -} - type GBSDP struct { sdp *sdp.SDP ssrc string diff --git a/sink.go b/sink.go index 7fa3566..4bd8531 100644 --- a/sink.go +++ b/sink.go @@ -4,6 +4,8 @@ import ( "encoding/json" "github.com/ghettovoice/gosip/sip" "github.com/ghettovoice/gosip/sip/parser" + "net/http" + "net/url" ) // Sink 级联/对讲/网关转发流Sink @@ -89,3 +91,45 @@ func (s *Sink) SetDialog(dialog sip.Request) { id, _ := dialog.CallID() s.CallID = id.Value() } + +// AddForwardSink 向流媒体服务添加转发Sink +func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sink, streamId StreamID, gbSdp *GBSDP, inviteType InviteType, attrs ...string) (sip.Response, error) { + urlParams := make(url.Values) + if TransStreamGBTalk == forwardType { + urlParams.Add("forward_type", "broadcast") + } else if TransStreamGBCascaded == forwardType { + urlParams.Add("forward_type", "cascaded") + } else if TransStreamGBGateway == forwardType { + urlParams.Add("forward_type", "gateway_1078") + } + + ip, port, sinkID, ssrc, err := MSAddForwardSink(forwardType, string(streamId), gbSdp.connectionAddr, gbSdp.offerSetup.String(), gbSdp.answerSetup.String(), gbSdp.ssrc, string(inviteType), urlParams) + if err != nil { + Sugar.Errorf("处理上级Invite失败,向流媒体服务添加转发Sink失败 err: %s", err.Error()) + if InviteTypePlay != inviteType { + CloseStream(streamId, true) + } + + return nil, err + } + + sink.SinkID = sinkID + // 创建answer + answer := BuildSDP(gbSdp.mediaType, user, gbSdp.sdp.Session, ip, port, gbSdp.startTime, gbSdp.stopTime, gbSdp.answerSetup.String(), gbSdp.speed, ssrc, attrs...) + response := CreateResponseWithStatusCode(request, http.StatusOK) + + // answer添加contact头域 + response.RemoveHeader("Contact") + response.AppendHeader(GlobalContactAddress.AsContactHeader()) + response.AppendHeader(&SDPMessageType) + response.SetBody(answer, true) + setToTag(response) + + sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source())) + + if err = SinkDao.SaveForwardSink(streamId, sink); err != nil { + Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error()) + } + + return response, nil +} diff --git a/sink_manager.go b/sink_manager.go deleted file mode 100644 index acf192e..0000000 --- a/sink_manager.go +++ /dev/null @@ -1,111 +0,0 @@ -package main - -import ( - "github.com/ghettovoice/gosip/sip" - "net/http" - "net/url" -) - -func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sink, streamId StreamID, gbSdp *GBSDP, inviteType InviteType, attrs ...string) (sip.Response, error) { - urlParams := make(url.Values) - if TransStreamGBTalk == forwardType { - urlParams.Add("forward_type", "broadcast") - } else if TransStreamGBCascaded == forwardType { - urlParams.Add("forward_type", "cascaded") - } else if TransStreamGBGateway == forwardType { - urlParams.Add("forward_type", "gateway_1078") - } - - ip, port, sinkID, ssrc, err := MSAddForwardSink(forwardType, string(streamId), gbSdp.connectionAddr, gbSdp.offerSetup.String(), gbSdp.answerSetup.String(), gbSdp.ssrc, string(inviteType), urlParams) - if err != nil { - Sugar.Errorf("处理上级Invite失败,向流媒体服务添加转发Sink失败 err: %s", err.Error()) - if InviteTypePlay != inviteType { - CloseStream(streamId, true) - } - - return nil, err - } - - sink.SinkID = sinkID - // 创建answer - answer := BuildSDP(gbSdp.mediaType, user, gbSdp.sdp.Session, ip, port, gbSdp.startTime, gbSdp.stopTime, gbSdp.answerSetup.String(), gbSdp.speed, ssrc, attrs...) - response := CreateResponseWithStatusCode(request, http.StatusOK) - - // answer添加contact头域 - response.RemoveHeader("Contact") - response.AppendHeader(GlobalContactAddress.AsContactHeader()) - response.AppendHeader(&SDPMessageType) - response.SetBody(answer, true) - setToTag(response) - - sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source())) - - if err = SinkDao.SaveForwardSink(streamId, sink); err != nil { - Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error()) - } - - return response, nil -} - -func RemoveForwardSink(StreamID StreamID, sinkID string) *Sink { - sink, _ := SinkDao.DeleteForwardSink(StreamID, sinkID) - if sink == nil { - return nil - } - - releaseSink(sink) - return sink -} - -func RemoveForwardSinkWithCallId(callId string) *Sink { - sink, _ := SinkDao.DeleteForwardSinkByCallID(callId) - if sink == nil { - return nil - } - - releaseSink(sink) - return sink -} - -func RemoveForwardSinkWithSinkStreamID(sinkStreamId StreamID) *Sink { - sink, _ := SinkDao.DeleteForwardSinkBySinkStreamID(sinkStreamId) - if sink == nil { - return nil - } - - releaseSink(sink) - return sink -} - -func releaseSink(sink *Sink) { - // 减少拉流计数 - //if stream := StreamManager.Find(sink.StreamID); stream != nil { - // stream.DecreaseSinkCount() - //} -} - -func closeSink(sink *Sink, bye, ms bool) { - releaseSink(sink) - - var callId string - if sink.Dialog != nil { - callId_, _ := sink.Dialog.CallID() - callId = callId_.Value() - } - - platform := PlatformManager.Find(sink.ServerAddr) - if platform != nil { - platform.CloseStream(callId, bye, ms) - } else { - sink.Close(bye, ms) - } -} - -func CloseStreamSinks(StreamID StreamID, bye, ms bool) []*Sink { - sinks, _ := SinkDao.DeleteForwardSinksByStreamID(StreamID) - for _, sink := range sinks { - closeSink(sink, bye, ms) - } - - return sinks -} diff --git a/stream.go b/stream.go index 6d2b28d..3ac63d1 100644 --- a/stream.go +++ b/stream.go @@ -209,3 +209,13 @@ func CloseStream(streamId StreamID, ms bool) { deleteStream.Close(true, ms) } } + +// CloseStreamSinks 关闭某个流的所有sink +func CloseStreamSinks(StreamID StreamID, bye, ms bool) []*Sink { + sinks, _ := SinkDao.DeleteForwardSinksByStreamID(StreamID) + for _, sink := range sinks { + sink.Close(bye, ms) + } + + return sinks +}