Add support RTMPS source

This commit is contained in:
Alexey Khit
2023-08-17 08:00:02 +03:00
parent b016b7dc2a
commit 063a192699
3 changed files with 75 additions and 4 deletions

View File

@@ -14,6 +14,8 @@ import (
func Init() { func Init() {
streams.HandleFunc("rtmp", streamsHandle) streams.HandleFunc("rtmp", streamsHandle)
streams.HandleFunc("rtmps", streamsHandle)
streams.HandleFunc("rtmpx", streamsHandle)
api.HandleFunc("api/stream.flv", apiHandle) api.HandleFunc("api/stream.flv", apiHandle)
} }

View File

@@ -1,6 +1,8 @@
package rtmp package rtmp
import ( import (
"crypto/tls"
"errors"
"net" "net"
"net/url" "net/url"
"strings" "strings"
@@ -15,16 +17,38 @@ func Dial(rawURL string) (core.Producer, error) {
return nil, err return nil, err
} }
host := u.Host var hostname string // without port
if strings.IndexByte(host, ':') < 0 { if i := strings.IndexByte(u.Host, ':'); i > 0 {
host += ":1935" hostname = u.Host[:i]
} else {
hostname = u.Host
u.Host += ":1935"
} }
conn, err := net.DialTimeout("tcp", host, core.ConnDialTimeout) conn, err := net.DialTimeout("tcp", u.Host, core.ConnDialTimeout)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if u.Scheme != "rtmp" {
var conf *tls.Config
switch {
case u.Scheme == "rtmpx" || net.ParseIP(hostname) != nil:
conf = &tls.Config{InsecureSkipVerify: true}
case u.Scheme == "rtmps":
conf = &tls.Config{ServerName: hostname}
default:
return nil, errors.New("unsupported scheme: " + u.Scheme)
}
tlsConn := tls.Client(conn, conf)
if err = tlsConn.Handshake(); err != nil {
return nil, err
}
conn = tlsConn
}
rd, err := NewReader(u, conn) rd, err := NewReader(u, conn)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -1,11 +1,15 @@
package rtmp package rtmp
import ( import (
"bufio"
"encoding/binary" "encoding/binary"
"errors" "errors"
"io" "io"
"net" "net"
"net/url"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv/amf" "github.com/AlexxIT/go2rtc/pkg/flv/amf"
) )
@@ -40,6 +44,47 @@ type Reader struct {
buf []byte buf []byte
} }
func NewReader(u *url.URL, conn net.Conn) (*Reader, error) {
rd := &Reader{
url: u.String(),
headers: map[uint32]*header{},
conn: conn,
rd: bufio.NewReaderSize(conn, core.BufferSize),
}
if args := strings.Split(u.Path, "/"); len(args) >= 2 {
rd.app = args[1]
if len(args) >= 3 {
rd.stream = args[2]
if u.RawQuery != "" {
rd.stream += "?" + u.RawQuery
}
}
}
if err := rd.handshake(); err != nil {
return nil, err
}
if err := rd.sendConfig(); err != nil {
return nil, err
}
if err := rd.sendConnect(); err != nil {
return nil, err
}
if err := rd.sendPlay(); err != nil {
return nil, err
}
rd.buf = []byte{
'F', 'L', 'V', // signature
1, // version
0, // flags (has video/audio)
0, 0, 0, 9, // header size
}
return rd, nil
}
func (c *Reader) Read(p []byte) (n int, err error) { func (c *Reader) Read(p []byte) (n int, err error) {
// 1. Check temporary tempbuffer // 1. Check temporary tempbuffer
if len(c.buf) == 0 { if len(c.buf) == 0 {