mirror of
https://github.com/Monibuca/plugin-rtmp.git
synced 2025-10-01 05:42:30 +08:00
182 lines
4.8 KiB
Go
182 lines
4.8 KiB
Go
package rtmp
|
||
|
||
import (
|
||
"errors"
|
||
"net"
|
||
"runtime"
|
||
|
||
"go.uber.org/zap"
|
||
. "m7s.live/engine/v4"
|
||
"m7s.live/engine/v4/common"
|
||
)
|
||
|
||
type AVSender struct {
|
||
*RTMPSender
|
||
ChunkHeader
|
||
firstSent bool
|
||
}
|
||
|
||
func (av *AVSender) sendSequenceHead(seqHead []byte) {
|
||
av.SetTimestamp(0)
|
||
av.MessageLength = uint32(len(seqHead))
|
||
for !av.writing.CompareAndSwap(false, true) {
|
||
runtime.Gosched()
|
||
}
|
||
defer av.writing.Store(false)
|
||
av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
|
||
av.sendChunk(seqHead)
|
||
}
|
||
|
||
func (av *AVSender) sendFrame(frame *common.AVFrame, absTime uint32) (err error) {
|
||
seq := frame.Sequence
|
||
payloadLen := frame.AVCC.ByteLength
|
||
if payloadLen == 0 {
|
||
err := errors.New("payload is empty")
|
||
av.Error("payload is empty", zap.Error(err))
|
||
return err
|
||
}
|
||
if av.writeSeqNum > av.bandwidth {
|
||
av.totalWrite += av.writeSeqNum
|
||
av.writeSeqNum = 0
|
||
av.SendMessage(RTMP_MSG_ACK, Uint32Message(av.totalWrite))
|
||
av.SendStreamID(RTMP_USER_PING_REQUEST, 0)
|
||
}
|
||
av.MessageLength = uint32(payloadLen)
|
||
for !av.writing.CompareAndSwap(false, true) {
|
||
runtime.Gosched()
|
||
}
|
||
defer av.writing.Store(false)
|
||
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
|
||
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
|
||
// 当Chunk Type为0时(即Chunk12),
|
||
if !av.firstSent {
|
||
av.firstSent = true
|
||
av.SetTimestamp(absTime)
|
||
av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
|
||
} else {
|
||
av.SetTimestamp(frame.DeltaTime)
|
||
av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
|
||
}
|
||
//数据被覆盖导致序号变了
|
||
if seq != frame.Sequence {
|
||
return errors.New("sequence is not equal")
|
||
}
|
||
r := frame.AVCC.NewReader()
|
||
chunk := net.Buffers{av.chunkHeader}
|
||
av.writeSeqNum += uint32(av.chunkHeader.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
|
||
for r.CanRead() {
|
||
item := av.bytePool.Get(16)
|
||
defer item.Recycle()
|
||
av.WriteTo(RTMP_CHUNK_HEAD_1, &item.Value)
|
||
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
|
||
chunk = append(chunk, item.Value)
|
||
av.writeSeqNum += uint32(item.Value.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
|
||
}
|
||
_, err = chunk.WriteTo(av.Conn)
|
||
return nil
|
||
}
|
||
|
||
type RTMPSender struct {
|
||
Subscriber
|
||
NetStream
|
||
audio, video AVSender
|
||
}
|
||
|
||
func (rtmp *RTMPSender) OnEvent(event any) {
|
||
switch v := event.(type) {
|
||
case SEwaitPublish:
|
||
rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus)
|
||
case SEpublish:
|
||
rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus)
|
||
case ISubscriber:
|
||
rtmp.audio.RTMPSender = rtmp
|
||
rtmp.video.RTMPSender = rtmp
|
||
rtmp.audio.ChunkStreamID = RTMP_CSID_AUDIO
|
||
rtmp.video.ChunkStreamID = RTMP_CSID_VIDEO
|
||
rtmp.audio.MessageTypeID = RTMP_MSG_AUDIO
|
||
rtmp.video.MessageTypeID = RTMP_MSG_VIDEO
|
||
rtmp.audio.MessageStreamID = rtmp.StreamID
|
||
rtmp.video.MessageStreamID = rtmp.StreamID
|
||
case AudioDeConf:
|
||
rtmp.audio.sendSequenceHead(v)
|
||
case VideoDeConf:
|
||
rtmp.video.sendSequenceHead(v)
|
||
case AudioFrame:
|
||
if err := rtmp.audio.sendFrame(v.AVFrame, v.AbsTime); err != nil {
|
||
rtmp.Stop(zap.Error(err))
|
||
}
|
||
case VideoFrame:
|
||
if err := rtmp.video.sendFrame(v.AVFrame, v.AbsTime); err != nil {
|
||
rtmp.Stop(zap.Error(err))
|
||
}
|
||
default:
|
||
rtmp.Subscriber.OnEvent(event)
|
||
}
|
||
}
|
||
|
||
func (r *RTMPSender) Response(tid uint64, code, level string) error {
|
||
m := new(ResponsePlayMessage)
|
||
m.CommandName = Response_OnStatus
|
||
m.TransactionId = tid
|
||
m.Infomation = map[string]any{
|
||
"code": code,
|
||
"level": level,
|
||
"description": "",
|
||
}
|
||
m.StreamID = r.StreamID
|
||
return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||
}
|
||
|
||
type RTMPReceiver struct {
|
||
Publisher
|
||
NetStream
|
||
}
|
||
|
||
func (r *RTMPReceiver) OnEvent(event any) {
|
||
switch v := event.(type) {
|
||
case IPublisher:
|
||
if r.Equal(v) { //第一任
|
||
|
||
} else { // 使用前任的track,因为订阅者都挂在前任的上面
|
||
r.Publisher.OnEvent(event)
|
||
if r.AudioTrack != nil {
|
||
r.AudioTrack.SetStuff(r.bytePool)
|
||
}
|
||
if r.VideoTrack != nil {
|
||
r.VideoTrack.SetStuff(r.bytePool)
|
||
}
|
||
}
|
||
default:
|
||
r.IO.OnEvent(event)
|
||
}
|
||
}
|
||
|
||
func (r *RTMPReceiver) Response(tid uint64, code, level string) error {
|
||
m := new(ResponsePublishMessage)
|
||
m.CommandName = Response_OnStatus
|
||
m.TransactionId = tid
|
||
m.Infomation = map[string]any{
|
||
"code": code,
|
||
"level": level,
|
||
"description": "",
|
||
}
|
||
m.StreamID = r.StreamID
|
||
return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||
}
|
||
|
||
func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) {
|
||
if r.AudioTrack == nil {
|
||
r.WriteAVCCAudio(0, &msg.AVData, r.bytePool)
|
||
return
|
||
}
|
||
r.AudioTrack.WriteAVCC(msg.ExtendTimestamp, &msg.AVData)
|
||
}
|
||
|
||
func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) {
|
||
if r.VideoTrack == nil {
|
||
r.WriteAVCCVideo(0, &msg.AVData, r.bytePool)
|
||
return
|
||
}
|
||
r.VideoTrack.WriteAVCC(msg.ExtendTimestamp, &msg.AVData)
|
||
}
|