First version

Header finished
This commit is contained in:
zhangpeihao
2013-03-14 15:50:13 +08:00
commit 0c4a818fd4
8 changed files with 792 additions and 0 deletions

22
.gitattributes vendored Normal file
View File

@@ -0,0 +1,22 @@
# Auto detect text files and perform LF normalization
* text=auto
# Custom for Visual Studio
*.cs diff=csharp
*.sln merge=union
*.csproj merge=union
*.vbproj merge=union
*.fsproj merge=union
*.dbproj merge=union
# Standard to msysgit
*.doc diff=astextplain
*.DOC diff=astextplain
*.docx diff=astextplain
*.DOCX diff=astextplain
*.dot diff=astextplain
*.DOT diff=astextplain
*.pdf diff=astextplain
*.PDF diff=astextplain
*.rtf diff=astextplain
*.RTF diff=astextplain

163
.gitignore vendored Normal file
View File

@@ -0,0 +1,163 @@
#################
## Eclipse
#################
*.pydevproject
.project
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.classpath
.settings/
.loadpath
# External tool builders
.externalToolBuilders/
# Locally stored "Eclipse launch configurations"
*.launch
# CDT-specific
.cproject
# PDT-specific
.buildpath
#################
## Visual Studio
#################
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
# User-specific files
*.suo
*.user
*.sln.docstates
# Build results
[Dd]ebug/
[Rr]elease/
*_i.c
*_p.c
*.ilk
*.meta
*.obj
*.pch
*.pdb
*.pgc
*.pgd
*.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.vspscc
.builds
*.dotCover
## TODO: If you have NuGet Package Restore enabled, uncomment this
#packages/
# Visual C++ cache files
ipch/
*.aps
*.ncb
*.opensdf
*.sdf
# Visual Studio profiler
*.psess
*.vsp
# ReSharper is a .NET coding add-in
_ReSharper*
# Installshield output folder
[Ee]xpress
# DocProject is a documentation generator add-in
DocProject/buildhelp/
DocProject/Help/*.HxT
DocProject/Help/*.HxC
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/Html2
DocProject/Help/html
# Click-Once directory
publish
# Others
[Bb]in
[Oo]bj
sql
TestResults
*.Cache
ClientBin
stylecop.*
~$*
*.dbmdl
Generated_Code #added for RIA/Silverlight projects
# Backup & report files from converting an old project file to a newer
# Visual Studio version. Backup files are not needed, because we have git ;-)
_UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
############
## Windows
############
# Windows image file caches
Thumbs.db
# Folder config file
Desktop.ini
#############
## Python
#############
*.py[co]
# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
#Translations
*.mo
#Mr Developer
.mr.developer.cfg
# Mac crap
.DS_Store

3
.travis.yml Normal file
View File

@@ -0,0 +1,3 @@
language: go
script:
- curl https://raw.github.com/daaku/go.travis/master/install | sh

22
LICENSE Normal file
View File

@@ -0,0 +1,22 @@
MIT License Terms
=================
Copyright (c) 2013 zhang peihao
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

12
README.md Normal file
View File

@@ -0,0 +1,12 @@
GoRTMP [![Build Status](https://secure.travis-ci.org/zhangpeihao/gortmp.png)](http://travis-ci.org/zhangpeihao/gortmp)
======
RTMP protocol implementation.
Spec:
* RTMP - http://www.adobe.com/devnet/rtmp.html
* AMF0 - http://download.macromedia.com/pub/labs/amf/amf0_spec_121207.pdf
* AMF3 - http://download.macromedia.com/pub/labs/amf/amf3_spec_121207.pdf
Todo:

11
defines.go Normal file
View File

@@ -0,0 +1,11 @@
// Copyright 2013, zhangpeihao All rights reserved.
package rtmp
// Chunk Message Header - "fmt" field values
const (
HEADER_FMT_FULL = 0x00
HEADER_FMT_SAME_STREAM = 0x01
HEADER_FMT_SAME_LENGTH_AND_STREAM = 0x02
HEADER_FMT_CONTINUATION = 0x03
)

343
header.go Normal file
View File

@@ -0,0 +1,343 @@
// Copyright 2013, zhangpeihao All rights reserved.
package rtmp
import (
"bufio"
"encoding/binary"
"errors"
"io"
)
// RTMP Chunk Header
//
// The header is broken down into three parts:
//
// | Basic header|Chunk Msg Header|Extended Time Stamp| Chunk Data |
//
// Chunk basic header: 1 to 3 bytes
//
// This field encodes the chunk stream ID and the chunk type. Chunk
// type determines the format of the encoded message header. The
// length depends entirely on the chunk stream ID, which is a
// variable-length field.
//
// Chunk message header: 0, 3, 7, or 11 bytes
//
// This field encodes information about the message being sent
// (whether in whole or in part). The length can be determined using
// the chunk type specified in the chunk header.
//
// Extended timestamp: 0 or 4 bytes
//
// This field MUST be sent when the normal timsestamp is set to
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
// anything else. So for values less than 0xffffff the normal
// timestamp field SHOULD be used in which case the extended timestamp
// MUST NOT be present. For values greater than or equal to 0xffffff
// the normal timestamp field MUST NOT be used and MUST be set to
// 0xffffff and the extended timestamp MUST be sent.
type Header struct {
// Basic Header
Fmt uint8
ChunkStreamID uint32
// Chunk Message Header
Timestamp uint32
MessageLength uint32
MessageTypeID uint8
MessageStreamID uint32
// Extended Timestamp
ExtendedTimestamp uint32
}
// Read Base Header from io.Reader
// High level protocol can use chunk stream ID to query the previous header instance.
func ReadBaseHeader(r io.Reader) (fmt uint8, csi uint32, err error) {
rbuf := bufio.NewReader(r)
var b byte
b, err = rbuf.ReadByte()
if err != nil {
return
}
fmt = uint8(b >> 6)
b = b & 0x3f
switch b {
case 0:
// Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
// field. ID is computed as (the second byte + 64).
b, err = rbuf.ReadByte()
if err != nil {
return
}
csi = uint32(64) + uint32(b)
case 1:
// Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
// this field. ID is computed as ((the third byte)*256 + the second byte
// + 64).
b, err = rbuf.ReadByte()
if err != nil {
return
}
csi = uint32(64) + uint32(b)
b, err = rbuf.ReadByte()
if err != nil {
return
}
csi += uint32(b) * 256
default:
// Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
// field.
csi = uint32(b)
}
return
}
// Read new chunk stream header from io.Reader
func (header *Header) ReadHeader(r io.Reader, fmt uint8, csi uint32) (err error) {
header.Fmt = fmt
header.ChunkStreamID = csi
rbuf := bufio.NewReader(r)
var b byte
tmpBuf := make([]byte, 4)
switch header.Fmt {
case HEADER_FMT_FULL:
// Chunks of Type 0 are 11 bytes long. This type MUST be used at the
// start of a chunk stream, and whenever the stream timestamp goes
// backward (e.g., because of a backward seek).
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp |message length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message length (cont) |message type id| msg stream id |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message stream id (cont) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// Figure 9 Chunk Message Header Type 0
_, err = io.ReadAtLeast(rbuf, tmpBuf[1:], 3)
if err != nil {
return
}
header.Timestamp = binary.BigEndian.Uint32(tmpBuf)
_, err = io.ReadAtLeast(rbuf, tmpBuf[1:], 3)
if err != nil {
return
}
header.MessageLength = binary.BigEndian.Uint32(tmpBuf)
b, err = rbuf.ReadByte()
if err != nil {
return
}
header.MessageTypeID = uint8(b)
_, err = io.ReadAtLeast(rbuf, tmpBuf, 4)
if err != nil {
return
}
header.MessageStreamID = binary.BigEndian.Uint32(tmpBuf)
case HEADER_FMT_SAME_STREAM:
// Chunks of Type 1 are 7 bytes long. The message stream ID is not
// included; this chunk takes the same stream ID as the preceding chunk.
// Streams with variable-sized messages (for example, many video
// formats) SHOULD use this format for the first chunk of each new
// message after the first.
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp delta |message length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message length (cont) |message type id|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// Figure 10 Chunk Message Header Type 1
_, err = io.ReadAtLeast(rbuf, tmpBuf[1:], 3)
if err != nil {
return
}
header.Timestamp = binary.BigEndian.Uint32(tmpBuf)
_, err = io.ReadAtLeast(rbuf, tmpBuf[1:], 3)
if err != nil {
return
}
header.MessageLength = binary.BigEndian.Uint32(tmpBuf)
b, err = rbuf.ReadByte()
if err != nil {
return
}
header.MessageTypeID = uint8(b)
case HEADER_FMT_SAME_LENGTH_AND_STREAM:
// Chunks of Type 2 are 3 bytes long. Neither the stream ID nor the
// message length is included; this chunk has the same stream ID and
// message length as the preceding chunk. Streams with constant-sized
// messages (for example, some audio and data formats) SHOULD use this
// format for the first chunk of each message after the first.
//
// 0 1 2
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp delta |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// Figure 11 Chunk Message Header Type 2
_, err = io.ReadAtLeast(rbuf, tmpBuf[1:], 3)
if err != nil {
return
}
header.Timestamp = binary.BigEndian.Uint32(tmpBuf)
case HEADER_FMT_CONTINUATION:
// Chunks of Type 3 have no header. Stream ID, message length and
// timestamp delta are not present; chunks of this type take values from
// the preceding chunk. When a single message is split into chunks, all
// chunks of a message except the first one, SHOULD use this type. Refer
// to example 2 in section 6.2.2. Stream consisting of messages of
// exactly the same size, stream ID and spacing in time SHOULD use this
// type for all chunks after chunk of Type 2. Refer to example 1 in
// section 6.2.1. If the delta between the first message and the second
// message is same as the time stamp of first message, then chunk of
// type 3 would immediately follow the chunk of type 0 as there is no
// need for a chunk of type 2 to register the delta. If Type 3 chunk
// follows a Type 0 chunk, then timestamp delta for this Type 3 chunk is
// the same as the timestamp of Type 0 chunk.
}
// [Extended Timestamp]
// This field is transmitted only when the normal time stamp in the
// chunk message header is set to 0x00ffffff. If normal time stamp is
// set to any value less than 0x00ffffff, this field MUST NOT be
// present. This field MUST NOT be present if the timestamp field is not
// present. Type 3 chunks MUST NOT have this field.
// !!!!!! crtmpserver set this field in Type 3 !!!!!!
// Todo: Test with FMS
if header.Fmt != HEADER_FMT_CONTINUATION {
if header.Timestamp >= 0xffffff {
_, err = io.ReadAtLeast(rbuf, tmpBuf, 4)
if err != nil {
return
}
header.ExtendedTimestamp = binary.BigEndian.Uint32(tmpBuf)
} else {
header.ExtendedTimestamp = 0
}
}
return
}
// Check header is absolute
func (header *Header) IsAbsolute() bool {
return header.Fmt == HEADER_FMT_FULL
}
// Encode header into io.Writer
func (header *Header) Write(w io.Writer) (n int, err error) {
wbuf := bufio.NewWriter(w)
// Write fmt & Chunk stream ID
switch {
case header.ChunkStreamID <= 63:
err = wbuf.WriteByte(byte((header.Fmt << 6) | byte(header.ChunkStreamID)))
if err != nil {
return
}
n += 1
case header.ChunkStreamID <= 319:
err = wbuf.WriteByte(header.Fmt << 6)
if err != nil {
return
}
n += 1
err = wbuf.WriteByte(byte(header.ChunkStreamID - 64))
if err != nil {
return
}
n += 1
case header.ChunkStreamID <= 65599:
err = wbuf.WriteByte((header.Fmt << 6) | 0x01)
if err != nil {
return
}
n += 1
tmp := uint16(header.ChunkStreamID - 64)
err = binary.Write(wbuf, binary.BigEndian, &tmp)
if err != nil {
return
}
n += 2
default:
return n, errors.New("Unsupport chunk stream ID large then 65599")
}
tmpBuf := make([]byte, 4)
var m int
switch header.Fmt {
case HEADER_FMT_FULL:
// Timestamp
binary.BigEndian.PutUint32(tmpBuf, header.Timestamp)
m, err = wbuf.Write(tmpBuf[1:])
if err != nil {
return
}
n += m
// Message Length
binary.BigEndian.PutUint32(tmpBuf, header.MessageLength)
m, err = wbuf.Write(tmpBuf[1:])
if err != nil {
return
}
n += m
// Message Type
err = wbuf.WriteByte(header.MessageTypeID)
if err != nil {
return
}
n += 1
// Message Stream ID
err = binary.Write(wbuf, binary.BigEndian, &(header.MessageStreamID))
if err != nil {
return
}
n += 4
case HEADER_FMT_SAME_STREAM:
// Timestamp
binary.BigEndian.PutUint32(tmpBuf, header.Timestamp)
m, err = wbuf.Write(tmpBuf[1:])
if err != nil {
return
}
n += m
// Message Length
binary.BigEndian.PutUint32(tmpBuf, header.MessageLength)
m, err = wbuf.Write(tmpBuf[1:])
if err != nil {
return
}
n += m
// Message Type
err = wbuf.WriteByte(header.MessageTypeID)
if err != nil {
return
}
n += 1
case HEADER_FMT_SAME_LENGTH_AND_STREAM:
// Timestamp
binary.BigEndian.PutUint32(tmpBuf, header.Timestamp)
m, err = wbuf.Write(tmpBuf[1:])
if err != nil {
return
}
n += m
case HEADER_FMT_CONTINUATION:
}
// Type 3 chunks MUST NOT have Extended timestamp????
// Todo: Test with FMS
if header.Timestamp >= 0xffffff && header.Fmt != HEADER_FMT_CONTINUATION {
// Extended Timestamp
err = binary.Write(wbuf, binary.BigEndian, &(header.ExtendedTimestamp))
if err != nil {
return
}
n += 4
}
err = wbuf.Flush()
return
}

216
header_test.go Normal file
View File

@@ -0,0 +1,216 @@
package rtmp
import (
"bytes"
"testing"
)
type TestBaseHeaderCase struct {
name string
data []byte
fmt uint8
csi uint32
}
var testBaseHeaderCases = []TestBaseHeaderCase{
{"Chunk strea ID(64 - 319)", []byte{byte(0x00 | 0x00), 0x00}, 0x00, 64},
{"Chunk strea ID(64 - 319)", []byte{byte(0x00 | 0x00), 0x01}, 0x00, 65},
{"Chunk strea ID(64 - 319)", []byte{byte(0x00 | 0x00), 0xff}, 0x00, 319},
{"Chunk strea ID(64 - 65599)", []byte{byte(0x00 | 0x01), 0x00, 0x01}, 0x00, 320},
{"Chunk strea ID(64 - 65599)", []byte{byte(0x00 | 0x01), 0xff, 0xff}, 0x00, 65599},
{"Chunk strea ID 2", []byte{byte(0x00 | 0x02)}, 0x00, 2},
{"Chunk strea ID(3 - 63)", []byte{byte(0x00 | 0x03)}, 0x00, 3},
{"Chunk strea ID(3 - 63)", []byte{byte(0x00 | 0x3f)}, 0x00, 63},
{"fmt 1", []byte{byte(0x40 | 0x03)}, 0x01, 3},
{"fmt 2", []byte{byte(0x80 | 0x03)}, 0x02, 3},
{"fmt 3", []byte{byte(0xC0 | 0x03)}, 0x03, 3},
}
func TestReadBaseHeader(t *testing.T) {
for _, c := range testBaseHeaderCases {
buf := bytes.NewReader(c.data)
fmt, csi, err := ReadBaseHeader(buf)
if err != nil {
t.Errorf("TestReadBaseHeader(%s - fmt: %d, csi: %d) error: %s", c.name, c.fmt, c.csi, err.Error())
continue
}
if fmt != c.fmt || csi != c.csi {
t.Errorf("TestReadBaseHeader(%s - fmt: %d, csi: %d) got: fmt: %d, csi: %d", c.name, c.fmt, c.csi, fmt, csi)
}
}
}
type TestHeaderCase struct {
name string
data []byte
baseHeader []byte
header Header
absolute bool
}
var testHeaderCases = []TestHeaderCase{
{
"Type 0",
[]byte{
0x00, 0x00, 0x01, // Timestamp
0x00, 0x00, 0x02, // Message Length
0x03, // Message Type ID
0x00, 0x00, 0x00, 0x04, // Message Stream ID
},
[]byte{0x03},
Header{
Fmt: 0x00,
ChunkStreamID: 3,
Timestamp: 1,
MessageLength: 2,
MessageTypeID: 3,
MessageStreamID: 4,
ExtendedTimestamp: 0,
},
true,
},
{
"Type 0 - with externed timestamp",
[]byte{
0xff, 0xff, 0xff, // Timestamp
0x00, 0x00, 0x02, // Message Length
0x03, // Message Type ID
0x00, 0x00, 0x00, 0x04, // Message Stream ID
0x10, 0x00, 0x00, 0x00, // Externed timestamp
},
[]byte{0x03},
Header{
Fmt: 0x00,
ChunkStreamID: 3,
Timestamp: 0xffffff,
MessageLength: 2,
MessageTypeID: 3,
MessageStreamID: 4,
ExtendedTimestamp: 0x10000000,
},
true,
},
{
"Type 1",
[]byte{
0x00, 0x00, 0x11, // Timestamp
0x00, 0x00, 0x12, // Message Length
0x13, // Message Type ID
},
[]byte{0x43},
Header{
Fmt: 0x01,
ChunkStreamID: 3,
Timestamp: 0x11,
MessageLength: 0x12,
MessageTypeID: 0x13,
MessageStreamID: 0x04,
ExtendedTimestamp: 0,
},
false,
},
{
"Type 1 - with externed timestamp",
[]byte{
0xff, 0xff, 0xff, // Timestamp
0x00, 0x00, 0x12, // Message Length
0x13, // Message Type ID
0x11, 0x00, 0x00, 0x00, // Externed timestamp
},
[]byte{0x43},
Header{
Fmt: 0x01,
ChunkStreamID: 3,
Timestamp: 0xffffff,
MessageLength: 0x12,
MessageTypeID: 0x13,
MessageStreamID: 0x04,
ExtendedTimestamp: 0x11000000,
},
false,
},
{
"Type 2",
[]byte{
0x00, 0x00, 0x21, // Timestamp
},
[]byte{0x83},
Header{
Fmt: 0x02,
ChunkStreamID: 3,
Timestamp: 0x21,
MessageLength: 0x12,
MessageTypeID: 0x13,
MessageStreamID: 0x04,
ExtendedTimestamp: 0,
},
false,
},
{
"Type 2 - with externed timestamp",
[]byte{
0xff, 0xff, 0xff, // Timestamp
0x21, 0x00, 0x00, 0x00, // Externed timestamp
},
[]byte{0x83},
Header{
Fmt: 0x02,
ChunkStreamID: 3,
Timestamp: 0xffffff,
MessageLength: 0x12,
MessageTypeID: 0x13,
MessageStreamID: 0x04,
ExtendedTimestamp: 0x21000000,
},
false,
},
{
"Type 3",
[]byte{},
[]byte{0xc3},
Header{
Fmt: 0x03,
ChunkStreamID: 3,
Timestamp: 0xffffff,
MessageLength: 0x12,
MessageTypeID: 0x13,
MessageStreamID: 0x04,
ExtendedTimestamp: 0x21000000,
},
false,
},
}
func TestReadHeader(t *testing.T) {
header := &Header{}
for _, c := range testHeaderCases {
buf := bytes.NewReader(c.data)
err := header.ReadHeader(buf, c.header.Fmt, c.header.ChunkStreamID)
if err != nil {
t.Errorf("TestReadHeader(%s)\n\t%v\nerror: %s", c.name, c.header, err.Error())
continue
}
if *header != c.header {
t.Errorf("TestReadHeader(%s)\n\texpect: %v\n\tgot:%v", c.name, c.header, *header)
}
}
}
func TestWriteHeader(t *testing.T) {
for _, c := range testHeaderCases {
buf := new(bytes.Buffer)
n, err := c.header.Write(buf)
if err != nil {
t.Errorf("TestWriteHeader(%s)\n\t%v\nerror: %s", c.name, c.header, err.Error())
continue
}
expect := append(c.baseHeader, c.data...)
if n != len(expect) {
t.Errorf("TestWriteHeader(%s)\n\texpect length: %d\n\tn:%d", c.name, len(expect), n)
}
got := buf.Bytes()
if !bytes.Equal(expect, got) {
t.Errorf("TestWriteHeader(%s)\n\texpect: % x\n\tgot:% x", c.name, expect, got)
}
}
}