From 549ec6c2aa3aa1c3644fd22e37458d14242e9bee Mon Sep 17 00:00:00 2001 From: madao Date: Mon, 16 Dec 2024 18:07:31 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BD=BF=E7=94=A8ffmpeg?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5rtsp=EF=BC=8C=E9=9F=B3=E9=A2=91=E8=BD=AC?= =?UTF-8?q?=E7=A0=81=E4=B8=BAaac,=E8=BD=AC=E5=8F=91=E5=88=B01936=E7=AB=AF?= =?UTF-8?q?=E5=8F=A3,=E8=BF=9B=E8=A1=8C=E6=B5=81=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 12 + go.mod | 1 + go.sum | 4 + main.go | 10 +- resources/conf/conf-prod.yml | 6 +- src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go | 239 ++++++++++++++++ src/rtsp2rtmp/rtmppublisher/rtmppublisher.go | 89 ++++++ src/rtsp2rtmp/rtmpserver/rtmpserver.go | 259 ++++++++++++++++++ .../rtspclientmanager/rtspclientmanager.go | 5 +- src/rtsp2rtmp/web/task/task.go | 4 +- 10 files changed, 620 insertions(+), 9 deletions(-) create mode 100644 src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go create mode 100644 src/rtsp2rtmp/rtmppublisher/rtmppublisher.go create mode 100644 src/rtsp2rtmp/rtmpserver/rtmpserver.go diff --git a/README.md b/README.md index 316a0af..89185d7 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,18 @@ rtsp2rtmp项目实现了连接rtsp协议的设备(摄像头等),采集视 4. 打开:http://[ip:localhost]:[port:8080]/rtsp2rtmp ,如:http://127.0.0.1:8080/rtsp2rtmp , 默认用户名/密码: admin/admin ## 使用说明: +### 安装 FFmpeg + +使用项目之前,需要在本地机器安装好`FFmpeg`,并且将`FFmpeg`添加到环境变量`$PATH`中。 + +[点击下载FFmpeg](https://ffmpeg.org/download.html) + +配置完成后打开控制台输入以下命令测试是否输出`FFmpeg`的信息 + +``` +$ ffmpeg +ffmpeg version 7.1 Copyright (c) 2000-2024 the FFmpeg developers... +``` 1. 打开摄像头列表页面 ![](./docs/images/camera-list.png) 2. 点击创建按钮,跳转到创建摄像头页面,填写信息,完成摄像头创建 diff --git a/go.mod b/go.mod index d6c43ce..fd7c81d 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( // github.com/deepch/vdk v0.0.0-20241120073805-439b6309323c //gitUrl v0.0.0-timestamp-commitId github.com/deepch/vdk v0.0.27 github.com/gin-gonic/gin v1.7.2 + github.com/go-cmd/cmd v1.4.3 github.com/google/uuid v1.3.0 github.com/lib/pq v1.10.9 ) diff --git a/go.sum b/go.sum index 8a6b6a9..29d19a9 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/gin-gonic/gin v1.7.2 h1:Tg03T9yM2xa8j6I3Z3oqLaQRSmKvxPd6g/2HJ6zICFA= github.com/gin-gonic/gin v1.7.2/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= github.com/glendc/gopher-json v0.0.0-20170414221815-dc4743023d0c/go.mod h1:Gja1A+xZ9BoviGJNA2E9vFkPjjsl+CoJxSXiQM1UXtw= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= +github.com/go-cmd/cmd v1.4.3 h1:6y3G+3UqPerXvPcXvj+5QNPHT02BUw7p6PsqRxLNA7Y= +github.com/go-cmd/cmd v1.4.3/go.mod h1:u3hxg/ry+D5kwh8WvUkHLAMe2zQCaXd00t35WfQaOFk= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -88,6 +90,8 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg= +github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.3.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= diff --git a/main.go b/main.go index 5a4720d..0471de0 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,10 @@ import ( "syscall" _ "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/conf" // 必须先导入配置文件 - "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/ffmpegmanager" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver" + + // "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web" _ "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dao/register" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/task" @@ -18,8 +21,9 @@ import ( ) func main() { - - rtspclientmanager.GetSingleRtspClientManager().StartClient() + // rtspclientmanager.GetSingleRtspClientManager().StartClient() + rtmpserver.GetSingleRtmpServer().StartRtmpServer() + ffmpegmanager.GetSingleFFmpegManager().StartClient() task.GetSingleTask().StartTask() web.GetSingleWeb().StartWeb() sigs := make(chan os.Signal, 1) diff --git a/resources/conf/conf-prod.yml b/resources/conf/conf-prod.yml index d6c49c5..135aba8 100644 --- a/resources/conf/conf-prod.yml +++ b/resources/conf/conf-prod.yml @@ -1,8 +1,10 @@ server: - security: true + security: false user: name: admin password: admin + rtmp: + port: 1936 http: port: 8080 static: @@ -16,5 +18,5 @@ server: driver-type: 4 driver: postgres url: user=postgres password=123456 dbname=rtsp2rtmp host=localhost port=5432 sslmode=disable TimeZone=UTC - show-sql: false + show-sql: true \ No newline at end of file diff --git a/src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go b/src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go new file mode 100644 index 0000000..5af3a8c --- /dev/null +++ b/src/rtsp2rtmp/ffmpegmanager/ffmpegmanager.go @@ -0,0 +1,239 @@ +package ffmpegmanager + +import ( + "fmt" + "net" + "runtime/debug" + "strings" + "sync" + "time" + + "github.com/beego/beego/v2/core/config" + "github.com/beego/beego/v2/core/logs" + "github.com/go-cmd/cmd" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" + ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controllers/ext" + base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" +) + +type FFmpegInfo struct { + startTime time.Time + templatePasswd string + ffMpegCmd *cmd.Cmd +} + +var rcmInstance *FFmpegManager + +func init() { + rcmInstance = &FFmpegManager{} +} + +type FFmpegManager struct { + rcs sync.Map +} + +func GetSingleFFmpegManager() *FFmpegManager { + return rcmInstance +} + +func (rs *FFmpegManager) StartClient() { + go rs.startConnections() + go rs.stopConn(ext_controller.CodeStream()) + go rs.checkBlockFFmpegProcess() +} + +func (rc *FFmpegManager) ExistsPublisher(code string) bool { + exists := false + rc.rcs.Range(func(key, value interface{}) bool { + codeKey := key.(string) + if codeKey == code { + exists = true + return false + } + return true + }) + return exists +} + +func (rs *FFmpegManager) checkBlockFFmpegProcess() { + defer func() { + if r := recover(); r != nil { + logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) + } + }() + for { + condition := common.GetEmptyCondition() + es, err := base_service.CameraFindCollectionByCondition(condition) + if err != nil { + logs.Error("camera list query error: %s", err) + return + } + for _, camera := range es { + if camera.OnlineStatus != 1 { + v, b := rs.rcs.Load(camera.Code) + if b { + ffmpegInfo := v.(FFmpegInfo) + if time.Since(ffmpegInfo.startTime) > 30*time.Second { + logs.Info("camera [%s] rtsp block, close it", camera.Code) + err := ffmpegInfo.ffMpegCmd.Stop() + if err != nil { + logs.Error("camera [%s] rtsp block, close it error: %v", err) + } + rs.rcs.Delete(camera.Code) + } + } + } + <-time.NewTicker(10 * time.Second).C + } + } +} + +func (rs *FFmpegManager) stopConn(codeStream <-chan string) { + defer func() { + if r := recover(); r != nil { + logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) + } + }() + + for code := range codeStream { + v, b := rs.rcs.Load(code) + if b { + ffmpegInfo := v.(FFmpegInfo) + ffmpegInfo.ffMpegCmd.Stop() + logs.Info("camera [%s] close success", code) + } else { + logs.Info("ffmpeg proccess not exist, needn't close: %s", code) + } + } +} + +func (s *FFmpegManager) startConnections() { + defer func() { + if r := recover(); r != nil { + logs.Error("rtspManager panic %v", r) + } + }() + condition := common.GetEmptyCondition() + es, err := base_service.CameraFindCollectionByCondition(condition) + if err != nil { + logs.Error("camera list query error: %s", err) + return + } + for { + condition := common.GetEmptyCondition() + es, err = base_service.CameraFindCollectionByCondition(condition) + if err != nil { + logs.Error("camera list query error: %s", err) + return + } + for _, camera := range es { + if v, b := s.rcs.Load(camera.Code); b && v != nil { + continue + } + if camera.Enabled != 1 { + continue + } + go s.connRtsp(camera.Code) + } + <-time.After(30 * time.Second) + } + +} + +func (ffmpegManager *FFmpegManager) connRtsp(code string) { + defer func() { + if r := recover(); r != nil { + logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) + } + }() + defer func() { + ffmpegManager.rcs.Delete(code) + }() + condition := common.GetEqualCondition("code", code) + camera, err := base_service.CameraFindOneByCondition(condition) + if err != nil { + logs.Error("find camera [%s] error : %v", code, err) + return + } + if camera.Enabled != 1 { + logs.Error("camera [%s] disabled : %v", code) + return + } + rtmpPort, err := config.Int("server.rtmp.port") + if err != nil { + logs.Error("get rtmp port fail : %v", err) + return + } + templatePasswd, _ := utils.UUID() + rtmpUrl := fmt.Sprintf("rtmp://127.0.0.1:%d/%s/%s", rtmpPort, code, templatePasswd) + portOpen := checkTargetPortStatus(camera.RtspUrl) + if !portOpen { + logs.Error("rtspUrl: %s port not open", camera.RtspUrl) + return + } + // 只支持h264编码, 使用"-c:v copy", 不要使用其他选项, 出发视频转码会导致cpu很高 + ffmpegCmd := cmd.NewCmd("ffmpeg", "-i", camera.RtspUrl, "-c:v", "copy", "-c:a", "aac", "-f", "flv", rtmpUrl) + ffmpegManager.rcs.Store(code, FFmpegInfo{startTime: time.Now(), templatePasswd: templatePasswd, ffMpegCmd: ffmpegCmd}) + logs.Info("ffmpeg start connect rtsp : command : %s %s", ffmpegCmd.Name, strings.Join(ffmpegCmd.Args, " ")) + statusChan := ffmpegCmd.Start() + finalStatus := <-statusChan + if finalStatus.Error != nil { + logs.Error("ffmpeg start connect rtsp failed:", finalStatus.Error) + } else { + logs.Info("ffmpeg complate connect rtsp : %s", code) + } + +} + +func (r *FFmpegManager) Load(key interface{}) (interface{}, bool) { + return r.rcs.Load(key) +} +func (r *FFmpegManager) Store(key, value interface{}) { + r.rcs.Store(key, value) +} +func (r *FFmpegManager) Delete(key interface{}) { + r.rcs.Delete(key) +} + +func ValiadRtmpInfo(code string, passwd string) (fgSuccess bool) { + v, ok := GetSingleFFmpegManager().rcs.Load(code) + if !ok { + fgSuccess = false + return + } + fFmpegInfo, ok := v.(FFmpegInfo) + if !ok { + fgSuccess = false + return + } + if fFmpegInfo.templatePasswd != passwd { + fgSuccess = false + return + } + fgSuccess = true + return +} + +func checkTargetPortStatus(rtspUrl string) (open bool) { + // rtsp://127.0.0.1:554/2 + urlSplits := strings.Split(rtspUrl, "/") + if len(urlSplits) < 3 { + logs.Warn("rtspUrl: %s error", rtspUrl) + open = false + return + } + address := urlSplits[2] + timeout := 5 * time.Second + + conn, err := net.DialTimeout("tcp", address, timeout) + if err != nil { + logs.Warn("rtspUrl: %s port not open", rtspUrl) + open = false + return + } + defer conn.Close() + + open = true + return +} diff --git a/src/rtsp2rtmp/rtmppublisher/rtmppublisher.go b/src/rtsp2rtmp/rtmppublisher/rtmppublisher.go new file mode 100644 index 0000000..d846573 --- /dev/null +++ b/src/rtsp2rtmp/rtmppublisher/rtmppublisher.go @@ -0,0 +1,89 @@ +package rtmppublisher + +import ( + "github.com/beego/beego/v2/core/logs" + "github.com/deepch/vdk/av" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/flvadmin" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" +) + +type RtmpServer interface { + Load(key interface{}) (interface{}, bool) + Store(key, value interface{}) + Delete(key interface{}) +} + +type Publisher struct { + code string + codecs []av.CodecData + done chan int + connDone <-chan int + pktStream <-chan av.Packet + ffmPktStream <-chan av.Packet + hfmPktStream <-chan av.Packet + rfmPktStream <-chan av.Packet + rtmpserver RtmpServer +} + +func NewPublisher(connDone <-chan int, pktStream <-chan av.Packet, code string, codecs []av.CodecData, rs RtmpServer) *Publisher { + r := &Publisher{ + connDone: connDone, + pktStream: pktStream, + code: code, + codecs: codecs, + ffmPktStream: make(chan av.Packet, 1024), + hfmPktStream: make(chan av.Packet, 1024), + rtmpserver: rs, + } + r.pktTransfer() + return r +} + +func (r *Publisher) Done() { + <-r.connDone +} + +func (rtmpClient *Publisher) pktTransfer() { + done := utils.OrDoneInt(rtmpClient.done, rtmpClient.connDone) + ffmPktStream, hfmPktStream, rfmPktStream := tee(done, rtmpClient.pktStream) + rtmpClient.ffmPktStream = ffmPktStream + rtmpClient.hfmPktStream = hfmPktStream + rtmpClient.rfmPktStream = rfmPktStream + logs.Debug("publisher [%s] create customer", rtmpClient.code) + flvadmin.GetSingleFileFlvAdmin().FlvWrite(rtmpClient.ffmPktStream, rtmpClient.code, rtmpClient.codecs) + flvadmin.GetSingleHttpFlvAdmin().AddHttpFlvManager(rtmpClient.hfmPktStream, rtmpClient.code, rtmpClient.codecs) + flvadmin.GetSingleRtmpFlvAdmin().FlvWrite(rtmpClient.rfmPktStream, rtmpClient.code, rtmpClient.codecs) +} + +func tee(done <-chan int, in <-chan av.Packet) (<-chan av.Packet, <-chan av.Packet, <-chan av.Packet) { + //设置缓冲,调节前后速率 + out1 := make(chan av.Packet, 1024) + out2 := make(chan av.Packet, 1024) + out3 := make(chan av.Packet, 1024) + go func() { + defer close(out1) + defer close(out2) + defer close(out3) + for val := range in { + var out1, out2, out3 = out1, out2, out3 // 私有变量覆盖 + for i := 0; i < 3; i++ { + select { + case <-done: + return + case out1 <- val: + // logs.Debug("FileFlvManager write success") + out1 = nil // 置空阻塞机制完成select轮询 + case out2 <- val: + // logs.Debug("HttpflvAdmin write success") + out2 = nil + case out3 <- val: + // logs.Debug("RtmpFlvManager write success") + out3 = nil + default: + // logs.Debug("RtspClient tee lose packet") + } + } + } + }() + return out1, out2, out3 +} diff --git a/src/rtsp2rtmp/rtmpserver/rtmpserver.go b/src/rtsp2rtmp/rtmpserver/rtmpserver.go new file mode 100644 index 0000000..8f2aa4e --- /dev/null +++ b/src/rtsp2rtmp/rtmpserver/rtmpserver.go @@ -0,0 +1,259 @@ +package rtmpserver + +import ( + "runtime/debug" + "strconv" + "strings" + "sync" + + "github.com/beego/beego/v2/core/config" + "github.com/beego/beego/v2/core/logs" + "github.com/deepch/vdk/av" + "github.com/deepch/vdk/format/rtmp" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/ffmpegmanager" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmppublisher" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" + ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controllers/ext" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/dao/entity" + base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" +) + +var rtmpserverInstance *rtmpServer + +func init() { + rtmpserverInstance = &rtmpServer{} +} + +type rtmpServer struct { + rms sync.Map + conns sync.Map +} + +func GetSingleRtmpServer() *rtmpServer { + return rtmpserverInstance +} + +func (rs *rtmpServer) StartRtmpServer() { + go rs.startRtmp() + done := make(chan interface{}) + go rs.stopConn(done, ext_controller.CodeStream()) +} + +func (rs *rtmpServer) ExistsPublisher(code string) bool { + exists := false + rs.rms.Range(func(key, value interface{}) bool { + codeKey := key.(string) + if code == codeKey { + exists = true + return false + } + return true + }) + return exists +} + +func (rs *rtmpServer) stopConn(done <-chan interface{}, codeStream <-chan string) { + defer func() { + if r := recover(); r != nil { + logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack())) + } + }() + for { + select { + case <-done: + return + case code := <-codeStream: + v, b := rs.conns.Load(code) + if b { + r := v.(*rtmp.Conn) + err := r.Close() + if err != nil { + logs.Error("camera [%s] close error : %v", code, err) + continue + } + logs.Info("camera [%s] close success", code) + } + } + } + +} + +func (r *rtmpServer) startRtmp() { + defer func() { + if recover_rusult := recover(); recover_rusult != nil { + logs.Error("system painc : %v \nstack : %v", recover_rusult, string(debug.Stack())) + } + }() + rtmpPort, err := config.Int("server.rtmp.port") + if err != nil { + logs.Error("get rtmp port fail : %v", err) + return + } + // rtmp.Debug = true + s := &rtmp.Server{ + Addr: ":" + strconv.Itoa(rtmpPort), + HandleConn: r.handleRtmpConn, + } + s.ListenAndServe() +} + +func (r *rtmpServer) handleRtmpConn(conn *rtmp.Conn) { + defer func() { + if recover_rusult := recover(); recover_rusult != nil { + logs.Error("HandleConn error : %v", recover_rusult) + } + }() + defer func() { + err := conn.Close() + if err != nil { + logs.Error("HandleConn Close err : %v", err) + } + }() + logs.Info("client arrive : %s", conn.NetConn().RemoteAddr().String()) + err := conn.Prepare() + if err != nil { + logs.Error("Prepare error : %v , remote port : %s", err, conn.NetConn().RemoteAddr().String()) + err = conn.Close() + if err != nil { + logs.Error("close conn error : %v", err) + } + return + } + + code, authCode, ok := getParamByURI(conn) + if !ok { + return + } + + condition := common.GetEqualCondition("code", code) + camera, err := base_service.CameraFindOneByCondition(condition) + if err != nil { + logs.Error("no camera error : %s", code) + err = conn.Close() + if err != nil { + logs.Error("close conn error : %v", err) + } + return + } + + if ok := authentication(camera, code, authCode, conn); !ok { + return + } + + logs.Info("publish authentication success : %s", code) + + codecs, err := conn.Streams() + if err != nil { + logs.Error("get codecs error : %v", err) + err = conn.Close() + if err != nil { + logs.Error("close conn error : %v", err) + } + return + } + v, ok := r.conns.LoadAndDelete(camera.Code) + if ok { + logs.Info("camera [%s] online , close old conn", code) + conn := v.(*rtmp.Conn) + err := conn.Close() + if err != nil { + logs.Error("camera [%s] close old conn error : %v", code, err) + } + } + v, ok = r.rms.Load(camera.Code) + if ok { + logs.Info("camera [%s] online , close old conn", camera.Code) + oldR := v.(*rtmppublisher.Publisher) + //等待旧连接关闭完成 + oldR.Done() + } + r.conns.Store(camera.Code, conn) + + camera.OnlineStatus = 1 + base_service.CameraUpdateById(camera) + + done := make(chan int) + //添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包 + pktStream := make(chan av.Packet, 1024) + defer func() { + close(done) + close(pktStream) + }() + + p := rtmppublisher.NewPublisher(done, pktStream, code, codecs, r) + r.rms.Store(camera.Code, p) + for { + pkt, err := conn.ReadPacket() + if err != nil { + logs.Error("ReadPacket error : %v", err) + break + } + select { + case pktStream <- pkt: + default: + //添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包 + logs.Debug("rtmpserver lose packet") + } + } + + camera, err = base_service.CameraFindOneByCondition(condition) + if err != nil { + logs.Error("no camera error : %s", code) + } else { + camera.OnlineStatus = 0 + base_service.CameraUpdateById(camera) + } + + r.rms.Delete(code) + r.conns.Delete(code) + err = conn.Close() + if err != nil { + logs.Error("close conn error : %v", err) + } + +} + +// 获取uri信息 +func getParamByURI(conn *rtmp.Conn) (string, string, bool) { + logs.Info("Path : %s , remote port : %s", conn.URL.Path, conn.NetConn().RemoteAddr().String()) + path := conn.URL.Path + paths := strings.Split(strings.TrimLeft(path, "/"), "/") + if len(paths) != 2 { + logs.Error("rtmp path error : %s", path) + err := conn.Close() + if err != nil { + logs.Error("close conn error : %v", err) + } + return "", "", false + } + return paths[0], paths[1], true +} + +// 权限验证 +func authentication(camera entity.Camera, code string, authCode string, conn *rtmp.Conn) bool { + fgSuccess := ffmpegmanager.ValiadRtmpInfo(code, authCode) + if !fgSuccess { + logs.Error("camera %s RtmpAuthCode error : %s", code, authCode) + conn.Close() + return false + } + if camera.Enabled != 1 { + logs.Error("camera %s disabled : %s", code, authCode) + err := conn.Close() + if err != nil { + logs.Error("close conn error : %v", err) + } + return false + } + return true +} + +func (r *rtmpServer) Load(key interface{}) (interface{}, bool) { + return r.rms.Load(key) +} +func (r *rtmpServer) Store(key, value interface{}) { + r.rms.Store(key, value) +} +func (r *rtmpServer) Delete(key interface{}) { + r.rms.Delete(key) +} diff --git a/src/rtsp2rtmp/rtspclientmanager/rtspclientmanager.go b/src/rtsp2rtmp/rtspclientmanager/rtspclientmanager.go index 8a42346..acfb812 100644 --- a/src/rtsp2rtmp/rtspclientmanager/rtspclientmanager.go +++ b/src/rtsp2rtmp/rtspclientmanager/rtspclientmanager.go @@ -11,7 +11,8 @@ import ( "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclient" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/utils" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" - ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controllers/ext" + + // ext_controller "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/controllers/ext" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" ) @@ -32,7 +33,7 @@ func GetSingleRtspClientManager() *RtspClientManager { func (rs *RtspClientManager) StartClient() { go rs.startConnections() - go rs.stopConn(ext_controller.CodeStream()) + // go rs.stopConn(ext_controller.CodeStream()) } func (rc *RtspClientManager) ExistsPublisher(code string) bool { diff --git a/src/rtsp2rtmp/web/task/task.go b/src/rtsp2rtmp/web/task/task.go index 229a166..8109a54 100644 --- a/src/rtsp2rtmp/web/task/task.go +++ b/src/rtsp2rtmp/web/task/task.go @@ -5,7 +5,7 @@ import ( "time" "github.com/beego/beego/v2/core/logs" - "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtspclientmanager" + "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/rtmpserver" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web" "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/common" base_service "github.com/hkmadao/rtsp2rtmp/src/rtsp2rtmp/web/service/base" @@ -57,7 +57,7 @@ func (t *task) offlineCamera() { if cs.OnlineStatus != 1 { continue } - if exists := rtspclientmanager.GetSingleRtspClientManager().ExistsPublisher(cs.Code); !exists { + if exists := rtmpserver.GetSingleRtmpServer().ExistsPublisher(cs.Code); !exists { cs.OnlineStatus = 0 base_service.CameraUpdateById(cs) }