将amf迁入engine中方便服用

This commit is contained in:
dexter
2023-01-16 09:27:14 +08:00
parent 7f00f32cb7
commit a0b55d5e1b
6 changed files with 84 additions and 427 deletions

197
msg.go
View File

@@ -6,6 +6,7 @@ import (
"strings"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/util"
)
@@ -198,36 +199,36 @@ func GetRtmpMessage(chunk *Chunk) error {
// object类型要复杂点.
// 第一个byte是03表示object,其后跟的是N个(key+value).最后以00 00 09表示object结束
func decodeCommandAMF0(chunk *Chunk) {
amf := AMF{chunk.Body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去.
cmd := amf.readString() // rtmp_amf.go, 将payload的bytes类型转换成string类型.
amf := codec.AMF{chunk.Body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去.
cmd := amf.ReadShortString() // rtmp_amf.go, 将payload的bytes类型转换成string类型.
cmdMsg := CommandMessage{
cmd,
uint64(amf.readNumber()),
uint64(amf.ReadNumber()),
}
switch cmd {
case "connect", "call":
chunk.MsgData = &CallMessage{
cmdMsg,
amf.readObject(),
amf.readObject(),
amf.ReadObject(),
amf.ReadObject(),
}
case "createStream":
amf.readNull()
amf.Unmarshal()
chunk.MsgData = &cmdMsg
case "play":
amf.readNull()
amf.Unmarshal()
m := &PlayMessage{
CURDStreamMessage{
cmdMsg,
chunk.MessageStreamID,
},
amf.readString(),
amf.ReadShortString(),
float64(-2),
float64(-1),
true,
}
for i := 0; i < 3; i++ {
if v := amf.decodeObject(); v != nil {
if v, _ := amf.Unmarshal(); v != nil {
switch vv := v.(type) {
case float64:
if i == 0 {
@@ -245,67 +246,67 @@ func decodeCommandAMF0(chunk *Chunk) {
}
chunk.MsgData = m
case "play2":
amf.readNull()
amf.Unmarshal()
chunk.MsgData = &Play2Message{
cmdMsg,
uint64(amf.readNumber()),
amf.readString(),
amf.readString(),
uint64(amf.readNumber()),
amf.readString(),
uint64(amf.ReadNumber()),
amf.ReadShortString(),
amf.ReadShortString(),
uint64(amf.ReadNumber()),
amf.ReadShortString(),
}
case "publish":
amf.readNull()
amf.Unmarshal()
chunk.MsgData = &PublishMessage{
CURDStreamMessage{
cmdMsg,
chunk.MessageStreamID,
},
amf.readString(),
amf.readString(),
amf.ReadShortString(),
amf.ReadShortString(),
}
case "pause":
amf.readNull()
amf.Unmarshal()
chunk.MsgData = &PauseMessage{
cmdMsg,
amf.readBool(),
uint64(amf.readNumber()),
amf.ReadBool(),
uint64(amf.ReadNumber()),
}
case "seek":
amf.readNull()
amf.Unmarshal()
chunk.MsgData = &SeekMessage{
cmdMsg,
uint64(amf.readNumber()),
uint64(amf.ReadNumber()),
}
case "deleteStream", "closeStream":
amf.readNull()
amf.Unmarshal()
chunk.MsgData = &CURDStreamMessage{
cmdMsg,
uint32(amf.readNumber()),
uint32(amf.ReadNumber()),
}
case "releaseStream":
amf.readNull()
amf.Unmarshal()
chunk.MsgData = &ReleaseStreamMessage{
cmdMsg,
amf.readString(),
amf.ReadShortString(),
}
case "receiveAudio", "receiveVideo":
amf.readNull()
amf.Unmarshal()
chunk.MsgData = &ReceiveAVMessage{
cmdMsg,
amf.readBool(),
amf.ReadBool(),
}
case Response_Result, Response_Error, Response_OnStatus:
if cmdMsg.TransactionId == 2 {
chunk.MsgData = &ResponseCreateStreamMessage{
cmdMsg, amf.readObject(), uint32(amf.readNumber()),
cmdMsg, amf.ReadObject(), uint32(amf.ReadNumber()),
}
return
}
response := &ResponseMessage{
cmdMsg,
amf.readObject(),
amf.readObject(), "",
amf.ReadObject(),
amf.ReadObject(), "",
}
codef := zap.String("code", response.Infomation["code"].(string))
switch response.Infomation["level"] {
@@ -360,11 +361,7 @@ func (cmd *CommandMessage) GetCommand() *CommandMessage {
}
func (msg *CommandMessage) Encode() (b []byte) {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeNull()
return amf.Buffer
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil)
}
// Protocol control message 1.
@@ -430,29 +427,23 @@ type MetadataMessage struct {
// The called RPC name is passed as a parameter to the call command.
type CallMessage struct {
CommandMessage
Object AMFObject `json:",omitempty"`
Optional AMFObject `json:",omitempty"`
Object map[string]any `json:",omitempty"`
Optional map[string]any `json:",omitempty"`
}
func (msg *CallMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeObject(msg.Object)
var amf codec.AMF
amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object)
if msg.Optional != nil {
amf.writeObject(msg.Optional)
amf.Marshal(msg.Optional)
}
return amf.Buffer
}
func (msg *CallMessage) Encode3() []byte {
var amf AMF
var amf codec.AMF
amf.WriteByte(0)
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeObject(msg.Object)
amf.writeObject(msg.Optional)
return amf.Buffer
return amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object, msg.Optional)
}
// Create Stream Message.
@@ -502,12 +493,7 @@ type PlayMessage struct {
// Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush
func (msg *PlayMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeNull()
amf.writeString(msg.StreamName)
amf.writeNumber(-2000)
var amf codec.AMF
// if msg.Start > 0 {
// amf.writeNumber(msg.Start)
// }
@@ -517,7 +503,7 @@ func (msg *PlayMessage) Encode() []byte {
// }
// amf.writeBool(msg.Reset)
return amf.Buffer
return amf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000)
}
/*
@@ -591,13 +577,7 @@ type PublishMessage struct {
// “live”:发布直播数据而不录制到文件
func (msg *PublishMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeNull()
amf.writeString(msg.PublishingName)
amf.writeString(msg.PublishingType)
return amf.Buffer
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType)
}
// Seek Message
@@ -631,22 +611,15 @@ func (msg *PauseMessage) Encode0() {
// Response Message. Server -> Response -> Client
//
//
// Response Connect Message
//
type ResponseConnectMessage struct {
CommandMessage
Properties AMFObject `json:",omitempty"`
Infomation AMFObject `json:",omitempty"`
Properties map[string]any `json:",omitempty"`
Infomation map[string]any `json:",omitempty"`
}
func (msg *ResponseConnectMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeObject(msg.Properties)
amf.writeObject(msg.Infomation)
return amf.Buffer
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation)
}
/*
@@ -654,38 +627,25 @@ func (msg *ResponseConnectMessage) Encode3() {
}*/
// Response Call Message
//
type ResponseCallMessage struct {
CommandMessage
Object AMFObject
Response AMFObject
Object map[string]any
Response map[string]any
}
func (msg *ResponseCallMessage) Encode0() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeObject(msg.Object)
amf.writeObject(msg.Response)
return amf.Buffer
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object, msg.Response)
}
//
// Response Create Stream Message
//
type ResponseCreateStreamMessage struct {
CommandMessage
Object interface{} `json:",omitempty"`
Object any `json:",omitempty"`
StreamId uint32
}
func (msg *ResponseCreateStreamMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeNull()
amf.writeNumber(float64(msg.StreamId))
return amf.Buffer
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamId)
}
/*
@@ -693,23 +653,21 @@ func (msg *ResponseCreateStreamMessage) Encode3() {
}*/
func (msg *ResponseCreateStreamMessage) Decode0(chunk *Chunk) {
amf := AMF{chunk.Body}
msg.CommandName = amf.decodeObject().(string)
msg.TransactionId = uint64(amf.decodeObject().(float64))
amf.decodeObject()
msg.StreamId = uint32(amf.decodeObject().(float64))
amf := codec.AMF{chunk.Body}
msg.CommandName = amf.ReadShortString()
msg.TransactionId = uint64(amf.ReadNumber())
amf.Unmarshal()
msg.StreamId = uint32(amf.ReadNumber())
}
func (msg *ResponseCreateStreamMessage) Decode3(chunk *Chunk) {
chunk.Body = chunk.Body[1:]
msg.Decode0(chunk)
}
//
// Response Play Message
//
type ResponsePlayMessage struct {
CommandMessage
Object AMFObject `json:",omitempty"`
Object map[string]any `json:",omitempty"`
Description string
StreamID uint32
}
@@ -718,13 +676,7 @@ func (msg *ResponsePlayMessage) GetStreamID() uint32 {
return msg.StreamID
}
func (msg *ResponsePlayMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeNull()
amf.writeObject(msg.Object)
amf.writeString(msg.Description)
return amf.Buffer
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.Object, msg.Description)
}
/*
@@ -732,23 +684,21 @@ func (msg *ResponsePlayMessage) Encode3() {
}*/
func (msg *ResponsePlayMessage) Decode0(chunk *Chunk) {
amf := AMF{chunk.Body}
msg.CommandName = amf.decodeObject().(string)
msg.TransactionId = uint64(amf.decodeObject().(float64))
msg.Object = amf.decodeObject().(AMFObject)
amf := codec.AMF{chunk.Body}
msg.CommandName = amf.ReadShortString()
msg.TransactionId = uint64(amf.ReadNumber())
msg.Object = amf.ReadObject()
}
func (msg *ResponsePlayMessage) Decode3(chunk *Chunk) {
chunk.Body = chunk.Body[1:]
msg.Decode0(chunk)
}
//
// Response Publish Message
//
type ResponsePublishMessage struct {
CommandMessage
Properties AMFObject `json:",omitempty"`
Infomation AMFObject `json:",omitempty"`
Properties map[string]any `json:",omitempty"`
Infomation map[string]any `json:",omitempty"`
StreamID uint32
}
@@ -762,21 +712,14 @@ func (msg *ResponsePublishMessage) GetStreamID() uint32 {
// 信息 -> level, code, description
func (msg *ResponsePublishMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeObject(msg.Properties)
amf.writeObject(msg.Infomation)
return amf.Buffer
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation)
}
/*
func (msg *ResponsePublishMessage) Encode3() {
}*/
//
// Response Seek Message
//
type ResponseSeekMessage struct {
CommandMessage
Description string
@@ -788,9 +731,7 @@ func (msg *ResponseSeekMessage) Encode0() {
//func (msg *ResponseSeekMessage) Encode3() {
//}
//
// Response Pause Message
//
type ResponsePauseMessage struct {
CommandMessage
Description string
@@ -806,13 +747,11 @@ func (msg *ResponsePauseMessage) Encode0() {
//func (msg *ResponsePauseMessage) Encode3() {
//}
//
// Response Message
//
type ResponseMessage struct {
CommandMessage
Properties AMFObject `json:",omitempty"`
Infomation AMFObject `json:",omitempty"`
Properties map[string]any `json:",omitempty"`
Infomation map[string]any `json:",omitempty"`
Description string
}