fix: switch alias

This commit is contained in:
langhuihui
2025-01-08 13:19:34 +08:00
parent baf79cfc7f
commit ebb188e7ae
7 changed files with 131 additions and 98 deletions

View File

@@ -84,12 +84,14 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
defer s.OnSubscribe(req.StreamPath, u.Query())
}
if aliasInfo, ok := s.AliasStreams.Get(req.Alias); ok { //modify alias
oldStreamPath := aliasInfo.StreamPath
aliasInfo.AutoRemove = req.AutoRemove
if aliasInfo.StreamPath != req.StreamPath {
aliasInfo.StreamPath = req.StreamPath
if canReplace {
if aliasInfo.Publisher != nil {
aliasInfo.TransferSubscribers(publisher) // replace stream
aliasInfo.Publisher = publisher
} else {
s.Waiting.WakeUp(req.Alias, publisher)
}
@@ -103,6 +105,7 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
dbAlias.AutoRemove = req.AutoRemove
s.DB.Save(&dbAlias)
}
s.Info("modify alias", "alias", req.Alias, "oldStreamPath", oldStreamPath, "streamPath", req.StreamPath, "replace", ok && canReplace)
} else { // create alias
aliasInfo := &AliasStream{
AutoRemove: req.AutoRemove,
@@ -164,3 +167,81 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
})
return
}
func (p *Publisher) processAliasOnStart() {
s := p.Plugin.Server
for alias := range s.AliasStreams.Range {
if alias.StreamPath != p.StreamPath {
continue
}
if alias.Publisher == nil {
alias.Publisher = p
s.Waiting.WakeUp(alias.Alias, p)
} else if alias.Publisher.StreamPath != alias.StreamPath {
alias.Publisher.TransferSubscribers(p)
alias.Publisher = p
}
}
}
func (p *Publisher) processAliasOnDispose() {
s := p.Plugin.Server
var relatedAlias []*AliasStream
for alias := range s.AliasStreams.Range {
if alias.StreamPath == p.StreamPath {
if alias.AutoRemove {
defer s.AliasStreams.Remove(alias)
}
alias.Publisher = nil
relatedAlias = append(relatedAlias, alias)
}
}
if p.Subscribers.Length > 0 {
SUBSCRIBER:
for subscriber := range p.SubscriberRange {
for _, alias := range relatedAlias {
if subscriber.StreamPath == alias.Alias {
if originStream, ok := s.Streams.Get(alias.Alias); ok {
originStream.AddSubscriber(subscriber)
continue SUBSCRIBER
}
}
}
s.Waiting.Wait(subscriber)
}
p.Subscribers.Clear()
}
}
func (s *Subscriber) processAliasOnStart() (hasInvited bool, done bool) {
server := s.Plugin.Server
if alias, ok := server.AliasStreams.Get(s.StreamPath); ok {
if alias.Publisher != nil {
alias.Publisher.AddSubscriber(s)
done = true
return
} else {
server.OnSubscribe(alias.StreamPath, s.Args)
hasInvited = true
}
} else {
for reg, alias := range server.StreamAlias {
if streamPath := reg.Replace(s.StreamPath, alias); streamPath != "" {
server.AliasStreams.Set(&AliasStream{
StreamPath: streamPath,
Alias: s.StreamPath,
})
if publisher, ok := server.Streams.Get(streamPath); ok {
publisher.AddSubscriber(s)
done = true
return
} else {
server.OnSubscribe(streamPath, s.Args)
hasInvited = true
}
break
}
}
}
return
}

View File

