Files
engine/puller.go
2023-10-09 06:02:47 +08:00

109 lines
2.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"context"
"io"
"strings"
"time"
"go.uber.org/zap"
"m7s.live/engine/v4/config"
)
var zshutdown = zap.String("reason", "shutdown")
var znomorereconnect = zap.String("reason", "no more reconnect")
type IPuller interface {
IPublisher
Connect() error
OnConnected()
Disconnect()
Pull() error
Reconnect() bool
init(streamPath string, url string, conf *config.Pull)
startPull(IPuller)
}
// 用于远程拉流的发布者
type Puller struct {
ClientIO[config.Pull]
}
func (pub *Puller) OnConnected() {
pub.ReConnectCount = 0 // 重置重连次数
}
// 是否需要重连
func (pub *Puller) Reconnect() (ok bool) {
ok = pub.Config.RePull == -1 || pub.ReConnectCount <= pub.Config.RePull
pub.ReConnectCount++
return
}
func (pub *Puller) startPull(puller IPuller) {
badPuller := true
var stream *Stream
var err error
streamPath := pub.StreamPath
if i := strings.Index(streamPath, "?"); i >= 0 {
streamPath = streamPath[:i]
}
if oldPuller, loaded := Pullers.LoadOrStore(streamPath, puller); loaded {
pub := oldPuller.(IPuller).GetPublisher()
stream = pub.Stream
if stream != nil {
puller.Error("puller already exists", zap.Int8("streamState", int8(stream.State)))
} else {
puller.Error("puller already exists", zap.Time("createAt", pub.StartTime))
}
return
}
defer func() {
Pullers.Delete(streamPath)
puller.Disconnect()
if stream != nil {
stream.Close()
}
}()
puber := puller.GetPublisher()
var startTime time.Time
for puller.Info("start pull"); puller.Reconnect(); puller.Warn("restart pull") {
if time.Since(startTime) < 5*time.Second {
time.Sleep(5 * time.Second)
}
startTime = time.Now()
if err = puller.Connect(); err != nil {
if err == io.EOF {
puller.Info("pull complete")
return
}
puller.Error("pull connect", zap.Error(err))
if badPuller {
return
}
} else {
if err = puller.Publish(pub.StreamPath, puller); err != nil {
puller.Error("pull publish", zap.Error(err))
return
}
s := puber.Stream
if stream != s && stream != nil { // 这段代码说明老流已经中断创建了新流需要把track置空从而避免复用
puber.AudioTrack = nil
puber.VideoTrack = nil
puber.Context, puber.CancelFunc = context.WithCancel(Engine) // 老流的上下文已经取消,需要重新创建
}
stream = s
badPuller = false
if err = puller.Pull(); err != nil && !puller.IsShutdown() {
puller.Error("pull interrupt", zap.Error(err))
}
}
if puller.IsShutdown() {
puller.Info("stop pull", zshutdown)
return
}
puller.Disconnect()
}
puller.Warn("stop pull", znomorereconnect)
}