flv文件写入script tag data

This commit is contained in:
hkmadao
2024-11-25 18:38:00 +08:00
parent bab899d01e
commit c061976d29
16 changed files with 742 additions and 355 deletions

View File

@@ -6,7 +6,7 @@ import (
"github.com/beego/beego/v2/core/logs" "github.com/beego/beego/v2/core/logs"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvmanage" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/result" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/result"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
@@ -236,9 +236,9 @@ func RtmpPushChange(c *gin.Context) {
switch { switch {
case q.RtmpPushStatus != 1: case q.RtmpPushStatus != 1:
logs.Info("camera [%s] stop push rtmp", q.Code) logs.Info("camera [%s] stop push rtmp", q.Code)
flvmanage.GetSingleRtmpFlvManager().StopWrite(q.Code) flvadmin.GetSingleRtmpFlvAdmin().StopWrite(q.Code)
case q.RtmpPushStatus == 1: case q.RtmpPushStatus == 1:
flvmanage.GetSingleRtmpFlvManager().StartWrite(q.Code) flvadmin.GetSingleRtmpFlvAdmin().StartWrite(q.Code)
logs.Info("camera [%s] start push rtmp", q.Code) logs.Info("camera [%s] start push rtmp", q.Code)
} }
@@ -278,9 +278,9 @@ func CameraSaveVideoChange(c *gin.Context) {
switch { switch {
case q.SaveVideo != 1: case q.SaveVideo != 1:
logs.Info("camera [%s] stop save video", q.Code) logs.Info("camera [%s] stop save video", q.Code)
flvmanage.GetSingleFileFlvManager().StopWrite(q.Code) flvadmin.GetSingleFileFlvAdmin().StopWrite(q.Code)
case q.SaveVideo == 1: case q.SaveVideo == 1:
flvmanage.GetSingleFileFlvManager().StartWrite(q.Code) flvadmin.GetSingleFileFlvAdmin().StartWrite(q.Code)
logs.Info("camera [%s] start save video", q.Code) logs.Info("camera [%s] start save video", q.Code)
} }
@@ -319,9 +319,9 @@ func CameraLiveChange(c *gin.Context) {
} }
switch { switch {
case q.Live != 1: case q.Live != 1:
flvmanage.GetSingleHttpflvAdmin().StopWrite(q.Code) flvadmin.GetSingleHttpFlvAdmin().StopWrite(q.Code)
case q.Live == 1: case q.Live == 1:
flvmanage.GetSingleHttpflvAdmin().StartWrite(q.Code) flvadmin.GetSingleHttpFlvAdmin().StartWrite(q.Code)
} }
c.JSON(http.StatusOK, r) c.JSON(http.StatusOK, r)
@@ -359,8 +359,8 @@ func CameraPlayAuthCodeReset(c *gin.Context) {
return return
} }
flvmanage.GetSingleHttpflvAdmin().StopWrite(q.Code) flvadmin.GetSingleHttpFlvAdmin().StopWrite(q.Code)
flvmanage.GetSingleHttpflvAdmin().StartWrite(q.Code) flvadmin.GetSingleHttpFlvAdmin().StartWrite(q.Code)
c.JSON(http.StatusOK, r) c.JSON(http.StatusOK, r)
} }

View File

