mirror of
https://github.com/hkmadao/rtsp2rtmp.git
synced 2025-09-26 19:31:19 +08:00
lazy push rtmp
This commit is contained in:
@@ -11,16 +11,14 @@ import (
|
||||
"github.com/beego/beego/v2/core/config"
|
||||
"github.com/beego/beego/v2/core/logs"
|
||||
"github.com/go-cmd/cmd"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
|
||||
ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controller/ext"
|
||||
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
|
||||
)
|
||||
|
||||
type FFmpegInfo struct {
|
||||
startTime time.Time
|
||||
templatePasswd string
|
||||
ffMpegCmd *cmd.Cmd
|
||||
startTime time.Time
|
||||
ffMpegCmd *cmd.Cmd
|
||||
}
|
||||
|
||||
var rcmInstance *FFmpegManager
|
||||
@@ -63,14 +61,14 @@ func (rs *FFmpegManager) checkBlockFFmpegProcess() {
|
||||
}
|
||||
}()
|
||||
for {
|
||||
condition := common.GetEmptyCondition()
|
||||
condition := common.GetEqualCondition("cameraType", "rtsp")
|
||||
es, err := base_service.CameraFindCollectionByCondition(condition)
|
||||
if err != nil {
|
||||
logs.Error("camera list query error: %s", err)
|
||||
return
|
||||
}
|
||||
for _, camera := range es {
|
||||
if camera.OnlineStatus != true {
|
||||
if !camera.OnlineStatus {
|
||||
v, b := rs.rcs.Load(camera.Code)
|
||||
if b {
|
||||
ffmpegInfo := v.(FFmpegInfo)
|
||||
@@ -115,7 +113,7 @@ func (s *FFmpegManager) startConnections() {
|
||||
}
|
||||
}()
|
||||
for {
|
||||
condition := common.GetEmptyCondition()
|
||||
condition := common.GetEqualCondition("cameraType", "rtsp")
|
||||
es, err := base_service.CameraFindCollectionByCondition(condition)
|
||||
if err != nil {
|
||||
logs.Error("camera list query error: %s", err)
|
||||
@@ -125,7 +123,7 @@ func (s *FFmpegManager) startConnections() {
|
||||
if v, b := s.rcs.Load(camera.Code); b && v != nil {
|
||||
continue
|
||||
}
|
||||
if camera.Enabled != true {
|
||||
if !camera.Enabled {
|
||||
continue
|
||||
}
|
||||
go s.connRtsp(camera.Code)
|
||||
@@ -159,8 +157,8 @@ func (ffmpegManager *FFmpegManager) connRtsp(code string) {
|
||||
logs.Error("get rtmp port fail : %v", err)
|
||||
return
|
||||
}
|
||||
templatePasswd, _ := utils.GenerateId()
|
||||
rtmpUrl := fmt.Sprintf("rtmp://127.0.0.1:%d/%s/%s", rtmpPort, code, templatePasswd)
|
||||
|
||||
rtmpUrl := fmt.Sprintf("rtmp://127.0.0.1:%d/%s/%s", rtmpPort, code, camera.RtmpAuthCode)
|
||||
portOpen := checkTargetPortStatus(camera.RtspUrl)
|
||||
if !portOpen {
|
||||
logs.Error("rtspUrl: %s port not open", camera.RtspUrl)
|
||||
@@ -168,7 +166,7 @@ func (ffmpegManager *FFmpegManager) connRtsp(code string) {
|
||||
}
|
||||
// 只支持h264编码, 使用"-c:v copy", 不要使用其他选项, 出发视频转码会导致cpu很高
|
||||
ffmpegCmd := cmd.NewCmd("ffmpeg", "-i", camera.RtspUrl, "-c:v", "copy", "-c:a", "aac", "-f", "flv", rtmpUrl)
|
||||
ffmpegManager.rcs.Store(code, FFmpegInfo{startTime: time.Now(), templatePasswd: templatePasswd, ffMpegCmd: ffmpegCmd})
|
||||
ffmpegManager.rcs.Store(code, FFmpegInfo{startTime: time.Now(), ffMpegCmd: ffmpegCmd})
|
||||
logs.Info("ffmpeg start connect rtsp : command : %s %s", ffmpegCmd.Name, strings.Join(ffmpegCmd.Args, " "))
|
||||
statusChan := ffmpegCmd.Start()
|
||||
finalStatus := <-statusChan
|
||||
@@ -190,23 +188,9 @@ func (r *FFmpegManager) Delete(key interface{}) {
|
||||
r.rcs.Delete(key)
|
||||
}
|
||||
|
||||
func ValiadRtmpInfo(code string, passwd string) (fgSuccess bool) {
|
||||
v, ok := GetSingleFFmpegManager().rcs.Load(code)
|
||||
if !ok {
|
||||
fgSuccess = false
|
||||
return
|
||||
}
|
||||
fFmpegInfo, ok := v.(FFmpegInfo)
|
||||
if !ok {
|
||||
fgSuccess = false
|
||||
return
|
||||
}
|
||||
if fFmpegInfo.templatePasswd != passwd {
|
||||
fgSuccess = false
|
||||
return
|
||||
}
|
||||
fgSuccess = true
|
||||
return
|
||||
func ExistsRtspConn(code string) bool {
|
||||
_, ok := GetSingleFFmpegManager().rcs.Load(code)
|
||||
return ok
|
||||
}
|
||||
|
||||
func checkTargetPortStatus(rtspUrl string) (open bool) {
|
||||
|
@@ -1,10 +1,14 @@
|
||||
package flvadmin
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/beego/beego/v2/core/logs"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/rtmpflvwriter"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
|
||||
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
|
||||
)
|
||||
|
||||
type RtmpFlvAdmin struct {
|
||||
@@ -22,24 +26,54 @@ func GetSingleRtmpFlvAdmin() *RtmpFlvAdmin {
|
||||
}
|
||||
|
||||
func (rfm *RtmpFlvAdmin) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
|
||||
rfw := rtmpflvwriter.NewRtmpFlvWriter(pktStream, code, codecs, rfm)
|
||||
condition := common.GetEqualCondition("code", code)
|
||||
camera, err := base_service.CameraFindOneByCondition(condition)
|
||||
if err != nil {
|
||||
logs.Error("FlvWrite found camera: %s error: %v, do painc", code, err)
|
||||
panic(fmt.Sprintf("FlvWrite found camera: %s error: %v", code, err))
|
||||
}
|
||||
rfw := rtmpflvwriter.NewRtmpFlvWriter(!camera.FgPassive, pktStream, code, codecs, rfm)
|
||||
rfm.rfms.Store(code, rfw)
|
||||
}
|
||||
|
||||
func (rfm *RtmpFlvAdmin) StartWrite(code string) {
|
||||
func (rfm *RtmpFlvAdmin) StartWrite(code string, needPushRtmp bool) {
|
||||
v, ok := rfm.rfms.Load(code)
|
||||
if ok {
|
||||
rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
|
||||
rfw.StopWrite()
|
||||
rfm.FlvWrite(rfw.GetPktStream(), code, rfw.GetCodecs())
|
||||
rfwNew := rtmpflvwriter.NewRtmpFlvWriter(needPushRtmp, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm)
|
||||
rfm.rfms.Store(code, rfwNew)
|
||||
}
|
||||
}
|
||||
|
||||
func (rfm *RtmpFlvAdmin) StopWrite(code string) {
|
||||
func (rfm *RtmpFlvAdmin) ReConntion(code string) {
|
||||
v, ok := rfm.rfms.Load(code)
|
||||
if ok {
|
||||
rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
|
||||
rfw.StopWrite()
|
||||
rfwNew := rtmpflvwriter.NewRtmpFlvWriter(false, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm)
|
||||
rfm.rfms.Store(code, rfwNew)
|
||||
}
|
||||
}
|
||||
|
||||
func (rfm *RtmpFlvAdmin) RemoteStartWrite(code string) {
|
||||
v, ok := rfm.rfms.Load(code)
|
||||
if ok {
|
||||
rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
|
||||
if !rfw.GetNeedPushRtmp() {
|
||||
rfwNew := rtmpflvwriter.NewRtmpFlvWriter(true, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm)
|
||||
rfm.rfms.Store(code, rfwNew)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rfm *RtmpFlvAdmin) RemoteStopWrite(code string) {
|
||||
v, ok := rfm.rfms.Load(code)
|
||||
if ok {
|
||||
rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
|
||||
rfw.StopWrite()
|
||||
rfwNew := rtmpflvwriter.NewRtmpFlvWriter(false, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm)
|
||||
rfm.rfms.Store(code, rfwNew)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,15 +84,11 @@ func (rfm *RtmpFlvAdmin) UpdateFFWS(code string, rfw *rtmpflvwriter.RtmpFlvWrite
|
||||
}
|
||||
}
|
||||
|
||||
//更新sps、pps等信息
|
||||
// 更新sps、pps等信息
|
||||
func (rfm *RtmpFlvAdmin) UpdateCodecs(code string, codecs []av.CodecData) {
|
||||
rfw, ok := rfm.rfms.Load(code)
|
||||
if ok {
|
||||
rfw := rfw.(*rtmpflvwriter.RtmpFlvWriter)
|
||||
rfw.SetCodecs(codecs)
|
||||
//sps、pps更新后,重新建立连接
|
||||
// logs.Warn("RtmpFlvAdmin: %s codecs change, restart RtmpFlvWriter", code)
|
||||
//这里只需要stop就可以,内部会重连
|
||||
// rfw.StopWrite()
|
||||
}
|
||||
}
|
||||
|
@@ -17,6 +17,8 @@ type IRtmpFlvManager interface {
|
||||
}
|
||||
|
||||
type RtmpFlvWriter struct {
|
||||
needPushRtmp bool
|
||||
stop bool
|
||||
done chan int
|
||||
pktStream <-chan av.Packet
|
||||
code string
|
||||
@@ -49,8 +51,14 @@ func (rfw *RtmpFlvWriter) GetCodecs() []av.CodecData {
|
||||
return rfw.codecs
|
||||
}
|
||||
|
||||
func NewRtmpFlvWriter(pktStream <-chan av.Packet, code string, codecs []av.CodecData, irfm IRtmpFlvManager) *RtmpFlvWriter {
|
||||
func (rfw *RtmpFlvWriter) GetNeedPushRtmp() bool {
|
||||
return rfw.needPushRtmp
|
||||
}
|
||||
|
||||
func NewRtmpFlvWriter(needPushRtmp bool, pktStream <-chan av.Packet, code string, codecs []av.CodecData, irfm IRtmpFlvManager) *RtmpFlvWriter {
|
||||
rfw := &RtmpFlvWriter{
|
||||
needPushRtmp: needPushRtmp,
|
||||
stop: false,
|
||||
done: make(chan int),
|
||||
pktStream: pktStream,
|
||||
code: code,
|
||||
@@ -70,6 +78,7 @@ func (rfw *RtmpFlvWriter) StopWrite() {
|
||||
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
rfw.stop = true
|
||||
close(rfw.done)
|
||||
}()
|
||||
}
|
||||
@@ -108,7 +117,7 @@ func (rfw *RtmpFlvWriter) flvWrite() {
|
||||
if camera.OnlineStatus != true {
|
||||
return
|
||||
}
|
||||
if camera.RtmpPushStatus != true {
|
||||
if camera.RtmpPushStatus != true || !rfw.needPushRtmp {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
@@ -127,10 +136,14 @@ func (rfw *RtmpFlvWriter) flvWrite() {
|
||||
logs.Info("disconnect old RtmpFlvWriter : %s", rfw.code)
|
||||
rfw.conn.Close()
|
||||
}
|
||||
if rfw.stop {
|
||||
logs.Info("stop RtmpFlvWriter : %s", rfw.code)
|
||||
return
|
||||
}
|
||||
_, pktStreamOk := <-rfw.pktStream
|
||||
if pktStreamOk {
|
||||
logs.Info("to create NewRtmpFlvWriter : %s", rfw.code)
|
||||
rfwn := NewRtmpFlvWriter(rfw.pktStream, rfw.code, rfw.codecs, rfw.irfm)
|
||||
rfwn := NewRtmpFlvWriter(true, rfw.pktStream, rfw.code, rfw.codecs, rfw.irfm)
|
||||
rfwn.irfm.UpdateFFWS(rfwn.code, rfwn)
|
||||
} else {
|
||||
logs.Info("RtmpFlvWriter pktStream is closed : %s", rfw.code)
|
||||
|
@@ -12,7 +12,7 @@ import (
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/format/rtmp"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/ffmpegmanager"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmppublisher"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver/rtmppublisher"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
|
||||
ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controller/ext"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dao/entity"
|
||||
@@ -259,13 +259,7 @@ func getParamByURI(conn *rtmp.Conn) (string, string, bool) {
|
||||
|
||||
// 权限验证
|
||||
func authentication(camera entity.Camera, code string, authCode string, conn *rtmp.Conn) bool {
|
||||
fgSuccess := ffmpegmanager.ValiadRtmpInfo(code, authCode)
|
||||
if !fgSuccess {
|
||||
logs.Error("camera %s RtmpAuthCode error : %s", code, authCode)
|
||||
conn.Close()
|
||||
return false
|
||||
}
|
||||
if camera.Enabled != true {
|
||||
if !camera.Enabled {
|
||||
logs.Error("camera %s disabled : %s", code, authCode)
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
@@ -273,6 +267,16 @@ func authentication(camera entity.Camera, code string, authCode string, conn *rt
|
||||
}
|
||||
return false
|
||||
}
|
||||
if camera.CameraType == "rtsp" && !ffmpegmanager.ExistsRtspConn(code) {
|
||||
logs.Error("camera %s CameraType: rtsp, but rtsp conn not exists", code)
|
||||
conn.Close()
|
||||
return false
|
||||
}
|
||||
if camera.RtmpAuthCode != authCode {
|
||||
logs.Error("camera %s RtmpAuthCode error : %s", code, authCode)
|
||||
conn.Close()
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
@@ -77,7 +77,7 @@ func (s *RtspClientManager) startConnections() {
|
||||
logs.Error("rtspManager panic %v", r)
|
||||
}
|
||||
}()
|
||||
condition := common.GetEmptyCondition()
|
||||
condition := common.GetEqualCondition("cameraType", "rtsp")
|
||||
es, err := base_service.CameraFindCollectionByCondition(condition)
|
||||
if err != nil {
|
||||
logs.Error("camera list query error: %s", err)
|
||||
|
@@ -2,6 +2,7 @@ package tcpclient
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/beego/beego/v2/core/logs"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
|
||||
@@ -10,13 +11,6 @@ import (
|
||||
)
|
||||
|
||||
func cameraAq(commandMessage CommandMessage) {
|
||||
paramStr := commandMessage.Param
|
||||
condition := common.AqCondition{}
|
||||
err := json.Unmarshal([]byte(paramStr), &condition)
|
||||
if err != nil {
|
||||
logs.Error("cameraAq message format error: %v", err)
|
||||
return
|
||||
}
|
||||
conn, err := connectAndRegister("cameraAq", commandMessage.MessageId)
|
||||
if err != nil {
|
||||
logs.Error("cameraAq connect to server error: %v", err)
|
||||
@@ -24,6 +18,20 @@ func cameraAq(commandMessage CommandMessage) {
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
paramStr := commandMessage.Param
|
||||
condition := common.AqCondition{}
|
||||
err = json.Unmarshal([]byte(paramStr), &condition)
|
||||
if err != nil {
|
||||
logs.Error("cameraAq message format error: %v", err)
|
||||
result := common.ErrorResult(fmt.Sprintf("cameraAq message format error: %v", err))
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
cameras, err := base_service.CameraFindCollectionByCondition(condition)
|
||||
if err != nil {
|
||||
logs.Error("CameraFindCollectionByCondition error: %v", err)
|
||||
|
@@ -95,6 +95,10 @@ func commandRes(commandMessage CommandMessage) {
|
||||
flvPlay(commandMessage)
|
||||
case "flvFetchMoreData":
|
||||
flvFetchMoreData(commandMessage)
|
||||
case "startPushRtmp":
|
||||
startRtmpPush(commandMessage)
|
||||
case "stopPushRtmp":
|
||||
stopRtmpPush(commandMessage)
|
||||
default:
|
||||
logs.Error("unsupport commandType: %s", commandMessage.MessageType)
|
||||
}
|
||||
|
@@ -60,19 +60,25 @@ func (flvPush FlvPush) Write(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
func flvPlay(commandMessage CommandMessage) {
|
||||
paramStr := commandMessage.Param
|
||||
playParam := PlayParam{}
|
||||
err := json.Unmarshal([]byte(paramStr), &playParam)
|
||||
if err != nil {
|
||||
logs.Error("flvPlay message format error: %v", err)
|
||||
return
|
||||
}
|
||||
conn, err := connectAndRegister("flvPlay", commandMessage.MessageId)
|
||||
if err != nil {
|
||||
logs.Error("flvPlay connect to server error: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
paramStr := commandMessage.Param
|
||||
playParam := PlayParam{}
|
||||
err = json.Unmarshal([]byte(paramStr), &playParam)
|
||||
if err != nil {
|
||||
logs.Error("flvPlay message format error: %v", err)
|
||||
result := common.ErrorResult(fmt.Sprintf("flvPlay message format error: %v", err))
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
camera_record, err := base_service.CameraRecordSelectById(playParam.IdCameraRecord)
|
||||
if err != nil {
|
||||
|
@@ -15,13 +15,6 @@ type FetchMoreDataParam struct {
|
||||
}
|
||||
|
||||
func flvFetchMoreData(commandMessage CommandMessage) {
|
||||
paramStr := commandMessage.Param
|
||||
param := FetchMoreDataParam{}
|
||||
err := json.Unmarshal([]byte(paramStr), ¶m)
|
||||
if err != nil {
|
||||
logs.Error("flvFetchMoreData message format error: %v", err)
|
||||
return
|
||||
}
|
||||
conn, err := connectAndRegister("flvFetchMoreData", commandMessage.MessageId)
|
||||
if err != nil {
|
||||
logs.Error("flvFetchMoreData connect to server error: %v", err)
|
||||
@@ -29,6 +22,20 @@ func flvFetchMoreData(commandMessage CommandMessage) {
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
paramStr := commandMessage.Param
|
||||
param := FetchMoreDataParam{}
|
||||
err = json.Unmarshal([]byte(paramStr), ¶m)
|
||||
if err != nil {
|
||||
logs.Error("flvFetchMoreData message format error: %v", err)
|
||||
result := common.ErrorResult(fmt.Sprintf("flvFetchMoreData message format error: %v", err))
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
value, ok := playerMap.Load(param.PlayerId)
|
||||
if !ok {
|
||||
logs.Error("playerId: %s not exists or complate", param.PlayerId)
|
||||
|
@@ -16,20 +16,26 @@ type FlvFileMediaInfoParam struct {
|
||||
}
|
||||
|
||||
func flvFileMediaInfo(commandMessage CommandMessage) {
|
||||
paramStr := commandMessage.Param
|
||||
param := FlvFileMediaInfoParam{}
|
||||
err := json.Unmarshal([]byte(paramStr), ¶m)
|
||||
if err != nil {
|
||||
logs.Error("flvFileMediaInfo message format error: %v", err)
|
||||
return
|
||||
}
|
||||
idCameraRecord := param.IdCameraRecord
|
||||
conn, err := connectAndRegister("flvFileMediaInfo", commandMessage.MessageId)
|
||||
if err != nil {
|
||||
logs.Error("flvFileMediaInfo connect to server error: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
paramStr := commandMessage.Param
|
||||
param := FlvFileMediaInfoParam{}
|
||||
err = json.Unmarshal([]byte(paramStr), ¶m)
|
||||
if err != nil {
|
||||
logs.Error("flvFileMediaInfo message format error: %v", err)
|
||||
result := common.ErrorResult(fmt.Sprintf("flvFileMediaInfo message format error: %v", err))
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
idCameraRecord := param.IdCameraRecord
|
||||
|
||||
camera_record, err := base_service.CameraRecordSelectById(idCameraRecord)
|
||||
if err != nil {
|
||||
|
@@ -12,19 +12,25 @@ import (
|
||||
)
|
||||
|
||||
func historyVideoPage(commandMessage CommandMessage) {
|
||||
paramStr := commandMessage.Param
|
||||
pageInfoInput := common.AqPageInfoInput{}
|
||||
err := json.Unmarshal([]byte(paramStr), &pageInfoInput)
|
||||
if err != nil {
|
||||
logs.Error("historyVideoPage message format error: %v", err)
|
||||
return
|
||||
}
|
||||
conn, err := connectAndRegister("historyVideoPage", commandMessage.MessageId)
|
||||
if err != nil {
|
||||
logs.Error("historyVideoPage connect to server error: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
paramStr := commandMessage.Param
|
||||
pageInfoInput := common.AqPageInfoInput{}
|
||||
err = json.Unmarshal([]byte(paramStr), &pageInfoInput)
|
||||
if err != nil {
|
||||
logs.Error("historyVideoPage message format error: %v", err)
|
||||
result := common.ErrorResult(fmt.Sprintf("historyVideoPage message format error: %v", err))
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
pageInfo, err := base_service.CameraRecordFindPageByCondition(pageInfoInput)
|
||||
if err != nil {
|
||||
|
75
src/rtsp2rtmp/tcpclient/rtmp_push_manager.go
Normal file
75
src/rtsp2rtmp/tcpclient/rtmp_push_manager.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package tcpclient
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/beego/beego/v2/core/logs"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin"
|
||||
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
|
||||
)
|
||||
|
||||
type RtmpPushParam struct {
|
||||
CameraCode string `json:"cameraCode"`
|
||||
}
|
||||
|
||||
func startRtmpPush(commandMessage CommandMessage) {
|
||||
conn, err := connectAndRegister("startPushRtmp", commandMessage.MessageId)
|
||||
if err != nil {
|
||||
logs.Error("startPushRtmp connect to server error: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
paramStr := commandMessage.Param
|
||||
param := RtmpPushParam{}
|
||||
err = json.Unmarshal([]byte(paramStr), ¶m)
|
||||
if err != nil {
|
||||
logs.Error("startPushRtmp message format error: %v", err)
|
||||
result := common.ErrorResult(fmt.Sprintf("startPushRtmp message format error: %v", err))
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
flvadmin.GetSingleRtmpFlvAdmin().RemoteStartWrite(param.CameraCode)
|
||||
|
||||
result := common.SuccessResultData("startPushRtmp success")
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func stopRtmpPush(commandMessage CommandMessage) {
|
||||
conn, err := connectAndRegister("stopPushRtmp", commandMessage.MessageId)
|
||||
if err != nil {
|
||||
logs.Error("stopPushRtmp connect to server error: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
paramStr := commandMessage.Param
|
||||
param := RtmpPushParam{}
|
||||
err = json.Unmarshal([]byte(paramStr), ¶m)
|
||||
if err != nil {
|
||||
logs.Error("stopPushRtmp message format error: %v", err)
|
||||
result := common.ErrorResult(fmt.Sprintf("stopPushRtmp message format error: %v", err))
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
flvadmin.GetSingleRtmpFlvAdmin().RemoteStopWrite(param.CameraCode)
|
||||
|
||||
defer conn.Close()
|
||||
result := common.SuccessResultData("stopPushRtmp success")
|
||||
_, err = writeResult(result, conn)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
}
|
@@ -91,9 +91,9 @@ func RtmpPushChange(ctx *gin.Context) {
|
||||
switch {
|
||||
case q.RtmpPushStatus != true:
|
||||
logs.Info("camera [%s] stop push rtmp", q.Code)
|
||||
flvadmin.GetSingleRtmpFlvAdmin().StopWrite(q.Code)
|
||||
flvadmin.GetSingleRtmpFlvAdmin().ReConntion(q.Code)
|
||||
case q.RtmpPushStatus == true:
|
||||
flvadmin.GetSingleRtmpFlvAdmin().StartWrite(q.Code)
|
||||
flvadmin.GetSingleRtmpFlvAdmin().StartWrite(q.Code, !q.FgPassive)
|
||||
logs.Info("camera [%s] start push rtmp", q.Code)
|
||||
}
|
||||
|
||||
|
@@ -89,6 +89,41 @@ func GetCameraDesc() *common.EntityDesc {
|
||||
DataType: "DateTime",
|
||||
ValueType: "DateTime",
|
||||
};
|
||||
var fgSecretAttributeInfo = &common.AttributeInfo {
|
||||
ColumnName: "fg_secret",
|
||||
Name: "fgSecret",
|
||||
DisplayName: "加密标志",
|
||||
DataType: "Boolean",
|
||||
ValueType: "bool",
|
||||
};
|
||||
var secretAttributeInfo = &common.AttributeInfo {
|
||||
ColumnName: "secret",
|
||||
Name: "secret",
|
||||
DisplayName: "密钥",
|
||||
DataType: "String",
|
||||
ValueType: "string",
|
||||
};
|
||||
var fgPassiveAttributeInfo = &common.AttributeInfo {
|
||||
ColumnName: "fg_passive",
|
||||
Name: "fgPassive",
|
||||
DisplayName: "被动推送rtmp标志",
|
||||
DataType: "Boolean",
|
||||
ValueType: "bool",
|
||||
};
|
||||
var rtmpAuthCodeAttributeInfo = &common.AttributeInfo {
|
||||
ColumnName: "rtmp_auth_code",
|
||||
Name: "rtmpAuthCode",
|
||||
DisplayName: "rtmp识别码",
|
||||
DataType: "String",
|
||||
ValueType: "string",
|
||||
};
|
||||
var cameraTypeAttributeInfo = &common.AttributeInfo {
|
||||
ColumnName: "camera_type",
|
||||
Name: "cameraType",
|
||||
DisplayName: "摄像头类型",
|
||||
DataType: "String",
|
||||
ValueType: "string",
|
||||
};
|
||||
var cameraSharesAttributeInfo = &common.AttributeInfo {
|
||||
ColumnName: "",
|
||||
Name: "cameraShares",
|
||||
@@ -124,6 +159,11 @@ func GetCameraDesc() *common.EntityDesc {
|
||||
"saveVideo": saveVideoAttributeInfo,
|
||||
"live": liveAttributeInfo,
|
||||
"created": createdAttributeInfo,
|
||||
"fgSecret": fgSecretAttributeInfo,
|
||||
"secret": secretAttributeInfo,
|
||||
"fgPassive": fgPassiveAttributeInfo,
|
||||
"rtmpAuthCode": rtmpAuthCodeAttributeInfo,
|
||||
"cameraType": cameraTypeAttributeInfo,
|
||||
"cameraShares": cameraSharesAttributeInfo,
|
||||
},
|
||||
}
|
||||
|
@@ -27,6 +27,16 @@ type Camera struct {
|
||||
Live bool `orm:"column(live)" json:"live"`
|
||||
// 创建时间:
|
||||
Created time.Time `orm:"column(created)" json:"created"`
|
||||
// 加密标志:
|
||||
FgSecret bool `orm:"column(fg_secret)" json:"fgSecret"`
|
||||
// 密钥:
|
||||
Secret string `orm:"column(secret)" json:"secret"`
|
||||
// 被动推送rtmp标志
|
||||
FgPassive bool `orm:"column(fg_passive)" json:"fgPassive"`
|
||||
// rtmp识别码:
|
||||
RtmpAuthCode string `orm:"column(rtmp_auth_code)" json:"rtmpAuthCode"`
|
||||
// 摄像头类型:
|
||||
CameraType string `orm:"column(camera_type)" json:"cameraType"`
|
||||
// 摄像头分享
|
||||
CameraShares []CameraShare `orm:"-" json:"cameraShares"`
|
||||
}
|
||||
|
@@ -28,4 +28,16 @@ type CameraPO struct {
|
||||
Live bool `json:"live"`
|
||||
// 创建时间:
|
||||
Created time.Time `json:"created"`
|
||||
// 加密标志:
|
||||
FgSecret bool `json:"fgSecret"`
|
||||
// 密钥:
|
||||
Secret string `json:"secret"`
|
||||
// 被动推送rtmp标志
|
||||
FgPassive bool `json:"fgPassive"`
|
||||
// rtmp识别码:
|
||||
RtmpAuthCode string `json:"rtmpAuthCode"`
|
||||
// 摄像头类型:
|
||||
CameraType string `json:"cameraType"`
|
||||
// 摄像头分享
|
||||
// CameraShares []CameraShare `json:"cameraShares"`
|
||||
}
|
||||
|
@@ -28,4 +28,16 @@ type CameraVO struct {
|
||||
Live bool `json:"live"`
|
||||
// 创建时间:
|
||||
Created time.Time `json:"created"`
|
||||
// 加密标志:
|
||||
FgSecret bool `json:"fgSecret"`
|
||||
// 密钥:
|
||||
Secret string `json:"secret"`
|
||||
// 被动推送rtmp标志
|
||||
FgPassive bool `json:"fgPassive"`
|
||||
// rtmp识别码:
|
||||
RtmpAuthCode string `json:"rtmpAuthCode"`
|
||||
// 摄像头类型:
|
||||
CameraType string `json:"cameraType"`
|
||||
// 摄像头分享
|
||||
// CameraShares []CameraShareVO `json:"cameraShares"`
|
||||
}
|
||||
|
@@ -57,16 +57,13 @@ func (t *task) offlineCamera() {
|
||||
fgUseFfmpeg = false
|
||||
}
|
||||
for {
|
||||
condition := common.GetEmptyCondition()
|
||||
condition := common.GetEqualCondition("onlineStatus", true)
|
||||
css, err := base_service.CameraFindCollectionByCondition(condition)
|
||||
if err != nil {
|
||||
logs.Error("query camera error : %v", err)
|
||||
}
|
||||
for _, cs := range css {
|
||||
if cs.OnlineStatus != true {
|
||||
continue
|
||||
}
|
||||
if fgUseFfmpeg {
|
||||
if cs.CameraType == "rtmp" || fgUseFfmpeg {
|
||||
if exists := rtmpserver.GetSingleRtmpServer().ExistsPublisher(cs.Code); !exists {
|
||||
cs.OnlineStatus = false
|
||||
base_service.CameraUpdateById(cs)
|
||||
|
Reference in New Issue
Block a user