重构状态处理

This commit is contained in:
xugo
2025-06-14 18:11:36 +08:00
parent 57a9a5b42e
commit 3fc455ecdd
16 changed files with 114 additions and 15 deletions

View File

@@ -204,7 +204,8 @@ services:
- [x] 支持输出 HTTP_FLV,Websocket_FLV,HLS,WebRTC,RTSP、RTMP 等多种协议流地址
- [x] 支持局域网/互联网/多层 NAT/特殊网络环境部署
- [x] 支持 SQLite 数据库快速部署
- [x] 支持 PostgreSQL 数据库,当接入设备数超过 300 时推荐
- [x] 支持 PostgreSQL 数据库
- [x] 服务重启自动离线/自动尝试连接
- [x] GB/T 28181
- [x] 设备注册,支持 7 种接入方式
- [x] 支持 UDP 和 TCP 两种国标信令传输模式
@@ -215,10 +216,11 @@ services:
- [x] 设备基础配置查询(例如设备侧填写超时 3 秒,次数 3 次,则 9+x 秒左右收不到心跳认为离线x 是检测间隔周期)
- [x] 设备实时直播
- [x] 支持 UDP 和 TCP 被动两种国标流传输模式
- [x] 按需拉流,节省流量
- [x] 按需拉流,节省流量 (60秒无人观看自动停止)
- [x] 视频支持播放 H264 和 H265
- [x] 音频支持 g711a/g711u/aac
- [x] 快照
- [x] 支持跨域
- [ ] 设备云台控制
- [ ] 录像回放
- [ ] 报警事件订阅

View File

@@ -43,7 +43,7 @@
# 服务监听的 tcp/udp 端口号
Port = 15060
# gb/t28181 20 位国标 ID
ID = '3402000000200000001'
ID = '34020000002000000001'
# 域
Domain = '3402000000'
# 注册密码

1
go.mod
View File

@@ -4,6 +4,7 @@ go 1.24
require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/gin-contrib/cors v1.7.5
github.com/gin-contrib/gzip v1.2.3
github.com/gin-gonic/gin v1.10.0
github.com/glebarez/sqlite v1.11.0

2
go.sum
View File

@@ -15,6 +15,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/gin-contrib/cors v1.7.5 h1:cXC9SmofOrRg0w9PigwGlHG3ztswH6bqq4vJVXnvYMk=
github.com/gin-contrib/cors v1.7.5/go.mod h1:4q3yi7xBEDDWKapjT2o1V7mScKDDr8k+jZ0fSquGoy0=
github.com/gin-contrib/gzip v1.2.3 h1:dAhT722RuEG330ce2agAs75z7yB+NKvX/ZM1r8w0u2U=
github.com/gin-contrib/gzip v1.2.3/go.mod h1:ad72i4Bzmaypk8M762gNXa2wkxxjbz0icRNnuLJ9a/c=
github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E=

View File

