gb28181 register

This commit is contained in:
xugo
2025-01-31 19:55:33 +08:00
parent e55099c34e
commit f33eb37681
44 changed files with 1267 additions and 454 deletions

View File

@@ -70,6 +70,8 @@ Java WVP @648540858 [wvp-GB28181-pro](https://github.com/648540858/wvp-GB28181-p
[如何使用 OBS RTMP 推流到 GB/T28181平台](https://juejin.cn/post/7463350947100786739) [如何使用 OBS RTMP 推流到 GB/T28181平台](https://juejin.cn/post/7463350947100786739)
[GB28181 七种注册姿势](https://juejin.cn/post/7465274924899532838)
码字中... 码字中...
@@ -110,28 +112,27 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit
## 功能特性 ## 功能特性
- [x] 集成 web 界面 - [x] 集成 web 界面
- [x] 兼容性良好 - [x] 兼容性良好
- [x] 接入设备 - [x] 接入设备
- [ ] 视频预览 - [x] 视频预览
- [ ] 支持主码流子码流切换 - [ ] 支持主码流子码流切换
- [ ] 无限制接入路数,能接入多少设备只取决于你的服务器性能 - [x] 无限制接入路数,能接入多少设备只取决于你的服务器性能
- [ ] 云台控制,控制设备转向,拉近,拉远 - [ ] 云台控制,控制设备转向,拉近,拉远
- [ ] 预置位查询,使用与设置 - [ ] 预置位查询,使用与设置
- [ ] 查询 NVR/IPC 上的录像与播放,支持指定时间播放与下载 - [ ] 查询 NVR/IPC 上的录像与播放,支持指定时间播放与下载
- [ ] 无人观看自动断流,节省流量 - [ ] 无人观看自动断流,节省流量
- [ ] 视频设备信息同步 - [x] 视频设备信息同步
- [ ] 离在线监控 - [ ] 离在线监控
- [ ] 支持直接输出RTSP、RTMP、HTTP-FLV、Websocket-FLV、HLS多种协议流地址 - [x] 支持直接输出RTSP、RTMP、HTTP-FLV、Websocket-FLV、HLS多种协议流地址
- [ ] 支持通过一个流地址直接观看摄像头,无需登录以及调用任何接口 - [ ] 支持通过一个流地址直接观看摄像头,无需登录以及调用任何接口
- [ ] 支持 UDP 和 TCP 两种国标信令传输模式 - [x] 支持 UDP 和 TCP 两种国标信令传输模式
- [ ] 支持 UDP 和 TCP 被动,TCP 主动 三种国标流传输模式 - [ ] 支持 UDP 和 TCP 被动,TCP 主动 三种国标流传输模式
- [ ] 支持检索,通道筛选 - [x] 支持检索,通道筛选
- [ ] 支持通道子目录查询 - [ ] 支持通道子目录查询
- [ ] 支持过滤音频,防止杂音影响观看 - [ ] 支持过滤音频,防止杂音影响观看
- [ ] 支持国标网络校时 - [x] 支持国标网络校时
- [x] 支持播放 H264 和 H265 - [x] 支持播放 H264 和 H265
- [ ] 报警信息处理,支持向前端推送报警信息 - [ ] 报警信息处理,支持向前端推送报警信息
- [ ] 语音对讲 - [ ] 语音对讲
@@ -144,7 +145,7 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit
- [ ] 设备目录订阅 - [ ] 设备目录订阅
- [ ] 设备目录通知处理 - [ ] 设备目录通知处理
- [ ] 移动位置查询和显示 - [ ] 移动位置查询和显示
- [ ] 支持手动添加设备和给设备设置单独的密码 - [x] 支持手动添加设备和给设备设置单独的密码
- [ ] 支持平台对接接入 - [ ] 支持平台对接接入
- [ ] 支持国标级联 - [ ] 支持国标级联
- [ ] 国标通道向上级联 - [ ] 国标通道向上级联
@@ -167,12 +168,12 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit
- [ ] 支持同时级联到多个上级平台 - [ ] 支持同时级联到多个上级平台
- [ ] 支持自动配置ZLM媒体服务, 减少因配置问题所出现的问题 - [ ] 支持自动配置ZLM媒体服务, 减少因配置问题所出现的问题
- [ ] 多流媒体节点,自动选择负载最低的节点使用 - [ ] 多流媒体节点,自动选择负载最低的节点使用
- [ ] 支持启用udp多端口模式, 提高udp模式下媒体传输性能 - [x] 支持启用 udp 多端口模式, 提高 udp 模式下媒体传输性能
- [x] 支持局域网/互联网/特殊网络环境部署 - [x] 支持局域网/互联网/特殊网络环境部署
- [x] 支持 gowvp 与 zlm 分开部署,提升平台并发能力 - [x] 支持 gowvp 与 zlm 分开部署,提升平台并发能力
- [ ] 支持拉流RTSP/RTMP分发为各种流格式或者推送到其他国标平台 - [ ] 支持拉流RTSP/RTMP分发为各种流格式或者推送到其他国标平台
- [ ] 支持推流RTSP/RTMP分发为各种流格式或者推送到其他国标平台 - [ ] 支持推流RTSP/RTMP分发为各种流格式或者推送到其他国标平台
- [ ] 支持推流鉴权 - [x] 支持推流鉴权
- [x] 支持接口鉴权 - [x] 支持接口鉴权
- [ ] 云端录像,推流/代理/国标视频均可以录制在云端服务器,支持预览和下载 - [ ] 云端录像,推流/代理/国标视频均可以录制在云端服务器,支持预览和下载
- [ ] 支持跨域请求,支持前后端分离部署 - [ ] 支持跨域请求,支持前后端分离部署

View File

@@ -31,7 +31,8 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error)
webHookAPI := api.NewWebHookAPI(smsCore, mediaCore, bc) webHookAPI := api.NewWebHookAPI(smsCore, mediaCore, bc)
mediaAPI := api.NewMediaAPI(mediaCore, smsCore, bc) mediaAPI := api.NewMediaAPI(mediaCore, smsCore, bc)
gb28181API := api.NewGb28181API(db, uniqueidCore) gb28181API := api.NewGb28181API(db, uniqueidCore)
server := gbs.NewServer() gb28181 := api.NewGB28181(db, uniqueidCore)
server, cleanup := gbs.NewServer(bc, gb28181)
usecase := &api.Usecase{ usecase := &api.Usecase{
Conf: bc, Conf: bc,
DB: db, DB: db,
@@ -45,5 +46,6 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error)
} }
handler := api.NewHTTPHandler(usecase) handler := api.NewHTTPHandler(usecase)
return handler, func() { return handler, func() {
cleanup()
}, nil }, nil
} }

View File

@@ -18,11 +18,17 @@ version = 1
ConnMaxLifetime = '6h0m0s' ConnMaxLifetime = '6h0m0s'
SlowThreshold = '200ms' SlowThreshold = '200ms'
[Sip]
Port = 15062
ID = "3402000000200000001"
Domain = "3402000000"
Password = "12345678"
[Media] [Media]
IP = "127.0.0.1" IP = "127.0.0.1"
HTTPPort = 8080 HTTPPort = 8080
Secret = "s1kPE7bzqKeHUaVcp8dCA0jeB8yxyFq4" Secret = "s1kPE7bzqKeHUaVcp8dCA0jeB8yxyFq4"
WebHookIP = "192.168.10.28" WebHookIP = "host.docker.internal"
RTPPortRange = "20000,20500" RTPPortRange = "20000,20500"
[Log] [Log]

2
go.mod
View File

