From 6614b3aaf77125d4e8b2f7715dcf1ebfe5ba8dec Mon Sep 17 00:00:00 2001 From: madao Date: Mon, 3 Feb 2025 11:22:53 +0800 Subject: [PATCH] lazy push rtmp --- main.go | 2 +- resources/conf/conf-prod.yml | 2 +- .../fileflvreader/fileflvreader.go | 124 ++++++++---------- .../fileflvwriter/fileflvwriter.go | 72 +++++----- .../flvadmin/httpflvmanage/httpflvmanager.go | 116 +++++++++++----- .../httpflvwriter/httpflvwriter.go | 25 +--- .../flvadmin/rtmpflvwriter/rtmpflvwriter.go | 41 +++++- src/rtsp2rtmp/tcpclient/camera_aq.go | 13 +- .../camerastatuspush/camera_status_push.go | 24 ++++ src/rtsp2rtmp/tcpclient/command_receive.go | 9 +- src/rtsp2rtmp/tcpclient/flv_data_push.go | 11 +- .../tcpclient/flv_fetch_more_data.go | 11 +- src/rtsp2rtmp/tcpclient/flv_media_info.go | 13 +- src/rtsp2rtmp/tcpclient/history_video_page.go | 13 +- src/rtsp2rtmp/tcpclient/rtmp_push_manager.go | 17 +-- .../tcpclient/{ => tcpclientcommon}/common.go | 30 +++-- src/rtsp2rtmp/web/task/task.go | 2 - src/rtsp2rtmp/web/webapp.go | 18 --- 18 files changed, 311 insertions(+), 232 deletions(-) create mode 100644 src/rtsp2rtmp/tcpclient/camerastatuspush/camera_status_push.go rename src/rtsp2rtmp/tcpclient/{ => tcpclientcommon}/common.go (74%) diff --git a/main.go b/main.go index 1006e85..c431270 100644 --- a/main.go +++ b/main.go @@ -28,8 +28,8 @@ func main() { logs.Error("get use-ffmpeg fail : %v", err) fgUseFfmpeg = false } + rtmpserver.GetSingleRtmpServer().StartRtmpServer() if fgUseFfmpeg { - rtmpserver.GetSingleRtmpServer().StartRtmpServer() ffmpegmanager.GetSingleFFmpegManager().StartClient() } else { rtspclientmanager.GetSingleRtspClientManager().StartClient() diff --git a/resources/conf/conf-prod.yml b/resources/conf/conf-prod.yml index 4740d4a..2bf67e6 100644 --- a/resources/conf/conf-prod.yml +++ b/resources/conf/conf-prod.yml @@ -2,7 +2,7 @@ server: use-ffmpeg: false security: true rtmp: - port: 1936 + port: 1934 http: port: 8080 static: diff --git a/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader/fileflvreader.go b/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader/fileflvreader.go index 2ab5ac2..b77ce56 100644 --- a/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader/fileflvreader.go +++ b/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader/fileflvreader.go @@ -4,6 +4,7 @@ import ( "io" "os" "runtime/debug" + "sync" "time" "github.com/beego/beego/v2/core/config" @@ -28,27 +29,28 @@ type FileFlvReader struct { fgStart bool fgOffSetTime bool offSetTime time.Duration // realTime = pkt.Time - offSetTime, offsetTime from is first pkt.Time + mutex sync.Mutex } -func (ffw *FileFlvReader) GetDone() <-chan int { - return ffw.done +func (ffr *FileFlvReader) GetDone() <-chan int { + return ffr.done } -func (ffw *FileFlvReader) GetCode() string { - return ffw.code +func (ffr *FileFlvReader) GetCode() string { + return ffr.code } -func (ffw *FileFlvReader) SetCodecs(codecs []av.CodecData) { - ffw.codecs = codecs +func (ffr *FileFlvReader) SetCodecs(codecs []av.CodecData) { + ffr.codecs = codecs } -func (ffw *FileFlvReader) GetCodecs() []av.CodecData { - return ffw.codecs +func (ffr *FileFlvReader) GetCodecs() []av.CodecData { + return ffr.codecs } -func (ffw *FileFlvReader) SetSeekSecond(seekSecond uint64) { - if seekSecond > ffw.seekSecond { - ffw.seekSecond = seekSecond +func (ffr *FileFlvReader) SetSeekSecond(seekSecond uint64) { + if seekSecond > ffr.seekSecond { + ffr.seekSecond = seekSecond } } @@ -58,7 +60,7 @@ func NewFileFlvReader( fileName string, ) *FileFlvReader { myHttpWriter := newMyHttpWriter(writer) - ffw := &FileFlvReader{ + ffr := &FileFlvReader{ fgDoneClose: false, done: make(chan int), httpWrite: myHttpWriter, @@ -69,46 +71,38 @@ func NewFileFlvReader( fgOffSetTime: false, offSetTime: 0, } - go ffw.flvRead() - return ffw + go ffr.flvRead() + return ffr } -func (ffw *FileFlvReader) StopRead() { +func (ffr *FileFlvReader) StopRead() { go func() { defer func() { if r := recover(); r != nil { logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) } }() - ffw.fgDoneClose = true - close(ffw.done) + ffr.CloseDone() }() } -func (ffw *FileFlvReader) TickerStopRead() { - go func() { - defer func() { - if r := recover(); r != nil { - logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) - } - }() - select { - case <-time.NewTicker(30 * time.Second).C: //等待30秒再关闭 - ffw.fgDoneClose = true - close(ffw.done) - case <-ffw.GetDone(): - } - }() +func (ffr *FileFlvReader) CloseDone() { + ffr.mutex.Lock() + if !ffr.fgDoneClose { + ffr.fgDoneClose = true + close(ffr.done) + } + ffr.mutex.Unlock() } -func (ffw *FileFlvReader) Read(p []byte) (n int, err error) { +func (ffr *FileFlvReader) Read(p []byte) (n int, err error) { defer func() { if r := recover(); r != nil { logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) } }() - n, err = ffw.fd.Read(p) + n, err = ffr.fd.Read(p) if err != nil { logs.Error("Read file error : %v", err) } @@ -116,61 +110,59 @@ func (ffw *FileFlvReader) Read(p []byte) (n int, err error) { return } -func (ffw *FileFlvReader) openFlvFile() error { - fullFileName := getFileFlvPath() + "/" + ffw.fileName +func (ffr *FileFlvReader) openFlvFile() error { + fullFileName := getFileFlvPath() + "/" + ffr.fileName fd, err := os.OpenFile(fullFileName, os.O_RDWR, 0644) if err != nil { logs.Error("open file: %s error : %v", fullFileName, err) return err } - ffw.fd = fd - ffw.fullFileName = fullFileName + ffr.fd = fd + ffr.fullFileName = fullFileName return nil } -func (ffw *FileFlvReader) flvRead() { +func (ffr *FileFlvReader) flvRead() { defer func() { if r := recover(); r != nil { logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) } }() - if err := ffw.openFlvFile(); err != nil { + if err := ffr.openFlvFile(); err != nil { logs.Error("open file flv error : %v", err) return } defer func() { - ffw.endTime = time.Now() - ffw.fd.Close() + ffr.endTime = time.Now() + ffr.fd.Close() - if !ffw.fgDoneClose { - close(ffw.done) - } + ffr.CloseDone() }() - ffw.startTime = time.Now() - httpWriteMuxer := flv.NewMuxer(ffw.httpWrite) - demuxer := flv.NewDemuxer(ffw) - ffw.muxer = demuxer + ffr.startTime = time.Now() + httpWriteMuxer := flv.NewMuxer(ffr.httpWrite) + demuxer := flv.NewDemuxer(ffr) + ffr.muxer = demuxer codecs, err := demuxer.Streams() if err != nil { logs.Error("read codecs from flv file error : %v", err) return } - ffw.codecs = codecs + ffr.codecs = codecs httpWriteMuxer.WriteHeader(codecs) - if ffw.seekSecond > 0 { + if ffr.seekSecond > 0 { for { pkt, err := demuxer.ReadPacket() if err != nil { - logs.Error("read file %s ReadPacket error : %v", ffw.fileName, err) + logs.Error("read file %s ReadPacket error : %v", ffr.fileName, err) break } - if !ffw.fgOffSetTime { - ffw.fgOffSetTime = true - ffw.offSetTime = pkt.Time + if !ffr.fgOffSetTime { + ffr.fgOffSetTime = true + ffr.offSetTime = pkt.Time } - if (pkt.Time - ffw.offSetTime) >= time.Duration(ffw.seekSecond)*time.Second { + if (pkt.Time - ffr.offSetTime) >= time.Duration(ffr.seekSecond)*time.Second { break } } @@ -182,35 +174,35 @@ Loop: for { pkt, err := demuxer.ReadPacket() if err != nil { - logs.Error("read file %s ReadPacket error : %v", ffw.fileName, err) + logs.Error("read file %s ReadPacket error : %v", ffr.fileName, err) break } - if !ffw.fgOffSetTime { - ffw.fgOffSetTime = true - ffw.offSetTime = pkt.Time + if !ffr.fgOffSetTime { + ffr.fgOffSetTime = true + ffr.offSetTime = pkt.Time } - if !ffw.fgStart { + if !ffr.fgStart { if !pkt.IsKeyFrame { continue } - ffw.fgStart = true + ffr.fgStart = true } err = httpWriteMuxer.WritePacket(pkt) if err != nil { - logs.Error("read file %s WritePacket error : %v", ffw.fileName, err) + logs.Error("read file %s WritePacket error : %v", ffr.fileName, err) break } - sinceTime := time.Since(timeStart) + time.Duration(ffw.seekSecond)*time.Second - if (pkt.Time - ffw.offSetTime) > (sinceTime + 5*time.Minute) { + sinceTime := time.Since(timeStart) + time.Duration(ffr.seekSecond)*time.Second + if (pkt.Time - ffr.offSetTime) > (sinceTime + 5*time.Minute) { select { - case <-ffw.done: + case <-ffr.done: break Loop case <-time.NewTicker(5 * time.Second).C: } } select { - case <-ffw.done: + case <-ffr.done: break Loop default: } diff --git a/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvwriter/fileflvwriter.go b/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvwriter/fileflvwriter.go index f98d53e..966c3d5 100644 --- a/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvwriter/fileflvwriter.go +++ b/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvwriter/fileflvwriter.go @@ -5,6 +5,7 @@ import ( "io" "os" "runtime/debug" + "sync" "time" "github.com/beego/beego/v2/core/config" @@ -39,6 +40,7 @@ type FileFlvWriter struct { endTime time.Time ffm IFileFlvManager idCamera string + mutex sync.Mutex } func (ffw *FileFlvWriter) GetDone() <-chan int { @@ -84,31 +86,7 @@ func NewFileFlvWriter( isStart: false, ffm: ffm, } - condition := common.GetEqualCondition("code", code) - camera, err := base_service.CameraFindOneByCondition(condition) - if err != nil { - logs.Error("query camera error : %v", err) - return ffw - } - if camera.OnlineStatus != true { - return ffw - } - if camera.SaveVideo != true { - go func() { - for { - select { - case <-ffw.GetDone(): - return - case _, ok := <-ffw.pktStream: - if !ok { - return - } - } - } - }() - return ffw - } - ffw.idCamera = camera.Id + go ffw.flvWrite() return ffw } @@ -121,9 +99,17 @@ func (ffw *FileFlvWriter) StopWrite() { } }() ffw.ffm.DeleteFFW(ffw.sessionId) + ffw.CloseDone() + }() +} + +func (ffw *FileFlvWriter) CloseDone() { + ffw.mutex.Lock() + if !ffw.fgDoneClose { ffw.fgDoneClose = true close(ffw.done) - }() + } + ffw.mutex.Unlock() } func (ffw *FileFlvWriter) TickerStopWrite() { @@ -136,8 +122,7 @@ func (ffw *FileFlvWriter) TickerStopWrite() { select { case <-time.NewTicker(30 * time.Second).C: //等待30秒再关闭 ffw.ffm.DeleteFFW(ffw.sessionId) - ffw.fgDoneClose = true - close(ffw.done) + ffw.CloseDone() case <-ffw.GetDone(): } }() @@ -180,11 +165,36 @@ func (ffw *FileFlvWriter) flvWrite() { logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) } }() + defer func() { + ffw.CloseDone() + }() + condition := common.GetEqualCondition("code", ffw.code) + camera, err := base_service.CameraFindOneByCondition(condition) + if err != nil { + logs.Error("query camera error : %v", err) + return + } + if !camera.OnlineStatus { + return + } + if !camera.SaveVideo { + for { + select { + case <-ffw.GetDone(): + return + case _, ok := <-ffw.pktStream: + if !ok { + return + } + } + } + } + ffw.idCamera = camera.Id + if err := ffw.createFlvFile(); err != nil { logs.Error("create file flv error : %v", err) return } - defer func() { ffw.endTime = time.Now() ffw.muxer.WriteTrailer() @@ -192,10 +202,6 @@ func (ffw *FileFlvWriter) flvWrite() { //写入script tag data,主要补充视频的总时长,否则使用播放器播放看不到视频总时长 ffw.writeScriptTagData() - - if !ffw.fgDoneClose { - close(ffw.done) - } }() muxer := flv.NewMuxer(ffw) diff --git a/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvmanager.go b/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvmanager.go index 609f9f9..f17e588 100644 --- a/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvmanager.go +++ b/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvmanager.go @@ -1,9 +1,11 @@ package httpflvmanage import ( + "fmt" "io" "runtime/debug" "sync" + "sync/atomic" "time" "github.com/beego/beego/v2/core/logs" @@ -14,12 +16,33 @@ import ( base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" ) +type SyncMap struct { + sync.Map + size int32 // 原子计数器,用于跟踪map的大小 +} + +func (sm *SyncMap) Store(key, value interface{}) { + sm.Map.Store(key, value) + atomic.AddInt32(&sm.size, 1) // 每次存储时增加计数器 +} + +func (sm *SyncMap) Delete(key interface{}) { + sm.Map.Delete(key) + atomic.AddInt32(&sm.size, -1) // 每次删除时减少计数器 +} + +func (sm *SyncMap) IsEmpty() bool { + return atomic.LoadInt32(&sm.size) == 0 // 加载计数器的值并检查是否为0 +} + type HttpFlvManager struct { - done chan int - pktStream <-chan av.Packet - code string - codecs []av.CodecData - hfws sync.Map + fgDoneClose bool + done chan int + pktStream <-chan av.Packet + code string + codecs []av.CodecData + hfws SyncMap + mutex sync.Mutex } func (hfm *HttpFlvManager) GetCode() string { @@ -50,35 +73,13 @@ func (hfm *HttpFlvManager) GetCodecs() []av.CodecData { func NewHttpFlvManager(pktStream <-chan av.Packet, code string, codecs []av.CodecData) *HttpFlvManager { hfm := &HttpFlvManager{ - done: make(chan int), - pktStream: pktStream, - code: code, - codecs: codecs, - } - condition := common.GetEqualCondition("code", code) - camera, err := base_service.CameraFindOneByCondition(condition) - if err != nil { - logs.Error("query camera error : %v", err) - return hfm - } - if camera.OnlineStatus != true { - return hfm - } - if camera.Live != true { - go func() { - for { - select { - case <-hfm.GetDone(): - return - case _, ok := <-hfm.pktStream: - if !ok { - return - } - } - } - }() - return hfm + fgDoneClose: false, + done: make(chan int), + pktStream: pktStream, + code: code, + codecs: codecs, } + go hfm.flvWrite() return hfm } @@ -90,10 +91,19 @@ func (hfm *HttpFlvManager) StopWrite() { logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) } }() - close(hfm.done) + hfm.CloseDone() }() } +func (hfm *HttpFlvManager) CloseDone() { + hfm.mutex.Lock() + if !hfm.fgDoneClose { + hfm.fgDoneClose = true + close(hfm.done) + } + hfm.mutex.Unlock() +} + // Write extends to writer.Writer func (hfm *HttpFlvManager) flvWrite() { defer func() { @@ -101,6 +111,28 @@ func (hfm *HttpFlvManager) flvWrite() { logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) } }() + defer hfm.CloseDone() + condition := common.GetEqualCondition("code", hfm.code) + camera, err := base_service.CameraFindOneByCondition(condition) + if err != nil { + logs.Error("query camera error : %v", err) + return + } + if !camera.OnlineStatus { + return + } + if !camera.Live { + for { + select { + case <-hfm.GetDone(): + return + case _, ok := <-hfm.pktStream: + if !ok { + return + } + } + } + } for pkt := range utils.OrDonePacket(hfm.done, hfm.pktStream) { hfm.hfws.Range(func(key, value interface{}) bool { wi := value.(*httpflvwriter.HttpFlvWriter) @@ -122,6 +154,18 @@ func (hfm *HttpFlvManager) AddHttpFlvPlayer( pulseInterval time.Duration, writer io.Writer, ) (<-chan int, error) { + condition := common.GetEqualCondition("code", hfm.code) + camera, err := base_service.CameraFindOneByCondition(condition) + if err != nil { + logs.Error("query camera error : %v", err) + return nil, err + } + if !camera.OnlineStatus { + return nil, fmt.Errorf("camera offline") + } + if !camera.Live { + return nil, fmt.Errorf("camera live disabled") + } sessionId := utils.NextValSnowflakeID() //添加缓冲 pktStream := make(chan av.Packet, 1024) @@ -137,3 +181,7 @@ func (hfm *HttpFlvManager) AddHttpFlvPlayer( func (hfm *HttpFlvManager) DeleteHFW(sesessionId int64) { hfm.hfws.LoadAndDelete(sesessionId) } + +func (hfm *HttpFlvManager) IsCameraExistsPlayer() bool { + return hfm.hfws.IsEmpty() +} diff --git a/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvwriter/httpflvwriter.go b/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvwriter/httpflvwriter.go index b9c4eda..9b0338a 100644 --- a/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvwriter/httpflvwriter.go +++ b/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvwriter/httpflvwriter.go @@ -9,8 +9,6 @@ import ( "github.com/deepch/vdk/av" "github.com/deepch/vdk/format/flv" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" - "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" - base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" ) type IHttpFlvManager interface { @@ -82,28 +80,6 @@ func NewHttpFlvWriter( start: false, } - condition := common.GetEqualCondition("code", code) - camera, err := base_service.CameraFindOneByCondition(condition) - if err != nil { - logs.Error("query camera error : %v", err) - return hfw - } - if camera.OnlineStatus != true { - return hfw - } - if camera.Live != true { - go func() { - for { - select { - case <-hfw.GetDone(): - return - case <-hfw.pktStream: - } - } - }() - return hfw - } - go hfw.httpWrite() return hfw } @@ -117,6 +93,7 @@ func (hfw *HttpFlvWriter) httpWrite() { ticker := time.NewTicker(hfw.pulseInterval) defer func() { + hfw.ihfm.DeleteHFW(hfw.sessionId) close(hfw.done) }() pktStream := utils.OrDonePacket(hfw.playerDone, hfw.pktStream) diff --git a/src/rtsp2rtmp/flvadmin/rtmpflvwriter/rtmpflvwriter.go b/src/rtsp2rtmp/flvadmin/rtmpflvwriter/rtmpflvwriter.go index bce48b0..b0d2e57 100644 --- a/src/rtsp2rtmp/flvadmin/rtmpflvwriter/rtmpflvwriter.go +++ b/src/rtsp2rtmp/flvadmin/rtmpflvwriter/rtmpflvwriter.go @@ -2,11 +2,13 @@ package rtmpflvwriter import ( "runtime/debug" + "sync" "time" "github.com/beego/beego/v2/core/logs" "github.com/deepch/vdk/av" "github.com/deepch/vdk/format/rtmp" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/camerastatuspush" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" @@ -19,6 +21,7 @@ type IRtmpFlvManager interface { type RtmpFlvWriter struct { needPushRtmp bool stop bool + fgDoneClose bool done chan int pktStream <-chan av.Packet code string @@ -28,6 +31,7 @@ type RtmpFlvWriter struct { conn *rtmp.Conn pulseInterval time.Duration irfm IRtmpFlvManager + mutex sync.Mutex } func (rfw *RtmpFlvWriter) GetDone() <-chan int { @@ -59,6 +63,7 @@ func NewRtmpFlvWriter(needPushRtmp bool, pktStream <-chan av.Packet, code string rfw := &RtmpFlvWriter{ needPushRtmp: needPushRtmp, stop: false, + fgDoneClose: false, done: make(chan int), pktStream: pktStream, code: code, @@ -79,10 +84,19 @@ func (rfw *RtmpFlvWriter) StopWrite() { } }() rfw.stop = true - close(rfw.done) + rfw.CloseDone() }() } +func (rfw *RtmpFlvWriter) CloseDone() { + rfw.mutex.Lock() + if !rfw.fgDoneClose { + rfw.fgDoneClose = true + close(rfw.done) + } + rfw.mutex.Unlock() +} + func (rfw *RtmpFlvWriter) createConn() error { condition := common.GetEqualCondition("code", rfw.code) camera, err := base_service.CameraFindOneByCondition(condition) @@ -114,20 +128,37 @@ func (rfw *RtmpFlvWriter) flvWrite() { logs.Error("not found camera : %s", rfw.code) return } - if camera.OnlineStatus != true { + if !camera.OnlineStatus { return } - if camera.RtmpPushStatus != true || !rfw.needPushRtmp { + defer rfw.CloseDone() + if camera.RtmpPushStatus && camera.FgPassive { go func() { + defer func() { + camerastatuspush.CameraOfflinePush(camera.Code) + }() + camerastatuspush.CameraOnlinePush(camera.Code) for { select { case <-rfw.GetDone(): return - case <-rfw.pktStream: + case <-time.NewTicker(1 * time.Second).C: + camerastatuspush.CameraOnlinePush(camera.Code) } } }() - return + } + if !camera.RtmpPushStatus && !rfw.needPushRtmp { + for { + select { + case <-rfw.GetDone(): + return + case _, ok := <-rfw.pktStream: + if !ok { + return + } + } + } } ticker := time.NewTicker(rfw.pulseInterval) diff --git a/src/rtsp2rtmp/tcpclient/camera_aq.go b/src/rtsp2rtmp/tcpclient/camera_aq.go index 1e4dc05..30c6f5b 100644 --- a/src/rtsp2rtmp/tcpclient/camera_aq.go +++ b/src/rtsp2rtmp/tcpclient/camera_aq.go @@ -5,13 +5,14 @@ import ( "fmt" "github.com/beego/beego/v2/core/logs" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" dto_convert "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controller/convert" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" ) -func cameraAq(commandMessage CommandMessage) { - conn, err := connectAndRegister("cameraAq", commandMessage.MessageId) +func cameraAq(commandMessage tcpclientcommon.CommandMessage) { + conn, err := tcpclientcommon.ConnectAndResRegister("cameraAq", commandMessage.MessageId) if err != nil { logs.Error("cameraAq connect to server error: %v", err) return @@ -24,7 +25,7 @@ func cameraAq(commandMessage CommandMessage) { if err != nil { logs.Error("cameraAq message format error: %v", err) result := common.ErrorResult(fmt.Sprintf("cameraAq message format error: %v", err)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -36,7 +37,7 @@ func cameraAq(commandMessage CommandMessage) { if err != nil { logs.Error("CameraFindCollectionByCondition error: %v", err) result := common.ErrorResult("CameraFindCollectionByCondition error") - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -47,7 +48,7 @@ func cameraAq(commandMessage CommandMessage) { if err != nil { logs.Error("ConvertCameraToVOList error: %v", err) result := common.ErrorResult("ConvertCameraToVOList error") - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -55,7 +56,7 @@ func cameraAq(commandMessage CommandMessage) { return } result := common.SuccessResultData(voList) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return diff --git a/src/rtsp2rtmp/tcpclient/camerastatuspush/camera_status_push.go b/src/rtsp2rtmp/tcpclient/camerastatuspush/camera_status_push.go new file mode 100644 index 0000000..ce75d05 --- /dev/null +++ b/src/rtsp2rtmp/tcpclient/camerastatuspush/camera_status_push.go @@ -0,0 +1,24 @@ +package camerastatuspush + +import ( + "github.com/beego/beego/v2/core/logs" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon" +) + +func CameraOnlinePush(cameraCode string) { + conn, err := tcpclientcommon.ConnectAndCameraStatusRegister("cameraOnline", cameraCode) + if err != nil { + logs.Error("cameraAq connect to server error: %v", err) + return + } + defer conn.Close() +} + +func CameraOfflinePush(cameraCode string) { + conn, err := tcpclientcommon.ConnectAndCameraStatusRegister("cameraOffline", cameraCode) + if err != nil { + logs.Error("cameraAq connect to server error: %v", err) + return + } + defer conn.Close() +} diff --git a/src/rtsp2rtmp/tcpclient/command_receive.go b/src/rtsp2rtmp/tcpclient/command_receive.go index 21cebbc..bba697f 100644 --- a/src/rtsp2rtmp/tcpclient/command_receive.go +++ b/src/rtsp2rtmp/tcpclient/command_receive.go @@ -7,6 +7,7 @@ import ( "github.com/beego/beego/v2/core/config" "github.com/beego/beego/v2/core/logs" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" ) @@ -20,7 +21,7 @@ func StartCommandReceiveServer() { } func commandReceiveConnect() { - conn, err := connectAndRegister("keepChannel", "") + conn, err := tcpclientcommon.ConnectAndKeepChannelRegister("keepChannel") if err != nil { logs.Error("keepChannel connect to server error: %v", err) return @@ -54,7 +55,7 @@ func commandReceiveConnect() { } } - // commandMessage := CommandMessage{} + // commandMessage := tcpclientcommon.CommandMessage{} // err = json.Unmarshal(serverRepBytes, &commandMessage) // if err != nil { // logs.Error("message format error: %v", err) @@ -71,7 +72,7 @@ func commandReceiveConnect() { logs.Error("message DecryptAES error: %v", err) continue } - commandMessage := CommandMessage{} + commandMessage := tcpclientcommon.CommandMessage{} err = json.Unmarshal(commandBytes, &commandMessage) if err != nil { logs.Error("message format error: %v", err) @@ -83,7 +84,7 @@ func commandReceiveConnect() { } } -func commandRes(commandMessage CommandMessage) { +func commandRes(commandMessage tcpclientcommon.CommandMessage) { switch commandMessage.MessageType { case "cameraAq": cameraAq(commandMessage) diff --git a/src/rtsp2rtmp/tcpclient/flv_data_push.go b/src/rtsp2rtmp/tcpclient/flv_data_push.go index f82ace7..044b570 100644 --- a/src/rtsp2rtmp/tcpclient/flv_data_push.go +++ b/src/rtsp2rtmp/tcpclient/flv_data_push.go @@ -9,6 +9,7 @@ import ( "github.com/beego/beego/v2/core/config" "github.com/beego/beego/v2/core/logs" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" @@ -59,8 +60,8 @@ func (flvPush FlvPush) Write(p []byte) (n int, err error) { return len(p), nil } -func flvPlay(commandMessage CommandMessage) { - conn, err := connectAndRegister("flvPlay", commandMessage.MessageId) +func flvPlay(commandMessage tcpclientcommon.CommandMessage) { + conn, err := tcpclientcommon.ConnectAndResRegister("flvPlay", commandMessage.MessageId) if err != nil { logs.Error("flvPlay connect to server error: %v", err) return @@ -72,7 +73,7 @@ func flvPlay(commandMessage CommandMessage) { if err != nil { logs.Error("flvPlay message format error: %v", err) result := common.ErrorResult(fmt.Sprintf("flvPlay message format error: %v", err)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -84,7 +85,7 @@ func flvPlay(commandMessage CommandMessage) { if err != nil { logs.Error("CameraRecordSelectById error: %v", err) result := common.ErrorResult(fmt.Sprintf("idCameraRecord: %s CameraRecordSelectById error", playParam.IdCameraRecord)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -103,7 +104,7 @@ func flvPlay(commandMessage CommandMessage) { if ok { logs.Error("playerId: %s exists", playParam.PlayerId) result := common.ErrorResult(fmt.Sprintf("playerId: %s exists", playParam.PlayerId)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return diff --git a/src/rtsp2rtmp/tcpclient/flv_fetch_more_data.go b/src/rtsp2rtmp/tcpclient/flv_fetch_more_data.go index 2dd8cf3..2c7d965 100644 --- a/src/rtsp2rtmp/tcpclient/flv_fetch_more_data.go +++ b/src/rtsp2rtmp/tcpclient/flv_fetch_more_data.go @@ -6,6 +6,7 @@ import ( "github.com/beego/beego/v2/core/logs" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" ) @@ -14,8 +15,8 @@ type FetchMoreDataParam struct { SeekSecond uint64 `json:"seekSecond"` } -func flvFetchMoreData(commandMessage CommandMessage) { - conn, err := connectAndRegister("flvFetchMoreData", commandMessage.MessageId) +func flvFetchMoreData(commandMessage tcpclientcommon.CommandMessage) { + conn, err := tcpclientcommon.ConnectAndResRegister("flvFetchMoreData", commandMessage.MessageId) if err != nil { logs.Error("flvFetchMoreData connect to server error: %v", err) return @@ -28,7 +29,7 @@ func flvFetchMoreData(commandMessage CommandMessage) { if err != nil { logs.Error("flvFetchMoreData message format error: %v", err) result := common.ErrorResult(fmt.Sprintf("flvFetchMoreData message format error: %v", err)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -40,7 +41,7 @@ func flvFetchMoreData(commandMessage CommandMessage) { if !ok { logs.Error("playerId: %s not exists or complate", param.PlayerId) result := common.SuccessResultMsg(fmt.Sprintf("playerId: %s not exists or complate, skip this request", param.PlayerId)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -52,7 +53,7 @@ func flvFetchMoreData(commandMessage CommandMessage) { logs.Info("vod player [%s] fetch data, addr [%s]", param.PlayerId, conn.LocalAddr().String()) result := common.SuccessResultMsg("fetch sccess") - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return diff --git a/src/rtsp2rtmp/tcpclient/flv_media_info.go b/src/rtsp2rtmp/tcpclient/flv_media_info.go index 2c0277c..62137ad 100644 --- a/src/rtsp2rtmp/tcpclient/flv_media_info.go +++ b/src/rtsp2rtmp/tcpclient/flv_media_info.go @@ -6,6 +6,7 @@ import ( "github.com/beego/beego/v2/core/logs" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dto/vo/ext/flv_file" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" @@ -15,8 +16,8 @@ type FlvFileMediaInfoParam struct { IdCameraRecord string `json:"idCameraRecord"` } -func flvFileMediaInfo(commandMessage CommandMessage) { - conn, err := connectAndRegister("flvFileMediaInfo", commandMessage.MessageId) +func flvFileMediaInfo(commandMessage tcpclientcommon.CommandMessage) { + conn, err := tcpclientcommon.ConnectAndResRegister("flvFileMediaInfo", commandMessage.MessageId) if err != nil { logs.Error("flvFileMediaInfo connect to server error: %v", err) return @@ -28,7 +29,7 @@ func flvFileMediaInfo(commandMessage CommandMessage) { if err != nil { logs.Error("flvFileMediaInfo message format error: %v", err) result := common.ErrorResult(fmt.Sprintf("flvFileMediaInfo message format error: %v", err)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -41,7 +42,7 @@ func flvFileMediaInfo(commandMessage CommandMessage) { if err != nil { logs.Error("idCameraRecord: %s CameraRecordSelectById error: %v", idCameraRecord, err) result := common.ErrorResult(fmt.Sprintf("idCameraRecord: %s CameraRecordSelectById error", idCameraRecord)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -63,7 +64,7 @@ func flvFileMediaInfo(commandMessage CommandMessage) { if err != nil { logs.Error("file: %s get mediaInfo error", camera_record.TempFileName) result := common.ErrorResult(fmt.Sprintf("file: %s get mediaInfo error", camera_record.TempFileName)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -73,7 +74,7 @@ func flvFileMediaInfo(commandMessage CommandMessage) { } result := common.SuccessResultData(mediaInfo) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return diff --git a/src/rtsp2rtmp/tcpclient/history_video_page.go b/src/rtsp2rtmp/tcpclient/history_video_page.go index 345e6dc..184efcd 100644 --- a/src/rtsp2rtmp/tcpclient/history_video_page.go +++ b/src/rtsp2rtmp/tcpclient/history_video_page.go @@ -5,14 +5,15 @@ import ( "fmt" "github.com/beego/beego/v2/core/logs" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" dto_convert "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controller/convert" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dao/entity" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" ) -func historyVideoPage(commandMessage CommandMessage) { - conn, err := connectAndRegister("historyVideoPage", commandMessage.MessageId) +func historyVideoPage(commandMessage tcpclientcommon.CommandMessage) { + conn, err := tcpclientcommon.ConnectAndResRegister("historyVideoPage", commandMessage.MessageId) if err != nil { logs.Error("historyVideoPage connect to server error: %v", err) return @@ -24,7 +25,7 @@ func historyVideoPage(commandMessage CommandMessage) { if err != nil { logs.Error("historyVideoPage message format error: %v", err) result := common.ErrorResult(fmt.Sprintf("historyVideoPage message format error: %v", err)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -36,7 +37,7 @@ func historyVideoPage(commandMessage CommandMessage) { if err != nil { logs.Error("aqPage error : %v", err) result := common.ErrorResult("CameraRecordFindPageByCondition error") - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -51,7 +52,7 @@ func historyVideoPage(commandMessage CommandMessage) { if err != nil { logs.Error("aqPage error: %v", err) result := common.ErrorResult(fmt.Sprintf("ConvertCameraRecordToVOList error")) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -64,7 +65,7 @@ func historyVideoPage(commandMessage CommandMessage) { } pageInfo.DataList = dataList result := common.SuccessResultData(pageInfo) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return diff --git a/src/rtsp2rtmp/tcpclient/rtmp_push_manager.go b/src/rtsp2rtmp/tcpclient/rtmp_push_manager.go index cfe4446..23701a8 100644 --- a/src/rtsp2rtmp/tcpclient/rtmp_push_manager.go +++ b/src/rtsp2rtmp/tcpclient/rtmp_push_manager.go @@ -6,6 +6,7 @@ import ( "github.com/beego/beego/v2/core/logs" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" ) @@ -13,8 +14,8 @@ type RtmpPushParam struct { CameraCode string `json:"cameraCode"` } -func startRtmpPush(commandMessage CommandMessage) { - conn, err := connectAndRegister("startPushRtmp", commandMessage.MessageId) +func startRtmpPush(commandMessage tcpclientcommon.CommandMessage) { + conn, err := tcpclientcommon.ConnectAndResRegister("startPushRtmp", commandMessage.MessageId) if err != nil { logs.Error("startPushRtmp connect to server error: %v", err) return @@ -26,7 +27,7 @@ func startRtmpPush(commandMessage CommandMessage) { if err != nil { logs.Error("startPushRtmp message format error: %v", err) result := common.ErrorResult(fmt.Sprintf("startPushRtmp message format error: %v", err)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -36,15 +37,15 @@ func startRtmpPush(commandMessage CommandMessage) { flvadmin.GetSingleRtmpFlvAdmin().RemoteStartWrite(param.CameraCode) result := common.SuccessResultData("startPushRtmp success") - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return } } -func stopRtmpPush(commandMessage CommandMessage) { - conn, err := connectAndRegister("stopPushRtmp", commandMessage.MessageId) +func stopRtmpPush(commandMessage tcpclientcommon.CommandMessage) { + conn, err := tcpclientcommon.ConnectAndResRegister("stopPushRtmp", commandMessage.MessageId) if err != nil { logs.Error("stopPushRtmp connect to server error: %v", err) return @@ -56,7 +57,7 @@ func stopRtmpPush(commandMessage CommandMessage) { if err != nil { logs.Error("stopPushRtmp message format error: %v", err) result := common.ErrorResult(fmt.Sprintf("stopPushRtmp message format error: %v", err)) - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return @@ -67,7 +68,7 @@ func stopRtmpPush(commandMessage CommandMessage) { defer conn.Close() result := common.SuccessResultData("stopPushRtmp success") - _, err = writeResult(result, conn) + _, err = tcpclientcommon.WriteResult(result, conn) if err != nil { logs.Error(err) return diff --git a/src/rtsp2rtmp/tcpclient/common.go b/src/rtsp2rtmp/tcpclient/tcpclientcommon/common.go similarity index 74% rename from src/rtsp2rtmp/tcpclient/common.go rename to src/rtsp2rtmp/tcpclient/tcpclientcommon/common.go index 8a15fe0..05fda2d 100644 --- a/src/rtsp2rtmp/tcpclient/common.go +++ b/src/rtsp2rtmp/tcpclient/tcpclientcommon/common.go @@ -1,4 +1,4 @@ -package tcpclient +package tcpclientcommon import ( "encoding/json" @@ -25,12 +25,13 @@ type RegisterInfo struct { ClientCode string `json:"clientCode"` DateStr string `json:"dateStr"` Sign string `json:"sign"` - // "keepChannel" "cameraAq" "historyVideoPage" "flvFileMediaInfo" "flvPlay" "flvFetchMoreData" "startPushRtmp" "stopPushRtmp" - ConnType string `json:"connType"` - MessageId string `json:"messageId"` + // "keepChannel" "cameraOnline" "cameraOffline" "cameraAq" "historyVideoPage" "flvFileMediaInfo" "flvPlay" "flvFetchMoreData" "startPushRtmp" "stopPushRtmp" + ConnType string `json:"connType"` + MessageId string `json:"messageId"` + CameraCode string `json:"cameraCode"` } -func newReisterInfo(connType string, messageId string) (ri RegisterInfo, err error) { +func newReisterInfo(connType string, messageId string, cameraCode string) (ri RegisterInfo, err error) { currentDateStr := time.Now().Format(time.RFC3339) clientCode, err := config.String("server.remote.client-code") if err != nil { @@ -51,11 +52,24 @@ func newReisterInfo(connType string, messageId string) (ri RegisterInfo, err err DateStr: currentDateStr, Sign: signStr, MessageId: messageId, + CameraCode: cameraCode, } return } -func connectAndRegister(connType string, messageId string) (conn net.Conn, err error) { +func ConnectAndKeepChannelRegister(connType string) (conn net.Conn, err error) { + return connectAndRegister(connType, "", "") +} + +func ConnectAndCameraStatusRegister(connType string, cameraCode string) (conn net.Conn, err error) { + return connectAndRegister(connType, "", cameraCode) +} + +func ConnectAndResRegister(connType string, messageId string) (conn net.Conn, err error) { + return connectAndRegister(connType, messageId, "") +} + +func connectAndRegister(connType string, messageId string, cameraCode string) (conn net.Conn, err error) { serverIp, err := config.String("server.remote.server-ip") if err != nil { logs.Error("get remote server-ip error: %v. \n", err) @@ -73,7 +87,7 @@ func connectAndRegister(connType string, messageId string) (conn net.Conn, err e } // register to server - ri, err := newReisterInfo(connType, messageId) + ri, err := newReisterInfo(connType, messageId, cameraCode) if err != nil { logs.Error(err) return @@ -94,7 +108,7 @@ func connectAndRegister(connType string, messageId string) (conn net.Conn, err e return } -func writeResult(result common.AppResult, writer io.Writer) (n int, err error) { +func WriteResult(result common.AppResult, writer io.Writer) (n int, err error) { messageBytes, err := json.Marshal(result) if err != nil { logs.Error(err) diff --git a/src/rtsp2rtmp/web/task/task.go b/src/rtsp2rtmp/web/task/task.go index cc3533f..94c02c6 100644 --- a/src/rtsp2rtmp/web/task/task.go +++ b/src/rtsp2rtmp/web/task/task.go @@ -9,7 +9,6 @@ import ( "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager" - "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" ) @@ -40,7 +39,6 @@ func (t *task) clearToken() { } }() for { - web.ClearExipresToken() <-time.After(24 * time.Hour) } } diff --git a/src/rtsp2rtmp/web/webapp.go b/src/rtsp2rtmp/web/webapp.go index c6d9f73..576689f 100644 --- a/src/rtsp2rtmp/web/webapp.go +++ b/src/rtsp2rtmp/web/webapp.go @@ -4,8 +4,6 @@ import ( "net/http" "runtime/debug" "strconv" - "sync" - "time" "github.com/beego/beego/v2/core/config" "github.com/beego/beego/v2/core/logs" @@ -14,22 +12,6 @@ import ( ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controller/ext" ) -var tokens sync.Map - -func ClearExipresToken() { - deleteTokens := []string{} - // 遍历所有sync.Map中的键值对 - tokens.Range(func(k, v interface{}) bool { - if time.Now().After(v.(time.Time).Add(30 * time.Minute)) { - deleteTokens = append(deleteTokens, k.(string)) - } - return true - }) - for _, v := range deleteTokens { - tokens.Delete(v) - } -} - var webInstance *web type web struct{}