diff --git a/example/multiple/transfer/rtmp/config1.yaml b/example/multiple/transfer/rtmp/config1.yaml new file mode 100644 index 0000000..5c7f095 --- /dev/null +++ b/example/multiple/transfer/rtmp/config1.yaml @@ -0,0 +1,5 @@ +global: + loglevel: info +flv: + pull: + live/test: /Users/dexter/Movies/jb-demo.flv diff --git a/example/multiple/transfer/rtmp/config2.yaml b/example/multiple/transfer/rtmp/config2.yaml new file mode 100644 index 0000000..4d3df71 --- /dev/null +++ b/example/multiple/transfer/rtmp/config2.yaml @@ -0,0 +1,15 @@ +global: + loglevel: debug + tcp: :50050 + http: :8081 + disableall: true +rtmp: + enable: true + tcp: :1936 + pull: + live/test: + url: rtmp://localhost/live/test + maxretry: -1 + onpub: + push: + live/test: rtmp://localhost/live/test2 diff --git a/plugin/flv/index.go b/plugin/flv/index.go index 47a469e..1ac9a21 100644 --- a/plugin/flv/index.go +++ b/plugin/flv/index.go @@ -56,24 +56,33 @@ func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { return } - w.Header().Set("Content-Type", "video/x-flv") - w.Header().Set("Transfer-Encoding", "identity") - w.WriteHeader(http.StatusOK) + var conn net.Conn + conn, err = live.Subscriber.CheckWebSocket(w, r) + if err != nil { + return + } wto := plugin.GetCommonConf().WriteTimeout - if hijacker, ok := w.(http.Hijacker); ok && wto > 0 { - conn, _, _ := hijacker.Hijack() - conn.SetWriteDeadline(time.Now().Add(wto)) - live.WriteFlvTag = func(flv net.Buffers) (err error) { + if conn == nil { + w.Header().Set("Content-Type", "video/x-flv") + w.Header().Set("Transfer-Encoding", "identity") + w.WriteHeader(http.StatusOK) + if hijacker, ok := w.(http.Hijacker); ok && wto > 0 { + conn, _, _ = hijacker.Hijack() conn.SetWriteDeadline(time.Now().Add(wto)) - _, err = flv.WriteTo(conn) - return } - } else { + } + if conn == nil { live.WriteFlvTag = func(flv net.Buffers) (err error) { _, err = flv.WriteTo(w) return } w.(http.Flusher).Flush() + } else { + live.WriteFlvTag = func(flv net.Buffers) (err error) { + conn.SetWriteDeadline(time.Now().Add(wto)) + _, err = flv.WriteTo(conn) + return + } } err = live.Run() } diff --git a/plugin/mp4/index.go b/plugin/mp4/index.go index bc8154a..4be7bce 100644 --- a/plugin/mp4/index.go +++ b/plugin/mp4/index.go @@ -92,12 +92,25 @@ func (p *MP4Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - w.Header().Set("Transfer-Encoding", "chunked") - w.Header().Set("Content-Type", "video/mp4") - w.WriteHeader(http.StatusOK) + var ctx MediaContext + ctx.conn, err = sub.CheckWebSocket(w, r) + if err != nil { + return + } + wto := p.GetCommonConf().WriteTimeout + if ctx.conn == nil { + w.Header().Set("Transfer-Encoding", "chunked") + w.Header().Set("Content-Type", "video/mp4") + w.WriteHeader(http.StatusOK) + if hijacker, ok := w.(http.Hijacker); ok && wto > 0 { + ctx.conn, _, _ = hijacker.Hijack() + ctx.conn.SetWriteDeadline(time.Now().Add(wto)) + } + } + initSegment := mp4.CreateEmptyInit() initSegment.Moov.Mvhd.NextTrackID = 1 - var ctx MediaContext + ctx.wto = p.GetCommonConf().WriteTimeout var ftyp *mp4.FtypBox var offsetAudio, offsetVideo = 1, 5 @@ -165,8 +178,7 @@ func (p *MP4Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { stsd.AddChild(pcmu) } } - if hijacker, ok := w.(http.Hijacker); ok && ctx.wto > 0 { - ctx.conn, _, _ = hijacker.Hijack() + if ctx.conn != nil { ctx.Writer = ctx.conn } else { ctx.Writer = w diff --git a/subscriber.go b/subscriber.go index 488f5fa..e9f4f7e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -1,14 +1,19 @@ package m7s import ( + "encoding/binary" "errors" "fmt" + "net" + "net/http" "net/url" "reflect" "runtime" "strings" "time" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" "m7s.live/m7s/v5/pkg/task" . "m7s.live/m7s/v5/pkg" @@ -118,6 +123,50 @@ func (s *Subscriber) Dispose() { } } +type PlayController struct { + task.Task + conn net.Conn + Subscriber *Subscriber +} + +func (pc *PlayController) Go() (err error) { + for err == nil { + var b []byte + b, err = wsutil.ReadClientBinary(pc.conn) + if pc.Subscriber.Publisher == nil { + continue + } + if len(b) >= 3 && [3]byte(b[:3]) == [3]byte{'c', 'm', 'd'} { + switch b[3] { + case 1: // pause + pc.Subscriber.Publisher.Pause() + case 2: // resume + pc.Subscriber.Publisher.Resume() + case 3: // seek + pc.Subscriber.Publisher.Seek(time.Duration(binary.BigEndian.Uint32(b[4:8]))) + case 4: // speed + pc.Subscriber.Publisher.Speed = float64(binary.BigEndian.Uint32(b[4:8])) / 100 + } + } + } + return +} + +func (s *Subscriber) CheckWebSocket(w http.ResponseWriter, r *http.Request) (conn net.Conn, err error) { + if r.Header.Get("Upgrade") == "websocket" { + conn, _, _, err = ws.UpgradeHTTP(r, w) + if err != nil { + return + } + var playController = &PlayController{ + Subscriber: s, + conn: conn, + } + s.AddTask(playController) + } + return +} + func (s *Subscriber) createAudioReader(dataType reflect.Type, startAudioTs time.Duration) (awi int) { if s.Publisher == nil || dataType == nil { return