From c4bc6d97423ae71ea2c5bfedbb4c4418bfefabb0 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Mon, 30 Jan 2023 22:17:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- media.go | 140 ++++++++++++++++++++++------------------------- netConnection.go | 2 +- 2 files changed, 66 insertions(+), 76 deletions(-) diff --git a/media.go b/media.go index 72622d0..cb8f707 100644 --- a/media.go +++ b/media.go @@ -8,13 +8,62 @@ import ( "m7s.live/engine/v4/common" ) +type AVSender struct { + *RTMPSender + ChunkHeader + firstSent bool +} + +func (av *AVSender) sendSequenceHead(seqHead []byte) { + av.SetTimestamp(0) + av.MessageLength = uint32(len(seqHead)) + av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader) + av.sendChunk(seqHead) +} + +func (av *AVSender) sendFrame(frame *common.AVFrame,absTime uint32) (err error) { + payloadLen := frame.AVCC.ByteLength + if payloadLen == 0 { + err := errors.New("payload is empty") + av.Error("payload is empty", zap.Error(err)) + return err + } + if av.writeSeqNum > av.bandwidth { + av.totalWrite += av.writeSeqNum + av.writeSeqNum = 0 + av.SendMessage(RTMP_MSG_ACK, Uint32Message(av.totalWrite)) + av.SendStreamID(RTMP_USER_PING_REQUEST, 0) + } + av.MessageLength = uint32(payloadLen) + // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) + // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) + // 当Chunk Type为0时(即Chunk12), + if !av.firstSent { + av.firstSent = true + av.SetTimestamp(absTime) + av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader) + } else { + av.SetTimestamp(frame.DeltaTime) + av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader) + } + r := frame.AVCC.NewReader() + chunk := r.ReadN(av.writeChunkSize) + // payloadLen -= util.SizeOfBuffers(chunk) + av.sendChunk(chunk...) + if r.CanRead() { + // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) + for av.WriteTo(RTMP_CHUNK_HEAD_1, &av.chunkHeader); r.CanRead(); av.sendChunk(chunk...) { + chunk = r.ReadN(av.writeChunkSize) + // payloadLen -= util.SizeOfBuffers(chunk) + } + } + return nil +} + type RTMPSender struct { Subscriber NetStream - firstAudioSent bool - firstVideoSent bool - audioChunkHeader ChunkHeader - videoChunkHeader ChunkHeader + audio, video AVSender } func (rtmp *RTMPSender) OnEvent(event any) { @@ -24,86 +73,27 @@ func (rtmp *RTMPSender) OnEvent(event any) { case SEpublish: rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus) case ISubscriber: - rtmp.audioChunkHeader.ChunkStreamID = RTMP_CSID_AUDIO - rtmp.videoChunkHeader.ChunkStreamID = RTMP_CSID_VIDEO - rtmp.audioChunkHeader.MessageTypeID = RTMP_MSG_AUDIO - rtmp.videoChunkHeader.MessageTypeID = RTMP_MSG_VIDEO - rtmp.audioChunkHeader.MessageStreamID = rtmp.StreamID - rtmp.videoChunkHeader.MessageStreamID = rtmp.StreamID + rtmp.audio.RTMPSender = rtmp + rtmp.video.RTMPSender = rtmp + rtmp.audio.ChunkStreamID = RTMP_CSID_AUDIO + rtmp.video.ChunkStreamID = RTMP_CSID_VIDEO + rtmp.audio.MessageTypeID = RTMP_MSG_AUDIO + rtmp.video.MessageTypeID = RTMP_MSG_VIDEO + rtmp.audio.MessageStreamID = rtmp.StreamID + rtmp.video.MessageStreamID = rtmp.StreamID case AudioDeConf: - rtmp.audioChunkHeader.SetTimestamp(0) - rtmp.audioChunkHeader.MessageLength = uint32(len(v)) - rtmp.audioChunkHeader.WriteTo(RTMP_CHUNK_HEAD_12, &rtmp.chunkHeader) - rtmp.sendChunk(v) + rtmp.audio.sendSequenceHead(v) case VideoDeConf: - rtmp.videoChunkHeader.SetTimestamp(0) - rtmp.videoChunkHeader.MessageLength = uint32(len(v)) - rtmp.videoChunkHeader.WriteTo(RTMP_CHUNK_HEAD_12, &rtmp.chunkHeader) - rtmp.sendChunk(v) + rtmp.video.sendSequenceHead(v) case AudioFrame: - rtmp.sendAVMessage(v.AVFrame, v.AbsTime, true) + rtmp.audio.sendFrame(v.AVFrame, v.AbsTime) case VideoFrame: - rtmp.sendAVMessage(v.AVFrame, v.AbsTime, false) + rtmp.video.sendFrame(v.AVFrame, v.AbsTime) default: rtmp.Subscriber.OnEvent(event) } } -// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间 -// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 -// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 -func (sender *RTMPSender) sendAVMessage(frame *common.AVFrame, absTime uint32, isAudio bool) (err error) { - payloadLen := frame.AVCC.ByteLength - if payloadLen == 0 { - err := errors.New("payload is empty") - sender.Error("payload is empty", zap.Error(err)) - return err - } - if sender.writeSeqNum > sender.bandwidth { - sender.totalWrite += sender.writeSeqNum - sender.writeSeqNum = 0 - sender.SendMessage(RTMP_MSG_ACK, Uint32Message(sender.totalWrite)) - sender.SendStreamID(RTMP_USER_PING_REQUEST, 0) - } - var head *ChunkHeader - - var isFirst = false - if isAudio { - head = &sender.audioChunkHeader - if isFirst = !sender.firstAudioSent; isFirst { - sender.firstAudioSent = true - } - } else { - head = &sender.videoChunkHeader - if isFirst = !sender.firstVideoSent; isFirst { - sender.firstVideoSent = true - } - } - head.MessageLength = uint32(payloadLen) - // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) - // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) - // 当Chunk Type为0时(即Chunk12), - if isFirst { - head.SetTimestamp(absTime) - head.WriteTo(RTMP_CHUNK_HEAD_12, &sender.chunkHeader) - } else { - head.SetTimestamp(frame.DeltaTime) - head.WriteTo(RTMP_CHUNK_HEAD_8, &sender.chunkHeader) - } - r := frame.AVCC.NewReader() - chunk := r.ReadN(sender.writeChunkSize) - // payloadLen -= util.SizeOfBuffers(chunk) - sender.sendChunk(chunk...) - if r.CanRead() { - // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) - for head.WriteTo(RTMP_CHUNK_HEAD_1, &sender.chunkHeader); r.CanRead(); sender.sendChunk(chunk...) { - chunk = r.ReadN(sender.writeChunkSize) - // payloadLen -= util.SizeOfBuffers(chunk) - } - } - return nil -} - func (r *RTMPSender) Response(tid uint64, code, level string) error { m := new(ResponsePlayMessage) m.CommandName = Response_OnStatus diff --git a/netConnection.go b/netConnection.go index 34b2858..d28e5a4 100644 --- a/netConnection.go +++ b/netConnection.go @@ -70,7 +70,7 @@ func NewNetConnection(conn net.Conn) *NetConnection { bandwidth: RTMP_MAX_CHUNK_SIZE << 3, tmpBuf: make(util.Buffer, 4), chunkHeader: make(util.Buffer, 0, 16), - bytePool: make(util.BytesPool, 16), + bytePool: make(util.BytesPool, 17), } } func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {