feat: pull rtmp

This commit is contained in:
langhuihui
2024-04-07 20:30:04 +08:00
parent 0cab4d4b07
commit dbbf711781
20 changed files with 447 additions and 106 deletions

View File

@@ -1,7 +1,10 @@
global:
loglevel: debug
rtmp:
chunksize: 65535
chunksize: 2048
subscribe:
# submode: 1
subaudio: false
subaudio: false
pull:
pullonsub:
live/pull: rtmp://localhost/live/test

BIN
example/default/default Executable file

Binary file not shown.

View File

@@ -0,0 +1,7 @@
global:
# loglevel: debug
rtmp:
chunksize: 2048
subscribe:
# submode: 1
subaudio: false

View File

@@ -0,0 +1,15 @@
global:
loglevel: debug
http:
listenaddr: :8081
listenaddrtls: :8555
rtmp:
chunksize: 2048
tcp:
listenaddr:
subscribe:
# submode: 1
subaudio: false
pull:
pullonstart:
live/pull: rtmp://localhost/live/test

19
example/multiple/main.go Normal file
View File

@@ -0,0 +1,19 @@
package main
import (
"context"
"time"
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/debug"
_ "m7s.live/m7s/v5/plugin/hdl"
_ "m7s.live/m7s/v5/plugin/rtmp"
)
func main() {
ctx := context.Background()
// ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100))
go m7s.Run(ctx, "config1.yaml")
time.Sleep(time.Second * 10)
m7s.NewServer().Run(ctx, "config2.yaml")
}

6
go.mod
View File

@@ -25,11 +25,11 @@ require (
github.com/onsi/ginkgo/v2 v2.9.5 // indirect
github.com/shirou/gopsutil/v3 v3.24.3
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/tools v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1
)

6
go.sum
View File

@@ -68,12 +68,16 @@ go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -81,6 +85,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=

View File

