diff --git a/internal/http/http.go b/internal/http/http.go index 10744cee..31560e6b 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -54,7 +54,10 @@ func handleHTTP(url string) (core.Producer, error) { return multipart.NewClient(res) case "video/x-flv": - client := flv.NewClient(res.Body) + var client *flv.Client + if client, err = flv.NewClient(res.Body); err != nil { + return nil, err + } if err = client.Describe(); err != nil { return nil, err } diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index f84c16be..efbeb0df 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -1,13 +1,15 @@ package rtmp import ( + "io" + "net/http" + "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/flv" "github.com/AlexxIT/go2rtc/pkg/rtmp" "github.com/rs/zerolog/log" - "io" - "net/http" ) func Init() { @@ -17,14 +19,14 @@ func Init() { } func streamsHandle(url string) (core.Producer, error) { - conn := rtmp.NewClient(url) - if err := conn.Dial(); err != nil { + client, err := rtmp.Dial(url) + if err != nil { return nil, err } - if err := conn.Describe(); err != nil { + if err = client.Describe(); err != nil { return nil, err } - return conn, nil + return client, nil } func apiHandle(w http.ResponseWriter, r *http.Request) { @@ -40,8 +42,7 @@ func apiHandle(w http.ResponseWriter, r *http.Request) { return } - res := &http.Response{Body: r.Body, Request: r} - client, err := rtmp.Accept(res) + client, err := flv.NewClient(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/pkg/core/helpers.go b/pkg/core/helpers.go index 4f0a6e99..98c842e0 100644 --- a/pkg/core/helpers.go +++ b/pkg/core/helpers.go @@ -11,6 +11,7 @@ import ( ) const ( + BufferSize = 64 * 1024 // 64K ConnDialTimeout = time.Second * 3 ConnDeadline = time.Second * 3 ProbeTimeout = time.Second * 3 diff --git a/pkg/flv/client.go b/pkg/flv/client.go index 6a397378..09fbe1fb 100644 --- a/pkg/flv/client.go +++ b/pkg/flv/client.go @@ -12,9 +12,9 @@ import ( ) type Client struct { - URL string + Transport - rd io.Reader + URL string medias []*core.Media receivers []*core.Receiver @@ -22,15 +22,15 @@ type Client struct { recv int } -func NewClient(rd io.Reader) *Client { - return &Client{rd: rd} +func NewClient(rd io.Reader) (*Client, error) { + tr, err := NewTransport(rd) + if err != nil { + return nil, err + } + return &Client{Transport: tr}, nil } func (c *Client) Describe() error { - if err := c.ReadHeader(); err != nil { - return err - } - // Normal software sends: // 1. Video/audio flag in header // 2. MetaData as first tag (with video/audio codec info) @@ -45,7 +45,7 @@ func (c *Client) Describe() error { timeout := time.Now().Add(core.ProbeTimeout) for (waitVideo || waitAudio) && time.Now().Before(timeout) { - tagType, _, b, err := c.ReadTag() + tagType, _, b, err := c.Transport.ReadTag() if err != nil { return err } @@ -123,7 +123,7 @@ func (c *Client) Play() error { video, audio := core.VA(c.receivers) for { - tagType, timeMS, b, err := c.ReadTag() + tagType, timeMS, b, err := c.Transport.ReadTag() if err != nil { return err } diff --git a/pkg/flv/flv.go b/pkg/flv/flv.go index 641bf946..9557334b 100644 --- a/pkg/flv/flv.go +++ b/pkg/flv/flv.go @@ -15,28 +15,25 @@ const ( CodecAVC = 7 ) -func (c *Client) ReadHeader() error { - b := make([]byte, 9) - if _, err := io.ReadFull(c.rd, b); err != nil { - return err - } - - if string(b[:3]) != "FLV" { - return errors.New("flv: wrong header") - } - - _ = b[4] // flags (skip because unsupported by Reolink cameras) - - if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 { - if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil { - return err - } - } - - return nil +// Transport - it is recommended to implement io.Closer +type Transport interface { + ReadTag() (byte, uint32, []byte, error) } -func (c *Client) ReadTag() (byte, uint32, []byte, error) { +// NewTransport - it is recommended to use bufio.Reader +func NewTransport(rd io.Reader) (Transport, error) { + c := &flv{rd: rd} + if err := c.readHeader(); err != nil { + return nil, err + } + return c, nil +} + +type flv struct { + rd io.Reader +} + +func (c *flv) ReadTag() (byte, uint32, []byte, error) { // https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf b := make([]byte, 4+11) if _, err := io.ReadFull(c.rd, b); err != nil { @@ -57,6 +54,34 @@ func (c *Client) ReadTag() (byte, uint32, []byte, error) { return tagType, timeMS, b, nil } +func (c *flv) Close() error { + if closer, ok := c.rd.(io.Closer); ok { + return closer.Close() + } + return nil +} + +func (c *flv) readHeader() error { + b := make([]byte, 9) + if _, err := io.ReadFull(c.rd, b); err != nil { + return err + } + + if string(b[:3]) != "FLV" { + return errors.New("flv: wrong header") + } + + _ = b[4] // flags (skip because unsupported by Reolink cameras) + + if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 { + if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil { + return err + } + } + + return nil +} + func TimeToRTP(timeMS uint32, clockRate uint32) uint32 { return timeMS * clockRate / 1000 } diff --git a/pkg/flv/producer.go b/pkg/flv/producer.go index 6d04d2db..efd19d8e 100644 --- a/pkg/flv/producer.go +++ b/pkg/flv/producer.go @@ -27,7 +27,7 @@ func (c *Client) Start() error { } func (c *Client) Stop() error { - if closer, ok := c.rd.(io.Closer); ok { + if closer, ok := c.Transport.(io.Closer); ok { return closer.Close() } return nil diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index dc3ed64d..dd7926d8 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -1,156 +1,50 @@ package rtmp import ( - "encoding/base64" - "encoding/hex" - "fmt" + "bufio" + "net" + "net/url" + "strings" + "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/httpflv" - "github.com/deepch/vdk/av" - "github.com/deepch/vdk/codec/aacparser" - "github.com/deepch/vdk/codec/h264parser" - "github.com/deepch/vdk/format/rtmp" - "github.com/pion/rtp" - "net/http" - "time" + "github.com/AlexxIT/go2rtc/pkg/flv" ) -// Conn for RTMP and RTMPT (flv over HTTP) -type Conn interface { - Streams() (streams []av.CodecData, err error) - ReadPacket() (pkt av.Packet, err error) - Close() (err error) -} - -type Client struct { - core.Listener - - URI string - - medias []*core.Media - receivers []*core.Receiver - - conn Conn - closed bool - - recv int -} - -func NewClient(uri string) *Client { - return &Client{URI: uri} -} - -func (c *Client) Dial() (err error) { - c.conn, err = rtmp.Dial(c.URI) - return -} - -// Accept - convert http.Response to Client -func Accept(res *http.Response) (*Client, error) { - conn, err := httpflv.Accept(res) +func Dial(rawURL string) (*flv.Client, error) { + u, err := url.Parse(rawURL) if err != nil { return nil, err } - return &Client{URI: res.Request.URL.String(), conn: conn}, nil -} -func (c *Client) Describe() (err error) { - // important to get SPS/PPS - streams, err := c.conn.Streams() + host := u.Host + if strings.IndexByte(host, ':') < 0 { + host += ":1935" + } + + conn, err := net.DialTimeout("tcp", host, core.ConnDialTimeout) if err != nil { - return + return nil, err } - for _, stream := range streams { - switch stream.Type() { - case av.H264: - info := stream.(h264parser.CodecData).RecordInfo + tr := &rtmp{ + url: rawURL, + conn: conn, + rd: bufio.NewReaderSize(conn, core.BufferSize), + } - fmtp := fmt.Sprintf( - "profile-level-id=%02X%02X%02X;sprop-parameter-sets=%s,%s", - info.AVCProfileIndication, info.ProfileCompatibility, info.AVCLevelIndication, - base64.StdEncoding.EncodeToString(info.SPS[0]), - base64.StdEncoding.EncodeToString(info.PPS[0]), - ) - - codec := &core.Codec{ - Name: core.CodecH264, - ClockRate: 90000, - FmtpLine: fmtp, - PayloadType: core.PayloadTypeRAW, + if args := strings.Split(u.Path, "/"); len(args) >= 2 { + tr.app = args[1] + if len(args) >= 3 { + tr.stream = args[2] + if u.RawQuery != "" { + tr.stream += "?" + u.RawQuery } - - media := &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, - } - c.medias = append(c.medias, media) - - track := core.NewReceiver(media, codec) - c.receivers = append(c.receivers, track) - - case av.AAC: - // TODO: fix support - cd := stream.(aacparser.CodecData) - - codec := &core.Codec{ - Name: core.CodecAAC, - ClockRate: uint32(cd.Config.SampleRate), - Channels: uint16(cd.Config.ChannelConfig), - // a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588 - FmtpLine: "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" + hex.EncodeToString(cd.ConfigBytes), - PayloadType: core.PayloadTypeRAW, - } - - media := &core.Media{ - Kind: core.KindAudio, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, - } - c.medias = append(c.medias, media) - - track := core.NewReceiver(media, codec) - c.receivers = append(c.receivers, track) - - default: - fmt.Printf("[rtmp] unsupported codec %+v\n", stream) } } - return -} - -func (c *Client) Handle() (err error) { - for { - var pkt av.Packet - pkt, err = c.conn.ReadPacket() - if err != nil { - if c.closed { - return nil - } - return - } - - c.recv += len(pkt.Data) - - track := c.receivers[int(pkt.Idx)] - - // convert seconds to RTP timestamp - timestamp := uint32(pkt.Time * time.Duration(track.Codec.ClockRate) / time.Second) - - packet := &rtp.Packet{ - Header: rtp.Header{Timestamp: timestamp}, - Payload: pkt.Data, - } - track.WriteRTP(packet) + if err = tr.init(); err != nil { + return nil, err } -} -func (c *Client) Close() error { - if c.conn == nil { - return nil - } - c.closed = true - return c.conn.Close() + return &flv.Client{Transport: tr, URL: rawURL}, nil } diff --git a/pkg/rtmp/producer.go b/pkg/rtmp/producer.go deleted file mode 100644 index c74eb586..00000000 --- a/pkg/rtmp/producer.go +++ /dev/null @@ -1,41 +0,0 @@ -package rtmp - -import ( - "encoding/json" - "github.com/AlexxIT/go2rtc/pkg/core" -) - -func (c *Client) GetMedias() []*core.Media { - return c.medias -} - -func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - for _, track := range c.receivers { - if track.Codec == codec { - return track, nil - } - } - return nil, core.ErrCantGetTrack -} - -func (c *Client) Start() error { - return c.Handle() -} - -func (c *Client) Stop() error { - for _, receiver := range c.receivers { - receiver.Close() - } - return c.Close() -} - -func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "RTMP active producer", - URL: c.URI, - Medias: c.medias, - Receivers: c.receivers, - Recv: c.recv, - } - return json.Marshal(info) -} diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go new file mode 100644 index 00000000..b91516f4 --- /dev/null +++ b/pkg/rtmp/rtmp.go @@ -0,0 +1,408 @@ +package rtmp + +import ( + "encoding/binary" + "errors" + "io" + "net" + + "github.com/AlexxIT/go2rtc/pkg/flv/amf" +) + +const ( + MsgSetPacketSize = 1 + MsgServerBandwidth = 5 + MsgClientBandwidth = 6 + MsgCommand = 20 + + //MsgAck = 3 + //MsgControl = 4 + //MsgAudioPacket = 8 + //MsgVideoPacket = 9 + //MsgDataExt = 15 + //MsgCommandExt = 17 + //MsgData = 18 +) + +var ErrResponse = errors.New("rtmp: wrong response") + +// rtmp - implements flv.Transport +type rtmp struct { + url string + app string + stream string + pktSize uint32 + + headers map[uint32]*header + + conn net.Conn + rd io.Reader +} + +type header struct { + msgTime uint32 + msgSize uint32 + msgType byte +} + +func (c *rtmp) ReadTag() (byte, uint32, []byte, error) { + hdrType, sid, err := c.readHeader() + if err != nil { + return 0, 0, nil, err + } + + // storing header information for support header type 3 + hdr, ok := c.headers[sid] + if !ok { + hdr = &header{} + c.headers[sid] = hdr + } + + var b []byte + + // https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol#Packet_structure + switch hdrType { + case 0: // 12 byte header (full header) + if b, err = c.readSize(11); err != nil { + return 0, 0, nil, err + } + _ = b[7] + hdr.msgTime = Uint24(b) // timestamp + hdr.msgSize = Uint24(b[3:]) // msgdatalen + hdr.msgType = b[6] // msgtypeid + _ = binary.BigEndian.Uint32(b[7:]) // msgsid + + case 1: // 8 bytes - like type b00, not including message ID (4 last bytes) + if b, err = c.readSize(7); err != nil { + return 0, 0, nil, err + } + _ = b[6] + hdr.msgTime = Uint24(b) // timestamp + hdr.msgSize = Uint24(b[3:]) // msgdatalen + hdr.msgType = b[6] // msgtypeid + + case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included + if b, err = c.readSize(3); err != nil { + return 0, 0, nil, err + } + hdr.msgTime = Uint24(b) // timestamp + + case 3: // 1 byte - only the Basic Header is included + // use here hdr from previous msg with same session ID (sid) + } + + timeMS := hdr.msgTime + if timeMS == 0xFFFFFF { + if b, err = c.readSize(4); err != nil { + return 0, 0, nil, err + } + timeMS = binary.BigEndian.Uint32(b) + } + + //log.Printf("[rtmp] hdrType=%d sid=%d msdTime=%d msgSize=%d msgType=%d", hdrType, sid, hdr.msgTime, hdr.msgSize, hdr.msgType) + + // 1. Response zero size + if hdr.msgSize == 0 { + return hdr.msgType, timeMS, nil, nil + } + + b = make([]byte, hdr.msgSize) + + // 2. Response small packet + if c.pktSize == 0 || hdr.msgSize < c.pktSize { + if _, err = io.ReadFull(c.rd, b); err != nil { + return 0, 0, nil, err + } + return hdr.msgType, timeMS, b, nil + } + + // 3. Response big packet + var i0 uint32 + for i1 := c.pktSize; i1 < hdr.msgSize; i1 += c.pktSize { + if _, err = io.ReadFull(c.rd, b[i0:i1]); err != nil { + return 0, 0, nil, err + } + + if _, _, err = c.readHeader(); err != nil { + return 0, 0, nil, err + } + + if hdr.msgTime == 0xFFFFFF { + if _, err = c.readSize(4); err != nil { + return 0, 0, nil, err + } + } + + i0 = i1 + } + + if _, err = io.ReadFull(c.rd, b[i0:]); err != nil { + return 0, 0, nil, err + } + + return hdr.msgType, timeMS, b, nil +} + +func (c *rtmp) Close() error { + return c.conn.Close() +} + +func (c *rtmp) init() error { + if err := c.handshake(); err != nil { + return err + } + if err := c.sendConfig(); err != nil { + return err + } + + c.headers = map[uint32]*header{} + + if err := c.sendConnect(); err != nil { + return err + } + if err := c.sendPlay(); err != nil { + return err + } + + return nil +} + +func (c *rtmp) handshake() error { + // simple handshake without real random and check response + const randomSize = 4 + 4 + 1528 + + b := make([]byte, 1+randomSize) + b[0] = 0x03 + if _, err := c.conn.Write(b); err != nil { + return err + } + + if _, err := io.ReadFull(c.rd, b); err != nil { + return err + } + + if b[0] != 3 { + return errors.New("rtmp: wrong handshake") + } + + if _, err := c.conn.Write(b[1:]); err != nil { + return err + } + + if _, err := io.ReadFull(c.rd, b[1:]); err != nil { + return err + } + + return nil +} + +func (c *rtmp) sendConfig() error { + b := make([]byte, 5) + binary.BigEndian.PutUint32(b, 65536) + if err := c.sendRequest(MsgSetPacketSize, 0, b[:4]); err != nil { + return err + } + + binary.BigEndian.PutUint32(b, 2500000) + if err := c.sendRequest(MsgServerBandwidth, 0, b[:4]); err != nil { + return err + } + + binary.BigEndian.PutUint32(b, 10000000) // ack size + b[4] = 2 // limit type + if err := c.sendRequest(MsgClientBandwidth, 0, b); err != nil { + return err + } + + return nil +} + +func (c *rtmp) sendConnect() error { + msg := amf.AMF{} + msg.WriteString("connect") + msg.WriteNumber(1) + msg.WriteObject(map[string]any{ + "app": c.app, + "flashVer": "MAC 32,0,0,0", + "tcUrl": c.url, + "fpad": false, // ? + "capabilities": 15, // ? + "audioCodecs": 4071, // ? + "videoCodecs": 252, // ? + "videoFunction": 1, // ? + }) + + if err := c.sendRequest(MsgCommand, 0, msg.Bytes()); err != nil { + return err + } + + s, err := c.waitCode() + if err != nil { + return err + } + + if s != "NetConnection.Connect.Success" { + return errors.New("rtmp: wrong code: " + s) + } + + return nil +} + +func (c *rtmp) sendPlay() error { + msg := amf.NewWriter() + msg.WriteString("createStream") + msg.WriteNumber(2) + msg.WriteNull() + + if err := c.sendRequest(MsgCommand, 0, msg.Bytes()); err != nil { + return err + } + + args, err := c.waitResponse() + if err != nil { + return err + } + + if len(args) < 4 { + return ErrResponse + } + + sid, _ := args[3].(float64) + + msg = amf.NewWriter() + msg.WriteString("play") + msg.WriteNumber(3) + msg.WriteNull() + msg.WriteString(c.stream) + + if err = c.sendRequest(MsgCommand, uint32(sid), msg.Bytes()); err != nil { + return err + } + + s, err := c.waitCode() + if err != nil { + return err + } + + switch s { + case "NetStream.Play.Start", "NetStream.Play.Reset": + return nil + } + + return errors.New("rtmp: wrong code: " + s) +} + +func (c *rtmp) sendRequest(msgType byte, streamID uint32, payload []byte) error { + n := len(payload) + b := make([]byte, 12+n) + _ = b[12] + + switch msgType { + case MsgSetPacketSize, MsgServerBandwidth, MsgClientBandwidth: + b[0] = 0x02 // chunk ID + case MsgCommand: + if streamID == 0 { + b[0] = 0x03 // chunk ID + } else { + b[0] = 0x08 // chunk ID + } + } + + //PutUint24(b[1:], 0) // timestamp + PutUint24(b[4:], uint32(n)) // payload size + b[7] = msgType // message type + binary.BigEndian.PutUint32(b[8:], streamID) // message stream ID + copy(b[12:], payload) + + if _, err := c.conn.Write(b); err != nil { + return err + } + + return nil +} + +func (c *rtmp) readHeader() (byte, uint32, error) { + b, err := c.readSize(1) + if err != nil { + return 0, 0, err + } + + hdrType := b[0] >> 6 + sid := uint32(b[0] & 0b111111) + + switch sid { + case 0: + if b, err = c.readSize(1); err != nil { + return 0, 0, err + } + sid = 64 + uint32(b[0]) + case 1: + if b, err = c.readSize(2); err != nil { + return 0, 0, err + } + sid = 64 + uint32(binary.BigEndian.Uint16(b)) + } + + return hdrType, sid, nil +} + +func (c *rtmp) readSize(n uint32) ([]byte, error) { + b := make([]byte, n) + if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil { + return nil, err + } + return b, nil +} + +func (c *rtmp) waitResponse() ([]any, error) { + for { + msgType, _, b, err := c.ReadTag() + if err != nil { + return nil, err + } + + switch msgType { + case MsgSetPacketSize: + c.pktSize = binary.BigEndian.Uint32(b) + + case MsgCommand: + return amf.NewReader(b).ReadItems() + } + } +} + +func (c *rtmp) waitCode() (string, error) { + args, err := c.waitResponse() + if err != nil { + return "", err + } + + if len(args) < 4 { + return "", ErrResponse + } + + m, _ := args[3].(map[string]any) + if m == nil { + return "", ErrResponse + } + + v, _ := m["code"] + if v == nil { + return "", ErrResponse + } + + s, _ := v.(string) + return s, nil +} + +func PutUint24(b []byte, v uint32) { + _ = b[2] + b[0] = byte(v >> 16) + b[1] = byte(v >> 8) + b[2] = byte(v) +} + +func Uint24(b []byte) uint32 { + _ = b[2] + return uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2]) +}