diff --git a/api.go b/api.go index c0dac8b..3be5454 100644 --- a/api.go +++ b/api.go @@ -150,9 +150,9 @@ func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResp var fillData func(m task.ITask) *pb.TaskTreeResponse fillData = func(m task.ITask) (res *pb.TaskTreeResponse) { res = &pb.TaskTreeResponse{Id: m.GetTaskID(), State: uint32(m.GetState()), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: maps.Collect(func(yield func(key, value string) bool) { - for k, v := range m.GetTask().Description { - yield(k, fmt.Sprintf("%+v", v)) - } + m.GetTask().Description.Range(func(key, value any) bool { + return yield(key.(string), fmt.Sprintf("%+v", value)) + }) })} if job, ok := m.(task.IJob); ok { if blockedTask := job.Blocked(); blockedTask != nil { @@ -172,7 +172,12 @@ func (s *Server) GetSubscribers(context.Context, *pb.SubscribersRequest) (res *p s.Streams.Call(func() error { var subscribers []*pb.SubscriberSnapShot for subscriber := range s.Subscribers.Range { - meta, _ := json.Marshal(subscriber.Description) + metaData := make(task.Description) + subscriber.Description.Range(func(key, value any) bool { + metaData[key.(string)] = value + return true + }) + meta, _ := json.Marshal(metaData) snap := &pb.SubscriberSnapShot{ Id: subscriber.ID, StartTime: timestamppb.New(subscriber.StartTime), diff --git a/example/custom/config.yaml b/example/custom/config.yaml index f85bbed..ec2d4f5 100644 --- a/example/custom/config.yaml +++ b/example/custom/config.yaml @@ -13,7 +13,7 @@ transcode: ^live.+: output: - target: rtmp://localhost/trans/$0/small - conf: -loglevel debug -c:a aac -c:v h264 -vf scale=320:240 + conf: -loglevel debug -c:a aac -c:v h264_videotoolbox -vf scale=320:240 #mp4: # onpub: diff --git a/example/default/readflv.yaml b/example/default/readflv.yaml index b8a1b72..529c18f 100644 --- a/example/default/readflv.yaml +++ b/example/default/readflv.yaml @@ -1,8 +1,10 @@ global: - # loglevel: trace + loglevel: debug disableall: true console: enable: true +debug: + enable: true rtsp: enable: true tcp: :8554 diff --git a/pkg/config/tcp.go b/pkg/config/tcp.go index 68854a2..3ce028e 100644 --- a/pkg/config/tcp.go +++ b/pkg/config/tcp.go @@ -46,18 +46,14 @@ type TCP struct { func (config *TCP) CreateTCPWork(logger *slog.Logger, handler TCPHandler) *ListenTCPWork { ret := &ListenTCPWork{TCP: config, handler: handler} - ret.Description = task.Description{ - "listenAddr": config.ListenAddr, - } + ret.SetDescription("listenAddr", config.ListenAddr) ret.Logger = logger.With("addr", config.ListenAddr) return ret } func (config *TCP) CreateTCPTLSWork(logger *slog.Logger, handler TCPHandler) *ListenTCPTLSWork { ret := &ListenTCPTLSWork{ListenTCPWork{TCP: config, handler: handler}} - ret.Description = task.Description{ - "listenAddr": config.ListenAddrTLS, - } + ret.SetDescription("listenAddr", config.ListenAddrTLS) ret.Logger = logger.With("addr", config.ListenAddrTLS) return ret } diff --git a/pkg/task/call.go b/pkg/task/call.go index 214628b..f7807f5 100644 --- a/pkg/task/call.go +++ b/pkg/task/call.go @@ -20,7 +20,7 @@ func (t *CallBackTask) Dispose() { } } -func CreateTaskByCallBack(start func() error, dispose func()) ITask { +func CreateTaskByCallBack(start func() error, dispose func()) *CallBackTask { var task CallBackTask task.startHandler = func() error { err := start() diff --git a/pkg/task/job.go b/pkg/task/job.go index 9b2c623..18a7234 100644 --- a/pkg/task/job.go +++ b/pkg/task/job.go @@ -22,9 +22,11 @@ func GetNextTaskID() uint32 { // Job include tasks type Job struct { Task + cases []reflect.SelectCase addSub chan ITask children []ITask lazyRun sync.Once + eventLoopLock sync.Mutex childrenDisposed chan struct{} childDisposeListeners []func(ITask) blocked ITask @@ -43,7 +45,10 @@ func (mt *Job) Blocked() ITask { } func (mt *Job) waitChildrenDispose() { - close(mt.addSub) + if mt.blocked != nil { + mt.blocked.Stop(mt.StopReason()) + } + mt.addSub <- nil <-mt.childrenDisposed } @@ -62,24 +67,24 @@ func (mt *Job) onDescendantsDispose(descendants ITask) { func (mt *Job) onChildDispose(child ITask) { if child.getParent() == mt { - mt.onDescendantsDispose(child) + if child.GetTaskType() != TASK_TYPE_CALL || child.GetOwnerType() != "CallBack" { + mt.onDescendantsDispose(child) + } child.dispose() } } -func (mt *Job) dispose() { - if mt.childrenDisposed != nil { - mt.OnBeforeDispose(mt.waitChildrenDispose) - } - mt.Task.dispose() -} - func (mt *Job) RangeSubTask(callback func(task ITask) bool) { for _, task := range mt.children { callback(task) } } +func (mt *Job) AddDependTask(t ITask, opt ...any) (task *Task) { + mt.Depend(t) + return mt.AddTask(t, opt...) +} + func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) { if task = t.GetTask(); t != task.handler { // first add for _, o := range opt { @@ -87,7 +92,9 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) { case context.Context: task.parentCtx = v case Description: - task.Description = v + for k, v := range v { + task.Description.Store(k, v) + } case RetryConfig: task.retry = v case *slog.Logger: @@ -106,12 +113,15 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) { } mt.lazyRun.Do(func() { - if mt.parent != nil && mt.Context == nil { - mt.parent.AddTask(mt.handler) // second add, lazy start + if mt.eventLoopLock.TryLock() { + defer mt.eventLoopLock.Unlock() + if mt.parent != nil && mt.Context == nil { + mt.parent.AddTask(mt.handler) // second add, lazy start + } + mt.childrenDisposed = make(chan struct{}) + mt.addSub = make(chan ITask, 20) + go mt.run() } - mt.childrenDisposed = make(chan struct{}) - mt.addSub = make(chan ITask, 10) - go mt.run() }) if task.Context == nil { if task.parentCtx == nil { @@ -133,7 +143,11 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) { task.startup.Reject(mt.StopReason()) return } - + if len(mt.addSub) > 10 { + if mt.Logger != nil { + mt.Warn("task wait list too many", "count", len(mt.addSub)) + } + } mt.addSub <- t return } @@ -144,17 +158,14 @@ func (mt *Job) Call(callback func() error, args ...any) { func (mt *Job) Post(callback func() error, args ...any) *Task { task := CreateTaskByCallBack(callback, nil) - description := make(Description) if len(args) > 0 { - description[OwnerTypeKey] = args[0] - } else { - description = nil + task.SetDescription(OwnerTypeKey, args[0]) } - return mt.AddTask(task, description) + return mt.AddTask(task) } func (mt *Job) run() { - cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}} + mt.cases = []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}} defer func() { if !ThrowPanic { err := recover() @@ -175,33 +186,33 @@ func (mt *Job) run() { }() for { mt.blocked = nil - if chosen, rev, ok := reflect.Select(cases); chosen == 0 { - if !ok { + if chosen, rev, ok := reflect.Select(mt.cases); chosen == 0 { + if rev.IsNil() { return } if mt.blocked = rev.Interface().(ITask); mt.blocked.getParent() != mt || mt.blocked.start() { mt.children = append(mt.children, mt.blocked) - cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.blocked.GetSignal())}) + mt.cases = append(mt.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.blocked.GetSignal())}) } } else { taskIndex := chosen - 1 - child := mt.children[taskIndex] - switch tt := child.(type) { + mt.blocked = mt.children[taskIndex] + switch tt := mt.blocked.(type) { case IChannelTask: tt.Tick(rev.Interface()) if tt.IsStopped() { - mt.onChildDispose(child) + mt.onChildDispose(mt.blocked) } } if !ok { - if mt.onChildDispose(child); child.checkRetry(child.StopReason()) { - if child.reset(); child.start() { - cases[chosen].Chan = reflect.ValueOf(child.GetSignal()) + if mt.onChildDispose(mt.blocked); mt.blocked.checkRetry(mt.blocked.StopReason()) { + if mt.blocked.reset(); mt.blocked.start() { + mt.cases[chosen].Chan = reflect.ValueOf(mt.blocked.GetSignal()) continue } } mt.children = slices.Delete(mt.children, taskIndex, taskIndex+1) - cases = slices.Delete(cases, chosen, chosen+1) + mt.cases = slices.Delete(mt.cases, chosen, chosen+1) } } if !mt.handler.keepalive() && len(mt.children) == 0 { diff --git a/pkg/task/task.go b/pkg/task/task.go index 2dedc93..3d5b61c 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -8,6 +8,7 @@ import ( "reflect" "runtime/debug" "strings" + "sync" "time" "m7s.live/m7s/v5/pkg/util" @@ -63,6 +64,7 @@ type ( GetTaskType() TaskType GetOwnerType() string SetRetry(maxRetry int, retryInterval time.Duration) + Depend(ITask) OnStart(func()) OnBeforeDispose(func()) OnDispose(func()) @@ -111,12 +113,12 @@ type ( handler ITask retry RetryConfig afterStartListeners, beforeDisposeListeners, afterDisposeListeners []func() - Description - startup, shutdown *util.Promise - parent *Job - parentCtx context.Context - state TaskState - level byte + Description sync.Map + startup, shutdown *util.Promise + parent *Job + parentCtx context.Context + state TaskState + level byte } ) @@ -144,10 +146,8 @@ func (task *Task) GetTaskID() uint32 { return task.ID } func (task *Task) GetOwnerType() string { - if task.Description != nil { - if ownerType, ok := task.Description[OwnerTypeKey]; ok { - return ownerType.(string) - } + if ownerType, ok := task.Description.Load(OwnerTypeKey); ok { + return ownerType.(string) } return strings.TrimSuffix(reflect.TypeOf(task.handler).Elem().Name(), "Task") } @@ -205,13 +205,19 @@ func (task *Task) Stop(err error) { panic("task stop with nil error") } if task.CancelCauseFunc != nil { - if task.Logger != nil { - task.Debug("task stop", "reason", err, "elapsed", time.Since(task.StartTime), "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) + if tt := task.handler.GetTaskType(); task.Logger != nil && tt != TASK_TYPE_CALL { + task.Debug("task stop", "reason", err, "elapsed", time.Since(task.StartTime), "taskId", task.ID, "taskType", tt, "ownerType", task.GetOwnerType()) } task.CancelCauseFunc(err) } } +func (task *Task) Depend(t ITask) { + t.OnDispose(func() { + task.Stop(t.StopReason()) + }) +} + func (task *Task) OnStart(listener func()) { task.afterStartListeners = append(task.afterStartListeners, listener) } @@ -232,8 +238,12 @@ func (task *Task) checkRetry(err error) bool { if errors.Is(err, ErrTaskComplete) || errors.Is(err, ErrExit) { return false } + if task.parent.IsStopped() { + return false + } if task.retry.MaxRetry < 0 || task.retry.RetryCount < task.retry.MaxRetry { task.retry.RetryCount++ + task.SetDescription("retryCount", task.retry.RetryCount) if task.Logger != nil { if task.retry.MaxRetry < 0 { task.Warn(fmt.Sprintf("retry %d/∞", task.retry.RetryCount)) @@ -270,8 +280,8 @@ func (task *Task) start() bool { } for { task.StartTime = time.Now() - if task.Logger != nil { - task.Debug("task start", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) + if tt := task.handler.GetTaskType(); task.Logger != nil && tt != TASK_TYPE_CALL { + task.Debug("task start", "taskId", task.ID, "taskType", tt, "ownerType", task.GetOwnerType()) } task.state = TASK_STATE_STARTING if v, ok := task.handler.(TaskStarter); ok { @@ -320,6 +330,16 @@ func (task *Task) reset() { task.startup = util.NewPromise(task.Context) } +func (task *Task) SetDescription(key string, value any) { + task.Description.Store(key, value) +} + +func (task *Task) SetDescriptions(value Description) { + for k, v := range value { + task.Description.Store(k, v) + } +} + func (task *Task) dispose() { if task.state < TASK_STATE_STARTED { return @@ -327,21 +347,39 @@ func (task *Task) dispose() { reason := task.StopReason() task.state = TASK_STATE_DISPOSING if task.Logger != nil { - taskType, ownerType := task.GetTaskType(), task.GetOwnerType() - yargs := []any{"reason", reason, "taskId", task.ID, "taskType", taskType, "ownerType", ownerType} - task.Debug("task dispose", yargs...) - defer task.Debug("task disposed", yargs...) + taskType, ownerType := task.handler.GetTaskType(), task.GetOwnerType() + if taskType != TASK_TYPE_CALL { + yargs := []any{"reason", reason, "taskId", task.ID, "taskType", taskType, "ownerType", ownerType} + task.Debug("task dispose", yargs...) + defer task.Debug("task disposed", yargs...) + } } - for _, listener := range task.beforeDisposeListeners { + befores := len(task.beforeDisposeListeners) + for i, listener := range task.beforeDisposeListeners { + task.SetDescription("disposeProcess", fmt.Sprintf("b:%d/%d", i, befores)) listener() } + if job, ok := task.handler.(IJob); ok { + mt := job.getJob() + task.SetDescription("disposeProcess", "wait children") + mt.eventLoopLock.Lock() + if mt.addSub != nil { + mt.waitChildrenDispose() + mt.lazyRun = sync.Once{} + } + mt.eventLoopLock.Unlock() + } + task.SetDescription("disposeProcess", "self") if v, ok := task.handler.(TaskDisposal); ok { v.Dispose() } task.shutdown.Fulfill(reason) - for _, listener := range task.afterDisposeListeners { + afters := len(task.afterDisposeListeners) + for i, listener := range task.afterDisposeListeners { + task.SetDescription("disposeProcess", fmt.Sprintf("a:%d/%d", i, afters)) listener() } + task.SetDescription("disposeProcess", "done") task.state = TASK_STATE_DISPOSED } diff --git a/pkg/util/mem.go b/pkg/util/mem.go index 7208679..38451a5 100644 --- a/pkg/util/mem.go +++ b/pkg/util/mem.go @@ -145,6 +145,7 @@ func (sma *ScalableMemoryAllocator) Recycle() { for _, child := range sma.children { child.Recycle() } + sma.children = nil } // Borrow = Malloc + Free = Find, must use the memory at once diff --git a/plugin.go b/plugin.go index d081ab3..9d68d58 100644 --- a/plugin.go +++ b/plugin.go @@ -122,9 +122,9 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin) } } p.Config.ParseUserFile(userConfig) - p.Description = map[string]any{"version": plugin.Version} + p.SetDescription("version", plugin.Version) if userConfig != nil { - p.Description["userConfig"] = userConfig + p.SetDescription("userConfig", userConfig) } finalConfig, _ := yaml.Marshal(p.Config.GetMap()) p.Logger.Handler().(*MultiLogHandler).SetLevel(ParseLevel(p.config.LogLevel)) @@ -288,12 +288,10 @@ func (p *Plugin) Start() (err error) { } } s.Plugins.Add(p) - err = p.listen() - if err != nil { + if err = p.listen(); err != nil { return } - err = p.handler.OnInit() - if err != nil { + if err = p.handler.OnInit(); err != nil { return } return @@ -304,36 +302,26 @@ func (p *Plugin) Dispose() { p.Server.Plugins.Remove(p) } -func (p *Plugin) stopOnError(t task.ITask) { - p.AddTask(t).OnDispose(func() { - p.Stop(t.StopReason()) - }) -} - func (p *Plugin) listen() (err error) { httpConf := &p.config.HTTP if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.Server.config.HTTP.ListenAddrTLS) { - p.stopOnError(httpConf.CreateHTTPSWork(p.Logger)) + p.AddDependTask(httpConf.CreateHTTPSWork(p.Logger)) } if httpConf.ListenAddr != "" && (httpConf.ListenAddr != p.Server.config.HTTP.ListenAddr) { - p.stopOnError(httpConf.CreateHTTPWork(p.Logger)) + p.AddDependTask(httpConf.CreateHTTPWork(p.Logger)) } if tcphandler, ok := p.handler.(ITCPPlugin); ok { tcpConf := &p.config.TCP if tcpConf.ListenAddr != "" && tcpConf.AutoListen { - task := tcpConf.CreateTCPWork(p.Logger, tcphandler.OnTCPConnect) - err = p.AddTask(task).WaitStarted() - if err != nil { + if err = p.AddTask(tcpConf.CreateTCPWork(p.Logger, tcphandler.OnTCPConnect)).WaitStarted(); err != nil { return } } if tcpConf.ListenAddrTLS != "" && tcpConf.AutoListen { - task := tcpConf.CreateTCPTLSWork(p.Logger, tcphandler.OnTCPConnect) - err = p.AddTask(task).WaitStarted() - if err != nil { + if err = p.AddTask(tcpConf.CreateTCPTLSWork(p.Logger, tcphandler.OnTCPConnect)).WaitStarted(); err != nil { return } } @@ -342,9 +330,7 @@ func (p *Plugin) listen() (err error) { if udpHandler, ok := p.handler.(IUDPPlugin); ok { udpConf := &p.config.UDP if udpConf.ListenAddr != "" && udpConf.AutoListen { - task := udpConf.CreateUDPWork(p.Logger, udpHandler.OnUDPConnect) - err = p.AddTask(task).WaitStarted() - if err != nil { + if err = p.AddTask(udpConf.CreateUDPWork(p.Logger, udpHandler.OnUDPConnect)).WaitStarted(); err != nil { return } } @@ -353,8 +339,7 @@ func (p *Plugin) listen() (err error) { if quicHandler, ok := p.handler.(IQUICPlugin); ok { quicConf := &p.config.Quic if quicConf.ListenAddr != "" && quicConf.AutoListen { - task := quicConf.CreateQUICWork(p.Logger, quicHandler.OnQUICConnect) - err = p.AddTask(task).WaitStarted() + err = p.AddTask(quicConf.CreateQUICWork(p.Logger, quicHandler.OnQUICConnect)).WaitStarted() } } return diff --git a/plugin/flv/pkg/pull.go b/plugin/flv/pkg/pull.go index ac57409..e0a7dc2 100644 --- a/plugin/flv/pkg/pull.go +++ b/plugin/flv/pkg/pull.go @@ -16,9 +16,7 @@ type Puller struct { func NewPuller(_ config.Pull) m7s.IPuller { p := &Puller{} - p.Description = map[string]any{ - task.OwnerTypeKey: "FlvPuller", - } + p.SetDescription(task.OwnerTypeKey, "FlvPuller") return p } diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go index 34b5603..1a4c019 100644 --- a/plugin/rtmp/index.go +++ b/plugin/rtmp/index.go @@ -3,11 +3,12 @@ package plugin_rtmp import ( "errors" "io" + "net" + "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg/task" "m7s.live/m7s/v5/plugin/rtmp/pb" . "m7s.live/m7s/v5/plugin/rtmp/pkg" - "net" ) type RTMPPlugin struct { @@ -55,7 +56,7 @@ func (task *RTMPServer) Go() (err error) { task.Debug("recv cmd", "commandName", cmd.CommandName, "streamID", msg.MessageStreamID) switch cmd := msg.MsgData.(type) { case *CallMessage: //connect - task.Description = cmd.Object + task.SetDescriptions(cmd.Object) app := cmd.Object["app"] // 客户端要连接到的服务应用名 objectEncoding := cmd.Object["objectEncoding"] // AMF编码方法 switch v := objectEncoding.(type) { @@ -147,9 +148,7 @@ func (task *RTMPServer) Go() (err error) { if err != nil { task.Error("sendMessage publish", "error", err) } else { - publisher.OnDispose(func() { - task.Stop(publisher.StopReason()) - }) + task.Depend(publisher) } case *PlayMessage: streamPath := task.AppName + "/" + cmd.StreamName diff --git a/plugin/rtmp/pkg/client.go b/plugin/rtmp/pkg/client.go index eac485a..a517947 100644 --- a/plugin/rtmp/pkg/client.go +++ b/plugin/rtmp/pkg/client.go @@ -86,9 +86,7 @@ func NewPuller(_ config.Pull) m7s.IPuller { chunkSize: 4096, } ret.NetConnection = &NetConnection{} - ret.Description = task.Description{ - task.OwnerTypeKey: "RTMPPuller", - } + ret.SetDescription(task.OwnerTypeKey, "RTMPPuller") return ret } @@ -98,9 +96,7 @@ func NewPusher() m7s.IPusher { chunkSize: 4096, } ret.NetConnection = &NetConnection{} - ret.Description = task.Description{ - task.OwnerTypeKey: "RTMPPusher", - } + ret.SetDescription(task.OwnerTypeKey, "RTMPPusher") return ret } @@ -138,7 +134,7 @@ func (c *Client) Run() (err error) { case Response_Result, Response_OnStatus: switch response := msg.MsgData.(type) { case *ResponseMessage: - c.Description = response.Properties + c.SetDescriptions(response.Properties) if response.Infomation["code"] == NetConnection_Connect_Success { err = c.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2}) if err == nil { diff --git a/plugin/rtmp/pkg/net-stream.go b/plugin/rtmp/pkg/net-stream.go index b327d72..ee05e37 100644 --- a/plugin/rtmp/pkg/net-stream.go +++ b/plugin/rtmp/pkg/net-stream.go @@ -69,5 +69,5 @@ func (ns *NetStream) BeginPlay(tid uint64) (err error) { func (ns *NetStream) Subscribe(suber *m7s.Subscriber) { audio, video := ns.CreateSender(false) - ns.AddTask(m7s.CreatePlayTask(suber, audio.HandleAudio, video.HandleVideo)) + go m7s.PlayBlock(suber, audio.HandleAudio, video.HandleVideo) } diff --git a/plugin/rtsp/pkg/client.go b/plugin/rtsp/pkg/client.go index f76eca2..938cace 100644 --- a/plugin/rtsp/pkg/client.go +++ b/plugin/rtsp/pkg/client.go @@ -5,7 +5,6 @@ import ( "m7s.live/m7s/v5/pkg/task" "m7s.live/m7s/v5" - "m7s.live/m7s/v5/pkg/util" ) const ( @@ -42,9 +41,7 @@ func NewPuller(_ config.Pull) m7s.IPuller { direction: DIRECTION_PULL, } client.NetConnection = &NetConnection{} - client.Description = map[string]any{ - task.OwnerTypeKey: "RTSPPuller", - } + client.SetDescription(task.OwnerTypeKey, "RTSPPuller") return client } @@ -53,14 +50,11 @@ func NewPusher() m7s.IPusher { direction: DIRECTION_PUSH, } client.NetConnection = &NetConnection{} - client.Description = map[string]any{ - task.OwnerTypeKey: "RTSPPusher", - } + client.SetDescription(task.OwnerTypeKey, "RTSPPusher") return client } func (c *Client) Run() (err error) { - c.MemoryAllocator = util.NewScalableMemoryAllocator(1 << 12) if err = c.Options(); err != nil { return } diff --git a/plugin/rtsp/pkg/connection.go b/plugin/rtsp/pkg/connection.go index cdbe1cb..92d2ccf 100644 --- a/plugin/rtsp/pkg/connection.go +++ b/plugin/rtsp/pkg/connection.go @@ -139,9 +139,8 @@ func (c *NetConnection) Connect(remoteURL string) (err error) { c.URL = rtspURL c.UserAgent = "monibuca" + m7s.Version c.auth = util.NewAuth(c.URL.User) - if c.Description != nil { - c.Description["remoteAddr"] = conn.RemoteAddr().String() - } + c.SetDescription("remoteAddr", conn.RemoteAddr().String()) + c.MemoryAllocator = util.NewScalableMemoryAllocator(1 << 12) // c.Backchannel = true return } @@ -188,6 +187,7 @@ func (c *NetConnection) ReadRequest() (req *util.Request, err error) { if err != nil { return } + c.SetDescription("lastReq", req.Method) c.Debug("<-", "req", req.String()) return } @@ -229,6 +229,7 @@ func (c *NetConnection) WriteResponse(res *util.Response) (err error) { return err } resStr := res.String() + c.SetDescription("lastRes", res.Request.Method) c.Debug("->", "res", resStr) _, err = c.conn.Write([]byte(resStr)) return diff --git a/plugin/rtsp/pkg/net-stream.go b/plugin/rtsp/pkg/net-stream.go index 51946ca..28c33b2 100644 --- a/plugin/rtsp/pkg/net-stream.go +++ b/plugin/rtsp/pkg/net-stream.go @@ -235,7 +235,7 @@ func (c *Stream) Teardown() (err error) { return c.WriteRequest(&util.Request{Method: MethodTeardown, URL: c.URL}) } -func (ns *Stream) Dispose() { - _ = ns.Teardown() - ns.NetConnection.Dispose() -} +//func (ns *Stream) Dispose() { +// //_ = ns.Teardown() +// ns.NetConnection.Dispose() +//} diff --git a/plugin/rtsp/pkg/transceiver.go b/plugin/rtsp/pkg/transceiver.go index 0fdd8f9..350ea02 100644 --- a/plugin/rtsp/pkg/transceiver.go +++ b/plugin/rtsp/pkg/transceiver.go @@ -88,11 +88,11 @@ func (s *Sender) sendRTP(pack *mrtp.RTPData, channel int) (err error) { } func (s *Sender) Send() (err error) { - s.Stream.AddTask(m7s.CreatePlayTask(s.Subscriber, func(audio *mrtp.Audio) error { + go m7s.PlayBlock(s.Subscriber, func(audio *mrtp.Audio) error { return s.sendRTP(&audio.RTPData, s.AudioChannelID) }, func(video *mrtp.Video) error { return s.sendRTP(&video.RTPData, s.VideoChannelID) - })) + }) for err == nil { _, _, err = s.NetConnection.Receive(true) } diff --git a/plugin/rtsp/server.go b/plugin/rtsp/server.go index ab27b1d..bc5793c 100644 --- a/plugin/rtsp/server.go +++ b/plugin/rtsp/server.go @@ -88,9 +88,7 @@ func (task *RTSPServer) Go() (err error) { if err = task.WriteResponse(res); err != nil { return } - receiver.Publisher.OnDispose(func() { - task.Stop(receiver.Publisher.StopReason()) - }) + task.Depend(receiver.Publisher) case MethodDescribe: sendMode = true sender = &Sender{} diff --git a/plugin/srt/index.go b/plugin/srt/index.go index 87956c2..9be3bc2 100644 --- a/plugin/srt/index.go +++ b/plugin/srt/index.go @@ -73,6 +73,6 @@ func (t *SRTServer) OnStop() { t.server.Shutdown() } -func (t *SRTServer) Run() error { +func (t *SRTServer) Go() error { return t.server.ListenAndServe() } diff --git a/plugin/transcode/pkg/cmd.go b/plugin/transcode/pkg/cmd.go index 1a41ee0..6240dc6 100644 --- a/plugin/transcode/pkg/cmd.go +++ b/plugin/transcode/pkg/cmd.go @@ -14,11 +14,9 @@ type CommandTask struct { } func (ct *CommandTask) Start() (err error) { - ct.Description = task.Description{ - "cmd": ct.Cmd.String(), - } + ct.SetDescription("cmd", ct.Cmd.String()) if ct.logFileName != "" { - ct.Description["log"] = ct.logFileName + ct.SetDescription("log", ct.logFileName) ct.logFile, err = os.OpenFile(ct.logFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { ct.Error("Could not create transcode log", "err", err) @@ -35,12 +33,13 @@ func (ct *CommandTask) Start() (err error) { } ct.Info("start exec", "cmd", ct.Cmd.String()) err = ct.Cmd.Start() - ct.Description["pid"] = ct.Cmd.Process.Pid + ct.SetDescription("pid", ct.Cmd.Process.Pid) return } func (ct *CommandTask) Dispose() { - _ = ct.Cmd.Process.Kill() + err := ct.Cmd.Process.Kill() + ct.Info("kill", "err", err) if ct.logFile != nil { _ = ct.logFile.Close() } diff --git a/plugin/transcode/pkg/transform.go b/plugin/transcode/pkg/transform.go index f8a5bf9..c1f65f6 100644 --- a/plugin/transcode/pkg/transform.go +++ b/plugin/transcode/pkg/transform.go @@ -1,6 +1,7 @@ package transcode import ( + "bufio" "fmt" "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg/config" @@ -46,9 +47,8 @@ type ( func NewTransform() m7s.ITransformer { ret := &Transformer{} - ret.Description = map[string]any{ - task.OwnerTypeKey: "Transcode", - } + ret.SetDescription(task.OwnerTypeKey, "Transcode") + var bufferFull time.Time ret.WriteFlvTag = func(flv net.Buffers) (err error) { var buffer []byte for _, b := range flv { @@ -56,8 +56,12 @@ func NewTransform() m7s.ITransformer { } select { case ret.rBuf <- buffer: + bufferFull = time.Now() default: ret.Warn("pipe input buffer full") + if time.Since(bufferFull) > time.Second*5 { + ret.Stop(bufio.ErrBufferFull) + } } return } @@ -68,7 +72,6 @@ type Transformer struct { m7s.DefaultTransformer TransRule rBuf chan []byte - *util.BufReader flv.Live } @@ -127,23 +130,19 @@ func (t *Transformer) Start() (err error) { args = append(args, to.Target) } } - t.Description = task.Description{ - "cmd": args, - "config": t.TransRule, - } + t.SetDescription("cmd", args) + t.SetDescription("config", t.TransRule) t.rBuf = make(chan []byte, 100) - t.BufReader = util.NewBufReaderChan(t.rBuf) t.Subscriber = t.TransformJob.Subscriber //t.BufReader.Dump, err = os.OpenFile("dump.flv", os.O_CREATE|os.O_WRONLY, 0644) var cmdTask CommandTask cmdTask.logFileName = fmt.Sprintf("logs/transcode_%s_%s.log", strings.ReplaceAll(t.TransformJob.StreamPath, "/", "_"), time.Now().Format("20060102150405")) cmdTask.Cmd = exec.CommandContext(t, "ffmpeg", args...) - cmdTask.Cmd.Stdin = t.BufReader + cmdTask.Cmd.Stdin = util.NewBufReaderChan(t.rBuf) t.AddTask(&cmdTask) return } func (t *Transformer) Dispose() { close(t.rBuf) - t.BufReader.Recycle() } diff --git a/plugin/webrtc/api.go b/plugin/webrtc/api.go index 5b67411..cc052aa 100644 --- a/plugin/webrtc/api.go +++ b/plugin/webrtc/api.go @@ -186,7 +186,7 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) { if videoSender == nil { suber.SubVideo = false } - conn.AddTask(m7s.CreatePlayTask(suber, func(frame *mrtp.Audio) (err error) { + go m7s.PlayBlock(suber, func(frame *mrtp.Audio) (err error) { for _, p := range frame.Packets { if err = audioTLSRTP.WriteRTP(p); err != nil { return @@ -200,7 +200,7 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) { } } return nil - })) + }) } conn.OnICECandidate(func(ice *ICECandidate) { if ice != nil { diff --git a/publisher.go b/publisher.go index 00ad8b1..33febee 100644 --- a/publisher.go +++ b/publisher.go @@ -2,6 +2,7 @@ package m7s import ( "context" + "fmt" "math" "os" "path/filepath" @@ -110,6 +111,7 @@ func (t *AVTracks) Dispose() { t.Lock() defer t.Unlock() for track := range t.Range { + track.Ready(ErrDiscard) if track == t.AVTrack || track.RingWriter != t.AVTrack.RingWriter { track.Dispose() } @@ -188,7 +190,7 @@ func (p *Publisher) Start() (err error) { for plugin := range s.Plugins.Range { plugin.OnPublish(p) } - s.Transforms.PublishEvent <- p + //s.Transforms.PublishEvent <- p p.AddTask(&PublishTimeout{Publisher: p}) if p.PublishTimeout > 0 { p.AddTask(&PublishNoDataTimeout{Publisher: p}) @@ -572,8 +574,14 @@ func (p *Publisher) takeOver(old *Publisher) { } old.Stop(ErrKick) p.Info("takeOver", "old", old.ID) - for subscriber := range old.SubscriberRange { - p.AddSubscriber(subscriber) + if old.Subscribers.Length > 0 { + p.Info(fmt.Sprintf("subscriber +%d", old.Subscribers.Length)) + for subscriber := range old.SubscriberRange { + subscriber.Publisher = p + if subscriber.BufferTime > p.BufferTime { + p.BufferTime = subscriber.BufferTime + } + } } old.AudioTrack.Dispose() old.VideoTrack.Dispose() diff --git a/puller.go b/puller.go index 40c1eef..3511604 100644 --- a/puller.go +++ b/puller.go @@ -102,13 +102,13 @@ func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf c } p.Connection.Init(plugin, streamPath, remoteURL, conf.Proxy, conf.Header) p.puller = puller - p.Description = map[string]any{ + p.SetDescriptions(task.Description{ "plugin": plugin.Meta.Name, "streamPath": streamPath, "url": conf.URL, "args": conf.Args, "maxRetry": conf.MaxRetry, - } + }) puller.SetRetry(conf.MaxRetry, conf.RetryInterval) plugin.Server.Pulls.Add(p, plugin.Logger.With("pullURL", conf.URL, "streamPath", streamPath)) return p diff --git a/pusher.go b/pusher.go index d80fce3..74e07c2 100644 --- a/pusher.go +++ b/pusher.go @@ -27,12 +27,12 @@ func (p *PushJob) GetKey() string { func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf config.Push) *PushJob { p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy, conf.Header) p.pusher = pusher - p.Description = map[string]any{ + p.SetDescriptions(task.Description{ "plugin": plugin.Meta.Name, "streamPath": streamPath, "url": conf.URL, "maxRetry": conf.MaxRetry, - } + }) pusher.SetRetry(conf.MaxRetry, conf.RetryInterval) plugin.Server.Pushs.Add(p, plugin.Logger.With("pushURL", conf.URL, "streamPath", streamPath)) return p diff --git a/recoder.go b/recoder.go index b542483..94923c8 100644 --- a/recoder.go +++ b/recoder.go @@ -64,13 +64,13 @@ func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, p.FilePath = conf.FilePath p.StreamPath = streamPath p.recorder = recorder - p.Description = map[string]any{ + p.SetDescriptions(task.Description{ "plugin": plugin.Meta.Name, "streamPath": streamPath, "filePath": conf.FilePath, "append": conf.Append, "fragment": conf.Fragment, - } + }) plugin.Server.Records.Add(p, plugin.Logger.With("filePath", conf.FilePath, "streamPath", streamPath)) return p } @@ -84,7 +84,7 @@ func (p *RecordJob) Start() (err error) { if p.Fragment == 0 || p.Append { dir = filepath.Dir(p.FilePath) } - p.Description["filePath"] = p.FilePath + p.SetDescription("filePath", p.FilePath) if err = os.MkdirAll(dir, 0755); err != nil { return } diff --git a/server.go b/server.go index 80ad96e..4bfdce3 100644 --- a/server.go +++ b/server.go @@ -116,14 +116,14 @@ func NewServer(conf any) (s *Server) { } s.ID = task.GetNextTaskID() s.Meta = &serverMeta - s.Description = map[string]any{ + s.SetDescriptions(task.Description{ "version": Version, "goVersion": sysruntime.Version(), "os": sysruntime.GOOS, "arch": sysruntime.GOARCH, "cpus": int32(sysruntime.NumCPU()), - } - s.Transforms.PublishEvent = make(chan *Publisher, 10) + }) + //s.Transforms.PublishEvent = make(chan *Publisher, 10) s.prometheusDesc.init() return } @@ -230,10 +230,10 @@ func (s *Server) Start() (err error) { } } if httpConf.ListenAddrTLS != "" { - s.stopOnError(httpConf.CreateHTTPSWork(s.Logger)) + s.AddDependTask(httpConf.CreateHTTPSWork(s.Logger)) } if httpConf.ListenAddr != "" { - s.stopOnError(httpConf.CreateHTTPWork(s.Logger)) + s.AddDependTask(httpConf.CreateHTTPWork(s.Logger)) } var grpcServer *GRPCServer if tcpConf.ListenAddr != "" { diff --git a/subscriber.go b/subscriber.go index ea4180c..f689150 100644 --- a/subscriber.go +++ b/subscriber.go @@ -36,11 +36,11 @@ func (ps *PubSubBase) Init(streamPath string, conf any) { if u, err := url.Parse(streamPath); err == nil { ps.StreamPath, ps.Args = u.Path, u.Query() } - ps.Description = map[string]any{ + ps.SetDescriptions(task.Description{ "streamPath": ps.StreamPath, "args": ps.Args, "plugin": ps.Plugin.Meta.Name, - } + }) // args to config if len(ps.Args) != 0 { ignores, cc := make(map[string]struct{}), make(map[string]any) @@ -88,7 +88,7 @@ func (s *Subscriber) Start() (err error) { s.Info("subscribe") if publisher, ok := server.Streams.Get(s.StreamPath); ok { publisher.AddSubscriber(s) - return publisher.WaitTrack() + return } else { for reg, streamPath := range server.StreamAlias { if g := reg.FindStringSubmatch(s.StreamPath); len(g) > 0 { @@ -96,9 +96,9 @@ func (s *Subscriber) Start() (err error) { streamPath = strings.ReplaceAll(streamPath, fmt.Sprintf("$%d", i), gg) } if publisher, ok = server.Streams.Get(streamPath); ok { - s.Description["alias"] = streamPath + s.SetDescription("alias", streamPath) publisher.AddSubscriber(s) - return publisher.WaitTrack() + return } } } @@ -223,20 +223,20 @@ func (s *Subscriber) createVideoReader(dataType reflect.Type, startVideoTs time. } type SubscribeHandler[A any, V any] struct { - task.Task + //task.Task s *Subscriber OnAudio func(A) error OnVideo func(V) error ProcessAudio, ProcessVideo chan func(*AVFrame) } -func CreatePlayTask[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) task.ITask { - return &SubscribeHandler[A, V]{ - s: s, - OnAudio: onAudio, - OnVideo: onVideo, - } -} +//func Play[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) { +// s.AddTask(&SubscribeHandler[A, V]{ +// s: s, +// OnAudio: onAudio, +// OnVideo: onVideo, +// }) +//} func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) (err error) { handler := &SubscribeHandler[A, V]{ @@ -244,12 +244,13 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( OnAudio: onAudio, OnVideo: onVideo, } - err = handler.Start() + err = handler.Run() s.Stop(err) return } -func (handler *SubscribeHandler[A, V]) Start() (err error) { +func (handler *SubscribeHandler[A, V]) Run() (err error) { + handler.s.SetDescription("play", time.Now()) var a1, v1 reflect.Type s := handler.s startAudioTs, startVideoTs := s.StartAudioTS, s.StartVideoTS @@ -271,6 +272,7 @@ func (handler *SubscribeHandler[A, V]) Start() (err error) { if s.VideoReader != nil { s.VideoReader.StopRead() } + handler.s.SetDescription("stopPlay", time.Now()) }() sendAudioFrame := func() (err error) { if awi >= 0 { diff --git a/transformer.go b/transformer.go index d69aebd..6bf4ac4 100644 --- a/transformer.go +++ b/transformer.go @@ -3,6 +3,7 @@ package m7s import ( "context" "slices" + "time" "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/config" @@ -37,7 +38,7 @@ type ( Transforms struct { task.Work util.Collection[string, *TransformedMap] - PublishEvent chan *Publisher + //PublishEvent chan *Publisher } TransformsPublishEvent struct { task.ChannelTask @@ -45,18 +46,18 @@ type ( } ) -func (t *TransformsPublishEvent) GetSignal() any { - return t.Transforms.PublishEvent -} - -func (t *TransformsPublishEvent) Tick(pub any) { - incomingPublisher := pub.(*Publisher) - for job := range t.Transforms.Search(func(m *TransformedMap) bool { - return m.StreamPath == incomingPublisher.StreamPath - }) { - job.TransformJob.TransformPublished(incomingPublisher) - } -} +//func (t *TransformsPublishEvent) GetSignal() any { +// return t.Transforms.PublishEvent +//} +// +//func (t *TransformsPublishEvent) Tick(pub any) { +// incomingPublisher := pub.(*Publisher) +// for job := range t.Transforms.Search(func(m *TransformedMap) bool { +// return m.StreamPath == incomingPublisher.StreamPath +// }) { +// job.TransformJob.TransformPublished(incomingPublisher) +// } +//} func (t *TransformedMap) GetKey() string { return t.Target @@ -68,6 +69,9 @@ func (r *DefaultTransformer) GetTransformJob() *TransformJob { func (p *TransformJob) Subscribe() (err error) { p.Subscriber, err = p.Plugin.Subscribe(p.Transformer, p.StreamPath) + if err == nil { + p.Transformer.Depend(p.Subscriber) + } return } @@ -81,11 +85,11 @@ func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, streamPath p.Config = conf p.StreamPath = streamPath p.Transformer = transformer - p.Description = map[string]any{ + p.SetDescriptions(task.Description{ "streamPath": streamPath, "conf": conf, - } - + }) + transformer.SetRetry(-1, time.Second*2) plugin.Server.Transforms.AddTask(p, plugin.Logger.With("streamPath", streamPath)) return p } @@ -106,19 +110,19 @@ func (p *TransformJob) Start() (err error) { }) } } + p.Info("transform +1", "count", s.Transforms.Length) p.AddTask(p.Transformer, p.Logger) return } func (p *TransformJob) TransformPublished(pub *Publisher) { - p.Publisher = pub - // pub.OnDispose(func() { - // p.Stop(pub.StopReason()) - // }) + } func (p *TransformJob) Dispose() { + transList := &p.Plugin.Server.Transforms + p.Info("transform -1", "count", transList.Length) for _, to := range p.Config.Output { - p.Plugin.Server.Transforms.RemoveByKey(to.Target) + transList.RemoveByKey(to.Target) } }