1.fix bug:rtmp server conn disconnect, client rtmp conn blocked; 2.fg encrypt field just;

This commit is contained in:
madao
2025-02-05 12:51:19 +08:00
parent ce1f0f9681
commit 6cbba876b4
9 changed files with 50 additions and 41 deletions

View File

@@ -16,8 +16,7 @@ CREATE TABLE public.camera (
live bool NULL, -- 直播状态 live bool NULL, -- 直播状态
created timestamp NULL, -- 创建时间 created timestamp NULL, -- 创建时间
id varchar(255) NOT NULL, id varchar(255) NOT NULL,
fg_secret bool NULL DEFAULT false, -- 加密标志 fg_encrypt bool NULL DEFAULT false, -- 加密标志
secret varchar NULL, -- 密钥
fg_passive bool NULL DEFAULT false, -- 被动推送rtmp标志 fg_passive bool NULL DEFAULT false, -- 被动推送rtmp标志
rtmp_auth_code varchar NULL, -- rtmp识别码 rtmp_auth_code varchar NULL, -- rtmp识别码
camera_type varchar NULL, -- rtmp识别码 camera_type varchar NULL, -- rtmp识别码
@@ -37,8 +36,7 @@ COMMENT ON COLUMN public.camera.rtmp_push_status IS 'rtmp推送状态';
COMMENT ON COLUMN public.camera.save_video IS '保存录像状态'; COMMENT ON COLUMN public.camera.save_video IS '保存录像状态';
COMMENT ON COLUMN public.camera.live IS '直播状态'; COMMENT ON COLUMN public.camera.live IS '直播状态';
COMMENT ON COLUMN public.camera.created IS '创建时间'; COMMENT ON COLUMN public.camera.created IS '创建时间';
COMMENT ON COLUMN public.camera.fg_secret IS '加密标志'; COMMENT ON COLUMN public.camera.fg_encrypt IS '加密标志';
COMMENT ON COLUMN public.camera.secret IS '密钥';
COMMENT ON COLUMN public.camera.fg_passive IS '被动推送rtmp标志'; COMMENT ON COLUMN public.camera.fg_passive IS '被动推送rtmp标志';
COMMENT ON COLUMN public.camera.rtmp_auth_code IS 'rtmp识别码'; COMMENT ON COLUMN public.camera.rtmp_auth_code IS 'rtmp识别码';
COMMENT ON COLUMN public.camera.camera_type IS 'rtmp识别码'; COMMENT ON COLUMN public.camera.camera_type IS 'rtmp识别码';

File diff suppressed because one or more lines are too long

View File

@@ -32,7 +32,7 @@ func (rfm *RtmpFlvAdmin) FlvWrite(pktStream <-chan av.Packet, code string, codec
logs.Error("FlvWrite found camera: %s error: %v, do painc", code, err) logs.Error("FlvWrite found camera: %s error: %v, do painc", code, err)
panic(fmt.Sprintf("FlvWrite found camera: %s error: %v", code, err)) panic(fmt.Sprintf("FlvWrite found camera: %s error: %v", code, err))
} }
rfw := rtmpflvwriter.NewRtmpFlvWriter(!camera.FgPassive, pktStream, code, codecs, rfm) rfw := rtmpflvwriter.NewRtmpFlvWriter(!camera.FgPassive, pktStream, code, codecs, rfm, 0)
rfm.rfms.Store(code, rfw) rfm.rfms.Store(code, rfw)
} }
@@ -41,7 +41,7 @@ func (rfm *RtmpFlvAdmin) StartWrite(code string, needPushRtmp bool) {
if ok { if ok {
rfw := v.(*rtmpflvwriter.RtmpFlvWriter) rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
rfw.StopWrite() rfw.StopWrite()
rfwNew := rtmpflvwriter.NewRtmpFlvWriter(needPushRtmp, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm) rfwNew := rtmpflvwriter.NewRtmpFlvWriter(needPushRtmp, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm, 0)
rfm.rfms.Store(code, rfwNew) rfm.rfms.Store(code, rfwNew)
} }
} }
@@ -51,7 +51,7 @@ func (rfm *RtmpFlvAdmin) ReConntion(code string) {
if ok { if ok {
rfw := v.(*rtmpflvwriter.RtmpFlvWriter) rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
rfw.StopWrite() rfw.StopWrite()
rfwNew := rtmpflvwriter.NewRtmpFlvWriter(rfw.GetNeedPushRtmp(), rfw.GetPktStream(), code, rfw.GetCodecs(), rfm) rfwNew := rtmpflvwriter.NewRtmpFlvWriter(rfw.GetNeedPushRtmp(), rfw.GetPktStream(), code, rfw.GetCodecs(), rfm, 0)
rfm.rfms.Store(code, rfwNew) rfm.rfms.Store(code, rfwNew)
} }
} }
@@ -62,7 +62,7 @@ func (rfm *RtmpFlvAdmin) RemoteStartWrite(code string) {
rfw := v.(*rtmpflvwriter.RtmpFlvWriter) rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
if !rfw.GetNeedPushRtmp() { if !rfw.GetNeedPushRtmp() {
rfw.StopWrite() rfw.StopWrite()
rfwNew := rtmpflvwriter.NewRtmpFlvWriter(true, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm) rfwNew := rtmpflvwriter.NewRtmpFlvWriter(true, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm, 0)
rfm.rfms.Store(code, rfwNew) rfm.rfms.Store(code, rfwNew)
} }
} }
@@ -73,7 +73,7 @@ func (rfm *RtmpFlvAdmin) RemoteStopWrite(code string) {
if ok { if ok {
rfw := v.(*rtmpflvwriter.RtmpFlvWriter) rfw := v.(*rtmpflvwriter.RtmpFlvWriter)
rfw.StopWrite() rfw.StopWrite()
rfwNew := rtmpflvwriter.NewRtmpFlvWriter(false, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm) rfwNew := rtmpflvwriter.NewRtmpFlvWriter(false, rfw.GetPktStream(), code, rfw.GetCodecs(), rfm, 0)
rfm.rfms.Store(code, rfwNew) rfm.rfms.Store(code, rfwNew)
} }
} }

