refactor: description use sync.Map

This commit is contained in:
langhuihui
2024-10-08 20:32:12 +08:00
parent 9cb27f654e
commit f2f65478ad
29 changed files with 233 additions and 197 deletions

13
api.go
View File

@@ -150,9 +150,9 @@ func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResp
var fillData func(m task.ITask) *pb.TaskTreeResponse
fillData = func(m task.ITask) (res *pb.TaskTreeResponse) {
res = &pb.TaskTreeResponse{Id: m.GetTaskID(), State: uint32(m.GetState()), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: maps.Collect(func(yield func(key, value string) bool) {
for k, v := range m.GetTask().Description {
yield(k, fmt.Sprintf("%+v", v))
}
m.GetTask().Description.Range(func(key, value any) bool {
return yield(key.(string), fmt.Sprintf("%+v", value))
})
})}
if job, ok := m.(task.IJob); ok {
if blockedTask := job.Blocked(); blockedTask != nil {
@@ -172,7 +172,12 @@ func (s *Server) GetSubscribers(context.Context, *pb.SubscribersRequest) (res *p
s.Streams.Call(func() error {
var subscribers []*pb.SubscriberSnapShot
for subscriber := range s.Subscribers.Range {
meta, _ := json.Marshal(subscriber.Description)
metaData := make(task.Description)
subscriber.Description.Range(func(key, value any) bool {
metaData[key.(string)] = value
return true
})
meta, _ := json.Marshal(metaData)
snap := &pb.SubscriberSnapShot{
Id: subscriber.ID,
StartTime: timestamppb.New(subscriber.StartTime),

View File

@@ -13,7 +13,7 @@ transcode:
^live.+:
output:
- target: rtmp://localhost/trans/$0/small
conf: -loglevel debug -c:a aac -c:v h264 -vf scale=320:240
conf: -loglevel debug -c:a aac -c:v h264_videotoolbox -vf scale=320:240
#mp4:
# onpub:

View File

@@ -1,8 +1,10 @@
global:
# loglevel: trace
loglevel: debug
disableall: true
console:
enable: true
debug:
enable: true
rtsp:
enable: true
tcp: :8554

View File

@@ -46,18 +46,14 @@ type TCP struct {
func (config *TCP) CreateTCPWork(logger *slog.Logger, handler TCPHandler) *ListenTCPWork {
ret := &ListenTCPWork{TCP: config, handler: handler}
ret.Description = task.Description{
"listenAddr": config.ListenAddr,
}
ret.SetDescription("listenAddr", config.ListenAddr)
ret.Logger = logger.With("addr", config.ListenAddr)
return ret
}
func (config *TCP) CreateTCPTLSWork(logger *slog.Logger, handler TCPHandler) *ListenTCPTLSWork {
ret := &ListenTCPTLSWork{ListenTCPWork{TCP: config, handler: handler}}
ret.Description = task.Description{
"listenAddr": config.ListenAddrTLS,
}
ret.SetDescription("listenAddr", config.ListenAddrTLS)
ret.Logger = logger.With("addr", config.ListenAddrTLS)
return ret
}

View File

@@ -20,7 +20,7 @@ func (t *CallBackTask) Dispose() {
}
}
func CreateTaskByCallBack(start func() error, dispose func()) ITask {
func CreateTaskByCallBack(start func() error, dispose func()) *CallBackTask {
var task CallBackTask
task.startHandler = func() error {
err := start()

View File

@@ -22,9 +22,11 @@ func GetNextTaskID() uint32 {
// Job include tasks
type Job struct {
Task
cases []reflect.SelectCase
addSub chan ITask
children []ITask
lazyRun sync.Once
eventLoopLock sync.Mutex
childrenDisposed chan struct{}
childDisposeListeners []func(ITask)
blocked ITask
@@ -43,7 +45,10 @@ func (mt *Job) Blocked() ITask {
}
func (mt *Job) waitChildrenDispose() {
close(mt.addSub)
if mt.blocked != nil {
mt.blocked.Stop(mt.StopReason())
}
mt.addSub <- nil
<-mt.childrenDisposed
}
@@ -62,24 +67,24 @@ func (mt *Job) onDescendantsDispose(descendants ITask) {
func (mt *Job) onChildDispose(child ITask) {
if child.getParent() == mt {
if child.GetTaskType() != TASK_TYPE_CALL || child.GetOwnerType() != "CallBack" {
mt.onDescendantsDispose(child)
}
child.dispose()
}
}
func (mt *Job) dispose() {
if mt.childrenDisposed != nil {
mt.OnBeforeDispose(mt.waitChildrenDispose)
}
mt.Task.dispose()
}
func (mt *Job) RangeSubTask(callback func(task ITask) bool) {
for _, task := range mt.children {
callback(task)
}
}
func (mt *Job) AddDependTask(t ITask, opt ...any) (task *Task) {
mt.Depend(t)
return mt.AddTask(t, opt...)
}
func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
if task = t.GetTask(); t != task.handler { // first add
for _, o := range opt {
@@ -87,7 +92,9 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
case context.Context:
task.parentCtx = v
case Description:
task.Description = v
for k, v := range v {
task.Description.Store(k, v)
}
case RetryConfig:
task.retry = v
case *slog.Logger:
@@ -106,12 +113,15 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
}
mt.lazyRun.Do(func() {
if mt.eventLoopLock.TryLock() {
defer mt.eventLoopLock.Unlock()
if mt.parent != nil && mt.Context == nil {
mt.parent.AddTask(mt.handler) // second add, lazy start
}
mt.childrenDisposed = make(chan struct{})
mt.addSub = make(chan ITask, 10)
mt.addSub = make(chan ITask, 20)
go mt.run()
}
})
if task.Context == nil {
if task.parentCtx == nil {
@@ -133,7 +143,11 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
task.startup.Reject(mt.StopReason())
return
}
if len(mt.addSub) > 10 {
if mt.Logger != nil {
mt.Warn("task wait list too many", "count", len(mt.addSub))
}
}
mt.addSub <- t
return
}
@@ -144,17 +158,14 @@ func (mt *Job) Call(callback func() error, args ...any) {
func (mt *Job) Post(callback func() error, args ...any) *Task {
task := CreateTaskByCallBack(callback, nil)
description := make(Description)
if len(args) > 0 {
description[OwnerTypeKey] = args[0]
} else {
description = nil
task.SetDescription(OwnerTypeKey, args[0])
}
return mt.AddTask(task, description)
return mt.AddTask(task)
}
func (mt *Job) run() {
cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}}
mt.cases = []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}}
defer func() {
if !ThrowPanic {
err := recover()
@@ -175,33 +186,33 @@ func (mt *Job) run() {
}()
for {
mt.blocked = nil
if chosen, rev, ok := reflect.Select(cases); chosen == 0 {
if !ok {
if chosen, rev, ok := reflect.Select(mt.cases); chosen == 0 {
if rev.IsNil() {
return
}
if mt.blocked = rev.Interface().(ITask); mt.blocked.getParent() != mt || mt.blocked.start() {
mt.children = append(mt.children, mt.blocked)
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.blocked.GetSignal())})
mt.cases = append(mt.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.blocked.GetSignal())})
}
} else {
taskIndex := chosen - 1
child := mt.children[taskIndex]
switch tt := child.(type) {
mt.blocked = mt.children[taskIndex]
switch tt := mt.blocked.(type) {
case IChannelTask:
tt.Tick(rev.Interface())
if tt.IsStopped() {
mt.onChildDispose(child)
mt.onChildDispose(mt.blocked)
}
}
if !ok {
if mt.onChildDispose(child); child.checkRetry(child.StopReason()) {
if child.reset(); child.start() {
cases[chosen].Chan = reflect.ValueOf(child.GetSignal())
if mt.onChildDispose(mt.blocked); mt.blocked.checkRetry(mt.blocked.StopReason()) {
if mt.blocked.reset(); mt.blocked.start() {
mt.cases[chosen].Chan = reflect.ValueOf(mt.blocked.GetSignal())
continue
}
}
mt.children = slices.Delete(mt.children, taskIndex, taskIndex+1)
cases = slices.Delete(cases, chosen, chosen+1)
mt.cases = slices.Delete(mt.cases, chosen, chosen+1)
}
}
if !mt.handler.keepalive() && len(mt.children) == 0 {

View File

@@ -8,6 +8,7 @@ import (
"reflect"
"runtime/debug"
"strings"
"sync"
"time"
"m7s.live/m7s/v5/pkg/util"
@@ -63,6 +64,7 @@ type (
GetTaskType() TaskType
GetOwnerType() string
SetRetry(maxRetry int, retryInterval time.Duration)
Depend(ITask)
OnStart(func())
OnBeforeDispose(func())
OnDispose(func())
@@ -111,7 +113,7 @@ type (
handler ITask
retry RetryConfig
afterStartListeners, beforeDisposeListeners, afterDisposeListeners []func()
Description
Description sync.Map
startup, shutdown *util.Promise
parent *Job
parentCtx context.Context
@@ -144,11 +146,9 @@ func (task *Task) GetTaskID() uint32 {
return task.ID
}
func (task *Task) GetOwnerType() string {
if task.Description != nil {
if ownerType, ok := task.Description[OwnerTypeKey]; ok {
if ownerType, ok := task.Description.Load(OwnerTypeKey); ok {
return ownerType.(string)
}
}
return strings.TrimSuffix(reflect.TypeOf(task.handler).Elem().Name(), "Task")
}
@@ -205,13 +205,19 @@ func (task *Task) Stop(err error) {
panic("task stop with nil error")
}
if task.CancelCauseFunc != nil {
if task.Logger != nil {
task.Debug("task stop", "reason", err, "elapsed", time.Since(task.StartTime), "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
if tt := task.handler.GetTaskType(); task.Logger != nil && tt != TASK_TYPE_CALL {
task.Debug("task stop", "reason", err, "elapsed", time.Since(task.StartTime), "taskId", task.ID, "taskType", tt, "ownerType", task.GetOwnerType())
}
task.CancelCauseFunc(err)
}
}
func (task *Task) Depend(t ITask) {
t.OnDispose(func() {
task.Stop(t.StopReason())
})
}
func (task *Task) OnStart(listener func()) {
task.afterStartListeners = append(task.afterStartListeners, listener)
}
@@ -232,8 +238,12 @@ func (task *Task) checkRetry(err error) bool {
if errors.Is(err, ErrTaskComplete) || errors.Is(err, ErrExit) {
return false
}
if task.parent.IsStopped() {
return false
}
if task.retry.MaxRetry < 0 || task.retry.RetryCount < task.retry.MaxRetry {
task.retry.RetryCount++
task.SetDescription("retryCount", task.retry.RetryCount)
if task.Logger != nil {
if task.retry.MaxRetry < 0 {
task.Warn(fmt.Sprintf("retry %d/∞", task.retry.RetryCount))
@@ -270,8 +280,8 @@ func (task *Task) start() bool {
}
for {
task.StartTime = time.Now()
if task.Logger != nil {
task.Debug("task start", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
if tt := task.handler.GetTaskType(); task.Logger != nil && tt != TASK_TYPE_CALL {
task.Debug("task start", "taskId", task.ID, "taskType", tt, "ownerType", task.GetOwnerType())
}
task.state = TASK_STATE_STARTING
if v, ok := task.handler.(TaskStarter); ok {
@@ -320,6 +330,16 @@ func (task *Task) reset() {
task.startup = util.NewPromise(task.Context)
}
func (task *Task) SetDescription(key string, value any) {
task.Description.Store(key, value)
}
func (task *Task) SetDescriptions(value Description) {
for k, v := range value {
task.Description.Store(k, v)
}
}
func (task *Task) dispose() {
if task.state < TASK_STATE_STARTED {
return
@@ -327,21 +347,39 @@ func (task *Task) dispose() {
reason := task.StopReason()
task.state = TASK_STATE_DISPOSING
if task.Logger != nil {
taskType, ownerType := task.GetTaskType(), task.GetOwnerType()
taskType, ownerType := task.handler.GetTaskType(), task.GetOwnerType()
if taskType != TASK_TYPE_CALL {
yargs := []any{"reason", reason, "taskId", task.ID, "taskType", taskType, "ownerType", ownerType}
task.Debug("task dispose", yargs...)
defer task.Debug("task disposed", yargs...)
}
for _, listener := range task.beforeDisposeListeners {
}
befores := len(task.beforeDisposeListeners)
for i, listener := range task.beforeDisposeListeners {
task.SetDescription("disposeProcess", fmt.Sprintf("b:%d/%d", i, befores))
listener()
}
if job, ok := task.handler.(IJob); ok {
mt := job.getJob()
task.SetDescription("disposeProcess", "wait children")
mt.eventLoopLock.Lock()
if mt.addSub != nil {
mt.waitChildrenDispose()
mt.lazyRun = sync.Once{}
}
mt.eventLoopLock.Unlock()
}
task.SetDescription("disposeProcess", "self")
if v, ok := task.handler.(TaskDisposal); ok {
v.Dispose()
}
task.shutdown.Fulfill(reason)
for _, listener := range task.afterDisposeListeners {
afters := len(task.afterDisposeListeners)
for i, listener := range task.afterDisposeListeners {
task.SetDescription("disposeProcess", fmt.Sprintf("a:%d/%d", i, afters))
listener()
}
task.SetDescription("disposeProcess", "done")
task.state = TASK_STATE_DISPOSED
}

View File

@@ -145,6 +145,7 @@ func (sma *ScalableMemoryAllocator) Recycle() {
for _, child := range sma.children {
child.Recycle()
}
sma.children = nil
}
// Borrow = Malloc + Free = Find, must use the memory at once

View File

@@ -122,9 +122,9 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
}
}
p.Config.ParseUserFile(userConfig)
p.Description = map[string]any{"version": plugin.Version}
p.SetDescription("version", plugin.Version)
if userConfig != nil {
p.Description["userConfig"] = userConfig
p.SetDescription("userConfig", userConfig)
}
finalConfig, _ := yaml.Marshal(p.Config.GetMap())
p.Logger.Handler().(*MultiLogHandler).SetLevel(ParseLevel(p.config.LogLevel))
@@ -288,12 +288,10 @@ func (p *Plugin) Start() (err error) {
}
}
s.Plugins.Add(p)
err = p.listen()
if err != nil {
if err = p.listen(); err != nil {
return
}
err = p.handler.OnInit()
if err != nil {
if err = p.handler.OnInit(); err != nil {
return
}
return
@@ -304,36 +302,26 @@ func (p *Plugin) Dispose() {
p.Server.Plugins.Remove(p)
}
func (p *Plugin) stopOnError(t task.ITask) {
p.AddTask(t).OnDispose(func() {
p.Stop(t.StopReason())
})
}
func (p *Plugin) listen() (err error) {
httpConf := &p.config.HTTP
if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.Server.config.HTTP.ListenAddrTLS) {
p.stopOnError(httpConf.CreateHTTPSWork(p.Logger))
p.AddDependTask(httpConf.CreateHTTPSWork(p.Logger))
}
if httpConf.ListenAddr != "" && (httpConf.ListenAddr != p.Server.config.HTTP.ListenAddr) {
p.stopOnError(httpConf.CreateHTTPWork(p.Logger))
p.AddDependTask(httpConf.CreateHTTPWork(p.Logger))
}
if tcphandler, ok := p.handler.(ITCPPlugin); ok {
tcpConf := &p.config.TCP
if tcpConf.ListenAddr != "" && tcpConf.AutoListen {
task := tcpConf.CreateTCPWork(p.Logger, tcphandler.OnTCPConnect)
err = p.AddTask(task).WaitStarted()
if err != nil {
if err = p.AddTask(tcpConf.CreateTCPWork(p.Logger, tcphandler.OnTCPConnect)).WaitStarted(); err != nil {
return
}
}
if tcpConf.ListenAddrTLS != "" && tcpConf.AutoListen {
task := tcpConf.CreateTCPTLSWork(p.Logger, tcphandler.OnTCPConnect)
err = p.AddTask(task).WaitStarted()
if err != nil {
if err = p.AddTask(tcpConf.CreateTCPTLSWork(p.Logger, tcphandler.OnTCPConnect)).WaitStarted(); err != nil {
return
}
}
@@ -342,9 +330,7 @@ func (p *Plugin) listen() (err error) {
if udpHandler, ok := p.handler.(IUDPPlugin); ok {
udpConf := &p.config.UDP
if udpConf.ListenAddr != "" && udpConf.AutoListen {
task := udpConf.CreateUDPWork(p.Logger, udpHandler.OnUDPConnect)
err = p.AddTask(task).WaitStarted()
if err != nil {
if err = p.AddTask(udpConf.CreateUDPWork(p.Logger, udpHandler.OnUDPConnect)).WaitStarted(); err != nil {
return
}
}
@@ -353,8 +339,7 @@ func (p *Plugin) listen() (err error) {
if quicHandler, ok := p.handler.(IQUICPlugin); ok {
quicConf := &p.config.Quic
if quicConf.ListenAddr != "" && quicConf.AutoListen {
task := quicConf.CreateQUICWork(p.Logger, quicHandler.OnQUICConnect)
err = p.AddTask(task).WaitStarted()
err = p.AddTask(quicConf.CreateQUICWork(p.Logger, quicHandler.OnQUICConnect)).WaitStarted()
}
}
return

View File

@@ -16,9 +16,7 @@ type Puller struct {
func NewPuller(_ config.Pull) m7s.IPuller {
p := &Puller{}
p.Description = map[string]any{
task.OwnerTypeKey: "FlvPuller",
}
p.SetDescription(task.OwnerTypeKey, "FlvPuller")
return p
}

View File

@@ -3,11 +3,12 @@ package plugin_rtmp
import (
"errors"
"io"
"net"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/plugin/rtmp/pb"
. "m7s.live/m7s/v5/plugin/rtmp/pkg"
"net"
)
type RTMPPlugin struct {
@@ -55,7 +56,7 @@ func (task *RTMPServer) Go() (err error) {
task.Debug("recv cmd", "commandName", cmd.CommandName, "streamID", msg.MessageStreamID)
switch cmd := msg.MsgData.(type) {
case *CallMessage: //connect
task.Description = cmd.Object
task.SetDescriptions(cmd.Object)
app := cmd.Object["app"] // 客户端要连接到的服务应用名
objectEncoding := cmd.Object["objectEncoding"] // AMF编码方法
switch v := objectEncoding.(type) {
@@ -147,9 +148,7 @@ func (task *RTMPServer) Go() (err error) {
if err != nil {
task.Error("sendMessage publish", "error", err)
} else {
publisher.OnDispose(func() {
task.Stop(publisher.StopReason())
})
task.Depend(publisher)
}
case *PlayMessage:
streamPath := task.AppName + "/" + cmd.StreamName

View File

@@ -86,9 +86,7 @@ func NewPuller(_ config.Pull) m7s.IPuller {
chunkSize: 4096,
}
ret.NetConnection = &NetConnection{}
ret.Description = task.Description{
task.OwnerTypeKey: "RTMPPuller",
}
ret.SetDescription(task.OwnerTypeKey, "RTMPPuller")
return ret
}
@@ -98,9 +96,7 @@ func NewPusher() m7s.IPusher {
chunkSize: 4096,
}
ret.NetConnection = &NetConnection{}
ret.Description = task.Description{
task.OwnerTypeKey: "RTMPPusher",
}
ret.SetDescription(task.OwnerTypeKey, "RTMPPusher")
return ret
}
@@ -138,7 +134,7 @@ func (c *Client) Run() (err error) {
case Response_Result, Response_OnStatus:
switch response := msg.MsgData.(type) {
case *ResponseMessage:
c.Description = response.Properties
c.SetDescriptions(response.Properties)
if response.Infomation["code"] == NetConnection_Connect_Success {
err = c.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
if err == nil {

View File

@@ -69,5 +69,5 @@ func (ns *NetStream) BeginPlay(tid uint64) (err error) {
func (ns *NetStream) Subscribe(suber *m7s.Subscriber) {
audio, video := ns.CreateSender(false)
ns.AddTask(m7s.CreatePlayTask(suber, audio.HandleAudio, video.HandleVideo))
go m7s.PlayBlock(suber, audio.HandleAudio, video.HandleVideo)
}

View File

@@ -5,7 +5,6 @@ import (
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
)
const (
@@ -42,9 +41,7 @@ func NewPuller(_ config.Pull) m7s.IPuller {
direction: DIRECTION_PULL,
}
client.NetConnection = &NetConnection{}
client.Description = map[string]any{
task.OwnerTypeKey: "RTSPPuller",
}
client.SetDescription(task.OwnerTypeKey, "RTSPPuller")
return client
}
@@ -53,14 +50,11 @@ func NewPusher() m7s.IPusher {
direction: DIRECTION_PUSH,
}
client.NetConnection = &NetConnection{}
client.Description = map[string]any{
task.OwnerTypeKey: "RTSPPusher",
}
client.SetDescription(task.OwnerTypeKey, "RTSPPusher")
return client
}
func (c *Client) Run() (err error) {
c.MemoryAllocator = util.NewScalableMemoryAllocator(1 << 12)
if err = c.Options(); err != nil {
return
}

View File

@@ -139,9 +139,8 @@ func (c *NetConnection) Connect(remoteURL string) (err error) {
c.URL = rtspURL
c.UserAgent = "monibuca" + m7s.Version
c.auth = util.NewAuth(c.URL.User)
if c.Description != nil {
c.Description["remoteAddr"] = conn.RemoteAddr().String()
}
c.SetDescription("remoteAddr", conn.RemoteAddr().String())
c.MemoryAllocator = util.NewScalableMemoryAllocator(1 << 12)
// c.Backchannel = true
return
}
@@ -188,6 +187,7 @@ func (c *NetConnection) ReadRequest() (req *util.Request, err error) {
if err != nil {
return
}
c.SetDescription("lastReq", req.Method)
c.Debug("<-", "req", req.String())
return
}
@@ -229,6 +229,7 @@ func (c *NetConnection) WriteResponse(res *util.Response) (err error) {
return err
}
resStr := res.String()
c.SetDescription("lastRes", res.Request.Method)
c.Debug("->", "res", resStr)
_, err = c.conn.Write([]byte(resStr))
return

View File

@@ -235,7 +235,7 @@ func (c *Stream) Teardown() (err error) {
return c.WriteRequest(&util.Request{Method: MethodTeardown, URL: c.URL})
}
func (ns *Stream) Dispose() {
_ = ns.Teardown()
ns.NetConnection.Dispose()
}
//func (ns *Stream) Dispose() {
// //_ = ns.Teardown()
// ns.NetConnection.Dispose()
//}

View File

@@ -88,11 +88,11 @@ func (s *Sender) sendRTP(pack *mrtp.RTPData, channel int) (err error) {
}
func (s *Sender) Send() (err error) {
s.Stream.AddTask(m7s.CreatePlayTask(s.Subscriber, func(audio *mrtp.Audio) error {
go m7s.PlayBlock(s.Subscriber, func(audio *mrtp.Audio) error {
return s.sendRTP(&audio.RTPData, s.AudioChannelID)
}, func(video *mrtp.Video) error {
return s.sendRTP(&video.RTPData, s.VideoChannelID)
}))
})
for err == nil {
_, _, err = s.NetConnection.Receive(true)
}

View File

@@ -88,9 +88,7 @@ func (task *RTSPServer) Go() (err error) {
if err = task.WriteResponse(res); err != nil {
return
}
receiver.Publisher.OnDispose(func() {
task.Stop(receiver.Publisher.StopReason())
})
task.Depend(receiver.Publisher)
case MethodDescribe:
sendMode = true
sender = &Sender{}

View File

@@ -73,6 +73,6 @@ func (t *SRTServer) OnStop() {
t.server.Shutdown()
}
func (t *SRTServer) Run() error {
func (t *SRTServer) Go() error {
return t.server.ListenAndServe()
}

View File

@@ -14,11 +14,9 @@ type CommandTask struct {
}
func (ct *CommandTask) Start() (err error) {
ct.Description = task.Description{
"cmd": ct.Cmd.String(),
}
ct.SetDescription("cmd", ct.Cmd.String())
if ct.logFileName != "" {
ct.Description["log"] = ct.logFileName
ct.SetDescription("log", ct.logFileName)
ct.logFile, err = os.OpenFile(ct.logFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
ct.Error("Could not create transcode log", "err", err)
@@ -35,12 +33,13 @@ func (ct *CommandTask) Start() (err error) {
}
ct.Info("start exec", "cmd", ct.Cmd.String())
err = ct.Cmd.Start()
ct.Description["pid"] = ct.Cmd.Process.Pid
ct.SetDescription("pid", ct.Cmd.Process.Pid)
return
}
func (ct *CommandTask) Dispose() {
_ = ct.Cmd.Process.Kill()
err := ct.Cmd.Process.Kill()
ct.Info("kill", "err", err)
if ct.logFile != nil {
_ = ct.logFile.Close()
}

View File

@@ -1,6 +1,7 @@
package transcode
import (
"bufio"
"fmt"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/config"
@@ -46,9 +47,8 @@ type (
func NewTransform() m7s.ITransformer {
ret := &Transformer{}
ret.Description = map[string]any{
task.OwnerTypeKey: "Transcode",
}
ret.SetDescription(task.OwnerTypeKey, "Transcode")
var bufferFull time.Time
ret.WriteFlvTag = func(flv net.Buffers) (err error) {
var buffer []byte
for _, b := range flv {
@@ -56,8 +56,12 @@ func NewTransform() m7s.ITransformer {
}
select {
case ret.rBuf <- buffer:
bufferFull = time.Now()
default:
ret.Warn("pipe input buffer full")
if time.Since(bufferFull) > time.Second*5 {
ret.Stop(bufio.ErrBufferFull)
}
}
return
}
@@ -68,7 +72,6 @@ type Transformer struct {
m7s.DefaultTransformer
TransRule
rBuf chan []byte
*util.BufReader
flv.Live
}
@@ -127,23 +130,19 @@ func (t *Transformer) Start() (err error) {
args = append(args, to.Target)
}
}
t.Description = task.Description{
"cmd": args,
"config": t.TransRule,
}
t.SetDescription("cmd", args)
t.SetDescription("config", t.TransRule)
t.rBuf = make(chan []byte, 100)
t.BufReader = util.NewBufReaderChan(t.rBuf)
t.Subscriber = t.TransformJob.Subscriber
//t.BufReader.Dump, err = os.OpenFile("dump.flv", os.O_CREATE|os.O_WRONLY, 0644)
var cmdTask CommandTask
cmdTask.logFileName = fmt.Sprintf("logs/transcode_%s_%s.log", strings.ReplaceAll(t.TransformJob.StreamPath, "/", "_"), time.Now().Format("20060102150405"))
cmdTask.Cmd = exec.CommandContext(t, "ffmpeg", args...)
cmdTask.Cmd.Stdin = t.BufReader
cmdTask.Cmd.Stdin = util.NewBufReaderChan(t.rBuf)
t.AddTask(&cmdTask)
return
}
func (t *Transformer) Dispose() {
close(t.rBuf)
t.BufReader.Recycle()
}

View File

@@ -186,7 +186,7 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) {
if videoSender == nil {
suber.SubVideo = false
}
conn.AddTask(m7s.CreatePlayTask(suber, func(frame *mrtp.Audio) (err error) {
go m7s.PlayBlock(suber, func(frame *mrtp.Audio) (err error) {
for _, p := range frame.Packets {
if err = audioTLSRTP.WriteRTP(p); err != nil {
return
@@ -200,7 +200,7 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) {
}
}
return nil
}))
})
}
conn.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {

View File

@@ -2,6 +2,7 @@ package m7s
import (
"context"
"fmt"
"math"
"os"
"path/filepath"
@@ -110,6 +111,7 @@ func (t *AVTracks) Dispose() {
t.Lock()
defer t.Unlock()
for track := range t.Range {
track.Ready(ErrDiscard)
if track == t.AVTrack || track.RingWriter != t.AVTrack.RingWriter {
track.Dispose()
}
@@ -188,7 +190,7 @@ func (p *Publisher) Start() (err error) {
for plugin := range s.Plugins.Range {
plugin.OnPublish(p)
}
s.Transforms.PublishEvent <- p
//s.Transforms.PublishEvent <- p
p.AddTask(&PublishTimeout{Publisher: p})
if p.PublishTimeout > 0 {
p.AddTask(&PublishNoDataTimeout{Publisher: p})
@@ -572,8 +574,14 @@ func (p *Publisher) takeOver(old *Publisher) {
}
old.Stop(ErrKick)
p.Info("takeOver", "old", old.ID)
if old.Subscribers.Length > 0 {
p.Info(fmt.Sprintf("subscriber +%d", old.Subscribers.Length))
for subscriber := range old.SubscriberRange {
p.AddSubscriber(subscriber)
subscriber.Publisher = p
if subscriber.BufferTime > p.BufferTime {
p.BufferTime = subscriber.BufferTime
}
}
}
old.AudioTrack.Dispose()
old.VideoTrack.Dispose()

View File

@@ -102,13 +102,13 @@ func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf c
}
p.Connection.Init(plugin, streamPath, remoteURL, conf.Proxy, conf.Header)
p.puller = puller
p.Description = map[string]any{
p.SetDescriptions(task.Description{
"plugin": plugin.Meta.Name,
"streamPath": streamPath,
"url": conf.URL,
"args": conf.Args,
"maxRetry": conf.MaxRetry,
}
})
puller.SetRetry(conf.MaxRetry, conf.RetryInterval)
plugin.Server.Pulls.Add(p, plugin.Logger.With("pullURL", conf.URL, "streamPath", streamPath))
return p

View File

@@ -27,12 +27,12 @@ func (p *PushJob) GetKey() string {
func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf config.Push) *PushJob {
p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy, conf.Header)
p.pusher = pusher
p.Description = map[string]any{
p.SetDescriptions(task.Description{
"plugin": plugin.Meta.Name,
"streamPath": streamPath,
"url": conf.URL,
"maxRetry": conf.MaxRetry,
}
})
pusher.SetRetry(conf.MaxRetry, conf.RetryInterval)
plugin.Server.Pushs.Add(p, plugin.Logger.With("pushURL", conf.URL, "streamPath", streamPath))
return p

View File

@@ -64,13 +64,13 @@ func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string,
p.FilePath = conf.FilePath
p.StreamPath = streamPath
p.recorder = recorder
p.Description = map[string]any{
p.SetDescriptions(task.Description{
"plugin": plugin.Meta.Name,
"streamPath": streamPath,
"filePath": conf.FilePath,
"append": conf.Append,
"fragment": conf.Fragment,
}
})
plugin.Server.Records.Add(p, plugin.Logger.With("filePath", conf.FilePath, "streamPath", streamPath))
return p
}
@@ -84,7 +84,7 @@ func (p *RecordJob) Start() (err error) {
if p.Fragment == 0 || p.Append {
dir = filepath.Dir(p.FilePath)
}
p.Description["filePath"] = p.FilePath
p.SetDescription("filePath", p.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}

View File

@@ -116,14 +116,14 @@ func NewServer(conf any) (s *Server) {
}
s.ID = task.GetNextTaskID()
s.Meta = &serverMeta
s.Description = map[string]any{
s.SetDescriptions(task.Description{
"version": Version,
"goVersion": sysruntime.Version(),
"os": sysruntime.GOOS,
"arch": sysruntime.GOARCH,
"cpus": int32(sysruntime.NumCPU()),
}
s.Transforms.PublishEvent = make(chan *Publisher, 10)
})
//s.Transforms.PublishEvent = make(chan *Publisher, 10)
s.prometheusDesc.init()
return
}
@@ -230,10 +230,10 @@ func (s *Server) Start() (err error) {
}
}
if httpConf.ListenAddrTLS != "" {
s.stopOnError(httpConf.CreateHTTPSWork(s.Logger))
s.AddDependTask(httpConf.CreateHTTPSWork(s.Logger))
}
if httpConf.ListenAddr != "" {
s.stopOnError(httpConf.CreateHTTPWork(s.Logger))
s.AddDependTask(httpConf.CreateHTTPWork(s.Logger))
}
var grpcServer *GRPCServer
if tcpConf.ListenAddr != "" {

View File

@@ -36,11 +36,11 @@ func (ps *PubSubBase) Init(streamPath string, conf any) {
if u, err := url.Parse(streamPath); err == nil {
ps.StreamPath, ps.Args = u.Path, u.Query()
}
ps.Description = map[string]any{
ps.SetDescriptions(task.Description{
"streamPath": ps.StreamPath,
"args": ps.Args,
"plugin": ps.Plugin.Meta.Name,
}
})
// args to config
if len(ps.Args) != 0 {
ignores, cc := make(map[string]struct{}), make(map[string]any)
@@ -88,7 +88,7 @@ func (s *Subscriber) Start() (err error) {
s.Info("subscribe")
if publisher, ok := server.Streams.Get(s.StreamPath); ok {
publisher.AddSubscriber(s)
return publisher.WaitTrack()
return
} else {
for reg, streamPath := range server.StreamAlias {
if g := reg.FindStringSubmatch(s.StreamPath); len(g) > 0 {
@@ -96,9 +96,9 @@ func (s *Subscriber) Start() (err error) {
streamPath = strings.ReplaceAll(streamPath, fmt.Sprintf("$%d", i), gg)
}
if publisher, ok = server.Streams.Get(streamPath); ok {
s.Description["alias"] = streamPath
s.SetDescription("alias", streamPath)
publisher.AddSubscriber(s)
return publisher.WaitTrack()
return
}
}
}
@@ -223,20 +223,20 @@ func (s *Subscriber) createVideoReader(dataType reflect.Type, startVideoTs time.
}
type SubscribeHandler[A any, V any] struct {
task.Task
//task.Task
s *Subscriber
OnAudio func(A) error
OnVideo func(V) error
ProcessAudio, ProcessVideo chan func(*AVFrame)
}
func CreatePlayTask[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) task.ITask {
return &SubscribeHandler[A, V]{
s: s,
OnAudio: onAudio,
OnVideo: onVideo,
}
}
//func Play[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) {
// s.AddTask(&SubscribeHandler[A, V]{
// s: s,
// OnAudio: onAudio,
// OnVideo: onVideo,
// })
//}
func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) (err error) {
handler := &SubscribeHandler[A, V]{
@@ -244,12 +244,13 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(
OnAudio: onAudio,
OnVideo: onVideo,
}
err = handler.Start()
err = handler.Run()
s.Stop(err)
return
}
func (handler *SubscribeHandler[A, V]) Start() (err error) {
func (handler *SubscribeHandler[A, V]) Run() (err error) {
handler.s.SetDescription("play", time.Now())
var a1, v1 reflect.Type
s := handler.s
startAudioTs, startVideoTs := s.StartAudioTS, s.StartVideoTS
@@ -271,6 +272,7 @@ func (handler *SubscribeHandler[A, V]) Start() (err error) {
if s.VideoReader != nil {
s.VideoReader.StopRead()
}
handler.s.SetDescription("stopPlay", time.Now())
}()
sendAudioFrame := func() (err error) {
if awi >= 0 {

View File

@@ -3,6 +3,7 @@ package m7s
import (
"context"
"slices"
"time"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
@@ -37,7 +38,7 @@ type (
Transforms struct {
task.Work
util.Collection[string, *TransformedMap]
PublishEvent chan *Publisher
//PublishEvent chan *Publisher
}
TransformsPublishEvent struct {
task.ChannelTask
@@ -45,18 +46,18 @@ type (
}
)
func (t *TransformsPublishEvent) GetSignal() any {
return t.Transforms.PublishEvent
}
func (t *TransformsPublishEvent) Tick(pub any) {
incomingPublisher := pub.(*Publisher)
for job := range t.Transforms.Search(func(m *TransformedMap) bool {
return m.StreamPath == incomingPublisher.StreamPath
}) {
job.TransformJob.TransformPublished(incomingPublisher)
}
}
//func (t *TransformsPublishEvent) GetSignal() any {
// return t.Transforms.PublishEvent
//}
//
//func (t *TransformsPublishEvent) Tick(pub any) {
// incomingPublisher := pub.(*Publisher)
// for job := range t.Transforms.Search(func(m *TransformedMap) bool {
// return m.StreamPath == incomingPublisher.StreamPath
// }) {
// job.TransformJob.TransformPublished(incomingPublisher)
// }
//}
func (t *TransformedMap) GetKey() string {
return t.Target
@@ -68,6 +69,9 @@ func (r *DefaultTransformer) GetTransformJob() *TransformJob {
func (p *TransformJob) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.Transformer, p.StreamPath)
if err == nil {
p.Transformer.Depend(p.Subscriber)
}
return
}
@@ -81,11 +85,11 @@ func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, streamPath
p.Config = conf
p.StreamPath = streamPath
p.Transformer = transformer
p.Description = map[string]any{
p.SetDescriptions(task.Description{
"streamPath": streamPath,
"conf": conf,
}
})
transformer.SetRetry(-1, time.Second*2)
plugin.Server.Transforms.AddTask(p, plugin.Logger.With("streamPath", streamPath))
return p
}
@@ -106,19 +110,19 @@ func (p *TransformJob) Start() (err error) {
})
}
}
p.Info("transform +1", "count", s.Transforms.Length)
p.AddTask(p.Transformer, p.Logger)
return
}
func (p *TransformJob) TransformPublished(pub *Publisher) {
p.Publisher = pub
// pub.OnDispose(func() {
// p.Stop(pub.StopReason())
// })
}
func (p *TransformJob) Dispose() {
transList := &p.Plugin.Server.Transforms
p.Info("transform -1", "count", transList.Length)
for _, to := range p.Config.Output {
p.Plugin.Server.Transforms.RemoveByKey(to.Target)
transList.RemoveByKey(to.Target)
}
}