feat: consider pull proxy disable status

This commit is contained in:
langhuihui
2025-06-12 13:50:47 +08:00
parent bacda6f5a0
commit 94be02cd79
6 changed files with 126 additions and 29 deletions

View File

@@ -98,4 +98,13 @@ jobs:
if: success() && !contains(env.version, 'beta')
run: |
docker tag langhuihui/monibuca:v5 langhuihui/monibuca:${{ env.version }}
docker push langhuihui/monibuca:${{ env.version }}
docker push langhuihui/monibuca:${{ env.version }}
- name: docker build lite version
if: success() && startsWith(github.ref, 'refs/tags/')
run: |
docker buildx build --platform linux/amd64,linux/arm64 -f DockerfileLite -t monibuca/v5:latest --push .
- name: docker lite push version tag
if: success() && !contains(env.version, 'beta')
run: |
docker tag monibuca/v5 monibuca/v5:${{ env.version }}
docker push lmonibuca/v5:${{ env.version }}

31
DockerfileLite Normal file
View File

@@ -0,0 +1,31 @@
# Running Stage
FROM alpine:latest
WORKDIR /monibuca
# Copy the pre-compiled binary from the build context
# The GitHub Actions workflow prepares 'monibuca_linux' in the context root
COPY monibuca_amd64 ./monibuca_amd64
COPY monibuca_arm64 ./monibuca_arm64
COPY admin.zip ./admin.zip
# Copy the configuration file from the build context
COPY example/default/config.yaml /etc/monibuca/config.yaml
# Export necessary ports
EXPOSE 6000 8080 8443 1935 554 5060 9000-20000
EXPOSE 5060/udp 44944/udp
RUN if [ "$(uname -m)" = "aarch64" ]; then \
mv ./monibuca_arm64 ./monibuca_linux; \
rm ./monibuca_amd64; \
else \
mv ./monibuca_amd64 ./monibuca_linux; \
rm ./monibuca_arm64; \
fi
ENTRYPOINT [ "./monibuca_linux"]
CMD ["-c", "/etc/monibuca/config.yaml"]

2
go.mod
View File