@@ -34,7 +34,7 @@ func Run(bc *conf.Bootstrap) {
// TODO: 异步发现 zlm 配置,有概率程序启动了,才找到 zlm 的秘钥,建议提前配置好秘钥
go setupSecret(bc)
// 如果需要执行表迁移,递增此版本号和表更新说明
versionapi.DBVersion = "0.0.10"
versionapi.DBVersion = "0.0.11"
versionapi.DBRemark = "add stream proxy"
handler, cleanUp, err := wireApp(bc, log)

View File

@@ -1,6 +1,8 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
import "github.com/ixugo/goddd/pkg/orm"
// Channel domain model
type Channel struct {
ID string `gorm:"primaryKey" json:"id"`
@@ -11,6 +13,8 @@ type Channel struct {
PTZType int `gorm:"column:ptztype;notNull;default:0;comment:云台类型" json:"ptztype"` // 云台类型
IsOnline bool `gorm:"column:is_online;notNull;default:FALSE;comment:是否在线" json:"is_online"` // 是否在线
Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb" json:"ext"`
CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间
}
// TableName database table name

View File

@@ -8,6 +8,7 @@ import (
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goddd/pkg/orm"
"github.com/ixugo/goddd/pkg/reason"
"github.com/ixugo/goddd/pkg/web"
"github.com/jinzhu/copier"
"gorm.io/gorm"
)
@@ -23,6 +24,35 @@ type DeviceStorer interface {
Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error
}
func (c Core) FindChannelsForDevice(ctx context.Context, in *FindDeviceInput) ([]*Device, int64, error) {
items := make([]*Device, 0, in.Limit())
query := orm.NewQuery(3)
query.OrderBy("created_at DESC")
total, err := c.store.Device().Find(ctx, &items, in, query.Encode()...)
if err != nil {
return nil, 0, reason.ErrDB.Withf(`Find err[%s]`, err.Error())
}
for _, item := range items {
const size = 3
item.Children = make([]*Channel, 0, size)
query := orm.NewQuery(2).OrderBy("created_at DESC").Where("did=?", item.ID)
_, err := c.store.Channel().Find(ctx, &item.Children, web.PagerFilter{Size: size}, query.Encode()...)
if err != nil {
continue
}
for _, ch := range item.Children {
if !item.IsOnline {
ch.IsOnline = false
}
}
}
return items, total, nil
}
// FindDevice Paginated search
func (c Core) FindDevice(ctx context.Context, in *FindDeviceInput) ([]*Device, int64, error) {
items := make([]*Device, 0)

View File

@@ -27,6 +27,8 @@ type Device struct {
Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"`
Address string `gorm:"column:address;notNull;default:'';comment:设备网络地址" json:"address"`
Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性
Children []*Channel `gorm:"-" json:"children,omitzero"`
}
// TableName database table name

View File

@@ -56,6 +56,11 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) {
for _, d := range devices {
if strings.ToLower(d.Trasnport) == "tcp" {
// 通知相关设备/通道离线
c.Change(d.DeviceID, func(d *gb28181.Device) {
d.IsOnline = false
}, func(d *gbs.Device) {
d.IsOnline = false
})
continue
}

View File

@@ -166,12 +166,13 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error {
HookOnFlowReport: zlm.NewString(""),
HookOnPlay: zlm.NewString(fmt.Sprintf("%s/on_play", hookPrefix)),
// HookOnHTTPAccess: zlm.NewString(""),
HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)),
HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)),
HookOnStreamNotFound: zlm.NewString(fmt.Sprintf("%s/on_stream_not_found", hookPrefix)),
HookOnRecordTs: zlm.NewString(""),
HookOnRtspAuth: zlm.NewString(""),
HookOnRtspRealm: zlm.NewString(""),
HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)),
HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)),
GeneralStreamNoneReaderDelayMS: zlm.NewString("60000"),
HookOnStreamNotFound: zlm.NewString(fmt.Sprintf("%s/on_stream_not_found", hookPrefix)),
HookOnRecordTs: zlm.NewString(""),
HookOnRtspAuth: zlm.NewString(""),
HookOnRtspRealm: zlm.NewString(""),
// HookOnServerStarted: ,
HookOnShellLogin: zlm.NewString(""),
HookOnStreamChanged: zlm.NewString(fmt.Sprintf("%s/on_stream_changed", hookPrefix)),

View File

