refactor: task system

This commit is contained in:
langhuihui
2024-08-24 21:11:32 +08:00
parent e883bb94cd
commit d9f9df06b7
42 changed files with 407 additions and 313 deletions

29
api.go
View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"m7s.live/m7s/v5/pkg/task"
"maps"
"net"
"net/http"
@@ -133,7 +134,7 @@ func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err
}
func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamInfoResponse, err error) {
s.streamTask.Call(func() error {
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
res, err = s.getStreamInfo(pub)
} else {
@@ -145,28 +146,28 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
}
func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) {
var fillData func(m util.IMarcoTask) *pb.TaskTreeResponse
fillData = func(m util.IMarcoTask) (res *pb.TaskTreeResponse) {
var fillData func(m task.IMarcoTask) *pb.TaskTreeResponse
fillData = func(m task.IMarcoTask) (res *pb.TaskTreeResponse) {
res = &pb.TaskTreeResponse{Id: m.GetTaskID(), State: uint32(m.GetState()), Blocked: m.Blocked(), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: maps.Collect(func(yield func(key, value string) bool) {
for k, v := range m.GetTask().Description {
yield(k, fmt.Sprintf("%v", v))
}
})}
for task := range m.RangeSubTask {
if marcoTask, ok := task.(util.IMarcoTask); ok {
for t := range m.RangeSubTask {
if marcoTask, ok := t.(task.IMarcoTask); ok {
res.Children = append(res.Children, fillData(marcoTask))
} else {
res.Children = append(res.Children, &pb.TaskTreeResponse{Id: task.GetTaskID(), State: uint32(task.GetState()), Type: uint32(task.GetTaskType()), Owner: task.GetOwnerType(), StartTime: timestamppb.New(task.GetTask().StartTime)})
res.Children = append(res.Children, &pb.TaskTreeResponse{Id: t.GetTaskID(), State: uint32(t.GetState()), Type: uint32(t.GetTaskType()), Owner: t.GetOwnerType(), StartTime: timestamppb.New(t.GetTask().StartTime)})
}
}
return
}
res = fillData(&util.RootTask)
res = fillData(&Servers)
return
}
func (s *Server) GetSubscribers(ctx context.Context, req *pb.SubscribersRequest) (res *pb.SubscribersResponse, err error) {
s.streamTask.Call(func() error {
s.Streams.Call(func() error {
var subscribers []*pb.SubscriberSnapShot
for subscriber := range s.Subscribers.Range {
meta, _ := json.Marshal(subscriber.Description)
@@ -202,7 +203,7 @@ func (s *Server) GetSubscribers(ctx context.Context, req *pb.SubscribersRequest)
return
}
func (s *Server) AudioTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.streamTask.Call(func() error {
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasAudioTrack() {
res = &pb.TrackSnapShotResponse{}
for _, memlist := range pub.AudioTrack.Allocator.GetChildren() {
@@ -281,7 +282,7 @@ func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) {
}
func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.streamTask.Call(func() error {
s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() {
res = &pb.TrackSnapShotResponse{}
for _, memlist := range pub.VideoTrack.Allocator.GetChildren() {
@@ -348,7 +349,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *empt
}
func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) {
s.streamTask.Call(func() error {
s.Streams.Call(func() error {
if subscriber, ok := s.Subscribers.Get(req.Id); ok {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
subscriber.Publisher.RemoveSubscriber(subscriber)
@@ -364,7 +365,7 @@ func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeReq
}
func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
s.streamTask.Call(func() error {
s.Streams.Call(func() error {
if subscriber, ok := s.Subscribers.Get(req.Id); ok {
subscriber.Stop(errors.New("stop by api"))
} else {
@@ -377,7 +378,7 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res
// /api/stream/list
func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) {
s.streamTask.Call(func() error {
s.Streams.Call(func() error {
var streams []*pb.StreamInfoResponse
for publisher := range s.Streams.Range {
info, err := s.getStreamInfo(publisher)
@@ -393,7 +394,7 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *
}
func (s *Server) WaitList(context.Context, *emptypb.Empty) (res *pb.StreamWaitListResponse, err error) {
s.streamTask.Call(func() error {
s.Streams.Call(func() error {
res = &pb.StreamWaitListResponse{
List: make(map[string]int32),
}

View File

@@ -17,7 +17,7 @@ func main() {
flag.BoolVar(&multi, "multi", false, "debug")
flag.Parse()
if multi {
m7s.AddRootTaskWithContext(ctx, m7s.NewServer("config2.yaml"))
go m7s.Run(ctx, "config2.yaml")
}
time.Sleep(time.Second)
m7s.Run(ctx, "config1.yaml")

View File

@@ -5,7 +5,7 @@ import (
"log/slog"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"time"
)
@@ -161,7 +161,7 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) {
r.AbsTime = 1
}
r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence)
r.Log(context.TODO(), util.TraceLevel, r.Track.FourCC().String(), "delay", r.Delay)
r.Log(context.TODO(), task.TraceLevel, r.Track.FourCC().String(), "delay", r.Delay)
return
}

View File

@@ -5,7 +5,7 @@ import (
"crypto/subtle"
"crypto/tls"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@@ -167,7 +167,7 @@ func BasicAuth(u, p string, next http.Handler) http.Handler {
}
type ListenHTTPTask struct {
util.Task
task.Task
*HTTP
*http.Server
}

View File

@@ -3,10 +3,9 @@ package config
import (
"context"
"crypto/tls"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"github.com/quic-go/quic-go"
"log/slog"
"m7s.live/m7s/v5/pkg/task"
)
type QuicConfig interface {
@@ -20,7 +19,7 @@ type Quic struct {
AutoListen bool `default:"true" desc:"是否自动监听"`
}
func (q *Quic) CreateQUICTask(logger *slog.Logger, handler func(connection quic.Connection) util.ITask) *ListenQuicTask {
func (q *Quic) CreateQUICTask(logger *slog.Logger, handler func(connection quic.Connection) task.ITask) *ListenQuicTask {
ret := &ListenQuicTask{
Quic: q,
handler: handler,
@@ -30,10 +29,10 @@ func (q *Quic) CreateQUICTask(logger *slog.Logger, handler func(connection quic.
}
type ListenQuicTask struct {
util.MarcoLongTask
task.MarcoLongTask
*Quic
*quic.Listener
handler func(connection quic.Connection) util.ITask
handler func(connection quic.Connection) task.ITask
}
func (task *ListenQuicTask) Start() (err error) {

View File

@@ -4,7 +4,7 @@ import (
"crypto/tls"
_ "embed"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"net"
"runtime"
"time"
@@ -55,10 +55,10 @@ func (config *TCP) CreateTCPTLSTask(logger *slog.Logger, handler TCPHandler) *Li
return ret
}
type TCPHandler = func(conn *net.TCPConn) util.ITask
type TCPHandler = func(conn *net.TCPConn) task.ITask
type ListenTCPTask struct {
util.MarcoLongTask
task.MarcoLongTask
*TCP
net.Listener
handler TCPHandler

View File

@@ -3,7 +3,7 @@ package config
import (
"crypto/tls"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"net"
"time"
)
@@ -15,17 +15,17 @@ type UDP struct {
AutoListen bool `default:"true" desc:"是否自动监听"`
}
func (config *UDP) CreateUDPTask(logger *slog.Logger, handler func(conn *net.UDPConn) util.ITask) *ListenUDPTask {
func (config *UDP) CreateUDPTask(logger *slog.Logger, handler func(conn *net.UDPConn) task.ITask) *ListenUDPTask {
ret := &ListenUDPTask{UDP: config, handler: handler}
ret.Logger = logger.With("addr", config.ListenAddr)
return ret
}
type ListenUDPTask struct {
util.MarcoLongTask
task.MarcoLongTask
*UDP
net.Listener
handler func(conn *net.UDPConn) util.ITask
handler func(conn *net.UDPConn) task.ITask
}
func (task *ListenUDPTask) Dispose() {

View File

@@ -3,7 +3,7 @@ package pkg
import (
"context"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"slices"
)
@@ -12,7 +12,7 @@ var _ slog.Handler = (*MultiLogHandler)(nil)
func ParseLevel(level string) slog.Level {
var lv slog.LevelVar
if level == "trace" {
lv.Set(util.TraceLevel)
lv.Set(task.TraceLevel)
} else {
lv.UnmarshalText([]byte(level))
}

View File

@@ -3,6 +3,7 @@ package pkg
import (
"fmt"
"log/slog"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
"sync"
"time"
@@ -139,7 +140,7 @@ func (rb *RingWriter) Step() (normal bool) {
isIDR := rb.Value.IDR
next := rb.Next()
if isIDR {
rb.SLogger.Log(nil, util.TraceLevel, "add idr")
rb.SLogger.Log(nil, task.TraceLevel, "add idr")
rb.PushIDR()
}
if rb.IDRingList.Len() > 0 {
@@ -153,12 +154,12 @@ func (rb *RingWriter) Step() (normal bool) {
}
} else if next == oldIDR.Value {
if nextOld := oldIDR.Next(); nextOld != nil && rb.durationFrom(nextOld.Value) > rb.BufferRange[0] {
rb.SLogger.Log(nil, util.TraceLevel, "remove old idr")
rb.SLogger.Log(nil, task.TraceLevel, "remove old idr")
rb.Lock()
rb.IDRingList.Remove(oldIDR)
rb.Unlock()
} else {
rb.SLogger.Log(nil, util.TraceLevel, "not enough buffer")
rb.SLogger.Log(nil, task.TraceLevel, "not enough buffer")
rb.glow(5)
next = rb.Next()
}

View File

@@ -1,13 +1,25 @@
package util
package task
type CallBackTask struct {
Task
startHandler func() error
disposeHandler func()
}
func (t *CallBackTask) GetTaskType() TaskType {
return TASK_TYPE_CALL
}
func (t *CallBackTask) Start() error {
return t.startHandler()
}
func (t *CallBackTask) Dispose() {
if t.disposeHandler != nil {
t.disposeHandler()
}
}
func CreateTaskByCallBack(start func() error, dispose func()) ITask {
var task CallBackTask
task.startHandler = func() error {

View File

@@ -1,4 +1,4 @@
package util
package task
import (
"time"

13
pkg/task/long.go Normal file
View File

@@ -0,0 +1,13 @@
package task
type MarcoLongTask struct {
MarcoTask
}
func (m *MarcoLongTask) keepalive() bool {
return true
}
func (*MarcoLongTask) GetTaskType() TaskType {
return TASK_TYPE_LONG_MACRO
}

View File

@@ -1,9 +1,9 @@
package util
package task
import (
"context"
"log/slog"
"os"
"m7s.live/m7s/v5/pkg/util"
"reflect"
"slices"
"sync"
@@ -16,41 +16,12 @@ func GetNextTaskID() uint32 {
return idG.Add(1)
}
var RootTask MarcoLongTask
func init() {
RootTask.initTask(context.Background(), &RootTask)
RootTask.Description = map[string]any{
"ownerType": "root",
}
RootTask.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
}
func ShutdownRootTask() {
RootTask.Stop(ErrExit)
RootTask.dispose()
}
type MarcoLongTask struct {
MarcoTask
}
func (m *MarcoLongTask) initTask(ctx context.Context, task ITask) {
m.MarcoTask.initTask(ctx, task)
m.keepAlive = true
}
func (*MarcoLongTask) GetTaskType() TaskType {
return TASK_TYPE_LONG_MACRO
}
// MarcoTask include sub tasks
type MarcoTask struct {
Task
addSub chan ITask
children []ITask
lazyRun sync.Once
keepAlive bool
childrenDisposed chan struct{}
childDisposeListeners []func(ITask)
blocked bool
@@ -60,6 +31,10 @@ func (*MarcoTask) GetTaskType() TaskType {
return TASK_TYPE_MACRO
}
func (mt *MarcoTask) getMarcoTask() *MarcoTask {
return mt
}
func (mt *MarcoTask) Blocked() bool {
return mt.blocked
}
@@ -92,54 +67,61 @@ func (mt *MarcoTask) dispose() {
mt.Task.dispose()
}
func (mt *MarcoTask) lazyStart(t ITask) {
task := t.GetTask()
if mt.IsStopped() {
task.startup.Reject(mt.StopReason())
return
}
if task.ID == 0 {
task.ID = GetNextTaskID()
}
if task.parent == nil {
task.parent = mt
task.level = mt.level + 1
}
if task.Logger == nil {
task.Logger = mt.Logger
}
if task.startHandler == nil {
task.startHandler = EmptyStart
}
if task.disposeHandler == nil {
task.disposeHandler = EmptyDispose
}
mt.lazyRun.Do(func() {
mt.childrenDisposed = make(chan struct{})
mt.addSub = make(chan ITask, 10)
go mt.run()
})
mt.addSub <- t
}
func (mt *MarcoTask) RangeSubTask(callback func(task ITask) bool) {
for _, task := range mt.children {
callback(task)
}
}
func (mt *MarcoTask) AddTask(task ITask) *Task {
return mt.AddTaskWithContext(mt.Context, task)
func (mt *MarcoTask) AddTaskLazy(t IMarcoTask) {
t.GetTask().parent = mt
}
func (mt *MarcoTask) AddTaskWithContext(ctx context.Context, t ITask) (task *Task) {
if ctx == nil && mt.Context == nil {
func (mt *MarcoTask) AddTask(t ITask, opt ...any) (task *Task) {
mt.lazyRun.Do(func() {
if mt.parent != nil && mt.handler == nil {
mt.parent.AddTask(mt)
}
mt.childrenDisposed = make(chan struct{})
mt.addSub = make(chan ITask, 10)
go mt.run()
})
if task = t.GetTask(); task.handler == nil {
task.parentCtx = mt.Context
for _, o := range opt {
switch v := o.(type) {
case context.Context:
task.parentCtx = v
case Description:
task.Description = v
case RetryConfig:
task.retry = v
case *slog.Logger:
task.Logger = v
}
}
if task.parentCtx == nil {
panic("context is nil")
}
if task = t.GetTask(); task.parent == nil {
t.initTask(ctx, t)
task.parent = mt
task.level = mt.level + 1
if task.ID == 0 {
task.ID = GetNextTaskID()
}
mt.lazyStart(t)
task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx)
task.startup = util.NewPromise(task.Context)
task.shutdown = util.NewPromise(context.Background())
task.handler = t
if task.Logger == nil {
task.Logger = mt.Logger
}
}
if mt.IsStopped() {
task.startup.Reject(mt.StopReason())
return
}
mt.addSub <- t
return
}
@@ -208,7 +190,7 @@ func (mt *MarcoTask) run() {
cases = slices.Delete(cases, chosen, chosen+1)
}
}
if !mt.keepAlive && len(mt.children) == 0 {
if !mt.handler.keepalive() && len(mt.children) == 0 {
mt.Stop(ErrAutoStop)
}
}

25
pkg/task/manager.go Normal file
View File

@@ -0,0 +1,25 @@
package task
import (
. "m7s.live/m7s/v5/pkg/util"
)
type ManagerItem[K comparable] interface {
ITask
GetKey() K
}
type Manager[K comparable, T ManagerItem[K]] struct {
MarcoLongTask
Collection[K, T]
}
func (m *Manager[K, T]) Add(ctx T) {
ctx.OnStart(func() {
m.Collection.Add(ctx)
})
ctx.OnDispose(func() {
m.Remove(ctx)
})
m.AddTask(ctx)
}

43
pkg/task/root.go Normal file
View File

@@ -0,0 +1,43 @@
package task
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
)
type OSSignal struct {
ChannelTask
root interface {
Shutdown()
}
}
func (o *OSSignal) Start() error {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
o.SignalChan = signalChan
return nil
}
func (o *OSSignal) Tick(any) {
go o.root.Shutdown()
}
type RootManager[K comparable, T ManagerItem[K]] struct {
Manager[K, T]
}
func (m *RootManager[K, T]) Init() {
m.Context = context.Background()
m.handler = m
m.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
m.AddTask(&OSSignal{})
}
func (m *RootManager[K, T]) Shutdown() {
m.Stop(ErrExit)
m.dispose()
}

View File

@@ -1,10 +1,11 @@
package util
package task
import (
"context"
"errors"
"fmt"
"log/slog"
"m7s.live/m7s/v5/pkg/util"
"reflect"
"runtime/debug"
"strings"
@@ -44,7 +45,7 @@ type (
TaskState byte
TaskType byte
ITask interface {
initTask(context.Context, ITask)
keepalive() bool
getParent() *MarcoTask
GetParent() ITask
GetTask() *Task
@@ -66,6 +67,8 @@ type (
}
IMarcoTask interface {
ITask
getMarcoTask() *MarcoTask
AddTask(ITask, ...any) *Task
RangeSubTask(func(yield ITask) bool)
OnChildDispose(func(ITask))
Blocked() bool
@@ -92,6 +95,7 @@ type (
RetryCount int
RetryInterval time.Duration
}
Description = map[string]any
Task struct {
ID uint32
StartTime time.Time
@@ -100,11 +104,9 @@ type (
context.CancelCauseFunc
handler ITask
retry RetryConfig
startHandler func() error
afterStartListeners, beforeDisposeListeners, afterDisposeListeners []func()
disposeHandler func()
Description map[string]any
startup, shutdown *Promise
Description
startup, shutdown *util.Promise
parent *MarcoTask
parentCtx context.Context
needRetry bool
@@ -113,6 +115,10 @@ type (
}
)
func (*Task) keepalive() bool {
return false
}
func (task *Task) GetState() TaskState {
return task.state
}
@@ -253,7 +259,9 @@ func (task *Task) start() (err error) {
hasRun := false
for {
task.state = TASK_STATE_STARTING
err = task.startHandler()
if v, ok := task.handler.(TaskStarter); ok {
err = v.Start()
}
task.state = TASK_STATE_STARTED
if err == nil {
task.ResetRetryCount()
@@ -306,7 +314,9 @@ func (task *Task) dispose() {
for _, listener := range task.beforeDisposeListeners {
listener()
}
task.disposeHandler()
if v, ok := task.handler.(TaskDisposal); ok {
v.Dispose()
}
task.shutdown.Fulfill(reason)
for _, listener := range task.afterDisposeListeners {
listener()
@@ -317,28 +327,14 @@ func (task *Task) dispose() {
}
if !errors.Is(reason, ErrTaskComplete) && task.needRetry {
task.Context, task.CancelCauseFunc = context.WithCancelCause(task.parentCtx)
task.startup = NewPromise(task.Context)
task.shutdown = NewPromise(context.Background())
task.startup = util.NewPromise(task.Context)
task.shutdown = util.NewPromise(context.Background())
parent := task.parent
task.parent = nil
parent.AddTask(task.handler)
}
}
func (task *Task) initTask(ctx context.Context, iTask ITask) {
task.parentCtx = ctx
task.Context, task.CancelCauseFunc = context.WithCancelCause(ctx)
task.startup = NewPromise(task.Context)
task.shutdown = NewPromise(context.Background())
task.handler = iTask
if v, ok := iTask.(TaskStarter); ok {
task.startHandler = v.Start
}
if v, ok := iTask.(TaskDisposal); ok {
task.disposeHandler = v.Dispose
}
}
func (task *Task) ResetRetryCount() {
task.retry.RetryCount = 0
}

View File

@@ -1,4 +1,4 @@
package util
package task
import (
"context"
@@ -63,21 +63,6 @@ func Test_Call_ExecutesCallback(t *testing.T) {
}
}
func Test_AddChan_AddsChannelTask(t *testing.T) {
mt := createMarcoTask()
channel := time.NewTimer(time.Millisecond * 100)
called := false
callback := func(time.Time) {
called = true
}
mt.AddChan(channel.C, callback)
time.AfterFunc(time.Millisecond*500, func() {
if !called {
t.Errorf("expected callback to be called")
}
})
}
func Test_StopByContext(t *testing.T) {
mt := createMarcoTask()
var task Task

View File

@@ -5,6 +5,7 @@ import (
"log/slog"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
"reflect"
"time"
@@ -116,5 +117,5 @@ func (t *Track) WaitReady() error {
}
func (t *Track) Trace(msg string, fields ...any) {
t.Log(context.TODO(), util.TraceLevel, msg, fields...)
t.Log(context.TODO(), task.TraceLevel, msg, fields...)
}

View File

@@ -2,8 +2,8 @@ package m7s
import (
"context"
"github.com/quic-go/quic-go"
"log/slog"
"m7s.live/m7s/v5/pkg/task"
"net"
"net/http"
"os"
@@ -12,6 +12,8 @@ import (
"runtime"
"strings"
"github.com/quic-go/quic-go"
gatewayRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
myip "github.com/husanpao/ip"
"google.golang.org/grpc"
@@ -39,6 +41,7 @@ type (
Puller Puller
Pusher Pusher
Recorder Recorder
Transformer Transformer
OnExit OnExitHandler
OnAuthPub AuthPublisher
OnAuthSub AuthSubscriber
@@ -49,7 +52,7 @@ type (
}
IPlugin interface {
util.ITask
task.ITask
OnInit() error
OnStop()
Pull(path string, url string)
@@ -64,15 +67,15 @@ type (
}
ITCPPlugin interface {
OnTCPConnect(conn *net.TCPConn) util.ITask
OnTCPConnect(conn *net.TCPConn) task.ITask
}
IUDPPlugin interface {
OnUDPConnect(conn *net.UDPConn) util.ITask
OnUDPConnect(conn *net.UDPConn) task.ITask
}
IQUICPlugin interface {
OnQUICConnect(quic.Connection) util.ITask
OnQUICConnect(quic.Connection) task.ITask
}
)
@@ -145,7 +148,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) (p *Plugin)
}
}
}
s.AddTask(instance)
return
}
@@ -193,7 +196,7 @@ func InstallPlugin[C iPlugin](options ...any) error {
}
type Plugin struct {
util.MarcoLongTask
task.MarcoLongTask
Disabled bool
Meta *PluginMeta
config config.Common
@@ -288,7 +291,7 @@ func (p *Plugin) Dispose() {
p.Server.Plugins.Remove(p)
}
func (p *Plugin) stopOnError(t util.ITask) {
func (p *Plugin) stopOnError(t task.ITask) {
p.AddTask(t).OnDispose(func() {
p.Stop(t.StopReason())
})
@@ -366,7 +369,7 @@ func (p *Plugin) PublishWithConfig(ctx context.Context, streamPath string, conf
}
}
}
err = p.Server.streamTask.AddTaskWithContext(ctx, publisher).WaitStarted()
err = p.Server.Streams.AddTask(publisher, ctx).WaitStarted()
return
}
@@ -388,7 +391,7 @@ func (p *Plugin) SubscribeWithConfig(ctx context.Context, streamPath string, con
}
}
}
err = p.Server.streamTask.AddTaskWithContext(ctx, subscriber).WaitStarted()
err = p.Server.Streams.AddTask(subscriber, ctx).WaitStarted()
return
}
@@ -398,17 +401,22 @@ func (p *Plugin) Subscribe(ctx context.Context, streamPath string) (subscriber *
func (p *Plugin) Pull(streamPath string, url string) {
puller := p.Meta.Puller()
p.Server.AddPullTask(puller.GetPullContext().Init(puller, p, streamPath, url))
puller.GetPullContext().Init(puller, p, streamPath, url)
}
func (p *Plugin) Push(streamPath string, url string) {
pusher := p.Meta.Pusher()
p.Server.AddPushTask(pusher.GetPushContext().Init(pusher, p, streamPath, url))
pusher.GetPushContext().Init(pusher, p, streamPath, url)
}
func (p *Plugin) Record(streamPath string, filePath string) {
recorder := p.Meta.Recorder()
p.Server.AddRecordTask(recorder.GetRecordContext().Init(recorder, p, streamPath, filePath))
recorder.GetRecordContext().Init(recorder, p, streamPath, filePath)
}
func (p *Plugin) Transform(fromStreamPath, toStreamPath string) {
transformer := p.Meta.Transformer()
transformer.GetTransformContext().Init(transformer, p, fromStreamPath, toStreamPath)
}
func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) {
@@ -466,11 +474,11 @@ func (p *Plugin) AddLogHandler(handler slog.Handler) {
}
func (p *Plugin) SaveConfig() (err error) {
return util.RootTask.AddTask(&SaveConfig{Plugin: p}).WaitStopped()
return Servers.AddTask(&SaveConfig{Plugin: p}).WaitStopped()
}
type SaveConfig struct {
util.Task
task.Task
Plugin *Plugin
file *os.File
}
@@ -479,7 +487,7 @@ func (s *SaveConfig) Start() (err error) {
if s.Plugin.Modify == nil {
err = os.Remove(s.Plugin.settingPath())
if err == nil {
err = util.ErrTaskComplete
err = task.ErrTaskComplete
}
}
s.file, err = os.OpenFile(s.Plugin.settingPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)

View File

@@ -93,7 +93,7 @@ func (c *CascadeClientPlugin) Pull(streamPath, url string) {
puller := &cascade.Puller{
Connection: c.conn,
}
c.Plugin.Server.AddPullTask(puller.GetPullContext().Init(puller, &c.Plugin, streamPath, url))
puller.GetPullContext().Init(puller, &c.Plugin, streamPath, url)
}
//func (c *CascadeClientPlugin) Start() {

View File

@@ -10,7 +10,7 @@ import (
"github.com/quic-go/quic-go"
"io"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"net"
"net/http"
"os"
@@ -62,7 +62,7 @@ type ConsolePlugin struct {
var _ = m7s.InstallPlugin[ConsolePlugin]()
type ConnectServerTask struct {
util.Task
task.Task
cfg *ConsolePlugin
quic.Connection
}
@@ -120,7 +120,7 @@ func (task *ConnectServerTask) Run() (err error) {
}
type ReceiveRequestTask struct {
util.Task
task.Task
stream quic.Stream
handler http.Handler
conn quic.Connection

View File

@@ -5,6 +5,7 @@ import (
"io"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
"os"
@@ -14,7 +15,7 @@ import (
)
type WriteFlvMetaTagQueueTask struct {
util.MarcoLongTask
task.MarcoLongTask
}
var writeMetaTagQueueTask WriteFlvMetaTagQueueTask
@@ -24,7 +25,7 @@ func init() {
}
type writeMetaTagTask struct {
util.Task
task.Task
file *os.File
writer *FlvWriter
flags byte

View File

@@ -4,6 +4,7 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg"
"net/http"
@@ -22,7 +23,7 @@ const (
)
type Device struct {
util.Task
task.Task
ID string
Name string
Manufacturer string

View File

@@ -6,6 +6,7 @@ import (
"github.com/emiago/sipgo/sip"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg"
rtp2 "m7s.live/m7s/v5/plugin/rtp/pkg"
@@ -15,7 +16,7 @@ import (
)
type Dialog struct {
util.MarcoTask
task.MarcoTask
*Channel
*gb28181.Receiver
gb28181.InviteOptions
@@ -122,7 +123,7 @@ func (d *Dialog) Run() (err error) {
var tcpConf config.TCP
tcpConf.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
tcpConf.ListenNum = 1
tcpConf.CreateTCPTask(d.Logger, func(conn *net.TCPConn) util.ITask {
tcpConf.CreateTCPTask(d.Logger, func(conn *net.TCPConn) task.ITask {
d.Receiver.RTPReader = (*rtp2.TCP)(conn)
return d.Receiver
})

View File

@@ -11,6 +11,7 @@ import (
"github.com/rs/zerolog/log"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/gb28181/pb"
gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg"
@@ -295,8 +296,7 @@ func (gb *GB28181Plugin) Pull(streamPath, url string) {
dialog := Dialog{
gb: gb,
}
ctx := dialog.GetPullContext().Init(&dialog, &gb.Plugin, streamPath, url)
gb.Server.AddPullTask(ctx)
dialog.GetPullContext().Init(&dialog, &gb.Plugin, streamPath, url)
}
func (gb *GB28181Plugin) GetPullableList() []string {
@@ -310,13 +310,13 @@ func (gb *GB28181Plugin) GetPullableList() []string {
}
type PSServer struct {
util.Task
task.Task
*rtp2.TCP
theDialog *Dialog
gb *GB28181Plugin
}
func (gb *GB28181Plugin) OnTCPConnect(conn *net.TCPConn) util.ITask {
func (gb *GB28181Plugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
ret := &PSServer{gb: gb, TCP: (*rtp2.TCP)(conn)}
ret.Task.Logger = gb.With("remote", conn.RemoteAddr().String())
return ret
@@ -353,6 +353,6 @@ func (gb *GB28181Plugin) OnBye(req *sip.Request, tx sip.ServerTransaction) {
return d.GetCallID() == req.CallID().Value()
}); ok {
gb.Warn("OnBye", "dialog", dialog.GetCallID())
dialog.Stop(util.ErrTaskComplete)
dialog.Stop(task.ErrTaskComplete)
}
}

View File

@@ -3,7 +3,7 @@ package plugin_monitor
import (
"encoding/json"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/plugin/monitor/pb"
monitor "m7s.live/m7s/v5/plugin/monitor/pkg"
"os"
@@ -27,7 +27,7 @@ func (cfg *MonitorPlugin) OnStop() {
}
}
func (cfg *MonitorPlugin) saveTask(task util.ITask) {
func (cfg *MonitorPlugin) saveTask(task task.ITask) {
var th monitor.Task
th.SessionID = cfg.session.ID
th.TaskID = task.GetTaskID()

View File

@@ -4,14 +4,14 @@ import (
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/plugin/mp4/pkg/box"
"os"
"time"
)
type WriteTrailerQueueTask struct {
util.MarcoLongTask
task.MarcoLongTask
}
var writeTrailerQueueTask WriteTrailerQueueTask
@@ -29,7 +29,7 @@ type Recorder struct {
}
type writeTrailerTask struct {
util.Task
task.Task
muxer *box.Movmuxer
file *os.File
}

View File

@@ -9,6 +9,6 @@ import (
func (r *RTMPPlugin) PushOut(ctx context.Context, req *pb.PushRequest) (res *gpb.SuccessResponse, err error) {
pusher := rtmp.NewPusher()
err = r.Server.AddPushTask(pusher.GetPushContext().Init(pusher, &r.Plugin, req.StreamPath, req.RemoteURL)).WaitStarted()
err = pusher.GetPushContext().Init(pusher, &r.Plugin, req.StreamPath, req.RemoteURL).WaitStarted()
return &gpb.SuccessResponse{}, err
}

View File

@@ -4,7 +4,7 @@ import (
"errors"
"io"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/plugin/rtmp/pb"
. "m7s.live/m7s/v5/plugin/rtmp/pkg"
"net"
@@ -26,7 +26,7 @@ type RTMPServer struct {
conf *RTMPPlugin
}
func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) util.ITask {
func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
ret := &RTMPServer{NetConnection: NewNetConnection(conn), conf: p}
ret.Logger = p.With("remote", conn.RemoteAddr().String())
return ret

View File

@@ -3,6 +3,7 @@ package rtmp
import (
"errors"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/task"
"net"
"runtime"
"sync/atomic"
@@ -43,7 +44,7 @@ const (
)
type NetConnection struct {
util.MarcoTask
task.MarcoTask
*util.BufReader
net.Conn
bandwidth uint32

View File

@@ -3,6 +3,7 @@ package plugin_rtsp
import (
"errors"
"fmt"
"m7s.live/m7s/v5/pkg/task"
"net"
"net/http"
"strconv"
@@ -27,7 +28,7 @@ type RTSPServer struct {
conf *RTSPPlugin
}
func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) util.ITask {
func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
ret := &RTSPServer{NetConnection: NewNetConnection(conn), conf: p}
ret.Logger = p.With("remote", conn.RemoteAddr().String())
return ret

View File

@@ -2,6 +2,7 @@ package rtsp
import (
"encoding/binary"
"m7s.live/m7s/v5/pkg/task"
"net"
"net/url"
"runtime"
@@ -26,7 +27,7 @@ func NewNetConnection(conn net.Conn) *NetConnection {
}
type NetConnection struct {
util.MarcoTask
task.MarcoTask
*util.BufReader
Backchannel bool
Media string

View File

@@ -19,8 +19,7 @@ func (r *StressPlugin) pull(count int, format, url string, puller m7s.Puller) (e
for j := i; j < count; j++ {
p := puller()
ctx := p.GetPullContext().Init(p, &r.Plugin, fmt.Sprintf("stress/%d", j), fmt.Sprintf(format, url))
err = r.Plugin.Server.AddPullTask(ctx).WaitStarted()
if err != nil {
if err = ctx.WaitStarted(); err != nil {
return
}
r.pullers.AddUnique(ctx)
@@ -42,8 +41,7 @@ func (r *StressPlugin) push(count int, streamPath, format, remoteHost string, pu
for j := i; j < count; j++ {
p := pusher()
ctx := p.GetPushContext().Init(p, &r.Plugin, streamPath, fmt.Sprintf(format, remoteHost, j))
err = r.Plugin.Server.AddPushTask(ctx).WaitStarted()
if err != nil {
if err = ctx.WaitStarted(); err != nil {
return
}
r.pushers.AddUnique(ctx)

View File

@@ -159,7 +159,6 @@ func (p *WebRTCPlugin) Pull(streamPath, url string) {
p.Error("pull", "error", err)
return
}
ctx := cfClient.GetPullContext().Init(cfClient, &p.Plugin, streamPath, url)
p.Server.AddPullTask(ctx)
cfClient.GetPullContext().Init(cfClient, &p.Plugin, streamPath, url)
}
}

View File

@@ -6,13 +6,14 @@ import (
"github.com/pion/rtp"
. "github.com/pion/webrtc/v3"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
mrtp "m7s.live/m7s/v5/plugin/rtp/pkg"
"time"
)
type Connection struct {
util.MarcoTask
task.MarcoTask
*PeerConnection
SDP string
// LocalSDP *sdp.SessionDescription

View File

@@ -2,6 +2,7 @@ package m7s
import (
"context"
"m7s.live/m7s/v5/pkg/task"
"math"
"os"
"path/filepath"
@@ -134,7 +135,7 @@ func (p *Publisher) GetKey() string {
// createPublisher -> Start -> WriteAudio/WriteVideo -> Dispose
func createPublisher(p *Plugin, streamPath string, conf config.Publish) (publisher *Publisher) {
publisher = &Publisher{Publish: conf}
publisher.ID = util.GetNextTaskID()
publisher.ID = task.GetNextTaskID()
publisher.Plugin = p
publisher.TimeoutTimer = time.NewTimer(p.config.PublishTimeout)
publisher.Logger = p.Logger.With("streamPath", streamPath, "pId", publisher.ID)
@@ -191,7 +192,7 @@ func (p *Publisher) Start() (err error) {
}
type PublishTimeout struct {
util.ChannelTask
task.ChannelTask
Publisher *Publisher
}
@@ -223,7 +224,7 @@ func (p *PublishTimeout) Tick(any) {
}
type PublishNoDataTimeout struct {
util.TickTask
task.TickTask
Publisher *Publisher
}
@@ -303,7 +304,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
}
}
p.lastTs = frame.Timestamp
if p.Enabled(p, util.TraceLevel) {
if p.Enabled(p, task.TraceLevel) {
codec := t.FourCC().String()
data := frame.Wraps[0].String()
p.Trace("write", "seq", frame.Sequence, "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data)

View File

@@ -4,7 +4,7 @@ import (
"io"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"net/http"
"net/url"
"os"
@@ -14,7 +14,7 @@ import (
type (
Connection struct {
util.MarcoTask
task.MarcoTask
Plugin *Plugin
StreamPath string // 对应本地流
RemoteURL string // 远程服务器地址(用于推拉)
@@ -22,7 +22,7 @@ type (
}
IPuller interface {
util.ITask
task.ITask
GetPullContext() *PullContext
}
@@ -37,7 +37,7 @@ type (
}
HttpFilePuller struct {
util.Task
task.Task
Ctx PullContext
io.ReadCloser
}
@@ -74,6 +74,7 @@ func (p *PullContext) Init(puller IPuller, plugin *Plugin, streamPath string, ur
}
p.puller = puller
puller.SetRetry(plugin.config.Pull.RePull, time.Second*5)
plugin.Server.Pulls.Add(p)
return p
}

View File

@@ -2,14 +2,14 @@ package m7s
import (
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"time"
"m7s.live/m7s/v5/pkg/config"
)
type IPusher interface {
util.ITask
task.ITask
GetPushContext() *PushContext
}
@@ -35,6 +35,7 @@ func (p *PushContext) Init(pusher IPusher, plugin *Plugin, streamPath string, ur
}
p.pusher = pusher
pusher.SetRetry(plugin.config.RePush, time.Second*5)
plugin.Server.Pushs.Add(p)
return p
}

View File

@@ -1,7 +1,7 @@
package m7s
import (
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/pkg/task"
"os"
"path/filepath"
"time"
@@ -11,12 +11,12 @@ import (
type (
IRecorder interface {
util.ITask
task.ITask
GetRecordContext() *RecordContext
}
Recorder = func() IRecorder
RecordContext struct {
util.MarcoTask
task.MarcoTask
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
@@ -26,7 +26,7 @@ type (
recorder IRecorder
}
DefaultRecorder struct {
util.Task
task.Task
Ctx RecordContext
}
)
@@ -59,6 +59,7 @@ func (p *RecordContext) Init(recorder IRecorder, plugin *Plugin, streamPath stri
recorderTask.Logger = p.Logger
}
p.recorder = recorder
plugin.Server.Records.Add(p)
return p
}
@@ -80,11 +81,6 @@ func (p *RecordContext) Start() (err error) {
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
s.Records.Add(p)
s.AddTask(p.recorder)
p.AddTask(p.recorder)
return
}
func (p *RecordContext) Dispose() {
p.Plugin.Server.Records.Remove(p)
}

View File

@@ -5,16 +5,18 @@ import (
"errors"
"fmt"
"log/slog"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
"net/http"
"os"
"os/signal"
"path/filepath"
"runtime/debug"
"strings"
"syscall"
"time"
"m7s.live/m7s/v5/pkg/config"
sysruntime "runtime"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
myip "github.com/husanpao/ip"
"github.com/phsym/console-slog"
@@ -26,7 +28,6 @@ import (
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/db"
"m7s.live/m7s/v5/pkg/util"
sysruntime "runtime"
)
var (
@@ -38,7 +39,7 @@ var (
Name: "Global",
Version: Version,
}
Servers util.Collection[uint32, *Server]
Servers task.RootManager[uint32, *Server]
Routes = map[string]string{}
defaultLogHandler = console.NewHandler(os.Stdout, &console.HandlerOptions{TimeFormat: "15:04:05.000000"})
)
@@ -68,11 +69,12 @@ type Server struct {
Plugin
ServerConfig
Plugins util.Collection[string, *Plugin]
Streams util.Collection[string, *Publisher]
Streams task.Manager[string, *Publisher]
Waiting util.Collection[string, *WaitStream]
Pulls util.Collection[string, *PullContext]
Pushs util.Collection[string, *PushContext]
Records util.Collection[string, *RecordContext]
Pulls task.Manager[string, *PullContext]
Pushs task.Manager[string, *PushContext]
Records task.Manager[string, *RecordContext]
Transforms task.Manager[string, *TransformContext]
Subscribers SubscriberCollection
LogHandler MultiLogHandler
apiList []string
@@ -80,7 +82,6 @@ type Server struct {
grpcClientConn *grpc.ClientConn
lastSummaryTime time.Time
lastSummary *pb.SummaryResponse
streamTask, pullTask, pushTask, recordTask util.MarcoLongTask
conf any
}
@@ -88,7 +89,7 @@ func NewServer(conf any) (s *Server) {
s = &Server{
conf: conf,
}
s.ID = util.GetNextTaskID()
s.ID = task.GetNextTaskID()
s.Meta = &serverMeta
s.Description = map[string]any{
"version": Version,
@@ -101,38 +102,18 @@ func NewServer(conf any) (s *Server) {
}
func Run(ctx context.Context, conf any) (err error) {
for err = ErrRestart; errors.Is(err, ErrRestart); err = util.RootTask.AddTaskWithContext(ctx, NewServer(conf)).WaitStopped() {
for err = ErrRestart; errors.Is(err, ErrRestart); err = Servers.AddTask(NewServer(conf), ctx).WaitStopped() {
}
return
}
func AddRootTask[T util.ITask](task T) T {
util.RootTask.AddTask(task)
return task
}
func AddRootTaskWithContext[T util.ITask](ctx context.Context, task T) T {
util.RootTask.AddTaskWithContext(ctx, task)
func AddRootTask[T task.ITask](task T, opt ...any) T {
Servers.AddTask(task, opt...)
return task
}
type RawConfig = map[string]map[string]any
type OSSignal struct {
util.ChannelTask
}
func (o *OSSignal) Start() error {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
o.SignalChan = signalChan
return nil
}
func (o *OSSignal) Tick(any) {
go util.ShutdownRootTask()
}
func exit() {
for _, meta := range plugins {
if meta.OnExit != nil {
@@ -146,8 +127,8 @@ func exit() {
}
func init() {
util.RootTask.AddTask(&OSSignal{})
util.RootTask.OnDispose(exit)
Servers.Init()
Servers.OnDispose(exit)
for k, v := range myip.LocalAndInternalIPs() {
Routes[k] = v
fmt.Println(k, v)
@@ -247,20 +228,20 @@ func (s *Server) Start() (err error) {
return
}
}
s.AddTask(&s.streamTask).Description = map[string]any{"ownerType": "Stream"}
s.AddTask(&s.pullTask).Description = map[string]any{"ownerType": "Pull"}
s.AddTask(&s.pushTask).Description = map[string]any{"ownerType": "Push"}
s.AddTask(&s.recordTask).Description = map[string]any{"ownerType": "Record"}
s.AddTaskLazy(&s.Records)
s.AddTaskLazy(&s.Streams)
s.AddTaskLazy(&s.Pulls)
s.AddTaskLazy(&s.Pushs)
s.AddTaskLazy(&s.Transforms)
for _, plugin := range plugins {
if p := plugin.Init(s, cg[strings.ToLower(plugin.Name)]); !p.Disabled {
s.AddTask(p.handler)
}
plugin.Init(s, cg[strings.ToLower(plugin.Name)])
}
if tcpTask != nil {
s.AddTask(&GRPCServer{Task: util.Task{Logger: s.Logger}, s: s, tcpTask: tcpTask})
s.AddTask(&GRPCServer{s: s, tcpTask: tcpTask}, s.Logger)
}
s.streamTask.AddTask(&CheckSubWaitTimeout{s: s})
Servers.Add(s)
s.Streams.OnStart(func() {
s.Streams.AddTask(&CheckSubWaitTimeout{s: s})
})
s.Info("server started")
s.Post(func() error {
for plugin := range s.Plugins.Range {
@@ -276,7 +257,7 @@ func (s *Server) Start() (err error) {
}
type CheckSubWaitTimeout struct {
util.TickTask
task.TickTask
s *Server
}
@@ -297,7 +278,7 @@ func (c *CheckSubWaitTimeout) Tick(any) {
}
type GRPCServer struct {
util.Task
task.Task
s *Server
tcpTask *config.ListenTCPTask
}
@@ -311,23 +292,10 @@ func (gRPC *GRPCServer) Go() (err error) {
}
func (s *Server) CallOnStreamTask(callback func() error) {
s.streamTask.Call(callback)
}
func (s *Server) AddPullTask(task *PullContext) *util.Task {
return s.pullTask.AddTask(task)
}
func (s *Server) AddPushTask(task *PushContext) *util.Task {
return s.pushTask.AddTask(task)
}
func (s *Server) AddRecordTask(task *RecordContext) *util.Task {
return s.recordTask.AddTask(task)
s.Streams.Call(callback)
}
func (s *Server) Dispose() {
Servers.Remove(s)
_ = s.grpcClientConn.Close()
if s.DB != nil {
db, err := s.DB.DB()

View File

@@ -2,6 +2,7 @@ package m7s
import (
"errors"
"m7s.live/m7s/v5/pkg/task"
"net/url"
"reflect"
"runtime"
@@ -16,7 +17,7 @@ import (
var AVFrameType = reflect.TypeOf((*AVFrame)(nil))
type PubSubBase struct {
util.MarcoTask
task.MarcoTask
Plugin *Plugin
StreamPath string
Args url.Values
@@ -63,7 +64,7 @@ type Subscriber struct {
func createSubscriber(p *Plugin, streamPath string, conf config.Subscribe) *Subscriber {
subscriber := &Subscriber{Subscribe: conf}
subscriber.ID = util.GetNextTaskID()
subscriber.ID = task.GetNextTaskID()
subscriber.Plugin = p
subscriber.TimeoutTimer = time.NewTimer(subscriber.WaitTimeout)
subscriber.Logger = p.Logger.With("streamPath", streamPath, "sId", subscriber.ID)
@@ -159,7 +160,7 @@ func (s *Subscriber) createVideoReader(dataType reflect.Type, startVideoTs time.
}
type SubscribeHandler[A any, V any] struct {
util.Task
task.Task
s *Subscriber
OnAudio func(A) error
OnVideo func(V) error
@@ -167,7 +168,7 @@ type SubscribeHandler[A any, V any] struct {
ProcessVideo chan func(*AVFrame)
}
func CreatePlayTask[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) util.ITask {
func CreatePlayTask[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) task.ITask {
var handler SubscribeHandler[A, V]
handler.s = s
handler.OnAudio = onAudio
@@ -211,7 +212,7 @@ func (handler *SubscribeHandler[A, V]) Start() (err error) {
sendAudioFrame := func() (err error) {
if awi >= 0 {
if len(audioFrame.Wraps) > awi {
if s.Enabled(s, util.TraceLevel) {
if s.Enabled(s, task.TraceLevel) {
s.Trace("send audio frame", "seq", audioFrame.Sequence)
}
err = handler.OnAudio(audioFrame.Wraps[awi].(A))
@@ -235,7 +236,7 @@ func (handler *SubscribeHandler[A, V]) Start() (err error) {
sendVideoFrame := func() (err error) {
if vwi >= 0 {
if len(videoFrame.Wraps) > vwi {
if s.Enabled(s, util.TraceLevel) {
if s.Enabled(s, task.TraceLevel) {
s.Trace("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wraps[vwi].String(), "size", videoFrame.Wraps[vwi].GetSize())
}
err = handler.OnVideo(videoFrame.Wraps[vwi].(V))

View File

@@ -2,19 +2,75 @@ package m7s
import (
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/task"
)
type Transformer struct {
*Publisher
*Subscriber
type (
ITransformer interface {
task.ITask
GetTransformContext() *TransformContext
}
Transformer = func() ITransformer
TransformContext struct {
task.MarcoTask
FromStreamPath string // 待转换的本地流
ToStreamPath string // 转换后的本地流
Plugin *Plugin
Publisher *Publisher
Subscriber *Subscriber
transformer ITransformer
}
DefaultTransformer struct {
task.Task
Ctx TransformContext
}
)
func (r *DefaultTransformer) GetTransformContext() *TransformContext {
return &r.Ctx
}
func (t *Transformer) Transform() {
PlayBlock(t.Subscriber, func(audioFrame *pkg.AVFrame) error {
//t.Publisher.WriteAudio()
return nil
}, func(videoFrame *pkg.AVFrame) error {
//t.Publisher.WriteVideo()
return nil
})
func (r *DefaultTransformer) Start() (err error) {
return r.Ctx.Subscribe()
}
func (p *TransformContext) GetKey() string {
return p.ToStreamPath
}
func (p *TransformContext) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.transformer.GetTask().Context, p.FromStreamPath)
return
}
func (p *TransformContext) Publish() (err error) {
p.Publisher, err = p.Plugin.Publish(p.transformer.GetTask().Context, p.ToStreamPath)
return
}
func (p *TransformContext) Init(transformer ITransformer, plugin *Plugin, fromStreamPath string, toStreamPath string) *TransformContext {
p.Plugin = plugin
p.FromStreamPath = fromStreamPath
p.ToStreamPath = toStreamPath
p.Logger = plugin.Logger.With("fromStreamPath", fromStreamPath, "toStreamPath", toStreamPath)
if recorderTask := transformer.GetTask(); recorderTask.Logger == nil {
recorderTask.Logger = p.Logger
}
p.transformer = transformer
plugin.Server.Transforms.Add(p)
return p
}
func (p *TransformContext) Start() (err error) {
s := p.Plugin.Server
if _, ok := s.Transforms.Get(p.GetKey()); ok {
return pkg.ErrRecordSamePath
}
s.Transforms.Add(p)
s.AddTask(p.transformer)
return
}
func (p *TransformContext) Dispose() {
p.Plugin.Server.Transforms.Remove(p)
}