mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-10-04 13:52:41 +08:00
feat: add maxcount error
This commit is contained in:
@@ -9,10 +9,12 @@ var (
|
|||||||
ErrRecordExists = errors.New("record exists")
|
ErrRecordExists = errors.New("record exists")
|
||||||
ErrKick = errors.New("kick")
|
ErrKick = errors.New("kick")
|
||||||
ErrDiscard = errors.New("discard")
|
ErrDiscard = errors.New("discard")
|
||||||
|
ErrPublishMaxCount = errors.New("publish max count exceeded")
|
||||||
ErrPublishTimeout = errors.New("publish timeout")
|
ErrPublishTimeout = errors.New("publish timeout")
|
||||||
ErrPublishIdleTimeout = errors.New("publish idle timeout")
|
ErrPublishIdleTimeout = errors.New("publish idle timeout")
|
||||||
ErrPublishDelayCloseTimeout = errors.New("publish delay close timeout")
|
ErrPublishDelayCloseTimeout = errors.New("publish delay close timeout")
|
||||||
ErrPushRemoteURLExist = errors.New("push remote url exist")
|
ErrPushRemoteURLExist = errors.New("push remote url exist")
|
||||||
|
ErrSubscribeMaxCount = errors.New("subscribe max count exceeded")
|
||||||
ErrSubscribeTimeout = errors.New("subscribe timeout")
|
ErrSubscribeTimeout = errors.New("subscribe timeout")
|
||||||
ErrRestart = errors.New("restart")
|
ErrRestart = errors.New("restart")
|
||||||
ErrInterrupt = errors.New("interrupt")
|
ErrInterrupt = errors.New("interrupt")
|
||||||
|
@@ -145,6 +145,9 @@ func (p *Publisher) Start() (err error) {
|
|||||||
return ErrStreamExist
|
return ErrStreamExist
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if p.MaxCount > 0 && s.Streams.Length >= p.MaxCount {
|
||||||
|
return ErrPublishMaxCount
|
||||||
|
}
|
||||||
s.Streams.Set(p)
|
s.Streams.Set(p)
|
||||||
p.Info("publish")
|
p.Info("publish")
|
||||||
p.processPullProxyOnStart()
|
p.processPullProxyOnStart()
|
||||||
|
@@ -103,7 +103,11 @@ func (s *Subscriber) waitingPublish() bool {
|
|||||||
|
|
||||||
func (s *Subscriber) Start() (err error) {
|
func (s *Subscriber) Start() (err error) {
|
||||||
server := s.Plugin.Server
|
server := s.Plugin.Server
|
||||||
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
server.Subscribers.Add(s)
|
server.Subscribers.Add(s)
|
||||||
|
}
|
||||||
|
}()
|
||||||
s.Info("subscribe")
|
s.Info("subscribe")
|
||||||
hasInvited, done := s.processAliasOnStart()
|
hasInvited, done := s.processAliasOnStart()
|
||||||
if done {
|
if done {
|
||||||
@@ -111,6 +115,9 @@ func (s *Subscriber) Start() (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if publisher, ok := server.Streams.Get(s.StreamPath); ok {
|
if publisher, ok := server.Streams.Get(s.StreamPath); ok {
|
||||||
|
if s.MaxCount > 0 && publisher.Subscribers.Length >= s.MaxCount {
|
||||||
|
return ErrSubscribeMaxCount
|
||||||
|
}
|
||||||
publisher.AddSubscriber(s)
|
publisher.AddSubscriber(s)
|
||||||
} else {
|
} else {
|
||||||
server.Waiting.Wait(s)
|
server.Waiting.Wait(s)
|
||||||
|
Reference in New Issue
Block a user