打印http hook响应

This commit is contained in:
yangjiechina
2024-07-16 21:17:22 +08:00
parent 6a9a797f3f
commit c353600844
9 changed files with 42 additions and 31 deletions

View File

@@ -67,7 +67,7 @@ func NewTCPSession(conn net.Conn, filter Filter) *TCPSession {
//单端口模式,ssrc匹配source //单端口模式,ssrc匹配source
if session.source == nil { if session.source == nil {
//匹配不到直接关闭 //匹配不到直接关闭
source := filter.FindSource(packet.SSRC) source := filter.FindSource(packet.SSRC)
if source == nil { if source == nil {
log.Sugar.Errorf("gb28181推流失败 ssrc:%x配置不到source conn:%s data:%s", packet.SSRC, session.conn.RemoteAddr().String(), hex.EncodeToString(bytes)) log.Sugar.Errorf("gb28181推流失败 ssrc:%x配置不到source conn:%s data:%s", packet.SSRC, session.conn.RemoteAddr().String(), hex.EncodeToString(bytes))

View File

@@ -128,9 +128,7 @@ func main() {
if stream.AppConfig.Hook.IsEnableOnStarted() { if stream.AppConfig.Hook.IsEnableOnStarted() {
go func() { go func() {
if _, err := stream.Hook(stream.HookEventStarted, "", nil); err != nil { _, _ = stream.Hook(stream.HookEventStarted, "", nil)
log.Sugar.Errorf("发送启动通知失败 err:%s", err.Error())
}
}() }()
} }

View File

@@ -112,7 +112,7 @@ func (t *transStream) AddSink(sink_ stream.Sink) error {
log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), rtcSink.Id_, rtcSink.SourceId_) log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), rtcSink.Id_, rtcSink.SourceId_)
if state > webrtc.ICEConnectionStateDisconnected { if state > webrtc.ICEConnectionStateDisconnected {
log.Sugar.Errorf("webrtc peer断开接 sink:%v source:%s", rtcSink.Id_, rtcSink.SourceId_) log.Sugar.Errorf("webrtc peer断开接 sink:%v source:%s", rtcSink.Id_, rtcSink.SourceId_)
rtcSink.Close() rtcSink.Close()
} }
}) })

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/lkmio/lkm/log" "github.com/lkmio/lkm/log"
"io"
"net/http" "net/http"
"time" "time"
) )
@@ -16,28 +17,25 @@ type eventInfo struct {
RemoteAddr string `json:"remote_addr"` //peer地址 RemoteAddr string `json:"remote_addr"` //peer地址
} }
func NewHookPlayEventInfo(sink Sink) eventInfo { func responseBodyToString(resp *http.Response) string {
return eventInfo{Stream: sink.SourceId(), Protocol: sink.Protocol().ToString(), RemoteAddr: sink.PrintInfo()} bodyBytes, err := io.ReadAll(resp.Body)
}
func NewHookPublishEventInfo(source Source) eventInfo {
return eventInfo{Stream: source.Id(), Protocol: source.Type().ToString(), RemoteAddr: source.RemoteAddr()}
}
func sendHookEvent(url string, body interface{}) (*http.Response, error) {
marshal, err := json.Marshal(body)
if err != nil { if err != nil {
return nil, err return ""
} }
resp.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
return string(bodyBytes)
}
func sendHookEvent(url string, body []byte) (*http.Response, error) {
client := &http.Client{ client := &http.Client{
Timeout: time.Duration(AppConfig.Hook.Timeout), Timeout: time.Duration(AppConfig.Hook.Timeout),
} }
request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal)) request, err := http.NewRequest("post", url, bytes.NewBuffer(body))
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Sugar.Infof("发送hook通知 url:%s body:%s", url, marshal)
request.Header.Set("Content-Type", "application/json") request.Header.Set("Content-Type", "application/json")
return client.Do(request) return client.Do(request)
} }
@@ -48,14 +46,34 @@ func Hook(event HookEvent, params string, body interface{}) (*http.Response, err
return nil, fmt.Errorf("the url for this %s event does not exist", event.ToString()) return nil, fmt.Errorf("the url for this %s event does not exist", event.ToString())
} }
bytes, err := json.Marshal(body)
if err != nil {
return nil, err
}
if "" != params { if "" != params {
url += "?" + params url += "?" + params
} }
response, err := sendHookEvent(url, body) log.Sugar.Infof("sent a hook event for %s. url: %s body: %s", event.ToString(), url, bytes)
response, err := sendHookEvent(url, bytes)
if err != nil {
log.Sugar.Errorf("failed to %s the hook event. err: %s", event.ToString(), err.Error())
} else {
log.Sugar.Infof("received response for hook %s event: status='%s', response body='%s'", event.ToString(), response.Status, responseBodyToString(response))
}
if err == nil && http.StatusOK != response.StatusCode { if err == nil && http.StatusOK != response.StatusCode {
return response, fmt.Errorf("reason %s", response.Status) return response, fmt.Errorf("unexpected response status: %s for request %s", response.Status, url)
} }
return response, err return response, err
} }
func NewHookPlayEventInfo(sink Sink) eventInfo {
return eventInfo{Stream: sink.SourceId(), Protocol: sink.Protocol().ToString(), RemoteAddr: sink.PrintInfo()}
}
func NewHookPublishEventInfo(source Source) eventInfo {
return eventInfo{Stream: source.Id(), Protocol: source.Type().ToString(), RemoteAddr: source.RemoteAddr()}
}

