增加应用启动后通知

This commit is contained in:
yangjiechina
2024-07-11 20:55:44 +08:00
parent 5783eb62c5
commit b5523e6e6f
9 changed files with 45 additions and 25 deletions

2
api.go
View File

@@ -149,7 +149,7 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, 单端口模式下不能主动拉流"}
} else if !tcp {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, UDP不能主动拉流"}
} else if !stream.AppConfig.GB28181.EnableTCP() {
} else if !stream.AppConfig.GB28181.IsEnableTCP() {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, 未开启TCP, UDP不能主动拉流"}
}

View File

@@ -56,6 +56,9 @@
"hook": {
"enable": true,
"timeout": 10,
"on_started": "http://localhost:9000/api/v1/hook/on_started",
"on_publish": "http://localhost:9000/api/v1/hook/on_publish",
"on_publish_done": "http://localhost:9000/api/v1/hook/on_publish_done",
"on_play" : "http://localhost:9000/api/v1/hook/on_play",

View File

@@ -254,7 +254,7 @@ func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ G
source.SetSSRC(ssrc)
source.SetState(stream.SessionStateTransferring)
if stream.AppConfig.Hook.EnablePublishEvent() {
if stream.AppConfig.Hook.IsEnablePublishEvent() {
go func() {
_, state := stream.HookPublishEvent(source_)
if utils.HookStateOK != state {
@@ -271,13 +271,13 @@ func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ G
// NewGBSource 创建gb源,返回监听的收流端口
func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, int, error) {
if tcp {
utils.Assert(stream.AppConfig.GB28181.EnableTCP())
utils.Assert(stream.AppConfig.GB28181.IsEnableTCP())
} else {
utils.Assert(stream.AppConfig.GB28181.EnableUDP())
utils.Assert(stream.AppConfig.GB28181.IsEnableUDP())
}
if active {
utils.Assert(tcp && stream.AppConfig.GB28181.EnableTCP() && stream.AppConfig.GB28181.IsMultiPort())
utils.Assert(tcp && stream.AppConfig.GB28181.IsEnableTCP() && stream.AppConfig.GB28181.IsMultiPort())
}
var source GBSource

16
main.go
View File

@@ -89,7 +89,7 @@ func main() {
//单端口模式下, 启动时就创建收流端口
//多端口模式下, 创建GBSource时才创建收流端口
if !stream.AppConfig.GB28181.IsMultiPort() {
if stream.AppConfig.GB28181.EnableUDP() {
if stream.AppConfig.GB28181.IsEnableUDP() {
server, err := gb28181.NewUDPServer(gb28181.NewSharedFilter(128))
if err != nil {
panic(err)
@@ -99,7 +99,7 @@ func main() {
log.Sugar.Info("启动GB28181 UDP收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
}
if stream.AppConfig.GB28181.EnableTCP() {
if stream.AppConfig.GB28181.IsEnableTCP() {
server, err := gb28181.NewTCPServer(gb28181.NewSharedFilter(128))
if err != nil {
panic(err)
@@ -125,8 +125,18 @@ func main() {
log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String())
}
if stream.AppConfig.Hook.IsEnableOnStarted() {
go func() {
if _, err := stream.Hook(stream.HookEventStarted, "", nil); err != nil {
log.Sugar.Errorf("发送启动通知失败 err:%s", err.Error())
}
}()
}
err := http.ListenAndServe(":19999", nil)
if err != nil {
panic(err)
println(err)
}
select {}
}

View File

@@ -69,11 +69,11 @@ type GB28181Config struct {
Port []int `json:"port"`
}
func (g TransportConfig) EnableTCP() bool {
func (g TransportConfig) IsEnableTCP() bool {
return strings.Contains(g.Transport, "TCP")
}
func (g TransportConfig) EnableUDP() bool {
func (g TransportConfig) IsEnableUDP() bool {
return strings.Contains(g.Transport, "UDP")
}
@@ -118,6 +118,7 @@ func (c HlsConfig) TSFormat(sourceId string) string {
type HookConfig struct {
Enable bool `json:"enable"`
Timeout int64 `json:"timeout"`
OnStartedUrl string `json:"on_started"` //应用启动后回调
OnPublishUrl string `json:"on_publish"` //推流回调
OnPublishDoneUrl string `json:"on_publish_done"` //推流结束回调
OnPlayUrl string `json:"on_play"` //拉流回调
@@ -127,34 +128,38 @@ type HookConfig struct {
OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //没有推流回调
}
func (hook *HookConfig) EnablePublishEvent() bool {
func (hook *HookConfig) IsEnablePublishEvent() bool {
return hook.Enable && hook.OnPublishUrl != ""
}
func (hook *HookConfig) EnableOnPublishDone() bool {
func (hook *HookConfig) IsEnableOnPublishDone() bool {
return hook.Enable && hook.OnPublishDoneUrl != ""
}
func (hook *HookConfig) EnableOnPlay() bool {
func (hook *HookConfig) IsEnableOnPlay() bool {
return hook.Enable && hook.OnPlayUrl != ""
}
func (hook *HookConfig) EnableOnPlayDone() bool {
func (hook *HookConfig) IsEnableOnPlayDone() bool {
return hook.Enable && hook.OnPlayDoneUrl != ""
}
func (hook *HookConfig) EnableOnRecord() bool {
func (hook *HookConfig) IsEnableOnRecord() bool {
return hook.Enable && hook.OnRecordUrl != ""
}
func (hook *HookConfig) EnableOnIdleTimeout() bool {
func (hook *HookConfig) IsEnableOnIdleTimeout() bool {
return hook.Enable && hook.OnIdleTimeoutUrl != ""
}
func (hook *HookConfig) EnableOnReceiveTimeout() bool {
func (hook *HookConfig) IsEnableOnReceiveTimeout() bool {
return hook.Enable && hook.OnReceiveTimeoutUrl != ""
}
func (hook *HookConfig) IsEnableOnStarted() bool {
return hook.Enable && hook.OnStartedUrl != ""
}
func GetStreamPlayUrls(sourceId string) []string {
var urls []string
if AppConfig.Rtmp.Enable {

View File

@@ -38,7 +38,6 @@ func sendHookEvent(url string, body interface{}) (*http.Response, error) {
}
log.Sugar.Infof("发送hook通知 url:%s body:%s", url, marshal)
request.Header.Set("Content-Type", "application/json")
return client.Do(request)
}
@@ -52,6 +51,7 @@ func Hook(event HookEvent, params string, body interface{}) (*http.Response, err
if "" != params {
url += "?" + params
}
response, err := sendHookEvent(url, body)
if err == nil && http.StatusOK != response.StatusCode {
return response, fmt.Errorf("reason %s", response.Status)

View File

@@ -12,6 +12,7 @@ const (
HookEventRecord = HookEvent(0x5)
HookEventIdleTimeout = HookEvent(0x6)
HookEventReceiveTimeout = HookEvent(0x7)
HookEventStarted = HookEvent(0x8)
)
var (
@@ -27,6 +28,7 @@ func InitHookUrl() {
HookEventRecord: AppConfig.Hook.OnRecordUrl,
HookEventIdleTimeout: AppConfig.Hook.OnIdleTimeoutUrl,
HookEventReceiveTimeout: AppConfig.Hook.OnReceiveTimeoutUrl,
HookEventStarted: AppConfig.Hook.OnStartedUrl,
}
}

View File

@@ -9,7 +9,7 @@ import (
func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hook.EnableOnPlay() {
if AppConfig.Hook.IsEnableOnPlay() {
hook, err := Hook(HookEventPlay, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink))
if err != nil {
log.Sugar.Errorf("通知播放事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId())
@@ -46,7 +46,7 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
func HookPlayDoneEvent(sink Sink) (*http.Response, bool) {
var response *http.Response
if AppConfig.Hook.EnableOnPlayDone() {
if AppConfig.Hook.IsEnableOnPlayDone() {
hook, err := Hook(HookEventPlayDone, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink))
if err != nil {
log.Sugar.Errorf("通知播放结束事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId())

View File

@@ -9,7 +9,7 @@ import (
func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookState) {
var response *http.Response
if hook && AppConfig.Hook.EnablePublishEvent() {
if hook && AppConfig.Hook.IsEnablePublishEvent() {
rep, state := HookPublishEvent(source)
if utils.HookStateOK != state {
return rep, state
@@ -37,7 +37,7 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS
func HookPublishEvent(source Source) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hook.EnablePublishEvent() {
if AppConfig.Hook.IsEnablePublishEvent() {
hook, err := Hook(HookEventPublish, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知推流事件失败 source:%s err:%s", source.Id(), err.Error())
@@ -51,7 +51,7 @@ func HookPublishEvent(source Source) (*http.Response, utils.HookState) {
}
func HookPublishDoneEvent(source Source) {
if AppConfig.Hook.EnablePublishEvent() {
if AppConfig.Hook.IsEnablePublishEvent() {
_, err := Hook(HookEventPublishDone, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知推流结束事件失败 source:%s err:%s", source.Id(), err.Error())
@@ -62,7 +62,7 @@ func HookPublishDoneEvent(source Source) {
func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hook.EnableOnReceiveTimeout() {
if AppConfig.Hook.IsEnableOnReceiveTimeout() {
resp, err := Hook(HookEventReceiveTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知收流超时事件失败 source:%s err:%s", source.Id(), err.Error())
@@ -78,7 +78,7 @@ func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState) {
func HookIdleTimeoutEvent(source Source) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hook.EnableOnIdleTimeout() {
if AppConfig.Hook.IsEnableOnIdleTimeout() {
resp, err := Hook(HookEventIdleTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知空闲超时时间失败 source:%s err:%s", source.Id(), err.Error())