diff --git a/Dockerfile_full b/Dockerfile_full index e573333..68834dd 100644 --- a/Dockerfile_full +++ b/Dockerfile_full @@ -6,9 +6,11 @@ WORKDIR /opt/media/bin/ # 添加应用程序文件 ADD ./build/linux_amd64/bin ./gowvp -ADD ./configs/config.toml ./config.toml ADD ./www ./www +# 创建配置目录 +RUN mkdir -p configs + # 添加元数据标签 LABEL Name=gowvp \ Version=0.0.1 \ @@ -18,9 +20,5 @@ LABEL Name=gowvp \ # 暴露必要端口 EXPOSE 15123 1935 8080 554 10000-10500/udp -# 设置健康检查 -# HEALTHCHECK --interval=30s --timeout=3s \ - # CMD wget -q --spider http://localhost:15123/index/api/getServerConfig || exit 1 - # 启动服务 -CMD ["sh", "-c", "./MediaServer -s default.pem -c & ./gowvp & wait"] \ No newline at end of file +CMD ["./gowvp"] \ No newline at end of file diff --git a/Makefile b/Makefile index ea195b2..9bf9e4a 100644 --- a/Makefile +++ b/Makefile @@ -183,8 +183,12 @@ docker/save: docker/push: @docker push $(IMAGE_NAME) -docker/build/full: +docker/build/test: build/clean build/linux + @docker build --force-rm=true -t $(IMAGE_NAME) -f Dockerfile_full . + +docker/build/full: build/clean build/linux @docker build --force-rm=true --push --platform linux/amd64,linux/arm64 -t $(IMAGE_NAME) -f Dockerfile_full . + # @docker build --force-rm=true --push --platform linux/amd64,linux/arm64 -t registry.cn-shanghai.aliyuncs.com/ixugo/homenvr:latest -f Dockerfile_full . docker/build/gowvp: build/clean build/linux @docker build --force-rm=true --push --platform linux/amd64,linux/arm64 -t registry.cn-shanghai.aliyuncs.com/ixugo/gowvp:latest -f Dockerfile . diff --git a/README.md b/README.md index 3cfb919..b68ff44 100644 --- a/README.md +++ b/README.md @@ -101,8 +101,38 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit [docker hub](https://hub.docker.com/r/gospace/gowvp) -** gowvp & zlmediakit 分开镜像(推荐)** + +** gowvp & zlmediakit 融合镜像(推荐)** +docker-compose.yml +```yml +services: + gowvp: + # 如果拉不到 docker hub 镜像,也可以尝试 + # registry.cn-shanghai.aliyuncs.com/ixugo/homenvr:latest + image: gospace/gowvp:latest + # linux 解开下行注释,并将 ports 全部注释 + # network_mode: host + ports: + # gb28181 + - 15123:15123 # 管理平台 http 端口 + - 15060:15060 # gb28181 sip tcp 端口 + - 15060:15060/udp # gb28181 sip udp 端口 + # zlm + - 1935:1935 # rtmp + - 554:554 # rtsp + - 8080:80 # http + - 8443:443 # https + - 10000:10000 + - 8000:8000/udp + - 9000:9000/udp + - 20000-20100:20000-20100 # gb28181 收流端口 + - 20000-20100:20000-20100/udp # gb28181 收流端口udp + volumes: + - ./data:/opt/media/bin/configs +``` + +** gowvp & zlmediakit 分开镜像** ```yml services: gowvp: @@ -130,43 +160,12 @@ services: - 10000:10000/udp - 8000:8000/udp - 9000:9000/udp - - 20000-20300:20000-20300 - - 20000-20300:20000-20300/udp + - 20000-20100:20000-20100 + - 20000-20100:20000-20100/udp volumes: - ./configs:/opt/media/conf - ``` -** gowvp & zlmediakit 融合镜像(不推荐)** -docker-compose.yml -```yml -services: - gowvp: - image: gospace/gowvp:latest - # linux 解开下行注释,并将 ports 全部注释 - # network_mode: host - ports: - # gb28181 - - 15123:15123 # 管理平台 http 端口 - - 15060:15060 # gb28181 sip tcp 端口 - - 15060:15060/udp # gb28181 sip udp 端口 - # zlm - - 1935:1935 # rtmp - - 554:554 # rtsp - - 8080:80 # http - - 8443:443 # https - - 10000:10000 - - 8000:8000/udp - - 9000:9000/udp - - 20050-20100:20050-20100 # gb28181 收流端口 - - 20050-20100:20050-20100/udp # gb28181 收流端口udp - volumes: - - ./configs:/opt/media/bin/configs - - ./logs:/opt/media/bin/logs - - ./zlm.conf:/opt/media/conf -``` - - ## 快速开始 diff --git a/cmd/server/main.go b/cmd/server/main.go index b82d49e..77800cd 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "os" + "os/exec" "os/signal" "path/filepath" "regexp" @@ -43,6 +44,7 @@ func main() { if err := os.Chdir(filepath.Dir(bin)); err != nil { slog.Error("change dir error") } + go setupZLM(*configDir) // 初始化配置 var bc conf.Bootstrap // 获取配置目录绝对路径 @@ -60,7 +62,10 @@ func main() { bc.ConfigPath = filePath // 初始化日志 - logDir := filepath.Join(system.Getwd(), bc.Log.Dir) + logDir := filepath.Join(system.Getwd(), *configDir, bc.Log.Dir) + if filepath.IsAbs(bc.Log.Dir) { + logDir = bc.Log.Dir + } log, clean := logger.SetupSlog(logger.Config{ Dir: logDir, // 日志地址 Debug: bc.Debug, // 服务级别Debug/Release @@ -79,13 +84,7 @@ func main() { })) } - secret, err := getSecret(*configDir) - if err == nil { - slog.Info("发现 zlm 配置,已赋值,未回写配置文件", "secret", secret) - bc.Media.Secret = secret - } else { - slog.Info("未发现 zlm 配置,请检查 config.ini 文件", "err", err) - } + go setupSecret(&bc) // 如果需要执行表迁移,递增此版本号和表更新说明 versionapi.DBVersion = "0.0.10" @@ -142,14 +141,63 @@ func configIsNotExistWrite(path string) { // 读取 config.ini 文件,通过正则表达式,获取 secret 的值 func getSecret(configDir string) (string, error) { - content, err := os.ReadFile(filepath.Join(system.Getwd(), configDir, "config.ini")) - if err != nil { - return "", err + for _, file := range []string{"zlm.ini", "config.ini"} { + content, err := os.ReadFile(filepath.Join(system.Getwd(), configDir, file)) + if err != nil { + continue + } + re := regexp.MustCompile(`secret=(\w+)`) + matches := re.FindStringSubmatch(string(content)) + if len(matches) < 2 { + continue + } + return matches[1], nil } - re := regexp.MustCompile(`secret=(\w+)`) - matches := re.FindStringSubmatch(string(content)) - if len(matches) < 2 { - return "", fmt.Errorf("secret not found") - } - return matches[1], nil + return "", fmt.Errorf("unknow") +} + +func setupZLM(dir string) { + // 检查是否在 Docker 环境中 + _, err := os.Stat("/.dockerenv") + if !(err == nil || os.Getenv("NVR_STREAM") == "ZLM") { + slog.Info("未在 Docker 环境中运行,跳过启动 zlm") + return + } + + // 检查 MediaServer 文件是否存在 + mediaServerPath := filepath.Join(system.Getwd(), "MediaServer") + if _, err := os.Stat(mediaServerPath); os.IsNotExist(err) { + slog.Info("MediaServer 文件不存在", "path", mediaServerPath) + return + } + + // 启动 MediaServer + cmd := exec.Command("./MediaServer", "-s", "default.pem", "-c", filepath.Join(dir, "zlm.ini")) // nolint + cmd.Dir = system.Getwd() + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + for { + slog.Info("MediaServer 启动中...") + // 启动命令 + if err := cmd.Run(); err != nil { + slog.Error("zlm 运行失败", "err", err) + continue + } + time.Sleep(5 * time.Second) + } +} + +func setupSecret(bc *conf.Bootstrap) { + for range 3 { + secret, err := getSecret(*configDir) + if err == nil { + slog.Info("发现 zlm 配置,已赋值,未回写配置文件", "secret", secret) + bc.Media.Secret = secret + return + } + time.Sleep(2 * time.Second) + continue + } + slog.Info("未发现 zlm 配置,请检查 config.ini 文件") } diff --git a/configs/config.toml b/configs/config.toml index cec0ad0..22d1d0a 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -1,64 +1,64 @@ [Server] -Debug = false -# rtmp 推流秘钥 -RTMPSecret = '123' + Debug = false + # rtmp 推流秘钥 + RTMPSecret = '123' -# 对外提供的服务,建议由 nginx 代理 -[Server.HTTP] -# http 端口 -Port = 15123 -# 请求超时时间 -Timeout = '1m0s' -# jwt 秘钥,空串时,每次启动程序将随机赋值 -JwtSecret = '' + # 对外提供的服务,建议由 nginx 代理 + [Server.HTTP] + # http 端口 + Port = 15123 + # 请求超时时间 + Timeout = '1m0s' + # jwt 秘钥,空串时,每次启动程序将随机赋值 + JwtSecret = '' -[Server.HTTP.PProf] -# 是否启用 pprof, 建议设置为 true -Enabled = true -# 访问白名单 -AccessIps = ['::1', '127.0.0.1'] + [Server.HTTP.PProf] + # 是否启用 pprof, 建议设置为 true + Enabled = true + # 访问白名单 + AccessIps = ['::1', '127.0.0.1'] [Data] -# 数据库支持 sqlite 和 postgres 两种,使用 sqlite 时 dsn 应当填写文件存储路径 -[Data.Database] -Dsn = './configs/data.db' -MaxIdleConns = 1 -MaxOpenConns = 1 -ConnMaxLifetime = '6h0m0s' -SlowThreshold = '200ms' + # 数据库支持 sqlite 和 postgres 两种,使用 sqlite 时 dsn 应当填写文件存储路径 + [Data.Database] + Dsn = './configs/data.db' + MaxIdleConns = 1 + MaxOpenConns = 1 + ConnMaxLifetime = '6h0m0s' + SlowThreshold = '200ms' [Log] -# 日志存储目录,不能使用特殊符号 -Dir = './logs' -# 记录级别 debug/info/warn/error -Level = 'debug' -# 保留日志多久,超过时间自动删除 -MaxAge = '744h0m0s' -# 多久时间,分割一个新的日志文件 -RotationTime = '12h0m0s' -# 多大文件,分割一个新的日志文件(MB) -RotationSize = 50 + # 日志存储目录,不能使用特殊符号 + Dir = './logs' + # 记录级别 debug/info/warn/error + Level = 'debug' + # 保留日志多久,超过时间自动删除 + MaxAge = '744h0m0s' + # 多久时间,分割一个新的日志文件 + RotationTime = '12h0m0s' + # 多大文件,分割一个新的日志文件(MB) + RotationSize = 50 [Sip] -# 服务监听的 tcp/udp 端口号 -Port = 15060 -# gb/t28181 20 位国标 ID -ID = '3402000000200000001' -# 域 -Domain = '3402000000' -# 注册密码 -Password = '' + # 服务监听的 tcp/udp 端口号 + Port = 15060 + # gb/t28181 20 位国标 ID + ID = '3402000000200000001' + # 域 + Domain = '3402000000' + # 注册密码 + Password = '' [Media] -# 媒体服务器 IP -IP = '127.0.0.1' -# 媒体服务器 HTTP 端口 -HTTPPort = 8080 -# 媒体服务器密钥 -Secret = 'jvRqCAzEg7AszBi4gm1cfhwXpmnVmJMG' -# 用于流媒体 webhook 回调 -WebHookIP = '192.168.31.180' -# 媒体服务器 RTP 端口范围 -RTPPortRange = '20000-20300' -# 媒体服务器 SDP IP -SDPIP = '192.168.31.180' + # 媒体服务器 IP + IP = '127.0.0.1' + # 媒体服务器 HTTP 端口 + HTTPPort = 8080 + # 媒体服务器密钥 + Secret = 'jvRqCAzEg7AszBi4gm1cfhwXpmnVmJMG' + # 用于流媒体 webhook 回调 + WebHookIP = '192.168.31.180' + # 媒体服务器 RTP 端口范围 + RTPPortRange = '20000-20100' + # 媒体服务器 SDP IP + SDPIP = '192.168.31.180' \ No newline at end of file diff --git a/docker-compose.debug.yml b/docker-compose.debug.yml index 41c6fcb..8e449a3 100644 --- a/docker-compose.debug.yml +++ b/docker-compose.debug.yml @@ -1,16 +1,15 @@ services: gowvp: - image: gb28181 - build: - context: . - dockerfile: ./Dockerfile + image: registry.cn-shanghai.aliyuncs.com/ixugo/gowvp:latest ports: - 15123:15123 # 管理平台 http 端口 - 15060:15060 # gb28181 sip tcp 端口 - 15060:15060/udp # gb28181 sip udp 端口 volumes: - - ./logs:/app/logs + # - ./logs:/app/logs # 如果需要持久化日志,请取消注释 - ./configs:/app/configs + networks: + - gowvp-network depends_on: - zlm zlm: @@ -18,6 +17,8 @@ services: restart: always # 推荐 linux 主机使用 host 模式 # network_mode: host + networks: + - gowvp-network ports: - 1935:1935 # rtmp - 554:554 # rtsp @@ -27,7 +28,12 @@ services: - 10000:10000/udp - 8000:8000/udp - 9000:9000/udp - - 20050-20100:20050-20100 - - 20050-20100:20050-20100/udp + - 20000-20100:20000-20100 + - 20000-20100:20000-20100/udp volumes: - - ./conf:/opt/media/conf + - ./configs:/opt/media/conf + +# 如果不使用 host 模式,可以使用下面的配置 +networks: + gowvp-network: + driver: bridge diff --git a/docker-compose.yml b/docker-compose.yml index 9aa8416..54b3a17 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,39 +1,27 @@ services: gowvp: - image: registry.cn-shanghai.aliyuncs.com/ixugo/gowvp:latest + # 如果拉不到 docker hub 镜像,也可以尝试 + # registry.cn-shanghai.aliyuncs.com/ixugo/homenvr:latest + image: gospace/gowvp:latest + # linux 解开下行注释,并将 ports 全部注释 + # network_mode: host ports: + # gb28181 - 15123:15123 # 管理平台 http 端口 - 15060:15060 # gb28181 sip tcp 端口 - 15060:15060/udp # gb28181 sip udp 端口 - volumes: - # - ./logs:/app/logs # 如果需要持久化日志,请取消注释 - - ./configs:/app/configs - networks: - - gowvp-network - depends_on: - - zlm - zlm: - image: zlmediakit/zlmediakit:master - restart: always - # 推荐 linux 主机使用 host 模式 - # network_mode: host - networks: - - gowvp-network - ports: + # zlm - 1935:1935 # rtmp - 554:554 # rtsp - - 8080:80 # api - - 8443:443 + - 8080:80 # http + - 8443:443 # https - 10000:10000 - - 10000:10000/udp - 8000:8000/udp - 9000:9000/udp - - 20050-20100:20050-20100 - - 20050-20100:20050-20100/udp + - 20000-20100:20000-20100 # gb28181 收流端口 + - 20000-20100:20000-20100/udp # gb28181 收流端口udp + logging: + options: + max-size: "100M" volumes: - - ./configs:/opt/media/conf - -# 如果不使用 host 模式,可以使用下面的配置 -networks: - gowvp-network: - driver: bridge + - ./data:/opt/media/bin/configs diff --git a/go.mod b/go.mod index 49471e4..ed46b4f 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/glebarez/sqlite v1.11.0 github.com/google/wire v0.6.0 + github.com/gorilla/websocket v1.5.3 github.com/ixugo/goddd v1.2.0 github.com/jinzhu/copier v0.4.0 github.com/pelletier/go-toml/v2 v2.2.3 diff --git a/go.sum b/go.sum index 083ce15..5e10906 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/wire v0.6.0 h1:HBkoIh4BdSxoyo9PveV8giw7ZsaBOvzWKfcg/6MrVwI= github.com/google/wire v0.6.0/go.mod h1:F4QhpQ9EDIdJ1Mbop/NZBRB+5yrR6qg3BnctaoUk6NA= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/ixugo/goddd v1.2.0 h1:5jVyKIVXNPFJWfA2JolyWeSSnyb2etq31PSCN69eZRY= github.com/ixugo/goddd v1.2.0/go.mod h1:a/GJWwrX/irsGosgfnUNeBTOZ6of+IFcP6Fn82aluy0= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= diff --git a/internal/conf/default.go b/internal/conf/default.go index 9fe7c65..2829496 100644 --- a/internal/conf/default.go +++ b/internal/conf/default.go @@ -37,11 +37,11 @@ func DefaultConfig() Bootstrap { }, Media: Media{ IP: "127.0.0.1", - HTTPPort: 8080, + HTTPPort: 80, Secret: "", WebHookIP: "127.0.0.1", SDPIP: "127.0.0.1", - RTPPortRange: "20000-20300", + RTPPortRange: "20000-20100", }, Log: Log{ Dir: "./logs", diff --git a/internal/core/gb28181/store/gb28181cache/cache.go b/internal/core/gb28181/store/gb28181cache/cache.go index 07b2b1b..97d9d76 100644 --- a/internal/core/gb28181/store/gb28181cache/cache.go +++ b/internal/core/gb28181/store/gb28181cache/cache.go @@ -25,6 +25,11 @@ type Cache struct { devices *conc.Map[string, *gbs.Device] } +// LoadOrStore implements gbs.MemoryStorer. +func (c *Cache) LoadOrStore(deviceID string, value *gbs.Device) { + c.devices.LoadOrStore(deviceID, value) +} + func (c *Cache) Device() gb28181.DeviceStorer { return (*Device)(c) } diff --git a/internal/core/sms/media_server.go b/internal/core/sms/media_server.go index 049ef71..093ceb5 100755 --- a/internal/core/sms/media_server.go +++ b/internal/core/sms/media_server.go @@ -26,6 +26,13 @@ func (c *Core) FindMediaServer(ctx context.Context, in *FindMediaServerInput) ([ if err != nil { return nil, 0, reason.ErrDB.Withf(`Find err[%s]`, err.Error()) } + + for _, item := range items { + value, ok := c.cacheServers.Load(item.ID) + if ok { + item.Status = value.IsOnline + } + } return items, total, nil } diff --git a/internal/core/sms/node_manager.go b/internal/core/sms/node_manager.go index 47a357f..bce507e 100644 --- a/internal/core/sms/node_manager.go +++ b/internal/core/sms/node_manager.go @@ -43,7 +43,7 @@ func (n *NodeManager) Close() { // tickCheck 定时检查服务是否离线 func (n *NodeManager) tickCheck() { - ticker := time.NewTicker(2 * time.Second) + ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() for { select { @@ -53,16 +53,16 @@ func (n *NodeManager) tickCheck() { // TODO: 前期先固定 10 秒保活,后期优化 const KeepaliveInterval = 2 * 10 * time.Second n.cacheServers.Range(func(serverID string, ms *WarpMediaServer) bool { - IsOffline := time.Since(ms.LastUpdatedAt) >= KeepaliveInterval - if ms.IsOnline == IsOffline { - ms.IsOnline = !IsOffline - var svr MediaServer - if err := n.storer.MediaServer().Edit(context.Background(), &svr, func(b *MediaServer) { - b.Status = ms.IsOnline - }, orm.Where("id=?", serverID)); err != nil { - slog.Error("Edit MediaServer err", "err", err) - } - } + isOffline := time.Since(ms.LastUpdatedAt) >= KeepaliveInterval + // if ms.IsOnline != isOffline { + // var svr MediaServer + // if err := n.storer.MediaServer().Edit(context.Background(), &svr, func(b *MediaServer) { + // b.Status = isOffline + // }, orm.Where("id=?", serverID)); err != nil { + // slog.Error("Edit MediaServer err", "err", err) + // } + // } + ms.IsOnline = !isOffline return true }) @@ -112,7 +112,7 @@ func (n *NodeManager) Run(cfg *conf.Media, serverPort int) error { return nil } -func (n *NodeManager) connection(server *MediaServer, serverPort int) { +func (n *NodeManager) connection(server *MediaServer, serverPort int) error { n.cacheServers.Store(server.ID, &WarpMediaServer{ LastUpdatedAt: time.Now(), }) @@ -127,84 +127,80 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) { log.Info("ZLM 服务节点连接中") - for i := range 5 { - resp, err := engine.GetServerConfig() - if err != nil { - log.Error("ZLM 服务节点连接失败", "err", err, "retry", i) - time.Sleep(10 * time.Second) - continue - } - log.Info("ZLM 服务节点连接成功") - - zlmConfig := resp.Data[0] - var ms MediaServer - if err := n.storer.MediaServer().Edit(context.Background(), &ms, func(b *MediaServer) { - // b.Ports.FLV = zlmConfig.HTTPPort - // TODO: 映射的端口,会导致获取配置文件的端口不一定能访问 - http := server.Ports.HTTP - b.Ports.FLV = http - b.Ports.WsFLV = http // zlmConfig.HTTPSslport - b.Ports.HTTPS = zlmConfig.HTTPSslport - b.Ports.RTMP = zlmConfig.RtmpPort - b.Ports.RTMPs = zlmConfig.RtmpSslport - b.Ports.RTSP = zlmConfig.RtspPort - b.Ports.RTSPs = zlmConfig.RtspSslport - b.Ports.RTPPorxy = zlmConfig.RtpProxyPort - b.Ports.FLVs = zlmConfig.HTTPSslport - b.Ports.WsFLVs = zlmConfig.HTTPSslport - b.HookAliveInterval = 10 - b.Status = true - }, orm.Where("id=?", server.ID)); err != nil { - panic(fmt.Errorf("保存 MediaServer 失败 %w", err)) - } - - log.Info("ZLM 服务节点配置设置") - - hookPrefix := fmt.Sprintf("http://%s:%d/webhook", server.HookIP, serverPort) - req := zlm.SetServerConfigRequest{ - RtcExternIP: zlm.NewString(server.IP), - GeneralMediaServerID: zlm.NewString(server.ID), - HookEnable: zlm.NewString("1"), - HookOnFlowReport: zlm.NewString(""), - HookOnPlay: zlm.NewString(fmt.Sprintf("%s/on_play", hookPrefix)), - // HookOnHTTPAccess: zlm.NewString(""), - HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)), - HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)), - HookOnRecordTs: zlm.NewString(""), - HookOnRtspAuth: zlm.NewString(""), - HookOnRtspRealm: zlm.NewString(""), - // HookOnServerStarted: , - HookOnShellLogin: zlm.NewString(""), - HookOnStreamChanged: zlm.NewString(fmt.Sprintf("%s/on_stream_changed", hookPrefix)), - // HookOnStreamNotFound: , - HookOnServerKeepalive: zlm.NewString(fmt.Sprintf("%s/on_server_keepalive", hookPrefix)), - // HookOnSendRtpStopped: , - // HookOnRtpServerTimeout: , - // HookOnRecordMp4: , - HookTimeoutSec: zlm.NewString("20"), - // TODO: 回调时间间隔有问题 - HookAliveInterval: zlm.NewString(fmt.Sprint(server.HookAliveInterval)), - // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 - // 置0关闭此特性(推流断开会导致立即断开播放器) - // 此参数不应大于播放器超时时间 - // 优化此消息以更快的收到流注销事件 - ProtocolContinuePushMs: zlm.NewString("3000"), - RtpProxyPortRange: &server.RTPPortRange, - } - - { - resp, err := engine.SetServerConfig(&req) - if err != nil { - log.Error("ZLM 服务节点配置设置失败", "err", err) - time.Sleep(10 * time.Second) - continue - } - - log.Info("ZLM 服务节点配置设置成功", "changed", resp.Changed) - } - - return + resp, err := engine.GetServerConfig() + if err != nil { + log.Error("ZLM 服务节点连接失败", "err", err) + return err } + log.Info("ZLM 服务节点连接成功") + + zlmConfig := resp.Data[0] + var ms MediaServer + if err := n.storer.MediaServer().Edit(context.Background(), &ms, func(b *MediaServer) { + // b.Ports.FLV = zlmConfig.HTTPPort + // TODO: 映射的端口,会导致获取配置文件的端口不一定能访问 + http := server.Ports.HTTP + b.Ports.FLV = http + b.Ports.WsFLV = http // zlmConfig.HTTPSslport + b.Ports.HTTPS = zlmConfig.HTTPSslport + b.Ports.RTMP = zlmConfig.RtmpPort + b.Ports.RTMPs = zlmConfig.RtmpSslport + b.Ports.RTSP = zlmConfig.RtspPort + b.Ports.RTSPs = zlmConfig.RtspSslport + b.Ports.RTPPorxy = zlmConfig.RtpProxyPort + b.Ports.FLVs = zlmConfig.HTTPSslport + b.Ports.WsFLVs = zlmConfig.HTTPSslport + b.HookAliveInterval = 10 + b.Status = true + }, orm.Where("id=?", server.ID)); err != nil { + panic(fmt.Errorf("保存 MediaServer 失败 %w", err)) + } + + log.Info("ZLM 服务节点配置设置") + + hookPrefix := fmt.Sprintf("http://%s:%d/webhook", server.HookIP, serverPort) + req := zlm.SetServerConfigRequest{ + RtcExternIP: zlm.NewString(server.IP), + GeneralMediaServerID: zlm.NewString(server.ID), + HookEnable: zlm.NewString("1"), + HookOnFlowReport: zlm.NewString(""), + HookOnPlay: zlm.NewString(fmt.Sprintf("%s/on_play", hookPrefix)), + // HookOnHTTPAccess: zlm.NewString(""), + HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)), + HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)), + HookOnRecordTs: zlm.NewString(""), + HookOnRtspAuth: zlm.NewString(""), + HookOnRtspRealm: zlm.NewString(""), + // HookOnServerStarted: , + HookOnShellLogin: zlm.NewString(""), + HookOnStreamChanged: zlm.NewString(fmt.Sprintf("%s/on_stream_changed", hookPrefix)), + // HookOnStreamNotFound: , + HookOnServerKeepalive: zlm.NewString(fmt.Sprintf("%s/on_server_keepalive", hookPrefix)), + // HookOnSendRtpStopped: , + // HookOnRtpServerTimeout: , + // HookOnRecordMp4: , + HookTimeoutSec: zlm.NewString("20"), + // TODO: 回调时间间隔有问题 + HookAliveInterval: zlm.NewString(fmt.Sprint(server.HookAliveInterval)), + // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 + // 置0关闭此特性(推流断开会导致立即断开播放器) + // 此参数不应大于播放器超时时间 + // 优化此消息以更快的收到流注销事件 + ProtocolContinuePushMs: zlm.NewString("3000"), + RtpProxyPortRange: &server.RTPPortRange, + } + + { + resp, err := engine.SetServerConfig(&req) + if err != nil { + log.Error("ZLM 服务节点配置设置失败", "err", err) + return err + } + + log.Info("ZLM 服务节点配置设置成功", "changed", resp.Changed) + } + + return nil } func (n *NodeManager) Keepalive(serverID string) { diff --git a/internal/web/api/api.go b/internal/web/api/api.go index 57c753a..905690f 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -2,8 +2,11 @@ package api import ( "expvar" + "fmt" "log/slog" "net/http" + "net/http/httputil" + "net/url" "path/filepath" "runtime" "runtime/debug" @@ -73,6 +76,8 @@ func setupRouter(r *gin.Engine, uc *Usecase) { registerProxy(r, uc.ProxyAPI) registerConfig(r, uc.ConfigAPI) registerSms(r, uc.SMSAPI) + + r.Any("/proxy/sms/*path", uc.proxySMS) } type playOutput struct { @@ -168,3 +173,39 @@ func sortExpvarMap(data *expvar.Map, top int) []KV { } return kvs[:idx] } + +func (uc *Usecase) proxySMS(c *gin.Context) { + rc := http.NewResponseController(c.Writer) + exp := time.Now().AddDate(99, 0, 0) + _ = rc.SetReadDeadline(exp) + _ = rc.SetWriteDeadline(exp) + + path := c.Param("path") + addr, err := url.JoinPath(fmt.Sprintf("http://%s:%d", uc.Conf.Media.IP, uc.Conf.Media.HTTPPort), path) + if err != nil { + web.Fail(c, err) + return + } + fullAddr, _ := url.Parse(addr) + c.Request.URL.Path = "" + proxy := httputil.NewSingleHostReverseProxy(fullAddr) + proxy.Director = func(req *http.Request) { + // 设置请求的URL + req.URL.Scheme = "http" + req.URL.Host = fmt.Sprintf("%s:%d", uc.Conf.Media.IP, uc.Conf.Media.HTTPPort) + req.URL.Path = path + } + proxy.ModifyResponse = func(r *http.Response) error { + r.Header.Del("access-control-allow-credentials") + r.Header.Del("access-control-allow-origin") + if r.StatusCode >= 300 && r.StatusCode < 400 { + if l := r.Header.Get("location"); l != "" { + if !strings.HasPrefix(l, "http") { + r.Header.Set("location", "/proxy/sms/"+strings.TrimPrefix(l, "/")) + } + } + } + return nil + } + proxy.ServeHTTP(c.Writer, c.Request) +} diff --git a/internal/web/api/gb28181.go b/internal/web/api/gb28181.go index 6f732ab..29d2574 100755 --- a/internal/web/api/gb28181.go +++ b/internal/web/api/gb28181.go @@ -27,24 +27,23 @@ import ( var ErrDevice = reason.NewError("ErrDevice", "设备错误") const ( - dataDir = "data" coverDir = "cover" ) // TODO: 快照不会删除,只会覆盖,设备删除时也不会删除快照,待实现 -func writeCover(channelID string, body []byte) error { +func writeCover(dataDir, channelID string, body []byte) error { coverPath := filepath.Join(system.Getwd(), dataDir, coverDir) os.MkdirAll(coverPath, 0o755) return os.WriteFile(filepath.Join(coverPath, channelID+".jpg"), body, 0o644) } -func readCoverPath(channelID string) string { +func readCoverPath(dataDir, channelID string) string { coverPath := filepath.Join(system.Getwd(), dataDir, coverDir) return filepath.Join(coverPath, channelID+".jpg") } -func readCover(channelID string) ([]byte, error) { - return os.ReadFile(readCoverPath(channelID)) +func readCover(dataDir, channelID string) ([]byte, error) { + return os.ReadFile(readCoverPath(dataDir, channelID)) } type GB28181API struct { @@ -62,7 +61,6 @@ func NewGB28181Core(store gb28181.Storer, uni uniqueid.Core) gb28181.Core { func registerGB28181(g gin.IRouter, api GB28181API, handler ...gin.HandlerFunc) { g.Any("/gb28181/snapshot", func(c *gin.Context) { - fmt.Println(">>>>>>>>>>>>>>>") b, err := io.ReadAll(c.Request.Body) if err != nil { panic(err) @@ -163,6 +161,10 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { // 国标逻辑 if strings.HasPrefix(channelID, bz.IDPrefixGBChannel) { + // 防止错误的配置,无法收到流 + if a.uc.Conf.Media.SDPIP == "127.0.0.1" { + return nil, reason.ErrUsedLogic.SetMsg("请先配置流媒体 SDP 收流地址") + } // a.uc.SipServer. ch, err := a.gb28181Core.GetChannel(c.Request.Context(), channelID) if err != nil { @@ -248,6 +250,7 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { if l := strings.Split(c.Request.Host, ":"); len(l) == 2 { host = l[0] } + httpPort := a.uc.Conf.Server.HTTP.Port // 播放规则 // https://github.com/zlmediakit/ZLMediaKit/wiki/%E6%92%AD%E6%94%BEurl%E8%A7%84%E5%88%99 @@ -258,12 +261,12 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { Items: []streamAddrItem{ { Label: "默认线路", - WSFLV: fmt.Sprintf("ws://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session, - HTTPFLV: fmt.Sprintf("http://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session, + WSFLV: fmt.Sprintf("ws://%s:%d/proxy/sms/%s.live.flv", host, httpPort, stream) + "?" + session, + HTTPFLV: fmt.Sprintf("http://%s:%d/proxy/sms/%s.live.flv", host, httpPort, stream) + "?" + session, RTMP: fmt.Sprintf("rtmp://%s:%d/%s", host, svr.Ports.RTMP, stream) + "?" + session, RTSP: fmt.Sprintf("rtsp://%s:%d/%s", host, svr.Ports.RTSP, stream) + "?" + session, - WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTP, app, stream) + "&" + session, - HLS: fmt.Sprintf("http://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTP, stream) + "?" + session, + WebRTC: fmt.Sprintf("webrtc://%s:%d/proxy/sms/index/api/webrtc?app=%s&stream=%s&type=play", host, httpPort, app, stream) + "&" + session, + HLS: fmt.Sprintf("http://%s:%d/proxy/sms/%s/hls.fmp4.m3u8", host, httpPort, stream) + "?" + session, }, { Label: "SSL 线路", @@ -287,7 +290,7 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { if err != nil { slog.Error("get snapshot", "err", err) } else { - writeCover(channelID, body) + writeCover(a.uc.Conf.ConfigDir, channelID, body) } }() return &out, nil @@ -303,7 +306,7 @@ type refreshSnapshotInput struct { func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (any, error) { channelID := c.Param("id") - path := readCoverPath(channelID) + path := readCoverPath(a.uc.Conf.ConfigDir, channelID) // 获取文件的修改时间 fileInfo, err := os.Stat(path) @@ -328,7 +331,7 @@ func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (a slog.Error("get snapshot", "err", err) // return nil, reason.ErrBadRequest.Msg(err.Error()) } else { - writeCover(channelID, img) + writeCover(a.uc.Conf.ConfigDir, channelID, img) } } @@ -337,7 +340,7 @@ func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (a func (a GB28181API) getSnapshot(c *gin.Context) { channelID := c.Param("id") - body, err := readCover(channelID) + body, err := readCover(a.uc.Conf.ConfigDir, channelID) if err != nil { reason.ErrNotFound.SetMsg(err.Error()) return diff --git a/internal/web/api/notify.go b/internal/web/api/notify.go new file mode 100644 index 0000000..4151dc2 --- /dev/null +++ b/internal/web/api/notify.go @@ -0,0 +1,33 @@ +package api + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + "github.com/ixugo/goddd/pkg/web" +) + +// socketUpgrade 函数用于将HTTP连接升级为WebSocket连接 +func socketUpgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) { + socket := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + ReadBufferSize: 1024 * 2, + WriteBufferSize: 1024, + } + return socket.Upgrade(w, r, nil) +} + +func registerNotify(g gin.IRouter, handler ...gin.HandlerFunc) { + group := g.Group("/notify") + group.POST("/messages", func(c *gin.Context) { + conn, err := socketUpgrade(c.Writer, c.Request) + if err != nil { + web.Fail(c, err) + return + } + defer conn.Close() + }) +} diff --git a/pkg/gbs/play.go b/pkg/gbs/play.go index 566913f..e78b8e5 100644 --- a/pkg/gbs/play.go +++ b/pkg/gbs/play.go @@ -56,7 +56,7 @@ func (g *GB28181API) StopPlay(in *StopPlayInput) error { func (g *GB28181API) Play(in *PlayInput) error { ch, ok := g.svr.memoryStorer.GetChannel(in.Channel.DeviceID, in.Channel.ChannelID) if !ok { - return ErrDeviceNotExist + return ErrChannelNotExist } ch.device.playMutex.Lock() diff --git a/pkg/gbs/register.go b/pkg/gbs/register.go index 43fc429..d3665ad 100644 --- a/pkg/gbs/register.go +++ b/pkg/gbs/register.go @@ -41,9 +41,9 @@ func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GB28181, sms *sms.NodeMana }), streams: &conc.Map[string, *Streams]{}, } - go g.catalog.Start(func(s string, c []*Channels) { + go g.catalog.Start(func(s string, channel []*Channels) { // 零值不做变更,没有通道又何必注册上来 - if len(c) == 0 { + if len(channel) == 0 { return } @@ -51,17 +51,24 @@ func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GB28181, sms *sms.NodeMana // if ok { // ipc.channels.Clear() // for _, ch := range c { - // ch := Channel{ - // ChannelID: ch.ChannelID, - // device: ipc, - // } - // ch.init(g.cfg.Domain) - // ipc.channels.Store(ch.ChannelID, &ch) + // } // } - out := make([]*gb28181.Channel, len(c)) - for i, ch := range c { + ipc, ok := g.svr.memoryStorer.Load(s) + if ok { + for _, ch := range channel { + ch := Channel{ + ChannelID: ch.ChannelID, + device: ipc, + } + ch.init(g.cfg.Domain) + ipc.Channels.Store(ch.ChannelID, &ch) + } + } + + out := make([]*gb28181.Channel, len(channel)) + for i, ch := range channel { out[i] = &gb28181.Channel{ DeviceID: s, ChannelID: ch.ChannelID, @@ -90,6 +97,11 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) { ctx.String(http.StatusInternalServerError, "server db error") return } + g.svr.memoryStorer.LoadOrStore(ctx.DeviceID, &Device{ + conn: ctx.Request.GetConnection(), + source: ctx.Source, + to: ctx.To, + }) password := dev.Password if password == "" { diff --git a/pkg/gbs/server.go b/pkg/gbs/server.go index ca6bc41..b72d652 100644 --- a/pkg/gbs/server.go +++ b/pkg/gbs/server.go @@ -20,6 +20,7 @@ import ( ) type MemoryStorer interface { + LoadOrStore(deviceID string, value *Device) LoadDeviceToMemory(conn sip.Connection) // 加载设备到内存 RangeDevices(fn func(key string, value *Device) bool) // 遍历设备 diff --git a/pkg/zlm/zlm.go b/pkg/zlm/zlm.go index 80320cc..1a6b3c3 100644 --- a/pkg/zlm/zlm.go +++ b/pkg/zlm/zlm.go @@ -32,7 +32,7 @@ type Engine struct { func NewEngine() Engine { return Engine{ cli: &http.Client{ - Timeout: 10 * time.Second, + Timeout: 5 * time.Second, Transport: &http.Transport{ MaxIdleConns: 30, MaxIdleConnsPerHost: 30,