@@ -8,7 +8,7 @@ import (
"github.com/beego/beego/v2/core/logs" "github.com/beego/beego/v2/core/logs"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvmanage" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/result" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/result"
) )
@@ -82,7 +82,7 @@ func HttpFlvPlay(c *gin.Context) {
playerDone := make(chan int) playerDone := make(chan int)
defer close(playerDone) defer close(playerDone)
const timeout = 10 * time.Second const timeout = 10 * time.Second
flvPlayerDone, err := flvmanage.GetSingleHttpflvAdmin().AddHttpFlvPlayer(playerDone, timeout/2, code, c.Writer) flvPlayerDone, err := flvadmin.GetSingleHttpFlvAdmin().AddHttpFlvPlayer(playerDone, timeout/2, code, c.Writer)
if err != nil { if err != nil {
logs.Error("camera [%s] add player error : %s", code, err) logs.Error("camera [%s] add player error : %s", code, err)
r.Code = 0 r.Code = 0

View File

@@ -1,192 +0,0 @@
package fileflvwriter
import (
"os"
"runtime/debug"
"time"
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/format/flv"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
type IFileFlvManager interface {
UpdateFFWS(string, *FileFlvWriter)
}
type FileFlvWriter struct {
done chan int
pktStream <-chan av.Packet
code string
codecs []av.CodecData
isStart bool
fd *os.File
iffm IFileFlvManager
}
func (ffw *FileFlvWriter) GetDone() <-chan int {
return ffw.done
}
func (ffw *FileFlvWriter) GetPktStream() <-chan av.Packet {
return ffw.pktStream
}
func (ffw *FileFlvWriter) GetCodecs() []av.CodecData {
return ffw.codecs
}
func NewFileFlvWriter(
pktStream <-chan av.Packet,
code string,
codecs []av.CodecData,
iffm IFileFlvManager,
) *FileFlvWriter {
ffw := &FileFlvWriter{
done: make(chan int),
pktStream: pktStream,
code: code,
codecs: codecs,
iffm: iffm,
isStart: false,
}
camera, err := models.CameraSelectOne(models.Camera{Code: code})
if err != nil {
logs.Error("query camera error : %v", err)
return ffw
}
if camera.OnlineStatus != 1 {
return ffw
}
if camera.SaveVideo != 1 {
go func() {
for {
select {
case <-ffw.GetDone():
return
case <-ffw.pktStream:
}
}
}()
return ffw
}
go ffw.flvWrite()
go ffw.splitFile()
return ffw
}
func (ffw *FileFlvWriter) StopWrite() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
close(ffw.done)
}()
}
func (ffw *FileFlvWriter) splitFile() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
for {
select {
case <-ffw.done:
return
case <-time.After(1 * time.Hour):
ffw.StopWrite()
_, pktStreamOk := <-ffw.pktStream
if pktStreamOk {
logs.Info("to create NewFileFlvWriter : %s", ffw.code)
ffwn := NewFileFlvWriter(ffw.pktStream, ffw.code, ffw.codecs, ffw.iffm)
ffwn.iffm.UpdateFFWS(ffwn.code, ffwn)
} else {
logs.Info("FileFlvWriter pktStream is closed : %s", ffw.code)
}
return
}
}
}
func (ffw *FileFlvWriter) Write(p []byte) (n int, err error) {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
n, err = ffw.fd.Write(p)
if err != nil {
logs.Error("write file error : %v", err)
}
return
}
func (ffw *FileFlvWriter) createFlvFile() error {
fd, err := os.OpenFile(getFileFlvPath()+"/"+ffw.code+"_"+time.Now().Format("20060102150405")+".flv", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
logs.Error("open file error :", err)
return err
}
ffw.fd = fd
return nil
}
//Write extends to writer.Writer
func (ffw *FileFlvWriter) flvWrite() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
if err := ffw.createFlvFile(); err != nil {
logs.Error("create file flv error : %v", err)
return
}
defer func() {
close(ffw.done)
ffw.fd.Close()
}()
muxer := flv.NewMuxer(ffw)
timeNow := time.Now().Local()
for pkt := range utils.OrDonePacket(ffw.done, ffw.pktStream) {
if ffw.isStart {
if err := muxer.WritePacket(pkt); err != nil {
logs.Error("writer packet to flv file error : %v", err)
}
continue
}
if pkt.IsKeyFrame {
ffw.isStart = true
err := muxer.WriteHeader(ffw.codecs)
if err != nil {
logs.Error("writer header to flv file error : %v", err)
ffw.isStart = false
}
if err := muxer.WritePacket(pkt); err != nil {
logs.Error("writer packet to flv file error : %v", err)
ffw.isStart = false
}
continue
}
if time.Now().Local().After(timeNow.Add(1 * time.Minute)) {
timeNow = time.Now().Local()
logs.Error("FileFlvWriter ingrore package: %s", ffw.code)
}
}
}
func getFileFlvPath() string {
fileFlvPath, err := config.String("server.fileflv.path")
if err != nil {
logs.Error("get fileflv path error :", err)
return ""
}
return fileFlvPath
}

View File

@@ -0,0 +1,60 @@
package flvadmin
import (
"sync"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager"
)
type FileFlvAdmin struct {
ffws sync.Map
}
var ffmInstance *FileFlvAdmin
func init() {
ffmInstance = &FileFlvAdmin{}
}
func GetSingleFileFlvAdmin() *FileFlvAdmin {
return ffmInstance
}
func (ffm *FileFlvAdmin) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
ffw := fileflvmanager.NewFileFlvManager(pktStream, code, codecs)
ffm.ffws.Store(code, ffw)
}
func (ffm *FileFlvAdmin) StopWrite(code string) {
v, ok := ffm.ffws.Load(code)
if ok {
ffw := v.(*fileflvmanager.FileFlvManager)
ffw.StopWrite()
}
}
func (ffm *FileFlvAdmin) StartWrite(code string) {
v, ok := ffm.ffws.Load(code)
if ok {
ffw := v.(*fileflvmanager.FileFlvManager)
ffw.StopWrite()
ffm.FlvWrite(ffw.GetPktStream(), code, ffw.GetCodecs())
}
}
func (ffm *FileFlvAdmin) UpdateFFWS(code string, ffw *fileflvmanager.FileFlvManager) {
_, ok := ffm.ffws.LoadAndDelete(code)
if ok {
ffm.ffws.Store(code, ffw)
}
}
//更新sps、pps等信息
func (ffm *FileFlvAdmin) UpdateCodecs(code string, codecs []av.CodecData) {
rfw, ok := ffm.ffws.Load(code)
if ok {
rfw := rfw.(*fileflvmanager.FileFlvManager)
rfw.SetCodecs(codecs)
}
}

