From d082fcedda0b9e723542b971d6081ac49b4f88d0 Mon Sep 17 00:00:00 2001 From: xugo Date: Sat, 14 Jun 2025 01:20:46 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=9B=BD=E6=A0=87=E6=92=AD?= =?UTF-8?q?=E6=94=BE=E4=B8=8E=E4=BF=AE=E5=A4=8D=E5=BF=AB=E7=85=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + changelog | 5 ++ configs/config.toml | 106 +++++++++++++------------- internal/core/sms/node_manager.go | 1 + internal/web/api/api.go | 4 + internal/web/api/gb28181.go | 20 +---- internal/web/api/zlm_webhook.go | 37 +++++++++ internal/web/api/zlm_webhook_param.go | 12 +++ main.go | 2 + pkg/gbs/play.go | 27 ++++--- 10 files changed, 135 insertions(+), 80 deletions(-) diff --git a/.gitignore b/.gitignore index 6f130d6..7fae4de 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ tables/ *.zip .idea/ data/ +cover/ diff --git a/changelog b/changelog index 25b4a1c..c66b37d 100644 --- a/changelog +++ b/changelog @@ -1,3 +1,8 @@ +# 2025-06-14 +重构国标播放逻辑, play 接口仅返回播放地址,通过 webhook 来通知拉流 +可能会导致首播慢一些 +重构 main 函数,将文件移动到项目根目录下 + # 2025-02-15 国标流停止,实现 sip bye 请求 diff --git a/configs/config.toml b/configs/config.toml index 6fe6e38..928907c 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -1,64 +1,64 @@ [Server] -Debug = false -# rtmp 推流秘钥 -RTMPSecret = '123' + Debug = false + # rtmp 推流秘钥 + RTMPSecret = '123' -# 对外提供的服务,建议由 nginx 代理 -[Server.HTTP] -# http 端口 -Port = 15123 -# 请求超时时间 -Timeout = '1m0s' -# jwt 秘钥,空串时,每次启动程序将随机赋值 -JwtSecret = '' + # 对外提供的服务,建议由 nginx 代理 + [Server.HTTP] + # http 端口 + Port = 15123 + # 请求超时时间 + Timeout = '1m0s' + # jwt 秘钥,空串时,每次启动程序将随机赋值 + JwtSecret = '' -[Server.HTTP.PProf] -# 是否启用 pprof, 建议设置为 true -Enabled = true -# 访问白名单 -AccessIps = ['::1', '127.0.0.1'] + [Server.HTTP.PProf] + # 是否启用 pprof, 建议设置为 true + Enabled = true + # 访问白名单 + AccessIps = ['::1', '127.0.0.1'] [Data] -# 数据库支持 sqlite 和 postgres 两种,使用 sqlite 时 dsn 应当填写文件存储路径 -[Data.Database] -Dsn = './configs/data.db' -MaxIdleConns = 1 -MaxOpenConns = 1 -ConnMaxLifetime = '6h0m0s' -SlowThreshold = '200ms' + # 数据库支持 sqlite 和 postgres 两种,使用 sqlite 时 dsn 应当填写文件存储路径 + [Data.Database] + Dsn = './configs/data.db' + MaxIdleConns = 1 + MaxOpenConns = 1 + ConnMaxLifetime = '6h0m0s' + SlowThreshold = '200ms' [Log] -# 日志存储目录,不能使用特殊符号 -Dir = './logs' -# 记录级别 debug/info/warn/error -Level = 'debug' -# 保留日志多久,超过时间自动删除 -MaxAge = '744h0m0s' -# 多久时间,分割一个新的日志文件 -RotationTime = '12h0m0s' -# 多大文件,分割一个新的日志文件(MB) -RotationSize = 50 + # 日志存储目录,不能使用特殊符号 + Dir = './logs' + # 记录级别 debug/info/warn/error + Level = 'debug' + # 保留日志多久,超过时间自动删除 + MaxAge = '744h0m0s' + # 多久时间,分割一个新的日志文件 + RotationTime = '12h0m0s' + # 多大文件,分割一个新的日志文件(MB) + RotationSize = 50 [Sip] -# 服务监听的 tcp/udp 端口号 -Port = 15060 -# gb/t28181 20 位国标 ID -ID = '3402000000200000001' -# 域 -Domain = '3402000000' -# 注册密码 -Password = '' + # 服务监听的 tcp/udp 端口号 + Port = 15060 + # gb/t28181 20 位国标 ID + ID = '3402000000200000001' + # 域 + Domain = '3402000000' + # 注册密码 + Password = '' [Media] -# 媒体服务器 IP -IP = '127.0.0.1' -# 媒体服务器 HTTP 端口 -HTTPPort = 8080 -# 媒体服务器密钥 -Secret = 'jvRqCAzEg7AszBi4gm1cfhwXpmnVmJMG' -# 用于流媒体 webhook 回调 -WebHookIP = '192.168.10.37' -# 媒体服务器 RTP 端口范围 -RTPPortRange = '20000-20100' -# 媒体服务器 SDP IP -SDPIP = '192.168.10.37' + # 媒体服务器 IP + IP = '127.0.0.1' + # 媒体服务器 HTTP 端口 + HTTPPort = 8080 + # 媒体服务器密钥 + Secret = 'jvRqCAzEg7AszBi4gm1cfhwXpmnVmJMG' + # 用于流媒体 webhook 回调 + WebHookIP = '192.168.10.10' + # 媒体服务器 RTP 端口范围 + RTPPortRange = '20000-20100' + # 媒体服务器 SDP IP + SDPIP = '192.168.10.10' diff --git a/internal/core/sms/node_manager.go b/internal/core/sms/node_manager.go index bce507e..54d7c7e 100644 --- a/internal/core/sms/node_manager.go +++ b/internal/core/sms/node_manager.go @@ -168,6 +168,7 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error { // HookOnHTTPAccess: zlm.NewString(""), HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)), HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)), + HookOnStreamNotFound: zlm.NewString(fmt.Sprintf("%s/on_stream_not_found", hookPrefix)), HookOnRecordTs: zlm.NewString(""), HookOnRtspAuth: zlm.NewString(""), HookOnRtspRealm: zlm.NewString(""), diff --git a/internal/web/api/api.go b/internal/web/api/api.go index 905690f..f4c511c 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -28,6 +28,8 @@ var startRuntime = time.Now() func setupRouter(r *gin.Engine, uc *Usecase) { uc.GB28181API.uc = uc uc.SMSAPI.uc = uc + uc.WebHookAPI.uc = uc + go stat.LoadTop(system.Getwd(), func(m map[string]any) { _ = m }) @@ -175,6 +177,8 @@ func sortExpvarMap(data *expvar.Map, top int) []KV { } func (uc *Usecase) proxySMS(c *gin.Context) { + defer recover() + rc := http.NewResponseController(c.Writer) exp := time.Now().AddDate(99, 0, 0) _ = rc.SetReadDeadline(exp) diff --git a/internal/web/api/gb28181.go b/internal/web/api/gb28181.go index 29d2574..627f938 100755 --- a/internal/web/api/gb28181.go +++ b/internal/web/api/gb28181.go @@ -15,12 +15,10 @@ import ( "github.com/gowvp/gb28181/internal/core/gb28181" "github.com/gowvp/gb28181/internal/core/media" "github.com/gowvp/gb28181/internal/core/sms" - "github.com/gowvp/gb28181/pkg/gbs" "github.com/gowvp/gb28181/pkg/zlm" "github.com/ixugo/goddd/domain/uniqueid" "github.com/ixugo/goddd/pkg/orm" "github.com/ixugo/goddd/pkg/reason" - "github.com/ixugo/goddd/pkg/system" "github.com/ixugo/goddd/pkg/web" ) @@ -32,13 +30,13 @@ const ( // TODO: 快照不会删除,只会覆盖,设备删除时也不会删除快照,待实现 func writeCover(dataDir, channelID string, body []byte) error { - coverPath := filepath.Join(system.Getwd(), dataDir, coverDir) + coverPath := filepath.Join(dataDir, coverDir) os.MkdirAll(coverPath, 0o755) return os.WriteFile(filepath.Join(coverPath, channelID+".jpg"), body, 0o644) } func readCoverPath(dataDir, channelID string) string { - coverPath := filepath.Join(system.Getwd(), dataDir, coverDir) + coverPath := filepath.Join(dataDir, coverDir) return filepath.Join(coverPath, channelID+".jpg") } @@ -179,18 +177,6 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { return nil, err } - dev, err := a.gb28181Core.GetDeviceByDeviceID(c.Request.Context(), ch.DeviceID) - if err != nil { - return nil, err - } - - if err := a.uc.SipServer.Play(&gbs.PlayInput{ - Channel: ch, - StreamMode: dev.StreamMode, - SMS: svr, - }); err != nil { - return nil, ErrDevice.SetMsg(err.Error()) - } } else if strings.HasPrefix(channelID, bz.IDPrefixRTMP) { push, err := a.uc.MediaAPI.mediaCore.GetStreamPush(c.Request.Context(), channelID) if err != nil { @@ -335,7 +321,7 @@ func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (a } } - return gin.H{"link": fmt.Sprintf("/api/channels/%s/snapshot", channelID)}, nil + return gin.H{"link": fmt.Sprintf("/channels/%s/snapshot", channelID)}, nil } func (a GB28181API) getSnapshot(c *gin.Context) { diff --git a/internal/web/api/zlm_webhook.go b/internal/web/api/zlm_webhook.go index 74dbff8..fc9a9a0 100644 --- a/internal/web/api/zlm_webhook.go +++ b/internal/web/api/zlm_webhook.go @@ -20,6 +20,7 @@ type WebHookAPI struct { conf *conf.Bootstrap log *slog.Logger gbs *gbs.Server + uc *Usecase } func NewWebHookAPI(core sms.Core, mediaCore media.Core, conf *conf.Bootstrap, gbs *gbs.Server, gb28181 gb28181.Core) WebHookAPI { @@ -42,6 +43,7 @@ func registerZLMWebhookAPI(r gin.IRouter, api WebHookAPI, handler ...gin.Handler group.POST("/on_play", web.WarpH(api.onPlay)) group.POST("/on_stream_none_reader", web.WarpH(api.onStreamNoneReader)) group.POST("/on_rtp_server_timeout", web.WarpH(api.onRTPServerTimeout)) + group.POST("/on_stream_not_found", web.WarpH(api.onStreamNotFound)) } } @@ -159,3 +161,38 @@ func (w WebHookAPI) onRTPServerTimeout(c *gin.Context, in *onRTPServerTimeoutInp w.log.Info("rtp 收流超时", "local_port", in.LocalPort, "ssrc", in.SSRC, "stream_id", in.StreamID, "mediaServerID", in.MediaServerID) return newDefaultOutputOK(), nil } + +func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) (DefaultOutput, error) { + w.log.Info("流不存在", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) + + // 国标流处理 + if in.App == "rtp" { + ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream) + if err != nil { + // slog.Error("获取通道失败", "err", err) + return newDefaultOutputOK(), nil + } + + dev, err := w.gb28181Core.GetDeviceByDeviceID(c.Request.Context(), ch.DeviceID) + if err != nil { + // slog.Error("获取设备失败", "err", err) + return newDefaultOutputOK(), nil + } + + svr, err := w.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), sms.DefaultMediaServerID) + if err != nil { + // slog.Error("GetMediaServer", "err", err) + return newDefaultOutputOK(), nil + } + + if err := w.gbs.Play(&gbs.PlayInput{ + Channel: ch, + StreamMode: dev.StreamMode, + SMS: svr, + }); err != nil { + slog.Error("play", "err", err, "channel", ch.ID) + return newDefaultOutputOK(), nil + } + } + return newDefaultOutputOK(), nil +} diff --git a/internal/web/api/zlm_webhook_param.go b/internal/web/api/zlm_webhook_param.go index 84fb314..0e900dd 100644 --- a/internal/web/api/zlm_webhook_param.go +++ b/internal/web/api/zlm_webhook_param.go @@ -201,3 +201,15 @@ type onRTPServerTimeoutInput struct { TCPMode int `json:"tcp_mode"` // openRtpServer 输入的参数 MediaServerID string `json:"mediaServerId"` // 服务器 id,通过配置文件设置 } + +type onStreamNotFoundInput struct { + MediaServerID string `json:"mediaServerId"` // 服务器 id,通过配置文件设置 + App string `json:"app"` // 流应用名 + ID string `json:"id"` // TCP链接唯一ID + IP string `json:"ip"` // 播放器ip + Params string `json:"params"` // 播放url参数 + Port int `json:"port"` // 播放器端口号 + Schema string `json:"schema"` // 播放的协议,可能是rtsp、rtmp、http + Stream string `json:"stream"` // 流 ID + Vhost string `json:"vhost"` // 流虚拟主机 +} diff --git a/main.go b/main.go index 294202d..7b81d36 100644 --- a/main.go +++ b/main.go @@ -44,6 +44,8 @@ func main() { } bc.Debug = !getBuildRelease() bc.BuildVersion = buildVersion + bc.ConfigDir = filedir + bc.ConfigPath = filePath { expvar.NewString("version").Set(buildVersion) diff --git a/pkg/gbs/play.go b/pkg/gbs/play.go index 5c69053..16c490c 100644 --- a/pkg/gbs/play.go +++ b/pkg/gbs/play.go @@ -24,23 +24,18 @@ type StopPlayInput struct { Channel *gb28181.Channel } -func (g *GB28181API) StopPlay(in *StopPlayInput) error { - ch, ok := g.svr.memoryStorer.GetChannel(in.Channel.DeviceID, in.Channel.ChannelID) - if !ok { - return ErrDeviceNotExist - } - - ch.device.playMutex.Lock() - defer ch.device.playMutex.Unlock() - +// stopPlay 不加锁的 +func (g *GB28181API) stopPlay(ch *Channel, in *StopPlayInput) error { key := "play:" + in.Channel.DeviceID + ":" + in.Channel.ChannelID stream, ok := g.streams.LoadAndDelete(key) if !ok { return nil } + if stream.Resp == nil { return nil } + req := sip.NewRequestFromResponse(sip.MethodBYE, stream.Resp) req.SetDestination(ch.Source()) req.SetConnection(ch.Conn()) @@ -53,6 +48,18 @@ func (g *GB28181API) StopPlay(in *StopPlayInput) error { return err } +// StopPlay 加锁的停止播放 +func (g *GB28181API) StopPlay(in *StopPlayInput) error { + ch, ok := g.svr.memoryStorer.GetChannel(in.Channel.DeviceID, in.Channel.ChannelID) + if !ok { + return ErrDeviceNotExist + } + + ch.device.playMutex.Lock() + defer ch.device.playMutex.Unlock() + return g.stopPlay(ch, in) +} + func (g *GB28181API) Play(in *PlayInput) error { ch, ok := g.svr.memoryStorer.GetChannel(in.Channel.DeviceID, in.Channel.ChannelID) if !ok { @@ -68,7 +75,7 @@ func (g *GB28181API) Play(in *PlayInput) error { if ok { // TODO: 临时解决方案,每次播放,先停止再播放 // https://github.com/gowvp/gb28181/issues/16 - if err := g.StopPlay(&StopPlayInput{ + if err := g.stopPlay(ch, &StopPlayInput{ Channel: in.Channel, }); err != nil { slog.Error("stop play failed", "err", err)