mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-27 11:32:26 +08:00
sink hook单独封装
This commit is contained in:
@@ -61,7 +61,7 @@ func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState)
|
|||||||
|
|
||||||
log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.Id(), s.conn.RemoteAddr().String())
|
log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.Id(), s.conn.RemoteAddr().String())
|
||||||
|
|
||||||
sink.Play(func() {
|
stream.HookPlaying(sink, func() {
|
||||||
s.handle = sink
|
s.handle = sink
|
||||||
response <- utils.HookStateOK
|
response <- utils.HookStateOK
|
||||||
}, func(state utils.HookState) {
|
}, func(state utils.HookState) {
|
||||||
|
@@ -43,10 +43,13 @@ const (
|
|||||||
type requestHandler interface {
|
type requestHandler interface {
|
||||||
onOptions(sourceId string, headers textproto.MIMEHeader)
|
onOptions(sourceId string, headers textproto.MIMEHeader)
|
||||||
|
|
||||||
|
//获取spd
|
||||||
onDescribe(sourceId string, headers textproto.MIMEHeader)
|
onDescribe(sourceId string, headers textproto.MIMEHeader)
|
||||||
|
|
||||||
|
//订阅track
|
||||||
onSetup(sourceId string, index int, headers textproto.MIMEHeader)
|
onSetup(sourceId string, index int, headers textproto.MIMEHeader)
|
||||||
|
|
||||||
|
//播放
|
||||||
onPlay(sourceId string)
|
onPlay(sourceId string)
|
||||||
|
|
||||||
onTeardown()
|
onTeardown()
|
||||||
@@ -57,7 +60,7 @@ type requestHandler interface {
|
|||||||
type session struct {
|
type session struct {
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
|
|
||||||
sink_ *sink
|
sink_ *Sink
|
||||||
sessionId string
|
sessionId string
|
||||||
writeBuffer *bytes.Buffer
|
writeBuffer *bytes.Buffer
|
||||||
}
|
}
|
||||||
@@ -162,8 +165,9 @@ func (s *session) onDescribe(source string, headers textproto.MIMEHeader) error
|
|||||||
})
|
})
|
||||||
|
|
||||||
code := utils.HookStateOK
|
code := utils.HookStateOK
|
||||||
s.sink_ = sink_.(*sink)
|
s.sink_ = sink_
|
||||||
sink_.Play(func() {
|
|
||||||
|
stream.HookPlaying(sink_, func() {
|
||||||
|
|
||||||
}, func(state utils.HookState) {
|
}, func(state utils.HookState) {
|
||||||
code = state
|
code = state
|
||||||
@@ -187,8 +191,6 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead
|
|||||||
return fmt.Errorf("failed to parsing TRANSPORT header:%s", split)
|
return fmt.Errorf("failed to parsing TRANSPORT header:%s", split)
|
||||||
}
|
}
|
||||||
|
|
||||||
var clientRtpPort int
|
|
||||||
var clientRtcpPort int
|
|
||||||
tcp := "RTP/AVP" != split[0] && "RTP/AVP/UDP" != split[0]
|
tcp := "RTP/AVP" != split[0] && "RTP/AVP/UDP" != split[0]
|
||||||
if !tcp {
|
if !tcp {
|
||||||
for _, value := range split {
|
for _, value := range split {
|
||||||
@@ -205,13 +207,13 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
clientRtpPort = port
|
_ = port
|
||||||
|
|
||||||
port, err = strconv.Atoi(pairPort[1])
|
port, err = strconv.Atoi(pairPort[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
clientRtcpPort = port
|
_ = port
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -220,8 +222,6 @@ func (s *session) onSetup(sourceId string, index int, headers textproto.MIMEHead
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
println(clientRtpPort)
|
|
||||||
println(clientRtcpPort)
|
|
||||||
responseHeader := transportHeader
|
responseHeader := transportHeader
|
||||||
if tcp {
|
if tcp {
|
||||||
//修改interleaved为实际的stream index
|
//修改interleaved为实际的stream index
|
||||||
|
@@ -63,6 +63,43 @@ func init() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sendHookEvent(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
|
||||||
|
marshal, err := json.Marshal(body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Timeout: time.Second * time.Duration(AppConfig.Hook.Time),
|
||||||
|
}
|
||||||
|
request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
request.Header.Set("Content-Type", "application/json")
|
||||||
|
response, err := client.Do(request)
|
||||||
|
if err != nil {
|
||||||
|
failure(response, err)
|
||||||
|
} else if response.StatusCode != http.StatusOK {
|
||||||
|
failure(response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status))
|
||||||
|
} else {
|
||||||
|
success(response)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func hookEvent(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
|
||||||
|
url := hookUrls[event]
|
||||||
|
if url == "" {
|
||||||
|
success(nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return sendHookEvent(url, body, success, failure)
|
||||||
|
}
|
||||||
|
|
||||||
type hookSessionImpl struct {
|
type hookSessionImpl struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,15 +127,9 @@ func (h *hookSessionImpl) send(url string, body interface{}, success func(respon
|
|||||||
success(response)
|
success(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return sendHookEvent(url, body, success, failure)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *hookSessionImpl) Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
|
func (h *hookSessionImpl) Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
|
||||||
url := hookUrls[event]
|
return hookEvent(event, body, success, failure)
|
||||||
if url == "" {
|
|
||||||
success(nil)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return h.send(url, body, success, failure)
|
|
||||||
}
|
}
|
||||||
|
@@ -3,16 +3,12 @@ package stream
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"github.com/yangjiechina/live-server/log"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type SinkId interface{}
|
type SinkId interface{}
|
||||||
|
|
||||||
type ISink interface {
|
type ISink interface {
|
||||||
HookHandler
|
|
||||||
|
|
||||||
Id() SinkId
|
Id() SinkId
|
||||||
|
|
||||||
Input(data []byte) error
|
Input(data []byte) error
|
||||||
@@ -191,45 +187,3 @@ func (s *SinkImpl) Close() {
|
|||||||
func (s *SinkImpl) PrintInfo() string {
|
func (s *SinkImpl) PrintInfo() string {
|
||||||
return fmt.Sprintf("%s-%v source:%s", s.ProtocolStr(), s.Id_, s.SourceId_)
|
return fmt.Sprintf("%s-%v source:%s", s.ProtocolStr(), s.Id_, s.SourceId_)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) Play(success func(), failure func(state utils.HookState)) {
|
|
||||||
f := func() {
|
|
||||||
source := SourceManager.Find(s.SourceId())
|
|
||||||
if source == nil {
|
|
||||||
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId())
|
|
||||||
|
|
||||||
s.SetState(SessionStateWait)
|
|
||||||
AddSinkToWaitingQueue(s.SourceId(), s)
|
|
||||||
} else {
|
|
||||||
log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId())
|
|
||||||
|
|
||||||
source.AddEvent(SourceEventPlay, s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !AppConfig.Hook.EnableOnPlay() {
|
|
||||||
f()
|
|
||||||
success()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err := s.Hook(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) {
|
|
||||||
f()
|
|
||||||
success()
|
|
||||||
}, func(response *http.Response, err error) {
|
|
||||||
log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId())
|
|
||||||
|
|
||||||
failure(utils.HookStateFailure)
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId())
|
|
||||||
|
|
||||||
failure(utils.HookStateFailure)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SinkImpl) PlayDone(success func(), failure func(state utils.HookState)) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
45
stream/sink_hook.go
Normal file
45
stream/sink_hook.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/yangjiechina/avformat/utils"
|
||||||
|
"github.com/yangjiechina/live-server/log"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
func HookPlaying(s ISink, success func(), failure func(state utils.HookState)) {
|
||||||
|
f := func() {
|
||||||
|
source := SourceManager.Find(s.SourceId())
|
||||||
|
if source == nil {
|
||||||
|
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId())
|
||||||
|
|
||||||
|
s.SetState(SessionStateWait)
|
||||||
|
AddSinkToWaitingQueue(s.SourceId(), s)
|
||||||
|
} else {
|
||||||
|
log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId())
|
||||||
|
|
||||||
|
source.AddEvent(SourceEventPlay, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !AppConfig.Hook.EnableOnPlay() {
|
||||||
|
f()
|
||||||
|
success()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err := hookEvent(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) {
|
||||||
|
f()
|
||||||
|
success()
|
||||||
|
}, func(response *http.Response, err error) {
|
||||||
|
log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId())
|
||||||
|
|
||||||
|
failure(utils.HookStateFailure)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId())
|
||||||
|
|
||||||
|
failure(utils.HookStateFailure)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
@@ -2,6 +2,7 @@ package stream
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/yangjiechina/live-server/log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -328,6 +329,8 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
|
|||||||
s.transStreams = make(map[TransStreamId]ITransStream, 10)
|
s.transStreams = make(map[TransStreamId]ITransStream, 10)
|
||||||
}
|
}
|
||||||
//创建一个新的传输流
|
//创建一个新的传输流
|
||||||
|
log.Sugar.Debugf("创建%s-stream", sink.ProtocolStr())
|
||||||
|
|
||||||
transStream = TransStreamFactory(s, sink.Protocol(), streams[:size])
|
transStream = TransStreamFactory(s, sink.Protocol(), streams[:size])
|
||||||
s.transStreams[transStreamId] = transStream
|
s.transStreams[transStreamId] = transStream
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user