feat: add play control

This commit is contained in:
langhuihui
2024-09-19 20:10:34 +08:00
parent a93b8150a7
commit ddf8fc2c73
5 changed files with 106 additions and 16 deletions

View File

@@ -0,0 +1,5 @@
global:
loglevel: info
flv:
pull:
live/test: /Users/dexter/Movies/jb-demo.flv

View File

@@ -0,0 +1,15 @@
global:
loglevel: debug
tcp: :50050
http: :8081
disableall: true
rtmp:
enable: true
tcp: :1936
pull:
live/test:
url: rtmp://localhost/live/test
maxretry: -1
onpub:
push:
live/test: rtmp://localhost/live/test2

View File

@@ -56,24 +56,33 @@ func (plugin *FLVPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
w.Header().Set("Content-Type", "video/x-flv")
w.Header().Set("Transfer-Encoding", "identity")
w.WriteHeader(http.StatusOK)
var conn net.Conn
conn, err = live.Subscriber.CheckWebSocket(w, r)
if err != nil {
return
}
wto := plugin.GetCommonConf().WriteTimeout
if hijacker, ok := w.(http.Hijacker); ok && wto > 0 {
conn, _, _ := hijacker.Hijack()
conn.SetWriteDeadline(time.Now().Add(wto))
live.WriteFlvTag = func(flv net.Buffers) (err error) {
if conn == nil {
w.Header().Set("Content-Type", "video/x-flv")
w.Header().Set("Transfer-Encoding", "identity")
w.WriteHeader(http.StatusOK)
if hijacker, ok := w.(http.Hijacker); ok && wto > 0 {
conn, _, _ = hijacker.Hijack()
conn.SetWriteDeadline(time.Now().Add(wto))
_, err = flv.WriteTo(conn)
return
}
} else {
}
if conn == nil {
live.WriteFlvTag = func(flv net.Buffers) (err error) {
_, err = flv.WriteTo(w)
return
}
w.(http.Flusher).Flush()
} else {
live.WriteFlvTag = func(flv net.Buffers) (err error) {
conn.SetWriteDeadline(time.Now().Add(wto))
_, err = flv.WriteTo(conn)
return
}
}
err = live.Run()
}

View File

@@ -92,12 +92,25 @@ func (p *MP4Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "video/mp4")
w.WriteHeader(http.StatusOK)
var ctx MediaContext
ctx.conn, err = sub.CheckWebSocket(w, r)
if err != nil {
return
}
wto := p.GetCommonConf().WriteTimeout
if ctx.conn == nil {
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "video/mp4")
w.WriteHeader(http.StatusOK)
if hijacker, ok := w.(http.Hijacker); ok && wto > 0 {
ctx.conn, _, _ = hijacker.Hijack()
ctx.conn.SetWriteDeadline(time.Now().Add(wto))
}
}
initSegment := mp4.CreateEmptyInit()
initSegment.Moov.Mvhd.NextTrackID = 1
var ctx MediaContext
ctx.wto = p.GetCommonConf().WriteTimeout
var ftyp *mp4.FtypBox
var offsetAudio, offsetVideo = 1, 5
@@ -165,8 +178,7 @@ func (p *MP4Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
stsd.AddChild(pcmu)
}
}
if hijacker, ok := w.(http.Hijacker); ok && ctx.wto > 0 {
ctx.conn, _, _ = hijacker.Hijack()
if ctx.conn != nil {
ctx.Writer = ctx.conn
} else {
ctx.Writer = w

View File

@@ -1,14 +1,19 @@
package m7s
import (
"encoding/binary"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"reflect"
"runtime"
"strings"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"m7s.live/m7s/v5/pkg/task"
. "m7s.live/m7s/v5/pkg"
@@ -118,6 +123,50 @@ func (s *Subscriber) Dispose() {
}
}
type PlayController struct {
task.Task
conn net.Conn
Subscriber *Subscriber
}
func (pc *PlayController) Go() (err error) {
for err == nil {
var b []byte
b, err = wsutil.ReadClientBinary(pc.conn)
if pc.Subscriber.Publisher == nil {
continue
}
if len(b) >= 3 && [3]byte(b[:3]) == [3]byte{'c', 'm', 'd'} {
switch b[3] {
case 1: // pause
pc.Subscriber.Publisher.Pause()
case 2: // resume
pc.Subscriber.Publisher.Resume()
case 3: // seek
pc.Subscriber.Publisher.Seek(time.Duration(binary.BigEndian.Uint32(b[4:8])))
case 4: // speed
pc.Subscriber.Publisher.Speed = float64(binary.BigEndian.Uint32(b[4:8])) / 100
}
}
}
return
}
func (s *Subscriber) CheckWebSocket(w http.ResponseWriter, r *http.Request) (conn net.Conn, err error) {
if r.Header.Get("Upgrade") == "websocket" {
conn, _, _, err = ws.UpgradeHTTP(r, w)
if err != nil {
return
}
var playController = &PlayController{
Subscriber: s,
conn: conn,
}
s.AddTask(playController)
}
return
}
func (s *Subscriber) createAudioReader(dataType reflect.Type, startAudioTs time.Duration) (awi int) {
if s.Publisher == nil || dataType == nil {
return