lazy push rtmp

This commit is contained in:
madao
2025-02-03 11:22:53 +08:00
parent 15560614a4
commit 6614b3aaf7
18 changed files with 311 additions and 232 deletions

View File

@@ -28,8 +28,8 @@ func main() {
logs.Error("get use-ffmpeg fail : %v", err)
fgUseFfmpeg = false
}
rtmpserver.GetSingleRtmpServer().StartRtmpServer()
if fgUseFfmpeg {
rtmpserver.GetSingleRtmpServer().StartRtmpServer()
ffmpegmanager.GetSingleFFmpegManager().StartClient()
} else {
rtspclientmanager.GetSingleRtspClientManager().StartClient()

View File

@@ -2,7 +2,7 @@ server:
use-ffmpeg: false
security: true
rtmp:
port: 1936
port: 1934
http:
port: 8080
static:

View File

@@ -4,6 +4,7 @@ import (
"io"
"os"
"runtime/debug"
"sync"
"time"
"github.com/beego/beego/v2/core/config"
@@ -28,27 +29,28 @@ type FileFlvReader struct {
fgStart bool
fgOffSetTime bool
offSetTime time.Duration // realTime = pkt.Time - offSetTime, offsetTime from is first pkt.Time
mutex sync.Mutex
}
func (ffw *FileFlvReader) GetDone() <-chan int {
return ffw.done
func (ffr *FileFlvReader) GetDone() <-chan int {
return ffr.done
}
func (ffw *FileFlvReader) GetCode() string {
return ffw.code
func (ffr *FileFlvReader) GetCode() string {
return ffr.code
}
func (ffw *FileFlvReader) SetCodecs(codecs []av.CodecData) {
ffw.codecs = codecs
func (ffr *FileFlvReader) SetCodecs(codecs []av.CodecData) {
ffr.codecs = codecs
}
func (ffw *FileFlvReader) GetCodecs() []av.CodecData {
return ffw.codecs
func (ffr *FileFlvReader) GetCodecs() []av.CodecData {
return ffr.codecs
}
func (ffw *FileFlvReader) SetSeekSecond(seekSecond uint64) {
if seekSecond > ffw.seekSecond {
ffw.seekSecond = seekSecond
func (ffr *FileFlvReader) SetSeekSecond(seekSecond uint64) {
if seekSecond > ffr.seekSecond {
ffr.seekSecond = seekSecond
}
}
@@ -58,7 +60,7 @@ func NewFileFlvReader(
fileName string,
) *FileFlvReader {
myHttpWriter := newMyHttpWriter(writer)
ffw := &FileFlvReader{
ffr := &FileFlvReader{
fgDoneClose: false,
done: make(chan int),
httpWrite: myHttpWriter,
@@ -69,46 +71,38 @@ func NewFileFlvReader(
fgOffSetTime: false,
offSetTime: 0,
}
go ffw.flvRead()
return ffw
go ffr.flvRead()
return ffr
}
func (ffw *FileFlvReader) StopRead() {
func (ffr *FileFlvReader) StopRead() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
ffw.fgDoneClose = true
close(ffw.done)
ffr.CloseDone()
}()
}
func (ffw *FileFlvReader) TickerStopRead() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
select {
case <-time.NewTicker(30 * time.Second).C: //等待30秒再关闭
ffw.fgDoneClose = true
close(ffw.done)
case <-ffw.GetDone():
}
}()
func (ffr *FileFlvReader) CloseDone() {
ffr.mutex.Lock()
if !ffr.fgDoneClose {
ffr.fgDoneClose = true
close(ffr.done)
}
ffr.mutex.Unlock()
}
func (ffw *FileFlvReader) Read(p []byte) (n int, err error) {
func (ffr *FileFlvReader) Read(p []byte) (n int, err error) {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
n, err = ffw.fd.Read(p)
n, err = ffr.fd.Read(p)
if err != nil {
logs.Error("Read file error : %v", err)
}
@@ -116,61 +110,59 @@ func (ffw *FileFlvReader) Read(p []byte) (n int, err error) {
return
}
func (ffw *FileFlvReader) openFlvFile() error {
fullFileName := getFileFlvPath() + "/" + ffw.fileName
func (ffr *FileFlvReader) openFlvFile() error {
fullFileName := getFileFlvPath() + "/" + ffr.fileName
fd, err := os.OpenFile(fullFileName, os.O_RDWR, 0644)
if err != nil {
logs.Error("open file: %s error : %v", fullFileName, err)
return err
}
ffw.fd = fd
ffw.fullFileName = fullFileName
ffr.fd = fd
ffr.fullFileName = fullFileName
return nil
}
func (ffw *FileFlvReader) flvRead() {
func (ffr *FileFlvReader) flvRead() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
if err := ffw.openFlvFile(); err != nil {
if err := ffr.openFlvFile(); err != nil {
logs.Error("open file flv error : %v", err)
return
}
defer func() {
ffw.endTime = time.Now()
ffw.fd.Close()
ffr.endTime = time.Now()
ffr.fd.Close()
if !ffw.fgDoneClose {
close(ffw.done)
}
ffr.CloseDone()
}()
ffw.startTime = time.Now()
httpWriteMuxer := flv.NewMuxer(ffw.httpWrite)
demuxer := flv.NewDemuxer(ffw)
ffw.muxer = demuxer
ffr.startTime = time.Now()
httpWriteMuxer := flv.NewMuxer(ffr.httpWrite)
demuxer := flv.NewDemuxer(ffr)
ffr.muxer = demuxer
codecs, err := demuxer.Streams()
if err != nil {
logs.Error("read codecs from flv file error : %v", err)
return
}
ffw.codecs = codecs
ffr.codecs = codecs
httpWriteMuxer.WriteHeader(codecs)
if ffw.seekSecond > 0 {
if ffr.seekSecond > 0 {
for {
pkt, err := demuxer.ReadPacket()
if err != nil {
logs.Error("read file %s ReadPacket error : %v", ffw.fileName, err)
logs.Error("read file %s ReadPacket error : %v", ffr.fileName, err)
break
}
if !ffw.fgOffSetTime {
ffw.fgOffSetTime = true
ffw.offSetTime = pkt.Time
if !ffr.fgOffSetTime {
ffr.fgOffSetTime = true
ffr.offSetTime = pkt.Time
}
if (pkt.Time - ffw.offSetTime) >= time.Duration(ffw.seekSecond)*time.Second {
if (pkt.Time - ffr.offSetTime) >= time.Duration(ffr.seekSecond)*time.Second {
break
}
}
@@ -182,35 +174,35 @@ Loop:
for {
pkt, err := demuxer.ReadPacket()
if err != nil {
logs.Error("read file %s ReadPacket error : %v", ffw.fileName, err)
logs.Error("read file %s ReadPacket error : %v", ffr.fileName, err)
break
}
if !ffw.fgOffSetTime {
ffw.fgOffSetTime = true
ffw.offSetTime = pkt.Time
if !ffr.fgOffSetTime {
ffr.fgOffSetTime = true
ffr.offSetTime = pkt.Time
}
if !ffw.fgStart {
if !ffr.fgStart {
if !pkt.IsKeyFrame {
continue
}
ffw.fgStart = true
ffr.fgStart = true
}
err = httpWriteMuxer.WritePacket(pkt)
if err != nil {
logs.Error("read file %s WritePacket error : %v", ffw.fileName, err)
logs.Error("read file %s WritePacket error : %v", ffr.fileName, err)
break
}
sinceTime := time.Since(timeStart) + time.Duration(ffw.seekSecond)*time.Second
if (pkt.Time - ffw.offSetTime) > (sinceTime + 5*time.Minute) {
sinceTime := time.Since(timeStart) + time.Duration(ffr.seekSecond)*time.Second
if (pkt.Time - ffr.offSetTime) > (sinceTime + 5*time.Minute) {
select {
case <-ffw.done:
case <-ffr.done:
break Loop
case <-time.NewTicker(5 * time.Second).C:
}
}
select {
case <-ffw.done:
case <-ffr.done:
break Loop
default:
}

View File

@@ -5,6 +5,7 @@ import (
"io"
"os"
"runtime/debug"
"sync"
"time"
"github.com/beego/beego/v2/core/config"
@@ -39,6 +40,7 @@ type FileFlvWriter struct {
endTime time.Time
ffm IFileFlvManager
idCamera string
mutex sync.Mutex
}
func (ffw *FileFlvWriter) GetDone() <-chan int {
@@ -84,31 +86,7 @@ func NewFileFlvWriter(
isStart: false,
ffm: ffm,
}
condition := common.GetEqualCondition("code", code)
camera, err := base_service.CameraFindOneByCondition(condition)
if err != nil {
logs.Error("query camera error : %v", err)
return ffw
}
if camera.OnlineStatus != true {
return ffw
}
if camera.SaveVideo != true {
go func() {
for {
select {
case <-ffw.GetDone():
return
case _, ok := <-ffw.pktStream:
if !ok {
return
}
}
}
}()
return ffw
}
ffw.idCamera = camera.Id
go ffw.flvWrite()
return ffw
}
@@ -121,9 +99,17 @@ func (ffw *FileFlvWriter) StopWrite() {
}
}()
ffw.ffm.DeleteFFW(ffw.sessionId)
ffw.CloseDone()
}()
}
func (ffw *FileFlvWriter) CloseDone() {
ffw.mutex.Lock()
if !ffw.fgDoneClose {
ffw.fgDoneClose = true
close(ffw.done)
}()
}
ffw.mutex.Unlock()
}
func (ffw *FileFlvWriter) TickerStopWrite() {
@@ -136,8 +122,7 @@ func (ffw *FileFlvWriter) TickerStopWrite() {
select {
case <-time.NewTicker(30 * time.Second).C: //等待30秒再关闭
ffw.ffm.DeleteFFW(ffw.sessionId)
ffw.fgDoneClose = true
close(ffw.done)
ffw.CloseDone()
case <-ffw.GetDone():
}
}()
@@ -180,11 +165,36 @@ func (ffw *FileFlvWriter) flvWrite() {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
defer func() {
ffw.CloseDone()
}()
condition := common.GetEqualCondition("code", ffw.code)
camera, err := base_service.CameraFindOneByCondition(condition)
if err != nil {
logs.Error("query camera error : %v", err)
return
}
if !camera.OnlineStatus {
return
}
if !camera.SaveVideo {
for {
select {
case <-ffw.GetDone():
return
case _, ok := <-ffw.pktStream:
if !ok {
return
}
}
}
}
ffw.idCamera = camera.Id
if err := ffw.createFlvFile(); err != nil {
logs.Error("create file flv error : %v", err)
return
}
defer func() {
ffw.endTime = time.Now()
ffw.muxer.WriteTrailer()
@@ -192,10 +202,6 @@ func (ffw *FileFlvWriter) flvWrite() {
//写入script tag data主要补充视频的总时长否则使用播放器播放看不到视频总时长
ffw.writeScriptTagData()
if !ffw.fgDoneClose {
close(ffw.done)
}
}()
muxer := flv.NewMuxer(ffw)

View File

@@ -1,9 +1,11 @@
package httpflvmanage
import (
"fmt"
"io"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/beego/beego/v2/core/logs"
@@ -14,12 +16,33 @@ import (
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
)
type SyncMap struct {
sync.Map
size int32 // 原子计数器用于跟踪map的大小
}
func (sm *SyncMap) Store(key, value interface{}) {
sm.Map.Store(key, value)
atomic.AddInt32(&sm.size, 1) // 每次存储时增加计数器
}
func (sm *SyncMap) Delete(key interface{}) {
sm.Map.Delete(key)
atomic.AddInt32(&sm.size, -1) // 每次删除时减少计数器
}
func (sm *SyncMap) IsEmpty() bool {
return atomic.LoadInt32(&sm.size) == 0 // 加载计数器的值并检查是否为0
}
type HttpFlvManager struct {
done chan int
pktStream <-chan av.Packet
code string
codecs []av.CodecData
hfws sync.Map
fgDoneClose bool
done chan int
pktStream <-chan av.Packet
code string
codecs []av.CodecData
hfws SyncMap
mutex sync.Mutex
}
func (hfm *HttpFlvManager) GetCode() string {
@@ -50,35 +73,13 @@ func (hfm *HttpFlvManager) GetCodecs() []av.CodecData {
func NewHttpFlvManager(pktStream <-chan av.Packet, code string, codecs []av.CodecData) *HttpFlvManager {
hfm := &HttpFlvManager{
done: make(chan int),
pktStream: pktStream,
code: code,
codecs: codecs,
}
condition := common.GetEqualCondition("code", code)
camera, err := base_service.CameraFindOneByCondition(condition)
if err != nil {
logs.Error("query camera error : %v", err)
return hfm
}
if camera.OnlineStatus != true {
return hfm
}
if camera.Live != true {
go func() {
for {
select {
case <-hfm.GetDone():
return
case _, ok := <-hfm.pktStream:
if !ok {
return
}
}
}
}()
return hfm
fgDoneClose: false,
done: make(chan int),
pktStream: pktStream,
code: code,
codecs: codecs,
}
go hfm.flvWrite()
return hfm
}
@@ -90,10 +91,19 @@ func (hfm *HttpFlvManager) StopWrite() {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
close(hfm.done)
hfm.CloseDone()
}()
}
func (hfm *HttpFlvManager) CloseDone() {
hfm.mutex.Lock()
if !hfm.fgDoneClose {
hfm.fgDoneClose = true
close(hfm.done)
}
hfm.mutex.Unlock()
}
// Write extends to writer.Writer
func (hfm *HttpFlvManager) flvWrite() {
defer func() {
@@ -101,6 +111,28 @@ func (hfm *HttpFlvManager) flvWrite() {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
defer hfm.CloseDone()
condition := common.GetEqualCondition("code", hfm.code)
camera, err := base_service.CameraFindOneByCondition(condition)
if err != nil {
logs.Error("query camera error : %v", err)
return
}
if !camera.OnlineStatus {
return
}
if !camera.Live {
for {
select {
case <-hfm.GetDone():
return
case _, ok := <-hfm.pktStream:
if !ok {
return
}
}
}
}
for pkt := range utils.OrDonePacket(hfm.done, hfm.pktStream) {
hfm.hfws.Range(func(key, value interface{}) bool {
wi := value.(*httpflvwriter.HttpFlvWriter)
@@ -122,6 +154,18 @@ func (hfm *HttpFlvManager) AddHttpFlvPlayer(
pulseInterval time.Duration,
writer io.Writer,
) (<-chan int, error) {
condition := common.GetEqualCondition("code", hfm.code)
camera, err := base_service.CameraFindOneByCondition(condition)
if err != nil {
logs.Error("query camera error : %v", err)
return nil, err
}
if !camera.OnlineStatus {
return nil, fmt.Errorf("camera offline")
}
if !camera.Live {
return nil, fmt.Errorf("camera live disabled")
}
sessionId := utils.NextValSnowflakeID()
//添加缓冲
pktStream := make(chan av.Packet, 1024)
@@ -137,3 +181,7 @@ func (hfm *HttpFlvManager) AddHttpFlvPlayer(
func (hfm *HttpFlvManager) DeleteHFW(sesessionId int64) {
hfm.hfws.LoadAndDelete(sesessionId)
}
func (hfm *HttpFlvManager) IsCameraExistsPlayer() bool {
return hfm.hfws.IsEmpty()
}

View File

@@ -9,8 +9,6 @@ import (
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/format/flv"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
)
type IHttpFlvManager interface {
@@ -82,28 +80,6 @@ func NewHttpFlvWriter(
start: false,
}
condition := common.GetEqualCondition("code", code)
camera, err := base_service.CameraFindOneByCondition(condition)
if err != nil {
logs.Error("query camera error : %v", err)
return hfw
}
if camera.OnlineStatus != true {
return hfw
}
if camera.Live != true {
go func() {
for {
select {
case <-hfw.GetDone():
return
case <-hfw.pktStream:
}
}
}()
return hfw
}
go hfw.httpWrite()
return hfw
}
@@ -117,6 +93,7 @@ func (hfw *HttpFlvWriter) httpWrite() {
ticker := time.NewTicker(hfw.pulseInterval)
defer func() {
hfw.ihfm.DeleteHFW(hfw.sessionId)
close(hfw.done)
}()
pktStream := utils.OrDonePacket(hfw.playerDone, hfw.pktStream)

View File

@@ -2,11 +2,13 @@ package rtmpflvwriter
import (
"runtime/debug"
"sync"
"time"
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/format/rtmp"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/camerastatuspush"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
@@ -19,6 +21,7 @@ type IRtmpFlvManager interface {
type RtmpFlvWriter struct {
needPushRtmp bool
stop bool
fgDoneClose bool
done chan int
pktStream <-chan av.Packet
code string
@@ -28,6 +31,7 @@ type RtmpFlvWriter struct {
conn *rtmp.Conn
pulseInterval time.Duration
irfm IRtmpFlvManager
mutex sync.Mutex
}
func (rfw *RtmpFlvWriter) GetDone() <-chan int {
@@ -59,6 +63,7 @@ func NewRtmpFlvWriter(needPushRtmp bool, pktStream <-chan av.Packet, code string
rfw := &RtmpFlvWriter{
needPushRtmp: needPushRtmp,
stop: false,
fgDoneClose: false,
done: make(chan int),
pktStream: pktStream,
code: code,
@@ -79,10 +84,19 @@ func (rfw *RtmpFlvWriter) StopWrite() {
}
}()
rfw.stop = true
close(rfw.done)
rfw.CloseDone()
}()
}
func (rfw *RtmpFlvWriter) CloseDone() {
rfw.mutex.Lock()
if !rfw.fgDoneClose {
rfw.fgDoneClose = true
close(rfw.done)
}
rfw.mutex.Unlock()
}
func (rfw *RtmpFlvWriter) createConn() error {
condition := common.GetEqualCondition("code", rfw.code)
camera, err := base_service.CameraFindOneByCondition(condition)
@@ -114,20 +128,37 @@ func (rfw *RtmpFlvWriter) flvWrite() {
logs.Error("not found camera : %s", rfw.code)
return
}
if camera.OnlineStatus != true {
if !camera.OnlineStatus {
return
}
if camera.RtmpPushStatus != true || !rfw.needPushRtmp {
defer rfw.CloseDone()
if camera.RtmpPushStatus && camera.FgPassive {
go func() {
defer func() {
camerastatuspush.CameraOfflinePush(camera.Code)
}()
camerastatuspush.CameraOnlinePush(camera.Code)
for {
select {
case <-rfw.GetDone():
return
case <-rfw.pktStream:
case <-time.NewTicker(1 * time.Second).C:
camerastatuspush.CameraOnlinePush(camera.Code)
}
}
}()
return
}
if !camera.RtmpPushStatus && !rfw.needPushRtmp {
for {
select {
case <-rfw.GetDone():
return
case _, ok := <-rfw.pktStream:
if !ok {
return
}
}
}
}
ticker := time.NewTicker(rfw.pulseInterval)

View File

@@ -5,13 +5,14 @@ import (
"fmt"
"github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
dto_convert "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controller/convert"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
)
func cameraAq(commandMessage CommandMessage) {
conn, err := connectAndRegister("cameraAq", commandMessage.MessageId)
func cameraAq(commandMessage tcpclientcommon.CommandMessage) {
conn, err := tcpclientcommon.ConnectAndResRegister("cameraAq", commandMessage.MessageId)
if err != nil {
logs.Error("cameraAq connect to server error: %v", err)
return
@@ -24,7 +25,7 @@ func cameraAq(commandMessage CommandMessage) {
if err != nil {
logs.Error("cameraAq message format error: %v", err)
result := common.ErrorResult(fmt.Sprintf("cameraAq message format error: %v", err))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -36,7 +37,7 @@ func cameraAq(commandMessage CommandMessage) {
if err != nil {
logs.Error("CameraFindCollectionByCondition error: %v", err)
result := common.ErrorResult("CameraFindCollectionByCondition error")
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -47,7 +48,7 @@ func cameraAq(commandMessage CommandMessage) {
if err != nil {
logs.Error("ConvertCameraToVOList error: %v", err)
result := common.ErrorResult("ConvertCameraToVOList error")
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -55,7 +56,7 @@ func cameraAq(commandMessage CommandMessage) {
return
}
result := common.SuccessResultData(voList)
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return

View File

@@ -0,0 +1,24 @@
package camerastatuspush
import (
"github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon"
)
func CameraOnlinePush(cameraCode string) {
conn, err := tcpclientcommon.ConnectAndCameraStatusRegister("cameraOnline", cameraCode)
if err != nil {
logs.Error("cameraAq connect to server error: %v", err)
return
}
defer conn.Close()
}
func CameraOfflinePush(cameraCode string) {
conn, err := tcpclientcommon.ConnectAndCameraStatusRegister("cameraOffline", cameraCode)
if err != nil {
logs.Error("cameraAq connect to server error: %v", err)
return
}
defer conn.Close()
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
@@ -20,7 +21,7 @@ func StartCommandReceiveServer() {
}
func commandReceiveConnect() {
conn, err := connectAndRegister("keepChannel", "")
conn, err := tcpclientcommon.ConnectAndKeepChannelRegister("keepChannel")
if err != nil {
logs.Error("keepChannel connect to server error: %v", err)
return
@@ -54,7 +55,7 @@ func commandReceiveConnect() {
}
}
// commandMessage := CommandMessage{}
// commandMessage := tcpclientcommon.CommandMessage{}
// err = json.Unmarshal(serverRepBytes, &commandMessage)
// if err != nil {
// logs.Error("message format error: %v", err)
@@ -71,7 +72,7 @@ func commandReceiveConnect() {
logs.Error("message DecryptAES error: %v", err)
continue
}
commandMessage := CommandMessage{}
commandMessage := tcpclientcommon.CommandMessage{}
err = json.Unmarshal(commandBytes, &commandMessage)
if err != nil {
logs.Error("message format error: %v", err)
@@ -83,7 +84,7 @@ func commandReceiveConnect() {
}
}
func commandRes(commandMessage CommandMessage) {
func commandRes(commandMessage tcpclientcommon.CommandMessage) {
switch commandMessage.MessageType {
case "cameraAq":
cameraAq(commandMessage)

View File

@@ -9,6 +9,7 @@ import (
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
@@ -59,8 +60,8 @@ func (flvPush FlvPush) Write(p []byte) (n int, err error) {
return len(p), nil
}
func flvPlay(commandMessage CommandMessage) {
conn, err := connectAndRegister("flvPlay", commandMessage.MessageId)
func flvPlay(commandMessage tcpclientcommon.CommandMessage) {
conn, err := tcpclientcommon.ConnectAndResRegister("flvPlay", commandMessage.MessageId)
if err != nil {
logs.Error("flvPlay connect to server error: %v", err)
return
@@ -72,7 +73,7 @@ func flvPlay(commandMessage CommandMessage) {
if err != nil {
logs.Error("flvPlay message format error: %v", err)
result := common.ErrorResult(fmt.Sprintf("flvPlay message format error: %v", err))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -84,7 +85,7 @@ func flvPlay(commandMessage CommandMessage) {
if err != nil {
logs.Error("CameraRecordSelectById error: %v", err)
result := common.ErrorResult(fmt.Sprintf("idCameraRecord: %s CameraRecordSelectById error", playParam.IdCameraRecord))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -103,7 +104,7 @@ func flvPlay(commandMessage CommandMessage) {
if ok {
logs.Error("playerId: %s exists", playParam.PlayerId)
result := common.ErrorResult(fmt.Sprintf("playerId: %s exists", playParam.PlayerId))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return

View File

@@ -6,6 +6,7 @@ import (
"github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
)
@@ -14,8 +15,8 @@ type FetchMoreDataParam struct {
SeekSecond uint64 `json:"seekSecond"`
}
func flvFetchMoreData(commandMessage CommandMessage) {
conn, err := connectAndRegister("flvFetchMoreData", commandMessage.MessageId)
func flvFetchMoreData(commandMessage tcpclientcommon.CommandMessage) {
conn, err := tcpclientcommon.ConnectAndResRegister("flvFetchMoreData", commandMessage.MessageId)
if err != nil {
logs.Error("flvFetchMoreData connect to server error: %v", err)
return
@@ -28,7 +29,7 @@ func flvFetchMoreData(commandMessage CommandMessage) {
if err != nil {
logs.Error("flvFetchMoreData message format error: %v", err)
result := common.ErrorResult(fmt.Sprintf("flvFetchMoreData message format error: %v", err))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -40,7 +41,7 @@ func flvFetchMoreData(commandMessage CommandMessage) {
if !ok {
logs.Error("playerId: %s not exists or complate", param.PlayerId)
result := common.SuccessResultMsg(fmt.Sprintf("playerId: %s not exists or complate, skip this request", param.PlayerId))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -52,7 +53,7 @@ func flvFetchMoreData(commandMessage CommandMessage) {
logs.Info("vod player [%s] fetch data, addr [%s]", param.PlayerId, conn.LocalAddr().String())
result := common.SuccessResultMsg("fetch sccess")
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return

View File

@@ -6,6 +6,7 @@ import (
"github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dto/vo/ext/flv_file"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
@@ -15,8 +16,8 @@ type FlvFileMediaInfoParam struct {
IdCameraRecord string `json:"idCameraRecord"`
}
func flvFileMediaInfo(commandMessage CommandMessage) {
conn, err := connectAndRegister("flvFileMediaInfo", commandMessage.MessageId)
func flvFileMediaInfo(commandMessage tcpclientcommon.CommandMessage) {
conn, err := tcpclientcommon.ConnectAndResRegister("flvFileMediaInfo", commandMessage.MessageId)
if err != nil {
logs.Error("flvFileMediaInfo connect to server error: %v", err)
return
@@ -28,7 +29,7 @@ func flvFileMediaInfo(commandMessage CommandMessage) {
if err != nil {
logs.Error("flvFileMediaInfo message format error: %v", err)
result := common.ErrorResult(fmt.Sprintf("flvFileMediaInfo message format error: %v", err))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -41,7 +42,7 @@ func flvFileMediaInfo(commandMessage CommandMessage) {
if err != nil {
logs.Error("idCameraRecord: %s CameraRecordSelectById error: %v", idCameraRecord, err)
result := common.ErrorResult(fmt.Sprintf("idCameraRecord: %s CameraRecordSelectById error", idCameraRecord))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -63,7 +64,7 @@ func flvFileMediaInfo(commandMessage CommandMessage) {
if err != nil {
logs.Error("file: %s get mediaInfo error", camera_record.TempFileName)
result := common.ErrorResult(fmt.Sprintf("file: %s get mediaInfo error", camera_record.TempFileName))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -73,7 +74,7 @@ func flvFileMediaInfo(commandMessage CommandMessage) {
}
result := common.SuccessResultData(mediaInfo)
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return

View File

@@ -5,14 +5,15 @@ import (
"fmt"
"github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
dto_convert "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controller/convert"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dao/entity"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
)
func historyVideoPage(commandMessage CommandMessage) {
conn, err := connectAndRegister("historyVideoPage", commandMessage.MessageId)
func historyVideoPage(commandMessage tcpclientcommon.CommandMessage) {
conn, err := tcpclientcommon.ConnectAndResRegister("historyVideoPage", commandMessage.MessageId)
if err != nil {
logs.Error("historyVideoPage connect to server error: %v", err)
return
@@ -24,7 +25,7 @@ func historyVideoPage(commandMessage CommandMessage) {
if err != nil {
logs.Error("historyVideoPage message format error: %v", err)
result := common.ErrorResult(fmt.Sprintf("historyVideoPage message format error: %v", err))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -36,7 +37,7 @@ func historyVideoPage(commandMessage CommandMessage) {
if err != nil {
logs.Error("aqPage error : %v", err)
result := common.ErrorResult("CameraRecordFindPageByCondition error")
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -51,7 +52,7 @@ func historyVideoPage(commandMessage CommandMessage) {
if err != nil {
logs.Error("aqPage error: %v", err)
result := common.ErrorResult(fmt.Sprintf("ConvertCameraRecordToVOList error"))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -64,7 +65,7 @@ func historyVideoPage(commandMessage CommandMessage) {
}
pageInfo.DataList = dataList
result := common.SuccessResultData(pageInfo)
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return

View File

@@ -6,6 +6,7 @@ import (
"github.com/beego/beego/v2/core/logs"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/tcpclient/tcpclientcommon"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
)
@@ -13,8 +14,8 @@ type RtmpPushParam struct {
CameraCode string `json:"cameraCode"`
}
func startRtmpPush(commandMessage CommandMessage) {
conn, err := connectAndRegister("startPushRtmp", commandMessage.MessageId)
func startRtmpPush(commandMessage tcpclientcommon.CommandMessage) {
conn, err := tcpclientcommon.ConnectAndResRegister("startPushRtmp", commandMessage.MessageId)
if err != nil {
logs.Error("startPushRtmp connect to server error: %v", err)
return
@@ -26,7 +27,7 @@ func startRtmpPush(commandMessage CommandMessage) {
if err != nil {
logs.Error("startPushRtmp message format error: %v", err)
result := common.ErrorResult(fmt.Sprintf("startPushRtmp message format error: %v", err))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -36,15 +37,15 @@ func startRtmpPush(commandMessage CommandMessage) {
flvadmin.GetSingleRtmpFlvAdmin().RemoteStartWrite(param.CameraCode)
result := common.SuccessResultData("startPushRtmp success")
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
}
}
func stopRtmpPush(commandMessage CommandMessage) {
conn, err := connectAndRegister("stopPushRtmp", commandMessage.MessageId)
func stopRtmpPush(commandMessage tcpclientcommon.CommandMessage) {
conn, err := tcpclientcommon.ConnectAndResRegister("stopPushRtmp", commandMessage.MessageId)
if err != nil {
logs.Error("stopPushRtmp connect to server error: %v", err)
return
@@ -56,7 +57,7 @@ func stopRtmpPush(commandMessage CommandMessage) {
if err != nil {
logs.Error("stopPushRtmp message format error: %v", err)
result := common.ErrorResult(fmt.Sprintf("stopPushRtmp message format error: %v", err))
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return
@@ -67,7 +68,7 @@ func stopRtmpPush(commandMessage CommandMessage) {
defer conn.Close()
result := common.SuccessResultData("stopPushRtmp success")
_, err = writeResult(result, conn)
_, err = tcpclientcommon.WriteResult(result, conn)
if err != nil {
logs.Error(err)
return

View File

@@ -1,4 +1,4 @@
package tcpclient
package tcpclientcommon
import (
"encoding/json"
@@ -25,12 +25,13 @@ type RegisterInfo struct {
ClientCode string `json:"clientCode"`
DateStr string `json:"dateStr"`
Sign string `json:"sign"`
// "keepChannel" "cameraAq" "historyVideoPage" "flvFileMediaInfo" "flvPlay" "flvFetchMoreData" "startPushRtmp" "stopPushRtmp"
ConnType string `json:"connType"`
MessageId string `json:"messageId"`
// "keepChannel" "cameraOnline" "cameraOffline" "cameraAq" "historyVideoPage" "flvFileMediaInfo" "flvPlay" "flvFetchMoreData" "startPushRtmp" "stopPushRtmp"
ConnType string `json:"connType"`
MessageId string `json:"messageId"`
CameraCode string `json:"cameraCode"`
}
func newReisterInfo(connType string, messageId string) (ri RegisterInfo, err error) {
func newReisterInfo(connType string, messageId string, cameraCode string) (ri RegisterInfo, err error) {
currentDateStr := time.Now().Format(time.RFC3339)
clientCode, err := config.String("server.remote.client-code")
if err != nil {
@@ -51,11 +52,24 @@ func newReisterInfo(connType string, messageId string) (ri RegisterInfo, err err
DateStr: currentDateStr,
Sign: signStr,
MessageId: messageId,
CameraCode: cameraCode,
}
return
}
func connectAndRegister(connType string, messageId string) (conn net.Conn, err error) {
func ConnectAndKeepChannelRegister(connType string) (conn net.Conn, err error) {
return connectAndRegister(connType, "", "")
}
func ConnectAndCameraStatusRegister(connType string, cameraCode string) (conn net.Conn, err error) {
return connectAndRegister(connType, "", cameraCode)
}
func ConnectAndResRegister(connType string, messageId string) (conn net.Conn, err error) {
return connectAndRegister(connType, messageId, "")
}
func connectAndRegister(connType string, messageId string, cameraCode string) (conn net.Conn, err error) {
serverIp, err := config.String("server.remote.server-ip")
if err != nil {
logs.Error("get remote server-ip error: %v. \n", err)
@@ -73,7 +87,7 @@ func connectAndRegister(connType string, messageId string) (conn net.Conn, err e
}
// register to server
ri, err := newReisterInfo(connType, messageId)
ri, err := newReisterInfo(connType, messageId, cameraCode)
if err != nil {
logs.Error(err)
return
@@ -94,7 +108,7 @@ func connectAndRegister(connType string, messageId string) (conn net.Conn, err e
return
}
func writeResult(result common.AppResult, writer io.Writer) (n int, err error) {
func WriteResult(result common.AppResult, writer io.Writer) (n int, err error) {
messageBytes, err := json.Marshal(result)
if err != nil {
logs.Error(err)

View File

@@ -9,7 +9,6 @@ import (
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvreader"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common"
base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base"
)
@@ -40,7 +39,6 @@ func (t *task) clearToken() {
}
}()
for {
web.ClearExipresToken()
<-time.After(24 * time.Hour)
}
}

View File

@@ -4,8 +4,6 @@ import (
"net/http"
"runtime/debug"
"strconv"
"sync"
"time"
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs"
@@ -14,22 +12,6 @@ import (
ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controller/ext"
)
var tokens sync.Map
func ClearExipresToken() {
deleteTokens := []string{}
// 遍历所有sync.Map中的键值对
tokens.Range(func(k, v interface{}) bool {
if time.Now().After(v.(time.Time).Add(30 * time.Minute)) {
deleteTokens = append(deleteTokens, k.(string))
}
return true
})
for _, v := range deleteTokens {
tokens.Delete(v)
}
}
var webInstance *web
type web struct{}