对远端拉流连接时增加flv头判断

This commit is contained in:
dexter
2022-12-18 14:29:17 +08:00
parent c5562764e5
commit a2b7c2781f
2 changed files with 30 additions and 23 deletions

View File

@@ -28,7 +28,7 @@ func (c *HDLConfig) OnEvent(event any) {
case FirstConfig:
if c.PullOnStart {
for streamPath, url := range c.PullList {
if err := HDLPlugin.Pull(streamPath, url, new(HDLPuller), false); err != nil {
if err := HDLPlugin.Pull(streamPath, url, NewHDLPuller(), false); err != nil {
HDLPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
}
@@ -37,7 +37,7 @@ func (c *HDLConfig) OnEvent(event any) {
if c.PullOnSubscribe {
for streamPath, url := range c.PullList {
if streamPath == v.Path {
if err := HDLPlugin.Pull(streamPath, url, new(HDLPuller), false); err != nil {
if err := HDLPlugin.Pull(streamPath, url, NewHDLPuller(), false); err != nil {
HDLPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
}
break
@@ -48,7 +48,7 @@ func (c *HDLConfig) OnEvent(event any) {
}
func (c *HDLConfig) API_Pull(rw http.ResponseWriter, r *http.Request) {
err := HDLPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(HDLPuller), r.URL.Query().Has("save"))
err := HDLPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), NewHDLPuller(), r.URL.Query().Has("save"))
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
} else {
@@ -114,7 +114,7 @@ func (sub *HDLSubscriber) OnEvent(event any) {
// s := util.SizeOfBuffers(v)
if _, err := v.WriteTo(sub); err != nil {
sub.Stop()
// } else {
// } else {
// println(time.Since(t)/time.Millisecond, s)
}
default:

45
pull.go
View File

@@ -1,7 +1,6 @@
package hdl
import (
"bufio"
"io"
"net/http"
"os"
@@ -13,6 +12,19 @@ import (
"m7s.live/engine/v4/util"
)
type HDLPuller struct {
Publisher
Puller
absTS uint32 //绝对时间戳
buf util.Buffer
}
func NewHDLPuller() *HDLPuller {
return &HDLPuller{
buf: util.Buffer(make([]byte, len(codec.FLVHeader))),
}
}
func (puller *HDLPuller) Connect() (err error) {
HDLPlugin.Info("connect", zap.String("remoteURL", puller.RemoteURL))
if strings.HasPrefix(puller.RemoteURL, "http") {
@@ -29,20 +41,20 @@ func (puller *HDLPuller) Connect() (err error) {
if err != nil {
HDLPlugin.Error("connect", zap.Error(err))
}
head := puller.buf.SubBuf(0, len(codec.FLVHeader))
_, err = io.ReadFull(puller, head)
if head[0] != 'F' || head[1] != 'L' || head[2] != 'V' {
err = codec.ErrInvalidFLV
}
return
}
func (puller *HDLPuller) Pull() {
head := util.Buffer(make([]byte, len(codec.FLVHeader)))
reader := bufio.NewReader(puller)
_, err := io.ReadFull(reader, head)
if err != nil {
return
}
head.Reset()
func (puller *HDLPuller) Pull() (err error) {
puller.buf.Reset()
var startTs uint32
for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(reader, head[:4]) {
tmp := head.SubBuf(0, 11)
_, err = io.ReadFull(reader, tmp)
for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(puller, puller.buf[:4]) {
tmp := puller.buf.SubBuf(0, 11)
_, err = io.ReadFull(puller, tmp)
if err != nil {
return
}
@@ -54,7 +66,7 @@ func (puller *HDLPuller) Pull() {
}
tmp.ReadUint24()
payload := make([]byte, dataSize)
_, err = io.ReadFull(reader, payload)
_, err = io.ReadFull(puller, payload)
if err != nil {
return
}
@@ -66,10 +78,5 @@ func (puller *HDLPuller) Pull() {
puller.WriteAVCCVideo(puller.absTS, payload)
}
}
}
type HDLPuller struct {
Publisher
Puller
absTS uint32 //绝对时间戳
return
}