From 3fc455ecdd892cbcda7e67a57eb93f21cda35d3c Mon Sep 17 00:00:00 2001 From: xugo Date: Sat, 14 Jun 2025 18:11:36 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E7=8A=B6=E6=80=81=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 ++-- configs/config.toml | 2 +- go.mod | 1 + go.sum | 2 ++ internal/app/app.go | 2 +- internal/core/gb28181/channel.model.go | 4 +++ internal/core/gb28181/device.go | 30 +++++++++++++++++++ internal/core/gb28181/device.model.go | 2 ++ .../core/gb28181/store/gb28181cache/cache.go | 5 ++++ internal/core/sms/node_manager.go | 13 ++++---- internal/web/api/api.go | 23 +++++++++++++- internal/web/api/gb28181.go | 7 +++++ internal/web/api/zlm_webhook.go | 14 ++++++--- pkg/gbs/keepalive.go | 11 +++++++ pkg/gbs/sip/context.go | 3 ++ pkg/gbs/sip/header.go | 4 +++ 16 files changed, 114 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index b68ff44..53b306e 100644 --- a/README.md +++ b/README.md @@ -204,7 +204,8 @@ services: - [x] 支持输出 HTTP_FLV,Websocket_FLV,HLS,WebRTC,RTSP、RTMP 等多种协议流地址 - [x] 支持局域网/互联网/多层 NAT/特殊网络环境部署 - [x] 支持 SQLite 数据库快速部署 -- [x] 支持 PostgreSQL 数据库,当接入设备数超过 300 时推荐 +- [x] 支持 PostgreSQL 数据库 +- [x] 服务重启自动离线/自动尝试连接 - [x] GB/T 28181 - [x] 设备注册,支持 7 种接入方式 - [x] 支持 UDP 和 TCP 两种国标信令传输模式 @@ -215,10 +216,11 @@ services: - [x] 设备基础配置查询(例如设备侧填写超时 3 秒,次数 3 次,则 9+x 秒左右收不到心跳认为离线,x 是检测间隔周期) - [x] 设备实时直播 - [x] 支持 UDP 和 TCP 被动两种国标流传输模式 - - [x] 按需拉流,节省流量 + - [x] 按需拉流,节省流量 (60秒无人观看自动停止) - [x] 视频支持播放 H264 和 H265 - [x] 音频支持 g711a/g711u/aac - [x] 快照 + - [x] 支持跨域 - [ ] 设备云台控制 - [ ] 录像回放 - [ ] 报警事件订阅 diff --git a/configs/config.toml b/configs/config.toml index 928907c..34045b5 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -43,7 +43,7 @@ # 服务监听的 tcp/udp 端口号 Port = 15060 # gb/t28181 20 位国标 ID - ID = '3402000000200000001' + ID = '34020000002000000001' # 域 Domain = '3402000000' # 注册密码 diff --git a/go.mod b/go.mod index ed46b4f..b3370f2 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.24 require ( github.com/DATA-DOG/go-sqlmock v1.5.2 + github.com/gin-contrib/cors v1.7.5 github.com/gin-contrib/gzip v1.2.3 github.com/gin-gonic/gin v1.10.0 github.com/glebarez/sqlite v1.11.0 diff --git a/go.sum b/go.sum index 5e10906..78d3f81 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= +github.com/gin-contrib/cors v1.7.5 h1:cXC9SmofOrRg0w9PigwGlHG3ztswH6bqq4vJVXnvYMk= +github.com/gin-contrib/cors v1.7.5/go.mod h1:4q3yi7xBEDDWKapjT2o1V7mScKDDr8k+jZ0fSquGoy0= github.com/gin-contrib/gzip v1.2.3 h1:dAhT722RuEG330ce2agAs75z7yB+NKvX/ZM1r8w0u2U= github.com/gin-contrib/gzip v1.2.3/go.mod h1:ad72i4Bzmaypk8M762gNXa2wkxxjbz0icRNnuLJ9a/c= github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E= diff --git a/internal/app/app.go b/internal/app/app.go index dd65d67..8984048 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -34,7 +34,7 @@ func Run(bc *conf.Bootstrap) { // TODO: 异步发现 zlm 配置,有概率程序启动了,才找到 zlm 的秘钥,建议提前配置好秘钥 go setupSecret(bc) // 如果需要执行表迁移,递增此版本号和表更新说明 - versionapi.DBVersion = "0.0.10" + versionapi.DBVersion = "0.0.11" versionapi.DBRemark = "add stream proxy" handler, cleanUp, err := wireApp(bc, log) diff --git a/internal/core/gb28181/channel.model.go b/internal/core/gb28181/channel.model.go index b2c17e5..4b17b74 100755 --- a/internal/core/gb28181/channel.model.go +++ b/internal/core/gb28181/channel.model.go @@ -1,6 +1,8 @@ // Code generated by godddx, DO AVOID EDIT. package gb28181 +import "github.com/ixugo/goddd/pkg/orm" + // Channel domain model type Channel struct { ID string `gorm:"primaryKey" json:"id"` @@ -11,6 +13,8 @@ type Channel struct { PTZType int `gorm:"column:ptztype;notNull;default:0;comment:云台类型" json:"ptztype"` // 云台类型 IsOnline bool `gorm:"column:is_online;notNull;default:FALSE;comment:是否在线" json:"is_online"` // 是否在线 Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb" json:"ext"` + CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 + UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 } // TableName database table name diff --git a/internal/core/gb28181/device.go b/internal/core/gb28181/device.go index d5b8e13..6cdb5e8 100755 --- a/internal/core/gb28181/device.go +++ b/internal/core/gb28181/device.go @@ -8,6 +8,7 @@ import ( "github.com/gowvp/gb28181/internal/core/bz" "github.com/ixugo/goddd/pkg/orm" "github.com/ixugo/goddd/pkg/reason" + "github.com/ixugo/goddd/pkg/web" "github.com/jinzhu/copier" "gorm.io/gorm" ) @@ -23,6 +24,35 @@ type DeviceStorer interface { Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error } +func (c Core) FindChannelsForDevice(ctx context.Context, in *FindDeviceInput) ([]*Device, int64, error) { + items := make([]*Device, 0, in.Limit()) + + query := orm.NewQuery(3) + query.OrderBy("created_at DESC") + + total, err := c.store.Device().Find(ctx, &items, in, query.Encode()...) + if err != nil { + return nil, 0, reason.ErrDB.Withf(`Find err[%s]`, err.Error()) + } + + for _, item := range items { + const size = 3 + item.Children = make([]*Channel, 0, size) + query := orm.NewQuery(2).OrderBy("created_at DESC").Where("did=?", item.ID) + _, err := c.store.Channel().Find(ctx, &item.Children, web.PagerFilter{Size: size}, query.Encode()...) + if err != nil { + continue + } + + for _, ch := range item.Children { + if !item.IsOnline { + ch.IsOnline = false + } + } + } + return items, total, nil +} + // FindDevice Paginated search func (c Core) FindDevice(ctx context.Context, in *FindDeviceInput) ([]*Device, int64, error) { items := make([]*Device, 0) diff --git a/internal/core/gb28181/device.model.go b/internal/core/gb28181/device.model.go index cccf996..e697ce6 100755 --- a/internal/core/gb28181/device.model.go +++ b/internal/core/gb28181/device.model.go @@ -27,6 +27,8 @@ type Device struct { Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"` Address string `gorm:"column:address;notNull;default:'';comment:设备网络地址" json:"address"` Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性 + + Children []*Channel `gorm:"-" json:"children,omitzero"` } // TableName database table name diff --git a/internal/core/gb28181/store/gb28181cache/cache.go b/internal/core/gb28181/store/gb28181cache/cache.go index 97d9d76..a03c808 100644 --- a/internal/core/gb28181/store/gb28181cache/cache.go +++ b/internal/core/gb28181/store/gb28181cache/cache.go @@ -56,6 +56,11 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) { for _, d := range devices { if strings.ToLower(d.Trasnport) == "tcp" { // 通知相关设备/通道离线 + c.Change(d.DeviceID, func(d *gb28181.Device) { + d.IsOnline = false + }, func(d *gbs.Device) { + d.IsOnline = false + }) continue } diff --git a/internal/core/sms/node_manager.go b/internal/core/sms/node_manager.go index 54d7c7e..61f68a5 100644 --- a/internal/core/sms/node_manager.go +++ b/internal/core/sms/node_manager.go @@ -166,12 +166,13 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error { HookOnFlowReport: zlm.NewString(""), HookOnPlay: zlm.NewString(fmt.Sprintf("%s/on_play", hookPrefix)), // HookOnHTTPAccess: zlm.NewString(""), - HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)), - HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)), - HookOnStreamNotFound: zlm.NewString(fmt.Sprintf("%s/on_stream_not_found", hookPrefix)), - HookOnRecordTs: zlm.NewString(""), - HookOnRtspAuth: zlm.NewString(""), - HookOnRtspRealm: zlm.NewString(""), + HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)), + HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)), + GeneralStreamNoneReaderDelayMS: zlm.NewString("60000"), + HookOnStreamNotFound: zlm.NewString(fmt.Sprintf("%s/on_stream_not_found", hookPrefix)), + HookOnRecordTs: zlm.NewString(""), + HookOnRtspAuth: zlm.NewString(""), + HookOnRtspRealm: zlm.NewString(""), // HookOnServerStarted: , HookOnShellLogin: zlm.NewString(""), HookOnStreamChanged: zlm.NewString(fmt.Sprintf("%s/on_stream_changed", hookPrefix)), diff --git a/internal/web/api/api.go b/internal/web/api/api.go index f4c511c..762b74c 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -14,6 +14,7 @@ import ( "strings" "time" + "github.com/gin-contrib/cors" "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" "github.com/gowvp/gb28181/plugin/stat" @@ -48,6 +49,24 @@ func setupRouter(r *gin.Engine, uc *Usecase) { ) go web.CountGoroutines(10*time.Minute, 20) + r.Use(cors.New(cors.Config{ + AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}, + AllowHeaders: []string{ + "Accept", "Content-Length", "Content-Type", "Range", "Accept-Language", + "Origin", "Authorization", + "Accept-Encoding", + "Cache-Control", "Pragma", "X-Requested-With", + "Sec-Fetch-Mode", "Sec-Fetch-Site", "Sec-Fetch-Dest", + "Dnt", "X-Forwarded-For", "X-Forwarded-Proto", "X-Forwarded-Host", + "X-Real-IP", "X-Request-ID", "X-Request-Start", "X-Request-Time", + }, + AllowCredentials: true, + MaxAge: 12 * time.Hour, + AllowOriginFunc: func(origin string) bool { + return true + }, + })) + const staticPrefix = "/web" const staticDir = "www" admin := r.Group(staticPrefix, gzip.Gzip(gzip.DefaultCompression)) @@ -177,7 +196,9 @@ func sortExpvarMap(data *expvar.Map, top int) []KV { } func (uc *Usecase) proxySMS(c *gin.Context) { - defer recover() + defer func() { + _ = recover() + }() rc := http.NewResponseController(c.Writer) exp := time.Now().AddDate(99, 0, 0) diff --git a/internal/web/api/gb28181.go b/internal/web/api/gb28181.go index 627f938..1981199 100755 --- a/internal/web/api/gb28181.go +++ b/internal/web/api/gb28181.go @@ -75,6 +75,8 @@ func registerGB28181(g gin.IRouter, api GB28181API, handler ...gin.HandlerFunc) group.DELETE("/:id", web.WarpH(api.delDevice)) group.POST("/:id/catalog", web.WarpH(api.queryCatalog)) // 刷新通道 + + group.GET("/channels", web.WarpH(api.FindChannelsForDevice)) } { @@ -125,6 +127,11 @@ func (a GB28181API) queryCatalog(c *gin.Context, _ *struct{}) (any, error) { return gin.H{"msg": "ok"}, nil } +func (a GB28181API) FindChannelsForDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) { + items, total, err := a.gb28181Core.FindChannelsForDevice(c.Request.Context(), in) + return gin.H{"items": items, "total": total}, err +} + // >>> channel >>>>>>>>>>>>>>>>>>>> func (a GB28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (any, error) { diff --git a/internal/web/api/zlm_webhook.go b/internal/web/api/zlm_webhook.go index fc9a9a0..136feb3 100644 --- a/internal/web/api/zlm_webhook.go +++ b/internal/web/api/zlm_webhook.go @@ -114,9 +114,6 @@ func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (D // https://docs.zlmediakit.com/guide/media_server/web_hook_api.html#_6-on-play func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, error) { return newDefaultOutputOK(), nil - if in.App == "rtp" { - return newDefaultOutputOK(), nil - } switch in.Schema { case "rtmp": @@ -150,6 +147,15 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInput) (onStreamNoneReaderOutput, error) { // rtmp 无人观看时,也允许推流 w.log.Info("无人观看", "app", in.App, "stream", in.Stream, "mediaServerID", in.MediaServerID) + + if in.App == "rtp" { + ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream) + if err != nil { + w.log.Warn("获取通道失败", "err", err) + return onStreamNoneReaderOutput{Close: true}, nil + } + _ = w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch}) + } // 存在录像计划时,不关闭流 return onStreamNoneReaderOutput{Close: true}, nil } @@ -173,7 +179,7 @@ func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) return newDefaultOutputOK(), nil } - dev, err := w.gb28181Core.GetDeviceByDeviceID(c.Request.Context(), ch.DeviceID) + dev, err := w.gb28181Core.GetDevice(c.Request.Context(), ch.DID) if err != nil { // slog.Error("获取设备失败", "err", err) return newDefaultOutputOK(), nil diff --git a/pkg/gbs/keepalive.go b/pkg/gbs/keepalive.go index 04ff5dc..79d019f 100644 --- a/pkg/gbs/keepalive.go +++ b/pkg/gbs/keepalive.go @@ -23,12 +23,23 @@ func (g *GB28181API) sipMessageKeepalive(ctx *sip.Context) { return } + // 程序重启时会丢内存,收到 keepalive 时,补上 + // 并未补充到 + g.svr.memoryStorer.LoadOrStore(ctx.DeviceID, &Device{ + conn: ctx.Request.GetConnection(), + source: ctx.Source, + to: ctx.To, + }) + if err := g.svr.memoryStorer.Change(ctx.DeviceID, func(d *gb28181.Device) { d.KeepaliveAt = orm.Now() d.IsOnline = msg.Status == "OK" || msg.Status == "ON" d.Address = ctx.Source.String() d.Trasnport = ctx.Source.Network() }, func(d *Device) { + d.conn = ctx.Request.GetConnection() + d.source = ctx.Source + d.to = ctx.To }); err != nil { ctx.Log.Error("keepalive", "err", err) } diff --git a/pkg/gbs/sip/context.go b/pkg/gbs/sip/context.go index 7a854c4..01c4059 100644 --- a/pkg/gbs/sip/context.go +++ b/pkg/gbs/sip/context.go @@ -72,6 +72,9 @@ func (c *Context) parserRequest() error { c.Source = req.Source() c.To = NewAddressFromFromHeader(header) + if c.To == nil { + slog.Error(">>>>>>>> to is nil", "header", header) + } c.Log = slog.Default().With("deviceID", c.DeviceID, "host", c.Host) return nil diff --git a/pkg/gbs/sip/header.go b/pkg/gbs/sip/header.go index faa5f35..4693167 100644 --- a/pkg/gbs/sip/header.go +++ b/pkg/gbs/sip/header.go @@ -131,6 +131,10 @@ func (hb *HeadersBuilder) SetFrom(address *Address) *HeadersBuilder { // SetTo ToHeader func (hb *HeadersBuilder) SetTo(address *Address) *HeadersBuilder { + // TODO: 防止崩溃,但应该在上层,防止传递空指针 + if address == nil { + return hb + } address = address.Clone() if address.URI.Host() == "" { address.URI.SetHost(hb.host)