View File

@@ -47,6 +47,8 @@ func (h *HookEvent) ToString() string {
return "idle timeout" return "idle timeout"
} else if HookEventReceiveTimeout == *h { } else if HookEventReceiveTimeout == *h {
return "receive timeout" return "receive timeout"
} else if HookEventStarted == *h {
return "started"
} }
panic(fmt.Sprintf("unknow hook type %d", h)) panic(fmt.Sprintf("unknow hook type %d", h))

View File

@@ -29,7 +29,7 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
defer sink.UnLock() defer sink.UnLock()
if SessionStateClosed == sink.State() { if SessionStateClosed == sink.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开接 %s", sink.Id()) log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开接 %s", sink.Id())
return response, utils.HookStateFailure return response, utils.HookStateFailure
} else { } else {
sink.SetState(SessionStateWait) sink.SetState(SessionStateWait)

View File

@@ -20,7 +20,6 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS
} }
if err := SourceManager.Add(source); err != nil { if err := SourceManager.Add(source); err != nil {
log.Sugar.Errorf("添加源失败 source:%s err:%s", source.Id(), err.Error())
return nil, utils.HookStateOccupy return nil, utils.HookStateOccupy
} }
@@ -45,7 +44,6 @@ func HookPublishEvent(source Source) (*http.Response, utils.HookState) {
if AppConfig.Hook.IsEnablePublishEvent() { if AppConfig.Hook.IsEnablePublishEvent() {
hook, err := Hook(HookEventPublish, source.UrlValues().Encode(), NewHookPublishEventInfo(source)) hook, err := Hook(HookEventPublish, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
if err != nil { if err != nil {
log.Sugar.Errorf("通知推流事件失败 source:%s err:%s", source.Id(), err.Error())
return hook, utils.HookStateFailure return hook, utils.HookStateFailure
} }
@@ -57,10 +55,7 @@ func HookPublishEvent(source Source) (*http.Response, utils.HookState) {
func HookPublishDoneEvent(source Source) { func HookPublishDoneEvent(source Source) {
if AppConfig.Hook.IsEnablePublishEvent() { if AppConfig.Hook.IsEnablePublishEvent() {
_, err := Hook(HookEventPublishDone, source.UrlValues().Encode(), NewHookPublishEventInfo(source)) _, _ = Hook(HookEventPublishDone, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知推流结束事件失败 source:%s err:%s", source.Id(), err.Error())
}
} }
} }
@@ -70,7 +65,6 @@ func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState) {
if AppConfig.Hook.IsEnableOnReceiveTimeout() { if AppConfig.Hook.IsEnableOnReceiveTimeout() {
resp, err := Hook(HookEventReceiveTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source)) resp, err := Hook(HookEventReceiveTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
if err != nil { if err != nil {
log.Sugar.Errorf("通知收流超时事件失败 source:%s err:%s", source.Id(), err.Error())
return resp, utils.HookStateFailure return resp, utils.HookStateFailure
} }
@@ -86,7 +80,6 @@ func HookIdleTimeoutEvent(source Source) (*http.Response, utils.HookState) {
if AppConfig.Hook.IsEnableOnIdleTimeout() { if AppConfig.Hook.IsEnableOnIdleTimeout() {
resp, err := Hook(HookEventIdleTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source)) resp, err := Hook(HookEventIdleTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
if err != nil { if err != nil {
log.Sugar.Errorf("通知空闲超时时间失败 source:%s err:%s", source.Id(), err.Error())
return resp, utils.HookStateFailure return resp, utils.HookStateFailure
} }

View File

@@ -401,7 +401,7 @@ func (s *PublishSource) AddSink(sink Sink) bool {
defer sink.UnLock() defer sink.UnLock()
if SessionStateClosed == sink.State() { if SessionStateClosed == sink.State() {
log.Sugar.Warnf("AddSink失败, sink已经断开接 %s", sink.PrintInfo()) log.Sugar.Warnf("AddSink失败, sink已经断开接 %s", sink.PrintInfo())
} else { } else {
transStream.AddSink(sink) transStream.AddSink(sink)
} }
@@ -510,7 +510,7 @@ func (s *PublishSource) doClose() {
defer sink.UnLock() defer sink.UnLock()
if SessionStateClosed == sink.State() { if SessionStateClosed == sink.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开接 %s", sink.PrintInfo()) log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开接 %s", sink.PrintInfo())
} else { } else {
sink.SetState(SessionStateWait) sink.SetState(SessionStateWait)
AddSinkToWaitingQueue(s.Id_, sink) AddSinkToWaitingQueue(s.Id_, sink)

View File

@@ -121,7 +121,7 @@ func (t *TCPTransStream) SendPacket(data []byte) error {
} }
if _, ok := err.(*transport.ZeroWindowSizeError); ok { if _, ok := err.(*transport.ZeroWindowSizeError); ok {
log.Sugar.Errorf("发送超时, 强制断开接 sink:%s", sink.PrintInfo()) log.Sugar.Errorf("发送超时, 强制断开接 sink:%s", sink.PrintInfo())
sink.GetConn().Close() sink.GetConn().Close()
} }
} }