适配引擎改造

This commit is contained in:
dexter
2022-02-17 21:50:31 +08:00
parent b46cf00f65
commit 48649e394b
8 changed files with 343 additions and 270 deletions

16
amf.go
View File

@@ -1,8 +1,8 @@
package rtmp
import (
"github.com/Monibuca/engine/v4/util"
"go.uber.org/zap"
)
// Action Message Format -- AMF 0
@@ -97,14 +97,14 @@ func (amf *AMF) decodeObject() (obj AMFValue) {
case AMF0_OBJECT:
return amf.readObject()
case AMF0_MOVIECLIP:
plugin.Println("This type is not supported and is reserved for future use.(AMF0_MOVIECLIP)")
plugin.Error("This type is not supported and is reserved for future use.(AMF0_MOVIECLIP)")
case AMF0_NULL:
return amf.readNull()
case AMF0_UNDEFINED:
amf.ReadByte()
return Undefined
case AMF0_REFERENCE:
plugin.Println("reference-type.(AMF0_REFERENCE)")
plugin.Error("reference-type.(AMF0_REFERENCE)")
case AMF0_ECMA_ARRAY:
return amf.readECMAArray()
case AMF0_END_OBJECT:
@@ -118,15 +118,15 @@ func (amf *AMF) decodeObject() (obj AMFValue) {
AMF0_XML_DOCUMENT:
return amf.readLongString()
case AMF0_UNSUPPORTED:
plugin.Println("If a type cannot be serialized a special unsupported marker can be used in place of the type.(AMF0_UNSUPPORTED)")
plugin.Error("If a type cannot be serialized a special unsupported marker can be used in place of the type.(AMF0_UNSUPPORTED)")
case AMF0_RECORDSET:
plugin.Println("This type is not supported and is reserved for future use.(AMF0_RECORDSET)")
plugin.Error("This type is not supported and is reserved for future use.(AMF0_RECORDSET)")
case AMF0_TYPED_OBJECT:
plugin.Println("If a strongly typed object has an alias registered for its class then the type name will also be serialized. Typed objects are considered complex types and reoccurring instances can be sent by reference.(AMF0_TYPED_OBJECT)")
plugin.Error("If a strongly typed object has an alias registered for its class then the type name will also be serialized. Typed objects are considered complex types and reoccurring instances can be sent by reference.(AMF0_TYPED_OBJECT)")
case AMF0_AVMPLUS_OBJECT:
plugin.Println("AMF0_AVMPLUS_OBJECT")
plugin.Error("AMF0_AVMPLUS_OBJECT")
default:
plugin.Warnf("Unsupported type %v", t)
plugin.Error("Unsupported type", zap.Uint8("type", t))
}
return nil
}

135
client.go
View File