@@ -15,6 +15,16 @@ type AVFrame struct {
Timestamp time.Duration // 绝对时间戳
Wrap IAVFrame `json:"-" yaml:"-"` // 封装格式
}
func (frame *AVFrame) Reset() {
frame.DataFrame.Reset()
frame.Timestamp = 0
if frame.Wrap != nil {
frame.Wrap.Recycle()
frame.Wrap = nil
}
}
type DataFrame struct {
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Sequence uint32 // 在一个Track中的序号

View File

@@ -14,7 +14,7 @@ func (block Block) Split() (int, int) {
return block[0], block[1]
}
func (block Block) Combine(s, e int) (ret bool) {
func (block *Block) Combine(s, e int) (ret bool) {
if ret = block[0] == e; ret {
block[0] = s
} else if ret = block[1] == s; ret {
@@ -61,58 +61,98 @@ func (ma *MemoryAllocator) Malloc(size int) (memory []byte) {
return
}
func (ma *MemoryAllocator) Make(size int) (memory []byte) {
memory = ma.Malloc(size)
if memory == nil {
return make([]byte, size)
func (ma *MemoryAllocator) Free2(start, end int) bool {
if start < 0 || end > ma.Size {
return false
}
for e := ma.blocks.Front(); e != nil; e = e.Next() {
if e.Value.Combine(start, end) {
return true
}
if end < e.Value[0] {
ma.blocks.InsertBefore(Block{start, end}, e)
return true
}
}
ma.blocks.PushBack(Block{start, end})
return true
}
func (ma *MemoryAllocator) Free(mem []byte) bool {
ptr := uintptr(unsafe.Pointer(&mem[:1][0]))
start := int(int64(ptr) - ma.start)
return ma.Free2(start, start+len(mem))
}
type ScalableMemoryAllocator struct {
children []*MemoryAllocator
}
func NewScalableMemoryAllocator(size int, count int) (ret *ScalableMemoryAllocator) {
ret = &ScalableMemoryAllocator{
children: make([]*MemoryAllocator, count),
}
for i := range count {
ret.children[i] = NewMemoryAllocator(size)
}
return
}
func (ma *MemoryAllocator) Free2(start, end int) {
if start < 0 || end > ma.Size {
return
}
for e := ma.blocks.Front(); e != nil; e = e.Next() {
block := e.Value
if block.Combine(start, end) {
return
}
if end > block[0] {
ma.blocks.InsertBefore(Block{start, end}, e)
return
}
}
ma.blocks.PushBack(Block{start, end})
func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) {
memory, _, _, _ = sma.Malloc2(size)
return memory
}
func (ma *MemoryAllocator) Free(mem []byte) {
func (sma *ScalableMemoryAllocator) Malloc2(size int) (memory []byte, index, start, end int) {
for i, child := range sma.children {
index = i
if memory, start, end = child.Malloc2(size); memory != nil {
return
}
}
n := NewMemoryAllocator(max(sma.children[index].Size*2, size))
index++
memory, start, end = n.Malloc2(size)
sma.children = append(sma.children, n)
return
}
func (sma *ScalableMemoryAllocator) Free(mem []byte) bool {
ptr := uintptr(unsafe.Pointer(&mem[:1][0]))
start := int(int64(ptr) - ma.start)
ma.Free2(start, start+len(mem))
for _, child := range sma.children {
if start := int(int64(ptr) - child.start); child.Free2(start, start+len(mem)) {
return true
}
}
return false
}
func (sma *ScalableMemoryAllocator) Free2(index, start, end int) bool {
if index < 0 || index >= len(sma.children) {
return false
}
return sma.children[index].Free2(start, end)
}
type RecyclableMemory struct {
*MemoryAllocator
*ScalableMemoryAllocator
mem []int
}
func (r *RecyclableMemory) Malloc(size int) (memory []byte) {
ret, start, end := r.Malloc2(size)
if ret == nil {
return make([]byte, size)
}
if lastI := len(r.mem) - 1; lastI > 0 && r.mem[lastI] == start {
r.mem[lastI] = end
} else {
r.mem = append(r.mem, start, end)
}
ret, i, start, end := r.Malloc2(size)
// ml := len(r.mem)
// if lastI, lastE := ml-3, ml-1; lastI > 0 && r.mem[lastI] == i && r.mem[lastE] == start {
// r.mem[lastE] = end
// } else {
r.mem = append(r.mem, i, start, end)
// }
return ret
}
func (r *RecyclableMemory) Recycle() {
for i := 0; i < len(r.mem); i += 2 {
r.Free2(r.mem[i], r.mem[i+1])
for i := 0; i < len(r.mem); i += 3 {
r.Free2(r.mem[i], r.mem[i+1], r.mem[i+2])
}
r.mem = r.mem[:0]
}

View File

@@ -133,9 +133,8 @@ type Plugin struct {
Meta *PluginMeta
config config.Common
config.Config
Publishers []*Publisher
handler IPlugin
server *Server
handler IPlugin
server *Server
}
func (Plugin) nothing() {
@@ -237,12 +236,13 @@ func (p *Plugin) OnTCPConnect(conn *net.TCPConn) {
func (p *Plugin) Publish(streamPath string, options ...any) (publisher *Publisher, err error) {
publisher = &Publisher{Publish: p.config.Publish}
publisher.Init(p, streamPath, options...)
err = sendPromiseToServer(p.server, publisher)
return
return publisher, sendPromiseToServer(p.server, publisher)
}
func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Puller, err error) {
puller = &Puller{Pull: p.config.Pull}
puller = &Puller{Pull: p.config.Pull}
puller.Publish = p.config.Publish
puller.PublishTimeout = 0
puller.RemoteURL = url
puller.StreamPath = streamPath
puller.Init(p, streamPath, options...)
@@ -252,8 +252,7 @@ func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Pu
func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subscriber, err error) {
subscriber = &Subscriber{Subscribe: p.config.Subscribe}
subscriber.Init(p, streamPath, options...)
err = sendPromiseToServer(p.server, subscriber)
return
return subscriber, sendPromiseToServer(p.server, subscriber)
}
func (p *Plugin) registerHandler() {

View File

@@ -6,7 +6,6 @@ import (
"net"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
. "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
@@ -16,13 +15,26 @@ type RTMPPlugin struct {
KeepAlive bool
}
func (p *RTMPPlugin) OnInit() {
}
var _ = m7s.InstallPlugin[RTMPPlugin](m7s.DefaultYaml(`tcp:
listenaddr: :1935`))
func (p *RTMPPlugin) pull(streamPath, url string) {
puller, err := p.Pull(streamPath, url)
if err != nil {
p.Error("pull", "streamPath", streamPath, "url", url, "error", err)
return
}
var rtmpPuller RTMPPuller
rtmpPuller.Puller = puller
puller.Start(&rtmpPuller)
}
func (p *RTMPPlugin) OnInit() {
for streamPath, url := range p.GetCommonConf().PullOnStart {
go p.pull(streamPath, url)
}
}
func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
defer conn.Close()
logger := p.Logger.With("remote", conn.RemoteAddr().String())
@@ -146,9 +158,6 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
delete(receivers, cmd.StreamId)
err = receiver.Response(cmd.TransactionId, NetStream_Publish_BadName, Level_Error)
} else {
if nc.ByteChunkPool.Size != 1<<20 {
nc.ByteChunkPool = util.NewMemoryAllocator(1 << 20)
}
receivers[cmd.StreamId] = receiver
err = receiver.BeginPublish(cmd.TransactionId)
}
@@ -180,7 +189,7 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
if r, ok := receivers[msg.MessageStreamID]; ok {
r.WriteAudio(&RTMPAudio{msg.AVData})
msg.AVData = RTMPData{}
msg.AVData.MemoryAllocator = nc.ByteChunkPool
msg.AVData.ScalableMemoryAllocator = nc.ByteChunkPool
} else {
logger.Warn("ReceiveAudio", "MessageStreamID", msg.MessageStreamID)
}
@@ -188,7 +197,7 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
if r, ok := receivers[msg.MessageStreamID]; ok {
r.WriteVideo(&RTMPVideo{msg.AVData})
msg.AVData = RTMPData{}
msg.AVData.MemoryAllocator = nc.ByteChunkPool
msg.AVData.ScalableMemoryAllocator = nc.ByteChunkPool
} else {
logger.Warn("ReceiveVideo", "MessageStreamID", msg.MessageStreamID)
}

96
plugin/rtmp/pkg/client.go Normal file
View File

@@ -0,0 +1,96 @@
package rtmp
import (
"crypto/tls"
"errors"
"fmt"
"log/slog"
"net"
"net/url"
"strings"
"m7s.live/m7s/v5"
)
func NewRTMPClient(addr string, logger *slog.Logger, chunkSize int) (client *NetConnection, err error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
ps := strings.Split(u.Path, "/")
if len(ps) < 3 {
return nil, errors.New("illegal rtmp url")
}
isRtmps := u.Scheme == "rtmps"
if strings.Count(u.Host, ":") == 0 {
if isRtmps {
u.Host += ":443"
} else {
u.Host += ":1935"
}
}
var conn net.Conn
if isRtmps {
var tlsconn *tls.Conn
tlsconn, err = tls.Dial("tcp", u.Host, &tls.Config{})
conn = tlsconn
} else {
conn, err = net.Dial("tcp", u.Host)
}
if err != nil {
return nil, err
}
defer func() {
if err != nil || client == nil {
conn.Close()
}
}()
client = NewNetConnection(conn)
client.Logger = logger
err = client.ClientHandshake()
if err != nil {
return nil, err
}
client.AppName = strings.Join(ps[1:len(ps)-1], "/")
err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(chunkSize))
if err != nil {
return
}
client.WriteChunkSize = chunkSize
path := u.Path
if len(u.Query()) != 0 {
path += "?" + u.RawQuery
}
err = client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{
CommandMessage{"connect", 1},
map[string]any{
"app": client.AppName,
"flashVer": "monibuca/" + m7s.Version,
"swfUrl": addr,
"tcUrl": strings.TrimSuffix(addr, path) + "/" + client.AppName,
},
nil,
})
for err != nil {
msg, err := client.RecvMessage()
if err != nil {
return nil, err
}
switch msg.MessageTypeID {
case RTMP_MSG_AMF0_COMMAND:
cmd := msg.MsgData.(Commander).GetCommand()
switch cmd.CommandName {
case "_result":
response := msg.MsgData.(*ResponseMessage)
if response.Infomation["code"] == NetConnection_Connect_Success {
return client, nil
} else {
return nil, err
}
default:
fmt.Println(cmd.CommandName)
}
}
}
return
}

View File

@@ -67,7 +67,7 @@ var (
// C2 S2 : 参考C1 S1
func (nc *NetConnection) ReadBuf(length int) (buf []byte, err error) {
buf = nc.ByteChunkPool.Make(length)
buf = nc.ByteChunkPool.Malloc(length)
_, err = io.ReadFull(nc.Reader, buf)
return
}
@@ -96,7 +96,7 @@ func (nc *NetConnection) Handshake() error {
}
func (client *NetConnection) ClientHandshake() (err error) {
C0C1 := client.ByteChunkPool.Make(C1S1_SIZE + 1)
C0C1 := client.ByteChunkPool.Malloc(C1S1_SIZE + 1)
defer client.ByteChunkPool.Free(C0C1)
C0C1[0] = RTMP_HANDSHAKE_VERSION
if _, err = client.Write(C0C1); err == nil {
@@ -114,19 +114,19 @@ func (client *NetConnection) ClientHandshake() (err error) {
}
func (nc *NetConnection) simple_handshake(C1 []byte) error {
S0S1 := nc.ByteChunkPool.Make(C1S1_SIZE + 1)
S0S1 := nc.ByteChunkPool.Malloc(C1S1_SIZE + 1)
S0S1[0] = RTMP_HANDSHAKE_VERSION
util.PutBE(S0S1[1:5], time.Now().Unix()&0xFFFFFFFF)
copy(S0S1[5:], "Monibuca")
nc.Write(S0S1)
nc.Write(C1) // S2
nc.ByteChunkPool.Free(S0S1)
defer nc.ByteChunkPool.Free(S0S1)
C2, err := nc.ReadBuf(C1S1_SIZE)
defer nc.ByteChunkPool.Free(C2)
if err != nil {
return err
}
if bytes.Compare(C2[8:], S0S1[9:]) != 0 {
if !bytes.Equal(C2[8:], S0S1[9:]) {
return errors.New("C2 Error")
}
return nil

View File

@@ -2,6 +2,7 @@ package rtmp
import (
"errors"
"net"
"runtime"
"m7s.live/m7s/v5"
@@ -36,12 +37,14 @@ func (av *AVSender) sendFrame(frame *RTMPData) (err error) {
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// 当Chunk Type为0时(即Chunk12),
var chunkHeader util.Buffer = av.mem.Malloc(16)
defer av.mem.Recycle()
if av.lastAbs == 0 {
av.SetTimestamp(frame.Timestamp)
av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
av.WriteTo(RTMP_CHUNK_HEAD_12, &chunkHeader)
} else {
av.SetTimestamp(frame.Timestamp - av.lastAbs)
av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
av.WriteTo(RTMP_CHUNK_HEAD_8, &chunkHeader)
}
av.lastAbs = frame.Timestamp
// //数据被覆盖导致序号变了
@@ -49,21 +52,21 @@ func (av *AVSender) sendFrame(frame *RTMPData) (err error) {
// return errors.New("sequence is not equal")
// }
r := frame.Buffers
chunkHeader := av.chunkHeader
av.chunk = append(av.chunk, chunkHeader)
var chunks net.Buffers
// av.chunk = append(av.chunk, chunkHeader)
chunks = append(chunks, chunkHeader)
// var buffer util.Buffer = r.ToBytes()
av.writeSeqNum += uint32(chunkHeader.Len() + r.WriteNTo(av.WriteChunkSize, &av.chunk))
if r.Length > 0 {
defer av.mem.Recycle()
for r.Length > 0 {
chunkHeader = av.mem.Malloc(5)
av.WriteTo(RTMP_CHUNK_HEAD_1, &chunkHeader)
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
av.chunk = append(av.chunk, chunkHeader)
av.writeSeqNum += uint32(chunkHeader.Len() + r.WriteNTo(av.WriteChunkSize, &av.chunk))
}
r.WriteNTo(av.WriteChunkSize, &chunks)
for r.Length > 0 {
chunkHeader = av.mem.Malloc(5)
av.WriteTo(RTMP_CHUNK_HEAD_1, &chunkHeader)
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
chunks = append(chunks, chunkHeader)
r.WriteNTo(av.WriteChunkSize, &chunks)
}
_, err = av.chunk.WriteTo(av.Conn)
var nw int64
nw, err = chunks.WriteTo(av.Conn)
av.writeSeqNum += uint32(nw)
return err
}
@@ -83,7 +86,7 @@ func (r *RTMPSender) Init() {
r.video.MessageTypeID = RTMP_MSG_VIDEO
r.audio.MessageStreamID = r.StreamID
r.video.MessageStreamID = r.StreamID
r.mem.MemoryAllocator = r.ByteChunkPool
r.mem.ScalableMemoryAllocator = r.ByteChunkPool
}
// func (rtmp *RTMPSender) OnEvent(event any) {

View File

@@ -61,7 +61,7 @@ type NetConnection struct {
AppName string
tmpBuf util.Buffer //用来接收/发送小数据,复用内存
chunkHeader util.Buffer
ByteChunkPool *util.MemoryAllocator
ByteChunkPool *util.ScalableMemoryAllocator
chunk net.Buffers
writing atomic.Bool // false 可写true 不可写
}
@@ -76,9 +76,10 @@ func NewNetConnection(conn net.Conn) *NetConnection {
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
tmpBuf: make(util.Buffer, 4),
chunkHeader: make(util.Buffer, 0, 16),
ByteChunkPool: util.NewMemoryAllocator(2048),
ByteChunkPool: util.NewScalableMemoryAllocator(2048, 1),
}
}
func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
n, err = io.ReadFull(conn.Reader, buf)
if err == nil {
@@ -139,7 +140,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
if !ok {
chunk = &Chunk{}
conn.incommingChunks[ChunkStreamID] = chunk
chunk.AVData.MemoryAllocator = conn.ByteChunkPool
chunk.AVData.ScalableMemoryAllocator = conn.ByteChunkPool
}
if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil {

77
plugin/rtmp/pkg/puller.go Normal file
View File

@@ -0,0 +1,77 @@
package rtmp
import (
"net/url"
"strings"
"m7s.live/m7s/v5"
)
type RTMPPuller struct {
*m7s.Puller
NetStream
}
func (puller *RTMPPuller) Connect() (err error) {
if puller.NetConnection, err = NewRTMPClient(puller.RemoteURL, puller.Publisher.Logger, 4096); err == nil {
puller.Closer = puller.NetConnection.Conn
puller.Info("connect", "remoteURL", puller.RemoteURL)
}
return
}
func (puller *RTMPPuller) Disconnect() {
if puller.NetConnection != nil {
puller.NetConnection.Close()
}
}
func (puller *RTMPPuller) Pull() (err error) {
err = puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
for err == nil {
msg, err := puller.RecvMessage()
if err != nil {
return err
}
switch msg.MessageTypeID {
case RTMP_MSG_AUDIO:
puller.WriteAudio(&RTMPAudio{msg.AVData})
msg.AVData = RTMPData{}
msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ByteChunkPool
case RTMP_MSG_VIDEO:
puller.WriteVideo(&RTMPVideo{msg.AVData})
msg.AVData = RTMPData{}
msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ByteChunkPool
case RTMP_MSG_AMF0_COMMAND:
cmd := msg.MsgData.(Commander).GetCommand()
switch cmd.CommandName {
case "_result":
if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok {
puller.StreamID = response.StreamId
m := &PlayMessage{}
m.StreamId = response.StreamId
m.TransactionId = 4
m.CommandMessage.CommandName = "play"
URL, _ := url.Parse(puller.RemoteURL)
ps := strings.Split(URL.Path, "/")
puller.Args = URL.Query()
m.StreamName = ps[len(ps)-1]
if len(puller.Args) > 0 {
m.StreamName += "?" + puller.Args.Encode()
}
puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// if response, ok := msg.MsgData.(*ResponsePlayMessage); ok {
// if response.Object["code"] == "NetStream.Play.Start" {
// } else if response.Object["level"] == Level_Error {
// return errors.New(response.Object["code"].(string))
// }
// } else {
// return errors.New("pull faild")
// }
}
}
}
}
return
}

View File

@@ -85,7 +85,6 @@ func (avcc *RTMPVideo) DecodeConfig(track *AVTrack) error {
if err = parseSequence(); err != nil {
return err
}
} else {
}
}
return nil

View File

@@ -34,7 +34,9 @@ type Publisher struct {
func (p *Publisher) timeout() (err error) {
switch p.State {
case PublisherStateInit:
err = ErrPublishTimeout
if p.PublishTimeout > 0 {
err = ErrPublishTimeout
}
case PublisherStateTrackAdded:
if p.Publish.IdleTimeout > 0 {
err = ErrPublishIdleTimeout
@@ -53,11 +55,13 @@ func (p *Publisher) checkTimeout() (err error) {
case <-p.TimeoutTimer.C:
err = p.timeout()
default:
if p.VideoTrack != nil && !p.VideoTrack.LastValue.WriteTime.IsZero() && time.Since(p.VideoTrack.LastValue.WriteTime) > p.PublishTimeout {
err = ErrPublishTimeout
}
if p.AudioTrack != nil && !p.AudioTrack.LastValue.WriteTime.IsZero() && time.Since(p.AudioTrack.LastValue.WriteTime) > p.PublishTimeout {
err = ErrPublishTimeout
if p.PublishTimeout > 0 {
if p.VideoTrack != nil && !p.VideoTrack.LastValue.WriteTime.IsZero() && time.Since(p.VideoTrack.LastValue.WriteTime) > p.PublishTimeout {
err = ErrPublishTimeout
}
if p.AudioTrack != nil && !p.AudioTrack.LastValue.WriteTime.IsZero() && time.Since(p.AudioTrack.LastValue.WriteTime) > p.PublishTimeout {
err = ErrPublishTimeout
}
}
}
return
@@ -67,6 +71,7 @@ func (p *Publisher) RemoveSubscriber(subscriber *Subscriber) (err error) {
p.Lock()
defer p.Unlock()
delete(p.Subscribers, subscriber)
p.Info("subscriber -1", "count", len(p.Subscribers))
if p.State == PublisherStateSubscribed && len(p.Subscribers) == 0 {
p.State = PublisherStateWaitSubscriber
if p.DelayCloseTimeout > 0 {
@@ -81,10 +86,13 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) {
defer p.Unlock()
subscriber.Publisher = p
p.Subscribers[subscriber] = struct{}{}
p.Info("subscriber +1", "count", len(p.Subscribers))
switch p.State {
case PublisherStateTrackAdded, PublisherStateWaitSubscriber:
p.State = PublisherStateSubscribed
p.TimeoutTimer.Reset(p.PublishTimeout)
if p.PublishTimeout > 0 {
p.TimeoutTimer.Reset(p.PublishTimeout)
}
}
return
}
@@ -93,10 +101,8 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
frame := &t.Value
frame.Wrap = data
frame.Timestamp = data.GetTimestamp()
p.Debug("write", "seq", frame.Sequence)
t.Step()
if t.Value.Wrap != nil {
t.Value.Wrap.Recycle()
}
}
func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
@@ -124,12 +130,12 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
if data.IsIDR() {
if t.IDRing != nil {
p.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence)
if t.HistoryRing == nil {
if l := t.Size - p.GOP; l > 12 {
t.Debug("resize", "gop", p.GOP, "before", t.Size, "after", t.Size-5)
t.Reduce(5) //缩小缓冲环节省内存
}
}
// if t.HistoryRing == nil {
// if l := t.Size - p.GOP; l > 12 {
// t.Debug("resize", "gop", p.GOP, "before", t.Size, "after", t.Size-5)
// t.Reduce(5) //缩小缓冲环节省内存
// }
// }
}
if p.BufferTime > 0 {
t.IDRingList.AddIDR(t.Ring)

View File

@@ -1,13 +1,16 @@
package m7s
import "m7s.live/m7s/v5/pkg/config"
import (
"io"
"time"
"m7s.live/m7s/v5/pkg/config"
)
type PullHandler interface {
Connect() error
OnConnected()
Disconnect()
Pull() error
Reconnect() bool
}
type Puller struct {
@@ -18,6 +21,48 @@ type Puller struct {
ReConnectCount int //重连次数
}
func (p *Puller) Start() error {
// 是否需要重连
func (p *Puller) reconnect() (ok bool) {
ok = p.RePull == -1 || p.ReConnectCount <= p.RePull
p.ReConnectCount++
return
}
func (p *Puller) Start(handler PullHandler) (err error) {
p.PullHandler = handler
badPuller := true
var startTime time.Time
for p.Info("start pull"); p.reconnect(); p.Warn("restart pull") {
if time.Since(startTime) < 5*time.Second {
time.Sleep(5 * time.Second)
}
startTime = time.Now()
if err = p.Connect(); err != nil {
if err == io.EOF {
p.Info("pull complete")
return
}
p.Error("pull connect", "error", err)
if badPuller {
return
}
} else {
badPuller = false
p.ReConnectCount = 0
if err = handler.Pull(); err != nil && !p.IsStopped() {
p.Error("pull interrupt", "error", err)
}
}
if p.IsStopped() {
p.Info("stop pull")
return
}
handler.Disconnect()
}
return nil
}
func (p *Puller) Stop(err error) {
p.Disconnect()
p.Publisher.Stop(err)
}

View File

@@ -52,6 +52,7 @@ type Server struct {
func NewServer() (s *Server) {
s = &Server{
Streams: make(map[string]*Publisher),
Pulls: make(map[string]*Puller),
Waiting: make(map[string][]*Subscriber),
eventChan: make(chan any, 10),
}
@@ -202,10 +203,10 @@ func (s *Server) eventLoop() {
plugin.onEvent(event)
}
default:
if subStart := 3 + pubCount; chosen < subStart {
s.onUnpublish(s.Publishers[chosen-3])
if subStart, pubIndex := 3+pubCount, chosen-3; chosen < subStart {
s.onUnpublish(s.Publishers[pubIndex])
pubCount--
s.Publishers = slices.Delete(s.Publishers, chosen-3, chosen-2)
s.Publishers = slices.Delete(s.Publishers, pubIndex, pubIndex+1)
} else {
i := chosen - subStart
s.onUnsubscribe(s.Subscribers[i])
@@ -218,6 +219,7 @@ func (s *Server) eventLoop() {
}
func (s *Server) onUnsubscribe(subscriber *Subscriber) {
s.Info("unsubscribe", "streamPath", subscriber.StreamPath)
if subscriber.Publisher != nil {
subscriber.Publisher.RemoveSubscriber(subscriber)
}
@@ -225,10 +227,14 @@ func (s *Server) onUnsubscribe(subscriber *Subscriber) {
func (s *Server) onUnpublish(publisher *Publisher) {
delete(s.Streams, publisher.StreamPath)
s.Info("unpublish", "streamPath", publisher.StreamPath, "count", len(s.Streams))
for subscriber := range publisher.Subscribers {
s.Waiting[publisher.StreamPath] = append(s.Waiting[publisher.StreamPath], subscriber)
subscriber.TimeoutTimer.Reset(publisher.WaitCloseTimeout)
}
if puller, ok := s.Pulls[publisher.StreamPath]; ok {
puller.Disconnect()
}
}
func (s *Server) OnPublish(publisher *Publisher) error {