View File

@@ -0,0 +1,144 @@
package fileflvmanager
import (
"runtime/debug"
"sync"
"time"
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/fileflvmanager/fileflvwriter"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
type FileFlvManager struct {
done chan int
pktStream <-chan av.Packet
code string
codecs []av.CodecData
ffws sync.Map
}
func (ffm *FileFlvManager) GetCode() string {
return ffm.code
}
func (ffm *FileFlvManager) SetCodecs(codecs []av.CodecData) {
ffm.codecs = codecs
ffm.ffws.Range(func(key, value interface{}) bool {
wi := value.(*fileflvwriter.FileFlvWriter)
wi.SetCodecs(ffm.codecs)
return true
})
}
func (ffm *FileFlvManager) GetDone() <-chan int {
return ffm.done
}
func (ffm *FileFlvManager) GetPktStream() <-chan av.Packet {
return ffm.pktStream
}
func (ffm *FileFlvManager) GetCodecs() []av.CodecData {
return ffm.codecs
}
func NewFileFlvManager(pktStream <-chan av.Packet, code string, codecs []av.CodecData) *FileFlvManager {
ffm := &FileFlvManager{
done: make(chan int),
pktStream: pktStream,
code: code,
codecs: codecs,
}
camera, err := models.CameraSelectOne(models.Camera{Code: code})
if err != nil {
logs.Error("query camera error : %v", err)
return ffm
}
if camera.OnlineStatus != 1 {
return ffm
}
if camera.SaveVideo != 1 {
go func() {
for {
select {
case <-ffm.GetDone():
return
case _, ok := <-ffm.pktStream:
if !ok {
return
}
}
}
}()
return ffm
}
go func() {
ticker := time.NewTicker(1 * time.Hour)
for {
select {
case <-ffm.done:
return
case <-ticker.C:
ffm.ffws.Range(func(key, value interface{}) bool {
ffw := value.(*fileflvwriter.FileFlvWriter)
if ffw.GetCode() == code {
ffw.TickerStopWrite()
}
return true
})
sessionId := utils.NextValSnowflakeID()
//添加缓冲,减少包到达速率震荡导致丢包
pktStream := make(chan av.Packet, 1024)
newFfw := fileflvwriter.NewFileFlvWriter(sessionId, pktStream, code, ffm.codecs, ffm)
ffm.ffws.Store(sessionId, newFfw)
}
}
}()
sessionId := utils.NextValSnowflakeID()
//添加缓冲,减少包到达速率震荡导致丢包
ffwPktStream := make(chan av.Packet, 1024)
newFfw := fileflvwriter.NewFileFlvWriter(sessionId, ffwPktStream, code, codecs, ffm)
ffm.ffws.Store(sessionId, newFfw)
go ffm.flvWrite()
return ffm
}
func (ffm *FileFlvManager) StopWrite() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
close(ffm.done)
}()
}
//Write extends to writer.Writer
func (ffm *FileFlvManager) flvWrite() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
for pkt := range utils.OrDonePacket(ffm.done, ffm.pktStream) {
ffm.ffws.Range(func(key, value interface{}) bool {
ffw := value.(*fileflvwriter.FileFlvWriter)
select {
case ffw.GetPktStream() <- pkt:
// logs.Debug("flvWrite pkt")
default:
//当播放者速率跟不上时,会发生丢包
logs.Debug("camera [%s] file flv write fail", ffm.code)
}
return true
})
}
}
func (ffm *FileFlvManager) DeleteFFW(sesessionId int64) {
ffm.ffws.LoadAndDelete(sesessionId)
}

View File

