mirror of
https://github.com/elobuff/gortmp
synced 2025-09-26 23:15:51 +08:00
go fmt
This commit is contained in:
148
chunkstream.go
148
chunkstream.go
@@ -1,100 +1,100 @@
|
||||
package rtmp
|
||||
|
||||
type OutboundChunkStream struct {
|
||||
Id uint32
|
||||
lastHeader *Header
|
||||
lastOutAbsoluteTimestamp uint32
|
||||
lastInAbsoluteTimestamp uint32
|
||||
startAtTimestamp uint32
|
||||
Id uint32
|
||||
lastHeader *Header
|
||||
lastOutAbsoluteTimestamp uint32
|
||||
lastInAbsoluteTimestamp uint32
|
||||
startAtTimestamp uint32
|
||||
}
|
||||
|
||||
type InboundChunkStream struct {
|
||||
Id uint32
|
||||
lastHeader *Header
|
||||
lastOutAbsoluteTimestamp uint32
|
||||
lastInAbsoluteTimestamp uint32
|
||||
currentMessage *Message
|
||||
Id uint32
|
||||
lastHeader *Header
|
||||
lastOutAbsoluteTimestamp uint32
|
||||
lastInAbsoluteTimestamp uint32
|
||||
currentMessage *Message
|
||||
}
|
||||
|
||||
func NewOutboundChunkStream(id uint32) *OutboundChunkStream {
|
||||
return &OutboundChunkStream {
|
||||
Id: id,
|
||||
}
|
||||
return &OutboundChunkStream{
|
||||
Id: id,
|
||||
}
|
||||
}
|
||||
|
||||
func NewInboundChunkStream(id uint32) *InboundChunkStream {
|
||||
return &InboundChunkStream {
|
||||
Id: id,
|
||||
}
|
||||
return &InboundChunkStream{
|
||||
Id: id,
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *OutboundChunkStream) NewOutboundHeader(m *Message) *Header {
|
||||
h := &Header {
|
||||
ChunkStreamId: cs.Id,
|
||||
MessageLength: uint32(m.Buffer.Len()),
|
||||
MessageTypeId: m.Type,
|
||||
MessageStreamId: m.StreamId,
|
||||
}
|
||||
h := &Header{
|
||||
ChunkStreamId: cs.Id,
|
||||
MessageLength: uint32(m.Buffer.Len()),
|
||||
MessageTypeId: m.Type,
|
||||
MessageStreamId: m.StreamId,
|
||||
}
|
||||
|
||||
ts := m.Timestamp
|
||||
if ts == TIMESTAMP_AUTO {
|
||||
ts = cs.GetTimestamp()
|
||||
m.Timestamp = ts
|
||||
m.AbsoluteTimestamp = ts
|
||||
}
|
||||
ts := m.Timestamp
|
||||
if ts == TIMESTAMP_AUTO {
|
||||
ts = cs.GetTimestamp()
|
||||
m.Timestamp = ts
|
||||
m.AbsoluteTimestamp = ts
|
||||
}
|
||||
|
||||
deltaTimestamp := uint32(0)
|
||||
if cs.lastOutAbsoluteTimestamp < m.Timestamp {
|
||||
deltaTimestamp = m.Timestamp - cs.lastOutAbsoluteTimestamp
|
||||
}
|
||||
deltaTimestamp := uint32(0)
|
||||
if cs.lastOutAbsoluteTimestamp < m.Timestamp {
|
||||
deltaTimestamp = m.Timestamp - cs.lastOutAbsoluteTimestamp
|
||||
}
|
||||
|
||||
if cs.lastHeader == nil {
|
||||
h.Format = HEADER_FORMAT_FULL
|
||||
h.Timestamp = ts
|
||||
} else {
|
||||
if h.MessageStreamId == cs.lastHeader.MessageStreamId {
|
||||
if h.MessageTypeId == cs.lastHeader.MessageTypeId && h.MessageLength == cs.lastHeader.MessageLength {
|
||||
switch cs.lastHeader.Format {
|
||||
case HEADER_FORMAT_FULL:
|
||||
h.Format = HEADER_FORMAT_SAME_LENGTH_AND_STREAM
|
||||
h.Timestamp = deltaTimestamp
|
||||
case HEADER_FORMAT_SAME_STREAM:
|
||||
fallthrough
|
||||
case HEADER_FORMAT_SAME_LENGTH_AND_STREAM:
|
||||
fallthrough
|
||||
case HEADER_FORMAT_CONTINUATION:
|
||||
if cs.lastHeader.Timestamp == deltaTimestamp {
|
||||
h.Format = HEADER_FORMAT_CONTINUATION
|
||||
} else {
|
||||
h.Format = HEADER_FORMAT_SAME_LENGTH_AND_STREAM
|
||||
h.Timestamp = deltaTimestamp
|
||||
}
|
||||
}
|
||||
} else {
|
||||
h.Format = HEADER_FORMAT_SAME_STREAM
|
||||
h.Timestamp = ts
|
||||
}
|
||||
}
|
||||
}
|
||||
if cs.lastHeader == nil {
|
||||
h.Format = HEADER_FORMAT_FULL
|
||||
h.Timestamp = ts
|
||||
} else {
|
||||
if h.MessageStreamId == cs.lastHeader.MessageStreamId {
|
||||
if h.MessageTypeId == cs.lastHeader.MessageTypeId && h.MessageLength == cs.lastHeader.MessageLength {
|
||||
switch cs.lastHeader.Format {
|
||||
case HEADER_FORMAT_FULL:
|
||||
h.Format = HEADER_FORMAT_SAME_LENGTH_AND_STREAM
|
||||
h.Timestamp = deltaTimestamp
|
||||
case HEADER_FORMAT_SAME_STREAM:
|
||||
fallthrough
|
||||
case HEADER_FORMAT_SAME_LENGTH_AND_STREAM:
|
||||
fallthrough
|
||||
case HEADER_FORMAT_CONTINUATION:
|
||||
if cs.lastHeader.Timestamp == deltaTimestamp {
|
||||
h.Format = HEADER_FORMAT_CONTINUATION
|
||||
} else {
|
||||
h.Format = HEADER_FORMAT_SAME_LENGTH_AND_STREAM
|
||||
h.Timestamp = deltaTimestamp
|
||||
}
|
||||
}
|
||||
} else {
|
||||
h.Format = HEADER_FORMAT_SAME_STREAM
|
||||
h.Timestamp = ts
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if h.Timestamp >= TIMESTAMP_EXTENDED {
|
||||
h.ExtendedTimestamp = m.Timestamp
|
||||
h.Timestamp = TIMESTAMP_EXTENDED
|
||||
} else {
|
||||
h.ExtendedTimestamp = 0
|
||||
}
|
||||
if h.Timestamp >= TIMESTAMP_EXTENDED {
|
||||
h.ExtendedTimestamp = m.Timestamp
|
||||
h.Timestamp = TIMESTAMP_EXTENDED
|
||||
} else {
|
||||
h.ExtendedTimestamp = 0
|
||||
}
|
||||
|
||||
cs.lastHeader = h
|
||||
cs.lastOutAbsoluteTimestamp = ts
|
||||
cs.lastHeader = h
|
||||
cs.lastOutAbsoluteTimestamp = ts
|
||||
|
||||
return h
|
||||
return h
|
||||
}
|
||||
|
||||
func (cs *OutboundChunkStream) GetTimestamp() uint32 {
|
||||
if cs.startAtTimestamp == uint32(0) {
|
||||
cs.startAtTimestamp = GetCurrentTimestamp()
|
||||
return uint32(0)
|
||||
}
|
||||
if cs.startAtTimestamp == uint32(0) {
|
||||
cs.startAtTimestamp = GetCurrentTimestamp()
|
||||
return uint32(0)
|
||||
}
|
||||
|
||||
return GetCurrentTimestamp() - cs.startAtTimestamp
|
||||
return GetCurrentTimestamp() - cs.startAtTimestamp
|
||||
}
|
||||
|
562
client.go
562
client.go
@@ -1,385 +1,385 @@
|
||||
package rtmp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"code.google.com/p/go-uuid/uuid"
|
||||
"crypto/tls"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/elobuff/goamf"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
"bytes"
|
||||
"code.google.com/p/go-uuid/uuid"
|
||||
"crypto/tls"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/elobuff/goamf"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type ClientHandler interface {
|
||||
OnConnect()
|
||||
OnDisconnect()
|
||||
OnReceive(message *Message)
|
||||
OnConnect()
|
||||
OnDisconnect()
|
||||
OnReceive(message *Message)
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
url string
|
||||
url string
|
||||
|
||||
handler ClientHandler
|
||||
connected bool
|
||||
handler ClientHandler
|
||||
connected bool
|
||||
|
||||
conn net.Conn
|
||||
conn net.Conn
|
||||
|
||||
outBytes uint32
|
||||
outMessages chan *Message
|
||||
outWindowSize uint32
|
||||
outChunkSize uint32
|
||||
outChunkStreams map[uint32]*OutboundChunkStream
|
||||
outBytes uint32
|
||||
outMessages chan *Message
|
||||
outWindowSize uint32
|
||||
outChunkSize uint32
|
||||
outChunkStreams map[uint32]*OutboundChunkStream
|
||||
|
||||
inBytes uint32
|
||||
inMessages chan *Message
|
||||
inNotify chan uint8
|
||||
inWindowSize uint32
|
||||
inChunkSize uint32
|
||||
inChunkStreams map[uint32]*InboundChunkStream
|
||||
inBytes uint32
|
||||
inMessages chan *Message
|
||||
inNotify chan uint8
|
||||
inWindowSize uint32
|
||||
inChunkSize uint32
|
||||
inChunkStreams map[uint32]*InboundChunkStream
|
||||
|
||||
lastTransactionId uint32
|
||||
lastTransactionId uint32
|
||||
}
|
||||
|
||||
func NewClient(url string) (*Client, error) {
|
||||
c := &Client{
|
||||
url: url,
|
||||
c := &Client{
|
||||
url: url,
|
||||
|
||||
connected: false,
|
||||
connected: false,
|
||||
|
||||
outMessages: make(chan *Message, 100),
|
||||
outChunkSize: DEFAULT_CHUNK_SIZE,
|
||||
outWindowSize: DEFAULT_WINDOW_SIZE,
|
||||
outChunkStreams: make(map[uint32]*OutboundChunkStream),
|
||||
outMessages: make(chan *Message, 100),
|
||||
outChunkSize: DEFAULT_CHUNK_SIZE,
|
||||
outWindowSize: DEFAULT_WINDOW_SIZE,
|
||||
outChunkStreams: make(map[uint32]*OutboundChunkStream),
|
||||
|
||||
inMessages: make(chan *Message, 100),
|
||||
inChunkSize: DEFAULT_CHUNK_SIZE,
|
||||
inWindowSize: DEFAULT_WINDOW_SIZE,
|
||||
inChunkStreams: make(map[uint32]*InboundChunkStream),
|
||||
}
|
||||
inMessages: make(chan *Message, 100),
|
||||
inChunkSize: DEFAULT_CHUNK_SIZE,
|
||||
inWindowSize: DEFAULT_WINDOW_SIZE,
|
||||
inChunkStreams: make(map[uint32]*InboundChunkStream),
|
||||
}
|
||||
|
||||
err := c.Connect()
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
err := c.Connect()
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
|
||||
return c, err
|
||||
return c, err
|
||||
}
|
||||
|
||||
func (c *Client) Connect() (err error) {
|
||||
log.Info("connecting to %s", c.url)
|
||||
log.Info("connecting to %s", c.url)
|
||||
|
||||
url, err := url.Parse(c.url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
url, err := url.Parse(c.url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch url.Scheme {
|
||||
case "rtmp":
|
||||
c.conn, err = net.Dial("tcp", url.Host)
|
||||
case "rtmps":
|
||||
config := &tls.Config{InsecureSkipVerify: true}
|
||||
c.conn, err = tls.Dial("tcp", url.Host, config)
|
||||
default:
|
||||
return errors.New(fmt.Sprintf("Unsupported scheme: %s", url.Scheme))
|
||||
}
|
||||
switch url.Scheme {
|
||||
case "rtmp":
|
||||
c.conn, err = net.Dial("tcp", url.Host)
|
||||
case "rtmps":
|
||||
config := &tls.Config{InsecureSkipVerify: true}
|
||||
c.conn, err = tls.Dial("tcp", url.Host, config)
|
||||
default:
|
||||
return errors.New(fmt.Sprintf("Unsupported scheme: %s", url.Scheme))
|
||||
}
|
||||
|
||||
err = c.handshake()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.handshake()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.connectCommand()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.connectCommand()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go c.dispatchLoop()
|
||||
go c.receiveLoop()
|
||||
go c.sendLoop()
|
||||
go c.dispatchLoop()
|
||||
go c.receiveLoop()
|
||||
go c.sendLoop()
|
||||
|
||||
log.Info("connected to %s", c.url)
|
||||
log.Info("connected to %s", c.url)
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) NextTransactionId() uint32 {
|
||||
return atomic.AddUint32(&c.lastTransactionId, 1)
|
||||
return atomic.AddUint32(&c.lastTransactionId, 1)
|
||||
}
|
||||
|
||||
func (c *Client) connectCommand() (err error) {
|
||||
buf := new(bytes.Buffer)
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
amf.WriteString(buf, "connect")
|
||||
amf.WriteString(buf, "connect")
|
||||
|
||||
tid := c.NextTransactionId()
|
||||
amf.WriteDouble(buf, float64(tid))
|
||||
tid := c.NextTransactionId()
|
||||
amf.WriteDouble(buf, float64(tid))
|
||||
|
||||
opts := *amf.MakeObject()
|
||||
opts["app"] = ""
|
||||
opts["flashVer"] = "WIN 10,1,85,3"
|
||||
opts["swfUrl"] = "app://mod_ser.dat"
|
||||
opts["tcUrl"] = c.url
|
||||
opts["fpad"] = false
|
||||
opts["capabilities"] = 239
|
||||
opts["audioCodecs"] = 3191
|
||||
opts["videoCodecs"] = 252
|
||||
opts["videoFunction"] = 1
|
||||
opts["pageUrl"] = nil
|
||||
opts["objectEncoding"] = 3
|
||||
opts := *amf.MakeObject()
|
||||
opts["app"] = ""
|
||||
opts["flashVer"] = "WIN 10,1,85,3"
|
||||
opts["swfUrl"] = "app://mod_ser.dat"
|
||||
opts["tcUrl"] = c.url
|
||||
opts["fpad"] = false
|
||||
opts["capabilities"] = 239
|
||||
opts["audioCodecs"] = 3191
|
||||
opts["videoCodecs"] = 252
|
||||
opts["videoFunction"] = 1
|
||||
opts["pageUrl"] = nil
|
||||
opts["objectEncoding"] = 3
|
||||
|
||||
cmh := *amf.MakeObject()
|
||||
cmh["DSMessagingVersion"] = 1
|
||||
cmh["DSId"] = "my-rtmps"
|
||||
cmh := *amf.MakeObject()
|
||||
cmh["DSMessagingVersion"] = 1
|
||||
cmh["DSId"] = "my-rtmps"
|
||||
|
||||
cm := *amf.MakeTypedObject()
|
||||
cm.Type = "flex.messaging.messages.CommandMessage"
|
||||
cm.Object["destination"] = ""
|
||||
cm.Object["operation"] = 5
|
||||
cm.Object["correlationId"] = ""
|
||||
cm.Object["timestamp"] = 0
|
||||
cm.Object["timeToLive"] = 0
|
||||
cm.Object["messageId"] = uuid.New()
|
||||
cm.Object["body"] = nil
|
||||
cm.Object["headers"] = cmh
|
||||
cm := *amf.MakeTypedObject()
|
||||
cm.Type = "flex.messaging.messages.CommandMessage"
|
||||
cm.Object["destination"] = ""
|
||||
cm.Object["operation"] = 5
|
||||
cm.Object["correlationId"] = ""
|
||||
cm.Object["timestamp"] = 0
|
||||
cm.Object["timeToLive"] = 0
|
||||
cm.Object["messageId"] = uuid.New()
|
||||
cm.Object["body"] = nil
|
||||
cm.Object["headers"] = cmh
|
||||
|
||||
amf.WriteObject(buf, opts)
|
||||
amf.WriteObject(buf, opts)
|
||||
|
||||
amf.WriteBoolean(buf, false)
|
||||
amf.WriteString(buf, "nil")
|
||||
amf.WriteString(buf, "")
|
||||
amf.AMF3_WriteValue(buf, cm)
|
||||
amf.WriteBoolean(buf, false)
|
||||
amf.WriteString(buf, "nil")
|
||||
amf.WriteString(buf, "")
|
||||
amf.AMF3_WriteValue(buf, cm)
|
||||
|
||||
m := &Message{
|
||||
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
|
||||
Type: MESSAGE_TYPE_AMF0,
|
||||
Length: uint32(buf.Len()),
|
||||
Buffer: buf,
|
||||
}
|
||||
m := &Message{
|
||||
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
|
||||
Type: MESSAGE_TYPE_AMF0,
|
||||
Length: uint32(buf.Len()),
|
||||
Buffer: buf,
|
||||
}
|
||||
|
||||
c.outMessages <- m
|
||||
c.outMessages <- m
|
||||
|
||||
return
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Client) Disconnect() {
|
||||
c.connected = false
|
||||
c.conn.Close()
|
||||
c.connected = false
|
||||
c.conn.Close()
|
||||
|
||||
log.Info("disconnected from %s", c.url, c.outBytes, c.inBytes)
|
||||
log.Info("disconnected from %s", c.url, c.outBytes, c.inBytes)
|
||||
}
|
||||
|
||||
func (c *Client) dispatchLoop() {
|
||||
for {
|
||||
m := <- c.inMessages
|
||||
for {
|
||||
m := <-c.inMessages
|
||||
|
||||
switch m.ChunkStreamId {
|
||||
case CHUNK_STREAM_ID_PROTOCOL:
|
||||
c.handleProtocolMessage(m)
|
||||
case CHUNK_STREAM_ID_COMMAND:
|
||||
c.handleCommandMessage(m)
|
||||
}
|
||||
}
|
||||
switch m.ChunkStreamId {
|
||||
case CHUNK_STREAM_ID_PROTOCOL:
|
||||
c.handleProtocolMessage(m)
|
||||
case CHUNK_STREAM_ID_COMMAND:
|
||||
c.handleCommandMessage(m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) handleProtocolMessage(m *Message) {
|
||||
switch m.Type {
|
||||
case MESSAGE_TYPE_CHUNK_SIZE:
|
||||
size := binary.BigEndian.Uint32(m.Buffer.Bytes())
|
||||
log.Debug("setting chunk %d -> %d", c.inChunkSize, size)
|
||||
c.inChunkSize = size
|
||||
switch m.Type {
|
||||
case MESSAGE_TYPE_CHUNK_SIZE:
|
||||
size := binary.BigEndian.Uint32(m.Buffer.Bytes())
|
||||
log.Debug("setting chunk %d -> %d", c.inChunkSize, size)
|
||||
c.inChunkSize = size
|
||||
|
||||
case MESSAGE_TYPE_ACK_SIZE:
|
||||
log.Debug("ignoring ack size")
|
||||
case MESSAGE_TYPE_ACK_SIZE:
|
||||
log.Debug("ignoring ack size")
|
||||
|
||||
case MESSAGE_TYPE_BANDWIDTH:
|
||||
size := binary.BigEndian.Uint32(m.Buffer.Bytes())
|
||||
log.Debug("ignoring bandwidth %d", size)
|
||||
case MESSAGE_TYPE_BANDWIDTH:
|
||||
size := binary.BigEndian.Uint32(m.Buffer.Bytes())
|
||||
log.Debug("ignoring bandwidth %d", size)
|
||||
|
||||
default:
|
||||
log.Debug("ignoring other protocol message %d", m.Type)
|
||||
default:
|
||||
log.Debug("ignoring other protocol message %d", m.Type)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) handleCommandMessage(m *Message) {
|
||||
log.Debug("command message: %+v", m)
|
||||
log.Debug("command message: %+v", m)
|
||||
|
||||
c.connected = true
|
||||
c.connected = true
|
||||
}
|
||||
|
||||
func (c *Client) sendLoop() {
|
||||
for {
|
||||
m := <- c.outMessages
|
||||
for {
|
||||
m := <-c.outMessages
|
||||
|
||||
var cs *OutboundChunkStream = c.outChunkStreams[m.ChunkStreamId]
|
||||
if cs == nil {
|
||||
cs = NewOutboundChunkStream(m.ChunkStreamId)
|
||||
}
|
||||
var cs *OutboundChunkStream = c.outChunkStreams[m.ChunkStreamId]
|
||||
if cs == nil {
|
||||
cs = NewOutboundChunkStream(m.ChunkStreamId)
|
||||
}
|
||||
|
||||
h := cs.NewOutboundHeader(m)
|
||||
h := cs.NewOutboundHeader(m)
|
||||
|
||||
var n int64 = 0
|
||||
var err error
|
||||
var ws uint32 = 0
|
||||
var rem uint32 = m.Length
|
||||
var n int64 = 0
|
||||
var err error
|
||||
var ws uint32 = 0
|
||||
var rem uint32 = m.Length
|
||||
|
||||
for rem > 0 {
|
||||
log.Debug("rem is %d", rem)
|
||||
log.Debug("send message header: %+v", h)
|
||||
_, err = h.Write(c)
|
||||
if err != nil {
|
||||
if c.connected {
|
||||
log.Warn("unable to send header: %v", err)
|
||||
c.Disconnect()
|
||||
}
|
||||
return
|
||||
}
|
||||
for rem > 0 {
|
||||
log.Debug("rem is %d", rem)
|
||||
log.Debug("send message header: %+v", h)
|
||||
_, err = h.Write(c)
|
||||
if err != nil {
|
||||
if c.connected {
|
||||
log.Warn("unable to send header: %v", err)
|
||||
c.Disconnect()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
ws = rem
|
||||
if ws > c.outChunkSize {
|
||||
ws = c.outChunkSize
|
||||
}
|
||||
ws = rem
|
||||
if ws > c.outChunkSize {
|
||||
ws = c.outChunkSize
|
||||
}
|
||||
|
||||
n, err = io.CopyN(c, m.Buffer, int64(ws))
|
||||
if err != nil {
|
||||
if c.connected {
|
||||
log.Warn("unable to send message")
|
||||
c.Disconnect()
|
||||
}
|
||||
return
|
||||
}
|
||||
n, err = io.CopyN(c, m.Buffer, int64(ws))
|
||||
if err != nil {
|
||||
if c.connected {
|
||||
log.Warn("unable to send message")
|
||||
c.Disconnect()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
rem -= uint32(n)
|
||||
rem -= uint32(n)
|
||||
|
||||
// Set the header to continuation only for the
|
||||
// next iteration (if it happens).
|
||||
h.Format = HEADER_FORMAT_CONTINUATION
|
||||
}
|
||||
// Set the header to continuation only for the
|
||||
// next iteration (if it happens).
|
||||
h.Format = HEADER_FORMAT_CONTINUATION
|
||||
}
|
||||
|
||||
log.Debug("finished sending message")
|
||||
log.Debug("finished sending message")
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) receiveLoop() {
|
||||
for {
|
||||
// Read the next header from the connection
|
||||
h, err := ReadHeader(c)
|
||||
if err != nil {
|
||||
if c.connected {
|
||||
log.Warn("unable to receive next header while connected")
|
||||
c.Disconnect()
|
||||
}
|
||||
return
|
||||
}
|
||||
for {
|
||||
// Read the next header from the connection
|
||||
h, err := ReadHeader(c)
|
||||
if err != nil {
|
||||
if c.connected {
|
||||
log.Warn("unable to receive next header while connected")
|
||||
c.Disconnect()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Determine whether or not we already have a chunk stream
|
||||
// allocated for this ID. If we don't, create one.
|
||||
var cs *InboundChunkStream = c.inChunkStreams[h.ChunkStreamId]
|
||||
if cs == nil {
|
||||
cs = NewInboundChunkStream(h.ChunkStreamId)
|
||||
c.inChunkStreams[h.ChunkStreamId] = cs
|
||||
}
|
||||
// Determine whether or not we already have a chunk stream
|
||||
// allocated for this ID. If we don't, create one.
|
||||
var cs *InboundChunkStream = c.inChunkStreams[h.ChunkStreamId]
|
||||
if cs == nil {
|
||||
cs = NewInboundChunkStream(h.ChunkStreamId)
|
||||
c.inChunkStreams[h.ChunkStreamId] = cs
|
||||
}
|
||||
|
||||
var ts uint32
|
||||
var m *Message
|
||||
var ts uint32
|
||||
var m *Message
|
||||
|
||||
if (cs.lastHeader == nil) && (h.Format != HEADER_FORMAT_FULL) {
|
||||
log.Warn("unable to find previous header on chunk stream %d", h.ChunkStreamId)
|
||||
c.Disconnect()
|
||||
return
|
||||
}
|
||||
if (cs.lastHeader == nil) && (h.Format != HEADER_FORMAT_FULL) {
|
||||
log.Warn("unable to find previous header on chunk stream %d", h.ChunkStreamId)
|
||||
c.Disconnect()
|
||||
return
|
||||
}
|
||||
|
||||
switch h.Format {
|
||||
case HEADER_FORMAT_FULL:
|
||||
// If it's an entirely new header, replace the reference in
|
||||
// the chunk stream and set the working timestamp from
|
||||
// the header.
|
||||
cs.lastHeader = &h
|
||||
ts = h.Timestamp
|
||||
switch h.Format {
|
||||
case HEADER_FORMAT_FULL:
|
||||
// If it's an entirely new header, replace the reference in
|
||||
// the chunk stream and set the working timestamp from
|
||||
// the header.
|
||||
cs.lastHeader = &h
|
||||
ts = h.Timestamp
|
||||
|
||||
case HEADER_FORMAT_SAME_STREAM:
|
||||
// If it's the same stream, use the last message stream id,
|
||||
// but otherwise use values from the header.
|
||||
h.MessageStreamId = cs.lastHeader.MessageStreamId
|
||||
cs.lastHeader = &h
|
||||
ts = cs.lastInAbsoluteTimestamp + h.Timestamp
|
||||
case HEADER_FORMAT_SAME_STREAM:
|
||||
// If it's the same stream, use the last message stream id,
|
||||
// but otherwise use values from the header.
|
||||
h.MessageStreamId = cs.lastHeader.MessageStreamId
|
||||
cs.lastHeader = &h
|
||||
ts = cs.lastInAbsoluteTimestamp + h.Timestamp
|
||||
|
||||
case HEADER_FORMAT_SAME_LENGTH_AND_STREAM:
|
||||
// If it's the same length and stream, copy values from the
|
||||
// last header and replace it.
|
||||
h.MessageStreamId = cs.lastHeader.MessageStreamId
|
||||
h.MessageLength = cs.lastHeader.MessageLength
|
||||
h.MessageTypeId = cs.lastHeader.MessageTypeId
|
||||
cs.lastHeader = &h
|
||||
ts = cs.lastInAbsoluteTimestamp + h.Timestamp
|
||||
case HEADER_FORMAT_SAME_LENGTH_AND_STREAM:
|
||||
// If it's the same length and stream, copy values from the
|
||||
// last header and replace it.
|
||||
h.MessageStreamId = cs.lastHeader.MessageStreamId
|
||||
h.MessageLength = cs.lastHeader.MessageLength
|
||||
h.MessageTypeId = cs.lastHeader.MessageTypeId
|
||||
cs.lastHeader = &h
|
||||
ts = cs.lastInAbsoluteTimestamp + h.Timestamp
|
||||
|
||||
case HEADER_FORMAT_CONTINUATION:
|
||||
// A full continuation of the previous stream. Copy all values.
|
||||
h.MessageStreamId = cs.lastHeader.MessageStreamId
|
||||
h.MessageLength = cs.lastHeader.MessageLength
|
||||
h.MessageTypeId = cs.lastHeader.MessageTypeId
|
||||
h.Timestamp = cs.lastHeader.Timestamp
|
||||
ts = cs.lastInAbsoluteTimestamp + cs.lastHeader.Timestamp
|
||||
case HEADER_FORMAT_CONTINUATION:
|
||||
// A full continuation of the previous stream. Copy all values.
|
||||
h.MessageStreamId = cs.lastHeader.MessageStreamId
|
||||
h.MessageLength = cs.lastHeader.MessageLength
|
||||
h.MessageTypeId = cs.lastHeader.MessageTypeId
|
||||
h.Timestamp = cs.lastHeader.Timestamp
|
||||
ts = cs.lastInAbsoluteTimestamp + cs.lastHeader.Timestamp
|
||||
|
||||
// If there's a message already started, use it.
|
||||
if cs.currentMessage != nil {
|
||||
m = cs.currentMessage
|
||||
}
|
||||
}
|
||||
// If there's a message already started, use it.
|
||||
if cs.currentMessage != nil {
|
||||
m = cs.currentMessage
|
||||
}
|
||||
}
|
||||
|
||||
if m == nil {
|
||||
m = &Message{
|
||||
Type: h.MessageTypeId,
|
||||
ChunkStreamId: h.ChunkStreamId,
|
||||
StreamId: h.MessageStreamId,
|
||||
Timestamp: h.CalculateTimestamp(),
|
||||
AbsoluteTimestamp: ts,
|
||||
Length: h.MessageLength,
|
||||
Buffer: new(bytes.Buffer),
|
||||
}
|
||||
}
|
||||
if m == nil {
|
||||
m = &Message{
|
||||
Type: h.MessageTypeId,
|
||||
ChunkStreamId: h.ChunkStreamId,
|
||||
StreamId: h.MessageStreamId,
|
||||
Timestamp: h.CalculateTimestamp(),
|
||||
AbsoluteTimestamp: ts,
|
||||
Length: h.MessageLength,
|
||||
Buffer: new(bytes.Buffer),
|
||||
}
|
||||
}
|
||||
|
||||
cs.lastInAbsoluteTimestamp = ts
|
||||
cs.lastInAbsoluteTimestamp = ts
|
||||
|
||||
rs := m.RemainingBytes()
|
||||
if rs > c.inChunkSize {
|
||||
rs = c.inChunkSize
|
||||
}
|
||||
rs := m.RemainingBytes()
|
||||
if rs > c.inChunkSize {
|
||||
rs = c.inChunkSize
|
||||
}
|
||||
|
||||
_, err = io.CopyN(m.Buffer, c, int64(rs))
|
||||
if err != nil {
|
||||
if c.connected {
|
||||
log.Warn("unable to copy %d message bytes from buffer", rs)
|
||||
c.Disconnect()
|
||||
}
|
||||
_, err = io.CopyN(m.Buffer, c, int64(rs))
|
||||
if err != nil {
|
||||
if c.connected {
|
||||
log.Warn("unable to copy %d message bytes from buffer", rs)
|
||||
c.Disconnect()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if m.RemainingBytes() == 0 {
|
||||
cs.currentMessage = nil
|
||||
c.inMessages <- m
|
||||
} else {
|
||||
cs.currentMessage = m
|
||||
}
|
||||
}
|
||||
if m.RemainingBytes() == 0 {
|
||||
cs.currentMessage = nil
|
||||
c.inMessages <- m
|
||||
} else {
|
||||
cs.currentMessage = m
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Read(p []byte) (n int, err error) {
|
||||
n, err = c.conn.Read(p)
|
||||
c.inBytes += uint32(n)
|
||||
log.Debug("read %d", n)
|
||||
return n, err
|
||||
n, err = c.conn.Read(p)
|
||||
c.inBytes += uint32(n)
|
||||
log.Debug("read %d", n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (c *Client) Write(p []byte) (n int, err error) {
|
||||
n, err = c.conn.Write(p)
|
||||
c.outBytes += uint32(n)
|
||||
log.Debug("write %d", n)
|
||||
return n, err
|
||||
n, err = c.conn.Write(p)
|
||||
c.outBytes += uint32(n)
|
||||
log.Debug("write %d", n)
|
||||
return n, err
|
||||
}
|
||||
|
58
const.go
58
const.go
@@ -1,48 +1,48 @@
|
||||
package rtmp
|
||||
|
||||
const (
|
||||
TIMESTAMP_MAX = uint32(2000000000)
|
||||
TIMESTAMP_AUTO = uint32(0)
|
||||
TIMESTAMP_EXTENDED = 0xFFFFFF
|
||||
TIMESTAMP_MAX = uint32(2000000000)
|
||||
TIMESTAMP_AUTO = uint32(0)
|
||||
TIMESTAMP_EXTENDED = 0xFFFFFF
|
||||
)
|
||||
|
||||
const (
|
||||
CHUNK_STREAM_ID_PROTOCOL = uint32(2)
|
||||
CHUNK_STREAM_ID_COMMAND = uint32(3)
|
||||
CHUNK_STREAM_ID_USER_CONTROL = uint32(4)
|
||||
CHUNK_STREAM_ID_PROTOCOL = uint32(2)
|
||||
CHUNK_STREAM_ID_COMMAND = uint32(3)
|
||||
CHUNK_STREAM_ID_USER_CONTROL = uint32(4)
|
||||
)
|
||||
|
||||
const (
|
||||
HEADER_FORMAT_FULL = 0x00
|
||||
HEADER_FORMAT_SAME_STREAM = 0x01
|
||||
HEADER_FORMAT_SAME_LENGTH_AND_STREAM = 0x02
|
||||
HEADER_FORMAT_CONTINUATION = 0x03
|
||||
HEADER_FORMAT_FULL = 0x00
|
||||
HEADER_FORMAT_SAME_STREAM = 0x01
|
||||
HEADER_FORMAT_SAME_LENGTH_AND_STREAM = 0x02
|
||||
HEADER_FORMAT_CONTINUATION = 0x03
|
||||
)
|
||||
|
||||
const (
|
||||
MESSAGE_TYPE_NONE = 0x00
|
||||
MESSAGE_TYPE_CHUNK_SIZE = 0x01
|
||||
MESSAGE_TYPE_ABORT = 0x02
|
||||
MESSAGE_TYPE_ACK = 0x03
|
||||
MESSAGE_TYPE_PING = 0x04
|
||||
MESSAGE_TYPE_ACK_SIZE = 0x05
|
||||
MESSAGE_TYPE_BANDWIDTH = 0x06
|
||||
MESSAGE_TYPE_AUDIO = 0x08
|
||||
MESSAGE_TYPE_VIDEO = 0x09
|
||||
MESSAGE_TYPE_FLEX = 0x0F
|
||||
MESSAGE_TYPE_AMF3_SHARED_OBJECT = 0x10
|
||||
MESSAGE_TYPE_AMF3 = 0x11
|
||||
MESSAGE_TYPE_INVOKE = 0x12
|
||||
MESSAGE_TYPE_AMF0_SHARED_OBJECT = 0x13
|
||||
MESSAGE_TYPE_AMF0 = 0x14
|
||||
MESSAGE_TYPE_FLV = 0x16
|
||||
MESSAGE_TYPE_NONE = 0x00
|
||||
MESSAGE_TYPE_CHUNK_SIZE = 0x01
|
||||
MESSAGE_TYPE_ABORT = 0x02
|
||||
MESSAGE_TYPE_ACK = 0x03
|
||||
MESSAGE_TYPE_PING = 0x04
|
||||
MESSAGE_TYPE_ACK_SIZE = 0x05
|
||||
MESSAGE_TYPE_BANDWIDTH = 0x06
|
||||
MESSAGE_TYPE_AUDIO = 0x08
|
||||
MESSAGE_TYPE_VIDEO = 0x09
|
||||
MESSAGE_TYPE_FLEX = 0x0F
|
||||
MESSAGE_TYPE_AMF3_SHARED_OBJECT = 0x10
|
||||
MESSAGE_TYPE_AMF3 = 0x11
|
||||
MESSAGE_TYPE_INVOKE = 0x12
|
||||
MESSAGE_TYPE_AMF0_SHARED_OBJECT = 0x13
|
||||
MESSAGE_TYPE_AMF0 = 0x14
|
||||
MESSAGE_TYPE_FLV = 0x16
|
||||
)
|
||||
|
||||
const (
|
||||
MESSAGE_DISPATCH_QUEUE_LENGTH = 100
|
||||
MESSAGE_DISPATCH_QUEUE_LENGTH = 100
|
||||
)
|
||||
|
||||
const (
|
||||
DEFAULT_CHUNK_SIZE = uint32(128)
|
||||
DEFAULT_WINDOW_SIZE = uint32(2500000)
|
||||
DEFAULT_CHUNK_SIZE = uint32(128)
|
||||
DEFAULT_WINDOW_SIZE = uint32(2500000)
|
||||
)
|
||||
|
86
handshake.go
86
handshake.go
@@ -1,60 +1,60 @@
|
||||
package rtmp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
)
|
||||
|
||||
func (c *Client) handshake() error {
|
||||
C0 := []byte{0x03}
|
||||
C1 := make([]byte, 1536)
|
||||
S0 := make([]byte, 1)
|
||||
S1 := make([]byte, 1536)
|
||||
S2 := make([]byte, 1536)
|
||||
C0 := []byte{0x03}
|
||||
C1 := make([]byte, 1536)
|
||||
S0 := make([]byte, 1)
|
||||
S1 := make([]byte, 1536)
|
||||
S2 := make([]byte, 1536)
|
||||
|
||||
rand.Read(C1)
|
||||
for i := 0; i < 8; i++ {
|
||||
C1[i] = 0x00
|
||||
}
|
||||
rand.Read(C1)
|
||||
for i := 0; i < 8; i++ {
|
||||
C1[i] = 0x00
|
||||
}
|
||||
|
||||
_, err := c.Write(C0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := c.Write(C0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.Write(C1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.Write(C1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.Read(S0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.Read(S0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if bytes.Equal(C0, S0) != true {
|
||||
return errors.New("Handshake failed: version mismatch")
|
||||
}
|
||||
if bytes.Equal(C0, S0) != true {
|
||||
return errors.New("Handshake failed: version mismatch")
|
||||
}
|
||||
|
||||
_, err = c.Read(S1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.Read(S1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.Write(S1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.Write(S1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.Read(S2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.Read(S2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if bytes.Equal(C1, S2) != true {
|
||||
return errors.New("Handshake failed: challenge mismatch")
|
||||
}
|
||||
if bytes.Equal(C1, S2) != true {
|
||||
return errors.New("Handshake failed: challenge mismatch")
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}
|
||||
|
338
header.go
338
header.go
@@ -1,213 +1,213 @@
|
||||
package rtmp
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
type Header struct {
|
||||
Format uint8
|
||||
ChunkStreamId uint32
|
||||
MessageLength uint32
|
||||
MessageTypeId uint8
|
||||
MessageStreamId uint32
|
||||
Timestamp uint32
|
||||
ExtendedTimestamp uint32
|
||||
Format uint8
|
||||
ChunkStreamId uint32
|
||||
MessageLength uint32
|
||||
MessageTypeId uint8
|
||||
MessageStreamId uint32
|
||||
Timestamp uint32
|
||||
ExtendedTimestamp uint32
|
||||
}
|
||||
|
||||
func NewHeader() *Header {
|
||||
return &Header{}
|
||||
return &Header{}
|
||||
}
|
||||
|
||||
func ReadHeader(r io.Reader) (Header, error) {
|
||||
h := *NewHeader()
|
||||
u8 := make([]byte, 1)
|
||||
u16 := make([]byte, 2)
|
||||
u32 := make([]byte, 4)
|
||||
h := *NewHeader()
|
||||
u8 := make([]byte, 1)
|
||||
u16 := make([]byte, 2)
|
||||
u32 := make([]byte, 4)
|
||||
|
||||
// The first byte we read from the header will indicate the
|
||||
// format of the packet and chunk stream id
|
||||
_, err := r.Read(u8)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
// The first byte we read from the header will indicate the
|
||||
// format of the packet and chunk stream id
|
||||
_, err := r.Read(u8)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
|
||||
// Determine the packet format from the byte
|
||||
h.Format = (u8[0] & 0xC0) >> 6
|
||||
// Determine the packet format from the byte
|
||||
h.Format = (u8[0] & 0xC0) >> 6
|
||||
|
||||
// Determine Chunk Stream ID using the remainder of the byte
|
||||
h.ChunkStreamId = uint32(u8[0] & 0x3F)
|
||||
// Determine Chunk Stream ID using the remainder of the byte
|
||||
h.ChunkStreamId = uint32(u8[0] & 0x3F)
|
||||
|
||||
switch h.ChunkStreamId {
|
||||
case 0:
|
||||
// A Chunk Stream ID of 0 indicates that the real value
|
||||
// is between 64-319, which is reached by adding 64 to the
|
||||
// next byte.
|
||||
_, err = r.Read(u8)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.ChunkStreamId = uint32(64) + uint32(u8[0])
|
||||
switch h.ChunkStreamId {
|
||||
case 0:
|
||||
// A Chunk Stream ID of 0 indicates that the real value
|
||||
// is between 64-319, which is reached by adding 64 to the
|
||||
// next byte.
|
||||
_, err = r.Read(u8)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.ChunkStreamId = uint32(64) + uint32(u8[0])
|
||||
|
||||
case 1:
|
||||
// A Chunk Stream ID of 1 indicates that the real value
|
||||
// is between 64-65599 and can be reached by adding 64 to
|
||||
// the next byte and then multiplying the one after it
|
||||
// by 256.
|
||||
_, err = r.Read(u16)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.ChunkStreamId = uint32(u16[0]) + (256 * uint32(u16[1]))
|
||||
}
|
||||
case 1:
|
||||
// A Chunk Stream ID of 1 indicates that the real value
|
||||
// is between 64-65599 and can be reached by adding 64 to
|
||||
// the next byte and then multiplying the one after it
|
||||
// by 256.
|
||||
_, err = r.Read(u16)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.ChunkStreamId = uint32(u16[0]) + (256 * uint32(u16[1]))
|
||||
}
|
||||
|
||||
// If the header is full, same length, or same length
|
||||
// and stream, then we only need to extract the timestamp.
|
||||
if h.Format <= HEADER_FORMAT_SAME_LENGTH_AND_STREAM {
|
||||
_, err = r.Read(u32[1:])
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.Timestamp = binary.BigEndian.Uint32(u32)
|
||||
}
|
||||
// If the header is full, same length, or same length
|
||||
// and stream, then we only need to extract the timestamp.
|
||||
if h.Format <= HEADER_FORMAT_SAME_LENGTH_AND_STREAM {
|
||||
_, err = r.Read(u32[1:])
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.Timestamp = binary.BigEndian.Uint32(u32)
|
||||
}
|
||||
|
||||
// If the header is full or same stream, then we also
|
||||
// need to extract the message size and message type.
|
||||
if h.Format <= HEADER_FORMAT_SAME_STREAM {
|
||||
_, err = r.Read(u32[1:])
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.MessageLength = binary.BigEndian.Uint32(u32)
|
||||
// If the header is full or same stream, then we also
|
||||
// need to extract the message size and message type.
|
||||
if h.Format <= HEADER_FORMAT_SAME_STREAM {
|
||||
_, err = r.Read(u32[1:])
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.MessageLength = binary.BigEndian.Uint32(u32)
|
||||
|
||||
_, err = r.Read(u8)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.MessageTypeId = uint8(u8[0])
|
||||
}
|
||||
_, err = r.Read(u8)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.MessageTypeId = uint8(u8[0])
|
||||
}
|
||||
|
||||
// If the header is full, we also need to extract
|
||||
// the message stream id.
|
||||
if h.Format <= HEADER_FORMAT_FULL {
|
||||
_, err = r.Read(u32)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.MessageStreamId = binary.LittleEndian.Uint32(u32)
|
||||
}
|
||||
// If the header is full, we also need to extract
|
||||
// the message stream id.
|
||||
if h.Format <= HEADER_FORMAT_FULL {
|
||||
_, err = r.Read(u32)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
h.MessageStreamId = binary.LittleEndian.Uint32(u32)
|
||||
}
|
||||
|
||||
// If the header has an extended timestamp, we need to
|
||||
// extract that as well.
|
||||
if h.Timestamp == TIMESTAMP_EXTENDED {
|
||||
_, err = r.Read(u32)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
// If the header has an extended timestamp, we need to
|
||||
// extract that as well.
|
||||
if h.Timestamp == TIMESTAMP_EXTENDED {
|
||||
_, err = r.Read(u32)
|
||||
if err != nil {
|
||||
return h, err
|
||||
}
|
||||
|
||||
h.ExtendedTimestamp = binary.BigEndian.Uint32(u32)
|
||||
}
|
||||
h.ExtendedTimestamp = binary.BigEndian.Uint32(u32)
|
||||
}
|
||||
|
||||
return h, nil
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func (h *Header) Write(w io.Writer) (n int, err error) {
|
||||
m := 0
|
||||
u8 := make([]byte, 1)
|
||||
u32 := make([]byte, 4)
|
||||
m := 0
|
||||
u8 := make([]byte, 1)
|
||||
u32 := make([]byte, 4)
|
||||
|
||||
switch {
|
||||
case h.ChunkStreamId <= 63:
|
||||
u8[0] = byte(h.Format << 6) | byte(h.ChunkStreamId)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
switch {
|
||||
case h.ChunkStreamId <= 63:
|
||||
u8[0] = byte(h.Format<<6) | byte(h.ChunkStreamId)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
|
||||
case h.ChunkStreamId <= 319:
|
||||
u8[0] = byte(h.Format << 6)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
case h.ChunkStreamId <= 319:
|
||||
u8[0] = byte(h.Format << 6)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
|
||||
u8[0] = byte(h.ChunkStreamId - 64)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
u8[0] = byte(h.ChunkStreamId - 64)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
|
||||
case h.ChunkStreamId <= 65599:
|
||||
u8[0] = byte(h.Format << 6) | byte(0x01)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
case h.ChunkStreamId <= 65599:
|
||||
u8[0] = byte(h.Format<<6) | byte(0x01)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
|
||||
tmp := uint16(h.ChunkStreamId - 64)
|
||||
err = binary.Write(w, binary.BigEndian, &tmp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 2
|
||||
tmp := uint16(h.ChunkStreamId - 64)
|
||||
err = binary.Write(w, binary.BigEndian, &tmp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 2
|
||||
|
||||
default:
|
||||
return n, errors.New("chunk stream too large")
|
||||
}
|
||||
default:
|
||||
return n, errors.New("chunk stream too large")
|
||||
}
|
||||
|
||||
if h.Format <= HEADER_FORMAT_SAME_LENGTH_AND_STREAM {
|
||||
binary.BigEndian.PutUint32(u32, h.Timestamp)
|
||||
m, err = w.Write(u32[1:])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += m
|
||||
}
|
||||
if h.Format <= HEADER_FORMAT_SAME_LENGTH_AND_STREAM {
|
||||
binary.BigEndian.PutUint32(u32, h.Timestamp)
|
||||
m, err = w.Write(u32[1:])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += m
|
||||
}
|
||||
|
||||
if h.Format <= HEADER_FORMAT_SAME_STREAM {
|
||||
binary.BigEndian.PutUint32(u32, h.MessageLength)
|
||||
m, err = w.Write(u32[1:])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += m
|
||||
if h.Format <= HEADER_FORMAT_SAME_STREAM {
|
||||
binary.BigEndian.PutUint32(u32, h.MessageLength)
|
||||
m, err = w.Write(u32[1:])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += m
|
||||
|
||||
u8[0] = byte(h.MessageTypeId)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
}
|
||||
u8[0] = byte(h.MessageTypeId)
|
||||
_, err = w.Write(u8)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 1
|
||||
}
|
||||
|
||||
if h.Format == HEADER_FORMAT_FULL {
|
||||
err = binary.Write(w, binary.LittleEndian, &h.MessageStreamId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 4
|
||||
}
|
||||
if h.Format == HEADER_FORMAT_FULL {
|
||||
err = binary.Write(w, binary.LittleEndian, &h.MessageStreamId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 4
|
||||
}
|
||||
|
||||
if h.Timestamp >= TIMESTAMP_EXTENDED {
|
||||
err = binary.Write(w, binary.BigEndian, &h.ExtendedTimestamp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 4
|
||||
}
|
||||
if h.Timestamp >= TIMESTAMP_EXTENDED {
|
||||
err = binary.Write(w, binary.BigEndian, &h.ExtendedTimestamp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n += 4
|
||||
}
|
||||
|
||||
return
|
||||
return
|
||||
}
|
||||
|
||||
func (h *Header) CalculateTimestamp() uint32 {
|
||||
if h.Timestamp >= TIMESTAMP_MAX {
|
||||
return h.ExtendedTimestamp
|
||||
}
|
||||
if h.Timestamp >= TIMESTAMP_MAX {
|
||||
return h.ExtendedTimestamp
|
||||
}
|
||||
|
||||
return h.Timestamp
|
||||
return h.Timestamp
|
||||
}
|
||||
|
24
message.go
24
message.go
@@ -1,23 +1,23 @@
|
||||
package rtmp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"bytes"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
Type uint8
|
||||
ChunkStreamId uint32
|
||||
StreamId uint32
|
||||
Timestamp uint32
|
||||
AbsoluteTimestamp uint32
|
||||
Length uint32
|
||||
Buffer *bytes.Buffer
|
||||
Type uint8
|
||||
ChunkStreamId uint32
|
||||
StreamId uint32
|
||||
Timestamp uint32
|
||||
AbsoluteTimestamp uint32
|
||||
Length uint32
|
||||
Buffer *bytes.Buffer
|
||||
}
|
||||
|
||||
func (m *Message) RemainingBytes() uint32 {
|
||||
if m.Buffer == nil {
|
||||
return m.Length
|
||||
}
|
||||
if m.Buffer == nil {
|
||||
return m.Length
|
||||
}
|
||||
|
||||
return m.Length - uint32(m.Buffer.Len())
|
||||
return m.Length - uint32(m.Buffer.Len())
|
||||
}
|
||||
|
6
util.go
6
util.go
@@ -1,12 +1,12 @@
|
||||
package rtmp
|
||||
|
||||
import (
|
||||
"time"
|
||||
"github.com/elobuff/gologger"
|
||||
"github.com/elobuff/gologger"
|
||||
"time"
|
||||
)
|
||||
|
||||
var log logger.Logger = *logger.NewLogger(logger.LOG_LEVEL_DEBUG, "rtmp")
|
||||
|
||||
func GetCurrentTimestamp() uint32 {
|
||||
return uint32(time.Now().UnixNano()/int64(1000000)) % TIMESTAMP_MAX
|
||||
return uint32(time.Now().UnixNano()/int64(1000000)) % TIMESTAMP_MAX
|
||||
}
|
||||
|
Reference in New Issue
Block a user