This commit is contained in:
dexter
2022-03-08 17:18:05 +08:00
parent 0bbd227b72
commit ca1b08e499
6 changed files with 55 additions and 116 deletions

5
amf.go
View File

@@ -133,6 +133,7 @@ func (amf *AMF) decodeObject() (obj AMFValue) {
func (amf *AMF) writeObject(t AMFObject) { func (amf *AMF) writeObject(t AMFObject) {
if t == nil { if t == nil {
amf.writeNull()
return return
} }
amf.Malloc(1)[0] = AMF0_OBJECT amf.Malloc(1)[0] = AMF0_OBJECT
@@ -223,7 +224,9 @@ func (amf *AMF) readObject() (m AMFObject) {
if amf.Len() == 0 { if amf.Len() == 0 {
return nil return nil
} }
amf.ReadByte() if amf.ReadByte() == AMF0_NULL {
return nil
}
m = make(AMFObject, 0) m = make(AMFObject, 0)
for { for {
k := amf.readString1() k := amf.readString1()

View File

@@ -8,7 +8,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4" "m7s.live/engine/v4"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
@@ -26,7 +25,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
client = &NetConnection{ client = &NetConnection{
TCPConn: conn.(*net.TCPConn), TCPConn: conn.(*net.TCPConn),
Reader: bufio.NewReader(conn), Reader: bufio.NewReader(conn),
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE, writeChunkSize: conf.ChunkSize,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
rtmpHeader: make(map[uint32]*ChunkHeader), rtmpHeader: make(map[uint32]*ChunkHeader),
incompleteRtmpBody: make(map[uint32]util.Buffer), incompleteRtmpBody: make(map[uint32]util.Buffer),
@@ -45,7 +44,12 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
connectArg["flashVer"] = "monibuca/" + engine.Engine.Version connectArg["flashVer"] = "monibuca/" + engine.Engine.Version
ps := strings.Split(u.Path, "/") ps := strings.Split(u.Path, "/")
connectArg["app"] = ps[0] connectArg["app"] = ps[0]
client.SendCommand(SEND_CONNECT_MESSAGE, connectArg) err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(conf.ChunkSize))
client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{
CommandMessage{"connect", 1},
connectArg,
nil,
})
for { for {
msg, err := client.RecvMessage() msg, err := client.RecvMessage()
if err != nil { if err != nil {
@@ -74,12 +78,12 @@ type RTMPPusher struct {
func (pusher *RTMPPusher) Connect() (err error) { func (pusher *RTMPPusher) Connect() (err error) {
pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL) pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL)
log.Info("connect", zap.String("remoteURL", pusher.RemoteURL)) plugin.Info("connect", zap.String("remoteURL", pusher.RemoteURL))
return return
} }
func (pusher *RTMPPusher) Push() { func (pusher *RTMPPusher) Push() {
pusher.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
for { for {
msg, err := pusher.RecvMessage() msg, err := pusher.RecvMessage()
if err != nil { if err != nil {
@@ -123,13 +127,13 @@ type RTMPPuller struct {
func (puller *RTMPPuller) Connect() (err error) { func (puller *RTMPPuller) Connect() (err error) {
puller.NetConnection, err = NewRTMPClient(puller.RemoteURL) puller.NetConnection, err = NewRTMPClient(puller.RemoteURL)
log.Info("connect", zap.String("remoteURL", puller.RemoteURL)) plugin.Info("connect", zap.String("remoteURL", puller.RemoteURL))
return return
} }
func (puller *RTMPPuller) Pull() { func (puller *RTMPPuller) Pull() {
puller.absTs = make(map[uint32]uint32) puller.absTs = make(map[uint32]uint32)
puller.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
for { for {
msg, err := puller.RecvMessage() msg, err := puller.RecvMessage()
if err != nil { if err != nil {
@@ -147,6 +151,7 @@ func (puller *RTMPPuller) Pull() {
if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok { if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok {
puller.StreamID = response.StreamId puller.StreamID = response.StreamId
m := &PlayMessage{} m := &PlayMessage{}
m.TransactionId = 1
m.CommandMessage.CommandName = "play" m.CommandMessage.CommandName = "play"
m.StreamName = puller.Stream.StreamName m.StreamName = puller.Stream.StreamName
puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m) puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m)

View File

@@ -59,8 +59,8 @@ func (c *RTMPConfig) OnEvent(event any) {
} }
} }
} }
var conf = &RTMPConfig{
var plugin = InstallPlugin(&RTMPConfig{
ChunkSize: 4096, ChunkSize: 4096,
TCP: config.TCP{ListenAddr: ":1935"}, TCP: config.TCP{ListenAddr: ":1935"},
}) }
var plugin = InstallPlugin(conf)

44
msg.go
View File

@@ -215,19 +215,26 @@ func decodeCommandAMF0(chunk *Chunk) {
} }
case "createStream": case "createStream":
amf.readNull() amf.readNull()
chunk.MsgData = &CreateStreamMessage{ chunk.MsgData = &cmdMsg
cmdMsg, amf.readObject(),
}
case "play": case "play":
amf.readNull() amf.readNull()
chunk.MsgData = &PlayMessage{ m := &PlayMessage{
cmdMsg, cmdMsg,
amf.readString(), amf.readString(),
uint64(amf.readNumber()), float64(-2),
uint64(amf.readNumber()), float64(-1),
amf.readBool(), true,
} }
if amf.Len() > 0 {
m.Start = amf.readNumber()
}
if amf.Len() > 0 {
m.Duration = amf.readNumber()
}
if amf.Len() > 0 {
m.Reset = amf.readBool()
}
chunk.MsgData = m
case "play2": case "play2":
amf.readNull() amf.readNull()
chunk.MsgData = &Play2Message{ chunk.MsgData = &Play2Message{
@@ -421,7 +428,9 @@ func (msg *CallMessage) Encode() []byte {
amf.writeString(msg.CommandName) amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId)) amf.writeNumber(float64(msg.TransactionId))
amf.writeObject(msg.Object) amf.writeObject(msg.Object)
amf.writeObject(msg.Optional) if msg.Optional != nil {
amf.writeObject(msg.Optional)
}
return amf.Buffer return amf.Buffer
} }
@@ -439,19 +448,6 @@ func (msg *CallMessage) Encode3() []byte {
// The client sends this command to the server to create a logical channel for message communication The publishing of audio, // The client sends this command to the server to create a logical channel for message communication The publishing of audio,
// video, and metadata is carried out over stream channel created using the createStream command. // video, and metadata is carried out over stream channel created using the createStream command.
type CreateStreamMessage struct {
CommandMessage
Object AMFObject
}
func (msg *CreateStreamMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeObject(msg.Object)
return amf.Buffer
}
/* /*
func (msg *CreateStreamMessage) Encode3() { func (msg *CreateStreamMessage) Encode3() {
msg.Encode0() msg.Encode0()
@@ -481,8 +477,8 @@ func (msg *CreateStreamMessage) Encode3() {
type PlayMessage struct { type PlayMessage struct {
CommandMessage CommandMessage
StreamName string StreamName string
Start uint64 Start float64
Duration uint64 Duration float64
Reset bool Reset bool
} }

View File

@@ -120,63 +120,7 @@ func (conn *NetConnection) SendCommand(message string, args any) error {
// m.Millisecond = 100 // m.Millisecond = 100
// m.StreamID = conn.streamID // m.StreamID = conn.streamID
// return conn.writeMessage(RTMP_MSG_USER_CONTROL, m) // return conn.writeMessage(RTMP_MSG_USER_CONTROL, m)
case SEND_CREATE_STREAM_MESSAGE:
if args != nil {
return errors.New(SEND_CREATE_STREAM_MESSAGE + ", The parameter is nil")
}
m := &CreateStreamMessage{}
m.CommandName = "createStream"
m.TransactionId = 2
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_PLAY_MESSAGE:
data, ok := args.(AMFObject)
if !ok {
errors.New(SEND_PLAY_MESSAGE + ", The parameter is AMFObject")
}
m := new(PlayMessage)
m.CommandName = "play"
m.TransactionId = 1
for i, v := range data {
if i == "StreamName" {
m.StreamName = v.(string)
} else if i == "Start" {
m.Start = v.(uint64)
} else if i == "Duration" {
m.Duration = v.(uint64)
} else if i == "Reset" {
m.Reset = v.(bool)
}
}
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_PLAY_RESPONSE_MESSAGE:
data, ok := args.(AMFObject)
if !ok {
errors.New(SEND_PLAY_RESPONSE_MESSAGE + ", The parameter is AMFObject")
}
obj := make(AMFObject)
var streamID uint32
for i, v := range data {
switch i {
case "code", "level":
obj[i] = v
case "streamid":
if t, ok := v.(uint32); ok {
streamID = t
}
}
}
obj["clientid"] = 1
m := new(ResponsePlayMessage)
m.CommandName = Response_OnStatus
m.TransactionId = 0
m.Object = obj
m.StreamID = streamID
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_CONNECT_RESPONSE_MESSAGE: case SEND_CONNECT_RESPONSE_MESSAGE:
//if !ok { //if !ok {
// errors.New(SEND_CONNECT_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") // errors.New(SEND_CONNECT_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
@@ -206,33 +150,10 @@ func (conn *NetConnection) SendCommand(message string, args any) error {
m.Properties = pro m.Properties = pro
m.Infomation = info m.Infomation = info
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_CONNECT_MESSAGE:
data, ok := args.(AMFObject)
if !ok {
errors.New(SEND_CONNECT_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
}
obj := make(AMFObject)
info := make(AMFObject)
for i, v := range data {
switch i {
case "videoFunction", "objectEncoding", "fpad", "flashVer", "capabilities", "pageUrl", "swfUrl", "tcUrl", "videoCodecs", "app", "audioCodecs":
obj[i] = v
}
}
m := new(CallMessage)
m.CommandName = "connect"
m.TransactionId = 1
m.Object = obj
m.Optional = info
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_UNPUBLISH_RESPONSE_MESSAGE: case SEND_UNPUBLISH_RESPONSE_MESSAGE:
data, ok := args.(AMFObject) data, ok := args.(AMFObject)
if !ok { if !ok {
errors.New(SEND_UNPUBLISH_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") return errors.New(SEND_UNPUBLISH_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
} }
m := new(CommandMessage) m := new(CommandMessage)
m.TransactionId = data["tid"].(uint64) m.TransactionId = data["tid"].(uint64)

View File

@@ -87,7 +87,21 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
LimitType: byte(2), LimitType: byte(2),
}) })
err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN, 0) err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN, 0)
err = nc.SendCommand(SEND_CONNECT_RESPONSE_MESSAGE, nc.objectEncoding) m := new(ResponseConnectMessage)
m.CommandName = Response_Result
m.TransactionId = 1
m.Properties = AMFObject{
"fmsVer": "monibuca/" + engine.Engine.Version,
"capabilities": 31,
"mode": 1,
"Author": "dexter",
}
m.Infomation = AMFObject{
"level": Level_Status,
"code": NetConnection_Connect_Success,
"objectEncoding": nc.objectEncoding,
}
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case "createStream": case "createStream":
streamId := atomic.AddUint32(&gstreamid, 1) streamId := atomic.AddUint32(&gstreamid, 1)
plugin.Info("createStream:", zap.Uint32("streamId", streamId)) plugin.Info("createStream:", zap.Uint32("streamId", streamId))