@@ -0,0 +1,267 @@
package fileflvwriter
import (
"encoding/hex"
"io"
"os"
"runtime/debug"
"strings"
"time"
"github.com/beego/beego/v2/core/config"
"github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/format/flv"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
)
type IFileFlvManager interface {
DeleteFFW(sesessionId int64)
}
type FileFlvWriter struct {
sessionId int64
done chan int
fgDoneClose bool
tickerDone chan int
pktStream chan av.Packet
code string
codecs []av.CodecData
isStart bool
fd *os.File
fileName string
muxer *flv.Muxer
startTime time.Time
ffm IFileFlvManager
}
func (ffw *FileFlvWriter) GetDone() <-chan int {
return ffw.done
}
func (ffw *FileFlvWriter) GetCode() string {
return ffw.code
}
func (ffw *FileFlvWriter) GetPktStream() chan av.Packet {
return ffw.pktStream
}
func (ffw *FileFlvWriter) SetCodecs(codecs []av.CodecData) {
ffw.codecs = codecs
}
func (ffw *FileFlvWriter) GetCodecs() []av.CodecData {
return ffw.codecs
}
func (ffw *FileFlvWriter) GetSessionId() int64 {
return ffw.sessionId
}
func NewFileFlvWriter(
sessionId int64,
pktStream chan av.Packet,
code string,
codecs []av.CodecData,
ffm IFileFlvManager,
) *FileFlvWriter {
ffw := &FileFlvWriter{
sessionId: sessionId,
fgDoneClose: false,
done: make(chan int),
tickerDone: make(chan int),
pktStream: pktStream,
code: code,
codecs: codecs,
isStart: false,
ffm: ffm,
}
camera, err := models.CameraSelectOne(models.Camera{Code: code})
if err != nil {
logs.Error("query camera error : %v", err)
return ffw
}
if camera.OnlineStatus != 1 {
return ffw
}
if camera.SaveVideo != 1 {
go func() {
for {
select {
case <-ffw.GetDone():
return
case _, ok := <-ffw.pktStream:
if !ok {
return
}
}
}
}()
return ffw
}
go ffw.flvWrite()
return ffw
}
func (ffw *FileFlvWriter) TickerStopWrite() {
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
//等待30秒再关闭
<-time.NewTicker(30 * time.Second).C
ffw.ffm.DeleteFFW(ffw.sessionId)
ffw.fgDoneClose = true
close(ffw.done)
}()
}
func (ffw *FileFlvWriter) Write(p []byte) (n int, err error) {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
n, err = ffw.fd.Write(p)
if err != nil {
logs.Error("write file error : %v", err)
}
return
}
func (ffw *FileFlvWriter) createFlvFile() error {
fileName := getFileFlvPath() + "/" + ffw.code + "_" + time.Now().Format("20060102150405") + "_temp.flv"
fd, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
logs.Error("open file error :", err)
return err
}
ffw.fd = fd
ffw.fileName = fileName
return nil
}
//Write extends to writer.Writer
func (ffw *FileFlvWriter) flvWrite() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
if err := ffw.createFlvFile(); err != nil {
logs.Error("create file flv error : %v", err)
return
}
defer func() {
ffw.muxer.WriteTrailer()
ffw.fd.Close()
//写入script tag data主要补充视频的总时长否则使用播放器播放看不到视频总时长
ffw.writeScriptTagData()
if !ffw.fgDoneClose {
close(ffw.done)
}
}()
muxer := flv.NewMuxer(ffw)
ffw.muxer = muxer
timeNow := time.Now().Local()
for pkt := range utils.OrDonePacket(ffw.done, ffw.pktStream) {
if ffw.isStart {
if err := muxer.WritePacket(pkt); err != nil {
logs.Error("writer packet to flv file error : %v", err)
}
continue
}
if pkt.IsKeyFrame {
ffw.isStart = true
err := muxer.WriteHeader(ffw.codecs)
if err != nil {
logs.Error("writer header to flv file error : %v", err)
ffw.isStart = false
}
if err := muxer.WritePacket(pkt); err != nil {
logs.Error("writer packet to flv file error : %v", err)
ffw.isStart = false
}
ffw.startTime = time.Now()
continue
}
if time.Now().Local().After(timeNow.Add(1 * time.Minute)) {
timeNow = time.Now().Local()
logs.Error("FileFlvWriter ingrore package: %s", ffw.code)
}
}
}
func (ffw *FileFlvWriter) writeScriptTagData() {
reverseFileName := utils.ReverseString(ffw.fileName)
reverseNewFileName := strings.Replace(reverseFileName, utils.ReverseString("_temp.flv"), utils.ReverseString(".flv"), 1)
newFileName := utils.ReverseString(reverseNewFileName)
newflvFile, err := os.OpenFile(newFileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
logs.Error("create flv file error :", err)
return
}
flvFile, err := os.OpenFile(ffw.fileName, os.O_RDWR, 0644)
if err != nil {
logs.Error("open file error :", err)
return
}
buf := make([]byte, 10*1024)
i := 1
duration := float64(time.Now().Sub(ffw.startTime).Seconds())
durationBytes := utils.Float64ToByteBigEndian(duration)
durationHexStr := hex.EncodeToString(durationBytes)
scriptTagHexStr := "120000250000000000000002000A6F6E4D65746144617461080000000100086475726174696F6E00" + durationHexStr + "00000030"
scriptTagBytes, err := hex.DecodeString(scriptTagHexStr)
if err != nil {
logs.Error("scriptTagHexStr: %s, DecodeString error : ", scriptTagHexStr, err)
return
}
for {
_, err := flvFile.Read(buf)
if err != nil {
if err == io.EOF {
break
}
logs.Error("read flv file error : %v", err)
}
if i == 1 {
i = 2
data1 := make([]byte, len(buf)+52)
copy(data1, buf[:13])
newData := append(data1[:13], scriptTagBytes...)
newData = append(newData, buf[13:]...)
newflvFile.Write(newData)
continue
}
newflvFile.Write(buf)
}
err = flvFile.Close()
if err != nil {
logs.Error("close template flv file error :", err)
return
}
err = os.Remove(ffw.fileName)
if err != nil {
logs.Error("remove template flv file error :", err)
return
}
}
func getFileFlvPath() string {
fileFlvPath, err := config.String("server.fileflv.path")
if err != nil {
logs.Error("get fileflv path error :", err)
return ""
}
return fileFlvPath
}

