add stream proxy

This commit is contained in:
xugo
2025-02-11 23:08:56 +08:00
parent 085a22349b
commit 28d2e24637
19 changed files with 431 additions and 60 deletions

View File

@@ -12,7 +12,6 @@
go wvp 是 Go 语言实现的开源 GB28181 解决方案基于GB28181-2022标准实现的网络视频平台支持 rtmp/rtsp客户端支持网页版本和安卓 App。支持rtsp/rtmp等视频流转发到国标平台支持rtsp/rtmp等推流转发到国标平台。 go wvp 是 Go 语言实现的开源 GB28181 解决方案基于GB28181-2022标准实现的网络视频平台支持 rtmp/rtsp客户端支持网页版本和安卓 App。支持rtsp/rtmp等视频流转发到国标平台支持rtsp/rtmp等推流转发到国标平台。
## 在线演示平台 ## 在线演示平台
+ [在线演示平台 :)](http://gowvp.golang.space:15123/) + [在线演示平台 :)](http://gowvp.golang.space:15123/)
@@ -21,8 +20,6 @@ go wvp 是 Go 语言实现的开源 GB28181 解决方案基于GB28181-2022标
|![](./docs/demo/home.webp)|![](./docs/demo/play.webp)| |![](./docs/demo/home.webp)|![](./docs/demo/play.webp)|
|-|-| |-|-|
## 应用场景: ## 应用场景:
+ 支持浏览器无插件播放摄像头视频。 + 支持浏览器无插件播放摄像头视频。
+ 支持国标设备(摄像机、平台、NVR等)设备接入 + 支持国标设备(摄像机、平台、NVR等)设备接入
@@ -43,10 +40,6 @@ go wvp 是 Go 语言实现的开源 GB28181 解决方案基于GB28181-2022标
Java 语言 WVP @648540858 [wvp-GB28181-pro](https://github.com/648540858/wvp-GB28181-pro) Java 语言 WVP @648540858 [wvp-GB28181-pro](https://github.com/648540858/wvp-GB28181-pro)
## GoWVP, GB/T28181 交流群
<h1>点个 Star 再扫码进群呀</h1>
<img src="./wechat.jpg" alt="wechat" width="200"/>
## QA ## QA
> 怎么没有前端资源? 如何加载网页呢? > 怎么没有前端资源? 如何加载网页呢?
@@ -93,6 +86,10 @@ GoWVP [在线接口文档](apifox.com/apidoc/shared-7b67c918-5f72-4f64-b71d-0593
ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit) ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit)
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
<h1>看到这里啦,恭喜你发现新项目</h1>
<h1>点个 star 不迷路</h1>
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
## 快速开始 ## 快速开始
@@ -116,7 +113,6 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit
+ 6. 浏览器访问 `http://localhost:15123` + 6. 浏览器访问 `http://localhost:15123`
## 如何参与开发? ## 如何参与开发?
1. fork 本项目 1. fork 本项目
@@ -124,51 +120,35 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit
3. 修改,提交 PR说明修改内容 3. 修改,提交 PR说明修改内容
## 功能特性 ## 功能特性
- [x] 集成 web 界面
- [x] 接入设备
- [x] 视频预览
- [x] 无限制接入路数,能接入多少设备只取决于你的服务器性能
- [ ] 云台控制,控制设备转向,拉近,拉远
- [ ] 预置位查询,使用与设置
- [ ] 查询 NVR/IPC 上的录像与播放,支持指定时间播放与下载
- [x] 无人观看自动断流,节省流量
- [x] 视频设备信息同步
- [ ] 离在线监控
- [x] 支持直接输出RTSP、RTMP、HTTP-FLV、Websocket-FLV、HLS多种协议流地址
- [ ] 支持通过一个流地址直接观看摄像头,无需登录以及调用任何接口
- [x] 支持 UDP 和 TCP 两种国标信令传输模式
- [ ] 支持 UDP 和 TCP 被动,TCP 主动 三种国标流传输模式
- [x] 支持检索,通道筛选
- [ ] 支持通道子目录查询
- [ ] 支持过滤音频,防止杂音影响观看
- [x] 支持国标网络校时
- [x] 支持播放 H264 和 H265
- [ ] 报警信息处理,支持向前端推送报警信息
- [ ] 语音对讲
- [ ] 支持业务分组和行政区划树自定义展示以及级联推送
- [ ] 支持订阅与通知方法
- [ ] 移动位置订阅
- [ ] 移动位置通知处理
- [ ] 报警事件订阅
- [ ] 报警事件通知处理
- [ ] 设备目录订阅
- [ ] 设备目录通知处理
- [ ] 移动位置查询和显示
- [x] 支持手动添加设备和给设备设置单独的密码
- [ ] 支持平台对接接入
- [x] 支持自动配置ZLM媒体服务, 减少因配置问题所出现的问题
- [x] 支持启用 udp 多端口模式, 提高 udp 模式下媒体传输性能
- [x] 支持局域网/互联网/特殊网络环境部署
- [x] 支持 gowvp 与 zlm 分开部署,提升平台并发能力
- [ ] 支持拉流RTSP/RTMP分发为各种流格式或者推送到其他国标平台
- [ ] 支持推流RTSP/RTMP分发为各种流格式或者推送到其他国标平台
- [x] 支持推流鉴权
- [x] 支持接口鉴权
- [ ] 云端录像,推流/代理/国标视频均可以录制在云端服务器,支持预览和下载
- [ ] 支持跨域请求,支持前后端分离部署
- [x] 支持 PostgreSQL 数据库
- [ ] 支持录制计划, 根据设定的时间对通道进行录制. 暂不支持将录制的内容转发到国标上级
- [x] 开箱即用,支持 web
- [ ] 支持移动端 app
- [x] 支持 rtmp 流分发
- [x] 支持 rtsp 流分发
- [x] 支持输出 HTTP_FLV,Websocket_FLV,HLS,WebRTC,RTSP、RTMP 等多种协议流地址
- [x] 支持局域网/互联网/多层 NAT/特殊网络环境部署
- [x] 支持 SQLite 数据库快速部署
- [x] 支持 PostgreSQL 数据库,当接入设备数超过 300 时推荐
- [x] GB/T 28181
- [x] 设备注册,支持 7 种接入方式
- [x] 支持 UDP 和 TCP 两种国标信令传输模式
- [x] 设备校时
- [x] 设备目录查询
- [x] 设备信息同步
- [x] 设备实时直播
- [x] 支持 UDP 和 TCP 被动两种国标流传输模式
- [x] 按需拉流,节省流量
- [x] 视频支持播放 H264 和 H265
- [x] 音频支持 g711a/g711u/aac
- [ ] 设备语音对讲
- [ ] 设备云台控制
- [ ] 预置位控制
- [ ] 录像文件检索
- [ ] 录像文件下载
- [ ] 录像回放
- [ ] 录像倍速
- [ ] 报警事件订阅
- [ ] 报警事件通知处理
## 授权协议 ## 授权协议

View File

@@ -31,6 +31,7 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error)
webHookAPI := api.NewWebHookAPI(smsCore, mediaCore, bc) webHookAPI := api.NewWebHookAPI(smsCore, mediaCore, bc)
mediaAPI := api.NewMediaAPI(mediaCore, smsCore, bc) mediaAPI := api.NewMediaAPI(mediaCore, smsCore, bc)
gb28181API := api.NewGb28181API(db, uniqueidCore) gb28181API := api.NewGb28181API(db, uniqueidCore)
proxyAPI := api.NewProxyAPI(db, uniqueidCore)
gb28181 := api.NewGB28181(db, uniqueidCore) gb28181 := api.NewGB28181(db, uniqueidCore)
server, cleanup := gbs.NewServer(bc, gb28181, smsCore) server, cleanup := gbs.NewServer(bc, gb28181, smsCore)
usecase := &api.Usecase{ usecase := &api.Usecase{
@@ -42,6 +43,7 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error)
UniqueID: uniqueidCore, UniqueID: uniqueidCore,
MediaAPI: mediaAPI, MediaAPI: mediaAPI,
GB28181API: gb28181API, GB28181API: gb28181API,
ProxyAPI: proxyAPI,
SipServer: server, SipServer: server,
} }
handler := api.NewHTTPHandler(usecase) handler := api.NewHTTPHandler(usecase)

