diff --git a/go.mod b/go.mod index 75d8355..63fe803 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module m7s.live/plugin/rtmp/v4 -go 1.18 +go 1.19 require ( go.uber.org/zap v1.21.0 diff --git a/media.go b/media.go index cb8f707..bce3bca 100644 --- a/media.go +++ b/media.go @@ -2,6 +2,7 @@ package rtmp import ( "errors" + "runtime" "go.uber.org/zap" . "m7s.live/engine/v4" @@ -17,11 +18,15 @@ type AVSender struct { func (av *AVSender) sendSequenceHead(seqHead []byte) { av.SetTimestamp(0) av.MessageLength = uint32(len(seqHead)) + for !av.writing.CompareAndSwap(false, true) { + runtime.Gosched() + } + defer av.writing.Store(false) av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader) av.sendChunk(seqHead) } -func (av *AVSender) sendFrame(frame *common.AVFrame,absTime uint32) (err error) { +func (av *AVSender) sendFrame(frame *common.AVFrame, absTime uint32) (err error) { payloadLen := frame.AVCC.ByteLength if payloadLen == 0 { err := errors.New("payload is empty") @@ -35,6 +40,10 @@ func (av *AVSender) sendFrame(frame *common.AVFrame,absTime uint32) (err error) av.SendStreamID(RTMP_USER_PING_REQUEST, 0) } av.MessageLength = uint32(payloadLen) + for !av.writing.CompareAndSwap(false, true) { + runtime.Gosched() + } + defer av.writing.Store(false) // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) // 当Chunk Type为0时(即Chunk12), diff --git a/netConnection.go b/netConnection.go index d28e5a4..9102631 100644 --- a/netConnection.go +++ b/netConnection.go @@ -6,6 +6,8 @@ import ( "errors" "io" "net" + "runtime" + "sync/atomic" "m7s.live/engine/v4/util" ) @@ -58,6 +60,7 @@ type NetConnection struct { tmpBuf util.Buffer //用来接收/发送小数据,复用内存 chunkHeader util.Buffer bytePool util.BytesPool + writing atomic.Bool // false 可写,true 不可写 } func NewNetConnection(conn net.Conn) *NetConnection { @@ -291,6 +294,10 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) { err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite)) err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0) } + for !conn.writing.CompareAndSwap(false, true) { + runtime.Gosched() + } + defer conn.writing.Store(false) conn.tmpBuf.Reset() msg.Encode(&conn.tmpBuf) head := newChunkHeader(t)