From 8d6bcc7b1ba68fe24a3066fb3c6e2b5cf76c9589 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Thu, 22 May 2025 10:03:13 +0800 Subject: [PATCH] feat: add more hooks --- .gitignore | 1 + api.go | 209 ++++++++++++++----------------- example/8080/pull_mp4_file.yaml | 7 +- example/8081/default.yaml | 13 ++ pkg/config/types.go | 30 +++-- pkg/task/job.go | 54 +++++--- pkg/task/task.go | 12 +- pkg/task/task_test.go | 20 +++ plugin.go | 178 ++++++++++++-------------- plugin/monitor/index.go | 9 +- plugin/mp4/api.go | 62 +++++---- plugin/rtmp/pkg/client.go | 7 +- plugin/rtmp/pkg/video.go | 30 ++--- pull-proxy.go => pull_proxy.go | 0 puller.go | 27 ++++ push-proxy.go => push_proxy.go | 0 pusher.go | 25 ++++ recoder.go | 26 ++++ transformer.go | 22 ++++ wait-stream.go => wait_stream.go | 0 20 files changed, 434 insertions(+), 298 deletions(-) create mode 100644 example/8081/default.yaml rename pull-proxy.go => pull_proxy.go (100%) rename push-proxy.go => push_proxy.go (100%) rename wait-stream.go => wait_stream.go (100%) diff --git a/.gitignore b/.gitignore index b0d873e..e10bbaa 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ __debug* example/default/* !example/default/main.go !example/default/config.yaml +shutdown.sh \ No newline at end of file diff --git a/api.go b/api.go index e6f4892..ca82599 100644 --- a/api.go +++ b/api.go @@ -79,7 +79,7 @@ func (s *Server) DisabledPlugins(ctx context.Context, _ *emptypb.Empty) (res *pb // /api/stream/annexb/{streamPath} func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) { - publisher, ok := s.Streams.Get(r.PathValue("streamPath")) + publisher, ok := s.Streams.SafeGet(r.PathValue("streamPath")) if !ok || publisher.VideoTrack.AVTrack == nil { http.Error(rw, pkg.ErrNotFound.Error(), http.StatusNotFound) return @@ -195,18 +195,15 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res } return nil }) - s.Streams.Call(func() error { - if pub, ok := s.Streams.Get(req.StreamPath); ok { - res, err = s.getStreamInfo(pub) - if err != nil { - return err - } - res.Data.Recording = recordings - } else { - err = pkg.ErrNotFound + if pub, ok := s.Streams.SafeGet(req.StreamPath); ok { + res, err = s.getStreamInfo(pub) + if err != nil { + return } - return nil - }) + res.Data.Recording = recordings + } else { + err = pkg.ErrNotFound + } return } @@ -324,50 +321,47 @@ func (s *Server) GetSubscribers(context.Context, *pb.SubscribersRequest) (res *p return } func (s *Server) AudioTrackSnap(_ context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) { - s.Streams.Call(func() error { - if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasAudioTrack() { - data := &pb.TrackSnapShotData{} - if pub.AudioTrack.Allocator != nil { - for _, memlist := range pub.AudioTrack.Allocator.GetChildren() { - var list []*pb.MemoryBlock - for _, block := range memlist.GetBlocks() { - list = append(list, &pb.MemoryBlock{ - S: uint32(block.Start), - E: uint32(block.End), - }) - } - data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)}) + if pub, ok := s.Streams.SafeGet(req.StreamPath); ok && pub.HasAudioTrack() { + data := &pb.TrackSnapShotData{} + if pub.AudioTrack.Allocator != nil { + for _, memlist := range pub.AudioTrack.Allocator.GetChildren() { + var list []*pb.MemoryBlock + for _, block := range memlist.GetBlocks() { + list = append(list, &pb.MemoryBlock{ + S: uint32(block.Start), + E: uint32(block.End), + }) } + data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)}) } - pub.AudioTrack.Ring.Do(func(v *pkg.AVFrame) { - if len(v.Wraps) > 0 { - var snap pb.TrackSnapShot - snap.Sequence = v.Sequence - snap.Timestamp = uint32(v.Timestamp / time.Millisecond) - snap.WriteTime = timestamppb.New(v.WriteTime) - snap.Wrap = make([]*pb.Wrap, len(v.Wraps)) - snap.KeyFrame = v.IDR - data.RingDataSize += uint32(v.Wraps[0].GetSize()) - for i, wrap := range v.Wraps { - snap.Wrap[i] = &pb.Wrap{ - Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond), - Size: uint32(wrap.GetSize()), - Data: wrap.String(), - } - } - data.Ring = append(data.Ring, &snap) - } - }) - res = &pb.TrackSnapShotResponse{ - Code: 0, - Message: "success", - Data: data, - } - } else { - err = pkg.ErrNotFound } - return nil - }) + pub.AudioTrack.Ring.Do(func(v *pkg.AVFrame) { + if len(v.Wraps) > 0 { + var snap pb.TrackSnapShot + snap.Sequence = v.Sequence + snap.Timestamp = uint32(v.Timestamp / time.Millisecond) + snap.WriteTime = timestamppb.New(v.WriteTime) + snap.Wrap = make([]*pb.Wrap, len(v.Wraps)) + snap.KeyFrame = v.IDR + data.RingDataSize += uint32(v.Wraps[0].GetSize()) + for i, wrap := range v.Wraps { + snap.Wrap[i] = &pb.Wrap{ + Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond), + Size: uint32(wrap.GetSize()), + Data: wrap.String(), + } + } + data.Ring = append(data.Ring, &snap) + } + }) + res = &pb.TrackSnapShotResponse{ + Code: 0, + Message: "success", + Data: data, + } + } else { + err = pkg.ErrNotFound + } return } func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) { @@ -437,50 +431,47 @@ func (s *Server) api_AudioTrack_SSE(rw http.ResponseWriter, r *http.Request) { } func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) { - s.Streams.Call(func() error { - if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() { - data := &pb.TrackSnapShotData{} - if pub.VideoTrack.Allocator != nil { - for _, memlist := range pub.VideoTrack.Allocator.GetChildren() { - var list []*pb.MemoryBlock - for _, block := range memlist.GetBlocks() { - list = append(list, &pb.MemoryBlock{ - S: uint32(block.Start), - E: uint32(block.End), - }) - } - data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)}) + if pub, ok := s.Streams.SafeGet(req.StreamPath); ok && pub.HasVideoTrack() { + data := &pb.TrackSnapShotData{} + if pub.VideoTrack.Allocator != nil { + for _, memlist := range pub.VideoTrack.Allocator.GetChildren() { + var list []*pb.MemoryBlock + for _, block := range memlist.GetBlocks() { + list = append(list, &pb.MemoryBlock{ + S: uint32(block.Start), + E: uint32(block.End), + }) } + data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)}) } - pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) { - if len(v.Wraps) > 0 { - var snap pb.TrackSnapShot - snap.Sequence = v.Sequence - snap.Timestamp = uint32(v.Timestamp / time.Millisecond) - snap.WriteTime = timestamppb.New(v.WriteTime) - snap.Wrap = make([]*pb.Wrap, len(v.Wraps)) - snap.KeyFrame = v.IDR - data.RingDataSize += uint32(v.Wraps[0].GetSize()) - for i, wrap := range v.Wraps { - snap.Wrap[i] = &pb.Wrap{ - Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond), - Size: uint32(wrap.GetSize()), - Data: wrap.String(), - } - } - data.Ring = append(data.Ring, &snap) - } - }) - res = &pb.TrackSnapShotResponse{ - Code: 0, - Message: "success", - Data: data, - } - } else { - err = pkg.ErrNotFound } - return nil - }) + pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) { + if len(v.Wraps) > 0 { + var snap pb.TrackSnapShot + snap.Sequence = v.Sequence + snap.Timestamp = uint32(v.Timestamp / time.Millisecond) + snap.WriteTime = timestamppb.New(v.WriteTime) + snap.Wrap = make([]*pb.Wrap, len(v.Wraps)) + snap.KeyFrame = v.IDR + data.RingDataSize += uint32(v.Wraps[0].GetSize()) + for i, wrap := range v.Wraps { + snap.Wrap[i] = &pb.Wrap{ + Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond), + Size: uint32(wrap.GetSize()), + Data: wrap.String(), + } + } + data.Ring = append(data.Ring, &snap) + } + }) + res = &pb.TrackSnapShotResponse{ + Code: 0, + Message: "success", + Data: data, + } + } else { + err = pkg.ErrNotFound + } return } @@ -500,7 +491,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *pb.S func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) { s.Streams.Call(func() error { if subscriber, ok := s.Subscribers.Get(req.Id); ok { - if pub, ok := s.Streams.Get(req.StreamPath); ok { + if pub, ok := s.Streams.SafeGet(req.StreamPath); ok { subscriber.Publisher.RemoveSubscriber(subscriber) subscriber.StreamPath = req.StreamPath pub.AddSubscriber(subscriber) @@ -527,7 +518,7 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res func (s *Server) PauseStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { s.Streams.Call(func() error { - if s, ok := s.Streams.Get(req.StreamPath); ok { + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { s.Pause() } return nil @@ -537,7 +528,7 @@ func (s *Server) PauseStream(ctx context.Context, req *pb.StreamSnapRequest) (re func (s *Server) ResumeStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { s.Streams.Call(func() error { - if s, ok := s.Streams.Get(req.StreamPath); ok { + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { s.Resume() } return nil @@ -547,7 +538,7 @@ func (s *Server) ResumeStream(ctx context.Context, req *pb.StreamSnapRequest) (r func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedRequest) (res *pb.SuccessResponse, err error) { s.Streams.Call(func() error { - if s, ok := s.Streams.Get(req.StreamPath); ok { + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { s.Speed = float64(req.Speed) s.Scale = float64(req.Speed) s.Info("set stream speed", "speed", req.Speed) @@ -559,7 +550,7 @@ func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedReque func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res *pb.SuccessResponse, err error) { s.Streams.Call(func() error { - if s, ok := s.Streams.Get(req.StreamPath); ok { + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { s.Seek(time.Unix(int64(req.TimeStamp), 0)) } return nil @@ -569,7 +560,7 @@ func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { s.Streams.Call(func() error { - if s, ok := s.Streams.Get(req.StreamPath); ok { + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { s.Stop(task.ErrStopByUser) } return nil @@ -632,24 +623,18 @@ func (s *Server) Api_Summary_SSE(rw http.ResponseWriter, r *http.Request) { func (s *Server) Api_Stream_Position_SSE(rw http.ResponseWriter, r *http.Request) { streamPath := r.URL.Query().Get("streamPath") util.ReturnFetchValue(func() (t time.Time) { - s.Streams.Call(func() error { - if pub, ok := s.Streams.Get(streamPath); ok { - t = pub.GetPosition() - } - return nil - }) + if pub, ok := s.Streams.SafeGet(streamPath); ok { + t = pub.GetPosition() + } return }, rw, r) } // func (s *Server) Api_Vod_Position(rw http.ResponseWriter, r *http.Request) { // streamPath := r.URL.Query().Get("streamPath") -// s.Streams.Call(func() error { -// if pub, ok := s.Streams.Get(streamPath); ok { -// t = pub.GetPosition() -// } -// return nil -// }) +// if pub, ok := s.Streams.SafeGet(streamPath); ok { +// t = pub.GetPosition() +// } // } func (s *Server) Summary(context.Context, *emptypb.Empty) (res *pb.SummaryResponse, err error) { diff --git a/example/8080/pull_mp4_file.yaml b/example/8080/pull_mp4_file.yaml index 1b0f4c5..5d29440 100644 --- a/example/8080/pull_mp4_file.yaml +++ b/example/8080/pull_mp4_file.yaml @@ -7,4 +7,9 @@ rtsp: mp4: enable: true pull: - live/test: /Users/dexter/Movies/test.mp4 \ No newline at end of file + live/test: /Users/dexter/Movies/test.mp4 +rtmp: + enable: true + +debug: + enable: true diff --git a/example/8081/default.yaml b/example/8081/default.yaml new file mode 100644 index 0000000..02c552a --- /dev/null +++ b/example/8081/default.yaml @@ -0,0 +1,13 @@ +global: + # loglevel: debug + http: + listenaddr: :8081 + listenaddrtls: :8555 + tcp: + listenaddr: :50052 +rtsp: + enable: false +rtmp: + tcp: :1936 +webrtc: + enable: false \ No newline at end of file diff --git a/pkg/config/types.go b/pkg/config/types.go index 1cdcec0..867f297 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -16,11 +16,19 @@ const ( RelayModeRelay = "relay" RelayModeMix = "mix" - HookOnPublish HookType = "publish" - HookOnSubscribe HookType = "subscribe" - HookOnPublishEnd HookType = "publish_end" - HookOnSubscribeEnd HookType = "subscribe_end" HookOnServerKeepAlive HookType = "server_keep_alive" + HookOnPublishStart HookType = "publish_start" + HookOnPublishEnd HookType = "publish_end" + HookOnSubscribeStart HookType = "subscribe_start" + HookOnSubscribeEnd HookType = "subscribe_end" + HookOnPullStart HookType = "pull_start" + HookOnPullEnd HookType = "pull_end" + HookOnPushStart HookType = "push_start" + HookOnPushEnd HookType = "push_end" + HookOnRecordStart HookType = "record_start" + HookOnRecordEnd HookType = "record_end" + HookOnTransformStart HookType = "transform_start" + HookOnTransformEnd HookType = "transform_end" ) type ( @@ -99,13 +107,13 @@ type ( Transform map[Regexp]Transform } Webhook struct { - URL string `yaml:"url" json:"url"` // Webhook 地址 - Method string `yaml:"method" json:"method" default:"POST"` // HTTP 方法 - Headers map[string]string `yaml:"headers" json:"headers"` // 自定义请求头 - TimeoutSeconds int `yaml:"timeout" json:"timeout" default:"5"` // 超时时间(秒) - RetryTimes int `yaml:"retry" json:"retry" default:"3"` // 重试次数 - RetryInterval time.Duration `yaml:"retryInterval" json:"retryInterval" default:"1s"` // 重试间隔 - Interval int `yaml:"interval" json:"interval" default:"60"` // 保活间隔(秒) + URL string // Webhook 地址 + Method string `default:"POST"` // HTTP 方法 + Headers map[string]string // 自定义请求头 + TimeoutSeconds int `default:"5"` // 超时时间(秒) + RetryTimes int `default:"3"` // 重试次数 + RetryInterval time.Duration `default:"1s"` // 重试间隔 + Interval int `default:"60"` // 保活间隔(秒) } Common struct { PublicIP string diff --git a/pkg/task/job.go b/pkg/task/job.go index c331450..ee2b696 100644 --- a/pkg/task/job.go +++ b/pkg/task/job.go @@ -32,14 +32,15 @@ func GetNextTaskID() uint32 { // Job include tasks type Job struct { Task - cases []reflect.SelectCase - addSub chan ITask - children []ITask - lazyRun sync.Once - eventLoopLock sync.Mutex - childrenDisposed chan struct{} - childDisposeListeners []func(ITask) - blocked ITask + cases []reflect.SelectCase + addSub chan ITask + children []ITask + lazyRun sync.Once + eventLoopLock sync.Mutex + childrenDisposed chan struct{} + descendantsDisposeListeners []func(ITask) + descendantsStartListeners []func(ITask) + blocked ITask } func (*Job) GetTaskType() TaskType { @@ -68,12 +69,12 @@ func (mt *Job) waitChildrenDispose() { } } -func (mt *Job) OnChildDispose(listener func(ITask)) { - mt.childDisposeListeners = append(mt.childDisposeListeners, listener) +func (mt *Job) OnDescendantsDispose(listener func(ITask)) { + mt.descendantsDisposeListeners = append(mt.descendantsDisposeListeners, listener) } func (mt *Job) onDescendantsDispose(descendants ITask) { - for _, listener := range mt.childDisposeListeners { + for _, listener := range mt.descendantsDisposeListeners { listener(descendants) } if mt.parent != nil { @@ -82,11 +83,28 @@ func (mt *Job) onDescendantsDispose(descendants ITask) { } func (mt *Job) onChildDispose(child ITask) { - if child.getParent() == mt { - if child.GetTaskType() != TASK_TYPE_CALL || child.GetOwnerType() != "CallBack" { - mt.onDescendantsDispose(child) - } - child.dispose() + if child.GetTaskType() != TASK_TYPE_CALL || child.GetOwnerType() != "CallBack" { + mt.onDescendantsDispose(child) + } + child.dispose() +} + +func (mt *Job) OnDescendantsStart(listener func(ITask)) { + mt.descendantsStartListeners = append(mt.descendantsStartListeners, listener) +} + +func (mt *Job) onDescendantsStart(descendants ITask) { + for _, listener := range mt.descendantsStartListeners { + listener(descendants) + } + if mt.parent != nil { + mt.parent.onDescendantsStart(descendants) + } +} + +func (mt *Job) onChildStart(child ITask) { + if child.GetTaskType() != TASK_TYPE_CALL || child.GetOwnerType() != "CallBack" { + mt.onDescendantsStart(child) } } @@ -211,9 +229,10 @@ func (mt *Job) run() { if rev.IsNil() { return } - if mt.blocked = rev.Interface().(ITask); mt.blocked.getParent() != mt || mt.blocked.start() { + if mt.blocked = rev.Interface().(ITask); mt.blocked.start() { mt.children = append(mt.children, mt.blocked) mt.cases = append(mt.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.blocked.GetSignal())}) + mt.onChildStart(mt.blocked) } } else { taskIndex := chosen - 1 @@ -236,6 +255,7 @@ func (mt *Job) run() { if mt.onChildDispose(mt.blocked); mt.blocked.checkRetry(mt.blocked.StopReason()) { if mt.blocked.reset(); mt.blocked.start() { mt.cases[chosen].Chan = reflect.ValueOf(mt.blocked.GetSignal()) + mt.onChildStart(mt.blocked) continue } } diff --git a/pkg/task/task.go b/pkg/task/task.go index 4147c41..f00d6f5 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -53,7 +53,6 @@ type ( ITask interface { context.Context keepalive() bool - getParent() *Job GetParent() ITask GetTask() *Task GetTaskID() uint32 @@ -85,7 +84,8 @@ type ( getJob() *Job AddTask(ITask, ...any) *Task RangeSubTask(func(yield ITask) bool) - OnChildDispose(func(ITask)) + OnDescendantsDispose(func(ITask)) + OnDescendantsStart(func(ITask)) Blocked() ITask Call(func() error, ...any) Post(func() error, ...any) *Task @@ -178,10 +178,6 @@ func (task *Task) GetTaskPointer() uintptr { return uintptr(unsafe.Pointer(task)) } -func (task *Task) getParent() *Job { - return task.parent -} - func (task *Task) GetKey() uint32 { return task.ID } @@ -435,6 +431,10 @@ func (task *Task) ResetRetryCount() { task.retry.RetryCount = 0 } +func (task *Task) GetRetryCount() int { + return task.retry.RetryCount +} + func (task *Task) run(handler func() error) { var err error defer func() { diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go index d64ab4c..f988565 100644 --- a/pkg/task/task_test.go +++ b/pkg/task/task_test.go @@ -142,6 +142,26 @@ func Test_Hooks(t *testing.T) { root.AddTask(&task).WaitStopped() } +type startFailTask struct { + Task +} + +func (task *startFailTask) Start() error { + return errors.New("start failed") +} + +func (task *startFailTask) Dispose() { + task.Logger.Info("Dispose") +} + +func Test_StartFail(t *testing.T) { + var task startFailTask + root.AddTask(&task) + if err := task.WaitStarted(); err == nil { + t.Errorf("expected start to fail") + } +} + // //type DemoTask struct { // Task diff --git a/plugin.go b/plugin.go index 3feb62e..3b9b648 100644 --- a/plugin.go +++ b/plugin.go @@ -386,13 +386,13 @@ type WebHookTask struct { task.Task plugin *Plugin hookType config.HookType - conf *config.Webhook + conf config.Webhook data any jsonData []byte } func (t *WebHookTask) Start() error { - if t.conf == nil || t.conf.URL == "" { + if t.conf.URL == "" { return task.ErrTaskComplete } @@ -437,11 +437,11 @@ func (t *WebHookTask) Go() error { return err } -func (p *Plugin) SendWebhook(hookType config.HookType, conf config.Webhook, data any) *task.Task { +func (p *Plugin) SendWebhook(hookType config.HookType, data any) *task.Task { webhookTask := &WebHookTask{ plugin: p, hookType: hookType, - conf: &conf, + conf: p.config.Hook[hookType], data: data, } return p.AddTask(webhookTask) @@ -560,10 +560,31 @@ func (p *Plugin) PublishWithConfig(ctx context.Context, streamPath string, conf } err = p.Server.Streams.AddTask(publisher, ctx).WaitStarted() if err == nil { - publisher.OnDispose(func() { - p.sendPublishEndWebhook(publisher) - }) - p.sendPublishWebhook(publisher) + if sender := p.getHookSender(config.HookOnPublishEnd); sender != nil { + publisher.OnDispose(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnPublishEnd, + "streamPath": publisher.StreamPath, + "publishId": publisher.ID, + "reason": publisher.StopReason().Error(), + "timestamp": time.Now().Unix(), + } + sender(config.HookOnPublishEnd, webhookData) + }) + } + if sender := p.getHookSender(config.HookOnPublishStart); sender != nil { + webhookData := map[string]interface{}{ + "event": config.HookOnPublishStart, + "streamPath": publisher.StreamPath, + "args": publisher.Args, + "publishId": publisher.ID, + "remoteAddr": publisher.RemoteAddr, + "type": publisher.Type, + "pluginName": p.Meta.Name, + "timestamp": time.Now().Unix(), + } + sender(config.HookOnPublishStart, webhookData) + } } return } @@ -601,10 +622,34 @@ func (p *Plugin) SubscribeWithConfig(ctx context.Context, streamPath string, con } } if err == nil { - subscriber.OnDispose(func() { - p.sendSubscribeEndWebhook(subscriber) - }) - p.sendSubscribeWebhook(subscriber) + if sender := p.getHookSender(config.HookOnSubscribeEnd); sender != nil { + subscriber.OnDispose(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnSubscribeEnd, + "streamPath": subscriber.StreamPath, + "subscriberId": subscriber.ID, + "reason": subscriber.StopReason().Error(), + "timestamp": time.Now().Unix(), + } + if subscriber.Publisher != nil { + webhookData["publishId"] = subscriber.Publisher.ID + } + sender(config.HookOnSubscribeEnd, webhookData) + }) + } + if sender := p.getHookSender(config.HookOnSubscribeStart); sender != nil { + webhookData := map[string]interface{}{ + "event": config.HookOnSubscribeStart, + "streamPath": subscriber.StreamPath, + "publishId": subscriber.Publisher.ID, + "subscriberId": subscriber.ID, + "remoteAddr": subscriber.RemoteAddr, + "type": subscriber.Type, + "args": subscriber.Args, + "timestamp": time.Now().Unix(), + } + sender(config.HookOnSubscribeStart, webhookData) + } } return } @@ -715,90 +760,17 @@ func (p *Plugin) handle(pattern string, handler http.Handler) { p.Server.apiList = append(p.Server.apiList, pattern) } -func (p *Plugin) sendPublishWebhook(pub *Publisher) { - if p.config.Hook == nil { - return +func (p *Plugin) getHookSender(hookType config.HookType) (sender func(hookType config.HookType, data any) *task.Task) { + if p.config.Hook != nil { + if _, ok := p.config.Hook[hookType]; ok { + sender = p.SendWebhook + } else if p.Server.config.Hook != nil { + if _, ok := p.Server.config.Hook[hookType]; ok { + sender = p.Server.SendWebhook + } + } } - webhookData := map[string]interface{}{ - "event": "publish", - "streamPath": pub.StreamPath, - "args": pub.Args, - "publishId": pub.ID, - "remoteAddr": pub.RemoteAddr, - "type": pub.Type, - "pluginName": p.Meta.Name, - "timestamp": time.Now().Unix(), - } - p.SendWebhook(config.HookOnPublish, p.config.Hook[config.HookOnPublish], webhookData) - if p.Server.config.Hook == nil { - return - } - p.Server.SendWebhook(config.HookOnPublish, p.Server.config.Hook[config.HookOnPublish], webhookData) -} - -func (p *Plugin) sendPublishEndWebhook(pub *Publisher) { - if p.config.Hook == nil { - return - } - webhookData := map[string]interface{}{ - "event": "publish_end", - "streamPath": pub.StreamPath, - "publishId": pub.ID, - "reason": pub.StopReason().Error(), - "timestamp": time.Now().Unix(), - } - p.SendWebhook(config.HookOnPublishEnd, p.config.Hook[config.HookOnPublishEnd], webhookData) -} - -func (p *Plugin) sendSubscribeWebhook(sub *Subscriber) { - if p.config.Hook == nil { - return - } - webhookData := map[string]interface{}{ - "event": "subscribe", - "streamPath": sub.StreamPath, - "publishId": sub.Publisher.ID, - "subscriberId": sub.ID, - "remoteAddr": sub.RemoteAddr, - "type": sub.Type, - "args": sub.Args, - "timestamp": time.Now().Unix(), - } - p.SendWebhook(config.HookOnSubscribe, p.config.Hook[config.HookOnSubscribe], webhookData) -} - -func (p *Plugin) sendSubscribeEndWebhook(sub *Subscriber) { - if p.config.Hook == nil { - return - } - webhookData := map[string]interface{}{ - "event": "subscribe_end", - "streamPath": sub.StreamPath, - "subscriberId": sub.ID, - "reason": sub.StopReason().Error(), - "timestamp": time.Now().Unix(), - } - if sub.Publisher != nil { - webhookData["publishId"] = sub.Publisher.ID - } - p.SendWebhook(config.HookOnSubscribeEnd, p.config.Hook[config.HookOnSubscribeEnd], webhookData) -} - -func (p *Plugin) sendServerKeepAliveWebhook() { - if p.config.Hook == nil { - return - } - s := p.Server - webhookData := map[string]interface{}{ - "event": "server_keep_alive", - "timestamp": time.Now().Unix(), - "streams": s.Streams.Length, - "subscribers": s.Subscribers.Length, - "publisherCount": s.Streams.Length, - "subscriberCount": s.Subscribers.Length, - "uptime": time.Since(s.StartTime).Seconds(), - } - p.SendWebhook(config.HookOnServerKeepAlive, p.config.Hook[config.HookOnServerKeepAlive], webhookData) + return } type ServerKeepAliveTask struct { @@ -811,5 +783,19 @@ func (t *ServerKeepAliveTask) GetTickInterval() time.Duration { } func (t *ServerKeepAliveTask) Tick(now any) { - t.plugin.sendServerKeepAliveWebhook() + sender := t.plugin.getHookSender(config.HookOnServerKeepAlive) + if sender == nil { + return + } + s := t.plugin.Server + webhookData := map[string]interface{}{ + "event": config.HookOnServerKeepAlive, + "timestamp": time.Now().Unix(), + "streams": s.Streams.Length, + "subscribers": s.Subscribers.Length, + "publisherCount": s.Streams.Length, + "subscriberCount": s.Subscribers.Length, + "uptime": time.Since(s.StartTime).Seconds(), + } + sender(config.HookOnServerKeepAlive, webhookData) } diff --git a/plugin/monitor/index.go b/plugin/monitor/index.go index b6af275..bae8feb 100644 --- a/plugin/monitor/index.go +++ b/plugin/monitor/index.go @@ -2,13 +2,14 @@ package plugin_monitor import ( "encoding/json" + "os" + "strings" + "time" + "m7s.live/v5" "m7s.live/v5/pkg/task" "m7s.live/v5/plugin/monitor/pb" monitor "m7s.live/v5/plugin/monitor/pkg" - "os" - "strings" - "time" ) var _ = m7s.InstallPlugin[MonitorPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler) @@ -65,7 +66,7 @@ func (cfg *MonitorPlugin) OnInit() (err error) { cfg.Plugin.Server.OnBeforeDispose(func() { cfg.saveTask(cfg.Plugin.Server) }) - cfg.Plugin.Server.OnChildDispose(cfg.saveTask) + cfg.Plugin.Server.OnDescendantsDispose(cfg.saveTask) } return } diff --git a/plugin/mp4/api.go b/plugin/mp4/api.go index 5249d8d..6a82868 100644 --- a/plugin/mp4/api.go +++ b/plugin/mp4/api.go @@ -371,20 +371,17 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord) err = pkg.ErrRecordExists return } - p.Server.Streams.Call(func() error { - if stream, ok := p.Server.Streams.Get(req.StreamPath); ok { - recordConf := config.Record{ - Append: false, - Fragment: fragment, - FilePath: filePath, - } - job := p.Record(stream, recordConf, nil) - res.Data = uint64(uintptr(unsafe.Pointer(job.GetTask()))) - } else { - err = pkg.ErrNotFound + if stream, ok := p.Server.Streams.SafeGet(req.StreamPath); ok { + recordConf := config.Record{ + Append: false, + Fragment: fragment, + FilePath: filePath, } - return nil - }) + job := p.Record(stream, recordConf, nil) + res.Data = uint64(uintptr(unsafe.Pointer(job.GetTask()))) + } else { + err = pkg.ErrNotFound + } return } @@ -432,28 +429,25 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) ( return nil }) if tmpJob == nil { //为空表示没有正在进行的录制,也就是没有自动录像,则进行正常的事件录像 - p.Server.Streams.Call(func() error { - if stream, ok := p.Server.Streams.Get(req.StreamPath); ok { - recordConf := config.Record{ - Append: false, - Fragment: 0, - FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")), - } - //recordJob := recorder.GetRecordJob() - var subconfig config.Subscribe - defaults.SetDefaults(&subconfig) - subconfig.BufferTime = beforeDuration - recordJob := p.Record(stream, recordConf, &subconfig) - recordJob.EventId = req.EventId - recordJob.EventLevel = req.EventLevel - recordJob.EventName = req.EventName - recordJob.EventDesc = req.EventDesc - recordJob.AfterDuration = afterDuration - recordJob.BeforeDuration = beforeDuration - recordJob.Mode = m7s.RecordModeEvent + if stream, ok := p.Server.Streams.SafeGet(req.StreamPath); ok { + recordConf := config.Record{ + Append: false, + Fragment: 0, + FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")), } - return nil - }) + //recordJob := recorder.GetRecordJob() + var subconfig config.Subscribe + defaults.SetDefaults(&subconfig) + subconfig.BufferTime = beforeDuration + recordJob := p.Record(stream, recordConf, &subconfig) + recordJob.EventId = req.EventId + recordJob.EventLevel = req.EventLevel + recordJob.EventName = req.EventName + recordJob.EventDesc = req.EventDesc + recordJob.AfterDuration = afterDuration + recordJob.BeforeDuration = beforeDuration + recordJob.Mode = m7s.RecordModeEvent + } } else { if tmpJob.AfterDuration != 0 { //当前有事件录像正在录制,则更新该录像的结束时间 tmpJob.AfterDuration = time.Duration(tmpJob.Subscriber.VideoReader.AbsTime)*time.Millisecond + afterDuration diff --git a/plugin/rtmp/pkg/client.go b/plugin/rtmp/pkg/client.go index 30df8b0..623f7f9 100644 --- a/plugin/rtmp/pkg/client.go +++ b/plugin/rtmp/pkg/client.go @@ -3,12 +3,13 @@ package rtmp import ( "crypto/tls" "errors" - "m7s.live/v5/pkg/config" - "m7s.live/v5/pkg/task" "net" "net/url" "strings" + "m7s.live/v5/pkg/config" + "m7s.live/v5/pkg/task" + "m7s.live/v5" ) @@ -53,7 +54,7 @@ func (c *Client) Start() (err error) { return err } c.Init(conn) - c.Logger = c.Logger.With("local", conn.LocalAddr().String()) + c.SetDescription("local", conn.LocalAddr().String()) c.Info("connect") c.WriteChunkSize = c.chunkSize c.AppName = strings.Join(ps[1:len(ps)-1], "/") diff --git a/plugin/rtmp/pkg/video.go b/plugin/rtmp/pkg/video.go index 30e143f..4a8fd75 100644 --- a/plugin/rtmp/pkg/video.go +++ b/plugin/rtmp/pkg/video.go @@ -31,8 +31,8 @@ func (avcc *RTMPVideo) filterH264(naluSizeLen int) { reader := avcc.NewReader() lenReader := reader.NewReader() reader.Skip(5) - lenReader.Skip(5) var afterFilter util.Memory + lenReader.RangeN(5, afterFilter.AppendOne) allocator := avcc.GetAllocator() var hasBadNalu bool for { @@ -49,7 +49,12 @@ func (avcc *RTMPVideo) filterH264(naluSizeLen int) { reader.RangeN(int(naluLen), func(b []byte) { naluBuffer = append(naluBuffer, b) }) - if badType := codec.ParseH264NALUType(naluBuffer[0][0]); badType > 9 { + badType := codec.ParseH264NALUType(naluBuffer[0][0]) + switch badType { + case 5, 6, 1: + afterFilter.Append(lenBuffer...) + afterFilter.Append(naluBuffer...) + default: hasBadNalu = true if allocator != nil { for _, nalu := range lenBuffer { @@ -59,9 +64,6 @@ func (avcc *RTMPVideo) filterH264(naluSizeLen int) { allocator.Free(nalu) } } - } else { - afterFilter.Append(lenBuffer...) - afterFilter.Append(naluBuffer...) } } if hasBadNalu { @@ -166,15 +168,15 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) { return } } else { - // switch ctx := t.ICodecCtx.(type) { - // case *codec.H264Ctx: - // avcc.filterH264(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) - // case *H265Ctx: - // avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) - // } - // if avcc.Size == 0 { - // return ErrSkip - // } + switch ctx := t.ICodecCtx.(type) { + case *codec.H264Ctx: + avcc.filterH264(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) + case *H265Ctx: + avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1) + } + if avcc.Size <= 5 { + return ErrSkip + } } } return diff --git a/pull-proxy.go b/pull_proxy.go similarity index 100% rename from pull-proxy.go rename to pull_proxy.go diff --git a/puller.go b/puller.go index fac2522..c32d8f4 100644 --- a/puller.go +++ b/puller.go @@ -122,6 +122,33 @@ func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf c "maxRetry": conf.MaxRetry, }) puller.SetRetry(conf.MaxRetry, conf.RetryInterval) + + if sender := plugin.getHookSender(config.HookOnPullStart); sender != nil { + puller.OnStart(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnPullStart, + "streamPath": streamPath, + "url": conf.URL, + "args": conf.Args, + "pluginName": plugin.Meta.Name, + "timestamp": time.Now().Unix(), + } + sender(config.HookOnPullStart, webhookData) + }) + } + + if sender := plugin.getHookSender(config.HookOnPullEnd); sender != nil { + puller.OnDispose(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnPullEnd, + "streamPath": streamPath, + "reason": puller.StopReason().Error(), + "timestamp": time.Now().Unix(), + } + sender(config.HookOnPullEnd, webhookData) + }) + } + plugin.Server.Pulls.Add(p, plugin.Logger.With("pullURL", conf.URL, "streamPath", streamPath)) return p } diff --git a/push-proxy.go b/push_proxy.go similarity index 100% rename from push-proxy.go rename to push_proxy.go diff --git a/pusher.go b/pusher.go index d3eca6c..ff96d36 100644 --- a/pusher.go +++ b/pusher.go @@ -2,6 +2,7 @@ package m7s import ( "net/http" + "time" "m7s.live/v5/pkg" "m7s.live/v5/pkg/task" @@ -43,6 +44,30 @@ func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf c "maxRetry": conf.MaxRetry, }) pusher.SetRetry(conf.MaxRetry, conf.RetryInterval) + if sender := plugin.getHookSender(config.HookOnPushStart); sender != nil { + pusher.OnStart(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnPullStart, + "streamPath": streamPath, + "url": conf.URL, + "pluginName": plugin.Meta.Name, + "timestamp": time.Now().Unix(), + } + sender(config.HookOnPullStart, webhookData) + }) + } + + if sender := plugin.getHookSender(config.HookOnPushEnd); sender != nil { + pusher.OnDispose(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnPullEnd, + "streamPath": streamPath, + "reason": pusher.StopReason().Error(), + "timestamp": time.Now().Unix(), + } + sender(config.HookOnPullEnd, webhookData) + }) + } plugin.Server.Pushs.Add(p, plugin.Logger.With("pushURL", conf.URL, "streamPath", streamPath)) return p } diff --git a/recoder.go b/recoder.go index c554b07..c6303a3 100644 --- a/recoder.go +++ b/recoder.go @@ -103,6 +103,32 @@ func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, "fragment": conf.Fragment, }) recorder.SetRetry(-1, time.Second) + if sender := plugin.getHookSender(config.HookOnRecordStart); sender != nil { + recorder.OnStart(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnRecordStart, + "streamPath": streamPath, + "filePath": conf.FilePath, + "pluginName": plugin.Meta.Name, + "timestamp": time.Now().Unix(), + } + sender(config.HookOnRecordStart, webhookData) + }) + } + + if sender := plugin.getHookSender(config.HookOnRecordEnd); sender != nil { + recorder.OnDispose(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnRecordEnd, + "streamPath": streamPath, + "filePath": conf.FilePath, + "reason": recorder.StopReason().Error(), + "timestamp": time.Now().Unix(), + } + sender(config.HookOnRecordEnd, webhookData) + }) + } + plugin.Server.Records.Add(p, plugin.Logger.With("filePath", conf.FilePath, "streamPath", streamPath)) return p } diff --git a/transformer.go b/transformer.go index cece6b4..d554278 100644 --- a/transformer.go +++ b/transformer.go @@ -105,6 +105,28 @@ func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, pub *Publi "conf": conf, }) transformer.SetRetry(-1, time.Second*2) + if sender := plugin.getHookSender(config.HookOnTransformStart); sender != nil { + transformer.OnStart(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnTransformStart, + "streamPath": pub.StreamPath, + "pluginName": plugin.Meta.Name, + "timestamp": time.Now().Unix(), + } + sender(config.HookOnTransformStart, webhookData) + }) + } + if sender := plugin.getHookSender(config.HookOnTransformEnd); sender != nil { + transformer.OnDispose(func() { + webhookData := map[string]interface{}{ + "event": config.HookOnTransformEnd, + "streamPath": pub.StreamPath, + "reason": transformer.StopReason().Error(), + "timestamp": time.Now().Unix(), + } + sender(config.HookOnTransformEnd, webhookData) + }) + } plugin.Server.Transforms.AddTask(p, plugin.Logger.With("streamPath", pub.StreamPath)) return p } diff --git a/wait-stream.go b/wait_stream.go similarity index 100% rename from wait-stream.go rename to wait_stream.go