feat: add change subscribe

This commit is contained in:
langhuihui
2024-06-13 17:15:30 +08:00
parent 68a7edf678
commit 00b39aee3e
26 changed files with 890 additions and 406 deletions

View File

@@ -51,7 +51,7 @@ type Server struct {
Streams util.Collection[string, *Publisher]
Pulls util.Collection[string, *Puller]
Pushs util.Collection[string, *Pusher]
Waiting map[string][]*Subscriber
Waiting util.Collection[string, *Publisher]
Subscribers util.Collection[int, *Subscriber]
LogHandler MultiLogHandler
pidG int
@@ -68,7 +68,6 @@ type Server struct {
func NewServer() (s *Server) {
s = &Server{
ID: int(serverIndexG.Add(1)),
Waiting: make(map[string][]*Subscriber),
eventChan: make(chan any, 10),
}
s.config.HTTP.ListenAddrTLS = ":8443"
@@ -94,7 +93,6 @@ type rawconfig = map[string]map[string]any
func (s *Server) reset() {
server := Server{
ID: s.ID,
Waiting: make(map[string][]*Subscriber),
eventChan: make(chan any, 10),
}
server.Logger = s.Logger
@@ -162,9 +160,9 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
}
s.LogHandler.SetLevel(lv.Level())
s.registerHandler(map[string]http.HandlerFunc{
"/api/config/json/{name}": s.api_Config_JSON_,
"/api/stream/annexb/{streamPath...}": s.api_Stream_AnnexB_,
"/api/videotrack/sse/{streamPath...}": s.api_VideoTrack_SSE,
"/api/config/json/{name}": s.api_Config_JSON_,
"/api/stream/annexb/{streamPath...}": s.api_Stream_AnnexB_,
"/api/videotrack/sse/{streamPath...}": s.api_VideoTrack_SSE,
})
if httpConf.ListenAddrTLS != "" {
@@ -222,13 +220,13 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
s.eventLoop()
err = context.Cause(s)
s.Warn("Server is done", "reason", err)
for _, publisher := range s.Streams.Items {
for publisher := range s.Streams.Range {
publisher.Stop(err)
}
for _, subscriber := range s.Subscribers.Items {
for subscriber := range s.Subscribers.Range {
subscriber.Stop(err)
}
for _, p := range s.Plugins.Items {
for p := range s.Plugins.Range {
p.Stop(err)
}
httpConf.StopListen()
@@ -269,13 +267,21 @@ func (s *Server) eventLoop() {
case <-s.Done():
return
case <-pulse.C:
for _, publisher := range s.Streams.Items {
for publisher := range s.Streams.Range {
if err := publisher.checkTimeout(); err != nil {
publisher.Stop(err)
}
}
for subscriber := range s.Waiting {
for _, sub := range s.Waiting[subscriber] {
for publisher := range s.Waiting.Range {
if publisher.Plugin != nil {
if err := publisher.checkTimeout(); err != nil {
publisher.Dispose(err)
newPublisher := &Publisher{}
newPublisher.StreamPath = publisher.StreamPath
s.Waiting.Set(newPublisher)
}
}
for sub := range publisher.SubscriberRange {
select {
case <-sub.TimeoutTimer.C:
sub.Stop(ErrSubscribeTimeout)
@@ -347,7 +353,7 @@ func (s *Server) eventLoop() {
case slog.Handler:
s.LogHandler.Add(v)
}
for _, plugin := range s.Plugins.Items {
for plugin := range s.Plugins.Range {
if plugin.Disabled {
continue
}
@@ -363,7 +369,7 @@ func (s *Server) onUnsubscribe(subscriber *Subscriber) {
if subscriber.Closer != nil {
subscriber.Close()
}
for _, pusher := range s.Pushs.Items {
for pusher := range s.Pushs.Range {
if &pusher.Subscriber == subscriber {
s.Pushs.Remove(pusher)
break
@@ -372,25 +378,17 @@ func (s *Server) onUnsubscribe(subscriber *Subscriber) {
if subscriber.Publisher != nil {
subscriber.Publisher.RemoveSubscriber(subscriber)
}
if subscribers, ok := s.Waiting[subscriber.StreamPath]; ok {
if index := slices.Index(subscribers, subscriber); index >= 0 {
s.Waiting[subscriber.StreamPath] = slices.Delete(subscribers, index, index+1)
if len(subscribers) == 1 {
delete(s.Waiting, subscriber.StreamPath)
}
}
}
}
func (s *Server) onUnpublish(publisher *Publisher) {
s.Streams.Remove(publisher)
s.Waiting.Add(publisher)
s.Info("unpublish", "streamPath", publisher.StreamPath, "count", s.Streams.Length)
for subscriber := range publisher.Subscribers {
s.Waiting[publisher.StreamPath] = append(s.Waiting[publisher.StreamPath], subscriber)
for subscriber := range publisher.SubscriberRange {
subscriber.TimeoutTimer.Reset(publisher.WaitCloseTimeout)
}
if publisher.Closer != nil {
publisher.Close()
_ = publisher.Close()
}
s.Pulls.RemoveByKey(publisher.StreamPath)
}
@@ -401,12 +399,9 @@ func (s *Server) OnPublish(publisher *Publisher) error {
publisher.Warn("kick")
oldPublisher.Stop(ErrKick)
publisher.TakeOver(oldPublisher)
oldPublisher.Subscribers = nil
} else {
return ErrStreamExist
}
} else {
publisher.Subscribers = make(map[*Subscriber]struct{})
}
s.Streams.Set(publisher)
s.pidG++
@@ -415,14 +410,15 @@ func (s *Server) OnPublish(publisher *Publisher) error {
publisher.Logger = p.With("streamPath", publisher.StreamPath, "puber", publisher.ID)
publisher.TimeoutTimer = time.NewTimer(p.config.PublishTimeout)
publisher.Info("publish")
if subscribers, ok := s.Waiting[publisher.StreamPath]; ok {
for i, subscriber := range subscribers {
if i == 0 && subscriber.Publisher != nil {
publisher.TakeOver(subscriber.Publisher)
if waiting, ok := s.Waiting.Get(publisher.StreamPath); ok {
if waiting.Plugin != nil {
publisher.TakeOver(waiting)
} else {
for subscriber := range waiting.SubscriberRange {
publisher.AddSubscriber(subscriber)
}
publisher.AddSubscriber(subscriber)
}
delete(s.Waiting, publisher.StreamPath)
s.Waiting.Remove(waiting)
}
return nil
}
@@ -435,9 +431,13 @@ func (s *Server) OnSubscribe(subscriber *Subscriber) error {
s.Subscribers.Add(subscriber)
subscriber.Info("subscribe")
if publisher, ok := s.Streams.Get(subscriber.StreamPath); ok {
return publisher.AddSubscriber(subscriber)
publisher.AddSubscriber(subscriber)
} else if publisher, ok = s.Waiting.Get(subscriber.StreamPath); ok {
publisher.AddSubscriber(subscriber)
} else {
s.Waiting[subscriber.StreamPath] = append(s.Waiting[subscriber.StreamPath], subscriber)
newPublisher := &Publisher{}
newPublisher.StreamPath = subscriber.StreamPath
newPublisher.AddSubscriber(subscriber)
}
return nil
}
@@ -448,7 +448,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
fmt.Fprintf(w, "visit:%s\nMonibuca Engine %s StartTime:%s\n", r.URL.Path, Version, s.StartTime)
for _, plugin := range s.Plugins.Items {
for plugin := range s.Plugins.Range {
fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version)
}
for _, api := range s.apiList {