diff --git a/internal/rtmp/rtmp.go b/internal/rtmp/rtmp.go index 1cf7c16b..0da06dbd 100644 --- a/internal/rtmp/rtmp.go +++ b/internal/rtmp/rtmp.go @@ -14,6 +14,8 @@ import ( func Init() { streams.HandleFunc("rtmp", streamsHandle) + streams.HandleFunc("rtmps", streamsHandle) + streams.HandleFunc("rtmpx", streamsHandle) api.HandleFunc("api/stream.flv", apiHandle) } diff --git a/pkg/rtmp/producer.go b/pkg/rtmp/producer.go index 83637480..380fff17 100644 --- a/pkg/rtmp/producer.go +++ b/pkg/rtmp/producer.go @@ -1,6 +1,8 @@ package rtmp import ( + "crypto/tls" + "errors" "net" "net/url" "strings" @@ -15,16 +17,38 @@ func Dial(rawURL string) (core.Producer, error) { return nil, err } - host := u.Host - if strings.IndexByte(host, ':') < 0 { - host += ":1935" + var hostname string // without port + if i := strings.IndexByte(u.Host, ':'); i > 0 { + hostname = u.Host[:i] + } else { + hostname = u.Host + u.Host += ":1935" } - conn, err := net.DialTimeout("tcp", host, core.ConnDialTimeout) + conn, err := net.DialTimeout("tcp", u.Host, core.ConnDialTimeout) if err != nil { return nil, err } + if u.Scheme != "rtmp" { + var conf *tls.Config + + switch { + case u.Scheme == "rtmpx" || net.ParseIP(hostname) != nil: + conf = &tls.Config{InsecureSkipVerify: true} + case u.Scheme == "rtmps": + conf = &tls.Config{ServerName: hostname} + default: + return nil, errors.New("unsupported scheme: " + u.Scheme) + } + + tlsConn := tls.Client(conn, conf) + if err = tlsConn.Handshake(); err != nil { + return nil, err + } + conn = tlsConn + } + rd, err := NewReader(u, conn) if err != nil { return nil, err diff --git a/pkg/rtmp/reader.go b/pkg/rtmp/reader.go index 465fb27a..6674a9ee 100644 --- a/pkg/rtmp/reader.go +++ b/pkg/rtmp/reader.go @@ -1,11 +1,15 @@ package rtmp import ( + "bufio" "encoding/binary" "errors" "io" "net" + "net/url" + "strings" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/flv/amf" ) @@ -40,6 +44,47 @@ type Reader struct { buf []byte } +func NewReader(u *url.URL, conn net.Conn) (*Reader, error) { + rd := &Reader{ + url: u.String(), + headers: map[uint32]*header{}, + conn: conn, + rd: bufio.NewReaderSize(conn, core.BufferSize), + } + + if args := strings.Split(u.Path, "/"); len(args) >= 2 { + rd.app = args[1] + if len(args) >= 3 { + rd.stream = args[2] + if u.RawQuery != "" { + rd.stream += "?" + u.RawQuery + } + } + } + + if err := rd.handshake(); err != nil { + return nil, err + } + if err := rd.sendConfig(); err != nil { + return nil, err + } + if err := rd.sendConnect(); err != nil { + return nil, err + } + if err := rd.sendPlay(); err != nil { + return nil, err + } + + rd.buf = []byte{ + 'F', 'L', 'V', // signature + 1, // version + 0, // flags (has video/audio) + 0, 0, 0, 9, // header size + } + + return rd, nil +} + func (c *Reader) Read(p []byte) (n int, err error) { // 1. Check temporary tempbuffer if len(c.buf) == 0 {