View File

@@ -33,6 +33,7 @@ type RtmpFlvWriter struct {
pulseInterval time.Duration pulseInterval time.Duration
irfm IRtmpFlvManager irfm IRtmpFlvManager
mutex sync.Mutex mutex sync.Mutex
reConnCount int64
} }
func (rfw *RtmpFlvWriter) GetDone() <-chan int { func (rfw *RtmpFlvWriter) GetDone() <-chan int {
@@ -60,7 +61,7 @@ func (rfw *RtmpFlvWriter) GetNeedPushRtmp() bool {
return rfw.needPushRtmp return rfw.needPushRtmp
} }
func NewRtmpFlvWriter(needPushRtmp bool, pktStream <-chan av.Packet, code string, codecs []av.CodecData, irfm IRtmpFlvManager) *RtmpFlvWriter { func NewRtmpFlvWriter(needPushRtmp bool, pktStream <-chan av.Packet, code string, codecs []av.CodecData, irfm IRtmpFlvManager, reConnCount int64) *RtmpFlvWriter {
rfw := &RtmpFlvWriter{ rfw := &RtmpFlvWriter{
needPushRtmp: needPushRtmp, needPushRtmp: needPushRtmp,
stop: false, stop: false,
@@ -72,6 +73,7 @@ func NewRtmpFlvWriter(needPushRtmp bool, pktStream <-chan av.Packet, code string
start: false, start: false,
pulseInterval: 5 * time.Second, pulseInterval: 5 * time.Second,
irfm: irfm, irfm: irfm,
reConnCount: reConnCount,
} }
go rfw.flvWrite() go rfw.flvWrite()
return rfw return rfw
@@ -120,7 +122,12 @@ func (rfw *RtmpFlvWriter) createConn() error {
logs.Error("get remote secret error: %v", err) logs.Error("get remote secret error: %v", err)
return err return err
} }
proxyConnOption := rtmp.NewProxyConnOption(rtmp.AES, clientCode, signSecret, []byte(secretStr)) var proxyConnOption rtmp.ProxyConnOption
if camera.FgEncrypt {
proxyConnOption = rtmp.NewProxyConnOption(rtmp.AES, clientCode, signSecret, []byte(secretStr))
} else {
proxyConnOption = rtmp.NewUnEncryptProxyConnOption()
}
rtmpConn, err := rtmp.DialEncrypt(camera.RtmpUrl, proxyConnOption) rtmpConn, err := rtmp.DialEncrypt(camera.RtmpUrl, proxyConnOption)
if err != nil { if err != nil {
logs.Error("rtmp client connection error : %v", err) logs.Error("rtmp client connection error : %v", err)
@@ -190,9 +197,23 @@ func (rfw *RtmpFlvWriter) flvWrite() {
} }
_, pktStreamOk := <-rfw.pktStream _, pktStreamOk := <-rfw.pktStream
if pktStreamOk { if pktStreamOk {
logs.Info("to create NewRtmpFlvWriter : %s", rfw.code) reConnDuration := time.Duration(rfw.reConnCount) * time.Duration(10) * time.Second
rfwn := NewRtmpFlvWriter(true, rfw.pktStream, rfw.code, rfw.codecs, rfw.irfm) if reConnDuration > (10 * time.Minute) {
rfwn.irfm.UpdateFFWS(rfwn.code, rfwn) reConnDuration = 10 * time.Minute
}
if reConnDuration > 0 {
<-time.NewTicker(reConnDuration).C
}
if rfw.stop {
logs.Info("stop RtmpFlvWriter : %s", rfw.code)
return
}
_, pktStreamOk := <-rfw.pktStream
if pktStreamOk {
logs.Info("to create NewRtmpFlvWriter : %s", rfw.code)
rfwn := NewRtmpFlvWriter(true, rfw.pktStream, rfw.code, rfw.codecs, rfw.irfm, rfw.reConnCount+1)
rfwn.irfm.UpdateFFWS(rfwn.code, rfwn)
}
} else { } else {
logs.Info("RtmpFlvWriter pktStream is closed : %s", rfw.code) logs.Info("RtmpFlvWriter pktStream is closed : %s", rfw.code)
} }
@@ -235,9 +256,13 @@ func (rfw *RtmpFlvWriter) writerPacket(pkt av.Packet, templateTime *time.Time) e
return err return err
} }
var err error var err error
// setDeadline
rfw.conn.NetConn().SetDeadline(time.Now().Add(10 * time.Second))
err = rfw.conn.WriteHeader(rfw.codecs) err = rfw.conn.WriteHeader(rfw.codecs)
// clear Deadline
rfw.conn.NetConn().SetDeadline(time.Time{})
rfw.startTime = time.Now() rfw.startTime = time.Now()
logs.Info("KeyFrame WriteHeader to rtmp server : %s, codesc: %v", rfw.code, rfw.codecs) // logs.Info("KeyFrame WriteHeader to rtmp server : %s, codesc: %v", rfw.code, rfw.codecs)
if err != nil { if err != nil {
logs.Error("writer header to rtmp server error : %v", err) logs.Error("writer header to rtmp server error : %v", err)
return err return err

View File

@@ -89,20 +89,13 @@ func GetCameraDesc() *common.EntityDesc {
DataType: "DateTime", DataType: "DateTime",
ValueType: "DateTime", ValueType: "DateTime",
}; };
var fgSecretAttributeInfo = &common.AttributeInfo { var fgEncryptAttributeInfo = &common.AttributeInfo {
ColumnName: "fg_secret", ColumnName: "fg_encrypt",
Name: "fgSecret", Name: "fgEncrypt",
DisplayName: "加密标志", DisplayName: "加密标志",
DataType: "Boolean", DataType: "Boolean",
ValueType: "bool", ValueType: "bool",
}; };
var secretAttributeInfo = &common.AttributeInfo {
ColumnName: "secret",
Name: "secret",
DisplayName: "密钥",
DataType: "String",
ValueType: "string",
};
var fgPassiveAttributeInfo = &common.AttributeInfo { var fgPassiveAttributeInfo = &common.AttributeInfo {
ColumnName: "fg_passive", ColumnName: "fg_passive",
Name: "fgPassive", Name: "fgPassive",
@@ -159,8 +152,7 @@ func GetCameraDesc() *common.EntityDesc {
"saveVideo": saveVideoAttributeInfo, "saveVideo": saveVideoAttributeInfo,
"live": liveAttributeInfo, "live": liveAttributeInfo,
"created": createdAttributeInfo, "created": createdAttributeInfo,
"fgSecret": fgSecretAttributeInfo, "fgEncrypt": fgEncryptAttributeInfo,
"secret": secretAttributeInfo,
"fgPassive": fgPassiveAttributeInfo, "fgPassive": fgPassiveAttributeInfo,
"rtmpAuthCode": rtmpAuthCodeAttributeInfo, "rtmpAuthCode": rtmpAuthCodeAttributeInfo,
"cameraType": cameraTypeAttributeInfo, "cameraType": cameraTypeAttributeInfo,

View File

@@ -28,9 +28,7 @@ type Camera struct {
// 创建时间: // 创建时间:
Created time.Time `orm:"column(created)" json:"created"` Created time.Time `orm:"column(created)" json:"created"`
// 加密标志: // 加密标志:
FgSecret bool `orm:"column(fg_secret)" json:"fgSecret"` FgEncrypt bool `orm:"column(fg_encrypt)" json:"fgEncrypt"`
// 密钥:
Secret string `orm:"column(secret)" json:"secret"`
// 被动推送rtmp标志 // 被动推送rtmp标志
FgPassive bool `orm:"column(fg_passive)" json:"fgPassive"` FgPassive bool `orm:"column(fg_passive)" json:"fgPassive"`
// rtmp识别码: // rtmp识别码:

View File

@@ -29,9 +29,7 @@ type CameraPO struct {
// 创建时间: // 创建时间:
Created time.Time `json:"created"` Created time.Time `json:"created"`
// 加密标志: // 加密标志:
FgSecret bool `json:"fgSecret"` FgEncrypt bool `json:"fgEncrypt"`
// 密钥:
Secret string `json:"secret"`
// 被动推送rtmp标志 // 被动推送rtmp标志
FgPassive bool `json:"fgPassive"` FgPassive bool `json:"fgPassive"`
// rtmp识别码: // rtmp识别码:

View File

@@ -29,9 +29,7 @@ type CameraVO struct {
// 创建时间: // 创建时间:
Created time.Time `json:"created"` Created time.Time `json:"created"`
// 加密标志: // 加密标志:
FgSecret bool `json:"fgSecret"` FgEncrypt bool `json:"fgEncrypt"`
// 密钥:
Secret string `json:"secret"`
// 被动推送rtmp标志 // 被动推送rtmp标志
FgPassive bool `json:"fgPassive"` FgPassive bool `json:"fgPassive"`
// rtmp识别码: // rtmp识别码:

View File

@@ -96,7 +96,7 @@ func CameraSelectByIds(ids []string) (models []entity.Camera, err error) {
// execute the raw query string // execute the raw query string
_, err_query := o.Raw(sqlStr, params...).QueryRows(&models) _, err_query := o.Raw(sqlStr, params...).QueryRows(&models)
if err_query != nil { if err_query != nil {
err = fmt.Errorf("selectByIds error: %v", err_make_sql) err = fmt.Errorf("selectByIds error: %v", err_query)
return return
} }
@@ -118,7 +118,7 @@ func CameraFindCollectionByCondition(condition common.AqCondition) (models []ent
// execute the raw query string // execute the raw query string
_, err_query := o.Raw(sqlStr, params...).QueryRows(&models) _, err_query := o.Raw(sqlStr, params...).QueryRows(&models)
if err_query != nil { if err_query != nil {
err = fmt.Errorf("findCollectionByCondition error: %v", err_make_sql) err = fmt.Errorf("findCollectionByCondition error: %v", err_query)
return return
} }
return return
@@ -140,7 +140,7 @@ func CameraFindOneByCondition(condition common.AqCondition) (model entity.Camera
models := make([]entity.Camera, 0) models := make([]entity.Camera, 0)
_, err_query := o.Raw(sqlStr, params...).QueryRows(&models) _, err_query := o.Raw(sqlStr, params...).QueryRows(&models)
if err_query != nil { if err_query != nil {
err = fmt.Errorf("findOneByCondition error: %v", err_make_sql) err = fmt.Errorf("findOneByCondition error: %v", err_query)
return return
} }
if len(models) < 1 { if len(models) < 1 {
@@ -184,7 +184,7 @@ func CameraFindPageByCondition(aqPageInfoInput common.AqPageInfoInput) (pageInfo
models := make([]entity.Camera, 0) models := make([]entity.Camera, 0)
_, err_query := o.Raw(pageSqlStr, params...).QueryRows(&models) _, err_query := o.Raw(pageSqlStr, params...).QueryRows(&models)
if err_query != nil { if err_query != nil {
err = fmt.Errorf("findPageByCondition error: %v", err_make_sql) err = fmt.Errorf("findPageByCondition error: %v", err_query)
return return
} }
dataList := make([]interface{}, 0) dataList := make([]interface{}, 0)