@@ -61,7 +61,7 @@ require (
golang.org/x/arch v0.13.0 // indirect golang.org/x/arch v0.13.0 // indirect
golang.org/x/crypto v0.32.0 // indirect golang.org/x/crypto v0.32.0 // indirect
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
golang.org/x/net v0.34.0 golang.org/x/net v0.34.0 // indirect
golang.org/x/sync v0.10.0 // indirect golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 golang.org/x/text v0.21.0

1
go.sum
View File

@@ -47,7 +47,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo=
github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

View File

@@ -62,7 +62,7 @@ type SIP struct {
Port int `comment:"服务监听的 tcp/udp 端口号"` Port int `comment:"服务监听的 tcp/udp 端口号"`
ID string `comment:"gb/t28181 20 位国标 ID"` ID string `comment:"gb/t28181 20 位国标 ID"`
Domain string Domain string
Password int `comment:"注册密码"` Password string `comment:"注册密码"`
} }
type Media struct { type Media struct {

View File

@@ -1,6 +1,8 @@
package bz package bz
const ( const (
IDPrefixGB = "gb" IDPrefixGB = "gb" // 国标设备
IDPrefixRTMP = "r" // rtmp ID 前缀,取 rtmp 首字母 IDPrefixGBChannel = "ch" // 国标通道 id 前缀
IDPrefixRTMP = "m" // rtmp ID 前缀,取 rtmp 中的 m不好记但是清晰
IDPrefixRTSP = "s" // rtsp ID 前缀,取 rtsp 中的 s不好记但是清晰
) )

View File

@@ -4,7 +4,9 @@ package gb28181
import ( import (
"context" "context"
"log/slog" "log/slog"
"strings"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goweb/pkg/orm" "github.com/ixugo/goweb/pkg/orm"
"github.com/ixugo/goweb/pkg/web" "github.com/ixugo/goweb/pkg/web"
"github.com/jinzhu/copier" "github.com/jinzhu/copier"
@@ -22,7 +24,21 @@ type ChannelStorer interface {
// FindChannel Paginated search // FindChannel Paginated search
func (c *Core) FindChannel(ctx context.Context, in *FindChannelInput) ([]*Channel, int64, error) { func (c *Core) FindChannel(ctx context.Context, in *FindChannelInput) ([]*Channel, int64, error) {
items := make([]*Channel, 0) items := make([]*Channel, 0)
total, err := c.store.Channel().Find(ctx, &items, in)
query := orm.NewQuery(1)
query.OrderBy("channel_id ASC")
if in.DeviceID != "" {
query.Where("device_id = ?", in.DeviceID)
}
if in.Key != "" {
if strings.HasPrefix(in.Key, bz.IDPrefixGBChannel) {
query.Where("id=?", in.Key)
} else {
query.Where("channel_id like ? OR name like ?", "%"+in.Key+"%", "%"+in.Key+"%")
}
}
total, err := c.store.Channel().Find(ctx, &items, in, query.Encode()...)
if err != nil { if err != nil {
return nil, 0, web.ErrDB.Withf(`Find err[%s]`, err.Error()) return nil, 0, web.ErrDB.Withf(`Find err[%s]`, err.Error())
} }

View File

@@ -3,12 +3,13 @@ package gb28181
// Channel domain model // Channel domain model
type Channel struct { type Channel struct {
ID string `gorm:"primaryKey" json:"id"` ID string `gorm:"primaryKey" json:"id"`
DeviceID string `gorm:"column:device_id;notNull;default:'';comment:国标编码" json:"device_id"` // 国标编码 DeviceID string `gorm:"column:device_id;index;notNull;default:'';comment:国标编码" json:"device_id"` // 国标编码
Name string `gorm:"column:name;notNull;default:'';comment:通道名称" json:"name"` // 通道名称 ChannelID string `gorm:"column:channel_id;index;notNull;default:'';comment:国标编码" json:"channel_id"` // 国标编码
PTZType int `gorm:"column:ptztype;notNull;default:0;comment:云台类型" json:"ptztype"` // 云台类型 Name string `gorm:"column:name;notNull;default:'';comment:通道名称" json:"name"` // 通道名称
IsOnline bool `gorm:"column:is_online;notNull;default:FALSE;comment:是否在线" json:"is_online"` // 是否在线 PTZType int `gorm:"column:ptztype;notNull;default:0;comment:云台类型" json:"ptztype"` // 云台类型
Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb" json:"ext"` IsOnline bool `gorm:"column:is_online;notNull;default:FALSE;comment:是否在线" json:"is_online"` // 是否在线
Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb" json:"ext"`
} }
// TableName database table name // TableName database table name

View File

@@ -5,11 +5,11 @@ import "github.com/ixugo/goweb/pkg/web"
type FindChannelInput struct { type FindChannelInput struct {
web.PagerFilter web.PagerFilter
DeviceID string `form:"device_id"` // 国标编码 DeviceID string `form:"device_id"` // 国标编码
Name string `form:"name"` // 通道名称 Key string `form:"key"` // 名称/国标编码 模糊搜索id 精确搜索
PTZType int `form:"ptztype"` // 云台类型 // Name string `form:"name"` // 通道名称
IsOnline bool `form:"is_online"` // 是否在线 // PTZType int `form:"ptztype"` // 云台类型
Ext DeviceExt `form:"ext"` IsOnline bool `form:"is_online"` // 是否在线
} }
type EditChannelInput struct { type EditChannelInput struct {

View File

@@ -25,6 +25,7 @@ type Device struct {
CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间
Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"` Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"`
Address string `gorm:"column:address;notNull;default:'';comment:设备网络地址" json:"address"`
Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性 Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性
} }
@@ -39,3 +40,8 @@ func (d Device) Check() error {
} }
return nil return nil
} }
func (d *Device) init(id, deviceID string) {
d.ID = id
d.DeviceID = deviceID
}

View File

@@ -0,0 +1,92 @@
package gb28181
import (
"context"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/gowvp/gb28181/internal/core/uniqueid"
"github.com/ixugo/goweb/pkg/orm"
)
type GB28181 struct {
deviceStore DeviceStorer
channelStore ChannelStorer
uni uniqueid.Core
}
func NewGB28181(ds DeviceStorer, cs ChannelStorer, uni uniqueid.Core) GB28181 {
return GB28181{
deviceStore: ds,
channelStore: cs,
uni: uni,
}
}
func (g GB28181) GetDeviceByDeviceID(deviceID string) (*Device, error) {
ctx := context.TODO()
var d Device
if err := g.deviceStore.Get(ctx, &d, orm.Where("device_id=?", deviceID)); err != nil {
if !orm.IsErrRecordNotFound(err) {
return nil, err
}
d.init(g.uni.UniqueID(bz.IDPrefixGB), deviceID)
if err := g.deviceStore.Add(ctx, &d); err != nil {
return nil, err
}
}
return &d, nil
}
func (g GB28181) Logout(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.deviceStore.Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
}, orm.Where("device_id=?", deviceID)); err != nil {
return err
}
return nil
}
func (g GB28181) Login(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.deviceStore.Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
}, orm.Where("device_id=?", deviceID)); err != nil {
return err
}
return nil
}
func (g GB28181) Edit(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.deviceStore.Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
}, orm.Where("device_id=?", deviceID)); err != nil {
return err
}
return nil
}
func (g GB28181) SaveChannels(channels []*Channel) error {
if len(channels) <= 0 {
return nil
}
var dev Device
g.deviceStore.Edit(context.TODO(), &dev, func(d *Device) {
d.Channels = len(channels)
}, orm.Where("device_id=?", channels[0].DeviceID))
for _, channel := range channels {
var ch Channel
if err := g.channelStore.Edit(context.TODO(), &ch, func(c *Channel) {
c.IsOnline = channel.IsOnline
}, orm.Where("device_id = ? AND channel_id = ?", channel.DeviceID, channel.ChannelID)); err != nil {
channel.ID = g.uni.UniqueID(bz.IDPrefixGBChannel)
g.channelStore.Add(context.TODO(), channel)
}
}
return nil
}

View File

@@ -1,16 +1,26 @@
// Code generated by gowebx, DO AVOID EDIT. // Code generated by gowebx, DO AVOID EDIT.
package gb28181 package gb28181
import "github.com/ixugo/goweb/pkg/orm" import (
"database/sql/driver"
"encoding/json"
"github.com/ixugo/goweb/pkg/orm"
)
// DeviceExt domain model // DeviceExt domain model
type DeviceExt struct { type DeviceExt struct {
Manufacturer string `json:"manufacturer"` // 生产厂商 Manufacturer string `json:"manufacturer"` // 生产厂商
Model string `json:"model"` // 型号 Model string `json:"model"` // 型号
Firmware string `json:"firmware"` // 固件版本 Firmware string `json:"firmware"` // 固件版本
Name string `json:"name"` // 设备名
} }
// Scan implements orm.Scaner. // Scan implements orm.Scaner.
func (i *DeviceExt) Scan(input interface{}) error { func (i *DeviceExt) Scan(input interface{}) error {
return orm.JsonUnmarshal(input, i) return orm.JsonUnmarshal(input, i)
} }
func (i DeviceExt) Value() (driver.Value, error) {
return json.Marshal(i)
}

View File

@@ -2,7 +2,6 @@ package api
import ( import (
"expvar" "expvar"
"fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"path/filepath" "path/filepath"
@@ -14,8 +13,6 @@ import (
"github.com/gin-contrib/gzip" "github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/gowvp/gb28181/internal/core/media"
"github.com/gowvp/gb28181/plugin/stat" "github.com/gowvp/gb28181/plugin/stat"
"github.com/gowvp/gb28181/plugin/stat/statapi" "github.com/gowvp/gb28181/plugin/stat/statapi"
"github.com/ixugo/goweb/pkg/system" "github.com/ixugo/goweb/pkg/system"
@@ -69,67 +66,9 @@ func setupRouter(r *gin.Engine, uc *Usecase) {
registerZLMWebhookAPI(r, uc.WebHookAPI) registerZLMWebhookAPI(r, uc.WebHookAPI)
// TODO: 待增加鉴权 // TODO: 待增加鉴权
registerMediaAPI(r, uc.MediaAPI) registerMediaAPI(r, uc.MediaAPI)
registerGb28181(r, uc.GB28181API)
// TODO: 临时播放接口,待重构 uc.GB28181API.uc = uc
r.POST("/channels/:id/play", web.WarpH(func(c *gin.Context, _ *struct{}) (*playOutput, error) { registerGB28181(r, uc.GB28181API)
channelID := c.Param("id")
// TODO: 目前仅开发到 rtsp待扩展 rtsp/gb 等
if !strings.HasPrefix(channelID, bz.IDPrefixRTMP) {
return nil, web.ErrNotFound.Msg("不支持的播放通道")
}
push, err := uc.MediaAPI.mediaCore.GetStreamPush(c.Request.Context(), channelID)
if err != nil {
return nil, err
}
if push.Status != media.StatusPushing {
return nil, web.ErrNotFound.Msg("未推流")
}
svr, err := uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), push.MediaServerID)
if err != nil {
return nil, err
}
stream := push.App + "/" + push.Stream
host := c.Request.Host
if l := strings.Split(c.Request.Host, ":"); len(l) == 2 {
host = l[0]
}
var session string
if !push.IsAuthDisabled && push.Session != "" {
session = "session=" + push.Session
}
// 播放规则
// https://github.com/zlmediakit/ZLMediaKit/wiki/%E6%92%AD%E6%94%BEurl%E8%A7%84%E5%88%99
return &playOutput{
App: push.App,
Stream: push.Stream,
Items: []streamAddrItem{
{
Label: "默认线路",
WSFLV: fmt.Sprintf("ws://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session,
HTTPFLV: fmt.Sprintf("http://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session,
RTMP: fmt.Sprintf("rtmp://%s:%d/%s", host, svr.Ports.RTMP, stream) + "?" + session,
RTSP: fmt.Sprintf("rtsp://%s:%d/%s", host, svr.Ports.RTSP, stream) + "?" + session,
WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTP, push.App, push.Stream) + "&" + session,
HLS: fmt.Sprintf("http://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTP, stream) + "?" + session,
},
{
Label: "SSL 线路",
WSFLV: fmt.Sprintf("wss://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session,
HTTPFLV: fmt.Sprintf("https://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session,
RTMP: fmt.Sprintf("rtmps://%s:%d/%s", host, svr.Ports.RTMPs, stream) + session,
RTSP: fmt.Sprintf("rtsps://%s:%d/%s", host, svr.Ports.RTSPs, stream) + session,
WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTPS, push.App, push.Stream) + "&" + session,
HLS: fmt.Sprintf("https://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTPS, stream) + "?" + session,
},
},
}, nil
}))
} }
type playOutput struct { type playOutput struct {

View File

@@ -2,26 +2,31 @@
package api package api
import ( import (
"fmt"
"strconv" "strconv"
"strings"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/gowvp/gb28181/internal/core/gb28181" "github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181db" "github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181db"
"github.com/gowvp/gb28181/internal/core/media"
"github.com/gowvp/gb28181/internal/core/uniqueid" "github.com/gowvp/gb28181/internal/core/uniqueid"
"github.com/ixugo/goweb/pkg/web" "github.com/ixugo/goweb/pkg/web"
"gorm.io/gorm" "gorm.io/gorm"
) )
type Gb28181API struct { type GB28181API struct {
gb28181Core gb28181.Core gb28181Core gb28181.Core
uc *Usecase
} }
func NewGb28181API(db *gorm.DB, uni uniqueid.Core) Gb28181API { func NewGb28181API(db *gorm.DB, uni uniqueid.Core) GB28181API {
core := gb28181.NewCore(gb28181db.NewDB(db).AutoMigrate(true), uni) core := gb28181.NewCore(gb28181db.NewDB(db).AutoMigrate(true), uni)
return Gb28181API{gb28181Core: core} return GB28181API{gb28181Core: core}
} }
func registerGb28181(g gin.IRouter, api Gb28181API, handler ...gin.HandlerFunc) { func registerGB28181(g gin.IRouter, api GB28181API, handler ...gin.HandlerFunc) {
{ {
group := g.Group("/devices", handler...) group := g.Group("/devices", handler...)
group.GET("", web.WarpH(api.findDevice)) group.GET("", web.WarpH(api.findDevice))
@@ -34,61 +39,129 @@ func registerGb28181(g gin.IRouter, api Gb28181API, handler ...gin.HandlerFunc)
{ {
group := g.Group("/channels", handler...) group := g.Group("/channels", handler...)
group.GET("", web.WarpH(api.findChannel)) group.GET("", web.WarpH(api.findChannel))
group.GET("/:id", web.WarpH(api.getChannel))
group.PUT("/:id", web.WarpH(api.editChannel)) group.PUT("/:id", web.WarpH(api.editChannel))
group.POST("", web.WarpH(api.addChannel)) group.POST("/:id/play", web.WarpH(api.play))
group.DELETE("/:id", web.WarpH(api.delChannel)) // group.GET("/:id", web.WarpH(api.getChannel))
// group.POST("", web.WarpH(api.addChannel))
// group.DELETE("/:id", web.WarpH(api.delChannel))
} }
} }
// >>> device >>>>>>>>>>>>>>>>>>>> // >>> device >>>>>>>>>>>>>>>>>>>>
func (a Gb28181API) findDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) { func (a GB28181API) findDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) {
items, total, err := a.gb28181Core.FindDevice(c.Request.Context(), in) items, total, err := a.gb28181Core.FindDevice(c.Request.Context(), in)
return gin.H{"items": items, "total": total}, err return gin.H{"items": items, "total": total}, err
} }
func (a Gb28181API) getDevice(c *gin.Context, _ *struct{}) (any, error) { func (a GB28181API) getDevice(c *gin.Context, _ *struct{}) (any, error) {
deviceID, _ := strconv.Atoi(c.Param("id")) deviceID, _ := strconv.Atoi(c.Param("id"))
return a.gb28181Core.GetDevice(c.Request.Context(), deviceID) return a.gb28181Core.GetDevice(c.Request.Context(), deviceID)
} }
func (a Gb28181API) editDevice(c *gin.Context, in *gb28181.EditDeviceInput) (any, error) { func (a GB28181API) editDevice(c *gin.Context, in *gb28181.EditDeviceInput) (any, error) {
deviceID := c.Param("id") deviceID := c.Param("id")
return a.gb28181Core.EditDevice(c.Request.Context(), in, deviceID) return a.gb28181Core.EditDevice(c.Request.Context(), in, deviceID)
} }
func (a Gb28181API) addDevice(c *gin.Context, in *gb28181.AddDeviceInput) (any, error) { func (a GB28181API) addDevice(c *gin.Context, in *gb28181.AddDeviceInput) (any, error) {
return a.gb28181Core.AddDevice(c.Request.Context(), in) return a.gb28181Core.AddDevice(c.Request.Context(), in)
} }
func (a Gb28181API) delDevice(c *gin.Context, _ *struct{}) (any, error) { func (a GB28181API) delDevice(c *gin.Context, _ *struct{}) (any, error) {
deviceID := c.Param("id") deviceID := c.Param("id")
return a.gb28181Core.DelDevice(c.Request.Context(), deviceID) return a.gb28181Core.DelDevice(c.Request.Context(), deviceID)
} }
// >>> channel >>>>>>>>>>>>>>>>>>>> // >>> channel >>>>>>>>>>>>>>>>>>>>
func (a Gb28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (any, error) { func (a GB28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (any, error) {
items, total, err := a.gb28181Core.FindChannel(c.Request.Context(), in) items, total, err := a.gb28181Core.FindChannel(c.Request.Context(), in)
return gin.H{"items": items, "total": total}, err return gin.H{"items": items, "total": total}, err
} }
func (a Gb28181API) getChannel(c *gin.Context, _ *struct{}) (any, error) { func (a GB28181API) getChannel(c *gin.Context, _ *struct{}) (any, error) {
channelID, _ := strconv.Atoi(c.Param("id")) channelID, _ := strconv.Atoi(c.Param("id"))
return a.gb28181Core.GetChannel(c.Request.Context(), channelID) return a.gb28181Core.GetChannel(c.Request.Context(), channelID)
} }
func (a Gb28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (any, error) { func (a GB28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (any, error) {
channelID, _ := strconv.Atoi(c.Param("id")) channelID, _ := strconv.Atoi(c.Param("id"))
return a.gb28181Core.EditChannel(c.Request.Context(), in, channelID) return a.gb28181Core.EditChannel(c.Request.Context(), in, channelID)
} }
func (a Gb28181API) addChannel(c *gin.Context, in *gb28181.AddChannelInput) (any, error) { func (a GB28181API) addChannel(c *gin.Context, in *gb28181.AddChannelInput) (any, error) {
return a.gb28181Core.AddChannel(c.Request.Context(), in) return a.gb28181Core.AddChannel(c.Request.Context(), in)
} }
func (a Gb28181API) delChannel(c *gin.Context, _ *struct{}) (any, error) { func (a GB28181API) delChannel(c *gin.Context, _ *struct{}) (any, error) {
channelID, _ := strconv.Atoi(c.Param("id")) channelID, _ := strconv.Atoi(c.Param("id"))
return a.gb28181Core.DelChannel(c.Request.Context(), channelID) return a.gb28181Core.DelChannel(c.Request.Context(), channelID)
} }
func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
channelID := c.Param("id")
// TODO: 目前仅开发到 rtmp/gb28181待扩展 rtsp... 等
if !strings.HasPrefix(channelID, bz.IDPrefixRTMP) || !strings.HasPrefix(channelID, bz.IDPrefixGBChannel) {
return nil, web.ErrNotFound.Msg("不支持的播放通道")
}
// 国标逻辑
if strings.HasPrefix(channelID, bz.IDPrefixGBChannel) {
// a.uc.SipServer.
}
push, err := a.uc.MediaAPI.mediaCore.GetStreamPush(c.Request.Context(), channelID)
if err != nil {
return nil, err
}
if push.Status != media.StatusPushing {
return nil, web.ErrNotFound.Msg("未推流")
}
svr, err := a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), push.MediaServerID)
if err != nil {
return nil, err
}
stream := push.App + "/" + push.Stream
host := c.Request.Host
if l := strings.Split(c.Request.Host, ":"); len(l) == 2 {
host = l[0]
}
var session string
if !push.IsAuthDisabled && push.Session != "" {
session = "session=" + push.Session
}
// 播放规则
// https://github.com/zlmediakit/ZLMediaKit/wiki/%E6%92%AD%E6%94%BEurl%E8%A7%84%E5%88%99
return &playOutput{
App: push.App,
Stream: push.Stream,
Items: []streamAddrItem{
{
Label: "默认线路",
WSFLV: fmt.Sprintf("ws://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session,
HTTPFLV: fmt.Sprintf("http://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + "?" + session,
RTMP: fmt.Sprintf("rtmp://%s:%d/%s", host, svr.Ports.RTMP, stream) + "?" + session,
RTSP: fmt.Sprintf("rtsp://%s:%d/%s", host, svr.Ports.RTSP, stream) + "?" + session,
WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTP, push.App, push.Stream) + "&" + session,
HLS: fmt.Sprintf("http://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTP, stream) + "?" + session,
},
{
Label: "SSL 线路",
WSFLV: fmt.Sprintf("wss://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session,
HTTPFLV: fmt.Sprintf("https://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session,
RTMP: fmt.Sprintf("rtmps://%s:%d/%s", host, svr.Ports.RTMPs, stream) + session,
RTSP: fmt.Sprintf("rtsps://%s:%d/%s", host, svr.Ports.RTSPs, stream) + session,
WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTPS, push.App, push.Stream) + "&" + session,
HLS: fmt.Sprintf("https://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTPS, stream) + "?" + session,
},
},
}, nil
}
func (uc *Usecase) play(channelID string) {
}

View File

@@ -7,6 +7,8 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/google/wire" "github.com/google/wire"
"github.com/gowvp/gb28181/internal/conf" "github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181db"
"github.com/gowvp/gb28181/internal/core/media" "github.com/gowvp/gb28181/internal/core/media"
"github.com/gowvp/gb28181/internal/core/media/store/mediadb" "github.com/gowvp/gb28181/internal/core/media/store/mediadb"
"github.com/gowvp/gb28181/internal/core/uniqueid" "github.com/gowvp/gb28181/internal/core/uniqueid"
@@ -31,6 +33,7 @@ var (
NewMediaCore, NewMediaAPI, NewMediaCore, NewMediaAPI,
gbs.NewServer, gbs.NewServer,
NewGb28181API, NewGb28181API,
NewGB28181,
) )
) )
@@ -42,7 +45,7 @@ type Usecase struct {
WebHookAPI WebHookAPI WebHookAPI WebHookAPI
UniqueID uniqueid.Core UniqueID uniqueid.Core
MediaAPI MediaAPI MediaAPI MediaAPI
GB28181API Gb28181API GB28181API GB28181API
SipServer *gbs.Server SipServer *gbs.Server
} }
@@ -97,3 +100,11 @@ func NewUniqueID(db *gorm.DB) uniqueid.Core {
func NewMediaCore(db *gorm.DB, uni uniqueid.Core) media.Core { func NewMediaCore(db *gorm.DB, uni uniqueid.Core) media.Core {
return media.NewCore(mediadb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate), uni) return media.NewCore(mediadb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate), uni)
} }
func NewGB28181(db *gorm.DB, uni uniqueid.Core) gb28181.GB28181 {
return gb28181.NewGB28181(
gb28181db.NewDevice(db),
gb28181db.NewChannel(db),
uni,
)
}

75
pkg/gbs/catalog.go Normal file
View File

@@ -0,0 +1,75 @@
package gbs
import (
"encoding/xml"
"log/slog"
"github.com/gowvp/gb28181/pkg/gbs/sip"
)
// MessageDeviceListResponse 设备明细列表返回结构
type MessageDeviceListResponse struct {
XMLName xml.Name `xml:"Response"`
CmdType string `xml:"CmdType"`
SN int `xml:"SN"`
DeviceID string `xml:"DeviceID"`
SumNum int `xml:"SumNum"`
Item []Channels `xml:"DeviceList>Item"`
}
func (g GB28181API) sipMessageCatalog(ctx *sip.Context) {
var msg MessageDeviceListResponse
if err := sip.XMLDecode(ctx.Request.Body(), &msg); err != nil {
slog.Error("Message Unmarshal xml", "err", err)
ctx.String(400, "xml err")
return
}
if msg.SumNum < 0 {
ctx.String(200, "OK")
return
}
for _, d := range msg.Item {
d.DeviceID = msg.DeviceID
g.catalog.Write(&sip.CollectorMsg[Channels]{
Key: d.DeviceID,
Data: &d,
Total: msg.SumNum,
})
// channel := Channels{ChannelID: d.ChannelID, DeviceID: message.DeviceID}
// if err := db.Get(db.DBClient, &channel); err == nil {
// channel.Active = time.Now().Unix()
// channel.URIStr = fmt.Sprintf("sip:%s@%s", d.ChannelID, _sysinfo.Region)
// channel.Status = transDeviceStatus(d.Status)
// channel.Name = d.Name
// channel.Manufacturer = d.Manufacturer
// channel.Model = d.Model
// channel.Owner = d.Owner
// channel.CivilCode = d.CivilCode
// // Address ip地址
// channel.Address = d.Address
// channel.Parental = d.Parental
// channel.SafetyWay = d.SafetyWay
// channel.RegisterWay = d.RegisterWay
// channel.Secrecy = d.Secrecy
// db.Save(db.DBClient, &channel)
// go notify(notifyChannelsActive(channel))
// } else {
// // logrus.Infoln("deviceid not found,deviceid:", d.DeviceID, "pdid:", message.DeviceID, "err", err)
// }
}
ctx.String(200, "OK")
}
// QueryCatalog 获取注册设备包含的列表
func (g GB28181API) QueryCatalog(ctx *sip.Context) {
_, err := ctx.SendRequest(sip.MethodMessage, sip.GetCatalogXML(ctx.DeviceID))
if err != nil {
slog.Error("sipCatalog", "err", err)
return
}
g.catalog.Run(ctx.DeviceID)
g.catalog.Wait(ctx.DeviceID)
}

View File

@@ -1,7 +1,6 @@
package gbs package gbs
import ( import (
"encoding/xml"
"net" "net"
"strings" "strings"
@@ -12,7 +11,7 @@ import (
var ( var (
// sip服务用户信息 // sip服务用户信息
_serverDevices Devices _serverDevices Devices
srv *sip.Server svr *sip.Server
) )
// Devices NVR 设备信息 // Devices NVR 设备信息
@@ -192,114 +191,6 @@ func parserDevicesFromReqeust(req *sip.Request) (Devices, bool) {
return u, true return u, true
} }
// 获取设备信息(注册设备)
func sipDeviceInfo(to Devices) {
hb := sip.NewHeaderBuilder().SetTo(to.addr).SetFrom(_serverDevices.addr).AddVia(&sip.ViaHop{
Params: sip.NewParams().Add("branch", sip.String{Str: sip.GenerateBranch()}),
}).SetContentType(&sip.ContentTypeXML).SetMethod(sip.MethodMessage)
req := sip.NewRequest("", sip.MethodMessage, to.addr.URI, sip.DefaultSipVersion, hb.Build(), sip.GetDeviceInfoXML(to.DeviceID))
req.SetDestination(to.source)
tx, err := srv.Request(req)
if err != nil {
// logrus.Warnln("sipDeviceInfo error,", err)
return
}
_, err = sipResponse(tx)
if err != nil {
// logrus.Warnln("sipDeviceInfo response error,", err)
return
}
}
// sipCatalog 获取注册设备包含的列表
func sipCatalog(to Devices) {
hb := sip.NewHeaderBuilder().SetTo(to.addr).SetFrom(_serverDevices.addr).AddVia(&sip.ViaHop{
Params: sip.NewParams().Add("branch", sip.String{Str: sip.GenerateBranch()}),
}).SetContentType(&sip.ContentTypeXML).SetMethod(sip.MethodMessage)
req := sip.NewRequest("", sip.MethodMessage, to.addr.URI, sip.DefaultSipVersion, hb.Build(), sip.GetCatalogXML(to.DeviceID))
req.SetDestination(to.source)
tx, err := srv.Request(req)
if err != nil {
// logrus.Warnln("sipCatalog error,", err)
return
}
_, err = sipResponse(tx)
if err != nil {
// logrus.Warnln("sipCatalog response error,", err)
return
}
}
// MessageDeviceInfoResponse 主设备明细返回结构
type MessageDeviceInfoResponse struct {
CmdType string `xml:"CmdType"`
SN int `xml:"SN"`
DeviceID string `xml:"DeviceID"`
DeviceType string `xml:"DeviceType"`
Manufacturer string `xml:"Manufacturer"`
Model string `xml:"Model"`
Firmware string `xml:"Firmware"`
}
func sipMessageDeviceInfo(u Devices, body []byte) error {
message := &MessageDeviceInfoResponse{}
if err := sip.XMLDecode([]byte(body), message); err != nil {
// logrus.Errorln("sipMessageDeviceInfo Unmarshal xml err:", err, "body:", body)
return err
}
// db.UpdateAll(db.DBClient, new(Devices), db.M{"deviceid=?": u.DeviceID}, Devices{
// Model: message.Model,
// DeviceType: message.DeviceType,
// Firmware: message.Firmware,
// Manufacturer: message.Manufacturer,
// })
return nil
}
// MessageDeviceListResponse 设备明细列表返回结构
type MessageDeviceListResponse struct {
XMLName xml.Name `xml:"Response"`
CmdType string `xml:"CmdType"`
SN int `xml:"SN"`
DeviceID string `xml:"DeviceID"`
SumNum int `xml:"SumNum"`
Item []Channels `xml:"DeviceList>Item"`
}
func sipMessageCatalog(u Devices, body []byte) error {
message := &MessageDeviceListResponse{}
if err := sip.XMLDecode(body, message); err != nil {
// logrus.Errorln("Message Unmarshal xml err:", err, "body:", string(body))
return err
}
if message.SumNum > 0 {
// for _, d := range message.Item {
// channel := Channels{ChannelID: d.ChannelID, DeviceID: message.DeviceID}
// if err := db.Get(db.DBClient, &channel); err == nil {
// channel.Active = time.Now().Unix()
// channel.URIStr = fmt.Sprintf("sip:%s@%s", d.ChannelID, _sysinfo.Region)
// channel.Status = transDeviceStatus(d.Status)
// channel.Name = d.Name
// channel.Manufacturer = d.Manufacturer
// channel.Model = d.Model
// channel.Owner = d.Owner
// channel.CivilCode = d.CivilCode
// // Address ip地址
// channel.Address = d.Address
// channel.Parental = d.Parental
// channel.SafetyWay = d.SafetyWay
// channel.RegisterWay = d.RegisterWay
// channel.Secrecy = d.Secrecy
// db.Save(db.DBClient, &channel)
// go notify(notifyChannelsActive(channel))
// } else {
// // logrus.Infoln("deviceid not found,deviceid:", d.DeviceID, "pdid:", message.DeviceID, "err", err)
// }
// }
}
return nil
}
var deviceStatusMap = map[string]string{ var deviceStatusMap = map[string]string{
"ON": m.DeviceStatusON, "ON": m.DeviceStatusON,
"OK": m.DeviceStatusON, "OK": m.DeviceStatusON,

8
pkg/gbs/error.go Normal file
View File

@@ -0,0 +1,8 @@
package gbs
import "errors"
var (
ErrXMLDecode = errors.New("xml decode error")
ErrDatabase = errors.New("database error")
)

View File

@@ -1,69 +1,103 @@
package gbs package gbs
import ( import (
"net/http"
"github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/gowvp/gb28181/pkg/gbs/sip"
) )
// MessageReceive 接收到的请求数据最外层,主要用来判断数据类型 func (g GB28181API) handlerMessage(ctx *sip.Context) {
type MessageReceive struct { // req := ctx.Request
CmdType string `xml:"CmdType"` // tx := ctx.Tx
SN int `xml:"SN"`
}
func (g GB28181API) handlerMessage(req *sip.Request, tx *sip.Transaction) { // case "Catalog":
u, ok := parserDevicesFromReqeust(req) // // 设备列表
if !ok { // sipMessageCatalog(u, body)
// 未解析出来源用户返回错误 // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) // return
return // case "Keepalive":
} // // heardbeat
// 判断是否存在body数据 // if err := sipMessageKeepalive(u, body); err == nil {
if len, have := req.ContentLength(); !have || len.Equals(0) { // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
// 不存在就直接返回的成功 // // 心跳后同步注册设备列表信息
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) // // sipCatalog(u)
return // return
} // }
body := req.Body() // case "RecordInfo":
message := &MessageReceive{} // // 设备音视频文件列表
// sipMessageRecordInfo(u, body)
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
// case "DeviceInfo":
// // 主设备信息
// sipMessageDeviceInfo(u, body)
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
// return
// }
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil))
// }
if err := sip.XMLDecode(body, message); err != nil { // func (g GB28181API) handlerMessage2(ctx *sip.Context) {
// logrus.Warnln("Message Unmarshal xml err:", err, "body:", string(body)) // req := ctx.Request
// 有些body xml发送过来的不带encoding 而且格式不是utf8的导致xml解析失败此处使用gbk转utf8后再次尝试xml解析 // tx := ctx.Tx
body, err = sip.GbkToUtf8(body)
if err != nil { // u, ok := parserDevicesFromReqeust(req)
// logrus.Errorln("message gbk to utf8 err", err) // if !ok {
} // // 未解析出来源用户返回错误
if err := sip.XMLDecode(body, message); err != nil { // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil))
// logrus.Errorln("Message Unmarshal xml after gbktoutf8 err:", err, "body:", string(body)) // return
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) // }
return // // 判断是否存在body数据
} // if len, have := req.ContentLength(); !have || len.Equals(0) {
} // // 不存在就直接返回的成功
switch message.CmdType { // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
case "Catalog": // return
// 设备列表 // }
sipMessageCatalog(u, body) // body := req.Body()
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) // message := &MessageReceive{}
return
case "Keepalive": // if err := sip.XMLDecode(body, message); err != nil {
// heardbeat // // logrus.Warnln("Message Unmarshal xml err:", err, "body:", string(body))
if err := sipMessageKeepalive(u, body); err == nil { // // 有些body xml发送过来的不带encoding 而且格式不是utf8的导致xml解析失败此处使用gbk转utf8后再次尝试xml解析
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) // body, err = sip.GbkToUtf8(body)
// 心跳后同步注册设备列表信息 // if err != nil {
sipCatalog(u) // // logrus.Errorln("message gbk to utf8 err", err)
return // }
} // if err := sip.XMLDecode(body, message); err != nil {
case "RecordInfo": // // logrus.Errorln("Message Unmarshal xml after gbktoutf8 err:", err, "body:", string(body))
// 设备音视频文件列表 // tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil))
sipMessageRecordInfo(u, body) // return
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) // }
case "DeviceInfo": // }
// 主设备信息 //
sipMessageDeviceInfo(u, body) // switch message.CmdType {
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil)) // case "Catalog":
return //
} // // 设备列表
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil)) // sipMessageCatalog(u, body)
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
// return
//
// case "Keepalive":
//
// // heardbeat
// if err := sipMessageKeepalive(u, body); err == nil {
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
// // 心跳后同步注册设备列表信息
// sipCatalog(u)
// return
// }
//
// case "RecordInfo":
//
// // 设备音视频文件列表
// sipMessageRecordInfo(u, body)
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
//
// case "DeviceInfo":
//
// // 主设备信息
// sipMessageDeviceInfo(u, body)
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
// return
// }
//
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusBadRequest, http.StatusText(http.StatusBadRequest), nil))
} }

60
pkg/gbs/info.go Normal file
View File

@@ -0,0 +1,60 @@
package gbs
import (
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/pkg/gbs/sip"
)
// 获取设备信息(注册设备)
func (g GB28181API) QueryDeviceInfo(ctx *sip.Context) {
tx, err := ctx.SendRequest(sip.MethodMessage, sip.GetDeviceInfoXML(ctx.DeviceID))
if err != nil {
ctx.Log.Error("sipDeviceInfo", "err", err)
return
}
if _, err := sipResponse(tx); err != nil {
ctx.Log.Error("sipResponse", "err", err)
return
}
}
// MessageDeviceInfoResponse 主设备明细返回结构
type MessageDeviceInfoResponse struct {
CmdType string `xml:"CmdType"`
SN int `xml:"SN"`
DeviceID string `xml:"DeviceID"`
DeviceName string `xml:"DeviceName"` // 设备名
DeviceType string `xml:"DeviceType"`
Manufacturer string `xml:"Manufacturer"` // 生产商
Model string `xml:"Model"` // 设备型号
Firmware string `xml:"Firmware"` // 固件版本
}
func (g GB28181API) sipMessageDeviceInfo(ctx *sip.Context) {
var msg MessageDeviceInfoResponse
if err := sip.XMLDecode(ctx.Request.Body(), &msg); err != nil {
ctx.Log.Error("sipMessageDeviceInfo", "err", err)
ctx.String(400, ErrXMLDecode.Error())
return
}
if err := g.store.Edit(ctx.DeviceID, func(d *gb28181.Device) {
d.Ext.Firmware = msg.Firmware
d.Ext.Manufacturer = msg.Manufacturer
d.Ext.Model = msg.Model
d.Ext.Name = msg.DeviceName
}); err != nil {
ctx.Log.Error("Edit", "err", err)
ctx.String(500, ErrDatabase.Error())
return
}
ctx.String(200, "OK")
// db.UpdateAll(db.DBClient, new(Devices), db.M{"deviceid=?": u.DeviceID}, Devices{
// Model: message.Model,
// DeviceType: message.DeviceType,
// Firmware: message.Firmware,
// Manufacturer: message.Manufacturer,
// })
}

View File

@@ -1,9 +1,9 @@
package gbs package gbs
import ( import (
"time" "github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goweb/pkg/orm"
// "github.com/panjjo/gosip/db" // "github.com/panjjo/gosip/db"
) )
@@ -16,27 +16,30 @@ type MessageNotify struct {
Info string `xml:"Info"` Info string `xml:"Info"`
} }
func sipMessageKeepalive(u Devices, body []byte) error { func (g GB28181API) sipMessageKeepalive(ctx *sip.Context) {
message := &MessageNotify{} var msg MessageNotify
if err := sip.XMLDecode(body, message); err != nil { if err := sip.XMLDecode(ctx.Request.Body(), &msg); err != nil {
// logrus.Errorln("Message Unmarshal xml err:", err, "body:", string(body)) ctx.Log.Error("Message Unmarshal xml err", "err", err)
return err return
} }
device, ok := _activeDevices.Get(u.DeviceID)
if !ok { // device, ok := _activeDevices.Get(ctx.DeviceID)
device = Devices{DeviceID: u.DeviceID} // if !ok {
// if err := db.Get(db.DBClient, &device); err != nil { // device = Devices{DeviceID: ctx.DeviceID}
// logrus.Warnln("Device Keepalive not found ", u.DeviceID, err) // if err := db.Get(db.DBClient, &device); err != nil {
// } // logrus.Warnln("Device Keepalive not found ", u.DeviceID, err)
// }
// }
if err := g.store.Edit(ctx.DeviceID, func(d *gb28181.Device) {
d.KeepaliveAt = orm.Now()
d.IsOnline = msg.Status == "OK"
}); err != nil {
ctx.Log.Error("keepalive", "err", err)
} }
if message.Status == "OK" {
device.ActiveAt = time.Now().Unix() // _activeDevices.Store(u.DeviceID, u)
_activeDevices.Store(u.DeviceID, u) // go notify(notifyDevicesAcitve(u.DeviceID, message.Status))
} else {
device.ActiveAt = -1
_activeDevices.Delete(u.DeviceID)
}
go notify(notifyDevicesAcitve(u.DeviceID, message.Status))
// _, err := db.UpdateAll(db.DBClient, new(Devices), map[string]interface{}{"deviceid=?": u.DeviceID}, Devices{ // _, err := db.UpdateAll(db.DBClient, new(Devices), map[string]interface{}{"deviceid=?": u.DeviceID}, Devices{
// Host: u.Host, // Host: u.Host,
// Port: u.Port, // Port: u.Port,
@@ -47,5 +50,6 @@ func sipMessageKeepalive(u Devices, body []byte) error {
// ActiveAt: device.ActiveAt, // ActiveAt: device.ActiveAt,
// }) // })
// return err // return err
return nil // return nil
ctx.String(200, "OK")
} }

View File

@@ -46,7 +46,7 @@ type MediaServer struct {
type SysInfo struct { type SysInfo struct {
// db.DBModel // db.DBModel
// Region 当前域 // Region 当前域
Region string `json:"region" yaml:"region" mapstructure:"region"` // Region string `json:"region" yaml:"region" mapstructure:"region"`
// CID 通道id固定头部 // CID 通道id固定头部
CID string `json:"cid" yaml:"cid" mapstructure:"cid"` CID string `json:"cid" yaml:"cid" mapstructure:"cid"`
// CNUM 当前通道数 // CNUM 当前通道数

View File

@@ -138,7 +138,7 @@ func sipPlayPush(data *Streams, channel Channels, device Devices) (*Streams, err
req.SetDestination(device.source) req.SetDestination(device.source)
req.AppendHeader(&sip.GenericHeader{HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:%s", channel.ChannelID, data.StreamID, _serverDevices.DeviceID, data.StreamID)}) req.AppendHeader(&sip.GenericHeader{HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:%s", channel.ChannelID, data.StreamID, _serverDevices.DeviceID, data.StreamID)})
req.SetRecipient(channel.addr.URI) req.SetRecipient(channel.addr.URI)
tx, err := srv.Request(req) tx, err := svr.Request(req)
if err != nil { if err != nil {
// logrus.Warningln("sipPlayPush fail.id:", device.DeviceID, channel.ChannelID, "err:", err) // logrus.Warningln("sipPlayPush fail.id:", device.DeviceID, channel.ChannelID, "err:", err)
return data, err return data, err
@@ -192,7 +192,7 @@ func SipStopPlay(ssrc string) {
user := u.(Devices) user := u.(Devices)
req := sip.NewRequestFromResponse(sip.MethodBYE, resp) req := sip.NewRequestFromResponse(sip.MethodBYE, resp)
req.SetDestination(user.source) req.SetDestination(user.source)
tx, err := srv.Request(req) tx, err := svr.Request(req)
if err != nil { if err != nil {
// logrus.Warningln("sipStopPlay bye fail.id:", play.DeviceID, play.ChannelID, "err:", err) // logrus.Warningln("sipStopPlay bye fail.id:", play.DeviceID, play.ChannelID, "err:", err)
} }

View File

@@ -30,7 +30,7 @@ func SipRecordList(to *Channels, start, end int64) (*Records, error) {
}).SetContentType(&sip.ContentTypeXML).SetMethod(sip.MethodMessage) }).SetContentType(&sip.ContentTypeXML).SetMethod(sip.MethodMessage)
req := sip.NewRequest("", sip.MethodMessage, to.addr.URI, sip.DefaultSipVersion, hb.Build(), sip.GetRecordInfoXML(to.ChannelID, sn, start, end)) req := sip.NewRequest("", sip.MethodMessage, to.addr.URI, sip.DefaultSipVersion, hb.Build(), sip.GetRecordInfoXML(to.ChannelID, sn, start, end))
req.SetDestination(device.source) req.SetDestination(device.source)
tx, err := srv.Request(req) tx, err := svr.Request(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -3,62 +3,133 @@ package gbs
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"strconv"
"strings"
"time"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goweb/pkg/orm"
) )
type GB28181API struct{} const ignorePassword = "#"
type GB28181API struct {
cfg *conf.SIP
store gb28181.GB28181
catalog *sip.Collector[Channels]
}
func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GB28181) *GB28181API {
g := GB28181API{
cfg: &cfg.Sip,
store: store,
catalog: sip.NewCollector[Channels](func(c1, c2 *Channels) bool {
return c1.ChannelID == c2.ChannelID
}),
}
go g.catalog.Start(func(s string, c []*Channels) {
out := make([]*gb28181.Channel, len(c))
for i, ch := range c {
out[i] = &gb28181.Channel{
DeviceID: s,
ChannelID: ch.ChannelID,
Name: ch.Name,
IsOnline: ch.Status == "OK",
Ext: gb28181.DeviceExt{
Manufacturer: ch.Manufacturer,
Model: ch.Model,
},
}
}
g.store.SaveChannels(out)
})
return &g
}
func (g GB28181API) handlerRegister(ctx *sip.Context) { func (g GB28181API) handlerRegister(ctx *sip.Context) {
fromUser, ok := parserDevicesFromReqeust(ctx.Request) if len(ctx.DeviceID) < 18 {
if !ok {
return
}
if len(fromUser.DeviceID) < 18 {
ctx.String(http.StatusBadRequest, "device id too short") ctx.String(http.StatusBadRequest, "device id too short")
return return
} }
// 判断是否存在授权字段 dev, err := g.store.GetDeviceByDeviceID(ctx.DeviceID)
if hdrs := ctx.Request.GetHeaders("Authorization"); len(hdrs) > 0 { if err != nil {
// user := Devices{DeviceID: fromUser.DeviceID} ctx.Log.Error("GetDeviceByDeviceID", "err", err)
// if err := db.Get(db.DBClient, &user); err == nil { ctx.String(http.StatusInternalServerError, "server db error")
// if !user.Regist { return
// // 如果数据库里用户未激活替换user数据
// // fromUser.ID = user.ID
// fromUser.Name = user.Name
// fromUser.PWD = user.PWD
// user = fromUser
// }
// user.addr = fromUser.addr
// authenticateHeader := hdrs[0].(*sip.GenericHeader)
// auth := sip.AuthFromValue(authenticateHeader.Contents)
// auth.SetPassword(user.PWD)
// auth.SetUsername(user.DeviceID)
// auth.SetMethod(string(req.Method()))
// auth.SetURI(auth.Get("uri"))
// if auth.CalcResponse() == auth.Get("response") {
// // 验证成功
// // 记录活跃设备
// user.source = fromUser.source
// user.addr = fromUser.addr
// _activeDevices.Store(user.DeviceID, user)
// if !user.Regist {
// // 第一次激活,保存数据库
// user.Regist = true
// db.DBClient.Save(&user)
// // logrus.Infoln("new user regist,id:", user.DeviceID)
// }
// tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", nil))
// // 注册成功后查询设备信息,获取制作厂商等信息
// go notify(notifyDevicesRegister(user))
// go sipDeviceInfo(fromUser)
// return
// }
// }
} }
resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil)
resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf("Digest nonce=\"%s\", algorithm=MD5, realm=\"%s\",qop=\"auth\"", sip.RandString(32), _sysinfo.Region)}) password := dev.Password
ctx.Tx.Respond(resp) if password == "" {
password = g.cfg.Password
}
// 免鉴权
if dev.Password == ignorePassword {
password = ""
}
if password != "" {
hdrs := ctx.Request.GetHeaders("Authorization")
if len(hdrs) == 0 {
resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil)
resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf("Digest nonce=\"%s\", algorithm=MD5, realm=\"%s\",qop=\"auth\"", sip.RandString(32), g.cfg.Domain)})
_ = ctx.Tx.Respond(resp)
return
}
authenticateHeader := hdrs[0].(*sip.GenericHeader)
auth := sip.AuthFromValue(authenticateHeader.Contents)
auth.SetPassword(password)
auth.SetUsername(dev.DeviceID)
auth.SetMethod(ctx.Request.Method())
auth.SetURI(auth.Get("uri"))
if auth.CalcResponse() != auth.Get("response") {
ctx.Log.Info("设备注册鉴权失败")
ctx.String(http.StatusUnauthorized, "wrong password")
return
}
}
respFn := func() {
resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusOK, "OK", nil)
resp.AppendHeader(&sip.GenericHeader{
HeaderName: "Date",
Contents: time.Now().Format("2006-01-02T15:04:05.000"),
})
_ = ctx.Tx.Respond(resp)
}
expire := ctx.GetHeader("Expires")
if expire == "0" {
ctx.Log.Info("设备注销")
g.logout(ctx.DeviceID, func(b *gb28181.Device) {
b.IsOnline = false
b.Address = ctx.Source.String()
})
respFn()
return
}
g.login(ctx.DeviceID, func(b *gb28181.Device) {
b.IsOnline = true
b.Address = ctx.Source.String()
b.Trasnport = strings.ToUpper(ctx.Source.Network())
b.RegisteredAt = orm.Now()
b.Expires, _ = strconv.Atoi(expire)
})
ctx.Log.Info("设备注册成功")
respFn()
g.QueryDeviceInfo(ctx)
g.QueryCatalog(ctx)
}
func (g GB28181API) login(deviceID string, changeFn func(*gb28181.Device)) {
g.store.Login(deviceID, changeFn)
}
func (g GB28181API) logout(deviceID string, changeFn func(*gb28181.Device)) {
g.store.Logout(deviceID, changeFn)
} }

View File

@@ -1,7 +1,6 @@
package gbs package gbs
import ( import (
"context"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
@@ -9,29 +8,42 @@ import (
"strconv" "strconv"
"sync" "sync"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/pkg/gbs/m" "github.com/gowvp/gb28181/pkg/gbs/m"
"github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/gowvp/gb28181/pkg/gbs/sip"
) )
type Server struct { type Server struct {
*sip.Server *sip.Server
gb *GB28181API
} }
func NewServer() *Server { func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181) (*Server, func()) {
api := GB28181API{} api := NewGB28181API(cfg, store)
srv = sip.NewServer() uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s", cfg.Sip.ID, cfg.Sip.Domain))
srv.Register(api.handlerRegister) from := sip.Address{
// srv.RegistHandler(sip.MethodMessage, api.handlerMessage) DisplayName: sip.String{Str: "gowvp"},
// srv.Message("catalog", api.handlerMessage) URI: &uri,
// srv.Message("keepalive", api.handlerMessage) Params: sip.NewParams(),
// srv.Message("RecordInfo", api.handlerMessage)
// srv.Message("DeviceInfo", api.handlerMessage)
go srv.ListenUDPServer("0.0.0.0:15060")
go srv.ListenTCPServer(context.TODO(), "0.0.0.0:15060")
return &Server{
Server: srv,
} }
svr = sip.NewServer(&from)
svr.Register(api.handlerRegister)
msg := svr.Message()
msg.Handle("Keepalive", api.sipMessageKeepalive)
msg.Handle("Catalog", api.sipMessageCatalog)
msg.Handle("DeviceInfo", api.sipMessageDeviceInfo)
// msg.Handle("RecordInfo", api.handlerMessage)
go svr.ListenUDPServer(fmt.Sprintf(":%d", cfg.Sip.Port))
go svr.ListenTCPServer(fmt.Sprintf(":%d", cfg.Sip.Port))
c := Server{
Server: svr,
}
return &c, c.Close
} }
func Start() { func Start() {
@@ -44,8 +56,8 @@ func Start() {
LoadSYSInfo() LoadSYSInfo()
srv = sip.NewServer() // svr = sip.NewServer()
go srv.ListenUDPServer(config.UDP) // go svr.ListenUDPServer(config.UDP)
} }
// MODDEBUG MODDEBUG // MODDEBUG MODDEBUG
@@ -97,14 +109,14 @@ func LoadSYSInfo() {
// } // }
m.MConfig.GB28181 = _sysinfo m.MConfig.GB28181 = _sysinfo
uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s", _sysinfo.LID, _sysinfo.Region)) // uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s", _sysinfo.LID, _sysinfo.Region))
_serverDevices = Devices{ _serverDevices = Devices{
DeviceID: _sysinfo.LID, DeviceID: _sysinfo.LID,
Region: _sysinfo.Region, // Region: _sysinfo.Region,
addr: &sip.Address{ addr: &sip.Address{
DisplayName: sip.String{Str: "sipserver"}, DisplayName: sip.String{Str: "sipserver"},
URI: &uri, // URI: &uri,
Params: sip.NewParams(), Params: sip.NewParams(),
}, },
} }
@@ -140,3 +152,12 @@ func sipResponse(tx *sip.Transaction) (*sip.Response, error) {
} }
return response, nil return response, nil
} }
// QueryCatalog 查询 catalog
func (s *Server) QueryCatalog(deviceID string) {
// TODO: query 查询不能直接传递 ctx需要缓存目标信息
s.gb.QueryCatalog(nil)
}
func (s *Server) Play() {
}

113
pkg/gbs/sip/collector.go Normal file
View File

@@ -0,0 +1,113 @@
package sip
import (
"log/slog"
"slices"
"time"
)
// Collector .
// 1. 收集器
// 2. 分门别类
// 3. 定时同步,超时删除,删除之前再同步一次
// 4. 不会去重
// 如何使用?
// 1. 通过 NewCatalogRecv 创建一个新的收集器
// 2. s.createCh <- deviceID
// 3. s.catalog.msg <- &CollectorMsg[Channel]{Data: &c, Total: msg.SumNum, Key: msg.DeviceID}
type Collector[T any] struct {
data map[string]*Content[T]
msg chan *CollectorMsg[T]
createCh chan string
noRepeatFn NoRepeatFn[T]
observer *Observer
}
func (c *Collector[T]) Run(key string) {
select {
case c.createCh <- key:
default:
}
}
func (c *Collector[T]) Write(info *CollectorMsg[T]) {
c.msg <- info
}
type CollectorMsg[T any] struct {
Key string
Data *T
Total int
}
type NoRepeatFn[T any] func(*T, *T) bool
// newCollector 创建一个新的收集器
// noRepeatFn 用于提前去重,避免重复数据存储
func NewCollector[T any](noRepeatFn NoRepeatFn[T]) *Collector[T] {
return &Collector[T]{
data: make(map[string]*Content[T]),
msg: make(chan *CollectorMsg[T], 10),
createCh: make(chan string, 10),
noRepeatFn: noRepeatFn,
observer: NewObserver(),
}
}
type Content[T any] struct {
lastUpdateAt time.Time
data []*T
total int
}
// Wait 在执行 Start 以后,可以调用 Wait 等待
func (c *Collector[T]) Wait(key string) {
c.observer.DefaultRegister(key)
}
// Start 启动定时任务检查和保存数据
func (c *Collector[T]) Start(save func(string, []*T)) {
fn := func(k string, data []*T) {
save(k, data)
c.observer.Notify(k)
}
check := time.NewTicker(time.Second * 3)
defer check.Stop()
for {
select {
case <-check.C:
for k, v := range c.data {
if time.Since(v.lastUpdateAt) > 10*time.Second {
fn(k, v.data)
delete(c.data, k)
continue
}
if v.total > 0 && len(v.data) >= v.total {
fn(k, v.data)
delete(c.data, k)
continue
}
}
case v := <-c.createCh:
c.data[v] = &Content[T]{lastUpdateAt: time.Now(), data: make([]*T, 0, 2), total: -1}
case msg := <-c.msg:
data, exist := c.data[msg.Key]
if !exist {
slog.Debug("key 不存在或已过期", "key", msg.Key, "data", msg.Data)
continue
}
// 如果数据已存在且无重复,跳过该消息
if slices.ContainsFunc(data.data, func(v *T) bool {
return c.noRepeatFn(v, msg.Data)
}) {
slog.Debug("catalog 发现重复数据", "key", msg.Key, "data", msg.Data)
continue
}
// 添加数据到对应的条目并更新最后更新时间和总量
data.data = append(data.data, msg.Data)
data.lastUpdateAt = time.Now()
data.total = msg.Total
}
}
}

View File

@@ -125,7 +125,11 @@ func (conn *connection) Write(buf []byte) (int, error) {
} }
func (conn *connection) WriteTo(buf []byte, raddr net.Addr) (num int, err error) { func (conn *connection) WriteTo(buf []byte, raddr net.Addr) (num int, err error) {
num, err = conn.baseConn.(net.PacketConn).WriteTo(buf, raddr) if conn.Network() == "tcp" {
num, err = conn.baseConn.Write(buf)
} else {
num, err = conn.baseConn.(net.PacketConn).WriteTo(buf, raddr)
}
if err != nil { if err != nil {
return num, NewError(err, conn.logKey, "writeTo", conn.baseConn.LocalAddr().String(), raddr.String()) return num, NewError(err, conn.logKey, "writeTo", conn.baseConn.LocalAddr().String(), raddr.String())
} }
@@ -150,7 +154,7 @@ func (conn *connection) Close() error {
} }
func (conn *connection) Network() string { func (conn *connection) Network() string {
return strings.ToUpper(conn.baseConn.LocalAddr().Network()) return conn.baseConn.LocalAddr().Network()
} }
func (conn *connection) SetDeadline(t time.Time) error { func (conn *connection) SetDeadline(t time.Time) error {

View File

@@ -1,37 +1,111 @@
package sip package sip
import (
"fmt"
"log/slog"
"math"
"net"
"strings"
)
const abortIndex int8 = math.MaxInt8 >> 1
type HandlerFunc func(*Context) type HandlerFunc func(*Context)
type Context struct { type Context struct {
Request *Request Request *Request
Tx *Transaction Tx *Transaction
handlers []HandlerFunc handlers []HandlerFunc
index int index int8
cache map[string]any cache map[string]any
DeviceID string
Host string
Port string
Source net.Addr
toAddr *Address
fromAddr *Address
Log *slog.Logger
svr *Server
} }
func newContext(req *Request, tx *Transaction) *Context { func newContext(req *Request, tx *Transaction) *Context {
return &Context{ c := Context{
Request: req, Request: req,
Tx: tx, Tx: tx,
cache: make(map[string]any), cache: make(map[string]any),
Log: slog.Default(),
index: -1,
} }
if err := c.parserRequest(); err != nil {
slog.Error("parserRequest", "err", err)
}
return &c
}
func (c *Context) parserRequest() error {
req := c.Request
header, ok := req.From()
if !ok {
return fmt.Errorf("req from is nil")
}
if header.Address == nil {
return fmt.Errorf("header address is nil")
}
user := header.Address.User()
if user == nil {
return fmt.Errorf("address user is nil")
}
c.DeviceID = user.String()
c.Host = header.Address.Host()
via, ok := req.ViaHop()
if !ok {
return fmt.Errorf("via is nil")
}
c.Host = via.Host
c.Port = via.Port.String()
c.Source = req.Source()
c.toAddr = NewAddressFromFromHeader(header)
c.Log = slog.Default().With("deviceID", c.DeviceID, "host", c.Host)
return nil
} }
func (c *Context) Next() { func (c *Context) Next() {
l := len(c.handlers) c.index++
for { for c.index < int8(len(c.handlers)) {
c.handlers[c.index](c) if fn := c.handlers[c.index]; fn != nil {
if c.index == -1 || c.index >= l { fn(c)
return
} }
c.index++ c.index++
} }
} }
func (c *Context) GetHeader(key string) string {
headers := c.Request.GetHeaders(key)
if len(headers) > 0 {
header := headers[0]
splits := strings.Split(header.String(), ":")
if len(splits) == 2 {
return strings.TrimSpace(splits[1])
}
}
return ""
}
func (c *Context) Abort() { func (c *Context) Abort() {
c.index = -1 c.index = abortIndex
}
func (c *Context) AbortString(status int, msg string) {
c.Abort()
c.String(status, msg)
} }
func (c *Context) String(status int, msg string) { func (c *Context) String(status int, msg string) {
@@ -60,3 +134,14 @@ func (c *Context) GetMustInt(k string) int {
} }
return 0 return 0
} }
func (c *Context) SendRequest(method string, body []byte) (*Transaction, error) {
hb := NewHeaderBuilder().SetTo(c.toAddr).SetFrom(c.fromAddr).AddVia(&ViaHop{
Params: NewParams().Add("branch", String{Str: GenerateBranch()}),
}).SetContentType(&ContentTypeXML).SetMethod(method)
req := NewRequest("", method, c.toAddr.URI, DefaultSipVersion, hb.Build(), body)
req.SetDestination(c.Source)
req.SetConnection(c.Request.conn)
return c.svr.Request(req)
}

29
pkg/gbs/sip/group.go Normal file
View File

@@ -0,0 +1,29 @@
package sip
type RouteGroup struct {
method string
middlewares []HandlerFunc
s *Server
}
type MessageReceive struct {
CmdType string `xml:"CmdType"`
SN int `xml:"SN"`
}
func newRouteGroup(method string, s *Server, ms ...HandlerFunc) *RouteGroup {
return &RouteGroup{
method: method,
middlewares: ms,
s: s,
}
}
func (g *RouteGroup) addGroup(pattern string, handler ...HandlerFunc) {
key := g.method + "-" + pattern
g.s.addRoute(key, append(g.middlewares, handler...)...)
}
func (g *RouteGroup) Handle(pattern string, handler ...HandlerFunc) {
g.addGroup(pattern, handler...)
}

View File

@@ -33,7 +33,7 @@ type HeadersBuilder struct {
func NewHeaderBuilder() *HeadersBuilder { func NewHeaderBuilder() *HeadersBuilder {
callID := CallID(RandString(32)) callID := CallID(RandString(32))
maxForwards := MaxForwards(70) maxForwards := MaxForwards(70)
userAgent := UserAgentHeader("GoSIP") userAgent := UserAgentHeader("GoWVP")
return &HeadersBuilder{ return &HeadersBuilder{
protocol: "SIP", protocol: "SIP",
protocolVersion: "2.0", protocolVersion: "2.0",

View File

@@ -24,7 +24,7 @@ const (
MethodRegister = "REGISTER" MethodRegister = "REGISTER"
MethodOptions = "OPTIONS" MethodOptions = "OPTIONS"
// SUBSCRIBE = "SUBSCRIBE" // SUBSCRIBE = "SUBSCRIBE"
// NOTIFY = "NOTIFY" MethodNotify = "NOTIFY"
// REFER = "REFER" // REFER = "REFER"
MethodInfo = "INFO" MethodInfo = "INFO"
MethodMessage = "MESSAGE" MethodMessage = "MESSAGE"
@@ -80,6 +80,7 @@ type Message interface {
SetSource(src net.Addr) SetSource(src net.Addr)
Destination() net.Addr Destination() net.Addr
SetDestination(dest net.Addr) SetDestination(dest net.Addr)
SetConnection(Connection)
IsCancel() bool IsCancel() bool
IsAck() bool IsAck() bool
@@ -93,6 +94,8 @@ type message struct {
body []byte body []byte
source, dest net.Addr source, dest net.Addr
startLine func() string startLine func() string
conn Connection `json:"-"`
} }
// MessageID MessageID // MessageID MessageID
@@ -167,6 +170,10 @@ func (msg *message) SetSource(src net.Addr) {
msg.source = src msg.source = src
} }
func (msg *message) SetConnection(conn Connection) {
msg.conn = conn
}
// Destination Destination // Destination Destination
func (msg *message) Destination() net.Addr { func (msg *message) Destination() net.Addr {
return msg.dest return msg.dest

73
pkg/gbs/sip/obs.go Normal file
View File

@@ -0,0 +1,73 @@
package sip
import (
"fmt"
"time"
"github.com/ixugo/goweb/pkg/conc"
)
// ObserverHandler 返回 true 表示完成任务
type ObserverHandler func(deviceID string, args ...string) bool
// Observer 观察者
type Observer struct {
data conc.Map[string, ObserverHandler]
}
// NewObserver 创建观察者
func NewObserver() *Observer {
return &Observer{}
}
// concRegister 异步注册观察者
func (o *Observer) concRegister(deviceID string, handler ObserverHandler) {
o.data.Store(deviceID, handler)
}
// Register 同步等待观察者完成任务
func (o *Observer) Register(deviceID string, duration time.Duration, fn ObserverHandler) {
ch := make(chan struct{}, 1)
defer close(ch)
o.concRegister(deviceID, func(did string, args ...string) bool {
if fn(did, args...) {
ch <- struct{}{}
return true
}
return false
})
// 等待通知或超时
select {
// 收到通知
case <-ch:
// 超时7秒
case <-time.After(duration):
o.data.Delete(deviceID)
}
}
// DefaultRegister 默认的注册行为
func (o *Observer) DefaultRegister(deviceID string) {
key := fmt.Sprintf("%s:%d", deviceID, time.Now().UnixMilli())
o.Register(key, 7*time.Second, func(did string, _ ...string) bool {
return deviceID == did
})
}
// RegisterWithTimeout 自定义等待时间
func (o *Observer) RegisterWithTimeout(deviceID string, duration time.Duration) {
key := fmt.Sprintf("%s:%d", deviceID, time.Now().UnixMilli())
o.Register(key, duration, func(did string, _ ...string) bool {
return deviceID == did
})
}
// Notify 通知观察者
func (o *Observer) Notify(deviceID string, args ...string) {
o.data.Range(func(key string, fn ObserverHandler) bool {
if fn(deviceID, args...) {
o.data.Delete(key)
}
return true
})
}

20
pkg/gbs/sip/obs_test.go Normal file
View File

@@ -0,0 +1,20 @@
package sip
import (
"fmt"
"testing"
)
func TestObserver(t *testing.T) {
s := NewObserver()
go func() {
s.Notify("1")
s.Notify("2")
}()
var i int
s.data.Range(func(key string, value ObserverHandler) bool {
i++
return true
})
fmt.Println("i:", i)
}

View File

@@ -665,6 +665,7 @@ func (p *parser) start() {
msg.SetBody(body, false) msg.SetBody(body, false)
} }
msg.SetSource(packet.raddr) msg.SetSource(packet.raddr)
msg.SetConnection(packet.conn)
p.out <- msg p.out <- msg
} }
} }

View File

@@ -157,6 +157,10 @@ func (req *Request) SetDestination(dest net.Addr) {
req.dest = dest req.dest = dest
} }
func (req *Request) SetConnection(conn Connection) {
req.conn = conn
}
// Clone Clone // Clone Clone
func (req *Request) Clone() Message { func (req *Request) Clone() Message {
return NewRequest( return NewRequest(

View File

@@ -12,6 +12,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"github.com/ixugo/goweb/pkg/conc"
) )
var bufferSize uint16 = 65535 - 20 - 8 // IPv4 max size - IPv4 Header size - UDP Header size var bufferSize uint16 = 65535 - 20 - 8 // IPv4 max size - IPv4 Header size - UDP Header size
@@ -22,12 +24,11 @@ var bufferSize uint16 = 65535 - 20 - 8 // IPv4 max size - IPv4 Header size - UDP
// Server Server // Server Server
type Server struct { type Server struct {
udpaddr net.Addr udpaddr net.Addr
conn Connection udpConn Connection
txs *transacionts txs *transacionts
hmu *sync.RWMutex route conc.Map[string, []HandlerFunc]
route map[string][]HandlerFunc
port *Port port *Port
host net.IP host net.IP
@@ -36,42 +37,58 @@ type Server struct {
tcpListener *net.TCPListener tcpListener *net.TCPListener
tcpaddr net.Addr tcpaddr net.Addr
ctx context.Context
cancel context.CancelFunc
from *Address
} }
// NewServer NewServer // NewServer NewServer
func NewServer() *Server { func NewServer(form *Address) *Server {
activeTX = &transacionts{txs: map[string]*Transaction{}, rwm: &sync.RWMutex{}} activeTX = &transacionts{txs: map[string]*Transaction{}, rwm: &sync.RWMutex{}}
ctx, cancel := context.WithCancel(context.TODO())
srv := &Server{ srv := &Server{
hmu: &sync.RWMutex{}, txs: activeTX,
txs: activeTX, ctx: ctx,
route: make(map[string][]HandlerFunc), cancel: cancel,
from: form,
} }
return srv return srv
} }
func (s *Server) addRoute(method string, pattern string, handler ...HandlerFunc) { func (s *Server) addRoute(method string, handler ...HandlerFunc) {
s.hmu.Lock() s.route.Store(strings.ToUpper(method), handler)
defer s.hmu.Unlock()
key := method + "-" + pattern
s.route[key] = handler
} }
func (s *Server) Register(handler ...HandlerFunc) { func (s *Server) Register(handler ...HandlerFunc) {
s.addRoute(MethodRegister, "", handler...) s.addRoute(MethodRegister, handler...)
} }
func (s *Server) Message(handler ...HandlerFunc) { func (s *Server) Message(handler ...HandlerFunc) *RouteGroup {
s.addRoute(MethodMessage, "", handler...) s.addRoute(MethodMessage, handler...)
return newRouteGroup(MethodMessage, s, handler...)
}
func (s *Server) Notify(handler ...HandlerFunc) *RouteGroup {
s.addRoute(MethodNotify, handler...)
return newRouteGroup(MethodNotify, s, handler...)
} }
func (s *Server) getTX(key string) *Transaction { func (s *Server) getTX(key string) *Transaction {
return s.txs.getTX(key) return s.txs.getTX(key)
} }
func (s *Server) mustTX(key string) *Transaction { func (s *Server) mustTX(msg *Request) *Transaction {
key := getTXKey(msg)
tx := s.txs.getTX(key) tx := s.txs.getTX(key)
if tx == nil { if tx == nil {
tx = s.txs.newTX(key, s.conn) if msg.conn.Network() == "udp" {
tx = s.txs.newTX(key, s.udpConn)
} else {
tx = s.txs.newTX(key, msg.conn)
}
} }
return tx return tx
} }
@@ -91,7 +108,7 @@ func (s *Server) ListenUDPServer(addr string) {
if err != nil { if err != nil {
panic(fmt.Errorf("net.ListenUDP err[%w]", err)) panic(fmt.Errorf("net.ListenUDP err[%w]", err))
} }
s.conn = newUDPConnection(udp) s.udpConn = newUDPConnection(udp)
var ( var (
raddr net.Addr raddr net.Addr
num int num int
@@ -101,17 +118,22 @@ func (s *Server) ListenUDPServer(addr string) {
defer parser.stop() defer parser.stop()
go s.handlerListen(parser.out) go s.handlerListen(parser.out)
for { for {
num, raddr, err = s.conn.ReadFrom(buf) select {
if err != nil { case <-s.ctx.Done():
// logrus.Errorln("udp.ReadFromUDP err", err) return
continue default:
num, raddr, err = s.udpConn.ReadFrom(buf)
if err != nil {
slog.Error("udp.ReadFromUDP", "err", err)
continue
}
parser.in <- newPacket(append([]byte{}, buf[:num]...), raddr, s.udpConn)
} }
parser.in <- newPacket(append([]byte{}, buf[:num]...), raddr, s.conn)
} }
} }
// ListenTCPServer 启动 TCP 服务器并监听指定地址。 // ListenTCPServer 启动 TCP 服务器并监听指定地址。
func (s *Server) ListenTCPServer(ctx context.Context, addr string) { func (s *Server) ListenTCPServer(addr string) {
// 解析传入的地址为 TCP 地址 // 解析传入的地址为 TCP 地址
tcpaddr, err := net.ResolveTCPAddr("tcp", addr) tcpaddr, err := net.ResolveTCPAddr("tcp", addr)
// 如果解析地址失败,则抛出错误 // 如果解析地址失败,则抛出错误
@@ -138,7 +160,7 @@ func (s *Server) ListenTCPServer(ctx context.Context, addr string) {
for { for {
select { select {
case <-ctx.Done(): case <-s.ctx.Done():
slog.Info("ListenTCPServer Has Been Exits") slog.Info("ListenTCPServer Has Been Exits")
return return
default: default:
@@ -152,14 +174,25 @@ func (s *Server) ListenTCPServer(ctx context.Context, addr string) {
} }
} }
func (s *Server) Close() {
if s.cancel != nil {
s.cancel()
s.cancel = nil
}
if s.udpConn != nil {
s.udpConn.Close()
s.udpConn = nil
}
if s.tcpListener != nil {
s.tcpListener.Close()
s.tcpListener = nil
}
}
// ProcessTcpConn 处理传入的 TCP 连接。 // ProcessTcpConn 处理传入的 TCP 连接。
func (s *Server) ProcessTcpConn(conn net.Conn) { func (s *Server) ProcessTcpConn(conn net.Conn) {
// 确保在方法退出时关闭连接 defer conn.Close()
defer conn.Close() // 关闭连接
// 创建一个新的缓冲读取器,用于从连接中读取数据
reader := bufio.NewReader(conn) reader := bufio.NewReader(conn)
// lenBuf := make([]byte, 2)
// 创建一个新的 TCP 连接实例
c := NewTCPConnection(conn) c := NewTCPConnection(conn)
parser := newParser() parser := newParser()
@@ -167,47 +200,34 @@ func (s *Server) ProcessTcpConn(conn net.Conn) {
go s.handlerListen(parser.out) go s.handlerListen(parser.out)
for { for {
// 初始化一个缓冲区,用于存储读取的数据
var buffer bytes.Buffer var buffer bytes.Buffer
// 初始化 body 长度
bodyLen := 0 bodyLen := 0
for { for {
// 读取一行数据,以 '\n' 为结束符 // 读取一行数据,以 '\n' 为结束符
line, err := reader.ReadBytes('\n') line, err := reader.ReadBytes('\n')
// 如果读取过程中出错,则退出方法
if err != nil { if err != nil {
// logrus.Debugln("tcp conn read error:", err) // logrus.Debugln("tcp conn read error:", err)
return return
} }
// 将读取的数据写入缓冲区
buffer.Write(line) buffer.Write(line)
// 如果读取到的行长度小于等于2且 body 长度小于等于0则跳出循环
if len(line) <= 2 { if len(line) <= 2 {
if bodyLen <= 0 { if bodyLen <= 0 {
break break
} }
// 读取 body 数据
// read body
bodyBuf := make([]byte, bodyLen) bodyBuf := make([]byte, bodyLen)
n, err := io.ReadFull(reader, bodyBuf) n, err := io.ReadFull(reader, bodyBuf)
// 如果读取 body 数据时出错,则记录错误并退出循环
if err != nil || n != bodyLen { if err != nil || n != bodyLen {
slog.Error(`error while read full`, "err", err) slog.Error(`error while read full`, "err", err)
// err process
} }
// 将读取的 body 数据写入缓冲区
buffer.Write(bodyBuf) buffer.Write(bodyBuf)
break break
} }
// 如果读取到 "Content-Length" 头部,则解析 body 长度
if strings.Contains(string(line), "Content-Length") { if strings.Contains(string(line), "Content-Length") {
// 以: 对line做分割
s := strings.Split(string(line), ":") s := strings.Split(string(line), ":")
value := strings.Trim(s[len(s)-1], " \r\n") value := strings.Trim(s[len(s)-1], " \r\n")
bodyLen, err = strconv.Atoi(value) bodyLen, err = strconv.Atoi(value)
// 如果解析 "Content-Length" 头部失败,则记录错误并退出循环
if err != nil { if err != nil {
slog.Error("parse Content-Length failed") slog.Error("parse Content-Length failed")
break break
@@ -236,11 +256,22 @@ func (s *Server) handlerListen(msgs chan Message) {
switch tmsg := msg.(type) { switch tmsg := msg.(type) {
case *Request: case *Request:
req := tmsg req := tmsg
req.SetDestination(s.udpaddr)
dst := s.udpaddr
if req.conn.Network() == "tcp" {
dst = s.tcpaddr
}
req.SetDestination(dst)
s.handlerRequest(req) s.handlerRequest(req)
case *Response: case *Response:
resp := tmsg resp := tmsg
resp.SetDestination(s.udpaddr)
dst := s.udpaddr
if resp.conn.Network() == "tcp" {
dst = s.tcpaddr
}
resp.SetDestination(dst)
s.handlerResponse(resp) s.handlerResponse(resp)
default: default:
// logrus.Errorln("undefind msg type,", tmsg, msg.String()) // logrus.Errorln("undefind msg type,", tmsg, msg.String())
@@ -249,19 +280,35 @@ func (s *Server) handlerListen(msgs chan Message) {
} }
func (s *Server) handlerRequest(msg *Request) { func (s *Server) handlerRequest(msg *Request) {
tx := s.mustTX(getTXKey(msg)) tx := s.mustTX(msg)
// logrus.Traceln("receive request from:", msg.Source(), ",method:", msg.Method(), "txKey:", tx.key, "message: \n", msg.String()) // logrus.Traceln("receive request from:", msg.Source(), ",method:", msg.Method(), "txKey:", tx.key, "message: \n", msg.String())
s.hmu.RLock()
handlers, ok := s.route[msg.Method()] key := msg.Method()
s.hmu.RUnlock() if key == MethodMessage || key == MethodNotify {
if l, ok := msg.ContentLength(); !ok || l.Equals(0) {
slog.Error("ContentLength is empty")
return
}
body := msg.Body()
var msg MessageReceive
if err := XMLDecode(body, &msg); err != nil {
slog.Error("xml decode err")
return
}
key += "-" + msg.CmdType
}
handlers, ok := s.route.Load(strings.ToUpper(key))
if !ok { if !ok {
// logrus.Errorln("not found handler func,string:", msg.Method(), msg.String()) slog.Error("not found handler func,string:", "method", msg.Method(), "msg", msg.String())
go handlerMethodNotAllowed(msg, tx) go handlerMethodNotAllowed(msg, tx)
return return
} }
ctx := newContext(msg, tx) ctx := newContext(msg, tx)
ctx.handlers = handlers ctx.handlers = handlers
ctx.fromAddr = s.from
ctx.svr = s
go ctx.Next() go ctx.Next()
} }
@@ -290,7 +337,7 @@ func (s *Server) Request(req *Request) (*Transaction, error) {
viaHop.Params.Add("rport", nil) viaHop.Params.Add("rport", nil)
} }
tx := s.mustTX(getTXKey(req)) tx := s.mustTX(req)
return tx, tx.Request(req) return tx, tx.Request(req)
} }

View File

@@ -4,6 +4,7 @@ import (
"net/http" "net/http"
"sync" "sync"
"time" "time"
"unsafe"
) )
var activeTX *transacionts var activeTX *transacionts
@@ -117,8 +118,10 @@ func (tx *Transaction) Respond(res *Response) error {
// Request Request // Request Request
func (tx *Transaction) Request(req *Request) error { func (tx *Transaction) Request(req *Request) error {
str := req.String()
s := unsafe.Slice(unsafe.StringData(str), len(str))
// logrus.Traceln("send request,to:", req.dest.String(), "txkey:", tx.key, "message: \n", req.String()) // logrus.Traceln("send request,to:", req.dest.String(), "txkey:", tx.key, "message: \n", req.String())
_, err := tx.conn.WriteTo([]byte(req.String()), req.dest) _, err := tx.conn.WriteTo(s, req.dest)
return err return err
} }

View File

@@ -3,7 +3,6 @@ package sip
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/md5"
"encoding/json" "encoding/json"
"encoding/xml" "encoding/xml"
"errors" "errors"
@@ -15,8 +14,8 @@ import (
"net" "net"
"net/http" "net/http"
"time" "time"
"unicode/utf8"
"golang.org/x/net/html/charset"
"golang.org/x/text/encoding/simplifiedchinese" "golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/transform" "golang.org/x/text/transform"
) )
@@ -156,24 +155,34 @@ func GetRequest(url string) ([]byte, error) {
} }
defer resp.Body.Close() defer resp.Body.Close()
respbody, err := ioutil.ReadAll(resp.Body) respbody, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return respbody, nil return respbody, nil
} }
// GetMD5 GetMD5 // XMLDecode 解码 xml
func GetMD5(str string) string { func XMLDecode(data []byte, v interface{}) error {
h := md5.New() if err := xmlDecode(data, v); err == nil {
io.WriteString(h, str) return nil
return fmt.Sprintf("%x", h.Sum(nil)) }
// 有些body xml发送过来的不带encoding 而且格式不是utf8的导致xml解析失败此处使用gbk转utf8后再次尝试xml解析
body, err := GbkToUtf8(data)
if err != nil {
return err
}
return xmlDecode(body, v)
} }
// XMLDecode XMLDecode func xmlDecode(data []byte, v interface{}) error {
func XMLDecode(data []byte, v interface{}) error {
decoder := xml.NewDecoder(bytes.NewReader(data)) decoder := xml.NewDecoder(bytes.NewReader(data))
decoder.CharsetReader = charset.NewReaderLabel decoder.CharsetReader = func(charset string, input io.Reader) (io.Reader, error) {
if utf8.Valid(data) {
return input, nil
}
return simplifiedchinese.GB18030.NewDecoder().Reader(input), nil
}
return decoder.Decode(v) return decoder.Decode(v)
} }
@@ -226,7 +235,7 @@ func ResolveSelfIP() (net.IP, error) {
// GBK 转 UTF-8 // GBK 转 UTF-8
func GbkToUtf8(s []byte) ([]byte, error) { func GbkToUtf8(s []byte) ([]byte, error) {
reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewDecoder()) reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewDecoder())
d, e := ioutil.ReadAll(reader) d, e := io.ReadAll(reader)
if e != nil { if e != nil {
return nil, e return nil, e
} }
@@ -236,7 +245,7 @@ func GbkToUtf8(s []byte) ([]byte, error) {
// UTF-8 转 GBK // UTF-8 转 GBK
func Utf8ToGbk(s []byte) ([]byte, error) { func Utf8ToGbk(s []byte) ([]byte, error) {
reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewEncoder()) reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewEncoder())
d, e := ioutil.ReadAll(reader) d, e := io.ReadAll(reader)
if e != nil { if e != nil {
return nil, e return nil, e
} }

View File

@@ -150,7 +150,7 @@ func CheckStreams() {
StreamList.Response.Delete(stream.StreamID) StreamList.Response.Delete(stream.StreamID)
StreamList.Succ.Delete(stream.ChannelID) StreamList.Succ.Delete(stream.ChannelID)
tx, err := srv.Request(req) tx, err := svr.Request(req)
if err != nil { if err != nil {
// logrus.Warningln("checkStreamClosedFail", stream.StreamID, err) // logrus.Warningln("checkStreamClosedFail", stream.StreamID, err)
stream.Msg = err.Error() stream.Msg = err.Error()

59
pkg/zlm/rtp.go Normal file
View File

@@ -0,0 +1,59 @@
package zlm
const (
openRtpServer = `/index/api/openRtpServer`
closeRtpServer = `/index/api/closeRtpServer`
)
type OpenRTPServerResponse struct {
Code int `json:"code"`
Port int `json:"port"` // 接收端口,方便获取随机端口号
}
type OpenRTPServerRequest struct {
Port int `json:"port"` // 接收端口0 则为随机端口
TCPMode int `json:"tcp_mode"` // 0 udp 模式1 tcp 被动模式, 2 tcp 主动模式。 (兼容 enable_tcp 为 0/1)
StreamID string `json:"stream_id"` // 该端口绑定的流 ID该端口只能创建这一个流(而不是根据 ssrc 创建多个)
}
// OpenRTPServer 创建 GB28181 RTP 接收端口,如果该端口接收数据超时,则会自动被回收(不用调用 closeRtpServer 接口)
// https://docs.zlmediakit.com/zh/guide/media_server/restful_api.html#_24%E3%80%81-index-api-openrtpserver
func (e *Engine) OpenRTPServer(in OpenRTPServerRequest) (*OpenRTPServerResponse, error) {
body, err := struct2map(in)
if err != nil {
return nil, err
}
var resp OpenRTPServerResponse
if err := e.post(openRtpServer, body, &resp); err != nil {
return nil, err
}
if err := e.ErrHandle(resp.Code, "rtp err"); err != nil {
return nil, err
}
return &resp, nil
}
type CloseRTPServerRequest struct {
StreamID string `json:"stream_id"` // 调用 openRtpServer 接口时提供的流 ID
}
type CloseRTPServerResponse struct {
Code int `json:"code"`
Hit int `json:"hit"` // 是否找到记录并关闭
}
// CloseRTPServer 关闭 GB28181 RTP 接收端口
// https://docs.zlmediakit.com/zh/guide/media_server/restful_api.html#_25%E3%80%81-index-api-closertpserver
func (e *Engine) CloseRTPServer(in CloseRTPServerRequest) (*CloseRTPServerResponse, error) {
body, err := struct2map(in)
if err != nil {
return nil, err
}
var resp CloseRTPServerResponse
if err := e.post(closeRtpServer, body, &resp); err != nil {
return nil, err
}
if err := e.ErrHandle(resp.Code, "rtp close err"); err != nil {
return nil, err
}
return &resp, nil
}

21
pkg/zlm/snap.go Normal file
View File

@@ -0,0 +1,21 @@
package zlm
const (
getSnapshot = `/index/api/getSnap`
)
type GetSnapRequest struct {
URL string
TimeoutSec int // 截图失败超时时间
ExpireSec int // 截图过期时间
}
// GetSnap 获取截图或生成实时截图并返回
// https://docs.zlmediakit.com/zh/guide/media_server/restful_api.html#_23%E3%80%81-index-api-getsnap
func (e *Engine) GetSnap(in GetSnapRequest) ([]byte, error) {
body, err := struct2map(in)
if err != nil {
return nil, err
}
return e.post2(getSnapshot, body)
}

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"maps" "maps"
"net/http" "net/http"
"time" "time"
@@ -58,12 +59,27 @@ func (e *Engine) post(path string, data map[string]any, out any) error {
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
// b, _ := io.ReadAll(resp.Body) // b, _ := io.ReadAll(resp.Body)
// fmt.Println(string(b)) // fmt.Println(string(b))
return json.NewDecoder(resp.Body).Decode(out) return json.NewDecoder(resp.Body).Decode(out)
} }
// post2 直接读取全部响应返回
func (e *Engine) post2(path string, data map[string]any) ([]byte, error) {
bodyMap := map[string]any{
"secret": e.cfg.Secret,
}
maps.Copy(bodyMap, data)
body, _ := json.Marshal(bodyMap)
resp, err := e.cli.Post(e.cfg.URL+path, "application/json", bytes.NewReader(body))
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
func (e *Engine) ErrHandle(code int, msg string) error { func (e *Engine) ErrHandle(code int, msg string) error {
switch code { switch code {
case Success: case Success: