feat: add more hooks

This commit is contained in:
langhuihui
2025-05-22 10:03:13 +08:00
parent f475419b7b
commit 8d6bcc7b1b
20 changed files with 434 additions and 298 deletions

1
.gitignore vendored
View File

@@ -19,3 +19,4 @@ __debug*
example/default/*
!example/default/main.go
!example/default/config.yaml
shutdown.sh

209
api.go
View File

@@ -79,7 +79,7 @@ func (s *Server) DisabledPlugins(ctx context.Context, _ *emptypb.Empty) (res *pb
// /api/stream/annexb/{streamPath}
func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) {
publisher, ok := s.Streams.Get(r.PathValue("streamPath"))
publisher, ok := s.Streams.SafeGet(r.PathValue("streamPath"))
if !ok || publisher.VideoTrack.AVTrack == nil {
http.Error(rw, pkg.ErrNotFound.Error(), http.StatusNotFound)
return
@@ -195,18 +195,15 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
}
return nil
})
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
res, err = s.getStreamInfo(pub)
if err != nil {
return err
}
res.Data.Recording = recordings
} else {
err = pkg.ErrNotFound
if pub, ok := s.Streams.SafeGet(req.StreamPath); ok {
res, err = s.getStreamInfo(pub)
if err != nil {
return
}
return nil
})
res.Data.Recording = recordings
} else {
err = pkg.ErrNotFound
}
return
}
@@ -324,50 +321,47 @@ func (s *Server) GetSubscribers(context.Context, *pb.SubscribersRequest) (res *p
return
}
func (s *Server) AudioTrackSnap(_ context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasAudioTrack() {
data := &pb.TrackSnapShotData{}
if pub.AudioTrack.Allocator != nil {
for _, memlist := range pub.AudioTrack.Allocator.GetChildren() {
var list []*pb.MemoryBlock
for _, block := range memlist.GetBlocks() {
list = append(list, &pb.MemoryBlock{
S: uint32(block.Start),
E: uint32(block.End),
})
}
data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)})
if pub, ok := s.Streams.SafeGet(req.StreamPath); ok && pub.HasAudioTrack() {
data := &pb.TrackSnapShotData{}
if pub.AudioTrack.Allocator != nil {
for _, memlist := range pub.AudioTrack.Allocator.GetChildren() {
var list []*pb.MemoryBlock
for _, block := range memlist.GetBlocks() {
list = append(list, &pb.MemoryBlock{
S: uint32(block.Start),
E: uint32(block.End),
})
}
data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)})
}
pub.AudioTrack.Ring.Do(func(v *pkg.AVFrame) {
if len(v.Wraps) > 0 {
var snap pb.TrackSnapShot
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp / time.Millisecond)
snap.WriteTime = timestamppb.New(v.WriteTime)
snap.Wrap = make([]*pb.Wrap, len(v.Wraps))
snap.KeyFrame = v.IDR
data.RingDataSize += uint32(v.Wraps[0].GetSize())
for i, wrap := range v.Wraps {
snap.Wrap[i] = &pb.Wrap{
Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
Size: uint32(wrap.GetSize()),
Data: wrap.String(),
}
}
data.Ring = append(data.Ring, &snap)
}
})
res = &pb.TrackSnapShotResponse{
Code: 0,
Message: "success",
Data: data,
}
} else {
err = pkg.ErrNotFound
}
return nil
})
pub.AudioTrack.Ring.Do(func(v *pkg.AVFrame) {
if len(v.Wraps) > 0 {
var snap pb.TrackSnapShot
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp / time.Millisecond)
snap.WriteTime = timestamppb.New(v.WriteTime)
snap.Wrap = make([]*pb.Wrap, len(v.Wraps))
snap.KeyFrame = v.IDR
data.RingDataSize += uint32(v.Wraps[0].GetSize())
for i, wrap := range v.Wraps {
snap.Wrap[i] = &pb.Wrap{
Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
Size: uint32(wrap.GetSize()),
Data: wrap.String(),
}
}
data.Ring = append(data.Ring, &snap)
}
})
res = &pb.TrackSnapShotResponse{
Code: 0,
Message: "success",
Data: data,
}
} else {
err = pkg.ErrNotFound
}
return
}
func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) {
@@ -437,50 +431,47 @@ func (s *Server) api_AudioTrack_SSE(rw http.ResponseWriter, r *http.Request) {
}
func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() {
data := &pb.TrackSnapShotData{}
if pub.VideoTrack.Allocator != nil {
for _, memlist := range pub.VideoTrack.Allocator.GetChildren() {
var list []*pb.MemoryBlock
for _, block := range memlist.GetBlocks() {
list = append(list, &pb.MemoryBlock{
S: uint32(block.Start),
E: uint32(block.End),
})
}
data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)})
if pub, ok := s.Streams.SafeGet(req.StreamPath); ok && pub.HasVideoTrack() {
data := &pb.TrackSnapShotData{}
if pub.VideoTrack.Allocator != nil {
for _, memlist := range pub.VideoTrack.Allocator.GetChildren() {
var list []*pb.MemoryBlock
for _, block := range memlist.GetBlocks() {
list = append(list, &pb.MemoryBlock{
S: uint32(block.Start),
E: uint32(block.End),
})
}
data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)})
}
pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) {
if len(v.Wraps) > 0 {
var snap pb.TrackSnapShot
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp / time.Millisecond)
snap.WriteTime = timestamppb.New(v.WriteTime)
snap.Wrap = make([]*pb.Wrap, len(v.Wraps))
snap.KeyFrame = v.IDR
data.RingDataSize += uint32(v.Wraps[0].GetSize())
for i, wrap := range v.Wraps {
snap.Wrap[i] = &pb.Wrap{
Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
Size: uint32(wrap.GetSize()),
Data: wrap.String(),
}
}
data.Ring = append(data.Ring, &snap)
}
})
res = &pb.TrackSnapShotResponse{
Code: 0,
Message: "success",
Data: data,
}
} else {
err = pkg.ErrNotFound
}
return nil
})
pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) {
if len(v.Wraps) > 0 {
var snap pb.TrackSnapShot
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp / time.Millisecond)
snap.WriteTime = timestamppb.New(v.WriteTime)
snap.Wrap = make([]*pb.Wrap, len(v.Wraps))
snap.KeyFrame = v.IDR
data.RingDataSize += uint32(v.Wraps[0].GetSize())
for i, wrap := range v.Wraps {
snap.Wrap[i] = &pb.Wrap{
Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
Size: uint32(wrap.GetSize()),
Data: wrap.String(),
}
}
data.Ring = append(data.Ring, &snap)
}
})
res = &pb.TrackSnapShotResponse{
Code: 0,
Message: "success",
Data: data,
}
} else {
err = pkg.ErrNotFound
}
return
}
@@ -500,7 +491,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *pb.S
func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if subscriber, ok := s.Subscribers.Get(req.Id); ok {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
if pub, ok := s.Streams.SafeGet(req.StreamPath); ok {
subscriber.Publisher.RemoveSubscriber(subscriber)
subscriber.StreamPath = req.StreamPath
pub.AddSubscriber(subscriber)
@@ -527,7 +518,7 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res
func (s *Server) PauseStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Pause()
}
return nil
@@ -537,7 +528,7 @@ func (s *Server) PauseStream(ctx context.Context, req *pb.StreamSnapRequest) (re
func (s *Server) ResumeStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Resume()
}
return nil
@@ -547,7 +538,7 @@ func (s *Server) ResumeStream(ctx context.Context, req *pb.StreamSnapRequest) (r
func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Speed = float64(req.Speed)
s.Scale = float64(req.Speed)
s.Info("set stream speed", "speed", req.Speed)
@@ -559,7 +550,7 @@ func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedReque
func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Seek(time.Unix(int64(req.TimeStamp), 0))
}
return nil
@@ -569,7 +560,7 @@ func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res
func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Stop(task.ErrStopByUser)
}
return nil
@@ -632,24 +623,18 @@ func (s *Server) Api_Summary_SSE(rw http.ResponseWriter, r *http.Request) {
func (s *Server) Api_Stream_Position_SSE(rw http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Query().Get("streamPath")
util.ReturnFetchValue(func() (t time.Time) {
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(streamPath); ok {
t = pub.GetPosition()
}
return nil
})
if pub, ok := s.Streams.SafeGet(streamPath); ok {
t = pub.GetPosition()
}
return
}, rw, r)
}
// func (s *Server) Api_Vod_Position(rw http.ResponseWriter, r *http.Request) {
// streamPath := r.URL.Query().Get("streamPath")
// s.Streams.Call(func() error {
// if pub, ok := s.Streams.Get(streamPath); ok {
// t = pub.GetPosition()
// }
// return nil
// })
// if pub, ok := s.Streams.SafeGet(streamPath); ok {
// t = pub.GetPosition()
// }
// }
func (s *Server) Summary(context.Context, *emptypb.Empty) (res *pb.SummaryResponse, err error) {

View File

@@ -7,4 +7,9 @@ rtsp:
mp4:
enable: true
pull:
live/test: /Users/dexter/Movies/test.mp4
live/test: /Users/dexter/Movies/test.mp4
rtmp:
enable: true
debug:
enable: true

13
example/8081/default.yaml Normal file
View File

@@ -0,0 +1,13 @@
global:
# loglevel: debug
http:
listenaddr: :8081
listenaddrtls: :8555
tcp:
listenaddr: :50052
rtsp:
enable: false
rtmp:
tcp: :1936
webrtc:
enable: false

View File

@@ -16,11 +16,19 @@ const (
RelayModeRelay = "relay"
RelayModeMix = "mix"
HookOnPublish HookType = "publish"
HookOnSubscribe HookType = "subscribe"
HookOnPublishEnd HookType = "publish_end"
HookOnSubscribeEnd HookType = "subscribe_end"
HookOnServerKeepAlive HookType = "server_keep_alive"
HookOnPublishStart HookType = "publish_start"
HookOnPublishEnd HookType = "publish_end"
HookOnSubscribeStart HookType = "subscribe_start"
HookOnSubscribeEnd HookType = "subscribe_end"
HookOnPullStart HookType = "pull_start"
HookOnPullEnd HookType = "pull_end"
HookOnPushStart HookType = "push_start"
HookOnPushEnd HookType = "push_end"
HookOnRecordStart HookType = "record_start"
HookOnRecordEnd HookType = "record_end"
HookOnTransformStart HookType = "transform_start"
HookOnTransformEnd HookType = "transform_end"
)
type (
@@ -99,13 +107,13 @@ type (
Transform map[Regexp]Transform
}
Webhook struct {
URL string `yaml:"url" json:"url"` // Webhook 地址
Method string `yaml:"method" json:"method" default:"POST"` // HTTP 方法
Headers map[string]string `yaml:"headers" json:"headers"` // 自定义请求头
TimeoutSeconds int `yaml:"timeout" json:"timeout" default:"5"` // 超时时间(秒)
RetryTimes int `yaml:"retry" json:"retry" default:"3"` // 重试次数
RetryInterval time.Duration `yaml:"retryInterval" json:"retryInterval" default:"1s"` // 重试间隔
Interval int `yaml:"interval" json:"interval" default:"60"` // 保活间隔(秒)
URL string // Webhook 地址
Method string `default:"POST"` // HTTP 方法
Headers map[string]string // 自定义请求头
TimeoutSeconds int `default:"5"` // 超时时间(秒)
RetryTimes int `default:"3"` // 重试次数
RetryInterval time.Duration `default:"1s"` // 重试间隔
Interval int `default:"60"` // 保活间隔(秒)
}
Common struct {
PublicIP string

View File

@@ -32,14 +32,15 @@ func GetNextTaskID() uint32 {
// Job include tasks
type Job struct {
Task
cases []reflect.SelectCase
addSub chan ITask
children []ITask
lazyRun sync.Once
eventLoopLock sync.Mutex
childrenDisposed chan struct{}
childDisposeListeners []func(ITask)
blocked ITask
cases []reflect.SelectCase
addSub chan ITask
children []ITask
lazyRun sync.Once
eventLoopLock sync.Mutex
childrenDisposed chan struct{}
descendantsDisposeListeners []func(ITask)
descendantsStartListeners []func(ITask)
blocked ITask
}
func (*Job) GetTaskType() TaskType {
@@ -68,12 +69,12 @@ func (mt *Job) waitChildrenDispose() {
}
}
func (mt *Job) OnChildDispose(listener func(ITask)) {
mt.childDisposeListeners = append(mt.childDisposeListeners, listener)
func (mt *Job) OnDescendantsDispose(listener func(ITask)) {
mt.descendantsDisposeListeners = append(mt.descendantsDisposeListeners, listener)
}
func (mt *Job) onDescendantsDispose(descendants ITask) {
for _, listener := range mt.childDisposeListeners {
for _, listener := range mt.descendantsDisposeListeners {
listener(descendants)
}
if mt.parent != nil {
@@ -82,11 +83,28 @@ func (mt *Job) onDescendantsDispose(descendants ITask) {
}
func (mt *Job) onChildDispose(child ITask) {
if child.getParent() == mt {
if child.GetTaskType() != TASK_TYPE_CALL || child.GetOwnerType() != "CallBack" {
mt.onDescendantsDispose(child)
}
child.dispose()
if child.GetTaskType() != TASK_TYPE_CALL || child.GetOwnerType() != "CallBack" {
mt.onDescendantsDispose(child)
}
child.dispose()
}
func (mt *Job) OnDescendantsStart(listener func(ITask)) {
mt.descendantsStartListeners = append(mt.descendantsStartListeners, listener)
}
func (mt *Job) onDescendantsStart(descendants ITask) {
for _, listener := range mt.descendantsStartListeners {
listener(descendants)
}
if mt.parent != nil {
mt.parent.onDescendantsStart(descendants)
}
}
func (mt *Job) onChildStart(child ITask) {
if child.GetTaskType() != TASK_TYPE_CALL || child.GetOwnerType() != "CallBack" {
mt.onDescendantsStart(child)
}
}
@@ -211,9 +229,10 @@ func (mt *Job) run() {
if rev.IsNil() {
return
}
if mt.blocked = rev.Interface().(ITask); mt.blocked.getParent() != mt || mt.blocked.start() {
if mt.blocked = rev.Interface().(ITask); mt.blocked.start() {
mt.children = append(mt.children, mt.blocked)
mt.cases = append(mt.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.blocked.GetSignal())})
mt.onChildStart(mt.blocked)
}
} else {
taskIndex := chosen - 1
@@ -236,6 +255,7 @@ func (mt *Job) run() {
if mt.onChildDispose(mt.blocked); mt.blocked.checkRetry(mt.blocked.StopReason()) {
if mt.blocked.reset(); mt.blocked.start() {
mt.cases[chosen].Chan = reflect.ValueOf(mt.blocked.GetSignal())
mt.onChildStart(mt.blocked)
continue
}
}

View File

@@ -53,7 +53,6 @@ type (
ITask interface {
context.Context
keepalive() bool
getParent() *Job
GetParent() ITask
GetTask() *Task
GetTaskID() uint32
@@ -85,7 +84,8 @@ type (
getJob() *Job
AddTask(ITask, ...any) *Task
RangeSubTask(func(yield ITask) bool)
OnChildDispose(func(ITask))
OnDescendantsDispose(func(ITask))
OnDescendantsStart(func(ITask))
Blocked() ITask
Call(func() error, ...any)
Post(func() error, ...any) *Task
@@ -178,10 +178,6 @@ func (task *Task) GetTaskPointer() uintptr {
return uintptr(unsafe.Pointer(task))
}
func (task *Task) getParent() *Job {
return task.parent
}
func (task *Task) GetKey() uint32 {
return task.ID
}
@@ -435,6 +431,10 @@ func (task *Task) ResetRetryCount() {
task.retry.RetryCount = 0
}
func (task *Task) GetRetryCount() int {
return task.retry.RetryCount
}
func (task *Task) run(handler func() error) {
var err error
defer func() {

View File

@@ -142,6 +142,26 @@ func Test_Hooks(t *testing.T) {
root.AddTask(&task).WaitStopped()
}
type startFailTask struct {
Task
}
func (task *startFailTask) Start() error {
return errors.New("start failed")
}
func (task *startFailTask) Dispose() {
task.Logger.Info("Dispose")
}
func Test_StartFail(t *testing.T) {
var task startFailTask
root.AddTask(&task)
if err := task.WaitStarted(); err == nil {
t.Errorf("expected start to fail")
}
}
//
//type DemoTask struct {
// Task

178
plugin.go
View File

@@ -386,13 +386,13 @@ type WebHookTask struct {
task.Task
plugin *Plugin
hookType config.HookType
conf *config.Webhook
conf config.Webhook
data any
jsonData []byte
}
func (t *WebHookTask) Start() error {
if t.conf == nil || t.conf.URL == "" {
if t.conf.URL == "" {
return task.ErrTaskComplete
}
@@ -437,11 +437,11 @@ func (t *WebHookTask) Go() error {
return err
}
func (p *Plugin) SendWebhook(hookType config.HookType, conf config.Webhook, data any) *task.Task {
func (p *Plugin) SendWebhook(hookType config.HookType, data any) *task.Task {
webhookTask := &WebHookTask{
plugin: p,
hookType: hookType,
conf: &conf,
conf: p.config.Hook[hookType],
data: data,
}
return p.AddTask(webhookTask)
@@ -560,10 +560,31 @@ func (p *Plugin) PublishWithConfig(ctx context.Context, streamPath string, conf
}
err = p.Server.Streams.AddTask(publisher, ctx).WaitStarted()
if err == nil {
publisher.OnDispose(func() {
p.sendPublishEndWebhook(publisher)
})
p.sendPublishWebhook(publisher)
if sender := p.getHookSender(config.HookOnPublishEnd); sender != nil {
publisher.OnDispose(func() {
webhookData := map[string]interface{}{
"event": config.HookOnPublishEnd,
"streamPath": publisher.StreamPath,
"publishId": publisher.ID,
"reason": publisher.StopReason().Error(),
"timestamp": time.Now().Unix(),
}
sender(config.HookOnPublishEnd, webhookData)
})
}
if sender := p.getHookSender(config.HookOnPublishStart); sender != nil {
webhookData := map[string]interface{}{
"event": config.HookOnPublishStart,
"streamPath": publisher.StreamPath,
"args": publisher.Args,
"publishId": publisher.ID,
"remoteAddr": publisher.RemoteAddr,
"type": publisher.Type,
"pluginName": p.Meta.Name,
"timestamp": time.Now().Unix(),
}
sender(config.HookOnPublishStart, webhookData)
}
}
return
}
@@ -601,10 +622,34 @@ func (p *Plugin) SubscribeWithConfig(ctx context.Context, streamPath string, con
}
}
if err == nil {
subscriber.OnDispose(func() {
p.sendSubscribeEndWebhook(subscriber)
})
p.sendSubscribeWebhook(subscriber)
if sender := p.getHookSender(config.HookOnSubscribeEnd); sender != nil {
subscriber.OnDispose(func() {
webhookData := map[string]interface{}{
"event": config.HookOnSubscribeEnd,
"streamPath": subscriber.StreamPath,
"subscriberId": subscriber.ID,
"reason": subscriber.StopReason().Error(),
"timestamp": time.Now().Unix(),
}
if subscriber.Publisher != nil {
webhookData["publishId"] = subscriber.Publisher.ID
}
sender(config.HookOnSubscribeEnd, webhookData)
})
}
if sender := p.getHookSender(config.HookOnSubscribeStart); sender != nil {
webhookData := map[string]interface{}{
"event": config.HookOnSubscribeStart,
"streamPath": subscriber.StreamPath,
"publishId": subscriber.Publisher.ID,
"subscriberId": subscriber.ID,
"remoteAddr": subscriber.RemoteAddr,
"type": subscriber.Type,
"args": subscriber.Args,
"timestamp": time.Now().Unix(),
}
sender(config.HookOnSubscribeStart, webhookData)
}
}
return
}
@@ -715,90 +760,17 @@ func (p *Plugin) handle(pattern string, handler http.Handler) {
p.Server.apiList = append(p.Server.apiList, pattern)
}
func (p *Plugin) sendPublishWebhook(pub *Publisher) {
if p.config.Hook == nil {
return
func (p *Plugin) getHookSender(hookType config.HookType) (sender func(hookType config.HookType, data any) *task.Task) {
if p.config.Hook != nil {
if _, ok := p.config.Hook[hookType]; ok {
sender = p.SendWebhook
} else if p.Server.config.Hook != nil {
if _, ok := p.Server.config.Hook[hookType]; ok {
sender = p.Server.SendWebhook
}
}
}
webhookData := map[string]interface{}{
"event": "publish",
"streamPath": pub.StreamPath,
"args": pub.Args,
"publishId": pub.ID,
"remoteAddr": pub.RemoteAddr,
"type": pub.Type,
"pluginName": p.Meta.Name,
"timestamp": time.Now().Unix(),
}
p.SendWebhook(config.HookOnPublish, p.config.Hook[config.HookOnPublish], webhookData)
if p.Server.config.Hook == nil {
return
}
p.Server.SendWebhook(config.HookOnPublish, p.Server.config.Hook[config.HookOnPublish], webhookData)
}
func (p *Plugin) sendPublishEndWebhook(pub *Publisher) {
if p.config.Hook == nil {
return
}
webhookData := map[string]interface{}{
"event": "publish_end",
"streamPath": pub.StreamPath,
"publishId": pub.ID,
"reason": pub.StopReason().Error(),
"timestamp": time.Now().Unix(),
}
p.SendWebhook(config.HookOnPublishEnd, p.config.Hook[config.HookOnPublishEnd], webhookData)
}
func (p *Plugin) sendSubscribeWebhook(sub *Subscriber) {
if p.config.Hook == nil {
return
}
webhookData := map[string]interface{}{
"event": "subscribe",
"streamPath": sub.StreamPath,
"publishId": sub.Publisher.ID,
"subscriberId": sub.ID,
"remoteAddr": sub.RemoteAddr,
"type": sub.Type,
"args": sub.Args,
"timestamp": time.Now().Unix(),
}
p.SendWebhook(config.HookOnSubscribe, p.config.Hook[config.HookOnSubscribe], webhookData)
}
func (p *Plugin) sendSubscribeEndWebhook(sub *Subscriber) {
if p.config.Hook == nil {
return
}
webhookData := map[string]interface{}{
"event": "subscribe_end",
"streamPath": sub.StreamPath,
"subscriberId": sub.ID,
"reason": sub.StopReason().Error(),
"timestamp": time.Now().Unix(),
}
if sub.Publisher != nil {
webhookData["publishId"] = sub.Publisher.ID
}
p.SendWebhook(config.HookOnSubscribeEnd, p.config.Hook[config.HookOnSubscribeEnd], webhookData)
}
func (p *Plugin) sendServerKeepAliveWebhook() {
if p.config.Hook == nil {
return
}
s := p.Server
webhookData := map[string]interface{}{
"event": "server_keep_alive",
"timestamp": time.Now().Unix(),
"streams": s.Streams.Length,
"subscribers": s.Subscribers.Length,
"publisherCount": s.Streams.Length,
"subscriberCount": s.Subscribers.Length,
"uptime": time.Since(s.StartTime).Seconds(),
}
p.SendWebhook(config.HookOnServerKeepAlive, p.config.Hook[config.HookOnServerKeepAlive], webhookData)
return
}
type ServerKeepAliveTask struct {
@@ -811,5 +783,19 @@ func (t *ServerKeepAliveTask) GetTickInterval() time.Duration {
}
func (t *ServerKeepAliveTask) Tick(now any) {
t.plugin.sendServerKeepAliveWebhook()
sender := t.plugin.getHookSender(config.HookOnServerKeepAlive)
if sender == nil {
return
}
s := t.plugin.Server
webhookData := map[string]interface{}{
"event": config.HookOnServerKeepAlive,
"timestamp": time.Now().Unix(),
"streams": s.Streams.Length,
"subscribers": s.Subscribers.Length,
"publisherCount": s.Streams.Length,
"subscriberCount": s.Subscribers.Length,
"uptime": time.Since(s.StartTime).Seconds(),
}
sender(config.HookOnServerKeepAlive, webhookData)
}

View File

@@ -2,13 +2,14 @@ package plugin_monitor
import (
"encoding/json"
"os"
"strings"
"time"
"m7s.live/v5"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/monitor/pb"
monitor "m7s.live/v5/plugin/monitor/pkg"
"os"
"strings"
"time"
)
var _ = m7s.InstallPlugin[MonitorPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler)
@@ -65,7 +66,7 @@ func (cfg *MonitorPlugin) OnInit() (err error) {
cfg.Plugin.Server.OnBeforeDispose(func() {
cfg.saveTask(cfg.Plugin.Server)
})
cfg.Plugin.Server.OnChildDispose(cfg.saveTask)
cfg.Plugin.Server.OnDescendantsDispose(cfg.saveTask)
}
return
}

View File

@@ -371,20 +371,17 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord)
err = pkg.ErrRecordExists
return
}
p.Server.Streams.Call(func() error {
if stream, ok := p.Server.Streams.Get(req.StreamPath); ok {
recordConf := config.Record{
Append: false,
Fragment: fragment,
FilePath: filePath,
}
job := p.Record(stream, recordConf, nil)
res.Data = uint64(uintptr(unsafe.Pointer(job.GetTask())))
} else {
err = pkg.ErrNotFound
if stream, ok := p.Server.Streams.SafeGet(req.StreamPath); ok {
recordConf := config.Record{
Append: false,
Fragment: fragment,
FilePath: filePath,
}
return nil
})
job := p.Record(stream, recordConf, nil)
res.Data = uint64(uintptr(unsafe.Pointer(job.GetTask())))
} else {
err = pkg.ErrNotFound
}
return
}
@@ -432,28 +429,25 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) (
return nil
})
if tmpJob == nil { //为空表示没有正在进行的录制,也就是没有自动录像,则进行正常的事件录像
p.Server.Streams.Call(func() error {
if stream, ok := p.Server.Streams.Get(req.StreamPath); ok {
recordConf := config.Record{
Append: false,
Fragment: 0,
FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")),
}
//recordJob := recorder.GetRecordJob()
var subconfig config.Subscribe
defaults.SetDefaults(&subconfig)
subconfig.BufferTime = beforeDuration
recordJob := p.Record(stream, recordConf, &subconfig)
recordJob.EventId = req.EventId
recordJob.EventLevel = req.EventLevel
recordJob.EventName = req.EventName
recordJob.EventDesc = req.EventDesc
recordJob.AfterDuration = afterDuration
recordJob.BeforeDuration = beforeDuration
recordJob.Mode = m7s.RecordModeEvent
if stream, ok := p.Server.Streams.SafeGet(req.StreamPath); ok {
recordConf := config.Record{
Append: false,
Fragment: 0,
FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")),
}
return nil
})
//recordJob := recorder.GetRecordJob()
var subconfig config.Subscribe
defaults.SetDefaults(&subconfig)
subconfig.BufferTime = beforeDuration
recordJob := p.Record(stream, recordConf, &subconfig)
recordJob.EventId = req.EventId
recordJob.EventLevel = req.EventLevel
recordJob.EventName = req.EventName
recordJob.EventDesc = req.EventDesc
recordJob.AfterDuration = afterDuration
recordJob.BeforeDuration = beforeDuration
recordJob.Mode = m7s.RecordModeEvent
}
} else {
if tmpJob.AfterDuration != 0 { //当前有事件录像正在录制,则更新该录像的结束时间
tmpJob.AfterDuration = time.Duration(tmpJob.Subscriber.VideoReader.AbsTime)*time.Millisecond + afterDuration

View File

@@ -3,12 +3,13 @@ package rtmp
import (
"crypto/tls"
"errors"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"net"
"net/url"
"strings"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5"
)
@@ -53,7 +54,7 @@ func (c *Client) Start() (err error) {
return err
}
c.Init(conn)
c.Logger = c.Logger.With("local", conn.LocalAddr().String())
c.SetDescription("local", conn.LocalAddr().String())
c.Info("connect")
c.WriteChunkSize = c.chunkSize
c.AppName = strings.Join(ps[1:len(ps)-1], "/")

View File

@@ -31,8 +31,8 @@ func (avcc *RTMPVideo) filterH264(naluSizeLen int) {
reader := avcc.NewReader()
lenReader := reader.NewReader()
reader.Skip(5)
lenReader.Skip(5)
var afterFilter util.Memory
lenReader.RangeN(5, afterFilter.AppendOne)
allocator := avcc.GetAllocator()
var hasBadNalu bool
for {
@@ -49,7 +49,12 @@ func (avcc *RTMPVideo) filterH264(naluSizeLen int) {
reader.RangeN(int(naluLen), func(b []byte) {
naluBuffer = append(naluBuffer, b)
})
if badType := codec.ParseH264NALUType(naluBuffer[0][0]); badType > 9 {
badType := codec.ParseH264NALUType(naluBuffer[0][0])
switch badType {
case 5, 6, 1:
afterFilter.Append(lenBuffer...)
afterFilter.Append(naluBuffer...)
default:
hasBadNalu = true
if allocator != nil {
for _, nalu := range lenBuffer {
@@ -59,9 +64,6 @@ func (avcc *RTMPVideo) filterH264(naluSizeLen int) {
allocator.Free(nalu)
}
}
} else {
afterFilter.Append(lenBuffer...)
afterFilter.Append(naluBuffer...)
}
}
if hasBadNalu {
@@ -166,15 +168,15 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) {
return
}
} else {
// switch ctx := t.ICodecCtx.(type) {
// case *codec.H264Ctx:
// avcc.filterH264(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
// case *H265Ctx:
// avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
// }
// if avcc.Size == 0 {
// return ErrSkip
// }
switch ctx := t.ICodecCtx.(type) {
case *codec.H264Ctx:
avcc.filterH264(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
case *H265Ctx:
avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
}
if avcc.Size <= 5 {
return ErrSkip
}
}
}
return

View File

@@ -122,6 +122,33 @@ func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf c
"maxRetry": conf.MaxRetry,
})
puller.SetRetry(conf.MaxRetry, conf.RetryInterval)
if sender := plugin.getHookSender(config.HookOnPullStart); sender != nil {
puller.OnStart(func() {
webhookData := map[string]interface{}{
"event": config.HookOnPullStart,
"streamPath": streamPath,
"url": conf.URL,
"args": conf.Args,
"pluginName": plugin.Meta.Name,
"timestamp": time.Now().Unix(),
}
sender(config.HookOnPullStart, webhookData)
})
}
if sender := plugin.getHookSender(config.HookOnPullEnd); sender != nil {
puller.OnDispose(func() {
webhookData := map[string]interface{}{
"event": config.HookOnPullEnd,
"streamPath": streamPath,
"reason": puller.StopReason().Error(),
"timestamp": time.Now().Unix(),
}
sender(config.HookOnPullEnd, webhookData)
})
}
plugin.Server.Pulls.Add(p, plugin.Logger.With("pullURL", conf.URL, "streamPath", streamPath))
return p
}

View File

@@ -2,6 +2,7 @@ package m7s
import (
"net/http"
"time"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/task"
@@ -43,6 +44,30 @@ func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf c
"maxRetry": conf.MaxRetry,
})
pusher.SetRetry(conf.MaxRetry, conf.RetryInterval)
if sender := plugin.getHookSender(config.HookOnPushStart); sender != nil {
pusher.OnStart(func() {
webhookData := map[string]interface{}{
"event": config.HookOnPullStart,
"streamPath": streamPath,
"url": conf.URL,
"pluginName": plugin.Meta.Name,
"timestamp": time.Now().Unix(),
}
sender(config.HookOnPullStart, webhookData)
})
}
if sender := plugin.getHookSender(config.HookOnPushEnd); sender != nil {
pusher.OnDispose(func() {
webhookData := map[string]interface{}{
"event": config.HookOnPullEnd,
"streamPath": streamPath,
"reason": pusher.StopReason().Error(),
"timestamp": time.Now().Unix(),
}
sender(config.HookOnPullEnd, webhookData)
})
}
plugin.Server.Pushs.Add(p, plugin.Logger.With("pushURL", conf.URL, "streamPath", streamPath))
return p
}

View File

@@ -103,6 +103,32 @@ func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string,
"fragment": conf.Fragment,
})
recorder.SetRetry(-1, time.Second)
if sender := plugin.getHookSender(config.HookOnRecordStart); sender != nil {
recorder.OnStart(func() {
webhookData := map[string]interface{}{
"event": config.HookOnRecordStart,
"streamPath": streamPath,
"filePath": conf.FilePath,
"pluginName": plugin.Meta.Name,
"timestamp": time.Now().Unix(),
}
sender(config.HookOnRecordStart, webhookData)
})
}
if sender := plugin.getHookSender(config.HookOnRecordEnd); sender != nil {
recorder.OnDispose(func() {
webhookData := map[string]interface{}{
"event": config.HookOnRecordEnd,
"streamPath": streamPath,
"filePath": conf.FilePath,
"reason": recorder.StopReason().Error(),
"timestamp": time.Now().Unix(),
}
sender(config.HookOnRecordEnd, webhookData)
})
}
plugin.Server.Records.Add(p, plugin.Logger.With("filePath", conf.FilePath, "streamPath", streamPath))
return p
}

View File

@@ -105,6 +105,28 @@ func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, pub *Publi
"conf": conf,
})
transformer.SetRetry(-1, time.Second*2)
if sender := plugin.getHookSender(config.HookOnTransformStart); sender != nil {
transformer.OnStart(func() {
webhookData := map[string]interface{}{
"event": config.HookOnTransformStart,
"streamPath": pub.StreamPath,
"pluginName": plugin.Meta.Name,
"timestamp": time.Now().Unix(),
}
sender(config.HookOnTransformStart, webhookData)
})
}
if sender := plugin.getHookSender(config.HookOnTransformEnd); sender != nil {
transformer.OnDispose(func() {
webhookData := map[string]interface{}{
"event": config.HookOnTransformEnd,
"streamPath": pub.StreamPath,
"reason": transformer.StopReason().Error(),
"timestamp": time.Now().Unix(),
}
sender(config.HookOnTransformEnd, webhookData)
})
}
plugin.Server.Transforms.AddTask(p, plugin.Logger.With("streamPath", pub.StreamPath))
return p
}