View File

@@ -1,4 +1,4 @@
package flvmanage package flvadmin
import ( import (
"errors" "errors"
@@ -7,24 +7,24 @@ import (
"time" "time"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/httpflvmanage" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/httpflvmanage"
) )
var hfas *HttpflvAdmin var hfas *HttpFlvAdmin
type HttpflvAdmin struct { type HttpFlvAdmin struct {
hfms sync.Map hfms sync.Map
} }
func init() { func init() {
hfas = &HttpflvAdmin{} hfas = &HttpFlvAdmin{}
} }
func GetSingleHttpflvAdmin() *HttpflvAdmin { func GetSingleHttpFlvAdmin() *HttpFlvAdmin {
return hfas return hfas
} }
func (hfa *HttpflvAdmin) AddHttpFlvManager( func (hfa *HttpFlvAdmin) AddHttpFlvManager(
pktStream <-chan av.Packet, pktStream <-chan av.Packet,
code string, code string,
codecs []av.CodecData, codecs []av.CodecData,
@@ -33,7 +33,7 @@ func (hfa *HttpflvAdmin) AddHttpFlvManager(
hfa.hfms.Store(code, hfm) hfa.hfms.Store(code, hfm)
} }
func (hfa *HttpflvAdmin) StopWrite(code string) { func (hfa *HttpFlvAdmin) StopWrite(code string) {
v, ok := hfa.hfms.Load(code) v, ok := hfa.hfms.Load(code)
if ok { if ok {
ffw := v.(*httpflvmanage.HttpFlvManager) ffw := v.(*httpflvmanage.HttpFlvManager)
@@ -41,7 +41,7 @@ func (hfa *HttpflvAdmin) StopWrite(code string) {
} }
} }
func (hfa *HttpflvAdmin) StartWrite(code string) { func (hfa *HttpFlvAdmin) StartWrite(code string) {
v, ok := hfa.hfms.Load(code) v, ok := hfa.hfms.Load(code)
if ok { if ok {
ffw := v.(*httpflvmanage.HttpFlvManager) ffw := v.(*httpflvmanage.HttpFlvManager)
@@ -51,7 +51,7 @@ func (hfa *HttpflvAdmin) StartWrite(code string) {
} }
//添加播放者 //添加播放者
func (hfa *HttpflvAdmin) AddHttpFlvPlayer( func (hfa *HttpFlvAdmin) AddHttpFlvPlayer(
playerDone <-chan int, playerDone <-chan int,
pulseInterval time.Duration, pulseInterval time.Duration,
code string, code string,
@@ -64,3 +64,12 @@ func (hfa *HttpflvAdmin) AddHttpFlvPlayer(
} }
return nil, errors.New("camera no connection") return nil, errors.New("camera no connection")
} }
//更新sps、pps等信息
func (hfa *HttpFlvAdmin) UpdateCodecs(code string, codecs []av.CodecData) {
rfw, ok := hfa.hfms.Load(code)
if ok {
rfw := rfw.(*httpflvmanage.HttpFlvManager)
rfw.SetCodecs(codecs)
}
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/beego/beego/v2/core/logs" "github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/httpflvwriter" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/httpflvmanage/httpflvwriter"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/models"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
) )
@@ -21,16 +21,30 @@ type HttpFlvManager struct {
hfws sync.Map hfws sync.Map
} }
func (ffw *HttpFlvManager) GetDone() <-chan int { func (hfm *HttpFlvManager) GetCode() string {
return ffw.done return hfm.code
} }
func (ffw *HttpFlvManager) GetPktStream() <-chan av.Packet { func (hfm *HttpFlvManager) SetCodecs(codecs []av.CodecData) {
return ffw.pktStream logs.Warn("HttpFlvManager: %s update codecs", hfm.code)
hfm.codecs = codecs
hfm.hfws.Range(func(key, value interface{}) bool {
wi := value.(*httpflvwriter.HttpFlvWriter)
wi.SetCodecs(hfm.codecs)
return true
})
} }
func (ffw *HttpFlvManager) GetCodecs() []av.CodecData { func (hfm *HttpFlvManager) GetDone() <-chan int {
return ffw.codecs return hfm.done
}
func (hfm *HttpFlvManager) GetPktStream() <-chan av.Packet {
return hfm.pktStream
}
func (hfm *HttpFlvManager) GetCodecs() []av.CodecData {
return hfm.codecs
} }
func NewHttpFlvManager(pktStream <-chan av.Packet, code string, codecs []av.CodecData) *HttpFlvManager { func NewHttpFlvManager(pktStream <-chan av.Packet, code string, codecs []av.CodecData) *HttpFlvManager {

View File

@@ -39,7 +39,12 @@ func (hfw *HttpFlvWriter) SetCode(code string) {
hfw.code = code hfw.code = code
} }
func (hfw *HttpFlvWriter) GetCode() string {
return hfw.code
}
func (hfw *HttpFlvWriter) SetCodecs(codecs []av.CodecData) { func (hfw *HttpFlvWriter) SetCodecs(codecs []av.CodecData) {
logs.Warn("HttpFlvWriter: %s update codecs", hfw.code)
hfw.codecs = codecs hfw.codecs = codecs
} }

View File

@@ -0,0 +1,64 @@
package flvadmin
import (
"sync"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin/rtmpflvwriter"
)
type RtmpFlvAdmin struct {
rfms sync.Map
}
var rfmInstance *RtmpFlvAdmin
func init() {
rfmInstance = &RtmpFlvAdmin{}
}
func GetSingleRtmpFlvAdmin() *RtmpFlvAdmin {
return rfmInstance
}
func (rfm *RtmpFlvAdmin) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
rfw := rtmpflvwriter.NewRtmpFlvWriter(pktStream, code, codecs, rfm)
rfm.rfms.Store(code, rfw)
}
func (rfm *RtmpFlvAdmin) StartWrite(code string) {
v, ok := rfm.rfms.Load(code)
if ok {
rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
rfw.StopWrite()
rfm.FlvWrite(rfw.GetPktStream(), code, rfw.GetCodecs())
}
}
func (rfm *RtmpFlvAdmin) StopWrite(code string) {
v, ok := rfm.rfms.Load(code)
if ok {
rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
rfw.StopWrite()
}
}
func (rfm *RtmpFlvAdmin) UpdateFFWS(code string, rfw *rtmpflvwriter.RtmpFlvWriter) {
_, ok := rfm.rfms.LoadAndDelete(code)
if ok {
rfm.rfms.Store(code, rfw)
}
}
//更新sps、pps等信息
func (rfm *RtmpFlvAdmin) UpdateCodecs(code string, codecs []av.CodecData) {
rfw, ok := rfm.rfms.Load(code)
if ok {
rfw := rfw.(*rtmpflvwriter.RtmpFlvWriter)
rfw.SetCodecs(codecs)
//sps、pps更新后重新建立连接
// logs.Warn("RtmpFlvAdmin: %s codecs change, restart RtmpFlvWriter", code)
//这里只需要stop就可以内部会重连
// rfw.StopWrite()
}
}

View File

@@ -34,6 +34,15 @@ func (rfw *RtmpFlvWriter) GetPktStream() <-chan av.Packet {
return rfw.pktStream return rfw.pktStream
} }
func (rfw *RtmpFlvWriter) GetCode() string {
return rfw.code
}
func (rfw *RtmpFlvWriter) SetCodecs(codecs []av.CodecData) {
logs.Warn("RtmpFlvWriter: %s update codecs", rfw.code)
rfw.codecs = codecs
}
func (rfw *RtmpFlvWriter) GetCodecs() []av.CodecData { func (rfw *RtmpFlvWriter) GetCodecs() []av.CodecData {
return rfw.codecs return rfw.codecs
} }

View File

@@ -1,51 +0,0 @@
package flvmanage
import (
"sync"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/fileflvwriter"
)
type fileFlvManager struct {
ffws sync.Map
}
var ffmInstance *fileFlvManager
func init() {
ffmInstance = &fileFlvManager{}
}
func GetSingleFileFlvManager() *fileFlvManager {
return ffmInstance
}
func (ffm *fileFlvManager) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
ffw := fileflvwriter.NewFileFlvWriter(pktStream, code, codecs, ffm)
ffm.ffws.Store(code, ffw)
}
func (ffm *fileFlvManager) StopWrite(code string) {
v, ok := ffm.ffws.Load(code)
if ok {
ffw := v.(*fileflvwriter.FileFlvWriter)
ffw.StopWrite()
}
}
func (ffm *fileFlvManager) StartWrite(code string) {
v, ok := ffm.ffws.Load(code)
if ok {
ffw := v.(*fileflvwriter.FileFlvWriter)
ffw.StopWrite()
ffm.FlvWrite(ffw.GetPktStream(), code, ffw.GetCodecs())
}
}
func (ffm *fileFlvManager) UpdateFFWS(code string, ffw *fileflvwriter.FileFlvWriter) {
_, ok := ffm.ffws.LoadAndDelete(code)
if ok {
ffm.ffws.Store(code, ffw)
}
}

View File

@@ -1,51 +0,0 @@
package flvmanage
import (
"sync"
"github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpflvwriter"
)
type rtmpFlvManager struct {
rfms sync.Map
}
var rfmInstance *rtmpFlvManager
func init() {
rfmInstance = &rtmpFlvManager{}
}
func GetSingleRtmpFlvManager() *rtmpFlvManager {
return rfmInstance
}
func (rfm *rtmpFlvManager) FlvWrite(pktStream <-chan av.Packet, code string, codecs []av.CodecData) {
rfw := rtmpflvwriter.NewRtmpFlvWriter(pktStream, code, codecs, rfm)
rfm.rfms.Store(code, rfw)
}
func (rfm *rtmpFlvManager) StartWrite(code string) {
v, ok := rfm.rfms.Load(code)
if ok {
ffw := v.(*rtmpflvwriter.RtmpFlvWriter)
ffw.StopWrite()
rfm.FlvWrite(ffw.GetPktStream(), code, ffw.GetCodecs())
}
}
func (rfm *rtmpFlvManager) StopWrite(code string) {
v, ok := rfm.rfms.Load(code)
if ok {
ffw := v.(*rtmpflvwriter.RtmpFlvWriter)
ffw.StopWrite()
}
}
func (rfm *rtmpFlvManager) UpdateFFWS(code string, rfw *rtmpflvwriter.RtmpFlvWriter) {
_, ok := rfm.rfms.LoadAndDelete(code)
if ok {
rfm.rfms.Store(code, rfw)
}
}

View File

@@ -3,54 +3,65 @@ package rtspclient
import ( import (
"github.com/beego/beego/v2/core/logs" "github.com/beego/beego/v2/core/logs"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvmanage" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin"
"github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils"
) )
type IRtspClientManager interface {
Load(key interface{}) (interface{}, bool)
Store(key, value interface{})
Delete(key interface{})
}
type RtspClient struct { type RtspClient struct {
code string code string
codecs []av.CodecData codecs []av.CodecData
connDone <-chan int connDone <-chan int
done chan int
pktStream <-chan av.Packet pktStream <-chan av.Packet
ffmPktStream <-chan av.Packet ffmPktStream <-chan av.Packet
hfmPktStream <-chan av.Packet hfmPktStream <-chan av.Packet
rfmPktStream <-chan av.Packet rfmPktStream <-chan av.Packet
ircm IRtspClientManager
} }
func NewRtspClient(connDone <-chan int, pktStream <-chan av.Packet, code string, codecs []av.CodecData, ircm IRtspClientManager) *RtspClient { func NewRtspClient(connDone <-chan int, pktStream <-chan av.Packet, code string, codecs []av.CodecData) *RtspClient {
r := &RtspClient{ done := make(chan int)
rc := &RtspClient{
connDone: connDone, connDone: connDone,
done: done,
pktStream: pktStream, pktStream: pktStream,
code: code, code: code,
codecs: codecs, codecs: codecs,
ffmPktStream: make(chan av.Packet, 1024), ffmPktStream: make(chan av.Packet, 1024),
hfmPktStream: make(chan av.Packet, 1024), hfmPktStream: make(chan av.Packet, 1024),
rfmPktStream: make(chan av.Packet, 1024), rfmPktStream: make(chan av.Packet, 1024),
ircm: ircm,
} }
r.pktTransfer() rc.pktTransfer()
return r return rc
} }
func (r *RtspClient) Done() { func (rtspClient *RtspClient) Done() {
<-r.connDone <-rtspClient.connDone
} }
func (r *RtspClient) pktTransfer() { //主动关闭
ffmPktStream, hfmPktStream, rfmPktStream := tee(r.connDone, r.pktStream) func (rtspClient *RtspClient) Close() {
r.ffmPktStream = ffmPktStream close(rtspClient.done)
r.hfmPktStream = hfmPktStream }
r.rfmPktStream = rfmPktStream
logs.Debug("publisher [%s] create customer", r.code) //更新sps、pps等信息
flvmanage.GetSingleFileFlvManager().FlvWrite(r.ffmPktStream, r.code, r.codecs) func (rtspClient *RtspClient) UpdateCodecs(codecs []av.CodecData) {
flvmanage.GetSingleHttpflvAdmin().AddHttpFlvManager(r.hfmPktStream, r.code, r.codecs) rtspClient.codecs = codecs
flvmanage.GetSingleRtmpFlvManager().FlvWrite(r.rfmPktStream, r.code, r.codecs) logs.Warn("RtspClient: %s update codecs", rtspClient.code)
flvadmin.GetSingleFileFlvAdmin().UpdateCodecs(rtspClient.code, codecs)
flvadmin.GetSingleHttpFlvAdmin().UpdateCodecs(rtspClient.code, codecs)
flvadmin.GetSingleRtmpFlvAdmin().UpdateCodecs(rtspClient.code, codecs)
}
func (rtspClient *RtspClient) pktTransfer() {
done := utils.OrDoneInt(rtspClient.done, rtspClient.connDone)
ffmPktStream, hfmPktStream, rfmPktStream := tee(done, rtspClient.pktStream)
rtspClient.ffmPktStream = ffmPktStream
rtspClient.hfmPktStream = hfmPktStream
rtspClient.rfmPktStream = rfmPktStream
logs.Info("publisher [%s] create customer", rtspClient.code)
flvadmin.GetSingleFileFlvAdmin().FlvWrite(rtspClient.ffmPktStream, rtspClient.code, rtspClient.codecs)
flvadmin.GetSingleHttpFlvAdmin().AddHttpFlvManager(rtspClient.hfmPktStream, rtspClient.code, rtspClient.codecs)
flvadmin.GetSingleRtmpFlvAdmin().FlvWrite(rtspClient.rfmPktStream, rtspClient.code, rtspClient.codecs)
} }
func tee(done <-chan int, in <-chan av.Packet) (<-chan av.Packet, <-chan av.Packet, <-chan av.Packet) { func tee(done <-chan int, in <-chan av.Packet) (<-chan av.Packet, <-chan av.Packet, <-chan av.Packet) {

View File

@@ -30,7 +30,7 @@ func GetSingleRtspClientManager() *RtspClientManager {
} }
func (rs *RtspClientManager) StartClient() { func (rs *RtspClientManager) StartClient() {
go rs.serveStreams() go rs.startConntions()
go rs.stopConn(controllers.CodeStream()) go rs.stopConn(controllers.CodeStream())
} }
@@ -68,17 +68,27 @@ func (rs *RtspClientManager) stopConn(codeStream <-chan string) {
} }
} }
func (s *RtspClientManager) serveStreams() { func (s *RtspClientManager) startConntions() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
logs.Error("rtspManager panic %v", r) logs.Error("rtspManager panic %v", r)
} }
}() }()
es, err := models.CameraSelectAll()
if err != nil {
logs.Error("camera list query error: %s", err)
return
}
timeTemp := time.Now()
for { for {
es, err := models.CameraSelectAll() timeNow := time.Now()
if err != nil { if timeNow.After(timeTemp.Add(30 * time.Second)) {
logs.Error("camera list is empty") es, err = models.CameraSelectAll()
return if err != nil {
logs.Error("camera list query error: %s", err)
return
}
timeTemp = timeNow
} }
for _, camera := range es { for _, camera := range es {
if v, b := s.rcs.Load(camera.Code); b && v != nil { if v, b := s.rcs.Load(camera.Code); b && v != nil {
@@ -89,7 +99,7 @@ func (s *RtspClientManager) serveStreams() {
} }
go s.connRtsp(camera.Code) go s.connRtsp(camera.Code)
} }
<-time.After(30 * time.Second) <-time.After(1 * time.Second)
} }
} }
@@ -135,19 +145,53 @@ func (s *RtspClientManager) connRtsp(code string) {
return return
} }
codecs := session.CodecData codecs := session.CodecData
// logs.Warn("camera: %s codecs: %v", code, session.CodecData)
c.OnlineStatus = 1 c.OnlineStatus = 1
models.CameraUpdate(c) models.CameraUpdate(c)
done := make(chan int) done := make(chan int)
//添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包 //添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包
pktStream := make(chan av.Packet, 50) pktStream := make(chan av.Packet, 1024)
defer func() { defer func() {
close(done) close(done)
close(pktStream) close(pktStream)
}() }()
rc := rtspclient.NewRtspClient(done, pktStream, code, codecs, s) rc := rtspclient.NewRtspClient(done, pktStream, code, codecs)
go func() {
ticker := time.NewTicker(4 * time.Hour)
for {
select {
case <-ticker.C:
//rtmp server bug, connect can't out 16777215(0xFFFFFF) Millisecond
logs.Warn("camera: %s connect keep 4 hour, disconnect ", code)
session.Close()
return
case <-done:
return
}
}
}()
go func() {
defer func() {
if r := recover(); r != nil {
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
}
}()
for {
select {
case _, ok := <-session.Signals:
if !ok {
return
}
logs.Warn("camera: %s update codecs: %v", code, session.CodecData)
rc.UpdateCodecs(session.CodecData)
case <-done:
return
}
}
}()
s.rcs.Store(code, rc) s.rcs.Store(code, rc)
s.conns.Store(code, session) s.conns.Store(code, session)
logs.Info("%s", string(session.SDPRaw)) logs.Info("%s", string(session.SDPRaw))

View File

@@ -1 +1,55 @@
package utils package utils
import (
"encoding/binary"
"math"
)
func Float32ToByte(float float32) []byte {
bits := math.Float32bits(float)
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, bits)
return bytes
}
func ByteToFloat32(bytes []byte) float32 {
bits := binary.LittleEndian.Uint32(bytes)
return math.Float32frombits(bits)
}
func Float64ToByteLittleEndian(float float64) []byte {
bits := math.Float64bits(float)
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, bits)
return bytes
}
func Float64ToByteBigEndian(float float64) []byte {
bits := math.Float64bits(float)
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes, bits)
return bytes
}
func ByteToFloat64(bytes []byte) float64 {
bits := binary.LittleEndian.Uint64(bytes)
return math.Float64frombits(bits)
}
func ReverseString(s string) string {
// 将字符串转换为rune类型的切片
runes := []rune(s)
// 获取字符串长度
n := len(runes)
// 遍历rune类型的切片交换前后两个元素的位置
for i := 0; i < n/2; i++ {
runes[i], runes[n-1-i] = runes[n-1-i], runes[i]
}
// 将rune类型的切片转换为字符串类型并返回
return string(runes)
}