View File

@@ -14,9 +14,12 @@ services:
zlm: zlm:
image: zlmediakit/zlmediakit:master image: zlmediakit/zlmediakit:master
restart: always restart: always
# 推荐 linux 主机使用 host 模式
# network_mode: host
ports: ports:
- 1935:1935 # rtmp - 1935:1935 # rtmp
- 8080:80 - 554:554 # rtsp
- 8080:80 # api
- 8443:443 - 8443:443
- 10000:10000 - 10000:10000
- 10000:10000/udp - 10000:10000/udp

View File

@@ -3,6 +3,6 @@ package bz
const ( const (
IDPrefixGB = "gb" // 国标设备 IDPrefixGB = "gb" // 国标设备
IDPrefixGBChannel = "ch" // 国标通道 id 前缀 IDPrefixGBChannel = "ch" // 国标通道 id 前缀
IDPrefixRTMP = "m" // rtmp ID 前缀,取 rtmp 的 m不好记但是清晰 IDPrefixRTMP = "mp" // rtmp ID 前缀,取 rtmp 后缀的 mp,不好记但是清晰
IDPrefixRTSP = "s" // rtsp ID 前缀,取 rtsp 的 s不好记但是清晰 IDPrefixRTSP = "sp" // rtsp ID 前缀,取 rtsp 后缀的 sp,不好记但是清晰
) )

