From a2b7c2781f019e20d99187127b00cae665fa6ffb Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Sun, 18 Dec 2022 14:29:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=B9=E8=BF=9C=E7=AB=AF=E6=8B=89=E6=B5=81?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=97=B6=E5=A2=9E=E5=8A=A0flv=E5=A4=B4?= =?UTF-8?q?=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 8 ++++---- pull.go | 45 ++++++++++++++++++++++++++------------------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/main.go b/main.go index 739bbf7..be173ae 100644 --- a/main.go +++ b/main.go @@ -28,7 +28,7 @@ func (c *HDLConfig) OnEvent(event any) { case FirstConfig: if c.PullOnStart { for streamPath, url := range c.PullList { - if err := HDLPlugin.Pull(streamPath, url, new(HDLPuller), false); err != nil { + if err := HDLPlugin.Pull(streamPath, url, NewHDLPuller(), false); err != nil { HDLPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) } } @@ -37,7 +37,7 @@ func (c *HDLConfig) OnEvent(event any) { if c.PullOnSubscribe { for streamPath, url := range c.PullList { if streamPath == v.Path { - if err := HDLPlugin.Pull(streamPath, url, new(HDLPuller), false); err != nil { + if err := HDLPlugin.Pull(streamPath, url, NewHDLPuller(), false); err != nil { HDLPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) } break @@ -48,7 +48,7 @@ func (c *HDLConfig) OnEvent(event any) { } func (c *HDLConfig) API_Pull(rw http.ResponseWriter, r *http.Request) { - err := HDLPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(HDLPuller), r.URL.Query().Has("save")) + err := HDLPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), NewHDLPuller(), r.URL.Query().Has("save")) if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) } else { @@ -114,7 +114,7 @@ func (sub *HDLSubscriber) OnEvent(event any) { // s := util.SizeOfBuffers(v) if _, err := v.WriteTo(sub); err != nil { sub.Stop() - // } else { + // } else { // println(time.Since(t)/time.Millisecond, s) } default: diff --git a/pull.go b/pull.go index 9104861..621e577 100644 --- a/pull.go +++ b/pull.go @@ -1,7 +1,6 @@ package hdl import ( - "bufio" "io" "net/http" "os" @@ -13,6 +12,19 @@ import ( "m7s.live/engine/v4/util" ) +type HDLPuller struct { + Publisher + Puller + absTS uint32 //绝对时间戳 + buf util.Buffer +} + +func NewHDLPuller() *HDLPuller { + return &HDLPuller{ + buf: util.Buffer(make([]byte, len(codec.FLVHeader))), + } +} + func (puller *HDLPuller) Connect() (err error) { HDLPlugin.Info("connect", zap.String("remoteURL", puller.RemoteURL)) if strings.HasPrefix(puller.RemoteURL, "http") { @@ -29,20 +41,20 @@ func (puller *HDLPuller) Connect() (err error) { if err != nil { HDLPlugin.Error("connect", zap.Error(err)) } + head := puller.buf.SubBuf(0, len(codec.FLVHeader)) + _, err = io.ReadFull(puller, head) + if head[0] != 'F' || head[1] != 'L' || head[2] != 'V' { + err = codec.ErrInvalidFLV + } return } -func (puller *HDLPuller) Pull() { - head := util.Buffer(make([]byte, len(codec.FLVHeader))) - reader := bufio.NewReader(puller) - _, err := io.ReadFull(reader, head) - if err != nil { - return - } - head.Reset() + +func (puller *HDLPuller) Pull() (err error) { + puller.buf.Reset() var startTs uint32 - for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(reader, head[:4]) { - tmp := head.SubBuf(0, 11) - _, err = io.ReadFull(reader, tmp) + for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(puller, puller.buf[:4]) { + tmp := puller.buf.SubBuf(0, 11) + _, err = io.ReadFull(puller, tmp) if err != nil { return } @@ -54,7 +66,7 @@ func (puller *HDLPuller) Pull() { } tmp.ReadUint24() payload := make([]byte, dataSize) - _, err = io.ReadFull(reader, payload) + _, err = io.ReadFull(puller, payload) if err != nil { return } @@ -66,10 +78,5 @@ func (puller *HDLPuller) Pull() { puller.WriteAVCCVideo(puller.absTS, payload) } } -} - -type HDLPuller struct { - Publisher - Puller - absTS uint32 //绝对时间戳 + return }