feat: add ws fetch ps stream

This commit is contained in:
langhuihui
2023-05-16 23:06:23 +08:00
parent 2baf2ee5db
commit 5945e8b421
6 changed files with 121 additions and 18 deletions

42
main.go
View File

@@ -12,16 +12,21 @@ import (
"sync"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/pion/rtp"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/lang"
"m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
)
type PSConfig struct {
config.HTTP
config.Publish
config.Subscribe
RelayMode int // 转发模式,0:转协议+不转发,1:不转协议+转发2:转协议+转发
streams sync.Map
shareTCP sync.Map
@@ -113,6 +118,43 @@ func (c *PSConfig) ServeUDP(conn *net.UDPConn) {
}
}
func (c *PSConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
streamPath := strings.TrimPrefix(r.URL.Path, "/")
if r.URL.RawQuery != "" {
streamPath += "?" + r.URL.RawQuery
}
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
return
}
var suber PSSubscriber
suber.SetIO(conn)
suber.SetParentCtx(r.Context())
suber.ID = r.RemoteAddr
if err = PSPlugin.Subscribe(streamPath, &suber); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
defer suber.Stop()
b, err := wsutil.ReadClientBinary(conn)
var rtpPacket rtp.Packet
if err == nil {
dc := track.NewDataTrack[[]byte]("voice")
dc.Attach(suber.Stream)
for err == nil {
err = rtpPacket.Unmarshal(b)
if err == nil {
dc.Push(rtpPacket.Payload)
}
b, err = wsutil.ReadClientBinary(conn)
}
} else {
// baseStream.Error("receive", zap.Error(err))
}
}
func Receive(streamPath, dump, port string, ssrc uint32, reuse bool) (err error) {
var pubber PSPublisher
if _, loaded := conf.streams.LoadOrStore(ssrc, &pubber); loaded {

View File

@@ -182,6 +182,7 @@ func (dec *DecPSPackage) decProgramStreamMap() error {
if err != nil {
return err
}
defer dec.EsHandler.ReceivePSM(psm)
l := len(psm)
index := 2
programStreamInfoLen := util.BigEndian.Uint16(psm[index:])

View File

@@ -21,6 +21,7 @@ const (
type EsHandler interface {
ReceiveAudio(MpegPsEsStream)
ReceiveVideo(MpegPsEsStream)
ReceivePSM(util.Buffer)
}
type MpegPsEsStream struct {

View File

@@ -13,7 +13,6 @@ import (
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/track"
. "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"m7s.live/plugin/ps/v4/mpegps"
@@ -26,7 +25,7 @@ type cacheItem struct {
type PSPublisher struct {
Publisher
relayTrack *track.Data
relayTrack *PSTrack
rtp.Packet
DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用
// mpegps.MpegPsStream `json:"-" yaml:"-"`
@@ -45,8 +44,7 @@ func (p *PSPublisher) OnEvent(event any) {
case IPublisher:
p.dumpLen = make([]byte, 6)
if conf.RelayMode != 0 {
p.relayTrack = p.Stream.NewDataTrack("ps", nil)
p.relayTrack.Attach()
p.relayTrack = NewPSTrack(p.Stream)
}
case SEclose, SEKick:
conf.streams.Delete(p.Stream.Path)
@@ -90,20 +88,14 @@ func (p *PSPublisher) ServeUDP(conn *net.UDPConn) {
p.PushPS(bufUDP[:n])
}
}
func (p *PSPublisher) PushPS(ps util.Buffer) {
if conf.RelayMode != 0 {
item := p.pool.Get(len(ps))
copy(item.Value, ps)
p.relayTrack.Push(item)
}
if conf.RelayMode != 1 {
if err := p.Unmarshal(ps); err != nil {
p.Error("gb28181 decode rtp error:", zap.Error(err))
} else if !p.IsClosed() {
p.writeDump(ps)
}
p.pushPS()
if err := p.Unmarshal(ps); err != nil {
p.Error("gb28181 decode rtp error:", zap.Error(err))
} else if !p.IsClosed() {
p.writeDump(ps)
}
p.pushPS()
}
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
@@ -119,6 +111,14 @@ func (p *PSPublisher) pushPS() {
p.lastSeq = p.SequenceNumber - 1
p.pool = make(util.BytesPool, 17)
}
if conf.RelayMode != 0 {
item := p.pool.Get(len(p.Packet.Payload))
copy(item.Value, p.Packet.Payload)
p.relayTrack.Push(item)
}
if conf.RelayMode == 1 && p.relayTrack.PSM != nil {
return
}
if p.DisableReorder {
p.Feed(p.Packet.Payload)
p.lastSeq = p.SequenceNumber
@@ -210,7 +210,7 @@ func (p *PSPublisher) OnPacket(pkg mpeg2.Display, decodeResult error) {
}
func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
if !conf.PubVideo {
if !conf.PubVideo || conf.RelayMode == 1 {
return
}
if p.VideoTrack == nil {
@@ -248,7 +248,7 @@ func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
}
func (p *PSPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
if !conf.PubAudio {
if !conf.PubAudio || conf.RelayMode == 1 {
return
}
ts, payload := es.PTS, es.Buffer
@@ -306,3 +306,8 @@ func (p *PSPublisher) Replay(f *os.File) (err error) {
}
return
}
func (p *PSPublisher) ReceivePSM(buf util.Buffer) {
if p.relayTrack != nil {
p.relayTrack.PSM = buf.Clone()
}
}

24
subscriber.go Normal file
View File

@@ -0,0 +1,24 @@
package ps
import (
"github.com/gobwas/ws/wsutil"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
)
type PSSubscriber struct {
Subscriber
}
func (ps *PSSubscriber) OnEvent(event any) {
switch v := event.(type) {
case *PSTrack:
wsutil.WriteServerBinary(ps, util.ConcatBuffers(v.GetPSM()))
go v.Play(ps.IO, func(data *common.DataFrame[*util.ListItem[util.Buffer]]) error {
return wsutil.WriteServerBinary(ps, data.Value.Value)
})
default:
ps.Subscriber.OnEvent(event)
}
}

30
track.go Normal file
View File

@@ -0,0 +1,30 @@
package ps
import (
"net"
"m7s.live/engine/v4/common"
"m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
)
type PSTrack struct {
track.Data[*util.ListItem[util.Buffer]]
PSM util.Buffer
}
func (ps *PSTrack) GetPSM() (result net.Buffers) {
psmLen := ps.PSM.Len()
return append(net.Buffers{[]byte{0, 0, 1, 0xbc, byte(psmLen >> 8), byte(psmLen)}}, ps.PSM)
}
func NewPSTrack(s common.IStream) *PSTrack {
result := &PSTrack{}
result.Init(20)
result.SetStuff("ps", s)
result.Reset = func(f *common.DataFrame[*util.ListItem[util.Buffer]]) {
f.Value.Recycle()
}
s.AddTrack(result)
return result
}