Merge branch 'develop'

This commit is contained in:
hkmadao
2024-11-28 09:13:06 +08:00
20 changed files with 773 additions and 364 deletions

2
go.mod
View File

@@ -11,4 +11,4 @@ require (
github.com/lib/pq v1.10.9
)
replace github.com/deepch/vdk => github.com/hkmadao/vdk v0.0.0-20241120073805-439b6309323c
replace github.com/deepch/vdk => github.com/hkmadao/vdk v0.0.0-20241127071358-df60b9bc5ae8

4
go.sum
View File

@@ -147,8 +147,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hkmadao/vdk v0.0.0-20241120073805-439b6309323c h1:5yfTO5fnPlulgYt+acbwMWfd3vDpZpO9j+zG59CceQ8=
github.com/hkmadao/vdk v0.0.0-20241120073805-439b6309323c/go.mod h1:JlgGyR2ld6+xOIHa7XAxJh+stSDBAkdNvIPkUIdIywk=
github.com/hkmadao/vdk v0.0.0-20241127071358-df60b9bc5ae8 h1:uc354yQx2guZSJHPZtQh32Q5NQHAWsqPlm00R3dSYac=
github.com/hkmadao/vdk v0.0.0-20241127071358-df60b9bc5ae8/go.mod h1:JlgGyR2ld6+xOIHa7XAxJh+stSDBAkdNvIPkUIdIywk=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20220517205856-0058ec4f073c/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=

View File

