mirror of
https://github.com/lkmio/gb-cms.git
synced 2025-09-26 19:51:22 +08:00
Compare commits
2 Commits
4b104a2d5c
...
e2938976e7
Author | SHA1 | Date | |
---|---|---|---|
![]() |
e2938976e7 | ||
![]() |
f14fe4a0c4 |
4
api.go
4
api.go
@@ -290,7 +290,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
|
||||
func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) {
|
||||
Sugar.Infof("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
||||
|
||||
sink := RemoveForwardSink(params.Stream, params.Sink)
|
||||
sink, _ := SinkDao.DeleteForwardSink(params.Stream, params.Sink)
|
||||
if sink == nil {
|
||||
return
|
||||
}
|
||||
@@ -606,7 +606,7 @@ func (api *ApiServer) OnHangup(v *BroadcastParams, w http.ResponseWriter, r *htt
|
||||
Sugar.Infof("广播挂断 %v", *v)
|
||||
|
||||
id := GenerateStreamID(InviteTypeBroadcast, v.DeviceID, v.ChannelID, "", "")
|
||||
if sink := RemoveForwardSinkWithSinkStreamID(id); sink != nil {
|
||||
if sink, _ := SinkDao.DeleteForwardSinkBySinkStreamID(id); sink != nil {
|
||||
sink.Close(true, true)
|
||||
}
|
||||
|
||||
|
@@ -30,8 +30,6 @@ type GBClient interface {
|
||||
OnQueryDeviceInfo(sn int)
|
||||
|
||||
OnSubscribeCatalog(sn int)
|
||||
|
||||
CloseStream(callId string, bye, ms bool)
|
||||
}
|
||||
|
||||
type gbClient struct {
|
||||
@@ -98,10 +96,6 @@ func (g *gbClient) OnSubscribeCatalog(sn int) {
|
||||
|
||||
}
|
||||
|
||||
func (g *gbClient) CloseStream(callId string, bye, ms bool) {
|
||||
|
||||
}
|
||||
|
||||
type GBSDP struct {
|
||||
sdp *sdp.SDP
|
||||
ssrc string
|
||||
|
44
sink.go
44
sink.go
@@ -4,6 +4,8 @@ import (
|
||||
"encoding/json"
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"github.com/ghettovoice/gosip/sip/parser"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// Sink 级联/对讲/网关转发流Sink
|
||||
@@ -89,3 +91,45 @@ func (s *Sink) SetDialog(dialog sip.Request) {
|
||||
id, _ := dialog.CallID()
|
||||
s.CallID = id.Value()
|
||||
}
|
||||
|
||||
// AddForwardSink 向流媒体服务添加转发Sink
|
||||
func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sink, streamId StreamID, gbSdp *GBSDP, inviteType InviteType, attrs ...string) (sip.Response, error) {
|
||||
urlParams := make(url.Values)
|
||||
if TransStreamGBTalk == forwardType {
|
||||
urlParams.Add("forward_type", "broadcast")
|
||||
} else if TransStreamGBCascaded == forwardType {
|
||||
urlParams.Add("forward_type", "cascaded")
|
||||
} else if TransStreamGBGateway == forwardType {
|
||||
urlParams.Add("forward_type", "gateway_1078")
|
||||
}
|
||||
|
||||
ip, port, sinkID, ssrc, err := MSAddForwardSink(forwardType, string(streamId), gbSdp.connectionAddr, gbSdp.offerSetup.String(), gbSdp.answerSetup.String(), gbSdp.ssrc, string(inviteType), urlParams)
|
||||
if err != nil {
|
||||
Sugar.Errorf("处理上级Invite失败,向流媒体服务添加转发Sink失败 err: %s", err.Error())
|
||||
if InviteTypePlay != inviteType {
|
||||
CloseStream(streamId, true)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sink.SinkID = sinkID
|
||||
// 创建answer
|
||||
answer := BuildSDP(gbSdp.mediaType, user, gbSdp.sdp.Session, ip, port, gbSdp.startTime, gbSdp.stopTime, gbSdp.answerSetup.String(), gbSdp.speed, ssrc, attrs...)
|
||||
response := CreateResponseWithStatusCode(request, http.StatusOK)
|
||||
|
||||
// answer添加contact头域
|
||||
response.RemoveHeader("Contact")
|
||||
response.AppendHeader(GlobalContactAddress.AsContactHeader())
|
||||
response.AppendHeader(&SDPMessageType)
|
||||
response.SetBody(answer, true)
|
||||
setToTag(response)
|
||||
|
||||
sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source()))
|
||||
|
||||
if err = SinkDao.SaveForwardSink(streamId, sink); err != nil {
|
||||
Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error())
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
111
sink_manager.go
111
sink_manager.go
@@ -1,111 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sink, streamId StreamID, gbSdp *GBSDP, inviteType InviteType, attrs ...string) (sip.Response, error) {
|
||||
urlParams := make(url.Values)
|
||||
if TransStreamGBTalk == forwardType {
|
||||
urlParams.Add("forward_type", "broadcast")
|
||||
} else if TransStreamGBCascaded == forwardType {
|
||||
urlParams.Add("forward_type", "cascaded")
|
||||
} else if TransStreamGBGateway == forwardType {
|
||||
urlParams.Add("forward_type", "gateway_1078")
|
||||
}
|
||||
|
||||
ip, port, sinkID, ssrc, err := MSAddForwardSink(forwardType, string(streamId), gbSdp.connectionAddr, gbSdp.offerSetup.String(), gbSdp.answerSetup.String(), gbSdp.ssrc, string(inviteType), urlParams)
|
||||
if err != nil {
|
||||
Sugar.Errorf("处理上级Invite失败,向流媒体服务添加转发Sink失败 err: %s", err.Error())
|
||||
if InviteTypePlay != inviteType {
|
||||
CloseStream(streamId, true)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sink.SinkID = sinkID
|
||||
// 创建answer
|
||||
answer := BuildSDP(gbSdp.mediaType, user, gbSdp.sdp.Session, ip, port, gbSdp.startTime, gbSdp.stopTime, gbSdp.answerSetup.String(), gbSdp.speed, ssrc, attrs...)
|
||||
response := CreateResponseWithStatusCode(request, http.StatusOK)
|
||||
|
||||
// answer添加contact头域
|
||||
response.RemoveHeader("Contact")
|
||||
response.AppendHeader(GlobalContactAddress.AsContactHeader())
|
||||
response.AppendHeader(&SDPMessageType)
|
||||
response.SetBody(answer, true)
|
||||
setToTag(response)
|
||||
|
||||
sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source()))
|
||||
|
||||
if err = SinkDao.SaveForwardSink(streamId, sink); err != nil {
|
||||
Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error())
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func RemoveForwardSink(StreamID StreamID, sinkID string) *Sink {
|
||||
sink, _ := SinkDao.DeleteForwardSink(StreamID, sinkID)
|
||||
if sink == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
releaseSink(sink)
|
||||
return sink
|
||||
}
|
||||
|
||||
func RemoveForwardSinkWithCallId(callId string) *Sink {
|
||||
sink, _ := SinkDao.DeleteForwardSinkByCallID(callId)
|
||||
if sink == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
releaseSink(sink)
|
||||
return sink
|
||||
}
|
||||
|
||||
func RemoveForwardSinkWithSinkStreamID(sinkStreamId StreamID) *Sink {
|
||||
sink, _ := SinkDao.DeleteForwardSinkBySinkStreamID(sinkStreamId)
|
||||
if sink == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
releaseSink(sink)
|
||||
return sink
|
||||
}
|
||||
|
||||
func releaseSink(sink *Sink) {
|
||||
// 减少拉流计数
|
||||
//if stream := StreamManager.Find(sink.StreamID); stream != nil {
|
||||
// stream.DecreaseSinkCount()
|
||||
//}
|
||||
}
|
||||
|
||||
func closeSink(sink *Sink, bye, ms bool) {
|
||||
releaseSink(sink)
|
||||
|
||||
var callId string
|
||||
if sink.Dialog != nil {
|
||||
callId_, _ := sink.Dialog.CallID()
|
||||
callId = callId_.Value()
|
||||
}
|
||||
|
||||
platform := PlatformManager.Find(sink.ServerAddr)
|
||||
if platform != nil {
|
||||
platform.CloseStream(callId, bye, ms)
|
||||
} else {
|
||||
sink.Close(bye, ms)
|
||||
}
|
||||
}
|
||||
|
||||
func CloseStreamSinks(StreamID StreamID, bye, ms bool) []*Sink {
|
||||
sinks, _ := SinkDao.DeleteForwardSinksByStreamID(StreamID)
|
||||
for _, sink := range sinks {
|
||||
closeSink(sink, bye, ms)
|
||||
}
|
||||
|
||||
return sinks
|
||||
}
|
10
stream.go
10
stream.go
@@ -209,3 +209,13 @@ func CloseStream(streamId StreamID, ms bool) {
|
||||
deleteStream.Close(true, ms)
|
||||
}
|
||||
}
|
||||
|
||||
// CloseStreamSinks 关闭某个流的所有sink
|
||||
func CloseStreamSinks(StreamID StreamID, bye, ms bool) []*Sink {
|
||||
sinks, _ := SinkDao.DeleteForwardSinksByStreamID(StreamID)
|
||||
for _, sink := range sinks {
|
||||
sink.Close(bye, ms)
|
||||
}
|
||||
|
||||
return sinks
|
||||
}
|
||||
|
Reference in New Issue
Block a user