mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-10-23 13:13:10 +08:00
refactor: simplify code
This commit is contained in:
15
api.go
15
api.go
@@ -8,18 +8,13 @@ import (
|
||||
"m7s.live/m7s/v5/pkg/pb"
|
||||
)
|
||||
|
||||
type StreamSnapShot struct {
|
||||
StreamPath string
|
||||
*Publisher
|
||||
}
|
||||
|
||||
func (s *Server) StreamSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamSnapShot, err error) {
|
||||
snap := &StreamSnapShot{StreamPath: req.StreamPath}
|
||||
err = sendPromiseToServer(s, snap)
|
||||
if snap.Publisher == nil {
|
||||
result, err := s.Call(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return snap.SnapShot(), nil
|
||||
puber := result.(*Publisher)
|
||||
return puber.SnapShot(), nil
|
||||
}
|
||||
|
||||
func (s *Server) Restart(ctx context.Context, req *pb.RequestWithId) (res *emptypb.Empty, err error) {
|
||||
@@ -39,7 +34,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *empt
|
||||
}
|
||||
|
||||
func (s *Server) StopSubscribe(ctx context.Context, req *pb.StopSubscribeRequest) (res *pb.StopSubscribeResponse, err error) {
|
||||
err = sendPromiseToServer(s, req)
|
||||
_, err = s.Call(req)
|
||||
return &pb.StopSubscribeResponse{
|
||||
Success: err == nil,
|
||||
}, err
|
||||
|
20
plugin.go
20
plugin.go
@@ -14,7 +14,6 @@ import (
|
||||
"gopkg.in/yaml.v3"
|
||||
. "m7s.live/m7s/v5/pkg"
|
||||
"m7s.live/m7s/v5/pkg/config"
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
)
|
||||
|
||||
type DefaultYaml string
|
||||
@@ -116,16 +115,6 @@ func InstallPlugin[C iPlugin](options ...any) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func sendPromiseToServer[T any](server *Server, value T) (err error) {
|
||||
promise := util.NewPromise(value)
|
||||
server.eventChan <- promise
|
||||
<-promise.Done()
|
||||
if err = context.Cause(promise.Context); err == util.ErrResolve {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type Plugin struct {
|
||||
Unit
|
||||
Disabled bool
|
||||
@@ -236,7 +225,8 @@ func (p *Plugin) OnTCPConnect(conn *net.TCPConn) {
|
||||
func (p *Plugin) Publish(streamPath string, options ...any) (publisher *Publisher, err error) {
|
||||
publisher = &Publisher{Publish: p.config.Publish}
|
||||
publisher.Init(p, streamPath, options...)
|
||||
return publisher, sendPromiseToServer(p.server, publisher)
|
||||
_, err = p.server.Call(publisher)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Puller, err error) {
|
||||
@@ -256,13 +246,15 @@ func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Pu
|
||||
}()
|
||||
}
|
||||
}
|
||||
return puller, sendPromiseToServer(p.server, puller)
|
||||
_, err = p.server.Call(puller)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subscriber, err error) {
|
||||
subscriber = &Subscriber{Subscribe: p.config.Subscribe}
|
||||
subscriber.Init(p, streamPath, options...)
|
||||
return subscriber, sendPromiseToServer(p.server, subscriber)
|
||||
_, err = p.server.Call(subscriber)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *Plugin) registerHandler() {
|
||||
|
@@ -6,7 +6,6 @@ import (
|
||||
"m7s.live/m7s/v5"
|
||||
"m7s.live/m7s/v5/pkg"
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
"m7s.live/m7s/v5/plugin/rtmp/pkg"
|
||||
)
|
||||
|
||||
type AnnexB struct {
|
||||
@@ -57,13 +56,4 @@ type DemoPlugin struct {
|
||||
// }
|
||||
// }
|
||||
|
||||
func (p *DemoPlugin) OnPublish(publisher *m7s.Publisher) {
|
||||
subscriber, err := p.Subscribe(publisher.StreamPath)
|
||||
if err == nil {
|
||||
go subscriber.Handle(nil, func(v *rtmp.RTMPVideo) {
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var _ = m7s.InstallPlugin[DemoPlugin]()
|
||||
|
@@ -108,10 +108,11 @@ func (p *HDLPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
defer binary.BigEndian.PutUint32(b[:4], uint32(data.Length)+11)
|
||||
return gotFlvTag(append(net.Buffers{b[:]}, data.Buffers.Buffers...))
|
||||
}
|
||||
sub.Handle(func(audio *rtmp.RTMPAudio) error {
|
||||
return rtmpData2FlvTag(FLV_TAG_TYPE_AUDIO, &audio.RTMPData)
|
||||
}, func(video *rtmp.RTMPVideo) error {
|
||||
return rtmpData2FlvTag(FLV_TAG_TYPE_VIDEO, &video.RTMPData)
|
||||
})
|
||||
sub.Handle(m7s.SubscriberHandler{
|
||||
OnAudio: func(audio *rtmp.RTMPAudio) error {
|
||||
return rtmpData2FlvTag(FLV_TAG_TYPE_AUDIO, &audio.RTMPData)
|
||||
}, OnVideo: func(video *rtmp.RTMPVideo) error {
|
||||
return rtmpData2FlvTag(FLV_TAG_TYPE_VIDEO, &video.RTMPData)
|
||||
}})
|
||||
gotFlvTag(net.Buffers{b[:4]})
|
||||
}
|
||||
|
@@ -27,7 +27,6 @@ func (p *RTMPPlugin) OnInit() {
|
||||
func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
|
||||
defer conn.Close()
|
||||
logger := p.Logger.With("remote", conn.RemoteAddr().String())
|
||||
senders := make(map[uint32]*RTMPSender)
|
||||
receivers := make(map[uint32]*RTMPReceiver)
|
||||
var err error
|
||||
logger.Info("conn")
|
||||
@@ -156,18 +155,32 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
|
||||
}
|
||||
case *PlayMessage:
|
||||
streamPath := nc.AppName + "/" + cmd.StreamName
|
||||
sender := &RTMPSender{}
|
||||
sender.NetConnection = nc
|
||||
sender.StreamID = cmd.StreamId
|
||||
ns := NetStream{
|
||||
NetConnection: nc,
|
||||
StreamID: cmd.StreamId,
|
||||
}
|
||||
var suber *m7s.Subscriber
|
||||
// sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
|
||||
sender.Subscriber, err = p.Subscribe(streamPath, ctx, conn)
|
||||
suber, err = p.Subscribe(streamPath, ctx, conn)
|
||||
if err != nil {
|
||||
err = sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
|
||||
err = ns.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
|
||||
} else {
|
||||
senders[sender.StreamID] = sender
|
||||
sender.BeginPlay(cmd.TransactionId)
|
||||
sender.Init()
|
||||
go sender.Handle(sender.SendAudio, sender.SendVideo)
|
||||
ns.BeginPlay(cmd.TransactionId)
|
||||
var audio, video AVSender
|
||||
audio.NetConnection = nc
|
||||
video.NetConnection = nc
|
||||
audio.ChunkStreamID = RTMP_CSID_AUDIO
|
||||
video.ChunkStreamID = RTMP_CSID_VIDEO
|
||||
audio.MessageTypeID = RTMP_MSG_AUDIO
|
||||
video.MessageTypeID = RTMP_MSG_VIDEO
|
||||
audio.MessageStreamID = ns.StreamID
|
||||
video.MessageStreamID = ns.StreamID
|
||||
go suber.Handle(m7s.SubscriberHandler{
|
||||
OnAudio: func(a *RTMPAudio) error {
|
||||
return audio.SendFrame(&a.RTMPData)
|
||||
}, OnVideo: func(v *RTMPVideo) error {
|
||||
return video.SendFrame(&v.RTMPData)
|
||||
}})
|
||||
}
|
||||
if err != nil {
|
||||
logger.Error("sendMessage play", "error", err)
|
||||
@@ -178,7 +191,7 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
|
||||
if r, ok := receivers[msg.MessageStreamID]; ok {
|
||||
r.WriteAudio(&RTMPAudio{msg.AVData})
|
||||
msg.AVData = RTMPData{}
|
||||
msg.AVData.ScalableMemoryAllocator = nc.ByteChunkPool
|
||||
msg.AVData.ScalableMemoryAllocator = nc.ReadPool
|
||||
} else {
|
||||
logger.Warn("ReceiveAudio", "MessageStreamID", msg.MessageStreamID)
|
||||
}
|
||||
@@ -186,7 +199,7 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
|
||||
if r, ok := receivers[msg.MessageStreamID]; ok {
|
||||
r.WriteVideo(&RTMPVideo{msg.AVData})
|
||||
msg.AVData = RTMPData{}
|
||||
msg.AVData.ScalableMemoryAllocator = nc.ByteChunkPool
|
||||
msg.AVData.ScalableMemoryAllocator = nc.ReadPool
|
||||
} else {
|
||||
logger.Warn("ReceiveVideo", "MessageStreamID", msg.MessageStreamID)
|
||||
}
|
||||
|
@@ -80,3 +80,24 @@ func (h *ChunkHeader) WriteTo(t byte, b *util.Buffer) {
|
||||
b.WriteUint32(h.ExtendTimestamp)
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
ChunkHeader8 ChunkHeader
|
||||
ChunkHeader12 ChunkHeader
|
||||
ChunkHeader1 ChunkHeader
|
||||
IChunkHeader interface {
|
||||
WriteTo(*util.Buffer)
|
||||
}
|
||||
)
|
||||
|
||||
func (h *ChunkHeader8) WriteTo(b *util.Buffer) {
|
||||
(*ChunkHeader)(h).WriteTo(RTMP_CHUNK_HEAD_8, b)
|
||||
}
|
||||
|
||||
func (h *ChunkHeader12) WriteTo(b *util.Buffer) {
|
||||
(*ChunkHeader)(h).WriteTo(RTMP_CHUNK_HEAD_12, b)
|
||||
}
|
||||
|
||||
func (h *ChunkHeader1) WriteTo(b *util.Buffer) {
|
||||
(*ChunkHeader)(h).WriteTo(RTMP_CHUNK_HEAD_1, b)
|
||||
}
|
||||
|
@@ -67,14 +67,14 @@ var (
|
||||
// C2 S2 : 参考C1 S1
|
||||
|
||||
func (nc *NetConnection) ReadBuf(length int) (buf []byte, err error) {
|
||||
buf = nc.ByteChunkPool.Malloc(length)
|
||||
buf = nc.ReadPool.Malloc(length)
|
||||
_, err = io.ReadFull(nc.Reader, buf)
|
||||
return
|
||||
}
|
||||
|
||||
func (nc *NetConnection) Handshake() error {
|
||||
C0C1, err := nc.ReadBuf(C1S1_SIZE + 1)
|
||||
defer nc.ByteChunkPool.Free(C0C1)
|
||||
defer nc.ReadPool.Free(C0C1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -96,8 +96,8 @@ func (nc *NetConnection) Handshake() error {
|
||||
}
|
||||
|
||||
func (client *NetConnection) ClientHandshake() (err error) {
|
||||
C0C1 := client.ByteChunkPool.Malloc(C1S1_SIZE + 1)
|
||||
defer client.ByteChunkPool.Free(C0C1)
|
||||
C0C1 := client.ReadPool.Malloc(C1S1_SIZE + 1)
|
||||
defer client.ReadPool.Free(C0C1)
|
||||
C0C1[0] = RTMP_HANDSHAKE_VERSION
|
||||
if _, err = client.Write(C0C1); err == nil {
|
||||
// read S0 S1
|
||||
@@ -114,15 +114,15 @@ func (client *NetConnection) ClientHandshake() (err error) {
|
||||
}
|
||||
|
||||
func (nc *NetConnection) simple_handshake(C1 []byte) error {
|
||||
S0S1 := nc.ByteChunkPool.Malloc(C1S1_SIZE + 1)
|
||||
S0S1 := nc.ReadPool.Malloc(C1S1_SIZE + 1)
|
||||
S0S1[0] = RTMP_HANDSHAKE_VERSION
|
||||
util.PutBE(S0S1[1:5], time.Now().Unix()&0xFFFFFFFF)
|
||||
copy(S0S1[5:], "Monibuca")
|
||||
nc.Write(S0S1)
|
||||
nc.Write(C1) // S2
|
||||
defer nc.ByteChunkPool.Free(S0S1)
|
||||
defer nc.ReadPool.Free(S0S1)
|
||||
C2, err := nc.ReadBuf(C1S1_SIZE)
|
||||
defer nc.ByteChunkPool.Free(C2)
|
||||
defer nc.ReadPool.Free(C2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -182,7 +182,7 @@ func (nc *NetConnection) complex_handshake(C1 []byte) error {
|
||||
buffer := net.Buffers{[]byte{RTMP_HANDSHAKE_VERSION}, S1, S2_Random, S2_Digest}
|
||||
buffer.WriteTo(nc)
|
||||
b, _ := nc.ReadBuf(1536)
|
||||
nc.ByteChunkPool.Free(b)
|
||||
nc.ReadPool.Free(b)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -2,20 +2,18 @@ package rtmp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"runtime"
|
||||
|
||||
"m7s.live/m7s/v5"
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
)
|
||||
|
||||
type AVSender struct {
|
||||
*RTMPSender
|
||||
*NetConnection
|
||||
ChunkHeader
|
||||
lastAbs uint32
|
||||
}
|
||||
|
||||
func (av *AVSender) sendFrame(frame *RTMPData) (err error) {
|
||||
func (av *AVSender) SendFrame(frame *RTMPData) (err error) {
|
||||
// seq := frame.Sequence
|
||||
payloadLen := frame.Length
|
||||
if payloadLen == 0 {
|
||||
@@ -37,58 +35,21 @@ func (av *AVSender) sendFrame(frame *RTMPData) (err error) {
|
||||
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
|
||||
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
|
||||
// 当Chunk Type为0时(即Chunk12),
|
||||
var chunkHeader util.Buffer = av.mem.Malloc(16)
|
||||
defer av.mem.Recycle()
|
||||
if av.lastAbs == 0 {
|
||||
av.SetTimestamp(frame.Timestamp)
|
||||
av.WriteTo(RTMP_CHUNK_HEAD_12, &chunkHeader)
|
||||
err = av.sendChunk(frame.Buffers, &av.ChunkHeader, RTMP_CHUNK_HEAD_12)
|
||||
} else {
|
||||
av.SetTimestamp(frame.Timestamp - av.lastAbs)
|
||||
av.WriteTo(RTMP_CHUNK_HEAD_8, &chunkHeader)
|
||||
err = av.sendChunk(frame.Buffers, &av.ChunkHeader, RTMP_CHUNK_HEAD_8)
|
||||
}
|
||||
av.lastAbs = frame.Timestamp
|
||||
// //数据被覆盖导致序号变了
|
||||
// if seq != frame.Sequence {
|
||||
// return errors.New("sequence is not equal")
|
||||
// }
|
||||
r := frame.Buffers
|
||||
var chunks net.Buffers
|
||||
// av.chunk = append(av.chunk, chunkHeader)
|
||||
chunks = append(chunks, chunkHeader)
|
||||
// var buffer util.Buffer = r.ToBytes()
|
||||
r.WriteNTo(av.WriteChunkSize, &chunks)
|
||||
for r.Length > 0 {
|
||||
chunkHeader = av.mem.Malloc(5)
|
||||
av.WriteTo(RTMP_CHUNK_HEAD_1, &chunkHeader)
|
||||
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
|
||||
chunks = append(chunks, chunkHeader)
|
||||
r.WriteNTo(av.WriteChunkSize, &chunks)
|
||||
}
|
||||
var nw int64
|
||||
nw, err = chunks.WriteTo(av.Conn)
|
||||
av.writeSeqNum += uint32(nw)
|
||||
return err
|
||||
}
|
||||
|
||||
type RTMPSender struct {
|
||||
*m7s.Subscriber
|
||||
NetStream
|
||||
audio, video AVSender
|
||||
mem util.RecyclableMemory
|
||||
}
|
||||
|
||||
func (r *RTMPSender) Init() {
|
||||
r.audio.RTMPSender = r
|
||||
r.video.RTMPSender = r
|
||||
r.audio.ChunkStreamID = RTMP_CSID_AUDIO
|
||||
r.video.ChunkStreamID = RTMP_CSID_VIDEO
|
||||
r.audio.MessageTypeID = RTMP_MSG_AUDIO
|
||||
r.video.MessageTypeID = RTMP_MSG_VIDEO
|
||||
r.audio.MessageStreamID = r.StreamID
|
||||
r.video.MessageStreamID = r.StreamID
|
||||
r.mem.ScalableMemoryAllocator = r.ByteChunkPool
|
||||
}
|
||||
|
||||
// func (rtmp *RTMPSender) OnEvent(event any) {
|
||||
// switch v := event.(type) {
|
||||
// case SEwaitPublish:
|
||||
@@ -114,13 +75,6 @@ func (r *RTMPSender) Init() {
|
||||
// }
|
||||
// }
|
||||
|
||||
func (r *RTMPSender) SendAudio(audio *RTMPAudio) error {
|
||||
return r.audio.sendFrame(&audio.RTMPData)
|
||||
}
|
||||
|
||||
func (r *RTMPSender) SendVideo(video *RTMPVideo) error {
|
||||
return r.video.sendFrame(&video.RTMPData)
|
||||
}
|
||||
|
||||
type RTMPReceiver struct {
|
||||
*m7s.Publisher
|
||||
|
@@ -61,13 +61,13 @@ type NetConnection struct {
|
||||
AppName string
|
||||
tmpBuf util.Buffer //用来接收/发送小数据,复用内存
|
||||
chunkHeader util.Buffer
|
||||
ByteChunkPool *util.ScalableMemoryAllocator
|
||||
chunk net.Buffers
|
||||
ReadPool *util.ScalableMemoryAllocator
|
||||
WritePool util.RecyclableMemory
|
||||
writing atomic.Bool // false 可写,true 不可写
|
||||
}
|
||||
|
||||
func NewNetConnection(conn net.Conn) *NetConnection {
|
||||
return &NetConnection{
|
||||
func NewNetConnection(conn net.Conn) (ret *NetConnection) {
|
||||
ret = &NetConnection{
|
||||
Conn: conn,
|
||||
Reader: bufio.NewReader(conn),
|
||||
WriteChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
|
||||
@@ -76,8 +76,10 @@ func NewNetConnection(conn net.Conn) *NetConnection {
|
||||
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
|
||||
tmpBuf: make(util.Buffer, 4),
|
||||
chunkHeader: make(util.Buffer, 0, 16),
|
||||
ByteChunkPool: util.NewScalableMemoryAllocator(2048),
|
||||
ReadPool: util.NewScalableMemoryAllocator(2048),
|
||||
}
|
||||
ret.WritePool.ScalableMemoryAllocator = util.NewScalableMemoryAllocator(1024)
|
||||
return
|
||||
}
|
||||
|
||||
func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
|
||||
@@ -140,7 +142,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
|
||||
if !ok {
|
||||
chunk = &Chunk{}
|
||||
conn.incommingChunks[ChunkStreamID] = chunk
|
||||
chunk.AVData.ScalableMemoryAllocator = conn.ByteChunkPool
|
||||
chunk.AVData.ScalableMemoryAllocator = conn.ReadPool
|
||||
}
|
||||
|
||||
if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil {
|
||||
@@ -261,7 +263,7 @@ func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) {
|
||||
switch msg.MessageTypeID {
|
||||
case RTMP_MSG_CHUNK_SIZE:
|
||||
conn.readChunkSize = int(msg.MsgData.(Uint32Message))
|
||||
// RTMPPlugin.Info("msg read chunk size", zap.Int("readChunkSize", conn.readChunkSize))
|
||||
conn.Info("msg read chunk size", "readChunkSize", conn.readChunkSize)
|
||||
case RTMP_MSG_ABORT:
|
||||
delete(conn.incommingChunks, uint32(msg.MsgData.(Uint32Message)))
|
||||
case RTMP_MSG_ACK, RTMP_MSG_EDGE:
|
||||
@@ -308,26 +310,25 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
|
||||
if sid, ok := msg.(HaveStreamID); ok {
|
||||
head.MessageStreamID = sid.GetStreamID()
|
||||
}
|
||||
head.WriteTo(RTMP_CHUNK_HEAD_12, &conn.chunkHeader)
|
||||
for b := conn.tmpBuf; b.Len() > 0; {
|
||||
if b.CanReadN(conn.WriteChunkSize) {
|
||||
conn.sendChunk(b.ReadN(conn.WriteChunkSize))
|
||||
} else {
|
||||
conn.sendChunk(b)
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return conn.sendChunk(*util.NewBuffersFromBytes(conn.tmpBuf), head, RTMP_CHUNK_HEAD_12)
|
||||
}
|
||||
|
||||
func (conn *NetConnection) sendChunk(writeBuffer ...[]byte) error {
|
||||
if n, err := conn.Write(conn.chunkHeader); err != nil {
|
||||
return err
|
||||
} else {
|
||||
conn.writeSeqNum += uint32(n)
|
||||
func (conn *NetConnection) sendChunk(r util.Buffers, head *ChunkHeader, headType byte) (err error) {
|
||||
var chunks net.Buffers
|
||||
var chunkHeader util.Buffer = conn.WritePool.Malloc(16)
|
||||
head.WriteTo(headType, &chunkHeader)
|
||||
chunks = append(chunks, chunkHeader)
|
||||
r.WriteNTo(conn.WriteChunkSize, &chunks)
|
||||
for r.Length > 0 {
|
||||
chunkHeader = conn.WritePool.Malloc(5)
|
||||
head.WriteTo(RTMP_CHUNK_HEAD_1, &chunkHeader)
|
||||
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
|
||||
chunks = append(chunks, chunkHeader)
|
||||
r.WriteNTo(conn.WriteChunkSize, &chunks)
|
||||
}
|
||||
buf := net.Buffers(writeBuffer)
|
||||
n, err := buf.WriteTo(conn)
|
||||
conn.writeSeqNum += uint32(n)
|
||||
var nw int64
|
||||
nw, err = chunks.WriteTo(conn.Conn)
|
||||
conn.writeSeqNum += uint32(nw)
|
||||
conn.WritePool.Recycle()
|
||||
return err
|
||||
}
|
||||
|
@@ -30,11 +30,11 @@ func (puller *RTMPPuller) Pull(p *m7s.Puller) (err error) {
|
||||
case RTMP_MSG_AUDIO:
|
||||
p.WriteAudio(&RTMPAudio{msg.AVData})
|
||||
msg.AVData = RTMPData{}
|
||||
msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ByteChunkPool
|
||||
msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ReadPool
|
||||
case RTMP_MSG_VIDEO:
|
||||
p.WriteVideo(&RTMPVideo{msg.AVData})
|
||||
msg.AVData = RTMPData{}
|
||||
msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ByteChunkPool
|
||||
msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ReadPool
|
||||
case RTMP_MSG_AMF0_COMMAND:
|
||||
cmd := msg.MsgData.(Commander).GetCommand()
|
||||
switch cmd.CommandName {
|
||||
|
82
server.go
82
server.go
@@ -244,48 +244,48 @@ func (s *Server) eventLoop() {
|
||||
}
|
||||
event = vv
|
||||
addPublisher(vv)
|
||||
}
|
||||
case *util.Promise[*Publisher]:
|
||||
|
||||
case *util.Promise[*Subscriber]:
|
||||
err := s.OnSubscribe(v.Value)
|
||||
if v.Fulfill(err); err != nil {
|
||||
continue
|
||||
}
|
||||
if nl := s.Subscribers.Length; nl > subCount {
|
||||
subCount = nl
|
||||
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(v.Value.Done())})
|
||||
}
|
||||
if !s.EnableSubEvent {
|
||||
continue
|
||||
}
|
||||
event = v.Value
|
||||
case *util.Promise[*Puller]:
|
||||
if _, ok := s.Pulls.Get(v.Value.StreamPath); ok {
|
||||
v.Fulfill(ErrStreamExist)
|
||||
continue
|
||||
} else {
|
||||
err := s.OnPublish(&v.Value.Publisher)
|
||||
v.Fulfill(err)
|
||||
if err != nil {
|
||||
case *Subscriber:
|
||||
err := s.OnSubscribe(vv)
|
||||
if v.Fulfill(err); err != nil {
|
||||
continue
|
||||
}
|
||||
if nl := s.Subscribers.Length; nl > subCount {
|
||||
subCount = nl
|
||||
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(vv.Done())})
|
||||
}
|
||||
if !s.EnableSubEvent {
|
||||
continue
|
||||
}
|
||||
s.Pulls.Add(v.Value)
|
||||
addPublisher(&v.Value.Publisher)
|
||||
event = v.Value
|
||||
case *Puller:
|
||||
if _, ok := s.Pulls.Get(vv.StreamPath); ok {
|
||||
v.Fulfill(ErrStreamExist)
|
||||
continue
|
||||
} else {
|
||||
err := s.OnPublish(&vv.Publisher)
|
||||
v.Fulfill(err)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
s.Pulls.Add(vv)
|
||||
addPublisher(&vv.Publisher)
|
||||
event = v.Value
|
||||
}
|
||||
case *pb.StreamSnapRequest:
|
||||
if pub, ok := s.Streams.Get(vv.StreamPath); ok {
|
||||
v.Resolve(pub)
|
||||
} else {
|
||||
v.Fulfill(ErrNotFound)
|
||||
}
|
||||
continue
|
||||
case *pb.StopSubscribeRequest:
|
||||
if subscriber, ok := s.Subscribers.Get(int(vv.Id)); ok {
|
||||
subscriber.Stop(errors.New("stop by api"))
|
||||
v.Fulfill(nil)
|
||||
} else {
|
||||
v.Fulfill(ErrNotFound)
|
||||
}
|
||||
}
|
||||
case *util.Promise[*StreamSnapShot]:
|
||||
v.Value.Publisher, _ = s.Streams.Get(v.Value.StreamPath)
|
||||
v.Fulfill(nil)
|
||||
continue
|
||||
case *util.Promise[*pb.StopSubscribeRequest]:
|
||||
if subscriber, ok := s.Subscribers.Get(int(v.Value.Id)); ok {
|
||||
subscriber.Stop(errors.New("stop by api"))
|
||||
v.Fulfill(nil)
|
||||
} else {
|
||||
v.Fulfill(ErrNotFound)
|
||||
}
|
||||
continue
|
||||
}
|
||||
for _, plugin := range s.Plugins {
|
||||
if plugin.Disabled {
|
||||
@@ -405,5 +405,9 @@ func (s *Server) Call(arg any) (result any, err error) {
|
||||
promise := util.NewPromise(arg)
|
||||
s.eventChan <- promise
|
||||
<-promise.Done()
|
||||
return promise.Value, context.Cause(promise.Context)
|
||||
result = promise.Value
|
||||
if err = context.Cause(promise.Context); err == util.ErrResolve {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@@ -45,19 +45,18 @@ func (ps *PubSubBase) Init(p *Plugin, streamPath string, options ...any) {
|
||||
ps.StartTime = time.Now()
|
||||
}
|
||||
|
||||
type UnsubscribeEvent struct {
|
||||
*Subscriber
|
||||
}
|
||||
|
||||
type Subscriber struct {
|
||||
PubSubBase
|
||||
config.Subscribe
|
||||
Publisher *Publisher
|
||||
}
|
||||
|
||||
type ISubscriberHandler[T IAVFrame] func(data T)
|
||||
type SubscriberHandler struct {
|
||||
OnAudio any
|
||||
OnVideo any
|
||||
}
|
||||
|
||||
func (s *Subscriber) Handle(audioHandler, videoHandler any) {
|
||||
func (s *Subscriber) Handle(handler SubscriberHandler) {
|
||||
var ar, vr *AVRingReader
|
||||
var ah, vh reflect.Value
|
||||
var a1, v1 reflect.Type
|
||||
@@ -67,11 +66,11 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
|
||||
subMode, _ = strconv.Atoi(s.Args.Get(s.SubModeArgName))
|
||||
}
|
||||
var audioFrame, videoFrame, lastSentAF, lastSentVF *AVFrame
|
||||
if audioHandler != nil && s.SubAudio {
|
||||
a1 = reflect.TypeOf(audioHandler).In(0)
|
||||
if handler.OnAudio != nil && s.SubAudio {
|
||||
a1 = reflect.TypeOf(handler.OnAudio).In(0)
|
||||
}
|
||||
if videoHandler != nil && s.SubVideo {
|
||||
v1 = reflect.TypeOf(videoHandler).In(0)
|
||||
if handler.OnVideo != nil && s.SubVideo {
|
||||
v1 = reflect.TypeOf(handler.OnVideo).In(0)
|
||||
}
|
||||
createAudioReader := func() {
|
||||
if s.Publisher == nil || a1 == nil {
|
||||
@@ -81,7 +80,7 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
|
||||
ar = NewAVRingReader(at)
|
||||
ar.Logger = s.Logger.With("reader", a1.String())
|
||||
ar.Info("start read")
|
||||
ah = reflect.ValueOf(audioHandler)
|
||||
ah = reflect.ValueOf(handler.OnAudio)
|
||||
}
|
||||
}
|
||||
createVideoReader := func() {
|
||||
@@ -92,7 +91,7 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
|
||||
vr = NewAVRingReader(vt)
|
||||
vr.Logger = s.Logger.With("reader", v1.String())
|
||||
vr.Info("start read")
|
||||
vh = reflect.ValueOf(videoHandler)
|
||||
vh = reflect.ValueOf(handler.OnVideo)
|
||||
}
|
||||
}
|
||||
createAudioReader()
|
||||
|
Reference in New Issue
Block a user