Files
monibuca/puller.go
2025-09-26 15:57:26 +08:00

482 lines
11 KiB
Go

package m7s
import (
"crypto/tls"
"io"
"math"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/gorilla/websocket"
task "github.com/langhuihui/gotask"
pkg "m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/format"
"m7s.live/v5/pkg/util"
)
type (
// Connection carries the state shared by a pull connection: the owning
// plugin, the local stream path, the remote URL with its arguments, and the
// HTTP client built by Init.
Connection struct {
task.Job
Plugin *Plugin
StreamPath string // the corresponding local stream
Args url.Values
RemoteURL string // remote server address (used for pushing/pulling)
HTTPClient *http.Client
}
// IPuller is a task that can hand out the PullJob it works for.
IPuller interface {
task.ITask
GetPullJob() *PullJob
}
// PullerFactory builds an IPuller from a pull configuration.
PullerFactory = func(config.Pull) IPuller
// PullJob combines a Connection with its pull configuration, the publish
// configuration used by Publish, the resulting Publisher, and optional
// step-by-step progress tracking.
PullJob struct {
Connection
*config.Pull
Publisher *Publisher
PublishConfig config.Publish
puller IPuller
Progress *SubscriptionProgress // may be nil; the progress helpers are then no-ops
}
// HTTPFilePuller reads a stream from the embedded io.ReadCloser, which Start
// opens from an HTTP URL, a WebSocket URL, or a local file unless it was
// already set.
HTTPFilePuller struct {
task.Task
PullJob PullJob
io.ReadCloser
}
// RecordFilePuller pulls from recorded streams that queryRecordStreams looks
// up in the plugin database for the requested time range.
RecordFilePuller struct {
task.Task
PullJob PullJob
PullStartTime, PullEndTime time.Time
Streams []RecordStream
File *os.File
MaxTS int64 // length of the queried time range in milliseconds
seekChan chan time.Time // created in Start with capacity 1; fed by Publisher.OnSeek
Type string
Loop int // parsed from the util.LoopKey argument; math.MaxInt32 when absent, invalid or negative
}
// wsReadCloser exposes a websocket connection through Read and Close.
wsReadCloser struct {
ws *websocket.Conn
}
)
// httpFilePullSteps lists the fixed progress steps for the HTTP file pull
// workflow. HTTPFilePuller.Start installs them and moves through the URL
// parsing and connection steps; AnnexBPuller.Run moves through the parsing and
// streaming steps.
var httpFilePullSteps = []pkg.StepDef{
{Name: pkg.StepPublish, Description: "Publishing file stream"},
{Name: pkg.StepURLParsing, Description: "Determining file source type"},
{Name: pkg.StepConnection, Description: "Establishing file connection"},
{Name: pkg.StepParsing, Description: "Parsing file format"},
{Name: pkg.StepStreaming, Description: "Reading and publishing stream data"},
}
// Init records the owning plugin, the local stream path and the remote URL on
// the connection and builds the HTTP client used for remote requests.
//
// The client skips TLS certificate verification. When proxyConf is non-empty
// it is parsed as the proxy URL for every request made through the client.
//
// HTTPClient is always set. If proxyConf cannot be parsed, the client is still
// created but every request through it fails with the parse error, so a
// misconfigured proxy neither leaves a nil client behind (which would panic on
// first use) nor gets silently bypassed.
func (conn *Connection) Init(plugin *Plugin, streamPath string, href string, proxyConf string) {
	conn.RemoteURL = href
	conn.StreamPath = streamPath
	conn.Plugin = plugin
	// Create a custom HTTP client that ignores HTTPS certificate validation.
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	if proxyConf != "" {
		if proxy, err := url.Parse(proxyConf); err != nil {
			// Surface the configuration error on each request instead of
			// returning early with HTTPClient left nil.
			tr.Proxy = func(*http.Request) (*url.URL, error) {
				return nil, err
			}
		} else {
			tr.Proxy = http.ProxyURL(proxy)
		}
	}
	conn.HTTPClient = &http.Client{Transport: tr}
}
// GetPullJob returns the job itself, which provides the GetPullJob method
// declared by IPuller.
func (p *PullJob) GetPullJob() *PullJob {
return p
}
// Init prepares the job for the given puller and registers it with the
// server's pull collection.
//
// pubConf overrides the plugin's common publish configuration when non-nil; in
// both cases the publish type is forced to PublishTypePull. The pull URL is
// parsed: a URL without a host is treated as a file path, and its query
// parameters are merged into the connection arguments. Retry settings are
// applied to the puller, and pull start/end webhooks are attached when the
// plugin has senders configured for them. The job itself is returned.
func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf config.Pull, pubConf *config.Publish) *PullJob {
	if pubConf != nil {
		p.PublishConfig = *pubConf
	} else {
		p.PublishConfig = plugin.GetCommonConf().Publish
	}
	p.PublishConfig.PubType = PublishTypePull
	p.Connection.Args = url.Values(conf.Args.DeepClone())
	p.Pull = &conf

	target := conf.URL
	if parsed, parseErr := url.Parse(target); parseErr == nil {
		if parsed.Host == "" {
			// No host component: use the path as a file location.
			target = parsed.Path
		}
		query := parsed.Query()
		if p.Connection.Args == nil {
			p.Connection.Args = query
		} else {
			for key, values := range query {
				for _, value := range values {
					p.Connection.Args.Add(key, value)
				}
			}
		}
	}
	p.Connection.Init(plugin, streamPath, target, conf.Proxy)
	p.puller = puller

	p.SetDescriptions(task.Description{
		"plugin":     plugin.Meta.Name,
		"streamPath": streamPath,
		"url":        conf.URL,
		"args":       conf.Args,
		"maxRetry":   conf.MaxRetry,
	})
	puller.SetRetry(conf.MaxRetry, conf.RetryInterval)

	// Notify the pull-start webhook once the puller starts.
	if send, hook := plugin.getHookSender(config.HookOnPullStart); send != nil {
		puller.OnStart(func() {
			send(hook, AlarmInfo{
				AlarmName:  string(config.HookOnPullStart),
				StreamPath: streamPath,
				AlarmType:  config.AlarmPullRecover,
			})
		})
	}
	// On disposal, mark the current progress step as failed and notify the
	// pull-end webhook with the stop reason.
	if send, hook := plugin.getHookSender(config.HookOnPullEnd); send != nil {
		puller.OnDispose(func() {
			p.Fail(puller.StopReason().Error())
			send(hook, AlarmInfo{
				AlarmName:  string(config.HookOnPullEnd),
				AlarmDesc:  puller.StopReason().Error(),
				StreamPath: streamPath,
				AlarmType:  config.AlarmPullOffline,
			})
		})
	}

	plugin.Server.Pulls.AddTask(p, plugin.Logger.With("pullURL", conf.URL, "streamPath", streamPath))
	return p
}
// GetKey returns the job's local stream path as its key.
func (p *PullJob) GetKey() string {
return p.StreamPath
}
// GoToStepConst makes the progress step called name the current step.
//
// It is a no-op when progress tracking is disabled (Progress is nil) or when
// no step has that name. When the current step changes, the step being left is
// stamped as started and completed if it was not already. The new step has its
// error cleared and its start time set if still unset.
//
// The previous step index is range-checked before use, matching Fail, so a
// stale or out-of-range CurrentStep cannot cause an index panic.
func (p *PullJob) GoToStepConst(name pkg.StepName) {
	if p.Progress == nil {
		return
	}
	steps := p.Progress.Steps
	// Find step index by name.
	stepIndex := -1
	for i := range steps {
		if steps[i].Name == string(name) {
			stepIndex = i
			break
		}
	}
	if stepIndex < 0 {
		return
	}
	// Close out the step we are leaving, if it is a valid, different step.
	if cur := p.Progress.CurrentStep; cur != stepIndex && cur >= 0 && cur < len(steps) {
		cs := &steps[cur]
		if cs.StartedAt.IsZero() {
			cs.StartedAt = time.Now()
		}
		if cs.CompletedAt.IsZero() {
			cs.CompletedAt = time.Now()
		}
	}
	p.Progress.CurrentStep = stepIndex
	ns := &steps[stepIndex]
	ns.Error = ""
	if ns.StartedAt.IsZero() {
		ns.StartedAt = time.Now()
	}
}
// Fail records errorMsg on the current progress step and stamps the step's
// start and completion times if they are still unset. It does nothing when
// progress tracking is disabled or the current step index is out of range.
func (p *PullJob) Fail(errorMsg string) {
	progress := p.Progress
	if progress == nil {
		return
	}
	cur := progress.CurrentStep
	if cur < 0 || cur >= len(progress.Steps) {
		return
	}
	failed := &progress.Steps[cur]
	failed.Error = errorMsg
	if failed.StartedAt.IsZero() {
		failed.StartedAt = time.Now()
	}
	// The completion time doubles as the time of failure.
	if failed.CompletedAt.IsZero() {
		failed.CompletedAt = time.Now()
	}
}
// SetProgressStepsDefs replaces the progress steps with one step per entry in
// defs, copying each definition's name and description. The existing slice
// storage is reused. It does nothing when progress tracking is disabled.
func (p *PullJob) SetProgressStepsDefs(defs []pkg.StepDef) {
	progress := p.Progress
	if progress == nil {
		return
	}
	steps := progress.Steps[:0]
	for i := range defs {
		steps = append(steps, Step{
			Name:        string(defs[i].Name),
			Description: defs[i].Description,
		})
	}
	progress.Steps = steps
}
// Publish publishes the local stream through the plugin, using the job's
// publish configuration and appending the connection arguments to the stream
// path as a query string. It is skipped entirely when TestMode is positive.
//
// When the publisher is later disposed, the whole job is stopped if the stop
// reason is a delay-close timeout or a user stop, or if MaxRetry is 0;
// otherwise only the puller is stopped.
func (p *PullJob) Publish() (err error) {
	if p.TestMode > 0 {
		return nil
	}
	target := p.StreamPath
	if query := p.Connection.Args; len(query) > 0 {
		target = target + "?" + query.Encode()
	}
	publisher, pubErr := p.Plugin.PublishWithConfig(p.puller, target, p.PublishConfig)
	if pubErr != nil {
		return pubErr
	}
	p.Publisher = publisher
	publisher.OnDispose(func() {
		switch {
		case publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser), p.MaxRetry == 0:
			p.Stop(publisher.StopReason())
		default:
			p.puller.Stop(publisher.StopReason())
		}
	})
	return nil
}
// Start adds the puller to this job as a child task, using the job's logger.
// It always returns nil.
func (p *PullJob) Start() (err error) {
p.AddTask(p.puller, p.Logger)
return
}
// Start publishes the local stream and opens the source to read from.
//
// If ReadCloser was already supplied, no connection is made. Otherwise the
// source is chosen by the prefix of the remote URL: "http" is fetched with the
// job's HTTP client, "ws" is dialed as a WebSocket, and anything else is
// opened as a local file. The opened reader is closed when the task stops.
//
// Errors are recorded on the current progress step and returned. A non-200
// HTTP response yields io.EOF after its body has been closed.
func (p *HTTPFilePuller) Start() (err error) {
	p.PullJob.SetProgressStepsDefs(httpFilePullSteps)
	if p.PullJob.PublishConfig.Speed == 0 {
		p.PullJob.PublishConfig.Speed = 1 // file streams need their speed controlled
	}
	if err = p.PullJob.Publish(); err != nil {
		p.PullJob.Fail(err.Error())
		return
	}
	// move to url_parsing step
	p.PullJob.GoToStepConst(pkg.StepURLParsing)
	if p.ReadCloser != nil {
		return
	}
	p.PullJob.GoToStepConst(pkg.StepConnection)
	remoteURL := p.PullJob.RemoteURL
	p.Info("pull", "remoteurl", remoteURL)
	switch {
	case strings.HasPrefix(remoteURL, "http"):
		var res *http.Response
		if res, err = p.PullJob.HTTPClient.Get(remoteURL); err == nil {
			if res.StatusCode != http.StatusOK {
				// Release the connection; the body is not going to be consumed.
				res.Body.Close()
				p.PullJob.Fail("HTTP status not OK")
				return io.EOF
			}
			p.ReadCloser = res.Body
		}
	case strings.HasPrefix(remoteURL, "ws"):
		var ws *websocket.Conn
		dialer := websocket.Dialer{
			HandshakeTimeout: 10 * time.Second,
		}
		if ws, _, err = dialer.Dial(remoteURL, nil); err == nil {
			p.ReadCloser = &wsReadCloser{ws: ws}
		}
	default:
		var res *os.File
		if res, err = os.Open(remoteURL); err == nil {
			p.ReadCloser = res
		}
		//p.PullJob.Publisher.Publish.Speed = 1
	}
	if err != nil {
		p.PullJob.Fail(err.Error())
		// ReadCloser is still nil here; taking its Close method value below
		// would panic, so stop now.
		return
	}
	p.OnStop(p.ReadCloser.Close)
	return
}
// GetPullJob returns a pointer to the puller's embedded PullJob.
func (p *HTTPFilePuller) GetPullJob() *PullJob {
return &p.PullJob
}
// Dispose drops the reference to the reader. It does not close it; when Start
// opened the reader, closing is registered there through OnStop.
func (p *HTTPFilePuller) Dispose() {
p.ReadCloser = nil
}
// GetPullJob returns a pointer to the puller's embedded PullJob.
func (p *RecordFilePuller) GetPullJob() *PullJob {
return &p.PullJob
}
// queryRecordStreams loads into p.Streams the records of this puller's Type
// whose stream path equals the job's remote URL and whose time span overlaps
// [startTime, endTime]. On success it also sets MaxTS to the length of the
// requested range in milliseconds.
//
// It returns pkg.ErrNoDB when the plugin has no database, the query error if
// the lookup fails, and pkg.ErrNotFound when no record matches.
func (p *RecordFilePuller) queryRecordStreams(startTime, endTime time.Time) (err error) {
	db := p.PullJob.Plugin.DB
	if db == nil {
		return pkg.ErrNoDB
	}
	filter := RecordStream{Type: p.Type}
	result := db.Where(&filter).Find(&p.Streams, "end_time>=? AND start_time<=? AND stream_path=?", startTime, endTime, p.PullJob.RemoteURL)
	if result.Error != nil {
		return result.Error
	}
	if len(p.Streams) == 0 {
		return pkg.ErrNotFound
	}
	for i := range p.Streams {
		p.Debug("queryRecordStreams", "filePath", p.Streams[i].FilePath)
	}
	p.MaxTS = endTime.Sub(startTime).Milliseconds()
	return nil
}
// Start publishes the stream as VOD, parses the requested time range and loop
// count from the connection arguments, hooks seek requests from the publisher
// into seekChan, and loads the matching record streams.
//
// Retries are disabled for this task. The loop count falls back to
// math.MaxInt32 when the argument is missing, not a number, or negative.
// It returns pkg.ErrNoDB when the plugin has no database, or the first error
// from publishing, time-range parsing, or the record query.
func (p *RecordFilePuller) Start() (err error) {
	p.SetRetry(0, 0)
	job := &p.PullJob
	if job.Plugin.DB == nil {
		return pkg.ErrNoDB
	}
	job.PublishConfig.PubType = PublishTypeVod
	if err = job.Publish(); err != nil {
		return err
	}
	args := job.Connection.Args
	p.PullStartTime, p.PullEndTime, err = util.TimeRangeQueryParse(args)
	if err != nil {
		return err
	}
	p.seekChan = make(chan time.Time, 1)

	loops, convErr := strconv.Atoi(args.Get(util.LoopKey))
	if convErr != nil || loops < 0 {
		loops = math.MaxInt32
	}
	p.Loop = loops

	if pub := job.Publisher; pub != nil {
		pub.OnSeek = func(seekTime time.Time) {
			// Non-blocking hand-off: the request is dropped if one is
			// already waiting in the channel.
			select {
			case p.seekChan <- seekTime:
			default:
			}
		}
	}
	return p.queryRecordStreams(p.PullStartTime, p.PullEndTime)
}
// GetSeekChan returns the channel that carries seek requests. It is nil until
// Start has created it.
func (p *RecordFilePuller) GetSeekChan() chan time.Time {
return p.seekChan
}
// Dispose closes the currently open record file, if any, and closes the seek
// channel.
//
// The seek channel only exists once Start has got past publishing and
// time-range parsing. Closing a nil channel panics, so the close is skipped
// when Start failed before creating it.
func (p *RecordFilePuller) Dispose() {
	if p.File != nil {
		p.File.Close()
	}
	if p.seekChan != nil {
		close(p.seekChan)
	}
}
// Read reads the next websocket message and copies it into p, returning the
// number of bytes copied.
//
// NOTE(review): a message longer than len(p) is truncated; the remainder is
// not kept for the next call. Confirm senders never exceed the reader's
// buffer size, or buffer the leftover bytes.
func (w *wsReadCloser) Read(p []byte) (n int, err error) {
	var payload []byte
	if _, payload, err = w.ws.ReadMessage(); err != nil {
		return 0, err
	}
	n = copy(p, payload)
	return n, nil
}
// Close closes the underlying websocket connection and returns its error.
func (w *wsReadCloser) Close() error {
return w.ws.Close()
}
// CheckSeek polls the seek channel without blocking.
//
// When a seek request is waiting, its time becomes the new PullStartTime, the
// record streams are queried again for the range up to PullEndTime, the open
// file (if any) is closed and cleared, and needSeek is true. If the query
// fails, its error is returned with needSeek false and the file left as is.
// With no request waiting it returns false and a nil error.
func (p *RecordFilePuller) CheckSeek() (needSeek bool, err error) {
	select {
	case seekTo := <-p.seekChan:
		p.PullStartTime = seekTo
		if err = p.queryRecordStreams(seekTo, p.PullEndTime); err != nil {
			return false, err
		}
		if p.File != nil {
			p.File.Close()
			p.File = nil
		}
		return true, nil
	default:
		return false, nil
	}
}
// NewAnnexBPuller returns a new, zero-valued AnnexBPuller. Its signature
// matches PullerFactory; the conf argument is not used here.
func NewAnnexBPuller(conf config.Pull) IPuller {
return &AnnexBPuller{}
}
// AnnexBPuller is an HTTPFilePuller whose Run method parses the source as
// AnnexB video and publishes the frames.
type AnnexBPuller struct {
HTTPFilePuller
}
// Run reads the source in chunks, feeds them to an AnnexB reader, and
// publishes each complete video frame until the task is stopped, the source
// reaches EOF, or an error occurs.
//
// Read errors other than io.EOF, parse errors, and publish errors are recorded
// on the current progress step and returned. EOF ends the loop with the error
// left by the last parse or publish call, which is nil at that point.
func (p *AnnexBPuller) Run() (err error) {
allocator := util.NewScalableMemoryAllocator(1 << util.MinPowerOf2)
defer allocator.Recycle()
writer := NewPublishVideoWriter[*format.AnnexB](p.PullJob.Publisher, allocator)
frame := writer.VideoFrame
// Move to the parsing step - start parsing the file format
p.PullJob.GoToStepConst(pkg.StepParsing)
// Create the AnnexB-specific reader
var annexbReader pkg.AnnexBReader
var hasFrame bool
// Move to the streaming step - start reading and publishing data
p.PullJob.GoToStepConst(pkg.StepStreaming)
for !p.IsStopped() {
// Read one chunk of data
chunkData := allocator.Malloc(8192)
n, readErr := p.ReadCloser.Read(chunkData)
// Short read: hand the unused tail back to the allocator and keep only
// the bytes actually read.
if n != 8192 {
allocator.Free(chunkData[n:])
chunkData = chunkData[:n]
}
// NOTE(review): on this error path the first n bytes of chunkData are not
// freed explicitly; presumably the deferred Recycle reclaims them - confirm.
if readErr != nil && readErr != io.EOF {
p.PullJob.Fail(readErr.Error())
p.Error("读取数据失败", "error", readErr)
return readErr
}
// Append the new data to the AnnexB reader
annexbReader.AppendBuffer(chunkData)
hasFrame, err = frame.Parse(&annexbReader)
if err != nil {
p.PullJob.Fail(err.Error())
return
}
if hasFrame {
// Timestamp the frame with the current wall-clock time in milliseconds,
// truncated to 32 bits.
frame.SetTS32(uint32(time.Now().UnixMilli()))
if err = writer.NextVideo(); err != nil {
p.PullJob.Fail(err.Error())
return
}
// Continue with the writer's current VideoFrame after NextVideo.
frame = writer.VideoFrame
}
// EOF is handled only after the final chunk has been parsed and published.
if readErr == io.EOF {
return
}
}
return
}