mirror of
https://github.com/hkmadao/rtsp2rtmp.git
synced 2025-09-26 19:31:19 +08:00
修复rtsp流超时没有自动断开bug
This commit is contained in:
@@ -187,9 +187,9 @@ func (hfw *HttpFlvWriter) writerPacket(pkt av.Packet, templateTime *time.Time) e
|
||||
|
||||
//Write extends to io.Writer
|
||||
func (hfw *HttpFlvWriter) Write(p []byte) (n int, err error) {
|
||||
start := time.Now()
|
||||
// start := time.Now()
|
||||
defer func() {
|
||||
logs.Debug(time.Since(start))
|
||||
// logs.Debug(time.Since(start))
|
||||
if r := recover(); r != nil {
|
||||
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
|
||||
}
|
||||
|
@@ -31,8 +31,7 @@ func GetSingleRtspClientManager() *RtspClientManager {
|
||||
|
||||
func (rs *RtspClientManager) StartClient() {
|
||||
go rs.serveStreams()
|
||||
done := make(chan interface{})
|
||||
go rs.stopConn(done, controllers.CodeStream())
|
||||
go rs.stopConn(controllers.CodeStream())
|
||||
}
|
||||
|
||||
func (rc *RtspClientManager) ExistsPublisher(code string) bool {
|
||||
@@ -45,7 +44,7 @@ func (rc *RtspClientManager) ExistsPublisher(code string) bool {
|
||||
return exists
|
||||
}
|
||||
|
||||
func (rs *RtspClientManager) stopConn(done <-chan interface{}, codeStream <-chan string) {
|
||||
func (rs *RtspClientManager) stopConn(codeStream <-chan string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logs.Error("system painc : %v \nstack : %v", r, string(debug.Stack()))
|
||||
@@ -118,7 +117,7 @@ func (s *RtspClientManager) connRtsp(code string) {
|
||||
Debug: false,
|
||||
DialTimeout: 10 * time.Second,
|
||||
ReadWriteTimeout: 10 * time.Second,
|
||||
DisableAudio: true,
|
||||
DisableAudio: false,
|
||||
}
|
||||
session, err := rtspv2.Dial(ro)
|
||||
if err != nil {
|
||||
@@ -147,23 +146,30 @@ func (s *RtspClientManager) connRtsp(code string) {
|
||||
s.rcs.Store(code, rc)
|
||||
s.conns.Store(code, session)
|
||||
logs.Info("%s", string(session.SDPRaw))
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
for pkt := range utils.OrDoneRefPacket(done, session.OutgoingPacketQueue) {
|
||||
//不能开goroutine,不能保证包的顺序
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
rtspStream := utils.OrDoneRefPacket(done, session.OutgoingPacketQueue)
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case pktStream <- pkt:
|
||||
ticker.Reset(1 * time.Second)
|
||||
case pkt, ok := <-rtspStream:
|
||||
if !ok {
|
||||
logs.Error("camera: %s rtsp packet stream is close", code)
|
||||
break Loop
|
||||
}
|
||||
//不能开goroutine,不能保证包的顺序
|
||||
select {
|
||||
case pktStream <- pkt:
|
||||
default:
|
||||
//添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包
|
||||
logs.Debug("rtspclient lose packet")
|
||||
}
|
||||
ticker.Reset(10 * time.Second)
|
||||
case <-ticker.C:
|
||||
logs.Error("rtspclient lose packet, time out")
|
||||
default:
|
||||
//添加缓冲,缓解前后速率不一致问题,但是如果收包平均速率大于消费平均速率,依然会导致丢包
|
||||
logs.Debug("rtspclient lose packet")
|
||||
logs.Error("camera: %s read packet from rtsp time out", code)
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logs.Error("session Close error : %v", err)
|
||||
}
|
||||
//offline camera
|
||||
camera, err := models.CameraSelectOne(q)
|
||||
if err != nil {
|
||||
|
Reference in New Issue
Block a user