23
internal/core/proxy/core.go Executable file
View File

@@ -0,0 +1,23 @@
// Code generated by gowebx, DO AVOID EDIT.
package proxy
import "github.com/gowvp/gb28181/internal/core/uniqueid"
// Storer data persistence
type Storer interface {
StreamProxy() StreamProxyStorer
}
// Core business domain
type Core struct {
store Storer
uniqueID uniqueid.Core
}
// NewCore create business domain
func NewCore(store Storer, uni uniqueid.Core) *Core {
return &Core{
store: store,
uniqueID: uni,
}
}

2
internal/core/proxy/model.go Executable file
View File

@@ -0,0 +1,2 @@
// Code generated by gowebx, DO AVOID EDIT.
package proxy

View File

@@ -0,0 +1,37 @@
// Code generated by gowebx, DO AVOID EDIT.
package proxydb
import (
"github.com/gowvp/gb28181/internal/core/proxy"
"gorm.io/gorm"
)
var _ proxy.Storer = DB{}
// DB Related business namespaces
type DB struct {
db *gorm.DB
}
// NewDB instance object
func NewDB(db *gorm.DB) DB {
return DB{db: db}
}
// StreamProxy Get business instance
func (d DB) StreamProxy() proxy.StreamProxyStorer {
return StreamProxy(d)
}
// AutoMigrate sync database
func (d DB) AutoMigrate(ok bool) DB {
if !ok {
return d
}
if err := d.db.AutoMigrate(
new(proxy.StreamProxy),
); err != nil {
panic(err)
}
return d
}

View File

