package hdl import ( "bufio" "io" "net/http" "os" "strings" . "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/codec" "github.com/Monibuca/engine/v4/log" "github.com/Monibuca/engine/v4/util" "go.uber.org/zap" ) func (puller *HDLPuller) connect() (err error) { puller.ReConnectCount++ log.Info("connect", zap.String("remoteURL", puller.RemoteURL)) if strings.HasPrefix(puller.RemoteURL, "http") { var res *http.Response if res, err = http.Get(puller.RemoteURL); err == nil { puller.OnEvent(res.Body) } } else { var res *os.File if res, err = os.Open(puller.RemoteURL); err == nil { puller.OnEvent(res) } } if err != nil { log.Error("connect", zap.Error(err)) } return } func (puller *HDLPuller) pull() { var err error defer func() { puller.Closer.Close() if !puller.Stream.IsClosed() { if err = puller.connect(); err == nil { go puller.pull() } } else { puller.Info("stop", zap.String("remoteURL", puller.RemoteURL)) } }() head := util.Buffer(make([]byte, len(codec.FLVHeader))) reader := bufio.NewReader(puller) _, err = io.ReadFull(reader, head) if err != nil { return } head.Reset() var startTs uint32 for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(reader, head[:4]) { tmp := head.SubBuf(0, 11) _, err = io.ReadFull(reader, tmp) if err != nil { return } t := tmp.ReadByte() dataSize := tmp.ReadUint24() timestamp := tmp.ReadUint24() | uint32(tmp.ReadByte())<<24 if startTs == 0 { startTs = timestamp } tmp.ReadUint24() payload := make([]byte, dataSize) _, err = io.ReadFull(reader, payload) if err != nil { return } puller.absTS = offsetTs + (timestamp - startTs) switch t { case codec.FLV_TAG_TYPE_AUDIO: puller.AudioTrack.WriteAVCC(puller.absTS, payload) case codec.FLV_TAG_TYPE_VIDEO: puller.VideoTrack.WriteAVCC(puller.absTS, payload) } } } type HDLPuller struct { Publisher Puller absTS uint32 //绝对时间戳 } func (puller *HDLPuller) OnEvent(event any) { switch event.(type) { case SEpublish: go puller.pull() //阻塞拉流 default: puller.Publisher.OnEvent(event) } }