diff --git a/docs/init/rtsp2rtmp-postgresql.sql b/docs/init/rtsp2rtmp-postgresql.sql index d480826..9ceb328 100644 --- a/docs/init/rtsp2rtmp-postgresql.sql +++ b/docs/init/rtsp2rtmp-postgresql.sql @@ -43,7 +43,7 @@ CREATE TABLE public.camera_share ( id varchar NOT NULL, camera_id varchar NULL, -- 摄像头标识 auth_code varchar NULL, -- 播放权限码 - enabled varchar NULL, -- 启用状态:1.启用;0.禁用; + enabled int2 NULL, -- 启用状态:1.启用;0.禁用; created timestamp(0) NULL, -- 创建时间 deadline timestamp(0) NULL, -- 截止日期 "name" varchar NULL, -- 分享说明 diff --git a/main.go b/main.go index 0471de0..e727df8 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/ffmpegmanager" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver" - // "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web" _ "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dao/register" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/task" @@ -17,13 +17,22 @@ import ( // "net/http" // _ "net/http/pprof" + "github.com/beego/beego/v2/core/config" "github.com/beego/beego/v2/core/logs" ) func main() { - // rtspclientmanager.GetSingleRtspClientManager().StartClient() - rtmpserver.GetSingleRtmpServer().StartRtmpServer() - ffmpegmanager.GetSingleFFmpegManager().StartClient() + fgUseFfmpeg, err := config.Bool("server.use-ffmpeg") + if err != nil { + logs.Error("get use-ffmpeg fail : %v", err) + fgUseFfmpeg = false + } + if fgUseFfmpeg { + rtmpserver.GetSingleRtmpServer().StartRtmpServer() + ffmpegmanager.GetSingleFFmpegManager().StartClient() + } else { + rtspclientmanager.GetSingleRtspClientManager().StartClient() + } task.GetSingleTask().StartTask() web.GetSingleWeb().StartWeb() sigs := make(chan os.Signal, 1) diff --git a/resources/conf/conf-dev.yml b/resources/conf/conf-dev.yml index bc165e3..9a6c3b7 100644 --- a/resources/conf/conf-dev.yml +++ b/resources/conf/conf-dev.yml @@ -1,11 +1,15 @@ server: + use-ffmpeg: false + security: true user: name: admin password: admin + rtmp: + port: 1936 http: port: 8080 static: - path: D:\development\project\rtsp2rtmp-web\build + path: ./resources/static fileflv: path: ./resources/output/live log: diff --git a/resources/conf/conf-prod.yml b/resources/conf/conf-prod.yml index 135aba8..9a6c3b7 100644 --- a/resources/conf/conf-prod.yml +++ b/resources/conf/conf-prod.yml @@ -1,5 +1,6 @@ server: - security: false + use-ffmpeg: false + security: true user: name: admin password: admin @@ -18,5 +19,5 @@ server: driver-type: 4 driver: postgres url: user=postgres password=123456 dbname=rtsp2rtmp host=localhost port=5432 sslmode=disable TimeZone=UTC - show-sql: true + show-sql: false \ No newline at end of file diff --git a/src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go b/src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go index 5af3a8c..ce56f23 100644 --- a/src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go +++ b/src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go @@ -114,15 +114,9 @@ func (s *FFmpegManager) startConnections() { logs.Error("rtspManager panic %v", r) } }() - condition := common.GetEmptyCondition() - es, err := base_service.CameraFindCollectionByCondition(condition) - if err != nil { - logs.Error("camera list query error: %s", err) - return - } for { condition := common.GetEmptyCondition() - es, err = base_service.CameraFindCollectionByCondition(condition) + es, err := base_service.CameraFindCollectionByCondition(condition) if err != nil { logs.Error("camera list query error: %s", err) return diff --git a/src/rtsp2rtmp/rtmpserver/rtmpserver.go b/src/rtsp2rtmp/rtmpserver/rtmpserver.go index 8f2aa4e..7250c6c 100644 --- a/src/rtsp2rtmp/rtmpserver/rtmpserver.go +++ b/src/rtsp2rtmp/rtmpserver/rtmpserver.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/beego/beego/v2/core/config" "github.com/beego/beego/v2/core/logs" @@ -175,11 +176,33 @@ func (r *rtmpServer) handleRtmpConn(conn *rtmp.Conn) { done := make(chan int) //添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包 pktStream := make(chan av.Packet, 1024) + heartBeatChan := make(chan int) defer func() { close(done) close(pktStream) + close(heartBeatChan) }() + go func() { + defer func() { + if recover_rusult := recover(); recover_rusult != nil { + logs.Error("HandleConn error : %v", recover_rusult) + } + }() + ticker := time.NewTicker(10 * time.Second) + for { + select { + case _, ok := <-heartBeatChan: + if !ok { + return + } + ticker.Reset(10 * time.Second) + case <-ticker.C: + conn.Close() + return + } + } + }() p := rtmppublisher.NewPublisher(done, pktStream, code, codecs, r) r.rms.Store(camera.Code, p) for { @@ -188,6 +211,11 @@ func (r *rtmpServer) handleRtmpConn(conn *rtmp.Conn) { logs.Error("ReadPacket error : %v", err) break } + select { + case heartBeatChan <- 1: + default: + } + select { case pktStream <- pkt: default: diff --git a/src/rtsp2rtmp/rtspclientmanager/rtspclientmanager.go b/src/rtsp2rtmp/rtspclientmanager/rtspclientmanager.go index acfb812..6a25a3c 100644 --- a/src/rtsp2rtmp/rtspclientmanager/rtspclientmanager.go +++ b/src/rtsp2rtmp/rtspclientmanager/rtspclientmanager.go @@ -12,7 +12,7 @@ import ( "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" - // ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controllers/ext" + ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controllers/ext" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" ) @@ -33,7 +33,7 @@ func GetSingleRtspClientManager() *RtspClientManager { func (rs *RtspClientManager) StartClient() { go rs.startConnections() - // go rs.stopConn(ext_controller.CodeStream()) + go rs.stopConn(ext_controller.CodeStream()) } func (rc *RtspClientManager) ExistsPublisher(code string) bool { @@ -104,7 +104,7 @@ func (s *RtspClientManager) startConnections() { } go s.connRtsp(camera.Code) } - <-time.After(1 * time.Second) + <-time.After(5 * time.Second) } } diff --git a/src/rtsp2rtmp/web/dao/entity/camerashare.go b/src/rtsp2rtmp/web/dao/entity/camerashare.go index 17f6f9c..4da2ae9 100644 --- a/src/rtsp2rtmp/web/dao/entity/camerashare.go +++ b/src/rtsp2rtmp/web/dao/entity/camerashare.go @@ -13,7 +13,7 @@ type CameraShare struct { // 权限码: AuthCode string `orm:"column(auth_code)" json:"authCode"` // 启用状态: - Enabled bool `orm:"column(enabled)" json:"enabled"` + Enabled int `orm:"column(enabled)" json:"enabled"` // 创建时间: Created time.Time `orm:"column(created)" json:"created"` // 开始时间: diff --git a/src/rtsp2rtmp/web/dto/po/base/camera_share/camere_share_po.go b/src/rtsp2rtmp/web/dto/po/base/camera_share/camere_share_po.go index ec36334..88cac45 100644 --- a/src/rtsp2rtmp/web/dto/po/base/camera_share/camere_share_po.go +++ b/src/rtsp2rtmp/web/dto/po/base/camera_share/camere_share_po.go @@ -11,7 +11,7 @@ type CameraSharePO struct { // 权限码: AuthCode string `json:"authCode"` // 启用状态: - Enabled bool `json:"enabled"` + Enabled int `json:"enabled"` // 创建时间: Created time.Time `json:"created"` // 开始时间: diff --git a/src/rtsp2rtmp/web/dto/vo/base/camera/camere_vo.go b/src/rtsp2rtmp/web/dto/vo/base/camera/camere_vo.go index ef5c448..9a6e2d2 100644 --- a/src/rtsp2rtmp/web/dto/vo/base/camera/camere_vo.go +++ b/src/rtsp2rtmp/web/dto/vo/base/camera/camere_vo.go @@ -39,7 +39,7 @@ type CameraShareVO struct { // 权限码: AuthCode string `json:"authCode"` // 启用状态: - Enabled bool `json:"enabled"` + Enabled int `json:"enabled"` // 创建时间: Created time.Time `json:"created"` // 开始时间: diff --git a/src/rtsp2rtmp/web/dto/vo/base/camera_share/camere_share_vo.go b/src/rtsp2rtmp/web/dto/vo/base/camera_share/camere_share_vo.go index 6556154..8f1e434 100644 --- a/src/rtsp2rtmp/web/dto/vo/base/camera_share/camere_share_vo.go +++ b/src/rtsp2rtmp/web/dto/vo/base/camera_share/camere_share_vo.go @@ -11,7 +11,7 @@ type CameraShareVO struct { // 权限码: AuthCode string `json:"authCode"` // 启用状态: - Enabled bool `json:"enabled"` + Enabled int `json:"enabled"` // 创建时间: Created time.Time `json:"created"` // 开始时间: diff --git a/src/rtsp2rtmp/web/dyn_query/dq_mysql_generator.go b/src/rtsp2rtmp/web/dyn_query/dq_mysql_generator.go index 3d9db4d..ae63225 100644 --- a/src/rtsp2rtmp/web/dyn_query/dq_mysql_generator.go +++ b/src/rtsp2rtmp/web/dyn_query/dq_mysql_generator.go @@ -204,10 +204,28 @@ func (dynQuery *DynQueryMysql) makeConditionsToken(conditionExpr ConditionExpres params = append(params, fmt.Sprintf("%s", simpleExpr.Values[0])+"%") } else if simpleExpr.ExprType == Equal { simpleExprTokens = append(simpleExprTokens, "=", "?") - params = append(params, simpleExpr.Values[0]) + vBool, ok := simpleExpr.Values[0].(bool) + if ok { + param := 0 + if vBool { + param = 1 + } + params = append(params, param) + } else { + params = append(params, simpleExpr.Values[0]) + } } else if simpleExpr.ExprType == NotEqual { simpleExprTokens = append(simpleExprTokens, "!=", "?") - params = append(params, simpleExpr.Values[0]) + vBool, ok := simpleExpr.Values[0].(bool) + if ok { + param := 0 + if vBool { + param = 1 + } + params = append(params, param) + } else { + params = append(params, simpleExpr.Values[0]) + } } else if simpleExpr.ExprType == GT { simpleExprTokens = append(simpleExprTokens, ">", "?") params = append(params, simpleExpr.Values[0]) diff --git a/src/rtsp2rtmp/web/dyn_query/dq_postgres_generator.go b/src/rtsp2rtmp/web/dyn_query/dq_postgres_generator.go index 351aac2..db6194d 100644 --- a/src/rtsp2rtmp/web/dyn_query/dq_postgres_generator.go +++ b/src/rtsp2rtmp/web/dyn_query/dq_postgres_generator.go @@ -203,10 +203,28 @@ func (dynQuery *DynQueryPostgres) makeConditionsToken(conditionExpr ConditionExp params = append(params, fmt.Sprintf("%s", simpleExpr.Values[0])+"%") } else if simpleExpr.ExprType == Equal { simpleExprTokens = append(simpleExprTokens, "=", "?") - params = append(params, simpleExpr.Values[0]) + vBool, ok := simpleExpr.Values[0].(bool) + if ok { + param := 0 + if vBool { + param = 1 + } + params = append(params, param) + } else { + params = append(params, simpleExpr.Values[0]) + } } else if simpleExpr.ExprType == NotEqual { simpleExprTokens = append(simpleExprTokens, "!=", "?") - params = append(params, simpleExpr.Values[0]) + vBool, ok := simpleExpr.Values[0].(bool) + if ok { + param := 0 + if vBool { + param = 1 + } + params = append(params, param) + } else { + params = append(params, simpleExpr.Values[0]) + } } else if simpleExpr.ExprType == GT { simpleExprTokens = append(simpleExprTokens, ">", "?") params = append(params, simpleExpr.Values[0]) diff --git a/src/rtsp2rtmp/web/task/task.go b/src/rtsp2rtmp/web/task/task.go index 8109a54..99905d2 100644 --- a/src/rtsp2rtmp/web/task/task.go +++ b/src/rtsp2rtmp/web/task/task.go @@ -4,8 +4,10 @@ import ( "runtime/debug" "time" + "github.com/beego/beego/v2/core/config" "github.com/beego/beego/v2/core/logs" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" @@ -47,6 +49,11 @@ func (t *task) offlineCamera() { logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) } }() + fgUseFfmpeg, err := config.Bool("server.use-ffmpeg") + if err != nil { + logs.Error("get use-ffmpeg fail : %v", err) + fgUseFfmpeg = false + } for { condition := common.GetEmptyCondition() css, err := base_service.CameraFindCollectionByCondition(condition) @@ -57,9 +64,16 @@ func (t *task) offlineCamera() { if cs.OnlineStatus != 1 { continue } - if exists := rtmpserver.GetSingleRtmpServer().ExistsPublisher(cs.Code); !exists { - cs.OnlineStatus = 0 - base_service.CameraUpdateById(cs) + if fgUseFfmpeg { + if exists := rtmpserver.GetSingleRtmpServer().ExistsPublisher(cs.Code); !exists { + cs.OnlineStatus = 0 + base_service.CameraUpdateById(cs) + } + } else { + if exists := rtspclientmanager.GetSingleRtspClientManager().ExistsPublisher(cs.Code); !exists { + cs.OnlineStatus = 0 + base_service.CameraUpdateById(cs) + } } } <-time.After(10 * time.Minute)