diff --git a/api.go b/api.go index 7540f8e..5ad2c4b 100644 --- a/api.go +++ b/api.go @@ -253,17 +253,6 @@ func (api *ApiServer) generateSourceId(remoteAddr string) stream.SinkId { return stream.GenerateSinkId(tcpAddr) } -func (api *ApiServer) doPlay(sink stream.Sink) utils.HookState { - ok := utils.HookStateOK - stream.HookPlaying(sink, func() { - - }, func(state utils.HookState) { - ok = state - }) - - return ok -} - func (api *ApiServer) onFlv(sourceId string, w http.ResponseWriter, r *http.Request) { ws := true if !("upgrade" == strings.ToLower(r.Header.Get("Connection"))) { @@ -291,9 +280,9 @@ func (api *ApiServer) onWSFlv(sourceId string, w http.ResponseWriter, r *http.Re sink := flv.NewFLVSink(api.generateSinkId(r.RemoteAddr), sourceId, flv.NewWSConn(conn)) log.Sugar.Infof("ws-flv 连接 sink:%s", sink.PrintInfo()) - state := api.doPlay(sink) + _, state := stream.PreparePlaySink(sink) if utils.HookStateOK != state { - log.Sugar.Warnf("ws-flv 播放失败 state:%d sink:%s", state, sink.PrintInfo()) + log.Sugar.Warnf("ws-flv 播放失败 sink:%s", sink.PrintInfo()) w.WriteHeader(http.StatusForbidden) return } @@ -330,9 +319,9 @@ func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http. sink := flv.NewFLVSink(api.generateSinkId(r.RemoteAddr), sourceId, conn) log.Sugar.Infof("http-flv 连接 sink:%s", sink.PrintInfo()) - state := api.doPlay(sink) + _, state := stream.PreparePlaySink(sink) if utils.HookStateOK != state { - log.Sugar.Warnf("http-flv 播放失败 state:%d sink:%s", state, sink.PrintInfo()) + log.Sugar.Warnf("http-flv 播放失败 sink:%s", sink.PrintInfo()) w.WriteHeader(http.StatusForbidden) return @@ -398,9 +387,9 @@ func (api *ApiServer) onHLS(sourceId string, w http.ResponseWriter, r *http.Requ done <- 0 }) - state := api.doPlay(sink) + _, state := stream.PreparePlaySink(sink) if utils.HookStateOK != state { - log.Sugar.Warnf("m3u8 请求失败 state:%d sink:%s", state, sink.PrintInfo()) + log.Sugar.Warnf("m3u8 请求失败 sink:%s", sink.PrintInfo()) w.WriteHeader(http.StatusForbidden) return @@ -460,9 +449,9 @@ func (api *ApiServer) onRtc(sourceId string, w http.ResponseWriter, r *http.Requ log.Sugar.Infof("rtc 请求 sink:%s sdp:%v", sink.PrintInfo(), v.SDP) - state := api.doPlay(sink) + _, state := stream.PreparePlaySink(sink) if utils.HookStateOK != state { - log.Sugar.Warnf("rtc 播放失败 state:%d sink:%s", state, sink.PrintInfo()) + log.Sugar.Warnf("rtc 播放失败 sink:%s", sink.PrintInfo()) w.WriteHeader(http.StatusForbidden) group.Done() diff --git a/config.json b/config.json index 3814c57..42a28e6 100644 --- a/config.json +++ b/config.json @@ -4,6 +4,8 @@ "probe_timeout": 2000, "mw_latency": 350, "public_ip": "192.168.2.148", + "idle_timeout": 60, + "receive_timeout":60, "http": { "addr": "0.0.0.0:8080" @@ -49,14 +51,15 @@ }, "hook": { + "enable": true, "timeout": 10, - "on_publish": "http://localhost:8080/api/v1/live/publish/auth", - "on_publish_done": "http://localhost:8080/api/v1/live/publishdone", - "on_play" : "http://localhost:8080/api/v1/live/play/auth", - "on_play_done" : "http://localhost:8080/api/v1/live/playdone", + "on_publish": "http://localhost:8081/api/v1/on_publish", + "on_publish_done": "http://localhost:8081/api/v1/on_publish_done", + "on_play" : "http://localhost:8081/api/v1/on_play", + "on_play_done" : "http://localhost:8081/api/on_play_done", - "on_record": "", - "on_idle_timeout": "", - "on_recv_timeout": "" + "on_record": "http://localhost:8081/api/v1/on_reocrd", + "on_idle_timeout": "http://localhost:8081/api/v1/on_idle_timeout", + "on_receive_timeout": "http://localhost:8081/api/v1/on_recv_timeout" } } \ No newline at end of file diff --git a/gb28181/source.go b/gb28181/source.go index f563d0a..6810e9e 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -12,6 +12,7 @@ import ( "github.com/yangjiechina/lkm/log" "github.com/yangjiechina/lkm/stream" "net" + "net/http" ) type TransportType int @@ -137,10 +138,9 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint1 } source.PrepareTransDeMuxer(id, ssrc) - - if err = stream.SourceManager.Add(source); err != nil { - source.Close() - return nil, 0, err + _, state := stream.PreparePublishSource(source, false) + if http.StatusOK != state { + return nil, 0, fmt.Errorf("error code %d", state) } source.Init(source.Input) diff --git a/jt1078/jt_session.go b/jt1078/jt_session.go index 90d1c66..f5e6cd1 100644 --- a/jt1078/jt_session.go +++ b/jt1078/jt_session.go @@ -211,11 +211,12 @@ func (s *Session) OnJtPTPPacket(data []byte) { s.rtpPacket = &RtpPacket{} *s.rtpPacket = packet - s.Publish(s, func() { - //response <- utils.HookStateOK - }, func(state utils.HookState) { - //response <- state - }) + _, state := stream.PreparePublishSource(s, true) + if utils.HookStateOK != state { + log.Sugar.Errorf("1078推流失败 source:%s", s.phone) + } + + s.Close() } //完整包/最后一个分包, 创建AVPacket diff --git a/main.go b/main.go index 09043e8..2454810 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap/zapcore" "net" "net/http" + "time" _ "net/http/pprof" @@ -25,6 +26,8 @@ func NewDefaultAppConfig() stream.AppConfig_ { GOPBufferSize: 8196000, MergeWriteLatency: 350, PublicIP: "192.168.2.148", + IdleTimeout: int64(60 * time.Second), + ReceiveTimeout: int64(60 * time.Second), Hls: stream.HlsConfig{ Enable: false, @@ -74,6 +77,18 @@ func NewDefaultAppConfig() stream.AppConfig_ { Enable: true, Addr: "0.0.0.0:1078", }, + + Hook: stream.HookConfig{ + Enable: true, + Timeout: int64(60 * time.Second), + OnPublishUrl: "http://localhost:8082/api/v1/on_publish", + OnPublishDoneUrl: "http://localhost:8082/api/v1/on_publish_done", + OnPlayUrl: "http://localhost:8082/api/v1/on_play", + OnPlayDoneUrl: "http://localhost:8082/api/on_play_done", + OnRecordUrl: "http://localhost:8082/api/v1/on_reocrd", + OnIdleTimeoutUrl: "http://localhost:8082/api/v1/on_idle_timeout", + OnReceiveTimeoutUrl: "http://localhost:8082/api/v1/on_recv_timeout", + }, } } @@ -85,6 +100,7 @@ func init() { stream.RegisterTransStreamFactory(stream.ProtocolRtc, rtc.TransStreamFactory) stream.AppConfig = NewDefaultAppConfig() + stream.InitHookUrl() //初始化日志 log.InitLogger(zapcore.Level(stream.AppConfig.Log.Level), stream.AppConfig.Log.Name, stream.AppConfig.Log.MaxSize, stream.AppConfig.Log.MaxBackup, stream.AppConfig.Log.MaxAge, stream.AppConfig.Log.Compress) diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 913c55a..9807b93 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -38,17 +38,18 @@ func (s *Session) OnPublish(app, stream_ string, response chan utils.HookState) s.stack.SetOnPublishHandler(source) //推流事件Source统一处理, 是否已经存在, Hook回调.... - source.Publish(source, func() { + _, state := stream.PreparePublishSource(source, true) + if utils.HookStateOK != state { + log.Sugar.Errorf("rtmp推流失败 source:%s", sourceId) + } else { s.handle = source s.isPublisher = true source.Init(source.Input) go source.LoopEvent() + } - response <- utils.HookStateOK - }, func(state utils.HookState) { - response <- state - }) + response <- state } func (s *Session) OnPlay(app, stream_ string, response chan utils.HookState) { @@ -58,12 +59,14 @@ func (s *Session) OnPlay(app, stream_ string, response chan utils.HookState) { log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.Id(), s.conn.RemoteAddr().String()) - stream.HookPlaying(sink, func() { + _, state := stream.PreparePlaySink(sink) + if utils.HookStateOK != state { + log.Sugar.Errorf("rtmp拉流失败 source:%s sink:%s", sourceId, sink.Id()) + } else { s.handle = sink - response <- utils.HookStateOK - }, func(state utils.HookState) { - response <- state - }) + } + + response <- state } func (s *Session) Input(conn net.Conn, data []byte) error { diff --git a/rtsp/rtsp_handler.go b/rtsp/rtsp_handler.go index 7c67683..744e8d3 100644 --- a/rtsp/rtsp_handler.go +++ b/rtsp/rtsp_handler.go @@ -138,14 +138,7 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) { request.session.response(response, []byte(sdp)) }) - code := utils.HookStateOK - - stream.HookPlaying(sink_, func() { - - }, func(state utils.HookState) { - code = state - }) - + _, code := stream.PreparePlaySink(sink_) if utils.HookStateOK != code { return nil, nil, fmt.Errorf("hook failed. code:%d", code) } diff --git a/stream/config.go b/stream/config.go index 748c37c..e293aa8 100644 --- a/stream/config.go +++ b/stream/config.go @@ -1,6 +1,8 @@ package stream -import "strings" +import ( + "strings" +) const ( DefaultMergeWriteLatency = 350 @@ -8,7 +10,7 @@ const ( type TransportConfig struct { Transport string //"UDP|TCP" - Port [2]uint16 //单端口模式[0]=port/多端口模式[0]=start port, [0]=end port. + Port [2]uint16 //单端口-1个元素/多端口-2个元素 } type RtmpConfig struct { @@ -20,8 +22,8 @@ type RtspConfig struct { TransportConfig Addr string - Enable bool `json:"enable"` - Password string + Enable bool `json:"enable"` + Password string `json:"password"` } type RecordConfig struct { @@ -30,10 +32,10 @@ type RecordConfig struct { } type HlsConfig struct { - Enable bool - Dir string - Duration int - PlaylistLength int + Enable bool `json:"enable"` + Dir string `json:"dir"` + Duration int `json:"duration"` + PlaylistLength int `json:"playlist_length"` } type LogConfig struct { @@ -46,18 +48,18 @@ type LogConfig struct { } type HttpConfig struct { - Enable bool - Addr string + Enable bool `json:"enable"` + Addr string `json:"addr"` } type GB28181Config struct { TransportConfig - Addr string + Addr string `json:"addr"` } type JT1078Config struct { - Enable bool - Addr string + Enable bool `json:"enable"` + Addr string `json:"addr"` } func (g TransportConfig) EnableTCP() bool { @@ -91,43 +93,43 @@ func (c HlsConfig) TSFormat(sourceId string, tsSeq string) string { } type HookConfig struct { - Time int - Enable bool `json:"enable"` - OnPublish string `json:"on_publish"` //推流回调 - OnPublishDone string `json:"on_publish_done"` //推流结束回调 - OnPlay string `json:"on_play"` //拉流回调 - OnPlayDone string `json:"on_play_done"` //拉流结束回调 - OnRecord string `json:"on_record"` //录制流回调 - OnIdleTimeout string `json:"on_idle_timeout"` //多久没有sink拉流回调 - OnRecvTimeout string `json:"on_recv_timeout"` //多久没有推流回调 + Enable bool `json:"enable"` + Timeout int64 `json:"timeout"` + OnPublishUrl string `json:"on_publish"` //推流回调 + OnPublishDoneUrl string `json:"on_publish_done"` //推流结束回调 + OnPlayUrl string `json:"on_play"` //拉流回调 + OnPlayDoneUrl string `json:"on_play_done"` //拉流结束回调 + OnRecordUrl string `json:"on_record"` //录制流回调 + OnIdleTimeoutUrl string `json:"on_idle_timeout"` //多久没有sink拉流回调 + OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //多久没有推流回调 } -func (hook *HookConfig) EnableOnPublish() bool { - return hook.OnPublish != "" +func (hook *HookConfig) EnablePublishEvent() bool { + return hook.Enable && hook.OnPublishUrl != "" } func (hook *HookConfig) EnableOnPublishDone() bool { - return hook.OnPublishDone != "" + return hook.Enable && hook.OnPublishDoneUrl != "" } func (hook *HookConfig) EnableOnPlay() bool { - return hook.OnPlay != "" + return hook.Enable && hook.OnPlayUrl != "" } func (hook *HookConfig) EnableOnPlayDone() bool { - return hook.OnPlayDone != "" + return hook.Enable && hook.OnPlayDoneUrl != "" } func (hook *HookConfig) EnableOnRecord() bool { - return hook.OnRecord != "" + return hook.Enable && hook.OnRecordUrl != "" } func (hook *HookConfig) EnableOnIdleTimeout() bool { - return hook.OnIdleTimeout != "" + return hook.Enable && hook.OnIdleTimeoutUrl != "" } -func (hook *HookConfig) EnableOnRecvTimeout() bool { - return hook.OnRecvTimeout != "" +func (hook *HookConfig) EnableOnReceiveTimeout() bool { + return hook.Enable && hook.OnReceiveTimeoutUrl != "" } var AppConfig AppConfig_ @@ -138,27 +140,23 @@ func init() { // AppConfig_ GOP缓存和合并写必须保持一致,同时开启或关闭. 关闭GOP缓存,是为了降低延迟,很难理解又另外开启合并写. type AppConfig_ struct { - GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 - GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小 - ProbeTimeout int `json:"probe_timeout"` - PublicIP string `json:"public_ip"` + GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 + GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小 + ProbeTimeout int `json:"probe_timeout"` + PublicIP string `json:"public_ip"` + IdleTimeout int64 `json:"idle_timeout"` //多长时间没有sink拉流, 单位秒 + ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒 //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. //合并写的大小范围,应当大于一帧的时长,不超过一组GOP的时长,在实际发送流的时候也会遵循此条例. MergeWriteLatency int `json:"mw_latency"` Rtmp RtmpConfig Rtsp RtspConfig - - Hook HookConfig - - Record RecordConfig - Hls HlsConfig - - Log LogConfig - - Http HttpConfig - - GB28181 GB28181Config - - JT1078 JT1078Config + Hook HookConfig + Record RecordConfig + Hls HlsConfig + Log LogConfig + Http HttpConfig + GB28181 GB28181Config + JT1078 JT1078Config } diff --git a/stream/hook.go b/stream/hook.go index 80aa8a1..0a64266 100644 --- a/stream/hook.go +++ b/stream/hook.go @@ -4,24 +4,10 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/yangjiechina/avformat/utils" "net/http" "time" ) -type HookFunc func(m map[string]interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error -type HookEvent int - -const ( - HookEventPublish = HookEvent(0x1) - HookEventPublishDone = HookEvent(0x2) - HookEventPlay = HookEvent(0x3) - HookEventPlayDone = HookEvent(0x4) - HookEventRecord = HookEvent(0x5) - HookEventIdleTimeout = HookEvent(0x6) - HookEventRecvTimeout = HookEvent(0x7) -) - // 每个通知的时间都需要携带的字段 type eventInfo struct { stream string //stream id @@ -29,107 +15,41 @@ type eventInfo struct { remoteAddr string //peer地址 } -func NewPlayHookEventInfo(stream, remoteAddr string, protocol Protocol) eventInfo { - return eventInfo{stream: stream, protocol: protocol.ToString(), remoteAddr: remoteAddr} +func NewHookPlayEventInfo(sink Sink) eventInfo { + return eventInfo{stream: sink.SourceId(), protocol: sink.Protocol().ToString(), remoteAddr: sink.PrintInfo()} +} +func NewHookPublishEventInfo(source Source) eventInfo { + return eventInfo{stream: source.Id(), protocol: source.Type().ToString(), remoteAddr: source.RemoteAddr()} } -func NewPublishHookEventInfo(stream, remoteAddr string, protocol SourceType) eventInfo { - return eventInfo{stream: stream, protocol: protocol.ToString(), remoteAddr: remoteAddr} -} - -type HookHandler interface { - Play(success func(), failure func(state utils.HookState)) - - PlayDone(success func(), failure func(state utils.HookState)) -} - -type HookSession interface { - send(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error - - Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error -} - -var hookUrls map[HookEvent]string - -func init() { - hookUrls = map[HookEvent]string{ - HookEventPublish: "", - HookEventPublishDone: "", - HookEventPlay: "", - HookEventPlayDone: "", - HookEventRecord: "", - HookEventIdleTimeout: "", - HookEventRecvTimeout: "", - } -} - -func sendHookEvent(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error { +func sendHookEvent(url string, body interface{}) (*http.Response, error) { marshal, err := json.Marshal(body) if err != nil { - return err + return nil, err } client := &http.Client{ - Timeout: time.Second * time.Duration(AppConfig.Hook.Time), + Timeout: time.Duration(AppConfig.Hook.Timeout), } request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal)) if err != nil { - return err + return nil, err } request.Header.Set("Content-Type", "application/json") - response, err := client.Do(request) - if err != nil { - failure(response, err) - } else if response.StatusCode != http.StatusOK { - failure(response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status)) - } else { - success(response) - } - - return nil + return client.Do(request) } -func hookEvent(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error { - url := hookUrls[event] - if url == "" { - success(nil) - return nil +func Hook(event HookEvent, body interface{}) (*http.Response, error) { + url, ok := hookUrls[event] + if url == "" || !ok { + return nil, fmt.Errorf("the url for this %s event does not exist", event.ToString()) } - return sendHookEvent(url, body, success, failure) -} - -type hookSession struct { -} - -func (h *hookSession) send(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error { - marshal, err := json.Marshal(body) - if err != nil { - return err - } - - client := &http.Client{ - Timeout: time.Second * time.Duration(AppConfig.Hook.Time), - } - request, err := http.NewRequest("post", url, bytes.NewBuffer(marshal)) - if err != nil { - return err - } - - request.Header.Set("Content-Type", "application/json") - response, err := client.Do(request) - if err != nil { - failure(response, err) - } else if response.StatusCode != http.StatusOK { - failure(response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status)) - } else { - success(response) - } - - return sendHookEvent(url, body, success, failure) -} - -func (h *hookSession) Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error { - return hookEvent(event, body, success, failure) + response, err := sendHookEvent(url, body) + if err != nil && http.StatusOK != response.StatusCode { + return response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status) + } + + return response, err } diff --git a/stream/hook_event.go b/stream/hook_event.go new file mode 100644 index 0000000..f8825fc --- /dev/null +++ b/stream/hook_event.go @@ -0,0 +1,51 @@ +package stream + +import "fmt" + +type HookEvent int + +const ( + HookEventPublish = HookEvent(0x1) + HookEventPublishDone = HookEvent(0x2) + HookEventPlay = HookEvent(0x3) + HookEventPlayDone = HookEvent(0x4) + HookEventRecord = HookEvent(0x5) + HookEventIdleTimeout = HookEvent(0x6) + HookEventReceiveTimeout = HookEvent(0x7) +) + +var ( + hookUrls map[HookEvent]string +) + +func InitHookUrl() { + hookUrls = map[HookEvent]string{ + HookEventPublish: AppConfig.Hook.OnPublishUrl, + HookEventPublishDone: AppConfig.Hook.OnPublishDoneUrl, + HookEventPlay: AppConfig.Hook.OnPlayUrl, + HookEventPlayDone: AppConfig.Hook.OnPlayDoneUrl, + HookEventRecord: AppConfig.Hook.OnRecordUrl, + HookEventIdleTimeout: AppConfig.Hook.OnIdleTimeoutUrl, + HookEventReceiveTimeout: AppConfig.Hook.OnReceiveTimeoutUrl, + } +} + +func (h *HookEvent) ToString() string { + if HookEventPublish == *h { + return "publish" + } else if HookEventPublishDone == *h { + return "publish done" + } else if HookEventPlay == *h { + return "play" + } else if HookEventPlayDone == *h { + return "play done" + } else if HookEventRecord == *h { + return "record" + } else if HookEventIdleTimeout == *h { + return "idle timeout" + } else if HookEventReceiveTimeout == *h { + return "receive timeout" + } + + panic(fmt.Sprintf("unknow hook type %d", h)) +} diff --git a/stream/hook_sink.go b/stream/hook_sink.go new file mode 100644 index 0000000..0c69767 --- /dev/null +++ b/stream/hook_sink.go @@ -0,0 +1,60 @@ +package stream + +import ( + "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/lkm/log" + "net/http" +) + +func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) { + var response *http.Response + + if AppConfig.Hook.EnableOnPlay() { + hook, err := Hook(HookEventPlay, NewHookPlayEventInfo(sink)) + if err != nil { + log.Sugar.Errorf("通知播放事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId()) + + return hook, utils.HookStateFailure + } + + response = hook + } + + source := SourceManager.Find(sink.SourceId()) + if source == nil { + log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.Protocol().ToString(), sink.Id(), sink.SourceId()) + + { + sink.Lock() + defer sink.UnLock() + + if SessionStateClose == sink.State() { + log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.Id()) + return response, utils.HookStateFailure + } else { + sink.SetState(SessionStateWait) + AddSinkToWaitingQueue(sink.SourceId(), sink) + } + } + } else { + source.AddEvent(SourceEventPlay, sink) + } + + return response, utils.HookStateOK +} + +func HookPlayDoneEvent(sink Sink) (*http.Response, bool) { + var response *http.Response + + if AppConfig.Hook.EnableOnPlayDone() { + hook, err := Hook(HookEventPlay, NewHookPlayEventInfo(sink)) + if err != nil { + log.Sugar.Errorf("通知播放结束事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId()) + return hook, false + } + + response = hook + } + + return response, true +} diff --git a/stream/hook_source.go b/stream/hook_source.go new file mode 100644 index 0000000..91ca761 --- /dev/null +++ b/stream/hook_source.go @@ -0,0 +1,78 @@ +package stream + +import ( + "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/lkm/log" + "net/http" +) + +func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookState) { + var response *http.Response + + if hook && AppConfig.Hook.EnablePublishEvent() { + rep, state := HookPublishEvent(source) + if utils.HookStateOK != state { + return rep, state + } + + response = rep + } + + if err := SourceManager.Add(source); err != nil { + log.Sugar.Errorf("添加源失败 source:%s err:%s", source.Id(), err.Error()) + return nil, utils.HookStateOccupy + } + + if AppConfig.ReceiveTimeout > 0 { + source.StartReceiveDataTimer() + } + + if AppConfig.IdleTimeout > 0 { + source.StartIdleTimer() + } + + return response, utils.HookStateOK +} + +func HookPublishEvent(source Source) (*http.Response, utils.HookState) { + var response *http.Response + + if AppConfig.Hook.EnablePublishEvent() { + hook, err := Hook(HookEventPublish, NewHookPublishEventInfo(source)) + if err != nil { + log.Sugar.Errorf("通知推流事件失败 source:%s err:%s", source.Id(), err.Error()) + return hook, utils.HookStateFailure + } + + response = hook + } + + return response, utils.HookStateOK +} + +func HookPublishDoneEvent(source Source) { + if AppConfig.Hook.EnablePublishEvent() { + _, err := Hook(HookEventPublishDone, NewHookPublishEventInfo(source)) + if err != nil { + log.Sugar.Errorf("通知推流结束事件失败 source:%s err:%s", source.Id(), err.Error()) + } + } +} + +func HookReceiveTimeoutEvent(source Source) { + if AppConfig.Hook.EnableOnReceiveTimeout() { + _, err := Hook(HookEventReceiveTimeout, NewHookPublishEventInfo(source)) + if err != nil { + log.Sugar.Errorf("通知收流超时事件失败 source:%s err:%s", source.Id(), err.Error()) + } + } +} + +func HookIdleTimeoutEvent(source Source) { + if AppConfig.Hook.EnableOnIdleTimeout() { + _, err := Hook(HookEventIdleTimeout, NewHookPublishEventInfo(source)) + if err != nil { + log.Sugar.Errorf("通知空闲超时时间失败 source:%s err:%s", source.Id(), err.Error()) + } + } +} diff --git a/stream/hook_test.go b/stream/hook_test.go index 3e55f59..864d859 100644 --- a/stream/hook_test.go +++ b/stream/hook_test.go @@ -1,21 +1,42 @@ package stream import ( + "fmt" "net/http" "testing" + "time" ) func TestHookServer(t *testing.T) { - - http.HandleFunc("/api/v1/live/publish/auth", func(writer http.ResponseWriter, request *http.Request) { - if true { + //模拟各种多个情况对推拉流的影响 + random := false + i := 1 + http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + println(fmt.Sprintf("接收到请求 path:%s", request.URL.Path)) + if !random { writer.WriteHeader(http.StatusOK) - } else { - writer.WriteHeader(http.StatusNonAuthoritativeInfo) + return } + + switch i { + case 1: + writer.WriteHeader(http.StatusOK) + break + case 2: + writer.WriteHeader(http.StatusNonAuthoritativeInfo) + break + case 3: + time.Sleep(5 * time.Second) + break + case 4: + time.Sleep(20 * time.Second) + break + } + + i = i%5 + 1 }) - err := http.ListenAndServe(":8080", nil) + err := http.ListenAndServe(":8082", nil) if err != nil { panic(err) } diff --git a/stream/memory_pool.go b/stream/memory_pool.go index e23b649..e86ca19 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -146,13 +146,13 @@ func (m *memoryPool) Reset() { func (m *memoryPool) FreeHead() { utils.Assert(!m.marked) - utils.Assert(!m.blockQueue.IsEmpty()) - if m.discardBlockCount > 1 { + if m.discardBlockCount > 0 { m.discardBlockCount-- return } + utils.Assert(!m.blockQueue.IsEmpty()) size := m.blockQueue.Pop().(int) m.head += size @@ -162,22 +162,30 @@ func (m *memoryPool) FreeHead() { } else if m.head >= m.capacity { m.head = 0 } + + if m.blockQueue.IsEmpty() { + m.markIndex = 0 + } } func (m *memoryPool) FreeTail() { utils.Assert(!m.marked) - utils.Assert(!m.blockQueue.IsEmpty()) - if m.discardBlockCount > 1 { + if m.discardBlockCount > 0 { m.discardBlockCount-- return } + utils.Assert(!m.blockQueue.IsEmpty()) size := m.blockQueue.PopBack().(int) m.tail -= size if m.tail == 0 && !m.blockQueue.IsEmpty() { m.tail = m.capacity } + + if m.blockQueue.IsEmpty() { + m.markIndex = 0 + } } func (m *memoryPool) Data() ([]byte, []byte) { diff --git a/stream/session.go b/stream/session.go deleted file mode 100644 index b91811b..0000000 --- a/stream/session.go +++ /dev/null @@ -1,17 +0,0 @@ -package stream - -import ( - "github.com/yangjiechina/avformat/utils" -) - -type SourceHook interface { - Publish(source Source, success func(), failure func(state utils.HookState)) - - PublishDone(source Source, success func(), failure func(state utils.HookState)) -} - -type SinkHook interface { - Play(sink Sink, success func(), failure func(state utils.HookState)) - - PlayDone(source Sink, success func(), failure func(state utils.HookState)) -} diff --git a/stream/sink.go b/stream/sink.go index ed30491..bd4e24f 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -47,6 +47,8 @@ type Sink interface { PrintInfo() string + RemoteAddr() string + // Lock Sink请求拉流->Source推流->Sink断开整个阶段, 是无锁线程安全 //如果Sink在等待队列-Sink断开, 这个过程是非线程安全的 //所以Source在AddSink时, SessionStateWait状态时, 需要加锁保护. @@ -76,8 +78,6 @@ func GenerateSinkId(addr net.Addr) SinkId { } type BaseSink struct { - hookSession - Id_ SinkId SourceId_ string Protocol_ Protocol @@ -202,10 +202,17 @@ func (s *BaseSink) Close() { } else if s.State_ == SessionStateWait { RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_) //拉流结束事件, 在等待队列直接发送通知, 在拉流由Source负责发送. - HookPlayingDone(s, nil, nil) + HookPlayDoneEvent(s) } } - func (s *BaseSink) PrintInfo() string { return fmt.Sprintf("%s-%v source:%s", s.Protocol().ToString(), s.Id_, s.SourceId_) } + +func (s *BaseSink) RemoteAddr() string { + if s.Conn == nil { + return "" + } + + return s.Conn.RemoteAddr().String() +} diff --git a/stream/sink_hook.go b/stream/sink_hook.go deleted file mode 100644 index 54e4ce4..0000000 --- a/stream/sink_hook.go +++ /dev/null @@ -1,94 +0,0 @@ -package stream - -import ( - "github.com/yangjiechina/avformat/utils" - "github.com/yangjiechina/lkm/log" - "net/http" -) - -func HookPlaying(s Sink, success func(), failure func(state utils.HookState)) { - f := func() { - source := SourceManager.Find(s.SourceId()) - if source == nil { - log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.Protocol().ToString(), s.Id(), s.SourceId()) - - { - s.Lock() - defer s.UnLock() - - if SessionStateClose == s.State() { - log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", s.PrintInfo()) - } else { - s.SetState(SessionStateWait) - AddSinkToWaitingQueue(s.SourceId(), s) - } - } - } else { - log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.Protocol().ToString(), s.Id(), s.SourceId()) - - source.AddEvent(SourceEventPlay, s) - } - } - - if !AppConfig.Hook.EnableOnPlay() { - f() - - if success != nil { - success() - } - return - } - - err := hookEvent(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) { - f() - - if success != nil { - success() - } - }, func(response *http.Response, err error) { - log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.Protocol().ToString(), s.Id(), s.SourceId()) - - if failure != nil { - failure(utils.HookStateFailure) - } - }) - - if err != nil { - log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.Protocol().ToString(), s.Id(), s.SourceId()) - - if failure != nil { - failure(utils.HookStateFailure) - } - return - } -} - -func HookPlayingDone(s Sink, success func(), failure func(state utils.HookState)) { - if !AppConfig.Hook.EnableOnPlayDone() { - if success != nil { - success() - } - return - } - - err := hookEvent(HookEventPlayDone, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) { - if success != nil { - success() - } - }, func(response *http.Response, err error) { - log.Sugar.Errorf("Hook播放结束事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.Protocol().ToString(), s.Id(), s.SourceId()) - - if failure != nil { - failure(utils.HookStateFailure) - } - }) - - if err != nil { - log.Sugar.Errorf("Hook播放结束事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.Protocol().ToString(), s.Id(), s.SourceId()) - - if failure != nil { - failure(utils.HookStateFailure) - } - return - } -} diff --git a/stream/source.go b/stream/source.go index 1c0f5c4..f526b1d 100644 --- a/stream/source.go +++ b/stream/source.go @@ -3,8 +3,8 @@ package stream import ( "fmt" "github.com/yangjiechina/lkm/log" + "math" "net" - "net/http" "time" "github.com/yangjiechina/avformat/stream" @@ -107,14 +107,20 @@ type Source interface { // OnDeMuxDone 所有流解析完毕回调 OnDeMuxDone() + Init(input func(data []byte) error) + LoopEvent() - Init(input func(data []byte) error) + RemoteAddr() string + + // StartReceiveDataTimer 启动收流超时计时器 + StartReceiveDataTimer() + + // StartIdleTimer 启动拉流空闲计时器 + StartIdleTimer() } type PublishSource struct { - hookSession - Id_ string Type_ SourceType state SessionState @@ -141,12 +147,20 @@ type PublishSource struct { //sink的拉流和断开拉流事件,都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作 //golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件 - inputEvent chan []byte - responseEvent chan byte //解析完input的数据后,才能继续从网络io中读取流 - closeEvent chan byte - playingEventQueue chan Sink - playingDoneEventQueue chan Sink - probeTimoutEvent chan bool + inputDataEvent chan []byte + dataConsumedEvent chan byte //解析完input的数据后,才能继续从网络io中读取流 + closedEvent chan byte + playingEventQueue chan Sink + playingDoneEventQueue chan Sink + probeTimoutEvent chan bool + receiveDataTimeoutEvent chan byte + idleTimeoutEvent chan byte + + lastPacketTime time.Time + removeSinkTime time.Time + receiveDataTimer *time.Timer + idleTimer *time.Timer + sinkCount int } func (s *PublishSource) Id() string { @@ -160,13 +174,20 @@ func (s *PublishSource) Init(input func(data []byte) error) { s.SetState(SessionStateTransferring) //收流和网络断开的chan都阻塞执行 - s.inputEvent = make(chan []byte) - s.responseEvent = make(chan byte) - s.closeEvent = make(chan byte) + s.inputDataEvent = make(chan []byte) + s.dataConsumedEvent = make(chan byte) + s.closedEvent = make(chan byte) s.playingEventQueue = make(chan Sink, 128) s.playingDoneEventQueue = make(chan Sink, 128) s.probeTimoutEvent = make(chan bool) + if AppConfig.ReceiveTimeout > 0 { + s.receiveDataTimeoutEvent = make(chan byte) + } + if AppConfig.IdleTimeout > 0 { + s.idleTimeoutEvent = make(chan byte) + } + if s.transStreams == nil { s.transStreams = make(map[TransStreamId]TransStream, 10) } @@ -199,11 +220,10 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe s.pktBuffers[index] = NewRbMemoryPool(48000 * 64) } else if AppConfig.GOPCache { //开启GOP缓存 - //以每秒钟4M码率大小创建视频内存池 s.pktBuffers[index] = NewRbMemoryPool(AppConfig.GOPBufferSize) } else { //未开启GOP缓存 - //以每秒钟4M的1/8码率大小创建视频内存池 + //1M缓存大小, 单帧绰绰有余 s.pktBuffers[index] = NewRbMemoryPool(1024 * 1000) } } @@ -214,13 +234,17 @@ func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMe func (s *PublishSource) LoopEvent() { for { select { - case data := <-s.inputEvent: + case data := <-s.inputDataEvent: + if AppConfig.ReceiveTimeout > 0 { + s.lastPacketTime = time.Now() + } + if err := s.Input_(data); err != nil { log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", s.Id_, err.Error()) s.Close() } - s.responseEvent <- 0 + s.dataConsumedEvent <- 0 break case sink := <-s.playingEventQueue: if !s.completed { @@ -234,12 +258,22 @@ func (s *PublishSource) LoopEvent() { case sink := <-s.playingDoneEventQueue: s.RemoveSink(sink) break - case _ = <-s.closeEvent: + case _ = <-s.closedEvent: s.Close() return case _ = <-s.probeTimoutEvent: s.writeHeader() break + case _ = <-s.receiveDataTimeoutEvent: + log.Sugar.Errorf("收流超时 source:%s", s.Id_) + s.Close() + HookReceiveTimeoutEvent(s) + break + case _ = <-s.idleTimeoutEvent: + log.Sugar.Errorf("空闲超时 source:%s", s.Id_) + s.Close() + HookIdleTimeoutEvent(s) + break } } } @@ -350,6 +384,8 @@ func (s *PublishSource) AddSink(sink Sink) bool { sink.SetState(SessionStateTransferring) } + s.sinkCount++ + //新的传输流,发送缓存的音视频帧 if !ok && AppConfig.GOPCache && s.existVideo { s.gopBuffer.PeekAll(func(packet utils.AVPacket) { @@ -367,7 +403,9 @@ func (s *PublishSource) RemoveSink(sink Sink) bool { //如果从传输流没能删除sink, 再从等待队列删除 _, b := transStream.RemoveSink(sink.Id()) if b { - HookPlayingDone(sink, nil, nil) + s.sinkCount-- + s.removeSinkTime = time.Now() + HookPlayDoneEvent(sink) return true } } @@ -378,14 +416,14 @@ func (s *PublishSource) RemoveSink(sink Sink) bool { func (s *PublishSource) AddEvent(event SourceEvent, data interface{}) { if SourceEventInput == event { - s.inputEvent <- data.([]byte) - <-s.responseEvent + s.inputDataEvent <- data.([]byte) + <-s.dataConsumedEvent } else if SourceEventPlay == event { s.playingEventQueue <- data.(Sink) } else if SourceEventPlayDone == event { s.playingDoneEventQueue <- data.(Sink) } else if SourceEventClose == event { - s.closeEvent <- 0 + s.closedEvent <- 0 } } @@ -398,11 +436,24 @@ func (s *PublishSource) Close() { if s.gopBuffer != nil { s.gopBuffer.Clear() } + if s.probeTimer != nil { + s.probeTimer.Stop() + } + if s.receiveDataTimer != nil { + s.receiveDataTimer.Stop() + } + if s.idleTimer != nil { + s.idleTimer.Stop() + } //释放解复用器 //释放转码器 //释放每路转协议流, 将所有sink添加到等待队列 - _, _ = SourceManager.Remove(s.Id_) + _, err := SourceManager.Remove(s.Id_) + if err != nil { + log.Sugar.Errorf("删除源失败 source:%s err:%s", s.Id_, err.Error()) + } + for _, transStream := range s.transStreams { transStream.Close() @@ -421,6 +472,7 @@ func (s *PublishSource) Close() { }) } + HookPublishDoneEvent(s) s.transStreams = nil } @@ -517,46 +569,46 @@ func (s *PublishSource) OnDeMuxDone() { } -func (s *PublishSource) Publish(source Source, success func(), failure func(state utils.HookState)) { - //streamId 已经被占用 - if source_ := SourceManager.Find(source.Id()); source_ != nil { - fmt.Printf("推流已经占用 Source:%s", source.Id()) - failure(utils.HookStateOccupy) - } - - if !AppConfig.Hook.EnableOnPublish() { - if err := SourceManager.Add(source); err == nil { - success() - } else { - fmt.Printf("添加失败 Source:%s", source.Id()) - failure(utils.HookStateOccupy) - } - - return - } - - err := s.Hook(HookEventPublish, NewPublishHookEventInfo(source.Id(), "", source.Type()), - func(response *http.Response) { - if err := SourceManager.Add(source); err == nil { - success() - } else { - failure(utils.HookStateOccupy) - } - }, func(response *http.Response, err error) { - failure(utils.HookStateFailure) - }) - - //hook地址连接失败 - if err != nil { - failure(utils.HookStateFailure) - return - } -} - -func (s *PublishSource) PublishDone(source Source, success func(), failure func(state utils.HookState)) { - -} - func (s *PublishSource) Type() SourceType { return s.Type_ } + +func (s *PublishSource) RemoteAddr() string { + if s.Conn == nil { + return "" + } + + return s.Conn.RemoteAddr().String() +} + +func (s *PublishSource) StartReceiveDataTimer() { + utils.Assert(s.receiveDataTimer == nil) + utils.Assert(AppConfig.ReceiveTimeout > 0) + + s.lastPacketTime = time.Now() + s.receiveDataTimer = time.AfterFunc(time.Duration(AppConfig.ReceiveTimeout), func() { + dis := time.Now().Sub(s.lastPacketTime) + + if dis >= time.Duration(AppConfig.ReceiveTimeout) { + s.receiveDataTimeoutEvent <- 0 + } else { + s.receiveDataTimer.Reset(time.Duration(math.Abs(float64(time.Duration(AppConfig.ReceiveTimeout) - dis)))) + } + }) +} + +func (s *PublishSource) StartIdleTimer() { + utils.Assert(s.idleTimer == nil) + utils.Assert(AppConfig.IdleTimeout > 0) + + s.removeSinkTime = time.Now() + s.idleTimer = time.AfterFunc(time.Duration(AppConfig.IdleTimeout), func() { + dis := time.Now().Sub(s.removeSinkTime) + + if s.sinkCount < 1 && dis >= time.Duration(AppConfig.IdleTimeout) { + s.idleTimeoutEvent <- 0 + } else { + s.idleTimer.Reset(time.Duration(math.Abs(float64(AppConfig.IdleTimeout - int64(dis))))) + } + }) +}