@@ -8,24 +8,21 @@ import (
"github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/util"
"go.uber.org/zap"
)
type RTMPClient struct {
NetConnection
}
func (client *RTMPClient) Connect(addr string) bool {
func NewRTMPClient(addr string) (client *NetConnection) {
u, err := url.Parse(addr)
if err != nil {
plugin.Error(err)
return false
plugin.Error("connect url parse", zap.Error(err))
return
}
conn, err := net.Dial("tcp", u.Host)
if err != nil {
plugin.Error(err)
return false
plugin.Error("dial tcp", zap.String("host", u.Host), zap.Error(err))
return
}
client.NetConnection = NetConnection{
client = &NetConnection{
TCPConn: conn.(*net.TCPConn),
Reader: bufio.NewReader(conn),
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
@@ -36,10 +33,10 @@ func (client *RTMPClient) Connect(addr string) bool {
tmpBuf: make([]byte, 4),
// subscribers: make(map[uint32]*engine.Subscriber),
}
err = client.Handshake()
err = client.ClientHandshake()
if err != nil {
plugin.Error(err)
return false
plugin.Error("handshake", zap.Error(err))
return nil
}
connectArg := make(AMFObject)
connectArg["swfUrl"] = addr
@@ -51,7 +48,7 @@ func (client *RTMPClient) Connect(addr string) bool {
for {
msg, err := client.RecvMessage()
if err != nil {
return false
return nil
}
switch msg.MessageTypeID {
case RTMP_MSG_AMF0_COMMAND:
@@ -60,35 +57,45 @@ func (client *RTMPClient) Connect(addr string) bool {
case "_result":
response := msg.MsgData.(*ResponseMessage)
if response.Infomation["code"] == NetConnection_Connect_Success {
return true
return
} else {
return false
return nil
}
}
}
}
}
var _ engine.IPusher = (*RTMPPusher)(nil)
var _ engine.IPuller = (*RTMPPuller)(nil)
type RTMPPusher struct {
RTMPSender
engine.Pusher
RTMPClient
}
func (pusher *RTMPPusher) OnEvent(event any) any {
pusher.RTMPSender.OnEvent(event)
switch event.(type) {
case *engine.Stream:
pusher.NetConnection = NewRTMPClient(pusher.RemoteURL)
if pusher.NetConnection != nil {
pusher.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil)
go pusher.push()
}
case engine.PushEvent:
pusher.PushCount++
if pusher.Stream == nil {
if plugin.Subscribe(pusher.StreamPath, pusher) {
}
}
}
return event
}
func (pusher *RTMPPusher) push() {
SendMedia(&pusher.NetConnection, &pusher.Subscriber)
pusher.NetConnection.Close()
}
func (pusher *RTMPPusher) Push(count int) {
if pusher.Connect(pusher.RemoteURL) {
pusher.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil)
defer pusher.Unsubscribe()
for {
msg, err := pusher.RecvMessage()
if err != nil {
return
break
}
switch msg.MessageTypeID {
case RTMP_MSG_AMF0_COMMAND:
@@ -96,12 +103,22 @@ func (pusher *RTMPPusher) Push(count int) {
switch cmd.CommandName {
case "_result":
if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok {
arg := make(AMFObject)
arg["streamid"] = response.StreamId
pusher.SendCommand(SEND_PUBLISH_START_MESSAGE, arg)
pusher.StreamID = response.StreamId
m := &PublishMessage{
CURDStreamMessage{
CommandMessage{
"publish",
0,
},
response.StreamId,
},
pusher.Stream.StreamName,
"live",
}
pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
} else if response, ok := msg.MsgData.(*ResponsePublishMessage); ok {
if response.Infomation["code"] == "NetStream.Publish.Start" {
go pusher.push()
} else {
return
}
@@ -109,29 +126,60 @@ func (pusher *RTMPPusher) Push(count int) {
}
}
}
if !pusher.Stream.IsClosed() && pusher.Reconnect() {
pusher.OnEvent(engine.PullEvent(pusher.PushCount))
}
}
type RTMPPuller struct {
RTMPReceiver
engine.Puller
RTMPClient
}
func (puller *RTMPPuller) OnEvent(event any) any {
puller.RTMPReceiver.OnEvent(event)
switch event.(type) {
case *engine.Stream:
puller.NetConnection = NewRTMPClient(puller.RemoteURL)
if puller.NetConnection != nil {
puller.absTs = make(map[uint32]uint32)
puller.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil)
go puller.pull()
break
}
case engine.PullEvent:
puller.PullCount++
if puller.Stream == nil {
if plugin.Publish(puller.StreamPath, puller) {
break
}
}
}
return event
}
func (puller *RTMPPuller) pull() {
defer puller.Unpublish()
for {
msg, err := puller.RecvMessage()
if err != nil {
return
break
}
switch msg.MessageTypeID {
case RTMP_MSG_AUDIO:
puller.ReceiveAudio(msg)
case RTMP_MSG_VIDEO:
puller.ReceiveVideo(msg)
// case RTMP_MSG_AMF0_COMMAND:
// cmd := msg.MsgData.(Commander).GetCommand()
// switch cmd.CommandName {
// case "_result":
case RTMP_MSG_AMF0_COMMAND:
cmd := msg.MsgData.(Commander).GetCommand()
switch cmd.CommandName {
case "_result":
if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok {
puller.StreamID = response.StreamId
m := &PlayMessage{}
m.CommandMessage.CommandName = "play"
m.StreamName = puller.Stream.StreamName
puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// if response, ok := msg.MsgData.(*ResponsePlayMessage); ok {
// if response.Object["code"] == "NetStream.Play.Start" {
@@ -141,15 +189,8 @@ func (puller *RTMPPuller) pull() {
// } else {
// return errors.New("pull faild")
// }
// }
// }
}
}
}
}
}
func (puller *RTMPPuller) Pull(count int) {
if puller.Connect(puller.RemoteURL) {
puller.SendCommand(SEND_PLAY_MESSAGE, AMFObject{"StreamPath": puller.StreamName})
puller.MediaReceiver = NewMediaReceiver(&puller.Publisher)
go puller.pull()
}
}

View File

@@ -90,7 +90,7 @@ func (nc *NetConnection) Handshake() error {
return nc.complex_handshake(C1)
}
func (client *RTMPClient) Handshake() error {
func (client *NetConnection) ClientHandshake() error {
C0C1 := make([]byte, 1536+1)
C0C1[0] = RTMP_HANDSHAKE_VERSION
client.Write(C0C1)

33
main.go
View File

@@ -2,11 +2,10 @@ package rtmp
import (
"context"
"errors"
. "github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/config"
. "github.com/logrusorgru/aurora"
"go.uber.org/zap"
)
type RTMPConfig struct {
@@ -18,13 +17,15 @@ type RTMPConfig struct {
ChunkSize int
}
var _ PullPlugin = (*RTMPConfig)(nil)
func (config *RTMPConfig) Update(override config.Config) {
plugin.Info(Green("server rtmp start at"), BrightBlue(config.ListenAddr))
plugin.Info("server rtmp start at", zap.String("listen addr", config.ListenAddr))
err := config.Listen(plugin, config)
if err == context.Canceled {
plugin.Println(err)
plugin.Info("rtmp listen shutdown")
} else {
plugin.Fatal(err)
plugin.Fatal("rtmp server", zap.Error(err))
}
}
@@ -33,22 +34,16 @@ var plugin = InstallPlugin(&RTMPConfig{
TCP: config.TCP{ListenAddr: ":1935"},
})
func (config *RTMPConfig) PullStream(streamPath string, puller Puller) error {
var client RTMPPuller
client.Puller = puller
if client.Publish(streamPath, &client, config.Publish) {
return nil
} else {
return errors.New("publish faild")
func (config *RTMPConfig) PullStream(puller Puller) {
client := RTMPPuller{
Puller: puller,
}
client.OnEvent(PullEvent(0))
}
func (config *RTMPConfig) PushStream(stream *Stream, pusher Pusher) error {
var client RTMPPusher
client.ID = "RTMPPusher"
client.Pusher = pusher
if client.Subscribe(stream.Path,config.Subscribe) {
client.Pusher.Push(&client, config.Push)
func (config *RTMPConfig) PushStream(pusher Pusher) {
client := RTMPPusher{
Pusher: pusher,
}
return nil
client.OnEvent(PushEvent(0))
}

136
media.go
View File

@@ -3,55 +3,125 @@ package rtmp
import (
"net"
"github.com/Monibuca/engine/v4"
. "github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/codec"
"github.com/Monibuca/engine/v4/track"
"github.com/Monibuca/engine/v4/util"
)
func SendMedia(nc *NetConnection, sub *Subscriber) (err error) {
vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack()
if vt != nil {
frame := vt.DecoderConfiguration
err = nc.sendAVMessage(0, net.Buffers(frame.AVCC), false, true)
sub.OnVideo = func(frame *engine.VideoFrame) error {
return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, false, false)
type RTMPSender struct {
Subscriber
NetStream
}
func (rtmp *RTMPSender) OnEvent(event any) any {
rtmp.Subscriber.OnEvent(event)
switch v := event.(type) {
case TrackRemoved:
//TODO
case *track.Audio:
isPlaying := rtmp.IsPlaying()
if rtmp.AddTrack(v) {
if v.CodecID == codec.CodecID_AAC {
rtmp.sendAVMessage(0, net.Buffers{rtmp.Subscriber.AudioTrack.DecoderConfiguration.AVCC}, false, true)
}
// 如果不订阅视频则遇到音频也播放,否则需要等视频先播放
if !isPlaying && !rtmp.Config.SubVideo {
go rtmp.Play()
}
}
if at != nil {
sub.OnAudio = func(frame *engine.AudioFrame) (err error) {
if at.CodecID == codec.CodecID_AAC {
frame := at.DecoderConfiguration
err = nc.sendAVMessage(0, net.Buffers{frame.AVCC}, true, true)
case *track.Video:
isPlaying := rtmp.IsPlaying()
if rtmp.AddTrack(v) {
rtmp.sendAVMessage(0, net.Buffers(rtmp.Subscriber.VideoTrack.DecoderConfiguration.AVCC), true, true)
if !isPlaying {
go rtmp.Play()
}
}
case *AudioFrame:
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false)
case *VideoFrame:
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false)
}
return event
}
// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间
// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值
// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同
func (sender *RTMPSender) sendAVMessage(ts uint32, payload net.Buffers, isAudio bool, isFirst bool) (err error) {
if sender.writeSeqNum > sender.bandwidth {
sender.totalWrite += sender.writeSeqNum
sender.writeSeqNum = 0
sender.SendMessage(RTMP_MSG_ACK, Uint32Message(sender.totalWrite))
sender.SendStreamID(RTMP_USER_PING_REQUEST, 0)
}
var head *ChunkHeader
if isAudio {
head = newRtmpHeader(RTMP_CSID_AUDIO, ts, uint32(util.SizeOfBuffers(payload)), RTMP_MSG_AUDIO, sender.StreamID, 0)
} else {
err = nc.sendAVMessage(0, frame.AVCC, true, true)
head = newRtmpHeader(RTMP_CSID_VIDEO, ts, uint32(util.SizeOfBuffers(payload)), RTMP_MSG_VIDEO, sender.StreamID, 0)
}
sub.OnAudio = func(frame *engine.AudioFrame) error {
return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, true, false)
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// 当Chunk Type为0时(即Chunk12),
var chunk1 net.Buffers
if isFirst {
chunk1 = append(chunk1, sender.encodeChunk12(head))
} else {
chunk1 = append(chunk1, sender.encodeChunk8(head))
}
return
chunks := util.SplitBuffers(payload, sender.writeChunkSize)
chunk1 = append(chunk1, chunks[0]...)
sender.writeSeqNum += uint32(util.SizeOfBuffers(chunk1))
_, err = chunk1.WriteTo(sender.NetConnection)
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
for _, chunk := range chunks[1:] {
chunk1 = net.Buffers{sender.encodeChunk1(head)}
chunk1 = append(chunk1, chunk...)
sender.writeSeqNum += uint32(util.SizeOfBuffers(chunk1))
_, err = chunk1.WriteTo(sender.NetConnection)
}
}
sub.Play(at, vt)
return nil
}
func NewMediaReceiver(pub *Publisher) *MediaReceiver {
return &MediaReceiver{
pub,
make(map[uint32]uint32), pub.NewVideoTrack(), pub.NewAudioTrack(),
func (r *RTMPSender) Response(code, level string) error {
m := new(ResponsePlayMessage)
m.CommandName = Response_OnStatus
m.TransactionId = 0
m.Object = AMFObject{
"code": code,
"level": level,
"clientid": 1,
}
m.StreamID = r.StreamID
return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
type MediaReceiver struct {
*Publisher
type RTMPReceiver struct {
Publisher
NetStream
absTs map[uint32]uint32
vt *track.UnknowVideo
at *track.UnknowAudio
}
func (r *MediaReceiver) ReceiveAudio(msg *Chunk) {
plugin.Tracef("rec_audio chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp)
func (r *RTMPReceiver) Response(code, level string) error {
m := new(ResponsePublishMessage)
m.CommandName = Response_OnStatus
m.TransactionId = 0
m.Infomation = AMFObject{
"code": code,
"level": level,
"clientid": 1,
}
m.StreamID = r.StreamID
return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) {
// plugin.Tracef("rec_audio chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp)
if msg.ChunkType == 0 {
r.absTs[msg.ChunkStreamID] = 0
}
@@ -60,10 +130,10 @@ func (r *MediaReceiver) ReceiveAudio(msg *Chunk) {
} else {
r.absTs[msg.ChunkStreamID] += msg.Timestamp
}
r.at.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body)
r.AudioTrack.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body)
}
func (r *MediaReceiver) ReceiveVideo(msg *Chunk) {
plugin.Tracef("rev_video chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp)
func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) {
// plugin.Tracef("rev_video chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp)
if msg.ChunkType == 0 {
r.absTs[msg.ChunkStreamID] = 0
}
@@ -72,5 +142,5 @@ func (r *MediaReceiver) ReceiveVideo(msg *Chunk) {
} else {
r.absTs[msg.ChunkStreamID] += msg.Timestamp
}
r.vt.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body)
r.VideoTrack.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body)
}

33
msg.go
View File

@@ -6,6 +6,7 @@ import (
"strings"
"github.com/Monibuca/engine/v4/util"
"go.uber.org/zap"
)
const (
@@ -217,6 +218,7 @@ func decodeCommandAMF0(chunk *Chunk) {
chunk.MsgData = &CreateStreamMessage{
cmdMsg, amf.readObject(),
}
case "play":
amf.readNull()
chunk.MsgData = &PlayMessage{
@@ -239,7 +241,10 @@ func decodeCommandAMF0(chunk *Chunk) {
case "publish":
amf.readNull()
chunk.MsgData = &PublishMessage{
CURDStreamMessage{
cmdMsg,
chunk.MessageStreamID,
},
amf.readString(),
amf.readString(),
}
@@ -286,13 +291,14 @@ func decodeCommandAMF0(chunk *Chunk) {
amf.readObject(),
amf.readObject(), "",
}
codef := zap.String("code", response.Infomation["code"].(string))
switch response.Infomation["level"] {
case Level_Status:
plugin.Infof("_result :", response.Infomation["code"])
plugin.Info("_result :", codef)
case Level_Warning:
plugin.Warnf("_result :", response.Infomation["code"])
plugin.Warn("_result :", codef)
case Level_Error:
plugin.Errorf("_result :", response.Infomation["code"])
plugin.Error("_result :", codef)
}
if strings.HasPrefix(response.Infomation["code"].(string), "NetStream.Publish") {
chunk.MsgData = &ResponsePublishMessage{
@@ -313,7 +319,7 @@ func decodeCommandAMF0(chunk *Chunk) {
}
case "FCPublish", "FCUnpublish":
default:
plugin.Println("decode command amf0 cmd:", cmd)
plugin.Info("decode command amf0 ", zap.String("cmd", cmd))
}
}
@@ -477,7 +483,7 @@ type PlayMessage struct {
StreamName string
Start uint64
Duration uint64
Rest bool
Reset bool
}
// 命令名 -> 命令名,设置为”play”
@@ -502,7 +508,7 @@ func (msg *PlayMessage) Encode() []byte {
amf.writeNumber(float64(msg.Duration))
}
amf.writeBool(msg.Rest)
amf.writeBool(msg.Reset)
return amf.Buffer
}
@@ -532,6 +538,10 @@ type CURDStreamMessage struct {
StreamId uint32
}
func (msg *CURDStreamMessage) GetStreamID() uint32 {
return msg.StreamId
}
func (msg *CURDStreamMessage) Encode0() {
}
@@ -557,7 +567,7 @@ func (msg *ReceiveAVMessage) Encode0() {
// The client sends the publish command to publish a named stream to the server. Using this name,
// any client can play this stream and receive the published audio, video, and data messages
type PublishMessage struct {
CommandMessage
CURDStreamMessage
PublishingName string
PublishingType string
}
@@ -572,7 +582,14 @@ type PublishMessage struct {
// “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件.
// “live”:发布直播数据而不录制到文件
func (msg *PublishMessage) Encode0() {
func (msg *PublishMessage) Encode0() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeNull()
amf.writeString(msg.PublishingName)
amf.writeString(msg.PublishingType)
return amf.Buffer
}
// Seek Message

View File

@@ -28,7 +28,6 @@ const (
SEND_CONNECT_RESPONSE_MESSAGE = "Send Connect Response Message"
SEND_CREATE_STREAM_MESSAGE = "Send Create Stream Message"
SEND_CREATE_STREAM_RESPONSE_MESSAGE = "Send Create Stream Response Message"
SEND_PLAY_MESSAGE = "Send Play Message"
SEND_PLAY_RESPONSE_MESSAGE = "Send Play Response Message"
@@ -76,8 +75,6 @@ func newPlayResponseMessageData(streamid uint32, code, level string) (amfobj AMF
}
type NetConnection struct {
*MediaReceiver
subscribers map[uint32]*Subscriber
*bufio.Reader
*net.TCPConn
bandwidth uint32
@@ -88,35 +85,30 @@ type NetConnection struct {
writeChunkSize int
readChunkSize int
incompleteRtmpBody map[uint32]util.Buffer // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来
streamID uint32 // 流ID
rtmpHeader map[uint32]*ChunkHeader // RtmpHeader
objectEncoding float64
appName string
tmpBuf []byte //用来接收小数据,复用内存
}
func (conn *NetConnection) Close() {
if conn.MediaReceiver != nil {
conn.UnPublish()
}
conn.TCPConn.Close()
for _, s := range conn.subscribers {
s.Close()
}
}
func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
return io.ReadFull(conn.Reader, buf)
}
func (conn *NetConnection) SendStreamID0(eventType uint16) (err error) {
return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, 0})
}
func (conn *NetConnection) SendStreamID(eventType uint16) (err error) {
return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, conn.streamID})
func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) {
return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID})
}
func (conn *NetConnection) SendUserControl(eventType uint16) error {
return conn.SendMessage(RTMP_MSG_USER_CONTROL, &UserControlMessage{EventType: eventType})
}
func (conn *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) error {
m := &ResponseCreateStreamMessage{}
m.CommandName = Response_Result
m.TransactionId = tid
m.StreamId = streamID
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
func (conn *NetConnection) SendCommand(message string, args any) error {
switch message {
// case SEND_SET_BUFFER_LENGTH_MESSAGE:
@@ -137,16 +129,6 @@ func (conn *NetConnection) SendCommand(message string, args any) error {
m.CommandName = "createStream"
m.TransactionId = 2
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_CREATE_STREAM_RESPONSE_MESSAGE:
tid, ok := args.(uint64)
if !ok {
return errors.New(SEND_CREATE_STREAM_RESPONSE_MESSAGE + ", The parameter only one(TransactionId uint64)!")
}
m := &ResponseCreateStreamMessage{}
m.CommandName = Response_Result
m.TransactionId = tid
m.StreamId = conn.streamID
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_PLAY_MESSAGE:
data, ok := args.(AMFObject)
if !ok {
@@ -156,14 +138,14 @@ func (conn *NetConnection) SendCommand(message string, args any) error {
m.CommandName = "play"
m.TransactionId = 1
for i, v := range data {
if i == "StreamPath" {
if i == "StreamName" {
m.StreamName = v.(string)
} else if i == "Start" {
m.Start = v.(uint64)
} else if i == "Duration" {
m.Duration = v.(uint64)
} else if i == "Rest" {
m.Rest = v.(bool)
} else if i == "Reset" {
m.Reset = v.(bool)
}
}
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
@@ -246,16 +228,7 @@ func (conn *NetConnection) SendCommand(message string, args any) error {
m.Object = obj
m.Optional = info
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_PUBLISH_RESPONSE_MESSAGE, SEND_PUBLISH_START_MESSAGE:
info := args.(AMFObject)
info["clientid"] = 1
m := new(ResponsePublishMessage)
m.CommandName = Response_OnStatus
m.TransactionId = 0
m.Infomation = info
m.StreamID = info["streamid"].(uint32)
delete(info, "streamid")
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_UNPUBLISH_RESPONSE_MESSAGE:
data, ok := args.(AMFObject)
if !ok {
@@ -269,48 +242,6 @@ func (conn *NetConnection) SendCommand(message string, args any) error {
return errors.New("send message no exist")
}
// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间
// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值
// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同
func (conn *NetConnection) sendAVMessage(ts uint32, payload net.Buffers, isAudio bool, isFirst bool) (err error) {
if conn.writeSeqNum > conn.bandwidth {
conn.totalWrite += conn.writeSeqNum
conn.writeSeqNum = 0
conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite))
conn.SendStreamID0(RTMP_USER_PING_REQUEST)
}
var head *ChunkHeader
if isAudio {
head = newRtmpHeader(RTMP_CSID_AUDIO, ts, uint32(util.SizeOfBuffers(payload)), RTMP_MSG_AUDIO, conn.streamID, 0)
} else {
head = newRtmpHeader(RTMP_CSID_VIDEO, ts, uint32(util.SizeOfBuffers(payload)), RTMP_MSG_VIDEO, conn.streamID, 0)
}
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// 当Chunk Type为0时(即Chunk12),
var chunk1 net.Buffers
if isFirst {
chunk1 = append(chunk1, conn.encodeChunk12(head))
} else {
chunk1 = append(chunk1, conn.encodeChunk8(head))
}
chunks := util.SplitBuffers(payload, conn.writeChunkSize)
chunk1 = append(chunk1, chunks[0]...)
conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk1))
_, err = chunk1.WriteTo(conn)
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
for _, chunk := range chunks[1:] {
chunk1 = net.Buffers{conn.encodeChunk1(head)}
chunk1 = append(chunk1, chunk...)
conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk1))
_, err = chunk1.WriteTo(conn)
}
return nil
}
func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
head, err := conn.ReadByte()
conn.readSeqNum++
@@ -357,7 +288,6 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
needRead = unRead
}
if n, err := conn.ReadFull(currentBody.Malloc(needRead)); err != nil {
plugin.Error(err)
return nil, err
} else {
conn.readSeqNum += uint32(n)
@@ -550,7 +480,7 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
conn.totalWrite += conn.writeSeqNum
conn.writeSeqNum = 0
err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite))
err = conn.SendStreamID0(RTMP_USER_PING_REQUEST)
err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0)
}
var chunk = net.Buffers{conn.encodeChunk12(head)}
if len(body) > conn.writeChunkSize {

View File

@@ -2,17 +2,30 @@ package rtmp
import (
"bufio"
"context"
"fmt"
"net"
"sync/atomic"
"github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/util"
"go.uber.org/zap"
)
type NetStream struct {
*NetConnection
StreamID uint32
}
func (ns *NetStream) Begin() {
ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID)
}
var gstreamid = uint32(64)
func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
senders := make(map[uint32]*RTMPSender)
receivers := make(map[uint32]*RTMPReceiver)
nc := NetConnection{
TCPConn: conn,
Reader: bufio.NewReader(conn),
@@ -22,12 +35,13 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
incompleteRtmpBody: make(map[uint32]util.Buffer),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
tmpBuf: make([]byte, 4),
subscribers: make(map[uint32]*engine.Subscriber),
}
ctx, cancel := context.WithCancel(engine.Engine)
defer nc.Close()
defer cancel()
/* Handshake */
if err := nc.Handshake(); err != nil {
plugin.Error("handshake", err)
plugin.Error("handshake", zap.Error(err))
return
}
for {
@@ -41,7 +55,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
break
}
cmd := msg.MsgData.(Commander).GetCommand()
plugin.Debugf("recv cmd '%s'", cmd.CommandName)
plugin.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
switch cmd.CommandName {
case "connect":
connect := msg.MsgData.(*CallMessage)
@@ -51,7 +65,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
nc.objectEncoding = objectEncoding.(float64)
}
nc.appName = app.(string)
plugin.Infof("connect app:'%s',objectEncoding:%v", nc.appName, objectEncoding)
plugin.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding))
err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10))
nc.writeChunkSize = config.ChunkSize
err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize))
@@ -59,59 +73,61 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
AcknowledgementWindowsize: uint32(512 << 10),
LimitType: byte(2),
})
err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN)
err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN, 0)
err = nc.SendCommand(SEND_CONNECT_RESPONSE_MESSAGE, nc.objectEncoding)
case "createStream":
nc.streamID = atomic.AddUint32(&gstreamid, 1)
plugin.Info("createStream:", nc.streamID)
err = nc.SendCommand(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId)
if err != nil {
plugin.Error(err)
return
}
streamId := atomic.AddUint32(&gstreamid, 1)
plugin.Info("createStream:", zap.Uint32("streamId", streamId))
nc.ResponseCreateStream(cmd.TransactionId, streamId)
case "publish":
pm := msg.MsgData.(*PublishMessage)
var puber engine.Publisher
if puber.Publish(nc.appName+"/"+pm.PublishingName, &nc, config.Publish) {
nc.MediaReceiver = NewMediaReceiver(&puber)
nc.SendStreamID(RTMP_USER_STREAM_BEGIN)
err = nc.SendCommand(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
receiver := &RTMPReceiver{
NetStream: NetStream{
NetConnection: &nc,
StreamID: pm.StreamId,
},
}
receiver.OnEvent(ctx)
if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) {
receiver.absTs = make(map[uint32]uint32)
receiver.Begin()
err = receiver.Response(NetStream_Publish_Start, Level_Status)
} else {
err = nc.SendCommand(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error))
err = receiver.Response(NetStream_Publish_BadName, Level_Error)
}
case "play":
pm := msg.MsgData.(*PlayMessage)
streamPath := nc.appName + "/" + pm.StreamName
subscriber := &engine.Subscriber{
Type: "RTMP",
ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID),
sender := &RTMPSender{
NetStream: NetStream{
NetConnection: &nc,
StreamID: msg.MessageStreamID,
},
}
if subscriber.Subscribe(streamPath, config.Subscribe) {
nc.subscribers[nc.streamID] = subscriber
err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED)
err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN)
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status))
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status))
go func() {
SendMedia(&nc, subscriber)
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Stop, Level_Status))
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Complete, Level_Status))
}()
sender.OnEvent(ctx)
sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
if plugin.Subscribe(streamPath, sender) {
senders[msg.MessageStreamID] = sender
err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED, msg.MessageStreamID)
sender.Begin()
sender.Response(NetStream_Play_Reset, Level_Status)
sender.Response(NetStream_Play_Start, Level_Status)
} else {
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Failed, Level_Error))
sender.Response(NetStream_Play_Failed, Level_Error)
}
case "closeStream":
cm := msg.MsgData.(*CURDStreamMessage)
if stream, ok := nc.subscribers[cm.StreamId]; ok {
stream.Close()
delete(nc.subscribers, cm.StreamId)
if stream, ok := senders[cm.StreamId]; ok {
stream.Unsubscribe()
delete(senders, cm.StreamId)
}
case "releaseStream":
cm := msg.MsgData.(*ReleaseStreamMessage)
amfobj := make(AMFObject)
if nc.Stream != nil && nc.Stream.AppName == nc.appName && nc.Stream.StreamName == cm.StreamName {
p, ok := receivers[msg.MessageStreamID]
if ok {
amfobj["level"] = "_result"
nc.Stream.UnPublish()
p.Unpublish()
} else {
amfobj["level"] = "_error"
}
@@ -119,12 +135,16 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
err = nc.SendCommand(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj)
}
case RTMP_MSG_AUDIO:
nc.ReceiveAudio(msg)
if r, ok := receivers[msg.MessageStreamID]; ok {
r.ReceiveAudio(msg)
}
case RTMP_MSG_VIDEO:
nc.ReceiveVideo(msg)
if r, ok := receivers[msg.MessageStreamID]; ok {
r.ReceiveVideo(msg)
}
}
} else {
plugin.Error(err)
plugin.Error("receive", zap.Error(err))
return
}
}