@@ -6,7 +6,7 @@ import (
"github.com/beego/beego/v2/core/logs"
"github.com/gin-gonic/gin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvmanage"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/result"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
@@ -236,9 +236,9 @@ func RtmpPushChange(c *gin.Context) {
switch {
case q.RtmpPushStatus != 1:
logs.Info("camera [%s] stop push rtmp", q.Code)
flvmanage.GetSingleRtmpFlvManager().StopWrite(q.Code)
flvadmin.GetSingleRtmpFlvAdmin().StopWrite(q.Code)
case q.RtmpPushStatus == 1:
flvmanage.GetSingleRtmpFlvManager().StartWrite(q.Code)
flvadmin.GetSingleRtmpFlvAdmin().StartWrite(q.Code)
logs.Info("camera [%s] start push rtmp", q.Code)
}
@@ -278,9 +278,9 @@ func CameraSaveVideoChange(c *gin.Context) {
switch {
case q.SaveVideo != 1:
logs.Info("camera [%s] stop save video", q.Code)
flvmanage.GetSingleFileFlvManager().StopWrite(q.Code)
flvadmin.GetSingleFileFlvAdmin().StopWrite(q.Code)
case q.SaveVideo == 1:
flvmanage.GetSingleFileFlvManager().StartWrite(q.Code)
flvadmin.GetSingleFileFlvAdmin().StartWrite(q.Code)
logs.Info("camera [%s] start save video", q.Code)
}
@@ -319,9 +319,9 @@ func CameraLiveChange(c *gin.Context) {
}
switch {
case q.Live != 1:
flvmanage.GetSingleHttpflvAdmin().StopWrite(q.Code)
flvadmin.GetSingleHttpFlvAdmin().StopWrite(q.Code)
case q.Live == 1:
flvmanage.GetSingleHttpflvAdmin().StartWrite(q.Code)
flvadmin.GetSingleHttpFlvAdmin().StartWrite(q.Code)
}
c.JSON(http.StatusOK, r)
@@ -359,8 +359,8 @@ func CameraPlayAuthCodeReset(c *gin.Context) {
return
}
flvmanage.GetSingleHttpflvAdmin().StopWrite(q.Code)
flvmanage.GetSingleHttpflvAdmin().StartWrite(q.Code)
flvadmin.GetSingleHttpFlvAdmin().StopWrite(q.Code)
flvadmin.GetSingleHttpFlvAdmin().StartWrite(q.Code)
c.JSON(http.StatusOK, r)
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/beego/beego/v2/core/logs"
"github.com/gin-gonic/gin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvmanage"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/result"
)
@@ -82,7 +82,7 @@ func HttpFlvPlay(c *gin.Context) {
playerDone := make(chan int)
defer close(playerDone)
const timeout = 10 * time.Second
flvPlayerDone, err := flvmanage.GetSingleHttpflvAdmin().AddHttpFlvPlayer(playerDone, timeout/2, code, c.Writer)
flvPlayerDone, err := flvadmin.GetSingleHttpFlvAdmin().AddHttpFlvPlayer(playerDone, timeout/2, code, c.Writer)
if err != nil {
logs.Error("camera [%s] add player error : %s", code, err)
r.Code = 0

View File

@@ -1,192 +0,0 @@
package fileflvwriter
import (
"os"
"runtime/debug"
"time"
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/format/flv"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
type IFileFlvManager interface {
UpdateFFWS(string, *FileFlvWriter)
}
type FileFlvWriter struct {
done chan int
pktStream <-chan av.Packet
code string
codecs []av.CodecData
isStart bool
fd *os.File
iffm IFileFlvManager
}
func (ffw *FileFlvWriter) GetDone() <-chan int {
return ffw.done
}
func (ffw *FileFlvWriter) GetPktStream() <-chan av.Packet {
return ffw.pktStream
}
func (ffw *FileFlvWriter) GetCodecs() []av.CodecData {
return ffw.codecs
}
func NewFileFlvWriter(
pktStream <-chan av.Packet,
code string,
codecs []av.CodecData,
iffm IFileFlvManager,
) *FileFlvWriter {
ffw := &FileFlvWriter{
done: make(chan int),
pktStream: pktStream,
code: code,
codecs: codecs,
iffm: iffm,
isStart: false,
}
camera, err := models.CameraSelectOne(models.Camera{Code: code})
if err != nil {
logs.Error("query camera error : %v", err)
return ffw
}
if camera.OnlineStatus != 1 {
return ffw
}
if camera.SaveVideo != 1 {
go func() {
for {
select {
case <-ffw.GetDone():
return
case <-ffw.pktStream:
}
}
}()
return ffw
}
go ffw.flvWrite()
go ffw.splitFile()
return ffw
}
func (ffw *FileFlvWriter) StopWrite() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
close(ffw.done)
}()
}
func (ffw *FileFlvWriter) splitFile() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
for {
select {
case <-ffw.done:
return
case <-time.After(1 * time.Hour):
ffw.StopWrite()
_, pktStreamOk := <-ffw.pktStream
if pktStreamOk {
logs.Info("to create NewFileFlvWriter : %s", ffw.code)
ffwn := NewFileFlvWriter(ffw.pktStream, ffw.code, ffw.codecs, ffw.iffm)
ffwn.iffm.UpdateFFWS(ffwn.code, ffwn)
} else {
logs.Info("FileFlvWriter pktStream is closed : %s", ffw.code)
}
return
}
}
}
func (ffw *FileFlvWriter) Write(p []byte) (n int, err error) {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
n, err = ffw.fd.Write(p)
if err != nil {
logs.Error("write file error : %v", err)
}
return
}
func (ffw *FileFlvWriter) createFlvFile() error {
fd, err := os.OpenFile(getFileFlvPath()+"/"+ffw.code+"_"+time.Now().Format("20060102150405")+".flv", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
logs.Error("open file error :", err)
return err
}
ffw.fd = fd
return nil
}
//Write extends to writer.Writer
func (ffw *FileFlvWriter) flvWrite() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
if err := ffw.createFlvFile(); err != nil {
logs.Error("create file flv error : %v", err)
return
}
defer func() {
close(ffw.done)
ffw.fd.Close()
}()
muxer := flv.NewMuxer(ffw)
timeNow := time.Now().Local()
for pkt := range utils.OrDonePacket(ffw.done, ffw.pktStream) {
if ffw.isStart {
if err := muxer.WritePacket(pkt); err != nil {
logs.Error("writer packet to flv file error : %v", err)
}
continue
}
if pkt.IsKeyFrame {
ffw.isStart = true
err := muxer.WriteHeader(ffw.codecs)
if err != nil {
logs.Error("writer header to flv file error : %v", err)
ffw.isStart = false
}
if err := muxer.WritePacket(pkt); err != nil {
logs.Error("writer packet to flv file error : %v", err)
ffw.isStart = false
}
continue
}
if time.Now().Local().After(timeNow.Add(1 * time.Minute)) {
timeNow = time.Now().Local()
logs.Error("FileFlvWriter ingrore package: %s", ffw.code)
}
}
}
func getFileFlvPath() string {
fileFlvPath, err := config.String("server.fileflv.path")
if err != nil {
logs.Error("get fileflv path error :", err)
return ""
}
return fileFlvPath
}

View File

@@ -0,0 +1,60 @@
package flvadmin
import (
"sync"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager"
)
type FileFlvAdmin struct {
ffws sync.Map
}
var ffmInstance *FileFlvAdmin
func init() {
ffmInstance = &FileFlvAdmin{}
}
func GetSingleFileFlvAdmin() *FileFlvAdmin {
return ffmInstance
}
func (ffm *FileFlvAdmin) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
ffw := fileflvmanager.NewFileFlvManager(pktStream, code, codecs)
ffm.ffws.Store(code, ffw)
}
func (ffm *FileFlvAdmin) StopWrite(code string) {
v, ok := ffm.ffws.Load(code)
if ok {
ffw := v.(*fileflvmanager.FileFlvManager)
ffw.StopWrite()
}
}
func (ffm *FileFlvAdmin) StartWrite(code string) {
v, ok := ffm.ffws.Load(code)
if ok {
ffw := v.(*fileflvmanager.FileFlvManager)
ffw.StopWrite()
ffm.FlvWrite(ffw.GetPktStream(), code, ffw.GetCodecs())
}
}
func (ffm *FileFlvAdmin) UpdateFFWS(code string, ffw *fileflvmanager.FileFlvManager) {
_, ok := ffm.ffws.LoadAndDelete(code)
if ok {
ffm.ffws.Store(code, ffw)
}
}
//更新sps、pps等信息
func (ffm *FileFlvAdmin) UpdateCodecs(code string, codecs []av.CodecData) {
rfw, ok := ffm.ffws.Load(code)
if ok {
rfw := rfw.(*fileflvmanager.FileFlvManager)
rfw.SetCodecs(codecs)
}
}

