diff --git a/amf.go b/amf.go index b2b9d33..27346f3 100644 --- a/amf.go +++ b/amf.go @@ -133,6 +133,7 @@ func (amf *AMF) decodeObject() (obj AMFValue) { func (amf *AMF) writeObject(t AMFObject) { if t == nil { + amf.writeNull() return } amf.Malloc(1)[0] = AMF0_OBJECT @@ -223,7 +224,9 @@ func (amf *AMF) readObject() (m AMFObject) { if amf.Len() == 0 { return nil } - amf.ReadByte() + if amf.ReadByte() == AMF0_NULL { + return nil + } m = make(AMFObject, 0) for { k := amf.readString1() diff --git a/client.go b/client.go index b33aca0..f8b256a 100644 --- a/client.go +++ b/client.go @@ -8,7 +8,6 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4" - "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) @@ -26,7 +25,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) { client = &NetConnection{ TCPConn: conn.(*net.TCPConn), Reader: bufio.NewReader(conn), - writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE, + writeChunkSize: conf.ChunkSize, readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, rtmpHeader: make(map[uint32]*ChunkHeader), incompleteRtmpBody: make(map[uint32]util.Buffer), @@ -45,7 +44,12 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) { connectArg["flashVer"] = "monibuca/" + engine.Engine.Version ps := strings.Split(u.Path, "/") connectArg["app"] = ps[0] - client.SendCommand(SEND_CONNECT_MESSAGE, connectArg) + err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(conf.ChunkSize)) + client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{ + CommandMessage{"connect", 1}, + connectArg, + nil, + }) for { msg, err := client.RecvMessage() if err != nil { @@ -74,12 +78,12 @@ type RTMPPusher struct { func (pusher *RTMPPusher) Connect() (err error) { pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL) - log.Info("connect", zap.String("remoteURL", pusher.RemoteURL)) + plugin.Info("connect", zap.String("remoteURL", pusher.RemoteURL)) return } func (pusher *RTMPPusher) Push() { - pusher.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) + pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2}) for { msg, err := pusher.RecvMessage() if err != nil { @@ -123,13 +127,13 @@ type RTMPPuller struct { func (puller *RTMPPuller) Connect() (err error) { puller.NetConnection, err = NewRTMPClient(puller.RemoteURL) - log.Info("connect", zap.String("remoteURL", puller.RemoteURL)) + plugin.Info("connect", zap.String("remoteURL", puller.RemoteURL)) return } func (puller *RTMPPuller) Pull() { puller.absTs = make(map[uint32]uint32) - puller.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) + puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2}) for { msg, err := puller.RecvMessage() if err != nil { @@ -147,6 +151,7 @@ func (puller *RTMPPuller) Pull() { if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok { puller.StreamID = response.StreamId m := &PlayMessage{} + m.TransactionId = 1 m.CommandMessage.CommandName = "play" m.StreamName = puller.Stream.StreamName puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m) diff --git a/main.go b/main.go index 9fe5a50..bce826f 100644 --- a/main.go +++ b/main.go @@ -59,8 +59,8 @@ func (c *RTMPConfig) OnEvent(event any) { } } } - -var plugin = InstallPlugin(&RTMPConfig{ +var conf = &RTMPConfig{ ChunkSize: 4096, TCP: config.TCP{ListenAddr: ":1935"}, -}) +} +var plugin = InstallPlugin(conf) diff --git a/msg.go b/msg.go index f938e26..9c859f2 100644 --- a/msg.go +++ b/msg.go @@ -215,19 +215,26 @@ func decodeCommandAMF0(chunk *Chunk) { } case "createStream": amf.readNull() - chunk.MsgData = &CreateStreamMessage{ - cmdMsg, amf.readObject(), - } - + chunk.MsgData = &cmdMsg case "play": amf.readNull() - chunk.MsgData = &PlayMessage{ + m := &PlayMessage{ cmdMsg, amf.readString(), - uint64(amf.readNumber()), - uint64(amf.readNumber()), - amf.readBool(), + float64(-2), + float64(-1), + true, } + if amf.Len() > 0 { + m.Start = amf.readNumber() + } + if amf.Len() > 0 { + m.Duration = amf.readNumber() + } + if amf.Len() > 0 { + m.Reset = amf.readBool() + } + chunk.MsgData = m case "play2": amf.readNull() chunk.MsgData = &Play2Message{ @@ -421,7 +428,9 @@ func (msg *CallMessage) Encode() []byte { amf.writeString(msg.CommandName) amf.writeNumber(float64(msg.TransactionId)) amf.writeObject(msg.Object) - amf.writeObject(msg.Optional) + if msg.Optional != nil { + amf.writeObject(msg.Optional) + } return amf.Buffer } @@ -439,19 +448,6 @@ func (msg *CallMessage) Encode3() []byte { // The client sends this command to the server to create a logical channel for message communication The publishing of audio, // video, and metadata is carried out over stream channel created using the createStream command. -type CreateStreamMessage struct { - CommandMessage - Object AMFObject -} - -func (msg *CreateStreamMessage) Encode() []byte { - var amf AMF - amf.writeString(msg.CommandName) - amf.writeNumber(float64(msg.TransactionId)) - amf.writeObject(msg.Object) - return amf.Buffer -} - /* func (msg *CreateStreamMessage) Encode3() { msg.Encode0() @@ -481,8 +477,8 @@ func (msg *CreateStreamMessage) Encode3() { type PlayMessage struct { CommandMessage StreamName string - Start uint64 - Duration uint64 + Start float64 + Duration float64 Reset bool } diff --git a/netConnection.go b/netConnection.go index 22d3fc0..b91ed7b 100644 --- a/netConnection.go +++ b/netConnection.go @@ -120,63 +120,7 @@ func (conn *NetConnection) SendCommand(message string, args any) error { // m.Millisecond = 100 // m.StreamID = conn.streamID // return conn.writeMessage(RTMP_MSG_USER_CONTROL, m) - case SEND_CREATE_STREAM_MESSAGE: - if args != nil { - return errors.New(SEND_CREATE_STREAM_MESSAGE + ", The parameter is nil") - } - m := &CreateStreamMessage{} - m.CommandName = "createStream" - m.TransactionId = 2 - return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) - case SEND_PLAY_MESSAGE: - data, ok := args.(AMFObject) - if !ok { - errors.New(SEND_PLAY_MESSAGE + ", The parameter is AMFObject") - } - m := new(PlayMessage) - m.CommandName = "play" - m.TransactionId = 1 - for i, v := range data { - if i == "StreamName" { - m.StreamName = v.(string) - } else if i == "Start" { - m.Start = v.(uint64) - } else if i == "Duration" { - m.Duration = v.(uint64) - } else if i == "Reset" { - m.Reset = v.(bool) - } - } - return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) - case SEND_PLAY_RESPONSE_MESSAGE: - data, ok := args.(AMFObject) - if !ok { - errors.New(SEND_PLAY_RESPONSE_MESSAGE + ", The parameter is AMFObject") - } - - obj := make(AMFObject) - var streamID uint32 - - for i, v := range data { - switch i { - case "code", "level": - obj[i] = v - case "streamid": - if t, ok := v.(uint32); ok { - streamID = t - } - } - } - - obj["clientid"] = 1 - - m := new(ResponsePlayMessage) - m.CommandName = Response_OnStatus - m.TransactionId = 0 - m.Object = obj - m.StreamID = streamID - return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) case SEND_CONNECT_RESPONSE_MESSAGE: //if !ok { // errors.New(SEND_CONNECT_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") @@ -206,33 +150,10 @@ func (conn *NetConnection) SendCommand(message string, args any) error { m.Properties = pro m.Infomation = info return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) - case SEND_CONNECT_MESSAGE: - data, ok := args.(AMFObject) - if !ok { - errors.New(SEND_CONNECT_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") - } - - obj := make(AMFObject) - info := make(AMFObject) - - for i, v := range data { - switch i { - case "videoFunction", "objectEncoding", "fpad", "flashVer", "capabilities", "pageUrl", "swfUrl", "tcUrl", "videoCodecs", "app", "audioCodecs": - obj[i] = v - } - } - - m := new(CallMessage) - m.CommandName = "connect" - m.TransactionId = 1 - m.Object = obj - m.Optional = info - return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) - case SEND_UNPUBLISH_RESPONSE_MESSAGE: data, ok := args.(AMFObject) if !ok { - errors.New(SEND_UNPUBLISH_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") + return errors.New(SEND_UNPUBLISH_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") } m := new(CommandMessage) m.TransactionId = data["tid"].(uint64) diff --git a/server.go b/server.go index b745c50..00248c6 100644 --- a/server.go +++ b/server.go @@ -87,7 +87,21 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { LimitType: byte(2), }) err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN, 0) - err = nc.SendCommand(SEND_CONNECT_RESPONSE_MESSAGE, nc.objectEncoding) + m := new(ResponseConnectMessage) + m.CommandName = Response_Result + m.TransactionId = 1 + m.Properties = AMFObject{ + "fmsVer": "monibuca/" + engine.Engine.Version, + "capabilities": 31, + "mode": 1, + "Author": "dexter", + } + m.Infomation = AMFObject{ + "level": Level_Status, + "code": NetConnection_Connect_Success, + "objectEncoding": nc.objectEncoding, + } + err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m) case "createStream": streamId := atomic.AddUint32(&gstreamid, 1) plugin.Info("createStream:", zap.Uint32("streamId", streamId))