防止并发写网络

This commit is contained in:
dexter
2023-02-03 10:16:55 +08:00
parent 4615ca2472
commit f42849b149
3 changed files with 18 additions and 2 deletions

2
go.mod
View File

@@ -1,6 +1,6 @@
module m7s.live/plugin/rtmp/v4
go 1.18
go 1.19
require (
go.uber.org/zap v1.21.0

View File

@@ -2,6 +2,7 @@ package rtmp
import (
"errors"
"runtime"
"go.uber.org/zap"
. "m7s.live/engine/v4"
@@ -17,11 +18,15 @@ type AVSender struct {
func (av *AVSender) sendSequenceHead(seqHead []byte) {
av.SetTimestamp(0)
av.MessageLength = uint32(len(seqHead))
for !av.writing.CompareAndSwap(false, true) {
runtime.Gosched()
}
defer av.writing.Store(false)
av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
av.sendChunk(seqHead)
}
func (av *AVSender) sendFrame(frame *common.AVFrame,absTime uint32) (err error) {
func (av *AVSender) sendFrame(frame *common.AVFrame, absTime uint32) (err error) {
payloadLen := frame.AVCC.ByteLength
if payloadLen == 0 {
err := errors.New("payload is empty")
@@ -35,6 +40,10 @@ func (av *AVSender) sendFrame(frame *common.AVFrame,absTime uint32) (err error)
av.SendStreamID(RTMP_USER_PING_REQUEST, 0)
}
av.MessageLength = uint32(payloadLen)
for !av.writing.CompareAndSwap(false, true) {
runtime.Gosched()
}
defer av.writing.Store(false)
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// 当Chunk Type为0时(即Chunk12),

View File

@@ -6,6 +6,8 @@ import (
"errors"
"io"
"net"
"runtime"
"sync/atomic"
"m7s.live/engine/v4/util"
)
@@ -58,6 +60,7 @@ type NetConnection struct {
tmpBuf util.Buffer //用来接收/发送小数据,复用内存
chunkHeader util.Buffer
bytePool util.BytesPool
writing atomic.Bool // false 可写true 不可写
}
func NewNetConnection(conn net.Conn) *NetConnection {
@@ -291,6 +294,10 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite))
err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0)
}
for !conn.writing.CompareAndSwap(false, true) {
runtime.Gosched()
}
defer conn.writing.Store(false)
conn.tmpBuf.Reset()
msg.Encode(&conn.tmpBuf)
head := newChunkHeader(t)