View File

@@ -0,0 +1,157 @@
package fileflvmanager
import (
"runtime/debug"
"sync"
"time"
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvwriter"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
type FileFlvManager struct {
done chan int
fgDoneClose bool
pktStream <-chan av.Packet
code string
codecs []av.CodecData
ffws sync.Map
}
func (ffm *FileFlvManager) GetCode() string {
return ffm.code
}
func (ffm *FileFlvManager) SetCodecs(codecs []av.CodecData) {
ffm.codecs = codecs
ffm.ffws.Range(func(key, value interface{}) bool {
wi := value.(*fileflvwriter.FileFlvWriter)
wi.SetCodecs(ffm.codecs)
return true
})
}
func (ffm *FileFlvManager) GetDone() <-chan int {
return ffm.done
}
func (ffm *FileFlvManager) GetPktStream() <-chan av.Packet {
return ffm.pktStream
}
func (ffm *FileFlvManager) GetCodecs() []av.CodecData {
return ffm.codecs
}
func NewFileFlvManager(pktStream <-chan av.Packet, code string, codecs []av.CodecData) *FileFlvManager {
ffm := &FileFlvManager{
done: make(chan int),
fgDoneClose: false,
pktStream: pktStream,
code: code,
codecs: codecs,
}
camera, err := models.CameraSelectOne(models.Camera{Code: code})
if err != nil {
logs.Error("query camera error : %v", err)
return ffm
}
if camera.OnlineStatus != 1 {
return ffm
}
if camera.SaveVideo != 1 {
go func() {
for {
select {
case <-ffm.GetDone():
return
case _, ok := <-ffm.pktStream:
if !ok {
return
}
}
}
}()
return ffm
}
go func() {
ticker := time.NewTicker(1 * time.Hour)
for {
select {
case <-ffm.GetDone():
return
case <-ticker.C:
ffm.ffws.Range(func(key, value interface{}) bool {
ffw := value.(*fileflvwriter.FileFlvWriter)
if ffw.GetCode() == code {
ffw.TickerStopWrite()
}
return true
})
sessionId := utils.NextValSnowflakeID()
//添加缓冲,减少包到达速率震荡导致丢包
pktStream := make(chan av.Packet, 1024)
newFfw := fileflvwriter.NewFileFlvWriter(sessionId, pktStream, code, ffm.codecs, ffm)
ffm.ffws.Store(sessionId, newFfw)
}
}
}()
sessionId := utils.NextValSnowflakeID()
//添加缓冲,减少包到达速率震荡导致丢包
ffwPktStream := make(chan av.Packet, 1024)
newFfw := fileflvwriter.NewFileFlvWriter(sessionId, ffwPktStream, code, codecs, ffm)
ffm.ffws.Store(sessionId, newFfw)
go ffm.flvWrite()
return ffm
}
func (ffm *FileFlvManager) StopWrite() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
ffm.fgDoneClose = true
close(ffm.done)
}()
}
//Write extends to writer.Writer
func (ffm *FileFlvManager) flvWrite() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
defer func() {
if !ffm.fgDoneClose {
close(ffm.done)
}
}()
for pkt := range utils.OrDonePacket(ffm.done, ffm.pktStream) {
ffm.ffws.Range(func(key, value interface{}) bool {
ffw := value.(*fileflvwriter.FileFlvWriter)
select {
case ffw.GetPktStream() <- pkt:
// logs.Debug("flvWrite pkt")
default:
//当播放者速率跟不上时,会发生丢包
logs.Debug("camera [%s] file flv write fail", ffm.code)
}
return true
})
}
ffm.ffws.Range(func(key, value interface{}) bool {
ffw := value.(*fileflvwriter.FileFlvWriter)
ffw.StopWrite()
return true
})
}
func (ffm *FileFlvManager) DeleteFFW(sesessionId int64) {
ffm.ffws.LoadAndDelete(sesessionId)
}

View File