@@ -135,7 +135,7 @@ func (a *AnnexB) Demux(codecCtx codec.ICodecCtx) (ret any, err error) {
} else if [3]byte(lastFourBytes[1:]) == codec.NALU_Delimiter1 {
startCode = 3
}
if startCode > 0 {
if startCode > 0 && reader.Offset() >= 3 {
if reader.Offset() == 3 {
startCode = 3
}

View File

@@ -674,7 +674,7 @@ func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) {
for patten, handler := range handlers {
p.handle(patten, handler)
}
if p.config.EnableAuth && p.Server.ServerConfig.EnableLogin {
if p.config.EnableAuth && p.Server.ServerConfig.Admin.EnableLogin {
p.handle("/api/secret/{type}/{streamPath...}", http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {

View File

@@ -178,17 +178,7 @@ func (p *Publisher) Start() (err error) {
}
s.Streams.Set(p)
p.Info("publish")
if pullProxy, ok := s.PullProxies.Find(func(pullProxy *PullProxy) bool {
return pullProxy.GetStreamPath() == p.StreamPath
}); ok {
p.PullProxy = pullProxy
if pullProxy.Status == PullProxyStatusOnline {
pullProxy.ChangeStatus(PullProxyStatusPulling)
if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && pullProxy.FilePath != "" {
mp4Plugin.Record(p, pullProxy.Record, nil)
}
}
}
p.processPullProxyOnStart()
p.audioReady = util.NewPromiseWithTimeout(p, p.PublishTimeout)
if !p.PubAudio {
p.audioReady.Reject(ErrMuted)
@@ -204,20 +194,7 @@ func (p *Publisher) Start() (err error) {
}
s.Waiting.WakeUp(p.StreamPath, p)
for alias := range s.AliasStreams.Range {
if alias.StreamPath != p.StreamPath {
continue
}
if alias.Publisher == nil {
alias.Publisher = p
s.Waiting.WakeUp(alias.Alias, p)
} else if alias.Publisher.StreamPath != alias.StreamPath {
alias.Publisher.TransferSubscribers(p)
alias.Publisher = p
}
}
p.processAliasOnStart()
for plugin := range s.Plugins.Range {
plugin.OnPublish(p)
}
@@ -589,32 +566,7 @@ func (p *Publisher) Dispose() {
if p.Paused != nil {
p.Paused.Reject(p.StopReason())
}
var relatedAlias []*AliasStream
for alias := range s.AliasStreams.Range {
if alias.StreamPath == p.StreamPath {
if alias.AutoRemove {
defer s.AliasStreams.Remove(alias)
}
alias.Publisher = nil
relatedAlias = append(relatedAlias, alias)
}
}
if p.Subscribers.Length > 0 {
SUBSCRIBER:
for subscriber := range p.SubscriberRange {
for _, alias := range relatedAlias {
if subscriber.StreamPath == alias.Alias {
if originStream, ok := s.Streams.Get(alias.Alias); ok {
originStream.AddSubscriber(subscriber)
continue SUBSCRIBER
}
}
}
s.Waiting.Wait(subscriber)
}
p.Subscribers.Clear()
}
p.processAliasOnDispose()
p.AudioTrack.Dispose()
p.VideoTrack.Dispose()
p.Info("unpublish", "remain", s.Streams.Length, "reason", p.StopReason())
@@ -622,9 +574,7 @@ func (p *Publisher) Dispose() {
p.dumpFile.Close()
}
p.State = PublisherStateDisposed
if p.PullProxy != nil && p.PullProxy.Status == PullProxyStatusPulling && s.PullProxies.Has(p.PullProxy.GetKey()) {
p.PullProxy.ChangeStatus(PullProxyStatusOnline)
}
p.processPullProxyOnDispose()
}
func (p *Publisher) TransferSubscribers(newPublisher *Publisher) {

View File

@@ -9,8 +9,6 @@ import (
"strings"
"time"
"log/slog"
"github.com/mcuadros/go-defaults"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/emptypb"
@@ -45,13 +43,13 @@ type (
PullOnStart, Audio, StopOnIdle bool
config.Pull `gorm:"embedded;embeddedPrefix:pull_"`
config.Record `gorm:"embedded;embeddedPrefix:record_"`
RecordType string
ParentID uint
Type string
Status byte
Description string
RTT time.Duration
Handler IPullProxy `gorm:"-:all" yaml:"-"`
Logger *slog.Logger `gorm:"-:all" yaml:"-"`
Handler IPullProxy `gorm:"-:all" yaml:"-"`
}
PullProxyManager struct {
task.Manager[uint, *PullProxy]
@@ -216,6 +214,28 @@ func (d *TCPPullProxy) Tick(any) {
}
}
func (p *Publisher) processPullProxyOnStart() {
s := p.Plugin.Server
if pullProxy, ok := s.PullProxies.Find(func(pullProxy *PullProxy) bool {
return pullProxy.GetStreamPath() == p.StreamPath
}); ok {
p.PullProxy = pullProxy
if pullProxy.Status == PullProxyStatusOnline {
pullProxy.ChangeStatus(PullProxyStatusPulling)
if mp4Plugin, ok := s.Plugins.Get("MP4"); ok && pullProxy.FilePath != "" {
mp4Plugin.Record(p, pullProxy.Record, nil)
}
}
}
}
func (p *Publisher) processPullProxyOnDispose() {
s := p.Plugin.Server
if p.PullProxy != nil && p.PullProxy.Status == PullProxyStatusPulling && s.PullProxies.Has(p.PullProxy.GetKey()) {
p.PullProxy.ChangeStatus(PullProxyStatusOnline)
}
}
func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res *pb.PullProxyListResponse, err error) {
res = &pb.PullProxyListResponse{}
s.PullProxies.Call(func() error {

View File

@@ -62,12 +62,16 @@ type (
StreamAlias map[config.Regexp]string `desc:"流别名"`
PullProxy []*PullProxy
PushProxy []*PushProxy
EnableLogin bool `default:"false" desc:"启用登录机制"` //启用登录机制
Users []struct {
Username string `desc:"用户名"`
Password string `desc:"密码"`
Role string `default:"user" desc:"角色,可选值:admin,user"`
} `desc:"用户列表,仅在启用登录机制时生效"`
Admin struct {
EnableLogin bool `default:"false" desc:"启用登录机制"` //启用登录机制
FilePath string `default:"admin.zip" desc:"管理员界面文件路径"`
HomePage string `default:"home" desc:"管理员界面首页"`
Users []struct {
Username string `desc:"用户名"`
Password string `desc:"密码"`
Role string `default:"user" desc:"角色,可选值:admin,user"`
} `desc:"用户列表,仅在启用登录机制时生效"`
} `desc:"管理员界面配置"`
}
WaitStream struct {
StreamPath string
@@ -278,8 +282,8 @@ func (s *Server) Start() (err error) {
return
}
// Create users from configuration if EnableLogin is true
if s.ServerConfig.EnableLogin {
for _, userConfig := range s.ServerConfig.Users {
if s.ServerConfig.Admin.EnableLogin {
for _, userConfig := range s.ServerConfig.Admin.Users {
var user db.User
// Check if user exists
if err = s.DB.Where("username = ?", userConfig.Username).First(&user).Error; err != nil {
@@ -601,7 +605,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// ValidateToken implements auth.TokenValidator
func (s *Server) ValidateToken(tokenString string) (*auth.JWTClaims, error) {
if !s.ServerConfig.EnableLogin {
if !s.ServerConfig.Admin.EnableLogin {
return &auth.JWTClaims{Username: "anonymous"}, nil
}
return auth.ValidateJWT(tokenString)
@@ -610,7 +614,7 @@ func (s *Server) ValidateToken(tokenString string) (*auth.JWTClaims, error) {
// Login implements the Login RPC method
func (s *Server) Login(ctx context.Context, req *pb.LoginRequest) (res *pb.LoginResponse, err error) {
res = &pb.LoginResponse{}
if !s.ServerConfig.EnableLogin {
if !s.ServerConfig.Admin.EnableLogin {
res.Data = &pb.LoginSuccess{
Token: "monibuca",
UserInfo: &pb.UserInfo{
@@ -663,7 +667,7 @@ func (s *Server) Logout(ctx context.Context, req *pb.LogoutRequest) (res *pb.Log
// GetUserInfo implements the GetUserInfo RPC method
func (s *Server) GetUserInfo(ctx context.Context, req *pb.UserInfoRequest) (res *pb.UserInfoResponse, err error) {
if !s.ServerConfig.EnableLogin {
if !s.ServerConfig.Admin.EnableLogin {
res = &pb.UserInfoResponse{
Code: 0,
Message: "success",
@@ -702,7 +706,7 @@ func (s *Server) GetUserInfo(ctx context.Context, req *pb.UserInfoRequest) (res
// AuthInterceptor creates a new unary interceptor for authentication
func (s *Server) AuthInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if !s.ServerConfig.EnableLogin {
if !s.ServerConfig.Admin.EnableLogin {
return handler(ctx, req)
}

View File

@@ -105,33 +105,11 @@ func (s *Subscriber) Start() (err error) {
server := s.Plugin.Server
server.Subscribers.Add(s)
s.Info("subscribe")
hasInvited := false
if alias, ok := server.AliasStreams.Get(s.StreamPath); ok {
if alias.Publisher != nil {
alias.Publisher.AddSubscriber(s)
return
} else {
server.OnSubscribe(alias.StreamPath, s.Args)
hasInvited = true
}
} else {
for reg, alias := range server.StreamAlias {
if streamPath := reg.Replace(s.StreamPath, alias); streamPath != "" {
server.AliasStreams.Set(&AliasStream{
StreamPath: streamPath,
Alias: s.StreamPath,
})
if publisher, ok := server.Streams.Get(streamPath); ok {
publisher.AddSubscriber(s)
return
} else {
server.OnSubscribe(streamPath, s.Args)
hasInvited = true
}
break
}
}
hasInvited, done := s.processAliasOnStart()
if done {
return
}
if publisher, ok := server.Streams.Get(s.StreamPath); ok {
publisher.AddSubscriber(s)
} else {