diff --git a/example/default/main.go b/example/default/main.go index e161222..9597798 100644 --- a/example/default/main.go +++ b/example/default/main.go @@ -9,6 +9,7 @@ import ( _ "m7s.live/m7s/v5/plugin/flv" _ "m7s.live/m7s/v5/plugin/gb28181" _ "m7s.live/m7s/v5/plugin/logrotate" + _ "m7s.live/m7s/v5/plugin/monitor" _ "m7s.live/m7s/v5/plugin/mp4" _ "m7s.live/m7s/v5/plugin/preview" _ "m7s.live/m7s/v5/plugin/rtmp" diff --git a/pkg/util/task.go b/pkg/util/task.go index 0fd378a..ac477c6 100644 --- a/pkg/util/task.go +++ b/pkg/util/task.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "reflect" + "runtime/debug" "time" ) @@ -39,6 +40,7 @@ type ( OnDispose(func()) } IMarcoTask interface { + ITask RangeSubTask(func(yield ITask) bool) OnTaskAdded(func(ITask)) } @@ -187,6 +189,14 @@ func (task *Task) checkRetry(err error) (bool, error) { } func (task *Task) start() (err error) { + defer func() { + if r := recover(); r != nil { + err = errors.New(fmt.Sprint(r)) + if task.Logger != nil { + task.Error("panic", "error", err, "stack", string(debug.Stack())) + } + } + }() task.StartTime = time.Now() if task.Logger != nil { task.Debug("task start", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) diff --git a/plugin.go b/plugin.go index 043bcab..71e4b6c 100644 --- a/plugin.go +++ b/plugin.go @@ -419,19 +419,16 @@ func (p *Plugin) Subscribe(ctx context.Context, streamPath string) (subscriber * func (p *Plugin) Pull(streamPath string, url string) { puller := p.Meta.Puller() p.Server.AddPullTask(puller.GetPullContext().Init(puller, p, streamPath, url)) - return } -func (p *Plugin) Push(streamPath string, url string) (ctx *PushContext, err error) { +func (p *Plugin) Push(streamPath string, url string) { pusher := p.Meta.Pusher() p.Server.AddPushTask(pusher.GetPushContext().Init(pusher, p, streamPath, url)) - return } -func (p *Plugin) Record(streamPath string, filePath string) (ctx *RecordContext, err error) { - ctx = createRecoder(p, streamPath, filePath) - err = p.Server.recordTask.AddTask(ctx).WaitStarted() - return +func (p *Plugin) Record(streamPath string, filePath string) { + recorder := p.Meta.Recorder() + p.Server.AddRecordTask(recorder.GetRecordContext().Init(recorder, p, streamPath, filePath)) } func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) { diff --git a/plugin/flv/index.go b/plugin/flv/index.go index 6ae1f50..60f651a 100644 --- a/plugin/flv/index.go +++ b/plugin/flv/index.go @@ -28,7 +28,7 @@ func (plugin *FLVPlugin) OnInit() error { return nil } -var _ = m7s.InstallPlugin[FLVPlugin](defaultConfig, NewPuller, RecordFlv) +var _ = m7s.InstallPlugin[FLVPlugin](defaultConfig, NewPuller, NewRecorder) func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".flv") diff --git a/plugin/flv/pkg/pull.go b/plugin/flv/pkg/pull.go index 99c73ff..ce478fb 100644 --- a/plugin/flv/pkg/pull.go +++ b/plugin/flv/pkg/pull.go @@ -2,25 +2,13 @@ package flv import ( "errors" - "io" - "net/http" - "net/url" - "os" - "strings" - "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg/util" rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" ) type Puller struct { - util.Task - Ctx m7s.PullContext - *util.BufReader -} - -func (p *Puller) GetPullContext() *m7s.PullContext { - return &p.Ctx + m7s.HttpFilePuller } func NewPuller() m7s.IPuller { @@ -28,7 +16,8 @@ func NewPuller() m7s.IPuller { } func (p *Puller) Run() (err error) { - reader, publisher := p.BufReader, p.Ctx.Publisher + reader := util.NewBufReader(p.ReadCloser) + publisher := p.Ctx.Publisher var hasAudio, hasVideo bool var absTS uint32 var head util.Memory @@ -111,40 +100,3 @@ func (p *Puller) Run() (err error) { } return } - -func (p *Puller) Start() (err error) { - if err = p.Ctx.Publish(); err != nil { - return - } - remoteURL := p.Ctx.RemoteURL - if strings.HasPrefix(remoteURL, "http") { - var res *http.Response - client := http.DefaultClient - if proxyConf := p.Ctx.ConnectProxy; proxyConf != "" { - proxy, err := url.Parse(proxyConf) - if err != nil { - return err - } - transport := &http.Transport{Proxy: http.ProxyURL(proxy)} - client = &http.Client{Transport: transport} - } - if res, err = client.Get(remoteURL); err == nil { - if res.StatusCode != http.StatusOK { - return io.EOF - } - p.OnDispose(func() { - res.Body.Close() - }) - p.BufReader = util.NewBufReader(res.Body) - } - } else { - var res *os.File - if res, err = os.Open(remoteURL); err == nil { - p.OnDispose(func() { - res.Close() - }) - p.BufReader = util.NewBufReader(res) - } - } - return -} diff --git a/plugin/flv/pkg/record.go b/plugin/flv/pkg/record.go index 0b9c3a4..db48756 100644 --- a/plugin/flv/pkg/record.go +++ b/plugin/flv/pkg/record.go @@ -134,12 +134,21 @@ func writeMetaTag(file *os.File, suber *m7s.Subscriber, filepositions []uint64, writeMetaTagQueueTask.AddTask(task) } -func RecordFlv(ctx *m7s.RecordContext) (err error) { +func NewRecorder() m7s.IRecorder { + return &Recorder{} +} + +type Recorder struct { + m7s.DefaultRecorder +} + +func (r *Recorder) Run() (err error) { var file *os.File var filepositions []uint64 var times []float64 var offset int64 var duration int64 + ctx := &r.Ctx suber := ctx.Subscriber noFragment := ctx.Fragment == 0 || ctx.Append if noFragment { diff --git a/plugin/monitor/api.go b/plugin/monitor/api.go index dc5f597..b17927e 100644 --- a/plugin/monitor/api.go +++ b/plugin/monitor/api.go @@ -16,13 +16,15 @@ func (cfg *MonitorPlugin) SearchTask(ctx context.Context, req *pb.SearchTaskRequ res.Data = slices.Collect(func(yield func(*pb.Task) bool) { for _, t := range tasks { yield(&pb.Task{ - Id: t.ID, + Id: t.TaskID, StartTime: timestamppb.New(t.StartTime), - EndTime: timestamppb.New(t.CreatedAt), + EndTime: timestamppb.New(t.EndTime), Owner: t.OwnerType, Type: uint32(t.TaskType), Description: t.Description, Reason: t.Reason, + SessionId: t.SessionID, + ParentId: t.ParentID, }) } }) diff --git a/plugin/monitor/index.go b/plugin/monitor/index.go index 55db896..14f084b 100644 --- a/plugin/monitor/index.go +++ b/plugin/monitor/index.go @@ -5,10 +5,16 @@ import ( "m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/plugin/monitor/pb" monitor "m7s.live/m7s/v5/plugin/monitor/pkg" + "os" "time" ) var _ = m7s.InstallPlugin[MonitorPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler) +var sessionID uint32 + +func init() { + sessionID = uint32(os.Getpid()<<16) | uint32(uint16(time.Now().UnixNano())) +} type MonitorPlugin struct { pb.UnimplementedApiServer @@ -16,12 +22,14 @@ type MonitorPlugin struct { //columnstore *frostdb.ColumnStore } -func (cfg *MonitorPlugin) taskDisposeListener(task *util.Task) func() { +func (cfg *MonitorPlugin) taskDisposeListener(task *util.Task, mt util.IMarcoTask) func() { return func() { var th monitor.Task - th.ID = task.ID + th.SessionID = sessionID + th.TaskID = task.ID + th.ParentID = mt.GetTask().ID th.StartTime = task.StartTime - th.CreatedAt = time.Now() + th.EndTime = time.Now() th.OwnerType = task.GetOwnerType() th.TaskType = task.GetTaskTypeID() th.Reason = task.StopReason().Error() @@ -31,10 +39,10 @@ func (cfg *MonitorPlugin) taskDisposeListener(task *util.Task) func() { func (cfg *MonitorPlugin) monitorTask(mt util.IMarcoTask) { mt.OnTaskAdded(func(task util.ITask) { - task.GetTask().OnDispose(cfg.taskDisposeListener(task.GetTask())) + task.GetTask().OnDispose(cfg.taskDisposeListener(task.GetTask(), mt)) }) for t := range mt.RangeSubTask { - t.OnDispose(cfg.taskDisposeListener(t.GetTask())) + t.OnDispose(cfg.taskDisposeListener(t.GetTask(), mt)) if mt, ok := t.(util.IMarcoTask); ok { cfg.monitorTask(mt) } diff --git a/plugin/monitor/pb/monitor.pb.go b/plugin/monitor/pb/monitor.pb.go index 4f7519c..3b98913 100644 --- a/plugin/monitor/pb/monitor.pb.go +++ b/plugin/monitor/pb/monitor.pb.go @@ -72,6 +72,8 @@ type Task struct { EndTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=endTime,proto3" json:"endTime,omitempty"` Description string `protobuf:"bytes,6,opt,name=description,proto3" json:"description,omitempty"` Reason string `protobuf:"bytes,7,opt,name=reason,proto3" json:"reason,omitempty"` + SessionId uint32 `protobuf:"varint,8,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + ParentId uint32 `protobuf:"varint,9,opt,name=parentId,proto3" json:"parentId,omitempty"` } func (x *Task) Reset() { @@ -155,6 +157,20 @@ func (x *Task) GetReason() string { return "" } +func (x *Task) GetSessionId() uint32 { + if x != nil { + return x.SessionId + } + return 0 +} + +func (x *Task) GetParentId() uint32 { + if x != nil { + return x.ParentId + } + return 0 +} + type SearchTaskResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -227,7 +243,7 @@ var file_monitor_proto_rawDesc = []byte{ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x13, 0x0a, 0x11, 0x53, 0x65, 0x61, 0x72, 0x63, - 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xea, 0x01, 0x0a, + 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xa4, 0x02, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, @@ -242,23 +258,27 @@ var file_monitor_proto_rawDesc = []byte{ 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0x65, 0x0a, 0x12, 0x53, 0x65, 0x61, - 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x63, - 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x6f, - 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x32, 0x71, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x6a, 0x0a, 0x0a, 0x53, 0x65, 0x61, 0x72, 0x63, - 0x68, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1a, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, - 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1b, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, - 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, - 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x22, 0x18, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, - 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x74, 0x61, 0x73, 0x6b, - 0x3a, 0x01, 0x2a, 0x42, 0x23, 0x5a, 0x21, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, - 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6d, 0x6f, - 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, + 0x74, 0x49, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, + 0x74, 0x49, 0x64, 0x22, 0x65, 0x0a, 0x12, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, + 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, + 0x54, 0x61, 0x73, 0x6b, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x71, 0x0a, 0x03, 0x61, 0x70, + 0x69, 0x12, 0x6a, 0x0a, 0x0a, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x12, + 0x1a, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, + 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x6d, 0x6f, + 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, + 0x22, 0x18, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, + 0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x3a, 0x01, 0x2a, 0x42, 0x23, 0x5a, + 0x21, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, + 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, + 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/plugin/monitor/pb/monitor.proto b/plugin/monitor/pb/monitor.proto index 7aabedb..f25d583 100644 --- a/plugin/monitor/pb/monitor.proto +++ b/plugin/monitor/pb/monitor.proto @@ -25,6 +25,8 @@ message Task { google.protobuf.Timestamp endTime = 5; string description = 6; string reason = 7; + uint32 sessionId = 8; + uint32 parentId = 9; } message SearchTaskResponse { diff --git a/plugin/monitor/pkg/schema-task.go b/plugin/monitor/pkg/schema-task.go index c975e93..3c4f1eb 100644 --- a/plugin/monitor/pkg/schema-task.go +++ b/plugin/monitor/pkg/schema-task.go @@ -5,15 +5,11 @@ import ( ) type Task struct { - ID uint32 `gorm:"primarykey"` - CreatedAt time.Time - StartTime time.Time - OwnerType string - TaskType byte - Description string - Reason string -} - -func (i *Task) GetKey() uint32 { - return i.ID + ID uint `gorm:"primarykey"` + SessionID, TaskID, ParentID uint32 + StartTime, EndTime time.Time + OwnerType string + TaskType byte + Description string + Reason string } diff --git a/plugin/mp4/index.go b/plugin/mp4/index.go index 6cadb4d..34b02c3 100644 --- a/plugin/mp4/index.go +++ b/plugin/mp4/index.go @@ -81,7 +81,7 @@ func (p *MP4Plugin) OnInit() error { return nil } -var _ = m7s.InstallPlugin[MP4Plugin](defaultConfig, pkg.PullMP4, pkg.RecordMP4) +var _ = m7s.InstallPlugin[MP4Plugin](defaultConfig, pkg.NewPuller, pkg.NewRecorder) func (p *MP4Plugin) GetPullableList() []string { return slices.Collect(maps.Keys(p.GetCommonConf().PullOnSub)) diff --git a/plugin/mp4/pkg/pull.go b/plugin/mp4/pkg/pull.go index 9de2ee4..cc6a8d0 100644 --- a/plugin/mp4/pkg/pull.go +++ b/plugin/mp4/pkg/pull.go @@ -8,44 +8,28 @@ import ( "m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/plugin/mp4/pkg/box" rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" - "net/http" - "net/url" - "os" "strings" ) -func PullMP4(ctx *m7s.PullContext) (err error) { - var demuxer *box.MovDemuxer - if strings.HasPrefix(ctx.RemoteURL, "http") { - var res *http.Response - client := http.DefaultClient - if proxyConf := ctx.ConnectProxy; proxyConf != "" { - proxy, err := url.Parse(proxyConf) - if err != nil { - return err - } - transport := &http.Transport{Proxy: http.ProxyURL(proxy)} - client = &http.Client{Transport: transport} - } - if res, err = client.Get(ctx.RemoteURL); err == nil { - if res.StatusCode != http.StatusOK { - return io.EOF - } - defer res.Body.Close() - content, err := io.ReadAll(res.Body) - if err != nil { - return err - } - demuxer = box.CreateMp4Demuxer(strings.NewReader(string(content))) - } - } else { - var res *os.File - if res, err = os.Open(ctx.RemoteURL); err == nil { - defer res.Close() - } - demuxer = box.CreateMp4Demuxer(res) - } +type Puller struct { + m7s.HttpFilePuller +} +func NewPuller() m7s.IPuller { + return &Puller{} +} + +func (p *Puller) Run() (err error) { + ctx := &p.Ctx + var demuxer *box.MovDemuxer + switch v := p.ReadCloser.(type) { + case io.ReadSeeker: + demuxer = box.CreateMp4Demuxer(v) + default: + var content []byte + content, err = io.ReadAll(p.ReadCloser) + demuxer = box.CreateMp4Demuxer(strings.NewReader(string(content))) + } var tracks []box.TrackInfo if tracks, err = demuxer.ReadHead(); err != nil { return diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index e98d256..99c2096 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -4,12 +4,48 @@ import ( "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/plugin/mp4/pkg/box" "os" "time" ) -func RecordMP4(ctx *m7s.RecordContext) (err error) { +type WriteTrailerQueueTask struct { + util.MarcoLongTask +} + +var writeTrailerQueueTask WriteTrailerQueueTask + +func init() { + m7s.AddRootTask(&writeTrailerQueueTask) +} + +func NewRecorder() *Recorder { + return &Recorder{} +} + +type Recorder struct { + m7s.DefaultRecorder +} + +type writeTrailerTask struct { + util.Task + muxer *box.Movmuxer + file *os.File +} + +func (task *writeTrailerTask) Start() (err error) { + err = task.muxer.WriteTrailer() + if err != nil { + task.Error("write trailer", "err", err) + } else { + task.Info("write trailer") + } + return task.file.Close() +} + +func (r *Recorder) Run() (err error) { + ctx := &r.Ctx var file *os.File var muxer *box.Movmuxer var audioId, videoId uint32 @@ -17,16 +53,13 @@ func RecordMP4(ctx *m7s.RecordContext) (err error) { if file, err = os.OpenFile(ctx.FilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666); err != nil { return } - defer func() { - err = muxer.WriteTrailer() - if err != nil { - ctx.Error("write trailer", "err", err) - } else { - ctx.Info("write trailer") - } - err = file.Close() - }() muxer, err = box.CreateMp4Muxer(file) + task := &writeTrailerTask{ + file: file, + muxer: muxer, + } + task.Logger = r.Logger + defer writeTrailerQueueTask.AddTask(task) ar, vr := ctx.Subscriber.AudioReader, ctx.Subscriber.VideoReader if ar != nil { audioTrack := ar.Track diff --git a/plugin/rtmp/api.go b/plugin/rtmp/api.go index 9b3ba85..bbb6412 100644 --- a/plugin/rtmp/api.go +++ b/plugin/rtmp/api.go @@ -4,9 +4,11 @@ import ( "context" gpb "m7s.live/m7s/v5/pb" "m7s.live/m7s/v5/plugin/rtmp/pb" + rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" ) func (r *RTMPPlugin) PushOut(ctx context.Context, req *pb.PushRequest) (res *gpb.SuccessResponse, err error) { - _, err = r.Push(req.StreamPath, req.RemoteURL) + pusher := rtmp.NewPusher() + err = r.Server.AddPushTask(pusher.GetPushContext().Init(pusher, &r.Plugin, req.StreamPath, req.RemoteURL)).WaitStarted() return &gpb.SuccessResponse{}, err } diff --git a/publisher.go b/publisher.go index 800d8de..f5cb582 100644 --- a/publisher.go +++ b/publisher.go @@ -503,6 +503,7 @@ func (p *Publisher) Dispose() { w.baseTs = p.lastTs w.Info("takeOver", "pId", p.ID) for subscriber := range p.SubscriberRange { + subscriber.Publisher = nil w.Add(subscriber) } p.AudioTrack.Dispose() diff --git a/puller.go b/puller.go index 11f6091..b052825 100644 --- a/puller.go +++ b/puller.go @@ -1,34 +1,47 @@ package m7s import ( + "io" "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/util" + "net/http" + "net/url" + "os" + "strings" "time" ) -type Connection struct { - util.MarcoTask - Plugin *Plugin - StreamPath string // 对应本地流 - RemoteURL string // 远程服务器地址(用于推拉) - ConnectProxy string // 连接代理 -} +type ( + Connection struct { + util.MarcoTask + Plugin *Plugin + StreamPath string // 对应本地流 + RemoteURL string // 远程服务器地址(用于推拉) + ConnectProxy string // 连接代理 + } -type IPuller interface { - util.ITask - GetPullContext() *PullContext -} + IPuller interface { + util.ITask + GetPullContext() *PullContext + } -type Puller = func() IPuller + Puller = func() IPuller -type PullContext struct { - Connection - Publisher *Publisher - publishConfig *config.Publish - config.Pull - puller IPuller -} + PullContext struct { + Connection + Publisher *Publisher + publishConfig *config.Publish + config.Pull + puller IPuller + } + + HttpFilePuller struct { + util.Task + Ctx PullContext + io.ReadCloser + } +) func (p *PullContext) GetPullContext() *PullContext { return p @@ -71,3 +84,42 @@ func (p *PullContext) Start() (err error) { func (p *PullContext) Dispose() { p.Plugin.Server.Pulls.Remove(p) } + +func (p *HttpFilePuller) Start() (err error) { + if err = p.Ctx.Publish(); err != nil { + return + } + remoteURL := p.Ctx.RemoteURL + if strings.HasPrefix(remoteURL, "http") { + var res *http.Response + client := http.DefaultClient + if proxyConf := p.Ctx.ConnectProxy; proxyConf != "" { + proxy, err := url.Parse(proxyConf) + if err != nil { + return err + } + transport := &http.Transport{Proxy: http.ProxyURL(proxy)} + client = &http.Client{Transport: transport} + } + if res, err = client.Get(remoteURL); err == nil { + if res.StatusCode != http.StatusOK { + return io.EOF + } + p.ReadCloser = res.Body + } + } else { + var res *os.File + if res, err = os.Open(remoteURL); err == nil { + p.ReadCloser = res + } + } + return +} + +func (p *HttpFilePuller) GetPullContext() *PullContext { + return &p.Ctx +} + +func (p *HttpFilePuller) Dispose() { + p.ReadCloser.Close() +} diff --git a/recoder.go b/recoder.go index ecd3163..1334f4b 100644 --- a/recoder.go +++ b/recoder.go @@ -9,68 +9,54 @@ import ( "m7s.live/m7s/v5/pkg" ) -type Recorder = func(*RecordContext) error - -func createRecoder(p *Plugin, streamPath string, filePath string) (recorder *RecordContext) { - recorder = &RecordContext{ - Plugin: p, - Fragment: p.config.Record.Fragment, - Append: p.config.Record.Append, - FilePath: filePath, - StreamPath: streamPath, +type ( + IRecorder interface { + util.ITask + GetRecordContext() *RecordContext } - recorder.Logger = p.Logger.With("filePath", filePath, "streamPath", streamPath) - return + Recorder = func() IRecorder + RecordContext struct { + util.MarcoTask + StreamPath string // 对应本地流 + Plugin *Plugin + Subscriber *Subscriber + Fragment time.Duration + Append bool + FilePath string + recorder IRecorder + } + DefaultRecorder struct { + util.Task + Ctx RecordContext + } +) + +func (r *DefaultRecorder) GetRecordContext() *RecordContext { + return &r.Ctx } -type RecordContext struct { - util.MarcoTask - StreamPath string // 对应本地流 - Plugin *Plugin - Subscriber *Subscriber - Fragment time.Duration - Append bool - FilePath string +func (r *DefaultRecorder) Start() (err error) { + return r.Ctx.Subscribe() } func (p *RecordContext) GetKey() string { return p.FilePath } -type recordSubTask struct { - util.Task - ctx *RecordContext - Recorder -} - -func (r *recordSubTask) Start() (err error) { - p := r.ctx - dir := p.FilePath - if p.Fragment == 0 || p.Append { - if filepath.Ext(p.FilePath) == "" { - p.FilePath += ".flv" - } - dir = filepath.Dir(p.FilePath) - } - r.Description = map[string]any{ - "filePath": p.FilePath, - } - if err = os.MkdirAll(dir, 0755); err != nil { - return - } - p.Subscriber, err = p.Plugin.Subscribe(r.Context, p.StreamPath) - if err != nil { - return - } - err = r.Recorder(p) +func (p *RecordContext) Subscribe() (err error) { + p.Subscriber, err = p.Plugin.Subscribe(p.recorder.GetTask().Context, p.StreamPath) return } -func (p *RecordContext) Do(recorder Recorder) { - p.AddTask(&recordSubTask{ - ctx: p, - Recorder: recorder, - }) +func (p *RecordContext) Init(recorder IRecorder, plugin *Plugin, streamPath string, filePath string) *RecordContext { + p.Plugin = plugin + p.Fragment = plugin.config.Record.Fragment + p.Append = plugin.config.Record.Append + p.FilePath = filePath + p.StreamPath = streamPath + p.Logger = plugin.Logger.With("filePath", filePath, "streamPath", streamPath) + p.recorder = recorder + return p } func (p *RecordContext) Start() (err error) { @@ -78,10 +64,21 @@ func (p *RecordContext) Start() (err error) { if _, ok := s.Records.Get(p.GetKey()); ok { return pkg.ErrRecordSamePath } - s.Records.Add(p) - if p.Plugin.Meta.Recorder != nil { - p.Do(p.Plugin.Meta.Recorder) + dir := p.FilePath + if p.Fragment == 0 || p.Append { + if filepath.Ext(p.FilePath) == "" { + p.FilePath += ".flv" + } + dir = filepath.Dir(p.FilePath) } + p.Description = map[string]any{ + "filePath": p.FilePath, + } + if err = os.MkdirAll(dir, 0755); err != nil { + return + } + s.Records.Add(p) + s.AddTask(p.recorder) return } diff --git a/server.go b/server.go index 4d4cab7..9c6c041 100644 --- a/server.go +++ b/server.go @@ -275,6 +275,10 @@ func (s *Server) AddPushTask(task *PushContext) *util.Task { return s.pushTask.AddTask(task) } +func (s *Server) AddRecordTask(task *RecordContext) *util.Task { + return s.recordTask.AddTask(task) +} + func (s *Server) Dispose() { Servers.Remove(s) _ = s.tcplis.Close() diff --git a/subscriber.go b/subscriber.go index 48b0e36..d66d957 100644 --- a/subscriber.go +++ b/subscriber.go @@ -258,7 +258,11 @@ func (handler *SubscribeHandler[A, V]) Start() (err error) { } checkPublisherChange := func() { if prePublisher != s.Publisher { - s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID) + if s.Publisher == nil { + s.Info("publisher gone", "prePublisher", prePublisher.ID) + } else { + s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID) + } if s.AudioReader != nil { startAudioTs = time.Duration(s.AudioReader.AbsTime) * time.Millisecond s.AudioReader.StopRead()