@@ -0,0 +1,18 @@
package proxydb
import (
"github.com/DATA-DOG/go-sqlmock"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
func generateMockDB() (*gorm.DB, sqlmock.Sqlmock, error) {
db, mock, err := sqlmock.New()
if err != nil {
return nil, nil, err
}
gormDB, err := gorm.Open(postgres.New(postgres.Config{
Conn: db,
}), &gorm.Config{})
return gormDB, mock, err
}

View File

@@ -0,0 +1,60 @@
// Code generated by gowebx, DO AVOID EDIT.
package proxydb
import (
"context"
"github.com/gowvp/gb28181/internal/core/proxy"
"github.com/ixugo/goweb/pkg/orm"
"gorm.io/gorm"
)
var _ proxy.StreamProxyStorer = StreamProxy{}
// StreamProxy Related business namespaces
type StreamProxy DB
// NewStreamProxy instance object
func NewStreamProxy(db *gorm.DB) StreamProxy {
return StreamProxy{db: db}
}
// Find implements proxy.StreamProxyStorer.
func (d StreamProxy) Find(ctx context.Context, bs *[]*proxy.StreamProxy, page orm.Pager, opts ...orm.QueryOption) (int64, error) {
return orm.FindWithContext(ctx, d.db, bs, page, opts...)
}
// Get implements proxy.StreamProxyStorer.
func (d StreamProxy) Get(ctx context.Context, model *proxy.StreamProxy, opts ...orm.QueryOption) error {
return orm.FirstWithContext(ctx, d.db, model, opts...)
}
// Add implements proxy.StreamProxyStorer.
func (d StreamProxy) Add(ctx context.Context, model *proxy.StreamProxy) error {
return d.db.WithContext(ctx).Create(model).Error
}
// Edit implements proxy.StreamProxyStorer.
func (d StreamProxy) Edit(ctx context.Context, model *proxy.StreamProxy, changeFn func(*proxy.StreamProxy), opts ...orm.QueryOption) error {
return orm.UpdateWithContext(ctx, d.db, model, changeFn, opts...)
}
// Del implements proxy.StreamProxyStorer.
func (d StreamProxy) Del(ctx context.Context, model *proxy.StreamProxy, opts ...orm.QueryOption) error {
return orm.DeleteWithContext(ctx, d.db, model, opts...)
}
func (d StreamProxy) Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error {
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
for _, fn := range changeFns {
if err := fn(tx); err != nil {
return err
}
}
return nil
})
}
func (d StreamProxy) EditWithSession(tx *gorm.DB, model *proxy.StreamProxy, changeFn func(b *proxy.StreamProxy) error, opts ...orm.QueryOption) error {
return orm.UpdateWithSession(tx, model, changeFn, opts...)
}

View File

