mirror of
https://github.com/Monibuca/plugin-rtmp.git
synced 2025-10-05 15:37:11 +08:00
first commit
This commit is contained in:
15
README.md
15
README.md
@@ -1,2 +1,17 @@
|
||||
# rtmpplugin
|
||||
the rtmp protocol plugin for monibuca
|
||||
|
||||
实现了RTMP Server的基本功能,即接收来自OBS、ffmpeg等推流器的rtmp协议推流。
|
||||
实现了RTMP协议的播放,可供rtmp协议播放器拉流播放。
|
||||
|
||||
## 插件名称
|
||||
|
||||
RTMP
|
||||
|
||||
## 配置
|
||||
目前仅有的配置是监听的端口号
|
||||
|
||||
```toml
|
||||
[Plugins.RTMP]
|
||||
ListenAddr = ":1935"
|
||||
```
|
432
amf.go
Normal file
432
amf.go
Normal file
@@ -0,0 +1,432 @@
|
||||
package rtmpplugin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/Monibuca/engine/pool"
|
||||
"github.com/Monibuca/engine/util"
|
||||
"log"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// Action Message Format -- AMF 0
|
||||
// Action Message Format -- AMF 3
|
||||
// http://download.macromedia.com/pub/labs/amf/amf0_spec_121207.pdf
|
||||
// http://wwwimages.adobe.com/www.adobe.com/content/dam/Adobe/en/devnet/amf/pdf/amf-file-format-spec.pdf
|
||||
|
||||
// AMF Object == AMF Object Type(1 byte) + AMF Object Value
|
||||
//
|
||||
// AMF Object Value :
|
||||
// AMF0_STRING : 2 bytes(datasize,记录string的长度) + data(string)
|
||||
// AMF0_OBJECT : AMF0_STRING + AMF Object
|
||||
// AMF0_NULL : 0 byte
|
||||
// AMF0_NUMBER : 8 bytes
|
||||
// AMF0_DATE : 10 bytes
|
||||
// AMF0_BOOLEAN : 1 byte
|
||||
// AMF0_ECMA_ARRAY : 4 bytes(arraysize,记录数组的长度) + AMF0_OBJECT
|
||||
// AMF0_STRICT_ARRAY : 4 bytes(arraysize,记录数组的长度) + AMF Object
|
||||
|
||||
// 实际测试时,AMF0_ECMA_ARRAY数据如下:
|
||||
// 8 0 0 0 13 0 8 100 117 114 97 116 105 111 110 0 0 0 0 0 0 0 0 0 0 5 119 105 100 116 104 0 64 158 0 0 0 0 0 0 0 6 104 101 105 103 104 116 0 64 144 224 0 0 0 0 0
|
||||
// 8 0 0 0 13 | { 0 8 100 117 114 97 116 105 111 110 --- 0 0 0 0 0 0 0 0 0 } | { 0 5 119 105 100 116 104 --- 0 64 158 0 0 0 0 0 0 } | { 0 6 104 101 105 103 104 116 --- 0 64 144 224 0 0 0 0 0 } |...
|
||||
// 13 | {AMF0_STRING --- AMF0_NUMBER} | {AMF0_STRING --- AMF0_NUMBER} | {AMF0_STRING --- AMF0_NUMBER} | ...
|
||||
// 13 | {AMF0_OBJECT} | {AMF0_OBJECT} | {AMF0_OBJECT} | ...
|
||||
// 13 | {duration --- 0} | {width --- 1920} | {height --- 1080} | ...
|
||||
|
||||
const (
|
||||
AMF0_NUMBER = 0x00 // 浮点数
|
||||
AMF0_BOOLEAN = 0x01 // 布尔型
|
||||
AMF0_STRING = 0x02 // 字符串
|
||||
AMF0_OBJECT = 0x03 // 对象,开始
|
||||
AMF0_MOVIECLIP = 0x04
|
||||
AMF0_NULL = 0x05 // null
|
||||
AMF0_UNDEFINED = 0x06
|
||||
AMF0_REFERENCE = 0x07
|
||||
AMF0_ECMA_ARRAY = 0x08
|
||||
AMF0_END_OBJECT = 0x09 // 对象,结束
|
||||
AMF0_STRICT_ARRAY = 0x0A
|
||||
AMF0_DATE = 0x0B // 日期
|
||||
AMF0_LONG_STRING = 0x0C // 字符串
|
||||
AMF0_UNSUPPORTED = 0x0D
|
||||
AMF0_RECORDSET = 0x0E
|
||||
AMF0_XML_DOCUMENT = 0x0F
|
||||
AMF0_TYPED_OBJECT = 0x10
|
||||
AMF0_AVMPLUS_OBJECT = 0x11
|
||||
|
||||
AMF3_UNDEFINED = 0x00
|
||||
AMF3_NULL = 0x01
|
||||
AMF3_FALSE = 0x02
|
||||
AMF3_TRUE = 0x03
|
||||
AMF3_INTEGER = 0x04
|
||||
AMF3_DOUBLE = 0x05
|
||||
AMF3_STRING = 0x06
|
||||
AMF3_XML_DOC = 0x07
|
||||
AMF3_DATE = 0x08
|
||||
AMF3_ARRAY = 0x09
|
||||
AMF3_OBJECT = 0x0A
|
||||
AMF3_XML = 0x0B
|
||||
AMF3_BYTE_ARRAY = 0x0C
|
||||
AMF3_VECTOR_INT = 0x0D
|
||||
AMF3_VECTOR_UINT = 0x0E
|
||||
AMF3_VECTOR_DOUBLE = 0x0F
|
||||
AMF3_VECTOR_OBJECT = 0x10
|
||||
AMF3_DICTIONARY = 0x11
|
||||
)
|
||||
|
||||
var END_OBJ = []byte{0, 0, AMF0_END_OBJECT}
|
||||
|
||||
type AMFObject interface{}
|
||||
|
||||
type AMFObjects map[string]AMFObject
|
||||
|
||||
func newAMFObjects() AMFObjects {
|
||||
return make(AMFObjects, 0)
|
||||
}
|
||||
|
||||
func DecodeAMFObject(obj interface{}, key string) interface{} {
|
||||
if v, ok := obj.(AMFObjects)[key]; ok {
|
||||
return v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type AMF struct {
|
||||
*bytes.Buffer
|
||||
}
|
||||
|
||||
func newAMFEncoder() *AMF {
|
||||
return &AMF{
|
||||
new(bytes.Buffer),
|
||||
}
|
||||
}
|
||||
|
||||
func newAMFDecoder(b []byte) *AMF {
|
||||
return &AMF{
|
||||
bytes.NewBuffer(b),
|
||||
}
|
||||
}
|
||||
func (amf *AMF) readSize() (int, error) {
|
||||
b, err := readBytes(amf.Buffer, 4)
|
||||
size := int(util.BigEndian.Uint32(b))
|
||||
pool.RecycleSlice(b)
|
||||
return size, err
|
||||
}
|
||||
func (amf *AMF) readSize16() (int, error) {
|
||||
b, err := readBytes(amf.Buffer, 2)
|
||||
size := int(util.BigEndian.Uint16(b))
|
||||
pool.RecycleSlice(b)
|
||||
return size, err
|
||||
}
|
||||
func (amf *AMF) readObjects() (obj []AMFObject, err error) {
|
||||
obj = make([]AMFObject, 0)
|
||||
|
||||
for amf.Len() > 0 {
|
||||
if v, err := amf.decodeObject(); err == nil {
|
||||
obj = append(obj, v)
|
||||
} else {
|
||||
return obj, err
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) writeObjects(obj []AMFObject) (err error) {
|
||||
for _, v := range obj {
|
||||
switch data := v.(type) {
|
||||
case string:
|
||||
err = amf.writeString(data)
|
||||
case float64:
|
||||
err = amf.writeNumber(data)
|
||||
case bool:
|
||||
err = amf.writeBool(data)
|
||||
case AMFObjects:
|
||||
err = amf.encodeObject(data)
|
||||
case nil:
|
||||
err = amf.writeNull()
|
||||
default:
|
||||
log.Printf("amf encode unknown type:%v", reflect.TypeOf(data).Name())
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) decodeObject() (obj AMFObject, err error) {
|
||||
if amf.Len() == 0 {
|
||||
return nil, errors.New(fmt.Sprintf("no enough bytes, %v/%v", amf.Len(), 1))
|
||||
}
|
||||
var t byte
|
||||
if t, err = amf.ReadByte(); err != nil {
|
||||
return
|
||||
}
|
||||
if err = amf.UnreadByte(); err != nil {
|
||||
return
|
||||
}
|
||||
switch t {
|
||||
case AMF0_NUMBER:
|
||||
return amf.readNumber()
|
||||
case AMF0_BOOLEAN:
|
||||
return amf.readBool()
|
||||
case AMF0_STRING:
|
||||
return amf.readString()
|
||||
case AMF0_OBJECT:
|
||||
return amf.readObject()
|
||||
case AMF0_MOVIECLIP:
|
||||
log.Println("This type is not supported and is reserved for future use.(AMF0_MOVIECLIP)")
|
||||
case AMF0_NULL:
|
||||
return amf.readNull()
|
||||
case AMF0_UNDEFINED:
|
||||
_, err = amf.ReadByte()
|
||||
return "Undefined", err
|
||||
case AMF0_REFERENCE:
|
||||
log.Println("reference-type.(AMF0_REFERENCE)")
|
||||
case AMF0_ECMA_ARRAY:
|
||||
return amf.readECMAArray()
|
||||
case AMF0_END_OBJECT:
|
||||
_, err = amf.ReadByte()
|
||||
return "ObjectEnd", err
|
||||
case AMF0_STRICT_ARRAY:
|
||||
return amf.readStrictArray()
|
||||
case AMF0_DATE:
|
||||
return amf.readDate()
|
||||
case AMF0_LONG_STRING:
|
||||
return amf.readLongString()
|
||||
case AMF0_UNSUPPORTED:
|
||||
log.Println("If a type cannot be serialized a special unsupported marker can be used in place of the type.(AMF0_UNSUPPORTED)")
|
||||
case AMF0_RECORDSET:
|
||||
log.Println("This type is not supported and is reserved for future use.(AMF0_RECORDSET)")
|
||||
case AMF0_XML_DOCUMENT:
|
||||
return amf.readLongString()
|
||||
case AMF0_TYPED_OBJECT:
|
||||
log.Println("If a strongly typed object has an alias registered for its class then the type name will also be serialized. Typed objects are considered complex types and reoccurring instances can be sent by reference.(AMF0_TYPED_OBJECT)")
|
||||
case AMF0_AVMPLUS_OBJECT:
|
||||
log.Println("AMF0_AVMPLUS_OBJECT")
|
||||
default:
|
||||
log.Println("Unkonw type.")
|
||||
}
|
||||
return nil, errors.New(fmt.Sprintf("Unsupported type %v", t))
|
||||
}
|
||||
|
||||
func (amf *AMF) encodeObject(t AMFObjects) (err error) {
|
||||
err = amf.writeObject()
|
||||
defer amf.writeObjectEnd()
|
||||
for k, vv := range t {
|
||||
switch vvv := vv.(type) {
|
||||
case string:
|
||||
if err = amf.writeObjectString(k, vvv); err != nil {
|
||||
return
|
||||
}
|
||||
case float64, uint, float32, int, int16, int32, int64, uint16, uint32, uint64, uint8, int8:
|
||||
if err = amf.writeObjectNumber(k, util.ToFloat64(vvv)); err != nil {
|
||||
return
|
||||
}
|
||||
case bool:
|
||||
if err = amf.writeObjectBool(k, vvv); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) readDate() (t uint64, err error) {
|
||||
_, err = amf.ReadByte() // 取出第一个字节 8 Bit == 1 Byte. buf - 1.
|
||||
var b []byte
|
||||
b, err = readBytes(amf.Buffer, 8) // 在取出8个字节,并且读到b中. buf - 8
|
||||
t = util.BigEndian.Uint64(b)
|
||||
pool.RecycleSlice(b)
|
||||
b, err = readBytes(amf.Buffer, 2)
|
||||
pool.RecycleSlice(b)
|
||||
return t, err
|
||||
}
|
||||
|
||||
func (amf *AMF) readStrictArray() (list []AMFObject, err error) {
|
||||
list = make([]AMFObject, 0)
|
||||
_, err = amf.ReadByte()
|
||||
var size int
|
||||
size, err = amf.readSize()
|
||||
for i := 0; i < size; i++ {
|
||||
if obj, err := amf.decodeObject(); err != nil {
|
||||
return list, err
|
||||
} else {
|
||||
list = append(list, obj)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) readECMAArray() (m AMFObjects, err error) {
|
||||
m = make(AMFObjects, 0)
|
||||
_, err = amf.ReadByte()
|
||||
var size int
|
||||
size, err = amf.readSize()
|
||||
for i := 0; i < size; i++ {
|
||||
var k string
|
||||
var v AMFObject
|
||||
if k, err = amf.readString1(); err == nil {
|
||||
if v, err = amf.decodeObject(); err == nil {
|
||||
if k != "" || "ObjectEnd" != v {
|
||||
m[k] = v
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) readString() (str string, err error) {
|
||||
_, err = amf.ReadByte() // 取出第一个字节 8 Bit == 1 Byte. buf - 1.
|
||||
return amf.readString1()
|
||||
}
|
||||
|
||||
func (amf *AMF) readString1() (str string, err error) {
|
||||
var size int
|
||||
size, err = amf.readSize16()
|
||||
var b []byte
|
||||
b, err = readBytes(amf.Buffer, size) // 读取全部数据,读取长度为l,因为这两个字节(l变量)保存的是数据长度
|
||||
str = string(b)
|
||||
pool.RecycleSlice(b)
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) readLongString() (str string, err error) {
|
||||
_, err = amf.ReadByte()
|
||||
var size int
|
||||
size, err = amf.readSize()
|
||||
var b []byte
|
||||
b, err = readBytes(amf.Buffer, size) // 读取全部数据,读取长度为l,因为这两个字节(l变量)保存的是数据长度
|
||||
str = string(b)
|
||||
pool.RecycleSlice(b)
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) readNull() (AMFObject, error) {
|
||||
_, err := amf.ReadByte()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (amf *AMF) readNumber() (num float64, err error) {
|
||||
// binary.read 会读取8个字节(float64),如果小于8个字节返回一个`io.ErrUnexpectedEOF`,如果大于就会返回`io.ErrShortBuffer`,读取完毕会有`io.EOF`
|
||||
_, err = amf.ReadByte()
|
||||
err = binary.Read(amf, binary.BigEndian, &num)
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (amf *AMF) readBool() (f bool, err error) {
|
||||
_, err = amf.ReadByte()
|
||||
if b, err := amf.ReadByte(); err == nil {
|
||||
return b == 1, nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (amf *AMF) readObject() (m AMFObjects, err error) {
|
||||
_, err = amf.ReadByte()
|
||||
m = make(AMFObjects, 0)
|
||||
var k string
|
||||
var v AMFObject
|
||||
for {
|
||||
if k, err = amf.readString1(); err != nil {
|
||||
break
|
||||
}
|
||||
if v, err = amf.decodeObject(); err != nil {
|
||||
break
|
||||
}
|
||||
if k == "" && "ObjectEnd" == v {
|
||||
break
|
||||
}
|
||||
m[k] = v
|
||||
}
|
||||
return m, err
|
||||
}
|
||||
|
||||
func readBytes(buf *bytes.Buffer, length int) (b []byte, err error) {
|
||||
b = pool.GetSlice(length)
|
||||
if i, _ := buf.Read(b); length != i {
|
||||
err = errors.New(fmt.Sprintf("not enough bytes,%v/%v", buf.Len(), length))
|
||||
}
|
||||
return
|
||||
}
|
||||
func (amf *AMF) writeSize16(l int) (err error) {
|
||||
b := pool.GetSlice(2)
|
||||
defer pool.RecycleSlice(b)
|
||||
util.BigEndian.PutUint16(b, uint16(l))
|
||||
_, err = amf.Write(b)
|
||||
return
|
||||
}
|
||||
func (amf *AMF) writeString(value string) error {
|
||||
v := []byte(value)
|
||||
err := amf.WriteByte(byte(AMF0_STRING))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = amf.writeSize16(len(v)); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = amf.Write(v)
|
||||
return err
|
||||
}
|
||||
|
||||
func (amf *AMF) writeNull() error {
|
||||
return amf.WriteByte(byte(AMF0_NULL))
|
||||
}
|
||||
|
||||
func (amf *AMF) writeBool(b bool) error {
|
||||
if err := amf.WriteByte(byte(AMF0_BOOLEAN)); err != nil {
|
||||
return err
|
||||
}
|
||||
if b {
|
||||
return amf.WriteByte(byte(1))
|
||||
}
|
||||
return amf.WriteByte(byte(0))
|
||||
}
|
||||
|
||||
func (amf *AMF) writeNumber(b float64) error {
|
||||
if err := amf.WriteByte(byte(AMF0_NUMBER)); err != nil {
|
||||
return err
|
||||
}
|
||||
return binary.Write(amf, binary.BigEndian, b)
|
||||
}
|
||||
|
||||
func (amf *AMF) writeObject() error {
|
||||
return amf.WriteByte(byte(AMF0_OBJECT))
|
||||
}
|
||||
func (amf *AMF) writeKey(key string) (err error) {
|
||||
keyByte := []byte(key)
|
||||
if err = amf.writeSize16(len(keyByte)); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = amf.Write(keyByte); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
func (amf *AMF) writeObjectString(key, value string) error {
|
||||
if err := amf.writeKey(key); err != nil {
|
||||
return err
|
||||
}
|
||||
return amf.writeString(value)
|
||||
}
|
||||
|
||||
func (amf *AMF) writeObjectBool(key string, f bool) error {
|
||||
if err := amf.writeKey(key); err != nil {
|
||||
return err
|
||||
}
|
||||
return amf.writeBool(f)
|
||||
}
|
||||
|
||||
func (amf *AMF) writeObjectNumber(key string, value float64) error {
|
||||
if err := amf.writeKey(key); err != nil {
|
||||
return err
|
||||
}
|
||||
return amf.writeNumber(value)
|
||||
}
|
||||
|
||||
func (amf *AMF) writeObjectEnd() error {
|
||||
_, err := amf.Write(END_OBJ)
|
||||
return err
|
||||
}
|
202
chunk.go
Normal file
202
chunk.go
Normal file
@@ -0,0 +1,202 @@
|
||||
package rtmpplugin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"github.com/Monibuca/engine/pool"
|
||||
"github.com/Monibuca/engine/util"
|
||||
)
|
||||
|
||||
// RTMP协议中基本的数据单元称为消息(Message).
|
||||
// 当RTMP协议在互联网中传输数据的时候,消息会被拆分成更小的单元,称为消息块(Chunk).
|
||||
// 在网络上传输数据时,消息需要被拆分成较小的数据块,才适合在相应的网络环境上传输.
|
||||
|
||||
// 理论上Type 0, 1, 2的Chunk都可以使用Extended Timestamp来传递时间
|
||||
// Type 3由于严禁携带Extened Timestamp字段.但实际上只有Type 0才需要带此字段.
|
||||
// 这是因为,对Type 1, 2来说,其时间为一个差值,一般肯定小于0x00FFFFF
|
||||
|
||||
// 对于除Audio,Video以外的基它Message,其时间字段都可以是置为0的,似乎没有被用到.
|
||||
// 只有在发送视频和音频数据时,才需要特别的考虑TimeStamp字段.基本依据是,要以HandShake时为起始点0来计算时间.
|
||||
// 一般来说,建立一个相对时间,把一个视频帧的TimeStamp特意的在当前时间的基础上延迟3秒,则可以达到缓存的效果
|
||||
|
||||
const (
|
||||
RTMP_CHUNK_HEAD_12 = 0 << 6 // Chunk Basic Header = (Chunk Type << 6) | Chunk Stream ID.
|
||||
RTMP_CHUNK_HEAD_8 = 1 << 6
|
||||
RTMP_CHUNK_HEAD_4 = 2 << 6
|
||||
RTMP_CHUNK_HEAD_1 = 3 << 6
|
||||
)
|
||||
|
||||
type Chunk struct {
|
||||
*ChunkHeader
|
||||
Body []byte
|
||||
MsgData interface{}
|
||||
}
|
||||
|
||||
func (c *Chunk) Encode(msg RtmpMessage) {
|
||||
c.MsgData = msg
|
||||
c.Body = msg.Encode()
|
||||
c.MessageLength = uint32(len(c.Body))
|
||||
}
|
||||
func (c *Chunk) Recycle() {
|
||||
chunkMsgPool.Put(c)
|
||||
}
|
||||
|
||||
type ChunkHeader struct {
|
||||
ChunkBasicHeader
|
||||
ChunkMessageHeader
|
||||
ChunkExtendedTimestamp
|
||||
}
|
||||
|
||||
// Basic Header (1 to 3 bytes) : This field encodes the chunk stream ID
|
||||
// and the chunk type. Chunk type determines the format of the
|
||||
// encoded message header. The length(Basic Header) depends entirely on the chunk
|
||||
// stream ID, which is a variable-length field.
|
||||
type ChunkBasicHeader struct {
|
||||
ChunkStreamID uint32 `json:""` // 6 bit. 3 ~ 65559, 0,1,2 reserved
|
||||
ChunkType byte `json:""` // 2 bit.
|
||||
}
|
||||
|
||||
// Message Header (0, 3, 7, or 11 bytes): This field encodes
|
||||
// information about the message being sent (whether in whole or in
|
||||
// part). The length can be determined using the chunk type
|
||||
// specified in the chunk header.
|
||||
type ChunkMessageHeader struct {
|
||||
Timestamp uint32 `json:""` // 3 byte
|
||||
MessageLength uint32 `json:""` // 3 byte
|
||||
MessageTypeID byte `json:""` // 1 byte
|
||||
MessageStreamID uint32 `json:""` // 4 byte
|
||||
}
|
||||
|
||||
// Extended Timestamp (0 or 4 bytes): This field is present in certain
|
||||
// circumstances depending on the encoded timestamp or timestamp
|
||||
// delta field in the Chunk Message header. See Section 5.3.1.3 for
|
||||
// more information
|
||||
type ChunkExtendedTimestamp struct {
|
||||
ExtendTimestamp uint32 `json:",omitempty"` // 标识该字段的数据可忽略
|
||||
}
|
||||
|
||||
// ChunkBasicHeader会决定ChunkMessgaeHeader,ChunkMessgaeHeader有4种(0,3,7,11 Bytes),因此可能有4种头.
|
||||
|
||||
// 1 -> ChunkBasicHeader(1) + ChunkMessageHeader(0)
|
||||
// 4 -> ChunkBasicHeader(1) + ChunkMessageHeader(3)
|
||||
// 8 -> ChunkBasicHeader(1) + ChunkMessageHeader(7)
|
||||
// 12 -> ChunkBasicHeader(1) + ChunkMessageHeader(11)
|
||||
|
||||
func encodeChunk12(head *ChunkHeader, payload []byte, size int) (mark []byte, need []byte, err error) {
|
||||
if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 {
|
||||
return nil, nil, errors.New("chunk error")
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
chunkBasicHead := byte(RTMP_CHUNK_HEAD_12 + head.ChunkBasicHeader.ChunkStreamID)
|
||||
buf.WriteByte(chunkBasicHead)
|
||||
|
||||
b := pool.GetSlice(3)
|
||||
util.BigEndian.PutUint24(b, head.ChunkMessageHeader.Timestamp)
|
||||
buf.Write(b)
|
||||
|
||||
util.BigEndian.PutUint24(b, head.ChunkMessageHeader.MessageLength)
|
||||
buf.Write(b)
|
||||
|
||||
buf.WriteByte(head.ChunkMessageHeader.MessageTypeID)
|
||||
pool.RecycleSlice(b)
|
||||
b = pool.GetSlice(4)
|
||||
util.LittleEndian.PutUint32(b, uint32(head.ChunkMessageHeader.MessageStreamID))
|
||||
buf.Write(b)
|
||||
|
||||
if head.ChunkMessageHeader.Timestamp == 0xffffff {
|
||||
util.LittleEndian.PutUint32(b, head.ChunkExtendedTimestamp.ExtendTimestamp)
|
||||
buf.Write(b)
|
||||
}
|
||||
pool.RecycleSlice(b)
|
||||
if len(payload) > size {
|
||||
buf.Write(payload[0:size])
|
||||
need = payload[size:]
|
||||
} else {
|
||||
buf.Write(payload)
|
||||
}
|
||||
|
||||
mark = buf.Bytes()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func encodeChunk8(head *ChunkHeader, payload []byte, size int) (mark []byte, need []byte, err error) {
|
||||
if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 {
|
||||
return nil, nil, errors.New("chunk error")
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
chunkBasicHead := byte(RTMP_CHUNK_HEAD_8 + head.ChunkBasicHeader.ChunkStreamID)
|
||||
buf.WriteByte(chunkBasicHead)
|
||||
|
||||
b := pool.GetSlice(3)
|
||||
util.BigEndian.PutUint24(b, head.ChunkMessageHeader.Timestamp)
|
||||
buf.Write(b)
|
||||
|
||||
util.BigEndian.PutUint24(b, head.ChunkMessageHeader.MessageLength)
|
||||
buf.Write(b)
|
||||
pool.RecycleSlice(b)
|
||||
buf.WriteByte(head.ChunkMessageHeader.MessageTypeID)
|
||||
|
||||
if len(payload) > size {
|
||||
buf.Write(payload[0:size])
|
||||
need = payload[size:]
|
||||
} else {
|
||||
buf.Write(payload)
|
||||
}
|
||||
|
||||
mark = buf.Bytes()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func encodeChunk4(head *ChunkHeader, payload []byte, size int) (mark []byte, need []byte, err error) {
|
||||
if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 {
|
||||
return nil, nil, errors.New("chunk error")
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
chunkBasicHead := byte(RTMP_CHUNK_HEAD_4 + head.ChunkBasicHeader.ChunkStreamID)
|
||||
buf.WriteByte(chunkBasicHead)
|
||||
|
||||
b := pool.GetSlice(3)
|
||||
util.BigEndian.PutUint24(b, head.ChunkMessageHeader.Timestamp)
|
||||
buf.Write(b)
|
||||
pool.RecycleSlice(b)
|
||||
if len(payload) > size {
|
||||
buf.Write(payload[0:size])
|
||||
need = payload[size:]
|
||||
} else {
|
||||
buf.Write(payload)
|
||||
}
|
||||
|
||||
mark = buf.Bytes()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func encodeChunk1(head *ChunkHeader, payload []byte, size int) (mark []byte, need []byte, err error) {
|
||||
if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 {
|
||||
return nil, nil, errors.New("chunk error")
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
chunkBasicHead := byte(RTMP_CHUNK_HEAD_1 + head.ChunkBasicHeader.ChunkStreamID)
|
||||
buf.WriteByte(chunkBasicHead)
|
||||
|
||||
if len(payload) > size {
|
||||
buf.Write(payload[0:size])
|
||||
need = payload[size:]
|
||||
} else {
|
||||
buf.Write(payload)
|
||||
}
|
||||
|
||||
mark = buf.Bytes()
|
||||
|
||||
return
|
||||
}
|
77
event.go
Normal file
77
event.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package rtmpplugin
|
||||
|
||||
// http://help.adobe.com/zh_CN/AIR/1.5/jslr/flash/events/NetStatusEvent.html
|
||||
|
||||
const (
|
||||
Response_OnStatus = "onStatus"
|
||||
Response_Result = "_result"
|
||||
Response_Error = "_error"
|
||||
|
||||
/* Level */
|
||||
Level_Status = "status"
|
||||
Level_Error = "error"
|
||||
Level_Warning = "warning"
|
||||
|
||||
/* Code */
|
||||
/* NetStream */
|
||||
NetStream_Play_Reset = "NetStream.Play.Reset" // "status" 由播放列表重置导致
|
||||
NetStream_Play_Start = "NetStream.Play.Start" // "status" 播放已开始
|
||||
NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound" // "error" 无法找到传递给 play()方法的 FLV
|
||||
NetStream_Play_Stop = "NetStream.Play.Stop" // "status" 播放已结束
|
||||
NetStream_Play_Failed = "NetStream.Play.Failed" // "error" 出于此表中列出的原因之外的某一原因(例如订阅者没有读取权限),播放发生了错误
|
||||
|
||||
NetStream_Play_Switch = "NetStream.Play.Switch"
|
||||
NetStream_Play_Complete = "NetStream.Play.Switch"
|
||||
|
||||
NetStream_Data_Start = "NetStream.Data.Start"
|
||||
|
||||
NetStream_Publish_Start = "NetStream.Publish.Start" // "status" 已经成功发布.
|
||||
NetStream_Publish_BadName = "NetStream.Publish.BadName" // "error" 试图发布已经被他人发布的流.
|
||||
NetStream_Publish_Idle = "NetStream.Publish.Idle" // "status" 流发布者空闲而没有在传输数据.
|
||||
NetStream_Unpublish_Success = "NetStream.Unpublish.Success" // "status" 已成功执行取消发布操作.
|
||||
|
||||
NetStream_Buffer_Empty = "NetStream.Buffer.Empty" // "status" 数据的接收速度不足以填充缓冲区.数据流将在缓冲区重新填充前中断,此时将发送 NetStream.Buffer.Full 消息,并且该流将重新开始播放
|
||||
NetStream_Buffer_Full = "NetStream.Buffer.Full" // "status" 缓冲区已满并且流将开始播放
|
||||
NetStream_Buffe_Flush = "NetStream.Buffer.Flush" // "status" 数据已完成流式处理,剩余的缓冲区将被清空
|
||||
NetStream_Pause_Notify = "NetStream.Pause.Notify" // "status" 流已暂停
|
||||
NetStream_Unpause_Notify = "NetStream.Unpause.Notify" // "status" 流已恢复
|
||||
|
||||
NetStream_Record_Start = "NetStream.Record.Start" // "status" 录制已开始.
|
||||
NetStream_Record_NoAccess = "NetStream.Record.NoAccess" // "error" 试图录制仍处于播放状态的流或客户端没有访问权限的流.
|
||||
NetStream_Record_Stop = "NetStream.Record.Stop" // "status" 录制已停止.
|
||||
NetStream_Record_Failed = "NetStream.Record.Failed" // "error" 尝试录制流失败.
|
||||
|
||||
NetStream_Seek_Failed = "NetStream.Seek.Failed" // "error" 搜索失败,如果流处于不可搜索状态,则会发生搜索失败.
|
||||
NetStream_Seek_InvalidTime = "NetStream.Seek.InvalidTime" // "error" 对于使用渐进式下载方式下载的视频,用户已尝试跳过到目前为止已下载的视频数据的结尾或在整个文件已下载后跳过视频的结尾进行搜寻或播放. message.details 属性包含一个时间代码,该代码指出用户可以搜寻的最后一个有效位置.
|
||||
NetStream_Seek_Notify = "NetStream.Seek.Notify" // "status" 搜寻操作完成.
|
||||
|
||||
/* NetConnect */
|
||||
NetConnection_Call_BadVersion = "NetConnection.Call.BadVersion" // "error" 以不能识别的格式编码的数据包.
|
||||
NetConnection_Call_Failed = "NetConnection.Call.Failed" // "error" NetConnection.call 方法无法调用服务器端的方法或命令.
|
||||
NetConnection_Call_Prohibited = "NetConnection.Call.Prohibited" // "error" Action Message Format (AMF) 操作因安全原因而被阻止. 或者是 AMF URL 与 SWF 不在同一个域,或者是 AMF 服务器没有信任 SWF 文件的域的策略文件.
|
||||
NetConnection_Connect_AppShutdown = "NetConnection.Connect.AppShutdown" // "error" 正在关闭指定的应用程序.
|
||||
NetConnection_Connect_InvalidApp = "NetConnection.Connect.InvalidApp" // "error" 连接时指定的应用程序名无效.
|
||||
NetConnection_Connect_Success = "NetConnection.Connect.Success" // "status" 连接尝试成功.
|
||||
NetConnection_Connect_Closed = "NetConnection.Connect.Closed" // "status" 成功关闭连接.
|
||||
NetConnection_Connect_Failed = "NetConnection.Connect.Failed" // "error" 连接尝试失败.
|
||||
NetConnection_Connect_Rejected = "NetConnection.Connect.Rejected" // "error" 连接尝试没有访问应用程序的权限.
|
||||
|
||||
/* SharedObject */
|
||||
SharedObject_Flush_Success = "SharedObject.Flush.Success" //"status" "待定"状态已解析并且 SharedObject.flush() 调用成功.
|
||||
SharedObject_Flush_Failed = "SharedObject.Flush.Failed" //"error" "待定"状态已解析,但 SharedObject.flush() 失败.
|
||||
SharedObject_BadPersistence = "SharedObject.BadPersistence" //"error" 使用永久性标志对共享对象进行了请求,但请求无法被批准,因为已经使用其它标记创建了该对象.
|
||||
SharedObject_UriMismatch = "SharedObject.UriMismatch" //"error" 试图连接到拥有与共享对象不同的 URI (URL) 的 NetConnection 对象.
|
||||
)
|
||||
|
||||
// NetConnection、NetStream 或 SharedObject 对象报告其状态时,将调度 NetStatusEvent 对象
|
||||
|
||||
type NetStatusEvent struct {
|
||||
Code string
|
||||
Level string
|
||||
}
|
||||
|
||||
func newNetStatusEvent(code, level string) (e *NetStatusEvent) {
|
||||
e.Code = code
|
||||
e.Level = level
|
||||
return e
|
||||
}
|
5
go.mod
Normal file
5
go.mod
Normal file
@@ -0,0 +1,5 @@
|
||||
module github.com/Monibuca/rtmpplugin
|
||||
|
||||
go 1.13
|
||||
|
||||
require github.com/Monibuca/engine v1.0.2
|
20
go.sum
Normal file
20
go.sum
Normal file
@@ -0,0 +1,20 @@
|
||||
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/Monibuca/engine v1.0.2 h1:UpPAEQVYrVQrLr9GVGcbu8x5Oiemqd5J2zjGZ/Fhg74=
|
||||
github.com/Monibuca/engine v1.0.2/go.mod h1:NjqVgtXuRSOyk3+NWgCuDf2p7TsBisjYxoEwA9uCZ38=
|
||||
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
|
||||
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
|
||||
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo=
|
||||
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
|
||||
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/shirou/gopsutil v2.20.1+incompatible h1:oIq9Cq4i84Hk8uQAUOG3eNdI/29hBawGrD5YRl6JRDY=
|
||||
github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
|
||||
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
345
handshake.go
Normal file
345
handshake.go
Normal file
@@ -0,0 +1,345 @@
|
||||
package rtmpplugin
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
)
|
||||
|
||||
const (
|
||||
C1S1_SIZE = 1536
|
||||
|
||||
C1S1_TIME_SIZE = 4
|
||||
C1S1_VERSION_SIZE = 4
|
||||
|
||||
C1S1_DIGEST_SIZE = 764
|
||||
C1S1_DIGEST_OFFSET_SIZE = 4
|
||||
C1S1_DIGEST_OFFSET_MAX = 764 - 32 - 4
|
||||
C1S1_DIGEST_DATA_SIZE = 32
|
||||
|
||||
C1S1_KEY_SIZE = 764
|
||||
C1S1_KEY_OFFSET_SIZE = 4
|
||||
C1S1_KEY_OFFSET_MAX = 764 - 128 - 4
|
||||
C1S1_KEY_DATA_SIZE = 128
|
||||
|
||||
RTMP_HANDSHAKE_VERSION = 0x03
|
||||
)
|
||||
|
||||
var (
|
||||
FMS_KEY = []byte{
|
||||
0x47, 0x65, 0x6e, 0x75, 0x69, 0x6e, 0x65, 0x20,
|
||||
0x41, 0x64, 0x6f, 0x62, 0x65, 0x20, 0x46, 0x6c,
|
||||
0x61, 0x73, 0x68, 0x20, 0x4d, 0x65, 0x64, 0x69,
|
||||
0x61, 0x20, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72,
|
||||
0x20, 0x30, 0x30, 0x31, // Genuine Adobe Flash Media Server 001
|
||||
0xf0, 0xee, 0xc2, 0x4a, 0x80, 0x68, 0xbe, 0xe8,
|
||||
0x2e, 0x00, 0xd0, 0xd1, 0x02, 0x9e, 0x7e, 0x57,
|
||||
0x6e, 0xec, 0x5d, 0x2d, 0x29, 0x80, 0x6f, 0xab,
|
||||
0x93, 0xb8, 0xe6, 0x36, 0xcf, 0xeb, 0x31, 0xae,
|
||||
} // 68
|
||||
FP_KEY = []byte{
|
||||
0x47, 0x65, 0x6E, 0x75, 0x69, 0x6E, 0x65, 0x20,
|
||||
0x41, 0x64, 0x6F, 0x62, 0x65, 0x20, 0x46, 0x6C,
|
||||
0x61, 0x73, 0x68, 0x20, 0x50, 0x6C, 0x61, 0x79,
|
||||
0x65, 0x72, 0x20, 0x30, 0x30, 0x31, /* Genuine Adobe Flash Player 001 */
|
||||
0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8,
|
||||
0x2E, 0x00, 0xD0, 0xD1, 0x02, 0x9E, 0x7E, 0x57,
|
||||
0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
|
||||
0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE,
|
||||
} // 62
|
||||
)
|
||||
|
||||
// C0 S0 (1 byte) : 版本号
|
||||
|
||||
// C1 S1 :
|
||||
// Time (4 bytes)
|
||||
// Zero (4 bytes) -> 这个字段必须都是0.如果不是0,代表要使用complex handshack
|
||||
// Random data (128 Bytes)
|
||||
|
||||
// C2 S2 : 参考C1 S1
|
||||
|
||||
func ReadBuf(r io.Reader, length int) (buf []byte) {
|
||||
buf = make([]byte, length)
|
||||
io.ReadFull(r, buf)
|
||||
return
|
||||
}
|
||||
|
||||
func Handshake(brw *bufio.ReadWriter) error {
|
||||
C0C1 := ReadBuf(brw, 1536+1)
|
||||
if C0C1[0] != RTMP_HANDSHAKE_VERSION {
|
||||
return errors.New("C0 Error")
|
||||
}
|
||||
|
||||
if len(C0C1[1:]) != 1536 {
|
||||
return errors.New("C1 Error")
|
||||
}
|
||||
|
||||
C1 := make([]byte, 1536)
|
||||
copy(C1, C0C1[1:])
|
||||
temp := C1[4] & 0xff
|
||||
|
||||
if temp == 0 {
|
||||
return simple_handshake(brw, C1)
|
||||
}
|
||||
|
||||
return complex_handshake(brw, C1)
|
||||
}
|
||||
|
||||
func simple_handshake(brw *bufio.ReadWriter, C1 []byte) error {
|
||||
var S0 byte
|
||||
S0 = 0x03
|
||||
S1 := make([]byte, 1536-4)
|
||||
S2 := C1
|
||||
S1_Time := uint32(0)
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
buf.WriteByte(S0)
|
||||
binary.Write(buf, binary.BigEndian, S1_Time)
|
||||
buf.Write(S1)
|
||||
buf.Write(S2)
|
||||
|
||||
brw.Write(buf.Bytes())
|
||||
brw.Flush() // Don't forget to flush
|
||||
|
||||
ReadBuf(brw, 1536)
|
||||
return nil
|
||||
}
|
||||
|
||||
func complex_handshake(brw *bufio.ReadWriter, C1 []byte) error {
|
||||
// 验证客户端,digest偏移位置和scheme由客户端定.
|
||||
scheme, challenge, digest, ok, err := validateClient(C1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Sprintf("digested handshake, scheme : %v\nchallenge : %v\ndigest : %v\nok : %v\nerr : %v\n", scheme, challenge, digest, ok, err)
|
||||
|
||||
if !ok {
|
||||
return errors.New("validateClient failed")
|
||||
}
|
||||
|
||||
// s0
|
||||
var S0 byte
|
||||
S0 = 0x03
|
||||
|
||||
// s1
|
||||
S1 := create_S1()
|
||||
S1_Digest_Offset := scheme_Digest_Offset(S1, scheme)
|
||||
S1_Part1 := S1[:S1_Digest_Offset]
|
||||
S1_Part2 := S1[S1_Digest_Offset+C1S1_DIGEST_DATA_SIZE:]
|
||||
|
||||
// s1 part1 + part2
|
||||
buf := new(bytes.Buffer)
|
||||
buf.Write(S1_Part1)
|
||||
buf.Write(S1_Part2)
|
||||
S1_Part1_Part2 := buf.Bytes()
|
||||
|
||||
// s1 digest
|
||||
tmp_Hash, err := HMAC_SHA256(S1_Part1_Part2, FMS_KEY[:36])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// incomplete s1
|
||||
copy(S1[S1_Digest_Offset:], tmp_Hash)
|
||||
|
||||
// s2
|
||||
S2_Random := cerate_S2()
|
||||
|
||||
tmp_Hash, err = HMAC_SHA256(digest, FMS_KEY[:68])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// s2 digest
|
||||
S2_Digest, err := HMAC_SHA256(S2_Random, tmp_Hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buffer := new(bytes.Buffer)
|
||||
buffer.WriteByte(S0)
|
||||
buffer.Write(S1)
|
||||
buffer.Write(S2_Random)
|
||||
buffer.Write(S2_Digest)
|
||||
|
||||
brw.Write(buffer.Bytes())
|
||||
brw.Flush()
|
||||
|
||||
ReadBuf(brw, 1536)
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateClient(C1 []byte) (scheme int, challenge []byte, digest []byte, ok bool, err error) {
|
||||
scheme, challenge, digest, ok, err = clientScheme(C1, 1)
|
||||
if ok {
|
||||
return scheme, challenge, digest, ok, nil
|
||||
}
|
||||
|
||||
scheme, challenge, digest, ok, err = clientScheme(C1, 0)
|
||||
if ok {
|
||||
return scheme, challenge, digest, ok, nil
|
||||
}
|
||||
|
||||
return scheme, challenge, digest, ok, errors.New("Client scheme error")
|
||||
}
|
||||
|
||||
func clientScheme(C1 []byte, schem int) (scheme int, challenge []byte, digest []byte, ok bool, err error) {
|
||||
digest_offset := -1
|
||||
key_offset := -1
|
||||
|
||||
if schem == 0 {
|
||||
digest_offset = scheme0_Digest_Offset(C1)
|
||||
key_offset = scheme0_Key_Offset(C1)
|
||||
} else if schem == 1 {
|
||||
digest_offset = scheme1_Digest_Offset(C1)
|
||||
key_offset = scheme1_Key_Offset(C1)
|
||||
}
|
||||
|
||||
// digest
|
||||
c1_Part1 := C1[:digest_offset]
|
||||
c1_Part2 := C1[digest_offset+C1S1_DIGEST_DATA_SIZE:]
|
||||
digest = C1[digest_offset : digest_offset+C1S1_DIGEST_DATA_SIZE]
|
||||
|
||||
// part1 + part2
|
||||
buf := new(bytes.Buffer)
|
||||
buf.Write(c1_Part1)
|
||||
buf.Write(c1_Part2)
|
||||
c1_Part1_Part2 := buf.Bytes()
|
||||
|
||||
tmp_Hash, err := HMAC_SHA256(c1_Part1_Part2, FP_KEY[:30])
|
||||
if err != nil {
|
||||
return 0, nil, nil, false, err
|
||||
}
|
||||
|
||||
// ok
|
||||
if bytes.Compare(digest, tmp_Hash) == 0 {
|
||||
ok = true
|
||||
} else {
|
||||
ok = false
|
||||
}
|
||||
|
||||
// challenge scheme
|
||||
challenge = C1[key_offset : key_offset+C1S1_KEY_DATA_SIZE]
|
||||
scheme = schem
|
||||
return
|
||||
}
|
||||
|
||||
func scheme_Digest_Offset(C1S1 []byte, scheme int) int {
|
||||
if scheme == 0 {
|
||||
return scheme0_Digest_Offset(C1S1)
|
||||
} else if scheme == 1 {
|
||||
return scheme1_Digest_Offset(C1S1)
|
||||
}
|
||||
|
||||
return -1
|
||||
}
|
||||
|
||||
// scheme0:
|
||||
// time + version + digest + key
|
||||
// time + version + [offset + random + digest-data + random-data] + key
|
||||
// 4 + 4 + [4 + offset + 32 + 728-offset ] + 764
|
||||
// 4 + 4 + 764 + 764
|
||||
// 0 <= scheme0_digest_offset <= 728 == 764 - 32 - 4
|
||||
// 如果digest.offset == 3,那么digest[7~38]为digest.digest-data,如果offset == 728, 那么digest[732~763]为digest-data)
|
||||
func scheme0_Digest_Offset(C1S1 []byte) int {
|
||||
scheme0_digest_offset := int(C1S1[8]&0xff) + int(C1S1[9]&0xff) + int(C1S1[10]&0xff) + int(C1S1[11]&0xff)
|
||||
|
||||
scheme0_digest_offset = (scheme0_digest_offset % C1S1_DIGEST_OFFSET_MAX) + C1S1_TIME_SIZE + C1S1_VERSION_SIZE + C1S1_DIGEST_OFFSET_SIZE
|
||||
if scheme0_digest_offset+32 >= C1S1_SIZE {
|
||||
// digest error
|
||||
// digest 数据超出1536.
|
||||
}
|
||||
|
||||
return scheme0_digest_offset
|
||||
}
|
||||
|
||||
// key:
|
||||
// random-data + key-data + random-data + offset
|
||||
// offset + 128 + 764-offset-128-4 + 4
|
||||
// 0 <= scheme0_key_offset <= 632 == 764 - 128 - 4
|
||||
// 如果key.offset == 3, 那么key[3~130]为key-data,这个位置是相对于key结构的第0个字节开始
|
||||
func scheme0_Key_Offset(C1S1 []byte) int {
|
||||
scheme0_key_offset := int(C1S1[1532]) + int(C1S1[1533]) + int(C1S1[1534]) + int(C1S1[1535])
|
||||
|
||||
scheme0_key_offset = (scheme0_key_offset % C1S1_KEY_OFFSET_MAX) + C1S1_TIME_SIZE + C1S1_VERSION_SIZE + C1S1_DIGEST_SIZE
|
||||
if scheme0_key_offset+128 >= C1S1_SIZE {
|
||||
// key error
|
||||
}
|
||||
|
||||
return scheme0_key_offset
|
||||
}
|
||||
|
||||
// scheme1:
|
||||
// time + version + key + digest
|
||||
// 0 <= scheme1_digest_offset <= 728 == 764 - 32 - 4
|
||||
func scheme1_Digest_Offset(C1S1 []byte) int {
|
||||
scheme1_digest_offset := int(C1S1[772]&0xff) + int(C1S1[773]&0xff) + int(C1S1[774]&0xff) + int(C1S1[775]&0xff)
|
||||
|
||||
scheme1_digest_offset = (scheme1_digest_offset % C1S1_DIGEST_OFFSET_MAX) + C1S1_TIME_SIZE + C1S1_VERSION_SIZE + C1S1_KEY_SIZE + C1S1_DIGEST_OFFSET_SIZE
|
||||
if scheme1_digest_offset+32 >= C1S1_SIZE {
|
||||
// digest error
|
||||
}
|
||||
|
||||
return scheme1_digest_offset
|
||||
}
|
||||
|
||||
// time + version + key + digest
|
||||
// 0 <= scheme1_key_offset <= 632 == 764 - 128 - 4
|
||||
func scheme1_Key_Offset(C1S1 []byte) int {
|
||||
scheme1_key_offset := int(C1S1[768]) + int(C1S1[769]) + int(C1S1[770]) + int(C1S1[771])
|
||||
|
||||
scheme1_key_offset = (scheme1_key_offset % C1S1_KEY_OFFSET_MAX) + C1S1_TIME_SIZE + C1S1_VERSION_SIZE + C1S1_DIGEST_SIZE
|
||||
if scheme1_key_offset+128 >= C1S1_SIZE {
|
||||
// key error
|
||||
}
|
||||
|
||||
return scheme1_key_offset
|
||||
}
|
||||
|
||||
// HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出
|
||||
// 哈希算法sha256.New, 密钥 key, 消息 message.
|
||||
func HMAC_SHA256(message []byte, key []byte) ([]byte, error) {
|
||||
mac := hmac.New(sha256.New, key)
|
||||
_, err := mac.Write(message)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mac.Sum(nil), nil
|
||||
}
|
||||
|
||||
func create_S1() []byte {
|
||||
s1_Time := []byte{0, 0, 0, 0}
|
||||
s1_Version := []byte{1, 1, 1, 1}
|
||||
s1_key_Digest := make([]byte, 1536-8)
|
||||
|
||||
for i, _ := range s1_key_Digest {
|
||||
s1_key_Digest[i] = byte(rand.Int() % 256)
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
buf.Write(s1_Time)
|
||||
buf.Write(s1_Version)
|
||||
buf.Write(s1_key_Digest)
|
||||
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
func cerate_S2() []byte {
|
||||
s2_Random := make([]byte, 1536-32)
|
||||
|
||||
for i, _ := range s2_Random {
|
||||
s2_Random[i] = byte(rand.Int() % 256)
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
buf.Write(s2_Random)
|
||||
|
||||
return buf.Bytes()
|
||||
}
|
23
index.go
Normal file
23
index.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package rtmpplugin
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
. "github.com/Monibuca/engine"
|
||||
)
|
||||
|
||||
var config = new(ListenerConfig)
|
||||
|
||||
func init() {
|
||||
InstallPlugin(&PluginConfig{
|
||||
Name: "RTMP",
|
||||
Type: PLUGIN_SUBSCRIBER | PLUGIN_PUBLISHER,
|
||||
Config: config,
|
||||
Version: "1.0.0",
|
||||
Run: run,
|
||||
})
|
||||
}
|
||||
func run() {
|
||||
log.Printf("server rtmp start at %s", config.ListenAddr)
|
||||
log.Fatal(ListenRtmp(config.ListenAddr))
|
||||
}
|
894
msg.go
Normal file
894
msg.go
Normal file
@@ -0,0 +1,894 @@
|
||||
package rtmpplugin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/Monibuca/engine/util"
|
||||
"log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
/* RTMP Message ID*/
|
||||
|
||||
// Protocal Control Messgae(1-7)
|
||||
|
||||
// Chunk
|
||||
RTMP_MSG_CHUNK_SIZE = 1
|
||||
RTMP_MSG_ABORT = 2
|
||||
|
||||
// RTMP
|
||||
RTMP_MSG_ACK = 3
|
||||
RTMP_MSG_USER_CONTROL = 4
|
||||
RTMP_MSG_ACK_SIZE = 5
|
||||
RTMP_MSG_BANDWIDTH = 6
|
||||
RTMP_MSG_EDGE = 7
|
||||
RTMP_MSG_AUDIO = 8
|
||||
RTMP_MSG_VIDEO = 9
|
||||
RTMP_MSG_AMF3_METADATA = 15
|
||||
RTMP_MSG_AMF3_SHARED = 16
|
||||
RTMP_MSG_AMF3_COMMAND = 17
|
||||
|
||||
RTMP_MSG_AMF0_METADATA = 18
|
||||
RTMP_MSG_AMF0_SHARED = 19
|
||||
RTMP_MSG_AMF0_COMMAND = 20
|
||||
|
||||
RTMP_MSG_AGGREGATE = 22
|
||||
|
||||
RTMP_DEFAULT_CHUNK_SIZE = 128
|
||||
RTMP_MAX_CHUNK_SIZE = 65536
|
||||
RTMP_MAX_CHUNK_HEADER = 18
|
||||
|
||||
// User Control Event
|
||||
RTMP_USER_STREAM_BEGIN = 0
|
||||
RTMP_USER_STREAM_EOF = 1
|
||||
RTMP_USER_STREAM_DRY = 2
|
||||
RTMP_USER_SET_BUFFLEN = 3
|
||||
RTMP_USER_STREAM_IS_RECORDED = 4
|
||||
RTMP_USER_PING_REQUEST = 6
|
||||
RTMP_USER_PING_RESPONSE = 7
|
||||
RTMP_USER_EMPTY = 31
|
||||
|
||||
// StreamID == (ChannelID-4)/5+1
|
||||
// ChannelID == Chunk Stream ID
|
||||
// StreamID == Message Stream ID
|
||||
// Chunk Stream ID == 0, 第二个byte + 64
|
||||
// Chunk Stream ID == 1, (第三个byte) * 256 + 第二个byte + 64
|
||||
// Chunk Stream ID == 2.
|
||||
// 2 < Chunk Stream ID < 64(2的6次方)
|
||||
RTMP_CSID_CONTROL = 0x02
|
||||
RTMP_CSID_COMMAND = 0x03
|
||||
RTMP_CSID_AUDIO = 0x06
|
||||
RTMP_CSID_DATA = 0x05
|
||||
RTMP_CSID_VIDEO = 0x05
|
||||
)
|
||||
|
||||
var (
|
||||
rtmpHeaderPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return new(ChunkHeader)
|
||||
},
|
||||
}
|
||||
chunkMsgPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return new(Chunk)
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func newChunkHeader(messageType byte) *ChunkHeader {
|
||||
head := rtmpHeaderPool.Get().(*ChunkHeader)
|
||||
head.ChunkStreamID = RTMP_CSID_CONTROL
|
||||
head.Timestamp = 0
|
||||
head.MessageTypeID = messageType
|
||||
head.MessageStreamID = 0
|
||||
head.ExtendTimestamp = 0
|
||||
return head
|
||||
}
|
||||
func newRtmpHeader(chunkID uint32, timestamp uint32, messageLength uint32, messageType byte, messageStreamID uint32, extendTimestamp uint32) *ChunkHeader {
|
||||
head := rtmpHeaderPool.Get().(*ChunkHeader)
|
||||
head.ChunkStreamID = chunkID
|
||||
head.Timestamp = timestamp
|
||||
head.MessageLength = messageLength
|
||||
head.MessageTypeID = messageType
|
||||
head.MessageStreamID = messageStreamID
|
||||
head.ExtendTimestamp = extendTimestamp
|
||||
return head
|
||||
}
|
||||
|
||||
func (h *ChunkHeader) Clone() *ChunkHeader {
|
||||
head := rtmpHeaderPool.Get().(*ChunkHeader)
|
||||
head.ChunkStreamID = h.ChunkStreamID
|
||||
head.Timestamp = h.Timestamp
|
||||
head.MessageLength = h.MessageLength
|
||||
head.MessageTypeID = h.MessageTypeID
|
||||
head.MessageStreamID = h.MessageStreamID
|
||||
head.ExtendTimestamp = h.ExtendTimestamp
|
||||
|
||||
return head
|
||||
}
|
||||
|
||||
type RtmpMessage interface {
|
||||
Encode() []byte
|
||||
}
|
||||
type HaveStreamID interface {
|
||||
GetStreamID() uint32
|
||||
}
|
||||
|
||||
func GetRtmpMessage(chunk *Chunk) {
|
||||
switch chunk.MessageTypeID {
|
||||
case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE:
|
||||
chunk.MsgData = Uint32Message(util.BigEndian.Uint32(chunk.Body))
|
||||
case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件.
|
||||
{
|
||||
base := UserControlMessage{
|
||||
EventType: util.BigEndian.Uint16(chunk.Body),
|
||||
EventData: chunk.Body[2:],
|
||||
}
|
||||
switch base.EventType {
|
||||
case RTMP_USER_STREAM_BEGIN: // 服务端向客户端发送本事件通知对方一个流开始起作用可以用于通讯.在默认情况下,服务端在成功地从客户端接收连接命令之后发送本事件,事件ID为0.事件数据是表示开始起作用的流的ID.
|
||||
m := &StreamIDMessage{
|
||||
UserControlMessage: base,
|
||||
StreamID: 0,
|
||||
}
|
||||
if len(base.EventData) >= 4 {
|
||||
//服务端在成功地从客户端接收连接命令之后发送本事件,事件ID为0.事件数据是表示开始起作用的流的ID.
|
||||
m.StreamID = util.BigEndian.Uint32(base.EventData)
|
||||
}
|
||||
chunk.MsgData = m
|
||||
case RTMP_USER_STREAM_EOF, RTMP_USER_STREAM_DRY, RTMP_USER_STREAM_IS_RECORDED: // 服务端向客户端发送本事件通知客户端,数据回放完成.果没有发行额外的命令,就不再发送数据.客户端丢弃从流中接收的消息.4字节的事件数据表示,回放结束的流的ID.
|
||||
m := &StreamIDMessage{
|
||||
UserControlMessage: base,
|
||||
StreamID: util.BigEndian.Uint32(base.EventData),
|
||||
}
|
||||
chunk.MsgData = m
|
||||
case RTMP_USER_SET_BUFFLEN: // 客户端向服务端发送本事件,告知对方自己存储一个流的数据的缓存的长度(毫秒单位).当服务端开始处理一个流得时候发送本事件.事件数据的头四个字节表示流ID,后4个字节表示缓存长度(毫秒单位).
|
||||
m := &SetBufferMessage{
|
||||
StreamIDMessage: StreamIDMessage{
|
||||
UserControlMessage: base,
|
||||
StreamID: util.BigEndian.Uint32(base.EventData),
|
||||
},
|
||||
Millisecond: util.BigEndian.Uint32(base.EventData[4:]),
|
||||
}
|
||||
chunk.MsgData = m
|
||||
case RTMP_USER_PING_REQUEST: // 服务端通过本事件测试客户端是否可达.事件数据是4个字节的事件戳.代表服务调用本命令的本地时间.客户端在接收到kMsgPingRequest之后返回kMsgPingResponse事件
|
||||
m := &PingRequestMessage{
|
||||
UserControlMessage: base,
|
||||
Timestamp: util.BigEndian.Uint32(base.EventData),
|
||||
}
|
||||
chunk.MsgData = m
|
||||
case RTMP_USER_PING_RESPONSE, RTMP_USER_EMPTY: // 客户端向服务端发送本消息响应ping请求.事件数据是接kMsgPingRequest请求的时间.
|
||||
chunk.MsgData = &base
|
||||
default:
|
||||
chunk.MsgData = &base
|
||||
}
|
||||
}
|
||||
case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽.
|
||||
m := &SetPeerBandwidthMessage{
|
||||
AcknowledgementWindowsize: util.BigEndian.Uint32(chunk.Body),
|
||||
}
|
||||
if len(chunk.Body) > 4 {
|
||||
m.LimitType = chunk.Body[4]
|
||||
}
|
||||
chunk.MsgData = m
|
||||
case RTMP_MSG_EDGE: // RTMP消息类型ID=7, 用于边缘服务与源服务器.
|
||||
case RTMP_MSG_AUDIO: // RTMP消息类型ID=8, 音频数据.客户端或服务端发送本消息用于发送音频数据.
|
||||
case RTMP_MSG_VIDEO: // RTMP消息类型ID=9, 视频数据.客户端或服务端发送本消息用于发送视频数据.
|
||||
case RTMP_MSG_AMF3_METADATA: // RTMP消息类型ID=15, 数据消息.用AMF3编码.
|
||||
case RTMP_MSG_AMF3_SHARED: // RTMP消息类型ID=16, 共享对象消息.用AMF3编码.
|
||||
case RTMP_MSG_AMF3_COMMAND: // RTMP消息类型ID=17, 命令消息.用AMF3编码.
|
||||
decodeCommandAMF3(chunk)
|
||||
case RTMP_MSG_AMF0_METADATA: // RTMP消息类型ID=18, 数据消息.用AMF0编码.
|
||||
case RTMP_MSG_AMF0_SHARED: // RTMP消息类型ID=19, 共享对象消息.用AMF0编码.
|
||||
case RTMP_MSG_AMF0_COMMAND: // RTMP消息类型ID=20, 命令消息.用AMF0编码.
|
||||
decodeCommandAMF0(chunk) // 解析具体的命令消息
|
||||
case RTMP_MSG_AGGREGATE:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// 03 00 00 00 00 01 02 14 00 00 00 00 02 00 07 63 6F 6E 6E 65 63 74 00 3F F0 00 00 00 00 00 00 08
|
||||
//
|
||||
// 这个函数解析的是从02(第13个字节)开始,前面12个字节是Header,后面的是Payload,即解析Payload.
|
||||
//
|
||||
// 解析用AMF0编码的命令消息.(Payload)
|
||||
// 第一个字节(Byte)为此数据的类型.例如:string,int,bool...
|
||||
|
||||
// string就是字符类型,一个byte的amf类型,两个bytes的字符长度,和N个bytes的数据.
|
||||
// 比如: 02 00 02 33 22,第一个byte为amf类型,其后两个bytes为长度,注意这里的00 02是大端模式,33 22是字符数据
|
||||
|
||||
// umber类型其实就是double,占8bytes.
|
||||
// 比如: 00 00 00 00 00 00 00 00,第一个byte为amf类型,其后8bytes为double值0.0
|
||||
|
||||
// boolean就是布尔类型,占用1byte.
|
||||
// 比如:01 00,第一个byte为amf类型,其后1byte是值,false.
|
||||
|
||||
// object类型要复杂点.
|
||||
// 第一个byte是03表示object,其后跟的是N个(key+value).最后以00 00 09表示object结束
|
||||
func decodeCommandAMF0(chunk *Chunk) {
|
||||
amf := newAMFDecoder(chunk.Body) // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去.
|
||||
cmd := readString(amf) // rtmp_amf.go, 将payload的bytes类型转换成string类型.
|
||||
cmdMsg := CommandMessage{
|
||||
cmd,
|
||||
readTransactionId(amf),
|
||||
}
|
||||
switch cmd {
|
||||
case "connect", "call":
|
||||
chunk.MsgData = &CallMessage{
|
||||
cmdMsg,
|
||||
readObject(amf),
|
||||
readObject(amf),
|
||||
}
|
||||
case "createStream":
|
||||
amf.readNull()
|
||||
chunk.MsgData = &CreateStreamMessage{
|
||||
cmdMsg, readObject(amf),
|
||||
}
|
||||
case "play":
|
||||
amf.readNull()
|
||||
chunk.MsgData = &PlayMessage{
|
||||
cmdMsg,
|
||||
readString(amf),
|
||||
readNumber(amf),
|
||||
readNumber(amf),
|
||||
readBool(amf),
|
||||
}
|
||||
case "play2":
|
||||
amf.readNull()
|
||||
chunk.MsgData = &Play2Message{
|
||||
cmdMsg,
|
||||
readNumber(amf),
|
||||
readString(amf),
|
||||
readString(amf),
|
||||
readNumber(amf),
|
||||
readString(amf),
|
||||
}
|
||||
case "publish":
|
||||
amf.readNull()
|
||||
chunk.MsgData = &PublishMessage{
|
||||
cmdMsg,
|
||||
readString(amf),
|
||||
readString(amf),
|
||||
}
|
||||
case "pause":
|
||||
amf.readNull()
|
||||
chunk.MsgData = &PauseMessage{
|
||||
cmdMsg,
|
||||
readBool(amf),
|
||||
readNumber(amf),
|
||||
}
|
||||
case "seek":
|
||||
amf.readNull()
|
||||
chunk.MsgData = &SeekMessage{
|
||||
cmdMsg,
|
||||
readNumber(amf),
|
||||
}
|
||||
case "deleteStream", "closeStream", "releaseStream":
|
||||
amf.readNull()
|
||||
chunk.MsgData = &CURDStreamMessage{
|
||||
cmdMsg,
|
||||
uint32(readNumber(amf)),
|
||||
}
|
||||
case "receiveAudio", "receiveVideo":
|
||||
amf.readNull()
|
||||
chunk.MsgData = &ReceiveAVMessage{
|
||||
cmdMsg,
|
||||
readBool(amf),
|
||||
}
|
||||
case "_result", "_error", "onStatus":
|
||||
chunk.MsgData = &ResponseMessage{
|
||||
cmdMsg,
|
||||
readObject(amf),
|
||||
readObject(amf), "",
|
||||
}
|
||||
case "FCPublish", "FCUnpublish":
|
||||
default:
|
||||
log.Println("decode command amf0 cmd:", cmd)
|
||||
}
|
||||
}
|
||||
|
||||
func decodeCommandAMF3(chunk *Chunk) {
|
||||
chunk.Body = chunk.Body[1:]
|
||||
decodeCommandAMF0(chunk)
|
||||
}
|
||||
|
||||
func readTransactionId(amf *AMF) uint64 {
|
||||
v, _ := amf.readNumber()
|
||||
return uint64(v)
|
||||
}
|
||||
func readString(amf *AMF) string {
|
||||
v, _ := amf.readString()
|
||||
return v
|
||||
}
|
||||
func readNumber(amf *AMF) uint64 {
|
||||
v, _ := amf.readNumber()
|
||||
return uint64(v)
|
||||
}
|
||||
func readBool(amf *AMF) bool {
|
||||
v, _ := amf.readBool()
|
||||
return v
|
||||
}
|
||||
|
||||
func readObject(amf *AMF) AMFObjects {
|
||||
v, _ := amf.readObject()
|
||||
return v
|
||||
}
|
||||
|
||||
/* Command Message */
|
||||
type CommandMessage struct {
|
||||
CommandName string // 命令名. 字符串. 命令名.设置为"connect"
|
||||
TransactionId uint64 // 传输ID. 数字. 总是设为1
|
||||
}
|
||||
type Commander interface {
|
||||
GetCommand() *CommandMessage
|
||||
}
|
||||
|
||||
func (cmd *CommandMessage) GetCommand() *CommandMessage {
|
||||
return cmd
|
||||
}
|
||||
|
||||
// Protocol control message 1.
|
||||
// Set Chunk Size, is used to notify the peer of a new maximum chunk size
|
||||
|
||||
// chunk size (31 bits): This field holds the new maximum chunk size,in bytes, which will be used for all of the sender’s subsequent chunks until further notice
|
||||
type Uint32Message uint32
|
||||
|
||||
func (msg Uint32Message) Encode() (b []byte) {
|
||||
b = make([]byte, 4)
|
||||
util.BigEndian.PutUint32(b, uint32(msg))
|
||||
return b
|
||||
}
|
||||
|
||||
// Protocol control message 4, User Control Messages.
|
||||
// User Control messages SHOULD use message stream ID 0 (known as the control stream) and, when sent over RTMP Chunk Stream,
|
||||
// be sent on chunk stream ID 2. User Control messages are effective at the point they are received in the stream; their timestamps are ignored.
|
||||
|
||||
// Event Type (16 bits) : The first 2 bytes of the message data are used to identify the Event type. Event type is followed by Event data.
|
||||
// Event Data
|
||||
type UserControlMessage struct {
|
||||
EventType uint16
|
||||
EventData []byte
|
||||
}
|
||||
|
||||
// Protocol control message 6, Set Peer Bandwidth Message.
|
||||
// The client or the server sends this message to limit the output bandwidth of its peer.
|
||||
|
||||
// AcknowledgementWindowsize (4 bytes)
|
||||
// LimitType : The Limit Type is one of the following values: 0 - Hard, 1 - Soft, 2- Dynamic.
|
||||
type SetPeerBandwidthMessage struct {
|
||||
AcknowledgementWindowsize uint32 // 4 bytes
|
||||
LimitType byte
|
||||
}
|
||||
|
||||
func (msg *SetPeerBandwidthMessage) Encode() (b []byte) {
|
||||
b = make([]byte, 5)
|
||||
util.BigEndian.PutUint32(b, msg.AcknowledgementWindowsize)
|
||||
b[4] = msg.LimitType
|
||||
return
|
||||
}
|
||||
|
||||
// Message 15, 18. Data Message. The client or the server sends this message to send Metadata or any
|
||||
// user data to the peer. Metadata includes details about the data(audio, video etc.) like creation time, duration,
|
||||
// theme and so on. These messages have been assigned message type value of 18 for AMF0 and message type value of 15 for AMF3
|
||||
type MetadataMessage struct {
|
||||
Proterties map[string]interface{} `json:",omitempty"`
|
||||
}
|
||||
|
||||
// Object 可选值:
|
||||
// App 客户端要连接到的服务应用名 Testapp
|
||||
// Flashver Flash播放器版本.和应用文档中getversion()函数返回的字符串相同. FMSc/1.0
|
||||
// SwfUrl 发起连接的swf文件的url file://C:/ FlvPlayer.swf
|
||||
// TcUrl 服务url.有下列的格式.protocol://servername:port/appName/appInstance rtmp://localhost::1935/testapp/instance1
|
||||
// fpad 是否使用代理 true or false
|
||||
// audioCodecs 指示客户端支持的音频编解码器 SUPPORT_SND_MP3
|
||||
// videoCodecs 指示支持的视频编解码器 SUPPORT_VID_SORENSON
|
||||
// pageUrl SWF文件被加载的页面的Url http:// somehost/sample.html
|
||||
// objectEncoding AMF编码方法 AMF编码方法 kAMF3
|
||||
|
||||
// Call Message.
|
||||
// The call method of the NetConnection object runs remote procedure calls (RPC) at the receiving end.
|
||||
// The called RPC name is passed as a parameter to the call command.
|
||||
type CallMessage struct {
|
||||
CommandMessage
|
||||
Object interface{} `json:",omitempty"`
|
||||
Optional interface{} `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (msg *CallMessage) Encode() []byte {
|
||||
amf := newAMFEncoder()
|
||||
amf.writeString(msg.CommandName)
|
||||
amf.writeNumber(float64(msg.TransactionId))
|
||||
|
||||
if msg.Object != nil {
|
||||
amf.encodeObject(msg.Object.(AMFObjects))
|
||||
}
|
||||
if msg.Optional != nil {
|
||||
amf.encodeObject(msg.Optional.(AMFObjects))
|
||||
}
|
||||
|
||||
return amf.Bytes()
|
||||
}
|
||||
|
||||
func (msg *CallMessage) Encode3() []byte {
|
||||
buf := new(bytes.Buffer)
|
||||
buf.WriteByte(0)
|
||||
buf.Write(msg.Encode())
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
// Create Stream Message.
|
||||
// The client sends this command to the server to create a logical channel for message communication The publishing of audio,
|
||||
// video, and metadata is carried out over stream channel created using the createStream command.
|
||||
|
||||
type CreateStreamMessage struct {
|
||||
CommandMessage
|
||||
Object interface{}
|
||||
}
|
||||
|
||||
func (msg *CreateStreamMessage) Encode() []byte {
|
||||
amf := newAMFEncoder()
|
||||
amf.writeString(msg.CommandName)
|
||||
amf.writeNumber(float64(msg.TransactionId))
|
||||
|
||||
if msg.Object != nil {
|
||||
amf.encodeObject(msg.Object.(AMFObjects))
|
||||
}
|
||||
return amf.Bytes()
|
||||
}
|
||||
|
||||
/*
|
||||
func (msg *CreateStreamMessage) Encode3() {
|
||||
msg.Encode0()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
buf.WriteByte(0)
|
||||
buf.Write(msg.RtmpBody)
|
||||
msg.RtmpBody = buf.Bytes()
|
||||
}*/
|
||||
|
||||
// The following commands can be sent on the NetStream by the client to the server:
|
||||
|
||||
// Play
|
||||
// Play2
|
||||
// DeleteStream
|
||||
// CloseStream
|
||||
// ReceiveAudio
|
||||
// ReceiveVideo
|
||||
// Publish
|
||||
// Seek
|
||||
// Pause
|
||||
// Release(37)
|
||||
// FCPublish
|
||||
|
||||
// Play Message
|
||||
// The client sends this command to the server to play a stream. A playlist can also be created using this command multiple times
|
||||
type PlayMessage struct {
|
||||
CommandMessage
|
||||
StreamName string
|
||||
Start uint64
|
||||
Duration uint64
|
||||
Rest bool
|
||||
}
|
||||
|
||||
// 命令名 -> 命令名,设置为”play”
|
||||
// 传输ID -> 0
|
||||
// 命令对象
|
||||
// 流名字 -> 要播放流的名字
|
||||
// start -> 可选的参数,以秒为单位定义开始时间.默认值为 -2,表示用户首先尝试播放流名字段中定义的直播流.
|
||||
// Duration -> 可选的参数,以秒为单位定义了回放的持续时间.默认值为 -1.-1 值意味着一个直播流会一直播放直到它不再可用或者一个录制流一直播放直到结束
|
||||
// Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush
|
||||
|
||||
func (msg *PlayMessage) Encode() []byte {
|
||||
amf := newAMFEncoder()
|
||||
amf.writeString(msg.CommandName)
|
||||
amf.writeNumber(float64(msg.TransactionId))
|
||||
amf.writeNull()
|
||||
amf.writeString(msg.StreamName)
|
||||
|
||||
if msg.Start > 0 {
|
||||
amf.writeNumber(float64(msg.Start))
|
||||
}
|
||||
if msg.Duration > 0 {
|
||||
amf.writeNumber(float64(msg.Duration))
|
||||
}
|
||||
|
||||
amf.writeBool(msg.Rest)
|
||||
return amf.Bytes()
|
||||
}
|
||||
|
||||
/*
|
||||
func (msg *PlayMessage) Encode3() {
|
||||
}*/
|
||||
|
||||
// Play2 Message
|
||||
// Unlike the play command, play2 can switch to a different bit rate stream without changing the timeline of the content played. The
|
||||
// server maintains multiple files for all supported bitrates that the client can request in play2.
|
||||
type Play2Message struct {
|
||||
CommandMessage
|
||||
StartTime uint64
|
||||
OldStreamName string
|
||||
StreamName string
|
||||
Duration uint64
|
||||
Transition string
|
||||
}
|
||||
|
||||
func (msg *Play2Message) Encode0() {
|
||||
}
|
||||
|
||||
// Delete Stream Message
|
||||
// NetStream sends the deleteStream command when the NetStream object is getting destroyed
|
||||
type CURDStreamMessage struct {
|
||||
CommandMessage
|
||||
StreamId uint32
|
||||
}
|
||||
|
||||
func (msg *CURDStreamMessage) Encode0() {
|
||||
}
|
||||
|
||||
// Receive Audio Message
|
||||
// NetStream sends the receiveAudio message to inform the server whether to send or not to send the audio to the client
|
||||
type ReceiveAVMessage struct {
|
||||
CommandMessage
|
||||
BoolFlag bool
|
||||
}
|
||||
|
||||
func (msg *ReceiveAVMessage) Encode0() {
|
||||
}
|
||||
|
||||
// Publish Message
|
||||
// The client sends the publish command to publish a named stream to the server. Using this name,
|
||||
// any client can play this stream and receive the published audio, video, and data messages
|
||||
type PublishMessage struct {
|
||||
CommandMessage
|
||||
PublishingName string
|
||||
PublishingType string
|
||||
}
|
||||
|
||||
// 命令名 -> 命令名,设置为”publish”
|
||||
// 传输ID -> 0
|
||||
// 命令对象
|
||||
// 发布名 -> 流发布的名字
|
||||
// 发布类型 -> 设置为”live”,”record”或”append”.
|
||||
|
||||
// “record”:流被发布,并且数据被录制到一个新的文件,文件被存储到服务端的服务应用的目录的一个子目录下.如果文件已经存在则重写文件.
|
||||
// “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件.
|
||||
// “live”:发布直播数据而不录制到文件
|
||||
|
||||
func (msg *PublishMessage) Encode0() {
|
||||
}
|
||||
|
||||
// Seek Message
|
||||
// The client sends the seek command to seek the offset (in milliseconds) within a media file or playlist.
|
||||
type SeekMessage struct {
|
||||
CommandMessage
|
||||
Milliseconds uint64
|
||||
}
|
||||
|
||||
func (msg *SeekMessage) Encode0() {
|
||||
}
|
||||
|
||||
// Pause Message
|
||||
// The client sends the pause command to tell the server to pause or start playing.
|
||||
type PauseMessage struct {
|
||||
CommandMessage
|
||||
Pause bool
|
||||
Milliseconds uint64
|
||||
}
|
||||
|
||||
// 命令名 -> 命令名,设置为”pause”
|
||||
// 传输ID -> 0
|
||||
// 命令对象 -> null
|
||||
// Pause/Unpause Flag -> true 或者 false,来指示暂停或者重新播放
|
||||
// milliSeconds -> 流暂停或者重新开始所在的毫秒数.这个是客户端暂停的当前流时间.当回放已恢复时,服务器端值发送带有比这个值大的 timestamp 消息
|
||||
|
||||
func (msg *PauseMessage) Encode0() {
|
||||
}
|
||||
|
||||
//
|
||||
// Response Message. Server -> Response -> Client
|
||||
//
|
||||
|
||||
//
|
||||
// Response Connect Message
|
||||
//
|
||||
type ResponseConnectMessage struct {
|
||||
CommandMessage
|
||||
Properties interface{} `json:",omitempty"`
|
||||
Infomation interface{} `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (msg *ResponseConnectMessage) Encode() []byte {
|
||||
amf := newAMFEncoder()
|
||||
amf.writeString(msg.CommandName)
|
||||
amf.writeNumber(float64(msg.TransactionId))
|
||||
|
||||
if msg.Properties != nil {
|
||||
amf.encodeObject(msg.Properties.(AMFObjects))
|
||||
}
|
||||
if msg.Infomation != nil {
|
||||
amf.encodeObject(msg.Infomation.(AMFObjects))
|
||||
}
|
||||
|
||||
return amf.Bytes()
|
||||
}
|
||||
|
||||
/*
|
||||
func (msg *ResponseConnectMessage) Encode3() {
|
||||
}*/
|
||||
|
||||
// Response Call Message
|
||||
//
|
||||
type ResponseCallMessage struct {
|
||||
CommandMessage
|
||||
Object interface{}
|
||||
Response interface{}
|
||||
}
|
||||
|
||||
func (msg *ResponseCallMessage) Encode0() []byte {
|
||||
amf := newAMFEncoder()
|
||||
amf.writeString(msg.CommandName)
|
||||
amf.writeNumber(float64(msg.TransactionId))
|
||||
|
||||
if msg.Object != nil {
|
||||
amf.encodeObject(msg.Object.(AMFObjects))
|
||||
}
|
||||
if msg.Response != nil {
|
||||
amf.encodeObject(msg.Response.(AMFObjects))
|
||||
}
|
||||
|
||||
return amf.Bytes()
|
||||
}
|
||||
|
||||
//
|
||||
// Response Create Stream Message
|
||||
//
|
||||
type ResponseCreateStreamMessage struct {
|
||||
CommandMessage
|
||||
Object interface{} `json:",omitempty"`
|
||||
StreamId uint32
|
||||
}
|
||||
|
||||
func (msg *ResponseCreateStreamMessage) Encode() []byte {
|
||||
amf := newAMFEncoder() // rtmp_amf.go
|
||||
amf.writeString(msg.CommandName)
|
||||
amf.writeNumber(float64(msg.TransactionId))
|
||||
amf.writeNull()
|
||||
amf.writeNumber(float64(msg.StreamId))
|
||||
return amf.Bytes()
|
||||
}
|
||||
|
||||
/*
|
||||
func (msg *ResponseCreateStreamMessage) Encode3() {
|
||||
}*/
|
||||
|
||||
func (msg *ResponseCreateStreamMessage) Decode0(chunk *Chunk) {
|
||||
amf := newAMFDecoder(chunk.Body)
|
||||
if obj, err := amf.decodeObject(); err == nil {
|
||||
msg.CommandName = obj.(string)
|
||||
}
|
||||
if obj, err := amf.decodeObject(); err == nil {
|
||||
msg.TransactionId = uint64(obj.(float64))
|
||||
}
|
||||
|
||||
amf.decodeObject()
|
||||
if obj, err := amf.decodeObject(); err == nil {
|
||||
msg.StreamId = uint32(obj.(float64))
|
||||
}
|
||||
}
|
||||
func (msg *ResponseCreateStreamMessage) Decode3(chunk *Chunk) {
|
||||
chunk.Body = chunk.Body[1:]
|
||||
msg.Decode0(chunk)
|
||||
}
|
||||
|
||||
//
|
||||
// Response Play Message
|
||||
//
|
||||
type ResponsePlayMessage struct {
|
||||
CommandMessage
|
||||
Object interface{} `json:",omitempty"`
|
||||
Description string
|
||||
StreamID uint32
|
||||
}
|
||||
|
||||
func (msg *ResponsePlayMessage) GetStreamID() uint32 {
|
||||
return msg.StreamID
|
||||
}
|
||||
func (msg *ResponsePlayMessage) Encode() []byte {
|
||||
amf := newAMFEncoder() // rtmp_amf.go
|
||||
amf.writeString(msg.CommandName)
|
||||
amf.writeNumber(float64(msg.TransactionId))
|
||||
amf.writeNull()
|
||||
if msg.Object != nil {
|
||||
amf.encodeObject(msg.Object.(AMFObjects))
|
||||
}
|
||||
amf.writeString(msg.Description)
|
||||
return amf.Bytes()
|
||||
}
|
||||
|
||||
/*
|
||||
func (msg *ResponsePlayMessage) Encode3() {
|
||||
}*/
|
||||
|
||||
func (msg *ResponsePlayMessage) Decode0(chunk *Chunk) {
|
||||
amf := newAMFDecoder(chunk.Body)
|
||||
if obj, err := amf.decodeObject(); err == nil {
|
||||
msg.CommandName = obj.(string)
|
||||
}
|
||||
if obj, err := amf.decodeObject(); err == nil {
|
||||
msg.TransactionId = uint64(obj.(float64))
|
||||
}
|
||||
|
||||
obj, err := amf.decodeObject()
|
||||
if err == nil && obj != nil {
|
||||
msg.Object = obj
|
||||
} else if obj, err := amf.decodeObject(); err == nil {
|
||||
msg.Object = obj
|
||||
}
|
||||
}
|
||||
func (msg *ResponsePlayMessage) Decode3(chunk *Chunk) {
|
||||
chunk.Body = chunk.Body[1:]
|
||||
msg.Decode0(chunk)
|
||||
}
|
||||
|
||||
//
|
||||
// Response Publish Message
|
||||
//
|
||||
type ResponsePublishMessage struct {
|
||||
CommandMessage
|
||||
Properties interface{} `json:",omitempty"`
|
||||
Infomation interface{} `json:",omitempty"`
|
||||
StreamID uint32
|
||||
}
|
||||
|
||||
func (msg *ResponsePublishMessage) GetStreamID() uint32 {
|
||||
return msg.StreamID
|
||||
}
|
||||
|
||||
// 命令名 -> 命令名,设置为"OnStatus"
|
||||
// 传输ID -> 0
|
||||
// 属性 -> null
|
||||
// 信息 -> level, code, description
|
||||
|
||||
func (msg *ResponsePublishMessage) Encode() []byte {
|
||||
amf := newAMFEncoder()
|
||||
amf.writeString(msg.CommandName)
|
||||
amf.writeNumber(float64(msg.TransactionId))
|
||||
amf.writeNull()
|
||||
|
||||
if msg.Properties != nil {
|
||||
amf.encodeObject(msg.Properties.(AMFObjects))
|
||||
}
|
||||
if msg.Infomation != nil {
|
||||
amf.encodeObject(msg.Infomation.(AMFObjects))
|
||||
}
|
||||
|
||||
return amf.Bytes()
|
||||
}
|
||||
|
||||
/*
|
||||
func (msg *ResponsePublishMessage) Encode3() {
|
||||
}*/
|
||||
|
||||
//
|
||||
// Response Seek Message
|
||||
//
|
||||
type ResponseSeekMessage struct {
|
||||
CommandMessage
|
||||
Description string
|
||||
}
|
||||
|
||||
func (msg *ResponseSeekMessage) Encode0() {
|
||||
}
|
||||
|
||||
//func (msg *ResponseSeekMessage) Encode3() {
|
||||
//}
|
||||
|
||||
//
|
||||
// Response Pause Message
|
||||
//
|
||||
type ResponsePauseMessage struct {
|
||||
CommandMessage
|
||||
Description string
|
||||
}
|
||||
|
||||
// 命令名 -> 命令名,设置为"OnStatus"
|
||||
// 传输ID -> 0
|
||||
// 描述
|
||||
|
||||
func (msg *ResponsePauseMessage) Encode0() {
|
||||
}
|
||||
|
||||
//func (msg *ResponsePauseMessage) Encode3() {
|
||||
//}
|
||||
|
||||
//
|
||||
// Response Message
|
||||
//
|
||||
type ResponseMessage struct {
|
||||
CommandMessage
|
||||
Properties interface{} `json:",omitempty"`
|
||||
Infomation interface{} `json:",omitempty"`
|
||||
Description string
|
||||
}
|
||||
|
||||
func (msg *ResponseMessage) Encode0() {
|
||||
}
|
||||
|
||||
//func (msg *ResponseMessage) Encode3() {
|
||||
//}
|
||||
|
||||
func (msg *ResponseMessage) Decode0(chunk *Chunk) {
|
||||
amf := newAMFDecoder(chunk.Body)
|
||||
if obj, err := amf.decodeObject(); err == nil {
|
||||
msg.CommandName = obj.(string)
|
||||
}
|
||||
if obj, err := amf.decodeObject(); err == nil {
|
||||
msg.TransactionId = uint64(obj.(float64))
|
||||
}
|
||||
}
|
||||
|
||||
// User Control Message 4.
|
||||
// The client or the server sends this message to notify the peer about the user control events.
|
||||
// For information about the message format, see Section 6.2.
|
||||
|
||||
// The following user control event types are supported:
|
||||
|
||||
// Stream Begin (=0)
|
||||
// The server sends this event to notify the client that a stream has become functional and can be
|
||||
// used for communication. By default, this event is sent on ID 0 after the application connect
|
||||
// command is successfully received from the client. The event data is 4-byte and represents
|
||||
// the stream ID of the stream that became functional.
|
||||
type StreamIDMessage struct {
|
||||
UserControlMessage
|
||||
StreamID uint32
|
||||
}
|
||||
|
||||
func (msg *StreamIDMessage) Encode() (b []byte) {
|
||||
b = make([]byte, 6)
|
||||
util.BigEndian.PutUint16(b, msg.EventType)
|
||||
util.BigEndian.PutUint32(b[2:], msg.StreamID)
|
||||
msg.EventData = b[2:]
|
||||
return
|
||||
}
|
||||
|
||||
// SetBuffer Length (=3)
|
||||
// The client sends this event to inform the server of the buffer size (in milliseconds) that is
|
||||
// used to buffer any data coming over a stream. This event is sent before the server starts |
|
||||
// processing the stream. The first 4 bytes of the event data represent the stream ID and the next |
|
||||
// 4 bytes represent the buffer length, in milliseconds.
|
||||
type SetBufferMessage struct {
|
||||
StreamIDMessage
|
||||
Millisecond uint32
|
||||
}
|
||||
|
||||
func (msg *SetBufferMessage) Encode() []byte {
|
||||
b := make([]byte, 10)
|
||||
util.BigEndian.PutUint16(b, msg.EventType)
|
||||
util.BigEndian.PutUint32(b[2:], msg.StreamID)
|
||||
util.BigEndian.PutUint32(b[6:], msg.Millisecond)
|
||||
msg.EventData = b[2:]
|
||||
return b
|
||||
}
|
||||
|
||||
// PingRequest (=6)
|
||||
// The server sends this event to test whether the client is reachable. Event data is a 4-byte
|
||||
// timestamp, representing the local server time when the server dispatched the command.
|
||||
// The client responds with PingResponse on receiving MsgPingRequest.
|
||||
type PingRequestMessage struct {
|
||||
UserControlMessage
|
||||
Timestamp uint32
|
||||
}
|
||||
|
||||
func (msg *PingRequestMessage) Encode() (b []byte) {
|
||||
b = make([]byte, 6)
|
||||
util.BigEndian.PutUint16(b, msg.EventType)
|
||||
util.BigEndian.PutUint32(b[2:], msg.Timestamp)
|
||||
msg.EventData = b[2:]
|
||||
return
|
||||
}
|
||||
|
||||
func (msg *UserControlMessage) Encode() []byte {
|
||||
b := make([]byte, 2)
|
||||
util.BigEndian.PutUint16(b, msg.EventType)
|
||||
msg.EventData = b[2:]
|
||||
return b
|
||||
}
|
731
netConnection.go
Normal file
731
netConnection.go
Normal file
@@ -0,0 +1,731 @@
|
||||
package rtmpplugin
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"github.com/Monibuca/engine/avformat"
|
||||
"github.com/Monibuca/engine/pool"
|
||||
"github.com/Monibuca/engine/util"
|
||||
"io"
|
||||
"log"
|
||||
)
|
||||
|
||||
const (
|
||||
SEND_CHUNK_SIZE_MESSAGE = "Send Chunk Size Message"
|
||||
SEND_ACK_MESSAGE = "Send Acknowledgement Message"
|
||||
SEND_ACK_WINDOW_SIZE_MESSAGE = "Send Window Acknowledgement Size Message"
|
||||
SEND_SET_PEER_BANDWIDTH_MESSAGE = "Send Set Peer Bandwidth Message"
|
||||
|
||||
SEND_STREAM_BEGIN_MESSAGE = "Send Stream Begin Message"
|
||||
SEND_SET_BUFFER_LENGTH_MESSAGE = "Send Set Buffer Lengh Message"
|
||||
SEND_STREAM_IS_RECORDED_MESSAGE = "Send Stream Is Recorded Message"
|
||||
|
||||
SEND_PING_REQUEST_MESSAGE = "Send Ping Request Message"
|
||||
SEND_PING_RESPONSE_MESSAGE = "Send Ping Response Message"
|
||||
|
||||
SEND_CONNECT_MESSAGE = "Send Connect Message"
|
||||
SEND_CONNECT_RESPONSE_MESSAGE = "Send Connect Response Message"
|
||||
|
||||
SEND_CREATE_STREAM_MESSAGE = "Send Create Stream Message"
|
||||
SEND_CREATE_STREAM_RESPONSE_MESSAGE = "Send Create Stream Response Message"
|
||||
|
||||
SEND_PLAY_MESSAGE = "Send Play Message"
|
||||
SEND_PLAY_RESPONSE_MESSAGE = "Send Play Response Message"
|
||||
|
||||
SEND_PUBLISH_RESPONSE_MESSAGE = "Send Publish Response Message"
|
||||
SEND_PUBLISH_START_MESSAGE = "Send Publish Start Message"
|
||||
|
||||
SEND_UNPUBLISH_RESPONSE_MESSAGE = "Send Unpublish Response Message"
|
||||
|
||||
SEND_AUDIO_MESSAGE = "Send Audio Message"
|
||||
SEND_FULL_AUDIO_MESSAGE = "Send Full Audio Message"
|
||||
SEND_VIDEO_MESSAGE = "Send Video Message"
|
||||
SEND_FULL_VDIEO_MESSAGE = "Send Full Video Message"
|
||||
)
|
||||
|
||||
func newConnectResponseMessageData(objectEncoding float64) (amfobj AMFObjects) {
|
||||
amfobj = newAMFObjects()
|
||||
amfobj["fmsVer"] = "monibuca/1.0"
|
||||
amfobj["capabilities"] = 31
|
||||
amfobj["mode"] = 1
|
||||
amfobj["Author"] = "dexter"
|
||||
amfobj["level"] = Level_Status
|
||||
amfobj["code"] = NetConnection_Connect_Success
|
||||
amfobj["objectEncoding"] = uint64(objectEncoding)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func newPublishResponseMessageData(streamid uint32, code, level string) (amfobj AMFObjects) {
|
||||
amfobj = newAMFObjects()
|
||||
amfobj["code"] = code
|
||||
amfobj["level"] = level
|
||||
amfobj["streamid"] = streamid
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func newPlayResponseMessageData(streamid uint32, code, level string) (amfobj AMFObjects) {
|
||||
amfobj = newAMFObjects()
|
||||
amfobj["code"] = code
|
||||
amfobj["level"] = level
|
||||
amfobj["streamid"] = streamid
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
type NetConnection struct {
|
||||
*bufio.ReadWriter
|
||||
bandwidth uint32
|
||||
readSeqNum uint32 // 当前读的字节
|
||||
writeSeqNum uint32 // 当前写的字节
|
||||
totalWrite uint32 // 总共写了多少字节
|
||||
totalRead uint32 // 总共读了多少字节
|
||||
writeChunkSize int
|
||||
readChunkSize int
|
||||
incompleteRtmpBody map[uint32][]byte // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来
|
||||
nextStreamID func(uint32) uint32 // 下一个流ID
|
||||
streamID uint32 // 流ID
|
||||
rtmpHeader map[uint32]*ChunkHeader // RtmpHeader
|
||||
objectEncoding float64
|
||||
appName string
|
||||
}
|
||||
|
||||
func (conn *NetConnection) OnConnect() (err error) {
|
||||
var msg *Chunk
|
||||
if msg, err = conn.RecvMessage(); err == nil {
|
||||
defer chunkMsgPool.Put(msg)
|
||||
if connect, ok := msg.MsgData.(*CallMessage); ok {
|
||||
if connect.CommandName == "connect" {
|
||||
app := DecodeAMFObject(connect.Object, "app") // 客户端要连接到的服务应用名
|
||||
objectEncoding := DecodeAMFObject(connect.Object, "objectEncoding") // AMF编码方法
|
||||
if objectEncoding != nil {
|
||||
conn.objectEncoding = objectEncoding.(float64)
|
||||
}
|
||||
conn.appName = app.(string)
|
||||
log.Printf("app:%v,objectEncoding:%v", app, objectEncoding)
|
||||
err = conn.SendMessage(SEND_ACK_WINDOW_SIZE_MESSAGE, uint32(512<<10))
|
||||
err = conn.SendMessage(SEND_SET_PEER_BANDWIDTH_MESSAGE, uint32(512<<10))
|
||||
err = conn.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
|
||||
err = conn.SendMessage(SEND_CONNECT_RESPONSE_MESSAGE, conn.objectEncoding)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
func (conn *NetConnection) SendMessage(message string, args interface{}) error {
|
||||
switch message {
|
||||
case SEND_CHUNK_SIZE_MESSAGE:
|
||||
size, ok := args.(uint32)
|
||||
if !ok {
|
||||
return errors.New(SEND_CHUNK_SIZE_MESSAGE + ", The parameter only one(size uint32)!")
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(size))
|
||||
case SEND_ACK_MESSAGE:
|
||||
num, ok := args.(uint32)
|
||||
if !ok {
|
||||
return errors.New(SEND_ACK_MESSAGE + ", The parameter only one(number uint32)!")
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_ACK, Uint32Message(num))
|
||||
case SEND_ACK_WINDOW_SIZE_MESSAGE:
|
||||
size, ok := args.(uint32)
|
||||
if !ok {
|
||||
return errors.New(SEND_ACK_WINDOW_SIZE_MESSAGE + ", The parameter only one(size uint32)!")
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_ACK_SIZE, Uint32Message(size))
|
||||
case SEND_SET_PEER_BANDWIDTH_MESSAGE:
|
||||
size, ok := args.(uint32)
|
||||
if !ok {
|
||||
return errors.New(SEND_SET_PEER_BANDWIDTH_MESSAGE + ", The parameter only one(size uint32)!")
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_BANDWIDTH, &SetPeerBandwidthMessage{
|
||||
AcknowledgementWindowsize: size,
|
||||
LimitType: byte(2),
|
||||
})
|
||||
case SEND_STREAM_BEGIN_MESSAGE:
|
||||
if args != nil {
|
||||
return errors.New(SEND_STREAM_BEGIN_MESSAGE + ", The parameter is nil")
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: RTMP_USER_STREAM_BEGIN}, conn.streamID})
|
||||
case SEND_STREAM_IS_RECORDED_MESSAGE:
|
||||
if args != nil {
|
||||
return errors.New(SEND_STREAM_IS_RECORDED_MESSAGE + ", The parameter is nil")
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: RTMP_USER_STREAM_IS_RECORDED}, conn.streamID})
|
||||
|
||||
case SEND_SET_BUFFER_LENGTH_MESSAGE:
|
||||
if args != nil {
|
||||
return errors.New(SEND_SET_BUFFER_LENGTH_MESSAGE + ", The parameter is nil")
|
||||
}
|
||||
m := new(SetBufferMessage)
|
||||
m.EventType = RTMP_USER_SET_BUFFLEN
|
||||
m.Millisecond = 100
|
||||
m.StreamID = conn.streamID
|
||||
return conn.writeMessage(RTMP_MSG_USER_CONTROL, m)
|
||||
case SEND_PING_REQUEST_MESSAGE:
|
||||
if args != nil {
|
||||
return errors.New(SEND_PING_REQUEST_MESSAGE + ", The parameter is nil")
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_USER_CONTROL, &UserControlMessage{EventType: RTMP_USER_PING_REQUEST})
|
||||
case SEND_PING_RESPONSE_MESSAGE:
|
||||
if args != nil {
|
||||
return errors.New(SEND_PING_RESPONSE_MESSAGE + ", The parameter is nil")
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_USER_CONTROL, &UserControlMessage{EventType: RTMP_USER_PING_RESPONSE})
|
||||
case SEND_CREATE_STREAM_MESSAGE:
|
||||
if args != nil {
|
||||
return errors.New(SEND_CREATE_STREAM_MESSAGE + ", The parameter is nil")
|
||||
}
|
||||
|
||||
m := &CreateStreamMessage{}
|
||||
m.CommandName = "createStream"
|
||||
m.TransactionId = 1
|
||||
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||||
case SEND_CREATE_STREAM_RESPONSE_MESSAGE:
|
||||
tid, ok := args.(uint64)
|
||||
if !ok {
|
||||
return errors.New(SEND_CREATE_STREAM_RESPONSE_MESSAGE + ", The parameter only one(TransactionId uint64)!")
|
||||
}
|
||||
m := &ResponseCreateStreamMessage{}
|
||||
m.CommandName = Response_Result
|
||||
m.TransactionId = tid
|
||||
m.StreamId = conn.streamID
|
||||
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||||
case SEND_PLAY_MESSAGE:
|
||||
data, ok := args.(map[interface{}]interface{})
|
||||
if !ok {
|
||||
errors.New(SEND_PLAY_MESSAGE + ", The parameter is map[interface{}]interface{}")
|
||||
}
|
||||
m := new(PlayMessage)
|
||||
m.CommandName = "play"
|
||||
m.TransactionId = 1
|
||||
for i, v := range data {
|
||||
if i == "StreamPath" {
|
||||
m.StreamName = v.(string)
|
||||
} else if i == "Start" {
|
||||
m.Start = v.(uint64)
|
||||
} else if i == "Duration" {
|
||||
m.Duration = v.(uint64)
|
||||
} else if i == "Rest" {
|
||||
m.Rest = v.(bool)
|
||||
}
|
||||
}
|
||||
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||||
case SEND_PLAY_RESPONSE_MESSAGE:
|
||||
data, ok := args.(AMFObjects)
|
||||
if !ok {
|
||||
errors.New(SEND_PLAY_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
|
||||
}
|
||||
|
||||
obj := newAMFObjects()
|
||||
var streamID uint32
|
||||
|
||||
for i, v := range data {
|
||||
switch i {
|
||||
case "code", "level":
|
||||
obj[i] = v
|
||||
case "streamid":
|
||||
if t, ok := v.(uint32); ok {
|
||||
streamID = t
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
obj["clientid"] = 1
|
||||
|
||||
m := new(ResponsePlayMessage)
|
||||
m.CommandName = Response_OnStatus
|
||||
m.TransactionId = 0
|
||||
m.Object = obj
|
||||
m.StreamID = streamID
|
||||
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||||
case SEND_CONNECT_RESPONSE_MESSAGE:
|
||||
data := newConnectResponseMessageData(args.(float64))
|
||||
//if !ok {
|
||||
// errors.New(SEND_CONNECT_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
|
||||
//}
|
||||
|
||||
//pro := newAMFObjects()
|
||||
info := newAMFObjects()
|
||||
|
||||
//for i, v := range data {
|
||||
// switch i {
|
||||
// case "fmsVer", "capabilities", "mode", "Author", "level", "code", "objectEncoding":
|
||||
// pro[i] = v
|
||||
// }
|
||||
//}
|
||||
m := new(ResponseConnectMessage)
|
||||
m.CommandName = Response_Result
|
||||
m.TransactionId = 1
|
||||
m.Properties = data
|
||||
m.Infomation = info
|
||||
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||||
case SEND_CONNECT_MESSAGE:
|
||||
data, ok := args.(AMFObjects)
|
||||
if !ok {
|
||||
errors.New(SEND_CONNECT_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
|
||||
}
|
||||
|
||||
obj := newAMFObjects()
|
||||
info := newAMFObjects()
|
||||
|
||||
for i, v := range data {
|
||||
switch i {
|
||||
case "videoFunction", "objectEncoding", "fpad", "flashVer", "capabilities", "pageUrl", "swfUrl", "tcUrl", "videoCodecs", "app", "audioCodecs":
|
||||
obj[i] = v
|
||||
}
|
||||
}
|
||||
|
||||
m := new(CallMessage)
|
||||
m.CommandName = "connect"
|
||||
m.TransactionId = 1
|
||||
m.Object = obj
|
||||
m.Optional = info
|
||||
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||||
case SEND_PUBLISH_RESPONSE_MESSAGE, SEND_PUBLISH_START_MESSAGE:
|
||||
data, ok := args.(AMFObjects)
|
||||
if !ok {
|
||||
errors.New(SEND_CONNECT_MESSAGE + "or" + SEND_PUBLISH_START_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
|
||||
}
|
||||
|
||||
info := newAMFObjects()
|
||||
var streamID uint32
|
||||
|
||||
for i, v := range data {
|
||||
switch i {
|
||||
case "code", "level":
|
||||
info[i] = v
|
||||
case "streamid":
|
||||
if t, ok := v.(uint32); ok {
|
||||
streamID = t
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info["clientid"] = 1
|
||||
|
||||
m := new(ResponsePublishMessage)
|
||||
m.CommandName = Response_OnStatus
|
||||
m.TransactionId = 0
|
||||
m.Infomation = info
|
||||
m.StreamID = streamID
|
||||
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||||
case SEND_UNPUBLISH_RESPONSE_MESSAGE:
|
||||
case SEND_FULL_AUDIO_MESSAGE:
|
||||
audio, ok := args.(*avformat.SendPacket)
|
||||
if !ok {
|
||||
errors.New(message + ", The parameter is AVPacket")
|
||||
}
|
||||
|
||||
return conn.sendAVMessage(audio, true, true)
|
||||
case SEND_AUDIO_MESSAGE:
|
||||
audio, ok := args.(*avformat.SendPacket)
|
||||
if !ok {
|
||||
errors.New(message + ", The parameter is AVPacket")
|
||||
}
|
||||
|
||||
return conn.sendAVMessage(audio, true, false)
|
||||
case SEND_FULL_VDIEO_MESSAGE:
|
||||
video, ok := args.(*avformat.SendPacket)
|
||||
if !ok {
|
||||
errors.New(message + ", The parameter is AVPacket")
|
||||
}
|
||||
|
||||
return conn.sendAVMessage(video, false, true)
|
||||
case SEND_VIDEO_MESSAGE:
|
||||
{
|
||||
video, ok := args.(*avformat.SendPacket)
|
||||
if !ok {
|
||||
errors.New(message + ", The parameter is AVPacket")
|
||||
}
|
||||
|
||||
return conn.sendAVMessage(video, false, false)
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("send message no exist")
|
||||
}
|
||||
|
||||
// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间
|
||||
// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值
|
||||
// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同
|
||||
func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool, isFirst bool) error {
|
||||
if conn.writeSeqNum > conn.bandwidth {
|
||||
conn.totalWrite += conn.writeSeqNum
|
||||
conn.writeSeqNum = 0
|
||||
conn.SendMessage(SEND_ACK_MESSAGE, conn.totalWrite)
|
||||
conn.SendMessage(SEND_PING_REQUEST_MESSAGE, nil)
|
||||
}
|
||||
|
||||
var err error
|
||||
var mark []byte
|
||||
var need []byte
|
||||
var head *ChunkHeader
|
||||
|
||||
if isAudio {
|
||||
head = newRtmpHeader(RTMP_CSID_AUDIO, av.Timestamp, uint32(len(av.Packet.Payload)), RTMP_MSG_AUDIO, conn.streamID, 0)
|
||||
} else {
|
||||
head = newRtmpHeader(RTMP_CSID_VIDEO, av.Timestamp, uint32(len(av.Packet.Payload)), RTMP_MSG_VIDEO, conn.streamID, 0)
|
||||
}
|
||||
|
||||
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
|
||||
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
|
||||
// 当Chunk Type为0时(即Chunk12),
|
||||
if isFirst {
|
||||
mark, need, err = encodeChunk12(head, av.Packet.Payload, conn.writeChunkSize)
|
||||
} else {
|
||||
mark, need, err = encodeChunk8(head, av.Packet.Payload, conn.writeChunkSize)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = conn.Write(mark)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = conn.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.writeSeqNum += uint32(len(mark))
|
||||
|
||||
// 如果音视频数据太大,一次发送不完,那么在这里进行分割(data + Chunk Basic Header(1))
|
||||
for need != nil && len(need) > 0 {
|
||||
mark, need, err = encodeChunk1(head, need, conn.writeChunkSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = conn.Write(mark)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = conn.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.writeSeqNum += uint32(len(mark))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
|
||||
head, err := conn.ReadByte()
|
||||
conn.readSeqNum++
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ChunkStreamID := uint32(head & 0x3f) // 0011 1111
|
||||
ChunkType := (head & 0xc0) >> 6 // 1100 0000
|
||||
|
||||
// 如果块流ID为0,1的话,就需要计算.
|
||||
ChunkStreamID, err = conn.readChunkStreamID(ChunkStreamID)
|
||||
if err != nil {
|
||||
return nil, errors.New("get chunk stream id error :" + err.Error())
|
||||
}
|
||||
|
||||
h, ok := conn.rtmpHeader[ChunkStreamID]
|
||||
if !ok {
|
||||
h = new(ChunkHeader)
|
||||
h.ChunkStreamID = ChunkStreamID
|
||||
h.ChunkType = ChunkType
|
||||
conn.rtmpHeader[ChunkStreamID] = h
|
||||
}
|
||||
currentBody, ok := conn.incompleteRtmpBody[ChunkStreamID]
|
||||
if ChunkType != 3 && ok {
|
||||
// 如果块类型不为3,那么这个rtmp的body应该为空.
|
||||
return nil, errors.New("incompleteRtmpBody error")
|
||||
}
|
||||
|
||||
chunkHead, err := conn.readChunkType(h, ChunkType)
|
||||
if err != nil {
|
||||
return nil, errors.New("get chunk type error :" + err.Error())
|
||||
}
|
||||
msgLen := int(chunkHead.MessageLength)
|
||||
if !ok {
|
||||
currentBody = (pool.GetSlice(msgLen))[:0]
|
||||
conn.incompleteRtmpBody[ChunkStreamID] = currentBody
|
||||
}
|
||||
|
||||
markRead := len(currentBody)
|
||||
needRead := conn.readChunkSize
|
||||
unRead := msgLen - markRead
|
||||
if unRead < needRead {
|
||||
needRead = unRead
|
||||
}
|
||||
if n, err := io.ReadFull(conn, currentBody[markRead:needRead+markRead]); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
markRead += n
|
||||
conn.readSeqNum += uint32(n)
|
||||
}
|
||||
currentBody = currentBody[:markRead]
|
||||
conn.incompleteRtmpBody[ChunkStreamID] = currentBody
|
||||
|
||||
// 如果读完了一个完整的块,那么就返回这个消息,没读完继续递归读块.
|
||||
if markRead == msgLen {
|
||||
|
||||
msg := chunkMsgPool.Get().(*Chunk)
|
||||
msg.Body = currentBody
|
||||
msg.ChunkHeader = chunkHead.Clone()
|
||||
GetRtmpMessage(msg)
|
||||
delete(conn.incompleteRtmpBody, ChunkStreamID)
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
return conn.readChunk()
|
||||
}
|
||||
|
||||
func (conn *NetConnection) readChunkStreamID(csid uint32) (chunkStreamID uint32, err error) {
|
||||
chunkStreamID = csid
|
||||
|
||||
switch csid {
|
||||
case 0:
|
||||
{
|
||||
u8, err := conn.ReadByte()
|
||||
conn.readSeqNum++
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
chunkStreamID = 64 + uint32(u8)
|
||||
}
|
||||
case 1:
|
||||
{
|
||||
u16_0, err1 := conn.ReadByte()
|
||||
if err1 != nil {
|
||||
return 0, err1
|
||||
}
|
||||
u16_1, err1 := conn.ReadByte()
|
||||
if err1 != nil {
|
||||
return 0, err1
|
||||
}
|
||||
conn.readSeqNum += 2
|
||||
chunkStreamID = 64 + uint32(u16_0) + (uint32(u16_1) << 8)
|
||||
}
|
||||
}
|
||||
|
||||
return chunkStreamID, nil
|
||||
}
|
||||
|
||||
func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head *ChunkHeader, err error) {
|
||||
switch chunkType {
|
||||
case 0:
|
||||
{
|
||||
// Timestamp 3 bytes
|
||||
b := pool.GetSlice(3)
|
||||
if _, err := io.ReadFull(conn, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 3
|
||||
h.Timestamp = util.BigEndian.Uint24(b) //type = 0的时间戳为绝对时间,其他的都为相对时间
|
||||
|
||||
// Message Length 3 bytes
|
||||
if _, err = io.ReadFull(conn, b); err != nil { // 读取Message Length,这里的长度指的是一条信令或者一帧视频数据或音频数据的长度,而不是Chunk data的长度.
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 3
|
||||
h.MessageLength = util.BigEndian.Uint24(b)
|
||||
pool.RecycleSlice(b)
|
||||
// Message Type ID 1 bytes
|
||||
v, err := conn.ReadByte() // 读取Message Type ID
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum++
|
||||
h.MessageTypeID = v
|
||||
|
||||
// Message Stream ID 4bytes
|
||||
bb := pool.GetSlice(4)
|
||||
if _, err = io.ReadFull(conn, bb); err != nil { // 读取Message Stream ID
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 4
|
||||
h.MessageStreamID = util.LittleEndian.Uint32(bb)
|
||||
|
||||
// ExtendTimestamp 4 bytes
|
||||
if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求
|
||||
if _, err = io.ReadFull(conn, bb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 4
|
||||
h.ExtendTimestamp = util.BigEndian.Uint32(bb)
|
||||
}
|
||||
pool.RecycleSlice(bb)
|
||||
}
|
||||
case 1:
|
||||
{
|
||||
// Timestamp 3 bytes
|
||||
b := pool.GetSlice(3)
|
||||
if _, err = io.ReadFull(conn, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 3
|
||||
h.ChunkType = chunkType
|
||||
h.Timestamp = util.BigEndian.Uint24(b)
|
||||
|
||||
// Message Length 3 bytes
|
||||
if _, err = io.ReadFull(conn, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 3
|
||||
h.MessageLength = util.BigEndian.Uint24(b)
|
||||
pool.RecycleSlice(b)
|
||||
// Message Type ID 1 bytes
|
||||
v, err := conn.ReadByte()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum++
|
||||
h.MessageTypeID = v
|
||||
|
||||
// ExtendTimestamp 4 bytes
|
||||
if h.Timestamp == 0xffffff {
|
||||
bb := pool.GetSlice(4)
|
||||
if _, err := io.ReadFull(conn, bb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 4
|
||||
h.ExtendTimestamp = util.BigEndian.Uint32(bb)
|
||||
pool.RecycleSlice(bb)
|
||||
}
|
||||
}
|
||||
case 2:
|
||||
{
|
||||
// Timestamp 3 bytes
|
||||
b := pool.GetSlice(3)
|
||||
if _, err = io.ReadFull(conn, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 3
|
||||
h.ChunkType = chunkType
|
||||
h.Timestamp = util.BigEndian.Uint24(b)
|
||||
pool.RecycleSlice(b)
|
||||
// ExtendTimestamp 4 bytes
|
||||
if h.Timestamp == 0xffffff {
|
||||
bb := pool.GetSlice(4)
|
||||
if _, err := io.ReadFull(conn, bb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.readSeqNum += 4
|
||||
h.ExtendTimestamp = util.BigEndian.Uint32(bb)
|
||||
pool.RecycleSlice(bb)
|
||||
}
|
||||
}
|
||||
case 3:
|
||||
{
|
||||
h.ChunkType = chunkType
|
||||
}
|
||||
}
|
||||
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) {
|
||||
if conn.readSeqNum >= conn.bandwidth {
|
||||
conn.totalRead += conn.readSeqNum
|
||||
conn.readSeqNum = 0
|
||||
//sendAck(conn, conn.totalRead)
|
||||
conn.SendMessage(SEND_ACK_MESSAGE, conn.totalRead)
|
||||
}
|
||||
|
||||
msg, err = conn.readChunk()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 如果消息是类型是用户控制消息,那么我们就简单做一些相应的处理,
|
||||
// 然后继续读取下一个消息.如果不是用户控制消息,就将消息返回就好.
|
||||
messageType := msg.MessageTypeID
|
||||
if RTMP_MSG_CHUNK_SIZE <= messageType && messageType <= RTMP_MSG_EDGE {
|
||||
switch messageType {
|
||||
case RTMP_MSG_CHUNK_SIZE:
|
||||
m := msg.MsgData.(Uint32Message)
|
||||
conn.readChunkSize = int(m)
|
||||
return conn.RecvMessage()
|
||||
case RTMP_MSG_ABORT:
|
||||
m := msg.MsgData.(Uint32Message)
|
||||
delete(conn.incompleteRtmpBody, uint32(m))
|
||||
return conn.RecvMessage()
|
||||
case RTMP_MSG_ACK, RTMP_MSG_EDGE:
|
||||
return conn.RecvMessage()
|
||||
case RTMP_MSG_USER_CONTROL:
|
||||
if _, ok := msg.MsgData.(*PingRequestMessage); ok {
|
||||
//sendPingResponse(conn)
|
||||
conn.SendMessage(SEND_PING_RESPONSE_MESSAGE, nil)
|
||||
}
|
||||
return conn.RecvMessage()
|
||||
case RTMP_MSG_ACK_SIZE:
|
||||
m := msg.MsgData.(Uint32Message)
|
||||
conn.bandwidth = uint32(m)
|
||||
return conn.RecvMessage()
|
||||
case RTMP_MSG_BANDWIDTH:
|
||||
m := msg.MsgData.(*SetPeerBandwidthMessage)
|
||||
conn.bandwidth = m.AcknowledgementWindowsize
|
||||
return conn.RecvMessage()
|
||||
}
|
||||
}
|
||||
|
||||
return msg, err
|
||||
}
|
||||
func (conn *NetConnection) writeMessage(t byte, msg RtmpMessage) error {
|
||||
body := msg.Encode()
|
||||
head := newChunkHeader(t)
|
||||
head.MessageLength = uint32(len(body))
|
||||
if sid, ok := msg.(HaveStreamID); ok {
|
||||
head.MessageStreamID = sid.GetStreamID()
|
||||
}
|
||||
if conn.writeSeqNum > conn.bandwidth {
|
||||
conn.totalWrite += conn.writeSeqNum
|
||||
conn.writeSeqNum = 0
|
||||
conn.SendMessage(SEND_ACK_MESSAGE, conn.totalWrite)
|
||||
conn.SendMessage(SEND_PING_REQUEST_MESSAGE, nil)
|
||||
}
|
||||
|
||||
mark, need, err := encodeChunk12(head, body, conn.writeChunkSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = conn.Write(mark)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = conn.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.writeSeqNum += uint32(len(mark))
|
||||
|
||||
for need != nil && len(need) > 0 {
|
||||
mark, need, err = encodeChunk1(head, need, conn.writeChunkSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = conn.Write(mark)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = conn.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.writeSeqNum += uint32(len(mark))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
189
netStream.go
Normal file
189
netStream.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package rtmpplugin
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
. "github.com/Monibuca/engine"
|
||||
"github.com/Monibuca/engine/avformat"
|
||||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type RTMP struct {
|
||||
InputStream
|
||||
}
|
||||
|
||||
func ListenRtmp(addr string) error {
|
||||
defer log.Println("rtmp server start!")
|
||||
// defer fmt.Println("server start!")
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var tempDelay time.Duration
|
||||
for {
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||
if tempDelay == 0 {
|
||||
tempDelay = 5 * time.Millisecond
|
||||
} else {
|
||||
tempDelay *= 2
|
||||
}
|
||||
if max := 1 * time.Second; tempDelay > max {
|
||||
tempDelay = max
|
||||
}
|
||||
fmt.Printf("rtmp: Accept error: %v; retrying in %v", err, tempDelay)
|
||||
time.Sleep(tempDelay)
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
tempDelay = 0
|
||||
go processRtmp(conn)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var gstreamid = uint32(64)
|
||||
|
||||
func processRtmp(conn net.Conn) {
|
||||
var room *Room
|
||||
streams := make(map[uint32]*OutputStream)
|
||||
defer func() {
|
||||
conn.Close()
|
||||
if room != nil {
|
||||
room.Cancel()
|
||||
}
|
||||
}()
|
||||
var totalDuration uint32
|
||||
nc := &NetConnection{
|
||||
ReadWriter: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
|
||||
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
|
||||
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
|
||||
rtmpHeader: make(map[uint32]*ChunkHeader),
|
||||
incompleteRtmpBody: make(map[uint32][]byte),
|
||||
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
|
||||
nextStreamID: func(u uint32) uint32 {
|
||||
gstreamid++
|
||||
return gstreamid
|
||||
},
|
||||
}
|
||||
/* Handshake */
|
||||
if MayBeError(Handshake(nc.ReadWriter)) {
|
||||
return
|
||||
}
|
||||
if MayBeError(nc.OnConnect()) {
|
||||
return
|
||||
}
|
||||
for {
|
||||
if msg, err := nc.RecvMessage(); err == nil {
|
||||
if msg.MessageLength <= 0 {
|
||||
continue
|
||||
}
|
||||
switch msg.MessageTypeID {
|
||||
case RTMP_MSG_AMF0_COMMAND:
|
||||
if msg.MsgData == nil {
|
||||
break
|
||||
}
|
||||
cmd := msg.MsgData.(Commander).GetCommand()
|
||||
switch cmd.CommandName {
|
||||
case "createStream":
|
||||
nc.streamID = nc.nextStreamID(msg.ChunkStreamID)
|
||||
err = nc.SendMessage(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId)
|
||||
if MayBeError(err) {
|
||||
return
|
||||
}
|
||||
case "publish":
|
||||
pm := msg.MsgData.(*PublishMessage)
|
||||
streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0]
|
||||
pub := new(RTMP)
|
||||
if pub.Publish(streamPath, pub) {
|
||||
pub.FirstScreen = make([]*avformat.AVPacket, 0)
|
||||
room = pub.Room
|
||||
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
|
||||
err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
|
||||
} else {
|
||||
err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, Level_Error, NetStream_Publish_BadName))
|
||||
}
|
||||
case "play":
|
||||
pm := msg.MsgData.(*PlayMessage)
|
||||
streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0]
|
||||
nc.writeChunkSize = 512
|
||||
stream := &OutputStream{SendHandler: func(packet *avformat.SendPacket) (err error) {
|
||||
switch true {
|
||||
case packet.Packet.IsADTS:
|
||||
tagPacket := avformat.NewAVPacket(RTMP_MSG_AUDIO)
|
||||
tagPacket.Payload = avformat.ADTSToAudioSpecificConfig(packet.Packet.Payload)
|
||||
err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, tagPacket)
|
||||
ADTSLength := 7 + (int(packet.Packet.Payload[1]&1) << 1)
|
||||
if len(packet.Packet.Payload) > ADTSLength {
|
||||
contentPacket := avformat.NewAVPacket(RTMP_MSG_AUDIO)
|
||||
contentPacket.Timestamp = packet.Timestamp
|
||||
contentPacket.Payload = make([]byte, len(packet.Packet.Payload)-ADTSLength+2)
|
||||
contentPacket.Payload[0] = 0xAF
|
||||
contentPacket.Payload[1] = 0x01 //raw AAC
|
||||
copy(contentPacket.Payload[2:], packet.Packet.Payload[ADTSLength:])
|
||||
err = nc.SendMessage(SEND_AUDIO_MESSAGE, contentPacket)
|
||||
}
|
||||
case packet.Packet.IsAVCSequence:
|
||||
err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, packet)
|
||||
case packet.Packet.Type == RTMP_MSG_VIDEO:
|
||||
err = nc.SendMessage(SEND_VIDEO_MESSAGE, packet)
|
||||
case packet.Packet.IsAACSequence:
|
||||
err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, packet)
|
||||
case packet.Packet.Type == RTMP_MSG_AUDIO:
|
||||
err = nc.SendMessage(SEND_AUDIO_MESSAGE, packet)
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
stream.Type = "RTMP"
|
||||
stream.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID)
|
||||
err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize))
|
||||
err = nc.SendMessage(SEND_STREAM_IS_RECORDED_MESSAGE, nil)
|
||||
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
|
||||
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status))
|
||||
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status))
|
||||
if err == nil {
|
||||
streams[nc.streamID] = stream
|
||||
go stream.Play(streamPath)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
case "closeStream":
|
||||
cm := msg.MsgData.(*CURDStreamMessage)
|
||||
if stream, ok := streams[cm.StreamId]; ok {
|
||||
stream.Cancel()
|
||||
delete(streams, cm.StreamId)
|
||||
}
|
||||
}
|
||||
case RTMP_MSG_AUDIO:
|
||||
pkt := avformat.NewAVPacket(RTMP_MSG_AUDIO)
|
||||
if msg.Timestamp == 0xffffff {
|
||||
totalDuration += msg.ExtendTimestamp
|
||||
} else {
|
||||
totalDuration += msg.Timestamp // 绝对时间戳
|
||||
}
|
||||
pkt.Timestamp = totalDuration
|
||||
pkt.Payload = msg.Body
|
||||
room.PushAudio(pkt)
|
||||
case RTMP_MSG_VIDEO:
|
||||
pkt := avformat.NewAVPacket(RTMP_MSG_VIDEO)
|
||||
if msg.Timestamp == 0xffffff {
|
||||
totalDuration += msg.ExtendTimestamp
|
||||
} else {
|
||||
totalDuration += msg.Timestamp // 绝对时间戳
|
||||
}
|
||||
pkt.Timestamp = totalDuration
|
||||
pkt.Payload = msg.Body
|
||||
room.PushVideo(pkt)
|
||||
}
|
||||
msg.Recycle()
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user