diff --git a/README.md b/README.md index cbcdc80..ef9885f 100644 --- a/README.md +++ b/README.md @@ -49,4 +49,7 @@ ffplay -i http://localhost:2020/live/test.flv `/hdl/pull/pull?target=[HTTP-FLV地址]&streamPath=[流标识]&save=[是否保存配置(留空则不保存)]` 或者编程方式拉流 -`PullStream(streamPath, targetURL)` \ No newline at end of file +`PullStream(streamPath, targetURL)` + +### m7s读取本地flv文件重新发布 +同上,只需要把http地址改成本地文件地址即可,如果此时开启Reconnect则会在文件读取完成后重新开始读取 \ No newline at end of file diff --git a/pull.go b/pull.go index 26a95ff..548ec75 100644 --- a/pull.go +++ b/pull.go @@ -4,60 +4,88 @@ import ( "errors" "io" "net/http" + "os" + "strings" "time" . "github.com/Monibuca/engine/v3" "github.com/Monibuca/utils/v3/codec" ) -func pull(stream *Stream, reader io.Reader, lastDisconnect uint32) error { - var lastTime uint32 - if config.Reconnect { - time.Sleep(time.Second * 5) - go pull(stream, reader, lastTime) - } else { - defer stream.Close() - } +func pull(at *AudioTrack, vt *VideoTrack, reader io.Reader, lastDisconnect uint32) (lastTime uint32) { head := make([]byte, len(codec.FLVHeader)) io.ReadFull(reader, head) - at := stream.NewAudioTrack(0) - vt := stream.NewVideoTrack(0) - for readT := time.Now(); ; readT = time.Now() { + for startTime := time.Now(); ; { if t, timestamp, payload, err := codec.ReadFLVTag(reader); err == nil { - if lastDisconnect != 0 && timestamp == 0 { - continue - } - readCost := time.Since(readT) switch t { case codec.FLV_TAG_TYPE_AUDIO: at.PushByteStream(timestamp+lastDisconnect, payload) case codec.FLV_TAG_TYPE_VIDEO: vt.PushByteStream(timestamp+lastDisconnect, payload) - if timestamp != 0 { - if duration := time.Duration(timestamp-lastTime) * time.Millisecond; readCost < duration { - time.Sleep(duration - readCost) - } + } + if timestamp != 0 { + elapse := time.Since(startTime) + // 如果读取过快,导致时间戳超过真正流逝的时间,就需要睡眠,降低速度 + if elapse.Milliseconds() < int64(timestamp) { + time.Sleep(time.Millisecond*time.Duration(timestamp) - elapse) } - lastTime = timestamp + } + lastTime = timestamp + } else { + return + } + } +} +func PullStream(streamPath, url string) error { + stream := Stream{ + Type: "HDL Pull", + StreamPath: streamPath, + } + at := stream.NewAudioTrack(0) + vt := stream.NewVideoTrack(0) + if strings.HasPrefix(url, "http") { + if res, err := http.Get(url); err == nil { + if stream.Publish() { + go func() { + lastTs := pull(at, vt, res.Body, 0) + if config.Reconnect { + for stream.Err() == nil { + time.Sleep(time.Second * 5) + lastTs = pull(at, vt, res.Body, lastTs) + } + } else { + stream.Close() + } + }() + } else { + return errors.New("Bad Name") + } + } else { + return err + } + } else { + stream.Type = "FLV File" + if file, err := os.Open(url); err == nil { + if stream.Publish() { + go func() { + lastTs := pull(at, vt, file, 0) + if config.Reconnect { + for stream.Err() == nil { + file.Seek(0, io.SeekStart) + lastTs = pull(at, vt, file, lastTs) + } + } else { + file.Close() + stream.Close() + } + }() + } else { + file.Close() + return errors.New("Bad Name") } } else { return err } } -} -func PullStream(streamPath, url string) error { - if res, err := http.Get(url); err == nil { - stream := Stream{ - Type: "HDL Pull", - StreamPath: streamPath, - } - if stream.Publish() { - go pull(&stream, res.Body, 0) - } else { - return errors.New("Bad Name") - } - } else { - return err - } return nil }