diff --git a/main.go b/main.go index a0662b4..8bb6e66 100644 --- a/main.go +++ b/main.go @@ -60,7 +60,6 @@ func init() { if plugin := InstallPlugin(config); plugin != nil { plugin.HandleApi("/list", util.GetJsonHandler(getHDList, time.Second)) plugin.HandleFunc("/pull", func(rw http.ResponseWriter, r *http.Request) { - util.CORS(rw, r) targetURL := r.URL.Query().Get("target") streamPath := r.URL.Query().Get("streamPath") save := r.URL.Query().Get("save") @@ -111,7 +110,7 @@ func (config *HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } metaData := amf.Object{ - "MetaDataCreator": "m7s", + "MetaDataCreator": "m7s" + Version, "hasVideo": vt != nil, "hasAudio": at != nil, "hasMatadata": true, diff --git a/pull.go b/pull.go index 5594a0a..404cc1b 100644 --- a/pull.go +++ b/pull.go @@ -1,6 +1,7 @@ package hdl import ( + "bufio" "io" "net/http" "net/url" @@ -11,45 +12,65 @@ import ( . "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/codec" "github.com/Monibuca/engine/v4/track" + "github.com/Monibuca/engine/v4/util" ) func (puller *HDLPuller) pull() { - head := make([]byte, len(codec.FLVHeader)) - io.ReadFull(puller, head) - startTime := time.Now() - for { - if t, timestamp, payload, err := codec.ReadFLVTag(puller); err == nil { - switch t { - case codec.FLV_TAG_TYPE_AUDIO: - puller.at.WriteAVCC(timestamp+puller.lastTs, payload) - case codec.FLV_TAG_TYPE_VIDEO: - puller.vt.WriteAVCC(timestamp+puller.lastTs, payload) - } - if timestamp != 0 { - elapse := time.Since(startTime) - // 如果读取过快,导致时间戳超过真正流逝的时间,就需要睡眠,降低速度 - if elapse.Milliseconds() < int64(timestamp) { - time.Sleep(time.Millisecond*time.Duration(timestamp) - elapse) - } - } - puller.lastTs = timestamp - } else { - puller.UnPublish() + head := util.Buffer(make([]byte, len(codec.FLVHeader))) + reader := bufio.NewReader(puller) + _, err := io.ReadFull(reader, head) + if err != nil { + return + } + head.Reset() + var startTime time.Time + var startTs uint32 + defer puller.UnPublish() + for offsetTs := puller.absTS; err == nil; _, err = io.ReadFull(reader, head[:4]) { + tmp := head.SubBuf(0, 11) + _, err = io.ReadFull(reader, tmp) + if err != nil { return } + t := tmp.ReadByte() + dataSize := tmp.ReadUint24() + timestamp := tmp.ReadUint24() | uint32(tmp.ReadByte())<<24 + tmp.ReadUint24() + payload := make([]byte, dataSize) + _, err = io.ReadFull(reader, payload) + if err != nil { + return + } + timestamp -= startTs // 相对时间戳 + puller.absTS = offsetTs + timestamp + switch t { + case codec.FLV_TAG_TYPE_AUDIO: + puller.at.WriteAVCC(puller.absTS, payload) + case codec.FLV_TAG_TYPE_VIDEO: + puller.vt.WriteAVCC(puller.absTS, payload) + } + if timestamp != 0 { + if startTs == 0 { + startTs = timestamp + startTime = time.Now() + } else if fast := time.Duration(timestamp)*time.Millisecond - time.Since(startTime); fast > 0 { + // 如果读取过快,导致时间戳超过真正流逝的时间,就需要睡眠,降低速度 + time.Sleep(fast) + } + } } } type HDLPuller struct { Publisher - lastTs uint32 //断线前的时间戳 - at *track.UnknowAudio - vt *track.UnknowVideo + absTS uint32 //绝对时间戳 + at *track.UnknowAudio + vt *track.UnknowVideo io.ReadCloser } func (puller *HDLPuller) Close() { - + puller.ReadCloser.Close() } func (puller *HDLPuller) OnStateChange(old StreamState, n StreamState) bool { @@ -65,7 +86,6 @@ func (puller *HDLPuller) OnStateChange(old StreamState, n StreamState) bool { } } else { if file, err := os.Open(puller.String()); err == nil { - file.Seek(int64(len(codec.FLVHeader)), io.SeekStart) puller.ReadCloser = file } else { file.Close() @@ -83,7 +103,6 @@ func (puller *HDLPuller) OnStateChange(old StreamState, n StreamState) bool { } } else { if file, err := os.Open(puller.String()); err == nil { - file.Seek(int64(len(codec.FLVHeader)), io.SeekStart) puller.ReadCloser = file } else { file.Close()