Total rework RTMP client

This commit is contained in:
Alexey Khit
2023-08-14 06:11:21 +03:00
parent 0b6fda2af5
commit 19c61e20c0
9 changed files with 507 additions and 216 deletions

View File

@@ -54,7 +54,10 @@ func handleHTTP(url string) (core.Producer, error) {
return multipart.NewClient(res) return multipart.NewClient(res)
case "video/x-flv": case "video/x-flv":
client := flv.NewClient(res.Body) var client *flv.Client
if client, err = flv.NewClient(res.Body); err != nil {
return nil, err
}
if err = client.Describe(); err != nil { if err = client.Describe(); err != nil {
return nil, err return nil, err
} }

View File

@@ -1,13 +1,15 @@
package rtmp package rtmp
import ( import (
"io"
"net/http"
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
"github.com/AlexxIT/go2rtc/pkg/rtmp" "github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"io"
"net/http"
) )
func Init() { func Init() {
@@ -17,14 +19,14 @@ func Init() {
} }
func streamsHandle(url string) (core.Producer, error) { func streamsHandle(url string) (core.Producer, error) {
conn := rtmp.NewClient(url) client, err := rtmp.Dial(url)
if err := conn.Dial(); err != nil { if err != nil {
return nil, err return nil, err
} }
if err := conn.Describe(); err != nil { if err = client.Describe(); err != nil {
return nil, err return nil, err
} }
return conn, nil return client, nil
} }
func apiHandle(w http.ResponseWriter, r *http.Request) { func apiHandle(w http.ResponseWriter, r *http.Request) {
@@ -40,8 +42,7 @@ func apiHandle(w http.ResponseWriter, r *http.Request) {
return return
} }
res := &http.Response{Body: r.Body, Request: r} client, err := flv.NewClient(r.Body)
client, err := rtmp.Accept(res)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

View File

@@ -11,6 +11,7 @@ import (
) )
const ( const (
BufferSize = 64 * 1024 // 64K
ConnDialTimeout = time.Second * 3 ConnDialTimeout = time.Second * 3
ConnDeadline = time.Second * 3 ConnDeadline = time.Second * 3
ProbeTimeout = time.Second * 3 ProbeTimeout = time.Second * 3

View File

@@ -12,9 +12,9 @@ import (
) )
type Client struct { type Client struct {
URL string Transport
rd io.Reader URL string
medias []*core.Media medias []*core.Media
receivers []*core.Receiver receivers []*core.Receiver
@@ -22,15 +22,15 @@ type Client struct {
recv int recv int
} }
func NewClient(rd io.Reader) *Client { func NewClient(rd io.Reader) (*Client, error) {
return &Client{rd: rd} tr, err := NewTransport(rd)
if err != nil {
return nil, err
}
return &Client{Transport: tr}, nil
} }
func (c *Client) Describe() error { func (c *Client) Describe() error {
if err := c.ReadHeader(); err != nil {
return err
}
// Normal software sends: // Normal software sends:
// 1. Video/audio flag in header // 1. Video/audio flag in header
// 2. MetaData as first tag (with video/audio codec info) // 2. MetaData as first tag (with video/audio codec info)
@@ -45,7 +45,7 @@ func (c *Client) Describe() error {
timeout := time.Now().Add(core.ProbeTimeout) timeout := time.Now().Add(core.ProbeTimeout)
for (waitVideo || waitAudio) && time.Now().Before(timeout) { for (waitVideo || waitAudio) && time.Now().Before(timeout) {
tagType, _, b, err := c.ReadTag() tagType, _, b, err := c.Transport.ReadTag()
if err != nil { if err != nil {
return err return err
} }
@@ -123,7 +123,7 @@ func (c *Client) Play() error {
video, audio := core.VA(c.receivers) video, audio := core.VA(c.receivers)
for { for {
tagType, timeMS, b, err := c.ReadTag() tagType, timeMS, b, err := c.Transport.ReadTag()
if err != nil { if err != nil {
return err return err
} }

View File

@@ -15,28 +15,25 @@ const (
CodecAVC = 7 CodecAVC = 7
) )
func (c *Client) ReadHeader() error { // Transport - it is recommended to implement io.Closer
b := make([]byte, 9) type Transport interface {
if _, err := io.ReadFull(c.rd, b); err != nil { ReadTag() (byte, uint32, []byte, error)
return err
} }
if string(b[:3]) != "FLV" { // NewTransport - it is recommended to use bufio.Reader
return errors.New("flv: wrong header") func NewTransport(rd io.Reader) (Transport, error) {
c := &flv{rd: rd}
if err := c.readHeader(); err != nil {
return nil, err
}
return c, nil
} }
_ = b[4] // flags (skip because unsupported by Reolink cameras) type flv struct {
rd io.Reader
if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 {
if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil {
return err
}
} }
return nil func (c *flv) ReadTag() (byte, uint32, []byte, error) {
}
func (c *Client) ReadTag() (byte, uint32, []byte, error) {
// https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf // https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf
b := make([]byte, 4+11) b := make([]byte, 4+11)
if _, err := io.ReadFull(c.rd, b); err != nil { if _, err := io.ReadFull(c.rd, b); err != nil {
@@ -57,6 +54,34 @@ func (c *Client) ReadTag() (byte, uint32, []byte, error) {
return tagType, timeMS, b, nil return tagType, timeMS, b, nil
} }
func (c *flv) Close() error {
if closer, ok := c.rd.(io.Closer); ok {
return closer.Close()
}
return nil
}
func (c *flv) readHeader() error {
b := make([]byte, 9)
if _, err := io.ReadFull(c.rd, b); err != nil {
return err
}
if string(b[:3]) != "FLV" {
return errors.New("flv: wrong header")
}
_ = b[4] // flags (skip because unsupported by Reolink cameras)
if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 {
if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil {
return err
}
}
return nil
}
func TimeToRTP(timeMS uint32, clockRate uint32) uint32 { func TimeToRTP(timeMS uint32, clockRate uint32) uint32 {
return timeMS * clockRate / 1000 return timeMS * clockRate / 1000
} }

View File

@@ -27,7 +27,7 @@ func (c *Client) Start() error {
} }
func (c *Client) Stop() error { func (c *Client) Stop() error {
if closer, ok := c.rd.(io.Closer); ok { if closer, ok := c.Transport.(io.Closer); ok {
return closer.Close() return closer.Close()
} }
return nil return nil

View File

@@ -1,156 +1,50 @@
package rtmp package rtmp
import ( import (
"encoding/base64" "bufio"
"encoding/hex" "net"
"fmt" "net/url"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/httpflv" "github.com/AlexxIT/go2rtc/pkg/flv"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/rtmp"
"github.com/pion/rtp"
"net/http"
"time"
) )
// Conn for RTMP and RTMPT (flv over HTTP) func Dial(rawURL string) (*flv.Client, error) {
type Conn interface { u, err := url.Parse(rawURL)
Streams() (streams []av.CodecData, err error)
ReadPacket() (pkt av.Packet, err error)
Close() (err error)
}
type Client struct {
core.Listener
URI string
medias []*core.Media
receivers []*core.Receiver
conn Conn
closed bool
recv int
}
func NewClient(uri string) *Client {
return &Client{URI: uri}
}
func (c *Client) Dial() (err error) {
c.conn, err = rtmp.Dial(c.URI)
return
}
// Accept - convert http.Response to Client
func Accept(res *http.Response) (*Client, error) {
conn, err := httpflv.Accept(res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Client{URI: res.Request.URL.String(), conn: conn}, nil
host := u.Host
if strings.IndexByte(host, ':') < 0 {
host += ":1935"
} }
func (c *Client) Describe() (err error) { conn, err := net.DialTimeout("tcp", host, core.ConnDialTimeout)
// important to get SPS/PPS
streams, err := c.conn.Streams()
if err != nil { if err != nil {
return return nil, err
} }
for _, stream := range streams { tr := &rtmp{
switch stream.Type() { url: rawURL,
case av.H264: conn: conn,
info := stream.(h264parser.CodecData).RecordInfo rd: bufio.NewReaderSize(conn, core.BufferSize),
fmtp := fmt.Sprintf(
"profile-level-id=%02X%02X%02X;sprop-parameter-sets=%s,%s",
info.AVCProfileIndication, info.ProfileCompatibility, info.AVCLevelIndication,
base64.StdEncoding.EncodeToString(info.SPS[0]),
base64.StdEncoding.EncodeToString(info.PPS[0]),
)
codec := &core.Codec{
Name: core.CodecH264,
ClockRate: 90000,
FmtpLine: fmtp,
PayloadType: core.PayloadTypeRAW,
} }
media := &core.Media{ if args := strings.Split(u.Path, "/"); len(args) >= 2 {
Kind: core.KindVideo, tr.app = args[1]
Direction: core.DirectionRecvonly, if len(args) >= 3 {
Codecs: []*core.Codec{codec}, tr.stream = args[2]
if u.RawQuery != "" {
tr.stream += "?" + u.RawQuery
} }
c.medias = append(c.medias, media)
track := core.NewReceiver(media, codec)
c.receivers = append(c.receivers, track)
case av.AAC:
// TODO: fix support
cd := stream.(aacparser.CodecData)
codec := &core.Codec{
Name: core.CodecAAC,
ClockRate: uint32(cd.Config.SampleRate),
Channels: uint16(cd.Config.ChannelConfig),
// a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588
FmtpLine: "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" + hex.EncodeToString(cd.ConfigBytes),
PayloadType: core.PayloadTypeRAW,
}
media := &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
track := core.NewReceiver(media, codec)
c.receivers = append(c.receivers, track)
default:
fmt.Printf("[rtmp] unsupported codec %+v\n", stream)
} }
} }
return if err = tr.init(); err != nil {
return nil, err
} }
func (c *Client) Handle() (err error) { return &flv.Client{Transport: tr, URL: rawURL}, nil
for {
var pkt av.Packet
pkt, err = c.conn.ReadPacket()
if err != nil {
if c.closed {
return nil
}
return
}
c.recv += len(pkt.Data)
track := c.receivers[int(pkt.Idx)]
// convert seconds to RTP timestamp
timestamp := uint32(pkt.Time * time.Duration(track.Codec.ClockRate) / time.Second)
packet := &rtp.Packet{
Header: rtp.Header{Timestamp: timestamp},
Payload: pkt.Data,
}
track.WriteRTP(packet)
}
}
func (c *Client) Close() error {
if c.conn == nil {
return nil
}
c.closed = true
return c.conn.Close()
} }

View File

@@ -1,41 +0,0 @@
package rtmp
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
for _, track := range c.receivers {
if track.Codec == codec {
return track, nil
}
}
return nil, core.ErrCantGetTrack
}
func (c *Client) Start() error {
return c.Handle()
}
func (c *Client) Stop() error {
for _, receiver := range c.receivers {
receiver.Close()
}
return c.Close()
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "RTMP active producer",
URL: c.URI,
Medias: c.medias,
Receivers: c.receivers,
Recv: c.recv,
}
return json.Marshal(info)
}

408
pkg/rtmp/rtmp.go Normal file
View File

@@ -0,0 +1,408 @@
package rtmp
import (
"encoding/binary"
"errors"
"io"
"net"
"github.com/AlexxIT/go2rtc/pkg/flv/amf"
)
const (
MsgSetPacketSize = 1
MsgServerBandwidth = 5
MsgClientBandwidth = 6
MsgCommand = 20
//MsgAck = 3
//MsgControl = 4
//MsgAudioPacket = 8
//MsgVideoPacket = 9
//MsgDataExt = 15
//MsgCommandExt = 17
//MsgData = 18
)
var ErrResponse = errors.New("rtmp: wrong response")
// rtmp - implements flv.Transport
type rtmp struct {
url string
app string
stream string
pktSize uint32
headers map[uint32]*header
conn net.Conn
rd io.Reader
}
type header struct {
msgTime uint32
msgSize uint32
msgType byte
}
func (c *rtmp) ReadTag() (byte, uint32, []byte, error) {
hdrType, sid, err := c.readHeader()
if err != nil {
return 0, 0, nil, err
}
// storing header information for support header type 3
hdr, ok := c.headers[sid]
if !ok {
hdr = &header{}
c.headers[sid] = hdr
}
var b []byte
// https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol#Packet_structure
switch hdrType {
case 0: // 12 byte header (full header)
if b, err = c.readSize(11); err != nil {
return 0, 0, nil, err
}
_ = b[7]
hdr.msgTime = Uint24(b) // timestamp
hdr.msgSize = Uint24(b[3:]) // msgdatalen
hdr.msgType = b[6] // msgtypeid
_ = binary.BigEndian.Uint32(b[7:]) // msgsid
case 1: // 8 bytes - like type b00, not including message ID (4 last bytes)
if b, err = c.readSize(7); err != nil {
return 0, 0, nil, err
}
_ = b[6]
hdr.msgTime = Uint24(b) // timestamp
hdr.msgSize = Uint24(b[3:]) // msgdatalen
hdr.msgType = b[6] // msgtypeid
case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included
if b, err = c.readSize(3); err != nil {
return 0, 0, nil, err
}
hdr.msgTime = Uint24(b) // timestamp
case 3: // 1 byte - only the Basic Header is included
// use here hdr from previous msg with same session ID (sid)
}
timeMS := hdr.msgTime
if timeMS == 0xFFFFFF {
if b, err = c.readSize(4); err != nil {
return 0, 0, nil, err
}
timeMS = binary.BigEndian.Uint32(b)
}
//log.Printf("[rtmp] hdrType=%d sid=%d msdTime=%d msgSize=%d msgType=%d", hdrType, sid, hdr.msgTime, hdr.msgSize, hdr.msgType)
// 1. Response zero size
if hdr.msgSize == 0 {
return hdr.msgType, timeMS, nil, nil
}
b = make([]byte, hdr.msgSize)
// 2. Response small packet
if c.pktSize == 0 || hdr.msgSize < c.pktSize {
if _, err = io.ReadFull(c.rd, b); err != nil {
return 0, 0, nil, err
}
return hdr.msgType, timeMS, b, nil
}
// 3. Response big packet
var i0 uint32
for i1 := c.pktSize; i1 < hdr.msgSize; i1 += c.pktSize {
if _, err = io.ReadFull(c.rd, b[i0:i1]); err != nil {
return 0, 0, nil, err
}
if _, _, err = c.readHeader(); err != nil {
return 0, 0, nil, err
}
if hdr.msgTime == 0xFFFFFF {
if _, err = c.readSize(4); err != nil {
return 0, 0, nil, err
}
}
i0 = i1
}
if _, err = io.ReadFull(c.rd, b[i0:]); err != nil {
return 0, 0, nil, err
}
return hdr.msgType, timeMS, b, nil
}
func (c *rtmp) Close() error {
return c.conn.Close()
}
func (c *rtmp) init() error {
if err := c.handshake(); err != nil {
return err
}
if err := c.sendConfig(); err != nil {
return err
}
c.headers = map[uint32]*header{}
if err := c.sendConnect(); err != nil {
return err
}
if err := c.sendPlay(); err != nil {
return err
}
return nil
}
func (c *rtmp) handshake() error {
// simple handshake without real random and check response
const randomSize = 4 + 4 + 1528
b := make([]byte, 1+randomSize)
b[0] = 0x03
if _, err := c.conn.Write(b); err != nil {
return err
}
if _, err := io.ReadFull(c.rd, b); err != nil {
return err
}
if b[0] != 3 {
return errors.New("rtmp: wrong handshake")
}
if _, err := c.conn.Write(b[1:]); err != nil {
return err
}
if _, err := io.ReadFull(c.rd, b[1:]); err != nil {
return err
}
return nil
}
func (c *rtmp) sendConfig() error {
b := make([]byte, 5)
binary.BigEndian.PutUint32(b, 65536)
if err := c.sendRequest(MsgSetPacketSize, 0, b[:4]); err != nil {
return err
}
binary.BigEndian.PutUint32(b, 2500000)
if err := c.sendRequest(MsgServerBandwidth, 0, b[:4]); err != nil {
return err
}
binary.BigEndian.PutUint32(b, 10000000) // ack size
b[4] = 2 // limit type
if err := c.sendRequest(MsgClientBandwidth, 0, b); err != nil {
return err
}
return nil
}
func (c *rtmp) sendConnect() error {
msg := amf.AMF{}
msg.WriteString("connect")
msg.WriteNumber(1)
msg.WriteObject(map[string]any{
"app": c.app,
"flashVer": "MAC 32,0,0,0",
"tcUrl": c.url,
"fpad": false, // ?
"capabilities": 15, // ?
"audioCodecs": 4071, // ?
"videoCodecs": 252, // ?
"videoFunction": 1, // ?
})
if err := c.sendRequest(MsgCommand, 0, msg.Bytes()); err != nil {
return err
}
s, err := c.waitCode()
if err != nil {
return err
}
if s != "NetConnection.Connect.Success" {
return errors.New("rtmp: wrong code: " + s)
}
return nil
}
func (c *rtmp) sendPlay() error {
msg := amf.NewWriter()
msg.WriteString("createStream")
msg.WriteNumber(2)
msg.WriteNull()
if err := c.sendRequest(MsgCommand, 0, msg.Bytes()); err != nil {
return err
}
args, err := c.waitResponse()
if err != nil {
return err
}
if len(args) < 4 {
return ErrResponse
}
sid, _ := args[3].(float64)
msg = amf.NewWriter()
msg.WriteString("play")
msg.WriteNumber(3)
msg.WriteNull()
msg.WriteString(c.stream)
if err = c.sendRequest(MsgCommand, uint32(sid), msg.Bytes()); err != nil {
return err
}
s, err := c.waitCode()
if err != nil {
return err
}
switch s {
case "NetStream.Play.Start", "NetStream.Play.Reset":
return nil
}
return errors.New("rtmp: wrong code: " + s)
}
func (c *rtmp) sendRequest(msgType byte, streamID uint32, payload []byte) error {
n := len(payload)
b := make([]byte, 12+n)
_ = b[12]
switch msgType {
case MsgSetPacketSize, MsgServerBandwidth, MsgClientBandwidth:
b[0] = 0x02 // chunk ID
case MsgCommand:
if streamID == 0 {
b[0] = 0x03 // chunk ID
} else {
b[0] = 0x08 // chunk ID
}
}
//PutUint24(b[1:], 0) // timestamp
PutUint24(b[4:], uint32(n)) // payload size
b[7] = msgType // message type
binary.BigEndian.PutUint32(b[8:], streamID) // message stream ID
copy(b[12:], payload)
if _, err := c.conn.Write(b); err != nil {
return err
}
return nil
}
func (c *rtmp) readHeader() (byte, uint32, error) {
b, err := c.readSize(1)
if err != nil {
return 0, 0, err
}
hdrType := b[0] >> 6
sid := uint32(b[0] & 0b111111)
switch sid {
case 0:
if b, err = c.readSize(1); err != nil {
return 0, 0, err
}
sid = 64 + uint32(b[0])
case 1:
if b, err = c.readSize(2); err != nil {
return 0, 0, err
}
sid = 64 + uint32(binary.BigEndian.Uint16(b))
}
return hdrType, sid, nil
}
func (c *rtmp) readSize(n uint32) ([]byte, error) {
b := make([]byte, n)
if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil {
return nil, err
}
return b, nil
}
func (c *rtmp) waitResponse() ([]any, error) {
for {
msgType, _, b, err := c.ReadTag()
if err != nil {
return nil, err
}
switch msgType {
case MsgSetPacketSize:
c.pktSize = binary.BigEndian.Uint32(b)
case MsgCommand:
return amf.NewReader(b).ReadItems()
}
}
}
func (c *rtmp) waitCode() (string, error) {
args, err := c.waitResponse()
if err != nil {
return "", err
}
if len(args) < 4 {
return "", ErrResponse
}
m, _ := args[3].(map[string]any)
if m == nil {
return "", ErrResponse
}
v, _ := m["code"]
if v == nil {
return "", ErrResponse
}
s, _ := v.(string)
return s, nil
}
func PutUint24(b []byte, v uint32) {
_ = b[2]
b[0] = byte(v >> 16)
b[1] = byte(v >> 8)
b[2] = byte(v)
}
func Uint24(b []byte) uint32 {
_ = b[2]
return uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2])
}