@@ -14,6 +14,7 @@ import (
"strings"
"time"
"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
"github.com/gowvp/gb28181/plugin/stat"
@@ -48,6 +49,24 @@ func setupRouter(r *gin.Engine, uc *Usecase) {
)
go web.CountGoroutines(10*time.Minute, 20)
r.Use(cors.New(cors.Config{
AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"},
AllowHeaders: []string{
"Accept", "Content-Length", "Content-Type", "Range", "Accept-Language",
"Origin", "Authorization",
"Accept-Encoding",
"Cache-Control", "Pragma", "X-Requested-With",
"Sec-Fetch-Mode", "Sec-Fetch-Site", "Sec-Fetch-Dest",
"Dnt", "X-Forwarded-For", "X-Forwarded-Proto", "X-Forwarded-Host",
"X-Real-IP", "X-Request-ID", "X-Request-Start", "X-Request-Time",
},
AllowCredentials: true,
MaxAge: 12 * time.Hour,
AllowOriginFunc: func(origin string) bool {
return true
},
}))
const staticPrefix = "/web"
const staticDir = "www"
admin := r.Group(staticPrefix, gzip.Gzip(gzip.DefaultCompression))
@@ -177,7 +196,9 @@ func sortExpvarMap(data *expvar.Map, top int) []KV {
}
func (uc *Usecase) proxySMS(c *gin.Context) {
defer recover()
defer func() {
_ = recover()
}()
rc := http.NewResponseController(c.Writer)
exp := time.Now().AddDate(99, 0, 0)

View File

@@ -75,6 +75,8 @@ func registerGB28181(g gin.IRouter, api GB28181API, handler ...gin.HandlerFunc)
group.DELETE("/:id", web.WarpH(api.delDevice))
group.POST("/:id/catalog", web.WarpH(api.queryCatalog)) // 刷新通道
group.GET("/channels", web.WarpH(api.FindChannelsForDevice))
}
{
@@ -125,6 +127,11 @@ func (a GB28181API) queryCatalog(c *gin.Context, _ *struct{}) (any, error) {
return gin.H{"msg": "ok"}, nil
}
func (a GB28181API) FindChannelsForDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) {
items, total, err := a.gb28181Core.FindChannelsForDevice(c.Request.Context(), in)
return gin.H{"items": items, "total": total}, err
}
// >>> channel >>>>>>>>>>>>>>>>>>>>
func (a GB28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (any, error) {

View File

@@ -114,9 +114,6 @@ func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (D
// https://docs.zlmediakit.com/guide/media_server/web_hook_api.html#_6-on-play
func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, error) {
return newDefaultOutputOK(), nil
if in.App == "rtp" {
return newDefaultOutputOK(), nil
}
switch in.Schema {
case "rtmp":
@@ -150,6 +147,15 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e
func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInput) (onStreamNoneReaderOutput, error) {
// rtmp 无人观看时,也允许推流
w.log.Info("无人观看", "app", in.App, "stream", in.Stream, "mediaServerID", in.MediaServerID)
if in.App == "rtp" {
ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream)
if err != nil {
w.log.Warn("获取通道失败", "err", err)
return onStreamNoneReaderOutput{Close: true}, nil
}
_ = w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch})
}
// 存在录像计划时,不关闭流
return onStreamNoneReaderOutput{Close: true}, nil
}
@@ -173,7 +179,7 @@ func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput)
return newDefaultOutputOK(), nil
}
dev, err := w.gb28181Core.GetDeviceByDeviceID(c.Request.Context(), ch.DeviceID)
dev, err := w.gb28181Core.GetDevice(c.Request.Context(), ch.DID)
if err != nil {
// slog.Error("获取设备失败", "err", err)
return newDefaultOutputOK(), nil

View File

@@ -23,12 +23,23 @@ func (g *GB28181API) sipMessageKeepalive(ctx *sip.Context) {
return
}
// 程序重启时会丢内存,收到 keepalive 时,补上
// 并未补充到
g.svr.memoryStorer.LoadOrStore(ctx.DeviceID, &Device{
conn: ctx.Request.GetConnection(),
source: ctx.Source,
to: ctx.To,
})
if err := g.svr.memoryStorer.Change(ctx.DeviceID, func(d *gb28181.Device) {
d.KeepaliveAt = orm.Now()
d.IsOnline = msg.Status == "OK" || msg.Status == "ON"
d.Address = ctx.Source.String()
d.Trasnport = ctx.Source.Network()
}, func(d *Device) {
d.conn = ctx.Request.GetConnection()
d.source = ctx.Source
d.to = ctx.To
}); err != nil {
ctx.Log.Error("keepalive", "err", err)
}

View File

@@ -72,6 +72,9 @@ func (c *Context) parserRequest() error {
c.Source = req.Source()
c.To = NewAddressFromFromHeader(header)
if c.To == nil {
slog.Error(">>>>>>>> to is nil", "header", header)
}
c.Log = slog.Default().With("deviceID", c.DeviceID, "host", c.Host)
return nil

View File

@@ -131,6 +131,10 @@ func (hb *HeadersBuilder) SetFrom(address *Address) *HeadersBuilder {
// SetTo ToHeader
func (hb *HeadersBuilder) SetTo(address *Address) *HeadersBuilder {
// TODO: 防止崩溃,但应该在上层,防止传递空指针
if address == nil {
return hb
}
address = address.Clone()
if address.URI.Host() == "" {
address.URI.SetHost(hb.host)