fix: publish gone

This commit is contained in:
langhuihui
2024-08-19 12:18:26 +08:00
parent 3d0609e4a8
commit 807faa0536
20 changed files with 289 additions and 215 deletions

View File

@@ -9,6 +9,7 @@ import (
_ "m7s.live/m7s/v5/plugin/flv"
_ "m7s.live/m7s/v5/plugin/gb28181"
_ "m7s.live/m7s/v5/plugin/logrotate"
_ "m7s.live/m7s/v5/plugin/monitor"
_ "m7s.live/m7s/v5/plugin/mp4"
_ "m7s.live/m7s/v5/plugin/preview"
_ "m7s.live/m7s/v5/plugin/rtmp"

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"reflect"
"runtime/debug"
"time"
)
@@ -39,6 +40,7 @@ type (
OnDispose(func())
}
IMarcoTask interface {
ITask
RangeSubTask(func(yield ITask) bool)
OnTaskAdded(func(ITask))
}
@@ -187,6 +189,14 @@ func (task *Task) checkRetry(err error) (bool, error) {
}
func (task *Task) start() (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.New(fmt.Sprint(r))
if task.Logger != nil {
task.Error("panic", "error", err, "stack", string(debug.Stack()))
}
}
}()
task.StartTime = time.Now()
if task.Logger != nil {
task.Debug("task start", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())

View File

@@ -419,19 +419,16 @@ func (p *Plugin) Subscribe(ctx context.Context, streamPath string) (subscriber *
func (p *Plugin) Pull(streamPath string, url string) {
puller := p.Meta.Puller()
p.Server.AddPullTask(puller.GetPullContext().Init(puller, p, streamPath, url))
return
}
func (p *Plugin) Push(streamPath string, url string) (ctx *PushContext, err error) {
func (p *Plugin) Push(streamPath string, url string) {
pusher := p.Meta.Pusher()
p.Server.AddPushTask(pusher.GetPushContext().Init(pusher, p, streamPath, url))
return
}
func (p *Plugin) Record(streamPath string, filePath string) (ctx *RecordContext, err error) {
ctx = createRecoder(p, streamPath, filePath)
err = p.Server.recordTask.AddTask(ctx).WaitStarted()
return
func (p *Plugin) Record(streamPath string, filePath string) {
recorder := p.Meta.Recorder()
p.Server.AddRecordTask(recorder.GetRecordContext().Init(recorder, p, streamPath, filePath))
}
func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) {

View File

@@ -28,7 +28,7 @@ func (plugin *FLVPlugin) OnInit() error {
return nil
}
var _ = m7s.InstallPlugin[FLVPlugin](defaultConfig, NewPuller, RecordFlv)
var _ = m7s.InstallPlugin[FLVPlugin](defaultConfig, NewPuller, NewRecorder)
func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".flv")

View File

@@ -2,25 +2,13 @@ package flv
import (
"errors"
"io"
"net/http"
"net/url"
"os"
"strings"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
type Puller struct {
util.Task
Ctx m7s.PullContext
*util.BufReader
}
func (p *Puller) GetPullContext() *m7s.PullContext {
return &p.Ctx
m7s.HttpFilePuller
}
func NewPuller() m7s.IPuller {
@@ -28,7 +16,8 @@ func NewPuller() m7s.IPuller {
}
func (p *Puller) Run() (err error) {
reader, publisher := p.BufReader, p.Ctx.Publisher
reader := util.NewBufReader(p.ReadCloser)
publisher := p.Ctx.Publisher
var hasAudio, hasVideo bool
var absTS uint32
var head util.Memory
@@ -111,40 +100,3 @@ func (p *Puller) Run() (err error) {
}
return
}
func (p *Puller) Start() (err error) {
if err = p.Ctx.Publish(); err != nil {
return
}
remoteURL := p.Ctx.RemoteURL
if strings.HasPrefix(remoteURL, "http") {
var res *http.Response
client := http.DefaultClient
if proxyConf := p.Ctx.ConnectProxy; proxyConf != "" {
proxy, err := url.Parse(proxyConf)
if err != nil {
return err
}
transport := &http.Transport{Proxy: http.ProxyURL(proxy)}
client = &http.Client{Transport: transport}
}
if res, err = client.Get(remoteURL); err == nil {
if res.StatusCode != http.StatusOK {
return io.EOF
}
p.OnDispose(func() {
res.Body.Close()
})
p.BufReader = util.NewBufReader(res.Body)
}
} else {
var res *os.File
if res, err = os.Open(remoteURL); err == nil {
p.OnDispose(func() {
res.Close()
})
p.BufReader = util.NewBufReader(res)
}
}
return
}

View File

@@ -134,12 +134,21 @@ func writeMetaTag(file *os.File, suber *m7s.Subscriber, filepositions []uint64,
writeMetaTagQueueTask.AddTask(task)
}
func RecordFlv(ctx *m7s.RecordContext) (err error) {
func NewRecorder() m7s.IRecorder {
return &Recorder{}
}
type Recorder struct {
m7s.DefaultRecorder
}
func (r *Recorder) Run() (err error) {
var file *os.File
var filepositions []uint64
var times []float64
var offset int64
var duration int64
ctx := &r.Ctx
suber := ctx.Subscriber
noFragment := ctx.Fragment == 0 || ctx.Append
if noFragment {

View File

@@ -16,13 +16,15 @@ func (cfg *MonitorPlugin) SearchTask(ctx context.Context, req *pb.SearchTaskRequ
res.Data = slices.Collect(func(yield func(*pb.Task) bool) {
for _, t := range tasks {
yield(&pb.Task{
Id: t.ID,
Id: t.TaskID,
StartTime: timestamppb.New(t.StartTime),
EndTime: timestamppb.New(t.CreatedAt),
EndTime: timestamppb.New(t.EndTime),
Owner: t.OwnerType,
Type: uint32(t.TaskType),
Description: t.Description,
Reason: t.Reason,
SessionId: t.SessionID,
ParentId: t.ParentID,
})
}
})

View File

@@ -5,10 +5,16 @@ import (
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/monitor/pb"
monitor "m7s.live/m7s/v5/plugin/monitor/pkg"
"os"
"time"
)
var _ = m7s.InstallPlugin[MonitorPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler)
var sessionID uint32
func init() {
sessionID = uint32(os.Getpid()<<16) | uint32(uint16(time.Now().UnixNano()))
}
type MonitorPlugin struct {
pb.UnimplementedApiServer
@@ -16,12 +22,14 @@ type MonitorPlugin struct {
//columnstore *frostdb.ColumnStore
}
func (cfg *MonitorPlugin) taskDisposeListener(task *util.Task) func() {
func (cfg *MonitorPlugin) taskDisposeListener(task *util.Task, mt util.IMarcoTask) func() {
return func() {
var th monitor.Task
th.ID = task.ID
th.SessionID = sessionID
th.TaskID = task.ID
th.ParentID = mt.GetTask().ID
th.StartTime = task.StartTime
th.CreatedAt = time.Now()
th.EndTime = time.Now()
th.OwnerType = task.GetOwnerType()
th.TaskType = task.GetTaskTypeID()
th.Reason = task.StopReason().Error()
@@ -31,10 +39,10 @@ func (cfg *MonitorPlugin) taskDisposeListener(task *util.Task) func() {
func (cfg *MonitorPlugin) monitorTask(mt util.IMarcoTask) {
mt.OnTaskAdded(func(task util.ITask) {
task.GetTask().OnDispose(cfg.taskDisposeListener(task.GetTask()))
task.GetTask().OnDispose(cfg.taskDisposeListener(task.GetTask(), mt))
})
for t := range mt.RangeSubTask {
t.OnDispose(cfg.taskDisposeListener(t.GetTask()))
t.OnDispose(cfg.taskDisposeListener(t.GetTask(), mt))
if mt, ok := t.(util.IMarcoTask); ok {
cfg.monitorTask(mt)
}

View File

@@ -72,6 +72,8 @@ type Task struct {
EndTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=endTime,proto3" json:"endTime,omitempty"`
Description string `protobuf:"bytes,6,opt,name=description,proto3" json:"description,omitempty"`
Reason string `protobuf:"bytes,7,opt,name=reason,proto3" json:"reason,omitempty"`
SessionId uint32 `protobuf:"varint,8,opt,name=sessionId,proto3" json:"sessionId,omitempty"`
ParentId uint32 `protobuf:"varint,9,opt,name=parentId,proto3" json:"parentId,omitempty"`
}
func (x *Task) Reset() {
@@ -155,6 +157,20 @@ func (x *Task) GetReason() string {
return ""
}
func (x *Task) GetSessionId() uint32 {
if x != nil {
return x.SessionId
}
return 0
}
func (x *Task) GetParentId() uint32 {
if x != nil {
return x.ParentId
}
return 0
}
type SearchTaskResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -227,7 +243,7 @@ var file_monitor_proto_rawDesc = []byte{
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d,
0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x13, 0x0a, 0x11, 0x53, 0x65, 0x61, 0x72, 0x63,
0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xea, 0x01, 0x0a,
0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xa4, 0x02, 0x0a,
0x04, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74,
@@ -242,23 +258,27 @@ var file_monitor_proto_rawDesc = []byte{
0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x06,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f,
0x6e, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28,
0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0x65, 0x0a, 0x12, 0x53, 0x65, 0x61,
0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x63,
0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a,
0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x6f,
0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
0x32, 0x71, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x6a, 0x0a, 0x0a, 0x53, 0x65, 0x61, 0x72, 0x63,
0x68, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1a, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e,
0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x1b, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72,
0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23,
0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x22, 0x18, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72,
0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x74, 0x61, 0x73, 0x6b,
0x3a, 0x01, 0x2a, 0x42, 0x23, 0x5a, 0x21, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f,
0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6d, 0x6f,
0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x73, 0x65,
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e,
0x74, 0x49, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e,
0x74, 0x49, 0x64, 0x22, 0x65, 0x0a, 0x12, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73,
0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a,
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18,
0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e,
0x54, 0x61, 0x73, 0x6b, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x71, 0x0a, 0x03, 0x61, 0x70,
0x69, 0x12, 0x6a, 0x0a, 0x0a, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b, 0x12,
0x1a, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68,
0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x6d, 0x6f,
0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x54, 0x61, 0x73, 0x6b,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d,
0x22, 0x18, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73,
0x65, 0x61, 0x72, 0x63, 0x68, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x3a, 0x01, 0x2a, 0x42, 0x23, 0x5a,
0x21, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35,
0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f,
0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@@ -25,6 +25,8 @@ message Task {
google.protobuf.Timestamp endTime = 5;
string description = 6;
string reason = 7;
uint32 sessionId = 8;
uint32 parentId = 9;
}
message SearchTaskResponse {

View File

@@ -5,15 +5,11 @@ import (
)
type Task struct {
ID uint32 `gorm:"primarykey"`
CreatedAt time.Time
StartTime time.Time
OwnerType string
TaskType byte
Description string
Reason string
}
func (i *Task) GetKey() uint32 {
return i.ID
ID uint `gorm:"primarykey"`
SessionID, TaskID, ParentID uint32
StartTime, EndTime time.Time
OwnerType string
TaskType byte
Description string
Reason string
}

View File

@@ -81,7 +81,7 @@ func (p *MP4Plugin) OnInit() error {
return nil
}
var _ = m7s.InstallPlugin[MP4Plugin](defaultConfig, pkg.PullMP4, pkg.RecordMP4)
var _ = m7s.InstallPlugin[MP4Plugin](defaultConfig, pkg.NewPuller, pkg.NewRecorder)
func (p *MP4Plugin) GetPullableList() []string {
return slices.Collect(maps.Keys(p.GetCommonConf().PullOnSub))

View File

@@ -8,44 +8,28 @@ import (
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/mp4/pkg/box"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
"net/http"
"net/url"
"os"
"strings"
)
func PullMP4(ctx *m7s.PullContext) (err error) {
var demuxer *box.MovDemuxer
if strings.HasPrefix(ctx.RemoteURL, "http") {
var res *http.Response
client := http.DefaultClient
if proxyConf := ctx.ConnectProxy; proxyConf != "" {
proxy, err := url.Parse(proxyConf)
if err != nil {
return err
}
transport := &http.Transport{Proxy: http.ProxyURL(proxy)}
client = &http.Client{Transport: transport}
}
if res, err = client.Get(ctx.RemoteURL); err == nil {
if res.StatusCode != http.StatusOK {
return io.EOF
}
defer res.Body.Close()
content, err := io.ReadAll(res.Body)
if err != nil {
return err
}
demuxer = box.CreateMp4Demuxer(strings.NewReader(string(content)))
}
} else {
var res *os.File
if res, err = os.Open(ctx.RemoteURL); err == nil {
defer res.Close()
}
demuxer = box.CreateMp4Demuxer(res)
}
type Puller struct {
m7s.HttpFilePuller
}
func NewPuller() m7s.IPuller {
return &Puller{}
}
func (p *Puller) Run() (err error) {
ctx := &p.Ctx
var demuxer *box.MovDemuxer
switch v := p.ReadCloser.(type) {
case io.ReadSeeker:
demuxer = box.CreateMp4Demuxer(v)
default:
var content []byte
content, err = io.ReadAll(p.ReadCloser)
demuxer = box.CreateMp4Demuxer(strings.NewReader(string(content)))
}
var tracks []box.TrackInfo
if tracks, err = demuxer.ReadHead(); err != nil {
return

View File

@@ -4,12 +4,48 @@ import (
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/mp4/pkg/box"
"os"
"time"
)
func RecordMP4(ctx *m7s.RecordContext) (err error) {
type WriteTrailerQueueTask struct {
util.MarcoLongTask
}
var writeTrailerQueueTask WriteTrailerQueueTask
func init() {
m7s.AddRootTask(&writeTrailerQueueTask)
}
func NewRecorder() *Recorder {
return &Recorder{}
}
type Recorder struct {
m7s.DefaultRecorder
}
type writeTrailerTask struct {
util.Task
muxer *box.Movmuxer
file *os.File
}
func (task *writeTrailerTask) Start() (err error) {
err = task.muxer.WriteTrailer()
if err != nil {
task.Error("write trailer", "err", err)
} else {
task.Info("write trailer")
}
return task.file.Close()
}
func (r *Recorder) Run() (err error) {
ctx := &r.Ctx
var file *os.File
var muxer *box.Movmuxer
var audioId, videoId uint32
@@ -17,16 +53,13 @@ func RecordMP4(ctx *m7s.RecordContext) (err error) {
if file, err = os.OpenFile(ctx.FilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666); err != nil {
return
}
defer func() {
err = muxer.WriteTrailer()
if err != nil {
ctx.Error("write trailer", "err", err)
} else {
ctx.Info("write trailer")
}
err = file.Close()
}()
muxer, err = box.CreateMp4Muxer(file)
task := &writeTrailerTask{
file: file,
muxer: muxer,
}
task.Logger = r.Logger
defer writeTrailerQueueTask.AddTask(task)
ar, vr := ctx.Subscriber.AudioReader, ctx.Subscriber.VideoReader
if ar != nil {
audioTrack := ar.Track

View File

@@ -4,9 +4,11 @@ import (
"context"
gpb "m7s.live/m7s/v5/pb"
"m7s.live/m7s/v5/plugin/rtmp/pb"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
func (r *RTMPPlugin) PushOut(ctx context.Context, req *pb.PushRequest) (res *gpb.SuccessResponse, err error) {
_, err = r.Push(req.StreamPath, req.RemoteURL)
pusher := rtmp.NewPusher()
err = r.Server.AddPushTask(pusher.GetPushContext().Init(pusher, &r.Plugin, req.StreamPath, req.RemoteURL)).WaitStarted()
return &gpb.SuccessResponse{}, err
}

View File

@@ -503,6 +503,7 @@ func (p *Publisher) Dispose() {
w.baseTs = p.lastTs
w.Info("takeOver", "pId", p.ID)
for subscriber := range p.SubscriberRange {
subscriber.Publisher = nil
w.Add(subscriber)
}
p.AudioTrack.Dispose()

View File

@@ -1,34 +1,47 @@
package m7s
import (
"io"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
"net/http"
"net/url"
"os"
"strings"
"time"
)
type Connection struct {
util.MarcoTask
Plugin *Plugin
StreamPath string // 对应本地流
RemoteURL string // 远程服务器地址(用于推拉)
ConnectProxy string // 连接代理
}
type (
Connection struct {
util.MarcoTask
Plugin *Plugin
StreamPath string // 对应本地流
RemoteURL string // 远程服务器地址(用于推拉)
ConnectProxy string // 连接代理
}
type IPuller interface {
util.ITask
GetPullContext() *PullContext
}
IPuller interface {
util.ITask
GetPullContext() *PullContext
}
type Puller = func() IPuller
Puller = func() IPuller
type PullContext struct {
Connection
Publisher *Publisher
publishConfig *config.Publish
config.Pull
puller IPuller
}
PullContext struct {
Connection
Publisher *Publisher
publishConfig *config.Publish
config.Pull
puller IPuller
}
HttpFilePuller struct {
util.Task
Ctx PullContext
io.ReadCloser
}
)
func (p *PullContext) GetPullContext() *PullContext {
return p
@@ -71,3 +84,42 @@ func (p *PullContext) Start() (err error) {
func (p *PullContext) Dispose() {
p.Plugin.Server.Pulls.Remove(p)
}
func (p *HttpFilePuller) Start() (err error) {
if err = p.Ctx.Publish(); err != nil {
return
}
remoteURL := p.Ctx.RemoteURL
if strings.HasPrefix(remoteURL, "http") {
var res *http.Response
client := http.DefaultClient
if proxyConf := p.Ctx.ConnectProxy; proxyConf != "" {
proxy, err := url.Parse(proxyConf)
if err != nil {
return err
}
transport := &http.Transport{Proxy: http.ProxyURL(proxy)}
client = &http.Client{Transport: transport}
}
if res, err = client.Get(remoteURL); err == nil {
if res.StatusCode != http.StatusOK {
return io.EOF
}
p.ReadCloser = res.Body
}
} else {
var res *os.File
if res, err = os.Open(remoteURL); err == nil {
p.ReadCloser = res
}
}
return
}
func (p *HttpFilePuller) GetPullContext() *PullContext {
return &p.Ctx
}
func (p *HttpFilePuller) Dispose() {
p.ReadCloser.Close()
}

View File

@@ -9,68 +9,54 @@ import (
"m7s.live/m7s/v5/pkg"
)
type Recorder = func(*RecordContext) error
func createRecoder(p *Plugin, streamPath string, filePath string) (recorder *RecordContext) {
recorder = &RecordContext{
Plugin: p,
Fragment: p.config.Record.Fragment,
Append: p.config.Record.Append,
FilePath: filePath,
StreamPath: streamPath,
type (
IRecorder interface {
util.ITask
GetRecordContext() *RecordContext
}
recorder.Logger = p.Logger.With("filePath", filePath, "streamPath", streamPath)
return
Recorder = func() IRecorder
RecordContext struct {
util.MarcoTask
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
Fragment time.Duration
Append bool
FilePath string
recorder IRecorder
}
DefaultRecorder struct {
util.Task
Ctx RecordContext
}
)
func (r *DefaultRecorder) GetRecordContext() *RecordContext {
return &r.Ctx
}
type RecordContext struct {
util.MarcoTask
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
Fragment time.Duration
Append bool
FilePath string
func (r *DefaultRecorder) Start() (err error) {
return r.Ctx.Subscribe()
}
func (p *RecordContext) GetKey() string {
return p.FilePath
}
type recordSubTask struct {
util.Task
ctx *RecordContext
Recorder
}
func (r *recordSubTask) Start() (err error) {
p := r.ctx
dir := p.FilePath
if p.Fragment == 0 || p.Append {
if filepath.Ext(p.FilePath) == "" {
p.FilePath += ".flv"
}
dir = filepath.Dir(p.FilePath)
}
r.Description = map[string]any{
"filePath": p.FilePath,
}
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
p.Subscriber, err = p.Plugin.Subscribe(r.Context, p.StreamPath)
if err != nil {
return
}
err = r.Recorder(p)
func (p *RecordContext) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.recorder.GetTask().Context, p.StreamPath)
return
}
func (p *RecordContext) Do(recorder Recorder) {
p.AddTask(&recordSubTask{
ctx: p,
Recorder: recorder,
})
func (p *RecordContext) Init(recorder IRecorder, plugin *Plugin, streamPath string, filePath string) *RecordContext {
p.Plugin = plugin
p.Fragment = plugin.config.Record.Fragment
p.Append = plugin.config.Record.Append
p.FilePath = filePath
p.StreamPath = streamPath
p.Logger = plugin.Logger.With("filePath", filePath, "streamPath", streamPath)
p.recorder = recorder
return p
}
func (p *RecordContext) Start() (err error) {
@@ -78,10 +64,21 @@ func (p *RecordContext) Start() (err error) {
if _, ok := s.Records.Get(p.GetKey()); ok {
return pkg.ErrRecordSamePath
}
s.Records.Add(p)
if p.Plugin.Meta.Recorder != nil {
p.Do(p.Plugin.Meta.Recorder)
dir := p.FilePath
if p.Fragment == 0 || p.Append {
if filepath.Ext(p.FilePath) == "" {
p.FilePath += ".flv"
}
dir = filepath.Dir(p.FilePath)
}
p.Description = map[string]any{
"filePath": p.FilePath,
}
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
s.Records.Add(p)
s.AddTask(p.recorder)
return
}

View File

@@ -275,6 +275,10 @@ func (s *Server) AddPushTask(task *PushContext) *util.Task {
return s.pushTask.AddTask(task)
}
func (s *Server) AddRecordTask(task *RecordContext) *util.Task {
return s.recordTask.AddTask(task)
}
func (s *Server) Dispose() {
Servers.Remove(s)
_ = s.tcplis.Close()

View File

@@ -258,7 +258,11 @@ func (handler *SubscribeHandler[A, V]) Start() (err error) {
}
checkPublisherChange := func() {
if prePublisher != s.Publisher {
s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID)
if s.Publisher == nil {
s.Info("publisher gone", "prePublisher", prePublisher.ID)
} else {
s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID)
}
if s.AudioReader != nil {
startAudioTs = time.Duration(s.AudioReader.AbsTime) * time.Millisecond
s.AudioReader.StopRead()