refactor: pull proxy

This commit is contained in:
langhuihui
2025-04-11 17:10:48 +08:00
parent 74dd4d7235
commit 546ca02eb6
15 changed files with 176 additions and 155 deletions

View File

@@ -0,0 +1,12 @@
global:
loglevel: debug
tcp: :50052
http: :8081
disableall: true
flv:
enable: true
pull:
live/test: /Users/dexter/Movies/jb-demo.flv
rtsp:
enable: true
tcp: :8554

View File

@@ -54,11 +54,11 @@ func (t *AsyncTickTask) GetSignal() any {
}
func (t *AsyncTickTask) Go() error {
t.Tick(nil)
t.handler.(ITickTask).Tick(nil)
for {
select {
case c := <-t.Ticker.C:
t.Tick(c)
t.handler.(ITickTask).Tick(c)
case <-t.Done():
return nil
}

View File

@@ -77,6 +77,8 @@ type (
OnDispose(func())
GetState() TaskState
GetLevel() byte
WaitStopped() error
WaitStarted() error
}
IJob interface {
ITask
@@ -324,6 +326,9 @@ func (task *Task) start() bool {
task.ResetRetryCount()
if runHandler, ok := task.handler.(TaskBlock); ok {
task.state = TASK_STATE_RUNNING
if task.Logger != nil {
task.Debug("task run", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
}
err = runHandler.Run()
if err == nil {
err = ErrTaskComplete
@@ -334,6 +339,9 @@ func (task *Task) start() bool {
if err == nil {
if goHandler, ok := task.handler.(TaskGo); ok {
task.state = TASK_STATE_GOING
if task.Logger != nil {
task.Debug("task go", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
}
go task.run(goHandler.Go)
}
return true

View File

@@ -89,7 +89,7 @@ type (
OnQUICConnect(quic.Connection) task.ITask
}
IPullProxyPlugin interface {
OnPullProxyAdd(pullProxy *PullProxy) any
OnPullProxyAdd(pullProxy *PullProxyConfig) any
}
IPushProxyPlugin interface {
OnPushProxyAdd(pushProxy *PushProxy) any

View File

@@ -97,9 +97,9 @@ func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = live.Run()
}
func (plugin *FLVPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
func (plugin *FLVPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) any {
d := &m7s.HTTPPullProxy{}
d.PullProxy = pullProxy
d.PullProxyConfig = pullProxy
d.Plugin = &plugin.Plugin
return d
}

View File

@@ -41,6 +41,7 @@ func (r *PresetRequest) GetKey() int {
}
type Channel struct {
PullProxyTask *m7s.PullProxyTask
Device *Device // 所属设备
State atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放/对讲
GpsTime time.Time // gps时间
@@ -49,21 +50,12 @@ type Channel struct {
PresetReqs util.Collection[int, *PresetRequest] // 预置位请求集合
*slog.Logger
gb28181.DeviceChannel
AbstractDevice *m7s.PullProxy
}
func (c *Channel) GetKey() string {
return c.DeviceID
}
func (c *Channel) Pull() {
pubConf := c.Device.plugin.GetCommonConf().Publish
pubConf.PubAudio = c.AbstractDevice.Audio
pubConf.DelayCloseTimeout = util.Conditional(c.AbstractDevice.StopOnIdle, time.Second*5, 0)
c.Info("into channel.Pull")
c.Device.plugin.Pull(c.AbstractDevice.GetStreamPath(), c.AbstractDevice.Pull, &pubConf)
}
func (c *Channel) GetDeviceID() string {
return c.DeviceID
}

View File

@@ -264,11 +264,11 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
gb.devices.Set(device)
})
device.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool {
return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", device.DeviceID, c.DeviceID)
if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice m7s.IPullProxy) bool {
conf := absDevice.GetConfig()
return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", device.DeviceID, c.DeviceID)
}); ok {
c.AbstractDevice = absDevice
absDevice.Handler = c
c.PullProxyTask = absDevice.(*m7s.PullProxyTask)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
if gb.AutoInvite {
@@ -282,8 +282,8 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
device.Status = DeviceOfflineStatus
if gb.devices.RemoveByKey(device.DeviceID) {
for c := range device.channels.Range {
if c.AbstractDevice != nil {
c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline)
if c.PullProxyTask != nil {
c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline)
}
}
}
@@ -372,14 +372,17 @@ func (gb *GB28181Plugin) checkPlatform() {
}
}
func (p *GB28181Plugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
deviceID, channelID, _ := strings.Cut(pullProxy.URL, "/")
func (p *GB28181Plugin) OnPullProxyAdd(conf *m7s.PullProxyConfig) any {
deviceID, channelID, _ := strings.Cut(conf.URL, "/")
if d, ok := p.devices.Get(deviceID); ok {
if channel, ok := d.channels.Get(channelID); ok {
channel.AbstractDevice = pullProxy
pullProxy.Handler = channel
pullProxy := &m7s.PullProxyTask{
PullProxyConfig: conf,
Plugin: &p.Plugin,
}
pullProxy.ChangeStatus(m7s.PullProxyStatusOnline)
return channel
channel.PullProxyTask = pullProxy
return pullProxy
}
}
return nil
@@ -839,11 +842,11 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi
d.OnStart(func() {
gb.devices.Set(d)
d.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool {
return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID)
if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice m7s.IPullProxy) bool {
conf := absDevice.GetConfig()
return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID)
}); ok {
c.AbstractDevice = absDevice
absDevice.Handler = c
c.PullProxyTask = absDevice.(*m7s.PullProxyTask)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
if gb.AutoInvite {
@@ -858,8 +861,8 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi
d.Status = DeviceOfflineStatus
if gb.devices.RemoveByKey(d.DeviceID) {
for c := range d.channels.Range {
if c.AbstractDevice != nil {
c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline)
if c.PullProxyTask != nil {
c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline)
}
}
}

View File

@@ -45,9 +45,9 @@ func (p *HLSPlugin) RegisterHandler() map[string]http.HandlerFunc {
}
}
func (p *HLSPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
func (p *HLSPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) any {
d := &m7s.HTTPPullProxy{}
d.PullProxy = pullProxy
d.PullProxyConfig = pullProxy
d.Plugin = &p.Plugin
return d
}

View File

@@ -182,11 +182,10 @@ func (task *RTMPServer) Go() (err error) {
return
}
func (p *RTMPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
func (p *RTMPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) any {
ret := &RTMPPullProxy{}
ret.PullProxy = pullProxy
ret.PullProxyConfig = pullProxy
ret.Plugin = &p.Plugin
ret.Logger = p.With("pullProxy", pullProxy.Name)
return ret
}

View File

@@ -13,7 +13,7 @@ type RTMPPullProxy struct {
}
func (d *RTMPPullProxy) Start() (err error) {
d.URL, err = url.Parse(d.PullProxy.URL)
d.URL, err = url.Parse(d.PullProxyConfig.URL)
if err != nil {
return
}

View File

@@ -26,11 +26,10 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
return ret
}
func (p *RTSPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxy) any {
func (p *RTSPPlugin) OnPullProxyAdd(pullProxy *m7s.PullProxyConfig) any {
ret := &RTSPPullProxy{}
ret.PullProxy = pullProxy
ret.PullProxyConfig = pullProxy
ret.Plugin = &p.Plugin
ret.Logger = p.With("pullProxy", pullProxy.Name)
return ret
}

View File

@@ -17,7 +17,7 @@ type RTSPPullProxy struct {
}
func (d *RTSPPullProxy) Start() (err error) {
d.URL, err = url.Parse(d.PullProxy.URL)
d.URL, err = url.Parse(d.PullProxyConfig.URL)
if err != nil {
return
}
@@ -54,23 +54,23 @@ func (d *RTSPPullProxy) GetTickInterval() time.Duration {
func (d *RTSPPullProxy) Tick(any) {
var err error
switch d.PullProxy.Status {
switch d.Status {
case m7s.PullProxyStatusOffline:
err = d.conn.Connect(d.PullProxy.URL)
err = d.conn.Connect(d.PullProxyConfig.URL)
if err != nil {
return
}
d.PullProxy.ChangeStatus(m7s.PullProxyStatusOnline)
d.ChangeStatus(m7s.PullProxyStatusOnline)
case m7s.PullProxyStatusOnline, m7s.PullProxyStatusPulling:
if d.conn.Conn == nil {
err = d.conn.Connect(d.PullProxy.URL)
err = d.conn.Connect(d.PullProxyConfig.URL)
} else {
t := time.Now()
err = d.conn.Options()
d.PullProxy.RTT = time.Since(t)
d.RTT = time.Since(t)
}
if err != nil {
d.PullProxy.ChangeStatus(m7s.PullProxyStatusOffline)
d.ChangeStatus(m7s.PullProxyStatusOffline)
}
}
}

View File

@@ -143,7 +143,7 @@ type Publisher struct {
GOP int
OnSeek func(time.Time)
OnGetPosition func() time.Time
PullProxy *PullProxy
PullProxyConfig *PullProxyConfig
dumpFile *os.File
dropRate float64 // 丢帧率0-1之间
dropAfterTs time.Duration

View File

@@ -30,11 +30,15 @@ const (
type (
IPullProxy interface {
task.ITask
GetStreamPath() string
GetConfig() *PullProxyConfig
ChangeStatus(status byte)
Pull()
GetKey() uint
}
PullProxy struct {
server *Server `gorm:"-:all"`
task.Work `gorm:"-:all" yaml:"-"`
PullProxyConfig struct {
server *Server `gorm:"-:all"`
ID uint `gorm:"primarykey"`
CreatedAt, UpdatedAt time.Time `yaml:"-"`
DeletedAt gorm.DeletedAt `yaml:"-"`
@@ -48,15 +52,14 @@ type (
Status byte
Description string
RTT time.Duration
Handler IPullProxy `gorm:"-:all" yaml:"-"`
}
PullProxyManager struct {
task.Manager[uint, *PullProxy]
task.Manager[uint, IPullProxy]
}
PullProxyTask struct {
*PullProxyConfig
task.AsyncTickTask
PullProxy *PullProxy
Plugin *Plugin
Plugin *Plugin
}
HTTPPullProxy struct {
TCPPullProxy
@@ -68,38 +71,22 @@ type (
}
)
func (d *PullProxy) GetKey() uint {
func (d *PullProxyConfig) GetKey() uint {
return d.ID
}
func (d *PullProxy) GetStreamPath() string {
func (d *PullProxyConfig) GetConfig() *PullProxyConfig {
return d
}
func (d *PullProxyConfig) GetStreamPath() string {
if d.StreamPath == "" {
return fmt.Sprintf("pull/%s/%d", d.Type, d.ID)
}
return d.StreamPath
}
func (d *PullProxy) Start() (err error) {
for plugin := range d.server.Plugins.Range {
if pullPlugin, ok := plugin.handler.(IPullProxyPlugin); ok && strings.EqualFold(d.Type, plugin.Meta.Name) {
pullTask := pullPlugin.OnPullProxyAdd(d)
if pullTask == nil {
continue
}
if pullTask, ok := pullTask.(IPullProxy); ok {
d.Handler = pullTask
}
if t, ok := pullTask.(task.ITask); ok {
d.AddTask(t)
} else {
d.ChangeStatus(PullProxyStatusOnline)
}
}
}
return
}
func (d *PullProxy) ChangeStatus(status byte) {
func (d *PullProxyTask) ChangeStatus(status byte) {
if d.Status == status {
return
}
@@ -110,34 +97,30 @@ func (d *PullProxy) ChangeStatus(status byte) {
switch status {
case PullProxyStatusOnline:
if d.PullOnStart && from == PullProxyStatusOffline {
d.Handler.Pull()
d.Pull()
}
}
}
func (d *PullProxy) Update() {
func (d *PullProxyConfig) Update() {
if d.server.DB != nil {
d.server.DB.Omit("deleted_at").Save(d)
}
}
func (d *PullProxyTask) Dispose() {
d.PullProxy.ChangeStatus(PullProxyStatusOffline)
d.Plugin.Server.Streams.Call(func() error {
if stream, ok := d.Plugin.Server.Streams.Get(d.PullProxy.GetStreamPath()); ok {
stream.Stop(task.ErrStopByUser)
}
return nil
})
d.ChangeStatus(PullProxyStatusOffline)
if stream, ok := d.Plugin.Server.Streams.SafeGet(d.GetStreamPath()); ok {
stream.Stop(task.ErrStopByUser)
}
}
func (d *PullProxy) InitializeWithServer(s *Server) {
func (d *PullProxyConfig) InitializeWithServer(s *Server) {
d.server = s
d.Logger = s.Logger.With("pullProxy", d.ID, "type", d.Type, "name", d.Name)
if d.Type == "" {
u, err := url.Parse(d.URL)
if err != nil {
d.Logger.Error("parse pull url failed", "error", err)
s.Error("parse pull url failed", "error", err)
return
}
switch u.Scheme {
@@ -159,13 +142,13 @@ func (d *PullProxy) InitializeWithServer(s *Server) {
func (d *PullProxyTask) Pull() {
var pubConf = d.Plugin.config.Publish
pubConf.PubAudio = d.PullProxy.Audio
pubConf.DelayCloseTimeout = util.Conditional(d.PullProxy.StopOnIdle, time.Second*5, 0)
d.Plugin.handler.Pull(d.PullProxy.GetStreamPath(), d.PullProxy.Pull, &pubConf)
pubConf.PubAudio = d.Audio
pubConf.DelayCloseTimeout = util.Conditional(d.StopOnIdle, time.Second*5, 0)
d.Plugin.handler.Pull(d.GetStreamPath(), d.PullProxyConfig.Pull, &pubConf)
}
func (d *HTTPPullProxy) Start() (err error) {
d.URL, err = url.Parse(d.PullProxy.URL)
d.URL, err = url.Parse(d.PullProxyConfig.URL)
if err != nil {
return
}
@@ -194,30 +177,30 @@ func (d *TCPPullProxy) GetTickInterval() time.Duration {
}
func (d *TCPPullProxy) Tick(any) {
switch d.PullProxy.Status {
switch d.Status {
case PullProxyStatusOffline:
startTime := time.Now()
conn, err := net.DialTCP("tcp", nil, d.TCPAddr)
if err != nil {
d.PullProxy.ChangeStatus(PullProxyStatusOffline)
d.ChangeStatus(PullProxyStatusOffline)
return
}
conn.Close()
d.PullProxy.RTT = time.Since(startTime)
d.PullProxy.ChangeStatus(PullProxyStatusOnline)
d.RTT = time.Since(startTime)
d.ChangeStatus(PullProxyStatusOnline)
}
}
func (p *Publisher) processPullProxyOnStart() {
s := p.Plugin.Server
if pullProxy, ok := s.PullProxies.Find(func(pullProxy *PullProxy) bool {
if pullProxy, ok := s.PullProxies.Find(func(pullProxy IPullProxy) bool {
return pullProxy.GetStreamPath() == p.StreamPath
}); ok {
p.PullProxy = pullProxy
if pullProxy.Status == PullProxyStatusOnline {
p.PullProxyConfig = pullProxy.GetConfig()
if p.PullProxyConfig.Status == PullProxyStatusOnline {
pullProxy.ChangeStatus(PullProxyStatusPulling)
if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && pullProxy.FilePath != "" {
mp4Plugin.Record(p, pullProxy.Record, nil)
if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && p.PullProxyConfig.FilePath != "" {
mp4Plugin.Record(p, p.PullProxyConfig.Record, nil)
}
}
}
@@ -225,30 +208,49 @@ func (p *Publisher) processPullProxyOnStart() {
func (p *Publisher) processPullProxyOnDispose() {
s := p.Plugin.Server
if p.PullProxy != nil && p.PullProxy.Status == PullProxyStatusPulling && s.PullProxies.Has(p.PullProxy.GetKey()) {
p.PullProxy.ChangeStatus(PullProxyStatusOnline)
if p.PullProxyConfig != nil && p.PullProxyConfig.Status == PullProxyStatusPulling {
if pullproxy, ok := s.PullProxies.Get(p.PullProxyConfig.GetKey()); ok {
pullproxy.ChangeStatus(PullProxyStatusOnline)
}
}
}
func (s *Server) createPullProxy(conf *PullProxyConfig) (pullProxy IPullProxy, err error) {
for plugin := range s.Plugins.Range {
if pullPlugin, ok := plugin.handler.(IPullProxyPlugin); ok && strings.EqualFold(conf.Type, plugin.Meta.Name) {
pullTask := pullPlugin.OnPullProxyAdd(conf)
if pullTask == nil {
continue
}
if pullTask, ok := pullTask.(IPullProxy); ok {
s.PullProxies.Add(pullTask, plugin.Logger.With("pullProxyId", conf.ID, "pullProxyType", conf.Type, "pullProxyName", conf.Name))
return pullTask, nil
}
}
}
return
}
func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res *pb.PullProxyListResponse, err error) {
res = &pb.PullProxyListResponse{}
for device := range s.PullProxies.SafeRange {
conf := device.GetConfig()
res.Data = append(res.Data, &pb.PullProxyInfo{
Name: device.Name,
CreateTime: timestamppb.New(device.CreatedAt),
UpdateTime: timestamppb.New(device.UpdatedAt),
Type: device.Type,
PullURL: device.URL,
ParentID: uint32(device.ParentID),
Status: uint32(device.Status),
ID: uint32(device.ID),
PullOnStart: device.PullOnStart,
StopOnIdle: device.StopOnIdle,
Audio: device.Audio,
RecordPath: device.Record.FilePath,
RecordFragment: durationpb.New(device.Record.Fragment),
Description: device.Description,
Rtt: uint32(device.RTT.Milliseconds()),
Name: conf.Name,
CreateTime: timestamppb.New(conf.CreatedAt),
UpdateTime: timestamppb.New(conf.UpdatedAt),
Type: conf.Type,
PullURL: conf.URL,
ParentID: uint32(conf.ParentID),
Status: uint32(conf.Status),
ID: uint32(conf.ID),
PullOnStart: conf.PullOnStart,
StopOnIdle: conf.StopOnIdle,
Audio: conf.Audio,
RecordPath: conf.Record.FilePath,
RecordFragment: durationpb.New(conf.Record.Fragment),
Description: conf.Description,
Rtt: uint32(conf.RTT.Milliseconds()),
StreamPath: device.GetStreamPath(),
})
}
@@ -256,7 +258,7 @@ func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res
}
func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *pb.SuccessResponse, err error) {
device := &PullProxy{
device := &PullProxyConfig{
server: s,
Name: req.Name,
Type: req.Type,
@@ -302,7 +304,8 @@ func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *
if req.StreamPath == "" {
device.StreamPath = device.GetStreamPath()
}
s.PullProxies.Add(device)
_, err = s.createPullProxy(device)
res = &pb.SuccessResponse{}
return
}
@@ -312,7 +315,7 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (re
err = pkg.ErrNoDB
return
}
target := &PullProxy{
target := &PullProxyConfig{
server: s,
}
err = s.DB.First(target, req.ID).Error
@@ -354,25 +357,29 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (re
target.RTT = time.Duration(int(req.Rtt)) * time.Millisecond
target.StreamPath = req.StreamPath
s.DB.Save(target)
var needStopOld *PullProxy
if device, ok := s.PullProxies.SafeGet(uint(req.ID)); ok {
if target.URL != device.URL || device.Audio != target.Audio || device.StreamPath != target.StreamPath || device.Record.FilePath != target.Record.FilePath || device.Record.Fragment != target.Record.Fragment {
conf := device.GetConfig()
if target.URL != conf.URL || conf.Audio != target.Audio || conf.StreamPath != target.StreamPath || conf.Record.FilePath != target.Record.FilePath || conf.Record.Fragment != target.Record.Fragment {
device.Stop(task.ErrStopByUser)
needStopOld = device
device.WaitStopped()
_, err = s.createPullProxy(target)
if target.Status == PullProxyStatusPulling {
if pullJob, ok := s.Pulls.SafeGet(device.GetStreamPath()); ok {
pullJob.Stop(task.ErrStopByUser)
pullJob.WaitStopped()
}
device.Pull()
}
} else {
device.Name = target.Name
device.PullOnStart = target.PullOnStart
device.StopOnIdle = target.StopOnIdle
device.Description = target.Description
conf.Name = target.Name
conf.PullOnStart = target.PullOnStart
if conf.PullOnStart && conf.Status == PullProxyStatusOnline {
device.Pull()
}
conf.StopOnIdle = target.StopOnIdle
conf.Description = target.Description
}
}
if needStopOld != nil {
if pullJob, ok := s.Pulls.SafeGet(req.StreamPath); ok {
pullJob.Stop(task.ErrStopByUser)
pullJob.WaitStopped()
}
s.PullProxies.Add(target).WaitStarted()
}
res = &pb.SuccessResponse{}
return
}
@@ -384,29 +391,29 @@ func (s *Server) RemovePullProxy(ctx context.Context, req *pb.RequestWithId) (re
}
res = &pb.SuccessResponse{}
if req.Id > 0 {
tx := s.DB.Delete(&PullProxy{
tx := s.DB.Delete(&PullProxyConfig{
ID: uint(req.Id),
})
err = tx.Error
if device, ok := s.PullProxies.SafeGet(uint(req.Id)); ok {
device.Stop(task.ErrStopByUser)
if pull, ok := device.server.Pulls.SafeGet(device.StreamPath); ok {
pull.Stop(task.ErrStopByUser)
}
// if pull, ok := s.Pulls.SafeGet(device.GetStreamPath()); ok {
// pull.Stop(task.ErrStopByUser)
// }
}
return
} else if req.StreamPath != "" {
var deviceList []*PullProxy
var deviceList []*PullProxyConfig
s.DB.Find(&deviceList, "stream_path=?", req.StreamPath)
if len(deviceList) > 0 {
for _, device := range deviceList {
tx := s.DB.Delete(&PullProxy{}, device.ID)
tx := s.DB.Delete(&PullProxyConfig{}, device.ID)
err = tx.Error
if device, ok := s.PullProxies.SafeGet(uint(device.ID)); ok {
device.Stop(task.ErrStopByUser)
if pull, ok := device.server.Pulls.SafeGet(device.StreamPath); ok {
pull.Stop(task.ErrStopByUser)
}
// if pull, ok := s.Pulls.SafeGet(device.GetStreamPath()); ok {
// pull.Stop(task.ErrStopByUser)
// }
}
}
}

View File

@@ -63,7 +63,7 @@ type (
DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件
StreamAlias map[config.Regexp]string `desc:"流别名"`
Location map[config.Regexp]string `desc:"HTTP路由转发规则,key为正则表达式,value为目标地址"`
PullProxy []*PullProxy
PullProxy []*PullProxyConfig
PushProxy []*PushProxy
Admin struct {
EnableLogin bool `default:"false" desc:"启用登录机制"` //启用登录机制
@@ -281,7 +281,7 @@ func (s *Server) Start() (err error) {
return
}
// Auto-migrate models
if err = s.DB.AutoMigrate(&db.User{}, &PullProxy{}, &PushProxy{}, &StreamAliasDB{}); err != nil {
if err = s.DB.AutoMigrate(&db.User{}, &PullProxyConfig{}, &PushProxy{}, &StreamAliasDB{}); err != nil {
s.Error("failed to auto-migrate models", "error", err)
return
}
@@ -424,15 +424,15 @@ func (s *Server) Start() (err error) {
func (s *Server) initPullProxies() {
// 1. First read all pull proxies from database
var pullProxies []*PullProxy
var pullProxies []*PullProxyConfig
s.DB.Find(&pullProxies)
// Create a map for quick lookup of existing proxies
existingPullProxies := make(map[uint]*PullProxy)
existingPullProxies := make(map[uint]*PullProxyConfig)
for _, proxy := range pullProxies {
existingPullProxies[proxy.ID] = proxy
proxy.Status = PullProxyStatusOffline
proxy.InitializeWithServer(s)
proxy.ChangeStatus(PullProxyStatusOffline)
}
// 2. Process and override with config data
@@ -457,7 +457,7 @@ func (s *Server) initPullProxies() {
// 3. Finally add all proxies to collections
for _, proxy := range pullProxies {
s.PullProxies.Add(proxy)
s.createPullProxy(proxy)
}
}
@@ -506,7 +506,7 @@ func (s *Server) initPullProxiesWithoutDB() {
for _, proxy := range s.PullProxy {
if proxy.ID != 0 {
proxy.InitializeWithServer(s)
s.PullProxies.Add(proxy, proxy.Logger)
s.createPullProxy(proxy)
}
}
}
@@ -598,8 +598,9 @@ func (s *Server) OnSubscribe(streamPath string, args url.Values) {
plugin.OnSubscribe(streamPath, args)
}
for pullProxy := range s.PullProxies.Range {
if pullProxy.Status == PullProxyStatusOnline && pullProxy.GetStreamPath() == streamPath && !pullProxy.PullOnStart {
pullProxy.Handler.Pull()
conf := pullProxy.GetConfig()
if conf.Status == PullProxyStatusOnline && pullProxy.GetStreamPath() == streamPath && !conf.PullOnStart {
pullProxy.Pull()
}
}
}