@@ -0,0 +1,26 @@
package proxydb
import (
"context"
"testing"
"github.com/gowvp/gb28181/internal/core/proxy"
"github.com/ixugo/goweb/pkg/orm"
)
func TestStreamProxyGet(t *testing.T) {
db, mock, err := generateMockDB()
if err != nil {
t.Fatal(err)
}
userDB := NewStreamProxy(db)
mock.ExpectQuery(`SELECT \* FROM "stream_proxys" WHERE id=\$1 (.+) LIMIT \$2`).WithArgs("jack", 1)
var out proxy.StreamProxy
if err := userDB.Get(context.Background(), &out, orm.Where("id=?", "jack")); err != nil {
t.Fatal(err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("ExpectationsWereMet err:", err)
}
}

View File

@@ -0,0 +1,81 @@
// Code generated by gowebx, DO AVOID EDIT.
package proxy
import (
"context"
"log/slog"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goweb/pkg/orm"
"github.com/ixugo/goweb/pkg/web"
"github.com/jinzhu/copier"
)
// StreamProxyStorer Instantiation interface
type StreamProxyStorer interface {
Find(context.Context, *[]*StreamProxy, orm.Pager, ...orm.QueryOption) (int64, error)
Get(context.Context, *StreamProxy, ...orm.QueryOption) error
Add(context.Context, *StreamProxy) error
Edit(context.Context, *StreamProxy, func(*StreamProxy), ...orm.QueryOption) error
Del(context.Context, *StreamProxy, ...orm.QueryOption) error
}
// FindStreamProxy Paginated search
func (c *Core) FindStreamProxy(ctx context.Context, in *FindStreamProxyInput) ([]*StreamProxy, int64, error) {
items := make([]*StreamProxy, 0)
total, err := c.store.StreamProxy().Find(ctx, &items, in)
if err != nil {
return nil, 0, web.ErrDB.Withf(`Find err[%s]`, err.Error())
}
return items, total, nil
}
// GetStreamProxy Query a single object
func (c *Core) GetStreamProxy(ctx context.Context, id string) (*StreamProxy, error) {
var out StreamProxy
if err := c.store.StreamProxy().Get(ctx, &out, orm.Where("id=?", id)); err != nil {
if orm.IsErrRecordNotFound(err) {
return nil, web.ErrNotFound.Withf(`Get err[%s]`, err.Error())
}
return nil, web.ErrDB.Withf(`Get err[%s]`, err.Error())
}
return &out, nil
}
// AddStreamProxy Insert into database
func (c *Core) AddStreamProxy(ctx context.Context, in *AddStreamProxyInput) (*StreamProxy, error) {
var out StreamProxy
if err := copier.Copy(&out, in); err != nil {
slog.Error("Copy", "err", err)
}
out.ID = c.uniqueID.UniqueID(bz.IDPrefixRTSP)
if err := c.store.StreamProxy().Add(ctx, &out); err != nil {
if orm.IsDuplicatedKey(err) {
return nil, web.ErrDB.Msg("stream 重复,请勿重复添加")
}
return nil, web.ErrDB.Withf(`Add err[%s]`, err.Error())
}
return &out, nil
}
// EditStreamProxy Update object information
func (c *Core) EditStreamProxy(ctx context.Context, in *EditStreamProxyInput, id string) (*StreamProxy, error) {
var out StreamProxy
if err := c.store.StreamProxy().Edit(ctx, &out, func(b *StreamProxy) {
if err := copier.Copy(b, in); err != nil {
slog.Error("Copy", "err", err)
}
}, orm.Where("id=?", id)); err != nil {
return nil, web.ErrDB.Withf(`Edit err[%s]`, err.Error())
}
return &out, nil
}
// DelStreamProxy Delete object
func (c *Core) DelStreamProxy(ctx context.Context, id string) (*StreamProxy, error) {
var out StreamProxy
if err := c.store.StreamProxy().Del(ctx, &out, orm.Where("id=?", id)); err != nil {
return nil, web.ErrDB.Withf(`Del err[%s]`, err.Error())
}
return &out, nil
}

View File

@@ -0,0 +1,28 @@
// Code generated by gowebx, DO AVOID EDIT.
package proxy
import "github.com/ixugo/goweb/pkg/orm"
// StreamProxy domain model
type StreamProxy struct {
ID string `gorm:"primaryKey" json:"id"`
CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间
App string `gorm:"column:app;uniqueIndex:idx_stream_proxys_app_stream;notNull;default:'';comment:应用名" json:"app"` // 应用名
Stream string `gorm:"column:stream;uniqueIndex:idx_stream_proxys_app_stream;notNull;default:'';comment:流 id" json:"stream"` // 流 id
MediaServerID string `gorm:"column:media_server_id;notNull;default:'';comment:媒体服务器 id" json:"media_server_id"` // 媒体服务器 id
SourceURL string `gorm:"column:source_url;notNull;default:'';comment:原始 url" json:"source_url"` // 原始 url
TimeoutS int `gorm:"column:timeout_s;notNull;default:0;comment:超时时间(秒)" json:"timeout_s"` // 超时时间(秒)
Transport int `gorm:"column:transport;notNull;default:0;comment:rtsp 拉流方式(0:udp;1:tcp)" json:"transport"` // rtsp 拉流方式(0:udp;1:tcp)
Enabled bool `gorm:"column:enabled;notNull;default:FALSE;comment:是否启用" json:"enabled"` // 是否启用
EnabledAudio bool `gorm:"column:enabled_audio;notNull;default:FALSE;comment:是否启用音频" json:"enabled_audio"` // 是否启用音频
EnabledRemoveNoneReader bool `gorm:"column:enabled_remove_none_reader;notNull;default:FALSE;comment:是否无人观看时删除" json:"enabled_remove_none_reader"` // 是否无人观看时删除
EnabledDisabledNoneReader bool `gorm:"column:enabled_disabled_none_reader;notNull;default:FALSE;comment:是否无人观看时禁用" json:"enabled_disabled_none_reader"` // 是否无人观看时禁用
StreamKey string `gorm:"column:stream_key;notNull;default:'';comment:拉流代理时 zlm 返回的 key用于停止拉流代理" json:"stream_key"` // 拉流代理时 zlm 返回的 key用于停止拉流代理
Pulling bool `gorm:"column:pulling;notNull;default:FALSE;comment:拉流状态" json:"pulling"` // 拉流状态
}
// TableName database table name
func (*StreamProxy) TableName() string {
return "stream_proxys"
}

View File

@@ -0,0 +1,50 @@
// Code generated by gowebx, DO AVOID EDIT.
package proxy
import "github.com/ixugo/goweb/pkg/web"
type FindStreamProxyInput struct {
web.PagerFilter
App string `form:"app"` // 应用名
Stream string `form:"stream"` // 流 id
MediaServerID string `form:"media_server_id"` // 媒体服务器 id
SourceURL string `form:"source_url"` // 原始 url
TimeoutS int `form:"timeout_s"` // 超时时间(秒)
Transport int `form:"transport"` // rtsp 拉流方式(0:udp;1:tcp)
Enabled bool `form:"enabled"` // 是否启用
EnabledAudio bool `form:"enabled_audio"` // 是否启用音频
EnabledRemoveNoneReader bool `form:"enabled_remove_none_reader"` // 是否无人观看时删除
EnabledDisabledNoneReader bool `form:"enabled_disabled_none_reader"` // 是否无人观看时禁用
StreamKey string `form:"stream_key"` // 拉流代理时 zlm 返回的 key用于停止拉流代理
Pulling bool `form:"pulling"` // 拉流状态
}
type EditStreamProxyInput struct {
App string `json:"app"` // 应用名
Stream string `json:"stream"` // 流 id
MediaServerID string `json:"media_server_id"` // 媒体服务器 id
SourceURL string `json:"source_url"` // 原始 url
TimeoutS int `json:"timeout_s"` // 超时时间(秒)
Transport int `json:"transport"` // rtsp 拉流方式(0:udp;1:tcp)
Enabled bool `json:"enabled"` // 是否启用
EnabledAudio bool `json:"enabled_audio"` // 是否启用音频
EnabledRemoveNoneReader bool `json:"enabled_remove_none_reader"` // 是否无人观看时删除
EnabledDisabledNoneReader bool `json:"enabled_disabled_none_reader"` // 是否无人观看时禁用
StreamKey string `json:"stream_key"` // 拉流代理时 zlm 返回的 key用于停止拉流代理
Pulling bool `json:"pulling"` // 拉流状态
}
type AddStreamProxyInput struct {
App string `json:"app"` // 应用名
Stream string `json:"stream"` // 流 id
MediaServerID string `json:"media_server_id"` // 媒体服务器 id
SourceURL string `json:"source_url"` // 原始 url
TimeoutS int `json:"timeout_s"` // 超时时间(秒)
Transport int `json:"transport"` // rtsp 拉流方式(0:udp;1:tcp)
Enabled bool `json:"enabled"` // 是否启用
EnabledAudio bool `json:"enabled_audio"` // 是否启用音频
EnabledRemoveNoneReader bool `json:"enabled_remove_none_reader"` // 是否无人观看时删除
EnabledDisabledNoneReader bool `json:"enabled_disabled_none_reader"` // 是否无人观看时禁用
StreamKey string `json:"stream_key"` // 拉流代理时 zlm 返回的 key用于停止拉流代理
Pulling bool `json:"pulling"` // 拉流状态
}

View File

@@ -22,6 +22,7 @@ import (
var startRuntime = time.Now() var startRuntime = time.Now()
func setupRouter(r *gin.Engine, uc *Usecase) { func setupRouter(r *gin.Engine, uc *Usecase) {
uc.GB28181API.uc = uc
go stat.LoadTop(system.Getwd(), func(m map[string]any) { go stat.LoadTop(system.Getwd(), func(m map[string]any) {
_ = m _ = m
}) })
@@ -66,9 +67,8 @@ func setupRouter(r *gin.Engine, uc *Usecase) {
registerZLMWebhookAPI(r, uc.WebHookAPI) registerZLMWebhookAPI(r, uc.WebHookAPI)
// TODO: 待增加鉴权 // TODO: 待增加鉴权
registerMediaAPI(r, uc.MediaAPI) registerMediaAPI(r, uc.MediaAPI)
uc.GB28181API.uc = uc
registerGB28181(r, uc.GB28181API) registerGB28181(r, uc.GB28181API)
registerProxy(r, uc.ProxyAPI)
} }
type playOutput struct { type playOutput struct {

View File

@@ -2,6 +2,6 @@ package api
// 如果需要执行表迁移,递增此版本号和表更新说明 // 如果需要执行表迁移,递增此版本号和表更新说明
var ( var (
dbVersion = "0.0.7" dbVersion = "0.0.8"
dbRemark = "add devices" dbRemark = "add stream proxy"
) )

View File

@@ -34,6 +34,7 @@ var (
gbs.NewServer, gbs.NewServer,
NewGb28181API, NewGb28181API,
NewGB28181, NewGB28181,
NewProxyAPI,
) )
) )
@@ -46,6 +47,7 @@ type Usecase struct {
UniqueID uniqueid.Core UniqueID uniqueid.Core
MediaAPI MediaAPI MediaAPI MediaAPI
GB28181API GB28181API GB28181API GB28181API
ProxyAPI ProxyAPI
SipServer *gbs.Server SipServer *gbs.Server
} }

58
internal/web/api/proxy.go Executable file
View File

@@ -0,0 +1,58 @@
// Code generated by gowebx, DO AVOID EDIT.
package api
import (
"github.com/gin-gonic/gin"
"github.com/gowvp/gb28181/internal/core/proxy"
"github.com/gowvp/gb28181/internal/core/proxy/store/proxydb"
"github.com/gowvp/gb28181/internal/core/uniqueid"
"github.com/ixugo/goweb/pkg/orm"
"github.com/ixugo/goweb/pkg/web"
"gorm.io/gorm"
)
type ProxyAPI struct {
proxyCore *proxy.Core
}
func NewProxyAPI(db *gorm.DB, uni uniqueid.Core) ProxyAPI {
core := proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate), uni)
return ProxyAPI{proxyCore: core}
}
func registerProxy(g gin.IRouter, api ProxyAPI, handler ...gin.HandlerFunc) {
{
group := g.Group("/stream_proxys", handler...)
group.GET("", web.WarpH(api.findStreamProxy))
group.GET("/:id", web.WarpH(api.getStreamProxy))
group.PUT("/:id", web.WarpH(api.editStreamProxy))
group.POST("", web.WarpH(api.addStreamProxy))
group.DELETE("/:id", web.WarpH(api.delStreamProxy))
}
}
// >>> streamProxy >>>>>>>>>>>>>>>>>>>>
func (a ProxyAPI) findStreamProxy(c *gin.Context, in *proxy.FindStreamProxyInput) (any, error) {
items, total, err := a.proxyCore.FindStreamProxy(c.Request.Context(), in)
return gin.H{"items": items, "total": total}, err
}
func (a ProxyAPI) getStreamProxy(c *gin.Context, _ *struct{}) (any, error) {
streamProxyID := c.Param("id")
return a.proxyCore.GetStreamProxy(c.Request.Context(), streamProxyID)
}
func (a ProxyAPI) editStreamProxy(c *gin.Context, in *proxy.EditStreamProxyInput) (any, error) {
streamProxyID := c.Param("id")
return a.proxyCore.EditStreamProxy(c.Request.Context(), in, streamProxyID)
}
func (a ProxyAPI) addStreamProxy(c *gin.Context, in *proxy.AddStreamProxyInput) (any, error) {
return a.proxyCore.AddStreamProxy(c.Request.Context(), in)
}
func (a ProxyAPI) delStreamProxy(c *gin.Context, _ *struct{}) (any, error) {
streamProxyID := c.Param("id")
return a.proxyCore.DelStreamProxy(c.Request.Context(), streamProxyID)
}

View File

@@ -23,7 +23,7 @@ type GB28181API struct {
catalog *sip.Collector[Channels] catalog *sip.Collector[Channels]
// TODO: 待替换成 redis // TODO: 待替换成 redis
streams conc.Map[string, *Streams] streams *conc.Map[string, *Streams]
svr *Server svr *Server
@@ -38,6 +38,7 @@ func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GB28181, sms *sms.NodeMana
catalog: sip.NewCollector[Channels](func(c1, c2 *Channels) bool { catalog: sip.NewCollector[Channels](func(c1, c2 *Channels) bool {
return c1.ChannelID == c2.ChannelID return c1.ChannelID == c2.ChannelID
}), }),
streams: &conc.Map[string, *Streams]{},
} }
go g.catalog.Start(func(s string, c []*Channels) { go g.catalog.Start(func(s string, c []*Channels) {
// 零值不做变更,没有通道又何必注册上来 // 零值不做变更,没有通道又何必注册上来

Binary file not shown.

Before

Width:  |  Height:  |  Size: 142 KiB