fix: appName

This commit is contained in:
langhuihui
2024-03-07 20:48:58 +08:00
parent 887c9bb6c3
commit efe5c2b0ee
3 changed files with 38 additions and 27 deletions

View File

@@ -54,7 +54,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
RTMPPlugin.Error("handshake", zap.Error(err)) RTMPPlugin.Error("handshake", zap.Error(err))
return nil, err return nil, err
} }
client.appName = ps[1] client.appName = strings.Join(ps[1:len(ps)-1], "/")
err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(conf.ChunkSize)) err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(conf.ChunkSize))
if err != nil { if err != nil {
return return

51
msg.go
View File

@@ -80,7 +80,7 @@ func (h ChunkHeader) Clone() *ChunkHeader {
} }
type RtmpMessage interface { type RtmpMessage interface {
Encode(*util.Buffer) Encode(util.IAMF)
} }
type HaveStreamID interface { type HaveStreamID interface {
GetStreamID() uint32 GetStreamID() uint32
@@ -293,6 +293,9 @@ func decodeCommandAMF0(chunk *Chunk, body []byte) {
amf.ReadObject(), amf.ReadObject(),
amf.ReadObject(), "", amf.ReadObject(), "",
} }
if response.Infomation == nil && response.Properties != nil {
response.Infomation = response.Properties
}
codef := zap.String("code", response.Infomation["code"].(string)) codef := zap.String("code", response.Infomation["code"].(string))
switch response.Infomation["level"] { switch response.Infomation["level"] {
case Level_Status: case Level_Status:
@@ -339,8 +342,8 @@ func (cmd *CommandMessage) GetCommand() *CommandMessage {
return cmd return cmd
} }
func (msg *CommandMessage) Encode(buf *util.Buffer) { func (msg *CommandMessage) Encode(buf util.IAMF) {
buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil) buf.Marshals(msg.CommandName, msg.TransactionId, nil)
} }
// Protocol control message 1. // Protocol control message 1.
@@ -349,7 +352,7 @@ func (msg *CommandMessage) Encode(buf *util.Buffer) {
// chunk size (31 bits): This field holds the new maximum chunk size,in bytes, which will be used for all of the senders subsequent chunks until further notice // chunk size (31 bits): This field holds the new maximum chunk size,in bytes, which will be used for all of the senders subsequent chunks until further notice
type Uint32Message uint32 type Uint32Message uint32
func (msg Uint32Message) Encode(buf *util.Buffer) { func (msg Uint32Message) Encode(buf util.IAMF) {
binary.BigEndian.PutUint32(buf.Malloc(4), uint32(msg)) binary.BigEndian.PutUint32(buf.Malloc(4), uint32(msg))
} }
@@ -374,7 +377,7 @@ type SetPeerBandwidthMessage struct {
LimitType byte LimitType byte
} }
func (msg *SetPeerBandwidthMessage) Encode(buf *util.Buffer) { func (msg *SetPeerBandwidthMessage) Encode(buf util.IAMF) {
buf.WriteUint32(msg.AcknowledgementWindowsize) buf.WriteUint32(msg.AcknowledgementWindowsize)
buf.WriteByte(msg.LimitType) buf.WriteByte(msg.LimitType)
} }
@@ -406,10 +409,10 @@ type CallMessage struct {
Optional map[string]any `json:",omitempty"` Optional map[string]any `json:",omitempty"`
} }
func (msg *CallMessage) Encode(buf *util.Buffer) { func (msg *CallMessage) Encode(buf util.IAMF) {
buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object) buf.Marshals(msg.CommandName, msg.TransactionId, msg.Object)
if msg.Optional != nil { if msg.Optional != nil {
buf.MarshalAMFs(msg.Optional) buf.Marshals(msg.Optional)
} }
} }
@@ -465,7 +468,7 @@ type PlayMessage struct {
// Duration -> 可选的参数,以秒为单位定义了回放的持续时间.默认值为 -1.-1 值意味着一个直播流会一直播放直到它不再可用或者一个录制流一直播放直到结束 // Duration -> 可选的参数,以秒为单位定义了回放的持续时间.默认值为 -1.-1 值意味着一个直播流会一直播放直到它不再可用或者一个录制流一直播放直到结束
// Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush // Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush
func (msg *PlayMessage) Encode(buf *util.Buffer) { func (msg *PlayMessage) Encode(buf util.IAMF) {
// if msg.Start > 0 { // if msg.Start > 0 {
// amf.writeNumber(msg.Start) // amf.writeNumber(msg.Start)
// } // }
@@ -475,7 +478,7 @@ func (msg *PlayMessage) Encode(buf *util.Buffer) {
// } // }
// amf.writeBool(msg.Reset) // amf.writeBool(msg.Reset)
buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000) buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000)
} }
/* /*
@@ -548,8 +551,8 @@ type PublishMessage struct {
// “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件. // “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件.
// “live”:发布直播数据而不录制到文件 // “live”:发布直播数据而不录制到文件
func (msg *PublishMessage) Encode(buf *util.Buffer) { func (msg *PublishMessage) Encode(buf util.IAMF) {
buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType) buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType)
} }
// Seek Message // Seek Message
@@ -590,8 +593,8 @@ type ResponseConnectMessage struct {
Infomation map[string]any `json:",omitempty"` Infomation map[string]any `json:",omitempty"`
} }
func (msg *ResponseConnectMessage) Encode(buf *util.Buffer) { func (msg *ResponseConnectMessage) Encode(buf util.IAMF) {
buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) buf.Marshals(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation)
} }
/* /*
@@ -616,8 +619,8 @@ type ResponseCreateStreamMessage struct {
StreamId uint32 StreamId uint32
} }
func (msg *ResponseCreateStreamMessage) Encode(buf *util.Buffer) { func (msg *ResponseCreateStreamMessage) Encode(buf util.IAMF) {
buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamId) buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamId)
} }
/* /*
@@ -647,8 +650,8 @@ type ResponsePlayMessage struct {
func (msg *ResponsePlayMessage) GetStreamID() uint32 { func (msg *ResponsePlayMessage) GetStreamID() uint32 {
return msg.StreamID return msg.StreamID
} }
func (msg *ResponsePlayMessage) Encode(buf *util.Buffer) { func (msg *ResponsePlayMessage) Encode(buf util.IAMF) {
buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.Infomation) buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.Infomation)
} }
/* /*
@@ -684,8 +687,8 @@ func (msg *ResponsePublishMessage) GetStreamID() uint32 {
// 属性 -> null // 属性 -> null
// 信息 -> level, code, description // 信息 -> level, code, description
func (msg *ResponsePublishMessage) Encode(buf *util.Buffer) { func (msg *ResponsePublishMessage) Encode(buf util.IAMF) {
buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) buf.Marshals(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation)
} }
/* /*
@@ -744,7 +747,7 @@ type StreamIDMessage struct {
StreamID uint32 StreamID uint32
} }
func (msg *StreamIDMessage) Encode(buffer *util.Buffer) { func (msg *StreamIDMessage) Encode(buffer util.IAMF) {
buffer.WriteUint16(msg.EventType) buffer.WriteUint16(msg.EventType)
msg.EventData = buffer.Malloc(4) msg.EventData = buffer.Malloc(4)
binary.BigEndian.PutUint32(msg.EventData, msg.StreamID) binary.BigEndian.PutUint32(msg.EventData, msg.StreamID)
@@ -760,7 +763,7 @@ type SetBufferMessage struct {
Millisecond uint32 Millisecond uint32
} }
func (msg *SetBufferMessage) Encode(buf *util.Buffer) { func (msg *SetBufferMessage) Encode(buf util.IAMF) {
buf.WriteUint16(msg.EventType) buf.WriteUint16(msg.EventType)
msg.EventData = buf.Malloc(8) msg.EventData = buf.Malloc(8)
binary.BigEndian.PutUint32(msg.EventData, msg.StreamID) binary.BigEndian.PutUint32(msg.EventData, msg.StreamID)
@@ -776,12 +779,12 @@ type PingRequestMessage struct {
Timestamp uint32 Timestamp uint32
} }
func (msg *PingRequestMessage) Encode(buf *util.Buffer) { func (msg *PingRequestMessage) Encode(buf util.IAMF) {
buf.WriteUint16(msg.EventType) buf.WriteUint16(msg.EventType)
msg.EventData = buf.Malloc(4) msg.EventData = buf.Malloc(4)
binary.BigEndian.PutUint32(msg.EventData, msg.Timestamp) binary.BigEndian.PutUint32(msg.EventData, msg.Timestamp)
} }
func (msg *UserControlMessage) Encode(buf *util.Buffer) { func (msg *UserControlMessage) Encode(buf util.IAMF) {
buf.WriteUint16(msg.EventType) buf.WriteUint16(msg.EventType)
} }

View File

@@ -9,6 +9,7 @@ import (
"runtime" "runtime"
"sync/atomic" "sync/atomic"
"go.uber.org/zap"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
@@ -259,7 +260,7 @@ func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) {
switch msg.MessageTypeID { switch msg.MessageTypeID {
case RTMP_MSG_CHUNK_SIZE: case RTMP_MSG_CHUNK_SIZE:
conn.readChunkSize = int(msg.MsgData.(Uint32Message)) conn.readChunkSize = int(msg.MsgData.(Uint32Message))
println("read chunk size", conn.readChunkSize) RTMPPlugin.Info("msg read chunk size", zap.Int("readChunkSize", conn.readChunkSize))
case RTMP_MSG_ABORT: case RTMP_MSG_ABORT:
delete(conn.incommingChunks, uint32(msg.MsgData.(Uint32Message))) delete(conn.incommingChunks, uint32(msg.MsgData.(Uint32Message)))
case RTMP_MSG_ACK, RTMP_MSG_EDGE: case RTMP_MSG_ACK, RTMP_MSG_EDGE:
@@ -293,7 +294,14 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
} }
defer conn.writing.Store(false) defer conn.writing.Store(false)
conn.tmpBuf.Reset() conn.tmpBuf.Reset()
msg.Encode(&conn.tmpBuf) amf := util.AMF{conn.tmpBuf}
if conn.objectEncoding == 0 {
msg.Encode(&amf)
} else {
amf := util.AMF3{AMF: amf}
msg.Encode(&amf)
}
conn.tmpBuf = amf.Buffer
head := newChunkHeader(t) head := newChunkHeader(t)
head.MessageLength = uint32(conn.tmpBuf.Len()) head.MessageLength = uint32(conn.tmpBuf.Len())
if sid, ok := msg.(HaveStreamID); ok { if sid, ok := msg.(HaveStreamID); ok {