diff --git a/client.go b/client.go index f714858..b06fe5c 100644 --- a/client.go +++ b/client.go @@ -1,23 +1,17 @@ package rtmp import ( - "bytes" - "code.google.com/p/go-uuid/uuid" "crypto/tls" - "encoding/binary" - "errors" - "fmt" "github.com/elobuff/goamf" - "io" "net" "net/url" "sync/atomic" ) type ClientHandler interface { - OnConnect() - OnDisconnect() - OnReceive(message *Message) + OnRtmpConnect() + OnRtmpDisconnect() + OnRtmpCommand(command *Command) } type Client struct { @@ -27,6 +21,8 @@ type Client struct { connected bool conn net.Conn + enc amf.Encoder + dec amf.Decoder outBytes uint32 outMessages chan *Message @@ -42,23 +38,25 @@ type Client struct { inChunkStreams map[uint32]*InboundChunkStream lastTransactionId uint32 + connectionId string } -func NewClient(url string) (*Client, error) { +func NewClient(url string, handler ClientHandler) (*Client, error) { c := &Client{ url: url, - connected: false, - + connected: false, + handler: handler, + enc: *new(amf.Encoder), + dec: *new(amf.Decoder), outMessages: make(chan *Message, 100), outChunkSize: DEFAULT_CHUNK_SIZE, outWindowSize: DEFAULT_WINDOW_SIZE, outChunkStreams: make(map[uint32]*OutboundChunkStream), - - inMessages: make(chan *Message, 100), - inChunkSize: DEFAULT_CHUNK_SIZE, - inWindowSize: DEFAULT_WINDOW_SIZE, - inChunkStreams: make(map[uint32]*InboundChunkStream), + inMessages: make(chan *Message, 100), + inChunkSize: DEFAULT_CHUNK_SIZE, + inWindowSize: DEFAULT_WINDOW_SIZE, + inChunkStreams: make(map[uint32]*InboundChunkStream), } err := c.Connect() @@ -84,15 +82,19 @@ func (c *Client) Connect() (err error) { config := &tls.Config{InsecureSkipVerify: true} c.conn, err = tls.Dial("tcp", url.Host, config) default: - return errors.New(fmt.Sprintf("Unsupported scheme: %s", url.Scheme)) + return Error("Unsupported scheme: %s", url.Scheme) } + log.Debug("handshaking with %s", c.url) + err = c.handshake() if err != nil { return err } - err = c.connectCommand() + log.Debug("sending connect command to %s", c.url) + + err = c.invokeConnect() if err != nil { return err } @@ -101,8 +103,6 @@ func (c *Client) Connect() (err error) { go c.receiveLoop() go c.sendLoop() - log.Info("connected to %s", c.url) - return nil } @@ -110,276 +110,24 @@ func (c *Client) NextTransactionId() uint32 { return atomic.AddUint32(&c.lastTransactionId, 1) } -func (c *Client) connectCommand() (err error) { - buf := new(bytes.Buffer) - - amf.WriteString(buf, "connect") - - tid := c.NextTransactionId() - amf.WriteDouble(buf, float64(tid)) - - opts := *amf.MakeObject() - opts["app"] = "" - opts["flashVer"] = "WIN 10,1,85,3" - opts["swfUrl"] = "app://mod_ser.dat" - opts["tcUrl"] = c.url - opts["fpad"] = false - opts["capabilities"] = 239 - opts["audioCodecs"] = 3191 - opts["videoCodecs"] = 252 - opts["videoFunction"] = 1 - opts["pageUrl"] = nil - opts["objectEncoding"] = 3 - - cmh := *amf.MakeObject() - cmh["DSMessagingVersion"] = 1 - cmh["DSId"] = "my-rtmps" - - cm := *amf.MakeTypedObject() - cm.Type = "flex.messaging.messages.CommandMessage" - cm.Object["destination"] = "" - cm.Object["operation"] = 5 - cm.Object["correlationId"] = "" - cm.Object["timestamp"] = 0 - cm.Object["timeToLive"] = 0 - cm.Object["messageId"] = uuid.New() - cm.Object["body"] = nil - cm.Object["headers"] = cmh - - amf.WriteObject(buf, opts) - - amf.WriteBoolean(buf, false) - amf.WriteString(buf, "nil") - amf.WriteString(buf, "") - amf.AMF3_WriteValue(buf, cm) - - m := &Message{ - ChunkStreamId: CHUNK_STREAM_ID_COMMAND, - Type: MESSAGE_TYPE_AMF0, - Length: uint32(buf.Len()), - Buffer: buf, - } - - c.outMessages <- m - - return -} - func (c *Client) Disconnect() { c.connected = false c.conn.Close() + c.handler.OnRtmpDisconnect() log.Info("disconnected from %s", c.url, c.outBytes, c.inBytes) } -func (c *Client) dispatchLoop() { - for { - m := <-c.inMessages - - switch m.ChunkStreamId { - case CHUNK_STREAM_ID_PROTOCOL: - c.handleProtocolMessage(m) - case CHUNK_STREAM_ID_COMMAND: - c.handleCommandMessage(m) - } - } -} - -func (c *Client) handleProtocolMessage(m *Message) { - switch m.Type { - case MESSAGE_TYPE_CHUNK_SIZE: - size := binary.BigEndian.Uint32(m.Buffer.Bytes()) - log.Debug("setting chunk %d -> %d", c.inChunkSize, size) - c.inChunkSize = size - - case MESSAGE_TYPE_ACK_SIZE: - log.Debug("ignoring ack size") - - case MESSAGE_TYPE_BANDWIDTH: - size := binary.BigEndian.Uint32(m.Buffer.Bytes()) - log.Debug("ignoring bandwidth %d", size) - - default: - log.Debug("ignoring other protocol message %d", m.Type) - - } -} - -func (c *Client) handleCommandMessage(m *Message) { - log.Debug("command message: %+v", m) - - c.connected = true -} - -func (c *Client) sendLoop() { - for { - m := <-c.outMessages - - var cs *OutboundChunkStream = c.outChunkStreams[m.ChunkStreamId] - if cs == nil { - cs = NewOutboundChunkStream(m.ChunkStreamId) - } - - h := cs.NewOutboundHeader(m) - - var n int64 = 0 - var err error - var ws uint32 = 0 - var rem uint32 = m.Length - - for rem > 0 { - log.Debug("rem is %d", rem) - log.Debug("send message header: %+v", h) - _, err = h.Write(c) - if err != nil { - if c.connected { - log.Warn("unable to send header: %v", err) - c.Disconnect() - } - return - } - - ws = rem - if ws > c.outChunkSize { - ws = c.outChunkSize - } - - n, err = io.CopyN(c, m.Buffer, int64(ws)) - if err != nil { - if c.connected { - log.Warn("unable to send message") - c.Disconnect() - } - return - } - - rem -= uint32(n) - - // Set the header to continuation only for the - // next iteration (if it happens). - h.Format = HEADER_FORMAT_CONTINUATION - } - - log.Debug("finished sending message") - - } -} - -func (c *Client) receiveLoop() { - for { - // Read the next header from the connection - h, err := ReadHeader(c) - if err != nil { - if c.connected { - log.Warn("unable to receive next header while connected") - c.Disconnect() - } - return - } - - // Determine whether or not we already have a chunk stream - // allocated for this ID. If we don't, create one. - var cs *InboundChunkStream = c.inChunkStreams[h.ChunkStreamId] - if cs == nil { - cs = NewInboundChunkStream(h.ChunkStreamId) - c.inChunkStreams[h.ChunkStreamId] = cs - } - - var ts uint32 - var m *Message - - if (cs.lastHeader == nil) && (h.Format != HEADER_FORMAT_FULL) { - log.Warn("unable to find previous header on chunk stream %d", h.ChunkStreamId) - c.Disconnect() - return - } - - switch h.Format { - case HEADER_FORMAT_FULL: - // If it's an entirely new header, replace the reference in - // the chunk stream and set the working timestamp from - // the header. - cs.lastHeader = &h - ts = h.Timestamp - - case HEADER_FORMAT_SAME_STREAM: - // If it's the same stream, use the last message stream id, - // but otherwise use values from the header. - h.MessageStreamId = cs.lastHeader.MessageStreamId - cs.lastHeader = &h - ts = cs.lastInAbsoluteTimestamp + h.Timestamp - - case HEADER_FORMAT_SAME_LENGTH_AND_STREAM: - // If it's the same length and stream, copy values from the - // last header and replace it. - h.MessageStreamId = cs.lastHeader.MessageStreamId - h.MessageLength = cs.lastHeader.MessageLength - h.MessageTypeId = cs.lastHeader.MessageTypeId - cs.lastHeader = &h - ts = cs.lastInAbsoluteTimestamp + h.Timestamp - - case HEADER_FORMAT_CONTINUATION: - // A full continuation of the previous stream. Copy all values. - h.MessageStreamId = cs.lastHeader.MessageStreamId - h.MessageLength = cs.lastHeader.MessageLength - h.MessageTypeId = cs.lastHeader.MessageTypeId - h.Timestamp = cs.lastHeader.Timestamp - ts = cs.lastInAbsoluteTimestamp + cs.lastHeader.Timestamp - - // If there's a message already started, use it. - if cs.currentMessage != nil { - m = cs.currentMessage - } - } - - if m == nil { - m = &Message{ - Type: h.MessageTypeId, - ChunkStreamId: h.ChunkStreamId, - StreamId: h.MessageStreamId, - Timestamp: h.CalculateTimestamp(), - AbsoluteTimestamp: ts, - Length: h.MessageLength, - Buffer: new(bytes.Buffer), - } - } - - cs.lastInAbsoluteTimestamp = ts - - rs := m.RemainingBytes() - if rs > c.inChunkSize { - rs = c.inChunkSize - } - - _, err = io.CopyN(m.Buffer, c, int64(rs)) - if err != nil { - if c.connected { - log.Warn("unable to copy %d message bytes from buffer", rs) - c.Disconnect() - } - - return - } - - if m.RemainingBytes() == 0 { - cs.currentMessage = nil - c.inMessages <- m - } else { - cs.currentMessage = m - } - } -} - func (c *Client) Read(p []byte) (n int, err error) { n, err = c.conn.Read(p) c.inBytes += uint32(n) - log.Debug("read %d", n) + log.Trace("read %d", n) return n, err } func (c *Client) Write(p []byte) (n int, err error) { n, err = c.conn.Write(p) c.outBytes += uint32(n) - log.Debug("write %d", n) + log.Trace("write %d", n) return n, err } diff --git a/client_connect.go b/client_connect.go new file mode 100644 index 0000000..77e9e62 --- /dev/null +++ b/client_connect.go @@ -0,0 +1,85 @@ +package rtmp + +import ( + "bytes" + "code.google.com/p/go-uuid/uuid" + "github.com/elobuff/goamf" +) + +func (c *Client) invokeConnect() (err error) { + buf := new(bytes.Buffer) + tid := c.NextTransactionId() + + c.enc.Encode(buf, "connect", 0) + c.enc.Encode(buf, float64(tid), 0) + + opts := make(amf.Object) + opts["app"] = "" + opts["flashVer"] = "WIN 10,1,85,3" + opts["swfUrl"] = "app://mod_ser.dat" + opts["tcUrl"] = c.url + opts["fpad"] = false + opts["capabilities"] = 239 + opts["audioCodecs"] = 3191 + opts["videoCodecs"] = 252 + opts["videoFunction"] = 1 + opts["pageUrl"] = nil + opts["objectEncoding"] = 3 + + c.enc.Encode(buf, opts, 0) + c.enc.Encode(buf, false, 0) + c.enc.Encode(buf, "nil", 0) + c.enc.Encode(buf, "", 0) + + cmh := make(amf.Object) + cmh["DSMessagingVersion"] = 1 + cmh["DSId"] = "my-rtmps" + + cm := amf.NewTypedObject() + cm.Type = "flex.messaging.messages.CommandMessage" + cm.Object["destination"] = "" + cm.Object["operation"] = 5 + cm.Object["correlationId"] = "" + cm.Object["timestamp"] = 0 + cm.Object["timeToLive"] = 0 + cm.Object["messageId"] = uuid.New() + cm.Object["body"] = nil + cm.Object["headers"] = cmh + + c.enc.Encode(buf, cm, 0) + + m := &Message{ + ChunkStreamId: CHUNK_STREAM_ID_COMMAND, + Type: MESSAGE_TYPE_AMF0, + Length: uint32(buf.Len()), + Buffer: buf, + } + + c.outMessages <- m + + return +} + +func (c *Client) handleConnectResult(cmd *Command) { + log.Debug("connect response received from %s", c.url) + + obj, ok := cmd.Objects[1].(amf.Object) + if !ok { + log.Error("unable to find connect result in command response") + c.Disconnect() + return + } + + if obj["code"] != "NetConnection.Connect.Success" { + log.Error("connect unsuccessful: %s", obj["code"]) + c.Disconnect() + return + } + + c.connected = true + c.connectionId = obj["id"].(string) + c.handler.OnRtmpConnect() + log.Info("connected to %s (%s)", c.url, c.connectionId) + + return +} diff --git a/client_dispatch.go b/client_dispatch.go new file mode 100644 index 0000000..fc52978 --- /dev/null +++ b/client_dispatch.go @@ -0,0 +1,61 @@ +package rtmp + +import ( + "encoding/binary" +) + +func (c *Client) dispatchLoop() { + for { + m := <-c.inMessages + + switch m.ChunkStreamId { + case CHUNK_STREAM_ID_PROTOCOL: + c.dispatchProtocolMessage(m) + case CHUNK_STREAM_ID_COMMAND: + c.dispatchCommandMessage(m) + default: + log.Warn("discarding message on unknown chunk stream %d: +%v", m.ChunkStreamId, m) + } + } +} + +func (c *Client) dispatchProtocolMessage(m *Message) { + switch m.Type { + case MESSAGE_TYPE_CHUNK_SIZE: + var err error + var size uint32 + err = binary.Read(m.Buffer, binary.BigEndian, &size) + if err != nil { + log.Error("error decoding chunk size: %s", err) + return + } + + c.inChunkSize = size + log.Debug("received chunk size, setting to %d", c.inChunkSize, size) + + case MESSAGE_TYPE_ACK_SIZE: + log.Debug("received ack size, discarding") + + case MESSAGE_TYPE_BANDWIDTH: + log.Debug("received bandwidth, discarding") + + default: + log.Debug("received protocol message %d, discarding", m.Type) + + } +} + +func (c *Client) dispatchCommandMessage(m *Message) { + cmd, err := m.DecodeCommand(&c.dec) + if err != nil { + log.Error("unable to decode message type %d on stream %d into command, discarding: %s", m.Type, m.ChunkStreamId, err) + return + } + + if cmd.Name == "_result" && cmd.TransactionId == 1 { + c.handleConnectResult(cmd) + } else { + c.handler.OnRtmpCommand(cmd) + return + } +} diff --git a/handshake.go b/client_handshake.go similarity index 100% rename from handshake.go rename to client_handshake.go diff --git a/client_invoke.go b/client_invoke.go new file mode 100644 index 0000000..2438e36 --- /dev/null +++ b/client_invoke.go @@ -0,0 +1,49 @@ +package rtmp + +/* +func (c *Client) Invoke(destination string, operation string, objects ...interface{}) (tid uint32, err error) { + rmh := *amf.MakeObject() + rmh["DSRequestTimeout"] = 60 + rmh["DSId"] = c.connectionId + rmh["DSEndpoint"] = "my-rtmps" + + rm := *amf.MakeTypedObject() + rm.Type = "flex.messaging.messages.RemotingMessage" + rm.Object["destination"] = destination + rm.Object["operation"] = operation + rm.Object["messageId"] = uuid.New() + rm.Object["source"] = nil + rm.Object["timestamp"] = 0 + rm.Object["timeToLive"] = 0 + rm.Object["headers"] = rmh + rm.Object["body"] = objects + + buf := new(bytes.Buffer) + tid = c.NextTransactionId() + + amf.WriteMarker(buf, 0x00) // AMF3 + amf.WriteNull(buf) // command + amf.WriteDouble(buf, float64(tid)) // tid + amf.WriteMarker(buf, amf.AMF0_ACMPLUS_OBJECT_MARKER) // amf3 + + log.Debug("rm: %+v", rm) + log.Debug("buf before: %+v", buf.Bytes()) + _, err = amf.AMF3_WriteValue(buf, rm) + if err != nil { + log.Debug("error: %s", err) + return + } + log.Debug("buf after: %s", buf.Bytes()) + + m := &Message{ + Type: MESSAGE_TYPE_AMF3, + ChunkStreamId: CHUNK_STREAM_ID_COMMAND, + Length: uint32(buf.Len()), + Buffer: buf, + } + + c.outMessages <- msg + + return +} +*/ diff --git a/client_receive.go b/client_receive.go new file mode 100644 index 0000000..17a13a4 --- /dev/null +++ b/client_receive.go @@ -0,0 +1,111 @@ +package rtmp + +import ( + "bytes" + "io" +) + +func (c *Client) receiveLoop() { + for { + // Read the next header from the connection + h, err := ReadHeader(c) + if err != nil { + if c.connected { + log.Warn("unable to receive next header while connected") + c.Disconnect() + } + return + } + + // Determine whether or not we already have a chunk stream + // allocated for this ID. If we don't, create one. + var cs *InboundChunkStream = c.inChunkStreams[h.ChunkStreamId] + if cs == nil { + cs = NewInboundChunkStream(h.ChunkStreamId) + c.inChunkStreams[h.ChunkStreamId] = cs + } + + var ts uint32 + var m *Message + + if (cs.lastHeader == nil) && (h.Format != HEADER_FORMAT_FULL) { + log.Warn("unable to find previous header on chunk stream %d", h.ChunkStreamId) + c.Disconnect() + return + } + + switch h.Format { + case HEADER_FORMAT_FULL: + // If it's an entirely new header, replace the reference in + // the chunk stream and set the working timestamp from + // the header. + cs.lastHeader = &h + ts = h.Timestamp + + case HEADER_FORMAT_SAME_STREAM: + // If it's the same stream, use the last message stream id, + // but otherwise use values from the header. + h.MessageStreamId = cs.lastHeader.MessageStreamId + cs.lastHeader = &h + ts = cs.lastInAbsoluteTimestamp + h.Timestamp + + case HEADER_FORMAT_SAME_LENGTH_AND_STREAM: + // If it's the same length and stream, copy values from the + // last header and replace it. + h.MessageStreamId = cs.lastHeader.MessageStreamId + h.MessageLength = cs.lastHeader.MessageLength + h.MessageTypeId = cs.lastHeader.MessageTypeId + cs.lastHeader = &h + ts = cs.lastInAbsoluteTimestamp + h.Timestamp + + case HEADER_FORMAT_CONTINUATION: + // A full continuation of the previous stream. Copy all values. + h.MessageStreamId = cs.lastHeader.MessageStreamId + h.MessageLength = cs.lastHeader.MessageLength + h.MessageTypeId = cs.lastHeader.MessageTypeId + h.Timestamp = cs.lastHeader.Timestamp + ts = cs.lastInAbsoluteTimestamp + cs.lastHeader.Timestamp + + // If there's a message already started, use it. + if cs.currentMessage != nil { + m = cs.currentMessage + } + } + + if m == nil { + m = &Message{ + Type: h.MessageTypeId, + ChunkStreamId: h.ChunkStreamId, + StreamId: h.MessageStreamId, + Timestamp: h.CalculateTimestamp(), + AbsoluteTimestamp: ts, + Length: h.MessageLength, + Buffer: new(bytes.Buffer), + } + } + + cs.lastInAbsoluteTimestamp = ts + + rs := m.RemainingBytes() + if rs > c.inChunkSize { + rs = c.inChunkSize + } + + _, err = io.CopyN(m.Buffer, c, int64(rs)) + if err != nil { + if c.connected { + log.Warn("unable to copy %d message bytes from buffer", rs) + c.Disconnect() + } + + return + } + + if m.RemainingBytes() == 0 { + cs.currentMessage = nil + c.inMessages <- m + } else { + cs.currentMessage = m + } + } +} diff --git a/client_send.go b/client_send.go new file mode 100644 index 0000000..104a2ae --- /dev/null +++ b/client_send.go @@ -0,0 +1,60 @@ +package rtmp + +import ( + "io" +) + +func (c *Client) sendLoop() { + for { + m := <-c.outMessages + + var cs *OutboundChunkStream = c.outChunkStreams[m.ChunkStreamId] + if cs == nil { + cs = NewOutboundChunkStream(m.ChunkStreamId) + } + + h := cs.NewOutboundHeader(m) + + var n int64 = 0 + var err error + var ws uint32 = 0 + var rem uint32 = m.Length + + for rem > 0 { + log.Debug("send header: %+v", h) + _, err = h.Write(c) + if err != nil { + if c.connected { + log.Warn("unable to send header: %v", err) + c.Disconnect() + } + return + } + + ws = rem + if ws > c.outChunkSize { + ws = c.outChunkSize + } + + log.Debug("send bytes: %d", ws) + + n, err = io.CopyN(c, m.Buffer, int64(ws)) + if err != nil { + if c.connected { + log.Warn("unable to send message") + c.Disconnect() + } + return + } + + rem -= uint32(n) + + // Set the header to continuation only for the + // next iteration (if it happens). + h.Format = HEADER_FORMAT_CONTINUATION + } + + log.Debug("send complete") + + } +} diff --git a/command.go b/command.go new file mode 100644 index 0000000..db1991f --- /dev/null +++ b/command.go @@ -0,0 +1,8 @@ +package rtmp + +type Command struct { + Name string + TransactionId float64 + Version uint8 + Objects []interface{} +} diff --git a/const.go b/const.go index 8dcb93d..d227ced 100644 --- a/const.go +++ b/const.go @@ -38,6 +38,11 @@ const ( MESSAGE_TYPE_FLV = 0x16 ) +const ( + AMF0 = 0x00 + AMF3 = 0x03 +) + const ( MESSAGE_DISPATCH_QUEUE_LENGTH = 100 ) diff --git a/message.go b/message.go index 3aa445a..aa13f3f 100644 --- a/message.go +++ b/message.go @@ -2,6 +2,7 @@ package rtmp import ( "bytes" + "github.com/elobuff/goamf" ) type Message struct { @@ -21,3 +22,51 @@ func (m *Message) RemainingBytes() uint32 { return m.Length - uint32(m.Buffer.Len()) } + +func (m *Message) DecodeCommand(dec *amf.Decoder) (*Command, error) { + var err error + var obj interface{} + + cmd := new(Command) + cmd.Version = AMF0 + + if m.ChunkStreamId != CHUNK_STREAM_ID_COMMAND { + return cmd, Error("message is not a command message") + } + + switch m.Type { + case MESSAGE_TYPE_AMF3: + cmd.Version = AMF3 + _, err = m.Buffer.ReadByte() + if err != nil { + return cmd, Error("unable to read first byte of amf3 message") + } + fallthrough + + case MESSAGE_TYPE_AMF0: + cmd.Name, err = dec.DecodeAmf0String(m.Buffer, true) + if err != nil { + return cmd, Error("unable to read command from amf message") + } + + cmd.TransactionId, err = dec.DecodeAmf0Number(m.Buffer, true) + if err != nil { + return cmd, Error("unable to read tid from amf message") + } + + for m.Buffer.Len() > 0 { + obj, err = dec.Decode(m.Buffer, 0) + if err != nil { + return cmd, Error("unable to read object from amf message: %s", err) + } + + cmd.Objects = append(cmd.Objects, obj) + } + default: + return cmd, Error("unable to decode message: %+v", m) + } + + log.Debug("command decoded: %+v", cmd) + + return cmd, err +} diff --git a/util.go b/util.go index 7b31fac..7ed8d1a 100644 --- a/util.go +++ b/util.go @@ -1,12 +1,18 @@ package rtmp import ( + "errors" + "fmt" "github.com/jcoene/gologger" "time" ) var log logger.Logger = *logger.NewLogger(logger.LOG_LEVEL_DEBUG, "rtmp") +func Error(f string, v ...interface{}) error { + return errors.New(fmt.Sprintf(f, v...)) +} + func GetCurrentTimestamp() uint32 { return uint32(time.Now().UnixNano()/int64(1000000)) % TIMESTAMP_MAX }