diff --git a/api.go b/api.go index c04fd04..7e0dd07 100644 --- a/api.go +++ b/api.go @@ -149,7 +149,7 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) { err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, 单端口模式下不能主动拉流"} } else if !tcp { err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, UDP不能主动拉流"} - } else if !stream.AppConfig.GB28181.EnableTCP() { + } else if !stream.AppConfig.GB28181.IsEnableTCP() { err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, 未开启TCP, UDP不能主动拉流"} } diff --git a/config.json b/config.json index a5ff867..e922edf 100644 --- a/config.json +++ b/config.json @@ -56,6 +56,9 @@ "hook": { "enable": true, "timeout": 10, + + "on_started": "http://localhost:9000/api/v1/hook/on_started", + "on_publish": "http://localhost:9000/api/v1/hook/on_publish", "on_publish_done": "http://localhost:9000/api/v1/hook/on_publish_done", "on_play" : "http://localhost:9000/api/v1/hook/on_play", diff --git a/gb28181/source.go b/gb28181/source.go index 20d7479..be4504a 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -254,7 +254,7 @@ func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ G source.SetSSRC(ssrc) source.SetState(stream.SessionStateTransferring) - if stream.AppConfig.Hook.EnablePublishEvent() { + if stream.AppConfig.Hook.IsEnablePublishEvent() { go func() { _, state := stream.HookPublishEvent(source_) if utils.HookStateOK != state { @@ -271,13 +271,13 @@ func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ G // NewGBSource 创建gb源,返回监听的收流端口 func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int, error) { if tcp { - utils.Assert(stream.AppConfig.GB28181.EnableTCP()) + utils.Assert(stream.AppConfig.GB28181.IsEnableTCP()) } else { - utils.Assert(stream.AppConfig.GB28181.EnableUDP()) + utils.Assert(stream.AppConfig.GB28181.IsEnableUDP()) } if active { - utils.Assert(tcp && stream.AppConfig.GB28181.EnableTCP() && stream.AppConfig.GB28181.IsMultiPort()) + utils.Assert(tcp && stream.AppConfig.GB28181.IsEnableTCP() && stream.AppConfig.GB28181.IsMultiPort()) } var source GBSource diff --git a/main.go b/main.go index 3a15891..cdeb636 100644 --- a/main.go +++ b/main.go @@ -89,7 +89,7 @@ func main() { //单端口模式下, 启动时就创建收流端口 //多端口模式下, 创建GBSource时才创建收流端口 if !stream.AppConfig.GB28181.IsMultiPort() { - if stream.AppConfig.GB28181.EnableUDP() { + if stream.AppConfig.GB28181.IsEnableUDP() { server, err := gb28181.NewUDPServer(gb28181.NewSharedFilter(128)) if err != nil { panic(err) @@ -99,7 +99,7 @@ func main() { log.Sugar.Info("启动GB28181 UDP收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0])) } - if stream.AppConfig.GB28181.EnableTCP() { + if stream.AppConfig.GB28181.IsEnableTCP() { server, err := gb28181.NewTCPServer(gb28181.NewSharedFilter(128)) if err != nil { panic(err) @@ -125,8 +125,18 @@ func main() { log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String()) } + if stream.AppConfig.Hook.IsEnableOnStarted() { + go func() { + if _, err := stream.Hook(stream.HookEventStarted, "", nil); err != nil { + log.Sugar.Errorf("发送启动通知失败 err:%s", err.Error()) + } + }() + } + err := http.ListenAndServe(":19999", nil) if err != nil { - panic(err) + println(err) } + + select {} } diff --git a/stream/config.go b/stream/config.go index 2e40c1e..34f9d74 100644 --- a/stream/config.go +++ b/stream/config.go @@ -69,11 +69,11 @@ type GB28181Config struct { Port []int `json:"port"` } -func (g TransportConfig) EnableTCP() bool { +func (g TransportConfig) IsEnableTCP() bool { return strings.Contains(g.Transport, "TCP") } -func (g TransportConfig) EnableUDP() bool { +func (g TransportConfig) IsEnableUDP() bool { return strings.Contains(g.Transport, "UDP") } @@ -118,6 +118,7 @@ func (c HlsConfig) TSFormat(sourceId string) string { type HookConfig struct { Enable bool `json:"enable"` Timeout int64 `json:"timeout"` + OnStartedUrl string `json:"on_started"` //应用启动后回调 OnPublishUrl string `json:"on_publish"` //推流回调 OnPublishDoneUrl string `json:"on_publish_done"` //推流结束回调 OnPlayUrl string `json:"on_play"` //拉流回调 @@ -127,34 +128,38 @@ type HookConfig struct { OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //没有推流回调 } -func (hook *HookConfig) EnablePublishEvent() bool { +func (hook *HookConfig) IsEnablePublishEvent() bool { return hook.Enable && hook.OnPublishUrl != "" } -func (hook *HookConfig) EnableOnPublishDone() bool { +func (hook *HookConfig) IsEnableOnPublishDone() bool { return hook.Enable && hook.OnPublishDoneUrl != "" } -func (hook *HookConfig) EnableOnPlay() bool { +func (hook *HookConfig) IsEnableOnPlay() bool { return hook.Enable && hook.OnPlayUrl != "" } -func (hook *HookConfig) EnableOnPlayDone() bool { +func (hook *HookConfig) IsEnableOnPlayDone() bool { return hook.Enable && hook.OnPlayDoneUrl != "" } -func (hook *HookConfig) EnableOnRecord() bool { +func (hook *HookConfig) IsEnableOnRecord() bool { return hook.Enable && hook.OnRecordUrl != "" } -func (hook *HookConfig) EnableOnIdleTimeout() bool { +func (hook *HookConfig) IsEnableOnIdleTimeout() bool { return hook.Enable && hook.OnIdleTimeoutUrl != "" } -func (hook *HookConfig) EnableOnReceiveTimeout() bool { +func (hook *HookConfig) IsEnableOnReceiveTimeout() bool { return hook.Enable && hook.OnReceiveTimeoutUrl != "" } +func (hook *HookConfig) IsEnableOnStarted() bool { + return hook.Enable && hook.OnStartedUrl != "" +} + func GetStreamPlayUrls(sourceId string) []string { var urls []string if AppConfig.Rtmp.Enable { diff --git a/stream/hook.go b/stream/hook.go index ee93a3d..0043403 100644 --- a/stream/hook.go +++ b/stream/hook.go @@ -38,7 +38,6 @@ func sendHookEvent(url string, body interface{}) (*http.Response, error) { } log.Sugar.Infof("发送hook通知 url:%s body:%s", url, marshal) - request.Header.Set("Content-Type", "application/json") return client.Do(request) } @@ -52,6 +51,7 @@ func Hook(event HookEvent, params string, body interface{}) (*http.Response, err if "" != params { url += "?" + params } + response, err := sendHookEvent(url, body) if err == nil && http.StatusOK != response.StatusCode { return response, fmt.Errorf("reason %s", response.Status) diff --git a/stream/hook_event.go b/stream/hook_event.go index f8825fc..ed3e6d6 100644 --- a/stream/hook_event.go +++ b/stream/hook_event.go @@ -12,6 +12,7 @@ const ( HookEventRecord = HookEvent(0x5) HookEventIdleTimeout = HookEvent(0x6) HookEventReceiveTimeout = HookEvent(0x7) + HookEventStarted = HookEvent(0x8) ) var ( @@ -27,6 +28,7 @@ func InitHookUrl() { HookEventRecord: AppConfig.Hook.OnRecordUrl, HookEventIdleTimeout: AppConfig.Hook.OnIdleTimeoutUrl, HookEventReceiveTimeout: AppConfig.Hook.OnReceiveTimeoutUrl, + HookEventStarted: AppConfig.Hook.OnStartedUrl, } } diff --git a/stream/hook_sink.go b/stream/hook_sink.go index 105235e..62bba15 100644 --- a/stream/hook_sink.go +++ b/stream/hook_sink.go @@ -9,7 +9,7 @@ import ( func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) { var response *http.Response - if AppConfig.Hook.EnableOnPlay() { + if AppConfig.Hook.IsEnableOnPlay() { hook, err := Hook(HookEventPlay, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink)) if err != nil { log.Sugar.Errorf("通知播放事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId()) @@ -46,7 +46,7 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) { func HookPlayDoneEvent(sink Sink) (*http.Response, bool) { var response *http.Response - if AppConfig.Hook.EnableOnPlayDone() { + if AppConfig.Hook.IsEnableOnPlayDone() { hook, err := Hook(HookEventPlayDone, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink)) if err != nil { log.Sugar.Errorf("通知播放结束事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId()) diff --git a/stream/hook_source.go b/stream/hook_source.go index 0ff9d6a..88d4447 100644 --- a/stream/hook_source.go +++ b/stream/hook_source.go @@ -9,7 +9,7 @@ import ( func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookState) { var response *http.Response - if hook && AppConfig.Hook.EnablePublishEvent() { + if hook && AppConfig.Hook.IsEnablePublishEvent() { rep, state := HookPublishEvent(source) if utils.HookStateOK != state { return rep, state @@ -37,7 +37,7 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS func HookPublishEvent(source Source) (*http.Response, utils.HookState) { var response *http.Response - if AppConfig.Hook.EnablePublishEvent() { + if AppConfig.Hook.IsEnablePublishEvent() { hook, err := Hook(HookEventPublish, source.UrlValues().Encode(), NewHookPublishEventInfo(source)) if err != nil { log.Sugar.Errorf("通知推流事件失败 source:%s err:%s", source.Id(), err.Error()) @@ -51,7 +51,7 @@ func HookPublishEvent(source Source) (*http.Response, utils.HookState) { } func HookPublishDoneEvent(source Source) { - if AppConfig.Hook.EnablePublishEvent() { + if AppConfig.Hook.IsEnablePublishEvent() { _, err := Hook(HookEventPublishDone, source.UrlValues().Encode(), NewHookPublishEventInfo(source)) if err != nil { log.Sugar.Errorf("通知推流结束事件失败 source:%s err:%s", source.Id(), err.Error()) @@ -62,7 +62,7 @@ func HookPublishDoneEvent(source Source) { func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState) { var response *http.Response - if AppConfig.Hook.EnableOnReceiveTimeout() { + if AppConfig.Hook.IsEnableOnReceiveTimeout() { resp, err := Hook(HookEventReceiveTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source)) if err != nil { log.Sugar.Errorf("通知收流超时事件失败 source:%s err:%s", source.Id(), err.Error()) @@ -78,7 +78,7 @@ func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState) { func HookIdleTimeoutEvent(source Source) (*http.Response, utils.HookState) { var response *http.Response - if AppConfig.Hook.EnableOnIdleTimeout() { + if AppConfig.Hook.IsEnableOnIdleTimeout() { resp, err := Hook(HookEventIdleTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source)) if err != nil { log.Sugar.Errorf("通知空闲超时时间失败 source:%s err:%s", source.Id(), err.Error())