Restructure client library, update to new amf library.

This commit is contained in:
Jason Coene
2013-06-07 18:25:17 -05:00
parent 9c0d49e185
commit 8bd6bc1c94
11 changed files with 458 additions and 276 deletions

300
client.go
View File

@@ -1,23 +1,17 @@
package rtmp
import (
"bytes"
"code.google.com/p/go-uuid/uuid"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"github.com/elobuff/goamf"
"io"
"net"
"net/url"
"sync/atomic"
)
type ClientHandler interface {
OnConnect()
OnDisconnect()
OnReceive(message *Message)
OnRtmpConnect()
OnRtmpDisconnect()
OnRtmpCommand(command *Command)
}
type Client struct {
@@ -27,6 +21,8 @@ type Client struct {
connected bool
conn net.Conn
enc amf.Encoder
dec amf.Decoder
outBytes uint32
outMessages chan *Message
@@ -42,23 +38,25 @@ type Client struct {
inChunkStreams map[uint32]*InboundChunkStream
lastTransactionId uint32
connectionId string
}
func NewClient(url string) (*Client, error) {
func NewClient(url string, handler ClientHandler) (*Client, error) {
c := &Client{
url: url,
connected: false,
connected: false,
handler: handler,
enc: *new(amf.Encoder),
dec: *new(amf.Decoder),
outMessages: make(chan *Message, 100),
outChunkSize: DEFAULT_CHUNK_SIZE,
outWindowSize: DEFAULT_WINDOW_SIZE,
outChunkStreams: make(map[uint32]*OutboundChunkStream),
inMessages: make(chan *Message, 100),
inChunkSize: DEFAULT_CHUNK_SIZE,
inWindowSize: DEFAULT_WINDOW_SIZE,
inChunkStreams: make(map[uint32]*InboundChunkStream),
inMessages: make(chan *Message, 100),
inChunkSize: DEFAULT_CHUNK_SIZE,
inWindowSize: DEFAULT_WINDOW_SIZE,
inChunkStreams: make(map[uint32]*InboundChunkStream),
}
err := c.Connect()
@@ -84,15 +82,19 @@ func (c *Client) Connect() (err error) {
config := &tls.Config{InsecureSkipVerify: true}
c.conn, err = tls.Dial("tcp", url.Host, config)
default:
return errors.New(fmt.Sprintf("Unsupported scheme: %s", url.Scheme))
return Error("Unsupported scheme: %s", url.Scheme)
}
log.Debug("handshaking with %s", c.url)
err = c.handshake()
if err != nil {
return err
}
err = c.connectCommand()
log.Debug("sending connect command to %s", c.url)
err = c.invokeConnect()
if err != nil {
return err
}
@@ -101,8 +103,6 @@ func (c *Client) Connect() (err error) {
go c.receiveLoop()
go c.sendLoop()
log.Info("connected to %s", c.url)
return nil
}
@@ -110,276 +110,24 @@ func (c *Client) NextTransactionId() uint32 {
return atomic.AddUint32(&c.lastTransactionId, 1)
}
func (c *Client) connectCommand() (err error) {
buf := new(bytes.Buffer)
amf.WriteString(buf, "connect")
tid := c.NextTransactionId()
amf.WriteDouble(buf, float64(tid))
opts := *amf.MakeObject()
opts["app"] = ""
opts["flashVer"] = "WIN 10,1,85,3"
opts["swfUrl"] = "app://mod_ser.dat"
opts["tcUrl"] = c.url
opts["fpad"] = false
opts["capabilities"] = 239
opts["audioCodecs"] = 3191
opts["videoCodecs"] = 252
opts["videoFunction"] = 1
opts["pageUrl"] = nil
opts["objectEncoding"] = 3
cmh := *amf.MakeObject()
cmh["DSMessagingVersion"] = 1
cmh["DSId"] = "my-rtmps"
cm := *amf.MakeTypedObject()
cm.Type = "flex.messaging.messages.CommandMessage"
cm.Object["destination"] = ""
cm.Object["operation"] = 5
cm.Object["correlationId"] = ""
cm.Object["timestamp"] = 0
cm.Object["timeToLive"] = 0
cm.Object["messageId"] = uuid.New()
cm.Object["body"] = nil
cm.Object["headers"] = cmh
amf.WriteObject(buf, opts)
amf.WriteBoolean(buf, false)
amf.WriteString(buf, "nil")
amf.WriteString(buf, "")
amf.AMF3_WriteValue(buf, cm)
m := &Message{
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
Type: MESSAGE_TYPE_AMF0,
Length: uint32(buf.Len()),
Buffer: buf,
}
c.outMessages <- m
return
}
func (c *Client) Disconnect() {
c.connected = false
c.conn.Close()
c.handler.OnRtmpDisconnect()
log.Info("disconnected from %s", c.url, c.outBytes, c.inBytes)
}
func (c *Client) dispatchLoop() {
for {
m := <-c.inMessages
switch m.ChunkStreamId {
case CHUNK_STREAM_ID_PROTOCOL:
c.handleProtocolMessage(m)
case CHUNK_STREAM_ID_COMMAND:
c.handleCommandMessage(m)
}
}
}
func (c *Client) handleProtocolMessage(m *Message) {
switch m.Type {
case MESSAGE_TYPE_CHUNK_SIZE:
size := binary.BigEndian.Uint32(m.Buffer.Bytes())
log.Debug("setting chunk %d -> %d", c.inChunkSize, size)
c.inChunkSize = size
case MESSAGE_TYPE_ACK_SIZE:
log.Debug("ignoring ack size")
case MESSAGE_TYPE_BANDWIDTH:
size := binary.BigEndian.Uint32(m.Buffer.Bytes())
log.Debug("ignoring bandwidth %d", size)
default:
log.Debug("ignoring other protocol message %d", m.Type)
}
}
func (c *Client) handleCommandMessage(m *Message) {
log.Debug("command message: %+v", m)
c.connected = true
}
func (c *Client) sendLoop() {
for {
m := <-c.outMessages
var cs *OutboundChunkStream = c.outChunkStreams[m.ChunkStreamId]
if cs == nil {
cs = NewOutboundChunkStream(m.ChunkStreamId)
}
h := cs.NewOutboundHeader(m)
var n int64 = 0
var err error
var ws uint32 = 0
var rem uint32 = m.Length
for rem > 0 {
log.Debug("rem is %d", rem)
log.Debug("send message header: %+v", h)
_, err = h.Write(c)
if err != nil {
if c.connected {
log.Warn("unable to send header: %v", err)
c.Disconnect()
}
return
}
ws = rem
if ws > c.outChunkSize {
ws = c.outChunkSize
}
n, err = io.CopyN(c, m.Buffer, int64(ws))
if err != nil {
if c.connected {
log.Warn("unable to send message")
c.Disconnect()
}
return
}
rem -= uint32(n)
// Set the header to continuation only for the
// next iteration (if it happens).
h.Format = HEADER_FORMAT_CONTINUATION
}
log.Debug("finished sending message")
}
}
func (c *Client) receiveLoop() {
for {
// Read the next header from the connection
h, err := ReadHeader(c)
if err != nil {
if c.connected {
log.Warn("unable to receive next header while connected")
c.Disconnect()
}
return
}
// Determine whether or not we already have a chunk stream
// allocated for this ID. If we don't, create one.
var cs *InboundChunkStream = c.inChunkStreams[h.ChunkStreamId]
if cs == nil {
cs = NewInboundChunkStream(h.ChunkStreamId)
c.inChunkStreams[h.ChunkStreamId] = cs
}
var ts uint32
var m *Message
if (cs.lastHeader == nil) && (h.Format != HEADER_FORMAT_FULL) {
log.Warn("unable to find previous header on chunk stream %d", h.ChunkStreamId)
c.Disconnect()
return
}
switch h.Format {
case HEADER_FORMAT_FULL:
// If it's an entirely new header, replace the reference in
// the chunk stream and set the working timestamp from
// the header.
cs.lastHeader = &h
ts = h.Timestamp
case HEADER_FORMAT_SAME_STREAM:
// If it's the same stream, use the last message stream id,
// but otherwise use values from the header.
h.MessageStreamId = cs.lastHeader.MessageStreamId
cs.lastHeader = &h
ts = cs.lastInAbsoluteTimestamp + h.Timestamp
case HEADER_FORMAT_SAME_LENGTH_AND_STREAM:
// If it's the same length and stream, copy values from the
// last header and replace it.
h.MessageStreamId = cs.lastHeader.MessageStreamId
h.MessageLength = cs.lastHeader.MessageLength
h.MessageTypeId = cs.lastHeader.MessageTypeId
cs.lastHeader = &h
ts = cs.lastInAbsoluteTimestamp + h.Timestamp
case HEADER_FORMAT_CONTINUATION:
// A full continuation of the previous stream. Copy all values.
h.MessageStreamId = cs.lastHeader.MessageStreamId
h.MessageLength = cs.lastHeader.MessageLength
h.MessageTypeId = cs.lastHeader.MessageTypeId
h.Timestamp = cs.lastHeader.Timestamp
ts = cs.lastInAbsoluteTimestamp + cs.lastHeader.Timestamp
// If there's a message already started, use it.
if cs.currentMessage != nil {
m = cs.currentMessage
}
}
if m == nil {
m = &Message{
Type: h.MessageTypeId,
ChunkStreamId: h.ChunkStreamId,
StreamId: h.MessageStreamId,
Timestamp: h.CalculateTimestamp(),
AbsoluteTimestamp: ts,
Length: h.MessageLength,
Buffer: new(bytes.Buffer),
}
}
cs.lastInAbsoluteTimestamp = ts
rs := m.RemainingBytes()
if rs > c.inChunkSize {
rs = c.inChunkSize
}
_, err = io.CopyN(m.Buffer, c, int64(rs))
if err != nil {
if c.connected {
log.Warn("unable to copy %d message bytes from buffer", rs)
c.Disconnect()
}
return
}
if m.RemainingBytes() == 0 {
cs.currentMessage = nil
c.inMessages <- m
} else {
cs.currentMessage = m
}
}
}
func (c *Client) Read(p []byte) (n int, err error) {
n, err = c.conn.Read(p)
c.inBytes += uint32(n)
log.Debug("read %d", n)
log.Trace("read %d", n)
return n, err
}
func (c *Client) Write(p []byte) (n int, err error) {
n, err = c.conn.Write(p)
c.outBytes += uint32(n)
log.Debug("write %d", n)
log.Trace("write %d", n)
return n, err
}

85
client_connect.go Normal file
View File

@@ -0,0 +1,85 @@
package rtmp
import (
"bytes"
"code.google.com/p/go-uuid/uuid"
"github.com/elobuff/goamf"
)
func (c *Client) invokeConnect() (err error) {
buf := new(bytes.Buffer)
tid := c.NextTransactionId()
c.enc.Encode(buf, "connect", 0)
c.enc.Encode(buf, float64(tid), 0)
opts := make(amf.Object)
opts["app"] = ""
opts["flashVer"] = "WIN 10,1,85,3"
opts["swfUrl"] = "app://mod_ser.dat"
opts["tcUrl"] = c.url
opts["fpad"] = false
opts["capabilities"] = 239
opts["audioCodecs"] = 3191
opts["videoCodecs"] = 252
opts["videoFunction"] = 1
opts["pageUrl"] = nil
opts["objectEncoding"] = 3
c.enc.Encode(buf, opts, 0)
c.enc.Encode(buf, false, 0)
c.enc.Encode(buf, "nil", 0)
c.enc.Encode(buf, "", 0)
cmh := make(amf.Object)
cmh["DSMessagingVersion"] = 1
cmh["DSId"] = "my-rtmps"
cm := amf.NewTypedObject()
cm.Type = "flex.messaging.messages.CommandMessage"
cm.Object["destination"] = ""
cm.Object["operation"] = 5
cm.Object["correlationId"] = ""
cm.Object["timestamp"] = 0
cm.Object["timeToLive"] = 0
cm.Object["messageId"] = uuid.New()
cm.Object["body"] = nil
cm.Object["headers"] = cmh
c.enc.Encode(buf, cm, 0)
m := &Message{
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
Type: MESSAGE_TYPE_AMF0,
Length: uint32(buf.Len()),
Buffer: buf,
}
c.outMessages <- m
return
}
func (c *Client) handleConnectResult(cmd *Command) {
log.Debug("connect response received from %s", c.url)
obj, ok := cmd.Objects[1].(amf.Object)
if !ok {
log.Error("unable to find connect result in command response")
c.Disconnect()
return
}
if obj["code"] != "NetConnection.Connect.Success" {
log.Error("connect unsuccessful: %s", obj["code"])
c.Disconnect()
return
}
c.connected = true
c.connectionId = obj["id"].(string)
c.handler.OnRtmpConnect()
log.Info("connected to %s (%s)", c.url, c.connectionId)
return
}

61
client_dispatch.go Normal file
View File

@@ -0,0 +1,61 @@
package rtmp
import (
"encoding/binary"
)
func (c *Client) dispatchLoop() {
for {
m := <-c.inMessages
switch m.ChunkStreamId {
case CHUNK_STREAM_ID_PROTOCOL:
c.dispatchProtocolMessage(m)
case CHUNK_STREAM_ID_COMMAND:
c.dispatchCommandMessage(m)
default:
log.Warn("discarding message on unknown chunk stream %d: +%v", m.ChunkStreamId, m)
}
}
}
func (c *Client) dispatchProtocolMessage(m *Message) {
switch m.Type {
case MESSAGE_TYPE_CHUNK_SIZE:
var err error
var size uint32
err = binary.Read(m.Buffer, binary.BigEndian, &size)
if err != nil {
log.Error("error decoding chunk size: %s", err)
return
}
c.inChunkSize = size
log.Debug("received chunk size, setting to %d", c.inChunkSize, size)
case MESSAGE_TYPE_ACK_SIZE:
log.Debug("received ack size, discarding")
case MESSAGE_TYPE_BANDWIDTH:
log.Debug("received bandwidth, discarding")
default:
log.Debug("received protocol message %d, discarding", m.Type)
}
}
func (c *Client) dispatchCommandMessage(m *Message) {
cmd, err := m.DecodeCommand(&c.dec)
if err != nil {
log.Error("unable to decode message type %d on stream %d into command, discarding: %s", m.Type, m.ChunkStreamId, err)
return
}
if cmd.Name == "_result" && cmd.TransactionId == 1 {
c.handleConnectResult(cmd)
} else {
c.handler.OnRtmpCommand(cmd)
return
}
}

49
client_invoke.go Normal file
View File

@@ -0,0 +1,49 @@
package rtmp
/*
func (c *Client) Invoke(destination string, operation string, objects ...interface{}) (tid uint32, err error) {
rmh := *amf.MakeObject()
rmh["DSRequestTimeout"] = 60
rmh["DSId"] = c.connectionId
rmh["DSEndpoint"] = "my-rtmps"
rm := *amf.MakeTypedObject()
rm.Type = "flex.messaging.messages.RemotingMessage"
rm.Object["destination"] = destination
rm.Object["operation"] = operation
rm.Object["messageId"] = uuid.New()
rm.Object["source"] = nil
rm.Object["timestamp"] = 0
rm.Object["timeToLive"] = 0
rm.Object["headers"] = rmh
rm.Object["body"] = objects
buf := new(bytes.Buffer)
tid = c.NextTransactionId()
amf.WriteMarker(buf, 0x00) // AMF3
amf.WriteNull(buf) // command
amf.WriteDouble(buf, float64(tid)) // tid
amf.WriteMarker(buf, amf.AMF0_ACMPLUS_OBJECT_MARKER) // amf3
log.Debug("rm: %+v", rm)
log.Debug("buf before: %+v", buf.Bytes())
_, err = amf.AMF3_WriteValue(buf, rm)
if err != nil {
log.Debug("error: %s", err)
return
}
log.Debug("buf after: %s", buf.Bytes())
m := &Message{
Type: MESSAGE_TYPE_AMF3,
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
Length: uint32(buf.Len()),
Buffer: buf,
}
c.outMessages <- msg
return
}
*/

111
client_receive.go Normal file
View File

@@ -0,0 +1,111 @@
package rtmp
import (
"bytes"
"io"
)
func (c *Client) receiveLoop() {
for {
// Read the next header from the connection
h, err := ReadHeader(c)
if err != nil {
if c.connected {
log.Warn("unable to receive next header while connected")
c.Disconnect()
}
return
}
// Determine whether or not we already have a chunk stream
// allocated for this ID. If we don't, create one.
var cs *InboundChunkStream = c.inChunkStreams[h.ChunkStreamId]
if cs == nil {
cs = NewInboundChunkStream(h.ChunkStreamId)
c.inChunkStreams[h.ChunkStreamId] = cs
}
var ts uint32
var m *Message
if (cs.lastHeader == nil) && (h.Format != HEADER_FORMAT_FULL) {
log.Warn("unable to find previous header on chunk stream %d", h.ChunkStreamId)
c.Disconnect()
return
}
switch h.Format {
case HEADER_FORMAT_FULL:
// If it's an entirely new header, replace the reference in
// the chunk stream and set the working timestamp from
// the header.
cs.lastHeader = &h
ts = h.Timestamp
case HEADER_FORMAT_SAME_STREAM:
// If it's the same stream, use the last message stream id,
// but otherwise use values from the header.
h.MessageStreamId = cs.lastHeader.MessageStreamId
cs.lastHeader = &h
ts = cs.lastInAbsoluteTimestamp + h.Timestamp
case HEADER_FORMAT_SAME_LENGTH_AND_STREAM:
// If it's the same length and stream, copy values from the
// last header and replace it.
h.MessageStreamId = cs.lastHeader.MessageStreamId
h.MessageLength = cs.lastHeader.MessageLength
h.MessageTypeId = cs.lastHeader.MessageTypeId
cs.lastHeader = &h
ts = cs.lastInAbsoluteTimestamp + h.Timestamp
case HEADER_FORMAT_CONTINUATION:
// A full continuation of the previous stream. Copy all values.
h.MessageStreamId = cs.lastHeader.MessageStreamId
h.MessageLength = cs.lastHeader.MessageLength
h.MessageTypeId = cs.lastHeader.MessageTypeId
h.Timestamp = cs.lastHeader.Timestamp
ts = cs.lastInAbsoluteTimestamp + cs.lastHeader.Timestamp
// If there's a message already started, use it.
if cs.currentMessage != nil {
m = cs.currentMessage
}
}
if m == nil {
m = &Message{
Type: h.MessageTypeId,
ChunkStreamId: h.ChunkStreamId,
StreamId: h.MessageStreamId,
Timestamp: h.CalculateTimestamp(),
AbsoluteTimestamp: ts,
Length: h.MessageLength,
Buffer: new(bytes.Buffer),
}
}
cs.lastInAbsoluteTimestamp = ts
rs := m.RemainingBytes()
if rs > c.inChunkSize {
rs = c.inChunkSize
}
_, err = io.CopyN(m.Buffer, c, int64(rs))
if err != nil {
if c.connected {
log.Warn("unable to copy %d message bytes from buffer", rs)
c.Disconnect()
}
return
}
if m.RemainingBytes() == 0 {
cs.currentMessage = nil
c.inMessages <- m
} else {
cs.currentMessage = m
}
}
}

60
client_send.go Normal file
View File

@@ -0,0 +1,60 @@
package rtmp
import (
"io"
)
func (c *Client) sendLoop() {
for {
m := <-c.outMessages
var cs *OutboundChunkStream = c.outChunkStreams[m.ChunkStreamId]
if cs == nil {
cs = NewOutboundChunkStream(m.ChunkStreamId)
}
h := cs.NewOutboundHeader(m)
var n int64 = 0
var err error
var ws uint32 = 0
var rem uint32 = m.Length
for rem > 0 {
log.Debug("send header: %+v", h)
_, err = h.Write(c)
if err != nil {
if c.connected {
log.Warn("unable to send header: %v", err)
c.Disconnect()
}
return
}
ws = rem
if ws > c.outChunkSize {
ws = c.outChunkSize
}
log.Debug("send bytes: %d", ws)
n, err = io.CopyN(c, m.Buffer, int64(ws))
if err != nil {
if c.connected {
log.Warn("unable to send message")
c.Disconnect()
}
return
}
rem -= uint32(n)
// Set the header to continuation only for the
// next iteration (if it happens).
h.Format = HEADER_FORMAT_CONTINUATION
}
log.Debug("send complete")
}
}

8
command.go Normal file
View File

@@ -0,0 +1,8 @@
package rtmp
type Command struct {
Name string
TransactionId float64
Version uint8
Objects []interface{}
}

View File

@@ -38,6 +38,11 @@ const (
MESSAGE_TYPE_FLV = 0x16
)
const (
AMF0 = 0x00
AMF3 = 0x03
)
const (
MESSAGE_DISPATCH_QUEUE_LENGTH = 100
)

View File

@@ -2,6 +2,7 @@ package rtmp
import (
"bytes"
"github.com/elobuff/goamf"
)
type Message struct {
@@ -21,3 +22,51 @@ func (m *Message) RemainingBytes() uint32 {
return m.Length - uint32(m.Buffer.Len())
}
func (m *Message) DecodeCommand(dec *amf.Decoder) (*Command, error) {
var err error
var obj interface{}
cmd := new(Command)
cmd.Version = AMF0
if m.ChunkStreamId != CHUNK_STREAM_ID_COMMAND {
return cmd, Error("message is not a command message")
}
switch m.Type {
case MESSAGE_TYPE_AMF3:
cmd.Version = AMF3
_, err = m.Buffer.ReadByte()
if err != nil {
return cmd, Error("unable to read first byte of amf3 message")
}
fallthrough
case MESSAGE_TYPE_AMF0:
cmd.Name, err = dec.DecodeAmf0String(m.Buffer, true)
if err != nil {
return cmd, Error("unable to read command from amf message")
}
cmd.TransactionId, err = dec.DecodeAmf0Number(m.Buffer, true)
if err != nil {
return cmd, Error("unable to read tid from amf message")
}
for m.Buffer.Len() > 0 {
obj, err = dec.Decode(m.Buffer, 0)
if err != nil {
return cmd, Error("unable to read object from amf message: %s", err)
}
cmd.Objects = append(cmd.Objects, obj)
}
default:
return cmd, Error("unable to decode message: %+v", m)
}
log.Debug("command decoded: %+v", cmd)
return cmd, err
}

View File

@@ -1,12 +1,18 @@
package rtmp
import (
"errors"
"fmt"
"github.com/jcoene/gologger"
"time"
)
var log logger.Logger = *logger.NewLogger(logger.LOG_LEVEL_DEBUG, "rtmp")
func Error(f string, v ...interface{}) error {
return errors.New(fmt.Sprintf(f, v...))
}
func GetCurrentTimestamp() uint32 {
return uint32(time.Now().UnixNano()/int64(1000000)) % TIMESTAMP_MAX
}