mirror of
https://github.com/Monibuca/plugin-hdl.git
synced 2025-10-05 16:56:55 +08:00
100 lines
2.1 KiB
Go
100 lines
2.1 KiB
Go
package hdl
|
|
|
|
import (
|
|
"bufio"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
|
|
"go.uber.org/zap"
|
|
. "m7s.live/engine/v4"
|
|
"m7s.live/engine/v4/codec"
|
|
"m7s.live/engine/v4/log"
|
|
"m7s.live/engine/v4/util"
|
|
)
|
|
|
|
func (puller *HDLPuller) connect() (err error) {
|
|
puller.ReConnectCount++
|
|
log.Info("connect", zap.String("remoteURL", puller.RemoteURL))
|
|
if strings.HasPrefix(puller.RemoteURL, "http") {
|
|
var res *http.Response
|
|
if res, err = http.Get(puller.RemoteURL); err == nil {
|
|
puller.SetIO(res.Body)
|
|
}
|
|
} else {
|
|
var res *os.File
|
|
if res, err = os.Open(puller.RemoteURL); err == nil {
|
|
puller.SetIO(res)
|
|
}
|
|
}
|
|
if err != nil {
|
|
log.Error("connect", zap.Error(err))
|
|
}
|
|
return
|
|
}
|
|
func (puller *HDLPuller) pull() {
|
|
var err error
|
|
defer func() {
|
|
if puller.Closer != nil {
|
|
puller.Closer.Close()
|
|
}
|
|
if !puller.Stream.IsClosed() {
|
|
if err = puller.connect(); err == nil {
|
|
go puller.pull()
|
|
}
|
|
} else {
|
|
puller.Info("stop", zap.String("remoteURL", puller.RemoteURL))
|
|
}
|
|
}()
|
|
head := util.Buffer(make([]byte, len(codec.FLVHeader)))
|
|
reader := bufio.NewReader(puller)
|
|
_, err = io.ReadFull(reader, head)
|
|
if err != nil {
|
|
return
|
|
}
|
|
head.Reset()
|
|
var startTs uint32
|
|
for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(reader, head[:4]) {
|
|
tmp := head.SubBuf(0, 11)
|
|
_, err = io.ReadFull(reader, tmp)
|
|
if err != nil {
|
|
return
|
|
}
|
|
t := tmp.ReadByte()
|
|
dataSize := tmp.ReadUint24()
|
|
timestamp := tmp.ReadUint24() | uint32(tmp.ReadByte())<<24
|
|
if startTs == 0 {
|
|
startTs = timestamp
|
|
}
|
|
tmp.ReadUint24()
|
|
payload := make([]byte, dataSize)
|
|
_, err = io.ReadFull(reader, payload)
|
|
if err != nil {
|
|
return
|
|
}
|
|
puller.absTS = offsetTs + (timestamp - startTs)
|
|
switch t {
|
|
case codec.FLV_TAG_TYPE_AUDIO:
|
|
puller.AudioTrack.WriteAVCC(puller.absTS, payload)
|
|
case codec.FLV_TAG_TYPE_VIDEO:
|
|
puller.VideoTrack.WriteAVCC(puller.absTS, payload)
|
|
}
|
|
}
|
|
}
|
|
|
|
type HDLPuller struct {
|
|
Publisher
|
|
Puller
|
|
absTS uint32 //绝对时间戳
|
|
}
|
|
|
|
func (puller *HDLPuller) OnEvent(event any) {
|
|
switch event.(type) {
|
|
case SEpublish:
|
|
go puller.pull() //阻塞拉流
|
|
default:
|
|
puller.Publisher.OnEvent(event)
|
|
}
|
|
}
|