diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 005ff06..f847980 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -61,7 +61,7 @@ func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState) log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.Id(), s.conn.RemoteAddr().String()) - sink.Play(func() { + stream.HookPlaying(sink, func() { s.handle = sink response <- utils.HookStateOK }, func(state utils.HookState) { diff --git a/rtsp/rtsp_session.go b/rtsp/rtsp_session.go index 8319218..0f9ba56 100644 --- a/rtsp/rtsp_session.go +++ b/rtsp/rtsp_session.go @@ -43,10 +43,13 @@ const ( type requestHandler interface { onOptions(sourceId string, headers textproto.MIMEHeader) + //获取spd onDescribe(sourceId string, headers textproto.MIMEHeader) + //订阅track onSetup(sourceId string, index int, headers textproto.MIMEHeader) + //播放 onPlay(sourceId string) onTeardown() @@ -57,7 +60,7 @@ type requestHandler interface { type session struct { conn net.Conn - sink_ *sink + sink_ *Sink sessionId string writeBuffer *bytes.Buffer } @@ -162,8 +165,9 @@ func (s *session) onDescribe(source string, headers textproto.MIMEHeader) error }) code := utils.HookStateOK - s.sink_ = sink_.(*sink) - sink_.Play(func() { + s.sink_ = sink_ + + stream.HookPlaying(sink_, func() { }, func(state utils.HookState) { code = state @@ -187,8 +191,6 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead return fmt.Errorf("failed to parsing TRANSPORT header:%s", split) } - var clientRtpPort int - var clientRtcpPort int tcp := "RTP/AVP" != split[0] && "RTP/AVP/UDP" != split[0] if !tcp { for _, value := range split { @@ -205,13 +207,13 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead if err != nil { return err } - clientRtpPort = port + _ = port port, err = strconv.Atoi(pairPort[1]) if err != nil { return err } - clientRtcpPort = port + _ = port } } @@ -220,8 +222,6 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead return err } - println(clientRtpPort) - println(clientRtcpPort) responseHeader := transportHeader if tcp { //修改interleaved为实际的stream index diff --git a/stream/hook.go b/stream/hook.go index d551e04..e9edb5f 100644 --- a/stream/hook.go +++ b/stream/hook.go @@ -63,6 +63,43 @@ func init() { } } +func sendHookEvent(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error { + marshal, err := json.Marshal(body) + if err != nil { + return err + } + + client := &http.Client{ + Timeout: time.Second * time.Duration(AppConfig.Hook.Time), + } + request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal)) + if err != nil { + return err + } + + request.Header.Set("Content-Type", "application/json") + response, err := client.Do(request) + if err != nil { + failure(response, err) + } else if response.StatusCode != http.StatusOK { + failure(response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status)) + } else { + success(response) + } + + return nil +} + +func hookEvent(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error { + url := hookUrls[event] + if url == "" { + success(nil) + return nil + } + + return sendHookEvent(url, body, success, failure) +} + type hookSessionImpl struct { } @@ -90,15 +127,9 @@ func (h *hookSessionImpl) send(url string, body interface{}, success func(respon success(response) } - return nil + return sendHookEvent(url, body, success, failure) } func (h *hookSessionImpl) Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error { - url := hookUrls[event] - if url == "" { - success(nil) - return nil - } - - return h.send(url, body, success, failure) + return hookEvent(event, body, success, failure) } diff --git a/stream/sink.go b/stream/sink.go index 54c16f5..b99d4be 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -3,16 +3,12 @@ package stream import ( "fmt" "github.com/yangjiechina/avformat/utils" - "github.com/yangjiechina/live-server/log" "net" - "net/http" ) type SinkId interface{} type ISink interface { - HookHandler - Id() SinkId Input(data []byte) error @@ -191,45 +187,3 @@ func (s *SinkImpl) Close() { func (s *SinkImpl) PrintInfo() string { return fmt.Sprintf("%s-%v source:%s", s.ProtocolStr(), s.Id_, s.SourceId_) } - -func (s *SinkImpl) Play(success func(), failure func(state utils.HookState)) { - f := func() { - source := SourceManager.Find(s.SourceId()) - if source == nil { - log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId()) - - s.SetState(SessionStateWait) - AddSinkToWaitingQueue(s.SourceId(), s) - } else { - log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId()) - - source.AddEvent(SourceEventPlay, s) - } - } - - if !AppConfig.Hook.EnableOnPlay() { - f() - success() - return - } - - err := s.Hook(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) { - f() - success() - }, func(response *http.Response, err error) { - log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) - - failure(utils.HookStateFailure) - }) - - if err != nil { - log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) - - failure(utils.HookStateFailure) - return - } -} - -func (s *SinkImpl) PlayDone(success func(), failure func(state utils.HookState)) { - -} diff --git a/stream/sink_hook.go b/stream/sink_hook.go new file mode 100644 index 0000000..d95008c --- /dev/null +++ b/stream/sink_hook.go @@ -0,0 +1,45 @@ +package stream + +import ( + "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/live-server/log" + "net/http" +) + +func HookPlaying(s ISink, success func(), failure func(state utils.HookState)) { + f := func() { + source := SourceManager.Find(s.SourceId()) + if source == nil { + log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId()) + + s.SetState(SessionStateWait) + AddSinkToWaitingQueue(s.SourceId(), s) + } else { + log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId()) + + source.AddEvent(SourceEventPlay, s) + } + } + + if !AppConfig.Hook.EnableOnPlay() { + f() + success() + return + } + + err := hookEvent(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) { + f() + success() + }, func(response *http.Response, err error) { + log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) + + failure(utils.HookStateFailure) + }) + + if err != nil { + log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) + + failure(utils.HookStateFailure) + return + } +} diff --git a/stream/source.go b/stream/source.go index f6bd8d9..d2f4b8b 100644 --- a/stream/source.go +++ b/stream/source.go @@ -2,6 +2,7 @@ package stream import ( "fmt" + "github.com/yangjiechina/live-server/log" "net" "net/http" "sync" @@ -328,6 +329,8 @@ func (s *SourceImpl) AddSink(sink ISink) bool { s.transStreams = make(map[TransStreamId]ITransStream, 10) } //创建一个新的传输流 + log.Sugar.Debugf("创建%s-stream", sink.ProtocolStr()) + transStream = TransStreamFactory(s, sink.Protocol(), streams[:size]) s.transStreams[transStreamId] = transStream