@@ -0,0 +1,284 @@
package fileflvwriter
import (
"encoding/hex"
"io"
"os"
"runtime/debug"
"strings"
"time"
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/format/flv"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
type IFileFlvManager interface {
DeleteFFW(sesessionId int64)
}
type FileFlvWriter struct {
sessionId int64
done chan int
fgDoneClose bool
tickerDone chan int
pktStream chan av.Packet
code string
codecs []av.CodecData
isStart bool
fd *os.File
fileName string
muxer *flv.Muxer
startTime time.Time
endTime time.Time
ffm IFileFlvManager
}
func (ffw *FileFlvWriter) GetDone() <-chan int {
return ffw.done
}
func (ffw *FileFlvWriter) GetCode() string {
return ffw.code
}
func (ffw *FileFlvWriter) GetPktStream() chan av.Packet {
return ffw.pktStream
}
func (ffw *FileFlvWriter) SetCodecs(codecs []av.CodecData) {
ffw.codecs = codecs
}
func (ffw *FileFlvWriter) GetCodecs() []av.CodecData {
return ffw.codecs
}
func (ffw *FileFlvWriter) GetSessionId() int64 {
return ffw.sessionId
}
func NewFileFlvWriter(
sessionId int64,
pktStream chan av.Packet,
code string,
codecs []av.CodecData,
ffm IFileFlvManager,
) *FileFlvWriter {
ffw := &FileFlvWriter{
sessionId: sessionId,
fgDoneClose: false,
done: make(chan int),
tickerDone: make(chan int),
pktStream: pktStream,
code: code,
codecs: codecs,
isStart: false,
ffm: ffm,
}
camera, err := models.CameraSelectOne(models.Camera{Code: code})
if err != nil {
logs.Error("query camera error : %v", err)
return ffw
}
if camera.OnlineStatus != 1 {
return ffw
}
if camera.SaveVideo != 1 {
go func() {
for {
select {
case <-ffw.GetDone():
return
case _, ok := <-ffw.pktStream:
if !ok {
return
}
}
}
}()
return ffw
}
go ffw.flvWrite()
return ffw
}
func (ffw *FileFlvWriter) StopWrite() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
ffw.ffm.DeleteFFW(ffw.sessionId)
ffw.fgDoneClose = true
close(ffw.done)
}()
}
func (ffw *FileFlvWriter) TickerStopWrite() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
select {
case <-time.NewTicker(30 * time.Second).C: //等待30秒再关闭
ffw.ffm.DeleteFFW(ffw.sessionId)
ffw.fgDoneClose = true
close(ffw.done)
case <-ffw.GetDone():
}
}()
}
func (ffw *FileFlvWriter) Write(p []byte) (n int, err error) {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
n, err = ffw.fd.Write(p)
if err != nil {
logs.Error("write file error : %v", err)
}
return
}
func (ffw *FileFlvWriter) createFlvFile() error {
fileName := getFileFlvPath() + "/" + ffw.code + "_" + time.Now().Format("20060102150405") + "_temp.flv"
fd, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
logs.Error("open file error :", err)
return err
}
ffw.fd = fd
ffw.fileName = fileName
return nil
}
//Write extends to writer.Writer
func (ffw *FileFlvWriter) flvWrite() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
if err := ffw.createFlvFile(); err != nil {
logs.Error("create file flv error : %v", err)
return
}
defer func() {
ffw.endTime = time.Now()
ffw.muxer.WriteTrailer()
ffw.fd.Close()
//写入script tag data主要补充视频的总时长否则使用播放器播放看不到视频总时长
ffw.writeScriptTagData()
if !ffw.fgDoneClose {
close(ffw.done)
}
}()
muxer := flv.NewMuxer(ffw)
ffw.muxer = muxer
timeNow := time.Now().Local()
for pkt := range utils.OrDonePacket(ffw.done, ffw.pktStream) {
if ffw.isStart {
if err := muxer.WritePacket(pkt); err != nil {
logs.Error("writer packet to flv file error : %v", err)
}
continue
}
if pkt.IsKeyFrame {
ffw.isStart = true
err := muxer.WriteHeader(ffw.codecs)
if err != nil {
logs.Error("writer header to flv file error : %v", err)
ffw.isStart = false
}
if err := muxer.WritePacket(pkt); err != nil {
logs.Error("writer packet to flv file error : %v", err)
ffw.isStart = false
}
ffw.startTime = time.Now()
continue
}
if time.Now().Local().After(timeNow.Add(1 * time.Minute)) {
timeNow = time.Now().Local()
logs.Error("FileFlvWriter ingrore package: %s", ffw.code)
}
}
}
func (ffw *FileFlvWriter) writeScriptTagData() {
reverseFileName := utils.ReverseString(ffw.fileName)
reverseNewFileName := strings.Replace(reverseFileName, utils.ReverseString("_temp.flv"), utils.ReverseString(".flv"), 1)
newFileName := utils.ReverseString(reverseNewFileName)
newflvFile, err := os.OpenFile(newFileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
logs.Error("create flv file error :", err)
return
}
flvFile, err := os.OpenFile(ffw.fileName, os.O_RDWR, 0644)
if err != nil {
logs.Error("open file error :", err)
return
}
buf := make([]byte, 10*1024)
i := 1
duration := float64(ffw.endTime.Sub(ffw.startTime).Seconds())
durationBytes := utils.Float64ToByteBigEndian(duration)
durationHexStr := hex.EncodeToString(durationBytes)
scriptTagHexStr := "120000250000000000000002000A6F6E4D65746144617461080000000100086475726174696F6E00" + durationHexStr + "00000030"
scriptTagBytes, err := hex.DecodeString(scriptTagHexStr)
if err != nil {
logs.Error("scriptTagHexStr: %s, DecodeString error : ", scriptTagHexStr, err)
return
}
for {
_, err := flvFile.Read(buf)
if err != nil {
if err == io.EOF {
break
}
logs.Error("read flv file error : %v", err)
}
if i == 1 {
i = 2
data1 := make([]byte, len(buf)+52)
copy(data1, buf[:13])
newData := append(data1[:13], scriptTagBytes...)
newData = append(newData, buf[13:]...)
newflvFile.Write(newData)
continue
}
newflvFile.Write(buf)
}
err = flvFile.Close()
if err != nil {
logs.Error("close template flv file error :", err)
return
}
err = os.Remove(ffw.fileName)
if err != nil {
logs.Error("remove template flv file error :", err)
return
}
}
func getFileFlvPath() string {
fileFlvPath, err := config.String("server.fileflv.path")
if err != nil {
logs.Error("get fileflv path error :", err)
return ""
}
return fileFlvPath
}

