支持使用ffmpeg状态切换

This commit is contained in:
madao
2024-12-18 16:51:13 +08:00
parent 549ec6c2aa
commit 4dbd2e08f9
14 changed files with 115 additions and 29 deletions

View File

@@ -43,7 +43,7 @@ CREATE TABLE public.camera_share (
id varchar NOT NULL, id varchar NOT NULL,
camera_id varchar NULL, -- 摄像头标识 camera_id varchar NULL, -- 摄像头标识
auth_code varchar NULL, -- 播放权限码 auth_code varchar NULL, -- 播放权限码
enabled varchar NULL, -- 启用状态1.启用0.禁用; enabled int2 NULL, -- 启用状态1.启用0.禁用;
created timestamp(0) NULL, -- 创建时间 created timestamp(0) NULL, -- 创建时间
deadline timestamp(0) NULL, -- 截止日期 deadline timestamp(0) NULL, -- 截止日期
"name" varchar NULL, -- 分享说明 "name" varchar NULL, -- 分享说明

17
main.go
View File

@@ -9,7 +9,7 @@ import (
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/ffmpegmanager" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/ffmpegmanager"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver"
// "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web"
_ "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dao/register" _ "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dao/register"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/task" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/task"
@@ -17,13 +17,22 @@ import (
// "net/http" // "net/http"
// _ "net/http/pprof" // _ "net/http/pprof"
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs" "github.com/beego/beego/v2/core/logs"
) )
func main() { func main() {
// rtspclientmanager.GetSingleRtspClientManager().StartClient() fgUseFfmpeg, err := config.Bool("server.use-ffmpeg")
rtmpserver.GetSingleRtmpServer().StartRtmpServer() if err != nil {
ffmpegmanager.GetSingleFFmpegManager().StartClient() logs.Error("get use-ffmpeg fail : %v", err)
fgUseFfmpeg = false
}
if fgUseFfmpeg {
rtmpserver.GetSingleRtmpServer().StartRtmpServer()
ffmpegmanager.GetSingleFFmpegManager().StartClient()
} else {
rtspclientmanager.GetSingleRtspClientManager().StartClient()
}
task.GetSingleTask().StartTask() task.GetSingleTask().StartTask()
web.GetSingleWeb().StartWeb() web.GetSingleWeb().StartWeb()
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)

View File

@@ -1,11 +1,15 @@
server: server:
use-ffmpeg: false
security: true
user: user:
name: admin name: admin
password: admin password: admin
rtmp:
port: 1936
http: http:
port: 8080 port: 8080
static: static:
path: D:\development\project\rtsp2rtmp-web\build path: ./resources/static
fileflv: fileflv:
path: ./resources/output/live path: ./resources/output/live
log: log:

View File

@@ -1,5 +1,6 @@
server: server:
security: false use-ffmpeg: false
security: true
user: user:
name: admin name: admin
password: admin password: admin
@@ -18,5 +19,5 @@ server:
driver-type: 4 driver-type: 4
driver: postgres driver: postgres
url: user=postgres password=123456 dbname=rtsp2rtmp host=localhost port=5432 sslmode=disable TimeZone=UTC url: user=postgres password=123456 dbname=rtsp2rtmp host=localhost port=5432 sslmode=disable TimeZone=UTC
show-sql: true show-sql: false

View File

@@ -114,15 +114,9 @@ func (s *FFmpegManager) startConnections() {
logs.Error("rtspManager panic %v", r) logs.Error("rtspManager panic %v", r)
} }
}() }()
condition := common.GetEmptyCondition()
es, err := base_service.CameraFindCollectionByCondition(condition)
if err != nil {
logs.Error("camera list query error: %s", err)
return
}
for { for {
condition := common.GetEmptyCondition() condition := common.GetEmptyCondition()
es, err = base_service.CameraFindCollectionByCondition(condition) es, err := base_service.CameraFindCollectionByCondition(condition)
if err != nil { if err != nil {
logs.Error("camera list query error: %s", err) logs.Error("camera list query error: %s", err)
return return

View File

@@ -5,6 +5,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/beego/beego/v2/core/config" "github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs" "github.com/beego/beego/v2/core/logs"
@@ -175,11 +176,33 @@ func (r *rtmpServer) handleRtmpConn(conn *rtmp.Conn) {
done := make(chan int) done := make(chan int)
//添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包 //添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包
pktStream := make(chan av.Packet, 1024) pktStream := make(chan av.Packet, 1024)
heartBeatChan := make(chan int)
defer func() { defer func() {
close(done) close(done)
close(pktStream) close(pktStream)
close(heartBeatChan)
}() }()
go func() {
defer func() {
if recover_rusult := recover(); recover_rusult != nil {
logs.Error("HandleConn error : %v", recover_rusult)
}
}()
ticker := time.NewTicker(10 * time.Second)
for {
select {
case _, ok := <-heartBeatChan:
if !ok {
return
}
ticker.Reset(10 * time.Second)
case <-ticker.C:
conn.Close()
return
}
}
}()
p := rtmppublisher.NewPublisher(done, pktStream, code, codecs, r) p := rtmppublisher.NewPublisher(done, pktStream, code, codecs, r)
r.rms.Store(camera.Code, p) r.rms.Store(camera.Code, p)
for { for {
@@ -188,6 +211,11 @@ func (r *rtmpServer) handleRtmpConn(conn *rtmp.Conn) {
logs.Error("ReadPacket error : %v", err) logs.Error("ReadPacket error : %v", err)
break break
} }
select {
case heartBeatChan <- 1:
default:
}
select { select {
case pktStream <- pkt: case pktStream <- pkt:
default: default:

View File

@@ -12,7 +12,7 @@ import (
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
// ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controllers/ext" ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controllers/ext"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
) )
@@ -33,7 +33,7 @@ func GetSingleRtspClientManager() *RtspClientManager {
func (rs *RtspClientManager) StartClient() { func (rs *RtspClientManager) StartClient() {
go rs.startConnections() go rs.startConnections()
// go rs.stopConn(ext_controller.CodeStream()) go rs.stopConn(ext_controller.CodeStream())
} }
func (rc *RtspClientManager) ExistsPublisher(code string) bool { func (rc *RtspClientManager) ExistsPublisher(code string) bool {
@@ -104,7 +104,7 @@ func (s *RtspClientManager) startConnections() {
} }
go s.connRtsp(camera.Code) go s.connRtsp(camera.Code)
} }
<-time.After(1 * time.Second) <-time.After(5 * time.Second)
} }
} }

View File

@@ -13,7 +13,7 @@ type CameraShare struct {
// 权限码: // 权限码:
AuthCode string `orm:"column(auth_code)" json:"authCode"` AuthCode string `orm:"column(auth_code)" json:"authCode"`
// 启用状态: // 启用状态:
Enabled bool `orm:"column(enabled)" json:"enabled"` Enabled int `orm:"column(enabled)" json:"enabled"`
// 创建时间: // 创建时间:
Created time.Time `orm:"column(created)" json:"created"` Created time.Time `orm:"column(created)" json:"created"`
// 开始时间: // 开始时间:

View File

@@ -11,7 +11,7 @@ type CameraSharePO struct {
// 权限码: // 权限码:
AuthCode string `json:"authCode"` AuthCode string `json:"authCode"`
// 启用状态: // 启用状态:
Enabled bool `json:"enabled"` Enabled int `json:"enabled"`
// 创建时间: // 创建时间:
Created time.Time `json:"created"` Created time.Time `json:"created"`
// 开始时间: // 开始时间:

View File

@@ -39,7 +39,7 @@ type CameraShareVO struct {
// 权限码: // 权限码:
AuthCode string `json:"authCode"` AuthCode string `json:"authCode"`
// 启用状态: // 启用状态:
Enabled bool `json:"enabled"` Enabled int `json:"enabled"`
// 创建时间: // 创建时间:
Created time.Time `json:"created"` Created time.Time `json:"created"`
// 开始时间: // 开始时间:

View File

@@ -11,7 +11,7 @@ type CameraShareVO struct {
// 权限码: // 权限码:
AuthCode string `json:"authCode"` AuthCode string `json:"authCode"`
// 启用状态: // 启用状态:
Enabled bool `json:"enabled"` Enabled int `json:"enabled"`
// 创建时间: // 创建时间:
Created time.Time `json:"created"` Created time.Time `json:"created"`
// 开始时间: // 开始时间:

View File

@@ -204,10 +204,28 @@ func (dynQuery *DynQueryMysql) makeConditionsToken(conditionExpr ConditionExpres
params = append(params, fmt.Sprintf("%s", simpleExpr.Values[0])+"%") params = append(params, fmt.Sprintf("%s", simpleExpr.Values[0])+"%")
} else if simpleExpr.ExprType == Equal { } else if simpleExpr.ExprType == Equal {
simpleExprTokens = append(simpleExprTokens, "=", "?") simpleExprTokens = append(simpleExprTokens, "=", "?")
params = append(params, simpleExpr.Values[0]) vBool, ok := simpleExpr.Values[0].(bool)
if ok {
param := 0
if vBool {
param = 1
}
params = append(params, param)
} else {
params = append(params, simpleExpr.Values[0])
}
} else if simpleExpr.ExprType == NotEqual { } else if simpleExpr.ExprType == NotEqual {
simpleExprTokens = append(simpleExprTokens, "!=", "?") simpleExprTokens = append(simpleExprTokens, "!=", "?")
params = append(params, simpleExpr.Values[0]) vBool, ok := simpleExpr.Values[0].(bool)
if ok {
param := 0
if vBool {
param = 1
}
params = append(params, param)
} else {
params = append(params, simpleExpr.Values[0])
}
} else if simpleExpr.ExprType == GT { } else if simpleExpr.ExprType == GT {
simpleExprTokens = append(simpleExprTokens, ">", "?") simpleExprTokens = append(simpleExprTokens, ">", "?")
params = append(params, simpleExpr.Values[0]) params = append(params, simpleExpr.Values[0])

View File

@@ -203,10 +203,28 @@ func (dynQuery *DynQueryPostgres) makeConditionsToken(conditionExpr ConditionExp
params = append(params, fmt.Sprintf("%s", simpleExpr.Values[0])+"%") params = append(params, fmt.Sprintf("%s", simpleExpr.Values[0])+"%")
} else if simpleExpr.ExprType == Equal { } else if simpleExpr.ExprType == Equal {
simpleExprTokens = append(simpleExprTokens, "=", "?") simpleExprTokens = append(simpleExprTokens, "=", "?")
params = append(params, simpleExpr.Values[0]) vBool, ok := simpleExpr.Values[0].(bool)
if ok {
param := 0
if vBool {
param = 1
}
params = append(params, param)
} else {
params = append(params, simpleExpr.Values[0])
}
} else if simpleExpr.ExprType == NotEqual { } else if simpleExpr.ExprType == NotEqual {
simpleExprTokens = append(simpleExprTokens, "!=", "?") simpleExprTokens = append(simpleExprTokens, "!=", "?")
params = append(params, simpleExpr.Values[0]) vBool, ok := simpleExpr.Values[0].(bool)
if ok {
param := 0
if vBool {
param = 1
}
params = append(params, param)
} else {
params = append(params, simpleExpr.Values[0])
}
} else if simpleExpr.ExprType == GT { } else if simpleExpr.ExprType == GT {
simpleExprTokens = append(simpleExprTokens, ">", "?") simpleExprTokens = append(simpleExprTokens, ">", "?")
params = append(params, simpleExpr.Values[0]) params = append(params, simpleExpr.Values[0])

View File

@@ -4,8 +4,10 @@ import (
"runtime/debug" "runtime/debug"
"time" "time"
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs" "github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
@@ -47,6 +49,11 @@ func (t *task) offlineCamera() {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
} }
}() }()
fgUseFfmpeg, err := config.Bool("server.use-ffmpeg")
if err != nil {
logs.Error("get use-ffmpeg fail : %v", err)
fgUseFfmpeg = false
}
for { for {
condition := common.GetEmptyCondition() condition := common.GetEmptyCondition()
css, err := base_service.CameraFindCollectionByCondition(condition) css, err := base_service.CameraFindCollectionByCondition(condition)
@@ -57,9 +64,16 @@ func (t *task) offlineCamera() {
if cs.OnlineStatus != 1 { if cs.OnlineStatus != 1 {
continue continue
} }
if exists := rtmpserver.GetSingleRtmpServer().ExistsPublisher(cs.Code); !exists { if fgUseFfmpeg {
cs.OnlineStatus = 0 if exists := rtmpserver.GetSingleRtmpServer().ExistsPublisher(cs.Code); !exists {
base_service.CameraUpdateById(cs) cs.OnlineStatus = 0
base_service.CameraUpdateById(cs)
}
} else {
if exists := rtspclientmanager.GetSingleRtspClientManager().ExistsPublisher(cs.Code); !exists {
cs.OnlineStatus = 0
base_service.CameraUpdateById(cs)
}
} }
} }
<-time.After(10 * time.Minute) <-time.After(10 * time.Minute)