diff --git a/alias.go b/alias.go index dca9957..176dc91 100644 --- a/alias.go +++ b/alias.go @@ -84,12 +84,14 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque defer s.OnSubscribe(req.StreamPath, u.Query()) } if aliasInfo, ok := s.AliasStreams.Get(req.Alias); ok { //modify alias + oldStreamPath := aliasInfo.StreamPath aliasInfo.AutoRemove = req.AutoRemove if aliasInfo.StreamPath != req.StreamPath { aliasInfo.StreamPath = req.StreamPath if canReplace { if aliasInfo.Publisher != nil { aliasInfo.TransferSubscribers(publisher) // replace stream + aliasInfo.Publisher = publisher } else { s.Waiting.WakeUp(req.Alias, publisher) } @@ -103,6 +105,7 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque dbAlias.AutoRemove = req.AutoRemove s.DB.Save(&dbAlias) } + s.Info("modify alias", "alias", req.Alias, "oldStreamPath", oldStreamPath, "streamPath", req.StreamPath, "replace", ok && canReplace) } else { // create alias aliasInfo := &AliasStream{ AutoRemove: req.AutoRemove, @@ -164,3 +167,81 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque }) return } + +func (p *Publisher) processAliasOnStart() { + s := p.Plugin.Server + for alias := range s.AliasStreams.Range { + if alias.StreamPath != p.StreamPath { + continue + } + if alias.Publisher == nil { + alias.Publisher = p + s.Waiting.WakeUp(alias.Alias, p) + } else if alias.Publisher.StreamPath != alias.StreamPath { + alias.Publisher.TransferSubscribers(p) + alias.Publisher = p + } + } +} + +func (p *Publisher) processAliasOnDispose() { + s := p.Plugin.Server + var relatedAlias []*AliasStream + for alias := range s.AliasStreams.Range { + if alias.StreamPath == p.StreamPath { + if alias.AutoRemove { + defer s.AliasStreams.Remove(alias) + } + alias.Publisher = nil + relatedAlias = append(relatedAlias, alias) + } + } + if p.Subscribers.Length > 0 { + SUBSCRIBER: + for subscriber := range p.SubscriberRange { + for _, alias := range relatedAlias { + if subscriber.StreamPath == alias.Alias { + if originStream, ok := s.Streams.Get(alias.Alias); ok { + originStream.AddSubscriber(subscriber) + continue SUBSCRIBER + } + } + } + s.Waiting.Wait(subscriber) + } + p.Subscribers.Clear() + } +} + +func (s *Subscriber) processAliasOnStart() (hasInvited bool, done bool) { + server := s.Plugin.Server + if alias, ok := server.AliasStreams.Get(s.StreamPath); ok { + if alias.Publisher != nil { + alias.Publisher.AddSubscriber(s) + done = true + return + } else { + server.OnSubscribe(alias.StreamPath, s.Args) + hasInvited = true + } + } else { + for reg, alias := range server.StreamAlias { + if streamPath := reg.Replace(s.StreamPath, alias); streamPath != "" { + server.AliasStreams.Set(&AliasStream{ + StreamPath: streamPath, + Alias: s.StreamPath, + }) + if publisher, ok := server.Streams.Get(streamPath); ok { + publisher.AddSubscriber(s) + done = true + return + } else { + server.OnSubscribe(streamPath, s.Args) + hasInvited = true + } + break + } + } + } + return +} diff --git a/pkg/annexb.go b/pkg/annexb.go index 0e56400..424c636 100644 --- a/pkg/annexb.go +++ b/pkg/annexb.go @@ -135,7 +135,7 @@ func (a *AnnexB) Demux(codecCtx codec.ICodecCtx) (ret any, err error) { } else if [3]byte(lastFourBytes[1:]) == codec.NALU_Delimiter1 { startCode = 3 } - if startCode > 0 { + if startCode > 0 && reader.Offset() >= 3 { if reader.Offset() == 3 { startCode = 3 } diff --git a/plugin.go b/plugin.go index 4fe72e0..b06d3d9 100644 --- a/plugin.go +++ b/plugin.go @@ -674,7 +674,7 @@ func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) { for patten, handler := range handlers { p.handle(patten, handler) } - if p.config.EnableAuth && p.Server.ServerConfig.EnableLogin { + if p.config.EnableAuth && p.Server.ServerConfig.Admin.EnableLogin { p.handle("/api/secret/{type}/{streamPath...}", http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { authHeader := r.Header.Get("Authorization") if authHeader == "" { diff --git a/publisher.go b/publisher.go index 01fa0b5..80e4bb9 100644 --- a/publisher.go +++ b/publisher.go @@ -178,17 +178,7 @@ func (p *Publisher) Start() (err error) { } s.Streams.Set(p) p.Info("publish") - if pullProxy, ok := s.PullProxies.Find(func(pullProxy *PullProxy) bool { - return pullProxy.GetStreamPath() == p.StreamPath - }); ok { - p.PullProxy = pullProxy - if pullProxy.Status == PullProxyStatusOnline { - pullProxy.ChangeStatus(PullProxyStatusPulling) - if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && pullProxy.FilePath != "" { - mp4Plugin.Record(p, pullProxy.Record, nil) - } - } - } + p.processPullProxyOnStart() p.audioReady = util.NewPromiseWithTimeout(p, p.PublishTimeout) if !p.PubAudio { p.audioReady.Reject(ErrMuted) @@ -204,20 +194,7 @@ func (p *Publisher) Start() (err error) { } s.Waiting.WakeUp(p.StreamPath, p) - - for alias := range s.AliasStreams.Range { - if alias.StreamPath != p.StreamPath { - continue - } - if alias.Publisher == nil { - alias.Publisher = p - s.Waiting.WakeUp(alias.Alias, p) - } else if alias.Publisher.StreamPath != alias.StreamPath { - alias.Publisher.TransferSubscribers(p) - alias.Publisher = p - } - } - + p.processAliasOnStart() for plugin := range s.Plugins.Range { plugin.OnPublish(p) } @@ -589,32 +566,7 @@ func (p *Publisher) Dispose() { if p.Paused != nil { p.Paused.Reject(p.StopReason()) } - var relatedAlias []*AliasStream - for alias := range s.AliasStreams.Range { - if alias.StreamPath == p.StreamPath { - if alias.AutoRemove { - defer s.AliasStreams.Remove(alias) - } - alias.Publisher = nil - relatedAlias = append(relatedAlias, alias) - } - } - - if p.Subscribers.Length > 0 { - SUBSCRIBER: - for subscriber := range p.SubscriberRange { - for _, alias := range relatedAlias { - if subscriber.StreamPath == alias.Alias { - if originStream, ok := s.Streams.Get(alias.Alias); ok { - originStream.AddSubscriber(subscriber) - continue SUBSCRIBER - } - } - } - s.Waiting.Wait(subscriber) - } - p.Subscribers.Clear() - } + p.processAliasOnDispose() p.AudioTrack.Dispose() p.VideoTrack.Dispose() p.Info("unpublish", "remain", s.Streams.Length, "reason", p.StopReason()) @@ -622,9 +574,7 @@ func (p *Publisher) Dispose() { p.dumpFile.Close() } p.State = PublisherStateDisposed - if p.PullProxy != nil && p.PullProxy.Status == PullProxyStatusPulling && s.PullProxies.Has(p.PullProxy.GetKey()) { - p.PullProxy.ChangeStatus(PullProxyStatusOnline) - } + p.processPullProxyOnDispose() } func (p *Publisher) TransferSubscribers(newPublisher *Publisher) { diff --git a/pull-proxy.go b/pull-proxy.go index 29e8f20..5266db7 100644 --- a/pull-proxy.go +++ b/pull-proxy.go @@ -9,8 +9,6 @@ import ( "strings" "time" - "log/slog" - "github.com/mcuadros/go-defaults" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/emptypb" @@ -45,13 +43,13 @@ type ( PullOnStart, Audio, StopOnIdle bool config.Pull `gorm:"embedded;embeddedPrefix:pull_"` config.Record `gorm:"embedded;embeddedPrefix:record_"` + RecordType string ParentID uint Type string Status byte Description string RTT time.Duration - Handler IPullProxy `gorm:"-:all" yaml:"-"` - Logger *slog.Logger `gorm:"-:all" yaml:"-"` + Handler IPullProxy `gorm:"-:all" yaml:"-"` } PullProxyManager struct { task.Manager[uint, *PullProxy] @@ -216,6 +214,28 @@ func (d *TCPPullProxy) Tick(any) { } } +func (p *Publisher) processPullProxyOnStart() { + s := p.Plugin.Server + if pullProxy, ok := s.PullProxies.Find(func(pullProxy *PullProxy) bool { + return pullProxy.GetStreamPath() == p.StreamPath + }); ok { + p.PullProxy = pullProxy + if pullProxy.Status == PullProxyStatusOnline { + pullProxy.ChangeStatus(PullProxyStatusPulling) + if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && pullProxy.FilePath != "" { + mp4Plugin.Record(p, pullProxy.Record, nil) + } + } + } +} + +func (p *Publisher) processPullProxyOnDispose() { + s := p.Plugin.Server + if p.PullProxy != nil && p.PullProxy.Status == PullProxyStatusPulling && s.PullProxies.Has(p.PullProxy.GetKey()) { + p.PullProxy.ChangeStatus(PullProxyStatusOnline) + } +} + func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res *pb.PullProxyListResponse, err error) { res = &pb.PullProxyListResponse{} s.PullProxies.Call(func() error { diff --git a/server.go b/server.go index 2a0d6a7..9f58151 100644 --- a/server.go +++ b/server.go @@ -62,12 +62,16 @@ type ( StreamAlias map[config.Regexp]string `desc:"流别名"` PullProxy []*PullProxy PushProxy []*PushProxy - EnableLogin bool `default:"false" desc:"启用登录机制"` //启用登录机制 - Users []struct { - Username string `desc:"用户名"` - Password string `desc:"密码"` - Role string `default:"user" desc:"角色,可选值:admin,user"` - } `desc:"用户列表,仅在启用登录机制时生效"` + Admin struct { + EnableLogin bool `default:"false" desc:"启用登录机制"` //启用登录机制 + FilePath string `default:"admin.zip" desc:"管理员界面文件路径"` + HomePage string `default:"home" desc:"管理员界面首页"` + Users []struct { + Username string `desc:"用户名"` + Password string `desc:"密码"` + Role string `default:"user" desc:"角色,可选值:admin,user"` + } `desc:"用户列表,仅在启用登录机制时生效"` + } `desc:"管理员界面配置"` } WaitStream struct { StreamPath string @@ -278,8 +282,8 @@ func (s *Server) Start() (err error) { return } // Create users from configuration if EnableLogin is true - if s.ServerConfig.EnableLogin { - for _, userConfig := range s.ServerConfig.Users { + if s.ServerConfig.Admin.EnableLogin { + for _, userConfig := range s.ServerConfig.Admin.Users { var user db.User // Check if user exists if err = s.DB.Where("username = ?", userConfig.Username).First(&user).Error; err != nil { @@ -601,7 +605,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ValidateToken implements auth.TokenValidator func (s *Server) ValidateToken(tokenString string) (*auth.JWTClaims, error) { - if !s.ServerConfig.EnableLogin { + if !s.ServerConfig.Admin.EnableLogin { return &auth.JWTClaims{Username: "anonymous"}, nil } return auth.ValidateJWT(tokenString) @@ -610,7 +614,7 @@ func (s *Server) ValidateToken(tokenString string) (*auth.JWTClaims, error) { // Login implements the Login RPC method func (s *Server) Login(ctx context.Context, req *pb.LoginRequest) (res *pb.LoginResponse, err error) { res = &pb.LoginResponse{} - if !s.ServerConfig.EnableLogin { + if !s.ServerConfig.Admin.EnableLogin { res.Data = &pb.LoginSuccess{ Token: "monibuca", UserInfo: &pb.UserInfo{ @@ -663,7 +667,7 @@ func (s *Server) Logout(ctx context.Context, req *pb.LogoutRequest) (res *pb.Log // GetUserInfo implements the GetUserInfo RPC method func (s *Server) GetUserInfo(ctx context.Context, req *pb.UserInfoRequest) (res *pb.UserInfoResponse, err error) { - if !s.ServerConfig.EnableLogin { + if !s.ServerConfig.Admin.EnableLogin { res = &pb.UserInfoResponse{ Code: 0, Message: "success", @@ -702,7 +706,7 @@ func (s *Server) GetUserInfo(ctx context.Context, req *pb.UserInfoRequest) (res // AuthInterceptor creates a new unary interceptor for authentication func (s *Server) AuthInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - if !s.ServerConfig.EnableLogin { + if !s.ServerConfig.Admin.EnableLogin { return handler(ctx, req) } diff --git a/subscriber.go b/subscriber.go index 0e0ded0..b836751 100644 --- a/subscriber.go +++ b/subscriber.go @@ -105,33 +105,11 @@ func (s *Subscriber) Start() (err error) { server := s.Plugin.Server server.Subscribers.Add(s) s.Info("subscribe") - hasInvited := false - if alias, ok := server.AliasStreams.Get(s.StreamPath); ok { - if alias.Publisher != nil { - alias.Publisher.AddSubscriber(s) - return - } else { - server.OnSubscribe(alias.StreamPath, s.Args) - hasInvited = true - } - } else { - for reg, alias := range server.StreamAlias { - if streamPath := reg.Replace(s.StreamPath, alias); streamPath != "" { - server.AliasStreams.Set(&AliasStream{ - StreamPath: streamPath, - Alias: s.StreamPath, - }) - if publisher, ok := server.Streams.Get(streamPath); ok { - publisher.AddSubscriber(s) - return - } else { - server.OnSubscribe(streamPath, s.Args) - hasInvited = true - } - break - } - } + hasInvited, done := s.processAliasOnStart() + if done { + return } + if publisher, ok := server.Streams.Get(s.StreamPath); ok { publisher.AddSubscriber(s) } else {