@@ -53,7 +53,7 @@ require (
google.golang.org/protobuf v1.34.2
gorm.io/driver/mysql v1.5.7
gorm.io/driver/postgres v1.5.9
gorm.io/gorm v1.25.11
gorm.io/gorm v1.30.0
)
require (

4
go.sum
View File

@@ -423,8 +423,8 @@ gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkD
gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8=
gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg=
gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs=
gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY=
gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=

View File

@@ -260,7 +260,7 @@ func (s *Server) GetPullProxyList(ctx context.Context, req *emptypb.Empty) (res
}
func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *pb.SuccessResponse, err error) {
device := &PullProxyConfig{
pullProxyConfig := &PullProxyConfig{
Name: req.Name,
Type: req.Type,
ParentID: uint(req.ParentID),
@@ -268,7 +268,7 @@ func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *
Description: req.Description,
StreamPath: req.StreamPath,
}
if device.Type == "" {
if pullProxyConfig.Type == "" {
var u *url.URL
u, err = url.Parse(req.PullURL)
if err != nil {
@@ -277,35 +277,49 @@ func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *
}
switch u.Scheme {
case "srt", "rtsp", "rtmp":
device.Type = u.Scheme
pullProxyConfig.Type = u.Scheme
default:
ext := filepath.Ext(u.Path)
switch ext {
case ".m3u8":
device.Type = "hls"
pullProxyConfig.Type = "hls"
case ".flv":
device.Type = "flv"
pullProxyConfig.Type = "flv"
case ".mp4":
device.Type = "mp4"
pullProxyConfig.Type = "mp4"
}
}
}
defaults.SetDefaults(&device.Pull)
defaults.SetDefaults(&device.Record)
device.URL = req.PullURL
device.Audio = req.Audio
device.StopOnIdle = req.StopOnIdle
device.Record.FilePath = req.RecordPath
device.Record.Fragment = req.RecordFragment.AsDuration()
defaults.SetDefaults(&pullProxyConfig.Pull)
defaults.SetDefaults(&pullProxyConfig.Record)
pullProxyConfig.URL = req.PullURL
pullProxyConfig.Audio = req.Audio
pullProxyConfig.StopOnIdle = req.StopOnIdle
pullProxyConfig.Record.FilePath = req.RecordPath
pullProxyConfig.Record.Fragment = req.RecordFragment.AsDuration()
if s.DB == nil {
err = pkg.ErrNoDB
return
}
s.DB.Create(device)
if req.StreamPath == "" {
device.StreamPath = device.GetStreamPath()
// 检查数据库中是否有相同的 streamPath 且状态不是 disabled 的记录
var existingCount int64
streamPath := pullProxyConfig.StreamPath
if streamPath == "" {
streamPath = pullProxyConfig.GetStreamPath()
}
_, err = s.createPullProxy(device)
s.DB.Model(&PullProxyConfig{}).Where("stream_path = ? AND status != ?", streamPath, PullProxyStatusDisabled).Count(&existingCount)
// 如果存在相同 streamPath 且状态不是 disabled 的记录,将当前记录状态设置为 disabled
if existingCount > 0 {
pullProxyConfig.Status = PullProxyStatusDisabled
}
s.DB.Create(pullProxyConfig)
if req.StreamPath == "" {
pullProxyConfig.StreamPath = pullProxyConfig.GetStreamPath()
}
_, err = s.createPullProxy(pullProxyConfig)
res = &pb.SuccessResponse{}
return
@@ -321,6 +335,10 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (re
if err != nil {
return
}
// 记录原始状态,用于后续判断状态变化
originalStatus := target.Status
target.Name = req.Name
target.URL = req.PullURL
target.ParentID = uint(req.ParentID)
@@ -355,19 +373,50 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (re
target.Record.Fragment = req.RecordFragment.AsDuration()
target.RTT = time.Duration(int(req.Rtt)) * time.Millisecond
target.StreamPath = req.StreamPath
// 如果设置状态为非 disable需要检查是否有相同 streamPath 的其他非 disable 代理
if req.Status != uint32(PullProxyStatusDisabled) {
var existingCount int64
streamPath := target.StreamPath
if streamPath == "" {
streamPath = target.GetStreamPath()
}
s.DB.Model(&PullProxyConfig{}).Where("stream_path = ? AND id != ? AND status != ?", streamPath, req.ID, PullProxyStatusDisabled).Count(&existingCount)
// 如果存在相同 streamPath 且状态不是 disabled 的其他记录,更新失败
if existingCount > 0 {
err = fmt.Errorf("已存在相同 streamPath [%s] 的非禁用代理,更新失败", streamPath)
return
}
target.Status = byte(req.Status)
} else {
target.Status = PullProxyStatusDisabled
}
s.DB.Save(target)
// 检查是否从 disable 状态变为非 disable 状态
wasDisabled := originalStatus == PullProxyStatusDisabled
isNowEnabled := target.Status != PullProxyStatusDisabled
isNowDisabled := target.Status == PullProxyStatusDisabled
wasEnabled := originalStatus != PullProxyStatusDisabled
if device, ok := s.PullProxies.SafeGet(uint(req.ID)); ok {
// 如果现在变为 disable 状态,需要停止并移除代理
if wasEnabled && isNowDisabled {
device.Stop(task.ErrStopByUser)
return
}
conf := device.GetConfig()
if target.URL != conf.URL || conf.Audio != target.Audio || conf.StreamPath != target.StreamPath || conf.Record.FilePath != target.Record.FilePath || conf.Record.Fragment != target.Record.Fragment {
device.Stop(task.ErrStopByUser)
device.WaitStopped()
_, err = s.createPullProxy(target)
device, err = s.createPullProxy(target)
if target.Status == PullProxyStatusPulling {
if pullJob, ok := s.Pulls.SafeGet(device.GetStreamPath()); ok {
pullJob.OnDispose(device.Pull)
pullJob.Stop(task.ErrStopByUser)
pullJob.WaitStopped()
}
device.Pull()
}
} else {
conf.Name = target.Name
@@ -382,6 +431,12 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.PullProxyInfo) (re
}
}
}
} else if wasDisabled && isNowEnabled {
// 如果原来是 disable 现在不是了,需要创建 PullProxy 并添加到集合中
_, err = s.createPullProxy(target)
if err != nil {
s.Error("create pull proxy failed", "error", err)
}
}
res = &pb.SuccessResponse{}
return

View File

@@ -441,9 +441,9 @@ func (s *Server) Start() (err error) {
}
func (s *Server) initPullProxies() {
// 1. First read all pull proxies from database
// 1. First read all pull proxies from database, excluding disabled ones
var pullProxies []*PullProxyConfig
s.DB.Find(&pullProxies)
s.DB.Where("status != ?", PullProxyStatusDisabled).Find(&pullProxies)
// Create a map for quick lookup of existing proxies
existingPullProxies := make(map[uint]*PullProxyConfig)
@@ -473,9 +473,11 @@ func (s *Server) initPullProxies() {
}
}
// 3. Finally add all proxies to collections
// 3. Finally add all proxies to collections, excluding disabled ones
for _, proxy := range pullProxies {
s.createPullProxy(proxy)
if proxy.Status != PullProxyStatusDisabled {
s.createPullProxy(proxy)
}
}
}