diff --git a/example/8081/rtsp_server.yaml b/example/8081/rtsp_server.yaml new file mode 100644 index 0000000..ead9098 --- /dev/null +++ b/example/8081/rtsp_server.yaml @@ -0,0 +1,12 @@ +global: + loglevel: debug + tcp: :50052 + http: :8081 + disableall: true +flv: + enable: true + pull: + live/test: /Users/dexter/Movies/jb-demo.flv +rtsp: + enable: true + tcp: :8554 \ No newline at end of file diff --git a/pkg/task/channel.go b/pkg/task/channel.go index 6df5650..209d206 100644 --- a/pkg/task/channel.go +++ b/pkg/task/channel.go @@ -54,11 +54,11 @@ func (t *AsyncTickTask) GetSignal() any { } func (t *AsyncTickTask) Go() error { - t.Tick(nil) + t.handler.(ITickTask).Tick(nil) for { select { case c := <-t.Ticker.C: - t.Tick(c) + t.handler.(ITickTask).Tick(c) case <-t.Done(): return nil } diff --git a/pkg/task/task.go b/pkg/task/task.go index eb70e89..4147c41 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -77,6 +77,8 @@ type ( OnDispose(func()) GetState() TaskState GetLevel() byte + WaitStopped() error + WaitStarted() error } IJob interface { ITask @@ -324,6 +326,9 @@ func (task *Task) start() bool { task.ResetRetryCount() if runHandler, ok := task.handler.(TaskBlock); ok { task.state = TASK_STATE_RUNNING + if task.Logger != nil { + task.Debug("task run", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) + } err = runHandler.Run() if err == nil { err = ErrTaskComplete @@ -334,6 +339,9 @@ func (task *Task) start() bool { if err == nil { if goHandler, ok := task.handler.(TaskGo); ok { task.state = TASK_STATE_GOING + if task.Logger != nil { + task.Debug("task go", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType()) + } go task.run(goHandler.Go) } return true diff --git a/plugin.go b/plugin.go index 38d3533..b7c444c 100644 --- a/plugin.go +++ b/plugin.go @@ -89,7 +89,7 @@ type ( OnQUICConnect(quic.Connection) task.ITask } IPullProxyPlugin interface { - OnPullProxyAdd(pullProxy *PullProxy) any + OnPullProxyAdd(pullProxy *PullProxyConfig) any } IPushProxyPlugin interface { OnPushProxyAdd(pushProxy *PushProxy) any diff --git a/plugin/flv/index.go b/plugin/flv/index.go index 8c47cf2..675ac16 100644 --- a/plugin/flv/index.go +++ b/plugin/flv/index.go @@ -97,9 +97,9 @@ func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { err = live.Run() } -func (plugin *FLVPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any { +func (plugin *FLVPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) any { d := &m7s.HTTPPullProxy{} - d.PullProxy = pullProxy + d.PullProxyConfig = pullProxy d.Plugin = &plugin.Plugin return d } diff --git a/plugin/gb28181/channel.go b/plugin/gb28181/channel.go index 9d3cb53..9bb1214 100644 --- a/plugin/gb28181/channel.go +++ b/plugin/gb28181/channel.go @@ -41,6 +41,7 @@ func (r *PresetRequest) GetKey() int { } type Channel struct { + PullProxyTask *m7s.PullProxyTask Device *Device // 所属设备 State atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放/对讲 GpsTime time.Time // gps时间 @@ -49,21 +50,12 @@ type Channel struct { PresetReqs util.Collection[int, *PresetRequest] // 预置位请求集合 *slog.Logger gb28181.DeviceChannel - AbstractDevice *m7s.PullProxy } func (c *Channel) GetKey() string { return c.DeviceID } -func (c *Channel) Pull() { - pubConf := c.Device.plugin.GetCommonConf().Publish - pubConf.PubAudio = c.AbstractDevice.Audio - pubConf.DelayCloseTimeout = util.Conditional(c.AbstractDevice.StopOnIdle, time.Second*5, 0) - c.Info("into channel.Pull") - c.Device.plugin.Pull(c.AbstractDevice.GetStreamPath(), c.AbstractDevice.Pull, &pubConf) -} - func (c *Channel) GetDeviceID() string { return c.DeviceID } diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index 456b6eb..2b78d86 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -264,11 +264,11 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) { gb.devices.Set(device) }) device.channels.OnAdd(func(c *Channel) { - if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool { - return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", device.DeviceID, c.DeviceID) + if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice m7s.IPullProxy) bool { + conf := absDevice.GetConfig() + return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", device.DeviceID, c.DeviceID) }); ok { - c.AbstractDevice = absDevice - absDevice.Handler = c + c.PullProxyTask = absDevice.(*m7s.PullProxyTask) absDevice.ChangeStatus(m7s.PullProxyStatusOnline) } if gb.AutoInvite { @@ -282,8 +282,8 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) { device.Status = DeviceOfflineStatus if gb.devices.RemoveByKey(device.DeviceID) { for c := range device.channels.Range { - if c.AbstractDevice != nil { - c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline) + if c.PullProxyTask != nil { + c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline) } } } @@ -372,14 +372,17 @@ func (gb *GB28181Plugin) checkPlatform() { } } -func (p *GB28181Plugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any { - deviceID, channelID, _ := strings.Cut(pullProxy.URL, "/") +func (p *GB28181Plugin) OnPullProxyAdd(conf *m7s.PullProxyConfig) any { + deviceID, channelID, _ := strings.Cut(conf.URL, "/") if d, ok := p.devices.Get(deviceID); ok { if channel, ok := d.channels.Get(channelID); ok { - channel.AbstractDevice = pullProxy - pullProxy.Handler = channel + pullProxy := &m7s.PullProxyTask{ + PullProxyConfig: conf, + Plugin: &p.Plugin, + } pullProxy.ChangeStatus(m7s.PullProxyStatusOnline) - return channel + channel.PullProxyTask = pullProxy + return pullProxy } } return nil @@ -839,11 +842,11 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi d.OnStart(func() { gb.devices.Set(d) d.channels.OnAdd(func(c *Channel) { - if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool { - return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID) + if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice m7s.IPullProxy) bool { + conf := absDevice.GetConfig() + return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID) }); ok { - c.AbstractDevice = absDevice - absDevice.Handler = c + c.PullProxyTask = absDevice.(*m7s.PullProxyTask) absDevice.ChangeStatus(m7s.PullProxyStatusOnline) } if gb.AutoInvite { @@ -858,8 +861,8 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi d.Status = DeviceOfflineStatus if gb.devices.RemoveByKey(d.DeviceID) { for c := range d.channels.Range { - if c.AbstractDevice != nil { - c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline) + if c.PullProxyTask != nil { + c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline) } } } diff --git a/plugin/hls/index.go b/plugin/hls/index.go index 7b44d90..97e3a0a 100644 --- a/plugin/hls/index.go +++ b/plugin/hls/index.go @@ -45,9 +45,9 @@ func (p *HLSPlugin) RegisterHandler() map[string]http.HandlerFunc { } } -func (p *HLSPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any { +func (p *HLSPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) any { d := &m7s.HTTPPullProxy{} - d.PullProxy = pullProxy + d.PullProxyConfig = pullProxy d.Plugin = &p.Plugin return d } diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go index f7a7bc0..31ea1d7 100644 --- a/plugin/rtmp/index.go +++ b/plugin/rtmp/index.go @@ -182,11 +182,10 @@ func (task *RTMPServer) Go() (err error) { return } -func (p *RTMPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any { +func (p *RTMPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) any { ret := &RTMPPullProxy{} - ret.PullProxy = pullProxy + ret.PullProxyConfig = pullProxy ret.Plugin = &p.Plugin - ret.Logger = p.With("pullProxy", pullProxy.Name) return ret } diff --git a/plugin/rtmp/pull-proxy.go b/plugin/rtmp/pull-proxy.go index cf84cdb..8b31668 100644 --- a/plugin/rtmp/pull-proxy.go +++ b/plugin/rtmp/pull-proxy.go @@ -13,7 +13,7 @@ type RTMPPullProxy struct { } func (d *RTMPPullProxy) Start() (err error) { - d.URL, err = url.Parse(d.PullProxy.URL) + d.URL, err = url.Parse(d.PullProxyConfig.URL) if err != nil { return } diff --git a/plugin/rtsp/index.go b/plugin/rtsp/index.go index 30b33d1..7fd4c88 100644 --- a/plugin/rtsp/index.go +++ b/plugin/rtsp/index.go @@ -26,11 +26,10 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask { return ret } -func (p *RTSPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any { +func (p *RTSPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) any { ret := &RTSPPullProxy{} - ret.PullProxy = pullProxy + ret.PullProxyConfig = pullProxy ret.Plugin = &p.Plugin - ret.Logger = p.With("pullProxy", pullProxy.Name) return ret } diff --git a/plugin/rtsp/pull-proxy.go b/plugin/rtsp/pull-proxy.go index 6e99556..51ff274 100644 --- a/plugin/rtsp/pull-proxy.go +++ b/plugin/rtsp/pull-proxy.go @@ -17,7 +17,7 @@ type RTSPPullProxy struct { } func (d *RTSPPullProxy) Start() (err error) { - d.URL, err = url.Parse(d.PullProxy.URL) + d.URL, err = url.Parse(d.PullProxyConfig.URL) if err != nil { return } @@ -54,23 +54,23 @@ func (d *RTSPPullProxy) GetTickInterval() time.Duration { func (d *RTSPPullProxy) Tick(any) { var err error - switch d.PullProxy.Status { + switch d.Status { case m7s.PullProxyStatusOffline: - err = d.conn.Connect(d.PullProxy.URL) + err = d.conn.Connect(d.PullProxyConfig.URL) if err != nil { return } - d.PullProxy.ChangeStatus(m7s.PullProxyStatusOnline) + d.ChangeStatus(m7s.PullProxyStatusOnline) case m7s.PullProxyStatusOnline, m7s.PullProxyStatusPulling: if d.conn.Conn == nil { - err = d.conn.Connect(d.PullProxy.URL) + err = d.conn.Connect(d.PullProxyConfig.URL) } else { t := time.Now() err = d.conn.Options() - d.PullProxy.RTT = time.Since(t) + d.RTT = time.Since(t) } if err != nil { - d.PullProxy.ChangeStatus(m7s.PullProxyStatusOffline) + d.ChangeStatus(m7s.PullProxyStatusOffline) } } } diff --git a/publisher.go b/publisher.go index 80808ba..b88348e 100644 --- a/publisher.go +++ b/publisher.go @@ -143,7 +143,7 @@ type Publisher struct { GOP int OnSeek func(time.Time) OnGetPosition func() time.Time - PullProxy *PullProxy + PullProxyConfig *PullProxyConfig dumpFile *os.File dropRate float64 // 丢帧率,0-1之间 dropAfterTs time.Duration diff --git a/pull-proxy.go b/pull-proxy.go index 8e7a5f8..35eb2d9 100644 --- a/pull-proxy.go +++ b/pull-proxy.go @@ -30,11 +30,15 @@ const ( type ( IPullProxy interface { + task.ITask + GetStreamPath() string + GetConfig() *PullProxyConfig + ChangeStatus(status byte) Pull() + GetKey() uint } - PullProxy struct { - server *Server `gorm:"-:all"` - task.Work `gorm:"-:all" yaml:"-"` + PullProxyConfig struct { + server *Server `gorm:"-:all"` ID uint `gorm:"primarykey"` CreatedAt, UpdatedAt time.Time `yaml:"-"` DeletedAt gorm.DeletedAt `yaml:"-"` @@ -48,15 +52,14 @@ type ( Status byte Description string RTT time.Duration - Handler IPullProxy `gorm:"-:all" yaml:"-"` } PullProxyManager struct { - task.Manager[uint, *PullProxy] + task.Manager[uint, IPullProxy] } PullProxyTask struct { + *PullProxyConfig task.AsyncTickTask - PullProxy *PullProxy - Plugin *Plugin + Plugin *Plugin } HTTPPullProxy struct { TCPPullProxy @@ -68,38 +71,22 @@ type ( } ) -func (d *PullProxy) GetKey() uint { +func (d *PullProxyConfig) GetKey() uint { return d.ID } -func (d *PullProxy) GetStreamPath() string { +func (d *PullProxyConfig) GetConfig() *PullProxyConfig { + return d +} + +func (d *PullProxyConfig) GetStreamPath() string { if d.StreamPath == "" { return fmt.Sprintf("pull/%s/%d", d.Type, d.ID) } return d.StreamPath } -func (d *PullProxy) Start() (err error) { - for plugin := range d.server.Plugins.Range { - if pullPlugin, ok := plugin.handler.(IPullProxyPlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) { - pullTask := pullPlugin.OnPullProxyAdd(d) - if pullTask == nil { - continue - } - if pullTask, ok := pullTask.(IPullProxy); ok { - d.Handler = pullTask - } - if t, ok := pullTask.(task.ITask); ok { - d.AddTask(t) - } else { - d.ChangeStatus(PullProxyStatusOnline) - } - } - } - return -} - -func (d *PullProxy) ChangeStatus(status byte) { +func (d *PullProxyTask) ChangeStatus(status byte) { if d.Status == status { return } @@ -110,34 +97,30 @@ func (d *PullProxy) ChangeStatus(status byte) { switch status { case PullProxyStatusOnline: if d.PullOnStart && from == PullProxyStatusOffline { - d.Handler.Pull() + d.Pull() } } } -func (d *PullProxy) Update() { +func (d *PullProxyConfig) Update() { if d.server.DB != nil { d.server.DB.Omit("deleted_at").Save(d) } } func (d *PullProxyTask) Dispose() { - d.PullProxy.ChangeStatus(PullProxyStatusOffline) - d.Plugin.Server.Streams.Call(func() error { - if stream, ok := d.Plugin.Server.Streams.Get(d.PullProxy.GetStreamPath()); ok { - stream.Stop(task.ErrStopByUser) - } - return nil - }) + d.ChangeStatus(PullProxyStatusOffline) + if stream, ok := d.Plugin.Server.Streams.SafeGet(d.GetStreamPath()); ok { + stream.Stop(task.ErrStopByUser) + } } -func (d *PullProxy) InitializeWithServer(s *Server) { +func (d *PullProxyConfig) InitializeWithServer(s *Server) { d.server = s - d.Logger = s.Logger.With("pullProxy", d.ID, "type", d.Type, "name", d.Name) if d.Type == "" { u, err := url.Parse(d.URL) if err != nil { - d.Logger.Error("parse pull url failed", "error", err) + s.Error("parse pull url failed", "error", err) return } switch u.Scheme { @@ -159,13 +142,13 @@ func (d *PullProxy) InitializeWithServer(s *Server) { func (d *PullProxyTask) Pull() { var pubConf = d.Plugin.config.Publish - pubConf.PubAudio = d.PullProxy.Audio - pubConf.DelayCloseTimeout = util.Conditional(d.PullProxy.StopOnIdle, time.Second*5, 0) - d.Plugin.handler.Pull(d.PullProxy.GetStreamPath(), d.PullProxy.Pull, &pubConf) + pubConf.PubAudio = d.Audio + pubConf.DelayCloseTimeout = util.Conditional(d.StopOnIdle, time.Second*5, 0) + d.Plugin.handler.Pull(d.GetStreamPath(), d.PullProxyConfig.Pull, &pubConf) } func (d *HTTPPullProxy) Start() (err error) { - d.URL, err = url.Parse(d.PullProxy.URL) + d.URL, err = url.Parse(d.PullProxyConfig.URL) if err != nil { return } @@ -194,30 +177,30 @@ func (d *TCPPullProxy) GetTickInterval() time.Duration { } func (d *TCPPullProxy) Tick(any) { - switch d.PullProxy.Status { + switch d.Status { case PullProxyStatusOffline: startTime := time.Now() conn, err := net.DialTCP("tcp", nil, d.TCPAddr) if err != nil { - d.PullProxy.ChangeStatus(PullProxyStatusOffline) + d.ChangeStatus(PullProxyStatusOffline) return } conn.Close() - d.PullProxy.RTT = time.Since(startTime) - d.PullProxy.ChangeStatus(PullProxyStatusOnline) + d.RTT = time.Since(startTime) + d.ChangeStatus(PullProxyStatusOnline) } } func (p *Publisher) processPullProxyOnStart() { s := p.Plugin.Server - if pullProxy, ok := s.PullProxies.Find(func(pullProxy *PullProxy) bool { + if pullProxy, ok := s.PullProxies.Find(func(pullProxy IPullProxy) bool { return pullProxy.GetStreamPath() == p.StreamPath }); ok { - p.PullProxy = pullProxy - if pullProxy.Status == PullProxyStatusOnline { + p.PullProxyConfig = pullProxy.GetConfig() + if p.PullProxyConfig.Status == PullProxyStatusOnline { pullProxy.ChangeStatus(PullProxyStatusPulling) - if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && pullProxy.FilePath != "" { - mp4Plugin.Record(p, pullProxy.Record, nil) + if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && p.PullProxyConfig.FilePath != "" { + mp4Plugin.Record(p, p.PullProxyConfig.Record, nil) } } } @@ -225,30 +208,49 @@ func (p *Publisher) processPullProxyOnStart() { func (p *Publisher) processPullProxyOnDispose() { s := p.Plugin.Server - if p.PullProxy != nil && p.PullProxy.Status == PullProxyStatusPulling && s.PullProxies.Has(p.PullProxy.GetKey()) { - p.PullProxy.ChangeStatus(PullProxyStatusOnline) + if p.PullProxyConfig != nil && p.PullProxyConfig.Status == PullProxyStatusPulling { + if pullproxy, ok := s.PullProxies.Get(p.PullProxyConfig.GetKey()); ok { + pullproxy.ChangeStatus(PullProxyStatusOnline) + } } } +func (s *Server) createPullProxy(conf *PullProxyConfig) (pullProxy IPullProxy, err error) { + for plugin := range s.Plugins.Range { + if pullPlugin, ok := plugin.handler.(IPullProxyPlugin); ok && strings.EqualFold(conf.Type, plugin.Meta.Name) { + pullTask := pullPlugin.OnPullProxyAdd(conf) + if pullTask == nil { + continue + } + if pullTask, ok := pullTask.(IPullProxy); ok { + s.PullProxies.Add(pullTask, plugin.Logger.With("pullProxyId", conf.ID, "pullProxyType", conf.Type, "pullProxyName", conf.Name)) + return pullTask, nil + } + } + } + return +} + func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res *pb.PullProxyListResponse, err error) { res = &pb.PullProxyListResponse{} for device := range s.PullProxies.SafeRange { + conf := device.GetConfig() res.Data = append(res.Data, &pb.PullProxyInfo{ - Name: device.Name, - CreateTime: timestamppb.New(device.CreatedAt), - UpdateTime: timestamppb.New(device.UpdatedAt), - Type: device.Type, - PullURL: device.URL, - ParentID: uint32(device.ParentID), - Status: uint32(device.Status), - ID: uint32(device.ID), - PullOnStart: device.PullOnStart, - StopOnIdle: device.StopOnIdle, - Audio: device.Audio, - RecordPath: device.Record.FilePath, - RecordFragment: durationpb.New(device.Record.Fragment), - Description: device.Description, - Rtt: uint32(device.RTT.Milliseconds()), + Name: conf.Name, + CreateTime: timestamppb.New(conf.CreatedAt), + UpdateTime: timestamppb.New(conf.UpdatedAt), + Type: conf.Type, + PullURL: conf.URL, + ParentID: uint32(conf.ParentID), + Status: uint32(conf.Status), + ID: uint32(conf.ID), + PullOnStart: conf.PullOnStart, + StopOnIdle: conf.StopOnIdle, + Audio: conf.Audio, + RecordPath: conf.Record.FilePath, + RecordFragment: durationpb.New(conf.Record.Fragment), + Description: conf.Description, + Rtt: uint32(conf.RTT.Milliseconds()), StreamPath: device.GetStreamPath(), }) } @@ -256,7 +258,7 @@ func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res } func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *pb.SuccessResponse, err error) { - device := &PullProxy{ + device := &PullProxyConfig{ server: s, Name: req.Name, Type: req.Type, @@ -302,7 +304,8 @@ func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res * if req.StreamPath == "" { device.StreamPath = device.GetStreamPath() } - s.PullProxies.Add(device) + _, err = s.createPullProxy(device) + res = &pb.SuccessResponse{} return } @@ -312,7 +315,7 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (re err = pkg.ErrNoDB return } - target := &PullProxy{ + target := &PullProxyConfig{ server: s, } err = s.DB.First(target, req.ID).Error @@ -354,25 +357,29 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (re target.RTT = time.Duration(int(req.Rtt)) * time.Millisecond target.StreamPath = req.StreamPath s.DB.Save(target) - var needStopOld *PullProxy if device, ok := s.PullProxies.SafeGet(uint(req.ID)); ok { - if target.URL != device.URL || device.Audio != target.Audio || device.StreamPath != target.StreamPath || device.Record.FilePath != target.Record.FilePath || device.Record.Fragment != target.Record.Fragment { + conf := device.GetConfig() + if target.URL != conf.URL || conf.Audio != target.Audio || conf.StreamPath != target.StreamPath || conf.Record.FilePath != target.Record.FilePath || conf.Record.Fragment != target.Record.Fragment { device.Stop(task.ErrStopByUser) - needStopOld = device + device.WaitStopped() + _, err = s.createPullProxy(target) + if target.Status == PullProxyStatusPulling { + if pullJob, ok := s.Pulls.SafeGet(device.GetStreamPath()); ok { + pullJob.Stop(task.ErrStopByUser) + pullJob.WaitStopped() + } + device.Pull() + } } else { - device.Name = target.Name - device.PullOnStart = target.PullOnStart - device.StopOnIdle = target.StopOnIdle - device.Description = target.Description + conf.Name = target.Name + conf.PullOnStart = target.PullOnStart + if conf.PullOnStart && conf.Status == PullProxyStatusOnline { + device.Pull() + } + conf.StopOnIdle = target.StopOnIdle + conf.Description = target.Description } } - if needStopOld != nil { - if pullJob, ok := s.Pulls.SafeGet(req.StreamPath); ok { - pullJob.Stop(task.ErrStopByUser) - pullJob.WaitStopped() - } - s.PullProxies.Add(target).WaitStarted() - } res = &pb.SuccessResponse{} return } @@ -384,29 +391,29 @@ func (s *Server) RemovePullProxy(ctx context.Context, req *pb.RequestWithId) (re } res = &pb.SuccessResponse{} if req.Id > 0 { - tx := s.DB.Delete(&PullProxy{ + tx := s.DB.Delete(&PullProxyConfig{ ID: uint(req.Id), }) err = tx.Error if device, ok := s.PullProxies.SafeGet(uint(req.Id)); ok { device.Stop(task.ErrStopByUser) - if pull, ok := device.server.Pulls.SafeGet(device.StreamPath); ok { - pull.Stop(task.ErrStopByUser) - } + // if pull, ok := s.Pulls.SafeGet(device.GetStreamPath()); ok { + // pull.Stop(task.ErrStopByUser) + // } } return } else if req.StreamPath != "" { - var deviceList []*PullProxy + var deviceList []*PullProxyConfig s.DB.Find(&deviceList, "stream_path=?", req.StreamPath) if len(deviceList) > 0 { for _, device := range deviceList { - tx := s.DB.Delete(&PullProxy{}, device.ID) + tx := s.DB.Delete(&PullProxyConfig{}, device.ID) err = tx.Error if device, ok := s.PullProxies.SafeGet(uint(device.ID)); ok { device.Stop(task.ErrStopByUser) - if pull, ok := device.server.Pulls.SafeGet(device.StreamPath); ok { - pull.Stop(task.ErrStopByUser) - } + // if pull, ok := s.Pulls.SafeGet(device.GetStreamPath()); ok { + // pull.Stop(task.ErrStopByUser) + // } } } } diff --git a/server.go b/server.go index 22d2256..49586a4 100644 --- a/server.go +++ b/server.go @@ -63,7 +63,7 @@ type ( DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件 StreamAlias map[config.Regexp]string `desc:"流别名"` Location map[config.Regexp]string `desc:"HTTP路由转发规则,key为正则表达式,value为目标地址"` - PullProxy []*PullProxy + PullProxy []*PullProxyConfig PushProxy []*PushProxy Admin struct { EnableLogin bool `default:"false" desc:"启用登录机制"` //启用登录机制 @@ -281,7 +281,7 @@ func (s *Server) Start() (err error) { return } // Auto-migrate models - if err = s.DB.AutoMigrate(&db.User{}, &PullProxy{}, &PushProxy{}, &StreamAliasDB{}); err != nil { + if err = s.DB.AutoMigrate(&db.User{}, &PullProxyConfig{}, &PushProxy{}, &StreamAliasDB{}); err != nil { s.Error("failed to auto-migrate models", "error", err) return } @@ -424,15 +424,15 @@ func (s *Server) Start() (err error) { func (s *Server) initPullProxies() { // 1. First read all pull proxies from database - var pullProxies []*PullProxy + var pullProxies []*PullProxyConfig s.DB.Find(&pullProxies) // Create a map for quick lookup of existing proxies - existingPullProxies := make(map[uint]*PullProxy) + existingPullProxies := make(map[uint]*PullProxyConfig) for _, proxy := range pullProxies { existingPullProxies[proxy.ID] = proxy + proxy.Status = PullProxyStatusOffline proxy.InitializeWithServer(s) - proxy.ChangeStatus(PullProxyStatusOffline) } // 2. Process and override with config data @@ -457,7 +457,7 @@ func (s *Server) initPullProxies() { // 3. Finally add all proxies to collections for _, proxy := range pullProxies { - s.PullProxies.Add(proxy) + s.createPullProxy(proxy) } } @@ -506,7 +506,7 @@ func (s *Server) initPullProxiesWithoutDB() { for _, proxy := range s.PullProxy { if proxy.ID != 0 { proxy.InitializeWithServer(s) - s.PullProxies.Add(proxy, proxy.Logger) + s.createPullProxy(proxy) } } } @@ -598,8 +598,9 @@ func (s *Server) OnSubscribe(streamPath string, args url.Values) { plugin.OnSubscribe(streamPath, args) } for pullProxy := range s.PullProxies.Range { - if pullProxy.Status == PullProxyStatusOnline && pullProxy.GetStreamPath() == streamPath && !pullProxy.PullOnStart { - pullProxy.Handler.Pull() + conf := pullProxy.GetConfig() + if conf.Status == PullProxyStatusOnline && pullProxy.GetStreamPath() == streamPath && !conf.PullOnStart { + pullProxy.Pull() } } }