mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2025-10-04 16:02:43 +08:00
Add RTMP server and publish to RMTP logic
This commit is contained in:
@@ -1,34 +1,130 @@
|
|||||||
package rtmp
|
package rtmp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/AlexxIT/go2rtc/internal/api"
|
"github.com/AlexxIT/go2rtc/internal/api"
|
||||||
|
"github.com/AlexxIT/go2rtc/internal/app"
|
||||||
"github.com/AlexxIT/go2rtc/internal/streams"
|
"github.com/AlexxIT/go2rtc/internal/streams"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/flv"
|
"github.com/AlexxIT/go2rtc/pkg/flv"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/rtmp"
|
"github.com/AlexxIT/go2rtc/pkg/rtmp"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Init() {
|
func Init() {
|
||||||
|
var conf struct {
|
||||||
|
Mod struct {
|
||||||
|
Listen string `yaml:"listen" json:"listen"`
|
||||||
|
} `yaml:"rtmp"`
|
||||||
|
}
|
||||||
|
|
||||||
|
app.LoadConfig(&conf)
|
||||||
|
|
||||||
|
log = app.GetLogger("rtsp")
|
||||||
|
|
||||||
streams.HandleFunc("rtmp", streamsHandle)
|
streams.HandleFunc("rtmp", streamsHandle)
|
||||||
streams.HandleFunc("rtmps", streamsHandle)
|
streams.HandleFunc("rtmps", streamsHandle)
|
||||||
streams.HandleFunc("rtmpx", streamsHandle)
|
streams.HandleFunc("rtmpx", streamsHandle)
|
||||||
|
|
||||||
api.HandleFunc("api/stream.flv", apiHandle)
|
api.HandleFunc("api/stream.flv", apiHandle)
|
||||||
|
|
||||||
|
streams.HandleConsumerFunc("rtmp", streamsConsumerHandle)
|
||||||
|
streams.HandleConsumerFunc("rtmps", streamsConsumerHandle)
|
||||||
|
streams.HandleConsumerFunc("rtmpx", streamsConsumerHandle)
|
||||||
|
|
||||||
|
address := conf.Mod.Listen
|
||||||
|
if address == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ln, err := net.Listen("tcp", address)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Caller().Send()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Str("addr", address).Msg("[rtmp] listen")
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
conn, err := ln.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err = tcpHandle(conn); err != nil {
|
||||||
|
log.Error().Err(err).Caller().Send()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func tcpHandle(conn net.Conn) error {
|
||||||
|
client, err := rtmp.NewServer(conn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = client.ReadCommands(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch client.Intent {
|
||||||
|
case rtmp.CommandPlay:
|
||||||
|
stream := streams.Get(client.App)
|
||||||
|
if stream == nil {
|
||||||
|
return errors.New("stream not found: " + client.App)
|
||||||
|
}
|
||||||
|
|
||||||
|
cons := flv.NewConsumer()
|
||||||
|
if err = stream.AddConsumer(cons); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer stream.RemoveConsumer(cons)
|
||||||
|
|
||||||
|
if err = client.WritePlayStart(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _ = cons.WriteTo(client)
|
||||||
|
|
||||||
|
case rtmp.CommandPublish:
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var log zerolog.Logger
|
||||||
|
|
||||||
func streamsHandle(url string) (core.Producer, error) {
|
func streamsHandle(url string) (core.Producer, error) {
|
||||||
client, err := rtmp.Dial(url)
|
client, err := rtmp.DialPlay(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func streamsConsumerHandle(url string) (core.Consumer, func(), error) {
|
||||||
|
cons := flv.NewConsumer()
|
||||||
|
run := func() {
|
||||||
|
wr, err := rtmp.DialPublish(url)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, err = cons.WriteTo(wr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cons, run, nil
|
||||||
|
}
|
||||||
|
|
||||||
func apiHandle(w http.ResponseWriter, r *http.Request) {
|
func apiHandle(w http.ResponseWriter, r *http.Request) {
|
||||||
if r.Method != "POST" {
|
if r.Method != "POST" {
|
||||||
outputFLV(w, r)
|
outputFLV(w, r)
|
||||||
|
5
pkg/rtmp/README.md
Normal file
5
pkg/rtmp/README.md
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
## Useful links
|
||||||
|
|
||||||
|
- https://en.wikipedia.org/wiki/Flash_Video
|
||||||
|
- https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol
|
||||||
|
- https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf
|
163
pkg/rtmp/client.go
Normal file
163
pkg/rtmp/client.go
Normal file
@@ -0,0 +1,163 @@
|
|||||||
|
package rtmp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/flv"
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||||
|
)
|
||||||
|
|
||||||
|
func DialPlay(rawURL string) (core.Producer, error) {
|
||||||
|
u, err := url.Parse(rawURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := tcp.Dial(u, core.ConnDialTimeout)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := NewClient(conn, u)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = client.play(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return flv.Open(client)
|
||||||
|
}
|
||||||
|
|
||||||
|
func DialPublish(rawURL string) (io.Writer, error) {
|
||||||
|
u, err := url.Parse(rawURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := tcp.Dial(u, core.ConnDialTimeout)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := NewClient(conn, u)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = client.publish(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClient(conn net.Conn, u *url.URL) (*Conn, error) {
|
||||||
|
c := &Conn{
|
||||||
|
url: u.String(),
|
||||||
|
|
||||||
|
conn: conn,
|
||||||
|
rd: bufio.NewReaderSize(conn, core.BufferSize),
|
||||||
|
wr: conn,
|
||||||
|
|
||||||
|
chunks: map[uint8]*header{},
|
||||||
|
|
||||||
|
rdPacketSize: 128,
|
||||||
|
wrPacketSize: 4096, // OBS - 4096, Reolink - 4096
|
||||||
|
}
|
||||||
|
|
||||||
|
if args := strings.Split(u.Path, "/"); len(args) >= 2 {
|
||||||
|
c.App = args[1]
|
||||||
|
if len(args) >= 3 {
|
||||||
|
c.Stream = args[2]
|
||||||
|
if u.RawQuery != "" {
|
||||||
|
c.Stream += "?" + u.RawQuery
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.clienHandshake(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := c.writePacketSize(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) clienHandshake() error {
|
||||||
|
// simple handshake without real random and check response
|
||||||
|
b := make([]byte, 1+1536)
|
||||||
|
b[0] = 0x03
|
||||||
|
// write C0+C1
|
||||||
|
if _, err := c.conn.Write(b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// read S0+S1
|
||||||
|
if _, err := io.ReadFull(c.rd, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// write S1
|
||||||
|
if _, err := c.conn.Write(b[1:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// read C1, skip check
|
||||||
|
if _, err := io.ReadFull(c.rd, b[1:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) play() error {
|
||||||
|
c.rdBuf = []byte{
|
||||||
|
'F', 'L', 'V', // signature
|
||||||
|
1, // version
|
||||||
|
0, // flags (has video/audio)
|
||||||
|
0, 0, 0, 9, // header size
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.writeConnect(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := c.writeCreateStream(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := c.writePlay(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) publish() error {
|
||||||
|
if err := c.writeConnect(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := c.writeReleaseStream(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := c.writeCreateStream(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := c.writePublish(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
_, _, _, err := c.readMessage()
|
||||||
|
//log.Printf("!!! %d %d %.30x", msgType, timeMS, b)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
353
pkg/rtmp/conn.go
Normal file
353
pkg/rtmp/conn.go
Normal file
@@ -0,0 +1,353 @@
|
|||||||
|
package rtmp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/flv/amf"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
TypeSetPacketSize = 1
|
||||||
|
TypeServerBandwidth = 5
|
||||||
|
TypeClientBandwidth = 6
|
||||||
|
TypeAudio = 8
|
||||||
|
TypeVideo = 9
|
||||||
|
TypeData = 18
|
||||||
|
TypeCommand = 20
|
||||||
|
)
|
||||||
|
|
||||||
|
type Conn struct {
|
||||||
|
App string
|
||||||
|
Stream string
|
||||||
|
Intent string
|
||||||
|
|
||||||
|
rdPacketSize uint32
|
||||||
|
wrPacketSize uint32
|
||||||
|
|
||||||
|
chunks map[byte]*header
|
||||||
|
streamID byte
|
||||||
|
url string
|
||||||
|
|
||||||
|
conn net.Conn
|
||||||
|
rd io.Reader
|
||||||
|
wr io.Writer
|
||||||
|
|
||||||
|
rdBuf []byte
|
||||||
|
wrBuf []byte
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Close() error {
|
||||||
|
return c.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) readResponse(transID float64) ([]any, error) {
|
||||||
|
for {
|
||||||
|
msgType, _, b, err := c.readMessage()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch msgType {
|
||||||
|
case TypeSetPacketSize:
|
||||||
|
c.rdPacketSize = binary.BigEndian.Uint32(b)
|
||||||
|
case TypeCommand:
|
||||||
|
items, _ := amf.NewReader(b).ReadItems()
|
||||||
|
if len(items) >= 3 && items[1] == transID {
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type header struct {
|
||||||
|
timeMS uint32
|
||||||
|
dataSize uint32
|
||||||
|
tagType byte
|
||||||
|
streamID uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
//var ErrNotImplemented = errors.New("rtmp: not implemented")
|
||||||
|
|
||||||
|
func (c *Conn) readMessage() (byte, uint32, []byte, error) {
|
||||||
|
b, err := c.readSize(1) // doesn't support big chunkID!!!
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
hdrType := b[0] >> 6
|
||||||
|
chunkID := b[0] & 0b111111
|
||||||
|
|
||||||
|
// storing header information for support header type 3
|
||||||
|
hdr, ok := c.chunks[chunkID]
|
||||||
|
if !ok {
|
||||||
|
hdr = &header{}
|
||||||
|
c.chunks[chunkID] = hdr
|
||||||
|
}
|
||||||
|
|
||||||
|
switch hdrType {
|
||||||
|
case 0: // 12 byte header (full header)
|
||||||
|
if b, err = c.readSize(11); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
_ = b[7]
|
||||||
|
hdr.timeMS = Uint24(b)
|
||||||
|
hdr.dataSize = Uint24(b[3:])
|
||||||
|
hdr.tagType = b[6]
|
||||||
|
hdr.streamID = binary.LittleEndian.Uint32(b[7:])
|
||||||
|
|
||||||
|
case 1: // 8 bytes - like type b00, not including message ID (4 last bytes)
|
||||||
|
if b, err = c.readSize(7); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
_ = b[6]
|
||||||
|
hdr.timeMS = Uint24(b) // timestamp
|
||||||
|
hdr.dataSize = Uint24(b[3:]) // msgdatalen
|
||||||
|
hdr.tagType = b[6] // msgtypeid
|
||||||
|
|
||||||
|
case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included
|
||||||
|
if b, err = c.readSize(3); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
hdr.timeMS = Uint24(b) // timestamp
|
||||||
|
|
||||||
|
case 3: // 1 byte - only the Basic Header is included
|
||||||
|
// use here hdr from previous msg with same session ID (sid)
|
||||||
|
}
|
||||||
|
|
||||||
|
timeMS := hdr.timeMS
|
||||||
|
if timeMS == 0xFFFFFF {
|
||||||
|
if b, err = c.readSize(4); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
timeMS = binary.BigEndian.Uint32(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
//log.Printf("[rtmp] hdr=%d chunkID=%d timeMS=%d size=%d tagType=%d streamID=%d", hdrType, chunkID, hdr.timeMS, hdr.dataSize, hdr.tagType, hdr.streamID)
|
||||||
|
|
||||||
|
// 1. Response zero size
|
||||||
|
if hdr.dataSize == 0 {
|
||||||
|
return hdr.tagType, timeMS, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
b = make([]byte, hdr.dataSize)
|
||||||
|
|
||||||
|
// 2. Response small packet
|
||||||
|
if hdr.dataSize <= c.rdPacketSize {
|
||||||
|
if _, err = io.ReadFull(c.rd, b); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
return hdr.tagType, timeMS, b, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Response big packet
|
||||||
|
var i0 uint32
|
||||||
|
for i1 := c.rdPacketSize; i1 < hdr.dataSize; i1 += c.rdPacketSize {
|
||||||
|
if _, err = io.ReadFull(c.rd, b[i0:i1]); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = c.readSize(1); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if hdr.timeMS == 0xFFFFFF {
|
||||||
|
if _, err = c.readSize(4); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
i0 = i1
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = io.ReadFull(c.rd, b[i0:]); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return hdr.tagType, timeMS, b, nil
|
||||||
|
}
|
||||||
|
func (c *Conn) writeMessage(chunkID, tagType byte, timeMS uint32, payload []byte) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.resetBuffer()
|
||||||
|
|
||||||
|
b := payload
|
||||||
|
size := uint32(len(b))
|
||||||
|
|
||||||
|
if size > c.wrPacketSize {
|
||||||
|
c.appendType0(chunkID, tagType, timeMS, size, b[:c.wrPacketSize])
|
||||||
|
|
||||||
|
for {
|
||||||
|
b = b[c.wrPacketSize:]
|
||||||
|
if uint32(len(b)) > c.wrPacketSize {
|
||||||
|
c.appendType3(chunkID, b[:c.wrPacketSize])
|
||||||
|
} else {
|
||||||
|
c.appendType3(chunkID, b)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
c.appendType0(chunkID, tagType, timeMS, size, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
//log.Printf("%d %2d %5d %6d %.32x", chunkID, tagType, timeMS, size, payload)
|
||||||
|
|
||||||
|
_, err := c.wr.Write(c.wrBuf)
|
||||||
|
c.mu.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) resetBuffer() {
|
||||||
|
c.wrBuf = c.wrBuf[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) appendType0(chunkID, tagType byte, timeMS, size uint32, payload []byte) {
|
||||||
|
// TODO: timeMS more than 24 bit
|
||||||
|
c.wrBuf = append(c.wrBuf,
|
||||||
|
chunkID,
|
||||||
|
byte(timeMS>>16), byte(timeMS>>8), byte(timeMS),
|
||||||
|
byte(size>>16), byte(size>>8), byte(size),
|
||||||
|
tagType,
|
||||||
|
c.streamID, 0, 0, 0, // little endian streamID
|
||||||
|
)
|
||||||
|
c.wrBuf = append(c.wrBuf, payload...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) appendType3(chunkID byte, payload []byte) {
|
||||||
|
c.wrBuf = append(c.wrBuf, 3<<6|chunkID)
|
||||||
|
c.wrBuf = append(c.wrBuf, payload...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) writePacketSize() error {
|
||||||
|
b := binary.BigEndian.AppendUint32(nil, c.wrPacketSize)
|
||||||
|
return c.writeMessage(2, TypeSetPacketSize, 0, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) writeConnect() error {
|
||||||
|
b := amf.EncodeItems("connect", 1, map[string]any{
|
||||||
|
"app": c.App,
|
||||||
|
"flashVer": "FMLE/3.0 (compatible; FMSc/1.0)",
|
||||||
|
"tcUrl": c.url,
|
||||||
|
})
|
||||||
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := c.readResponse(1)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
code := getString(v, 3, "code")
|
||||||
|
if code != "NetConnection.Connect.Success" {
|
||||||
|
return fmt.Errorf("rtmp: wrong response %#v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) writeReleaseStream() error {
|
||||||
|
b := amf.EncodeItems("releaseStream", 2, nil, c.Stream)
|
||||||
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b = amf.EncodeItems("FCPublish", 3, nil, c.Stream)
|
||||||
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (c *Conn) writeCreateStream() error {
|
||||||
|
b := amf.EncodeItems("createStream", 4, nil)
|
||||||
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := c.readResponse(4)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(v) == 4 {
|
||||||
|
if f, ok := v[3].(float64); ok {
|
||||||
|
c.streamID = byte(f)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("rtmp: wrong response %#v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) writePublish() error {
|
||||||
|
b := amf.EncodeItems("publish", 5, nil, c.Stream, "live")
|
||||||
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := c.readResponse(0)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
code := getString(v, 3, "code")
|
||||||
|
if code != "NetStream.Publish.Start" {
|
||||||
|
return fmt.Errorf("rtmp: wrong response %#v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) writePlay() error {
|
||||||
|
b := amf.EncodeItems("play", 5, nil, c.Stream)
|
||||||
|
if err := c.writeMessage(3, TypeCommand, 0, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := c.readResponse(0)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
code := getString(v, 3, "code")
|
||||||
|
if !strings.HasPrefix(code, "NetStream.Play.") {
|
||||||
|
return fmt.Errorf("rtmp: wrong response %#v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) readSize(n uint32) ([]byte, error) {
|
||||||
|
b := make([]byte, n)
|
||||||
|
if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func PutUint24(b []byte, v uint32) {
|
||||||
|
_ = b[2]
|
||||||
|
b[0] = byte(v >> 16)
|
||||||
|
b[1] = byte(v >> 8)
|
||||||
|
b[2] = byte(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Uint24(b []byte) uint32 {
|
||||||
|
_ = b[2]
|
||||||
|
return uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2])
|
||||||
|
}
|
||||||
|
|
||||||
|
func getString(v []any, i int, key string) string {
|
||||||
|
if len(v) <= i {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if v, ok := v[i].(map[string]any); ok {
|
||||||
|
if s, ok := v[key].(string); ok {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
71
pkg/rtmp/flv.go
Normal file
71
pkg/rtmp/flv.go
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
package rtmp
|
||||||
|
|
||||||
|
// Read - convert RTMP to FLV format
|
||||||
|
func (c *Conn) Read(p []byte) (n int, err error) {
|
||||||
|
// 1. Check temporary tempbuffer
|
||||||
|
if len(c.rdBuf) == 0 {
|
||||||
|
msgType, timeMS, payload, err2 := c.readMessage()
|
||||||
|
if err2 != nil {
|
||||||
|
return 0, err2
|
||||||
|
}
|
||||||
|
|
||||||
|
// previous tag size (4 byte) + header (11 byte) + payload
|
||||||
|
n = 4 + 11 + len(payload)
|
||||||
|
|
||||||
|
// 2. Check if the message fits in the buffer
|
||||||
|
if n <= len(p) {
|
||||||
|
encodeFLV(p, msgType, timeMS, payload)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Put the message into a temporary buffer
|
||||||
|
c.rdBuf = make([]byte, n)
|
||||||
|
encodeFLV(c.rdBuf, msgType, timeMS, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Send temporary buffer
|
||||||
|
n = copy(p, c.rdBuf)
|
||||||
|
c.rdBuf = c.rdBuf[n:]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func encodeFLV(b []byte, msgType byte, time uint32, payload []byte) {
|
||||||
|
_ = b[4+11]
|
||||||
|
|
||||||
|
b[0] = 0
|
||||||
|
b[1] = 0
|
||||||
|
b[2] = 0
|
||||||
|
b[3] = 0
|
||||||
|
b[4+0] = msgType
|
||||||
|
PutUint24(b[4+1:], uint32(len(payload)))
|
||||||
|
PutUint24(b[4+4:], time)
|
||||||
|
b[4+7] = byte(time >> 24)
|
||||||
|
|
||||||
|
copy(b[4+11:], payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write - convert FLV format to RTMP format
|
||||||
|
func (c *Conn) Write(p []byte) (n int, err error) {
|
||||||
|
n = len(p)
|
||||||
|
|
||||||
|
if p[0] == 'F' {
|
||||||
|
p = p[9+4:] // skip first msg with FLV header
|
||||||
|
|
||||||
|
for len(p) > 0 {
|
||||||
|
size := 11 + uint16(p[2])<<8 + uint16(p[3]) + 4
|
||||||
|
if _, err = c.Write(p[:size]); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
p = p[size:]
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// decode FLV: 11 bytes header + payload + 4 byte size
|
||||||
|
tagType := p[0]
|
||||||
|
timeMS := uint32(p[4])<<16 | uint32(p[5])<<8 | uint32(p[6]) | uint32(p[7])<<24
|
||||||
|
payload := p[11 : len(p)-4]
|
||||||
|
|
||||||
|
err = c.writeMessage(4, tagType, timeMS, payload)
|
||||||
|
return
|
||||||
|
}
|
@@ -1,28 +0,0 @@
|
|||||||
package rtmp
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/url"
|
|
||||||
|
|
||||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
|
||||||
"github.com/AlexxIT/go2rtc/pkg/flv"
|
|
||||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Dial(rawURL string) (core.Producer, error) {
|
|
||||||
u, err := url.Parse(rawURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := tcp.Dial(u, core.ConnDialTimeout)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
rd, err := NewReader(u, conn)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return flv.Open(rd)
|
|
||||||
}
|
|
@@ -1,488 +0,0 @@
|
|||||||
package rtmp
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"encoding/binary"
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"net/url"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
|
||||||
"github.com/AlexxIT/go2rtc/pkg/flv/amf"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
MsgSetPacketSize = 1
|
|
||||||
MsgServerBandwidth = 5
|
|
||||||
MsgClientBandwidth = 6
|
|
||||||
MsgCommand = 20
|
|
||||||
|
|
||||||
//MsgAck = 3
|
|
||||||
//MsgControl = 4
|
|
||||||
//MsgAudioPacket = 8
|
|
||||||
//MsgVideoPacket = 9
|
|
||||||
//MsgDataExt = 15
|
|
||||||
//MsgCommandExt = 17
|
|
||||||
//MsgData = 18
|
|
||||||
)
|
|
||||||
|
|
||||||
var ErrResponse = errors.New("rtmp: wrong response")
|
|
||||||
|
|
||||||
type Reader struct {
|
|
||||||
url string
|
|
||||||
app string
|
|
||||||
stream string
|
|
||||||
pktSize uint32
|
|
||||||
|
|
||||||
headers map[uint32]*header
|
|
||||||
|
|
||||||
conn net.Conn
|
|
||||||
rd io.Reader
|
|
||||||
|
|
||||||
buf []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewReader(u *url.URL, conn net.Conn) (*Reader, error) {
|
|
||||||
rd := &Reader{
|
|
||||||
url: u.String(),
|
|
||||||
headers: map[uint32]*header{},
|
|
||||||
conn: conn,
|
|
||||||
rd: bufio.NewReaderSize(conn, core.BufferSize),
|
|
||||||
}
|
|
||||||
|
|
||||||
if args := strings.Split(u.Path, "/"); len(args) >= 2 {
|
|
||||||
rd.app = args[1]
|
|
||||||
if len(args) >= 3 {
|
|
||||||
rd.stream = args[2]
|
|
||||||
if u.RawQuery != "" {
|
|
||||||
rd.stream += "?" + u.RawQuery
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rd.handshake(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := rd.sendConfig(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := rd.sendConnect(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := rd.sendPlay(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
rd.buf = []byte{
|
|
||||||
'F', 'L', 'V', // signature
|
|
||||||
1, // version
|
|
||||||
0, // flags (has video/audio)
|
|
||||||
0, 0, 0, 9, // header size
|
|
||||||
}
|
|
||||||
|
|
||||||
return rd, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) Read(p []byte) (n int, err error) {
|
|
||||||
// 1. Check temporary tempbuffer
|
|
||||||
if len(c.buf) == 0 {
|
|
||||||
msgType, timeMS, payload, err2 := c.readMessage()
|
|
||||||
if err2 != nil {
|
|
||||||
return 0, err2
|
|
||||||
}
|
|
||||||
|
|
||||||
payloadSize := len(payload)
|
|
||||||
|
|
||||||
// previous tag size (4 byte) + header (11 byte) + payload
|
|
||||||
n = 4 + 11 + payloadSize
|
|
||||||
|
|
||||||
// 2. Check if the message fits in the buffer
|
|
||||||
if n <= len(p) {
|
|
||||||
encodeFLV(p, msgType, timeMS, uint32(payloadSize), payload)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Put the message into a temporary buffer
|
|
||||||
c.buf = make([]byte, n)
|
|
||||||
encodeFLV(c.buf, msgType, timeMS, uint32(payloadSize), payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Send temporary buffer
|
|
||||||
n = copy(p, c.buf)
|
|
||||||
c.buf = c.buf[n:]
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) Close() error {
|
|
||||||
return c.conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func encodeFLV(b []byte, msgType byte, time, size uint32, payload []byte) {
|
|
||||||
b[0] = 0
|
|
||||||
b[1] = 0
|
|
||||||
b[2] = 0
|
|
||||||
b[3] = 0
|
|
||||||
b[4+0] = msgType
|
|
||||||
PutUint24(b[4+1:], size)
|
|
||||||
PutUint24(b[4+4:], time)
|
|
||||||
b[4+7] = byte(time >> 24)
|
|
||||||
|
|
||||||
copy(b[4+11:], payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
type header struct {
|
|
||||||
msgTime uint32
|
|
||||||
msgSize uint32
|
|
||||||
msgType byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) readMessage() (byte, uint32, []byte, error) {
|
|
||||||
hdrType, sid, err := c.readHeader()
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// storing header information for support header type 3
|
|
||||||
hdr, ok := c.headers[sid]
|
|
||||||
if !ok {
|
|
||||||
hdr = &header{}
|
|
||||||
c.headers[sid] = hdr
|
|
||||||
}
|
|
||||||
|
|
||||||
var b []byte
|
|
||||||
|
|
||||||
// https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol#Packet_structure
|
|
||||||
switch hdrType {
|
|
||||||
case 0: // 12 byte header (full header)
|
|
||||||
if b, err = c.readSize(11); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
_ = b[7]
|
|
||||||
hdr.msgTime = Uint24(b) // timestamp
|
|
||||||
hdr.msgSize = Uint24(b[3:]) // msgdatalen
|
|
||||||
hdr.msgType = b[6] // msgtypeid
|
|
||||||
_ = binary.BigEndian.Uint32(b[7:]) // msgsid
|
|
||||||
|
|
||||||
case 1: // 8 bytes - like type b00, not including message ID (4 last bytes)
|
|
||||||
if b, err = c.readSize(7); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
_ = b[6]
|
|
||||||
hdr.msgTime = Uint24(b) // timestamp
|
|
||||||
hdr.msgSize = Uint24(b[3:]) // msgdatalen
|
|
||||||
hdr.msgType = b[6] // msgtypeid
|
|
||||||
|
|
||||||
case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included
|
|
||||||
if b, err = c.readSize(3); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
hdr.msgTime = Uint24(b) // timestamp
|
|
||||||
|
|
||||||
case 3: // 1 byte - only the Basic Header is included
|
|
||||||
// use here hdr from previous msg with same session ID (sid)
|
|
||||||
}
|
|
||||||
|
|
||||||
timeMS := hdr.msgTime
|
|
||||||
if timeMS == 0xFFFFFF {
|
|
||||||
if b, err = c.readSize(4); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
timeMS = binary.BigEndian.Uint32(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
//log.Printf("[Reader] hdrType=%d sid=%d msdTime=%d msgSize=%d msgType=%d", hdrType, sid, hdr.msgTime, hdr.msgSize, hdr.msgType)
|
|
||||||
|
|
||||||
// 1. Response zero size
|
|
||||||
if hdr.msgSize == 0 {
|
|
||||||
return hdr.msgType, timeMS, nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
b = make([]byte, hdr.msgSize)
|
|
||||||
|
|
||||||
// 2. Response small packet
|
|
||||||
if c.pktSize == 0 || hdr.msgSize < c.pktSize {
|
|
||||||
if _, err = io.ReadFull(c.rd, b); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
return hdr.msgType, timeMS, b, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Response big packet
|
|
||||||
var i0 uint32
|
|
||||||
for i1 := c.pktSize; i1 < hdr.msgSize; i1 += c.pktSize {
|
|
||||||
if _, err = io.ReadFull(c.rd, b[i0:i1]); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, _, err = c.readHeader(); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if hdr.msgTime == 0xFFFFFF {
|
|
||||||
if _, err = c.readSize(4); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
i0 = i1
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err = io.ReadFull(c.rd, b[i0:]); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return hdr.msgType, timeMS, b, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) handshake() error {
|
|
||||||
// simple handshake without real random and check response
|
|
||||||
const randomSize = 4 + 4 + 1528
|
|
||||||
|
|
||||||
b := make([]byte, 1+randomSize)
|
|
||||||
b[0] = 0x03
|
|
||||||
if _, err := c.conn.Write(b); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := io.ReadFull(c.rd, b); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if b[0] != 3 {
|
|
||||||
return errors.New("Reader: wrong handshake")
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := c.conn.Write(b[1:]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := io.ReadFull(c.rd, b[1:]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) sendConfig() error {
|
|
||||||
b := make([]byte, 5)
|
|
||||||
binary.BigEndian.PutUint32(b, 65536)
|
|
||||||
if err := c.sendRequest(MsgSetPacketSize, 0, b[:4]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
binary.BigEndian.PutUint32(b, 2500000)
|
|
||||||
if err := c.sendRequest(MsgServerBandwidth, 0, b[:4]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
binary.BigEndian.PutUint32(b, 10000000) // ack size
|
|
||||||
b[4] = 2 // limit type
|
|
||||||
if err := c.sendRequest(MsgClientBandwidth, 0, b); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) sendConnect() error {
|
|
||||||
msg := amf.AMF{}
|
|
||||||
msg.WriteString("connect")
|
|
||||||
msg.WriteNumber(1)
|
|
||||||
msg.WriteObject(map[string]any{
|
|
||||||
"app": c.app,
|
|
||||||
"flashVer": "MAC 32,0,0,0",
|
|
||||||
"tcUrl": c.url,
|
|
||||||
"fpad": false, // ?
|
|
||||||
"capabilities": 15, // ?
|
|
||||||
"audioCodecs": 4071, // ?
|
|
||||||
"videoCodecs": 252, // ?
|
|
||||||
"videoFunction": 1, // ?
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := c.sendRequest(MsgCommand, 0, msg.Bytes()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s, err := c.waitCode("_result", float64(1)) // result with same ID
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if s != "NetConnection.Connect.Success" {
|
|
||||||
return errors.New("Reader: wrong code: " + s)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) sendPlay() error {
|
|
||||||
msg := amf.NewWriter()
|
|
||||||
msg.WriteString("createStream")
|
|
||||||
msg.WriteNumber(2)
|
|
||||||
msg.WriteNull()
|
|
||||||
|
|
||||||
if err := c.sendRequest(MsgCommand, 0, msg.Bytes()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
args, err := c.waitResponse("_result", float64(2)) // result with same ID
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(args) < 4 {
|
|
||||||
return ErrResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
sid, _ := args[3].(float64)
|
|
||||||
|
|
||||||
msg = amf.NewWriter()
|
|
||||||
msg.WriteString("play")
|
|
||||||
msg.WriteNumber(0)
|
|
||||||
msg.WriteNull()
|
|
||||||
msg.WriteString(c.stream)
|
|
||||||
|
|
||||||
if err = c.sendRequest(MsgCommand, uint32(sid), msg.Bytes()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s, err := c.waitCode("onStatus", float64(0)) // events has zero transaction ID
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch s {
|
|
||||||
case "NetStream.Play.Start", "NetStream.Play.Reset":
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return errors.New("Reader: wrong code: " + s)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) sendRequest(msgType byte, streamID uint32, payload []byte) error {
|
|
||||||
n := len(payload)
|
|
||||||
b := make([]byte, 12+n)
|
|
||||||
_ = b[12]
|
|
||||||
|
|
||||||
switch msgType {
|
|
||||||
case MsgSetPacketSize, MsgServerBandwidth, MsgClientBandwidth:
|
|
||||||
b[0] = 0x02 // chunk ID
|
|
||||||
case MsgCommand:
|
|
||||||
if streamID == 0 {
|
|
||||||
b[0] = 0x03 // chunk ID
|
|
||||||
} else {
|
|
||||||
b[0] = 0x08 // chunk ID
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//PutUint24(b[1:], 0) // timestamp
|
|
||||||
PutUint24(b[4:], uint32(n)) // payload size
|
|
||||||
b[7] = msgType // message type
|
|
||||||
binary.BigEndian.PutUint32(b[8:], streamID) // message stream ID
|
|
||||||
copy(b[12:], payload)
|
|
||||||
|
|
||||||
if _, err := c.conn.Write(b); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) readHeader() (byte, uint32, error) {
|
|
||||||
b, err := c.readSize(1)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
hdrType := b[0] >> 6
|
|
||||||
sid := uint32(b[0] & 0b111111)
|
|
||||||
|
|
||||||
switch sid {
|
|
||||||
case 0:
|
|
||||||
if b, err = c.readSize(1); err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
sid = 64 + uint32(b[0])
|
|
||||||
case 1:
|
|
||||||
if b, err = c.readSize(2); err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
sid = 64 + uint32(binary.BigEndian.Uint16(b))
|
|
||||||
}
|
|
||||||
|
|
||||||
return hdrType, sid, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) readSize(n uint32) ([]byte, error) {
|
|
||||||
b := make([]byte, n)
|
|
||||||
if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return b, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) waitResponse(cmd any, tid any) ([]any, error) {
|
|
||||||
for {
|
|
||||||
msgType, _, b, err := c.readMessage()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch msgType {
|
|
||||||
case MsgSetPacketSize:
|
|
||||||
c.pktSize = binary.BigEndian.Uint32(b)
|
|
||||||
|
|
||||||
case MsgCommand:
|
|
||||||
var v []any
|
|
||||||
if v, err = amf.NewReader(b).ReadItems(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(v) < 4 {
|
|
||||||
return nil, ErrResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
if v[0] == cmd && v[1] == tid {
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Reader) waitCode(cmd any, tid any) (string, error) {
|
|
||||||
args, err := c.waitResponse(cmd, tid)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(args) < 4 {
|
|
||||||
return "", ErrResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
m, _ := args[3].(map[string]any)
|
|
||||||
if m == nil {
|
|
||||||
return "", ErrResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
v, _ := m["code"]
|
|
||||||
if v == nil {
|
|
||||||
return "", ErrResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
s, _ := v.(string)
|
|
||||||
return s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func PutUint24(b []byte, v uint32) {
|
|
||||||
_ = b[2]
|
|
||||||
b[0] = byte(v >> 16)
|
|
||||||
b[1] = byte(v >> 8)
|
|
||||||
b[2] = byte(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Uint24(b []byte) uint32 {
|
|
||||||
_ = b[2]
|
|
||||||
return uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2])
|
|
||||||
}
|
|
167
pkg/rtmp/server.go
Normal file
167
pkg/rtmp/server.go
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
package rtmp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/flv/amf"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewServer(conn net.Conn) (*Conn, error) {
|
||||||
|
c := &Conn{
|
||||||
|
conn: conn,
|
||||||
|
rd: bufio.NewReaderSize(conn, core.BufferSize),
|
||||||
|
wr: conn,
|
||||||
|
|
||||||
|
chunks: map[uint8]*header{},
|
||||||
|
|
||||||
|
rdPacketSize: 128,
|
||||||
|
wrPacketSize: 4096,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.serverHandshake(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := c.writePacketSize(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) serverHandshake() error {
|
||||||
|
b := make([]byte, 1+1536)
|
||||||
|
// read C0+C1
|
||||||
|
if _, err := io.ReadFull(c.rd, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// write S0+S1, skip random
|
||||||
|
if _, err := c.conn.Write(b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// read S1, skip check
|
||||||
|
if _, err := io.ReadFull(c.rd, make([]byte, 1536)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// write C1
|
||||||
|
if _, err := c.conn.Write(b[1:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) ReadCommands() error {
|
||||||
|
for {
|
||||||
|
msgType, _, b, err := c.readMessage()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
//log.Printf("%d %.256x", msgType, b)
|
||||||
|
|
||||||
|
switch msgType {
|
||||||
|
case TypeSetPacketSize:
|
||||||
|
c.rdPacketSize = binary.BigEndian.Uint32(b)
|
||||||
|
case TypeCommand:
|
||||||
|
if err = c.acceptCommand(b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Intent != "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
CommandConnect = "connect"
|
||||||
|
CommandReleaseStream = "releaseStream"
|
||||||
|
CommandCreateStream = "createStream"
|
||||||
|
CommandPublish = "publish"
|
||||||
|
CommandPlay = "play"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *Conn) acceptCommand(b []byte) error {
|
||||||
|
items, err := amf.NewReader(b).ReadItems()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//log.Printf("%#v", items)
|
||||||
|
|
||||||
|
if len(items) < 2 {
|
||||||
|
return fmt.Errorf("rtmp: read command %x", b)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd, ok := items[0].(string)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("rtmp: read command %x", b)
|
||||||
|
}
|
||||||
|
|
||||||
|
tID, ok := items[1].(float64) // transaction ID
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("rtmp: read command %x", b)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch cmd {
|
||||||
|
case CommandConnect:
|
||||||
|
if len(items) == 3 {
|
||||||
|
if v, ok := items[2].(map[string]any); ok {
|
||||||
|
c.App, _ = v["app"].(string)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.App == "" {
|
||||||
|
return fmt.Errorf("rtmp: read command %x", b)
|
||||||
|
}
|
||||||
|
|
||||||
|
payload := amf.EncodeItems(
|
||||||
|
"_result", tID,
|
||||||
|
map[string]any{
|
||||||
|
"fmsVer": "FMS/3,0,1,123",
|
||||||
|
},
|
||||||
|
map[string]any{
|
||||||
|
"code": "NetConnection.Connect.Success",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return c.writeMessage(3, TypeCommand, 0, payload)
|
||||||
|
|
||||||
|
case CommandReleaseStream:
|
||||||
|
payload := amf.EncodeItems("_result", tID, nil)
|
||||||
|
return c.writeMessage(3, TypeCommand, 0, payload)
|
||||||
|
|
||||||
|
case CommandCreateStream:
|
||||||
|
payload := amf.EncodeItems("_result", tID, nil, 1)
|
||||||
|
return c.writeMessage(3, TypeCommand, 0, payload)
|
||||||
|
|
||||||
|
case CommandPublish, CommandPlay:
|
||||||
|
c.Intent = cmd
|
||||||
|
|
||||||
|
default:
|
||||||
|
println("rtmp: unknown command: " + cmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) WritePlayStart() error {
|
||||||
|
payload := amf.EncodeItems("onStatus", 0, nil, map[string]any{
|
||||||
|
"code": "NetStream.Play.Start",
|
||||||
|
})
|
||||||
|
return c.writeMessage(3, TypeCommand, 0, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) code() string {
|
||||||
|
switch c.Intent {
|
||||||
|
case CommandPlay:
|
||||||
|
return "NetStream.Play.Start"
|
||||||
|
case CommandPublish:
|
||||||
|
return "NetStream.Publish.Start"
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
Reference in New Issue
Block a user