diff --git a/.gitignore b/.gitignore index 673f159..dafc5ca 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,8 @@ bin *.flv pullcf.yaml admin.zip -example/default/default -__debug* \ No newline at end of file +__debug* +.cursorrules +example/default/* +!example/default/main.go +!example/default/config.yaml diff --git a/api.go b/api.go index 6d70893..d786d5d 100644 --- a/api.go +++ b/api.go @@ -793,26 +793,29 @@ func (s *Server) ModifyConfig(_ context.Context, req *pb.ModifyConfigRequest) (r func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res *pb.PullProxyListResponse, err error) { res = &pb.PullProxyListResponse{} - for device := range s.PullProxies.Range { - res.Data = append(res.Data, &pb.PullProxyInfo{ - Name: device.Name, - CreateTime: timestamppb.New(device.CreatedAt), - UpdateTime: timestamppb.New(device.UpdatedAt), - Type: device.Type, - PullURL: device.URL, - ParentID: uint32(device.ParentID), - Status: uint32(device.Status), - ID: uint32(device.ID), - PullOnStart: device.PullOnStart, - StopOnIdle: device.StopOnIdle, - Audio: device.Audio, - RecordPath: device.Record.FilePath, - RecordFragment: durationpb.New(device.Record.Fragment), - Description: device.Description, - Rtt: uint32(device.RTT.Milliseconds()), - StreamPath: device.GetStreamPath(), - }) - } + s.PullProxies.Call(func() error { + for device := range s.PullProxies.Range { + res.Data = append(res.Data, &pb.PullProxyInfo{ + Name: device.Name, + CreateTime: timestamppb.New(device.CreatedAt), + UpdateTime: timestamppb.New(device.UpdatedAt), + Type: device.Type, + PullURL: device.URL, + ParentID: uint32(device.ParentID), + Status: uint32(device.Status), + ID: uint32(device.ID), + PullOnStart: device.PullOnStart, + StopOnIdle: device.StopOnIdle, + Audio: device.Audio, + RecordPath: device.Record.FilePath, + RecordFragment: durationpb.New(device.Record.Fragment), + Description: device.Description, + Rtt: uint32(device.RTT.Milliseconds()), + StreamPath: device.GetStreamPath(), + }) + } + return nil + }) return } @@ -1072,23 +1075,26 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque func (s *Server) GetPushProxyList(ctx context.Context, req *emptypb.Empty) (res *pb.PushProxyListResponse, err error) { res = &pb.PushProxyListResponse{} - for device := range s.PushProxies.Range { - res.Data = append(res.Data, &pb.PushProxyInfo{ - Name: device.Name, - CreateTime: timestamppb.New(device.CreatedAt), - UpdateTime: timestamppb.New(device.UpdatedAt), - Type: device.Type, - PushURL: device.URL, - ParentID: uint32(device.ParentID), - Status: uint32(device.Status), - ID: uint32(device.ID), - PushOnStart: device.PushOnStart, - Audio: device.Audio, - Description: device.Description, - Rtt: uint32(device.RTT.Milliseconds()), - StreamPath: device.GetStreamPath(), - }) - } + s.PushProxies.Call(func() error { + for device := range s.PushProxies.Range { + res.Data = append(res.Data, &pb.PushProxyInfo{ + Name: device.Name, + CreateTime: timestamppb.New(device.CreatedAt), + UpdateTime: timestamppb.New(device.UpdatedAt), + Type: device.Type, + PushURL: device.URL, + ParentID: uint32(device.ParentID), + Status: uint32(device.Status), + ID: uint32(device.ID), + PushOnStart: device.PushOnStart, + Audio: device.Audio, + Description: device.Description, + Rtt: uint32(device.RTT.Milliseconds()), + StreamPath: device.GetStreamPath(), + }) + } + return nil + }) return } diff --git a/pusher.go b/pusher.go index ec1b4c6..063b899 100644 --- a/pusher.go +++ b/pusher.go @@ -30,6 +30,11 @@ func (p *PushJob) GetKey() string { func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf config.Push, subConf *config.Subscribe) *PushJob { p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy, http.Header(conf.Header)) p.pusher = pusher + if subConf == nil { + conf := plugin.config.Subscribe + subConf = &conf + } + subConf.SubType = SubscribeTypePush p.SubConf = subConf p.SetDescriptions(task.Description{ "plugin": plugin.Meta.Name, @@ -43,13 +48,7 @@ func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf c } func (p *PushJob) Subscribe() (err error) { - if p.SubConf != nil { - p.SubConf.SubType = SubscribeTypePush - p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.pusher.GetTask().Context, p.StreamPath, *p.SubConf) - } else { - p.SubConf = &config.Subscribe{SubType: SubscribeTypePush} - p.Subscriber, err = p.Plugin.Subscribe(p.pusher.GetTask().Context, p.StreamPath) - } + p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.pusher.GetTask().Context, p.StreamPath, *p.SubConf) return } diff --git a/recoder.go b/recoder.go index 3692a20..1f22f0b 100644 --- a/recoder.go +++ b/recoder.go @@ -81,13 +81,8 @@ func (p *RecordJob) GetKey() string { } func (p *RecordJob) Subscribe() (err error) { - if p.SubConf != nil { - p.SubConf.SubType = SubscribeTypeVod - p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.recorder.GetTask().Context, p.StreamPath, *p.SubConf) - } else { - p.SubConf = &config.Subscribe{SubType: SubscribeTypeVod} - p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.recorder.GetTask().Context, p.StreamPath, *p.SubConf) - } + + p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.recorder.GetTask().Context, p.StreamPath, *p.SubConf) return } @@ -97,6 +92,11 @@ func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, p.Append = conf.Append p.FilePath = conf.FilePath p.StreamPath = streamPath + if subConf == nil { + conf := p.Plugin.config.Subscribe + subConf = &conf + } + subConf.SubType = SubscribeTypeVod p.SubConf = subConf p.recorder = recorder p.SetDescriptions(task.Description{ diff --git a/server.go b/server.go index a077ffc..ea206f8 100644 --- a/server.go +++ b/server.go @@ -285,6 +285,7 @@ func (s *Server) Start() (err error) { s.AddTask(&s.Pushs) s.AddTask(&s.Transforms) s.AddTask(&s.PullProxies) + s.AddTask(&s.PushProxies) promReg := prometheus.NewPedanticRegistry() promReg.MustRegister(s) for _, plugin := range plugins { diff --git a/transformer.go b/transformer.go index f828ec7..f192c1d 100644 --- a/transformer.go +++ b/transformer.go @@ -68,8 +68,9 @@ func (r *DefaultTransformer) GetTransformJob() *TransformJob { } func (p *TransformJob) Subscribe() (err error) { - p.Plugin.config.SubType = SubscribeTypeTransform - p.Subscriber, err = p.Plugin.Subscribe(p.Transformer, p.StreamPath) + subConfig := p.Plugin.config.Subscribe + subConfig.SubType = SubscribeTypeTransform + p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.Transformer, p.StreamPath, subConfig) if err == nil { p.Transformer.Depend(p.Subscriber) }