mirror of
https://github.com/zhangpeihao/gortmp
synced 2025-09-26 20:01:11 +08:00
966 lines
33 KiB
Go
966 lines
33 KiB
Go
// Copyright 2013, zhangpeihao All rights reserved.
|
||
|
||
package gortmp
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"encoding/binary"
|
||
"errors"
|
||
"github.com/zhangpeihao/goamf"
|
||
"github.com/zhangpeihao/log"
|
||
"io"
|
||
"net"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// Conn
|
||
//
|
||
// Common connection functions
|
||
type Conn interface {
|
||
Close()
|
||
Send(message *Message) error
|
||
CreateChunkStream(ID uint32) (*OutboundChunkStream, error)
|
||
CloseChunkStream(ID uint32)
|
||
NewTransactionID() uint32
|
||
CreateMediaChunkStream() (*OutboundChunkStream, error)
|
||
CloseMediaChunkStream(id uint32)
|
||
SetStreamBufferSize(streamId uint32, size uint32)
|
||
OutboundChunkStream(id uint32) (chunkStream *OutboundChunkStream, found bool)
|
||
InboundChunkStream(id uint32) (chunkStream *InboundChunkStream, found bool)
|
||
SetWindowAcknowledgementSize()
|
||
SetPeerBandwidth(peerBandwidth uint32, limitType byte)
|
||
SetChunkSize(chunkSize uint32)
|
||
SendUserControlMessage(eventId uint16)
|
||
}
|
||
|
||
// Connection handler
|
||
type ConnHandler interface {
|
||
// Received message
|
||
OnReceived(conn Conn, message *Message)
|
||
// Received command
|
||
OnReceivedRtmpCommand(conn Conn, command *Command)
|
||
// Connection closed
|
||
OnClosed(conn Conn)
|
||
}
|
||
|
||
// conn
|
||
//
|
||
// To maintain all chunk streams in one network connection.
|
||
type conn struct {
|
||
// Chunk streams
|
||
outChunkStreams map[uint32]*OutboundChunkStream
|
||
inChunkStreams map[uint32]*InboundChunkStream
|
||
|
||
// High-priority send message buffer.
|
||
// Protocol control messages are sent with highest priority.
|
||
highPriorityMessageQueue chan *Message
|
||
highPriorityMessage *Message
|
||
highPriorityMessageOffset int
|
||
|
||
// Middle-priority send message buffer.
|
||
middlePriorityMessageQueue chan *Message
|
||
middlePriorityMessage *Message
|
||
middlePriorityMessageOffset int
|
||
|
||
// Low-priority send message buffer.
|
||
// the video message is assigned the lowest priority.
|
||
lowPriorityMessageQueue chan *Message
|
||
lowPriorityMessage *Message
|
||
lowPriorityMessageOffset int
|
||
|
||
// Chunk size
|
||
inChunkSize uint32
|
||
outChunkSize uint32
|
||
outChunkSizeTemp uint32
|
||
|
||
// Bytes counter(For window ack)
|
||
inBytes uint32
|
||
outBytes uint32
|
||
|
||
// Previous window acknowledgement inbytes
|
||
inBytesPreWindow uint32
|
||
|
||
// Window size
|
||
inWindowSize uint32
|
||
outWindowSize uint32
|
||
|
||
// Bandwidth
|
||
inBandwidth uint32
|
||
outBandwidth uint32
|
||
|
||
// Bandwidth Limit
|
||
inBandwidthLimit uint8
|
||
outBandwidthLimit uint8
|
||
|
||
// Media chunk stream ID
|
||
mediaChunkStreamIDAllocator []bool
|
||
mediaChunkStreamIDAllocatorLocker sync.Mutex
|
||
|
||
// Closed
|
||
closed bool
|
||
|
||
// Handler
|
||
handler ConnHandler
|
||
|
||
// Network connection
|
||
c net.Conn
|
||
br *bufio.Reader
|
||
bw *bufio.Writer
|
||
|
||
// Last transaction ID
|
||
lastTransactionID uint32
|
||
|
||
// Error
|
||
err error
|
||
}
|
||
|
||
// Create new connection
|
||
func NewConn(c net.Conn, br *bufio.Reader, bw *bufio.Writer, handler ConnHandler, maxChannelNumber int) Conn {
|
||
conn := &conn{
|
||
c: c,
|
||
br: br,
|
||
bw: bw,
|
||
outChunkStreams: make(map[uint32]*OutboundChunkStream),
|
||
inChunkStreams: make(map[uint32]*InboundChunkStream),
|
||
highPriorityMessageQueue: make(chan *Message, DEFAULT_HIGH_PRIORITY_BUFFER_SIZE),
|
||
middlePriorityMessageQueue: make(chan *Message, DEFAULT_MIDDLE_PRIORITY_BUFFER_SIZE),
|
||
lowPriorityMessageQueue: make(chan *Message, DEFAULT_LOW_PRIORITY_BUFFER_SIZE),
|
||
inChunkSize: DEFAULT_CHUNK_SIZE,
|
||
outChunkSize: DEFAULT_CHUNK_SIZE,
|
||
inWindowSize: DEFAULT_WINDOW_SIZE,
|
||
outWindowSize: DEFAULT_WINDOW_SIZE,
|
||
inBandwidth: DEFAULT_WINDOW_SIZE,
|
||
outBandwidth: DEFAULT_WINDOW_SIZE,
|
||
inBandwidthLimit: BINDWIDTH_LIMIT_DYNAMIC,
|
||
outBandwidthLimit: BINDWIDTH_LIMIT_DYNAMIC,
|
||
handler: handler,
|
||
mediaChunkStreamIDAllocator: make([]bool, maxChannelNumber),
|
||
}
|
||
// Create "Protocol control chunk stream"
|
||
conn.outChunkStreams[CS_ID_PROTOCOL_CONTROL] = NewOutboundChunkStream(CS_ID_PROTOCOL_CONTROL)
|
||
// Create "Command message chunk stream"
|
||
conn.outChunkStreams[CS_ID_COMMAND] = NewOutboundChunkStream(CS_ID_COMMAND)
|
||
// Create "User control chunk stream"
|
||
conn.outChunkStreams[CS_ID_USER_CONTROL] = NewOutboundChunkStream(CS_ID_USER_CONTROL)
|
||
go conn.sendLoop()
|
||
go conn.readLoop()
|
||
return conn
|
||
}
|
||
|
||
// Send high priority message in continuous chunks
|
||
func (conn *conn) sendMessage(message *Message) {
|
||
chunkStream, found := conn.outChunkStreams[message.ChunkStreamID]
|
||
if !found {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_WARNING,
|
||
"Can not found chunk strem id %d", message.ChunkStreamID)
|
||
// Error
|
||
return
|
||
}
|
||
|
||
// message.Dump(">>>")
|
||
header := chunkStream.NewOutboundHeader(message)
|
||
_, err := header.Write(conn.bw)
|
||
if err != nil {
|
||
conn.error(err, "sendMessage write header")
|
||
return
|
||
}
|
||
// header.Dump(">>>")
|
||
if header.MessageLength > conn.outChunkSize {
|
||
// chunkStream.lastHeader = nil
|
||
// Split into some chunk
|
||
_, err = CopyNToNetwork(conn.bw, message.Buf, int64(conn.outChunkSize))
|
||
if err != nil {
|
||
conn.error(err, "sendMessage copy buffer")
|
||
return
|
||
}
|
||
remain := header.MessageLength - conn.outChunkSize
|
||
// Type 3 chunk
|
||
for {
|
||
err = conn.bw.WriteByte(byte(0xc0 | byte(header.ChunkStreamID)))
|
||
if err != nil {
|
||
conn.error(err, "sendMessage Type 3 chunk header")
|
||
return
|
||
}
|
||
if remain > conn.outChunkSize {
|
||
_, err = CopyNToNetwork(conn.bw, message.Buf, int64(conn.outChunkSize))
|
||
if err != nil {
|
||
conn.error(err, "sendMessage copy split buffer 1")
|
||
return
|
||
}
|
||
remain -= conn.outChunkSize
|
||
} else {
|
||
_, err = CopyNToNetwork(conn.bw, message.Buf, int64(remain))
|
||
if err != nil {
|
||
conn.error(err, "sendMessage copy split buffer 2")
|
||
return
|
||
}
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
_, err = CopyNToNetwork(conn.bw, message.Buf, int64(header.MessageLength))
|
||
if err != nil {
|
||
conn.error(err, "sendMessage copy buffer")
|
||
return
|
||
}
|
||
}
|
||
err = FlushToNetwork(conn.bw)
|
||
if err != nil {
|
||
conn.error(err, "sendMessage Flush 3")
|
||
return
|
||
}
|
||
if message.ChunkStreamID == CS_ID_PROTOCOL_CONTROL &&
|
||
message.Type == SET_CHUNK_SIZE &&
|
||
conn.outChunkSizeTemp != 0 {
|
||
// Set chunk size
|
||
conn.outChunkSize = conn.outChunkSizeTemp
|
||
conn.outChunkSizeTemp = 0
|
||
}
|
||
}
|
||
|
||
func (conn *conn) checkAndSendHighPriorityMessage() {
|
||
for len(conn.highPriorityMessageQueue) > 0 {
|
||
message := <-conn.highPriorityMessageQueue
|
||
conn.sendMessage(message)
|
||
}
|
||
}
|
||
|
||
// send loop
|
||
func (conn *conn) sendLoop() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
if conn.err == nil {
|
||
conn.err = r.(error)
|
||
}
|
||
}
|
||
conn.Close()
|
||
}()
|
||
for !conn.closed {
|
||
select {
|
||
case message := <-conn.highPriorityMessageQueue:
|
||
// Send all high priority messages
|
||
conn.sendMessage(message)
|
||
case message := <-conn.middlePriorityMessageQueue:
|
||
// Send one middle priority messages
|
||
conn.sendMessage(message)
|
||
conn.checkAndSendHighPriorityMessage()
|
||
case message := <-conn.lowPriorityMessageQueue:
|
||
// Check high priority message queue first
|
||
conn.checkAndSendHighPriorityMessage()
|
||
conn.sendMessage(message)
|
||
case <-time.After(time.Second):
|
||
// Check close
|
||
}
|
||
}
|
||
}
|
||
|
||
// read loop
|
||
func (conn *conn) readLoop() {
|
||
|
||
defer func() {
|
||
|
||
if r := recover(); r != nil {
|
||
if conn.err == nil {
|
||
conn.err = r.(error)
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_WARNING,
|
||
"readLoop panic:", conn.err)
|
||
}
|
||
}
|
||
conn.Close()
|
||
conn.handler.OnClosed(conn)
|
||
}()
|
||
|
||
var found bool
|
||
var chunkstream *InboundChunkStream
|
||
var remain uint32
|
||
for !conn.closed {
|
||
// Read base header
|
||
n, vfmt, csi, err := ReadBaseHeader(conn.br)
|
||
CheckError(err, "ReadBaseHeader")
|
||
conn.inBytes += uint32(n)
|
||
// Get chunk stream
|
||
chunkstream, found = conn.inChunkStreams[csi]
|
||
if !found || chunkstream == nil {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE, "New stream 1 csi: %d, fmt: %d\n", csi, vfmt)
|
||
chunkstream = NewInboundChunkStream(csi)
|
||
conn.inChunkStreams[csi] = chunkstream
|
||
}
|
||
// Read header
|
||
header := &Header{}
|
||
n, err = header.ReadHeader(conn.br, vfmt, csi, chunkstream.lastHeader)
|
||
CheckError(err, "ReadHeader")
|
||
if !found {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE, "New stream 2 csi: %d, fmt: %d, header: %+v\n", csi, vfmt, header)
|
||
}
|
||
conn.inBytes += uint32(n)
|
||
var absoluteTimestamp uint32
|
||
var message *Message
|
||
switch vfmt {
|
||
case HEADER_FMT_FULL:
|
||
chunkstream.lastHeader = header
|
||
absoluteTimestamp = header.Timestamp
|
||
case HEADER_FMT_SAME_STREAM:
|
||
// A new message with same stream ID
|
||
if chunkstream.lastHeader == nil {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_WARNING,
|
||
"A new message with fmt: %d, csi: %d\n", vfmt, csi)
|
||
header.Dump("err")
|
||
} else {
|
||
header.MessageStreamID = chunkstream.lastHeader.MessageStreamID
|
||
}
|
||
chunkstream.lastHeader = header
|
||
absoluteTimestamp = chunkstream.lastInAbsoluteTimestamp + header.Timestamp
|
||
case HEADER_FMT_SAME_LENGTH_AND_STREAM:
|
||
// A new message with same stream ID, message length and message type
|
||
if chunkstream.lastHeader == nil {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_WARNING,
|
||
"A new message with fmt: %d, csi: %d\n", vfmt, csi)
|
||
header.Dump("err")
|
||
}
|
||
header.MessageStreamID = chunkstream.lastHeader.MessageStreamID
|
||
header.MessageLength = chunkstream.lastHeader.MessageLength
|
||
header.MessageTypeID = chunkstream.lastHeader.MessageTypeID
|
||
chunkstream.lastHeader = header
|
||
absoluteTimestamp = chunkstream.lastInAbsoluteTimestamp + header.Timestamp
|
||
case HEADER_FMT_CONTINUATION:
|
||
if chunkstream.receivedMessage != nil {
|
||
// Continuation the previous unfinished message
|
||
message = chunkstream.receivedMessage
|
||
}
|
||
if chunkstream.lastHeader == nil {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_WARNING,
|
||
"A new message with fmt: %d, csi: %d\n", vfmt, csi)
|
||
header.Dump("err")
|
||
} else {
|
||
header.MessageStreamID = chunkstream.lastHeader.MessageStreamID
|
||
header.MessageLength = chunkstream.lastHeader.MessageLength
|
||
header.MessageTypeID = chunkstream.lastHeader.MessageTypeID
|
||
header.Timestamp = chunkstream.lastHeader.Timestamp
|
||
}
|
||
chunkstream.lastHeader = header
|
||
absoluteTimestamp = chunkstream.lastInAbsoluteTimestamp
|
||
}
|
||
if message == nil {
|
||
// New message
|
||
message = &Message{
|
||
ChunkStreamID: csi,
|
||
Type: header.MessageTypeID,
|
||
Timestamp: header.RealTimestamp(),
|
||
Size: header.MessageLength,
|
||
StreamID: header.MessageStreamID,
|
||
Buf: new(bytes.Buffer),
|
||
IsInbound: true,
|
||
AbsoluteTimestamp: absoluteTimestamp,
|
||
}
|
||
}
|
||
chunkstream.lastInAbsoluteTimestamp = absoluteTimestamp
|
||
// Read data
|
||
remain = message.Remain()
|
||
var n64 int64
|
||
if remain <= conn.inChunkSize {
|
||
// One chunk message
|
||
for {
|
||
// n64, err = CopyNFromNetwork(message.Buf, conn.br, int64(remain))
|
||
n64, err = io.CopyN(message.Buf, conn.br, int64(remain))
|
||
if err == nil {
|
||
conn.inBytes += uint32(n64)
|
||
if remain <= uint32(n64) {
|
||
break
|
||
} else {
|
||
remain -= uint32(n64)
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"Message continue copy remain: %d\n", remain)
|
||
continue
|
||
}
|
||
}
|
||
netErr, ok := err.(net.Error)
|
||
if !ok || !netErr.Temporary() {
|
||
CheckError(err, "Read data 1")
|
||
}
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"Message copy blocked!\n")
|
||
}
|
||
// Finished message
|
||
conn.received(message)
|
||
chunkstream.receivedMessage = nil
|
||
} else {
|
||
// Unfinish
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_DEBUG,
|
||
"Unfinish message(remain: %d, chunksize: %d)\n", remain, conn.inChunkSize)
|
||
|
||
remain = conn.inChunkSize
|
||
for {
|
||
// n64, err = CopyNFromNetwork(message.Buf, conn.br, int64(remain))
|
||
n64, err = io.CopyN(message.Buf, conn.br, int64(remain))
|
||
if err == nil {
|
||
conn.inBytes += uint32(n64)
|
||
if remain <= uint32(n64) {
|
||
break
|
||
} else {
|
||
remain -= uint32(n64)
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"Unfinish message continue copy remain: %d\n", remain)
|
||
continue
|
||
}
|
||
break
|
||
}
|
||
netErr, ok := err.(net.Error)
|
||
if !ok || !netErr.Temporary() {
|
||
CheckError(err, "Read data 2")
|
||
}
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"Unfinish message copy blocked!\n")
|
||
}
|
||
chunkstream.receivedMessage = message
|
||
}
|
||
|
||
// Check window
|
||
if conn.inBytes > (conn.inBytesPreWindow + conn.inWindowSize) {
|
||
// Send window acknowledgement
|
||
ackmessage := NewMessage(CS_ID_PROTOCOL_CONTROL, ACKNOWLEDGEMENT, 0, absoluteTimestamp+1, nil)
|
||
err = binary.Write(ackmessage.Buf, binary.BigEndian, conn.inBytes)
|
||
CheckError(err, "ACK Message write data")
|
||
conn.inBytesPreWindow = conn.inBytes
|
||
conn.Send(ackmessage)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (conn *conn) error(err error, desc string) {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"Conn %s err: %s\n", desc, err.Error())
|
||
if conn.err == nil {
|
||
conn.err = err
|
||
}
|
||
conn.Close()
|
||
}
|
||
|
||
func (conn *conn) Close() {
|
||
// panic(errors.New("Closed"))
|
||
|
||
conn.closed = true
|
||
conn.c.Close()
|
||
}
|
||
|
||
// Send a message by channel
|
||
func (conn *conn) Send(message *Message) error {
|
||
csiType := (message.ChunkStreamID % 6)
|
||
if csiType == CS_ID_PROTOCOL_CONTROL || csiType == CS_ID_COMMAND {
|
||
// High priority
|
||
conn.highPriorityMessageQueue <- message
|
||
return nil
|
||
}
|
||
if message.Type == VIDEO_TYPE {
|
||
// Low priority
|
||
conn.lowPriorityMessageQueue <- message
|
||
return nil
|
||
}
|
||
conn.middlePriorityMessageQueue <- message
|
||
return nil
|
||
}
|
||
|
||
func (conn *conn) CreateChunkStream(id uint32) (*OutboundChunkStream, error) {
|
||
chunkStream, found := conn.outChunkStreams[id]
|
||
if found {
|
||
return nil, errors.New("Chunk stream existed")
|
||
}
|
||
chunkStream = NewOutboundChunkStream(id)
|
||
conn.outChunkStreams[id] = chunkStream
|
||
return chunkStream, nil
|
||
}
|
||
|
||
func (conn *conn) CloseChunkStream(id uint32) {
|
||
delete(conn.outChunkStreams, id)
|
||
}
|
||
|
||
func (conn *conn) CreateMediaChunkStream() (*OutboundChunkStream, error) {
|
||
var newChunkStreamID uint32
|
||
conn.mediaChunkStreamIDAllocatorLocker.Lock()
|
||
for index, occupited := range conn.mediaChunkStreamIDAllocator {
|
||
if !occupited {
|
||
newChunkStreamID = uint32((index+1)*6 + 2)
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_DEBUG,
|
||
"index: %d, newChunkStreamID: %d\n", index, newChunkStreamID)
|
||
// since allocate a newChunkStreamID, why not set the cocupited to true
|
||
conn.mediaChunkStreamIDAllocator[index] = true
|
||
break
|
||
}
|
||
}
|
||
conn.mediaChunkStreamIDAllocatorLocker.Unlock()
|
||
if newChunkStreamID == 0 {
|
||
return nil, errors.New("No more chunk stream ID to allocate")
|
||
}
|
||
chunkSteam, err := conn.CreateChunkStream(newChunkStreamID)
|
||
if err != nil {
|
||
conn.CloseMediaChunkStream(newChunkStreamID)
|
||
return nil, err
|
||
}
|
||
return chunkSteam, nil
|
||
}
|
||
|
||
func (conn *conn) OutboundChunkStream(id uint32) (chunkStream *OutboundChunkStream, found bool) {
|
||
chunkStream, found = conn.outChunkStreams[id]
|
||
return
|
||
}
|
||
|
||
func (conn *conn) InboundChunkStream(id uint32) (chunkStream *InboundChunkStream, found bool) {
|
||
chunkStream, found = conn.inChunkStreams[id]
|
||
return
|
||
}
|
||
|
||
func (conn *conn) CloseMediaChunkStream(id uint32) {
|
||
// and the id is not the index of Allocator slice
|
||
index := (id - 2) / 6 - 1
|
||
conn.mediaChunkStreamIDAllocatorLocker.Lock()
|
||
conn.mediaChunkStreamIDAllocator[index] = false
|
||
conn.mediaChunkStreamIDAllocatorLocker.Unlock()
|
||
conn.CloseChunkStream(id)
|
||
}
|
||
|
||
func (conn *conn) NewTransactionID() uint32 {
|
||
return atomic.AddUint32(&conn.lastTransactionID, 1)
|
||
}
|
||
|
||
func (conn *conn) received(message *Message) {
|
||
message.Dump("<<<")
|
||
tmpBuf := make([]byte, 4)
|
||
var err error
|
||
var subType byte
|
||
var dataSize uint32
|
||
var timestamp uint32
|
||
var timestampExt byte
|
||
if message.Type == AGGREGATE_MESSAGE_TYPE {
|
||
// Byte stream order
|
||
// Sub message type 1 byte
|
||
// Data size 3 bytes, big endian
|
||
// Timestamp 3 bytes
|
||
// Timestamp extend 1 byte, result = (result >>> 8) | ((result & 0x000000ff) << 24);
|
||
// 3 bytes ignored
|
||
// Data
|
||
// Previous tag size 4 bytes
|
||
var firstAggregateTimestamp uint32
|
||
for message.Buf.Len() > 0 {
|
||
// Sub type
|
||
subType, err = message.Buf.ReadByte()
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::received() AGGREGATE_MESSAGE_TYPE read sub type err:", err)
|
||
return
|
||
}
|
||
|
||
// data size
|
||
_, err = io.ReadAtLeast(message.Buf, tmpBuf[1:], 3)
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::received() AGGREGATE_MESSAGE_TYPE read data size err:", err)
|
||
return
|
||
}
|
||
dataSize = binary.BigEndian.Uint32(tmpBuf)
|
||
|
||
// Timestamp
|
||
_, err = io.ReadAtLeast(message.Buf, tmpBuf[1:], 3)
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::received() AGGREGATE_MESSAGE_TYPE read timestamp err:", err)
|
||
return
|
||
}
|
||
timestamp = binary.BigEndian.Uint32(tmpBuf)
|
||
|
||
// Timestamp extend
|
||
timestampExt, err = message.Buf.ReadByte()
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::received() AGGREGATE_MESSAGE_TYPE read timestamp extend err:", err)
|
||
return
|
||
}
|
||
timestamp |= (uint32(timestampExt) << 24)
|
||
if firstAggregateTimestamp == 0 {
|
||
firstAggregateTimestamp = timestamp
|
||
}
|
||
|
||
// Ignore 3 bytes
|
||
_, err = io.ReadAtLeast(message.Buf, tmpBuf[1:], 3)
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::received() AGGREGATE_MESSAGE_TYPE read ignore bytes err:", err)
|
||
return
|
||
}
|
||
|
||
subMessage := NewMessage(message.ChunkStreamID, subType, message.StreamID, 0, nil)
|
||
subMessage.Timestamp = 0
|
||
subMessage.IsInbound = true
|
||
subMessage.Size = dataSize
|
||
subMessage.AbsoluteTimestamp = message.AbsoluteTimestamp
|
||
// Data
|
||
_, err = io.CopyN(subMessage.Buf, message.Buf, int64(dataSize))
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::received() AGGREGATE_MESSAGE_TYPE copy data err:", err)
|
||
return
|
||
}
|
||
|
||
// Recursion
|
||
conn.received(subMessage)
|
||
|
||
// Previous tag size
|
||
if message.Buf.Len() >= 4 {
|
||
_, err = io.ReadAtLeast(message.Buf, tmpBuf, 4)
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::received() AGGREGATE_MESSAGE_TYPE read previous tag size err:", err)
|
||
return
|
||
}
|
||
tmpBuf[0] = 0
|
||
} else {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::received() AGGREGATE_MESSAGE_TYPE miss previous tag size")
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
switch message.ChunkStreamID {
|
||
case CS_ID_PROTOCOL_CONTROL:
|
||
switch message.Type {
|
||
case SET_CHUNK_SIZE:
|
||
conn.invokeSetChunkSize(message)
|
||
case ABORT_MESSAGE:
|
||
conn.invokeAbortMessage(message)
|
||
case ACKNOWLEDGEMENT:
|
||
conn.invokeAcknowledgement(message)
|
||
case USER_CONTROL_MESSAGE:
|
||
conn.invokeUserControlMessage(message)
|
||
case WINDOW_ACKNOWLEDGEMENT_SIZE:
|
||
conn.invokeWindowAcknowledgementSize(message)
|
||
case SET_PEER_BANDWIDTH:
|
||
conn.invokeSetPeerBandwidth(message)
|
||
default:
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"Unkown message type %d in Protocol control chunk stream!\n", message.Type)
|
||
}
|
||
case CS_ID_COMMAND:
|
||
if message.StreamID == 0 {
|
||
cmd := &Command{}
|
||
var err error
|
||
var transactionID float64
|
||
var object interface{}
|
||
switch message.Type {
|
||
case COMMAND_AMF3:
|
||
cmd.IsFlex = true
|
||
_, err = message.Buf.ReadByte()
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"Read first in flex commad err:", err)
|
||
return
|
||
}
|
||
fallthrough
|
||
case COMMAND_AMF0:
|
||
cmd.Name, err = amf.ReadString(message.Buf)
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"AMF0 Read name err:", err)
|
||
return
|
||
}
|
||
transactionID, err = amf.ReadDouble(message.Buf)
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"AMF0 Read transactionID err:", err)
|
||
return
|
||
}
|
||
cmd.TransactionID = uint32(transactionID)
|
||
for message.Buf.Len() > 0 {
|
||
object, err = amf.ReadValue(message.Buf)
|
||
if err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"AMF0 Read object err:", err)
|
||
return
|
||
}
|
||
cmd.Objects = append(cmd.Objects, object)
|
||
}
|
||
default:
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"Unkown message type %d in Command chunk stream!\n", message.Type)
|
||
}
|
||
conn.invokeCommand(cmd)
|
||
} else {
|
||
conn.handler.OnReceived(conn, message)
|
||
}
|
||
default:
|
||
conn.handler.OnReceived(conn, message)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (conn *conn) invokeSetChunkSize(message *Message) {
|
||
if err := binary.Read(message.Buf, binary.BigEndian, &conn.inChunkSize); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::invokeSetChunkSize err:", err)
|
||
}
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::invokeSetChunkSize() conn.inChunkSize = %d\n", conn.inChunkSize)
|
||
}
|
||
|
||
func (conn *conn) invokeAbortMessage(message *Message) {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::invokeAbortMessage()")
|
||
}
|
||
|
||
func (conn *conn) invokeAcknowledgement(message *Message) {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::invokeAcknowledgement(): % 2x\n", message.Buf.Bytes())
|
||
}
|
||
|
||
// User Control Message
|
||
//
|
||
// The client or the server sends this message to notify the peer about
|
||
// the user control events. This message carries Event type and Event
|
||
// data.
|
||
// +------------------------------+-------------------------
|
||
// | Event Type ( 2- bytes ) | Event Data
|
||
// +------------------------------+-------------------------
|
||
// Figure 5 Pay load for the ‘User Control Message’.
|
||
//
|
||
//
|
||
// The first 2 bytes of the message data are used to identify the Event
|
||
// type. Event type is followed by Event data. Size of Event data field
|
||
// is variable.
|
||
//
|
||
//
|
||
// The client or the server sends this message to notify the peer about
|
||
// the user control events. For information about the message format,
|
||
// refer to the User Control Messages section in the RTMP Message
|
||
// Foramts draft.
|
||
//
|
||
// The following user control event types are supported:
|
||
// +---------------+--------------------------------------------------+
|
||
// | Event | Description |
|
||
// +---------------+--------------------------------------------------+
|
||
// |Stream Begin | The server sends this event to notify the client |
|
||
// | (=0) | that a stream has become functional and can be |
|
||
// | | used for communication. By default, this event |
|
||
// | | is sent on ID 0 after the application connect |
|
||
// | | command is successfully received from the |
|
||
// | | client. The event data is 4-byte and represents |
|
||
// | | the stream ID of the stream that became |
|
||
// | | functional. |
|
||
// +---------------+--------------------------------------------------+
|
||
// | Stream EOF | The server sends this event to notify the client |
|
||
// | (=1) | that the playback of data is over as requested |
|
||
// | | on this stream. No more data is sent without |
|
||
// | | issuing additional commands. The client discards |
|
||
// | | the messages received for the stream. The |
|
||
// | | 4 bytes of event data represent the ID of the |
|
||
// | | stream on which playback has ended. |
|
||
// +---------------+--------------------------------------------------+
|
||
// | StreamDry | The server sends this event to notify the client |
|
||
// | (=2) | that there is no more data on the stream. If the |
|
||
// | | server does not detect any message for a time |
|
||
// | | period, it can notify the subscribed clients |
|
||
// | | that the stream is dry. The 4 bytes of event |
|
||
// | | data represent the stream ID of the dry stream. |
|
||
// +---------------+--------------------------------------------------+
|
||
// | SetBuffer | The client sends this event to inform the server |
|
||
// | Length (=3) | of the buffer size (in milliseconds) that is |
|
||
// | | used to buffer any data coming over a stream. |
|
||
// | | This event is sent before the server starts |
|
||
// | | processing the stream. The first 4 bytes of the |
|
||
// | | event data represent the stream ID and the next |
|
||
// | | 4 bytes represent the buffer length, in |
|
||
// | | milliseconds. |
|
||
// +---------------+--------------------------------------------------+
|
||
// | StreamIs | The server sends this event to notify the client |
|
||
// | Recorded (=4) | that the stream is a recorded stream. The |
|
||
// | | 4 bytes event data represent the stream ID of |
|
||
// | | the recorded stream. |
|
||
// +---------------+--------------------------------------------------+
|
||
// | PingRequest | The server sends this event to test whether the |
|
||
// | (=6) | client is reachable. Event data is a 4-byte |
|
||
// | | timestamp, representing the local server time |
|
||
// | | when the server dispatched the command. The |
|
||
// | | client responds with kMsgPingResponse on |
|
||
// | | receiving kMsgPingRequest. |
|
||
// +---------------+--------------------------------------------------+
|
||
// | PingResponse | The client sends this event to the server in |
|
||
// | (=7) | response to the ping request. The event data is |
|
||
// | | a 4-byte timestamp, which was received with the |
|
||
// | | kMsgPingRequest request. |
|
||
// +---------------+--------------------------------------------------+
|
||
func (conn *conn) invokeUserControlMessage(message *Message) {
|
||
var eventType uint16
|
||
err := binary.Read(message.Buf, binary.BigEndian, &eventType)
|
||
if err != nil {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::invokeUserControlMessage() read event type err: %s\n", err.Error())
|
||
return
|
||
}
|
||
switch eventType {
|
||
case EVENT_STREAM_BEGIN:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_STREAM_BEGIN")
|
||
case EVENT_STREAM_EOF:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_STREAM_EOF")
|
||
case EVENT_STREAM_DRY:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_STREAM_DRY")
|
||
case EVENT_SET_BUFFER_LENGTH:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_SET_BUFFER_LENGTH")
|
||
case EVENT_STREAM_IS_RECORDED:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_STREAM_IS_RECORDED")
|
||
case EVENT_PING_REQUEST:
|
||
// Respond ping
|
||
// Get server timestamp
|
||
var serverTimestamp uint32
|
||
err = binary.Read(message.Buf, binary.BigEndian, &serverTimestamp)
|
||
if err != nil {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::invokeUserControlMessage() read serverTimestamp err: %s\n", err.Error())
|
||
return
|
||
}
|
||
respmessage := NewMessage(CS_ID_PROTOCOL_CONTROL, USER_CONTROL_MESSAGE, 0, message.Timestamp+1, nil)
|
||
respEventType := uint16(EVENT_PING_RESPONSE)
|
||
if err = binary.Write(respmessage.Buf, binary.BigEndian, &respEventType); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::invokeUserControlMessage() write event type err:", err)
|
||
return
|
||
}
|
||
if err = binary.Write(respmessage.Buf, binary.BigEndian, &serverTimestamp); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::invokeUserControlMessage() write streamId err:", err)
|
||
return
|
||
}
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() Ping response")
|
||
conn.Send(respmessage)
|
||
case EVENT_PING_RESPONSE:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_PING_RESPONSE")
|
||
case EVENT_REQUEST_VERIFY:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_REQUEST_VERIFY")
|
||
case EVENT_RESPOND_VERIFY:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_RESPOND_VERIFY")
|
||
case EVENT_BUFFER_EMPTY:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_BUFFER_EMPTY")
|
||
case EVENT_BUFFER_READY:
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() EVENT_BUFFER_READY")
|
||
default:
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE, "conn::invokeUserControlMessage() Unknown user control message :0x%x\n", eventType)
|
||
}
|
||
}
|
||
|
||
func (conn *conn) invokeWindowAcknowledgementSize(message *Message) {
|
||
var size uint32
|
||
var err error
|
||
if err = binary.Read(message.Buf, binary.BigEndian, &size); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::invokeWindowAcknowledgementSize read window size err:", err)
|
||
return
|
||
}
|
||
conn.inWindowSize = size
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::invokeWindowAcknowledgementSize() conn.inWindowSize = %d\n", conn.inWindowSize)
|
||
}
|
||
|
||
func (conn *conn) invokeSetPeerBandwidth(message *Message) {
|
||
var err error
|
||
var size uint32
|
||
if err = binary.Read(message.Buf, binary.BigEndian, &conn.inBandwidth); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::invokeSetPeerBandwidth read window size err:", err)
|
||
return
|
||
}
|
||
conn.inBandwidth = size
|
||
var limit byte
|
||
if limit, err = message.Buf.ReadByte(); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::invokeSetPeerBandwidth read limit err:", err)
|
||
return
|
||
}
|
||
conn.inBandwidthLimit = uint8(limit)
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn.inBandwidthLimit = %d/n", conn.inBandwidthLimit)
|
||
}
|
||
|
||
func (conn *conn) invokeCommand(cmd *Command) {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::invokeCommand()")
|
||
conn.handler.OnReceivedRtmpCommand(conn, cmd)
|
||
}
|
||
|
||
func (conn *conn) SetStreamBufferSize(streamId uint32, size uint32) {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::SetStreamBufferSize(streamId: %d, size: %d)\n", streamId, size)
|
||
message := NewMessage(CS_ID_PROTOCOL_CONTROL, USER_CONTROL_MESSAGE, 0, 1, nil)
|
||
eventType := uint16(EVENT_SET_BUFFER_LENGTH)
|
||
if err := binary.Write(message.Buf, binary.BigEndian, &eventType); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::SetStreamBufferSize write event type err:", err)
|
||
return
|
||
}
|
||
if err := binary.Write(message.Buf, binary.BigEndian, &streamId); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::SetStreamBufferSize write streamId err:", err)
|
||
return
|
||
}
|
||
if err := binary.Write(message.Buf, binary.BigEndian, &size); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::SetStreamBufferSize write size err:", err)
|
||
return
|
||
}
|
||
conn.Send(message)
|
||
}
|
||
|
||
func (conn *conn) SetChunkSize(size uint32) {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::SetChunkSize(size: %d)\n", size)
|
||
message := NewMessage(CS_ID_PROTOCOL_CONTROL, SET_CHUNK_SIZE, 0, 0, nil)
|
||
if err := binary.Write(message.Buf, binary.BigEndian, &size); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::SetChunkSize write event type err:", err)
|
||
return
|
||
}
|
||
conn.outChunkSizeTemp = size
|
||
conn.Send(message)
|
||
}
|
||
|
||
func (conn *conn) SetWindowAcknowledgementSize() {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::SetWindowAcknowledgementSize")
|
||
// Request window acknowledgement size
|
||
message := NewMessage(CS_ID_PROTOCOL_CONTROL, WINDOW_ACKNOWLEDGEMENT_SIZE, 0, 0, nil)
|
||
if err := binary.Write(message.Buf, binary.BigEndian, &conn.outWindowSize); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::SetWindowAcknowledgementSize write window size err:", err)
|
||
return
|
||
}
|
||
message.Size = uint32(message.Buf.Len())
|
||
conn.Send(message)
|
||
}
|
||
func (conn *conn) SetPeerBandwidth(peerBandwidth uint32, limitType byte) {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::SetPeerBandwidth")
|
||
// Request window acknowledgement size
|
||
message := NewMessage(CS_ID_PROTOCOL_CONTROL, SET_PEER_BANDWIDTH, 0, 0, nil)
|
||
if err := binary.Write(message.Buf, binary.BigEndian, &peerBandwidth); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::SetPeerBandwidth write peerBandwidth err:", err)
|
||
return
|
||
}
|
||
if err := message.Buf.WriteByte(limitType); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::SetPeerBandwidth write limitType err:", err)
|
||
return
|
||
}
|
||
message.Size = uint32(message.Buf.Len())
|
||
conn.Send(message)
|
||
}
|
||
|
||
func (conn *conn) SendUserControlMessage(eventId uint16) {
|
||
logger.ModulePrintf(logHandler, log.LOG_LEVEL_TRACE,
|
||
"conn::SendUserControlMessage")
|
||
message := NewMessage(CS_ID_PROTOCOL_CONTROL, USER_CONTROL_MESSAGE, 0, 0, nil)
|
||
if err := binary.Write(message.Buf, binary.BigEndian, &eventId); err != nil {
|
||
logger.ModulePrintln(logHandler, log.LOG_LEVEL_WARNING,
|
||
"conn::SendUserControlMessage write event type err:", err)
|
||
return
|
||
}
|
||
conn.Send(message)
|
||
}
|