diff --git a/main.go b/main.go index 024ab9d..dc7e177 100644 --- a/main.go +++ b/main.go @@ -10,10 +10,12 @@ import ( . "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/codec" + "github.com/Monibuca/engine/v4/common" "github.com/Monibuca/engine/v4/config" "github.com/Monibuca/engine/v4/util" . "github.com/logrusorgru/aurora" amf "github.com/zhangpeihao/goamf" + "go.uber.org/zap" ) type HDLConfig struct { @@ -27,20 +29,22 @@ var streamPathReg = regexp.MustCompile(`/(hdl/)?((.+)(\.flv)|(.+))`) func (c *HDLConfig) Update(override config.Config) { if c.ListenAddr != "" || c.ListenAddrTLS != "" { - plugin.Info(Green("HDL Listen at "), BrightBlue(c.ListenAddr), BrightBlue(c.ListenAddrTLS)) + plugin.Info(Green("HDL Server Start").String(), zap.String("ListenAddr", c.ListenAddr), zap.String("ListenAddrTLS", c.ListenAddrTLS)) c.Listen(plugin, c) } } func (c *HDLConfig) API_Pull(rw http.ResponseWriter, r *http.Request) { - targetURL := r.URL.Query().Get("target") - streamPath := r.URL.Query().Get("streamPath") - if c.PullStream(streamPath, Puller{RemoteURL: targetURL, Config: &c.Pull}) { - if r.URL.Query().Get("save") != "" { - c.AddPull(streamPath, targetURL) - plugin.Modified["pull"] = c.Pull - if err := plugin.Save(); err != nil { - plugin.Error(err) - } + var puller Puller + puller.StreamPath = r.URL.Query().Get("streamPath") + puller.RemoteURL = r.URL.Query().Get("target") + puller.Config = &c.Pull + c.PullStream(puller) + + if r.URL.Query().Get("save") != "" { + c.AddPull(puller.StreamPath, puller.RemoteURL) + plugin.Modified["pull"] = c.Pull + if err := plugin.Save(); err != nil { + plugin.Error("save faild", zap.Error(err)) } } } @@ -49,8 +53,31 @@ func (*HDLConfig) API_List(rw http.ResponseWriter, r *http.Request) { } var Config = new(HDLConfig) + +// 确保HDLConfig实现了PullPlugin接口 +var _ PullPlugin = Config + var plugin = InstallPlugin(Config) +type HDLSubscriber struct { + Subscriber +} + +func (sub *HDLSubscriber) OnEvent(event any) { + switch v := event.(type) { + case AudioDeConf: + if sub.AudioTrack.IsAAC() { + v.FLV.WriteTo(sub) + } + case VideoDeConf: + v.FLV.WriteTo(sub) + case common.MediaFrame: + flvTag := v.GetFLV() + flvTag.WriteTo(sub) + default: + sub.Subscriber.OnEvent(event) + } +} func (*HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { parts := streamPathReg.FindStringSubmatch(r.RequestURI) if len(parts) == 0 { @@ -63,11 +90,18 @@ func (*HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Transfer-Encoding", "chunked") w.Header().Set("Content-Type", "video/x-flv") - sub := Subscriber{ID: r.RemoteAddr, Type: "FLV"} - if sub.Subscribe(stringPath, Config.Subscribe) { - vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack() - hasVideo := vt != nil - hasAudio := at != nil + sub := &HDLSubscriber{} + sub.ID = r.RemoteAddr + sub.OnEvent(r.Context()) + if plugin.Subscribe(stringPath, sub) { + if sub.Stream.Publisher == nil { + w.WriteHeader(404) + return + } + sub.Writer = w + at, vt := sub.AudioTrack, sub.VideoTrack + hasVideo := at != nil + hasAudio := vt != nil var buffer bytes.Buffer if _, err := amf.WriteString(&buffer, "onMetaData"); err != nil { return @@ -99,29 +133,15 @@ func (*HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { metaData["videocodecid"] = int(vt.CodecID) metaData["width"] = vt.SPSInfo.Width metaData["height"] = vt.SPSInfo.Height - sub.OnVideo = func(frame *VideoFrame) error { - frame.FLV.WriteTo(w) - return r.Context().Err() - } } if hasVideo { metaData["audiocodecid"] = int(at.CodecID) metaData["audiosamplerate"] = at.SampleRate metaData["audiosamplesize"] = at.SampleSize metaData["stereo"] = at.Channels == 2 - sub.OnAudio = func(frame *AudioFrame) error { - frame.FLV.WriteTo(w) - return r.Context().Err() - } } codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_SCRIPT, 0, net.Buffers{buffer.Bytes()}) - if hasVideo { - vt.DecoderConfiguration.FLV.WriteTo(w) - } - if hasAudio && at.CodecID == codec.CodecID_AAC { - at.DecoderConfiguration.FLV.WriteTo(w) - } - sub.Play(at, vt) + sub.PlayBlock(sub) } else { w.WriteHeader(500) } diff --git a/pull.go b/pull.go index 17c796d..2ad1e2f 100644 --- a/pull.go +++ b/pull.go @@ -6,15 +6,15 @@ import ( "net/http" "os" "strings" - "time" . "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/codec" - "github.com/Monibuca/engine/v4/track" "github.com/Monibuca/engine/v4/util" + "go.uber.org/zap" ) func (puller *HDLPuller) pull() { + puller.ReConnectCount++ head := util.Buffer(make([]byte, len(codec.FLVHeader))) reader := bufio.NewReader(puller) _, err := io.ReadFull(reader, head) @@ -22,10 +22,8 @@ func (puller *HDLPuller) pull() { return } head.Reset() - var startTime time.Time var startTs uint32 - defer puller.UnPublish() - for offsetTs := puller.absTS; err == nil; _, err = io.ReadFull(reader, head[:4]) { + for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(reader, head[:4]) { tmp := head.SubBuf(0, 11) _, err = io.ReadFull(reader, tmp) if err != nil { @@ -34,81 +32,75 @@ func (puller *HDLPuller) pull() { t := tmp.ReadByte() dataSize := tmp.ReadUint24() timestamp := tmp.ReadUint24() | uint32(tmp.ReadByte())<<24 + if startTs == 0 { + startTs = timestamp + } tmp.ReadUint24() payload := make([]byte, dataSize) _, err = io.ReadFull(reader, payload) if err != nil { return } - timestamp -= startTs // 相对时间戳 - puller.absTS = offsetTs + timestamp + puller.absTS = offsetTs + (timestamp - startTs) switch t { case codec.FLV_TAG_TYPE_AUDIO: - puller.at.WriteAVCC(puller.absTS, payload) + puller.AudioTrack.WriteAVCC(puller.absTS, payload) case codec.FLV_TAG_TYPE_VIDEO: - puller.vt.WriteAVCC(puller.absTS, payload) - } - if timestamp != 0 { - if startTs == 0 { - startTs = timestamp - startTime = time.Now() - } else if fast := time.Duration(timestamp)*time.Millisecond - time.Since(startTime); fast > 0 { - // 如果读取过快,导致时间戳超过真正流逝的时间,就需要睡眠,降低速度 - time.Sleep(fast) - } + puller.VideoTrack.WriteAVCC(puller.absTS, payload) } } } -var _ IPuller = (*HDLPuller)(nil) -var _ IPuller = (*FLVFile)(nil) - type HDLPuller struct { + Publisher Puller absTS uint32 //绝对时间戳 - at *track.UnknowAudio - vt *track.UnknowVideo } -// 用于发布FLV文件 -type FLVFile struct { - HDLPuller +func (puller *HDLPuller) OnEvent(event any) { + switch v := event.(type) { + case PullEvent: + if v > 0 { + go func(count PullEvent) { + puller.pull() //阻塞拉流 + // 如果流没有被关闭,则重连,重拉 + if !puller.Stream.IsClosed() { + puller.OnEvent(count) + } + }(v + 1) + } else { + // TODO: 发布失败重新发布 + if plugin.Publish(puller.StreamPath, puller) { + if strings.HasPrefix(puller.RemoteURL, "http") { + if res, err := http.Get(puller.RemoteURL); err == nil { + puller.Reader = res.Body + puller.Closer = res.Body + } else { + puller.Error(puller.RemoteURL, zap.Error(err)) + return + } + } else { + if res, err := os.Open(puller.RemoteURL); err == nil { + puller.Reader = res + puller.Closer = res + } else { + puller.Error(puller.RemoteURL, zap.Error(err)) + return + } + } + // 注入context + puller.OnEvent(Engine) + puller.OnEvent(PullEvent(1)) + } + } + default: + puller.Publisher.OnEvent(event) + } } -func (puller *FLVFile) Pull(count int) { - if count == 0 { - puller.at = puller.NewAudioTrack() - puller.vt = puller.NewVideoTrack() +func (config *HDLConfig) PullStream(puller Puller) { + client := &HDLPuller{ + Puller: puller, } - if file, err := os.Open(puller.RemoteURL); err == nil { - puller.Reader = file - puller.Closer = file - } else { - file.Close() - return - } - puller.pull() -} - -func (puller *HDLPuller) Pull(count int) { - if count == 0 { - puller.at = puller.NewAudioTrack() - puller.vt = puller.NewVideoTrack() - } - if res, err := http.Get(puller.RemoteURL); err == nil { - puller.Reader = res.Body - puller.Closer = res.Body - } else { - puller.Error(err) - return - } - puller.pull() -} - -func (config *HDLConfig) PullStream(streamPath string, puller Puller) bool { - var puber IPublisher = &HDLPuller{Puller: puller} - if !strings.HasPrefix(puller.RemoteURL, "http") { - puber = &FLVFile{HDLPuller: *puber.(*HDLPuller)} - } - return puber.Publish(streamPath, puber, Config.Publish) + client.OnEvent(PullEvent(0)) }