View File

@@ -1,4 +1,4 @@
package flvmanage
package flvadmin
import (
"errors"
@@ -7,24 +7,24 @@ import (
"time"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/httpflvmanage"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/httpflvmanage"
)
var hfas *HttpflvAdmin
var hfas *HttpFlvAdmin
type HttpflvAdmin struct {
type HttpFlvAdmin struct {
hfms sync.Map
}
func init() {
hfas = &HttpflvAdmin{}
hfas = &HttpFlvAdmin{}
}
func GetSingleHttpflvAdmin() *HttpflvAdmin {
func GetSingleHttpFlvAdmin() *HttpFlvAdmin {
return hfas
}
func (hfa *HttpflvAdmin) AddHttpFlvManager(
func (hfa *HttpFlvAdmin) AddHttpFlvManager(
pktStream <-chan av.Packet,
code string,
codecs []av.CodecData,
@@ -33,7 +33,7 @@ func (hfa *HttpflvAdmin) AddHttpFlvManager(
hfa.hfms.Store(code, hfm)
}
func (hfa *HttpflvAdmin) StopWrite(code string) {
func (hfa *HttpFlvAdmin) StopWrite(code string) {
v, ok := hfa.hfms.Load(code)
if ok {
ffw := v.(*httpflvmanage.HttpFlvManager)
@@ -41,7 +41,7 @@ func (hfa *HttpflvAdmin) StopWrite(code string) {
}
}
func (hfa *HttpflvAdmin) StartWrite(code string) {
func (hfa *HttpFlvAdmin) StartWrite(code string) {
v, ok := hfa.hfms.Load(code)
if ok {
ffw := v.(*httpflvmanage.HttpFlvManager)
@@ -51,7 +51,7 @@ func (hfa *HttpflvAdmin) StartWrite(code string) {
}
//添加播放者
func (hfa *HttpflvAdmin) AddHttpFlvPlayer(
func (hfa *HttpFlvAdmin) AddHttpFlvPlayer(
playerDone <-chan int,
pulseInterval time.Duration,
code string,
@@ -64,3 +64,12 @@ func (hfa *HttpflvAdmin) AddHttpFlvPlayer(
}
return nil, errors.New("camera no connection")
}
//更新sps、pps等信息
func (hfa *HttpFlvAdmin) UpdateCodecs(code string, codecs []av.CodecData) {
rfw, ok := hfa.hfms.Load(code)
if ok {
rfw := rfw.(*httpflvmanage.HttpFlvManager)
rfw.SetCodecs(codecs)
}
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/httpflvwriter"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvwriter"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
@@ -21,16 +21,30 @@ type HttpFlvManager struct {
hfws sync.Map
}
func (ffw *HttpFlvManager) GetDone() <-chan int {
return ffw.done
func (hfm *HttpFlvManager) GetCode() string {
return hfm.code
}
func (ffw *HttpFlvManager) GetPktStream() <-chan av.Packet {
return ffw.pktStream
func (hfm *HttpFlvManager) SetCodecs(codecs []av.CodecData) {
logs.Warn("HttpFlvManager: %s update codecs", hfm.code)
hfm.codecs = codecs
hfm.hfws.Range(func(key, value interface{}) bool {
wi := value.(*httpflvwriter.HttpFlvWriter)
wi.SetCodecs(hfm.codecs)
return true
})
}
func (ffw *HttpFlvManager) GetCodecs() []av.CodecData {
return ffw.codecs
func (hfm *HttpFlvManager) GetDone() <-chan int {
return hfm.done
}
func (hfm *HttpFlvManager) GetPktStream() <-chan av.Packet {
return hfm.pktStream
}
func (hfm *HttpFlvManager) GetCodecs() []av.CodecData {
return hfm.codecs
}
func NewHttpFlvManager(pktStream <-chan av.Packet, code string, codecs []av.CodecData) *HttpFlvManager {

View File

@@ -39,7 +39,12 @@ func (hfw *HttpFlvWriter) SetCode(code string) {
hfw.code = code
}
func (hfw *HttpFlvWriter) GetCode() string {
return hfw.code
}
func (hfw *HttpFlvWriter) SetCodecs(codecs []av.CodecData) {
logs.Warn("HttpFlvWriter: %s update codecs", hfw.code)
hfw.codecs = codecs
}

View File

@@ -0,0 +1,64 @@
package flvadmin
import (
"sync"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/rtmpflvwriter"
)
type RtmpFlvAdmin struct {
rfms sync.Map
}
var rfmInstance *RtmpFlvAdmin
func init() {
rfmInstance = &RtmpFlvAdmin{}
}
func GetSingleRtmpFlvAdmin() *RtmpFlvAdmin {
return rfmInstance
}
func (rfm *RtmpFlvAdmin) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
rfw := rtmpflvwriter.NewRtmpFlvWriter(pktStream, code, codecs, rfm)
rfm.rfms.Store(code, rfw)
}
func (rfm *RtmpFlvAdmin) StartWrite(code string) {
v, ok := rfm.rfms.Load(code)
if ok {
rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
rfw.StopWrite()
rfm.FlvWrite(rfw.GetPktStream(), code, rfw.GetCodecs())
}
}
func (rfm *RtmpFlvAdmin) StopWrite(code string) {
v, ok := rfm.rfms.Load(code)
if ok {
rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
rfw.StopWrite()
}
}
func (rfm *RtmpFlvAdmin) UpdateFFWS(code string, rfw *rtmpflvwriter.RtmpFlvWriter) {
_, ok := rfm.rfms.LoadAndDelete(code)
if ok {
rfm.rfms.Store(code, rfw)
}
}
//更新sps、pps等信息
func (rfm *RtmpFlvAdmin) UpdateCodecs(code string, codecs []av.CodecData) {
rfw, ok := rfm.rfms.Load(code)
if ok {
rfw := rfw.(*rtmpflvwriter.RtmpFlvWriter)
rfw.SetCodecs(codecs)
//sps、pps更新后重新建立连接
// logs.Warn("RtmpFlvAdmin: %s codecs change, restart RtmpFlvWriter", code)
//这里只需要stop就可以内部会重连
// rfw.StopWrite()
}
}

View File

@@ -21,6 +21,7 @@ type RtmpFlvWriter struct {
code string
codecs []av.CodecData
start bool
startTime time.Time
conn *rtmp.Conn
pulseInterval time.Duration
irfm IRtmpFlvManager
@@ -34,6 +35,15 @@ func (rfw *RtmpFlvWriter) GetPktStream() <-chan av.Packet {
return rfw.pktStream
}
func (rfw *RtmpFlvWriter) GetCode() string {
return rfw.code
}
func (rfw *RtmpFlvWriter) SetCodecs(codecs []av.CodecData) {
logs.Warn("RtmpFlvWriter: %s update codecs", rfw.code)
rfw.codecs = codecs
}
func (rfw *RtmpFlvWriter) GetCodecs() []av.CodecData {
return rfw.codecs
}
@@ -138,6 +148,10 @@ func (rfw *RtmpFlvWriter) flvWrite() {
if !ok {
return
}
// if rfw.start {
// pktTime := time.Now().Sub(rfw.startTime)
// pkt.Time = pktTime
// }
if err := rfw.writerPacket(pkt, &timeNow); err != nil {
logs.Error("flvWrite error : %v", err)
return
@@ -162,6 +176,7 @@ func (rfw *RtmpFlvWriter) writerPacket(pkt av.Packet, templateTime *time.Time) e
}
var err error
err = rfw.conn.WriteHeader(rfw.codecs)
rfw.startTime = time.Now()
logs.Info("KeyFrame WriteHeader to rtmp server : %s, codesc: %v", rfw.code, rfw.codecs)
if err != nil {
logs.Error("writer header to rtmp server error : %v", err)

View File

@@ -1,51 +0,0 @@
package flvmanage
import (
"sync"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/fileflvwriter"
)
type fileFlvManager struct {
ffws sync.Map
}
var ffmInstance *fileFlvManager
func init() {
ffmInstance = &fileFlvManager{}
}
func GetSingleFileFlvManager() *fileFlvManager {
return ffmInstance
}
func (ffm *fileFlvManager) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
ffw := fileflvwriter.NewFileFlvWriter(pktStream, code, codecs, ffm)
ffm.ffws.Store(code, ffw)
}
func (ffm *fileFlvManager) StopWrite(code string) {
v, ok := ffm.ffws.Load(code)
if ok {
ffw := v.(*fileflvwriter.FileFlvWriter)
ffw.StopWrite()
}
}
func (ffm *fileFlvManager) StartWrite(code string) {
v, ok := ffm.ffws.Load(code)
if ok {
ffw := v.(*fileflvwriter.FileFlvWriter)
ffw.StopWrite()
ffm.FlvWrite(ffw.GetPktStream(), code, ffw.GetCodecs())
}
}
func (ffm *fileFlvManager) UpdateFFWS(code string, ffw *fileflvwriter.FileFlvWriter) {
_, ok := ffm.ffws.LoadAndDelete(code)
if ok {
ffm.ffws.Store(code, ffw)
}
}

View File

@@ -1,51 +0,0 @@
package flvmanage
import (
"sync"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpflvwriter"
)
type rtmpFlvManager struct {
rfms sync.Map
}
var rfmInstance *rtmpFlvManager
func init() {
rfmInstance = &rtmpFlvManager{}
}
func GetSingleRtmpFlvManager() *rtmpFlvManager {
return rfmInstance
}
func (rfm *rtmpFlvManager) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
rfw := rtmpflvwriter.NewRtmpFlvWriter(pktStream, code, codecs, rfm)
rfm.rfms.Store(code, rfw)
}
func (rfm *rtmpFlvManager) StartWrite(code string) {
v, ok := rfm.rfms.Load(code)
if ok {
ffw := v.(*rtmpflvwriter.RtmpFlvWriter)
ffw.StopWrite()
rfm.FlvWrite(ffw.GetPktStream(), code, ffw.GetCodecs())
}
}
func (rfm *rtmpFlvManager) StopWrite(code string) {
v, ok := rfm.rfms.Load(code)
if ok {
ffw := v.(*rtmpflvwriter.RtmpFlvWriter)
ffw.StopWrite()
}
}
func (rfm *rtmpFlvManager) UpdateFFWS(code string, rfw *rtmpflvwriter.RtmpFlvWriter) {
_, ok := rfm.rfms.LoadAndDelete(code)
if ok {
rfm.rfms.Store(code, rfw)
}
}

View File

@@ -98,6 +98,6 @@ func CameraSelectAll() (es []Camera, err error) {
logs.Error("查询出错:%v", err)
return es, err
}
logs.Info("查询到%d条记录", num)
logs.Debug("查询到%d条记录", num)
return es, nil
}

View File

@@ -95,7 +95,7 @@ func CameraShareSelectAll() (es []CameraShare, err error) {
logs.Error("查询出错:%v", err)
return es, err
}
logs.Info("查询到%d条记录", num)
logs.Debug("查询到%d条记录", num)
return es, nil
}

View File

@@ -3,54 +3,65 @@ package rtspclient
import (
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvmanage"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
type IRtspClientManager interface {
Load(key interface{}) (interface{}, bool)
Store(key, value interface{})
Delete(key interface{})
}
type RtspClient struct {
code string
codecs []av.CodecData
connDone <-chan int
done chan int
pktStream <-chan av.Packet
ffmPktStream <-chan av.Packet
hfmPktStream <-chan av.Packet
rfmPktStream <-chan av.Packet
ircm IRtspClientManager
}
func NewRtspClient(connDone <-chan int, pktStream <-chan av.Packet, code string, codecs []av.CodecData, ircm IRtspClientManager) *RtspClient {
r := &RtspClient{
func NewRtspClient(connDone <-chan int, pktStream <-chan av.Packet, code string, codecs []av.CodecData) *RtspClient {
done := make(chan int)
rc := &RtspClient{
connDone: connDone,
done: done,
pktStream: pktStream,
code: code,
codecs: codecs,
ffmPktStream: make(chan av.Packet, 1024),
hfmPktStream: make(chan av.Packet, 1024),
rfmPktStream: make(chan av.Packet, 1024),
ircm: ircm,
}
r.pktTransfer()
return r
rc.pktTransfer()
return rc
}
func (r *RtspClient) Done() {
<-r.connDone
func (rtspClient *RtspClient) Done() {
<-rtspClient.connDone
}
func (r *RtspClient) pktTransfer() {
ffmPktStream, hfmPktStream, rfmPktStream := tee(r.connDone, r.pktStream)
r.ffmPktStream = ffmPktStream
r.hfmPktStream = hfmPktStream
r.rfmPktStream = rfmPktStream
logs.Debug("publisher [%s] create customer", r.code)
flvmanage.GetSingleFileFlvManager().FlvWrite(r.ffmPktStream, r.code, r.codecs)
flvmanage.GetSingleHttpflvAdmin().AddHttpFlvManager(r.hfmPktStream, r.code, r.codecs)
flvmanage.GetSingleRtmpFlvManager().FlvWrite(r.rfmPktStream, r.code, r.codecs)
//主动关闭
func (rtspClient *RtspClient) Close() {
close(rtspClient.done)
}
//更新sps、pps等信息
func (rtspClient *RtspClient) UpdateCodecs(codecs []av.CodecData) {
rtspClient.codecs = codecs
logs.Warn("RtspClient: %s update codecs", rtspClient.code)
flvadmin.GetSingleFileFlvAdmin().UpdateCodecs(rtspClient.code, codecs)
flvadmin.GetSingleHttpFlvAdmin().UpdateCodecs(rtspClient.code, codecs)
flvadmin.GetSingleRtmpFlvAdmin().UpdateCodecs(rtspClient.code, codecs)
}
func (rtspClient *RtspClient) pktTransfer() {
done := utils.OrDoneInt(rtspClient.done, rtspClient.connDone)
ffmPktStream, hfmPktStream, rfmPktStream := tee(done, rtspClient.pktStream)
rtspClient.ffmPktStream = ffmPktStream
rtspClient.hfmPktStream = hfmPktStream
rtspClient.rfmPktStream = rfmPktStream
logs.Info("publisher [%s] create customer", rtspClient.code)
flvadmin.GetSingleFileFlvAdmin().FlvWrite(rtspClient.ffmPktStream, rtspClient.code, rtspClient.codecs)
flvadmin.GetSingleHttpFlvAdmin().AddHttpFlvManager(rtspClient.hfmPktStream, rtspClient.code, rtspClient.codecs)
flvadmin.GetSingleRtmpFlvAdmin().FlvWrite(rtspClient.rfmPktStream, rtspClient.code, rtspClient.codecs)
}
func tee(done <-chan int, in <-chan av.Packet) (<-chan av.Packet, <-chan av.Packet, <-chan av.Packet) {

View File

@@ -30,7 +30,7 @@ func GetSingleRtspClientManager() *RtspClientManager {
}
func (rs *RtspClientManager) StartClient() {
go rs.serveStreams()
go rs.startConnections()
go rs.stopConn(controllers.CodeStream())
}
@@ -60,6 +60,7 @@ func (rs *RtspClientManager) stopConn(codeStream <-chan string) {
if b {
r := v.(*rtspv2.RTSPClient)
r.Close()
close(r.OutgoingPacketQueue)
logs.Info("camera [%s] close success", code)
rs.conns.Delete(code)
} else {
@@ -68,17 +69,27 @@ func (rs *RtspClientManager) stopConn(codeStream <-chan string) {
}
}
func (s *RtspClientManager) serveStreams() {
func (s *RtspClientManager) startConnections() {
defer func() {
if r := recover(); r != nil {
logs.Error("rtspManager panic %v", r)
}
}()
es, err := models.CameraSelectAll()
if err != nil {
logs.Error("camera list query error: %s", err)
return
}
timeTemp := time.Now()
for {
es, err := models.CameraSelectAll()
if err != nil {
logs.Error("camera list is empty")
return
timeNow := time.Now()
if timeNow.After(timeTemp.Add(30 * time.Second)) {
es, err = models.CameraSelectAll()
if err != nil {
logs.Error("camera list query error: %s", err)
return
}
timeTemp = timeNow
}
for _, camera := range es {
if v, b := s.rcs.Load(camera.Code); b && v != nil {
@@ -89,7 +100,7 @@ func (s *RtspClientManager) serveStreams() {
}
go s.connRtsp(camera.Code)
}
<-time.After(30 * time.Second)
<-time.After(1 * time.Second)
}
}
@@ -117,37 +128,56 @@ func (s *RtspClientManager) connRtsp(code string) {
return
}
logs.Info(c.Code, "connect", c.RtspURL)
ro := rtspv2.RTSPClientOptions{
rtspClientOptions := rtspv2.RTSPClientOptions{
URL: c.RtspURL,
Debug: false,
DialTimeout: 10 * time.Second,
ReadWriteTimeout: 10 * time.Second,
DisableAudio: false,
}
session, err := rtspv2.Dial(ro)
session, err := rtspv2.Dial(rtspClientOptions)
if err != nil {
logs.Error("camera [%s] conn : %v", c.Code, err)
c.OnlineStatus = 0
time.Sleep(5 * time.Second)
if c.OnlineStatus == 1 {
models.CameraUpdate(c)
}
return
}
codecs := session.CodecData
// logs.Warn("camera: %s codecs: %v", code, session.CodecData)
c.OnlineStatus = 1
models.CameraUpdate(c)
done := make(chan int)
//添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包
pktStream := make(chan av.Packet, 50)
pktStream := make(chan av.Packet, 1024)
defer func() {
close(done)
close(pktStream)
}()
rc := rtspclient.NewRtspClient(done, pktStream, code, codecs, s)
rc := rtspclient.NewRtspClient(done, pktStream, code, codecs)
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
for {
select {
case _, ok := <-session.Signals:
if !ok {
return
}
logs.Warn("camera: %s update codecs: %v", code, session.CodecData)
rc.UpdateCodecs(session.CodecData)
case <-done:
return
}
}
}()
s.rcs.Store(code, rc)
s.conns.Store(code, session)
logs.Info("%s", string(session.SDPRaw))
@@ -184,7 +214,7 @@ Loop:
models.CameraUpdate(camera)
}
logs.Error("session Close error : %v", err)
logs.Error("camera: %s session Close", code)
session.Close()
}

View File

@@ -1 +1,55 @@
package utils
import (
"encoding/binary"
"math"
)
func Float32ToByte(float float32) []byte {
bits := math.Float32bits(float)
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, bits)
return bytes
}
func ByteToFloat32(bytes []byte) float32 {
bits := binary.LittleEndian.Uint32(bytes)
return math.Float32frombits(bits)
}
func Float64ToByteLittleEndian(float float64) []byte {
bits := math.Float64bits(float)
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, bits)
return bytes
}
func Float64ToByteBigEndian(float float64) []byte {
bits := math.Float64bits(float)
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes, bits)
return bytes
}
func ByteToFloat64(bytes []byte) float64 {
bits := binary.LittleEndian.Uint64(bytes)
return math.Float64frombits(bits)
}
func ReverseString(s string) string {
// 将字符串转换为rune类型的切片
runes := []rune(s)
// 获取字符串长度
n := len(runes)
// 遍历rune类型的切片交换前后两个元素的位置
for i := 0; i < n/2; i++ {
runes[i], runes[n-1-i] = runes[n-1-i], runes[i]
}
// 将rune类型的切片转换为字符串类型并返回
return string(runes)
}