feat: add pause and resume

This commit is contained in:
langhuihui
2023-06-06 19:25:51 +08:00
parent f4fb7881f7
commit 28a51b9b60
4 changed files with 43 additions and 10 deletions

View File

@@ -35,6 +35,7 @@ type Publish struct {
WaitCloseTimeout time.Duration // 延迟自动关闭(等待重连)
DelayCloseTimeout time.Duration // 延迟自动关闭(无订阅时)
IdleTimeout time.Duration // 空闲(无订阅)超时
PauseTimeout time.Duration `default:"30s"` // 暂停超时
BufferTime time.Duration // 缓冲长度(单位:秒)0代表取最近关键帧
Key string // 发布鉴权key
SecretArgName string `default:"secret"` // 发布鉴权参数名
@@ -123,8 +124,8 @@ type Engine struct {
Publish
Subscribe
HTTP
EnableAVCC bool `default:"true"` //启用AVCC格式rtmp协议使用
EnableRTP bool `default:"true"` //启用RTP格式rtsp、gb18181等协议使用
EnableAVCC bool `default:"true"` //启用AVCC格式rtmp、http-flv协议使用
EnableRTP bool `default:"true"` //启用RTP格式rtsp、webrtc等协议使用
EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能
EnableAuth bool `default:"true"` //启用鉴权
Console

1
io.go
View File

@@ -197,6 +197,7 @@ func (io *IO) receive(streamPath string, specific IIO) error {
s.PublishTimeout = conf.PublishTimeout
s.DelayCloseTimeout = conf.DelayCloseTimeout
s.IdleTimeout = conf.IdleTimeout
s.PauseTimeout = conf.PauseTimeout
defer func() {
if err == nil {
if oldPublisher == nil {

View File

@@ -189,12 +189,29 @@ func (opt *Plugin) registerHandler() {
// 注册http响应
for i, j := 0, t.NumMethod(); i < j; i++ {
name := t.Method(i).Name
if handler, ok := v.Method(i).Interface().(func(http.ResponseWriter, *http.Request)); ok {
patten := "/"
if name != "ServeHTTP" {
patten = strings.ToLower(strings.ReplaceAll(name, "_", "/"))
}
switch handler := v.Method(i).Interface().(type) {
case func(http.ResponseWriter, *http.Request):
opt.handle(patten, http.HandlerFunc(handler))
// case func(*http.Request) (int, string, any):
// opt.handle(patten, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// code, msg, data := handler(r)
// switch returnMode {
// case "json":
// rw.Header().Set("Content-Type", "application/json")
// rw.WriteHeader(http.StatusOK)
// json.NewEncoder(rw).Encode(map[string]interface{}{
// "code": code,
// "msg": msg,
// "data": data,
// })
// default:
// http.Error(rw, msg, code)
// }
// }))
}
}
}

View File

@@ -124,6 +124,7 @@ type StreamTimeoutConfig struct {
PublishTimeout time.Duration //发布者无数据后超时
DelayCloseTimeout time.Duration //无订阅者后超时,必须先有一次订阅才会激活
IdleTimeout time.Duration //无订阅者后超时,不需要订阅即可激活
PauseTimeout time.Duration //暂停后超时
NeverTimeout bool // 永不超时
}
type Tracks struct {
@@ -188,6 +189,7 @@ type Stream struct {
Tracks Tracks
AppName string
StreamName string
IsPause bool // 是否处于暂停状态
}
type StreamSummay struct {
Path string
@@ -422,14 +424,18 @@ func (s *Stream) run() {
if !s.NeverTimeout {
hasTrackTimeout := false
trackCount := 0
timeout := s.PublishTimeout
if s.IsPause {
timeout = s.PauseTimeout
}
s.Tracks.Range(func(name string, t Track) {
trackCount++
if _, ok := t.(track.Custom); ok {
return
}
// track 超过一定时间没有更新数据了
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout {
s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout))
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > timeout {
s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", timeout))
hasTrackTimeout = true
}
})
@@ -589,6 +595,14 @@ func (s *Stream) RemoveTrack(t Track) {
s.Receive(TrackRemoved{t})
}
func (s *Stream) Pause() {
s.IsPause = true
}
func (s *Stream) Resume() {
s.IsPause = false
}
type TrackRemoved struct {
Track
}