mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2025-10-07 01:03:07 +08:00
377 lines
7.9 KiB
Go
377 lines
7.9 KiB
Go
package rtmp
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/AlexxIT/go2rtc/pkg/flv/amf"
|
|
)
|
|
|
|
const (
|
|
TypeSetPacketSize = 1
|
|
TypeServerBandwidth = 5
|
|
TypeClientBandwidth = 6
|
|
TypeAudio = 8
|
|
TypeVideo = 9
|
|
TypeData = 18
|
|
TypeCommand = 20
|
|
)
|
|
|
|
type Conn struct {
|
|
App string
|
|
Stream string
|
|
Intent string
|
|
|
|
rdPacketSize uint32
|
|
wrPacketSize uint32
|
|
|
|
chunks map[byte]*chunk
|
|
streamID byte
|
|
url string
|
|
|
|
conn net.Conn
|
|
rd io.Reader
|
|
wr io.Writer
|
|
|
|
rdBuf []byte
|
|
wrBuf []byte
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func (c *Conn) Close() error {
|
|
return c.conn.Close()
|
|
}
|
|
|
|
func (c *Conn) readResponse(wait func(items []any) bool) ([]any, error) {
|
|
for {
|
|
msgType, _, b, err := c.readMessage()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
//log.Printf("[rtmp] type=%d data=%s", msgType, b)
|
|
|
|
switch msgType {
|
|
case TypeSetPacketSize:
|
|
c.rdPacketSize = binary.BigEndian.Uint32(b)
|
|
case TypeCommand:
|
|
items, _ := amf.NewReader(b).ReadItems()
|
|
if wait(items) {
|
|
return items, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type chunk struct {
|
|
conn *Conn
|
|
rawTime uint32
|
|
dataSize uint32
|
|
tagType byte
|
|
streamID uint32
|
|
timeMS uint32
|
|
}
|
|
|
|
func (c *chunk) readHeader(typ byte) error {
|
|
switch typ {
|
|
case 0: // 12 byte header (full header)
|
|
b, err := c.conn.readSize(11)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.rawTime = Uint24(b)
|
|
c.dataSize = Uint24(b[3:])
|
|
c.tagType = b[6]
|
|
c.streamID = binary.LittleEndian.Uint32(b[7:])
|
|
c.timeMS = c.readExtendedTime()
|
|
|
|
case 1: // 8 bytes - like type b00, not including message ID (4 last bytes)
|
|
b, err := c.conn.readSize(7)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.rawTime = Uint24(b)
|
|
c.dataSize = Uint24(b[3:]) // msgdatalen
|
|
c.tagType = b[6] // msgtypeid
|
|
c.timeMS += c.readExtendedTime()
|
|
|
|
case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included
|
|
b, err := c.conn.readSize(3)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.rawTime = Uint24(b) // timestamp
|
|
c.timeMS += c.readExtendedTime()
|
|
|
|
case 3: // 1 byte - only the Basic Header is included
|
|
// use here hdr from previous msg with same session ID (sid)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *chunk) readExtendedTime() uint32 {
|
|
if c.rawTime == 0xFFFFFF {
|
|
if b, err := c.conn.readSize(4); err == nil {
|
|
return binary.BigEndian.Uint32(b)
|
|
}
|
|
}
|
|
return c.rawTime
|
|
}
|
|
|
|
//var ErrNotImplemented = errors.New("rtmp: not implemented")
|
|
|
|
func (c *Conn) readMessage() (byte, uint32, []byte, error) {
|
|
b, err := c.readSize(1) // doesn't support big chunkID!!!
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
|
|
hdrType := b[0] >> 6
|
|
chunkID := b[0] & 0b111111
|
|
|
|
// storing header information for support header type 3
|
|
ch, ok := c.chunks[chunkID]
|
|
if !ok {
|
|
ch = &chunk{conn: c}
|
|
c.chunks[chunkID] = ch
|
|
}
|
|
|
|
if err = ch.readHeader(hdrType); err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
|
|
//log.Printf("[rtmp] hdr=%d chunkID=%d timeMS=%d size=%d tagType=%d streamID=%d", hdrType, chunkID, ch.timeMS, ch.dataSize, ch.tagType, ch.streamID)
|
|
|
|
// 1. Response zero size
|
|
if ch.dataSize == 0 {
|
|
return ch.tagType, ch.timeMS, nil, nil
|
|
}
|
|
|
|
data := make([]byte, ch.dataSize)
|
|
|
|
// 2. Response small packet
|
|
if ch.dataSize <= c.rdPacketSize {
|
|
if _, err = io.ReadFull(c.rd, data); err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
return ch.tagType, ch.timeMS, data, nil
|
|
}
|
|
|
|
// 3. Response big packet
|
|
var i0 uint32
|
|
for i1 := c.rdPacketSize; i1 < ch.dataSize; i1 += c.rdPacketSize {
|
|
if _, err = io.ReadFull(c.rd, data[i0:i1]); err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
|
|
// hopefully this will be hdrType=3 with same chunkID
|
|
if _, err = c.readSize(1); err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
|
|
_ = ch.readExtendedTime()
|
|
|
|
i0 = i1
|
|
}
|
|
|
|
if _, err = io.ReadFull(c.rd, data[i0:]); err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
|
|
return ch.tagType, ch.timeMS, data, nil
|
|
}
|
|
|
|
func (c *Conn) writeMessage(chunkID, tagType byte, timeMS uint32, payload []byte) error {
|
|
c.mu.Lock()
|
|
c.resetBuffer()
|
|
|
|
b := payload
|
|
size := uint32(len(b))
|
|
|
|
if size > c.wrPacketSize {
|
|
c.appendType0(chunkID, tagType, timeMS, size, b[:c.wrPacketSize])
|
|
|
|
for {
|
|
b = b[c.wrPacketSize:]
|
|
if uint32(len(b)) > c.wrPacketSize {
|
|
c.appendType3(chunkID, b[:c.wrPacketSize])
|
|
} else {
|
|
c.appendType3(chunkID, b)
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
c.appendType0(chunkID, tagType, timeMS, size, b)
|
|
}
|
|
|
|
//log.Printf("%d %2d %5d %6d %.32x", chunkID, tagType, timeMS, size, payload)
|
|
|
|
_, err := c.wr.Write(c.wrBuf)
|
|
c.mu.Unlock()
|
|
return err
|
|
}
|
|
|
|
func (c *Conn) resetBuffer() {
|
|
c.wrBuf = c.wrBuf[:0]
|
|
}
|
|
|
|
func (c *Conn) appendType0(chunkID, tagType byte, timeMS, size uint32, payload []byte) {
|
|
// TODO: timeMS more than 24 bit
|
|
c.wrBuf = append(c.wrBuf,
|
|
chunkID,
|
|
byte(timeMS>>16), byte(timeMS>>8), byte(timeMS),
|
|
byte(size>>16), byte(size>>8), byte(size),
|
|
tagType,
|
|
c.streamID, 0, 0, 0, // little endian streamID
|
|
)
|
|
c.wrBuf = append(c.wrBuf, payload...)
|
|
}
|
|
|
|
func (c *Conn) appendType3(chunkID byte, payload []byte) {
|
|
c.wrBuf = append(c.wrBuf, 3<<6|chunkID)
|
|
c.wrBuf = append(c.wrBuf, payload...)
|
|
}
|
|
|
|
func (c *Conn) writePacketSize() error {
|
|
b := binary.BigEndian.AppendUint32(nil, c.wrPacketSize)
|
|
return c.writeMessage(2, TypeSetPacketSize, 0, b)
|
|
}
|
|
|
|
func (c *Conn) writeConnect() error {
|
|
b := amf.EncodeItems("connect", 1, map[string]any{
|
|
"app": c.App,
|
|
"flashVer": "FMLE/3.0 (compatible; FMSc/1.0)",
|
|
"tcUrl": c.url,
|
|
})
|
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
|
return err
|
|
}
|
|
|
|
v, err := c.readResponse(func(items []any) bool {
|
|
return len(items) >= 3 && items[0] == "_result" && items[1] == float64(1)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
code := getString(v, 3, "code")
|
|
if code != "NetConnection.Connect.Success" {
|
|
return fmt.Errorf("rtmp: wrong response %#v", v)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) writeReleaseStream() error {
|
|
b := amf.EncodeItems("releaseStream", 2, nil, c.Stream)
|
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
|
return err
|
|
}
|
|
b = amf.EncodeItems("FCPublish", 3, nil, c.Stream)
|
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (c *Conn) writeCreateStream() error {
|
|
b := amf.EncodeItems("createStream", 4, nil)
|
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
|
return err
|
|
}
|
|
|
|
v, err := c.readResponse(func(items []any) bool {
|
|
return len(items) >= 3 && items[0] == "_result" && items[1] == float64(4)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(v) == 4 {
|
|
if f, ok := v[3].(float64); ok {
|
|
c.streamID = byte(f)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("rtmp: wrong response %#v", v)
|
|
}
|
|
|
|
func (c *Conn) writePublish() error {
|
|
b := amf.EncodeItems("publish", 5, nil, c.Stream, "live")
|
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
|
return err
|
|
}
|
|
|
|
// YouTube can response with "onBWDone 0"
|
|
v, err := c.readResponse(func(items []any) bool {
|
|
return len(items) >= 3 && items[0] == "onStatus"
|
|
})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
code := getString(v, 3, "code")
|
|
if code != "NetStream.Publish.Start" {
|
|
return fmt.Errorf("rtmp: wrong response %#v", v)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) writePlay() error {
|
|
b := amf.EncodeItems("play", 5, nil, c.Stream)
|
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Reolink response with ID=0, other software respose with ID=5
|
|
v, err := c.readResponse(func(items []any) bool {
|
|
return len(items) >= 3 && items[0] == "onStatus"
|
|
})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
code := getString(v, 3, "code")
|
|
if !strings.HasPrefix(code, "NetStream.Play.") {
|
|
return fmt.Errorf("rtmp: wrong response %#v", v)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) readSize(n uint32) ([]byte, error) {
|
|
b := make([]byte, n)
|
|
if _, err := io.ReadFull(c.rd, b); err != nil {
|
|
return nil, err
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
func PutUint24(b []byte, v uint32) {
|
|
_ = b[2]
|
|
b[0] = byte(v >> 16)
|
|
b[1] = byte(v >> 8)
|
|
b[2] = byte(v)
|
|
}
|
|
|
|
func Uint24(b []byte) uint32 {
|
|
_ = b[2]
|
|
return uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2])
|
|
}
|
|
|
|
func getString(v []any, i int, key string) string {
|
|
if len(v) <= i {
|
|
return ""
|
|
}
|
|
if v, ok := v[i].(map[string]any); ok {
|
|
if s, ok := v[key].(string); ok {
|
|
return s
|
|
}
|
|
}
|
|
return ""
|
|
}
|