diff --git a/README.md b/README.md index 219d278..674a3ef 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,8 @@ Java WVP @648540858 [wvp-GB28181-pro](https://github.com/648540858/wvp-GB28181-p [如何使用 OBS RTMP 推流到 GB/T28181平台](https://juejin.cn/post/7463350947100786739) +[GB28181 七种注册姿势](https://juejin.cn/post/7465274924899532838) + 码字中... @@ -110,28 +112,27 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit - ## 功能特性 - [x] 集成 web 界面 - [x] 兼容性良好 - [x] 接入设备 - - [ ] 视频预览 + - [x] 视频预览 - [ ] 支持主码流子码流切换 - - [ ] 无限制接入路数,能接入多少设备只取决于你的服务器性能 + - [x] 无限制接入路数,能接入多少设备只取决于你的服务器性能 - [ ] 云台控制,控制设备转向,拉近,拉远 - [ ] 预置位查询,使用与设置 - [ ] 查询 NVR/IPC 上的录像与播放,支持指定时间播放与下载 - [ ] 无人观看自动断流,节省流量 - - [ ] 视频设备信息同步 + - [x] 视频设备信息同步 - [ ] 离在线监控 - - [ ] 支持直接输出RTSP、RTMP、HTTP-FLV、Websocket-FLV、HLS多种协议流地址 + - [x] 支持直接输出RTSP、RTMP、HTTP-FLV、Websocket-FLV、HLS多种协议流地址 - [ ] 支持通过一个流地址直接观看摄像头,无需登录以及调用任何接口 - - [ ] 支持 UDP 和 TCP 两种国标信令传输模式 + - [x] 支持 UDP 和 TCP 两种国标信令传输模式 - [ ] 支持 UDP 和 TCP 被动,TCP 主动 三种国标流传输模式 - - [ ] 支持检索,通道筛选 + - [x] 支持检索,通道筛选 - [ ] 支持通道子目录查询 - [ ] 支持过滤音频,防止杂音影响观看 - - [ ] 支持国标网络校时 + - [x] 支持国标网络校时 - [x] 支持播放 H264 和 H265 - [ ] 报警信息处理,支持向前端推送报警信息 - [ ] 语音对讲 @@ -144,7 +145,7 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit - [ ] 设备目录订阅 - [ ] 设备目录通知处理 - [ ] 移动位置查询和显示 - - [ ] 支持手动添加设备和给设备设置单独的密码 + - [x] 支持手动添加设备和给设备设置单独的密码 - [ ] 支持平台对接接入 - [ ] 支持国标级联 - [ ] 国标通道向上级联 @@ -167,12 +168,12 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit - [ ] 支持同时级联到多个上级平台 - [ ] 支持自动配置ZLM媒体服务, 减少因配置问题所出现的问题 - [ ] 多流媒体节点,自动选择负载最低的节点使用 -- [ ] 支持启用udp多端口模式, 提高udp模式下媒体传输性能 +- [x] 支持启用 udp 多端口模式, 提高 udp 模式下媒体传输性能 - [x] 支持局域网/互联网/特殊网络环境部署 - [x] 支持 gowvp 与 zlm 分开部署,提升平台并发能力 - [ ] 支持拉流RTSP/RTMP,分发为各种流格式,或者推送到其他国标平台 - [ ] 支持推流RTSP/RTMP,分发为各种流格式,或者推送到其他国标平台 -- [ ] 支持推流鉴权 +- [x] 支持推流鉴权 - [x] 支持接口鉴权 - [ ] 云端录像,推流/代理/国标视频均可以录制在云端服务器,支持预览和下载 - [ ] 支持跨域请求,支持前后端分离部署 diff --git a/cmd/server/wire_gen.go b/cmd/server/wire_gen.go index 47184f2..9d3ebbb 100644 --- a/cmd/server/wire_gen.go +++ b/cmd/server/wire_gen.go @@ -31,7 +31,8 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error) webHookAPI := api.NewWebHookAPI(smsCore, mediaCore, bc) mediaAPI := api.NewMediaAPI(mediaCore, smsCore, bc) gb28181API := api.NewGb28181API(db, uniqueidCore) - server := gbs.NewServer() + gb28181 := api.NewGB28181(db, uniqueidCore) + server, cleanup := gbs.NewServer(bc, gb28181) usecase := &api.Usecase{ Conf: bc, DB: db, @@ -45,5 +46,6 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error) } handler := api.NewHTTPHandler(usecase) return handler, func() { + cleanup() }, nil } diff --git a/configs/config.toml b/configs/config.toml index 1a56c11..24679a2 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -18,11 +18,17 @@ version = 1 ConnMaxLifetime = '6h0m0s' SlowThreshold = '200ms' +[Sip] + Port = 15062 + ID = "3402000000200000001" + Domain = "3402000000" + Password = "12345678" + [Media] IP = "127.0.0.1" HTTPPort = 8080 Secret = "s1kPE7bzqKeHUaVcp8dCA0jeB8yxyFq4" - WebHookIP = "192.168.10.28" + WebHookIP = "host.docker.internal" RTPPortRange = "20000,20500" [Log] diff --git a/go.mod b/go.mod index 828bfe2..ba129ef 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( golang.org/x/arch v0.13.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect - golang.org/x/net v0.34.0 + golang.org/x/net v0.34.0 // indirect golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 diff --git a/go.sum b/go.sum index 907a3bd..f1a3384 100644 --- a/go.sum +++ b/go.sum @@ -47,7 +47,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= -github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/internal/conf/config.go b/internal/conf/config.go index 282d81e..a3b4d95 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -62,7 +62,7 @@ type SIP struct { Port int `comment:"服务监听的 tcp/udp 端口号"` ID string `comment:"gb/t28181 20 位国标 ID"` Domain string - Password int `comment:"注册密码"` + Password string `comment:"注册密码"` } type Media struct { diff --git a/internal/core/bz/param.go b/internal/core/bz/param.go index 43db866..46217b0 100644 --- a/internal/core/bz/param.go +++ b/internal/core/bz/param.go @@ -1,6 +1,8 @@ package bz const ( - IDPrefixGB = "gb" - IDPrefixRTMP = "r" // rtmp ID 前缀,取 rtmp 首字母 + IDPrefixGB = "gb" // 国标设备 + IDPrefixGBChannel = "ch" // 国标通道 id 前缀 + IDPrefixRTMP = "m" // rtmp ID 前缀,取 rtmp 中的 m,不好记但是清晰 + IDPrefixRTSP = "s" // rtsp ID 前缀,取 rtsp 中的 s,不好记但是清晰 ) diff --git a/internal/core/gb28181/channel.go b/internal/core/gb28181/channel.go index e760770..e07628f 100755 --- a/internal/core/gb28181/channel.go +++ b/internal/core/gb28181/channel.go @@ -4,7 +4,9 @@ package gb28181 import ( "context" "log/slog" + "strings" + "github.com/gowvp/gb28181/internal/core/bz" "github.com/ixugo/goweb/pkg/orm" "github.com/ixugo/goweb/pkg/web" "github.com/jinzhu/copier" @@ -22,7 +24,21 @@ type ChannelStorer interface { // FindChannel Paginated search func (c *Core) FindChannel(ctx context.Context, in *FindChannelInput) ([]*Channel, int64, error) { items := make([]*Channel, 0) - total, err := c.store.Channel().Find(ctx, &items, in) + + query := orm.NewQuery(1) + query.OrderBy("channel_id ASC") + if in.DeviceID != "" { + query.Where("device_id = ?", in.DeviceID) + } + if in.Key != "" { + if strings.HasPrefix(in.Key, bz.IDPrefixGBChannel) { + query.Where("id=?", in.Key) + } else { + query.Where("channel_id like ? OR name like ?", "%"+in.Key+"%", "%"+in.Key+"%") + } + } + + total, err := c.store.Channel().Find(ctx, &items, in, query.Encode()...) if err != nil { return nil, 0, web.ErrDB.Withf(`Find err[%s]`, err.Error()) } diff --git a/internal/core/gb28181/channel.model.go b/internal/core/gb28181/channel.model.go index fa7e47b..1d74a1d 100755 --- a/internal/core/gb28181/channel.model.go +++ b/internal/core/gb28181/channel.model.go @@ -3,12 +3,13 @@ package gb28181 // Channel domain model type Channel struct { - ID string `gorm:"primaryKey" json:"id"` - DeviceID string `gorm:"column:device_id;notNull;default:'';comment:国标编码" json:"device_id"` // 国标编码 - Name string `gorm:"column:name;notNull;default:'';comment:通道名称" json:"name"` // 通道名称 - PTZType int `gorm:"column:ptztype;notNull;default:0;comment:云台类型" json:"ptztype"` // 云台类型 - IsOnline bool `gorm:"column:is_online;notNull;default:FALSE;comment:是否在线" json:"is_online"` // 是否在线 - Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb" json:"ext"` + ID string `gorm:"primaryKey" json:"id"` + DeviceID string `gorm:"column:device_id;index;notNull;default:'';comment:国标编码" json:"device_id"` // 国标编码 + ChannelID string `gorm:"column:channel_id;index;notNull;default:'';comment:国标编码" json:"channel_id"` // 国标编码 + Name string `gorm:"column:name;notNull;default:'';comment:通道名称" json:"name"` // 通道名称 + PTZType int `gorm:"column:ptztype;notNull;default:0;comment:云台类型" json:"ptztype"` // 云台类型 + IsOnline bool `gorm:"column:is_online;notNull;default:FALSE;comment:是否在线" json:"is_online"` // 是否在线 + Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb" json:"ext"` } // TableName database table name diff --git a/internal/core/gb28181/channel.param.go b/internal/core/gb28181/channel.param.go index f657865..fdb38c7 100755 --- a/internal/core/gb28181/channel.param.go +++ b/internal/core/gb28181/channel.param.go @@ -5,11 +5,11 @@ import "github.com/ixugo/goweb/pkg/web" type FindChannelInput struct { web.PagerFilter - DeviceID string `form:"device_id"` // 国标编码 - Name string `form:"name"` // 通道名称 - PTZType int `form:"ptztype"` // 云台类型 - IsOnline bool `form:"is_online"` // 是否在线 - Ext DeviceExt `form:"ext"` + DeviceID string `form:"device_id"` // 国标编码 + Key string `form:"key"` // 名称/国标编码 模糊搜索,id 精确搜索 + // Name string `form:"name"` // 通道名称 + // PTZType int `form:"ptztype"` // 云台类型 + IsOnline bool `form:"is_online"` // 是否在线 } type EditChannelInput struct { diff --git a/internal/core/gb28181/device.model.go b/internal/core/gb28181/device.model.go index cced380..497d78e 100755 --- a/internal/core/gb28181/device.model.go +++ b/internal/core/gb28181/device.model.go @@ -25,6 +25,7 @@ type Device struct { CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"` + Address string `gorm:"column:address;notNull;default:'';comment:设备网络地址" json:"address"` Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性 } @@ -39,3 +40,8 @@ func (d Device) Check() error { } return nil } + +func (d *Device) init(id, deviceID string) { + d.ID = id + d.DeviceID = deviceID +} diff --git a/internal/core/gb28181/gbs.go b/internal/core/gb28181/gbs.go new file mode 100644 index 0000000..afaffcb --- /dev/null +++ b/internal/core/gb28181/gbs.go @@ -0,0 +1,92 @@ +package gb28181 + +import ( + "context" + + "github.com/gowvp/gb28181/internal/core/bz" + "github.com/gowvp/gb28181/internal/core/uniqueid" + "github.com/ixugo/goweb/pkg/orm" +) + +type GB28181 struct { + deviceStore DeviceStorer + channelStore ChannelStorer + uni uniqueid.Core +} + +func NewGB28181(ds DeviceStorer, cs ChannelStorer, uni uniqueid.Core) GB28181 { + return GB28181{ + deviceStore: ds, + channelStore: cs, + uni: uni, + } +} + +func (g GB28181) GetDeviceByDeviceID(deviceID string) (*Device, error) { + ctx := context.TODO() + var d Device + if err := g.deviceStore.Get(ctx, &d, orm.Where("device_id=?", deviceID)); err != nil { + if !orm.IsErrRecordNotFound(err) { + return nil, err + } + d.init(g.uni.UniqueID(bz.IDPrefixGB), deviceID) + if err := g.deviceStore.Add(ctx, &d); err != nil { + return nil, err + } + } + return &d, nil +} + +func (g GB28181) Logout(deviceID string, changeFn func(*Device)) error { + var d Device + if err := g.deviceStore.Edit(context.TODO(), &d, func(d *Device) { + changeFn(d) + }, orm.Where("device_id=?", deviceID)); err != nil { + return err + } + + return nil +} + +func (g GB28181) Login(deviceID string, changeFn func(*Device)) error { + var d Device + if err := g.deviceStore.Edit(context.TODO(), &d, func(d *Device) { + changeFn(d) + }, orm.Where("device_id=?", deviceID)); err != nil { + return err + } + + return nil +} + +func (g GB28181) Edit(deviceID string, changeFn func(*Device)) error { + var d Device + if err := g.deviceStore.Edit(context.TODO(), &d, func(d *Device) { + changeFn(d) + }, orm.Where("device_id=?", deviceID)); err != nil { + return err + } + + return nil +} + +func (g GB28181) SaveChannels(channels []*Channel) error { + if len(channels) <= 0 { + return nil + } + var dev Device + g.deviceStore.Edit(context.TODO(), &dev, func(d *Device) { + d.Channels = len(channels) + }, orm.Where("device_id=?", channels[0].DeviceID)) + + for _, channel := range channels { + var ch Channel + if err := g.channelStore.Edit(context.TODO(), &ch, func(c *Channel) { + c.IsOnline = channel.IsOnline + }, orm.Where("device_id = ? AND channel_id = ?", channel.DeviceID, channel.ChannelID)); err != nil { + channel.ID = g.uni.UniqueID(bz.IDPrefixGBChannel) + g.channelStore.Add(context.TODO(), channel) + } + } + return nil +} diff --git a/internal/core/gb28181/model.go b/internal/core/gb28181/model.go index 87305a2..0223b0c 100755 --- a/internal/core/gb28181/model.go +++ b/internal/core/gb28181/model.go @@ -1,16 +1,26 @@ // Code generated by gowebx, DO AVOID EDIT. package gb28181 -import "github.com/ixugo/goweb/pkg/orm" +import ( + "database/sql/driver" + "encoding/json" + + "github.com/ixugo/goweb/pkg/orm" +) // DeviceExt domain model type DeviceExt struct { Manufacturer string `json:"manufacturer"` // 生产厂商 Model string `json:"model"` // 型号 Firmware string `json:"firmware"` // 固件版本 + Name string `json:"name"` // 设备名 } // Scan implements orm.Scaner. func (i *DeviceExt) Scan(input interface{}) error { return orm.JsonUnmarshal(input, i) } + +func (i DeviceExt) Value() (driver.Value, error) { + return json.Marshal(i) +} diff --git a/internal/web/api/api.go b/internal/web/api/api.go index fcc0fad..396121f 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -2,7 +2,6 @@ package api import ( "expvar" - "fmt" "log/slog" "net/http" "path/filepath" @@ -14,8 +13,6 @@ import ( "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" - "github.com/gowvp/gb28181/internal/core/bz" - "github.com/gowvp/gb28181/internal/core/media" "github.com/gowvp/gb28181/plugin/stat" "github.com/gowvp/gb28181/plugin/stat/statapi" "github.com/ixugo/goweb/pkg/system" @@ -69,67 +66,9 @@ func setupRouter(r *gin.Engine, uc *Usecase) { registerZLMWebhookAPI(r, uc.WebHookAPI) // TODO: 待增加鉴权 registerMediaAPI(r, uc.MediaAPI) - registerGb28181(r, uc.GB28181API) - // TODO: 临时播放接口,待重构 - r.POST("/channels/:id/play", web.WarpH(func(c *gin.Context, _ *struct{}) (*playOutput, error) { - channelID := c.Param("id") - - // TODO: 目前仅开发到 rtsp,待扩展 rtsp/gb 等 - if !strings.HasPrefix(channelID, bz.IDPrefixRTMP) { - return nil, web.ErrNotFound.Msg("不支持的播放通道") - } - - push, err := uc.MediaAPI.mediaCore.GetStreamPush(c.Request.Context(), channelID) - if err != nil { - return nil, err - } - if push.Status != media.StatusPushing { - return nil, web.ErrNotFound.Msg("未推流") - } - - svr, err := uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), push.MediaServerID) - if err != nil { - return nil, err - } - - stream := push.App + "/" + push.Stream - host := c.Request.Host - if l := strings.Split(c.Request.Host, ":"); len(l) == 2 { - host = l[0] - } - var session string - if !push.IsAuthDisabled && push.Session != "" { - session = "session=" + push.Session - } - - // 播放规则 - // https://github.com/zlmediakit/ZLMediaKit/wiki/%E6%92%AD%E6%94%BEurl%E8%A7%84%E5%88%99 - return &playOutput{ - App: push.App, - Stream: push.Stream, - Items: []streamAddrItem{ - { - Label: "默认线路", - WSFLV: fmt.Sprintf("ws://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session, - HTTPFLV: fmt.Sprintf("http://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session, - RTMP: fmt.Sprintf("rtmp://%s:%d/%s", host, svr.Ports.RTMP, stream) + "?" + session, - RTSP: fmt.Sprintf("rtsp://%s:%d/%s", host, svr.Ports.RTSP, stream) + "?" + session, - WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTP, push.App, push.Stream) + "&" + session, - HLS: fmt.Sprintf("http://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTP, stream) + "?" + session, - }, - { - Label: "SSL 线路", - WSFLV: fmt.Sprintf("wss://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session, - HTTPFLV: fmt.Sprintf("https://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session, - RTMP: fmt.Sprintf("rtmps://%s:%d/%s", host, svr.Ports.RTMPs, stream) + session, - RTSP: fmt.Sprintf("rtsps://%s:%d/%s", host, svr.Ports.RTSPs, stream) + session, - WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTPS, push.App, push.Stream) + "&" + session, - HLS: fmt.Sprintf("https://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTPS, stream) + "?" + session, - }, - }, - }, nil - })) + uc.GB28181API.uc = uc + registerGB28181(r, uc.GB28181API) } type playOutput struct { diff --git a/internal/web/api/gb28181.go b/internal/web/api/gb28181.go index 8e730e3..d41fb39 100755 --- a/internal/web/api/gb28181.go +++ b/internal/web/api/gb28181.go @@ -2,26 +2,31 @@ package api import ( + "fmt" "strconv" + "strings" "github.com/gin-gonic/gin" + "github.com/gowvp/gb28181/internal/core/bz" "github.com/gowvp/gb28181/internal/core/gb28181" "github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181db" + "github.com/gowvp/gb28181/internal/core/media" "github.com/gowvp/gb28181/internal/core/uniqueid" "github.com/ixugo/goweb/pkg/web" "gorm.io/gorm" ) -type Gb28181API struct { +type GB28181API struct { gb28181Core gb28181.Core + uc *Usecase } -func NewGb28181API(db *gorm.DB, uni uniqueid.Core) Gb28181API { +func NewGb28181API(db *gorm.DB, uni uniqueid.Core) GB28181API { core := gb28181.NewCore(gb28181db.NewDB(db).AutoMigrate(true), uni) - return Gb28181API{gb28181Core: core} + return GB28181API{gb28181Core: core} } -func registerGb28181(g gin.IRouter, api Gb28181API, handler ...gin.HandlerFunc) { +func registerGB28181(g gin.IRouter, api GB28181API, handler ...gin.HandlerFunc) { { group := g.Group("/devices", handler...) group.GET("", web.WarpH(api.findDevice)) @@ -34,61 +39,129 @@ func registerGb28181(g gin.IRouter, api Gb28181API, handler ...gin.HandlerFunc) { group := g.Group("/channels", handler...) group.GET("", web.WarpH(api.findChannel)) - group.GET("/:id", web.WarpH(api.getChannel)) group.PUT("/:id", web.WarpH(api.editChannel)) - group.POST("", web.WarpH(api.addChannel)) - group.DELETE("/:id", web.WarpH(api.delChannel)) + group.POST("/:id/play", web.WarpH(api.play)) + // group.GET("/:id", web.WarpH(api.getChannel)) + // group.POST("", web.WarpH(api.addChannel)) + // group.DELETE("/:id", web.WarpH(api.delChannel)) } } // >>> device >>>>>>>>>>>>>>>>>>>> -func (a Gb28181API) findDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) { +func (a GB28181API) findDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) { items, total, err := a.gb28181Core.FindDevice(c.Request.Context(), in) return gin.H{"items": items, "total": total}, err } -func (a Gb28181API) getDevice(c *gin.Context, _ *struct{}) (any, error) { +func (a GB28181API) getDevice(c *gin.Context, _ *struct{}) (any, error) { deviceID, _ := strconv.Atoi(c.Param("id")) return a.gb28181Core.GetDevice(c.Request.Context(), deviceID) } -func (a Gb28181API) editDevice(c *gin.Context, in *gb28181.EditDeviceInput) (any, error) { +func (a GB28181API) editDevice(c *gin.Context, in *gb28181.EditDeviceInput) (any, error) { deviceID := c.Param("id") return a.gb28181Core.EditDevice(c.Request.Context(), in, deviceID) } -func (a Gb28181API) addDevice(c *gin.Context, in *gb28181.AddDeviceInput) (any, error) { +func (a GB28181API) addDevice(c *gin.Context, in *gb28181.AddDeviceInput) (any, error) { return a.gb28181Core.AddDevice(c.Request.Context(), in) } -func (a Gb28181API) delDevice(c *gin.Context, _ *struct{}) (any, error) { +func (a GB28181API) delDevice(c *gin.Context, _ *struct{}) (any, error) { deviceID := c.Param("id") return a.gb28181Core.DelDevice(c.Request.Context(), deviceID) } // >>> channel >>>>>>>>>>>>>>>>>>>> -func (a Gb28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (any, error) { +func (a GB28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (any, error) { items, total, err := a.gb28181Core.FindChannel(c.Request.Context(), in) return gin.H{"items": items, "total": total}, err } -func (a Gb28181API) getChannel(c *gin.Context, _ *struct{}) (any, error) { +func (a GB28181API) getChannel(c *gin.Context, _ *struct{}) (any, error) { channelID, _ := strconv.Atoi(c.Param("id")) return a.gb28181Core.GetChannel(c.Request.Context(), channelID) } -func (a Gb28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (any, error) { +func (a GB28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (any, error) { channelID, _ := strconv.Atoi(c.Param("id")) return a.gb28181Core.EditChannel(c.Request.Context(), in, channelID) } -func (a Gb28181API) addChannel(c *gin.Context, in *gb28181.AddChannelInput) (any, error) { +func (a GB28181API) addChannel(c *gin.Context, in *gb28181.AddChannelInput) (any, error) { return a.gb28181Core.AddChannel(c.Request.Context(), in) } -func (a Gb28181API) delChannel(c *gin.Context, _ *struct{}) (any, error) { +func (a GB28181API) delChannel(c *gin.Context, _ *struct{}) (any, error) { channelID, _ := strconv.Atoi(c.Param("id")) return a.gb28181Core.DelChannel(c.Request.Context(), channelID) } + +func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { + channelID := c.Param("id") + + // TODO: 目前仅开发到 rtmp/gb28181,待扩展 rtsp... 等 + if !strings.HasPrefix(channelID, bz.IDPrefixRTMP) || !strings.HasPrefix(channelID, bz.IDPrefixGBChannel) { + return nil, web.ErrNotFound.Msg("不支持的播放通道") + } + + // 国标逻辑 + if strings.HasPrefix(channelID, bz.IDPrefixGBChannel) { + // a.uc.SipServer. + } + + push, err := a.uc.MediaAPI.mediaCore.GetStreamPush(c.Request.Context(), channelID) + if err != nil { + return nil, err + } + if push.Status != media.StatusPushing { + return nil, web.ErrNotFound.Msg("未推流") + } + + svr, err := a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), push.MediaServerID) + if err != nil { + return nil, err + } + + stream := push.App + "/" + push.Stream + host := c.Request.Host + if l := strings.Split(c.Request.Host, ":"); len(l) == 2 { + host = l[0] + } + var session string + if !push.IsAuthDisabled && push.Session != "" { + session = "session=" + push.Session + } + + // 播放规则 + // https://github.com/zlmediakit/ZLMediaKit/wiki/%E6%92%AD%E6%94%BEurl%E8%A7%84%E5%88%99 + return &playOutput{ + App: push.App, + Stream: push.Stream, + Items: []streamAddrItem{ + { + Label: "默认线路", + WSFLV: fmt.Sprintf("ws://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session, + HTTPFLV: fmt.Sprintf("http://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session, + RTMP: fmt.Sprintf("rtmp://%s:%d/%s", host, svr.Ports.RTMP, stream) + "?" + session, + RTSP: fmt.Sprintf("rtsp://%s:%d/%s", host, svr.Ports.RTSP, stream) + "?" + session, + WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTP, push.App, push.Stream) + "&" + session, + HLS: fmt.Sprintf("http://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTP, stream) + "?" + session, + }, + { + Label: "SSL 线路", + WSFLV: fmt.Sprintf("wss://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session, + HTTPFLV: fmt.Sprintf("https://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session, + RTMP: fmt.Sprintf("rtmps://%s:%d/%s", host, svr.Ports.RTMPs, stream) + session, + RTSP: fmt.Sprintf("rtsps://%s:%d/%s", host, svr.Ports.RTSPs, stream) + session, + WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTPS, push.App, push.Stream) + "&" + session, + HLS: fmt.Sprintf("https://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTPS, stream) + "?" + session, + }, + }, + }, nil +} + +func (uc *Usecase) play(channelID string) { +} diff --git a/internal/web/api/provider.go b/internal/web/api/provider.go index d43e93e..22f8414 100644 --- a/internal/web/api/provider.go +++ b/internal/web/api/provider.go @@ -7,6 +7,8 @@ import ( "github.com/gin-gonic/gin" "github.com/google/wire" "github.com/gowvp/gb28181/internal/conf" + "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181db" "github.com/gowvp/gb28181/internal/core/media" "github.com/gowvp/gb28181/internal/core/media/store/mediadb" "github.com/gowvp/gb28181/internal/core/uniqueid" @@ -31,6 +33,7 @@ var ( NewMediaCore, NewMediaAPI, gbs.NewServer, NewGb28181API, + NewGB28181, ) ) @@ -42,7 +45,7 @@ type Usecase struct { WebHookAPI WebHookAPI UniqueID uniqueid.Core MediaAPI MediaAPI - GB28181API Gb28181API + GB28181API GB28181API SipServer *gbs.Server } @@ -97,3 +100,11 @@ func NewUniqueID(db *gorm.DB) uniqueid.Core { func NewMediaCore(db *gorm.DB, uni uniqueid.Core) media.Core { return media.NewCore(mediadb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate), uni) } + +func NewGB28181(db *gorm.DB, uni uniqueid.Core) gb28181.GB28181 { + return gb28181.NewGB28181( + gb28181db.NewDevice(db), + gb28181db.NewChannel(db), + uni, + ) +} diff --git a/pkg/gbs/catalog.go b/pkg/gbs/catalog.go new file mode 100644 index 0000000..314a12d --- /dev/null +++ b/pkg/gbs/catalog.go @@ -0,0 +1,75 @@ +package gbs + +import ( + "encoding/xml" + "log/slog" + + "github.com/gowvp/gb28181/pkg/gbs/sip" +) + +// MessageDeviceListResponse 设备明细列表返回结构 +type MessageDeviceListResponse struct { + XMLName xml.Name `xml:"Response"` + CmdType string `xml:"CmdType"` + SN int `xml:"SN"` + DeviceID string `xml:"DeviceID"` + SumNum int `xml:"SumNum"` + Item []Channels `xml:"DeviceList>Item"` +} + +func (g GB28181API) sipMessageCatalog(ctx *sip.Context) { + var msg MessageDeviceListResponse + if err := sip.XMLDecode(ctx.Request.Body(), &msg); err != nil { + slog.Error("Message Unmarshal xml", "err", err) + ctx.String(400, "xml err") + return + } + if msg.SumNum < 0 { + ctx.String(200, "OK") + return + } + + for _, d := range msg.Item { + d.DeviceID = msg.DeviceID + g.catalog.Write(&sip.CollectorMsg[Channels]{ + Key: d.DeviceID, + Data: &d, + Total: msg.SumNum, + }) + + // channel := Channels{ChannelID: d.ChannelID, DeviceID: message.DeviceID} + // if err := db.Get(db.DBClient, &channel); err == nil { + // channel.Active = time.Now().Unix() + // channel.URIStr = fmt.Sprintf("sip:%s@%s", d.ChannelID, _sysinfo.Region) + // channel.Status = transDeviceStatus(d.Status) + // channel.Name = d.Name + // channel.Manufacturer = d.Manufacturer + // channel.Model = d.Model + // channel.Owner = d.Owner + // channel.CivilCode = d.CivilCode + // // Address ip地址 + // channel.Address = d.Address + // channel.Parental = d.Parental + // channel.SafetyWay = d.SafetyWay + // channel.RegisterWay = d.RegisterWay + // channel.Secrecy = d.Secrecy + // db.Save(db.DBClient, &channel) + // go notify(notifyChannelsActive(channel)) + // } else { + // // logrus.Infoln("deviceid not found,deviceid:", d.DeviceID, "pdid:", message.DeviceID, "err", err) + // } + } + + ctx.String(200, "OK") +} + +// QueryCatalog 获取注册设备包含的列表 +func (g GB28181API) QueryCatalog(ctx *sip.Context) { + _, err := ctx.SendRequest(sip.MethodMessage, sip.GetCatalogXML(ctx.DeviceID)) + if err != nil { + slog.Error("sipCatalog", "err", err) + return + } + g.catalog.Run(ctx.DeviceID) + g.catalog.Wait(ctx.DeviceID) +} diff --git a/pkg/gbs/devices.go b/pkg/gbs/devices.go index 57629e1..bf9440e 100644 --- a/pkg/gbs/devices.go +++ b/pkg/gbs/devices.go @@ -1,7 +1,6 @@ package gbs import ( - "encoding/xml" "net" "strings" @@ -12,7 +11,7 @@ import ( var ( // sip服务用户信息 _serverDevices Devices - srv *sip.Server + svr *sip.Server ) // Devices NVR 设备信息 @@ -192,114 +191,6 @@ func parserDevicesFromReqeust(req *sip.Request) (Devices, bool) { return u, true } -// 获取设备信息(注册设备) -func sipDeviceInfo(to Devices) { - hb := sip.NewHeaderBuilder().SetTo(to.addr).SetFrom(_serverDevices.addr).AddVia(&sip.ViaHop{ - Params: sip.NewParams().Add("branch", sip.String{Str: sip.GenerateBranch()}), - }).SetContentType(&sip.ContentTypeXML).SetMethod(sip.MethodMessage) - req := sip.NewRequest("", sip.MethodMessage, to.addr.URI, sip.DefaultSipVersion, hb.Build(), sip.GetDeviceInfoXML(to.DeviceID)) - req.SetDestination(to.source) - tx, err := srv.Request(req) - if err != nil { - // logrus.Warnln("sipDeviceInfo error,", err) - return - } - _, err = sipResponse(tx) - if err != nil { - // logrus.Warnln("sipDeviceInfo response error,", err) - return - } -} - -// sipCatalog 获取注册设备包含的列表 -func sipCatalog(to Devices) { - hb := sip.NewHeaderBuilder().SetTo(to.addr).SetFrom(_serverDevices.addr).AddVia(&sip.ViaHop{ - Params: sip.NewParams().Add("branch", sip.String{Str: sip.GenerateBranch()}), - }).SetContentType(&sip.ContentTypeXML).SetMethod(sip.MethodMessage) - req := sip.NewRequest("", sip.MethodMessage, to.addr.URI, sip.DefaultSipVersion, hb.Build(), sip.GetCatalogXML(to.DeviceID)) - req.SetDestination(to.source) - tx, err := srv.Request(req) - if err != nil { - // logrus.Warnln("sipCatalog error,", err) - return - } - _, err = sipResponse(tx) - if err != nil { - // logrus.Warnln("sipCatalog response error,", err) - return - } -} - -// MessageDeviceInfoResponse 主设备明细返回结构 -type MessageDeviceInfoResponse struct { - CmdType string `xml:"CmdType"` - SN int `xml:"SN"` - DeviceID string `xml:"DeviceID"` - DeviceType string `xml:"DeviceType"` - Manufacturer string `xml:"Manufacturer"` - Model string `xml:"Model"` - Firmware string `xml:"Firmware"` -} - -func sipMessageDeviceInfo(u Devices, body []byte) error { - message := &MessageDeviceInfoResponse{} - if err := sip.XMLDecode([]byte(body), message); err != nil { - // logrus.Errorln("sipMessageDeviceInfo Unmarshal xml err:", err, "body:", body) - return err - } - // db.UpdateAll(db.DBClient, new(Devices), db.M{"deviceid=?": u.DeviceID}, Devices{ - // Model: message.Model, - // DeviceType: message.DeviceType, - // Firmware: message.Firmware, - // Manufacturer: message.Manufacturer, - // }) - return nil -} - -// MessageDeviceListResponse 设备明细列表返回结构 -type MessageDeviceListResponse struct { - XMLName xml.Name `xml:"Response"` - CmdType string `xml:"CmdType"` - SN int `xml:"SN"` - DeviceID string `xml:"DeviceID"` - SumNum int `xml:"SumNum"` - Item []Channels `xml:"DeviceList>Item"` -} - -func sipMessageCatalog(u Devices, body []byte) error { - message := &MessageDeviceListResponse{} - if err := sip.XMLDecode(body, message); err != nil { - // logrus.Errorln("Message Unmarshal xml err:", err, "body:", string(body)) - return err - } - if message.SumNum > 0 { - // for _, d := range message.Item { - // channel := Channels{ChannelID: d.ChannelID, DeviceID: message.DeviceID} - // if err := db.Get(db.DBClient, &channel); err == nil { - // channel.Active = time.Now().Unix() - // channel.URIStr = fmt.Sprintf("sip:%s@%s", d.ChannelID, _sysinfo.Region) - // channel.Status = transDeviceStatus(d.Status) - // channel.Name = d.Name - // channel.Manufacturer = d.Manufacturer - // channel.Model = d.Model - // channel.Owner = d.Owner - // channel.CivilCode = d.CivilCode - // // Address ip地址 - // channel.Address = d.Address - // channel.Parental = d.Parental - // channel.SafetyWay = d.SafetyWay - // channel.RegisterWay = d.RegisterWay - // channel.Secrecy = d.Secrecy - // db.Save(db.DBClient, &channel) - // go notify(notifyChannelsActive(channel)) - // } else { - // // logrus.Infoln("deviceid not found,deviceid:", d.DeviceID, "pdid:", message.DeviceID, "err", err) - // } - // } - } - return nil -} - var deviceStatusMap = map[string]string{ "ON": m.DeviceStatusON, "OK": m.DeviceStatusON, diff --git a/pkg/gbs/error.go b/pkg/gbs/error.go new file mode 100644 index 0000000..0c1d5c2 --- /dev/null +++ b/pkg/gbs/error.go @@ -0,0 +1,8 @@ +package gbs + +import "errors" + +var ( + ErrXMLDecode = errors.New("xml decode error") + ErrDatabase = errors.New("database error") +) diff --git a/pkg/gbs/handler.go b/pkg/gbs/handler.go index 186faa5..b899404 100644 --- a/pkg/gbs/handler.go +++ b/pkg/gbs/handler.go @@ -1,69 +1,103 @@ package gbs import ( - "net/http" - "github.com/gowvp/gb28181/pkg/gbs/sip" ) -// MessageReceive 接收到的请求数据最外层,主要用来判断数据类型 -type MessageReceive struct { - CmdType string `xml:"CmdType"` - SN int `xml:"SN"` -} +func (g GB28181API) handlerMessage(ctx *sip.Context) { + // req := ctx.Request + // tx := ctx.Tx -func (g GB28181API) handlerMessage(req *sip.Request, tx *sip.Transaction) { - u, ok := parserDevicesFromReqeust(req) - if !ok { - // 未解析出来源用户返回错误 - tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) - return - } - // 判断是否存在body数据 - if len, have := req.ContentLength(); !have || len.Equals(0) { - // 不存在就直接返回的成功 - tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) - return - } - body := req.Body() - message := &MessageReceive{} + // case "Catalog": + // // 设备列表 + // sipMessageCatalog(u, body) + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // return + // case "Keepalive": + // // heardbeat + // if err := sipMessageKeepalive(u, body); err == nil { + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // // 心跳后同步注册设备列表信息 + // // sipCatalog(u) + // return + // } + // case "RecordInfo": + // // 设备音视频文件列表 + // sipMessageRecordInfo(u, body) + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // case "DeviceInfo": + // // 主设备信息 + // sipMessageDeviceInfo(u, body) + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // return + // } + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) + // } - if err := sip.XMLDecode(body, message); err != nil { - // logrus.Warnln("Message Unmarshal xml err:", err, "body:", string(body)) - // 有些body xml发送过来的不带encoding ,而且格式不是utf8的,导致xml解析失败,此处使用gbk转utf8后再次尝试xml解析 - body, err = sip.GbkToUtf8(body) - if err != nil { - // logrus.Errorln("message gbk to utf8 err", err) - } - if err := sip.XMLDecode(body, message); err != nil { - // logrus.Errorln("Message Unmarshal xml after gbktoutf8 err:", err, "body:", string(body)) - tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) - return - } - } - switch message.CmdType { - case "Catalog": - // 设备列表 - sipMessageCatalog(u, body) - tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) - return - case "Keepalive": - // heardbeat - if err := sipMessageKeepalive(u, body); err == nil { - tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) - // 心跳后同步注册设备列表信息 - sipCatalog(u) - return - } - case "RecordInfo": - // 设备音视频文件列表 - sipMessageRecordInfo(u, body) - tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) - case "DeviceInfo": - // 主设备信息 - sipMessageDeviceInfo(u, body) - tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) - return - } - tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) + // func (g GB28181API) handlerMessage2(ctx *sip.Context) { + // req := ctx.Request + // tx := ctx.Tx + + // u, ok := parserDevicesFromReqeust(req) + // if !ok { + // // 未解析出来源用户返回错误 + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) + // return + // } + // // 判断是否存在body数据 + // if len, have := req.ContentLength(); !have || len.Equals(0) { + // // 不存在就直接返回的成功 + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // return + // } + // body := req.Body() + // message := &MessageReceive{} + + // if err := sip.XMLDecode(body, message); err != nil { + // // logrus.Warnln("Message Unmarshal xml err:", err, "body:", string(body)) + // // 有些body xml发送过来的不带encoding ,而且格式不是utf8的,导致xml解析失败,此处使用gbk转utf8后再次尝试xml解析 + // body, err = sip.GbkToUtf8(body) + // if err != nil { + // // logrus.Errorln("message gbk to utf8 err", err) + // } + // if err := sip.XMLDecode(body, message); err != nil { + // // logrus.Errorln("Message Unmarshal xml after gbktoutf8 err:", err, "body:", string(body)) + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) + // return + // } + // } + // + // switch message.CmdType { + // case "Catalog": + // + // // 设备列表 + // sipMessageCatalog(u, body) + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // return + // + // case "Keepalive": + // + // // heardbeat + // if err := sipMessageKeepalive(u, body); err == nil { + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // // 心跳后同步注册设备列表信息 + // sipCatalog(u) + // return + // } + // + // case "RecordInfo": + // + // // 设备音视频文件列表 + // sipMessageRecordInfo(u, body) + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // + // case "DeviceInfo": + // + // // 主设备信息 + // sipMessageDeviceInfo(u, body) + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) + // return + // } + // + // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) } diff --git a/pkg/gbs/info.go b/pkg/gbs/info.go new file mode 100644 index 0000000..8422418 --- /dev/null +++ b/pkg/gbs/info.go @@ -0,0 +1,60 @@ +package gbs + +import ( + "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/pkg/gbs/sip" +) + +// 获取设备信息(注册设备) +func (g GB28181API) QueryDeviceInfo(ctx *sip.Context) { + tx, err := ctx.SendRequest(sip.MethodMessage, sip.GetDeviceInfoXML(ctx.DeviceID)) + if err != nil { + ctx.Log.Error("sipDeviceInfo", "err", err) + return + } + if _, err := sipResponse(tx); err != nil { + ctx.Log.Error("sipResponse", "err", err) + return + } +} + +// MessageDeviceInfoResponse 主设备明细返回结构 +type MessageDeviceInfoResponse struct { + CmdType string `xml:"CmdType"` + SN int `xml:"SN"` + DeviceID string `xml:"DeviceID"` + DeviceName string `xml:"DeviceName"` // 设备名 + DeviceType string `xml:"DeviceType"` + Manufacturer string `xml:"Manufacturer"` // 生产商 + Model string `xml:"Model"` // 设备型号 + Firmware string `xml:"Firmware"` // 固件版本 +} + +func (g GB28181API) sipMessageDeviceInfo(ctx *sip.Context) { + var msg MessageDeviceInfoResponse + if err := sip.XMLDecode(ctx.Request.Body(), &msg); err != nil { + ctx.Log.Error("sipMessageDeviceInfo", "err", err) + ctx.String(400, ErrXMLDecode.Error()) + return + } + + if err := g.store.Edit(ctx.DeviceID, func(d *gb28181.Device) { + d.Ext.Firmware = msg.Firmware + d.Ext.Manufacturer = msg.Manufacturer + d.Ext.Model = msg.Model + d.Ext.Name = msg.DeviceName + }); err != nil { + ctx.Log.Error("Edit", "err", err) + ctx.String(500, ErrDatabase.Error()) + return + } + + ctx.String(200, "OK") + + // db.UpdateAll(db.DBClient, new(Devices), db.M{"deviceid=?": u.DeviceID}, Devices{ + // Model: message.Model, + // DeviceType: message.DeviceType, + // Firmware: message.Firmware, + // Manufacturer: message.Manufacturer, + // }) +} diff --git a/pkg/gbs/keepalive.go b/pkg/gbs/keepalive.go index 0ac4782..d6d623d 100644 --- a/pkg/gbs/keepalive.go +++ b/pkg/gbs/keepalive.go @@ -1,9 +1,9 @@ package gbs import ( - "time" - + "github.com/gowvp/gb28181/internal/core/gb28181" "github.com/gowvp/gb28181/pkg/gbs/sip" + "github.com/ixugo/goweb/pkg/orm" // "github.com/panjjo/gosip/db" ) @@ -16,27 +16,30 @@ type MessageNotify struct { Info string `xml:"Info"` } -func sipMessageKeepalive(u Devices, body []byte) error { - message := &MessageNotify{} - if err := sip.XMLDecode(body, message); err != nil { - // logrus.Errorln("Message Unmarshal xml err:", err, "body:", string(body)) - return err +func (g GB28181API) sipMessageKeepalive(ctx *sip.Context) { + var msg MessageNotify + if err := sip.XMLDecode(ctx.Request.Body(), &msg); err != nil { + ctx.Log.Error("Message Unmarshal xml err", "err", err) + return } - device, ok := _activeDevices.Get(u.DeviceID) - if !ok { - device = Devices{DeviceID: u.DeviceID} - // if err := db.Get(db.DBClient, &device); err != nil { - // logrus.Warnln("Device Keepalive not found ", u.DeviceID, err) - // } + + // device, ok := _activeDevices.Get(ctx.DeviceID) + // if !ok { + // device = Devices{DeviceID: ctx.DeviceID} + // if err := db.Get(db.DBClient, &device); err != nil { + // logrus.Warnln("Device Keepalive not found ", u.DeviceID, err) + // } + // } + + if err := g.store.Edit(ctx.DeviceID, func(d *gb28181.Device) { + d.KeepaliveAt = orm.Now() + d.IsOnline = msg.Status == "OK" + }); err != nil { + ctx.Log.Error("keepalive", "err", err) } - if message.Status == "OK" { - device.ActiveAt = time.Now().Unix() - _activeDevices.Store(u.DeviceID, u) - } else { - device.ActiveAt = -1 - _activeDevices.Delete(u.DeviceID) - } - go notify(notifyDevicesAcitve(u.DeviceID, message.Status)) + + // _activeDevices.Store(u.DeviceID, u) + // go notify(notifyDevicesAcitve(u.DeviceID, message.Status)) // _, err := db.UpdateAll(db.DBClient, new(Devices), map[string]interface{}{"deviceid=?": u.DeviceID}, Devices{ // Host: u.Host, // Port: u.Port, @@ -47,5 +50,6 @@ func sipMessageKeepalive(u Devices, body []byte) error { // ActiveAt: device.ActiveAt, // }) // return err - return nil + // return nil + ctx.String(200, "OK") } diff --git a/pkg/gbs/m/config.go b/pkg/gbs/m/config.go index db25d47..fe9c843 100644 --- a/pkg/gbs/m/config.go +++ b/pkg/gbs/m/config.go @@ -46,7 +46,7 @@ type MediaServer struct { type SysInfo struct { // db.DBModel // Region 当前域 - Region string `json:"region" yaml:"region" mapstructure:"region"` + // Region string `json:"region" yaml:"region" mapstructure:"region"` // CID 通道id固定头部 CID string `json:"cid" yaml:"cid" mapstructure:"cid"` // CNUM 当前通道数 diff --git a/pkg/gbs/play.go b/pkg/gbs/play.go index ce5e34d..8dabc33 100644 --- a/pkg/gbs/play.go +++ b/pkg/gbs/play.go @@ -138,7 +138,7 @@ func sipPlayPush(data *Streams, channel Channels, device Devices) (*Streams, err req.SetDestination(device.source) req.AppendHeader(&sip.GenericHeader{HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:%s", channel.ChannelID, data.StreamID, _serverDevices.DeviceID, data.StreamID)}) req.SetRecipient(channel.addr.URI) - tx, err := srv.Request(req) + tx, err := svr.Request(req) if err != nil { // logrus.Warningln("sipPlayPush fail.id:", device.DeviceID, channel.ChannelID, "err:", err) return data, err @@ -192,7 +192,7 @@ func SipStopPlay(ssrc string) { user := u.(Devices) req := sip.NewRequestFromResponse(sip.MethodBYE, resp) req.SetDestination(user.source) - tx, err := srv.Request(req) + tx, err := svr.Request(req) if err != nil { // logrus.Warningln("sipStopPlay bye fail.id:", play.DeviceID, play.ChannelID, "err:", err) } diff --git a/pkg/gbs/record.go b/pkg/gbs/record.go index e826795..d48d438 100644 --- a/pkg/gbs/record.go +++ b/pkg/gbs/record.go @@ -30,7 +30,7 @@ func SipRecordList(to *Channels, start, end int64) (*Records, error) { }).SetContentType(&sip.ContentTypeXML).SetMethod(sip.MethodMessage) req := sip.NewRequest("", sip.MethodMessage, to.addr.URI, sip.DefaultSipVersion, hb.Build(), sip.GetRecordInfoXML(to.ChannelID, sn, start, end)) req.SetDestination(device.source) - tx, err := srv.Request(req) + tx, err := svr.Request(req) if err != nil { return nil, err } diff --git a/pkg/gbs/register.go b/pkg/gbs/register.go index a9a4d4d..6a9e41b 100644 --- a/pkg/gbs/register.go +++ b/pkg/gbs/register.go @@ -3,62 +3,133 @@ package gbs import ( "fmt" "net/http" + "strconv" + "strings" + "time" + "github.com/gowvp/gb28181/internal/conf" + "github.com/gowvp/gb28181/internal/core/gb28181" "github.com/gowvp/gb28181/pkg/gbs/sip" + "github.com/ixugo/goweb/pkg/orm" ) -type GB28181API struct{} +const ignorePassword = "#" + +type GB28181API struct { + cfg *conf.SIP + store gb28181.GB28181 + + catalog *sip.Collector[Channels] +} + +func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GB28181) *GB28181API { + g := GB28181API{ + cfg: &cfg.Sip, + store: store, + catalog: sip.NewCollector[Channels](func(c1, c2 *Channels) bool { + return c1.ChannelID == c2.ChannelID + }), + } + go g.catalog.Start(func(s string, c []*Channels) { + out := make([]*gb28181.Channel, len(c)) + for i, ch := range c { + out[i] = &gb28181.Channel{ + DeviceID: s, + ChannelID: ch.ChannelID, + Name: ch.Name, + IsOnline: ch.Status == "OK", + Ext: gb28181.DeviceExt{ + Manufacturer: ch.Manufacturer, + Model: ch.Model, + }, + } + } + g.store.SaveChannels(out) + }) + return &g +} func (g GB28181API) handlerRegister(ctx *sip.Context) { - fromUser, ok := parserDevicesFromReqeust(ctx.Request) - if !ok { - return - } - - if len(fromUser.DeviceID) < 18 { + if len(ctx.DeviceID) < 18 { ctx.String(http.StatusBadRequest, "device id too short") return } - // 判断是否存在授权字段 - if hdrs := ctx.Request.GetHeaders("Authorization"); len(hdrs) > 0 { - // user := Devices{DeviceID: fromUser.DeviceID} - // if err := db.Get(db.DBClient, &user); err == nil { - // if !user.Regist { - // // 如果数据库里用户未激活,替换user数据 - // // fromUser.ID = user.ID - // fromUser.Name = user.Name - // fromUser.PWD = user.PWD - // user = fromUser - // } - // user.addr = fromUser.addr - // authenticateHeader := hdrs[0].(*sip.GenericHeader) - // auth := sip.AuthFromValue(authenticateHeader.Contents) - // auth.SetPassword(user.PWD) - // auth.SetUsername(user.DeviceID) - // auth.SetMethod(string(req.Method())) - // auth.SetURI(auth.Get("uri")) - // if auth.CalcResponse() == auth.Get("response") { - // // 验证成功 - // // 记录活跃设备 - // user.source = fromUser.source - // user.addr = fromUser.addr - // _activeDevices.Store(user.DeviceID, user) - // if !user.Regist { - // // 第一次激活,保存数据库 - // user.Regist = true - // db.DBClient.Save(&user) - // // logrus.Infoln("new user regist,id:", user.DeviceID) - // } - // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) - // // 注册成功后查询设备信息,获取制作厂商等信息 - // go notify(notifyDevicesRegister(user)) - // go sipDeviceInfo(fromUser) - // return - // } - // } + dev, err := g.store.GetDeviceByDeviceID(ctx.DeviceID) + if err != nil { + ctx.Log.Error("GetDeviceByDeviceID", "err", err) + ctx.String(http.StatusInternalServerError, "server db error") + return } - resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil) - resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf("Digest nonce=\"%s\", algorithm=MD5, realm=\"%s\",qop=\"auth\"", sip.RandString(32), _sysinfo.Region)}) - ctx.Tx.Respond(resp) + + password := dev.Password + if password == "" { + password = g.cfg.Password + } + // 免鉴权 + if dev.Password == ignorePassword { + password = "" + } + if password != "" { + hdrs := ctx.Request.GetHeaders("Authorization") + if len(hdrs) == 0 { + resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil) + resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf("Digest nonce=\"%s\", algorithm=MD5, realm=\"%s\",qop=\"auth\"", sip.RandString(32), g.cfg.Domain)}) + _ = ctx.Tx.Respond(resp) + return + } + authenticateHeader := hdrs[0].(*sip.GenericHeader) + auth := sip.AuthFromValue(authenticateHeader.Contents) + auth.SetPassword(password) + auth.SetUsername(dev.DeviceID) + auth.SetMethod(ctx.Request.Method()) + auth.SetURI(auth.Get("uri")) + if auth.CalcResponse() != auth.Get("response") { + ctx.Log.Info("设备注册鉴权失败") + ctx.String(http.StatusUnauthorized, "wrong password") + return + } + } + + respFn := func() { + resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusOK, "OK", nil) + resp.AppendHeader(&sip.GenericHeader{ + HeaderName: "Date", + Contents: time.Now().Format("2006-01-02T15:04:05.000"), + }) + _ = ctx.Tx.Respond(resp) + } + + expire := ctx.GetHeader("Expires") + if expire == "0" { + ctx.Log.Info("设备注销") + g.logout(ctx.DeviceID, func(b *gb28181.Device) { + b.IsOnline = false + b.Address = ctx.Source.String() + }) + respFn() + return + } + g.login(ctx.DeviceID, func(b *gb28181.Device) { + b.IsOnline = true + b.Address = ctx.Source.String() + b.Trasnport = strings.ToUpper(ctx.Source.Network()) + b.RegisteredAt = orm.Now() + b.Expires, _ = strconv.Atoi(expire) + }) + + ctx.Log.Info("设备注册成功") + + respFn() + + g.QueryDeviceInfo(ctx) + g.QueryCatalog(ctx) +} + +func (g GB28181API) login(deviceID string, changeFn func(*gb28181.Device)) { + g.store.Login(deviceID, changeFn) +} + +func (g GB28181API) logout(deviceID string, changeFn func(*gb28181.Device)) { + g.store.Logout(deviceID, changeFn) } diff --git a/pkg/gbs/server.go b/pkg/gbs/server.go index 6fc49fd..cb86821 100644 --- a/pkg/gbs/server.go +++ b/pkg/gbs/server.go @@ -1,7 +1,6 @@ package gbs import ( - "context" "fmt" "net" "net/http" @@ -9,29 +8,42 @@ import ( "strconv" "sync" + "github.com/gowvp/gb28181/internal/conf" + "github.com/gowvp/gb28181/internal/core/gb28181" "github.com/gowvp/gb28181/pkg/gbs/m" "github.com/gowvp/gb28181/pkg/gbs/sip" ) type Server struct { *sip.Server + gb *GB28181API } -func NewServer() *Server { - api := GB28181API{} +func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181) (*Server, func()) { + api := NewGB28181API(cfg, store) - srv = sip.NewServer() - srv.Register(api.handlerRegister) - // srv.RegistHandler(sip.MethodMessage, api.handlerMessage) - // srv.Message("catalog", api.handlerMessage) - // srv.Message("keepalive", api.handlerMessage) - // srv.Message("RecordInfo", api.handlerMessage) - // srv.Message("DeviceInfo", api.handlerMessage) - go srv.ListenUDPServer("0.0.0.0:15060") - go srv.ListenTCPServer(context.TODO(), "0.0.0.0:15060") - return &Server{ - Server: srv, + uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s", cfg.Sip.ID, cfg.Sip.Domain)) + from := sip.Address{ + DisplayName: sip.String{Str: "gowvp"}, + URI: &uri, + Params: sip.NewParams(), } + + svr = sip.NewServer(&from) + svr.Register(api.handlerRegister) + msg := svr.Message() + msg.Handle("Keepalive", api.sipMessageKeepalive) + msg.Handle("Catalog", api.sipMessageCatalog) + msg.Handle("DeviceInfo", api.sipMessageDeviceInfo) + + // msg.Handle("RecordInfo", api.handlerMessage) + + go svr.ListenUDPServer(fmt.Sprintf(":%d", cfg.Sip.Port)) + go svr.ListenTCPServer(fmt.Sprintf(":%d", cfg.Sip.Port)) + c := Server{ + Server: svr, + } + return &c, c.Close } func Start() { @@ -44,8 +56,8 @@ func Start() { LoadSYSInfo() - srv = sip.NewServer() - go srv.ListenUDPServer(config.UDP) + // svr = sip.NewServer() + // go svr.ListenUDPServer(config.UDP) } // MODDEBUG MODDEBUG @@ -97,14 +109,14 @@ func LoadSYSInfo() { // } m.MConfig.GB28181 = _sysinfo - uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s", _sysinfo.LID, _sysinfo.Region)) + // uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s", _sysinfo.LID, _sysinfo.Region)) _serverDevices = Devices{ DeviceID: _sysinfo.LID, - Region: _sysinfo.Region, + // Region: _sysinfo.Region, addr: &sip.Address{ DisplayName: sip.String{Str: "sipserver"}, - URI: &uri, - Params: sip.NewParams(), + // URI: &uri, + Params: sip.NewParams(), }, } @@ -140,3 +152,12 @@ func sipResponse(tx *sip.Transaction) (*sip.Response, error) { } return response, nil } + +// QueryCatalog 查询 catalog +func (s *Server) QueryCatalog(deviceID string) { + // TODO: query 查询不能直接传递 ctx,需要缓存目标信息 + s.gb.QueryCatalog(nil) +} + +func (s *Server) Play() { +} diff --git a/pkg/gbs/sip/collector.go b/pkg/gbs/sip/collector.go new file mode 100644 index 0000000..41abbb1 --- /dev/null +++ b/pkg/gbs/sip/collector.go @@ -0,0 +1,113 @@ +package sip + +import ( + "log/slog" + "slices" + "time" +) + +// Collector . +// 1. 收集器 +// 2. 分门别类 +// 3. 定时同步,超时删除,删除之前再同步一次 +// 4. 不会去重 +// 如何使用? +// 1. 通过 NewCatalogRecv 创建一个新的收集器 +// 2. s.createCh <- deviceID +// 3. s.catalog.msg <- &CollectorMsg[Channel]{Data: &c, Total: msg.SumNum, Key: msg.DeviceID} +type Collector[T any] struct { + data map[string]*Content[T] + msg chan *CollectorMsg[T] + createCh chan string + noRepeatFn NoRepeatFn[T] + observer *Observer +} + +func (c *Collector[T]) Run(key string) { + select { + case c.createCh <- key: + default: + } +} + +func (c *Collector[T]) Write(info *CollectorMsg[T]) { + c.msg <- info +} + +type CollectorMsg[T any] struct { + Key string + Data *T + Total int +} + +type NoRepeatFn[T any] func(*T, *T) bool + +// newCollector 创建一个新的收集器 +// noRepeatFn 用于提前去重,避免重复数据存储 +func NewCollector[T any](noRepeatFn NoRepeatFn[T]) *Collector[T] { + return &Collector[T]{ + data: make(map[string]*Content[T]), + msg: make(chan *CollectorMsg[T], 10), + createCh: make(chan string, 10), + noRepeatFn: noRepeatFn, + observer: NewObserver(), + } +} + +type Content[T any] struct { + lastUpdateAt time.Time + data []*T + total int +} + +// Wait 在执行 Start 以后,可以调用 Wait 等待 +func (c *Collector[T]) Wait(key string) { + c.observer.DefaultRegister(key) +} + +// Start 启动定时任务检查和保存数据 +func (c *Collector[T]) Start(save func(string, []*T)) { + fn := func(k string, data []*T) { + save(k, data) + c.observer.Notify(k) + } + + check := time.NewTicker(time.Second * 3) + defer check.Stop() + for { + select { + case <-check.C: + for k, v := range c.data { + if time.Since(v.lastUpdateAt) > 10*time.Second { + fn(k, v.data) + delete(c.data, k) + continue + } + if v.total > 0 && len(v.data) >= v.total { + fn(k, v.data) + delete(c.data, k) + continue + } + } + case v := <-c.createCh: + c.data[v] = &Content[T]{lastUpdateAt: time.Now(), data: make([]*T, 0, 2), total: -1} + case msg := <-c.msg: + data, exist := c.data[msg.Key] + if !exist { + slog.Debug("key 不存在或已过期", "key", msg.Key, "data", msg.Data) + continue + } + // 如果数据已存在且无重复,跳过该消息 + if slices.ContainsFunc(data.data, func(v *T) bool { + return c.noRepeatFn(v, msg.Data) + }) { + slog.Debug("catalog 发现重复数据", "key", msg.Key, "data", msg.Data) + continue + } + // 添加数据到对应的条目并更新最后更新时间和总量 + data.data = append(data.data, msg.Data) + data.lastUpdateAt = time.Now() + data.total = msg.Total + } + } +} diff --git a/pkg/gbs/sip/connection.go b/pkg/gbs/sip/connection.go index f9820fc..63829ac 100644 --- a/pkg/gbs/sip/connection.go +++ b/pkg/gbs/sip/connection.go @@ -125,7 +125,11 @@ func (conn *connection) Write(buf []byte) (int, error) { } func (conn *connection) WriteTo(buf []byte, raddr net.Addr) (num int, err error) { - num, err = conn.baseConn.(net.PacketConn).WriteTo(buf, raddr) + if conn.Network() == "tcp" { + num, err = conn.baseConn.Write(buf) + } else { + num, err = conn.baseConn.(net.PacketConn).WriteTo(buf, raddr) + } if err != nil { return num, NewError(err, conn.logKey, "writeTo", conn.baseConn.LocalAddr().String(), raddr.String()) } @@ -150,7 +154,7 @@ func (conn *connection) Close() error { } func (conn *connection) Network() string { - return strings.ToUpper(conn.baseConn.LocalAddr().Network()) + return conn.baseConn.LocalAddr().Network() } func (conn *connection) SetDeadline(t time.Time) error { diff --git a/pkg/gbs/sip/context.go b/pkg/gbs/sip/context.go index 762acf4..df0fc3b 100644 --- a/pkg/gbs/sip/context.go +++ b/pkg/gbs/sip/context.go @@ -1,37 +1,111 @@ package sip +import ( + "fmt" + "log/slog" + "math" + "net" + "strings" +) + +const abortIndex int8 = math.MaxInt8 >> 1 + type HandlerFunc func(*Context) type Context struct { Request *Request Tx *Transaction handlers []HandlerFunc - index int + index int8 cache map[string]any + + DeviceID string + Host string + Port string + + Source net.Addr + toAddr *Address + fromAddr *Address + + Log *slog.Logger + + svr *Server } func newContext(req *Request, tx *Transaction) *Context { - return &Context{ + c := Context{ Request: req, Tx: tx, cache: make(map[string]any), + Log: slog.Default(), + index: -1, } + if err := c.parserRequest(); err != nil { + slog.Error("parserRequest", "err", err) + } + return &c +} + +func (c *Context) parserRequest() error { + req := c.Request + header, ok := req.From() + if !ok { + return fmt.Errorf("req from is nil") + } + + if header.Address == nil { + return fmt.Errorf("header address is nil") + } + user := header.Address.User() + if user == nil { + return fmt.Errorf("address user is nil") + } + c.DeviceID = user.String() + c.Host = header.Address.Host() + via, ok := req.ViaHop() + if !ok { + return fmt.Errorf("via is nil") + } + c.Host = via.Host + c.Port = via.Port.String() + + c.Source = req.Source() + c.toAddr = NewAddressFromFromHeader(header) + + c.Log = slog.Default().With("deviceID", c.DeviceID, "host", c.Host) + return nil } func (c *Context) Next() { - l := len(c.handlers) - for { - c.handlers[c.index](c) - if c.index == -1 || c.index >= l { - return + c.index++ + for c.index < int8(len(c.handlers)) { + if fn := c.handlers[c.index]; fn != nil { + fn(c) } c.index++ } } +func (c *Context) GetHeader(key string) string { + headers := c.Request.GetHeaders(key) + if len(headers) > 0 { + header := headers[0] + splits := strings.Split(header.String(), ":") + if len(splits) == 2 { + return strings.TrimSpace(splits[1]) + } + } + return "" +} + func (c *Context) Abort() { - c.index = -1 + c.index = abortIndex +} + +func (c *Context) AbortString(status int, msg string) { + c.Abort() + c.String(status, msg) } func (c *Context) String(status int, msg string) { @@ -60,3 +134,14 @@ func (c *Context) GetMustInt(k string) int { } return 0 } + +func (c *Context) SendRequest(method string, body []byte) (*Transaction, error) { + hb := NewHeaderBuilder().SetTo(c.toAddr).SetFrom(c.fromAddr).AddVia(&ViaHop{ + Params: NewParams().Add("branch", String{Str: GenerateBranch()}), + }).SetContentType(&ContentTypeXML).SetMethod(method) + + req := NewRequest("", method, c.toAddr.URI, DefaultSipVersion, hb.Build(), body) + req.SetDestination(c.Source) + req.SetConnection(c.Request.conn) + return c.svr.Request(req) +} diff --git a/pkg/gbs/sip/group.go b/pkg/gbs/sip/group.go new file mode 100644 index 0000000..22f6002 --- /dev/null +++ b/pkg/gbs/sip/group.go @@ -0,0 +1,29 @@ +package sip + +type RouteGroup struct { + method string + middlewares []HandlerFunc + s *Server +} + +type MessageReceive struct { + CmdType string `xml:"CmdType"` + SN int `xml:"SN"` +} + +func newRouteGroup(method string, s *Server, ms ...HandlerFunc) *RouteGroup { + return &RouteGroup{ + method: method, + middlewares: ms, + s: s, + } +} + +func (g *RouteGroup) addGroup(pattern string, handler ...HandlerFunc) { + key := g.method + "-" + pattern + g.s.addRoute(key, append(g.middlewares, handler...)...) +} + +func (g *RouteGroup) Handle(pattern string, handler ...HandlerFunc) { + g.addGroup(pattern, handler...) +} diff --git a/pkg/gbs/sip/header.go b/pkg/gbs/sip/header.go index 2c9f6fc..1afd2d9 100644 --- a/pkg/gbs/sip/header.go +++ b/pkg/gbs/sip/header.go @@ -33,7 +33,7 @@ type HeadersBuilder struct { func NewHeaderBuilder() *HeadersBuilder { callID := CallID(RandString(32)) maxForwards := MaxForwards(70) - userAgent := UserAgentHeader("GoSIP") + userAgent := UserAgentHeader("GoWVP") return &HeadersBuilder{ protocol: "SIP", protocolVersion: "2.0", diff --git a/pkg/gbs/sip/message.go b/pkg/gbs/sip/message.go index 49276a3..e6dfa8f 100644 --- a/pkg/gbs/sip/message.go +++ b/pkg/gbs/sip/message.go @@ -24,7 +24,7 @@ const ( MethodRegister = "REGISTER" MethodOptions = "OPTIONS" // SUBSCRIBE = "SUBSCRIBE" - // NOTIFY = "NOTIFY" + MethodNotify = "NOTIFY" // REFER = "REFER" MethodInfo = "INFO" MethodMessage = "MESSAGE" @@ -80,6 +80,7 @@ type Message interface { SetSource(src net.Addr) Destination() net.Addr SetDestination(dest net.Addr) + SetConnection(Connection) IsCancel() bool IsAck() bool @@ -93,6 +94,8 @@ type message struct { body []byte source, dest net.Addr startLine func() string + + conn Connection `json:"-"` } // MessageID MessageID @@ -167,6 +170,10 @@ func (msg *message) SetSource(src net.Addr) { msg.source = src } +func (msg *message) SetConnection(conn Connection) { + msg.conn = conn +} + // Destination Destination func (msg *message) Destination() net.Addr { return msg.dest diff --git a/pkg/gbs/sip/obs.go b/pkg/gbs/sip/obs.go new file mode 100644 index 0000000..519cdc8 --- /dev/null +++ b/pkg/gbs/sip/obs.go @@ -0,0 +1,73 @@ +package sip + +import ( + "fmt" + "time" + + "github.com/ixugo/goweb/pkg/conc" +) + +// ObserverHandler 返回 true 表示完成任务 +type ObserverHandler func(deviceID string, args ...string) bool + +// Observer 观察者 +type Observer struct { + data conc.Map[string, ObserverHandler] +} + +// NewObserver 创建观察者 +func NewObserver() *Observer { + return &Observer{} +} + +// concRegister 异步注册观察者 +func (o *Observer) concRegister(deviceID string, handler ObserverHandler) { + o.data.Store(deviceID, handler) +} + +// Register 同步等待观察者完成任务 +func (o *Observer) Register(deviceID string, duration time.Duration, fn ObserverHandler) { + ch := make(chan struct{}, 1) + defer close(ch) + o.concRegister(deviceID, func(did string, args ...string) bool { + if fn(did, args...) { + ch <- struct{}{} + return true + } + return false + }) + // 等待通知或超时 + select { + // 收到通知 + case <-ch: + // 超时7秒 + case <-time.After(duration): + o.data.Delete(deviceID) + } +} + +// DefaultRegister 默认的注册行为 +func (o *Observer) DefaultRegister(deviceID string) { + key := fmt.Sprintf("%s:%d", deviceID, time.Now().UnixMilli()) + o.Register(key, 7*time.Second, func(did string, _ ...string) bool { + return deviceID == did + }) +} + +// RegisterWithTimeout 自定义等待时间 +func (o *Observer) RegisterWithTimeout(deviceID string, duration time.Duration) { + key := fmt.Sprintf("%s:%d", deviceID, time.Now().UnixMilli()) + o.Register(key, duration, func(did string, _ ...string) bool { + return deviceID == did + }) +} + +// Notify 通知观察者 +func (o *Observer) Notify(deviceID string, args ...string) { + o.data.Range(func(key string, fn ObserverHandler) bool { + if fn(deviceID, args...) { + o.data.Delete(key) + } + return true + }) +} diff --git a/pkg/gbs/sip/obs_test.go b/pkg/gbs/sip/obs_test.go new file mode 100644 index 0000000..a29083e --- /dev/null +++ b/pkg/gbs/sip/obs_test.go @@ -0,0 +1,20 @@ +package sip + +import ( + "fmt" + "testing" +) + +func TestObserver(t *testing.T) { + s := NewObserver() + go func() { + s.Notify("1") + s.Notify("2") + }() + var i int + s.data.Range(func(key string, value ObserverHandler) bool { + i++ + return true + }) + fmt.Println("i:", i) +} diff --git a/pkg/gbs/sip/parser.go b/pkg/gbs/sip/parser.go index 573d5e1..a664696 100644 --- a/pkg/gbs/sip/parser.go +++ b/pkg/gbs/sip/parser.go @@ -665,6 +665,7 @@ func (p *parser) start() { msg.SetBody(body, false) } msg.SetSource(packet.raddr) + msg.SetConnection(packet.conn) p.out <- msg } } diff --git a/pkg/gbs/sip/request.go b/pkg/gbs/sip/request.go index 048c7ac..dda7145 100644 --- a/pkg/gbs/sip/request.go +++ b/pkg/gbs/sip/request.go @@ -157,6 +157,10 @@ func (req *Request) SetDestination(dest net.Addr) { req.dest = dest } +func (req *Request) SetConnection(conn Connection) { + req.conn = conn +} + // Clone Clone func (req *Request) Clone() Message { return NewRequest( diff --git a/pkg/gbs/sip/server.go b/pkg/gbs/sip/server.go index 1555e24..4ddeb8a 100644 --- a/pkg/gbs/sip/server.go +++ b/pkg/gbs/sip/server.go @@ -12,6 +12,8 @@ import ( "strconv" "strings" "sync" + + "github.com/ixugo/goweb/pkg/conc" ) var bufferSize uint16 = 65535 - 20 - 8 // IPv4 max size - IPv4 Header size - UDP Header size @@ -22,12 +24,11 @@ var bufferSize uint16 = 65535 - 20 - 8 // IPv4 max size - IPv4 Header size - UDP // Server Server type Server struct { udpaddr net.Addr - conn Connection + udpConn Connection txs *transacionts - hmu *sync.RWMutex - route map[string][]HandlerFunc + route conc.Map[string, []HandlerFunc] port *Port host net.IP @@ -36,42 +37,58 @@ type Server struct { tcpListener *net.TCPListener tcpaddr net.Addr + + ctx context.Context + cancel context.CancelFunc + + from *Address } // NewServer NewServer -func NewServer() *Server { +func NewServer(form *Address) *Server { activeTX = &transacionts{txs: map[string]*Transaction{}, rwm: &sync.RWMutex{}} + ctx, cancel := context.WithCancel(context.TODO()) srv := &Server{ - hmu: &sync.RWMutex{}, - txs: activeTX, - route: make(map[string][]HandlerFunc), + txs: activeTX, + ctx: ctx, + cancel: cancel, + from: form, } return srv } -func (s *Server) addRoute(method string, pattern string, handler ...HandlerFunc) { - s.hmu.Lock() - defer s.hmu.Unlock() - key := method + "-" + pattern - s.route[key] = handler +func (s *Server) addRoute(method string, handler ...HandlerFunc) { + s.route.Store(strings.ToUpper(method), handler) } func (s *Server) Register(handler ...HandlerFunc) { - s.addRoute(MethodRegister, "", handler...) + s.addRoute(MethodRegister, handler...) } -func (s *Server) Message(handler ...HandlerFunc) { - s.addRoute(MethodMessage, "", handler...) +func (s *Server) Message(handler ...HandlerFunc) *RouteGroup { + s.addRoute(MethodMessage, handler...) + return newRouteGroup(MethodMessage, s, handler...) +} + +func (s *Server) Notify(handler ...HandlerFunc) *RouteGroup { + s.addRoute(MethodNotify, handler...) + return newRouteGroup(MethodNotify, s, handler...) } func (s *Server) getTX(key string) *Transaction { return s.txs.getTX(key) } -func (s *Server) mustTX(key string) *Transaction { +func (s *Server) mustTX(msg *Request) *Transaction { + key := getTXKey(msg) tx := s.txs.getTX(key) + if tx == nil { - tx = s.txs.newTX(key, s.conn) + if msg.conn.Network() == "udp" { + tx = s.txs.newTX(key, s.udpConn) + } else { + tx = s.txs.newTX(key, msg.conn) + } } return tx } @@ -91,7 +108,7 @@ func (s *Server) ListenUDPServer(addr string) { if err != nil { panic(fmt.Errorf("net.ListenUDP err[%w]", err)) } - s.conn = newUDPConnection(udp) + s.udpConn = newUDPConnection(udp) var ( raddr net.Addr num int @@ -101,17 +118,22 @@ func (s *Server) ListenUDPServer(addr string) { defer parser.stop() go s.handlerListen(parser.out) for { - num, raddr, err = s.conn.ReadFrom(buf) - if err != nil { - // logrus.Errorln("udp.ReadFromUDP err", err) - continue + select { + case <-s.ctx.Done(): + return + default: + num, raddr, err = s.udpConn.ReadFrom(buf) + if err != nil { + slog.Error("udp.ReadFromUDP", "err", err) + continue + } + parser.in <- newPacket(append([]byte{}, buf[:num]...), raddr, s.udpConn) } - parser.in <- newPacket(append([]byte{}, buf[:num]...), raddr, s.conn) } } // ListenTCPServer 启动 TCP 服务器并监听指定地址。 -func (s *Server) ListenTCPServer(ctx context.Context, addr string) { +func (s *Server) ListenTCPServer(addr string) { // 解析传入的地址为 TCP 地址 tcpaddr, err := net.ResolveTCPAddr("tcp", addr) // 如果解析地址失败,则抛出错误 @@ -138,7 +160,7 @@ func (s *Server) ListenTCPServer(ctx context.Context, addr string) { for { select { - case <-ctx.Done(): + case <-s.ctx.Done(): slog.Info("ListenTCPServer Has Been Exits") return default: @@ -152,14 +174,25 @@ func (s *Server) ListenTCPServer(ctx context.Context, addr string) { } } +func (s *Server) Close() { + if s.cancel != nil { + s.cancel() + s.cancel = nil + } + if s.udpConn != nil { + s.udpConn.Close() + s.udpConn = nil + } + if s.tcpListener != nil { + s.tcpListener.Close() + s.tcpListener = nil + } +} + // ProcessTcpConn 处理传入的 TCP 连接。 func (s *Server) ProcessTcpConn(conn net.Conn) { - // 确保在方法退出时关闭连接 - defer conn.Close() // 关闭连接 - // 创建一个新的缓冲读取器,用于从连接中读取数据 + defer conn.Close() reader := bufio.NewReader(conn) - // lenBuf := make([]byte, 2) - // 创建一个新的 TCP 连接实例 c := NewTCPConnection(conn) parser := newParser() @@ -167,47 +200,34 @@ func (s *Server) ProcessTcpConn(conn net.Conn) { go s.handlerListen(parser.out) for { - // 初始化一个缓冲区,用于存储读取的数据 var buffer bytes.Buffer - // 初始化 body 长度 bodyLen := 0 for { // 读取一行数据,以 '\n' 为结束符 line, err := reader.ReadBytes('\n') - // 如果读取过程中出错,则退出方法 if err != nil { // logrus.Debugln("tcp conn read error:", err) return } - // 将读取的数据写入缓冲区 buffer.Write(line) - // 如果读取到的行长度小于等于2且 body 长度小于等于0,则跳出循环 if len(line) <= 2 { if bodyLen <= 0 { break } - // 读取 body 数据 - // read body bodyBuf := make([]byte, bodyLen) n, err := io.ReadFull(reader, bodyBuf) - // 如果读取 body 数据时出错,则记录错误并退出循环 if err != nil || n != bodyLen { slog.Error(`error while read full`, "err", err) - // err process } - // 将读取的 body 数据写入缓冲区 buffer.Write(bodyBuf) break } - // 如果读取到 "Content-Length" 头部,则解析 body 长度 if strings.Contains(string(line), "Content-Length") { - // 以: 对line做分割 s := strings.Split(string(line), ":") value := strings.Trim(s[len(s)-1], " \r\n") bodyLen, err = strconv.Atoi(value) - // 如果解析 "Content-Length" 头部失败,则记录错误并退出循环 if err != nil { slog.Error("parse Content-Length failed") break @@ -236,11 +256,22 @@ func (s *Server) handlerListen(msgs chan Message) { switch tmsg := msg.(type) { case *Request: req := tmsg - req.SetDestination(s.udpaddr) + + dst := s.udpaddr + if req.conn.Network() == "tcp" { + dst = s.tcpaddr + } + + req.SetDestination(dst) s.handlerRequest(req) case *Response: resp := tmsg - resp.SetDestination(s.udpaddr) + + dst := s.udpaddr + if resp.conn.Network() == "tcp" { + dst = s.tcpaddr + } + resp.SetDestination(dst) s.handlerResponse(resp) default: // logrus.Errorln("undefind msg type,", tmsg, msg.String()) @@ -249,19 +280,35 @@ func (s *Server) handlerListen(msgs chan Message) { } func (s *Server) handlerRequest(msg *Request) { - tx := s.mustTX(getTXKey(msg)) + tx := s.mustTX(msg) // logrus.Traceln("receive request from:", msg.Source(), ",method:", msg.Method(), "txKey:", tx.key, "message: \n", msg.String()) - s.hmu.RLock() - handlers, ok := s.route[msg.Method()] - s.hmu.RUnlock() + + key := msg.Method() + if key == MethodMessage || key == MethodNotify { + + if l, ok := msg.ContentLength(); !ok || l.Equals(0) { + slog.Error("ContentLength is empty") + return + } + body := msg.Body() + var msg MessageReceive + if err := XMLDecode(body, &msg); err != nil { + slog.Error("xml decode err") + return + } + key += "-" + msg.CmdType + } + handlers, ok := s.route.Load(strings.ToUpper(key)) if !ok { - // logrus.Errorln("not found handler func,string:", msg.Method(), msg.String()) + slog.Error("not found handler func,string:", "method", msg.Method(), "msg", msg.String()) go handlerMethodNotAllowed(msg, tx) return } ctx := newContext(msg, tx) ctx.handlers = handlers + ctx.fromAddr = s.from + ctx.svr = s go ctx.Next() } @@ -290,7 +337,7 @@ func (s *Server) Request(req *Request) (*Transaction, error) { viaHop.Params.Add("rport", nil) } - tx := s.mustTX(getTXKey(req)) + tx := s.mustTX(req) return tx, tx.Request(req) } diff --git a/pkg/gbs/sip/tx.go b/pkg/gbs/sip/tx.go index d5e3263..39f2d16 100644 --- a/pkg/gbs/sip/tx.go +++ b/pkg/gbs/sip/tx.go @@ -4,6 +4,7 @@ import ( "net/http" "sync" "time" + "unsafe" ) var activeTX *transacionts @@ -117,8 +118,10 @@ func (tx *Transaction) Respond(res *Response) error { // Request Request func (tx *Transaction) Request(req *Request) error { + str := req.String() + s := unsafe.Slice(unsafe.StringData(str), len(str)) // logrus.Traceln("send request,to:", req.dest.String(), "txkey:", tx.key, "message: \n", req.String()) - _, err := tx.conn.WriteTo([]byte(req.String()), req.dest) + _, err := tx.conn.WriteTo(s, req.dest) return err } diff --git a/pkg/gbs/sip/utils.go b/pkg/gbs/sip/utils.go index 7d99fe1..1ba7b79 100644 --- a/pkg/gbs/sip/utils.go +++ b/pkg/gbs/sip/utils.go @@ -3,7 +3,6 @@ package sip import ( "bytes" "context" - "crypto/md5" "encoding/json" "encoding/xml" "errors" @@ -15,8 +14,8 @@ import ( "net" "net/http" "time" + "unicode/utf8" - "golang.org/x/net/html/charset" "golang.org/x/text/encoding/simplifiedchinese" "golang.org/x/text/transform" ) @@ -156,24 +155,34 @@ func GetRequest(url string) ([]byte, error) { } defer resp.Body.Close() - respbody, err := ioutil.ReadAll(resp.Body) + respbody, err := io.ReadAll(resp.Body) if err != nil { return nil, err } return respbody, nil } -// GetMD5 GetMD5 -func GetMD5(str string) string { - h := md5.New() - io.WriteString(h, str) - return fmt.Sprintf("%x", h.Sum(nil)) +// XMLDecode 解码 xml +func XMLDecode(data []byte, v interface{}) error { + if err := xmlDecode(data, v); err == nil { + return nil + } + // 有些body xml发送过来的不带encoding ,而且格式不是utf8的,导致xml解析失败,此处使用gbk转utf8后再次尝试xml解析 + body, err := GbkToUtf8(data) + if err != nil { + return err + } + return xmlDecode(body, v) } -// XMLDecode XMLDecode -func XMLDecode(data []byte, v interface{}) error { +func xmlDecode(data []byte, v interface{}) error { decoder := xml.NewDecoder(bytes.NewReader(data)) - decoder.CharsetReader = charset.NewReaderLabel + decoder.CharsetReader = func(charset string, input io.Reader) (io.Reader, error) { + if utf8.Valid(data) { + return input, nil + } + return simplifiedchinese.GB18030.NewDecoder().Reader(input), nil + } return decoder.Decode(v) } @@ -226,7 +235,7 @@ func ResolveSelfIP() (net.IP, error) { // GBK 转 UTF-8 func GbkToUtf8(s []byte) ([]byte, error) { reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewDecoder()) - d, e := ioutil.ReadAll(reader) + d, e := io.ReadAll(reader) if e != nil { return nil, e } @@ -236,7 +245,7 @@ func GbkToUtf8(s []byte) ([]byte, error) { // UTF-8 转 GBK func Utf8ToGbk(s []byte) ([]byte, error) { reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewEncoder()) - d, e := ioutil.ReadAll(reader) + d, e := io.ReadAll(reader) if e != nil { return nil, e } diff --git a/pkg/gbs/stream.go b/pkg/gbs/stream.go index 5cbd028..f3c3d46 100644 --- a/pkg/gbs/stream.go +++ b/pkg/gbs/stream.go @@ -150,7 +150,7 @@ func CheckStreams() { StreamList.Response.Delete(stream.StreamID) StreamList.Succ.Delete(stream.ChannelID) - tx, err := srv.Request(req) + tx, err := svr.Request(req) if err != nil { // logrus.Warningln("checkStreamClosedFail", stream.StreamID, err) stream.Msg = err.Error() diff --git a/pkg/zlm/rtp.go b/pkg/zlm/rtp.go new file mode 100644 index 0000000..3800244 --- /dev/null +++ b/pkg/zlm/rtp.go @@ -0,0 +1,59 @@ +package zlm + +const ( + openRtpServer = `/index/api/openRtpServer` + closeRtpServer = `/index/api/closeRtpServer` +) + +type OpenRTPServerResponse struct { + Code int `json:"code"` + Port int `json:"port"` // 接收端口,方便获取随机端口号 +} +type OpenRTPServerRequest struct { + Port int `json:"port"` // 接收端口,0 则为随机端口 + TCPMode int `json:"tcp_mode"` // 0 udp 模式,1 tcp 被动模式, 2 tcp 主动模式。 (兼容 enable_tcp 为 0/1) + StreamID string `json:"stream_id"` // 该端口绑定的流 ID,该端口只能创建这一个流(而不是根据 ssrc 创建多个) +} + +// OpenRTPServer 创建 GB28181 RTP 接收端口,如果该端口接收数据超时,则会自动被回收(不用调用 closeRtpServer 接口) +// https://docs.zlmediakit.com/zh/guide/media_server/restful_api.html#_24%E3%80%81-index-api-openrtpserver +func (e *Engine) OpenRTPServer(in OpenRTPServerRequest) (*OpenRTPServerResponse, error) { + body, err := struct2map(in) + if err != nil { + return nil, err + } + var resp OpenRTPServerResponse + if err := e.post(openRtpServer, body, &resp); err != nil { + return nil, err + } + if err := e.ErrHandle(resp.Code, "rtp err"); err != nil { + return nil, err + } + return &resp, nil +} + +type CloseRTPServerRequest struct { + StreamID string `json:"stream_id"` // 调用 openRtpServer 接口时提供的流 ID +} + +type CloseRTPServerResponse struct { + Code int `json:"code"` + Hit int `json:"hit"` // 是否找到记录并关闭 +} + +// CloseRTPServer 关闭 GB28181 RTP 接收端口 +// https://docs.zlmediakit.com/zh/guide/media_server/restful_api.html#_25%E3%80%81-index-api-closertpserver +func (e *Engine) CloseRTPServer(in CloseRTPServerRequest) (*CloseRTPServerResponse, error) { + body, err := struct2map(in) + if err != nil { + return nil, err + } + var resp CloseRTPServerResponse + if err := e.post(closeRtpServer, body, &resp); err != nil { + return nil, err + } + if err := e.ErrHandle(resp.Code, "rtp close err"); err != nil { + return nil, err + } + return &resp, nil +} diff --git a/pkg/zlm/snap.go b/pkg/zlm/snap.go new file mode 100644 index 0000000..91d2432 --- /dev/null +++ b/pkg/zlm/snap.go @@ -0,0 +1,21 @@ +package zlm + +const ( + getSnapshot = `/index/api/getSnap` +) + +type GetSnapRequest struct { + URL string + TimeoutSec int // 截图失败超时时间 + ExpireSec int // 截图过期时间 +} + +// GetSnap 获取截图或生成实时截图并返回 +// https://docs.zlmediakit.com/zh/guide/media_server/restful_api.html#_23%E3%80%81-index-api-getsnap +func (e *Engine) GetSnap(in GetSnapRequest) ([]byte, error) { + body, err := struct2map(in) + if err != nil { + return nil, err + } + return e.post2(getSnapshot, body) +} diff --git a/pkg/zlm/zlm.go b/pkg/zlm/zlm.go index ede0903..3608ecc 100644 --- a/pkg/zlm/zlm.go +++ b/pkg/zlm/zlm.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "maps" "net/http" "time" @@ -58,12 +59,27 @@ func (e *Engine) post(path string, data map[string]any, out any) error { return err } defer resp.Body.Close() - // b, _ := io.ReadAll(resp.Body) // fmt.Println(string(b)) return json.NewDecoder(resp.Body).Decode(out) } +// post2 直接读取全部响应返回 +func (e *Engine) post2(path string, data map[string]any) ([]byte, error) { + bodyMap := map[string]any{ + "secret": e.cfg.Secret, + } + maps.Copy(bodyMap, data) + body, _ := json.Marshal(bodyMap) + + resp, err := e.cli.Post(e.cfg.URL+path, "application/json", bytes.NewReader(body)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return io.ReadAll(resp.Body) +} + func (e *Engine) ErrHandle(code int, msg string) error { switch code { case Success: