rtmo pull auth

This commit is contained in:
xugo
2025-01-24 01:50:42 +08:00
parent 78d8ce3ac8
commit eebb9f5bd1
10 changed files with 134 additions and 57 deletions

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/gin-gonic/gin v1.10.0
github.com/glebarez/sqlite v1.11.0
github.com/google/wire v0.6.0
github.com/ixugo/goweb v1.0.17
github.com/ixugo/goweb v1.0.18
github.com/jinzhu/copier v0.4.0
github.com/pelletier/go-toml/v2 v2.2.3
github.com/shirou/gopsutil v3.21.11+incompatible

4
go.sum
View File

@@ -50,8 +50,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.6.0 h1:HBkoIh4BdSxoyo9PveV8giw7ZsaBOvzWKfcg/6MrVwI=
github.com/google/wire v0.6.0/go.mod h1:F4QhpQ9EDIdJ1Mbop/NZBRB+5yrR6qg3BnctaoUk6NA=
github.com/ixugo/goweb v1.0.17 h1:AU2ESyH9qhu5zUcPT88mq789xb6xMxwtzriPdf9GFt0=
github.com/ixugo/goweb v1.0.17/go.mod h1:iBwaaazAtvEuuODjnoCR/5bssvTi49Eft6x8ULg/jsg=
github.com/ixugo/goweb v1.0.18 h1:UXRTxk6zYFEy6RkZ72Esf2uKjWK3flcrRcPkwnH6moM=
github.com/ixugo/goweb v1.0.18/go.mod h1:iBwaaazAtvEuuODjnoCR/5bssvTi49Eft6x8ULg/jsg=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=

View File

@@ -3,8 +3,10 @@ package media
import (
"context"
"fmt"
"log/slog"
"github.com/ixugo/goweb/pkg/hook"
"github.com/ixugo/goweb/pkg/orm"
"github.com/ixugo/goweb/pkg/web"
"github.com/jinzhu/copier"
@@ -68,7 +70,7 @@ func (c Core) AddStreamPush(ctx context.Context, in *AddStreamPushInput) (*Strea
if err := copier.Copy(&out, in); err != nil {
slog.Error("Copy", "err", err)
}
out.ID = RTMPIDPrefix + c.uniqueID.UniqueID()
out.ID = c.uniqueID.UniqueID(RTMPIDPrefix)
if err := c.store.StreamPush().Add(ctx, &out); err != nil {
if orm.IsDuplicatedKey(err) {
return nil, web.ErrDB.Msg("stream 重复,请勿重复添加")
@@ -103,18 +105,35 @@ func (c *Core) DelStreamPush(ctx context.Context, id string) (*StreamPush, error
return &out, nil
}
type PublishInput struct {
App string
Stream string
MediaServerID string
Sign string
Secret string
Session string
}
// Publish 由于 hook 的函数,无需 web.error 封装
func (c *Core) Publish(ctx context.Context, app, stream, mediaServerID string) error {
result, err := c.GetStreamPushByAppStream(ctx, app, stream)
func (c *Core) Publish(ctx context.Context, in PublishInput) error {
result, err := c.GetStreamPushByAppStream(ctx, in.App, in.Stream)
if err != nil {
return err
}
if !result.IsAuthDisabled {
if s := hook.MD5(in.Session + in.Secret); s != in.Sign {
slog.Info("推流鉴权失败", "got", in.Sign, "expect", s)
return fmt.Errorf("rtmp secret error, got[%s]", in.Sign)
}
}
var s StreamPush
return c.store.StreamPush().Edit(ctx, &s, func(b *StreamPush) {
b.MediaServerID = mediaServerID
b.MediaServerID = in.MediaServerID
b.Status = StatusPushing
now := orm.Now()
b.PushedAt = &now
b.Session = in.Session
}, orm.Where("id=?", result.ID))
}
@@ -124,5 +143,27 @@ func (c *Core) UnPublish(ctx context.Context, app, stream string) error {
b.Status = StatusStopped
now := orm.Now()
b.StoppedAt = &now
b.Session = ""
}, orm.Where("app = ? AND stream=?", app, stream))
}
type OnPlayInput struct {
App string
Stream string
Session string
}
func (c *Core) OnPlay(ctx context.Context, in OnPlayInput) error {
result, err := c.GetStreamPushByAppStream(ctx, in.App, in.Stream)
if err != nil {
return err
}
if result.IsAuthDisabled {
return nil
}
if in.Session != result.Session {
slog.Info("拉流鉴权失败", "got", in.Session, "expect", result.Session)
return fmt.Errorf("session error, got[%s]", in.Session)
}
return nil
}

View File

@@ -17,7 +17,7 @@ type StreamPush struct {
ID string `gorm:"primaryKey" json:"id"`
CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间
Name string `gorm:"column:name;notNull;default:'';comment:推流名称" json:"name"`
Name string `gorm:"column:name;notNull;default:'';comment:推流名称" json:"name"` // 推流名称
PushedAt *orm.Time `gorm:"column:pushed_at;notNull;default:'1970-01-01 00:00:00';comment:最后一次推流时间" json:"pushed_at"` // 最后一次推流时间
StoppedAt *orm.Time `gorm:"column:stopped_at;notNull;default:'1970-01-01 00:00:00';comment:最后一次停止时间" json:"stopped_at"` // 最后一次停止时间
App string `gorm:"column:app;notNull;default:'';uniqueIndex:idx_stream_pushs_app_stream;comment:应用名" json:"app"` // 应用名
@@ -25,6 +25,10 @@ type StreamPush struct {
MediaServerID string `gorm:"column:media_server_id;notNull;default:'';comment:媒体服务器 ID" json:"media_server_id"` // 媒体服务器 ID
ServerID string `gorm:"column:server_id;notNull;default:'';comment:服务器 ID" json:"server_id"` // 服务器 ID
Status string `gorm:"column:status;notNull;default:'';comment:推流状态(PUSHING)" json:"status"` // 推流状态(PUSHING)
IsAuthDisabled bool `gorm:"column:is_auth_disabled;notNull;default:false;comment:是否启用推流鉴权" json:"is_auth_disabled"` // 是否启用推流鉴权
// 自定义拉流鉴权参数IsAuthDisabled=false 时生效
Session string `gorm:"column:session;notNull;default:'';comment:session" json:"-"`
}
// TableName database table name

View File

@@ -2,7 +2,6 @@
package media
import (
"github.com/ixugo/goweb/pkg/orm"
"github.com/ixugo/goweb/pkg/web"
)
@@ -21,6 +20,7 @@ type FindStreamPushInput struct {
type EditStreamPushInput struct {
App string `json:"app"` // 应用名
Stream string `json:"stream"` // 流 ID
IsAuthDisabled bool `json:"is_auth_disabled"` // 是否禁用推流鉴权
// MediaServerID string `json:"media_server_id"` // 媒体服务器 ID
// ServerID string `json:"server_id"` // 服务器 ID
// Status string `json:"status"` // 推流状态(PUSHING)
@@ -28,13 +28,13 @@ type EditStreamPushInput struct {
type AddStreamPushInput struct {
Name string `json:"name"` // 推流名称
App string `json:"app"` // 应用名
PushedAt *orm.Time `json:"pushed_at"` // 最后一次推流时间
StoppedAt *orm.Time `json:"stopped_at"` // 最后一次停止时间
Stream string `json:"stream"` // 流 ID
MediaServerID string `json:"media_server_id"` // 媒体服务器 ID
ServerID string `json:"server_id"` // 服务器 ID
Status string `json:"status"` // 推流状态(PUSHING)
App string `json:"app,required"` // 应用名
Stream string `json:"stream,required"` // 流 ID
IsAuthDisabled bool `json:"is_auth_disabled"` // 是否禁用推流鉴权
// MediaServerID string `json:"media_server_id"` // 媒体服务器 ID
// ServerID string `json:"server_id"` // 服务器 ID
// Status string `json:"status"` // 推流状态(PUSHING)
}
type FindStreamPushOutputItem struct {

View File

@@ -10,6 +10,9 @@ import (
"crypto/rand"
"log/slog"
"math/big"
"time"
"github.com/ixugo/goweb/pkg/hook"
)
type IDManager struct {
@@ -25,10 +28,16 @@ func NewIDManager(store UniqueIDStorer, length int) *IDManager {
}
}
func (m *IDManager) UniqueID() string {
// UniqueID 获取唯一 id
func (m *IDManager) UniqueID(prefix string) string {
cost := hook.UseTiming(time.Second)
defer cost()
// 如果在最低长度中,碰撞比较频繁,增加 1 位长度再试一次
for i := range 10 {
// 生成自定义长度随机数,通过数据库主键来防止碰撞,碰撞后再次尝试
for range 66 {
id := GenerateRandomString(m.length + i)
id := prefix + GenerateRandomString(m.length+i)
if err := m.store.Add(context.Background(), &UniqueID{ID: id}); err != nil {
slog.Error("UniqueID", "err", err)
continue
@@ -41,6 +50,7 @@ func (m *IDManager) UniqueID() string {
}
// GenerateRandomString 生成随机字符串
// 采用全小写+数字,有识别需求,可以删除 o/0i/l 之一
func GenerateRandomString(length int) string {
const letterBytes = "abcdefghijklmnopqrstuvwxyz1234567890"
lettersLength := big.NewInt(int64(len(letterBytes)))

View File

@@ -20,6 +20,7 @@ func NewCore(store Storer, length int) Core {
}
}
func (c Core) UniqueID() string {
return c.m.UniqueID()
// UniqueID 获取全局唯一 ID
func (c Core) UniqueID(prefix string) string {
return c.m.UniqueID(prefix)
}

View File

@@ -2,6 +2,6 @@ package api
// 如果需要执行表迁移,递增此版本号和表更新说明
var (
dbVersion = "0.0.4"
dbVersion = "0.0.6"
dbRemark = "add uniqueID"
)

View File

@@ -37,7 +37,7 @@ func registerMediaAPI(g gin.IRouter, api MediaAPI, handler ...gin.HandlerFunc) {
// >>> streamPush >>>>>>>>>>>>>>>>>>>>
func (a MediaAPI) findStreamPush(c *gin.Context, in *media.FindStreamPushInput) (any, error) {
func (a MediaAPI) findStreamPush(c *gin.Context, in *media.FindStreamPushInput) (*web.PageOutput, error) {
items, total, err := a.mediaCore.FindStreamPush(c.Request.Context(), in)
if err != nil {
return nil, err
@@ -58,10 +58,12 @@ func (a MediaAPI) findStreamPush(c *gin.Context, in *media.FindStreamPushInput)
if mediaID == "" {
mediaID = sms.DefaultMediaServerID
}
if svr, _ := cacheFn(mediaID); svr != nil {
rtmpAddrs[0] = fmt.Sprintf("rtmp://%s:%d/%s/%s?sign=%s",
web.GetHost(c.Request), svr.Ports.RTMP, item.App, item.Stream, a.conf.Server.RTMPSecret,
)
if svr, _, _ := cacheFn(mediaID); svr != nil {
addr := fmt.Sprintf("rtmp://%s:%d/%s/%s", web.GetHost(c.Request), svr.Ports.RTMP, item.App, item.Stream)
if !item.IsAuthDisabled {
addr += fmt.Sprintf("?sign=%s", hook.MD5(a.conf.Server.RTMPSecret))
}
rtmpAddrs[0] = addr
}
out[i] = &media.FindStreamPushOutputItem{
@@ -69,25 +71,24 @@ func (a MediaAPI) findStreamPush(c *gin.Context, in *media.FindStreamPushInput)
PushAddrs: rtmpAddrs,
}
}
return gin.H{"items": out, "total": total}, err
return &web.PageOutput{Items: out, Total: total}, err
}
func (a MediaAPI) getStreamPush(c *gin.Context, _ *struct{}) (any, error) {
func (a MediaAPI) getStreamPush(c *gin.Context, _ *struct{}) (*media.StreamPush, error) {
streamPushID := c.Param("id")
return a.mediaCore.GetStreamPush(c.Request.Context(), streamPushID)
}
func (a MediaAPI) editStreamPush(c *gin.Context, in *media.EditStreamPushInput) (any, error) {
func (a MediaAPI) editStreamPush(c *gin.Context, in *media.EditStreamPushInput) (*media.StreamPush, error) {
streamPushID := c.Param("id")
return a.mediaCore.EditStreamPush(c.Request.Context(), in, streamPushID)
}
func (a MediaAPI) addStreamPush(c *gin.Context, in *media.AddStreamPushInput) (any, error) {
func (a MediaAPI) addStreamPush(c *gin.Context, in *media.AddStreamPushInput) (*media.StreamPush, error) {
return a.mediaCore.AddStreamPush(c.Request.Context(), in)
}
func (a MediaAPI) delStreamPush(c *gin.Context, _ *struct{}) (any, error) {
func (a MediaAPI) delStreamPush(c *gin.Context, _ *struct{}) (*media.StreamPush, error) {
streamPushID := c.Param("id")
return a.mediaCore.DelStreamPush(c.Request.Context(), streamPushID)
}

View File

@@ -1,7 +1,6 @@
package api
import (
"fmt"
"log/slog"
"net/url"
@@ -19,7 +18,11 @@ type WebHookAPI struct {
}
func NewWebHookAPI(core sms.Core, mediaCore media.Core, conf *conf.Bootstrap) WebHookAPI {
return WebHookAPI{smsCore: core, mediaCore: mediaCore, conf: conf}
return WebHookAPI{
smsCore: core,
mediaCore: mediaCore,
conf: conf,
}
}
func registerZLMWebhookAPI(r gin.IRouter, api WebHookAPI, handler ...gin.HandlerFunc) {
@@ -47,18 +50,18 @@ func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOut
if err != nil {
return &onPublishOutput{DefaultOutput: DefaultOutput{Code: 1, Msg: err.Error()}}, nil
}
if sign := params.Get("sign"); sign != w.conf.Server.RTMPSecret {
return &onPublishOutput{DefaultOutput: DefaultOutput{Code: 1, Msg: fmt.Sprintf("rtmp secret 错误, got[%s] expect[%s]", sign, w.conf.Server.RTMPSecret)}}, nil
}
if err := w.mediaCore.Publish(c.Request.Context(), in.App, in.Stream, in.MediaServerID); err != nil {
sign := params.Get("sign")
if err := w.mediaCore.Publish(c.Request.Context(), media.PublishInput{
App: in.App,
Stream: in.Stream,
MediaServerID: in.MediaServerID,
Sign: sign,
Secret: w.conf.Server.RTMPSecret,
Session: params.Get("session"),
}); err != nil {
return &onPublishOutput{DefaultOutput: DefaultOutput{Code: 1, Msg: err.Error()}}, nil
}
}
// TODO: 待完善,鉴权推流
// TODO: 待重构,封装 publish 接口
return &onPublishOutput{
DefaultOutput: newDefaultOutputOK(),
}, nil
@@ -83,6 +86,23 @@ func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (D
// 播放流时会触发此事件。如果流不存在,则首先触发 on_play 事件,然后触发 on_stream_not_found 事件。
// 播放rtsp流时如果该流开启了rtsp专用认证on_rtsp_realm则不会触发on_play事件。
// https://docs.zlmediakit.com/guide/media_server/web_hook_api.html#_6-on-play
func (w WebHookAPI) onPlay(_ *gin.Context, in *onPublishInput) (DefaultOutput, error) {
func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, error) {
switch in.Schema {
case "rtmp":
params, err := url.ParseQuery(in.Params)
if err != nil {
return DefaultOutput{Code: 1, Msg: err.Error()}, nil
}
session := params.Get("session")
if err := w.mediaCore.OnPlay(c.Request.Context(), media.OnPlayInput{
App: in.App,
Stream: in.Stream,
Session: session,
}); err != nil {
return DefaultOutput{Code: 1, Msg: err.Error()}, nil
}
case "rtsp":
}
return newDefaultOutputOK(), nil
}